IT评测·应用市场-qidao123.com技术社区
标题:
【项目日记(三)】
[打印本页]
作者:
没腿的鸟
时间:
前天 14:36
标题:
【项目日记(三)】
目录
SERVER服务器模块实现:
1、Buffer模块:缓冲区模块
2、套接字Socket类实现:
3、事件管理Channel类实现:
4、 形貌符事件监控Poller类实现:
5、定时使命管理TimerWheel类实现:
eventfd
6、Reactor-EventLoop线程池类实现:
SERVER服务器模块实现:
1、Buffer模块:缓冲区模块
提供的功能:存储数据,取出数据
实现头脑:
1、实现缓冲区得有一块内存空间,采取vector<char>
vector底层着实使用的就是一个线性的内存空间
2、要素:
a、默认的空间大小
b、当前的读取数据数据位置
c、当前的写入数据位置
3、操纵
a、写入数据:
当前写入位置指向哪里,就从哪里开始写
如果后面剩余空间不敷了
1、考虑整体缓冲区空闲空间是否足够(因为读位置也会向后偏移,前面有大概会有空闲空间)
足够:将数据移动到起始位置即可
不敷:给数组vector扩容
数据一旦写入乐成,当前位置,就要向后移动
2、读取数据:
当前的读取位置指向哪里,就从哪里开始读取,前提是有数据可读
可读数据大小:当前写入位置减去当前读取位置
实现缓冲区类,该怎样设计:
class Buffer{
private:
std::vector<char> _buffer;
//位置,是一个相对偏移量,而不是绝对地址
uint64_t _read_idx;//相对写偏移量
uint64_t _weite_idx;//相对读偏移量
public:
1、获取当前写位置地址
2、确保可写空间足够(移动+ 扩容)
3、获取前沿空闲空间大小
4、获取后沿空闲空间大小
5、将写位置向后移动指定长度
6、获取当前读位置的地址
7、获取可读空间大小
8、将读位置向后移动指定长度
9、整理功能
代码实现:
#include <iostream>
#include <vector>
#include <string>
#include <cassert>
#include <cstring>
#define BUFFER_DEFAULT_SIZE 1024
class Buffer
{
private:
std::vector<char> _buffer;//使用vector进行内存空间管理
uint64_t _reader_idx;//读偏移
uint64_t _writer_idx; //写偏移
public:
Buffer():_reader_idx(0),_writer_idx(0),_buffer(BUFFER_DEFAULT_SIZE){}
char *Begin(){return &*_buffer.begin();} //获取起始位置
//获取当前写入起始位置 //buffer空间起始地址+写偏移量
char *WritePosition()
{ return Begin() + _writer_idx;}
//获取当前读取起始位置
char *ReadPosition()
{return Begin() + _reader_idx;}
//获取缓冲区末尾空闲空间大小 --- 写偏移之后的空闲空间----总体空间大小减去写偏移
uint64_t TailIdleSize() {return _buffer.size() - _writer_idx;}
//获取缓冲区起始空闲空闲空间大小 --- 读偏移之前的空闲时间
uint64_t HeadIdleSize() {return _reader_idx;}
//获取可读数据大小
uint64_t ReadAbleSize() {return _writer_idx - _reader_idx;}
//将读偏移向后移动
void MoveReadOffset(uint64_t len) {
if(len == 0) return;
//向后移动得大小必须小于可读数据得大小
assert(_reader_idx + len <= ReadAbleSize());
_reader_idx += len;
}
//将写偏移向后移动
void MoveWriteOffset(uint64_t len)
{
//向后移动得大小,必须小于当前后边的空闲空间大小
assert(len <= TailIdleSize());
_writer_idx += len;
}
//确保可写空间足够(整体空闲空间够了就移动,不够就扩容)
void EnsureWriteSpace(uint64_t len)
{
//如果末尾空间大小足够,直接返回
if(TailIdleSize() >= len) return;
//末尾空闲空间不够,则加上起始空间大小是否足够
if(len <= TailIdleSize() + HeadIdleSize())
{
//够了,将数据移动到起始位置
uint64_t rsz = ReadAbleSize();//把当前数据大小先保存起来
std::copy(ReadPosition(),ReadPosition() + rsz,Begin());//把可读数据拷贝到起始位置
_reader_idx = 0;//将读偏移归0
_writer_idx = rsz;//将写位置置为可读数据大小,因为当前的可读数据大小就是写偏移量
}else
{
//总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可
_buffer.resize(_writer_idx + len);
}
}
//写入数据
void Write(const void *data,uint64_t len)
{
//1、保证有足够空间 2、拷贝数据进去
if(len == 0) return;
EnsureWriteSpace(len);
const char *d = (const char *)data;
std::copy(d, d + len, WritePosition());
}
void WriteAndPush(const void *data, uint64_t len)
{
Write(data,len);
MoveWriteOffset(len);
}
void WriteString(const std::string &data)
{
return Write(data.c_str(), data.size());
}
void WriteStringAndPush(const std::string &data)
{
WriteString(data);
MoveWriteOffset(data.size());
}
void WriteBuffer(Buffer &data)
{
Write(data.ReadPosition(), data.ReadAbleSize());
}
void WriteBufferAndPush(Buffer &data)
{
WriteBuffer(data);
MoveWriteOffset(data.ReadAbleSize());
}
//读取数据
void Read(void *buf, uint64_t len)
{
//要求要获取的数据大小必须小于可读数据大小
assert(len <= ReadAbleSize());
std::copy(ReadPosition(),ReadPosition() + len, (char*)buf);
}
void readAndPop(void *buf, uint64_t len)
{
Read(buf, len); //读数据
MoveReadOffset(len);//指针向后移动
}
std::string ReadAsString(uint64_t len)
{
assert(len <= ReadAbleSize());
std::string str;
str.resize(len);
Read(&str[0], len);
return str;
}
std::string ReadAsStringAndPop(uint64_t len)
{
std::string str = ReadAsString(len); //读数据
MoveReadOffset(len);//指针向后移动
return str;
}
char *FindCRLF() //查找换行符
{
char *res = (char*)memchr(ReadPosition(),'\n', ReadAbleSize());
return res;
}
std::string GetLine() //取出一行
{
char *pos = FindCRLF();
if(pos == NULL)
{
return "";
}
//+1是为了把换行符也取出来
return ReadAsString(pos - ReadPosition() + 1);
}
std::string GetLineAndPop()
{
std::string str = GetLine();
MoveReadOffset(str.size());
}
//清空缓冲区
void Clear()
{
//只需要将偏移量归0 覆盖写即可
_reader_idx = 0;
_writer_idx = 0;
}
};
复制代码
2、套接字Socket类实现:
创建套接字
绑定地址信息
开始监听
向服务器发起毗连
获取新毗连
接收数据
发送数据
关闭套接字
创建一个服务端毗连
创建一个客户端毗连
设置套接字选项---开启地址端口重用 一个毗连绑定端口和地址之后,一旦主动断开毗连他就会进入timewait掩护状态,套接字并不会立即被释放,所以ip地址和端标语就依然被占用,无法立纵然用它。在服务器的使用中就会造成服务器一旦出了问题,就会无法立即启动,因此要开启地址和端口重用。
设置套接字的壅闭属性---设置为非壅闭 壅闭是当缓冲区中没有数据了套接字就一直等,程序就无法向后执行,因此要设置为非壅闭
代码实现:
#define MAX_LISTEN 1024
class Socket {
private:
int _sockfd;
public:
Socket():_sockfd(-1) {}
Socket(int fd): _sockfd(fd) {}
~Socket() { Close(); }
int Fd() { return _sockfd; }
//创建套接字
bool Create() { //因为创建套接字可能会失败,失败之后如何处理由使用者来决定
// int socket(int domain, int type, int protocol)
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);//ipv4、流式套接字、tcp协议(协议类型)
if (_sockfd < 0) {
ERR_LOG("CREATE SOCKET FAILED!!");
return false;
}
return true;
}
//绑定地址信息
bool Bind(const std::string &ip, uint16_t port) { //要告诉绑定什么
struct sockaddr_in addr; //组织地址结构
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
// int bind(int sockfd, struct sockaddr*addr, socklen_t len);
int ret = bind(_sockfd, (struct sockaddr*)&addr, len);
if (ret < 0) {
ERR_LOG("BIND ADDRESS FAILED!");
return false;
}
return true;
}
//开始监听
bool Listen(int backlog = MAX_LISTEN) { //MAX_LISTEN同一时间最大并发连接数
// int listen(int backlog)
int ret = listen(_sockfd, backlog);//将一个套接字的状态设置为listen状态,并设置同一时间最大连接数
if (ret < 0) {
ERR_LOG("SOCKET LISTEN FAILED!");
return false;
}
return true;
}
//向服务器发起连接
bool Connect(const std::string &ip, uint16_t port) {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
// int connect(int sockfd, struct sockaddr*addr, socklen_t len);
int ret = connect(_sockfd, (struct sockaddr*)&addr, len);
if (ret < 0) {
ERR_LOG("CONNECT SERVER FAILED!");
return false;
}
return true;
}
//获取新连接
int Accept() { //获取新连接的描述符返回
// int accept(int sockfd, struct sockaddr *addr, socklen_t *len);
int newfd = accept(_sockfd, NULL, NULL);//通过监听套接字获取一个新建连接的描述符并且返回当前连接上的客户端的地址信息(但是这些地址信息并没有用到,因此只需要获取新的描述符即可)
if (newfd < 0) {
ERR_LOG("SOCKET ACCEPT FAILED!");
return -1;
}
return newfd;
}
//接收数据
ssize_t Recv(void *buf, size_t len, int flag = 0) { //有符号长整型
// ssize_t recv(int sockfd, void *buf, size_t len, int flag);
ssize_t ret = recv(_sockfd, buf, len, flag);
if (ret <= 0) {
//EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误
//EINTR 表示当前socket的阻塞等待,被信号打断了,
if (errno == EAGAIN || errno == EINTR) {
return 0;//表示这次接收没有接收到数据
}
ERR_LOG("SOCKET RECV FAILED!!");
return -1;
}
return ret; //实际接收的数据长度
}
ssize_t NonBlockRecv(void *buf, size_t len) { //非阻塞接收数据
return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞。
}
//发送数据
ssize_t Send(const void *buf, size_t len, int flag = 0) {
// ssize_t send(int sockfd, void *data, size_t len, int flag);
ssize_t ret = send(_sockfd, buf, len, flag);
if (ret < 0) {
if (errno == EAGAIN || errno == EINTR) {
return 0;
}
ERR_LOG("SOCKET SEND FAILED!!");
return -1;
}
return ret;//实际发送的数据长度
}
ssize_t NonBlockSend(void *buf, size_t len) { //非阻塞发送数据
if (len == 0) return 0;
return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前发送为非阻塞。
}
//关闭套接字
void Close() {
if (_sockfd != -1) {
close(_sockfd);
_sockfd = -1;
}
}
//创建一个服务端连接
bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) {
//1. 创建套接字,2. 绑定地址,3. 开始监听,4. 设置非阻塞, 5. 启动地址重用
if (Create() == false) return false;
if (block_flag) NonBlock();//开启非阻塞
if (Bind(ip, port) == false) return false;
if (Listen() == false) return false;
ReuseAddress(); //开启地址重用
return true;
}
//创建一个客户端连接
bool CreateClient(uint16_t port, const std::string &ip) {
//1. 创建套接字,2.指向连接服务器
if (Create() == false) return false;
if (Connect(ip, port) == false) return false; //给的是服务器的ip地址和端口号
return true;
}
//设置套接字选项---开启地址端口重用
void ReuseAddress() {
// int setsockopt(int fd, int leve, int optname, void *val, int vallen)
int val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int));
//第二个参数 表示选项所在的协议层 SOL_SOCKET 代表套接字层
//SO_REUSEADDR 允许在绑定地址时,即使该地址已被占用,
//只要原占用的套接字处于 TIME_WAIT 状态,新的套接字也能绑定该地址。
val = 1;
setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int));
//SO_REUSEPORT:允许多个套接字绑定到相同的地址和端口,不过前提是这些套接字都设置了该选项。
//这在负载均衡和多线程 / 多进程网络编程中非常有用。
}
//设置套接字阻塞属性-- 设置为非阻塞
void NonBlock() {
//int fcntl(int fd, int cmd, ... /* arg */ );
int flag = fcntl(_sockfd, F_GETFL, 0);
fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
}
};
复制代码
3、事件管理Channel类实现:
事件触发后的处理的管理:
1、须要处理的事件:可读、可写、挂断、错误、任意
2、事件处理回调函数
成员:因为后边使用epoll进行事件监控
EPOLLIN 可读
EPOLLOUT 可写
EPOLLRDHUP 毗连断开
EPOLLPRI 优先数据
EPOLLERR 出错了
EPOLLHUP 挂断
而以上的事件都是数值 uint32_t 进行生存
要进行事件管理,就须要有一个uint32_t 类型的成员生存当前须要监控的事件
事件处理这里,因为有五种事件须要处理,就须要五个回调函数
代码实现:
class Poller;
class EventLoop;
//Channel用于管理文件描述符的事件监控和处理
class Channel {
private:
int _fd;//要监控的对象
EventLoop *_loop;//EventLoop 事件循环的核心类,负责事件的轮询和分发
uint32_t _events; // 当前需要监控的事件 //uint32_t 每个位可以表示一个特定事件类型
uint32_t _revents; // 当前连接触发的事件//事件----读、写、错误、连接断开、任意事件被触发
using EventCallback = std::function<void()>;
EventCallback _read_callback; //可读事件被触发的回调函数
EventCallback _write_callback; //可写事件被触发的回调函数
EventCallback _error_callback; //错误事件被触发的回调函数
EventCallback _close_callback; //连接断开事件被触发的回调函数
EventCallback _event_callback; //任意事件被触发的回调函数
public:
Channel(EventLoop *loop, int fd):_fd(fd), _events(0), _revents(0), _loop(loop) {}
int Fd() { return _fd; }
uint32_t Events() { return _events; }//获取想要监控的事件
void SetREvents(uint32_t events) { _revents = events; }//设置实际就绪的事件
void SetReadCallback(const EventCallback &cb) { _read_callback = cb; }
void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }
void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }
void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }
void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }
//当前是否监控了可读
bool ReadAble() { return (_events & EPOLLIN); } //& 都为1是1,否则为0 即如果结果不为0就是监控了读事件
//当前是否监控了可写
bool WriteAble() { return (_events & EPOLLOUT); }//同理
//启动读事件监控
void EnableRead() { _events |= EPOLLIN; Update(); }//或 (有一个为1就是1,两个都是0,才是0)//将读事件添加到需要监控的事件集合中
//启动写事件监控
void EnableWrite() { _events |= EPOLLOUT; Update(); }
//关闭读事件监控
void DisableRead() { _events &= ~EPOLLIN; Update(); }//&EPOLLIN的~ 就会将原来为1的位置置0(&运算---只有全都为1,才是1,否则为0)//读0000 0001 写0000 0010 _events 里对应 EPOLLIN 的那一位就会被置为 0,也就意味着取消了对可读事件的监控
//关闭写事件监控
void DisableWrite() { _events &= ~EPOLLOUT; Update(); }
//关闭所有事件监控
void DisableAll() { _events = 0; Update(); }
//移除监控---从epoll的红黑树上直接进行移除
void Remove();
void Update();//更新事件监控状态
//事件处理,一旦连接触发了事件,就调用这个函数,自己触发了什么事件如何处理自己决定
void HandleEvent() {
if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {//只有当_revents有相应的事件(即对应位为1时,&才会为1)
//检查是否触发了可读事件、检查对方是否关闭了连接的写端(半关闭)、检查是否有紧急数据可读
/*不管任何事件,都调用的回调函数*/
if (_read_callback) _read_callback();
}
/*有可能会释放连接的操作事件,一次只处理一个*/
if (_revents & EPOLLOUT) { //检查是否触发了可写事件
if (_write_callback) _write_callback();
}else if (_revents & EPOLLERR) { //检查是否触发了错误事件
if (_error_callback) _error_callback();//一旦出错,就会释放连接,因此要放到前边调用任意回调
}else if (_revents & EPOLLHUP) { //检查是否触发了连接断开事件
if (_close_callback) _close_callback();
}
if (_event_callback) _event_callback();//无论前面处理了哪些具体事件,只要 _event_callback 不为空,都会调用该回调函数。
//这个回调函数可以用于处理一些通用的事件逻辑,例如记录日志、统计事件次数等。
}
};
复制代码
4、 形貌符事件监控Poller类实现:
通过epoll实现对形貌符的IO事件监控
封装头脑:
1、必须拥有一个epoll
操纵句柄
2、拥有一个struct epoll_event
结构数组
,监控时生存所有的活泼事件
3、使用
哈希表
管理形貌符与形貌符对应的事件管理Channel对象
逻辑流程:
1、对形貌符进行监控,通过Channel才气知道形貌符须要监控什么事件
2、当形貌符就绪了,通过形貌符在哈希表中找到对应的Channel(得到了channel才知道什么事件怎样处理)
当形貌符就绪了,返回就绪形貌符对应的channel
public:(对外的接口)添加或更新形貌符所监控的事件、移除形貌符的监控、开始监控,获取就绪的channel
Channel类和Poller类的关系:
Channel类:负责封装单个文件形貌符的事件管理和处理逻辑。记录文件形貌符一样平常都须要监控的事件(可读、可写、错误等等),并且为差别类型的事件设置对应的回调函数。(当文件形貌符上的事件触发,Channel类会调用相应的回调函数进行处理)
对文件形貌符的事件管理进行封装,记录fd要监控的事件,并在触发时调用相应的回调函数。(只是管理,不进行监控)
Poller类:作为一个事件轮询器,负责管理多个Channel对象。他通过epoll机制来监听所有注册的文件形貌符上的事件,并在有事件发生的时候通知对应的Channel对象。类提供了添加、修改和移除事件监控的接口,以及开始轮询事件的功能。
(管理多个Channel对象,使用epoll机制进行监控)
协作流程
注册阶段:用户创建channel对象并设置好须要监控的事件和回调函数,然后通过Poller类的UpdateEvent方法将Channel对象注册到Poller中Poller会将Channel对象的文件形貌符和对应的事件信息添加到epoll实例中进行监控。
轮询阶段
oller类调用epoll_wait函数进入轮询状态,等候文件形貌符上的事件发生。当有事件发生时,Poller 会获取到就绪的文件形貌符列表,并根据文件形貌符找到对应的Channel对象。
事件处理阶段
oller会调用 Channel 对象的SetREvent方法设置现实触发的事件,然后将Channel对象添加到活泼列表中。最后,用户可以从活泼列表中取出Channel对象,并调用其Handlevent方法处理事件。
代码实现:
#define MAX_EPOLLEVENTS 1024
class Poller {
private:
int _epfd;
struct epoll_event _evs[MAX_EPOLLEVENTS];
std::unordered_map<int, Channel *> _channels;
private:
//对epoll的直接操作
void Update(Channel *channel, int op) {
// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);
int fd = channel->Fd();
struct epoll_event ev;//存储要监控的事件信息
ev.data.fd = fd;
ev.events = channel->Events();//将 Channel 对象中需要监控的事件设置到 ev.events 中
int ret = epoll_ctl(_epfd, op, fd, &ev);//_epfd 是 epoll 实例的文件描述符,fd 是要操作的文件描述符
if (ret < 0) {
ERR_LOG("EPOLLCTL FAILED!");
}
return;
}
//判断一个Channel是否已经添加了事件监控
bool HasChannel(Channel *channel) {//判断指定的Channel对象是否以及添加到epoll实例的监控列表中
auto it = _channels.find(channel->Fd());
if (it == _channels.end()) {
return false;
}
return true;
}
public:
Poller() { //构造函数 创建epoll实例
_epfd = epoll_create(MAX_EPOLLEVENTS); //创建epoll实例
if (_epfd < 0) {
ERR_LOG("EPOLL CREATE FAILED!!");
abort();//退出程序
}
}
//添加或修改监控事件
void UpdateEvent(Channel *channel) {
bool ret = HasChannel(channel);
if (ret == false) {
//不存在则添加
_channels.insert(std::make_pair(channel->Fd(), channel));//它的作用是创建一个 std::pair 对象 能够存储两个不同类型的值,分别称为 first 和 second
return Update(channel, EPOLL_CTL_ADD);//添加
}
return Update(channel, EPOLL_CTL_MOD);//修改
}
//移除监控
void RemoveEvent(Channel *channel) {
auto it = _channels.find(channel->Fd());
if (it != _channels.end()) {
_channels.erase(it);
}
Update(channel, EPOLL_CTL_DEL);
}
//开始监控,返回活跃连接
void Poll(std::vector<Channel*> *active) {
// int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout)
int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);
//_epfd 是 epoll 实例的文件描述符,_evs 是存储就绪事件的数组,
//MAX_EPOLLEVENTS 是一次最多能处理的事件数量,-1 表示无限等待,直到有事件发生
if (nfds < 0) { //nfds 表示就绪事件的数量
if (errno == EINTR) {
return ;
}
ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno));
abort();//退出程序
}
for (int i = 0; i < nfds; i++) {
auto it = _channels.find(_evs[i].data.fd);
assert(it != _channels.end());
it->second->SetREvents(_evs[i].events);//设置实际就绪的事件
active->push_back(it->second);
}
return;
}
};
void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }
复制代码
5、定时使命管理TimerWheel类实现:
定时器模块整合:
timefd:实现内核每隔一段时间,给线程一次超时势件(timerfd可读)
timerwheel:实现每次执行Runtimetask,都可以执行一波到期的定时使命
要实现一个完整的秒级定时器,就须要将这两个功能整合到一起
timerfd设置为每秒钟触发一次定时势件,当事件触发,则运行一次timerwheel的runtimertask,执行一下所有的过期定时使命
而timerfd的事件监控与触发,可以融合EventLoop来实现
TimerTask.hpp TimerWheel.hpp
class TimerTask //这个类代表定时器任务
{
private:
uint64_t _id; //定时器任务对象ID
uint32_t _timeout; //定时器任务的超时时间
bool _canceled; //false表示没有被取消 true表示被取消了
TaskFunc _task_cb; //定时器对象要执行的定时任务 --- 任务回调函数
ReleaseFunc _release; //定时任务结束时 用于删除 TimerWheel中保存的定时器对象信息
public:
TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb)
:_id(id),
_timeout(delay),//外界自己传入
_task_cb(cb),
_canceled(false)
{}
~TimerTask() //执行定时器任务
{
if(_canceled == false)
_task_cb(); //当定时任务触发时,需要执行的具体操作//在析构的时候执行是因为 定时器的任务是销毁不活跃的连接 那么 他的本质任务就是销毁 即可以在类对象析构的时候任务对象被销毁
//具体执行什么函数会自己设置 在这个任务构造的时候 需要自己传入的参数第三个
_release();// 从TimerWheel 的 _timers 哈希表中删除当前定时器任务的信息 --调用这个函数就是调用TimerWheel类中的RemoveTimer(因为下面的bind函数)
}
void Cancel()
{
_canceled = true; //true代表已经被取消
}
void SetRelease(const ReleaseFunc &cb) //传入的参数是函数
{
_release = cb;
}
uint32_t DelayTime()
{
return _timeout;
}
};
class TimerWheel { ///管理这些定时器任务
private:
using WeakTask = std::weak_ptr<TimerTask>;
using PtrTask = std::shared_ptr<TimerTask>;
int _tick; //当前的秒针,走到哪里释放哪里,释放哪里,就相当于执行哪里的任务
int _capacity; //表盘最大数量---其实就是最大延迟时间
std::vector<std::vector<PtrTask>> _wheel; //二维数组 里面存放的是定时器任务的指针指针
std::unordered_map<uint64_t, WeakTask> _timers;
EventLoop *_loop;
//定时器超时 读取一次数据 运行过期任务
int _timerfd;//定时器描述符--可读事件回调就是读取计数器,执行定时任务
std::unique_ptr<Channel> _timer_channel;
private:
void RemoveTimer(uint64_t id) { //从哈希表中删除任务 通过任务的id找到任务
auto it = _timers.find(id);
if (it != _timers.end()) {
_timers.erase(it);
}
}
static int CreateTimerfd() {
int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);//创建一个定时器文件描述符
//CLOCK_MONOTONIC 表示单调时钟,它从系统启动时开始计时,不会因为系统时间的调整(如设置系统时间)而发生跳变,适合用于测量时间间隔和定时任务
if (timerfd < 0) {
ERR_LOG("TIMERFD CREATE FAILED!");
abort();
}
//int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old);
struct itimerspec itime; //定时器每秒触发一次超时
itime.it_value.tv_sec = 1;
itime.it_value.tv_nsec = 0;//第一次超时时间为1s后
itime.it_interval.tv_sec = 1;
itime.it_interval.tv_nsec = 0; //第一次超时后,每次超时的间隔时
//第一次超时时间为 1 秒后,之后每次超时的间隔也是 1 秒
timerfd_settime(timerfd, 0, &itime, NULL);//设置定时器的超时时间,最后返回创建好的定时器文件描述符
return timerfd;
}
int ReadTimefd() {
uint64_t times;
//有可能因为其他描述符的事件处理花费事件比较长,然后在处理定时器描述符事件的时候,有可能就已经超时了很多次
//read读取到的数据times就是从上一次read之后超时的次数
int ret = read(_timerfd, ×, 8);//从定时器文件描述符 _timerfd 中读取数据,数据表示从上一次读取之后的超时次数
if (ret < 0) {
ERR_LOG("READ TIMEFD FAILED!");
abort();
}
return times;
}
//这个函数应该每秒钟被执行一次,相当于秒针向后走了一步
void RunTimerTask() {
_tick = (_tick + 1) % _capacity; //_tick指到哪里哪里被清理
_wheel[_tick].clear();//清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉
}
void OnTime() {
//根据实际超时的次数,执行对应的超时任务
int times = ReadTimefd();
for (int i = 0; i < times; i++) {
RunTimerTask(); //给超时任务规定执行的次数 即根据超时事件的基本单位 来确定超时次数 再通过超时一次 执行一次定时任务
}
}
//定时任务的添加必须在EventLoop线程中去添加
void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb)//添加定时任务 --第三个参数就是定时器任务触发时,具体需要执行的任务
{
PtrTask pt(new TimerTask(id, delay, cb));
pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));//在添加定时任务的时候,就将id和将RemoveTimer绑定形成一个新的函数,并将这个函数设置为 TimerTask 对象的 _release 回调函数,即在添加定时任务的时候就已经设置好了,该任务在超时的时候应该执行什么任务
int pos = (_tick + delay) % _capacity;
_wheel[pos].push_back(pt);//数组
_timers[id] = WeakTask(pt); //_timers哈希表中,值为id的元素(如果有就跟新,如果没有就新创建) WeakTask(pt)----以pt这个 std::shared_ptr为参数构建了一个std::weak_ptr<TimerTask> 类型的弱引用
}
void TimerRefreshInLoop(uint64_t id)//刷新/延迟定时任务
{
//通过保存的定时器对象的weak_ptr构造一个share_ptr出来,添加到轮子中
auto it = _timers.find(id);
if(it == _timers.end())
{
return;//没找到定时任务,无法进行刷新,无法延迟
}
PtrTask pt = it->second.lock(); //lock获取weak_ptr管理的对象对应的share_ptr
//it->second代表 与id对应的 std::weak_ptr<TimerTask> 对象
//std::weak_ptr 类的一个成员函数,它的作用是尝试创建一个指向 std::weak_ptr 所观察对象的 std::shared_ptr
//从 _timers 哈希表中找到与给定 id 对应的 std::weak_ptr<TimerTask> 对象,
//然后调用其 lock() 方法尝试获取一个指向该 TimerTask 对象的 std::shared_ptr。
//如果该 TimerTask 对象还存在(即其引用计数不为 0),则 lock() 方法会返回一个有效的 std::shared_ptr,
//并将其赋值给 pt;如果该 TimerTask 对象已经被销毁(引用计数为 0),则 lock() 方法会返回一个空的 std::shared_ptr。
//为什么这样写????
//由于 _timers 中存储的是 std::weak_ptr,我们不能直接通过它来操作对象。
//因此,需要调用 lock() 方法获取一个 std::shared_ptr,这样才能确保在操作对象时,对象是存在的。
//同时,使用 std::shared_ptr 操作对象可以保证在操作期间对象不会被意外销毁,因为 std::shared_ptr 会增加对象的引用计数。
int dalay = pt->DelayTime();//DelayTime() 这个时间外界自己传入
int pos = (_tick + dalay) % _capacity;
_wheel[pos].push_back(pt); //重新更新位置
}
void TimerCancelInLoop(uint64_t id)
{
auto it = _timers.find(id);
if(it != _timers.end())
{
return;//没找到定时任务,无法进行刷新,无法延迟
}
PtrTask pt = it->second.lock(); //lock获取weak_ptr管理的对象对应的share_ptr
if(pt)
pt->Cancel();
}
public:
TimerWheel(EventLoop *loop):_capacity(60), _tick(0), _wheel(_capacity), _loop(loop),
_timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd)) {
_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));
//(Channel 类的对象)_timer_channel 通常代表一个事件通道,用于管理某个文件描述符 这里是是定时器文件描述符 _timerfd 的事件和回调
//通过调用 SetReadCallback 方法,将 OnTime 函数设置为当该通道对应的文件描述符有可读事件发生时要执行的回调函数
//OnTime 它会读取定时器文件描述符中的超时次数,并根据超时次数执行相应的定时器任务
_timer_channel->EnableRead();//启动读事件监控
}
/*定时器中有个_timers成员,定时器信息的操作有可能在多线程中进行,因此需要考虑线程安全问题*/
/*如果不想加锁,那就把对定期的所有操作,都放到一个线程中进行*/
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);
//刷新/延迟定时任务
void TimerRefresh(uint64_t id);
void TimerCancel(uint64_t id);
/*这个接口存在线程安全问题--这个接口实际上不能被外界使用者调用,只能在模块内,在对应的EventLoop线程内执行*/
bool HasTimer(uint64_t id) {
auto it = _timers.find(id);
if (it == _timers.end()) {
return false;
}
return true;
}
};
复制代码
eventfd
eventfd:一种事件通知机制
创建一个形貌符用于实现事件通知
eventfd本质在内核里面管理管理的就是一个计数器
创建eventfd就会在内核中创建一个计数器(结构),每当向eventfd中写入一个数值-----用于表示事件通知的次数。可以使用read进行数据的读写,读取到的数据就是通知的次数
假设每次给eventfd中写入一个1,就表示通知了1次,一连写了三次之后,再去read读取出来的数字就是3,读取之后计数清0.
用处:在EventLoop模块中实现线程间的事件通知功能。
#include <sys/eventfd.h>
int eventfd(unsigned int intval, int flags);
功能:创建一个eventfd对象,实现事件通知
参数:
initval:计数初值
flags:EFD_CLOEXEC--禁止历程复制(
表示在执行exec系列函数时关闭该文件形貌符)
EFD_NONBLOCK ---启动非壅闭属性
返回值:返回一个文件形貌符用于操纵
eventfd也是通过read/write/close进行操纵的
注意:read 和 write进行IO的时候数据只能是一个8字节的数据
int mian()
{
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
if(efd < 0)
{
perror("eventfd failed!!");
return -1;
}
uint64_t val = 1;
write(efd, &val, sizeof(val));
write(efd, &val, sizeof(val));
write(efd, &val, sizeof(val));
uint64_t res = 0;
read(efd, &res, sizeof(res));
printf("%ld\n", res);
return 0;
}
运行结果: 3
复制代码
6、Reactor-EventLoop线程池类实现:
有多少个线程就有多少了EventLoop
监控了一个毗连,而这个毗连一旦就绪,就要进行事件处理。如果在毗连处理过程中,这个毗连又触发了其他的事件处理,会不会被分配到其他线程中去处理。如果这个形貌符,在多个线程中都触发了事件,进行处理,就会存在线程安全问题(如果每一个毗连都创建一把锁,显然不现实,斲丧很大)。
因此我们须要将一个毗连事件监控,以及毗连事件处理,以及其他操纵都放在同一个线程中进行(一个毗连无法绑定一个线程,可是一个EventLoop对应一个线程,我们可以把一个毗连绑定到EventLoop中)
怎样保证一个毗连的所有操纵都在eventloop对应的线程中?
解决方案:给eventloop模块中添加一个使命队列。对毗连的所有操纵都进行一次封装,将对毗连的操纵并不直接执行,而是当作使命添加到使命队列中。
eventloop处理流程:
1、在线程中对形貌符进行事件监控
2、又形貌符就绪则对形貌符进行事件处理(怎样保证处理回调函数的操纵都在线程中)
3、所有的就绪事件处理完了,这时候再去将使命队列中的所有使命一一执行
这样能保证对于毗连的所有操纵,都是在一个线程中执行的,不涉及线程安全问题
但是对于使命队列的操纵有线程安全问题,只须要给task的操纵加一把锁即可
1、事件监控:
使用Poller模块 有事件就绪则进行事件处理
2、执行使命队列中的使命
一个线程安全的使命队列
注意:因为有大概因为等候形貌符IO事件就绪,导致执行流流程壅闭,这时候使命队列中的使命将得不到执行
因此,要有一个事件通知的东西,能够唤醒事件监控的壅闭
当事件就绪,须要处理的时候,处理过程中,如果对毗连进行某些操纵:
这些操纵必须在eventloop对应的线程中执行,保证对毗连的各项操纵都是线程安全的。
1、如果执行的操纵本就在线程中,就不须要将操纵压入队列了,可以直接执行
2、如果执行的操纵不在线程中,才须要参加使命池,等候事件处理完了然后执行使命
class EventLoop {
private:
using Functor = std::function<void()>;
std::thread::id _thread_id;//线程ID--用于判断某个操作是否在该 EventLoop 对应的线程中执行 是就在线程里面执行 不是就压入线程池
int _event_fd;//eventfd唤醒IO事件监控有可能导致的阻塞
std::unique_ptr<Channel> _event_channel;//一个智能指针,指向与 _event_fd 相关联的 Channel 对象,用于管理 eventfd 的事件和回调
Poller _poller;//进行所有描述符的事件监控
std::vector<Functor> _tasks;//任务池 用于存储待执行的任务队列,每个任务都是一个 Functor 类型的函数对象
std::mutex _mutex;//实现任务池操作的线程安全
TimerWheel _timer_wheel;//定时器模块
public:
//执行放入任务池中的所有任务
//RunAllTask 函数通常在 EventLoop 对应的线程中执行,也就是说任务的执行是在单线程环境下进行的。
//在单线程环境中,不存在多个线程同时访问和修改共享资源的问题,因此可以避免线程安全问题
void RunAllTask() {
std::vector<Functor> functor;
{
std::unique_lock<std::mutex> _lock(_mutex);
_tasks.swap(functor);//交换完之后functor里面就是任务,_tasks里面就是空的了 对共享资源进行加锁
}
for (auto &f : functor) { //这样做的好处是可以在解锁后再执行任务,减少锁的持有时间,提高程序的并发性能。
f();
}
return ;
}
static int CreateEventFd() {
//eventfd用于创建一个文件描述符
int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); //CreateEventFd()返回值efd就赋值给了_event_fd
if (efd < 0) {
ERR_LOG("CREATE EVENTFD FAILED!!");
abort();//让程序异常退出
}
return efd;
}
void ReadEventfd() {//读取efd
uint64_t res = 0;
int ret = read(_event_fd, &res, sizeof(res));//从 _event_fd 中读取数据到 res 中 把数据读取出来进行清0 不读取就一直提示有数据可读
if (ret < 0) {
//EINTR -- 被信号打断; EAGAIN -- 表示无数据可读
if (errno == EINTR || errno == EAGAIN) {
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return ;
}
void WeakUpEventFd() {//向 eventfd 写入数据,从而唤醒可能因没有事件就绪而阻塞的 IO 事件监控
uint64_t val = 1;
int ret = write(_event_fd, &val, sizeof(val));//_event_fd 写入 val 的值 写入数据了 就不会阻塞了
if (ret < 0) {
if (errno == EINTR) {
return;
}
ERR_LOG("READ EVENTFD FAILED!");
abort();
}
return ;
}
public:
EventLoop():_thread_id(std::this_thread::get_id()), //获取当前线程的 ID 并赋值给 _thread_id
_event_fd(CreateEventFd()), //CreateEventFd()返回值efd就赋值给了_event_fd
_event_channel(new Channel(this, _event_fd)),
_timer_wheel(this) {
//给eventfd添加可读事件回调函数,读取eventfd事件通知次数
_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));
//启动eventfd的读事件监控
_event_channel->EnableRead();
}
//启动EventLoop模块//三步走--事件监控-》就绪事件处理-》执行任务
void Start() { //整个EventLoop流程
while(1) {
//1. 事件监控,
std::vector<Channel *> actives;
_poller.Poll(&actives);//Poller类中的Poll函数 开始监控并返回活跃连接
//2. 事件处理。
for (auto &channel : actives) { //actives 活跃连接
channel->HandleEvent();//进行事件处理 不同事件进行不同处理
}
//3. 执行任务
RunAllTask();
}
}
//用于判断当前线程是否是EventLoop对应的线程;
bool IsInLoop() {
return (_thread_id == std::this_thread::get_id());//_thread_id EventLoop创建时的id std::this_thread::get_id()获取当前线程的id
}
void AssertInLoop() {
assert(_thread_id == std::this_thread::get_id());
}
//判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。
void RunInLoop(const Functor &cb) { //给我提供一个任务 他在线程中就执行他 他不在就将他压入线程池中
if (IsInLoop()) {
return cb(); //在就执行任务函数
}
return QueueInLoop(cb); //不在 压入任务池
}
//将操作压入任务池
void QueueInLoop(const Functor &cb) {
{
std::unique_lock<std::mutex> _lock(_mutex); //加锁 创建_lock对象的时候 会对_mutex加锁 保证在_lock生命收起内 _mutex保护的区域不会有别的线程访问
_tasks.push_back(cb); //我们将任务压入到任务池中了,可是线程阻塞在事件监控,现在没有描述符就绪事件,那么事件监控就一直在等,等有事件了,才会处理任务,就会导致事件久久得不到执行
}
//因此需要 唤醒有可能因为没有事件就绪,而导致的epoll阻塞;----- 因为我们是先进行事件监控再进行任务执行
//唤醒事件就绪-----其实就是给eventfd写入一个数据,eventfd就会触发可读事件(有事件就绪了,就不会再阻塞了)
WeakUpEventFd();
}
//事件监控
//添加/修改描述符的事件监控
void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }
//移除描述符的监控
void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }
void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }
void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }
void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }
bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};
void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }
复制代码
对以上代码模块的整体明白 梳理上面模块的关系:
#include "../source/server.hpp"
void HandleClose(Channel *channel)
{
DBG_LOG("close fd:%d",channel->Fd());
channel->Remove();//移除监控
delete channel;
}
void HandleRead(Channel *channel)
{
int fd = channel->Fd();
char buf[1024] = {0};
int ret = recv(fd, buf, 1023, 0);//读取数据
if(ret <= 0)
{
return HandleClose(channel);//关闭释放
}
DBG_LOG("%s", buf);
channel->EnableWrite();//启动可写事件监控,以便后续可以向该套接字发送数据
}
void HandleWrite(Channel *channel)
{
int fd = channel->Fd();
const char *data = "天气还不错!";
int ret = send(fd, data, strlen(data), 0);
if(ret < 0)
{
return HandleClose(channel);关闭释放
}
channel->DisableWrite();//关闭写监控 因为数据已经发送完,不需要再监控写事件(可写)就是向套接字的发送缓冲区写数据
}
void HandleEvent(EventLoop *loop, Channel *channel,uint64_t timerid)
{
loop->TimerRefresh(timerid);
}
void Acceptor(EventLoop *loop, Channel *lst_channel)
{
int fd = lst_channel->Fd();
int newfd = accept(fd,NULL, NULL);
if(newfd < 0) {return;}
uint64_t timerid = rand() % 10000; //生成一个随机的定时器 ID 范围在 0 到 9999 之间
Channel *channel = new Channel(loop, newfd);//创建一个新的 Channel 类对象,关联 EventLoop 对象 loop 和新连接的文件描述符 newfd
channel->SetReadCallback(std::bind(HandleRead,channel));//为通信套接字设置可读事件回调函数为 HandleWrite 函数
channel->SetWriteCallback(std::bind(HandleWrite,channel));//可写事件的回调函数
channel->SetCloseCallback(std::bind(HandleClose,channel));//关闭事件的回调函数
//channel->SetErrorCallback(std::bind(HandleError,channel));//错误事件的回调函数
channel->SetEventCallback(std::bind(HandleEvent,loop,channel,timerid));//任意事件的回调函数
//非活跃连接的超时释放操作,10s后关闭连接
//注意:定时销毁任务,必须在启动读事件之前,因为可能启动了事件监控之后,立即有了事件,但是这时候还没有任务
loop->TimerAdd(timerid, 10, std::bind(HandleClose,channel));//添加一个定时任务 timerid 为定时器 ID,10 表示超时时间为 10 秒 回调函数为 HandleClose 函数,用于在 10 秒后关闭该通道
channel->EnableRead();//启动新通道的可读事件监控,以便接收新连接上的数据
}
int main()
{
srand(time(NULL));
EventLoop loop;
Socket lst_sock;
lst_sock.CreateServer(8500);//创建一个监听再8500端口的服务器套接字
Channel channel(&loop, lst_sock.Fd());
channel.SetReadCallback(std::bind(Acceptor,&loop,&channel));//设置监听通道的可读事件回调函数为 Acceptor 函数
channel.EnableRead();//启动监听通道的可读事件监控,以便接受新的连接请求
while(1)
{
loop.Start();//启动事件循环,处理各种事件(如连接请求、数据读写等)
}
lst_sock.Close();//调用 Socket 类的 Close 方法,关闭监听套接字
return 0;
}
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/)
Powered by Discuz! X3.4