马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
实体类
为了方便测试,直接在测试类中的写内部类:- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- public class OrderInfo {
- /**
- * 订单id
- */
- private Integer id;
- /**
- * 描述:用来记录关闭时间,可以在测试时用来验证。关闭时间是否跟 expireTime相等
- */
- private String description;
- /**
- * 创建时间
- */
- private LocalDateTime createTime;
- /**
- * 过期时间:关闭时间
- */
- private LocalDateTime expireTime;
- }
复制代码 生成订单
模拟生成订单并设置过期时间。
执行时会在redis创建2个key:
- redisson_delay_queue:{ } :订单数据
- redisson_delay_queue_timeout:{ } :zset类型,按时间戳排序
- /**
- * 创建订单,并设置过期时间
- *
- * @throws IOException
- */
- @Test
- void createOrder() {
- RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey);
- RDelayedQueue<OrderInfo> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
- // 100条订单
- int n = 100;
- Random random = new Random();
- for (int i = 0; i < n; i++) {
- // 1~100之间的正整数
- int i1 = random.nextInt(100) + 1;
- LocalDateTime now = LocalDateTime.now();
- delayedQueue.offer(new OrderInfo(i + 1, "close: " + i1, now, now.plusSeconds(i1)), i1, TimeUnit.SECONDS);
- }
- }
复制代码 关闭订单
关闭订单,这里会产生订阅。redis会出现redisson_delay_queue_channel。- /**
- * 关闭订单
- *
- * @throws IOException
- */
- @Test
- void closeOrder() {
- ReentrantLock lock = new ReentrantLock();
- // 5个线程
- int poolSize = 5;
- List<CompletableFuture<Void>> futureList = new ArrayList<>();
- for (int i = 0; i < poolSize; i++) {
- futureList.add(CompletableFuture.runAsync(() -> {
- RBlockingDeque<OrderInfo> blockingDeque = redissonClient.getBlockingDeque(closeKey);
- // 加入监听
- redissonClient.getDelayedQueue(blockingDeque);
- while (true) {
- OrderInfo take;
- try {
- take = blockingDeque.take();
- } catch (Exception e) {
- continue;
- }
- if (take == null) {
- continue;
- }
- // 验证多次是否会重复关闭。正常里不会近,只是验证下。正式环境,可以删除
- try {
- lock.lock();
- if(closed.contains(take.getId())){
- log.info("测试是否会抢占:已存在其他线程处理关闭订单[{}]", take.getId());
- }
- closed.add(take.getId());
- }finally {
- lock.unlock();
- }
- // 处理订单关闭逻辑
- log.info("订单[{}]关闭中。。。", take.getId());
- log.info("订单[{}]已关闭!order={}", take.getId(), toJsonString(take));
- }
- }));
- }
- // 模拟正式环境中进程一直在运行,因为test时,没有join则会只执行一次出现消费完数据后进程就关闭了
- CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
- }
复制代码 完整测试类:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |