springboot~kafka中延时消息的实现

打印 上一主题 下一主题

主题 941|帖子 941|积分 2823

应用场景


  • 用户下单5分钟后,给他发短信
  • 用户下单30分钟后,如果用户不付款就自动取消订单
kafka无死信队列

kafka本身没有这种延时队列的机制,像rabbitmq有自己的死信队列,当一些消息在一定时间不消费时会发到死信队列,由死信队列来处理它们,上面的两个需求如果是rabbitmq可以通过死信队列实现的。
kafka有生产者拦截器

通过对生产者拦截器实现一个TTL的检查,然后再通过类似netty里的延时队列组件来实现消息的延时发送,发到咱们的死信队列里


  • ProducerInterceptorTTL源码
  1. public class ProducerInterceptorTTL implements ProducerInterceptor<Integer, String>, ApplicationContextAware {
  2.         // 消息延时,单位秒
  3.         public static String TTL = "ttl";
  4.         // 死信队列,延时后发送到的队列,我们称为死信队列
  5.         public static String DEAD_TOPIC = "dead_topic";
  6.         // 静态化的上下文,用于获取bean,因为ConsumerInterceptor是通过反射创建的,所以无法通过注入的方式获取bean
  7.         private static ApplicationContext applicationContext;
  8.         // 时间轮,用于延时发送消息
  9.         private static LindTimeWheel timeWheel = new LindTimeWheel(1000, 8);
  10.         @Override
  11.         public ProducerRecord onSend(ProducerRecord<Integer, String> record) {
  12.                 final String topic = record.topic();
  13.                 final Integer partition = record.partition();
  14.                 final Integer key = record.key();
  15.                 final String value = record.value();
  16.                 final Long timestamp = record.timestamp();
  17.                 final Headers headers = record.headers();
  18.                 long ttl = -1;
  19.                 String deadTopic = null;
  20.                 for (Header header : headers) {
  21.                         if (header.key().equals(TTL)) {
  22.                                 ttl = toLong(header.value());
  23.                         }
  24.                         if (header.key().equals(DEAD_TOPIC)) {
  25.                                 deadTopic = new String(header.value());
  26.                         }
  27.                 }
  28.                 // 消息超时判定
  29.                 if (deadTopic != null && ttl > 0) {
  30.                         // 可以放在死信队列中
  31.                         String finalDeadTopic = deadTopic;
  32.                         long finalTtl = ttl * 1000;
  33.                         timeWheel.addTask(() -> {
  34.                                 System.out.println("消息超时了," + finalTtl + "需要发到topic:" + record.key());
  35.                                 KafkaTemplate kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);
  36.                                 kafkaTemplate.send(finalDeadTopic, record.value());
  37.                         }, finalTtl);
  38.                 }
  39.                 // 拦截器拦下来之后改变原来的消息内容
  40.                 ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(topic, partition, timestamp,
  41.                                 key, value, headers);
  42.                 // 传递新的消息
  43.                 return newRecord;
  44.         }
  45.         @Override
  46.         public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
  47.         }
  48.         @Override
  49.         public void close() {
  50.         }
  51.         @Override
  52.         public void configure(Map<String, ?> map) {
  53.         }
  54.         @Override
  55.         public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  56.                 this.applicationContext = applicationContext;
  57.         }
  58. }
复制代码

  • 注册拦截器
  1. spring:
  2.   kafka:
  3.     producer:
  4.       properties:
  5.         interceptor.classes: com.ruoyi.lawyer.delay.ProducerInterceptorTTL
复制代码

  • 延时消息在某个时间段之后会送出


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

自由的羽毛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表