Muduo网络库实现 [七] - Connection模块

打印 上一主题 下一主题

主题 1890|帖子 1890|积分 5670

目次
设计思路
类的设计
模块实现
毗连前的预备阶段
毗连通信阶段 
疑惑点


设计思路


Connection模块是对通信毗连也就是通信套接字的整体的管理模块,对毗连的所有操作都是通过这个模块提供的接口来完成的。
   具体要进行哪些方面的管理呢?
  对于管理通信套接字来说,我们此时已经毗连上了客户端,当客户端发来消息的时候,我们无法保证发来的消息是完整的,所以此时必要有个缓冲区,通过send把数据从网络写入到输出缓冲区中,然后启动写事件监听,当内核告诉我们"你可以把数据写进来了",我们就把数据发送出去
缓冲区管理


  • 维护两个Buffer对象:输入缓冲区和输出缓冲区
  • 提供Send接口:将数据写入输出缓冲区,启动写事件监听
  • 当写事件触发时,通过Channel回调处理实际数据发送
其次,还也必要对每个通信套接字自己做管理,可以或许完成套接字的种种操作,其实就是内部必要包罗一个Socket对象,将来关闭套接字时可以或许直接调用Socket的接口
 套接字管理


  • 内部包罗Socket对象
  • 封装套接字的基本操作
  • 提供关闭套接字的接口,在必要时直接调用Socket对象的接口
我们上面说了,当数据发送到用户的输出缓冲区之后,必要启动写事件监听,而这个监听就必要Channel模块来实现了,所以我们要设计个事件监听管理进行把各种事件都封装进来
事件监听管理


  • 内部包罗Channel对象
  • 提供设置事件回调函数的接口
  • 提供设置必要监控事件的接口
  • 关联事件触发与回调处理的机制
还必要保存协议剖析的上下文信息,也就是一个Any对象,将来要提供接口用于获取内部的上下文信息的指针。
协议上下文管理


  • 保存协议剖析的上下文信息(Any范例)
  • 提供获取上下文信息指针的接口
  • 支持协议切换和状态保存
当我们的毗连创建成功之后,该如那边理,毗连关闭之后,该如那边理,以及对于任意事件的产生,又该如那边理呢?所以我们肯定也必要对应的回调函数
 回调函数管理


  • 存储用户通过TcpServer设置的回调函数
  • 在这些回调基础上增加Connection自身的处理逻辑
  • 将最终的回调函数设置到Channel模块,用于事件触发后的处理
这种设计思路体现了模块化和层次化的网络编程架构:
Connection作为中间层,向下管理Socket和Channel。向上为TcpServer提供接口,间接服务于用户,通过缓冲区和事件机制实现异步I/O处理,通过上下文和回调函数实现灵活的协议处理
这时候我们就必要考虑一个很告急的题目: Connection 对象怎样管理? 
起首,所有的Connection 对象肯定是要交给TcpServer 模块进行整体的管理的,这个我们还是很好理解,因为将来我们的Acceptor获取到新毗连之后,必要创建Connection对象,而创建Connection对象的接口就只能是TcpServer模块提供给他的,也就是由TcpServer来创建,创建之后,他也必要对所有的Connection进行管理。 
那么怎么管理呢?new一个Connection对象然后使用指针进行管理?那么会不会有这样一种情况:
想象你在一个网络聊天室里,有这样一个过程:

  • 小明的客户端与服务器创建了毗连
  • 他发送了一条消息
  • 在消息发送的过程中,网络突然断开
  • 这时服务器正预备处理这条消息
在这个场景中,毗连已经关闭,但是后续的操作还在尝试使用这个已经无效的毗连。
毗连有多种操作,如果其中一个操作把毗连关闭释放了,后续的操作再调用Conenction的接口不就是野指针访问了吗?那么程序会触发硬件非常而瓦解。这是我们不希望看到的。即便 TcpServer中可能还保存着这个对象的指针,但是指针指向的资源却可能已经释放了。
所以我们可以用到智能指针share_ptr 来对Connction 进行管理。使用只能指针有哪些上风呢?我们接着看
资源管理机制


  • 使用 share_ptr 智能指针管理 Connection 对象
  • 通过引用计数的方式控制对象的生命周期
安全访问原理


  • 当一个函数接收 Connection 的 share_ptr  作为参数时
  • 该函数栈帧会增加 Connection 对象的引用计数
  • 只要函数还在实行,Connection 对象就不会被真正释放
生命周期保证


  • 纵然其他操作尝试释放毗连
  • 只会淘汰引用计数,不会立即销毁对象
  • 确保正在使用的 Connection 资源不会被意外销毁
多重保护机制


  • TcpServer 管理的基础 share_ptr  可能被释放
  • 但只要还有其他 share_ptr  持有对象
  • Connection 的实际资源不会被销毁
防止野指针的关键


  • 引用计数为零时才真正释放对象
  • 制止悬空指针和非法内存访问
  • 提供了一种线程安全的资源管理方式
基于使用sharet_ptr来管理Connection的思想,我们将来设置接口的时候,就必要通报Connection 的智能指针对象大概引用了,而不是直接通报原始指针。
   Connection类该怎样设计?
  每一个毗连都必要有一个在服务器内部的唯一标识,也就是id,为什么不直接使用 fd ?因为我们对fd 的管理也不是直接使用原始的fd 的,而是使用Socket来管理,我们必要将其与系统的IO的部门进行解耦。 同时,将来这个毗连的id也是她所对应的定时任务的id。
其次,它必要一个Channel对象、一个Socket对象,两个Buffer对象以及一个Any对象
然后,他必要保存四个回调函数,这四个回调函数是由用户来进行设置的,分别是毗连创建时实行的回调函数,新数据到来时实行的回调方法,毗连关闭时实行的回调方法以及任意事件到来实行的回调方法。
由于Connection涉及到超时的管理,那么我们还必要一个值来表示是否启动了超时管理。
以及每一个Cconnection对象必要和一个EventLoop对象绑定,所以他还必要一个EventLoop的指针。
这是几个简朴的成员,我们还必要一个成员就是毗连所处的状态。
 这个状态并不是站在内核层面的状态,而是站在应用层角度的状态。
状态有以下几种:


  • 毗连创建中: 此时我们已经从底层将毗连的文件形貌符获取上来,并为其创建了Connection对象,但是Connection内部的各项设置还未完成
  • 毗连创建完成:Connection对象的各项设置已经完成,可以进行通信了
  • 毗连待关闭状态:对端大概本端必要关闭这个毗连,但是在实际关闭毗连之前我们还必要把缓冲区的数据全部处理完
  • 毗连关闭状态,处于这个状态的时候,就可以把套接字关闭,以及资源释放了
那么这个状态我们可以使用枚举来定义。
  1. enum CONN_STATU
  2. {
  3.     CLOSED,    //关闭状态,不再进行任何操作,释放资源
  4.     CONNECTING,  //连接带处理,还需要进行各项设置才能开始通信
  5.     CONNECTED,   //连接建立,可以通信
  6.     CLOSING  //连接带关闭,尽快处理缓冲区剩余数据
  7. };
复制代码
类的设计

  1. // 前向声明
  2. class Buffer;
  3. class Channel;
  4. class Socket;
  5. class EventLoop;
  6. // 连接状态枚举
  7. typedef enum
  8. {
  9.     CLOSED,     // 关闭状态,不再进行任何操作,释放资源
  10.     CONNECTING, // 连接待处理,还需要进行各项设置才能开始通信
  11.     CONNECTED,  // 连接建立,可以通信
  12.     CLOSING     // 连接待关闭,尽快处理缓冲区剩余数据
  13. } ConnStatus;
  14. // Connection 共享指针类型
  15. using PtrConnection = shared_ptr<Connection>;
  16. class Connection : public enable_shared_from_this<Connection>
  17. {
  18. private:
  19.     // 成员变量
  20.     uint64_t _connid;           // 连接的唯一id,也是定时任务的timeid
  21.     Buffer _in_buffer;          // 用户态的输入缓冲区
  22.     Buffer _out_buffer;         // 用户态的输出缓冲区
  23.     Socket _socket;             // 套接字的管理
  24.     Channel _channel;           // 连接的事件管理
  25.     EventLoop *_loop;           // 连接所关联的loop,函数有RunInLoop进行线程安全操作
  26.     bool _inactive_release;     // 是否开启超时连接销毁,默认为false
  27.     Any _context;               // 请求接收处理的上下文
  28.     ConnStatus _status;         // 连接状态
  29.     int _sockfd;                // 套接字文件描述符
  30.     // 回调函数类型定义
  31.     using ConnectedCallback = function<void(const PtrConnection &)>;
  32.     using MessageCallback = function<void(const PtrConnection &, Buffer *)>;
  33.     using CloseCallback = function<void(const PtrConnection &)>;
  34.     using EventCallback = function<void(const PtrConnection &)>;
  35.    
  36.     // 回调函数成员
  37.     ConnectedCallback _connected_callback;  // 连接建立后的回调函数
  38.     MessageCallback _message_callback;      // 消息处理的回调函数,也就是用来处理接收到的数据,并不是要处理发送出去的数据
  39.     CloseCallback _close_callback;          // 关闭连接的回调函数
  40.     EventCallback _event_callback;          // 任意事件的回调函数
  41.     CloseCallback _svrclose_callback;       // 服务器关闭回调函数
  42. private:
  43.     // 私有成员函数 - Channel 事件处理器
  44.     void HanderRead();      // 处理读事件
  45.     void HanderWrite();     // 处理写事件
  46.     void HandlerError();    // 处理错误事件
  47.     void HandlerClose();    // 处理关闭事件
  48.     void HandlerEvent();    // 处理通用事件
  49.    
  50.     // 私有成员函数 - 事件循环中的操作
  51.     void EstablishInLoop();                 // 在事件循环中建立连接
  52.     void SendInLoop(Buffer &buf);           // 在事件循环中发送数据
  53.     void ShutdownInLoop();                  // 在事件循环中关闭连接
  54.     void EnableInactiveReleaseInLoop(int seconds);  // 在事件循环中启用超时释放
  55.     void CancelInactiveReleaseInLoop();     // 在事件循环中取消超时释放
  56.     void ReleaseInLoop();                   // 在事件循环中释放连接
  57.     void UpgradeInLoop(const Any &context,  // 在事件循环中升级连接配置
  58.                        const ConnectedCallback &connected_callback,
  59.                        const MessageCallback &message_callback,
  60.                        const CloseCallback &close_callback,
  61.                        const EventCallback &event_callback);
  62. public:
  63.     // 构造函数和析构函数
  64.     Connection(EventLoop *loop, uint64_t connid, int sockfd);
  65.     ~Connection();
  66.    
  67.     // 获取器方法
  68.     int Socked();       // 获取套接字描述符
  69.     int Id();           // 获取连接ID
  70.     bool Connected();   // 检查是否已连接
  71.    
  72.     // 上下文管理
  73.     void SetContext(const Any &context);    // 设置上下文
  74.     Any *GetContext();                      // 获取上下文
  75.    
  76.     // 设置回调函数
  77.     void SetConnectedCallback(const ConnectedCallback &cb);   // 设置连接建立回调
  78.     void SetMessageCallback(const MessageCallback &cb);       // 设置消息处理回调
  79.     void SetCloseCallback(const CloseCallback &cb);           // 设置关闭连接回调
  80.     void SetSvrCloseCallback(const CloseCallback &cb);        // 设置服务器关闭回调
  81.     void SetEventCallback(const EventCallback &cb);           // 设置事件回调
  82.    
  83.     // 公共接口
  84.     void Establish();                       // 建立连接
  85.     void Send(const char *data, size_t len);// 发送数据
  86.     void ShutDown();                        // 关闭连接
  87.     void EnableInactiveRelease(int sec);    // 启用超时释放
  88.     void CancelInactiveRelease();           // 取消超时释放
  89.     void Release();                         // 释放连接
  90.     void Upgrade(const Any &context,        // 切换协议
  91.                  const ConnectedCallback &connected_callback,
  92.                  const MessageCallback &message_callback,
  93.                  const CloseCallback &close_callback,
  94.                  const EventCallback &event_callback);
  95. };
复制代码
模块实现

毗连前的预备阶段

起首是我们的构造函数和析构函数的实现,传入我们的毗连id,套接字,然后把超时释放先设置为false,等背面必要再由用户去设置成true,将状态设置成毗连待处理的状态,因为我们并不是构造函数之后就把所有的东西都初始化了,比如说是否启动读事件和毗连后的回调函数我们就必要初始化之后进行实现的。
  1. public:
  2.     Connection(EventLoop *loop, uint64_t connid, int sockfd)
  3.         : _connid(connid)
  4.         , _sockfd(sockfd)
  5.         , _inactive_release(false)
  6.         , _loop(loop)
  7.         , _status(CONNECTING)
  8.         , _socket(_sockfd)
  9.         , _channel(loop, _sockfd)
  10.     {
  11.         _channel.SetReadCallback(bind(&Connection::HanderRead,this));
  12.         _channel.SetWriteCallback(bind(&Connection::HanderWrite,this));
  13.         _channel.SetEventCallback(bind(&Connection::HanderEvent,this));
  14.         _channel.SetErrorCallback(bind(&Connection::HanderError,this));
  15.         _channel.SetCloseCallback(bind(&Connection::HanderClose,this));
  16.     }
  17.     ~Connection()
  18.     {
  19.         DBG_LOG("release connection:%p", this);
  20.     }
复制代码
那么我们现在就必要实现五个回调函数,
起首读事件回调,他的思路很简朴,只必要从套接字中读取数据,然后放在输入缓冲区,再来调用用户传进来的新数据回调函数就行了。

  1.     void HanderRead()
  2.     {
  3.         // 创建一个64KB的临时缓冲区用于接收socket数据
  4.         char buf[65536];
  5.         // 使用非阻塞方式从socket读取数据
  6.         // RecvNonBlock返回实际读取的字节数,或者在出错时返回负值
  7.         ssize_t ret = _socket.RecvNonBlock(buf, 65535);
  8.         // 如果返回值小于0,表示发生错误(如连接已关闭)
  9.         if (ret < 0)
  10.         {
  11.             // 关闭连接并退出
  12.             return ShutDownInLoop();
  13.         }
  14.         // 将读取的数据写入连接的输入缓冲区
  15.         _in_buffer.WriteAndPush(buf, ret);
  16.         // 如果输入缓冲区中有可读数据
  17.         if (_in_buffer.ReadAbleSize() > 0)
  18.         {
  19.             // 调用用户设置的消息回调函数处理数据
  20.             // shared_from_this()返回指向当前对象的智能指针,保证在回调期间对象不会被销毁
  21.             // &_in_buffer传递输入缓冲区的指针,让回调函数可以读取和解析数据
  22.             return _message_callback(shared_from_this(), &_in_buffer);
  23.         }
  24.     }
复制代码
 这里我们可能会疑惑,为什么调用完_message_cb 之后不添加写事件监控,其实是因为,将来用户只能调用我们Connection提供的Send接口来发送数据,但是Send接口我们也懂,只会将数据写入到输出缓冲区中,我们在Send函数的实现中,只有实际写入了再来启动写事件的监控更加合理
   shared_from_this是干嘛的?
  这里有一个细节,就是我们前面声明类的时候,Connection 类继续了 std::enable_shared_from_this这个类,这个继续关系是为了我们可以或许使用 shared_from_this这个功能。
这个用法叫做自我引用。
为什么我们必要引进一个shared_from_this这样的接口呢? 
我们说了使用shared_ptr对所有的Connection对象进行管理,这样可以或许防止在操作的过程中资源被释放。 但是,我们在给 _message_cb 这样的回调函数传参的时候,怎样保证传给他的shared_ptr对象是和管理Conenction 的shared_ptr的对象共享计数呢?
因为如果我们直接使用 shared_ptr<Connection> p (this) ,这样创建一个只能指针对象传参的时候,他的计数是独立的,并不会和TcpServer中管理Conenction的shared_ptr共享计数,那么我们就必要一个办法可以或许创建出一个和Conenction 的管理的shared_ptr对象共享技能的智能指针进行传参,而shared_from_this就可以解决这样的题目。


  • std::enable_shared_from_this<T> 内部维护了一个 std::weak_ptr<T>。
  • 当第一个 std::shared_ptr<T> 开始管理该对象时,这个 weak_ptr 被初始化。
  • 之后,当 shared_from_this() 被调用时,它将基于这个已经存在的 weak_ptr 返回一个新的 std::shared_ptr<T>,这个新的 shared_ptr 与原有的 shared_ptr 共享对对象的所有权。
那么使用这个接口,我们就能保证在这些回调函数在实行的时候,纵然其他的地方调用了_svr_close_cb把TcpServer模块中的基础计数的智能指针释放了,这份资源也还存在,至少在我们这次函数栈帧内还存在,不会出现野指针的题目。

第二个函数就是HandlerWrite ,起首这是设置给channel的回调函数,也就是说只有当写事件触发时才会调用,那么我们直接调用write接口是不会被壅闭的。 固然我们必要判断Write的返回值,判断是否堕落,如果写入数据堕落了,那么我们就没须要再继续处理数据了,纵然处理了也不可能再发出去,那么这时候我们就必要调用实际关闭毗连的接口。
同时,我们还要考虑一种情况,就是,此时其实是读堕落之后,调用ShutDown而监听调用的写事件,那么这时候写完这一次数据之后就必要关闭毗连。 其实也就是判断毗连是否为待关闭状态。
流程图如下 

  1.     void HanderWrite()
  2.     {
  3.         ssize_t ret = _socket.SendNonBlock(_out_buffer.ReadAblesize());
  4.         if (ret < 0)
  5.         {
  6.             // 如果发送错误,就看看是不是接收缓冲区还有数据
  7.             if (_in_buffer.ReadAbleSize() > 0)
  8.             {
  9.                 _message_callback(shared_from_this(), &_in_buffer);
  10.             }
  11.             // 然后再真正关闭
  12.             return Release();
  13.         }
  14.         _out_buffer.MoveRindex(ret);
  15.         if (_out_buffer.ReadAbleSize() == 0)
  16.         {
  17.             _channel.DisableWrite();
  18.             if (_status == DISCONNECTING)
  19.             {
  20.                 return Release();
  21.             }
  22.             return
  23.         }
  24.     }
复制代码
然后就是任意事件回调,任意事件回调我们只必要判断是否启动了超时毗连,如果启动了,那么就必要革新定时任务。 同时我们也必要实行以下用户的任意回调函数,除此之外就没其他的操作了。
  1.     void HandlerEvent()
  2.     {
  3.         if (_inactive_release == true)
  4.         {
  5.             _loop->RefreshTimerTask(_connid);
  6.         }
  7.         if (_event_callback)
  8.         {
  9.             _event_callback(shared_from_this());
  10.         }
  11.     }
复制代码
 然后就是挂断事件回调,挂断事件也很简朴,因为可能在挂断事件触发的时候,也触发了读事件,那么我们可以先处理以下数据,然后直接调用Release关闭毗连,处不处理都行,反正也无法返回了,但是最利益理以下,因为可能是一些功能性请求。
  1.     void HandlerClose()
  2.     {
  3.         if (_in_buffer.ReadAbleSize() > 0)
  4.         {
  5.             _message_callback(shared_from_this(), &_in_buffer);
  6.         }
  7.         return Release();
  8.     }
复制代码
末了就是错误事件回调,错误事件触发的时候,我们的处理方法和挂断事件是一样的。
  1.     void HandlerClose()
  2.     {
  3.         if (_in_buffer.ReadAbleSize() > 0)
  4.         {
  5.             _message_callback(shared_from_this(), &_in_buffer);
  6.         }
  7.         return Release();
  8.     }
复制代码
  为什么我们实现这些接口不必要包装一层 _loop->RunInLoop() 呢? 还是那句话,因为这些IO事件的回调肯定是在对应的EventLoop线程实行的?
  事件循环机制的工作原理


  • 在这种基于Reactor模式的网络库中,每个EventLoop都运行在自己的线程上
  • Channel对象注册到EventLoop时,会将这些回调函数注册到事件循环的事件分发器上
  • 当IO事件发生时,是EventLoop线程自己在调用这些回调函数,而不是其他线程
