锦通 发表于 2025-4-19 06:54:05

基于Redis的4种延时队列实现方式

延时队列是一种特殊的消息队列,它允许消息在指定的时间后被消费。在微服务架构、电商系统和任务调度场景中,延时队列饰演着关键角色。比方,订单超时自动取消、定时提醒、延时支付等都依靠延时队列实现。
Redis作为高性能的内存数据库,具备原子利用、数据结构丰富和简单易用的特性,本文将介绍基于Redis实现分布式延时队列的四种方式。
1. 基于Sorted Set的延时队列

原理

利用Redis的Sorted Set(有序聚集),将消息ID作为member,执行时间戳作为score举行存储。通过ZRANGEBYSCORE命令可以获取到达执行时间的任务。
代码实现

public class RedisZSetDelayQueue {
    private final StringRedisTemplate redisTemplate;
    private final String queueKey = "delay_queue:tasks";
   
    public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) {
      this.redisTemplate = redisTemplate;
    }
   
    /**
   * 添加延时任务
   * @param taskId 任务ID
   * @param taskInfo 任务信息(JSON字符串)
   * @param delayTime 延迟时间(秒)
   */
    public void addTask(String taskId, String taskInfo, long delayTime) {
      // 计算执行时间
      long executeTime = System.currentTimeMillis() + delayTime * 1000;
      
      // 存储任务详情
      redisTemplate.opsForHash().put("delay_queue:details", taskId, taskInfo);
      
      // 添加到延时队列
      redisTemplate.opsForZSet().add(queueKey, taskId, executeTime);
      
      System.out.println("Task added: " + taskId + ", will execute at: " + executeTime);
    }
   
    /**
   * 轮询获取到期任务
   */
    public List<String> pollTasks() {
      long now = System.currentTimeMillis();
      
      // 获取当前时间之前的任务
      Set<String> taskIds = redisTemplate.opsForZSet()
                .rangeByScore(queueKey, 0, now);
      
      if (taskIds == null || taskIds.isEmpty()) {
            return Collections.emptyList();
      }
      
      // 获取任务详情
      List<String> tasks = new ArrayList<>();
      for (String taskId : taskIds) {
            String taskInfo = (String) redisTemplate.opsForHash()
                  .get("delay_queue:details", taskId);
            
            if (taskInfo != null) {
                tasks.add(taskInfo);
               
                // 从集合和详情中移除任务
                redisTemplate.opsForZSet().remove(queueKey, taskId);
                redisTemplate.opsForHash().delete("delay_queue:details", taskId);
            }
      }
      
      return tasks;
    }
   
    // 定时任务示例
    public void startTaskProcessor() {
      ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
      scheduler.scheduleAtFixedRate(() -> {
            try {
                List<String> tasks = pollTasks();
                for (String task : tasks) {
                  processTask(task);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
      }, 0, 1, TimeUnit.SECONDS);
    }
   
    private void processTask(String taskInfo) {
      System.out.println("Processing task: " + taskInfo);
      // 实际任务处理逻辑
    }
}
优缺点

长处


[*]实现简单,易于明白
[*]任务按执行时间自动排序
[*]支持精确的时间控制
缺点


[*]需要轮询获取到期任务,消耗CPU资源
[*]大量任务情况下,ZRANGEBYSCORE利用大概影响性能
[*]没有消费确认机制,需要额外实现
2. 基于List + 定时轮询的延时队列

原理

这种方式使用多个List作为存储容器,按延长时间的差异将任务分配到差异的队列中。通过定时轮询各个队列,将到期任务移动到一个立即执行队列。
代码实现

public class RedisListDelayQueue {
    private final StringRedisTemplate redisTemplate;
    private final String readyQueueKey = "delay_queue:ready";// 待处理队列
    private final Map<Integer, String> delayQueueKeys;// 延迟队列,按延时时间分级
   
    public RedisListDelayQueue(StringRedisTemplate redisTemplate) {
      this.redisTemplate = redisTemplate;
      
      // 初始化不同延迟级别的队列
      delayQueueKeys = new HashMap<>();
      delayQueueKeys.put(5, "delay_queue:delay_5s");   // 5秒
      delayQueueKeys.put(60, "delay_queue:delay_1m");    // 1分钟
      delayQueueKeys.put(300, "delay_queue:delay_5m");   // 5分钟
      delayQueueKeys.put(1800, "delay_queue:delay_30m"); // 30分钟
    }
   
    /**
   * 添加延时任务
   */
    public void addTask(String taskInfo, int delaySeconds) {
      // 选择合适的延迟队列
      String queueKey = selectDelayQueue(delaySeconds);
      
      // 任务元数据,包含任务信息和执行时间
      long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
      String taskData = executeTime + ":" + taskInfo;
      
      // 添加到延迟队列
      redisTemplate.opsForList().rightPush(queueKey, taskData);
      System.out.println("Task added to " + queueKey + ": " + taskData);
    }
   
    /**
   * 选择合适的延迟队列
   */
    private String selectDelayQueue(int delaySeconds) {
      // 找到最接近的延迟级别
      int closestDelay = delayQueueKeys.keySet().stream()
                .filter(delay -> delay >= delaySeconds)
                .min(Integer::compareTo)
                .orElse(Collections.max(delayQueueKeys.keySet()));
               
      return delayQueueKeys.get(closestDelay);
    }
   
    /**
   * 移动到期任务到待处理队列
   */
    public void moveTasksToReadyQueue() {
      long now = System.currentTimeMillis();
      
      // 遍历所有延迟队列
      for (String queueKey : delayQueueKeys.values()) {
            boolean hasMoreTasks = true;
            
            while (hasMoreTasks) {
                // 查看队列头部任务
                String taskData = redisTemplate.opsForList().index(queueKey, 0);
                if (taskData == null) {
                  hasMoreTasks = false;
                  continue;
                }
               
                // 解析任务执行时间
                long executeTime = Long.parseLong(taskData.split(":", 2));
               
                // 检查是否到期
                if (executeTime <= now) {
                  // 通过LPOP原子性地移除队列头部任务
                  String task = redisTemplate.opsForList().leftPop(queueKey);
                  
                  // 任务可能被其他进程处理,再次检查
                  if (task != null) {
                        // 提取任务信息并添加到待处理队列
                        String taskInfo = task.split(":", 2);
                        redisTemplate.opsForList().rightPush(readyQueueKey, taskInfo);
                        System.out.println("Task moved to ready queue: " + taskInfo);
                  }
                } else {
                  // 队列头部任务未到期,无需检查后面的任务
                  hasMoreTasks = false;
                }
            }
      }
    }
   
    /**
   * 获取待处理任务
   */
    public String getReadyTask() {
      return redisTemplate.opsForList().leftPop(readyQueueKey);
    }
   
    /**
   * 启动任务处理器
   */
    public void startTaskProcessors() {
      // 定时移动到期任务
      ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
      
      // 移动任务线程
      scheduler.scheduleAtFixedRate(this::moveTasksToReadyQueue, 0, 1, TimeUnit.SECONDS);
      
      // 处理任务线程
      scheduler.scheduleAtFixedRate(() -> {
            String task = getReadyTask();
            if (task != null) {
                processTask(task);
            }
      }, 0, 100, TimeUnit.MILLISECONDS);
    }
   
    private void processTask(String taskInfo) {
      System.out.println("Processing task: " + taskInfo);
      // 实际任务处理逻辑
    }
}
优缺点

长处


[*]分级队列设计,降低单队列压力
[*]相比Sorted Set占用内存少
[*]支持队列监控和任务优先级
缺点


[*]延长时间精度受轮询频率影响
[*]实现复杂度高
[*]需要维护多个队列
[*]时间判断和队列利用非原子性,需特别处理并发问题
3. 基于发布/订阅(Pub/Sub)的延时队列

原理

联合Redis发布/订阅功能与当地时间轮算法,实现延长任务的分发和处理。任务信息存储在Redis中,而时间轮负责任务的调度和发布。
代码实现

public class RedisPubSubDelayQueue {
    private final StringRedisTemplate redisTemplate;
    private final String TASK_TOPIC = "delay_queue:task_channel";
    private final String TASK_HASH = "delay_queue:tasks";
   
    private final HashedWheelTimer timer;
   
    public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) {
      this.redisTemplate = redisTemplate;
      
      // 初始化时间轮,刻度100ms,轮子大小512
      this.timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
      
      // 启动消息订阅
      subscribeTaskChannel();
    }
   
    /**
   * 添加延时任务
   */
    public void addTask(String taskId, String taskInfo, long delaySeconds) {
      // 存储任务信息到Redis
      redisTemplate.opsForHash().put(TASK_HASH, taskId, taskInfo);
      
      // 添加到时间轮
      timer.newTimeout(timeout -> {
            // 发布任务就绪消息
            redisTemplate.convertAndSend(TASK_TOPIC, taskId);
      }, delaySeconds, TimeUnit.SECONDS);
      
      System.out.println("Task scheduled: " + taskId + ", delay: " + delaySeconds + "s");
    }
   
    /**
   * 订阅任务通道
   */
    private void subscribeTaskChannel() {
      redisTemplate.getConnectionFactory().getConnection().subscribe(
            (message, pattern) -> {
                String taskId = new String(message.getBody());
               
                // 获取任务信息
                String taskInfo = (String) redisTemplate.opsForHash().get(TASK_HASH, taskId);
               
                if (taskInfo != null) {
                  // 处理任务
                  processTask(taskId, taskInfo);
                  
                  // 删除任务
                  redisTemplate.opsForHash().delete(TASK_HASH, taskId);
                }
            },
            TASK_TOPIC.getBytes()
      );
    }
   
    private void processTask(String taskId, String taskInfo) {
      System.out.println("Processing task: " + taskId + " - " + taskInfo);
      // 实际任务处理逻辑
    }
   
    // 模拟HashedWheelTimer类
    public static class HashedWheelTimer {
      private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
      private final long tickDuration;
      private final TimeUnit unit;
      
      public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) {
            this.tickDuration = tickDuration;
            this.unit = unit;
      }
      
      public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) {
            long delayMillis = timeUnit.toMillis(delay);
            scheduler.schedule(
                () -> task.run(null),
                delayMillis,
                TimeUnit.MILLISECONDS
            );
      }
      
      public interface TimerTask {
            void run(Timeout timeout);
      }
      
      public interface Timeout {
      }
    }
}
优缺点

