伤心客 发表于 2024-12-26 06:08:24

kafka消息发送几种方式

同步发送 or 异步发送
       消息发送根据是否必要处理发送的效果分为同步发送、异步发送。
同步发送:等待发送效果返回,这种方式是可靠的,因为异常能及时处理,但同步发送必要阻塞等待一条消息发送完才处理下一条,吞吐量差。
异步发送:发送是异步的,不关心发送的效果,吞吐量最高,但可能存在发送失败的环境。
    本质上kafka 客户端提供的发送接口都是异步的,因为发送接口返回的是一个Future对象。对于同步发送通过future.get获取发送效果。异步发送则忽略send 返回值。
ListenableFuture<SendResult> future = kafkaTemplate.send(topic, content);
try {
      SendResult sendResult = future.get();
} catch (InterruptedException e) {
      e.printStackTrace();
} catch (ExecutionException e) {
      e.printStackTrace();
}
发送完成回调
有没有办法既要异步发送还要能处理发送失败的场景,这就是第三种,发送完成时,执行相应的回调方法。这是折中方案,兼顾效率且包管发送失败能被监控到。

producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {

if(e != null){
    System.out.println("send error ");
}else {
    System.out.println("send resulttopic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "offset=" + recordMetadata.offset() );
}

}
}); 发送异常
       有些发送异常可以通过重试几次后办理,比如网络异常,对于有些异常比如消息太大超出kafka设置的最大消息字节数,这类异常重试也会失败,所以这类异常KafkaProducer 不会进行任何重试。对于可重试异常可以设置重试次数
spring.kafka.producer.retries=10
SpringBoot 集成简单介绍
     参考上篇文章SpringBoot 集成设置(pom依赖、application设置),简单讲解SpringBoot 几个告急自动装配类。
KafkaAutoConfiguration
KafkaAutoConfiguration给我们自动设置了几个类
KafkaTemplate:可以通过KafkaTemplate进行发送消息,本质上内部还是利用的KafkaProducer发送消息的。
ProducerFactory:KafkaProducer工厂,通过createProducer()方法可以获取(KafkaProducer) 进行发送消息,避免直接new KafkaProducer
利用方式也很简单,由于直接KafkaAutoConfiguration已经界说了相关Bean, 利用时注入Bean即可

https://img-blog.csdnimg.cn/img_convert/24ccf1bbd1b94ed8b6f72163db75fad8.png
@Autowired
private KafkaTemplate kafkaTemplate;

@Autowired
private ProducerFactory producerFactory; 详细代码
同步发送、异步发送的方式直接利用 kafkaTemplate即可完成,同步发送效果处理:这里简单的打印出消息的topic partition offset 等信息如下图

ListenableFuture<SendResult> future = kafkaTemplate.send(topic, content);
SendResult sendResult = future.get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );                                    
https://img-blog.csdnimg.cn/img_convert/e002b15f43b322a29db3a785fdc59784.png
发送回调kafkaTemplate没有对应api , 必要通过Producer发送,我们通过producerFactory获取。
ProducerRecord record = new ProducerRecord(topic,content);
    Producer producer = producerFactory.createProducer();
    producer.send(record, new Callback() {
      @Override
      public void onCompletion(RecordMetadata recordMetadata, Exception e) {

            if(e != null){
                System.out.println("send error ");
            }else {
                System.out.println("send resulttopic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "offset=" + recordMetadata.offset() );
            }
      }
    });

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: kafka消息发送几种方式