来自云龙湖轮廓分明的月亮 发表于 2025-4-13 17:24:13

消息队列之-Kafka

消息队列

消息队列(Message Queue)是一种跨历程的通信机制,用于在分布式系统中发送、存储和接收消息。它允许不同的应用程序或服务以异步的方式举行数据交换。以下是消息队列的一些根本概念:
消息:消息是通信的根本单位,通常包罗两部门:头部(metadata),如发送者信息、时间戳等;以及主体(payload),即实际要传输的数据内容。
生产者/发布者:指的是发送消息的应用程序或服务。生产者创建消息并将其发送到消息队列中。
消耗者/订阅者:指的是接收消息的应用程序或服务。消耗者从消息队列中读取消息,并根据消息的内容实验相应的操纵。
队列:是一个暂时存储消息的地方,用于保存生产者发送的消息直到这些消息被消耗者处理。队列遵循先进先出(FIFO, First In First Out)原则,除非特殊设置。
Broker(代理):在某些消息队列实现中,会有一个中间件称为broker,负责消息的传递、存储及路由。它作为生产者和消耗者之间的桥梁,确保消息能够精确地从生产者发送到消耗者。
持久化与非持久化:消息队列中的消息可以设置为持久化的或非持久化的。持久化消息纵然在系统崩溃后也能规复,而非持久化消息则不会在系统故障时保留。
广播与点对点:消息队列支持两种重要的消息分发模式。广播模式下,每条消息会被发送给全部订阅了该主题的消耗者;而在点对点模式下,每条消息只会被一个消耗者处理。
事务支持:一些消息队列系统提供了事务支持,确保一组消息要么全部乐成提交,要么全部不提交,以此来保证数据的一致性和完整性。
使用消息队列有助于解耦应用组件、进步系统的可扩展性、增强系统的容错本领,并且能够均衡负载,使得不同速度的服务之间能够高效协作。常见的消息队列产品包括RabbitMQ、Apache Kafka、ActiveMQ等。
消息队列的使用场景

解耦服务:通过消息队列,生产者和消耗者可以独立地摆设、扩展和开发。例如,在电子商务系统中,订单服务生成订单后,可以通过消息队列通知库存服务更新库存,而无需直接调用库存服务的接口。
异步处理:对于一些不需要立刻响应的操纵,可以接纳异步的方式举行处理,从而进步系统的响应速度。比如用户注册乐成后发送欢迎邮件,这个操纵就可以通过消息队列异步实验,而不必等候邮件发送完成后再返回给用户注册乐成的提示。
流量削峰:在高并发的情况下,如秒杀运动或节沐日促销,短时间内大量哀求可能会导致系统过载。使用消息队列可以在流量高峰期缓冲这些哀求,平滑地将使命分配给背景处理,保护系统免受瞬间高峰流量的影响。
初识Kafka

Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),重要应用与大数据实时处理领域。
https://i-blog.csdnimg.cn/direct/f39fd987b374479ebcba47514a7e3d25.png
Kafka 存储的消息来自恣意多被称为 Producer 生产者的历程。数据从而可以被发布到不同的 Topic 主题下的不同 Partition 分区。
在一个分区内,这些消息被索引并连同时间戳存储在一起。其它被称为 Consumer 消耗者的历程可以从分区订阅消息。
Kafka 运行在一个由一台或多台服务器构成的集群上,并且分区可以跨集群结点分布。
下面给出 Kafka 一些重要概念,让大家对 Kafka 有个团体的认识和感知:
Producer: 消息生产者,向 Kafka Broker 发消息的客户端。
Consumer:消息消耗者,从 Kafka Broker 取消息的客户端。
Consumer Group:消耗者组(CG),消耗者组内每个消耗者负责消耗不同分区的数据,进步消耗本领。一个分区只能由组内一个消耗者消耗,消耗者组之间互不影响。全部的消耗者都属于某个消耗者组,即消耗者组是逻辑上的一个订阅者。
Broker:一台 Kafka 机器就是一个 Broker。一个集群由多个 Broker 构成。一个 Broker 可以容纳多个 Topic。
Topic:可以明白为一个队列,Topic 将消息分类,生产者和消耗者面向的是同一个 Topic。
Partition:为了实现扩展性,进步并发本领,一个非常大的 Topic 可以分布到多个 Broker (即服务器)上,一个 Topic 可以分为多个 Partition,每个 Partition 是一个 有序的队列。
Replica:副本,为实现备份的功能,保证集群中的某个节点发生故障时,该节点上的 Partition 数据不丢失,且 Kafka 仍然能够继续工作,Kafka 提供了副本机制,一个 Topic 的每个分区都有多少个副本,一个 Leader 和多少个 Follower。
Leader:每个分区多个副本的“主”副本,生产者发送数据的对象,以及消耗者消耗数据的对象,都是 Leader。
Follower:每个分区多个副本的“从”副本,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 还会成为新的 Leader。
Offset:消耗者接纳pull的方式从broker上拉取消息举行消耗,该值记载消耗的位置信息,当消耗者挂掉再重新规复的时候,可以从消耗位置继续消耗。
Zookeeper:Kafka 集群能够正常工作,需要依赖于 Zookeeper,Zookeeper 帮助 Kafka 存储和管理集群信息。
Kafka设计思想

Consumer Group:Kafka按照消耗组来消耗消息,一个消耗组下面的机器构成一个Consumer Group,每条消息只能被该Consumer Group一个Consumer消耗。不同的Consumer Group可以消耗同一条消息。
消息状态:在Kafka中,消息是否被消耗的状态保存在Consumer中,Broker不会关心是否被消耗或被谁消耗,Consumer会记载一个offset值(指向partition中吓一跳将要被消耗的消息位置)。
消息持久化:Kafka会把消息持久化到本地文件,并且具有极高性能(零拷贝)。
批量发送:Kafka支持以消息集合为单位举行批量发送,以进步服从。
Push-and-Pull: Producer向Broker push消息,Consumer从Broker pull消息。
分区机制(Partition):
Kafka 的 Topic 被划分为多个分区(Partition),每个分区是一个有序、不可变的消息队列。分区的重要作用包括:
提供并行处理本领:不同的分区可以分布在不同的 Broker 上,从而支持高吞吐量。
保证消息次序:在一个分区内,消息是严格按次序存储和消耗的。
支持程度扩展:通过增长分区数,可以提升系统的并发处理本领。
提供数据备份:支持数据容灾本领,支持服务高可用
Kafka消息结构

在 Kafka 中,每条消息(也称为记载)都有一个明确的格式和构成部门。以下是 Kafka 消息的根本结构:
Offset:每个消息在它所属的分区中都有一个唯一的序列号,称为 Offset。这个数字用于标识消息在分区中的位置。
Message Size:表示消息体的大小,以字节为单位。这有助于消耗者知道需要读取多少数据。
Key(可选):消息键,可以为空。如果界说了 Key,它可以用来确定消息怎样被路由到特定的分区。Kafka 使用 Key 的哈希值来决定消息应该放在哪个分区,从而保证具有雷同 Key 的消息总是被发送到同一个分区。
Value:这是实际的消息内容,即要传输的数据。它可以是任何类型的序列化后的数据,如字符串、JSON、Avro 等。
Headers(可选):从 Kafka 0.11.0 版本开始引入,允许用户为消息添加元数据信息。这些元数据是以键值对的形式存在,可以用来携带关于消息的额外信息而不影响消息的实际内容。
Timestamp:表示消息创建的时间或当消息到达 broker 时的时间戳。时间戳对于日记压缩、基于时间的查询等功能非常重要。
消息发送

分区选择策略
如果消息没有指定 Key(即 key == null)
轮询(Round-Robin)策略(默认选用),将消息均匀地分配到全部可用分区中。
随机策略:从分区中随机选择一个
如果消息指定了 Key,则根据 Key 的哈希值计算目的分区:
partition = hash(key) % numPartitions
自界说策略
Kafka 允许用户通过实现 org.apache.kafka.clients.producer.Partitioner 接口来自界说分区逻辑。例如,可以根据业务规则将某些类型的消息路由到特定的分区。
消息消耗

