运维.售后
论坛
潜水/灌水快乐,沉淀知识,认识更多同行。
ToB圈子
加入IT圈,遇到更多同好之人。
朋友圈
看朋友圈动态,了解ToB世界。
博客
Blog
ToB门户
了解全球最新的ToB事件
排行榜
Ranklist
文库
业界最专业的IT文库,上传资料也可以赚钱
下载
分享
Share
导读
Guide
相册
Album
记录
Doing
搜索
本版
文章
帖子
ToB圈子
用户
免费入驻
产品入驻
解决方案入驻
公司入驻
案例入驻
登录
·
注册
只需一步,快速开始
账号登录
立即注册
找回密码
用户名
Email
自动登录
找回密码
密码
登录
立即注册
首页
找靠谱产品
找解决方案
找靠谱公司
找案例
找对的人
专家智库
悬赏任务
圈子
SAAS
ToB企服应用市场:ToB评测及商务社交产业平台
»
论坛
›
软件与程序人生
›
后端开发
›
Java
›
RocketMQ的push消费方式实现的太聪明了
RocketMQ的push消费方式实现的太聪明了
农妇山泉一亩田
金牌会员
|
2022-9-16 17:17:55
|
显示全部楼层
|
阅读模式
楼主
主题
880
|
帖子
880
|
积分
2640
大家好,我是三友,我又来了~~
最近仍然畅游在RocketMQ的源码中,这几天刚好翻到了消费者的源码,发现RocketMQ的对于push消费方式的实现简直太聪明了,所以趁着我脑子里还有点印象的时候,赶紧来写一篇文章,来掰扯一下,防止过两天就忘得一干二净了。
MQ消费方式
消费方式就是指消费者如何从MQ中获取到消息,分为两种方式,push(推方式)和pull(拉方式)。
1、push(推方式)
push,顾名思义,就是推的意思。就是当MQ收到生产者产生的消息的时候,会主动将消息推送到消费者进行消费,这种模式就叫push,也就是MQ将消息推给到消费者的意思。
push模式push这种模式的好处就是响应快,消息的实时性比较高,一旦消息MQ收到消息,那么就能立马将消息推送给消费者,消费者也就能立马收到消息进行消费。
但是这种push的模式,有个缺点就是一旦消息量比较大时,对消费者性能要求比较高,因为是消费者无法控制MQ消息的推送速度,一旦消息量大,那么消费者消费消息的压力就比较大。
2、pull(拉方式)
push是MQ主动给消费者推消息,那么pull呢?刚好跟push相反,就是消费者主动去MQ中拉取消息。
pull模式那么pull的优缺点自然也就跟push刚好相反。因为是消费者主动去MQ中拉取消息,那么消费者可根据自身消费的情况,决定何时去拉取消息,主动权在自己手上,这样消费者的压力就会相对小点;但是缺点也很明显,那么就会实时性相对于push方式会低一些,因为你得决定拉的时间间隔。
其实想想,消费方式就跟拿快递一样,快递就是一个消息,我自己就是消费者,快递要么快递小哥主动送(push)到家,要么我自己去快递站拿(pull)。
RocketMQ对于消费方式的实现
上一节说了消费消息的两种方式push和pull,或者说算一种理念。尚大的周阳老师有一句经常说的话我比较赞同,那就是“天上飞的理念,必然有落地的实现”。所以push或者pull到底如何落地,得看具体的MQ的产品了。
而RocketMQ作为阿里开源的一款高性能、功能丰富的MQ,自然同时实现了push和pull的两种消费方式,用户可以选择在项目中使用push还是pull。
push模式的实现
pull模式的实现但是一般情况下,项目中都是使用push的方式来消费,因为pull除了时实性差外,pull方式还得让开发人员主动去维护消息消费进度,增加额外的操作。
所以接下来就着重讲一下RocketMQ是如何实现push的逻辑。
RocketMQ聪明地实现push的原因
上文说到push模式的优点是时实性好,但是缺点就是消费者压力会比较大,所以,难道实现push模式,只能舍弃压力的控制么?
就在这时,RocketMQ大喊了一声
是的,RocketMQ对于push模式做到了实时和压力的平衡,这主要是因为RocketMQ的push模式其实算是一个“伪push”模式,真正底层的实现还是基于pull。
到这里可能有的小伙伴比较迷糊,怎么push变成“伪push”了,还是用pull实现的,到底是push还是pull?
前面我说过,push和pull只是一种理论,具体的实现看MQ。
所以RocketMQ为了兼顾两者,就选择通过消费者主动拉消息来实现push的效果,这也是为什么我称为“伪push”的原因,RocketMQ都给封装好了,让你用起来感觉是MQ主动push消息给你的。
既然底层是pull,那么RokcetMQ在实现消费者的逻辑的时候,就可以很容易实现控制压力的效果,毕竟这是“拉”方式天然自带的buff;但是如何通过pull实现push的时实的优点呢?毕竟鱼和熊掌我RokcetMQ偏要兼得。
这时这就不得不提到一种叫“长轮询”的机制。
轮询与长轮询
轮询与长轮询都属于pull的实现,都是由客户端主动给服务端发送请求,拉取数据。套到MQ中,就是都是消费者主动去MQ拉消息。
轮询
轮询是指不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。
再拿快递举例子,轮询就好比,小明买的iphone 13 pro max快递到了,显示正在派送中,但是小明等不及了,于是就去快递站拿,但是快递还没放到快递站,但是小明的心里急啊,他忍受不了相思之苦,于是小明每隔5分钟就往快递站跑一次,问一下快递到了没,到了就拿回来。这就是轮询的意思,也就是不论有没有数据,客户端都会每隔一定时间去请求一次服务端。
来分析一下拿快递的例子的问题:
每隔5分钟就往快递站跑,那不是累死个小明么。
还有一个问题,假设刚跑到快递站,快递没到,就回去了,但是刚到家的时候,快递到了,于是又等了5分钟,再去快递站终于拿到快递了,但是其实快递都到了几分钟了,你还是没有第一时间拿到快递,这就造成了延迟。
从而对应到程序中,就是会产生如下问题
对于消息而言,会一直产生,这就要求消费者不停地间隔一定时间去拉取消息,即使没有消息也需要去请求,就会造成大量无用的请求,白白浪费大量耗费服务器内存和宽带资源。
可能造成数据的延迟
长轮询
说长轮询概念之前,先来救救小明吧,毕竟小明可不想狗带。
既然原先小明每隔5分钟跑一次,那么是不是可以换种思路,当快递还没到的时候,让小明不要回来,直接在快递站待着,当快递到的时候,才让小明拿着快递回家。这下小明就喜死了,既可以有时间刷刷某音,逛逛某东,还可以在第一时间拿到13 pro max。
所以这种可以在快递站等待的机制,就叫长轮询。
长轮询也是客户端请求服务端,如果服务端有数据,那么就立马返回,客户端再次请求;当服务端不存在数据的时候,服务端并不会给客户端响应,而是将请求给hold住,当服务端有数据的时候才会给客户端响应,返回数据。
所以长轮询可以解决如下问题
解决轮询带来的频繁请求服务端但是没有的问题
一旦新的数据到了,那么消费者能立马就可以获取到新的数据,所以从效果上,有点像是push的感觉。
但是长轮询也会带来服务端代码实现逻辑复杂的问题,当然相比于优点来说,都不太重要。
push消费方式源码探究
理论都讲完了,接下来就到了show me the code的时间了,来看看RocketMQ的是如何通过长轮询机制来实现压力和时实的平衡。
这里我画了一张push模式下消费者消费流程图。
消费者拉取消息的逻辑
①消费者有一个后台线程,会去处理拉取消息(PullRequest)
②先去判断有没有过多消息没有消费,如果有的话,那么就间隔一定时间再次从①开始执行拉取消息的逻辑
③消费者没有过多消息没有消费,那么就会直接向MQ发送拉取消息的请求,有消息就返回,没有消息就hold住请求,等有新的消息到的时候才返回
④消费者获取到消息之后,会去找用户自定义的消息处理逻辑的实现(MessageListener的实现)去消费消息,同时会再次拉取消息,继续从①开始执行逻辑
1、消费者拉取消息控制压力源码
当消费者准备去拉消息的时候,会先去判断当前消费者消费的压力再决定是否去拉取消息。
RocketMQ提供了两种判断消费压力逻辑,一种是基于还未消费的消息的数量的大小,还有一种是基于还未消费的消息所占内存的大小。
控制压力源码
判断还未消费消息的数量,数量太多就等会再执行重新执行拉取消息的逻辑
判断还未消费消息的大小,如果还未消息的消息占用的内存过大,就等会再执行重新执行拉取消息的逻辑
总的一句话就是,当消费者消费的压力过大时,就不会去拉取消息,而是等待一定的时间再去执行拉取消息的逻辑,如果压力还是很大,就还继续等,如此循环,直到消费者的消费压力小于阈值的时候,才会真正的发送请求到MQ中拉取消息。
2、MQ将请求hold住源码
当服务端未找到消息时,就将请求进行挂起,存起来
请求hold住源码拉取不到消息时,会调用PullRequestHoldService的suspendPullRequest方法讲请求存储起来。PullRequestHoldService是用来存储拉取请求的类。
PullRequestHoldServicesuspendPullRequest方法会将请求分类,放到ManyPullRequest里,然后用一个ConcurrentHashMap进行存储
3、MQ收到消息响应给消费者的源码
NotifyMessageArrivingListener当生产者发送的消息达到MQ的时候,MQ会回调NotifyMessageArrivingListener的arriving方法,之后就会调用PullRequestHoldService的notifyMessageArriving方法,MQ会重新处理拉取消息的逻辑,此时就能找到最新来的那条消息,从而将最新的消息通过网络返回给消费者。
notifyMessageArriving和返回消息逻辑
最后
所以从以上的分析可以看出,RocketMQ对于push的消费方式的实现是基于长轮询机制来实现的,同时平衡了时实和压力,这其实就很nice了。
最后我想说一句,其实不论是pull还是push,又或是轮询和长轮询,其实都是一种理论或者说是一种思想,不单单是MQ的东西,就比如在Nacos中,也使用了push和长轮询机制。但是这些理论在不同产品的具体实现,实现方式可能不太一样,但都是大同小异,所以当你懂了这些思想,再看其它框架的源码,其实就很容易了。
最后的最后,我再说一句,终于***发年终奖了。。
本文如果对你有点帮助,还请帮忙点赞、在看、转发、非常感谢。
往期热门文章推荐
三万字盘点Spring/Boot的那些常用扩展点
一网打尽异步神器CompletableFuture
@Async注解的坑,小心
万字+28张图带你探秘小而美的规则引擎框
7000字+24张图带你彻底弄懂线程池
扫码或者搜索关注公众号 三友的java日记 ,及时干货不错过,公众号致力于通过画图加上通俗易懂的语言讲解技术,让技术更加容易学习,回复 面试 即可获得一套面试真题。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
本帖子中包含更多资源
您需要
登录
才可以下载或查看,没有账号?
立即注册
x
回复
使用道具
举报
0 个回复
正序浏览
返回列表
快速回复
高级模式
B
Color
Image
Link
Quote
Code
Smilies
您需要登录后才可以回帖
登录
or
立即注册
本版积分规则
发表回复
回帖并转播
发新帖
回复
农妇山泉一亩田
金牌会员
这个人很懒什么都没写!
楼主热帖
厉害了,腾讯云云巢荣获信通院“云原生 ...
2022 春节抖音视频红包系统设计与实现 ...
数据库概述
【Redis高手修炼之路】②Redis的五大数 ...
原型设计工具比较及实践--滴爱音乐 ...
多线程(一)-两种创建线程的方式 ...
OpenHarmony和HarmonyOS有什么区别?这 ...
ABP Framework 5.3.0 版本新增功能和变 ...
Kafka原理介绍+安装+基本操作(kafka o ...
【视频】k8s套娃开发调试dapr应用 - 在 ...
标签云
存储
服务器
快速回复
返回顶部
返回列表