基于Spring Boot的RabbitMQ延时队列技术实现
延时队列应用场景
- 1. 用户下单 → 发送延时消息(30分钟TTL)
- 2. 消息进入普通队列等待
- 3. 30分钟后消息过期 → 转入死信队列
- 4. 消费者检查订单状态:
- - 未支付 → 执行关闭操作
- - 已支付 → 忽略
复制代码
- 场景:会议开始前15分钟提醒
- 1. 创建会议时发送延时消息
- 2. 消息存活直到会议开始前15分钟
- 3. 触发通知服务发送提醒
复制代码
- 消息处理失败时:
- 1. 首次失败 → 延时5秒重试
- 2. 二次失败 → 延时30秒重试
- 3. 三次失败 → 进入死信队列人工处理
复制代码
根本概念
延长消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延长任务:设置在肯定时间之后才执行的任务
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
- 消息被拒绝且不重新入队:消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
- 消息逾期:消息是一个逾期消息(到达了队列或消息本身设置的逾期时间),超时无人消费
- 队列到达最大长度:要投递的队列消息堆积满了,最早的消息大概成为死信
假如队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead Letter Exchange,简称 DLX)。
RabbitMQ 本身没有直接的延时队列功能,通常是通过死信队列和**TTL(Time-To-Live)**来实现的。
- [生产者] → [普通队列(设置TTL)] → (消息过期)→ [死信队列] → [消费者]
复制代码 实现延时队列
添加依赖
- <!-- amqp 依赖 -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <!-- Mybatis-Plus包 -->
- <dependency>
- <groupId>com.baomidou</groupId>
- <artifactId>mybatis-plus-boot-starter</artifactId>
- <version>3.5.1</version>
- </dependency>
- <!-- MySQL驱动包 -->
- <dependency>
- <groupId>com.mysql</groupId>
- <artifactId>mysql-connector-j</artifactId>
- <scope>runtime</scope>
- </dependency>
- <!-- lombok包 -->
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
复制代码 基础设置
- server:
- port: 8080
- spring:
- datasource:
- driver-class-name: com.mysql.cj.jdbc.Driver
- url: jdbc:mysql://127.0.0.1:3306/smbms?useUnicode=true&characterEncoding=UTF-8&useSSL=false
- username: root
- password: root
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: guest
- password: guest
- mybatis-plus:
- type-aliases-package: com.hz.pojo #类型别名所在的包
- #控制台打印sql语句
- configuration:
- log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
- map-underscore-to-camel-case: false # 驼峰映射
复制代码 死信队列三要素
- DLX (Dead-Letter-Exchange):死信转发交换机
- DLK (Dead-Letter-Routing-Key):死信路由键
- TTL (Time-To-Live):消息存活时间
设置类筹划
- @Configuration
- public class RabbitMQConfig {
- // 业务交换机
- public static final String BUSINESS_EXCHANGE = "business.exchange";
- // 业务队列
- public static final String BUSINESS_QUEUE = "business.queue";
- // 死信交换机
- public static final String DLX_EXCHANGE = "dlx.exchange";
- // 死信队列
- public static final String DLX_QUEUE = "dlx.queue";
-
- // 业务队列路由键
- private static final String BUSINESS_ROUTING_KEY = "business.key";
- // 死信路由键
- private static final String DLX_ROUTING_KEY = "dlx.key";
- // 声明业务交换机(直连型)
- @Bean
- public DirectExchange businessExchange() {
- return new DirectExchange(BUSINESS_EXCHANGE);
- }
- // 声明死信交换机
- @Bean
- public DirectExchange dlxExchange() {
- return new DirectExchange(DLX_EXCHANGE);
- }
- // 声明业务队列(绑定死信属性)
- @Bean
- public Queue businessQueue() {
- Map<String, Object> args = new HashMap<>();
- args.put("x-dead-letter-exchange", DLX_EXCHANGE); // 设置死信交换机
- args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // 设置死信路由键
- args.put("x-message-ttl", 10000);
- // 队列统一TTL(单位:毫秒)
- return new Queue(BUSINESS_QUEUE, true, false, false, args);
- }
- // 声明死信队列
- @Bean
- public Queue dlxQueue() {
- return new Queue(DLX_QUEUE);
- }
- // 绑定业务队列到交换机
- @Bean
- public Binding businessBinding() {
- return BindingBuilder.bind(businessQueue())
- .to(businessExchange())
- .with(BUSINESS_ROUTING_KEY);
- }
- // 绑定死信队列到交换机
- @Bean
- public Binding dlxBinding() {
- return BindingBuilder.bind(dlxQueue())
- .to(dlxExchange())
- .with(DLX_ROUTING_KEY);
- }
- }
复制代码 消息生产者
- @Service
- public class MessageProducer {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- /**
- * 发送延时消息
- * @param message 消息内容
- * @param ttl 单位:秒
- */
- public void sendDelayMessage(String message, int ttl) {
- // 消息属性设置
- MessagePostProcessor processor = message -> {
- message.getMessageProperties()
- .setExpiration(String.valueOf(ttl * 1000)); // 消息级别TTL
- return message;
- };
- rabbitTemplate.convertAndSend(
- RabbitMQConfig.BUSINESS_EXCHANGE,
- RabbitMQConfig.BUSINESS_ROUTING_KEY,
- message,
- processor
- );
- }
- }
复制代码 消息消费者
- @Component
- public class MessageConsumer {
- @Autowired
- private BillService billService;
- @RabbitListener(queues = RabbitMQConfig.DLX_QUEUE)
- public void processDelayMessage(String billCode) {
- System.out.println("收到延时消息:" + billCode);
- billService.closeBill(billCode);
- System.out.println("超时未支付,订单已关闭--------------");
- }
- }
复制代码 两种TTL设置方式
队列级别TTL
- args.put("x-message-ttl", 10000);
复制代码 队列中所有消息统一逾期时间;消息实际存活时间 = 队列TTL;性能更优(RabbitMQ统一处置惩罚)
消息级别TTL
- message.getMessageProperties().setExpiration("5000");
复制代码 每个消息可以设置不同TTL;实际存活时间取最小值(队列TTL vs 消息TTL);须要逐个处置惩罚消息,性能开销较大
订单超时关闭实例
订单服务
- @Service
- public class BillService {
- @Autowired
- private MessageProducer messageProducer;
- @Resource
- private BillMapper billMapper;
- public void createBill(Bill bill) {
- // 保存订单到数据库
- bill.setIsPayment(1); // 设置初始状态 1:未支付 2:已支付 3:已关闭
- billMapper.insert(bill);
- // 发送延时消息(10s)
- messageProducer.sendDelayMessage(bill.getBillCode(), 10);
- }
-
- public void closeBill(String billCode) {
- Bill bill = billMapper.selectOne(new QueryWrapper<Bill>().eq("billCode", billCode));
- if (bill != null && bill.getIsPayment() == 1) {
- bill.setIsPayment(3);
- billMapper.updateById(bill);
- }
- }
- }
复制代码 消息处置惩罚
- @RestController
- @RequestMapping("/bill")
- public class BillController {
- @Autowired
- private BillService billService;
- @GetMapping("/send")
- public String send(){
- // 创建测试订单
- Bill bill = new Bill();
- bill.setBillCode("BILL2025_999");
- bill.setProductName("可口可乐");
- // 创建账单并发送延时消息
- billService.createBill(bill);
- return "订单创建成功,10秒后未支付将自动关闭。订单号:" + bill.getBillCode();
- }
- }
复制代码 流程:
- 访问 localhost:8080/bill/send 创建测试订单
- 订单初始状态为待支付(1)
- 消息经过10秒延长进入死信队列
- 消费者处置惩罚消息时检查订单状态
- 若仍为未支付状态,更新为已关闭(3)
延长消息插件
RabbitMQ 提供了官方插件 rabbitmq_delayed_message_exchange,它允许你发送延长消息而无需设置消息的 TTL 和死信队列。这个插件提供了一个新的交换机类型 x-delayed-message,可以用来实现消息的延长投递。
安装插件
可以从 RabbitMQ 的插件页面下载,大概直接使用以下命令举行安装(假设 RabbitMQ 安装在默认位置):
- rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码 安装完成后,重启 RabbitMQ 服务。
设置延长交换机
- @Bean
- public CustomExchange delayedExchange() {
- Map<String, Object> args = new HashMap<>();
- args.put("x-delayed-type", "direct");
- return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
- }
- // 发送消息时设置延迟头
- rabbitTemplate.convertAndSend("delayed.exchange", "routing.key", message, msg -> {
- msg.getMessageProperties().setHeader("x-delay", 5000);
- return msg;
- });
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |