SpringBoot 集成 Kafka

打印 上一主题 下一主题

主题 1581|帖子 1581|积分 4743

(一)Kafka介绍


Kafka 也是是我们在开发过程中经常会使用的一种消息队列
Kafka的核心概念包括Producer、Consumer、Broker、Topic、Partition和Offset。
   Producer:生产者,负责将数据发送到Kafka集群。
Consumer:消耗者,从Kafka集群中读取数据。
Broker:Kafka服务器实例,Kafka集群通常由多个Broker组成。
Topic:主题,数据按主题举行分类。
Partition:分区,每个主题可以有多个分区,用于实现并行处理和提高吞吐量。
Offset:偏移量,每个消息在其分区中的唯一标识。
  (二)使用场景


Kafka适用于以下场景:
   日志网络:会集网络体系日志和应用日志,通过Kafka传输到大数据处理体系。
消息队列:作为高吞吐量、低延迟的消息队列体系。
数据流处理:实时处理数据流,用于实时分析、监控和处理。
事件源架构:将所有的变更事件存储在Kafka中,实现事件溯源和回放。
流数据管道:构建数据管道,连接数据源和数据存储体系。
  (三)Spring Boot 集成 Kafka

前置条件:先启动zooker 服务器、再启动kafka 服务端
  1. <dependency>
  2.     <groupId>org.springframework.kafka</groupId>
  3.     <artifactId>spring-kafka</artifactId>
  4. </dependency>
复制代码
   生产者端 propertise设置
  1. #####################################################################
  2. # Kafka生产者配置文件
  3. # 包含:必须配置、强烈建议配置和可选配置
  4. # 用途:配置Kafka生产者的行为,包括消息发送方式、性能优化、可靠性保证等
  5. # 注意:某些配置项的值需要根据实际生产环境进行调整
  6. #####################################################################
  7. ###########【必须配置】###########
  8. # Kafka服务器地址,多个地址用逗号分隔(必须)
  9. # 例如:localhost:9092,localhost:9093,localhost:9094
  10. spring.kafka.producer.bootstrap-servers=localhost:9092
  11. # 消息键和值的序列化器(必须)
  12. # 用于将Java对象转换为Kafka中的二进制数据
  13. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  14. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  15. ###########【强烈建议配置】###########
  16. # 可靠性配置
  17. # acks=all 所有副本都确认才算写入成功,最高可靠性
  18. # retries 发送失败时的重试次数
  19. # enable.idempotence=true 启用幂等性,防止消息重复
  20. spring.kafka.producer.acks=all
  21. spring.kafka.producer.retries=3
  22. spring.kafka.producer.properties.enable.idempotence=true
  23. # 性能配置
  24. # batch-size 批量发送的大小(字节)
  25. # buffer-memory 生产者缓冲区大小(字节)
  26. # compression-type 消息压缩类型(none, gzip, snappy, lz4, zstd)
  27. spring.kafka.producer.batch-size=16384
  28. spring.kafka.producer.buffer-memory=33554432
  29. spring.kafka.producer.compression-type=snappy
  30. # 请求配置
  31. # request.timeout.ms 等待服务器响应的最大时间
  32. # max.request.size 单个请求的最大大小
  33. spring.kafka.producer.properties.request.timeout.ms=30000
  34. spring.kafka.producer.properties.max.request.size=1048576
  35. # 批量发送配置
  36. # linger.ms 延迟发送时间,等待更多消息一起发送
  37. # 增加此值可以提高吞吐量,但会增加延迟
  38. spring.kafka.producer.properties.linger.ms=10
  39. # 发送缓冲区配置
  40. # send.buffer.bytes TCP发送缓冲区大小
  41. # receive.buffer.bytes TCP接收缓冲区大小
  42. spring.kafka.producer.properties.send.buffer.bytes=131072
  43. spring.kafka.producer.properties.receive.buffer.bytes=32768
  44. ###########【可选配置】###########
  45. # 事务配置
  46. # 如果启用事务,必须配置事务ID前缀
  47. # 不同的生产者必须使用不同的事务ID
  48. #spring.kafka.producer.transaction-id-prefix=tx-
  49. # 生产者限制配置
  50. # max.block.ms 发送阻塞的最大时间
  51. # max.in.flight.requests.per.connection 单个连接最大未确认请求数
  52. # spring.kafka.producer.properties.max.block.ms=60000
  53. # spring.kafka.producer.properties.max.in.flight.requests.per.connection=5
复制代码
  影响 生产者举行发送 的两个紧张设置 linger.ms 和 batch-size
  1. linger.ms 的作用
  2. linger.ms 控制了生产者在发送一个批次之前等待更多消息加入该批次的时间。
  3. 如果设置为 0,生产者将不会等待,而是会立即发送任何可用的消息批次,无论该批次是否达到了 batch-size 的大小限制。
  4. batch-size 的作用
  5. batch-size 定义了生产者发送的每个批次中消息的总字节大小。当生产者收集到足够多的消息(其总字节大小达到或超过 batch-size)时,它会发送这个批次,或者当达到 linger.ms 的时间限制时(如果 linger.ms 大于 0)。
  6. linger.ms=0 时的效果
  7. 当 linger.ms 设置为 0 时,生产者不会等待更多消息加入当前批次,而是会立即发送任何已经收集到的消息。这意味着,如果生产者只收集到少量消息(其总字节大小远小于 batch-size),这些消息也会被发送出去。然而,如果生产者能够迅速收集到足够多的消息以达到或超过 batch-size,那么这些消息仍然会被组合成一个较大的批次发送。
复制代码
  消耗者端 propertise设置
  1. # Kafka消费者配置文件
  2. # 包含:必须配置、强烈建议配置和可选配置
  3. # 用途:配置Kafka消费者的行为,包括消费方式、批量处理、会话管理等
  4. # 注意:某些配置项的值需要根据实际生产环境进行调整
  5. #####################################################################
  6. ###########【必须配置】###########
  7. # Kafka服务器地址,多个地址用逗号分隔(必须)
  8. # 例如:localhost:9092,localhost:9093,localhost:9094
  9. spring.kafka.consumer.bootstrap-servers=localhost:9092
  10. # 消费者组ID,同一组的消费者协同消费消息(必须)
  11. # 相同组ID的消费者消费不同分区的消息,实现负载均衡
  12. spring.kafka.consumer.group-id=defaultConsumerGroup
  13. # 消息键和值的反序列化器(必须)
  14. # 用于将Kafka中的二进制数据转换为Java对象
  15. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  16. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  17. ###########【强烈建议配置】###########
  18. # 消息提交方式配置
  19. # enable-auto-commit=false 关闭自动提交,防止消息丢失
  20. # ack-mode=MANUAL 手动确认模式,确保消息被正确处理后才提交
  21. spring.kafka.consumer.enable-auto-commit=false
  22. spring.kafka.listener.ack-mode=MANUAL
  23. # 批量消费配置
  24. # type=batch 启用批量消费模式,提高消费效率
  25. # max-poll-records 每次批量消费的最大消息数
  26. spring.kafka.listener.type=batch
  27. spring.kafka.consumer.max-poll-records=500
  28. # 会话超时配置
  29. # session.timeout.ms 消费者组会话超时时间,如果超过此时间没有心跳则认为消费者死亡
  30. # max.poll.interval.ms 两次poll之间的最大间隔,超过则认为消费者处理能力不足
  31. spring.kafka.consumer.properties.session.timeout.ms=45000
  32. spring.kafka.consumer.properties.max.poll.interval.ms=300000
  33. # 偏移量配置
  34. # earliest: 从最早的消息开始消费
  35. # latest: 从最新的消息开始消费,保证消费者只处理最新的消息
  36. # none: 如果无偏移量则抛出异常
  37. spring.kafka.consumer.auto-offset-reset=latest
  38. # 消费者拉取配置
  39. # fetch.min.bytes 每次最小拉取大小,避免频繁拉取
  40. # fetch.max.wait.ms 当数据量不足fetch.min.bytes时,最多等待时间
  41. spring.kafka.consumer.fetch.min.bytes=1
  42. spring.kafka.consumer.fetch.max.wait.ms=500
  43. # 心跳配置
  44. # heartbeat.interval.ms 心跳间隔时间,必须小于session.timeout.ms
  45. spring.kafka.consumer.properties.heartbeat.interval.ms=3000
  46. ###########【可选配置】###########
  47. # 并发消费配置
  48. # 设置消费者线程数,提高消费能力
  49. spring.kafka.listener.concurrency=3
  50. # 消费者限制配置
  51. # max.partition.fetch.bytes 每个分区返回的最大数据量
  52. # fetch.max.bytes 一次请求中返回的最大数据量
  53. spring.kafka.consumer.max-partition-fetch-bytes=1048576
  54. spring.kafka.consumer.fetch.max.bytes=52428800
复制代码
简单实践

  1. # 简单生产者
  2. @RestController
  3. public class KafkaProducer {
  4.     @Autowired
  5.     private KafkaTemplate<String, Object> kafkaTemplate;
  6.     // 发送消息
  7.     @GetMapping("/kafka/normal/{message}")
  8.     public void sendMessage1(@PathVariable("message") String normalMessage) {
  9.         kafkaTemplate.send("topic1", normalMessage);
  10.     }
  11. }
  12. -------------------------------------------------------------------------------
  13. # 简单消费者
  14. @Component
  15. public class KafkaConsumer {
  16.     // 消费监听
  17.     @KafkaListener(topics = {"topic1"})
  18.     public void onMessage1(ConsumerRecord<?, ?> record){
  19.         // 消费的哪个topic、partition的消息,打印出消息内容
  20.         System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
  21.     }
  22. }
复制代码
  上面示例创建了一个生产者,发送消息到topic1,消耗者监听topic1消耗消息。
监听器用@KafkaListener注解,topics表示监听的topic,支持同时监听多个,用英文逗号分隔。
启动项目,postman调接口触发生产者发送消息,
   

 
生产者

   基本发送方式
  
  ListenableFuture<SendResult<K, V>> send(String topic, V value);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V value);
  
  参数:
topic:要发送消息的主题。
key:消息的键(可选)。如果指定了键,Kafka会根据分区器(Partitioner)的计谋来决定消息应该发送到哪个分区。
  
value:消息的值。
使用场景:这是最常用的发送方法。
当只需要发送消息到指定主题时,可以使用第一个方法
当需要控制消息的分区时,可以使用第二个方法并指定键。
    指定分区发送
  
  ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V value);
  
  参数:
topic:要发送消息的主题。
partition:指定的分区号。
key:消息的键(虽然可以指定,但在这种环境下,分区已经过参数直接指定,因此键的分区作用被忽略)。
  
value:消息的值。
  使用场景:当需要确保消息发送到特定分区时,可以使用这个方法。
    带有回调的发送
  
  kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做赔偿处理:(推荐使用以下这种方式)
  1. @GetMapping("/kafka/callbackOne/{message}")
  2. public void sendMessage2(@PathVariable("message") String callbackMessage) {
  3.     kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
  4.         // 消息发送到的topic主题
  5.         String topic = success.getRecordMetadata().topic();
  6.         // 消息发送到的partition分区
  7.         int partition = success.getRecordMetadata().partition();
  8.         // 消息在分区内的offset偏移量
  9.         long offset = success.getRecordMetadata().offset();
  10.         System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
  11.     }, failure -> {
  12.         System.out.println("发送消息失败:" + failure.getMessage());
  13.     });
  14. }
