kafka消费者接收不到消息

打印 上一主题 下一主题

主题 547|帖子 547|积分 1641

背景:

   对kafka消息举行监听,生产者发了消息,但是消费端没有接到消息,监听代码
消费端,kafka配置
spring.kafka.bootstrap-servers=kafka.cestc.dmp:9591
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="Kafka#Cestc2021";
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
#=============== provider =======================
spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=dq
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

  1. @KafkaListener(groupId = "${spring.kafka.consumer.group-id:dq}",topics = {"t_dq_rwzt_topic"})
  2. public ReturnT<String> listenKafka2(String records, Acknowledgment ack) {
复制代码
}
offset explorer发现生产者发送了消息,offset是0

标题办理:

厥后查看生产者kafka配置,发现他们的enable-auto-commit是false:
spring.kafka.consumer.enable-auto-commit=false

修改kafka配置
spring.kafka.consumer.enable-auto-commit=false
# 在侦听器容器中运行的线程数
spring.kafka.listener.concurrency=5
# listner负责ack,每调用commit方法,立即向服务器提交
spring.kafka.listener.ack-mode=manual_immediate

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

怀念夏天

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表