Spring Boot 整合 Kafka 详解

打印 上一主题 下一主题

主题 845|帖子 845|积分 2537

Spring Boot 整合 Kafka 详解

本文将详细介绍怎样在 Spring Boot 项目中整合 Apache Kafka,包括 Kafka 的设置、消息的同步和异步发送。
1. 情况准备

在开始之前,请确保你已经安装并设置好 Kafka 集群。假如还没有,请参考 Kafka 官方文档进行安装和设置。
2. 创建 Spring Boot 项目

2.1 使用 Spring Initializr 创建项目

访问 Spring Initializr,选择以下设置:


  • Project: Maven Project
  • Language: Java
  • Spring Boot: 2.2.2.RELEASE
  • Dependencies: Spring for Apache Kafka
点击 “Generate” 按钮,下载生成的项目,并解压到本地。
3. 添加依靠

在 pom.xml 文件中添加 Kafka 依靠:
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.springframework.boot</groupId>
  4.         <artifactId>spring-boot-starter</artifactId>
  5.     </dependency>
  6.     <dependency>
  7.         <groupId>org.springframework.kafka</groupId>
  8.         <artifactId>spring-kafka</artifactId>
  9.     </dependency>
  10. </dependencies>
复制代码
4. 设置 Kafka

在 src/main/resources 目次下创建 application.yml 文件,并添加以下设置:
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: localhost:9092
  4.     consumer:
  5.       group-id: my-group
  6.       auto-offset-reset: earliest
  7.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  8.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  9.     producer:
  10.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
复制代码
设置阐明:


  • bootstrap-servers: Kafka broker 的地址列表。
  • consumer: 斲丧者设置,包括斲丧者组 ID、偏移量重置策略、键和值的反序列化器。
  • producer: 生产者设置,包括键和值的序列化器。
5. 创建 Kafka 生产者

在 src/main/java/com/example/demo 目次下创建 KafkaProducerConfig.java 文件,并添加以下代码:
  1. package com.example.demo;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  7. import org.springframework.kafka.core.KafkaTemplate;
  8. import org.springframework.kafka.core.ProducerFactory;
  9. import org.springframework.kafka.core.ProducerFactory;
  10. import org.springframework.kafka.core.ProducerFactory;
  11. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  12. import java.util.HashMap;
  13. import java.util.Map;
  14. @Configuration
  15. public class KafkaProducerConfig {
  16.     @Bean
  17.     public ProducerFactory<String, String> producerFactory() {
  18.         Map<String, Object> configProps = new HashMap<>();
  19.         configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  20.         configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  21.         configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  22.         return new DefaultKafkaProducerFactory<>(configProps);
  23.     }
  24.     @Bean
  25.     public KafkaTemplate<String, String> kafkaTemplate() {
  26.         return new KafkaTemplate<>(producerFactory());
  27.     }
  28. }
复制代码
6. 发送消息

在 src/main/java/com/example/demo 目次下创建 KafkaProducerService.java 文件,并添加以下代码:
  1. package com.example.demo;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.kafka.support.SendResult;
  5. import org.springframework.stereotype.Service;
  6. import org.springframework.util.concurrent.ListenableFuture;
  7. import org.springframework.util.concurrent.ListenableFutureCallback;
  8. @Service
  9. public class KafkaProducerService {
  10.     @Autowired
  11.     private KafkaTemplate<String, String> kafkaTemplate;
  12.     private static final String TOPIC = "my-topic";
  13.     // 同步发送消息
  14.     public void sendMessageSync(String message) {
  15.         try {
  16.             kafkaTemplate.send(TOPIC, message).get();
  17.             System.out.println("同步消息发送成功: " + message);
  18.         } catch (Exception e) {
  19.             e.printStackTrace();
  20.             System.out.println("同步消息发送失败: " + message);
  21.         }
  22.     }
  23.     // 异步发送消息
  24.     public void sendMessageAsync(String message) {
  25.         ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, message);
  26.         future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
  27.             @Override
  28.             public void onSuccess(SendResult<String, String> result) {
  29.                 System.out.println("异步消息发送成功: " + message);
  30.             }
  31.             @Override
  32.             public void onFailure(Throwable ex) {
  33.                 ex.printStackTrace();
  34.                 System.out.println("异步消息发送失败: " + message);
  35.             }
  36.         });
  37.     }
  38. }
复制代码
7. 测试 Kafka 生产者

在 src/main/java/com/example/demo 目次下创建 DemoApplication.java 文件,并添加以下代码:
  1. package com.example.demo;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.boot.CommandLineRunner;
  4. import org.springframework.boot.SpringApplication;
  5. import org.springframework.boot.autoconfigure.SpringBootApplication;
  6. @SpringBootApplication
  7. public class DemoApplication implements CommandLineRunner {
  8.     @Autowired
  9.     private KafkaProducerService kafkaProducerService;
  10.     public static void main(String[] args) {
  11.         SpringApplication.run(DemoApplication.class, args);
  12.     }
  13.     @Override
  14.     public void run(String... args) throws Exception {
  15.         kafkaProducerService.sendMessageSync("Hello, Kafka (Sync)!");
  16.         kafkaProducerService.sendMessageAsync("Hello, Kafka (Async)!");
  17.     }
  18. }
复制代码
8. 运行结果

运行 DemoApplication 类,将看到控制台输出如下消息:
  1. 同步消息发送成功: Hello, Kafka (Sync)!
  2. 异步消息发送成功: Hello, Kafka (Async)!
复制代码
假如 Kafka 生产者发送消息失败,将看到错误信息。
9. 总结

本文详细介绍了怎样在 Spring Boot 项目中整合 Apache Kafka,包括 Kafka 的设置、消息的同步和异步发送。通过明确和实践这些内容,可以资助你更好地掌握 Spring Boot 与 Kafka 的整合与应用。渴望本文对你有所资助,如有任何疑问或建议,欢迎留言讨论。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

祗疼妳一个

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表