ToB企服应用市场:ToB评测及商务社交产业平台

标题: Kafka 消耗者启动后与服务器的交互流程 [打印本页]

作者: 怀念夏天    时间: 2024-8-18 23:14
标题: Kafka 消耗者启动后与服务器的交互流程
Kafka 消耗者启动后与服务器的交互流程涉及多个关键步骤,主要包罗初始化、查找组和谐器、加入消耗者组、分区分配、心跳维持、拉取数据和提交偏移量等。以下是详细的流程阐明:
1. 初始化消耗者


2. 订阅主题


3. 查找组和谐器


  1. /**
  2.      * 表示 内部主题 __consumer_offsets 的分区数量,默认初始化值是50(顺带一提__consumer_offsets 副本因子默认值是3)
  3.      * 初始值为 -1,表示尚未设置。
  4.      * 使用 volatile 关键字确保在多线程环境中对该变量的修改是可见的。
  5.      */
  6.      private volatile int numPartitions = -1;
  7.     /**
  8.      * 内部主题 __consumer_offsets 的各个分区分布在各个Broker服务器上,算出当前消费者组的协调器在哪个服务器
  9.      * 消费者组协调器所在brokerId = 消费者组id的哈希值 % 50
  10.      */
  11.          coordinator_broker_id = Utils.abs(groupId.hashCode()) % numPartitions
  12.          
  13.          public static int abs(int n) {
  14.         return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
  15.     }
复制代码

4. 加入消耗者组


5. 开始消耗


6. 心跳维持和再平衡


7. 拉取数据


8. 提交偏移量


9. 处置惩罚再平衡


示例代码

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("group.id", "test-group");
  4. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  5. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  6. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  7. consumer.subscribe(Arrays.asList("test-topic")); // 订阅主题
  8. // 消费者加入消费者组并开始消费的过程是在第一次调用poll方法时触发的
  9. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  10. for (ConsumerRecord<String, String> record : records) {
  11.     System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  12. }
  13. // 提交偏移量
  14. consumer.commitSync();
复制代码
在上述代码中,消耗者通过调用subscribe方法订阅了主题test-topic,然后通过调用poll方法触发了加入消耗者组的完整流程,包罗查找组和谐器、加入消耗者组、分区分配、拉取数据和提交偏移量等步骤。
总结

Kafka消耗者启动后与服务器的交互流程是一个复杂的过程,涉及与组和谐器的多次交互。这个流程确保了消耗者可以或许正确地加入消耗者组,分区可以或许被公道地分配给消耗者组内的消耗者,并且在消耗者组成员变化时可以或许进行适当的再平衡,同时保证了消耗者可以或许从正确的位置继续消耗数据。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4