1.作用
包管每个partition内的消息顺序传输
2.消息系统原理
a.数据单元
b.消息发送
点对点消息通报:
在点对点消息系统中,消息长期化到一个队列中。此时,将有一个或多个消耗者消耗队列中的数据。但是一条消息只能被消耗一次。
当一个消耗者消耗了队列中的某条数据之后,该条数据则从消息队列中删除。
该模式即使有多个消耗者同时消耗数据,也能包管数据处理惩罚的顺序。
发布订阅消息通报:
在发布-订阅消息系统中,消息被长期化到一个topic中。消息的生产者称为发布者,消耗者称为订阅者。
消耗者可以订阅一个或多个topic,消耗者可以消耗该topic中所有的数据,同一条数据可以被多个消耗者消耗,数据被消耗后不会立马删除。
Kafka 采取拉取模型(Poll),由本身控制消耗速度,消耗者可以按照恣意的偏移量进行消耗。
当发布者消息量很大时,显然单个订阅者的处理惩罚本领是不敷的。实际上实际场景中是多个订阅者节点构成一个订阅组负载均衡消耗 Topic 消息即分组订阅,这样订阅者很容易实现消耗本领线性扩展。
对比:
3.系统架构
管理模型
Brocker:
每个kafka server称为一个Broker,多个borker构成 Kafka Cluster。
一个broker可以维护多个topic.
Controller:
Controller 表示控制器,Kafka 节点中的主节点。集群中恣意一台 Broker 都能充当控制器的脚色,但是,在运行过程中,只能有一个 Broker 成为控制器,行使其管理和和谐的职责。
在分布式系统中,通常必要有一个和谐者,该和谐者会在分布式系统发生异常时发挥特殊的作用。在 Kafka 中该和谐者称之为控制器(Controller),实在该控制器并没有什么特殊之处,它本身也是一个平常的 Broker,只不外必要负责一些额外的工作:
Broker 管理(新增 Broker、Broker 主动关闭、Broker 故障);
Topic 管理(创建主题、删除主题);
Partition 管理(Leader 分区选举、增加分区、Rebalance 分区)。
值得注意的是:Kafka 集群中始终只有一个 Controller Broker。
2.8 版本以前通过 ZooKeeper 实现选主
2.8 版本以后新增了 KRax 实现选主(扬弃 ZooKeeper 独立工作)。
========================================
内部架构
Message:
Message 表示消息。通过 Kafka 集群进行通报的消息对象实体,存储必要传送的信息。Message 是实际发送和订阅的信息的实际载体,Producer 发送到 Kafka 集群中的每条消息,都被Kafka 包装成了一个个 Message 对象,之后再存储在磁盘中,而不是直接存储的。
Message 在磁盘中的物理结构如下所示:
此中 key 和 value 存储的是实际的 Message 内容,长度不固定,而其他都是对 Message 内容的统计和形貌,长度固定。因此在查找实际 Message 过程中,磁盘指针会根据 Message 的offset 和 message length 盘算移动位数,以加速 Message 的查找过程。之所以可以这样加速,由于 Kafka 的 .log 文件都是顺序写的,往磁盘上写数据时,就是追加数据,没有随机写的操纵。
===============================================================
脚色分配:
架构和谐者:
zookeeper
==================================================
4.数据存储
数据检索流程
step1:
先根据 offset 盘算这条 offset 是这个文件中的第几条;
step2:
读取.index索引文件,根据二分检索,从索引中找到离这条数据近来偏小的位置;【根据二分检索,在 .index 索引文件找到对应的近来偏小的位置】
step3:
读取.log 文件从近来位置读取到要查找的数据
这样将要查找的数据范围从1G缩小到4K
==========================================================
二分查找法
得知一个Offset之后:Offset-文件名=offset
timestamp:包含时间戳和offset
index:包含offset和Position
得到Position之后就可以直接读取log文件了
5.文件删除
Setp1:设置逾期时间
Kafka 中 默认的日志保存时间为 7 天 ,定时使命会查看每个分段的最大时间戳(盘算逻辑同上),若最大时间戳距当前时间超过7天,则必要删除。
删除日志分段时, 首先会先从跳跃表中移除待删除的日志分段,包管没有线程对这些日志分段进行读取操纵。然后将日志分段所对应的所有文件添加上".delete"的后缀。最后由专门的定时使命来删除以".delete"为后缀的文件。
可以通过调解如下参数修改保存时间。设置文件在kafka的config/server.properties文件中:
Setp2:选择清除策略
kafka有两种数据清除策略,delete删除和compact压缩,默认是删除。
delete删除:
log.cleanup.policy = delete
当不活跃的segment的时间戳是大于设置的时间的时候,当前segment就会被删除
compact压缩:
log.cleanup.policy = compact
对于相同key的差别value值,只保留最后一个版本。
应用场景:统计商城当前的销售总额
Step3:确认清除时机
Kafka的segment数据段清除不是实时的,他更像JVM垃圾回收那样,先打上deleted清除标志,在下一次清除的时候一起回收。
如果历史数据被删除,这时如果有消耗者的偏移量就是从0开始的,确实是会造成偏移量丢失,并且默认强制把偏移量移动到近来的一个起始位置开始消耗。
6.生产者
数据发送流程
消息累加器(RecordAccumulator)
kafka生产者架构实行步骤
- 主线程KafkaProducer创建消息对象ProducerRecord,然后通过生产者拦截器。
- 生产者拦截器对消息的key ,value做一定的处理惩罚,交给序列化器。
- 序列化器对消息key和vlue做序列化处理惩罚,然后给分区器。
- 分区器给消息分配分区、并发送给消息网络器。
- 将一条ProducerRecord添加到RecordAccumulator,首先会根据分区确定对应分区所在的双端队列,在双端队列获取尾部的一个ProducerBatch对象,查看该ProducerBatch是否可以写入该ProducerRecord消息,如果可以则写入,不能的话,会在双端队列末尾在创建一个ProducerBatch对象,创建时会评估这条消息是否是否超过batch.size参数的巨细。如果不超过,就以batch.size参数的巨细来创建ProducerBatch对象。通知sender线程发送消息。
- sender线程获取RecordAccumulator中的消息,必要将原本的<分区,Deque>情势再次封装成<Node,List>的情势,此中Node节点表示Kafka集群中的broker节点。对于网络毗连而言,生产者客户端是与具体broker节点建立毗连,也就是向具体的broker节点发送消息。而并不关心消息属于哪个分区;sender线程还会进一步封装成<Node,Request>的情势,这样就可以将请求发往各个Node了。这里的Request是指kafka的各种请求协议。
- sender线程发往Kafka之前还会保存到InFlightRequest中,InFlightRequest保存对象的具体情势为Map<NodeId,Deque>,他的重要作用是缓存了已经发出去但是还没有收到相应的请求。
- sender将Request交给Selector准备发送。
- Selector将Request发送到对应的kafka节点(Broker)。
- Selector相应结果反馈给InFlightRequest。
- 主线程清算RecordAccumulator已经发送完毕的消息。
数据分区发送的好处:
分区策略
集群架构界说
AR,ISR,OSR,LEO,HW,CheckPoint
CheckPoint:将Kafka Broker端告急的日志元数据保存下来。
recovery-point-offset-checkpoint:表示已经刷写到磁盘的offset信息,对应LEO信息。
replication-offset-checkpoint:用来存储每个replica的HW,表示已经被commited的offset信息。
log-start-offset-checkpoint:对应logStartOw set,用来标识日志的起始偏移量。
cleaner-offset-checkpoint:最后清算存了每个log的offset.
副本故障处理惩罚
ISR为空问题:
Unclean Leader Election:Kafka 必要重新在OSR即将规复的follwer中选举一个新的Leader。
开启 Unclean 向导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本不停存在,不至于停止对外提供服务,因此提拔了高可用性。反之,禁止 Unclean 向导者选举的好处在于维护了数据的一致性,制止了消息丢失,但牺牲了高可用性。
数据发送包管
a.消息确认机制(生产者)
只有一条=至少一条+幂等性+事件
b.消息发送丢失:
可重试规复错误:找不到 Leader;找不到目的分区。这种环境通常重试一下就能发送乐成。
不可重试规复错误:消息体过大、缓冲区满了。这种环境即使你再重试也是会失败的,由于消息体过大除非你淘汰消息量,或者接纳压缩算法,重试是没用的。
c.消息发送重复:使用Kafka幂等性来解决
Kafka 引入了 Producer ID(即 PID) 和 Sequence Number。
ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。
缺点:不能跨分区
d.事件机制
e.数据顺序包管
原理:max.in.flight.requests.per.connection:生产者在收到服务器响应之前可以发送多少个请求(消息),默认为 5。它的值越高,占用的内存越多,不外会提拔吞吐量,把它设为 1 可以包管消息是按照发送的顺序写入服务器的(即使发生了重试)。如果设置为比 1 大的整数,发生错误时,可能会造成数据的发送顺序改变。
eg:可以把 retries 设置为 0(失败不重试),把 max.in.flight.requests.per.connection 设置为 1。
f.数据序列化(代码实现)
Kafka 生产者将对象序列化成字节数组并发送到服务器,消耗者必要将字节数组转换成对象(反序
列化)。序列化与反序列化必要匹配,推荐使用 Avro 序列化方式。
7.消耗者
消耗者与消耗者组
分区分配策略
1)RangeAssignor(默认)2) RoundRobinAssignor3)StickyAssignor(0.11.x版本开始引入)4)CooperativeStickyAssignor(3.0 以及以后版本)
将分区的所有权从一个消耗者移到另一个消耗者称为重新平衡(rebalance),分区分配的时机
同一个 Consumer Group 内新增消耗者
消耗者离开当前所属的Consumer Group,包括shuts down 或 crashes
订阅的主题新增分区
1)RangeAssignor
缺点:可以明显的看到这样的分配并不均匀,如果将雷同的情况扩大,有可能会出现部分消耗者过载的环境。
2)RoundRobinAssignor
将消耗组内所有消耗者以及消耗者所订阅的所有topic的partition按照字典序排序,然后通过轮询消耗者方式逐个将分区分配给每个消耗者。
消耗者订阅相同数量的Topic:
消耗者订阅差别数量的Topic:
如果同一个消耗组内的消耗者所订阅的Topic 是不相同的,那么在实行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。
如果某个消耗者没有订阅消耗组内的某个topic,那么在分配分区的时候此消耗者将分配不到这个topic的任何分区。
3)StickyAssignor
它重要有两个目的:
① 分区的分配要尽可能的均匀;
② 分区的分配尽可能的与前次分配的保持相同。
③ 当两者发生辩论时,第一个目的优先于第二个目的。
消耗者订阅差别数量的Topic:
4)CooperativeStickyAssignor
互助者粘性分配器除了保持 Sticky 的逻辑之外,还答应互助者再平衡。
数据积压
1)增加分区数和消耗者数:如果是 Kafka 消耗本领不敷,则可以考虑增加 Topic 的分区数,并且同时提拔消耗的消耗者数量,推荐消耗者数=分区数。(两者缺一不可)
2)进步拉取最大消息条数:批次拉取数据过少(拉取数据 / 处理惩罚时间 < 生产速度),使处理惩罚的数据小于生产的数据,也会造成数据积压。所以如果是下游的数据处理惩罚不实时,进步每批次拉取的数量。
8.Kafka Offset
生产端
Kafka接收到生产者发送的消息实际上是以日志文件的情势保存在对应分区的磁盘上。每条消息都有一个offset值来表示它在分区中的位置。每次写入都是追加到文件的末尾.
消耗端
a.主动提交 Offset
b.手动提交Offset
d.组合提交Offset
一样平常来说异步和同步是可以组合起来使用的,如果在运行期间,没有问题产生的时候使用异步提交,进步吞吐,在直接关闭消耗者的时候,使用同步提交,包管偏移量的提交乐成。
e.外部管理Offset
重置Offset
更新Offset由三个维度决定:Topic的作用域、重置策略、实行方案。
9.高效读写
1)文件结构
(分片、稀疏索引、二分查找)
2)顺序读写磁盘
3)Zero Copy
零 Copy 并不是真的不进行数据 Copy,实在还是必要进行两次数据 Copy 的,只不外这两次都是通过 DMA(直接存储器访问) 进行 Copy的,无需 CPU 到场,所以叫做零 Copy。
4)OS/Page Cache(页缓存的内存使用量) + sendfile(将读到内核空间的数据,转到 Socket Buffer)
Page Cache(OS Cache):页缓存的内存使用量。以 Page 为单位,缓存文件内容,缓存在 Page Cache 中的文件数据,能够更快的被用户读取。同时对于带 Buffer 的写入操纵,数据在写入到 Page Cache 中即可立即返回,而不需期待数据被实际长期化到磁盘,进而进步了上层应用读写文件的整体性能。页是逻辑上的概念,因此 Page Cache 是与文件系统同级的。
Buffer Cache:磁盘等块设备的缓存的内存使用量。磁盘等块设备的缓存,内存的这一部分是要写入到磁盘里的。块是物理上的概念,因此 Buffer Cache 是与块设备驱动步伐同级的。
buff/cache:Buffer Cache 和 Page Cache 总共的内存使用量
========================================================
说到 sendfile 就不得不提 mmap,mmap 和 sendfile 都是零 Copy 的实现方案
mmap将磁盘文件映射到内存,用户通过修改内存就能修改磁盘文件。
sendfile 是将读到内核空间的数据,转到 Socket Buffer,进行网络发送,相比力上面的 mmap 又节省了一次 CPU copy。
5)网络模型(基于 Java NIO)
Kafka 本身实现了网络模型做 RPC。底层基于 Java NIO(同步非阻塞IO),接纳和 Netty 一样的 Reactor 线程模型。
Reactor:一种事件驱动的网络编程模式。
6)拉模型消耗方式(pull)
使用拉模型,Kafka 此类的拉模型将这一块功能都交由 Consumer 主动维护,反而让服务器淘汰了更多的不须要的开支,因此从同等资源的角度来讲,Kafka 具备链接的 Producer 和 Consumer 将会更多,极大的降低了消息堵塞的环境,因此看起来更快了。
7)消息批量处理惩罚
消息批量处理惩罚并不是只有发送的时候是批量,而是从生产者、Broker 到消耗者不停都是这样,这个批次的消息看起来就好像是一条消息一样,生产者在生产端将消息聚合,Broker 什么都不做,按“一个”消息处理惩罚,最后由消耗者拿到消息后解开一条条消耗。消耗者拿到这个团体消息后,使用 Offset 机制实现从差别位置消耗消息。
8)消息压缩
生产者将批量消息进行压缩,消耗者获取后解压缩,淘汰网络 IO 传输.
批量处理惩罚和压缩的目的是为了使用相称小的 CPU 开销,调换高效的网路传输,解决网络带宽的瓶颈。
9)分区并行和可扩展
10.消息积压
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |