模拟RabbitMQ实现消息队列

打印 上一主题 下一主题

主题 879|帖子 879|积分 2637

代码地址: https://gitee.com/lv-shixiong/linux_test/tree/master/mq 
     项目介绍

     壅闭队列实现生产者消费者模型:
     可以或许解耦合,将数据的生产和消费分开
     支持并发
     支持忙闲不均
     消息队列就是把壅闭队列封装成独立的服务器程序,让它运行到某一台主机上,发消息到该主机,从该主机拉取消息(跨主机的生产者消费者模型)
     实现消息队列服务器,发布客户端,订阅客户端
     

     开发环境

     ubuntu-22.04、VSCode、g++、Makefile
     技能选型

     主开发语言:C++
     序列化框架:Protobuf二进制序列化
     网络通讯:自界说应用层协议+muduo库:对tcp长连接的封装,并且使用epoll的事件驱动模式,实现高并发服务器与客户端
     源数据信息数据库:SQLite3
     单元测试框架:Gtest
     环境搭建

     1、安装wget
     2、安装protobuf
     在 /usr/local/google里找protobuf
     3、安装muduo
     安装好的程序在../build
     Protobuf

     是序列化反序列化的框架
     特点:语言无关、平台无关;高效;扩展性、兼容性好。
     使用方法:
     

  • 编写.proto文件(界说结构对象)。
  • 使用protoc编译器编译文件天生一系列接口代码、.h中界说了我们所描述的数据结构对象类,.cc界说实现告终构化对象数据的访问&操作&序列化&反序列化。
  • 将天生的代码包含进我们的代码中,引入头文件。
     语法规范:
     

  • proto文件使用全小写,单词间_隔开。
  • 书写代码时,使用2个空格缩进。
  • 每行语句分号末了
  • //注释
     语法:
     指定proto3语法,syntax = "proto3"。
     声明命名空间,package 声明符,示.proto文件的命名空间。
     界说结构对象描述message 消息体名称{},名称用驼峰
     界说消息字段 字段范例 字段名 = 字段唯一编号
     

     

     编译:protoc --cpp_out=路径 proto文件名称
     protoc --cpp_out=. contacts.proto
                                   登录后复制                        
  1. syntax= "proto3";//声明语法版本
  2. package contacts;//声明代码的命名空间
  3. message contact{
  4.     //各个字段  字段类型 字段名 = 字段唯一编号
  5.     uint64 sn = 1;
  6.     string name = 2;
  7.     float score = 3;
  8. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
                                                     登录后复制                        
  1. main:main.cpp contacts.pb.cc
  2.         g++ -o $@ $^ -std=c++11 -lprotobuf
复制代码
      

  • 1.
  • 2.
                                                     登录后复制                        
  1. #include "contacts.pb.h"
  2. #include <iostream>
  3. int main()
  4. {
  5.     //创建通讯录对象
  6.     contacts::contact con;
  7.     //添加字段
  8.     con.set_name("Nesta");
  9.     con.set_sn(13);
  10.     con.set_score(99);
  11.     //序列化
  12.     std::string str=con.SerializeAsString();
  13.     //反序列化
  14.     contacts::contact conR;
  15.     conR.ParseFromString(str);
  16.     //打印信息
  17.     std::cout<<"name="<<conR.name()<<std::endl;
  18.     std::cout<<"sn="<<conR.sn()<<std::endl;
  19.     std::cout<<"socre="<<conR.score()<<std::endl;
  20.     return 0;
  21. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
                       Muduo

     muduo库是一个基于reactor模型的高性能服务器框架。
     什么是reactor?基于时间触发的模型(基于epoll进行IO事件监控)
     主从reactor模型:将IO事件监控进行进一步的层次分别。
     主reactor:只对新建连接事件进行监控(保证不受IO壅闭影响实现高效的新建连接获取)
     从reactor:针对新建连接进行IO事件监控(进行IO操作和业务处理)
     主从reactor必然是一个多执行流的并发模式---one  thread one loop,一个事件监控,占据一个线程,进行事件监控。
     

     muduo::net::TcpServer类
     muduo::net::EventLoop类
     muduo::net::TcpConnection类
     muduo::net::Buffer类
     Server:
                                   登录后复制                        
  1. #include "./include/muduo/net/TcpServer.h"
  2. #include "./include/muduo/net/EventLoop.h"
  3. #include "./include/muduo/net/TcpConnection.h"
  4. #include <iostream>
  5. #include <string>
  6. #include <functional>
  7. #include <unordered_map>
  8. class TranslateServer
  9. {
  10. public:
  11.     TranslateServer(uint16_t port)
  12.         :_server(&_loop
  13.         ,muduo::net::InetAddress("0.0.0.0",8888)
  14.         ,"translateServer"
  15.         ,muduo::net::TcpServer::kNoReusePort)
  16.         {
  17.             //注册回调函数
  18.             auto ccb=std::bind(&TranslateServer::OnConnection,this,std::placeholders::_1);
  19.             auto mcb=std::bind(&TranslateServer::OnMessage,this,std::placeholders::_1
  20.             ,std::placeholders::_2,std::placeholders::_3);
  21.             _server.setConnectionCallback(ccb);
  22.             _server.setMessageCallback(mcb);
  23.             _server.setConnectionCallback(ccb);
  24.             _server.setMessageCallback(mcb);
  25.         }
  26.     void Start()//启动服务器
  27.     {
  28.         
  29.         _server.start();
  30.         _loop.loop();//这是一个死循环
  31.     }
  32. private:
  33.     void OnConnection(const muduo::net::TcpConnectionPtr& conn)
  34.     {
  35.         //对连接的回调
  36.         //建立连接和断开连接时都会触发
  37.         if(conn->connected())//确定是否连接成功
  38.         {
  39.             std::cout<<"连接成功"<<std::endl;
  40.         }
  41.         else
  42.         {
  43.             std::cout<<"连接失败"<<std::endl;
  44.         }
  45.     }
  46.     std::string Translate(std::string key)
  47.     {
  48.         std::unordered_map<std::string,std::string> dict={
  49.             {"你好","hello"}
  50.             ,{"hello","你好"}
  51.             ,{"world","世界"}
  52.             ,{"世界","world"}
  53.         };//字典
  54.         auto it=dict.find(key);
  55.         if(it!=dict.end())//找到了
  56.         {
  57.             return it->second;
  58.         }
  59.         else//没找到
  60.         {
  61.             return "不认识这个词语";
  62.         }
  63.     }
  64.     void OnMessage(const muduo::net::TcpConnectionPtr& ptr,muduo::net::Buffer* buffer,muduo::Timestamp)
  65.     {
  66.         //对消息的回调
  67.         //1.从缓冲区里拿到数据
  68.         std::string key=buffer->retrieveAllAsString();
  69.         //2.翻译
  70.         std::string val=Translate(key);
  71.         //3.将翻译结果发回客户端
  72.         muduo::net::Buffer message;
  73.         message.append(val);
  74.         ptr->send(&message);
  75.     }
  76.     //loop是对epool的事件监听,触发事件后进行IO操作
  77.     muduo::net::EventLoop _loop;
  78.     //_server主要用于设置回调函数,告诉服务器收到什么消息进行怎样的处理
  79.     muduo::net::TcpServer _server;
  80. };
  81. int main()
  82. {
  83.     TranslateServer svr(8888);
  84.     svr.Start();
  85.     return 0;
  86. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
                       Client
                                   登录后复制                        
  1. #include "include/muduo/net/TcpClient.h"
  2. #include "include/muduo/net/Buffer.h"
  3. #include "include/muduo/net/EventLoopThread.h"
  4. #include "include/muduo/net/InetAddress.h"
  5. #include "include/muduo/base/CountDownLatch.h"
  6. #include <string>
  7. #include <iostream>
  8. #include <functional>
  9. class TranslateClient
  10. {
  11. public:
  12.     TranslateClient(std::string serverIp,uint16_t serverPort)
  13.         :_client(_loopThread.startLoop()
  14.         ,muduo::net::InetAddress(serverIp,serverPort)
  15.         ,"translateClient")
  16.         ,_latch(1)//一定要设为1
  17.     {
  18.         //注册回调函数
  19.         auto ccb=std::bind(&TranslateClient::OnConnection,this,std::placeholders::_1);
  20.         auto mcb=std::bind(&TranslateClient::OnMessage,this
  21.         ,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);
  22.         
  23.         _client.setConnectionCallback(ccb);
  24.         _client.setMessageCallback(mcb);
  25.         std::cout<<"初始化完成"<<std::endl;
  26.     }
  27.     void Connect()//连接服务器
  28.     {
  29.         _client.connect();//进行连接
  30.         //之所以要阻塞就是为了方式没有连接成功就进行了其他操作,导致错误
  31.         std::cout<<"开始连接"<<std::endl;
  32.         _latch.wait();//阻塞等待连接成功
  33.     }
  34.     void Send(std::string& message)//发送消息
  35.     {
  36.         if(_conn->connected())//确认连接成功再进行IO
  37.         {
  38.             _conn->send(message);
  39.         }
  40.         
  41.     }
  42. private:
  43.     void OnConnection(const muduo::net::TcpConnectionPtr& conn)
  44.     {
  45.         if(conn->connected())//连接上了
  46.         {
  47.             _latch.countDown();//解除阻塞
  48.             _conn=conn;//保存连接
  49.         }
  50.         else
  51.         {
  52.             //......
  53.         }
  54.     }
  55.     void OnMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buffer,muduo::Timestamp ts)
  56.     {
  57.         //来消息了就打印出来
  58.         std::string message=buffer->retrieveAllAsString();
  59.         std::cout<<"Server Say> "<<message<<std::endl;
  60.     }
  61. private:
  62.     muduo::CountDownLatch _latch;//用于同步
  63.     muduo::net::EventLoopThread _loopThread;//创建线程进行IO
  64.     muduo::net::TcpClient _client;
  65.     muduo::net::TcpConnectionPtr _conn;//客户端与服务器的连接
  66. };
  67. int main()
  68. {
  69.     TranslateClient cli("127.0.0.1",8888);
  70.     cli.Connect();//建立连接
  71.     while(1)
  72.     {
  73.         std::string message;
  74.         std::cin>>message;
  75.         cli.Send(message);
  76.     }
  77.     return 0;
  78. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
                       上面的服务器与客户端没有协议,直接通讯。
     下面是基于protobuf来进行通讯。
     

     muduo-master/examples/protobuf/codec/codec.cc和codec.h以及dispatcher.h
     examples是一些例子,要学会参考别人的代码。
     

     SQLite3

     是什么?进程内的轻量级数据库,是本地的,不必要服务器,它给我们提供了一系列接口。
     官方文档: List Of SQLite Functions
                                   登录后复制                        
  1. sqlite3操作流程:0. 查看当前数据库在编译阶段是否启动了线程安全
  2.                 int sqlite3_threadsafe(); 0-未启⽤; 1-启⽤
  3.                 需要注意的是sqlite3是有三种安全等级的:
  4.                 1. ⾮线程安全模式--不使用任何锁,效率最高
  5.     2. 线程安全模式(不同的连接在不同的线程/进程间是安全的,即⼀个句柄不能⽤于多线程
  6.                 间)
  7.                 3. 串⾏化模式(可以在不同的线程/进程间使⽤同⼀个句柄)
  8. 1. 创建/打开数据库⽂件,并返回操作句柄
  9.                 int sqlite3_open(const char *filename, sqlite3 **ppDb) 成功返回SQLITE_OK
  10.                 //若在编译阶段启动了线程安全,则在程序运⾏阶段可以通过参数选择线程安全等级
  11.                 int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const
  12.                 char *zVfs );
  13.                 flag:
  14.                 SQLITE_OPEN_READWRITE -- 以可读可写⽅式打开数据库⽂件
  15.                 SQLITE_OPEN_CREATE -- 不存在数据库⽂件则创建
  16.                 SQLITE_OPEN_NOMUTEX--多线程模式,只要不同的线程使⽤不同的连接即可保证线程
  17.                 安全
  18.                 SQLITE_OPEN_FULLMUTEX--串⾏化模式
  19.                 返回:SQLITE_OK表⽰成功,否则有问题
  20. 2. 执⾏语句
  21.                 int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**),
  22.                 void* arg, char **err)
  23.         err是错误信息
  24.         
  25.                 int (*callback)(void*,int,char**,char**)
  26.                 void* : 是设置的在回调时传⼊的arg参数
  27.                 int:⼀⾏中数据的列数
  28.                 char**:存储⼀⾏数据的字符指针数组
  29.                 char**:每⼀列的字段名称
  30.                 这个回调函数有个int返回值,成功处理的情况下必须返回0,返回⾮0会触发ABORT退出程序
  31.                 返回:SQLITE_OK表⽰成功
  32. 3. 销毁句柄
  33.                 int sqlite3_close(sqlite3* db); 成功返回SQLITE_OK
  34.                 int sqlite3_close_v2(sqlite3*); 推荐使⽤--⽆论如何都会返回SQLITE_OK
  35. 获取错误信息
  36.                 const char *sqlite3_errmsg(sqlite3* db);
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
                       sqlite3 test.db//打开数据库文件
     .table//查看表
                                   登录后复制                        
  1. //封装实现一个SqliteHelper类,提供简单的sqlite数据库操作接口,完成对数据增删改查操作
  2. //1.创建/打开数据库文件
  3. //2.针对打开的数据库进行操作
  4.     //2.1表的操作
  5.     //2.2数据的操作
  6. //3.关闭数据库
  7. #include <sqlite3.h>
  8. #include <string>
  9. #include <iostream>
  10. class SqliteHelper
  11. {
  12. public:
  13.     typedef int(*SqliteCallback)(void*,int,char**,char**);
  14.     SqliteHelper(std::string dbfile)
  15.         :_dbfile(dbfile)
  16.     {}
  17.     bool open(int safeLevel=SQLITE_OPEN_FULLMUTEX)//打开数据库文件
  18.     {
  19.         //默认安全级别为串行化模式
  20.         //int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs )
  21.         int ret=sqlite3_open_v2(_dbfile.c_str(),&_handler,SQLITE_OPEN_CREATE|SQLITE_OPEN_READWRITE|safeLevel,nullptr);
  22.         if(ret!=SQLITE_OK)
  23.         {
  24.             std::cout<<"创建/打开数据库文件失败:"<<sqlite3_errmsg(_handler)<<std::endl;
  25.             return false;
  26.         }
  27.         return true;
  28.     }
  29.     bool exec(const std::string& sql,SqliteCallback cb,void* arg)//进行操作
  30.     {
  31.         //int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**),void* arg, char **err)
  32.         int ret=sqlite3_exec(_handler,sql.c_str(),cb,arg,nullptr);
  33.         if(ret!=SQLITE_OK)
  34.         {
  35.             std::cout<<sql<<"执行失败:"<<sqlite3_errmsg(_handler)<<std::endl;
  36.             return false;
  37.         }
  38.         return true;
  39.     }
  40.     void close()//关闭数据库文件
  41.     {
  42.         //int sqlite3_close_v2(sqlite3*)
  43.         sqlite3_close_v2(_handler);
  44.     }
  45. private:
  46.     sqlite3* _handler;//句柄
  47.     std::string _dbfile;//数据库文件名
  48. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
                                                     登录后复制                        
  1. #include "sqlite.hpp"
  2. #include <vector>
  3. #include <cassert>
  4. int select_stu_callback(void* arg,int col_count,char** result,char** fields_name) {
  5.     std::vector<std::string> *arry = (std::vector<std::string>*)arg;
  6.     arry->push_back(result[0]);
  7.     return 0; //必须有!!!
  8. }
  9. int main()
  10. {
  11.     SqliteHelper helper("./test.db");
  12.     //1. 创建/打开库文件
  13.     assert(helper.open());
  14.     //2. 创建表(不存在则创建), 学生信息: 学号,姓名,年龄
  15.     const char *ct = "create table if not exists student(sn int primary key, name varchar(32), age int);";
  16.     assert(helper.exec(ct, nullptr, nullptr));
  17.     //3. 新增数据 , 修改, 删除, 查询
  18.     // const char *insert_sql = "insert into student values(1, '小明', 18), (2, '小黑', 19), (3, '小红', 18);";
  19.     // assert(helper.exec(insert_sql, nullptr, nullptr));
  20.     // const char *update_sql = "update student set name='张小明' where sn=1";
  21.     // assert(helper.exec(update_sql, nullptr, nullptr));
  22.     // const char *delete_sql = "delete from student where sn=3";
  23.     // assert(helper.exec(delete_sql, nullptr, nullptr));
  24.     const char *select_sql = "select name from student;";
  25.     std::vector<std::string> arry;
  26.     assert(helper.exec(select_sql, select_stu_callback, &arry));
  27.     for (auto &name : arry) {
  28.         std::cout << name << std::endl;
  29.     }
  30.     //4. 关闭数据库
  31.     helper.close();
  32.     return 0;
  33. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
                       GTest

     使用:
     

  • 宏断言
  • 事件机制(全局,单独)
     GTest中的断⾔的宏可以分为两类:
     

  • ASSERT_系列:如果当前点检测失败则退出当前函数
  • EXPECT_系列:如果当前点检测失败则继承往下执⾏
     常用断言:
                                   登录后复制                        
  1. // bool值检查
  2. ASSERT_TRUE(参数),期待结果是true
  3. ASSERT_FALSE(参数),期待结果是false
  4. //数值型数据检查
  5. ASSERT_EQ(参数1,参数2),传⼊的是需要⽐较的两个数 equal
  6. ASSERT_NE(参数1,参数2),not equal,不等于才返回true
  7. ASSERT_LT(参数1,参数2),less than,⼩于才返回true
  8. ASSERT_GT(参数1,参数2),greater than,⼤于才返回true
  9. ASSERT_LE(参数1,参数2),less equal,⼩于等于才返回true
  10. ASSERT_GE(参数1,参数2),greater equal,⼤于等于才返回true
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
                       TEST(参数1,参数2){//......}
     参数1是测试名称,第二个参数是详细名称
     RUN_ALL_TESTS();//运行全部测试
                                   登录后复制                        
  1. #include<iostream>
  2. #include<gtest/gtest.h>
  3. int abs(int x)
  4. {
  5.         return x > 0 ? x : -x;
  6. }
  7. TEST(abs_test, test1)
  8. {
  9.         ASSERT_TRUE(abs(1) == 1) << "abs(1)=1";
  10.         ASSERT_TRUE(abs(-1) == 1);
  11.         ASSERT_FALSE(abs(-2) == -2);
  12.         ASSERT_EQ(abs(1),abs(-1));
  13.         ASSERT_NE(abs(-1),0);
  14.         ASSERT_LT(abs(-1),2);
  15.         ASSERT_GT(abs(-1),0);
  16.         ASSERT_LE(abs(-1),2);
  17.         ASSERT_GE(abs(-1),0);
  18. }
  19. int main(int argc,char *argv[])
  20. {
  21.         //将命令⾏参数传递给gtest
  22.         testing::InitGoogleTest(&argc, argv);
  23.         // 运⾏所有测试案例
  24.         return RUN_ALL_TESTS();
  25. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
                       

     全局事件:针对整个测试程序。实现全局的事件机制,必要创建⼀个⾃⼰的类,然后继承 testing::Environment类,然后分别实现成员函数 SetUp 和 TearDown ,同时在main函数内进⾏调⽤ testing::AddGlobalTestEnvironment(new MyEnvironment); 函数添加全局的事件机制
     全局测试套件,其实就是用户本身界说一个全局的测试环境类
                                   登录后复制                        
  1. #include <gtest/gtest.h>
  2. #include <iostream>
  3. #include <unordered_map>
  4. //全局事件:针对整个测试程序,提供全局事件机制,能够在测试之前配置测试环境数据,测试完毕后清理数据
  5. //先定义环境类,通过继承testing::Environment的派⽣类来完成
  6. //重写的虚函数接⼝SetUp会在测试之前被调⽤; TearDown会在测试完毕后调⽤.
  7. std::unordered_map<std::string,std::string> dict;
  8. class MyMapTest:public testing::Environment
  9. {
  10. public:
  11.     virtual void SetUp() override
  12.     {
  13.         std::cout<<"所有单元开始测试"<<std::endl;
  14.         dict.insert(std::make_pair("hello","你好"));
  15.         dict.insert(std::make_pair("世界","world"));
  16.     }
  17.     virtual void TearDown() override
  18.     {
  19.         std::cout<<"所有单元测试结束"<<std::endl;
  20.         dict.clear();
  21.     }
  22. };
  23. TEST(MyMapTest,test1)
  24. {
  25.     ASSERT_EQ(dict.size(),2);
  26.     dict.erase("hello");
  27. }
  28. TEST(MyMapTest,test2)
  29. {
  30.     ASSERT_EQ(dict.size(),2);
  31. }
  32. int main(int argc,char* argv[])
  33. {
  34.     testing::InitGoogleTest(&argc,argv);
  35.     testing::AddGlobalTestEnvironment(new MyMapTest());
  36.     RUN_ALL_TESTS();
  37.     return 0;
  38. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
                       TestSuite事件:针对⼀个个测试套件。测试套件的事件机制我们同样必要去创建⼀个类,继承⾃testing::Test ,实现两个静态函数 SetUpTestCase 和 TearDownTestCase ,测试套件的事件机制不必要像全局事件机制⼀样在 main 注册,⽽是必要将我们平常使⽤的 TEST 宏改为 TEST_F 宏。
     

  • SetUpTestCase() 函数是在测试套件第⼀个测试⽤例开始前执⾏

  • TearDownTestCase() 函数是在测试套件最后⼀个测试⽤例竣事后执⾏

  • 必要注意TEST_F的第⼀个参数是我们创建的类名,也就是当前测试套件的名称,如许在TEST_F宏的测试套件中就可以访问类中的成员了。
                                   登录后复制                        
  1. #include <iostream>
  2. #include <gtest/gtest.h>
  3. #include <unordered_map>
  4. class MyTest:public testing::Test
  5. {
  6. public:
  7.     static void SetUpTestCase()//所有单元测试开始时初始化
  8.     {
  9.         //假设有一个全局的变量,公共的测试数据就可以在这里初始化
  10.         std::cout<<"所有单元测试开始"<<std::endl;
  11.     }
  12.     virtual void SetUp() override//每次单元测试时初始化
  13.     {
  14.         //假设有一个全局的变量,在这里插入每个单元测试所需的独立的测试数据
  15.         std::cout<<"单元测试开始"<<std::endl;
  16.         mymap.insert(std::make_pair("hello","world"));  
  17.     }
  18.     virtual void TearDown() override//每次单元测试结束时清理
  19.     {
  20.         std::cout<<"单元测试结束"<<std::endl;
  21.         mymap.clear();
  22.     }
  23.     static void TearDownTestCase()//所有单元测试结束时清理
  24.     {
  25.         std::cout<<"所有单元测试结束"<<std::endl;
  26.     }
  27. public:
  28.     std::unordered_map<std::string,std::string> mymap;
  29. };
  30. //必须和类同名
  31. TEST_F(MyTest,test1)
  32. {
  33.     mymap.insert(std::make_pair("aaa","aaa"));
  34.     ASSERT_EQ(mymap.size(),2);
  35. }
  36. TEST_F(MyTest,test2)
  37. {
  38.     ASSERT_EQ(mymap.size(),1);
  39. }
  40. int main(int argc,char* argv[])
  41. {
  42.     testing::InitGoogleTest(&argc,argv);
  43.     RUN_ALL_TESTS();
  44.     return 0;
  45. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
                       C++11异步操作实现线程池

     怎样获取线程池里的线程完成使命后的结果?
     std::future:获取使命结果
     std::future是C++11标准库中的⼀个模板类,它表⽰q⼀个异步操作的结果。当我们在多线程编程中使⽤异步使命时,std::future可以帮助我们在必要的时候获取使命的执⾏结果。std::future的⼀个重要特性是可以或许壅闭当前线程,直到异步操作完成,从⽽确保我们在获取结果时不会遇到未完成的操作。
     asyn
                                   登录后复制                        
  1. #include <iostream>
  2. #include <future>
  3. #include <thread>
  4. #include <chrono>
  5. int add(int num1,int num2)
  6. {
  7.     std::cout<<"aaaaaaaaaaaaaaaaaaaaa"<<std::endl;
  8.     std::cout<<"加法任务"<<std::endl;
  9.      std::this_thread::sleep_for(std::chrono::seconds(5));
  10.     std::cout<<"bbbbbbbbbbbbbbbbbbbbb"<<std::endl;
  11.     return num1+num2;
  12. }
  13. int main()
  14. {
  15.     //std::async(func, ...)      std::async(policy, func, ...)
  16.     //std::launch::deferred  在执行get获取异步结果的时候,才会执行异步任务
  17.     //std::launch::async   内部会创建工作线程,异步的完成任务
  18.     std::cout<<"----------1----------"<<std::endl;
  19.     std::future<int> result=std::async(std::launch::deferred,add,2,3);//把任务交它,它会创建一个工作线程来执行任务
  20.     std::this_thread::sleep_for(std::chrono::seconds(1));
  21.     std::cout<<"----------2----------"<<std::endl;
  22.     int ret=result.get();//此时如果任务还未完成,会阻塞等待
  23.     std::cout<<"----------3----------"<<std::endl;
  24.     std::cout<<"result="<<ret<<std::endl;
  25.     return 0;
  26. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
                       promise
                                   登录后复制                        
  1. #include <future>
  2. #include <thread>
  3. #include <chrono>
  4. #include <iostream>
  5. //通过在线程中对promise对象设置数据,其他线程中通过future获取设置数据的方式实现获取异步任务执行结果的功能
  6. void add(int num1,int num2,std::promise<int>& prom)
  7. {
  8.     prom.set_value(num1+num2);
  9.     std::this_thread::sleep_for(std::chrono::seconds(3));
  10. }
  11. int main()
  12. {
  13.     std::promise<int> prom;
  14.     std::future<int> fut=prom.get_future();//二者从此绑定了关系
  15.     std::thread th(add,11,12,std::ref(prom));//创建线程执行任务
  16.     std::cout<<fut.get()<<std::endl;//get会进行同步,等任务完成了才得到结果
  17.     th.join();//等待线程
  18.     return 0;
  19. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
                       packaged_task
                                   登录后复制                        
  1. #include <iostream>
  2. #include <future>
  3. #include <memory>
  4. #include <thread>
  5. //pakcaged_task的使用
  6. //pakcaged_task 是一个模板类,实例化的对象可以对一个函数进行二次封装,
  7. //pakcaged_task可以通过get_future获取一个future对象,来获取封装的这个函数的异步执行结果
  8. int add(int num1,int num2)
  9. {
  10.     return num1+num2;
  11. }
  12. int main()
  13. {
  14.     //task(11, 22);  task可以当作一个可调用对象来调用执行任务
  15.     //但是它又不能完全的当作一个函数来使用
  16.     //std::async(std::launch::async, task, 11, 22);
  17.     //std::thread thr(task, 11, 22);
  18.     //但是我们可以把task定义成为一个指针,传递到线程中,然后进行解引用执行
  19.     //但是如果单纯指针指向一个对象,存在生命周期的问题,很有可能出现风险
  20.     //思想就是在堆上new对象,用智能指针管理它的生命周期
  21.     auto pTask=std::make_shared<std::packaged_task<int(int,int)>>(add);//make_shared效率更高
  22.     std::future<int> fut=pTask->get_future();//建立同步关系
  23.     std::thread th([pTask](){
  24.         (*pTask)(1,2);
  25.     });
  26.     std::cout<<"结果为:"<<fut.get()<<std::endl;
  27.     th.join();
  28.     return 0;
  29. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
                       实现线程池
     packaged_task+futur
                                   登录后复制                        
  1. #include <thread>
  2. #include <future>
  3. #include <vector>
  4. #include <mutex>
  5. #include <atomic>
  6. #include <condition_variable>
  7. #include <functional>
  8. #include <memory>
  9. #include <iostream>
  10. class ThreadPool
  11. {
  12. public:
  13.     using Func = std::function<void(void)>;
  14.     ThreadPool(int threadCount = 1) // 构造函数
  15.         : _stop(false)
  16.     {
  17.         for (int i = 0; i < threadCount; ++i) // 创建指定数量的线程
  18.         {
  19.             _threads.emplace_back(std::thread(&ThreadPool::entry, this));
  20.         }
  21.     }
  22.     ~ThreadPool() // 析构函数
  23.     {
  24.         stop();
  25.     }
  26.     template <class F, class... Args>
  27.     // func是用户传过来的函数,接下来就是不定参数
  28.     // push函数会把func封装成一个异步任务(packaged_task),抛入任务池中,由工作线程取出处理
  29.     // push函数会返回一个future对象,让用户能得到任务结果
  30.     auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))> // 用户添加任务
  31.     {
  32.         // 1.将用户发过来的函数封装成packaged_task
  33.         using retType = decltype(func(args...));
  34.         auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...); // 将参数绑定,注意使用完美转发
  35.         auto task = std::make_shared<std::packaged_task<retType()>>(tmp_func);
  36.         std::future<retType> fut = task->get_future();
  37.         // 2.构造匿名lambda表达式,里面执行task
  38.         {
  39.             std::unique_lock<std::mutex> lock(_mutex); // 加锁
  40.             // 将lambda表达式抛入任务池中
  41.             _tasks.push_back([task]()
  42.                            { (*task)(); });
  43.             _cond.notify_one(); // 唤醒一个线程
  44.         }
  45.         return fut;
  46.     }
  47.     void stop() // 线程池停止工作
  48.     {
  49.         // 0.如果已经停止
  50.         if (_stop == true)
  51.             return;
  52.         // 1.将停止标志设为true
  53.         _stop = true;
  54.         // 2.唤醒所有线程
  55.         _cond.notify_all();
  56.         // 3.等待所有线程
  57.         for (auto &th : _threads)
  58.         {
  59.             th.join();
  60.         }
  61.     }
  62. private:
  63.     // 线程入口函数,内部不断从任务池中取出任务执行
  64.     void entry()
  65.     {
  66.         while (!_stop)
  67.         {
  68.             std::vector<Func> tmpTasks;
  69.             {
  70.                 std::unique_lock<std::mutex> lock(_mutex); // 加锁
  71.                 _cond.wait(lock, [this]()
  72.                            { return _stop || !_tasks.empty(); }); // 等待唤醒
  73.                 tmpTasks.swap(_tasks);//取出任务
  74.             }
  75.             std::cout<<"线程开始执行任务"<<std::endl;
  76.             for(auto& task:tmpTasks)//执行任务
  77.             {
  78.                 task();
  79.             }
  80.         }
  81.     }
  82. private:
  83.     std::atomic<bool> _stop;           // 线程池是否停止运行
  84.     std::mutex _mutex;                 // 互斥
  85.     std::condition_variable _cond;     // 同步
  86.     std::vector<Func> _tasks;          // 任务池
  87.     std::vector<std::thread> _threads; // 线程池
  88. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
                       需求分析

     1、焦点概念

     其中, Broker Server是最核⼼的部门, 负责消息的存储和转发。
     ⽽在AMQP(Advanced Message Queuing Protocol-⾼级消息队列协议,⼀个提供统⼀消息服务的应⽤层标准⾼级消息队列协议,为⾯向消息的中间件设计,使得服从该规范的客⼾端应⽤和消息中间件服 务器的全功能互操作成为可能)模型中,也就是消息中间件服务器Broker中,⼜存在以下概念:
     

  • 假造机 (VirtualHost): 类似于 MySQL 的 "database", 是⼀个逻辑上的集合。⼀个 BrokerServer 上 可以存在多个 VirtualHost
  • 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则, 把消息转 发给不同的 Queue
  • 队列 (Queue): 真正⽤来存储消息的部门, 每个消费者决定⾃⼰从哪个 Queue 上读取消息
  • 绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 "多对多"关系,使⽤⼀个关联表就可以把这两个概念接洽起来
  • 消息 (Message): 传递的内容
     所谓的 Exchange 和 Queue 可以理解成 "多对多" 关系, 和数据库中的 "多对多" ⼀样. 意思是:
     ⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息)
     ⼀个 Queue 也可以被多个 Exchange 绑定 (⼀个 Queue 中的消息可以来⾃于多个 Exchange)
     

     

     2、焦点API

     对于 Broker 来说, 要实现以下核⼼ API,通过这些 API 来实现消息队列的基本功能
     1. 创建交换机 (exchangeDeclare)
     2. 销毁交换机 (exchangeDelete)
     3. 创建队列 (queueDeclare)
     4. 销毁队列 (queueDelete)
     5. 创建绑定 (queueBind)
     6. 解除绑定 (queueUnbind)
     7. 发布消息 (basicPublish)
     8. 订阅消息 (basicConsume)
     9. 确认消息 (basicAck)
     10. 取消订阅 (basicCancel)
     另⼀⽅⾯, Producer 和 Consumer 则通过⽹络的⽅式, 远程调⽤这些 API, 实现 ⽣产者消费者模型
     3、交换机范例

     对于 RabbitMQ 来说, 主要⽀持四种交换机范例:
     

  • Direct
  • Fanout
  • Topic
  • Header
     其中 Header 这种⽅式⽐较复杂, ⽐较少⻅。常⽤的是前三种交换机范例,项⽬中也主要实现这三种
     

  • Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名
  • Fanout: ⽣产者发送的消息会被复制到该交换机的全队伍列中
  • Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey。发送消息指定⼀个字符串为routingKey。当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列
     这三种操作就像给 qq 群发红包.
     Direct 是发⼀个专属红包, 只有指定的⼈能领.
     Fanout 是使⽤了魔法, 发⼀个 10 块钱红包, 群⾥的每个⼈都能领 10 块钱.
     Topic 是发⼀个画图红包, 发 10 块钱红包, 同时出个题, 得画的像的⼈, 才能领. 也是每个领到的⼈都能领 10 块钱.
     4、持久化

     Exchange, Queue, Binding, Message 等数据都有持久化需求
     当程序重启 / 主机重启, 保证上述内容不丢失
     5、网络通讯

     ⽣产者和消费者都是客⼾端程序, Broker 则是作为服务器,通过⽹络进⾏通讯。
     在⽹络通讯的过程中, 客⼾端部门要提供对应的 api, 来实现对服务器的操作。
     1. 创建 Connection
     2. 关闭 Connection
     3. 创建 Channel
     4. 关闭 Channel
     5. 创建队列 (queueDeclare)
     6. 销毁队列 (queueDelete)
     7. 创建交换机 (exchangeDeclare)
     8. 销毁交换机 (exchangeDelete)
     9. 创建绑定 (queueBind)
     10. 解除绑定 (queueUnbind)
     11. 发布消息 (basicPublish)
     12. 订阅消息 (basicConsume)
     13. 确认消息 (basicAck)
     14. 取消订阅(basicCancel)
     可以看到, 在 Broker 的基础上, 客⼾端还要增长 Connection 操作和 Channel 操作
     

  • Connection 对应⼀个 TCP 连接
  • Channel 则是 Connection 中的逻辑通道
     ⼀个 Connection 中可以包含多个 Channel。Channel 和 Channel 之间的数据是独⽴的,不会相互⼲扰。如许做主要是为了可以或许更好的复⽤ TCP 连接, 达到⻓连接的结果, 避免频仍的创建关闭 TCP 连接。
     Connection 可以理解成⼀根⽹线. Channel 则是⽹线⾥详细的线缆.
     

     6、消息应答

     被消费的消息, 必要进⾏应答。应答模式分成两种:
     

  • ⾃动应答: 消费者只要消费了消息, 就算应答完毕了,Broker 直接删除这个消息
  • ⼿动应答: 消费者⼿动调⽤应答接⼝, Broker 收到应答哀求之后, 才真正删除这个消息
     ⼿动应答的⽬的, 是为了保证消息确实被消费者处理成功了. 在⼀些对于数据可靠性要求⾼的场景, ⽐较常⻅.
     模块分别

     1、服务端模块

     1.1、数据管理模块

     1.1.1、交换机数据管理模块

     要管理的数据:描述了一个交换机应该有什么数据
     

  • 交换机名称:唯一标识
  • 交换机范例:决定了消息的转发方式,每个队列绑定中有个binding_key,每条消息中有个routing_key
           直接交换:binding_key与routing_key相同时,则将消息放入队列
      广播交换:将消息放入交换机绑定的全队伍列中
      主题交换:routing_key与多个绑定队列的binding_key有个匹配规则,匹配成功了则放入
         

  • 持久化标志:决定了当前交换机信息是否必要持久化存储
  • 自动删除标志:指的是关联了当前交换机的全部客户端都退出了,是否要自动删除交换机
  • 交换机的其它参数:当前未使用
     对交换机的管理操作:
     

  • 创建交换机:本质上必要的是声明--------强断言头脑-有就OK,没有则创建的意思
  • 删除交换机:注意每个交换机都会绑定一个或多个队列,意味着会有一个或多个绑定信息,因此删除交换机必要删除相干的绑定信息
  • 获取指定名称交换机
  • 获取当前交换机数目
     1.1.2、队列数据管理模块

     要管理的数据:
     

  • 队列名称:唯一的表现
  • 持久化标志:决定了是否将队列信息持久化存储起来,重启后,这个队列是否还存在
  • 是否独占标志:独占指的就是只有当前客户端本身可以或许订阅队列消息
  • 自动删除标志:当订阅了当前队列的全部客户端退出后,是否删除队列(暂未实现)
  • 其它参数:暂未使用
     提供的管理操作:
     

  • 创建队列
  • 删除队列
  • 获取指定队列信息
  • 获取队列数目
  • 获取全队伍列名称:当体系重启后,必要重新加载数据,加载汗青消息(消息以队列为单元存储在文件中),而加载消息必要知道队列名称,因为背面消息存储时,存储文件以队列名称命名
     一个队列如果持久化标志为false,则意味偏重启后,队列就没了,也就没有客户端可以或许订阅队列的消息,因此这个队列的消息如果持久化存储了,是没故意义的,因此通常一个队列的持久化标志是false,那么它的消息也就不必要持久化。
     1.1.3、绑定数据管理模块

     描述一下将哪个队列与哪个交换机绑定到了一起
     管理的数据:
     

  • 交换机名称
  • 队列名称
  • binding_key:绑定密钥,描述了交换机在主题交换&直接交换的消息发布匹配规则
           由数字、字符、_、#、。、*构成      binding_key:  news.music.#       消息中的routing_key:   news.music.pop
          管理的操作:
     

  • 添加绑定
  • 解除绑定
  • 获取交换机相干的全部绑定信息:删除交换机的时候,要删除相干的绑定信息。当消息发布到交换机,交换机得通过这些信息来将消息发布到指定队列
  • 获取队列相干的全部绑定信息:删除队列的时候,要删除相干的绑定信息
  • 获取绑定信息数目
     1.1.4、消息数据管理模块

     

  • 消息信息:
     

  • 消息属性
     

  • ID:消息的唯一标识
  • 持久化标志
  • routing_key:决定了当前消息要发布的队列(消息发布到交换机后,根据绑定队列的binding_key决定是否发布到指定队列)
     

  • 消息主体:消息内容
     以下是服务端为了管理所添加的信息
     

  • 存储偏移量:消息以队列为单元存储在文件中,这个偏移量,是当前相对于文件起始位置的偏移量
  • 消息长度:从偏移量位置取出指定长度的消息(解决粘报标题)
  • 是否有用标志:标识当前消息是否已经被删除(删除一条消息,并不必要从文件里把背面的数据拷贝到前面,而是设置标志,当一个文件中,有用消息占总消息比例不到50%,且数据量凌驾2000,则进行垃圾回收,重新整理文件数据存储)当体系重启也只必要重新加载有用消息即可(相称于进行了一次垃圾回收)
     

  • 消息的管理:
     

  • 管理方式:以队列为单元进行管理(因为消息的全部操作都是以队列为单元)
  • 管理数据:
     

  • 消息链表:保存全部的待推送消息
  • 待确认消息hash:消息推送给客户端后,会等候客户端进行消息确认,收到确认后,才会真正删除消息
  •  持久化消息hash:假设消息都会进行持久化存储,操作过程中会存在垃圾回收操作,但是垃圾回收会改变息的存储位置,内存中的消息也会存储消息的现实存储位置,垃圾回收后就不一致了。因此每次垃圾回收后都必要用新的位置,去更新持久化的消息。
  •  持久化的有用消息数目
  •   持久化的总消息数目:决定了什么时候进行垃圾回收
     

  • 管理操作:
     

  • 向队列新增消息
  • 获取队首消息:获取消息后,就会将消息从待推送链表删除(不再是待发送,而是待确认)
  • 对消息进行确认;从待确认消息中移除消息,并进行持久化数据的删除
  • 恢复队列汗青消息:主要是在构造函数中进行(只有在重启时进行)
  • 垃圾回收(消息持久化子模块完成):持久化文件中有用消息比例小于50%,且总消息数目凌驾2000进行垃圾回收
  • 删除队列相干消息文件:当一个队列被删除了,那它的消息也就没有存在意义了。
     

  • 队列消息管理:
     

  • 初始化队列消息结构
  • 移除队列消息结构:在一个队列创建/删除时调用
  • 向队列新增消息
  • 对队列消息进行确认
  • 恢复队列汗青消息
     以上四个模块分别实现数据的管理(增删查),以及持久化的存储
     1.2、假造机数据管理模块

     假造机其实就是交换机+队列+绑定+消息的逻辑单元。
     因此假造机的数据管理其实就是将以上四个模块的归并管理
     要管理的数据:
     

  • 交换机数据管理句柄
  • 队列数据管理句柄
  • 绑定信息数据管理句柄
  • 消息数据管理句柄
     要管理的操作:
     

  • 声明/删除交换机:注意---在删除交换机的时候要删除相干的绑定信息
  • 声明/删除队列:注意---在删除队列的时候,要删除相干的绑定信息
  • 队列的绑定/解除绑定
  • 获取指定队列的消息
  • 对指定队列的指定消息进行确认
  • 获取交换机相干的全部绑定信息:一个消息要发布给指定交换机的时候,交换机获取全部绑定信息啦,来确定消息要发布到哪个队列
     1.3、交换路由模块

     消息的发布,将一条消息发布到交换机上,由交换机决定放入哪些队列。而决定交给哪个队列,其中交换机范例起到了很大的作用(直接交换、广播交换、主题交换)
     直接交换和广播交换头脑都较为简朴,而主题交换涉及到了一个规则匹配的流程。交换路由模块就是专门做匹配过程的。
     在每个队列和交换机的绑定信息中,都有一个binding_key,这是队列发布的匹配规则。
     在每条要发布的消息中,都有一个routing_key,这是消息的发布规则。
     交换机有三种交换范例:直接、广播、主题。
     

  • 广播:直接将消息发布给交换机的全部绑定队列
  • 直接:routing_key与binding_key完全一致则匹配成功
  • 主题:binding_key中是匹配规则,routing_key是消息规则,匹配成功才能发布
     交换路由模块本质上来说,没有要管理的数据,只有向外提供的路由匹配操作:
     

  • 提供一个判定routing_key与binding_key是否可以或许匹配成功的接口。
  • 判定routing_key是否符合规定:只能由 数字字母_. 构成
  • 判定binding是否符合规定:只能由 数字字母_.#* 构成
     1.4、消费者管理模块

     消费者指的是订阅了一个队列消息的客户端,一旦这个队列有了消息就会推送给这个客户端。在焦点API中有一个订阅消息的服务,这里的订阅不是订阅了某条消息,而是订阅了某个队列的消息。当前主要实现了消息推送功能,因此一旦有了消息就要可以或许找到消费者相干的信息(消费者对应的信道)。
     消费者信息:
     

  • 消费者表现--tag
  • 订阅队列名称:当前队列有消息就推送给这个客户端,以及当客户端收到消息,必要对指定队列的消息进行确认
  • 自动确认标志:自动确认---推送消息后,直接删除消息不必要额外确认,手动确认---推送消息后,必要收到确认回复再去删除消息
  • 消费处理回调函数指针:队列有一条消息后,通过哪个函数进行处理(函数内部其实逻辑固定---向指定客户端推送消息)
     消费者管理:
     

  • 管理头脑:一队列为单元进行管理
     

  • 每个消费者订阅的都是指定队列的消息,消费者对消息进行确认也是以队列进行确认
  • 最关键的是:当指定队列中有消息了,必然是获取订阅了这个队列的消费者进行消息推送
     

  • 队列消费者管理结构:
     

  • 数据信息:消费者链表---保存当前队列的全部消费者信息(RR轮转每次取出下一个消费者进行消息推送---一条消息只必要被一个客户端处理即可)
  • 管理操作:
     

  • 新增消费者
  • RR轮转获取消费者
  • 删除消费者
  • 队列消费者数目
  • 是否为空
     

  • 管理操作:
     

  • 初始化队列消费者结构
  • 删除队列消费者结构
  • 向指定队列添加消费者
  • 获取指定队列消费者
  • 删除指定队列消费者
     1.5、信道管理模块

     信道是网络通讯中的一个概念,叫做通讯通道。
     网络通行的时候,必然都是通过网络通讯连接来完成的,为了可以或许更加充分地利用资源,因此对通讯连接又进行了进一步的细化,细化出了通讯信道。
     对于用户来说,一个通讯信道,就是进行网络通讯的载体,而一个真正的通讯连接,可以创建出多个通讯信道。
     每一个信道之间,在用户眼中是相互独立的,而在本质的底层它们使用同一个通讯连接进行网络服务。
     一个连接可能会对应有多个通讯信道。一旦某个客户端要关闭通讯,关闭的不是连接,而是本身对应通讯通道。,关闭信道就必要将客户端的订阅取消。
     信道提供的服务操作:
     

  • 声明/删除交换机
  • 声明/删除队列
  • 绑定/解绑队列与交换机
  • 发布消息/订阅队列消息/取消队列订阅/队列消息确认
     信道要管理的数据:
     

  • 信道ID
  • 信道关联的假造机句柄
  • 信道关联的消费者句柄:当信道关闭的时候,全部关联的消费者订阅都要取消,相称于删除全部的相干消费者
  • 工作线程池句柄:信道进行了消息发布到指定队列操作之后,从指定队列获取一个消费者,对这条消息进行消费,也就是将这条消息推送给一个客户端的操作交给线程池执行,并非每个信道都有一个线程池,而是整个服务器有一个线程池,各人全部的信道都是通过同一个线程池进行异步操作而已
     信道的管理:
     

  • 创建一个信道
  • 关闭一个信道
  • 获取指定信道句柄
     1.6、连接管理模块

     就是一个网络通讯对应的连接。
     使用muduo库来实现底层通讯,muduo库中本身就有Connection连接的概念和对象类。但是我们在连接中,尚有一个上层通讯信道的概念,这个概念在muduo库中时没有的。因此,我们必要在用户层面,对这个muduo库的Connection连接进行二次封装。
     当一个连接要关闭时,就应该把连接关联的信道全部关闭,因此也要管理关联的信道。形成我们本身所必要的连接管理。
     管理数据:
     

  • muduo库的通讯连接
  • 当前连接关联的信道管理句柄
     连接提供的操作:
     

  • 创建信道
  • 关闭信道
     管理的操作:
     

  • 新增连接
  • 关闭连接
  • 获取指定连接信息
     1.7、服务端BrokerServer模块

     对以上全部模块的整合,整合成一个服务器
     

  • 一个服务器有一个工作线程池,其它全部的信道操作的都是这同一个线程池
  • 一个服务器有一个假造机,其它全部交换机,队列,绑定,消息的操作都是针对这个假造机进行的
  • 一个服务器有一个消费者管理
  • 通讯相干连接管理,协议处理模块句柄,也是一整个服务器有一套
     管理信息:
     

  • 假造机管理模块句柄
  • 消费者管理模块句柄
  • 连接管理模块句柄
  • 工作线程池句柄
  • muduo库通讯所需元素......
     2、客户端模块

     2.1、消费者管理模块

     一个订阅客户端,当订阅一个队列消息时,就相称于创建了一个消费者
     

  • 消费者标识
  • 订阅的队列消息
  • 自动确认标志
  • 消息回调处理函数指针
     当当前消费者订阅了某一个队列的消息,这个队列有了消息之后,就会将消息推送给这个客户端,这个时候收到了消息则使用回调函数进行处理,处理完毕后,根据确认标志决定是否进行消息确认。
     管理操作:增删查
     2.2、信道管理模块

     客户端的信道与服务端的信道是逐一对应的,服务端信道提供的服务,客户端都有,相称于服务端为客户端提供服务,客户端为用户提供服务。
     全部提供的操作与服务端雷同,因为客户端给用户要提供什么服务,服务器就要给客户端提供什么服务。
     管理信息:
     

  • 消费者管理句柄:每个信道都有本身相干的消费者
  • 线程池句柄:对推送过来的消息进行回调处理,处理过程通过工作线程来进行
  • 信道关联的连接
     信道提供的服务:
     

  • 声明/删除交换机
  • 声明/删除交换机
  • 绑定/解绑队列与交换机
  • 发布消息/确认消息
  • 订阅消息队列/取消订阅队列消息
  • 创建/关闭信道
     信道的管理:信道的增删查
     2.3、连接管理模块

     对于用户来说,全部的服务都是通过信道完成的,信道在用户交角度就是一个通讯通道(而不是连接)。因此全部的哀求都是通过信道来完成的,连接的管理就包含了客户端资源的整合。
     客户端连接的管理,本质上就是对客户端TcpClient的二次封装和管理。面临用户。不必要有客户端的概念,连接对于用户来说就是客户端,通过连接创建信道,通过信道完成本身所必要的服务。因此,当前客户端这边的连接,就是一个资源的载体
     管理操作:
     

  • 连接服务器
  • 创建信道
  • 关闭信道
  • 关闭连接
     管理的资源:
     

  • 工作线程池
  • 连接关联的信道管理句柄
     2.4、异步线程池模块

     

  • TcpClient模块必要一个EventLoopThread模块进行IO事件监控。
  • 收到推送消息后,必要对推送过来的消息进行处理,因此必要一个线程池来帮助我们完成消息处理的过成
     将异步工作线程池模块单独拎出来,原因是多个连接用一个EventLoopThread进行IO事件监控就够了,以及全部的推送消息也只必要有一个线程池就够了。并不必要每个连接都有一个EveentLoop,也不必要每个信道都有本身的线程池。
     2.4、基于以上三个模块封装实现:订阅/发布客户端

     订阅客户端:订阅一个队列的消息,收到推送过来消息进行处理。
     发布客户端:向一个交换机发布消息
     

     项目创建

     

     mqdemo:编写⼀些功能⽤例时所在的⽬录
     mqcommon: 公共模块代码(线程池,数据库访问,⽂件访问,⽇志打印,pb相干,以及其他的⼀些琐碎功能模块代码)
     mqclient: 客⼾端模块代码
     mqserver: 服务器模块代码
     mqtest: 单元测试
     mqthird: ⽤到的第三⽅库存放⽬录
     服务端实现

     1、实现日志工具

                                   登录后复制                        
  1. #pragma once
  2. #include <iostream>
  3. #include <ctime>
  4. namespace lsxmq
  5. {
  6. #define DBG_LEVEL 0
  7. #define INF_LEVEL 1
  8. #define ERR_LEVEL 2
  9. #define DEFAULT_LEVEL DBG_LEVEL
  10. #define LOG(levelStr, level, format, ...)                                                                \
  11.     {                                                                                                    \
  12.         if (level >= DEFAULT_LEVEL)                                                                      \
  13.         {                                                                                                \
  14.             time_t t = time(nullptr);                                                                    \
  15.             struct tm *ptm = localtime(&t);                                                              \
  16.             char timeStr[32];                                                                            \
  17.             strftime(timeStr, 31, "%H:%M:%S", ptm);                                                      \
  18.             printf("[%s][%s][%s:%d]" format "\n", levelStr, timeStr, __FILE__, __LINE__, ##__VA_ARGS__); \
  19.         }                                                                                                \
  20.     }
  21.     //__VA_ARGS__是使用可变参数,##是为了保证没有可变参数时忽略前面的逗号
  22.     // 宏函数必须在一行里,在换行时加上\是为了转义\n
  23. #define DLOG(format, ...) LOG("DBG", DBG_LEVEL, format, ##__VA_ARGS__)
  24. #define ILOG(format, ...) LOG("INF", INF_LEVEL, format, ##__VA_ARGS__)
  25. #define ELOG(format, ...) LOG("ERR", ERR_LEVEL, format, ##__VA_ARGS__)
  26. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
                       2、实现使用Helper工具

     2.1、Sqlite基础操作类

                                   登录后复制                        
  1. #pragma once
  2. // 封装实现一个SqliteHelper类,提供简单的sqlite数据库操作接口,完成对数据增删改查操作
  3.     // 1.创建/打开数据库文件
  4.     // 2.针对打开的数据库进行操作
  5.     // 2.1表的操作
  6.     // 2.2数据的操作
  7.     // 3.关闭数据库
  8.     class SqliteHelper
  9.     {
  10.     public:
  11.         typedef int (*SqliteCallback)(void *, int, char **, char **);
  12.         SqliteHelper(std::string dbfile)
  13.             : _dbfile(dbfile)
  14.         {
  15.         }
  16.         bool open(int safeLevel = SQLITE_OPEN_FULLMUTEX) // 打开数据库文件
  17.         {
  18.             // 默认安全级别为串行化模式
  19.             // int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs )
  20.             int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE | safeLevel, nullptr);
  21.             if (ret != SQLITE_OK)
  22.             {
  23.                 ELOG("创建/打开数据库文件失败:%s",sqlite3_errmsg(_handler));
  24.                 return false;
  25.             }
  26.             return true;
  27.         }
  28.         bool exec(const std::string &sql, SqliteCallback cb, void *arg) // 进行操作
  29.         {
  30.             // int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**),void* arg, char **err)
  31.             int ret = sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr);
  32.             if (ret != SQLITE_OK)
  33.             {
  34.                 ELOG("%s执行失败:%s",sql.c_str(),sqlite3_errmsg(_handler));
  35.                 return false;
  36.             }
  37.             return true;
  38.         }
  39.         void close() // 关闭数据库文件
  40.         {
  41.             // int sqlite3_close_v2(sqlite3*)
  42.             sqlite3_close_v2(_handler);
  43.         }
  44.     private:
  45.         sqlite3 *_handler;   // 句柄
  46.         std::string _dbfile; // 数据库文件名
  47.     };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
                       2.2、字符串操作类

     分割字符串功能:
                                   登录后复制                        
  1. static size_t split(const std::string& str,const std::string& sep,std::vector<std::string>& vs)
  2.     {
  3.         size_t idx=0;
  4.         while(idx<str.size())
  5.         {
  6.             size_t pos=str.find(sep,idx);
  7.             //找不到分隔符了
  8.             if(pos==std::string::npos)
  9.             {
  10.                 vs.push_back(str.substr(idx,str.size()-idx));
  11.                 return vs.size();
  12.             }
  13.             //pos的位置是一个分割符,而idx是上一个分割符的下一个位置
  14.             if(idx==pos)//意味着出现连续分割符,中间没有有效子串
  15.             {
  16.                 idx+=sep.size();
  17.                 continue;
  18.             }
  19.             vs.push_back(str.substr(idx,pos-idx));
  20.             idx=pos+sep.size();
  21.         }
  22.         return vs.size();
  23.     }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
                       2.3、UUID天生器类

     1.天生8个0~255之间的随机数
     2.取出一个8字节的序号
     通过以上数据,构成16字节的数据,转换为16进制字符,共32个
     随机数的天生
                                   登录后复制                        
  1. class UUIDHelper
  2.     {
  3.     public:
  4.         static std::string uuid()
  5.         {
  6.             std::random_device rd; // 这是一个随机数生成装置
  7.             // rd();//这样就可以生成机器随机数,但是生成机器随机数效率太低,所以不使用这种方法
  8.             // 实际上是把这个机器随机数作为随机数种子来生成伪随机数
  9.             std::mt19937_64 gernator(rd()); // 使用梅森旋转算法,生成一个伪随机数
  10.             // 因为要生成的随机数范围是0~255,所以需要限定随机数的生成范围
  11.             std::uniform_int_distribution<int> distribution(0, 255);
  12.             std::stringstream uuid;
  13.             for (int i = 0; i < 8; ++i) // 生成16位16进制字符
  14.             {
  15.                 int number = distribution(gernator); // 生成随机数
  16.                 uuid << std::setw(2) << std::setfill('0') << std::hex << number;
  17.                 if (i == 3 || i == 5 || i == 7)
  18.                 {
  19.                     uuid << "-";
  20.                 }
  21.             }
  22.             // 生成序号
  23.             static std::atomic<size_t> seq(0);
  24.             size_t tmpSeq = seq.fetch_add(1); // 每次加一
  25.             for (int i = 7; i >= 0; --i)
  26.             {
  27.                 uuid << std::setw(2) << std::setfill('0') << std::hex << (tmpSeq >> (i * 8) & 0xff);
  28.                 if (i == 6)
  29.                 {
  30.                     uuid << "-";
  31.                 }
  32.             }
  33.             return uuid.str();
  34.         }
  35.     };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
                       2.3、文件操作类

                                   登录后复制                        
  1. class FileHelper
  2.     {
  3.     public:
  4.         FileHelper(const std::string& filename)
  5.             :_filename(filename)
  6.         {}
  7.         bool exists()//文件是否存在
  8.         {
  9.             struct stat st;
  10.             //返回值为0表示能获取文件信息,文件自然就是存在的
  11.             return stat(_filename.c_str(),&st)==0;
  12.         }
  13.         size_t size()//文件大小
  14.         {
  15.             struct stat st;
  16.             if(stat(_filename.c_str(),&st)!=0)
  17.             {
  18.                 return 0;
  19.             }
  20.             return st.st_size;
  21.         }
  22.         bool read(char* body,size_t offset,size_t len)//从文件指定位置读取指定长度内容
  23.         {
  24.             //1.打开文件
  25.             std::ifstream ifs(_filename.c_str(),std::ios_base::in|std::ios_base::binary);
  26.             if(ifs.is_open()==false)
  27.             {
  28.                 ELOG("%s文件打开失败",_filename.c_str());
  29.                 ifs.close();
  30.                 return false;
  31.             }
  32.             //2.移动到指定位置
  33.             ifs.seekg(offset,std::ios_base::beg);
  34.             //3.读取文件内容
  35.             ifs.read(&body[0],len);
  36.             if(ifs.good()==false)
  37.             {
  38.                 ELOG("%s文件读取失败",_filename.c_str());
  39.                 ifs.close();
  40.                 return false;
  41.             }
  42.             //4.关闭文件
  43.             ifs.close();
  44.             return true;
  45.         }
  46.         bool read(std::string& body)//从文件读取内容
  47.         {
  48.             size_t fsize=this->size();
  49.             //根据fsize调整body的大小
  50.             body.resize(fsize);
  51.             return read(&body[0],0,fsize);
  52.         }
  53.         bool write(const char* body,size_t offset,size_t len)//向文件指定位置写入内容
  54.         {
  55.             //1.打开文件
  56.             std::fstream fs(_filename.c_str(),std::ios_base::in|std::ios_base::out|std::ios_base::binary);
  57.             //这里不能ofstream
  58.             //想要跳转到文件的指定位置必须有读权限,istream可以fstream可以,但是ofstream不可以
  59.             if(fs.is_open()==false)
  60.             {
  61.                 ELOG("%s文件打开失败",_filename.c_str());
  62.                 fs.close();
  63.                 return false;
  64.             }
  65.             //2.移动到指定位置
  66.             fs.seekg(offset,std::ios_base::beg);
  67.             //3.向文件写入内容
  68.             fs.write(body,len);
  69.             if(fs.good()==false)
  70.             {
  71.                 ELOG("%s文件写入失败",_filename.c_str());
  72.                 fs.close();
  73.                 return false;
  74.             }
  75.             //4.关闭文件
  76.             fs.close();
  77.             return true;
  78.         }
  79.         bool write(const std::string& body)//向文件写入内容
  80.         {
  81.             return write(&body[0],0,body.size());
  82.         }
  83.         static std::string parentDirectory(const std::string& filename)//返回当前文件的父路径
  84.         {
  85.             //   aaa/bbb/ccc/ddd/e.txt
  86.             int pos=filename.find_last_of('/');
  87.             if(pos==std::string::npos)//没找到
  88.             {
  89.                 //e.txt
  90.                 return "./";
  91.             }
  92.             return filename.substr(0,pos+1);
  93.         }
  94.         bool rename(const std::string& filename)//修改文件名称
  95.         {
  96.             return (::rename(_filename.c_str(),filename.c_str())==0);
  97.         }
  98.         static bool createFile(const std::string& filename)//创建文件
  99.         {
  100.             std::ofstream ofs(filename.c_str(),std::ios_base::binary|std::ios_base::out|std::ios_base::app);
  101.             if(ofs.is_open()==false)
  102.             {
  103.                 ELOG("%s文件创建失败",filename.c_str());
  104.                 ofs.close();
  105.                 return false;
  106.             }
  107.             ofs.close();
  108.             return true;
  109.         }
  110.         static bool removeFile(const std::string& filename)//删除文件
  111.         {
  112.             return (::remove(filename.c_str())==0);
  113.         }
  114.         static bool createDirectory(const std::string& path)//创建目录
  115.         {
  116.             size_t idx=0;
  117.             while(idx<path.size())
  118.             {
  119.                 int pos=path.find('/',idx);
  120.                 if(pos==std::string::npos)
  121.                 {
  122.                     if(mkdir(path.c_str(),0775)!=0)
  123.                     {
  124.                         ELOG("%s目录创建失败",path.c_str());
  125.                         return false;
  126.                     }
  127.                     return true;
  128.                 }
  129.                 std::string subPath=path.substr(0,pos);
  130.                 if(mkdir(subPath.c_str(),0775)!=0)
  131.                 {
  132.                     ELOG("%s目录创建失败",subPath.c_str());
  133.                     return false;
  134.                 }
  135.                 idx=pos+1;
  136.             }
  137.             return true;
  138.         }
  139.         static bool removeDirectory(const std::string& path)//删除目录
  140.         {
  141.             //rmdir必须保证文件夹是空的,所以不用这种方法
  142.             std::string cmd="rm -rf "+path;
  143.             return system(cmd.c_str())==0;
  144.         }
  145.         
  146.     private:
  147.         std::string _filename;
  148.     };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
                       3、消息范例界说与交换机范例界说

                                   登录后复制                        
  1. syntax= "proto3";
  2. package lsx;
  3. enum ExchangType//交换机类型
  4. {
  5.     UNKNOWTYPE=0;//因为必须从0开始,所以加一个未知类型
  6.     DIRECT=1;//直接
  7.     FANOUT=2;//广播
  8.     TOPIC=3;//主题
  9. };
  10. enum DeliveryMode//消息投递方式
  11. {
  12.     UNKONWMODE=0;
  13.     DURABLE=1;   //持久化
  14.     UNDURABLE=2;   //非持久化
  15. };
  16. message BasicProperties //基本特性
  17. {
  18.     string id=1;   //消息ID
  19.     DeliveryMode deliveryMode=2;   //是否持久化
  20.     string routing_key=3;
  21. };
  22. message Message
  23. {
  24.     message Payload//有效载荷
  25.     {
  26.         BasicProperties properties=1;   //基本属性
  27.         string body=2;   //消息主体
  28.         string valid=3;   //是否有效
  29.         //消息是否有效不用bool类型,而是使用字符的0/1
  30.         //因为bool类型序列化时true和false所占长度不同
  31.         //会导致修改文件中消息有效位后消息长度发生变化
  32.     }
  33.     Payload payload=1;   //有效载荷
  34.     int32 offset=2;   //偏移量
  35.     int32 len=3;   //长度
  36. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
                       4、交换机数据管理

      界说交换机数据类
     

  • 交换机名称
  • 交换机范例
  • 是否持久化标志
  • 是否⾃动删除标志
  • 其他参数
     界说交换机数据持久化类(数据持久化的sqlite3数据库中)
     

  • 创建/删除交换机数据表
  • 新增交换机数据
  • 移除交换机数据
  • 查询全部交换机数据
  • 查询指定交换机数据(根据名称)
     界说交换机数据管理类
     

  • 声明交换机,并添加管理(存在则OK,不存在则创建)
  • 删除交换机
  • 获取指定交换机
  • 销毁全部交换机数据
                                   登录后复制                        
  1. namespace lsxmq
  2. {
  3.     //1.定义交换机类
  4.     struct Exchange
  5.     {
  6.         using ptr=std::shared_ptr<Exchange>;
  7.         std::string name;//交换机名称
  8.         ExchangeType type;//交换机类型
  9.         bool durable;//是否持久化
  10.         bool autoDel;//是否自动删除
  11.         std::unordered_map<std::string,std::string> args;//其它参数
  12.         Exchange(const std::string& ename,ExchangeType etype
  13.         ,bool edurable,bool eautoDel
  14.         ,std::unordered_map<std::string,std::string> eargs)
  15.             :name(ename),type(etype),durable(edurable)
  16.             ,autoDel(eautoDel),args(eargs)
  17.         {}
  18.         //args存储的是键值对,在存储数据库的时候,会组织一个格式进行存储:key=value&key=value
  19.         //将字符串反序列化为args
  20.         void setArgs(std::string strArgs);
  21.         //将args序列化为字符串
  22.         std::string getArgs();
  23.     };
  24.     //2.定义交换机数据持久化存储类--数据存储在sqlite数据库中
  25.     class ExchangeMapper
  26.     {
  27.     public:
  28.         ExchangeMapper(const std::string dbfile)
  29.             :_sqliteHelper(dbfile)
  30.         {}
  31.         bool createTable();//创建表
  32.         bool removeTable();//删除表
  33.         bool insert(Exchange::ptr& exchang);//向数据库插入一条交换机数据
  34.         bool erase(Exchange::ptr& exchang);//从数据库中删除一条交换机数据
  35.         std::unordered_map<std::string,Exchange::ptr> getAll();//获取数据库中所有的交换机数据
  36.         //name:ptr
  37.     private:
  38.         SqliteHelper _sqliteHelper;
  39.     };
  40.     //3.定义交换机数据管理类
  41.     class ExchangeManager
  42.     {
  43.     public:
  44.         ExchangeManager(const std::string dbfile)
  45.             :_mapper(dbfile)
  46.         {}
  47.         bool declareExchange(const std::string& ename,ExchangeType etype
  48.         ,bool edurable,bool eautoDel
  49.         ,std::unordered_map<std::string,std::string> eargs);//声明交换机
  50.         bool deleteExchange(const std::string& name);//删除交换机
  51.         Exchange::ptr selectExchange(const std::string& name);//获取指定交换机
  52.         bool exists(const std::string& anme);//指定交换机是否存在
  53.         void clear();//清理数据
  54.     private:
  55.         std::mutex _mutex;
  56.         ExchangeMapper _mapper;
  57.         std::unordered_map<std::string,Exchange::ptr> _exchanges;
  58.         
  59.     };
  60. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
                       5、队列数据的管理

     当前队列数据的管理,本质上是队列描述信息的管理,描述当前服务器上有哪些队列。
     界说队列描述数据类
     

  • 队列名称
  • 是否持久化标志
  • 是否独占标志
  • 是否⾃动删除标志
  • 其他参数
     界说队列数据持久化类(数据持久化的sqlite3数据库中)
     

  • 创建/删除队列数据表
  • 新增队列数据
  • 移除队列数据
  • 查询全队伍列数据
     界说队列数据管理类
     

  • 创建队列,并添加管理(存在则OK,不存在则创建)
  • 删除队列
  • 获取指定队列
  • 获取全队伍列
  • 判定指定队列是否存在
  • 获取队列数目
  • 销毁全队伍列数据
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/helper.hpp"
  3. #include "../mqcomm/logger.hpp"
  4. #include <unordered_map>
  5. #include <memory>
  6. #include <cassert>
  7. #include <mutex>
  8. namespace lsxmq
  9. {
  10.     // 1.定义消息队列类
  11.     struct MessageQueue
  12.     {
  13.         using ptr = std::shared_ptr<MessageQueue>;
  14.         std::string name;                                  // 队列名称
  15.         bool durable;                                      // 是否持久化
  16.         bool exclusive;                                    // 是否独占
  17.         bool autoDel;                                      // 是否自动删除
  18.         std::unordered_map<std::string, std::string> args; // 其它参数
  19.         MessageQueue() {}
  20.         MessageQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qautoDel, std::unordered_map<std::string, std::string> qargs)
  21.             : name(qname), durable(qdurable), exclusive(qexclusive), autoDel(qautoDel), args(qargs)
  22.         {
  23.         }
  24.         // args存储的是键值对,在存储数据库的时候,会组织一个格式进行存储:key=value&key=value
  25.         // 将字符串反序列化为args
  26.         void setArgs(std::string strArgs)
  27.         {
  28.             std::vector<std::string> subStr;
  29.             StrHelper::split(strArgs, "&", subStr); // 分割字符串
  30.             for (auto &str : subStr)                // 继续分割
  31.             {
  32.                 int pos = str.find("=");
  33.                 if (pos != std::string::npos) // 加入args
  34.                 {
  35.                     std::string first = str.substr(0, pos);
  36.                     std::string second = str.substr(pos + 1, str.size());
  37.                     args.insert(std::make_pair(first, second));
  38.                 }
  39.             }
  40.         }
  41.         // 将args序列化为字符串
  42.         std::string getArgs()
  43.         {
  44.             std::string res;
  45.             for (auto &a : args)
  46.             {
  47.                 res += a.first;
  48.                 res += "=";
  49.                 res += a.second;
  50.                 res += "&";
  51.             }
  52.             res.pop_back();
  53.             return res;
  54.         }
  55.     };
  56.     // 2.定义消息队列数据持久化存储类--数据存储在sqlite数据库中
  57.     using MessageQueueMap = std::unordered_map<std::string, MessageQueue::ptr>;
  58.     class MessageQueueMapper
  59.     {
  60.     public:
  61.         MessageQueueMapper(const std::string &dbfile)
  62.             : _sqliteHelper(dbfile)
  63.         {
  64.             std::string path = FileHelper::parentDirectory(dbfile);
  65.             if (FileHelper(path).exists() == false) // 父目录不存在就创建
  66.             {
  67.                 FileHelper::createDirectory(path);
  68.             }
  69.             assert(_sqliteHelper.open());
  70.             this->createTable();
  71.         }
  72.         void createTable() // 创建一张消息队列数据管理表
  73.         {
  74. #define CREATE_TABLE "create table if not exists messageQueueTable(\
  75.             name varchar(32) primary key,\
  76.             durable int,\
  77.             exclusive int,\
  78.             autoDel int,\
  79.             args varchar(128));"
  80.             bool ret = _sqliteHelper.exec(CREATE_TABLE, nullptr, nullptr);
  81.             if (ret == false)
  82.             {
  83.                 ELOG("消息队列数据库表创建失败");
  84.                 abort();
  85.             }
  86.         }
  87.         void removeTable() // 删除表
  88.         {
  89. #define DROP_TABLE "drop table messageQueueTable"
  90.             bool ret = _sqliteHelper.exec(DROP_TABLE, nullptr, nullptr);
  91.             if (ret == false)
  92.             {
  93.                 ELOG("消息队列数据库表删除失败");
  94.                 abort();
  95.             }
  96.         }
  97.         bool insert(MessageQueue::ptr& mqp) // 向数据库插入一条消息队列数据
  98.         {
  99. #define INSERT "insert into messageQueueTable values('%s',%d,%d,%d,'%s');"
  100.             char sql[256] = {0};
  101.             sprintf(sql, INSERT, mqp->name.c_str(), mqp->durable, mqp->exclusive, mqp->autoDel, mqp->getArgs().c_str());
  102.             return _sqliteHelper.exec(sql, nullptr, nullptr);
  103.         }
  104.         bool erase(const std::string &qname) // 从数据库中删除一条消息队列数据
  105.         {
  106. #define ERASE "delete from  messageQueueTable where name='%s';"
  107.             char sql[256] = {0};
  108.             sprintf(sql, ERASE, qname.c_str());
  109.             return _sqliteHelper.exec(sql, nullptr, nullptr);
  110.         }
  111.         MessageQueueMap recovery() // 获取数据库中所有的消息队列数据(恢复)
  112.         {
  113. #define SELECT_ALL "select name,durable,exclusive,autoDel,args from  messageQueueTable"
  114.             MessageQueueMap result;
  115.             bool ret = _sqliteHelper.exec(SELECT_ALL, sqliteCallback, &result);
  116.             if (ret == false)
  117.             {
  118.                 ELOG("恢复消息队列数据失败");
  119.                 abort();
  120.             }
  121.             return result;
  122.         }
  123.         ~MessageQueueMapper()
  124.         {
  125.             _sqliteHelper.close();
  126.         }
  127.     private:
  128.         static int sqliteCallback(void *arg, int col, char **row, char **fields)
  129.         {
  130.             MessageQueueMap *result = (MessageQueueMap *)arg;
  131.             auto mqp = std::make_shared<MessageQueue>();
  132.             mqp->name = row[0];
  133.             mqp->durable = (bool)std::stoi(row[2]);
  134.             mqp->exclusive = (bool)std::stoi(row[2]);
  135.             mqp->autoDel = (bool)std::stoi(row[3]);
  136.             if (row[4])
  137.                 mqp->setArgs(row[4]); // 其它参数可能不存在
  138.             result->insert(std::make_pair(mqp->name, mqp));
  139.             return 0;
  140.         }
  141.     private:
  142.         SqliteHelper _sqliteHelper;
  143.     };
  144.     // 3.定义消息队列数据管理类
  145.     class MessageQueueManager
  146.     {
  147.     public:
  148.         using ptr = std::shared_ptr<MessageQueueManager>;
  149.         MessageQueueManager(const std::string dbfile)
  150.             : _mapper(dbfile)
  151.         {
  152.             _messageQueues = _mapper.recovery(); // 恢复数据
  153.         }
  154.         bool declareMessageQueue(const std::string &name, bool durable,
  155.         bool exclusive, bool autoDel, std::unordered_map<std::string, std::string> args) // 声明消息队列
  156.         {
  157.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  158.             auto it = _messageQueues.find(name);
  159.             if (it != _messageQueues.end()) // 已经存在了
  160.             {
  161.                 return true;
  162.             }
  163.             auto mqp = std::make_shared<MessageQueue>(name, durable,exclusive, autoDel, args);
  164.             if (durable == true) // 需要持久化
  165.             {
  166.                 bool ret = _mapper.insert(mqp);
  167.                 if (ret == false)
  168.                     return false;
  169.             }
  170.             _messageQueues.insert(std::make_pair(name, mqp));
  171.             return true;
  172.         }
  173.         bool deleteMessageQueue(const std::string &name) // 删除消息队列
  174.         {
  175.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  176.             auto it = _messageQueues.find(name);
  177.             if (it == _messageQueues.end()) // 不存在
  178.             {
  179.                 return true;
  180.             }
  181.             if (it->second->durable == true) // 从数据库中删除
  182.             {
  183.                 bool ret = _mapper.erase(it->second->name);
  184.                 if (ret == false)
  185.                     return false;
  186.             }
  187.             _messageQueues.erase(name);
  188.             return true;
  189.         }
  190.         MessageQueue::ptr selectMessageQueue(const std::string &name) // 获取指定消息队列
  191.         {
  192.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  193.             auto it = _messageQueues.find(name);
  194.             if (it == _messageQueues.end()) // 不存在
  195.             {
  196.                 return MessageQueue::ptr();
  197.             }
  198.             return it->second;
  199.         }
  200.         MessageQueueMap getall()//获取所有消息队列
  201.         {
  202.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  203.             return _messageQueues;
  204.         }
  205.         bool exists(const std::string &name) // 指定交换机是否存在
  206.         {
  207.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  208.             auto it = _messageQueues.find(name);
  209.             return it != _messageQueues.end();
  210.         }
  211.         void clear() // 清理数据
  212.         {
  213.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  214.             _mapper.removeTable();                     // 先删磁盘内容
  215.             _messageQueues.clear();                    // 再删内存内容
  216.         }
  217.         size_t size()
  218.         {
  219.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  220.             return _messageQueues.size();
  221.         }
  222.     private:
  223.         std::mutex _mutex; // 多线程使用,注意线程安全
  224.         MessageQueueMapper _mapper;
  225.         MessageQueueMap _messageQueues;
  226.     };
  227. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
  • 159.
  • 160.
  • 161.
  • 162.
  • 163.
  • 164.
  • 165.
  • 166.
  • 167.
  • 168.
  • 169.
  • 170.
  • 171.
  • 172.
  • 173.
  • 174.
  • 175.
  • 176.
  • 177.
  • 178.
  • 179.
  • 180.
  • 181.
  • 182.
  • 183.
  • 184.
  • 185.
  • 186.
  • 187.
  • 188.
  • 189.
  • 190.
  • 191.
  • 192.
  • 193.
  • 194.
  • 195.
  • 196.
  • 197.
  • 198.
  • 199.
  • 200.
  • 201.
  • 202.
  • 203.
  • 204.
  • 205.
  • 206.
  • 207.
  • 208.
  • 209.
  • 210.
  • 211.
  • 212.
  • 213.
  • 214.
  • 215.
  • 216.
  • 217.
  • 218.
  • 219.
  • 220.
  • 221.
  • 222.
  • 223.
  • 224.
  • 225.
  • 226.
  • 227.
  • 228.
  • 229.
  • 230.
                       6、绑定信息(交换机-消息队列)管理

     绑定信息,本质上就是⼀个交换构造联了哪些队列的描述。
     界说绑定信息类
     

  • 交换机名称
  • 队列名称
  • binding_key(分发匹配规则-决定了哪些数据能被交换机放⼊队列)
     界说绑定信息数据持久化类(数据持久化的sqlite3数据库中)
     

  • 创建/删除绑定信息数据表
  • 新增绑定信息数据
  • 移除指定绑定信息数据
  • 移除指定交换机相干绑定信息数据:移除交换机的时候会被调⽤
  • 移除指定队列相干绑定信息数据:移除队列的时候会被调⽤
  • 查询全部绑定信息数据:⽤于重启服务器时进⾏汗青数据恢复
      界说绑定信息数据管理类
     

  • 创建绑定信息,并添加管理(存在则OK,不存在则创建)
  • 解除指定的绑定信息
  • 删除指定队列的全部绑定信息
  • 删除交换机相干的全部绑定信息
  • 获取交换机相干的全部绑定信息:交换机收到消息后,必要分发给⾃⼰关联的队列
  • 判定指定绑定信息是否存在
  • 获取当前绑定信息数目
  • 销毁全部绑定信息数据
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/helper.hpp"
  3. #include "../mqcomm/logger.hpp"
  4. #include <unordered_map>
  5. #include <memory>
  6. #include <cassert>
  7. #include <mutex>
  8. namespace lsxmq
  9. {
  10.     //1.定义绑定类
  11.     struct Binding
  12.     {
  13.         using ptr=std::shared_ptr<Binding>;
  14.         std::string exchangeName;//交换机名称
  15.         std::string messageQueueName;//消息队列名称
  16.         std::string binding_key;
  17.         bool durable;//是否持久化
  18.         Binding(){}
  19.         Binding(const std::string& ename,const std::string& qname,const std::string& bdkey,bool bdurable)
  20.             :exchangeName(ename),messageQueueName(qname),binding_key(bdkey),durable(bdurable)
  21.         {}
  22.     };
  23.     //2.定义持久化管理绑定类
  24.     //交换机名称-->多个消息队列
  25.     //消息队列-->多个绑定信息
  26.     using MessageQueueMap=std::unordered_map<std::string,Binding::ptr>;
  27.     using BindingMap=std::unordered_map<std::string,MessageQueueMap>;
  28.     class BindingMapper
  29.     {
  30.     public:
  31.         BindingMapper(const std::string dbfile)
  32.             :_sqliteHelper(dbfile)
  33.         {
  34.             std::string path = FileHelper::parentDirectory(dbfile);
  35.             if (FileHelper(path).exists() == false) // 父目录不存在就创建
  36.             {
  37.                 FileHelper::createDirectory(path);
  38.             }
  39.             assert(_sqliteHelper.open());
  40.             this->createTable();
  41.         }
  42.         void createTable()//创建表
  43.         {
  44.             #define CREATE_TABLE "create table if not exists bindingTable(\
  45.             exchangeName varchar(32),\
  46.             messageQueueName varchar(32),\
  47.             binding_key varchar(128),\
  48.             durable int);"
  49.             bool ret = _sqliteHelper.exec(CREATE_TABLE, nullptr, nullptr);
  50.             if (ret == false)
  51.             {
  52.                 ELOG("绑定数据库表创建失败");
  53.                 abort();
  54.             }
  55.         }
  56.         void removeTable()//删除表
  57.         {
  58.             #define DROP_TABLE "drop table bindingTable"
  59.             bool ret = _sqliteHelper.exec(DROP_TABLE, nullptr, nullptr);
  60.             if (ret == false)
  61.             {
  62.                 ELOG("绑定数据库表删除失败");
  63.                 abort();
  64.             }
  65.         }
  66.         bool insert(Binding::ptr& bdp)//插入绑定数据
  67.         {
  68.             #define INSERT "insert into bindingTable values('%s','%s','%s',%d);"
  69.             char sql[256] = {0};
  70.             sprintf(sql, INSERT, bdp->exchangeName.c_str(),bdp->messageQueueName.c_str(),bdp->binding_key.c_str(),bdp->durable);
  71.             return _sqliteHelper.exec(sql, nullptr, nullptr);
  72.         }
  73.         bool erase(const std::string& ename,const std::string qname)//删除绑定数据数据
  74.         {
  75.             #define ERASE "delete from  bindingTable where exchangeName='%s' and messageQueueName='%s';"
  76.             char sql[256] = {0};
  77.             sprintf(sql, ERASE, ename.c_str(),qname.c_str());
  78.             return _sqliteHelper.exec(sql, nullptr, nullptr);
  79.         }
  80.         bool removeExchangeBinding(const std::string& ename)//删除交换机的绑定数据
  81.         {
  82.             #define ERASE_EXCHANGE "delete from  bindingTable where exchangeName='%s';"
  83.             char sql[256] = {0};
  84.             sprintf(sql, ERASE_EXCHANGE, ename.c_str());
  85.             return _sqliteHelper.exec(sql, nullptr, nullptr);
  86.         }
  87.         bool removeMessageQueueBinding(const std::string& qname)//删除消息队列的绑定数据
  88.         {
  89.             #define ERASE_MESSAGEQUEUE "delete from  bindingTable where messageQueueName='%s';"
  90.             char sql[256] = {0};
  91.             sprintf(sql, ERASE_MESSAGEQUEUE, qname.c_str());
  92.             return _sqliteHelper.exec(sql, nullptr, nullptr);
  93.         }
  94.         BindingMap recovery()//恢复数据
  95.         {
  96.             #define SELECT_ALL "select exchangeName,messageQueueName,binding_key,durable from  bindingTable"
  97.             BindingMap result;
  98.             bool ret = _sqliteHelper.exec(SELECT_ALL, sqliteCallback, &result);
  99.             if (ret == false)
  100.             {
  101.                 ELOG("恢复绑定数据失败");
  102.                 abort();
  103.             }
  104.             return result;
  105.         }
  106.         ~BindingMapper()
  107.         {
  108.             _sqliteHelper.close();
  109.         }
  110.     private:
  111.         static int sqliteCallback(void *arg, int col, char **row, char **fields)
  112.         {
  113.             BindingMap *result = (BindingMap *)arg;
  114.             auto bdp = std::make_shared<Binding>(row[0],row[1],row[2],row[3]);
  115.             //ename->(qname->binding)
  116.             //为了防止 交换机相关的绑定信息已经存在,因此不能直接创建队列映射,进行添加,这样会覆盖历史数据
  117.             //因此得先获取交换机对应的映射对象,往里边添加数据
  118.             //但是,若这时候没有交换机对应的映射信息,因此这里的获取要使用引用(会保证不存在则自动创建)
  119.             //1.用ename拿出MessageQueueMap
  120.             MessageQueueMap& mqm=(*result)[row[0]];//已经有了就拿出来,没有就创建
  121.             mqm.insert(std::make_pair(row[1],bdp));
  122.             return 0;
  123.         }
  124.     private:
  125.         SqliteHelper _sqliteHelper;
  126.     };
  127.     //3.定义绑定内存管理类
  128.     class BindingManager
  129.     {
  130.     public:
  131.         using ptr=std::shared_ptr<BindingManager>;
  132.         BindingManager(const std::string& dbfile)
  133.             :_mapper(dbfile)
  134.         {
  135.             _bindings=_mapper.recovery();
  136.         }
  137.         bool bind(const std::string& ename,const std::string& qname,const std::string& bdkey,bool durable)//建立绑定关系
  138.         {
  139.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  140.             //先确定改绑定是否已存在
  141.             auto it=_bindings.find(ename);
  142.             if(it!=_bindings.end()&&it->second.find(qname)!=it->second.end())
  143.             {
  144.                 return true;
  145.             }
  146.             Binding::ptr bdp=std::make_shared<Binding>(ename,qname,bdkey,durable);
  147.             //绑定信息是否需要持久化,取决于什么? 交换机数据是持久化的,以及队列数据也是持久化的。
  148.             if(durable)//需要持久化
  149.             {
  150.                 bool ret=_mapper.insert(bdp);
  151.                 if(ret==false) return false;
  152.             }
  153.             auto& mqm=_bindings[ename];
  154.             mqm.insert(std::make_pair(qname,bdp));
  155.             return true;
  156.         }
  157.         bool unbind(const std::string& ename,const std::string& qname)//解除绑定关系
  158.         {
  159.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  160.             //先确定绑定是否不存在
  161.             auto it=_bindings.find(ename);
  162.             if(it==_bindings.end())//通过ename找不到MessageQueueMap
  163.             {
  164.                 return true;
  165.             }
  166.             auto it2=it->second.find(qname);
  167.             if(it2==it->second.end())//通过qname找不到binding
  168.             {
  169.                 return true;
  170.             }
  171.             //现在确定了要解除的绑定是存在的
  172.             if(it2->second->durable==true)//从数据库里删除
  173.             {
  174.                 bool ret=_mapper.erase(ename,qname);
  175.                 if(ret==false) return false;
  176.             }
  177.             //从内存中删除
  178.             it->second.erase(qname);
  179.             if(it->second.empty())
  180.             {
  181.                 _bindings.erase(ename);
  182.             }
  183.             return true;
  184.         }
  185.         bool deleteExchangeBinding(const std::string& ename)//删除与交换机相关的绑定
  186.         {
  187.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  188.             _bindings.erase(ename);
  189.             return _mapper.removeExchangeBinding(ename);
  190.         }
  191.         bool deleteMessageQueueBinding(const std::string& qname)//删除与消息队列相关的绑定
  192.         {
  193.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  194.             for(auto it=_bindings.begin();it!=_bindings.end();)
  195.             {
  196.                 it->second.erase(qname);
  197.                 if(it->second.empty())
  198.                 {
  199.                     it=_bindings.erase(it);
  200.                 }
  201.                 else
  202.                 {
  203.                     ++it;
  204.                 }
  205.             }
  206.             return _mapper.removeMessageQueueBinding(qname);
  207.         }
  208.         MessageQueueMap getExchangeBinding(const std::string& ename)//获取交换机绑定的所有消息队列
  209.         {
  210.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  211.             //该交换机是否存在
  212.             auto it=_bindings.find(ename);
  213.             if(it==_bindings.end())//该交换机不存在
  214.             {
  215.                 return MessageQueueMap();
  216.             }
  217.             return it->second;
  218.         }
  219.         Binding::ptr getBinding(const std::string& ename,const std::string& qname)
  220.         {
  221.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  222.             //先确定绑定是否不存在
  223.             auto it=_bindings.find(ename);
  224.             if(it==_bindings.end())//通过ename找不到MessageQueueMap
  225.             {
  226.                 return Binding::ptr();
  227.             }
  228.             auto it2=it->second.find(qname);
  229.             if(it2==it->second.end())//通过qname找不到binding
  230.             {
  231.                 return Binding::ptr();
  232.             }
  233.             //现在确定了要解除的绑定是存在的
  234.             return it2->second;
  235.         }
  236.         bool exists(const std::string& ename,const std::string& qname)//绑定信息是否存在
  237.         {
  238.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  239.             auto it=_bindings.find(ename);
  240.             if(it!=_bindings.end()&&it->second.find(qname)!=it->second.end())
  241.             {
  242.                 return true;
  243.             }
  244.             return false;
  245.         }
  246.         size_t size()//绑定数量
  247.         {
  248.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  249.             size_t ret=0;
  250.             for(auto& it:_bindings)
  251.             {
  252.                 ret+=it.second.size();
  253.             }
  254.             return ret;
  255.         }
  256.         void clear()//清空数据
  257.         {
  258.             ILOG("清空数据");
  259.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  260.             _mapper.removeTable();
  261.             _bindings.clear();
  262.         }
  263.     private:
  264.         std::mutex _mutex;
  265.         BindingMapper _mapper;
  266.         BindingMap _bindings;
  267.     };
  268. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
  • 159.
  • 160.
  • 161.
  • 162.
  • 163.
  • 164.
  • 165.
  • 166.
  • 167.
  • 168.
  • 169.
  • 170.
  • 171.
  • 172.
  • 173.
  • 174.
  • 175.
  • 176.
  • 177.
  • 178.
  • 179.
  • 180.
  • 181.
  • 182.
  • 183.
  • 184.
  • 185.
  • 186.
  • 187.
  • 188.
  • 189.
  • 190.
  • 191.
  • 192.
  • 193.
  • 194.
  • 195.
  • 196.
  • 197.
  • 198.
  • 199.
  • 200.
  • 201.
  • 202.
  • 203.
  • 204.
  • 205.
  • 206.
  • 207.
  • 208.
  • 209.
  • 210.
  • 211.
  • 212.
  • 213.
  • 214.
  • 215.
  • 216.
  • 217.
  • 218.
  • 219.
  • 220.
  • 221.
  • 222.
  • 223.
  • 224.
  • 225.
  • 226.
  • 227.
  • 228.
  • 229.
  • 230.
  • 231.
  • 232.
  • 233.
  • 234.
  • 235.
  • 236.
  • 237.
  • 238.
  • 239.
  • 240.
  • 241.
  • 242.
  • 243.
  • 244.
  • 245.
  • 246.
  • 247.
  • 248.
  • 249.
  • 250.
  • 251.
  • 252.
  • 253.
  • 254.
  • 255.
  • 256.
  • 257.
  • 258.
  • 259.
  • 260.
  • 261.
  • 262.
  • 263.
  • 264.
  • 265.
  • 266.
  • 267.
  • 268.
  • 269.
  • 270.
  • 271.
  • 272.
  • 273.
                       7、队列消息管理

     文件数据管理

     消息的要素(决定了消息的数据结构):
     

  • 网络传输的消息要素:消息id、消息routing_key、消息的投递模式
  • 服务器上的消息管理所需的额外要素:最主要的就是持久化原理
     

  • 消息有用标志位:这个字段必要随着消息的持久化内容一起持久化。每一条消息都可能要进行持久化存储,等到推送给客户端就会删除掉,然而每一次删除一条消息就重写一次文件,效率太低下。如果设置了有用标志位,每次只必要将这个有用标志位对应的数据给修改为无效即可。
  • 消息的垃圾回收机制:为了避免文件中消息存储越来越多(许多都是无效消息),因此文件中消息的垃圾回收机制就很有必要了。
  • 消息的现实存储位置(相对于文件起始位置的偏移量):当要删除某条消息时,必要重写覆盖这条消息在文件的对应位置(将有用标志位设为无效),这时候就必要可以或许找到这条消息
  • 消息的长度:当恢复汗青消息以及读取消息内容的时候,必要解决粘包标题。
     消息的持久化管理:
     

  • 以队列为单元进行消息的持久化管理
     

  • 当消息文件垃圾回收时,必要重新加载全部有用消息,重新天生新的数据文件,但是天生新的数据文件后,消息的存储位置发生了变化,这时候必要更新内存中的数据,这时候就必要对全部的队列数据进行加锁,然后进行更新---锁辩说频仍,效率低。因此,如果每个队列都有本身独立的数据文件,则每次只必要对操作的队列数据进行加锁即可。
  • 为什么不使用数据库存储?有些消息较大,不得当数据库,其次消息的持久化主要是为了备份,而不是为了查询,因此直接使用平常文件进行存储。
  • 既然是数据存储在文件中,那必然就会有数据格式的要求。4字节长度|数据|4字节长度|数据...... 。通过四字节长度描述消息现实存储长度,就可以解决粘包标题。
     向外提供的操作:
     

  • 消息文件的创建与删除
  • 消息的新增持久化/删除消息的持久化(并不是真正的删除,只是将标志位置为无效)
  • 汗青数据恢复/垃圾回收
     

  • 什么环境下必要垃圾回收?因为每次删除数据都不是真正的删除,因此文件中的数据会越来越多,但是也不是每次删除都必要垃圾回收。当文件中有用消息凌驾2000条,且其中有用消息比例低于50%。
  • 回收头脑:加载文件中全部有用消息,删除原文件,天生新的数据文件,将数据写入(存在风险,不采用)。加载文件中全部的有用消息,先写到一个临时文件中,然后再去删除原文件,将临时文件名称改为原文件名称。返回全部的有用消息(每条消息中都记录当前新的存储位置---用于更新内存中数据的内容)
     要管理的数据:
     

  • 队列名
  • 根据队列名天生数据文件名称:队列名.mqd
  • 根据队列名天生临时文件名称:队列名.mqd.tmp
     内存数据管理

     以队列为单元进行管理:
     

  • 如果内存中的全部消息团体进行管理,则在进行垃圾回收以及恢复汗青消息上就会变得麻烦,因此每个队列都有一个消息数据的管理结构,终极向外提供一个总体的消息管理类。
  • 队列消息管理:
     

  • 构造对象时:创建/打开队列数据文件,恢复队列汗青消息数据
  • 新增消息/确认消息(删除)
  • 垃圾回收:但持久化数据总量凌驾2000且有用比例低于50%则进行垃圾回收
  • 获取队首消息
  • 删除队列全部消息
  • 获取待推送消息数目
  • 获取待确认消息数目
  • 获取持久化消息数目
     

  • 要管理的数据:
     

  • 持久化的管理句柄
  • 待推送消息链表:以头插尾删的头脑实现队列功能
  • 持久化消息的hashmap:垃圾回收后,必要更新消息数据(现实存储位置)
  • 待确认消息的hashmap:一条消息被推送给客户端,并不会立即真正删除,而是等到被确认后才会删除。一条消息被推送给客户端后,取出待推送链表,参加到待确认hashmap中,等到确认后再删除。
  • 持久化文件中有用消息数目
  • 持久化文件中总体消息数目:可以盘算出文件中的有用消息比例(根据比例和总量决定是否进行垃圾回收)
  • 队列名称
     实现一个对外的总体消息管理类

     

  • 管理的时候每一个队列的消息
  • 管理的成员:
     

  • 互斥锁
  • 每个队列的消息管理句柄:队列名称&队列消息管理句柄的hash表
     

  • 提供的操作
     

  • 初始化队列的消息管理句柄:创建队列的时候调用
  • 销毁队列的消息管理句柄:删除队列的时候调用
  • 向队列新增消息
  • 获取队列队首消息
  • 对队列进行消息确认
  • 获取队列消息数目:可获取消息数目、持久化消息数目、有用待确认消息数目、总的持久化消息数目
  • 恢复队列汗青消息
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/helper.hpp"
  3. #include "../mqcomm/logger.hpp"
  4. #include "../mqcomm/message.pb.h"
  5. #include <unordered_map>
  6. #include <memory>
  7. #include <cassert>
  8. #include <list>
  9. #include <mutex>
  10. namespace lsxmq
  11. {
  12.     using MessagePtr = std::shared_ptr<lsxmq::Message>;
  13. #define DATAFILE_SUBFIX ".mqd"
  14. #define TMPFILE_SUBFIX ".mqd.tmp"
  15. #define NOTVALID "0"
  16. #define ISVALID "1"
  17.     class MessageMapper
  18.     {
  19.     public:
  20.         MessageMapper(std::string &basedir, const std::string &qname)
  21.             : _qname(qname)
  22.         {
  23.             if (basedir.back() != '/')
  24.             {
  25.                 basedir.push_back('/'); // 如果没有就往末尾加一个/
  26.             }
  27.             _datafile = basedir + qname + DATAFILE_SUBFIX;
  28.             _tmpfile = basedir + qname + TMPFILE_SUBFIX;
  29.             assert(FileHelper::createDirectory(basedir)); // 创建基本目录
  30.             createMessageFile();
  31.         }
  32.         bool createMessageFile() // 创建消息文件
  33.         {
  34.             if (FileHelper(_datafile).exists() == true)
  35.             {
  36.                 return true;
  37.             }
  38.             if (FileHelper::createFile(_datafile) == false)
  39.             {
  40.                 ELOG("创建消息文件失败");
  41.                 return false;
  42.             }
  43.             return true;
  44.         }
  45.         bool removeMessageFile() // 删除消息文件
  46.         {
  47.             if (FileHelper(_datafile).exists() == false)
  48.             {
  49.                 return true;
  50.             }
  51.             if (FileHelper::removeFile(_datafile) == false)
  52.             {
  53.                 ELOG("删除消息文件失败");
  54.                 return false;
  55.             }
  56.             return true;
  57.         }
  58.         bool insert(MessagePtr &ptr) // 新增消息
  59.         {
  60.             return insert(_datafile, ptr); // 向data文件写入内容
  61.         }
  62.         bool erase(MessagePtr &ptr) // 删除消息
  63.         {
  64.             // 1.将message的有效标志置为0
  65.             ptr->mutable_payload()->set_valid(NOTVALID);
  66.             // 2.进行序列化
  67.             std::string body = ptr->payload().SerializeAsString();
  68.             if (body.size() != ptr->len())
  69.             {
  70.                 ELOG("新数据与原数据长度不一致")
  71.                 return false;
  72.             }
  73.             // 3.将序列化的message覆盖文件中原本的message
  74.             FileHelper dataFileHelper(_datafile);
  75.             if (dataFileHelper.write(body.c_str(), ptr->offset(), body.size()) == false)
  76.             {
  77.                 ELOG("消息重新写入文件失败");
  78.                 return false;
  79.             }
  80.             return true;
  81.         }
  82.         std::list<MessagePtr> gc() // 垃圾回收
  83.         {
  84.             bool ret = true;
  85.             std::list<MessagePtr> result;
  86.             // 1.加载数据
  87.             ret = load(result);
  88.             if (ret == false)
  89.             {
  90.                 ELOG("加载数据失败");
  91.                 return result;
  92.             }
  93.             // 2.将数据写入到临时文件
  94.             FileHelper::createFile(_tmpfile);
  95.             FileHelper tmpDataHelper(_tmpfile);
  96.             for (auto &mp : result)
  97.             {
  98.                 // 写入
  99.                 ret = this->insert(_tmpfile, mp);
  100.                 if (ret == false)
  101.                 {
  102.                     ELOG("消息写入临时文件失败");
  103.                     return result;
  104.                 }
  105.             }
  106.             // 3.删除原文件
  107.             ret = removeMessageFile();
  108.             if (ret == false)
  109.             {
  110.                 ELOG("删除消息文件失败");
  111.                 return result;
  112.             }
  113.             // 4.将原文件改为消息文件
  114.             ret = tmpDataHelper.rename(_datafile);
  115.             if (ret == false)
  116.             {
  117.                 ELOG("修改临时文件名称失败");
  118.                 return result;
  119.             }
  120.             return result;
  121.         }
  122.     private:
  123.         bool load(std::list<MessagePtr> &mplist) // 将文件中的消息加载进内存
  124.         {
  125.             size_t offset = 0;
  126.             FileHelper dataFileHelper(_datafile);
  127.             size_t fsize = dataFileHelper.size();
  128.             bool ret = true;
  129.             while (offset < fsize)
  130.             {
  131.                 // 1.前四位为数据长度
  132.                 size_t msize = 0;
  133.                 ret = dataFileHelper.read((char *)&msize, offset, sizeof(size_t));
  134.                 if (ret == false)
  135.                 {
  136.                     ELOG("读取信息长度失败");
  137.                     return false;
  138.                 }
  139.                 // 2.开始读取数据
  140.                 offset += sizeof(size_t);
  141.                 std::string body(msize,'\0');
  142.                 ret = dataFileHelper.read(&body[0], offset, msize);
  143.                 if (ret == false)
  144.                 {
  145.                     ELOG("读取信息内容失败");
  146.                     return false;
  147.                 }
  148.                 // 3.反序列化
  149.                 MessagePtr mp = std::make_shared<Message>();
  150.                 mp->mutable_payload()->ParseFromString(body);
  151.                 // 4.剔除无效信息
  152.                 if (mp->payload().valid() == NOTVALID)
  153.                 {
  154.                     offset += msize;
  155.                     continue;
  156.                 }
  157.                 // 4.保存结果
  158.                 mplist.push_back(mp);
  159.                 offset += msize;
  160.             }
  161.             return true;
  162.         }
  163.         bool insert(const std::string filename, MessagePtr &ptr)
  164.         {
  165.             // 1.对消息进行序列化
  166.             std::string body = ptr->payload().SerializeAsString();
  167.             MessagePtr tmp=std::make_shared<Message>();
  168.             tmp->mutable_payload()->ParseFromString(body);
  169.             // 2.获取文件大小
  170.             FileHelper FileHelper(filename);
  171.             size_t fsize = FileHelper.size();
  172.             size_t msize = body.size();
  173.             // 3.先写入8字节长度
  174.             if (FileHelper.write((char*)&msize, fsize, sizeof(size_t)) == false)
  175.             {
  176.                 ELOG("消息长度写入文件失败");
  177.                 return false;
  178.             }
  179.             //4.在写入指定长度内容
  180.             if (FileHelper.write(body.c_str(), fsize+sizeof(size_t), msize) == false)
  181.             {
  182.                 ELOG("消息内容写入文件失败");
  183.                 return false;
  184.             }
  185.             // 4.更新message的实际存储位置
  186.             ptr->set_offset(fsize+sizeof(size_t));
  187.             ptr->set_len(msize);
  188.             return true;
  189.         }
  190.     private:
  191.         std::string _qname;
  192.         std::string _datafile;
  193.         std::string _tmpfile;
  194.     };
  195.     class QueueMessage
  196.     {
  197.     public:
  198.         using ptr = std::shared_ptr<QueueMessage>;
  199.         QueueMessage(std::string &basedir, const std::string &qname)
  200.             : _mapper(basedir, qname), _validCount(0), _totalCount(0), _qname(qname)
  201.         {}
  202.         void recovery() // 恢复历史消息
  203.         {
  204.             std::unique_lock<std::mutex> lock(_mutex);
  205.             _pushMessages = _mapper.gc();
  206.             for (auto &mp : _pushMessages)
  207.             {
  208.                 _durableMessages.insert(std::make_pair(mp->payload().properties().id(), mp));
  209.             }
  210.             _totalCount = _validCount = _durableMessages.size();
  211.         }
  212.         bool insert(BasicProperties *bp, const std::string& body, DeliveryMode deliveryMode) // 新增消息
  213.         {
  214.             // 1.构造message对象
  215.             MessagePtr mp = std::make_shared<Message>();
  216.             if (bp != nullptr)
  217.             {
  218.                 mp->mutable_payload()->mutable_properties()->set_id(bp->id());
  219.                 mp->mutable_payload()->mutable_properties()->set_deliverymode(bp->deliverymode());
  220.                 mp->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());
  221.                 mp->mutable_payload()->set_body(body);
  222.                 mp->mutable_payload()->set_valid(ISVALID);
  223.             }
  224.             else
  225.             {
  226.                 mp->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());
  227.                 mp->mutable_payload()->mutable_properties()->set_deliverymode(deliveryMode);
  228.                 mp->mutable_payload()->mutable_properties()->set_routing_key("");
  229.                 mp->mutable_payload()->set_body(body);
  230.                 mp->mutable_payload()->set_valid(ISVALID);
  231.             }
  232.             // 2.判断是否需要持久化
  233.             std::unique_lock<std::mutex> lock(_mutex);
  234.             bool ret = true;
  235.             if (mp->payload().properties().deliverymode() == DeliveryMode::DURABLE) // 需要
  236.             {
  237.                 ret = _mapper.insert(mp);
  238.                 if (ret == false)
  239.                 {
  240.                     ELOG("%s:消息持久化失败", body.c_str());
  241.                     return false;
  242.                 }
  243.                 _durableMessages.insert(std::make_pair(mp->payload().properties().id(), mp));
  244.                 ++_validCount;
  245.                 ++_totalCount;
  246.             }
  247.             // 3.在内存中管理
  248.             _pushMessages.push_back(mp);
  249.             return true;
  250.         }
  251.         MessagePtr front() // 获取队首消息
  252.         {
  253.             std::unique_lock<std::mutex> lock(_mutex);
  254.             if(_pushMessages.size()==0)//没有消息能拿出来
  255.             {
  256.                 return MessagePtr();
  257.             }
  258.             // 获取队首消息
  259.             MessagePtr mp = _pushMessages.front();
  260.             // 将消息从待推送队列加到待确认hashmap中
  261.             _pushMessages.pop_front();
  262.             _waitAckMessages.insert(std::make_pair(mp->payload().properties().id(), mp));
  263.             return mp;
  264.         }
  265.         bool erase(const std::string &id) // 确认消息,每次删除一条消息后确认是否要进行垃圾回收
  266.         {
  267.             std::unique_lock<std::mutex> lock(_mutex);
  268.             // 1.从待确认消息中找到这条消息
  269.             auto it = _waitAckMessages.find(id);
  270.             if (it == _waitAckMessages.end()) // 找不到
  271.             {
  272.                 return true;
  273.             }
  274.             // 2.根据这条消息的投递模式决定是否从持久化中删除
  275.             bool ret = true;
  276.             if (it->second->payload().properties().deliverymode() == true) // 为持久化
  277.             {
  278.                 // 3.从持久化中删除
  279.                 ret = _mapper.erase(it->second);
  280.                 if (ret == false)
  281.                 {
  282.                     ELOG("从文件中删除id=%s的消息失败", id.c_str());
  283.                     return false;
  284.                 }
  285.                 _durableMessages.erase(id);
  286.                 --_validCount;
  287.                 // 进行垃圾回收
  288.                 gc();
  289.             }
  290.             // 4.从内存中删除
  291.             _waitAckMessages.erase(id);
  292.             return true;
  293.         }
  294.         size_t pushCount()
  295.         {
  296.             std::unique_lock<std::mutex> lock(_mutex);
  297.             return _pushMessages.size();
  298.         }
  299.         size_t durableCount()
  300.         {
  301.             std::unique_lock<std::mutex> lock(_mutex);
  302.             return _durableMessages.size();
  303.         }
  304.         size_t waitAckCount()
  305.         {
  306.             std::unique_lock<std::mutex> lock(_mutex);
  307.             return _waitAckMessages.size();
  308.         }
  309.         size_t validCount()
  310.         {
  311.             std::unique_lock<std::mutex> lock(_mutex);
  312.             return _validCount;
  313.         }
  314.         size_t totalCount()
  315.         {
  316.             std::unique_lock<std::mutex> lock(_mutex);
  317.             return _totalCount;
  318.         }
  319.         void clear()
  320.         {
  321.             _mapper.removeMessageFile();
  322.             _pushMessages.clear();
  323.             _durableMessages.clear();
  324.             _waitAckMessages.clear();
  325.             _totalCount = _validCount = 0;
  326.         }
  327.     private:
  328.         bool NeedGC() // 判断是否需要进行垃圾回收
  329.         {
  330.             if (_totalCount > 2000 && _validCount * 10 / _totalCount < 5)
  331.             {
  332.                 return true;
  333.             }
  334.             return false;
  335.         }
  336.         void gc() // 垃圾回收
  337.         {
  338.             if (NeedGC() == false)
  339.                 return;
  340.             // 1.接收垃圾回收后的消息
  341.             std::list<MessagePtr> mplist = _mapper.gc();
  342.             // 2.修改内存中消息的数据
  343.             for (auto &mp : mplist)
  344.             {
  345.                 // 从持久化消息中找到消息
  346.                 auto it = _durableMessages.find(mp->payload().properties().id());
  347.                 if (it == _durableMessages.end())
  348.                 {
  349.                     ILOG("出现了一条持久消息没有在内存中管理");
  350.                     // 解决策略:重新添加
  351.                     _pushMessages.push_back(mp);
  352.                     _durableMessages.insert(std::make_pair(mp->payload().properties().id(), mp));
  353.                     continue;
  354.                 }
  355.                 // 更新消息与文件相关的数据
  356.                 it->second->set_offset(mp->offset());
  357.                 it->second->set_len(mp->len());
  358.             }
  359.             _totalCount = _validCount = _durableMessages.size();
  360.         }
  361.     private:
  362.         std::mutex _mutex;
  363.         MessageMapper _mapper;
  364.         std::list<MessagePtr> _pushMessages;                          // 待推送消息
  365.         std::unordered_map<std::string, MessagePtr> _durableMessages; // 持久化消息
  366.         std::unordered_map<std::string, MessagePtr> _waitAckMessages; // 待确认消息
  367.         size_t _validCount;                                           // 持久化有效消息个数
  368.         size_t _totalCount;                                           // 持久化总消息个数
  369.         std::string _qname;
  370.     };
  371.     class QueueMessageManager
  372.     {
  373.     public:
  374.         using ptr=std::shared_ptr<QueueMessageManager>;
  375.         QueueMessageManager(const std::string &basedir)
  376.             : _basedir(basedir)
  377.         {}
  378.         void clear()
  379.         {
  380.             std::unique_lock<std::mutex> lock(_mutex);
  381.             for(auto& a:_queueMessages)
  382.             {
  383.                 a.second->clear();
  384.             }
  385.         }
  386.         void initQueueMessage(const std::string &qname) // 初始化队列
  387.         {
  388.             QueueMessage::ptr qmp;
  389.             {
  390.                 std::unique_lock<std::mutex> lock(_mutex);
  391.                 // 该队列是否已存在
  392.                 auto it = _queueMessages.find(qname);
  393.                 if (it != _queueMessages.end()) // 已存在
  394.                 {
  395.                     return ;
  396.                 }
  397.                 qmp = std::make_shared<QueueMessage>(_basedir, qname);
  398.                 _queueMessages.insert(std::make_pair(qname,qmp));
  399.             }
  400.             qmp->recovery();
  401.         }
  402.         void destroyQueueMessage(const std::string &qname)// 销毁队列
  403.         {
  404.             QueueMessage::ptr qmp;
  405.             {
  406.                 std::unique_lock<std::mutex> lock(_mutex);
  407.                 // 该队列是否不存在
  408.                 auto it = _queueMessages.find(qname);
  409.                 if (it == _queueMessages.end()) // 不存在
  410.                 {
  411.                     return ;
  412.                 }
  413.                 qmp=it->second;
  414.                 _queueMessages.erase(qname);
  415.             }
  416.             qmp->clear();
  417.         }
  418.         bool insert(const std::string &qname, BasicProperties *bp, const std::string& body, DeliveryMode deliveryMode) // 向队列新增消息
  419.         {
  420.             QueueMessage::ptr qmp;
  421.             {
  422.                 std::unique_lock<std::mutex> lock(_mutex);
  423.                 // 该队列是否不存在
  424.                 auto it = _queueMessages.find(qname);
  425.                 if (it == _queueMessages.end()) // 不存在
  426.                 {
  427.                     ELOG("向队列%s新增消息失败,因为该队列不存在",qname.c_str());
  428.                     return false;
  429.                 }
  430.                 qmp=it->second;
  431.             }
  432.             qmp->insert(bp,body,deliveryMode);
  433.             return true;
  434.         }
  435.         MessagePtr front(const std::string &qname)// 获取队首消息
  436.         {
  437.             QueueMessage::ptr qmp;
  438.             {
  439.                 std::unique_lock<std::mutex> lock(_mutex);
  440.                 // 该队列是否不存在
  441.                 auto it = _queueMessages.find(qname);
  442.                 if (it == _queueMessages.end()) // 不存在
  443.                 {
  444.                     ELOG("从队列%s获取队首消息失败,因为该队列不存在",qname.c_str());
  445.                     return MessagePtr();
  446.                 }
  447.                 qmp=it->second;
  448.             }
  449.             return qmp->front();
  450.         }
  451.         bool ack(const std::string &qname, const std::string &id)// 确认消息
  452.         {
  453.             QueueMessage::ptr qmp;
  454.             {
  455.                 std::unique_lock<std::mutex> lock(_mutex);
  456.                 // 该队列是否不存在
  457.                 auto it = _queueMessages.find(qname);
  458.                 if (it == _queueMessages.end()) // 不存在
  459.                 {
  460.                     ELOG("向队列%s确认消息失败,因为该队列不存在",qname.c_str());
  461.                     return false;
  462.                 }
  463.                 qmp=it->second;
  464.             }
  465.             qmp->erase(id);
  466.             return true;
  467.         }
  468.         size_t pushCount(const std::string& qname)
  469.         {
  470.             QueueMessage::ptr qmp;
  471.             {
  472.                 std::unique_lock<std::mutex> lock(_mutex);
  473.                 // 该队列是否不存在
  474.                 auto it = _queueMessages.find(qname);
  475.                 if (it == _queueMessages.end()) // 不存在
  476.                 {
  477.                     ELOG("向队列%s获取待发送消息数量失败,因为该队列不存在",qname.c_str());
  478.                     return 0;
  479.                 }
  480.                 qmp=it->second;
  481.             }
  482.             return qmp->pushCount();
  483.         }
  484.         size_t durableCount(const std::string& qname)
  485.         {
  486.             QueueMessage::ptr qmp;
  487.             {
  488.                 std::unique_lock<std::mutex> lock(_mutex);
  489.                 // 该队列是否不存在
  490.                 auto it = _queueMessages.find(qname);
  491.                 if (it == _queueMessages.end()) // 不存在
  492.                 {
  493.                     ELOG("向队列%s获取持久化消息数量失败,因为该队列不存在",qname.c_str());
  494.                     return 0;
  495.                 }
  496.                 qmp=it->second;
  497.             }
  498.             return qmp->durableCount();
  499.         }
  500.         size_t waitAckCount(const std::string& qname)
  501.         {
  502.             QueueMessage::ptr qmp;
  503.             {
  504.                 std::unique_lock<std::mutex> lock(_mutex);
  505.                 // 该队列是否不存在
  506.                 auto it = _queueMessages.find(qname);
  507.                 if (it == _queueMessages.end()) // 不存在
  508.                 {
  509.                     ELOG("向队列%s获取等待确认消息数量失败,因为该队列不存在",qname.c_str());
  510.                     return 0;
  511.                 }
  512.                 qmp=it->second;
  513.             }
  514.             return qmp->waitAckCount();
  515.         }
  516.         size_t validCount(const std::string& qname)
  517.         {
  518.             QueueMessage::ptr qmp;
  519.             {
  520.                 std::unique_lock<std::mutex> lock(_mutex);
  521.                 // 该队列是否不存在
  522.                 auto it = _queueMessages.find(qname);
  523.                 if (it == _queueMessages.end()) // 不存在
  524.                 {
  525.                     ELOG("向队列%s获取有效持久化消息数量失败,因为该队列不存在",qname.c_str());
  526.                     return 0;
  527.                 }
  528.                 qmp=it->second;
  529.             }
  530.             return qmp->validCount();
  531.         }
  532.         size_t totalCount(const std::string& qname)
  533.         {
  534.             QueueMessage::ptr qmp;
  535.             {
  536.                 std::unique_lock<std::mutex> lock(_mutex);
  537.                 // 该队列是否不存在
  538.                 auto it = _queueMessages.find(qname);
  539.                 if (it == _queueMessages.end()) // 不存在
  540.                 {
  541.                     ELOG("向队列%s获取全部持久化消息数量失败,因为该队列不存在",qname.c_str());
  542.                     return 0;
  543.                 }
  544.                 qmp=it->second;
  545.             }
  546.             return qmp->totalCount();
  547.         }
  548.     private:
  549.         std::mutex _mutex;
  550.         std::unordered_map<std::string, QueueMessage::ptr> _queueMessages;
  551.         std::string _basedir;
  552.     };
  553. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
  • 159.
  • 160.
  • 161.
  • 162.
  • 163.
  • 164.
  • 165.
  • 166.
  • 167.
  • 168.
  • 169.
  • 170.
  • 171.
  • 172.
  • 173.
  • 174.
  • 175.
  • 176.
  • 177.
  • 178.
  • 179.
  • 180.
  • 181.
  • 182.
  • 183.
  • 184.
  • 185.
  • 186.
  • 187.
  • 188.
  • 189.
  • 190.
  • 191.
  • 192.
  • 193.
  • 194.
  • 195.
  • 196.
  • 197.
  • 198.
  • 199.
  • 200.
  • 201.
  • 202.
  • 203.
  • 204.
  • 205.
  • 206.
  • 207.
  • 208.
  • 209.
  • 210.
  • 211.
  • 212.
  • 213.
  • 214.
  • 215.
  • 216.
  • 217.
  • 218.
  • 219.
  • 220.
  • 221.
  • 222.
  • 223.
  • 224.
  • 225.
  • 226.
  • 227.
  • 228.
  • 229.
  • 230.
  • 231.
  • 232.
  • 233.
  • 234.
  • 235.
  • 236.
  • 237.
  • 238.
  • 239.
  • 240.
  • 241.
  • 242.
  • 243.
  • 244.
  • 245.
  • 246.
  • 247.
  • 248.
  • 249.
  • 250.
  • 251.
  • 252.
  • 253.
  • 254.
  • 255.
  • 256.
  • 257.
  • 258.
  • 259.
  • 260.
  • 261.
  • 262.
  • 263.
  • 264.
  • 265.
  • 266.
  • 267.
  • 268.
  • 269.
  • 270.
  • 271.
  • 272.
  • 273.
  • 274.
  • 275.
  • 276.
  • 277.
  • 278.
  • 279.
  • 280.
  • 281.
  • 282.
  • 283.
  • 284.
  • 285.
  • 286.
  • 287.
  • 288.
  • 289.
  • 290.
  • 291.
  • 292.
  • 293.
  • 294.
  • 295.
  • 296.
  • 297.
  • 298.
  • 299.
  • 300.
  • 301.
  • 302.
  • 303.
  • 304.
  • 305.
  • 306.
  • 307.
  • 308.
  • 309.
  • 310.
  • 311.
  • 312.
  • 313.
  • 314.
  • 315.
  • 316.
  • 317.
  • 318.
  • 319.
  • 320.
  • 321.
  • 322.
  • 323.
  • 324.
  • 325.
  • 326.
  • 327.
  • 328.
  • 329.
  • 330.
  • 331.
  • 332.
  • 333.
  • 334.
  • 335.
  • 336.
  • 337.
  • 338.
  • 339.
  • 340.
  • 341.
  • 342.
  • 343.
  • 344.
  • 345.
  • 346.
  • 347.
  • 348.
  • 349.
  • 350.
  • 351.
  • 352.
  • 353.
  • 354.
  • 355.
  • 356.
  • 357.
  • 358.
  • 359.
  • 360.
  • 361.
  • 362.
  • 363.
  • 364.
  • 365.
  • 366.
  • 367.
  • 368.
  • 369.
  • 370.
  • 371.
  • 372.
  • 373.
  • 374.
  • 375.
  • 376.
  • 377.
  • 378.
  • 379.
  • 380.
  • 381.
  • 382.
  • 383.
  • 384.
  • 385.
  • 386.
  • 387.
  • 388.
  • 389.
  • 390.
  • 391.
  • 392.
  • 393.
  • 394.
  • 395.
  • 396.
  • 397.
  • 398.
  • 399.
  • 400.
  • 401.
  • 402.
  • 403.
  • 404.
  • 405.
  • 406.
  • 407.
  • 408.
  • 409.
  • 410.
  • 411.
  • 412.
  • 413.
  • 414.
  • 415.
  • 416.
  • 417.
  • 418.
  • 419.
  • 420.
  • 421.
  • 422.
  • 423.
  • 424.
  • 425.
  • 426.
  • 427.
  • 428.
  • 429.
  • 430.
  • 431.
  • 432.
  • 433.
  • 434.
  • 435.
  • 436.
  • 437.
  • 438.
  • 439.
  • 440.
  • 441.
  • 442.
  • 443.
  • 444.
  • 445.
  • 446.
  • 447.
  • 448.
  • 449.
  • 450.
  • 451.
  • 452.
  • 453.
  • 454.
  • 455.
  • 456.
  • 457.
  • 458.
  • 459.
  • 460.
  • 461.
  • 462.
  • 463.
  • 464.
  • 465.
  • 466.
  • 467.
  • 468.
  • 469.
  • 470.
  • 471.
  • 472.
  • 473.
  • 474.
  • 475.
  • 476.
  • 477.
  • 478.
  • 479.
  • 480.
  • 481.
  • 482.
  • 483.
  • 484.
  • 485.
  • 486.
  • 487.
  • 488.
  • 489.
  • 490.
  • 491.
  • 492.
  • 493.
  • 494.
  • 495.
  • 496.
  • 497.
  • 498.
  • 499.
  • 500.
  • 501.
  • 502.
  • 503.
  • 504.
  • 505.
  • 506.
  • 507.
  • 508.
  • 509.
  • 510.
  • 511.
  • 512.
  • 513.
  • 514.
  • 515.
  • 516.
  • 517.
  • 518.
  • 519.
  • 520.
  • 521.
  • 522.
  • 523.
  • 524.
  • 525.
  • 526.
  • 527.
  • 528.
  • 529.
  • 530.
  • 531.
  • 532.
  • 533.
  • 534.
  • 535.
  • 536.
  • 537.
  • 538.
  • 539.
  • 540.
  • 541.
  • 542.
  • 543.
  • 544.
  • 545.
  • 546.
  • 547.
  • 548.
  • 549.
  • 550.
  • 551.
  • 552.
  • 553.
  • 554.
  • 555.
  • 556.
  • 557.
  • 558.
  • 559.
                       8、假造机管理

     假造机模块是对上述三个数据管理模块的整合,并基于数据之间的关联关系进⾏团结操作。
     界说假造机类包含以下成员:
     

  • 交换机数据管理模块句柄
  • 队列数据管理模块句柄
  • 绑定数据管理模块句柄
  • 消息数据管理模块句柄
      假造机包含操作:
     

  • 提供声明交换机的功能(存在则OK,不存在则创建)
  • 提供删除交换机的功能(删除交换机的同时删除关联绑定信息)
  • 提供声明队列的功能(存在则OK,不存在则创建,创建的同时创建队列关联消息管理对象)
  • 提供删除队列的功能(删除队列的同时删除关联绑定信息,删除关联消息管理对象及队列全部消息)
  • 提供交换机-队列绑定的功能
  • 提供交换机-队列解绑的功能
  • 提供获取交换机相干的全部绑定信息功能
  • 提供新增消息的功能
  • 提供获取指定队列队⾸消息的功能
  • 提供消息确认删除的功能
     假造机管理操作:可以有多台假造机,项目中并没有实现。
     

  • 增删查
                                   登录后复制                        
  1. #pragma once
  2. #include "binding.hpp"
  3. #include "exchange.hpp"
  4. #include "message.hpp"
  5. #include "messageQueue.hpp"
  6. namespace lsxmq
  7. {
  8.     class VirtualHost//虚拟机
  9.     {
  10.     public:
  11.         using ptr=std::shared_ptr<VirtualHost>;
  12.         VirtualHost(const std::string& hname,const std::string& dbfile,const std::string& basedir)
  13.             :_hostName(hname)
  14.             ,_bmp(std::make_shared<BindingManager>(dbfile))
  15.             ,_emp(std::make_shared<ExchangeManager>(dbfile))
  16.             ,_qmmp(std::make_shared<QueueMessageManager>(basedir))
  17.             ,_mqmp(std::make_shared<MessageQueueManager>(dbfile))
  18.         {
  19.             //恢复队列消息
  20.             //1.获取所有队列
  21.             MessageQueueMap mqmap=_mqmp->getAll();
  22.             //2.用队列名称初始化队列消息
  23.             for(auto& it:mqmap)
  24.             {
  25.                 _qmmp->initQueueMessage(it.second->name);
  26.             }
  27.         }
  28.         bool declareExchange(const std::string& name,ExchangeType type
  29.                 ,bool durable,bool autoDel
  30.                 ,std::unordered_map<std::string,std::string> args)//声明交换机
  31.         {
  32.             return _emp->declareExchange(name,type,durable,autoDel,args);
  33.         }
  34.         bool deleteExchange(const std::string& ename)//删除交换机
  35.         {
  36.             bool ret=_emp->deleteExchange(ename);
  37.             if(ret==false)
  38.             {
  39.                 return false;
  40.             }
  41.             //删除交换机对应的绑定
  42.             return _bmp->deleteExchangeBinding(ename);
  43.         }
  44.         bool existsExchange(const std::string& ename)
  45.         {
  46.             return _emp->exists(ename);
  47.         }
  48.         bool declareMessageQueue(const std::string &name, bool durable, bool exclusive
  49.                 , bool autoDel, std::unordered_map<std::string, std::string> args)//声明消息队列
  50.         {
  51.             bool ret=_mqmp->declareMessageQueue(name,durable,exclusive,autoDel,args);
  52.             if(ret==false)
  53.             {
  54.                 return false;
  55.             }
  56.             _qmmp->initQueueMessage(name);
  57.             return true;
  58.         }
  59.         bool deleteMessageQueue(const std::string &qname)//删除消息队列
  60.         {
  61.             auto ret=_mqmp->deleteMessageQueue(qname);
  62.             if(ret==false)
  63.             {
  64.                 return false;
  65.             }
  66.             //删除消息队列对应的绑定
  67.             ret=_bmp->deleteMessageQueueBinding(qname);
  68.             if(ret==false)
  69.             {
  70.                 return false;
  71.             }
  72.             //删除对应的队列消息
  73.             _qmmp->destroyQueueMessage(qname);
  74.             return true;
  75.         }
  76.         bool existsMessageQueue(const std::string& qname)
  77.         {
  78.             return _mqmp->exists(qname);
  79.         }
  80.         bool bind(const std::string& ename,const std::string& qname,const std::string& binding_key)//绑定
  81.         {
  82.             //交换机是否存在
  83.             Exchange::ptr ep=_emp->selectExchange(ename);
  84.             if(ep.get()==nullptr)
  85.             {
  86.                 ELOG("%s与%s绑定失败,因为交换机不存在",ename.c_str(),qname.c_str());
  87.                 return false;
  88.             }
  89.             //队列是否存在
  90.             MessageQueue::ptr mqp=_mqmp->selectMessageQueue(qname);
  91.             if(mqp.get()==nullptr)
  92.             {
  93.                 ELOG("%s与%s绑定失败,因为队列不存在",ename.c_str(),qname.c_str());
  94.                 return false;
  95.             }
  96.             //根据交换机与队列的持久化方式决定绑定是否持久化
  97.             return _bmp->bind(ename,qname,binding_key,ep->durable&&mqp->durable);
  98.         }
  99.         bool unbind(const std::string& ename,const std::string& qname)//解绑
  100.         {
  101.              //交换机是否存在
  102.             Exchange::ptr ep=_emp->selectExchange(ename);
  103.             if(ep.get()==nullptr)
  104.             {
  105.                 ELOG("%s与%s解绑失败,因为交换机不存在",ename.c_str(),qname.c_str());
  106.                 return false;
  107.             }
  108.             //队列是否存在
  109.             MessageQueue::ptr mqp=_mqmp->selectMessageQueue(qname);
  110.             if(mqp.get()==nullptr)
  111.             {
  112.                 ELOG("%s与%s解绑失败,因为队列不存在",ename.c_str(),qname.c_str());
  113.                 return false;
  114.             }
  115.             return _bmp->unbind(ename,qname);
  116.         }
  117.         MessageQueueBindingMap exchangeBindings(const std::string& ename)//获取一台交换机的所有绑定
  118.         {
  119.             return _bmp->getExchangeBinding(ename);
  120.         }
  121.         bool existsBinding(const std::string &ename, const std::string &qname)
  122.         {
  123.             return _bmp->exists(ename,qname);
  124.         }
  125.         bool basicPublish(const std::string &qname, BasicProperties *bp
  126.                 , const std::string& body)//发布一条消息
  127.         {
  128.             MessageQueue::ptr mqp=_mqmp->selectMessageQueue(qname);
  129.             return _qmmp->insert(qname,bp,body,mqp->durable);
  130.         }
  131.         MessagePtr basicConsume(const std::string& qname)//获取一条消息
  132.         {
  133.             return _qmmp->front(qname);
  134.         }
  135.         bool basicAck(const std::string &qname, const std::string &id)//确认一条消息
  136.         {
  137.             return _qmmp->ack(qname,id);
  138.         }
  139.         
  140.         void clear()
  141.         {
  142.             _bmp->clear();
  143.             _emp->clear();
  144.             _qmmp->clear();
  145.             _mqmp->clear();
  146.         }
  147.     private:
  148.         std::string _hostName;//虚拟机名称
  149.         BindingManager::ptr _bmp;//绑定管理句柄
  150.         ExchangeManager::ptr _emp;//交换机管理句柄
  151.         QueueMessageManager::ptr _qmmp;//队列消息管理句柄
  152.         MessageQueueManager::ptr _mqmp;//消息队列管理句柄
  153.     };
  154. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
                       9、交换机路由模块

     交换路由规则取决要素:
     

  • 交换机范例
     

  • 广播交换:将消息发布到全部绑定的队列中
  • 直接交换:用户发布消息的时候,消息中有一个routing_key,与哪个队列的binding_key相同,则匹配成功
  • 主题交换:按照匹配规则匹配
     路由交换模块:
     

  • 功能:判定一个消息中的routing_key与队列的binding_key是否匹配成功。
  • 提供功能:
     

  • 判定routing_key是否合法:必须是a~z、A~Z、.、_构成
  • 判定binding_key是否合法:必须是a~z、A~Z、*、#、.、_构成,其中*#是通配符,必须独立存在,*和#不能连续出现
  • 进行routing_key与binding_key的路由匹配
     binding_key:
     是由数字字⺟下划线构成的, 并且使⽤ . 分成若⼲部门,并⽀持 * 和 # 通配符。
     例如: news.music.# ,这⽤于表⽰交换机绑定的当前队列是⼀个⽤于发布⾳乐消息的队列。
     

  • ⽀持 * 和 # 两种通配符, 但是 * # 只能作为 . 切分出来的独⽴部门, 不能和其他数字字⺟混⽤,
     

  • ⽐如 a.*.b 是合法的, a.*a.b 是不合法的
  • * 可以匹配任意⼀个单词(注意是单词不是字⺟)
  • # 可以匹配零个或者多个任意单词(注意是单词不是字⺟)
     

  • 注意事项: a.#.b
     

  • ⼀个单词中不能既出现 * ⼜出现 #, 也就是,⼀个单词中只能有⼀个通配符,且必须独⽴存在
  • #通配符双方不能出现其他通配符,因为 # 可以匹配任意多个任意单词,因此连续出现是没故意义的。
     routing_key:
     是由数据、字⺟和下划线构成, 并且可以使⽤ . 分别成若⼲部门。
     例如: news.music.pop ,这⽤于表⽰当前发布的消息是⼀个流⾏⾳乐的消息。
     ⽐如,在进⾏队列绑定时,某队列的binding_key约定为:news.music.#表⽰这个队列⽤于发布⾳乐消息。⽽这时候客⼾端发布了⼀条消息,其中routing_key为:news.music.pop 则可以匹配成功,⽽,如果发布消息的routing_key为:news.sport.football,这时候就会匹配失败。
     匹配算法头脑:
     

  • 对binding_key和routing_key进行单词分割
  • 按照元素个数,界说出对应巨细+1的二维数组,并且将dp[0][0]位置置为1
  • 使用routing_key的每个元素,与binding_key的每个元素进行比对,在二维数组中进行标志
  • 标志规则:
     

  • 当两个单词匹配成功时:从左上方继承结果,dp[j]=dp[i-1][j-1]
  • 当遇到#通配符匹配成功时,不仅可以从左上方继承,还可以从左方继承结果dp[j]=dp[i-1][j-1] || dp[j-1]
  • 当遇到#通配符匹配成功时,不仅可以从左上方、左方继承,还可以从上方继承结果dp[j]=dp[i-1][j-1] || dp[j-1] || dp[i-1][j]
  • 当binding_key以#起始时,必要将#对应行的第0列置为1
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/helper.hpp"
  3. #include "../mqcomm/message.pb.h"
  4. namespace lsxmq
  5. {
  6.     class Router
  7.     {
  8.     public:
  9.         static bool routingKeyIsLegal(const std::string& routingKey)//判断routing_key是否合法
  10.         {
  11.             //只需要确定是否有非法字符即可
  12.             //合法字符:a~z、A~Z、.、_
  13.             for(auto& ch:routingKey)
  14.             {
  15.                 if((ch>='a'&&ch<='z')
  16.                     ||(ch>='A'&&ch<='Z')
  17.                     ||(ch>='0'&&ch<='9')
  18.                     ||ch=='_'||ch=='.')
  19.                 {
  20.                     continue;
  21.                 }
  22.                 else
  23.                 {
  24.                     return false;//不合法
  25.                 }
  26.             }
  27.             return true;
  28.         }
  29.         static bool bindingKeyIsLegal(const std::string& bindingKey)
  30.         {
  31.             //1.确定是否有非法字符即可
  32.             //合法字符:a~z、A~Z、.、_、*、#
  33.             for(auto& ch:bindingKey)
  34.             {
  35.                 if((ch>='a'&&ch<='z')
  36.                     ||(ch>='A'&&ch<='Z')
  37.                     ||(ch>='0'&&ch<='9')
  38.                     ||ch=='_'||ch=='.'
  39.                     ||ch=='#'||ch=='*')
  40.                 {
  41.                     continue;
  42.                 }
  43.                 else
  44.                 {
  45.                     return false;//不合法
  46.                 }
  47.             }
  48.             //2.#和*必须独立存在(aa.a#.b这种就是非法)
  49.             std::vector<std::string> subStr;
  50.             StrHelper::split(bindingKey,".",subStr);
  51.             for(auto& word:subStr)
  52.             {
  53.                 if(word.size()>1
  54.                     &&word.find('#')!=std::string::npos
  55.                     &&word.find('*')!=std::string::npos)
  56.                 {
  57.                     return false;
  58.                 }
  59.             }
  60.             //3.#和*不连续出现,#的前面没有#和*,*的前面没有#
  61.             for(int i=1;i<subStr.size();++i)
  62.             {
  63.                 if(subStr[i]=="#"&&subStr[i-1]=="#")
  64.                 {
  65.                     return false;
  66.                 }
  67.                 if(subStr[i]=="#"&&subStr[i-1]=="*")
  68.                 {
  69.                     return false;
  70.                 }
  71.                 if(subStr[i]=="*"&&subStr[i-1]=="#")
  72.                 {
  73.                     return false;
  74.                 }
  75.             }
  76.             return true;
  77.         }
  78.         static bool route(const std::string& routingKey,const std::string& bindingKey,ExchangeType routeType)
  79.         {
  80.             if(routeType==DIRECT)//直接路由
  81.             {
  82.                 return routingKey==bindingKey;   
  83.             }
  84.             if(routeType==FANOUT)//广播路由
  85.             {
  86.                 return true;
  87.             }
  88.             if(routeType==TOPIC)//主题路由
  89.             {
  90.                 return topicRoute(routingKey,bindingKey);
  91.             }
  92.             ELOG("位置交换机类型:%d",routeType);
  93.             return false;
  94.         }
  95.     private:
  96.         static bool topicRoute(const std::string& routingKey,const std::string& bindingKey)
  97.         {
  98.             //1.先分割字符串
  99.             std::vector<std::string> subBinding;
  100.             StrHelper::split(bindingKey,".",subBinding);
  101.             std::vector<std::string> subRouting;
  102.             StrHelper::split(routingKey,".",subRouting);
  103.             //2.构造dp表
  104.             int row=subBinding.size(),col=subRouting.size();//行列
  105.             std::vector<std::vector<int>> dp(row+1,std::vector<int>(col+1,0));
  106.             dp[0][0]=1;
  107.             if(subBinding[0]=="#") dp[1][0]=1;//当binding_key以#起始时,需要将#对应行的第0列置为1
  108.             //3.开始匹配
  109.             for(int i=1;i<row+1;++i)
  110.             {
  111.                 for(int j=1;j<col+1;++j)
  112.                 {
  113.                     if(subBinding[i-1]=="#")//遇到#通配符匹配成功时,可以从左上方、左方、上方继承
  114.                     {
  115.                         dp[i][j]=dp[i-1][j-1]||dp[i-1][j]||dp[i][j-1];
  116.                     }
  117.                     else if(subBinding[i-1]=="*"||(subBinding[i-1]==subRouting[j-1]))
  118.                     {
  119.                         //当两个单词匹配成功时,从左上方继承结果
  120.                         dp[i][j]=dp[i-1][j-1];
  121.                     }
  122.                 }
  123.             }
  124.             return dp[row][col];
  125.         }
  126.     };
  127. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
                       10、队列消费者/订阅者管理

     

     界说消费者信息结构:
     

  • 消费者表现
  • 订阅的队列名称
  • 一个消息的处理回调函数(实现的是当发布一条消息到队列,则选择消费者进行消费,怎样消费?对于服务端来说就是调用这个回调函数进行处理,其内部逻辑就是找到消费者对应的连接,然后将数据发送给消费者对应的客户端)
  • 是否自动应答标志(一个消息被消费者消费后,若自动应答,则直接移除待确认消息,否则等候客户端回应)
     消费者的管理以信道为单元照旧以队列为单元?
     

  • 以信道为单元:一个信道关闭时全部关联的消费者都要删除
  • 以队列为单元:一个队列收到了一条消息,必要找到订购了队列消息的消费者进行推送
     消费者管理(以队列为单元进行管理):
     

  • 操作:
     

  • 新增消费者:信道提供的服务式订阅队列消息的时候创建
  • 删除消费者:取消订阅/信道关闭/连接关闭的时候删除
  • 获取消费者:从队列中全部的消费者中按序取出一个消费者进行消息推送
  • 判定队列消费者是否为空
  • 判定指定消费者是否存在
  • 清理队列全部消费者
     

  • 元素
     

  • 消费者管理结构:vector
  • 轮转序号:一个队列可能会有多个消费者,但是一条消息只必要被一个消费者消费即可,因此采用RR轮转
  • 互斥锁:保证线程安全
  • 队列名称
     对消费者进行同一管理
     

  • 初始化/删除队列的消费者信息结构(创建/删除队列的时候初始化)
  • 向指定队列新增消费者(客户端订阅指定队列消息的时候)新增完成时返回消费者对象,让信道能管理
  • 从指定队列移除消费者(客户端取消订阅的时候)
  • 移除指定队列的全部消费者(队列被删除时销毁)删除消费者的队列管理单元对象
  • 从指定队列获取一个消费者(轮询获取-消费者轮询消费起到负载平衡作用)
  • 判定队列消费者是否为空
  • 判定队列中指定消费者是否存在
  • 清理消费者
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/helper.hpp"
  3. #include "../mqcomm/logger.hpp"
  4. #include "../mqcomm/message.pb.h"
  5. namespace lsxmq
  6. {
  7.     using ConsumerCallBack=std::function<void(const std::string&,BasicProperties*,const std::string&)>;
  8.                                                       //消费者标识     //基本属性             //消息内容
  9.     struct Consumer
  10.     {
  11.         using ptr=std::shared_ptr<Consumer>;
  12.         Consumer(){}
  13.         Consumer(const std::string& ctag,const std::string& cqname,bool cautoAck,const ConsumerCallBack& ccallback)
  14.             :tag(ctag),qname(cqname),autoAck(cautoAck),callback(ccallback)
  15.         {}
  16.         std::string tag;//消费者标识
  17.         std::string qname;//队列名称
  18.         bool autoAck;//自动确认标志
  19.         ConsumerCallBack callback;//消费回调函数
  20.     };
  21.     //以队列为单元管理消费者
  22.     class QueueConsumer
  23.     {
  24.     public:
  25.         using ptr=std::shared_ptr<QueueConsumer>;
  26.         QueueConsumer(const std::string& qname)
  27.             :_qname(qname),_rrSeq(0)
  28.         {}
  29.         Consumer::ptr create(const std::string& tag,bool autoAck,const ConsumerCallBack& callback)//创建一个消费者
  30.         {
  31.             //1.先加锁
  32.             std::unique_lock<std::mutex> lock(_mutex);
  33.             //2.判断该消费者是否已存在
  34.             for(auto& con:_consumers)
  35.             {
  36.                 if(con->tag==tag)
  37.                 {
  38.                     return con;
  39.                 }
  40.             }
  41.             //3.构造消费者
  42.             Consumer::ptr cp=std::make_shared<Consumer>(tag,_qname,autoAck,callback);
  43.             //4.添加管理并返回消费者
  44.             _consumers.push_back(cp);
  45.             return cp;
  46.         }
  47.         void remove(const std::string& tag)//删除一个消费者
  48.         {
  49.             //1.先加锁
  50.             std::unique_lock<std::mutex> lock(_mutex);
  51.             //2.查找消费者并删除
  52.             for(auto it=_consumers.begin();it!=_consumers.end();++it)
  53.             {
  54.                 if((*it)->tag==tag)
  55.                 {
  56.                     _consumers.erase(it);
  57.                     return ;
  58.                 }
  59.             }
  60.         }
  61.         Consumer::ptr choose()//选择一个消费者
  62.         {
  63.             //1.先加锁
  64.             std::unique_lock<std::mutex> lock(_mutex);
  65.             if(_consumers.size()==0)//没有消费者
  66.             {
  67.                 return Consumer::ptr();
  68.             }
  69.             //2.获取选择的消费者序号
  70.             size_t id=_rrSeq%_consumers.size();
  71.             ++_rrSeq;
  72.             //3.返回消费者
  73.             return _consumers[id];
  74.         }
  75.         bool empty()//判断队列中是否没有消费者
  76.         {
  77.             //1.先加锁
  78.             std::unique_lock<std::mutex> lock(_mutex);
  79.             return _consumers.empty();
  80.         }
  81.         bool exists(const std::string& tag)//判断消费者是否存在
  82.         {
  83.             //1.先加锁
  84.             std::unique_lock<std::mutex> lock(_mutex);
  85.             //2.判断消费者是否存在
  86.             for(auto& con:_consumers)
  87.             {
  88.                 if(con->tag==tag)
  89.                 {
  90.                     return true;
  91.                 }
  92.             }
  93.             return false;
  94.         }
  95.         void clear()//清空
  96.         {
  97.             //1.先加锁
  98.             std::unique_lock<std::mutex> lock(_mutex);
  99.             _consumers.clear();
  100.         }
  101.     private:
  102.         std::vector<Consumer::ptr> _consumers;
  103.         size_t _rrSeq;
  104.         std::mutex _mutex;
  105.         std::string _qname;
  106.     };
  107.     class QueueConsumerManager
  108.     {
  109.     public:
  110.         using ptr=std::shared_ptr<QueueConsumerManager>;
  111.         QueueConsumerManager(){}
  112.         void initQueueConsumer(const std::string& qname)//初始化队列
  113.         {
  114.             //1.加锁
  115.             std::unique_lock<std::mutex> lock(_mutex);
  116.             //2.是否已存在
  117.             auto it=_queueConsumers.find(qname);
  118.             if(it!=_queueConsumers.end())
  119.             {
  120.                 return ;
  121.             }
  122.             //2.构造队列
  123.             QueueConsumer::ptr qcp=std::make_shared<QueueConsumer>(qname);
  124.             //3.添加管理
  125.             _queueConsumers.insert(std::make_pair(qname,qcp));
  126.         }
  127.         void destroyQueueConsumer(const std::string& qname)//销毁队列
  128.         {
  129.             //1.加锁
  130.             std::unique_lock<std::mutex> lock(_mutex);
  131.             //2.是否不存在
  132.             auto it=_queueConsumers.find(qname);
  133.             if(it==_queueConsumers.end())
  134.             {
  135.                 return ;
  136.             }
  137.             //3.删除队列消费者
  138.             //it->second->clear();//不需要,智能指针指向的对象会自动释放
  139.             _queueConsumers.erase(qname);
  140.         }
  141.         Consumer::ptr create(const std::string& qname,const std::string& tag
  142.                 ,bool autoAck,const ConsumerCallBack& callback)//往某个队列创建消费者
  143.         {
  144.             QueueConsumer::ptr qcp;
  145.             {
  146.                  //1.加锁
  147.                 std::unique_lock<std::mutex> lock(_mutex);
  148.                 //2.判断队列是否不存在
  149.                 auto it=_queueConsumers.find(qname);
  150.                 if(it==_queueConsumers.end())
  151.                 {
  152.                     ELOG("向队列%s添加消费者%s失败,因为该队列不存在",qname.c_str(),tag.c_str());
  153.                     return Consumer::ptr();
  154.                 }
  155.                 qcp=it->second;
  156.             }
  157.             //3.通过队列消费者句柄创建消费者
  158.             return qcp->create(tag,autoAck,callback);
  159.         }
  160.         void remove(const std::string& qname,const std::string& tag)//从某个队列中删除一个消费者
  161.         {
  162.             QueueConsumer::ptr qcp;
  163.             {
  164.                  //1.加锁
  165.                 std::unique_lock<std::mutex> lock(_mutex);
  166.                 //2.判断队列是否不存在
  167.                 auto it=_queueConsumers.find(qname);
  168.                 if(it==_queueConsumers.end())
  169.                 {
  170.                     ELOG("从队列%s删除消费者%s失败,因为该队列不存在",qname.c_str(),tag.c_str());
  171.                     return ;
  172.                 }
  173.                 qcp=it->second;
  174.             }
  175.             //3.通过队列消费者句柄创建消费者
  176.             qcp->remove(tag);
  177.         }
  178.         Consumer::ptr choose(const std::string& qname)//选择一个消费者
  179.         {
  180.             QueueConsumer::ptr qcp;
  181.             {
  182.                  //1.加锁
  183.                 std::unique_lock<std::mutex> lock(_mutex);
  184.                 //2.判断队列是否不存在
  185.                 auto it=_queueConsumers.find(qname);
  186.                 if(it==_queueConsumers.end())
  187.                 {
  188.                     ELOG("从队列%s获取消费者失败,因为该队列不存在",qname.c_str());
  189.                     return Consumer::ptr();
  190.                 }
  191.                 qcp=it->second;
  192.             }
  193.             //3.通过队列消费者句柄创建消费者
  194.             return qcp->choose();
  195.         }
  196.         bool empty(const std::string& qname)
  197.         {
  198.             QueueConsumer::ptr qcp;
  199.             {
  200.                  //1.加锁
  201.                 std::unique_lock<std::mutex> lock(_mutex);
  202.                 //2.判断队列是否不存在
  203.                 auto it=_queueConsumers.find(qname);
  204.                 if(it==_queueConsumers.end())
  205.                 {
  206.                     ELOG("从队列%s获取是否为空失败,因为该队列不存在",qname.c_str());
  207.                     return false;
  208.                 }
  209.                 qcp=it->second;
  210.             }
  211.             //3.通过队列消费者句柄创建消费者
  212.             return qcp->empty();
  213.             
  214.         }
  215.         bool exists(const std::string& qname,const std::string& tag)
  216.         {
  217.             QueueConsumer::ptr qcp;
  218.             {
  219.                  //1.加锁
  220.                 std::unique_lock<std::mutex> lock(_mutex);
  221.                 //2.判断队列是否不存在
  222.                 auto it=_queueConsumers.find(qname);
  223.                 if(it==_queueConsumers.end())
  224.                 {
  225.                     ELOG("从队列%s获取消费者失败,因为该队列不存在",qname.c_str());
  226.                     return false;
  227.                 }
  228.                 qcp=it->second;
  229.             }
  230.             //3.通过队列消费者句柄创建消费者
  231.             return qcp->exists(tag);
  232.         }
  233.         void clear()
  234.         {
  235.             //1.加锁
  236.             std::unique_lock<std::mutex> lock(_mutex);
  237.             _queueConsumers.clear();
  238.         }
  239.     private:
  240.         std::mutex _mutex;
  241.         std::unordered_map<std::string,QueueConsumer::ptr> _queueConsumers;
  242.     };
  243. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
  • 159.
  • 160.
  • 161.
  • 162.
  • 163.
  • 164.
  • 165.
  • 166.
  • 167.
  • 168.
  • 169.
  • 170.
  • 171.
  • 172.
  • 173.
  • 174.
  • 175.
  • 176.
  • 177.
  • 178.
  • 179.
  • 180.
  • 181.
  • 182.
  • 183.
  • 184.
  • 185.
  • 186.
  • 187.
  • 188.
  • 189.
  • 190.
  • 191.
  • 192.
  • 193.
  • 194.
  • 195.
  • 196.
  • 197.
  • 198.
  • 199.
  • 200.
  • 201.
  • 202.
  • 203.
  • 204.
  • 205.
  • 206.
  • 207.
  • 208.
  • 209.
  • 210.
  • 211.
  • 212.
  • 213.
  • 214.
  • 215.
  • 216.
  • 217.
  • 218.
  • 219.
  • 220.
  • 221.
  • 222.
  • 223.
  • 224.
  • 225.
  • 226.
  • 227.
  • 228.
  • 229.
  • 230.
  • 231.
  • 232.
  • 233.
  • 234.
  • 235.
  • 236.
  • 237.
  • 238.
  • 239.
  • 240.
  • 241.
  • 242.
  • 243.
  • 244.
  • 245.
  • 246.
                       11、信道管理模块

     管理信息:
     

  • 信道id:信道的唯一标识
  • 信道关联的消费者:用于消费者信道在关闭时取消订阅,删除订阅者信息
  • 信道关联的连接:用于向客户端发送数据(相应/推送消息)
  • protobuf协议的处理句柄:网络通讯前的协议处理
  • 消费者管理句柄:信道关闭/取消订阅的时候,通过句柄删除订阅者信息
  • 假造机句柄:交换机/队列/绑定/消息数据管理
  • 工作线程池句柄:一条消息被发布到队列后,必要将消息推送给订阅了对应队列的消费者,过程由线程池完成。
     管理操作:
     

  • 提供声明&删除交换机操作(删除交换机的同时删除交换构造联的绑定信息)
  • 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)
  • 提供绑定&解绑队列操作
  • 提供订阅&取消订阅队列消息操作
  • 提供发布&确认消息操作
     信道管理:
     

  • 信道的增删查
     应用层协议设计:
                                   登录后复制                        
  1. syntax= "proto3";
  2. package lsxmq;
  3. import "message.proto";
  4. //信道的打开与关闭
  5. message OpenChannelRequest  //打开信道
  6. {
  7.     string rid = 1;  //请求id,对应不同的请求类型
  8.     string cid = 2;  //信道id
  9. };
  10. message CloseChannelRequest  //关闭信道
  11. {
  12.     string rid = 1;
  13.     string cid = 2;
  14. };
  15. //交换机的声明与删除
  16. message DeclareExchangeRequest  //声明交换机
  17. {
  18.     string rid = 1;
  19.     string cid = 2;
  20.     string exchange_name = 3;  //交换机名称
  21.     ExchangeType exchange_type = 4;  //交换机类型
  22.     bool durable = 5;  //持久化
  23.     bool auto_delete = 6;  //自动删除
  24.     map<string, string> args = 7;  //其它参数
  25. };
  26. message DeleteExchangeRequest  //删除交换机
  27. {
  28.     string rid = 1;
  29.     string cid = 2;
  30.     string exchange_name = 3;
  31. };
  32. //消息队列的声明与删除
  33. message DeclareMessageQueueRequest  //声明消息队列
  34. {
  35.     string rid = 1;
  36.     string cid = 2;
  37.     string queue_name = 3;  //队列名称
  38.     bool exclusive = 4;  //是否独占
  39.     bool durable = 5;  
  40.     bool auto_delete = 6;
  41.     map<string, string> args = 7;
  42. };
  43. message DeleteMessageQueueRequest  //删除消息队列
  44. {
  45.     string rid = 1;
  46.     string cid = 2;
  47.     string queue_name = 3;
  48. };
  49. //绑定与解绑
  50. message BindRequest  //绑定
  51. {
  52.     string rid = 1;
  53.     string cid = 2;
  54.     string exchange_name = 3;
  55.     string queue_name = 4;
  56.     string binding_key = 5;
  57. };
  58. message UnbindRequest  //解绑
  59. {
  60.     string rid = 1;
  61.     string cid = 2;
  62.     string exchange_name = 3;
  63.     string queue_name = 4;
  64. };
  65. //订阅与取消订阅
  66. message BasicConsumeRequest  //订阅
  67. {
  68.     string rid = 1;
  69.     string cid = 2;
  70.     string consumer_tag  =3;  //消费者标识
  71.     string queue_name = 4;
  72.     bool auto_ack = 5;  //是否自动确认
  73. };
  74. message BasicCancelRequest  //取消订阅
  75. {
  76.     string rid = 1;
  77.     string cid = 2;
  78.     string consumer_tag = 3;
  79.     string queue_name = 4;
  80. };
  81. //发布与确认消息
  82. message BasicPublishRequest //发布消息
  83. {
  84.     string rid = 1;
  85.     string cid = 2;
  86.     string exchange_name = 3;
  87.     string body = 4;  //消息内容
  88.     BasicProperties properties = 5;  //消息基本属性
  89. };
  90. message BasicAckRequest   //确认消息
  91. {
  92.     string rid = 1;
  93.     string cid = 2;
  94.     string queue_name = 3;
  95.     string mid = 4;
  96. };
  97. //消息的推送
  98. message basicConsumeResponse {
  99.     string cid = 1;
  100.     string consumer_tag = 2;
  101.     string body = 3;
  102.     BasicProperties properties = 4;
  103. };
  104. //通用响应
  105. message BasicCommonResponse
  106. {
  107.     string rid = 1;
  108.     string cid = 2;
  109.     bool ok = 3;
  110. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
                       12、连接管理模块

     向⽤⼾提供⼀个⽤于实现⽹络通讯的Connection对象,从其内部可创建出粒度更轻的Channel对象,⽤于与客⼾端进⾏⽹络通讯。
     成员信息:
     

  • 连接关联的信道管理句柄(实现信道的增删查)
  • 连接关联的现实⽤于通讯的muduo::net::Connection连接
  • protobuf协议处理的句柄(ProtobufCodec对象)
  • 消费者管理句柄
  • 假造机句柄
  • 异步⼯作线程池句柄
     连接操作:
     

  • 提供创建Channel信道的操作
  • 提供删除Channel信道的操作
     连接管理:
     

  • 连接的增删查
                                   登录后复制                        
  1. #pragma once
  2. #include "channel.hpp"
  3. namespace lsxmq
  4. {
  5.     class Connection
  6.     {
  7.     public:
  8.         using ptr=std::shared_ptr<Connection>;
  9.         Connection(const muduo::net::TcpConnectionPtr &conn
  10.             , const ProtobufCodecPtr &codec
  11.             , const QueueConsumerManager::ptr &qcmp
  12.             , const VirtualHost::ptr &vhp
  13.             , const ThreadPool::ptr &pool)
  14.             : _conn(conn)
  15.             , _codec(codec)
  16.             , _qcmp(qcmp)
  17.             , _vhp(vhp)
  18.             , _pool(pool)
  19.             , _chmp(std::make_shared<ChannelManager>())
  20.         {
  21.             DLOG("new Connection: %p",this);
  22.         }
  23.         ~Connection()
  24.         {
  25.             DLOG("delete Connection: %p",this);
  26.         }
  27.         void openChannel(const OpenChannelRequestPtr &req)
  28.         {
  29.             // 1.判断信道是否已打开
  30.             bool ret = _chmp->openChannel(req->cid(),_conn,_codec,_qcmp,_vhp,_pool);
  31.             if(ret==false)
  32.             {
  33.                 basicResponse(req->rid(),req->cid(),false);
  34.                 return;
  35.             }
  36.             //2.响应
  37.             basicResponse(req->rid(),req->cid(),true);
  38.         }
  39.         void closeChannel(const CloseChannelRequestPtr &req)
  40.         {
  41.             _chmp->closeChannel(req->cid());
  42.             basicResponse(req->rid(),req->cid(),true);
  43.         }
  44.         Channel::ptr getChannel(const std::string& cid)
  45.         {
  46.             return _chmp->getChannel(cid);
  47.         }
  48.     private:
  49.         void basicResponse(const std::string &rid, const std::string &cid, bool ok)
  50.         {
  51.             BasicCommonResponse rsp;
  52.             rsp.set_rid(rid);
  53.             rsp.set_cid(cid);
  54.             rsp.set_ok(ok);
  55.             _codec->send(_conn, rsp);
  56.         }
  57.     private:
  58.         muduo::net::TcpConnectionPtr _conn; // 关联的连接
  59.         ProtobufCodecPtr _codec;            // protobuf协议处理器--针对收到的请求数据进行protobuf协议处理
  60.         QueueConsumerManager::ptr _qcmp;    // 消费者管理句柄
  61.         VirtualHost::ptr _vhp;              // 虚拟机管理句柄
  62.         ThreadPool::ptr _pool;              // 线程池管理句柄
  63.         ChannelManager::ptr _chmp;          // 信道管理句柄
  64.     };
  65.     class ConnectionManager
  66.     {
  67.     public:
  68.         using ptr=std::shared_ptr<ConnectionManager>;
  69.         ConnectionManager()
  70.         {
  71.             DLOG("new ConnectionManager: %p",this);
  72.         }
  73.         ~ConnectionManager()
  74.         {
  75.             DLOG("delete ConnectionManager: %p",this);
  76.         }
  77.         void newConnection(const muduo::net::TcpConnectionPtr &conn
  78.             , const ProtobufCodecPtr &codec
  79.             , const QueueConsumerManager::ptr &qcmp
  80.             , const VirtualHost::ptr &vhp
  81.             , const ThreadPool::ptr &pool)
  82.         {
  83.             std::unique_lock<std::mutex> lock(_mutex);
  84.             //连接是否已存在
  85.             auto it=_conns.find(conn);
  86.             if(it!=_conns.end())
  87.             {
  88.                 return;
  89.             }
  90.             //构建对象
  91.             Connection::ptr cop=std::make_shared<Connection>(conn,codec,qcmp,vhp,pool);
  92.             //添加管理
  93.             _conns[conn]=cop;
  94.         }
  95.         
  96.         void deleteConnection(const muduo::net::TcpConnectionPtr &conn)
  97.         {
  98.             std::unique_lock<std::mutex> lock(_mutex);
  99.             //连接是否不存在
  100.             auto it=_conns.find(conn);
  101.             if(it==_conns.end())
  102.             {
  103.                 return;
  104.             }
  105.             //删除
  106.             _conns.erase(it);
  107.         }
  108.         Connection::ptr getConnection(const muduo::net::TcpConnectionPtr &conn)
  109.         {
  110.             std::unique_lock<std::mutex> lock(_mutex);
  111.             //连接是否不存在
  112.             auto it=_conns.find(conn);
  113.             if(it==_conns.end())
  114.             {
  115.                 return Connection::ptr();
  116.             }
  117.             return it->second;
  118.         }
  119.     private:
  120.         std::mutex _mutex;
  121.         std::unordered_map<muduo::net::TcpConnectionPtr,Connection::ptr> _conns;
  122.     };
  123. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
                       13、服务器模块

     服务器模块借助muduo网络库来实现
     

  • _server:Muduo库提供的⼀个通⽤TCP服务器, 可以封装这个服务器进⾏TCP通讯
  • _baseloop:主事件循环器, ⽤于相应IO事件和定时器事件,主loop主要是为了相应监听描述符的IO事件
  • _codec: ⼀个protobuf编解码器, 在TCP服务器上设计了⼀层应⽤层协议, 这个编解码器主要就是负责实现应⽤层协议的解析和封装, 下边详细讲解
  • _dispatcher:⼀个消息分发器, 当Socket接收到⼀个报⽂消息后, 我必要按照消息的范例, 即上⾯提到的typeName进⾏消息分发, 会不不同范例的消息分发相对应的的处理函数中,下边详细讲解
  • _consumer: 服务器中的消费者信息管理句柄。
  • _threadpool: 异步⼯作线程池,主要⽤于队列消息的推送⼯作。
  • _connections: 连接管理句柄,管理当前服务器上的全部已经建⽴的通讯连接。
  • _virtual_host:服务器持有的假造主机。 队列、交换机 、绑定、消息等数据都是通过假造主机管理。
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqthird/include/muduo/proto/codec.h"
  3. #include "../mqthird/include/muduo/proto/dispatcher.h"
  4. #include "../mqthird/include/muduo/base/Logging.h"
  5. #include "../mqthird/include/muduo/base/Mutex.h"
  6. #include "../mqthird/include/muduo/net/EventLoop.h"
  7. #include "../mqthird/include/muduo/net/TcpServer.h"
  8. #include "connection.hpp"
  9. namespace lsxmq
  10. {
  11.     //typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
  12.     #define DBFILE "/meta.db"
  13.     #define HOSTNAME "lsxhost"
  14.     class Server
  15.     {
  16.     public:
  17.         Server(uint16_t port,const std::string& basedir)
  18.             : _server(&_loop, muduo::net::InetAddress(port)
  19.             , "ProtobufServer", muduo::net::TcpServer::kReusePort)
  20.             , _dispatcher(std::bind(&Server::onUnknownMessage
  21.                     , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
  22.             , _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher
  23.                     , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)))
  24.             ,_consumerManager(std::make_shared<QueueConsumerManager>())
  25.             ,_virtualHost(std::make_shared<VirtualHost>(HOSTNAME,basedir+DBFILE,basedir))
  26.             ,_threadPool(std::make_shared<ThreadPool>())
  27.             ,_connectionManager(std::make_shared<ConnectionManager>())
  28.         {
  29.             //需要初始化队列的消费者结构,因为消息队列是持久化的,构造时会自动加载,但消费者的队列结构不会
  30.             //1.获取所有队列
  31.             auto mqmap=_virtualHost->allQueue();
  32.             for(auto& mqp:mqmap)
  33.             {
  34.                 _consumerManager->initQueueConsumer(mqp.second->name);
  35.             }
  36.             // 注册业务处理请求函数
  37.             _dispatcher.registerMessageCallback<OpenChannelRequest>(std::bind(&Server::onOpenChannel
  38.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  39.             _dispatcher.registerMessageCallback<CloseChannelRequest>(std::bind(&Server::onCloseChannel
  40.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  41.             _dispatcher.registerMessageCallback<DeclareExchangeRequest>(std::bind(&Server::onDeclareExchange
  42.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  43.             _dispatcher.registerMessageCallback<DeleteExchangeRequest>(std::bind(&Server::onDeleteExchange
  44.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  45.             _dispatcher.registerMessageCallback<DeclareMessageQueueRequest>(std::bind(&Server::onDeclareMessageQueue
  46.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  47.             _dispatcher.registerMessageCallback<DeleteMessageQueueRequest>(std::bind(&Server::onDeleteMessageQueue
  48.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  49.             _dispatcher.registerMessageCallback<BindRequest>(std::bind(&Server::onBind
  50.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  51.             _dispatcher.registerMessageCallback<UnbindRequest>(std::bind(&Server::onUnbind
  52.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  53.             _dispatcher.registerMessageCallback<BasicConsumeRequest>(std::bind(&Server::onBasicConsume
  54.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  55.             _dispatcher.registerMessageCallback<BasicCancelRequest>(std::bind(&Server::onBasicCancel
  56.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  57.             _dispatcher.registerMessageCallback<BasicPublishRequest>(std::bind(&Server::onBasicPublish
  58.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  59.             _dispatcher.registerMessageCallback<BasicAckRequest>(std::bind(&Server::onBasicAckRequest
  60.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  61.             //注册服务器的回调函数
  62.             _server.setConnectionCallback(std::bind(&Server::onConnection, this, std::placeholders::_1));
  63.             _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),
  64.                                                  std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  65.         }
  66.         void start()
  67.         {
  68.             _server.start();
  69.             _loop.loop();
  70.         }
  71.     private:
  72.         //打开信道
  73.         void onOpenChannel(const muduo::net::TcpConnectionPtr &conn, const OpenChannelRequestPtr &message, muduo::Timestamp)
  74.         {
  75.             //获取连接
  76.             auto selfConnP=_connectionManager->getConnection(conn);
  77.             if(selfConnP.get()==nullptr)
  78.             {
  79.                 ELOG("打开信道失败,因为连接不存在");
  80.                 return ;
  81.             }
  82.             selfConnP->openChannel(message);
  83.         }
  84.         //关闭信道
  85.         void onCloseChannel(const muduo::net::TcpConnectionPtr &conn, const CloseChannelRequestPtr &message, muduo::Timestamp)
  86.         {
  87.             //获取连接
  88.             auto selfConnP=_connectionManager->getConnection(conn);
  89.             if(selfConnP.get()==nullptr)
  90.             {
  91.                 ELOG("关闭信道失败,因为连接不存在");
  92.                 return ;
  93.             }
  94.             selfConnP->closeChannel(message);
  95.         }
  96.         //声明交换机
  97.         void onDeclareExchange(const muduo::net::TcpConnectionPtr &conn, const DeclareExchangeRequestPtr &message, muduo::Timestamp)
  98.         {
  99.             //1.获取连接
  100.             auto selfConnP=_connectionManager->getConnection(conn);
  101.             if(selfConnP.get()==nullptr)
  102.             {
  103.                 ELOG("声明交换机失败,因为连接不存在");
  104.                 return ;
  105.             }
  106.             //2.获取信道
  107.             auto selfChannelP=selfConnP->getChannel(message->cid());
  108.             if(selfChannelP.get()==nullptr)
  109.             {
  110.                 ELOG("声明交换机失败,因为信道不存在");
  111.                 return ;
  112.             }
  113.             //声明交换机
  114.             selfChannelP->declareExchange(message);
  115.         }
  116.         //删除交换机
  117.         void onDeleteExchange(const muduo::net::TcpConnectionPtr &conn, const DeleteExchangeRequestPtr &message, muduo::Timestamp)
  118.         {
  119.             //1.获取连接
  120.             auto selfConnP=_connectionManager->getConnection(conn);
  121.             if(selfConnP.get()==nullptr)
  122.             {
  123.                 ELOG("删除交换机失败,因为连接不存在");
  124.                 return ;
  125.             }
  126.             //2.获取信道
  127.             auto selfChannelP=selfConnP->getChannel(message->cid());
  128.             if(selfChannelP.get()==nullptr)
  129.             {
  130.                 ELOG("删除交换机失败,因为信道不存在");
  131.                 return ;
  132.             }
  133.             //删除交换机
  134.             selfChannelP->deleteExchange(message);
  135.         }
  136.         //声明消息队列
  137.         void onDeclareMessageQueue(const muduo::net::TcpConnectionPtr &conn, const DeclareMessageQueueRequestPtr &message, muduo::Timestamp)
  138.         {
  139.             //1.获取连接
  140.             auto selfConnP=_connectionManager->getConnection(conn);
  141.             if(selfConnP.get()==nullptr)
  142.             {
  143.                 ELOG("声明消息队列失败,因为连接不存在");
  144.                 return ;
  145.             }
  146.             //2.获取信道
  147.             auto selfChannelP=selfConnP->getChannel(message->cid());
  148.             if(selfChannelP.get()==nullptr)
  149.             {
  150.                 ELOG("声明消息队列失败,因为信道不存在");
  151.                 return ;
  152.             }
  153.             //声明消息队列
  154.             selfChannelP->declareMessageQueue(message);
  155.         }
  156.         //删除消息队列
  157.         void onDeleteMessageQueue(const muduo::net::TcpConnectionPtr &conn, const DeleteMessageQueueRequestPtr &message, muduo::Timestamp)
  158.         {
  159.             //1.获取连接
  160.             auto selfConnP=_connectionManager->getConnection(conn);
  161.             if(selfConnP.get()==nullptr)
  162.             {
  163.                 ELOG("删除消息队列失败,因为连接不存在");
  164.                 return ;
  165.             }
  166.             //2.获取信道
  167.             auto selfChannelP=selfConnP->getChannel(message->cid());
  168.             if(selfChannelP.get()==nullptr)
  169.             {
  170.                 ELOG("删除消息队列失败,因为信道不存在");
  171.                 return ;
  172.             }
  173.             //删除消息队列
  174.             selfChannelP->deleteMessageQueue(message);
  175.         }
  176.         //绑定
  177.         void onBind(const muduo::net::TcpConnectionPtr &conn, const BindRequestPtr &message, muduo::Timestamp)
  178.         {
  179.             //1.获取连接
  180.             auto selfConnP=_connectionManager->getConnection(conn);
  181.             if(selfConnP.get()==nullptr)
  182.             {
  183.                 ELOG("绑定失败,因为连接不存在");
  184.                 return ;
  185.             }
  186.             //2.获取信道
  187.             auto selfChannelP=selfConnP->getChannel(message->cid());
  188.             if(selfChannelP.get()==nullptr)
  189.             {
  190.                 ELOG("绑定失败,因为信道不存在");
  191.                 return ;
  192.             }
  193.             //绑定
  194.             selfChannelP->bind(message);
  195.         }
  196.         //解绑
  197.         void onUnbind(const muduo::net::TcpConnectionPtr &conn, const UnbindRequestPtr &message, muduo::Timestamp)
  198.         {
  199.             //1.获取连接
  200.             auto selfConnP=_connectionManager->getConnection(conn);
  201.             if(selfConnP.get()==nullptr)
  202.             {
  203.                 ELOG("解绑失败,因为连接不存在");
  204.                 return ;
  205.             }
  206.             //2.获取信道
  207.             auto selfChannelP=selfConnP->getChannel(message->cid());
  208.             if(selfChannelP.get()==nullptr)
  209.             {
  210.                 ELOG("解绑失败,因为信道不存在");
  211.                 return ;
  212.             }
  213.             //解绑
  214.             selfChannelP->unbind(message);
  215.         }
  216.         //订阅
  217.         void onBasicConsume(const muduo::net::TcpConnectionPtr &conn, const BasicConsumeRequestPtr &message, muduo::Timestamp)
  218.         {
  219.             //1.获取连接
  220.             auto selfConnP=_connectionManager->getConnection(conn);
  221.             if(selfConnP.get()==nullptr)
  222.             {
  223.                 ELOG("订阅失败,因为连接不存在");
  224.                 return ;
  225.             }
  226.             //2.获取信道
  227.             auto selfChannelP=selfConnP->getChannel(message->cid());
  228.             if(selfChannelP.get()==nullptr)
  229.             {
  230.                 ELOG("订阅失败,因为信道不存在");
  231.                 return ;
  232.             }
  233.             //订阅
  234.             selfChannelP->basicConsume(message);
  235.         }
  236.         //取消订阅
  237.         void onBasicCancel(const muduo::net::TcpConnectionPtr &conn, const BasicCancelRequestPtr &message, muduo::Timestamp)
  238.         {
  239.             //1.获取连接
  240.             auto selfConnP=_connectionManager->getConnection(conn);
  241.             if(selfConnP.get()==nullptr)
  242.             {
  243.                 ELOG("取消订阅失败,因为连接不存在");
  244.                 return ;
  245.             }
  246.             //2.获取信道
  247.             auto selfChannelP=selfConnP->getChannel(message->cid());
  248.             if(selfChannelP.get()==nullptr)
  249.             {
  250.                 ELOG("取消订阅失败,因为信道不存在");
  251.                 return ;
  252.             }
  253.             //取消订阅
  254.             selfChannelP->basicCancel(message);
  255.         }
  256.         //发布消息
  257.         void onBasicPublish(const muduo::net::TcpConnectionPtr &conn, const BasicPublishRequestPtr &message, muduo::Timestamp)
  258.         {
  259.             //1.获取连接
  260.             auto selfConnP=_connectionManager->getConnection(conn);
  261.             if(selfConnP.get()==nullptr)
  262.             {
  263.                 ELOG("发布消息失败,因为连接不存在");
  264.                 return ;
  265.             }
  266.             //2.获取信道
  267.             auto selfChannelP=selfConnP->getChannel(message->cid());
  268.             if(selfChannelP.get()==nullptr)
  269.             {
  270.                 ELOG("发布消息失败,因为信道不存在");
  271.                 return ;
  272.             }
  273.             //发布消息
  274.             selfChannelP->basicPublish(message);
  275.         }
  276.         //确认消息
  277.         void onBasicAckRequest(const muduo::net::TcpConnectionPtr &conn, const BasicAckRequestPtr &message, muduo::Timestamp)
  278.         {
  279.             //1.获取连接
  280.             auto selfConnP=_connectionManager->getConnection(conn);
  281.             if(selfConnP.get()==nullptr)
  282.             {
  283.                 ELOG("确认消息失败,因为连接不存在");
  284.                 return ;
  285.             }
  286.             //2.获取信道
  287.             auto selfChannelP=selfConnP->getChannel(message->cid());
  288.             if(selfChannelP.get()==nullptr)
  289.             {
  290.                 ELOG("确认消息失败,因为信道不存在");
  291.                 return ;
  292.             }
  293.             //确认消息
  294.             selfChannelP->basicAck(message);
  295.         }
  296.         void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn,const std::shared_ptr<google::protobuf::Message>& message,muduo::Timestamp)
  297.         {
  298.             //收到未知消息
  299.             LOG_INFO<<message->GetTypeName();
  300.             conn->shutdown();
  301.         }
  302.         void onConnection(const muduo::net::TcpConnectionPtr &conn)
  303.         {
  304.             if (conn->connected())
  305.             {
  306.                 LOG_INFO << "连接成功";
  307.                 _connectionManager->newConnection(conn,_codec,_consumerManager,_virtualHost,_threadPool);
  308.             }
  309.             else
  310.             {
  311.                 LOG_INFO << "断开连接";
  312.                 _connectionManager->deleteConnection(conn);
  313.             }
  314.         }
  315.     private:
  316.         muduo::net::EventLoop _loop;
  317.         muduo::net::TcpServer _server;  // 服务器
  318.         ProtobufDispatcher _dispatcher; // 请求分发器对象--要向其中注册请求处理函数
  319.         ProtobufCodecPtr _codec;           // protobuf协议处理器--针对收到的请求数据进行protobuf协议处理
  320.         QueueConsumerManager::ptr _consumerManager;     // 消费者管理句柄
  321.         VirtualHost::ptr _virtualHost;      // 虚拟机管理句柄
  322.         ThreadPool::ptr _threadPool;        // 线程池管理句柄
  323.         ConnectionManager::ptr _connectionManager;     // 连接管理句柄
  324.     };
  325. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
  • 159.
  • 160.
  • 161.
  • 162.
  • 163.
  • 164.
  • 165.
  • 166.
  • 167.
  • 168.
  • 169.
  • 170.
  • 171.
  • 172.
  • 173.
  • 174.
  • 175.
  • 176.
  • 177.
  • 178.
  • 179.
  • 180.
  • 181.
  • 182.
  • 183.
  • 184.
  • 185.
  • 186.
  • 187.
  • 188.
  • 189.
  • 190.
  • 191.
  • 192.
  • 193.
  • 194.
  • 195.
  • 196.
  • 197.
  • 198.
  • 199.
  • 200.
  • 201.
  • 202.
  • 203.
  • 204.
  • 205.
  • 206.
  • 207.
  • 208.
  • 209.
  • 210.
  • 211.
  • 212.
  • 213.
  • 214.
  • 215.
  • 216.
  • 217.
  • 218.
  • 219.
  • 220.
  • 221.
  • 222.
  • 223.
  • 224.
  • 225.
  • 226.
  • 227.
  • 228.
  • 229.
  • 230.
  • 231.
  • 232.
  • 233.
  • 234.
  • 235.
  • 236.
  • 237.
  • 238.
  • 239.
  • 240.
  • 241.
  • 242.
  • 243.
  • 244.
  • 245.
  • 246.
  • 247.
  • 248.
  • 249.
  • 250.
  • 251.
  • 252.
  • 253.
  • 254.
  • 255.
  • 256.
  • 257.
  • 258.
  • 259.
  • 260.
  • 261.
  • 262.
  • 263.
  • 264.
  • 265.
  • 266.
  • 267.
  • 268.
  • 269.
  • 270.
  • 271.
  • 272.
  • 273.
  • 274.
  • 275.
  • 276.
  • 277.
  • 278.
  • 279.
  • 280.
  • 281.
  • 282.
  • 283.
  • 284.
  • 285.
  • 286.
  • 287.
  • 288.
  • 289.
  • 290.
  • 291.
  • 292.
  • 293.
  • 294.
  • 295.
  • 296.
  • 297.
  • 298.
  • 299.
  • 300.
  • 301.
  • 302.
  • 303.
  • 304.
  • 305.
  • 306.
  • 307.
  • 308.
  • 309.
  • 310.
  • 311.
  • 312.
  • 313.
  • 314.
  • 315.
  • 316.
  • 317.
  • 318.
  • 319.
  • 320.
  • 321.
  • 322.
  • 323.
  • 324.
  • 325.
  • 326.
  • 327.
  • 328.
  • 329.
                       客户端实现

     1、订阅者模块

     ⼀个并不直接对⽤⼾展⽰的模块,其在客⼾端体现的作⽤就是对于⻆⾊的描述,表⽰这是⼀个消费者。
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/helper.hpp"
  3. #include "../mqcomm/logger.hpp"
  4. #include "../mqcomm/message.pb.h"
  5. namespace lsxmq
  6. {
  7.     using ConsumerCallBack=std::function<void(const std::string&,BasicProperties*,const std::string&)>;
  8.                                                       //消费者标识     //基本属性             //消息内容
  9.     struct Consumer
  10.     {
  11.         using ptr=std::shared_ptr<Consumer>;
  12.         Consumer()
  13.         {
  14.             DLOG("new Consumer: %p",this);
  15.         }
  16.         ~Consumer()
  17.         {
  18.             DLOG("delete Consumer: %p",this);
  19.         }
  20.         Consumer(const std::string& ctag,const std::string& cqname,bool cautoAck,const ConsumerCallBack& ccallback)
  21.             :tag(ctag),qname(cqname),autoAck(cautoAck),callback(ccallback)
  22.         {}
  23.         std::string tag;//消费者标识
  24.         std::string qname;//队列名称
  25.         bool autoAck;//自动确认标志
  26.         ConsumerCallBack callback;//消费回调函数
  27.     };
  28. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
                       2、信道管理模块

     信道信息:
     

  • 信道ID
  • 信道关联的网络通讯连接对象
  • protobuf协议处理对象
  • 信道关联的消费者
  • 请结构<哀求id,相应>,方便查找)
  • 互斥锁&条件变量(大部门哀求是壅闭操作,但muduo库的通讯是异步的,因此在收到应答后,通过判定是否是等候的指定相应来进行同步):发送消息时,还没有声明交换机成功怎么办?以是要同步
     信道操作:
     

  • 打开信道
  • 关闭信道
  • 声明交换机
  • 删除交换机
  • 声明队列
  • 删除队列
  • 绑定
  • 解绑
  • 订阅
  • 取消订阅
  • 发布消息
  • 确认消息
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/logger.hpp"
  3. #include "../mqcomm/threadpool.hpp"
  4. #include "../mqcomm/proto.pb.h"
  5. #include "../mqcomm/message.pb.h"
  6. #include "consumer.hpp"
  7. #include "muduo/net/TcpConnection.h"
  8. #include "muduo/proto/codec.h"
  9. namespace lsxmq
  10. {
  11.     using ProtobufCodecPtr=std::shared_ptr<ProtobufCodec>;
  12.     using BasicCommonResponsePtr=std::shared_ptr<BasicCommonResponse>;
  13.     using BasicConsumeResponsePtr=std::shared_ptr<basicConsumeResponse>;
  14.     class Channel
  15.     {
  16.     public:
  17.         using ptr=std::shared_ptr<Channel>;
  18.         Channel(const muduo::net::TcpConnectionPtr& conn
  19.                 ,const ProtobufCodecPtr& codec)
  20.                 :_cid(UUIDHelper::uuid()),_conn(conn),_codec(codec)
  21.         {}
  22.         ~Channel()
  23.         {
  24.             basicCancel();//析构时取消订阅
  25.         }
  26.         std::string cid()
  27.         {
  28.             return _cid;
  29.         }
  30.         //打开信道
  31.         bool openChannel()
  32.         {
  33.             //构造请求
  34.             OpenChannelRequest req;
  35.             req.set_rid(UUIDHelper::uuid());
  36.             req.set_cid(_cid);
  37.             //发送请求并等待
  38.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  39.             auto rsp=waitBasicResponse(req.rid());
  40.             if(rsp->ok()==false)
  41.             {
  42.                 ELOG("打开信道失败");
  43.             }
  44.             return rsp->ok();
  45.         }
  46.         //关闭信道
  47.         bool closeChannel()
  48.         {
  49.             //构造请求
  50.             CloseChannelRequest req;
  51.             req.set_rid(UUIDHelper::uuid());
  52.             req.set_cid(_cid);
  53.             //发送请求并等待
  54.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  55.             auto rsp=waitBasicResponse(req.rid());
  56.             if(rsp->ok()==false)
  57.             {
  58.                 ELOG("关闭信道失败");
  59.             }
  60.             return rsp->ok();
  61.         }
  62.         //声明交换机
  63.         bool declareExchange(const std::string& exchangeName
  64.                             ,ExchangeType exchangeType
  65.                             ,bool durable
  66.                             ,bool autoDelete
  67.                             ,google::protobuf::Map<std::string, std::string>& args)
  68.         {
  69.             //构造请求
  70.             DeclareExchangeRequest req;
  71.             req.set_rid(UUIDHelper::uuid());
  72.             req.set_cid(_cid);
  73.             req.set_exchange_name(exchangeName);
  74.             req.set_exchange_type(exchangeType);
  75.             req.set_durable(durable);
  76.             req.set_auto_delete(autoDelete);
  77.             req.mutable_args()->swap(args);
  78.             //发送请求并等待
  79.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  80.             auto rsp=waitBasicResponse(req.rid());
  81.             if(rsp->ok()==false)
  82.             {
  83.                 ELOG("声明交换机失败");
  84.             }
  85.             return rsp->ok();
  86.         }
  87.         //删除交换机
  88.         bool deleteExchange(const std::string& exchangeName)
  89.         {
  90.             //构造请求
  91.             DeleteExchangeRequest req;
  92.             req.set_rid(UUIDHelper::uuid());
  93.             req.set_cid(_cid);
  94.             req.set_exchange_name(exchangeName);
  95.             //发送请求并等待
  96.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  97.             auto rsp=waitBasicResponse(req.rid());
  98.             if(rsp->ok()==false)
  99.             {
  100.                 ELOG("删除交换机失败");
  101.             }
  102.             return rsp->ok();
  103.         }
  104.         //声明消息队列
  105.         bool declareMessageQueue(const std::string& queueName
  106.                                 ,bool exclusive
  107.                                 ,bool durable
  108.                                 ,bool autoDelete
  109.                                 ,google::protobuf::Map<std::string, std::string>& args)
  110.         {
  111.             //构造请求
  112.             DeclareMessageQueueRequest req;
  113.             req.set_rid(UUIDHelper::uuid());
  114.             req.set_cid(_cid);
  115.             req.set_queue_name(queueName);
  116.             req.set_exclusive(exclusive);
  117.             req.set_durable(durable);
  118.             req.set_auto_delete(autoDelete);
  119.             req.mutable_args()->swap(args);
  120.             //发送请求并等待
  121.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  122.             auto rsp=waitBasicResponse(req.rid());
  123.             if(rsp->ok()==false)
  124.             {
  125.                 ELOG("声明消息队列失败");
  126.             }
  127.             return rsp->ok();
  128.         }
  129.         //删除消息队列
  130.         bool deleteMessageQueue(const std::string& queueName)
  131.         {
  132.             //构造请求
  133.             DeleteMessageQueueRequest req;
  134.             req.set_rid(UUIDHelper::uuid());
  135.             req.set_cid(_cid);
  136.             req.set_queue_name(queueName);
  137.             //发送请求并等待
  138.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  139.             auto rsp=waitBasicResponse(req.rid());
  140.             if(rsp->ok()==false)
  141.             {
  142.                 ELOG("删除消息队列失败");
  143.             }
  144.             return rsp->ok();
  145.         }
  146.         //绑定
  147.         bool bind(const std::string& exchangeName
  148.                     ,const std::string& queueName
  149.                     ,const std::string& bindingKey)
  150.         {
  151.             //构造请求
  152.             BindRequest req;
  153.             req.set_rid(UUIDHelper::uuid());
  154.             req.set_cid(_cid);
  155.             req.set_exchange_name(exchangeName);
  156.             req.set_queue_name(queueName);
  157.             req.set_binding_key(bindingKey);
  158.             //发送请求并等待
  159.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  160.             auto rsp=waitBasicResponse(req.rid());
  161.             if(rsp->ok()==false)
  162.             {
  163.                 ELOG("绑定失败");
  164.             }
  165.             return rsp->ok();
  166.         }
  167.         //解绑
  168.         bool unbind(const std::string& exchangeName
  169.                     ,const std::string& queueName)
  170.         {
  171.             //构造请求
  172.             BindRequest req;
  173.             req.set_rid(UUIDHelper::uuid());
  174.             req.set_cid(_cid);
  175.             req.set_exchange_name(exchangeName);
  176.             req.set_queue_name(queueName);
  177.             //发送请求并等待
  178.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  179.             auto rsp=waitBasicResponse(req.rid());
  180.             if(rsp->ok()==false)
  181.             {
  182.                 ELOG("解绑失败");
  183.             }
  184.             return rsp->ok();
  185.         }
  186.         //订阅
  187.         bool basicConsume(const std::string& consumerTag
  188.                         ,const std::string& queueName
  189.                         ,bool autoAck
  190.                         ,const ConsumerCallBack& cb)
  191.         {
  192.             //构造请求
  193.             if(_consumer.get()!=nullptr)
  194.             {
  195.                 DLOG("已订阅,不可重复订阅")
  196.                 return false;
  197.             }
  198.             BasicConsumeRequest req;
  199.             req.set_rid(UUIDHelper::uuid());
  200.             req.set_cid(_cid);
  201.             req.set_consumer_tag(consumerTag);
  202.             req.set_queue_name(queueName);
  203.             req.set_auto_ack(autoAck);
  204.             //发送请求并等待
  205.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  206.             auto rsp=waitBasicResponse(req.rid());
  207.             if(rsp->ok()==false)
  208.             {
  209.                 ELOG("订阅失败");
  210.                 return false;
  211.             }
  212.             //订阅成功,要构造对应的消费者
  213.             _consumer=std::make_shared<Consumer>(consumerTag,queueName,autoAck,cb);
  214.             return rsp->ok();
  215.         }
  216.         //取消订阅
  217.         bool basicCancel()
  218.         {
  219.             if(_consumer.get()==nullptr)
  220.             {
  221.                 return true;
  222.             }
  223.             //构造请求
  224.             BasicCancelRequest req;
  225.             req.set_rid(UUIDHelper::uuid());
  226.             req.set_cid(_cid);
  227.             req.set_consumer_tag(_consumer->tag);
  228.             req.set_queue_name(_consumer->qname);
  229.             //发送请求并等待
  230.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  231.             auto rsp=waitBasicResponse(req.rid());
  232.             if(rsp->ok()==false)
  233.             {
  234.                 ELOG("取消订阅失败");
  235.                 return false;
  236.             }
  237.             //取消订阅时清理消费者对象
  238.             _consumer.reset();
  239.             return rsp->ok();
  240.         }
  241.         //发布消息
  242.         bool basicPublish(const std::string& exchangeName
  243.                             ,const std::string& body
  244.                             ,BasicProperties* bp)
  245.         {
  246.             //构造请求
  247.             BasicPublishRequest req;
  248.             req.set_rid(UUIDHelper::uuid());
  249.             req.set_cid(_cid);
  250.             req.set_exchange_name(exchangeName);
  251.             req.set_body(body);
  252.             if(bp!=nullptr)
  253.             {
  254.                 req.mutable_properties()->set_id(bp->id());
  255.                 req.mutable_properties()->set_deliverymode(bp->deliverymode());
  256.                 req.mutable_properties()->set_routing_key(bp->routing_key());
  257.             }
  258.             //发送请求并等待
  259.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  260.             auto rsp=waitBasicResponse(req.rid());
  261.             if(rsp->ok()==false)
  262.             {
  263.                 ELOG("发布消息失败");
  264.             }
  265.             return rsp->ok();
  266.         }
  267.         //确认消息
  268.         bool basicAck(const std::string& mid)
  269.         {
  270.             if(_consumer.get()==nullptr)
  271.             {
  272.                 ELOG("消息确认时,没有找到消费者")
  273.                 return false;
  274.             }
  275.             //构造请求
  276.             BasicAckRequest req;
  277.             req.set_rid(UUIDHelper::uuid());
  278.             req.set_cid(_cid);
  279.             req.set_queue_name(_consumer->qname);
  280.             req.set_mid(mid);
  281.             //发送请求并等待
  282.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  283.             auto rsp=waitBasicResponse(req.rid());
  284.             if(rsp->ok()==false)
  285.             {
  286.                 ELOG("确认消息失败");
  287.             }
  288.             return rsp->ok();
  289.         }
  290.     public:
  291.         //收到基础响应时,向hash map中添加响应
  292.         void putBasicResponse(const BasicCommonResponsePtr& rsp)
  293.         {
  294.             std::unique_lock<std::mutex> lock(_mutex);
  295.             _basicRsp.insert(std::make_pair(rsp->rid(),rsp));
  296.             _cond.notify_all();//唤醒所有等待线程,只有满足条件的才能竞争锁
  297.         }
  298.         //收到消息推送后,需要信道找到对应的消费者,通过回调函数进行处理
  299.         void consume(const BasicConsumeResponsePtr& rsp)
  300.         {
  301.             if(_consumer.get()==nullptr)
  302.             {
  303.                 ELOG("消费者不存在");
  304.                 return;
  305.             }
  306.             if(rsp->consumer_tag()!=_consumer->tag)
  307.             {
  308.                 ELOG("收到错误的响应,消费者标志匹配失败");
  309.                 return;
  310.             }
  311.             //执行回调函数
  312.             _consumer->callback(rsp->consumer_tag(),rsp->mutable_properties(),rsp->body());
  313.         }
  314.     private:
  315.         //等待响应
  316.         BasicCommonResponsePtr waitBasicResponse(const std::string& rid)
  317.         {
  318.             std::unique_lock<std::mutex> lock(_mutex);
  319.             _cond.wait(lock,[&rid,this](){
  320.                 return _basicRsp.find(rid)!=_basicRsp.end();//等到收到了对应的响应才退出
  321.             });
  322.             //拿到响应,然后再删除
  323.             BasicCommonResponsePtr rsp=_basicRsp[rid];
  324.             _basicRsp.erase(rid);
  325.             return rsp;
  326.         }
  327.     private:
  328.         std::string _cid;  //信道id
  329.         muduo::net::TcpConnectionPtr _conn;  //信道关联的网络通信连接
  330.         ProtobufCodecPtr _codec;  //protobuf协议处理器
  331.         Consumer::ptr _consumer;  //信道对应的消费者,当确定了信道角色后构造(订阅操作后)
  332.         std::mutex _mutex;
  333.         std::condition_variable _cond;
  334.         std::unordered_map<std::string,BasicCommonResponsePtr> _basicRsp;  //<请求id,响应>
  335.     };
  336.     class ChannelManager
  337.     {
  338.     public:
  339.         using ptr=std::shared_ptr<ChannelManager>;
  340.         ChannelManager()
  341.         {
  342.             DLOG("new ChannelManager: %p",this);
  343.         }
  344.         ~ChannelManager()
  345.         {
  346.             DLOG("delete ChannelManager: %p",this);
  347.         }
  348.         //打开信道
  349.         Channel::ptr createChannel(const muduo::net::TcpConnectionPtr& conn
  350.             ,const ProtobufCodecPtr& codec)
  351.         {
  352.             std::unique_lock<std::mutex> lock(_mutex);
  353.             //队列是否已存在?不需要担心,因为创建信道会自动生成id
  354.             //构造信道对象
  355.             Channel::ptr chp=std::make_shared<Channel>(conn,codec);
  356.             //添加管理
  357.             _channels[chp->cid()]=chp;
  358.             return chp;
  359.         }
  360.         void deleteChannel(const std::string& cid)//关闭信道
  361.         {
  362.             std::unique_lock<std::mutex> lock(_mutex);
  363.             //队列是否不存在
  364.             auto it=_channels.find(cid);
  365.             if(it==_channels.end())
  366.             {
  367.                 return;
  368.             }
  369.             //删除信道
  370.             _channels.erase(cid);
  371.         }
  372.         Channel::ptr getChannel(const std::string& cid)
  373.         {
  374.             std::unique_lock<std::mutex> lock(_mutex);
  375.             //队列是否不存在
  376.             auto it=_channels.find(cid);
  377.             if(it==_channels.end())
  378.             {
  379.                 return Channel::ptr();
  380.             }
  381.             return it->second;
  382.         }
  383.     private:
  384.         std::mutex _mutex;
  385.         std::unordered_map<std::string,Channel::ptr> _channels;
  386.     };
  387. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
  • 159.
  • 160.
  • 161.
  • 162.
  • 163.
  • 164.
  • 165.
  • 166.
  • 167.
  • 168.
  • 169.
  • 170.
  • 171.
  • 172.
  • 173.
  • 174.
  • 175.
  • 176.
  • 177.
  • 178.
  • 179.
  • 180.
  • 181.
  • 182.
  • 183.
  • 184.
  • 185.
  • 186.
  • 187.
  • 188.
  • 189.
  • 190.
  • 191.
  • 192.
  • 193.
  • 194.
  • 195.
  • 196.
  • 197.
  • 198.
  • 199.
  • 200.
  • 201.
  • 202.
  • 203.
  • 204.
  • 205.
  • 206.
  • 207.
  • 208.
  • 209.
  • 210.
  • 211.
  • 212.
  • 213.
  • 214.
  • 215.
  • 216.
  • 217.
  • 218.
  • 219.
  • 220.
  • 221.
  • 222.
  • 223.
  • 224.
  • 225.
  • 226.
  • 227.
  • 228.
  • 229.
  • 230.
  • 231.
  • 232.
  • 233.
  • 234.
  • 235.
  • 236.
  • 237.
  • 238.
  • 239.
  • 240.
  • 241.
  • 242.
  • 243.
  • 244.
  • 245.
  • 246.
  • 247.
  • 248.
  • 249.
  • 250.
  • 251.
  • 252.
  • 253.
  • 254.
  • 255.
  • 256.
  • 257.
  • 258.
  • 259.
  • 260.
  • 261.
  • 262.
  • 263.
  • 264.
  • 265.
  • 266.
  • 267.
  • 268.
  • 269.
  • 270.
  • 271.
  • 272.
  • 273.
  • 274.
  • 275.
  • 276.
  • 277.
  • 278.
  • 279.
  • 280.
  • 281.
  • 282.
  • 283.
  • 284.
  • 285.
  • 286.
  • 287.
  • 288.
  • 289.
  • 290.
  • 291.
  • 292.
  • 293.
  • 294.
  • 295.
  • 296.
  • 297.
  • 298.
  • 299.
  • 300.
  • 301.
  • 302.
  • 303.
  • 304.
  • 305.
  • 306.
  • 307.
  • 308.
  • 309.
  • 310.
  • 311.
  • 312.
  • 313.
  • 314.
  • 315.
  • 316.
  • 317.
  • 318.
  • 319.
  • 320.
  • 321.
  • 322.
  • 323.
  • 324.
  • 325.
  • 326.
  • 327.
  • 328.
  • 329.
  • 330.
  • 331.
  • 332.
  • 333.
  • 334.
  • 335.
  • 336.
  • 337.
  • 338.
  • 339.
  • 340.
  • 341.
  • 342.
  • 343.
  • 344.
  • 345.
  • 346.
  • 347.
  • 348.
  • 349.
  • 350.
  • 351.
  • 352.
  • 353.
  • 354.
  • 355.
  • 356.
  • 357.
  • 358.
  • 359.
  • 360.
  • 361.
  • 362.
  • 363.
  • 364.
  • 365.
  • 366.
  • 367.
  • 368.
  • 369.
  • 370.
  • 371.
  • 372.
  • 373.
  • 374.
  • 375.
  • 376.
  • 377.
  • 378.
  • 379.
  • 380.
  • 381.
  • 382.
  • 383.
  • 384.
  • 385.
  • 386.
  • 387.
  • 388.
  • 389.
                       3、异步工作线程池模块

     客⼾端这边存在两个异步⼯作线程,
     

  • ⼀个是muduo库中客⼾端连接的异步循环线程EventLoopThread,
  • ⼀个是当收到消息后进⾏异步处理的⼯作线程池。
     这两项都不是以连接为单元进⾏创建的,⽽是创建后,可以⽤以多个连接中,因此单独进⾏封装。
                                   登录后复制                        
  1. #pragma once
  2. #include "muduo/net/EventLoopThreadPool.h"
  3. #include "../mqcomm/logger.hpp"
  4. #include "../mqcomm/threadpool.hpp"
  5. namespace lsxmq
  6. {
  7.     class AsyncWorker
  8.     {
  9.     public:
  10.         using ptr=std::shared_ptr<AsyncWorker>;
  11.         muduo::net::EventLoopThreadPool _loopthread;
  12.         ThreadPool _pool;
  13.     };
  14. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
                       4、连接管理模块

     在客⼾端这边,RabbitMQ弱化了客⼾端的概念,因为⽤⼾所需的服务都是通过信道来提供的,因此操作头脑转换为先创建连接,通过连接创建信道,通过信道提供服务这⼀流程。
     这个模块同样是针对muduo库客⼾端连接的⼆次封装,向⽤⼾提供创建channel信道的接⼝,创建信道后,可以通过信道来获取指定服务。
                                   登录后复制                        
  1. #pragma once
  2. #include "muduo/proto/dispatcher.h"
  3. #include "muduo/proto/codec.h"
  4. #include "muduo/base/Logging.h"
  5. #include "muduo/base/Mutex.h"
  6. #include "muduo/net/EventLoop.h"
  7. #include "muduo/net/TcpClient.h"
  8. #include "muduo/base/CountDownLatch.h"
  9. #include "muduo/net/EventLoopThread.h"
  10. #include "worker.hpp"
  11. #include "channel.hpp"
  12. #include <string>
  13. namespace lsxmq
  14. {
  15.     typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
  16.     class Connection
  17.     {
  18.     public:
  19.     using ptr=std::shared_ptr<Connection>;
  20.     Connection(const std::string &serverIp, uint16_t serverPort,const AsyncWorker::ptr& worker)
  21.             : _latch(1)
  22.             ,_worker(worker)
  23.             , _client(_worker->_loopthread.startLoop()
  24.                 , muduo::net::InetAddress(serverIp, serverPort), "ProtobufClient")
  25.             , _dispatcher(std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1
  26.                 , std::placeholders::_2, std::placeholders::_3))
  27.             , _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher
  28.                 , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)))
  29.             ,_channels(std::make_shared<ChannelManager>())
  30.         {
  31.             // 注册处理函数
  32.             _dispatcher.registerMessageCallback<BasicCommonResponse>(std::bind(&Connection::onBasicCommonResponse, this
  33.                 , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  34.             _dispatcher.registerMessageCallback<basicConsumeResponse>(std::bind(&Connection::onBasicConsumeResponse, this
  35.                 , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  36.             // 客户端注册回调函数
  37.             _client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));
  38.             _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get()
  39.                 , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  40.                
  41.             //连接服务器
  42.             _client.connect();
  43.             _latch.wait(); // 等待连接服务器成功
  44.         }
  45.         //打开信道
  46.         Channel::ptr openChannel()
  47.         {
  48.             //构造一个信道
  49.             auto channel=_channels->createChannel(_conn,_codec);
  50.             //打开信道
  51.             auto ret=channel->openChannel();
  52.             if(ret==false)
  53.             {
  54.                 ELOG("打开信道失败");
  55.                 _channels->deleteChannel(channel->cid());
  56.                 return Channel::ptr();
  57.             }
  58.             return channel;
  59.         }
  60.         //关闭信道
  61.         bool closeChannel(const Channel::ptr& channel)
  62.         {
  63.             auto ret=channel->closeChannel();
  64.             if(ret==false)
  65.             {
  66.                 ELOG("关闭信道失败");
  67.                 return false;
  68.             }
  69.             _channels->deleteChannel(channel->cid());
  70.             return true;
  71.         }
  72.     private:
  73.         //收到普通响应
  74.         void onBasicCommonResponse(const muduo::net::TcpConnectionPtr &conn, const BasicCommonResponsePtr &message, muduo::Timestamp)
  75.         {
  76.             //1.获取信道
  77.             auto channel=_channels->getChannel(message->cid());
  78.             if(channel.get()==nullptr)
  79.             {
  80.                 ELOG("处理普通响应失败,因为获取信道失败");
  81.                 return;
  82.             }
  83.             //2.将响应加入hash map
  84.             channel->putBasicResponse(message);
  85.         }
  86.         //收到推送消息
  87.         void onBasicConsumeResponse(const muduo::net::TcpConnectionPtr &conn,const BasicConsumeResponsePtr& message, muduo::Timestamp)
  88.         {
  89.             //1.获取信道
  90.             auto channel=_channels->getChannel(message->cid());
  91.             if(channel.get()==nullptr)
  92.             {
  93.                 ELOG("处理普通响应失败,因为获取信道失败");
  94.                 return;
  95.             }
  96.             //2.对收到的消息进行处理,由工作线程池完成
  97.             auto task=std::bind(&Channel::consume,channel.get(),message);
  98.             _worker->_pool.push(task);
  99.         }
  100.         void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn,const MessagePtr& message,muduo::Timestamp)
  101.         {
  102.             LOG_INFO<<message->GetTypeName();
  103.             //收到了未定义的消息,断开连接
  104.             conn->shutdown();
  105.         }
  106.         void onConnection(const muduo::net::TcpConnectionPtr &conn)
  107.         {
  108.             if (conn->connected())
  109.             {
  110.                 LOG_INFO << "连接成功";
  111.                 _conn = conn;
  112.                 // 解除阻塞
  113.                 _latch.countDown();
  114.             }
  115.             else
  116.             {
  117.                 LOG_INFO << "断开连接";
  118.             }
  119.         }
  120.     private:
  121.         muduo::CountDownLatch _latch;            // 用于同步等待连接服务器成功
  122.         AsyncWorker::ptr _worker;  //异步工作线程
  123.         muduo::net::TcpClient _client;           // 客户端
  124.         ProtobufDispatcher _dispatcher;          // 派发器
  125.         ProtobufCodecPtr _codec;                    // protobuf协议处理器
  126.         muduo::net::TcpConnectionPtr _conn;
  127.         ChannelManager::ptr _channels;  //信道管理句柄
  128.     };
  129. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
                       综合测试

     广播交换模式下:
                                   登录后复制                        
  1. //订阅者
  2. #include "connection.hpp"
  3. void callBack(const lsxmq::Channel::ptr& channel,const std::string& tag,lsxmq::BasicProperties* bp,const std::string& body)
  4. {
  5.     std::cout<<"消费者"<<tag<<"收到一条消息:"<<body<<std::endl;
  6.     //发送响应
  7.     channel->basicAck(bp->id());
  8. }
  9. int main(int argc,char* argv[])
  10. {
  11.     if(argc!=2)
  12.     {
  13.         std::cout<<"格式错误"<<std::endl;
  14.         return 0;
  15.     }
  16.     std::string qname=argv[1];
  17.     //1.构造异步工作者
  18.     lsxmq::AsyncWorker::ptr worker=std::make_shared<lsxmq::AsyncWorker>();
  19.     //2.构造连接
  20.     lsxmq::Connection::ptr conn=std::make_shared<lsxmq::Connection>("127.0.0.1",8888,worker);
  21.     //3.打开信道
  22.     lsxmq::Channel::ptr channel=conn->openChannel();
  23.     //4.通过信道进行操作
  24.         //4.1.声明一台交换机
  25.     google::protobuf::Map<std::string, std::string> args;
  26.     channel->declareExchange("exchange1",lsxmq::ExchangeType::FANOUT,true,false,args);
  27.         //4.2.声明队列queue1
  28.     channel->declareMessageQueue("queue1",false,true,false,args);
  29.         //4.3.声明队列queue2
  30.     channel->declareMessageQueue("queue2",false,true,false,args);
  31.         //4.4.将队列与交换机绑定
  32.     channel->bind("exchange1","queue1","queue1");
  33.     channel->bind("exchange1","queue2","news.music.#");
  34.         //4.5订阅一个队列
  35.     auto cb=std::bind(callBack,channel,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);
  36.     channel->basicConsume("consumer1",qname,false,cb);
  37.     //5.等待
  38.     while(1)
  39.     {
  40.         std::this_thread::sleep_for(std::chrono::seconds(3));
  41.     }
  42.     //5.关闭信道
  43.         conn->closeChannel(channel);
  44.     return 0;
  45. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
                                                     登录后复制                        
  1. //发布者
  2. #include "connection.hpp"
  3. int main()
  4. {
  5.     //1.构造异步工作者
  6.     lsxmq::AsyncWorker::ptr worker=std::make_shared<lsxmq::AsyncWorker>();
  7.     //2.构造连接
  8.     lsxmq::Connection::ptr conn=std::make_shared<lsxmq::Connection>("127.0.0.1",8888,worker);
  9.     //3.打开信道
  10.     lsxmq::Channel::ptr channel=conn->openChannel();
  11.     //4.通过信道进行操作
  12.         //4.1.声明一台交换机
  13.     google::protobuf::Map<std::string, std::string> args;
  14.     channel->declareExchange("exchange1",lsxmq::ExchangeType::FANOUT,true,false,args);
  15.         //4.2.声明队列queue1
  16.     channel->declareMessageQueue("queue1",false,true,false,args);
  17.         //4.3.声明队列queue2
  18.     channel->declareMessageQueue("queue2",false,true,false,args);
  19.         //4.4.将队列与交换机绑定
  20.     channel->bind("exchange1","queue1","queue1");
  21.     channel->bind("exchange1","queue2","news.music.#");
  22.         //4.5循环向交换机发布消息
  23.     for(int i=11;i<=20;++i)
  24.     {
  25.         //交换机类型是广播交换,所以不需要BasicProperties
  26.         channel->basicPublish("exchange1","hello-world-"+std::to_string(i),nullptr);
  27.     }
  28.     //5.关闭信道
  29.         conn->closeChannel(channel);
  30.     return 0;
  31. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
                       直接交换模式:
                                   登录后复制                        
  1. //订阅者
  2. #include "connection.hpp"
  3. void callBack(const lsxmq::Channel::ptr& channel,const std::string& tag,lsxmq::BasicProperties* bp,const std::string& body)
  4. {
  5.     std::cout<<"消费者"<<tag<<"收到一条消息:"<<body<<std::endl;
  6.     //发送响应
  7.     channel->basicAck(bp->id());
  8. }
  9. int main(int argc,char* argv[])
  10. {
  11.     if(argc!=2)
  12.     {
  13.         std::cout<<"格式错误"<<std::endl;
  14.         return 0;
  15.     }
  16.     std::string qname=argv[1];
  17.     //1.构造异步工作者
  18.     lsxmq::AsyncWorker::ptr worker=std::make_shared<lsxmq::AsyncWorker>();
  19.     //2.构造连接
  20.     lsxmq::Connection::ptr conn=std::make_shared<lsxmq::Connection>("127.0.0.1",8888,worker);
  21.     //3.打开信道
  22.     lsxmq::Channel::ptr channel=conn->openChannel();
  23.     //4.通过信道进行操作
  24.         //4.1.声明一台交换机
  25.     google::protobuf::Map<std::string, std::string> args;
  26.     channel->declareExchange("exchange1",lsxmq::ExchangeType::DIRECT,true,false,args);
  27.         //4.2.声明队列queue1
  28.     channel->declareMessageQueue("queue1",false,true,false,args);
  29.         //4.3.声明队列queue2
  30.     channel->declareMessageQueue("queue2",false,true,false,args);
  31.         //4.4.将队列与交换机绑定
  32.     channel->bind("exchange1","queue1","queue1");
  33.     channel->bind("exchange1","queue2","queue2");
  34.         //4.5订阅一个队列
  35.     auto cb=std::bind(callBack,channel,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);
  36.     channel->basicConsume("consumer1",qname,false,cb);
  37.     //5.等待
  38.     while(1)
  39.     {
  40.         std::this_thread::sleep_for(std::chrono::seconds(3));
  41.     }
  42.     //5.关闭信道
  43.         conn->closeChannel(channel);
  44.     return 0;
  45. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
                                                     登录后复制                        
  1. //发布者
  2. #include "connection.hpp"
  3. int main()
  4. {
  5.     //1.构造异步工作者
  6.     lsxmq::AsyncWorker::ptr worker=std::make_shared<lsxmq::AsyncWorker>();
  7.     //2.构造连接
  8.     lsxmq::Connection::ptr conn=std::make_shared<lsxmq::Connection>("127.0.0.1",8888,worker);
  9.     //3.打开信道
  10.     lsxmq::Channel::ptr channel=conn->openChannel();
  11.     //4.通过信道进行操作
  12.         //4.1.声明一台交换机
  13.     google::protobuf::Map<std::string, std::string> args;
  14.     channel->declareExchange("exchange1",lsxmq::ExchangeType::DIRECT,true,false,args);
  15.         //4.2.声明队列queue1
  16.     channel->declareMessageQueue("queue1",false,true,false,args);
  17.         //4.3.声明队列queue2
  18.     channel->declareMessageQueue("queue2",false,true,false,args);
  19.         //4.4.将队列与交换机绑定
  20.     channel->bind("exchange1","queue1","queue1");
  21.     channel->bind("exchange1","queue2","queue2");
  22.         //4.5循环向交换机发布消息
  23.     for(int i=1;i<=10;++i)
  24.     {
  25.         lsxmq::BasicProperties bp;
  26.         bp.set_id(lsxmq::UUIDHelper::uuid());
  27.         bp.set_deliverymode(lsxmq::DeliveryMode::DURABLE);
  28.         bp.set_routing_key("queue1");
  29.         channel->basicPublish("exchange1","hello-world-"+std::to_string(i),&bp);
  30.     }
  31.     for(int i=11;i<=15;++i)
  32.     {
  33.         lsxmq::BasicProperties bp;
  34.         bp.set_id(lsxmq::UUIDHelper::uuid());
  35.         bp.set_deliverymode(lsxmq::DeliveryMode::UNDURABLE);
  36.         bp.set_routing_key("queue2");
  37.         channel->basicPublish("exchange1","hello-world-"+std::to_string(i),&bp);
  38.     }
  39.     //5.关闭信道
  40.         conn->closeChannel(channel);
  41.     return 0;
  42. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
                       主题交换模式:
                                   登录后复制                        
  1. //订阅者
  2. #include "connection.hpp"
  3. void callBack(const lsxmq::Channel::ptr& channel,const std::string& tag,lsxmq::BasicProperties* bp,const std::string& body)
  4. {
  5.     std::cout<<"消费者"<<tag<<"收到一条消息:"<<body<<std::endl;
  6.     //发送响应
  7.     channel->basicAck(bp->id());
  8. }
  9. int main(int argc,char* argv[])
  10. {
  11.     if(argc!=2)
  12.     {
  13.         std::cout<<"格式错误"<<std::endl;
  14.         return 0;
  15.     }
  16.     std::string qname=argv[1];
  17.     //1.构造异步工作者
  18.     lsxmq::AsyncWorker::ptr worker=std::make_shared<lsxmq::AsyncWorker>();
  19.     //2.构造连接
  20.     lsxmq::Connection::ptr conn=std::make_shared<lsxmq::Connection>("127.0.0.1",8888,worker);
  21.     //3.打开信道
  22.     lsxmq::Channel::ptr channel=conn->openChannel();
  23.     //4.通过信道进行操作
  24.         //4.1.声明一台交换机
  25.     google::protobuf::Map<std::string, std::string> args;
  26.     channel->declareExchange("exchange1",lsxmq::ExchangeType::TOPIC,true,false,args);
  27.         //4.2.声明队列queue1
  28.     channel->declareMessageQueue("queue1",false,true,false,args);
  29.         //4.3.声明队列queue2
  30.     channel->declareMessageQueue("queue2",false,true,false,args);
  31.         //4.4.将队列与交换机绑定
  32.     channel->bind("exchange1","queue1","sport.#");
  33.     channel->bind("exchange1","queue2","music.#");
  34.         //4.5订阅一个队列
  35.     auto cb=std::bind(callBack,channel,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);
  36.     channel->basicConsume("consumer1",qname,false,cb);
  37.     //5.等待
  38.     while(1)
  39.     {
  40.         std::this_thread::sleep_for(std::chrono::seconds(3));
  41.     }
  42.     //5.关闭信道
  43.         conn->closeChannel(channel);
  44.     return 0;
  45. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
                                                     登录后复制                        
  1. //发布者
  2. #include "connection.hpp"
  3. int main()
  4. {
  5.     //1.构造异步工作者
  6.     lsxmq::AsyncWorker::ptr worker=std::make_shared<lsxmq::AsyncWorker>();
  7.     //2.构造连接
  8.     lsxmq::Connection::ptr conn=std::make_shared<lsxmq::Connection>("127.0.0.1",8888,worker);
  9.     //3.打开信道
  10.     lsxmq::Channel::ptr channel=conn->openChannel();
  11.     //4.通过信道进行操作
  12.         //4.1.声明一台交换机
  13.     google::protobuf::Map<std::string, std::string> args;
  14.     channel->declareExchange("exchange1",lsxmq::ExchangeType::TOPIC,true,false,args);
  15.         //4.2.声明队列queue1
  16.     channel->declareMessageQueue("queue1",false,true,false,args);
  17.         //4.3.声明队列queue2
  18.     channel->declareMessageQueue("queue2",false,true,false,args);
  19.         //4.4.将队列与交换机绑定
  20.     channel->bind("exchange1","queue1","sport.#");
  21.     channel->bind("exchange1","queue2","music.#");
  22.         //4.5循环向交换机发布消息
  23.     for(int i=1;i<=10;++i)
  24.     {
  25.         lsxmq::BasicProperties bp;
  26.         bp.set_id(lsxmq::UUIDHelper::uuid());
  27.         bp.set_deliverymode(lsxmq::DeliveryMode::DURABLE);
  28.         bp.set_routing_key("music.pop");
  29.         channel->basicPublish("exchange1","hello-world-"+std::to_string(i),&bp);
  30.     }
  31.     for(int i=1;i<=5;++i)
  32.     {
  33.         lsxmq::BasicProperties bp;
  34.         bp.set_id(lsxmq::UUIDHelper::uuid());
  35.         bp.set_deliverymode(lsxmq::DeliveryMode::DURABLE);
  36.         bp.set_routing_key("music.pop.ajc");
  37.         channel->basicPublish("exchange1","good-morning-"+std::to_string(i),&bp);
  38.     }
  39.     for(int i=1;i<=5;++i)
  40.     {
  41.         lsxmq::BasicProperties bp;
  42.         bp.set_id(lsxmq::UUIDHelper::uuid());
  43.         bp.set_deliverymode(lsxmq::DeliveryMode::DURABLE);
  44.         bp.set_routing_key("news.gassip");
  45.         channel->basicPublish("exchange1","have-nice-day-"+std::to_string(i),&bp);
  46.     }
  47.     //5.关闭信道
  48.         conn->closeChannel(channel);
  49.     return 0;
  50. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

老婆出轨

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表