IT评测·应用市场-qidao123.com
标题:
Kafka Producer之拦截器
[打印本页]
作者:
悠扬随风
时间:
2024-7-17 12:25
标题:
Kafka Producer之拦截器
1. Producer流程
新建ProducerRecord类后,传入topic、key、value等数据构建Record之后,隔断发送至kafka集群还必要经历若干过程。
拦截器列表,对数据进行过滤,更改等行为,处置惩罚异常不会导致流程停止。
获取Kafka集群元数据
对数据进行序列化
根据元数据选择分区和Broker
数据校验
进入数据发送缓存区,批次发送
send
2. 代码测试
public class KafkaProducerInterceptorTest {
public static void main(String[] args) throws InterruptedException {
//创建producer
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//指定拦截器
config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);
for (int i = 0; i < 10; i++) {
//创建record
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
"test1",
"key"+i,
"我是你爹"+i
);
//发送record
producer.send(record);
Thread.sleep(500);
}
//关闭producer
producer.close();
}
}
复制代码
拦截器自界说类
package org.dragon.producer;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* 自定义value拦截器试验<br/>
* 1. 实现接口
* 2. 定义泛型
* 3. 重写方法
*
*
* @author mumu
* @date 2024/07/15
*/
public class ValueInterceptorTest implements ProducerInterceptor<String, String> {
/**
* 发送数据时会调用这个方法<br/>
* 让value复制2次
*
* @param producerRecord 生产者记录
* @return {@link ProducerRecord}<{@link String}, {@link String}>
*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
return new ProducerRecord<String, String>(producerRecord.topic(), producerRecord.key(), producerRecord.value() + producerRecord.value());
}
/**
* 发送数据完毕,服务器返回的响应,会调用此方法。
*
* @param recordMetadata 记录元数据
* @param e e
*/
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
}
/**
* 生产者关闭,会调用此方法
*/
@Override
public void close() {
}
/**
* 创建生产者对象时调用
*
* @param map 地图
*/
@Override
public void configure(Map<String, ?> map) {
}
}
复制代码
3. 测试结果
可以看到value是复制了2次,乐成。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/)
Powered by Discuz! X3.4