一、主要作用
- 异步处理:提升响应速度
- 系统解耦:低落系统各模块之间的耦合
- 流量削峰:高并发下 作为缓冲,减轻系统压力
- 数据分发:广播到多个消耗者, 实现数据的分发和共享。
二、特点
- 分布式:集群方式、多副本,高可靠性 高扩展性。
- 高吞吐,低延长,百万级tps。
- 支持及时的流式处理。
- 持久化日志:可重复读取 和 无限期保留。
三、Broker
- 定义:Kafka集群中的服务器节点,负责存储和转发消息。
- 功能:
- 存储消息:Broker负责存储Topic中的消息,并包管消息的有序性和可靠性。
- 转发消息:Broker负责将生产者发送的消息转发给消耗者,并包管消息的可靠性和次序性。
- 管理元数据:Broker维护Topic和Partition的元数据信息,供生产者和消耗者利用。
- 提供API接口:Broker提供了API接口,供生产者和消耗者利用。
四、Topic
- 定义:消息的逻辑分组,是Kafka消息系统中的根本单元,用于将消息进行逻辑上的分组和归类。
- 作用:通过创建差别的Topic,可以根据需求将消息进行分类、分割和分发,以满意差别业务场景下的消息传递需求。
- 特点:每个Topic可以有一个或多个分区(Partition),每个分区可以保存差别时间段的消息序列。
五、Partition
- 定义:分区,是Topic的物理分割,是Kafka中存储消息的最小单位。每个分区都有一个对应的日志文件,用于次序写入和追加消息。
- 作用:
- 提高吞吐量:通过分区,Kafka可以分散数据和负载,支持并行处理,从而提高团体吞吐量。
- 有序存储:在同一个分区内,消息是按照写入的次序存储的,消耗者可以包管读取到的消息是有序的。
- 负载均衡:通过合理分配分区,可以实现负载均衡,避免单点瓶颈。
- 特点:
- 不可变性:一旦消息写入分区,它就是不可变的,不能被修改或删除,直到超过保留时间或被压缩。
- 分段存储:每个分区在底层是一个日志文件,由多个段(Segment)组成,每个段是一个物理文件,存储一批有序的消息。
- 索引文件:每个段都有一个对应的索引文件,用于快速查找消息的偏移量(Offset)。
六、副本(Replication)
- 定义:Partition的副本机制,也可以称之为备份机制,通过多副本包管数据的高可用性和容错性。Kafka将每个分区的消息复制到多个副本中,这些副本分布在差别的Broker上。如许纵然某个Broker出现故障,其他Broker上的副本仍然可以保留和规复消息。
- 角色:
- Leader:每个分区有一个主副本(Leader),负责处理所有的读写哀求。
- Follower:其余副本是从副本(Follower),负责从主副本复制数据。称之为ISR副本聚集。当Leader副本出现故障时,可以从ISR中推选出一个新的Leader副本。
- Unclean领导者推选:当ISR聚集为空时,Kafka需要重新推选一个新的领导者副本。此时,Kafka可以选择从非同步副本中推选领导者副本,这称为Unclean领导者推选。开启Unclean领导者推选可能会造成数据丢失,但提升了系统的高可用性;而禁止Unclean领导者推选则维护了数据的同等性。
- 作用:
- 数据冗余:提供数据冗余,纵然系统部门组件失效,系统依然可以或许继续运转,增加了团体可用性以及数据持久性。
- 故障规复:当主副本不可用时,从副本可以主动提升为主副本,确保系统的一连性。
- 同步原理:Kafka接纳基于领导者(Leader)的副本机制。从副本(Follower)不对外提供服务,只是定期地异步拉取 领导者副本(Leader)中的数据,从而实现与领导者副本的同步。当领导者副本挂掉时,Kafka会依托于ZooKeeper提供的监控功能及时感知到,并立即开启新一轮的领导者推选,从跟随者副本中选一个作为新的领导者。
七、ISR机制
指分区中的一组与Leader副本保持同步的副本聚集; 这些副本包罗了Leader副本中的所有已确认消息。当Leader副本出现故障时,可以从ISR中推选出一个新的Leader副本。只有ISR中的副本才华参与消息的同步复制和数据的读写操作,从而确保数据的同等性和可靠性。
八、Controller机制
Kafka利用Controller机制来管理Broker、Partition、Leader等元数据信息。Controller是一个独立的组件,负责监控和管理整个集群的状态和配置。当出现非常环境时,Controller可以主动进行故障转移和数据规复操作,确保集群的稳定性和高可用性。
快速故障规复的机制。当某个Broker出现故障时,可以通过ISR机制和Controller机制快速进行主副本的推选和数据的规复,从而确保集群的高可用性和数据的可靠性。
九、ACK应答机制
是生产者发送数据后Kafka的接收确认方式。
在生产者发送消息到Kafka集群时,可以设置差别的ACK级别来确保消息在发送过程中的可靠性和持久性。
Kafka的ACK机制有三种级别,由acks参数的值控制:
- acks=0:无需等待确认,可靠性最低
- acks=1:默认值。生产者发送消息后会等待Broker的领导者(Leader)确认, 会确认消息已经被领导者接收写入,但不肯定被复制到所有的副本。
- acks=-1(all):等待主题分区中所有ISR 副本都成功写入消息后才返回确认。acks=all时,Kafka只有在消息被写入ISR中的所有副本后才会返回确认,这确保了消息纵然在Leader故障的环境下也不会丢失,因为ISR中的其他副本可以推选为新的Leader。
日志持久化:
- Kafka将消息持久化到磁盘上,纵然Broker宕机或重启,消息也不会丢失。Kafka利用次序写入和分段存储的方式来提高性能,同时包管数据的可靠性。每个主题都有一个或多个分区,每个分区都有一个对应的日志文件,用于次序写入和追加消息。
持久性包管:
- Kafka提供了多种消息传递语义,包罗至少一次传递和最多一次传递。这些语义可以确保消息在发送和接收之间的可靠传递,并包管消息不会丢失或重复。
十、Kafka中Zookeeper的作用
- 推选leader:
- 在Kafka集群中,每个分区都会有一个leader来负责处理读写哀求。
- 当leader节点出现故障时,Zookeeper会利用推选算法协助进行新的leader推选,确保集群可以或许继续正常运行,并处理来自生产者和消耗者的哀求。
- 管理消耗者组的偏移量:
- 消耗者组在消耗消息时,需要记录当前已经消耗的消息偏移量。
- Zookeeper用来保存这些偏移量信息,以便在消耗者重启或发生故障时,可以或许继续从上次消耗的位置开始,从而包管消息消耗的一连性和准确性。
- 发现和注册服务:
- Kafka的broker节点和消耗者在启动时会向Zookeeper注册自己的信息,包罗IP地点、端口号和可用的分区等。
- 其他节点可以通过查询Zookeeper来发现可用的broker节点和分区,从而方便地进行消息的生产和消耗。
- 维护集群的元数据:
- Zookeeper保存了Kafka集群的元数据信息,这些信息包罗但不限于broker的列表、topic的分区信息、以及消耗者组的偏移量等。
- Kafka通过与Zookeeper的交互来获取这些元数据,以确保集群内部各组件之间的和谐同等。
- 监控集群的康健状态:
- Zookeeper负责监控Kafka集群中各个节点的状态,包罗节点的上下线环境、存活状态以及partition的副本分配环境等。
- 通过提供心跳检测和故障检测功能,Zookeeper可以或许及时发现并关照集群中可能存在的问题,从而保障集群的稳定运行。
十一、如何包管消息有序?
1、生产者如何有序
(1)序列号:为每条消息分配一个唯一的序列号,标志数据的次序。消息排序,重新发送。消耗者 按序列号来消耗消息。
(2)指定分区发送。
2、存储有序
一个分区内有序,多个分区无法包管。获取数据后会将数据次序写入日志文件。
3、消耗着如何有序
(1)一个消耗者 单线程单实例 消耗。同一个Topic、同一个分区,一个Partition分区只会被一个消耗者组中的一个消耗者消耗,从而包管次序性。
(2)按序号消耗。
生产者如何将消息 发布到指定分区?
kafka默认分区策略:轮询 选出一个partition分区
(1)自定义分区策略:实现Partitioner接口,重写partition( )方法
(2)每条消息设置同一个key:盘算key的hash值路由到指定分区,同一个key的消息都在同一个分区
消耗者如何消耗同一个分区消息?
(1)消耗者消耗方法上利用 @KafkaListener的topicPartitions属性 和 @TopicPartition @PartitionOffsets 注解指定topic、partition、offset。
十二、如何包管消息不丢失?
- 生产者(Producer):通过同步阻塞的方式的发送,设置acks=all,等待所有ISR 副本都成功写入消息后才返回确认;超时或者失败 会重试
- Broker:支持同步刷盘(持久化到硬盘中)、异步刷盘的策略,支持 1主N从的策略。持久化的 offset (偏移量),消息 支持持久化到Commitlog里面。
- 消耗者(Consumer):主动提交偏移量,改为手动提交,确保业务逻辑真正实行成功 再提交偏移量。
- 主动提交: 不是立即提交,默认每5秒 主动提交消耗偏移量。可能导致在处理消息时发生错误时无法回滚偏移量,造成消息的重复消耗或丢失。
- 手动提交: 需要显式地调用提交方法来提交消耗偏移量。 可在 成功处理完消息后 提交偏移量, 确保消息至少被处理一次。
- 同步提交(commitSync): 消耗者在提交偏移量时会阻塞等待Kafka的确认。会导致消耗者在提交偏移量时发生阻塞,可能会影响消耗者的吞吐量。
- 异步提交(commitAsync): 不会阻塞等待Kafka的确认,会立即返回,消耗者可以继续处理其他使命。 如果提交失败,可能不会立即被发现,可能导致消息的重复消耗或丢失。但 Kafka允许为异步提交 提供回调函数(OffsetCommitCallback),以便在提交成功或失败时实行相应的操作。
通讯流程:拉消息,拉取消息的过程通常是通过一个循环来实现的,在循环中,消耗者不停调用poll方法来拉取新的消息记录 。poll方法的参数指定了拉取消息的超时时间 。 拉取到的消息记录包罗多个消息,每个消息都包罗了消息的元数据(如主题、分区、偏移量等)和现实的消息内容。
指定消息在分区中的偏移量offset,消耗特定分区的消息,并且可向后回滚重复消耗。
十三、消耗端收到两条一样的消息,应该怎样处理?
(1)幂等性:消耗端 处理消息 保持幂等性,不管来多少条重复消息,最后处理的结果都一样
(2)记录消息ID和消耗状态:包管每条消息都有唯一ID编号, 消耗者端 维护张日志表 记录已经处理成功的消息的ID
十四、消息堆积怎么办?
(1)生产者:限流,设置合理的发送速率、利用滑动窗口控制速率
(2)消息队列:增加队列的分区数;设置消息逾期时间。 可以将堆积的消息进行持久化存储,以便后续处理。
(3)消耗端:服务器扩容;建Consumer消耗者集群,增加消耗者数量,批量消耗 调整批量巨细和拉取间隔,异步消耗、异步提交;优化消耗者性能(优化数据库查询、淘汰网络调用次数、利用多线程或异步处理)。
十五、消耗失败怎么办?
(1)重试:设置合理的重试次数和重试间隔。
(2)记录:将消耗失败的消息存储到数据库。消耗者可以记录错误信息并跳过有问题的消息。
(3)死信队列:对于无法处理的消息,应将其移动到死信队列Topic中,并定期分析和处理这些消息。这有助于避免消息堆积对正常业务的影响。
十六、Kafka性能优化
通过压缩和批量发送优化。
Kafka支持Gzip、Snappy、LZ4、zstd等数据压缩算法。
zstd有最高的压缩比(zstd > LZ4 > gzip > Snappy),LZ4服从最高(LZ4 > Snappy > zstd > gzip)
十七、Kafka常见配置参数
- kafka:
- # Broker的全局设置
- global:
- broker.id: 0 # Kafka broker的唯一标识符,每个broker的id都应该是唯一的
- log.dirs: /var/lib/kafka/data # Kafka存储消息的目录
- zookeeper.connect: localhost:2181 # Kafka连接到Zookeeper的地址
-
- # 网络设置
- network:
- listeners: PLAINTEXT://:9092 # Kafka监听的地址和端口,用于客户端连接
- advertised.listeners: PLAINTEXT://your.host.name:9092 # 客户端连接broker的地址,通常是在集群外部访问的地址
- num.network.threads: 3 # 用于处理网络请求的线程数
- num.io.threads: 8 # 用于处理磁盘IO的线程数
- socket.send.buffer.bytes: 102400 # Socket发送缓冲区大小
- socket.receive.buffer.bytes: 102400 # Socket接收缓冲区大小
- socket.request.max.bytes: 104857600 # 请求的最大字节数
-
- # 日志设置
- log:
- log.retention.hours: 168 # 日志保留的小时数
- log.segment.bytes: 1073741824 # 每个日志段的大小(以字节为单位)
- log.retention.check.interval.ms: 300000 # 检查日志是否过期的间隔时间(以毫秒为单位)
- log.cleaner.enable: true # 是否启用日志清理器(用于删除过期的日志)
-
- # 复制设置
- replication:
- default.replication.factor: 1 # 默认的消息复制因子
- min.insync.replicas: 1 # 在提交消息之前,必须有多少副本是同步的
- replica.lag.time.max.ms: 10000 # 副本落后时间的最大阈值(以毫秒为单位)
- replica.socket.timeout.ms: 30000 # 副本之间连接的超时时间(以毫秒为单位)
- replica.fetch.max.bytes: 1048576 # 副本从leader拉取数据的最大字节数
-
- # 消费者和生产者设置(这些通常在客户端配置,但可以在broker上设置默认值)
- consumer_producer:
- message.max.bytes: 1000000 # 消息的最大字节数(包括键和值)
- replica.fetch.min.bytes: 1 # 副本从leader拉取的最小字节数
- request.timeout.ms: 30000 # 请求的超时时间(以毫秒为单位)
- group.min.session.timeout.ms: 6000 # 消费者组的最小会话超时时间(以毫秒为单位)
- auto.commit.interval.ms: 5000 # 自动提交偏移量的间隔时间(以毫秒为单位)
-
- # 其他设置
- other:
- num.partitions: 1 # 默认的主题分区数
- controlled.shutdown.enable: true # 是否启用受控关闭
- delete.topic.enable: true # 是否允许删除主题
复制代码- # 消费者配置
- fetch.min.bytes=1 消费者拉取数据的最小批次大小
- fetch.max.bytes=5242880 # 5MB 消费者每次拉取数据的最大量
- max.poll.records=100 消费者每次拉取的最大记录数(即消息数)
- fetch.max.wait.ms=500 消费者在拉取数据时,如果数据不足fetch.min.bytes所指定的量,则消费者会等待的最长时间(以毫秒为单位)。
- min.insync.replicas 在提交消息之前,必须有多少副本是同步的。
- replica.*参数 与副本相关的配置,如副本落后时间的最大阈值、副本之间连接的超时时间等。
- message.max.bytes 消息的最大字节数。
- request.timeout.ms 请求的超时时间。
- group.min.session.timeout.ms 消费者组的最小会话超时时间。
- auto.commit.interval.ms 自动提交偏移量的间隔时间。
- num.partitions 默认的主题分区数。
- controlled.shutdown.enable 是否启用受控关闭。
- delete.topic.enable 是否允许删除主题。
- auto.leader.rebalance.enable 是否启用自动领导者选举平衡。
- leader.imbalance.check.interval.seconds和leader.imbalance.per.broker.percentage 领导者不平衡检查和阈值设置。
- # 性能和调优设置
- queued.max.requests=500 等待处理的请求队列的最大大小。
- compression.type=producer 生产者消息的压缩类型。
- metric.reporters=[] 用于报告指标的类列表。
- num.replica.fetchers=1 每个broker用于从其他broker拉取数据的线程数。
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |