Kafka 在 Spring Boot 项目中的实战指南

打印 上一主题 下一主题

主题 1006|帖子 1006|积分 3018

Kafka 在 Spring Boot 项目中的实战指南

弁言

随着大数据与微服务架构的盛行,Kafka 作为一款高性能的分布式流处理平台,在企业级项目里愈发举足轻重。Spring Boot 则以简化配置、快速开发著称,将二者结合,能高效搭建起具备强盛数据处理能力的应用步伐。本文将详细介绍 Kafka 在 Spring Boot 项目中的使用步骤及其背后的原理。

环境搭建


  • 引入依赖:在项目的 pom.xml 文件里添加 Kafka 相关的依赖。
  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.springframework.kafka</groupId>
  4.         <artifactId>spring-kafka</artifactId>
  5.     </dependency>
  6. </dependencies>
复制代码
spring-kafka 是 Spring 官方提供的整合 Kafka 的库,它封装了 Kafka 客户端,让 Spring Boot 开发者可以用熟悉的 Spring 编程风格去和 Kafka 交互,免去直接处理复杂 Kafka 客户端 API 的麻烦。

  • 配置 Kafka 服务器:在 application.properties 中配置 Kafka 连接信息。
  1. spring.kafka.bootstrap-servers=localhost:9092
复制代码
spring.kafka.bootstrap-servers 用来指定 Kafka 集群的初始连接点,也就是告诉 Spring Boot 项目该去那里找到 Kafka 服务器。这里配置为当地的 localhost:9092,假如是集群环境,可添加多个地点,用逗号分隔,方便后续客户端发现集群中的所有 Broker。
定义 Kafka 生产者


  • 配置类创建
  1. import org.apache.kafka.clients.producer.ProducerConfig;
  2. import org.apache.kafka.common.serialization.StringSerializer;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  6. import org.springframework.kafka.core.KafkaProducerFactory;
  7. import org.springframework.kafka.core.ProducerFactory;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. @Configuration
  11. public class KafkaProducerConfig {
  12.     @Bean
  13.     public ProducerFactory<String, String> producerFactory() {
  14.         Map<String, Object> configProps = new HashMap<>();
  15.         configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  16.         configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  17.         configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  18.         return new DefaultKafkaProducerFactory<>(configProps);
  19.     }
  20.     @Bean
  21.     public org.springframework.kafka.core.KafkaProducer<String, String> kafkaProducer() {
  22.         return new org.springframework.kafka.core.KafkaProducer<>(producerFactory());
  23.     }
  24. }
复制代码
@Configuration 标志这是一个配置类。producerFactory 方法构建了一个 ProducerFactory,它是创建 Kafka 生产者的工厂,内里配置了 Kafka 服务器地点、键与值的序列化类,序列化是为了把 Java 对象转化成能在 Kafka 网络传输的字节数组。kafkaProducer 方法利用这个工厂创建出现实的 Kafka 生产者实例。

  • 生产者服务类
  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.kafka.core.KafkaProducer;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class KafkaMessageProducer {
  6.     @Autowired
  7.     private KafkaProducer<String, String> kafkaProducer;
  8.     public void sendMessage(String topic, String message) {
  9.         kafkaProducer.send(topic, message);
  10.     }
  11. }
复制代码
@Service 表明这是一个业务服务类。sendMessage 方法借助注入的 Kafka 生产者,将给定的消息发送到指定的 Kafka 主题。这里的 topic 就是消息的分类标签,差别主题可用于区分差别业务的数据。
定义 Kafka 消耗者


  • 配置类
  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.common.serialization.StringDeserializer;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.kafka.annotation.EnableKafka;
  6. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  7. import org.springframework.kafka.core.ConsumerFactory;
  8. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. @Configuration
  12. @EnableKafka
  13. public class KafkaConsumerConfig {
  14.     @Bean
  15.     public ConsumerFactory<String, String> consumerFactory() {
  16.         Map<String, Object> configProps = new HashMap<>();
  17.         configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  18.         configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  19.         configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  20.         return new DefaultKafkaConsumerFactory<>(configProps);
  21.     }
  22.     @Bean
  23.     public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  24.         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  25.         factory.setConsumerFactory(consumerFactory());
  26.         return factory;
  27.     }
  28. }
复制代码
@Configuration 用于配置类,@EnableKafka 开启 Spring Boot 项目对 Kafka 的监听功能。consumerFactory 方法创建了 ConsumerFactory,配置了 Kafka 连接、键值的反序列化类,反序列化把从 Kafka 吸收的字节数组变回 Java 对象。kafkaListenerContainerFactory 方法则构建了一个容器工厂,用来创建管理 Kafka 消耗者的容器。

  • 消耗者服务类
  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.stereotype.Component;
  3. @Component
  4. public class KafkaMessageConsumer {
  5.     @KafkaListener(topics = "testTopic", groupId = "testGroup")
  6.     public void receiveMessage(String message) {
  7.         System.out.println("Received message: " + message);
  8.     }
  9. }
复制代码
@Component 标识这是一个 Spring 组件。@KafkaListener 注解指定监听的主题是 testTopic,所属消耗组是 testGroup。当 testTopic 中有新消息时,receiveMessage 方法就会被触发,在这里只是简单打印收到的消息。
测试集成

可以在 Spring Boot 的控制器或者测试类中注入生产者服务类来测试:
  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.web.bind.annotation.GetMapping;
  3. import org.springframework.web.bind.annotation.RestController;
  4. @RestController
  5. public class TestController {
  6.     @Autowired
  7.     private KafkaMessageProducer kafkaMessageProducer;
  8.     @GetMapping("/send")
  9.     public String sendMessage() {
  10.         kafkaMessageProducer.sendMessage("testTopic", "Hello, Kafka in Spring Boot!");
  11.         return "Message sent";
  12.     }
  13. }
复制代码
运行 Spring Boot 项目,访问 /send 接口,生产者会将消息发送到 Kafka 主题,随后消耗者便能吸收到并处理,完成了 Kafka 在 Spring Boot 项目中的根本集成流程。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

钜形不锈钢水箱

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表