1.介绍
- RabbitMQ:消息队列组件,实现两个客户端主机之间消息传输的功能(发布&订阅)
- 核心概念:交换机、队列、绑定、消息
- 交换机类型:
- 广播交换:当交换机收到消息,则将消息发布到所有绑定的队列中
- 直接交换:根据消息中的bkey与绑定的rkey对比,一致则放入队列
- 主题交换:使用bkey与绑定的rkey进行规则匹配,成功则放入队列
2.安装
1.RabbitMq
- 安装:sudo apt install rabbitmq-server
- 简单使用:
- # 安装完成的时候默认有个用户guest,但是权限不够,要创建一个administrator用户,才可以做为远程登录和发表订阅消息
- #添加用户
- sudo rabbitmqctl add_user root <PASSWORD>
- #设置用户tag
- sudo rabbitmqctl set_user_tags root administrator
- #设置用户权限
- sudo rabbitmqctl set_permissions -p / root "." "." ".*"
- # RabbitMQ自带了web管理界面, 执行下面命令开启, 默认端口15672
- sudo rabbitmq-plugins enable rabbitmq_management
复制代码 2.客户端库
- C语言库
- C++库
- sudo apt install libev-dev #libev 网络库组件
- git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
- cd AMQP-CPP/
- make
- make install
复制代码 - 如果安装时出现以下报错,则表现ssl版本出现问题
- /usr/include/openssl/macros.h:147:4: error: #error
- "OPENSSL_API_COMPAT expresses an impossible API compatibility
- level"
- 147 | # error "OPENSSL_API_COMPAT expresses an impossible API
- compatibility level"
- | ^~~~~
- In file included from /usr/include/openssl/ssl.h:18,
- from linux_tcp/openssl.h:20,
- from linux_tcp/openssl.cpp:12:
- /usr/include/openssl/bio.h:687:1: error: expected constructor,
- destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’
- 687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str,
- unsigned short *port_ptr))
复制代码 - 解决方案:卸载当前的ssl库,重新进行修复安装
- dpkg -l | grep ssl
- sudo dpkg -P --force-all libevent-openssl-2.1-7
- sudo dpkg -P --force-all openssl
- sudo dpkg -P --force-all libssl-dev
- sudo apt --fix-broken install
复制代码 3.AMQP-CPP 简单使用
1.介绍
- AMQP-CPP是用于与RabbitMq消息中间件通讯的C++库
- 它能剖析从RabbitMq服务发送来的数据,也可以生成发向RabbitMq的数据包
- AMQP-CPP库不会向RabbitMq建立网络连接,所有的网络IO由用户完成
- AMQP-CPP提供了可选的网络层接口,它预界说了TCP模块,用户就不用自己实现网络IO,
- 也可以选择libevent、libev、libuv、asio等异步通讯组件, 需要手动安装对应的组件
- AMQP-CPP完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中
- 注意:它需要C++17的支持
2.使用
- AMQP-CPP的使用有两种模式:
- 使用默认的TCP模块进行网络通讯
- 使用扩展的libevent、libev、libuv、asio异步通讯组件进行通讯
- 此处以libev为例,不需要自己实现monitor函数,可以直接使用AMQP:
ibEvHandler
4.类与接口
1.Channel
- channel是一个虚拟连接,一个连接上可以建立多个通道
- 而且所有的RabbitMq指令都是通过channel传输
- 因为所有操纵是异步的,以是在channel上执行指令的返回值并不能作为操纵执行结果
- 实际上它返回的是Deferred类,可以使用它安装处置惩罚函数
- namespace AMQP
- {
- /**
- * Generic callbacks that are used by many deferred objects
- */
- using SuccessCallback = std::function<void()>;
- using ErrorCallback = std::function<void(const char *message)>;
- using FinalizeCallback = std::function<void()>;
-
- /**
- * Declaring and deleting a queue
- */
- using QueueCallback = std::function<void(const std::string &name,
- uint32_t messagecount,
- uint32_t consumercount)>;
- using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
- using MessageCallback = std::function<void(const Message &message,
- uint64_t deliveryTag,
- bool redelivered)>;
- // 当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用AckCallback
- using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;
- // 使用确认包裹通道时,当消息被ack/nacked时,会调用这些回调
- using PublishAckCallback = std::function<void()>;
- using PublishNackCallback = std::function<void()>;
- using PublishLostCallback = std::function<void()>;
- // 信道类
- class Channel
- {
- Channel(Connection *connection);
- bool connected();
- /**
- *声明交换机
- *如果提供了一个空名称,则服务器将分配一个名称。
- *以下flags可用于交换机:
- *
- *-durable 持久化,重启后交换机依然有效
- *-autodelete 删除所有连接的队列后,自动删除交换
- *-passive 仅被动检查交换机是否存在
- *-internal 创建内部交换
- *
- *@param name 交换机的名称
- *@param-type 交换类型
- enum ExchangeType
- {
- fanout, 广播交换,绑定的队列都能拿到消息
- direct, 直接交换,只将消息交给routingkey一致的队列
- topic, 主题交换,将消息交给符合bindingkey规则的队列
- headers,
- consistent_hash,
- message_deduplication
- };
- *@param flags 交换机标志
- *@param arguments其他参数
- *
- *此函数返回一个延迟处理程序。可以安装回调
- using onSuccess(), onError() and onFinalize() methods.
- */
- Deferred &declareExchange(const std::string_view &name,
- ExchangeType type,
- int flags,
- const Table &arguments);
- /**
- *声明队列
- *如果不提供名称,服务器将分配一个名称。
- *flags可以是以下值的组合:
- *
- *-durable 持久队列在代理重新启动后仍然有效
- *-autodelete 当所有连接的使用者都离开时,自动删除队列
- *-passive 仅被动检查队列是否存在
- *-exclusive 队列仅存在于此连接,并且在连接断开时自动删除
- *
- *@param name 队列的名称
- *@param flags 标志组合
- *@param arguments 可选参数
- *
- *此函数返回一个延迟处理程序。可以安装回调
- *使用onSuccess()、onError()和onFinalize()方法。
- *
- Deferred &onError(const char *message)
- *
- *可以安装的onSuccess()回调应该具有以下签名:
- void myCallback(const std::string &name,
- uint32_t messageCount,
- uint32_t consumerCount);
- 例如:
- channel.declareQueue("myqueue").onSuccess(
- [](const std::string &name,
- uint32_t messageCount,
- uint32_t consumerCount) {
- std::cout << "Queue '" << name << "' ";
- std::cout << "has been declared with ";
- std::cout << messageCount;
- std::cout << " messages and ";
- std::cout << consumerCount;
- std::cout << " consumers" << std::endl;
- * });
- */
- DeferredQueue &declareQueue(const std::string_view &name,
- int flags,
- const Table &arguments);
- /**
- *将队列绑定到交换机
- *
- *@param exchange 源交换机
- *@param queue 目标队列
- *@param routingkey 路由密钥
- *@param arguments 其他绑定参数
- *
- *此函数返回一个延迟处理程序。可以安装回调
- *使用onSuccess()、onError()和onFinalize()方法。
- */
- Deferred &bindQueue(const std::string_view &exchange,
- const std::string_view &queue,
- const std::string_view &routingkey,
- const Table &arguments);
- /**
- *将消息发布到exchange
- *您必须提供交换机的名称和路由密钥。
- 然后,RabbitMQ将尝试将消息发送到一个或多个队列。
- 使用可选的flags参数,可以指定如果消息无法路由到队列时应该发生的情况。
- 默认情况下,不可更改的消息将被静默地丢弃。
- *
- *如果设置了'mandatory'或'immediate'标志,
- 则无法处理的消息将返回到应用程序。
- 在开始发布之前,请确保您已经调用了recall()-方法,
- 并设置了所有适当的处理程序来处理这些返回的消息。
- *
- *可以提供以下flags:
- *
- *-mandatory 如果设置,服务器将返回未发送到队列的消息
- *-immediate 如果设置,服务器将返回无法立即转发给使用者的消息。
- *@param exchange要发布到的交易所
- *@param routingkey路由密钥
- *@param envelope要发送的完整信封
- *@param message要发送的消息
- *@param size消息的大小
- *@param flags可选标志
- */
- bool publish(const std::string_view &exchange,
- const std::string_view &routingKey,
- const std::string &message,
- int flags = 0);
- /**
- *告诉RabbitMQ服务器已准备好使用消息-也就是 订阅队列消息
- *
- *调用此方法后,RabbitMQ开始向客户端应用程序传递消息。
- consumer tag是一个字符串标识符,
- 如果您以后想通过channel::cancel()调用停止它,
- 可以使用它来标识使用者。
- *如果您没有指定使用者tag,服务器将为您分配一个。
- *
- *支持以下flags:
- *
- *-nolocal 如果设置了,则不会同时消耗在此通道上发布的消息
- *-noack 如果设置了,则不必对已消费的消息进行确认
- *-exclusive 请求独占访问,只有此使用者可以访问队列
- *
- *@param queue 您要使用的队列
- *@param tag 将与此消费操作关联的消费者标记
- *@param flags 其他标记
- *@param arguments其他参数
- *
- *此函数返回一个延迟处理程序。
- 可以使用onSuccess()、onError()和onFinalize()方法安装回调
- 可以安装的onSuccess()回调应该具有以下格式:
- void myCallback(const std::string_view&tag);
- 样例:
- channel.consume("myqueue").onSuccess(
- [](const std::string_view& tag) {
- std::cout << "Started consuming under tag ";
- std::cout << tag << std::endl;
- });
- */
- DeferredConsumer &consume(const std::string_view &queue,
- const std::string_view &tag,
- int flags,
- const Table &arguments);
- /**
- *确认接收到的消息
- *
- *消费者客户端对收到的消息进行确认应答
- *
- *当在DeferredConsumer::onReceived()方法中接收到消息时,
- 必须确认该消息,
- 以便RabbitMQ将其从队列中删除(除非使用noack选项消费)
- *
- *支持以下标志:
- *
- *-多条确认多条消息:之前传递的所有未确认消息也会得到确认
- *
- *@param deliveryTag 消息的唯一delivery标签
- *@param flags 可选标志
- *@return bool
- */
- bool ack(uint64_t deliveryTag, int flags=0);
- };
- class DeferredConsumer
- {
- /*
- 注册一个回调函数,该函数在消费者启动时被调用
- void onSuccess(const std::string &consumertag)
- */
- DeferredConsumer &onSuccess(const ConsumeCallback& callback);
- /*
- 注册回调函数,用于接收到一个完整消息的时候被调用
- void MessageCallback(const AMQP::Message &message,
- uint64_t deliveryTag, bool redelivered)
- */
- DeferredConsumer &onReceived(const MessageCallback& callback);
- /* Alias for onReceived() */
- DeferredConsumer &onMessage(const MessageCallback& callback);
- /*
- 注册要在服务器取消消费者时调用的函数
- void CancelCallback(const std::string &tag)
- */
- DeferredConsumer &onCancelled(const CancelCallback& callback);
- };
- class Message : public Envelope
- {
- const std::string &exchange();
- const std::string &routingkey();
- };
-
- class Envelope : public MetaData
- {
- const char *body(); // 获取消息正文
- uint64_t bodySize(); // 获取消息正文大小
- };
- }
复制代码 2.ev
- typedef struct ev_async
- {
- EV_WATCHER (ev_async);
- EV_ATOMIC_T sent; /* private */
- }ev_async;
-
- //break type
- enum
- {
- EVBREAK_CANCEL = 0, /* undo unloop */
- EVBREAK_ONE = 1, /* unloop once */
- EVBREAK_ALL = 2 /* unloop all loops */
- };
- // 实例化并获取IO事件监控接口句柄
- struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0));
- # define EV_DEFAULT ev_default_loop (0)
-
- // 开始运行IO事件监控, 这是一个阻塞接口
- int ev_run (struct ev_loop *loop);
- /* break out of the loop */
- // 结束IO监控
- // 如果在主线程进行ev_run(), 则可以直接调用,
- // 如果在其他线程中进行ev_run(), 需要通过异步通知进行
- void ev_break (struct ev_loop *loop, int32_t break_type) ;
-
- void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
-
- // 初始化异步事件结构, 并设置回调函数
- void ev_async_init(ev_async *w, callback cb);
- // 启动事件监控循环中的异步任务处理
- void ev_async_start(struct ev_loop *loop, ev_async *w);
- // 发送当前异步事件到异步线程中执行
- void ev_async_send(struct ev_loop *loop, ev_async *w);
复制代码 5.使用
1.publish.cc
- #include <ev.h>
- #include <amqpcpp.h>
- #include <amqpcpp/libev.h>
- #include <openssl/ssl.h>
- #include <openssl/opensslv.h>
- int main()
- {
- // 1.实例化底层网络通信框架的IO事件监控句柄
- auto *loop = EV_DEFAULT;
- // 2.实例化libEvHandler句柄 -> 将AMQP框架与事件监控关联起来
- AMQP::LibEvHandler handler(loop);
- // 3.实例化连接对象
- AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/");
- AMQP::TcpConnection connection(&handler, address);
- // 4.实例化信道对象
- AMQP::TcpChannel channel(&connection);
- // 5.声明交换机
- channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
- .onError([](const char *message)
- { std::cout << "声明交换机失败: " << message << std::endl; })
- .onSuccess([]()
- { std::cout << "test-exchange 交换机创建成功" << std::endl; });
-
- // 6.声明队列
- channel.declareQueue("test-queue")
- .onError([](const char *message)
- { std::cout << "声明队列失败: " << message << std::endl; })
- .onSuccess([]()
- { std::cout << "test-queue 队列创建成功" << std::endl; });
- // 7.针对交换机和队列进行绑定
- channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
- .onError([](const char *message)
- { std::cout << "test-exchange - test-queue 绑定失败: " \
- << message << std::endl; })
- .onSuccess([]()
- { std::cout << "test-exchange - test-queue 绑定成功"
- << std::endl; });
- // 8.向交换机发布消息
- for (int i = 0; i < 5; ++i)
- {
- std::string msg = "Hello SnowK-" + std::to_string(i);
- if(channel.publish("test-exchange", "test-queue-key", msg) == false)
- {
- std::cout << "publish 失败" << std::endl;
- }
- }
- // 9.启动底层网络通信框架 -> 开启IO
- ev_run(loop, 0);
- return 0;
- }
复制代码 2.consume.cc
- #include <ev.h>
- #include <amqpcpp.h>
- #include <amqpcpp/libev.h>
- #include <openssl/ssl.h>
- #include <openssl/opensslv.h>
- void MessageCB(AMQP::TcpChannel* channel, const AMQP::Message& message,
- uint64_t deliveryTag, bool redelivered)
- {
- std::string msg;
- msg.assign(message.body(), message.bodySize());
-
- // 不能这样使用, AMQP::Message后面没有存'\0'
- // std::cout << message << std::endl
-
- std::cout << msg << std::endl;
- channel->ack(deliveryTag);
- }
- int main()
- {
- // 1.实例化底层网络通信框架的IO事件监控句柄
- auto *loop = EV_DEFAULT;
- // 2.实例化libEvHandler句柄 -> 将AMQP框架与事件监控关联起来
- AMQP::LibEvHandler handler(loop);
- // 3.实例化连接对象
- AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/");
- AMQP::TcpConnection connection(&handler, address);
- // 4.实例化信道对象
- AMQP::TcpChannel channel(&connection);
- // 5.声明交换机
- channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
- .onError([](const char *message)
- { std::cout << "声明交换机失败: " << message << std::endl; })
- .onSuccess([]()
- { std::cout << "test-exchange 交换机创建成功" << std::endl; });
- // 6.声明队列
- channel.declareQueue("test-queue")
- .onError([](const char *message)
- { std::cout << "声明队列失败: " << message << std::endl; })
- .onSuccess([]()
- { std::cout << "test-queue 队列创建成功" << std::endl; });
- // 7.针对交换机和队列进行绑定
- channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
- .onError([](const char *message)
- { std::cout << "test-exchange - test-queue 绑定失败: " \
- << message << std::endl; })
- .onSuccess([]()
- { std::cout << "test-exchange - test-queue 绑定成功"; });
- // 8.订阅消息对垒 -> 设置消息处理回调函数
- auto callback = std::bind(MessageCB, &channel, std::placeholders::_1,
- std::placeholders::_2, std::placeholders::_3);
- channel.consume("test-queue", "consume-tag")
- .onReceived(callback)
- .onError([](const char *message)
- {
- std::cout << "订阅 test-queue 队列消息失败: " << message << std::endl;
- exit(0);
- });
- // 9.启动底层网络通信框架 -> 开启IO
- ev_run(loop, 0);
- return 0;
- }
复制代码 3.makefile
- all: publish consume
- publish: publish.cc
- g++ -o $@ $^ -lamqpcpp -lev -std=c++17
- consume: consume.cc
- g++ -o $@ $^ -lamqpcpp -lev -std=c++17
- .PHONY:clean
- clean:
- rm publish consume
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |