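1. Default strategy: the program computes and assigns the partition automatically
1.1 Key specified, partition not specified
Producer: when writing the code that sends the message, we leave the partition unspecified (we pass null for it) and see which partition the program ends up sending the message to.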
- package com.power.producer;
-
- import com.power.model.User;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.util.Date;
-
- @Component
- public class EventProducer {
-     @Resource
-     private KafkaTemplate<String, Object> kafkaTemplate2;
-
-     public void send9() {
-         User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
-         // The partition is null, so Kafka decides which partition receives the message
-         kafkaTemplate2.send("heTopic", null, System.currentTimeMillis(), "k9", user);
-     }
- }
Test class:
- package com.power;
-
- import com.power.producer.EventProducer;
- import org.junit.jupiter.api.Test;
- import org.springframework.boot.test.context.SpringBootTest;
-
- import javax.annotation.Resource;
-
- @SpringBootTest
- public class SpringBoot01KafkaBaseApplication {
-     @Resource
-     private EventProducer eventProducer;
-
-     @Test
-     void send9() {
-         eventProducer.send9();
-     }
- }
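Stepping through the send in a debugger shows that the program ultimately computes the target partition with the following code:
- Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
The program reads the key supplied when the producer sends the message ("k9" in this example), hashes it to a 32-bit value with murmur2, forces the hash to be non-negative, and takes the remainder modulo the topic's partition count. This topic has 9 partitions, so the remainder is always between 0 and 8. That remainder is the partition the message is sent to by default.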
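To make the formula concrete without needing Kafka on the classpath, here is a minimal standalone sketch. The class name KeyHashPartitionSketch and the helper partitionFor are hypothetical, and murmur2 is my own transcription of the 32-bit murmur2 variant that Kafka's Utils class uses, so treat it as an illustration rather than a drop-in replacement. In real code you would call Utils.murmur2 and Utils.toPositive directly. The sketch also assumes the key is serialized as UTF-8 bytes, which is what StringSerializer does by default.

```java
import java.nio.charset.StandardCharsets;

final class KeyHashPartitionSketch {

    // Transcription of the 32-bit murmur2 hash (seed 0x9747b28c); assumed to mirror Kafka's Utils.murmur2.
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        // Mix the input four bytes at a time (little-endian)
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1 to 3 bytes, if any
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
                // fall through
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
                // fall through
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
                break;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit so the later modulo can never be negative
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Hypothetical helper: key -> partition, following the formula quoted above
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println("k9 -> partition " + partitionFor("k9", 9));
    }
}
```

Because the hash depends only on the key bytes, the same key always maps to the same partition as long as the partition count does not change.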
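1.2 No key, no partition
Producer:
Test class:
In this case the default partition is computed from a random number taken modulo the topic's partition count.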
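The idea can be sketched in a few lines of plain Java. This is a simplified model, not the Kafka client's code: the class RandomPartitionSketch and the method randomPartition are hypothetical names, and how often the real client draws a new random number (per record or per batch) depends on the client version, which this sketch does not try to reproduce.

```java
import java.util.concurrent.ThreadLocalRandom;

final class RandomPartitionSketch {

    // Clears the sign bit so the modulo result is never negative
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Hypothetical helper: random number modulo the partition count
    static int randomPartition(int numPartitions) {
        return toPositive(ThreadLocalRandom.current().nextInt()) % numPartitions;
    }

    public static void main(String[] args) {
        // With 9 partitions every printed value lies between 0 and 8
        for (int i = 0; i < 5; i++) {
            System.out.println(randomPartition(9));
        }
    }
}
```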
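2. Round-robin strategy: RoundRobinPartitioner
Looking through the Kafka source shows that the Partitioner interface has an implementation class for round-robin assignment.
Among the producer settings in application.yml, however, I found no dedicated option for a round-robin strategy. So how do we get round-robin partition assignment?
We have to write code to configure it:
2.1 Create the configuration class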
- package com.power.config;
-
- import org.apache.kafka.clients.admin.NewTopic;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.RoundRobinPartitioner;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.core.DefaultKafkaProducerFactory;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.kafka.core.ProducerFactory;
-
- import java.util.HashMap;
- import java.util.Map;
-
- @Configuration
- public class KafkaConfig {
-     @Value("${spring.kafka.bootstrap-servers}")
-     private String bootstrapServers;
-     @Value("${spring.kafka.producer.key-serializer}")
-     private String keySerializer;
-     @Value("${spring.kafka.producer.value-serializer}")
-     private String valueSerializer;
-
-     public Map<String, Object> producerConfigs() {
-         Map<String, Object> props = new HashMap<>();
-         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
-         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
-         // Use the round-robin partitioner instead of the default one
-         props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
-         return props;
-     }
-
-     public ProducerFactory<String, ?> producerFactory() {
-         return new DefaultKafkaProducerFactory<>(producerConfigs());
-     }
-
-     @Bean
-     public KafkaTemplate<String, ?> kafkaTemplate() {
-         return new KafkaTemplate<>(producerFactory());
-     }
-
-     // Second creation: heTopic with 9 partitions and a replication factor of 1
-     @Bean
-     public NewTopic newTopic9() {
-         return new NewTopic("heTopic", 9, (short) 1);
-     }
- }
2.2 application.yml
- spring:
-   application:
-     # application name
-     name: spring-boot-01-kafka-base
-   # Kafka connection address (ip+port)
-   kafka:
-     bootstrap-servers: <your-kafka-server-ip>:9092
-     # producer settings (24 settings available)
-     producer:
-       # keys are serialized with StringSerializer (the default)
-       key-serializer: org.apache.kafka.common.serialization.StringSerializer
-       # values are serialized with ToStringSerializer
-       value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer
-     # consumer settings (24 settings available)
-     consumer:
-       auto-offset-reset: earliest
-     template:
-       default-topic: default-topic
2.3 Producer
- package com.power.producer;
-
- import com.power.model.User;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.util.Date;
-
- @Component
- public class EventProducer {
-     @Resource
-     private KafkaTemplate<String, Object> kafkaTemplate2;
-
-     public void send10() {
-         User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
-         // No partition is specified, so the configured partitioner decides which partition receives the message
-         kafkaTemplate2.send("heTopic", user);
-     }
- }
2.4 Test class
- package com.power;
-
- import com.power.producer.EventProducer;
- import org.junit.jupiter.api.Test;
- import org.springframework.boot.test.context.SpringBootTest;
-
- import javax.annotation.Resource;
-
- @SpringBootTest
- public class SpringBoot01KafkaBaseApplication {
-     @Resource
-     private EventProducer eventProducer;
-
-     @Test
-     void send10() {
-         // Send 5 messages in a row to observe how they are spread over the partitions
-         for (int i = 0; i < 5; i++) {
-             eventProducer.send10();
-         }
-     }
- }
2.5 Result
After running the test class, the 5 requests were sent to 5 different partitions of the heTopic topic.
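The round-robin idea behind that result can be modeled with a per-topic counter. The sketch below is a simplified stand-in and not Kafka's RoundRobinPartitioner itself: the class RoundRobinSketch is a hypothetical name, and the real class may differ in details such as how it treats unavailable partitions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {

    // One counter per topic, so each topic cycles through its own partitions
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    // Returns the next partition for the topic: 0, 1, 2, ... and back to 0
    int partition(String topic, int numPartitions) {
        int next = counters.computeIfAbsent(topic, t -> new AtomicInteger(0)).getAndIncrement();
        // Clearing the sign bit keeps the result valid after the counter overflows
        return (next & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        RoundRobinSketch roundRobin = new RoundRobinSketch();
        // Five calls on a 9-partition topic print 0, 1, 2, 3, 4
        for (int i = 0; i < 5; i++) {
            System.out.println(roundRobin.partition("heTopic", 9));
        }
    }
}
```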
3. Custom partition assignment strategy
3.1 Create the custom strategy class
- package com.power.config;
-
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
- import org.apache.kafka.common.PartitionInfo;
- import org.apache.kafka.common.utils.Utils;
-
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.atomic.AtomicInteger;
-
- public class CustomerPartitioner implements Partitioner {
-     // Shared counter that drives the round-robin choice for records without a key
-     private final AtomicInteger nextPartition = new AtomicInteger(0);
-
-     @Override
-     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
-         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
-         int numPartitions = partitions.size();
-         if (key == null) {
-             // No key: choose partitions round-robin. toPositive plus modulo keeps the
-             // result in [0, numPartitions), even after the counter overflows.
-             int next = Utils.toPositive(nextPartition.getAndIncrement()) % numPartitions;
-             System.out.println("Partition: " + next);
-             return next;
-         } else {
-             // Key is not null: use the default key-hash strategy
-             return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
-         }
-     }
-
-     @Override
-     public void close() {
-     }
-
-     @Override
-     public void configure(Map<String, ?> configs) {
-     }
- }
3.2 Modify the Kafka configuration class
In the configuration class, set the partitioner to the custom partitioner class.
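In the KafkaConfig class from section 2.1, this presumably amounts to passing CustomerPartitioner.class instead of RoundRobinPartitioner.class in the props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, ...) line. The sketch below shows the resulting producer properties using the plain string keys behind the ProducerConfig constants, so it compiles without Kafka on the classpath. The class name CustomPartitionerConfigSketch and the localhost:9092 address are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

final class CustomPartitionerConfigSketch {

    // Builds the producer properties with the custom partitioner selected
    static Map<String, Object> producerConfigs(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.springframework.kafka.support.serializer.ToStringSerializer");
        // "partitioner.class" is the literal key behind ProducerConfig.PARTITIONER_CLASS_CONFIG;
        // the value may be a Class object or, as here, a fully qualified class name
        props.put("partitioner.class", "com.power.config.CustomerPartitioner");
        return props;
    }

    public static void main(String[] args) {
        // localhost:9092 is a placeholder address, not taken from the article
        System.out.println(producerConfigs("localhost:9092").get("partitioner.class"));
    }
}
```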
3.3 application.yml
Unchanged from section 2.2.
3.4 Producer
Unchanged from section 2.3.
3.5 Test class
Unchanged from section 2.4.
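3.6 Test result
3.7 Summary
When sending messages with the custom partitioner class, the 5 messages did not land on 5 consecutive partitions. The doSend method of org.apache.kafka.clients.producer.KafkaProducer in the Kafka source explains why: a single send can call the partition-computing method twice (the second call runs when the first append is aborted so that a new batch can be created), so the partition returned by the first call is not the one the message is actually sent to.
The doSend method: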
- private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
-     TopicPartition tp = null;
-     try {
-         throwIfProducerClosed();
-         // first make sure the metadata for the topic is available
-         long nowMs = time.milliseconds();
-         ClusterAndWaitTime clusterAndWaitTime;
-         try {
-             clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
-         } catch (KafkaException e) {
-             if (metadata.isClosed())
-                 throw new KafkaException("Producer closed while send in progress", e);
-             throw e;
-         }
-         nowMs += clusterAndWaitTime.waitedOnMetadataMs;
-         long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
-         Cluster cluster = clusterAndWaitTime.cluster;
-         byte[] serializedKey;
-         try {
-             serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
-         } catch (ClassCastException cce) {
-             throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
-                     " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
-                     " specified in key.serializer", cce);
-         }
-         byte[] serializedValue;
-         try {
-             serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
-         } catch (ClassCastException cce) {
-             throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
-                     " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
-                     " specified in value.serializer", cce);
-         }
-         int partition = partition(record, serializedKey, serializedValue, cluster);
-         tp = new TopicPartition(record.topic(), partition);
-         setReadOnly(record.headers());
-         Header[] headers = record.headers().toArray();
-         int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
-                 compressionType, serializedKey, serializedValue, headers);
-         ensureValidRecordSize(serializedSize);
-         long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
-         if (log.isTraceEnabled()) {
-             log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
-         }
-         // producer callback will make sure to call both 'callback' and interceptor callback
-         Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
-         if (transactionManager != null && transactionManager.isTransactional()) {
-             transactionManager.failIfNotReadyForSend();
-         }
-         RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
-                 serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
-         if (result.abortForNewBatch) {
-             int prevPartition = partition;
-             partitioner.onNewBatch(record.topic(), cluster, prevPartition);
-             partition = partition(record, serializedKey, serializedValue, cluster);
-             tp = new TopicPartition(record.topic(), partition);
-             if (log.isTraceEnabled()) {
-                 log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
-             }
-             // producer callback will make sure to call both 'callback' and interceptor callback
-             interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
-             result = accumulator.append(tp, timestamp, serializedKey,
-                     serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
-         }
-         if (transactionManager != null && transactionManager.isTransactional())
-             transactionManager.maybeAddPartitionToTransaction(tp);
-         if (result.batchIsFull || result.newBatchCreated) {
-             log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
-             this.sender.wakeup();
-         }
-         return result.future;
-         // handling exceptions and record the errors;
-         // for API exceptions return them in the future,
-         // for other exceptions throw directly
-     } catch (ApiException e) {
-         log.debug("Exception occurred during message send:", e);
-         if (callback != null)
-             callback.onCompletion(null, e);
-         this.errors.record();
-         this.interceptors.onSendError(record, tp, e);
-         return new FutureFailure(e);
-     } catch (InterruptedException e) {
-         this.errors.record();
-         this.interceptors.onSendError(record, tp, e);
-         throw new InterruptException(e);
-     } catch (KafkaException e) {
-         this.errors.record();
-         this.interceptors.onSendError(record, tp, e);
-         throw e;
-     } catch (Exception e) {
-         // we notify interceptor about all exceptions, since onSend is called before anything else in this method
-         this.interceptors.onSendError(record, tp, e);
-         throw e;
-     }
- }
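The effect of the two partition(...) calls in doSend on a counter-based partitioner can be reproduced with a small simulation. This is only a sketch under a loud assumption: it pretends that every send takes the abortForNewBatch branch, whereas in the real producer that branch runs only when a new batch has to be created. The class DoubleCallSketch and the method simulate are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class DoubleCallSketch {

    // Round-robin counter, like the one a counter-based custom partitioner keeps
    private final AtomicInteger counter = new AtomicInteger(0);

    // Every call advances the counter, whether or not the result is used
    int partition(int numPartitions) {
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    // Returns the partitions that actually receive the messages
    static List<Integer> simulate(int sends, int numPartitions, boolean abortForNewBatch) {
        DoubleCallSketch partitioner = new DoubleCallSketch();
        List<Integer> used = new ArrayList<>();
        for (int i = 0; i < sends; i++) {
            int partition = partitioner.partition(numPartitions); // first call
            if (abortForNewBatch) {
                // Assumed to happen on every send: the second call replaces the first result
                partition = partitioner.partition(numPartitions);
            }
            used.add(partition);
        }
        return used;
    }

    public static void main(String[] args) {
        System.out.println(simulate(5, 9, false)); // prints [0, 1, 2, 3, 4]
        System.out.println(simulate(5, 9, true));  // prints [1, 3, 5, 7, 0]
    }
}
```

With one call per send the partitions are consecutive. With two calls per send every other partition is skipped, which matches the kind of non-consecutive distribution described in the summary.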