【网络】高级IO——Reactor版TCP服务器

打印 上一主题 下一主题

主题 1535|帖子 1535|积分 4605

1.什么是Reactor

Reactor 是一种应用在服务器端的开发模式(也有说法称 Reactor 是一种 IO 模式),目的是提高服务端步伐的并发能力
   它要办理什么题目呢?
  传统的 thread per connection 用法中,线程在真正处置惩罚请求之前首先必要从 socket 中读取网络请求,而在读取完成之前,线程本身被阻塞,不能做任何事,这就导致线程资源被占用,而线程资源本身是很珍贵的,尤其是在处置惩罚高并发请求时。 
  而 Reactor 模式指出,在等候 IO 时,线程可以先退出,如许就不会因为有线程在等候 IO 而占用资源。但是如许原先的执行流程就没法还原了,因此,我们可以利用变乱驱动的方式,要求线程在退出之前向 event loop 注册回调函数,如许 IO 完成时 event loop 就可以调用回调函数完成剩余的操作。
       所以说,Reactor 模式通过减少服务器的资源消耗,提高了并发的能力。固然,从实现角度上,变乱驱动编程会更难写,难 debug 一些。
 1.1.餐厅里的Reactor模式

我们用“餐厅”类比的话,就像下图:

对于每个新来的顾客,前台都必要找到一个服务员和厨师来服务这个顾客。

  • 服务员给出菜单,并等候点菜
  • 顾客检察菜单,并点菜
  • 服务员把菜单交给厨师,厨师照着做菜
  • 厨师做好菜后端到餐桌上
   这就是传统的多线程服务器。每个顾客都有自己的服务团队(线程),在人少的环境下是可以良好的运作的。如今餐厅的口碑好,顾客人数不停增加,这时服务员就有点处置惩罚不过来了。
这时老板发现,每个服务员在服务完客人后,都要去休息一下,因此老板就说,“你们都别休息了,在旁边待命”。如许大概 10 个服务员也来得及服务 20 个顾客了。这也是“线程池”的方式,通过重用线程来减少线程的创建和销毁时间,从而提高性能。
  但是客人又进一步增加了,仅仅靠剥削服务员的休息时间也没有办法服务这么多客人。老板细致观察,发现着实服务员并不是不停在干活的,大部分时间他们只是站在餐桌旁边等客人点菜。
  于是老板就对服务员说,客人点菜的时候你们就别傻站着了,先去服务其它客人,有客人点好的时候喊你们再过去。对应于下图:

最后,老板发现根本不必要那么多的服务员,于是裁了一波员,终极甚至可以只有一个服务员。
        这就是 Reactor 模式的核心思想:减少等候。当遇到必要等候 IO 时,先释放资源,而在 IO 完成时,再通过变乱驱动 (event driven) 的方式,继续接下来的处置惩罚。从团体上减少了资源的消耗。
2.Reactor的由来

如果要让服务器服务多个客户端,那么最直接的方式就是为每一条毗连创建线程。
    着实创建历程也是可以的,原理是一样的,历程和线程的区别在于线程比较轻量级些,线程的创建和线程间切换的本钱要小些,为了描述简述,背面都以线程为例。
 处置惩罚完业务逻辑后,随着毗连关闭后线程也同样要销毁了,但是如许不停地创建和销毁线程,不但会带来性能开销,也会造成浪费资源,而且如果要毗连几万条毗连,创建几万个线程去应对也是不现实的。
     要这么办理这个题目呢?我们可以使用「资源复用」的方式。
        也就是不用再为每个毗连创建线程,而是创建一个「线程池」,将毗连分配给线程,然后一个线程可以处置惩罚多个毗连的业务。
  不过,如许又引来一个新的题目,线程怎样才能高效地处置惩罚多个毗连的业务?
        当一个毗连对应一个线程时,线程一般接纳「read -> 业务处置惩罚 -> send」的处置惩罚流程,如果当前毗连没有数据可读,那么线程会阻塞在 read 操作上( socket 默认环境是阻塞 I/O),不过这种阻塞方式并不影响其他线程。
     但是引入了线程池,那么一个线程要处置惩罚多个毗连的业务,线程在处置惩罚某个毗连的 read 操作时,如果遇到没有数据可读,就会发生阻塞,那么线程就没办法继续处置惩罚其他毗连的业务。
       要办理这一个题目,最简单的方式就是将 socket 改成非阻塞,然后线程不停地轮询调用 read 操作来判定是否有数据,这种方式虽然该能够办理阻塞的题目,但是办理的方式比较粗暴,因为轮询是要消耗 CPU 的,而且随着一个 线程处置惩罚的毗连越多,轮询的服从就会越低。
    上面的题目在于,线程并不知道当前毗连是否有数据可读,从而必要每次通过 read 去试探。
        那有没有办法在只有当毗连上有数据的时候,线程才去发起读请求呢?答案是有的,实现这一技能的就是 I/O 多路复用。
     I/O 多路复用技能会用一个系统调用函数来监听我们全部关心的毗连,也就说可以在一个监控线程内里监控许多的毗连。


 我们熟悉的 select/poll/epoll 就是内核提供给用户态的多路复用系统调用,线程可以通过一个系统调用函数从内核中获取多个变乱。 
select/poll/epoll 是如何获取网络变乱的呢?
在获取变乱时,先把我们要关心的毗连传给内核,再由内核检测:
如果没有变乱发生,线程只需阻塞在这个系统调用,而无需像前面的线程池方案那样轮训调用 read 操作来判定是否有数据。
如果有变乱发生,内核会返回产生了变乱的毗连,线程就会从阻塞状态返回,然后在用户态中再处置惩罚这些毗连对应的业务即可。

 
   
  

  • 当下开源软件能做到网络高性能的原因就是 I/O 多路复用吗? 
  是的,根本是基于 I/O 多路复用,用过 I/O 多路复用接口写网络步伐的同砚,肯定知道是面向过程的方式写代码的,如许的开发的服从不高。
     于是,大佬们基于面向对象的思想,对 I/O 多路复用作了一层封装,让使用者不用思量底层网络 API 的细节,只必要关注应用代码的编写。大佬们还为这种模式取了个让人第一时间难以理解的名字:Reactor 模式。

   Reactor 翻译过来的意思是「反应堆」,大概各人会联想到物理学里的核反应堆,实际上并不是的这个意思。
        这里的反应指的是「对变乱反应」,也就是来了一个变乱,Reactor 就有相对应的反应/相应。

   究竟上,Reactor 模式也叫 Dispatcher 模式,我觉得这个名字更贴合该模式的含义,即 I/O 多路复用监听变乱,收到变乱后,根据变乱类型分配(Dispatch)给某个历程 / 线程。
Reactor 模式主要由 Reactor 和处置惩罚资源池这两个核心部分构成,它俩负责的事情如下:


  • Reactor 负责监听和分发变乱,变乱类型包含毗连变乱、读写变乱;
  • 处置惩罚资源池负责处置惩罚变乱,如 read -> 业务逻辑 -> send;
Reactor 模式是机动多变的,可以应对差别的业务场景,机动在于:


  • Reactor 的数量可以只有一个,也可以有多个;
  • 处置惩罚资源池可以是单个历程 / 线程,也可以是多个历程 /线程;
将上面的两个因素排列组设一下,理论上就可以有 4 种方案选择:

方案具体使用历程照旧线程,要看使用的编程语言以及平台有关:



  • 单 Reactor 单历程 / 线程;
  • 单 Reactor 多历程 / 线程;
  • 多 Reactor 单历程 / 线程;
  • 多 Reactor 多历程 / 线程; 其中,「多 Reactor 单历程 / 线程」实现方案相比「单 Reactor 单历程 / 线程」方案,不但复杂而且也没有性能优势,因此实际中并没有应用。
    剩下的 3 个方案都是比较经典的,且都有应用在实际的项目中:
  • 单 Reactor 单历程 / 线程;
  • 单 Reactor 多线程 / 历程;
  • 多 Reactor 多历程 / 线程;
  • Java 语言一般使用线程,好比 Netty;
  • C 语言使用历程和线程都可以,比方 Nginx 使用的是历程,Memcache 使用的是线程。
  • 接下来,分别先容这三个经典的 Reactor 方案。
    2.1.单 Reactor 单历程 / 线程

    一般来说,C 语言实现的是「单 Reactor 单历程」的方案,因为 C 语编写完的步伐,运行后就是一个独立的历程,不必要在历程中再创建线程。
    我们来看看「单 Reactor 单历程」的方案示意图:

可以看到历程里有 Reactor、Acceptor、Handler 这三个对象:


  • Reactor 对象的作用是监听和分发变乱;
  • Acceptor 对象的作用是获取毗连;
  • Handler 对象的作用是处置惩罚业务;
  • 对象里的 select、accept、read、send 是系统调用函数,dispatch 和 「业务处置惩罚」是必要完成的操作,其中 dispatch 是分发变乱操作。
    接下来,先容下「单 Reactor 单历程」这个方案:
  • Reactor 对象通过 select (IO 多路复用接口) 监听变乱,收到变乱后通过 dispatch 举行分发,具体分发给 Acceptor 对象照旧 Handler 对象,还要看收到的变乱类型;
  • 如果是毗连创建的变乱,则交由 Acceptor 对象举行处置惩罚,Accep
  • tor 对象会通过 accept 方法 获取毗连,并创建一个 Handler 对象来处置惩罚后续的相应变乱;
  • 如果不是毗连创建变乱, 则交由当前毗连对应的 Handler 对象来举行相应;
  • Handler 对象通过 read -> 业务处置惩罚 -> send 的流程来完成完整的业务流程。
  • 单 Reactor 单历程的方案因为全部工作都在同一个历程内完成,所以实现起来比较简单,不必要思量历程间通信,也不用担心多历程竞争。
  • 但是,这种方案存在 2 个缺点:
  • 第一个缺点,因为只有一个历程,无法充分利用 多核 CPU 的性能;
  • 第二个缺点,Handler 对象在业务处置惩罚时,整个历程是无法处置惩罚其他毗连的变乱的,如果业务处置惩罚耗时比较长,那么就造成相应的延迟;
  • 所以,单 Reactor 单历程的方案不实用计算机麋集型的场景,只实用于业务处置惩罚非常快速的场景。 Redis 是由 C 语言实现的,它接纳的正是「单 Reactor 单历程」的方案,因为 Redis 业务处置惩罚主要是在内存中完成,操作的速度是很快的,性能瓶颈不在 CPU 上,所以 Redis 对于命令的处置惩罚是单历程的方案。
     2.2.单 Reactor 多线程 / 多历程

    如果要克服「单 Reactor 单线程 / 历程」方案的缺点,那么就必要引入多线程 / 多历程,如许就产生了单 Reactor 多线程/ 多历程的方案。
    闻其名不如看其图,先来看看「单 Reactor 多线程」方案的示意图如下

    多历程
详细说一下这个方案:


  • Reactor 对象通过 select (IO 多路复用接口) 监听变乱,收到变乱后通过 dispatch 举行分发,具体分发给 Acceptor 对象照旧 Handler 对象,还要看收到的变乱类型;
  • 如果是毗连创建的变乱,则交由 Acceptor 对象举行处置惩罚,Acceptor 对象会通过 accept 方法 获取毗连,并创建一个 Handler 对象来处置惩罚后续的相应变乱;
  • 如果不是毗连创建变乱, 则交由当前毗连对应的 Handler 对象来举行相应 上面的三个步骤和单 Reactor 单线程方案是一样的,接下来的步骤就开始不一样了:
  • Handler 对象不再负责业务处置惩罚,只负责数据的接收和发送,Handler 对象通过 read 读取到数据后,会将数据发给子线程里的 Processor 对象举行业务处置惩罚;
  • 子线程里的 Processor 对象就举行业务处置惩罚,处置惩罚完后,将效果发给主线程中的 Handler 对象,接着由 Handler 通过 send 方法将相应效果发送给 client;
  • 单 Reator 多线程的方案优势在于能够充分利用多核 CPU 的能,那既然引入多线程,那么自然就带来了多线程竞争资源的题目。
       比方,子线程完成业务处置惩罚后,要把效果传递给主线程的 Reactor 举行发送,这里涉及共享数据的竞争。
            要制止多线程由于竞争共享资源而导致数据错乱的题目,就必要在操作共享资源前加上互斥锁,以保证恣意时间里只有一个线程在操作共享资源,待该线程操作完释放互斥锁后,其他线程才有时机操作共享数据。
聊完单 Reactor 多线程的方案,接着来看看单 Reactor 多历程的方案   


  • 究竟上,单 Reactor 多历程相比单 Reactor 多线程实现起来很麻烦,主要因为要思量子历程 <-> 父历程的双向通信,而且父历程还得知道子历程要将数据发送给哪个客户端。
  •    而多线程间可以共享数据,虽然要额外思量并发题目,但是这远比历程间通信的复杂度低得多,因此实际应用中也看不到单 Reactor 多历程的模式。



       另外,「单 Reactor」的模式另有个题目,因为一个 Reactor 对象承担全部变乱的监听和相应,而且只在主线程中运行,在面对瞬间高并发的场景时,容易成为性能的瓶颈的地方。


2.3.多 Reactor 多历程 / 线程

要办理「单 Reactor」的题目,就是将「单 Reactor」实现成「多 Reactor」,如许就产生了第 多 Reactor 多历程 / 线程的方案。
老规矩,闻其名不如看其图。多 Reactor 多历程 / 线程方案的示意图如下(以线程为例):




方案详细阐明如下:


  • 主线程中的 MainReactor 对象通过 select 监控毗连创建变乱,收到变乱后通过 Acceptor 对象中的 accept 获取毗连,将新的毗连分配给某个子线程;
  • 子线程中的 SubReactor 对象将 MainReactor 对象分配的毗连加入 select 继续举行监听,并创建一个 Handler 用于处置惩罚毗连的相应变乱。
  • 如果有新的变乱发生时,SubReactor 对象会调用当前毗连对应的 Handler 对象来举行相应。
  • Handler 对象通过 read -> 业务处置惩罚 -> send 的流程来完成完整的业务流程。
  • 多 Reactor 多线程的方案虽然看起来复杂的,但是实际实现时比单 Reactor 多线程的方案要简单的多,原因如下:
  • 主线程和子线程分工明确,主线程只负责接收新毗连,子线程负责完成后续的业务处置惩罚。
  • 主线程和子线程的交互很简单,主线程只必要把新毗连传给子线程,子线程无须返回数据,直接就可以在子线程将处置惩罚效果发送给客户端。 大名鼎鼎的两个开源软件 Netty 和 Memcache 都接纳了「多 Reactor 多线程」的方案。
            接纳了「多 Reactor 多历程」方案的开源软件是 Nginx,不过方案与尺度的多 Reactor 多历程有些差异。
      具体差异体如今主历程中仅仅用来初始化 socket,并没有创建 mainReactor 来 accept 毗连,而是由子历程的 Reactor 来 accept 毗连,通过锁来控制一次只有一个子历程举行 accept(防止出现惊群征象),子历程 accept 新毗连后就放到自己的 Reactor 举行处置惩罚,不会再分配给其他子历程。
