基于Spring Boot的RabbitMQ延时队列技术实现

饭宝  论坛元老 | 2025-2-19 14:00:50 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1037|帖子 1037|积分 3111

基于Spring Boot的RabbitMQ延时队列技术实现

延时队列应用场景



  • 订单系统:30分钟未支付订单自动取消
  1. 1. 用户下单 → 发送延时消息(30分钟TTL)
  2. 2. 消息进入普通队列等待
  3. 3. 30分钟后消息过期 → 转入死信队列
  4. 4. 消费者检查订单状态:
  5.    - 未支付 → 执行关闭操作
  6.    - 已支付 → 忽略
复制代码


  • 定时关照:预约提示服务
  1. 场景:会议开始前15分钟提醒
  2. 1. 创建会议时发送延时消息
  3. 2. 消息存活直到会议开始前15分钟
  4. 3. 触发通知服务发送提醒
复制代码


  • 异步重试:失败任务延时重试机制
  1. 消息处理失败时:
  2. 1. 首次失败 → 延时5秒重试
  3. 2. 二次失败 → 延时30秒重试
  4. 3. 三次失败 → 进入死信队列人工处理
复制代码


  • 物流跟踪:预计送达时间状态更新
根本概念

延长消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延长任务:设置在肯定时间之后才执行的任务
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):


  • 消息被拒绝且不重新入队:消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
  • 消息逾期:消息是一个逾期消息(到达了队列或消息本身设置的逾期时间),超时无人消费
  • 队列到达最大长度:要投递的队列消息堆积满了,最早的消息大概成为死信
假如队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称 DLX)。


RabbitMQ 本身没有直接的延时队列功能,通常是通过死信队列和**TTL(Time-To-Live)**来实现的。
  1. [生产者] → [普通队列(设置TTL)] → (消息过期)→ [死信队列] → [消费者]
复制代码
实现延时队列

添加依赖

  1. <!-- amqp 依赖 -->
  2. <dependency>
  3.     <groupId>org.springframework.boot</groupId>
  4.     <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <!-- Mybatis-Plus包 -->
  7. <dependency>
  8.     <groupId>com.baomidou</groupId>
  9.     <artifactId>mybatis-plus-boot-starter</artifactId>
  10.     <version>3.5.1</version>
  11. </dependency>
  12. <!-- MySQL驱动包 -->
  13. <dependency>
  14.     <groupId>com.mysql</groupId>
  15.     <artifactId>mysql-connector-j</artifactId>
  16.     <scope>runtime</scope>
  17. </dependency>
  18. <!-- lombok包 -->
  19. <dependency>
  20.     <groupId>org.projectlombok</groupId>
  21.     <artifactId>lombok</artifactId>
  22.     <optional>true</optional>
  23. </dependency>
复制代码
基础设置

  1. server:
  2.   port: 8080
  3. spring:
  4.   datasource:
  5.     driver-class-name: com.mysql.cj.jdbc.Driver
  6.     url: jdbc:mysql://127.0.0.1:3306/smbms?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  7.     username: root
  8.     password: root
  9.   rabbitmq:
  10.     host: 127.0.0.1
  11.     port: 5672
  12.     username: guest
  13.     password: guest
  14. mybatis-plus:
  15.   type-aliases-package: com.hz.pojo #类型别名所在的包
  16.   #控制台打印sql语句
  17.   configuration:
  18.     log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
  19.     map-underscore-to-camel-case: false # 驼峰映射
复制代码
死信队列三要素

  • DLX (Dead-Letter-Exchange):死信转发交换机
  • DLK (Dead-Letter-Routing-Key):死信路由键
  • TTL (Time-To-Live):消息存活时间
设置类筹划

  1. @Configuration
  2. public class RabbitMQConfig {
  3.     // 业务交换机
  4.     public static final String BUSINESS_EXCHANGE = "business.exchange";
  5.     // 业务队列
  6.     public static final String BUSINESS_QUEUE = "business.queue";
  7.     // 死信交换机
  8.     public static final String DLX_EXCHANGE = "dlx.exchange";
  9.     // 死信队列
  10.     public static final String DLX_QUEUE = "dlx.queue";
  11.    
  12.     // 业务队列路由键
  13.     private static final String BUSINESS_ROUTING_KEY = "business.key";
  14.     // 死信路由键
  15.     private static final String DLX_ROUTING_KEY = "dlx.key";
  16.     // 声明业务交换机(直连型)
  17.     @Bean
  18.     public DirectExchange businessExchange() {
  19.         return new DirectExchange(BUSINESS_EXCHANGE);
  20.     }
  21.     // 声明死信交换机
  22.     @Bean
  23.     public DirectExchange dlxExchange() {
  24.         return new DirectExchange(DLX_EXCHANGE);
  25.     }
  26.     // 声明业务队列(绑定死信属性)
  27.     @Bean
  28.     public Queue businessQueue() {
  29.         Map<String, Object> args = new HashMap<>();
  30.         args.put("x-dead-letter-exchange", DLX_EXCHANGE); // 设置死信交换机
  31.         args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // 设置死信路由键
  32.         args.put("x-message-ttl", 10000);
  33. // 队列统一TTL(单位:毫秒)
  34.         return new Queue(BUSINESS_QUEUE, true, false, false, args);
  35.     }
  36.     // 声明死信队列
  37.     @Bean
  38.     public Queue dlxQueue() {
  39.         return new Queue(DLX_QUEUE);
  40.     }
  41.     // 绑定业务队列到交换机
  42.     @Bean
  43.     public Binding businessBinding() {
  44.         return BindingBuilder.bind(businessQueue())
  45.                .to(businessExchange())
  46.                .with(BUSINESS_ROUTING_KEY);
  47.     }
  48.     // 绑定死信队列到交换机
  49.     @Bean
  50.     public Binding dlxBinding() {
  51.         return BindingBuilder.bind(dlxQueue())
  52.                .to(dlxExchange())
  53.                .with(DLX_ROUTING_KEY);
  54.     }
  55. }
复制代码
消息生产者

  1. @Service
  2. public class MessageProducer {
  3.     @Autowired
  4.     private RabbitTemplate rabbitTemplate;
  5.     /**
  6.      * 发送延时消息
  7.      * @param message 消息内容
  8.      * @param ttl 单位:秒
  9.      */
  10.     public void sendDelayMessage(String message, int ttl) {
  11.         // 消息属性设置
  12.         MessagePostProcessor processor = message -> {
  13.             message.getMessageProperties()
  14.                    .setExpiration(String.valueOf(ttl * 1000)); // 消息级别TTL
  15.             return message;
  16.         };
  17.         rabbitTemplate.convertAndSend(
  18.             RabbitMQConfig.BUSINESS_EXCHANGE,
  19.             RabbitMQConfig.BUSINESS_ROUTING_KEY,
  20.             message,
  21.             processor
  22.         );
  23.     }
  24. }
复制代码
消息消费者

  1. @Component
  2. public class MessageConsumer {
  3.     @Autowired
  4.     private BillService billService;
  5.     @RabbitListener(queues = RabbitMQConfig.DLX_QUEUE)
  6.     public void processDelayMessage(String billCode) {
  7.         System.out.println("收到延时消息:" + billCode);
  8.         billService.closeBill(billCode);
  9.         System.out.println("超时未支付,订单已关闭--------------");
  10.     }
  11. }
复制代码
两种TTL设置方式

队列级别TTL
  1. args.put("x-message-ttl", 10000);
复制代码
  队列中所有消息统一逾期时间;消息实际存活时间 = 队列TTL;性能更优(RabbitMQ统一处置惩罚)
  消息级别TTL
  1. message.getMessageProperties().setExpiration("5000");
复制代码
  每个消息可以设置不同TTL;实际存活时间取最小值(队列TTL vs 消息TTL);须要逐个处置惩罚消息,性能开销较大
  订单超时关闭实例


订单服务

  1. @Service
  2. public class BillService {
  3.     @Autowired
  4.     private MessageProducer messageProducer;
  5.     @Resource
  6.     private BillMapper billMapper;
  7.     public void createBill(Bill bill) {
  8.         // 保存订单到数据库
  9.         bill.setIsPayment(1); // 设置初始状态 1:未支付 2:已支付 3:已关闭
  10.         billMapper.insert(bill);
  11.         // 发送延时消息(10s)
  12.         messageProducer.sendDelayMessage(bill.getBillCode(), 10);
  13.     }
  14.    
  15.     public void closeBill(String billCode) {
  16.         Bill bill = billMapper.selectOne(new QueryWrapper<Bill>().eq("billCode", billCode));
  17.         if (bill != null && bill.getIsPayment() == 1) {
  18.             bill.setIsPayment(3);
  19.             billMapper.updateById(bill);
  20.         }
  21.     }
  22. }
复制代码
消息处置惩罚

  1. @RestController
  2. @RequestMapping("/bill")
  3. public class BillController {
  4.     @Autowired
  5.     private BillService billService;
  6.     @GetMapping("/send")
  7.     public String send(){
  8.         // 创建测试订单
  9.         Bill bill = new Bill();
  10.         bill.setBillCode("BILL2025_999");
  11.         bill.setProductName("可口可乐");
  12.         // 创建账单并发送延时消息
  13.         billService.createBill(bill);
  14.         return "订单创建成功,10秒后未支付将自动关闭。订单号:" + bill.getBillCode();
  15.     }
  16. }
复制代码
流程:

  • 访问 localhost:8080/bill/send 创建测试订单

  • 订单初始状态为待支付(1)

  • 消息经过10秒延长进入死信队列

  • 消费者处置惩罚消息时检查订单状态
  • 若仍为未支付状态,更新为已关闭(3)

延长消息插件

RabbitMQ 提供了官方插件 rabbitmq_delayed_message_exchange,它允许你发送延长消息而无需设置消息的 TTL 和死信队列。这个插件提供了一个新的交换机类型 x-delayed-message,可以用来实现消息的延长投递。
安装插件

可以从 RabbitMQ 的插件页面下载,大概直接使用以下命令举行安装(假设 RabbitMQ 安装在默认位置):
  1. rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码
安装完成后,重启 RabbitMQ 服务。
设置延长交换机

  1. @Bean
  2. public CustomExchange delayedExchange() {
  3.     Map<String, Object> args = new HashMap<>();
  4.     args.put("x-delayed-type", "direct");
  5.     return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
  6. }
  7. // 发送消息时设置延迟头
  8. rabbitTemplate.convertAndSend("delayed.exchange", "routing.key", message, msg -> {
  9.     msg.getMessageProperties().setHeader("x-delay", 5000);
  10.     return msg;
  11. });
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

饭宝

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表