商道如狼道 发表于 2024-11-16 02:33:36

C++使用开源ConcurrentQueue库处理自定义业务数据类

ConcurrentQueue开源库介绍

ConcurrentQueue是一个高性能的、线程安全的并发队列库。它旨在提供高效、无锁的数据布局,适用于多线程情况中的数据交换。concurrentqueue 支持多个生产者和多个消耗者,而且提供了多种配置选项来优化性能和内存使用。
ConcurrentQueue使用

0x01 使用场景说明

我的数据平台在吸收到四种不同的业务数据时,需要按数据分类写进RocketMQ。
0x02 自定义类用来存放和区分数据流



[*]设计BusinessFlowMsg

[*]该类有定义消息的范例
[*]该类中设计ST_BusinessSign布局体消息头,用来区分消息和获取消息体的长度
[*]BusinessFlowMsg类可以存放的数据长度为512KB
#ifndef BUSINESSFLOWMSG_HPP
#define BUSINESSFLOWMSG_HPP

#include <string.h>

#define MSG_ROCKETMQ_PNG 0x01
#define MSG_ROCKETMQ_AIS 0x02
#define MSG_ROCKETMQ_ROUTE 0x03
#define MSG_ROCKETMQ_VOYAGE 0x04

#pragma pack(push)
#pragma pack(1)

// 消息头
typedef struct s_BusinessSign
{
    int sign; // 业务标识
    unsigned int length; // 消息体的长度
}ST_BusinessSign;

#pragma pack(pop)

class BusinessFlowMsg
{
public:
    BusinessFlowMsg() = default;
    ~BusinessFlowMsg() = default;

    char* get_data()
    {
      return _data;
    }

    int data_size()
    {
      return businessSign.length;
    }

    char* get_body()
    {
      return _data + sizeof(ST_BusinessSign);
    }

    int body_size()
    {
      return data_size() - sizeof(ST_BusinessSign);
    }

    ST_BusinessSign* header()
    {
      return &businessSign;
    }

    bool set_data(const char* data, int length, int sign)
    {
      if(length > (max_body_len + sizeof(ST_BusinessSign)))
      {
            return false;
      }

      businessSign.sign = sign;
      businessSign.length = length;
      memcpy(_data + sizeof(ST_BusinessSign), data, length);

      return true;
    }

private:
    enum
    {
      max_body_len = 512 * 1024 // 512KB
    };

    ST_BusinessSign businessSign;
    char _data;
};

#endif // BUSINESSFLOWMSG_HPP
0x03 创建PngUnit类模仿接到不同的业务数据



[*]PngUnit范例创建了四个线程来模仿不同的数据流。
[*]PngUnit类多线程中并未使用互斥锁,因为ConcurrentQueue是一个线程安全的并发队列库,事实证实确实如此。
#ifndef PNGUNIT_H
#define PNGUNIT_H

#include <thread>
#include <mutex>

class PngUnit
{
public:
    PngUnit();
    ~PngUnit() = default;
    void start();
    void sendPNG(int sign);
    void sendAIS(int sign);
    void sendRoute(int sign);
    void sendVoyage(int sign);

private:
    std::thread m_th_png;
    std::thread m_th_ais;
    std::thread m_th_route;
    std::thread m_th_voyage;

    // std::mutex queue_mutex;// 互斥锁
};

#endif // PNGUNIT_H
#include "pngunit.h"
#include <unistd.h>
#include "rocketmqutils.h"
#include "BusinessFlowMsg.hpp"
#include "json11/json11.hpp"

PngUnit::PngUnit()
{

}

void PngUnit::start()
{
    // m_th = std::thread((){
    //   sendPNG(100);
    // });

    if(m_th_png.joinable())
    {
      printf("[%s:%d] %s\n", __FILE__, __LINE__, "m_th_png is running");
      return;
    }

    m_th_png = std::thread(std::bind(&PngUnit::sendPNG, this, MSG_ROCKETMQ_PNG));
    m_th_ais= std::thread(std::bind(&PngUnit::sendAIS, this, MSG_ROCKETMQ_AIS));
    m_th_route= std::thread(std::bind(&PngUnit::sendRoute, this, MSG_ROCKETMQ_ROUTE));
    m_th_voyage= std::thread(std::bind(&PngUnit::sendVoyage, this, MSG_ROCKETMQ_VOYAGE));
}

void PngUnit::sendPNG(int sign)
{
    while (true)
    {
      BusinessFlowMsg pngMsg;
      const char* pngFile = "1234567890ABCDEF";
      int fileLen = strlen(pngFile) + 1;

      pngMsg.set_data(pngFile, fileLen, sign);

      {
            // std::lock_guard<std::mutex> lock(queue_mutex);
            if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(pngMsg))
            {
                printf("Failed to set PNG message data");
            }
      }

      sleep(1);
    }
}

void PngUnit::sendAIS(int sign)
{
    json11::Json::object obj = {
      {"message", "AIS"},
      {"response", "success"}
    };
    std::string jsonStr = json11::Json(obj).dump();

    while (true)
    {
      BusinessFlowMsg aisMsg;
      int jsonStrLen = jsonStr.size();
      aisMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);

      {
            // std::lock_guard<std::mutex> lock(queue_mutex);
            if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(aisMsg))
            {
                printf("Failed to set AIS message data");
            }
      }

      sleep(2);
    }
}

void PngUnit::sendRoute(int sign)
{
    json11::Json::object obj = {
      {"message", "Route"},
      {"response", "success"}
    };
    std::string jsonStr = json11::Json(obj).dump();

    while (true)
    {
      BusinessFlowMsg routeMsg;
      int jsonStrLen = jsonStr.size();
      routeMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);

      {
            // std::lock_guard<std::mutex> lock(queue_mutex);
            if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(routeMsg))
            {
                printf("Failed to set ROUTE message data");
            }
      }

      sleep(3);
    }
}

void PngUnit::sendVoyage(int sign)
{
    json11::Json::object obj = {
      {"message", "Voyage"},
      {"response", "success"}
    };
    std::string jsonStr = json11::Json(obj).dump();

    while (true)
    {
      BusinessFlowMsg voyageMsg;
      int jsonStrLen = jsonStr.size();
      voyageMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);

      {
            // std::lock_guard<std::mutex> lock(queue_mutex);
            if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(voyageMsg))
            {
                printf("Failed to set VOYAGE message data");
            }
      }

      sleep(4);
    }
}
0x04 创建RocketMQUtils类,在ConcurrentQueue队列中获取数据写进RocketMQ



[*]RocketMQUtils类是一个单例类
#ifndef ROCKETMQUTILS_H
#define ROCKETMQUTILS_H

#include <thread>
#include <concurrentqueue/moodycamel/concurrentqueue.h>
#include "BusinessFlowMsg.hpp"

class RocketMQUtils
{
public:
    static RocketMQUtils* Instance();

private:
    RocketMQUtils();
    ~RocketMQUtils()=default;
    RocketMQUtils(const RocketMQUtils &) = delete;
    RocketMQUtils& operator=(const RocketMQUtils &) = delete;

    RocketMQUtils(RocketMQUtils &&) = delete;
    RocketMQUtils& operator=(RocketMQUtils &&) = delete;

public:
    void start();
    void push();
    void poll();
    bool write(char *data, int len, int sign);

public:
    moodycamel::ConcurrentQueue<BusinessFlowMsg> g_businessQueue;

private:
    static RocketMQUtils* _instance;

    std::thread _pushThread;
};

#endif // ROCKETMQUTILS_H
#include "rocketmqutils.h"
#include <unistd.h>

RocketMQUtils * RocketMQUtils::_instance = nullptr;

RocketMQUtils *RocketMQUtils::Instance()
{
    if(_instance == nullptr)
    {
      _instance = new RocketMQUtils();
    }
    return _instance;
}

RocketMQUtils::RocketMQUtils()
{

}

void RocketMQUtils::start()
{
    if(_pushThread.joinable())
    {
      return;
    }
    _pushThread = std::thread(&RocketMQUtils::push, this);
}

void RocketMQUtils::push()
{
    while (true)
    {
      BusinessFlowMsg busiMsg;

      if(g_businessQueue.try_dequeue(busiMsg))
      {
            write(busiMsg.get_body(), busiMsg.header()->length, busiMsg.header()->sign);
      }
      else
      {
            printf("[%s:%d] %s\n", __FILE__, __LINE__, "g_businessQueue is empty");
            sleep(2);
      }
    }
}

void RocketMQUtils::poll()
{

}

bool RocketMQUtils::write(char *data, int len, int sign)
{
    std::string msg(data, len);
    if (sign == MSG_ROCKETMQ_PNG)
    {
      printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
    }
    else if (sign == MSG_ROCKETMQ_AIS)
    {
      printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
    }
    else if (sign == MSG_ROCKETMQ_ROUTE)
    {
      printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
    }
    else if (sign == MSG_ROCKETMQ_VOYAGE)
    {
      printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
    }
    else
    {
      printf("[%s:%d] data sign error\n", __FILE__, __LINE__);
    }
}
0x05 使用演示

#include <iostream>
#include <memory>
#include "rocketmqutils.h"
#include "pngunit.h"

using namespace std;

int main()
{
    cout << "==Start==" << endl;
    RocketMQUtils* rocketmq = RocketMQUtils::Instance();
    rocketmq->start();

    std::shared_ptr<PngUnit> ptrPngUnit = std::make_shared<PngUnit>();
    ptrPngUnit->start();

    getchar();

    cout << "==Over==" << endl;

    return 0;
}
https://i-blog.csdnimg.cn/direct/9f50b2896f954f52a6369bf74e90ce81.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: C++使用开源ConcurrentQueue库处理自定义业务数据类