复制代码
  监听器 (用于生产者这边,异步监听生产者消息是否发送成功)
  
  Kafka提供了ProducerListener 监听器来异步监听生产者消息是否发送成功,我们可以自定义一个kafkaTemplate添加ProducerListener,
当消息发送失败我们可以拿到消息举行重试大概把失败消息记载到数据库定时重试。
  1. @Configuration
  2. public class KafkaConfig {
  3.    
  4.     private static final Logger logger = LoggerFactory.getLogger(KafkaConfig.class);
  5.     /**
  6.      * 配置生产者的KafkaTemplate
  7.      * 用于发送消息到Kafka
  8.      */
  9.     @Bean
  10.     public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
  11.         KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
  12.         
  13.         // 添加生产者监听器,用于监控消息发送状态
  14.         template.setProducerListener(new ProducerListener<String, String>() {
  15.             @Override
  16.             public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {
  17.                 logger.info("消息发送成功:topic = {}, partition = {}, offset = {}, value = {}",
  18.                     producerRecord.topic(),
  19.                     recordMetadata.partition(),
  20.                     recordMetadata.offset(),
  21.                     producerRecord.value());
  22.             }
  23.             @Override
  24.             public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {
  25.                 logger.error("消息发送失败:topic = {}, value = {}, error = {}",
  26.                     producerRecord.topic(),
  27.                     producerRecord.value(),
  28.                     exception.getMessage());
  29.             }
  30.         });
  31.         
  32.         return template;
  33.     }
  34. }
复制代码
  注意:当我们发送一条消息,既会走 ListenableFutureCallback 回调,也会走ProducerListener回调。
    自定义分区器
  
  我们知道,kafka中每个topic被分别为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区计谋,Kafka 为我们提供了默认的分区计谋,这些计谋对于数据在Kafka集群中的分布、负载均衡以及性能优化起着关键作用。以下是Kafka中几种常见的分区计谋:
   轮询计谋(Round-Robin Strategy)
    
  

  • 这是Kafka Java生产者API默认提供的分区计谋。
  • 如果没有指定分区计谋,则会默认使用轮询。
  • 轮询计谋按照次序将消息发送到不同的分区,每个消息被发送到其对应分区,按照次序轮询每个分区,以确保每个分区均匀地接收消息。
  • 这种计谋可以或许实现负载均衡,而且可以或许最大限度地利用集群资源。
  按键分配计谋(Key-Based Partitioning): 
   在Kafka中,如果消息指定了key,那么生产者会根据key的哈希值来决定消息应该发送到哪个分区。
这种计谋通过hash(key) % numPartitions来计算分区号,其中numPartitions是主题的总分区数。
如果key相同,那么这些消息会被发送到同一个分区,这有助于保持消息的局部性温次序性。
    Kafka 为我们提供了默认的分区计谋,同时它也支持自定义分区计谋。
其路由机制为: 
  
  若发送消息时指定了分区(即自定义分区计谋),则直接将消息append到指定分区;
  
若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值举行hash计算,根据计算结果路由到指定分区,这种环境下可以保证同一个 Key 的所有消息都进入到相同的分区;
  
patition 和 key 都未指定,则使用kafka默认的分区计谋,轮询选出一个 patition;
  我们来自定义一个分区计谋,将消息发送到我们指定的partition,起首新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区
  1. public class CustomizePartitioner implements Partitioner {
  2.     @Override
  3.     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  4.         // 自定义分区规则(这里假设全部发到0号分区)
  5.         // ......
  6.         return 0;
  7.     }
  8.     @Override
  9.     public void close() {
  10.     }
  11.     @Override
  12.     public void configure(Map<String, ?> configs) {
  13.     }
  14. }
复制代码
  在application.propertise中设置自定义分区器,设置的值就是分区器类的全路径名
  1. # 自定义分区器
  2. spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
复制代码
开启事务
   在Spring Kafka中,当您设置了transaction-id-prefix后,Spring会为您的KafkaTemplate设置一个事务性生产者。
事务性生产者会要求它的所有发送操纵都必须在事务的上下文中举行,否则会报错
  
  错误如下:
No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
  前提
   需要再设置文件中加transaction-id-prefix设置
来由:
唯一性:Kafka 使用事务 ID 来跟踪和协调跨多个分区和主题的消息发送。
防止存在多个生产者时,无法区分,以是需要通过这个参数来区分不同的生产者的事务。
比如生产者1号,我就给它设置 transaction-id-prefix=tx1_ 生产者2号,我就给它设置 transaction-id-prefix=tx2_
  1. # 所以建议若要开启事务,则不管生产者是一个还是多个,都需要习惯去配置这个事务前缀(非固定命名,任意)
  2. # 事务ID前缀,确保唯一性
  3. spring.kafka.producer.transaction-id-prefix=tx1_
  4. # 还需改动两个参数值
  5. # 事务性生产者时,必须设置一个非零的重试次数
  6. spring.kafka.producer.retries=1  
  7. # 使用幂等性生产者,必须将acks设置为all
  8. spring.kafka.producer.acks=all
