一个人知道本身为什么而活, 就可以忍受任何一种生活。 --- 尼采 --- ✨✨✨项目地点在这里 ✨✨✨
✨✨✨https://gitee.com/penggli_2_0/TcpServer✨✨✨
1 主从Reactor模子
这个项目标目标是实现一个可以高效处理哀求的服务器,那么对于如许的一个服务器要如何实现呢?
这里接纳主从Reactor模子的方案:
- 主Reactor模子负责对监听套接字进行管理,进行获取连接操作。
- 附属Reactor负责对连接的数据进行处理,而且为了保证线程安全,每一个附属Reactor都要绑定一个线程,这个连接的使命都在这个线程中完成。
简单来说就是如许的一个模子:
此中管理连接的对象是Connection对象,这个类是高并发服务器的焦点部门,这里包含了对套接字的处理,事件等待等一系列操作!而Connection对象中对于这些操作的管理也要通过其他的对象来进行!
主要的工作就是搭建起主从Reactor模子,但是这个模子不是一下子就可以写出来的。为了实现主从Reactor模子,我们先需要实现一些基础类,对基础功能进封装,然后对这些功能进行整合,最终实现主从Reactor模子!
2 基础功能封装
2.1 缓冲区 Buffer模块
Buffer模块是⼀个缓冲区模块,用于实现通信中用户态的接收缓冲区和发送缓冲区功能:
- 需要支持字符串读取/写入
- 需要支持char*缓冲区读取/写入
- 需要正常按行读取 — 方便解析http哀求
Buffer模块成员变量很简单:
- vector<char>容器_buffer: 对内存空间进行管理
- uint64_t _reader_idx 读偏移:进行读取位置在_buffer容器中的偏移量,即读取的起始位置。
- uint64_t _writer_idx 写偏移:下一次写入位置在_buffer容器中的偏移量,即写入的起始位置。
接下来实现一下基础功能:
- 构造函数:初始化读/写偏移为0 ,容器_buffer初始化一个巨细 BUFFER_DEFAULT_SIZE。
- 获取当前写入起始地点:_buffer空间的起始地点加入写偏移量即写入起始地点。
- 获取当前读取起始地点: _buffer空间的起始地点加入读偏移量即读取起始地点。
- 获取缓冲区末尾空闲空间巨细:写偏移之后的空闲空间 ,总体空间巨细减去写偏移就是写偏移之后的空间巨细。
- 获取缓冲区起始空闲空间巨细:读偏移之前的空闲空间,其实就是读偏移的巨细。
- 获取可读数据巨细:写偏移减去读偏移就就之间可读空间的巨细!
- 读/写偏移向后移动:
* 先根据len判断是否小于可读数据巨细 len必须小于可读数据巨细,然后移动读偏移。
* 向后移动必须小于当前后边的空闲空间巨细,写入数据必须小于缓冲区剩余空间巨细,不敷就进行扩容!
- 确保可写空间足够 :
* 末尾空闲空间足够 直接返回。
* 末尾空间不敷,但算上起始位置的空闲空间巨细足够 ,将数据移动到起始位置。
* 如果总空间不敷 ,进行扩容,扩容到足够空间即可
- 写入数据:首先保证有足够空间,然后将数据数据拷贝进去。可以继续计划出针对string的写入、针对Buffer的写入以及写入 + 压入数据
- 读取数据:首先读取巨细len必须小于可读取数据巨细,然后拷贝数据出来。同样可以计划出针对string的读取、针对Buffer的读取以及读取+弹出数据。
- 清空缓冲区:将偏移量归零即可!
- 读取一行数据:先找到换行符,然后进行读取。
- // 缓冲区Buffer类
- class Buffer
- {
- private:
- std::vector<char> _buffer;
- uint64_t _reader_idx;
- uint64_t _writer_idx;
- private:
- char *Begin() { return &*_buffer.begin(); }
- public:
- Buffer() : _buffer(BUFFER_DEFAULT_SIZE), _reader_idx(0), _writer_idx(0) {}
- // 获取当前写入起始地址
- char *WritePos() { return Begin() + _writer_idx; }
- // 获取当前读取起始地址
- char *ReadPos() { return Begin() + _reader_idx; }
- // 获取读取位置之前的空闲空间
- uint64_t HeadIdleSize() { return _reader_idx; }
- // 获取写入位置之后的空闲空间
- uint64_t TailIdleSize() { return _buffer.size() - _writer_idx; }
- // 获取可读数据大小
- uint64_t ReadAbleSize() { return _writer_idx - _reader_idx; }
- // 读/写偏移移动
- void MoveReadOffset(uint64_t len)
- {
- if (len <= 0)
- return;
- assert(len <= ReadAbleSize());
- _reader_idx += len;
- }
- void MoveWriteOffset(uint64_t len)
- {
- if (len <= 0)
- return;
- assert(len <= TailIdleSize());
- _writer_idx += len;
- }
- // 确保可写空间足够 --- 整体空闲空间足够了就移动数据,否则进行扩容
- void EnsureWriteSpace(uint64_t len)
- {
- // 当len小于写入位置之后的空闲空间 直接进行写入
- if (len <= TailIdleSize())
- {
- return;
- }
- // len 小于总共的空闲空间
- else if (len <= TailIdleSize() + HeadIdleSize())
- {
- // 记录总共的数据
- uint64_t sz = ReadAbleSize();
- // 移动数据
- std::copy(ReadPos(), ReadPos() + sz, Begin());
- // 更新写入读取位置
- _reader_idx = 0;
- _writer_idx = sz;
- }
- // 需要扩容
- else
- {
- // 在写入位置之后扩充len个大小
- _buffer.resize(_writer_idx + len);
- }
- }
- // ----------写入数据------------
- void Write(const void *data, uint64_t len)
- {
- // 写入的数据不能超过可写的总空间
- // 确保可以正常写入
- EnsureWriteSpace(len);
- // 进行拷贝
- const char *d = reinterpret_cast<const char *>(data);
- std::copy(d, d + len, WritePos());
- }
- // 写入Buffer
- void WriteBuffer(Buffer &buffer)
- {
- // 直接调用Write
- return Write(buffer.ReadPos(), buffer.ReadAbleSize());
- }
- // 写入字符串
- void WriteString(const std::string &str)
- {
- // 直接调用Write
- return Write(&str[0], str.size());
- }
- // 写入 + 偏移
- void WriteAndPush(const void *data, uint64_t len)
- {
- if (len <= 0)
- return;
- // 进行写入
- Write(data, len);
- // 更新写入偏移量
- MoveWriteOffset(len);
- }
- void WriteStringAndPush(const std::string &str)
- {
- // 直接调用Write
- WriteString(str);
- LOG(DEBUG, "%s\n", WritePos());
- // 更新偏移量
- MoveWriteOffset(str.size());
- LOG(DEBUG, "当前可读数据大小:%ld\n", ReadAbleSize());
- }
- void WriteBufferAndPush(Buffer &buffer)
- {
- // 直接调用Write
- Write(buffer.ReadPos(), buffer.ReadAbleSize());
- MoveWriteOffset(buffer.ReadAbleSize());
- }
- //---------读取数据----------
- void Read(void *buf, uint64_t len)
- {
- // len 必须小于可读数据大小
- assert(len <= ReadAbleSize());
- // 进行拷贝
- std::copy(ReadPos(), ReadPos() + len, (char *)buf);
- }
- // 读取字符串
- std::string ReadAsString(uint64_t len)
- {
- // len 必须小于可读数据大小
- assert(len <= ReadAbleSize());
- std::string str;
- str.resize(len);
- // 直接调用Read
- Read(&str[0], len);
- return str;
- }
- // 读取 + Pop
- void ReadAndPop(void *buf, uint64_t len)
- {
- // 进行读取
- Read(buf, len);
- // 更新读取偏移量
- MoveReadOffset(len);
- }
- std::string ReadAsStringAndPop(uint64_t len)
- {
- // len 必须小于可读数据大小
- assert(len <= ReadAbleSize());
- // 进行读取
- std::string str = ReadAsString(len);
- // 更新偏移量
- MoveReadOffset(len);
- return str;
- }
- //-----------读取一行数据--------------
- char *FindCRLF()
- {
- return (char *)memchr(ReadPos(), '\n', ReadAbleSize());
- }
- std::string GetLine()
- {
- // 先寻找换行符
- char *pos = FindCRLF();
- if (pos == nullptr)
- return "";
- // 读取要带走'\n'
- std::string str = ReadAsString(pos - ReadPos() + 1);
- return str;
- }
- std::string GetLineAndPop()
- {
- std::string str = GetLine();
- MoveReadOffset(str.size());
- return str;
- }
- // 清除数据
- void Clear()
- {
- // 偏移量归零即可
- _reader_idx = 0;
- _writer_idx = 0;
- }
- };
复制代码 2.2 通用范例 Any类
每⼀个Connection对连接进⾏管理,最终都不可避免需要涉及到应⽤层协议的处理,因此在Connection中需要设置协议处理的上下⽂来控制处理节奏。但是应⽤层协议千千万,为了降低耦合度,这个协议接收解析上下文就不能有显着的协议倾向,它可以是恣意协议的上下文信息,因此就需要⼀个通⽤的范例来保存各种差别的数据结构:
- 一个连接必须拥有一个哀求接收与解析的上下文!
- 上下文的范例大概结构不能固定!因为服务器的协议支持的协议许多,差别的协议,可能都有差别的上下文结构!
所以必须拥有一个容器,能够保存各种差别的范例!那么就要实现一个any类
- 假如使用模版类方法,那么实例化对象的时间肯定要指明容器保存的数据范例!而我们需要的是any可以接收恣意范例Any a ; a = 10 ; a = "abc"!
- 但是可以嵌套一下,在Any类中计划一个类,专门用于保存各种范例的数据,而Any类保存的是固定类的对象。
- 对于这个固定类依旧不能使用模版。但这里这里可以采用多态,计划一个子类,这是一个模版类。
如许可以通过父类指针读取子类数据!
- Any类中,保存的是holder类的指针,当Any类需要保存一个数据时,只需要提供placeholder子类实例化一个
特定范例的对象出来,让子类对象保存数据!
- // 通用类型 Any类
- class Any
- {
- private:
- class holder
- {
- public:
- holder() {}
- virtual ~holder() {};
- virtual const std::type_info &type() = 0;
- virtual holder *clone() = 0;
- };
- template <class T>
- class placeholder : public holder
- {
- public:
- placeholder(const T &val) : _val(val) {}
- virtual const std::type_info &type() override
- {
- return typeid(T);
- }
- virtual holder *clone() override
- {
- return new placeholder(_val);
- }
- T _val;
- };
- // 通过这个父类指针访问子类的成员
- holder *_content = nullptr;
- public:
- Any()
- {
- }
- template <class T>
- // 拷贝构造
- Any(const T &val) : _content(new placeholder<T>(val))
- {
- }
- // 拷贝构造
- Any(const Any &other) : _content(other._content ? other._content->clone() : nullptr) {}
- // 交换数据
- Any &swap(Any &other)
- {
- std::swap(_content, other._content);
- return *this;
- }
- // 赋值重载
- Any &operator=(Any &other)
- {
- // 根据 other建立一个临时对象 ,进行资源的交换
- Any(other).swap(*this);
- return *this;
- }
- template <class T>
- Any &operator=(const T &val)
- {
- // 根据 val 建立一个临时对象,进行资源的交换
- Any(val).swap(*this);
- return *this;
- }
- template <class T>
- T *Get()
- {
- if (typeid(T) != _content->type())
- return nullptr;
- return &((reinterpret_cast<placeholder<T> *>(_content))->_val);
- }
- ~Any()
- {
- delete _content;
- }
- };
复制代码 2.3 套接字 Socket模块
Socket模块是对套接字操作封装的⼀个模块,主要实现的socket的各项操作。是连接模块Connection 与监听模块Accpter的基础!对于套接字的操作我们已经在熟悉不过了:
- 构造函数 析构函数
- 创建套接字
bool Create()
- 绑定地点信息
- 开始监听
- 向服务器发起连接
- 获取新连接
- 接收数据
- 发送数据
- 关闭套接字
- 创建一个服务端连接:首先创建套接字,然后将历程绑定地点信息,开启监听状态,留意设置为非阻塞读取(为了配合多路转接IO,不需要IO接口进行等待,统统等待都由多路转接负责);同时启动地点重用 ,保护客户端
- 创建一个客户端连接:创建套接字,连接服务器。
- 设置套接字选项 — 开启地点端口重用
- 设置套接字阻塞属性 — 设置为非阻塞读取
- // 套接字Socket类
- class Socket
- {
- private:
- int _sockfd; // 套接字文件描述符
- private:
- // 创建套接字
- bool Create()
- {
- // int socket(int domain, int type, int protocol);
- // IPV4 数据流IO
- _sockfd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
- if (_sockfd < 0)
- {
- LOG(ERROR, "create socket failed!\n");
- return false;
- }
- LOG(INFO, "Sockfd:%d create success \n", _sockfd);
- return true;
- }
- // 绑定地址信息
- bool bind(const std::string &ip, uint16_t port)
- {
- struct sockaddr_in addr;
- addr.sin_family = AF_INET; // 使用IPv4版本地址
- addr.sin_port = htons(port); // 主机端口转网络端口
- addr.sin_addr.s_addr = inet_addr(ip.c_str()); // 将ip字符串转化为网络ip
- // 进行绑定
- socklen_t len = sizeof(struct sockaddr_in);
- int n = ::bind(_sockfd, (struct sockaddr *)&addr, len);
- if (n < 0)
- {
- LOG(ERROR, "bind failed!\n");
- return false;
- }
- return true;
- }
- // 建立监听套接字
- bool Listen(int backlog = MAX_LIETENSIZE)
- {
- // 全连接队列的大小
- int n = ::listen(_sockfd, backlog);
- if (n < 0)
- {
- LOG(ERROR, "listen failed!\n");
- return false;
- }
- return true;
- }
- // 向服务器发起连接
- bool Connect(const std::string &ip, uint16_t port)
- {
- struct sockaddr_in addr;
- addr.sin_family = AF_INET; // 使用IPv4版本地址
- addr.sin_port = htons(port); // 主机端口转网络端口
- addr.sin_addr.s_addr = inet_addr(ip.c_str()); // 将ip字符串转化为网络ip
- // 进行绑定
- socklen_t len = sizeof(struct sockaddr_in);
- int n = ::connect(_sockfd, (struct sockaddr *)&addr, len);
- if (n < 0)
- {
- LOG(ERROR, "connect failed!\n");
- return false;
- }
- return true;
- }
- // 非阻塞启动地址重用
- void ReuseAddress(uint16_t port, const std::string &ip)
- {
- int val = 1;
- socklen_t len = sizeof(int);
- // 将端口号设置为可重用
- if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, len) == -1)
- {
- throw std::runtime_error("Failed to set SO_REUSEPORT");
- }
- // 将IP地址设置为可重用
- if (setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, len) == -1)
- {
- throw std::runtime_error("Failed to set SO_REUSEADDR");
- }
- }
- void NonBlock()
- {
- int flag = ::fcntl(_sockfd, F_GETFL, 0);
- ::fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
- }
- public:
- // 构造函数
- Socket() : _sockfd(-1) {};
- Socket(int sockfd) : _sockfd(sockfd) {}
- // 析构函数
- ~Socket() { Close(); }
- // 返回套接字文件描述符
- int Sockfd() { return _sockfd; }
- // 接收数据
- ssize_t Recv(void *buf, size_t len, int flag = 0)
- {
- ssize_t n = ::recv(_sockfd, buf, len, flag);
- if (n <= 0)
- {
- // 被信号中断 没有数据了- 非阻塞读取完毕
- if (errno == EINTR || errno == EAGAIN)
- return 0; // 没有读取到数据
- LOG(ERROR, "recv failed!\n");
- return -1;
- }
- return n; // 实际发送的长度
- }
- ssize_t NonBlockRecv(void *buf, size_t len)
- {
- // MSG_DONTWAIT 非阻塞式读取
- if (len <= 0)
- return 0;
- return Recv(buf, len, MSG_DONTWAIT);
- }
- // 发送数据
- ssize_t Send(void *buf, size_t len, int flag = 0)
- {
- if (len <= 0)
- return 0;
- ssize_t n = ::send(_sockfd, buf, len, flag);
- if (n <= 0)
- {
- // 被信号中断 没有数据了- 非阻塞读取完毕
- if (errno == EINTR || errno == EAGAIN)
- return 0; // 没有读取到数据
- LOG(ERROR, "send failed!\n");
- return -1;
- }
- LOG(DEBUG, "Send :%s\n", (char *)buf);
- return n; // 实际发送的数据大小
- }
- ssize_t NonBlockSend(void *buf, size_t len)
- {
- // MSG_DONTWAIT 非阻塞式发送
- if (len <= 0)
- return 0;
- return Send(buf, len, MSG_DONTWAIT);
- }
- // 创建服务端套接字
- bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool flag = 0)
- {
- // 创建套接字
- if (Create() == false)
- return false;
- // 设置为非阻塞
- if (flag)
- NonBlock();
- // 将地址与端口设置为可重用
- ReuseAddress(port, ip);
- // 绑定地址信息
- if (bind(ip, port) == false)
- return false;
- // 进行监听
- if (Listen() == false)
- return false;
- return true;
- }
- // 创建客户端套接字
- bool CreateClient(uint16_t port, const std::string &ip)
- {
- // 创建套接字
- if (Create() == false)
- return false;
- // 连接服务器
- if (Connect(ip, port) == false)
- return false;
- return true;
- }
- // 服务器获取连接
- int Accept()
- {
- int newfd = ::accept(_sockfd, nullptr, nullptr);
- if (newfd < 0)
- {
- LOG(ERROR, "accept failed!\n");
- return -1;
- }
- return newfd;
- }
- // 关闭套接字
- void Close()
- {
- LOG(INFO, "Close sockfd: %d\n", _sockfd);
- if (_sockfd != -1)
- {
- ::close(_sockfd);
- }
- }
- };
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |