Kafka message sending: producer partitioning strategy (which partition is a message sent to? What is the strategy ...


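1. Default strategy: the program computes and assigns the partition automatically

1.1 Key specified, partition not specified

Producer: when writing the sending code we first leave the partition unspecified, i.e. set it to null, and see which partition the program ends up sending the message to.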
    package com.power.producer;

    import com.power.model.User;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;
    import java.util.Date;

    @Component
    public class EventProducer {

        @Resource
        private KafkaTemplate<String, Object> kafkaTemplate2;

        public void send9() {
            User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
            // The partition is null, so Kafka decides which partition the message goes to
            kafkaTemplate2.send("heTopic", null, System.currentTimeMillis(), "k9", user);
        }
    }
Test class:

    package com.power;

    import com.power.producer.EventProducer;
    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;

    import javax.annotation.Resource;

    @SpringBootTest
    public class SpringBoot01KafkaBaseApplication {

        @Resource
        private EventProducer eventProducer;

        @Test
        void send9() {
            eventProducer.send9();
        }
    }
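Debugging shows that the program ultimately computes the target partition with the following code:

    Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

The program reads the key the producer supplied with the message ("k9" in this send), hashes the key bytes into a 32-bit value, makes that value non-negative, and takes the remainder modulo the topic's partition count. This topic has 9 partitions, so the remainder is always between 0 and 8, and that remainder is the partition the message is sent to.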
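To make the calculation concrete, here is a minimal standalone sketch of the same formula. The class name `KeyedPartitionSketch` and its `murmur2` method are my own illustration: the hash is a re-implementation written to follow the murmur2 variant I believe the Kafka client uses, so treat its constants as an assumption. In a real project, call `Utils.murmur2` and `Utils.toPositive` from the Kafka client instead.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper class for illustration; not part of Kafka or of the post's project.
final class KeyedPartitionSketch {

    // 32-bit murmur2 hash over the key bytes (assumed to match the Kafka client's variant).
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        // Mix the input four bytes at a time.
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1 to 3 bytes, if any.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
                // fall through
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
                // fall through
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
                break;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit so the value is never negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Same shape as the formula above: non-negative hash modulo the partition count.
    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println("k9 -> partition " + partitionForKey("k9", 9));
    }
}
```

Because the hash depends only on the key bytes, the same key always maps to the same partition as long as the partition count does not change.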


1.2 No key, no partition

Producer:

Test class:

In this case the partition is computed as a random number modulo the topic's partition count:

    random number % numPartitions
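As a rough illustration of the keyless case, the sketch below draws a random integer, makes it non-negative, and takes it modulo the partition count. `KeylessPartitionSketch` and `randomPartition` are hypothetical names of mine, not Kafka code. Some client versions also keep reusing the chosen partition until the current batch is complete rather than drawing a new number for every record; the sketch leaves that detail out.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper class for illustration; not part of Kafka.
final class KeylessPartitionSketch {

    // Picks a partition from a random number when the record has no key.
    static int randomPartition(int numPartitions) {
        int random = ThreadLocalRandom.current().nextInt();
        // Clear the sign bit first so the remainder cannot be negative.
        return (random & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The printed value varies from run to run, but stays within 0..8.
        System.out.println("partition " + randomPartition(9));
    }
}
```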
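2. Round-robin strategy: RoundRobinPartitioner

Looking through the Kafka source shows that the Partitioner interface has an implementation class for round-robin assignment.

Among the producer settings in the application.yml configuration file I found no dedicated option for the round-robin strategy. So how can round-robin partition assignment be configured?

One way is to set it up in code:

2.1 Create the configuration class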

    package com.power.config;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.RoundRobinPartitioner;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    import java.util.HashMap;
    import java.util.Map;

    @Configuration
    public class KafkaConfig {

        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        @Value("${spring.kafka.producer.key-serializer}")
        private String keySerializer;

        @Value("${spring.kafka.producer.value-serializer}")
        private String valueSerializer;

        // Producer properties: values from application.yml plus the partitioner class
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
            // Use the round-robin partitioner instead of the default one
            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
            return props;
        }

        public ProducerFactory<String, ?> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public KafkaTemplate<String, ?> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }

        // Second creation
        @Bean
        public NewTopic newTopic9() {
            return new NewTopic("heTopic", 9, (short) 1);
        }
    }
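The configuration class above only plugs the partitioner in. To show the idea behind round-robin assignment, here is a small standalone sketch that keeps one counter per topic and wraps it with a modulo. `RoundRobinSketch` and `nextPartition` are hypothetical names of mine; this is not the library's `RoundRobinPartitioner`, only an approximation of the concept.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of round-robin partition selection; not Kafka code.
final class RoundRobinSketch {

    // One counter per topic, so topics rotate independently of each other.
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    int nextPartition(String topic, int numPartitions) {
        AtomicInteger counter = counters.computeIfAbsent(topic, t -> new AtomicInteger(0));
        int next = counter.getAndIncrement();
        // Clearing the sign bit keeps the result valid even after the counter overflows.
        return (next & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        RoundRobinSketch sketch = new RoundRobinSketch();
        // Prints 0, 1, 2, 3, 4 on separate lines.
        for (int i = 0; i < 5; i++) {
            System.out.println(sketch.nextPartition("heTopic", 9));
        }
    }
}
```

With 9 partitions the tenth call wraps back to partition 0.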
2.2 application.yml

    spring:
      application:
        # application name
        name: spring-boot-01-kafka-base
      # Kafka connection address (ip + port)
      kafka:
        bootstrap-servers: <your Kafka server IP>:9092
        # producer settings (24 options available)
        producer:
          # keys are serialized with StringSerializer by default
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          # serialize values with ToStringSerializer (Spring Boot's default would be StringSerializer)
          value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer
        # consumer settings (24 options available)
        consumer:
          auto-offset-reset: earliest
        template:
          default-topic: default-topic

2.3 Producer

    package com.power.producer;

    import com.power.model.User;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;
    import java.util.Date;

    @Component
    public class EventProducer {

        @Resource
        private KafkaTemplate<String, Object> kafkaTemplate2;

        public void send10() {
            User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
            // No partition is given, so Kafka decides which partition the message goes to
            kafkaTemplate2.send("heTopic", user);
        }
    }

2.4 Test class

    package com.power;

    import com.power.producer.EventProducer;
    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;

    import javax.annotation.Resource;

    @SpringBootTest
    public class SpringBoot01KafkaBaseApplication {

        @Resource
        private EventProducer eventProducer;

        @Test
        void send10() {
            // Send 5 messages in a row to observe how they are spread over the partitions
            for (int i = 0; i < 5; i++) {
                eventProducer.send10();
            }
        }
    }
2.5 Execution result

After running the test class, the 5 requests were sent to 5 different partitions of the heTopic topic on Kafka:









3. Custom partition assignment strategy


3.1 Create the custom partitioner class

    package com.power.config;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.utils.Utils;

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    public class CustomerPartitioner implements Partitioner {

        private final AtomicInteger nextPartition = new AtomicInteger(0);

        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (key == null) {
                // No key: choose the partition round-robin. Taking the counter modulo the
                // partition count keeps the result within 0..numPartitions-1 after it wraps.
                int next = Utils.toPositive(nextPartition.getAndIncrement()) % numPartitions;
                System.out.println("Partition: " + next);
                return next;
            } else {
                // Key is not null: use the default keyed strategy
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }

        @Override
        public void close() {
        }

        @Override
        public void configure(Map<String, ?> configs) {
        }
    }
3.2 Modify the Kafka configuration class

Configure the producer to use the custom partitioner class.
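The post does not show the modified configuration, so the following is only a sketch of what the change might look like. In the KafkaConfig class from section 2.1 the change would presumably be a single line, `props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomerPartitioner.class);`. The standalone version below uses the plain property names (for example `partitioner.class`, the key behind `ProducerConfig.PARTITIONER_CLASS_CONFIG`) so that it compiles without Kafka on the classpath. `CustomPartitionerConfigSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: producer properties that select the custom partitioner.
final class CustomPartitionerConfigSketch {

    static Map<String, Object> producerConfigs(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.springframework.kafka.support.serializer.ToStringSerializer");
        // Fully qualified name of the partitioner class from section 3.1
        props.put("partitioner.class", "com.power.config.CustomerPartitioner");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder address, not taken from the post.
        System.out.println(producerConfigs("localhost:9092").get("partitioner.class"));
    }
}
```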

3.3 application.yml

    spring:
      application:
        # application name
        name: spring-boot-01-kafka-base
      # Kafka connection address (ip + port)
      kafka:
        bootstrap-servers: <your Kafka server IP>:9092
        # producer settings (24 options available)
        producer:
          # keys are serialized with StringSerializer by default
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          # serialize values with ToStringSerializer (Spring Boot's default would be StringSerializer)
          value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer
        # consumer settings (24 options available)
        consumer:
          auto-offset-reset: earliest
        template:
          default-topic: default-topic

3.4 Producer

    package com.power.producer;

    import com.power.model.User;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;
    import java.util.Date;

    @Component
    public class EventProducer {

        @Resource
        private KafkaTemplate<String, Object> kafkaTemplate2;

        public void send10() {
            User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
            // No partition is given, so Kafka decides which partition the message goes to
            kafkaTemplate2.send("heTopic", user);
        }
    }

3.5 Test class

    package com.power;

    import com.power.producer.EventProducer;
    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;

    import javax.annotation.Resource;

    @SpringBootTest
    public class SpringBoot01KafkaBaseApplication {

        @Resource
        private EventProducer eventProducer;

        @Test
        void send10() {
            // Send 5 messages in a row to observe how they are spread over the partitions
            for (int i = 0; i < 5; i++) {
                eventProducer.send10();
            }
        }
    }
3.6 Test result









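3.7 Summary

When sending with the custom partitioner class, the 5 messages did not land in 5 consecutive partitions. The doSend method of org.apache.kafka.clients.producer.KafkaProducer in the Kafka source explains why: a single send can call the partition-computing method twice. When accumulator.append reports abortForNewBatch, partition(...) is called a second time, so the first partition obtained is not the one the message is actually sent to.
The doSend method: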
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // first make sure the metadata for the topic is available
            long nowMs = time.milliseconds();
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            nowMs += clusterAndWaitTime.waitedOnMetadataMs;
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
            if (log.isTraceEnabled()) {
                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
            if (transactionManager != null && transactionManager.isTransactional()) {
                transactionManager.failIfNotReadyForSend();
            }
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
            if (result.abortForNewBatch) {
                int prevPartition = partition;
                partitioner.onNewBatch(record.topic(), cluster, prevPartition);
                partition = partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (log.isTraceEnabled()) {
                    log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
                }
                // producer callback will make sure to call both 'callback' and interceptor callback
                interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
                result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
            }
            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (KafkaException e) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }
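The effect of the second partition(...) call can be reproduced without Kafka. The sketch below is a simulation under a stated assumption: every send takes the abortForNewBatch path, so a round-robin counter is advanced twice per message and only the second value is used. `DoubleCallSketch` and its methods are hypothetical names, and the printed partitions illustrate the skipping pattern rather than the exact partitions from the test run in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation of a round-robin partitioner that is asked twice per send.
final class DoubleCallSketch {

    private final AtomicInteger counter = new AtomicInteger(0);
    private final int numPartitions;

    DoubleCallSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Round-robin choice: every call advances the counter by one.
    int partition() {
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    // One send in which the first append is aborted for a new batch (assumed here).
    int send() {
        partition();        // first result is discarded
        return partition(); // second result is the partition actually used
    }

    static List<Integer> simulate(int numPartitions, int sends) {
        DoubleCallSketch sketch = new DoubleCallSketch(numPartitions);
        List<Integer> used = new ArrayList<>();
        for (int i = 0; i < sends; i++) {
            used.add(sketch.send());
        }
        return used;
    }

    public static void main(String[] args) {
        System.out.println(simulate(9, 5)); // prints [1, 3, 5, 7, 0]
    }
}
```

Every other partition is skipped, which matches the observation that the messages do not land in adjacent partitions.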

乌市泽哥
