Spring Boot 整合 Kafka 详解

打印 上一主题 下一主题

主题 865|帖子 865|积分 2595

前言:
上一篇分享了 Kafka 的一些根本概念及应用场景,本篇我们来分享一下在 Spring Boot 项目中怎样使用 Kafka。
Kafka 系列文章传送门
Kafka 简介及核心概念讲解
Spring Boot 集成 Kafka

引入 Kafka 依赖
在项目的 pom.xml 文件中引入 Kafka 依赖,如下:
  1. <dependency>
  2.         <groupId>org.springframework.cloud</groupId>
  3.         <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  4.         <version>3.1.6</version>
  5. </dependency>
复制代码
添加 Kafka 设置
在 application.properties 文件中设置 Kafka 相干设置,如下:
  1. #kafka server 地址
  2. spring.kafka.bootstrap-servers=10.xxx.4.xxx:9092,10.xxx.4.xxx::9092,10.xxx.4.xxx::9092
  3. spring.kafka.producer.acks = 1
  4. spring.kafka.producer.retries = 0
  5. spring.kafka.producer.batch-size = 30720000
  6. spring.kafka.producer.buffer-memory = 33554432
  7. spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
  8. spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
  9. #消费者配置
  10. spring.kafka.consumer.group-id = test-kafka
  11. #是否开启手动提交 默认自动提交
  12. spring.kafka.consumer.enable-auto-commit = true
  13. #如果enable.auto.commit为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000  自动提交已消费offset时间间隔
  14. spring.kafka.consumer.auto-commit-interval = 5000
  15. #earliest:分区已经有提交的offset从提交的offset开始消费,如果没有提交的offset,从头开始消费,latest:分区下已有提交的offset从提交的offset开始消费,没有提交的offset从新产生的数据开始消费
  16. spring.kafka.consumer.auto-offset-reset = earliest
  17. spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
  18. spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
  19. #kafka 没有创建指定的 topic 下  项目启动是否报错 true  false
  20. spring.kafka.listener.missing-topics-fatal = false
  21. #Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息
  22. spring.kafka.listener.type = single
  23. #一次调用 poll() 操作时返回的最大记录数 默认为 500 条
  24. spring.kafka.consumer.max-poll-records = 2
  25. #消息 ACK 模式 有7种
  26. spring.kafka.listener.ack-mode = manual_immediate
  27. #kafka session timeout
  28. spring.kafka.consumer.session.timeout.ms = 300000
复制代码
设置 Kafka Producer
我们创建一个设置类,并设置生产者工厂,设置 KafkaTemplate。
  1. package com.order.service.config;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.kafka.annotation.EnableKafka;
  8. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  9. import org.springframework.kafka.core.KafkaTemplate;
  10. import org.springframework.kafka.core.ProducerFactory;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. /**
  14. * @author :author
  15. * @description:
  16. * @modified By:
  17. * @version: V1.0
  18. */
  19. @Configuration
  20. @EnableKafka
  21. public class KafkaProducerConfig {
  22.     @Value("${spring.kafka.bootstrap-servers}")
  23.     private String servers;
  24.     @Bean("myProducerKafkaProps")
  25.     public Map<String, Object> getMyKafkaProps() {
  26.         Map<String, Object> props = new HashMap<>(4);
  27.         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  28.         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  29.         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  30.         return props;
  31.     }
  32.     @Bean
  33.     public ProducerFactory<String, String> newProducerFactory() {
  34.         return new DefaultKafkaProducerFactory<>(getMyKafkaProps());
  35.     }
  36.     @Bean
  37.     public KafkaTemplate<String, String> kafkaTemplate() {
  38.         return new KafkaTemplate<>(newProducerFactory());
  39.     }
  40. }
复制代码
设置 Kafka Cousumer
我们创建一个设置类,设置消费者工厂和监听容器。
  1. package com.order.service.config;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.common.serialization.StringDeserializer;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.kafka.annotation.EnableKafka;
  8. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  9. import org.springframework.kafka.config.KafkaListenerContainerFactory;
  10. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  11. import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
  12. import org.springframework.kafka.listener.ContainerProperties;
  13. import java.util.HashMap;
  14. import java.util.Map;
  15. /**
  16. * @author :author
  17. * @description:
  18. * @modified By:
  19. * @version: V1.0
  20. */
  21. @Configuration
  22. @EnableKafka
  23. public class KafkaConsumerConfig {
  24.     @Value("${spring.kafka.bootstrap-servers}")
  25.     private String servers;
  26.     @Value("${spring.kafka.consumer.group-id}")
  27.     private String groupId;
  28.     @Value("${spring.kafka.consumer.auto-offset-reset}")
  29.     private String offsetReset;
  30.     @Value("${spring.kafka.consumer.max-poll-records}")
  31.     private String maxPollRecords;
  32.     @Value("${spring.kafka.consumer.auto-commit-interval}")
  33.     private String autoCommitIntervalMs;
  34.     @Value("${spring.kafka.consumer.enable-auto-commit}")
  35.     private boolean enableAutoCommit;
  36.     @Bean("myConsumerKafkaProps")
  37.     public Map<String, Object> getMyKafkaProps() {
  38.         Map<String, Object> props = new HashMap<>(12);
  39.         //是否自动提交
  40.         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  41.         //kafak 服务器
  42.         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  43.         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  44.         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  45.         //不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费
  46.         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
  47.         //消费组id
  48.         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  49.         //一次调用poll()操作时返回的最大记录数,默认值为500
  50.         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
  51.         //自动提交时间间隔 默认 5秒
  52.         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
  53.         //props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
  54.         return props;
  55.     }
  56.     /**
  57.      * @return org.springframework.kafka.config.KafkaListenerContainerFactory<org.springframework.kafka.listener.ConcurrentMessageListenerContainer < java.lang.String, java.lang.String>>
  58.      * @date 2024/10/22 19:41
  59.      * @description kafka 消费者工厂
  60.      */
  61.     @Bean("myContainerFactory")
  62.     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  63.         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  64.         factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getMyKafkaProps()));
  65.         // 并发创建的消费者数量
  66.         factory.setConcurrency(3);
  67.         // 开启批处理
  68.         factory.setBatchListener(true);
  69.         //拉取超时时间
  70.         factory.getContainerProperties().setPollTimeout(1500);
  71.         //是否自动提交 ACK kafka 默认是自动提交
  72.         if (!enableAutoCommit) {
  73.             //共有其中方式
  74.             factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
  75.         }
  76.         return factory;
  77.     }
  78. }
复制代码
Kafka 消息发送
创建一个 Kafka 的 Producer,注入 KafkaTemplate,完成消息发送。
  1. package com.order.service.kafka.producer;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.kafka.core.KafkaTemplate;
  5. import org.springframework.stereotype.Component;
  6. /**
  7. * @ClassName: KafkaProducer
  8. * @Author: Author
  9. * @Date: 2024/10/22 19:22
  10. * @Description:
  11. */
  12. @Slf4j
  13. @Component
  14. public class MyKafkaProducer {
  15.     @Autowired
  16.     private KafkaTemplate<String, String> kafkaTemplate;
  17.     public void sendMessage( String message) {
  18.         this.kafkaTemplate.send("my-topic", message);
  19.     }
  20. }
复制代码
Kafka 消息消费
创建一个 Kafka 的 Consumer,使用 @KafkaListener 注解完成消息消费。
  1. package com.order.service.kafka.consumer;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @ClassName: KafkaConsumer
  7. * @Author: Author
  8. * @Date: 2024/10/22 19:22
  9. * @Description:
  10. */
  11. @Slf4j
  12. @Component
  13. public class MyKafkaConsumer {
  14.     @KafkaListener(id = "my-kafka-consumer",
  15.             idIsGroup = false, topics = "my-topic",
  16.             containerFactory = "myContainerFactory")
  17.     public void listen(String message) {
  18.         log.info("消息消费成功,消息内容:{}", message);
  19.     }
  20. }
复制代码
Kafka 消息发送消费测试
触发消息发送后,得到如下效果:
  1. 2024-10-22 20:22:43.041  INFO 36496 --- [-consumer-0-C-1] c.o.s.kafka.consumer.MyKafkaConsumer     : 消息消费成功,消息内容:第一条 kafka 消息
复制代码
效果符合预期。
以上我们简单的分享了使用 Spring Boot 集成 Kafka 的过程,希望资助到有必要的朋侪。
如有不精确的地方接待各位指出纠正。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

兜兜零元

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表