3.实现单 Reactor 单历程版本的TCP服务器

接下来我们就要来实现一个单 Reactor 单历程版本的TCP服务器

首先我们必要下面这些文件



这个Socket.hpp,nocopy.hpp,Epoller.hpp都是我们封装好了的,我们直接使用就行了,接下来我们只必要编写tcpserver.hpp和main.cc
3.1.Connection类


    承接上一节中的 epoll 服务器:如今的题目是,来自用户的数据大概会被 TCP 协议拆分成多个报文,那么服务器怎么才能知道什么时候最后一个小报文被接收了呢?要保证完整地读取客户端发送的数据,服务器必要将这次读取到的数据保存起来,对它们举行一定的处置惩罚(报文大概会有报头,以办理粘包题目),最后将它们拼接起来,再向上层应用步伐交付。
  题目是 Recver 中的缓冲区 buffer 是一个局部变量,每次循环都会重置。而服务端大概会有成百上千个来自客户端创建毗连后打开的文件描述符,这无法保证为每个文件描述符都保存本轮循环读取的数据。
    办理办法是为套接字文件描述符创建独立的接收和发送缓冲区,因为套接字是基于毗连的,所以用一个名为 Connection 的类来保存全部和毗连相干的属性,比方文件描述符,收发缓冲区,以及对文件描述符的操作(包括读、写和异常操作),所以要设置三个回调函数以供后续在差别的分支调用,最后还要设置一个回指指针,它将会保存服务器对象的地点,到背面会先容它的用处。
  1. #pragma once
  2. #include<iostream>
  3. #include<string>
  4. #include<functional>
  5. #include<memory>
  6. class Connection;
  7. using func_t =std::function<void(std::shared_ptr<Connection>)>;
  8. class Connection
  9. {
  10. private:
  11. int _sock;
  12. std::string _inbuffer;//这里来当输入缓冲区,但是这里是有缺点的,它不能处理二进制流
  13. std::string _outbuffer;
  14. func_t _recv_cb;//读回调函数
  15. func_t _send_cb;//写回调函数
  16. func_t _except_cd;//
  17. //添加一个回指指针
  18. std::shared_ptr<TcpServer> _tcp_server_ptr;
  19. };
  20. class TcpServer
  21. {
  22. };
复制代码


 Connection布局中除了包含文件描述符和其对应的读回调、写回调和异常回调外,还包含一个输入缓冲区_inbuffer、一个输出缓冲区_outbuffer以及一个回指指针_tcp_setver_ptr


   当某个文件描述符的读变乱就绪时,调用recv函数读取客户端发来的数据,但并不能保证读到了一个完整报文,因此必要将读取到的数据暂时存放到该文件描述符对应的_inBuffer中,当_inBuffer中可以分离出一个完整的报文后再将其分离出来举行数据处置惩罚,_inBuffer本质就是用来办理粘包题目的 
当处置惩罚完一个报文请求后,需将相应数据发送给客户端,但并不能保证底层TCP的发送缓冲区中有足够的空间写入,因此需将要发送的数据暂时存放到该文件描述符对应的_outBuffer中,当底层TCP的发送缓冲区中有空间,即写变乱就绪时,再依次发送_outBuffer中的数据
    Connection布局中设置回指指针_svrPtr,便于快速找到TcpServer对象,因为后续必要根据Connection布局找到这个TcpServer对象。如上层业务处置惩罚函数NetCal函数向_outBuffer输出缓冲区递交数据后,需通过Connection中的回指指针,"提醒"TcpServer举行处置惩罚
Connection布局中需提供一个管理回调的成员函数,便于外部对回调举行设置
3.2.TcpServer类

按照之前的经验,我们很快就能写出下面这个
  1. #include <memory> // 包含shared_ptr和unique_ptr等智能指针的头文件  
  2. #include <unordered_map> // 包含unordered_map的头文件  
  3.   
  4. class TcpServer  
  5. {  
  6. public:  
  7.     // 构造函数,初始化TCP服务器  
  8.     // 接收一个端口号作为参数,用于创建监听套接字,并初始化事件轮询器  
  9.     TcpServer(uint16_t port)  
  10.         : _port(port), // 存储端口号  
  11.           _listensock_ptr(new Sock()), // 创建一个Sock对象用于监听,并使用shared_ptr管理  
  12.           _epoller_ptr(new Epoller()) // 创建一个Epoller对象用于事件轮询,并使用shared_ptr管理  
  13.     {  
  14.         _listensock_ptr->Socket(); // 调用Sock的Socket方法创建套接字  
  15.         _listensock_ptr->Bind(_port); // 绑定端口号  
  16.         _listensock_ptr->Listen(); // 开始监听  
  17.   
  18.         // 注意:这里可能还需要将监听套接字添加到事件轮询器中,但代码中没有显示  
  19.     }  
  20.   
  21.     // 初始化方法,可以在这里添加额外的初始化逻辑  
  22.     // 但从提供的代码来看,这个方法目前是空的  
  23.     void Init()  
  24.     {  
  25.         // 可以在这里添加初始化代码  
  26.     }  
  27.   
  28.     // 启动方法,用于启动服务器的事件循环  
  29.     // 但从提供的代码来看,这个方法目前是空的  
  30.     // 通常,这里会包含启动事件轮询器的逻辑  
  31.     void Start()  
  32.     {  
  33.         // 启动事件轮询器,开始处理网络事件  
  34.         // 注意:实际代码中需要实现这部分逻辑  
  35.     }  
  36.   
  37.     // 析构函数,用于清理资源  
  38.     // 注意:由于使用了shared_ptr,这里的资源清理将是自动的  
  39.     // 但如果还有其他需要手动清理的资源(如文件描述符等),则应该在这里处理  
  40.     ~TcpServer()  
  41.     {  
  42.         // 析构函数会自动调用shared_ptr的析构,从而销毁Sock和Epoller对象  
  43.         // 如果需要,可以在这里添加额外的清理代码  
  44.     }  
  45.   
  46. private:  
  47.     uint16_t _port; // 存储服务器监听的端口号  
  48.     std::shared_ptr<Epoller> _epoller_ptr; // 事件轮询器的智能指针  
  49.     std::unordered_map<int, std::shared_ptr<Connection>> _connections; // 存储连接的哈希表,键为套接字描述符,值为Connection对象的智能指针  
  50.     std::shared_ptr<Sock> _listensock_ptr; // 监听套接字的智能指针  
  51. };
复制代码


这里答复一个题目

   

  • 当服务器开始运行时,一定会有大量的Connection布局体对象必要被new出来,那么这些布局体对象需不必要被管理呢?
  
   固然是必要的,所以在服务器类内里,定义了一个哈希表_connections,用sock来作为哈希表的键值,sock对应的布局体connection和sock一起作为键值对,也就是哈希桶中存储的值(存储键值对<sock, connection>),今天是不会出现哈希冲突的,所以每个键值下面的哈希桶只会挂一个键值对,即一个<sock, connection>.
          初始化服务器时,第一个必要被添加到哈希表中的sock,一定是listensock,所以在init方法中,先把listensock添加到哈希表内里,添加的同时还要传该listensock所对应的关心变乱的方法,对于listensock来说,只必要关注读方法即可,其他两个方法设为nullptr即可。
  
接下来是使用ET模式的,所以我们要实施下面这三点
相比于LT模式,关于ET(边缘触发)模式的使用,确实必要关注这几个关键点:


        1.设置EPOLLET标记:在将文件描述符(如socket)添加到epoll实例时,你必要通过epoll_event布局体中的events字段设置EPOLLET标记,以启用边缘触发模式。这告诉epoll,你希望在该文件描述符的状态从非就绪变为就绪时只接收一次变乱通知。
        2.将socket文件描述符设置为非阻塞:由于ET模式要求你能够在一个变乱通知中尽大概多地处置惩罚数据,直到没有更多数据可读,因此将socket设置为非阻塞模式是非常告急的。这允许你的read调用在没有数据可读时立即返回EAGAIN或EWOULDBLOCK,而不是阻塞等候。
        3.循环调用read直到返回EAGAIN或EWOULDBLOCK:在接收到一个ET模式的变乱通知后,你必要在一个循环中调用read函数,不停尝试从socket中读取数据,直到read返回EAGAIN或EWOULDBLOCK。这表现当前没有更多数据可读,你可以安全地继续等候下一个变乱通知。

为了方便使用,我们将设置文件描述符为非阻塞的代码封装到set_non_blocking函数内里去了
   TcpServer类
  1. class TcpServer : public nocopy
  2. {
  3.     static const int num = 64;
  4. public:
  5.     TcpServer(uint16_t port) : _port(port),
  6.                                _listensock_ptr(new Sock()),
  7.                                _epoller_ptr(new Epoller()),
  8.                                _quit(true)
  9.     {
  10.         _listensock_ptr->Socket();
  11.         set_non_blocking(_listensock_ptr->Fd()); // 设置listen文件描述符为非阻塞
  12.         _listensock_ptr->Bind(_port);
  13.         _listensock_ptr->Listen();
  14.     }
  15.     ~TcpServer()
  16.     {
  17.     }
  18.     // 设置文件描述符为非阻塞
  19.     int set_non_blocking(int fd)
  20.     {
  21.         int flags = fcntl(fd, F_GETFL, 0);
  22.         if (flags == -1)
  23.         {
  24.             perror("fcntl: get flags");
  25.             return -1;
  26.         }
  27.         if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
  28.         {
  29.             perror("fcntl: set non-blocking");
  30.             return -1;
  31.         }
  32.         return 0;
  33.     }
  34.     void Init()
  35.     {
  36.     }
  37.     void Start()
  38.     {
  39.         _quit = false;
  40.         // 将listen套接字添加到epoll中->将listensock和他关心的事件,添加到内核的epoll模型中的红黑树里面
  41.         // 1.将listensock添加到红黑树
  42.         _epoller_ptr->EpollUpDate(EPOLL_CTL_ADD, _listensock_ptr->Fd(), (EPOLLIN | EPOLLET)); // 注意这里的EPOLLET设置了ET模式
  43.         // 设置listen文件描述符为非阻塞,这个在初始化已经完成了
  44.         struct epoll_event revs[num]; // 专门用来处理事件的
  45.         while (!_quit)
  46.         {
  47.             int n=_epoller_ptr->EpollerWait(revs,num);
  48.             for(int i=0;i<num;i++)
  49.             {
  50.                
  51.             }
  52.         }
  53.         _quit = true;
  54.     }
  55. private:
  56.     uint16_t _port;
  57.     std::shared_ptr<Epoller> _epoller_ptr;
  58.     std::unordered_map<int, std::shared_ptr<Connection>> _connections;
  59.     std::shared_ptr<Sock> _listensock_ptr;
  60.     bool _quit;
  61. };
复制代码

3.3.Connection的真正用处 

写到这里,我们发现我们这个代码和之前的不是差不多吗?但究竟上,如今才到关键的地方!! 


   我们在上面提及:
          当某个文件描述符的读变乱就绪时,调用recv函数读取客户端发来的数据,但并不能保证读到了一个完整报文,因此必要将读取到的数据暂时存放到该文件描述符对应的_inBuffer中,当_inBuffer中可以分离出一个完整的报文后再将其分离出来举行数据处置惩罚,_inBuffer本质就是用来办理粘包题目的
          当处置惩罚完一个报文请求后,需将相应数据发送给客户端,但并不能保证底层TCP的发送缓冲区中有足够的空间写入,因此需将要发送的数据暂时存放到该文件描述符对应的_outBuffer中,当底层TCP的发送缓冲区中有空间,即写变乱就绪时,再依次发送_outBuffer中的数据
  
  
        写到这里,我们必要理解,我们不但仅必要将listen套接字加入到我们的epoll的红黑树内里,我还必要将listen套接字创建一个Connection对象,将listen套接字加入到Connection中,同时还要即要将<listen套接字,Connection>放到我们的unordered_map对象_connections内里去。只要我们把Connection对象管理好了,我们的毗连就管理好了。
所以,我们如今必要重新编写一下我们的毗连类——Connection类
Connection类
  1. class Connection
  2. {
  3. public:
  4.     Connection(int sock, std::shared_ptr<TcpServer> tcp_server_ptr) : _sock(sock), _tcp_server_ptr(tcp_server_ptr)
  5.     {
  6.     }
  7.     ~Connection()
  8.     {
  9.     }
  10.     void setcallback(func_t recv_cb, func_t send_cb, func_t except_cb)
  11.     {
  12.         _recv_cb = recv_cb;
  13.         _send_cb = send_cb;
  14.         _except_cd = except_cb;
  15.     }
  16. private:
  17.     int _sock;
  18.     std::string _inbuffer; // 这里来当输入缓冲区,但是这里是有缺点的,它不能处理二进制流
  19.     std::string _outbuffer;
  20.     func_t _recv_cb;   // 读回调函数
  21.     func_t _send_cb;   // 写回调函数
  22.     func_t _except_cd; //
  23.     // 添加一个回指指针
  24.     std::shared_ptr<TcpServer> _tcp_server_ptr;
  25.   };
复制代码

如今我们就必要来实现我们上面的内容了
  1. void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb,
  2.                                                         std::string clientip="0.0.0.0",uint16_t clientport=0)
  3.     {
  4.         
  5.         // 1.给listen套接字创建一个Connection对象
  6.         std::shared_ptr<Connection> new_connection = std::make_shared<Connection>(sock, std::shared_ptr<TcpServer>(this)); // 创建Connection对象
  7.         new_connection->setcallback(recv_cb, send_cb, except_cb);
  8.      
  9.         // 2.添加到_connections里面去
  10.         _connections.insert(std::make_pair(sock, new_connection));
  11.         
  12.         // 将listen套接字添加到epoll中->将listensock和他关心的事件,添加到内核的epoll模型中的红黑树里面
  13.         // 3.将listensock添加到红黑树
  14.         _epoller_ptr->EpollUpDate(EPOLL_CTL_ADD, sock, event); // 注意这里的EPOLLET设置了ET模式
  15.         std::cout << "add a new connection success,sockfd:" << sock << std::endl;
  16.     }
复制代码

正如上面所说,这个函数完成了三件事情。


  • 1.给listen套接字创建一个Connection对象
  • 2.添加到_connections内里去
  • 3.将listensock添加到红黑树

接下来我们就可以修改我们的Init函数
   init函数
  1. void Init()
  2.     {
  3.          _listensock_ptr->Socket();
  4.         set_non_blocking(_listensock_ptr->Fd()); // 设置listen文件描述符为非阻塞
  5.         _listensock_ptr->Bind(_port);
  6.         _listensock_ptr->Listen();
  7.         AddConnection(_listensock_ptr->Fd(),(EPOLLIN|EPOLLET),nullptr,nullptr,nullptr);//暂时设置成nullptr
  8.     }
复制代码

3.4.Dispatcher——变乱派发器

我们在这里将Start函数更名为Loop函数,而且创建一个新函数Dispatcher
  1. bool IsConnectionSafe(int fd)
  2.     {
  3.         auto iter = _connections.find(fd);
  4.         if (iter == _connections.end())
  5.         {
  6.             return false;
  7.         }
  8.         else
  9.         {
  10.             return true;
  11.         }
  12.     }
  13.     void Dispatcher() // 事件派发器
  14.     {
  15.         int n = _epoller_ptr->EpollerWait(revs, num); // 获取已经就绪的事件
  16.         for (int i = 0; i < num; i++)
  17.         {
  18.             uint32_t events = revs[i].events;
  19.             int sock = revs[i].data.fd;
  20.             // 如果出现异常,统一转发为读写问题,只需要处理读写就行
  21.             if (events & EPOLLERR) // 出现错误了
  22.             {
  23.                 events |= (EPOLLIN | EPOLLOUT);
  24.             }
  25.             if (events & EPOLLHUP)
  26.             {
  27.                 events |= (EPOLLIN | EPOLLOUT);
  28.             }
  29.             // 只需要处理读写就行
  30.             if (events & EPOLLIN&&IsConnectionSafe(sock)) // 读事件就绪
  31.             {
  32.                 if(_connections[sock]->_recv_cb)
  33.                 _connections[sock]->_recv_cb(_connections[sock]);
  34.             }
  35.             if (events & EPOLLOUT&&IsConnectionSafe(sock)) // 写事件就绪
  36.             {
  37.                 if(_connections[sock]->_send_cb)
  38.                 _connections[sock]->_send_cb(_connections[sock]);
  39.             }
  40.         }
  41.     }
  42.     void Loop()
  43.     {
  44.         _quit = false;
  45.         while (!_quit)
  46.         {
  47.             Dispatcher();
  48.         }
  49.         _quit = true;
  50.     }
复制代码


   变乱派发器是真正服务器要开始运行了,服务器会迁就绪的每个毗连都举行处置惩罚,首先如果毗连不在哈希表中,那就阐明这个毗连中的sock还没有被添加到epoll模子中的红黑树,不能直接举行处置惩罚,必要先添加到红黑树中,然后让epoll_wait来拿取就绪的毗连再告知步伐员,这个时候再举行处置惩罚,如许才不会等候,而是直接举行数据拷贝。
         Loop中处置惩罚就绪的变乱的方法非常非常的简单,如果该就绪的fd关心的是读变乱,那就直接调用该sock所在毗连布局体内部的读方法即可,如果是写变乱那就调用写方法即可。有人说那如果fd关心异常变乱呢?着实异常变乱大部分也都是读变乱,不过也有写变乱,所以处置惩罚异常的逻辑我们直接放到读方法和写方法内里即可,当有异常变乱到来时,直接去对应的读方法或写方法内里执行对应的逻辑即可。
  以下是一些大概触发 EPOLLERR 变乱的环境的示例:
  

  • 1.毗连错误:当使用非阻塞套接字举行毗连时,如果毗连失败,套接字的 epoll 变乱集中将包含 EPOLLERR 变乱。可以通过检査 events 字段中是否包含 EPOLLERR 来处置惩罚毗连错误。 2.接收错误:在非阻塞套接字上举行读取操作时,如果发生错误,比方对方关闭了毗连或者接收缓冲区溢出,套接字的 epol 变乱集中将包含 EPOLLERR 变乱。
    3.发送错误:在非阻塞套接字上举行写入操作时,如果发生错误,比方对方关闭了毗连或者发送缓冲区溢出,套接字的 epol 变乱集中将包含 EPOLLERR 变乱。
  • 4.文件操作错误:当使用 epol 监听文件描述符时,如果在读取或写入文件时发生错误,文件描述符的 epol 变乱集中将包含 EPOLLERR 变乱。
  必要留意的是,EPOLLERR 变乱通常与 EPOLLIN 或 EPOLLOUT 变乱一起使用。当发生 EPOLLERR 变乱时,通常也会同时发生 EPOLLIN 或EPOLLOUT 变乱..
  
            假设某个异常变乱发生了,那么这个异常变乱会自动被内核设置到epoll_wait返回的变乱集中,这个异常变乱一定会和一个sock关联,好比客户端和服务器用sock通信着,忽然客户端关闭毗连,那么服务器的sock上原本关心着读变乱,此时内核会自动将异常变乱设置到该sock关心的变乱聚集里,在处置惩罚sock关心的读变乱时,读方法会捎带处置惩罚掉这个异常变乱,处置惩罚方式为服务器关闭通信的sock,因为客户端已经把毗连断开了,服务器没必要维护和这个客户端的毗连了,服务器也断开就好,如许的逻辑在读方法内里就可以实现。
  
3.5.回调函数

我们看上面的变乱派发器,最后都是派发给_send_cb和_recv_cb函数,这两个函数我们还没有设置呢!所以我们必要来设置一下
  1. [/code] [code] void Init()
  2.     {
  3.         _listensock_ptr->Socket();
  4.         set_non_blocking(_listensock_ptr->Fd()); // 设置listen文件描述符为非阻塞
  5.         _listensock_ptr->Bind(_port);
  6.         _listensock_ptr->Listen();
  7.         AddConnection(_listensock_ptr->Fd(), (EPOLLIN | EPOLLET),
  8.                       std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr); // 暂时设置成nullptr
  9.     }
  10.     void Accepter(std::shared_ptr<Connection> conection)
  11.     {
  12.         while (1)
  13.         {
  14.             struct sockaddr_in peer;
  15.             socklen_t len=sizeof(peer);
  16.             int sock=accept(conection->Getsock(),(sockaddr*)&peer,&len);
  17.             if(sock>0)
  18.             {
  19.                 set_non_blocking(sock);//设置非阻塞
  20.                AddConnection (sock,EPOLLIN,nullptr,nullptr,nullptr);
  21.             }
  22.             else{
  23.                 if(errno==EWOULDBLOCK) break;
  24.                 else if(errno==EINTR) break;
  25.                 else break;
  26.             }
  27.             
  28.         }
  29.     }
复制代码

  •     在代码实现上,给AddConnection传参时,用到了一个C++11的知识,就是bind绑定的使用,一般环境下,如果你将包装器包装的函数指针类型传参给包装器类型时,是没有任何题目的,因为包装器本质就是一个仿函数,内部调用了被包装的对象的方法,所以传参是没有任何题目的。
  •   但如果你要是在类内传参,那就有题目了,会出现类型不匹配的题目,这个题目真的很恶心,而且这个题目一报错就劈里啪啦的报一大堆错,因为function是模板,C++报错最恶心的就是模板报错,一报错人都要炸了。话说回来,为什么是类型不匹配呢?因为在类内调用类内方法时,着实是通过this指针来调用的,如果你直接将Accepter方法传给AddConnection,两者类型是不匹配的,因为Accepter的第一个参数是this指针,精确的做法是利用包装器的适配器bind来举行传参,bind将Accepter举行绑定,前两个参数为绑定的对象类型 和 给绑定的对象所传的参数,因为Accepter第一个参数是this指针,所以第一个参数就可以固定传this,背面的一个参数不应该是如今传,而应该是调用Accepter方法的时候再传,只有如许才能在类内将类成员函数指针传给包装器类型。
  •  不过吧另有一种不常用的方法,就是利用lambda表达式来举行传参,lambda可以捕捉上下文的this指针,然后再把lambda类型传给包装器类型,这种方式不常用,用起来也怪别扭的,function和bind是适配模式,两者搭配在一起用照旧更顺眼一些,lambda这种方式了解一下就好。
为了演示效果,我们写了一个打印函数来展示!
  1.     void Accepter(std::shared_ptr<Connection> conection)
  2.     {
  3.         while (1)
  4.         {
  5.             struct sockaddr_in peer;
  6.             socklen_t len=sizeof(peer);
  7.             int sock=accept(conection->Getsock(),(sockaddr*)&peer,&len);
  8.             if(sock>0)
  9.             {
  10.                 //获取客户端信息
  11.                 uint16_t clientport=ntohs(peer.sin_port);
  12.                 char buffer[128];
  13.                 inet_ntop(AF_INET,&(peer.sin_addr),buffer,sizeof(buffer));
  14.                 std::cout<<"get a new client from:"<<buffer<<conection->Getsock()<<std::endl;
  15.                
  16.                 set_non_blocking(sock);//设置非阻塞
  17.                AddConnection (sock,EPOLLIN,nullptr,nullptr,nullptr);
  18.             }
  19.             else{
  20.                 if(errno==EWOULDBLOCK) break;
  21.                 else if(errno==EINTR) break;
  22.                 else break;
  23.             }
  24.             
  25.         }
  26.     }
  27.     void PrintConnection()
  28.     {
  29.         std::cout<<"_connections fd list: "<<std::endl;
  30.         for(auto&connection:_connections)
  31.         {
  32.             std::cout<<connection.second->Getsock()<<" ";
  33.         }
  34.         std::cout<<std::endl;
  35.     }
复制代码


   tcpserver.hpp 
  1. #pragma once#include <iostream>#include <string>#include <functional>#include <memory>#include <unordered_map> #include "Socket.hpp"#include "Epoller.hpp"#include "nocopy.hpp" class Connection;using func_t = std::function<void(std::shared_ptr<Connection>)>;class TcpServer; class Connection{public:    Connection(int sock, std::shared_ptr<TcpServer> tcp_server_ptr) : _sock(sock), _tcp_server_ptr(tcp_server_ptr)    {    }    ~Connection()    {    }    void setcallback(func_t recv_cb, func_t send_cb, func_t except_cb)    {        _recv_cb = recv_cb;        _send_cb = send_cb;        _except_cb = except_cb;    }    int Getsock()    {        return _sock;    } private:    int _sock;    std::string _inbuffer; // 这里来当输入缓冲区,但是这里是有缺点的,它不能处置惩罚二进制流    std::string _outbuffer; public:    func_t _recv_cb;   // 读回调函数    func_t _send_cb;   // 写回调函数    func_t _except_cb; //     // 添加一个回指指针    std::shared_ptr<TcpServer> _tcp_server_ptr;};class TcpServer : public nocopy{    static const int num = 64; public:    TcpServer(uint16_t port) : _port(port),                               _listensock_ptr(new Sock()),                               _epoller_ptr(new Epoller()),                               _quit(true)    {    }    ~TcpServer()    {    }    // 设置文件描述符为非阻塞    int set_non_blocking(int fd)    {        int flags = fcntl(fd, F_GETFL, 0);        if (flags == -1)        {            perror("fcntl: get flags");            return -1;        }        if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)        {            perror("fcntl: set non-blocking");            return -1;        }        return 0;    }     void Init()    {        _listensock_ptr->Socket();        set_non_blocking(_listensock_ptr->Fd()); // 设置listen文件描述符为非阻塞        _listensock_ptr->Bind(_port);        _listensock_ptr->Listen();        AddConnection(_listensock_ptr->Fd(), (EPOLLIN | EPOLLET),                      std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr); // 暂时设置成nullptr    }    void Accepter(std::shared_ptr<Connection> conection)
  2.     {
  3.         while (1)
  4.         {
  5.             struct sockaddr_in peer;
  6.             socklen_t len=sizeof(peer);
  7.             int sock=accept(conection->Getsock(),(sockaddr*)&peer,&len);
  8.             if(sock>0)
  9.             {
  10.                 //获取客户端信息
  11.                 uint16_t clientport=ntohs(peer.sin_port);
  12.                 char buffer[128];
  13.                 inet_ntop(AF_INET,&(peer.sin_addr),buffer,sizeof(buffer));
  14.                 std::cout<<"get a new client from:"<<buffer<<conection->Getsock()<<std::endl;
  15.                
  16.                 set_non_blocking(sock);//设置非阻塞
  17.                AddConnection (sock,EPOLLIN,nullptr,nullptr,nullptr);
  18.             }
  19.             else{
  20.                 if(errno==EWOULDBLOCK) break;
  21.                 else if(errno==EINTR) break;
  22.                 else break;
  23.             }
  24.             
  25.         }
  26.     }
  27.     void PrintConnection()
  28.     {
  29.         std::cout<<"_connections fd list: "<<std::endl;
  30.         for(auto&connection:_connections)
  31.         {
  32.             std::cout<<connection.second->Getsock()<<" ";
  33.         }
  34.         std::cout<<std::endl;
  35.     }    void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb)    {        // 1.给listen套接字创建一个Connection对象        std::shared_ptr<Connection> new_connection = std::make_shared<Connection>(sock, std::shared_ptr<TcpServer>(this)); // 创建Connection对象        new_connection->setcallback(recv_cb, send_cb, except_cb);         // 2.添加到_connections内里去        _connections.insert(std::make_pair(sock, new_connection));         // 将listen套接字添加到epoll中->将listensock和他关心的变乱,添加到内核的epoll模子中的红黑树内里        // 3.将listensock添加到红黑树        _epoller_ptr->EpollUpDate(EPOLL_CTL_ADD, sock, event); // 留意这里的EPOLLET设置了ET模式         std::cout<<"add a new connection success,sockfd:"<<sock<<std::endl;    }    bool IsConnectionSafe(int fd)    {        auto iter = _connections.find(fd);        if (iter == _connections.end())        {            return false;        }        else        {            return true;        }    }    void Dispatcher() // 变乱派发器    {        int n = _epoller_ptr->EpollerWait(revs, num); // 获取已经就绪的变乱        for (int i = 0; i < num; i++)        {            uint32_t events = revs[i].events;            int sock = revs[i].data.fd;            // 如果出现异常,同一转发为读写题目,只必要处置惩罚读写就行            if (events & EPOLLERR) // 出现错误了            {                events |= (EPOLLIN | EPOLLOUT);            }            if (events & EPOLLHUP)            {                events |= (EPOLLIN | EPOLLOUT);            }            // 只必要处置惩罚读写就行            if (events & EPOLLIN && IsConnectionSafe(sock)) // 读变乱就绪            {                if (_connections[sock]->_recv_cb)                    _connections[sock]->_recv_cb(_connections[sock]);            }            if (events & EPOLLOUT && IsConnectionSafe(sock)) // 写变乱就绪            {                if (_connections[sock]->_send_cb)                    _connections[sock]->_send_cb(_connections[sock]);            }        }    }    void Loop()    {        _quit = false;         while (!_quit)        {            Dispatcher();            PrintConnection();        }        _quit = true;    } private:    uint16_t _port;    std::shared_ptr<Epoller> _epoller_ptr;    std::unordered_map<int, std::shared_ptr<Connection>> _connections;    std::shared_ptr<Sock> _listensock_ptr;    bool _quit;    struct epoll_event revs[num]; // 专门用来处置惩罚变乱的};
复制代码

我们看看效果











还可以啊!!!到这里我们就算是买通了我们毗连的过程,我们接着看
3.6.区分读写异常变乱

我们看看这个Accepter函数
  1.   void Accepter(std::shared_ptr<Connection> conection)
  2.     {
  3.         while (1)
  4.         {
  5.             struct sockaddr_in peer;
  6.             socklen_t len=sizeof(peer);
  7.             int sock=accept(conection->Getsock(),(sockaddr*)&peer,&len);
  8.             if(sock>0)
  9.             {
  10.                 //获取客户端信息
  11.                 uint16_t clientport=ntohs(peer.sin_port);
  12.                 char buffer[128];
  13.                 inet_ntop(AF_INET,&(peer.sin_addr),buffer,sizeof(buffer));
  14.                 std::cout<<"get a new client from:"<<buffer<<conection->Getsock()<<std::endl;
  15.                
  16.                 set_non_blocking(sock);//设置非阻塞
  17.                AddConnection (sock,EPOLLIN,nullptr,nullptr,nullptr);
  18.             }
  19.             else{
  20.                 if(errno==EWOULDBLOCK) break;
  21.                 else if(errno==EINTR) break;
  22.                 else break;
  23.             }
  24.             
  25.         }
  26.     }
复制代码


留意内里的AddConnection函数,这个函数是用文件描述符来创建Connection对象的。但是文件描述符有两种啊!一个是listen套接字,一个是普通套接字。Listen套接字只关心读变乱,而其他文件描述符则是关心读,写,异常变乱的!!但是我们在这里却同一使用了AddConnection (sock,EPOLLIN,nullptr,nullptr,nullptr);一棍子打死,只关心读变乱。如许子是非常不公道的。
所以对应AddConnection (sock,EPOLLIN,nullptr,nullptr,nullptr);这里,我们必要修改
  1. //连接管理器
  2.     void Accepter(std::shared_ptr<Connection> conection)
  3.     {
  4.         while (1)
  5.         {
  6.             struct sockaddr_in peer;
  7.             socklen_t len=sizeof(peer);
  8.             int sock=accept(conection->Getsock(),(sockaddr*)&peer,&len);
  9.             if(sock>0)
  10.             {
  11.                 //获取客户端信息
  12.                 uint16_t clientport=ntohs(peer.sin_port);
  13.                 char buffer[128];
  14.                 inet_ntop(AF_INET,&(peer.sin_addr),buffer,sizeof(buffer));
  15.                 std::cout<<"get a new client from:"<<buffer<<conection->Getsock()<<std::endl;
  16.                
  17.                 set_non_blocking(sock);//设置非阻塞
  18.                AddConnection (sock,EPOLLIN,
  19.                     std::bind(&TcpServer::Recver, this, std::placeholders::_1),
  20.                     std::bind(&TcpServer::Sender, this, std::placeholders::_1),
  21.                     std::bind(&TcpServer::Excepter, this, std::placeholders::_1);
  22.             }
  23.             else{
  24.                 if(errno==EWOULDBLOCK) break;
  25.                 else if(errno==EINTR) break;
  26.                 else break;
  27.             }
  28.             
  29.         }
  30.     }
  31.     //事件管理器
  32.      void Recver(std::shared_ptr<Connection> conection)
  33.     {
  34.         std::cout<<"haha ,got you"<<conection->Getsock()<<std::endl;
  35.     }
  36.     void Sender(std::shared_ptr<Connection> conection)
  37.     {}
  38.     void Excepter(std::shared_ptr<Connection> conection)
  39.     {}
复制代码
我们可以测试一下,到底有没有用




我们发现它在不停打印!阐明我们成功了!!
3.7.读变乱的处置惩罚

好了,我们如今就应该来处置惩罚读写异常变乱
   

  • 读变乱的处置惩罚
  

    Recver这里照旧和之前一样的题目,也是前面在写三个多路转接接口服务器时,不停没有处置惩罚的题目,你怎么保证你一次就把全部数据全部都读上来了呢?
如果不能保证,那就和Accepter一样,必须打死循环来举行读取,当recv返回值大于0,那我们就把读取到的数据先放入缓冲区,缓冲区在哪里呢?
  着实就在connection参数所指向的布局体内里,布局体里会有sock所对应的收发缓冲区。然后就调用外部传入的回调函数_service,对服务器收到的数据举行应用层的业务逻辑处置惩罚。

  • 当recv读到0时,阐明客户端把毗连关了,那这
  • 就算异常变乱,直接回调sock对应的异常处置惩罚方法即可。
  • 当recv的返回值小于0,同时错误码被设置为EAGAIN或EWOULDBLOCK时,则阐明recv已经把sock底层的数据全部读走了,则此时直接break跳出循环即可。也有大概是被信号给停止了,则此时应该继续执行循环
  • 另外一种环境就是recv系统调用真的出错了,则此时也调用sock的异常方法举行处置惩罚即可。
  • 业务逻辑处置惩罚方法应该在本次循环读取到全部的数据之后再举行处置惩罚。
  1. class Connection
  2. {
  3. ......
  4.     void Append(std::string info)//读取成功了,就把读取的内容存到这里来!
  5.     {
  6.         _inbuffer+=info;
  7.     }
  8. private:
  9.     int _sock;
  10.     std::string _inbuffer; // 这里来当输入缓冲区,但是这里是有缺点的,它不能处理二进制流
  11.     std::string _outbuffer;
  12. };
复制代码

我们接着写,
   为了方便,我们这里给Connection类加入两个新成员,下面是修改的部分
  1. class Connection
  2. {
  3. public:
  4.     uint16_t _clientport;
  5.     std::string _clientip;
  6. };
  7. class TcpServer : public nocopy
  8. {
  9.     // 连接管理器
  10.     void Accepter(std::shared_ptr<Connection> conection)
  11.     {
  12.         while (1)
  13.         {
  14.             struct sockaddr_in peer;
  15.             socklen_t len = sizeof(peer);
  16.             int sock = accept(conection->Getsock(), (sockaddr *)&peer, &len);
  17.             if (sock > 0)
  18.             {
  19.                 // 获取客户端信息
  20.                 uint16_t clientport = ntohs(peer.sin_port);
  21.                 char buffer[128];
  22.                 inet_ntop(AF_INET, &(peer.sin_addr), buffer, sizeof(buffer));
  23.                 std::cout << "get a new client from:" << buffer << conection->Getsock() << std::endl;
  24.                 set_non_blocking(sock); // 设置非阻塞
  25.                 AddConnection(sock, EPOLLIN,
  26.                               std::bind(&TcpServer::Recver, this, std::placeholders::_1),
  27.                               std::bind(&TcpServer::Sender, this, std::placeholders::_1),
  28.                               std::bind(&TcpServer::Excepter, this, std::placeholders::_1),
  29.                               buffer,clientport);
  30.             }
  31.             else
  32.             {
  33.                 if (errno == EWOULDBLOCK)
  34.                     break;
  35.                 else if (errno == EINTR)
  36.                     break;
  37.                 else
  38.                     break;
  39.             }
  40.         }
  41.     }
  42.     // 事件管理器
  43.     void Recver(std::shared_ptr<Connection> conection)
  44.     {
  45.         int sock = conection->Getsock();
  46.         while (1)
  47.         {
  48.             char buffer[128];
  49.             memset(buffer, 0, sizeof(buffer));
  50.             ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取
  51.             if (n > 0)                                             // 成功了!!
  52.             {
  53.                 conection->Append(buffer); // 把读取的数据放到Connection对象的输入缓冲区里面
  54.             }
  55.             else if (n == 0)
  56.             {
  57.             }
  58.             else
  59.             {
  60.             }
  61.         }
  62.     }
  63.    
  64.     void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb,
  65.                                                         std::string clientip="0.0.0.0",uint16_t clientport=0)
  66.     {
  67.         
  68.         // 1.给listen套接字创建一个Connection对象
  69.         std::shared_ptr<Connection> new_connection = std::make_shared<Connection>(sock, std::shared_ptr<TcpServer>(this)); // 创建Connection对象
  70.         new_connection->setcallback(recv_cb, send_cb, except_cb);
  71.         new_connection->_clientip=clientip;
  72.         new_connection->_clientport=clientport;
  73.         // 2.添加到_connections里面去
  74.         _connections.insert(std::make_pair(sock, new_connection));
  75.         
  76.         // 将listen套接字添加到epoll中->将listensock和他关心的事件,添加到内核的epoll模型中的红黑树里面
  77.         // 3.将listensock添加到红黑树
  78.         _epoller_ptr->EpollUpDate(EPOLL_CTL_ADD, sock, event); // 注意这里的EPOLLET设置了ET模式
  79.         std::cout << "add a new connection success,sockfd:" << sock << std::endl;
  80.     }
  81.    
  82. };
复制代码
具体修改看代码即可
  
 我们接着美满我们的读变乱的处置惩罚
  1. // 事件管理器
  2.     void Recver(std::shared_ptr<Connection> conection)
  3.     {
  4.         int sock = conection->Getsock();
  5.         while (1)
  6.         {
  7.             char buffer[128];
  8.             memset(buffer, 0, sizeof(buffer));
  9.             ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取
  10.             if (n > 0)                                             // 成功了!!
  11.             {
  12.                 conection->Append(buffer); // 把读取的数据放到Connection对象的输入缓冲区里面
  13.             }
  14.             else if (n == 0) // 客户端
  15.             {
  16.                 std::cout << "sockfd:" << sock << ",client:" << conection->_clientip << ":" << conection->_clientport << "quit" << std::endl;
  17.                 Excepter(conection);
  18.             }
  19.             else
  20.             {
  21.                 if (errno == EWOULDBLOCK)
  22.                     break;
  23.                 else if (errno == EINTR)
  24.                     break;
  25.                 else
  26.                 {
  27.                     std::cout << "sockfd:" << sock << ",client:" << conection->_clientip << ":" << conection->_clientport << "recv err" << std::endl;
  28.                     Excepter(conection);
  29.                 }
  30.             }
  31.         }
  32.     }
  33.     void Sender(std::shared_ptr<Connection> conection)
  34.     {
  35.     }
  36.     void Excepter(std::shared_ptr<Connection> conection)
  37.     {
  38.     }
复制代码

好,我们把源代码拿出来测试一下
   tcpserver.hpp
  
  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. #include <functional>
  5. #include <memory>
  6. #include <unordered_map>
  7. #include "Socket.hpp"
  8. #include "Epoller.hpp"
  9. #include "nocopy.hpp"
  10. class Connection;
  11. using func_t = std::function<void(std::shared_ptr<Connection>)>;
  12. class TcpServer;
  13. class Connection
  14. {
  15. public:
  16.     Connection(int sock, std::shared_ptr<TcpServer> tcp_server_ptr) : _sock(sock), _tcp_server_ptr(tcp_server_ptr)
  17.     {
  18.     }
  19.     ~Connection()
  20.     {
  21.     }
  22.     void setcallback(func_t recv_cb, func_t send_cb, func_t except_cb)
  23.     {
  24.         _recv_cb = recv_cb;
  25.         _send_cb = send_cb;
  26.         _except_cb = except_cb;
  27.     }
  28.     int Getsock()
  29.     {
  30.         return _sock;
  31.     }
  32.     void Append(std::string info)
  33.     {
  34.         _inbuffer += info;
  35.     }
  36. public:
  37.     int _sock;
  38.     std::string _inbuffer; // 这里来当输入缓冲区,但是这里是有缺点的,它不能处理二进制流
  39.     std::string _outbuffer;
  40. public:
  41.     func_t _recv_cb;                            // 读回调函数
  42.     func_t _send_cb;                            // 写回调函数
  43.     func_t _except_cb;                          //
  44.     std::shared_ptr<TcpServer> _tcp_server_ptr; // 添加一个回指指针
  45.     uint16_t _clientport;
  46.     std::string _clientip;
  47. };
  48. class TcpServer : public nocopy
  49. {
  50.     static const int num = 64;
  51. public:
  52.     TcpServer(uint16_t port) : _port(port),
  53.                                _listensock_ptr(new Sock()),
  54.                                _epoller_ptr(new Epoller()),
  55.                                _quit(true)
  56.     {
  57.     }
  58.     ~TcpServer()
  59.     {
  60.     }
  61.     // 设置文件描述符为非阻塞
  62.     int set_non_blocking(int fd)
  63.     {
  64.         int flags = fcntl(fd, F_GETFL, 0);
  65.         if (flags == -1)
  66.         {
  67.             perror("fcntl: get flags");
  68.             return -1;
  69.         }
  70.         if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
  71.         {
  72.             perror("fcntl: set non-blocking");
  73.             return -1;
  74.         }
  75.         return 0;
  76.     }
  77.     void Init()
  78.     {
  79.         _listensock_ptr->Socket();
  80.         set_non_blocking(_listensock_ptr->Fd()); // 设置listen文件描述符为非阻塞
  81.         _listensock_ptr->Bind(_port);
  82.         _listensock_ptr->Listen();
  83.         AddConnection(_listensock_ptr->Fd(), (EPOLLIN | EPOLLET),
  84.                       std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr); // 暂时设置成nullptr
  85.     }
  86.     // 连接管理器
  87.     void Accepter(std::shared_ptr<Connection> conection)
  88.     {
  89.         while (1)
  90.         {
  91.             struct sockaddr_in peer;
  92.             socklen_t len = sizeof(peer);
  93.             int sock = accept(conection->Getsock(), (sockaddr *)&peer, &len);
  94.             if (sock > 0)
  95.             {
  96.                 // 获取客户端信息
  97.                 uint16_t clientport = ntohs(peer.sin_port);
  98.                 char buffer[128];
  99.                 inet_ntop(AF_INET, &(peer.sin_addr), buffer, sizeof(buffer));
  100.                 std::cout << "get a new client from:" << buffer << conection->Getsock() << std::endl;
  101.                 set_non_blocking(sock); // 设置非阻塞
  102.                 AddConnection(sock, EPOLLIN,
  103.                               std::bind(&TcpServer::Recver, this, std::placeholders::_1),
  104.                               std::bind(&TcpServer::Sender, this, std::placeholders::_1),
  105.                               std::bind(&TcpServer::Excepter, this, std::placeholders::_1),
  106.                               buffer, clientport);
  107.             }
  108.             else
  109.             {
  110.                 if (errno == EWOULDBLOCK)
  111.                     break;
  112.                 else if (errno == EINTR)
  113.                     break;
  114.                 else
  115.                     break;
  116.             }
  117.         }
  118.     }
  119.     // 事件管理器
  120.     void Recver(std::shared_ptr<Connection> conection)
  121.     {
  122.         int sock = conection->Getsock();
  123.         while (1)
  124.         {
  125.             char buffer[128];
  126.             memset(buffer, 0, sizeof(buffer));
  127.             ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取
  128.             if (n > 0)                                             // 成功了!!
  129.             {
  130.                 conection->Append(buffer); // 把读取的数据放到Connection对象的输入缓冲区里面
  131.             }
  132.             else if (n == 0) // 客户端
  133.             {
  134.                 std::cout << "sockfd:" << sock << ",client:" << conection->_clientip << ":" << conection->_clientport << "quit" << std::endl;
  135.                 Excepter(conection);
  136.             }
  137.             else
  138.             {
  139.                 if (errno == EWOULDBLOCK)
  140.                     break;
  141.                 else if (errno == EINTR)
  142.                     break;
  143.                 else
  144.                 {
  145.                     std::cout << "sockfd:" << sock << ",client:" << conection->_clientip << ":" << conection->_clientport << "recv err" << std::endl;
  146.                     Excepter(conection);
  147.                 }
  148.             }
  149.         }
  150.     }
  151.     void Sender(std::shared_ptr<Connection> conection)
  152.     {
  153.     }
  154.     void Excepter(std::shared_ptr<Connection> conection)
  155.     {
  156.         std::cout<<"Execpted ! fd:"<<conection->Getsock()<<std::endl;
  157.     }
  158.     void PrintConnection()
  159.     {
  160.         std::cout << "_connections fd list: " ;
  161.         for (auto &connection : _connections)
  162.         {
  163.             std::cout << connection.second->Getsock() << " ";
  164.             std::cout<<connection.second->_inbuffer;
  165.         }
  166.         std::cout << std::endl;
  167.     }
  168.     void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb,
  169.                        std::string clientip = "0.0.0.0", uint16_t clientport = 0)
  170.     {
  171.         // 1.给listen套接字创建一个Connection对象
  172.         std::shared_ptr<Connection> new_connection = std::make_shared<Connection>(sock, std::shared_ptr<TcpServer>(this)); // 创建Connection对象
  173.         new_connection->setcallback(recv_cb, send_cb, except_cb);
  174.         new_connection->_clientip = clientip;
  175.         new_connection->_clientport = clientport;
  176.         // 2.添加到_connections里面去
  177.         _connections.insert(std::make_pair(sock, new_connection));
  178.         // 将listen套接字添加到epoll中->将listensock和他关心的事件,添加到内核的epoll模型中的红黑树里面
  179.         // 3.将listensock添加到红黑树
  180.         _epoller_ptr->EpollUpDate(EPOLL_CTL_ADD, sock, event); // 注意这里的EPOLLET设置了ET模式
  181.         std::cout << "add a new connection success,sockfd:" << sock << std::endl;
  182.     }
  183.     bool IsConnectionSafe(int fd)
  184.     {
  185.         auto iter = _connections.find(fd);
  186.         if (iter == _connections.end())
  187.         {
  188.             return false;
  189.         }
  190.         else
  191.         {
  192.             return true;
  193.         }
  194.     }
  195.     void Dispatcher() // 事件派发器
  196.     {
  197.         int n = _epoller_ptr->EpollerWait(revs, num); // 获取已经就绪的事件
  198.         for (int i = 0; i < num; i++)
  199.         {
  200.             uint32_t events = revs[i].events;
  201.             int sock = revs[i].data.fd;
  202.             // 如果出现异常,统一转发为读写问题,只需要处理读写就行
  203.             if (events & EPOLLERR) // 出现错误了
  204.             {
  205.                 events |= (EPOLLIN | EPOLLOUT);
  206.             }
  207.             if (events & EPOLLHUP)
  208.             {
  209.                 events |= (EPOLLIN | EPOLLOUT);
  210.             }
  211.             // 只需要处理读写就行
  212.             if (events & EPOLLIN && IsConnectionSafe(sock)) // 读事件就绪
  213.             {
  214.                 if (_connections[sock]->_recv_cb)
  215.                     _connections[sock]->_recv_cb(_connections[sock]);
  216.             }
  217.             if (events & EPOLLOUT && IsConnectionSafe(sock)) // 写事件就绪
  218.             {
  219.                 if (_connections[sock]->_send_cb)
  220.                     _connections[sock]->_send_cb(_connections[sock]);
  221.             }
  222.         }
  223.     }
  224.     void Loop()
  225.     {
  226.         _quit = false;
  227.         while (!_quit)
  228.         {
  229.             Dispatcher();
  230.             PrintConnection();
  231.         }
  232.         _quit = true;
  233.     }
  234. private:
  235.     uint16_t _port;
  236.     std::shared_ptr<Epoller> _epoller_ptr;
  237.     std::unordered_map<int, std::shared_ptr<Connection>> _connections;
  238.     std::shared_ptr<Sock> _listensock_ptr;
  239.     bool _quit;
  240.     struct epoll_event revs[num]; // 专门用来处理事件的
  241. };
复制代码













好像没有什么题目啊!!! 我们确实做到了读取数据!但是我们照旧没有处置惩罚这些数据!!
所以我们又要弄一个回调函数来处置惩罚这些数据
  1. class TcpServer : public nocopy
  2. {
  3.     public:
  4.     TcpServer(uint16_t port,func_t OnMessage) : _port(port),
  5.                                _listensock_ptr(new Sock()),
  6.                                _epoller_ptr(new Epoller()),
  7.                                _quit(true),
  8.                                _OnMessage(OnMessage)
  9.     {
  10.     }
  11. ...
  12.     // 事件管理器
  13.     void Recver(std::shared_ptr<Connection> conection)
  14.     {
  15.         int sock = conection->Getsock();
  16.         while (1)
  17.         {
  18.             char buffer[128];
  19.             memset(buffer, 0, sizeof(buffer));
  20.             ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取
  21.             if (n > 0)                                             // 成功了!!
  22.             {
  23.                 conection->Append(buffer); // 把读取的数据放到Connection对象的输入缓冲区里面
  24.             }
  25.             else if (n == 0) // 客户端
  26.             {
  27.                 std::cout << "sockfd:" << sock << ",client:" << conection->_clientip << ":" << conection->_clientport << "quit" << std::endl;
  28.                 Excepter(conection);
  29.             }
  30.             else
  31.             {
  32.                 if (errno == EWOULDBLOCK)
  33.                     break;
  34.                 else if (errno == EINTR)
  35.                     break;
  36.                 else
  37.                 {
  38.                     std::cout << "sockfd:" << sock << ",client:" << conection->_clientip << ":" << conection->_clientport << "recv err" << std::endl;
  39.                     Excepter(conection);
  40.                 }
  41.             }
  42.             _OnMessage(conection);//将读取的数据交给上层处理
  43.         }
  44.     }
  45.    
  46. private:
  47.   ....
  48.     //上层处理数据
  49.     func_t _OnMessage;//将数据交给上层
  50. };
复制代码
我们要求上层来完成检测,来处置惩罚粘包的题目!这个_OnMessage不就是回调函数吗!!!
我们简单的写一个
   mian.cc
  1. #include"tcpserver.hpp"
  2. #include<memory>
  3. void DefaultOmMessage(std::shared_ptr<Connection> connection_ptr)
  4. {
  5.     std::cout<<"上层得到了数据:"<<connection_ptr->_inbuffer<<std::endl;
  6. }
  7. int main()
  8. {
  9.     std::unique_ptr<TcpServer> svr(new TcpServer(8877,DefaultOmMessage));
  10.     svr->Init();
  11.     svr->Loop();
  12. }
复制代码
我们来测试一下行不可



3.8.处置惩罚数据 ——(反)序列化/编解码

可以哦!!! 我们已经拿到数据了,那我们应该怎么处置惩罚数据呢?我们应该在这里完成序列和反序列化!!!由于我之前写过序列和反序列化,以及编码解码的代码,所以我直接拿过复制粘贴好了。
   Serialization.hpp
  1. #pragma
  2. #define CRLF "\t"               // 分隔符
  3. #define CRLF_LEN strlen(CRLF)   // 分隔符长度
  4. #define SPACE " "               // 空格
  5. #define SPACE_LEN strlen(SPACE) // 空格长度
  6. #define OPS "+-*/%"             // 运算符
  7. #include <iostream>
  8. #include <string>
  9. #include <cstring>
  10. #include<assert.h>
  11. //参数len为in的长度,是一个输出型参数。如果为0代表err
  12. std::string decode(std::string& in,size_t*len)
  13. {
  14.     assert(len);//如果长度为0是错误的
  15.    
  16.     // 1.确认in的序列化字符串完整(分隔符)
  17.     *len=0;
  18.     size_t pos = in.find(CRLF);//查找\t第一次出现时的下标
  19.     //查找不到,err
  20.     if(pos == std::string::npos){
  21.         return "";//返回空串
  22.     }
  23.    
  24.     // 2.有分隔符,判断长度是否达标
  25.     // 此时pos下标正好就是标识大小的字符长度
  26.     std::string inLenStr = in.substr(0,pos);//从下标0开始一直截取到第一个\t之前
  27.     //到这里我们要明白,我们这上面截取的是最开头的长度,也就是说,我们截取到的一定是个数字,这个是我们序列化字符的长度
  28.    
  29.     size_t inLen = atoi(inLenStr.c_str());//把截取的这个字符串转int,inLen就是序列化字符的长度
  30.     //传入的字符串的长度 - 第一个\t前面的字符数 - 2个\t
  31.     size_t left = in.size() - inLenStr.size()- 2*CRLF_LEN;//原本预计的序列化字符串长度
  32.     if(left<inLen){//真实的序列化字符串长度和预计的字符串长度进行比较
  33.         return ""; //剩下的长度(序列化字符串的长度)没有达到标明的长度
  34.     }
  35.     // 3.走到此处,字符串完整,开始提取序列化字符串
  36.     std::string ret = in.substr(pos+CRLF_LEN,inLen);//从pos+CRLF_LEN下标开始读取inLen个长度的字符串——即序列化字符串
  37.     *len = inLen;
  38.     // 4.因为in中可能还有其他的报文(下一条)
  39.     // 所以需要把当前的报文从in中删除,方便下次decode,避免二次读取
  40.     size_t rmLen = inLenStr.size() + ret.size() + 2*CRLF_LEN;//长度+2个\t+序列字符串的长度
  41.     in.erase(0,rmLen);//移除从索引0开始长度为rmLen的字符串
  42.     // 5.返回
  43.     return ret;
  44. }
  45. //编码不需要修改源字符串,所以const。参数len为in的长度
  46. std::string encode(const std::string& in,size_t len)
  47. {
  48.     std::string ret = std::to_string(len);//将长度转为字符串添加在最前面,作为标识
  49.     ret+=CRLF;
  50.     ret+=in;
  51.     ret+=CRLF;
  52.     return ret;
  53. }
  54. class Request//客户端使用的
  55. {
  56. public:
  57.     // 将用户的输入转成内部成员
  58.     // 用户可能输入x+y,x+ y,x +y,x + y等等格式
  59.     // 提前修改用户输入(主要还是去掉空格),提取出成员
  60.     Request()
  61.     {
  62.       
  63.     }
  64.    
  65.     // 删除输入中的空格
  66.     void rmSpace(std::string &in)
  67.     {
  68.         std::string tmp;
  69.         for (auto e : in)
  70.         {
  71.             if (e != ' ')
  72.             {
  73.                 tmp += e;
  74.             }
  75.         }
  76.         in = tmp;
  77.     }
  78.     // 序列化 (入参应该是空的,会返回一个序列化字符串)
  79.     void serialize(std::string &out)//这个是客户端在发送消息给服务端时使用的,在这之后要先编码,才能发送出去
  80.     {
  81.         // x + y
  82.         out.clear(); // 序列化的入参是空的
  83.         out += std::to_string(_x);
  84.         out += SPACE;
  85.         out += _ops; // 操作符不能用tostring,会被转成ascii
  86.         out += SPACE;
  87.         out += std::to_string(_y);
  88.         // 不用添加分隔符(这是encode要干的事情)
  89.     }
  90.     //序列化之后应该要编码,去加个长度
  91.     // 反序列化(解开
  92.     bool deserialize(const std::string &in)//这个是服务端接收到客户端发来的消息后使用的,在这之前要先解码
  93.     {
  94.         // x + y 需要取出x,y和操作符
  95.         size_t space1 = in.find(SPACE);  // 第一个空格
  96.         if (space1 == std::string::npos) // 没找到
  97.         {
  98.             return false;
  99.         }
  100.         size_t space2 = in.rfind(SPACE); // 第二个空格
  101.         if (space2 == std::string::npos) // 没找到
  102.         {
  103.             return false;
  104.         }
  105.         // 两个空格都存在,开始取数据
  106.         std::string dataX = in.substr(0, space1);
  107.         std::string dataY = in.substr(space2 + SPACE_LEN); // 默认取到结尾
  108.         std::string op = in.substr(space1 + SPACE_LEN, space2 - (space1 + SPACE_LEN));
  109.         if (op.size() != 1)
  110.         {
  111.             return false; // 操作符长度有问题
  112.         }
  113.         // 没问题了,转内部成员
  114.         _x = atoi(dataX.c_str());
  115.         _y = atoi(dataY.c_str());
  116.         _ops = op[0];
  117.         return true;
  118.     }
  119. public:
  120.     int _x;
  121.     int _y;
  122.     char _ops;
  123. };
  124. class Response // 服务端必须回应
  125. {
  126.     public:
  127.     Response(int code = 0, int result = 0)
  128.         : _exitCode(code), _result(result)
  129.     {
  130.     }
  131.     // 序列化
  132.     void serialize(std::string &out)//这个是服务端发送消息给客户端使用的,使用之后要编码
  133.     {
  134.         // code ret
  135.         out.clear();
  136.         out += std::to_string(_exitCode);
  137.         out += SPACE;
  138.         out += std::to_string(_result);
  139.         out += CRLF;
  140.     }
  141.     // 反序列化
  142.     bool deserialize(const std::string &in)//这个是客户端接收服务端消息后使用的,使用之前要先解码
  143.     {
  144.         // 只有一个空格
  145.         size_t space = in.find(SPACE);  // 寻找第一个空格的下标
  146.         if (space == std::string::npos) // 没找到
  147.         {
  148.             return false;
  149.         }
  150.         std::string dataCode = in.substr(0, space);
  151.         std::string dataRes = in.substr(space + SPACE_LEN);
  152.         _exitCode = atoi(dataCode.c_str());
  153.         _result = atoi(dataRes.c_str());
  154.         return true;
  155.     }
  156. public:
  157.     int _exitCode; // 计算服务的退出码
  158.     int _result;   // 结果
  159. };
  160. Response Caculater(const Request& req)
  161. {
  162.     Response resp;//构造函数中已经指定了exitcode为0
  163.     switch (req._ops)
  164.     {
  165.     case '+':
  166.         resp._result = req._x + req._y;
  167.         break;
  168.     case '-':
  169.         resp._result = req._x - req._y;
  170.         break;
  171.     case '*':
  172.         resp._result = req._x * req._y;
  173.         break;
  174.     case '%':
  175.     {
  176.         if(req._y == 0)
  177.         {
  178.             resp._exitCode = -1;//取模错误
  179.             break;
  180.         }
  181.         resp._result = req._x % req._y;//取模是可以操作负数的
  182.         break;
  183.     }
  184.     case '/':
  185.     {
  186.         if(req._y == 0)
  187.         {
  188.             resp._exitCode = -2;//除0错误
  189.             break;
  190.         }
  191.         resp._result = req._x / req._y;//取模是可以操作负数的
  192.         break;
  193.     }
  194.     default:
  195.         resp._exitCode = -3;//操作符非法
  196.         break;
  197.     }
  198.     return resp;
  199. }
复制代码
 接下来我们就可以对读取的数据举行处置惩罚了!!!怎么处置惩罚呢?我们照旧跟自定义协议那篇一样,搞一个计算器好了!
   Caculater函数
  1. Response Caculater(const Request& req)
  2. {
  3.     Response resp;//构造函数中已经指定了exitcode为0
  4.     switch (req._ops)
  5.     {
  6.     case '+':
  7.         resp._result = req._x + req._y;
  8.         break;
  9.     case '-':
  10.         resp._result = req._x - req._y;
  11.         break;
  12.     case '*':
  13.         resp._result = req._x * req._y;
  14.         break;
  15.     case '%':
  16.     {
  17.         if(req._y == 0)
  18.         {
  19.             resp._exitCode = -1;//取模错误
  20.             break;
  21.         }
  22.         resp._result = req._x % req._y;//取模是可以操作负数的
  23.         break;
  24.     }
  25.     case '/':
  26.     {
  27.         if(req._y == 0)
  28.         {
  29.             resp._exitCode = -2;//除0错误
  30.             break;
  31.         }
  32.         resp._result = req._x / req._y;//取模是可以操作负数的
  33.         break;
  34.     }
  35.     default:
  36.         resp._exitCode = -3;//操作符非法
  37.         break;
  38.     }
  39.     return resp;
  40. }
复制代码
我们把这个放到了我们的 Serialization.hpp
接下来就接着修改我们的main.cc好了!
   main.cc
  1. #include "tcpserver.hpp"
  2. #include <memory>
  3. #include "Serialization.hpp"
  4. void DefaultOmMessage(std::shared_ptr<Connection> connection_ptr)
  5. {
  6.     std::cout << "上层得到了数据:" << connection_ptr->_inbuffer << std::endl;
  7.     std::string inbuf = connection_ptr->_inbuffer;
  8.     size_t packageLen = inbuf.size();
  9.     //由于我们是使用telnet来测试的所以,我们就不解码了
  10.     /*
  11.     // 3.1.解码和反序列化客户端传来的消息
  12.     std::string package = decode(inbuf, &packageLen); // 解码
  13.     if (packageLen == 0)
  14.     {
  15.         printf("decode err: %s[%d] status: %d", connection_ptr->_clientip, connection_ptr->_clientport, packageLen);
  16.         // 报文不完整或有误
  17.     }
  18.     */
  19.     Request req;
  20.     bool deStatus = req.deserialize(inbuf); // 使用Request的反序列化,packsge内部各个成员已经有了数值
  21.     if (deStatus)                             // 获取消息反序列化成功
  22.     {
  23.         // 3.2.获取结构化的相应
  24.         Response resp = Caculater(req); // 将计算任务的结果存放到Response里面去
  25.         // 3.3.序列化和编码响应
  26.         std::string echoStr;
  27.         resp.serialize(echoStr); // 序列化
  28.         //由于我们使用的是telnet来测试的,所以我们不编码了
  29.        // echoStr = encode(echoStr, echoStr.size()); // 编码
  30.         // 3.4.写入,发送返回值给输出缓冲区
  31.         connection_ptr->_outbuffer=echoStr;
  32.         std::cout<<connection_ptr->_outbuffer<<std::endl;
  33.     }
  34.     else // 客户端消息反序列化失败
  35.     {
  36.         printf("deserialize err: %s[%d] status: %d", connection_ptr->_clientip, connection_ptr->_clientport, deStatus);
  37.         return;
  38.     }
  39. }
  40. int main()
  41. {
  42.     std::unique_ptr<TcpServer> svr(new TcpServer(8877, DefaultOmMessage));
  43.     svr->Init();
  44.     svr->Loop();
  45. }
复制代码
我们测试一下 



可以啊!!! 接下来就是专门处置惩罚写变乱
3.9.写变乱的处置惩罚

  之前写服务器时,我们从来没处置惩罚过写变乱,写变乱和读变乱不太一样,关心读变乱是要常设置的,但写变乱一般都是就绪的,因为内核发送缓冲区大概率都是有空间的,如果每次都要让epoll帮我们关心读变乱,这着实是一种资源的浪费,因为大部分环境下,你send数据,都是会直接将应用层数据拷贝到内核缓冲区的,不会出现等候的环境,而recv就不太一样,recv在读取的时候,有大概数据还在网络内里,所以recv要等候的概率是比较高的,所以对于读变乱来说,经常都要将其设置到sock所关心的变乱聚集中。
        但写变乱并不是如许的,写变乱应该是偶尔设置到关心聚集中,好比你这次没把数据一次性发完,但你又没设置该sock关心写变乱,当下次写变乱就绪了,也就是内核发送缓冲区有空间了,epoll_wait也不会通知你,那你还怎么发送剩余数据啊,所以这个时候你就应该设置写变乱关心了,让epoll_wait帮你监督sock上的写变乱,以便于下次epoll_wait通知你时,你还能够继续发奉上次没发完的数据。
        这个时候大概有人会问,ET模式不是只会通知一次吗?如果我这次设置了写关心,但下次发送数据的时候,照旧没发送完毕(因为内核发送缓冲区大概没有剩余空间了),那背面ET模式是不是就不会通知我了呀,那我还怎么继续发送剩余的数据呢?ET模式在底层就绪的变乱状态发生变化时,还会再通知上层一次的,对于读变乱来说,当数据从无到有,从有到多状态发生变化时,ET就还会通知上层一次,对于写变乱来说,当内核发送缓冲区剩余空间从无到有,从有到多状态发生变化时,ET也还会通知上层一次,所以不用担心数据发送不完的题目产生,因为ET是会通知我们的。
        在循环外,我们只必要通过判定outbuffer是否为空的环境,来决定是否要设置写变乱关心,当数据发送完了那我们就取消对于写变乱的关心,不占用epoll的资源,如果数据没发送完,那就设置对于写变乱的关心,因为我们要保证下次写变乱就绪时,epoll_wait能够通知我们对写变乱举行处置惩罚。
   main.cc
  1. #include "tcpserver.hpp"
  2. #include <memory>
  3. #include "Serialization.hpp"
  4. void DefaultOmMessage(std::shared_ptr<Connection> connection_ptr)
  5. {
  6.     std::cout << "上层得到了数据:" << connection_ptr->_inbuffer << std::endl;
  7.     std::string inbuf = connection_ptr->_inbuffer;
  8.     size_t packageLen = inbuf.size();
  9.     //由于我们是使用telnet来测试的所以,我们就不解码了
  10.     /*
  11.     // 3.1.解码和反序列化客户端传来的消息
  12.     std::string package = decode(inbuf, &packageLen); // 解码
  13.     if (packageLen == 0)
  14.     {
  15.         printf("decode err: %s[%d] status: %d", connection_ptr->_clientip, connection_ptr->_clientport, packageLen);
  16.         // 报文不完整或有误
  17.     }
  18.     */
  19.     Request req;
  20.     bool deStatus = req.deserialize(inbuf); // 使用Request的反序列化,packsge内部各个成员已经有了数值
  21.     if (deStatus)                             // 获取消息反序列化成功
  22.     {
  23.         // 3.2.获取结构化的相应
  24.         Response resp = Caculater(req); // 将计算任务的结果存放到Response里面去
  25.         // 3.3.序列化和编码响应
  26.         std::string echoStr;
  27.         resp.serialize(echoStr); // 序列化
  28.         //由于我们使用的是telnet来测试的,所以我们不编码了
  29.        // echoStr = encode(echoStr, echoStr.size()); // 编码
  30.         // 3.4.写入,发送返回值给输出缓冲区
  31.         connection_ptr->_outbuffer=echoStr;
  32.         std::cout<<connection_ptr->_outbuffer<<std::endl;
  33.         
  34.         //发送
  35.         connection_ptr->_tcp_server_ptr->Sender(connection_ptr);//调用里面的方法来发送
  36.     }
  37.     else // 客户端消息反序列化失败
  38.     {
  39.         printf("deserialize err: %s[%d] status: %d", connection_ptr->_clientip, connection_ptr->_clientport, deStatus);
  40.         return;
  41.     }
  42.    
  43. }
复制代码
我们留意最后一句,我们直接把发生认为交给了Tcpserver类里的Sender函数,不过这个函数还没有写,我们来写一下!
   Tcpserver类里的Sender函数
  1. void Sender(std::shared_ptr<Connection> connection) // 使用shared_ptr管理Connection对象的生命周期  
  2. {  
  3.     // 无限循环,直到输出缓冲区为空或发生需要退出的错误  
  4.     while (true)  
  5.     {  
  6.         // 引用当前连接的输出缓冲区  
  7.         auto &outbuffer = connection->_outbuffer;  
  8.   
  9.         // 尝试发送数据,send函数返回发送的字节数,或者-1表示错误  
  10.         ssize_t n = send(connection->Getsock(), outbuffer.data(), outbuffer.size(), 0);  
  11.   
  12.         // 如果n大于0,表示部分或全部数据发送成功  
  13.         if (n > 0)  
  14.         {  
  15.             // 从输出缓冲区中移除已发送的数据  
  16.             outbuffer.erase(0, n);  
  17.   
  18.             // 如果输出缓冲区为空,则退出循环  
  19.             if (outbuffer.empty())  
  20.                 break;  
  21.         }  
  22.         // 如果n等于0,表示连接已关闭(对端执行了关闭操作),退出函数  
  23.         else if (n == 0)  
  24.         {  
  25.             return;  
  26.         }  
  27.         // 处理发送失败的情况  
  28.         else  
  29.         {  
  30.             // 根据errno的值判断错误类型  
  31.             if (errno == EWOULDBLOCK)  
  32.             {  
  33.                 // EWOULDBLOCK表示非阻塞模式下资源暂时不可用,可稍后重试,但这里选择直接退出循环  
  34.                 break;  
  35.             }  
  36.             else if (errno == EINTR)  
  37.             {  
  38.                 // EINTR表示操作被信号中断,可以安全地重新尝试  
  39.                 continue;  
  40.             }  
  41.             else  
  42.             {  
  43.                 // 打印错误信息,包含套接字描述符和客户端IP地址及端口  
  44.                 std::cout << "sockfd:" << connection->Getsock() << ",client:" << connection->_clientip << ":" << connection->_clientport << "send error" << std::endl;  
  45.   
  46.                 // 调用异常处理回调函数  
  47.                 Excepter(conection);  
  48.   
  49.                 // 退出循环  
  50.                 break;  
  51.             }  
  52.         }  
  53.     }  
  54. }
复制代码
但是这还不敷啊,万一我没写完数据呢?我们还必要举行写处置惩罚,这里我们就借助epoll机制来帮我们。
  1. // Sender函数负责通过给定的Connection对象发送数据。  
  2. // 它使用epoll机制来管理套接字的读写事件,根据发送情况调整对写事件的关注。  
  3. void Sender(std::shared_ptr<Connection> connection)  
  4. {  
  5.     // 引用Connection对象的输出缓冲区  
  6.     auto &outbuffer = connection->_outbuffer;  
  7.   
  8.     // 循环发送数据,直到输出缓冲区为空或发生错误  
  9.     while (true)  
  10.     {  
  11.         // 尝试发送数据  
  12.         ssize_t n = send(connection->Getsock(), outbuffer.data(), outbuffer.size(), 0); // 注意:使用.data()获取缓冲区首地址  
  13.   
  14.         // 如果发送成功(n > 0)  
  15.         if (n > 0)  
  16.         {  
  17.             // 从输出缓冲区中移除已发送的数据  
  18.             outbuffer.erase(0, n);  
  19.   
  20.             // 如果输出缓冲区为空,则退出循环  
  21.             if (outbuffer.empty())  
  22.                 break;  
  23.         }  
  24.         // 如果n == 0,表示连接已正常关闭(对端调用了close),退出函数  
  25.         else if (n == 0)  
  26.         {  
  27.             return;  
  28.         }  
  29.         // 处理发送失败的情况  
  30.         else  
  31.         {  
  32.             // 检查errno以确定错误类型  
  33.             if (errno == EWOULDBLOCK)  
  34.             {  
  35.                 // EWOULDBLOCK表示资源暂时不可用,通常发生在非阻塞模式下,退出循环  
  36.                 break;  
  37.             }  
  38.             else if (errno == EINTR)  
  39.             {  
  40.                 // EINTR表示操作被信号中断,继续循环尝试发送  
  41.                 continue;  
  42.             }  
  43.             else  
  44.             {  
  45.                 // 打印错误信息,并调用Connection对象的异常处理回调函数  
  46.                 std::cout << "sockfd:" << connection->Getsock() << ",client:" << connection->_clientip << ":" << connection->_clientport << "send error" << std::endl;  
  47.                 Excepter(conection);  
  48.                 break;  
  49.             }  
  50.         }  
  51.     }  
  52.   
  53.     // 根据输出缓冲区是否为空,调整对写事件的关注  
  54.     if (!outbuffer.empty())  
  55.     {  
  56.         // 如果输出缓冲区不为空,开启对写事件的关注  
  57.         EnableEvent(connection->Getsock(), true, true); // 同时关心读和写事件  
  58.     }  
  59.     else  
  60.     {  
  61.         // 如果输出缓冲区为空,关闭对写事件的关注,但保持对读事件的关注  
  62.         EnableEvent(connection->Getsock(), true, false); // 只关心读事件  
  63.     }  
  64. }  
  65.   
  66. // EnableEvent函数用于根据给定的参数,更新epoll事件监听器对套接字的事件关注。  
  67. void EnableEvent(int sock, bool readable, bool sendable)  
  68. {  
  69.     // 初始化events变量,用于存储要设置的事件标志  
  70.     uint32_t events = 0;  
  71.   
  72.     // 如果需要关注读事件,则设置EPOLLIN标志  
  73.     events |= (readable ? EPOLLIN : 0);  
  74.   
  75.     // 如果需要关注写事件,则设置EPOLLOUT标志  
  76.     events |= (sendable ? EPOLLOUT : 0);  
  77.   
  78.     // 总是设置EPOLLET标志,表示使用边缘触发模式(Edge Triggered)  
  79.     events |= EPOLLET;  
  80.   
  81.     // 调用epoll更新函数,修改对指定套接字的事件关注  
  82.     // 注意:_epoller_ptr应该是一个指向epoll事件监听器对象的指针,EPOLL_CTL_MOD表示修改现有事件  
  83.     _epoller_ptr->EpollUpDate(EPOLL_CTL_MOD, sock, events);  
  84. }
