Kafka源码详解4-Kafka生产者数据发送源码详解

打印 上一主题 下一主题

主题 874|帖子 874|积分 2622

       Kafka生产者是Kafka消息队列的消息发送端,负责将消息从客户端发送到 Kafka 集群的指定 Topic 下的各个 Partition分区中。并可提供消息预处理处罚、数据序列化、数据压缩、传输确认等数据处理处罚与传输功能。
        本文内容为Kafka生产者数据发送过程的源码解析,详细解析了Sender在元数据更新、ProducerBatch数据发送、NetworkClient网络传输等各个环节的Kafka源码,完整展现了Kafka生产者举行数据发送的各步骤操纵细节。
        本文解析的Kafka源码版本为:kafka-3.8.0
1.KafkaProducer数据发送功能概述

        在消息生产时,KafkaProducer已将生产好的消息写入RecoredAccumulator消息累加器中。在数据发送时,数据发送器Sender负责将RecordAccumulator中的数据取走,并向Kafka集群举行数据发送。
        在发送时,Sender先通过元数据找到本次发送的Kafka集群的各目的Node节点,再从RecoredAccumulator消息累加器取出Ready发送的ProducerBatch,清除RecordAccumulator与InflightBatches中已发送或已失效的ProducerBatch,最后通过NetworkClient向Kafka集群发送ProducerBatch数据。

Sender数据发送详细步骤如下:
      1.获取RecordAccumulator内Ready的ProduceBatch的元数据(Topic/Partition位置)。
      2.若待发送ProduceBatch的Topic和其Partition无法找到在本地元数据中找到,则更新元数据。
      3.在Ready发送的ProducerBatch中检测Partition所在Node是否网络可达且正常工作。
      4.从RecordAccumulator中取出可发送的所有ProducerBatch。
      5.在InflightBatchList队列中加入本次可以发送的ProducerBatch。
      6.清除RecordAccumulator中已被Sender读取,待发送的ProducerBatch。
      7.处理处罚在InflightBatchList队列和RecordAccumulator失效的ProducerBatch。
      8.通过NetworkClient向Kafka集群发送ProducerBatch。
完成的Sender数据发送流程详解如下:

完整代码解析:

2.数据发送步调入口

        数据发送器Sender是Kafka生产者数据发送的核心组件。KafkaProducer在调用自身构造方法KafkaProducer()举行实例化的过程中,创建并启动了Sender线程。
KafkaProdcuer.KafkaProdcuer()构造方法内容:
  1. KafkaProducer(ProducerConfig config,
  2.               Serializer<K> keySerializer,
  3.               Serializer<V> valueSerializer,
  4.               ProducerMetadata metadata,
  5.               KafkaClient kafkaClient,
  6.               ProducerInterceptors<K, V> interceptors,
  7.     //...
  8.     //创建网络通信Client与Sender实例
  9.     this.sender = newSender(logContext, kafkaClient, this.metadata);
  10.    
  11.            //启动Sender线程
  12.     this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
  13.     this.ioThread.start();
  14. }
复制代码
        因此,Sender被KafkaProducer创建并启动后,分析Sender线程举行数据发送的步调入口为Sender.run()方法。
代码图解:

        Sender.run()会一直循环调用runOnce(),在运行状态下一直举行数据发送工作。        
Sender.run()方法内容:
  1. public void run() {
  2.     //在运行状态下
  3.     while (running) {
  4.             try {
  5.                 //一直执行runOnce()方法进行数据发送
  6.                 runOnce();
  7.             } catch (Exception e) {
  8.                 log.error("Uncaught error in kafka producer I/O thread: ", e);
  9.             }
  10.      }
  11.     //...
  12. }
复制代码
        Sender的runOnce()则为单次发送数据的方法,重要步骤为:
                1.调用Sender的sendProducerData()方法举行数据发送。
                2.调用NetworkClient的poll()方法举行数据发送的详细网络传输。
 Sender.runOnce()方法内容:
  1. void runOnce() {
  2.         //...
  3.        
  4.         //发送ProducerData核心逻辑
  5.         long pollTimeout = sendProducerData(currentTimeMs);
  6.        
  7.         //通过NetworkClient完成网络层数据传输
  8.         client.poll(pollTimeout, currentTimeMs);
  9. }
复制代码
        先从Sender的sendProducerData()方法开始解析Sender的数据发送过程。

        Sender.sendProducerData()为Sender举行数据发送的核心逻辑。下一章节开始详细解析Sender.sendProducerData()方法。

3.元数据拉取与更新

        Sender.sendProducerData()方法首先读取本地的MetadataSnapshot元数据快照,并从RecordAccumulator消息累加器中获取的待发送ProduceBatch聚集的元数据,为每个ProduceBatch找到其Partition所在的Kafka集群的Broker Node节点。
        若 Sender的本地元数据存储不包含ProduceBatch的Topic与Partition信息,则需要向Kafka集群拉取元数据。若ProducerBatch所在的Node网络无法毗连、无法发送数据,则会从本次发送的数据中剔除该Node的ProducerBatch。
源码图解:

        在Sender.sendProducerData()方法中,首先Kafka读取了本地的MetadataSnapshot元数据快照,并从RecordAccumulator消息累加器中获取的待发送ProduceBatch的元数据。
        ProduceBatch聚集的元数据拉取是通过调用了RecordAccumulator的ready()方法实现的,元数据数据范例为:RecordAccumulator.ReadyCheckResult。
Sender.sendProducerData()方法元数据拉取源码:
  1. private long sendProducerData(long now) {
  2.     //从本地元数据快照中获取元数据
  3.     MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();
  4.    
  5.     //获取RecordAccumulator元数据内ready的ProduceBatch的元数据
  6.     RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadataSnapshot, now);
  7.     //...
  8. }
复制代码
        然后从本地元数据中查找待发送ProduceBatch聚集的Topic和Partition,如果Topic和Partition无法在本地元数据中找到,则Sender需要通过NetworkClient拉取Kafka集群最新的元数据。
Sender.sendProducerData()方法元数据更新源码:
  1. private long sendProducerData(long now) {
  2.     //...
  3.    
  4.     //如果有无法找到Topic和其Leader的Partition,需要更新元数据
  5.     for (String topic : result.unknownLeaderTopics)
  6.                 this.metadata.add(topic, now);
  7.    
  8.     this.metadata.requestUpdate(false);
  9.     //...
  10. }
复制代码
        通过Kafka集群元数据,找到待发送ProducerBatch中Partition的Leader,验证Leader所在Node节点是否可举行数据传输,若不可,则移除该Node节点。
        Sender遍历了所有Node,通过调用了NeworkClient的ready()依次验证了各目的Node节点是否可举行网络传输,是否正常工作。
Sender.sendProducerData()方法移除不可用Node源码:
  1. private long sendProducerData(long now) {
  2.     //...
  3.    
  4.         //获取ProducerNode集合所有待发送Node的迭代器
  5.         Iterator<Node> iter = result.readyNodes.iterator();
  6.        
  7.         //遍历所有待发送Node
  8.         while (iter.hasNext()) {
  9.             Node node = iter.next();
  10.             //如果无法发送
  11.             if (!this.client.ready(node, now)) {
  12.                 this.accumulator.updateNodeLatencyStats(node.id(), now, false);
  13.                 
  14.                 //从待发送Node的迭代器中移除本次无法发送的节点
  15.                 iter.remove();
  16.                 
  17.                 notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
  18.             } else {
  19.                 //可以正常发送的则更新状态
  20.                 this.accumulator.updateNodeLatencyStats(node.id(), now, true);
  21.             }
  22.         }
  23.     //...
  24. }
复制代码
4.Sender发送数据

        当更新元数据后,Sender开始拉取RecordAccumulator中Ready的ProducerBatch,向Kafka集群发送。详细步骤如下:
代码图解:

4.1.拉取消息累加器中ProducerBatch

        首先,Sender先从RecordAccumulator消息累加器中取出可Ready发送的ProducerBatch数据。

        Sender调用了RecordAccumulator的drain()方法,拉取RecordAccumulator消息累加器中已经Ready的ProducerBatch。其中传入参数为result.readyNodes,体现拉取时会排除上述Node无法发送的ProducerBatch。
Sender.sendProducerData()方法拉取ProducerBatch源码:
  1. private long sendProducerData(long now) {
  2.     //...
  3.     //从RecordAccumulator中取出可发送的ProducerBatch
  4.     Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadataSnapshot, result.readyNodes, this.maxRequestSize, now);
  5.     //...
  6. }
复制代码
        其中取出的batches的数据布局为Map<Integer, List<roducerBatch>>,Map的Key为每个可发送Node的ID,Value为每个节点待发送的ProducerBatch的List列表。

4.2.在InFlightBatchList中加入ProducerBatch

        InflightBatchList是Sender记录正在发送ProducerBatch的数据布局,将ProducerBatch加入InflightBatchList意味着该ProducerBatch正在Sender被发送。

        Sender将发送ProducerBatch的加入InflightBatchList的源码如下:
源码图解:

        首先Sender在sendProducerData()中调用了addToInflightBatches()方法。
Sender.sendProducerData()方法加入InflightBatchList源码:
  1. private long sendProducerData(long now) {
  2.     //...
  3.     //将发送ProducerBatch的加入inflightBatchList
  4.     addToInflightBatches(batches);
  5.     //...
  6. }
复制代码
        Sender先遍历所有待发送的Node节点,遍历Map<Integer, List<roducerBatch>>,其中每个Entry为每个Node待发送的List<roducerBatch>。对单个Node节点的所有List<roducerBatch>继续调用 addToInflightBatches()方法【参数为List<roducerBatch>】举行处理处罚。
Sender.addToInflightBatches()方法源码:
  1.     public void addToInflightBatches(Map<Integer, List<ProducerBatch>> batches) {
  2.         //先按节点维度,遍历每个节点的List<ProducerBatch>
  3.         for (List<ProducerBatch> batchList : batches.values()) {
  4.             //对本节点的ProducerBatch加入inflightList
  5.             addToInflightBatches(batchList);
  6.         }
  7.     }
复制代码
        对每个Node,Sender再遍历每个Node的List<roducerBatch>,将每个ProducerBatch加入InFlightBatchList中,记录下当前ProducerBatch正在被Sender发送。
Sender.addToInflightBatches()方法源码:
  1. private void addToInflightBatches(List<ProducerBatch> batches) {
  2.     //遍历所有ProducerBatch
  3.     for (ProducerBatch batch : batches) {  
  4.         //获取inflightBatchList   
  5.         List<ProducerBatch> inflightBatchList = inFlightBatches.computeIfAbsent(batch.topicPartition,
  6.             k -> new ArrayList<>());
  7.         //将ProducerBatch加入inflightBatchList中
  8.         inflightBatchList.add(batch);
  9.     }
  10. }
复制代码
4.3.清除RecordAccumulator中已被Sender读取的ProducerBatch

        因为Ready的ProducerBatch已被取走待发送,并被记录在InFlightBatchList中了,因此可以清除RecordAccumulator中的ProducerBatch了。

        Sender通过两个循环,遍历了Map<Integer, List<roducerBatch>>中所有ProducerBatch,在RecordAccumulator消息累加器中移除ProducerBatch,避免消息重复发送,并为RecordAccumulator消息累加器腾出空间。       
Sender.sendProducerData()方法清除RecordAccumulator源码:
  1. private long sendProducerData(long now) {
  2.     //...
  3.     //遍历每个Node
  4.     for (List<ProducerBatch> batchList : batches.values()) {
  5.         //遍历单个Node的List<ProducerBatch>
  6.             for (ProducerBatch batch : batchList)
  7.             //清除RecordAccumulator中已被Sender读取的ProducerBatch
  8.                 this.accumulator.mutePartition(batch.topicPartition);
  9.         }
  10.     //...
  11. }
复制代码


4.4.处理处罚在InflightBatchList和RecordAccumulator失效的ProducerBatch

        Sender会在每次发送前,清除InflightBatchList和RecordAccumulator失效的ProducerBatch。

        Sender首先会获取InflightBatchList与RecordAccumulator中失效的ProducerBatch,通过failBatch()方法抛弃失效ProdcuerBatch。
Sender.sendProducerData()方法处理处罚失效ProducerBatch源码:
  1. private long sendProducerData(long now) {
  2.     //...
  3.     //获取InflightBatche失败的batch和RecordAccumulator超时的batch
  4.         List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
  5.         List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
  6.         expiredBatches.addAll(expiredInflightBatches);
  7.         //...
  8.                
  9.         //遍历
  10.         for (ProducerBatch expiredBatch : expiredBatches){
  11.                 //丢弃失败的Batch
  12.                 failBatch(expiredBatch, new TimeoutException(errorMessage), false);
  13.                
  14.                 //...
  15.         }
  16.     //...
  17. }
复制代码

4.5.Sender发送已经ready的ProducerBatch

        最终Sender遍历所有可发送的ProducerBatch,按Node维度天生ClientRequest,通过NetworkClient发送给Kafka集群的目的Node节点。

