罪恶克星 发表于 2024-11-24 06:48:26

【项目日志】仿mudou的高并发服务器 --- 实现缓冲区模块,通用范例Any模块

https://i-blog.csdnimg.cn/direct/4380fc19571b4c7f938b1e2ac516020d.png
   一个人知道本身为什么而活,    就可以忍受任何一种生活。        --- 尼采 ---           ✨✨✨项目地点在这里 ✨✨✨
✨✨✨https://gitee.com/penggli_2_0/TcpServer✨✨✨


1 主从Reactor模子

这个项目标目标是实现一个可以高效处理哀求的服务器,那么对于如许的一个服务器要如何实现呢?
这里接纳主从Reactor模子的方案:

[*]主Reactor模子负责对监听套接字进行管理,进行获取连接操作。
[*]附属Reactor负责对连接的数据进行处理,而且为了保证线程安全,每一个附属Reactor都要绑定一个线程,这个连接的使命都在这个线程中完成。
简单来说就是如许的一个模子:
https://i-blog.csdnimg.cn/direct/1fb10cfbe75b4634abe7c0d4f4b8f0fb.png
此中管理连接的对象是Connection对象,这个类是高并发服务器的焦点部门,这里包含了对套接字的处理,事件等待等一系列操作!而Connection对象中对于这些操作的管理也要通过其他的对象来进行!
主要的工作就是搭建起主从Reactor模子,但是这个模子不是一下子就可以写出来的。为了实现主从Reactor模子,我们先需要实现一些基础类,对基础功能进封装,然后对这些功能进行整合,最终实现主从Reactor模子!
2 基础功能封装

2.1 缓冲区 Buffer模块

Buffer模块是⼀个缓冲区模块,用于实现通信中用户态的接收缓冲区和发送缓冲区功能:

[*]需要支持字符串读取/写入
[*]需要支持char*缓冲区读取/写入
[*]需要正常按行读取 — 方便解析http哀求
Buffer模块成员变量很简单:


[*]vector<char>容器_buffer: 对内存空间进行管理
[*]uint64_t _reader_idx 读偏移:进行读取位置在_buffer容器中的偏移量,即读取的起始位置。
[*]uint64_t _writer_idx 写偏移:下一次写入位置在_buffer容器中的偏移量,即写入的起始位置。
接下来实现一下基础功能:

[*]构造函数:初始化读/写偏移为0 ,容器_buffer初始化一个巨细 BUFFER_DEFAULT_SIZE。
[*]获取当前写入起始地点:_buffer空间的起始地点加入写偏移量即写入起始地点。
[*]获取当前读取起始地点: _buffer空间的起始地点加入读偏移量即读取起始地点。
[*]获取缓冲区末尾空闲空间巨细:写偏移之后的空闲空间 ,总体空间巨细减去写偏移就是写偏移之后的空间巨细。
[*]获取缓冲区起始空闲空间巨细:读偏移之前的空闲空间,其实就是读偏移的巨细。
[*]获取可读数据巨细:写偏移减去读偏移就就之间可读空间的巨细!
[*]读/写偏移向后移动:
* 先根据len判断是否小于可读数据巨细 len必须小于可读数据巨细,然后移动读偏移。
* 向后移动必须小于当前后边的空闲空间巨细,写入数据必须小于缓冲区剩余空间巨细,不敷就进行扩容!
[*]确保可写空间足够 :
* 末尾空闲空间足够 直接返回。
* 末尾空间不敷,但算上起始位置的空闲空间巨细足够 ,将数据移动到起始位置。
* 如果总空间不敷 ,进行扩容,扩容到足够空间即可
[*]写入数据:首先保证有足够空间,然后将数据数据拷贝进去。可以继续计划出针对string的写入、针对Buffer的写入以及写入 + 压入数据
[*]读取数据:首先读取巨细len必须小于可读取数据巨细,然后拷贝数据出来。同样可以计划出针对string的读取、针对Buffer的读取以及读取+弹出数据。
[*]清空缓冲区:将偏移量归零即可!
[*]读取一行数据:先找到换行符,然后进行读取。
// 缓冲区Buffer类
class Buffer
{
private:
    std::vector<char> _buffer;
    uint64_t _reader_idx;
    uint64_t _writer_idx;

private:
    char *Begin() { return &*_buffer.begin(); }

public:
    Buffer() : _buffer(BUFFER_DEFAULT_SIZE), _reader_idx(0), _writer_idx(0) {}
    // 获取当前写入起始地址
    char *WritePos() { return Begin() + _writer_idx; }
    // 获取当前读取起始地址
    char *ReadPos() { return Begin() + _reader_idx; }
    // 获取读取位置之前的空闲空间
    uint64_t HeadIdleSize() { return _reader_idx; }
    // 获取写入位置之后的空闲空间
    uint64_t TailIdleSize() { return _buffer.size() - _writer_idx; }
    // 获取可读数据大小
    uint64_t ReadAbleSize() { return _writer_idx - _reader_idx; }
    // 读/写偏移移动
    void MoveReadOffset(uint64_t len)
    {
      if (len <= 0)
            return;
      assert(len <= ReadAbleSize());
      _reader_idx += len;
    }
    void MoveWriteOffset(uint64_t len)
    {
      if (len <= 0)
            return;
      assert(len <= TailIdleSize());
      _writer_idx += len;
    }
    // 确保可写空间足够 --- 整体空闲空间足够了就移动数据,否则进行扩容
    void EnsureWriteSpace(uint64_t len)
    {
      // 当len小于写入位置之后的空闲空间 直接进行写入
      if (len <= TailIdleSize())
      {
            return;
      }

      // len 小于总共的空闲空间
      else if (len <= TailIdleSize() + HeadIdleSize())
      {
            // 记录总共的数据
            uint64_t sz = ReadAbleSize();
            // 移动数据
            std::copy(ReadPos(), ReadPos() + sz, Begin());
            // 更新写入读取位置
            _reader_idx = 0;
            _writer_idx = sz;
      }
      // 需要扩容
      else
      {
            // 在写入位置之后扩充len个大小
            _buffer.resize(_writer_idx + len);
      }
    }
    // ----------写入数据------------
    void Write(const void *data, uint64_t len)
    {
      // 写入的数据不能超过可写的总空间
      // 确保可以正常写入
      EnsureWriteSpace(len);
      // 进行拷贝
      const char *d = reinterpret_cast<const char *>(data);
      std::copy(d, d + len, WritePos());
    }
    // 写入Buffer
    void WriteBuffer(Buffer &buffer)
    {
      // 直接调用Write
      return Write(buffer.ReadPos(), buffer.ReadAbleSize());
    }
    // 写入字符串
    void WriteString(const std::string &str)
    {
      // 直接调用Write
      return Write(&str, str.size());
    }
    // 写入 + 偏移
    void WriteAndPush(const void *data, uint64_t len)
    {
      if (len <= 0)
            return;
      // 进行写入
      Write(data, len);
      // 更新写入偏移量
      MoveWriteOffset(len);
    }
    void WriteStringAndPush(const std::string &str)
    {

      // 直接调用Write
      WriteString(str);
      LOG(DEBUG, "%s\n", WritePos());
      // 更新偏移量
      MoveWriteOffset(str.size());
      LOG(DEBUG, "当前可读数据大小:%ld\n", ReadAbleSize());
    }
    void WriteBufferAndPush(Buffer &buffer)
    {
      // 直接调用Write
      Write(buffer.ReadPos(), buffer.ReadAbleSize());
      MoveWriteOffset(buffer.ReadAbleSize());
    }

    //---------读取数据----------
    void Read(void *buf, uint64_t len)
    {
      // len 必须小于可读数据大小
      assert(len <= ReadAbleSize());
      // 进行拷贝
      std::copy(ReadPos(), ReadPos() + len, (char *)buf);
    }
    // 读取字符串
    std::string ReadAsString(uint64_t len)
    {
      // len 必须小于可读数据大小
      assert(len <= ReadAbleSize());
      std::string str;
      str.resize(len);
      // 直接调用Read
      Read(&str, len);
      return str;
    }
    // 读取 + Pop
    void ReadAndPop(void *buf, uint64_t len)
    {
      // 进行读取
      Read(buf, len);
      // 更新读取偏移量
      MoveReadOffset(len);
    }
    std::string ReadAsStringAndPop(uint64_t len)
    {
      // len 必须小于可读数据大小
      assert(len <= ReadAbleSize());
      // 进行读取
      std::string str = ReadAsString(len);
      // 更新偏移量
      MoveReadOffset(len);
      return str;
    }

    //-----------读取一行数据--------------
    char *FindCRLF()
    {
      return (char *)memchr(ReadPos(), '\n', ReadAbleSize());
    }
    std::string GetLine()
    {
      // 先寻找换行符
      char *pos = FindCRLF();
      if (pos == nullptr)
            return "";
      // 读取要带走'\n'
      std::string str = ReadAsString(pos - ReadPos() + 1);
      return str;
    }
    std::string GetLineAndPop()
    {
      std::string str = GetLine();
      MoveReadOffset(str.size());
      return str;
    }

    // 清除数据
    void Clear()
    {
      // 偏移量归零即可
      _reader_idx = 0;
      _writer_idx = 0;
    }
};
2.2 通用范例 Any类

每⼀个Connection对连接进⾏管理,最终都不可避免需要涉及到应⽤层协议的处理,因此在Connection中需要设置协议处理的上下⽂来控制处理节奏。但是应⽤层协议千千万,为了降低耦合度,这个协议接收解析上下文就不能有显着的协议倾向,它可以是恣意协议的上下文信息,因此就需要⼀个通⽤的范例来保存各种差别的数据结构:

[*]一个连接必须拥有一个哀求接收与解析的上下文!
[*]上下文的范例大概结构不能固定!因为服务器的协议支持的协议许多,差别的协议,可能都有差别的上下文结构!
所以必须拥有一个容器,能够保存各种差别的范例!那么就要实现一个any类

[*]假如使用模版类方法,那么实例化对象的时间肯定要指明容器保存的数据范例!而我们需要的是any可以接收恣意范例Any a ; a = 10 ; a = "abc"!
[*]但是可以嵌套一下,在Any类中计划一个类,专门用于保存各种范例的数据,而Any类保存的是固定类的对象。
[*]对于这个固定类依旧不能使用模版。但这里这里可以采用多态,计划一个子类,这是一个模版类。
如许可以通过父类指针读取子类数据!
[*]Any类中,保存的是holder类的指针,当Any类需要保存一个数据时,只需要提供placeholder子类实例化一个
特定范例的对象出来,让子类对象保存数据!
// 通用类型 Any类
class Any
{
private:
    class holder
    {
    public:
      holder() {}
      virtual ~holder() {};
      virtual const std::type_info &type() = 0;
      virtual holder *clone() = 0;
    };
    template <class T>
    class placeholder : public holder
    {
    public:
      placeholder(const T &val) : _val(val) {}

      virtual const std::type_info &type() override
      {
            return typeid(T);
      }
      virtual holder *clone() override
      {
            return new placeholder(_val);
      }

      T _val;
    };
    // 通过这个父类指针访问子类的成员
    holder *_content = nullptr;

public:
    Any()
    {
    }
    template <class T>
    // 拷贝构造
    Any(const T &val) : _content(new placeholder<T>(val))
    {
    }
    // 拷贝构造
    Any(const Any &other) : _content(other._content ? other._content->clone() : nullptr) {}
    // 交换数据
    Any &swap(Any &other)
    {
      std::swap(_content, other._content);
      return *this;
    }

    // 赋值重载
    Any &operator=(Any &other)
    {
      // 根据 other建立一个临时对象 ,进行资源的交换
      Any(other).swap(*this);
      return *this;
    }
    template <class T>
    Any &operator=(const T &val)
    {
      // 根据 val 建立一个临时对象,进行资源的交换
      Any(val).swap(*this);
      return *this;
    }
    template <class T>
    T *Get()
    {
      if (typeid(T) != _content->type())
            return nullptr;
      return &((reinterpret_cast<placeholder<T> *>(_content))->_val);
    }

    ~Any()
    {
      delete _content;
    }
};
2.3 套接字 Socket模块

Socket模块是对套接字操作封装的⼀个模块,主要实现的socket的各项操作。是连接模块Connection 与监听模块Accpter的基础!对于套接字的操作我们已经在熟悉不过了:

[*]构造函数 析构函数
[*]创建套接字
bool Create()
[*]绑定地点信息
[*]开始监听
[*]向服务器发起连接
[*]获取新连接
[*]接收数据
[*]发送数据
[*]关闭套接字
[*]创建一个服务端连接:首先创建套接字,然后将历程绑定地点信息,开启监听状态,留意设置为非阻塞读取(为了配合多路转接IO,不需要IO接口进行等待,统统等待都由多路转接负责);同时启动地点重用 ,保护客户端
[*]创建一个客户端连接:创建套接字,连接服务器。
[*]设置套接字选项 — 开启地点端口重用
[*]设置套接字阻塞属性 — 设置为非阻塞读取
// 套接字Socket类
class Socket
{
private:
    int _sockfd; // 套接字文件描述符

private:
    // 创建套接字
    bool Create()
    {
      // int socket(int domain, int type, int protocol);
      // IPV4   数据流IO
      _sockfd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
      if (_sockfd < 0)
      {
            LOG(ERROR, "create socket failed!\n");
            return false;
      }
      LOG(INFO, "Sockfd:%d create success \n", _sockfd);
      return true;
    }
    // 绑定地址信息
    bool bind(const std::string &ip, uint16_t port)
    {
      struct sockaddr_in addr;
      addr.sin_family = AF_INET;                  // 使用IPv4版本地址
      addr.sin_port = htons(port);                  // 主机端口转网络端口
      addr.sin_addr.s_addr = inet_addr(ip.c_str()); // 将ip字符串转化为网络ip
      // 进行绑定
      socklen_t len = sizeof(struct sockaddr_in);
      int n = ::bind(_sockfd, (struct sockaddr *)&addr, len);
      if (n < 0)
      {
            LOG(ERROR, "bind failed!\n");
            return false;
      }
      return true;
    }
    // 建立监听套接字
    bool Listen(int backlog = MAX_LIETENSIZE)
    {
      //                        全连接队列的大小
      int n = ::listen(_sockfd, backlog);
      if (n < 0)
      {
            LOG(ERROR, "listen failed!\n");
            return false;
      }
      return true;
    }
    // 向服务器发起连接
    bool Connect(const std::string &ip, uint16_t port)
    {
      struct sockaddr_in addr;
      addr.sin_family = AF_INET;                  // 使用IPv4版本地址
      addr.sin_port = htons(port);                  // 主机端口转网络端口
      addr.sin_addr.s_addr = inet_addr(ip.c_str()); // 将ip字符串转化为网络ip
      // 进行绑定
      socklen_t len = sizeof(struct sockaddr_in);
      int n = ::connect(_sockfd, (struct sockaddr *)&addr, len);
      if (n < 0)
      {
            LOG(ERROR, "connect failed!\n");
            return false;
      }
      return true;
    }

    // 非阻塞启动地址重用
    void ReuseAddress(uint16_t port, const std::string &ip)
    {
      int val = 1;
      socklen_t len = sizeof(int);
      // 将端口号设置为可重用
      if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, len) == -1)
      {
            throw std::runtime_error("Failed to set SO_REUSEPORT");
      }

      // 将IP地址设置为可重用
      if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, len) == -1)
      {
            throw std::runtime_error("Failed to set SO_REUSEADDR");
      }
    }
    void NonBlock()
    {
      int flag = ::fcntl(_sockfd, F_GETFL, 0);
      ::fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
    }

public:
    // 构造函数
    Socket() : _sockfd(-1) {};
    Socket(int sockfd) : _sockfd(sockfd) {}
    // 析构函数
    ~Socket() { Close(); }
    // 返回套接字文件描述符
    int Sockfd() { return _sockfd; }

    // 接收数据
    ssize_t Recv(void *buf, size_t len, int flag = 0)
    {
      ssize_t n = ::recv(_sockfd, buf, len, flag);
      if (n <= 0)
      {
            // 被信号中断         没有数据了- 非阻塞读取完毕
            if (errno == EINTR || errno == EAGAIN)
                return 0; // 没有读取到数据

            LOG(ERROR, "recv failed!\n");
            return -1;
      }
      return n; // 实际发送的长度
    }
    ssize_t NonBlockRecv(void *buf, size_t len)
    {
      //       MSG_DONTWAIT非阻塞式读取
      if (len <= 0)
            return 0;
      return Recv(buf, len, MSG_DONTWAIT);
    }
    // 发送数据
    ssize_t Send(void *buf, size_t len, int flag = 0)
    {
      if (len <= 0)
            return 0;
      ssize_t n = ::send(_sockfd, buf, len, flag);
      if (n <= 0)
      {
            // 被信号中断         没有数据了- 非阻塞读取完毕
            if (errno == EINTR || errno == EAGAIN)
                return 0; // 没有读取到数据
            LOG(ERROR, "send failed!\n");
            return -1;
      }
      LOG(DEBUG, "Send :%s\n", (char *)buf);
      return n; // 实际发送的数据大小
    }
    ssize_t NonBlockSend(void *buf, size_t len)
    {
      //       MSG_DONTWAIT非阻塞式发送
      if (len <= 0)
            return 0;
      return Send(buf, len, MSG_DONTWAIT);
    }

    // 创建服务端套接字
    bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool flag = 0)
    {

      // 创建套接字
      if (Create() == false)
            return false;
      // 设置为非阻塞
      if (flag)
            NonBlock();
      // 将地址与端口设置为可重用
      ReuseAddress(port, ip);
      // 绑定地址信息
      if (bind(ip, port) == false)
            return false;
      // 进行监听
      if (Listen() == false)
            return false;

      return true;
    }
    // 创建客户端套接字
    bool CreateClient(uint16_t port, const std::string &ip)
    {
      // 创建套接字
      if (Create() == false)
            return false;
      // 连接服务器
      if (Connect(ip, port) == false)
            return false;
      return true;
    }
    // 服务器获取连接
    int Accept()
    {
      int newfd = ::accept(_sockfd, nullptr, nullptr);
      if (newfd < 0)
      {
            LOG(ERROR, "accept failed!\n");
            return -1;
      }
      return newfd;
    }
    //关闭套接字
    void Close()
    {
      LOG(INFO, "Close sockfd: %d\n", _sockfd);
      if (_sockfd != -1)
      {
            ::close(_sockfd);
      }
    }
};

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 【项目日志】仿mudou的高并发服务器 --- 实现缓冲区模块,通用范例Any模块