Redis实现延迟队列

打印 上一主题 下一主题

主题 1794|帖子 1794|积分 5382

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
利用Redis实现延迟队列

实现思绪

redis作为一款高性能的NoSQL数据库,具备快熟读写,高并发,数据持久化等特点,非常实用与实现延迟队列 ,redis提供了丰富的数据布局.
其中利用redis的ZSET聚集 (有序聚集)数据布局就可以实现一个简单的延迟队列
redis的zset数据布局中的每个元素都有一个分数score和一个值value,我们可以将任务的实验时间戳作为score,
将任务数据作为value,将任务插入到zset中,每个任务有一个唯一的id(比如订单id),以及任务实验时间(比如30min),
任务内容(比如订单超时支付系统自动取消)等信息体。然后另起一个线程,该线程会周期性地从zset中取出score最小
(即最早要实验的)的任务,如果该任务的score小于当前时间戳,则实验任务,否则等候一段时间再次检查,
直到任务可以实验,实验任务后,通过Redis的remove命令删除已经成功实验的任务即可。
详细步骤

本文将介绍怎样利用Redis的Sorted Set数据布局来实现延迟队列,并提供一个完整的示例代码。同时,我们还将会给出对应的测试用例和测试结果。
如下我先给同学们概括下,针对Spring Boot项目,怎样利用Redis实现延迟队列的一些实现步骤?

  • 引入相干依靠 (集成redis)
  1.         <!--集成redis-->
  2.         <dependency>
  3.             <groupId>org.springframework.boot</groupId>
  4.             <artifactId>spring-boot-starter-data-redis</artifactId>
  5.         </dependency>
复制代码

  • 设置redis
  1. #redis配置
  2. Spring:
  3.   redis:
  4.     database: 0    #Redis数据库索引(默认为0)
  5.     host: 127.0.0.1  #redis服务器ip,由于我是搭建在本地,固指向本地ip
  6.     port: 6379  #redis服务器连接端口
  7.     password:    #redis服务器连接密码(默认为空)
  8.     # 连接池配置
  9.     jedis.pool:
  10.       max-active: 20      #连接池最大连接数(使用负值表示没有限制)
  11.       max-wait: -1     #连接池最大阻塞等待时间(使用负值表示没有限制)
  12.       max-idle: 10        #连接池中的最大空闲连接
  13.       min-idle: 0         #连接池中的最小空闲连接
  14.       timeout: 1000      #连接超时时间(毫秒)。我设置的是1秒
复制代码

  • 创建redis设置
  1. @Configuration
  2. public class RedisConfig {
  3.     /**
  4.      * RedisTemplate配置
  5.      */
  6.     @Bean("redisTemplate")
  7.     public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
  8.         RedisTemplate<Object, Object> template = new RedisTemplate<>();
  9.         template.setConnectionFactory(redisConnectionFactory);
  10.         // 使用fastjson进行序列化处理,提高解析效率
  11.         FastJsonRedisSerializer<Object> serializer = new FastJsonRedisSerializer<Object>(Object.class);
  12.         // value值的序列化采用fastJsonRedisSerializer
  13.         template.setValueSerializer(serializer);
  14.         template.setHashValueSerializer(serializer);
  15.         // key的序列化采用StringRedisSerializer
  16.         template.setKeySerializer(new StringRedisSerializer());
  17.         template.setHashKeySerializer(new StringRedisSerializer());
  18.         template.setConnectionFactory(redisConnectionFactory);
  19.         // 使用fastjson时需设置此项,否则会报异常not support type
  20.         ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
  21.         return template;
  22.     }
  23.     /**
  24.      * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
  25.      * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
  26.      *
  27.      * @param connectionFactory
  28.      * @return
  29.      */
  30.     @Bean
  31.     RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
  32.         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  33.         container.setConnectionFactory(connectionFactory);
  34.         return container;
  35.     }
  36. }
复制代码

  • 序列化
  1. /**   
  2. * @Description:使用fastjson实现redis的序列化   
  3. */
  4. public class FastJsonRedisSerializer<T> implements RedisSerializer<T> {
  5.         public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
  6.          
  7.     private Class<T> clazz;
  8.     public FastJsonRedisSerializer(Class<T> clazz) {
  9.         super();
  10.         this.clazz = clazz;
  11.     }
  12.     @Override
  13.     public byte[] serialize(T t) throws SerializationException {
  14.         if (t == null) {
  15.             return new byte[0];
  16.         }
  17.         return JSON.toJSONString(t, SerializerFeature.WriteClassName).getBytes(DEFAULT_CHARSET);
  18.     }
  19.     @Override
  20.     public T deserialize(byte[] bytes) throws SerializationException {
  21.         if (bytes == null || bytes.length <= 0) {
  22.             return null;
  23.         }
  24.         String str = new String(bytes, DEFAULT_CHARSET);
  25.         return (T) JSON.parseObject(str, clazz);
  26.     }
  27. }
复制代码

  • 创建消息类 DelayMessage


  • 这里定义一个消息类 , 包含消息的id,消息内容,以及到期时间(消息的实验时间) , 代码如下
  1. @Data
  2. @AllArgsConstructor
  3. @NoArgsConstructor
  4. public class DelayMessage implements Serializable {
  5.     /**
  6.      * 切记实例化
  7.      */
  8.     private static final long serialVersionUID = -7671756385477179547L;
  9.     /**
  10.      * 消息 id
  11.      */
  12.     private String id;
  13.     /**
  14.      * 消息内容
  15.      */
  16.     private String content;
  17.     /**
  18.      * 消息到期时间(指定当前消息在什么时间开始消费(时间戳))
  19.      */
  20.     private long expireTime;
  21. }
复制代码

  • 创建延迟队列类 DelayQueue


  • 创建一个延迟队列类 , 提供,添加消息,删除消息,和获取消息的方法 , 具体代码如下
  1. @Component
  2. public class DelayQueue {
  3.     /**
  4.      * key后面拼接当前机器的内网ip : 用于集群区分,解决集群出现的并发问题
  5.      */
  6.     private static final String KEY = "delay_queue:" + getHostAddress();
  7.     @Autowired
  8.     private RedisTemplate redisTemplate;
  9.     /**
  10.      * 添加消息到延时队列中
  11.      */
  12.     public void put(DelayMessage message) {
  13.         redisTemplate.opsForZSet().add(KEY, message, message.getExpireTime());
  14.     }
  15.     /**
  16.      * 从延时队列中删除消息
  17.      */
  18.     public Long remove(DelayMessage message) {
  19.         Long remove = redisTemplate.opsForZSet().remove(KEY, message);
  20.         return remove;
  21.     }
  22.     /**
  23.      * 获取延时队列中已到期的消息
  24.      */
  25.     public List<DelayMessage> getExpiredMessages() {
  26. //        1 : 获取到开始时间
  27.         long minScore = 0;
  28. //        2 : 获取到结束时间
  29.         long maxScore = System.currentTimeMillis();
  30. //        3 : 获取到指定范围区间的数据列表
  31.         Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(KEY, minScore, maxScore);
  32.         if (messages == null || messages.isEmpty()) {
  33.             return Collections.emptyList();
  34.         }
  35. //        4 : 把对象进行封装,返回
  36.         List<DelayMessage> result = new ArrayList<>();
  37.         for (Object message : messages) {
  38.             DelayMessage delayMessage = JSONObject.parseObject(JSON.toJSONString(message), DelayMessage.class);
  39.             result.add(delayMessage);
  40.         }
  41.         return result;
  42.     }
  43.     /**
  44.      * 获取地址(服务器的内网地址)(内网ip)
  45.      *
  46.      * @return
  47.      */
  48.     public static String getHostAddress() {
  49.         InetAddress localHost = null;
  50.         try {
  51.             localHost = InetAddress.getLocalHost();
  52.         } catch (
  53.                 UnknownHostException e) {
  54.             e.printStackTrace();
  55.         }
  56.         return localHost.getHostAddress();
  57.     }
  58. }
复制代码

  • 创建 DelayMessageHandler 消息处理类


  • 创建一个消息处理累, 添加一个处理过期的消息,写个定时任务,隔断1s轮询延时队列中已到期的任务,如果获取不到为空,
    则不举行消息处理的逻辑 , 反之继续轮询
  1. @Component
  2. public class DelayMessageHandler {
  3.    
  4.     public static SimpleDateFormat dateTimeFormater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  5.     @Autowired
  6.     private DelayQueue delayQueue;
  7.     /**
  8.      * 处理已到期的消息(轮询)
  9.      */
  10.     @Scheduled(fixedDelay = 1000)
  11.     public void handleExpiredMessages() {
  12.         String currentTime = getCurrentTime();
  13. //      1 : 扫描任务,并将需要执行的任务加入到任务队列中
  14.         List<DelayMessage> messages = delayQueue.getExpiredMessages();
  15.         List<DelayMessage> messages_2 = delayQueue.getExpiredMessages();
  16.         System.out.println(currentTime + " 待处理消息数量:" + messages.size());
  17. //      2 : 开始处理消息
  18.         if (!messages.isEmpty()) {
  19.             for (DelayMessage message : messages) {
  20.                     System.out.println(message.getId() + " --> 消息开始处理");
  21.                     try {
  22. //                      2.1.1 : 模拟睡眠3秒,任务的处理时间(实际可能会更长)
  23.                         Thread.sleep(3000);
  24.                     } catch (Exception e) {
  25.                         e.printStackTrace();
  26.                     }
  27.                     System.out.println(message.getId() + " --> 消息处理结束");
  28. //                2.2 : 处理完消息,删除消息
  29.                     delayQueue.remove(message);
  30.             }
  31.         }
  32.     }
  33.     /**
  34.      * 获取到的当前时分秒
  35.      *
  36.      * @return
  37.      */
  38.     public static String getCurrentTime() {
  39.         String format = dateTimeFormater.format(new Date());
  40.         return format;
  41.     }
  42. }
复制代码
实验结果 : (我们可以看到 , 消息正在慢慢的被消耗)
  1. 2023-11-03 15:06:01 待处理消息数量:0
  2. 2023-11-03 15:06:02 待处理消息数量:0
  3. 2023-11-03 15:06:03 待处理消息数量:0
  4. 2023-11-03 15:06:04 待处理消息数量:0
  5. # 此处开始调用接口 , 往延迟队列中添加消息
  6. 2023-11-03 15:06:05 待处理消息数量:4
  7. 2023-11-03 15:06:05 :1 --> 消息开始处理
  8. 2023-11-03 15:06:05 :1 --> 消息处理结束
  9. 2023-11-03 15:06:05 :13 --> 消息开始处理
  10. 2023-11-03 15:06:05 :13 --> 消息处理结束
  11. 2023-11-03 15:06:05 :5 --> 消息开始处理
  12. 2023-11-03 15:06:05 :5 --> 消息处理结束
  13. 2023-11-03 15:06:05 :9 --> 消息开始处理
  14. 2023-11-03 15:06:05 :9 --> 消息处理结束
  15. 2023-11-03 15:06:18 待处理消息数量:12
  16. 2023-11-03 15:06:18 :10 --> 消息开始处理
  17. 2023-11-03 15:06:18 :10 --> 消息处理结束
  18. 2023-11-03 15:06:18 :14 --> 消息开始处理
  19. 2023-11-03 15:06:18 :14 --> 消息处理结束
  20. 2023-11-03 15:06:18 :2 --> 消息开始处理
  21. 2023-11-03 15:06:18 :2 --> 消息处理结束
  22. 2023-11-03 15:06:18 :6 --> 消息开始处理
复制代码
此处我们会发现一个问题 , @Scheduled 注解是轮询实验的 , 如果上一个任务没实验完毕 , 定时器会等候 , 等候上一次实验完毕
也就是说 , @Scheduled 注解表示同步实验的 , 那么就会出现一个问题 , 每一个消息处理都会耗时3秒,
假设有 A B 两条消息 , 消息的过期时间是一致的 , 那么这两个消息会被同时从缓存中取出准备消耗 ,假设A消息第一个开始消耗 ,
那么B消息,就要等候3秒 , 等A消息实验完成,才开始消耗B消息 , 那么就会出现消息堆积,延迟消耗的环境 , 原来14:00就要消耗的消息,比及了 14:10 才开始消耗(可能会更晚) ,
如果消息量充足大的环境下 , 就会出现问题 , 内存走漏 , 消息堆积 , 延迟消耗等环境
解决办法 : 开线程去实验 (利用线程池) , 利用以下代码 , 我们消耗一条消息,就必要创建一个线程去背景消耗 , 就会解决了上面的问题 ,
(这里必要用到线程池,我为了偷懒 ,就简单模仿了一下)
  1.     /**
  2.      * 处理已到期的消息(轮询)
  3.      */
  4.     @Scheduled(fixedDelay = 1000)
  5.     public void handleExpiredMessages() {
  6.         String currentTime = getCurrentTime();
  7. //      1 : 扫描任务,并将需要执行的任务加入到任务队列中
  8.         List<DelayMessage> messages = delayQueue.getExpiredMessages();
  9.         System.out.println(currentTime + " 待处理消息数量:" + messages.size());
  10. //      2 : 开始处理消息
  11.         if (!messages.isEmpty()) {
  12.             for (DelayMessage message : messages) {
  13. //                2.1 : 开启线程异步处理消息:不让处理消息的时间阻塞当前线程
  14.                 new Thread(() -> {
  15.                     System.out.println(currentTime + " :" + message.getId() + " --> 消息开始处理");
  16.                     try {
  17. //                      2.1.1 : 模拟睡眠3秒,任务的处理时间(实际可能会更长)
  18.                         Thread.sleep(3000);
  19.                     } catch (Exception e) {
  20.                         e.printStackTrace();
  21.                     }
  22.                     System.out.println(currentTime + " :" + message.getId() + " --> 消息处理结束");
  23. //                2.2 : 处理完消息,删除消息
  24.                     delayQueue.remove(message);
  25.                 }).start();
  26.             }
  27.         }
  28.     }
复制代码
实验结果 : 开启线程异步实验消息
  1. 2023-11-03 15:18:33 待处理消息数量:0
  2. 2023-11-03 15:18:34 待处理消息数量:0
  3. 2023-11-03 15:18:35 待处理消息数量:0
  4. 2023-11-03 15:18:36 待处理消息数量:4
  5. 2023-11-03 15:18:36 :1 --> 消息开始处理
  6. 2023-11-03 15:18:36 :13 --> 消息开始处理
  7. 2023-11-03 15:18:36 :5 --> 消息开始处理
  8. 2023-11-03 15:18:36 :9 --> 消息开始处理
  9. 2023-11-03 15:18:37 待处理消息数量:4
  10. 2023-11-03 15:18:37 :1 --> 消息开始处理  // 注意:(此消息被重复消费了)
  11. 2023-11-03 15:18:37 :13 --> 消息开始处理
  12. 2023-11-03 15:18:37 :5 --> 消息开始处理
  13. 2023-11-03 15:18:37 :9 --> 消息开始处理
  14. 2023-11-03 15:18:38 待处理消息数量:8
  15. 2023-11-03 15:18:38 :1 --> 消息开始处理
  16. 2023-11-03 15:18:38 :5 --> 消息开始处理
  17. 2023-11-03 15:18:38 :9 --> 消息开始处理
  18. 2023-11-03 15:18:38 :13 --> 消息开始处理
  19. 2023-11-03 15:18:38 :10 --> 消息开始处理
  20. 2023-11-03 15:18:38 :6 --> 消息开始处理
  21. 2023-11-03 15:18:38 :2 --> 消息开始处理
  22. 2023-11-03 15:18:38 :14 --> 消息开始处理
  23. 2023-11-03 15:18:36 :9 --> 消息处理结束
  24. 2023-11-03 15:18:36 :5 --> 消息处理结束
  25. 2023-11-03 15:18:36 :1 --> 消息处理结束
  26. 2023-11-03 15:18:36 :13 --> 消息处理结束
复制代码
我们利用了开启新线程的方式来消耗消息 , 消息延迟的问题解决了 , 但是又出现了新的问题 , 消息会出现重复消耗的环境
问题的缘故原由 : 我们第一次定时 , 取出了符合条件的4条过期的消息 , 我们开启了4个线程去实验 , 当第二秒 , 我们又获取了符合条件的消息 ,
因为第一次获取的消息实验必要时间 , 那么我们第二次拿消息的时候 , 就会有可能把第一次的4条消息 , 也拿出来 , 然后开线程再次消耗 , 就会出现重复消耗的环境了
解决方案 :
这个问题出现缘故原由是 , 当前线程不知道这个消息已经被其他线程正在处理了 ,只要解决这个问题 ,
当前线程开始处理这个消息,先判断当前消息有没有被其他线程处理 , 如果正在处理,则不举行处理了 , 如果没处理,则开始举行处理
我们知道 redis删除元素的 remove() 方法 , 有一个返回值 , 表示删除的状态 ,
我们可以在消息处理前 , 先 remove() 这个消息 , 如果 remove()成功,则表示当前消息没有被消耗 , 如果 remove()失败,则表示该消息已经被消耗了
  1.     /**
  2.      * 处理已到期的消息(轮询)
  3.      */
  4.     @Scheduled(fixedDelay = 1000)
  5.     public void handleExpiredMessages() {
  6.         String currentTime = getCurrentTime();
  7. //      1 : 扫描任务,并将需要执行的任务加入到任务队列中
  8.         List<DelayMessage> messages = delayQueue.getExpiredMessages();
  9.         System.out.println(currentTime + " 待处理消息数量:" + messages.size());
  10. //      2 : 开始处理消息
  11.         if (!messages.isEmpty()) {
  12.             for (DelayMessage message : messages) {
  13. //                2.1 : 处理消息:先删除消息,获取当前消息是否已经被其他人消费
  14.                 Long remove = delayQueue.remove(message);
  15.                 if (remove > 0) {
  16. //                2.2 : 开启线程异步处理消息:不让处理消息的时间阻塞当前线程
  17.                     new Thread(() -> {
  18.                         System.out.println(currentTime + " :" + message.getId() + " --> 消息开始处理");
  19.                         try {
  20. //                      2.1.1 : 模拟睡眠3秒,任务的处理时间(实际可能会更长)
  21.                             Thread.sleep(3000);
  22.                         } catch (Exception e) {
  23.                             e.printStackTrace();
  24.                         }
  25.                         System.out.println(currentTime + " :" + message.getId() + " --> 消息处理结束");
  26.                     }).start();
  27.                 }
  28.             }
  29.         }
  30.     }
复制代码


  • 实验结果 : 我们会发现 , 重复消耗的问题 , 解决了
  1. 2023-11-03 15:31:36 待处理消息数量:4
  2. 2023-11-03 15:31:36 :1 --> 消息开始处理
  3. 2023-11-03 15:31:36 :13 --> 消息开始处理
  4. 2023-11-03 15:31:36 :5 --> 消息开始处理
  5. 2023-11-03 15:31:36 :9 --> 消息开始处理
  6. 2023-11-03 15:31:37 待处理消息数量:0
  7. 2023-11-03 15:31:38 待处理消息数量:4
  8. 2023-11-03 15:31:38 :10 --> 消息开始处理
  9. 2023-11-03 15:31:38 :14 --> 消息开始处理
  10. 2023-11-03 15:31:38 :2 --> 消息开始处理
  11. 2023-11-03 15:31:38 :6 --> 消息开始处理
  12. 2023-11-03 15:31:36 :9 --> 消息处理结束
  13. 2023-11-03 15:31:36 :5 --> 消息处理结束
  14. 2023-11-03 15:31:36 :13 --> 消息处理结束
  15. 2023-11-03 15:31:36 :1 --> 消息处理结束
  16. 2023-11-03 15:31:39 待处理消息数量:0
  17. 2023-11-03 15:31:40 待处理消息数量:0
  18. 2023-11-03 15:31:38 :10 --> 消息处理结束
  19. 2023-11-03 15:31:38 :2 --> 消息处理结束
  20. 2023-11-03 15:31:38 :6 --> 消息处理结束
  21. 2023-11-03 15:31:38 :14 --> 消息处理结束
  22. 2023-11-03 15:31:41 待处理消息数量:4
  23. 2023-11-03 15:31:41 :11 --> 消息开始处理
  24. 2023-11-03 15:31:41 :15 --> 消息开始处理
  25. 2023-11-03 15:31:41 :3 --> 消息开始处理
  26. 2023-11-03 15:31:41 :7 --> 消息开始处理
  27. 2023-11-03 15:31:42 待处理消息数量:0
  28. 2023-11-03 15:31:43 待处理消息数量:0
  29. 2023-11-03 15:31:41 :7 --> 消息处理结束
  30. 2023-11-03 15:31:41 :11 --> 消息处理结束
  31. 2023-11-03 15:31:41 :3 --> 消息处理结束
  32. 2023-11-03 15:31:41 :15 --> 消息处理结束
复制代码
但是还会出现问题 , 如果服务重启 , 或者服务宕机 , 那么当前实验中的消息 , 在下次服务启动的时候 , 就会出现消息丢失的环境
我给出的解决方案就是 : 创建一张临时数据表 , 当消息开始消耗的时候 ,在表中添加一条纪录,当消息消耗成功,则把临时表中的纪录删除
当服务重启 , 则把临时表中的纪录,读到延迟队列中 , 就解决了消息丢失的环境
关键点

  • 利用 缓存的key带内网ip的方式,解决了集群,多呆板会出现的所有问题.
  • 利用 背景线程,线程池,解决了消息堆积,延迟消耗的问题.
  • 利用 先删除key的方法 , 解决了消息重复消耗的问题.
  • 把当前处理的消息举行持久化,解决了消息丢失的问题.
这个只是我给出的解决方案 , 并不是美满的 , 如果想实现消息队列 , 最好是利用 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

魏晓东

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表