一.编写思绪
- 定义消息队列
- 定义队列持久化类(持久化到 sqlite3)
- 构造函数(只能成功,不能失败)
- 如果数据库(文件)不存在则创建
- 打开数据库
- 打开 msg_queue_table 数据库表
- 插入队列
- 移除队列
- 将数据库中的队列恢复到内存中
传入一个哈希表,key 为名字,value 为队列的智能指针,填充该哈希表
- 定义队列管理类(包罗内存管理和持久化管理)
- 构造函数:从数据库中恢复队列
- 声明队列
- 移除队列
- 获取队列
二.代码实践
MsgQueue.hpp:
- #pragma once
- #include "../common/Log.hpp"
- #include "../common/Util.hpp"
- #include "../common/Util.hpp"
- #include <memory>
- #include <unordered_map>
- #include <mutex>
- namespace ns_data
- {
- class MsgQueue;
- using MsgQueuePtr = std::shared_ptr<MsgQueue>;
- /************
- * 定义消息队列
- * ****************/
- struct MsgQueue
- {
- std::string _name;
- bool _isDurable;
- MsgQueue(const std::string &name, bool isDurable)
- : _name(name),
- _isDurable(isDurable)
- {
- }
- };
- /*****************
- * 定义消息队列持久化类
- * ******************/
- class MsgQueueMapper
- {
- private:
- ns_util::Sqlite3Util _sqlite;
- public:
- MsgQueueMapper(const std::string &dbName)
- : _sqlite(dbName)
- {
- // 确保数据库文件已经存在,不存在就创建
- if (!ns_util::FileUtil::createFile(dbName))
- {
- LOG(FATAL) << "create database " << dbName << " fail" << endl;
- exit(1);
- }
- if (!_sqlite.open())
- {
- LOG(FATAL) << "open database " << dbName << " fail" << endl;
- exit(1);
- }
- createTable();
- }
- /*************
- * 插入消息队列
- * *************/
- bool insertMsgQueue(MsgQueuePtr msgQueuePtr)
- {
- char insertSql[1024];
- sprintf(insertSql, "insert into msg_queue_table values('%s', '%d');",
- msgQueuePtr->_name.c_str(), msgQueuePtr->_isDurable);
- if (!_sqlite.exec(insertSql, nullptr, nullptr))
- {
- LOG(WARNING) << "insert MsgQueue fail, MsgQueue: " << msgQueuePtr->_name << endl;
- return false;
- }
- return true;
- }
- /**********
- * 移除消息队列
- * ***************/
- void removeMsgQueue(const std::string &name)
- {
- char deleteSql[1024];
- sprintf(deleteSql, "delete from msg_queue_table where name='%s';", name.c_str());
- if (!_sqlite.exec(deleteSql, nullptr, nullptr))
- {
- LOG(WARNING) << "remove MsgQueue fail, MsgQueue: " << name << endl;
- }
- }
- /***********
- * 从数据库中恢复消息队列到内存
- * *****************/
- void recoverMsgQueue(std::unordered_map<std::string, MsgQueuePtr> *mapPtr)
- {
- const std::string selectSql = "select * from msg_queue_table;";
- if (!_sqlite.exec(selectSql.c_str(), selectCallback, mapPtr))
- {
- LOG(FATAL) << "recover MsgQueue from msg_queue_table fail" << endl;
- exit(1);
- }
- }
- /**************
- * 删除数据库表(仅调试)
- * ***************/
- void removeTable()
- {
- const std::string dropSql = "drop table if exists msg_queue_table;";
- if (_sqlite.exec(dropSql.c_str(), nullptr, nullptr))
- {
- LOG(WARNING) << "remove table msg_queue_table fail" << endl;
- }
- }
- private:
- void createTable()
- {
- const std::string createSql = "create table if not exists msg_queue_table(\
- name varchar(32) primary key,\
- durable int\
- );";
- if (!_sqlite.exec(createSql.c_str(), nullptr, nullptr))
- {
- LOG(FATAL) << "create table msg_queue_table fail" << endl;
- exit(1);
- }
- }
- static int selectCallback(void *arg, int colNum, char **line, char **fields)
- {
- auto mapPtr = static_cast<std::unordered_map<std::string, MsgQueuePtr> *>(arg);
- std::string name = line[0];
- bool isDurable = std::stoi(line[1]);
- auto msgQueuePtr = std::make_shared<MsgQueue>(name, isDurable);
- mapPtr->insert({name, msgQueuePtr});
- return 0;
- }
- };
- class MsgQueueManager
- {
- private:
- MsgQueueMapper _mapper;
- std::unordered_map<std::string, MsgQueuePtr> _msgQueues;
- std::mutex _mtx;
- public:
- MsgQueueManager(const std::string &dbName)
- : _mapper(dbName)
- {
- _mapper.recoverMsgQueue(&_msgQueues);
- }
- /***********
- * 声明队列
- * ************/
- bool declareMsgQueue(const std::string &name, bool isDurable)
- {
- std::unique_lock<std::mutex> lck(_mtx);
- if (_msgQueues.count(name))
- {
- return true;
- }
- auto msgQueuePtr = std::make_shared<MsgQueue>(name, isDurable);
- _msgQueues[name] = msgQueuePtr;
- if (isDurable)
- {
- return _mapper.insertMsgQueue(msgQueuePtr);
- }
- return true;
- }
- /**********
- * 移除队列
- * ***********/
- void removeMsgQueue(const std::string &name)
- {
- std::unique_lock<std::mutex> lck(_mtx);
-
- auto it = _msgQueues.find(name);
- if (it == _msgQueues.end())
- {
- return;
- }
- if (it->second->_isDurable)
- {
- _mapper.removeMsgQueue(name);
- }
- _msgQueues.erase(name);
- }
- /************
- * 获取指定队列
- * ***************/
- MsgQueuePtr getMsgQueue(const std::string &name)
- {
- std::unique_lock<std::mutex> lck(_mtx);
-
- if (_msgQueues.count(name) == 0)
- {
- return nullptr;
- }
- return _msgQueues[name];
- }
- /*************
- * 清理所有队列(仅调试)
- * ******************/
- void clearMsgQueues()
- {
- std::unique_lock<std::mutex> lck(_mtx);
- _msgQueues.clear();
- _mapper.removeTable();
- }
- };
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |