【RabbitMQ 项目】服务端:数据管理模块之消息队列管理 ...

卖不甜枣  金牌会员 | 3 天前 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 564|帖子 564|积分 1692

一.编写思绪

   

  • 定义消息队列

    • 名字
    • 是否持久化

  • 定义队列持久化类(持久化到 sqlite3)

    • 构造函数(只能成功,不能失败)

      • 如果数据库(文件)不存在则创建
      • 打开数据库
      • 打开 msg_queue_table 数据库表

    • 插入队列
    • 移除队列
    • 将数据库中的队列恢复到内存中
      传入一个哈希表,key 为名字,value 为队列的智能指针,填充该哈希表

  • 定义队列管理类(包罗内存管理和持久化管理)

    • 构造函数:从数据库中恢复队列
    • 声明队列
    • 移除队列
    • 获取队列

  二.代码实践

MsgQueue.hpp:
  1. #pragma once
  2. #include "../common/Log.hpp"
  3. #include "../common/Util.hpp"
  4. #include "../common/Util.hpp"
  5. #include <memory>
  6. #include <unordered_map>
  7. #include <mutex>
  8. namespace ns_data
  9. {
  10.     class MsgQueue;
  11.     using MsgQueuePtr = std::shared_ptr<MsgQueue>;
  12.     /************
  13.      * 定义消息队列
  14.      * ****************/
  15.     struct MsgQueue
  16.     {
  17.         std::string _name;
  18.         bool _isDurable;
  19.         MsgQueue(const std::string &name, bool isDurable)
  20.             : _name(name),
  21.               _isDurable(isDurable)
  22.         {
  23.         }
  24.     };
  25.     /*****************
  26.      * 定义消息队列持久化类
  27.      * ******************/
  28.     class MsgQueueMapper
  29.     {
  30.     private:
  31.         ns_util::Sqlite3Util _sqlite;
  32.     public:
  33.         MsgQueueMapper(const std::string &dbName)
  34.             : _sqlite(dbName)
  35.         {
  36.             // 确保数据库文件已经存在,不存在就创建
  37.             if (!ns_util::FileUtil::createFile(dbName))
  38.             {
  39.                 LOG(FATAL) << "create database " << dbName << " fail" << endl;
  40.                 exit(1);
  41.             }
  42.             if (!_sqlite.open())
  43.             {
  44.                 LOG(FATAL) << "open database " << dbName << " fail" << endl;
  45.                 exit(1);
  46.             }
  47.             createTable();
  48.         }
  49.         /*************
  50.          * 插入消息队列
  51.          * *************/
  52.         bool insertMsgQueue(MsgQueuePtr msgQueuePtr)
  53.         {
  54.             char insertSql[1024];
  55.             sprintf(insertSql, "insert into msg_queue_table values('%s', '%d');",
  56.                     msgQueuePtr->_name.c_str(), msgQueuePtr->_isDurable);
  57.             if (!_sqlite.exec(insertSql, nullptr, nullptr))
  58.             {
  59.                 LOG(WARNING) << "insert MsgQueue fail, MsgQueue: " << msgQueuePtr->_name << endl;
  60.                 return false;
  61.             }
  62.             return true;
  63.         }
  64.         /**********
  65.          * 移除消息队列
  66.          * ***************/
  67.         void removeMsgQueue(const std::string &name)
  68.         {
  69.             char deleteSql[1024];
  70.             sprintf(deleteSql, "delete from msg_queue_table where name='%s';", name.c_str());
  71.             if (!_sqlite.exec(deleteSql, nullptr, nullptr))
  72.             {
  73.                 LOG(WARNING) << "remove MsgQueue fail, MsgQueue: " << name << endl;
  74.             }
  75.         }
  76.         /***********
  77.          * 从数据库中恢复消息队列到内存
  78.          * *****************/
  79.         void recoverMsgQueue(std::unordered_map<std::string, MsgQueuePtr> *mapPtr)
  80.         {
  81.             const std::string selectSql = "select * from msg_queue_table;";
  82.             if (!_sqlite.exec(selectSql.c_str(), selectCallback, mapPtr))
  83.             {
  84.                 LOG(FATAL) << "recover MsgQueue from msg_queue_table fail" << endl;
  85.                 exit(1);
  86.             }
  87.         }
  88.         /**************
  89.          * 删除数据库表(仅调试)
  90.          * ***************/
  91.         void removeTable()
  92.         {
  93.             const std::string dropSql = "drop table if exists msg_queue_table;";
  94.             if (_sqlite.exec(dropSql.c_str(), nullptr, nullptr))
  95.             {
  96.                 LOG(WARNING) << "remove table msg_queue_table fail" << endl;
  97.             }
  98.         }
  99.     private:
  100.         void createTable()
  101.         {
  102.             const std::string createSql = "create table if not exists msg_queue_table(\
  103.                 name varchar(32) primary key,\
  104.                 durable int\
  105.             );";
  106.             if (!_sqlite.exec(createSql.c_str(), nullptr, nullptr))
  107.             {
  108.                 LOG(FATAL) << "create table msg_queue_table fail" << endl;
  109.                 exit(1);
  110.             }
  111.         }
  112.         static int selectCallback(void *arg, int colNum, char **line, char **fields)
  113.         {
  114.             auto mapPtr = static_cast<std::unordered_map<std::string, MsgQueuePtr> *>(arg);
  115.             std::string name = line[0];
  116.             bool isDurable = std::stoi(line[1]);
  117.             auto msgQueuePtr = std::make_shared<MsgQueue>(name, isDurable);
  118.             mapPtr->insert({name, msgQueuePtr});
  119.             return 0;
  120.         }
  121.     };
  122.     class MsgQueueManager
  123.     {
  124.     private:
  125.         MsgQueueMapper _mapper;
  126.         std::unordered_map<std::string, MsgQueuePtr> _msgQueues;
  127.         std::mutex _mtx;
  128.     public:
  129.         MsgQueueManager(const std::string &dbName)
  130.             : _mapper(dbName)
  131.         {
  132.             _mapper.recoverMsgQueue(&_msgQueues);
  133.         }
  134.         /***********
  135.          * 声明队列
  136.          * ************/
  137.         bool declareMsgQueue(const std::string &name, bool isDurable)
  138.         {
  139.             std::unique_lock<std::mutex> lck(_mtx);
  140.             if (_msgQueues.count(name))
  141.             {
  142.                 return true;
  143.             }
  144.             auto msgQueuePtr = std::make_shared<MsgQueue>(name, isDurable);
  145.             _msgQueues[name] = msgQueuePtr;
  146.             if (isDurable)
  147.             {
  148.                 return _mapper.insertMsgQueue(msgQueuePtr);
  149.             }
  150.             return true;
  151.         }
  152.         /**********
  153.          * 移除队列
  154.          * ***********/
  155.         void removeMsgQueue(const std::string &name)
  156.         {
  157.             std::unique_lock<std::mutex> lck(_mtx);
  158.             
  159.             auto it = _msgQueues.find(name);
  160.             if (it == _msgQueues.end())
  161.             {
  162.                 return;
  163.             }
  164.             if (it->second->_isDurable)
  165.             {
  166.                 _mapper.removeMsgQueue(name);
  167.             }
  168.             _msgQueues.erase(name);
  169.         }
  170.         /************
  171.          * 获取指定队列
  172.          * ***************/
  173.         MsgQueuePtr getMsgQueue(const std::string &name)
  174.         {
  175.             std::unique_lock<std::mutex> lck(_mtx);
  176.             
  177.             if (_msgQueues.count(name) == 0)
  178.             {
  179.                 return nullptr;
  180.             }
  181.             return _msgQueues[name];
  182.         }
  183.         /*************
  184.          * 清理所有队列(仅调试)
  185.          * ******************/
  186.         void clearMsgQueues()
  187.         {
  188.             std::unique_lock<std::mutex> lck(_mtx);
  189.             _msgQueues.clear();
  190.             _mapper.removeTable();
  191.         }
  192.     };
  193. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

卖不甜枣

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表