长处:


[*]即时触发,无需轮询
[*]高效的时间轮算法
[*]可以跨应用订阅任务
[*]分离任务调度和执行,降低耦合
缺点:


[*]依靠当地时间轮,非纯Redis实现
[*]Pub/Sub模式无消息长期化,大概丢失消息
[*]服务重启时需要重修时间轮
[*]订阅者需要保持连接
4. 基于Redis Stream的延时队列

原理

Redis 5.0引入的Stream是一个强盛的数据结构,专为消息队列设计。联合Stream的消费组和确认机制,可以构建可靠的延时队列。
代码实现

public class RedisStreamDelayQueue {
    private final StringRedisTemplate redisTemplate;
    private final String delayQueueKey = "delay_queue:stream";
    private final String consumerGroup = "delay_queue_consumers";
    private final String consumerId = UUID.randomUUID().toString();
   
    public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) {
      this.redisTemplate = redisTemplate;
      
      // 创建消费者组
      try {
            redisTemplate.execute((RedisCallback<String>) connection -> {
                connection.streamCommands().xGroupCreate(
                  delayQueueKey.getBytes(),
                  consumerGroup,
                  ReadOffset.from("0"),
                  true
                );
                return "OK";
            });
      } catch (Exception e) {
            // 消费者组可能已存在
            System.out.println("Consumer group may already exist: " + e.getMessage());
      }
    }
   
    /**
   * 添加延时任务
   */
    public void addTask(String taskInfo, long delaySeconds) {
      long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
      
      Map<String, Object> task = new HashMap<>();
      task.put("executeTime", String.valueOf(executeTime));
      task.put("taskInfo", taskInfo);
      
      redisTemplate.opsForStream().add(delayQueueKey, task);
      System.out.println("Task added: " + taskInfo + ", execute at: " + executeTime);
    }
   
    /**
   * 获取待执行的任务
   */
    public List<String> pollTasks() {
      long now = System.currentTimeMillis();
      List<String> readyTasks = new ArrayList<>();
      
      // 读取尚未处理的消息
      List<MapRecord<String, Object, Object>> records = redisTemplate.execute(
            (RedisCallback<List<MapRecord<String, Object, Object>>>) connection -> {
                return connection.streamCommands().xReadGroup(
                  consumerGroup.getBytes(),
                  consumerId.getBytes(),
                  StreamReadOptions.empty().count(10),
                  StreamOffset.create(delayQueueKey.getBytes(), ReadOffset.from(">"))
                );
            }
      );
      
      if (records != null) {
            for (MapRecord<String, Object, Object> record : records) {
                String messageId = record.getId().getValue();
                Map<Object, Object> value = record.getValue();
               
                long executeTime = Long.parseLong((String) value.get("executeTime"));
                String taskInfo = (String) value.get("taskInfo");
               
                // 检查任务是否到期
                if (executeTime <= now) {
                  readyTasks.add(taskInfo);
                  
                  // 确认消息已处理
                  redisTemplate.execute((RedisCallback<String>) connection -> {
                        connection.streamCommands().xAck(
                            delayQueueKey.getBytes(),
                            consumerGroup.getBytes(),
                            messageId.getBytes()
                        );
                        return "OK";
                  });
                  
                  // 可选:从流中删除消息
                  redisTemplate.opsForStream().delete(delayQueueKey, messageId);
                } else {
                  // 任务未到期,放回队列
                  redisTemplate.execute((RedisCallback<String>) connection -> {
                        connection.streamCommands().xAck(
                            delayQueueKey.getBytes(),
                            consumerGroup.getBytes(),
                            messageId.getBytes()
                        );
                        return "OK";
                  });
                  
                  // 重新添加任务(可选:使用延迟重新入队策略)
                  Map<String, Object> newTask = new HashMap<>();
                  newTask.put("executeTime", String.valueOf(executeTime));
                  newTask.put("taskInfo", taskInfo);
                  redisTemplate.opsForStream().add(delayQueueKey, newTask);
                }
            }
      }
      
      return readyTasks;
    }
   
    /**
   * 启动任务处理器
   */
    public void startTaskProcessor() {
      ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
      scheduler.scheduleAtFixedRate(() -> {
            try {
                List<String> tasks = pollTasks();
                for (String task : tasks) {
                  processTask(task);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
      }, 0, 1, TimeUnit.SECONDS);
    }
   
    private void processTask(String taskInfo) {
      System.out.println("Processing task: " + taskInfo);
      // 实际任务处理逻辑
    }
}
优缺点

长处:


[*]支持消费者组和消息确认,提供可靠的消息处理
[*]内置消息长期化机制
[*]支持多消费者并行处理
[*]消息ID包含时间戳,便于排序
缺点:


[*]要求Redis 5.0+版本
[*]实现相对复杂
[*]仍需轮询获取到期任务
[*]对未到期任务的处理相对繁琐
性能对比与选型建议

实现方式性能可靠性实现复杂度内存占用实用场景Sorted Set★★★★☆★★★☆☆低中任务量适中,需要精确调度List + 轮询★★★★★★★★☆☆中低高并发,延时精度要求不高Pub/Sub + 时间轮★★★★★★★☆☆☆高低实时性要求高,可容忍服务重启丢失Stream★★★☆☆★★★★★高中可靠性要求高,需要消息确认 总结

在实际应用中,可根据系统规模、性能需求、可靠性要求和实现复杂度等因素举行选择,也可以组合多种方式打造更符合业务需求的延时队列解决方案。无论选择哪种实现,都应关注可靠性、性能和监控等方面,确保延时队列在生产情况中稳定运行。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 基于Redis的4种延时队列实现方式