订单超时自动取消的技术方案解析及代码实现

尚未崩坏  金牌会员 | 2023-7-18 09:27:56 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 951|帖子 951|积分 2853

前言

订单超时自动取消是电商平台中常见的功能之一,例如在淘宝、京东、拼多多等商城下单后,如果在一定的时间内没有付款,那么订单会自动被取消,是怎么做到的呢?作为技术人员我们应该了解自动取消的原理和实现逻辑,本文将介绍几种常用的技术方案,帮助开发者实现订单超时自动取消的功能。

通过以上图我们可以看到其实超时自动取消的方案有很多,虽然方案多(大多数都是结合延迟队列来实现的),但每个方案都有自己的优缺点,具体场景需要选用合适的方案。
本文我们主要讲解以下几种常用取消方案,其他方案可自行搜索研究。

  • 方案1:定时轮询(quartz实现)
  • 方案2:JDK延迟队列DelayQueue
  • 方案3:时间轮算法(netty的HashedWheelTimer)
  • 方案4:Redis
  • 方案5:RabbitMQ消息队列
方案 1:定时轮询(quartz实现)

方案描述

通过定时任务的方式去轮询扫描数据库表,根据订单有效期来判断订单是否到期,到期则更新订单状态。
这里我们使用quartz作业调度框架来实现定时轮询。
代码

需要添加maven依赖
  1. <dependency>
  2.     <groupId>org.quartz-scheduler</groupId>
  3.     <artifactId>quartz</artifactId>
  4.     <version>2.3.1</version>
  5. </dependency>
复制代码
代码如下:
  1. public class CancelOrderJob implements Job {
  2.     @Override
  3.     public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
  4.         String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  5.         System.out.println(time + ":扫描订单表超时未付款订单...");
  6.     }
  7. }
  8. public class QuartzJobTest {
  9.     public static void main(String[] args) throws Exception {
  10.         JobDetail jobDetail = JobBuilder.newJob(CancelOrderJob.class).build();
  11.         Trigger trigger = TriggerBuilder.newTrigger()
  12.                 .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(1).repeatForever())
  13.                 .build();
  14.         Scheduler scheduler = new StdSchedulerFactory().getScheduler();
  15.         scheduler.scheduleJob(jobDetail, trigger);
  16.         System.out.println("定时任务开启,每隔1秒执行一次");
  17.         scheduler.start();
  18.     }
  19. }
复制代码
运行结果:
  1. 定时任务开启,每隔1秒执行一次
  2. 2023-06-27 11:53:43:扫描订单表超时未付款订单...
  3. 2023-06-27 11:53:43:扫描订单表超时未付款订单...
  4. 2023-06-27 11:53:44:扫描订单表超时未付款订单...
  5. 2023-06-27 11:53:45:扫描订单表超时未付款订单...
  6. 2023-06-27 11:53:46:扫描订单表超时未付款订单...
  7. 2023-06-27 11:53:47:扫描订单表超时未付款订单...
  8. ...
复制代码
优点

这种方案优点是实现简单,通过quartz框架进行任务调度,无其他依赖,支持集群部署。
缺点

简单粗暴的全表扫描方式对数据库性能影响特别大,可能影响其他正常的业务操作响应时效,另外配置扫描时间间隔也是个问题,配置大了,扫描延迟,影响取消订单的精准时间,在数据量较大的情况下,配置小了影响数据库性能,所以需要根据实际情况进行评估。
方案 2:JDK延迟队列DelayQueue

方案描述

JDK中的DelayQueue可以实现延迟,是一个无界阻塞队列,其实底层使用的是优先级队列PriorityQueue,可以对放入的对象进行排序,对象需要实现Delayed接口,采用阻塞的方式获取数据,也就是相当于延迟时间到了就会获取到数据。
代码
  1. public class CancelOrder implements Delayed {
  2.     private String orderNo;
  3.     private long timeout;
  4.     CancelOrder(String orderNo, long timeout) {
  5.         this.orderNo = orderNo;
  6.         this.timeout = timeout + System.nanoTime();
  7.     }
  8.     public int compareTo(Delayed other) {
  9.         if (other == this) {
  10.             return 0;
  11.         }
  12.         CancelOrder t = (CancelOrder) other;
  13.         long d = (getDelay(TimeUnit.NANOSECONDS) - t.getDelay(TimeUnit.NANOSECONDS));
  14.         return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
  15.     }
  16.     @Override
  17.     public long getDelay(TimeUnit unit) {
  18.         return unit.convert(timeout - System.nanoTime(), TimeUnit.NANOSECONDS);
  19.     }
  20.     @Override
  21.     public String toString() {
  22.         return "CancelOrder{" +
  23.                 "orderNo='" + orderNo + '\'' +
  24.                 ", timeout=" + timeout +
  25.                 '}';
  26.     }
  27. }
复制代码
  1. public class DelayQueueTest {
  2.     public static void main(String[] args) throws Exception {
  3.         DelayQueue<CancelOrder> queue = new DelayQueue<>();
  4.         for (int i = 0; i < 5; i++) {
  5.             // 生成订单,10秒超时
  6.             CancelOrder cancelOrder = new CancelOrder("orderNo100" + i, TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS));
  7.             String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  8.             System.out.println(time + ":生成了订单,10秒有效期,order:" + cancelOrder);
  9.             queue.put(cancelOrder);
  10.             // 每1秒生成一个订单
  11.             Thread.sleep(1000);
  12.         }
  13.         try {
  14.             while (!queue.isEmpty()) {
  15.                 CancelOrder order = queue.take();
  16.                 String timeout = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  17.                 System.out.println(timeout + ":订单超时,order:" + order);
  18.             }
  19.         } catch (InterruptedException e) {
  20.             throw new RuntimeException(e);
  21.         }
  22.     }
  23. }
复制代码
运行结果:
  1. 2023-06-27 18:43:25:生成了订单,10秒有效期,order:CancelOrder{orderNo='orderNo1000', timeout=1030377584852000}
  2. 2023-06-27 18:43:26:生成了订单,10秒有效期,order:CancelOrder{orderNo='orderNo1001', timeout=1030378653717600}
  3. 2023-06-27 18:43:27:生成了订单,10秒有效期,order:CancelOrder{orderNo='orderNo1002', timeout=1030379654276300}
  4. 2023-06-27 18:43:28:生成了订单,10秒有效期,order:CancelOrder{orderNo='orderNo1003', timeout=1030380655228900}
  5. 2023-06-27 18:43:29:生成了订单,10秒有效期,order:CancelOrder{orderNo='orderNo1004', timeout=1030381656177500}
  6. 2023-06-27 18:43:35:订单超时,order:CancelOrder{orderNo='orderNo1000', timeout=1030377584852000}
  7. 2023-06-27 18:43:36:订单超时,order:CancelOrder{orderNo='orderNo1001', timeout=1030378653717600}
  8. 2023-06-27 18:43:37:订单超时,order:CancelOrder{orderNo='orderNo1002', timeout=1030379654276300}
  9. 2023-06-27 18:43:38:订单超时,order:CancelOrder{orderNo='orderNo1003', timeout=1030380655228900}
  10. 2023-06-27 18:43:39:订单超时,order:CancelOrder{orderNo='orderNo1004', timeout=1030381656177500}
复制代码
例子中设置的订单有效期是10秒中,每隔1秒生成一个订单,目的是为了便于观察不同的订单到期时间,可以看到10s后各订单相继超时。
优点

不需要任何第三方依赖,实现非常简单
缺点

数据全部保存在JVM内存中,占用内存,可能会引发内存溢出,另外宕机或重启数据会全部丢失,无法做集群。
方案 3:时间轮算法(netty的HashedWheelTimer)

方案描述

时间轮算法用的是一个环形的数据结构(使用数组实现),每一轮相当于沿着环形走一圈,类似于钟表,可以分成很多格子(秒针一圈分成60格,算法中叫bucket,这个bucket里可以存放任务),然后每个格子有持续的时间间隔(比如秒针一个格子是1秒,也就是走过这一格持续1秒的时间,算法中对应的是tickDuration)。
时间轮算法有多种实现,单轮算法,多轮算法(相当于在单轮上做了循环),分层时间轮算法(类似于水表,有多个表盘共同计算出总水量)
时间轮算法使用一个worker线程,将任务放到计算获得的bucket里,并按指定的时间间隔tickDuration去执行bucket的时间到期任务。
netty4版本中的时间轮结构如下:

图中HashedWheelTimer内部存储使用的是HashedWheelBucket数组,形成一个环形结构,每一个HashedWheelBucket中存储的是HashedWheelTimeout双向链表,在HashedWheelTimeout中存的是TimerTask,就是具体要执行的任务。
假设当前指针指在3上,如有有一个任务2秒后执行,那么会存在5的格子上,如果有一个任务8秒后执行,则会放到3上,转了一圈,这是任务的轮数就加了1。
代码

需要添加maven依赖
  1. <dependency>
  2.     <groupId>io.netty</groupId>
  3.     <artifactId>netty-all</artifactId>
  4.     <version>4.1.24.Final</version>
  5. </dependency>
复制代码
  1. public class CancelOrderTimerTask implements TimerTask {
  2.     private String orderNo;
  3.     public CancelOrderTimerTask(String orderNo) {
  4.         this.orderNo = orderNo;
  5.     }
  6.     @Override
  7.     public void run(Timeout timeout) throws Exception {
  8.         String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  9.         System.out.println(time + ":处理订单超时,orderNo:" + orderNo);
  10.     }
  11. }
复制代码
  1. public class HashedWheelTimerTest {
  2.     public static void main(String[] argv) {
  3.         /*
  4.         此处使用的HashedWheelTimer构造方法参数解释如下:
  5.         threadFactory:创建线程的工厂
  6.         tickDuration:时间间隔,这里的1,结合后面的TimeUnit.SECONDS,就是走完一格需要1秒时间。
  7.         unit:时间单位,这是里秒
  8.         ticksPerWheel:表示数组的大小,也就是格子的多少
  9.          */
  10.         HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(new DefaultThreadFactory("test-thread"), 1, TimeUnit.SECONDS, 60);
  11.         hashedWheelTimer.start();
  12.         CancelOrderTimerTask timerTask0 = new CancelOrderTimerTask("orderNo1000");
  13.         CancelOrderTimerTask timerTask1 = new CancelOrderTimerTask("orderNo1001");
  14.         CancelOrderTimerTask timerTask2 = new CancelOrderTimerTask("orderNo1002");
  15.         CancelOrderTimerTask timerTask3 = new CancelOrderTimerTask("orderNo1003");
  16.         hashedWheelTimer.newTimeout(timerTask0, 0, TimeUnit.SECONDS);
  17.         hashedWheelTimer.newTimeout(timerTask1, 5, TimeUnit.SECONDS);
  18.         hashedWheelTimer.newTimeout(timerTask2, 30, TimeUnit.SECONDS);
  19.         hashedWheelTimer.newTimeout(timerTask3, 70, TimeUnit.SECONDS);
  20.     }
  21. }
复制代码
运行结果:
  1. 2023-06-28 11:43:42:处理订单超时,orderNo:orderNo1000
  2. 2023-06-28 11:43:47:处理订单超时,orderNo:orderNo1001
  3. 2023-06-28 11:44:12:处理订单超时,orderNo:orderNo1002
  4. 2023-06-28 11:44:52:处理订单超时,orderNo:orderNo1003
复制代码
任务均是按指定时间间隔执行的。
优点

精度灵活可控制,执行效率高,延迟时间比DelayQueue队列低。
缺点

同DelayQueue一样,数据全部保存在JVM内存中,占用内存,可能会引发内存溢出,另外宕机或重启数据会全部丢失,大数据量的情况下也会影响延迟精度。
方案 4:Redis

redis有两种方案可以实现延迟,一种是采用轮询有序集合zset,一种是采用key过期监听
方案4.1:定时任务轮询有序集合zset

方案描述

zset是一个有序集合,存储的每个元素都有个score分值,可以把score当做过期时间,按照score排序(默认按照score从小到大排序,降序可使用zrerange命令),再结合使用一个线程轮询该集合即可实现延迟功能。
代码

需要添加maven依赖
  1. <dependency>
  2.     <groupId>redis.clients</groupId>
  3.     <artifactId>jedis</artifactId>
  4.     <version>3.2.0</version>
  5. </dependency>
复制代码
  1. public class CancelOrderRedisTest {
  2.     private static JedisPool jedisPool = new JedisPool("127.0.0.1", 6379);
  3.     public static void main(String[] args) throws Exception {
  4.         // 放入几个元素到zset有序集合里
  5.         new Thread(() -> {
  6.             try {
  7.                 for (int i = 0; i < 5; i++) {
  8.                     String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  9.                     redisClient().zadd("cancel:order:list", System.currentTimeMillis() + (i + 1) * 1000, "orderNo100" + i);
  10.                     System.out.println(time + ":生成订单,订单号:orderNo100" + i + ",有效期:" + (i + 1) + "秒");
  11.                     Thread.sleep(1000);
  12.                 }
  13.             } catch (Exception e) {
  14.                 e.printStackTrace();
  15.             }
  16.         }).start();
  17.         // 开一个线程轮询这个有序集合
  18.         new Thread(() -> {
  19.             Jedis jedis = redisClient();
  20.             while (true) {
  21.                 Set<Tuple> items = jedis.zrangeWithScores("cancel:order:list", 0, 1);
  22.                 if (items == null || items.isEmpty()) {
  23.                     try {
  24.                         Thread.sleep(100);
  25.                     } catch (InterruptedException e) {
  26.                         e.printStackTrace();
  27.                     }
  28.                 } else {
  29.                     Tuple tuple = (Tuple) items.toArray()[0];
  30.                     long score = (long) tuple.getScore();
  31.                     if (System.currentTimeMillis() >= score) {
  32.                         Long num = jedis.zrem("cancel:order:list", tuple.getElement());
  33.                         if (num != null && num > 0) {
  34.                             String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  35.                             System.out.println(time + ":订单号:" + tuple.getElement() + "已到期");
  36.                         }
  37.                     }
  38.                 }
  39.             }
  40.         }).start();
  41.     }
  42.     /**
  43.      * 获取redis连接
  44.      *
  45.      * @return
  46.      */
  47.     public static Jedis redisClient() {
  48.         return jedisPool.getResource();
  49.     }
  50. }
