上篇文章说了,kafka位移提交通过enable.auto.commit控制手动提交还是自动提交,手动提交又分为异步提交和同步提交,还可以指定分区进行提交,默认是提交给所有分区。手动提交可以对应不同的业务场景,当需要业务全部处理完才提交位移,则可以选择手动提交,但这时候需要做幂等性处理,因为当业务执行完毕,但系统宕机,这时候consumer重启则因为位移没提交会重复消费之前的数据。
一、Rebalance
Rebalance是什么?
它本质是一组协议,规定了consumer group如何达成一致性来分配订阅所有分区的。假设有20个consumer,需要订阅100个分区的topic,这时候就会每个consumer会平均订阅5个分区,这个过程就是rebalace。
和旧版本依托于zookeeper不同,新版本consumer使用了kafka内置一个权限的协调协议(group coordination protocol)。Kafka的某个broker会被选举为组协调者(group coordinator),他负责对组的状态进行管理,他的主要职责是当新成员到达时促进组内所有的成员重新分配,即coordinator负责rebalance。
什么时候他会触发rebalance呢?
- 组成员发生变化,比如新的consumer加入组,或者有consumer离开组,或者consumer崩溃时候触发。
- 消费组订阅的topic发生变化。
- 组订阅的topic分区发生变更。
真实应用场景中引用rebalance最常见原因违背了第一条件,特别是consumer崩溃情况,崩溃不一定是consumer进程宕机或者挂掉,当consumer无法在指定时间内完成消息处理时候,那么coordinator则会认为consumer已经崩溃,从而引发新一轮的rebalance。当group程序下业务处理逻辑过重,这时候就会导致消费超时,从而导致coordinator认为consumer挂掉,引发rebalance,这时候就要注意这些参数的配置request.timeout.ms、max.poll.interval.ms、max.poll.records等。
Rebalance分区配置?
之前提到过rebalance时group下所有consumer会一起协调共同参与分区分配,kafka新版本consumer默认提供了三种分区策略,分别是range、round-robin、sticky。
Range策略主要是基于范围思想,它将单个topic的所有分区按照顺序排列,然后把这些分区划分为固定大小的分区并且依次分给各个consumer。而round-robin策略则会把所有topic的所有分区顺序摆开,然后轮询式的分配给各个consumer。最新发布的sticky策略有效避免上诉两种策略完全无视历史分配方案缺陷,采用“有粘性”对所有consumer实例进行分配,可以最大程度的避免分配倾斜。
新版本consumer默认的分配策略是range,用户根据consumer参数partition.assignment.strategy来进行设置,另外也可以通过自定义来分配策略。
Rebalance协议:
前面说了rebalance本质就是一组协议,group与coordinator共同使用这组协议来完成group的rebalance,最新版本的kafka中提供下面五种协议来处理rebalance。
Joingroup请求:consumer请求加入组。
SyncGroup请求:group leader吧分配方案同步更新到组内所有成员中。
Heartbeat请求:consumer定期向coordinator汇报心跳表明依然存活。
LeaveGroup请求:consumer主动通知coordinator该consume即将离组。
DescribeGroup请求:查看组的所有信息,包括成员信息,协议信息,分配方案,订阅信息。该请求类型主要提供管理员使用。Coordinator不使用该请求执行rebalance。
在rebalance过程中,coordinator主要处理consumer发过来的joinGroup和syncGroup请求,当consumer主动离组时会发送leaveGroup请求给coordinator。
在成功rebalance后,组内所有consumer都需要定期向coordinator发送heartbeat请求,而每个consumer也是根据heartBeat请求的响应中是否包含rebalance_in_progress来判断当前group是否开启了新一轮的rebalance。
rebalance监听器:
在位移提交章节中,consumer默认在新版本是把位移提交到_consumer_offsets中。其实kafka也支持把位移提交到外部存储中,比如数据库。若要实现这个功能,则必须使用rebalance监听器,而使用监听器的前提是用户必须使用consumer group。如果使用独立的consumer或者直接手动分配分区,那么rebalance监听是无效的。
多线程实例消费
如前所述,kafkaConsumer是非线程安全的,他和kafkaProducer不同,后者是线程安全的,因此可以在多个线程中使用同一个kafkaProducer实例,而且这样的效率是比每个线程维护一个kafkaProducer更高。
Consumer group分为 每个线程单独维护一个kafkaConsumer,和 单kafkaConsumer+多work线程。
两者区别是,后者在全局维护一个或者多个kafkaConsumer实例执行消息获取任务。使用全局的kafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的work线程执行工作,之后work线程完成处理上报位移状态,由全局的consumer提交位移。
那么他们的优缺点呢?
每个线程维护专属consumer:优点:实现简单,速度快,因为无线程之间的交互管理,方便管理位移,易于维护分区间的消费顺序。缺点:socker连接开销大;consumer受限与topic分区,扩展性差。Broker端处理负载高(因为发往broker请求多);rebalance可能性大。
单consumer+多worker模式:优点:消息获取处理解耦;扩展性强,独立扩展consumer数量和worker。缺点:实现负载;难以维护分区内的顺序消息;处理链路变长,导致位移管理困难;worker线程异常导致数据丢失。
独立consumer
前面说的都是group consumer消费者组形式出现,group自动实行分区分配和rebalance。对于需要多个consumer共同读取某个topic来说,使用group非常方便。但有的时候用户需要精准消费某个consumer消费某个分区。
- 如果进程自己维护分区状态,那么它就可以固定消费某些分区而不用担心状态丢失问题。
- 如果进程本身已经是高可用且能够自动重启恢复错误,那么它就不需要让kafka来帮它完成错误检测和状态恢复。
以上两种情况中consumer group都无用武之地,而独立consumer更合适(standlone consumer)。
使用standalone方法就是调用kafkaConsumer.assign,前面我们订阅则是使用kafkaConsumer.subscribe。