当前位置 : 主页 > 编程语言 > java >

【kafka消息队列】Consumer消费者

来源:互联网 收集:自由互联 发布时间:2022-08-15
1、为什么会有消费者组? 单个消费者消费能力有限,消费能力可能跟不上生产消息的速度。通过指定消费者组,并不断往消费者组里增加消费者,增强消费处理能力。当然,若消费能

1、为什么会有消费者组?

单个消费者消费能力有限,消费能力可能跟不上生产消息的速度。通过指定消费者组,并不断往消费者组里增加消费者,增强消费处理能力。当然,若消费能力过强,而生产消费速度太慢,可以减少消费者组中消费者数量,避免消费者空转。综上消费者组能够实现消费能力的动态伸缩。【kafka消息队列】Consumer消费者_数据

由上图可知以下两点:

第一:topic中的一个partition只能由同一个消费者组中的一个消费者消费

由此可见,消费者组增加或减少消费者之后,消费者

第二:一个topic能够被多个消费者组消费,且消费者组之间互不干扰。


2、什么是再均衡(Rebalance)

再均衡(Rebalance):消费者组增加或减少消费者之后,主题中的分区会重新分配给不同的消费者。

【kafka消息队列】Consumer消费者_数据_02

Rebalance能够调整topic中分区与消费者的对应关系,当某个消费者宕机或者新增加消费者中,Rebalance能够动态调整分区与消费者的对应关系,给消费者组提供高可用性与伸缩性。

Rebalance带了高可用性与伸缩性的优点,那缺点是什么?

JVM中有个stop the world的概念,就是在进行垃圾回收时,所有线程会停止工作。Rebalance与stop the world类似,即再平衡期间,所有的消费者都会停止消费。

3、消费者如何确定要消费哪条消息?


1、 什么是偏移量offset? 

偏移量offset:表示消息在partition中的偏移量,记录它当前消费到了分区的哪个位置上,也是代表该消息的唯一序号。

2、偏移量保存在哪里?

偏移量存储在​​​_consumer_offset​​​ 这个topic中,由消费者将偏移量以消息的方式发送到​​_consumer_offset​​中。那么当发生再平衡(Rebalance)时,新增加的消费者能够知道应该消费分区里的哪条消息。

2、如何提交偏移量?

自动提交

将​​enable.auto.commit​​ 被设置为true,那么消费者每隔5s会自动把从 poll() 方法轮询到的最大偏移量提交上去。

这样会存在一个问题:消息重复消费。

比如,此时是8点整,刚提交完最大偏移量,5秒后要重新提交一次偏移量,但消费者在消费3秒后就宕机了。当Rebalance后,新的消费者会获取8点整提交的偏移量。所以宕机的消费者消费的3秒数据,新的消费者也会重新消费。

同步提交

同步提交( ​​commitSync()​):需要将​​​​enable.auto.commit​​​ 被设置为false,然后调用 ​​commitSync()​。它会直到偏移量被成功提交才会返回。如果提交过程中出现异常,该方法会将异常信息抛出。

同步的问题:需要等待偏移量被成功提交后才可以返回,而且失败后会进行重试,可能因为一些网络原因而阻塞进程,影响整个应用的TPS。

异步提交

异步提交(​​commitAsync()​​ :也需要将​​​​​enable.auto.commit​​​​​ 被设置为false,然后调用 ​​​​​​commitAsync()​​

异步的问题:异步提交不会重试,可能出现offset不是最新值,而发生消息重复消费。

同步和异步组合提交

一般情况下,执行异步的过程中,偶尔失败了,但下次再提交基本都会成功。但如果是在关闭消费者或再均衡前的最后一次提交,就要确保提交成功。

​需要将​commitAsync()​​和 ​​​commitSync()​​进行组合使用。对异步的​​commitAsync()​​进行try catch捕获,一旦发生异常再执行同步的​​​​​​commitSync()​​​​。

上一篇:Java中如何遍历字符串呢?
下一篇:没有了
网友评论