【项目日志】仿mudou的高并发服务器 --- 实现缓冲区模块,通用范例Any模块 ...

打印 上一主题 下一主题

主题 866|帖子 866|积分 2598


   一个人知道本身为什么而活,      就可以忍受任何一种生活。        --- 尼采 ---       
   ✨✨✨项目地点在这里 ✨✨✨
  ✨✨✨https://gitee.com/penggli_2_0/TcpServer✨✨✨
  

  
1 主从Reactor模子

这个项目标目标是实现一个可以高效处理哀求的服务器,那么对于如许的一个服务器要如何实现呢?
这里接纳主从Reactor模子的方案:

  • 主Reactor模子负责对监听套接字进行管理,进行获取连接操作。
  • 附属Reactor负责对连接的数据进行处理,而且为了保证线程安全,每一个附属Reactor都要绑定一个线程,这个连接的使命都在这个线程中完成。
简单来说就是如许的一个模子:

此中管理连接的对象是Connection对象,这个类是高并发服务器的焦点部门,这里包含了对套接字的处理,事件等待等一系列操作!而Connection对象中对于这些操作的管理也要通过其他的对象来进行!
主要的工作就是搭建起主从Reactor模子,但是这个模子不是一下子就可以写出来的。为了实现主从Reactor模子,我们先需要实现一些基础类,对基础功能进封装,然后对这些功能进行整合,最终实现主从Reactor模子
2 基础功能封装

2.1 缓冲区 Buffer模块

Buffer模块是⼀个缓冲区模块,用于实现通信中用户态的接收缓冲区和发送缓冲区功能:

  • 需要支持字符串读取/写入
  • 需要支持char*缓冲区读取/写入
  • 需要正常按行读取 — 方便解析http哀求
Buffer模块成员变量很简单:


  • vector<char>容器_buffer: 对内存空间进行管理
  • uint64_t _reader_idx 读偏移:进行读取位置在_buffer容器中的偏移量,即读取的起始位置。
  • uint64_t _writer_idx 写偏移:下一次写入位置在_buffer容器中的偏移量,即写入的起始位置。
