ToB企服应用市场:ToB评测及商务社交产业平台

标题: 模拟RabbitMQ实现消息队列 [打印本页]

作者: 老婆出轨    时间: 7 天前
标题: 模拟RabbitMQ实现消息队列
代码地址: https://gitee.com/lv-shixiong/linux_test/tree/master/mq 
     项目介绍

     壅闭队列实现生产者消费者模型:
     可以或许解耦合,将数据的生产和消费分开
     支持并发
     支持忙闲不均
     消息队列就是把壅闭队列封装成独立的服务器程序,让它运行到某一台主机上,发消息到该主机,从该主机拉取消息(跨主机的生产者消费者模型)
     实现消息队列服务器,发布客户端,订阅客户端
     

     开发环境

     ubuntu-22.04、VSCode、g++、Makefile
     技能选型

     主开发语言:C++
     序列化框架:Protobuf二进制序列化
     网络通讯:自界说应用层协议+muduo库:对tcp长连接的封装,并且使用epoll的事件驱动模式,实现高并发服务器与客户端
     源数据信息数据库:SQLite3
     单元测试框架:Gtest
     环境搭建

     1、安装wget
     2、安装protobuf
     在 /usr/local/google里找protobuf
     3、安装muduo
     安装好的程序在../build
     Protobuf

     是序列化反序列化的框架
     特点:语言无关、平台无关;高效;扩展性、兼容性好。
     使用方法:
     
     语法规范:
     
     语法:
     指定proto3语法,syntax = "proto3"。
     声明命名空间,package 声明符,示.proto文件的命名空间。
     界说结构对象描述message 消息体名称{},名称用驼峰
     界说消息字段 字段范例 字段名 = 字段唯一编号
     

     

     编译:protoc --cpp_out=路径 proto文件名称
     protoc --cpp_out=. contacts.proto
                                   登录后复制                        
  1. syntax= "proto3";//声明语法版本
  2. package contacts;//声明代码的命名空间
  3. message contact{
  4.     //各个字段  字段类型 字段名 = 字段唯一编号
  5.     uint64 sn = 1;
  6.     string name = 2;
  7.     float score = 3;
  8. };
复制代码
      
                                                     登录后复制                        
  1. main:main.cpp contacts.pb.cc
  2.         g++ -o $@ $^ -std=c++11 -lprotobuf
复制代码
      
                                                     登录后复制                        
  1. #include "contacts.pb.h"
  2. #include <iostream>
  3. int main()
  4. {
  5.     //创建通讯录对象
  6.     contacts::contact con;
  7.     //添加字段
  8.     con.set_name("Nesta");
  9.     con.set_sn(13);
  10.     con.set_score(99);
  11.     //序列化
  12.     std::string str=con.SerializeAsString();
  13.     //反序列化
  14.     contacts::contact conR;
  15.     conR.ParseFromString(str);
  16.     //打印信息
  17.     std::cout<<"name="<<conR.name()<<std::endl;
  18.     std::cout<<"sn="<<conR.sn()<<std::endl;
  19.     std::cout<<"socre="<<conR.score()<<std::endl;
  20.     return 0;
  21. }
复制代码
      
                       Muduo

     muduo库是一个基于reactor模型的高性能服务器框架。
     什么是reactor?基于时间触发的模型(基于epoll进行IO事件监控)
     主从reactor模型:将IO事件监控进行进一步的层次分别。
     主reactor:只对新建连接事件进行监控(保证不受IO壅闭影响实现高效的新建连接获取)
     从reactor:针对新建连接进行IO事件监控(进行IO操作和业务处理)
     主从reactor必然是一个多执行流的并发模式---one  thread one loop,一个事件监控,占据一个线程,进行事件监控。
     

     muduo::net::TcpServer类
     muduo::net::EventLoop类
     muduo::net::TcpConnection类
     muduo::net::Buffer类
     Server:
                                   登录后复制                        
  1. #include "./include/muduo/net/TcpServer.h"
  2. #include "./include/muduo/net/EventLoop.h"
  3. #include "./include/muduo/net/TcpConnection.h"
  4. #include <iostream>
  5. #include <string>
  6. #include <functional>
  7. #include <unordered_map>
  8. class TranslateServer
  9. {
  10. public:
  11.     TranslateServer(uint16_t port)
  12.         :_server(&_loop
  13.         ,muduo::net::InetAddress("0.0.0.0",8888)
  14.         ,"translateServer"
  15.         ,muduo::net::TcpServer::kNoReusePort)
  16.         {
  17.             //注册回调函数
  18.             auto ccb=std::bind(&TranslateServer::OnConnection,this,std::placeholders::_1);
  19.             auto mcb=std::bind(&TranslateServer::OnMessage,this,std::placeholders::_1
  20.             ,std::placeholders::_2,std::placeholders::_3);
  21.             _server.setConnectionCallback(ccb);
  22.             _server.setMessageCallback(mcb);
  23.             _server.setConnectionCallback(ccb);
  24.             _server.setMessageCallback(mcb);
  25.         }
  26.     void Start()//启动服务器
  27.     {
  28.         
  29.         _server.start();
  30.         _loop.loop();//这是一个死循环
  31.     }
  32. private:
  33.     void OnConnection(const muduo::net::TcpConnectionPtr& conn)
  34.     {
  35.         //对连接的回调
  36.         //建立连接和断开连接时都会触发
  37.         if(conn->connected())//确定是否连接成功
  38.         {
  39.             std::cout<<"连接成功"<<std::endl;
  40.         }
  41.         else
  42.         {
  43.             std::cout<<"连接失败"<<std::endl;
  44.         }
  45.     }
  46.     std::string Translate(std::string key)
  47.     {
  48.         std::unordered_map<std::string,std::string> dict={
  49.             {"你好","hello"}
  50.             ,{"hello","你好"}
  51.             ,{"world","世界"}
  52.             ,{"世界","world"}
  53.         };//字典
  54.         auto it=dict.find(key);
  55.         if(it!=dict.end())//找到了
  56.         {
  57.             return it->second;
  58.         }
  59.         else//没找到
  60.         {
  61.             return "不认识这个词语";
  62.         }
  63.     }
  64.     void OnMessage(const muduo::net::TcpConnectionPtr& ptr,muduo::net::Buffer* buffer,muduo::Timestamp)
  65.     {
  66.         //对消息的回调
  67.         //1.从缓冲区里拿到数据
  68.         std::string key=buffer->retrieveAllAsString();
  69.         //2.翻译
  70.         std::string val=Translate(key);
  71.         //3.将翻译结果发回客户端
  72.         muduo::net::Buffer message;
  73.         message.append(val);
  74.         ptr->send(&message);
  75.     }
  76.     //loop是对epool的事件监听,触发事件后进行IO操作
  77.     muduo::net::EventLoop _loop;
  78.     //_server主要用于设置回调函数,告诉服务器收到什么消息进行怎样的处理
  79.     muduo::net::TcpServer _server;
  80. };
  81. int main()
  82. {
  83.     TranslateServer svr(8888);
  84.     svr.Start();
  85.     return 0;
  86. }
复制代码
      
                       Client
                                   登录后复制                        
  1. #include "include/muduo/net/TcpClient.h"
  2. #include "include/muduo/net/Buffer.h"
  3. #include "include/muduo/net/EventLoopThread.h"
  4. #include "include/muduo/net/InetAddress.h"
  5. #include "include/muduo/base/CountDownLatch.h"
  6. #include <string>
  7. #include <iostream>
  8. #include <functional>
  9. class TranslateClient
  10. {
  11. public:
  12.     TranslateClient(std::string serverIp,uint16_t serverPort)
  13.         :_client(_loopThread.startLoop()
  14.         ,muduo::net::InetAddress(serverIp,serverPort)
  15.         ,"translateClient")
  16.         ,_latch(1)//一定要设为1
  17.     {
  18.         //注册回调函数
  19.         auto ccb=std::bind(&TranslateClient::OnConnection,this,std::placeholders::_1);
  20.         auto mcb=std::bind(&TranslateClient::OnMessage,this
  21.         ,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);
  22.         
  23.         _client.setConnectionCallback(ccb);
  24.         _client.setMessageCallback(mcb);
  25.         std::cout<<"初始化完成"<<std::endl;
  26.     }
  27.     void Connect()//连接服务器
  28.     {
  29.         _client.connect();//进行连接
  30.         //之所以要阻塞就是为了方式没有连接成功就进行了其他操作,导致错误
  31.         std::cout<<"开始连接"<<std::endl;
  32.         _latch.wait();//阻塞等待连接成功
  33.     }
  34.     void Send(std::string& message)//发送消息
  35.     {
  36.         if(_conn->connected())//确认连接成功再进行IO
  37.         {
  38.             _conn->send(message);
  39.         }
  40.         
  41.     }
  42. private:
  43.     void OnConnection(const muduo::net::TcpConnectionPtr& conn)
  44.     {
  45.         if(conn->connected())//连接上了
  46.         {
  47.             _latch.countDown();//解除阻塞
  48.             _conn=conn;//保存连接
  49.         }
  50.         else
  51.         {
  52.             //......
  53.         }
  54.     }
  55.     void OnMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buffer,muduo::Timestamp ts)
  56.     {
  57.         //来消息了就打印出来
  58.         std::string message=buffer->retrieveAllAsString();
  59.         std::cout<<"Server Say> "<<message<<std::endl;
  60.     }
  61. private:
  62.     muduo::CountDownLatch _latch;//用于同步
  63.     muduo::net::EventLoopThread _loopThread;//创建线程进行IO
  64.     muduo::net::TcpClient _client;
  65.     muduo::net::TcpConnectionPtr _conn;//客户端与服务器的连接
  66. };
  67. int main()
  68. {
  69.     TranslateClient cli("127.0.0.1",8888);
  70.     cli.Connect();//建立连接
  71.     while(1)
  72.     {
  73.         std::string message;
  74.         std::cin>>message;
  75.         cli.Send(message);
  76.     }
  77.     return 0;
  78. }
复制代码
      
                       上面的服务器与客户端没有协议,直接通讯。
     下面是基于protobuf来进行通讯。
     

     muduo-master/examples/protobuf/codec/codec.cc和codec.h以及dispatcher.h
     examples是一些例子,要学会参考别人的代码。
     

     SQLite3

     是什么?进程内的轻量级数据库,是本地的,不必要服务器,它给我们提供了一系列接口。
     官方文档: List Of SQLite Functions
                                   登录后复制                        
  1. sqlite3操作流程:0. 查看当前数据库在编译阶段是否启动了线程安全
  2.                 int sqlite3_threadsafe(); 0-未启⽤; 1-启⽤
  3.                 需要注意的是sqlite3是有三种安全等级的:
  4.                 1. ⾮线程安全模式--不使用任何锁,效率最高
  5.     2. 线程安全模式(不同的连接在不同的线程/进程间是安全的,即⼀个句柄不能⽤于多线程
  6.                 间)
  7.                 3. 串⾏化模式(可以在不同的线程/进程间使⽤同⼀个句柄)
  8. 1. 创建/打开数据库⽂件,并返回操作句柄
  9.                 int sqlite3_open(const char *filename, sqlite3 **ppDb) 成功返回SQLITE_OK
  10.                 //若在编译阶段启动了线程安全,则在程序运⾏阶段可以通过参数选择线程安全等级
  11.                 int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const
  12.                 char *zVfs );
  13.                 flag:
  14.                 SQLITE_OPEN_READWRITE -- 以可读可写⽅式打开数据库⽂件
  15.                 SQLITE_OPEN_CREATE -- 不存在数据库⽂件则创建
  16.                 SQLITE_OPEN_NOMUTEX--多线程模式,只要不同的线程使⽤不同的连接即可保证线程
  17.                 安全
  18.                 SQLITE_OPEN_FULLMUTEX--串⾏化模式
  19.                 返回:SQLITE_OK表⽰成功,否则有问题
  20. 2. 执⾏语句
  21.                 int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**),
  22.                 void* arg, char **err)
  23.         err是错误信息
  24.         
  25.                 int (*callback)(void*,int,char**,char**)
  26.                 void* : 是设置的在回调时传⼊的arg参数
  27.                 int:⼀⾏中数据的列数
  28.                 char**:存储⼀⾏数据的字符指针数组
  29.                 char**:每⼀列的字段名称
  30.                 这个回调函数有个int返回值,成功处理的情况下必须返回0,返回⾮0会触发ABORT退出程序
  31.                 返回:SQLITE_OK表⽰成功
  32. 3. 销毁句柄
  33.                 int sqlite3_close(sqlite3* db); 成功返回SQLITE_OK
  34.                 int sqlite3_close_v2(sqlite3*); 推荐使⽤--⽆论如何都会返回SQLITE_OK
  35. 获取错误信息
  36.                 const char *sqlite3_errmsg(sqlite3* db);
复制代码
      
                       sqlite3 test.db//打开数据库文件
     .table//查看表
                                   登录后复制                        
  1. //封装实现一个SqliteHelper类,提供简单的sqlite数据库操作接口,完成对数据增删改查操作
  2. //1.创建/打开数据库文件
  3. //2.针对打开的数据库进行操作
  4.     //2.1表的操作
  5.     //2.2数据的操作
  6. //3.关闭数据库
  7. #include <sqlite3.h>
  8. #include <string>
  9. #include <iostream>
  10. class SqliteHelper
  11. {
  12. public:
  13.     typedef int(*SqliteCallback)(void*,int,char**,char**);
  14.     SqliteHelper(std::string dbfile)
  15.         :_dbfile(dbfile)
  16.     {}
  17.     bool open(int safeLevel=SQLITE_OPEN_FULLMUTEX)//打开数据库文件
  18.     {
  19.         //默认安全级别为串行化模式
  20.         //int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs )
  21.         int ret=sqlite3_open_v2(_dbfile.c_str(),&_handler,SQLITE_OPEN_CREATE|SQLITE_OPEN_READWRITE|safeLevel,nullptr);
  22.         if(ret!=SQLITE_OK)
  23.         {
  24.             std::cout<<"创建/打开数据库文件失败:"<<sqlite3_errmsg(_handler)<<std::endl;
  25.             return false;
  26.         }
  27.         return true;
  28.     }
  29.     bool exec(const std::string& sql,SqliteCallback cb,void* arg)//进行操作
  30.     {
  31.         //int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**),void* arg, char **err)
  32.         int ret=sqlite3_exec(_handler,sql.c_str(),cb,arg,nullptr);
  33.         if(ret!=SQLITE_OK)
  34.         {
  35.             std::cout<<sql<<"执行失败:"<<sqlite3_errmsg(_handler)<<std::endl;
  36.             return false;
  37.         }
  38.         return true;
  39.     }
  40.     void close()//关闭数据库文件
  41.     {
  42.         //int sqlite3_close_v2(sqlite3*)
  43.         sqlite3_close_v2(_handler);
  44.     }
  45. private:
  46.     sqlite3* _handler;//句柄
  47.     std::string _dbfile;//数据库文件名
  48. };
复制代码
      
                                                     登录后复制                        
  1. #include "sqlite.hpp"
  2. #include <vector>
  3. #include <cassert>
  4. int select_stu_callback(void* arg,int col_count,char** result,char** fields_name) {
  5.     std::vector<std::string> *arry = (std::vector<std::string>*)arg;
  6.     arry->push_back(result[0]);
  7.     return 0; //必须有!!!
  8. }
  9. int main()
  10. {
  11.     SqliteHelper helper("./test.db");
  12.     //1. 创建/打开库文件
  13.     assert(helper.open());
  14.     //2. 创建表(不存在则创建), 学生信息: 学号,姓名,年龄
  15.     const char *ct = "create table if not exists student(sn int primary key, name varchar(32), age int);";
  16.     assert(helper.exec(ct, nullptr, nullptr));
  17.     //3. 新增数据 , 修改, 删除, 查询
  18.     // const char *insert_sql = "insert into student values(1, '小明', 18), (2, '小黑', 19), (3, '小红', 18);";
  19.     // assert(helper.exec(insert_sql, nullptr, nullptr));
  20.     // const char *update_sql = "update student set name='张小明' where sn=1";
  21.     // assert(helper.exec(update_sql, nullptr, nullptr));
  22.     // const char *delete_sql = "delete from student where sn=3";
  23.     // assert(helper.exec(delete_sql, nullptr, nullptr));
  24.     const char *select_sql = "select name from student;";
  25.     std::vector<std::string> arry;
  26.     assert(helper.exec(select_sql, select_stu_callback, &arry));
  27.     for (auto &name : arry) {
  28.         std::cout << name << std::endl;
  29.     }
  30.     //4. 关闭数据库
  31.     helper.close();
  32.     return 0;
  33. }
复制代码
      
                       GTest

     使用:
     
     GTest中的断⾔的宏可以分为两类:
     
     常用断言:
                                   登录后复制                        
  1. // bool值检查
  2. ASSERT_TRUE(参数),期待结果是true
  3. ASSERT_FALSE(参数),期待结果是false
  4. //数值型数据检查
  5. ASSERT_EQ(参数1,参数2),传⼊的是需要⽐较的两个数 equal
  6. ASSERT_NE(参数1,参数2),not equal,不等于才返回true
  7. ASSERT_LT(参数1,参数2),less than,⼩于才返回true
  8. ASSERT_GT(参数1,参数2),greater than,⼤于才返回true
  9. ASSERT_LE(参数1,参数2),less equal,⼩于等于才返回true
  10. ASSERT_GE(参数1,参数2),greater equal,⼤于等于才返回true
复制代码
      
                       TEST(参数1,参数2){//......}
     参数1是测试名称,第二个参数是详细名称
     RUN_ALL_TESTS();//运行全部测试
                                   登录后复制                        
  1. #include<iostream>
  2. #include<gtest/gtest.h>
  3. int abs(int x)
  4. {
  5.         return x > 0 ? x : -x;
  6. }
  7. TEST(abs_test, test1)
  8. {
  9.         ASSERT_TRUE(abs(1) == 1) << "abs(1)=1";
  10.         ASSERT_TRUE(abs(-1) == 1);
  11.         ASSERT_FALSE(abs(-2) == -2);
  12.         ASSERT_EQ(abs(1),abs(-1));
  13.         ASSERT_NE(abs(-1),0);
  14.         ASSERT_LT(abs(-1),2);
  15.         ASSERT_GT(abs(-1),0);
  16.         ASSERT_LE(abs(-1),2);
  17.         ASSERT_GE(abs(-1),0);
  18. }
  19. int main(int argc,char *argv[])
  20. {
  21.         //将命令⾏参数传递给gtest
  22.         testing::InitGoogleTest(&argc, argv);
  23.         // 运⾏所有测试案例
  24.         return RUN_ALL_TESTS();
  25. }
复制代码
      
                       

     全局事件:针对整个测试程序。实现全局的事件机制,必要创建⼀个⾃⼰的类,然后继承 testing::Environment类,然后分别实现成员函数 SetUp 和 TearDown ,同时在main函数内进⾏调⽤ testing::AddGlobalTestEnvironment(new MyEnvironment); 函数添加全局的事件机制
     全局测试套件,其实就是用户本身界说一个全局的测试环境类
                                   登录后复制                        
  1. #include <gtest/gtest.h>
  2. #include <iostream>
  3. #include <unordered_map>
  4. //全局事件:针对整个测试程序,提供全局事件机制,能够在测试之前配置测试环境数据,测试完毕后清理数据
  5. //先定义环境类,通过继承testing::Environment的派⽣类来完成
  6. //重写的虚函数接⼝SetUp会在测试之前被调⽤; TearDown会在测试完毕后调⽤.
  7. std::unordered_map<std::string,std::string> dict;
  8. class MyMapTest:public testing::Environment
  9. {
  10. public:
  11.     virtual void SetUp() override
  12.     {
  13.         std::cout<<"所有单元开始测试"<<std::endl;
  14.         dict.insert(std::make_pair("hello","你好"));
  15.         dict.insert(std::make_pair("世界","world"));
  16.     }
  17.     virtual void TearDown() override
  18.     {
  19.         std::cout<<"所有单元测试结束"<<std::endl;
  20.         dict.clear();
  21.     }
  22. };
  23. TEST(MyMapTest,test1)
  24. {
  25.     ASSERT_EQ(dict.size(),2);
  26.     dict.erase("hello");
  27. }
  28. TEST(MyMapTest,test2)
  29. {
  30.     ASSERT_EQ(dict.size(),2);
  31. }
  32. int main(int argc,char* argv[])
  33. {
  34.     testing::InitGoogleTest(&argc,argv);
  35.     testing::AddGlobalTestEnvironment(new MyMapTest());
  36.     RUN_ALL_TESTS();
  37.     return 0;
  38. }
复制代码
      
                       TestSuite事件:针对⼀个个测试套件。测试套件的事件机制我们同样必要去创建⼀个类,继承⾃testing::Test ,实现两个静态函数 SetUpTestCase 和 TearDownTestCase ,测试套件的事件机制不必要像全局事件机制⼀样在 main 注册,⽽是必要将我们平常使⽤的 TEST 宏改为 TEST_F 宏。
     
                                   登录后复制                        
  1. #include <iostream>
  2. #include <gtest/gtest.h>
  3. #include <unordered_map>
  4. class MyTest:public testing::Test
  5. {
  6. public:
  7.     static void SetUpTestCase()//所有单元测试开始时初始化
  8.     {
  9.         //假设有一个全局的变量,公共的测试数据就可以在这里初始化
  10.         std::cout<<"所有单元测试开始"<<std::endl;
  11.     }
  12.     virtual void SetUp() override//每次单元测试时初始化
  13.     {
  14.         //假设有一个全局的变量,在这里插入每个单元测试所需的独立的测试数据
  15.         std::cout<<"单元测试开始"<<std::endl;
  16.         mymap.insert(std::make_pair("hello","world"));  
  17.     }
  18.     virtual void TearDown() override//每次单元测试结束时清理
  19.     {
  20.         std::cout<<"单元测试结束"<<std::endl;
  21.         mymap.clear();
  22.     }
  23.     static void TearDownTestCase()//所有单元测试结束时清理
  24.     {
  25.         std::cout<<"所有单元测试结束"<<std::endl;
  26.     }
  27. public:
  28.     std::unordered_map<std::string,std::string> mymap;
  29. };
  30. //必须和类同名
  31. TEST_F(MyTest,test1)
  32. {
  33.     mymap.insert(std::make_pair("aaa","aaa"));
  34.     ASSERT_EQ(mymap.size(),2);
  35. }
  36. TEST_F(MyTest,test2)
  37. {
  38.     ASSERT_EQ(mymap.size(),1);
  39. }
  40. int main(int argc,char* argv[])
  41. {
  42.     testing::InitGoogleTest(&argc,argv);
  43.     RUN_ALL_TESTS();
  44.     return 0;
  45. }
复制代码
      
                       C++11异步操作实现线程池

     怎样获取线程池里的线程完成使命后的结果?
     std::future:获取使命结果
     std::future是C++11标准库中的⼀个模板类,它表⽰q⼀个异步操作的结果。当我们在多线程编程中使⽤异步使命时,std::future可以帮助我们在必要的时候获取使命的执⾏结果。std::future的⼀个重要特性是可以或许壅闭当前线程,直到异步操作完成,从⽽确保我们在获取结果时不会遇到未完成的操作。
     asyn
                                   登录后复制                        
  1. #include <iostream>
  2. #include <future>
  3. #include <thread>
  4. #include <chrono>
  5. int add(int num1,int num2)
  6. {
  7.     std::cout<<"aaaaaaaaaaaaaaaaaaaaa"<<std::endl;
  8.     std::cout<<"加法任务"<<std::endl;
  9.      std::this_thread::sleep_for(std::chrono::seconds(5));
  10.     std::cout<<"bbbbbbbbbbbbbbbbbbbbb"<<std::endl;
  11.     return num1+num2;
  12. }
  13. int main()
  14. {
  15.     //std::async(func, ...)      std::async(policy, func, ...)
  16.     //std::launch::deferred  在执行get获取异步结果的时候,才会执行异步任务
  17.     //std::launch::async   内部会创建工作线程,异步的完成任务
  18.     std::cout<<"----------1----------"<<std::endl;
  19.     std::future<int> result=std::async(std::launch::deferred,add,2,3);//把任务交它,它会创建一个工作线程来执行任务
  20.     std::this_thread::sleep_for(std::chrono::seconds(1));
  21.     std::cout<<"----------2----------"<<std::endl;
  22.     int ret=result.get();//此时如果任务还未完成,会阻塞等待
  23.     std::cout<<"----------3----------"<<std::endl;
  24.     std::cout<<"result="<<ret<<std::endl;
  25.     return 0;
  26. }
复制代码
      
                       promise
                                   登录后复制                        
  1. #include <future>
  2. #include <thread>
  3. #include <chrono>
  4. #include <iostream>
  5. //通过在线程中对promise对象设置数据,其他线程中通过future获取设置数据的方式实现获取异步任务执行结果的功能
  6. void add(int num1,int num2,std::promise<int>& prom)
  7. {
  8.     prom.set_value(num1+num2);
  9.     std::this_thread::sleep_for(std::chrono::seconds(3));
  10. }
  11. int main()
  12. {
  13.     std::promise<int> prom;
  14.     std::future<int> fut=prom.get_future();//二者从此绑定了关系
  15.     std::thread th(add,11,12,std::ref(prom));//创建线程执行任务
  16.     std::cout<<fut.get()<<std::endl;//get会进行同步,等任务完成了才得到结果
  17.     th.join();//等待线程
  18.     return 0;
  19. }
复制代码
      
                       packaged_task
                                   登录后复制                        
  1. #include <iostream>
  2. #include <future>
  3. #include <memory>
  4. #include <thread>
  5. //pakcaged_task的使用
  6. //pakcaged_task 是一个模板类,实例化的对象可以对一个函数进行二次封装,
  7. //pakcaged_task可以通过get_future获取一个future对象,来获取封装的这个函数的异步执行结果
  8. int add(int num1,int num2)
  9. {
  10.     return num1+num2;
  11. }
  12. int main()
  13. {
  14.     //task(11, 22);  task可以当作一个可调用对象来调用执行任务
  15.     //但是它又不能完全的当作一个函数来使用
  16.     //std::async(std::launch::async, task, 11, 22);
  17.     //std::thread thr(task, 11, 22);
  18.     //但是我们可以把task定义成为一个指针,传递到线程中,然后进行解引用执行
  19.     //但是如果单纯指针指向一个对象,存在生命周期的问题,很有可能出现风险
  20.     //思想就是在堆上new对象,用智能指针管理它的生命周期
  21.     auto pTask=std::make_shared<std::packaged_task<int(int,int)>>(add);//make_shared效率更高
  22.     std::future<int> fut=pTask->get_future();//建立同步关系
  23.     std::thread th([pTask](){
  24.         (*pTask)(1,2);
  25.     });
  26.     std::cout<<"结果为:"<<fut.get()<<std::endl;
  27.     th.join();
  28.     return 0;
  29. }
复制代码
      
                       实现线程池
     packaged_task+futur
                                   登录后复制                        
  1. #include <thread>
  2. #include <future>
  3. #include <vector>
  4. #include <mutex>
  5. #include <atomic>
  6. #include <condition_variable>
  7. #include <functional>
  8. #include <memory>
  9. #include <iostream>
  10. class ThreadPool
  11. {
  12. public:
  13.     using Func = std::function<void(void)>;
  14.     ThreadPool(int threadCount = 1) // 构造函数
  15.         : _stop(false)
  16.     {
  17.         for (int i = 0; i < threadCount; ++i) // 创建指定数量的线程
  18.         {
  19.             _threads.emplace_back(std::thread(&ThreadPool::entry, this));
  20.         }
  21.     }
  22.     ~ThreadPool() // 析构函数
  23.     {
  24.         stop();
  25.     }
  26.     template <class F, class... Args>
  27.     // func是用户传过来的函数,接下来就是不定参数
  28.     // push函数会把func封装成一个异步任务(packaged_task),抛入任务池中,由工作线程取出处理
  29.     // push函数会返回一个future对象,让用户能得到任务结果
  30.     auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))> // 用户添加任务
  31.     {
  32.         // 1.将用户发过来的函数封装成packaged_task
  33.         using retType = decltype(func(args...));
  34.         auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...); // 将参数绑定,注意使用完美转发
  35.         auto task = std::make_shared<std::packaged_task<retType()>>(tmp_func);
  36.         std::future<retType> fut = task->get_future();
  37.         // 2.构造匿名lambda表达式,里面执行task
  38.         {
  39.             std::unique_lock<std::mutex> lock(_mutex); // 加锁
  40.             // 将lambda表达式抛入任务池中
  41.             _tasks.push_back([task]()
  42.                            { (*task)(); });
  43.             _cond.notify_one(); // 唤醒一个线程
  44.         }
  45.         return fut;
  46.     }
  47.     void stop() // 线程池停止工作
  48.     {
  49.         // 0.如果已经停止
  50.         if (_stop == true)
  51.             return;
  52.         // 1.将停止标志设为true
  53.         _stop = true;
  54.         // 2.唤醒所有线程
  55.         _cond.notify_all();
  56.         // 3.等待所有线程
  57.         for (auto &th : _threads)
  58.         {
  59.             th.join();
  60.         }
  61.     }
  62. private:
  63.     // 线程入口函数,内部不断从任务池中取出任务执行
  64.     void entry()
  65.     {
  66.         while (!_stop)
  67.         {
  68.             std::vector<Func> tmpTasks;
  69.             {
  70.                 std::unique_lock<std::mutex> lock(_mutex); // 加锁
  71.                 _cond.wait(lock, [this]()
  72.                            { return _stop || !_tasks.empty(); }); // 等待唤醒
  73.                 tmpTasks.swap(_tasks);//取出任务
  74.             }
  75.             std::cout<<"线程开始执行任务"<<std::endl;
  76.             for(auto& task:tmpTasks)//执行任务
  77.             {
  78.                 task();
  79.             }
  80.         }
  81.     }
  82. private:
  83.     std::atomic<bool> _stop;           // 线程池是否停止运行
  84.     std::mutex _mutex;                 // 互斥
  85.     std::condition_variable _cond;     // 同步
  86.     std::vector<Func> _tasks;          // 任务池
  87.     std::vector<std::thread> _threads; // 线程池
  88. };
