[C++][第三方库][RabbitMq]详细讲解

宁睿  金牌会员 | 2024-10-20 00:33:11 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 950|帖子 950|积分 2860


1.介绍



  • RabbitMQ:消息队列组件,实现两个客户端主机之间消息传输的功能(发布&订阅)
  • 核心概念:交换机、队列、绑定、消息
  • 交换机类型

    • 广播交换:当交换机收到消息,则将消息发布到所有绑定的队列中
    • 直接交换:根据消息中的bkey与绑定的rkey对比,一致则放入队列
    • 主题交换:使用bkey与绑定的rkey进行规则匹配,成功则放入队列


2.安装

1.RabbitMq



  • 安装:sudo apt install rabbitmq-server
  • 简单使用
    1. # 安装完成的时候默认有个用户guest,但是权限不够,要创建一个administrator用户,才可以做为远程登录和发表订阅消息
    2. #添加用户
    3. sudo rabbitmqctl add_user root <PASSWORD>
    4. #设置用户tag
    5. sudo rabbitmqctl set_user_tags root administrator
    6. #设置用户权限
    7. sudo rabbitmqctl set_permissions -p / root "." "." ".*"
    8. # RabbitMQ自带了web管理界面, 执行下面命令开启, 默认端口15672
    9. sudo rabbitmq-plugins enable rabbitmq_management
    复制代码

2.客户端库



  • C语言库
  • C++库
    1. sudo apt install libev-dev #libev 网络库组件
    2. git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
    3. cd AMQP-CPP/
    4. make
    5. make install
    复制代码
  • 如果安装时出现以下报错,则表现ssl版本出现问题
    1. /usr/include/openssl/macros.h:147:4: error: #error
    2. "OPENSSL_API_COMPAT expresses an impossible API compatibility
    3. level"
    4.   147 | #  error "OPENSSL_API_COMPAT expresses an impossible API
    5. compatibility level"
    6.       |    ^~~~~
    7. In file included from /usr/include/openssl/ssl.h:18,
    8.                  from linux_tcp/openssl.h:20,
    9.                  from linux_tcp/openssl.cpp:12:
    10. /usr/include/openssl/bio.h:687:1: error: expected constructor,
    11. destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’
    12.   687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str,
    13. unsigned short *port_ptr))
    复制代码
  • 解决方案:卸载当前的ssl库,重新进行修复安装
    1. dpkg -l | grep ssl
    2. sudo dpkg -P --force-all libevent-openssl-2.1-7
    3. sudo dpkg -P --force-all openssl
    4. sudo dpkg -P --force-all libssl-dev
    5. sudo apt --fix-broken install
    复制代码

3.AMQP-CPP 简单使用

1.介绍



  • AMQP-CPP是用于与RabbitMq消息中间件通讯的C++库

    • 它能剖析从RabbitMq服务发送来的数据,也可以生成发向RabbitMq的数据包
    • AMQP-CPP库不会向RabbitMq建立网络连接,所有的网络IO由用户完成

  • AMQP-CPP提供了可选的网络层接口,它预界说了TCP模块,用户就不用自己实现网络IO,

    • 也可以选择libevent、libev、libuv、asio等异步通讯组件, 需要手动安装对应的组件

  • AMQP-CPP完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中
  • 注意:它需要C++17的支持

2.使用



  • AMQP-CPP的使用有两种模式:

    • 使用默认的TCP模块进行网络通讯
    • 使用扩展的libevent、libev、libuv、asio异步通讯组件进行通讯

  • 此处以libev为例,不需要自己实现monitor函数,可以直接使用AMQP:ibEvHandler

4.类与接口

1.Channel



  • channel是一个虚拟连接,一个连接上可以建立多个通道

    • 而且所有的RabbitMq指令都是通过channel传输

      • 以是连接建立后的第一步,就是建立channel

    • 因为所有操纵是异步的,以是在channel上执行指令的返回值并不能作为操纵执行结果

      • 实际上它返回的是Deferred类,可以使用它安装处置惩罚函数


  1. namespace AMQP
  2. {
  3.     /**
  4.      *  Generic callbacks that are used by many deferred objects
  5.      */
  6.     using SuccessCallback = std::function<void()>;
  7.     using ErrorCallback = std::function<void(const char *message)>;
  8.     using FinalizeCallback = std::function<void()>;
  9.    
  10.     /**
  11.      *  Declaring and deleting a queue
  12.      */
  13.     using QueueCallback = std::function<void(const std::string &name,
  14.                                                    uint32_t messagecount,
  15.                                                    uint32_t consumercount)>;
  16.     using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
  17.     using MessageCallback = std::function<void(const Message &message,
  18.                                                    uint64_t deliveryTag,
  19.                                                    bool redelivered)>;
  20.     // 当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用AckCallback
  21.     using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;
  22.     // 使用确认包裹通道时,当消息被ack/nacked时,会调用这些回调
  23.     using PublishAckCallback = std::function<void()>;
  24.     using PublishNackCallback = std::function<void()>;
  25.     using PublishLostCallback = std::function<void()>;
  26.         // 信道类
  27.     class Channel
  28.     {
  29.         Channel(Connection *connection);
  30.         bool connected();
  31.         /**
  32.             *声明交换机
  33.             *如果提供了一个空名称,则服务器将分配一个名称。
  34.             *以下flags可用于交换机:
  35.             *
  36.             *-durable     持久化,重启后交换机依然有效
  37.             *-autodelete  删除所有连接的队列后,自动删除交换
  38.             *-passive     仅被动检查交换机是否存在
  39.             *-internal    创建内部交换
  40.             *
  41.             *@param name    交换机的名称
  42.             *@param-type    交换类型
  43.                 enum ExchangeType
  44.                 {
  45.                     fanout,  广播交换,绑定的队列都能拿到消息
  46.                     direct,  直接交换,只将消息交给routingkey一致的队列
  47.                     topic,   主题交换,将消息交给符合bindingkey规则的队列
  48.                     headers,
  49.                     consistent_hash,
  50.                     message_deduplication
  51.                 };
  52.             *@param flags    交换机标志
  53.             *@param arguments其他参数
  54.             *
  55.             *此函数返回一个延迟处理程序。可以安装回调
  56.             using onSuccess(), onError() and onFinalize() methods.
  57.         */
  58.         Deferred &declareExchange(const std::string_view &name,
  59.                                   ExchangeType type,
  60.                                   int flags,
  61.                                   const Table &arguments);
  62.         /**
  63.             *声明队列
  64.             *如果不提供名称,服务器将分配一个名称。
  65.             *flags可以是以下值的组合:
  66.             *
  67.             *-durable 持久队列在代理重新启动后仍然有效
  68.             *-autodelete 当所有连接的使用者都离开时,自动删除队列
  69.             *-passive 仅被动检查队列是否存在
  70.             *-exclusive 队列仅存在于此连接,并且在连接断开时自动删除
  71.             *
  72.             *@param name        队列的名称
  73.             *@param flags       标志组合
  74.             *@param arguments  可选参数
  75.             *
  76.             *此函数返回一个延迟处理程序。可以安装回调
  77.             *使用onSuccess()、onError()和onFinalize()方法。
  78.             *
  79.             Deferred &onError(const char *message)
  80.             *
  81.             *可以安装的onSuccess()回调应该具有以下签名:
  82.             void myCallback(const std::string &name,  
  83.                 uint32_t messageCount,  
  84.                 uint32_t consumerCount);
  85.             例如:
  86.             channel.declareQueue("myqueue").onSuccess(
  87.                 [](const std::string &name,  
  88.                     uint32_t messageCount,
  89.                     uint32_t consumerCount) {
  90.                        std::cout << "Queue '" << name << "' ";
  91.                        std::cout << "has been declared with ";
  92.                        std::cout << messageCount;
  93.                        std::cout << " messages and ";
  94.                        std::cout << consumerCount;
  95.                        std::cout << " consumers" << std::endl;
  96.          *  });
  97.         */
  98.         DeferredQueue &declareQueue(const std::string_view &name,
  99.                                     int flags,
  100.                                     const Table &arguments);
  101.         /**
  102.             *将队列绑定到交换机
  103.             *
  104.             *@param exchange     源交换机
  105.             *@param queue        目标队列
  106.             *@param routingkey   路由密钥
  107.             *@param arguments    其他绑定参数
  108.             *
  109.             *此函数返回一个延迟处理程序。可以安装回调
  110.             *使用onSuccess()、onError()和onFinalize()方法。
  111.         */
  112.         Deferred &bindQueue(const std::string_view &exchange,
  113.                             const std::string_view &queue,
  114.                             const std::string_view &routingkey,
  115.                             const Table &arguments);
  116.         /**
  117.             *将消息发布到exchange
  118.             *您必须提供交换机的名称和路由密钥。
  119.             然后,RabbitMQ将尝试将消息发送到一个或多个队列。
  120.             使用可选的flags参数,可以指定如果消息无法路由到队列时应该发生的情况。
  121.             默认情况下,不可更改的消息将被静默地丢弃。
  122.             *  
  123.             *如果设置了'mandatory'或'immediate'标志,
  124.             则无法处理的消息将返回到应用程序。
  125.             在开始发布之前,请确保您已经调用了recall()-方法,
  126.             并设置了所有适当的处理程序来处理这些返回的消息。
  127.             *  
  128.             *可以提供以下flags:
  129.             *  
  130.             *-mandatory 如果设置,服务器将返回未发送到队列的消息
  131.             *-immediate 如果设置,服务器将返回无法立即转发给使用者的消息。
  132.             *@param exchange要发布到的交易所
  133.             *@param routingkey路由密钥
  134.             *@param envelope要发送的完整信封
  135.             *@param message要发送的消息
  136.             *@param size消息的大小
  137.             *@param flags可选标志
  138.         */
  139.         bool publish(const std::string_view &exchange,
  140.                      const std::string_view &routingKey,
  141.                      const std::string &message,
  142.                      int flags = 0);
  143.         /**
  144.             *告诉RabbitMQ服务器已准备好使用消息-也就是 订阅队列消息
  145.             *
  146.             *调用此方法后,RabbitMQ开始向客户端应用程序传递消息。
  147.             consumer tag是一个字符串标识符,
  148.             如果您以后想通过channel::cancel()调用停止它,
  149.             可以使用它来标识使用者。
  150.             *如果您没有指定使用者tag,服务器将为您分配一个。
  151.             *
  152.             *支持以下flags:
  153.             *
  154.             *-nolocal    如果设置了,则不会同时消耗在此通道上发布的消息
  155.             *-noack      如果设置了,则不必对已消费的消息进行确认
  156.             *-exclusive  请求独占访问,只有此使用者可以访问队列
  157.             *
  158.             *@param queue    您要使用的队列
  159.             *@param tag      将与此消费操作关联的消费者标记
  160.             *@param flags    其他标记
  161.             *@param arguments其他参数
  162.             *
  163.             *此函数返回一个延迟处理程序。
  164.             可以使用onSuccess()、onError()和onFinalize()方法安装回调
  165.             可以安装的onSuccess()回调应该具有以下格式:
  166.                 void myCallback(const std::string_view&tag);
  167.             样例:
  168.             channel.consume("myqueue").onSuccess(
  169.                 [](const std::string_view& tag) {
  170.                     std::cout << "Started consuming under tag ";
  171.                     std::cout << tag << std::endl;
  172.             });
  173.         */
  174.         DeferredConsumer &consume(const std::string_view &queue,
  175.                                   const std::string_view &tag,
  176.                                   int flags,
  177.                                   const Table &arguments);
  178.         /**
  179.             *确认接收到的消息
  180.             *
  181.             *消费者客户端对收到的消息进行确认应答
  182.             *
  183.             *当在DeferredConsumer::onReceived()方法中接收到消息时,
  184.             必须确认该消息,
  185.             以便RabbitMQ将其从队列中删除(除非使用noack选项消费)
  186.             *
  187.             *支持以下标志:
  188.             *
  189.             *-多条确认多条消息:之前传递的所有未确认消息也会得到确认
  190.             *
  191.             *@param deliveryTag    消息的唯一delivery标签
  192.             *@param flags          可选标志
  193.             *@return bool
  194.         */
  195.         bool ack(uint64_t deliveryTag, int flags=0);
  196.     };
  197.     class DeferredConsumer
  198.     {
  199.         /*
  200.             注册一个回调函数,该函数在消费者启动时被调用
  201.             void onSuccess(const std::string &consumertag)
  202.         */
  203.         DeferredConsumer &onSuccess(const ConsumeCallback& callback);
  204.         /*
  205.             注册回调函数,用于接收到一个完整消息的时候被调用
  206.             void MessageCallback(const AMQP::Message &message,  
  207.                 uint64_t deliveryTag, bool redelivered)
  208.         */
  209.         DeferredConsumer &onReceived(const MessageCallback& callback);
  210.         /* Alias for onReceived() */
  211.         DeferredConsumer &onMessage(const MessageCallback& callback);
  212.         /*
  213.             注册要在服务器取消消费者时调用的函数
  214.             void CancelCallback(const std::string &tag)
  215.         */
  216.         DeferredConsumer &onCancelled(const CancelCallback& callback);
  217.     };
  218.     class Message : public Envelope
  219.     {
  220.         const std::string &exchange();
  221.         const std::string &routingkey();
  222.     };
  223.    
  224.     class Envelope : public MetaData
  225.     {
  226.         const char *body();  // 获取消息正文
  227.         uint64_t bodySize(); // 获取消息正文大小
  228.     };
  229. }
