Spring Boot 集成 Kafka

一给  金牌会员 | 2024-10-19 06:18:37 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 863|帖子 863|积分 2589

Spring Boot 与 Kafka 集成是实现高效消息通报和数据流处理的常见方式。Spring Boot 提供了简化 Kafka 设置和使用的功能,使得集成过程变得更加直观和高效。以下是 Spring Boot 集成 Kafka 的详细步调,包括设置、生产者和斲丧者的实现以及一些高级特性。
1. 添加依赖

首先,你必要在 Spring Boot 项目标 pom.xml 文件中添加 Kafka 相干的依赖。使用 Spring Boot 的起步依赖可以简化设置。
  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-kafka</artifactId>
  4. </dependency>
复制代码
2. 设置 Kafka

2.1. 设置文件

在 application.properties 或 application.yml 文件中设置 Kafka 相干属性。
application.properties:
  1. # Kafka 服务器地址
  2. spring.kafka.bootstrap-servers=localhost:9092
  3. # Kafka 消费者配置
  4. spring.kafka.consumer.group-id=my-group
  5. spring.kafka.consumer.auto-offset-reset=earliest
  6. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  7. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  8. # Kafka 生产者配置
  9. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  10. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
复制代码
application.yml:
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: localhost:9092
  4.     consumer:
  5.       group-id: my-group
  6.       auto-offset-reset: earliest
  7.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  8.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  9.     producer:
  10.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  11.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
复制代码
2.2. Kafka 设置类

在 Spring Boot 中,你可以使用 @Configuration 注解创建一个设置类,来界说 Kafka 的生产者和斲丧者设置。
  1. import org.apache.kafka.clients.producer.ProducerConfig;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.common.serialization.StringDeserializer;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.kafka.core.ConsumerFactory;
  8. import org.springframework.kafka.core.KafkaTemplate;
  9. import org.springframework.kafka.core.ProducerFactory;
  10. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  11. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  12. import org.springframework.kafka.core.KafkaTemplate;
  13. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  14. import org.springframework.kafka.listener.config.ContainerProperties;
  15. import org.springframework.kafka.annotation.EnableKafka;
  16. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  17. import org.springframework.kafka.core.ConsumerFactory;
  18. import org.springframework.kafka.core.ProducerFactory;
  19. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  20. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  21. import java.util.HashMap;
  22. import java.util.Map;
  23. @Configuration
  24. @EnableKafka
  25. public class KafkaConfig {
  26.     @Bean
  27.     public ProducerFactory<String, String> producerFactory() {
  28.         Map<String, Object> configProps = new HashMap<>();
  29.         configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  30.         configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  31.         configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  32.         return new DefaultKafkaProducerFactory<>(configProps);
  33.     }
  34.     @Bean
  35.     public KafkaTemplate<String, String> kafkaTemplate() {
  36.         return new KafkaTemplate<>(producerFactory());
  37.     }
  38.     @Bean
  39.     public ConsumerFactory<String, String> consumerFactory() {
  40.         Map<String, Object> configProps = new HashMap<>();
  41.         configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  42.         configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
  43.         configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  44.         configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  45.         return new DefaultKafkaConsumerFactory<>(configProps);
  46.     }
  47.     @Bean
  48.     public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  49.         ConcurrentKafkaListenerContainerFactory<String, String> factory =
  50.                 new ConcurrentKafkaListenerContainerFactory<>();
  51.         factory.setConsumerFactory(consumerFactory());
  52.         return factory;
  53.     }
  54. }
复制代码
3. 实现 Kafka 生产者

3.1. 生产者服务

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class KafkaProducerService {
  6.     @Autowired
  7.     private KafkaTemplate<String, String> kafkaTemplate;
  8.     private static final String TOPIC = "my_topic";
  9.     public void sendMessage(String message) {
  10.         kafkaTemplate.send(TOPIC, message);
  11.     }
  12. }
复制代码
3.2. 控制器示例

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.web.bind.annotation.PostMapping;
  3. import org.springframework.web.bind.annotation.RequestBody;
  4. import org.springframework.web.bind.annotation.RestController;
  5. @RestController
  6. public class KafkaController {
  7.     @Autowired
  8.     private KafkaProducerService kafkaProducerService;
  9.     @PostMapping("/send")
  10.     public void sendMessage(@RequestBody String message) {
  11.         kafkaProducerService.sendMessage(message);
  12.     }
  13. }
复制代码
4. 实现 Kafka 斲丧者

4.1. 斲丧者服务

  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class KafkaConsumerService {
  5.     @KafkaListener(topics = "my_topic", groupId = "my-group")
  6.     public void listen(String message) {
  7.         System.out.println("Received message: " + message);
  8.     }
  9. }
复制代码
5. 高级特性

5.1. 消息事务

Kafka 支持消息事务,确保消息的原子性。
生产者设置
  1. spring.kafka.producer.enable-idempotence=true
  2. spring.kafka.producer.transaction-id-prefix=my-transactional-id
复制代码
使用事务
  1. import org.springframework.kafka.core.KafkaTemplate;
  2. import org.springframework.kafka.core.ProducerFactory;
  3. import org.springframework.kafka.core.TransactionTemplate;
  4. import org.springframework.stereotype.Service;
  5. import org.springframework.transaction.annotation.Transactional;
  6. @Service
  7. public class KafkaTransactionalService {
  8.     private final KafkaTemplate<String, String> kafkaTemplate;
  9.     private final TransactionTemplate transactionTemplate;
  10.     public KafkaTransactionalService(KafkaTemplate<String, String> kafkaTemplate, TransactionTemplate transactionTemplate) {
  11.         this.kafkaTemplate = kafkaTemplate;
  12.         this.transactionTemplate = transactionTemplate;
  13.     }
  14.     @Transactional
  15.     public void sendMessageInTransaction(String message) {
  16.         kafkaTemplate.executeInTransaction(t -> {
  17.             kafkaTemplate.send("my_topic", message);
  18.             return true;
  19.         });
  20.     }
  21. }
复制代码
5.2. 异步发送与回调

异步发送
  1. public void sendMessageAsync(String message) {
  2.     kafkaTemplate.send("my_topic", message).addCallback(
  3.         result -> System.out.println("Sent message: " + message),
  4.         ex -> System.err.println("Failed to send message: " + ex.getMessage())
  5.     );
  6. }
复制代码
总结

Spring Boot 与 Kafka 的集成使得消息队列的使用变得更加简朴和高效。通过上述步调,你可以轻松地设置 Kafka、实现生产者和斲丧者,并使用 Spring Boot 提供的强大功能来处理消息流。相识 Kafka 的高级特性(如事务和异步处理)能够帮助你更好地满意业务需求,确保系统的高可用性和数据同等性。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

一给

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表