broker-0
broker-1
- 一个kafka集群由多个broker组成,才能实现负载均衡,容错
- broker无状态,通过zookeeper来维护集群状态
- 一个kafka的broker可以处理10万/秒读写,每个broker都可以处理TB消息而不影响性能
- 是协调管理kafka集群的
- 存储了kafka的元数据,(多少个topic,partion啥的)
- zookeeper用于通知生产者和消费者kafka集群中有新的broker加入,或者kafka集群中有broker故障
消费者组的概念
1个topic,里面就1个partion,partion的名字是partion-0
groupId是全局唯一的,
有2个消费者组,一个叫做consumer-groupA,一个叫做consumer-groupB
在每个消费者组里面都有一堆消费者进程
消费者组A,消费者组B都可以同时消费互不影响
并且都可以消费到队列中的完整数据,
kafka队列中有0-9,10个数,消费者组A拉下来的是0-9,消费者组B拉下来的数据也是0-9
topic:test
partion数量:1
消费者组:consumerGroupA
消费者:consumer-0,consumer-1
现象:consumer-0可以正常拉取到kafka中的消息,consumer-1拉消息阻塞,没有正常的消息回显
-
总结:消费者组A里面有4个消费者,但是因为名为test的topic就只有1个partion分区,所以消费者组里面的4个消费者只有一个消费者(假设是consumer-A-0)能消费到partion-0,其余消费者等待,当consumer-A-0的消费者进程挂掉后,可能后面几个kafka进程抢占消费权限,可能consumer-A-1抢到了,接着消费partion-0上的数据
-
扩展:共同消费topic的数据的前提是,partion是多个不能是1个,如果是1个的话,这个消费者组里就只有1个消费者才能消费到数据,如果有2个partion,就可以再consumerGroupA组内的2个消费者都消费到,消费的是对应关系,consumer-0消费partion-0的数据,consumer-1消费partion-1的数据
-
5个partion的话,在一个consumerGroupA消费者组中,就一个消费者consunmer-0,它会消费者5个partion(partion-0,partion-1,partion-2,partion-3,partion-5)上的所有数据,是kafka集群后台专门设计实现的.
kafka中的分区,使用5个分区
kafka的分布式特性就依赖分区来搞定的
只考虑分区不考虑副本的情况下
10个mq消息ProductId1....ProductId10
则可能出现的情况时
- ProductId1存到了partion-0上
- ProductId2存到了partion-1上
- ProductId3存到了partion-2上
- ProductId4存到了partion-3上
- ProductId5存到了partion-4上
- ProductId6存到了partion-0上
- ProductId7存到了partion-1上
- ProductId8存到了partion-2上
- ProductId9存到了partion-3上
- ProductId10存到了partion-4上
分布式存储,将数据打散了.分别存到了不同的partion上,不同的partion又分别处在不同的broker上,所以就有了自动的负载均衡机制,不会讲数据只打到一个机器上,造成假分布式系统
解决partion的容错问题,当partion-0挂掉后,副本可以提供数据的同时还可以自己充当正常partion的角色,进行读写
以下是三副本的数据存储结构
一个分区至少大于1个的副本
主题,是一个逻辑的概念
消息都存在topic里面
- 一个kafka集群可以包含多个topic
- 一个Topic里包含多个partion
一般一个topic中消息是有结构的,一般一个主题包含了一类消息
偏移量
当前数据采集到哪了,针对到partion的粒度
消费到3的位置了
- offset记录这下一条将要发送给consumer的消息序号图例是4
- 默认kafka中的offset是存储到zookeeper中的
这个offset是kafka集群自己维护的,客户端只要使用高级api就可
每一个partion上的offset都是一个递增的id
offset是要和partion关联的
图例就是partion最新的offset
kafka中的topic具体能抗多少并发是和我们的分区partion数有关系的
如果是2个分区的时候,当前kafka集群的负载依然很高
消费不过来,可以新增更多的分区,让我们消费的更快
kafka生产者的幂等性
enable.idempotence=true
kafka引入了ProducerId
和Sequence Number
概念
- 当producer发送消息给broker
- broker保存消息到partion都正常结束
- 最后一步broker返回ack给producer时失败
- 此时客户端会重试发送就涉及到了幂等性问题,如果有幂等性问题,kafka里又会保存一条一模一样的数据
- PID: 每个producer初始化的时候,就有一个唯一的ID成为ProducerId,但是这个id对用户来说是透明的,生产者唯一编号
- Sequence Number: 针对每个生产者,发送到指定topic的partion消息都对应一个从-开始递增的sequence Number
- 发送消息会连着pid 和sequence Number
- 如果ack响应失败,生产者重试,再次发送消息时,kafka会根据pid,sequence number判断是否需要再保存数据
- 判断条件: 生产者发送过来的sequence number是否小于partion中间件
如果3个分区
- 轮训策略(默认策略),场景是key是null的时候采用这种
最大限度保证消息分布均匀
-
随机策略
-
按key分配策略
key.hash() % 获得分区数量
12.kafka乱序问题可能会出现数据切斜问题
如果某个key包含了大量的数据,key值一样,所有的数据都将分到一个分区,分区消息数量远远大于其他分区
分区写入策略都会有这个问题,按key分区可以实现局部有序
某一个partion里的消息是有序的
轮训算法或者按key分配,多个分区的情况下是无法保证完全有序的.只能保证局部有序
- Rebalance 再均衡,确保consumer group下的所有consumer如何达成一致,分配定于的topic每个分区的机制
- 再均衡: 在某些情况下,消费者组中的消费者消费的分区会产生变化,会导致消费者分配不均匀(例如有2个消费者消费3个partion,因为某个partion崩溃了,还有一个消费者当前没有分区要消峰),kafka consumer group就会启动rebalance机制,重新平衡这个consumer group内的消费者消费分区分配
- 触发时机
- 消费者数量发生变化
- 新增消费者
触发机制
触发时机:
- 某个consumer崩溃了,死掉了.新的consumer加入到消费者组中
- 订阅的topic个数发生变化
- 订阅的partion分区个数发生变化
Rebalance的不良影响
- 发生rebalance时,consumer group下所有的consumer都会协调在一起共同参与,kafka使用分配策略尽可能达到最公平的分配
- rebalance过程会对consumer group 产生非常严重的影响,rebalance过程所有的消费者都停止工作,直到rebalance结束,直到每个消费者都已经被成功分配到所需要消费的分区为止,rebalance结束
原则:保证每个消费者尽可能的均衡消费分区的数据,不能出现某个消费者消费分区数量特别多,某个消费者消费的分区数量特别少
-
Range分配策略
默认的分配策略,可以确保每个消费者消费的分区数量是均衡的
range是根据topic来定的
算法公式
n = 分区数量/ 消费者数量
m = 分区水浪 % 消费者数量
前m个消费者消费n+1个
剩余消费者消费n个
图例:8个partion,一个consumerGroup组中有3个consumer
n = 8 / 3 // 2
m = 8 % 3 // 2
得出结论:前2个消费者各消费3个,剩余消费者消费2个
-
轮训分配策略
RoundRobin轮询策略
消费组内的所有消费者以及消费者订阅的所有topic的partion
按照字典排序(topic和分区的hashcode进行排序),通过轮训方式逐个将分区分配给每个消费者
图例2个topic,共8个分区,放一起进行分配策略
-
粘性分配策略
kafka 0.11.x开始引入
- 分区尽可能均匀
- 发生rebalance时,分区的分配尽可能与上一次分配保持相同
- 没有rebalance时,striky粘性分配和roundRobin分配策略类似
保留之前的分配结果,如果挂了一个consumer后,有2个partion没人消费,rebalance后,将没人消费的2个partion会重新均分给在活的consumer消费者
副本的目的是冗余备份,当broker上 的分区数据丢失,依然保证高可用
主要是acks参数
acks参数表示生产者生产消息时,写入到副本的严格程序,决定了生产者如何在性能和可靠性之间做取舍
生产者这测可以配置
- ack:0 不等待broker确认,直接发送下一条数据,性能最高,但可能会存在丢失数据的情况
- ack:1 等待leader副本确认接收后,才会发送下一条数据,性能中等
一个partion中只有一个leader才能写,其余的faliover只能复制,不能写入消息 - ack:all/-1 等待所有的副本都同步完数据后,才会发送下一条数据,性能最慢,(leader分区写入确保消息对应副本failover都复制完毕后才算成功)
如何选择:如果要求性能最高,一部分丢失数据影响不大,可以选择0/1,如果要求数据一定不能丢,就得配置-1/all
16.Leaderleader和failover是针对partion来说的
leader能写入消息
failover只能同步消息,当leader挂了,可以选举出来,failover独立自己当leader
还是具有分布式的意义的
一个partion必须有一个leader,如果没有leader就不能写入了
监控kafka的工具
offset,lag变化,partion
1.前提需要开启jmx端口
export JMX_PORT=9999
// 接着跟kafka的启动命令
nohup bin/kafka-server-start.sh config/server.properties &
18.kafka的分区leader和follower
leader和follower是针对分区来的
每个topic都可以配置多个分区及多个副本
每个分区都有一个leader以及1个或多个follower
创建topic时候,kafka会将每个分区的leader均匀的分配在每个broker上
- leader:所有的读写操作都是leader来做的
- follower:leader出现故障的时候,follower会被选举leader选上去
把flollower可以按照不同状态分三类-AR,ISR,OSR
isr: In Sync Replics正在同步的副本数,所有与leader副本保持一定程序同步的副本
ar: 分区的所有副本Assigned Replicas已分配的副本
osr: Out-of-Sync Replias,由于follower副本同步滞后过多的副本
AR=ISR+OSR
正常情况下,所有的follower副本都应该与leader副本保持同步,AR=ISR,OSR集合为空
出现故障就会出现AR!=ISR,OSR集合中有数据
leader和follower统称为为Replicas
选举
leader出问题的时候,迅速选择出来leader,要不partion不可用
- kafka启动时候,会在所有的broker选择一个controller(针对broker来说的)
- 前面的leader和follower是针对partion的
- 创建topic,或者添加分区,修改副本数量的管理任务都是controller来完成的
- kafka分区leader的选举,也是由controller来决定的
controller选举
- kafka集群启动,每个broker都会尝试去zk上注册成为controller,使用的zk的临时节点
- 按时只有一个竞争成功,其他的broker会注册该节点的监视器
- 一旦该临时节点状态变化,就会进行相应的处理
- controller也是搞可用,一旦broker崩溃,其他broker会重新注册为controller
controller选举partion leader
- 所有partion的leader选举都由controller决定
- controller会将leader的改变直接通过rpc方式通知需要为此做出相应的broker
- controller读取到当前分区的isr,只要有一个replica还幸存,就作为其中一个leader否则任意选一个replica作为leader
- 如果该partion的所有replica都冗机,则新的leader为-1,分区就挂了
为什么不通过zk来选举partion的leader
- kafka集群如果业务很多的情况下,会很多partion
- 某个broker冗机,就会出现很多partion都需要选举leader
- 如果使用zk来选举leader,会给zk带来很大压力,所以kafka中leader选举没用zk来实现
perfered replica
- 在isr列表里,第一个replica就是perfered replica 优先的
- 第一个分区存放的broker,肯定就是preferred-replica
1.生产者从zk的某一个节点找到改topic对应的partion的leader
2.leader会写日志文件(broker上的leader将消息顺序写到本地log中)
3.follower同步数据,继续顺序写本地log,并向leader发送ack
4. leader接受到所有的isr的replica的ack后,并向生产者发送ack
通过zk节点可以找到topic中为test的partion的leader地址
写流程:
- 通过zk找partion对应的leader
- producer开始写入数据
- isr里面的follower开始同步数据,并返回给leader ACK
- 返回给producer ack
rabbitmq推模式
kafka拉模式,broker是无状态的
- kafka采用拉取模式,消费者自己记录消费状态,每个消费者互相独立顺序拉取每个分区的消息
- 消费者可以按照任意的顺序消费消息,消费者可以重置到旧的偏移量,重新处理之前已经消费国的消息,或者直接跳到最近的位置从当前的时刻开始消费
- 每个consumer都可以根据分配策略,获得要消费的分区
- 获取到consumer对应的offset(默认从zk获取上一次消费的offset)
- 找到该分区的leader,拉取数据
- 消费者提交offset
读流程:
- 通过zk找partion对应的leader,leader是负责读的
- 通过zk找到消费者对应的offset
- 开始从offset后顺序拉取数据
- 提交offset(自动提交-每隔多少秒提交一次offset,手动提交-放到事务中提交)
- 一个topic由多个partion组成
- 一个分区partion由多个segment组成
- 一个segment由多个文件组成(log,index,timeindex) ,数据其实是存到log里,index记的是索引
- kafka中,消息被定期清理,一次删除一个segment段的日志文件
- kafka日志管理器,会根据kafka配置,来决定哪些文件可以被删除
生产者通过分区leader写入数据后,所有再ISR中follower都会从leader复制数据,这样,可以确保leader崩溃了,其他的follower还有数据,还可以恢复
25.2 生产者数据不丢失- 生产者连接leader写数据,通过ack机制来确保数据已经成功写入,ack机制有三个可选配置
- ack响应为-1,所有节点都收到数据
- ack响应为1,表示leader收到数据
- ack响应为0,生产者只发送数据,不关系数据是否丢失
- 生产者可以采取同步和异步两种方式发送数据
- 同步:发送一批数据给kafka后,等待kafka返回结果
- 异步:发送一批数据给kafka后,只是提供一个回调函数
消费者消费数据,只要记录好offset值,就可以保证数据不丢失
重点保证offset,其中可能会有数据重复消费
场景:consumer消费mq消息,然后进行业务运算,最后保存到mysql的过程
过程中拉下来消息成功,返回ack,然后业务运算,最后保存mysql的时候异常
// 可能也会产生数据丢失,可以mysql中保存数据成功后,再提交offset(手动offset)
(消费消息+提交offset)做原子性保证
消息传递的语意性
at-most-once:最多一次(只管把数据消费到,不管有没有成功,可能会有数据丢失)
at-latest-once:最少一次(有可能会出现重复消费)
exactly-once:仅有一次(事务性的保证消息有且仅被处理一次)
业务逻辑写入mysql是成功的,但是写入zk中的offset是失败的会出现重复消费
业务逻辑时写入mysql是失败的.
业务消息仅被消费一次,幂等性的,可以使用mysql事务,将写入到mysql的数据和offset放到一个mysql事务里,要么成功要么全部失败.使用细粒度的api去读取指定offset位置的消息.kafka的事务保证不了
exactly-once
将offset写入到mysql中,写入offset到mysql成功,提交mysql事务,如果不成功回滚事务
kafka消费者消费速度非常快,由于一些外部io或者产生网络阻塞,就会造成kafka中数据积压.
End-Offset=当前的有多少条消息还未消费
(1)数据写入mysql报错,消费partion的offset一直没有自动提交,所以数据积压严重,不能提交offset,提交完offset,数据就丢了
(2)网络延迟消费失败,topic中出现消息积压,几万条消息没有被消费掉
kafka的消费者超时配置50ms,太小,网络一抖动,就超时了.
kafka的消息存在磁盘中,为了控制磁盘空间,kafka需要不断对过去的消息进行清理
kafka每个分区都有很多日志文件
- 日志删除: 按照指定的策略直接删除不符合条件的日志
- 日志压缩: 按照消息的key进行整合,有相同key的但又不同value值,值保留最后一个版本
配置
log.cleaner.enable true
log.cleanup.policy delete 删除日志
log.cleanup.policy compaction 压缩日志
log.cleanup.policy delete,compact 同时支持删除,压缩
日志删除以segement日志为单位进行定期清理的
基于事假你的保留策略
基于日志大小的保留策略
基于日志起始offset的保留策略
可以设置如果kafka中设置日志的保留天数,默认168小时,7天
删除日志分段,打上deleted后缀
设置topic的删除策略
消费kafka的时候,一定要选择手动提交commit offset.如果不手动提交offset,有可能网络问题提交offset失败的异常捕获不到,会出现重复消费数据问题.ump监控也捕获不到此类异常情况
在一个consumer出现网络问题的时候,kafka集群会自动balance,别的consumer去接管部分流量,别的机器入流量会升高,机器网络恢复后,那边升高的入流量可能会降下来.