kafka单条消息过大发送失败

打印 上一主题 下一主题

主题 840|帖子 840|积分 2520

一、配景

生产环境中利用kafka作为消息队列,生产者发送消息失败,查询报错日志,得到如下输出:
   Caused by: org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 4067035 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
  这个错误信息表明生产者实验发送到Kafka的消息在序列化后巨细为4067035字节(约4MB),超过了Kafka配置中的max.request.size参数的限制,该参数当前设置为1048576字节(1MB)
二、缘故原由

排查kafka的max.request.size

排查发现最大的哀求巨细为10485760字节(10M)
排查springboot配置文件中max.request.size的值

项目中没有配置这个值
排查springboot默认kafka的max.request.size值

springboot中Kafka消息队列的默认max.request.size值通常是1048576字节,即1MB
好了就是这个缘故原由了
三、修改方案

要解决这个问题,通常有以下几种选择:
1. 增加kafka自己的message.max.bytes配置

  1. message.max.bytes=10485760 # 10MB
复制代码
当然也可以为特定的主题设置这个值:
  1. kafka-topics.sh --alter --zookeeper localhost:2181 --topic your_topic --config max.message.bytes=10485760
复制代码
2. 增加生产者max.request.size配置

如果是代码则配置如下
  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  4. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  5. props.put("max.request.size", 10485760 ); // 10MB
复制代码
如果是yml 文件则配置如下 
  1. spring:
  2.   kafka:
  3.     producer:
  4.       properties:
  5.         max.request.size: 10485760
复制代码
3. 增加消耗者的fetch.max.bytes和max.partition.fetch.bytes配置

如果是代码则配置如下
  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("group.id", "test-group");
  4. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  5. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  6. props.put("fetch.max.bytes", 10485760); // 10MB
  7. props.put("max.partition.fetch.bytes", 10485760); // 10MB
复制代码
如果是yml 文件则配置如下
  1. spring:
  2.   kafka:
  3.     consumer:
  4.       properties:
  5.         fetch.max.bytes: 10485760
  6.         max.partition.fetch.bytes: 10485760
复制代码
4. 分割消息

如果消息非常大,可能须要思量将其分割成较小的部门,然后在消耗者端重新组装。这种方法可以制止配置过大的消息巨细限制。
5.生产者压缩消息

你可以启用消息压缩来减少消息的巨细。Kafka支持多种压缩算法,如gzip、snappy、lz4等。我们可以在生产者配置中启用压缩:
 如果是代码则配置如下:
  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  4. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  5. props.put("compression.type", "gzip");
复制代码
如果是yml 文件则配置如下
  1. spring:
  2.   kafka:
  3.     producer:
  4.       properties:
  5.         compression.type: gzip
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

雁过留声

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表