Linux网络编程——基于ET模式下的Reactor

打印 上一主题 下一主题

主题 1607|帖子 1607|积分 4821

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
一、前言

上篇文章中我们已经讲解了多路转接剩下的两个接口:pollepoll,并且知道了epoll的两种工作模式分别是 LT模式ET模式,下来我们就实现的是一个简洁版的 Reactor,即半同步半异步I/O,在linux网络中,最常用最频繁的一种网络IO计划模式
    Reactor计划模式的概念:
Reactor计划模式是一种为处理惩罚并发操纵的事件处理惩罚模式。它将事件的检测和相应分离,使得事件的监听者可以注册对特定类型事件的爱好,并在这些事件发生时得到通知。这种模式非常得当于需要处理惩罚大量并发连接的应用程序,比如Web服务器。
  二、代码

1、首先对于网络来说必须有套接字的创建、绑定、监听和连接,将它们封装起来
  1. //Sock.hpp
  2. #pragma once
  3. #include <iostream>
  4. #include <string>
  5. #include <cstring>
  6. #include <unistd.h>
  7. #include <sys/types.h>
  8. #include <sys/socket.h>
  9. #include <netinet/in.h>
  10. #include <arpa/inet.h>
  11. #include "Log.hpp"
  12. #include "Err.hpp"
  13. const static int backlog = 32;
  14. const static int defaultsock = -1;
  15. class Sock
  16. {
  17. public:
  18.     Sock():listensock_(defaultsock)
  19.     {}
  20.     ~Sock()
  21.     {
  22.         if(listensock_ != defaultsock) close(listensock_);
  23.     }
  24. public:
  25.     void Socket()
  26.     {
  27.         // 1. 创建socket文件套接字对象
  28.         listensock_ = socket(AF_INET, SOCK_STREAM, 0);
  29.         if (listensock_ < 0)
  30.         {
  31.             logMessage(FATAL, "create socket error");
  32.             exit(SOCKET_ERR);
  33.         }
  34.         logMessage(NORMAL, "create socket success: %d", listensock_);
  35.         int opt = 1;
  36.         setsockopt(listensock_, SOL_SOCKET, SO_REUSEADDR|SO_REUSEPORT, &opt, sizeof(opt));
  37.     }
  38.     void Bind(int port)
  39.     {
  40.         // 2. bind绑定自己的网络信息
  41.         struct sockaddr_in local;
  42.         memset(&local, 0, sizeof(local));
  43.         local.sin_family = AF_INET;
  44.         local.sin_port = htons(port);
  45.         local.sin_addr.s_addr = INADDR_ANY;
  46.         if (bind(listensock_, (struct sockaddr *)&local, sizeof(local)) < 0)
  47.         {
  48.             logMessage(FATAL, "bind socket error");
  49.             exit(BIND_ERR);
  50.         }
  51.         logMessage(NORMAL, "bind socket success");
  52.     }
  53.     void Listen()
  54.     {
  55.         // 3. 设置socket 为监听状态
  56.         if (listen(listensock_, backlog) < 0)
  57.         {
  58.             logMessage(FATAL, "listen socket error");
  59.             exit(LISTEN_ERR);
  60.         }
  61.         logMessage(NORMAL, "listen socket success");
  62.     }
  63.     int Accept(std::string *clientip, uint16_t *clientport, int *err)
  64.     {
  65.         struct sockaddr_in peer;
  66.         socklen_t len = sizeof(peer);
  67.         int sock = accept(listensock_, (struct sockaddr *)&peer, &len);
  68.         *err = errno;
  69.         if (sock < 0){}
  70.             // logMessage(ERROR, "accept error, next");
  71.         else
  72.         {
  73.             // logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?
  74.             *clientip = inet_ntoa(peer.sin_addr);
  75.             *clientport = ntohs(peer.sin_port);
  76.         }
  77.         return sock;
  78.     }
  79.     int Fd()
  80.     {
  81.         return listensock_;
  82.     }
  83.     void Close()
  84.     {
  85.         if(listensock_ != defaultsock) close(listensock_);
  86.     }
  87. private:
  88.     int listensock_;
  89. };
复制代码
2、接着我们还想要将日记文件带进来
  1. //日志文件,生成和输出日志信息
  2. //Log.hpp
  3. #pragma once
  4. #include <iostream>
  5. #include <string>
  6. #include <cstdarg>
  7. #include <ctime>
  8. #include <unistd.h>
  9. #define DEBUG   0
  10. #define NORMAL  1
  11. #define WARNING 2
  12. #define ERROR   3
  13. #define FATAL   4
  14. const char * to_levelstr(int level)
  15. {
  16.     switch(level)
  17.     {
  18.         case DEBUG : return "DEBUG";
  19.         case NORMAL: return "NORMAL";
  20.         case WARNING: return "WARNING";
  21.         case ERROR: return "ERROR";
  22.         case FATAL: return "FATAL";
  23.         default : return nullptr;
  24.     }
  25. }
  26. void logMessage(int level, const char *format, ...)
  27. {
  28. #define NUM 1024
  29.     char logprefix[NUM];//用于存储日志前缀,包含日志级别,时间戳和进程PID
  30.     snprintf(logprefix, sizeof(logprefix), "[%s][%ld][pid: %d]",//格式化输出,并将结果按格式存储到指定的字符数组中,
  31.         to_levelstr(level), (long int)time(nullptr), getpid());//这一行是可变数量的参数
  32.     char logcontent[NUM];//用于存储日志内容
  33.     va_list arg;
  34.     va_start(arg, format);//初始化arg指向第一个可选参数
  35.     vsnprintf(logcontent, sizeof(logcontent), format, arg);
  36.     std::cout << logprefix << logcontent << std::endl;
  37. }
复制代码
3、还有对错误代码举行定义
  1. //Err.hpp
  2. #pragma once
  3. #include <iostream>
  4. //枚举错误
  5. enum
  6. {
  7.     USAGE_ERR = 1,//参数使用不当
  8.     SOCKET_ERR,//创建套接字失败
  9.     BIND_ERR,//绑定套接字失败
  10.     LISTEN_ERR,//设置监听失败
  11.     EPOLL_CREATE_ERR//创建epoll失败
  12. };
复制代码
4、我们要知道ET模式下,我们必须将文件描述符设置为非壅闭的,以是我们还需要一个设置文件描述符非壅闭接口
  1. #pragma once
  2. #include <iostream>
  3. #include <unistd.h>
  4. #include <fcntl.h>
  5. //给文件描述符设置非阻塞模式
  6. class Util
  7. {
  8. public:
  9.     static bool SetNonBlock(int fd)
  10.     {
  11.         int fl = fcntl(fd, F_GETFL);
  12.         if (fl < 0) return false;
  13.         fcntl(fd, F_SETFL, fl | O_NONBLOCK);
  14.         return true;
  15.     }
  16. };
复制代码
5、接着想在网络中添加的网络计算器的业务逻辑,为了方便对转发的数据大概报文举行完整读取,我们还需要订定传输的协议
  1. //制定数据的传输格式,即协议
  2. //Protocol.hpp
  3. #pragma once
  4. #include <iostream>
  5. #include <cstring>
  6. #include <string>
  7. #include <sys/types.h>
  8. #include <sys/socket.h>
  9. #include <jsoncpp/json/json.h>
  10. #define SEP " "
  11. #define SEP_LEN strlen(SEP) // 不敢使用sizeof()
  12. #define LINE_SEP "\r\n"
  13. #define LINE_SEP_LEN strlen(LINE_SEP) // 不敢使用sizeof()
  14. enum
  15. {
  16.     OK = 0,
  17.     DIV_ZERO,
  18.     MOD_ZERO,
  19.     OP_ERROR
  20. };
  21. // "x op y" -> "content_len"\r\n"x op y"\r\n
  22. // "exitcode result" -> "content_len"\r\n"exitcode result"\r\n
  23. std::string enLength(const std::string &text)//encode编码,将文本內容加工成上面的形式
  24. {
  25.     std::string send_string = std::to_string(text.size());
  26.     send_string += LINE_SEP;
  27.     send_string += text;
  28.     send_string += LINE_SEP;
  29.     return send_string;
  30. }
  31. // "content_len"\r\n"exitcode result"\r\n
  32. bool deLength(const std::string &package, std::string *text)//decode解码
  33. {
  34.     auto pos = package.find(LINE_SEP);
  35.     if (pos == std::string::npos)//没有找到\r\n
  36.         return false;
  37.     std::string text_len_string = package.substr(0, pos);//获取文本长度
  38.     int text_len = std::stoi(text_len_string);//从字符串转成整型
  39.     *text = package.substr(pos + LINE_SEP_LEN, text_len);
  40.     return true;
  41. }
  42. // 没有人规定我们网络通信的时候,只能有一种协议!!
  43. // 我们怎么让系统知道我们用的是哪一种协议呢??
  44. // "content_len"\r\n"协议编号"\r\n"x op y"\r\n
  45. class Request//请求
  46. {
  47. public:
  48.     Request() : x(0), y(0), op(0)//默认构造函数
  49.     {
  50.     }
  51.     Request(int x_, int y_, char op_) : x(x_), y(y_), op(op_)//构造函数
  52.     {
  53.     }
  54.     // 1. 自己写
  55.     // 2. 用现成的
  56.     bool serialize(std::string *out)//序列化
  57.     {
  58. #ifdef MYSELF//自己的序列化
  59.         *out = "";
  60.         // 结构化 -> "x op y";
  61.         std::string x_string = std::to_string(x);
  62.         std::string y_string = std::to_string(y);
  63.         *out = x_string;
  64.         *out += SEP;
  65.         *out += op;
  66.         *out += SEP;
  67.         *out += y_string;
  68. #else
  69.         Json::Value root;
  70.         root["first"] = x;
  71.         root["second"] = y;
  72.         root["oper"] = op;
  73.         Json::FastWriter writer;
  74.         // Json::StyledWriter writer;
  75.         *out = writer.write(root);
  76. #endif
  77.         return true;
  78.     }
  79.     // "x op yyyy";
  80.     bool deserialize(const std::string &in)//反序列化
  81.     {
  82. #ifdef MYSELF
  83.         // "x op y" -> 结构化
  84.         auto left = in.find(SEP);
  85.         auto right = in.rfind(SEP);
  86.         if (left == std::string::npos || right == std::string::npos)
  87.             return false;
  88.         if (left == right)
  89.             return false;
  90.         if (right - (left + SEP_LEN) != 1)
  91.             return false;
  92.         std::string x_string = in.substr(0, left); // [0, 2) [start, end) , start, end - start
  93.         std::string y_string = in.substr(right + SEP_LEN);
  94.         if (x_string.empty())
  95.             return false;
  96.         if (y_string.empty())
  97.             return false;
  98.         x = std::stoi(x_string);
  99.         y = std::stoi(y_string);
  100.         op = in[left + SEP_LEN];
  101. #else
  102.         Json::Value root;
  103.         Json::Reader reader;
  104.         reader.parse(in, root);
  105.         x = root["first"].asInt();
  106.         y = root["second"].asInt();
  107.         op = root["oper"].asInt();
  108. #endif
  109.         return true;
  110.     }
  111. public:
  112.     // "x op y"
  113.     int x;
  114.     int y;
  115.     char op;
  116. };
  117. class Response
  118. {
  119. public:
  120.     Response() : exitcode(0), result(0)
  121.     {
  122.     }
  123.     Response(int exitcode_, int result_) : exitcode(exitcode_), result(result_)
  124.     {
  125.     }
  126.     bool serialize(std::string *out)
  127.     {
  128. #ifdef MYSELF
  129.         *out = "";
  130.         std::string ec_string = std::to_string(exitcode);
  131.         std::string res_string = std::to_string(result);
  132.         *out = ec_string;
  133.         *out += SEP;
  134.         *out += res_string;
  135. #else
  136.         Json::Value root;
  137.         root["exitcode"] = exitcode;
  138.         root["result"] = result;
  139.         Json::FastWriter writer;
  140.         *out = writer.write(root);
  141. #endif
  142.         return true;
  143.     }
  144.     bool deserialize(const std::string &in)
  145.     {
  146. #ifdef MYSELF
  147.         // "exitcode result"
  148.         auto mid = in.find(SEP);
  149.         if (mid == std::string::npos)
  150.             return false;
  151.         std::string ec_string = in.substr(0, mid);
  152.         std::string res_string = in.substr(mid + SEP_LEN);
  153.         if (ec_string.empty() || res_string.empty())
  154.             return false;
  155.         exitcode = std::stoi(ec_string);
  156.         result = std::stoi(res_string);
  157. #else
  158.         Json::Value root;
  159.         Json::Reader reader;
  160.         reader.parse(in, root);
  161.         exitcode = root["exitcode"].asInt();
  162.         result = root["result"].asInt();
  163. #endif
  164.         return true;
  165.     }
  166. public:
  167.     int exitcode; // 0:计算成功,!0表示计算失败,具体是多少,定好标准
  168.     int result;   // 计算结果
  169. };
  170. // "content_len"\r\n"x op y"\r\n     "content_len"\r\n"x op y"\r\n"content_len"\r\n"x op
  171. //从输入缓冲区 inbuffer 中解析出一个完整的报文,并将其存储在 text 指针指向的位置。
  172. bool ParseOnePackage(std::string &inbuffer, std::string *text)
  173. {
  174.     *text = "";//清空 text 指向的字符串,确保不会残留旧数据。
  175.     // 分析处理
  176.     auto pos = inbuffer.find(LINE_SEP);//查找 inbuffer 中的第一个 \r\n
  177.     if (pos == std::string::npos)//如果找不到,则返回 false 表示当前缓冲区中没有完整的报文头部信息。
  178.         return false;
  179.     std::string text_len_string = inbuffer.substr(0, pos);//提取从开始到第一个 \r\n 之前的部分作为文本长度字符串。
  180.     int text_len = std::stoi(text_len_string);//将该字符串转换为整数 text_len,表示实际消息内容的长度。
  181.     int total_len = text_len_string.size() + 2 * LINE_SEP_LEN + text_len;//整个报文的总长度
  182.     if (inbuffer.size() < total_len)
  183.         return false;
  184.     // 至少有一个完整的报文
  185.     *text = inbuffer.substr(0, total_len);//读取
  186.     inbuffer.erase(0, total_len);//从 inbuffer 中移除已处理的报文部分,以便后续继续处理剩余的数据。
  187.     return true;
  188. }
复制代码
6、接着封装epoll,包罗它的添加事件、等候事件就绪和其他操纵
  1. //Epoller.hpp
  2. #pragma once
  3. #include <iostream>
  4. #include <string>
  5. #include <cstring>
  6. #include <sys/epoll.h>
  7. #include "Err.hpp"
  8. #include "Log.hpp"
  9. const static int defaultepfd = -1;
  10. const static int size = 128;
  11. class Epoller
  12. {
  13. public:
  14.     Epoller():epfd_(defaultepfd)
  15.     {}
  16.     ~Epoller()
  17.     {
  18.         if(epfd_ != defaultepfd) close(epfd_);
  19.     }
  20. public:
  21.     void Create()
  22.     {
  23.         epfd_ = epoll_create(size);
  24.         if(epfd_ < 0)
  25.         {
  26.             logMessage(FATAL, "epoll_create error, code: %d, errstring: %s", errno, strerror(errno));
  27.             exit(EPOLL_CREATE_ERR);
  28.         }
  29.     }
  30.     // user -> kernel
  31.     bool AddEvent(int sock, uint32_t events)
  32.     {
  33.         struct epoll_event ev;
  34.         ev.events = events;
  35.         ev.data.fd = sock;
  36.         int n = epoll_ctl(epfd_, EPOLL_CTL_ADD, sock, &ev);
  37.         return n == 0;
  38.     }
  39.     // kernel -> user
  40.     int Wait(struct epoll_event revs[], int num, int timeout)
  41.     {
  42.         int n = epoll_wait(epfd_, revs, num, timeout);
  43.         return n;
  44.     }
  45.    
  46.     bool Control(int sock, uint32_t event, int action)
  47.     {
  48.         int n = 0;
  49.         if(action == EPOLL_CTL_MOD)
  50.         {
  51.             struct epoll_event ev;
  52.             ev.events = event;
  53.             ev.data.fd = sock;
  54.             n = epoll_ctl(epfd_, action, sock, &ev);
  55.         }
  56.         else if(action == EPOLL_CTL_DEL)
  57.         {
  58.             n = epoll_ctl(epfd_, action, sock, nullptr);
  59.         }
  60.         else n = -1;
  61.         return n == 0;
  62.     }
  63.     void Close()
  64.     {
  65.         if(epfd_ != defaultepfd) close(epfd_);
  66.     }
  67.    
  68. private:
  69.     int epfd_;
  70. };
复制代码
7、接着就是重头戏了,实现封装服务器
  1. #pragma once
  2. #include <iostream>
  3. #include <cassert>
  4. #include <functional>
  5. #include <unordered_map>
  6. #include "Log.hpp"
  7. #include "Sock.hpp"
  8. #include "Err.hpp"
  9. #include "Epoller.hpp"
  10. #include "Util.hpp"
  11. #include "Protocol.hpp"
  12. namespace tcpserver
  13. {
  14.     class Connection;//声明
  15.     class TcpServer;
  16.     static const uint16_t defaultport = 8080;
  17.     static const int num = 64;
  18.     using func_t = std::function<void(Connection *)>;
  19.     // using hander_t = std::function<void(const std::string &package)>;
  20.     class Connection//封装套接字,使得他们都有着自己的缓冲区空间
  21.     {
  22.     public:
  23.         Connection(int sock, TcpServer *tsp) : sock_(sock), tsp_(tsp)
  24.         {
  25.         }
  26.         void Register(func_t r, func_t s, func_t e)//注册方法,创建该结构体的时候,注册它的三个方法,即数据就绪之后怎么处理
  27.         {
  28.             recver_ = r;
  29.             sender_ = s;
  30.             excepter_ = e;
  31.         }
  32.         ~Connection()
  33.         {
  34.         }
  35.         void Close()
  36.         {
  37.             close(sock_);
  38.         }
  39.     public:
  40.         int sock_;
  41.         std::string inbuffer_;  // 输入缓冲区,但是这里只能处理字符串类信息,图片视频类就难解决了
  42.         std::string outbuffer_; // 输出缓冲区,发送也是一样,因为不能保证自己对一个文本发送到哪里了
  43.         func_t recver_;   // 从sock_读
  44.         func_t sender_;   // 向sock_写
  45.         func_t excepter_; // 处理sock_ IO的时候上面的异常事件
  46.         TcpServer *tsp_; // 回执指针,可以指向TcpServer,可以被省略
  47.         uint64_t lasttime;
  48.     };
  49.     class TcpServer // Reactor反应堆模式
  50.     {
  51.     private:
  52.         void Recver(Connection *conn)//对于收到的消息如果不做处理,后面再收到时就会看到一直追加在原数据后面
  53.         {
  54.             conn->lasttime = time(nullptr);
  55.             char buffer[1024];
  56.             while (true)//循环将文件描述符的东西全部读上来
  57.             {
  58.                 ssize_t s = recv(conn->sock_, buffer, sizeof(buffer) - 1, 0);
  59.                 if (s > 0)
  60.                 {
  61.                     buffer[s] = 0;
  62.                     conn->inbuffer_ += buffer; // 将读到的数据入队列
  63.                     logMessage(DEBUG, "\n%s", conn->inbuffer_);
  64.                     service_(conn);
  65.                 }
  66.                 else if (s == 0)
  67.                 {
  68.                     if (conn->excepter_)
  69.                     {
  70.                         conn->excepter_(conn);//如果此时异常了,且异常被设置了,此时只需要回调它的异常就可以了
  71.                         return;
  72.                     }
  73.                 }
  74.                 else
  75.                 {
  76.                     if (errno == EAGAIN || errno == EWOULDBLOCK)//底层没数据了
  77.                         break;
  78.                     else if (errno == EINTR)//信号被中断了
  79.                         continue;
  80.                     else//出错了
  81.                     {
  82.                         if (conn->excepter_)
  83.                         {
  84.                             conn->excepter_(conn);
  85.                             return;
  86.                         }
  87.                     }
  88.                 }
  89.             } // while
  90.         }
  91.         void Sender(Connection *conn)
  92.         {
  93.             conn->lasttime = time(nullptr);
  94.             while (true)
  95.             {
  96.                 ssize_t s = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);
  97.                 if (s > 0)
  98.                 {
  99.                     if (conn->outbuffer_.empty())//发完了
  100.                     {
  101.                         // EnableReadWrite(conn, true, false);
  102.                         break;
  103.                     }
  104.                     else//没有发完,发出去多少清除多少
  105.                         conn->outbuffer_.erase(0, s);
  106.                 }
  107.                 else
  108.                 {
  109.                     if (errno == EAGAIN || errno == EWOULDBLOCK)//发送缓冲区满了
  110.                         break;
  111.                     else if (errno == EINTR)//被信号中断
  112.                         continue;
  113.                     else//发送出错
  114.                     {
  115.                         if (conn->excepter_)
  116.                         {
  117.                             conn->excepter_(conn);
  118.                             return;
  119.                         }
  120.                     }
  121.                 }
  122.             }
  123.             // 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!
  124.             if (!conn->outbuffer_.empty())
  125.                 conn->tsp_->EnableReadWrite(conn, true, true);
  126.             else
  127.                 conn->tsp_->EnableReadWrite(conn, true, false);
  128.         }
  129.         void Excepter(Connection *conn)
  130.         {
  131.             logMessage(DEBUG, "Excepter begin");
  132.             epoller_.Control(conn->sock_, 0, EPOLL_CTL_DEL);
  133.             conn->Close();
  134.             connections_.erase(conn->sock_);
  135.             logMessage(DEBUG, "关闭%d 文件描述符的所有的资源", conn->sock_);
  136.             delete conn;
  137.         }
  138.         void Accepter(Connection *conn)
  139.         {
  140.             for (;;)//因为监听套接字已经被设置为ET了,所以无论来多少连接也只是通知一次,这就倒逼程序员一次拿完所有的连接,所以我们需要循环读取,直到失败
  141.             {
  142.                 std::string clientip;
  143.                 uint16_t clientport;
  144.                 int err = 0;
  145.                 int sock = sock_.Accept(&clientip, &clientport, &err);
  146.                 if (sock > 0)
  147.                 {
  148.                     AddConnection(
  149.                         sock, EPOLLIN | EPOLLET,
  150.                         std::bind(&TcpServer::Recver, this, std::placeholders::_1),
  151.                         std::bind(&TcpServer::Sender, this, std::placeholders::_1),
  152.                         std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
  153.                     logMessage(DEBUG, "get a new link, info: [%s:%d]", clientip.c_str(), clientport);
  154.                 }
  155.                 else//当底层没有连接时,使用accept拿的时候,错误码会被设置
  156.                 {
  157.                     if (err == EAGAIN || err == EWOULDBLOCK)//表示底层没有连接了
  158.                         break;
  159.                     else if (err == EINTR)//表示正在读的时候被信号中断了
  160.                         continue;
  161.                     else//这才是真正出错了
  162.                         break;
  163.                 }
  164.             }
  165.         }
  166.         
  167.         void AddConnection(int sock, uint32_t events, func_t recver, func_t sender, func_t excepter)//管理Connection对象
  168.         {
  169.             // 1. 首先要为该sock创建Connection,并初始化,并添加到connections_
  170.             if (events & EPOLLET)//如果事件设置了ET模式,那么就将该套接字设置为非阻塞
  171.                 Util::SetNonBlock(sock);
  172.             Connection *conn = new Connection(sock, this); //构建对象
  173.             // 2. 给对应的sock设置对应回调处理方法
  174.             conn->Register(recver, sender, excepter);
  175.             // 2. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心
  176.             bool r = epoller_.AddEvent(sock, events);//一般这里是不会出什么问题的
  177.             assert(r);//断言一下
  178.             (void)r;
  179.             // 3. 将kv添加到connections_
  180.             connections_.insert(std::pair<int, Connection *>(sock, conn));
  181.             logMessage(DEBUG, "add new sock : %d in epoll and unordered_map", sock);
  182.         }
  183.         bool IsConnectionExists(int sock)//判断文件描述符是否在我们对应的connection中
  184.         {
  185.             auto iter = connections_.find(sock);
  186.             return iter != connections_.end();
  187.         }
  188.         void Loop(int timeout)
  189.         {
  190.             int n = epoller_.Wait(revs_, num_, timeout); // 获取已经就绪的事件
  191.             for (int i = 0; i < n; i++)
  192.             {
  193.                 int sock = revs_[i].data.fd;
  194.                 uint32_t events = revs_[i].events;
  195.                 // 将所有的异常问题,全部转化 成为读写问题
  196.                 if (events & EPOLLERR)
  197.                     events |= (EPOLLIN | EPOLLOUT);//如果出现异常,就设置它的读事件和写事件就绪,读写本来就要做异常处理
  198.                 if (events & EPOLLHUP)
  199.                     events |= (EPOLLIN | EPOLLOUT);
  200.                 // listen事件就绪
  201.                 if ((events & EPOLLIN) && IsConnectionExists(sock) && connections_[sock]->recver_)//读事件就绪并且connection对应的套接字存在,并且recver对象也存在
  202.                     connections_[sock]->recver_(connections_[sock]);
  203.                 if ((events & EPOLLOUT) && IsConnectionExists(sock) && connections_[sock]->sender_)
  204.                     connections_[sock]->sender_(connections_[sock]);
  205.             }
  206.         }
  207.     public:
  208.         TcpServer(func_t func, uint16_t port = defaultport) : service_(func), port_(port), revs_(nullptr)
  209.         {
  210.         }
  211.         void InitServer()
  212.         {
  213.             // 1. 创建socket
  214.             sock_.Socket();
  215.             sock_.Bind(port_);
  216.             sock_.Listen();
  217.             // 2. 构建Epoll
  218.             epoller_.Create();
  219.             // 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞
  220.       
  221.             // listensock_也是一个socket啊,也要看做成为一个Connection
  222.             AddConnection(sock_.Fd(), EPOLLIN | EPOLLET,
  223.                           std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
  224.             revs_ = new struct epoll_event[num];//初始化一段缓冲区用来保存已经就绪的事件
  225.             num_ = num;
  226.         }
  227.         void EnableReadWrite(Connection *conn, bool readable, bool writeable)
  228.         {
  229.             uint32_t event = (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET;
  230.             epoller_.Control(conn->sock_, event, EPOLL_CTL_MOD);
  231.         }
  232.         // 事件派发器
  233.         void Dispatcher()
  234.         {
  235.             int timeout = 1000;
  236.             while (true)
  237.             {
  238.                 Loop(timeout);
  239.                 // logMessage(DEBUG, "time out ...");
  240.                 // 遍历connections_,计算每一个链接的已经有多长时间没有动了
  241.                
  242.             }
  243.         }
  244.         ~TcpServer()
  245.         {
  246.             sock_.Close();
  247.             epoller_.Close();
  248.             if (nullptr == revs_)
  249.                 delete[] revs_;
  250.         }
  251.     private:
  252.         uint16_t port_;
  253.         Sock sock_;
  254.         Epoller epoller_;
  255.         std::unordered_map<int, Connection *> connections_;
  256.         struct epoll_event *revs_;//存储多个就绪时间的空间,指针
  257.         int num_;//表示用来保存就绪事件的缓冲区一共有多少个元素
  258.         // hander_t handler_;
  259.         func_t service_;
  260.     };
  261. }
复制代码
  这里有着很多细节,需要强调点如下:
  就读事件来说,在我们之前实现无论是select、poll还是epoll的时候,对底层数据到位之后,读取时候的处理惩罚都是特殊不完善的。先看我们之前怎么处理惩罚的,如下:
  1. else if(revs[i].events&EPOLLIN)//读事件就绪
  2.             {
  3.                 char buffer[64];
  4.                 ssize_t s=recv(fd,buffer,sizeof(buffer)-1,0);
  5.                 if(s>0)//读取成功
  6.                 {
  7.                     buffer[s]='\0';
  8.                     std::cout<<"echo#"<<buffer<<std::endl;
  9.                 }
  10.                 else if(s==0)//对端连接关闭
  11.                 {
  12.                     std::cout<<"Client quit"<<std::endl;
  13.                     close(fd);
  14.                     DelEvent(fd);//将文件描述符从模型中删除
  15.                 }
  16.                 else{//读取错误
  17.                     std::cerr<<"recv error!"<<std::endl;
  18.                     close(fd);
  19.                     DelEvent(fd);
  20.                 }
  21.             }
复制代码
         我们要知道在epoll模式下,如果底层数据已经就绪,我们假设把本轮的数据全部读完,这时候也不一定可以或许读到一个完整的请求,本轮读完是ET的要求,能不能完整读完是跟协议要求干系的。纵然循环读,读完也不行。如果我们并不能读到一个完整的请求,那么我们就只能 将自己读到的数据临时放在buffer中,但是临时生存在buffer中的话,这个文件描述符生存了,其他的文件描述符数据也就绪了怎么办,而且在当前读完之后,整个的这个代码区间全部都被开释掉了(因为它是栈上的空间),以是再想下一次读的时候buffer早都被开释完了。
          以是光在栈上的缓冲区远远不够,对于每一个文件描述符,我们都得给他们的输入输出的用户层缓冲区全都带上。以是要对每一个套接字举行封装,每一个套接字都要有着自己的缓冲区空间,读取的时候,将数据暂存到自己的缓冲区中,没读完下次再读,这也才不会和其他的数据胶葛。在发送的时候也是同理。
          我们在这次的Reactor代码中对每一个文件描述符都封装了Connection,系统中出现大量的Connection对象,需要被管理起来,先描述再组织,描述已经到位就是Connection结构体,接下来用unordered_map<int ,Connection*>管理起来,,纵然用kv将文件描述符和我们所创建的Connection结构体结合起来  ,  以是将来我们将文件描述符添加到epoll模型中的时候,同时将文件描述符的Connect对象也添加进unordered_map中,这样将来就可以通过unordered_map检察该文件描述符所对应的各种事件和输入输出缓冲区了。
  8、Main函数
  1. //Main.cc
  2. #include "TcpServer.hpp"
  3. #include <memory>
  4. using namespace tcpserver;
  5. static void usage(std::string proc)
  6. {
  7.     std::cerr << "Usage:\n\t" << proc << " port"
  8.               << "\n\n";
  9. }
  10. bool cal(const Request &req, Response &resp)
  11. {
  12.     // req已经有结构化完成的数据啦,你可以直接使用
  13.     resp.exitcode = OK;
  14.     resp.result = OK;
  15.     switch (req.op)
  16.     {
  17.     case '+':
  18.         resp.result = req.x + req.y;
  19.         break;
  20.     case '-':
  21.         resp.result = req.x - req.y;
  22.         break;
  23.     case '*':
  24.         resp.result = req.x * req.y;
  25.         break;
  26.     case '/':
  27.     {
  28.         if (req.y == 0)
  29.             resp.exitcode = DIV_ZERO;
  30.         else
  31.             resp.result = req.x / req.y;
  32.     }
  33.     break;
  34.     case '%':
  35.     {
  36.         if (req.y == 0)
  37.             resp.exitcode = MOD_ZERO;
  38.         else
  39.             resp.result = req.x % req.y;
  40.     }
  41.     break;
  42.     default:
  43.         resp.exitcode = OP_ERROR;
  44.         break;
  45.     }
  46.     return true;
  47. }
  48. void calculate(Connection *conn)
  49. {
  50.     std::string onePackage;
  51.     while (ParseOnePackage(conn->inbuffer_, &onePackage))
  52.     {
  53.         std::string reqStr;
  54.         if (!deLength(onePackage, &reqStr))
  55.             return;
  56.         std::cout << "去掉报头的正文:\n"
  57.                   << reqStr << std::endl;
  58.         // 2. 对请求Request,反序列化
  59.         // 2.1 得到一个结构化的请求对象
  60.         Request req;
  61.         if (!req.deserialize(reqStr))
  62.             return;
  63.         Response resp;
  64.         cal(req, resp);
  65.         std::string respStr;
  66.         resp.serialize(&respStr);
  67.         // 5. 然后我们在发送响应
  68.         // 5.1 构建成为一个完整的报文
  69.         conn->outbuffer_ += enLength(respStr);//添加到自己的发送缓冲区中
  70.         std::cout << "--------------result: " << conn->outbuffer_ << std::endl;
  71.     }
  72.     // 直接发
  73.     if (conn->sender_)
  74.         conn->sender_(conn);
  75.     // // 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!
  76.     // if (!conn->outbuffer_.empty())
  77.     //     conn->tsp_->EnableReadWrite(conn, true, true);
  78.     // else
  79.     //     conn->tsp_->EnableReadWrite(conn, true, false);
  80. }
  81. int main(int argc, char *argv[])
  82. {
  83.     if (argc != 2)
  84.     {
  85.         usage(argv[0]);
  86.         exit(USAGE_ERR);
  87.     }
  88.     uint16_t port = atoi(argv[1]);
  89.     std::unique_ptr<TcpServer> tsvr(new TcpServer(calculate, port));
  90.     tsvr->InitServer();
  91.     tsvr->Dispatcher();
  92.     return 0;
  93. }
复制代码
这就是全部代码了。

感谢阅读! 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

羊蹓狼

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表