Kafka 在 Spring Boot 项目中的实战指南
弁言
随着大数据与微服务架构的盛行,Kafka 作为一款高性能的分布式流处理平台,在企业级项目里愈发举足轻重。Spring Boot 则以简化配置、快速开发著称,将二者结合,能高效搭建起具备强盛数据处理能力的应用步伐。本文将详细介绍 Kafka 在 Spring Boot 项目中的使用步骤及其背后的原理。
环境搭建
- 引入依赖:在项目的 pom.xml 文件里添加 Kafka 相关的依赖。
- <dependencies>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- </dependencies>
复制代码 spring-kafka 是 Spring 官方提供的整合 Kafka 的库,它封装了 Kafka 客户端,让 Spring Boot 开发者可以用熟悉的 Spring 编程风格去和 Kafka 交互,免去直接处理复杂 Kafka 客户端 API 的麻烦。
- 配置 Kafka 服务器:在 application.properties 中配置 Kafka 连接信息。
- spring.kafka.bootstrap-servers=localhost:9092
复制代码 spring.kafka.bootstrap-servers 用来指定 Kafka 集群的初始连接点,也就是告诉 Spring Boot 项目该去那里找到 Kafka 服务器。这里配置为当地的 localhost:9092,假如是集群环境,可添加多个地点,用逗号分隔,方便后续客户端发现集群中的所有 Broker。
定义 Kafka 生产者
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.common.serialization.StringSerializer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.core.DefaultKafkaProducerFactory;
- import org.springframework.kafka.core.KafkaProducerFactory;
- import org.springframework.kafka.core.ProducerFactory;
- import java.util.HashMap;
- import java.util.Map;
- @Configuration
- public class KafkaProducerConfig {
- @Bean
- public ProducerFactory<String, String> producerFactory() {
- Map<String, Object> configProps = new HashMap<>();
- configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return new DefaultKafkaProducerFactory<>(configProps);
- }
- @Bean
- public org.springframework.kafka.core.KafkaProducer<String, String> kafkaProducer() {
- return new org.springframework.kafka.core.KafkaProducer<>(producerFactory());
- }
- }
复制代码 @Configuration 标志这是一个配置类。producerFactory 方法构建了一个 ProducerFactory,它是创建 Kafka 生产者的工厂,内里配置了 Kafka 服务器地点、键与值的序列化类,序列化是为了把 Java 对象转化成能在 Kafka 网络传输的字节数组。kafkaProducer 方法利用这个工厂创建出现实的 Kafka 生产者实例。
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaProducer;
- import org.springframework.stereotype.Service;
- @Service
- public class KafkaMessageProducer {
- @Autowired
- private KafkaProducer<String, String> kafkaProducer;
- public void sendMessage(String topic, String message) {
- kafkaProducer.send(topic, message);
- }
- }
复制代码 @Service 表明这是一个业务服务类。sendMessage 方法借助注入的 Kafka 生产者,将给定的消息发送到指定的 Kafka 主题。这里的 topic 就是消息的分类标签,差别主题可用于区分差别业务的数据。
定义 Kafka 消耗者
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.kafka.annotation.EnableKafka;
- import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
- import org.springframework.kafka.core.ConsumerFactory;
- import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
- import java.util.HashMap;
- import java.util.Map;
- @Configuration
- @EnableKafka
- public class KafkaConsumerConfig {
- @Bean
- public ConsumerFactory<String, String> consumerFactory() {
- Map<String, Object> configProps = new HashMap<>();
- configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return new DefaultKafkaConsumerFactory<>(configProps);
- }
- @Bean
- public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
- ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- return factory;
- }
- }
复制代码 @Configuration 用于配置类,@EnableKafka 开启 Spring Boot 项目对 Kafka 的监听功能。consumerFactory 方法创建了 ConsumerFactory,配置了 Kafka 连接、键值的反序列化类,反序列化把从 Kafka 吸收的字节数组变回 Java 对象。kafkaListenerContainerFactory 方法则构建了一个容器工厂,用来创建管理 Kafka 消耗者的容器。
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.stereotype.Component;
- @Component
- public class KafkaMessageConsumer {
- @KafkaListener(topics = "testTopic", groupId = "testGroup")
- public void receiveMessage(String message) {
- System.out.println("Received message: " + message);
- }
- }
复制代码 @Component 标识这是一个 Spring 组件。@KafkaListener 注解指定监听的主题是 testTopic,所属消耗组是 testGroup。当 testTopic 中有新消息时,receiveMessage 方法就会被触发,在这里只是简单打印收到的消息。
测试集成
可以在 Spring Boot 的控制器或者测试类中注入生产者服务类来测试:
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RestController;
- @RestController
- public class TestController {
- @Autowired
- private KafkaMessageProducer kafkaMessageProducer;
- @GetMapping("/send")
- public String sendMessage() {
- kafkaMessageProducer.sendMessage("testTopic", "Hello, Kafka in Spring Boot!");
- return "Message sent";
- }
- }
复制代码 运行 Spring Boot 项目,访问 /send 接口,生产者会将消息发送到 Kafka 主题,随后消耗者便能吸收到并处理,完成了 Kafka 在 Spring Boot 项目中的根本集成流程。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |