媒介
多路转接(也称为多路复用)是一种用于管理多个I/O通道的技术,它能实现同时监听和处理多个I/O变乱,而不是为每个I/O通道创建单独的线程或进程。其中,select是实现多路转接的一种常用方法。
select()
select函数是系统提供的一个多路转接接口,用于让我们的程序同时监督多个文件描述符(file descriptor,简称fd)的状态变革,如读就绪、写就绪或异常。其函数原型如下:
- #include <sys/select.h>
- #include <sys/time.h>
- #include <unistd.h>
-
- int select(int nfds, fd_set *readfds, fd_set *writefds,
- fd_set *exceptfds, struct timeval *timeout);
复制代码 参数说明
- nfds:是文件描述符集合中最大文件描述符值加1。这个参数实际上被忽略,由于如今的系统不再需要它来确定文件描述符的范围。
- readfds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查察它们是否预备好被读取。
- writefds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查察它们是否预备好被写入。
- exceptfds:指向文件描述符集合的指针,该集合中的文件描述符被检查以查察是否有异常条件发生(例如,带外数据到达)。
- timeout:是一个指向 timeval 布局的指针,该布局指定了函数等待的最大时间长度。如果 timeout 是 NULL,则 select() 会无穷期地等待直到至少有一个文件描述符预备就绪。
返回值
- 乐成时,select() 返回预备就绪的文件描述符的总数(不包罗 exceptfds 中的文件描述符)。
- 如果在调用时没有任何文件描述符预备就绪,并且 timeout 非空且指定的时间已过,则返回 0。
- 如果出现错误,则返回 -1,并设置 errno 以指示错误原因。
timeval是一个用于表示时间的布局体:
- #include <sys/time.h>
-
- struct timeval {
- time_t tv_sec; // 秒
- suseconds_t tv_usec; // 微秒
- };
复制代码 tv_sec:秒数,从 Unix 纪元(1970 年 1 月 1 日 00:00:00 UTC)开始计算的秒数(时间戳)。
tv_usec:微秒数,范围从 0 到 999999。
fd_set
fd_set 实际上是一个位图(Bitmask)或位向量(Bitvector),它的每一位代表一个文件描述符。通过设置或清除位的方式,可以将文件描述符添加到或从 fd_set 中移除。
- typedef struct {
- __fd_mask fds_bits[__FD_SETSIZE/__NFDBITS];
- } fd_set;
复制代码 其中,__fd_mask 通常是一个整数类型(如 long),__FD_SETSIZE 定义了 fd_set 中可以包含的最大文件描述符数量(通常是 1024),__NFDBITS 是 __fd_mask 中包含的位数。
与此同时,系统还提供一些接口来操作fd_set:
- FD_ZERO(fd_set *set):将 fd_set 中的全部位清零,即将全部文件描述符从集合中移除。
- FD_SET(int fd, fd_set *set):将指定的文件描述符 fd 添加到 fd_set 中。
- FD_CLR(int fd, fd_set *set):将指定的文件描述符 fd 从 fd_set 中移除。
- FD_ISSET(int fd, fd_set *set):检查指定的文件描述符 fd 是否在 fd_set 中。在调用 select() 后,利用此宏来检测哪些文件描述符已预备就绪。
select的实行过程
对于实行过程,大致分为四个步骤:
1.初始化:
- 创建一个或多个fd_set布局,用于存储需要监督的文件描述符。
- 利用FD_ZERO宏清空这些fd_set布局。
- 利用FD_SET宏将需要监督的文件描述符添加到相应的fd_set布局中。
- 对于添加到位图中的变乱, 都会在位图表示成1
- fd_set rfds;
- FD_ZERO(&rfds);
- FD_SET(_fd_array[i], &rfds);
复制代码 2.调用select函数:
- 将最大的文件描述符值加1作为nfds参数。
- 将之前创建的fd_set布局作为readfds、writefds和exceptfds参数(如果需要监督相应的变乱)。
- 设置timeout参数以控制select的等待时间。
- 调用select函数。
- struct timeval timeout = {0, 0};//设置等待时间
- int n=select(max_fd+1,&rfds,nullptr,nullptr,&timeout);
复制代码 3.处理返回结果
- 此时颠末select函数的实行后,已经影响了fd_set 中的之前添加进来的文件描述符,只要没有就绪的话,那么都会被清空为0
- 如果select返回大于0的值,表示有文件描述符的变乱已经就绪。
- 利用for循环对每个变乱进行检查是否就绪,
- 利用FD_ISSET宏检查哪些文件描述符的变乱已经就绪。
- 根据就绪的文件描述符实行相应的操作,如读取数据、写入数据或处理异常变乱。
- if(n>0)
- {
- for (int i = 0; i <1024; i++)
- {
- if (FD_ISSET(_fd_array[i], &rfds))
- {
- //处理对应的就绪事件
- }
- }
- }
复制代码 4.重新设置并继承监督:
- 由于select函数会修改传入的fd_set布局,因此在每次调用select之前都需要重新设置这些布局。
- 根据需要更新timeout参数。
- 重复实行上述步骤以继承监督文件描述符的状态变革。
注意事项:
- 每次调用select之前都需要重新设置fd_set布局和timeout参数。
- select函数只负责等待文件描述符的状态变革,并不负责数据的拷贝。数据的拷贝需要利用如read、write等函数来完成。
- select函数有一个限定,即它能够监督的文件描述符数量是有限的,通常取决于fd_set布局的大小(在32位系统上通常为1024个)。
利用select()建立一个Server服务器
InetAddr.hpp
包含网络地址的头文件:
- #pragma once
- #include <iostream>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <arpa/inet.h>
- #include <netinet/in.h>
- class InetAddr
- {
- private:
- void GetAddress(std::string *ip, uint16_t *port)
- {
- *port = ntohs(_addr.sin_port);
- *ip = inet_ntoa(_addr.sin_addr);
- }
- public:
- InetAddr(const struct sockaddr_in &addr) : _addr(addr)
- {
- GetAddress(&_ip, &_port);
- }
- InetAddr(const std::string &ip, uint16_t port) : _ip(ip), _port(port)
- {
- _addr.sin_family = AF_INET;
- _addr.sin_port = htons(_port);
- _addr.sin_addr.s_addr = inet_addr(_ip.c_str());
- }
- InetAddr()
- {}
- std::string Ip()
- {
- return _ip;
- }
- bool operator == (const InetAddr &addr)
- {
- // if(_ip == addr._ip)
- if(_ip == addr._ip && _port == addr._port) // 方便测试
- {
- return true;
- }
- return false;
- }
- // bool operator = (const struct sockaddr_in &addr)
- // {
- // _addr = addr;
- // }
- struct sockaddr_in Addr()
- {
- return _addr;
- }
- uint16_t Port()
- {
- return _port;
- }
- ~InetAddr()
- {
- }
- private:
- struct sockaddr_in _addr;
- std::string _ip;
- uint16_t _port;
- };
复制代码 Log.hpp
打印日记的头文件:
- #pragma once
- #include<iostream>
- #include<fstream>
- #include<ctime>
- #include<cstdarg>
- #include<string>
- #include<sys/types.h>
- #include<unistd.h>
- #include<cstdio>
- #include"LockGuard.hpp"
- using namespace std;
- bool gIsSave=false;//默认输出到屏幕
- const string logname="log.txt";
- //1.日志是有等级的
- enum Level
- {
- DEBUG=0,
- INFO,
- WARNING,
- ERROR,
- FATAL
- };
- void SaveFile(const string& filename,const string& messages)
- {
- ofstream out(filename,ios::app);
- if(!out.is_open())
- {
- return;
- }
- out<<messages;
- out.close();
- }
- //等级转化为字符串
- string LevelToString(int level)
- {
- switch (level)
- {
- case DEBUG:
- return "Debug";
- case INFO:
- return "Info";
- case WARNING:
- return "Warning";
- case ERROR:
- return "Error";
- case FATAL:
- return "Fatal";
- default:
- return "Unkonwn";
- }
- }
- //获取当前时间
- string GetTimeString()
- {
- time_t curr_time=time(nullptr);//时间戳
- struct tm* format_time=localtime(&curr_time);//转化为时间结构
- if(format_time==nullptr)
- return "None";
- char time_buffer[1024];
- snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
- format_time->tm_year + 1900,
- format_time->tm_mon + 1,
- format_time->tm_mday,
- format_time->tm_hour,
- format_time->tm_min,
- format_time->tm_sec);
- return time_buffer;
- }
- pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
- //获取日志信息
- void LogMessage(string filename,int line,bool issave,int level,char* format,...)
- {
- string levelstr=LevelToString(level);
- string timestr=GetTimeString();
- pid_t selfid=getpid();
- char buffer[1024];
- va_list arg;
- va_start(arg,format);
- vsnprintf(buffer,sizeof(buffer),format,arg);
- va_end(arg);
- string message= "[" + timestr + "]" + "[" + levelstr + "]" +
- "[" + std::to_string(selfid) + "]" +
- "[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";
- LockGuard lockguard(&lock);
- if(!issave)
- {
- cout<<message;
- }
- else
- {
- SaveFile(logname,message);
- }
-
- }
- #define LOG(level,format,...) \
- do \
- { \
- LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__); \
- } while (0)
- #define EnableFile() \
- do \
- { \
- gIsSave=true; \
- } while (0)
- #define EnableScreen() \
- do \
- { \
- gIsSave=false; \
- } while (0)
-
复制代码 LockGuard.hpp
互斥锁的头文件:
- #ifndef __LOCK_GUARD_HPP__
- #define __LOCK_GUARD_HPP__
- #include <iostream>
- #include <pthread.h>
- class LockGuard
- {
- public:
- LockGuard(pthread_mutex_t* mutex):_mutex(mutex)
- {
- pthread_mutex_lock(_mutex);
- }
- ~LockGuard()
- {
- pthread_mutex_unlock(_mutex);
- }
- private:
- pthread_mutex_t *_mutex;
- };
- #endif
复制代码 Socket.hpp
包含一系列Socket套接字的接口函数,另有TcpSocket专门的接口:
- #include <iostream>
- #include <string>
- #include <functional>
- #include <sys/types.h> /* See NOTES */
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <unistd.h>
- #include <cstring>
- #include <pthread.h>
- #include <sys/types.h>
- #include <memory>
- #include "InetAddr.hpp"
- #include "Log.hpp"
- namespace socket_ns
- {
- class Socket;
- const static int gbacklog=8;
- using socket_sptr=std::shared_ptr<Socket>;//套接字指针
- enum
- {
- SOCKET_ERROR = 1,
- BIND_ERROR,
- LISTEN_ERROR,
- USAGE_ERROR
- };
- //在基类创建一系列虚函数,只要派生类能用到就在这里创建
- class Socket
- {
- public:
- virtual void CreateSocketOrDie() =0; //创建套接字
- virtual void BindSocketOrDie(InetAddr& addr) =0; //绑定套接字
- virtual void ListenSocketOrDie()=0; //监听套接字
- virtual int Accepter(InetAddr* addr) =0; //接受客户端
- virtual bool Connector(InetAddr &addr) = 0; //连接客户端
- virtual void SetSocketAddrReuse() = 0; // 重启指定端口
- virtual int SockFd() = 0; //获取Sockfd
- virtual int Recv(std::string *out) = 0; //接收对方信息
- virtual int Send(const std::string &in) = 0; //发送给对方信息
- public:
- //创建监听套接字,将一系列操作细分化,直接引用对应函数直接创建
- void BuildListenSocket(InetAddr& addr)
- {
- CreateSocketOrDie();
- SetSocketAddrReuse();
- BindSocketOrDie(addr);
- ListenSocketOrDie();
- }
- bool BuildClientSocket(InetAddr &addr)
- {
- CreateSocketOrDie();
- return Connector(addr);
- }
- };
- class TcpSocket : public Socket
- {
- public:
- TcpSocket(int sockfd=-1)
- :_sockfd(sockfd)
- {}
- void CreateSocketOrDie() override //override明确的重写基类函数
- {
- _sockfd=socket(AF_INET,SOCK_STREAM,0);
- if(_sockfd<0)
- {
- LOG(FATAL, "socket error");
- exit(SOCKET_ERROR);
- }
- LOG(DEBUG, "socket create success, sockfd is : %d\n", _sockfd);
- }
- void BindSocketOrDie(InetAddr& addr) override
- {
- struct sockaddr_in local;
- memset(&local, 0, sizeof(local));
- local.sin_family = AF_INET;
- local.sin_port = htons(addr.Port());
- local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());
- int n=bind(_sockfd,(struct sockaddr*)&local,sizeof(local));
- if (n < 0)
- {
- LOG(FATAL, "bind error\n");
- exit(BIND_ERROR);
- }
- LOG(DEBUG, "bind success, sockfd is : %d\n", _sockfd);
- }
- void ListenSocketOrDie() override
- {
- int n=listen(_sockfd,gbacklog);
- if (n < 0)
- {
- LOG(FATAL, "listen error\n");
- exit(LISTEN_ERROR);
- }
- LOG(DEBUG, "listen success, sockfd is : %d\n", _sockfd);
- }
- int Accepter(InetAddr* addr) override
- {
- struct sockaddr_in peer;
- socklen_t len=sizeof(peer);
- int sockfd = accept(_sockfd,(struct sockaddr*)&peer,&len);
- if (sockfd < 0)
- {
- LOG(WARNING, "accept error\n");
- return -1;
- }
- *addr=peer;
- return sockfd;
- }
- virtual bool Connector(InetAddr& addr)
- {
- struct sockaddr_in server;
- memset(&server,0,sizeof(server));
- server.sin_family=AF_INET;
- server.sin_addr.s_addr=inet_addr(addr.Ip().c_str());
- server.sin_port=htons(addr.Port());
- int n=connect(_sockfd,(struct sockaddr*)&server,sizeof(server));
- if (n < 0)
- {
- std::cerr << "connect error" << std::endl;
- return false;
- }
- return true;
- }
- void SetSocketAddrReuse() override
- {
- int opt = 1;
- setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); //快速重启端口
- }
- int Recv(std::string *out) override
- {
- char inbuffer[1024];
- ssize_t n = recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0);
- if (n > 0)
- {
- inbuffer[n] = 0;
- *out += inbuffer; // 接收次数可能不只一次,一般是多次的,
-
- }
- return n;
- }
- int Send(const std::string &in) override
- {
- int n = send(_sockfd,in.c_str(),in.size(),0);
- return n;
- }
- int SockFd() override
- {
- return _sockfd;
- }
- ~TcpSocket()
- {}
- private:
- int _sockfd;
- };
- }
复制代码 SelectServer.hpp
- #pragma once
- #include <iostream>
- #include <string>
- #include <memory>
- #include "Socket.hpp"
- using namespace socket_ns;
- //select服务器要正确的编写,需要借助一个第三方数组来完成,保存合法的,所有的fd到数组中,方便后期批量化统一添加
- class SelectServer
- {
- const static int defaultfd = -1; //默认sockfd
- const static int N = sizeof(fd_set) * 8; //监视文件描述符的最大值
- public:
- SelectServer(uint16_t port)
- :_port(port),
- _listensock(make_unique<TcpSocket>())
- {
- InetAddr addr("0", _port); //网络地址初始化
- _listensock->BuildListenSocket(addr);//创建监听套接字
- //初始化辅助数组
- for (int i = 0; i < N; i++)
- {
- _fd_array[i] = defaultfd;
- }
- _fd_array[0] = _listensock->SockFd();
- }
- void AcceptClient()
- {
- // 我们今天只关心了读,而读有:listensock 和 normal sockfd
- InetAddr clientaddr;
- int sockfd = _listensock->Accepter(&clientaddr); // 这里调用accept会不会阻塞呢??不会。因为事件已经就绪了
- if (sockfd < 0)
- return;
- LOG(DEBUG, "Get new Link, sockfd: %d, client info %s:%d\n", sockfd, clientaddr.Ip().c_str(), clientaddr.Port());
- //select托管(监视状态):将新fd放入辅助数组中
- int pos = 1;
- for (; pos < N; pos++)
- {
- if (_fd_array[pos] == defaultfd)
- break;
- }//让pos到辅助数组的空缺位置
- if (pos == N)//说明监视的文件描述符满了
- {
- ::close(sockfd); // sockfd->Close();
- LOG(WARNING, "server is full!\n");
- return;
- }
- else
- {
- _fd_array[pos] = sockfd;
- LOG(DEBUG, "%d add to select array!\n", sockfd);
- }
- LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
- }
- void ServiceIO(int pos)
- {
- char buffer[1024];
- ssize_t n = ::recv(_fd_array[pos], buffer, sizeof(buffer) - 1, 0); // 这里读取会不会被阻塞?不会
- if (n > 0)//处理接收数据
- {
- buffer[n] = 0;
- std::cout << "client say# " << buffer << std::endl;
- std::string echo_string = "[server echo]# ";
- echo_string += buffer;
- ::send(_fd_array[pos], echo_string.c_str(), echo_string.size(), 0);//返回给客户端
- }
- else if (n == 0)//说明对方已断开连接
- {
- LOG(DEBUG, "%d is closed\n", _fd_array[pos]);
- ::close(_fd_array[pos]);
- _fd_array[pos] = defaultfd;
- LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
- }
- else//出现错误
- {
- LOG(DEBUG, "%d recv error\n", _fd_array[pos]);
- ::close(_fd_array[pos]);
- _fd_array[pos] = defaultfd;
- LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
- }
- }
- //处理准备就绪的事件
- void HandlerEvent(fd_set &rfds)
- {
- for (int i = 0; i < N; i++)
- {
- if (_fd_array[i] == defaultfd)
- continue;
- if (FD_ISSET(_fd_array[i], &rfds))
- {
- if (_fd_array[i] == _listensock->SockFd())//新的连接
- {
- AcceptClient();
- }
- else
- {
- // 普通的sockfd读事件就绪
- ServiceIO(i);
- }
- }
- }
- }
-
- void Loop()
- {
- while(true)
- {
- //监听套接字在等待对方发送连接
- //新的连接 == 读事件就绪
- //要将listensock添加到select中!
- fd_set rfds; //一个记录文件描述符状态的集合
- FD_ZERO(&rfds);//将所有文件描述符移除集合
- int max_fd = defaultfd;//最大的文件描述符值
- for (int i = 0; i < N; i++)
- {
- if (_fd_array[i] == defaultfd)
- continue;
- FD_SET(_fd_array[i], &rfds); // 将所有合法的fd添加到rfds中
- if (max_fd < _fd_array[i])
- {
- max_fd = _fd_array[i]; // 更新出最大的fd的值
- }
- }
- struct timeval timeout = {0, 0};//设置等待时间
- int n=select(max_fd+1,&rfds,nullptr,nullptr,nullptr);
- //timeout 是 NULL,则 select() 会无限期地等待直到至少有一个文件描述符准备就绪
- switch (n)
- {
- case 0://指定时间内没有任何文件描述符准备就绪
- LOG(INFO, "timeout, %d.%d\n", timeout.tv_sec, timeout.tv_usec);
- break;
- case -1://出现错误
- LOG(ERROR, "select error...\n");
- break;
- default://成功状态
- LOG(DEBUG, "Event Happen. n : %d\n", n); // 底层有一个事件就绪,select为什么会一直通知我?因为:我们没有处理!
- HandlerEvent(rfds);
- break;
- }
- }
- }
- //打印出已存在的fd
- std::string RfdsToString()
- {
- std::string fdstr;
- for (int i = 0; i < N; i++)
- {
- if (_fd_array[i] == defaultfd)
- continue;
- fdstr += std::to_string(_fd_array[i]);
- fdstr += " ";
- }
- return fdstr;
- }
- ~SelectServer()
- {
- }
- private:
- uint16_t _port; //端口号
- std::unique_ptr<Socket> _listensock;//监听socket
- int _fd_array[N]; // 辅助数组
- };
复制代码 成员:

辅助数组:由上面select实行过程可以知道,当select实行后,会对fd_set的文件描述符产生影响,所以为了能够在循环中多次调用select函数,就需要一个数组来进行对文件描述符的保存;
初始化:

对于辅助数组来说,只要没有新的fd进来,那么文件描述符将保持为负;而对于Select来说,监听fd就是第一个变乱;所以要在初始化就添加进来;
Loop:
这里操作流程就是跟上面的实行过程是一样的,只是增长了一些细节:

N是1024,指的是Select能存储的最大fd的数目;我们需要让select监听我们想要监听的变乱,就需要通过循环来一个一个添加到rfds中;
select()返回值:

根据select()的返回值实行差别的代码;
这里所说的底层变乱就绪,如果没有处理已就绪的变乱,那么select就会一直监测到变乱就绪,一直实行default语句的内容;
HandlerEvent():

在for循环内里通过FD_ISSET函数找出每个已经就绪的变乱,然后再判断是不是监听变乱的;
Main.cc
- #include "SelectSever.hpp"
- #include <memory>
- // ./selectserver port
- int main(int argc, char *argv[])
- {
- if (argc != 2)
- {
- std::cout << "Usage: " << argv[0] << " port" << std::endl;
- return 0;
- }
- uint16_t port = std::stoi(argv[1]);//获取端口号
- EnableScreen();
- std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(port);
- svr->Loop();
- return 0;
- }
复制代码 通过telnet进行测试:
开启Server服务:
telnet进行访问
Server的监听socket收到就绪的变乱,创建一个新的IO服务客户端:
客户端任意发送内容:
客户端服务端都能通过Select的底层就绪互相接收发送:
Select()的优缺点
优点
- 多路复用:select()函数能够同时监督多个文件描述符,实现I/O多路复用,从而提高了程序的并发处理本领和资源利用率。
- 简朴易用:select()函数的接口相对简朴,易于理解和利用,特别是对于初学者来说。
- 灵活性:select()函数允许程序根据文件描述符的读、写、异常等变乱进行灵活的处理,满足差别的I/O需求。
缺点
- 文件描述符数量的限定:select()函数能够监督的文件描述符数量有限,通常在Linux上默以为1024个(只管可以通过修改宏定义或重新编译内核来提升这一限定,但这样做可能会低落效率)。这对于需要监督大量文件描述符的应用程序来说是一个显著的限定。
- 性能瓶颈:当监督的文件描述符数量较多时,select()函数的性能可能会成为瓶颈。由于每次调用select()时,内核都需要扫描全部被监督的文件描述符,这会导致不须要的开销。
- 内核拷贝开销:在select()调用过程中,由于每次都要事先准本fd_set布局内容,内核与用户空间之间需要进行内存拷贝操作,以传递文件描述符集合和结果。这会增长额外的开销,并可能影响性能。
因此,在选择是否利用select()函数时,需要根据具体的应用场景和需求进行衡量。对于需要监督大量文件描述符或追求高性能的应用程序来说,可能需要思量利用更高级的I/O多路复用机制,如poll()或epoll()等。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |