立聪堂德州十三局店 发表于 2024-8-18 16:34:21

图解 RocketMQ 架构

写在前面

Kafka、RocketMQ都是很出名的中间件,上次我们讲授了Kafka,这次我们来讲讲RocketMQ的原理。
根本架构图

https://i-blog.csdnimg.cn/direct/1dabba5666144c7c91cdd15da3a021b0.png
剖析

RocketMQ 统共可以分成四个模块


[*]NameServer:提供服务发现和路由功能,管理各种元数据信息。
[*]Broker:消息存储和路由分发节点,负责存储消息和将消息路由给斲丧者。
[*]Producer:消息生产者,负责产生并发送消息到指定的 Topic。
[*]Consumer:消息斲丧者,订阅 Topic 并从 Broker 拉取消息进行处理。
https://i-blog.csdnimg.cn/direct/2b1fb04772384d90854ba270b3f7d938.png
大体读写步调

[*]注册 Broker Cluster 到 NameServer。
[*]注册 Producer、Consumer 到 NameServer。
[*]Producer 获取MQ集群的Broker、Queue等信息。
[*]Producer 发消息,一样平常消息都会选择master进行写入,而slave进行读取。
[*]顺序写入消息到 commit log 中。
[*]queue log会记载每条commit log的存储信息,当然不会记载所有,只记载一些重要的,commitLogOffset等等。
[*]master 将消息异步给slave。
[*]Consumer读取slave的消息。
[*]Consumer返回ACK作为确认消息斲丧成功。
1. NameServer

当Broker服务启动后,会向NameServer注册信息,比如broker中的Topic、斲丧偏移量、队列、ip、端口等,由Broker的心跳发送到NameServer,BrokerCluster 中的每一个节点都会注册到NameServer上。
https://i-blog.csdnimg.cn/direct/1e49809084c440459d26b911c22d2bf2.png
即使一个NameService节点挂了,剩下的一个NameService节点仍旧包含所有的broker信息。不过NameService是无状态的, NameService之间不会相互通讯,那么一个NameService挂了,不会影响别的一个NameService。
注册完Broker之后,NameServer会每隔10s发送心跳查抄Broker,如果Broker超过120s还没有相应,则这个Broker被视为宕机
2. Broker

2.1 CommitLog & Message Queue

Broker 启动,跟所有的 NameServer 保持长连接,每 30s 发送一次发送心跳包(像心跳一样持续稳定的发送哀求)。心跳包中包含当前 Broker 信息 ( IP+ 端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
https://i-blog.csdnimg.cn/direct/5878a4bd1c234b77b345c9a2e9b7d4d9.png
Broker担当消息,会顺序写入消息到 CommitLog 中。Broker里面的两个存储介质:Commit log 和 Message queue的区别:

[*]Commit log 存储消息实体。顺序写,随机读。虽然是随机读,但是使用package机制,可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度。
[*]Message queue 存储消息的偏移量。读消息先读 message queue,根据偏移量到 commit log 读消息本身。
所以实在我们的消息不是存放在queue中,而是存放在commit log中,这就是为什么queue会被称为逻辑队列
2.2 Index File

2.2.1 先容

因为所有的消息都存在CommitLog中,如果要实现根据 key 查询 消息的方法,就会变得非常困难,所以为相识决这种业务需求,有了IndexFile的存在。用于为天生的索引文件提供访问服务,通过消息 Key 值查询消息真正的实体内容。
IndexFile 如何创建?以创建的时间戳命名。参数:phyOffset物理偏移量(也就是commitLogOffset)、keys。
https://i-blog.csdnimg.cn/direct/4d8a962e0d124dcd99d9eb50e913cc78.png
如何查询消息呢?
2.2.2 按照MessageId查询

RocketMQ中的MessageId的长度统共有16字节,此中包含了消息存储主机地址(IP地址和端口),消息Commit Log offset。

[*]Client 端从 MessageId 中剖析出 Broker 的地址(IP地址和端口)和Commit Log的偏移地址发送一个RPC哀求。
[*]Broker 端读取消息的过程用此中的 commitLog offset 和 size 去 commitLog 中找到真正的记载并剖析成一个完整的消息返回。
https://i-blog.csdnimg.cn/direct/52e184813e1f42ffb519c434f2b0ea1f.png
2.2.3 按照Message Key查询


[*]找槽位:slotKey = 40 byte + hash(topic + "#" + key) % 500W * 4byte。
[*]计算槽位:slotValue = 最新插入 index 的位置。
[*]遍历单向链表:从 slotValue 找到最新 index 在整个索引文件中位置 = 40byte +500w*4byte + slotValue*20byte,然后根据单个索引文件的 pre index 值找到前一个索引,一直遍历下去,直到index数据中key hash和时间区间都满足即可。添加到 commitLogOffset 的 list) 中。
[*]最终根据此中的 commitLogOffset 从 CommitLog 文件中读取消息的实体内容。
3. Producer

略。Producer似乎除了负载均衡,就没什么好讲的地方了。
4. Consumer

在RocketMQ中,Consumer端的两种斲丧模式(Push/Pull)都是基于拉模式来获取消息的,pull必要手动实现拉取消息,push只必要实现斲丧监听器。但实际底层都是pull。
在Consumer启动后,会通过定时使命不断地向所有Broker实例发送心跳包,包含:消息斲丧分组名称、订阅关系聚集、消息通讯模式和客户端id的值等信息
Broker端在收到Consumer的心跳消息后,会将它维护在 ConsumerManager 的本地缓存变量。会根据斲丧者组获取对应维护的斲丧者组信息。
https://i-blog.csdnimg.cn/direct/22ce3b144eeb4756aa67d498947b5d50.png
如果是新加入的consumer获取订阅信息变了,会关照这个斲丧者组里面的其他斲丧者说斲丧者有变化,被关照到的斲丧者就会重新负载均衡。
https://i-blog.csdnimg.cn/direct/4bba68cb6d1940e2bee9321c4567ccc9.png
https://i-blog.csdnimg.cn/direct/15144547813042f79f3fecd987db9857.png
参考

https://www.modb.pro/db/141171
https://www.cnblogs.com/duanxz/p/5020398.html
https://www.cnblogs.com/dennyzhangdd/p/15035116.html
https://www.alibabacloud.com/blog/rocketmq-5-0-architecture-analysis-how-to-support-diversified-scenarios-based-on-cloud-native-architecture_600564
https://cloud.tencent.com/developer/article/2277381
https://www.cnblogs.com/hzzjj/p/16552514.html

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 图解 RocketMQ 架构