【中间件系列】浅析redis是否得当做消息队列

铁佛  金牌会员 | 2024-6-14 21:20:58 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 988|帖子 988|积分 2964

  用redis也是比较久了,并且也对其他消息中间件也用了相称多的时间,如今就redis是否得当做消息队列来梳理下,获取梳理完之后,可以有一个更加清晰的认知。笔者会从以下几个方面举行梳理。

一、简朴的list消息队列

  众所周知,redis常见的数据结构有StringHashListSetzset。其中List可以是一个列表结构。可以通过LPUSHRPOP两个命令来实现一个简朴的队列。


  • LPUSH 将元素依次插入到列表头部
  • RPOP 获取末了一个元素,并且删除。
  如下图所示:生产者通过LPUSH命令,依次插入a、b、c、d四个元素。斲丧者通过RPOP命令举行斲丧。

1.命令示例

生产者:
  1. # 通过LPUSH命令往test_queue填充a、b、c、d
  2. 127.0.0.1:6379> LPUSH test_queue a
  3. (integer) 1
  4. 127.0.0.1:6379> LPUSH test_queue b
  5. (integer) 2
  6. 127.0.0.1:6379> LPUSH test_queue c
  7. (integer) 3
  8. 127.0.0.1:6379> LPUSH test_queue d
  9. (integer) 4
  10. 127.0.0.1:6379>
复制代码
斲丧者:
  1. 消费者
  2. 127.0.0.1:6379> RPOP test_queue
  3. "a"
  4. 127.0.0.1:6379> RPOP test_queue
  5. "b"
  6. 127.0.0.1:6379> RPOP test_queue
  7. "c"
  8. 127.0.0.1:6379> RPOP test_queue
  9. "d"
  10. 127.0.0.1:6379> RPOP test_queue
  11. (nil)
  12. 127.0.0.1:6379>
复制代码
2.伪代码示例

  生产者相对简朴,以简朴的订单支付为例子
  1. //在订单支付成功之后发送给积分系统
  2. public void afterPayHandler(Order order){
  3.         //发送消息到积分系统
  4.         redisTemp.LPUSH("order",order);
  5. }
复制代码
斲丧者
  1. public void orderMessageListener(){
  2.         while(true){
  3.                 //获取订单信息
  4.                 Order order = redisTemp.RPOP("order");
  5.         if(order != null){
  6.             //做积分系统的业务,如添加积分等逻辑。
  7.         }
  8.         }
  9. }
复制代码
  如上所示:斲丧者在斲丧的时间,必须通过循环一直拉取队列数据,达到数据的实时性,但是也出现了CPU空转的问题。如果我们判断空的时间sleep休眠一段时间,那就会存在消息实时性问题。休眠多久合适就成为了难以处置惩罚的问题。
好在redis有阻塞拉取的命令。
BRPOP test_queue 10(秒)。拉取命令,阻塞10秒。如果是0就是一直阻塞。
3.方案优劣


  • 优点:充足简朴,也很好理解。但是我确实是想不到哪个场景得当这个方案(笑哭)。感觉也只能算遍及知识了。
  • 缺点

    • 不支持多斲丧者。任何一个斲丧者将redis的元素拉取删除之后,其他斲丧者都无法再次拉取到。那就只能仅限于一对一斲丧了。
    • 消息丢失。没有ACK机制,如果拉取消息后宕机后,无法正常斲丧,就会导致消息的丢失。

二、Pub/Sub发布订阅

  List数据结构可以认为是开发者为了简朴方便,从而引进的一种消息队列的方式,但是绝不”正宗“。Pub/Sub这种从名字上可以看出来,就是专门为了消息队列而生的。

  ​ 从上图可以看出,发布订阅模式,解决了多斲丧者的问题。但是还是存在两个问题。
1.消息丢失

  发布订阅模型,没有举行消息存储,只是一个单纯的通道,实时的把消息传送给斲丧者。那么这样就会有一个问题,如果斲丧者中间下线,再次上线的时间,只能从最新的位置举行斲丧,这样就会有消息丢失啦。
2.消息堆积


  上文说,发布订阅模型没有基于任何数据雷同,因此,这个操纵不会写入RDB和AOF中(redis持久化机制)。别的,在消息堆积的时间,数据是通过Buffer缓冲区实现的。这个缓冲区的大小可以在redis中举行设置。如果超过了缓冲区设置的上限,此时,Redis 就会「强制」把这个斲丧者踢下线。
  总的说,这个发布订阅模式相对比较脆弱,虽然解决了多斲丧者的问题,但是消息同等性较低,消息丢失概率较高(发布版本时重启了就大概丢消息),试用的场景较少。
三、相对成熟的Stream

  Redis5.0 中增加了Stream消息队列相对成熟,解决了较多的问题。

  • 支持消息ACK反馈,在消息斲丧乐成的时间,返回斲丧乐成,才不会再次推送消息。
  • 支持多斲丧者组。
  • 消息堆积问题优化。

1.redis命令介绍

发布命令
  1. 解释
  2. #topic为 myStream1
  3. # * 代表使用自动生成的ID作为消息的ID
  4. # 接下来是多个 field value 组成的信息。
  5. 127.0.0.1:6379> XADD myStream1 * name zhangsan sex 20
  6. "1717500637523-0"
  7. 127.0.0.1:6379> XADD myStream1 * name lisi sex 20
  8. "1717500644429-0"
  9. 127.0.0.1:6379>
复制代码
斲丧命令
  1. # 消费myStream队列的10个数据,最后的0意思是从头开始消费。
  2. 127.0.0.1:6379> XREAD COUNT 10 STREAMS myStream1 0
  3. 1) 1) "myStream1"
  4.    2) 1) 1) "1717500637523-0"
  5.          2) 1) "name"
  6.             2) "zhangsan"
  7.             3) "sex"
  8.             4) "20"
  9.       2) 1) "1717500644429-0"
  10.          2) 1) "name"
  11.             2) "lisi"
  12.             3) "sex"
  13.             4) "20"
  14. 127.0.0.1:6379>
复制代码
2.多斲丧者组测试

  1. #创建一个消费者组为myGroup1并且指定消费位置。最后这个长串是信息的id
  2. 127.0.0.1:6379> XGROUP CREATE myStream1 myGroup1 1717500644429-0
  3. OK
复制代码
  1. #消费者组消费,消费者组为myGroup1 当前消费者id为consumer1 拉取10个信息  注意最后这个 ‘>‘
  2. 127.0.0.1:6379> XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 >
  3. 1) 1) "myStream1"
  4.    2) 1) 1) "1717501299234-0"
  5.          2) 1) "name"
  6.             2) "lisi"
  7.             3) "sex"
  8.             4) "20"
  9. 127.0.0.1:6379>
复制代码
验证ACK机制

  • myGroup1斲丧者组拉取一次之后将所有的消息拉取回来
  • 由于没有举行消息反馈ACK。所以再次拉取的时间,还是将全量的消息拉取回来。
  • 执行一次ACK命令之后,再次拉取消息,发现少了一条消息。
  • 再次执行ACK命令后,拉取不到消息了。
  1. 127.0.0.1:6379> XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 0
  2. 1) 1) "myStream1"
  3.    2) 1) 1) "1717501299234-0"
  4.          2) 1) "name"
  5.             2) "lisi"
  6.             3) "sex"
  7.             4) "20"
  8.       2) 1) "1717502213457-0"
  9.          2) 1) "name"
  10.             2) "lisi"
  11.             3) "sex"
  12.             4) "20"
  13. 127.0.0.1:6379> XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 0
  14. 1) 1) "myStream1"
  15.    2) 1) 1) "1717501299234-0"
  16.          2) 1) "name"
  17.             2) "lisi"
  18.             3) "sex"
  19.             4) "20"
  20.       2) 1) "1717502213457-0"
  21.          2) 1) "name"
  22.             2) "lisi"
  23.             3) "sex"
  24.             4) "20"
  25. 127.0.0.1:6379>
  26. 127.0.0.1:6379>
  27. 127.0.0.1:6379>
  28. 127.0.0.1:6379> XACK myStream1 myGroup1 1717502213457-0
  29. (integer) 1
  30. 127.0.0.1:6379> XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 0
  31. 1) 1) "myStream1"
  32.    2) 1) 1) "1717501299234-0"
  33.          2) 1) "name"
  34.             2) "lisi"
  35.             3) "sex"
  36.             4) "20"
  37. 127.0.0.1:6379> XACK myStream1 myGroup1 1717501299234-0
  38. (integer) 1
  39. 127.0.0.1:6379> XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 0
  40. 1) 1) "myStream1"
  41.    2) (empty list or set)
  42. 127.0.0.1:6379>
复制代码
3.Stream会持久化吗?

  会,不管是RDB 还是AOF都会写入。所以不用担心宕机的问题。
4.消息堆积如何解决?

  既然会将Stream会举行持久化,那么一定消息也会生存在内存中,但是为了内存爆炸,Stream可以在XADD命令的时间,可以通过MAXLEN命令指定消息的最大长度,在超过最大长度的时间,旧消息会被删除,只保留固定长度的新消息。这样看来,消息堆积的问题只是举行了优化,并没有完美的解决
总结

  到此,获取对于redis消息队列的历史有了肯定的了解,redis作为运行在内存的数据库而言,这个功能已经是很不错了,或许你的场景充足简朴,消息的数目不多,并且对于消息的丢失不是特别的敏感的话,redis的Stream消息队列也是一个不错的选择。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

铁佛

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表