Kafka

打印 上一主题 下一主题

主题 850|帖子 850|积分 2550

一、主要作用



  • 异步处理:提升响应速度
  • 系统解耦:低落系统各模块之间的耦合
  • 流量削峰:高并发下 作为缓冲,减轻系统压力
  • 数据分发:广播到多个消耗者, 实现数据的分发和共享。

二、特点



  • 分布式:集群方式、多副本,高可靠性 高扩展性。
  • 高吞吐,低延长,百万级tps。
  • 支持及时的流式处理。
  • 持久化日志:可重复读取 和 无限期保留。


三、Broker


  • 定义:Kafka集群中的服务器节点,负责存储和转发消息。
  • 功能

    • 存储消息:Broker负责存储Topic中的消息,并包管消息的有序性和可靠性。
    • 转发消息:Broker负责将生产者发送的消息转发给消耗者,并包管消息的可靠性和次序性。
    • 管理元数据:Broker维护Topic和Partition的元数据信息,供生产者和消耗者利用。
    • 提供API接口:Broker提供了API接口,供生产者和消耗者利用。


四、Topic


  • 定义:消息的逻辑分组,是Kafka消息系统中的根本单元,用于将消息进行逻辑上的分组和归类。
  • 作用:通过创建差别的Topic,可以根据需求将消息进行分类、分割和分发,以满意差别业务场景下的消息传递需求。
  • 特点:每个Topic可以有一个或多个分区(Partition),每个分区可以保存差别时间段的消息序列。

五、Partition


  • 定义:分区,是Topic的物理分割,是Kafka中存储消息的最小单位。每个分区都有一个对应的日志文件,用于次序写入和追加消息。
  • 作用

    • 提高吞吐量:通过分区,Kafka可以分散数据和负载,支持并行处理,从而提高团体吞吐量。
    • 有序存储:在同一个分区内,消息是按照写入的次序存储的,消耗者可以包管读取到的消息是有序的。
    • 负载均衡:通过合理分配分区,可以实现负载均衡,避免单点瓶颈。

  • 特点

    • 不可变性:一旦消息写入分区,它就是不可变的,不能被修改或删除,直到超过保留时间或被压缩。
    • 分段存储:每个分区在底层是一个日志文件,由多个段(Segment)组成,每个段是一个物理文件,存储一批有序的消息。
    • 索引文件:每个段都有一个对应的索引文件,用于快速查找消息的偏移量(Offset)。


六、副本(Replication)


  • 定义:Partition的副本机制,也可以称之为备份机制,通过多副本包管数据的高可用性和容错性。Kafka将每个分区的消息复制到多个副本中,这些副本分布在差别的Broker上。如许纵然某个Broker出现故障,其他Broker上的副本仍然可以保留和规复消息。
  • 角色

    • Leader:每个分区有一个主副本(Leader),负责处理所有的读写哀求。
    • Follower:其余副本是从副本(Follower),负责从主副本复制数据。称之为ISR副本聚集。当Leader副本出现故障时,可以从ISR中推选出一个新的Leader副本。
    • Unclean领导者推选:当ISR聚集为空时,Kafka需要重新推选一个新的领导者副本。此时,Kafka可以选择从非同步副本中推选领导者副本,这称为Unclean领导者推选。开启Unclean领导者推选可能会造成数据丢失,但提升了系统的高可用性;而禁止Unclean领导者推选则维护了数据的同等性。

  • 作用

    • 数据冗余:提供数据冗余,纵然系统部门组件失效,系统依然可以或许继续运转,增加了团体可用性以及数据持久性。
    • 故障规复:当主副本不可用时,从副本可以主动提升为主副本,确保系统的一连性。

  • 同步原理:Kafka接纳基于领导者(Leader)的副本机制。从副本(Follower)不对外提供服务,只是定期地异步拉取 领导者副本(Leader)中的数据,从而实现与领导者副本的同步。当领导者副本挂掉时,Kafka会依托于ZooKeeper提供的监控功能及时感知到,并立即开启新一轮的领导者推选,从跟随者副本中选一个作为新的领导者。


七、ISR机制

        指分区中的一组与Leader副本保持同步的副本聚集; 这些副本包罗了Leader副本中的所有已确认消息。当Leader副本出现故障时,可以从ISR中推选出一个新的Leader副本。只有ISR中的副本才华参与消息的同步复制和数据的读写操作,从而确保数据的同等性和可靠性。

