论坛
潜水/灌水快乐,沉淀知识,认识更多同行。
ToB圈子
加入IT圈,遇到更多同好之人。
朋友圈
看朋友圈动态,了解ToB世界。
ToB门户
了解全球最新的ToB事件
博客
Blog
排行榜
Ranklist
文库
业界最专业的IT文库,上传资料也可以赚钱
下载
分享
Share
导读
Guide
相册
Album
记录
Doing
搜索
本版
文章
帖子
ToB圈子
用户
免费入驻
产品入驻
解决方案入驻
公司入驻
案例入驻
登录
·
注册
只需一步,快速开始
账号登录
立即注册
找回密码
用户名
Email
自动登录
找回密码
密码
登录
立即注册
首页
找靠谱产品
找解决方案
找靠谱公司
找案例
找对的人
专家智库
悬赏任务
圈子
SAAS
IT评测·应用市场-qidao123.com
»
论坛
›
物联网
›
物联网
›
【中间件系列】浅析redis是否得当做消息队列 ...
【中间件系列】浅析redis是否得当做消息队列
铁佛
金牌会员
|
2024-6-14 21:20:58
|
显示全部楼层
|
阅读模式
楼主
主题
988
|
帖子
988
|
积分
2964
用redis也是比较久了,并且也对其他消息中间件也用了相称多的时间,如今就redis是否得当做消息队列来梳理下,获取梳理完之后,可以有一个更加清晰的认知。笔者会从以下几个方面举行梳理。
一、简朴的list消息队列
众所周知,redis常见的数据结构有
String
、
Hash
、
List
、
Set
、
zset
。其中List可以是一个列表结构。可以通过
LPUSH
、
RPOP
两个命令来实现一个简朴的队列。
LPUSH 将元素依次插入到列表头部
RPOP 获取末了一个元素,并且删除。
如下图所示:生产者通过LPUSH命令,依次插入a、b、c、d四个元素。斲丧者通过RPOP命令举行斲丧。
1.命令示例
生产者:
# 通过LPUSH命令往test_queue填充a、b、c、d
127.0.0.1:6379> LPUSH test_queue a
(integer) 1
127.0.0.1:6379> LPUSH test_queue b
(integer) 2
127.0.0.1:6379> LPUSH test_queue c
(integer) 3
127.0.0.1:6379> LPUSH test_queue d
(integer) 4
127.0.0.1:6379>
复制代码
斲丧者:
消费者
127.0.0.1:6379> RPOP test_queue
"a"
127.0.0.1:6379> RPOP test_queue
"b"
127.0.0.1:6379> RPOP test_queue
"c"
127.0.0.1:6379> RPOP test_queue
"d"
127.0.0.1:6379> RPOP test_queue
(nil)
127.0.0.1:6379>
复制代码
2.伪代码示例
生产者相对简朴,以简朴的订单支付为例子
//在订单支付成功之后发送给积分系统
public void afterPayHandler(Order order){
//发送消息到积分系统
redisTemp.LPUSH("order",order);
}
复制代码
斲丧者
public void orderMessageListener(){
while(true){
//获取订单信息
Order order = redisTemp.RPOP("order");
if(order != null){
//做积分系统的业务,如添加积分等逻辑。
}
}
}
复制代码
如上所示:斲丧者在斲丧的时间,必须通过循环一直拉取队列数据,达到数据的实时性,但是也出现了CPU空转的问题。如果我们判断空的时间sleep休眠一段时间,那就会存在消息实时性问题。休眠多久合适就成为了难以处置惩罚的问题。
好在redis有阻塞拉取的命令。
BRPOP test_queue 10(秒)。拉取命令,阻塞10秒。如果是0就是一直阻塞。
3.方案优劣
优点
:充足简朴,也很好理解。但是我确实是想不到哪个场景得当这个方案(笑哭)。感觉也只能算遍及知识了。
缺点
:
不支持多斲丧者。任何一个斲丧者将redis的元素拉取删除之后,其他斲丧者都无法再次拉取到。那就只能仅限于一对一斲丧了。
消息丢失。没有ACK机制,如果拉取消息后宕机后,无法正常斲丧,就会导致消息的丢失。
二、Pub/Sub发布订阅
List数据结构可以认为是开发者为了简朴方便,从而引进的一种消息队列的方式,但是绝不”正宗“。Pub/Sub这种从名字上可以看出来,就是
专门
为了消息队列而生的。
从上图可以看出,发布订阅模式,解决了多斲丧者的问题。但是还是存在两个问题。
1.消息丢失
发布订阅模型,没有举行消息存储,只是一个单纯的
通道
,实时的把消息传送给斲丧者。那么这样就会有一个问题,如果斲丧者中间下线,再次上线的时间,只能从最新的位置举行斲丧,这样就会有
消息丢失
啦。
2.消息堆积
上文说,发布订阅模型没有基于任何数据雷同,因此,这个操纵不会写入RDB和AOF中(redis持久化机制)。别的,在消息堆积的时间,数据是通过Buffer缓冲区实现的。这个缓冲区的大小可以在redis中举行设置。如果超过了缓冲区设置的上限,此时,Redis 就会「强制」把这个斲丧者踢下线。
总的说,这个发布订阅模式相对比较脆弱,虽然解决了多斲丧者的问题,但是消息同等性较低,消息丢失概率较高(发布版本时重启了就大概丢消息),试用的场景较少。
三、相对成熟的Stream
Redis5.0 中增加了Stream消息队列相对成熟,解决了较多的问题。
支持消息ACK反馈,在消息斲丧乐成的时间,返回斲丧乐成,才不会再次推送消息。
支持多斲丧者组。
消息堆积问题优化。
1.redis命令介绍
发布命令
解释
#topic为 myStream1
# * 代表使用自动生成的ID作为消息的ID
# 接下来是多个 field value 组成的信息。
127.0.0.1:6379> XADD myStream1 * name zhangsan sex 20
"1717500637523-0"
127.0.0.1:6379> XADD myStream1 * name lisi sex 20
"1717500644429-0"
127.0.0.1:6379>
复制代码
斲丧命令
# 消费myStream队列的10个数据,最后的0意思是从头开始消费。
127.0.0.1:6379> XREAD COUNT 10 STREAMS myStream1 0
1) 1) "myStream1"
2) 1) 1) "1717500637523-0"
2) 1) "name"
2) "zhangsan"
3) "sex"
4) "20"
2) 1) "1717500644429-0"
2) 1) "name"
2) "lisi"
3) "sex"
4) "20"
127.0.0.1:6379>
复制代码
2.多斲丧者组测试
#创建一个消费者组为myGroup1并且指定消费位置。最后这个长串是信息的id
127.0.0.1:6379> XGROUP CREATE myStream1 myGroup1 1717500644429-0
OK
复制代码
#消费者组消费,消费者组为myGroup1 当前消费者id为consumer1 拉取10个信息 注意最后这个 ‘>‘
127.0.0.1:6379> XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 >
1) 1) "myStream1"
2) 1) 1) "1717501299234-0"
2) 1) "name"
2) "lisi"
3) "sex"
4) "20"
127.0.0.1:6379>
复制代码
验证ACK机制
myGroup1斲丧者组拉取一次之后将所有的消息拉取回来
由于没有举行消息反馈ACK。所以再次拉取的时间,还是将全量的消息拉取回来。
执行一次ACK命令之后,再次拉取消息,发现少了一条消息。
再次执行ACK命令后,拉取不到消息了。
127.0.0.1:6379> XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 0
1) 1) "myStream1"
2) 1) 1) "1717501299234-0"
2) 1) "name"
2) "lisi"
3) "sex"
4) "20"
2) 1) "1717502213457-0"
2) 1) "name"
2) "lisi"
3) "sex"
4) "20"
127.0.0.1:6379> XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 0
1) 1) "myStream1"
2) 1) 1) "1717501299234-0"
2) 1) "name"
2) "lisi"
3) "sex"
4) "20"
2) 1) "1717502213457-0"
2) 1) "name"
2) "lisi"
3) "sex"
4) "20"
127.0.0.1:6379>
127.0.0.1:6379>
127.0.0.1:6379>
127.0.0.1:6379> XACK myStream1 myGroup1 1717502213457-0
(integer) 1
127.0.0.1:6379> XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 0
1) 1) "myStream1"
2) 1) 1) "1717501299234-0"
2) 1) "name"
2) "lisi"
3) "sex"
4) "20"
127.0.0.1:6379> XACK myStream1 myGroup1 1717501299234-0
(integer) 1
127.0.0.1:6379> XREADGROUP GROUP myGroup1 consumer1 COUNT 10 STREAMS myStream1 0
1) 1) "myStream1"
2) (empty list or set)
127.0.0.1:6379>
复制代码
3.Stream会持久化吗?
会,不管是RDB 还是AOF都会写入。所以不用担心宕机的问题。
4.消息堆积如何解决?
既然会将Stream会举行持久化,那么一定消息也会生存在内存中,但是为了内存爆炸,Stream可以在XADD命令的时间,可以通过MAXLEN命令指定消息的最大长度,在超过最大长度的时间,旧消息会被删除,只保留固定长度的新消息。这样看来,消息堆积的问题
只是举行了优化,并没有完美的解决
。
总结
到此,获取对于redis消息队列的历史有了肯定的了解,redis作为运行在内存的数据库而言,这个功能已经是很不错了,或许你的场景充足简朴,消息的数目不多,并且对于消息的丢失不是
特别的敏感
的话,redis的Stream消息队列也是一个不错的选择。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
本帖子中包含更多资源
您需要
登录
才可以下载或查看,没有账号?
立即注册
x
回复
使用道具
举报
0 个回复
倒序浏览
返回列表
快速回复
高级模式
B
Color
Image
Link
Quote
Code
Smilies
您需要登录后才可以回帖
登录
or
立即注册
本版积分规则
发表回复
回帖并转播
回帖后跳转到最后一页
发新帖
回复
铁佛
金牌会员
这个人很懒什么都没写!
楼主热帖
R语言使用dplyr包的arrange函数对dataf ...
Visual Studio 2022 安装低版本的 .Net ...
手把手教你入门Python中的Web开发框架 ...
身为一个测试工程师只会点点点?我劝您 ...
Apifox:节省研发团队的每一分钟 ...
通过cookie和localstorage实现数据持久 ...
.net6下使用DotnetZip解压文件,中文出 ...
.Net Core 5.x Api开发笔记 -- Swagger ...
实现华为多屏协同--非华为电脑下载12.0 ...
反射(一)-常用方法及加载资源文件 ...
标签云
运维
CIO
存储
服务器
浏览过的版块
移动端开发
快速回复
返回顶部
返回列表