复制代码
我们测试一下,这能不能完成写的使命啊!

 






很好,显然是成功了!!!
3.10.异常变乱的处置惩罚 
        各人细致看看上面的代码就会知道,读写变乱出题目了之后,都是立马将错误和异常都传递给了Exceptrt函数!


        下面是异常变乱的处置惩罚方法,我们同一对全部异常变乱,都先将其从epoll模子中移除,然后关闭文件描述符,最后将conn从哈希表_connecions中移除。
  1. void Excepter(std::shared_ptr<Connection> conection)
  2.     {
  3.         std::cout << "Execpted ! fd:" << conection->Getsock() << std::endl;
  4.        //1.将文件描述符从epoll模型里面移除来
  5.        _epoller_ptr->EpollUpDate(EPOLL_CTL_DEL,conection->Getsock(),0);
  6.         //2.关闭异常的文件描述符
  7.         close(conection->Getsock());
  8.         //3.从unordered_map中删除
  9.         _connections.erase(conection->Getsock());
  10.     }
复制代码

我们来测试一下:
我们毗连我们的服务器,然退却出,服务器就会打印下面这些内容



很显然失败了!
 为什么呢?就是connection指针的生命周期有题目
          值得留意的是,connnection指针指向的毗连布局体空间,必须由我们自己释放,有人说,为什么啊?你哈希表不是都已经erase了么?为什么还要步伐员自己再delete毗连布局体空间呢?  

这里要给各人阐明一点的是,全部的容器在erase的时候,都只释放容器自己所new出来的空间,像哈希表如许的容器,它会new一个存储键值对的节点空间,节点内里存储着conn指针和sockfd,当调用哈希表的erase时,哈希表只会释放它自己new出来的节点空间,至于这个节点空间内里存储了一个Connection类型的指针,而且这个指针变量指向一个布局体空间,这些事情哈希表才不会管你呢,容器只会释放他自己开辟的空间,哈希表是vector挂单链表的方式来实现的。
所以我们要自己手动释放conn指向的空间,如果你不想自己手动释放conn指向的堆空间资源,则可以存储智能指针对象,如许在哈希表erase时,就会释放智能指针对象的空间,从而自动调用Connection类的析构函数。

  如许搞起来着实照旧很麻烦的,所以我们就自己手动释放就好了,如果不手动释放那就会造成内存泄露。
由于实现起来比较麻烦,必要修改代码的许多地方,我就不改了,但是最核心的部分就像下面如许子


 至此,我们的浅易版本的单Reactor单历程版本的TCP服务器就算完成了!
3.11.源代码

   tcpserver.hpp
  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. #include <functional>
  5. #include <memory>
  6. #include <unordered_map>
  7. #include "Socket.hpp"
  8. #include "Epoller.hpp"
  9. #include "nocopy.hpp"
  10. #include <sys/socket.h>
  11. #include <sys/types.h>
  12. class Connection;
  13. using func_t = std::function<void(std::shared_ptr<Connection>)>;
  14. class TcpServer;
  15. class Connection
  16. {
  17. public:
  18.     Connection(int sock, std::shared_ptr<TcpServer> tcp_server_ptr) : _sock(sock), _tcp_server_ptr(tcp_server_ptr)
  19.     {
  20.     }
  21.     ~Connection()
  22.     {
  23.     }
  24.     void setcallback(func_t recv_cb, func_t send_cb, func_t except_cb)
  25.     {
  26.         _recv_cb = recv_cb;
  27.         _send_cb = send_cb;
  28.         _except_cb = except_cb;
  29.     }
  30.     int Getsock()
  31.     {
  32.         return _sock;
  33.     }
  34.     void Append(std::string info)
  35.     {
  36.         _inbuffer += info;
  37.     }
  38. public:
  39.     int _sock;
  40.     std::string _inbuffer; // 这里来当输入缓冲区,但是这里是有缺点的,它不能处理二进制流
  41.     std::string _outbuffer;
  42. public:
  43.     func_t _recv_cb;                            // 读回调函数
  44.     func_t _send_cb;                            // 写回调函数
  45.     func_t _except_cb;                          //
  46.     std::shared_ptr<TcpServer> _tcp_server_ptr; // 添加一个回指指针
  47.     uint16_t _clientport;
  48.     std::string _clientip;
  49. };
  50. class TcpServer : public nocopy
  51. {
  52.     static const int num = 64;
  53. public:
  54.     TcpServer(uint16_t port, func_t OnMessage) : _port(port),
  55.                                                  _listensock_ptr(new Sock()),
  56.                                                  _epoller_ptr(new Epoller()),
  57.                                                  _quit(true),
  58.                                                  _OnMessage(OnMessage)
  59.     {
  60.     }
  61.     ~TcpServer()
  62.     {
  63.     }
  64.     // 设置文件描述符为非阻塞
  65.     int set_non_blocking(int fd)
  66.     {
  67.         int flags = fcntl(fd, F_GETFL, 0);
  68.         if (flags == -1)
  69.         {
  70.             perror("fcntl: get flags");
  71.             return -1;
  72.         }
  73.         if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
  74.         {
  75.             perror("fcntl: set non-blocking");
  76.             return -1;
  77.         }
  78.         return 0;
  79.     }
  80.     void Init()
  81.     {
  82.         _listensock_ptr->Socket();
  83.         set_non_blocking(_listensock_ptr->Fd()); // 设置listen文件描述符为非阻塞
  84.         _listensock_ptr->Bind(_port);
  85.         _listensock_ptr->Listen();
  86.         AddConnection(_listensock_ptr->Fd(), (EPOLLIN | EPOLLET),
  87.                       std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr); // 暂时设置成nullptr
  88.     }
  89.     // 连接管理器
  90.     void Accepter(std::shared_ptr<Connection> conection)
  91.     {
  92.         while (1)
  93.         {
  94.             struct sockaddr_in peer;
  95.             socklen_t len = sizeof(peer);
  96.             int sock = accept(conection->Getsock(), (sockaddr *)&peer, &len);
  97.             if (sock > 0)
  98.             {
  99.                 // 获取客户端信息
  100.                 uint16_t clientport = ntohs(peer.sin_port);
  101.                 char buffer[128];
  102.                 inet_ntop(AF_INET, &(peer.sin_addr), buffer, sizeof(buffer));
  103.                 std::cout << "get a new client from:" << buffer << conection->Getsock() << std::endl;
  104.                 set_non_blocking(sock); // 设置非阻塞
  105.                 AddConnection(sock, EPOLLIN,
  106.                               std::bind(&TcpServer::Recver, this, std::placeholders::_1),
  107.                               std::bind(&TcpServer::Sender, this, std::placeholders::_1),
  108.                               std::bind(&TcpServer::Excepter, this, std::placeholders::_1),
  109.                               buffer, clientport);
  110.             }
  111.             else
  112.             {
  113.                 if (errno == EWOULDBLOCK)
  114.                     break;
  115.                 else if (errno == EINTR)
  116.                     break;
  117.                 else
  118.                     break;
  119.             }
  120.         }
  121.     }
  122.     // 事件管理器
  123.     void Recver(std::shared_ptr<Connection> conection)
  124.     {
  125.         int sock = conection->Getsock();
  126.         while (1)
  127.         {
  128.             char buffer[128];
  129.             memset(buffer, 0, sizeof(buffer));
  130.             ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取
  131.             if (n > 0)                                             // 成功了!!
  132.             {
  133.                 conection->Append(buffer); // 把读取的数据放到Connection对象的输入缓冲区里面
  134.             }
  135.             else if (n == 0) // 客户端
  136.             {
  137.                 std::cout << "sockfd:" << sock << ",client:" << conection->_clientip << ":" << conection->_clientport << "quit" << std::endl;
  138.                 Excepter(conection);
  139.             }
  140.             else
  141.             {
  142.                 if (errno == EWOULDBLOCK)
  143.                     break;
  144.                 else if (errno == EINTR)
  145.                     break;
  146.                 else
  147.                 {
  148.                     std::cout << "sockfd:" << sock << ",client:" << conection->_clientip << ":" << conection->_clientport << "recv err" << std::endl;
  149.                     Excepter(conection);
  150.                 }
  151.             }
  152.             _OnMessage(conection); // 将读取的数据交给上层处理
  153.         }
  154.     }
  155.     void Sender(std::shared_ptr<Connection> conection)
  156.     {
  157.         std::string outbuffer = conection->_outbuffer;
  158.         while (1)
  159.         {
  160.             ssize_t n = send(conection->Getsock(), outbuffer.data(), outbuffer.size(), 0);
  161.             if (n > 0) // 发成功了
  162.             {
  163.                 outbuffer.erase(0, n);
  164.                 if (outbuffer.empty())
  165.                     break;
  166.             }
  167.             else if (n == 0)
  168.             {
  169.                 return;
  170.             }
  171.             else
  172.             {
  173.                 if (errno == EWOULDBLOCK)
  174.                     break;
  175.                 else if (errno == EINTR)
  176.                     continue;
  177.                 else
  178.                 {
  179.                     std::cout << "sockfd:" << conection->Getsock() << ",client:" << conection->_clientip << ":" << conection->_clientport << "send error" << std::endl;
  180.                     Excepter(conection);
  181.                     break;
  182.                 }
  183.             }
  184.         }
  185.         if (!outbuffer.empty())
  186.         {
  187.             // 开启对写事件的关心
  188.             EnableEvent(conection->Getsock(), true, true); // 关心读写
  189.         }
  190.         else
  191.         {
  192.             // 关闭对写事件的关心
  193.             EnableEvent(conection->Getsock(), true, false); // 关心读,不关心写
  194.         }
  195.     }
  196.     void EnableEvent(int sock, bool readable, bool sendable)
  197.     {
  198.         uint32_t events = 0;
  199.         events |= (readable ? EPOLLIN : 0) | (sendable ? EPOLLOUT : 0) | EPOLLET;
  200.         _epoller_ptr->EpollUpDate(EPOLL_CTL_MOD, sock, events);
  201.     }
  202.     void Excepter(std::shared_ptr<Connection> conection)
  203.     {
  204.         if (!conection)
  205.         {
  206.             // Connection 对象可能已被销毁,无需进一步操作
  207.             std::cout << "Connection already destroyed, no further action taken." << std::endl;
  208.             return;
  209.         }
  210.         std::cout << "Execpted ! fd:" << conection->Getsock() << std::endl;
  211.         // 1.将文件描述符从epoll模型里面移除来
  212.         _epoller_ptr->EpollUpDate(EPOLL_CTL_DEL, conection->Getsock(), 0);
  213.         // 2.关闭异常的文件描述符
  214.         close(conection->Getsock());
  215.         // 3.从unordered_map中删除
  216.         _connections.erase(conection->Getsock());
  217.     }
  218.     void PrintConnection()
  219.     {
  220.         std::cout << "_connections fd list: ";
  221.         for (auto &connection : _connections)
  222.         {
  223.             std::cout << connection.second->Getsock() << " ";
  224.             std::cout << connection.second->_inbuffer;
  225.         }
  226.         std::cout << std::endl;
  227.     }
  228.     void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb,
  229.                        std::string clientip = "0.0.0.0", uint16_t clientport = 0)
  230.     {
  231.         // 1.给listen套接字创建一个Connection对象
  232.         std::shared_ptr<Connection> new_connection = std::make_shared<Connection>(sock, std::shared_ptr<TcpServer>(this)); // 创建Connection对象
  233.         new_connection->setcallback(recv_cb, send_cb, except_cb);
  234.         new_connection->_clientip = clientip;
  235.         new_connection->_clientport = clientport;
  236.         // 2.添加到_connections里面去
  237.         _connections.insert(std::make_pair(sock, new_connection));
  238.         // 将listen套接字添加到epoll中->将listensock和他关心的事件,添加到内核的epoll模型中的红黑树里面
  239.         // 3.将listensock添加到红黑树
  240.         _epoller_ptr->EpollUpDate(EPOLL_CTL_ADD, sock, event); // 注意这里的EPOLLET设置了ET模式
  241.         std::cout << "add a new connection success,sockfd:" << sock << std::endl;
  242.     }
  243.     bool IsConnectionSafe(int fd)
  244.     {
  245.         auto iter = _connections.find(fd);
  246.         if (iter == _connections.end())
  247.         {
  248.             return false;
  249.         }
  250.         else
  251.         {
  252.             return true;
  253.         }
  254.     }
  255.     void Dispatcher() // 事件派发器
  256.     {
  257.         int n = _epoller_ptr->EpollerWait(revs, num); // 获取已经就绪的事件
  258.         for (int i = 0; i < num; i++)
  259.         {
  260.             uint32_t events = revs[i].events;
  261.             int sock = revs[i].data.fd;
  262.             // 如果出现异常,统一转发为读写问题,只需要处理读写就行
  263.             if (events & EPOLLERR) // 出现错误了
  264.             {
  265.                 events |= (EPOLLIN | EPOLLOUT);
  266.             }
  267.             if (events & EPOLLHUP)
  268.             {
  269.                 events |= (EPOLLIN | EPOLLOUT);
  270.             }
  271.             // 只需要处理读写就行
  272.             if (events & EPOLLIN && IsConnectionSafe(sock)) // 读事件就绪
  273.             {
  274.                 if (_connections[sock]->_recv_cb)
  275.                     _connections[sock]->_recv_cb(_connections[sock]);
  276.             }
  277.             if (events & EPOLLOUT && IsConnectionSafe(sock)) // 写事件就绪
  278.             {
  279.                 if (_connections[sock]->_send_cb)
  280.                     _connections[sock]->_send_cb(_connections[sock]);
  281.             }
  282.         }
  283.     }
  284.     void Loop()
  285.     {
  286.         _quit = false;
  287.         while (!_quit)
  288.         {
  289.             Dispatcher();
  290.             PrintConnection();
  291.         }
  292.         _quit = true;
  293.     }
  294. private:
  295.     uint16_t _port;
  296.     std::shared_ptr<Epoller> _epoller_ptr;
  297.     std::unordered_map<int, std::shared_ptr<Connection>> _connections;
  298.     std::shared_ptr<Sock> _listensock_ptr;
  299.     bool _quit;
  300.     struct epoll_event revs[num]; // 专门用来处理事件的
  301.     // 上层处理数据
  302.     func_t _OnMessage; // 将数据交给上层
  303. };
复制代码

   Epoller.hpp
  1. v#pragma once
  2. #include <iostream>
  3. #include <sys/epoll.h>
  4. #include <unistd.h>
  5. #include <cerrno>
  6. #include "nocopy.hpp"
  7. class Epoller : public nocopy
  8. {
  9.     static const int size = 128;
  10. public:
  11.     Epoller()
  12.     {
  13.         _epfd = epoll_create(size);
  14.         if (_epfd == -1)
  15.         {
  16.             perror("epoll_creat error");
  17.         }
  18.         else
  19.         {
  20.             printf("epoll_creat successful:%d\n", _epfd);
  21.         }
  22.     }
  23.     ~Epoller()
  24.     {
  25.         if (_epfd > 0)
  26.         {
  27.             close(_epfd);
  28.         }
  29.     }
  30.     int EpollerWait(struct epoll_event revents[],int num)
  31.     {
  32.         int n=epoll_wait(_epfd,revents,num,3000);
  33.         return n;
  34.     }
  35.     int EpollUpDate(int oper,int sock,uint16_t event)
  36.     {
  37.         int n;
  38.         if(oper==EPOLL_CTL_DEL)//将该事件从epoll红黑树里面删除
  39.         {
  40.             n=epoll_ctl(_epfd,oper,sock,nullptr);
  41.              if(n!=0)
  42.             {
  43.                 printf("delete epoll_ctl error");
  44.             }
  45.         }
  46.         else{//添加和修改,即EPOLL_CTL_MOD和EPOLL_CTL_ADD
  47.             struct epoll_event ev;
  48.             ev.events=event;
  49.             ev.data.fd=sock;//方便我们知道是哪个fd就绪了
  50.             n=epoll_ctl(_epfd,oper,sock,&ev);
  51.             if(n!=0)
  52.             {
  53.                 perror("add epoll_ctl error");
  54.             }
  55.         }
  56.         return n;
  57.     }
  58. private:
  59.     int _epfd;
  60. };
复制代码

   nocopy.hpp
  1. #pragma once  
  2.   
  3. class nocopy  
  4. {  
  5. public:  
  6.     // 允许使用默认构造函数(由编译器自动生成)  
  7.     nocopy() = default;   
  8.   
  9.     // 禁用拷贝构造函数,防止通过拷贝来创建类的实例  
  10.     nocopy(const nocopy&) = delete;   
  11.   
  12.     // 禁用赋值运算符,防止类的实例之间通过赋值操作进行内容复制  
  13.     nocopy& operator=(const nocopy&) = delete;   
  14. };
复制代码

   Serialization.hpp
  1. #pragma
  2. #define CRLF "\t"               // 分隔符
  3. #define CRLF_LEN strlen(CRLF)   // 分隔符长度
  4. #define SPACE " "               // 空格
  5. #define SPACE_LEN strlen(SPACE) // 空格长度
  6. #define OPS "+-*/%"             // 运算符
  7. #include <iostream>
  8. #include <string>
  9. #include <cstring>
  10. #include<assert.h>
  11. //参数len为in的长度,是一个输出型参数。如果为0代表err
  12. std::string decode(std::string& in,size_t*len)
  13. {
  14.     assert(len);//如果长度为0是错误的
  15.    
  16.     // 1.确认in的序列化字符串完整(分隔符)
  17.     *len=0;
  18.     size_t pos = in.find(CRLF);//查找\t第一次出现时的下标
  19.     //查找不到,err
  20.     if(pos == std::string::npos){
  21.         return "";//返回空串
  22.     }
  23.    
  24.     // 2.有分隔符,判断长度是否达标
  25.     // 此时pos下标正好就是标识大小的字符长度
  26.     std::string inLenStr = in.substr(0,pos);//从下标0开始一直截取到第一个\t之前
  27.     //到这里我们要明白,我们这上面截取的是最开头的长度,也就是说,我们截取到的一定是个数字,这个是我们序列化字符的长度
  28.    
  29.     size_t inLen = atoi(inLenStr.c_str());//把截取的这个字符串转int,inLen就是序列化字符的长度
  30.     //传入的字符串的长度 - 第一个\t前面的字符数 - 2个\t
  31.     size_t left = in.size() - inLenStr.size()- 2*CRLF_LEN;//原本预计的序列化字符串长度
  32.     if(left<inLen){//真实的序列化字符串长度和预计的字符串长度进行比较
  33.         return ""; //剩下的长度(序列化字符串的长度)没有达到标明的长度
  34.     }
  35.     // 3.走到此处,字符串完整,开始提取序列化字符串
  36.     std::string ret = in.substr(pos+CRLF_LEN,inLen);//从pos+CRLF_LEN下标开始读取inLen个长度的字符串——即序列化字符串
  37.     *len = inLen;
  38.     // 4.因为in中可能还有其他的报文(下一条)
  39.     // 所以需要把当前的报文从in中删除,方便下次decode,避免二次读取
  40.     size_t rmLen = inLenStr.size() + ret.size() + 2*CRLF_LEN;//长度+2个\t+序列字符串的长度
  41.     in.erase(0,rmLen);//移除从索引0开始长度为rmLen的字符串
  42.     // 5.返回
  43.     return ret;
  44. }
  45. //编码不需要修改源字符串,所以const。参数len为in的长度
  46. std::string encode(const std::string& in,size_t len)
  47. {
  48.     std::string ret = std::to_string(len);//将长度转为字符串添加在最前面,作为标识
  49.     ret+=CRLF;
  50.     ret+=in;
  51.     ret+=CRLF;
  52.     return ret;
  53. }
  54. class Request//客户端使用的
  55. {
  56. public:
  57.     // 将用户的输入转成内部成员
  58.     // 用户可能输入x+y,x+ y,x +y,x + y等等格式
  59.     // 提前修改用户输入(主要还是去掉空格),提取出成员
  60.     Request()
  61.     {
  62.       
  63.     }
  64.    
  65.     // 删除输入中的空格
  66.     void rmSpace(std::string &in)
  67.     {
  68.         std::string tmp;
  69.         for (auto e : in)
  70.         {
  71.             if (e != ' ')
  72.             {
  73.                 tmp += e;
  74.             }
  75.         }
  76.         in = tmp;
  77.     }
  78.     // 序列化 (入参应该是空的,会返回一个序列化字符串)
  79.     void serialize(std::string &out)//这个是客户端在发送消息给服务端时使用的,在这之后要先编码,才能发送出去
  80.     {
  81.         // x + y
  82.         out.clear(); // 序列化的入参是空的
  83.         out += std::to_string(_x);
  84.         out += SPACE;
  85.         out += _ops; // 操作符不能用tostring,会被转成ascii
  86.         out += SPACE;
  87.         out += std::to_string(_y);
  88.         // 不用添加分隔符(这是encode要干的事情)
  89.     }
  90.     //序列化之后应该要编码,去加个长度
  91.     // 反序列化(解开
  92.     bool deserialize(const std::string &in)//这个是服务端接收到客户端发来的消息后使用的,在这之前要先解码
  93.     {
  94.         // x + y 需要取出x,y和操作符
  95.         size_t space1 = in.find(SPACE);  // 第一个空格
  96.         if (space1 == std::string::npos) // 没找到
  97.         {
  98.             return false;
  99.         }
  100.         size_t space2 = in.rfind(SPACE); // 第二个空格
  101.         if (space2 == std::string::npos) // 没找到
  102.         {
  103.             return false;
  104.         }
  105.         // 两个空格都存在,开始取数据
  106.         std::string dataX = in.substr(0, space1);
  107.         std::string dataY = in.substr(space2 + SPACE_LEN); // 默认取到结尾
  108.         std::string op = in.substr(space1 + SPACE_LEN, space2 - (space1 + SPACE_LEN));
  109.         if (op.size() != 1)
  110.         {
  111.             return false; // 操作符长度有问题
  112.         }
  113.         // 没问题了,转内部成员
  114.         _x = atoi(dataX.c_str());
  115.         _y = atoi(dataY.c_str());
  116.         _ops = op[0];
  117.         return true;
  118.     }
  119. public:
  120.     int _x;
  121.     int _y;
  122.     char _ops;
  123. };
  124. class Response // 服务端必须回应
  125. {
  126.     public:
  127.     Response(int code = 0, int result = 0)
  128.         : _exitCode(code), _result(result)
  129.     {
  130.     }
  131.     // 序列化
  132.     void serialize(std::string &out)//这个是服务端发送消息给客户端使用的,使用之后要编码
  133.     {
  134.         // code ret
  135.         out.clear();
  136.         out += std::to_string(_exitCode);
  137.         out += SPACE;
  138.         out += std::to_string(_result);
  139.         out += CRLF;
  140.     }
  141.     // 反序列化
  142.     bool deserialize(const std::string &in)//这个是客户端接收服务端消息后使用的,使用之前要先解码
  143.     {
  144.         // 只有一个空格
  145.         size_t space = in.find(SPACE);  // 寻找第一个空格的下标
  146.         if (space == std::string::npos) // 没找到
  147.         {
  148.             return false;
  149.         }
  150.         std::string dataCode = in.substr(0, space);
  151.         std::string dataRes = in.substr(space + SPACE_LEN);
  152.         _exitCode = atoi(dataCode.c_str());
  153.         _result = atoi(dataRes.c_str());
  154.         return true;
  155.     }
  156. public:
  157.     int _exitCode; // 计算服务的退出码
  158.     int _result;   // 结果
  159. };
  160. Response Caculater(const Request& req)
  161. {
  162.     Response resp;//构造函数中已经指定了exitcode为0
  163.     switch (req._ops)
  164.     {
  165.     case '+':
  166.         resp._result = req._x + req._y;
  167.         break;
  168.     case '-':
  169.         resp._result = req._x - req._y;
  170.         break;
  171.     case '*':
  172.         resp._result = req._x * req._y;
  173.         break;
  174.     case '%':
  175.     {
  176.         if(req._y == 0)
  177.         {
  178.             resp._exitCode = -1;//取模错误
  179.             break;
  180.         }
  181.         resp._result = req._x % req._y;//取模是可以操作负数的
  182.         break;
  183.     }
  184.     case '/':
  185.     {
  186.         if(req._y == 0)
  187.         {
  188.             resp._exitCode = -2;//除0错误
  189.             break;
  190.         }
  191.         resp._result = req._x / req._y;//取模是可以操作负数的
  192.         break;
  193.     }
  194.     default:
  195.         resp._exitCode = -3;//操作符非法
  196.         break;
  197.     }
  198.     return resp;
  199. }
复制代码

    main.cc
   
  1. #include "tcpserver.hpp"
  2. #include <memory>
  3. #include "Serialization.hpp"
  4. void DefaultOmMessage(std::shared_ptr<Connection> connection_ptr)
  5. {
  6.     std::cout << "上层得到了数据:" << connection_ptr->_inbuffer << std::endl;
  7.     std::string inbuf = connection_ptr->_inbuffer;
  8.     size_t packageLen = inbuf.size();
  9.     //由于我们是使用telnet来测试的所以,我们就不解码了
  10.     /*
  11.     // 3.1.解码和反序列化客户端传来的消息
  12.     std::string package = decode(inbuf, &packageLen); // 解码
  13.     if (packageLen == 0)
  14.     {
  15.         printf("decode err: %s[%d] status: %d", connection_ptr->_clientip, connection_ptr->_clientport, packageLen);
  16.         // 报文不完整或有误
  17.     }
  18.     */
  19.     Request req;
  20.     bool deStatus = req.deserialize(inbuf); // 使用Request的反序列化,packsge内部各个成员已经有了数值
  21.     if (deStatus)                             // 获取消息反序列化成功
  22.     {
  23.         // 3.2.获取结构化的相应
  24.         Response resp = Caculater(req); // 将计算任务的结果存放到Response里面去
  25.         // 3.3.序列化和编码响应
  26.         std::string echoStr;
  27.         resp.serialize(echoStr); // 序列化
  28.         //由于我们使用的是telnet来测试的,所以我们不编码了
  29.        // echoStr = encode(echoStr, echoStr.size()); // 编码
  30.         // 3.4.写入,发送返回值给输出缓冲区
  31.         connection_ptr->_outbuffer=echoStr;
  32.         std::cout<<connection_ptr->_outbuffer<<std::endl;
  33.         connection_ptr->_tcp_server_ptr->Sender(connection_ptr);//调用里面的方法来发送
  34.     }
  35.     else // 客户端消息反序列化失败
  36.     {
  37.         printf("deserialize err: %s[%d] status: %d", connection_ptr->_clientip, connection_ptr->_clientport, deStatus);
  38.         return;
  39.     }
  40.    
  41. }
  42. int main()
  43. {
  44.     std::unique_ptr<TcpServer> svr(new TcpServer(8877, DefaultOmMessage));
  45.     svr->Init();
  46.     svr->Loop();
  47. }
复制代码
3.12.总结


        Reactor主要围绕变乱派发和自动反应睁开的,就好比毗连请求到来,epoll_wait提醒步伐员就绪的变乱到来,应该尽快处置惩罚,则与就绪变乱相干联的sock会对应着一个connection布局体,这个布局体我觉得就是反应堆模式的英华所在,无论是什么样就绪的变乱,每个sock都会有对应的回调方法,所以处置惩罚就绪的变乱很容易,直接回调connection内的对应方法即可,是读变乱就调用读方法,是写变乱就调用写方法,是异常变乱,则在读方法或写方法中处置惩罚IO的同时,趁便处置惩罚掉异常变乱。
        所以我感觉Reactor就像一个化学反应堆,你向这个反应堆内里扔一些毗连请求,或者网络数据,则这个反应堆会自动匹配相对应的处置惩罚机制来处置惩罚到来的变乱,很方便,同时由于ET模式和EPOLL,这就让Reactor在处置惩罚高并发毗连时,展现出了不俗的实力。









免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

前进之路

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表