接下来实现一下基础功能:

  • 构造函数:初始化读/写偏移为0 ,容器_buffer初始化一个巨细 BUFFER_DEFAULT_SIZE。
  • 获取当前写入起始地点:_buffer空间的起始地点加入写偏移量即写入起始地点。
  • 获取当前读取起始地点: _buffer空间的起始地点加入读偏移量即读取起始地点。
  • 获取缓冲区末尾空闲空间巨细:写偏移之后的空闲空间 ,总体空间巨细减去写偏移就是写偏移之后的空间巨细。
  • 获取缓冲区起始空闲空间巨细:读偏移之前的空闲空间,其实就是读偏移的巨细。
  • 获取可读数据巨细:写偏移减去读偏移就就之间可读空间的巨细!
  • 读/写偏移向后移动
    * 先根据len判断是否小于可读数据巨细 len必须小于可读数据巨细,然后移动读偏移。
    * 向后移动必须小于当前后边的空闲空间巨细,写入数据必须小于缓冲区剩余空间巨细,不敷就进行扩容!
  • 确保可写空间足够 :
    * 末尾空闲空间足够 直接返回。
    * 末尾空间不敷,但算上起始位置的空闲空间巨细足够 ,将数据移动到起始位置。
    * 如果总空间不敷 ,进行扩容,扩容到足够空间即可
  • 写入数据:首先保证有足够空间,然后将数据数据拷贝进去。可以继续计划出针对string的写入、针对Buffer的写入以及写入 + 压入数据
  • 读取数据:首先读取巨细len必须小于可读取数据巨细,然后拷贝数据出来。同样可以计划出针对string的读取、针对Buffer的读取以及读取+弹出数据。
  • 清空缓冲区:将偏移量归零即可!
  • 读取一行数据:先找到换行符,然后进行读取。
  1. // 缓冲区Buffer类
  2. class Buffer
  3. {
  4. private:
  5.     std::vector<char> _buffer;
  6.     uint64_t _reader_idx;
  7.     uint64_t _writer_idx;
  8. private:
  9.     char *Begin() { return &*_buffer.begin(); }
  10. public:
  11.     Buffer() : _buffer(BUFFER_DEFAULT_SIZE), _reader_idx(0), _writer_idx(0) {}
  12.     // 获取当前写入起始地址
  13.     char *WritePos() { return Begin() + _writer_idx; }
  14.     // 获取当前读取起始地址
  15.     char *ReadPos() { return Begin() + _reader_idx; }
  16.     // 获取读取位置之前的空闲空间
  17.     uint64_t HeadIdleSize() { return _reader_idx; }
  18.     // 获取写入位置之后的空闲空间
  19.     uint64_t TailIdleSize() { return _buffer.size() - _writer_idx; }
  20.     // 获取可读数据大小
  21.     uint64_t ReadAbleSize() { return _writer_idx - _reader_idx; }
  22.     // 读/写偏移移动
  23.     void MoveReadOffset(uint64_t len)
  24.     {
  25.         if (len <= 0)
  26.             return;
  27.         assert(len <= ReadAbleSize());
  28.         _reader_idx += len;
  29.     }
  30.     void MoveWriteOffset(uint64_t len)
  31.     {
  32.         if (len <= 0)
  33.             return;
  34.         assert(len <= TailIdleSize());
  35.         _writer_idx += len;
  36.     }
  37.     // 确保可写空间足够 --- 整体空闲空间足够了就移动数据,否则进行扩容
  38.     void EnsureWriteSpace(uint64_t len)
  39.     {
  40.         // 当len小于写入位置之后的空闲空间 直接进行写入
  41.         if (len <= TailIdleSize())
  42.         {
  43.             return;
  44.         }
  45.         // len 小于总共的空闲空间
  46.         else if (len <= TailIdleSize() + HeadIdleSize())
  47.         {
  48.             // 记录总共的数据
  49.             uint64_t sz = ReadAbleSize();
  50.             // 移动数据
  51.             std::copy(ReadPos(), ReadPos() + sz, Begin());
  52.             // 更新写入读取位置
  53.             _reader_idx = 0;
  54.             _writer_idx = sz;
  55.         }
  56.         // 需要扩容
  57.         else
  58.         {
  59.             // 在写入位置之后扩充len个大小
  60.             _buffer.resize(_writer_idx + len);
  61.         }
  62.     }
  63.     // ----------写入数据------------
  64.     void Write(const void *data, uint64_t len)
  65.     {
  66.         // 写入的数据不能超过可写的总空间
  67.         // 确保可以正常写入
  68.         EnsureWriteSpace(len);
  69.         // 进行拷贝
  70.         const char *d = reinterpret_cast<const char *>(data);
  71.         std::copy(d, d + len, WritePos());
  72.     }
  73.     // 写入Buffer
  74.     void WriteBuffer(Buffer &buffer)
  75.     {
  76.         // 直接调用Write
  77.         return Write(buffer.ReadPos(), buffer.ReadAbleSize());
  78.     }
  79.     // 写入字符串
  80.     void WriteString(const std::string &str)
  81.     {
  82.         // 直接调用Write
  83.         return Write(&str[0], str.size());
  84.     }
  85.     // 写入 + 偏移
  86.     void WriteAndPush(const void *data, uint64_t len)
  87.     {
  88.         if (len <= 0)
  89.             return;
  90.         // 进行写入
  91.         Write(data, len);
  92.         // 更新写入偏移量
  93.         MoveWriteOffset(len);
  94.     }
  95.     void WriteStringAndPush(const std::string &str)
  96.     {
  97.         // 直接调用Write
  98.         WriteString(str);
  99.         LOG(DEBUG, "%s\n", WritePos());
  100.         // 更新偏移量
  101.         MoveWriteOffset(str.size());
  102.         LOG(DEBUG, "当前可读数据大小:%ld\n", ReadAbleSize());
  103.     }
  104.     void WriteBufferAndPush(Buffer &buffer)
  105.     {
  106.         // 直接调用Write
  107.         Write(buffer.ReadPos(), buffer.ReadAbleSize());
  108.         MoveWriteOffset(buffer.ReadAbleSize());
  109.     }
  110.     //---------读取数据----------
  111.     void Read(void *buf, uint64_t len)
  112.     {
  113.         // len 必须小于可读数据大小
  114.         assert(len <= ReadAbleSize());
  115.         // 进行拷贝
  116.         std::copy(ReadPos(), ReadPos() + len, (char *)buf);
  117.     }
  118.     // 读取字符串
  119.     std::string ReadAsString(uint64_t len)
  120.     {
  121.         // len 必须小于可读数据大小
  122.         assert(len <= ReadAbleSize());
  123.         std::string str;
  124.         str.resize(len);
  125.         // 直接调用Read
  126.         Read(&str[0], len);
  127.         return str;
  128.     }
  129.     // 读取 + Pop
  130.     void ReadAndPop(void *buf, uint64_t len)
  131.     {
  132.         // 进行读取
  133.         Read(buf, len);
  134.         // 更新读取偏移量
  135.         MoveReadOffset(len);
  136.     }
  137.     std::string ReadAsStringAndPop(uint64_t len)
  138.     {
  139.         // len 必须小于可读数据大小
  140.         assert(len <= ReadAbleSize());
  141.         // 进行读取
  142.         std::string str = ReadAsString(len);
  143.         // 更新偏移量
  144.         MoveReadOffset(len);
  145.         return str;
  146.     }
  147.     //-----------读取一行数据--------------
  148.     char *FindCRLF()
  149.     {
  150.         return (char *)memchr(ReadPos(), '\n', ReadAbleSize());
  151.     }
  152.     std::string GetLine()
  153.     {
  154.         // 先寻找换行符
  155.         char *pos = FindCRLF();
  156.         if (pos == nullptr)
  157.             return "";
  158.         // 读取要带走'\n'
  159.         std::string str = ReadAsString(pos - ReadPos() + 1);
  160.         return str;
  161.     }
  162.     std::string GetLineAndPop()
  163.     {
  164.         std::string str = GetLine();
  165.         MoveReadOffset(str.size());
  166.         return str;
  167.     }
  168.     // 清除数据
  169.     void Clear()
  170.     {
  171.         // 偏移量归零即可
  172.         _reader_idx = 0;
  173.         _writer_idx = 0;
  174.     }
  175. };
