马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
一、前言
上篇文章中我们已经讲解了多路转接剩下的两个接口:poll和epoll,并且知道了epoll的两种工作模式分别是 LT模式和ET模式,下来我们就实现的是一个简洁版的 Reactor,即半同步半异步I/O,在linux网络中,最常用最频繁的一种网络IO计划模式
Reactor计划模式的概念:
Reactor计划模式是一种为处理惩罚并发操纵的事件处理惩罚模式。它将事件的检测和相应分离,使得事件的监听者可以注册对特定类型事件的爱好,并在这些事件发生时得到通知。这种模式非常得当于需要处理惩罚大量并发连接的应用程序,比如Web服务器。
二、代码
1、首先对于网络来说必须有套接字的创建、绑定、监听和连接,将它们封装起来
- //Sock.hpp
- #pragma once
- #include <iostream>
- #include <string>
- #include <cstring>
- #include <unistd.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include "Log.hpp"
- #include "Err.hpp"
- const static int backlog = 32;
- const static int defaultsock = -1;
- class Sock
- {
- public:
- Sock():listensock_(defaultsock)
- {}
- ~Sock()
- {
- if(listensock_ != defaultsock) close(listensock_);
- }
- public:
- void Socket()
- {
- // 1. 创建socket文件套接字对象
- listensock_ = socket(AF_INET, SOCK_STREAM, 0);
- if (listensock_ < 0)
- {
- logMessage(FATAL, "create socket error");
- exit(SOCKET_ERR);
- }
- logMessage(NORMAL, "create socket success: %d", listensock_);
- int opt = 1;
- setsockopt(listensock_, SOL_SOCKET, SO_REUSEADDR|SO_REUSEPORT, &opt, sizeof(opt));
- }
- void Bind(int port)
- {
- // 2. bind绑定自己的网络信息
- struct sockaddr_in local;
- memset(&local, 0, sizeof(local));
- local.sin_family = AF_INET;
- local.sin_port = htons(port);
- local.sin_addr.s_addr = INADDR_ANY;
- if (bind(listensock_, (struct sockaddr *)&local, sizeof(local)) < 0)
- {
- logMessage(FATAL, "bind socket error");
- exit(BIND_ERR);
- }
- logMessage(NORMAL, "bind socket success");
- }
- void Listen()
- {
- // 3. 设置socket 为监听状态
- if (listen(listensock_, backlog) < 0)
- {
- logMessage(FATAL, "listen socket error");
- exit(LISTEN_ERR);
- }
- logMessage(NORMAL, "listen socket success");
- }
- int Accept(std::string *clientip, uint16_t *clientport, int *err)
- {
- struct sockaddr_in peer;
- socklen_t len = sizeof(peer);
- int sock = accept(listensock_, (struct sockaddr *)&peer, &len);
- *err = errno;
- if (sock < 0){}
- // logMessage(ERROR, "accept error, next");
- else
- {
- // logMessage(NORMAL, "accept a new link success, get new sock: %d", sock); // ?
- *clientip = inet_ntoa(peer.sin_addr);
- *clientport = ntohs(peer.sin_port);
- }
- return sock;
- }
- int Fd()
- {
- return listensock_;
- }
- void Close()
- {
- if(listensock_ != defaultsock) close(listensock_);
- }
- private:
- int listensock_;
- };
复制代码 2、接着我们还想要将日记文件带进来
- //日志文件,生成和输出日志信息
- //Log.hpp
- #pragma once
- #include <iostream>
- #include <string>
- #include <cstdarg>
- #include <ctime>
- #include <unistd.h>
- #define DEBUG 0
- #define NORMAL 1
- #define WARNING 2
- #define ERROR 3
- #define FATAL 4
- const char * to_levelstr(int level)
- {
- switch(level)
- {
- case DEBUG : return "DEBUG";
- case NORMAL: return "NORMAL";
- case WARNING: return "WARNING";
- case ERROR: return "ERROR";
- case FATAL: return "FATAL";
- default : return nullptr;
- }
- }
- void logMessage(int level, const char *format, ...)
- {
- #define NUM 1024
- char logprefix[NUM];//用于存储日志前缀,包含日志级别,时间戳和进程PID
- snprintf(logprefix, sizeof(logprefix), "[%s][%ld][pid: %d]",//格式化输出,并将结果按格式存储到指定的字符数组中,
- to_levelstr(level), (long int)time(nullptr), getpid());//这一行是可变数量的参数
- char logcontent[NUM];//用于存储日志内容
- va_list arg;
- va_start(arg, format);//初始化arg指向第一个可选参数
- vsnprintf(logcontent, sizeof(logcontent), format, arg);
- std::cout << logprefix << logcontent << std::endl;
- }
复制代码 3、还有对错误代码举行定义
- //Err.hpp
- #pragma once
- #include <iostream>
- //枚举错误
- enum
- {
- USAGE_ERR = 1,//参数使用不当
- SOCKET_ERR,//创建套接字失败
- BIND_ERR,//绑定套接字失败
- LISTEN_ERR,//设置监听失败
- EPOLL_CREATE_ERR//创建epoll失败
- };
复制代码 4、我们要知道ET模式下,我们必须将文件描述符设置为非壅闭的,以是我们还需要一个设置文件描述符非壅闭接口
- #pragma once
- #include <iostream>
- #include <unistd.h>
- #include <fcntl.h>
- //给文件描述符设置非阻塞模式
- class Util
- {
- public:
- static bool SetNonBlock(int fd)
- {
- int fl = fcntl(fd, F_GETFL);
- if (fl < 0) return false;
- fcntl(fd, F_SETFL, fl | O_NONBLOCK);
- return true;
- }
- };
复制代码 5、接着想在网络中添加的网络计算器的业务逻辑,为了方便对转发的数据大概报文举行完整读取,我们还需要订定传输的协议
- //制定数据的传输格式,即协议
- //Protocol.hpp
- #pragma once
- #include <iostream>
- #include <cstring>
- #include <string>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <jsoncpp/json/json.h>
- #define SEP " "
- #define SEP_LEN strlen(SEP) // 不敢使用sizeof()
- #define LINE_SEP "\r\n"
- #define LINE_SEP_LEN strlen(LINE_SEP) // 不敢使用sizeof()
- enum
- {
- OK = 0,
- DIV_ZERO,
- MOD_ZERO,
- OP_ERROR
- };
- // "x op y" -> "content_len"\r\n"x op y"\r\n
- // "exitcode result" -> "content_len"\r\n"exitcode result"\r\n
- std::string enLength(const std::string &text)//encode编码,将文本內容加工成上面的形式
- {
- std::string send_string = std::to_string(text.size());
- send_string += LINE_SEP;
- send_string += text;
- send_string += LINE_SEP;
- return send_string;
- }
- // "content_len"\r\n"exitcode result"\r\n
- bool deLength(const std::string &package, std::string *text)//decode解码
- {
- auto pos = package.find(LINE_SEP);
- if (pos == std::string::npos)//没有找到\r\n
- return false;
- std::string text_len_string = package.substr(0, pos);//获取文本长度
- int text_len = std::stoi(text_len_string);//从字符串转成整型
- *text = package.substr(pos + LINE_SEP_LEN, text_len);
- return true;
- }
- // 没有人规定我们网络通信的时候,只能有一种协议!!
- // 我们怎么让系统知道我们用的是哪一种协议呢??
- // "content_len"\r\n"协议编号"\r\n"x op y"\r\n
- class Request//请求
- {
- public:
- Request() : x(0), y(0), op(0)//默认构造函数
- {
- }
- Request(int x_, int y_, char op_) : x(x_), y(y_), op(op_)//构造函数
- {
- }
- // 1. 自己写
- // 2. 用现成的
- bool serialize(std::string *out)//序列化
- {
- #ifdef MYSELF//自己的序列化
- *out = "";
- // 结构化 -> "x op y";
- std::string x_string = std::to_string(x);
- std::string y_string = std::to_string(y);
- *out = x_string;
- *out += SEP;
- *out += op;
- *out += SEP;
- *out += y_string;
- #else
- Json::Value root;
- root["first"] = x;
- root["second"] = y;
- root["oper"] = op;
- Json::FastWriter writer;
- // Json::StyledWriter writer;
- *out = writer.write(root);
- #endif
- return true;
- }
- // "x op yyyy";
- bool deserialize(const std::string &in)//反序列化
- {
- #ifdef MYSELF
- // "x op y" -> 结构化
- auto left = in.find(SEP);
- auto right = in.rfind(SEP);
- if (left == std::string::npos || right == std::string::npos)
- return false;
- if (left == right)
- return false;
- if (right - (left + SEP_LEN) != 1)
- return false;
- std::string x_string = in.substr(0, left); // [0, 2) [start, end) , start, end - start
- std::string y_string = in.substr(right + SEP_LEN);
- if (x_string.empty())
- return false;
- if (y_string.empty())
- return false;
- x = std::stoi(x_string);
- y = std::stoi(y_string);
- op = in[left + SEP_LEN];
- #else
- Json::Value root;
- Json::Reader reader;
- reader.parse(in, root);
- x = root["first"].asInt();
- y = root["second"].asInt();
- op = root["oper"].asInt();
- #endif
- return true;
- }
- public:
- // "x op y"
- int x;
- int y;
- char op;
- };
- class Response
- {
- public:
- Response() : exitcode(0), result(0)
- {
- }
- Response(int exitcode_, int result_) : exitcode(exitcode_), result(result_)
- {
- }
- bool serialize(std::string *out)
- {
- #ifdef MYSELF
- *out = "";
- std::string ec_string = std::to_string(exitcode);
- std::string res_string = std::to_string(result);
- *out = ec_string;
- *out += SEP;
- *out += res_string;
- #else
- Json::Value root;
- root["exitcode"] = exitcode;
- root["result"] = result;
- Json::FastWriter writer;
- *out = writer.write(root);
- #endif
- return true;
- }
- bool deserialize(const std::string &in)
- {
- #ifdef MYSELF
- // "exitcode result"
- auto mid = in.find(SEP);
- if (mid == std::string::npos)
- return false;
- std::string ec_string = in.substr(0, mid);
- std::string res_string = in.substr(mid + SEP_LEN);
- if (ec_string.empty() || res_string.empty())
- return false;
- exitcode = std::stoi(ec_string);
- result = std::stoi(res_string);
- #else
- Json::Value root;
- Json::Reader reader;
- reader.parse(in, root);
- exitcode = root["exitcode"].asInt();
- result = root["result"].asInt();
- #endif
- return true;
- }
- public:
- int exitcode; // 0:计算成功,!0表示计算失败,具体是多少,定好标准
- int result; // 计算结果
- };
- // "content_len"\r\n"x op y"\r\n "content_len"\r\n"x op y"\r\n"content_len"\r\n"x op
- //从输入缓冲区 inbuffer 中解析出一个完整的报文,并将其存储在 text 指针指向的位置。
- bool ParseOnePackage(std::string &inbuffer, std::string *text)
- {
- *text = "";//清空 text 指向的字符串,确保不会残留旧数据。
- // 分析处理
- auto pos = inbuffer.find(LINE_SEP);//查找 inbuffer 中的第一个 \r\n
- if (pos == std::string::npos)//如果找不到,则返回 false 表示当前缓冲区中没有完整的报文头部信息。
- return false;
- std::string text_len_string = inbuffer.substr(0, pos);//提取从开始到第一个 \r\n 之前的部分作为文本长度字符串。
- int text_len = std::stoi(text_len_string);//将该字符串转换为整数 text_len,表示实际消息内容的长度。
- int total_len = text_len_string.size() + 2 * LINE_SEP_LEN + text_len;//整个报文的总长度
- if (inbuffer.size() < total_len)
- return false;
- // 至少有一个完整的报文
- *text = inbuffer.substr(0, total_len);//读取
- inbuffer.erase(0, total_len);//从 inbuffer 中移除已处理的报文部分,以便后续继续处理剩余的数据。
- return true;
- }
复制代码 6、接着封装epoll,包罗它的添加事件、等候事件就绪和其他操纵
- //Epoller.hpp
- #pragma once
- #include <iostream>
- #include <string>
- #include <cstring>
- #include <sys/epoll.h>
- #include "Err.hpp"
- #include "Log.hpp"
- const static int defaultepfd = -1;
- const static int size = 128;
- class Epoller
- {
- public:
- Epoller():epfd_(defaultepfd)
- {}
- ~Epoller()
- {
- if(epfd_ != defaultepfd) close(epfd_);
- }
- public:
- void Create()
- {
- epfd_ = epoll_create(size);
- if(epfd_ < 0)
- {
- logMessage(FATAL, "epoll_create error, code: %d, errstring: %s", errno, strerror(errno));
- exit(EPOLL_CREATE_ERR);
- }
- }
- // user -> kernel
- bool AddEvent(int sock, uint32_t events)
- {
- struct epoll_event ev;
- ev.events = events;
- ev.data.fd = sock;
- int n = epoll_ctl(epfd_, EPOLL_CTL_ADD, sock, &ev);
- return n == 0;
- }
- // kernel -> user
- int Wait(struct epoll_event revs[], int num, int timeout)
- {
- int n = epoll_wait(epfd_, revs, num, timeout);
- return n;
- }
-
- bool Control(int sock, uint32_t event, int action)
- {
- int n = 0;
- if(action == EPOLL_CTL_MOD)
- {
- struct epoll_event ev;
- ev.events = event;
- ev.data.fd = sock;
- n = epoll_ctl(epfd_, action, sock, &ev);
- }
- else if(action == EPOLL_CTL_DEL)
- {
- n = epoll_ctl(epfd_, action, sock, nullptr);
- }
- else n = -1;
- return n == 0;
- }
- void Close()
- {
- if(epfd_ != defaultepfd) close(epfd_);
- }
-
- private:
- int epfd_;
- };
复制代码 7、接着就是重头戏了,实现封装服务器
- #pragma once
- #include <iostream>
- #include <cassert>
- #include <functional>
- #include <unordered_map>
- #include "Log.hpp"
- #include "Sock.hpp"
- #include "Err.hpp"
- #include "Epoller.hpp"
- #include "Util.hpp"
- #include "Protocol.hpp"
- namespace tcpserver
- {
- class Connection;//声明
- class TcpServer;
- static const uint16_t defaultport = 8080;
- static const int num = 64;
- using func_t = std::function<void(Connection *)>;
- // using hander_t = std::function<void(const std::string &package)>;
- class Connection//封装套接字,使得他们都有着自己的缓冲区空间
- {
- public:
- Connection(int sock, TcpServer *tsp) : sock_(sock), tsp_(tsp)
- {
- }
- void Register(func_t r, func_t s, func_t e)//注册方法,创建该结构体的时候,注册它的三个方法,即数据就绪之后怎么处理
- {
- recver_ = r;
- sender_ = s;
- excepter_ = e;
- }
- ~Connection()
- {
- }
- void Close()
- {
- close(sock_);
- }
- public:
- int sock_;
- std::string inbuffer_; // 输入缓冲区,但是这里只能处理字符串类信息,图片视频类就难解决了
- std::string outbuffer_; // 输出缓冲区,发送也是一样,因为不能保证自己对一个文本发送到哪里了
- func_t recver_; // 从sock_读
- func_t sender_; // 向sock_写
- func_t excepter_; // 处理sock_ IO的时候上面的异常事件
- TcpServer *tsp_; // 回执指针,可以指向TcpServer,可以被省略
- uint64_t lasttime;
- };
- class TcpServer // Reactor反应堆模式
- {
- private:
- void Recver(Connection *conn)//对于收到的消息如果不做处理,后面再收到时就会看到一直追加在原数据后面
- {
- conn->lasttime = time(nullptr);
- char buffer[1024];
- while (true)//循环将文件描述符的东西全部读上来
- {
- ssize_t s = recv(conn->sock_, buffer, sizeof(buffer) - 1, 0);
- if (s > 0)
- {
- buffer[s] = 0;
- conn->inbuffer_ += buffer; // 将读到的数据入队列
- logMessage(DEBUG, "\n%s", conn->inbuffer_);
- service_(conn);
- }
- else if (s == 0)
- {
- if (conn->excepter_)
- {
- conn->excepter_(conn);//如果此时异常了,且异常被设置了,此时只需要回调它的异常就可以了
- return;
- }
- }
- else
- {
- if (errno == EAGAIN || errno == EWOULDBLOCK)//底层没数据了
- break;
- else if (errno == EINTR)//信号被中断了
- continue;
- else//出错了
- {
- if (conn->excepter_)
- {
- conn->excepter_(conn);
- return;
- }
- }
- }
- } // while
- }
- void Sender(Connection *conn)
- {
- conn->lasttime = time(nullptr);
- while (true)
- {
- ssize_t s = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);
- if (s > 0)
- {
- if (conn->outbuffer_.empty())//发完了
- {
- // EnableReadWrite(conn, true, false);
- break;
- }
- else//没有发完,发出去多少清除多少
- conn->outbuffer_.erase(0, s);
- }
- else
- {
- if (errno == EAGAIN || errno == EWOULDBLOCK)//发送缓冲区满了
- break;
- else if (errno == EINTR)//被信号中断
- continue;
- else//发送出错
- {
- if (conn->excepter_)
- {
- conn->excepter_(conn);
- return;
- }
- }
- }
- }
- // 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!
- if (!conn->outbuffer_.empty())
- conn->tsp_->EnableReadWrite(conn, true, true);
- else
- conn->tsp_->EnableReadWrite(conn, true, false);
- }
- void Excepter(Connection *conn)
- {
- logMessage(DEBUG, "Excepter begin");
- epoller_.Control(conn->sock_, 0, EPOLL_CTL_DEL);
- conn->Close();
- connections_.erase(conn->sock_);
- logMessage(DEBUG, "关闭%d 文件描述符的所有的资源", conn->sock_);
- delete conn;
- }
- void Accepter(Connection *conn)
- {
- for (;;)//因为监听套接字已经被设置为ET了,所以无论来多少连接也只是通知一次,这就倒逼程序员一次拿完所有的连接,所以我们需要循环读取,直到失败
- {
- std::string clientip;
- uint16_t clientport;
- int err = 0;
- int sock = sock_.Accept(&clientip, &clientport, &err);
- if (sock > 0)
- {
- AddConnection(
- sock, EPOLLIN | EPOLLET,
- std::bind(&TcpServer::Recver, this, std::placeholders::_1),
- std::bind(&TcpServer::Sender, this, std::placeholders::_1),
- std::bind(&TcpServer::Excepter, this, std::placeholders::_1));
- logMessage(DEBUG, "get a new link, info: [%s:%d]", clientip.c_str(), clientport);
- }
- else//当底层没有连接时,使用accept拿的时候,错误码会被设置
- {
- if (err == EAGAIN || err == EWOULDBLOCK)//表示底层没有连接了
- break;
- else if (err == EINTR)//表示正在读的时候被信号中断了
- continue;
- else//这才是真正出错了
- break;
- }
- }
- }
-
- void AddConnection(int sock, uint32_t events, func_t recver, func_t sender, func_t excepter)//管理Connection对象
- {
- // 1. 首先要为该sock创建Connection,并初始化,并添加到connections_
- if (events & EPOLLET)//如果事件设置了ET模式,那么就将该套接字设置为非阻塞
- Util::SetNonBlock(sock);
- Connection *conn = new Connection(sock, this); //构建对象
- // 2. 给对应的sock设置对应回调处理方法
- conn->Register(recver, sender, excepter);
- // 2. 其次将sock与它要关心的事件"写透式"注册到epoll中,让epoll帮我们关心
- bool r = epoller_.AddEvent(sock, events);//一般这里是不会出什么问题的
- assert(r);//断言一下
- (void)r;
- // 3. 将kv添加到connections_
- connections_.insert(std::pair<int, Connection *>(sock, conn));
- logMessage(DEBUG, "add new sock : %d in epoll and unordered_map", sock);
- }
- bool IsConnectionExists(int sock)//判断文件描述符是否在我们对应的connection中
- {
- auto iter = connections_.find(sock);
- return iter != connections_.end();
- }
- void Loop(int timeout)
- {
- int n = epoller_.Wait(revs_, num_, timeout); // 获取已经就绪的事件
- for (int i = 0; i < n; i++)
- {
- int sock = revs_[i].data.fd;
- uint32_t events = revs_[i].events;
- // 将所有的异常问题,全部转化 成为读写问题
- if (events & EPOLLERR)
- events |= (EPOLLIN | EPOLLOUT);//如果出现异常,就设置它的读事件和写事件就绪,读写本来就要做异常处理
- if (events & EPOLLHUP)
- events |= (EPOLLIN | EPOLLOUT);
- // listen事件就绪
- if ((events & EPOLLIN) && IsConnectionExists(sock) && connections_[sock]->recver_)//读事件就绪并且connection对应的套接字存在,并且recver对象也存在
- connections_[sock]->recver_(connections_[sock]);
- if ((events & EPOLLOUT) && IsConnectionExists(sock) && connections_[sock]->sender_)
- connections_[sock]->sender_(connections_[sock]);
- }
- }
- public:
- TcpServer(func_t func, uint16_t port = defaultport) : service_(func), port_(port), revs_(nullptr)
- {
- }
- void InitServer()
- {
- // 1. 创建socket
- sock_.Socket();
- sock_.Bind(port_);
- sock_.Listen();
- // 2. 构建Epoll
- epoller_.Create();
- // 3. 将目前唯一的一个sock,添加到epoller中, 之前需要先将对应的fd设置成为非阻塞
-
- // listensock_也是一个socket啊,也要看做成为一个Connection
- AddConnection(sock_.Fd(), EPOLLIN | EPOLLET,
- std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
- revs_ = new struct epoll_event[num];//初始化一段缓冲区用来保存已经就绪的事件
- num_ = num;
- }
- void EnableReadWrite(Connection *conn, bool readable, bool writeable)
- {
- uint32_t event = (readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET;
- epoller_.Control(conn->sock_, event, EPOLL_CTL_MOD);
- }
- // 事件派发器
- void Dispatcher()
- {
- int timeout = 1000;
- while (true)
- {
- Loop(timeout);
- // logMessage(DEBUG, "time out ...");
- // 遍历connections_,计算每一个链接的已经有多长时间没有动了
-
- }
- }
- ~TcpServer()
- {
- sock_.Close();
- epoller_.Close();
- if (nullptr == revs_)
- delete[] revs_;
- }
- private:
- uint16_t port_;
- Sock sock_;
- Epoller epoller_;
- std::unordered_map<int, Connection *> connections_;
- struct epoll_event *revs_;//存储多个就绪时间的空间,指针
- int num_;//表示用来保存就绪事件的缓冲区一共有多少个元素
- // hander_t handler_;
- func_t service_;
- };
- }
复制代码 这里有着很多细节,需要强调点如下:
就读事件来说,在我们之前实现无论是select、poll还是epoll的时候,对底层数据到位之后,读取时候的处理惩罚都是特殊不完善的。先看我们之前怎么处理惩罚的,如下:
- else if(revs[i].events&EPOLLIN)//读事件就绪
- {
- char buffer[64];
- ssize_t s=recv(fd,buffer,sizeof(buffer)-1,0);
- if(s>0)//读取成功
- {
- buffer[s]='\0';
- std::cout<<"echo#"<<buffer<<std::endl;
- }
- else if(s==0)//对端连接关闭
- {
- std::cout<<"Client quit"<<std::endl;
- close(fd);
- DelEvent(fd);//将文件描述符从模型中删除
- }
- else{//读取错误
- std::cerr<<"recv error!"<<std::endl;
- close(fd);
- DelEvent(fd);
- }
- }
复制代码 我们要知道在epoll模式下,如果底层数据已经就绪,我们假设把本轮的数据全部读完,这时候也不一定可以或许读到一个完整的请求,本轮读完是ET的要求,能不能完整读完是跟协议要求干系的。纵然循环读,读完也不行。如果我们并不能读到一个完整的请求,那么我们就只能 将自己读到的数据临时放在buffer中,但是临时生存在buffer中的话,这个文件描述符生存了,其他的文件描述符数据也就绪了怎么办,而且在当前读完之后,整个的这个代码区间全部都被开释掉了(因为它是栈上的空间),以是再想下一次读的时候buffer早都被开释完了。
以是光在栈上的缓冲区远远不够,对于每一个文件描述符,我们都得给他们的输入输出的用户层缓冲区全都带上。以是要对每一个套接字举行封装,每一个套接字都要有着自己的缓冲区空间,读取的时候,将数据暂存到自己的缓冲区中,没读完下次再读,这也才不会和其他的数据胶葛。在发送的时候也是同理。
我们在这次的Reactor代码中对每一个文件描述符都封装了Connection,系统中出现大量的Connection对象,需要被管理起来,先描述再组织,描述已经到位就是Connection结构体,接下来用unordered_map<int ,Connection*>管理起来,,纵然用kv将文件描述符和我们所创建的Connection结构体结合起来 , 以是将来我们将文件描述符添加到epoll模型中的时候,同时将文件描述符的Connect对象也添加进unordered_map中,这样将来就可以通过unordered_map检察该文件描述符所对应的各种事件和输入输出缓冲区了。
8、Main函数
- //Main.cc
- #include "TcpServer.hpp"
- #include <memory>
- using namespace tcpserver;
- static void usage(std::string proc)
- {
- std::cerr << "Usage:\n\t" << proc << " port"
- << "\n\n";
- }
- bool cal(const Request &req, Response &resp)
- {
- // req已经有结构化完成的数据啦,你可以直接使用
- resp.exitcode = OK;
- resp.result = OK;
- switch (req.op)
- {
- case '+':
- resp.result = req.x + req.y;
- break;
- case '-':
- resp.result = req.x - req.y;
- break;
- case '*':
- resp.result = req.x * req.y;
- break;
- case '/':
- {
- if (req.y == 0)
- resp.exitcode = DIV_ZERO;
- else
- resp.result = req.x / req.y;
- }
- break;
- case '%':
- {
- if (req.y == 0)
- resp.exitcode = MOD_ZERO;
- else
- resp.result = req.x % req.y;
- }
- break;
- default:
- resp.exitcode = OP_ERROR;
- break;
- }
- return true;
- }
- void calculate(Connection *conn)
- {
- std::string onePackage;
- while (ParseOnePackage(conn->inbuffer_, &onePackage))
- {
- std::string reqStr;
- if (!deLength(onePackage, &reqStr))
- return;
- std::cout << "去掉报头的正文:\n"
- << reqStr << std::endl;
- // 2. 对请求Request,反序列化
- // 2.1 得到一个结构化的请求对象
- Request req;
- if (!req.deserialize(reqStr))
- return;
- Response resp;
- cal(req, resp);
- std::string respStr;
- resp.serialize(&respStr);
- // 5. 然后我们在发送响应
- // 5.1 构建成为一个完整的报文
- conn->outbuffer_ += enLength(respStr);//添加到自己的发送缓冲区中
- std::cout << "--------------result: " << conn->outbuffer_ << std::endl;
- }
- // 直接发
- if (conn->sender_)
- conn->sender_(conn);
- // // 如果没有发送完毕,需要对对应的sock开启对写事件的关系, 如果发完了,我们要关闭对写事件的关心!
- // if (!conn->outbuffer_.empty())
- // conn->tsp_->EnableReadWrite(conn, true, true);
- // else
- // conn->tsp_->EnableReadWrite(conn, true, false);
- }
- int main(int argc, char *argv[])
- {
- if (argc != 2)
- {
- usage(argv[0]);
- exit(USAGE_ERR);
- }
- uint16_t port = atoi(argv[1]);
- std::unique_ptr<TcpServer> tsvr(new TcpServer(calculate, port));
- tsvr->InitServer();
- tsvr->Dispatcher();
- return 0;
- }
复制代码 这就是全部代码了。
感谢阅读!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |