kafka消息发送几种方式

打印 上一主题 下一主题

主题 885|帖子 885|积分 2655

同步发送 or 异步发送
       消息发送根据是否必要处理发送的效果分为同步发送、异步发送。
同步发送:等待发送效果返回,这种方式是可靠的,因为异常能及时处理,但同步发送必要阻塞等待一条消息发送完才处理下一条,吞吐量差。

异步发送:发送是异步的,不关心发送的效果,吞吐量最高,但可能存在发送失败的环境。
    本质上kafka 客户端提供的发送接口都是异步的,因为发送接口返回的是一个Future对象。对于同步发送通过future.get获取发送效果。异步发送则忽略send 返回值。
  1. ListenableFuture<SendResult> future = kafkaTemplate.send(topic, content);
  2.   try {
  3.       SendResult sendResult = future.get();
  4.   } catch (InterruptedException e) {
  5.       e.printStackTrace();
  6.   } catch (ExecutionException e) {
  7.       e.printStackTrace();
  8.   }
复制代码

发送完成回调
有没有办法既要异步发送还要能处理发送失败的场景,这就是第三种,发送完成时,执行相应的回调方法。这是折中方案,兼顾效率且包管发送失败能被监控到。

  1. producer.send(record, new Callback() {
  2. @Override
  3. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  4. if(e != null){
  5.     System.out.println("send error ");
  6. }else {
  7.     System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );
  8. }
  9. }
  10. });
复制代码
发送异常
       有些发送异常可以通过重试几次后办理,比如网络异常,对于有些异常比如消息太大超出kafka设置的最大消息字节数,这类异常重试也会失败,所以这类异常KafkaProducer 不会进行任何重试。对于可重试异常可以设置重试次数
  1. spring.kafka.producer.retries=10
复制代码

SpringBoot 集成简单介绍
     参考上篇文章SpringBoot 集成设置(pom依赖、application设置),简单讲解SpringBoot 几个告急自动装配类。
KafkaAutoConfiguration
KafkaAutoConfiguration给我们自动设置了几个类
KafkaTemplate:可以通过KafkaTemplate进行发送消息,本质上内部还是利用的KafkaProducer发送消息的。
ProducerFactory:KafkaProducer工厂,通过createProducer()方法可以获取(KafkaProducer) 进行发送消息,避免直接new KafkaProducer
利用方式也很简单,由于直接KafkaAutoConfiguration已经界说了相关Bean, 利用时注入Bean即可


  1. @Autowired
  2. private KafkaTemplate kafkaTemplate;
  3. @Autowired
  4. private ProducerFactory producerFactory;
复制代码
详细代码
同步发送、异步发送的方式直接利用 kafkaTemplate即可完成,同步发送效果处理:这里简单的打印出消息的topic partition offset 等信息如下图

  1. ListenableFuture<SendResult> future = kafkaTemplate.send(topic, content);
  2. SendResult sendResult = future.get();
  3. RecordMetadata recordMetadata = sendResult.getRecordMetadata();
  4. System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );
复制代码
  1. [/code]                                    
  2. [align=center][img=1080,186]https://img-blog.csdnimg.cn/img_convert/e002b15f43b322a29db3a785fdc59784.png[/img][/align]
  3. 发送回调kafkaTemplate没有对应api , 必要通过Producer发送,我们通过producerFactory获取。
  4. [code]ProducerRecord record = new ProducerRecord(topic,content);
  5.     Producer producer = producerFactory.createProducer();
  6.     producer.send(record, new Callback() {
  7.         @Override
  8.         public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  9.             if(e != null){
  10.                 System.out.println("send error ");
  11.             }else {
  12.                 System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );
  13.             }
  14.         }
  15.     });
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

伤心客

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表