kafkaTemplate.sendDefault(message).addCallback

打印 上一主题 下一主题

主题 867|帖子 867|积分 2601

在利用 kafkaTemplate.sendDefault(message).addCallback 时,你可以通过 addCallback 方法来处置惩罚发送消息后的成功和失败回调。
  1. import org.springframework.kafka.core.KafkaTemplate;
  2. import org.springframework.kafka.support.Callback;
  3. import org.springframework.kafka.support.SendResult;
  4. public class KafkaProducer {
  5.     private final KafkaTemplate<String, String> kafkaTemplate;
  6.     public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
  7.         this.kafkaTemplate = kafkaTemplate;
  8.     }
  9.     public void sendMessage(String message) {
  10.         kafkaTemplate.sendDefault(message).addCallback(new Callback() {
  11.             @Override
  12.             public void onSuccess(SendResult<String, String> result) {
  13.                 // **处理成功的逻辑**
  14.                 System.out.println("Message sent successfully: " + result.getProducerRecord().value());
  15.             }
  16.             @Override
  17.             public void onFailure(org.apache.kafka.clients.producer.ProducerRecord<String, String> producerRecord, Exception ex) {
  18.                 // **处理失败的逻辑**
  19.                 System.err.println("Message failed to send: " + ex.getMessage());
  20.             }
  21.         });
  22.     }
  23. }
复制代码
关键点:



  • 成功回调:在 onSuccess 方法中,你可以处置惩罚消息成功发送后的逻辑。
  • 失败回调:在 onFailure 方法中,你可以处置惩罚消息发送失败的情况。
@KafkaListener 和 kafkaTemplate.sendDefault(message).addCallback 是 Kafka 中用于不同目标的两个概念,具体区别如下:
1. 功能目标



  • @KafkaListener:

    • 用于消费消息。它是一个注解,用于标记一个方法,使其可以或许自动吸收来自指定主题的消息。

  • kafkaTemplate.sendDefault(message).addCallback:

    • 用于发送消息。它是 KafkaTemplate 的一个方法,用于将消息发送到 Kafka 主题,并提供成功和失败的回调处置惩罚。

2. 利用场景



  • @KafkaListener:

    • 当你需要处置惩罚来自 Kafka 主题的消息时,利用 @KafkaListener 注解的方法会被自动调用。

  • kafkaTemplate.sendDefault(message).addCallback:

    • 当你需要将消息发送到 Kafka 主题时,利用 kafkaTemplate 发送消息,并可以通过回调处置惩罚发送效果。

3. 示例代码



  • @KafkaListener 示例:
    1. import org.springframework.kafka.annotation.KafkaListener;
    2. import org.springframework.stereotype.Service;
    3. @Service
    4. public class KafkaConsumer {
    5.     @KafkaListener(topics = "your_topic", groupId = "your_group_id")
    6.     public void listen(String message) {
    7.         // **处理接收到的消息**
    8.         System.out.println("Received message: " + message);
    9.     }
    10. }
    复制代码
  • kafkaTemplate.sendDefault 示例:
    1. kafkaTemplate.sendDefault("your_topic", message).addCallback(new Callback() {
    2.     @Override
    3.     public void onSuccess(SendResult<String, String> result) {
    4.         // **处理成功的逻辑**
    5.     }
    6.     @Override
    7.     public void onFailure(ProducerRecord<String, String> producerRecord, Exception ex) {
    8.         // **处理失败的逻辑**
    9.     }
    10. });
    复制代码
总结



  • @KafkaListener 是用于消费消息的,而 kafkaTemplate.sendDefault 是用于发送消息的。
在 kafkaTemplate.sendDefault(message).addCallback 的成功回调中,包罗的信息主要是 SendResult 对象。这个对象提供了关于发送消息的具体信息,包罗:

  • ProducerRecord:发送的消息记录。
  • RecordMetadata:关于消息的元数据,例如主题、分区、偏移量等。
示例代码

以下是一个示例,展示了如何在成功回调中利用这些信息:
  1. import org.springframework.kafka.core.KafkaTemplate;
  2. import org.springframework.kafka.support.Callback;
  3. import org.springframework.kafka.support.SendResult;
  4. import org.springframework.stereotype.Service;
  5. @Service
  6. public class KafkaProducer {
  7.     private final KafkaTemplate<String, String> kafkaTemplate;
  8.     public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
  9.         this.kafkaTemplate = kafkaTemplate;
  10.     }
  11.     public void sendMessage(String message) {
  12.         kafkaTemplate.sendDefault(message).addCallback(new Callback() {
  13.             @Override
  14.             public void onSuccess(SendResult<String, String> result) {
  15.                 // **获取发送的消息记录**
  16.                 String sentMessage = result.getProducerRecord().value();
  17.                 // **获取元数据**
  18.                 String topic = result.getRecordMetadata().topic();
  19.                 int partition = result.getRecordMetadata().partition();
  20.                 long offset = result.getRecordMetadata().offset();
  21.                 // **处理成功的逻辑**
  22.                 System.out.printf("Message sent successfully: %s, Topic: %s, Partition: %d, Offset: %d%n",
  23.                         sentMessage, topic, partition, offset);
  24.             }
  25.             @Override
  26.             public void onFailure(org.apache.kafka.clients.producer.ProducerRecord<String, String> producerRecord, Exception ex) {
  27.                 // **处理失败的逻辑**
  28.                 System.err.println("Message failed to send: " + ex.getMessage());
  29.             }
  30.         });
  31.     }
  32. }
复制代码
关键点



  • result.getProducerRecord().value():获取发送的消息内容。
  • result.getRecordMetadata().topic():获取消息发送到的主题。
  • result.getRecordMetadata().partition():获取消息发送到的分区。
  • result.getRecordMetadata().offset():获取消息在分区中的偏移量。
总结

在成功回调中,你可以获取到关于发送消息的具体信息,这些信息对于后续的处置惩罚和日记记录非常有用。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

曂沅仴駦

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表