复制代码
 事务的上下文的实现
  1. // 方式1:使用 @Transactional注解
  2. @Transactional
  3. @GetMapping("/kafka/transaction")
  4. public void sendMessage7(){
  5.     kafkaTemplate.send("topic1","哇哈哈");
  6.     //throw new RuntimeException("fail");
  7. }
  8. // 方式2:使用 KafkaTemplate 的 executeInTransaction 方法来声明事务
  9. @GetMapping("/kafka/transaction")
  10. public void sendMessage7(){
  11.     // 声明事务:出现报错的话, executeInTransaction 中包裹的 send消息是不会发出去
  12.     kafkaTemplate.executeInTransaction(operations -> {
  13.         operations.send("topic1","111 executeInTransaction");
  14.         operations.send("topic1","222 executeInTransaction");
  15.         return null;
  16.         //throw new RuntimeException("fail");
  17.     });
  18. }
复制代码
 消耗者

   指定topic、partition、offset消耗
  
  前面我们在监听消耗topic1的时候,监听的是topic1上所有的消息。
如果我们想指定topic、指定partition、指定offset来消耗呢?
也很简单,@KafkaListener注解已全部为我们提供
  1. /**
  2. * @Title 指定topic、partition、offset消费
  3. * @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
  4. 属性解释:
  5. ① id:消费者ID
  6. ② groupId:消费组ID(这个配置项是用来指定消费者组ID的,它使得多个消费者实例可以协同工作,共同消费主题中的消息,并实现负载均衡、容错性和扩展性等功能)
  7. ③ topics:监听的topic,可监听多个
  8. ④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听
  9. **/
  10. @KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
  11.         @TopicPartition(topic = "topic1", partitions = { "0" }),
  12.         @TopicPartition(topic = "topic2", partitions = "0",
  13.         partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "8"))
  14. })
  15. public void onMessage2(ConsumerRecord<?, ?> record) {
  16.     System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
  17. }
复制代码
  注意:topics和topicPartitions不能同时使用
    批量消耗
   请注意:当Kafka监听器被设置为批量消耗模式时,它接收的消息格式将是一个包罗多个ConsumerRecord 对象的列表。在这种设置下,监听器不再直接支持处理单个ConsumerRecord 对象的方法签名。
  
  设置application.prpertise开启批量消耗即可
  1. # 设置批量消费
  2. spring.kafka.listener.type=batch
  3. # 批量消费每次最多消费多少条消息
  4. spring.kafka.consumer.max-poll-records=50
复制代码
  1. // 接收消息时用List来接收,监听代码如下
  2. @KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
  3. public void onMessage3(List<ConsumerRecord<?, ?>> records) {
  4.     System.out.println(">>>批量消费一次,records.size()="+records.size());
  5.     for (ConsumerRecord<?, ?> record : records) {
  6.         System.out.println(record.value());
  7.     }
  8. }
复制代码
  提问:怎么测试才气有这个批量消耗的体现呢?
  
  好题目。
  
  回答:先把消耗者服务关闭,让生产者那边往同一个分区发多个消息,再把消耗者打开,你就会发现“批量消耗”效果出现了
  “批量消耗”效果如下:(生产者举行for循环了10次发送) 

非常处理 
   通过 ConsumerAwareListenerErrorHandler 非常处理器 ,我们可以处理consumer在消耗时发生的非常。
具体步调:
  新建一个 ConsumerAwareListenerErrorHandler 类型的非常处理方法。
  用@Bean注入,BeanName默认就是方法名。
  然后我们将这个非常处理器的BeanName放到@KafkaListener注解的errorHandler属性内里。
  当监听抛出非常的时候,则会自动调用非常处理器。
  1. import org.apache.kafka.clients.consumer.Consumer;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
  5. import org.springframework.kafka.listener.ListenerExecutionFailedException;
  6. import org.springframework.messaging.Message;
  7. @Configuration
  8. public class KafkaConfig {
  9.     @Bean
  10.     public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
  11.         return new ConsumerAwareListenerErrorHandler() {
  12.             @Override
  13.             public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
  14.                 System.out.println("消费异常:" + message.getPayload());
  15.                 return null;
  16.             }
  17.         };
  18.     }
  19. }
复制代码
   单个消耗加入非常类
  1. @Component
  2. public class KafkaConsumer {
  3.     @KafkaListener(topics = {"topic1"}, errorHandler = "consumerAwareErrorHandler")
  4.     public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
  5.         throw new Exception("简单消费-模拟异常");
  6.     }
  7. }
复制代码
    批量消耗加入非常类
    @Component
public class KafkaConsumer {
      @KafkaListener(topics = "topic1", errorHandler = "consumerAwareErrorHandler")
    public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
        System.out.println("批量消耗一次...");
        throw new Exception("批量消耗-模拟非常");
    }
}
    消息过滤器 
  
  消息过滤器可以在消息抵达 consumer 之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。
  
  设置消息过滤只需要为 监听器工厂 设置一个RecordFilterStrategy(消息过滤计谋),返回true的时候消息将会被扬弃,返回false时,消息能正常抵达监听容器。
  
  其中有个设置如果是批量消耗的环境下就要加上,如果不是批量的话就不用加
  
