Spring Boot集成Kafka动态创建消耗者并实现多消耗者发布订阅模子 ...

打印 上一主题 下一主题

主题 957|帖子 957|积分 2871

在Spring Boot集成Kafka时,大家都知道可以使用@KafkaListener注解创建消耗者。但是@KafkaListener注解是静态的,意味着在编译时就已经确定了消耗者,无法动态地创建消耗者。
不过事实上,使用Kafka提供的Java API,使用KafkaConsumer类就可以完成消耗者的动态创建。
我们也知道在一个消耗者组中,同一条消息只会被消耗一次。而动态创建消耗者的情景也通常是满足动态的发布订阅模子(一个发布者,但是大概有不定量的消耗者),以是在这里我们使每个动态创建的消耗者的消耗者组也不一样即可。
下面,我们就来实现一下这个功能。
1,创建消耗者对象

我们可以定义一个“消耗者工厂”类,专门用于创建Kafka消耗者对象,如下:
  1. package com.gitee.swsk33.kafkadynamicconsumer.factory;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.KafkaConsumer;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
  7. import org.springframework.stereotype.Component;
  8. import java.util.Collections;
  9. import java.util.Properties;
  10. @Component
  11. public class KafkaDynamicConsumerFactory {
  12.         @Autowired
  13.         private KafkaProperties kafkaProperties;
  14.         @Value("${spring.kafka.consumer.key-deserializer}")
  15.         private String keyDeSerializerClassName;
  16.         @Value("${spring.kafka.consumer.value-deserializer}")
  17.         private String valueDeSerializerClassName;
  18.         /**
  19.          * 创建一个Kafka消费者
  20.          *
  21.          * @param topic   消费者订阅的话题
  22.          * @param groupId 消费者组名
  23.          * @return 消费者对象
  24.          */
  25.         public <K, V> KafkaConsumer<K, V> createConsumer(String topic, String groupId) throws ClassNotFoundException {
  26.                 Properties consumerProperties = new Properties();
  27.                 // 设定一些关于新的消费者的配置信息
  28.                 consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
  29.                 // 设定新的消费者的组名
  30.                 consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  31.                 // 设定反序列化方式
  32.                 consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Class.forName(keyDeSerializerClassName));
  33.                 consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Class.forName(valueDeSerializerClassName));
  34.                 // 新建一个消费者
  35.                 KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerProperties);
  36.                 // 使这个消费者订阅对应话题
  37.                 consumer.subscribe(Collections.singleton(topic));
  38.                 return consumer;
  39.         }
  40. }
复制代码
可见这里我们注入了设置文件中反序列化的设置,并用于新创建的消耗者对象。
2,使用定时任务实现消耗者及时订阅

上面仅仅是创建了消耗者,但是消耗者接收消息以及处置处罚消息的操作,也是需要我们手动定义的。
如何让创建的消耗者都去不绝的接收并处置处罚我们的消息呢?大致思路如下:


  • 使用定时任务,在定时任务中使消耗者不绝地接收并处置处罚消息
  • 与此同时,将每个定时任务和消耗者都存起来,后面在消耗者不需要的时候可以移除它们并关闭定时任务
这里,我们编写一个上下文类,用于存放所有的消耗者定时任务,并编写增加和移除定时任务的方法:
  1. package com.gitee.swsk33.kafkadynamicconsumer.context;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import java.time.Duration;
  6. import java.util.Map;
  7. import java.util.concurrent.*;
  8. /**
  9. * Kafka消费者任务上下文
  10. */
  11. public class KafkaConsumerContext {
  12.         /**
  13.          * 存放所有自己创建的Kafka消费者任务
  14.          * key: groupId
  15.          * value: kafka消费者任务
  16.          */
  17.         private static final Map<String, KafkaConsumer<?, ?>> consumerMap = new ConcurrentHashMap<>();
  18.         /**
  19.          * 存放所有定时任务的哈希表
  20.          * key: groupId
  21.          * value: 定时任务对象,用于定时执行kafka消费者的消息消费任务
  22.          */
  23.         private static final Map<String, ScheduledFuture<?>> scheduleMap = new ConcurrentHashMap<>();
  24.         /**
  25.          * 任务调度器,用于定时任务
  26.          */
  27.         private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(24);
  28.         /**
  29.          * 添加一个Kafka消费者任务
  30.          *
  31.          * @param groupId  消费者的组名
  32.          * @param consumer 消费者对象
  33.          * @param <K>      消息键类型
  34.          * @param <V>      消息值类型
  35.          */
  36.         public static <K, V> void addConsumerTask(String groupId, KafkaConsumer<K, V> consumer) {
  37.                 // 创建定时任务
  38.                 // 每隔1s拉取消息并处理
  39.                 ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
  40.                         // 拉取消息
  41.                         ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
  42.                         for (ConsumerRecord<K, V> record : records) {
  43.                                 // 自定义处理每次拉取的消息
  44.                                 System.out.println(record.value());
  45.                         }
  46.                 }, 0, 1, TimeUnit.SECONDS);
  47.                 // 将任务和存入对应的列表以后续管理
  48.                 consumerMap.put(groupId, consumer);
  49.                 scheduleMap.put(groupId, future);
  50.         }
  51.         /**
  52.          * 移除Kafka消费者定时任务并关闭消费者订阅
  53.          *
  54.          * @param groupId 消费者的组名
  55.          */
  56.         public static void removeConsumerTask(String groupId) {
  57.                 if (!consumerMap.containsKey(groupId)) {
  58.                         return;
  59.                 }
  60.                 // 取出对应的消费者与任务,并停止
  61.                 KafkaConsumer<?, ?> consumer = consumerMap.get(groupId);
  62.                 ScheduledFuture<?> future = scheduleMap.get(groupId);
  63.                 consumer.close();
  64.                 future.cancel(true);
  65.                 // 移除列表中的消费者和任务
  66.                 consumerMap.remove(groupId);
  67.                 scheduleMap.remove(groupId);
  68.         }
  69. }
复制代码
在增加消耗者定时任务的方法中,调用消耗者对象的poll方法能够拉取一次消息,一次通常大概拉取到多条消息,遍历并处置处罚即可。如许在定时任务中,我们每隔一段时间就拉取一次消息并处置处罚,就实现了消耗者及时订阅消息的效果。
除此之外,在使用定时任务时,即ScheduledExecutorService对象的scheduleAtFixedRate方法,可以实现每隔一定的时间实行一次任务,上述第一个参数传入Runnable接口的实现类,这里使用匿名内部类传入,即自定义的任务,第二个参数是启动延迟时间,第三个参数是每隔多长时间重复实行任务,第四个参数是时间单位。该方法返回一个任务对象,通过这个对象的cancel方法可以取消掉任务。
在移除消耗者定时任务方法中,调用消耗者对象的close方法即可关闭消耗者取消订阅。
3,编写个API测试

现在编写一个API测试一下效果:
  1. package com.gitee.swsk33.kafkadynamicconsumer.api;
  2. import com.gitee.swsk33.kafkadynamicconsumer.context.KafkaConsumerContext;
  3. import com.gitee.swsk33.kafkadynamicconsumer.factory.KafkaDynamicConsumerFactory;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.kafka.core.KafkaTemplate;
  7. import org.springframework.web.bind.annotation.GetMapping;
  8. import org.springframework.web.bind.annotation.PathVariable;
  9. import org.springframework.web.bind.annotation.RequestMapping;
  10. import org.springframework.web.bind.annotation.RestController;
  11. /**
  12. * 消息测试api
  13. */
  14. @RestController
  15. @RequestMapping("/api/kafka")
  16. public class KafkaTestAPI {
  17.         @Autowired
  18.         private KafkaTemplate<String, String> kafkaTemplate;
  19.         @Autowired
  20.         private KafkaDynamicConsumerFactory factory;
  21.         @GetMapping("/send")
  22.         public String send() {
  23.                 kafkaTemplate.send("my-topic", "hello!");
  24.                 return "发送完成!";
  25.         }
  26.         @GetMapping("/create/{groupId}")
  27.         public String create(@PathVariable String groupId) throws ClassNotFoundException {
  28.                 // 这里统一使用一个topic
  29.                 KafkaConsumer<String, String> consumer = factory.createConsumer("my-topic", groupId);
  30.                 KafkaConsumerContext.addConsumerTask(groupId, consumer);
  31.                 return "创建成功!";
  32.         }
  33.         @GetMapping("/remove/{groupId}")
  34.         public String remove(@PathVariable String groupId) {
  35.                 KafkaConsumerContext.removeConsumerTask(groupId);
  36.                 return "移除成功!";
  37.         }
  38. }
复制代码
现在依次访问/api/kafka/create/a和/api/kafka/create/b,就创建了两个消耗者,然后访问/api/kafka/send发送消息,结果如下:

可见,两个消耗者都接收到了消息。
4,总结

可见要动态地创建Kafka消耗者,只需创建并设置好Kafka消耗者对象,并使用定时任务使它们一直拉取消息,就可以实现发布订阅的效果。当然,我们要管理好创建的所有的消耗者和定时任务,防止资源浪费。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

千千梦丶琪

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表