三、kafka消费的全流程

[复制链接]
发表于 2025-6-12 14:43:57 | 显示全部楼层 |阅读模式
五、多线程安全问题

1、多线程安全的定义

使用多线程访问一个资源,这个资源始终都能体现出正确的行为。
不被运行的环境影响、多线程可以交替访问、不需要任何额外的同步和协同。
2、Java实现多线程安全生产者

   这里只是模仿多线程环境下使用生产者发送消息,其实没有做额外的线程安全操纵,就是把生产者当成了一个公共资源,全部线程都可以访问这个生产者。
  kafka默认客户端提供的生产者本身就是线程安全的,因为生产者发送消息只有一步操纵,就是发送消息。只要消息进入消息缓冲区就可以发送给broker,不会出现消息重复发送。
  1. package com.allwe.client.concurrent;
  2. import com.allwe.client.partitioner.MyPartitioner;
  3. import lombok.Data;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.kafka.clients.producer.KafkaProducer;
  6. import org.apache.kafka.clients.producer.ProducerConfig;
  7. import org.apache.kafka.clients.producer.ProducerRecord;
  8. import org.apache.kafka.common.serialization.StringSerializer;
  9. import java.util.Properties;
  10. import java.util.concurrent.CountDownLatch;
  11. import java.util.concurrent.ExecutorService;
  12. import java.util.concurrent.Executors;
  13. /**
  14. * 生产者多线程安全 - 测试demo
  15. *
  16. * @Author: AllWe
  17. * @Date: 2024/09/27/9:30
  18. */
  19. @Data
  20. @Slf4j
  21. public class ConcurrentProducerWorker {
  22.     /**
  23.      * 消息数量
  24.      */
  25.     private static final int RECORD_COUNT = 1000;
  26.     /**
  27.      * 固定线程池 - 线程数等于CPU核数
  28.      */
  29.     private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  30.     /**
  31.      * 发令枪
  32.      */
  33.     private static final CountDownLatch countDownLatch = new CountDownLatch(RECORD_COUNT);
  34.     /**
  35.      * 生产者 - 这里让所有的线程都共享同一个生产者
  36.      */
  37.     private static KafkaProducer<String, String> kafkaProducer;
  38.     /**
  39.      * 类初始化的时候 - 创建生产者实例
  40.      */
  41.     static {
  42.         // 设置属性
  43.         Properties properties = new Properties();
  44.         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  45.         properties.put("key.serializer", StringSerializer.class);
  46.         properties.put("value.serializer", StringSerializer.class);
  47.         properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
  48.         kafkaProducer = new KafkaProducer<>(properties);
  49.     }
  50.     /**
  51.      * 启动器
  52.      */
  53.     public static void main(String[] args) {
  54.         try {
  55.             // 循环创建消息
  56.             for (int count = 0; count < RECORD_COUNT; count++) {
  57.                 ProducerRecord<String, String> record = new ProducerRecord<>("topic_6", "allwe", "allwe_" + count);
  58.                 executorService.submit(new ConcurrentProducer(record, kafkaProducer, countDownLatch));
  59.             }
  60.             countDownLatch.await();
  61.         } catch (Exception e) {
  62.             e.printStackTrace();
  63.         } finally {
  64.             // 关闭生产者连接
  65.             kafkaProducer.close();
  66.             // 释放线程池资源
  67.             executorService.shutdown();
  68.         }
  69.     }
  70. }
复制代码
  1. package com.allwe.client.concurrent;
  2. import lombok.Data;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.kafka.clients.producer.KafkaProducer;
  5. import org.apache.kafka.clients.producer.ProducerRecord;
  6. import java.util.concurrent.CountDownLatch;
  7. /**
  8. * 生产者多线程安全 - 测试demo
  9. *
  10. * @Author: AllWe
  11. * @Date: 2024/09/27/9:30
  12. */
  13. @Data
  14. @Slf4j
  15. public class ConcurrentProducer implements Runnable {
  16.     /**
  17.      * 消息体
  18.      */
  19.     private ProducerRecord<String, String> record;
  20.     /**
  21.      * 生产者
  22.      */
  23.     private KafkaProducer<String, String> producer;
  24.     /**
  25.      * 发令枪
  26.      */
  27.     private CountDownLatch countDownLatch;
  28.     public ConcurrentProducer(ProducerRecord<String, String> record, KafkaProducer<String, String> producer, CountDownLatch countDownLatch) {
  29.         this.record = record;
  30.         this.producer = producer;
  31.         this.countDownLatch = countDownLatch;
  32.     }
  33.     @Override
  34.     public void run() {
  35.         try {
  36.             String name = Thread.currentThread().getName();
  37.             producer.send(record, new ConcurrentCallBackImpl(name));
  38.             countDownLatch.countDown();
  39.         } catch (Exception e) {
  40.             e.printStackTrace();
  41.         }
  42.     }
  43. }
复制代码
  1. package com.allwe.client.concurrent;
  2. import cn.hutool.core.util.ObjectUtil;
  3. import org.apache.kafka.clients.producer.Callback;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. /**
  6. * 异步发送消息回调解析器
  7. *
  8. * @Author: AllWe
  9. * @Date: 2024/09/27/9:30
  10. */
  11. public class ConcurrentCallBackImpl implements Callback {
  12.     private String threadName;
  13.     public ConcurrentCallBackImpl(String threadName) {
  14.         this.threadName = threadName;
  15.     }
  16.     @Override
  17.     public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  18.         if (ObjectUtil.isNull(e)) {
  19.             // 解析回调元数据
  20.             System.out.println(threadName + "|-offset:" + recordMetadata.offset() + ",partition:" + recordMetadata.partition());
  21.         } else {
  22.             e.printStackTrace();
  23.         }
  24.     }
  25. }
复制代码
3、Java实现多线程安全消费者

   kafka客户端提供的消费者不是多线程安全的,是因为消费者在消费消息的时候,需要有2步操纵:取消息和ACK确认,在多线程场景下可能会出现:
  ① 线程1取到了消息,但是没来得及进行ACK确认。
  ② 线程2进来了,又消费了一次相同的消息。
  ③ 线程2提交ACK确认。
  ④ 线程1提交ACK确认。
  如许就会产生重复消费,这个时候就需要对消费者进行额外处理。
  有两个处理方案:
  ① 给消费过程加锁,但是会降低程序执行效率。
  ② 每一个线程都创建本身的消费者,只消费本身分区内的数据。
  我写的demo是使用第二种办法。
  1. package com.allwe.client.concurrent;
  2. import lombok.Data;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.kafka.clients.consumer.ConsumerConfig;
  5. import org.apache.kafka.clients.consumer.ConsumerRecord;
  6. import org.apache.kafka.clients.consumer.ConsumerRecords;
  7. import org.apache.kafka.clients.consumer.KafkaConsumer;
  8. import java.time.Duration;
  9. import java.util.Collections;
  10. import java.util.Properties;
  11. /**
  12. * 线程安全消费者 - 测试demo
  13. *
  14. * @Author: AllWe
  15. * @Date: 2024/09/27/12:19
  16. */
  17. @Data
  18. @Slf4j
  19. public class ConcurrentConsumer implements Runnable {
  20.     /**
  21.      * 消费者配置参数
  22.      */
  23.     private Properties properties;
  24.     /**
  25.      * 群组id
  26.      */
  27.     private String groupId;
  28.     /**
  29.      * 消费主题
  30.      */
  31.     private String topicName;
  32.     /**
  33.      * 消费者实例
  34.      */
  35.     private KafkaConsumer<String, String> consumer;
  36.     public ConcurrentConsumer(Properties properties, String groupId, String topicName) {
  37.         this.properties = properties;
  38.         this.groupId = groupId;
  39.         this.topicName = topicName;
  40.         // 补充配置参数
  41.         properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  42.         // 创建消费者实例 - 每一个线程都创建自己的消费者,避免共享相同的消费者实例
  43.         consumer = new KafkaConsumer<>(properties);
  44.         // 配置消费主题
  45.         consumer.subscribe(Collections.singleton(topicName));
  46.     }
  47.     @Override
  48.     public void run() {
  49.         try {
  50.             String threadName = Thread.currentThread().getName();
  51.             while (true) {
  52.                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  53.                 for (ConsumerRecord<String, String> record : records) {
  54.                     StringBuilder stringBuilder = new StringBuilder(threadName).append("|-");
  55.                     stringBuilder.append("partition:").append(record.partition());
  56.                     stringBuilder.append("offset:").append(record.offset());
  57.                     stringBuilder.append("key:").append(record.key());
  58.                     stringBuilder.append("value:").append(record.value());
  59.                     System.out.println(stringBuilder);
  60.                 }
  61.             }
  62.         } finally {
  63.             consumer.close();
  64.         }
  65.     }
  66. }
复制代码
  1. package com.allwe.client.concurrent;
  2. import lombok.Data;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.kafka.clients.consumer.ConsumerConfig;
  5. import org.apache.kafka.common.serialization.StringDeserializer;
  6. import java.util.Properties;
  7. import java.util.concurrent.ExecutorService;
  8. import java.util.concurrent.Executors;
  9. /**
  10. * 多线程安全消费者 - 测试demo
  11. *
  12. * @Author: AllWe
  13. * @Date: 2024/09/27/12:34
  14. */
  15. @Data
  16. @Slf4j
  17. public class ConcurrentConsumerWorker {
  18.     /**
  19.      * 消费线程数
  20.      */
  21.     private static final Integer THREAD_COUNT = 2;
  22.     /**
  23.      * 线程池 - 2个线程,别超过目标主题的分区数
  24.      */
  25.     private static ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
  26.     public static void main(String[] args) {
  27.         // 消费者配置
  28.         Properties properties = new Properties();
  29.         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
  30.         properties.put("key.deserializer", StringDeserializer.class);
  31.         properties.put("value.deserializer", StringDeserializer.class);
  32.         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从头开始消费
  33.         for (Integer i = 0; i < THREAD_COUNT; i++) {
  34.             executorService.submit(new ConcurrentConsumer(properties, "allwe01", "topic_6"));
  35.         }
  36.     }
  37. }
复制代码
六、群组协调

1、群主

在每一个群组内部,都有一个【群主】。往往是第一个注册进入群组的消费者承担,它的职责是读取当前群组消费的主题,以及目标主题的分区信息。
群主节点的数据权限高于平凡消费者,它可以获取全部消费者节点对应的分区信息。但是平凡消费者节点只能看见本节点的分区信息。
2、消费者协调器

属于客户端,每个消费者群组内部都有一个消费者协调器,用于获取群主节点保存的分区信息,再协调群组内的其他消费者处理哪些主题和分区。
分配好使命后将配置信息推送给【组协调器】,组协调器再将消息发送给不同的消费者。
当群组内出现某个节点掉线、上线时,消费者协调器也会参与协调。
   1、向【组协调器】发送入组请求。
  2、发起同步组的请求 -- 由群组计算分配策略,确定消费者的分区分别,发送给组协调器。
  3、心跳机制(与组协调器维持)。
  4、提交ACK确认(发起已经提交的消费偏移量的请求)。
  5、主动发起离组请求。
  3、组协调器

属于kafka broker,重要负责以下功能
   1、处理申请加入群组的消费者,而且选举群主。
  2、收到同步组的请求后,触发分区再平衡,同步新的分配方案。
  3、心跳机制(与客户端维持),如果得知哪些客户端掉线了,触发分区再平衡机制。
  4、管理消费者已经消费的偏移量,保存在主题【__consumer_offsets】,默认有50个分区。
  4、新的消费者加入群组的处理流程

   1、消费者客户端启动、重连,都会给组协调器发送一个入组请求(joinGroup请求)。
  2、消费者客户端完成joinGroup后,消费者协调器向组协调器发起同步组请求(SyncGroup请求),获取新的分配方案。
  3、入组后保持心跳(客户端控制参数:max.poll.interval.ms)。
  4、消费者客户端掉线,触发离组处理。
  5、消费者群组的信息存储在哪里

存储在__consumer_offsets文件中,groupName.hashCode() % 50,获取配置文件的编号。
七、分区再平衡

1、功能


   针对单个消费者群组,对群组内的消费者负责的分区进行重新分配。
  1、假设【主题α】有三个分区,分别是①、②、③。
  2、进来两个消费者A、B。A负责分区①,B负责分区②③。
  3、又进来一个消费者C,再平衡监听器就把分区③分配给C。
  4、消费者C掉线,再平衡监听器把分区③分配给A或者B。
  2、Java代码验证分区再平衡

  1. package com.allwe.client.reBalance;
  2. import lombok.Data;
  3. import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import org.apache.kafka.clients.consumer.OffsetAndMetadata;
  6. import org.apache.kafka.common.TopicPartition;
  7. import java.util.Collection;
  8. import java.util.Map;
  9. import java.util.concurrent.ConcurrentHashMap;
  10. /**
  11. * 分区再均衡处理器
  12. *
  13. * @Author: AllWe
  14. * @Date: 2024/10/17/8:05
  15. */
  16. @Data
  17. public class ReBalanceHandler implements ConsumerRebalanceListener {
  18.     // 记录每个分区的消费偏移量
  19.     public final static ConcurrentHashMap<TopicPartition, Long> partitionOffsetMap = new ConcurrentHashMap<TopicPartition, Long>();
  20.     private final Map<TopicPartition, OffsetAndMetadata> currOffsets;
  21.     private final KafkaConsumer<String, String> consumer;
  22.     public ReBalanceHandler(Map<TopicPartition, OffsetAndMetadata> currOffsets, KafkaConsumer<String, String> consumer) {
  23.         this.currOffsets = currOffsets;
  24.         this.consumer = consumer;
  25.     }
  26.     // 分区再均衡之前
  27.     // 某一个消费者在让出分区之前,需要先将已消费的偏移量提交
  28.     @Override
  29.     public void onPartitionsRevoked(Collection<TopicPartition> collection) {
  30.         // 线程id
  31.         final String id = Thread.currentThread().getId() + "";
  32.         System.out.println(id + "-onPartitionsRevoked参数值为:" + collection);
  33.         System.out.println(id + "-服务器准备分区再均衡,提交偏移量。当前偏移量为:" + currOffsets);
  34.         //我们可以不使用consumer.commitSync(currOffsets);
  35.         //提交偏移量到kafka,由我们自己维护*/
  36.         //开始事务
  37.         //偏移量写入数据库
  38.         System.out.println("分区偏移量表中:" + partitionOffsetMap);
  39.         for (TopicPartition topicPartition : collection) {
  40.             partitionOffsetMap.put(topicPartition, currOffsets.get(topicPartition).offset());
  41.         }
  42.         // 同步提交偏移量,等到成功后再往后执行
  43.         consumer.commitSync(currOffsets);
  44.     }
  45.     // 分区再均衡之后
  46.     // 新的消费者接管分区后,从上一次的偏移量开始消费
  47.     @Override
  48.     public void onPartitionsAssigned(Collection<TopicPartition> collection) {
  49.         // 线程id
  50.         final String threadId = Thread.currentThread().getId() + "";
  51.         System.out.println(threadId + "|-再均衡完成,onPartitionsAssigned参数值为:" + collection);
  52.         System.out.println("分区偏移量表中:" + partitionOffsetMap);
  53.         for (TopicPartition topicPartition : collection) {
  54.             System.out.println(threadId + "-topicPartition" + topicPartition);
  55.             // 取得接管分区之前的偏移量
  56.             Long offset = partitionOffsetMap.get(topicPartition);
  57.             if (offset == null) continue;
  58.             consumer.seek(topicPartition, partitionOffsetMap.get(topicPartition));
  59.         }
  60.     }
  61.     @Override
  62.     public void onPartitionsLost(Collection<TopicPartition> partitions) {
  63.         ConsumerRebalanceListener.super.onPartitionsLost(partitions);
  64.     }
  65. }
复制代码
  1. package com.allwe.client.reBalance;
  2. import lombok.Data;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.apache.kafka.clients.consumer.*;
  5. import org.apache.kafka.common.TopicPartition;
  6. import java.time.Duration;
  7. import java.util.Collections;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. import java.util.Properties;
  11. /**
  12. * 线程安全消费者 - 测试demo
  13. *
  14. * @Author: AllWe
  15. * @Date: 2024/09/27/12:19
  16. */
  17. @Data
  18. @Slf4j
  19. public class ConcurrentConsumer implements Runnable {
  20.     /**
  21.      * 消费者配置参数
  22.      */
  23.     private Properties properties;
  24.     /**
  25.      * 群组id
  26.      */
  27.     private String groupId;
  28.     /**
  29.      * 消费主题
  30.      */
  31.     private String topicName;
  32.     /**
  33.      * 消费者实例
  34.      */
  35.     private KafkaConsumer<String, String> consumer;
  36.     /**
  37.      * 记录分区消费者偏移量
  38.      */
  39.     private final Map<TopicPartition, OffsetAndMetadata> currOffsets = new HashMap<>();
  40.     public ConcurrentConsumer(Properties properties, String groupId, String topicName) {
  41.         this.properties = properties;
  42.         this.groupId = groupId;
  43.         this.topicName = topicName;
  44.         // 补充配置参数
  45.         properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  46.         // 创建消费者实例 - 每一个线程都创建自己的消费者,避免共享相同的消费者实例
  47.         consumer = new KafkaConsumer<>(properties);
  48.         // 配置消费主题 - 配置再均衡监听器
  49.         consumer.subscribe(Collections.singleton(topicName), new ReBalanceHandler(currOffsets,consumer));
  50.     }
  51.     @Override
  52.     public void run() {
  53.         try {
  54.             String threadName = Thread.currentThread().getName();
  55.             Integer offset = 0;
  56.             while (true) {
  57.                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  58.                 for (ConsumerRecord<String, String> record : records) {
  59.                     StringBuilder stringBuilder = new StringBuilder(threadName).append("|-");
  60.                     stringBuilder.append("partition:").append(record.partition());
  61.                     stringBuilder.append(",offset:").append(record.offset());
  62.                     stringBuilder.append(",key:").append(record.key());
  63.                     stringBuilder.append(",value:").append(record.value());
  64.                     System.out.println(stringBuilder);
  65.                     offset++;
  66.                     currOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(offset, "no"));
  67.                 }
  68.             }
  69.         } finally {
  70.             consumer.close();
  71.         }
  72.     }
  73. }
复制代码
自定义一个再平衡监听器,消费者在订阅接口中指定这个监听器,即可自动执行监听器的使命。
  1. // 配置消费主题 - 配置再均衡监听器
  2. consumer.subscribe(Collections.singleton(topicName), new ReBalanceHandler(currOffsets,consumer));
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

×
登录参与点评抽奖,加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表