乌市泽哥 发表于 2024-8-28 07:43:11

kafka发送消息-生产者发送消息的分区策略(消息发送到哪个分区中?是什么策略?)

https://i-blog.csdnimg.cn/direct/53ea3b9451474dae944e7765896a75ed.png
1、默认策略,程序自动计算并指定分区

1.1、指定key,不指定分区

生产者:在编写代码发送消息时我们先不指定分区,即分区设为null,看看程序最终会把消息发送到哪个分区。
package com.power.producer;

import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate2;
   
    public void send9(){
      User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
      //分区是null,让kafka自己去决定把消息发送到哪个分区
      kafkaTemplate2.send("heTopic",null,System.currentTimeMillis(),"k9",user);
    }
}
测试类:
package com.power;

import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.Date;

/**
 * Spring Boot test that drives {@link EventProducer#send9()}.
 */
@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    // Producer under test, injected from the Spring context.
    @Resource
    private EventProducer eventProducer;
   
    /**
     * Triggers a single {@code send9()} call on the injected producer.
     * The test makes no assertions; it only issues the send.
     */
    @Test
    void send9(){
      eventProducer.send9();
    }
}
程序最终是通过以下代码进行目标分区计算的:
Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
通过调试发现,程序的计算过程如下:
程序自动读取生产者发送消息时的key(本次发送时值为“k9”),将key生成一个32位的HASH值(murmur2),将该HASH值转为非负数后与该topic的分区数(这个topic中有9个分区)取余数(余数结果一定在0-8之间),进而计算得出消息默认发送到的分区值
https://i-blog.csdnimg.cn/direct/d4e1f28347344e94b79da2fb0e681ac0.png
https://i-blog.csdnimg.cn/direct/fa0b5f2350534c5a94dadfe084385be5.png
1.2、不指定key,不指定分区

生产者:
https://i-blog.csdnimg.cn/direct/960e3e13d9d64c79b40cc718941bbea5.png
测试类:
https://i-blog.csdnimg.cn/direct/a08f49c41788409695df680a5fe71758.png
此时是通过随机数与分区数取余数来计算目标分区的
使用随机数 % numPartitions
2、轮询分配策略RoundRobinPartitioner

通过查看kafka源码发现,分区接口有一个轮询分配策略相关的实现类。
https://i-blog.csdnimg.cn/direct/987906f35a7e443297cdf8fb59a6bb4a.png
在application.yml配置文件的生产者配置项中,我发现生产者并没有轮询分配策略相关的配置,那么该如何实现轮询指定分区的配置呢?
https://i-blog.csdnimg.cn/direct/d284b3f07e614ef1a85c9e93e3394739.png
需要编写代码实现轮询指定分区策略:
https://i-blog.csdnimg.cn/direct/0577273988aa4897bfb1d0c522dd604c.png
2.1、创建配置类

package com.power.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    public Map<String, Object> producerConfigs() {
      Map<String, Object> props = new HashMap<>();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
      props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
      return props;
    }

    public ProducerFactory<String, ?> producerFactory() {
      return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, ?> kafkaTemplate() {
      return new KafkaTemplate<>(producerFactory());
    }


    //第二次创建
    @Bean
    public NewTopic newTopic9() {
      return new NewTopic("heTopic", 9, (short) 1);
    }
}
2.2、application.yml文件

spring:
  application:
    #应用名称
    name: spring-boot-01-kafka-base

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的kafka服务器IP>:9092
    #配置生产者(有24个配置)
    producer:
      #key默认是StringSerializer序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value默认是ToStringSerializer序列化
      value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer


    #配置消费者(有24个配置)
    consumer:
      auto-offset-reset: earliest

    template:
      default-topic: default-topic
https://i-blog.csdnimg.cn/direct/ffadef1cf28a4890a1e841084996848c.png
2.3、生产者

package com.power.producer;

import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate2;

    public void send10(){
      User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
      //分区是null,让kafka自己去决定把消息发送到哪个分区
      kafkaTemplate2.send("heTopic",user);
    }
}
https://i-blog.csdnimg.cn/direct/997ad87434734f4fad1b23fb7decc448.png
2.4、测试类

https://i-blog.csdnimg.cn/direct/2474da3c3a514f2fa7b92da0b168c058.png
package com.power;