代码图解:

        Sender发送过程重要分为两层遍历,先以目的Node维度遍历,再对每个Node待发送的ProducerBatch按Partition举行编排,天生ClientRequest封装数据,再通过NetworkClient发送给Kafka集群。
        首先Sender调用sendProduceRequests()方法进入数据发送逻辑。
Sender.sendProducerData()数据发送源码:
  1. private long sendProducerData(long now) {
  2.     //...
  3.     //进入数据发送逻辑
  4.     sendProduceRequests(batches, now);
  5.     //...
  6. }
复制代码
        Sende先以目的Node维度遍历,对每个Node调用sendProduceRequests()方法发送本Node的List<roducerBatch>。
Sender.sendProduceRequests()数据发送源码:
  1. private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
  2.     //遍历每个Node   
  3.     for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
  4.         //单个Node数据发送   
  5.         sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
  6. }
复制代码
         调用Sender.sendProduceRequest后对于每个Node发送List<roducerBatch>步骤如下:   
代码图解:
         对于每个Node节点,Sender先把其待发送的数据集List<roducerBatch>按Partition为单元重新编排为ProduceRequestData.TopicProduceDataCollection。
        再用ClientRequest封装所有待发送的ProducerBatch数据,通过NetworkClient举行网络传输,发送到Kafka集群的目的Node节点。
Sender.sendProduceRequest()数据发送源码:
  1. private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
  2.    
  3.     //...
  4.    
  5.     //新建TopicProduceDataCollection封装待发送的各个Topic的ProducerBatch
  6.     ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
  7.     //...
  8.         for (ProducerBatch batch : batches) {
  9.             //取出每个ProducerBatch的TopicPartition和records数据
  10.             TopicPartition tp = batch.topicPartition;
  11.             MemoryRecords records = batch.records();
  12.        
  13.             //在待发送的tpd中找到本ProducerBatch对应Topic的数据
  14.             ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
  15.        
  16.             //不存在则新建TopicProduceData,并添加到tpd
  17.             if (tpData == null){
  18.                       tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
  19.                       tpd.add(tpData);
  20.              }
  21.        
  22.              //将本ProducerBatch添加到本topic待发送tpd的partitionData中
  23.              tpData.partitionData().add(new     
  24.                                        ProduceRequestData.PartitionProduceData()
  25.                                        .setIndex(tp.partition())
  26.                                        .setRecords(records));
  27.         }
  28.     //...
  29.     //构建发送请求与回调函数,待发送数封装在tpd
  30.         //发送请求
  31.         ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
  32.                         new ProduceRequestData()
  33.                                 .setAcks(acks)
  34.                                 .setTimeoutMs(timeout)
  35.                                 .setTransactionalId(transactionalId)
  36.                                 .setTopicData(tpd));
  37.         //回调函数
  38.         RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
  39.        
  40.         //...
  41.        
  42.         //构建Kafak客户端请求
  43.         ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, requestTimeoutMs, callback);
  44.        
  45.         //通过KafkaClient(NetworkClient)发送ProducerBatch到Kafka集群
  46.         client.send(clientRequest, now);
  47.    
  48. }
复制代码
        最终,Sender通过调用NetworkClient的send()方法发送封装了该Node节点所有的ProducerBatch的ClientRequest请求。
代码图解:

        NetworkClient的send()方法继续调用了doSend()方法。
NetworkClient.send()数据发送源码:
  1. public void send(ClientRequest request, long now) {
  2.         doSend(request, false, now);
  3.     }
复制代码
        再继续调用。
NetworkClient.doSend()数据发送源码:
  1. private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
  2.     //...
  3.    
  4.     //继续调用
  5.     doSend(clientRequest, isInternalRequest, now, builder.build(version));
  6.    
  7.     //...
  8. }
复制代码
        进入最终NetworkClient发送的doSend()方法。 NetworkClient获取目的发送Node,构建RequestHeader,将ClientRequest放入InFlightRequest聚集,最终通过JavaNIO底层的Selector举行网络传输。
NetworkClient.doSend()数据发送源码:
  1. private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
  2.         //获取待发送的目的Node
  3.         String destination = clientRequest.destination();
  4.         //构建RequestHeader
  5.         RequestHeader header = clientRequest.makeHeader(request.version());
  6.         if (log.isDebugEnabled()) {
  7.             log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
  8.                 clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
  9.         }
  10.         //将header放入request
  11.         Send send = request.toSend(header);
  12.         //将clientRequest放入InFlightRequest
  13.         InFlightRequest inFlightRequest = new InFlightRequest(
  14.                 clientRequest,
  15.                 header,
  16.                 isInternalRequest,
  17.                 request,
  18.                 send,
  19.                 now);
  20.         this.inFlightRequests.add(inFlightRequest);
  21.         //通过selector继续发送
  22.         selector.send(new NetworkSend(clientRequest.destination(), send));
  23.     }
复制代码
        NetworkClient调用Selector.send()现实是把数据放入了Selector缓存,最终在Selector.poll()中才真正举行数据传输。

5.NetworkClient网络传输

        NetworkClient 是 Kafka 生产者的Java NIO 网络通讯组件,负责举行Kafka生产者与 Kafka Broker Node节点的网络传输。它支持高效的异步通讯,负责发送请求、吸收相应,并提供相应的重试处理处罚和错误恢复机制。
        NetworkClient维护一个InFlightRequests队列记录正在发送的请求,并通过Selector与Kafka集群举行数据通讯。     

代码图解:

        当Sender在实行runOnce()方法时,调用sendProducerData()完成发送数据后,又调用了NetworkClient的poll()方法举行网络传输。
代码图解:

Sender.runOnce()方法源码:
  1. void runOnce() {
  2.                
  3.     //...
  4.         //Sender发送数据
  5.     long pollTimeout = sendProducerData(currentTimeMs);
  6.    
  7.     //NetworkClient进行数据传输
  8.     client.poll(pollTimeout, currentTimeMs);
  9. }
复制代码
        NetworkClient举行网络传输是通过NetworkClient.poll()方法举行的,首先会向Kafka集群发送元数据更新请求,然后通过Selector发送消息到Kafka集群,最后处理处罚完成的Action。
NetworkClient.poll()源码:
  1. public List<ClientResponse> poll(long timeout, long now) {
  2.         //...
  3.        
  4.         //更新元数据
  5.         long metadataTimeout = metadataUpdater.maybeUpdate(now);
  6.        
  7.         //...       
  8.        
  9.         //由Selector进行具体数据读取与发送的请求
  10.         this.selector.poll(Utils.min(timeout, metadataTimeout, telemetryTimeout, defaultRequestTimeoutMs));
  11.        
  12.         //...
  13.        
  14.         //处理各类已完成的Action
  15.         long updatedNow = this.time.milliseconds();
  16.         List<ClientResponse> responses = new ArrayList<>();
  17.         handleCompletedSends(responses, updatedNow);
  18.         handleCompletedReceives(responses, updatedNow);
  19.         handleDisconnections(responses, updatedNow);
  20.         handleConnections();
  21.         handleInitiateApiVersionRequests(updatedNow);
  22.         handleTimedOutConnections(responses, updatedNow);
  23.         handleTimedOutRequests(responses, updatedNow);
  24.         completeResponses(responses);
  25. }
复制代码
5.1.元数据更新

        NetworkClient首先通过调用MetadataUpdater.maybeUpdate()方法发送元数据更新请求。
代码图解:

        源码经过多级调用,最终在NetworkClient.sendInternalMetadataRequest()方法构建元数据更新请求,调用NetworkClient.doSend()方法举行发送。
NetworkClient.sendInternalMetadataRequest()方法源码:
  1. void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
  2.     //构建元数据更新请求
  3.     ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
  4.    
  5.     //进行请求发送
  6.     doSend(clientRequest, true, now);
  7. }
复制代码
        后续的步骤与数据发送时调用NetworkClient.doSend()方法的步骤相同。
NetworkClient.doSend()数据发送源码:
  1. private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
  2.         //获取待发送的目的Node
  3.         String destination = clientRequest.destination();
  4.         //构建RequestHeader
  5.         RequestHeader header = clientRequest.makeHeader(request.version());
  6.         if (log.isDebugEnabled()) {
  7.             log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
  8.                 clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
  9.         }
  10.         //将header放入request
  11.         Send send = request.toSend(header);
  12.         //将clientRequest放入InFlightRequest
  13.         InFlightRequest inFlightRequest = new InFlightRequest(
  14.                 clientRequest,
  15.                 header,
  16.                 isInternalRequest,
  17.                 request,
  18.                 send,
  19.                 now);
  20.         this.inFlightRequests.add(inFlightRequest);
  21.         //通过selector继续发送
  22.         selector.send(new NetworkSend(clientRequest.destination(), send));
  23.     }
复制代码
        NetworkClient调用Selector.send()现实是把数据放入了Selector缓存,最终在Selector.poll()中才真正举行数据传输。

