Spring-Kafka确认机制报错:No Acknowledgment available as an argument ...

耶耶耶耶耶  金牌会员 | 2024-9-6 05:39:02 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 879|帖子 879|积分 2637

问题出现

在spring boot集成kafka时报错,报错信息:
  1. No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
复制代码
翻译: Acknowledgment  参数不可用,监听容器需要设置一个手动的 AckMode 才能添补Acknowledgment。
问题代码:
  1. @KafkaListener(topics = PENDING_TOPIC, groupId = KafkaProducer.TOPIC_GROUP1)
  2.     public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
  3.         Optional message = Optional.ofNullable(record.value());
  4.         if (message.isPresent()) {
  5.             Object msg = message.get();
  6.             logger.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);
  7.             ack.acknowledge();
  8.         }
  9.     }
复制代码
这是参考其他人的消耗者端的代码,方法里有Acknowledgment 范例参数,说明该参数没有被识别。首先查一下这个参数的含义。
问题分析

Acknowledgment  范例参数的含义:
控制消息的确认机制。在kafka中,消耗者提交偏移量时需要确认,执行ack.acknowledge();后代表告诉 Kafka 消耗者组,当前的消息已经被成功处理并且可以提交偏移量。这意味着消耗者不会再次处理这条消息。
Spring-kafka 提供了多种确认机制,也就是 AckMode。官方文档表现AckMode 有以下几种:



  • RECORD: 当监听器处理完记载返回时,提交偏移量.
  • BATCH: 处理完 poll() 返回的全部记载后,提交偏移量。
  • TIME: 当poll()返回的全部记载都已处理完毕后,只要超过上次提交后的 ackTime,就提交偏移量。
  • COUNT: 当poll()返回的全部记载都已处理完毕后,只要上次提交后已收到 ackCount 记载,就提交偏移量。
  • COUNT_TIME: 与 TIME 和 COUNT 类似,但如果任一条件为真,都会执行提交。
  • MANUAL: 处理完一批数据后,手动调用 Acknowledgment.acknowledge()方法将offest提交至缓存,之后在下一个poll() 之前用BATCH 方式提交。
  • MANUAL_IMMEDIATE: 当监听器调用 Acknowledgment.acknowledge() 方法时,立即提交偏移量。
报错缘故原由:

从上面的会合确认机制中可以看到,只有MANUAL 和MANUAL_IMMEDIATE 用到了Acknowledgment 其他都没有。以是需要把AckMode 配置成这两个其中一个才行,而查看配置文件发现我确实没有配。
解决办法:

在application.yaml 中配置ack-mode: manual_immediate 。
在自界说配置AckMode 的时候,首先需要将enable-auto-commit 设置成false 才行,2.3版本之后默认是false,不过利用的默认机制是BATCH,以是还需要将ack-mode设成manual_immediate 来覆盖默认配置。

  1. spring:
  2.   kafka:
  3.     bootstrap-servers: localhost:9092
  4.     consumer:
  5.       auto-offset-reset: earliest
  6.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  7.         # 值的反序列化方式
  8.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  9.       group-id: geo-friend
  10.       enable-auto-commit: false # 设置成手动提交
  11.       auto-commit-interval: 1S
  12.     producer:
  13.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  14.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  15.     listener:        # 配置监听容器的ackmode
  16.       concurrency: 5
  17.       ack-mode: manual_immediate
  18.       missing-topics-fatal: false
复制代码
参考资料:
spring-kafka 文档:https://docs.spring.io/spring-kafka/docs/2.8.2/reference/html/#committing-offsets

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

耶耶耶耶耶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表