IT评测·应用市场-qidao123.com

标题: RabbitMQ——延迟消息的实现 [打印本页]

作者: 民工心事    时间: 2025-3-7 05:18
标题: RabbitMQ——延迟消息的实现
延迟消息功能在消息队列(MQ)中占据紧张职位,它答应消息在指定时间后才被消耗者处置惩罚,实用于须要定时使命或等候特定条件满意的场景。企业利用这一功能可以优化业务流程,如订单超时取消、定时促销等,确保系统相应更加灵活高效。通过延迟消息,企业不但能提升用户体验,还能有用管理资源,制止因即时处置惩罚全部消息而导致的系统过载,增强了系统的可靠性和可扩展性。
  题目:虽然我们利用各种机制尽可能增长了消息的可靠性,但是MQ通知依旧可能失败。
办理方案:既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。如许即便支付服务的MQ通知失败,我们依然能通过主动查询来包管订单状态的同等。
场景描画:

用户小李正在一家在线票务平台上预订一张热门演唱会的门票。 在选择盛意仪的座位并确认订单后,系统立即锁定了该座位,确保没有其他用户可以同时购买同一张票。此时,页面提示小李须要在接下来的30分钟内完成支付,否则订单将自动取消,座位也将重新开释给其他用户。
后台系统启动了一个定时使命,专门用于监控全部未支付订单的状态。 这个定时使命每分钟都会检查是否有订单靠近其支付超时时间点。当检测到小李的订单间隔超时只剩下最后几秒钟而仍未完成支付时,系统开始执行一系列预定的操作。
首先,系统会尝试最后一次查询小李的支付状态,以确保没有任何延迟的通知遗漏。假如确认支付确实未完成,那么系统将自动取消该订单,并更新数据库中的订单状态为“已取消”。与此同时,之前被锁定的座位资源也会被立即开释,使其他感爱好的顾客有机会购买这张票。
题目:如何才能正确的实现在下单后第30分钟去检查支付状态呢?
办理方案:RabbitMQ中实现延迟消息
在RabbitMQ中实现延迟消息也有两种方案:
1.死信互换机+TTL
2.延迟消息插件
什么是死信

当一个队列中的消息满意下列情况之一时,可以成为死信(dead letter):
1.消耗者使用basic.reject或 basic.nack声明消耗失败,并且消息的requeue参数设置为false
2.消息是一个逾期消息,超时无人消耗
3.要投递的队列消息满了,无法投递
什么是死信互换机

在消息队列系统中,当一条消息由于未被成功消耗而成为死信,并且原队列设置了dead-letter-exchange属性指向特定的互换机时,这个互换机就被称为死信互换机(Dead Letter Exchange, DLX)。随后,这些死信消息会被投递到DLX,并根据其绑定规则路由到与之关联的队列中。假如有队列通过绑定键与该DLX相连,则死信消息终极会被投递到这个队列,供专门的消耗者进行进一步处置惩罚。
工作流程:

基于死信队列虽然可以实现延迟消息,但是太繁琐了。
所以我推荐使用方案二:延迟消息插件
插件下载地点:
https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq
基于Docker安装,所以须要先查看RabbitMQ的插件目录对应的数据卷。
  1. docker volume inspect mq-plugins
复制代码

接下来执行命令,安装插件:
  1. cd /var/lib/docker/volumes/mq-plugins/_data // 进入目录
  2. docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码

声明延迟互换机

基于注解方式:
  1.     @RabbitListener(bindings = @QueueBinding(
  2.             value = @Queue(name = "delay.queue"),
  3.             exchange = @Exchange(name = "delay.exchange", delayed = "true"),
  4.             key = "hhh"
  5.     ))
  6.     public void listenDelayQueue(String msg){
  7.         System.out.println("消费者1接收到dix.queue的消息:【" + msg + "】"+
  8.                 LocalTime.now());
  9.     }
复制代码
发送消息时,必须通过x-delay属性设定延迟时间:
  1. @Test
  2. void testSendDelay(){
  3.     // 发送延迟消息到RabbitMQ
  4.     // 使用RabbitTemplate的convertAndSend方法,指定交换机名、路由键和消息内容
  5.     // 通过Lambda表达式设置消息属性,使其延迟5000毫秒
  6.     rabbitTemplate.convertAndSend("delay.exchange", "hhh", "hello delay",
  7.            message -> {
  8.               // 设置消息的延迟时间
  9.               message.getMessageProperties().setDelay(5000);
  10.               // 返回设置完延迟属性的消息
  11.               return message;
  12.            });
  13. }
复制代码
留意:
延迟消息插件内部会维护一个当地数据库表,同时使用Elang Timers功能实现计时。假如消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4