Consumer Group保存了本身的位移信息,只需要一个简单的整数表示位置就可以了。
老版本的位移是提交到Zookeeper中,但是Zookeeper不得当记性大批量的读写操纵,尤其是写操纵。
从0.9版本开始kafka增长了__consumer_offsets这个Topic,将Offset这个信息写入Topic,如许就不需要以来Zookeeper。
Consumer Group接纳pull的方式来消耗消息,那么每个Consumer该消耗哪个Partition的消息需要一套严格的机制来保证,防止各Consumer间重复消耗消息。而且,Partition是可以程度无限扩展的,随着Partition的扩展,Consumer消耗的Partition也会重新分配,在Kafka内部有两种默认的消耗分区分配策略:Range和RoundRobin。
当发生以下事件时,Kafka会重新举行分配:
同一个Consumer Group内新增消耗者
消耗者离开当前所属的Group,比如机器Shutdown或者Crash
订阅的主题新增Partition
Range策略
工作原理:
RangeAssignor 是 Kafka 的默认分区分配策略。
它按照主题的分区范围(Range)将分区分配给消耗者。
具体步骤如下:
将每个主题的全部分区按次序编号(例如 P0, P1, P2, …, Pn)。
按照消耗者的字典次序排序(例如 C0, C1, C2)。
根据消耗者数量计算每个消耗者应分配的分区范围。
分区总数 ÷ 消耗者总数 = 每个消耗者分到的分区数(整数部门)。
剩余的分区会依次分配给前面的消耗者。
最终,每个消耗者得到一个连续的分区范围。
示例
假设有一个 Topic 包罗 8 个分区(P0-P7),消耗者组中有 3 个消耗者(C0, C1, C2)。使用 RangeAssignor 策略时,分区分配如下:
8 / 3 = 2 ,每个消耗者分到2个分区
按次序分配分区:
C0: P0, P1(基天职区)+ P2(剩余分区的第一个)
C1: P3, P4(基天职区)+ P5 (剩余分区的第二个)
C2: P6, P7(基天职区)
最终分配效果:
C0: P0, P1, P2
C1: P3, P4, P5
C2: P6, P7
RoundRobin策略
工作原理:
RoundRobinAssignor 使用轮询的方式将全部主题的分区均匀地分配给消耗者。
具体步骤如下:
将全部主题的全部分区按次序分列。
按照消耗者的字典次序排序。
使用轮询的方式依次为每个消耗者分配分区。
直到全部分区都被分配完毕。
示例:
假设有一个 Topic 包罗 8 个分区(P0-P7),消耗者组中有 3 个消耗者(C0, C1, C2)。使用 RoundRobinAssignor 策略时,分区分配如下:
将全部分区按次序分列:P0, P1, P2, P3, P4, P5, P6, P7。
按照消耗者的字典次序排序:C0, C1, C2。
使用轮询方式分配分区:
第一轮:C0 -> P0, C1 -> P1, C2 -> P2。
第二轮:C0 -> P3, C1 -> P4, C2 -> P5。
第三轮:C0 -> P6, C1 -> P7。
最终分配效果:
C0: P0, P3, P6
C1: P1, P4, P7
C2: P2, P5
总结:
策略优点缺点适用场景RangeAssignor实现简单,得当分区数量与消耗者数量靠近的场景可能导致负载不均分区数量与消耗者数量靠近RoundRobinAssignor分区分配更加均匀实现稍复杂分区数量较多或涉及多主题的场景 Kafka高可用

对分布式系统来说,当集群规模上升到一定程度后,一台或多台机器宕机的风险会增长,Kafka接纳多机备份和消息ACK应答机制,办理数据丢失问题,并通过一套失败规复机制办理服务不可用问题
消息备份机制

1. 根本原理

Kafka 每个主题(Topic)可以被划分为多个分区(Partition),而每个分区又可以有多个副本(Replica)。这些副天职布在不同的 Broker 上,其中一个副本作为 Leader 副本,别的的作为 Follower 副本。生产者总是向 Leader 副本写入数据,Follower 副本则从 Leader 异步拉取数据举行同步。
2. ISR(In-Sync Replicas)



[*]界说:ISR 是一组与 Leader 副本保持同步的副本集合。只有当一个副本能够跟上 Leader 的更新速度时,它才会被以为是 ISR 成员。
[*]作用:ISR 确保了纵然发生故障,也总有一个最新的副本可以快速接管成为新的 Leader,从而淘汰数据丢失的风险。
[*]动态调解:如果某个 Follower 落后太多(例如因为网络延迟或负载过高),它将被从 ISR 中移除;被移除的Follower会向Leader发送FetchRequest哀求,试图再次跟上Leader,重新进入ISR。
3. ACK(Acknowledgements)



[*]界说:ACK 指的是生产者发送消息后等候 Broker 返回确认的过程。Kafka 提供了三种 ACK 策略:

[*]acks=0:生产者不等候任何确认,消息可能丢失。
[*]acks=1:生产者只需 Leader 副本确认收到消息即可。
[*]acks=all 或 acks=-1:生产者需要全部同步副本(ISR)确认收到消息。

[*]影响:选择不同的 ACK 策略是在性能和可靠性之间做出权衡。acks=all 提供了最高的数据安全性,但也会增长延迟。
4. LEO(Log End Offset)



[*]界说:LEO 表示每个副本当前已写入的最大偏移量。换句话说,它是该副本日记文件的末了位置。
[*]作用:LEO 反映了副本上的最新消息位置。对于 Leader 和 Follower 副本来说,它们各自的 LEO 可能不同,特殊是在 Follower 正在追赶 Leader 的情况下。
5. HW(High Watermark)



[*]界说:HW 是消耗者可以看到的最大偏移量。它代表了全部 ISR 中最小的 LEO。换句话说,HW 标识了全部副本都已确认的消息界限。
[*]作用:HW 确保了纵然某些副本落后,消耗者也只能读取到已经被全部 ISR 成员确认的消息。这提供了一种保证,即消耗者不会看到未完全同步的消息,从而避免了数据不一致的问题。
实际场景示例

假设有一个 Topic 包罗三个副本(复制因子为 3),并且设置了 min.insync.replicas=2 和 acks=all。在这种情况下:

[*]生产者发送消息:生产者将消息发送给 Leader 副本,并等候全部 ISR 成员(至少两个)确认接收。
[*]Leader 更新 LEO:Leader 将消息写入其日记并更新自身的 LEO。
[*]Follower 同步数据:Follower 副本从 Leader 获取新消息并更新本身的 LEO。
[*]确定 HW:一旦至少有两个副本(包括 Leader)确认接收到消息,Leader 会更新 HW 到最新的安全偏移量。
[*]消耗者读取消息:消耗者只能读取到 HW 之前的消息,确保了数据的一致性和可靠性。
这种设计使得 Kafka 在保证高性能的同时,也能提供强大的容错本领和数据一致性保障。
ISR 确保了在 Leader 故障时能迅速选出新的 Leader 而不影响服务;
ACK 策略让用户可以根据需求选择合适的数据安全级别;
LEO 和 HW 的机制则进一步保障了数据的一致性和可靠性。
故障规复

Broker 故障处理

当一个或多个 Broker 发生故障时,Kafka 依靠其内置的副本(Replication)和推举机制来规复服务。
1. 检测 Broker 故障



[*]Kafka 使用 ZooKeeper 来监控集群中的全部 Broker。每个 Broker 在启动时都会在 ZooKeeper 中注册一个暂时节点。
[*]如果某个 Broker 宕机或与 ZooKeeper 断开连接超过一定时间(由 session.timeout.ms 控制),ZooKeeper 将删除该 Broker 的暂时节点,从而触发故障检测。
2. Leader 副本故障



[*]对于受影响的每个分区,如果 Leader 副本所在的 Broker 宕机,Kafka 控制器将从 ISR(In-Sync Replicas)中选择一个新的 Leader。
[*]新的 Leader 被选中后,控制器会更新元数据并将新的 Leader 信息传播给其他 Broker 和客户端。
[*]这个过程通常非常快,因为 ISR 中的副本已经与原 Leader 同步了最新的数据。
3. Follower 副本故障



[*]如果某个 Follower 副本所在 Broker 宕机,它将暂时无法同步数据,并被从 ISR 中移除。
[*]当该 Broker 规复正常后,它会尝试重新加入 ISR。首先,它需要从当前的 Leader 同步丢失的数据,直到赶上最新状态。
[*]一旦同步完成,该副本可以重新加入 ISR 并规复正常操纵。
4. 设置参数影响



