IT评测·应用市场-qidao123.com

标题: Linux--多路转接之select [打印本页]

作者: 万有斥力    时间: 2024-10-14 06:07
标题: Linux--多路转接之select
媒介

多路转接(也称为多路复用)是一种用于管理多个I/O通道的技术,它能实现同时监听和处理多个I/O变乱,而不是为每个I/O通道创建单独的线程或进程。其中,select是实现多路转接的一种常用方法。
select()

select函数是系统提供的一个多路转接接口,用于让我们的程序同时监督多个文件描述符(file descriptor,简称fd)的状态变革,如读就绪、写就绪或异常。其函数原型如下:
  1. #include <sys/select.h>  
  2. #include <sys/time.h>  
  3. #include <unistd.h>  
  4.   
  5. int select(int nfds, fd_set *readfds, fd_set *writefds,  
  6.            fd_set *exceptfds, struct timeval *timeout);
复制代码
  参数说明
  
    返回值
  
    timeval是一个用于表示时间的布局体:
  1. #include <sys/time.h>  
  2. struct timeval {  
  3.    time_t tv_sec;  // 秒  
  4.    suseconds_t tv_usec;  // 微秒  
  5. };
复制代码
tv_sec:秒数,从 Unix 纪元(1970 年 1 月 1 日 00:00:00 UTC)开始计算的秒数(时间戳)。
tv_usec:微秒数,范围从 0 到 999999。
  fd_set

fd_set 实际上是一个位图(Bitmask)或位向量(Bitvector),它的每一位代表一个文件描述符。通过设置或清除位的方式,可以将文件描述符添加到或从 fd_set 中移除。
  1. typedef struct {  
  2.     __fd_mask fds_bits[__FD_SETSIZE/__NFDBITS];  
  3. } fd_set;
复制代码
其中,__fd_mask 通常是一个整数类型(如 long),__FD_SETSIZE 定义了 fd_set 中可以包含的最大文件描述符数量(通常是 1024),__NFDBITS 是 __fd_mask 中包含的位数。
   与此同时,系统还提供一些接口来操作fd_set:
  
  select的实行过程

对于实行过程,大致分为四个步骤:
   1.初始化
  
  1. fd_set rfds;
  2. FD_ZERO(&rfds);
  3. FD_SET(_fd_array[i], &rfds);
复制代码
   2.调用select函数
  
  1. struct timeval timeout = {0, 0};//设置等待时间
  2. int n=select(max_fd+1,&rfds,nullptr,nullptr,&timeout);
复制代码
   3.处理返回结果
  
  1. if(n>0)
  2. {
  3.          for (int i = 0; i <1024; i++)
  4.      {   
  5.              if (FD_ISSET(_fd_array[i], &rfds))
  6.                    {
  7.                     //处理对应的就绪事件
  8.                   }
  9.          }
  10. }
复制代码
   4.重新设置并继承监督
  
  1. while(1)
  2. {
  3.         //包含以上内容
  4. }
复制代码
   注意事项
  
  利用select()建立一个Server服务器

InetAddr.hpp

包含网络地址的头文件:
  1. #pragma once
  2. #include <iostream>
  3. #include <sys/types.h>
  4. #include <sys/socket.h>
  5. #include <arpa/inet.h>
  6. #include <netinet/in.h>
  7. class InetAddr
  8. {
  9. private:
  10.     void GetAddress(std::string *ip, uint16_t *port)
  11.     {
  12.         *port = ntohs(_addr.sin_port);
  13.         *ip = inet_ntoa(_addr.sin_addr);
  14.     }
  15. public:
  16.     InetAddr(const struct sockaddr_in &addr) : _addr(addr)
  17.     {
  18.         GetAddress(&_ip, &_port);
  19.     }
  20.     InetAddr(const std::string &ip, uint16_t port) : _ip(ip), _port(port)
  21.     {
  22.         _addr.sin_family = AF_INET;
  23.         _addr.sin_port = htons(_port);
  24.         _addr.sin_addr.s_addr = inet_addr(_ip.c_str());
  25.     }
  26.     InetAddr()
  27.     {}
  28.     std::string Ip()
  29.     {
  30.         return _ip;
  31.     }
  32.     bool operator == (const InetAddr &addr)
  33.     {
  34.         // if(_ip == addr._ip)
  35.         if(_ip == addr._ip && _port == addr._port) // 方便测试
  36.         {
  37.             return true;
  38.         }
  39.         return false;
  40.     }
  41.     // bool operator = (const struct sockaddr_in &addr)
  42.     // {
  43.     //     _addr = addr;
  44.     // }
  45.     struct sockaddr_in Addr()
  46.     {
  47.         return _addr;
  48.     }
  49.     uint16_t Port()
  50.     {
  51.         return _port;
  52.     }
  53.     ~InetAddr()
  54.     {
  55.     }
  56. private:
  57.     struct sockaddr_in _addr;
  58.     std::string _ip;
  59.     uint16_t _port;
  60. };
复制代码
Log.hpp

打印日记的头文件:
  1. #pragma once
  2. #include<iostream>
  3. #include<fstream>
  4. #include<ctime>
  5. #include<cstdarg>
  6. #include<string>
  7. #include<sys/types.h>
  8. #include<unistd.h>
  9. #include<cstdio>
  10. #include"LockGuard.hpp"
  11. using namespace std;
  12. bool gIsSave=false;//默认输出到屏幕
  13. const string logname="log.txt";
  14. //1.日志是有等级的
  15. enum Level
  16. {
  17.     DEBUG=0,
  18.     INFO,
  19.     WARNING,
  20.     ERROR,
  21.     FATAL
  22. };
  23. void SaveFile(const string& filename,const string& messages)
  24. {
  25.     ofstream out(filename,ios::app);
  26.     if(!out.is_open())
  27.     {
  28.         return;
  29.     }
  30.     out<<messages;
  31.     out.close();
  32. }
  33. //等级转化为字符串
  34. string LevelToString(int level)
  35. {
  36.     switch (level)
  37.     {
  38.     case DEBUG:
  39.         return "Debug";
  40.     case INFO:
  41.         return "Info";
  42.     case WARNING:
  43.         return "Warning";
  44.     case ERROR:
  45.         return "Error";
  46.     case FATAL:
  47.         return "Fatal";
  48.     default:
  49.         return "Unkonwn";
  50.     }
  51. }
  52. //获取当前时间
  53. string GetTimeString()
  54. {
  55.     time_t curr_time=time(nullptr);//时间戳
  56.     struct tm* format_time=localtime(&curr_time);//转化为时间结构
  57.     if(format_time==nullptr)
  58.         return "None";
  59.     char time_buffer[1024];
  60.     snprintf(time_buffer,sizeof(time_buffer),"%d-%d-%d %d:%d:%d",
  61.              format_time->tm_year + 1900,
  62.              format_time->tm_mon + 1,
  63.              format_time->tm_mday,
  64.              format_time->tm_hour,
  65.              format_time->tm_min,
  66.              format_time->tm_sec);
  67.     return time_buffer;
  68. }
  69. pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
  70. //获取日志信息
  71. void LogMessage(string filename,int line,bool issave,int level,char* format,...)
  72. {
  73.     string levelstr=LevelToString(level);
  74.     string timestr=GetTimeString();
  75.     pid_t selfid=getpid();
  76.     char buffer[1024];
  77.     va_list arg;
  78.     va_start(arg,format);
  79.     vsnprintf(buffer,sizeof(buffer),format,arg);
  80.     va_end(arg);
  81.     string message= "[" + timestr + "]" + "[" + levelstr + "]" +
  82.                           "[" + std::to_string(selfid) + "]" +
  83.                           "[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";
  84.     LockGuard lockguard(&lock);
  85.     if(!issave)
  86.     {
  87.         cout<<message;
  88.     }
  89.     else
  90.     {
  91.         SaveFile(logname,message);
  92.     }
  93.                                                         
  94. }
  95. #define LOG(level,format,...)                                               \
  96.     do                                                                        \
  97.     {                                                                          \
  98.         LogMessage(__FILE__,__LINE__,gIsSave,level,format,##__VA_ARGS__);       \
  99.     } while (0)
  100. #define EnableFile()         \
  101.     do                       \
  102.     {                        \
  103.         gIsSave=true;        \  
  104.     } while (0)
  105. #define EnableScreen()         \
  106.     do                         \
  107.     {                          \
  108.         gIsSave=false;         \  
  109.     } while (0)
  110.    
复制代码
LockGuard.hpp

互斥锁的头文件:
  1. #ifndef __LOCK_GUARD_HPP__
  2. #define __LOCK_GUARD_HPP__
  3. #include <iostream>
  4. #include <pthread.h>
  5. class LockGuard
  6. {
  7. public:
  8.     LockGuard(pthread_mutex_t* mutex):_mutex(mutex)
  9.     {
  10.         pthread_mutex_lock(_mutex);
  11.     }
  12.     ~LockGuard()
  13.     {
  14.         pthread_mutex_unlock(_mutex);
  15.     }
  16. private:
  17.     pthread_mutex_t *_mutex;
  18. };
  19. #endif
复制代码
Socket.hpp

包含一系列Socket套接字的接口函数,另有TcpSocket专门的接口:
  1. #include <iostream>
  2. #include <string>
  3. #include <functional>
  4. #include <sys/types.h> /* See NOTES */
  5. #include <sys/socket.h>
  6. #include <netinet/in.h>
  7. #include <arpa/inet.h>
  8. #include <unistd.h>
  9. #include <cstring>
  10. #include <pthread.h>
  11. #include <sys/types.h>
  12. #include <memory>
  13. #include "InetAddr.hpp"
  14. #include "Log.hpp"
  15. namespace socket_ns
  16. {
  17.     class Socket;
  18.     const static int gbacklog=8;
  19.     using socket_sptr=std::shared_ptr<Socket>;//套接字指针
  20.     enum
  21.     {
  22.         SOCKET_ERROR = 1,
  23.         BIND_ERROR,
  24.         LISTEN_ERROR,
  25.         USAGE_ERROR
  26.     };
  27.     //在基类创建一系列虚函数,只要派生类能用到就在这里创建
  28.     class Socket
  29.     {
  30.     public:
  31.         virtual void CreateSocketOrDie() =0; //创建套接字
  32.         virtual void BindSocketOrDie(InetAddr& addr) =0;  //绑定套接字
  33.         virtual void ListenSocketOrDie()=0; //监听套接字
  34.         virtual int Accepter(InetAddr* addr) =0; //接受客户端
  35.         virtual bool Connector(InetAddr &addr) = 0; //连接客户端
  36.         virtual void SetSocketAddrReuse() = 0; // 重启指定端口
  37.         virtual int SockFd() = 0; //获取Sockfd
  38.         virtual int Recv(std::string *out) = 0; //接收对方信息
  39.         virtual int Send(const std::string &in) = 0; //发送给对方信息
  40.     public:
  41.         //创建监听套接字,将一系列操作细分化,直接引用对应函数直接创建
  42.         void BuildListenSocket(InetAddr& addr)
  43.         {
  44.             CreateSocketOrDie();
  45.             SetSocketAddrReuse();
  46.             BindSocketOrDie(addr);
  47.             ListenSocketOrDie();
  48.         }
  49.         bool BuildClientSocket(InetAddr &addr)
  50.         {
  51.             CreateSocketOrDie();
  52.             return Connector(addr);
  53.         }
  54.     };
  55.     class TcpSocket : public Socket
  56.     {
  57.     public:
  58.         TcpSocket(int sockfd=-1)
  59.         :_sockfd(sockfd)
  60.         {}
  61.         void CreateSocketOrDie() override  //override明确的重写基类函数
  62.         {
  63.             _sockfd=socket(AF_INET,SOCK_STREAM,0);
  64.             if(_sockfd<0)
  65.             {
  66.                 LOG(FATAL, "socket error");
  67.                 exit(SOCKET_ERROR);
  68.             }
  69.             LOG(DEBUG, "socket create success, sockfd is : %d\n", _sockfd);
  70.         }
  71.         void BindSocketOrDie(InetAddr& addr) override
  72.         {
  73.             struct sockaddr_in local;
  74.             memset(&local, 0, sizeof(local));
  75.             local.sin_family = AF_INET;
  76.             local.sin_port = htons(addr.Port());
  77.             local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());
  78.             int n=bind(_sockfd,(struct sockaddr*)&local,sizeof(local));
  79.             if (n < 0)
  80.             {
  81.                 LOG(FATAL, "bind error\n");
  82.                 exit(BIND_ERROR);
  83.             }
  84.             LOG(DEBUG, "bind success, sockfd is : %d\n", _sockfd);
  85.         }
  86.         void ListenSocketOrDie() override
  87.         {
  88.             int n=listen(_sockfd,gbacklog);
  89.             if (n < 0)
  90.             {
  91.                 LOG(FATAL, "listen error\n");
  92.                 exit(LISTEN_ERROR);
  93.             }
  94.             LOG(DEBUG, "listen success, sockfd is : %d\n", _sockfd);
  95.         }
  96.         int Accepter(InetAddr* addr) override
  97.         {
  98.             struct sockaddr_in peer;
  99.             socklen_t len=sizeof(peer);
  100.             int sockfd = accept(_sockfd,(struct sockaddr*)&peer,&len);
  101.             if (sockfd < 0)
  102.             {
  103.                 LOG(WARNING, "accept error\n");
  104.                 return -1;
  105.             }
  106.             *addr=peer;
  107.             return sockfd;
  108.         }
  109.         virtual bool Connector(InetAddr& addr)
  110.         {
  111.             struct sockaddr_in server;
  112.             memset(&server,0,sizeof(server));
  113.             server.sin_family=AF_INET;
  114.             server.sin_addr.s_addr=inet_addr(addr.Ip().c_str());
  115.             server.sin_port=htons(addr.Port());
  116.             int n=connect(_sockfd,(struct sockaddr*)&server,sizeof(server));
  117.             if (n < 0)
  118.             {
  119.                 std::cerr << "connect error" << std::endl;
  120.                 return false;
  121.             }
  122.             return true;
  123.         }
  124.         void SetSocketAddrReuse() override
  125.         {
  126.             int opt = 1;
  127.             setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); //快速重启端口
  128.         }
  129.         int Recv(std::string *out) override
  130.         {
  131.             char inbuffer[1024];
  132.             ssize_t n = recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0);
  133.             if (n > 0)
  134.             {
  135.                 inbuffer[n] = 0;
  136.                 *out += inbuffer; // 接收次数可能不只一次,一般是多次的,
  137.                
  138.             }
  139.             return n;
  140.         }
  141.         int Send(const std::string &in) override
  142.         {
  143.             int n = send(_sockfd,in.c_str(),in.size(),0);
  144.             return n;
  145.         }
  146.         int SockFd() override
  147.         {
  148.             return _sockfd;
  149.         }
  150.         ~TcpSocket()
  151.         {}
  152.     private:
  153.         int _sockfd;
  154.     };
  155. }
复制代码
SelectServer.hpp

  1. #pragma once
  2. #include <iostream>
  3. #include <string>
  4. #include <memory>
  5. #include "Socket.hpp"
  6. using namespace socket_ns;
  7. //select服务器要正确的编写,需要借助一个第三方数组来完成,保存合法的,所有的fd到数组中,方便后期批量化统一添加
  8. class SelectServer
  9. {
  10.     const static int defaultfd = -1; //默认sockfd
  11.     const static int N = sizeof(fd_set) * 8; //监视文件描述符的最大值
  12. public:
  13.     SelectServer(uint16_t port)
  14.     :_port(port),
  15.     _listensock(make_unique<TcpSocket>())
  16.     {
  17.         InetAddr addr("0", _port); //网络地址初始化
  18.         _listensock->BuildListenSocket(addr);//创建监听套接字
  19.         //初始化辅助数组
  20.         for (int i = 0; i < N; i++)
  21.         {
  22.             _fd_array[i] = defaultfd;
  23.         }
  24.         _fd_array[0] = _listensock->SockFd();
  25.     }
  26.     void AcceptClient()
  27.     {
  28.         // 我们今天只关心了读,而读有:listensock 和 normal sockfd
  29.         InetAddr clientaddr;
  30.         int sockfd = _listensock->Accepter(&clientaddr); // 这里调用accept会不会阻塞呢??不会。因为事件已经就绪了
  31.         if (sockfd < 0)
  32.             return;
  33.         LOG(DEBUG, "Get new Link, sockfd: %d, client info %s:%d\n", sockfd, clientaddr.Ip().c_str(), clientaddr.Port());
  34.         //select托管(监视状态):将新fd放入辅助数组中
  35.         int pos = 1;
  36.         for (; pos < N; pos++)
  37.         {
  38.             if (_fd_array[pos] == defaultfd)
  39.                 break;
  40.         }//让pos到辅助数组的空缺位置
  41.         if (pos == N)//说明监视的文件描述符满了
  42.         {
  43.             ::close(sockfd); // sockfd->Close();
  44.             LOG(WARNING, "server is full!\n");
  45.             return;
  46.         }
  47.         else
  48.         {
  49.             _fd_array[pos] = sockfd;
  50.             LOG(DEBUG, "%d add to select array!\n", sockfd);
  51.         }
  52.         LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
  53.     }
  54.     void ServiceIO(int pos)
  55.     {
  56.         char buffer[1024];
  57.         ssize_t n = ::recv(_fd_array[pos], buffer, sizeof(buffer) - 1, 0); // 这里读取会不会被阻塞?不会
  58.         if (n > 0)//处理接收数据
  59.         {
  60.             buffer[n] = 0;
  61.             std::cout << "client say# " << buffer << std::endl;
  62.             std::string echo_string = "[server echo]# ";
  63.             echo_string += buffer;
  64.             ::send(_fd_array[pos], echo_string.c_str(), echo_string.size(), 0);//返回给客户端
  65.         }
  66.         else if (n == 0)//说明对方已断开连接
  67.         {
  68.             LOG(DEBUG, "%d is closed\n", _fd_array[pos]);
  69.             ::close(_fd_array[pos]);
  70.             _fd_array[pos] = defaultfd;
  71.             LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
  72.         }
  73.         else//出现错误
  74.         {
  75.             LOG(DEBUG, "%d recv error\n", _fd_array[pos]);
  76.             ::close(_fd_array[pos]);
  77.             _fd_array[pos] = defaultfd;
  78.             LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
  79.         }
  80.     }
  81.     //处理准备就绪的事件
  82.     void HandlerEvent(fd_set &rfds)
  83.     {
  84.          for (int i = 0; i < N; i++)
  85.         {
  86.             if (_fd_array[i] == defaultfd)
  87.                 continue;
  88.             if (FD_ISSET(_fd_array[i], &rfds))
  89.             {
  90.                 if (_fd_array[i] == _listensock->SockFd())//新的连接
  91.                 {
  92.                     AcceptClient();
  93.                 }
  94.                 else
  95.                 {
  96.                     // 普通的sockfd读事件就绪
  97.                     ServiceIO(i);
  98.                 }
  99.             }
  100.         }
  101.     }
  102.      
  103.     void Loop()
  104.     {
  105.         while(true)
  106.         {
  107.             //监听套接字在等待对方发送连接
  108.             //新的连接 == 读事件就绪
  109.             //要将listensock添加到select中!
  110.             fd_set rfds; //一个记录文件描述符状态的集合
  111.             FD_ZERO(&rfds);//将所有文件描述符移除集合
  112.             int max_fd = defaultfd;//最大的文件描述符值
  113.             for (int i = 0; i < N; i++)
  114.             {
  115.                 if (_fd_array[i] == defaultfd)
  116.                     continue;
  117.                 FD_SET(_fd_array[i], &rfds); // 将所有合法的fd添加到rfds中
  118.                 if (max_fd < _fd_array[i])
  119.                 {
  120.                     max_fd = _fd_array[i]; // 更新出最大的fd的值
  121.                 }
  122.             }
  123.             struct timeval timeout = {0, 0};//设置等待时间
  124.             int n=select(max_fd+1,&rfds,nullptr,nullptr,nullptr);
  125.             //timeout 是 NULL,则 select() 会无限期地等待直到至少有一个文件描述符准备就绪
  126.             switch (n)
  127.             {
  128.             case 0://指定时间内没有任何文件描述符准备就绪
  129.                 LOG(INFO, "timeout, %d.%d\n", timeout.tv_sec, timeout.tv_usec);
  130.                 break;
  131.             case -1://出现错误
  132.                 LOG(ERROR, "select error...\n");
  133.                 break;
  134.             default://成功状态
  135.                 LOG(DEBUG, "Event Happen. n : %d\n", n); // 底层有一个事件就绪,select为什么会一直通知我?因为:我们没有处理!
  136.                 HandlerEvent(rfds);
  137.                 break;
  138.             }
  139.         }
  140.     }
  141.     //打印出已存在的fd
  142.     std::string RfdsToString()
  143.     {
  144.         std::string fdstr;
  145.         for (int i = 0; i < N; i++)
  146.         {
  147.             if (_fd_array[i] == defaultfd)
  148.                 continue;
  149.             fdstr += std::to_string(_fd_array[i]);
  150.             fdstr += " ";
  151.         }
  152.         return fdstr;
  153.     }
  154.     ~SelectServer()
  155.     {
  156.     }
  157. private:
  158.     uint16_t _port; //端口号
  159.     std::unique_ptr<Socket> _listensock;//监听socket
  160.     int _fd_array[N]; // 辅助数组
  161. };
复制代码
  成员:

辅助数组:由上面select实行过程可以知道,当select实行后,会对fd_set的文件描述符产生影响,所以为了能够在循环中多次调用select函数,就需要一个数组来进行对文件描述符的保存;
    初始化:

对于辅助数组来说,只要没有新的fd进来,那么文件描述符将保持为负;而对于Select来说,监听fd就是第一个变乱;所以要在初始化就添加进来;
    Loop:
这里操作流程就是跟上面的实行过程是一样的,只是增长了一些细节:

N是1024,指的是Select能存储的最大fd的数目;我们需要让select监听我们想要监听的变乱,就需要通过循环来一个一个添加到rfds中;
    select()返回值:

根据select()的返回值实行差别的代码;
这里所说的底层变乱就绪,如果没有处理已就绪的变乱,那么select就会一直监测到变乱就绪,一直实行default语句的内容;
    HandlerEvent():

在for循环内里通过FD_ISSET函数找出每个已经就绪的变乱,然后再判断是不是监听变乱的;
  Main.cc

  1. #include "SelectSever.hpp"
  2. #include <memory>
  3. // ./selectserver port
  4. int main(int argc, char *argv[])
  5. {
  6.     if (argc != 2)
  7.     {
  8.         std::cout << "Usage: " << argv[0] << " port" << std::endl;
  9.         return 0;
  10.     }
  11.     uint16_t port = std::stoi(argv[1]);//获取端口号
  12.     EnableScreen();
  13.     std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(port);
  14.     svr->Loop();
  15.     return 0;
  16. }
复制代码
通过telnet进行测试:
   开启Server服务:

    telnet进行访问

    Server的监听socket收到就绪的变乱,创建一个新的IO服务客户端:

    客户端任意发送内容:

客户端服务端都能通过Select的底层就绪互相接收发送:

  Select()的优缺点

优点


缺点


因此,在选择是否利用select()函数时,需要根据具体的应用场景和需求进行衡量。对于需要监督大量文件描述符或追求高性能的应用程序来说,可能需要思量利用更高级的I/O多路复用机制,如poll()或epoll()等。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4