IT评测·应用市场-qidao123.com
标题:
【kafka实战】05 Kafka消费者消费消息过程源码分析
[打印本页]
作者:
滴水恩情
时间:
2025-3-18 00:52
标题:
【kafka实战】05 Kafka消费者消费消息过程源码分析
1. 概述
Kafka消费者(Consumer)是Kafka系统中负责从Kafka集群中拉取消息的客户端组件。消费者消费消息的过程涉及多个步调,包括消费者组的协调、分区分配、消息拉取、消息处置惩罚等。本文将深入分析Kafka消费者消费消息的源码,并结合相干原理图进行解说。
以下是一个使用 Java 编写的 KafkaConsumer 的示例。在这个示例中,我们将创建一个简朴的 Kafka 消费者,连接到 Kafka 集群,订阅一个主题,并消费该主题中的消息。
1.1 消费者代码使用示例
已经安装并启动了 Kafka 集群。
你已经添加了 Kafka 客户端依赖到你的项目中。如果你使用 Maven,可以在 pom.xml 中添加以下依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
复制代码
示例代码
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
private static final String TOPIC_NAME = "test-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "test-group";
public static void main(String[] args) {
// 配置消费者属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// 自动提交偏移量
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 键和值的反序列化器
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 创建 Kafka 消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
// 订阅主题
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
// 持续消费消息
while (true) {
// 从 Kafka 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭消费者
consumer.close();
}
}
}
复制代码
代码解释
配置消费者属性
:
BOOTSTRAP_SERVERS_CONFIG:指定 Kafka 集群的地址。
GROUP_ID_CONFIG:指定消费者所属的消费组。
ENABLE_AUTO_COMMIT_CONFIG:设置是否自动提交偏移量。
AUTO_COMMIT_INTERVAL_MS_CONFIG:设置自动提交偏移量的时间间隔。
KEY_DESERIALIZER_CLASS_CONFIG 和 VALUE_DESERIALIZER_CLASS_CONFIG:指定键和值的反序列化器。
创建 Kafka 消费者实例
:使用配置好的属性创建 KafkaConsumer 实例。
订阅主题
:使用 subscribe 方法订阅指定的主题。
连续消费消息
:使用 poll 方法从 Kafka 拉取消息,并遍历消费记录,打印消息的偏移量、键和值。
关闭消费者
:在消费完成后,使用 close 方法关闭消费者。
注意事项
确保 Kafka 集群的地址和主题名称精确。
如果需要手动提交偏移量,可以将 ENABLE_AUTO_COMMIT_CONFIG 设置为 false,并使用 commitSync() 或 commitAsync() 方法手动提交偏移量。
2. Kafka消费者消费消息的核心流程
Kafka消费者消费消息的核心流程可以分为以下几个步调:
消费者组协调
:消费者加入消费者组,并与组协调器(GroupCoordinator)进行协调。
分区分配
:组协调器为消费者分配分区。
消息拉取
:消费者从分配的分区中拉取消息。
消息处置惩罚
:消费者处置惩罚拉取到的消息。
提交偏移量
:消费者提交已处置惩罚消息的偏移量。
下面我们将结合源码具体分析每个步调。
3. 源码分析
关键组件阐明
ConsumerCoordinator
:
负责消费者组的协调和分区分配。
管理消费者的心跳和重平衡。
Fetcher
:
负责从 Kafka Broker 拉取消息。
管理拉取哀求和相应的处置惩罚。
SubscriptionState
:
管理消费者订阅的主题和分区。
记录消费者的消费偏移量。
PartitionAssignor
:
负责分区分配计谋的实现。
OffsetCommitCallback
:
处置惩罚偏移量提交的回调逻辑。
3.1 消费者组协调
消费者在启动时,起首需要加入消费者组,并与组协调器进行协调。组协调器负责管理消费者组的成员和分区分配。
// org.apache.kafka.clients.consumer.KafkaConsumer#subscribe
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
// 1. 订阅主题
this.subscriptions.subscribe(new HashSet<>(topics), listener);
// 2. 加入消费者组
coordinator.subscribe(subscriptions);
}
复制代码
在subscribe方法中,消费者起首订阅指定的主题,然后通过coordinator.subscribe方法加入消费者组。组协调器会为消费者分配一个唯一的memberId,并将其加入到消费者组中。
3.2 分区分配
组协调器在消费者加入消费者组后,会为消费者分配分区。分区分配计谋由PartitionAssignor决定,Kafka提供了多种内置的分区分配计谋,如RangeAssignor、RoundRobinAssignor等。
// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensurePartitionAssignment
private void ensurePartitionAssignment() {
// 1. 获取分区分配结果
Map<String, List<TopicPartition>> assignments = partitionAssignor.assign(metadata.fetch(), subscriptions.subscription());
// 2. 更新消费者的分区分配
subscriptions.assignFromSubscribed(assignments.get(consumerId));
}
复制代码
在ensurePartitionAssignment方法中,组协调器通过partitionAssignor.assign方法为消费者分配分区,并将分配结果更新到消费者的订阅信息中。
3.3 消息拉取
消费者在分配到分区后,会从分配的分区中拉取消息。Kafka消费者采用拉取模式(Pull Model),即消费者主动从Kafka集群中拉取消息。
// org.apache.kafka.clients.consumer.KafkaConsumer#poll
public ConsumerRecords<K, V> poll(Duration timeout) {
// 1. 拉取消息
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);
// 2. 返回拉取到的消息
return new ConsumerRecords<>(records);
}
复制代码
在poll方法中,消费者通过fetcher.fetchRecords方法从Kafka集群中拉取消息。fetcher是Kafka消费者中的一个重要组件,负责管理消息的拉取和偏移量的提交。
3.4 消息处置惩罚
消费者在拉取到消息后,会对消息进行处置惩罚。消息处置惩罚的具体逻辑由用户自定义,通常包括消息的反序列化、业务逻辑处置惩罚等。
// org.apache.kafka.clients.consumer.KafkaConsumer#poll
public ConsumerRecords<K, V> poll(Duration timeout) {
// 1. 拉取消息
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchRecords(timeout);
// 2. 处理消息
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
for (ConsumerRecord<K, V> record : entry.getValue()) {
// 用户自定义的消息处理逻辑
processRecord(record);
}
}
// 3. 返回拉取到的消息
return new ConsumerRecords<>(records);
}
复制代码
在poll方法中,消费者通过processRecord方法处置惩罚每条消息。processRecord方法的具体实现由用户自定义。
3.5 提交偏移量
消费者在处置惩罚完消息后,需要提交已处置惩罚消息的偏移量。偏移量的提交可以手动或自动进行,Kafka提供了多种偏移量提交计谋,如自动提交、同步提交、异步提交等。
// org.apache.kafka.clients.consumer.KafkaConsumer#commitSync
public void commitSync() {
// 1. 提交偏移量
coordinator.commitOffsetsSync(subscriptions.allConsumed());
}
复制代码
在commitSync方法中,消费者通过coordinator.commitOffsetsSync方法同步提交偏移量。同步提交会阻塞当前线程,直到偏移量提交乐成。
4. 原理图
以下是Kafka消费者消费消息的核心流程表现图:
+-------------------+ +-------------------+ +-------------------+
| | | | | |
| 消费者组协调 | ----> | 分区分配 | ----> | 消息拉取 |
| | | | | |
+-------------------+ +-------------------+ +-------------------+
|
v
+-------------------+ +-------------------+ +-------------------+
| | | | | |
| 消息处理 | <---- | 提交偏移量 | <---- | 网络传输 |
| | | | | |
+-------------------+ +-------------------+ +-------------------+
复制代码
5. 总结
Kafka消费者消费消息的过程涉及多个步调,包括消费者组的协调、分区分配、消息拉取、消息处置惩罚和偏移量提交。通过源码分析,我们可以更深入地理解Kafka消费者的工作原理。希望本文可以或许帮助你更好地理解Kafka消费者的内部机制。
6. 参考
Kafka官方文档
Kafka源码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/)
Powered by Discuz! X3.4