基于Redis的4种延时队列实现方式

锦通  论坛元老 | 2025-4-19 06:54:05 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1580|帖子 1580|积分 4740

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
延时队列是一种特殊的消息队列,它允许消息在指定的时间后被消费。在微服务架构、电商系统和任务调度场景中,延时队列饰演着关键角色。比方,订单超时自动取消、定时提醒、延时支付等都依靠延时队列实现。
Redis作为高性能的内存数据库,具备原子利用、数据结构丰富和简单易用的特性,本文将介绍基于Redis实现分布式延时队列的四种方式。
1. 基于Sorted Set的延时队列

原理

利用Redis的Sorted Set(有序聚集),将消息ID作为member,执行时间戳作为score举行存储。通过ZRANGEBYSCORE命令可以获取到达执行时间的任务。
代码实现

  1. public class RedisZSetDelayQueue {
  2.     private final StringRedisTemplate redisTemplate;
  3.     private final String queueKey = "delay_queue:tasks";
  4.    
  5.     public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) {
  6.         this.redisTemplate = redisTemplate;
  7.     }
  8.    
  9.     /**
  10.      * 添加延时任务
  11.      * @param taskId 任务ID
  12.      * @param taskInfo 任务信息(JSON字符串)
  13.      * @param delayTime 延迟时间(秒)
  14.      */
  15.     public void addTask(String taskId, String taskInfo, long delayTime) {
  16.         // 计算执行时间
  17.         long executeTime = System.currentTimeMillis() + delayTime * 1000;
  18.         
  19.         // 存储任务详情
  20.         redisTemplate.opsForHash().put("delay_queue:details", taskId, taskInfo);
  21.         
  22.         // 添加到延时队列
  23.         redisTemplate.opsForZSet().add(queueKey, taskId, executeTime);
  24.         
  25.         System.out.println("Task added: " + taskId + ", will execute at: " + executeTime);
  26.     }
  27.    
  28.     /**
  29.      * 轮询获取到期任务
  30.      */
  31.     public List<String> pollTasks() {
  32.         long now = System.currentTimeMillis();
  33.         
  34.         // 获取当前时间之前的任务
  35.         Set<String> taskIds = redisTemplate.opsForZSet()
  36.                 .rangeByScore(queueKey, 0, now);
  37.         
  38.         if (taskIds == null || taskIds.isEmpty()) {
  39.             return Collections.emptyList();
  40.         }
  41.         
  42.         // 获取任务详情
  43.         List<String> tasks = new ArrayList<>();
  44.         for (String taskId : taskIds) {
  45.             String taskInfo = (String) redisTemplate.opsForHash()
  46.                     .get("delay_queue:details", taskId);
  47.             
  48.             if (taskInfo != null) {
  49.                 tasks.add(taskInfo);
  50.                
  51.                 // 从集合和详情中移除任务
  52.                 redisTemplate.opsForZSet().remove(queueKey, taskId);
  53.                 redisTemplate.opsForHash().delete("delay_queue:details", taskId);
  54.             }
  55.         }
  56.         
  57.         return tasks;
  58.     }
  59.    
  60.     // 定时任务示例
  61.     public void startTaskProcessor() {
  62.         ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
  63.         scheduler.scheduleAtFixedRate(() -> {
  64.             try {
  65.                 List<String> tasks = pollTasks();
  66.                 for (String task : tasks) {
  67.                     processTask(task);
  68.                 }
  69.             } catch (Exception e) {
  70.                 e.printStackTrace();
  71.             }
  72.         }, 0, 1, TimeUnit.SECONDS);
  73.     }
  74.    
  75.     private void processTask(String taskInfo) {
  76.         System.out.println("Processing task: " + taskInfo);
  77.         // 实际任务处理逻辑
  78.     }
  79. }
复制代码
优缺点

长处


  • 实现简单,易于明白
  • 任务按执行时间自动排序
  • 支持精确的时间控制
缺点


  • 需要轮询获取到期任务,消耗CPU资源
  • 大量任务情况下,ZRANGEBYSCORE利用大概影响性能
  • 没有消费确认机制,需要额外实现
2. 基于List + 定时轮询的延时队列

原理

这种方式使用多个List作为存储容器,按延长时间的差异将任务分配到差异的队列中。通过定时轮询各个队列,将到期任务移动到一个立即执行队列。
代码实现

  1. public class RedisListDelayQueue {
  2.     private final StringRedisTemplate redisTemplate;
  3.     private final String readyQueueKey = "delay_queue:ready";  // 待处理队列
  4.     private final Map<Integer, String> delayQueueKeys;  // 延迟队列,按延时时间分级
  5.    
  6.     public RedisListDelayQueue(StringRedisTemplate redisTemplate) {
  7.         this.redisTemplate = redisTemplate;
  8.         
  9.         // 初始化不同延迟级别的队列
  10.         delayQueueKeys = new HashMap<>();
  11.         delayQueueKeys.put(5, "delay_queue:delay_5s");     // 5秒
  12.         delayQueueKeys.put(60, "delay_queue:delay_1m");    // 1分钟
  13.         delayQueueKeys.put(300, "delay_queue:delay_5m");   // 5分钟
  14.         delayQueueKeys.put(1800, "delay_queue:delay_30m"); // 30分钟
  15.     }
  16.    
  17.     /**
  18.      * 添加延时任务
  19.      */
  20.     public void addTask(String taskInfo, int delaySeconds) {
  21.         // 选择合适的延迟队列
  22.         String queueKey = selectDelayQueue(delaySeconds);
  23.         
  24.         // 任务元数据,包含任务信息和执行时间
  25.         long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
  26.         String taskData = executeTime + ":" + taskInfo;
  27.         
  28.         // 添加到延迟队列
  29.         redisTemplate.opsForList().rightPush(queueKey, taskData);
  30.         System.out.println("Task added to " + queueKey + ": " + taskData);
  31.     }
  32.    
  33.     /**
  34.      * 选择合适的延迟队列
  35.      */
  36.     private String selectDelayQueue(int delaySeconds) {
  37.         // 找到最接近的延迟级别
  38.         int closestDelay = delayQueueKeys.keySet().stream()
  39.                 .filter(delay -> delay >= delaySeconds)
  40.                 .min(Integer::compareTo)
  41.                 .orElse(Collections.max(delayQueueKeys.keySet()));
  42.                
  43.         return delayQueueKeys.get(closestDelay);
  44.     }
  45.    
  46.     /**
  47.      * 移动到期任务到待处理队列
  48.      */
  49.     public void moveTasksToReadyQueue() {
  50.         long now = System.currentTimeMillis();
  51.         
  52.         // 遍历所有延迟队列
  53.         for (String queueKey : delayQueueKeys.values()) {
  54.             boolean hasMoreTasks = true;
  55.             
  56.             while (hasMoreTasks) {
  57.                 // 查看队列头部任务
  58.                 String taskData = redisTemplate.opsForList().index(queueKey, 0);
  59.                 if (taskData == null) {
  60.                     hasMoreTasks = false;
  61.                     continue;
  62.                 }
  63.                
  64.                 // 解析任务执行时间
  65.                 long executeTime = Long.parseLong(taskData.split(":", 2)[0]);
  66.                
  67.                 // 检查是否到期
  68.                 if (executeTime <= now) {
  69.                     // 通过LPOP原子性地移除队列头部任务
  70.                     String task = redisTemplate.opsForList().leftPop(queueKey);
  71.                     
  72.                     // 任务可能被其他进程处理,再次检查
  73.                     if (task != null) {
  74.                         // 提取任务信息并添加到待处理队列
  75.                         String taskInfo = task.split(":", 2)[1];
  76.                         redisTemplate.opsForList().rightPush(readyQueueKey, taskInfo);
  77.                         System.out.println("Task moved to ready queue: " + taskInfo);
  78.                     }
  79.                 } else {
  80.                     // 队列头部任务未到期,无需检查后面的任务
  81.                     hasMoreTasks = false;
  82.                 }
  83.             }
  84.         }
  85.     }
  86.    
  87.     /**
  88.      * 获取待处理任务
  89.      */
  90.     public String getReadyTask() {
  91.         return redisTemplate.opsForList().leftPop(readyQueueKey);
  92.     }
  93.    
  94.     /**
  95.      * 启动任务处理器
  96.      */
  97.     public void startTaskProcessors() {
  98.         // 定时移动到期任务
  99.         ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
  100.         
  101.         // 移动任务线程
  102.         scheduler.scheduleAtFixedRate(this::moveTasksToReadyQueue, 0, 1, TimeUnit.SECONDS);
  103.         
  104.         // 处理任务线程
  105.         scheduler.scheduleAtFixedRate(() -> {
  106.             String task = getReadyTask();
  107.             if (task != null) {
  108.                 processTask(task);
  109.             }
  110.         }, 0, 100, TimeUnit.MILLISECONDS);
  111.     }
  112.    
  113.     private void processTask(String taskInfo) {
  114.         System.out.println("Processing task: " + taskInfo);
  115.         // 实际任务处理逻辑
  116.     }
  117. }
复制代码
优缺点

长处


  • 分级队列设计,降低单队列压力
  • 相比Sorted Set占用内存少
  • 支持队列监控和任务优先级
缺点


  • 延长时间精度受轮询频率影响
  • 实现复杂度高
  • 需要维护多个队列
  • 时间判断和队列利用非原子性,需特别处理并发问题
3. 基于发布/订阅(Pub/Sub)的延时队列

原理

联合Redis发布/订阅功能与当地时间轮算法,实现延长任务的分发和处理。任务信息存储在Redis中,而时间轮负责任务的调度和发布。
代码实现

  1. public class RedisPubSubDelayQueue {
  2.     private final StringRedisTemplate redisTemplate;
  3.     private final String TASK_TOPIC = "delay_queue:task_channel";
  4.     private final String TASK_HASH = "delay_queue:tasks";
  5.    
  6.     private final HashedWheelTimer timer;
  7.    
  8.     public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) {
  9.         this.redisTemplate = redisTemplate;
  10.         
  11.         // 初始化时间轮,刻度100ms,轮子大小512
  12.         this.timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
  13.         
  14.         // 启动消息订阅
  15.         subscribeTaskChannel();
  16.     }
  17.    
  18.     /**
  19.      * 添加延时任务
  20.      */
  21.     public void addTask(String taskId, String taskInfo, long delaySeconds) {
  22.         // 存储任务信息到Redis
  23.         redisTemplate.opsForHash().put(TASK_HASH, taskId, taskInfo);
  24.         
  25.         // 添加到时间轮
  26.         timer.newTimeout(timeout -> {
  27.             // 发布任务就绪消息
  28.             redisTemplate.convertAndSend(TASK_TOPIC, taskId);
  29.         }, delaySeconds, TimeUnit.SECONDS);
  30.         
  31.         System.out.println("Task scheduled: " + taskId + ", delay: " + delaySeconds + "s");
  32.     }
  33.    
  34.     /**
  35.      * 订阅任务通道
  36.      */
  37.     private void subscribeTaskChannel() {
  38.         redisTemplate.getConnectionFactory().getConnection().subscribe(
  39.             (message, pattern) -> {
  40.                 String taskId = new String(message.getBody());
  41.                
  42.                 // 获取任务信息
  43.                 String taskInfo = (String) redisTemplate.opsForHash().get(TASK_HASH, taskId);
  44.                
  45.                 if (taskInfo != null) {
  46.                     // 处理任务
  47.                     processTask(taskId, taskInfo);
  48.                     
  49.                     // 删除任务
  50.                     redisTemplate.opsForHash().delete(TASK_HASH, taskId);
  51.                 }
  52.             },
  53.             TASK_TOPIC.getBytes()
  54.         );
  55.     }
  56.    
  57.     private void processTask(String taskId, String taskInfo) {
  58.         System.out.println("Processing task: " + taskId + " - " + taskInfo);
  59.         // 实际任务处理逻辑
  60.     }
  61.    
  62.     // 模拟HashedWheelTimer类
  63.     public static class HashedWheelTimer {
  64.         private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
  65.         private final long tickDuration;
  66.         private final TimeUnit unit;
  67.         
  68.         public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) {
  69.             this.tickDuration = tickDuration;
  70.             this.unit = unit;
  71.         }
  72.         
  73.         public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) {
  74.             long delayMillis = timeUnit.toMillis(delay);
  75.             scheduler.schedule(
  76.                 () -> task.run(null),
  77.                 delayMillis,
  78.                 TimeUnit.MILLISECONDS
  79.             );
  80.         }
  81.         
  82.         public interface TimerTask {
  83.             void run(Timeout timeout);
  84.         }
  85.         
  86.         public interface Timeout {
  87.         }
  88.     }
  89. }
复制代码
优缺点

长处


  • 即时触发,无需轮询
  • 高效的时间轮算法
  • 可以跨应用订阅任务
  • 分离任务调度和执行,降低耦合
缺点


  • 依靠当地时间轮,非纯Redis实现
  • Pub/Sub模式无消息长期化,大概丢失消息
  • 服务重启时需要重修时间轮
  • 订阅者需要保持连接
4. 基于Redis Stream的延时队列

原理

Redis 5.0引入的Stream是一个强盛的数据结构,专为消息队列设计。联合Stream的消费组和确认机制,可以构建可靠的延时队列。
代码实现

  1. public class RedisStreamDelayQueue {
  2.     private final StringRedisTemplate redisTemplate;
  3.     private final String delayQueueKey = "delay_queue:stream";
  4.     private final String consumerGroup = "delay_queue_consumers";
  5.     private final String consumerId = UUID.randomUUID().toString();
  6.    
  7.     public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) {
  8.         this.redisTemplate = redisTemplate;
  9.         
  10.         // 创建消费者组
  11.         try {
  12.             redisTemplate.execute((RedisCallback<String>) connection -> {
  13.                 connection.streamCommands().xGroupCreate(
  14.                     delayQueueKey.getBytes(),
  15.                     consumerGroup,
  16.                     ReadOffset.from("0"),
  17.                     true
  18.                 );
  19.                 return "OK";
  20.             });
  21.         } catch (Exception e) {
  22.             // 消费者组可能已存在
  23.             System.out.println("Consumer group may already exist: " + e.getMessage());
  24.         }
  25.     }
  26.    
  27.     /**
  28.      * 添加延时任务
  29.      */
  30.     public void addTask(String taskInfo, long delaySeconds) {
  31.         long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
  32.         
  33.         Map<String, Object> task = new HashMap<>();
  34.         task.put("executeTime", String.valueOf(executeTime));
  35.         task.put("taskInfo", taskInfo);
  36.         
  37.         redisTemplate.opsForStream().add(delayQueueKey, task);
  38.         System.out.println("Task added: " + taskInfo + ", execute at: " + executeTime);
  39.     }
  40.    
  41.     /**
  42.      * 获取待执行的任务
  43.      */
  44.     public List<String> pollTasks() {
  45.         long now = System.currentTimeMillis();
  46.         List<String> readyTasks = new ArrayList<>();
  47.         
  48.         // 读取尚未处理的消息
  49.         List<MapRecord<String, Object, Object>> records = redisTemplate.execute(
  50.             (RedisCallback<List<MapRecord<String, Object, Object>>>) connection -> {
  51.                 return connection.streamCommands().xReadGroup(
  52.                     consumerGroup.getBytes(),
  53.                     consumerId.getBytes(),
  54.                     StreamReadOptions.empty().count(10),
  55.                     StreamOffset.create(delayQueueKey.getBytes(), ReadOffset.from(">"))
  56.                 );
  57.             }
  58.         );
  59.         
  60.         if (records != null) {
  61.             for (MapRecord<String, Object, Object> record : records) {
  62.                 String messageId = record.getId().getValue();
  63.                 Map<Object, Object> value = record.getValue();
  64.                
  65.                 long executeTime = Long.parseLong((String) value.get("executeTime"));
  66.                 String taskInfo = (String) value.get("taskInfo");
  67.                
  68.                 // 检查任务是否到期
  69.                 if (executeTime <= now) {
  70.                     readyTasks.add(taskInfo);
  71.                     
  72.                     // 确认消息已处理
  73.                     redisTemplate.execute((RedisCallback<String>) connection -> {
  74.                         connection.streamCommands().xAck(
  75.                             delayQueueKey.getBytes(),
  76.                             consumerGroup.getBytes(),
  77.                             messageId.getBytes()
  78.                         );
  79.                         return "OK";
  80.                     });
  81.                     
  82.                     // 可选:从流中删除消息
  83.                     redisTemplate.opsForStream().delete(delayQueueKey, messageId);
  84.                 } else {
  85.                     // 任务未到期,放回队列
  86.                     redisTemplate.execute((RedisCallback<String>) connection -> {
  87.                         connection.streamCommands().xAck(
  88.                             delayQueueKey.getBytes(),
  89.                             consumerGroup.getBytes(),
  90.                             messageId.getBytes()
  91.                         );
  92.                         return "OK";
  93.                     });
  94.                     
  95.                     // 重新添加任务(可选:使用延迟重新入队策略)
  96.                     Map<String, Object> newTask = new HashMap<>();
  97.                     newTask.put("executeTime", String.valueOf(executeTime));
  98.                     newTask.put("taskInfo", taskInfo);
  99.                     redisTemplate.opsForStream().add(delayQueueKey, newTask);
  100.                 }
  101.             }
  102.         }
  103.         
  104.         return readyTasks;
  105.     }
  106.    
  107.     /**
  108.      * 启动任务处理器
  109.      */
  110.     public void startTaskProcessor() {
  111.         ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
  112.         scheduler.scheduleAtFixedRate(() -> {
  113.             try {
  114.                 List<String> tasks = pollTasks();
  115.                 for (String task : tasks) {
  116.                     processTask(task);
  117.                 }
  118.             } catch (Exception e) {
  119.                 e.printStackTrace();
  120.             }
  121.         }, 0, 1, TimeUnit.SECONDS);
  122.     }
  123.    
  124.     private void processTask(String taskInfo) {
  125.         System.out.println("Processing task: " + taskInfo);
  126.         // 实际任务处理逻辑
  127.     }
  128. }
复制代码
优缺点

长处


  • 支持消费者组和消息确认,提供可靠的消息处理
  • 内置消息长期化机制
  • 支持多消费者并行处理
  • 消息ID包含时间戳,便于排序
缺点


  • 要求Redis 5.0+版本
  • 实现相对复杂
  • 仍需轮询获取到期任务
  • 对未到期任务的处理相对繁琐
性能对比与选型建议

实现方式性能可靠性实现复杂度内存占用实用场景Sorted Set★★★★☆★★★☆☆低中任务量适中,需要精确调度List + 轮询★★★★★★★★☆☆中低高并发,延时精度要求不高Pub/Sub + 时间轮★★★★★★★☆☆☆高低实时性要求高,可容忍服务重启丢失Stream★★★☆☆★★★★★高中可靠性要求高,需要消息确认 总结

在实际应用中,可根据系统规模、性能需求、可靠性要求和实现复杂度等因素举行选择,也可以组合多种方式打造更符合业务需求的延时队列解决方案。无论选择哪种实现,都应关注可靠性、性能和监控等方面,确保延时队列在生产情况中稳定运行。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

锦通

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表