守听 发表于 2024-8-6 01:04:11

kafka17-消息阻塞原理及优化方案

目次
Broker 端负载不均衡导致消息阻塞
硬盘利用率不均衡和 IOutil 不均衡场景
数据迁徙办理方案
基于空闲硬盘优先的分区迁徙示例 
生成迁徙操持:
提交迁徙操持:
验证迁徙操持:
自动负载均衡工具:
PageCache 容量不足导致消息阻塞 
PageCache导致消息阻塞原理
 办理方法
Kafka 和 Flink 实时作业混淆摆设导致消息阻塞
混淆摆设的背景
办理方案
生产者buffer.memory容量不足
buffer.memory容量不足导致消息发送阻塞的原理
RecordAccumulator 类中的解释和逻辑
BufferPool 类中的 allocate 方法
办理方案
 

Broker 端负载不均衡导致消息阻塞

硬盘利用率不均衡和 IOutil 不均衡场景


[*] 分区设计不合理:如果某些 Topic 的流量远大于其他 Topic,或者生产者在发送消息时指定了分区,而未指定的分区没有消息,这大概导致节点间或分区间的数据不均衡。
[*] 副本分配不均:每个分区有一个或多个副本分布在不同的 Broker 节点上。如果副本分配策略不当,大概导致某些节点或磁盘的负载过高。
[*] 磁盘性能差别:如果集群中利用的磁盘性能乱七八糟,大概会导致性能较差的磁盘成为瓶颈,进而影响团体性能。
[*] 热点 Topic:某些特殊热门的 Topic 大概会导致对应的磁盘负载不均衡。
[*] Broker 扩容问题:Kafka 扩容了 Broker 节点后,如果新增的节点没有合理分配分区,也会导致节点间的数据不均衡。
[*] Leader 副本切换:随着集群状态的变化,Leader 副本的切换或迁徙大概导致个别 Broker 节点上的数据更多,进而导致节点间的数据不均衡。
[*] 客户端利用问题:客户端利用不当,如利用 key 进行 hash 分配大概导致生产倾斜,进而影响消耗出现 lag。
数据迁徙办理方案


[*] 评估迁徙使命:在迁徙之前,评估必要迁徙的数据量和目标硬盘的容量,确保迁徙后的数据分布均匀。
[*] 利用 Kafka 的迁徙工具:利用 kafka-reassign-partitions.sh 工具进行数据迁徙。注意,不同版本的 Kafka 工具有不同的特性,比如 2.6.0 版本支持并行提交迁徙使命。
[*] 并行迁徙与限流:在新版本中,可以利用并行迁徙提高服从,但同时要设置合适的限流参数,以制止迁徙过程中对正常业务造成影响。
[*] 选择合适的时间窗口:选择体系负载较低的时间段进行迁徙,比如午夜之后,以淘汰对用户活跃时段的影响。
[*] 监控迁徙进度:实时监控迁徙进度和体系性能指标,确保迁徙使命按操持进行,及时发现并办理迁徙过程中的问题。
[*] 处理迁徙中的资源竞争:迁徙请求和实时消耗数据的 poll() 操作共用 Fetcher 线程,因此要特殊注意分区迁徙大概对实时消耗造成的影响。大概必要调解消耗者的配置,比如增加消耗者数量或调解 max.poll.records 参数。
[*] 优化 Kafka 配置:根据迁徙使命的需求,大概必要暂时调解 Kafka 配置,比如调解 replica.alter.log.dirs.count 参数来控制副本的日记目次数量。
[*] 利用监控工具:利用 Kafka Manager、Burrow 或其他监控工具监控集群状态,确保迁徙过程中集群的健康。
[*] 准备回滚操持:在迁徙前准备好回滚操持,以便在迁徙出现问题时可以或许快速规复到迁徙前的状态。
基于空闲硬盘优先的分区迁徙示例 

生成迁徙操持:


[*] 准备形貌文件 move-json-file.json,示例如下:
{
"topics": [
    {"topic": "test-topic"}
],
"version": 1
}
[*] 利用以下命令生成迁徙操持,假设Kafka 集群的 Bootstrap Server 是 test-1:9092,而且在 Broker 101, 102, 103 上重新分布分区:
bin/kafka-reassign-partitions.sh --bootstrap-server test-1:9092 --topics-to-move-json-file move-json-file.json --broker-list "101,102,103" --generate
提交迁徙操持:


[*] 将生成的迁徙方案保存到 topics-to-move.json 文件中。
[*] 利用以下命令提交迁徙操持,并设置合适的 --throttle 和 --replica-alter-log-dirs-throttle 参数以控制迁徙速率:
bin/kafka-reassign-partitions.sh --bootstrap-server test-1:9092 --reassignment-json-file topics-to-move.json --throttle 1048576 --replica-alter-log-dirs-throttle 1048576 --execute
验证迁徙操持:

利用以下命令验证迁徙使命的实行环境,确保迁徙成功并移除限流设置:
bin/kafka-reassign-partitions.sh --bootstrap-server test-1:9092 --reassignment-json-file topics-to-move.json --verify 自动负载均衡工具:


[*] Cruise Control: LinkedIn 开发的 Cruise Control 项目可以自动进行 Kafka 集群的动态负载均衡,包罗但不限于 CPU、硬盘利用率、IOutil 和副本分布。它通过监控集群状态并自动调解资源分配来优化集群性能。

[*] 功能特点:

[*]自动进行负载均衡,淘汰手动干预。
[*]支持多种资源的监控和优化。
[*]具备 Leader 重新选主的能力。
[*]允许更改 Topic 配置以适应不同的业务需求。

[*] 利用场景:

[*]当 Kafka 集群规模较大,手动运维成本高时。
[*]必要持续优化集群性能和资源利用率的场景。

[*] 摆设和配置:

[*]根据 Cruise Control 的文档进行摆设和配置。
[*]监控其性能并根据现实环境调解参数。




PageCache 容量不足导致消息阻塞 

PageCache导致消息阻塞原理



[*] PageCache及其作用:

[*]PageCache是操作体系内核利用内存缓存磁盘数据的一种机制,用于加快数据读取和写入操作。在Kafka中,PageCache可以或许利用"零拷贝"技能,直接将数据从PageCache复制到网卡,从而低落数据消耗者的耽误。

[*] 容量不足引发的问题:

[*]PageCache的容量受限于机器的物理内存大小,一样平常不大概超过这个限定。当PageCache容量不足时,无法完全缓存全部必要快速访问的数据,导致频仍的磁盘读取操作。
[*]例如,如果一台机器的PageCache容量为30GB,而数据的读写速度为160MB/s,则PageCache最多可以缓存约2分钟的数据。当消耗者必要访问2分钟之前的数据时,大概会发现这些数据已经不再PageCache中,从而触发额外的磁盘读取。

[*] PageCache污染:

[*]当PageCache容量不足时,操作体系必要将新的数据块写入PageCache时,大概会更换掉之前被消耗者必要的数据块。这种环境被称为PageCache污染。
[*]PageCache污染会导致消耗者必要额外的磁盘读取耽误,由于之前的数据大概已经被更换,无法通过"零拷贝"技能快速获取。

[*] 生产者和消耗者的影响:

[*]当消耗者面临PageCache容量不足和污染时,其数据读取速度大概会显著下降,从而影响到Kafka体系的团体性能。
[*]同时,生产者生成的新数据也大概由于PageCache被消耗者利用而无法立即写入,导致发送消息的阻塞征象,进一步影响体系的吞吐量和耽误。

 办理方法

在办理PageCache污染问题时,主要会合在增大内存以及优化写入方式和利用硬件加快的方法:

[*] 增大内存的范围性:

[*]PageCache污染的根本缘故原由是内存不足,导致无法完全缓存全部必要快速访问的数据。然而,现实上将内存增大到与硬盘相当的大小是不现实的。即使增加了内存,也只是缓解了PageCache污染带来的读写速度慢的问题,而不能完全办理。

[*] 优化写入方式:

[*]一种有效的办理方案是将随机写操作只管改成次序写,而且在每次实行次序写操作时,尽大概多地写入数据。Kafka擅长次序写,因此通过优化写入次序和批量写入可以淘汰PageCache的频仍更新,低落PageCache污染的大概性。

[*] 硬件加快的应用:

[*]针对HDD硬盘随机写性能不足的问题,可以考虑利用RAID卡加快。RAID卡具备自带缓存,类似于PageCache,在RAID层级会将数据合并成更大的块然后写入HDD硬盘。这样做可以或许更充分地利用次序写的带宽,淘汰磁盘的IO次数,从而间接提升随机写性能。

总结来说,办理PageCache污染问题必要综合考虑增加内存、优化数据写入方式以及利用硬件加快技能。每种方法都有其范围性和实用场景,合理地结合这些方法可以有效地改善体系的性能和稳定性,特殊是在面临长尾消耗者导致的PageCache污染时。
Kafka 和 Flink 实时作业混淆摆设导致消息阻塞

混淆摆设的背景

随着公司追求低落成本和提高资源利用率,许多企业选择将IO密集型应用(如Kafka和RocketMQ)与CPU密集型应用(如Flink和SparkStreaming)混淆摆设。然而,这种摆设方式大概会导致物理资源的竞争问题,特殊是在CPU核心、L1Cache和L2Cache的共享环境下。例如,当Flink实时计算平台的CPU利用率飙升时,很大概会影响到Kafka的读写性能,表现为Kafka生产者在发送消息时出现阻塞征象。
办理方案



[*] 选择合适的存储介质:

