rabbitMq-----broker服务器

打印 上一主题 下一主题

主题 784|帖子 784|积分 2367

提示:文章写完后,目录可以自动天生,如何天生可参考右边的帮助文档
  
  

前言

搭建一个网络服务器,在内部提供各个业务接口即可。
在业务处理函数中,每次请求过来找到对应的信道,通过信道句柄调用前边封装好的处理接口进行处理,最后返回相应即可。

管理的字段

服务器必要管理的字段,其中必要搭建一个tcp服务器。然后就是我们业务所需的句柄,一个是虚拟机管理句柄,消耗者管理句柄,毗连管理句柄和线程池句柄。
  1. class Server
  2. {
  3. private:
  4. using MessagePtr = std::shared_ptr<google::protobuf::Message>;
  5.     muduo::net::TcpServer _server;
  6.     muduo::net::EventLoop _baseloop;
  7.     ProtobufCodecPtr _codec;        // 协议处理器 对收到的请求进行protobuf协议处理
  8.     ProtobufDispatcher _dispatcher; // 请求分发器
  9.     VirtualHost::ptr _host;
  10.     ConsumerManager::ptr _consumer_manager;
  11.     ConnectionManager::ptr _connection_manager;
  12.     ThreadPool::ptr _pool;
  13. }
复制代码
我们必要为服务器注册业务请求处理函数。muduo库是支持protobuf协议的处理的。
  1. // 注册请求处理函数
  2. _dispatcher.registerMessageCallback<openChannelRequest>(std::bind(&Server::OnOpenChannel,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
  3. _dispatcher.registerMessageCallback<closeChannelRequest>(std::bind(&Server::OnClodeChannle,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
  4. _dispatcher.registerMessageCallback<declareExchangeRequest>(std::bind(&Server::OnDeclareExchange,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
  5. _dispatcher.registerMessageCallback<deleteExchangeRequest>(std::bind(&Server::OnDeleteExchange,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
  6. _dispatcher.registerMessageCallback<declareQueueRequest>(std::bind(&Server::OnDeclareQueue,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
  7. _dispatcher.registerMessageCallback<deleteQueueRequest>(std::bind(&Server::OnDeleteQueue,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
  8. _dispatcher.registerMessageCallback<queueBindRequest>(std::bind(&Server::onQueueBind,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
  9. _dispatcher.registerMessageCallback<queueUnBindRequest>(std::bind(&Server::onQueueUnBind,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
  10. _dispatcher.registerMessageCallback<basicPublishRequest>(std::bind(&Server::onBasicPublish,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
  11. _dispatcher.registerMessageCallback<basicAckRequest>(std::bind(&Server::onBasicAck,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
  12. _dispatcher.registerMessageCallback<basicConsumeRequest>(std::bind(&Server::onBasicConsume,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
  13. _dispatcher.registerMessageCallback<basicCancelRequest>(std::bind(&Server::onBasicCancel,this,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
  14.         }
复制代码
当新毗连创建时,我们需调用毗连句柄创建一个新毗连。
而毗连断开时,必要删除毗连对象。
  1. void onConnection(const muduo::net::TcpConnectionPtr &conn)
  2. {
  3.      if (conn->connected() == true)
  4.      {
  5.          LOG_INFO << "新连接建立了";
  6.          _connection_manager->newConnection(_host,_consumer_manager,_codec,conn,_pool);
  7.      }
  8.      else
  9.      {
  10.          LOG_INFO << "连接断开了";
  11.          _connection_manager->delConnection(conn);
  12.      }
  13. }
复制代码
而其他注册的业务处理函数也比较简答,大体流程就是通过毗连找到对应的毗连管理对象,然后通过请求中的rid字段找道毗连管理中的对应信道。调用信道中封装好的处理接口进行处理即可。
  1. void OnDeclareExchange(const muduo::net::TcpConnectionPtr &conn,const declareExchangeRequestPtr &message,muduo::Timestamp){
  2.            Connection::ptr mconn =  _connection_manager->getConnection(conn);
  3. if(mconn.get() == nullptr)
  4.   {
  5.       DLOG("声明交换机时,没有找到连接对应的Connection对象!");
  6.       conn->shutdown();
  7.       return;
  8.   }
  9.   Channel::ptr cp = mconn->getChannel(message->cid());
  10.   if(cp.get() == nullptr)
  11.   {
  12.       DLOG("声明交换机时,没有找到信道!");
  13.       conn->shutdown();
  14.       return;
  15.   }
  16.   return cp->declareExchange(message);
  17. }
复制代码
这里有两个比较特殊,一个打开信道,一个是关闭信道
这两个操作是通过毗连找到毗连管理对象,然后调用毗连管理对象提供的打开信道和关闭信道操作进行处理。
  1. void OnOpenChannel(const muduo::net::TcpConnectionPtr &conn,const openChannelRequestPtr &message,muduo::Timestamp){
  2.             Connection::ptr mconn = _connection_manager->getConnection(conn);
  3. if(mconn.get() == nullptr)
  4. {
  5.      DLOG("打开信道时,没有找到连接对应的Connection对象!");
  6.      conn->shutdown();
  7.      return;
  8. }
  9. return mconn->openChannel(message);
  10. }
  11. //关闭信道
  12. void OnClodeChannle(const muduo::net::TcpConnectionPtr &conn,const closeChannelRequestPtr &message,muduo::Timestamp){
  13. Connection::ptr mconn = _connection_manager->getConnection(conn);
  14. if(mconn.get() == nullptr)
  15. {
  16.      DLOG("关闭信道时,没有找到连接对应的Connection对象!");
  17.      conn->shutdown();
  18.      return;
  19. }
  20. return mconn->closeChannel(message);
  21. }
复制代码
在构造函数的时候,由于队列消耗者是以队列为单位管理的,以是我们要获取已经存在的队列,来进行队列消耗者的初始化。
  1. //消费者是按照队列为单元进行管理的,针对历史消息中的所有队列,需要初始化队列的消费者管理结构QueueConsumer
  2. std::unordered_map<std::string, mq::MsgQueue::ptr> mqmp =  _host->allQueue();
  3. for(auto &mq : mqmp)
  4. {
  5.      _consumer_manager->initQueueConsumer(mq.first);
  6. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大连全瓷种植牙齿制作中心

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表