Kafka Producer之拦截器

打印 上一主题 下一主题

主题 949|帖子 949|积分 2847

1. Producer流程


新建ProducerRecord类后,传入topic、key、value等数据构建Record之后,隔断发送至kafka集群还必要经历若干过程。

  • 拦截器列表,对数据进行过滤,更改等行为,处置惩罚异常不会导致流程停止。
  • 获取Kafka集群元数据
  • 对数据进行序列化
  • 根据元数据选择分区和Broker
  • 数据校验
  • 进入数据发送缓存区,批次发送
  • send
2. 代码测试

  1. public class KafkaProducerInterceptorTest {
  2.     public static void main(String[] args) throws InterruptedException {
  3.         //创建producer
  4.         HashMap<String, Object> config = new HashMap<>();
  5.         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
  6.         config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  7.         config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  8.         //指定拦截器
  9.         config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());
  10.         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);
  11.         for (int i = 0; i < 10; i++) {
  12.             //创建record
  13.             ProducerRecord<String, String> record = new ProducerRecord<String, String>(
  14.                     "test1",
  15.                     "key"+i,
  16.                     "我是你爹"+i
  17.             );
  18.             //发送record
  19.             producer.send(record);
  20.             Thread.sleep(500);
  21.         }
  22.         //关闭producer
  23.         producer.close();
  24.     }
  25. }
复制代码
拦截器自界说类
  1. package org.dragon.producer;
  2. import org.apache.kafka.clients.producer.ProducerInterceptor;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import java.util.Map;
  6. /**
  7. * 自定义value拦截器试验<br/>
  8. * 1. 实现接口
  9. * 2. 定义泛型
  10. * 3. 重写方法
  11. *
  12. *
  13. * @author mumu
  14. * @date 2024/07/15
  15. */
  16. public class ValueInterceptorTest implements ProducerInterceptor<String, String> {
  17.     /**
  18.      * 发送数据时会调用这个方法<br/>
  19.      * 让value复制2次
  20.      *
  21.      * @param producerRecord 生产者记录
  22.      * @return {@link ProducerRecord}<{@link String}, {@link String}>
  23.      */
  24.     @Override
  25.     public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
  26.         return new ProducerRecord<String, String>(producerRecord.topic(), producerRecord.key(), producerRecord.value() + producerRecord.value());
  27.     }
  28.     /**
  29.      * 发送数据完毕,服务器返回的响应,会调用此方法。
  30.      *
  31.      * @param recordMetadata 记录元数据
  32.      * @param e              e
  33.      */
  34.     @Override
  35.     public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
  36.     }
  37.     /**
  38.      * 生产者关闭,会调用此方法
  39.      */
  40.     @Override
  41.     public void close() {
  42.     }
  43.     /**
  44.      * 创建生产者对象时调用
  45.      *
  46.      * @param map 地图
  47.      */
  48.     @Override
  49.     public void configure(Map<String, ?> map) {
  50.     }
  51. }
复制代码
3. 测试结果

可以看到value是复制了2次,乐成。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

悠扬随风

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表