谈天通讯架构
本博客基于Linux实现一个简单的谈天通讯服务,以认识Linux的网络接口。
总代码地址:[UDPsocket-简单谈天通讯]
文件结构如下:
在server文件夹中,包含三个类,分别写在三个文件中:
- InetAddr.hpp:记载通讯主机的ip和port,方便进行通讯
- UdpServer.hpp:完成服务端UDP套接字的创建,并接收来自客户端的消息
- MessageRouter:对收到的消息进行业务处理
两个文件中的main.cpp是源文件,分别编译得到服务端与客户端的可实行文件。
Server
InetAddr
- class InetAddr
- {
- private:
- struct sockaddr_in _addr;
- std::string _ip;
- uint16_t _port;
- };
复制代码 类成员:
- _addr:套接字地址
- _ip:主机地址
- _port:主机端口号
其实在_addr内部已经存储了地址与端口号,这一层封装的意义是提供更加便捷的接口来访问地址与端口。
- InetAddr(const struct sockaddr_in& addr)
- : _addr(addr)
- {
- _ip = inet_ntoa(addr.sin_addr);
- _port = ntohs(addr.sin_port);
- }
复制代码 构造函数接受一个套接字地址addr,随后初始化_ip与_port。
- 对_ip地址来说,要把四字节的序列通过inet_ntoa转化为字符串形式
- 对_port端口来说,则是要把网络字节序转化为主机字节序
- std::string ip()
- {
- return _ip;
- }
- uint16_t port()
- {
- return _port;
- }
- struct sockaddr_in addr()
- {
- return _addr;
- }
复制代码 这些接口用于外部访问类成员。
- bool operator==(const InetAddr& other) const
- {
- return _ip == other._ip && _port == other._port;
- }
复制代码 后续要完成客户的网络地址之间的身份标识,通过_ip + _port的组合,来确定一个客户,这样就可以区分前后是否是同一个人发消息。因此此处要重载operator==,辨别两个UDP报文是否是同一个客户发送的。
UdpServer
- using func_t = std::function<void(int sockfd, std::string message, InetAddr cliAddr)>;
- class UdpServer
- {
- private:
- int _sockfd;
- uint16_t _port;
- func_t _callback;
- };
复制代码 类成员:
- _sockfd:创建网络套接字得到的文件形貌符,后续通过读写该形貌符操纵网络
- _port:指定服务端监听的端口
- _callback:一个回调函数,当服务端收到消息后,调用该回调函数处理信息,这一层操纵的意义在于把UDP套接字与业务逻辑进行解耦
- enum
- {
- SOCKET_ERROR = 1, // 套接字错误
- BIND_ERROR, // 绑定错误
- };
复制代码 为例方便后续指明错误范例,此处枚举了三个错误码。
- UdpServer(uint16_t port, func_t callback)
- : _port(port)
- , _callback(callback)
- {
- _sockfd = socket(AF_INET, SOCK_DGRAM, 0); // 创建套接字
- if (_sockfd < 0)
- exit(SOCKET_ERROR);
- struct sockaddr_in addr; // 初始化套接字信息
- bzero(&addr, sizeof(addr));
- addr.sin_family = AF_INET;
- addr.sin_port = htons(_port);
- addr.sin_addr.s_addr = INADDR_ANY;
- // 绑定套接字
- int n = bind(_sockfd, (struct sockaddr*)&addr, sizeof(addr));
- if (n < 0)
- exit(BIND_ERROR);
- }
复制代码 在构造函数中,实现UDP的套接字创建。
参数:
- port:该服务开放的端口
- callback:上层处理消息的业务逻辑的回调函数
随后通过socket函数创建套接字,参数:
- AF_INET:使用ipv4通讯
- SOCK_DGRAM:使用UDP进行通讯
- 0:不消管,直接填0即可
得到一个文件形貌符_sockfd,后续通过操纵该文件形貌符进行网络通讯。
但是当前套接字还只是一个内存中的变量,操纵体系还没有进行真正的网络监听,此时要将套接字绑定起来。
起首初始化套接字地址的信息:
- bzero(&addr, sizeof(addr)); // 清空内存原有内容
- addr.sin_family = AF_INET; // 使用ipv4通信
- addr.sin_port = htons(_port); // 使用指定端口号
- addr.sin_addr.s_addr = INADDR_ANY; // 绑定地址
复制代码 此处addr.sin_addr.s_addr表示该套接字,接收来自于哪些地址的哀求,比如填入127.0.0.1,那么就只有127.0.0.1地址可以与该服务通讯。而填入0.0.0.0表示可以接收任意地址的哀求,此处INADDR_ANY就代表0.0.0.0,只不过被封装为了一个宏。
最后通过bind进行绑定,此时就创建了一个UDP套接字,基于ipv4进行通讯,监听任意地址发送的哀求。
- void start()
- {
- while (true)
- {
- char buffer[1024];
- struct sockaddr_in cliAddr;
- bzero(&cliAddr, sizeof(cliAddr));
- socklen_t len = sizeof(cliAddr);
- int n = recvfrom(_sockfd, buffer, sizeof(buffer) - 1, 0, (sockaddr*)&cliAddr, &len);
- if (n > 0)
- {
- buffer[n] = '\0';
- _callback(_sockfd, buffer, cliAddr);
- }
- }
- }
复制代码 最后写一个start函数,这个函数用于接收来自客户端的消息。通过recvfrom接口读取网络中的UDP报文,读取到buffer数组中,为了防止字符串没有末了,最后buffer[n] = '\0'添加一个字符串的终止符。因为recvfrom返回接收到的字符个数,所以最后一个字符的下标为n - 1,在下标n处补充一个'\0'。
接收到消息后,通过_callback把套接字文件形貌符_sockfd,接收到的数据buffer以及客户端信息cliAddr发送给业务层处理。
此处注意:
- using func_t = std::function<void(int sockfd, std::string message, InetAddr cliAddr)>;
复制代码 这是_callback函数的范例,其中std::string message是一个普通的std::string范例,他不是引用,也不是指向字符串的指针。因为buffer是个在栈区的数组,比及下一轮while循环,这个数组的内容就是未界说的。所以会导致指针越界,访问到错误数据等问题。因此不能使用指针或引用,而是让std::string对buffer内的数据进行一次拷贝。cliAddr同理,不是一个引用大概指针,要进行一次拷贝。
MessageRouter
MessageRouter是业务层的逻辑,接收到一条消息后,处理消息并发送回给客户端。
如图,当UdpServer接收到来自客户端的消息后,MessageRouter要把这个消息转发给其他客户端,也就是说MessageRouter的任务就是转发消息。而发送消息必要通过套接字文件形貌符,这也就是为什么在刚才的_callback要传一个_sockfd。
- class MessageRouter
- {
- private:
- std::vector<InetAddr> _online_user;
- pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER;
- };
复制代码 MessageRouter要维护全部的用户,所以使用一个数组来存储全部的客户端。每个客户端用一个InetAddr表示,也就是一个ip + port确定一个唯一的客户端。
因为后续要引入多线程,会出现并发访问数组的问题,此处使用一把_mutex锁来进行并发控制。
- bool addUser(const InetAddr& user)
- {
- pthread_mutex_lock(&_mutex);
- for (auto& o_user : _online_user) // 遍历数组
- {
- if (user == o_user) // 用户已存在
- {
- pthread_mutex_unlock(&_mutex);
- return false;
- }
- }
- _online_user.push_back(user); // 新增用户
- pthread_mutex_unlock(&_mutex);
- return true;
- }
复制代码 增加一个用户,就要访问数组_online_user,访问之前要加锁,访问竣事后再解锁。
访问前先遍历数组,检察是否当前用户已经存在,如果存在直接返回,返回前别忘了解锁。如果不存在,则尾插新用户到数组中。
- bool delUser(const InetAddr& user)
- {
- pthread_mutex_lock(&_mutex);
- auto it = find(_online_user.begin(), _online_user.end(), user);
- if (it == _online_user.end())
- {
- pthread_mutex_unlock(&_mutex);
- return false;
- }
- _online_user.erase(it);
- pthread_mutex_unlock(&_mutex);
- return true;
- }
复制代码 删除用户与添加用户同理,先遍历数组,如果用户不存在,直接返回。如果存在,那么删掉该用户。
- struct SendPackage
- {
- SendPackage(MessageRouter* self, int sockfd, std::string message, InetAddr cliAddr)
- : _self(self)
- , _sockfd(sockfd)
- , _message(message)
- , _cliAddr(cliAddr)
- {}
- MessageRouter* _self;
- int _sockfd;
- std::string _message;
- InetAddr _cliAddr;
- };
复制代码 为了不影响主线程接收消息,提高进行处理消息的效率,此处将消息转发的任务交给一个线程来完成。而Linux中,线程要使用一个void*(void*)范例的函数,这样就欠好将参数通报给线程了,所以要先用一个结构体将全部参数进行打包。再把指向该结构体的指针转为void*传给线程。
SendPackage是一个内部类,用于对线程所需的参数进行打包,让线程可以进行消息的转发。
- _self:指向MessageRouter的指针,因为线程要访问全部效户,也就是访问_online_user,所以要一个指针回指来访问
- _sockfd:套接字文件形貌符,进行消息转发也就是进行网络通讯,网络通讯依赖于套接字文件形貌符
- _message:要转发的消息
- _cliAddr:发送方客户端的信息
- static void* messageSender(void* args)
- {
- SendPackage* sendpkg = (SendPackage*)args;
- std::string msg = "[" + sendpkg->_cliAddr.ip() + ":"
- + std::to_string(sendpkg->_cliAddr.port()) + "]"
- + sendpkg->_message;
- std::cout << "sending..." << msg << std::endl;
- pthread_mutex_lock(&sendpkg->_self->_mutex);
- for (auto& usr : sendpkg->_self->_online_user)
- {
- struct sockaddr_in cliaddr = usr.addr();
- sendto(sendpkg->_sockfd, msg.c_str(), msg.size(), 0, (sockaddr*)&cliaddr, sizeof(cliaddr));
- }
- pthread_mutex_unlock(&sendpkg->_self->_mutex);
- delete sendpkg;
- return nullptr;
- }
复制代码 该函数是线程实行的函数,用于对消息进行转发,起首将参数void*转回SendPackage*,也就是刚刚的参数包结构体。
随后拼接字符串msg,这是要转发消息内容。格式为:
前面的[]表明这是哪一个用户发送的消息,后面是具体的消息内容。
随后服务端输出一条日记std::cout << "sending..." << msg << std::endl;,表示自己转发了这条消息。
随后访问_online_user数组,遍历全部成员,并且对通过sendto函数,进行消息转发。
转发完消息后,进行解锁,并且delete释放sendpkg,这是一个堆区上的对象,后续会讲解原因。
- void router(int sockfd, std::string message, InetAddr cliAddr)
- {
- // 首次发消息 -> 注册
- addUser(cliAddr);
- // 用户退出
- if (message == "/quit")
- {
- delUser(cliAddr);
- return;
- }
- // 线程转发消息
- SendPackage* sendpkg = new SendPackage(this, sockfd, message, cliAddr);
- pthread_t tid;
- pthread_create(&tid, 0, messageSender, messageSender);
- pthread_detach(tid);
- }
复制代码 这个函数,就是UdpSerever中回调的函数,当用户要发消息时,起首添加该用户addUser(cliAddr),如果用户已经存在,addUser函数内部也不会重复添加。
如果用户想要退出,输入"/quit",此时会进行删除delUser。
如果前面已经添加好了用户,随后就开始进行消息转发,此时对参数进行打包new SendPackage。这里要用new创建,把这个参数包创建在堆区,因为router创建完线程,就直接退出了,此时栈区中的全部数据都会销毁。那么线程就无法读取到栈区中的参数包,所以要把参数包创建在堆区,随后让线程自己释放。
创建线程时,给线程指定函数messageSender,参数messageSender,这样线程就会去调用函数,然后完成数据的转发。
最后router退出之前,先把创建的线程detach,让其自己接纳。
main
在main.cpp中,完成全部逻辑的拼接,启动整个服务。
- void useage(char* argv[])
- {
- std::cout << "useage:" << std::endl;
- std::cout << "\t" << argv[0] << " + port" << std::endl;
- }
- int main(int argc, char* argv[], char* env[])
- {
- if (argc != 2)
- {
- useage(argv);
- return 0;
- }
-
- // 主逻辑
- return 0;
- }
复制代码 起首判定用户实行该步伐的指令,用户必要指定一个端口,表示该服务使用的端口。如果没有指定,则useage输出提示用户输入一个端口号。
- uint16_t port = std::stoi(argv[1]);
- MessageRouter msgRouter;
- auto func = std::bind(&MessageRouter::router, &msgRouter, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
- UdpServer udpSvr(port, func);
- udpSvr.start();
复制代码 当确定用户输入了一个端口后,起首用port接收这个端口,从字符串转为数字。
随后把MessageRouter的router函数作为回调函数,传给udpSvr对象。由于该函数是一个类内的函数,第一个参数为this指针。因此使用bind,把第一个参数绑定为&msgRouter,也就是一个具体对象的指针。这样新函数func的范例就与UdpServer的回调函数一致。
最后传入端口号port与回调函数func,启动服务。
Client
客户端的任务很简单,只必要完成数据的发送与接收即可。此处把发送消息和接收消息交给两个差别的线程去完成。
- struct SockInfo
- {
- SockInfo(int sockfd, const struct sockaddr_in& sockaddr)
- : _sockfd(sockfd)
- , _sockaddr(sockaddr)
- {}
- int _sockfd;
- struct sockaddr_in _sockaddr;
- };
复制代码 由于要使用多线程,和之前也一样要把全部参数放到一个结构体一起传参。此处只必要把服务端的信息,以及通讯的套接字文件形貌符传送给线程。
- _sockfd:与服务端通讯的文件形貌符
- _sockaddr:服务端的套接字地址信息
- void* recvMessage(void* args)
- {
- SockInfo* sockInfo = (SockInfo*)args;
- while(true)
- {
- char buffer[1024];
- int n = recvfrom(sockInfo->_sockfd, buffer, sizeof(buffer) - 1, 0, nullptr, nullptr);
- if (n > 0)
- {
- buffer[n] = '\0';
- std::cout << buffer << std::endl;
- }
- }
- return nullptr;
- }
复制代码 起首把收到的void*参数包,转回SockInfo*。
随后进入死循环,接收来自服务端的消息。此处recvfrom的最后两个参数设为nullptr,表示不关心谁发送的消息,忽略消息发送方的地址与端口信息。因为通过sockInfo->_sockfd通讯,而这个套接字就是在和服务端通讯,无需再确认身份了。
收到消息后,直接cout << buffer << endl,输出接收到的消息。
- void* sendMessage(void* args)
- {
- SockInfo* sockInfo = (SockInfo*)args;
- std::string message;
- while(true)
- {
- std::getline(std::cin, message);
-
- sendto(sockInfo->_sockfd, message.c_str(), message.size(),
- 0, (sockaddr*)&sockInfo->_sockaddr, sizeof(sockInfo->_sockaddr));
- }
- return nullptr;
- }
复制代码 同理,解析出参数包后,进入一个死循环。每轮循环等待用户输入一个消息,随后把这个消息发送给服务端,服务端会进行消息转发。
- void usage(char* argv[])
- {
- std::cout << "Usage:\n\t";
- std::cout << argv[0] << " server_ip server_port" << std::endl;
- }
- int main(int argc, char* argv[], char* env[])
- {
- if (argc != 3)
- {
- usage(argv);
- return 0;
- }
-
- // 主逻辑
-
- return 0;
- }
复制代码 主函数中,必要用户输入一个ip和一个port,表示客户端的地址和端口,如果输入错误,调用usage提示用户。
- // 解析地址与端口
- std::string ip = argv[1];
- uint16_t port = std::stoi(argv[2]);
- // 创建套接字
- int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
- // 初始化服务端信息
- struct sockaddr_in server;
- bzero(&server, sizeof(server));
- server.sin_family = AF_INET;
- server.sin_addr.s_addr = inet_addr(ip.c_str());
- server.sin_port = htons(port);
- // 创建线程
- SockInfo sockInfo(sockfd, server);
- pthread_t sender;
- pthread_t recver;
- pthread_create(&sender, 0, sendMessage, &sockInfo);
- pthread_create(&recver, 0, recvMessage, &sockInfo);
- pthread_join(sender, nullptr);
- pthread_join(recver, nullptr);
复制代码 起首创建套接字,然后初始化服务端的信息。在sockaddr_in server中就填充了服务端的地址与端口号。
客户端无需进行bind绑定,在第一次通过sendto发送消息时,操纵体系会自动为其分配一个端口号。
随后创建两个线程,分别实行sendMessage和recvMessage,进行消息的接收与发送。
此处有一个小细节,sockInfo不是new出来的,而是直接存储在栈区的变量。这个情况与之前有所差别,之前是因为router函数创建完线程后,就直接退出了。而此处的主函数不能退出,主函数退出整个进程都终止了,所以栈区中的数据会一直存在,不必要new。
最后在main函数中通过join等待两个线程。
测试
左上角是服务端,剩余三个终端是客户端。起首右上角的终端启动,发送了一个hello,随后服务端把hello返送回给了右上角的终端。因为之前写逻辑时,只有客户端发送一次消息,服务端才会把客户端加入到_online_users中,下面两个终端没有发消息,所以服务端不知道这两个客户端存在,也就没有转发消息。
随后左下角终端发送了iammike,这个消息被转发给了右上角的终端,因为右上角的终端已经在_online_users中。
发送一段时间消息后,右上角终端输入/quit,此时服务端将其删除。最后左下角终端发送iamlisa,此时右上角终端收不到该消息了,说明右上角终端已经成功退出。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |