Kafka消费消息丢失排查,原因竟是groupId重复

打印 上一主题 下一主题

主题 845|帖子 845|积分 2535

现象

BI的同事发现某指标数据展示有问题,发现最近入库的数据缺失,然后反馈到DBA. 经DBA排查后发现原始数据缺少.
排查


  • 之前笔者在休假,同事初步排查怀疑是消息阻塞导致.经过代码调整发版之后发现还是有情况发生.
  • 笔者接手之后,在本地打印指定点位的消息,发现没有丢失消息的情况.(15分钟一条消息)
    于是在线上系统中添加了打印指定点位的日志.(发版下班)
  • 第二天,查看日志发现有缺失情况,本地打印继续开启 发现没有复现
    于是查询消费组,收集到一组 host(ip)
  1. @Test
  2.     public void showGroupInfo() throws ExecutionException, InterruptedException {
  3.         String id = "kafka消费组id";
  4.         DescribeConsumerGroupsResult describeConsumerGroupsResult = admin.describeConsumerGroups(Collections.singleton(id));
  5.         final KafkaFuture<Map<String, ConsumerGroupDescription>> all = describeConsumerGroupsResult.all();
  6.         final Map<String, ConsumerGroupDescription> stringConsumerGroupDescriptionMap = all.get();
  7.         final Set<Map.Entry<String, ConsumerGroupDescription>> entries1 = stringConsumerGroupDescriptionMap.entrySet();
  8.         for (Map.Entry<String, ConsumerGroupDescription> stringConsumerGroupDescriptionEntry : entries1) {
  9.             final ConsumerGroupDescription value = stringConsumerGroupDescriptionEntry.getValue();
  10.             final Collection<MemberDescription> members = value.members();
  11.             for (MemberDescription member : members) {
  12.                 String s1 = member.consumerId();
  13.                 String host = member.host();
  14.                 String s = member.clientId();
  15.                 String s2 = member.assignment().toString();
  16.                 System.out.printf("clientId[%s],memberId[%s],host[%s],assignment[%s]%n", s, s1, host, s2);
  17.             }
  18.         }
  19.     }
复制代码

  • 找运维同事查看ip之后发现了另一个项目,拉取jar反编译之后发现配置文件中kafka 消费者 groupId 配置相同
  • 反馈相关项目负责人
疑问

如何统一管理/监控 kafka group 划分 避免此类问题发生?
kafka groupId 是否要按照微服务项目来划分?

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

雁过留声

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表