星球的眼睛 发表于 2024-8-5 03:36:10

RocketMQ 和 Kafka 关于消息队列的推拉模式是怎么做的?

弁言:在当今的大数据和分布式体系中,消息队列扮演着至关重要的角色,它们作为体系之间通讯和数据传输的媒介,为各种场景下的数据流动提供了可靠的底子设施支持。在消息队列的设计中,推拉模式是两种常见的消息转达机制,它们各自具有独特的上风和实用场景。本文将聚焦于两个著名的消息队列体系:RocketMQ 和 Kafka,并探究它们在消息转达过程中是如何实现拉模式的。虽然两者都选择了拉模式,但它们的详细实现方式略有不同,从内部机制到性能优化,都反映了对不同应用场景的思考和针对性的改进。
题目

RocketMQ 和 Kafka 关于消息队列的推拉模式是怎么做的?
保举解析

那到底是推还是拉?

推模式和拉模式各有优缺点,到底该如何选择呢?
RocketMQ 和 Kafka 都选择了拉模式,当然业界也有基于推模式的消息队列如 ActiveMQ。
我个人觉得拉模式更加的符合,因为现在的消息队列都有持久化消息的需求,也就是说本身它就有个存储功能,它的任务就是接受消息,生存好消息使得消费者可以消费消息即可。
而消费者各种各样,身为 Broker 不应该有依赖于消费者的倾向,我已经为你生存好消息了,你要就来拿好了。
虽说一般而言 Broker 不会成为瓶颈,因为消费端有业务斲丧比较慢,但是 Broker 毕竟是一个中心点,能轻量就尽量轻量。
那么竟然 RocketMQ 和 Kafka 都选择了拉模式,它们就不怕拉模式的缺点么? 怕,以是它们操作了一波,减轻了拉模式的缺点。
长轮询

RocketMQ 和 Kafka 都是利用“长轮询”来实现拉模式,我们就来看看它们是如何操作的。
为了简单化,下面我把消息不满足本次拉取的条数啊、总巨细啊等等都统一描述成还没有消息,反正都是不满足条件。
RocketMQ 中的长轮询

RocketMQ 中的 PushConsumer 其实是披着拉模式的方法,只是看起来像推模式而已。
因为 RocketMQ 在被背后偷偷的帮我们去 Broker 哀求数据了。
配景会有个 RebalanceService 线程,这个线程会根据 topic 的队列数量和当前消费组的消费者个数做负载均衡,每个队列产生的 pullRequest 放入壅闭队列 pullRequestQueue 中。然后又有个PullMessageService 线程不绝的从壅闭队列 pullRequestQueue 中获取 pullRequest,然后通过网络哀求 broker,如许实现的准实时拉取消息。
这一部分代码我不截了,就是这么个事儿,稍后会用图来展示。
然后 Broker 的 PullMessageProcessor 内里的 processRequest 方法是用来处理拉消息哀求的,有消息就直接返回,如果没有消息怎么办呢?我们来看一下代码。
https://img-blog.csdnimg.cn/img_convert/538cc582c4c0eeba0916575fb3c8d06a.jpeg
我们再来看下 suspendPullRequest 方法做了什么。
https://img-blog.csdnimg.cn/img_convert/ce0f7dda7e596e3fdca8ac0797c46d8f.jpeg
而 PullRequestHoldService 这个线程会每 5 秒从 pullRequestTable 取PullRequest哀求,然后看看待拉取消息哀求的偏移量是否小于当前消费队列最大偏移量,如果条件成立则说明有新消息了,则会调用 notifyMessageArriving ,终极调用 PullMessageProcessor 的 executeRequestWhenWakeup() 方法重新实行处理这个消息的哀求,也就是再来一次,整个长轮询的时间默认 30 秒。
https://img-blog.csdnimg.cn/img_convert/c731a9a9ee4e92eed673344380069a23.jpeg
简单的说就是 5 秒会检查一次消息时候到了,如果到了则调用 processRequest 再处理一次。这好像不太实时啊? 5秒?
别急,还有个 ReputMessageService 线程,这个线程用来不绝地从 commitLog 中解析数据并分发哀求,构建出 ConsumeQueue 和 IndexFile 两种范例的数据,并且也会有唤醒哀求的操作,来弥补每 5s 一次这么慢的延伸
代码我就不截了,就是消息写入并且会调用 pullRequestHoldService 的 notifyMessageArriving 方法。
最后我再来画个图,描述一下整个流程。
https://img-blog.csdnimg.cn/img_convert/d8102061b30d0a023df554d395171158.jpeg
Kafka 中的长轮询

像 Kafka 在拉哀求中有参数,可以使得消费者哀求在 “长轮询” 中壅闭等待。
简单的说就是消费者去 Broker 拉消息,定义了一个超时时间,也就是说消费者去哀求消息,如果有的话马上返回消息,如果没有的话消费者等着直到超时,然后再次发起拉消息哀求。
并且 Broker 也得配合,如果消费者哀求过来,有消息肯定马上返回,没有消息那就创建一个延伸操作,等条件满足了再返回。
我们来简单的看一下源码,为了突出重点,我会删减一些代码。
先来看消费者端的代码。
https://img-blog.csdnimg.cn/img_convert/36bf227aa48efe630d880d92e931ce3a.jpeg
上面那个 poll 接口想必各人都很熟悉,其实从注解直接就知道了确实是等待数据的到来或者超时,我们再简单的往下看。
https://img-blog.csdnimg.cn/img_convert/a22aec0bb0cf80422b5c271458b7cb87.jpeg
我们再来看下终极 client.poll 调用的是什么。
https://img-blog.csdnimg.cn/img_convert/6339690767d4f950f2a3349b02440c84.jpeg
最后调用的就是 Kafka 包装过的 selector,而终极会调用 Java nio 的 select(timeout)。
现在消费者端的代码已经清晰了,我们再来看看 Broker 如何做的。
Broker 处理全部哀求的入口其实我在之前的文章先容过,就在 KafkaApis.scala 文件的 handle 方法下,这次的主角就是 handleFetchRequest 。
https://img-blog.csdnimg.cn/img_convert/fcddf8b720ede3a52c0df408f6c6c3ff.jpeg
这个方法进来,我截取最重要的部分。
https://img-blog.csdnimg.cn/img_convert/3354bef130166bdc624afdf60969220e.jpeg
下面的图片就是 fetchMessages 方法内部实现,源码给的注释已经很清晰了,各人放大图片看下即可。
https://img-blog.csdnimg.cn/img_convert/4393202598ece8438387f300b1ca379f.jpeg
这个炼狱名字取得很有趣,简单的说就是利用时间轮来执行定时任务,比方这里是 delayedFetchPurgatory ,专门用来处理延伸拉取操作。
我们先简单想一下,这个延伸操作都必要实现哪些方法,起首构建的延伸操作必要有检查机制,来检察消息是否已经到了,然后呢还得有个消息到了之后该执行的方法,还必要有执行完毕之后该干啥的方法,当然还得有个超时之后得干啥的方法。
这几个方法其实对应的就是代码里的 DelayedFetch ,这个类继承了 DelayedOperation 内部有:


[*]isCompleted 检查条件是否满足的方法
[*]tryComplete 条件满足之后执行的方法
[*]onComplete 执行完毕之后调用的方法
[*]onExpiration 逾期之后必要执行的方法
判断是否逾期就是由时间轮来推动判断的,但是总不能等逾期的时候再去看消息到了没吧?
这里 Kafka 和 RocketMQ 的机制一样,也会在消息写入的时候提醒这些延伸哀求消息来了,详细代码我不贴了, 在 ReplicaManager 和 appendRecords 方法内部再深入个两方法可以看到。
不过虽说代码不贴,图还是要画一下的。
https://img-blog.csdnimg.cn/img_convert/59a6989d0de1ff7d1b9d2b7fbc60a52d.jpeg
小结一下

可以看到 RocketMQ 和 Kafka 都是接纳“长轮询”的机制,详细的做法都是通过消费者等待消息,当有消息的时候 Broker 会直接返回消息,如果没有消息都会接纳延伸处理的策略,并且为了保证消息的实时性,在对应队列或者分区有新消息到来的时候都会提醒消息来了,实时返回消息。
一句话说就是消费者和 Broker 相互配合,拉取消息哀求不满足条件的时候 hold 住,避免了多次频繁的拉取动作,当消息一到就提醒返回。
最后

总的而言推拉模式各有优劣,而我个人觉得一般情况下拉模式更得当于消息队列。
其他补充

鱼聪明 AI 的回答:
   鱼聪明 AI 地址:https://www.yucongming.com/
