ToB企服应用市场:ToB评测及商务社交产业平台

标题: kafka17-消息阻塞原理及优化方案 [打印本页]

作者: 守听    时间: 2024-8-6 01:04
标题: kafka17-消息阻塞原理及优化方案
目次
Broker 端负载不均衡导致消息阻塞
硬盘利用率不均衡和 IOutil 不均衡场景
数据迁徙办理方案
基于空闲硬盘优先的分区迁徙示例 
生成迁徙操持:
提交迁徙操持:
验证迁徙操持:
自动负载均衡工具:
PageCache 容量不足导致消息阻塞 
PageCache导致消息阻塞原理
 办理方法
Kafka 和 Flink 实时作业混淆摆设导致消息阻塞
混淆摆设的背景
办理方案
生产者buffer.memory容量不足
buffer.memory容量不足导致消息发送阻塞的原理
RecordAccumulator 类中的解释和逻辑
BufferPool 类中的 allocate 方法
办理方案
 


Broker 端负载不均衡导致消息阻塞

硬盘利用率不均衡和 IOutil 不均衡场景

数据迁徙办理方案

基于空闲硬盘优先的分区迁徙示例 

生成迁徙操持:

提交迁徙操持:

验证迁徙操持:

利用以下命令验证迁徙使命的实行环境,确保迁徙成功并移除限流设置:
  1. bin/kafka-reassign-partitions.sh --bootstrap-server test-1:9092 --reassignment-json-file topics-to-move.json --verify
复制代码
自动负载均衡工具:



PageCache 容量不足导致消息阻塞 

PageCache导致消息阻塞原理


 办理方法

在办理PageCache污染问题时,主要会合在增大内存以及优化写入方式和利用硬件加快的方法:
总结来说,办理PageCache污染问题必要综合考虑增加内存、优化数据写入方式以及利用硬件加快技能。每种方法都有其范围性和实用场景,合理地结合这些方法可以有效地改善体系的性能和稳定性,特殊是在面临长尾消耗者导致的PageCache污染时。
Kafka 和 Flink 实时作业混淆摆设导致消息阻塞

混淆摆设的背景

随着公司追求低落成本和提高资源利用率,许多企业选择将IO密集型应用(如Kafka和RocketMQ)与CPU密集型应用(如Flink和SparkStreaming)混淆摆设。然而,这种摆设方式大概会导致物理资源的竞争问题,特殊是在CPU核心、L1Cache和L2Cache的共享环境下。例如,当Flink实时计算平台的CPU利用率飙升时,很大概会影响到Kafka的读写性能,表现为Kafka生产者在发送消息时出现阻塞征象。
办理方案


生产者buffer.memory容量不足

buffer.memory容量不足导致消息发送阻塞的原理

在 Kafka Producer 中,当初始化时指定了缓冲池大小参数 buffer.memory 默认为 32MB。如果生产者在运行过程中,实验将消息追加到 RecordAccumulator 缓冲池中时,发现缓冲池的内存资源已经耗尽,会出现以下环境:

RecordAccumulator 类中的解释和逻辑

在 org.apache.kafka.clients.producer.internals.RecordAccumulator 类中,有如下的解释和相关逻辑:
/**
 * The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless
 * this behavior is explicitly disabled.
 */
这段解释分析了 RecordAccumulator 利用了有限的内存,当内存耗尽时,追加操作将会阻塞,除非显式地禁用了这种举动。
BufferPool 类中的 allocate 方法

在 org.apache.kafka.clients.producer.internals.BufferPool 类的 allocate 方法中,有如下的形貌和逻辑:
  1. public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
  2.     // ...
  3.     while (accumulated < size) {
  4.         long startWaitNs = time.nanoseconds();
  5.         long timeNs;
  6.         boolean waitingTimeElapsed;
  7.         try {
  8.             waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
  9.         } finally {
  10.             long endWaitNs = time.nanoseconds();
  11.             timeNs = Math.max(0L, endWaitNs - startWaitNs);
  12.             recordWaitTime(timeNs);
  13.         }
  14.         if (this.closed)
  15.             throw new KafkaException("Producer closed while allocating memory");
  16.         if (waitingTimeElapsed) {
  17.             this.metrics.sensor("buffer-exhausted-records").record();
  18.             throw new BufferExhaustedException("Failed to allocate " + size + " bytes within the configured max blocking time "
  19.                 + maxTimeToBlockMs + " ms. Total memory: " + totalMemory() + " bytes. Available memory: " + availableMemory()
  20.                 + " bytes. Poolable size: " + poolableSize() + " bytes");
  21.         }
  22.     }
  23.     // ...
  24. }
复制代码
这段代码展示了 Producer 在实验分配内存时的逻辑:

办理方案





免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4