IT评测·应用市场-qidao123.com

标题: 【kafka实战】04 Kafka生产者发送消息过程源码分析 [打印本页]

作者: 瑞星    时间: 2025-3-12 14:48
标题: 【kafka实战】04 Kafka生产者发送消息过程源码分析
Kafka生产者发送消息过程源码分析

1. 概述

Kafka生产者(Producer)是Kafka系统中负责将消息发送到Kafka集群的客户端组件。生产者发送消息的过程涉及多个步骤,包罗消息的序列化、分区选择、消息累加、批次发送等。本文将深入分析Kafka生产者发送消息的源码,并团结相关原理图举行讲解。
Kafka 基本概念与术语


2. Kafka生产者发送消息的核心流程


Kafka生产者发送消息的核心流程可以分为以下几个步骤:

下面我们将团结源码详细分析每个步骤。
Kafka 生产者主要由以下几个紧张部分构成:

3. 源码分析

3.1 消息创建与序列化

生产者发送消息的第一步是创建消息对象,并将消息的键和值举行序列化。Kafka消息的键和值可以是恣意范例的数据,但最终需要序列化为字节数组才能通过网络传输。
  1. // org.apache.kafka.clients.producer.KafkaProducer#send
  2. public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
  3.     // 1. 序列化消息的键和值
  4.     byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
  5.     byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
  6.     // 2. 分区选择
  7.     int partition = partition(record, serializedKey, serializedValue, cluster);
  8.    
  9.     // 3. 将消息添加到累加器
  10.     RecordAccumulator.RecordAppendResult result = accumulator.append(record, serializedKey, serializedValue, headers, partition, maxTimeToBlock);
  11.    
  12.     // 4. 如果批次已满或创建了新批次,则唤醒发送线程
  13.     if (result.batchIsFull || result.newBatchCreated) {
  14.         this.sender.wakeup();
  15.     }
  16.    
  17.     return result.future;
  18. }
复制代码
在send方法中,起首通过序列化器将消息的键和值序列化为字节数组。Kafka提供了多种内置的序列化器,如StringSerializerByteArraySerializer等,用户也可以自定义序列化器。
3.2 分区选择

Kafka消息发送到哪个分区是由分区器(Partitioner)决定的。默认情况下,Kafka使用DefaultPartitioner,它根据消息的键举行哈希盘算,然后根据哈希值选择分区。如果消息没有键,则采用轮询的方式选择分区。
  1. // org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition
  2. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  3.     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
  4.     int numPartitions = partitions.size();
  5.    
  6.     if (keyBytes == null) {
  7.         // 如果消息没有键,则采用轮询方式选择分区
  8.         int nextValue = counter.getAndIncrement();
  9.         List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
  10.         if (availablePartitions.size() > 0) {
  11.             return Utils.toPositive(nextValue) % availablePartitions.size();
  12.         } else {
  13.             return Utils.toPositive(nextValue) % numPartitions;
  14.         }
  15.     } else {
  16.         // 如果消息有键,则根据键的哈希值选择分区
  17.         return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  18.     }
  19. }
复制代码
3.3 消息累加

消息累加器(RecordAccumulator)是Kafka生产者中的一个紧张组件,它负责将消息按分区举行缓存,并等待批量发送。每个分区对应一个消息批次(RecordBatch),当批次大小到达肯定阈值或等待时间超过肯定阈值时,批次会被发送到Kafka集群。
  1. // org.apache.kafka.clients.producer.internals.RecordAccumulator#append
  2. public RecordAppendResult append(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Header[] headers, int partition, long maxTimeToBlock) {
  3.     // 获取或创建对应分区的Deque
  4.     Deque<ProducerBatch> deque = getOrCreateDeque(partition);
  5.    
  6.     // 尝试将消息添加到批次中
  7.     synchronized (deque) {
  8.         RecordAppendResult result = tryAppend(deque, record, serializedKey, serializedValue, headers);
  9.         if (result != null) {
  10.             return result;
  11.         }
  12.     }
  13.    
  14.     // 如果批次已满或创建了新批次,则返回结果
  15.     return appendNewBatch(deque, partition, record, serializedKey, serializedValue, headers, maxTimeToBlock);
  16. }
复制代码
3.4 批次发送

当消息累加器中的批次满意发送条件时,发送线程(Sender)会将批次发送到Kafka集群。发送线程会从累加器中获取预备好的批次,并将其封装成ProducerRequest,然后通过网络发送到Kafka集群。
  1. // org.apache.kafka.clients.producer.internals.Sender#run
  2. public void run() {
  3.     while (running) {
  4.         // 从累加器中获取准备好的批次
  5.         RecordAccumulator.ReadyCheckResult result = accumulator.ready(cluster, now);
  6.         
  7.         // 发送批次
  8.         sendProduceRequests(result.readyNodes, now);
  9.     }
  10. }
复制代码
3.5 响应处理惩罚

Kafka集群在接收到消息后,会返回一个响应(ProducerResponse)。发送线程会处理惩罚这个响应,并根据响应结果更新消息的状态。如果消息发送乐成,则调用用户提供的回调函数(Callback);如果发送失败,则根据配置的重试策略举行重试。
  1. // org.apache.kafka.clients.producer.internals.Sender#handleProduceResponse
  2. private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
  3.     for (Map.Entry<TopicPartition, ProducerBatch> entry : batches.entrySet()) {
  4.         TopicPartition tp = entry.getKey();
  5.         ProducerBatch batch = entry.getValue();
  6.         
  7.         if (response.wasDisconnected()) {
  8.             // 处理网络断开的情况
  9.             handleDisconnection(batch, tp, now);
  10.         } else if (response.hasResponse()) {
  11.             // 处理成功响应
  12.             handleSuccessfulResponse(batch, tp, response, now);
  13.         } else {
  14.             // 处理其他错误
  15.             handleErrorResponse(batch, tp, response, now);
  16.         }
  17.     }
  18. }
复制代码
4. 原理图

以下是Kafka生产者发送消息的核心流程示意图:

5. 总结

现将发送消息的详细流程总结如下

Kafka生产者发送消息的过程涉及多个步骤,包罗消息的序列化、分区选择、消息累加、批次发送和响应处理惩罚。通过源码分析,我们可以更深入地明白Kafka生产者的工作原理。盼望本文能够资助你更好地明白Kafka生产者的内部机制。
6. 参考



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4