import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.Date;

/**
 * Spring Boot test that drives {@link EventProducer#send10()} repeatedly.
 */
@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    // Producer under test, injected from the Spring context.
    @Resource
    private EventProducer eventProducer;

    /**
     * Calls {@code send10()} five times in a row so that five records are sent.
     * The test makes no assertions; it only issues the sends.
     */
    @Test
    void send10(){
      for (int i = 0; i <5 ; i++) {
            eventProducer.send10();
      }

    }

}
2.5、执行结果

执行完测试类,发现5次请求分别发送到了kafka的heTopic主题的5个不同分区中:
https://i-blog.csdnimg.cn/direct/74a16965b9af4df18b9588bbd8976a6f.png
https://i-blog.csdnimg.cn/direct/6570207b8863401eba09a05fd2603429.png
https://i-blog.csdnimg.cn/direct/d3d138acabee4d388bb9bfbe81f85aa4.png
https://i-blog.csdnimg.cn/direct/9bdeb53adf7641f59ecdfc4752d5b316.png
https://i-blog.csdnimg.cn/direct/003f24b1426f401280eab2b8a79b1054.png
https://i-blog.csdnimg.cn/direct/2727958fa91c49d1922e9d09e9f7353e.png
https://i-blog.csdnimg.cn/direct/f8942d8556f04003a8dba27c9ea7245e.png
https://i-blog.csdnimg.cn/direct/873696ee1f164cb5b527073e3d66dc98.png
https://i-blog.csdnimg.cn/direct/6dc01377e354497dac00207727d9b031.png
3、自定义分区分配策略

https://i-blog.csdnimg.cn/direct/d72f1dbebf754fb6a01101b8f730613a.png
3.1、创建自定义分配策略类

package com.power.config;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import org.apache.kafka.common.PartitionInfo;import org.apache.kafka.common.utils.Utils;import java.util.List;import java.util.Map;import java.util.concurrent.atomic.AtomicInteger;public class CustomerPartitioner implements Partitioner {    private AtomicInteger nextPartition = new AtomicInteger(0);    @Override    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {      List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);      int numPartitions = partitions.size();      if(key==null){            //使用轮询方式选择分区            int next = nextPartition.getAndIncrement();            if(next>=numPartitions){                nextPartition.compareAndSet(next,0);            }            if(next>0){                next--;            }            System.out.println("分区值:"+next);            return next;      }else {            //如果key不为inull,则使用默认的分区计谋            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
      }    }    @Override    public void close() {    }    @Override    public void configure(Map<String, ?> configs) {    }} 3.2、修改kafka配置类

指定使用自定义的分区分配类
https://i-blog.csdnimg.cn/direct/f36767e0bd75434ea88140a68f551885.png
3.3、application.yml文件

spring:
  application:
    #应用名称
    name: spring-boot-01-kafka-base

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的kafka服务器IP>:9092
    #配置生产者(有24个配置)
    producer:
      #key默认是StringSerializer序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value默认是ToStringSerializer序列化
      value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer


    #配置消费者(有24个配置)
    consumer:
      auto-offset-reset: earliest

    template:
      default-topic: default-topic
https://i-blog.csdnimg.cn/direct/ffadef1cf28a4890a1e841084996848c.png
3.4、生产者

package com.power.producer;

import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate2;

    public void send10(){
      User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
      //分区是null,让kafka自己去决定把消息发送到哪个分区
      kafkaTemplate2.send("heTopic",user);
    }
}
https://i-blog.csdnimg.cn/direct/997ad87434734f4fad1b23fb7decc448.png
3.5、测试类

https://i-blog.csdnimg.cn/direct/2474da3c3a514f2fa7b92da0b168c058.png
package com.power;

import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.Date;

/**
 * Spring Boot test that drives {@link EventProducer#send10()} repeatedly.
 */
@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    // Producer under test, injected from the Spring context.
    @Resource
    private EventProducer eventProducer;

    /**
     * Calls {@code send10()} five times in a row so that five records are sent.
     * The test makes no assertions; it only issues the sends.
     */
    @Test
    void send10(){
      for (int i = 0; i <5 ; i++) {
            eventProducer.send10();
      }

    }

}
3.6、测试结果

https://i-blog.csdnimg.cn/direct/dc445140e8bd4868bb828b296bb44ec0.png
https://i-blog.csdnimg.cn/direct/e78485c2281043148b7f42ceb391822f.png
https://i-blog.csdnimg.cn/direct/eef813e668614fc4b468bd76550079f7.png
https://i-blog.csdnimg.cn/direct/a576de151fc74ed282c1c83dde463dcf.png
https://i-blog.csdnimg.cn/direct/d41b40478d46441ebf38abc219bc7c60.png
https://i-blog.csdnimg.cn/direct/f50df9aece7944d0b232b46b08ef1d4e.png
https://i-blog.csdnimg.cn/direct/dbb53feb0304466ab43035dc37f6e9c8.png
https://i-blog.csdnimg.cn/direct/1dd19055cd3440b1b4ad688fff7130f1.png
3.7、总结

使用自定义分区策略类尝试发送消息,发现发送的5次消息,并没有连续发送到5个相邻的分区中。查看kafka源码的org.apache.kafka.clients.producer.KafkaProducer类的doSend方法发现:当消息需要创建新批次(result.abortForNewBatch为true)时,同一条消息会调用两次计算分区的方法,导致第一次得到的分区并不会真正用于发送消息。
doSend方法:
/**
 * Core send path: waits for topic metadata, serializes the key and value,
 * picks a partition, appends the record to the accumulator and wakes the
 * sender when a batch is full or newly created.
 *
 * <p>Note that {@code partition(...)} can be invoked twice for one record:
 * once up front, and once more if the first append reports
 * {@code abortForNewBatch}.
 *
 * @param record   the record to send
 * @param callback optional user callback; on {@code ApiException} it is invoked
 *                 directly if non-null
 * @return the future from the accumulator append, or a {@code FutureFailure}
 *         wrapping an {@code ApiException}
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // Stays null until a partition has been chosen; passed to onSendError as-is.
    TopicPartition tp = null;
    try {
      throwIfProducerClosed();
      // first make sure the metadata for the topic is available
      long nowMs = time.milliseconds();
      ClusterAndWaitTime clusterAndWaitTime;
      try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
      } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
      }
      // Time spent waiting on metadata counts against the max block time.
      nowMs += clusterAndWaitTime.waitedOnMetadataMs;
      long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
      Cluster cluster = clusterAndWaitTime.cluster;
      byte[] serializedKey;
      try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
      } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                  " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                  " specified in key.serializer", cce);
      }
      byte[] serializedValue;
      try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
      } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                  " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                  " specified in value.serializer", cce);
      }
      // First partition computation for this record.
      int partition = partition(record, serializedKey, serializedValue, cluster);
      tp = new TopicPartition(record.topic(), partition);

      setReadOnly(record.headers());
      Header[] headers = record.headers().toArray();

      int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
      ensureValidRecordSize(serializedSize);
      // Use the record's own timestamp if it has one, otherwise the current time.
      long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
      if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
      }
      // producer callback will make sure to call both 'callback' and interceptor callback
      Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

      if (transactionManager != null && transactionManager.isTransactional()) {
            transactionManager.failIfNotReadyForSend();
      }
      RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

      if (result.abortForNewBatch) {
            // The first append was aborted because a new batch is needed: notify the
            // partitioner, compute the partition a second time, and append again.
            int prevPartition = partition;
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
      }

      if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

      if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
      }
      return result.future;
      // handling exceptions and record the errors;
      // for API exceptions return them in the future,
      // for other exceptions throw directly
    } catch (ApiException e) {
      log.debug("Exception occurred during message send:", e);
      if (callback != null)
            callback.onCompletion(null, e);
      this.errors.record();
      this.interceptors.onSendError(record, tp, e);
      return new FutureFailure(e);
    } catch (InterruptedException e) {
      this.errors.record();
      this.interceptors.onSendError(record, tp, e);
      throw new InterruptException(e);
    } catch (KafkaException e) {
      this.errors.record();
      this.interceptors.onSendError(record, tp, e);
      throw e;
    } catch (Exception e) {
      // we notify interceptor about all exceptions, since onSend is called before anything else in this method
      this.interceptors.onSendError(record, tp, e);
      throw e;
    }
}

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: kafka发送消息-生产者发送消息的分区策略(消息发送到哪个分区中?是什么策略?)