科技颠覆者 发表于 2024-11-7 22:57:58

kafka客户端消费者吞吐量优化

问题背景

业务场景
mq消息消费及时性要求不高,盼望可以牺牲一部门及时性,调换吞吐量,比方:数据库单条insert优化为batchInsert。优化后效果不符合预期:消费者消费消息的batchSize远小于实际设置的max.poll.records,导致在批量消息达到时想要聚合mq批量操纵业务数据效果与单条处置惩罚效果雷同。于是翻查影响kafka吞吐量的相关设置
缘故原由分析

Kafka 的 poll() 方法返回的消息数目与 batch.size 参数并不是直接相关的,影响 Kafka 消费者 poll() 时能获取消息数目的因素有很多。让我们徐徐分析这些因素,并探究大概的优化方法。
影响 poll() 返回消息数目的因素


[*]**max.poll.records**** 设置**

[*]max.poll.records 限定了每次 poll() 获取的消息最大数目。即使 batch.size 设置得较大,假如 max.poll.records 较小,poll() 获取的消息数依然受限定。

[*]消息生产速度

[*]假如生产者写入 Kafka 的速度较慢,消费端的 poll() 方法在调用时大概没有足够的消息积聚,因此无法返回足够多的消息。
[*]办理方案:监控生产者的写入速度和 Kafka 的积存情况,确保有足够的消息被生产和累积。

[*]分区数目与消费者线程数

[*]假如 Kafka topic 的分区数目较少,且每个消费者线程处置惩罚一个或多个分区,那么分区中的消息总量大概不足,导致每次 poll() 返回的消息数较少。
[*]办理方案:确保 topic 分区数目公道,而且适当增加消费者实例,以进步并行处置惩罚能力。

[*]**fetch.min.bytes**** 和 **fetch.max.wait.ms** 设置**

[*]fetch.min.bytes:消费者从 Kafka 获取消息时,设置了一个最小的字节数,当数据量不足时,Kafka 将等待更多的数据写入。
[*]fetch.max.wait.ms:当 fetch.min.bytes 的数据量未达到时,Kafka 消费者会等待一定的时间再返回消息。假如设置过短,大概会在消息不足时提早返回。
[*]优化思绪:增加 fetch.min.bytes 值,使消费者等待更多数据积聚后再返回。同时适当调解 fetch.max.wait.ms,确保不会过早返回消息。

[*]消费者的 **poll()** 调用频率

[*]消费者应用步伐调用 poll() 的频率也影响返回的消息数目。假如频繁调用 poll(),每次返回的消息大概较少。
[*]办理方案:适当调解 poll() 的调用频率,确保消费者等待足够的消息后再调用。

[*]**session.timeout.ms**** 和 **heartbeat.interval.ms** 设置**

[*]假如这些参数设置不妥,Kafka 消费者大概会因为过频繁发送心跳而导致每次 poll() 间隔较短,未能积聚足够的消息。
[*]办理方案:增加 session.timeout.ms 和 heartbeat.interval.ms 的值,答应消费者有足够时间 poll() 更多消息。

优化发起


[*]**增加 ****max.poll.records**

[*]将 max.poll.records 设置得更大,以确保每次 poll() 尽大概多地返回消息。比方,实验将其从默认的 500 增加到 1000 或更大。

[*]**调解 **fetch.min.bytes** 和 ****fetch.max.wait.ms**

[*]可以将 fetch.min.bytes 增加到 1MB 或 5MB,如许消费者将会等待 Kafka 网络到足够的消息再返回。也可以适当增加 fetch.max.wait.ms,让 Kafka 多等待一段时间再返回消息。

[*]监控消费者调用的频率

[*]适当降低 poll() 的调用频率,确保 Kafka 消费者有时间积聚足够的数据。

[*]增加分区数

[*]确保 Kafka topic 有足够的分区,使得每个分区中可以累积足够的消息。此外,可以考虑增加消费者数目来并行处置惩罚消息。

通过这些设置调解,你可以增加每次 poll() 获取的消息数目,从而进步批量处置惩罚效率。
参数含义

**fetch.max.wait.ms**



[*]描述: fetch.max.wait.ms 是 Kafka 消费者端的设置,表现当消费者从 Kafka broker 请求消息时,假如可用的数据量不足 fetch.min.bytes,消费者最多会等待的时间(毫秒)。当超出这个时间后,即使没有足够的数据,也会返回当前已经积聚的消息。
[*]用途:

[*]这个参数主要用于优化消费者在没有足够数据的情况下的等待时间。通过设定一个公道的 fetch.max.wait.ms,消费者可以等待更多数据积聚来进步吞吐量,但不会因数据不足无限等待。
[*]假如消息到达频率低,消费者就会等待 fetch.max.wait.ms 毫秒后返回;假如数据积聚足够快,消费者会尽早返回。

[*]默认值: 通常默认值是 500 毫秒。
[*]场景: 应用于从本地 Kafka broker 拉取消息的耽误和吞吐量控制。
**remote.fetch.max.wait.ms**



[*]描述: remote.fetch.max.wait.ms 是 Kafka 远程存储机制相关的一个参数。远程存储是在 Kafka 3.0 引入的架构优化,答应将过期的日志段(log segments)存储在远程存储介质上,如云存储(Amazon S3,Google Cloud Storage 等),而不是一直保存在本地磁盘上。remote.fetch.max.wait.ms 控制 Kafka 代理从远程存储拉取过期日志段时,最大等待的时间。
[*]用途:

[*]当消费者实验读取的数据已经从本地磁盘迁移到远程存储时,Kafka 代剖析从远程存储体系中拉取该段数据。remote.fetch.max.wait.ms 就是用来限定 Kafka 在从远程存储读取数据时的最大等待时间。
[*]远程存储拉取通常比本地拉取要慢,因为涉及外部存储体系,以是这个参数用于优化消费者在这种情况下的性能。

[*]场景: 应用于 Kafka 从远程存储拉取过期消息的等待时间控制。
fetch.min.bytes

fetch.min.bytes 是 Kafka 消费者端的一个设置参数,它用于控制每次从 Kafka broker 拉取数据时的最小数据量。这个参数决定了消费者拉取消息时的举动,影响数据批量处置惩罚的效率和耽误。
参数作用:



[*]功能:指定 Kafka broker 每次返回给消费者的消息的最小字节数。当消费者发起拉取请求时,Kafka broker 会等待消息积聚到至少 fetch.min.bytes 指定的字节数后,再将数据返回给消费者。
[*]假如 broker 在 fetch.max.wait.ms 时间内没有积聚到足够的数据(即 fetch.min.bytes),它会返回当前可用的数据量,即使小于 fetch.min.bytes。
默认值:



[*]默认值为 1,意味着 broker 不需要等待消息积聚到一定的字节数,只要有消息(即使只有一条消息),就可以立刻返回给消费者。
使用场景:


[*]高吞吐量场景:

[*]假如你的应用需要批量处置惩罚 Kafka 消息,可以将 fetch.min.bytes 设置得大一些,确保每次拉取的数据足够多,以淘汰频繁的网络请求。
[*]得当需要处置惩罚大批量数据的体系,比如数据分析或日志处置惩罚体系。这时,你可以设置较高的 fetch.min.bytes,让 Kafka broker 等待更多消息积聚后再返回。

[*]低耽误场景:

[*]假如你希望 Kafka 消息能尽快被消费,不希望消费者等待消息积聚,你可以将 fetch.min.bytes 保持默认值(1)。这时,broker 会在有数据可供消费时尽快返回,淘汰耽误。
[*]得当需要快速响应的体系,比如及时监控或流数据处置惩罚。

假如poll(100ms),fetch.max.wait.ms=500ms,那么100ms后mq未达到fetch.min.bytes。客户端会得到当前的records吗?

不会
例子:

假设有以下设置:


[*]fetch.min.bytes = 1MB
[*]fetch.max.wait.ms = 500ms
[*]消费者调用了 poll() 向 broker 请求数据。
情况1:Broker 上的消息量 < 1MB


[*]当消费者请求数据时,broker 上只有 500KB 的消息。
[*]Broker 会比及 fetch.max.wait.ms(500ms),试图等待更多消息的到达。
[*]假如在 500ms 内消息累积达到了 1MB,broker 会立刻返回这 1MB 的消息。
[*]假如 500ms 已往了,仍然没有足够的消息(比如只有 700KB),broker 会返回这些 700KB 的消息。
情况2:Broker 上的消息量 >= 1MB


[*]假如消费者请求时,broker 上已经有 1MB 或更多消息,broker 会立刻返回这些消息,不再等待 fetch.max.wait.ms。
**fetch.max.wait.ms**** 与 **fetch.min.bytes** 配合的意义:**



[*]**fetch.min.bytes** 设置了消费者希望每次拉取的最小数据量,如答应以制止频繁拉取少量消息,进步吞吐量。
[*]**fetch.max.wait.ms** 防止消费者因为等不到足够多的消息而无限期等待,设置了一个时间上限。假如在这个时间内没有足够的数据,broker 仍然会返回已有的数据,制止消费者一直没有数据处置惩罚。
fetch.max.wait.ms 与 poll的超时应该相当比力公道,如许poll不会在mq消息量不足的时候拉到空数据空跑浪费cpu资源?

不是
理解两者的关系:


[*]**fetch.max.wait.ms**:

[*]这是 Kafka broker 在消息不足时等待积聚更多数据的最大时间。假如在这段时间内没有更多数据到达,broker 会返回已经积聚的消息(即使小于 fetch.min.bytes)。
[*]主要目标是制止过于频繁的拉取请求,淘汰网络传输的开销,增加单次拉取的消息量。

[*]**poll()**** 超时时间**:

[*]这是 Kafka 消费者客户端在本地等待 broker 返回数据的最大时间。
[*]假如在 poll() 超时时间内 broker 没有返回数据,poll() 会返回空效果,而且消费者会继续下一轮 poll()。
[*]主要目标是控制消费者的等待时间,以确保在没有数据的情况下不会壅闭太久。

两者的配合:



[*]将 **fetch.max.wait.ms** 与 **poll()** 超时时间设置为相当:假如 poll() 的超时时间等于 fetch.max.wait.ms,理论上可以制止 poll() 过早返回空数据,因为 broker 正在等待积聚足够的数据。
这种设置的好处是可以淘汰消费者空跑的概率,尤其是在消息量较小的场景中。它确保了在 poll() 的等待时间内,broker 至少有足够的时间来积聚消息,最大限度地进步单次拉取的数据量。


[*]实际应用中的权衡:

[*]在实际场景中,将 **fetch.max.wait.ms** 设置略小于 **poll()** 超时时间大概是更公道的选择。比方,你可以设置 fetch.max.wait.ms 为 500ms,poll() 超时时间为 600ms。这种设置让 broker 有足够时间积聚数据,而且消费者 poll() 方法也有足够时间等待 broker 返回数据。
[*]假如 poll() 超时时间与 fetch.max.wait.ms 完全相当,有时大概会导致 poll() 稍微早于 broker 返回数据,从而造成一些无效的 poll() 调用。稍微延长 poll() 时间可以制止这一问题。

其他因素影响:



[*]消息吞吐量和耽误:过长的 fetch.max.wait.ms 和 poll() 时间会增加数据的累积量,但也大概增加处置惩罚耽误。假如应用对及时性要求较高,大概需要缩短这两个时间,使得消费者更频繁地获取消息。
[*]CPU 和网络资源:延长这两个时间可以淘汰空跑和频繁的拉取请求,从而节流 CPU 和网络资源。但假如设置过长,大概会造成消费者响应不及时,特别是当消息积存严肃时。
示例:

假设 fetch.max.wait.ms 设置为 500ms,poll() 超时时间设置为 600ms:


[*]假如在 500ms 内 broker 积聚了足够消息,broker 会立刻返回数据,消费者的 poll() 将会在接收到消息后立刻处置惩罚。
[*]假如 broker 在 500ms 内没有积聚到足够的消息,它会返回当前可用的数据,poll() 超时不会过早触发,确保了消费者不会空跑。
[*]假如 poll() 超时时间设置得比 fetch.max.wait.ms 短,消费者大概会在 broker 还未返回数据之前超时,导致空轮询。
总结:



[*]一般发起:可以将 poll() 超时时间设置得稍微大于 fetch.max.wait.ms,如答应以确保 broker 有足够时间积聚消息,同时制止 poll() 过早超时。
[*]公道的设置:fetch.max.wait.ms = 500ms,poll() 超时 = 600ms。如许 broker 可以最大限度积聚数据,消费者也有足够时间等待数据返回,制止空跑。
这种策略会帮助你在淘汰空轮询和进步批量处置惩罚效率之间找到均衡。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: kafka客户端消费者吞吐量优化