ToB企服应用市场:ToB评测及商务社交产业平台

标题: Kafka [打印本页]

作者: 忿忿的泥巴坨    时间: 2024-10-31 11:15
标题: Kafka
一、主要作用



二、特点




三、Broker


四、Topic


五、Partition


六、副本(Replication)



七、ISR机制

        指分区中的一组与Leader副本保持同步的副本聚集; 这些副本包罗了Leader副本中的所有已确认消息。当Leader副本出现故障时,可以从ISR中推选出一个新的Leader副本。只有ISR中的副本才华参与消息的同步复制和数据的读写操作,从而确保数据的同等性和可靠性。

八、Controller机制

        Kafka利用Controller机制来管理Broker、Partition、Leader等元数据信息。Controller是一个独立的组件,负责监控和管理整个集群的状态和配置。当出现非常环境时,Controller可以主动进行故障转移和数据规复操作,确保集群的稳定性和高可用性。

快速故障规复的机制。当某个Broker出现故障时,可以通过ISR机制和Controller机制快速进行主副本的推选和数据的规复,从而确保集群的高可用性和数据的可靠性。

九、ACK应答机制

是生产者发送数据后Kafka的接收确认方式。
在生产者发送消息到Kafka集群时,可以设置差别的ACK级别来确保消息在发送过程中的可靠性和持久性。
Kafka的ACK机制有三种级别,由acks参数的值控制:


日志持久化


持久性包管



十、Kafka中Zookeeper的作用



十一、如何包管消息有序?

1、生产者如何有序
(1)序列号:为每条消息分配一个唯一的序列号,标志数据的次序。消息排序,重新发送。消耗者 按序列号来消耗消息。
(2)指定分区发送。
2、存储有序
一个分区内有序,多个分区无法包管。获取数据后会将数据次序写入日志文件。

3、消耗着如何有序
(1)一个消耗者 单线程单实例 消耗。同一个Topic、同一个分区,一个Partition分区只会被一个消耗者组中的一个消耗者消耗,从而包管次序性。
(2)按序号消耗。

生产者如何将消息 发布到指定分区
kafka默认分区策略:轮询 选出一个partition分区
(1)自定义分区策略:实现Partitioner接口,重写partition( )方法
(2)每条消息设置同一个key:盘算key的hash值路由到指定分区,同一个key的消息都在同一个分区

消耗者如何消耗同一个分区消息?
(1)消耗者消耗方法上利用 @KafkaListener的topicPartitions属性 和 @TopicPartition @PartitionOffsets 注解指定topic、partition、offset。

十二、如何包管消息不丢失?





通讯流程:拉消息,拉取消息的过程通常是通过一个循环来实现的,在循环中,消耗者不停调用poll方法来拉取新的消息记录 。poll方法的参数指定了拉取消息的超时时间 。 拉取到的消息记录包罗多个消息,每个消息都包罗了消息的元数据(如主题、分区、偏移量等)和现实的消息内容。
指定消息在分区中的偏移量offset,消耗特定分区的消息,并且可向后回滚重复消耗。

十三、消耗端收到两条一样的消息,应该怎样处理?

(1)幂等性:消耗端 处理消息 保持幂等性,不管来多少条重复消息,最后处理的结果都一样
(2)记录消息ID和消耗状态:包管每条消息都有唯一ID编号, 消耗者端 维护张日志表 记录已经处理成功的消息的ID

十四、消息堆积怎么办?

(1)生产者:限流,设置合理的发送速率、利用滑动窗口控制速率
(2)消息队列:增加队列的分区数;设置消息逾期时间。 可以将堆积的消息进行持久化存储,以便后续处理。
(3)消耗端:服务器扩容;建Consumer消耗者集群,增加消耗者数量,批量消耗 调整批量巨细和拉取间隔,异步消耗、异步提交;优化消耗者性能(优化数据库查询、淘汰网络调用次数、利用多线程或异步处理)。

十五、消耗失败怎么办?

(1)重试:设置合理的重试次数和重试间隔。
(2)记录:将消耗失败的消息存储到数据库。消耗者可以记录错误信息并跳过有问题的消息。
(3)死信队列:对于无法处理的消息,应将其移动到死信队列Topic中,并定期分析和处理这些消息。这有助于避免消息堆积对正常业务的影响。



十六、Kafka性能优化

通过压缩和批量发送优化。
Kafka支持Gzip、Snappy、LZ4、zstd等数据压缩算法。
zstd有最高的压缩比(zstd > LZ4 > gzip > Snappy),LZ4服从最高(LZ4 > Snappy > zstd > gzip)

十七、Kafka常见配置参数

  1. kafka:  
  2.   # Broker的全局设置  
  3.   global:  
  4.     broker.id: 0                  # Kafka broker的唯一标识符,每个broker的id都应该是唯一的  
  5.     log.dirs: /var/lib/kafka/data # Kafka存储消息的目录  
  6.     zookeeper.connect: localhost:2181 # Kafka连接到Zookeeper的地址  
  7.   
  8.   # 网络设置  
  9.   network:  
  10.     listeners: PLAINTEXT://:9092  # Kafka监听的地址和端口,用于客户端连接  
  11.     advertised.listeners: PLAINTEXT://your.host.name:9092 # 客户端连接broker的地址,通常是在集群外部访问的地址  
  12.     num.network.threads: 3        # 用于处理网络请求的线程数  
  13.     num.io.threads: 8             # 用于处理磁盘IO的线程数  
  14.     socket.send.buffer.bytes: 102400 # Socket发送缓冲区大小  
  15.     socket.receive.buffer.bytes: 102400 # Socket接收缓冲区大小  
  16.     socket.request.max.bytes: 104857600 # 请求的最大字节数  
  17.   
  18.   # 日志设置  
  19.   log:  
  20.     log.retention.hours: 168      # 日志保留的小时数  
  21.     log.segment.bytes: 1073741824 # 每个日志段的大小(以字节为单位)  
  22.     log.retention.check.interval.ms: 300000 # 检查日志是否过期的间隔时间(以毫秒为单位)  
  23.     log.cleaner.enable: true      # 是否启用日志清理器(用于删除过期的日志)  
  24.   
  25.   # 复制设置  
  26.   replication:  
  27.     default.replication.factor: 1 # 默认的消息复制因子  
  28.     min.insync.replicas: 1        # 在提交消息之前,必须有多少副本是同步的  
  29.     replica.lag.time.max.ms: 10000 # 副本落后时间的最大阈值(以毫秒为单位)  
  30.     replica.socket.timeout.ms: 30000 # 副本之间连接的超时时间(以毫秒为单位)  
  31.     replica.fetch.max.bytes: 1048576 # 副本从leader拉取数据的最大字节数  
  32.   
  33.   # 消费者和生产者设置(这些通常在客户端配置,但可以在broker上设置默认值)  
  34.   consumer_producer:  
  35.     message.max.bytes: 1000000    # 消息的最大字节数(包括键和值)  
  36.     replica.fetch.min.bytes: 1    # 副本从leader拉取的最小字节数  
  37.     request.timeout.ms: 30000     # 请求的超时时间(以毫秒为单位)  
  38.     group.min.session.timeout.ms: 6000 # 消费者组的最小会话超时时间(以毫秒为单位)  
  39.     auto.commit.interval.ms: 5000 # 自动提交偏移量的间隔时间(以毫秒为单位)  
  40.   
  41.   # 其他设置  
  42.   other:  
  43.     num.partitions: 1             # 默认的主题分区数  
  44.     controlled.shutdown.enable: true # 是否启用受控关闭  
  45.     delete.topic.enable: true     # 是否允许删除主题
复制代码
  1. # 消费者配置  
  2. fetch.min.bytes=1                 消费者拉取数据的最小批次大小
  3. fetch.max.bytes=5242880 # 5MB     消费者每次拉取数据的最大量
  4. max.poll.records=100              消费者每次拉取的最大记录数(即消息数)
  5. fetch.max.wait.ms=500             消费者在拉取数据时,如果数据不足fetch.min.bytes所指定的量,则消费者会等待的最长时间(以毫秒为单位)。
  6. min.insync.replicas      在提交消息之前,必须有多少副本是同步的。
  7. replica.*参数            与副本相关的配置,如副本落后时间的最大阈值、副本之间连接的超时时间等。
  8. message.max.bytes        消息的最大字节数。
  9. request.timeout.ms       请求的超时时间。
  10. group.min.session.timeout.ms    消费者组的最小会话超时时间。
  11. auto.commit.interval.ms         自动提交偏移量的间隔时间。
  12. num.partitions                  默认的主题分区数。
  13. controlled.shutdown.enable      是否启用受控关闭。
  14. delete.topic.enable             是否允许删除主题。
  15. auto.leader.rebalance.enable    是否启用自动领导者选举平衡。
  16. leader.imbalance.check.interval.seconds和leader.imbalance.per.broker.percentage   领导者不平衡检查和阈值设置。
  17. # 性能和调优设置  
  18. queued.max.requests=500      等待处理的请求队列的最大大小。
  19. compression.type=producer    生产者消息的压缩类型。
  20. metric.reporters=[]          用于报告指标的类列表。
  21. num.replica.fetchers=1       每个broker用于从其他broker拉取数据的线程数。
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4