C++使用开源ConcurrentQueue库处理自定义业务数据类

打印 上一主题 下一主题

主题 1807|帖子 1807|积分 5421

ConcurrentQueue开源库介绍

ConcurrentQueue是一个高性能的、线程安全的并发队列库。它旨在提供高效、无锁的数据布局,适用于多线程情况中的数据交换。concurrentqueue 支持多个生产者和多个消耗者,而且提供了多种配置选项来优化性能和内存使用。
ConcurrentQueue使用

0x01 使用场景说明

我的数据平台在吸收到四种不同的业务数据时,需要按数据分类写进RocketMQ。
0x02 自定义类用来存放和区分数据流



  • 设计BusinessFlowMsg

  • 该类有定义消息的范例
  • 该类中设计ST_BusinessSign布局体消息头,用来区分消息和获取消息体的长度
  • BusinessFlowMsg类可以存放的数据长度为512KB
  1. #ifndef BUSINESSFLOWMSG_HPP
  2. #define BUSINESSFLOWMSG_HPP
  3. #include <string.h>
  4. #define MSG_ROCKETMQ_PNG 0x01
  5. #define MSG_ROCKETMQ_AIS 0x02
  6. #define MSG_ROCKETMQ_ROUTE 0x03
  7. #define MSG_ROCKETMQ_VOYAGE 0x04
  8. #pragma pack(push)
  9. #pragma pack(1)
  10. // 消息头
  11. typedef struct s_BusinessSign
  12. {
  13.     int sign; // 业务标识
  14.     unsigned int length; // 消息体的长度
  15. }ST_BusinessSign;
  16. #pragma pack(pop)
  17. class BusinessFlowMsg
  18. {
  19. public:
  20.     BusinessFlowMsg() = default;
  21.     ~BusinessFlowMsg() = default;
  22.     char* get_data()
  23.     {
  24.         return _data;
  25.     }
  26.     int data_size()
  27.     {
  28.         return businessSign.length;
  29.     }
  30.     char* get_body()
  31.     {
  32.         return _data + sizeof(ST_BusinessSign);
  33.     }
  34.     int body_size()
  35.     {
  36.         return data_size() - sizeof(ST_BusinessSign);
  37.     }
  38.     ST_BusinessSign* header()
  39.     {
  40.         return &businessSign;
  41.     }
  42.     bool set_data(const char* data, int length, int sign)
  43.     {
  44.         if(length > (max_body_len + sizeof(ST_BusinessSign)))
  45.         {
  46.             return false;
  47.         }
  48.         businessSign.sign = sign;
  49.         businessSign.length = length;
  50.         memcpy(_data + sizeof(ST_BusinessSign), data, length);
  51.         return true;
  52.     }
  53. private:
  54.     enum
  55.     {
  56.         max_body_len = 512 * 1024 // 512KB
  57.     };
  58.     ST_BusinessSign businessSign;
  59.     char _data[max_body_len];
  60. };
  61. #endif // BUSINESSFLOWMSG_HPP
复制代码
0x03 创建PngUnit类模仿接到不同的业务数据



  • PngUnit范例创建了四个线程来模仿不同的数据流。
  • PngUnit类多线程中并未使用互斥锁,因为ConcurrentQueue是一个线程安全的并发队列库,事实证实确实如此。
  1. #ifndef PNGUNIT_H
  2. #define PNGUNIT_H
  3. #include <thread>
  4. #include <mutex>
  5. class PngUnit
  6. {
  7. public:
  8.     PngUnit();
  9.     ~PngUnit() = default;
  10.     void start();
  11.     void sendPNG(int sign);
  12.     void sendAIS(int sign);
  13.     void sendRoute(int sign);
  14.     void sendVoyage(int sign);
  15. private:
  16.     std::thread m_th_png;
  17.     std::thread m_th_ais;
  18.     std::thread m_th_route;
  19.     std::thread m_th_voyage;
  20.     // std::mutex queue_mutex;  // 互斥锁
  21. };
  22. #endif // PNGUNIT_H
复制代码

  1. #include "pngunit.h"
  2. #include <unistd.h>
  3. #include "rocketmqutils.h"
  4. #include "BusinessFlowMsg.hpp"
  5. #include "json11/json11.hpp"
  6. PngUnit::PngUnit()
  7. {
  8. }
  9. void PngUnit::start()
  10. {
  11.     // m_th = std::thread([this](){
  12.     //     sendPNG(100);
  13.     // });
  14.     if(m_th_png.joinable())
  15.     {
  16.         printf("[%s:%d] %s\n", __FILE__, __LINE__, "m_th_png is running");
  17.         return;
  18.     }
  19.     m_th_png = std::thread(std::bind(&PngUnit::sendPNG, this, MSG_ROCKETMQ_PNG));
  20.     m_th_ais= std::thread(std::bind(&PngUnit::sendAIS, this, MSG_ROCKETMQ_AIS));
  21.     m_th_route= std::thread(std::bind(&PngUnit::sendRoute, this, MSG_ROCKETMQ_ROUTE));
  22.     m_th_voyage= std::thread(std::bind(&PngUnit::sendVoyage, this, MSG_ROCKETMQ_VOYAGE));
  23. }
  24. void PngUnit::sendPNG(int sign)
  25. {
  26.     while (true)
  27.     {
  28.         BusinessFlowMsg pngMsg;
  29.         const char* pngFile = "1234567890ABCDEF";
  30.         int fileLen = strlen(pngFile) + 1;
  31.         pngMsg.set_data(pngFile, fileLen, sign);
  32.         {
  33.             // std::lock_guard<std::mutex> lock(queue_mutex);
  34.             if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(pngMsg))
  35.             {
  36.                 printf("Failed to set PNG message data");
  37.             }
  38.         }
  39.         sleep(1);
  40.     }
  41. }
  42. void PngUnit::sendAIS(int sign)
  43. {
  44.     json11::Json::object obj = {
  45.         {"message", "AIS"},
  46.         {"response", "success"}
  47.     };
  48.     std::string jsonStr = json11::Json(obj).dump();
  49.     while (true)
  50.     {
  51.         BusinessFlowMsg aisMsg;
  52.         int jsonStrLen = jsonStr.size();
  53.         aisMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);
  54.         {
  55.             // std::lock_guard<std::mutex> lock(queue_mutex);
  56.             if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(aisMsg))
  57.             {
  58.                 printf("Failed to set AIS message data");
  59.             }
  60.         }
  61.         sleep(2);
  62.     }
  63. }
  64. void PngUnit::sendRoute(int sign)
  65. {
  66.     json11::Json::object obj = {
  67.         {"message", "Route"},
  68.         {"response", "success"}
  69.     };
  70.     std::string jsonStr = json11::Json(obj).dump();
  71.     while (true)
  72.     {
  73.         BusinessFlowMsg routeMsg;
  74.         int jsonStrLen = jsonStr.size();
  75.         routeMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);
  76.         {
  77.             // std::lock_guard<std::mutex> lock(queue_mutex);
  78.             if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(routeMsg))
  79.             {
  80.                 printf("Failed to set ROUTE message data");
  81.             }
  82.         }
  83.         sleep(3);
  84.     }
  85. }
  86. void PngUnit::sendVoyage(int sign)
  87. {
  88.     json11::Json::object obj = {
  89.         {"message", "Voyage"},
  90.         {"response", "success"}
  91.     };
  92.     std::string jsonStr = json11::Json(obj).dump();
  93.     while (true)
  94.     {
  95.         BusinessFlowMsg voyageMsg;
  96.         int jsonStrLen = jsonStr.size();
  97.         voyageMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);
  98.         {
  99.             // std::lock_guard<std::mutex> lock(queue_mutex);
  100.             if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(voyageMsg))
  101.             {
  102.                 printf("Failed to set VOYAGE message data");
  103.             }
  104.         }
  105.         sleep(4);
  106.     }
  107. }
复制代码
0x04 创建RocketMQUtils类,在ConcurrentQueue队列中获取数据写进RocketMQ



  • RocketMQUtils类是一个单例类
  1. #ifndef ROCKETMQUTILS_H
  2. #define ROCKETMQUTILS_H
  3. #include <thread>
  4. #include <concurrentqueue/moodycamel/concurrentqueue.h>
  5. #include "BusinessFlowMsg.hpp"
  6. class RocketMQUtils
  7. {
  8. public:
  9.     static RocketMQUtils* Instance();
  10. private:
  11.     RocketMQUtils();
  12.     ~RocketMQUtils()=default;
  13.     RocketMQUtils(const RocketMQUtils &) = delete;
  14.     RocketMQUtils& operator=(const RocketMQUtils &) = delete;
  15.     RocketMQUtils(RocketMQUtils &&) = delete;
  16.     RocketMQUtils& operator=(RocketMQUtils &&) = delete;
  17. public:
  18.     void start();
  19.     void push();
  20.     void poll();
  21.     bool write(char *data, int len, int sign);
  22. public:
  23.     moodycamel::ConcurrentQueue<BusinessFlowMsg> g_businessQueue;
  24. private:
  25.     static RocketMQUtils* _instance;
  26.     std::thread _pushThread;
  27. };
  28. #endif // ROCKETMQUTILS_H
复制代码

  1. #include "rocketmqutils.h"
  2. #include <unistd.h>
  3. RocketMQUtils * RocketMQUtils::_instance = nullptr;
  4. RocketMQUtils *RocketMQUtils::Instance()
  5. {
  6.     if(_instance == nullptr)
  7.     {
  8.         _instance = new RocketMQUtils();
  9.     }
  10.     return _instance;
  11. }
  12. RocketMQUtils::RocketMQUtils()
  13. {
  14. }
  15. void RocketMQUtils::start()
  16. {
  17.     if(_pushThread.joinable())
  18.     {
  19.         return;
  20.     }
  21.     _pushThread = std::thread(&RocketMQUtils::push, this);
  22. }
  23. void RocketMQUtils::push()
  24. {
  25.     while (true)
  26.     {
  27.         BusinessFlowMsg busiMsg;
  28.         if(g_businessQueue.try_dequeue(busiMsg))
  29.         {
  30.             write(busiMsg.get_body(), busiMsg.header()->length, busiMsg.header()->sign);
  31.         }
  32.         else
  33.         {
  34.             printf("[%s:%d] %s\n", __FILE__, __LINE__, "g_businessQueue is empty");
  35.             sleep(2);
  36.         }
  37.     }
  38. }
  39. void RocketMQUtils::poll()
  40. {
  41. }
  42. bool RocketMQUtils::write(char *data, int len, int sign)
  43. {
  44.     std::string msg(data, len);
  45.     if (sign == MSG_ROCKETMQ_PNG)
  46.     {
  47.         printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
  48.     }
  49.     else if (sign == MSG_ROCKETMQ_AIS)
  50.     {
  51.         printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
  52.     }
  53.     else if (sign == MSG_ROCKETMQ_ROUTE)
  54.     {
  55.         printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
  56.     }
  57.     else if (sign == MSG_ROCKETMQ_VOYAGE)
  58.     {
  59.         printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
  60.     }
  61.     else
  62.     {
  63.         printf("[%s:%d] data sign error\n", __FILE__, __LINE__);
  64.     }
  65. }
复制代码
0x05 使用演示

  1. #include <iostream>
  2. #include <memory>
  3. #include "rocketmqutils.h"
  4. #include "pngunit.h"
  5. using namespace std;
  6. int main()
  7. {
  8.     cout << "==Start==" << endl;
  9.     RocketMQUtils* rocketmq = RocketMQUtils::Instance();
  10.     rocketmq->start();
  11.     std::shared_ptr<PngUnit> ptrPngUnit = std::make_shared<PngUnit>();
  12.     ptrPngUnit->start();
  13.     getchar();
  14.     cout << "==Over==" << endl;
  15.     return 0;
  16. }
复制代码



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

商道如狼道

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表