一.Kafka的系统架构
如上图所示,Kafka由Producer、Broker、Consumer 以及负责集群管理的 ZooKeeper 构成,各部分功能如下:
- Producer:生产者,负责消息的创建并通过一定的路由策略发送消息到合适的 Broker
- Broker:服务实例,负责消息的长期化、中转等功能;
- Consumer:消费者,负责从 Broker 中拉取(Pull)订阅的消息并举行消费,通常多个消费者构成一个分组,消息只能被同组中的一个消费者消费;
- ZooKeeper:负责 broker、consumer 集群元数据的管理等;
上图消息流转过程中,还有几个特别告急的概念—主题(Topic)、分区(Partition)、分段(segment)、位移(offset)。
- topic:消息主题。Kafka 按 topic 对消息举行分类,我们在收发消息时只需指定 topic。
- partition:分区。为了提拔系统的吞吐,一个 topic 下通常有多个 partition,partition 分布在差别的 Broker 上,用于存储 topic 的消息,这使 Kafka 可以在多台机器上处理、存储消息,给 kafka 提供给了并行的消息处理能力和横向扩容能力。另外,为了提拔系统的可靠性,partition 通常会分组,且每组有一个主 partition、多个副本 partition,且分布在差别的 broker 上,从而起到容灾的作用。
- segment:分段。宏观上看,一个 partition 对应一个日志(Log)。由于生产者生产的消息会不停追加到 log 文件末尾,为防止 log 文件过大导致数据检索服从低下,Kafka 采取了分段和索引机制,将每个 partition 分为多个 segment,同时也便于消息的维护和清理。每个 segment 包含一个 .log 日志文件、两个索引(.index、timeindex)文件以及其他可能的文件。每个 Segment 的数据文件以该段中最小的 offset 为文件名,当查找 offset 的 Message 的时候,通过二分查找快找到 Message 所处于的 Segment 中。
- offset:消息在日志中的位置,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量。offset 是消息在分区中的唯一标识,是一个单调递增且不变的值。Kafka 通过它来保证消息在分区内的次序性,不外 offset 并不超过分区,也就是说,Kafka 保证的是分区有序而不是主题有序。
二.Kafka的高可靠性(不丢失)
Kafka 高可靠性的核心是保证消息在传递过程中不丢失,涉及如下核心环节:
- 消息从生产者可靠地发送至 Broker;-- 网络、当地丢数据。
- 发送到 Broker 的消息可靠长期化;-- PageCache 缓存落盘、单点瓦解、主从同步跨网络
- 消费者从 Broker 消费到消息且最好只消费一次 -- 跨网络消息传输。
如何保障消息从生产者可靠地发送至 Broker
Producer 发送消息后,能够收到来自 Broker 的消息生存乐成 ack;
- Request.required.acks = 0:请求发送即以为乐成,不关心有没有写乐成,常用于日志举行分析场景。
- Request.required.acks = 1:当 leader partition 写入乐成以后,才算写入乐成,有丢数据的可能。
- Request.required.acks= -1:ISR 列表内里的全部副本都写完以后,这条消息才算写入乐成,强可靠性保证。
为了实现强可靠的 kafka 系统,我们需要设置 Request.required.acks= -1,同时还会设置集群中处于正常同步状态的副本 follower 数目 min.insync.replicas>2,另外,设置 unclean.leader.election.enable=false 使得集群中 ISR 的 follower 才可变成新的 leader,避免特殊情况下消息截断的出现。
三.Kafka的消息长期化
为了确保 Producer 收到 Broker 的乐成 ack 后,消息一定不在 Broker 环节丢失,我们核心要关注以下几点:
- Broker 返回 Producer 乐成 ack 时,消息是否已经落盘;
- Broker 宕机是否会导致数据丢失,容灾机制是什么;
- Replica 副本机制带来的多副本间数据同步一致性问题如何解决;
Broker 异步刷盘机制
kafka 为了得到更高吞吐,Broker 接收到消息后只是将数据写入 PageCache 后便以为消息已写入乐成,而 PageCache 中的数据通过 linux 的 flusher 程序举行异步刷盘(刷盘触发条:主动调用 sync 或 fsync 函数、可用内存低于阀值、dirty data 时间达到阀值),将数据次序写到磁盘。由于消息是写入到 PageCache ,单机场景,假如还没刷盘 Broker 就宕机了,那么 Producer 产生的这部分数据就可能丢失。为相识决单机故障可能带来的数据丢失问题,Kafka 为分区引入了副本机制。
Replica 副本机制
Kafka 每组分区通常有多个副本,同组分区的差别副本分布在差别的 Broker 上,生存相同的消息(可能有滞后)。副本之间是“一主多从”的关系,此中 leader 副本负责处理读写请求,follower 副本负责从 leader 拉取消息举行同步。分区的全部副本统称为 AR(Assigned Replicas),此中全部与 leader 副本保持一定同步的副本(包括 leader 副本在内)构成ISR(In-Sync Replicas),与 leader 同步滞后过多的副本构成 OSR(Out-of-Sync Replicas),由此可见,AR=ISR+OSR。follower 副本是否与leader同步的判定尺度取决于 Broker 端参数 replica.lag.time.max.ms(默以为10秒),follower 默认每隔 500ms 向 leader fetch 一次数据,只要一个 Follower 副本落伍 Leader 副本的时间不连续高出10秒,那么 Kafka 就以为该 Follower 副本与 leader 是同步的。在正常情况下,全部的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR=ISR,OSR 聚集为空。
当 leader 副本地点 Broker 宕机时,Kafka 会借助 ZK 从 follower 副本中选举新的 leader 继承对外提供服务,实现故障的自动转移,保证服务可用。为了使选举的新 leader 和旧 leader 数据尽可能一致,当 leader 副本发生故障时,默认情况下只有在 ISR 聚集中的副本才有资格被选举为新的 leader,而在 OSR 聚集中的副本则没有任何机遇(可通过设置unclean.leader.election.enable改变)。
当 Kafka 通过多副本机制解决单机故障问题时,同时也带来了多副本间数据同步一致性问题。Kafka 通过高水位更新机制、副本同步机制、 Leader Epoch 等多种措施解决了多副本间数据同步一致性问题,下面我们来依次看下这几大措施。
HW 和 LEO
- HW: High Watermark,高水位,表示已经提交(commit)的最大日志偏移量,Kafka 中某条日志“已提交”的意思是 ISR 中全部节点都包含了此条日志,并且消费者只能消费 HW 之前的数据;
- LEO: Log End Offset,表示当前 log 文件中下一条待写入消息的 offset;
如上图所示,它代表一个日志文件,这个日志文件中有8条消息,0至5之间的消息为已提交消息,5至7的消息为未提交消息。日志文件的 HW 为5,表示消费者只能拉取到5之前的消息,而 offset 为5的消息对消费者而言是不可见的。日志文件的 LEO 为8,下一条消息将在此处写入。
留意:全部副本都有对应的 HW 和 LEO,只不外 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义地点分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位。Leader 副本和 Follower 副本的 HW 有如下特点:
Leader HW:min(全部副本 LEO),为此 Leader 副本不仅要生存自己的 HW 和 LEO,还要生存 follower 副本的 HW 和 LEO,而 follower 副本只需生存自己的 HW 和 LEO;
Follower HW:min(follower 自身 LEO,leader HW)。
四.Kafka为什么这么快
Kafka 高性能的核心是保障系统低耽误、高吞吐地处理消息,为此,Kafaka 接纳了很多精妙的设计:
- 异步发送。
- 批量发送。
- 压缩技能。
- PageCache 机制&次序追加落盘。
- 零拷贝。
- 稀疏索引。
- broker & 数据分区。
- 多 reactor 多线程网络模型。
异步发送
如上文所述,Kafka 提供了异步和同步两种消息发送方式。在异步发送中,整个流程都是异步的。调用异步发送方法后,消息会被写入 channel,然后立即返回乐成。Dispatcher 协程会从 channel 轮询消息,将其发送到 Broker,同时会有另一个异步协程负责处理 Broker 返回的结果。同步发送本质上也是异步的,但是在处理结果时,同步发送通过 waitGroup 将异步操纵转换为同步。使用异步发送可以最大化提高消息发送的吞吐能力。
批量发送
Kafka 支持批量发送消息,将多个消息打包成一个批次举行发送,从而淘汰网络传输的开销,提高网络传输的服从和吞吐量。Kafka 的批量发送消息是通过以下两个参数来控制的:
batch.size:控制批量发送消息的巨细,默认值为16KB,可得当增长 batch.size 参数值提拔吞吐。但是,需要留意的是,假如批量发送的巨细设置得过大,可能会导致消息发送的耽误增长,因此需要根据实际情况举行调整。
linger.ms:控制消息在批量发送前的等待时间,默认值为0。当 linger.ms 大于0时,假如有消息发送,Kafka 会等待指定的时间,假如等待时间到达或者批量巨细达到 batch.size,就会将消息打包成一个批次举行发送。可得当增长 linger.ms 参数值提拔吞吐,比如10~100。
压缩技能
Kafka 支持压缩技能,通过将消息举行压缩后再举行传输,从而淘汰网络传输的开销(压缩息争压缩的过程会斲丧一定的 CPU 资源,因此需要根据实际情况举行调整。),提高网络传输的服从和吞吐量。
Kafka 支持多种压缩算法,通过配置参数 compression.type(默认值为 none,表示不举行压缩)控制。在 Kafka 2.1.0版本之前,仅支持 GZIP,Snappy 和 LZ4,2.1.0后还支持 Zstandard 算法(Facebook 开源,能够提供超高压缩比)。这些压缩算法性能对比(两指标都是越高越好)如下:
吞吐量:LZ4>Snappy>zstd 和 GZIP
压缩比:zstd>LZ4>GZIP>Snappy。
在 Kafka 的生产者客户端中,当发送消息时,假如启用了压缩技能,Kafka 会将消息举行压缩后再举行传输。在消费者客户端中,假如消息举行了压缩,Kafka 会在消费消息时将其解压缩。留意:Broker 假如设置了和生产者不通的压缩算法,接收消息后会解压后重新压缩生存。Broker 假如存在消息版本兼容也会触发解压后再压缩。
PageCache 机制&次序追加落盘
kafka 为了提拔系统吞吐、降低时延,Broker 接收到消息后只是将数据写入 PageCache 后便以为消息已写入乐成,而 PageCache 中的数据通过 linux 的 flusher 程序举行异步刷盘(避免了同步刷盘的巨大系统开销),将数据次序追加写到磁盘日志文件中。由于 PageCache 是在内存中举行缓存,因此读写速度非常快,可以大大提高读写服从。次序追加写充分使用次序 I/O 写操纵,避免了缓慢的随机 I/O 操纵,可有用提拔 Kafka 吞吐。
零拷贝
Kafka 中存在大量的网络数据长期化到磁盘(Producer 到 Broker)和磁盘文件通过网络发送(Broker 到 Consumer)的过程,这一过程的性能直接影响 Kafka 的整体吞吐量。传统的 IO 操纵存在多次数据拷贝和上下文切换,性能比较低。Kafka 使用零拷贝技能提拔上述过程性能,此中网络数据长期化磁盘主要用mmap技能,网络数据传输环节主要使用 sendfile 技能。
稀疏索引
为了方便对日志举行检索和过期清理,kafka 日志文件除了有用于存储日志的 .log 文件,还有一个位移索引文件 .index 和一个时间戳索引文件 .timeindex 文件,并且三文件的名字完全相同,如下:
Kafka 的索引文件是按照稀疏索引的思想举行设计的。稀疏索引的核心是不会为每个记录都生存索引,而是写入一定的记录之后才会增长一个索引值,具体这个间隔有多大则通过 log.index.interval.bytes 参数举行控制,默认巨细为 4 KB,意味着 Kafka 至少写入 4KB 消息数据之后,才会在索引文件中增长一个索引项。可见,单条消息巨细会影响 Kakfa 索引的插入频率,因此 log.index.interval.bytes 也是 Kafka 调优一个告急参数值。由于索引文件也是按照消息的次序性举行增长索引项的,因此 Kafka 可以使用二分查找算法来搜刮目的索引项,把时间复杂度降到了 O(lgN),大大淘汰了查找的时间。
位移索引文件的索引项布局如下:
相对位移:生存于索引文件名字上面的起始位移的差值,假设一个索引文件为:00000000000000000100.index,那么起始位移值即 100,当存储位移为 150 的消息索引时,在索引文件中的相对位移则为 150 - 100 = 50,这么做的好处是使用 4 字节生存位移即可,可以节省非常多的磁盘空间。
文件物理位置:消息在 log 文件中生存的位置,也就是说 Kafka 可根据消息位移,通过位移索引文件快速找到消息在 log 文件中的物理位置,有了该物理位置的值,我们就可以快速地从 log 文件中找到对应的消息了。下面我用图来表示 Kafka 是如何快速检索消息:
假设 Kafka 需要找出位移为 3550 的消息,那么 Kafka 首先会使用二分查找算法找到小于 3550 的最大索引项:[3528, 2310272],得到索引项之后,Kafka 会根据该索引项的文件物理位置在 log 文件中从位置 2310272 开始次序查找,直至找到位移为 3550 的消息记录为止。
broker & 数据分区
Kafka 集群包含多个 broker。一个 topic 下通常有多个 partition,partition 分布在差别的 Broker 上,用于存储 topic 的消息,这使 Kafka 可以在多台机器上处理、存储消息,给 kafka 提供给了并行的消息处理能力和横向扩容能力。
多 reactor 多线程网络模型
多 Reactor 多线程网络模型 是一种高效的网络通信模型,它接纳了多个 Reactor 线程和多个工作线程来处理网络请求,可以充分使用多核 CPU 的性能,提高系统的吞吐量和相应速度。Kafka 为了提拔系统的吞吐,在 Broker 端处理消息时接纳了该模型,示意如下:
SocketServer 和 KafkaRequestHandlerPool 是此中最告急的两个组件:
- SocketServer:实现 Reactor 模式,用于处理多个 Client(包括客户端和其他 broker 节点)的并发请求,并将处理结果返回给 Client。
- KafkaRequestHandlerPool:Reactor 模式中的 Worker 线程池,内里定义了多个工作线程,用于处理实际的 I/O 请求逻辑。
整个服务端处理请求的流程大致分为以下几个步骤:
- Acceptor 接收客户端发来的请求。
- 轮询分发给 Processor 线程处理。
- Processor 将请求封装成 Request 对象,放到 RequestQueue 队列。
- KafkaRequestHandlerPool 分配工作线程,处理 RequestQueue 中的请求。
- KafkaRequestHandler 线程处理完请求后,将相应 Response 返回给 Processor 线程。
- Processor 线程将相应返回给客户端。
以时间戳查询消息
Kafka 在0.10.1.1 版本增长了时间戳索引文件,因此我们除了直接根据偏移量索引文件查询消息之外,还可以根据时间戳来访问消息。consumer-API 提供了一个offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)方法,该方法入参为一个Map 对象,Key 为待查询的分区,Value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的偏移量和时间戳。需要留意的是,若待查询的分区不存在,则该方法会被一直壅闭。
五.Kafka写入消息流程
- 毗连到 zk 集群,从 zookeeper 中拿到对应的 topic 的 partition 信息和 partition 的 leader 的相关信息(留意:kafka0.8 版本以后,producer 写入数据不依赖 zookpeer !)
- 毗连到对应的 leader 对应的 broker
- producer 将消息发送到 partition 的leader上
- leader 将消息写入当地 log, follower 从 leader pull 同步消息
- 写入当地 log 后,依次向 leader 返回/发送 ack
- leader 收到全部 replication 的 ack 后,向 producer 发送 ack
- 全部的 ISR 中的数据写入完成,才完成提交,整个写过程结束
六.Kafka读取消息流程
- 毗连到 zk 集群,从zookeeper 中拿到对应的 topic 的 partition 信息和 partition 的 leader 的相关信息
- 毗连到对应的 leader 对应的 broker
- consumer 将自己生存的 offset 发送给 leader
- leader 根据 offset 等信息定位到 segment(索引文件 .index 和日志文件 .log )
- 根据索引文件中的内容,定位到日志文件中该偏移量对应的开始位置读取相应长度的数据并返回给 consumer
说明:.index文件 存储大量的索引信息,.log文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。
举例:如如今要查找偏移量 offset 为 3 的消息,根据 .index 文件命名我们可以知道,offset 为 3 的索引应该从00000000000000000000.index 里查找。根据上图所示,其对应的索引地址为 756-911,以是 Kafka 将读取00000000000000000000.log 756~911区间的数据。
index 和 log 文件以当前 segment 的第一条消息的 offset 命名。偏移量 offset 是一个 64 位的长整形数,固定是20 位数字,长度未达到,用 0 举行填补,索引文件和日志文件都由此作为文件名命名规则。以是从上图可以看出,该分区的偏移量是从 57819368 开始的,.index 和 .log 文件名称都为 00000000000057819368。
七.幂等性和事务
幂等性
Kafka 通过 幂等生产者(Idempotent Producer)来实现幂等性。幂等生产者的配置参数是 enable.idempotence=true,启用后,Kafka 会为每个生产者分配一个唯一的 生产者 ID(Producer ID, PID),并为每条消息分配一个 序列号(Sequence Number)。Kafka broker 使用这些信息来检测和丢弃重复的消息。
生产者 ID (PID):当生产者第一次毗连到 Kafka broker 时,broker 会为其分配一个唯一的 PID。这个 PID 在生产者的生命周期内保持不变,即使生产者断开毗连并重新毗连,它仍然会使用相同的 PID。
序列号 (Sequence Number):每个生产者为每个分区维护一个递增的序列号。每次生产者发送一条消息时,序列号会递增,并与消息一起发送给 Kafka broker。Kafka broker 使用 PID 和序列号来跟踪每个生产者发送的消息。
重复消息检测
消息去重:当 Kafka broker 收到一条消息时,它会查抄该消息的 PID 和序列号。假如 broker 发现已经收到了相同 PID 和序列号的消息,它会以为这是一条重复消息,并将其丢弃。否则,broker 会将消息写入日志,并更新序列号。
超机遇制:为了防止生产者长时间未发送消息导致序列号过期,Kafka 引入了 会话超时(Session Timeout)机制。假如生产者在超时时间内没有发送任何消息,Kafka 会以为该生产者的会话已结束,并重新分配新的 PID 和序列号。默认的会话超时时间为 60 秒。
幂等性的上风
- 避免重复消息:幂等性确保即使生产者重试发送消息,也不会导致重复消息的产生。这对于需要严格消息次序的应用场景非常告急,例如金融交易系统、订单处理系统等。
- 简化重试逻辑:由于 Kafka 自动处理了重复消息的检测和去重,生产者不再需要手动实现复杂的重试逻辑,简化了开发工作。
- 提高可靠性:幂等性提高了消息传递的可靠性,特别是在网络不稳固或生产者故障的情况下,确保了消息的完备性和一致性。
事务
事务支持是指 Kafka 提供了一种机制,允很多个操纵作为一个整体举行提交或回滚,确保这些操纵要么全部乐成,要么全部失败。Kafka 的事务支持主要用于实现 准确一次语义(Exactly-Once Semantics, EOS),确保消息在生产、消费和处理过程中不会丢失或重复。
Kafka 的事务支持不仅适用于生产者发送消息的操纵,还支持跨多个主题和分区的事务性操纵。具体来说,Kafka 事务可以包括以下几种操纵:
- 消息生产:生产者可以将多条消息作为同一个事务的一部分发送到差别的主题和分区。
- 消息消费:消费者可以将多个消息的偏移量提交作为同一个事务的一部分,确保这些消息的消费是原子性的。
- 流处理:Kafka Streams API 支持事务性操纵,允许开发者在流处理过程中保证数据的一致性和完备性。
Kafka 的事务支持基于 两阶段提交协议(Two-Phase Commit Protocol),确保事务中的全部操纵要么全部乐成,要么全部失败。以下是 Kafka 事务的典范工作流程:
初始化事务
- 生产者调用 initTransactions() 方法,初始化一个事务上下文。Kafka 为该事务分配一个唯一的 事务 ID(Transaction ID),并记录事务的开始时间。
添加操纵到事务
- 生产者可以通过 send() 方法将消息添加到事务中。这些消息会被暂存起来,直到事务提交为止。
- 生产者还可以通过 addOffsetsToTransaction() 方法将消费者的偏移量提交操纵添加到事务中,确保消息的消费和处理是原子性的。
提交或回滚事务
- 当全部操纵完成后,生产者可以调用 commitTransaction() 方法提交事务。Kafka 会确保事务中的全部操纵都乐成完成,并将消息写入日志。
- 假如某个操纵失败,生产者可以调用 abortTransaction() 方法回滚事务,确保事务中的全部操纵都被取消。
事务协调器
- Kafka 为每个事务分配了一个 事务协调器(Transaction Coordinator),负责管理事务的状态和协调多个 broker 之间的同步。事务协调器会跟踪事务的进度,并在得当的时候通知其他 broker 提交或回滚事务。
Kafka事务的隔离级别
- 读已提交(Read Committed):消费者只能读取已经被提交的消息,不能读取正在处理中的事务消息。这是 Kafka 默认的隔离级别,适用于大多数场景。
- 读未提交(Read Uncommitted):消费者可以读取尚未提交的事务消息。这种隔离级别适用于对一致性要求较低的场景,但可能会导致消费者读取到未提交的消息。
事务的上风
准确一次语义:通过事务支持,Kafka 可以实现 准确一次语义,确保消息在生产、消费和处理过程中不会丢失或重复。这对于需要强一致性的应用场景非常告急,例如金融交易系统、订单处理系统等。
跨主题和分区的原子性:Kafka 的事务支持允很多个操纵超过多个主题和分区,确保这些操纵要么全部乐成,要么全部失败。这种方式提供了更高的机动性和可靠性。
流处理的一致性:Kafka Streams API 支持事务性操纵,允许开发者在流处理过程中保证数据的一致性和完备性。这对于构建复杂的实时数据处理管道非常有用。
事务的配置
要启用 Kafka 的事务支持,生产者需要配置以下参数:
enable.idempotence=true # 启用幂等性
transactional.id=<unique_transaction_id> # 设置唯一的事务 ID
此外,Kafka 还提供了一些与事务相关的配置参数,用于控制事务的超时和隔离级别:
transaction.timeout.ms:指定事务的最大持续时间。假如事务在超时时间内未完成,Kafka 会自动回滚该事务。默认值为 60000 毫秒(60 秒)。
transaction.state.log.replication.factor:指定事务状态日志的副本数。默认值为 3,发起根据集群的规模和可靠性需求举行调整。
isolation.level=read_committed:指定消费者的隔离级别为“读已提交”,确保消费者只能读取已经被提交的消息。
参考文章
图解Kafka:架构设计、消息可靠、数据长期、高性能背后的底层原理_kafka底层原理-CSDN博客
【kafka底子】-- 读写流程及举例_kafka读写流程-CSDN博客
Kafka上风剖析-幂等性和事务_kafka幂等性-CSDN博客
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |