ToB企服应用市场:ToB评测及商务社交产业平台

标题: kafka生产者专题(原理+拦截器+序列化+分区+数据可靠+数据去重+事务) [打印本页]

作者: 熊熊出没    时间: 2025-1-8 08:55
标题: kafka生产者专题(原理+拦截器+序列化+分区+数据可靠+数据去重+事务)
生产者

发送数据原理


说明
参数说明

参数默认值作用形貌bootstrap.serversnode2:9092[,node3:9092][,node4:9092]生产者连接集群所需的broker地址清单,一个或多个(逗号隔开)key.serializer(无)指定发送消息的key的序列化类型,必须写全类名value.serializer(无)指定发送消息的value的序列化类型,必须写全类名buffer.memory32MRecordAccumulator缓冲区总巨细batch.size16K缓冲区一批数据最大值,适当增加可进步吞吐量,但可能增加延迟linger.ms0ms(表示没有延迟)如果数据未达到batch.size,sender等待linger.time后发送数据acks-1应答机制:0-不需要应答,1-Leader应答,-1(all)-全部节点应答max.in.flight.requests.per.connection5允许最多没有返回ack的次数,开启幂等性时建议1-5之间enable.idempotencetrue是否开启幂等性,默认开启retries2147483647(int最大值)消息发送错误时的重试次数retry.backoff.ms100ms两次重试之间的时间间隔compression.typenone生产者发送的全部数据的压缩方式,默认不压缩 代码示例(同步发送数据)

  1. package com.wunaiieq;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import java.util.Properties;
  7. import java.util.concurrent.ExecutionException;
  8. public class SyncCustomProducer {
  9.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  10.         //TODO 1.声明并实例化Kafka Producer的配置文件对象
  11.         Properties prop = new Properties();
  12.         //TODO 2.为配置文件对象设置参数
  13.         //TODO 2.1 配置bootstrap_server(生产者连接集群所需的broker地址清单)
  14.         prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  15.                 "192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
  16.         //TODO 2.2 配置key和value的序列化类
  17.         // 设置序列化器:指定key和value的序列化类为StringSerializer,用于将字符串类型的key和value转换为字节数组,以便发送到Kafka。
  18.         prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  19.         prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  20.         //TODO 3.声明并实例化生产者对象
  21.         KafkaProducer<String,String> producer =
  22.                 new KafkaProducer<String, String>(prop);
  23.         //TODO 4.发送消息
  24.         // producer.send(...).get():同步发送消息,send()方法返回一个Future对象,调用get()方法等待发送完成并获取结果。
  25.         for(int i = 0;i<5;i++){
  26.             //同步发送消息
  27.             producer.send(new ProducerRecord<>("topicA","sync_msg"+i)).get();
  28.         }
  29.         //TODO 5.关闭生产者
  30.         producer.close();
  31.     }
  32. }
复制代码
代码说明
类/对象形貌用途PropertiesJava标准库中的类,用于维护键值对列表。 Properties类提供了一种方便的方式来读取和写入属性文件(通常是.properties文件)在本代码中用于存储Kafka生产者的配置参数。KafkaProducer<K, V>Kafka客户端库中的类,用于向Kafka主题发送消息。泛型参数K和V分别表示消息键和值的类型。创建生产者实例,发送消息到Kafka主题。ProducerConfigKafka客户端库中的类,包含生产者配置的常量。提供配置参数的常量值,如broker地址、序列化器等。ProducerRecord<K, V>Kafka客户端库中的类,表示要发送到Kafka主题的消息记录。泛型参数K和V分别表示消息键和值的类型。创建消息记录对象,包含主题、键和值。StringSerializerKafka客户端库中的类,实现了Serializer<String>接口,用于将字符串类型的键或值序列化为字节数组。作为键和值的序列化器,将字符串转换为字节数组进行传输。 效果

代码示例(异步)

  1. package com.wunaiieq;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import java.util.Properties;
  7. public class UnSyncCustomProducer {
  8.     public static void main(String[] args) {
  9.         //实例化Properties
  10.         Properties prop = new Properties();
  11.         //集群节点
  12.         prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  13.                 "192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
  14.         //key和value
  15.         prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  16.         prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  17.         //创建kafka生产者对象,并写入响应参数
  18.         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
  19.         //发送数据
  20.         for (int i = 0; i < 5; i++) {
  21.             //异步发送数据,不调用get方法
  22.             producer.send(new ProducerRecord<>(
  23.                     "topicA", "unsync_msg" + i
  24.             ));
  25.         }
  26.         producer.close();
  27.     }
  28. }
