ToB企服应用市场:ToB评测及商务社交产业平台
标题:
Kafka
[打印本页]
作者:
忿忿的泥巴坨
时间:
2024-10-31 11:15
标题:
Kafka
一、主要作用
异步处理:提升响应速度
系统解耦:低落系统各模块之间的耦合
流量削峰:高并发下 作为缓冲,减轻系统压力
数据分发:广播到多个消耗者, 实现数据的分发和共享。
二、特点
分布式:集群方式、多副本,高可靠性 高扩展性。
高吞吐,低延长,百万级tps。
支持及时的流式处理。
持久化日志:可重复读取 和 无限期保留。
三、Broker
定义
:Kafka集群中的
服务器节点
,负责存储和转发消息。
功能
:
存储消息
:Broker负责存储Topic中的消息,并包管消息的有序性和可靠性。
转发消息
:Broker负责将生产者发送的消息转发给消耗者,并包管消息的可靠性和次序性。
管理元数据
:Broker维护Topic和Partition的元数据信息,供生产者和消耗者利用。
提供API接口
:Broker提供了API接口,供生产者和消耗者利用。
四、Topic
定义
:消息的逻辑分组,是Kafka消息系统中的根本单元,用于将消息进行逻辑上的分组和归类。
作用
:通过创建差别的Topic,可以根据需求将消息进行分类、分割和分发,以满意差别业务场景下的消息传递需求。
特点
:每个Topic可以有一个或多个分区(Partition),每个分区可以保存差别时间段的消息序列。
五、Partition
定义
:分区,是Topic的物理分割,是Kafka中存储消息的最小单位。每个分区都有一个对应的日志文件,用于次序写入和追加消息。
作用
:
提高吞吐量
:通过分区,Kafka可以分散数据和负载,支持并行处理,从而提高团体吞吐量。
有序存储
:在同一个分区内,消息是按照写入的次序存储的,消耗者可以包管读取到的消息是有序的。
负载均衡
:通过合理分配分区,可以实现负载均衡,避免单点瓶颈。
特点
:
不可变性
:一旦消息写入分区,它就是不可变的,不能被修改或删除,直到超过保留时间或被压缩。
分段存储
:每个分区在底层是一个日志文件,由多个段(Segment)组成,每个段是一个物理文件,存储一批有序的消息。
索引文件
:每个段都有一个对应的索引文件,用于快速查找消息的偏移量(Offset)。
六、副本(Replication)
定义
:Partition的副本机制,也可以称之为备份机制,通过多副本包管数据的高可用性和容错性。Kafka将每个分区的消息复制到多个副本中,这些副本分布在差别的Broker上。如许纵然某个Broker出现故障,其他Broker上的副本仍然可以保留和规复消息。
角色
:
Leader
:每个分区有一个主副本(Leader),负责处理所有的读写哀求。
Follower
:其余副本是从副本(Follower),负责从主副本复制数据。称之为ISR副本聚集。当Leader副本出现故障时,可以从ISR中推选出一个新的Leader副本。
Unclean领导者推选
:当ISR聚集为空时,Kafka需要重新推选一个新的领导者副本。此时,Kafka可以选择从非同步副本中推选领导者副本,这称为Unclean领导者推选。开启Unclean领导者推选可能会造成数据丢失,但提升了系统的高可用性;而禁止Unclean领导者推选则维护了数据的同等性。
作用
:
数据冗余
:提供数据冗余,纵然系统部门组件失效,系统依然可以或许继续运转,增加了团体可用性以及数据持久性。
故障规复
:当主副本不可用时,从副本可以主动提升为主副本,确保系统的一连性。
同步原理
:Kafka接纳基于领导者(Leader)的副本机制。从副本(Follower)不对外提供服务,只是定期地异步拉取 领导者副本(Leader)中的数据,从而实现与领导者副本的同步。当领导者副本挂掉时,Kafka会依托于ZooKeeper提供的监控功能及时感知到,并立即开启新一轮的领导者推选,从跟随者副本中选一个作为新的领导者。
七、ISR机制
指分区中的一组与Leader副本保持同步的副本聚集; 这些副本包罗了Leader副本中的所有已确认消息。当Leader副本出现故障时,可以从ISR中推选出一个新的Leader副本。只有ISR中的副本才华参与消息的同步复制和数据的读写操作,从而确保数据的同等性和可靠性。
八、Controller机制
Kafka利用Controller机制来管理Broker、Partition、Leader等元数据信息。Controller是一个独立的组件,负责
监控和管理整个集群的状态和配置
。当出现非常环境时,Controller可以主动进行
故障转移和数据规复
操作,确保集群的稳定性和高可用性。
快速故障规复的机制。当某个Broker出现故障时,可以通过ISR机制和Controller机制快速进行主副本的推选和数据的规复,从而确保集群的高可用性和数据的可靠性。
九、ACK应答机制
是生产者发送数据后Kafka的接收确认方式。
在生产者发送消息到Kafka集群时,可以设置差别的ACK级别来确保消息在发送过程中的可靠性和持久性。
Kafka的ACK机制有三种级别,由acks参数的值控制:
acks=0:
无需等待确认,可靠性最低
acks=1:
默认值。生产者发送消息后会等待Broker的领导者(Leader)确认, 会确认消息已经被领导者接收写入,但不肯定被复制到所有的副本。
acks=-1(all):
等待主题分区中所有ISR 副本都成功写入消息后才返回确认。acks=all时,Kafka只有在消息被写入ISR中的所有副本后才会返回确认,这确保了消息纵然在Leader故障的环境下也不会丢失,因为ISR中的其他副本可以推选为新的Leader。
日志持久化
:
Kafka将消息持久化到磁盘上,纵然Broker宕机或重启,消息也不会丢失。Kafka利用次序写入和分段存储的方式来提高性能,同时包管数据的可靠性。每个主题都有一个或多个分区,每个分区都有一个对应的日志文件,用于次序写入和追加消息。
持久性包管
:
Kafka提供了多种消息传递语义,包罗至少一次传递和最多一次传递。这些语义可以确保消息在发送和接收之间的可靠传递,并包管消息不会丢失或重复。
十、Kafka中Zookeeper的作用
推选leader
:
在Kafka集群中,每个分区都会有一个leader来负责处理读写哀求。
当leader节点出现故障时,Zookeeper会利用推选算法协助进行新的leader推选,确保集群可以或许继续正常运行,并处理来自生产者和消耗者的哀求。
管理消耗者组的偏移量
:
消耗者组在消耗消息时,需要记录当前已经消耗的消息偏移量。
Zookeeper用来保存这些偏移量信息,以便在消耗者重启或发生故障时,可以或许继续从上次消耗的位置开始,从而包管消息消耗的一连性和准确性。
发现和注册服务
:
Kafka的broker节点和消耗者在启动时会向Zookeeper注册自己的信息,包罗IP地点、端口号和可用的分区等。
其他节点可以通过查询Zookeeper来发现可用的broker节点和分区,从而方便地进行消息的生产和消耗。
维护集群的元数据
:
Zookeeper保存了Kafka集群的元数据信息,这些信息包罗但不限于broker的列表、topic的分区信息、以及消耗者组的偏移量等。
Kafka通过与Zookeeper的交互来获取这些元数据,以确保集群内部各组件之间的和谐同等。
监控集群的康健状态
:
Zookeeper负责监控Kafka集群中各个节点的状态,包罗节点的上下线环境、存活状态以及partition的副本分配环境等。
通过提供心跳检测和故障检测功能,Zookeeper可以或许及时发现并关照集群中可能存在的问题,从而保障集群的稳定运行。
十一、如何包管消息有序?
1、生产者如何有序
(1)序列号:为每条消息分配一个唯一的序列号,标志数据的次序。消息排序,重新发送。消耗者 按序列号来消耗消息。
(2)指定分区发送。
2、存储有序
一个分区内有序,多个分区无法包管。获取数据后会将数据次序写入日志文件。
3、消耗着如何有序
(1)一个消耗者 单线程单实例 消耗。同一个Topic、同一个分区,一个Partition分区只会被一个消耗者组中的一个消耗者消耗,从而包管次序性。
(2)按序号消耗。
生产者如何将消息 发布到指定分区
?
kafka默认分区策略:轮询 选出一个partition分区
(1)自定义分区策略:实现Partitioner接口,重写partition( )方法
(2)每条消息设置同一个key:盘算key的hash值路由到指定分区,同一个key的消息都在同一个分区
消耗者如何消耗同一个分区消息?
(1)消耗者消耗方法上利用 @KafkaListener的topicPartitions属性 和 @TopicPartition @PartitionOffsets 注解指定topic、partition、offset。
十二、如何包管消息不丢失?
生产者(Producer):通过同步阻塞的方式的发送,设置acks=all,等待所有ISR 副本都成功写入消息后才返回确认;超时或者失败 会重试
Broker:支持同步刷盘(持久化到硬盘中)、异步刷盘的策略,支持 1主N从的策略。持久化的 offset (偏移量),消息 支持持久化到Commitlog里面。
消耗者(Consumer):主动提交偏移量,改为手动提交,确保业务逻辑真正实行成功 再提交偏移量。
主动提交
: 不是立即提交,默认每5秒 主动提交消耗偏移量。可能导致在处理消息时发生错误时无法回滚偏移量,造成消息的重复消耗或丢失。
手动提交
: 需要显式地调用提交方法来提交消耗偏移量。 可在 成功处理完消息后 提交偏移量, 确保消息至少被处理一次。
同步提交(commitSync)
: 消耗者在提交偏移量时会阻塞等待Kafka的确认。会导致消耗者在提交偏移量时发生阻塞,可能会影响消耗者的吞吐量。
异步提交(commitAsync)
: 不会阻塞等待Kafka的确认,会立即返回,消耗者可以继续处理其他使命。 如果提交失败,可能不会立即被发现,可能导致消息的重复消耗或丢失。但 Kafka允许为异步提交 提供回调函数(OffsetCommitCallback),以便在提交成功或失败时实行相应的操作。
通讯流程
:拉消息,拉取消息的过程通常是通过一个循环来实现的,在循环中,消耗者不停调用poll方法来拉取新的消息记录 。poll方法的参数指定了拉取消息的超时时间 。 拉取到的消息记录包罗多个消息,每个消息都包罗了消息的元数据(如主题、分区、偏移量等)和现实的消息内容。
指定消息在分区中的偏移量offset,消耗特定分区的消息,并且可向后回滚重复消耗。
十三、消耗端收到两条一样的消息,应该怎样处理?
(1)幂等性:消耗端 处理消息 保持幂等性,不管来多少条重复消息,最后处理的结果都一样
(2)记录消息ID和消耗状态:包管每条消息都有唯一ID编号, 消耗者端 维护张日志表 记录已经处理成功的消息的ID
十四、消息堆积怎么办?
(1)生产者:限流,设置合理的发送速率、利用滑动窗口控制速率
(2)消息队列:增加队列的
分区数
;设置消息逾期时间。 可以将堆积的
消息进行持久化
存储,以便后续处理。
(3)消耗端:服务器扩容;建Consumer消耗者集群,
增加消耗者数量,批量消耗 调整批量巨细和拉取间隔,异步消耗、异步提交
;优化消耗者性能(优化数据库查询、淘汰网络调用次数、利用多线程或异步处理)。
十五、消耗失败怎么办?
(1)
重试
:设置合理的重试次数和重试间隔。
(2)
记录
:将消耗失败的消息
存储到数据库
。消耗者可以
记录错误信息
并跳过有问题的消息。
(3)死信队列
:对于无法处理的消息,应将其移动到死信队列Topic中,并定期分析和处理这些消息。这有助于避免消息堆积对正常业务的影响。
十六、Kafka性能优化
通过压缩和批量发送优化。
Kafka支持Gzip、Snappy、LZ4、zstd等数据压缩算法。
zstd有最高的压缩比(zstd > LZ4 > gzip > Snappy),LZ4服从最高(LZ4 > Snappy > zstd > gzip)
十七、Kafka常见配置参数
kafka:
# Broker的全局设置
global:
broker.id: 0 # Kafka broker的唯一标识符,每个broker的id都应该是唯一的
log.dirs: /var/lib/kafka/data # Kafka存储消息的目录
zookeeper.connect: localhost:2181 # Kafka连接到Zookeeper的地址
# 网络设置
network:
listeners: PLAINTEXT://:9092 # Kafka监听的地址和端口,用于客户端连接
advertised.listeners: PLAINTEXT://your.host.name:9092 # 客户端连接broker的地址,通常是在集群外部访问的地址
num.network.threads: 3 # 用于处理网络请求的线程数
num.io.threads: 8 # 用于处理磁盘IO的线程数
socket.send.buffer.bytes: 102400 # Socket发送缓冲区大小
socket.receive.buffer.bytes: 102400 # Socket接收缓冲区大小
socket.request.max.bytes: 104857600 # 请求的最大字节数
# 日志设置
log:
log.retention.hours: 168 # 日志保留的小时数
log.segment.bytes: 1073741824 # 每个日志段的大小(以字节为单位)
log.retention.check.interval.ms: 300000 # 检查日志是否过期的间隔时间(以毫秒为单位)
log.cleaner.enable: true # 是否启用日志清理器(用于删除过期的日志)
# 复制设置
replication:
default.replication.factor: 1 # 默认的消息复制因子
min.insync.replicas: 1 # 在提交消息之前,必须有多少副本是同步的
replica.lag.time.max.ms: 10000 # 副本落后时间的最大阈值(以毫秒为单位)
replica.socket.timeout.ms: 30000 # 副本之间连接的超时时间(以毫秒为单位)
replica.fetch.max.bytes: 1048576 # 副本从leader拉取数据的最大字节数
# 消费者和生产者设置(这些通常在客户端配置,但可以在broker上设置默认值)
consumer_producer:
message.max.bytes: 1000000 # 消息的最大字节数(包括键和值)
replica.fetch.min.bytes: 1 # 副本从leader拉取的最小字节数
request.timeout.ms: 30000 # 请求的超时时间(以毫秒为单位)
group.min.session.timeout.ms: 6000 # 消费者组的最小会话超时时间(以毫秒为单位)
auto.commit.interval.ms: 5000 # 自动提交偏移量的间隔时间(以毫秒为单位)
# 其他设置
other:
num.partitions: 1 # 默认的主题分区数
controlled.shutdown.enable: true # 是否启用受控关闭
delete.topic.enable: true # 是否允许删除主题
复制代码
# 消费者配置
fetch.min.bytes=1 消费者拉取数据的最小批次大小
fetch.max.bytes=5242880 # 5MB 消费者每次拉取数据的最大量
max.poll.records=100 消费者每次拉取的最大记录数(即消息数)
fetch.max.wait.ms=500 消费者在拉取数据时,如果数据不足fetch.min.bytes所指定的量,则消费者会等待的最长时间(以毫秒为单位)。
min.insync.replicas 在提交消息之前,必须有多少副本是同步的。
replica.*参数 与副本相关的配置,如副本落后时间的最大阈值、副本之间连接的超时时间等。
message.max.bytes 消息的最大字节数。
request.timeout.ms 请求的超时时间。
group.min.session.timeout.ms 消费者组的最小会话超时时间。
auto.commit.interval.ms 自动提交偏移量的间隔时间。
num.partitions 默认的主题分区数。
controlled.shutdown.enable 是否启用受控关闭。
delete.topic.enable 是否允许删除主题。
auto.leader.rebalance.enable 是否启用自动领导者选举平衡。
leader.imbalance.check.interval.seconds和leader.imbalance.per.broker.percentage 领导者不平衡检查和阈值设置。
# 性能和调优设置
queued.max.requests=500 等待处理的请求队列的最大大小。
compression.type=producer 生产者消息的压缩类型。
metric.reporters=[] 用于报告指标的类列表。
num.replica.fetchers=1 每个broker用于从其他broker拉取数据的线程数。
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4