RocketMQ

优点:

[*]高吞吐量和低延伸: RocketMQ 可以或许处理每秒数百万条消息的高吞吐量,并提供低延伸的消息转达本领,得当必要快速数据处理和相应的场景。
[*]丰富的消息模型支持: RocketMQ 支持丰富的消息模型,包括次序消息、变乱消息等,可以或许满足复杂应用中对消息可靠性和次序性的需求。
[*]分布式特性和高可用性: RocketMQ 设计为分布式体系,具备精良的程度扩展性和高可用性,支持消息的持久化存储和数据备份,保障数据的安全性和可靠性。
[*]灵活的摆设模式: RocketMQ 支持多种摆设模式,包括单机摆设、主从模式和集群摆设,可以根据应用需求选择符合的摆设方式。
缺点:

[*]社区和生态体系相对较小: 相较于Kafka,RocketMQ 的社区规模和生态体系相对较小,可能在某些特定功能支持和第三方集成方面不如Kafka成熟。
[*]复杂性较高: RocketMQ 在设置和管理上相对较复杂,必要肯定的运维经验和专业知识来保障体系的稳固性和性能表现。
重要利用场景:


[*]电商和金融行业的实时数据处理: RocketMQ 的低延伸和高吞吐量特性使其非常得当处理电商和金融领域的实时数据流,如订单处理、付出通知等。
[*]分布式变乱应用: RocketMQ 的变乱消息模型支持确保消息的可靠性转达,得当必要变乱性保证的应用场景,如分布式变乱、应用步伐集成等。
[*]大规模日记收集与分析: RocketMQ 的高吞吐量和次序消息特性,使其成为大规模日记收集和分析体系的抱负选择。
Kafka

优点:

[*]极高的吞吐量和存储容量: Kafka 是为大规模数据流设计的,可以或许处理每秒数百万条消息和PB级别的存储数据,得当大数据场景下的实时数据处理和分析。
[*]持久性和可靠性: Kafka 提供了持久化存储和数据复制机制,确保消息的可靠转达和数据的安全性,支持高可用性和故障容错。
[*]灵活的消息发布订阅模型: Kafka 提供了灵活的发布订阅模型,支持多种消费者订阅方式和数据分发模式,如分区、复制和副本机制,可以或许满足复杂数据流的处理需求。
[*]成熟的生态体系和社区支持: Kafka 拥有庞大的开发社区和丰富的生态体系,支持大量的第三方集成和工具,如数据流处理、流媒体处理等。
缺点:

[*]运维复杂度较高: Kafka 的设置和管理较为复杂,必要专业的运维团队来维护和管理整个集群,包括监控、扩展和故障处理等方面的工作。
[*]实时性稍逊: 尽管 Kafka 在吞吐量上表现出色,但在一些低延伸的场景下可能无法满足实时性要求,特别是对于次序消息的处理。
重要利用场景:


[*]大数据流处理和实时分析: Kafka 的高吞吐量和大容量存储本领使其成为大数据场景下的首选,如日记收集、实时监控、用户举动分析等。
[*]变乱驱动架构: Kafka 的消息发布订阅模型得当构建变乱驱动的微服务架构和实时数据处理流水线,如变乱溯源、实时通知等。
[*]流媒体处理和连续集成: Kafka 的持久化存储和可靠传输特性使其得当于流媒体处理和连续集成的场景,如实时保举体系、数据流管道等。
总结

RocketMQ 和 Kafka 都是功能强盛的消息队列体系,各安闲不同的应用场景中有着显著的上风和实用性。选择符合的体系取决于详细的业务需求,包括数据处理的速率、可靠性要求以及团体架构设计等方面的考量。
接待交流

本文重要先容两种不同的消息队列的推拉模式、以及各种的优缺点和利用场景,在文末还有三个关于消息队列的题目,接待小同伴在批评区留言!近期面试鸭小步伐已全面上线,想要刷题的小同伴可以积极参与!
https://img-blog.csdnimg.cn/img_convert/0f61bfa8e32f00dc8c5e563cc9134ac9.png 1)消息队列体系在分布式体系中的角色和优化策略是什么?
2)消息队列体系如何确保消息次序性?
3)消息队列如何保证消息的可靠性转达?

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: RocketMQ 和 Kafka 关于消息队列的推拉模式是怎么做的?