复制代码
运行结果:
  1. 2023-06-28 12:41:31:生成订单,订单号:orderNo1000,有效期:1秒
  2. 2023-06-28 12:41:32:订单号:orderNo1000已到期
  3. 2023-06-28 12:41:32:生成订单,订单号:orderNo1001,有效期:2秒
  4. 2023-06-28 12:41:33:生成订单,订单号:orderNo1002,有效期:3秒
  5. 2023-06-28 12:41:34:订单号:orderNo1001已到期
  6. 2023-06-28 12:41:34:生成订单,订单号:orderNo1003,有效期:4秒
  7. 2023-06-28 12:41:35:生成订单,订单号:orderNo1004,有效期:5秒
  8. 2023-06-28 12:41:36:订单号:orderNo1002已到期
  9. 2023-06-28 12:41:38:订单号:orderNo1003已到期
  10. 2023-06-28 12:41:40:订单号:orderNo1004已到期
复制代码
优点

实现简单,redis内存操作,速度快,性能高,集群扩展方便,可存储大量订单数据,持久化机制使得故障时通过AOF或RDB方式恢复,适合对延迟精度要求不高的业务场景
缺点

轮询线程如果不带休眠或休眠时间短,可能导致空轮询,CPU飙高,带休眠时间,休眠多久不好评估,休眠时间过长可能导致延迟不准确。另外处理消息异常时可能要实现重试机制,还有一个就是可靠性问题,比如是先删数据在处理订单还是先处理订单再删除数据,处理异常时可能会导致数据丢失。
方案4.2:Redis key过期监听

方案描述

过期监听机制是redis在2.8版本以上提供的功能,如果key失效后,redis会给客户端发送消息即pub/sub机制,从而实现延迟方案。
以windows系统的redis为例。
在redis安装目录的redis.windows.conf文件中找到“notify-keyspace-events”如果被注释则放开,将这行配成如下所示:
  1. notify-keyspace-events Ex
复制代码
然后再启动(或重启)redis。
注意:windows系统下,直接使用redis-server.exe,不会加载redis.windows.conf这个配置文件,需要用命令行启动。命令行进入redis安装目录,执行命令:redis-server.exe redis.windows.conf

代码
  1. public class RedisKeyExpireTest {
  2.     private static JedisPool jedisPool = new JedisPool("127.0.0.1", 6379);
  3.     public static void main(String[] args) throws InterruptedException {
  4.         // subscribe方法会阻塞等待,用异步去初始化订阅监听事件
  5.         new Thread(() -> {
  6.             jedisPool.getResource().subscribe(new RedisSub(), "__keyevent@0__:expired");
  7.         }).start();
  8.         // 添加几个带过期时间的key
  9.         new Thread(() -> {
  10.             try {
  11.                 for (int i = 0; i < 5; i++) {
  12.                     String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  13.                     jedisPool.getResource().setex("orderNo100" + i, i + 1, "orderNo100" + i);
  14.                     System.out.println(time + ":生成订单,订单号:orderNo100" + i + ",有效期:" + (i + 1) + "秒");
  15.                     Thread.sleep(1000);
  16.                 }
  17.             } catch (Exception e) {
  18.                 e.printStackTrace();
  19.             }
  20.         }).start();
  21.     }
  22. }
  23. class RedisSub extends JedisPubSub {
  24.     @Override
  25.     public void onMessage(String channel, String message) {
  26.         String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  27.         System.out.println(time + ":订单号:" + message + "已到期");
  28.     }
  29. }
复制代码
运行结果:
  1. 2023-06-28 12:56:17:生成订单,订单号:orderNo1000,有效期:1秒
  2. 2023-06-28 12:56:18:生成订单,订单号:orderNo1001,有效期:2秒
  3. 2023-06-28 12:56:18:订单号:orderNo1000已到期
  4. 2023-06-28 12:56:19:生成订单,订单号:orderNo1002,有效期:3秒
  5. 2023-06-28 12:56:20:生成订单,订单号:orderNo1003,有效期:4秒
  6. 2023-06-28 12:56:20:订单号:orderNo1001已到期
  7. 2023-06-28 12:56:21:生成订单,订单号:orderNo1004,有效期:5秒
  8. 2023-06-28 12:56:22:订单号:orderNo1002已到期
  9. 2023-06-28 12:56:24:订单号:orderNo1003已到期
  10. 2023-06-28 12:56:26:订单号:orderNo1004已到期
复制代码
通过redis的key过期监听实现了延迟功能,需要开启redis服务器的key过期监听配置。
优点

实现简单,redis内存操作,速度快,性能高,集群扩展方便,可存储大量订单数据,持久化机制使得故障时通过AOF或RDB方式恢复,适合对延迟精度要求不高的业务场景
缺点

redis的key过期有惰性清除和定时清除两种策略,可能会存在延迟时间不精确的问题,另外redis的pub/sub 机制是不可靠的,如果客户端故障或重启期间有key过期则过期通知事件的数据就丢失了,从而订单无法过期,可以通过补偿机制配合使用,定时任务去做轮询补偿。
方案 5:RabbitMQ消息队列

方案5.1:消息TTL+死信队列

RabbitMQ 可以针对 Queue 和 Message 设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为 dead letter
RabbitMQ 的 Queue 可以配置 x-dead-letter-exchange 和 x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了 deadletter,则按照这两个参数重新路由。结合以上两个特性,就可以模拟出延迟消息的功能。
方案5.2:RabbitMQ延迟队列插件

在这里下载RabbitMQ对应的插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases,本文使用的RabbitMQ版本是3.8.17,所以找到对应的版本下载的是ez结尾的文件,直接放到RabbitMQ的插件目录plugins即可,位置:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.17/plugins,然后执行下面的命令使插件生效(若不生效,需要重启RabbitMQ)。
  1. rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码
查看是否生效(rabbitmq-plugins list命令是查看所有的插件):
  1. rabbitmq-plugins list | grep delayed
复制代码
显示如下表示已启动。

代码

采用springboot集成rabbitMQ实现,代码如下:

  • 配置类
  1. import org.springframework.amqp.core.*;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. @Configuration
  8. public class RabbitMQConfig {
  9.     /**
  10.      * 正常交换机名称
  11.      */
  12.     public static final String EXCHANGE_NAME_NORMAL_DIRECT = "exchange.normal.direct";
  13.     /**
  14.      * 消息不带ttl队列名称
  15.      */
  16.     public static final String QUEUE_NAME_WITHOUT_TTL = "queue.without.ttl";
  17.     /**
  18.      * 消息带ttl队列名称
  19.      */
  20.     public static final String QUEUE_NAME_WITH_TTL = "queue.with.ttl";
  21.     /**
  22.      * 消息不带ttl消息路由
  23.      */
  24.     public static final String ROUTING_KEY_WITHOUT_TTL = "routing.without.ttl.*";
  25.     /**
  26.      * 消息带ttl消息路由
  27.      */
  28.     public static final String ROUTING_KEY_WITH_TTL = "routing.with.ttl.*";
  29.     /**
  30.      * 死信交换机名称
  31.      */
  32.     public static final String EXCHANGE_NAME_DEADLETTER_DIRECT = "exchange.deadLetter.direct";
  33.     /**
  34.      * 死信队列名称
  35.      */
  36.     public static final String QUEUE_NAME_DEAD_LETTER = "queue.deadLetter";
  37.     /**
  38.      * 死信队列消息路由
  39.      */
  40.     public static final String ROUTING_KEY_DEAD_LETTER = "routing.deadletter.*";
  41.     /**
  42.      * rabbitMQ插件实现的延迟队列-队列名称
  43.      */
  44.     public static final String QUEUE_NAME_PLUGIN = "queue.plugin";
  45.     /**
  46.      * rabbitMQ插件实现的延迟队列-交换机名称
  47.      */
  48.     public static final String EXCHANGE_NAME_PLUGIN = "exchange.customexchange.plugin";
  49.     /**
  50.      * rabbitMQ插件实现的延迟队列-路由名称
  51.      */
  52.     public static final String ROUTING_KEY_PLUGIN = "routing.plugin.*";
  53.     /**
  54.      * 正常交换机
  55.      *
  56.      * @return
  57.      */
  58.     @Bean("normalExchange")
  59.     public DirectExchange normalExchange() {
  60.         return new DirectExchange(EXCHANGE_NAME_NORMAL_DIRECT);
  61.     }
  62.     /**
  63.      * 死信交换机
  64.      *
  65.      * @return
  66.      */
  67.     @Bean("deadLetterExchange")
  68.     public DirectExchange deadLetterExchange() {
  69.         return new DirectExchange(EXCHANGE_NAME_DEADLETTER_DIRECT);
  70.     }
  71.     /**
  72.      * rabbitMQ插件实现的延迟队列-自定义的交换机
  73.      *
  74.      * @return
  75.      */
  76.     @Bean
  77.     public CustomExchange customExchange() {
  78.         Map<String, Object> args = new HashMap<>();
  79.         args.put("x-delayed-type", "direct");
  80.         return new CustomExchange(EXCHANGE_NAME_PLUGIN, "x-delayed-message", true, false, args);
  81.     }
  82.     /**
  83.      * 消息不带ttl队列并设置死信交换机
  84.      *
  85.      * @return
  86.      */
  87.     @Bean("withOutttlQueue")
  88.     public Queue withOutttlQueue() {
  89.         Map<String, Object> args = new HashMap<>(3);
  90.         args.put("x-dead-letter-exchange", EXCHANGE_NAME_DEADLETTER_DIRECT);
  91.         args.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD_LETTER);
  92.         args.put("x-message-ttl", 10000);
  93.         return QueueBuilder.durable(QUEUE_NAME_WITHOUT_TTL).withArguments(args).build();
  94.     }
  95.     /**
  96.      * 消息带ttl队列并设置死信交换机
  97.      *
  98.      * @return
  99.      */
  100.     @Bean("withttlQueue")
  101.     public Queue withttlQueue() {
  102.         Map<String, Object> args = new HashMap<>(3);
  103.         args.put("x-dead-letter-exchange", EXCHANGE_NAME_DEADLETTER_DIRECT);
  104.         args.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD_LETTER);
  105.         return QueueBuilder.durable(QUEUE_NAME_WITH_TTL).withArguments(args).build();
  106.     }
  107.     /**
  108.      * 死信队列
  109.      *
  110.      * @return
  111.      */
  112.     @Bean("deadLetterQueue")
  113.     public Queue deadLetterQueue() {
  114.         return new Queue(QUEUE_NAME_DEAD_LETTER);
  115.     }
  116.     /**
  117.      * rabbitMQ插件实现的延迟队列-队列
  118.      *
  119.      * @return
  120.      */
  121.     @Bean
  122.     public Queue pluginQueue() {
  123.         return new Queue(QUEUE_NAME_PLUGIN);
  124.     }
  125.     /**
  126.      * 消息不带ttl队列与正常交换机绑定
  127.      *
  128.      * @param queue
  129.      * @param exchange
  130.      * @return
  131.      */
  132.     @Bean
  133.     public Binding withoutttlQueueBinding(@Qualifier("withOutttlQueue") Queue queue,
  134.                                           @Qualifier("normalExchange") DirectExchange exchange) {
  135.         return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_WITHOUT_TTL);
  136.     }
  137.     /**
  138.      * 消息带ttl队列与正常交换机绑定
  139.      *
  140.      * @param queue
  141.      * @param exchange
  142.      * @return
  143.      */
  144.     @Bean
  145.     public Binding withttlQueueBinding(@Qualifier("withttlQueue") Queue queue,
  146.                                        @Qualifier("normalExchange") DirectExchange exchange) {
  147.         return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_WITH_TTL);
  148.     }
  149.     /**
  150.      * 死信队列与死信交换机绑定
  151.      *
  152.      * @param queue
  153.      * @param exchange
  154.      * @return
  155.      */
  156.     @Bean
  157.     public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
  158.                                      @Qualifier("deadLetterExchange") DirectExchange exchange) {
  159.         return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_DEAD_LETTER);
  160.     }
  161.     /**
  162.      * rabbitMQ插件实现的延迟队列-队列绑定交换机
  163.      *
  164.      * @param queue
  165.      * @param customExchange
  166.      * @return
  167.      */
  168.     @Bean
  169.     public Binding pluginBinding(@Qualifier("pluginQueue") Queue queue,
  170.                                  @Qualifier("customExchange") CustomExchange customExchange) {
  171.         return BindingBuilder.bind(queue).to(customExchange).with(ROUTING_KEY_PLUGIN).noargs();
  172.     }
  173. }
复制代码

  • 消息生产者
  1. import org.springframework.amqp.core.MessagePostProcessor;
  2. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. import static com.star95.springcloud.rabbitmq.cancelorder.RabbitMQConfig.*;
  6. @Component
  7. public class MessageSender {
  8.     @Autowired
  9.     private RabbitTemplate rabbitTemplate;
  10.     /**
  11.      * 发送带自定义ttl的消息
  12.      *
  13.      * @param msg
  14.      * @param ttl
  15.      */
  16.     public void sendMsgWithTtl(String msg, String ttl) {
  17.         MessagePostProcessor messagePostProcessor = message -> {
  18.             message.getMessageProperties().setExpiration(ttl);//设置消息过期时间
  19.             return message;
  20.         };
  21.         rabbitTemplate.convertAndSend(EXCHANGE_NAME_NORMAL_DIRECT, ROUTING_KEY_WITH_TTL, msg, messagePostProcessor);
  22.     }
  23.     /**
  24.      * 发送公共ttl的消息
  25.      *
  26.      * @param msg
  27.      */
  28.     public void sendMsgWithOutTtl(String msg) {
  29.         rabbitTemplate.convertAndSend(EXCHANGE_NAME_NORMAL_DIRECT, ROUTING_KEY_WITHOUT_TTL, msg);
  30.     }
  31.     /**
  32.      * 使用rabbitmq插件实现的延迟队列,发送带自定义ttl的消息
  33.      * @param msg
  34.      * @param ttl
  35.      */
  36.     public void sendMsgWithPlugin(String msg, Integer ttl) {
  37.         rabbitTemplate.convertAndSend(EXCHANGE_NAME_PLUGIN, ROUTING_KEY_PLUGIN, msg, a -> {
  38.             a.getMessageProperties().setDelay(ttl);
  39.             return a;
  40.         });
  41.     }
  42. }
复制代码

  • 消息消费者
  1. import com.rabbitmq.client.Channel;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.io.IOException;
  7. import java.time.LocalDateTime;
  8. import java.time.format.DateTimeFormatter;
  9. import static com.star95.springcloud.rabbitmq.cancelorder.RabbitMQConfig.QUEUE_NAME_DEAD_LETTER;
  10. import static com.star95.springcloud.rabbitmq.cancelorder.RabbitMQConfig.QUEUE_NAME_PLUGIN;
  11. @Slf4j
  12. @Component
  13. public class QueueConsumer {
  14.     /**
  15.      * 不带ttl队列消费消息
  16.      *
  17.      * @param message
  18.      * @param channel
  19.      * @throws IOException
  20.      */
  21.     // @RabbitListener(queues = QUEUE_NAME_WITHOUT_TTL)
  22.     public void withoutttlQueueReceive(Message message, Channel channel) throws IOException {
  23.         String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  24.         String msg = new String(message.getBody());
  25.         log.info("当前时间:{},公共ttl队列消费的消息内容:{}", time, msg);
  26.         channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  27.     }
  28.     /**
  29.      * 带ttl队列消费消息
  30.      *
  31.      * @param message
  32.      * @param channel
  33.      * @throws IOException
  34.      */
  35.     // @RabbitListener(queues = QUEUE_NAME_WITH_TTL)
  36.     public void withttlQueueReceive(Message message, Channel channel) throws IOException {
  37.         String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  38.         String msg = new String(message.getBody());
  39.         log.info("当前时间:{},自定义ttl队列消费的消息内容:{}", time, msg);
  40.         channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
  41.     }
  42.     /**
  43.      * 死信队列消费消息
  44.      *
  45.      * @param message
  46.      * @param channel
  47.      * @throws IOException
  48.      */
  49.     @RabbitListener(queues = QUEUE_NAME_DEAD_LETTER)
  50.     public void deadQueueReceive(Message message, Channel channel) throws IOException {
  51.         String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  52.         String msg = new String(message.getBody());
  53.         log.info("当前时间:{},死信队列消费的消息内容:{}", time, msg);
  54.         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  55.     }
  56.     /**
  57.      * 使用rabbitmq插件实现延迟队列消费消息
  58.      *
  59.      * @param message
  60.      * @param channel
  61.      * @throws IOException
  62.      */
  63.     @RabbitListener(queues = QUEUE_NAME_PLUGIN)
  64.     public void pluginQueueReceive(Message message, Channel channel) throws IOException {
  65.         String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  66.         String msg = new String(message.getBody());
  67.         log.info("当前时间:{},使用rabbitmq插件实现延迟队列,消费的消息内容:{}", time, msg);
  68.         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  69.     }
  70. }
复制代码

  • controller发送消息入口
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.web.bind.annotation.RequestMapping;
  4. import org.springframework.web.bind.annotation.RestController;
  5. import java.time.LocalDateTime;
  6. import java.time.format.DateTimeFormatter;
  7. @Slf4j
  8. @RequestMapping("cancelorder")
  9. @RestController
  10. public class RabbitMQMsgController {
  11.     @Autowired
  12.     private MessageSender sender;
  13.     @RequestMapping("/msgwithttl")
  14.     public void msgWithttl(String msg, String ttl) {
  15.         String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  16.         log.info("当前时间:{},创建带自定义ttl消息,msg:{},ttl:{}", time, msg, ttl);
  17.         sender.sendMsgWithTtl(msg, ttl);
  18.     }
  19.     @RequestMapping("/msgwithoutttl")
  20.     public void msgWithoutttl(String msg) {
  21.         String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  22.         log.info("当前时间:{},创建公共ttl消息,msg:{}", time, msg);
  23.         sender.sendMsgWithOutTtl(msg);
  24.     }
  25.     @RequestMapping("msgwithplugin")
  26.     public void msgWithPlugin(String msg, Integer ttl) {
  27.         String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  28.         log.info("当前时间:{},使用rabbitmq插件实现延迟队列,发送的消息,msg:{},ttl:{}", time, msg, ttl);
  29.         sender.sendMsgWithPlugin(msg, ttl);
  30.     }
  31. }
复制代码
注意:@RabbitListener(queues = QUEUE_NAME_WITHOUT_TTL),@RabbitListener(queues = QUEUE_NAME_WITH_TTL),这两个是注释掉的,如果不注释消息就被正常消费了,注释掉就可以测试消息到期会进入死信队列实现延迟队列的功能。


  • 使用公共ttl的接口发送两条消息
http://127.0.0.1:8080/cancelorder/msgwithoutttl?msg=order123
http://127.0.0.1:8080/cancelorder/msgwithoutttl?msg=order321
结果:
  1. 当前时间:2023-06-27 14:45:58,创建公共ttl消息,msg:order123
  2. 当前时间:2023-06-27 14:45:00,创建公共ttl消息,msg:order321
  3. 当前时间:2023-06-27 14:45:08,死信队列消费的消息内容:order123
  4. 当前时间:2023-06-27 14:45:10,死信队列消费的消息内容:order321
复制代码
不管是两条请求谁先执行,结果均是在默认的10秒后过期,结果正常,这种适用于消息具有同一过期时间的场景。

  • 使用自定义ttl的接口发送两条消息
http://127.0.0.1:8080/cancelorder/msgwithttl?msg=order321&ttl=5000(先执行)
http://127.0.0.1:8080/cancelorder/msgwithttl?msg=order123&ttl=20000(后执行)
结果:
  1. 当前时间:2023-06-27 14:51:40,创建带自定义ttl消息,msg:order321,ttl:5000
  2. 当前时间:2023-06-27 14:51:45,死信队列消费的消息内容:order321
  3. 当前时间:2023-06-27 14:51:52,创建带自定义ttl消息,msg:order123,ttl:20000
  4. 当前时间:2023-06-27 14:52:12,死信队列消费的消息内容:order123
复制代码
这里可以看到先执行延迟5秒的请求,再执行延迟20秒的请求,结果是正常按指定时间消费的消息。
如果我们按下面这个顺序执行(先执行延迟20秒的请求,再执行延迟5秒的请求)
http://127.0.0.1:8080/cancelorder/msgwithttl?msg=order123&ttl=20000(先执行)
http://127.0.0.1:8080/cancelorder/msgwithttl?msg=order321&ttl=5000(后执行)
结果:
  1. 当前时间:2023-06-27 14:53:27,创建带自定义ttl消息,msg:order123,ttl:20000
  2. 当前时间:2023-06-27 14:53:31,创建带自定义ttl消息,msg:order321,ttl:5000
  3. 当前时间:2023-06-27 14:53:47,死信队列消费的消息内容:order123
  4. 当前时间:2023-06-27 14:53:47,死信队列消费的消息内容:order321
复制代码
这里两条请求执行完后,结果却是同一时间消费的消息,这是因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先得到执行,这个问题可以通过rabbMQ的延迟插件来解决。

  • 使用rabbitMQ插件实现的延迟队列接口发送两条消息
http://127.0.0.1:8080/cancelorder/msgwithplugin?msg=order123&ttl=20000(先执行)
http://127.0.0.1:8080/cancelorder/msgwithplugin?msg=order321&ttl=5000(后执行)
结果:
  1. 当前时间:2023-06-27 15:08:04,使用rabbitmq插件实现延迟队列,发送的消息,msg:order123,ttl:20000
  2. 当前时间:2023-06-27 15:08:06,使用rabbitmq插件实现延迟队列,发送的消息,msg:order321,ttl:5000
  3. 当前时间:2023-06-27 15:08:11,使用rabbitmq插件实现延迟队列,消费的消息内容:order321
  4. 当前时间:2023-06-27 15:08:24,使用rabbitmq插件实现延迟队列,消费的消息内容:order123
复制代码
这里消息过期正常被消费,解决了由于消息过期时长不一致导致的不能及时消费的问题。
优点

RabbitMQ消息服务可靠性高,消息处理速度快,支持大数据量,并且支持分布式横向扩展方便。
缺点

引入RabbitMQ中间件系统复杂度增高,运维成本增加,使用起来配置较复杂。
总结

订单超时自动取消是电商平台中非常重要的功能之一,通过合适的技术方案,可以实现自动化处理订单超时的逻辑,提升用户体验和系统效率。本文介绍了几种常用的技术方案,开发者可以根据具体的业务需求和技术栈选择合适的方案,并结合相应的文档和示例进行实现和配置。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

尚未崩坏

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表