Kafka生产者是Kafka消息队列的消息发送端,负责将消息从客户端发送到 Kafka 集群的指定 Topic 下的各个 Partition分区中。并可提供消息预处理处罚、数据序列化、数据压缩、传输确认等数据处理处罚与传输功能。
本文内容为Kafka生产者数据发送过程的源码解析,详细解析了Sender在元数据更新、ProducerBatch数据发送、NetworkClient网络传输等各个环节的Kafka源码,完整展现了Kafka生产者举行数据发送的各步骤操纵细节。
本文解析的Kafka源码版本为:kafka-3.8.0
1.KafkaProducer数据发送功能概述
在消息生产时,KafkaProducer已将生产好的消息写入RecoredAccumulator消息累加器中。在数据发送时,数据发送器Sender负责将RecordAccumulator中的数据取走,并向Kafka集群举行数据发送。
在发送时,Sender先通过元数据找到本次发送的Kafka集群的各目的Node节点,再从RecoredAccumulator消息累加器取出Ready发送的ProducerBatch,清除RecordAccumulator与InflightBatches中已发送或已失效的ProducerBatch,最后通过NetworkClient向Kafka集群发送ProducerBatch数据。
Sender数据发送详细步骤如下:
1.获取RecordAccumulator内Ready的ProduceBatch的元数据(Topic/Partition位置)。
2.若待发送ProduceBatch的Topic和其Partition无法找到在本地元数据中找到,则更新元数据。
3.在Ready发送的ProducerBatch中检测Partition所在Node是否网络可达且正常工作。
4.从RecordAccumulator中取出可发送的所有ProducerBatch。
5.在InflightBatchList队列中加入本次可以发送的ProducerBatch。
6.清除RecordAccumulator中已被Sender读取,待发送的ProducerBatch。
7.处理处罚在InflightBatchList队列和RecordAccumulator失效的ProducerBatch。
8.通过NetworkClient向Kafka集群发送ProducerBatch。
完成的Sender数据发送流程详解如下:
完整代码解析:
2.数据发送步调入口
数据发送器Sender是Kafka生产者数据发送的核心组件。KafkaProducer在调用自身构造方法KafkaProducer()举行实例化的过程中,创建并启动了Sender线程。
KafkaProdcuer.KafkaProdcuer()构造方法内容:
- KafkaProducer(ProducerConfig config,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer,
- ProducerMetadata metadata,
- KafkaClient kafkaClient,
- ProducerInterceptors<K, V> interceptors,
- //...
- //创建网络通信Client与Sender实例
- this.sender = newSender(logContext, kafkaClient, this.metadata);
-
- //启动Sender线程
- this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
- this.ioThread.start();
- }
复制代码 因此,Sender被KafkaProducer创建并启动后,分析Sender线程举行数据发送的步调入口为Sender.run()方法。
代码图解:
Sender.run()会一直循环调用runOnce(),在运行状态下一直举行数据发送工作。
Sender.run()方法内容:
- public void run() {
- //在运行状态下
- while (running) {
- try {
- //一直执行runOnce()方法进行数据发送
- runOnce();
- } catch (Exception e) {
- log.error("Uncaught error in kafka producer I/O thread: ", e);
- }
- }
- //...
- }
复制代码 Sender的runOnce()则为单次发送数据的方法,重要步骤为:
1.调用Sender的sendProducerData()方法举行数据发送。
2.调用NetworkClient的poll()方法举行数据发送的详细网络传输。
Sender.runOnce()方法内容:
- void runOnce() {
- //...
-
- //发送ProducerData核心逻辑
- long pollTimeout = sendProducerData(currentTimeMs);
-
- //通过NetworkClient完成网络层数据传输
- client.poll(pollTimeout, currentTimeMs);
- }
复制代码 先从Sender的sendProducerData()方法开始解析Sender的数据发送过程。
Sender.sendProducerData()为Sender举行数据发送的核心逻辑。下一章节开始详细解析Sender.sendProducerData()方法。
3.元数据拉取与更新
Sender.sendProducerData()方法首先读取本地的MetadataSnapshot元数据快照,并从RecordAccumulator消息累加器中获取的待发送ProduceBatch聚集的元数据,为每个ProduceBatch找到其Partition所在的Kafka集群的Broker Node节点。
若 Sender的本地元数据存储不包含ProduceBatch的Topic与Partition信息,则需要向Kafka集群拉取元数据。若ProducerBatch所在的Node网络无法毗连、无法发送数据,则会从本次发送的数据中剔除该Node的ProducerBatch。
源码图解:
在Sender.sendProducerData()方法中,首先Kafka读取了本地的MetadataSnapshot元数据快照,并从RecordAccumulator消息累加器中获取的待发送ProduceBatch的元数据。
ProduceBatch聚集的元数据拉取是通过调用了RecordAccumulator的ready()方法实现的,元数据数据范例为:RecordAccumulator.ReadyCheckResult。
Sender.sendProducerData()方法元数据拉取源码:
- private long sendProducerData(long now) {
- //从本地元数据快照中获取元数据
- MetadataSnapshot metadataSnapshot = metadata.fetchMetadataSnapshot();
-
- //获取RecordAccumulator元数据内ready的ProduceBatch的元数据
- RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadataSnapshot, now);
- //...
- }
复制代码 然后从本地元数据中查找待发送ProduceBatch聚集的Topic和Partition,如果Topic和Partition无法在本地元数据中找到,则Sender需要通过NetworkClient拉取Kafka集群最新的元数据。
Sender.sendProducerData()方法元数据更新源码:
- private long sendProducerData(long now) {
- //...
-
- //如果有无法找到Topic和其Leader的Partition,需要更新元数据
- for (String topic : result.unknownLeaderTopics)
- this.metadata.add(topic, now);
-
- this.metadata.requestUpdate(false);
- //...
- }
复制代码 通过Kafka集群元数据,找到待发送ProducerBatch中Partition的Leader,验证Leader所在Node节点是否可举行数据传输,若不可,则移除该Node节点。
Sender遍历了所有Node,通过调用了NeworkClient的ready()依次验证了各目的Node节点是否可举行网络传输,是否正常工作。
Sender.sendProducerData()方法移除不可用Node源码:
- private long sendProducerData(long now) {
- //...
-
- //获取ProducerNode集合所有待发送Node的迭代器
- Iterator<Node> iter = result.readyNodes.iterator();
-
- //遍历所有待发送Node
- while (iter.hasNext()) {
- Node node = iter.next();
- //如果无法发送
- if (!this.client.ready(node, now)) {
- this.accumulator.updateNodeLatencyStats(node.id(), now, false);
-
- //从待发送Node的迭代器中移除本次无法发送的节点
- iter.remove();
-
- notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
- } else {
- //可以正常发送的则更新状态
- this.accumulator.updateNodeLatencyStats(node.id(), now, true);
- }
- }
- //...
- }
复制代码 4.Sender发送数据
当更新元数据后,Sender开始拉取RecordAccumulator中Ready的ProducerBatch,向Kafka集群发送。详细步骤如下:
代码图解:
4.1.拉取消息累加器中ProducerBatch
首先,Sender先从RecordAccumulator消息累加器中取出可Ready发送的ProducerBatch数据。
Sender调用了RecordAccumulator的drain()方法,拉取RecordAccumulator消息累加器中已经Ready的ProducerBatch。其中传入参数为result.readyNodes,体现拉取时会排除上述Node无法发送的ProducerBatch。
Sender.sendProducerData()方法拉取ProducerBatch源码:
- private long sendProducerData(long now) {
- //...
- //从RecordAccumulator中取出可发送的ProducerBatch
- Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadataSnapshot, result.readyNodes, this.maxRequestSize, now);
- //...
- }
复制代码 其中取出的batches的数据布局为Map<Integer, List< roducerBatch>>,Map的Key为每个可发送Node的ID,Value为每个节点待发送的ProducerBatch的List列表。
4.2.在InFlightBatchList中加入ProducerBatch
InflightBatchList是Sender记录正在发送ProducerBatch的数据布局,将ProducerBatch加入InflightBatchList意味着该ProducerBatch正在Sender被发送。
Sender将发送ProducerBatch的加入InflightBatchList的源码如下:
源码图解:
首先Sender在sendProducerData()中调用了addToInflightBatches()方法。
Sender.sendProducerData()方法加入InflightBatchList源码:
- private long sendProducerData(long now) {
- //...
- //将发送ProducerBatch的加入inflightBatchList
- addToInflightBatches(batches);
- //...
- }
复制代码 Sender先遍历所有待发送的Node节点,遍历Map<Integer, List< roducerBatch>>,其中每个Entry为每个Node待发送的List< roducerBatch>。对单个Node节点的所有List< roducerBatch>继续调用 addToInflightBatches()方法【参数为List< roducerBatch>】举行处理处罚。
Sender.addToInflightBatches()方法源码:
- public void addToInflightBatches(Map<Integer, List<ProducerBatch>> batches) {
- //先按节点维度,遍历每个节点的List<ProducerBatch>
- for (List<ProducerBatch> batchList : batches.values()) {
- //对本节点的ProducerBatch加入inflightList
- addToInflightBatches(batchList);
- }
- }
复制代码 对每个Node,Sender再遍历每个Node的List< roducerBatch>,将每个ProducerBatch加入InFlightBatchList中,记录下当前ProducerBatch正在被Sender发送。
Sender.addToInflightBatches()方法源码:
- private void addToInflightBatches(List<ProducerBatch> batches) {
- //遍历所有ProducerBatch
- for (ProducerBatch batch : batches) {
- //获取inflightBatchList
- List<ProducerBatch> inflightBatchList = inFlightBatches.computeIfAbsent(batch.topicPartition,
- k -> new ArrayList<>());
- //将ProducerBatch加入inflightBatchList中
- inflightBatchList.add(batch);
- }
- }
复制代码 4.3.清除RecordAccumulator中已被Sender读取的ProducerBatch
因为Ready的ProducerBatch已被取走待发送,并被记录在InFlightBatchList中了,因此可以清除RecordAccumulator中的ProducerBatch了。
Sender通过两个循环,遍历了Map<Integer, List< roducerBatch>>中所有ProducerBatch,在RecordAccumulator消息累加器中移除ProducerBatch,避免消息重复发送,并为RecordAccumulator消息累加器腾出空间。
Sender.sendProducerData()方法清除RecordAccumulator源码:
- private long sendProducerData(long now) {
- //...
- //遍历每个Node
- for (List<ProducerBatch> batchList : batches.values()) {
- //遍历单个Node的List<ProducerBatch>
- for (ProducerBatch batch : batchList)
- //清除RecordAccumulator中已被Sender读取的ProducerBatch
- this.accumulator.mutePartition(batch.topicPartition);
- }
- //...
- }
复制代码
4.4.处理处罚在InflightBatchList和RecordAccumulator失效的ProducerBatch
Sender会在每次发送前,清除InflightBatchList和RecordAccumulator失效的ProducerBatch。
Sender首先会获取InflightBatchList与RecordAccumulator中失效的ProducerBatch,通过failBatch()方法抛弃失效ProdcuerBatch。
Sender.sendProducerData()方法处理处罚失效ProducerBatch源码:
- private long sendProducerData(long now) {
- //...
- //获取InflightBatche失败的batch和RecordAccumulator超时的batch
- List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
- List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
- expiredBatches.addAll(expiredInflightBatches);
- //...
-
- //遍历
- for (ProducerBatch expiredBatch : expiredBatches){
- //丢弃失败的Batch
- failBatch(expiredBatch, new TimeoutException(errorMessage), false);
-
- //...
- }
- //...
- }
复制代码
4.5.Sender发送已经ready的ProducerBatch
最终Sender遍历所有可发送的ProducerBatch,按Node维度天生ClientRequest,通过NetworkClient发送给Kafka集群的目的Node节点。
代码图解:
Sender发送过程重要分为两层遍历,先以目的Node维度遍历,再对每个Node待发送的ProducerBatch按Partition举行编排,天生ClientRequest封装数据,再通过NetworkClient发送给Kafka集群。
首先Sender调用sendProduceRequests()方法进入数据发送逻辑。
Sender.sendProducerData()数据发送源码:
- private long sendProducerData(long now) {
- //...
- //进入数据发送逻辑
- sendProduceRequests(batches, now);
- //...
- }
复制代码 Sende先以目的Node维度遍历,对每个Node调用sendProduceRequests()方法发送本Node的List< roducerBatch>。
Sender.sendProduceRequests()数据发送源码:
- private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
- //遍历每个Node
- for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
- //单个Node数据发送
- sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
- }
复制代码 调用Sender.sendProduceRequest后,对于每个Node发送List< roducerBatch>步骤如下:
代码图解:
对于每个Node节点,Sender先把其待发送的数据集List< roducerBatch>按Partition为单元重新编排为ProduceRequestData.TopicProduceDataCollection。
再用ClientRequest封装所有待发送的ProducerBatch数据,通过NetworkClient举行网络传输,发送到Kafka集群的目的Node节点。
Sender.sendProduceRequest()数据发送源码:
- private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
-
- //...
-
- //新建TopicProduceDataCollection封装待发送的各个Topic的ProducerBatch
- ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
- //...
- for (ProducerBatch batch : batches) {
- //取出每个ProducerBatch的TopicPartition和records数据
- TopicPartition tp = batch.topicPartition;
- MemoryRecords records = batch.records();
-
- //在待发送的tpd中找到本ProducerBatch对应Topic的数据
- ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
-
- //不存在则新建TopicProduceData,并添加到tpd
- if (tpData == null){
- tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
- tpd.add(tpData);
- }
-
- //将本ProducerBatch添加到本topic待发送tpd的partitionData中
- tpData.partitionData().add(new
- ProduceRequestData.PartitionProduceData()
- .setIndex(tp.partition())
- .setRecords(records));
- }
- //...
- //构建发送请求与回调函数,待发送数封装在tpd
- //发送请求
- ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
- new ProduceRequestData()
- .setAcks(acks)
- .setTimeoutMs(timeout)
- .setTransactionalId(transactionalId)
- .setTopicData(tpd));
- //回调函数
- RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
-
- //...
-
- //构建Kafak客户端请求
- ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, requestTimeoutMs, callback);
-
- //通过KafkaClient(NetworkClient)发送ProducerBatch到Kafka集群
- client.send(clientRequest, now);
-
- }
复制代码 最终,Sender通过调用NetworkClient的send()方法发送封装了该Node节点所有的ProducerBatch的ClientRequest请求。
代码图解:
NetworkClient的send()方法继续调用了doSend()方法。
NetworkClient.send()数据发送源码:
- public void send(ClientRequest request, long now) {
- doSend(request, false, now);
- }
复制代码 再继续调用。
NetworkClient.doSend()数据发送源码:
- private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
- //...
-
- //继续调用
- doSend(clientRequest, isInternalRequest, now, builder.build(version));
-
- //...
- }
复制代码 进入最终NetworkClient发送的doSend()方法。 NetworkClient获取目的发送Node,构建RequestHeader,将ClientRequest放入InFlightRequest聚集,最终通过JavaNIO底层的Selector举行网络传输。
NetworkClient.doSend()数据发送源码:
- private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
- //获取待发送的目的Node
- String destination = clientRequest.destination();
- //构建RequestHeader
- RequestHeader header = clientRequest.makeHeader(request.version());
- if (log.isDebugEnabled()) {
- log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
- clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
- }
- //将header放入request
- Send send = request.toSend(header);
- //将clientRequest放入InFlightRequest
- InFlightRequest inFlightRequest = new InFlightRequest(
- clientRequest,
- header,
- isInternalRequest,
- request,
- send,
- now);
- this.inFlightRequests.add(inFlightRequest);
- //通过selector继续发送
- selector.send(new NetworkSend(clientRequest.destination(), send));
- }
复制代码 NetworkClient调用Selector.send()现实是把数据放入了Selector缓存,最终在Selector.poll()中才真正举行数据传输。
5.NetworkClient网络传输
NetworkClient 是 Kafka 生产者的Java NIO 网络通讯组件,负责举行Kafka生产者与 Kafka Broker Node节点的网络传输。它支持高效的异步通讯,负责发送请求、吸收相应,并提供相应的重试处理处罚和错误恢复机制。
NetworkClient维护一个InFlightRequests队列记录正在发送的请求,并通过Selector与Kafka集群举行数据通讯。
代码图解:
当Sender在实行runOnce()方法时,调用sendProducerData()完成发送数据后,又调用了NetworkClient的poll()方法举行网络传输。
代码图解:
Sender.runOnce()方法源码:
- void runOnce() {
-
- //...
- //Sender发送数据
- long pollTimeout = sendProducerData(currentTimeMs);
-
- //NetworkClient进行数据传输
- client.poll(pollTimeout, currentTimeMs);
- }
复制代码 NetworkClient举行网络传输是通过NetworkClient.poll()方法举行的,首先会向Kafka集群发送元数据更新请求,然后通过Selector发送消息到Kafka集群,最后处理处罚完成的Action。
NetworkClient.poll()源码:
- public List<ClientResponse> poll(long timeout, long now) {
- //...
-
- //更新元数据
- long metadataTimeout = metadataUpdater.maybeUpdate(now);
-
- //...
-
- //由Selector进行具体数据读取与发送的请求
- this.selector.poll(Utils.min(timeout, metadataTimeout, telemetryTimeout, defaultRequestTimeoutMs));
-
- //...
-
- //处理各类已完成的Action
- long updatedNow = this.time.milliseconds();
- List<ClientResponse> responses = new ArrayList<>();
- handleCompletedSends(responses, updatedNow);
- handleCompletedReceives(responses, updatedNow);
- handleDisconnections(responses, updatedNow);
- handleConnections();
- handleInitiateApiVersionRequests(updatedNow);
- handleTimedOutConnections(responses, updatedNow);
- handleTimedOutRequests(responses, updatedNow);
- completeResponses(responses);
- }
复制代码 5.1.元数据更新
NetworkClient首先通过调用MetadataUpdater.maybeUpdate()方法发送元数据更新请求。
代码图解:
源码经过多级调用,最终在NetworkClient.sendInternalMetadataRequest()方法构建元数据更新请求,调用NetworkClient.doSend()方法举行发送。
NetworkClient.sendInternalMetadataRequest()方法源码:
- void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
- //构建元数据更新请求
- ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
-
- //进行请求发送
- doSend(clientRequest, true, now);
- }
复制代码 后续的步骤与数据发送时调用NetworkClient.doSend()方法的步骤相同。
NetworkClient.doSend()数据发送源码:
- private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
- //获取待发送的目的Node
- String destination = clientRequest.destination();
- //构建RequestHeader
- RequestHeader header = clientRequest.makeHeader(request.version());
- if (log.isDebugEnabled()) {
- log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
- clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
- }
- //将header放入request
- Send send = request.toSend(header);
- //将clientRequest放入InFlightRequest
- InFlightRequest inFlightRequest = new InFlightRequest(
- clientRequest,
- header,
- isInternalRequest,
- request,
- send,
- now);
- this.inFlightRequests.add(inFlightRequest);
- //通过selector继续发送
- selector.send(new NetworkSend(clientRequest.destination(), send));
- }
复制代码 NetworkClient调用Selector.send()现实是把数据放入了Selector缓存,最终在Selector.poll()中才真正举行数据传输。
5.2.NetworkClient发送数据
在NetworkClient发送元数据请求与Sender发送数据时,最终都是调用了Selector.send()方法。Selector.send()方法,现实是把数据放入了Selector缓存,最终在Selector.poll()中才真正举行数据传输。
Selector.send()方法源码:
- public void send(NetworkSend send) {
- //打开KafkaChannel
- KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
-
- //...
-
- //在KafkaChannel放入待发送的请求
- channel.setSend(send);
- //...
- }
复制代码 代码图解:
NetworkClient网络传输最终是通过Selector举行的,Selector会先找到缓存中所有可发送的事件,并开始举行读写操纵。
Selector.poll()数据发送源码:
- public void poll(long timeout) throws IOException {
- //...
-
- //获取已经ready进行读写的事件
- pollSelectionKeys(readyKeys, false, endSelect);
-
- //...
-
- //进行读写操作
- pollSelectionKeys(readyKeys, false, endSelect);
-
- //...
- }
复制代码 举行读写操纵的代码在Selector的pollSelectionKeys()方法中。
Selector.pollSelectionKeys()数据发送源码:
- void pollSelectionKeys(Set<SelectionKey> selectionKeys,
- boolean isImmediatelyConnected,
- long currentTimeNanos) {
- //遍历所有可发送的读写事件
- for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
- //获取KafkaChannel
- KafkaChannel channel = channel(key);
- //...
-
- //进行读尝试
- attemptRead(channel);
- //...
- //进行写尝试
- attemptWrite(key, channel, nowNanos);
- //...
- }
- }
复制代码 Selector的attemptRead()与attemptWrite()方法为底层的网络传输读写方法。
Selector.attemptRead()方法源码:
- private void attemptRead(KafkaChannel channel) throws IOException {
- String nodeId = channel.id();
-
- //由KafkaChannel进行数据读操作
- long bytesReceived = channel.read();
- if (bytesReceived != 0) {
- long currentTimeMs = time.milliseconds();
- sensors.recordBytesReceived(nodeId, bytesReceived, currentTimeMs);
- madeReadProgressLastPoll = true;
- NetworkReceive receive = channel.maybeCompleteReceive();
- if (receive != null) {
- addToCompletedReceives(channel, receive, currentTimeMs);
- }
- }
- if (channel.isMuted()) {
- outOfMemory = true; //channel has muted itself due to memory pressure.
- } else {
- madeReadProgressLastPoll = true;
- }
- }
复制代码 Selector.attemptWrite()方法源码:
- private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
- if (channel.hasSend()
- && channel.ready()
- && key.isWritable()
- && !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
- //继续调用
- write(channel);
- }
- }
复制代码 Selector.Write()方法源码:
- void write(KafkaChannel channel) throws IOException {
- String nodeId = channel.id();
- //由KafkaChannel进行数据写操作
- long bytesSent = channel.write();
- NetworkSend send = channel.maybeCompleteSend();
- // We may complete the send with bytesSent < 1 if `TransportLayer.hasPendingWrites` was true and `channel.write()`
- // caused the pending writes to be written to the socket channel buffer
- if (bytesSent > 0 || send != null) {
- long currentTimeMs = time.milliseconds();
- if (bytesSent > 0)
- this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
- if (send != null) {
- this.completedSends.add(send);
- this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
- }
- }
- }
复制代码 Selector的数据读写最终通过调用KafkaChannel.read()与KafkaChannel.write()方法完成底层网络传输,将数据发送到Kafka集群。
6.结语
KafkaProducer数据发送过程重要是由Sender举行了元数据更新、ProducerBatch数据发送、NetworkClient网络传输,最终将数据发送到Kafka集群中。本文接上文KafkaProducer消息生产,完整解析了Sender举行数据发送的各步骤操纵细节。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |