ToB企服应用市场:ToB评测及商务社交产业平台

标题: Kafka Producer发送消息流程之消息异步发送和同步发送 [打印本页]

作者: 圆咕噜咕噜    时间: 2024-7-19 09:43
标题: Kafka Producer发送消息流程之消息异步发送和同步发送

1. 异步发送

Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步担当效果。
  1. public class KafkaProducerCallbackTest {
  2.     public static void main(String[] args) throws InterruptedException {
  3.         //创建producer
  4.         HashMap<String, Object> config = new HashMap<>();
  5.         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
  6.         config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  7.         config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  8.         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);
  9.         for (int i = 0; i < 10; i++) {
  10.             //创建record
  11.             ProducerRecord<String, String> record = new ProducerRecord<String, String>(
  12.                     "test2",
  13.                     ""+i,
  14.                     "我是你爹"+i
  15.             );
  16.             //发送record
  17.             producer.send(record, new Callback() {
  18.                 @Override
  19.                 public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  20.                     System.out.println("回调信息:消息发送成功");
  21.                 }
  22.             });
  23.             System.out.println("发送数据");
  24.         }
  25.         //关闭producer
  26.         producer.close();
  27.     }
  28. }
复制代码
Main线程中,对于多条数据,下一条消息的发送并不期待上一条消息简直认,而是继续发送。
  1. 2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
  2. 发送数据
  3. 发送数据
  4. 发送数据
  5. 发送数据
  6. 发送数据
  7. 发送数据
  8. 发送数据
  9. 发送数据
  10. 发送数据
  11. 发送数据
  12. 2024-07-17 21:43:46.075 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
  13. 2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 6000 with epoch 0
  14. 回调信息:消息发送成功
  15. 回调信息:消息发送成功
  16. 回调信息:消息发送成功
  17. 回调信息:消息发送成功
  18. 回调信息:消息发送成功
  19. 回调信息:消息发送成功
  20. 回调信息:消息发送成功
  21. 回调信息:消息发送成功
  22. 回调信息:消息发送成功
  23. 回调信息:消息发送成功
  24. 2024-07-17 21:43:46.569 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
复制代码
可以看到先是main线程循环发送完了多条数据,然后再异步收到通知。
2. 同步发送

消息有严格的先后顺序,下一条消息必须等到上一条消息的回调确认后,再发送,这是一个效率极低的过程。
按照流程图,上一条消息需要从生产者一直流转,多个步骤,到数据网络器,到Sender,最后还要期待回调确认,才可以开始下一条消息的流转。
  1. public class KafkaProducerCallbackTest {
  2.     public static void main(String[] args) throws InterruptedException, ExecutionException {
  3.         //创建producer
  4.         HashMap<String, Object> config = new HashMap<>();
  5.         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
  6.         config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  7.         config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  8.         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);
  9.         for (int i = 0; i < 10; i++) {
  10.             //创建record
  11.             ProducerRecord<String, String> record = new ProducerRecord<String, String>(
  12.                     "test2",
  13.                     ""+i,
  14.                     "我是你爹"+i
  15.             );
  16.             //发送record
  17.             Future<RecordMetadata> send = producer.send(record, new Callback() {
  18.                 @Override
  19.                 public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  20.                     System.out.println("回调信息:消息发送成功");
  21.                 }
  22.             });
  23.             System.out.println("发送数据");
  24.             send.get();
  25.         }
  26.         //关闭producer
  27.         producer.close();
  28.     }
  29. }
复制代码
  1. 2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 5000 with epoch 0
  2. 发送数据
  3. 回调信息:消息发送成功
  4. 发送数据
  5. 回调信息:消息发送成功
  6. 发送数据
  7. 回调信息:消息发送成功
  8. 发送数据
  9. 回调信息:消息发送成功
  10. 发送数据
  11. 回调信息:消息发送成功
  12. 发送数据
  13. 回调信息:消息发送成功
  14. 发送数据
  15. 回调信息:消息发送成功
  16. 发送数据
  17. 回调信息:消息发送成功
  18. 发送数据
  19. 回调信息:消息发送成功
  20. 发送数据
  21. 回调信息:消息发送成功
  22. 2024-07-17 21:49:19.823 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
  23. 2024-07-17 21:49:19.838 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4