单reactor实战

打印 上一主题 下一主题

主题 1560|帖子 1560|积分 4680


前言:reactor作为一种高性能的范式,值得我们学习
本次目标 实现一个基于的reactor 具备echo功能的服务器

核心组件

Reactor本身是靠一个变乱驱动的框架,无疑引出一个类似于moduo的"EventLoop "以及boost.asio中的context而言,不停获取停当变乱,其本质无疑是io多用复路实现的。
下面就是核心功能:
  1. void EventLoop::loop(){
  2.     while(!quit){
  3.     std::vector<Channel*> chs;
  4.         chs = ep->poll();
  5.         for(auto it = chs.begin(); it != chs.end(); ++it){
  6.             (*it)->handleEvent();
  7.         }
  8.     }
  9. }
复制代码
当变乱检测到了,无疑我们要举行分发,这个所以我们也可以借助一个核心组件
Channel来分发
  1. class Channel
  2. {
  3. private:
  4.     EventLoop *loop;
  5.     int fd;
  6.     uint32_t events;
  7.     uint32_t revents;
  8.     bool inEpoll;
  9.     std::function<void()> callback;
  10. public:
  11.     Channel(EventLoop *_loop, int _fd);
  12.     ~Channel();
  13.     void enableReading();//设置检测什么事件
  14.     int getFd();
  15.     uint32_t getEvents();
  16.     uint32_t getRevents();
  17.     bool getInEpoll();
  18.     void setInEpoll();
  19.     // void setEvents(uint32_t);
  20.     void setRevents(uint32_t);
  21.     void setCallback(std::function<void()>);
  22. };
复制代码
不难发现,其实这个核心函数为std::function<void()> callback和void setCallback(std::function<void()>);,设置不同变乱的回调函数
  1. void Channel::setCallback(std::function<void()> _cb){
  2.     callback = _cb;
  3. }
复制代码
有过reactor的基础的人,一样平常都知道我们将业务分成毗连和业务处理
没基础也不要紧可以阅读这个Reactor解析 你也可以直接看图

这就引出了Acceptor类 专门处理毗连的
   

  • Acceptor类存在于变乱驱动EventLoop类中,也就是Reactor模式的main-Reactor
  • 类中的socket fd就是服务器监听的socket fd,每一个Acceptor对应一个socket fd
  • 这个类也通过一个独有的Channel负责分发到epoll,该Channel的变乱处理函数handleEvent()会调用Acceptor中的继承毗连函数来新建一个TCP毗连
  1. class Acceptor
  2. {
  3. private:
  4.     EventLoop *loop;
  5.     Socket *sock;//每一个Acceptor对应一个socket fd
  6.     Channel *acceptChannel;//这个类也通过一个独有的Channel负责分发到epoll
  7. public:
  8.     Acceptor(EventLoop *_loop);
  9.     ~Acceptor();
  10.     void acceptConnection();
  11.     std::function<void(Socket*)> newConnectionCallback;
  12.     void setNewConnectionCallback(std::function<void(Socket*)>);
  13. };
复制代码
这就引出为什么 的核心组件其毗连组件Acceptor(EventLoop *_loop);和 void acceptConnection();
  1. Acceptor::Acceptor(EventLoop *_loop) :loop(_loop){
  2.     sock=new Socket();
  3.     InetAddress* addr=new InetAddress("127.0.0.1",8080);
  4.     sock->bind(addr);
  5.     sock->listen();
  6.     sock->setnonblocking();
  7.     acceptChannel=new Channel(_loop,sock->getFd());
  8.     std::function<void()> cb = std::bind(&Acceptor::acceptConnection, this);//注册回调函数 进行连接
  9.     acceptChannel->setCallback(cb);
  10.     acceptChannel->enableReading();//注册事件
  11.     delete addr;
  12. }
  13. void Acceptor::acceptConnection(){
  14.     InetAddress*clnt_addr=new InetAddress();
  15.     Socket* clnt_sock=new Socket(sock->accept(clnt_addr));
  16.     printf("new client fd %d! IP: %s Port: %d\n", clnt_sock->getFd(), inet_ntoa(clnt_addr->getAddr().sin_addr), ntohs(clnt_addr->getAddr().sin_port));
  17.     clnt_sock->setnonblocking();
  18.     newConnectionCallback(sock);
  19.     delete clnt_addr;
  20. }
复制代码
讲一下这个 newConnectionCallback(sock); 这个就是业务逻辑处理。这份代码的不完整之处就在于 他的目的核心是实现一个功能 echo回响 所以直接毗连之后给了一个echo的业务逻辑。引出一个connect 即每一个毗连都有一个业务处理方法
   Connection类,这个类也有以下几个特点:
   

  • 类存在于变乱驱动EventLoop类中,也就是Reactor模式的main-Reactor
  • 类中的socket fd就是客户端的socket fd,每一个Connection对应一个socket fd
  • 每一个类的实例通过一个独有的Channel负责分发到epoll,该Channel的变乱处理函数handleEvent()会调用Connection中的变乱处理函数来响应客户端请求
  1. class Connection {
  2. private:
  3.     EventLoop *loop;
  4.     Socket *sock;
  5.     Channel *channel;
  6.     std::function<void(Socket*)> deleteConnectionCallback;//
  7. public:
  8.     Connection(EventLoop *_loop, Socket *_sock);
  9.     ~Connection();
  10.     void echo(int sockfd);
  11.     void setDeleteConnectionCallback(std::function<void(Socket*)>);
  12. };
复制代码
通过这个函数也可以看出一个echo就是一个connection的回调函数。然后我们再来看一下另一个重要的资源释放.设置一个自动释放的connection的回调函数。
  1. void Connection::setDeleteConnectionCallback(std::function<void(Socket*)> _cb){
  2.     deleteConnectionCallback = _cb;
  3. }
复制代码
也是注册函数
Reactor-server

  1. class Server
  2. {
  3. private:
  4.     EventLoop *loop;
  5.     //ADD 连接逻辑
  6.     Acceptor *acceptor;
  7.     std::map<int, Connection*> connections;//一一对应
  8. public:
  9.     Server(EventLoop*);
  10.     ~Server();
  11.     void handleReadEvent(int);
  12.     void newConnection(Socket *serv_sock);
  13.     void deleteConnection(Socket *sock);
  14. };
复制代码
这个reactor无疑包含了三个组件 变乱驱动,毗连逻辑和业务逻辑
  1. Server::Server(EventLoop *_loop) : loop(_loop){
  2.     acceptor = new Acceptor(loop);//完成所有的连接步骤 就没设置回调函数
  3.     std::function<void(Socket*)> cb = std::bind(&Server::newConnection, this, std::placeholders::_1);//这个是每次动态
  4.     acceptor->setNewConnectionCallback(cb);//
  5. }
  6. void Server::newConnection(Socket *sock){
  7.     Connection *conn = new Connection(loop, sock);
  8.     std::function<void(Socket*)> cb = std::bind(&Server::deleteConnection, this, std::placeholders::_1);
  9.     conn->setDeleteConnectionCallback(cb);
  10.     connections[sock->getFd()] = conn;
  11. }
  12. void Server::deleteConnection(Socket * sock){
  13.     Connection *conn = connections[sock->getFd()];
  14.     connections.erase(sock->getFd());
  15.     delete conn;
  16. }
复制代码


  • 解析
    这个reactor-main是不是有三个疑问
  • 为什么毗连步调完成了
  • 他的业务函数也就是echo在哪
  • 为什么用map来装饰Connection
首先我们来回顾一下团体流程,我们要明白一个点 一个socket的绑定IP和端口,不算毗连逻辑,也就是说当你初始化的时候就应该弄好了。
真正的毗连逻辑应该是设置毗连回调函数的时候,例如
  1. Acceptor::Acceptor(EventLoop *_loop) :loop(_loop){
  2.     sock=new Socket();
  3.     InetAddress* addr=new InetAddress("127.0.0.1",8080);
  4.     sock->bind(addr);
  5.     sock->listen();
  6.     sock->setnonblocking();
  7.     acceptChannel=new Channel(_loop,sock->getFd());
  8.     std::function<void()> cb = std::bind(&Acceptor::acceptConnection, this);//注册回调函数
  9.     acceptChannel->setCallback(cb);/
  10.     acceptChannel->enableReading();//注册事件
  11.     delete addr;
  12. }
复制代码
acceptChannel->setCallback(cb)这个就毗连逻辑,当有客户端来的时候 他就是实行这个逻辑。第一个疑问解决
然后 acceptor->setNewConnectionCallback(cb);这个是不是设置了业务逻辑,因为前面讲过 这个没有线程池而且业务逻辑单一,所有将echo默认每一个毗连的业务逻辑 所有直接再毗连回调的时候设置了一个业务逻辑(你去看connect的构造函数 就会发现他直接绑定了echo做业务逻辑)第二疑问解决
  1. Connection::Connection(EventLoop *_loop, Socket *_sock):loop(_loop),sock(_sock),channel(nullptr){
  2.     //属性
  3.     channel=new Channel(loop,sock->getFd());//
  4.     std::function<void()>cb=std::bind(&Connection::echo,this,sock->getFd());//业务处理函数;
  5.     channel->setCallback(cb);
  6.     channel->enableReading();
  7. }
  8. Connection::~Connection(){
  9.     delete channel;
  10.     delete sock;
  11. }
复制代码
也就是说 你可以从这下手 换一个业务逻辑 实现其他的业务
第三个疑问:
   对于断开TCP毗连操作,也就是销毁一个Connection类的实例。由于Connection的生命周期由Server举行管理,所以也应该由Server来删除毗连。假如在Connection业务中需要断开毗连操作,也应该和之前一样使用回调函数来实现.使用map就是好删除管理

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

冬雨财经

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表