基于redis实现延长队列

打印 上一主题 下一主题

主题 1048|帖子 1048|积分 3144

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Redis实现延时队列

延时队列里装的紧张是延时任务,用延时队列来维护延时任务的实行时间。
1、延时队列有哪些使用情景?

1、如果哀求加锁没加成功
可以将这个哀求扔到延时队列里,延后处理。
2、业务中有延时任务的必要
比如说,文章定时发布。
2、基于redis实现延时队列

2.1.使用zset实现



  • 使用redis的zset来实现延时队列
  • 使用zset的添加、查询、删除下令

    • ZADD下令:
       
    1. ZADD key score member [score member...]
    复制代码
       

    • ZRANGEBYSCORE下令:
       
    1. ZRANGEBYSCORE key min max
    复制代码
       

    • ZREM下令:
       
    1. ZREM key member [member ...]
    复制代码
       
  • 1)将延时任务的【到期实行时间作为zset的score】、【延时任务序列化为一个字符串作为zset的member】使用zadd下令装进zset中。
  • 2)zset会为这些延时任务按照到期实行时间排序。
  • 3)设置多线程轮询zset获取到期的任务 | 用@Schedule注解标注定时轮询
  • 4)在zset中删除这些到期任务
  • 5)将这些到期任务放进list中,在list中使用【blpop/brpop】消费任务。
优化1:
对于第3步来说,由于是多线程,所以同一个到期任务可能会被多个进程获取到,然后再使用 zrem 举行争抢。终极一定只有一个进程zrem成功。因此对于那些zrem没成功的进程相当于白取任务。所以我们可以进一步优化:【使用 lua scripting 让 zrangebyscore 和 zrem 一同挪到服务器端举行原子化操作】。
优化2:
使用redis管道操作:通过管道方式将获取到的到期任务push到list中。

优化:redis管道

1)Redis的消息交互


客户端将哀求传送给服务器,服务器处理完毕后,再将响应回复给客户端。这要耗费一个网络数据包来回的时间。
   具体过程是如许的:
  1、客户端进程调用 write 将消息写到操作系统内核为套接字分配的发送缓冲 send buffer。
  2、客户端操作系统内核将发送缓冲的内容发送到网卡,网卡硬件将数据通过「网际路由」送到服务器的网卡。
  3、服务器操作系统内核将网卡的数据放到内核为套接字分配的接收缓冲 recv buffer。
  4、服务器进程调用 read 从接收缓冲中取出消息举行处理。
  5、服务器进程调用 write 将响应消息写到内核为套接字分配的发送缓冲 send buffer。
  6、服务器操作系统内核将发送缓冲的内容发送到网卡,网卡硬件将数据通过「网际路由」送到客户端的网卡。
  7、客户端操作系统内核将网卡的数据放到内核为套接字分配的接收缓冲 recv buffer。
  8、客户端进程调用 read 从接收缓冲中取出消息返回给上层业务逻辑举行处理。
  9、竣事。
  由上面的具体过程,可以知道:
【write操作】只负责将数据写到发送缓冲。但是如果发送缓冲满了,那么就必要等候缓冲空出空闲空间。这个就是写操作 IO 操作的真正耗时。
【read操作】只负责将数据从接收缓冲中取出来就完事了。如果缓冲是空的,那么就必要等候数据到来,这个就是读操作 IO 操作的真正耗时。
因此,一条消息的消息交互所耗费的时间紧张在于:
write 操作几乎没有耗时,直接写到发送缓冲就返回,而 read 就会比力耗时了,因为它要等候消息颠末网络路由到目的机器处理后的响应消息,再回送到当前的内核读缓冲才可以返回。这才是一个网络来回的真正开销。
2)连续多条消息指令的消息交互

(1)普通方式:

如果要实行多条消息指令,则必要耗费多个网络来回的时间。
(2)使用redis管道:
管道方式是客户端通过将多条下令white到发送缓存中,然后再一次性发送到服务端的缓存中,服务端处理完这些下令后将响应效果写到发送缓存中,最后再发送给接收缓存,客户端第一次read的时候,必要等候一个网络来回,而后续的read操作直接从接收缓存中取就行了,因此总的耗费时间只有一个网络来回的时间。

以上两种方式的示例代码:
  1. public class RedisPipelineTestDemo {
  2.    public static void main(String[] args) {
  3.        //连接redis
  4.        Jedis jedis = new Jedis("10.101.17.180", 6379);
  5.        //jedis逐一给每个set新增一个value
  6.        String zSetKey = "Pipeline-test-set";
  7.        int size = 100000;
  8.         
  9.        //普通方式
  10.        long begin = System.currentTimeMillis();
  11.        for (int i = 0; i < size; i++) {
  12.            jedis.sadd(zSetKey + i, "aaa");
  13.        }
  14.        log.info("Jedis逐一给每个set新增一个value耗时:{}ms", (System.currentTimeMillis() - begin));
  15.        //管道方式      
  16.        Pipeline Pipeline = jedis.Pipelined();
  17.        begin = System.currentTimeMillis();
  18.        for (int i = 0; i < size; i++) {            
  19.            Pipeline.sadd(zSetKey + i, "bbb");
  20.        }        
  21.        Pipeline.sync();
  22.        log.info("Jedis Pipeline模式耗时:{}ms", (System.currentTimeMillis() - begin));
  23.    }
  24. }
  25. 普通方式:162655ms
  26. 管道方式:504ms
复制代码
2.2.使用redis键空间通知

redis的PubSub,发布者订阅者模子。

   摘自javaguide:
  在 pub/sub 模式下,生产者必要指定消息发送到哪个 channel 中,而消费者则订阅对应的 channel 以获取消息。
  Redis 中有很多默认的 channel,这些 channel 是由 Redis 本身向它们发送消息的,而不是我们本身编写的代码。其中,__keyevent@0__:expired 就是一个默认的 channel,负责监听 key 的过期变乱。也就是说,当一个 key 过期之后,Redis 会发布一个 key 过期的变乱到__keyevent@<db>__:expired这个 channel 中。
  我们只必要监听这个 channel,就可以拿到过期的 key 的消息,进而实现了延时任务功能。
  这个功能Redis官方称为keyspace notifications,字面意思就是键空间通知。
  代码实现:
Spring已经实现了监听__keyevent@*__:expired这个channel这个功能,__keyevent@*__:expired中的*代表通配符的意思,监听所有的数据库。
在设置类中:
  1. @Configuration
  2. public class RedisConfiguration {
  3.    @Bean
  4.    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
  5.        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
  6.        redisMessageListenerContainer.setConnectionFactory(connectionFactory);
  7.        return redisMessageListenerContainer;
  8.    }
  9.    @Bean
  10.    public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {
  11.        return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
  12.    }
  13.    
  14.    /**
  15.    KeyExpirationEventMessageListener实现了对__keyevent@*__:expiredchannel的监听
  16.    
  17.    当KeyExpirationEventMessageListener收到Redis发布的过期Key的消息的时候,会发布RedisKeyExpiredEvent事件
  18.    **/
  19. }
复制代码
所以我们只必要监听RedisKeyExpiredEvent变乱就可以拿到过期消息的Key,也就是延长消息。对RedisKeyExpiredEvent变乱的监听实现MyRedisKeyExpiredEventListener。
  1. @Component
  2. public class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent> {
  3.    @Override
  4.    public void onApplicationEvent(RedisKeyExpiredEvent event) {
  5.        byte[] body = event.getSource();
  6.        System.out.println("获取到延迟消息:" + new String(body));
  7.    }
  8. }
复制代码
缺点:
1、可否及时的监听到过期键过期,取决于【redis的过期键删除策略】。由于可能出现redis过期键已颠末期,但是redis还未将其删除从而导致无法监听到过期键过期。使得最后消息延长。

补充:redis过期键删除策略



  • 定时删除
  • 惰性删除
1)定时删除

Redis 默认会每秒举行十次过期扫描,过期扫描不会遍历过期字典中所有的 key,而是接纳了一种简朴的贪心策略。
1、从过期字典中随机 20 个 key;
2、删除这 20 个 key 中已颠末期的 key;
3、如果过期的 key 比率超过 1/4,那就重复步骤 1;
同时,为了保证过期扫描不会出现循环太过,导致线程卡死征象,算法还增加了扫描时间的上限,默认不会超过【 25ms】。
大量key同时过期存在的问题:
但是如果一个Redis 实例中所有的 key 在同一时间过期,即使有25ms的扫描时间上限,如果此时有101 个客户端同时将哀求发过来,25ms后才能处理一个客户端的哀求,然后25ms后再处理一个客户端的哀求,那么第101个客户端必要等候2500ms后才能被处理哀求。
大量key同时过期解决方法:
给过期时间设置一个随机范围,而不能全部在同一时间过期。
2)惰性删除

在客户端访问这个 key 的时候,redis 对 key 的过期时间举行检查,如果过期了就立即删除。

参考:

《Redis深度历险》
Redis常晤面试题总结(上) | JavaGuide


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

圆咕噜咕噜

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表