【kafka实战】05 Kafka消费者消费消息过程源码分析

打印 上一主题 下一主题

主题 989|帖子 989|积分 2967

1. 概述

Kafka消费者(Consumer)是Kafka系统中负责从Kafka集群中拉取消息的客户端组件。消费者消费消息的过程涉及多个步调,包括消费者组的协调、分区分配、消息拉取、消息处置惩罚等。本文将深入分析Kafka消费者消费消息的源码,并结合相干原理图进行解说。
以下是一个使用 Java 编写的 KafkaConsumer 的示例。在这个示例中,我们将创建一个简朴的 Kafka 消费者,连接到 Kafka 集群,订阅一个主题,并消费该主题中的消息。
1.1 消费者代码使用示例



  • 已经安装并启动了 Kafka 集群。
  • 你已经添加了 Kafka 客户端依赖到你的项目中。如果你使用 Maven,可以在 pom.xml 中添加以下依赖:
  1. <dependency>
  2.     <groupId>org.apache.kafka</groupId>
  3.     <artifactId>kafka-clients</artifactId>
  4.     <version>3.6.0</version>
  5. </dependency>
复制代码
示例代码

  1. import org.apache.kafka.clients.consumer.*;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import java.time.Duration;
  4. import java.util.Collections;
  5. import java.util.Properties;
  6. public class KafkaConsumerExample {
  7.     private static final String TOPIC_NAME = "test-topic";
  8.     private static final String BOOTSTRAP_SERVERS = "localhost:9092";
  9.     private static final String GROUP_ID = "test-group";
  10.     public static void main(String[] args) {
  11.         // 配置消费者属性
  12.         Properties props = new Properties();
  13.         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
  14.         props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
  15.         // 自动提交偏移量
  16.         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
  17.         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  18.         // 键和值的反序列化器
  19.         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  20.         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  21.         // 创建 Kafka 消费者实例
  22.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  23.         try {
  24.             // 订阅主题
  25.             consumer.subscribe(Collections.singletonList(TOPIC_NAME));
  26.             // 持续消费消息
  27.             while (true) {
  28.                 // 从 Kafka 拉取消息
  29.                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
  30.                 for (ConsumerRecord<String, String> record : records) {
  31.                     System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
  32.                 }
  33.             }
  34.         } catch (Exception e) {
  35.             e.printStackTrace();
  36.         } finally {
  37.             // 关闭消费者
  38.             consumer.close();
  39.         }
  40.     }
  41. }
复制代码
代码解释


  • 配置消费者属性

    • BOOTSTRAP_SERVERS_CONFIG:指定 Kafka 集群的地址。
    • GROUP_ID_CONFIG:指定消费者所属的消费组。
    • ENABLE_AUTO_COMMIT_CONFIG:设置是否自动提交偏移量。
    • AUTO_COMMIT_INTERVAL_MS_CONFIG:设置自动提交偏移量的时间间隔。
    • KEY_DESERIALIZER_CLASS_CONFIG 和 VALUE_DESERIALIZER_CLASS_CONFIG:指定键和值的反序列化器。

  • 创建 Kafka 消费者实例:使用配置好的属性创建 KafkaConsumer 实例。
  • 订阅主题:使用 subscribe 方法订阅指定的主题。
  • 连续消费消息:使用 poll 方法从 Kafka 拉取消息,并遍历消费记录,打印消息的偏移量、键和值。
  • 关闭消费者:在消费完成后,使用 close 方法关闭消费者。
注意事项



  • 确保 Kafka 集群的地址和主题名称精确。
  • 如果需要手动提交偏移量,可以将 ENABLE_AUTO_COMMIT_CONFIG 设置为 false,并使用 commitSync() 或 commitAsync() 方法手动提交偏移量。
2. Kafka消费者消费消息的核心流程

Kafka消费者消费消息的核心流程可以分为以下几个步调:

  • 消费者组协调:消费者加入消费者组,并与组协调器(GroupCoordinator)进行协调。
  • 分区分配:组协调器为消费者分配分区。
  • 消息拉取:消费者从分配的分区中拉取消息。
  • 消息处置惩罚:消费者处置惩罚拉取到的消息。
  • 提交偏移量:消费者提交已处置惩罚消息的偏移量。
下面我们将结合源码具体分析每个步调。

3. 源码分析


关键组件阐明


  • ConsumerCoordinator

    • 负责消费者组的协调和分区分配。
    • 管理消费者的心跳和重平衡。

  • Fetcher

    • 负责从 Kafka Broker 拉取消息。
    • 管理拉取哀求和相应的处置惩罚。

  • SubscriptionState

    • 管理消费者订阅的主题和分区。
    • 记录消费者的消费偏移量。

  • PartitionAssignor

    • 负责分区分配计谋的实现。

  • OffsetCommitCallback

    • 处置惩罚偏移量提交的回调逻辑。


3.1 消费者组协调

消费者在启动时,起首需要加入消费者组,并与组协调器进行协调。组协调器负责管理消费者组的成员和分区分配。
  1. // org.apache.kafka.clients.consumer.KafkaConsumer#subscribe
  2. public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
  3.     // 1. 订阅主题
  4.     this.subscriptions.subscribe(new HashSet<>(topics), listener);
  5.    
  6.     // 2. 加入消费者组
  7.     coordinator.subscribe(subscriptions);
  8. }
复制代码
在subscribe方法中,消费者起首订阅指定的主题,然后通过coordinator.subscribe方法加入消费者组。组协调器会为消费者分配一个唯一的memberId,并将其加入到消费者组中。
3.2 分区分配

组协调器在消费者加入消费者组后,会为消费者分配分区。分区分配计谋由PartitionAssignor决定,Kafka提供了多种内置的分区分配计谋,如RangeAssignor、RoundRobinAssignor等。
  1. // org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensurePartitionAssignment
  2. private void ensurePartitionAssignment() {
  3.     // 1. 获取分区分配结果
  4.     Map<String, List<TopicPartition>> assignments = partitionAssignor.assign(metadata.fetch(), subscriptions.subscription());
  5.    
  6.     // 2. 更新消费者的分区分配
  7.     subscriptions.assignFromSubscribed(assignments.get(consumerId));
  8. }
复制代码
在ensurePartitionAssignment方法中,组协调器通过partitionAssignor.assign方法为消费者分配分区,并将分配结果更新到消费者的订阅信息中。
3.3 消息拉取

消费者在分配到分区后,会从分配的分区中拉取消息。Kafka消费者采用拉取模式(Pull Model),即消费者主动从Kafka集群中拉取消息。
  1. // org.apache.kafka.clients.consumer.KafkaConsumer#poll
  2. public ConsumerRecords<K, V> poll(Duration timeout) {
  3.     // 1. 拉取消息
  4.     Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);
  5.    
  6.     // 2. 返回拉取到的消息
  7.     return new ConsumerRecords<>(records);
  8. }
复制代码
在poll方法中,消费者通过fetcher.fetchRecords方法从Kafka集群中拉取消息。fetcher是Kafka消费者中的一个重要组件,负责管理消息的拉取和偏移量的提交。
3.4 消息处置惩罚

消费者在拉取到消息后,会对消息进行处置惩罚。消息处置惩罚的具体逻辑由用户自定义,通常包括消息的反序列化、业务逻辑处置惩罚等。
  1. // org.apache.kafka.clients.consumer.KafkaConsumer#poll
  2. public ConsumerRecords<K, V> poll(Duration timeout) {
  3.     // 1. 拉取消息
  4.     Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);
  5.    
  6.     // 2. 处理消息
  7.     for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
  8.         for (ConsumerRecord<K, V> record : entry.getValue()) {
  9.             // 用户自定义的消息处理逻辑
  10.             processRecord(record);
  11.         }
  12.     }
  13.    
  14.     // 3. 返回拉取到的消息
  15.     return new ConsumerRecords<>(records);
  16. }
复制代码
在poll方法中,消费者通过processRecord方法处置惩罚每条消息。processRecord方法的具体实现由用户自定义。
3.5 提交偏移量

消费者在处置惩罚完消息后,需要提交已处置惩罚消息的偏移量。偏移量的提交可以手动或自动进行,Kafka提供了多种偏移量提交计谋,如自动提交、同步提交、异步提交等。
  1. // org.apache.kafka.clients.consumer.KafkaConsumer#commitSync
  2. public void commitSync() {
  3.     // 1. 提交偏移量
  4.     coordinator.commitOffsetsSync(subscriptions.allConsumed());
  5. }
复制代码
在commitSync方法中,消费者通过coordinator.commitOffsetsSync方法同步提交偏移量。同步提交会阻塞当前线程,直到偏移量提交乐成。
4. 原理图

以下是Kafka消费者消费消息的核心流程表现图:
  1. +-------------------+       +-------------------+       +-------------------+
  2. |                   |       |                   |       |                   |
  3. |  消费者组协调      | ----> |     分区分配       | ----> |     消息拉取       |
  4. |                   |       |                   |       |                   |
  5. +-------------------+       +-------------------+       +-------------------+
  6.                                                                       |
  7.                                                                       v
  8. +-------------------+       +-------------------+       +-------------------+
  9. |                   |       |                   |       |                   |
  10. |     消息处理       | <---- |     提交偏移量     | <---- |     网络传输       |
  11. |                   |       |                   |       |                   |
  12. +-------------------+       +-------------------+       +-------------------+
复制代码
5. 总结

Kafka消费者消费消息的过程涉及多个步调,包括消费者组的协调、分区分配、消息拉取、消息处置惩罚和偏移量提交。通过源码分析,我们可以更深入地理解Kafka消费者的工作原理。希望本文可以或许帮助你更好地理解Kafka消费者的内部机制。
6. 参考



  • Kafka官方文档
  • Kafka源码

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

滴水恩情

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表