目录
应用层
再谈协议
网络版计算器
序列化与反序列化
重新明确read/write/recv/send以及tcp为什么支持全双工
代码实现
应用层
现实上,我们在网络部分写的所有代码都属于应用层,应用层必要协议,之前我们约定过双发发过来的都是字符串、单词、聊天系统、命令行等都是提前约定好的协议。
再谈协议
协议的本质就是一种约定,之前我们在利用socket读写数据时,都是按字符串的方式发送和接收,如果我们想传一些“结构化的数据”该怎么办呢?
实在,协议就是双方约定好的结构化的数据。
网络版计算器
例如,我们想要实现一个网络版的计算器,客户端必要把要计算的两个数发到服务器,然后由服务器去计算,最后把计算效果返回给客户端。
我们创建Protocol.hpp这个文件,里面界说好这两个结构体:
- //struct request req = {10, 20, '+'}
- struct request
- {
- int x;
- int y;
- char oper; // + - * / %
- };
- struct response
- {
- int result;
- int code; // 0:success 1:div zero 2:非法操作
- };
复制代码 struct request是客户端发送过来的数据,reponse是服务器给客户端返回的计算效果。将来让客户端和服务器都包罗这个文件,那它们就认识这两个结构体了。
- 发送数据时将这个结构体按照一个规则转换成字符串, 接收到数据的时候再按照雷同的规则把字符串转化回结构体。这个过程叫做序列化与反序列化。
为什么肯定要对发送数据举行序列化呢?直接把结构体传过去,然后用结构体接收不可以吗?缘故原由有以下几点:
- 将来可能是不同的OS平台举行通信,比如linux和windows,安卓ios和linux等,对同一个结构体可能存在内存对齐不划一、大小不一样的题目。也有可能服务器利用C++写的,而客户端可能是python、Java写的,它们界说结构体时范例大小可能都不一样。
- 即便我们之前在技术上克服了上面的题目,应用层的协议可能不断变革,一旦发生变革,跨平台等题目标处置惩罚要重新测试验证。这样带来的题目是可扩展性非常不好。
所以,我们严重不推荐直接发结构体等数据。
序列化与反序列化
现实上我们在聊天的时候,一条消息由多行构成,包括消息内容、发送时间、用户名等。在发送消息时,为了便于发送,必要举行序列化,把信息由多变一,酿成一行字符串,方便网络发送。然后通过网络,将这一行字符串推动到服务器,然后在客户端把一行字符串按照协议反序列化为结构化的数据,在上层就可以拿到message里的每一个内容。
在网络层,无论利用tcp/udp,它们都不关心发过来的是什么,只知道这是一个个字节构成的字节省就行了,解释工作由上层来做。
重新明确read/write/recv/send以及tcp为什么支持全双工
在客户端和服务端通信的时候,双方都要有套接字。当我们创建套接字时,tcp里会给我们创建两段缓冲区,即发送缓冲区和接收缓冲区。之前写tcp时,会accept新毗连,一个文件形貌符fd就代表一个毗连,一个毗连,对应两个缓冲区。我们之前用的write/send,本质是把数据拷贝到TCP的发送缓冲区,同样地,read/recv读数据,本质上是把缓冲区内部的数据拷贝到应用层。
- 所以,read、write、recv、send本质都是拷贝函数!
- 发数据的本质是从发送方的发送缓冲区的内容经过协议栈和网络拷贝给接收方的接收缓冲区。
- 由于TCP有两个缓冲区,可以同时对缓冲区分别举行读写,这就是TCP支持全双工通信的缘故原由。
当缓冲区没数据时,系统调用read函数会阻塞,进程被挂起等候,由R状态设为S,等有数据时再把进程唤醒。
那缓冲区里的数据什么时候发,发多少,出错了怎么办?这实在是由TCP协议自主来决定,这也就是TCP协议叫做传输控制协议的缘故原由!发送数据的本质就相称于文件缓冲区的数据革新到磁盘,只不外从前外设是磁盘,今天外设是网络。由于传输层和网络层属于OS,那缓冲区里的数据什么时候发,发多少,出错了怎么办这些题目本质也是由OS主动去处置惩罚的,换句话说,用户只是把数据交给OS,剩下的工作由OS自主完成。这未免和文件操作很相似。
这种有人把数据放入到缓冲区,有人把数据从缓冲区里拿出来,这实在是生产者消费者模子!
- 为什么IO函数要阻塞?本质就是在维护同步关系!这是从系统角度明确网络。
下面是一个完整的从用户层到数据链路层的通信模子:
TCP是面向字节省的,客户端发的,不肯定全部是服务器收的(类似发送自来水,你接自来水可能用杯子、用碗盆接。对方发了一次,你可能要接好多次,对方发了好多次,你一次就接完了)。
所以,怎么包管服务器从接收缓冲区读到的是一个完整的请求呢?必要分割完整的报文。
代码实现
学到这里,我们为了之后创建套接字方便,必要对创建套接字举行封装,利用模版模式方法。模板方法模式的焦点思想是在一个父类中界说一个算法的框架,而将算法中的某些步调延迟到子类中实现。模板方法模式的主要内容是:
1.创建一个抽象基类,里面包罗一个或多个纯虚函数(这些函数在子类中实现),以及一个或多个具体实现的函数(这些函数将调用纯虚函数,形成算法的框架)。
2.在子类中实现抽象基类中的纯虚函数,从而提供算法中不同步调的具体实现。
3.在客户端代码中,通过抽象基类的指针或引用来调用算法框架,实现多态性。
- enum
- {
- SOCKTE_ERROR = 1,
- BIND_ERROR,
- LISTEN_ERROR
- };
- const static int gbcklog = 8;
- namespace socket_ns
- {
- using SockSPtr = std::shared_ptr<Socket>;
- //模版模式方法
- class Socket
- {
- public:
- virtual void CreateSocketOrDie() = 0;
- virtual void CreateBindOrDie(uint16_t port) = 0;
- virtual void CreateListenOrDie(int gbcklog = gbcklog) = 0;
- virtual SockSPtr Accepter(InetAddr* cliaddr) = 0;
- virtual bool Connector(const std::string& peerip, uint16_t peerport) = 0;
- public:
- void BuildClientSocket(const std::string& peerip, uint16_t peerport)
- {
- CreateSocketOrDie();
- Connector(peerip, peerport);
- }
- void BuildListenSocket(uint16_t port)
- {
- CreateSocketOrDie();
- CreateBindOrDie(port);
- CreateListenOrDie();
- }
- };
- class TcpSocket : public Socket
- {
- public:
- TcpSocket()
- {
- }
- TcpSocket(int sockfd):_sockfd(sockfd)
- {
- }
- void CreateSocketOrDie() override
- {
- //1.创建socket
- _sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
- if(_sockfd < 0)
- {
- LOG(FATAL, "socket create error\0");
- exit(SOCKTE_ERROR);
- }
- LOG(INFO,"socket create success, sockfd:%d\n", _sockfd);
- }
- void CreateBindOrDie(uint16_t port) override
- {
- struct sockaddr_in local;
- memset(&local, 0, sizeof(local));
- local.sin_family = AF_INET;
- local.sin_port = htons(port);
- local.sin_addr.s_addr = INADDR_ANY;
- //2.bind sockfd 和sockaddr
- if(::bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0)
- {
- LOG(FATAL, "bind error\n");
- exit(BIND_ERROR);
- }
- LOG(INFO, "bind success, sockfd:%d\n",_sockfd);
- }
- void CreateListenOrDie(int gbcklog = gbcklog)override
- {
- //3.因为tcp是面向连接的,tcp需要未来不断能够获取到连接
- if(::listen(_sockfd, gbcklog) < 0)
- {
- LOG(FATAL, "listen error\n");
- exit(LISTEN_ERROR);
- }
- LOG(INFO, "listen success\n");
- }
- SockSPtr Accepter(InetAddr* cliaddr) override
- {
- struct sockaddr_in client;
- socklen_t len = sizeof(client);
- //4.获取新连接
- int sockfd = ::accept(_sockfd, (struct sockaddr*)&client, &len);
- if(sockfd < 0)
- {
- LOG(WARNING, "accept error\n");
- return nullptr;
- }
- *cliaddr = InetAddr(client);
- LOG(INFO, "get a new link, client info: %s, sockfd is %d\n", cliaddr->AddrStr().c_str(), sockfd);
- return std::make_shared<TcpSocket>(sockfd);
- }
- bool Connector(const std::string& peerip, uint16_t peerport) override
- {
- struct sockaddr_in server;
- memset(&server,0, sizeof(server));
- server.sin_family = AF_INET;
- server.sin_port = htons(peerport);
- ::inet_pton(AF_INET, peerip.c_str(), &server.sin_addr);
- int n = ::connect(_sockfd, (struct sockaddr*)&server, sizeof(server));
- if(n < 0)
- {
- return false;
- }
- return true;
- }
- private:
- int _sockfd; //可以是监听套接字,也可以是普通套接字
- };
- }
复制代码 首先我们先计划一个Socket基类,里面有多少个虚函数,并且计划了计划了建立监听套接字和客户端套接字函数,对这些虚函数举行了封装。
在Protocol.hpp中,我们要界说好协议:
- //struct request req = {10, 20, '+'}
- struct request
- {
- int x;
- int y;
- char oper; // + - * / %
- };
- struct response
- {
- int result;
- int code; // 0:success 1:div zero 2:非法操作
- };
复制代码 在Request中,客户端将来要将请求发给别人,因此要序列化。服务器也要读到请求,因此要反序列化。在Response中,服务器要发相应给客户端,因此要序列化。客户端要读到相应,因此要反序列化。
序列化的目标是将结构化的字段转化为字符串,反序列化的目标是将字符串转化为结构化的字段。要把结构化的字段转化为字符串,既可以手动,也可以利用现成的工具。我们选择jsoncpp库来实现序列化和反序列化,主要缘故原由是简单并且效果可视。
接下来,我们先来看一下序列化的效果:
这种{"oper":43,"x":111,"y":222}的字符串,我们称之为json串。
别的,我们还可以再json中套json,比如:
我们再来看看怎样反序列化:
在做完Request的序列化和反序列化后,我们可以类似做一些Response的序列化和反序列化:
- class Response
- {
- public:
- Response()
- {
- }
- bool Serialize(std::string* out)
- {
- Json::Value root;
- root["result"] = _result;
- root["code"] = _code;
- root["_desc"] = _desc;
- Json::FastWriter writer;
- std::string s = writer.write(root); // 把结构化的字段root转换为字符串
- *out = s;
- return true;
- }
- bool Deserialize(const std::string& in)
- {
- Json::Value root;
- Json::Reader reader;
- bool res = reader.parse(in, root);
- if(!res) return false;
- _result = root["result"].asInt();
- _code = root["code"].asInt();
- _desc = root["desc"].asString();
- return true;
- }
- ~Response()
- {
- }
- private:
- int _result;
- int _code; // 0:success 1:div zero 2:非法操作
- std::string _desc;
- };
复制代码 可是,我们怎么包管读到的是一个完整的报文呢?如果是不完整的报文怎么办?所以,我们要继续计划一下协媾和报文的完整格式:
- "len"\r\n"{json}"\r\n -- 完整的报文
复制代码 此中,len是一个报文有效载荷的长度(json串的长度),我们可以先根据\r\n的位置读到"len",知道有效载荷的长度之后,就知道接下来必要读取多少字节了。如果后面的json串不完整(长度小于len),就不处置惩罚,等客户端再发一点再处置惩罚。第一个\r\n是为了区分len和json串,第二个\r\n临时没有其他用处,是为了打印方便,便于debug。
为了统一成这样的格式,我们首先要对json串举行处置惩罚:
- std::string Encode(const std::string& jsonstr)
- {
- int len = jsonstr.size();
- std::string lenstr = std::to_string(len);
- return lenstr + sep + jsonstr + sep;
- }
复制代码 然后必要对字符串举行解码,然而传过来的字符串可能会出现如下环境:
- // "le
- // "len"
- // "len"\r\n"{json}"\r\n
- // "len"\r\n"{j
- // "len"\r\n"{json}"\r\n// "len"\r\n"{js
- // "len"\r\n"{json}"\r\n// "len"\r\n"{json}"\r\n
- // "len"\r\n"{json}"\r\n// "len"\r\n"{json}"\r\n// "len"\r\n"{json}"\r\n// "len"\r\n"{json}"\r\
复制代码 如果传过来的字符串有完整的,就提取完整的,否则什么都不做,继续去读,对字符串举行读取:
- std::string Decode(std::string& packagestream)
- {
- auto pos = packagestream.find(sep);
- if(pos == std::string::npos) return std::string();
- std::string lenstr = packagestream.substr(0, pos);
- int len = std::stoi(lenstr);
- //计算一个完整的报文应该是多长
- int total = lenstr.size() + len + 2 * sep.size();
- if(packagestream.size() < total) return std::string();
- std::string jsonstr = packagestream.substr(pos+sep.size(), len);
- packagestream.erase(total);//这就是Decode参数不能带const的原因,当提取完一个报文之后,要删掉
- return jsonstr;
- }
复制代码 如果返回字符串,那肯定是一个完整的json串,否则返回空字符串。
对应的IOService如下:
- using process_t = std::function<std::shared_ptr<Response>(std::shared_ptr<Request>)>;
- class IOService
- {
- public:
- IOService(process_t process):_process(process)
- {}
- void IOExcute(SockSPtr sock, InetAddr& addr)
- {
- std::string packagestreamqueue;
- while(true)
- {
- //1.负责读取
- ssize_t n = sock->Recv(&packagestreamqueue);
- if(n <= 0)
- {
- LOG(INFO, "client %s quit or recv error\n", addr.AddrStr().c_str());
- break;
- }
- //我们能保证读到的是一个完整的报文吗?不能!
- //2.报文解析,提取报头和有效载荷
- std::string package = Decode(packagestreamqueue);
- if(package.empty()) continue;
- //我们能保证读到的是一个完整的报文吗?能!
- auto req = Factory::BuildRequestDafault();
- //3.反序列化
- req->Deserialize(package);
- //4.业务处理
- auto resp = _process(req); // 通过请求,得到应答
- //5.序列化应答
- std::string respjson;
- resp->Serialize(&respjson);
- //6.添加len长度报头
- respjson = Encode(respjson);
- //7.发送回去
- sock->Send(respjson);
- }
- }
- ~IOService()
- {}
- private:
- process_t _process;
- };
复制代码 _process用于处置惩罚业务,我们这里是一个计算器。为此,我们要实现一个计算器NetCal.hpp:
- #pragma once
- #include "Protocol.hpp"
- #include <memory>
- class NetCal
- {
- public:
- NetCal()
- {
- }
- ~NetCal()
- {
- }
- std::shared_ptr<Response> Calculator(std::shared_ptr<Request> req)
- {
- auto resp = Factory::BuildResponseDafault();
- switch (req->Oper())
- {
- case '+':
- resp->_result = req->X() + req->Y();
- break;
- case '-':
- resp->_result = req->X() - req->Y();
- break;
- case '*':
- resp->_result = req->X() * req->Y();
- break;
- case '/':
- {
- if (req->Y() == 0)
- {
- resp->_code = 1;
- resp->_desc = "div zero";
- }
- else
- {
- resp->_result = req->X() / req->Y();
- }
- }
- break;
- case '%':
- {
- if (req->Y() == 0)
- {
- resp->_code = 2;
- resp->_desc = "mod zero";
- }
- else
- {
- resp->_result = req->X() / req->Y();
- }
- }
- break;
- default:
- {
- resp->_code = 3;
- resp->_desc = "illegal operation";
- }
- break;
- }
- return resp;
- }
- };
复制代码 在main函数中:
- int main(int argc, char* argv[])
- {
- if(argc != 2)
- {
- std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;
- exit(0);
- }
- uint16_t port = std::stoi(argv[1]);
- NetCal cal;
- IOService service(std::bind(&NetCal::Calculator, &cal, std::placeholders::_1));
-
- std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(std::bind(&IOService::IOExcute, &service, std::placeholders::_1,std::placeholders::_2),port);
- tsvr->Loop();
- return 0;
- }
复制代码 在我们的代码中,手动划分了三层,为什么可以清晰地划分成三层呢?实在这对应OSI七层模子里的会话层、表示层、应用层。
在客户端主函数数实现中,我们这样做:
- int main(int argc, char *argv[])
- {
- if (argc != 3)
- {
- std::cerr << "Usage: " << argv[0] << "server_ip server_port" << std::endl;
- exit(0);
- }
- std::string server_ip = argv[1];
- uint16_t server_port = std::stoi(argv[2]);
- SockSPtr sock = std::make_shared<TcpSocket>();
- if (!sock->BuildClientSocket(server_ip, server_port))
- {
- std::cerr << "connect error" << std::endl;
- exit(1);
- }
- srand(time(nullptr) ^ getpid());
- const static std::string opers = "+-*/%?!";
- std::string packagestringstream;
- while (true)
- {
- int x = rand() % 10;
- usleep(x * 1000);
- int y = rand() % 10;
- usleep(x * y * 1000);
- char oper = opers[y % opers.size()];
- // 构建请求
- auto req = Factory::BuildRequestDafault();
- req->SetValue(x, y, oper);
- // 1.序列化
- std::string jsonstr;
- req->Serialize(&jsonstr);
- // 2.添加长度报头字段
- std::string reqstr = Encode(jsonstr);
- std::cout << "request string :\n" << reqstr << std::endl;
- // 3.发送数据
- sock->Send(reqstr);
- while (true)
- {
- // 4.读取应答
- ssize_t n = sock->Recv(&packagestringstream);
- if (n < 0)
- {
- break;
- }
- //5.报文解析,提取报头和有效载荷
- std::string package = Decode(packagestringstream);
- if (package.empty())
- {
- continue;
- }
- //6.反序列化
- auto resp = Factory::BuildResponseDafault();
- resp->Deserialize(package);
- //7.打印结果
- resp->PrintResult();
- break;
- }
- sleep(1);
- }
- sock->Close();
- return 0;
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |