RabbitMQ实现延迟队列

打印 上一主题 下一主题

主题 528|帖子 528|积分 1584

RabbitMQ 实现延迟队列的方式主要有两种: 死信交换机、延迟队列插件
延迟队列的使用场景包罗:
① 延迟发送短信
② 用户下单,如果用户在15分钟内未支付,则主动取消
③ 预约工作集会,20分钟后主动通知全部参会职员
死信交换机

根本概念

当一个队列中的消息满足下列环境之一时,可以成为死信(dead letter)
① 消耗者使用 basic.reject 或 basic.nack 声明消耗失败,并且消息的 requeue 参数设置为 false
② 消息是一个过期消息,超时无人消耗
③ 要投递的队列消息堆积满了,最早的消息可能成为死信
如果该队列配置了 dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)
死信交换机和 RepublishMessageRecoverer 的区别是,死信交换机的消息是通过队列传递给交换机的,而 RepublishMessageRecoverer 是通过消耗者传递给交换机的
给队列绑定死信交换机的方式:
1.给队列设置 dead-letter-exchange 属性,指定一个交换机
2.给队列设置 dead-letter-routing-key 属性,设置死信交换机与死信队列的 RoutingKey
TTL:
TTL,也就是 Time-To-Live。如果一个队列中的消息 TTL 结束仍未消耗,则会变为死信,TTL 超时分为两种环境:
① 消息地点的队列设置了存活时间
② 消息本身设置了存活时间
若设置了 TTL 和死信交换机,就可以让一个消耗者监听死信队列,进而实现消息的延迟投递
如果队列和消息都设置了TTL,那么以较小的值为准
代码实现


下面实现上图的延迟消息:
1.在 consumer 声明死信交换机和队列,编写监听方法
  1. @Component
  2. public class DelayListener {
  3.     @RabbitListener(bindings = @QueueBinding(
  4.             value = @Queue(name = "dl.queue"),
  5.             exchange = @Exchange(name = "dl.direct"),
  6.             key = "dl"
  7.     ))
  8.     public void listenDlQueue(String msg) {
  9.         System.out.println(LocalDateTime.now() + ":收到 dl.queue 的延迟消息" + msg);
  10.     }
  11. }
复制代码
2.在 consumer 声明延迟队列和对应的交换机,在声明队列时配置 ttl、dead-letter-exchange、dead-letter-routing-key 属性
  1. @Configuration
  2. public class DelayConfig {
  3.     @Bean
  4.     public DirectExchange ttlExchange() {
  5.         return new DirectExchange("ttl.direct");
  6.     }
  7.     @Bean
  8.     public Queue ttlQueue() {
  9.         return QueueBuilder
  10.                 .durable("ttl.queue")
  11.                 .ttl(10000) // 10s
  12.                 .deadLetterExchange("dl.direct")
  13.                 .deadLetterRoutingKey("dl")
  14.                 .build();
  15.     }
  16.     @Bean
  17.     public Binding ttlBinding(Queue ttlQueue, DirectExchange ttlExchange) {
  18.         return BindingBuilder.bind(ttlQueue).to(ttlExchange).with("ttl");
  19.     }
  20. }
复制代码
3.在 publisher 编写测试类方法,发送消息,并设置超时时间
  1. @SpringBootTest
  2. public class DelayTests {
  3.     @Autowired
  4.     private RabbitTemplate rabbitTemplate;
  5.     @Test
  6.     public void testSendTTLMessage() {
  7.         Message message = MessageBuilder.withBody("ttl message".getBytes(StandardCharsets.UTF_8))
  8.                 .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
  9.                 .setExpiration("5000") // 5s
  10.                 .build();
  11.         rabbitTemplate.convertAndSend("ttl.queue", "ttl", message);
  12.         System.out.println(LocalDateTime.now() + ":消息发送完毕");
  13.     }
  14. }
复制代码
测试结果:



可以看到,实际上消息延迟了 5s,原因是如果队列和消息都设置了延迟时间,那么实际的延迟时间是二者的较小值
延迟队列插件

RabbitMQ 的官方也推出了一个延迟队列插件 DelayExchange,需要先去安装这一插件才可以使用。这里对插件的安装不做介绍,主要介绍 SpringAMQP 如何使用延迟队列插件
DelayExchange 的原理是将消息暂存到延迟交换机中,延迟一定时间再发到队列。其本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定 delayed 属性为 true 即可
1.声明延迟交换机,设置 delayed 属性为true
设置的方式有两种:
(1)基于注解方式
  1. @RabbitListener(bindings = @QueueBinding(
  2.         value = @Queue(name = "delay.queue"),
  3.         exchange = @Exchange(name = "delay.direct", delayed = "true"),
  4.         key = "delay"
  5. ))
  6. public void listenDelayQueue(String msg) {
  7.     System.out.println(LocalDateTime.now() + ":收到 delay.queue 的延迟消息" + msg);
  8. }
复制代码
(2)基于配置类的方式
  1. @Configuration
  2. public class DelayConfig {
  3.     @Bean
  4.     public DirectExchange delayExchange() {
  5.         return ExchangeBuilder
  6.                 .directExchange("delay.direct")
  7.                 .delayed()
  8.                 .durable(true)
  9.                 .build();
  10.     }
  11.     @Bean
  12.     public Queue delayQueue() {
  13.         return new Queue("delay.queue");
  14.     }
  15.     @Bean
  16.     public Binding delayBinding(Queue delayQueue, DirectExchange delayExchange) {
  17.         return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay");
  18.     }
  19. }
复制代码
3.在 publisher 中编写测试类方法来发送延迟消息,需要注意的是,这里一定要给消息添加一个 header:x-delay,值为延迟的时间,单位为毫秒
  1. @SpringBootTest
  2. public class DelayTests {
  3.     @Autowired
  4.     private RabbitTemplate rabbitTemplate;
  5.     @Test
  6.     public void testSendDelayMessage() {
  7.         Message message = MessageBuilder.withBody("delay message".getBytes(StandardCharsets.UTF_8))
  8.                 .setHeader("x-delay", 5000) // 5s
  9.                 .build();
  10.         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  11.         rabbitTemplate.convertAndSend("delay.direct", "delay", message);
  12.         System.out.println(LocalDateTime.now() + ":消息发送完毕");
  13.     }
  14. }
复制代码
测试结果:



可以看到,消息延迟了 5s 后消耗者才收到。但需要注意的是,如果配置了 ReturnCallback,这里会报错,原因是消息没有立刻路由到队列,因此报错 NO_ROUTE(消息没有到队列),但实际上消息只是延迟发送了而已
所以需要修改 ReturnCallback 的逻辑,否则一旦 ReturnCallback 中设置了消息重发,那么此时的延迟消息每次都会被重发
  1. @Configuration
  2. @Slf4j
  3. public class RabbitConfig implements ApplicationContextAware {
  4.     @Override
  5.     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  6.         RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
  7.         // 配置 ReturnCallback
  8.         rabbitTemplate.setReturnsCallback((returnedMessage -> {
  9.             if(returnedMessage.getMessage().getMessageProperties().getReceivedDelay() > 0) {
  10.                 // 如果为延迟消息,则直接跳过
  11.                 return;
  12.             }
  13.             log.error("ReturnCallback");
  14.             log.error("消息路由到队列失败");
  15.             log.error("响应码:{}", returnedMessage.getReplyCode());
  16.             log.error("失败原因;{}", returnedMessage.getReplyText());
  17.             log.error("交换机;{}", returnedMessage.getExchange());
  18.             log.error("路由Key:{}", returnedMessage.getRoutingKey());
  19.             log.error("消息:{}", returnedMessage.getMessage());
  20.             // 这里省略消息重发的逻辑
  21.         }));
  22.     }
  23. }
复制代码
总结

RabbitMQ 实现延迟队列的方式主要有两种:死信交换机、延迟队列插件
死信交换机的实现步骤是:
① 声明死信交换机并编写监听方法
② 声明延迟队列和对应的交换机,在声明队列时配置ttl、dead-letter-exchange、dead-letter-routing-key属性
③ 发送消息时,设置超时时间(实际上延迟队列的ttl和消息的超时时间至少有一个设置了就行)
延迟队列插件的实现步骤是:
① 声明一个交换机,设置 delayed 属性为true
② 发送消息时,添加 x-delay 头,值为超时时间

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

老婆出轨

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表