ConcurrentQueue开源库介绍
ConcurrentQueue是一个高性能的、线程安全的并发队列库。它旨在提供高效、无锁的数据布局,适用于多线程情况中的数据交换。concurrentqueue 支持多个生产者和多个消耗者,而且提供了多种配置选项来优化性能和内存使用。
ConcurrentQueue使用
0x01 使用场景说明
我的数据平台在吸收到四种不同的业务数据时,需要按数据分类写进RocketMQ。
0x02 自定义类用来存放和区分数据流
- 该类有定义消息的范例
- 该类中设计ST_BusinessSign布局体消息头,用来区分消息和获取消息体的长度
- BusinessFlowMsg类可以存放的数据长度为512KB
- #ifndef BUSINESSFLOWMSG_HPP
- #define BUSINESSFLOWMSG_HPP
- #include <string.h>
- #define MSG_ROCKETMQ_PNG 0x01
- #define MSG_ROCKETMQ_AIS 0x02
- #define MSG_ROCKETMQ_ROUTE 0x03
- #define MSG_ROCKETMQ_VOYAGE 0x04
- #pragma pack(push)
- #pragma pack(1)
- // 消息头
- typedef struct s_BusinessSign
- {
- int sign; // 业务标识
- unsigned int length; // 消息体的长度
- }ST_BusinessSign;
- #pragma pack(pop)
- class BusinessFlowMsg
- {
- public:
- BusinessFlowMsg() = default;
- ~BusinessFlowMsg() = default;
- char* get_data()
- {
- return _data;
- }
- int data_size()
- {
- return businessSign.length;
- }
- char* get_body()
- {
- return _data + sizeof(ST_BusinessSign);
- }
- int body_size()
- {
- return data_size() - sizeof(ST_BusinessSign);
- }
- ST_BusinessSign* header()
- {
- return &businessSign;
- }
- bool set_data(const char* data, int length, int sign)
- {
- if(length > (max_body_len + sizeof(ST_BusinessSign)))
- {
- return false;
- }
- businessSign.sign = sign;
- businessSign.length = length;
- memcpy(_data + sizeof(ST_BusinessSign), data, length);
- return true;
- }
- private:
- enum
- {
- max_body_len = 512 * 1024 // 512KB
- };
- ST_BusinessSign businessSign;
- char _data[max_body_len];
- };
- #endif // BUSINESSFLOWMSG_HPP
复制代码 0x03 创建PngUnit类模仿接到不同的业务数据
- PngUnit范例创建了四个线程来模仿不同的数据流。
- PngUnit类多线程中并未使用互斥锁,因为ConcurrentQueue是一个线程安全的并发队列库,事实证实确实如此。
- #ifndef PNGUNIT_H
- #define PNGUNIT_H
- #include <thread>
- #include <mutex>
- class PngUnit
- {
- public:
- PngUnit();
- ~PngUnit() = default;
- void start();
- void sendPNG(int sign);
- void sendAIS(int sign);
- void sendRoute(int sign);
- void sendVoyage(int sign);
- private:
- std::thread m_th_png;
- std::thread m_th_ais;
- std::thread m_th_route;
- std::thread m_th_voyage;
- // std::mutex queue_mutex; // 互斥锁
- };
- #endif // PNGUNIT_H
复制代码
- #include "pngunit.h"
- #include <unistd.h>
- #include "rocketmqutils.h"
- #include "BusinessFlowMsg.hpp"
- #include "json11/json11.hpp"
- PngUnit::PngUnit()
- {
- }
- void PngUnit::start()
- {
- // m_th = std::thread([this](){
- // sendPNG(100);
- // });
- if(m_th_png.joinable())
- {
- printf("[%s:%d] %s\n", __FILE__, __LINE__, "m_th_png is running");
- return;
- }
- m_th_png = std::thread(std::bind(&PngUnit::sendPNG, this, MSG_ROCKETMQ_PNG));
- m_th_ais= std::thread(std::bind(&PngUnit::sendAIS, this, MSG_ROCKETMQ_AIS));
- m_th_route= std::thread(std::bind(&PngUnit::sendRoute, this, MSG_ROCKETMQ_ROUTE));
- m_th_voyage= std::thread(std::bind(&PngUnit::sendVoyage, this, MSG_ROCKETMQ_VOYAGE));
- }
- void PngUnit::sendPNG(int sign)
- {
- while (true)
- {
- BusinessFlowMsg pngMsg;
- const char* pngFile = "1234567890ABCDEF";
- int fileLen = strlen(pngFile) + 1;
- pngMsg.set_data(pngFile, fileLen, sign);
- {
- // std::lock_guard<std::mutex> lock(queue_mutex);
- if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(pngMsg))
- {
- printf("Failed to set PNG message data");
- }
- }
- sleep(1);
- }
- }
- void PngUnit::sendAIS(int sign)
- {
- json11::Json::object obj = {
- {"message", "AIS"},
- {"response", "success"}
- };
- std::string jsonStr = json11::Json(obj).dump();
- while (true)
- {
- BusinessFlowMsg aisMsg;
- int jsonStrLen = jsonStr.size();
- aisMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);
- {
- // std::lock_guard<std::mutex> lock(queue_mutex);
- if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(aisMsg))
- {
- printf("Failed to set AIS message data");
- }
- }
- sleep(2);
- }
- }
- void PngUnit::sendRoute(int sign)
- {
- json11::Json::object obj = {
- {"message", "Route"},
- {"response", "success"}
- };
- std::string jsonStr = json11::Json(obj).dump();
- while (true)
- {
- BusinessFlowMsg routeMsg;
- int jsonStrLen = jsonStr.size();
- routeMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);
- {
- // std::lock_guard<std::mutex> lock(queue_mutex);
- if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(routeMsg))
- {
- printf("Failed to set ROUTE message data");
- }
- }
- sleep(3);
- }
- }
- void PngUnit::sendVoyage(int sign)
- {
- json11::Json::object obj = {
- {"message", "Voyage"},
- {"response", "success"}
- };
- std::string jsonStr = json11::Json(obj).dump();
- while (true)
- {
- BusinessFlowMsg voyageMsg;
- int jsonStrLen = jsonStr.size();
- voyageMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);
- {
- // std::lock_guard<std::mutex> lock(queue_mutex);
- if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(voyageMsg))
- {
- printf("Failed to set VOYAGE message data");
- }
- }
- sleep(4);
- }
- }
复制代码 0x04 创建RocketMQUtils类,在ConcurrentQueue队列中获取数据写进RocketMQ
- #ifndef ROCKETMQUTILS_H
- #define ROCKETMQUTILS_H
- #include <thread>
- #include <concurrentqueue/moodycamel/concurrentqueue.h>
- #include "BusinessFlowMsg.hpp"
- class RocketMQUtils
- {
- public:
- static RocketMQUtils* Instance();
- private:
- RocketMQUtils();
- ~RocketMQUtils()=default;
- RocketMQUtils(const RocketMQUtils &) = delete;
- RocketMQUtils& operator=(const RocketMQUtils &) = delete;
- RocketMQUtils(RocketMQUtils &&) = delete;
- RocketMQUtils& operator=(RocketMQUtils &&) = delete;
- public:
- void start();
- void push();
- void poll();
- bool write(char *data, int len, int sign);
- public:
- moodycamel::ConcurrentQueue<BusinessFlowMsg> g_businessQueue;
- private:
- static RocketMQUtils* _instance;
- std::thread _pushThread;
- };
- #endif // ROCKETMQUTILS_H
复制代码
- #include "rocketmqutils.h"
- #include <unistd.h>
- RocketMQUtils * RocketMQUtils::_instance = nullptr;
- RocketMQUtils *RocketMQUtils::Instance()
- {
- if(_instance == nullptr)
- {
- _instance = new RocketMQUtils();
- }
- return _instance;
- }
- RocketMQUtils::RocketMQUtils()
- {
- }
- void RocketMQUtils::start()
- {
- if(_pushThread.joinable())
- {
- return;
- }
- _pushThread = std::thread(&RocketMQUtils::push, this);
- }
- void RocketMQUtils::push()
- {
- while (true)
- {
- BusinessFlowMsg busiMsg;
- if(g_businessQueue.try_dequeue(busiMsg))
- {
- write(busiMsg.get_body(), busiMsg.header()->length, busiMsg.header()->sign);
- }
- else
- {
- printf("[%s:%d] %s\n", __FILE__, __LINE__, "g_businessQueue is empty");
- sleep(2);
- }
- }
- }
- void RocketMQUtils::poll()
- {
- }
- bool RocketMQUtils::write(char *data, int len, int sign)
- {
- std::string msg(data, len);
- if (sign == MSG_ROCKETMQ_PNG)
- {
- printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
- }
- else if (sign == MSG_ROCKETMQ_AIS)
- {
- printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
- }
- else if (sign == MSG_ROCKETMQ_ROUTE)
- {
- printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
- }
- else if (sign == MSG_ROCKETMQ_VOYAGE)
- {
- printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
- }
- else
- {
- printf("[%s:%d] data sign error\n", __FILE__, __LINE__);
- }
- }
复制代码 0x05 使用演示
- #include <iostream>
- #include <memory>
- #include "rocketmqutils.h"
- #include "pngunit.h"
- using namespace std;
- int main()
- {
- cout << "==Start==" << endl;
- RocketMQUtils* rocketmq = RocketMQUtils::Instance();
- rocketmq->start();
- std::shared_ptr<PngUnit> ptrPngUnit = std::make_shared<PngUnit>();
- ptrPngUnit->start();
- getchar();
- cout << "==Over==" << endl;
- return 0;
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |