基于RabbitMQ原理的分布式消息队列系统

嚴華  金牌会员 | 2024-9-28 19:50:56 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 921|帖子 921|积分 2763

1. 项目介绍

什么是RabbitMQ?

   ​RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息署理软件(亦称面向消息的中心件)。
  重要用途:


  • 异步处理无需即时返回且耗时的操纵,提高系统吞吐量。
  • 解耦生产者和消耗者,提高系统机动性。
  • 实现分布式系统的集成。
RabbitMQ现实上是实现了一个基于AMQP的生产者消耗者模型。生产者消耗者模型是后端开发的常用编程方式,它有诸多好处:


  • 解耦合。
  • 并发处理。
  • 支持忙闲不均。
  • 削谷填峰等。
在现实的后端开发中,尤其是分布式系统里,跨主机之间使用生产者消耗者模型,也是很普遍的需求。因此我们以AMQP为焦点封装一个独立的服务器程序。这样的服务程序我们就称为消息队列(Message Queue)。市面上成熟的消息队列有很多:


  • Rabbit
  • Kafka
  • RocketMQ
  • ActiveMQ等
2. 开发环境



  • Linux(ubuntu-22.04)
  • VSCode
  • g++/gdb
  • Makefile
3. 技术选型



  • 主开发语言:C++
  • 序列化框架:ProtoBuf二进制序列化
  • 网络通讯:自定义应用层协议 + muduo库(对长TCP长连接的封装,并使用epoll的事件驱动模式实现高并发服务器与客户端)
  • 数据管理数据库:SQLite3
  • 单元测试框架:Gtest
3.1ProtoBuf使用介绍:


3.2 Muduo库

Muduo是由陈硕大佬开发,基于非阻塞IO和事件驱动的C++高并发TCP网络编程库。它是一款基于主从Reactor模型的网络库,使用的线程模型是one loop per thread,所谓one loop per thread指的是:


  • 一个线程只能有一个事件循环(EventLoop),用于响应计时器和IO事件。
  • 一个文件形貌符只能由一个线程进行读写,也就是说一个TCP连接必须归属于某个EventLoop管理。

3.3 SQLite3

什么是SQLIte?

SQLite是一个进程内的轻量级数据库,它实现了自给自足的、无服务器的、零配置的、事件性的SQL数据库引擎。我们不需要再系统中配置。SQLite引擎不是一个独立的进程。可以按应用程序需求进行静态或动态链接。SQLite直接访问其存储文件。
为什么要用SQLite?



  • 不需要一个单独的服务器进程或操纵的系统(无服务器的)。
  • SQLite不需要配置。
  • 一个完备的SQLite数据库存储在一个单一的跨平台的磁盘文件。
  • SQLite非常小,是轻量级的。完全配置时小于400KiB,省略可选功能时小于250KiB。
  • SQLite自给自足,不需要任何外部的依靠
  • SQLite事件完全兼容ACID,允许从多个进程或线程安全访问。
  • SQLite支持SQL92标准的大多数查询语言功能。
  • SQLite使用ANSI-C编写,并提供了简朴和易于使用的API
  • SQLite可在UNIX(Linux,Mac OS-X,Android,iOS)和Windows(Win32,WinCE,WinRT)中运行。
3.4 Gtest

什么是Gtest

Gtest是一个跨平台的C++单元测试框架,由google公司发布。gtest是为了在不同平台上为编写C++单元测试而天生的。提供了丰富的断言、致命和非致命、参数化等等。
4. 需求分析

4.1 焦点概念



  • 生产者(Producer)
  • 消耗者(Consumer)
  • 中心人(Broker)
  • 发布(Publish)
  • 订阅(Subscribe)
  • 一个生产者,一个消耗者

  • N个生产者,N个消耗者

    此中Broker Server为焦点部分,负责消息的存储和转发。
    而在AMQP模型中,也就是消息中心件服务器Broker中,又存在以下概念:
  • 捏造机(VirtualHost):类似于MySQl的“database”,是一个逻辑上的集合。一个BrokerServer上可以存在多个VirtualHost。
  • 互换机(Exchange):生产者先把消息发送到Broker中的Exchange上,再根据不同的匹配规则,把消息转发给不同的Queue。
  • 队列(Queue):真正用来存储消息的部分,每个消耗者本身决定从哪个Queue上读取消息。
  • 绑定(Binding):Exchange和Queue之间的关联关系,两者可明白为“多对多”关系,使用一个关联表就可以把这两个概念关联起来。(一个Exchange可以绑定多个Queue,一个Queue也可以被多个Exchange绑定)
  • 消息(Message):传递的内容。


上述数据结构,既需要再内存中存储,也需要在硬盘中存储


  • 内存存储:方便使用
  • 硬盘存储:重启数据不丢失
4.2 焦点API

对于Broker来说,要通过以下焦点API实现消息队列的根本功能。

  • 创建互换机(DeclareExchange)
  • 烧毁互换机(DeleteExchange)
  • 创建队列(DeclareQueue)
  • 烧毁队列(DeleteQueue)
  • 创建绑定(Bind)
  • 解除绑定(UnBind)
  • 发布消息(BasicPublish)
  • 订阅消息(BasicConsume)
  • 确认消息(BasicAck)
  • 取消订阅(BasicCancel)
而Producer和Consumer则通过网络长途调用这些API,实现生产者消耗者模型。
4.3 互换机范例

对于RAbbitMQ,重要支持四种互换机范例:


  • Direct
  • Fanout
  • Topic
  • Header
此中Header比较复杂且少见。常用的是前三种范例,项目中也重要实现这三种:


  • Direct:生产者发送消息时,直接指定该互换机绑定的队列名。
  • Fanout:生产者发送的消息会被复制到该互换机全部队列中,也就是广播。
  • Topic:队列与互换机绑定时,指定一个字符串bindingKey,发送的消息里有指定字符串routingKey。当routingKey与bindingKey满足一定的匹配条件时,把消息投递到对应队列。
4.4 恒久化

Exchange、Queue、Binding、Message等数据都有恒久化需求,当程序重启 / 主机重启,包管上述内容不丢失。
4.5 网络通讯

生产者和消耗者都是客户端程序,Broker是服务端程序,通过网络进行通讯。
在通讯过程中,客户端要提供对应的API实现对服务器的操纵。

  • 创建Connection
  • 关闭Connection
  • 创建Channel(OpenChannel)
  • 关闭Channel(CloseChannel)
  • 创建互换机(DeclareExchange)
  • 烧毁互换机(DeleteExchange)
  • 创建队列(DeclareQueue)
  • 烧毁队列(DeleteQueue)
  • 创建绑定(QueueBind)
  • 解除绑定(QueueUnBind)
  • 发布消息(BasicPublish)
  • 订阅消息(BasicConsume)
  • 确认消息(BasicAck)
  • 取消订阅(BasicCancel)
在Broker的基础上,还要增长Connection操纵和Channel操纵:


  • Connection对应一个TCP连接
  • Channel是Connection中的逻辑通道
一个Connection中可以包含多个Channel(一个Connection能被多个Channel使用)。Channel和Channel之间数据独立,不会互干系扰。这样能更好地复用TCP连接,达到长连接的效果,制止频仍创建关闭TCP连接。
4.6 消息应答

被消耗的消息,需要消耗者客户端进行应答。应答模式分为两种:


  • 自动应答:消耗者只要消耗了消息,就算应答完毕。Broker直接删除这个消息。
  • 手动应答:消耗者手动调用应答接口,Broker收到应答哀求后,才真正删除这个消息(未应答时,消息位于待确认队列,没有被真正删除)。
手动应答目的是为了包管消耗者处理成功了,在一些对数据可靠性要求较高的场景比较常见。
5. 模块划分

5.1 服务端模块

5.1.1 恒久化数据管理中央模块

在数据管理模块中管理互换机、队列、队列绑定、消息等数据。
1.互换机管理:



  • 管理信息:名称、范例(Direct等)、是否恒久化标记、是否(无人使用时)自动删除标记、其它参数(作为扩展)。
  • 管理操纵:恢复历史数据、声明、删除、获取、判定是否存在。
2. 队列管理:



  • 管理信息:名称、是否恒久化标记、是否独占标记、是否(无人使用时)自动删除标记、其它参数。
  • 管理操纵:恢复历史数据、声明、删除、获取、判定是否存在。
3. 绑定管理:



  • 管理信息:互换机名称、队列名称、绑定主题(bindingKey)。
  • 管理操纵:恢复历史数据、绑定、解绑、解除互换机关联绑定信息、解除队列关联绑定信息、获取互换机关联绑定信息。、获取指定绑定信息等。
4. 消息管理:


  • 管理信息:


  • 属性:消息ID、路由主题(routingKey)、恒久化模式标记
  • 消息内容
  • 有效标记(共同恒久化需要)
  • 恒久化位置(消息在文件中的偏移量)
  • 恒久化消息长度(该消息存储在文件中的长度)

  • 管理操纵:恢复历史消息、向指定队列新增消息、获取指定队列队首消息、确认移除消息。
这几个概念数据都要在内存和硬盘中存储。


  • 以内存存储为主,包管快速查找信息进行处理。
  • 以硬盘存储为辅,包管服务器重启后,之前的信息可以正常保持。
5.1.2 捏造机管理模块

因为互换机、队列、绑定、都是以捏造机为单元团体进行操纵。因此捏造机是对以上数据管理模块的整合模块。
1. 捏造机管理信息:



  • 互换机数据管理模块句柄
  • 队列数据管理模块句柄
  • 绑定命据管理模块句柄
  • 消息数据管理模块句柄
2. 捏造机对外操纵:



  • 提供捏造机内互换机声明,互换机删除操纵(删除时同时需要删除互换机关联的绑定信息)
  • 提供捏造机队列声明、队列删除操纵(删除时同时需要删除队列关联的绑定信息以及消息管理)
  • 提供捏造机内互换机 - 队列绑定、解绑操纵
  • 获取互换机相关绑定信息
  • 发布消息、消耗消息、确认消息等
3. 捏造机管理操纵:



  • 创建捏造机
  • 查询捏造机
  • 删除捏造机
5.1.3 互换路由模块

   当客户端发送一条消息到互换机后,这条消息应该转发给该互换机绑定的哪些队列中?
  由互换路由模块决定的。绑定信息中有bindingKey,而每条发布的消息中有routingKey。可否入队取决于两个要素:互换机范例和Key


  • 直接互换(Direct):将消息入队到绑定信息中bindingKey与消息routingKey一致的队列中
  • 广播互换(Fanout):将消息入队到该互换机的全部绑定队列中
  • 主题互换(Topic):将消息入队到绑定信息中bindingKey与消息routingKey匹配成功的队列中
bindingKey
由数字字母下划线构成,并使用 . 分成多少部分。
如: news.music.# 用于表示互换机绑定的当前队列是一个用于发布音乐新闻的队列。
支持 *# 两种通配符,但 * # 只能作为 . 切分出来的单独部分,不能和其他数字字母混用。如:


  • __a.*.b__是正当的,而 a.*a.b 是不正当的
  • * 可以匹配任意一个单词(注意不是字母)
  • # 可以匹配任意零个或多个单词(注意不是字母)
  • 注意: *# 不能相邻,因为 # 完全可以替代 * 的功能
routingKey
由数字字母下划线构成,并且可以使用 . 划分成多少部分。如:


  • news.music.pop ,表示当前发布的消息是一个流行音乐的新闻。
5.1.4 消耗者管理模块

消耗者管理是以队列为单元的,因为每个消耗者都会在开始的时候订阅一个队列的消息,当队列中有消息后,会将消息轮询推送给订阅该队列的消耗者(负载平衡)。
因此操纵流程为:从队列关联的消息管理中取出消息,从队列关联的消耗者中取出一个消耗者,将消息推送给消耗者。

  • 消耗者信息:


  • 消耗者标识tag
  • 订阅队列名称
  • 自动应答标记(决定一条消息推送给消耗者后,是否需要等待收到确认后再删除消息)
  • 消息处理回调函数指针(一个消息发布后被Push到线程池,调用传入Push的事件处理函数,函数内部选择队列关联的消息和消耗者,接着调用消耗者的消息处理回调函数将消息发送给消耗者客户端)
  1. void(const std::string &tag, const BasicProperties& bp, const std::string &body)
复制代码

  • 消耗者管理:添加、删除、获取指定队列消耗者,移除队列全部消耗者等操纵。
5.1.5 信道管理模块

在AMQP模型中,除了通讯连接Connection概念外,另有一个Channel概念。Channel是针对Connection连接的一个更细粒度的通讯信道,多个Channel可以使用同一个Connection进行通讯,同时一个Connection之间的Channel之间互相独立。
信道模块是再次将上述模块进行整合提供服务的模块。

  • 管理信息:


  • 信道ID
  • 信道关联的消耗者 / 生产者
  • 信道关联的连接
  • 信道关联的捏造机
  • 工作线程池(一条消息被发布到队列后,需要将消息推送给订阅了该队列的消耗者,该工作由线程池完成)

  • 管理操纵:


  • 互换机的声明 / 删除
  • 队列的声明 / 删除
  • 互换机 - 队列的绑定 / 解绑
  • 消息的订阅 / 取消订阅
  • 消息的发布 / 确认消息
5.1.6 连接管理模块

本项目的服务器是通过muduo库实现底层通讯的,muduo库并不能提供我们所需的全部操纵,我们需要连接管理模块实现对muduo库的二次封装(同时对信道管理模块的封装),以完成我们的需求。


  • 管理信息:连接关联的信道管理句柄、连接关联的muduo库Connection、信道管理所需的模块句柄。
  • 管理操纵:新增、删除、获取连接,打开 / 关闭信道
5.1.7 Broker服务器管理模块

综合以上全部模块,搭建网络通讯服务器,实现与客户端的网络通讯,辨认客户端的哀求并提供哀求处理服务。


  • 管理信息:捏造机、消耗者、连接管理句柄,工作线程池句柄,muduo库通讯所需元素。
  • 提供服务:综合打开 / 关闭信道、消息订阅 / 取消订阅等哀求处理接口。
5.2 客户端模块

5.2.1 消耗者管理

消耗者在客户端存在感比较薄弱,在用户使用角度中,只要创建一个信道,就可以通过信道完成全部操纵。对于消耗者的感官更多的是在订阅时传入了一个消耗者标识。尤其是本项目的简朴实现是一个信道只能订阅一个队列,也就是说一个信道只能创建一个消耗者,一一对应更弱化了消耗者的存在。


  • 消耗者信息:消耗者标识、订阅队列名称、自动应答标记、消息处理回调函数
5.2.2 信道哀求模块

与服务端信道类似,客户端也有Channel的概念。

  • 信道管理信息:


  • 信道ID
  • 信道关联的连接
  • 信道关联的消耗者
  • 哀求对应的消息响应队列(这里使用hash表,以快速查找指定相应)
  • 互斥锁&条件变量(大部分的哀求都是阻塞操纵,发送哀求后需要等到响应后才能继续。但muduo库的通讯是异步的,因此需要我们在收到响应后,通过判定是否是等待的指定响应来进行同步)
信道管理操纵:


  • 信道的创建 / 删除
  • 互换机的声明 / 删除
  • 队列的声明 / 删除
  • 互换机 - 队列的绑定 / 解绑
  • 添加 / 取消订阅
  • 消息的发布 / 确认等
5.2.3 通讯连接模块

向用户提供一个实现网络通讯的Connection对象,内部可创建更细粒化的Channel对象与服务器通讯。

  • 管理信息:


  • 连接关联的现实用于通讯的muduo::net::Connection连接对象
  • 连接关联的信道管理句柄(实现信道的增删查)
  • 连接关联的Event Loop异步循环工作线程
  • 异步工作线程池(对服务器发来的消息进行处理)

  • 操纵管理:


  • 管道的创建 / 删除
5.3 项目模块关系图


6. 项目效果简朴演示


  • 启动服务器

  • 发布客户端发布自定义消息(字符串或使命,这里简朴起见使用字符串)

  • 消耗者客户端进行消耗(命令行指定要订阅的队列)

7. 总结

本项目模仿RabbitMQ实现简化版的消息队列组件,内部实现了消息队列服务器以及客户端的搭建,并支持不同主机间消息的发布、订阅以及消息推送功能。
本篇博客,简朴搭建了项目总体框架,难点在于要理清各数据结构间的关系以及回调函数较为复杂。
项目链接:项目源码

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

嚴華

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表