5.2.NetworkClient发送数据

        在NetworkClient发送元数据请求与Sender发送数据时,最终都是调用了Selector.send()方法。Selector.send()方法,现实是把数据放入了Selector缓存,最终在Selector.poll()中才真正举行数据传输。
Selector.send()方法源码:
  1. public void send(NetworkSend send) {
  2.           //打开KafkaChannel
  3.     KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
  4.    
  5.     //...
  6.    
  7.     //在KafkaChannel放入待发送的请求
  8.     channel.setSend(send);
  9.     //...
  10. }
复制代码
代码图解:

        NetworkClient网络传输最终是通过Selector举行的,Selector会先找到缓存中所有可发送的事件,并开始举行读写操纵。
Selector.poll()数据发送源码:
  1. public void poll(long timeout) throws IOException {
  2.         //...
  3.        
  4.         //获取已经ready进行读写的事件
  5.         pollSelectionKeys(readyKeys, false, endSelect);
  6.        
  7.         //...
  8.        
  9.         //进行读写操作
  10.         pollSelectionKeys(readyKeys, false, endSelect);
  11.        
  12.         //...
  13. }
复制代码
        举行读写操纵的代码在Selector的pollSelectionKeys()方法中。
Selector.pollSelectionKeys()数据发送源码:
  1. void pollSelectionKeys(Set<SelectionKey> selectionKeys,
  2.                        boolean isImmediatelyConnected,
  3.                        long currentTimeNanos) {
  4.     //遍历所有可发送的读写事件
  5.     for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
  6.         //获取KafkaChannel
  7.         KafkaChannel channel = channel(key);
  8.         //...
  9.             
  10.         //进行读尝试
  11.         attemptRead(channel);
  12.         //...
  13.         //进行写尝试
  14.         attemptWrite(key, channel, nowNanos);
  15.         //...
  16.     }
  17. }
复制代码
        Selector的attemptRead()与attemptWrite()方法为底层的网络传输读写方法。
Selector.attemptRead()方法源码:
  1. private void attemptRead(KafkaChannel channel) throws IOException {
  2.     String nodeId = channel.id();
  3.    
  4.     //由KafkaChannel进行数据读操作
  5.     long bytesReceived = channel.read();
  6.     if (bytesReceived != 0) {
  7.         long currentTimeMs = time.milliseconds();
  8.         sensors.recordBytesReceived(nodeId, bytesReceived, currentTimeMs);
  9.         madeReadProgressLastPoll = true;
  10.         NetworkReceive receive = channel.maybeCompleteReceive();
  11.         if (receive != null) {
  12.             addToCompletedReceives(channel, receive, currentTimeMs);
  13.         }
  14.     }
  15.     if (channel.isMuted()) {
  16.         outOfMemory = true; //channel has muted itself due to memory pressure.
  17.     } else {
  18.         madeReadProgressLastPoll = true;
  19.     }
  20. }
复制代码
Selector.attemptWrite()方法源码:
  1. private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
  2.     if (channel.hasSend()
  3.                 && channel.ready()
  4.                 && key.isWritable()
  5.                 && !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
  6.         //继续调用   
  7.         write(channel);
  8.     }
  9. }
复制代码
Selector.Write()方法源码:
  1. void write(KafkaChannel channel) throws IOException {
  2.     String nodeId = channel.id();
  3.     //由KafkaChannel进行数据写操作
  4.     long bytesSent = channel.write();
  5.     NetworkSend send = channel.maybeCompleteSend();
  6.     // We may complete the send with bytesSent < 1 if `TransportLayer.hasPendingWrites` was true and `channel.write()`
  7.     // caused the pending writes to be written to the socket channel buffer
  8.     if (bytesSent > 0 || send != null) {
  9.         long currentTimeMs = time.milliseconds();
  10.         if (bytesSent > 0)
  11.             this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
  12.         if (send != null) {
  13.             this.completedSends.add(send);
  14.             this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
  15.         }
  16.     }
  17. }
复制代码
        Selector的数据读写最终通过调用KafkaChannel.read()与KafkaChannel.write()方法完成底层网络传输,将数据发送到Kafka集群。

6.结语

       KafkaProducer数据发送过程重要是由Sender举行了元数据更新、ProducerBatch数据发送、NetworkClient网络传输,最终将数据发送到Kafka集群中。本文接上文KafkaProducer消息生产,完整解析了Sender举行数据发送的各步骤操纵细节。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

风雨同行

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表