一线一库的设计理念


  • 每个Connection对象都绑定到特定的EventLoop
  • 每个EventLoop都运行在自己的线程上
  • 事件的检测和分发都在同一个线程中完成
所以没有线程安全题目,但是其他的操作就可能有了,比如我们的Release以及ShutDown等操作,这些对毗连的操作将来都可能是在其他线程中实行的,可能是在别的模块中被调用的,所以必要包装一层。


  • 这些方法可能被其他线程调用

    • 例如,用户可能在主线程或其他工作线程中调用Connection::Send()
    • 为了保证线程安全,必要将这些操作转移到Connection所属的EventLoop线程中实行

  • 制止同步题目

    • 使用RunInLoop机制可以制止对共享资源的直接访问,防止竞态条件
    • 所有的操作都会在同一个线程内按次序实行,无需担心锁的题目

接着我们设置一些获取器方法,用于获取 _sockfd 和 _connid和毗连状态
  1.     int Socked()
  2.     {
  3.         return _sockfd;
  4.     }
  5.     int Id()
  6.     {
  7.         return _connid;
  8.     }
  9.     bool Connected()
  10.     {
  11.         return (_status == CONNECTED);
  12.     }
复制代码
 再来实现几个简朴的接口,也就是用户设置回调函数的方法:
  1.     //用户设置的回调函数
  2.     void SetConnectedCallback()
  3.     {
  4.         _connected_callback = cb;
  5.     }
  6.     void SetMessageCallback()
  7.     {
  8.         _message_callback = cb;
  9.     }
  10.     void SetCloseCallback()
  11.     {
  12.         _close_callback = cb;
  13.     }
  14.     void SetSvrCloseCallback()
  15.     {
  16.         _svrclose_callback = cb;
  17.     }
  18.     void SetEventCallback()
  19.     {
  20.         _event_callback = cb;
  21.     }
复制代码
目前我们大概就能完成毗连待处理状态的操作了。然后设置Connection的属性并让其开始通信。
就如同上面的这几个设置回调函数的接口,我们都是在创建出来一个对象之后,正式通信之前设置的,除此之外,我们还可以设置启动和取消非活泼毗连销毁机制。
我们要注意的是,这两个接口其实有可能在通信的过程中被调用,如果是在通信之前被调用,那么是没有线程安全题目的,但是如果是在毗连已经开始通信之后再被调用,那么我们要保证线程安全,就必要封装一层函数放到 EventLoop 的RunInLoop 中运行,比如这些接口其实都是在TcpServer中调用的,我们无法确定具体在哪个线程进行实行。
  1.     void EnableInactiveReleaseInLoop(int seconds)
  2.     {
  3.         _inactive_release = true;
  4.         if (_loop->HasTimerTask(_connid)) // 判断是否有定时器
  5.         {
  6.             return _loop->RefreshTimerTask(_connid);
  7.         }
  8.         _loop->AddTimerTask(_connid, sec, bind(&Connection::Release, this));
  9.     }
  10.     void CancelInactiveReleaseInLoop(int seconds)
  11.     {
  12.         _inactive_release = false;
  13.         _loop->CancelTimerTask(_connid);
  14.     }
  15.    
  16.     void EnableInactiveRelease(int sec)
  17.     {
  18.         _loop->RunInLoop(bind(&Connection::EnableInactiveReleaseInLoop, this, seconds));
  19.     }
  20.     void CancelInactiveRelease()
  21.     {
  22.         _loop->RunInLoop(bind(&Connection::CancelInactiveReleaseInLoop, this));
  23.     }
复制代码
 当设置完这些之后,我们就可以启动毗连,开始通信了,我们可以设置一个 Established () 接口以供外界大概说上层的TcpServer调用。 但是为了线程安全,我们必要将实际的操作封装成一个任务交给loop。
  1.    
  2.     void EstablishInLoop()
  3.     {
  4.         assert(_status == CONNECTING);
  5.         _status = CONNECTED;
  6.         _channel.EnableRead();
  7.         if (_connected_callback)
  8.         {
  9.             _connected_callback(shared_from_this());
  10.         }
  11.     }
  12.     void Establish() // 连接获取之后进行各种设置,设置事件回调,启动读监控,调用读书简回调等
  13.     {
  14.         _loop->RunInLoop(bind(&Connection::EstblishInLoop, this));
  15.     }
复制代码
同时,在预备阶段也必要设置上下文的接口,这个接口我们就不关心线程安全的题目了,因为一般来说我们的逻辑没题目的话是不会重复设置的。 
  1.     void SetContext(const Any &context)
  2.     {
  3.         _context = context;
  4.     }
复制代码
毗连通信阶段 

那么已经步入通信阶段之后,我们必要提供一个接口给用户用于发送数据,也就是向输出缓冲区写入数据,这个接口也不是线程安全的,所以也必要封装成任务交给 RunInLoop
  1.     void SendInLoop(Buffer &buf) // 把数据写入到用户态的发送缓冲区中
  2.     {
  3.         if (_status == DISCONNECTED)
  4.         {
  5.             return;
  6.         }
  7.         _out_buffer.WriteBufferAndPush(buf);
  8.         if (_channel.WriteAble() == false)
  9.         {
  10.             _channel.EnableWrite();
  11.         }
  12.     }
  13.     void Send(const char *data, size_t len) // 发送并不是真的发送了,是要先把数据放到同一个线程下,然后再同一个线程中去发送
  14.     {
  15.         Buffer buf;
  16.         buf.WriteAndPush(data, len);
  17.         _loop->RunInLoop(bind(&Connection::SendInLoop, this, move(buf)));
  18.     }
复制代码
然后就是设置上下文的接口,同时要注意线程安全题目。 
设置上下文其实就必要重新设置上下文以及四个回调函数。因为不同的协议,它的处理方式可能不同,比如说原先的毗连回调函数的逻辑遇到事件直接就处理,但是你切换了某个协议之后,必要先进行验证,那此时旧的回调函数的逻辑就不符合了
  1.     void UpgradeInLoop(const Any &context,
  2.                        const ConnectionCallback &connected_callback,
  3.                        const MessageCallback &message_callback,
  4.                        const CloseCallback &close_callback,
  5.                        const EventCallback &event_callback)
  6.     {
  7.         _context = context;
  8.         _connected_callback = connected_callback;
  9.         _message_callback = message_callback;
  10.         _close_callback = close_callback;
  11.         _event_callback = event_callback;
  12.     }
复制代码
具体实行的操作就是这样的,但是 Upgrade 这个接口有点特别,它是线程不安全的,所以必须在EventLoop线程中实行,同时,这还不够,他必须被立马实行,不能放到任务队列中,因为如果如果放到任务队列中,那么如果此时有读事件到来,那么就会先进行事件处理,也就是会先使用老的处理函数进行处理,然后才更新回调方法,这是不对的,我们必须在调用Upgrade的时候立即将协议的回调函数和上下文进行替换。大概换一种说法:我们必须在EventLoop线程中调用Upgrade函数,如果在其他线程内调用,那么直接报错。 绝对不能在其他的线程中调用这个函数。
   这个读事件为什么会比他先进行 这个任务队列不是按照次序进行实行的吗?先把更新协议放到任务队列中 然后读事件来了,再把读事件放入到队列中?
  在EventLoop中,任务队列确实是按次序实行的,但题目出在读事件的触发机制上:
当网络数据到达时,操作系统会通知 EventLoop 有读事件发生。这个通知是通过 epoll/poll 等 I/O 复用机制实现的,发生在 EventLoop 的事件循环中。而任务队列通常是在处理完所有已触发的 I/O 事件后才会被实行。所以实行次序可能是这样的:


  • 调用 Upgrade 方法,将协议切换任务放入队列
  • 数据到达,触发读事件
  • EventLoop 先处理所有 I/O 事件(包括刚才触发的读事件)
  • 然后才实行任务队列中的任务(包括升级毗连的任务)
那么我们可以在EventLoop中再提供一个接口
  1.     void AssertInLoop()const{assert(_thread_id == std::this_thread::get_id());}
复制代码
所以我们的Upgrade接口的实现就是这样的:
  1.     void Upgrade(const Any& context,const ConnectCallBack& con , const MessageCallBack& msg, const CloseCallBack& clo,const EventCallBack& evt)
  2.     {
  3.        _loop->AssertInLoop();
  4.         _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop,this,context,con,msg,clo,evt));
  5.     }
复制代码
我们还必要提供一个接口用于获取上下文,这个接口可以不进行封装
  1.     Any* GetContext() {return &_context;}
复制代码
然后就是关闭毗连的接口了,我们有两套接口,起首来实现ShutDown,也就是用户关闭毗连的接口,这个接口也是必要注意线程安全题目,必要封装成任务。
在这个接口中,我们必要处理输入缓冲区和输出缓冲区的数据,然后再调用Release接口关闭毗连。
  1.     void ShutDownInLoop()
  2.     {
  3.         _con_statu = CLOSING;   //设置连接待关闭
  4.         if(_in_buffer.ReadSize()) //有数据待处理
  5.             if(_message_cb) _message_cb(shared_from_this(),&_in_buffer);
  6.         //所有数据都处理完之后,处理待发送数据
  7.         if(_out_buffer.ReadSize())  //如果有数据待发送
  8.         {
  9.             _channel.EnableWrite();    //启动写事件监听
  10.             //触发写事件之后,在写事件回调中会调用Release进行连接释放
  11.         }
  12.         else Release(); //如果没有数据待发送就直接关闭
  13.     }
  14.     void ShutDown()
  15.     {
  16.         _loop->RunInLoop(std::bind(&Connection::ShutDownInLoop,this));
  17.     }
复制代码
末了就是实际释放毗连的Release接口了
起首,毗连状态设置为 DISCONNECTED, 移除所有事件监控,关闭文件形貌符,然后取消定时任务,以及调用用户的回调函数,末了释放掉TcpServer所管理的基础计数
  1.     void ReleaseInLoop()
  2.     {
  3.         // 1.判断状态 2.移除所有事件监控 3.关闭套接字 4.移除定时器 5.调用关闭回调函数
  4.         _status = DISCONNECTED;
  5.         _channel.Remove();
  6.         _socket.Close();
  7.         if (_loop->HasTimerTask(_connid)) // 判断是否有定时器
  8.         {
  9.             CancelInactiveReleaseInLoop();
  10.         }
  11.         if (_close_callback)
  12.         {
  13.             _close_callback(share_from_this());
  14.         }
  15.     }
  16.     void Release()
  17.     {
  18.         _loop->RunInLoop(bind(&Connection::ReleaseInLoop, this));
  19.     }
复制代码
疑惑点

   在设计 Connection 类时,我们必要考虑怎样唯一标识每个毗连。对于为什么不直接使用文件形貌符(fd)作为唯一标识?
  有几个告急缘故起因:

  • 文件形貌符可能会被重用 - 当一个毗连关闭后,操作系统可能会将这个 fd 重新分配给新的毗连,这可能导致标识肴杂
  • 跨历程/跨服务器题目 - 如果服务器是分布式或多历程的,fd 在不同历程间可能不唯一
  • 持久化题目 - 如果必要持久化毗连信息,当服务器重启后 fd 会改变
    HanderRead的流程图
  
 

 ​​​​​​


   为啥connection模块还必要一些InLoop的接口 connection不就在EventLoop模块中吗 不是已经绑定了线程了吗?
  

那不能直接把这个send放入私有接口内里吗?

    std::bind(&Connection::SendInLoop, this, in, len)不理解这个代码
  


    move的作用
  

   不是channel管理的是事件回调吗?怎么这里也要设置呢 不能调用channel模块的回调函数吗? 
  简而言之,Channel模块的回调函数负责的是底层Socket的事件管理,Connection模块的回调函数负责的是用户层的事件回调 
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

杀鸡焉用牛刀

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表