利用Redisson实现订单关闭

打印 上一主题 下一主题

主题 1032|帖子 1032|积分 3096

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
实体类

为了方便测试,直接在测试类中的写内部类:
  1.     @Data
  2.     @AllArgsConstructor
  3.     @NoArgsConstructor
  4.     public class OrderInfo {
  5.         /**
  6.          * 订单id
  7.          */
  8.         private Integer id;
  9.         /**
  10.          * 描述:用来记录关闭时间,可以在测试时用来验证。关闭时间是否跟 expireTime相等
  11.          */
  12.         private String description;
  13.         /**
  14.          * 创建时间
  15.          */
  16.         private LocalDateTime createTime;
  17.         /**
  18.          * 过期时间:关闭时间
  19.          */
  20.         private LocalDateTime expireTime;
  21.     }
复制代码
生成订单

模拟生成订单并设置过期时间。
执行时会在redis创建2个key:

  • redisson_delay_queue:{ } :订单数据
  • redisson_delay_queue_timeout:{ } :zset类型,按时间戳排序
  1.     /**
  2.      * 创建订单,并设置过期时间
  3.      *
  4.      * @throws IOException
  5.      */
  6.     @Test
  7.     void createOrder() {
  8.         RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey);
  9.         RDelayedQueue<OrderInfo> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
  10.         // 100条订单
  11.         int n = 100;
  12.         Random random = new Random();
  13.         for (int i = 0; i < n; i++) {
  14.             // 1~100之间的正整数
  15.             int i1 = random.nextInt(100) + 1;
  16.             LocalDateTime now = LocalDateTime.now();
  17.             delayedQueue.offer(new OrderInfo(i + 1, "close: " + i1, now, now.plusSeconds(i1)), i1, TimeUnit.SECONDS);
  18.         }
  19.     }
复制代码
关闭订单

关闭订单,这里会产生订阅。redis会出现redisson_delay_queue_channel。
  1.     /**
  2.      * 关闭订单
  3.      *
  4.      * @throws IOException
  5.      */
  6.     @Test
  7.     void closeOrder() {
  8.         ReentrantLock lock = new ReentrantLock();
  9.         // 5个线程
  10.         int poolSize = 5;
  11.         List<CompletableFuture<Void>> futureList = new ArrayList<>();
  12.         for (int i = 0; i < poolSize; i++) {
  13.             futureList.add(CompletableFuture.runAsync(() -> {
  14.                 RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey);
  15.                 // 加入监听
  16.                 redissonClient.getDelayedQueue(blockingDeque);
  17.                 while (true) {
  18.                     OrderInfo take;
  19.                     try {
  20.                         take = blockingDeque.take();
  21.                     } catch (Exception e) {
  22.                         continue;
  23.                     }
  24.                     if (take == null) {
  25.                         continue;
  26.                     }
  27.                     // 验证多次是否会重复关闭。正常里不会近,只是验证下。正式环境,可以删除
  28.                     try {
  29.                         lock.lock();
  30.                         if(closed.contains(take.getId())){
  31.                             log.info("测试是否会抢占:已存在其他线程处理关闭订单[{}]", take.getId());
  32.                         }
  33.                         closed.add(take.getId());
  34.                     }finally {
  35.                         lock.unlock();
  36.                     }
  37.                     // 处理订单关闭逻辑
  38.                     log.info("订单[{}]关闭中。。。", take.getId());
  39.                     log.info("订单[{}]已关闭!order={}", take.getId(), toJsonString(take));
  40.                 }
  41.             }));
  42.         }
  43.         // 模拟正式环境中进程一直在运行,因为test时,没有join则会只执行一次出现消费完数据后进程就关闭了
  44.         CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
  45.     }
复制代码
完整测试类:
  1. package cn.skyjilygao.demo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.junit.jupiter.api.Test;
  7. import org.redisson.api.RBlockingDeque;
  8. import org.redisson.api.RDelayedQueue;
  9. import org.redisson.api.RedissonClient;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.boot.test.context.SpringBootTest;
  12. import java.io.IOException;
  13. import java.time.LocalDateTime;
  14. import java.util.ArrayList;
  15. import java.util.List;
  16. import java.util.Random;
  17. import java.util.Set;
  18. import java.util.concurrent.CompletableFuture;
  19. import java.util.concurrent.ConcurrentSkipListSet;
  20. import java.util.concurrent.TimeUnit;
  21. import java.util.concurrent.locks.ReentrantLock;
  22. import static cn.skyjilygao.util.EntityUtil.toJsonString;
  23. @Slf4j
  24. @SpringBootTest
  25. public class CloseOrderTests {
  26.     @Autowired
  27.     private RedissonClient redissonClient;
  28.     public static String closeKey = "order_close_test";
  29.     public volatile static Set<Integer> closed = new ConcurrentSkipListSet<>();
  30.     /**
  31.      * 创建订单,并设置过期时间
  32.      *
  33.      * @throws IOException
  34.      */
  35.     @Test
  36.     void createOrder() {
  37.         RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey);
  38.         RDelayedQueue<OrderInfo> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
  39.         int a = 100;
  40.         Random random = new Random(100);
  41.         for (int i = 0; i < a; i++) {
  42.             int i1 = random.nextInt(1 + i) + 1;
  43.             delayedQueue.offer(new OrderInfo(i + 1, "close: " + i1, LocalDateTime.now(), LocalDateTime.now().plusSeconds(i1)), i1, TimeUnit.SECONDS);
  44.         }
  45.     }
  46.     /**
  47.      * 关闭订单
  48.      *
  49.      * @throws IOException
  50.      */
  51.     @Test
  52.     void closeOrder() {
  53.         ReentrantLock lock = new ReentrantLock();
  54.         // 5个线程
  55.         int poolSize = 5;
  56.         List<CompletableFuture<Void>> futureList = new ArrayList<>();
  57.         for (int i = 0; i < poolSize; i++) {
  58.             futureList.add(CompletableFuture.runAsync(() -> {
  59.                 RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey);
  60.                 // 加入监听
  61.                 redissonClient.getDelayedQueue(blockingDeque);
  62.                 while (true) {
  63.                     OrderInfo take;
  64.                     try {
  65.                         take = blockingDeque.take();
  66.                     } catch (Exception e) {
  67.                         continue;
  68.                     }
  69.                     if (take == null) {
  70.                         continue;
  71.                     }
  72.                     try {
  73.                         lock.lock();
  74.                         if(closed.contains(take.getId())){
  75.                             log.info("测试是否会抢占:已存在其他线程处理关闭订单[{}]", take.getId());
  76.                         }
  77.                         closed.add(take.getId());
  78.                     }finally {
  79.                         lock.unlock();
  80.                     }
  81.                     log.info("订单[{}]关闭中。。。", take.getId());
  82.                     log.info("订单[{}]已关闭!order={}", take.getId(), toJsonString(take));
  83.                 }
  84.             }));
  85.         }
  86.         // 模拟正式环境中进程一直在运行,因为test时,没有join则会只执行一次出现消费完数据后进程就关闭了
  87.         CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
  88.     }
  89.     @Data
  90.     @AllArgsConstructor
  91.     @NoArgsConstructor
  92.     public class OrderInfo {
  93.         private Integer id;
  94.         private String description;
  95.         private LocalDateTime createTime;
  96.         private LocalDateTime expireTime;
  97.     }
  98. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王海鱼

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表