Kafka批量消耗

打印 上一主题 下一主题

主题 889|帖子 889|积分 2677

在Spring Kafka中,使用@KafkaListener注解处理批量信息时,首先需要开启批量监听模式,并配置相应的consumer参数来控制批量消耗举动。以下是配置和处理批量消息的基本步骤:

  • 配置Kafka消耗者工厂
    设置batchListener属性为true,使@KafkaListener支持批量消耗。
    1. @Bean
    2. public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    3.     ConcurrentKafkaListenerContainerFactory<String, String> factory =
    4.             new ConcurrentKafkaListenerContainerFactory<>();
    5.     factory.setConsumerFactory(consumerFactory());
    6.     // 开启批量监听模式
    7.     factory.setBatchListener(true);
    8.     // 其他相关配置,比如并发度、错误处理等
    9.     return factory;
    10. }
    复制代码
  • 配置消耗者参数
    设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG,指定每次poll请求从Kafka服务器获取的最大记录数。并且关闭offset主动提交enable-auto-commit: false
    1. # application.properties 或 application.yml
    2. spring:
    3.   kafka:
    4.     consumer:
    5.       bootstrap-servers: localhost:9092
    6.       group-id: my-group
    7.       max-poll-records: 100
    8.       # 其他配置项,如enable-auto-commit, auto-offset-reset等
    复制代码
  • 编写批量处理方法
    定义一个方法,其参数是一个包含多条消息的列表,@KafkaListener注解下的方法将会接收到批量的消息。
    1. @KafkaListener(topics = "my-topic")
    2. public void processMessages(List<ConsumerRecord<String, String>> records,
    3.                             Acknowledgment acknowledgment) {
    4.     try {
    5.         // 处理批量消息
    6.         for (ConsumerRecord<String, String> record : records) {
    7.             // 对每条消息进行处理
    8.         }
    9.         // 成功处理后手动提交偏移量
    10.         acknowledgment.acknowledge();
    11.     } catch (Exception e) {
    12.         // 错误处理,记录错误,考虑是否重试或者有其他补偿措施
    13.         log.error("Error processing message batch", e);
    14.     }
    15. }
    复制代码
  • 处理异常和偏移量提交
    当批量处理消息时,需要注意的是,一旦消息处理完成且没有错误,应当手动提交偏移量,以确认这些消息已经被乐成消耗。假如有消息处理失败,则大概需要根据业务需求选择差别的策略,比如重新尝试处理整个批次、跳过错误消息或者记录错误信息稍后处理。
通过以上步骤,@KafkaListener就能按照批处理的方式接收并处理Kafka主题中的消息了。
批量消耗Kafka中的消息,然后将这些消息放入队列中,末了利用线程池异步处理这些队列中的消息。这种方式有助于优化资源利用率,尤其是当消息处理逻辑较为耗时或者IO麋集型时,可以有用提拔体系的并行处理本事和吞吐量。
  1. import org.springframework.kafka.annotation.KafkaListener;
  2. import org.springframework.kafka.support.Acknowledgment;
  3. import org.springframework.stereotype.Component;
  4. import org.springframework.util.concurrent.ListenableFuture;
  5. import org.springframework.util.concurrent.ListenableFutureCallback;
  6. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  7. import java.util.List;
  8. import java.util.concurrent.BlockingQueue;
  9. import java.util.concurrent.LinkedBlockingQueue;
  10. @Component
  11. public class BatchMessageProcessor {
  12.     private final ThreadPoolTaskExecutor taskExecutor;
  13.     private final BlockingQueue<ConsumerRecord<String, String>> messageQueue = new LinkedBlockingQueue<>();
  14.     public BatchMessageProcessor(ThreadPoolTaskExecutor taskExecutor) {
  15.         this.taskExecutor = taskExecutor;
  16.     }
  17.     @KafkaListener(topics = "my-topic", batch = true)
  18.     public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
  19.         for (ConsumerRecord<String, String> record : records) {
  20.             // 将消费到的消息放入队列
  21.             messageQueue.offer(record);
  22.         }
  23.         // 异步处理消息队列
  24.         processMessageQueue(acknowledgment);
  25.     }
  26.     private void processMessageQueue(Acknowledgment acknowledgment) {
  27.         List<ConsumerRecord<String, String>> messagesToProcess;
  28.         synchronized (messageQueue) {
  29.             // 从队列中批量取出消息
  30.             messagesToProcess = new ArrayList<>(messageQueue.size());
  31.             messageQueue.drainTo(messagesToProcess, 100); // 假设批量处理100条
  32.         }
  33.         if (!messagesToProcess.isEmpty()) {
  34.             ListenableFuture<?> future = taskExecutor.submit(() -> {
  35.                 for (ConsumerRecord<String, String> record : messagesToProcess) {
  36.                     // 实际处理消息的逻辑
  37.                     processSingleMessage(record);
  38.                 }
  39.                 // 所有消息处理完毕后提交偏移量
  40.                 acknowledgment.acknowledge();
  41.             });
  42.             // 可以添加回调函数,用于处理线程池任务执行后的结果
  43.             future.addCallback(new ListenableFutureCallback<Object>() {
  44.                 @Override
  45.                 public void onSuccess(Object result) {
  46.                     // 处理成功逻辑
  47.                 }
  48.                 @Override
  49.                 public void onFailure(Throwable ex) {
  50.                     // 处理失败逻辑,如日志记录、重试等
  51.                 }
  52.             });
  53.         }
  54.     }
  55.     private void processSingleMessage(ConsumerRecord<String, String> record) {
  56.         // 这里实现单个消息的具体处理逻辑
  57.     }
  58. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

篮之新喜

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表