Kafka系列之如何提高消费者消费速率

打印 上一主题 下一主题

主题 925|帖子 925|积分 2775

前言

在实际开发过程中,如果使用Kafka处理超大数据量(千万级、亿级)的场景,Kafka消费者的消费速率可能决定系统性能瓶颈。
实现方案

为了提高消费者的消费速率,我们可以采取以下步调:


  • 将主题的分区数目增大,如 20,通过concurrency将消费者的消费线程数增大到 10(2个pod),提高消息处理的并发能力。
  • 将每次批量拉取消息的数目max.poll.records增大到 500,提高单次处理消息的数目。
  • 将消息切分成批次,将单个批次的数据处理业务逻辑放进线程池中异步举行,提高并发处理消息的速率。
  • 将异步线程池的拒绝模式调整为 CallerRunsPolicy,这个配置非常重要。当线程池的任务队列已满且所有线程都在忙碌时,新的任务将由提交任务的线程(即调用者线程)来实行。否则在消息量特殊大的情况下,很可能会因为线程池任务队列满了而丢失数据。
  • 将异步线程池的队列容量设置为 0,这样意味着所有任务必须立即由线程池中的线程来处理,淘汰在队列中的等待时间。
  • 在数据上报的时间举行幂等性验证,防止重复上报数据。
  1. @Component
  2. public class OrderConsumer {
  3.         @Resource(name = "execThreadPool")
  4.     private ThreadPoolTaskExecutor execThreadPool;
  5.    
  6.         @KafkaListener(
  7.             id = "record_consumer",
  8.             topics = "record",
  9.             groupId = "g_record_consumer",
  10.             concurrency = "10",
  11.             properties = {"max.poll.interval.ms:300000", "max.poll.records:500"}
  12.     )
  13.     public void consume(ConsumerRecords<String, String> records, Acknowledgment ack) {
  14.         execThreadPool.submit(()-> {
  15.                 // 业务逻辑
  16.                 }
  17.         );
  18.         ack.acknowledge();
  19.     }
  20. }
复制代码
ThreadPoolTaskExecutor 是 Spring 框架提供的一个线程池实现,用于管理和实行多线程任务。它是 TaskExecutor 接口的实现,提供了在 Spring 应用步伐中创建和配置线程池的便捷方式。
ThreadPoolTaskExecutor重要特点:


  • 线程池配置: ThreadPoolTaskExecutor 答应你配置核心线程数、最大线程数、队列容量等线程池属性。
  • 线程创建和销毁: 它会根据任务的需求自动创建和销毁线程,避免不须要的线程创建和销毁开销。
  • 线程复用: 线程池中的线程可以被复用,从而淘汰线程创建的开销。
  • 队列管理: 当线程池达到最大线程数时,新任务会被放入队列中等待实行。
  • 拒绝策略: 当线程池已满并且队列也已满时,可以配置拒绝策略来处理新任务的方式。
    RejectedExecutionHandler 是 Java 线程池的一个重要接口,用于界说当线程池已满并且无法接受新任务时,如那边理被拒绝的任务。当线程池的队列和线程都已满,新任务就会被拒绝实行,这时就会使用 RejectedExecutionHandler 来处理这些被拒绝的任务。
    在 Java 中,有几种内置的 RejectedExecutionHandler 实现可供选择,每种实现都有差别的拒绝策略:
    AbortPolicy(默认策略): 这是默认的拒绝策略,它会抛出一个 RejectedExecutionException 非常,表现任务被拒绝实行。
    CallerRunsPolicy: 当线程池已满时,将任务返回给提交任务的调用者(Caller)。这意味着提交任务的线程会实行实行被拒绝的任务。
    DiscardPolicy: 这个策略会冷静地丢弃被拒绝的任务,不会产生任何非常。
    DiscardOldestPolicy: 这个策略会丢弃队列中最老的任务,然后实行将新任务添加到队列中。
    除了这些内置的策略,你还可以实现自界说的 RejectedExecutionHandler 接口,以界说特定于你应用步伐需求的拒绝策略。你可以根据业务需求来决定拒绝策略,比如记录日记、通知管理员、重试等。
  1. @Configuration
  2. public class ThreadPoolConfig {
  3.         @Bean
  4.         private ThreadPoolTaskExecutor execThreadPool() {
  5.         ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
  6.         pool.setCorePoolSize(50);  // 核心线程数
  7.         pool.setMaxPoolSize(10000);  // 最大线程数
  8.         pool.setQueueCapacity(0);  // 等待队列size
  9.         pool.setKeepAliveSeconds(60);  // 线程最大空闲存活时间
  10.         pool.setWaitForTasksToCompleteOnShutdown(true);
  11.         pool.setAwaitTerminationSeconds(60);  // 程序shutdown时最多等60秒钟让现存任务结束
  12.         pool.setRejectedExecutionHandler(new CallerRunsPolicy());  // 拒绝策略
  13.         return pool;
  14.     }
  15. }
复制代码
通过以上方案,我们可以提高消费侧的TPS,同时杜绝重复上报的现象,极大提高数据准确性和用户体验。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

汕尾海湾

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表