Kafka消息能正常发送,但是无法消费题目排查

打印 上一主题 下一主题

主题 885|帖子 885|积分 2655

这里是小奏,觉得文章不错可以关注公众号小奏技术
  kafka version



  • kafka_2.13-3.5.0
背景

线上的kafka集群要进行扩容,原先的2broker,扩容之后变成了新增3个broker,然后下掉了原先老的broker。
新集群看着没题目,但是出现了一个题目。
消息发送都正常,但是消息消费异常
排查

消费端打印log

消费者启动后不停打印如下关键log
  1. 2024-06-14 09:54:23 DEBUG o.a.k.c.c.i.ConsumerCoordinator:887 - [Consumer clientId=consumer-gid_abaca-1, groupId=gid_abaca] Received FindCoordinator response ClientResponse(receivedTimeMs=1718330063896, latencyMs=285, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=consumer-gid_xiaozou-1, correlationId=61, headerVersion=2), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='gid_xiaozou', nodeId=-1, host='', port=-1, errorCode=15, errorMessage='')]))
  2. 2024-06-14 09:54:23 DEBUG o.a.k.c.c.i.ConsumerCoordinator:914 - [Consumer clientId=consumer-gid_abaca-1, groupId=gid_abaca] Group coordinator lookup failed:
  3. 2024-06-14 09:54:23 DEBUG o.a.k.c.c.i.ConsumerCoordinator:276 - [Consumer clientId=consumer-gid_abaca-1, groupId=gid_abaca] Coordinator discovery failed, refreshing metadata
  4. org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
复制代码
可以看到关键的log是CoordinatorNotAvailableException: The coordinator is not available.
检察消费者状态

进入到kafka进去bin目录实行如下脚本
  1. ./kafka-consumer-groups.sh --bootstrap-server <bootstrap.servers> --describe --group <your-consumer-group-id>
复制代码
输出如下信息
  1. Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Call(callName=describeGroups(api=FIND_COORDINATOR), deadlineMs=1718330591913, tries=42094, nextAllowedTryMs=1718330592017) timed out at 1718330591917 after 42094 attempt(s)
  2. java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeGroups(api=FIND_COORDINATOR), deadlineMs=1718330591913, tries=42094, nextAllowedTryMs=1718330592017) timed out at 1718330591917 after 42094 attempt(s)
  3.         at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
  4.         at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
  5.         at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
  6.         at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$describeConsumerGroups$1(ConsumerGroupCommand.scala:543)
  7.         at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
  8.         at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
  9.         at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.map(JavaCollectionWrappers.scala:313)
  10.         at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeConsumerGroups(ConsumerGroupCommand.scala:542)
  11.         at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:558)
  12.         at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:367)
  13.         at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:72)
  14.         at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
  15.         at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
  16. Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeGroups(api=FIND_COORDINATOR), deadlineMs=1718330591913, tries=42094, nextAllowedTryMs=1718330592017) timed out at 1718330591917 after 42094 attempt(s)
  17. Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled describeGroups(api=FIND_COORDINATOR) request with correlation id 42096 due to node 6 being disconnected
复制代码
看着是查询不到comsumer group的信息,然后报了TimeoutException异常,没有什么有效的信息
什么情况会出现 The coordinator is not available

网上有一种说法是假如broker配置文件中的offsets.topic.replication.factor必须小于等于broker的数量,否则会出现The coordinator is not available的题目。
比如broker的数量是3,offsets.topic.replication.factor是4,就会出现这个题目。
实际我这边是不会出现这个题目,由于我的broker数量是3,offsets.topic.replication.factor配置的是2
删除topic重建是否可行

删除topic也没用,照旧会出现The coordinator is not available的题目
是否是__consumer_offsets出了题目

kafka的消费位点主要通过__consumer_offsets这个topic来管理
由于本次为了迁移简单,同时线上的业务数据可以丢弃,所以并没有进行topic分区迁移,而是直接删除了topic,然后重建topic。
然后我们检察__consumer_offsets这个topic的分区情况

点进topic详情发现 partition 照旧仅在老的broker中,即已经下掉的topic。这就导致新的broker没有__consumer_offsets这个topic
办理方式

删撤除__consumer_offsets这个topic,然后系统会自动重建,就可以办理了。
假如想更平滑的方式也可以思量对__consumer_offsets进行分区迁移,注意重建后使用原来的comsumer group好像消费照旧会有题目,假如使用原来的comsumer group照旧消费异常,换个comsumer group就好了
总结

假如kafka能正常发送消息,但是消费异常,一般是消费位点出现了题目,即管理消费位点__consumer_offsets的这个toipc
现在来看新增了3个broker kafka并没有自动对__consumer_offsets进行分区迁移,须要手动进行迁移
所以后续出现消费相关的题目可以优先检查__consumer_offsets这个topic的情况,毕竟kafka得消费位点都依靠于__consumer_offsets这个topic

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

罪恶克星

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表