// 启用批量监听功能  
// 配合application.properties中的批量消耗设置使用  
  factory.setBatchListener(true);
  1. /**
  2. * Kafka消息过滤器配置类
  3. * 用于配置消息的过滤规则和批量处理相关的设置
  4. */
  5. @Component
  6. public class KafkaFilterConfig {
  7.    
  8.     /**
  9.      * 注入Kafka消费者工厂
  10.      * 用于创建Kafka消费者实例
  11.      */
  12.     @Autowired
  13.     ConsumerFactory consumerFactory;
  14.     /**
  15.      * 创建并配置Kafka监听器容器工厂
  16.      * 该工厂用于创建处理Kafka消息的监听器容器
  17.      *
  18.      * @return 配置好的监听器容器工厂实例
  19.      */
  20.     @Bean
  21.     public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
  22.         // 创建监听器容器工厂,并指定String类型的key和value
  23.         ConcurrentKafkaListenerContainerFactory<String, String> factory =
  24.             new ConcurrentKafkaListenerContainerFactory<>();
  25.         
  26.         // 设置消费者工厂
  27.         factory.setConsumerFactory(consumerFactory);
  28.         
  29.         // 设置是否自动确认被过滤的消息
  30.         // true表示被过滤的消息将被自动确认,不会重新消费
  31.         factory.setAckDiscarded(true);
  32.         
  33.         // 启用批量监听功能
  34.         // 配合application.properties中的批量消费设置使用
  35.         factory.setBatchListener(true);
  36.         
  37.         /**
  38.          * 设置消息过滤策略
  39.          * 返回true表示消息将被过滤(丢弃)
  40.          * 返回false表示消息将被保留并处理
  41.          */
  42.         factory.setRecordFilterStrategy(consumerRecord -> {
  43.             try {
  44.                 // 获取消息的值并转换为字符串
  45.                 String value = consumerRecord.value();
  46.                 // 尝试将消息转换为整数
  47.                 int intValue = Integer.parseInt(value);
  48.                
  49.                 // 过滤规则:偶数返回true(被过滤),奇数返回false(被保留)
  50.                 // return true 表示消息将被过滤掉
  51.                 // return false 表示消息将被保留并处理
  52.                 return intValue % 2 == 0;
  53.             } catch (NumberFormatException e) {
  54.                 // 如果消息无法转换为整数,则过滤掉该消息
  55.                 // 可以在这里添加日志记录
  56.                 // System.err.println("无法解析消息值为整数: " + consumerRecord.value());
  57.                 return true;
  58.             }
  59.         });
  60.         return factory;
  61.     }
  62. }
复制代码
结果如下 

消息转发 
    
  在实际开发中,我们可能有如许的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。
  
  重点:由于使用了转发操纵,相称于消耗者服务这边也是生产者了,以是需要设置生产者相关的设置。
  
  在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下:
  1. //消息转发 从topic1转发到topic2
  2. @KafkaListener(topics = {"topic1"})
  3. @SendTo("topic2")
  4. public String onMessage7(ConsumerRecord<?, ?> record) {
  5.     return record.value()+"-forward message";
  6. }
  7. @KafkaListener(topics = {"topic2"})
  8. public void onMessage8(ConsumerRecord<?, ?> record) {
  9.     System.out.println("收到topic2转发过来的消息:" + record.value());
  10. }
复制代码
定时启动、停止
    
  默认环境下,当消耗者项目启动的时候,监听器就开始工作,监听消耗发送到指定topic的消息,那如果我们不想让监听器立刻工作,想让它在我们指定的时间点开始工作,大概在我们指定的时间点停止工作,该怎么处理呢——使用 KafkaListenerEndpointRegistry ,下面我们就来实现:
  
  禁止监听器自启动;
  创建两个定时使命,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器
  
  新建一个定时使命类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在Spring中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动
  1. @EnableScheduling
  2. @Component
  3. public class CronTimer {
  4.     /**
  5.      * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
  6.      * 而是会被注册在KafkaListenerEndpointRegistry中,
  7.      * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
  8.      **/
  9.     @Autowired
  10.     private KafkaListenerEndpointRegistry registry;
  11.     @Autowired
  12.     private ConsumerFactory consumerFactory;
  13.     // 监听器容器工厂(设置禁止KafkaListener自启动)
  14.     @Bean
  15.     public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
  16.         ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
  17.         container.setConsumerFactory(consumerFactory);
  18.         //禁止KafkaListener自启动
  19.         container.setAutoStartup(false);
  20.         return container;
  21.     }
  22.     // 监听器
  23.     @KafkaListener(id="timingConsumer",topics = "sb_topic",containerFactory = "delayContainerFactory")
  24.     public void onMessage1(ConsumerRecord<?, ?> record){
  25.         System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
  26.     }
  27.     // 定时启动监听器
  28.     @Scheduled(cron = "0 42 11 * * ? ")
  29.     public void startListener() {
  30.         System.out.println("启动监听器...");
  31.         // "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
  32.         if (!registry.getListenerContainer("timingConsumer").isRunning()) {
  33.             registry.getListenerContainer("timingConsumer").start();
  34.         }
  35.         //registry.getListenerContainer("timingConsumer").resume();
  36.     }
  37.     // 定时停止监听器
  38.     @Scheduled(cron = "0 45 11 * * ? ")
  39.     public void shutDownListener() {
  40.         System.out.println("关闭监听器...");
  41.         registry.getListenerContainer("timingConsumer").pause();
  42.     }
  43. }