[*]min.insync.replicas:决定了必须有多少个副本确认接收到消息才能被视为已提交。这影响了系统在部门副本失败时的容错本领。
[*]unclean.leader.election.enable:控制是否允许非 ISR 成员成为 Leader。默认情况下禁用此选项以避免潜在的数据丢失风险。
Controller 故障处理

Kafka 集群中的控制器(Controller)负责管理主题、分区、副本等资源的状态变化,如 Leader 推举、分区分配等使命。控制器本身也是一个 Broker,但它负担了额外的责任。
1. 自动推举新控制器



[*]Kafka 集群中只有一个活跃的控制器。当现有控制器发生故障时,Kafka 会在剩余的 Broker 中自动推举出一个新的控制器。
[*]全部 Broker 在启动时都会试图注册成为候选控制器,但最终只会有一个乐成。这个过程通过 ZooKeeper 实现,使用了 ZooKeeper 的原子性特性确保只有一个 Broker 成为控制器。
2. 接管职责



[*]新当选的控制器立刻接管原有控制器的全部职责,包括:

[*]管理分区的 Leader 推举。
[*]监控 Broker 的上线和下线情况。
[*]处理主题的创建、删除及设置变动等操纵。

3. 状态规复



[*]新控制器需要从 ZooKeeper 中读取集群的当前状态信息,以便精确地继续管理工作。这包括但不限于:

[*]当前的主题和分区信息。
[*]分区的 Leader 和 ISR 列表。
[*]其他重要的集群设置和状态数据。

4. 防止脑裂问题



[*]“脑裂”是指在网络分区的情况下,两个不同的控制器同时以为本身是主控制器的情况。为了避免这种情况,Kafka 使用 ZooKeeper 来保证只有一个控制器处于运动状态。只有当现有的控制器失去与 ZooKeeper 的联系时,才会允许其他 Broker 竞争成为新的控制器。
实际场景示例

假设在一个 Kafka 集群中有三个 Broker(B1, B2, B3),其中 B1 是当前的控制器。现在考虑以下两种场景:
场景 1:Broker 故障



[*]如果 B1 宕机,而 B1 上托管了一个 Topic 的 Leader 副本。
[*]控制器将从 ISR 中选出一个新的 Leader(可能是 B2 或 B3)。
[*]更新后的 Leader 信息会被传播给集群中的其他 Broker 和客户端。
[*]当 B1 规复后,它可以作为普通 Broker 重新加入集群,并开始从当前的 Leader 同步数据。
场景 2:Controller 故障



[*]如果 B1(控制器)宕机,ZooKeeper 会注意到这一点并触发新的控制器推举。
[*]假设 B2 乐成当选为新的控制器。
[*]B2 将接管全部的控制器职责,包括管理 Leader 推举、处理主题变动等。
[*]如果 B1 规复,它将不会自动重新成为控制器,除非当前控制器再次失效且 B1 再次赢得推举。
Kafka高性能

1. 批量发送消息

Kafka 通过批量发送消息的方式显着进步了吞吐量,淘汰了网络传输的开销。
(1)批量机制的工作原理



[*]Kafka 生产者不会逐条发送消息,而是将多条消息打包成一个批次(Batch),然后一次性发送到 Broker。
[*]批量发送的核心思想是使用缓冲区(Buffer)暂存消息,并在满意一定条件时触发发送。
(2)关键设置参数



[*]batch.size:控制每个批次的最大字节数。当累积的消息到达这个大小时,生产者会立刻发送该批次。
[*]linger.ms:指定生产者等候更多消息加入当前批次的时间。纵然批次未满,也会在 linger.ms 时间后发送消息。

[*]如果 linger.ms 设置为 0,则只要消息生成绩会立刻发送。
[*]如果设置为非零值(如 5ms),则允许生产者稍微延迟发送以积累更多的消息,从而进步吞吐量。

(3)性能上风



[*]淘汰网络开销:每次网络哀求都会带来一定的开销(如 TCP 连接建立、头部信息等)。批量发送可以显着淘汰网络哀求的次数,从而降低这些开销。
[*]压缩服从提升:Kafka 支持对消息举行压缩(如 GZIP、Snappy、LZ4 等)。由于压缩算法通常在更大的数据块上表现更好,因此批量发送可以进一步进步压缩服从。
2. 持久化消息

Kafka 使用次序写入磁盘和操纵系统缓存来实现高效的消息持久化,确保数据的安全性和高性能。
(1)次序写入磁盘



[*]Kafka 的日记文件是以追加的方式写入磁盘的。这种次序写入操纵比随机写入快得多,因为磁盘的寻址时间和旋转延迟在次序写入中被最小化。
[*]现代硬盘(尤其是 SSD)对次序写入有非常高的吞吐量支持,因此 Kafka 能够充分使用硬件性能。
(2)操纵系统页缓存(Page Cache)



[*]Kafka 并不直接将消息写入磁盘,而是依赖于操纵系统的页缓存(Page Cache)。生产者发送的消息首先被写入内存中的页缓存,随后由操纵系统异步地刷新到磁盘。
[*]优点:

[*]写入速度极快,因为写入的是内存而非直接写入磁盘。
[*]消耗者的读取操纵可以直接从页缓存中获取数据,而无需访问磁盘,进一步提升了读取性能。

(3)刷盘策略



[*]Kafka 提供了灵活的刷盘策略,用户可以通过以下参数控制数据的持久性:

[*]acks 参数:

[*]acks=0:生产者不等候任何确认,消息可能会丢失。
[*]acks=1:生产者只需 Leader 副本确认收到消息即可。
[*]acks=all 或 acks=-1:生产者需要全部同步副本(ISR)确认收到消息。

[*]flush.messages 和 flush.ms:

[*]控制 Kafka 将消息从页缓存刷到磁盘的频率。
[*]默认情况下,Kafka 不会频繁刷盘,而是依赖操纵系统的定时同步机制。


(4)性能上风



[*]高吞吐量:通过次序写入和页缓存,Kafka 实现了极高的写入吞吐量。
[*]低延迟:消耗者的读取操纵直接从页缓存中获取数据,避免了磁盘 I/O 开销。
[*]数据可靠性:通过灵活的刷盘策略,用户可以在性能和可靠性之间找到均衡。
3. 零拷贝(Zero-Copy)

零拷贝技能是 Kafka 实现高性能的关键之一,它通过淘汰数据在不同内存区域之间的复制次数来进步吞吐量。
(1)传统数据传输方式的问题

在传统的数据传输过程中,数据需要颠末多次拷贝:

[*]数据从磁盘读取到内核空间的缓冲区。
[*]数据从内核空间拷贝到用户空间的缓冲区。
[*]用户空间的应用程序处理数据后,再将其拷贝回内核空间的缓冲区。
[*]最后,数据从内核空间发送到网络接口。
这种多次拷贝的过程不仅增长了 CPU 的开销,还引入了额外的延迟。
(2)Kafka 的零拷贝实现

Kafka 使用了 Linux 的 sendfile 系统调用来实现零拷贝。以下是零拷贝的工作流程:

[*]数据从磁盘直接加载到内核空间的页缓存。
[*]使用 sendfile 系统调用,数据直接从内核空间的页缓存发送到网络接口,而无需颠末用户空间。
(3)性能上风



[*]淘汰 CPU 开销:零拷贝避免了用户空间和内核空间之间的数据拷贝,降低了 CPU 的使用率。
[*]降低延迟:数据传输路径更短,淘汰了上下文切换和内存拷贝的时间。
[*]进步吞吐量:通过淘汰不必要的操纵,Kafka 能够以更高的速度处理大规模数据。
总结

优化点实现方式性能上风批量发送消息将多条消息打包成一个批次发送,淘汰网络哀求次数淘汰网络开销,进步吞吐量,增强压缩服从持久化消息使用次序写入磁盘和操纵系统页缓存,结合灵活的刷盘策略高吞吐量、低延迟、可靠的数据存储零拷贝使用 Linux 的 sendfile 系统调用,避免数据在用户空间和内核空间之间的拷贝淘汰 CPU 开销,降低延迟,进步吞吐量
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 消息队列之-Kafka