Kafka 消耗者启动后与服务器的交互流程

打印 上一主题 下一主题

主题 547|帖子 547|积分 1641

Kafka 消耗者启动后与服务器的交互流程涉及多个关键步骤,主要包罗初始化、查找组和谐器、加入消耗者组、分区分配、心跳维持、拉取数据和提交偏移量等。以下是详细的流程阐明:
1. 初始化消耗者



  • 创建消耗者实例:应用步调通过调用KafkaConsumer的构造函数,传入配置参数创建消耗者实例。
  • 配置参数:包罗bootstrap.servers(Kafka集群地址)、group.id(消耗者组ID)、key.deserializer、value.deserializer等。
2. 订阅主题



  • 调用subscribe方法:消耗者通过调用subscribe方法订阅一个或多个主题,也可以使用正则表达式来匹配多个主题。
3. 查找组和谐器



  • 消耗者发送FindCoordinator请求:消耗者向Kafka集群中的任意Broker发送FindCoordinator请求,请求中包含消耗者组ID。
  • Broker服务器接收请求:Broker根据消耗者组ID盘算出组和谐器所在的Broker节点,并返回该节点的地址信息。
    - 盘算组和谐器算法:
  1. /**
  2.      * 表示 内部主题 __consumer_offsets 的分区数量,默认初始化值是50(顺带一提__consumer_offsets 副本因子默认值是3)
  3.      * 初始值为 -1,表示尚未设置。
  4.      * 使用 volatile 关键字确保在多线程环境中对该变量的修改是可见的。
  5.      */
  6.      private volatile int numPartitions = -1;
  7.     /**
  8.      * 内部主题 __consumer_offsets 的各个分区分布在各个Broker服务器上,算出当前消费者组的协调器在哪个服务器
  9.      * 消费者组协调器所在brokerId = 消费者组id的哈希值 % 50
  10.      */
  11.          coordinator_broker_id = Utils.abs(groupId.hashCode()) % numPartitions
  12.          
  13.          public static int abs(int n) {
  14.         return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
  15.     }
复制代码


  • 消耗者毗连组和谐器

    • 消耗者根据FindCoordinator响应中的地址信息,毗连到组和谐器。

4. 加入消耗者组



  • Kafka消耗者加入消耗者组的过程主要涉及JoinGroup和SyncGroup两个关键步骤。这个过程确保消耗者可以或许以和谐的方式加入消耗者组,并且分区可以或许被公道地分配给消耗者组内的消耗者。以下是JoinGroup和SyncGroup的详细流程:
  • 阶段一:JoinGroup阶段

    • (1)发送JoinGroup请求:当消耗者启动并调用poll方法时,如果它尚未加入消耗者组,或者必要重新加入(比方,因为再平衡),它会向组和谐器发送JoinGroup请求。这个请求包含消耗者的group.id、订阅的主题列表以及消耗者使用的分区分配策略。
    • (2)等待响应:组和谐器收到JoinGroup请求后,会等待一段时间,以允许其他消耗者也发送他们的JoinGroup请求。这个等待时间是为了收集同一消耗者组内所有消耗者的信息。
    • (3)选择Leader:对于同一个消耗者组的第一次JoinGroup请求,和谐器会选择第一个消耗者作为Leader。Leader负责为组内的所有消耗者分配分区。Leader的选择基于消耗者的JoinGroup请求次序。
    • (4)分区分配策略:Leader消耗者收到和谐器的响应后,会根据提供的分区分配策略(如Range、RoundRobin等)和所有消耗者的订阅信息来决定分区的分配方案。

  • 阶段二:SyncGroup阶段

    • (1)发送SyncGroup请求:Leader消耗者将分区分配方案通过SyncGroup请求发送给组和谐器。随后,组内的其他消耗者也发送SyncGroup请求,但不包含分区分配方案。
    • (2)和谐器广播分区分配方案:组和谐器接收到SyncGroup请求后,将leader消耗者的分区分配方案广播给消耗者组内的所有消耗者。

5. 开始消耗



  • 消耗者接收分区分配:每个消耗者接收到SyncGroup响应后,会知道自己被分配到了哪些分区。
  • 初始化分区消耗:消耗者根据分配到的分区,初始化分区消耗的相关资源,如设置分区的偏移量。
  • 拉取数据并消耗:消耗者开始从分配给它的分区拉取数据并进行消耗。
6. 心跳维持和再平衡



  • 发送心跳:消耗者会定期向组和谐器发送心跳,以表明它仍然活泼。
  • 处置惩罚再平衡:如果有新的消耗者加入或现有消耗者离开消耗者组,和谐器会触发再平衡过程,重新分配分区。
7. 拉取数据



  • 发送Fetch请求:消耗者向分配给它的分区的Leader Broker发送Fetch请求,请求包含拉取数据的偏移量。
  • 接收数据:Broker返回包含消息的响应,消耗者处置惩罚这些消息。
8. 提交偏移量



  • 主动提交:如果启用了主动提交(enable.auto.commit=true),消耗者会定期主动提交消耗的偏移量。
  • 手动提交:如果使用手动提交,消耗者必要调用commitSync或commitAsync方法提交偏移量。
9. 处置惩罚再平衡



  • 再平衡触发:当消耗者组成员发生变化,和谐器会触发再平衡。


      • 消耗者加入消耗者组
        当新的消耗者加入现有的消耗者组时,会触发再平衡。新消耗者大概是新启动的实例,或者是之前失败后重新加入的实例。


      • 消耗者离开消耗者组
           

      • 主动离开:消耗者调用close方法或者主动离开消耗者组时,会触发再平衡。
      • 被动离开:如果消耗者因为网络问题、瓦解或者长时间未发送心跳而被组和谐器以为已经离开,也会触发再平衡。


      • 订阅主题的分区数变化
        如果消耗者组订阅的主题新增了分区,那么为了将新增的分区分配给消耗者,也会触发再平衡。


      • 消耗者订阅模式变化
        如果消耗者组内的任何消耗者更改了其订阅模式(比方,通过subscribe方法订阅了新的主题或者取消订阅了某些主题),这也会触发再平衡。


      • 组和谐器变更
        如果负责管理消耗者组的组和谐器(Group Coordinator)发生变化(比方,因为原和谐器所在的Broker宕机),新的和谐器在接管消耗者组管理职责时,会触发再平衡。


      • 主题元数据变化
        消耗者定期从Broker获取订阅主题的元数据(如分区信息)。如果检测到元数据变化,大概会触发再平衡,只管这种情况较少见。


  • 停息拉取:在再平衡期间,消耗者会停息拉取数据。
  • 重新分配分区:和谐器重新分配分区,并关照消耗者新的分区分配情况。
  • 规复拉取:再平衡完成后,消耗者规复拉取数据。
示例代码

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("group.id", "test-group");
  4. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  5. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  6. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  7. consumer.subscribe(Arrays.asList("test-topic")); // 订阅主题
  8. // 消费者加入消费者组并开始消费的过程是在第一次调用poll方法时触发的
  9. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  10. for (ConsumerRecord<String, String> record : records) {
  11.     System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  12. }
  13. // 提交偏移量
  14. consumer.commitSync();
复制代码
在上述代码中,消耗者通过调用subscribe方法订阅了主题test-topic,然后通过调用poll方法触发了加入消耗者组的完整流程,包罗查找组和谐器、加入消耗者组、分区分配、拉取数据和提交偏移量等步骤。
总结

Kafka消耗者启动后与服务器的交互流程是一个复杂的过程,涉及与组和谐器的多次交互。这个流程确保了消耗者可以或许正确地加入消耗者组,分区可以或许被公道地分配给消耗者组内的消耗者,并且在消耗者组成员变化时可以或许进行适当的再平衡,同时保证了消耗者可以或许从正确的位置继续消耗数据。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

怀念夏天

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表