复制代码
2.2 通用范例 Any类

每⼀个Connection对连接进⾏管理,最终都不可避免需要涉及到应⽤层协议的处理,因此在Connection中需要设置协议处理的上下⽂来控制处理节奏。但是应⽤层协议千千万,为了降低耦合度,这个协议接收解析上下文就不能有显着的协议倾向,它可以是恣意协议的上下文信息,因此就需要⼀个通⽤的范例来保存各种差别的数据结构

  • 一个连接必须拥有一个哀求接收与解析的上下文!
  • 上下文的范例大概结构不能固定!因为服务器的协议支持的协议许多,差别的协议,可能都有差别的上下文结构!
所以必须拥有一个容器,能够保存各种差别的范例!那么就要实现一个any类

  • 假如使用模版类方法,那么实例化对象的时间肯定要指明容器保存的数据范例!而我们需要的是any可以接收恣意范例Any a ; a = 10 ; a = "abc"!
  • 但是可以嵌套一下,在Any类中计划一个类,专门用于保存各种范例的数据,而Any类保存的是固定类的对象。
  • 对于这个固定类依旧不能使用模版。但这里这里可以采用多态,计划一个子类,这是一个模版类。
    如许可以通过父类指针读取子类数据!
  • Any类中,保存的是holder类的指针,当Any类需要保存一个数据时,只需要提供placeholder子类实例化一个
    特定范例的对象出来,让子类对象保存数据!
  1. // 通用类型 Any类
  2. class Any
  3. {
  4. private:
  5.     class holder
  6.     {
  7.     public:
  8.         holder() {}
  9.         virtual ~holder() {};
  10.         virtual const std::type_info &type() = 0;
  11.         virtual holder *clone() = 0;
  12.     };
  13.     template <class T>
  14.     class placeholder : public holder
  15.     {
  16.     public:
  17.         placeholder(const T &val) : _val(val) {}
  18.         virtual const std::type_info &type() override
  19.         {
  20.             return typeid(T);
  21.         }
  22.         virtual holder *clone() override
  23.         {
  24.             return new placeholder(_val);
  25.         }
  26.         T _val;
  27.     };
  28.     // 通过这个父类指针访问子类的成员
  29.     holder *_content = nullptr;
  30. public:
  31.     Any()
  32.     {
  33.     }
  34.     template <class T>
  35.     // 拷贝构造
  36.     Any(const T &val) : _content(new placeholder<T>(val))
  37.     {
  38.     }
  39.     // 拷贝构造
  40.     Any(const Any &other) : _content(other._content ? other._content->clone() : nullptr) {}
  41.     // 交换数据
  42.     Any &swap(Any &other)
  43.     {
  44.         std::swap(_content, other._content);
  45.         return *this;
  46.     }
  47.     // 赋值重载
  48.     Any &operator=(Any &other)
  49.     {
  50.         // 根据 other建立一个临时对象 ,进行资源的交换
  51.         Any(other).swap(*this);
  52.         return *this;
  53.     }
  54.     template <class T>
  55.     Any &operator=(const T &val)
  56.     {
  57.         // 根据 val 建立一个临时对象,进行资源的交换
  58.         Any(val).swap(*this);
  59.         return *this;
  60.     }
  61.     template <class T>
  62.     T *Get()
  63.     {
  64.         if (typeid(T) != _content->type())
  65.             return nullptr;
  66.         return &((reinterpret_cast<placeholder<T> *>(_content))->_val);
  67.     }
  68.     ~Any()
  69.     {
  70.         delete _content;
  71.     }
  72. };
