Integrating Spring Boot with Kafka is a common way to build efficient messaging and stream-processing applications. Spring Boot simplifies Kafka configuration and usage, making the integration straightforward. The following walks through the steps: configuration, implementing a producer and a consumer, and a few advanced features.
1. Add the dependency
First, add the Kafka dependency to your project's pom.xml. Note that there is no official spring-boot-starter-kafka; the dependency to add is spring-kafka from the Spring for Apache Kafka project, whose version is managed by Spring Boot and which is picked up by Spring Boot's auto-configuration.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. Configure Kafka
2.1. Configuration files
Configure the Kafka-related properties in application.properties or application.yml.
application.properties:
# Kafka broker address
spring.kafka.bootstrap-servers=localhost:9092

# Kafka consumer configuration
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Kafka producer configuration
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
application.yml:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
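With the spring-kafka dependency on the classpath, Spring Boot's auto-configuration reads these properties and creates a default KafkaTemplate and listener container factory for you. The explicit configuration class in 2.2 below is therefore optional; it is useful when you want to define those beans yourself or override the defaults programmatically.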
2.2. Kafka configuration class
Alternatively, you can create a class annotated with @Configuration to define the producer and consumer settings in code.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    // Producer factory: broker address and serializer settings for producers
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    // KafkaTemplate is the main entry point for sending messages
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    // Consumer factory: broker address, group id and deserializer settings
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    // Container factory used by @KafkaListener methods
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
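The examples below send to a topic named my_topic and assume it already exists. If you want Spring to create it on startup, you can declare a NewTopic bean, which Spring Boot's auto-configured KafkaAdmin picks up. This is a minimal sketch; the single partition and single replica are assumptions suited to a local one-broker setup:

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    // KafkaAdmin detects NewTopic beans and creates missing topics on startup.
    @Bean
    public NewTopic myTopic() {
        return TopicBuilder.name("my_topic")
                .partitions(1)   // assumption: one partition is enough for this demo
                .replicas(1)     // assumption: single-broker local environment
                .build();
    }
}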
3. Implement a Kafka producer
3.1. Producer service
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static final String TOPIC = "my_topic";

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}
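If related messages must keep their relative order, you can also send them with a key: records that share a key are routed to the same partition. The method below is not part of the original service; it is a sketch of what you could add to KafkaProducerService for that purpose:

public void sendMessageWithKey(String key, String message) {
    // Records sharing a key land on the same partition, preserving per-key ordering.
    kafkaTemplate.send(TOPIC, key, message);
}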
3.2. Controller example
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaProducerService.sendMessage(message);
    }
}
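Once the application is running, you can exercise this endpoint with any HTTP client by POSTing a plain-text body to http://localhost:8080/send (assuming the default server port); each request publishes one message to my_topic.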
4. Implement a Kafka consumer
4.1. Consumer service
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my_topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}
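If the listener also needs the record metadata (key, partition, offset), it can accept a ConsumerRecord instead of the plain payload. A sketch, assuming the same topic and group as above:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaRecordConsumerService {

    @KafkaListener(topics = "my_topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record) {
        // The full record exposes key, partition and offset in addition to the value.
        System.out.printf("key=%s partition=%d offset=%d value=%s%n",
                record.key(), record.partition(), record.offset(), record.value());
    }
}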
5. Advanced features
5.1. Transactions
Kafka supports transactions, so a group of related sends is applied atomically: either all of the records become visible to consumers, or none do.
Producer configuration:
# Setting a transaction-id-prefix enables transactions (which require idempotence)
spring.kafka.producer.transaction-id-prefix=my-transactional-id
# Idempotence can also be set explicitly via the raw producer property
spring.kafka.producer.properties.enable.idempotence=true
Using a transaction:
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaTransactionalService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaTransactionalService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // executeInTransaction runs the callback in a local Kafka transaction and
    // requires spring.kafka.producer.transaction-id-prefix to be configured.
    // Alternatively, annotate the method with @Transactional and call
    // kafkaTemplate.send(...) directly to join a Spring-managed transaction;
    // combining both, as the original snippet did, is redundant.
    public void sendMessageInTransaction(String message) {
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("my_topic", message);
            return true;
        });
    }
}
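Note that consumers only stop seeing uncommitted or aborted transactional records when they read with read_committed isolation; the Kafka default is read_uncommitted. The underlying consumer property is isolation.level=read_committed, which Spring Boot exposes as spring.kafka.consumer.isolation-level.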
5.2. Asynchronous sends and callbacks
Asynchronous send:
// Note: in Spring for Apache Kafka 2.x, send() returns a ListenableFuture and
// addCallback() can be used as below; from 3.0 it returns a CompletableFuture,
// where the equivalent is whenComplete((result, ex) -> ...).
public void sendMessageAsync(String message) {
    kafkaTemplate.send("my_topic", message).addCallback(
            result -> System.out.println("Sent message: " + message),
            ex -> System.err.println("Failed to send message: " + ex.getMessage())
    );
}
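If the caller instead needs to block until the broker acknowledges the record (for example in a batch job), it can wait on the returned future. This sketch works with either future type, since both implement java.util.concurrent.Future; the 10-second timeout is an arbitrary example value:

public void sendMessageSync(String message) throws Exception {
    // get() blocks until the record is acknowledged or the send fails.
    kafkaTemplate.send("my_topic", message).get(10, java.util.concurrent.TimeUnit.SECONDS);
}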
Summary
Integrating Spring Boot with Kafka makes working with a message queue considerably simpler. With the steps above you can configure Kafka, implement producers and consumers, and use Spring Boot's support to process message streams. Understanding the advanced features, such as transactions and asynchronous sends, helps you meet business requirements for availability and data consistency.