自由的羽毛 发表于 2023-8-31 07:37:16

springboot~kafka中延时消息的实现

应用场景


[*]用户下单5分钟后,给他发短信
[*]用户下单30分钟后,如果用户不付款就自动取消订单
kafka无死信队列

kafka本身没有这种延时队列的机制,像rabbitmq有自己的死信队列,当一些消息在一定时间不消费时会发到死信队列,由死信队列来处理它们,上面的两个需求如果是rabbitmq可以通过死信队列实现的。
kafka有生产者拦截器

通过对生产者拦截器实现一个TTL的检查,然后再通过类似netty里的延时队列组件来实现消息的延时发送,发到咱们的死信队列里
https://img2023.cnblogs.com/blog/118538/202308/118538-20230822175648225-1791319007.png

[*]ProducerInterceptorTTL源码
public class ProducerInterceptorTTL implements ProducerInterceptor<Integer, String>, ApplicationContextAware {

        // 消息延时,单位秒
        public static String TTL = "ttl";

        // 死信队列,延时后发送到的队列,我们称为死信队列
        public static String DEAD_TOPIC = "dead_topic";

        // 静态化的上下文,用于获取bean,因为ConsumerInterceptor是通过反射创建的,所以无法通过注入的方式获取bean
        private static ApplicationContext applicationContext;

        // 时间轮,用于延时发送消息
        private static LindTimeWheel timeWheel = new LindTimeWheel(1000, 8);

        @Override
        public ProducerRecord onSend(ProducerRecord<Integer, String> record) {
                final String topic = record.topic();
                final Integer partition = record.partition();
                final Integer key = record.key();
                final String value = record.value();
                final Long timestamp = record.timestamp();
                final Headers headers = record.headers();
                long ttl = -1;
                String deadTopic = null;
                for (Header header : headers) {
                        if (header.key().equals(TTL)) {
                                ttl = toLong(header.value());
                        }
                        if (header.key().equals(DEAD_TOPIC)) {
                                deadTopic = new String(header.value());
                        }
                }
                // 消息超时判定
                if (deadTopic != null && ttl > 0) {
                        // 可以放在死信队列中
                        String finalDeadTopic = deadTopic;
                        long finalTtl = ttl * 1000;
                        timeWheel.addTask(() -> {
                                System.out.println("消息超时了," + finalTtl + "需要发到topic:" + record.key());
                                KafkaTemplate kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);
                                kafkaTemplate.send(finalDeadTopic, record.value());

                        }, finalTtl);
                }
                // 拦截器拦下来之后改变原来的消息内容
                ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(topic, partition, timestamp,
                                key, value, headers);
                // 传递新的消息
                return newRecord;

        }

        @Override
        public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

        }

        @Override
        public void close() {

        }

        @Override
        public void configure(Map<String, ?> map) {

        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
                this.applicationContext = applicationContext;
        }

}

[*]注册拦截器
spring:
kafka:
    producer:
      properties:
      interceptor.classes: com.ruoyi.lawyer.delay.ProducerInterceptorTTL

[*]延时消息在某个时间段之后会送出
https://img2023.cnblogs.com/blog/118538/202308/118538-20230822180049986-2002977981.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: springboot~kafka中延时消息的实现