复制代码
2.3 套接字 Socket模块

Socket模块是对套接字操作封装的⼀个模块,主要实现的socket的各项操作。是连接模块Connection 与监听模块Accpter的基础!对于套接字的操作我们已经在熟悉不过了:

  • 构造函数 析构函数
  • 创建套接字
    bool Create()
  • 绑定地点信息
  • 开始监听
  • 向服务器发起连接
  • 获取新连接
  • 接收数据
  • 发送数据
  • 关闭套接字
  • 创建一个服务端连接:首先创建套接字,然后将历程绑定地点信息,开启监听状态,留意设置为非阻塞读取(为了配合多路转接IO,不需要IO接口进行等待,统统等待都由多路转接负责);同时启动地点重用 ,保护客户端
  • 创建一个客户端连接:创建套接字,连接服务器。
  • 设置套接字选项 — 开启地点端口重用
  • 设置套接字阻塞属性 — 设置为非阻塞读取
  1. // 套接字Socket类
  2. class Socket
  3. {
  4. private:
  5.     int _sockfd; // 套接字文件描述符
  6. private:
  7.     // 创建套接字
  8.     bool Create()
  9.     {
  10.         // int socket(int domain, int type, int protocol);
  11.         // IPV4     数据流IO
  12.         _sockfd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  13.         if (_sockfd < 0)
  14.         {
  15.             LOG(ERROR, "create socket failed!\n");
  16.             return false;
  17.         }
  18.         LOG(INFO, "Sockfd:%d create success \n", _sockfd);
  19.         return true;
  20.     }
  21.     // 绑定地址信息
  22.     bool bind(const std::string &ip, uint16_t port)
  23.     {
  24.         struct sockaddr_in addr;
  25.         addr.sin_family = AF_INET;                    // 使用IPv4版本地址
  26.         addr.sin_port = htons(port);                  // 主机端口转网络端口
  27.         addr.sin_addr.s_addr = inet_addr(ip.c_str()); // 将ip字符串转化为网络ip
  28.         // 进行绑定
  29.         socklen_t len = sizeof(struct sockaddr_in);
  30.         int n = ::bind(_sockfd, (struct sockaddr *)&addr, len);
  31.         if (n < 0)
  32.         {
  33.             LOG(ERROR, "bind failed!\n");
  34.             return false;
  35.         }
  36.         return true;
  37.     }
  38.     // 建立监听套接字
  39.     bool Listen(int backlog = MAX_LIETENSIZE)
  40.     {
  41.         //                        全连接队列的大小
  42.         int n = ::listen(_sockfd, backlog);
  43.         if (n < 0)
  44.         {
  45.             LOG(ERROR, "listen failed!\n");
  46.             return false;
  47.         }
  48.         return true;
  49.     }
  50.     // 向服务器发起连接
  51.     bool Connect(const std::string &ip, uint16_t port)
  52.     {
  53.         struct sockaddr_in addr;
  54.         addr.sin_family = AF_INET;                    // 使用IPv4版本地址
  55.         addr.sin_port = htons(port);                  // 主机端口转网络端口
  56.         addr.sin_addr.s_addr = inet_addr(ip.c_str()); // 将ip字符串转化为网络ip
  57.         // 进行绑定
  58.         socklen_t len = sizeof(struct sockaddr_in);
  59.         int n = ::connect(_sockfd, (struct sockaddr *)&addr, len);
  60.         if (n < 0)
  61.         {
  62.             LOG(ERROR, "connect failed!\n");
  63.             return false;
  64.         }
  65.         return true;
  66.     }
  67.     // 非阻塞启动地址重用
  68.     void ReuseAddress(uint16_t port, const std::string &ip)
  69.     {
  70.         int val = 1;
  71.         socklen_t len = sizeof(int);
  72.         // 将端口号设置为可重用
  73.         if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, len) == -1)
  74.         {
  75.             throw std::runtime_error("Failed to set SO_REUSEPORT");
  76.         }
  77.         // 将IP地址设置为可重用
  78.         if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, len) == -1)
  79.         {
  80.             throw std::runtime_error("Failed to set SO_REUSEADDR");
  81.         }
  82.     }
  83.     void NonBlock()
  84.     {
  85.         int flag = ::fcntl(_sockfd, F_GETFL, 0);
  86.         ::fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
  87.     }
  88. public:
  89.     // 构造函数
  90.     Socket() : _sockfd(-1) {};
  91.     Socket(int sockfd) : _sockfd(sockfd) {}
  92.     // 析构函数
  93.     ~Socket() { Close(); }
  94.     // 返回套接字文件描述符
  95.     int Sockfd() { return _sockfd; }
  96.     // 接收数据
  97.     ssize_t Recv(void *buf, size_t len, int flag = 0)
  98.     {
  99.         ssize_t n = ::recv(_sockfd, buf, len, flag);
  100.         if (n <= 0)
  101.         {
  102.             // 被信号中断         没有数据了- 非阻塞读取完毕
  103.             if (errno == EINTR || errno == EAGAIN)
  104.                 return 0; // 没有读取到数据
  105.             LOG(ERROR, "recv failed!\n");
  106.             return -1;
  107.         }
  108.         return n; // 实际发送的长度
  109.     }
  110.     ssize_t NonBlockRecv(void *buf, size_t len)
  111.     {
  112.         //       MSG_DONTWAIT  非阻塞式读取
  113.         if (len <= 0)
  114.             return 0;
  115.         return Recv(buf, len, MSG_DONTWAIT);
  116.     }
  117.     // 发送数据
  118.     ssize_t Send(void *buf, size_t len, int flag = 0)
  119.     {
  120.         if (len <= 0)
  121.             return 0;
  122.         ssize_t n = ::send(_sockfd, buf, len, flag);
  123.         if (n <= 0)
  124.         {
  125.             // 被信号中断         没有数据了- 非阻塞读取完毕
  126.             if (errno == EINTR || errno == EAGAIN)
  127.                 return 0; // 没有读取到数据
  128.             LOG(ERROR, "send failed!\n");
  129.             return -1;
  130.         }
  131.         LOG(DEBUG, "Send :%s\n", (char *)buf);
  132.         return n; // 实际发送的数据大小
  133.     }
  134.     ssize_t NonBlockSend(void *buf, size_t len)
  135.     {
  136.         //       MSG_DONTWAIT  非阻塞式发送
  137.         if (len <= 0)
  138.             return 0;
  139.         return Send(buf, len, MSG_DONTWAIT);
  140.     }
  141.     // 创建服务端套接字
  142.     bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool flag = 0)
  143.     {
  144.         // 创建套接字
  145.         if (Create() == false)
  146.             return false;
  147.         // 设置为非阻塞
  148.         if (flag)
  149.             NonBlock();
  150.         // 将地址与端口设置为可重用
  151.         ReuseAddress(port, ip);
  152.         // 绑定地址信息
  153.         if (bind(ip, port) == false)
  154.             return false;
  155.         // 进行监听
  156.         if (Listen() == false)
  157.             return false;
  158.         return true;
  159.     }
  160.     // 创建客户端套接字
  161.     bool CreateClient(uint16_t port, const std::string &ip)
  162.     {
  163.         // 创建套接字
  164.         if (Create() == false)
  165.             return false;
  166.         // 连接服务器
  167.         if (Connect(ip, port) == false)
  168.             return false;
  169.         return true;
  170.     }
  171.     // 服务器获取连接
  172.     int Accept()
  173.     {
  174.         int newfd = ::accept(_sockfd, nullptr, nullptr);
  175.         if (newfd < 0)
  176.         {
  177.             LOG(ERROR, "accept failed!\n");
  178.             return -1;
  179.         }
  180.         return newfd;
  181.     }
  182.     //  关闭套接字
  183.     void Close()
  184.     {
  185.         LOG(INFO, "Close sockfd: %d\n", _sockfd);
  186.         if (_sockfd != -1)
  187.         {
  188.             ::close(_sockfd);
  189.         }
  190.     }
  191. };
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

罪恶克星

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表