复制代码
手动确认消息
    手动提交目前有两种模式
  
  MANUAL :对性能要求高(推荐
  MANUAL_IMMEDIATE:对数据一致性要求高
  1. # 关闭自动提交
  2. spring.kafka.consumer.enable-auto-commit=false
  3. # 手动ack模式
  4. spring.kafka.listener.ack-mode=MANUAL
复制代码
  消耗消息的时候,给方法添加 Acknowledgment 参数签收消息,同时执行 acknowledge  方法
  1. @KafkaListener(topics = {"sb_topic"})
  2. public void onMessage9(ConsumerRecord<String, Object> record, Acknowledgment ack) {
  3.     System.out.println("收到消息:" + record.value());
  4.     //确认消息
  5.     ack.acknowledge();
  6. }
复制代码
propertise 与 java 显式设置 的差别 (有坑,请看!!!)
   我在测试中,发现 有些 propertise 设置不见效,有些功能必须在 KafkaConfig 设置 中 java 显式设置,才气见效。
  
  Java 设置 优先级高于 properties 中的设置
   Kafka中生产者、消耗者设置类

生产者 
 
  1. package com.example.springbootkafkacustomer.config;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import org.apache.kafka.clients.producer.RecordMetadata;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.kafka.core.KafkaTemplate;
  9. import org.springframework.kafka.core.ProducerFactory;
  10. import org.springframework.kafka.support.ProducerListener;
  11. import org.springframework.kafka.support.converter.StringJsonMessageConverter;
  12. import org.springframework.kafka.transaction.KafkaTransactionManager;
  13. /**
  14. * Kafka生产者配置类
  15. * 用于配置生产者的高级特性,包括:
  16. * 1. 默认主题 - 配置消息发送的默认目标主题
  17. * 2. 消息转换 - 支持JSON格式的消息转换
  18. * 3. 事务支持 - 确保消息发送的事务性
  19. * 4. 发送监听 - 监控消息发送结果
  20. * 5. 重试队列 - 处理发送失败的消息
  21. * 6. 错误处理 - 统一处理发送异常
  22. * 7. 告警机制 - 发送失败时的告警通知
  23. *
  24. * 每个特性都可以通过功能开关单独控制,便于根据实际需求启用或禁用
  25. *
  26. * 注意:部分功能 需要特定的前置配置才能生效
  27. */
  28. @Configuration
  29. public class KafkaProducerConfig {
  30.    
  31.     private static final Logger logger = LoggerFactory.getLogger(KafkaProducerConfig.class);
  32.     /**
  33.      * 功能开关配置
  34.      */
  35.     private final boolean enableDefaultTopic = true;     // 是否启用默认主题
  36.     private final boolean enableJsonConverter = true;    // 是否启用JSON转换器
  37.     private final boolean enableTransaction = true;      // 是否启用事务支持
  38.     private final boolean enableProducerListener = true; // 是否启用发送结果监听
  39.     private final boolean enableRetryQueue = true;       // 是否启用重试队列
  40.     private final boolean enableAlert = true;           // 是否启用告警机制
  41.     /**
  42.      * 配置KafkaTemplate
  43.      * 用于发送消息到Kafka,支持事务和结果监听
  44.      *
  45.      */
  46.     @Bean
  47.     public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
  48.         KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
  49.         
  50.         // 1. 设置默认主题
  51.         // 替代的配置:
  52.         // spring.kafka.template.default-topic=defaultTopic
  53.         // 前置要求:
  54.         // spring.kafka.producer.bootstrap-servers 必须配置生产者服务器
  55.         if (enableDefaultTopic) {
  56.             template.setDefaultTopic("defaultTopic");
  57.         }
  58.         
  59.         // 2. 设置消息转换器
  60.         // 无法通过properties配置实现
  61.         if (enableJsonConverter) {
  62.             template.setMessageConverter(new StringJsonMessageConverter());
  63.         }
  64.         
  65.         // 3. 设置事务支持
  66.         // 替代的配置:
  67.         // spring.kafka.producer.transaction-id-prefix=tx-
  68.         // spring.kafka.producer.enable.idempotence=true
  69.         // spring.kafka.producer.acks=all
  70.         // 前置要求:
  71.         // 1. producerFactory必须支持事务
  72.         // 2. spring.kafka.producer.bootstrap-servers 必须配置
  73.         // 3. 必须配置唯一的transaction-id-prefix
  74.         if (enableTransaction) {
  75.             template.setTransactionIdPrefix("tx-");
  76.         }
  77.         
  78.         // 4. 设置发送结果监听器
  79.         // 无法通过properties配置实现
  80.         // 前置要求:
  81.         // 1. spring.kafka.producer.acks=all 建议配置
  82.         // 2. spring.kafka.producer.retries>0 建议配置
  83.         // 3. spring.kafka.producer.properties.max.in.flight.requests.per.connection=1 建议配置
  84.         if (enableProducerListener) {
  85.             template.setProducerListener(new ProducerListener<String, String>() {
  86.                 @Override
  87.                 public void onSuccess(ProducerRecord<String, String> record, RecordMetadata metadata) {
  88.                     logger.info("消息发送成功:topic={}, partition={}, offset={}, key={}, value={}",
  89.                         metadata.topic(),
  90.                         metadata.partition(),
  91.                         metadata.offset(),
  92.                         record.key(),
  93.                         record.value());
  94.                 }
  95.                 @Override
  96.                 public void onError(ProducerRecord<String, String> record, RecordMetadata recordMetadata, Exception exception) {
  97.                     logger.error("消息发送失败:topic={}, key={}, value={}, error={}",
  98.                         record.topic(),
  99.                         record.key(),
  100.                         record.value(),
  101.                         exception.getMessage());
  102.                     
  103.                     handleSendError(record, exception);
  104.                 }
  105.             });
  106.         }
  107.         
  108.         return template;
  109.     }
  110.     /**
  111.      * 处理发送失败的消息
  112.      * 无法通过properties配置实现
  113.      * 前置要求:
  114.      * 1. spring.kafka.producer.retries 建议配置重试次数
  115.      * 2. spring.kafka.producer.retry.backoff.ms 建议配置重试间隔
  116.      * 3. 如果要将失败消息保存到数据库,需要配置数据源
  117.      * 4. 如果要将失败消息保存到Redis,需要配置Redis连接
  118.      */
  119.     private void handleSendError(ProducerRecord<String, String> record, Exception exception) {
  120.         try {
  121.             // 记录错误日志
  122.             logger.error("处理发送失败的消息:topic={}, key={}, value={}, error={}",
  123.                 record.topic(), record.key(), record.value(), exception.getMessage());
  124.             
  125.             // 保存到重试队列
  126.             if (enableRetryQueue) {
  127.                 saveToRetryQueue(record);
  128.             }
  129.             
  130.             // 发送告警通知
  131.             if (enableAlert) {
  132.                 sendAlert(record, exception);
  133.             }
  134.         } catch (Exception e) {
  135.             logger.error("处理发送失败消息时发生异常", e);
  136.         }
  137.     }
  138.     /**
  139.      * 将消息保存到重试队列
  140.      * 无法通过properties配置实现
  141.      * 前置要求:
  142.      * 1. 如果使用数据库存储:
  143.      *    - 需要配置数据源
  144.      *    - 需要创建对应的数据表
  145.      * 2. 如果使用Redis存储:
  146.      *    - 需要配置Redis连接
  147.      *    - 需要定义数据结构
  148.      * 3. 如果使用Kafka重试主题:
  149.      *    - 需要创建重试主题
  150.      *    - 需要配置重试主题的分区数和副本数
  151.      */
  152.     private void saveToRetryQueue(ProducerRecord<String, String> record) {
  153.         if (!enableRetryQueue) {
  154.             return;
  155.         }
  156.         logger.info("将消息保存到重试队列:{}", record.value());
  157.         // TODO: 实现重试队列逻辑
  158.     }
  159.     /**
  160.      * 发送告警通知
  161.      * 无法通过properties配置实现
  162.      * 前置要求:
  163.      * 1. 邮件告警:
  164.      *    - spring.mail.* 相关配置
  165.      *    - 配置邮件服务器信息
  166.      * 2. 短信告警:
  167.      *    - 配置短信服务商的相关参数
  168.      * 3. 钉钉/企业微信告警:
  169.      *    - 配置webhook地址
  170.      *    - 配置加密密钥
  171.      */
  172.     private void sendAlert(ProducerRecord<String, String> record, Exception exception) {
  173.         logger.warn("发送告警通知:消息={}, 异常={}", record.value(), exception.getMessage());
  174.         // TODO: 实现告警通知逻辑
  175.     }
  176. }