复制代码
异步和同步的区别

同步发送

界说与流程

界说: 同步发送是指生产者在发送一条消息后,会立即等待Kafka服务器的响应。只有在服务器返回成功响应后,生产者才会继续发送下一条消息。
流程:
生产者调用send()方法发送消息。
send()方法返回一个Future对象。
生产者调用Future对象的get()方法,该方法会阻塞当前线程,直到Kafka服务器返反响应或抛出异常。
生产者收到响应后,根据结果(成功或失败)进行后续操作。
特点

高可靠性:同步发送确保每条消息都被Kafka集群接收并长期化。生产者等待Kafka确认消息已经成功写入指定分区且复制到满足副本因子的节点上,从而进步了消息的可靠性。
异常处理:如果发送过程中发生异常,生产者可以立即感知并处理,避免了消息的丢失。
性能较低:由于同步发送需要阻塞等待响应,因此会增加消息的延迟,降低系统的吞吐量。特别是在高并发场景下,可能会导致线程资源的大量占用和性能瓶颈。
易调试:便于发现和处理异常,有利于开发和测试阶段的调试工作。
异步发送

界说与流程

界说: 异步发送是指生产者在发送一条消息后,不会立即等待Kafka服务器的响应,而是继续发送下一条消息。发送方通过通报一个回调函数给send()方法,该回调函数将在消息发送结果(成功或失败)可用时被异步调用。
流程:
生产者调用send()方法发送消息,并通报一个回调函数。
Kafka客户端将消息放入内部缓冲区,并立即返回。
Sender线程负责将缓冲区中的消息批量发送到Kafka集群。
当消息发送成功或失败时,Kafka客户端调用之前通报的回调函数,通知生产者消息发送的结果。
特点

高性能:异步发送方式下,生产者无需等待每个消息的确认即可继续发送下一条消息,从而进步了消息的发送效率,适用于高吞吐量场景。
灵活性:通过回调函数,生产者可以对消息发送的结果进行异步处理,如记录日记、重试发送等。
可靠性相对较低:由于生产者不会立即得知消息是否成功写入Kafka,因此消息的可靠性需要额外关注。如果生产者在发送消息后立即崩溃,可能会导致部分消息丢失。
调试复杂:由于消息发送和结果是异步的,因此调试时可能需要更多的日记记录和监控手段来确保消息的可靠性和完整性。
异步回调

形貌

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。消息发送失败后会自动重试,不需要再回调函数中手动重试。
代码示例

  1. package com.wunaiieq;
  2. import org.apache.kafka.clients.producer.*;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import java.util.Properties;
  5. import java.util.concurrent.ExecutionException;
  6. public class UnSyncCallBackCustomProducer {
  7.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  8.         //TODO 1.声明并实例化Kafka Producer的配置文件对象
  9.         Properties prop = new Properties();
  10.         //TODO 2.为配置文件对象设置参数
  11.         // 2.1 配置bootstrap_servers
  12.         prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  13.                 "192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
  14.         // 2.2 配置key和value的序列化类
  15.         prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  16.         prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  17.         //TODO 3.声明并实例化生产者对象
  18.         KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
  19.         //TODO 4.发送消息
  20.         for(int i = 0;i<5;i++){
  21.             //异步发送消息 不调用get()方法
  22.             producer.send(new ProducerRecord<>("topicA", "unsync_msg" + i),
  23.                     new Callback() {
  24.                         //如下方法在生产者收到acks确认时异步调用
  25.                         @Override
  26.                         public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  27.                             if(e == null){
  28.                                 //无异常信息,笑死发送成功,输出主题和分区信息到控制台
  29.                                 System.out.println("topic:"+recordMetadata.topic()
  30.                                         +",partition:"+recordMetadata.partition());
  31.                             }else{//打印异常信息
  32.                                 System.out.println(e.getMessage());
  33.                             }
  34.                         }
  35.                     });
  36.             Thread.sleep(5);
  37.         }
  38.         //TODO 5.关闭生产者
  39.         producer.close();
  40.     }
  41. }
复制代码
拦截器

形貌

拦截器(Interceptor)是kafka0.10.0.0版本中引入的新功能,主要用于实现clients端的定制化控制逻辑。它可以使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,好比修改消息等。同时允许指定多个Interceptor按序作用于同一条消息从而形成一个拦截器链(Interceptor Chain)。
自界说拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口。
拦截器内部方法
拦截器Interceptor可能运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。别的,若指定了多个Interceptor,则producer将按照指定顺序调用它们,同时把每个Interceptor中捕获的异常记录写入到错误日记中而不是向上通报。这在使用过程中要特别注意。
代码示例

实现一个简单的双interceptor构成的拦截链。
第一个Interceptor会在消息发送前将时间戳信息加到消息value的前面
第二个Interceptor在消息发送后更新成功发送消息数或失败发送消息数。
第一个拦截器示例
  1. package com.wunaiieq;
  2. import org.apache.kafka.clients.producer.ProducerInterceptor;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import java.util.Map;
  6. public class TimeStampInterceptor implements ProducerInterceptor<String,String> {
  7.     /**初始化拦截器,并接收Kafka生产者的配置参数。
  8.      * */
  9.     @Override
  10.     public void configure(Map<String, ?> configs) {
  11.     }
  12.     /**发送之前被调用,对消息进行处理。
  13.      * */
  14.     @Override
  15.     public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
  16.         return new ProducerRecord<String, String>(
  17.                 //原始消息记录的主题、分区、时间戳和键。
  18.                 record.topic(),record.partition(),
  19.                 record.timestamp(), record.key(),
  20.                 //将当前系统时间戳(System.currentTimeMillis())和原始消息值拼接成新的消息值,中间用逗号分隔。
  21.                 System.currentTimeMillis()+","+record.value());
  22.     }
  23.     /**消息发送成功或失败后被调用。
  24.      * */
  25.     @Override
  26.     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  27.     }
  28.     /**在拦截器不再使用时进行资源清理。
  29.      * */
  30.     @Override
  31.     public void close() {
  32.     }
  33. }
复制代码
第二个拦截器示例
  1. package com.wunaiieq;
  2. import org.apache.kafka.clients.producer.ProducerInterceptor;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import java.util.Map;
  6. public class CounterIntercepter implements ProducerInterceptor<String,String> {
  7.     private int errorCounter = 0;
  8.     private int successCounter = 0;
  9.     /**onSend方法,该方法在消息发送之前被调用,用于对消息进行处理。
  10.      * 由于这是第二个拦截器,因此这里接受的是前一个拦截器的输出
  11.      * */
  12.     @Override
  13.     public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
  14.         return record;
  15.     }
  16.     /**在消息发送成功或失败后被调用。
  17.      * 统计消息发送成功或失败的数量
  18.      * */
  19.     @Override
  20.     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
  21.         if(exception==null){
  22.             successCounter++;
  23.         }else{
  24.             errorCounter++;
  25.         }
  26.     }
  27.     /**拦截器关闭时会进行的额外操作
  28.      * 打印成功或失败的消息数量
  29.      * */
  30.     @Override
  31.     public void close() {
  32.         System.out.println("successful send:"+successCounter);
  33.         System.out.println("failed send:"+errorCounter);
  34.     }
  35.     @Override
  36.     public void configure(Map<String, ?> configs) {
  37.     }
  38. }
复制代码
拦截器调用
  1. package com.wunaiieq;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import java.util.ArrayList;
  7. import java.util.List;
  8. import java.util.Properties;
  9. import java.util.concurrent.ExecutionException;
  10. public class SyncCustomProducerInterceptor {
  11.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  12.         Properties prop = new Properties();
  13.         prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  14.                 "192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
  15.         prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  16.         prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  17.         // 构造拦截器链
  18.         List<String> interceptors = new ArrayList<>();
  19.         interceptors.add("com.wunaiieq.TimeStampInterceptor");
  20.         interceptors.add("com.wunaiieq.CounterIntercepter");
  21.         //配置拦截器链(将拦截器链加入到配置文件中)
  22.         prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
  23.         KafkaProducer<String, String> producer =
  24.                 new KafkaProducer<String, String>(prop);
  25.         for (int i = 5; i < 10; i++) {
  26.             //同步发送消息
  27.             producer.send(new ProducerRecord<>("topicA", "sync_msg" + i)).get();
  28.         }
  29.         //一定要关闭生产者,这样才会调用interceptor的close方法
  30.         producer.close();
  31.     }
  32. }
复制代码
效果


消息序列化

形貌


消息序列化是将对象转换为字节流的过程。在Kafka中,生产者需要将消息对象序列化为字节流,以便通过网络发送给Kafka集群;而消费者则需要将从Kafka集群接收到的字节流反序列化为对象,以便进行后续处理。
代码示例(自界说序列化)

pom.xml
增加依靠
  1.     <dependency>
  2.       <groupId>org.codehaus.jackson</groupId>
  3.       <artifactId>jackson-mapper-asl</artifactId>
  4.       <version>1.9.13</version>
  5.     </dependency>
复制代码
UserVo.java
值对象
  1. package com.wunaiieq;
  2. public class UserVo {
  3.     private String name;
  4.     private int age;
  5.     private String address;
  6.     public UserVo(String name, int age, String address) {
  7.         this.name = name;
  8.         this.age = age;
  9.         this.address = address;
  10.     }
  11.     @Override
  12.     public String toString() {
  13.         return "UserVo{" +
  14.                 "name='" + name + '\'' +
  15.                 ", age=" + age +
  16.                 ", address='" + address + '\'' +
  17.                 '}';
  18.     }
  19.     public String getName() {
  20.         return name;
  21.     }
  22.     public void setName(String name) {
  23.         this.name = name;
  24.     }
  25.     public int getAge() {
  26.         return age;
  27.     }
  28.     public void setAge(int age) {
  29.         this.age = age;
  30.     }
  31.     public String getAddress() {
  32.         return address;
  33.     }
  34.     public void setAddress(String address) {
  35.         this.address = address;
  36.     }
  37. }
复制代码
UserSerializer.java
重写Serializer接口实现序列化操作
  1. package com.wunaiieq;
  2. import org.apache.kafka.common.errors.SerializationException;
  3. import org.apache.kafka.common.serialization.Serializer;
  4. import org.codehaus.jackson.map.ObjectMapper;
  5. import java.io.IOException;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.Map;
  8. public class UserSerializer implements Serializer<UserVo> {
  9.     private ObjectMapper objectMapper;
  10.     @Override
  11.     public void configure(Map<String, ?> configs, boolean isKey) {
  12.         objectMapper = new ObjectMapper();
  13.         //Serializer.super.configure(configs, isKey);
  14.     }
  15.     /**
  16.      * @param topic 消息将要发送到的主题名
  17.      * @param data 需要序列化的UserVo对象。
  18.      * */
  19.     @Override
  20.     public byte[] serialize(String topic, UserVo data) {
  21.         //存储序列化后的字节数组
  22.         byte[] ret = null;
  23.         try {
  24.             //data写成JSON字符串再写成UTF_8的字节数组
  25.             ret = objectMapper.writeValueAsString(data)
  26.                     .getBytes(StandardCharsets.UTF_8);
  27.         } catch (IOException e) {
  28.             throw new SerializationException("Error when serializing UserVo to byte[],exception is " + e.getMessage());
  29.         }
  30.         return ret;
  31.     }
  32.     @Override
  33.     public void close() {
  34.         objectMapper = null;
  35.         //Serializer.super.close();
  36.     }
  37. }
复制代码
UserSerProducer.java
调用自界说序列化机制
  1. package com.wunaiieq;
  2. import org.apache.kafka.clients.producer.*;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import java.util.Properties;
  5. import java.util.concurrent.ExecutionException;
  6. public class UserSerProducer {
  7.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  8.         Properties prop = new Properties();
  9.         prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  10.                 "192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
  11.         // TODO 不用修改key的序列化机制,后续没用到
  12.         prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  13.         // TODO 修改value的序列化机制
  14.         prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());
  15.         KafkaProducer<String,UserVo> producer = new KafkaProducer<String, UserVo>(prop);
  16.         UserVo userVo = new UserVo("wunaiieq",18,"北京");
  17.         producer.send(
  18.                 // TODO 关于消息记录的构造中,可以指定 1.主题、值 2.主题、键、值
  19.                 new ProducerRecord<String,UserVo>("topicA", userVo),
  20.                 new Callback() {
  21.                     //如下方法在生产者收到acks确认时异步调用
  22.                     @Override
  23.                     public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  24.                         if(e == null){
  25.                             //无异常信息,输出主题和分区信息到控制台
  26.                             System.out.println("topic:"+recordMetadata.topic()
  27.                                     +",partition:"+recordMetadata.partition());
  28.                         }else{//打印异常信息
  29.                             System.out.println(e.getMessage());
  30.                         }
  31.                     }
  32.                 });
  33.         Thread.sleep(50);
  34.         producer.close();
  35.     }
  36. }
复制代码
效果
前面的不消管,只是没清空而已


分区

形貌

分区位于拦截器链后面

生产者分区的优势:

分区计谋

以下为提供的默认分区计谋,自行选择即可
代码示例

写入默认分区(0号分区)

消息只会发送到指定的分区内部
  1. package com.wunaiieq.partition;
  2. import org.apache.kafka.clients.producer.*;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import java.util.Properties;
  5. import java.util.concurrent.ExecutionException;
  6. public class ProducerToPartition {
  7.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  8.         //TODO 1.声明并实例化Kafka Producer的配置文件对象
  9.         Properties prop = new Properties();
  10.         //TODO 2.为配置文件对象设置参数
  11.         // 2.1 配置bootstrap_servers
  12.         prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  13.                 "192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
  14.         // 2.2 配置key和value的序列化类
  15.         prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  16.         prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  17.         //TODO 3.声明并实例化生产者对象
  18.         KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
  19.         //TODO 4.发送消息
  20.         for(int i = 0;i<5;i++){
  21.             //指定数据发送到0号分区,key为null
  22.             producer.send(new ProducerRecord<>("topicA",0,null, "unsync_msg" + i),
  23.                     new Callback() {
  24.                         //如下方法在生产者收到acks确认时异步调用
  25.                         @Override
  26.                         public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  27.                             if(e == null){
  28.                                 //无异常信息,输出主题和分区信息到控制台
  29.                                 System.out.println("topic:"+recordMetadata.topic()
  30.                                         +",partition:"+recordMetadata.partition());
  31.                             }else{//打印异常信息
  32.                                 System.out.println(e.getMessage());
  33.                             }
  34.                         }
  35.                     });
  36.             Thread.sleep(5);
  37.         }
  38.         //TODO 5.关闭生产者
  39.         producer.close();
  40.     }
  41. }
复制代码
自界说分区机制

部分消息可能需要额外的处理内容,好比审计等等,这类消息的key会携带关键字符串“wunaiieq”,如今让其发送到topicA主题的末了一个分区上,以便于后续处理,其他的消息则随机发送(不包罗末了一个分区)
WunaiieqPartitioner.java
分区写入计谋
  1. package com.wunaiieq.partition;
  2. import org.apache.kafka.clients.producer.Partitioner;
  3. import org.apache.kafka.common.Cluster;
  4. import org.apache.kafka.common.PartitionInfo;
  5. import java.util.List;
  6. import java.util.Map;
  7. import java.util.Random;
  8. public class WunaiieqPartitioner implements Partitioner {
  9.     private Random random;
  10.     @Override
  11.     public void configure(Map<String, ?> configs) {
  12.         //该方法实现必要资源的初始化工作
  13.         random = new Random();
  14.     }
  15.     /** 计算信息对应的分区
  16.      * @param topic   主题
  17.      * @param key    消息的key
  18.      * @param keyBytes  消息的key序列化后的字节数组
  19.      * @param value   消息的value
  20.      * @param valueBytes 消息value序列化后的字节数组
  21.      * @param cluster  集群元数据 可以获取分区信息
  22.      * @return  息对应的分区号
  23.      */
  24.     @Override
  25.     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  26.         //将key转换为字符串
  27.         String keyInfo = (String)key;
  28.         //获取主题的分区对象列表
  29.         List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
  30.         //获取主题下的分区总数量
  31.         int partCount = partitionInfoList.size();
  32.         if (partCount <= 1) {
  33.             System.out.println("1 partition");
  34.             return 0;  // 只有一个分区时,直接返回0
  35.         }
  36.         //最后一个分区号
  37.         int wunaiieqPartition = partCount-1;
  38.         //如果 key 为空、key 为空字符串或 key 不包含 "wunaiieq",则随机选择一个除最后一个分区外的分区;否则,消息发送到最后一个分区。
  39.         return keyInfo==null || keyInfo.isEmpty()
  40.                 ||!keyInfo.contains("wunaiieq")
  41.                 ? random.nextInt(partCount-1) : wunaiieqPartition ;
  42.     }
  43.     @Override
  44.     public void close() {
  45.         //该方法实现必要资源的清理工作
  46.         random = null;
  47.     }
  48. }
复制代码
CustomPartitionerProducer.java
调用分区计谋
  1. package com.wunaiieq.partition;
  2. import org.apache.kafka.clients.producer.*;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import java.util.Properties;
  5. import java.util.concurrent.ExecutionException;
  6. public class CustomPartitionerProducer {
  7.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  8.         Properties prop = new Properties();
  9.         prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  10.                 "192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
  11.         prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  12.         prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  13.         // TODO 在用于构造KafkaProducer的Properties对象中设置partitioner.class参数
  14.         prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
  15.                 "com.wunaiieq.partition.WunaiieqPartitioner");
  16.         KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
  17.         for(int i = 0;i<5;i++){
  18.             // TODO 不指定分区号,key为"wunaiieq"测试运行一次,改为"kafka"后再测试一次。
  19.             producer.send(new ProducerRecord<>("topicA","aa", "unsync_msg" + i),
  20.                     new Callback() {
  21.                         //如下方法在生产者收到acks确认时异步调用
  22.                         @Override
  23.                         public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  24.                             if(e == null){
  25.                                 //无异常信息,输出主题和分区信息到控制台
  26.                                 System.out.println("topic:"+recordMetadata.topic()
  27.                                         +",partition:"+recordMetadata.partition());
  28.                             }else{//打印异常信息
  29.                                 System.out.println(e.getMessage());
  30.                             }
  31.                         }
  32.                     });
  33.             Thread.sleep(5);
  34.         }
  35.         producer.close();
  36.     }
  37. }
复制代码
消息丢失


判断消息丢失时,一般看应答机制
特别情况
在acks=-1或all的情况下,Leader接收到数据并长期化后,全部Follower开始同步Leader刚刚长期化的数据,但是有一个Follower因故障迟迟不能进行数据同步,该问题应该怎么解决?

Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader聚集(leader:0,isr:0,1,2)。
如果Follower长时间未向Leader发送通讯请求或同步数据,则该Follower将被踢出ISR。
该时间阈值由replica.lag.time.max.ms参数设定,默认30000ms。比方1超时,(leader:0, isr:0,2)。如许就不消等长期联系不上或者已经故障的节点。
消息绝对不丢失的条件


生产中配置响应级别代码块:
  1. // 设置 acks
  2. prop.put(ProducerConfig.ACKS_CONFIG, "all");
  3. //重试次数 retries,默认是 int 最大值,2147483647
  4. prop.put(ProducerConfig.RETRIES_CONFIG, 3);
复制代码
数据去重

形貌

数据重复的缘故原由

数据去重

幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会长期化一条,包管了不重复。


使用幂等性的使用
开启参数 enable.idempotence 默以为 true,false 关闭。
事务


     代码示例(事务)

  1. package com.wunaiieq;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerConfig;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import org.apache.kafka.common.serialization.StringSerializer;
  6. import java.util.Properties;
  7. import java.util.concurrent.ExecutionException;
  8. public class ProducerTransaction {
  9.     public static void main(String[] args) throws ExecutionException, InterruptedException {
  10.         Properties prop = new Properties();
  11.         prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  12.                 "192.168.16.100:9092,192.168.16.101:9092,192.168.16.102:9092");
  13.         prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14.         prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  15.         //TODO 设置事务id
  16.         prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
  17.                 "transaction_id_topicA_0");
  18.         KafkaProducer<String,String> producer = new KafkaProducer<String, String>(prop);
  19.         //TODO 初始化事务
  20.         producer.initTransactions();
  21.         //TODO 开启事务
  22.         producer.beginTransaction();
  23.         //TODO 添加异常处理,成功提交事务,失败回滚事务
  24.         try {
  25.             //发送消息
  26.             for (int i = 0; i < 5; i++) {
  27.                 //同步发送消息
  28.                 producer.send(new ProducerRecord<>("topicA", "sync_msg" + i)).get();
  29.             }
  30.             //TODO 提交事务
  31.             producer.commitTransaction();
  32.         }catch (Exception e){
  33.             //TODO 放弃事务
  34.             producer.abortTransaction();
  35.         }
  36.         producer.close();
  37.     }
  38. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4