复制代码

2.ev

  1. typedef struct ev_async
  2. {
  3.     EV_WATCHER (ev_async);
  4.     EV_ATOMIC_T sent; /* private */
  5. }ev_async;
  6. //break type
  7. enum
  8. {
  9.     EVBREAK_CANCEL = 0, /* undo unloop */
  10.     EVBREAK_ONE    = 1, /* unloop once */
  11.     EVBREAK_ALL    = 2  /* unloop all loops */
  12. };
  13. // 实例化并获取IO事件监控接口句柄
  14. struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0));
  15. # define EV_DEFAULT  ev_default_loop (0)
  16. // 开始运行IO事件监控, 这是一个阻塞接口
  17. int  ev_run (struct ev_loop *loop);
  18. /* break out of the loop */
  19. // 结束IO监控
  20. // 如果在主线程进行ev_run(), 则可以直接调用,
  21. // 如果在其他线程中进行ev_run(), 需要通过异步通知进行
  22. void ev_break (struct ev_loop *loop, int32_t break_type) ;  
  23. void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
  24. // 初始化异步事件结构, 并设置回调函数
  25. void ev_async_init(ev_async *w, callback cb);
  26. // 启动事件监控循环中的异步任务处理
  27. void ev_async_start(struct ev_loop *loop, ev_async *w);
  28. // 发送当前异步事件到异步线程中执行
  29. void ev_async_send(struct ev_loop *loop, ev_async *w);
复制代码

5.使用

1.publish.cc

  1. #include <ev.h>
  2. #include <amqpcpp.h>
  3. #include <amqpcpp/libev.h>
  4. #include <openssl/ssl.h>
  5. #include <openssl/opensslv.h>
  6. int main()
  7. {
  8.     // 1.实例化底层网络通信框架的IO事件监控句柄
  9.     auto *loop = EV_DEFAULT;
  10.     // 2.实例化libEvHandler句柄 -> 将AMQP框架与事件监控关联起来
  11.     AMQP::LibEvHandler handler(loop);
  12.     // 3.实例化连接对象
  13.     AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/");
  14.     AMQP::TcpConnection connection(&handler, address);
  15.     // 4.实例化信道对象
  16.     AMQP::TcpChannel channel(&connection);
  17.     // 5.声明交换机
  18.     channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
  19.         .onError([](const char *message)
  20.                 { std::cout << "声明交换机失败: " << message << std::endl; })
  21.         .onSuccess([]()
  22.                 { std::cout << "test-exchange 交换机创建成功" << std::endl; });
  23.    
  24.     // 6.声明队列
  25.     channel.declareQueue("test-queue")
  26.         .onError([](const char *message)
  27.                  { std::cout << "声明队列失败: " << message << std::endl; })
  28.         .onSuccess([]()
  29.                  { std::cout << "test-queue 队列创建成功" << std::endl; });
  30.     // 7.针对交换机和队列进行绑定
  31.     channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
  32.         .onError([](const char *message)
  33.                  { std::cout << "test-exchange - test-queue 绑定失败: " \
  34.                  << message << std::endl; })
  35.         .onSuccess([]()
  36.                  { std::cout << "test-exchange - test-queue 绑定成功"
  37.                  << std::endl; });
  38.     // 8.向交换机发布消息
  39.     for (int i = 0; i < 5; ++i)
  40.     {
  41.         std::string msg = "Hello SnowK-" + std::to_string(i);
  42.         if(channel.publish("test-exchange", "test-queue-key", msg) == false)
  43.         {
  44.             std::cout << "publish 失败" << std::endl;
  45.         }
  46.     }
  47.     // 9.启动底层网络通信框架 -> 开启IO
  48.     ev_run(loop, 0);
  49.     return 0;
  50. }
复制代码

2.consume.cc

  1. #include <ev.h>
  2. #include <amqpcpp.h>
  3. #include <amqpcpp/libev.h>
  4. #include <openssl/ssl.h>
  5. #include <openssl/opensslv.h>
  6. void MessageCB(AMQP::TcpChannel* channel, const AMQP::Message& message,
  7.                uint64_t deliveryTag, bool redelivered)
  8. {
  9.     std::string msg;
  10.     msg.assign(message.body(), message.bodySize());
  11.    
  12.     // 不能这样使用, AMQP::Message后面没有存'\0'
  13.     // std::cout << message << std::endl
  14.    
  15.     std::cout << msg << std::endl;
  16.     channel->ack(deliveryTag);
  17. }
  18. int main()
  19. {
  20.     // 1.实例化底层网络通信框架的IO事件监控句柄
  21.     auto *loop = EV_DEFAULT;
  22.     // 2.实例化libEvHandler句柄 -> 将AMQP框架与事件监控关联起来
  23.     AMQP::LibEvHandler handler(loop);
  24.     // 3.实例化连接对象
  25.     AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/");
  26.     AMQP::TcpConnection connection(&handler, address);
  27.     // 4.实例化信道对象
  28.     AMQP::TcpChannel channel(&connection);
  29.     // 5.声明交换机
  30.     channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
  31.         .onError([](const char *message)
  32.                  { std::cout << "声明交换机失败: " << message << std::endl; })
  33.         .onSuccess([]()
  34.                  { std::cout << "test-exchange 交换机创建成功" << std::endl; });
  35.     // 6.声明队列
  36.     channel.declareQueue("test-queue")
  37.         .onError([](const char *message)
  38.                  { std::cout << "声明队列失败: " << message << std::endl; })
  39.         .onSuccess([]()
  40.                  { std::cout << "test-queue 队列创建成功" << std::endl; });
  41.     // 7.针对交换机和队列进行绑定
  42.     channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
  43.         .onError([](const char *message)
  44.                  { std::cout << "test-exchange - test-queue 绑定失败: " \
  45.                                  << message << std::endl; })
  46.         .onSuccess([]()
  47.                  { std::cout << "test-exchange - test-queue 绑定成功"; });
  48.     // 8.订阅消息对垒 -> 设置消息处理回调函数
  49.     auto callback = std::bind(MessageCB, &channel, std::placeholders::_1,
  50.                               std::placeholders::_2, std::placeholders::_3);
  51.     channel.consume("test-queue", "consume-tag")
  52.         .onReceived(callback)
  53.         .onError([](const char *message)
  54.         {
  55.             std::cout << "订阅 test-queue 队列消息失败: " << message << std::endl;
  56.             exit(0);
  57.         });
  58.     // 9.启动底层网络通信框架 -> 开启IO
  59.     ev_run(loop, 0);
  60.     return 0;
  61. }
复制代码

3.makefile

  1. all: publish consume
  2. publish: publish.cc
  3.         g++ -o $@ $^ -lamqpcpp -lev -std=c++17
  4. consume: consume.cc
  5.         g++ -o $@ $^ -lamqpcpp -lev -std=c++17
  6. .PHONY:clean
  7. clean:
  8.         rm publish consume
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

宁睿

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表