复制代码
      
                       需求分析

     1、焦点概念

     其中, Broker Server是最核⼼的部门, 负责消息的存储和转发。
     ⽽在AMQP(Advanced Message Queuing Protocol-⾼级消息队列协议,⼀个提供统⼀消息服务的应⽤层标准⾼级消息队列协议,为⾯向消息的中间件设计,使得服从该规范的客⼾端应⽤和消息中间件服 务器的全功能互操作成为可能)模型中,也就是消息中间件服务器Broker中,⼜存在以下概念:
     
     所谓的 Exchange 和 Queue 可以理解成 "多对多" 关系, 和数据库中的 "多对多" ⼀样. 意思是:
     ⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息)
     ⼀个 Queue 也可以被多个 Exchange 绑定 (⼀个 Queue 中的消息可以来⾃于多个 Exchange)
     

     

     2、焦点API

     对于 Broker 来说, 要实现以下核⼼ API,通过这些 API 来实现消息队列的基本功能
     1. 创建交换机 (exchangeDeclare)
     2. 销毁交换机 (exchangeDelete)
     3. 创建队列 (queueDeclare)
     4. 销毁队列 (queueDelete)
     5. 创建绑定 (queueBind)
     6. 解除绑定 (queueUnbind)
     7. 发布消息 (basicPublish)
     8. 订阅消息 (basicConsume)
     9. 确认消息 (basicAck)
     10. 取消订阅 (basicCancel)
     另⼀⽅⾯, Producer 和 Consumer 则通过⽹络的⽅式, 远程调⽤这些 API, 实现 ⽣产者消费者模型
     3、交换机范例

     对于 RabbitMQ 来说, 主要⽀持四种交换机范例:
     
     其中 Header 这种⽅式⽐较复杂, ⽐较少⻅。常⽤的是前三种交换机范例,项⽬中也主要实现这三种
     
     这三种操作就像给 qq 群发红包.
     Direct 是发⼀个专属红包, 只有指定的⼈能领.
     Fanout 是使⽤了魔法, 发⼀个 10 块钱红包, 群⾥的每个⼈都能领 10 块钱.
     Topic 是发⼀个画图红包, 发 10 块钱红包, 同时出个题, 得画的像的⼈, 才能领. 也是每个领到的⼈都能领 10 块钱.
     4、持久化

     Exchange, Queue, Binding, Message 等数据都有持久化需求
     当程序重启 / 主机重启, 保证上述内容不丢失
     5、网络通讯

     ⽣产者和消费者都是客⼾端程序, Broker 则是作为服务器,通过⽹络进⾏通讯。
     在⽹络通讯的过程中, 客⼾端部门要提供对应的 api, 来实现对服务器的操作。
     1. 创建 Connection
     2. 关闭 Connection
     3. 创建 Channel
     4. 关闭 Channel
     5. 创建队列 (queueDeclare)
     6. 销毁队列 (queueDelete)
     7. 创建交换机 (exchangeDeclare)
     8. 销毁交换机 (exchangeDelete)
     9. 创建绑定 (queueBind)
     10. 解除绑定 (queueUnbind)
     11. 发布消息 (basicPublish)
     12. 订阅消息 (basicConsume)
     13. 确认消息 (basicAck)
     14. 取消订阅(basicCancel)
     可以看到, 在 Broker 的基础上, 客⼾端还要增长 Connection 操作和 Channel 操作
     
     ⼀个 Connection 中可以包含多个 Channel。Channel 和 Channel 之间的数据是独⽴的,不会相互⼲扰。如许做主要是为了可以或许更好的复⽤ TCP 连接, 达到⻓连接的结果, 避免频仍的创建关闭 TCP 连接。
     Connection 可以理解成⼀根⽹线. Channel 则是⽹线⾥详细的线缆.
     

     6、消息应答

     被消费的消息, 必要进⾏应答。应答模式分成两种:
     
     ⼿动应答的⽬的, 是为了保证消息确实被消费者处理成功了. 在⼀些对于数据可靠性要求⾼的场景, ⽐较常⻅.
     模块分别

     1、服务端模块

     1.1、数据管理模块

     1.1.1、交换机数据管理模块

     要管理的数据:描述了一个交换机应该有什么数据
     
           直接交换:binding_key与routing_key相同时,则将消息放入队列
      广播交换:将消息放入交换机绑定的全队伍列中
      主题交换:routing_key与多个绑定队列的binding_key有个匹配规则,匹配成功了则放入
         
     对交换机的管理操作:
     
     1.1.2、队列数据管理模块

     要管理的数据:
     
     提供的管理操作:
     
     一个队列如果持久化标志为false,则意味偏重启后,队列就没了,也就没有客户端可以或许订阅队列的消息,因此这个队列的消息如果持久化存储了,是没故意义的,因此通常一个队列的持久化标志是false,那么它的消息也就不必要持久化。
     1.1.3、绑定数据管理模块

     描述一下将哪个队列与哪个交换机绑定到了一起
     管理的数据:
     
           由数字、字符、_、#、。、*构成      binding_key:  news.music.#       消息中的routing_key:   news.music.pop
          管理的操作:
     
     1.1.4、消息数据管理模块

     
     
     
     
     以下是服务端为了管理所添加的信息
     
     
     
     
     
     
     
     
     以上四个模块分别实现数据的管理(增删查),以及持久化的存储
     1.2、假造机数据管理模块

     假造机其实就是交换机+队列+绑定+消息的逻辑单元。
     因此假造机的数据管理其实就是将以上四个模块的归并管理
     要管理的数据:
     
     要管理的操作:
     
     1.3、交换路由模块

     消息的发布,将一条消息发布到交换机上,由交换机决定放入哪些队列。而决定交给哪个队列,其中交换机范例起到了很大的作用(直接交换、广播交换、主题交换)
     直接交换和广播交换头脑都较为简朴,而主题交换涉及到了一个规则匹配的流程。交换路由模块就是专门做匹配过程的。
     在每个队列和交换机的绑定信息中,都有一个binding_key,这是队列发布的匹配规则。
     在每条要发布的消息中,都有一个routing_key,这是消息的发布规则。
     交换机有三种交换范例:直接、广播、主题。
     
     交换路由模块本质上来说,没有要管理的数据,只有向外提供的路由匹配操作:
     
     1.4、消费者管理模块

     消费者指的是订阅了一个队列消息的客户端,一旦这个队列有了消息就会推送给这个客户端。在焦点API中有一个订阅消息的服务,这里的订阅不是订阅了某条消息,而是订阅了某个队列的消息。当前主要实现了消息推送功能,因此一旦有了消息就要可以或许找到消费者相干的信息(消费者对应的信道)。
     消费者信息:
     
     消费者管理:
     
     
     
     
     
     
     
     1.5、信道管理模块

     信道是网络通讯中的一个概念,叫做通讯通道。
     网络通行的时候,必然都是通过网络通讯连接来完成的,为了可以或许更加充分地利用资源,因此对通讯连接又进行了进一步的细化,细化出了通讯信道。
     对于用户来说,一个通讯信道,就是进行网络通讯的载体,而一个真正的通讯连接,可以创建出多个通讯信道。
     每一个信道之间,在用户眼中是相互独立的,而在本质的底层它们使用同一个通讯连接进行网络服务。
     一个连接可能会对应有多个通讯信道。一旦某个客户端要关闭通讯,关闭的不是连接,而是本身对应通讯通道。,关闭信道就必要将客户端的订阅取消。
     信道提供的服务操作:
     
     信道要管理的数据:
     
     信道的管理:
     
     1.6、连接管理模块

     就是一个网络通讯对应的连接。
     使用muduo库来实现底层通讯,muduo库中本身就有Connection连接的概念和对象类。但是我们在连接中,尚有一个上层通讯信道的概念,这个概念在muduo库中时没有的。因此,我们必要在用户层面,对这个muduo库的Connection连接进行二次封装。
     当一个连接要关闭时,就应该把连接关联的信道全部关闭,因此也要管理关联的信道。形成我们本身所必要的连接管理。
     管理数据:
     
     连接提供的操作:
     
     管理的操作:
     
     1.7、服务端BrokerServer模块

     对以上全部模块的整合,整合成一个服务器
          管理信息:
     
     2、客户端模块

     2.1、消费者管理模块

     一个订阅客户端,当订阅一个队列消息时,就相称于创建了一个消费者
     
     当当前消费者订阅了某一个队列的消息,这个队列有了消息之后,就会将消息推送给这个客户端,这个时候收到了消息则使用回调函数进行处理,处理完毕后,根据确认标志决定是否进行消息确认。
     管理操作:增删查
     2.2、信道管理模块

     客户端的信道与服务端的信道是逐一对应的,服务端信道提供的服务,客户端都有,相称于服务端为客户端提供服务,客户端为用户提供服务。
     全部提供的操作与服务端雷同,因为客户端给用户要提供什么服务,服务器就要给客户端提供什么服务。
     管理信息:
     
     信道提供的服务:
     
     信道的管理:信道的增删查
     2.3、连接管理模块

     对于用户来说,全部的服务都是通过信道完成的,信道在用户交角度就是一个通讯通道(而不是连接)。因此全部的哀求都是通过信道来完成的,连接的管理就包含了客户端资源的整合。
     客户端连接的管理,本质上就是对客户端TcpClient的二次封装和管理。面临用户。不必要有客户端的概念,连接对于用户来说就是客户端,通过连接创建信道,通过信道完成本身所必要的服务。因此,当前客户端这边的连接,就是一个资源的载体
     管理操作:
     
     管理的资源:
     
     2.4、异步线程池模块

     
     将异步工作线程池模块单独拎出来,原因是多个连接用一个EventLoopThread进行IO事件监控就够了,以及全部的推送消息也只必要有一个线程池就够了。并不必要每个连接都有一个EveentLoop,也不必要每个信道都有本身的线程池。
     2.4、基于以上三个模块封装实现:订阅/发布客户端

     订阅客户端:订阅一个队列的消息,收到推送过来消息进行处理。
     发布客户端:向一个交换机发布消息
     

     项目创建

     

     mqdemo:编写⼀些功能⽤例时所在的⽬录
     mqcommon: 公共模块代码(线程池,数据库访问,⽂件访问,⽇志打印,pb相干,以及其他的⼀些琐碎功能模块代码)
     mqclient: 客⼾端模块代码
     mqserver: 服务器模块代码
     mqtest: 单元测试
     mqthird: ⽤到的第三⽅库存放⽬录
     服务端实现

     1、实现日志工具

                                   登录后复制                        
  1. #pragma once
  2. #include <iostream>
  3. #include <ctime>
  4. namespace lsxmq
  5. {
  6. #define DBG_LEVEL 0
  7. #define INF_LEVEL 1
  8. #define ERR_LEVEL 2
  9. #define DEFAULT_LEVEL DBG_LEVEL
  10. #define LOG(levelStr, level, format, ...)                                                                \
  11.     {                                                                                                    \
  12.         if (level >= DEFAULT_LEVEL)                                                                      \
  13.         {                                                                                                \
  14.             time_t t = time(nullptr);                                                                    \
  15.             struct tm *ptm = localtime(&t);                                                              \
  16.             char timeStr[32];                                                                            \
  17.             strftime(timeStr, 31, "%H:%M:%S", ptm);                                                      \
  18.             printf("[%s][%s][%s:%d]" format "\n", levelStr, timeStr, __FILE__, __LINE__, ##__VA_ARGS__); \
  19.         }                                                                                                \
  20.     }
  21.     //__VA_ARGS__是使用可变参数,##是为了保证没有可变参数时忽略前面的逗号
  22.     // 宏函数必须在一行里,在换行时加上\是为了转义\n
  23. #define DLOG(format, ...) LOG("DBG", DBG_LEVEL, format, ##__VA_ARGS__)
  24. #define ILOG(format, ...) LOG("INF", INF_LEVEL, format, ##__VA_ARGS__)
  25. #define ELOG(format, ...) LOG("ERR", ERR_LEVEL, format, ##__VA_ARGS__)
  26. }
复制代码
      
                       2、实现使用Helper工具

     2.1、Sqlite基础操作类

                                   登录后复制                        
  1. #pragma once
  2. // 封装实现一个SqliteHelper类,提供简单的sqlite数据库操作接口,完成对数据增删改查操作
  3.     // 1.创建/打开数据库文件
  4.     // 2.针对打开的数据库进行操作
  5.     // 2.1表的操作
  6.     // 2.2数据的操作
  7.     // 3.关闭数据库
  8.     class SqliteHelper
  9.     {
  10.     public:
  11.         typedef int (*SqliteCallback)(void *, int, char **, char **);
  12.         SqliteHelper(std::string dbfile)
  13.             : _dbfile(dbfile)
  14.         {
  15.         }
  16.         bool open(int safeLevel = SQLITE_OPEN_FULLMUTEX) // 打开数据库文件
  17.         {
  18.             // 默认安全级别为串行化模式
  19.             // int sqlite3_open_v2(const char *filename, sqlite3 **ppDb, int flags, const char *zVfs )
  20.             int ret = sqlite3_open_v2(_dbfile.c_str(), &_handler, SQLITE_OPEN_CREATE | SQLITE_OPEN_READWRITE | safeLevel, nullptr);
  21.             if (ret != SQLITE_OK)
  22.             {
  23.                 ELOG("创建/打开数据库文件失败:%s",sqlite3_errmsg(_handler));
  24.                 return false;
  25.             }
  26.             return true;
  27.         }
  28.         bool exec(const std::string &sql, SqliteCallback cb, void *arg) // 进行操作
  29.         {
  30.             // int sqlite3_exec(sqlite3*, char *sql, int (*callback)(void*,int,char**,char**),void* arg, char **err)
  31.             int ret = sqlite3_exec(_handler, sql.c_str(), cb, arg, nullptr);
  32.             if (ret != SQLITE_OK)
  33.             {
  34.                 ELOG("%s执行失败:%s",sql.c_str(),sqlite3_errmsg(_handler));
  35.                 return false;
  36.             }
  37.             return true;
  38.         }
  39.         void close() // 关闭数据库文件
  40.         {
  41.             // int sqlite3_close_v2(sqlite3*)
  42.             sqlite3_close_v2(_handler);
  43.         }
  44.     private:
  45.         sqlite3 *_handler;   // 句柄
  46.         std::string _dbfile; // 数据库文件名
  47.     };
复制代码
      
                       2.2、字符串操作类

     分割字符串功能:
                                   登录后复制                        
  1. static size_t split(const std::string& str,const std::string& sep,std::vector<std::string>& vs)
  2.     {
  3.         size_t idx=0;
  4.         while(idx<str.size())
  5.         {
  6.             size_t pos=str.find(sep,idx);
  7.             //找不到分隔符了
  8.             if(pos==std::string::npos)
  9.             {
  10.                 vs.push_back(str.substr(idx,str.size()-idx));
  11.                 return vs.size();
  12.             }
  13.             //pos的位置是一个分割符,而idx是上一个分割符的下一个位置
  14.             if(idx==pos)//意味着出现连续分割符,中间没有有效子串
  15.             {
  16.                 idx+=sep.size();
  17.                 continue;
  18.             }
  19.             vs.push_back(str.substr(idx,pos-idx));
  20.             idx=pos+sep.size();
  21.         }
  22.         return vs.size();
  23.     }
复制代码
      
                       2.3、UUID天生器类

     1.天生8个0~255之间的随机数
     2.取出一个8字节的序号
     通过以上数据,构成16字节的数据,转换为16进制字符,共32个
     随机数的天生
                                   登录后复制                        
  1. class UUIDHelper
  2.     {
  3.     public:
  4.         static std::string uuid()
  5.         {
  6.             std::random_device rd; // 这是一个随机数生成装置
  7.             // rd();//这样就可以生成机器随机数,但是生成机器随机数效率太低,所以不使用这种方法
  8.             // 实际上是把这个机器随机数作为随机数种子来生成伪随机数
  9.             std::mt19937_64 gernator(rd()); // 使用梅森旋转算法,生成一个伪随机数
  10.             // 因为要生成的随机数范围是0~255,所以需要限定随机数的生成范围
  11.             std::uniform_int_distribution<int> distribution(0, 255);
  12.             std::stringstream uuid;
  13.             for (int i = 0; i < 8; ++i) // 生成16位16进制字符
  14.             {
  15.                 int number = distribution(gernator); // 生成随机数
  16.                 uuid << std::setw(2) << std::setfill('0') << std::hex << number;
  17.                 if (i == 3 || i == 5 || i == 7)
  18.                 {
  19.                     uuid << "-";
  20.                 }
  21.             }
  22.             // 生成序号
  23.             static std::atomic<size_t> seq(0);
  24.             size_t tmpSeq = seq.fetch_add(1); // 每次加一
  25.             for (int i = 7; i >= 0; --i)
  26.             {
  27.                 uuid << std::setw(2) << std::setfill('0') << std::hex << (tmpSeq >> (i * 8) & 0xff);
  28.                 if (i == 6)
  29.                 {
  30.                     uuid << "-";
  31.                 }
  32.             }
  33.             return uuid.str();
  34.         }
  35.     };
复制代码
      
                       2.3、文件操作类

                                   登录后复制                        
  1. class FileHelper
  2.     {
  3.     public:
  4.         FileHelper(const std::string& filename)
  5.             :_filename(filename)
  6.         {}
  7.         bool exists()//文件是否存在
  8.         {
  9.             struct stat st;
  10.             //返回值为0表示能获取文件信息,文件自然就是存在的
  11.             return stat(_filename.c_str(),&st)==0;
  12.         }
  13.         size_t size()//文件大小
  14.         {
  15.             struct stat st;
  16.             if(stat(_filename.c_str(),&st)!=0)
  17.             {
  18.                 return 0;
  19.             }
  20.             return st.st_size;
  21.         }
  22.         bool read(char* body,size_t offset,size_t len)//从文件指定位置读取指定长度内容
  23.         {
  24.             //1.打开文件
  25.             std::ifstream ifs(_filename.c_str(),std::ios_base::in|std::ios_base::binary);
  26.             if(ifs.is_open()==false)
  27.             {
  28.                 ELOG("%s文件打开失败",_filename.c_str());
  29.                 ifs.close();
  30.                 return false;
  31.             }
  32.             //2.移动到指定位置
  33.             ifs.seekg(offset,std::ios_base::beg);
  34.             //3.读取文件内容
  35.             ifs.read(&body[0],len);
  36.             if(ifs.good()==false)
  37.             {
  38.                 ELOG("%s文件读取失败",_filename.c_str());
  39.                 ifs.close();
  40.                 return false;
  41.             }
  42.             //4.关闭文件
  43.             ifs.close();
  44.             return true;
  45.         }
  46.         bool read(std::string& body)//从文件读取内容
  47.         {
  48.             size_t fsize=this->size();
  49.             //根据fsize调整body的大小
  50.             body.resize(fsize);
  51.             return read(&body[0],0,fsize);
  52.         }
  53.         bool write(const char* body,size_t offset,size_t len)//向文件指定位置写入内容
  54.         {
  55.             //1.打开文件
  56.             std::fstream fs(_filename.c_str(),std::ios_base::in|std::ios_base::out|std::ios_base::binary);
  57.             //这里不能ofstream
  58.             //想要跳转到文件的指定位置必须有读权限,istream可以fstream可以,但是ofstream不可以
  59.             if(fs.is_open()==false)
  60.             {
  61.                 ELOG("%s文件打开失败",_filename.c_str());
  62.                 fs.close();
  63.                 return false;
  64.             }
  65.             //2.移动到指定位置
  66.             fs.seekg(offset,std::ios_base::beg);
  67.             //3.向文件写入内容
  68.             fs.write(body,len);
  69.             if(fs.good()==false)
  70.             {
  71.                 ELOG("%s文件写入失败",_filename.c_str());
  72.                 fs.close();
  73.                 return false;
  74.             }
  75.             //4.关闭文件
  76.             fs.close();
  77.             return true;
  78.         }
  79.         bool write(const std::string& body)//向文件写入内容
  80.         {
  81.             return write(&body[0],0,body.size());
  82.         }
  83.         static std::string parentDirectory(const std::string& filename)//返回当前文件的父路径
  84.         {
  85.             //   aaa/bbb/ccc/ddd/e.txt
  86.             int pos=filename.find_last_of('/');
  87.             if(pos==std::string::npos)//没找到
  88.             {
  89.                 //e.txt
  90.                 return "./";
  91.             }
  92.             return filename.substr(0,pos+1);
  93.         }
  94.         bool rename(const std::string& filename)//修改文件名称
  95.         {
  96.             return (::rename(_filename.c_str(),filename.c_str())==0);
  97.         }
  98.         static bool createFile(const std::string& filename)//创建文件
  99.         {
  100.             std::ofstream ofs(filename.c_str(),std::ios_base::binary|std::ios_base::out|std::ios_base::app);
  101.             if(ofs.is_open()==false)
  102.             {
  103.                 ELOG("%s文件创建失败",filename.c_str());
  104.                 ofs.close();
  105.                 return false;
  106.             }
  107.             ofs.close();
  108.             return true;
  109.         }
  110.         static bool removeFile(const std::string& filename)//删除文件
  111.         {
  112.             return (::remove(filename.c_str())==0);
  113.         }
  114.         static bool createDirectory(const std::string& path)//创建目录
  115.         {
  116.             size_t idx=0;
  117.             while(idx<path.size())
  118.             {
  119.                 int pos=path.find('/',idx);
  120.                 if(pos==std::string::npos)
  121.                 {
  122.                     if(mkdir(path.c_str(),0775)!=0)
  123.                     {
  124.                         ELOG("%s目录创建失败",path.c_str());
  125.                         return false;
  126.                     }
  127.                     return true;
  128.                 }
  129.                 std::string subPath=path.substr(0,pos);
  130.                 if(mkdir(subPath.c_str(),0775)!=0)
  131.                 {
  132.                     ELOG("%s目录创建失败",subPath.c_str());
  133.                     return false;
  134.                 }
  135.                 idx=pos+1;
  136.             }
  137.             return true;
  138.         }
  139.         static bool removeDirectory(const std::string& path)//删除目录
  140.         {
  141.             //rmdir必须保证文件夹是空的,所以不用这种方法
  142.             std::string cmd="rm -rf "+path;
  143.             return system(cmd.c_str())==0;
  144.         }
  145.         
  146.     private:
  147.         std::string _filename;
  148.     };
复制代码
      
                       3、消息范例界说与交换机范例界说

                                   登录后复制                        
  1. syntax= "proto3";
  2. package lsx;
  3. enum ExchangType//交换机类型
  4. {
  5.     UNKNOWTYPE=0;//因为必须从0开始,所以加一个未知类型
  6.     DIRECT=1;//直接
  7.     FANOUT=2;//广播
  8.     TOPIC=3;//主题
  9. };
  10. enum DeliveryMode//消息投递方式
  11. {
  12.     UNKONWMODE=0;
  13.     DURABLE=1;   //持久化
  14.     UNDURABLE=2;   //非持久化
  15. };
  16. message BasicProperties //基本特性
  17. {
  18.     string id=1;   //消息ID
  19.     DeliveryMode deliveryMode=2;   //是否持久化
  20.     string routing_key=3;
  21. };
  22. message Message
  23. {
  24.     message Payload//有效载荷
  25.     {
  26.         BasicProperties properties=1;   //基本属性
  27.         string body=2;   //消息主体
  28.         string valid=3;   //是否有效
  29.         //消息是否有效不用bool类型,而是使用字符的0/1
  30.         //因为bool类型序列化时true和false所占长度不同
  31.         //会导致修改文件中消息有效位后消息长度发生变化
  32.     }
  33.     Payload payload=1;   //有效载荷
  34.     int32 offset=2;   //偏移量
  35.     int32 len=3;   //长度
  36. };
复制代码
      
                       4、交换机数据管理

      界说交换机数据类
     
     界说交换机数据持久化类(数据持久化的sqlite3数据库中)
     
     界说交换机数据管理类
     
                                   登录后复制                        
  1. namespace lsxmq
  2. {
  3.     //1.定义交换机类
  4.     struct Exchange
  5.     {
  6.         using ptr=std::shared_ptr<Exchange>;
  7.         std::string name;//交换机名称
  8.         ExchangeType type;//交换机类型
  9.         bool durable;//是否持久化
  10.         bool autoDel;//是否自动删除
  11.         std::unordered_map<std::string,std::string> args;//其它参数
  12.         Exchange(const std::string& ename,ExchangeType etype
  13.         ,bool edurable,bool eautoDel
  14.         ,std::unordered_map<std::string,std::string> eargs)
  15.             :name(ename),type(etype),durable(edurable)
  16.             ,autoDel(eautoDel),args(eargs)
  17.         {}
  18.         //args存储的是键值对,在存储数据库的时候,会组织一个格式进行存储:key=value&key=value
  19.         //将字符串反序列化为args
  20.         void setArgs(std::string strArgs);
  21.         //将args序列化为字符串
  22.         std::string getArgs();
  23.     };
  24.     //2.定义交换机数据持久化存储类--数据存储在sqlite数据库中
  25.     class ExchangeMapper
  26.     {
  27.     public:
  28.         ExchangeMapper(const std::string dbfile)
  29.             :_sqliteHelper(dbfile)
  30.         {}
  31.         bool createTable();//创建表
  32.         bool removeTable();//删除表
  33.         bool insert(Exchange::ptr& exchang);//向数据库插入一条交换机数据
  34.         bool erase(Exchange::ptr& exchang);//从数据库中删除一条交换机数据
  35.         std::unordered_map<std::string,Exchange::ptr> getAll();//获取数据库中所有的交换机数据
  36.         //name:ptr
  37.     private:
  38.         SqliteHelper _sqliteHelper;
  39.     };
  40.     //3.定义交换机数据管理类
  41.     class ExchangeManager
  42.     {
  43.     public:
  44.         ExchangeManager(const std::string dbfile)
  45.             :_mapper(dbfile)
  46.         {}
  47.         bool declareExchange(const std::string& ename,ExchangeType etype
  48.         ,bool edurable,bool eautoDel
  49.         ,std::unordered_map<std::string,std::string> eargs);//声明交换机
  50.         bool deleteExchange(const std::string& name);//删除交换机
  51.         Exchange::ptr selectExchange(const std::string& name);//获取指定交换机
  52.         bool exists(const std::string& anme);//指定交换机是否存在
  53.         void clear();//清理数据
  54.     private:
  55.         std::mutex _mutex;
  56.         ExchangeMapper _mapper;
  57.         std::unordered_map<std::string,Exchange::ptr> _exchanges;
  58.         
  59.     };
  60. };
复制代码
      
                       5、队列数据的管理

     当前队列数据的管理,本质上是队列描述信息的管理,描述当前服务器上有哪些队列。
     界说队列描述数据类
     
     界说队列数据持久化类(数据持久化的sqlite3数据库中)
     
     界说队列数据管理类
     
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/helper.hpp"
  3. #include "../mqcomm/logger.hpp"
  4. #include <unordered_map>
  5. #include <memory>
  6. #include <cassert>
  7. #include <mutex>
  8. namespace lsxmq
  9. {
  10.     // 1.定义消息队列类
  11.     struct MessageQueue
  12.     {
  13.         using ptr = std::shared_ptr<MessageQueue>;
  14.         std::string name;                                  // 队列名称
  15.         bool durable;                                      // 是否持久化
  16.         bool exclusive;                                    // 是否独占
  17.         bool autoDel;                                      // 是否自动删除
  18.         std::unordered_map<std::string, std::string> args; // 其它参数
  19.         MessageQueue() {}
  20.         MessageQueue(const std::string &qname, bool qdurable, bool qexclusive, bool qautoDel, std::unordered_map<std::string, std::string> qargs)
  21.             : name(qname), durable(qdurable), exclusive(qexclusive), autoDel(qautoDel), args(qargs)
  22.         {
  23.         }
  24.         // args存储的是键值对,在存储数据库的时候,会组织一个格式进行存储:key=value&key=value
  25.         // 将字符串反序列化为args
  26.         void setArgs(std::string strArgs)
  27.         {
  28.             std::vector<std::string> subStr;
  29.             StrHelper::split(strArgs, "&", subStr); // 分割字符串
  30.             for (auto &str : subStr)                // 继续分割
  31.             {
  32.                 int pos = str.find("=");
  33.                 if (pos != std::string::npos) // 加入args
  34.                 {
  35.                     std::string first = str.substr(0, pos);
  36.                     std::string second = str.substr(pos + 1, str.size());
  37.                     args.insert(std::make_pair(first, second));
  38.                 }
  39.             }
  40.         }
  41.         // 将args序列化为字符串
  42.         std::string getArgs()
  43.         {
  44.             std::string res;
  45.             for (auto &a : args)
  46.             {
  47.                 res += a.first;
  48.                 res += "=";
  49.                 res += a.second;
  50.                 res += "&";
  51.             }
  52.             res.pop_back();
  53.             return res;
  54.         }
  55.     };
  56.     // 2.定义消息队列数据持久化存储类--数据存储在sqlite数据库中
  57.     using MessageQueueMap = std::unordered_map<std::string, MessageQueue::ptr>;
  58.     class MessageQueueMapper
  59.     {
  60.     public:
  61.         MessageQueueMapper(const std::string &dbfile)
  62.             : _sqliteHelper(dbfile)
  63.         {
  64.             std::string path = FileHelper::parentDirectory(dbfile);
  65.             if (FileHelper(path).exists() == false) // 父目录不存在就创建
  66.             {
  67.                 FileHelper::createDirectory(path);
  68.             }
  69.             assert(_sqliteHelper.open());
  70.             this->createTable();
  71.         }
  72.         void createTable() // 创建一张消息队列数据管理表
  73.         {
  74. #define CREATE_TABLE "create table if not exists messageQueueTable(\
  75.             name varchar(32) primary key,\
  76.             durable int,\
  77.             exclusive int,\
  78.             autoDel int,\
  79.             args varchar(128));"
  80.             bool ret = _sqliteHelper.exec(CREATE_TABLE, nullptr, nullptr);
  81.             if (ret == false)
  82.             {
  83.                 ELOG("消息队列数据库表创建失败");
  84.                 abort();
  85.             }
  86.         }
  87.         void removeTable() // 删除表
  88.         {
  89. #define DROP_TABLE "drop table messageQueueTable"
  90.             bool ret = _sqliteHelper.exec(DROP_TABLE, nullptr, nullptr);
  91.             if (ret == false)
  92.             {
  93.                 ELOG("消息队列数据库表删除失败");
  94.                 abort();
  95.             }
  96.         }
  97.         bool insert(MessageQueue::ptr& mqp) // 向数据库插入一条消息队列数据
  98.         {
  99. #define INSERT "insert into messageQueueTable values('%s',%d,%d,%d,'%s');"
  100.             char sql[256] = {0};
  101.             sprintf(sql, INSERT, mqp->name.c_str(), mqp->durable, mqp->exclusive, mqp->autoDel, mqp->getArgs().c_str());
  102.             return _sqliteHelper.exec(sql, nullptr, nullptr);
  103.         }
  104.         bool erase(const std::string &qname) // 从数据库中删除一条消息队列数据
  105.         {
  106. #define ERASE "delete from  messageQueueTable where name='%s';"
  107.             char sql[256] = {0};
  108.             sprintf(sql, ERASE, qname.c_str());
  109.             return _sqliteHelper.exec(sql, nullptr, nullptr);
  110.         }
  111.         MessageQueueMap recovery() // 获取数据库中所有的消息队列数据(恢复)
  112.         {
  113. #define SELECT_ALL "select name,durable,exclusive,autoDel,args from  messageQueueTable"
  114.             MessageQueueMap result;
  115.             bool ret = _sqliteHelper.exec(SELECT_ALL, sqliteCallback, &result);
  116.             if (ret == false)
  117.             {
  118.                 ELOG("恢复消息队列数据失败");
  119.                 abort();
  120.             }
  121.             return result;
  122.         }
  123.         ~MessageQueueMapper()
  124.         {
  125.             _sqliteHelper.close();
  126.         }
  127.     private:
  128.         static int sqliteCallback(void *arg, int col, char **row, char **fields)
  129.         {
  130.             MessageQueueMap *result = (MessageQueueMap *)arg;
  131.             auto mqp = std::make_shared<MessageQueue>();
  132.             mqp->name = row[0];
  133.             mqp->durable = (bool)std::stoi(row[2]);
  134.             mqp->exclusive = (bool)std::stoi(row[2]);
  135.             mqp->autoDel = (bool)std::stoi(row[3]);
  136.             if (row[4])
  137.                 mqp->setArgs(row[4]); // 其它参数可能不存在
  138.             result->insert(std::make_pair(mqp->name, mqp));
  139.             return 0;
  140.         }
  141.     private:
  142.         SqliteHelper _sqliteHelper;
  143.     };
  144.     // 3.定义消息队列数据管理类
  145.     class MessageQueueManager
  146.     {
  147.     public:
  148.         using ptr = std::shared_ptr<MessageQueueManager>;
  149.         MessageQueueManager(const std::string dbfile)
  150.             : _mapper(dbfile)
  151.         {
  152.             _messageQueues = _mapper.recovery(); // 恢复数据
  153.         }
  154.         bool declareMessageQueue(const std::string &name, bool durable,
  155.         bool exclusive, bool autoDel, std::unordered_map<std::string, std::string> args) // 声明消息队列
  156.         {
  157.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  158.             auto it = _messageQueues.find(name);
  159.             if (it != _messageQueues.end()) // 已经存在了
  160.             {
  161.                 return true;
  162.             }
  163.             auto mqp = std::make_shared<MessageQueue>(name, durable,exclusive, autoDel, args);
  164.             if (durable == true) // 需要持久化
  165.             {
  166.                 bool ret = _mapper.insert(mqp);
  167.                 if (ret == false)
  168.                     return false;
  169.             }
  170.             _messageQueues.insert(std::make_pair(name, mqp));
  171.             return true;
  172.         }
  173.         bool deleteMessageQueue(const std::string &name) // 删除消息队列
  174.         {
  175.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  176.             auto it = _messageQueues.find(name);
  177.             if (it == _messageQueues.end()) // 不存在
  178.             {
  179.                 return true;
  180.             }
  181.             if (it->second->durable == true) // 从数据库中删除
  182.             {
  183.                 bool ret = _mapper.erase(it->second->name);
  184.                 if (ret == false)
  185.                     return false;
  186.             }
  187.             _messageQueues.erase(name);
  188.             return true;
  189.         }
  190.         MessageQueue::ptr selectMessageQueue(const std::string &name) // 获取指定消息队列
  191.         {
  192.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  193.             auto it = _messageQueues.find(name);
  194.             if (it == _messageQueues.end()) // 不存在
  195.             {
  196.                 return MessageQueue::ptr();
  197.             }
  198.             return it->second;
  199.         }
  200.         MessageQueueMap getall()//获取所有消息队列
  201.         {
  202.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  203.             return _messageQueues;
  204.         }
  205.         bool exists(const std::string &name) // 指定交换机是否存在
  206.         {
  207.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  208.             auto it = _messageQueues.find(name);
  209.             return it != _messageQueues.end();
  210.         }
  211.         void clear() // 清理数据
  212.         {
  213.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  214.             _mapper.removeTable();                     // 先删磁盘内容
  215.             _messageQueues.clear();                    // 再删内存内容
  216.         }
  217.         size_t size()
  218.         {
  219.             std::unique_lock<std::mutex> lock(_mutex); // 先加锁
  220.             return _messageQueues.size();
  221.         }
  222.     private:
  223.         std::mutex _mutex; // 多线程使用,注意线程安全
  224.         MessageQueueMapper _mapper;
  225.         MessageQueueMap _messageQueues;
  226.     };
  227. };
复制代码
      
                       6、绑定信息(交换机-消息队列)管理

     绑定信息,本质上就是⼀个交换构造联了哪些队列的描述。
     界说绑定信息类
     
     界说绑定信息数据持久化类(数据持久化的sqlite3数据库中)
     
      界说绑定信息数据管理类
     
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/helper.hpp"
  3. #include "../mqcomm/logger.hpp"
  4. #include <unordered_map>
  5. #include <memory>
  6. #include <cassert>
  7. #include <mutex>
  8. namespace lsxmq
  9. {
  10.     //1.定义绑定类
  11.     struct Binding
  12.     {
  13.         using ptr=std::shared_ptr<Binding>;
  14.         std::string exchangeName;//交换机名称
  15.         std::string messageQueueName;//消息队列名称
  16.         std::string binding_key;
  17.         bool durable;//是否持久化
  18.         Binding(){}
  19.         Binding(const std::string& ename,const std::string& qname,const std::string& bdkey,bool bdurable)
  20.             :exchangeName(ename),messageQueueName(qname),binding_key(bdkey),durable(bdurable)
  21.         {}
  22.     };
  23.     //2.定义持久化管理绑定类
  24.     //交换机名称-->多个消息队列
  25.     //消息队列-->多个绑定信息
  26.     using MessageQueueMap=std::unordered_map<std::string,Binding::ptr>;
  27.     using BindingMap=std::unordered_map<std::string,MessageQueueMap>;
  28.     class BindingMapper
  29.     {
  30.     public:
  31.         BindingMapper(const std::string dbfile)
  32.             :_sqliteHelper(dbfile)
  33.         {
  34.             std::string path = FileHelper::parentDirectory(dbfile);
  35.             if (FileHelper(path).exists() == false) // 父目录不存在就创建
  36.             {
  37.                 FileHelper::createDirectory(path);
  38.             }
  39.             assert(_sqliteHelper.open());
  40.             this->createTable();
  41.         }
  42.         void createTable()//创建表
  43.         {
  44.             #define CREATE_TABLE "create table if not exists bindingTable(\
  45.             exchangeName varchar(32),\
  46.             messageQueueName varchar(32),\
  47.             binding_key varchar(128),\
  48.             durable int);"
  49.             bool ret = _sqliteHelper.exec(CREATE_TABLE, nullptr, nullptr);
  50.             if (ret == false)
  51.             {
  52.                 ELOG("绑定数据库表创建失败");
  53.                 abort();
  54.             }
  55.         }
  56.         void removeTable()//删除表
  57.         {
  58.             #define DROP_TABLE "drop table bindingTable"
  59.             bool ret = _sqliteHelper.exec(DROP_TABLE, nullptr, nullptr);
  60.             if (ret == false)
  61.             {
  62.                 ELOG("绑定数据库表删除失败");
  63.                 abort();
  64.             }
  65.         }
  66.         bool insert(Binding::ptr& bdp)//插入绑定数据
  67.         {
  68.             #define INSERT "insert into bindingTable values('%s','%s','%s',%d);"
  69.             char sql[256] = {0};
  70.             sprintf(sql, INSERT, bdp->exchangeName.c_str(),bdp->messageQueueName.c_str(),bdp->binding_key.c_str(),bdp->durable);
  71.             return _sqliteHelper.exec(sql, nullptr, nullptr);
  72.         }
  73.         bool erase(const std::string& ename,const std::string qname)//删除绑定数据数据
  74.         {
  75.             #define ERASE "delete from  bindingTable where exchangeName='%s' and messageQueueName='%s';"
  76.             char sql[256] = {0};
  77.             sprintf(sql, ERASE, ename.c_str(),qname.c_str());
  78.             return _sqliteHelper.exec(sql, nullptr, nullptr);
  79.         }
  80.         bool removeExchangeBinding(const std::string& ename)//删除交换机的绑定数据
  81.         {
  82.             #define ERASE_EXCHANGE "delete from  bindingTable where exchangeName='%s';"
  83.             char sql[256] = {0};
  84.             sprintf(sql, ERASE_EXCHANGE, ename.c_str());
  85.             return _sqliteHelper.exec(sql, nullptr, nullptr);
  86.         }
  87.         bool removeMessageQueueBinding(const std::string& qname)//删除消息队列的绑定数据
  88.         {
  89.             #define ERASE_MESSAGEQUEUE "delete from  bindingTable where messageQueueName='%s';"
  90.             char sql[256] = {0};
  91.             sprintf(sql, ERASE_MESSAGEQUEUE, qname.c_str());
  92.             return _sqliteHelper.exec(sql, nullptr, nullptr);
  93.         }
  94.         BindingMap recovery()//恢复数据
  95.         {
  96.             #define SELECT_ALL "select exchangeName,messageQueueName,binding_key,durable from  bindingTable"
  97.             BindingMap result;
  98.             bool ret = _sqliteHelper.exec(SELECT_ALL, sqliteCallback, &result);
  99.             if (ret == false)
  100.             {
  101.                 ELOG("恢复绑定数据失败");
  102.                 abort();
  103.             }
  104.             return result;
  105.         }
  106.         ~BindingMapper()
  107.         {
  108.             _sqliteHelper.close();
  109.         }
  110.     private:
  111.         static int sqliteCallback(void *arg, int col, char **row, char **fields)
  112.         {
  113.             BindingMap *result = (BindingMap *)arg;
  114.             auto bdp = std::make_shared<Binding>(row[0],row[1],row[2],row[3]);
  115.             //ename->(qname->binding)
  116.             //为了防止 交换机相关的绑定信息已经存在,因此不能直接创建队列映射,进行添加,这样会覆盖历史数据
  117.             //因此得先获取交换机对应的映射对象,往里边添加数据
  118.             //但是,若这时候没有交换机对应的映射信息,因此这里的获取要使用引用(会保证不存在则自动创建)
  119.             //1.用ename拿出MessageQueueMap
  120.             MessageQueueMap& mqm=(*result)[row[0]];//已经有了就拿出来,没有就创建
  121.             mqm.insert(std::make_pair(row[1],bdp));
  122.             return 0;
  123.         }
  124.     private:
  125.         SqliteHelper _sqliteHelper;
  126.     };
  127.     //3.定义绑定内存管理类
  128.     class BindingManager
  129.     {
  130.     public:
  131.         using ptr=std::shared_ptr<BindingManager>;
  132.         BindingManager(const std::string& dbfile)
  133.             :_mapper(dbfile)
  134.         {
  135.             _bindings=_mapper.recovery();
  136.         }
  137.         bool bind(const std::string& ename,const std::string& qname,const std::string& bdkey,bool durable)//建立绑定关系
  138.         {
  139.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  140.             //先确定改绑定是否已存在
  141.             auto it=_bindings.find(ename);
  142.             if(it!=_bindings.end()&&it->second.find(qname)!=it->second.end())
  143.             {
  144.                 return true;
  145.             }
  146.             Binding::ptr bdp=std::make_shared<Binding>(ename,qname,bdkey,durable);
  147.             //绑定信息是否需要持久化,取决于什么? 交换机数据是持久化的,以及队列数据也是持久化的。
  148.             if(durable)//需要持久化
  149.             {
  150.                 bool ret=_mapper.insert(bdp);
  151.                 if(ret==false) return false;
  152.             }
  153.             auto& mqm=_bindings[ename];
  154.             mqm.insert(std::make_pair(qname,bdp));
  155.             return true;
  156.         }
  157.         bool unbind(const std::string& ename,const std::string& qname)//解除绑定关系
  158.         {
  159.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  160.             //先确定绑定是否不存在
  161.             auto it=_bindings.find(ename);
  162.             if(it==_bindings.end())//通过ename找不到MessageQueueMap
  163.             {
  164.                 return true;
  165.             }
  166.             auto it2=it->second.find(qname);
  167.             if(it2==it->second.end())//通过qname找不到binding
  168.             {
  169.                 return true;
  170.             }
  171.             //现在确定了要解除的绑定是存在的
  172.             if(it2->second->durable==true)//从数据库里删除
  173.             {
  174.                 bool ret=_mapper.erase(ename,qname);
  175.                 if(ret==false) return false;
  176.             }
  177.             //从内存中删除
  178.             it->second.erase(qname);
  179.             if(it->second.empty())
  180.             {
  181.                 _bindings.erase(ename);
  182.             }
  183.             return true;
  184.         }
  185.         bool deleteExchangeBinding(const std::string& ename)//删除与交换机相关的绑定
  186.         {
  187.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  188.             _bindings.erase(ename);
  189.             return _mapper.removeExchangeBinding(ename);
  190.         }
  191.         bool deleteMessageQueueBinding(const std::string& qname)//删除与消息队列相关的绑定
  192.         {
  193.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  194.             for(auto it=_bindings.begin();it!=_bindings.end();)
  195.             {
  196.                 it->second.erase(qname);
  197.                 if(it->second.empty())
  198.                 {
  199.                     it=_bindings.erase(it);
  200.                 }
  201.                 else
  202.                 {
  203.                     ++it;
  204.                 }
  205.             }
  206.             return _mapper.removeMessageQueueBinding(qname);
  207.         }
  208.         MessageQueueMap getExchangeBinding(const std::string& ename)//获取交换机绑定的所有消息队列
  209.         {
  210.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  211.             //该交换机是否存在
  212.             auto it=_bindings.find(ename);
  213.             if(it==_bindings.end())//该交换机不存在
  214.             {
  215.                 return MessageQueueMap();
  216.             }
  217.             return it->second;
  218.         }
  219.         Binding::ptr getBinding(const std::string& ename,const std::string& qname)
  220.         {
  221.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  222.             //先确定绑定是否不存在
  223.             auto it=_bindings.find(ename);
  224.             if(it==_bindings.end())//通过ename找不到MessageQueueMap
  225.             {
  226.                 return Binding::ptr();
  227.             }
  228.             auto it2=it->second.find(qname);
  229.             if(it2==it->second.end())//通过qname找不到binding
  230.             {
  231.                 return Binding::ptr();
  232.             }
  233.             //现在确定了要解除的绑定是存在的
  234.             return it2->second;
  235.         }
  236.         bool exists(const std::string& ename,const std::string& qname)//绑定信息是否存在
  237.         {
  238.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  239.             auto it=_bindings.find(ename);
  240.             if(it!=_bindings.end()&&it->second.find(qname)!=it->second.end())
  241.             {
  242.                 return true;
  243.             }
  244.             return false;
  245.         }
  246.         size_t size()//绑定数量
  247.         {
  248.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  249.             size_t ret=0;
  250.             for(auto& it:_bindings)
  251.             {
  252.                 ret+=it.second.size();
  253.             }
  254.             return ret;
  255.         }
  256.         void clear()//清空数据
  257.         {
  258.             ILOG("清空数据");
  259.             std::unique_lock<std::mutex> lock(_mutex);//先加锁
  260.             _mapper.removeTable();
  261.             _bindings.clear();
  262.         }
  263.     private:
  264.         std::mutex _mutex;
  265.         BindingMapper _mapper;
  266.         BindingMap _bindings;
  267.     };
  268. };
复制代码
      
                       7、队列消息管理

     文件数据管理

     消息的要素(决定了消息的数据结构):
               消息的持久化管理:
               向外提供的操作:
               要管理的数据:
          内存数据管理

     以队列为单元进行管理:
                         实现一个对外的总体消息管理类

                                                       登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/helper.hpp"
  3. #include "../mqcomm/logger.hpp"
  4. #include "../mqcomm/message.pb.h"
  5. #include <unordered_map>
  6. #include <memory>
  7. #include <cassert>
  8. #include <list>
  9. #include <mutex>
  10. namespace lsxmq
  11. {
  12.     using MessagePtr = std::shared_ptr<lsxmq::Message>;
  13. #define DATAFILE_SUBFIX ".mqd"
  14. #define TMPFILE_SUBFIX ".mqd.tmp"
  15. #define NOTVALID "0"
  16. #define ISVALID "1"
  17.     class MessageMapper
  18.     {
  19.     public:
  20.         MessageMapper(std::string &basedir, const std::string &qname)
  21.             : _qname(qname)
  22.         {
  23.             if (basedir.back() != '/')
  24.             {
  25.                 basedir.push_back('/'); // 如果没有就往末尾加一个/
  26.             }
  27.             _datafile = basedir + qname + DATAFILE_SUBFIX;
  28.             _tmpfile = basedir + qname + TMPFILE_SUBFIX;
  29.             assert(FileHelper::createDirectory(basedir)); // 创建基本目录
  30.             createMessageFile();
  31.         }
  32.         bool createMessageFile() // 创建消息文件
  33.         {
  34.             if (FileHelper(_datafile).exists() == true)
  35.             {
  36.                 return true;
  37.             }
  38.             if (FileHelper::createFile(_datafile) == false)
  39.             {
  40.                 ELOG("创建消息文件失败");
  41.                 return false;
  42.             }
  43.             return true;
  44.         }
  45.         bool removeMessageFile() // 删除消息文件
  46.         {
  47.             if (FileHelper(_datafile).exists() == false)
  48.             {
  49.                 return true;
  50.             }
  51.             if (FileHelper::removeFile(_datafile) == false)
  52.             {
  53.                 ELOG("删除消息文件失败");
  54.                 return false;
  55.             }
  56.             return true;
  57.         }
  58.         bool insert(MessagePtr &ptr) // 新增消息
  59.         {
  60.             return insert(_datafile, ptr); // 向data文件写入内容
  61.         }
  62.         bool erase(MessagePtr &ptr) // 删除消息
  63.         {
  64.             // 1.将message的有效标志置为0
  65.             ptr->mutable_payload()->set_valid(NOTVALID);
  66.             // 2.进行序列化
  67.             std::string body = ptr->payload().SerializeAsString();
  68.             if (body.size() != ptr->len())
  69.             {
  70.                 ELOG("新数据与原数据长度不一致")
  71.                 return false;
  72.             }
  73.             // 3.将序列化的message覆盖文件中原本的message
  74.             FileHelper dataFileHelper(_datafile);
  75.             if (dataFileHelper.write(body.c_str(), ptr->offset(), body.size()) == false)
  76.             {
  77.                 ELOG("消息重新写入文件失败");
  78.                 return false;
  79.             }
  80.             return true;
  81.         }
  82.         std::list<MessagePtr> gc() // 垃圾回收
  83.         {
  84.             bool ret = true;
  85.             std::list<MessagePtr> result;
  86.             // 1.加载数据
  87.             ret = load(result);
  88.             if (ret == false)
  89.             {
  90.                 ELOG("加载数据失败");
  91.                 return result;
  92.             }
  93.             // 2.将数据写入到临时文件
  94.             FileHelper::createFile(_tmpfile);
  95.             FileHelper tmpDataHelper(_tmpfile);
  96.             for (auto &mp : result)
  97.             {
  98.                 // 写入
  99.                 ret = this->insert(_tmpfile, mp);
  100.                 if (ret == false)
  101.                 {
  102.                     ELOG("消息写入临时文件失败");
  103.                     return result;
  104.                 }
  105.             }
  106.             // 3.删除原文件
  107.             ret = removeMessageFile();
  108.             if (ret == false)
  109.             {
  110.                 ELOG("删除消息文件失败");
  111.                 return result;
  112.             }
  113.             // 4.将原文件改为消息文件
  114.             ret = tmpDataHelper.rename(_datafile);
  115.             if (ret == false)
  116.             {
  117.                 ELOG("修改临时文件名称失败");
  118.                 return result;
  119.             }
  120.             return result;
  121.         }
  122.     private:
  123.         bool load(std::list<MessagePtr> &mplist) // 将文件中的消息加载进内存
  124.         {
  125.             size_t offset = 0;
  126.             FileHelper dataFileHelper(_datafile);
  127.             size_t fsize = dataFileHelper.size();
  128.             bool ret = true;
  129.             while (offset < fsize)
  130.             {
  131.                 // 1.前四位为数据长度
  132.                 size_t msize = 0;
  133.                 ret = dataFileHelper.read((char *)&msize, offset, sizeof(size_t));
  134.                 if (ret == false)
  135.                 {
  136.                     ELOG("读取信息长度失败");
  137.                     return false;
  138.                 }
  139.                 // 2.开始读取数据
  140.                 offset += sizeof(size_t);
  141.                 std::string body(msize,'\0');
  142.                 ret = dataFileHelper.read(&body[0], offset, msize);
  143.                 if (ret == false)
  144.                 {
  145.                     ELOG("读取信息内容失败");
  146.                     return false;
  147.                 }
  148.                 // 3.反序列化
  149.                 MessagePtr mp = std::make_shared<Message>();
  150.                 mp->mutable_payload()->ParseFromString(body);
  151.                 // 4.剔除无效信息
  152.                 if (mp->payload().valid() == NOTVALID)
  153.                 {
  154.                     offset += msize;
  155.                     continue;
  156.                 }
  157.                 // 4.保存结果
  158.                 mplist.push_back(mp);
  159.                 offset += msize;
  160.             }
  161.             return true;
  162.         }
  163.         bool insert(const std::string filename, MessagePtr &ptr)
  164.         {
  165.             // 1.对消息进行序列化
  166.             std::string body = ptr->payload().SerializeAsString();
  167.             MessagePtr tmp=std::make_shared<Message>();
  168.             tmp->mutable_payload()->ParseFromString(body);
  169.             // 2.获取文件大小
  170.             FileHelper FileHelper(filename);
  171.             size_t fsize = FileHelper.size();
  172.             size_t msize = body.size();
  173.             // 3.先写入8字节长度
  174.             if (FileHelper.write((char*)&msize, fsize, sizeof(size_t)) == false)
  175.             {
  176.                 ELOG("消息长度写入文件失败");
  177.                 return false;
  178.             }
  179.             //4.在写入指定长度内容
  180.             if (FileHelper.write(body.c_str(), fsize+sizeof(size_t), msize) == false)
  181.             {
  182.                 ELOG("消息内容写入文件失败");
  183.                 return false;
  184.             }
  185.             // 4.更新message的实际存储位置
  186.             ptr->set_offset(fsize+sizeof(size_t));
  187.             ptr->set_len(msize);
  188.             return true;
  189.         }
  190.     private:
  191.         std::string _qname;
  192.         std::string _datafile;
  193.         std::string _tmpfile;
  194.     };
  195.     class QueueMessage
  196.     {
  197.     public:
  198.         using ptr = std::shared_ptr<QueueMessage>;
  199.         QueueMessage(std::string &basedir, const std::string &qname)
  200.             : _mapper(basedir, qname), _validCount(0), _totalCount(0), _qname(qname)
  201.         {}
  202.         void recovery() // 恢复历史消息
  203.         {
  204.             std::unique_lock<std::mutex> lock(_mutex);
  205.             _pushMessages = _mapper.gc();
  206.             for (auto &mp : _pushMessages)
  207.             {
  208.                 _durableMessages.insert(std::make_pair(mp->payload().properties().id(), mp));
  209.             }
  210.             _totalCount = _validCount = _durableMessages.size();
  211.         }
  212.         bool insert(BasicProperties *bp, const std::string& body, DeliveryMode deliveryMode) // 新增消息
  213.         {
  214.             // 1.构造message对象
  215.             MessagePtr mp = std::make_shared<Message>();
  216.             if (bp != nullptr)
  217.             {
  218.                 mp->mutable_payload()->mutable_properties()->set_id(bp->id());
  219.                 mp->mutable_payload()->mutable_properties()->set_deliverymode(bp->deliverymode());
  220.                 mp->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());
  221.                 mp->mutable_payload()->set_body(body);
  222.                 mp->mutable_payload()->set_valid(ISVALID);
  223.             }
  224.             else
  225.             {
  226.                 mp->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());
  227.                 mp->mutable_payload()->mutable_properties()->set_deliverymode(deliveryMode);
  228.                 mp->mutable_payload()->mutable_properties()->set_routing_key("");
  229.                 mp->mutable_payload()->set_body(body);
  230.                 mp->mutable_payload()->set_valid(ISVALID);
  231.             }
  232.             // 2.判断是否需要持久化
  233.             std::unique_lock<std::mutex> lock(_mutex);
  234.             bool ret = true;
  235.             if (mp->payload().properties().deliverymode() == DeliveryMode::DURABLE) // 需要
  236.             {
  237.                 ret = _mapper.insert(mp);
  238.                 if (ret == false)
  239.                 {
  240.                     ELOG("%s:消息持久化失败", body.c_str());
  241.                     return false;
  242.                 }
  243.                 _durableMessages.insert(std::make_pair(mp->payload().properties().id(), mp));
  244.                 ++_validCount;
  245.                 ++_totalCount;
  246.             }
  247.             // 3.在内存中管理
  248.             _pushMessages.push_back(mp);
  249.             return true;
  250.         }
  251.         MessagePtr front() // 获取队首消息
  252.         {
  253.             std::unique_lock<std::mutex> lock(_mutex);
  254.             if(_pushMessages.size()==0)//没有消息能拿出来
  255.             {
  256.                 return MessagePtr();
  257.             }
  258.             // 获取队首消息
  259.             MessagePtr mp = _pushMessages.front();
  260.             // 将消息从待推送队列加到待确认hashmap中
  261.             _pushMessages.pop_front();
  262.             _waitAckMessages.insert(std::make_pair(mp->payload().properties().id(), mp));
  263.             return mp;
  264.         }
  265.         bool erase(const std::string &id) // 确认消息,每次删除一条消息后确认是否要进行垃圾回收
  266.         {
  267.             std::unique_lock<std::mutex> lock(_mutex);
  268.             // 1.从待确认消息中找到这条消息
  269.             auto it = _waitAckMessages.find(id);
  270.             if (it == _waitAckMessages.end()) // 找不到
  271.             {
  272.                 return true;
  273.             }
  274.             // 2.根据这条消息的投递模式决定是否从持久化中删除
  275.             bool ret = true;
  276.             if (it->second->payload().properties().deliverymode() == true) // 为持久化
  277.             {
  278.                 // 3.从持久化中删除
  279.                 ret = _mapper.erase(it->second);
  280.                 if (ret == false)
  281.                 {
  282.                     ELOG("从文件中删除id=%s的消息失败", id.c_str());
  283.                     return false;
  284.                 }
  285.                 _durableMessages.erase(id);
  286.                 --_validCount;
  287.                 // 进行垃圾回收
  288.                 gc();
  289.             }
  290.             // 4.从内存中删除
  291.             _waitAckMessages.erase(id);
  292.             return true;
  293.         }
  294.         size_t pushCount()
  295.         {
  296.             std::unique_lock<std::mutex> lock(_mutex);
  297.             return _pushMessages.size();
  298.         }
  299.         size_t durableCount()
  300.         {
  301.             std::unique_lock<std::mutex> lock(_mutex);
  302.             return _durableMessages.size();
  303.         }
  304.         size_t waitAckCount()
  305.         {
  306.             std::unique_lock<std::mutex> lock(_mutex);
  307.             return _waitAckMessages.size();
  308.         }
  309.         size_t validCount()
  310.         {
  311.             std::unique_lock<std::mutex> lock(_mutex);
  312.             return _validCount;
  313.         }
  314.         size_t totalCount()
  315.         {
  316.             std::unique_lock<std::mutex> lock(_mutex);
  317.             return _totalCount;
  318.         }
  319.         void clear()
  320.         {
  321.             _mapper.removeMessageFile();
  322.             _pushMessages.clear();
  323.             _durableMessages.clear();
  324.             _waitAckMessages.clear();
  325.             _totalCount = _validCount = 0;
  326.         }
  327.     private:
  328.         bool NeedGC() // 判断是否需要进行垃圾回收
  329.         {
  330.             if (_totalCount > 2000 && _validCount * 10 / _totalCount < 5)
  331.             {
  332.                 return true;
  333.             }
  334.             return false;
  335.         }
  336.         void gc() // 垃圾回收
  337.         {
  338.             if (NeedGC() == false)
  339.                 return;
  340.             // 1.接收垃圾回收后的消息
  341.             std::list<MessagePtr> mplist = _mapper.gc();
  342.             // 2.修改内存中消息的数据
  343.             for (auto &mp : mplist)
  344.             {
  345.                 // 从持久化消息中找到消息
  346.                 auto it = _durableMessages.find(mp->payload().properties().id());
  347.                 if (it == _durableMessages.end())
  348.                 {
  349.                     ILOG("出现了一条持久消息没有在内存中管理");
  350.                     // 解决策略:重新添加
  351.                     _pushMessages.push_back(mp);
  352.                     _durableMessages.insert(std::make_pair(mp->payload().properties().id(), mp));
  353.                     continue;
  354.                 }
  355.                 // 更新消息与文件相关的数据
  356.                 it->second->set_offset(mp->offset());
  357.                 it->second->set_len(mp->len());
  358.             }
  359.             _totalCount = _validCount = _durableMessages.size();
  360.         }
  361.     private:
  362.         std::mutex _mutex;
  363.         MessageMapper _mapper;
  364.         std::list<MessagePtr> _pushMessages;                          // 待推送消息
  365.         std::unordered_map<std::string, MessagePtr> _durableMessages; // 持久化消息
  366.         std::unordered_map<std::string, MessagePtr> _waitAckMessages; // 待确认消息
  367.         size_t _validCount;                                           // 持久化有效消息个数
  368.         size_t _totalCount;                                           // 持久化总消息个数
  369.         std::string _qname;
  370.     };
  371.     class QueueMessageManager
  372.     {
  373.     public:
  374.         using ptr=std::shared_ptr<QueueMessageManager>;
  375.         QueueMessageManager(const std::string &basedir)
  376.             : _basedir(basedir)
  377.         {}
  378.         void clear()
  379.         {
  380.             std::unique_lock<std::mutex> lock(_mutex);
  381.             for(auto& a:_queueMessages)
  382.             {
  383.                 a.second->clear();
  384.             }
  385.         }
  386.         void initQueueMessage(const std::string &qname) // 初始化队列
  387.         {
  388.             QueueMessage::ptr qmp;
  389.             {
  390.                 std::unique_lock<std::mutex> lock(_mutex);
  391.                 // 该队列是否已存在
  392.                 auto it = _queueMessages.find(qname);
  393.                 if (it != _queueMessages.end()) // 已存在
  394.                 {
  395.                     return ;
  396.                 }
  397.                 qmp = std::make_shared<QueueMessage>(_basedir, qname);
  398.                 _queueMessages.insert(std::make_pair(qname,qmp));
  399.             }
  400.             qmp->recovery();
  401.         }
  402.         void destroyQueueMessage(const std::string &qname)// 销毁队列
  403.         {
  404.             QueueMessage::ptr qmp;
  405.             {
  406.                 std::unique_lock<std::mutex> lock(_mutex);
  407.                 // 该队列是否不存在
  408.                 auto it = _queueMessages.find(qname);
  409.                 if (it == _queueMessages.end()) // 不存在
  410.                 {
  411.                     return ;
  412.                 }
  413.                 qmp=it->second;
  414.                 _queueMessages.erase(qname);
  415.             }
  416.             qmp->clear();
  417.         }
  418.         bool insert(const std::string &qname, BasicProperties *bp, const std::string& body, DeliveryMode deliveryMode) // 向队列新增消息
  419.         {
  420.             QueueMessage::ptr qmp;
  421.             {
  422.                 std::unique_lock<std::mutex> lock(_mutex);
  423.                 // 该队列是否不存在
  424.                 auto it = _queueMessages.find(qname);
  425.                 if (it == _queueMessages.end()) // 不存在
  426.                 {
  427.                     ELOG("向队列%s新增消息失败,因为该队列不存在",qname.c_str());
  428.                     return false;
  429.                 }
  430.                 qmp=it->second;
  431.             }
  432.             qmp->insert(bp,body,deliveryMode);
  433.             return true;
  434.         }
  435.         MessagePtr front(const std::string &qname)// 获取队首消息
  436.         {
  437.             QueueMessage::ptr qmp;
  438.             {
  439.                 std::unique_lock<std::mutex> lock(_mutex);
  440.                 // 该队列是否不存在
  441.                 auto it = _queueMessages.find(qname);
  442.                 if (it == _queueMessages.end()) // 不存在
  443.                 {
  444.                     ELOG("从队列%s获取队首消息失败,因为该队列不存在",qname.c_str());
  445.                     return MessagePtr();
  446.                 }
  447.                 qmp=it->second;
  448.             }
  449.             return qmp->front();
  450.         }
  451.         bool ack(const std::string &qname, const std::string &id)// 确认消息
  452.         {
  453.             QueueMessage::ptr qmp;
  454.             {
  455.                 std::unique_lock<std::mutex> lock(_mutex);
  456.                 // 该队列是否不存在
  457.                 auto it = _queueMessages.find(qname);
  458.                 if (it == _queueMessages.end()) // 不存在
  459.                 {
  460.                     ELOG("向队列%s确认消息失败,因为该队列不存在",qname.c_str());
  461.                     return false;
  462.                 }
  463.                 qmp=it->second;
  464.             }
  465.             qmp->erase(id);
  466.             return true;
  467.         }
  468.         size_t pushCount(const std::string& qname)
  469.         {
  470.             QueueMessage::ptr qmp;
  471.             {
  472.                 std::unique_lock<std::mutex> lock(_mutex);
  473.                 // 该队列是否不存在
  474.                 auto it = _queueMessages.find(qname);
  475.                 if (it == _queueMessages.end()) // 不存在
  476.                 {
  477.                     ELOG("向队列%s获取待发送消息数量失败,因为该队列不存在",qname.c_str());
  478.                     return 0;
  479.                 }
  480.                 qmp=it->second;
  481.             }
  482.             return qmp->pushCount();
  483.         }
  484.         size_t durableCount(const std::string& qname)
  485.         {
  486.             QueueMessage::ptr qmp;
  487.             {
  488.                 std::unique_lock<std::mutex> lock(_mutex);
  489.                 // 该队列是否不存在
  490.                 auto it = _queueMessages.find(qname);
  491.                 if (it == _queueMessages.end()) // 不存在
  492.                 {
  493.                     ELOG("向队列%s获取持久化消息数量失败,因为该队列不存在",qname.c_str());
  494.                     return 0;
  495.                 }
  496.                 qmp=it->second;
  497.             }
  498.             return qmp->durableCount();
  499.         }
  500.         size_t waitAckCount(const std::string& qname)
  501.         {
  502.             QueueMessage::ptr qmp;
  503.             {
  504.                 std::unique_lock<std::mutex> lock(_mutex);
  505.                 // 该队列是否不存在
  506.                 auto it = _queueMessages.find(qname);
  507.                 if (it == _queueMessages.end()) // 不存在
  508.                 {
  509.                     ELOG("向队列%s获取等待确认消息数量失败,因为该队列不存在",qname.c_str());
  510.                     return 0;
  511.                 }
  512.                 qmp=it->second;
  513.             }
  514.             return qmp->waitAckCount();
  515.         }
  516.         size_t validCount(const std::string& qname)
  517.         {
  518.             QueueMessage::ptr qmp;
  519.             {
  520.                 std::unique_lock<std::mutex> lock(_mutex);
  521.                 // 该队列是否不存在
  522.                 auto it = _queueMessages.find(qname);
  523.                 if (it == _queueMessages.end()) // 不存在
  524.                 {
  525.                     ELOG("向队列%s获取有效持久化消息数量失败,因为该队列不存在",qname.c_str());
  526.                     return 0;
  527.                 }
  528.                 qmp=it->second;
  529.             }
  530.             return qmp->validCount();
  531.         }
  532.         size_t totalCount(const std::string& qname)
  533.         {
  534.             QueueMessage::ptr qmp;
  535.             {
  536.                 std::unique_lock<std::mutex> lock(_mutex);
  537.                 // 该队列是否不存在
  538.                 auto it = _queueMessages.find(qname);
  539.                 if (it == _queueMessages.end()) // 不存在
  540.                 {
  541.                     ELOG("向队列%s获取全部持久化消息数量失败,因为该队列不存在",qname.c_str());
  542.                     return 0;
  543.                 }
  544.                 qmp=it->second;
  545.             }
  546.             return qmp->totalCount();
  547.         }
  548.     private:
  549.         std::mutex _mutex;
  550.         std::unordered_map<std::string, QueueMessage::ptr> _queueMessages;
  551.         std::string _basedir;
  552.     };
  553. };
复制代码
      
                       8、假造机管理

     假造机模块是对上述三个数据管理模块的整合,并基于数据之间的关联关系进⾏团结操作。
     界说假造机类包含以下成员:
     
      假造机包含操作:
     
     假造机管理操作:可以有多台假造机,项目中并没有实现。
     
                                   登录后复制                        
  1. #pragma once
  2. #include "binding.hpp"
  3. #include "exchange.hpp"
  4. #include "message.hpp"
  5. #include "messageQueue.hpp"
  6. namespace lsxmq
  7. {
  8.     class VirtualHost//虚拟机
  9.     {
  10.     public:
  11.         using ptr=std::shared_ptr<VirtualHost>;
  12.         VirtualHost(const std::string& hname,const std::string& dbfile,const std::string& basedir)
  13.             :_hostName(hname)
  14.             ,_bmp(std::make_shared<BindingManager>(dbfile))
  15.             ,_emp(std::make_shared<ExchangeManager>(dbfile))
  16.             ,_qmmp(std::make_shared<QueueMessageManager>(basedir))
  17.             ,_mqmp(std::make_shared<MessageQueueManager>(dbfile))
  18.         {
  19.             //恢复队列消息
  20.             //1.获取所有队列
  21.             MessageQueueMap mqmap=_mqmp->getAll();
  22.             //2.用队列名称初始化队列消息
  23.             for(auto& it:mqmap)
  24.             {
  25.                 _qmmp->initQueueMessage(it.second->name);
  26.             }
  27.         }
  28.         bool declareExchange(const std::string& name,ExchangeType type
  29.                 ,bool durable,bool autoDel
  30.                 ,std::unordered_map<std::string,std::string> args)//声明交换机
  31.         {
  32.             return _emp->declareExchange(name,type,durable,autoDel,args);
  33.         }
  34.         bool deleteExchange(const std::string& ename)//删除交换机
  35.         {
  36.             bool ret=_emp->deleteExchange(ename);
  37.             if(ret==false)
  38.             {
  39.                 return false;
  40.             }
  41.             //删除交换机对应的绑定
  42.             return _bmp->deleteExchangeBinding(ename);
  43.         }
  44.         bool existsExchange(const std::string& ename)
  45.         {
  46.             return _emp->exists(ename);
  47.         }
  48.         bool declareMessageQueue(const std::string &name, bool durable, bool exclusive
  49.                 , bool autoDel, std::unordered_map<std::string, std::string> args)//声明消息队列
  50.         {
  51.             bool ret=_mqmp->declareMessageQueue(name,durable,exclusive,autoDel,args);
  52.             if(ret==false)
  53.             {
  54.                 return false;
  55.             }
  56.             _qmmp->initQueueMessage(name);
  57.             return true;
  58.         }
  59.         bool deleteMessageQueue(const std::string &qname)//删除消息队列
  60.         {
  61.             auto ret=_mqmp->deleteMessageQueue(qname);
  62.             if(ret==false)
  63.             {
  64.                 return false;
  65.             }
  66.             //删除消息队列对应的绑定
  67.             ret=_bmp->deleteMessageQueueBinding(qname);
  68.             if(ret==false)
  69.             {
  70.                 return false;
  71.             }
  72.             //删除对应的队列消息
  73.             _qmmp->destroyQueueMessage(qname);
  74.             return true;
  75.         }
  76.         bool existsMessageQueue(const std::string& qname)
  77.         {
  78.             return _mqmp->exists(qname);
  79.         }
  80.         bool bind(const std::string& ename,const std::string& qname,const std::string& binding_key)//绑定
  81.         {
  82.             //交换机是否存在
  83.             Exchange::ptr ep=_emp->selectExchange(ename);
  84.             if(ep.get()==nullptr)
  85.             {
  86.                 ELOG("%s与%s绑定失败,因为交换机不存在",ename.c_str(),qname.c_str());
  87.                 return false;
  88.             }
  89.             //队列是否存在
  90.             MessageQueue::ptr mqp=_mqmp->selectMessageQueue(qname);
  91.             if(mqp.get()==nullptr)
  92.             {
  93.                 ELOG("%s与%s绑定失败,因为队列不存在",ename.c_str(),qname.c_str());
  94.                 return false;
  95.             }
  96.             //根据交换机与队列的持久化方式决定绑定是否持久化
  97.             return _bmp->bind(ename,qname,binding_key,ep->durable&&mqp->durable);
  98.         }
  99.         bool unbind(const std::string& ename,const std::string& qname)//解绑
  100.         {
  101.              //交换机是否存在
  102.             Exchange::ptr ep=_emp->selectExchange(ename);
  103.             if(ep.get()==nullptr)
  104.             {
  105.                 ELOG("%s与%s解绑失败,因为交换机不存在",ename.c_str(),qname.c_str());
  106.                 return false;
  107.             }
  108.             //队列是否存在
  109.             MessageQueue::ptr mqp=_mqmp->selectMessageQueue(qname);
  110.             if(mqp.get()==nullptr)
  111.             {
  112.                 ELOG("%s与%s解绑失败,因为队列不存在",ename.c_str(),qname.c_str());
  113.                 return false;
  114.             }
  115.             return _bmp->unbind(ename,qname);
  116.         }
  117.         MessageQueueBindingMap exchangeBindings(const std::string& ename)//获取一台交换机的所有绑定
  118.         {
  119.             return _bmp->getExchangeBinding(ename);
  120.         }
  121.         bool existsBinding(const std::string &ename, const std::string &qname)
  122.         {
  123.             return _bmp->exists(ename,qname);
  124.         }
  125.         bool basicPublish(const std::string &qname, BasicProperties *bp
  126.                 , const std::string& body)//发布一条消息
  127.         {
  128.             MessageQueue::ptr mqp=_mqmp->selectMessageQueue(qname);
  129.             return _qmmp->insert(qname,bp,body,mqp->durable);
  130.         }
  131.         MessagePtr basicConsume(const std::string& qname)//获取一条消息
  132.         {
  133.             return _qmmp->front(qname);
  134.         }
  135.         bool basicAck(const std::string &qname, const std::string &id)//确认一条消息
  136.         {
  137.             return _qmmp->ack(qname,id);
  138.         }
  139.         
  140.         void clear()
  141.         {
  142.             _bmp->clear();
  143.             _emp->clear();
  144.             _qmmp->clear();
  145.             _mqmp->clear();
  146.         }
  147.     private:
  148.         std::string _hostName;//虚拟机名称
  149.         BindingManager::ptr _bmp;//绑定管理句柄
  150.         ExchangeManager::ptr _emp;//交换机管理句柄
  151.         QueueMessageManager::ptr _qmmp;//队列消息管理句柄
  152.         MessageQueueManager::ptr _mqmp;//消息队列管理句柄
  153.     };
  154. };
复制代码
      
                       9、交换机路由模块

     交换路由规则取决要素:
               路由交换模块:
               binding_key:
     是由数字字⺟下划线构成的, 并且使⽤ . 分成若⼲部门,并⽀持 * 和 # 通配符。
     例如: news.music.# ,这⽤于表⽰交换机绑定的当前队列是⼀个⽤于发布⾳乐消息的队列。
     
     
     
     
     routing_key:
     是由数据、字⺟和下划线构成, 并且可以使⽤ . 分别成若⼲部门。
     例如: news.music.pop ,这⽤于表⽰当前发布的消息是⼀个流⾏⾳乐的消息。
     ⽐如,在进⾏队列绑定时,某队列的binding_key约定为:news.music.#表⽰这个队列⽤于发布⾳乐消息。⽽这时候客⼾端发布了⼀条消息,其中routing_key为:news.music.pop 则可以匹配成功,⽽,如果发布消息的routing_key为:news.sport.football,这时候就会匹配失败。
     匹配算法头脑:
                                             登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/helper.hpp"
  3. #include "../mqcomm/message.pb.h"
  4. namespace lsxmq
  5. {
  6.     class Router
  7.     {
  8.     public:
  9.         static bool routingKeyIsLegal(const std::string& routingKey)//判断routing_key是否合法
  10.         {
  11.             //只需要确定是否有非法字符即可
  12.             //合法字符:a~z、A~Z、.、_
  13.             for(auto& ch:routingKey)
  14.             {
  15.                 if((ch>='a'&&ch<='z')
  16.                     ||(ch>='A'&&ch<='Z')
  17.                     ||(ch>='0'&&ch<='9')
  18.                     ||ch=='_'||ch=='.')
  19.                 {
  20.                     continue;
  21.                 }
  22.                 else
  23.                 {
  24.                     return false;//不合法
  25.                 }
  26.             }
  27.             return true;
  28.         }
  29.         static bool bindingKeyIsLegal(const std::string& bindingKey)
  30.         {
  31.             //1.确定是否有非法字符即可
  32.             //合法字符:a~z、A~Z、.、_、*、#
  33.             for(auto& ch:bindingKey)
  34.             {
  35.                 if((ch>='a'&&ch<='z')
  36.                     ||(ch>='A'&&ch<='Z')
  37.                     ||(ch>='0'&&ch<='9')
  38.                     ||ch=='_'||ch=='.'
  39.                     ||ch=='#'||ch=='*')
  40.                 {
  41.                     continue;
  42.                 }
  43.                 else
  44.                 {
  45.                     return false;//不合法
  46.                 }
  47.             }
  48.             //2.#和*必须独立存在(aa.a#.b这种就是非法)
  49.             std::vector<std::string> subStr;
  50.             StrHelper::split(bindingKey,".",subStr);
  51.             for(auto& word:subStr)
  52.             {
  53.                 if(word.size()>1
  54.                     &&word.find('#')!=std::string::npos
  55.                     &&word.find('*')!=std::string::npos)
  56.                 {
  57.                     return false;
  58.                 }
  59.             }
  60.             //3.#和*不连续出现,#的前面没有#和*,*的前面没有#
  61.             for(int i=1;i<subStr.size();++i)
  62.             {
  63.                 if(subStr[i]=="#"&&subStr[i-1]=="#")
  64.                 {
  65.                     return false;
  66.                 }
  67.                 if(subStr[i]=="#"&&subStr[i-1]=="*")
  68.                 {
  69.                     return false;
  70.                 }
  71.                 if(subStr[i]=="*"&&subStr[i-1]=="#")
  72.                 {
  73.                     return false;
  74.                 }
  75.             }
  76.             return true;
  77.         }
  78.         static bool route(const std::string& routingKey,const std::string& bindingKey,ExchangeType routeType)
  79.         {
  80.             if(routeType==DIRECT)//直接路由
  81.             {
  82.                 return routingKey==bindingKey;   
  83.             }
  84.             if(routeType==FANOUT)//广播路由
  85.             {
  86.                 return true;
  87.             }
  88.             if(routeType==TOPIC)//主题路由
  89.             {
  90.                 return topicRoute(routingKey,bindingKey);
  91.             }
  92.             ELOG("位置交换机类型:%d",routeType);
  93.             return false;
  94.         }
  95.     private:
  96.         static bool topicRoute(const std::string& routingKey,const std::string& bindingKey)
  97.         {
  98.             //1.先分割字符串
  99.             std::vector<std::string> subBinding;
  100.             StrHelper::split(bindingKey,".",subBinding);
  101.             std::vector<std::string> subRouting;
  102.             StrHelper::split(routingKey,".",subRouting);
  103.             //2.构造dp表
  104.             int row=subBinding.size(),col=subRouting.size();//行列
  105.             std::vector<std::vector<int>> dp(row+1,std::vector<int>(col+1,0));
  106.             dp[0][0]=1;
  107.             if(subBinding[0]=="#") dp[1][0]=1;//当binding_key以#起始时,需要将#对应行的第0列置为1
  108.             //3.开始匹配
  109.             for(int i=1;i<row+1;++i)
  110.             {
  111.                 for(int j=1;j<col+1;++j)
  112.                 {
  113.                     if(subBinding[i-1]=="#")//遇到#通配符匹配成功时,可以从左上方、左方、上方继承
  114.                     {
  115.                         dp[i][j]=dp[i-1][j-1]||dp[i-1][j]||dp[i][j-1];
  116.                     }
  117.                     else if(subBinding[i-1]=="*"||(subBinding[i-1]==subRouting[j-1]))
  118.                     {
  119.                         //当两个单词匹配成功时,从左上方继承结果
  120.                         dp[i][j]=dp[i-1][j-1];
  121.                     }
  122.                 }
  123.             }
  124.             return dp[row][col];
  125.         }
  126.     };
  127. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
                       10、队列消费者/订阅者管理

     

     界说消费者信息结构:
     

  • 消费者表现
  • 订阅的队列名称
  • 一个消息的处理回调函数(实现的是当发布一条消息到队列,则选择消费者进行消费,怎样消费?对于服务端来说就是调用这个回调函数进行处理,其内部逻辑就是找到消费者对应的连接,然后将数据发送给消费者对应的客户端)
  • 是否自动应答标志(一个消息被消费者消费后,若自动应答,则直接移除待确认消息,否则等候客户端回应)
     消费者的管理以信道为单元照旧以队列为单元?
     

  • 以信道为单元:一个信道关闭时全部关联的消费者都要删除
  • 以队列为单元:一个队列收到了一条消息,必要找到订购了队列消息的消费者进行推送
     消费者管理(以队列为单元进行管理):
     

  • 操作:
     

  • 新增消费者:信道提供的服务式订阅队列消息的时候创建
  • 删除消费者:取消订阅/信道关闭/连接关闭的时候删除
  • 获取消费者:从队列中全部的消费者中按序取出一个消费者进行消息推送
  • 判定队列消费者是否为空
  • 判定指定消费者是否存在
  • 清理队列全部消费者
     

  • 元素
     

  • 消费者管理结构:vector
  • 轮转序号:一个队列可能会有多个消费者,但是一条消息只必要被一个消费者消费即可,因此采用RR轮转
  • 互斥锁:保证线程安全
  • 队列名称
     对消费者进行同一管理
     

  • 初始化/删除队列的消费者信息结构(创建/删除队列的时候初始化)
  • 向指定队列新增消费者(客户端订阅指定队列消息的时候)新增完成时返回消费者对象,让信道能管理
  • 从指定队列移除消费者(客户端取消订阅的时候)
  • 移除指定队列的全部消费者(队列被删除时销毁)删除消费者的队列管理单元对象
  • 从指定队列获取一个消费者(轮询获取-消费者轮询消费起到负载平衡作用)
  • 判定队列消费者是否为空
  • 判定队列中指定消费者是否存在
  • 清理消费者
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/helper.hpp"
  3. #include "../mqcomm/logger.hpp"
  4. #include "../mqcomm/message.pb.h"
  5. namespace lsxmq
  6. {
  7.     using ConsumerCallBack=std::function<void(const std::string&,BasicProperties*,const std::string&)>;
  8.                                                       //消费者标识     //基本属性             //消息内容
  9.     struct Consumer
  10.     {
  11.         using ptr=std::shared_ptr<Consumer>;
  12.         Consumer(){}
  13.         Consumer(const std::string& ctag,const std::string& cqname,bool cautoAck,const ConsumerCallBack& ccallback)
  14.             :tag(ctag),qname(cqname),autoAck(cautoAck),callback(ccallback)
  15.         {}
  16.         std::string tag;//消费者标识
  17.         std::string qname;//队列名称
  18.         bool autoAck;//自动确认标志
  19.         ConsumerCallBack callback;//消费回调函数
  20.     };
  21.     //以队列为单元管理消费者
  22.     class QueueConsumer
  23.     {
  24.     public:
  25.         using ptr=std::shared_ptr<QueueConsumer>;
  26.         QueueConsumer(const std::string& qname)
  27.             :_qname(qname),_rrSeq(0)
  28.         {}
  29.         Consumer::ptr create(const std::string& tag,bool autoAck,const ConsumerCallBack& callback)//创建一个消费者
  30.         {
  31.             //1.先加锁
  32.             std::unique_lock<std::mutex> lock(_mutex);
  33.             //2.判断该消费者是否已存在
  34.             for(auto& con:_consumers)
  35.             {
  36.                 if(con->tag==tag)
  37.                 {
  38.                     return con;
  39.                 }
  40.             }
  41.             //3.构造消费者
  42.             Consumer::ptr cp=std::make_shared<Consumer>(tag,_qname,autoAck,callback);
  43.             //4.添加管理并返回消费者
  44.             _consumers.push_back(cp);
  45.             return cp;
  46.         }
  47.         void remove(const std::string& tag)//删除一个消费者
  48.         {
  49.             //1.先加锁
  50.             std::unique_lock<std::mutex> lock(_mutex);
  51.             //2.查找消费者并删除
  52.             for(auto it=_consumers.begin();it!=_consumers.end();++it)
  53.             {
  54.                 if((*it)->tag==tag)
  55.                 {
  56.                     _consumers.erase(it);
  57.                     return ;
  58.                 }
  59.             }
  60.         }
  61.         Consumer::ptr choose()//选择一个消费者
  62.         {
  63.             //1.先加锁
  64.             std::unique_lock<std::mutex> lock(_mutex);
  65.             if(_consumers.size()==0)//没有消费者
  66.             {
  67.                 return Consumer::ptr();
  68.             }
  69.             //2.获取选择的消费者序号
  70.             size_t id=_rrSeq%_consumers.size();
  71.             ++_rrSeq;
  72.             //3.返回消费者
  73.             return _consumers[id];
  74.         }
  75.         bool empty()//判断队列中是否没有消费者
  76.         {
  77.             //1.先加锁
  78.             std::unique_lock<std::mutex> lock(_mutex);
  79.             return _consumers.empty();
  80.         }
  81.         bool exists(const std::string& tag)//判断消费者是否存在
  82.         {
  83.             //1.先加锁
  84.             std::unique_lock<std::mutex> lock(_mutex);
  85.             //2.判断消费者是否存在
  86.             for(auto& con:_consumers)
  87.             {
  88.                 if(con->tag==tag)
  89.                 {
  90.                     return true;
  91.                 }
  92.             }
  93.             return false;
  94.         }
  95.         void clear()//清空
  96.         {
  97.             //1.先加锁
  98.             std::unique_lock<std::mutex> lock(_mutex);
  99.             _consumers.clear();
  100.         }
  101.     private:
  102.         std::vector<Consumer::ptr> _consumers;
  103.         size_t _rrSeq;
  104.         std::mutex _mutex;
  105.         std::string _qname;
  106.     };
  107.     class QueueConsumerManager
  108.     {
  109.     public:
  110.         using ptr=std::shared_ptr<QueueConsumerManager>;
  111.         QueueConsumerManager(){}
  112.         void initQueueConsumer(const std::string& qname)//初始化队列
  113.         {
  114.             //1.加锁
  115.             std::unique_lock<std::mutex> lock(_mutex);
  116.             //2.是否已存在
  117.             auto it=_queueConsumers.find(qname);
  118.             if(it!=_queueConsumers.end())
  119.             {
  120.                 return ;
  121.             }
  122.             //2.构造队列
  123.             QueueConsumer::ptr qcp=std::make_shared<QueueConsumer>(qname);
  124.             //3.添加管理
  125.             _queueConsumers.insert(std::make_pair(qname,qcp));
  126.         }
  127.         void destroyQueueConsumer(const std::string& qname)//销毁队列
  128.         {
  129.             //1.加锁
  130.             std::unique_lock<std::mutex> lock(_mutex);
  131.             //2.是否不存在
  132.             auto it=_queueConsumers.find(qname);
  133.             if(it==_queueConsumers.end())
  134.             {
  135.                 return ;
  136.             }
  137.             //3.删除队列消费者
  138.             //it->second->clear();//不需要,智能指针指向的对象会自动释放
  139.             _queueConsumers.erase(qname);
  140.         }
  141.         Consumer::ptr create(const std::string& qname,const std::string& tag
  142.                 ,bool autoAck,const ConsumerCallBack& callback)//往某个队列创建消费者
  143.         {
  144.             QueueConsumer::ptr qcp;
  145.             {
  146.                  //1.加锁
  147.                 std::unique_lock<std::mutex> lock(_mutex);
  148.                 //2.判断队列是否不存在
  149.                 auto it=_queueConsumers.find(qname);
  150.                 if(it==_queueConsumers.end())
  151.                 {
  152.                     ELOG("向队列%s添加消费者%s失败,因为该队列不存在",qname.c_str(),tag.c_str());
  153.                     return Consumer::ptr();
  154.                 }
  155.                 qcp=it->second;
  156.             }
  157.             //3.通过队列消费者句柄创建消费者
  158.             return qcp->create(tag,autoAck,callback);
  159.         }
  160.         void remove(const std::string& qname,const std::string& tag)//从某个队列中删除一个消费者
  161.         {
  162.             QueueConsumer::ptr qcp;
  163.             {
  164.                  //1.加锁
  165.                 std::unique_lock<std::mutex> lock(_mutex);
  166.                 //2.判断队列是否不存在
  167.                 auto it=_queueConsumers.find(qname);
  168.                 if(it==_queueConsumers.end())
  169.                 {
  170.                     ELOG("从队列%s删除消费者%s失败,因为该队列不存在",qname.c_str(),tag.c_str());
  171.                     return ;
  172.                 }
  173.                 qcp=it->second;
  174.             }
  175.             //3.通过队列消费者句柄创建消费者
  176.             qcp->remove(tag);
  177.         }
  178.         Consumer::ptr choose(const std::string& qname)//选择一个消费者
  179.         {
  180.             QueueConsumer::ptr qcp;
  181.             {
  182.                  //1.加锁
  183.                 std::unique_lock<std::mutex> lock(_mutex);
  184.                 //2.判断队列是否不存在
  185.                 auto it=_queueConsumers.find(qname);
  186.                 if(it==_queueConsumers.end())
  187.                 {
  188.                     ELOG("从队列%s获取消费者失败,因为该队列不存在",qname.c_str());
  189.                     return Consumer::ptr();
  190.                 }
  191.                 qcp=it->second;
  192.             }
  193.             //3.通过队列消费者句柄创建消费者
  194.             return qcp->choose();
  195.         }
  196.         bool empty(const std::string& qname)
  197.         {
  198.             QueueConsumer::ptr qcp;
  199.             {
  200.                  //1.加锁
  201.                 std::unique_lock<std::mutex> lock(_mutex);
  202.                 //2.判断队列是否不存在
  203.                 auto it=_queueConsumers.find(qname);
  204.                 if(it==_queueConsumers.end())
  205.                 {
  206.                     ELOG("从队列%s获取是否为空失败,因为该队列不存在",qname.c_str());
  207.                     return false;
  208.                 }
  209.                 qcp=it->second;
  210.             }
  211.             //3.通过队列消费者句柄创建消费者
  212.             return qcp->empty();
  213.             
  214.         }
  215.         bool exists(const std::string& qname,const std::string& tag)
  216.         {
  217.             QueueConsumer::ptr qcp;
  218.             {
  219.                  //1.加锁
  220.                 std::unique_lock<std::mutex> lock(_mutex);
  221.                 //2.判断队列是否不存在
  222.                 auto it=_queueConsumers.find(qname);
  223.                 if(it==_queueConsumers.end())
  224.                 {
  225.                     ELOG("从队列%s获取消费者失败,因为该队列不存在",qname.c_str());
  226.                     return false;
  227.                 }
  228.                 qcp=it->second;
  229.             }
  230.             //3.通过队列消费者句柄创建消费者
  231.             return qcp->exists(tag);
  232.         }
  233.         void clear()
  234.         {
  235.             //1.加锁
  236.             std::unique_lock<std::mutex> lock(_mutex);
  237.             _queueConsumers.clear();
  238.         }
  239.     private:
  240.         std::mutex _mutex;
  241.         std::unordered_map<std::string,QueueConsumer::ptr> _queueConsumers;
  242.     };
  243. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
  • 159.
  • 160.
  • 161.
  • 162.
  • 163.
  • 164.
  • 165.
  • 166.
  • 167.
  • 168.
  • 169.
  • 170.
  • 171.
  • 172.
  • 173.
  • 174.
  • 175.
  • 176.
  • 177.
  • 178.
  • 179.
  • 180.
  • 181.
  • 182.
  • 183.
  • 184.
  • 185.
  • 186.
  • 187.
  • 188.
  • 189.
  • 190.
  • 191.
  • 192.
  • 193.
  • 194.
  • 195.
  • 196.
  • 197.
  • 198.
  • 199.
  • 200.
  • 201.
  • 202.
  • 203.
  • 204.
  • 205.
  • 206.
  • 207.
  • 208.
  • 209.
  • 210.
  • 211.
  • 212.
  • 213.
  • 214.
  • 215.
  • 216.
  • 217.
  • 218.
  • 219.
  • 220.
  • 221.
  • 222.
  • 223.
  • 224.
  • 225.
  • 226.
  • 227.
  • 228.
  • 229.
  • 230.
  • 231.
  • 232.
  • 233.
  • 234.
  • 235.
  • 236.
  • 237.
  • 238.
  • 239.
  • 240.
  • 241.
  • 242.
  • 243.
  • 244.
  • 245.
  • 246.
                       11、信道管理模块

     管理信息:
     

  • 信道id:信道的唯一标识
  • 信道关联的消费者:用于消费者信道在关闭时取消订阅,删除订阅者信息
  • 信道关联的连接:用于向客户端发送数据(相应/推送消息)
  • protobuf协议的处理句柄:网络通讯前的协议处理
  • 消费者管理句柄:信道关闭/取消订阅的时候,通过句柄删除订阅者信息
  • 假造机句柄:交换机/队列/绑定/消息数据管理
  • 工作线程池句柄:一条消息被发布到队列后,必要将消息推送给订阅了对应队列的消费者,过程由线程池完成。
     管理操作:
     

  • 提供声明&删除交换机操作(删除交换机的同时删除交换构造联的绑定信息)
  • 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)
  • 提供绑定&解绑队列操作
  • 提供订阅&取消订阅队列消息操作
  • 提供发布&确认消息操作
     信道管理:
     

  • 信道的增删查
     应用层协议设计:
                                   登录后复制                        
  1. syntax= "proto3";
  2. package lsxmq;
  3. import "message.proto";
  4. //信道的打开与关闭
  5. message OpenChannelRequest  //打开信道
  6. {
  7.     string rid = 1;  //请求id,对应不同的请求类型
  8.     string cid = 2;  //信道id
  9. };
  10. message CloseChannelRequest  //关闭信道
  11. {
  12.     string rid = 1;
  13.     string cid = 2;
  14. };
  15. //交换机的声明与删除
  16. message DeclareExchangeRequest  //声明交换机
  17. {
  18.     string rid = 1;
  19.     string cid = 2;
  20.     string exchange_name = 3;  //交换机名称
  21.     ExchangeType exchange_type = 4;  //交换机类型
  22.     bool durable = 5;  //持久化
  23.     bool auto_delete = 6;  //自动删除
  24.     map<string, string> args = 7;  //其它参数
  25. };
  26. message DeleteExchangeRequest  //删除交换机
  27. {
  28.     string rid = 1;
  29.     string cid = 2;
  30.     string exchange_name = 3;
  31. };
  32. //消息队列的声明与删除
  33. message DeclareMessageQueueRequest  //声明消息队列
  34. {
  35.     string rid = 1;
  36.     string cid = 2;
  37.     string queue_name = 3;  //队列名称
  38.     bool exclusive = 4;  //是否独占
  39.     bool durable = 5;  
  40.     bool auto_delete = 6;
  41.     map<string, string> args = 7;
  42. };
  43. message DeleteMessageQueueRequest  //删除消息队列
  44. {
  45.     string rid = 1;
  46.     string cid = 2;
  47.     string queue_name = 3;
  48. };
  49. //绑定与解绑
  50. message BindRequest  //绑定
  51. {
  52.     string rid = 1;
  53.     string cid = 2;
  54.     string exchange_name = 3;
  55.     string queue_name = 4;
  56.     string binding_key = 5;
  57. };
  58. message UnbindRequest  //解绑
  59. {
  60.     string rid = 1;
  61.     string cid = 2;
  62.     string exchange_name = 3;
  63.     string queue_name = 4;
  64. };
  65. //订阅与取消订阅
  66. message BasicConsumeRequest  //订阅
  67. {
  68.     string rid = 1;
  69.     string cid = 2;
  70.     string consumer_tag  =3;  //消费者标识
  71.     string queue_name = 4;
  72.     bool auto_ack = 5;  //是否自动确认
  73. };
  74. message BasicCancelRequest  //取消订阅
  75. {
  76.     string rid = 1;
  77.     string cid = 2;
  78.     string consumer_tag = 3;
  79.     string queue_name = 4;
  80. };
  81. //发布与确认消息
  82. message BasicPublishRequest //发布消息
  83. {
  84.     string rid = 1;
  85.     string cid = 2;
  86.     string exchange_name = 3;
  87.     string body = 4;  //消息内容
  88.     BasicProperties properties = 5;  //消息基本属性
  89. };
  90. message BasicAckRequest   //确认消息
  91. {
  92.     string rid = 1;
  93.     string cid = 2;
  94.     string queue_name = 3;
  95.     string mid = 4;
  96. };
  97. //消息的推送
  98. message basicConsumeResponse {
  99.     string cid = 1;
  100.     string consumer_tag = 2;
  101.     string body = 3;
  102.     BasicProperties properties = 4;
  103. };
  104. //通用响应
  105. message BasicCommonResponse
  106. {
  107.     string rid = 1;
  108.     string cid = 2;
  109.     bool ok = 3;
  110. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
                       12、连接管理模块

     向⽤⼾提供⼀个⽤于实现⽹络通讯的Connection对象,从其内部可创建出粒度更轻的Channel对象,⽤于与客⼾端进⾏⽹络通讯。
     成员信息:
     

  • 连接关联的信道管理句柄(实现信道的增删查)
  • 连接关联的现实⽤于通讯的muduo::net::Connection连接
  • protobuf协议处理的句柄(ProtobufCodec对象)
  • 消费者管理句柄
  • 假造机句柄
  • 异步⼯作线程池句柄
     连接操作:
     

  • 提供创建Channel信道的操作
  • 提供删除Channel信道的操作
     连接管理:
     

  • 连接的增删查
                                   登录后复制                        
  1. #pragma once
  2. #include "channel.hpp"
  3. namespace lsxmq
  4. {
  5.     class Connection
  6.     {
  7.     public:
  8.         using ptr=std::shared_ptr<Connection>;
  9.         Connection(const muduo::net::TcpConnectionPtr &conn
  10.             , const ProtobufCodecPtr &codec
  11.             , const QueueConsumerManager::ptr &qcmp
  12.             , const VirtualHost::ptr &vhp
  13.             , const ThreadPool::ptr &pool)
  14.             : _conn(conn)
  15.             , _codec(codec)
  16.             , _qcmp(qcmp)
  17.             , _vhp(vhp)
  18.             , _pool(pool)
  19.             , _chmp(std::make_shared<ChannelManager>())
  20.         {
  21.             DLOG("new Connection: %p",this);
  22.         }
  23.         ~Connection()
  24.         {
  25.             DLOG("delete Connection: %p",this);
  26.         }
  27.         void openChannel(const OpenChannelRequestPtr &req)
  28.         {
  29.             // 1.判断信道是否已打开
  30.             bool ret = _chmp->openChannel(req->cid(),_conn,_codec,_qcmp,_vhp,_pool);
  31.             if(ret==false)
  32.             {
  33.                 basicResponse(req->rid(),req->cid(),false);
  34.                 return;
  35.             }
  36.             //2.响应
  37.             basicResponse(req->rid(),req->cid(),true);
  38.         }
  39.         void closeChannel(const CloseChannelRequestPtr &req)
  40.         {
  41.             _chmp->closeChannel(req->cid());
  42.             basicResponse(req->rid(),req->cid(),true);
  43.         }
  44.         Channel::ptr getChannel(const std::string& cid)
  45.         {
  46.             return _chmp->getChannel(cid);
  47.         }
  48.     private:
  49.         void basicResponse(const std::string &rid, const std::string &cid, bool ok)
  50.         {
  51.             BasicCommonResponse rsp;
  52.             rsp.set_rid(rid);
  53.             rsp.set_cid(cid);
  54.             rsp.set_ok(ok);
  55.             _codec->send(_conn, rsp);
  56.         }
  57.     private:
  58.         muduo::net::TcpConnectionPtr _conn; // 关联的连接
  59.         ProtobufCodecPtr _codec;            // protobuf协议处理器--针对收到的请求数据进行protobuf协议处理
  60.         QueueConsumerManager::ptr _qcmp;    // 消费者管理句柄
  61.         VirtualHost::ptr _vhp;              // 虚拟机管理句柄
  62.         ThreadPool::ptr _pool;              // 线程池管理句柄
  63.         ChannelManager::ptr _chmp;          // 信道管理句柄
  64.     };
  65.     class ConnectionManager
  66.     {
  67.     public:
  68.         using ptr=std::shared_ptr<ConnectionManager>;
  69.         ConnectionManager()
  70.         {
  71.             DLOG("new ConnectionManager: %p",this);
  72.         }
  73.         ~ConnectionManager()
  74.         {
  75.             DLOG("delete ConnectionManager: %p",this);
  76.         }
  77.         void newConnection(const muduo::net::TcpConnectionPtr &conn
  78.             , const ProtobufCodecPtr &codec
  79.             , const QueueConsumerManager::ptr &qcmp
  80.             , const VirtualHost::ptr &vhp
  81.             , const ThreadPool::ptr &pool)
  82.         {
  83.             std::unique_lock<std::mutex> lock(_mutex);
  84.             //连接是否已存在
  85.             auto it=_conns.find(conn);
  86.             if(it!=_conns.end())
  87.             {
  88.                 return;
  89.             }
  90.             //构建对象
  91.             Connection::ptr cop=std::make_shared<Connection>(conn,codec,qcmp,vhp,pool);
  92.             //添加管理
  93.             _conns[conn]=cop;
  94.         }
  95.         
  96.         void deleteConnection(const muduo::net::TcpConnectionPtr &conn)
  97.         {
  98.             std::unique_lock<std::mutex> lock(_mutex);
  99.             //连接是否不存在
  100.             auto it=_conns.find(conn);
  101.             if(it==_conns.end())
  102.             {
  103.                 return;
  104.             }
  105.             //删除
  106.             _conns.erase(it);
  107.         }
  108.         Connection::ptr getConnection(const muduo::net::TcpConnectionPtr &conn)
  109.         {
  110.             std::unique_lock<std::mutex> lock(_mutex);
  111.             //连接是否不存在
  112.             auto it=_conns.find(conn);
  113.             if(it==_conns.end())
  114.             {
  115.                 return Connection::ptr();
  116.             }
  117.             return it->second;
  118.         }
  119.     private:
  120.         std::mutex _mutex;
  121.         std::unordered_map<muduo::net::TcpConnectionPtr,Connection::ptr> _conns;
  122.     };
  123. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
                       13、服务器模块

     服务器模块借助muduo网络库来实现
     

  • _server:Muduo库提供的⼀个通⽤TCP服务器, 可以封装这个服务器进⾏TCP通讯
  • _baseloop:主事件循环器, ⽤于相应IO事件和定时器事件,主loop主要是为了相应监听描述符的IO事件
  • _codec: ⼀个protobuf编解码器, 在TCP服务器上设计了⼀层应⽤层协议, 这个编解码器主要就是负责实现应⽤层协议的解析和封装, 下边详细讲解
  • _dispatcher:⼀个消息分发器, 当Socket接收到⼀个报⽂消息后, 我必要按照消息的范例, 即上⾯提到的typeName进⾏消息分发, 会不不同范例的消息分发相对应的的处理函数中,下边详细讲解
  • _consumer: 服务器中的消费者信息管理句柄。
  • _threadpool: 异步⼯作线程池,主要⽤于队列消息的推送⼯作。
  • _connections: 连接管理句柄,管理当前服务器上的全部已经建⽴的通讯连接。
  • _virtual_host:服务器持有的假造主机。 队列、交换机 、绑定、消息等数据都是通过假造主机管理。
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqthird/include/muduo/proto/codec.h"
  3. #include "../mqthird/include/muduo/proto/dispatcher.h"
  4. #include "../mqthird/include/muduo/base/Logging.h"
  5. #include "../mqthird/include/muduo/base/Mutex.h"
  6. #include "../mqthird/include/muduo/net/EventLoop.h"
  7. #include "../mqthird/include/muduo/net/TcpServer.h"
  8. #include "connection.hpp"
  9. namespace lsxmq
  10. {
  11.     //typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
  12.     #define DBFILE "/meta.db"
  13.     #define HOSTNAME "lsxhost"
  14.     class Server
  15.     {
  16.     public:
  17.         Server(uint16_t port,const std::string& basedir)
  18.             : _server(&_loop, muduo::net::InetAddress(port)
  19.             , "ProtobufServer", muduo::net::TcpServer::kReusePort)
  20.             , _dispatcher(std::bind(&Server::onUnknownMessage
  21.                     , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))
  22.             , _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher
  23.                     , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)))
  24.             ,_consumerManager(std::make_shared<QueueConsumerManager>())
  25.             ,_virtualHost(std::make_shared<VirtualHost>(HOSTNAME,basedir+DBFILE,basedir))
  26.             ,_threadPool(std::make_shared<ThreadPool>())
  27.             ,_connectionManager(std::make_shared<ConnectionManager>())
  28.         {
  29.             //需要初始化队列的消费者结构,因为消息队列是持久化的,构造时会自动加载,但消费者的队列结构不会
  30.             //1.获取所有队列
  31.             auto mqmap=_virtualHost->allQueue();
  32.             for(auto& mqp:mqmap)
  33.             {
  34.                 _consumerManager->initQueueConsumer(mqp.second->name);
  35.             }
  36.             // 注册业务处理请求函数
  37.             _dispatcher.registerMessageCallback<OpenChannelRequest>(std::bind(&Server::onOpenChannel
  38.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  39.             _dispatcher.registerMessageCallback<CloseChannelRequest>(std::bind(&Server::onCloseChannel
  40.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  41.             _dispatcher.registerMessageCallback<DeclareExchangeRequest>(std::bind(&Server::onDeclareExchange
  42.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  43.             _dispatcher.registerMessageCallback<DeleteExchangeRequest>(std::bind(&Server::onDeleteExchange
  44.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  45.             _dispatcher.registerMessageCallback<DeclareMessageQueueRequest>(std::bind(&Server::onDeclareMessageQueue
  46.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  47.             _dispatcher.registerMessageCallback<DeleteMessageQueueRequest>(std::bind(&Server::onDeleteMessageQueue
  48.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  49.             _dispatcher.registerMessageCallback<BindRequest>(std::bind(&Server::onBind
  50.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  51.             _dispatcher.registerMessageCallback<UnbindRequest>(std::bind(&Server::onUnbind
  52.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  53.             _dispatcher.registerMessageCallback<BasicConsumeRequest>(std::bind(&Server::onBasicConsume
  54.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  55.             _dispatcher.registerMessageCallback<BasicCancelRequest>(std::bind(&Server::onBasicCancel
  56.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  57.             _dispatcher.registerMessageCallback<BasicPublishRequest>(std::bind(&Server::onBasicPublish
  58.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  59.             _dispatcher.registerMessageCallback<BasicAckRequest>(std::bind(&Server::onBasicAckRequest
  60.                 , this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  61.             //注册服务器的回调函数
  62.             _server.setConnectionCallback(std::bind(&Server::onConnection, this, std::placeholders::_1));
  63.             _server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),
  64.                                                  std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  65.         }
  66.         void start()
  67.         {
  68.             _server.start();
  69.             _loop.loop();
  70.         }
  71.     private:
  72.         //打开信道
  73.         void onOpenChannel(const muduo::net::TcpConnectionPtr &conn, const OpenChannelRequestPtr &message, muduo::Timestamp)
  74.         {
  75.             //获取连接
  76.             auto selfConnP=_connectionManager->getConnection(conn);
  77.             if(selfConnP.get()==nullptr)
  78.             {
  79.                 ELOG("打开信道失败,因为连接不存在");
  80.                 return ;
  81.             }
  82.             selfConnP->openChannel(message);
  83.         }
  84.         //关闭信道
  85.         void onCloseChannel(const muduo::net::TcpConnectionPtr &conn, const CloseChannelRequestPtr &message, muduo::Timestamp)
  86.         {
  87.             //获取连接
  88.             auto selfConnP=_connectionManager->getConnection(conn);
  89.             if(selfConnP.get()==nullptr)
  90.             {
  91.                 ELOG("关闭信道失败,因为连接不存在");
  92.                 return ;
  93.             }
  94.             selfConnP->closeChannel(message);
  95.         }
  96.         //声明交换机
  97.         void onDeclareExchange(const muduo::net::TcpConnectionPtr &conn, const DeclareExchangeRequestPtr &message, muduo::Timestamp)
  98.         {
  99.             //1.获取连接
  100.             auto selfConnP=_connectionManager->getConnection(conn);
  101.             if(selfConnP.get()==nullptr)
  102.             {
  103.                 ELOG("声明交换机失败,因为连接不存在");
  104.                 return ;
  105.             }
  106.             //2.获取信道
  107.             auto selfChannelP=selfConnP->getChannel(message->cid());
  108.             if(selfChannelP.get()==nullptr)
  109.             {
  110.                 ELOG("声明交换机失败,因为信道不存在");
  111.                 return ;
  112.             }
  113.             //声明交换机
  114.             selfChannelP->declareExchange(message);
  115.         }
  116.         //删除交换机
  117.         void onDeleteExchange(const muduo::net::TcpConnectionPtr &conn, const DeleteExchangeRequestPtr &message, muduo::Timestamp)
  118.         {
  119.             //1.获取连接
  120.             auto selfConnP=_connectionManager->getConnection(conn);
  121.             if(selfConnP.get()==nullptr)
  122.             {
  123.                 ELOG("删除交换机失败,因为连接不存在");
  124.                 return ;
  125.             }
  126.             //2.获取信道
  127.             auto selfChannelP=selfConnP->getChannel(message->cid());
  128.             if(selfChannelP.get()==nullptr)
  129.             {
  130.                 ELOG("删除交换机失败,因为信道不存在");
  131.                 return ;
  132.             }
  133.             //删除交换机
  134.             selfChannelP->deleteExchange(message);
  135.         }
  136.         //声明消息队列
  137.         void onDeclareMessageQueue(const muduo::net::TcpConnectionPtr &conn, const DeclareMessageQueueRequestPtr &message, muduo::Timestamp)
  138.         {
  139.             //1.获取连接
  140.             auto selfConnP=_connectionManager->getConnection(conn);
  141.             if(selfConnP.get()==nullptr)
  142.             {
  143.                 ELOG("声明消息队列失败,因为连接不存在");
  144.                 return ;
  145.             }
  146.             //2.获取信道
  147.             auto selfChannelP=selfConnP->getChannel(message->cid());
  148.             if(selfChannelP.get()==nullptr)
  149.             {
  150.                 ELOG("声明消息队列失败,因为信道不存在");
  151.                 return ;
  152.             }
  153.             //声明消息队列
  154.             selfChannelP->declareMessageQueue(message);
  155.         }
  156.         //删除消息队列
  157.         void onDeleteMessageQueue(const muduo::net::TcpConnectionPtr &conn, const DeleteMessageQueueRequestPtr &message, muduo::Timestamp)
  158.         {
  159.             //1.获取连接
  160.             auto selfConnP=_connectionManager->getConnection(conn);
  161.             if(selfConnP.get()==nullptr)
  162.             {
  163.                 ELOG("删除消息队列失败,因为连接不存在");
  164.                 return ;
  165.             }
  166.             //2.获取信道
  167.             auto selfChannelP=selfConnP->getChannel(message->cid());
  168.             if(selfChannelP.get()==nullptr)
  169.             {
  170.                 ELOG("删除消息队列失败,因为信道不存在");
  171.                 return ;
  172.             }
  173.             //删除消息队列
  174.             selfChannelP->deleteMessageQueue(message);
  175.         }
  176.         //绑定
  177.         void onBind(const muduo::net::TcpConnectionPtr &conn, const BindRequestPtr &message, muduo::Timestamp)
  178.         {
  179.             //1.获取连接
  180.             auto selfConnP=_connectionManager->getConnection(conn);
  181.             if(selfConnP.get()==nullptr)
  182.             {
  183.                 ELOG("绑定失败,因为连接不存在");
  184.                 return ;
  185.             }
  186.             //2.获取信道
  187.             auto selfChannelP=selfConnP->getChannel(message->cid());
  188.             if(selfChannelP.get()==nullptr)
  189.             {
  190.                 ELOG("绑定失败,因为信道不存在");
  191.                 return ;
  192.             }
  193.             //绑定
  194.             selfChannelP->bind(message);
  195.         }
  196.         //解绑
  197.         void onUnbind(const muduo::net::TcpConnectionPtr &conn, const UnbindRequestPtr &message, muduo::Timestamp)
  198.         {
  199.             //1.获取连接
  200.             auto selfConnP=_connectionManager->getConnection(conn);
  201.             if(selfConnP.get()==nullptr)
  202.             {
  203.                 ELOG("解绑失败,因为连接不存在");
  204.                 return ;
  205.             }
  206.             //2.获取信道
  207.             auto selfChannelP=selfConnP->getChannel(message->cid());
  208.             if(selfChannelP.get()==nullptr)
  209.             {
  210.                 ELOG("解绑失败,因为信道不存在");
  211.                 return ;
  212.             }
  213.             //解绑
  214.             selfChannelP->unbind(message);
  215.         }
  216.         //订阅
  217.         void onBasicConsume(const muduo::net::TcpConnectionPtr &conn, const BasicConsumeRequestPtr &message, muduo::Timestamp)
  218.         {
  219.             //1.获取连接
  220.             auto selfConnP=_connectionManager->getConnection(conn);
  221.             if(selfConnP.get()==nullptr)
  222.             {
  223.                 ELOG("订阅失败,因为连接不存在");
  224.                 return ;
  225.             }
  226.             //2.获取信道
  227.             auto selfChannelP=selfConnP->getChannel(message->cid());
  228.             if(selfChannelP.get()==nullptr)
  229.             {
  230.                 ELOG("订阅失败,因为信道不存在");
  231.                 return ;
  232.             }
  233.             //订阅
  234.             selfChannelP->basicConsume(message);
  235.         }
  236.         //取消订阅
  237.         void onBasicCancel(const muduo::net::TcpConnectionPtr &conn, const BasicCancelRequestPtr &message, muduo::Timestamp)
  238.         {
  239.             //1.获取连接
  240.             auto selfConnP=_connectionManager->getConnection(conn);
  241.             if(selfConnP.get()==nullptr)
  242.             {
  243.                 ELOG("取消订阅失败,因为连接不存在");
  244.                 return ;
  245.             }
  246.             //2.获取信道
  247.             auto selfChannelP=selfConnP->getChannel(message->cid());
  248.             if(selfChannelP.get()==nullptr)
  249.             {
  250.                 ELOG("取消订阅失败,因为信道不存在");
  251.                 return ;
  252.             }
  253.             //取消订阅
  254.             selfChannelP->basicCancel(message);
  255.         }
  256.         //发布消息
  257.         void onBasicPublish(const muduo::net::TcpConnectionPtr &conn, const BasicPublishRequestPtr &message, muduo::Timestamp)
  258.         {
  259.             //1.获取连接
  260.             auto selfConnP=_connectionManager->getConnection(conn);
  261.             if(selfConnP.get()==nullptr)
  262.             {
  263.                 ELOG("发布消息失败,因为连接不存在");
  264.                 return ;
  265.             }
  266.             //2.获取信道
  267.             auto selfChannelP=selfConnP->getChannel(message->cid());
  268.             if(selfChannelP.get()==nullptr)
  269.             {
  270.                 ELOG("发布消息失败,因为信道不存在");
  271.                 return ;
  272.             }
  273.             //发布消息
  274.             selfChannelP->basicPublish(message);
  275.         }
  276.         //确认消息
  277.         void onBasicAckRequest(const muduo::net::TcpConnectionPtr &conn, const BasicAckRequestPtr &message, muduo::Timestamp)
  278.         {
  279.             //1.获取连接
  280.             auto selfConnP=_connectionManager->getConnection(conn);
  281.             if(selfConnP.get()==nullptr)
  282.             {
  283.                 ELOG("确认消息失败,因为连接不存在");
  284.                 return ;
  285.             }
  286.             //2.获取信道
  287.             auto selfChannelP=selfConnP->getChannel(message->cid());
  288.             if(selfChannelP.get()==nullptr)
  289.             {
  290.                 ELOG("确认消息失败,因为信道不存在");
  291.                 return ;
  292.             }
  293.             //确认消息
  294.             selfChannelP->basicAck(message);
  295.         }
  296.         void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn,const std::shared_ptr<google::protobuf::Message>& message,muduo::Timestamp)
  297.         {
  298.             //收到未知消息
  299.             LOG_INFO<<message->GetTypeName();
  300.             conn->shutdown();
  301.         }
  302.         void onConnection(const muduo::net::TcpConnectionPtr &conn)
  303.         {
  304.             if (conn->connected())
  305.             {
  306.                 LOG_INFO << "连接成功";
  307.                 _connectionManager->newConnection(conn,_codec,_consumerManager,_virtualHost,_threadPool);
  308.             }
  309.             else
  310.             {
  311.                 LOG_INFO << "断开连接";
  312.                 _connectionManager->deleteConnection(conn);
  313.             }
  314.         }
  315.     private:
  316.         muduo::net::EventLoop _loop;
  317.         muduo::net::TcpServer _server;  // 服务器
  318.         ProtobufDispatcher _dispatcher; // 请求分发器对象--要向其中注册请求处理函数
  319.         ProtobufCodecPtr _codec;           // protobuf协议处理器--针对收到的请求数据进行protobuf协议处理
  320.         QueueConsumerManager::ptr _consumerManager;     // 消费者管理句柄
  321.         VirtualHost::ptr _virtualHost;      // 虚拟机管理句柄
  322.         ThreadPool::ptr _threadPool;        // 线程池管理句柄
  323.         ConnectionManager::ptr _connectionManager;     // 连接管理句柄
  324.     };
  325. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
  • 159.
  • 160.
  • 161.
  • 162.
  • 163.
  • 164.
  • 165.
  • 166.
  • 167.
  • 168.
  • 169.
  • 170.
  • 171.
  • 172.
  • 173.
  • 174.
  • 175.
  • 176.
  • 177.
  • 178.
  • 179.
  • 180.
  • 181.
  • 182.
  • 183.
  • 184.
  • 185.
  • 186.
  • 187.
  • 188.
  • 189.
  • 190.
  • 191.
  • 192.
  • 193.
  • 194.
  • 195.
  • 196.
  • 197.
  • 198.
  • 199.
  • 200.
  • 201.
  • 202.
  • 203.
  • 204.
  • 205.
  • 206.
  • 207.
  • 208.
  • 209.
  • 210.
  • 211.
  • 212.
  • 213.
  • 214.
  • 215.
  • 216.
  • 217.
  • 218.
  • 219.
  • 220.
  • 221.
  • 222.
  • 223.
  • 224.
  • 225.
  • 226.
  • 227.
  • 228.
  • 229.
  • 230.
  • 231.
  • 232.
  • 233.
  • 234.
  • 235.
  • 236.
  • 237.
  • 238.
  • 239.
  • 240.
  • 241.
  • 242.
  • 243.
  • 244.
  • 245.
  • 246.
  • 247.
  • 248.
  • 249.
  • 250.
  • 251.
  • 252.
  • 253.
  • 254.
  • 255.
  • 256.
  • 257.
  • 258.
  • 259.
  • 260.
  • 261.
  • 262.
  • 263.
  • 264.
  • 265.
  • 266.
  • 267.
  • 268.
  • 269.
  • 270.
  • 271.
  • 272.
  • 273.
  • 274.
  • 275.
  • 276.
  • 277.
  • 278.
  • 279.
  • 280.
  • 281.
  • 282.
  • 283.
  • 284.
  • 285.
  • 286.
  • 287.
  • 288.
  • 289.
  • 290.
  • 291.
  • 292.
  • 293.
  • 294.
  • 295.
  • 296.
  • 297.
  • 298.
  • 299.
  • 300.
  • 301.
  • 302.
  • 303.
  • 304.
  • 305.
  • 306.
  • 307.
  • 308.
  • 309.
  • 310.
  • 311.
  • 312.
  • 313.
  • 314.
  • 315.
  • 316.
  • 317.
  • 318.
  • 319.
  • 320.
  • 321.
  • 322.
  • 323.
  • 324.
  • 325.
  • 326.
  • 327.
  • 328.
  • 329.
                       客户端实现

     1、订阅者模块

     ⼀个并不直接对⽤⼾展⽰的模块,其在客⼾端体现的作⽤就是对于⻆⾊的描述,表⽰这是⼀个消费者。
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/helper.hpp"
  3. #include "../mqcomm/logger.hpp"
  4. #include "../mqcomm/message.pb.h"
  5. namespace lsxmq
  6. {
  7.     using ConsumerCallBack=std::function<void(const std::string&,BasicProperties*,const std::string&)>;
  8.                                                       //消费者标识     //基本属性             //消息内容
  9.     struct Consumer
  10.     {
  11.         using ptr=std::shared_ptr<Consumer>;
  12.         Consumer()
  13.         {
  14.             DLOG("new Consumer: %p",this);
  15.         }
  16.         ~Consumer()
  17.         {
  18.             DLOG("delete Consumer: %p",this);
  19.         }
  20.         Consumer(const std::string& ctag,const std::string& cqname,bool cautoAck,const ConsumerCallBack& ccallback)
  21.             :tag(ctag),qname(cqname),autoAck(cautoAck),callback(ccallback)
  22.         {}
  23.         std::string tag;//消费者标识
  24.         std::string qname;//队列名称
  25.         bool autoAck;//自动确认标志
  26.         ConsumerCallBack callback;//消费回调函数
  27.     };
  28. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
                       2、信道管理模块

     信道信息:
     

  • 信道ID
  • 信道关联的网络通讯连接对象
  • protobuf协议处理对象
  • 信道关联的消费者
  • 请结构<哀求id,相应>,方便查找)
  • 互斥锁&条件变量(大部门哀求是壅闭操作,但muduo库的通讯是异步的,因此在收到应答后,通过判定是否是等候的指定相应来进行同步):发送消息时,还没有声明交换机成功怎么办?以是要同步
     信道操作:
     

  • 打开信道
  • 关闭信道
  • 声明交换机
  • 删除交换机
  • 声明队列
  • 删除队列
  • 绑定
  • 解绑
  • 订阅
  • 取消订阅
  • 发布消息
  • 确认消息
                                   登录后复制                        
  1. #pragma once
  2. #include "../mqcomm/logger.hpp"
  3. #include "../mqcomm/threadpool.hpp"
  4. #include "../mqcomm/proto.pb.h"
  5. #include "../mqcomm/message.pb.h"
  6. #include "consumer.hpp"
  7. #include "muduo/net/TcpConnection.h"
  8. #include "muduo/proto/codec.h"
  9. namespace lsxmq
  10. {
  11.     using ProtobufCodecPtr=std::shared_ptr<ProtobufCodec>;
  12.     using BasicCommonResponsePtr=std::shared_ptr<BasicCommonResponse>;
  13.     using BasicConsumeResponsePtr=std::shared_ptr<basicConsumeResponse>;
  14.     class Channel
  15.     {
  16.     public:
  17.         using ptr=std::shared_ptr<Channel>;
  18.         Channel(const muduo::net::TcpConnectionPtr& conn
  19.                 ,const ProtobufCodecPtr& codec)
  20.                 :_cid(UUIDHelper::uuid()),_conn(conn),_codec(codec)
  21.         {}
  22.         ~Channel()
  23.         {
  24.             basicCancel();//析构时取消订阅
  25.         }
  26.         std::string cid()
  27.         {
  28.             return _cid;
  29.         }
  30.         //打开信道
  31.         bool openChannel()
  32.         {
  33.             //构造请求
  34.             OpenChannelRequest req;
  35.             req.set_rid(UUIDHelper::uuid());
  36.             req.set_cid(_cid);
  37.             //发送请求并等待
  38.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  39.             auto rsp=waitBasicResponse(req.rid());
  40.             if(rsp->ok()==false)
  41.             {
  42.                 ELOG("打开信道失败");
  43.             }
  44.             return rsp->ok();
  45.         }
  46.         //关闭信道
  47.         bool closeChannel()
  48.         {
  49.             //构造请求
  50.             CloseChannelRequest req;
  51.             req.set_rid(UUIDHelper::uuid());
  52.             req.set_cid(_cid);
  53.             //发送请求并等待
  54.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  55.             auto rsp=waitBasicResponse(req.rid());
  56.             if(rsp->ok()==false)
  57.             {
  58.                 ELOG("关闭信道失败");
  59.             }
  60.             return rsp->ok();
  61.         }
  62.         //声明交换机
  63.         bool declareExchange(const std::string& exchangeName
  64.                             ,ExchangeType exchangeType
  65.                             ,bool durable
  66.                             ,bool autoDelete
  67.                             ,google::protobuf::Map<std::string, std::string>& args)
  68.         {
  69.             //构造请求
  70.             DeclareExchangeRequest req;
  71.             req.set_rid(UUIDHelper::uuid());
  72.             req.set_cid(_cid);
  73.             req.set_exchange_name(exchangeName);
  74.             req.set_exchange_type(exchangeType);
  75.             req.set_durable(durable);
  76.             req.set_auto_delete(autoDelete);
  77.             req.mutable_args()->swap(args);
  78.             //发送请求并等待
  79.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  80.             auto rsp=waitBasicResponse(req.rid());
  81.             if(rsp->ok()==false)
  82.             {
  83.                 ELOG("声明交换机失败");
  84.             }
  85.             return rsp->ok();
  86.         }
  87.         //删除交换机
  88.         bool deleteExchange(const std::string& exchangeName)
  89.         {
  90.             //构造请求
  91.             DeleteExchangeRequest req;
  92.             req.set_rid(UUIDHelper::uuid());
  93.             req.set_cid(_cid);
  94.             req.set_exchange_name(exchangeName);
  95.             //发送请求并等待
  96.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  97.             auto rsp=waitBasicResponse(req.rid());
  98.             if(rsp->ok()==false)
  99.             {
  100.                 ELOG("删除交换机失败");
  101.             }
  102.             return rsp->ok();
  103.         }
  104.         //声明消息队列
  105.         bool declareMessageQueue(const std::string& queueName
  106.                                 ,bool exclusive
  107.                                 ,bool durable
  108.                                 ,bool autoDelete
  109.                                 ,google::protobuf::Map<std::string, std::string>& args)
  110.         {
  111.             //构造请求
  112.             DeclareMessageQueueRequest req;
  113.             req.set_rid(UUIDHelper::uuid());
  114.             req.set_cid(_cid);
  115.             req.set_queue_name(queueName);
  116.             req.set_exclusive(exclusive);
  117.             req.set_durable(durable);
  118.             req.set_auto_delete(autoDelete);
  119.             req.mutable_args()->swap(args);
  120.             //发送请求并等待
  121.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  122.             auto rsp=waitBasicResponse(req.rid());
  123.             if(rsp->ok()==false)
  124.             {
  125.                 ELOG("声明消息队列失败");
  126.             }
  127.             return rsp->ok();
  128.         }
  129.         //删除消息队列
  130.         bool deleteMessageQueue(const std::string& queueName)
  131.         {
  132.             //构造请求
  133.             DeleteMessageQueueRequest req;
  134.             req.set_rid(UUIDHelper::uuid());
  135.             req.set_cid(_cid);
  136.             req.set_queue_name(queueName);
  137.             //发送请求并等待
  138.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  139.             auto rsp=waitBasicResponse(req.rid());
  140.             if(rsp->ok()==false)
  141.             {
  142.                 ELOG("删除消息队列失败");
  143.             }
  144.             return rsp->ok();
  145.         }
  146.         //绑定
  147.         bool bind(const std::string& exchangeName
  148.                     ,const std::string& queueName
  149.                     ,const std::string& bindingKey)
  150.         {
  151.             //构造请求
  152.             BindRequest req;
  153.             req.set_rid(UUIDHelper::uuid());
  154.             req.set_cid(_cid);
  155.             req.set_exchange_name(exchangeName);
  156.             req.set_queue_name(queueName);
  157.             req.set_binding_key(bindingKey);
  158.             //发送请求并等待
  159.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  160.             auto rsp=waitBasicResponse(req.rid());
  161.             if(rsp->ok()==false)
  162.             {
  163.                 ELOG("绑定失败");
  164.             }
  165.             return rsp->ok();
  166.         }
  167.         //解绑
  168.         bool unbind(const std::string& exchangeName
  169.                     ,const std::string& queueName)
  170.         {
  171.             //构造请求
  172.             BindRequest req;
  173.             req.set_rid(UUIDHelper::uuid());
  174.             req.set_cid(_cid);
  175.             req.set_exchange_name(exchangeName);
  176.             req.set_queue_name(queueName);
  177.             //发送请求并等待
  178.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  179.             auto rsp=waitBasicResponse(req.rid());
  180.             if(rsp->ok()==false)
  181.             {
  182.                 ELOG("解绑失败");
  183.             }
  184.             return rsp->ok();
  185.         }
  186.         //订阅
  187.         bool basicConsume(const std::string& consumerTag
  188.                         ,const std::string& queueName
  189.                         ,bool autoAck
  190.                         ,const ConsumerCallBack& cb)
  191.         {
  192.             //构造请求
  193.             if(_consumer.get()!=nullptr)
  194.             {
  195.                 DLOG("已订阅,不可重复订阅")
  196.                 return false;
  197.             }
  198.             BasicConsumeRequest req;
  199.             req.set_rid(UUIDHelper::uuid());
  200.             req.set_cid(_cid);
  201.             req.set_consumer_tag(consumerTag);
  202.             req.set_queue_name(queueName);
  203.             req.set_auto_ack(autoAck);
  204.             //发送请求并等待
  205.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  206.             auto rsp=waitBasicResponse(req.rid());
  207.             if(rsp->ok()==false)
  208.             {
  209.                 ELOG("订阅失败");
  210.                 return false;
  211.             }
  212.             //订阅成功,要构造对应的消费者
  213.             _consumer=std::make_shared<Consumer>(consumerTag,queueName,autoAck,cb);
  214.             return rsp->ok();
  215.         }
  216.         //取消订阅
  217.         bool basicCancel()
  218.         {
  219.             if(_consumer.get()==nullptr)
  220.             {
  221.                 return true;
  222.             }
  223.             //构造请求
  224.             BasicCancelRequest req;
  225.             req.set_rid(UUIDHelper::uuid());
  226.             req.set_cid(_cid);
  227.             req.set_consumer_tag(_consumer->tag);
  228.             req.set_queue_name(_consumer->qname);
  229.             //发送请求并等待
  230.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  231.             auto rsp=waitBasicResponse(req.rid());
  232.             if(rsp->ok()==false)
  233.             {
  234.                 ELOG("取消订阅失败");
  235.                 return false;
  236.             }
  237.             //取消订阅时清理消费者对象
  238.             _consumer.reset();
  239.             return rsp->ok();
  240.         }
  241.         //发布消息
  242.         bool basicPublish(const std::string& exchangeName
  243.                             ,const std::string& body
  244.                             ,BasicProperties* bp)
  245.         {
  246.             //构造请求
  247.             BasicPublishRequest req;
  248.             req.set_rid(UUIDHelper::uuid());
  249.             req.set_cid(_cid);
  250.             req.set_exchange_name(exchangeName);
  251.             req.set_body(body);
  252.             if(bp!=nullptr)
  253.             {
  254.                 req.mutable_properties()->set_id(bp->id());
  255.                 req.mutable_properties()->set_deliverymode(bp->deliverymode());
  256.                 req.mutable_properties()->set_routing_key(bp->routing_key());
  257.             }
  258.             //发送请求并等待
  259.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  260.             auto rsp=waitBasicResponse(req.rid());
  261.             if(rsp->ok()==false)
  262.             {
  263.                 ELOG("发布消息失败");
  264.             }
  265.             return rsp->ok();
  266.         }
  267.         //确认消息
  268.         bool basicAck(const std::string& mid)
  269.         {
  270.             if(_consumer.get()==nullptr)
  271.             {
  272.                 ELOG("消息确认时,没有找到消费者")
  273.                 return false;
  274.             }
  275.             //构造请求
  276.             BasicAckRequest req;
  277.             req.set_rid(UUIDHelper::uuid());
  278.             req.set_cid(_cid);
  279.             req.set_queue_name(_consumer->qname);
  280.             req.set_mid(mid);
  281.             //发送请求并等待
  282.             _codec->send(_conn,req);//send是异步的,所以需要我们手动进行等待
  283.             auto rsp=waitBasicResponse(req.rid());
  284.             if(rsp->ok()==false)
  285.             {
  286.                 ELOG("确认消息失败");
  287.             }
  288.             return rsp->ok();
  289.         }
  290.     public:
  291.         //收到基础响应时,向hash map中添加响应
  292.         void putBasicResponse(const BasicCommonResponsePtr& rsp)
  293.         {
  294.             std::unique_lock<std::mutex> lock(_mutex);
  295.             _basicRsp.insert(std::make_pair(rsp->rid(),rsp));
  296.             _cond.notify_all();//唤醒所有等待线程,只有满足条件的才能竞争锁
  297.         }
  298.         //收到消息推送后,需要信道找到对应的消费者,通过回调函数进行处理
  299.         void consume(const BasicConsumeResponsePtr& rsp)
  300.         {
  301.             if(_consumer.get()==nullptr)
  302.             {
  303.                 ELOG("消费者不存在");
  304.                 return;
  305.             }
  306.             if(rsp->consumer_tag()!=_consumer->tag)
  307.             {
  308.                 ELOG("收到错误的响应,消费者标志匹配失败");
  309.                 return;
  310.             }
  311.             //执行回调函数
  312.             _consumer->callback(rsp->consumer_tag(),rsp->mutable_properties(),rsp->body());
  313.         }
  314.     private:
  315.         //等待响应
  316.         BasicCommonResponsePtr waitBasicResponse(const std::string& rid)
  317.         {
  318.             std::unique_lock<std::mutex> lock(_mutex);
  319.             _cond.wait(lock,[&rid,this](){
  320.                 return _basicRsp.find(rid)!=_basicRsp.end();//等到收到了对应的响应才退出
  321.             });
  322.             //拿到响应,然后再删除
  323.             BasicCommonResponsePtr rsp=_basicRsp[rid];
  324.             _basicRsp.erase(rid);
  325.             return rsp;
  326.         }
  327.     private:
  328.         std::string _cid;  //信道id
  329.         muduo::net::TcpConnectionPtr _conn;  //信道关联的网络通信连接
  330.         ProtobufCodecPtr _codec;  //protobuf协议处理器
  331.         Consumer::ptr _consumer;  //信道对应的消费者,当确定了信道角色后构造(订阅操作后)
  332.         std::mutex _mutex;
  333.         std::condition_variable _cond;
  334.         std::unordered_map<std::string,BasicCommonResponsePtr> _basicRsp;  //<请求id,响应>
  335.     };
  336.     class ChannelManager
  337.     {
  338.     public:
  339.         using ptr=std::shared_ptr<ChannelManager>;
  340.         ChannelManager()
  341.         {
  342.             DLOG("new ChannelManager: %p",this);
  343.         }
  344.         ~ChannelManager()
  345.         {
  346.             DLOG("delete ChannelManager: %p",this);
  347.         }
  348.         //打开信道
  349.         Channel::ptr createChannel(const muduo::net::TcpConnectionPtr& conn
  350.             ,const ProtobufCodecPtr& codec)
  351.         {
  352.             std::unique_lock<std::mutex> lock(_mutex);
  353.             //队列是否已存在?不需要担心,因为创建信道会自动生成id
  354.             //构造信道对象
  355.             Channel::ptr chp=std::make_shared<Channel>(conn,codec);
  356.             //添加管理
  357.             _channels[chp->cid()]=chp;
  358.             return chp;
  359.         }
  360.         void deleteChannel(const std::string& cid)//关闭信道
  361.         {
  362.             std::unique_lock<std::mutex> lock(_mutex);
  363.             //队列是否不存在
  364.             auto it=_channels.find(cid);
  365.             if(it==_channels.end())
  366.             {
  367.                 return;
  368.             }
  369.             //删除信道
  370.             _channels.erase(cid);
  371.         }
  372.         Channel::ptr getChannel(const std::string& cid)
  373.         {
  374.             std::unique_lock<std::mutex> lock(_mutex);
  375.             //队列是否不存在
  376.             auto it=_channels.find(cid);
  377.             if(it==_channels.end())
  378.             {
  379.                 return Channel::ptr();
  380.             }
  381.             return it->second;
  382.         }
  383.     private:
  384.         std::mutex _mutex;
  385.         std::unordered_map<std::string,Channel::ptr> _channels;
  386.     };
  387. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.
  • 156.
  • 157.
  • 158.
  • 159.
  • 160.
  • 161.
  • 162.
  • 163.
  • 164.
  • 165.
  • 166.
  • 167.
  • 168.
  • 169.
  • 170.
  • 171.
  • 172.
  • 173.
  • 174.
  • 175.
  • 176.
  • 177.
  • 178.
  • 179.
  • 180.
  • 181.
  • 182.
  • 183.
  • 184.
  • 185.
  • 186.
  • 187.
  • 188.
  • 189.
  • 190.
  • 191.
  • 192.
  • 193.
  • 194.
  • 195.
  • 196.
  • 197.
  • 198.
  • 199.
  • 200.
  • 201.
  • 202.
  • 203.
  • 204.
  • 205.
  • 206.
  • 207.
  • 208.
  • 209.
  • 210.
  • 211.
  • 212.
  • 213.
  • 214.
  • 215.
  • 216.
  • 217.
  • 218.
  • 219.
  • 220.
  • 221.
  • 222.
  • 223.
  • 224.
  • 225.
  • 226.
  • 227.
  • 228.
  • 229.
  • 230.
  • 231.
  • 232.
  • 233.
  • 234.
  • 235.
  • 236.
  • 237.
  • 238.
  • 239.
  • 240.
  • 241.
  • 242.
  • 243.
  • 244.
  • 245.
  • 246.
  • 247.
  • 248.
  • 249.
  • 250.
  • 251.
  • 252.
  • 253.
  • 254.
  • 255.
  • 256.
  • 257.
  • 258.
  • 259.
  • 260.
  • 261.
  • 262.
  • 263.
  • 264.
  • 265.
  • 266.
  • 267.
  • 268.
  • 269.
  • 270.
  • 271.
  • 272.
  • 273.
  • 274.
  • 275.
  • 276.
  • 277.
  • 278.
  • 279.
  • 280.
  • 281.
  • 282.
  • 283.
  • 284.
  • 285.
  • 286.
  • 287.
  • 288.
  • 289.
  • 290.
  • 291.
  • 292.
  • 293.
  • 294.
  • 295.
  • 296.
  • 297.
  • 298.
  • 299.
  • 300.
  • 301.
  • 302.
  • 303.
  • 304.
  • 305.
  • 306.
  • 307.
  • 308.
  • 309.
  • 310.
  • 311.
  • 312.
  • 313.
  • 314.
  • 315.
  • 316.
  • 317.
  • 318.
  • 319.
  • 320.
  • 321.
  • 322.
  • 323.
  • 324.
  • 325.
  • 326.
  • 327.
  • 328.
  • 329.
  • 330.
  • 331.
  • 332.
  • 333.
  • 334.
  • 335.
  • 336.
  • 337.
  • 338.
  • 339.
  • 340.
  • 341.
  • 342.
  • 343.
  • 344.
  • 345.
  • 346.
  • 347.
  • 348.
  • 349.
  • 350.
  • 351.
  • 352.
  • 353.
  • 354.
  • 355.
  • 356.
  • 357.
  • 358.
  • 359.
  • 360.
  • 361.
  • 362.
  • 363.
  • 364.
  • 365.
  • 366.
  • 367.
  • 368.
  • 369.
  • 370.
  • 371.
  • 372.
  • 373.
  • 374.
  • 375.
  • 376.
  • 377.
  • 378.
  • 379.
  • 380.
  • 381.
  • 382.
  • 383.
  • 384.
  • 385.
  • 386.
  • 387.
  • 388.
  • 389.
                       3、异步工作线程池模块

     客⼾端这边存在两个异步⼯作线程,
     

  • ⼀个是muduo库中客⼾端连接的异步循环线程EventLoopThread,
  • ⼀个是当收到消息后进⾏异步处理的⼯作线程池。
     这两项都不是以连接为单元进⾏创建的,⽽是创建后,可以⽤以多个连接中,因此单独进⾏封装。
                                   登录后复制                        
  1. #pragma once
  2. #include "muduo/net/EventLoopThreadPool.h"
  3. #include "../mqcomm/logger.hpp"
  4. #include "../mqcomm/threadpool.hpp"
  5. namespace lsxmq
  6. {
  7.     class AsyncWorker
  8.     {
  9.     public:
  10.         using ptr=std::shared_ptr<AsyncWorker>;
  11.         muduo::net::EventLoopThreadPool _loopthread;
  12.         ThreadPool _pool;
  13.     };
  14. };
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
                       4、连接管理模块

     在客⼾端这边,RabbitMQ弱化了客⼾端的概念,因为⽤⼾所需的服务都是通过信道来提供的,因此操作头脑转换为先创建连接,通过连接创建信道,通过信道提供服务这⼀流程。
     这个模块同样是针对muduo库客⼾端连接的⼆次封装,向⽤⼾提供创建channel信道的接⼝,创建信道后,可以通过信道来获取指定服务。
                                   登录后复制                        
  1. #pragma once
  2. #include "muduo/proto/dispatcher.h"
  3. #include "muduo/proto/codec.h"
  4. #include "muduo/base/Logging.h"
  5. #include "muduo/base/Mutex.h"
  6. #include "muduo/net/EventLoop.h"
  7. #include "muduo/net/TcpClient.h"
  8. #include "muduo/base/CountDownLatch.h"
  9. #include "muduo/net/EventLoopThread.h"
  10. #include "worker.hpp"
  11. #include "channel.hpp"
  12. #include <string>
  13. namespace lsxmq
  14. {
  15.     typedef std::shared_ptr<google::protobuf::Message> MessagePtr;
  16.     class Connection
  17.     {
  18.     public:
  19.     using ptr=std::shared_ptr<Connection>;
  20.     Connection(const std::string &serverIp, uint16_t serverPort,const AsyncWorker::ptr& worker)
  21.             : _latch(1)
  22.             ,_worker(worker)
  23.             , _client(_worker->_loopthread.startLoop()
  24.                 , muduo::net::InetAddress(serverIp, serverPort), "ProtobufClient")
  25.             , _dispatcher(std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1
  26.                 , std::placeholders::_2, std::placeholders::_3))
  27.             , _codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher
  28.                 , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)))
  29.             ,_channels(std::make_shared<ChannelManager>())
  30.         {
  31.             // 注册处理函数
  32.             _dispatcher.registerMessageCallback<BasicCommonResponse>(std::bind(&Connection::onBasicCommonResponse, this
  33.                 , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  34.             _dispatcher.registerMessageCallback<basicConsumeResponse>(std::bind(&Connection::onBasicConsumeResponse, this
  35.                 , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  36.             // 客户端注册回调函数
  37.             _client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));
  38.             _client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get()
  39.                 , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  40.                
  41.             //连接服务器
  42.             _client.connect();
  43.             _latch.wait(); // 等待连接服务器成功
  44.         }
  45.         //打开信道
  46.         Channel::ptr openChannel()
  47.         {
  48.             //构造一个信道
  49.             auto channel=_channels->createChannel(_conn,_codec);
  50.             //打开信道
  51.             auto ret=channel->openChannel();
  52.             if(ret==false)
  53.             {
  54.                 ELOG("打开信道失败");
  55.                 _channels->deleteChannel(channel->cid());
  56.                 return Channel::ptr();
  57.             }
  58.             return channel;
  59.         }
  60.         //关闭信道
  61.         bool closeChannel(const Channel::ptr& channel)
  62.         {
  63.             auto ret=channel->closeChannel();
  64.             if(ret==false)
  65.             {
  66.                 ELOG("关闭信道失败");
  67.                 return false;
  68.             }
  69.             _channels->deleteChannel(channel->cid());
  70.             return true;
  71.         }
  72.     private:
  73.         //收到普通响应
  74.         void onBasicCommonResponse(const muduo::net::TcpConnectionPtr &conn, const BasicCommonResponsePtr &message, muduo::Timestamp)
  75.         {
  76.             //1.获取信道
  77.             auto channel=_channels->getChannel(message->cid());
  78.             if(channel.get()==nullptr)
  79.             {
  80.                 ELOG("处理普通响应失败,因为获取信道失败");
  81.                 return;
  82.             }
  83.             //2.将响应加入hash map
  84.             channel->putBasicResponse(message);
  85.         }
  86.         //收到推送消息
  87.         void onBasicConsumeResponse(const muduo::net::TcpConnectionPtr &conn,const BasicConsumeResponsePtr& message, muduo::Timestamp)
  88.         {
  89.             //1.获取信道
  90.             auto channel=_channels->getChannel(message->cid());
  91.             if(channel.get()==nullptr)
  92.             {
  93.                 ELOG("处理普通响应失败,因为获取信道失败");
  94.                 return;
  95.             }
  96.             //2.对收到的消息进行处理,由工作线程池完成
  97.             auto task=std::bind(&Channel::consume,channel.get(),message);
  98.             _worker->_pool.push(task);
  99.         }
  100.         void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn,const MessagePtr& message,muduo::Timestamp)
  101.         {
  102.             LOG_INFO<<message->GetTypeName();
  103.             //收到了未定义的消息,断开连接
  104.             conn->shutdown();
  105.         }
  106.         void onConnection(const muduo::net::TcpConnectionPtr &conn)
  107.         {
  108.             if (conn->connected())
  109.             {
  110.                 LOG_INFO << "连接成功";
  111.                 _conn = conn;
  112.                 // 解除阻塞
  113.                 _latch.countDown();
  114.             }
  115.             else
  116.             {
  117.                 LOG_INFO << "断开连接";
  118.             }
  119.         }
  120.     private:
  121.         muduo::CountDownLatch _latch;            // 用于同步等待连接服务器成功
  122.         AsyncWorker::ptr _worker;  //异步工作线程
  123.         muduo::net::TcpClient _client;           // 客户端
  124.         ProtobufDispatcher _dispatcher;          // 派发器
  125.         ProtobufCodecPtr _codec;                    // protobuf协议处理器
  126.         muduo::net::TcpConnectionPtr _conn;
  127.         ChannelManager::ptr _channels;  //信道管理句柄
  128.     };
  129. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
                       综合测试

     广播交换模式下:
                                   登录后复制                        
  1. //订阅者
  2. #include "connection.hpp"
  3. void callBack(const lsxmq::Channel::ptr& channel,const std::string& tag,lsxmq::BasicProperties* bp,const std::string& body)
  4. {
  5.     std::cout<<"消费者"<<tag<<"收到一条消息:"<<body<<std::endl;
  6.     //发送响应
  7.     channel->basicAck(bp->id());
  8. }
  9. int main(int argc,char* argv[])
  10. {
  11.     if(argc!=2)
  12.     {
  13.         std::cout<<"格式错误"<<std::endl;
  14.         return 0;
  15.     }
  16.     std::string qname=argv[1];
  17.     //1.构造异步工作者
  18.     lsxmq::AsyncWorker::ptr worker=std::make_shared<lsxmq::AsyncWorker>();
  19.     //2.构造连接
  20.     lsxmq::Connection::ptr conn=std::make_shared<lsxmq::Connection>("127.0.0.1",8888,worker);
  21.     //3.打开信道
  22.     lsxmq::Channel::ptr channel=conn->openChannel();
  23.     //4.通过信道进行操作
  24.         //4.1.声明一台交换机
  25.     google::protobuf::Map<std::string, std::string> args;
  26.     channel->declareExchange("exchange1",lsxmq::ExchangeType::FANOUT,true,false,args);
  27.         //4.2.声明队列queue1
  28.     channel->declareMessageQueue("queue1",false,true,false,args);
  29.         //4.3.声明队列queue2
  30.     channel->declareMessageQueue("queue2",false,true,false,args);
  31.         //4.4.将队列与交换机绑定
  32.     channel->bind("exchange1","queue1","queue1");
  33.     channel->bind("exchange1","queue2","news.music.#");
  34.         //4.5订阅一个队列
  35.     auto cb=std::bind(callBack,channel,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);
  36.     channel->basicConsume("consumer1",qname,false,cb);
  37.     //5.等待
  38.     while(1)
  39.     {
  40.         std::this_thread::sleep_for(std::chrono::seconds(3));
  41.     }
  42.     //5.关闭信道
  43.         conn->closeChannel(channel);
  44.     return 0;
  45. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
                                                     登录后复制                        
  1. //发布者
  2. #include "connection.hpp"
  3. int main()
  4. {
  5.     //1.构造异步工作者
  6.     lsxmq::AsyncWorker::ptr worker=std::make_shared<lsxmq::AsyncWorker>();
  7.     //2.构造连接
  8.     lsxmq::Connection::ptr conn=std::make_shared<lsxmq::Connection>("127.0.0.1",8888,worker);
  9.     //3.打开信道
  10.     lsxmq::Channel::ptr channel=conn->openChannel();
  11.     //4.通过信道进行操作
  12.         //4.1.声明一台交换机
  13.     google::protobuf::Map<std::string, std::string> args;
  14.     channel->declareExchange("exchange1",lsxmq::ExchangeType::FANOUT,true,false,args);
  15.         //4.2.声明队列queue1
  16.     channel->declareMessageQueue("queue1",false,true,false,args);
  17.         //4.3.声明队列queue2
  18.     channel->declareMessageQueue("queue2",false,true,false,args);
  19.         //4.4.将队列与交换机绑定
  20.     channel->bind("exchange1","queue1","queue1");
  21.     channel->bind("exchange1","queue2","news.music.#");
  22.         //4.5循环向交换机发布消息
  23.     for(int i=11;i<=20;++i)
  24.     {
  25.         //交换机类型是广播交换,所以不需要BasicProperties
  26.         channel->basicPublish("exchange1","hello-world-"+std::to_string(i),nullptr);
  27.     }
  28.     //5.关闭信道
  29.         conn->closeChannel(channel);
  30.     return 0;
  31. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
                       直接交换模式:
                                   登录后复制                        
  1. //订阅者
  2. #include "connection.hpp"
  3. void callBack(const lsxmq::Channel::ptr& channel,const std::string& tag,lsxmq::BasicProperties* bp,const std::string& body)
  4. {
  5.     std::cout<<"消费者"<<tag<<"收到一条消息:"<<body<<std::endl;
  6.     //发送响应
  7.     channel->basicAck(bp->id());
  8. }
  9. int main(int argc,char* argv[])
  10. {
  11.     if(argc!=2)
  12.     {
  13.         std::cout<<"格式错误"<<std::endl;
  14.         return 0;
  15.     }
  16.     std::string qname=argv[1];
  17.     //1.构造异步工作者
  18.     lsxmq::AsyncWorker::ptr worker=std::make_shared<lsxmq::AsyncWorker>();
  19.     //2.构造连接
  20.     lsxmq::Connection::ptr conn=std::make_shared<lsxmq::Connection>("127.0.0.1",8888,worker);
  21.     //3.打开信道
  22.     lsxmq::Channel::ptr channel=conn->openChannel();
  23.     //4.通过信道进行操作
  24.         //4.1.声明一台交换机
  25.     google::protobuf::Map<std::string, std::string> args;
  26.     channel->declareExchange("exchange1",lsxmq::ExchangeType::DIRECT,true,false,args);
  27.         //4.2.声明队列queue1
  28.     channel->declareMessageQueue("queue1",false,true,false,args);
  29.         //4.3.声明队列queue2
  30.     channel->declareMessageQueue("queue2",false,true,false,args);
  31.         //4.4.将队列与交换机绑定
  32.     channel->bind("exchange1","queue1","queue1");
  33.     channel->bind("exchange1","queue2","queue2");
  34.         //4.5订阅一个队列
  35.     auto cb=std::bind(callBack,channel,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);
  36.     channel->basicConsume("consumer1",qname,false,cb);
  37.     //5.等待
  38.     while(1)
  39.     {
  40.         std::this_thread::sleep_for(std::chrono::seconds(3));
  41.     }
  42.     //5.关闭信道
  43.         conn->closeChannel(channel);
  44.     return 0;
  45. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
                                                     登录后复制                        
  1. //发布者
  2. #include "connection.hpp"
  3. int main()
  4. {
  5.     //1.构造异步工作者
  6.     lsxmq::AsyncWorker::ptr worker=std::make_shared<lsxmq::AsyncWorker>();
  7.     //2.构造连接
  8.     lsxmq::Connection::ptr conn=std::make_shared<lsxmq::Connection>("127.0.0.1",8888,worker);
  9.     //3.打开信道
  10.     lsxmq::Channel::ptr channel=conn->openChannel();
  11.     //4.通过信道进行操作
  12.         //4.1.声明一台交换机
  13.     google::protobuf::Map<std::string, std::string> args;
  14.     channel->declareExchange("exchange1",lsxmq::ExchangeType::DIRECT,true,false,args);
  15.         //4.2.声明队列queue1
  16.     channel->declareMessageQueue("queue1",false,true,false,args);
  17.         //4.3.声明队列queue2
  18.     channel->declareMessageQueue("queue2",false,true,false,args);
  19.         //4.4.将队列与交换机绑定
  20.     channel->bind("exchange1","queue1","queue1");
  21.     channel->bind("exchange1","queue2","queue2");
  22.         //4.5循环向交换机发布消息
  23.     for(int i=1;i<=10;++i)
  24.     {
  25.         lsxmq::BasicProperties bp;
  26.         bp.set_id(lsxmq::UUIDHelper::uuid());
  27.         bp.set_deliverymode(lsxmq::DeliveryMode::DURABLE);
  28.         bp.set_routing_key("queue1");
  29.         channel->basicPublish("exchange1","hello-world-"+std::to_string(i),&bp);
  30.     }
  31.     for(int i=11;i<=15;++i)
  32.     {
  33.         lsxmq::BasicProperties bp;
  34.         bp.set_id(lsxmq::UUIDHelper::uuid());
  35.         bp.set_deliverymode(lsxmq::DeliveryMode::UNDURABLE);
  36.         bp.set_routing_key("queue2");
  37.         channel->basicPublish("exchange1","hello-world-"+std::to_string(i),&bp);
  38.     }
  39.     //5.关闭信道
  40.         conn->closeChannel(channel);
  41.     return 0;
  42. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
                       主题交换模式:
                                   登录后复制                        
  1. //订阅者
  2. #include "connection.hpp"
  3. void callBack(const lsxmq::Channel::ptr& channel,const std::string& tag,lsxmq::BasicProperties* bp,const std::string& body)
  4. {
  5.     std::cout<<"消费者"<<tag<<"收到一条消息:"<<body<<std::endl;
  6.     //发送响应
  7.     channel->basicAck(bp->id());
  8. }
  9. int main(int argc,char* argv[])
  10. {
  11.     if(argc!=2)
  12.     {
  13.         std::cout<<"格式错误"<<std::endl;
  14.         return 0;
  15.     }
  16.     std::string qname=argv[1];
  17.     //1.构造异步工作者
  18.     lsxmq::AsyncWorker::ptr worker=std::make_shared<lsxmq::AsyncWorker>();
  19.     //2.构造连接
  20.     lsxmq::Connection::ptr conn=std::make_shared<lsxmq::Connection>("127.0.0.1",8888,worker);
  21.     //3.打开信道
  22.     lsxmq::Channel::ptr channel=conn->openChannel();
  23.     //4.通过信道进行操作
  24.         //4.1.声明一台交换机
  25.     google::protobuf::Map<std::string, std::string> args;
  26.     channel->declareExchange("exchange1",lsxmq::ExchangeType::TOPIC,true,false,args);
  27.         //4.2.声明队列queue1
  28.     channel->declareMessageQueue("queue1",false,true,false,args);
  29.         //4.3.声明队列queue2
  30.     channel->declareMessageQueue("queue2",false,true,false,args);
  31.         //4.4.将队列与交换机绑定
  32.     channel->bind("exchange1","queue1","sport.#");
  33.     channel->bind("exchange1","queue2","music.#");
  34.         //4.5订阅一个队列
  35.     auto cb=std::bind(callBack,channel,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);
  36.     channel->basicConsume("consumer1",qname,false,cb);
  37.     //5.等待
  38.     while(1)
  39.     {
  40.         std::this_thread::sleep_for(std::chrono::seconds(3));
  41.     }
  42.     //5.关闭信道
  43.         conn->closeChannel(channel);
  44.     return 0;
  45. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
                                                     登录后复制                        
  1. //发布者
  2. #include "connection.hpp"
  3. int main()
  4. {
  5.     //1.构造异步工作者
  6.     lsxmq::AsyncWorker::ptr worker=std::make_shared<lsxmq::AsyncWorker>();
  7.     //2.构造连接
  8.     lsxmq::Connection::ptr conn=std::make_shared<lsxmq::Connection>("127.0.0.1",8888,worker);
  9.     //3.打开信道
  10.     lsxmq::Channel::ptr channel=conn->openChannel();
  11.     //4.通过信道进行操作
  12.         //4.1.声明一台交换机
  13.     google::protobuf::Map<std::string, std::string> args;
  14.     channel->declareExchange("exchange1",lsxmq::ExchangeType::TOPIC,true,false,args);
  15.         //4.2.声明队列queue1
  16.     channel->declareMessageQueue("queue1",false,true,false,args);
  17.         //4.3.声明队列queue2
  18.     channel->declareMessageQueue("queue2",false,true,false,args);
  19.         //4.4.将队列与交换机绑定
  20.     channel->bind("exchange1","queue1","sport.#");
  21.     channel->bind("exchange1","queue2","music.#");
  22.         //4.5循环向交换机发布消息
  23.     for(int i=1;i<=10;++i)
  24.     {
  25.         lsxmq::BasicProperties bp;
  26.         bp.set_id(lsxmq::UUIDHelper::uuid());
  27.         bp.set_deliverymode(lsxmq::DeliveryMode::DURABLE);
  28.         bp.set_routing_key("music.pop");
  29.         channel->basicPublish("exchange1","hello-world-"+std::to_string(i),&bp);
  30.     }
  31.     for(int i=1;i<=5;++i)
  32.     {
  33.         lsxmq::BasicProperties bp;
  34.         bp.set_id(lsxmq::UUIDHelper::uuid());
  35.         bp.set_deliverymode(lsxmq::DeliveryMode::DURABLE);
  36.         bp.set_routing_key("music.pop.ajc");
  37.         channel->basicPublish("exchange1","good-morning-"+std::to_string(i),&bp);
  38.     }
  39.     for(int i=1;i<=5;++i)
  40.     {
  41.         lsxmq::BasicProperties bp;
  42.         bp.set_id(lsxmq::UUIDHelper::uuid());
  43.         bp.set_deliverymode(lsxmq::DeliveryMode::DURABLE);
  44.         bp.set_routing_key("news.gassip");
  45.         channel->basicPublish("exchange1","have-nice-day-"+std::to_string(i),&bp);
  46.     }
  47.     //5.关闭信道
  48.         conn->closeChannel(channel);
  49.     return 0;
  50. }
复制代码
      

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4