八、Controller机制

        Kafka利用Controller机制来管理Broker、Partition、Leader等元数据信息。Controller是一个独立的组件,负责监控和管理整个集群的状态和配置。当出现非常环境时,Controller可以主动进行故障转移和数据规复操作,确保集群的稳定性和高可用性。

快速故障规复的机制。当某个Broker出现故障时,可以通过ISR机制和Controller机制快速进行主副本的推选和数据的规复,从而确保集群的高可用性和数据的可靠性。

九、ACK应答机制

是生产者发送数据后Kafka的接收确认方式。
在生产者发送消息到Kafka集群时,可以设置差别的ACK级别来确保消息在发送过程中的可靠性和持久性。
Kafka的ACK机制有三种级别,由acks参数的值控制:


  • acks=0:无需等待确认,可靠性最低
  • acks=1:默认值。生产者发送消息后会等待Broker的领导者(Leader)确认, 会确认消息已经被领导者接收写入,但不肯定被复制到所有的副本。
  • acks=-1(all):等待主题分区中所有ISR 副本都成功写入消息后才返回确认。acks=all时,Kafka只有在消息被写入ISR中的所有副本后才会返回确认,这确保了消息纵然在Leader故障的环境下也不会丢失,因为ISR中的其他副本可以推选为新的Leader。

日志持久化


  • Kafka将消息持久化到磁盘上,纵然Broker宕机或重启,消息也不会丢失。Kafka利用次序写入和分段存储的方式来提高性能,同时包管数据的可靠性。每个主题都有一个或多个分区,每个分区都有一个对应的日志文件,用于次序写入和追加消息。

持久性包管


  • Kafka提供了多种消息传递语义,包罗至少一次传递和最多一次传递。这些语义可以确保消息在发送和接收之间的可靠传递,并包管消息不会丢失或重复。


十、Kafka中Zookeeper的作用


  • 推选leader

    • 在Kafka集群中,每个分区都会有一个leader来负责处理读写哀求。
    • 当leader节点出现故障时,Zookeeper会利用推选算法协助进行新的leader推选,确保集群可以或许继续正常运行,并处理来自生产者和消耗者的哀求。

  • 管理消耗者组的偏移量

    • 消耗者组在消耗消息时,需要记录当前已经消耗的消息偏移量。
    • Zookeeper用来保存这些偏移量信息,以便在消耗者重启或发生故障时,可以或许继续从上次消耗的位置开始,从而包管消息消耗的一连性和准确性。

  • 发现和注册服务

    • Kafka的broker节点和消耗者在启动时会向Zookeeper注册自己的信息,包罗IP地点、端口号和可用的分区等。
    • 其他节点可以通过查询Zookeeper来发现可用的broker节点和分区,从而方便地进行消息的生产和消耗。

  • 维护集群的元数据

    • Zookeeper保存了Kafka集群的元数据信息,这些信息包罗但不限于broker的列表、topic的分区信息、以及消耗者组的偏移量等。
    • Kafka通过与Zookeeper的交互来获取这些元数据,以确保集群内部各组件之间的和谐同等。

  • 监控集群的康健状态

    • Zookeeper负责监控Kafka集群中各个节点的状态,包罗节点的上下线环境、存活状态以及partition的副本分配环境等。
    • 通过提供心跳检测和故障检测功能,Zookeeper可以或许及时发现并关照集群中可能存在的问题,从而保障集群的稳定运行。



十一、如何包管消息有序?

1、生产者如何有序
(1)序列号:为每条消息分配一个唯一的序列号,标志数据的次序。消息排序,重新发送。消耗者 按序列号来消耗消息。
(2)指定分区发送。
2、存储有序
一个分区内有序,多个分区无法包管。获取数据后会将数据次序写入日志文件。

3、消耗着如何有序
(1)一个消耗者 单线程单实例 消耗。同一个Topic、同一个分区,一个Partition分区只会被一个消耗者组中的一个消耗者消耗,从而包管次序性。
(2)按序号消耗。

生产者如何将消息 发布到指定分区
kafka默认分区策略:轮询 选出一个partition分区
(1)自定义分区策略:实现Partitioner接口,重写partition( )方法
(2)每条消息设置同一个key:盘算key的hash值路由到指定分区,同一个key的消息都在同一个分区

消耗者如何消耗同一个分区消息?
(1)消耗者消耗方法上利用 @KafkaListener的topicPartitions属性 和 @TopicPartition @PartitionOffsets 注解指定topic、partition、offset。

十二、如何包管消息不丢失?



  • 生产者(Producer):通过同步阻塞的方式的发送,设置acks=all,等待所有ISR 副本都成功写入消息后才返回确认;超时或者失败 会重试
  • Broker:支持同步刷盘(持久化到硬盘中)、异步刷盘的策略,支持 1主N从的策略。持久化的 offset (偏移量),消息 支持持久化到Commitlog里面。
  • 消耗者(Consumer):主动提交偏移量,改为手动提交,确保业务逻辑真正实行成功 再提交偏移量。



  • 主动提交: 不是立即提交,默认每5秒 主动提交消耗偏移量。可能导致在处理消息时发生错误时无法回滚偏移量,造成消息的重复消耗或丢失。
  • 手动提交: 需要显式地调用提交方法来提交消耗偏移量。 可在 成功处理完消息后 提交偏移量, 确保消息至少被处理一次。
  • 同步提交(commitSync): 消耗者在提交偏移量时会阻塞等待Kafka的确认。会导致消耗者在提交偏移量时发生阻塞,可能会影响消耗者的吞吐量。
  • 异步提交(commitAsync): 不会阻塞等待Kafka的确认,会立即返回,消耗者可以继续处理其他使命。 如果提交失败,可能不会立即被发现,可能导致消息的重复消耗或丢失。但 Kafka允许为异步提交 提供回调函数(OffsetCommitCallback),以便在提交成功或失败时实行相应的操作。

通讯流程:拉消息,拉取消息的过程通常是通过一个循环来实现的,在循环中,消耗者不停调用poll方法来拉取新的消息记录 。poll方法的参数指定了拉取消息的超时时间 。 拉取到的消息记录包罗多个消息,每个消息都包罗了消息的元数据(如主题、分区、偏移量等)和现实的消息内容。
指定消息在分区中的偏移量offset,消耗特定分区的消息,并且可向后回滚重复消耗。

十三、消耗端收到两条一样的消息,应该怎样处理?

(1)幂等性:消耗端 处理消息 保持幂等性,不管来多少条重复消息,最后处理的结果都一样
(2)记录消息ID和消耗状态:包管每条消息都有唯一ID编号, 消耗者端 维护张日志表 记录已经处理成功的消息的ID

十四、消息堆积怎么办?

(1)生产者:限流,设置合理的发送速率、利用滑动窗口控制速率
(2)消息队列:增加队列的分区数;设置消息逾期时间。 可以将堆积的消息进行持久化存储,以便后续处理。
(3)消耗端:服务器扩容;建Consumer消耗者集群,增加消耗者数量,批量消耗 调整批量巨细和拉取间隔,异步消耗、异步提交;优化消耗者性能(优化数据库查询、淘汰网络调用次数、利用多线程或异步处理)。

十五、消耗失败怎么办?

(1)重试:设置合理的重试次数和重试间隔。
(2)记录:将消耗失败的消息存储到数据库。消耗者可以记录错误信息并跳过有问题的消息。
(3)死信队列:对于无法处理的消息,应将其移动到死信队列Topic中,并定期分析和处理这些消息。这有助于避免消息堆积对正常业务的影响。



十六、Kafka性能优化

通过压缩和批量发送优化。
Kafka支持Gzip、Snappy、LZ4、zstd等数据压缩算法。
zstd有最高的压缩比(zstd > LZ4 > gzip > Snappy),LZ4服从最高(LZ4 > Snappy > zstd > gzip)

十七、Kafka常见配置参数

  1. kafka:  
  2.   # Broker的全局设置  
  3.   global:  
  4.     broker.id: 0                  # Kafka broker的唯一标识符,每个broker的id都应该是唯一的  
  5.     log.dirs: /var/lib/kafka/data # Kafka存储消息的目录  
  6.     zookeeper.connect: localhost:2181 # Kafka连接到Zookeeper的地址  
  7.   
  8.   # 网络设置  
  9.   network:  
  10.     listeners: PLAINTEXT://:9092  # Kafka监听的地址和端口,用于客户端连接  
  11.     advertised.listeners: PLAINTEXT://your.host.name:9092 # 客户端连接broker的地址,通常是在集群外部访问的地址  
  12.     num.network.threads: 3        # 用于处理网络请求的线程数  
  13.     num.io.threads: 8             # 用于处理磁盘IO的线程数  
  14.     socket.send.buffer.bytes: 102400 # Socket发送缓冲区大小  
  15.     socket.receive.buffer.bytes: 102400 # Socket接收缓冲区大小  
  16.     socket.request.max.bytes: 104857600 # 请求的最大字节数  
  17.   
  18.   # 日志设置  
  19.   log:  
  20.     log.retention.hours: 168      # 日志保留的小时数  
  21.     log.segment.bytes: 1073741824 # 每个日志段的大小(以字节为单位)  
  22.     log.retention.check.interval.ms: 300000 # 检查日志是否过期的间隔时间(以毫秒为单位)  
  23.     log.cleaner.enable: true      # 是否启用日志清理器(用于删除过期的日志)  
  24.   
  25.   # 复制设置  
  26.   replication:  
  27.     default.replication.factor: 1 # 默认的消息复制因子  
  28.     min.insync.replicas: 1        # 在提交消息之前,必须有多少副本是同步的  
  29.     replica.lag.time.max.ms: 10000 # 副本落后时间的最大阈值(以毫秒为单位)  
  30.     replica.socket.timeout.ms: 30000 # 副本之间连接的超时时间(以毫秒为单位)  
  31.     replica.fetch.max.bytes: 1048576 # 副本从leader拉取数据的最大字节数  
  32.   
  33.   # 消费者和生产者设置(这些通常在客户端配置,但可以在broker上设置默认值)  
  34.   consumer_producer:  
  35.     message.max.bytes: 1000000    # 消息的最大字节数(包括键和值)  
  36.     replica.fetch.min.bytes: 1    # 副本从leader拉取的最小字节数  
  37.     request.timeout.ms: 30000     # 请求的超时时间(以毫秒为单位)  
  38.     group.min.session.timeout.ms: 6000 # 消费者组的最小会话超时时间(以毫秒为单位)  
  39.     auto.commit.interval.ms: 5000 # 自动提交偏移量的间隔时间(以毫秒为单位)  
  40.   
  41.   # 其他设置  
  42.   other:  
  43.     num.partitions: 1             # 默认的主题分区数  
  44.     controlled.shutdown.enable: true # 是否启用受控关闭  
  45.     delete.topic.enable: true     # 是否允许删除主题
复制代码
  1. # 消费者配置  
  2. fetch.min.bytes=1                 消费者拉取数据的最小批次大小
  3. fetch.max.bytes=5242880 # 5MB     消费者每次拉取数据的最大量
  4. max.poll.records=100              消费者每次拉取的最大记录数(即消息数)
  5. fetch.max.wait.ms=500             消费者在拉取数据时,如果数据不足fetch.min.bytes所指定的量,则消费者会等待的最长时间(以毫秒为单位)。
  6. min.insync.replicas      在提交消息之前,必须有多少副本是同步的。
  7. replica.*参数            与副本相关的配置,如副本落后时间的最大阈值、副本之间连接的超时时间等。
  8. message.max.bytes        消息的最大字节数。
  9. request.timeout.ms       请求的超时时间。
  10. group.min.session.timeout.ms    消费者组的最小会话超时时间。
  11. auto.commit.interval.ms         自动提交偏移量的间隔时间。
  12. num.partitions                  默认的主题分区数。
  13. controlled.shutdown.enable      是否启用受控关闭。
  14. delete.topic.enable             是否允许删除主题。
  15. auto.leader.rebalance.enable    是否启用自动领导者选举平衡。
  16. leader.imbalance.check.interval.seconds和leader.imbalance.per.broker.percentage   领导者不平衡检查和阈值设置。
  17. # 性能和调优设置  
  18. queued.max.requests=500      等待处理的请求队列的最大大小。
  19. compression.type=producer    生产者消息的压缩类型。
  20. metric.reporters=[]          用于报告指标的类列表。
  21. num.replica.fetchers=1       每个broker用于从其他broker拉取数据的线程数。
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

忿忿的泥巴坨

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表