[*]硬盘选择:Kafka 集群通常恰当利用HDD硬盘,由于其特点是次序读写较多,而且数据持久化和稳定性较为重要。对于 Flink 和 SparkStreaming 这类CPU密集型应用,更恰当利用SSD硬盘,由于它们更依赖于低耽误和高并发的随机访问能力。
[*]注意写穿问题:如果将 Kafka 和 Flink 都摆设在SSD上,必要注意Kafka的写入操作大概导致SSD的快速消耗。可以考虑利用合理的存储条理布局或者控制策略,以低落这种风险。

[*] 利用 RAID 卡优化IO性能:

[*]在硬件层面,可以考虑利用RAID卡来优化IO性能。RAID卡不但可以提供硬盘的冗余和并发性能,还能在数据写入时进行缓冲和整合,淘汰对物理资源的竞争和频仍的IO操作,从而提升团体性能和稳定性。

[*] 分开摆设IO密集型和CPU密集型应用:

[*]最简单有效的方式是将 Kafka 和 Flink 分开摆设到不同的物理机器或者虚拟机上。这样可以制止它们在相同物理核心上的竞争,淘汰由于资源争取而导致的性能颠簸和阻塞征象。这种方法虽然增加了物理资源的利用率上限,但可以确保每个应用获得更可控的资源分配和独立的性能保障。

生产者buffer.memory容量不足

buffer.memory容量不足导致消息发送阻塞的原理

在 Kafka Producer 中,当初始化时指定了缓冲池大小参数 buffer.memory 默认为 32MB。如果生产者在运行过程中,实验将消息追加到 RecordAccumulator 缓冲池中时,发现缓冲池的内存资源已经耗尽,会出现以下环境:


[*] 阻塞机制: 如果缓冲池中的内存资源耗尽,Producer 将会阻塞,直到有足够的空闲内存块可用为止。这意味着后续的消息追加操作会一直等待,直到体系可以或许分配到足够的内存。
[*] 异常抛出: 如果配置的最大阻塞时间内仍未能分配到足够的内存,将抛出 BufferExhaustedException 异常。这种环境下,不但会导致生产者发送消息的耽误,还会影响整个体系的稳定性和性能。
RecordAccumulator 类中的解释和逻辑

在 org.apache.kafka.clients.producer.internals.RecordAccumulator 类中,有如下的解释和相关逻辑:
/**
 * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
 * this behavior is explicitly disabled.
 */
这段解释分析了 RecordAccumulator 利用了有限的内存,当内存耗尽时,追加操作将会阻塞,除非显式地禁用了这种举动。
BufferPool 类中的 allocate 方法

在 org.apache.kafka.clients.producer.internals.BufferPool 类的 allocate 方法中,有如下的形貌和逻辑:
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    // ...
    while (accumulated < size) {
      long startWaitNs = time.nanoseconds();
      long timeNs;
      boolean waitingTimeElapsed;
      try {
            waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
      } finally {
            long endWaitNs = time.nanoseconds();
            timeNs = Math.max(0L, endWaitNs - startWaitNs);
            recordWaitTime(timeNs);
      }

      if (this.closed)
            throw new KafkaException("Producer closed while allocating memory");

      if (waitingTimeElapsed) {
            this.metrics.sensor("buffer-exhausted-records").record();
            throw new BufferExhaustedException("Failed to allocate " + size + " bytes within the configured max blocking time "
                + maxTimeToBlockMs + " ms. Total memory: " + totalMemory() + " bytes. Available memory: " + availableMemory()
                + " bytes. Poolable size: " + poolableSize() + " bytes");
      }
    }
    // ...
}
这段代码展示了 Producer 在实验分配内存时的逻辑:


[*]如果未能在配置的 maxTimeToBlockMs 时间内分配到足够的内存,将会抛出 BufferExhaustedException 异常。
[*]在等待过程中,如果 Producer 被关闭,也会抛出 KafkaException。
办理方案



[*] 办理方案一:增加 RecordAccumulator 的内存大小(通过增加 buffer.memory 参数的默认值)。

[*]问题:尽管增加内存可以淘汰频仍的批次合并,但如果批次大小大于 batch.size,仍旧必要进行 GC,大概导致性能问题,尤其是大概出现 Full GC,影响全部生产者线程的运行。

[*] 办理方案二:利用 RecordAccumulator 的内存池机制,确保每次申请的 ProducerBatch 大小都等于 batch.size。

[*]原理:通过优化 batch.size 的配置,使得大多数消息的大小符合或靠近 batch.size 的值。
[*]长处:这样可以充分利用内存池的重用特性,制止频仍申请和开释内存,从而淘汰 GC 的频率和大概的性能影响。

[*] 结论:

[*]如果大多数消息的大小超过了当前配置的 batch.size,应考虑增大 batch.size 的值,以便更好地利用内存池机制,制止频仍的 GC 和大概的耽误问题。
[*]通过这种方式,可以有效地管理 Kafka 生产者的性能,并淘汰由于垃圾回收带来的埋伏影响,确保消息发送的稳定性和可靠性。




免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: kafka17-消息阻塞原理及优化方案