复制代码
消耗者
  1. package com.example.springbootkafkacustomer.config;
  2. import org.apache.kafka.clients.consumer.Consumer;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  9. import org.springframework.kafka.core.ConsumerFactory;
  10. import org.springframework.kafka.core.KafkaTemplate;
  11. import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
  12. import org.springframework.kafka.listener.ContainerProperties;
  13. import org.springframework.kafka.listener.DefaultErrorHandler;
  14. import org.springframework.kafka.listener.ListenerExecutionFailedException;
  15. import org.springframework.messaging.Message;
  16. import org.springframework.util.backoff.FixedBackOff;
  17. /**
  18. * Kafka消费者配置类
  19. * 用于配置消费者的高级特性,包括:
  20. * 1. 手动确认机制 - 控制消息的确认方式,确保消息被正确处理
  21. * 2. 批量消费 - 支持批量接收和处理消息,提高处理效率
  22. * 3. 消息过滤 - 在消费前过滤不需要处理的消息
  23. * 4. 重试机制 - 消费失败时的重试策略
  24. * 5. 异常处理 - 统一处理消费过程中的异常
  25. * 6. 死信队列 - 处理无法正常消费的消息
  26. * 7. 消息转发 - 支持消息处理后转发到其他主题
  27. * 8. 告警机制 - 异常情况的告警通知
  28. *
  29. * 每个特性都可以通过功能开关单独控制,便于根据实际需求启用或禁用
  30. *
  31. * 注意:部分功能 需要特定的前置配置才能生效
  32. */
  33. @Configuration
  34. public class KafkaConsumerConfig {
  35.     private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerConfig.class);
  36.     /**
  37.      * 功能开关配置
  38.      */
  39.     private final boolean enableManualAck = true;        // 是否启用手动确认
  40.     private final boolean enableBatchListener = true;    // 是否启用批量监听
  41.     private final boolean enableMessageFilter = true;    // 是否启用消息过滤
  42.     private final boolean enableErrorHandler = true;     // 是否启用错误处理
  43.     private final boolean enableRetry = true;           // 是否启用重试机制
  44.     private final boolean enableForward = true;         // 是否启用消息转发
  45.     private final boolean enableDeadLetter = true;      // 是否启用死信队列
  46.     private final boolean enableAlert = true;           // 是否启用告警机制
  47.     /**
  48.      * 配置消费者监听器工厂
  49.      * 用于创建消费者监听器容器,处理消息的消费逻辑
  50.      *
  51.      */
  52.     @Bean
  53.     public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
  54.             ConsumerFactory<String, String> consumerFactory,
  55.             KafkaTemplate<String, String> kafkaTemplate) {
  56.         ConcurrentKafkaListenerContainerFactory<String, String> factory =
  57.                 new ConcurrentKafkaListenerContainerFactory<>();
  58.         // 基础配置
  59.         factory.setConsumerFactory(consumerFactory);
  60.         // 1. 手动确认配置
  61.         // 替代的配置:
  62.         // spring.kafka.consumer.enable-auto-commit=false
  63.         // spring.kafka.listener.ack-mode=MANUAL
  64.         // 前置要求:
  65.         // spring.kafka.consumer.group-id 必须配置消费者组ID
  66.         if (enableManualAck) {
  67.             factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
  68.         }
  69.         // 2. 批量监听配置
  70.         // 替代的配置:
  71.         // spring.kafka.listener.type=batch
  72.         // spring.kafka.consumer.max-poll-records=500
  73.         // 前置要求:
  74.         // spring.kafka.consumer.fetch.min.bytes 建议配置最小抓取大小
  75.         // spring.kafka.consumer.fetch.max.wait.ms 建议配置最大等待时间
  76.         if (enableBatchListener) {
  77.             factory.setBatchListener(true);
  78.         }
  79.         // 3. 消息过滤器配置
  80.         // 无法通过properties配置实现
  81.         // 前置要求:
  82.         // spring.kafka.consumer.group-id 必须配置消费者组ID
  83.         // spring.kafka.consumer.enable-auto-commit=false 建议关闭自动提交
  84.         if (enableMessageFilter) {
  85.             factory.setRecordFilterStrategy(consumerRecord -> {
  86.                 try {
  87.                     String value = consumerRecord.value();
  88.                     if (value == null || value.trim().isEmpty()) {
  89.                         logger.info("过滤掉空消息");
  90.                         return true;
  91.                     }
  92.                     return false;
  93.                 } catch (Exception e) {
  94.                     logger.error("消息过滤发生异常", e);
  95.                     return true;
  96.                 }
  97.             });
  98.         }
  99.         // 4. 错误处理配置
  100.         // 替代的配置:
  101.         // spring.kafka.consumer.properties.retry.backoff.ms=1000
  102.         // spring.kafka.consumer.properties.retries=3
  103.         // 前置要求:
  104.         // spring.kafka.consumer.group-id 必须配置消费者组ID
  105.         // spring.kafka.listener.ack-mode=MANUAL 建议使用手动提交
  106.         if (enableErrorHandler && enableRetry) {
  107.             DefaultErrorHandler errorHandler = new DefaultErrorHandler(
  108.                 new FixedBackOff(1000L, 3));  // 1秒间隔,重试3次
  109.             factory.setCommonErrorHandler(errorHandler);
  110.         }
  111.         // 5. 消息转发配置
  112.         // 无法通过properties配置实现
  113.         // 前置要求:
  114.         // spring.kafka.producer.bootstrap-servers 必须配置生产者服务器
  115.         // spring.kafka.producer.key-serializer 必须配置生产者序列化器
  116.         // spring.kafka.producer.value-serializer 必须配置生产者序列化器
  117.         if (enableForward) {
  118.             factory.setReplyTemplate(kafkaTemplate);
  119.         }
  120.         return factory;
  121.     }
  122.     /**
  123.      * 异常处理器配置
  124.      * 无法通过properties配置实现
  125.      * 前置要求:
  126.      * 1. spring.kafka.consumer.group-id 必须配置消费者组ID
  127.      * 2. spring.kafka.listener.ack-mode=MANUAL 建议使用手动提交
  128.      * 3. 如果需要发送告警邮件,需要配置邮件服务器
  129.      * 4. 如果需要发送告警短信,需要配置短信服务
  130.      */
  131.     @Bean
  132.     @ConditionalOnProperty(name = "kafka.error-handler.enabled", havingValue = "true", matchIfMissing = true)
  133.     public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
  134.         return (Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) -> {
  135.             // 记录错误信息
  136.             logger.error("消费异常:消费者组={}, topic={}, 分区={}, 偏移量={}, 消息内容={}, 异常={}",
  137.                 consumer.groupMetadata().groupId(),
  138.                 message.getHeaders().get("kafka_receivedTopic"),
  139.                 message.getHeaders().get("kafka_receivedPartitionId"),
  140.                 message.getHeaders().get("kafka_offset"),
  141.                 message.getPayload(),
  142.                 exception.getMessage());
  143.             // 消费告警通知
  144.             if (enableAlert) {
  145.                 sendAlert(message, exception);
  146.             }
  147.             // 将失败的消息写入死信队列
  148.             if (enableDeadLetter) {
  149.                 sendToDeadLetter(message);
  150.             }
  151.             return null;
  152.         };
  153.     }
  154.     /**
  155.      * 告警通知配置
  156.      * 无法通过properties配置实现
  157.      * 前置要求:
  158.      * 1. 如果使用邮件告警:
  159.      *    - spring.mail.* 相关配置
  160.      *    - 配置邮件服务器信息
  161.      * 2. 如果使用短信告警:
  162.      *    - 配置短信服务商的相关参数
  163.      * 3. 如果使用钉钉/企业微信告警:
  164.      *    - 配置webhook地址
  165.      *    - 配置加密密钥
  166.      */
  167.     private void sendAlert(Message<?> message, Exception exception) {
  168.         logger.warn("发送告警通知:消息={}, 异常={}", message.getPayload(), exception.getMessage());
  169.     }
  170.     /**
  171.      * 死信队列配置
  172.      * 无法通过properties配置实现
  173.      * 前置要求:
  174.      * 1. 如果使用Kafka死信主题:
  175.      *    - 需要创建死信主题
  176.      *    - 配置死信主题的分区数和副本数
  177.      * 2. 如果使用数据库存储:
  178.      *    - 需要配置数据源
  179.      *    - 需要创建对应的数据表
  180.      * 3. 如果使用Redis存储:
  181.      *    - 需要配置Redis连接
  182.      *    - 需要定义数据结构
  183.      */
  184.     private void sendToDeadLetter(Message<?> message) {
  185.         if (!enableDeadLetter) {
  186.             return;
  187.         }
  188.         logger.info("发送到死信队列:消息={}", message.getPayload());
  189.     }
  190. }
复制代码
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

锦通

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表