1. 概述
Kafka消费者(Consumer)是Kafka系统中负责从Kafka集群中拉取消息的客户端组件。消费者消费消息的过程涉及多个步调,包括消费者组的协调、分区分配、消息拉取、消息处置惩罚等。本文将深入分析Kafka消费者消费消息的源码,并结合相干原理图进行解说。
以下是一个使用 Java 编写的 KafkaConsumer 的示例。在这个示例中,我们将创建一个简朴的 Kafka 消费者,连接到 Kafka 集群,订阅一个主题,并消费该主题中的消息。
1.1 消费者代码使用示例
- 已经安装并启动了 Kafka 集群。
- 你已经添加了 Kafka 客户端依赖到你的项目中。如果你使用 Maven,可以在 pom.xml 中添加以下依赖:
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>3.6.0</version>
- </dependency>
复制代码 示例代码
- import org.apache.kafka.clients.consumer.*;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import java.time.Duration;
- import java.util.Collections;
- import java.util.Properties;
- public class KafkaConsumerExample {
- private static final String TOPIC_NAME = "test-topic";
- private static final String BOOTSTRAP_SERVERS = "localhost:9092";
- private static final String GROUP_ID = "test-group";
- public static void main(String[] args) {
- // 配置消费者属性
- Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
- props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
- // 自动提交偏移量
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
- props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
- // 键和值的反序列化器
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- // 创建 Kafka 消费者实例
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- try {
- // 订阅主题
- consumer.subscribe(Collections.singletonList(TOPIC_NAME));
- // 持续消费消息
- while (true) {
- // 从 Kafka 拉取消息
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- // 关闭消费者
- consumer.close();
- }
- }
- }
复制代码 代码解释
- 配置消费者属性:
- BOOTSTRAP_SERVERS_CONFIG:指定 Kafka 集群的地址。
- GROUP_ID_CONFIG:指定消费者所属的消费组。
- ENABLE_AUTO_COMMIT_CONFIG:设置是否自动提交偏移量。
- AUTO_COMMIT_INTERVAL_MS_CONFIG:设置自动提交偏移量的时间间隔。
- KEY_DESERIALIZER_CLASS_CONFIG 和 VALUE_DESERIALIZER_CLASS_CONFIG:指定键和值的反序列化器。
- 创建 Kafka 消费者实例:使用配置好的属性创建 KafkaConsumer 实例。
- 订阅主题:使用 subscribe 方法订阅指定的主题。
- 连续消费消息:使用 poll 方法从 Kafka 拉取消息,并遍历消费记录,打印消息的偏移量、键和值。
- 关闭消费者:在消费完成后,使用 close 方法关闭消费者。
注意事项
- 确保 Kafka 集群的地址和主题名称精确。
- 如果需要手动提交偏移量,可以将 ENABLE_AUTO_COMMIT_CONFIG 设置为 false,并使用 commitSync() 或 commitAsync() 方法手动提交偏移量。
2. Kafka消费者消费消息的核心流程
Kafka消费者消费消息的核心流程可以分为以下几个步调:
- 消费者组协调:消费者加入消费者组,并与组协调器(GroupCoordinator)进行协调。
- 分区分配:组协调器为消费者分配分区。
- 消息拉取:消费者从分配的分区中拉取消息。
- 消息处置惩罚:消费者处置惩罚拉取到的消息。
- 提交偏移量:消费者提交已处置惩罚消息的偏移量。
下面我们将结合源码具体分析每个步调。
3. 源码分析
关键组件阐明
- ConsumerCoordinator:
- 负责消费者组的协调和分区分配。
- 管理消费者的心跳和重平衡。
- Fetcher:
- 负责从 Kafka Broker 拉取消息。
- 管理拉取哀求和相应的处置惩罚。
- SubscriptionState:
- 管理消费者订阅的主题和分区。
- 记录消费者的消费偏移量。
- PartitionAssignor:
- OffsetCommitCallback:
3.1 消费者组协调
消费者在启动时,起首需要加入消费者组,并与组协调器进行协调。组协调器负责管理消费者组的成员和分区分配。
- // org.apache.kafka.clients.consumer.KafkaConsumer#subscribe
- public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
- // 1. 订阅主题
- this.subscriptions.subscribe(new HashSet<>(topics), listener);
-
- // 2. 加入消费者组
- coordinator.subscribe(subscriptions);
- }
复制代码 在subscribe方法中,消费者起首订阅指定的主题,然后通过coordinator.subscribe方法加入消费者组。组协调器会为消费者分配一个唯一的memberId,并将其加入到消费者组中。
3.2 分区分配
组协调器在消费者加入消费者组后,会为消费者分配分区。分区分配计谋由PartitionAssignor决定,Kafka提供了多种内置的分区分配计谋,如RangeAssignor、RoundRobinAssignor等。
- // org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensurePartitionAssignment
- private void ensurePartitionAssignment() {
- // 1. 获取分区分配结果
- Map<String, List<TopicPartition>> assignments = partitionAssignor.assign(metadata.fetch(), subscriptions.subscription());
-
- // 2. 更新消费者的分区分配
- subscriptions.assignFromSubscribed(assignments.get(consumerId));
- }
复制代码 在ensurePartitionAssignment方法中,组协调器通过partitionAssignor.assign方法为消费者分配分区,并将分配结果更新到消费者的订阅信息中。
3.3 消息拉取
消费者在分配到分区后,会从分配的分区中拉取消息。Kafka消费者采用拉取模式(Pull Model),即消费者主动从Kafka集群中拉取消息。
- // org.apache.kafka.clients.consumer.KafkaConsumer#poll
- public ConsumerRecords<K, V> poll(Duration timeout) {
- // 1. 拉取消息
- Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);
-
- // 2. 返回拉取到的消息
- return new ConsumerRecords<>(records);
- }
复制代码 在poll方法中,消费者通过fetcher.fetchRecords方法从Kafka集群中拉取消息。fetcher是Kafka消费者中的一个重要组件,负责管理消息的拉取和偏移量的提交。
3.4 消息处置惩罚
消费者在拉取到消息后,会对消息进行处置惩罚。消息处置惩罚的具体逻辑由用户自定义,通常包括消息的反序列化、业务逻辑处置惩罚等。
- // org.apache.kafka.clients.consumer.KafkaConsumer#poll
- public ConsumerRecords<K, V> poll(Duration timeout) {
- // 1. 拉取消息
- Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);
-
- // 2. 处理消息
- for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
- for (ConsumerRecord<K, V> record : entry.getValue()) {
- // 用户自定义的消息处理逻辑
- processRecord(record);
- }
- }
-
- // 3. 返回拉取到的消息
- return new ConsumerRecords<>(records);
- }
复制代码 在poll方法中,消费者通过processRecord方法处置惩罚每条消息。processRecord方法的具体实现由用户自定义。
3.5 提交偏移量
消费者在处置惩罚完消息后,需要提交已处置惩罚消息的偏移量。偏移量的提交可以手动或自动进行,Kafka提供了多种偏移量提交计谋,如自动提交、同步提交、异步提交等。
- // org.apache.kafka.clients.consumer.KafkaConsumer#commitSync
- public void commitSync() {
- // 1. 提交偏移量
- coordinator.commitOffsetsSync(subscriptions.allConsumed());
- }
复制代码 在commitSync方法中,消费者通过coordinator.commitOffsetsSync方法同步提交偏移量。同步提交会阻塞当前线程,直到偏移量提交乐成。
4. 原理图
以下是Kafka消费者消费消息的核心流程表现图:
- +-------------------+ +-------------------+ +-------------------+
- | | | | | |
- | 消费者组协调 | ----> | 分区分配 | ----> | 消息拉取 |
- | | | | | |
- +-------------------+ +-------------------+ +-------------------+
- |
- v
- +-------------------+ +-------------------+ +-------------------+
- | | | | | |
- | 消息处理 | <---- | 提交偏移量 | <---- | 网络传输 |
- | | | | | |
- +-------------------+ +-------------------+ +-------------------+
复制代码 5. 总结
Kafka消费者消费消息的过程涉及多个步调,包括消费者组的协调、分区分配、消息拉取、消息处置惩罚和偏移量提交。通过源码分析,我们可以更深入地理解Kafka消费者的工作原理。希望本文可以或许帮助你更好地理解Kafka消费者的内部机制。
6. 参考
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |