大家好,又见面了。
Apache kafka是主流的消息中间件的一种,本文对kafka的相关概念、原理、使用关键注意事项等进行总结说明。
1. kafka关键概念与术语
1.1 简单的例子说明kafka的使用场景
Apache kafka是消息中间件的一种,我发现很多人不知道消息中间件是什么,在开始学习之前,我这边就先简单的解释一下什么是消息中间件,只是粗略的讲解,目前kafka已经可以做更多的事情。
举个例子:
上面的例子里面:
1.2 kafka名词解释
1.2.1 通俗点的名词概念说明
大家会看到一些关于kafka的名词,比如topic、producer、consumer、broker,对应前面的例子里面,可以解释如下:
大家一定要学会抽象的去思考,上面只是属于业务的角度,如果从技术角度,topic标签实际就是队列,生产者把所有“鸡蛋(消息)”都放到对应的队列里了,消费者到指定的队列里取。
1.2.2 专业性的术语含义解释说明
Topic
Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic).
Producer
发布消息的对象称之为主题生产者(Kafka topic producer) 生产者往某个Topic上发布消息。生产者也负责选择发布到Topic上的哪一个分区。最简单的方式从分区列表中轮流选择。也可以根据某种算法依照权重选择分区。开发者负责如何选择分区的算法。
Consumer
订阅消息并处理发布的消息的种子的对象称之为主题消费者(consumers)
Broker
已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
1.3 Kafka作为一个分布式的流平台,这到底意味着什么?
一个流处理平台具有三个关键能力:
1.4 什么是kafka的优势?它主要应用于2大类应用:
构建实时的流数据管道,可靠地获取系统和应用程序之间的数据。 构建实时流的应用程序,对数据流进行转换或反应。
1.5 Kafka集群相关的几个概念:
kafka作为一个集群运行在一个或多个服务器上。 kafka集群存储的消息是以topic为类别记录的。 每个消息(也叫记录record,我习惯叫消息)是由一个key,一个value和时间戳构成。
1.5.1 Kafka 存储在文件系统上
Kafka 的消息是存在于文件系统之上的。Kafka 高度依赖文件系统来存储和缓存消息,一般的人认为 “磁盘是缓慢的”,所以对这样的设计持有怀疑态度。实际上,磁盘比人们预想的快很多也慢很多,这取决于它们如何被使用;一个好的磁盘结构设计可以使之跟网络速度一样快。
现代的操作系统针对磁盘的读写已经做了一些优化方案来加快磁盘的访问速度。比如,预读会提前将一个比较大的磁盘快读入内存。后写会将很多小的逻辑写操作合并起来组合成一个大的物理写操作。并且,操作系统还会将主内存剩余的所有空闲内存空间都用作磁盘缓存,所有的磁盘读写操作都会经过统一的磁盘缓存(除了直接 I/O 会绕过磁盘缓存)。
综合这几点优化特点,如果是针对磁盘的顺序访问,某些情况下它可能比随机的内存访问都要快,甚至可以和网络的速度相差无几。
1.5.2 主题(Topic)与分区(Partition)
在 Kafka 中,消息以主题(Topic)来分类,每一个主题都对应一个「消息队列」,这有点儿类似于数据库中的表。但是如果我们把所有同类的消息都塞入到一个“中心”队列中,势必缺少可伸缩性,无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽系统的性能或存储。
我们使用一个生活中的例子来说明:
现在 A 城市生产的某商品需要运输到 B 城市,走的是公路。那么单通道的高速公路不论是在「A 城市商品增多」还是「现在 C 城市也要往 B 城市运输东西」这样的情况下都会出现「吞吐量不足」的问题。
所以我们现在引入分区(Partition)的概念,类似“允许多修几条道”的方式对我们的主题完成了水平扩展。
Topic 其实是逻辑上的概念,面向消费者和生产者,物理上存储的其实是 Partition,每一个 Partition 最终对应一个目录,里面存储所有的消息和索引文件。默认情况下,每一个 Topic 在创建时如果不指定 Partition 数量时只会创建 1 个 Partition。
比如,我创建了一个 Topic 名字为 test ,没有指定 Partition 的数量,那么会默认创建一个 test-0 的文件夹,这里的命名规则是:-。
任何发布到 Partition 的消息都会被追加到 Partition 数据文件的尾部,这样的顺序写磁盘操作让 Kafka 的效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是 Kafka 高吞吐率的一个很重要的保证)。每一条消息被发送到 Broker 中,会根据 Partition 规则选择被存储到哪一个 Partition。如果 Partition 规则设置的合理,所有消息可以均匀分布到不同的 Partition中。
1.5.3 Broker 和集群(Cluster)
一个 Kafka 服务器也称为 Broker,它接受生产者发送的消息并存入磁盘;Broker 同时服务消费者拉取分区消息的请求,返回目前已经提交的消息。使用特定的机器硬件,一个 Broker 每秒可以处理成千上万的分区和百万量级的消息。(现在动不动就百万量级,我特地去查了一把,好像确实集群的情况下吞吐量挺高的。)
若干个 Broker 组成一个集群(Cluster),其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader。当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责。
1.6 kafka四个核心API
Client和Server之间的通讯,是通过一条简单、高性能并且和开发语言无关的TCP协议。并且该协议保持与老版本的兼容。Kafka提供了Java Client(客户端)。除了Java Client外,还有非常多的其它编程语言的Client。
2 消息存取机制
2.1 主题和日志 (Topic和Log)
Topic是发布的消息的类别或者种子Feed名。对于每一个Topic,Kafka集群维护这一个分区的log,就像下图中的示例:
每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。
Kafka集群保持所有的消息,直到它们过期, 无论消息是否被消费了。 实际上消费者所持有的仅有的元数据就是这个偏移量,也就是消费者在这个log中的位置。 这个偏移量由消费者控制:正常情况当消费者消费消息的时候,偏移量也线性的的增加。但是实际偏移量由消费者控制,消费者可以将偏移量重置为更老的一个偏移量,重新读取消息。 可以看到这种设计对消费者来说操作自如, 一个消费者的操作不会影响其它消费者对此log的处理。
2.2 消息消费模式(消费者组)
通常来讲,消息模型可以分为两种,队列和发布-订阅式。
队列的处理方式是 一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理。在发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息。
Kafka使用的是发布-订阅式的处理机制,但是kafka也可以间接的支持队列机制。 因为kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)—— 消费者用一个消费者组名标记自己,消息以发布-订阅的模式发送给所有订阅的组中,然后在组内,1条消息只有1个消费者会去负责处理。
这样,如果要实现队列模式,只需要将所有的consumer都加入到同一个group中,保持某个topic仅有1个group订阅,则消息会发送到此group中,并由其中的一个消费者负责处理。
假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。
更通用的, 我们可以创建一些消费者组作为逻辑上的订阅者。每个组包含数目不等的消费者, 一个组内多个消费者可以用来扩展性能和容错。正如下图所示:
2个kafka集群托管4个分区(P0-P3),2个消费者组,消费组A有2个消费者实例,消费组B有4个。
2.3 分布式(Distribution)
Log的分区被分布到集群中的多个服务器上。每个服务器处理它分到的分区。
根据配置每个分区还可以复制到其它服务器作为备份容错。 每个分区有一个leader,零或多个follower。 Leader处理此分区的所有的读写请求,而follower被动的复制数据。
如果leader宕机,其它的一个follower会被推举为新的leader。
一台服务器可能同时是一个分区的leader,另一个分区的follower。 这样可以平衡负载,避免所有的请求都只让一台或者某几台服务器处理。
3 消息保证
3.1 顺序保证
生产者发送到一个特定的Topic的分区上,消息将会按照它们发送的顺序依次加入,也就是说,如果一个消息M1和M2使用相同的producer发送,M1先发送,那么M1将比M2的offset低,并且优先的出现在日志中,消费者收到的消息也是此顺序。
kafka有比传统的消息系统更强的顺序保证。 传统的消息系统按顺序保存数据,如果多个消费者从队列消费,则服务器按存储的顺序发送消息,但是,尽管服务器按顺序发送,消息异步传递到消费者,因此消息可能乱序到达消费者。这意味着消息存在并行消费的情况,顺序就无法保证。消息系统常常通过仅设1个消费者来解决这个问题,但是这意味着没用到并行处理。
kafka做的更好。通过并行topic的parition —— kafka提供了顺序保证和负载均衡。每个partition仅由同一个消费者组中的一个消费者消费到。并确保消费者是该partition的唯一消费者,并按顺序消费数据。每个topic有多个分区,则需要对多个消费者做负载均衡,但请注意,相同的消费者组中不能有比分区更多的消费者,否则多出的消费者一直处于空等待,不会收到消息。
通俗的讲: 一个topic中对应多个parition分区,某个消费者组订阅此topic之后,kafka会保证给此消费者组分发消息的时候,从不同的分区分发消息到组里面,保证同一个组里面同一个分区的消息只有1条正在被处理。如果某个分区的消息已经在组里某个消费者正在处理,则kafka就不会再从此分区中拿消息发送给这个消费者组里,而是从其他分区里面取消息发给这个消费者组。
3.2 可靠性保证
如果一个Topic配置了复制因子(replication factor)为N, 那么可以允许N-1服务器宕机而不丢失任何已经提交(committed)的消息。
关系型数据库的可靠性保证是 ACID,也就是原子性(Atomicity)、一致性(Consistency)、隔离性(Isolation)和持久性(Durability)。
Kafka 中的可靠性保证有如下 4 点:
4 底层存储设计
假设我们现在 Kafka 集群只有一个 Broker,我们创建 2 个 Topic 名称分别为:「Topic1」和「Topic2」,Partition 数量分别为 1、2。
那么我们的根目录下就会创建如下三个文件夹:
| --topic1-0 | --topic2-0 | --topic2-1 在 Kafka 的文件存储中,同一个 Topic 下有多个不同的 Partition,每个 Partition 都为一个目录。
而每一个目录又被平均分配成多个大小相等的 Segment File 中,Segment File 又由 index file 和 data file 组成,他们总是成对出现,后缀 ".index" 和 ".log" 分表表示 Segment 索引文件和数据文件。
现在假设我们设置每个 Segment 大小为 500 MB,并启动生产者向 topic1 中写入大量数据,topic1-0 文件夹中就会产生类似如下的一些文件:
| --topic1-0 | --00000000000000000000.index | --00000000000000000000.log | --00000000000000368769.index | --00000000000000368769.log | --00000000000000737337.index | --00000000000000737337.log | --00000000000001105814.index | --00000000000001105814.log | --topic2-0 | --topic2-1Segment 是 Kafka 文件存储的最小单位。Segment 文件命名规则:Partition 全局的第一个 Segment 从 0 开始,后续每个 Segment 文件名为上一个 Segment 文件最后一条消息的 offset 值。
数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。如 00000000000000368769.index 和 00000000000000368769.log。
以上面的一对 Segment File 为例,说明一下索引文件和数据文件对应关系:
其中以索引文件中元数据 <3, 497> 为例,依次在数据文件中表示第 3 个 Message(在全局 Partition 表示第 368769 + 3 = 368772个 message)以及该消息的物理偏移地址为 497。
注意该 Index 文件并不是从0开始,也不是每次递增 1 的,这是因为 Kafka 采取稀疏索引存储的方式,每隔一定字节的数据建立一条索引。它减少了索引文件大小,使得能够把 Index 映射到内存,降低了查询时的磁盘 IO 开销,同时也并没有给查询带来太多的时间消耗。因为其文件名为上一个 Segment 最后一条消息的 Offset ,所以当需要查找一个指定 Offset 的 Message 时,通过在所有 Segment 的文件名中进行二分查找就能找到它归属的 Segment。再在其 Index 文件中找到其对应到文件上的物理位置,就能拿出该 Message。由于消息在 Partition 的 Segment 数据文件中是顺序读写的,且消息消费后不会删除(删除策略是针对过期的 Segment 文件),这是顺序磁盘 IO 存储设计师 Kafka 高性能很重要的原因。
Kafka 是如何准确的知道 Message 的偏移的呢?这是因为在 Kafka 定义了标准的数据存储结构,在 Partition 中的每一条 Message 都包含了以下三个属性:
5 生产者&消费者设计
5.1 生产者设计概要
当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量?
举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消息到 Kafka,另一个服务来读取消息并根据规则引擎来检查交易是否通过,将结果通过 Kafka 返回。
对于这样的业务,消息既不能丢失也不能重复,由于交易量大因此吞吐量需要尽可能大,延迟可以稍微高一点。
再举个例子,假如我们需要收集用户在网页上的点击数据,对于这样的场景,少量消息丢失或者重复是可以容忍的,延迟多大都不重要只要不影响用户体验,吞吐则根据实时用户数来决定。
不同的业务需要使用不同的写入方式和配置。具体的方式我们在这里不做讨论,现在先看下生产者写消息的基本流程:
流程如下:
5.2 消费者设计概要
5.2.1 消费者与消费组
假设这么个场景:我们从 Kafka 中读取消息,并且进行检查,最后产生结果数据。
我们可以创建一个消费者实例去做这件事情,但如果生产者写入消息的速度比消费者读取的速度快怎么办呢?这样随着时间增长,消息堆积越来越严重。对于这种场景,我们需要增加多个消费者来进行水平扩展。Kafka 消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。
假设有一个 T1 主题,该主题有 4 个分区;同时我们有一个消费组 G1,这个消费组只有一个消费者 C1。那么消费者 C1 将会收到这 4 个分区的消息,如下所示:
如果我们增加新的消费者 C2 到消费组 G1,那么每个消费者将会分别收到两个分区的消息,如下所示:
如果增加到 4 个消费者,那么每个消费者将会分别收到一个分区的消息,如下所示:
但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息:
总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。
Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。
对于上面的例子,假如我们新增了一个新的消费组 G2,而这个消费组有两个消费者,那么会是这样的:
在这个场景中,消费组 G1 和消费组 G2 都能收到 T1 主题的全量消息,在逻辑意义上来说它们属于不同的应用。
最后,总结起来就是:如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。
5.2.2 消费组与分区重平衡
当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的。另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(Rebalance)。重平衡是 Kafka 一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。
消费者通过定期发送心跳(Hearbeat)到一个作为组协调者(Group Coordinator)的 Broker 来保持在消费组内存活。这个 Broker 不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。如果消费者超过一定时间没有发送心跳,那么它的会话(Session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费。通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。
在 0.10.1 版本,Kafka 对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的 Kafka 支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。
5.2.3 Partition 与消费模型
上面提到,Kafka 中一个 Topic 中的消息是被打散分配在多个 Partition(分区)中存储的, Consumer Group 在消费时需要从不同的 Partition 获取消息,那最终如何重建出 Topic 中消息的顺序呢?
答案是:没有办法。Kafka 只会保证在 Partition 内消息是有序的,而不管全局的情况。
下一个问题是:Partition 中的消息可以被(不同的 Consumer Group)多次消费,那 Partition中被消费的消息是何时删除的?Partition 又是如何知道一个 Consumer Group 当前消费的位置呢?
无论消息是否被消费,除非消息到期 Partition 从不删除消息。例如设置保留时间为 2 天,则消息发布 2 天内任何 Group 都可以消费,2 天后,消息自动被删除。Partition 会为每个 Consumer Group 保存一个偏移量,记录 Group 消费到的位置。如下图:
5.2.4 为什么 Kafka 是 Pull 模型
消费者应该向 Broker 要数据(Pull)还是 Broker 向消费者推送数据(Push)?
作为一个消息系统,Kafka 遵循了传统的方式,选择由 Producer 向 Broker Push 消息并由 Consumer 从 Broker Pull 消息。一些 logging-centric system,比如 Facebook 的 Scribe 和 Cloudera 的 Flume,采用 Push 模式。事实上,Push 模式和 Pull 模式各有优劣。
Push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 Broker 决定的。Push 模式的目标是尽可能以最快速度传递消息,但是这样很容易造成 Consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 Pull 模式则可以根据 Consumer 的消费能力以适当的速率消费消息。
对于** Kafka 而言,Pull 模式更合适**。Pull 模式可简化 Broker 的设计,Consumer 可自主控制消费消息的速率。同时 Consumer 可以自己控制消费方式,即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
6 kafka应用场景
6.1 消息
kafka更好的替换传统的消息系统,消息系统被用于各种场景(解耦数据生产者,缓存未处理的消息,等),与大多数消息系统比较,kafka有更好的吞吐量,内置分区,副本和故障转移,这有利于处理大规模的消息。根据我们的经验,消息往往用于较低的吞吐量,但需要低的端到端延迟,并需要提供强大的耐用性的保证。
6.2 网站活动追踪
kafka原本的使用场景:用户的活动追踪,网站的活动(网页游览,搜索或其他用户的操作信息)发布到不同的话题中心,这些消息可实时处理,实时监测,也可加载到Hadoop或离线处理数据仓库。
每个用户页面视图都会产生非常高的量。
6.3 指标
kafka也常常用于监测数据。分布式应用程序生成的统计数据集中聚合。
6.4 日志聚合
许多人使用Kafka作为日志聚合解决方案的替代品。日志聚合通常从服务器中收集物理日志文件,并将它们放在中央位置(可能是文件服务器或HDFS)进行处理。Kafka抽象出文件的细节,并将日志或事件数据更清晰地抽象为消息流。这允许更低延迟的处理并更容易支持多个数据源和分布式数据消费。
6.5 流处理
kafka中消息处理一般包含多个阶段。其中原始输入数据是从kafka主题消费的,然后汇总,丰富,或者以其他的方式处理转化为新主题,例如,一个推荐新闻文章,文章内容可能从“articles”主题获取;然后进一步处理内容,得到一个处理后的新内容,最后推荐给用户。这种处理是基于单个主题的实时数据流。从0.10.0.0开始,轻量,但功能强大的流处理,就可以这样进行数据处理了。
除了Kafka Streams,还有Apache Storm和Apache Samza可选择。
6.6 事件采集
事件采集是一种应用程序的设计风格,其中状态的变化根据时间的顺序记录下来,kafka支持这种非常大的存储日志数据的场景。
6.7 提交日志
kafka可以作为一种分布式的外部日志,可帮助节点之间复制数据,并作为失败的节点来恢复数据重新同步,kafka的日志压缩功能很好的支持这种用法,这种用法类似于Apacha BookKeeper项目。
我是悟道,聊技术、又不仅仅聊技术~
如果觉得有用,请点赞 + 关注让我感受到您的支持。也可以关注下我的公众号【架构悟道】,获取更及时的更新。
期待与你一起探讨,一起成长为更好的自己。