ToB企服应用市场:ToB评测及商务社交产业平台
标题:
【在Linux世界中追寻巨大的One Piece】多路转接epoll(续)
[打印本页]
作者:
欢乐狗
时间:
2024-11-17 03:32
标题:
【在Linux世界中追寻巨大的One Piece】多路转接epoll(续)
目录
1 -> epoll的工作方式
1.1 -> 水平触发(Level Triggered)工作模式
1.2 -> 边沿触发(Edge Triggered)工作模式
2 -> 对比LT与ET
3 -> 明确ET模式和非壅闭文件形貌符
4 -> epoll的利用场景
5 -> epoll示例
5.1 -> epoll服务器(LT模式)
5.2 -> epoll服务器(ET模式)
1 -> epoll的工作方式
epoll有2中工作方式:
水平触发(LT)
边沿触发(ET)
加入有如许一个例子:
已经把一个tcp_socket添加到epoll形貌符。
这个时间socket的另一端被写入了2KB的数据。
调用epoll_wait,而且它会返回。说明它已经准备好读取操纵。
然后调用read,只读取了1KB的数据。
继续调用epoll_wait……
1.1 -> 水平触发(Level Triggered)工作模式
epoll默认状态下就是LT工作模式。
当epoll检测到socket上事件就绪的时间,可以不立刻举行处理。或者只处理一部分。
如上面的例子,由于只读了1K的数据,缓冲区中还剩1K的数据,在第二次调用epoll_wait时,epoll_wait仍旧会立刻返回并关照socket读事件就绪。
直到缓冲区上所有的数据都被处理完,epoll_wait才不会立刻返回。
支持壅闭读写和非壅闭读写
。
1.2 -> 边沿触发(Edge Triggered)工作模式
如果我们在第一步将socket添加到epoll形貌符的时间利用了EPOLLET标记,epoll进入ET工作模式。
当epoll检测到socket上事件就绪时,必须立刻处理。
如上面的例子,虽然只读了1K的数据,缓冲区还剩1K的数据,在第二次调用epoll_wait的时间,epoll_wait不会再返回了。
也就是说,ET模式下,文件形貌符上的事件就绪后,只有一次处理机会。
ET的性能比LT的性能更高(epoll_wait返回的次数少了许多)。Nginx默认采用ET模式利用epoll。
只支持非壅闭的读写
。
select和poll着实也是工作在LT模式下。epoll既可以支持LT,也可以支持ET。
2 -> 对比LT与ET
LT是epoll的默认行为
。
利用ET能够减少epoll触发的次数。但是代价就是强逼着步伐员一次相应就绪过程中就把所有的数据都处理完。
相称于一个文件形貌符就绪后,不会反复被提示就绪,看起来就比LT更高效一些。但是在LT环境下如果也能做到每次就绪的文件形貌符都立刻处理,不让这个就绪被反复提示的话,着实性能也是一样的。
另一方面,ET的代码复杂水平更高了。
3 -> 明确ET模式和非壅闭文件形貌符
利用ET模式的epoll,需要将文件形貌设置为非壅闭。这个不是接口上的要求,而是“工程实践”上的要求。
假设一个场景:服务器接收到一个10k的请求,会向客户端返回一个应答数据。如果客户端收不到应答,不会发送第二个10K的请求。
如果服务端写的代码是壅闭式的read,而且一次只read1K数据的话(read不能包管一次就把所有的数据都读出来,参考man手册的说明,大概被信号打断),剩下的9K数据就会待在缓冲区中。
此时由于epoll是ET模式,并不会认为文件形貌符读就绪。epoll_wait就不会再次返回。剩下的9K数据会不停在缓冲区中。直到下一次客户端再给服务器写数据。
但是问题来了。
服务器只读到1K个数据,要10K读完才会给客户端返回相应数据。
客户端要读到服务器的相应,才会发送下一个请求。
客户端发送了下一个请求,epoll_wait才会返回,才气去读缓冲区中剩余的数据。
以是,为了办理上述问题(壅闭read不肯定能一下把完整的请求读完),于是就可以利用非壅闭轮训的方式来读缓冲区,包管肯定能把完整的请求都读出来。
而如果是LT没这个问题。只要缓冲区中的数据没读完,就能够让epoll_wait返回文件形貌符读就绪。
4 -> epoll的利用场景
epoll的高性能,是有肯定的特定场景的。如果场景选择的不适宜,epoll的性能大概适得其反。
对于多连接,且多连接中只有一部分连接比力活跃时,比力适合利用epoll。
比方,典型的一个需要处理上万个客户端的服务器,比方各种互联网APP的入口服务器,如许的服务器就很适合epoll。
如果只是系统内部,服务器和服务器之间举行通讯,只有少数的几个连接,这种环境下用epoll就并不适合。具体要根据需求和场景特点来决定利用哪种IO模子。
5 -> epoll示例
5.1 -> epoll服务器(LT模式)
tcp_epoll_server.hpp
///
// 封装一个 Epoll 服务器, 只考虑读就绪的情况
///
#pragma once
#include <vector>
#include <functional>
#include <iostream>
#include <sys/epoll.h>
#include "tcp_socket.hpp"
typedef std::function<void(const std::string&, std::string*resp)> Handler;
class Epoll
{
public:
Epoll()
{
epoll_fd_ = epoll_create(10);
}
~Epoll()
{
close(epoll_fd_);
}
bool Add(const TcpSocket& sock) const
{
int fd = sock.GetFd();
printf("[Epoll Add] fd = %d\n", fd);
epoll_event ev;
ev.data.fd = fd;
ev.events = EPOLLIN;
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
if (ret < 0)
{
perror("epoll_ctl ADD");
return false;
}
return true;
}
bool Del(const TcpSocket& sock) const
{
int fd = sock.GetFd();
printf("[Epoll Del] fd = %d\n", fd);
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
if (ret < 0)
{
perror("epoll_ctl DEL");
return false;
}
return true;
}
bool Wait(std::vector<TcpSocket>* output) const
{
output->clear();
epoll_event events[1000];
int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1);
if (nfds < 0)
{
perror("epoll_wait");
return false;
}
// [注意!] 此处必须是循环到 nfds, 不能多循环
for (int i = 0; i < nfds; ++i)
{
TcpSocket sock(events[i].data.fd);
output->push_back(sock);
}
return true;
}
private:
int epoll_fd_;
};
class TcpEpollServer
{
public:
TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip),
port_(port)
{
}
bool Start(Handler handler)
{
// 1. 创建 socket
TcpSocket listen_sock;
CHECK_RET(listen_sock.Socket());
// 2. 绑定
CHECK_RET(listen_sock.Bind(ip_, port_));
// 3. 监听
CHECK_RET(listen_sock.Listen(5));
// 4. 创建 Epoll 对象, 并将 listen_sock 加入进去
Epoll epoll;
epoll.Add(listen_sock);
// 5. 进入事件循环
for (;;)
{
// 6. 进行 epoll_wait
std::vector<TcpSocket> output;
if (!epoll.Wait(&output))
{
continue;
}
// 7. 根据就绪的文件描述符的种类决定如何处理
for (size_t i = 0; i < output.size(); ++i)
{
if (output[i].GetFd() == listen_sock.GetFd())
{
// 如果是 listen_sock, 就调用 accept
TcpSocket new_sock;
listen_sock.Accept(&new_sock);
epoll.Add(new_sock);
}
else
{
// 如果是 new_sock, 就进行一次读写
std::string req, resp;
bool ret = output[i].Recv(&req);
if (!ret)
{
// [注意!!] 需要把不用的 socket 关闭
// 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了
epoll.Del(output[i]);
output[i].Close();
continue;
}
handler(req, &resp);
output[i].Send(resp);
} // end for
} // end for (;;)
}
return true;
}
private:
std::string ip_;
uint16_t port_;
};
复制代码
dict_server.cc只需要将server对象的类型改成TcpEpollServer即可。
5.2 -> epoll服务器(ET模式)
基于LT版本稍加修改即可。
修改tcp_socket.hpp,新增非壅闭读和非壅闭写接口。
对于accept返回的new_sock加上EPOLLET如许的选项。
注意:
此代码暂时未考虑listen_sock ET的环境。如果将listen_sock设为ET,则需要非壅闭轮询的方式accept。否则会导致同一时刻大量的客户端同时连接的时间,只能accept一次的问题。
tcp_socket.hpp
// 以下代码添加在 TcpSocket 类中
// 非阻塞 IO 接口
bool SetNoBlock()
{
int fl = fcntl(fd_, F_GETFL);
if (fl < 0)
{
perror("fcntl F_GETFL");
return false;
}
int ret = fcntl(fd_, F_SETFL, fl | O_NONBLOCK);
if (ret < 0)
{
perror("fcntl F_SETFL");
return false;
}
return true;
}
bool RecvNoBlock(std::string* buf) const
{
// 对于非阻塞 IO 读数据, 如果 TCP 接受缓冲区为空, 就会返回错误
// 错误码为 EAGAIN 或者 EWOULDBLOCK, 这种情况也是意料之中, 需要重试
// 如果当前读到的数据长度小于尝试读的缓冲区的长度, 就退出循环
// 这种写法其实不算特别严谨(没有考虑粘包问题)
buf->clear();
char tmp[1024 * 10] = { 0 };
for (;;)
{
ssize_t read_size = recv(fd_, tmp, sizeof(tmp) - 1, 0);
if (read_size < 0)
{
if (errno == EWOULDBLOCK || errno == EAGAIN)
{
continue;
}
perror("recv");
return false;
}
if (read_size == 0)
{
// 对端关闭, 返回 false
return false;
}
tmp[read_size] = '\0';
*buf += tmp;
if (read_size < (ssize_t)sizeof(tmp) - 1)
{
break;
}
}
return true;
}
bool SendNoBlock(const std::string& buf) const
{
// 对于非阻塞 IO 的写入, 如果 TCP 的发送缓冲区已经满了, 就会出现出错的情况
// 此时的错误号是 EAGAIN 或者 EWOULDBLOCK. 这种情况下不应放弃治疗
// 而要进行重试
ssize_t cur_pos = 0; // 记录当前写到的位置
ssize_t left_size = buf.size();
for (;;)
{
ssize_t write_size = send(fd_, buf.data() + cur_pos,
left_size, 0);
if (write_size < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
// 重试写入
continue;
}
return false;
}
cur_pos += write_size;
left_size -= write_size;
// 这个条件说明写完需要的数据了
if (left_size <= 0)
{
break;
}
}
return true;
}
复制代码
tcp_epoll_server.hpp
///
// 封装一个 Epoll ET 服务器
// 修改点:
// 1. 对于 new sock, 加上 EPOLLET 标记
// 2. 修改 TcpSocket 支持非阻塞读写
// [注意!] listen_sock 如果设置成 ET, 就需要非阻塞调用 accept 了
///
#pragma once
#include <vector>
#include <functional>
#include <sys/epoll.h>
#include "tcp_socket.hpp"
typedef std::function<void(const std::string&, std::string* resp)> Handler;
class Epoll
{
public:
Epoll()
{
epoll_fd_ = epoll_create(10);
}
~Epoll()
{
close(epoll_fd_);
}
bool Add(const TcpSocket& sock, bool epoll_et = false) const
{
int fd = sock.GetFd();
printf("[Epoll Add] fd = %d\n", fd);
epoll_event ev;
ev.data.fd = fd;
if (epoll_et)
{
ev.events = EPOLLIN | EPOLLET;
}
else
{
ev.events = EPOLLIN;
}
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
if (ret < 0)
{
perror("epoll_ctl ADD");
return false;
}
return true;
}
bool Del(const TcpSocket& sock) const
{
int fd = sock.GetFd();
printf("[Epoll Del] fd = %d\n", fd);
int ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, NULL);
if (ret < 0)
{
perror("epoll_ctl DEL");
return false;
}
return true;
}
bool Wait(std::vector<TcpSocket>* output) const
{
output->clear();
epoll_event events[1000];
int nfds = epoll_wait(epoll_fd_, events, sizeof(events) / sizeof(events[0]), -1);
if (nfds < 0)
{
perror("epoll_wait");
return false;
}
// [注意!] 此处必须是循环到 nfds, 不能多循环
for (int i = 0; i < nfds; ++i)
{
TcpSocket sock(events[i].data.fd);
output->push_back(sock);
}
return true;
}
private:
int epoll_fd_;
};
class TcpEpollServer
{
public:
TcpEpollServer(const std::string& ip, uint16_t port) : ip_(ip),
port_(port)
{
}
bool Start(Handler handler)
{
// 1. 创建 socket
TcpSocket listen_sock;
CHECK_RET(listen_sock.Socket());
// 2. 绑定
CHECK_RET(listen_sock.Bind(ip_, port_));
// 3. 监听
CHECK_RET(listen_sock.Listen(5));
// 4. 创建 Epoll 对象, 并将 listen_sock 加入进去
Epoll epoll;
epoll.Add(listen_sock);
// 5. 进入事件循环
for (;;)
{
// 6. 进行 epoll_wait
std::vector<TcpSocket> output;
if (!epoll.Wait(&output))
{
continue;
}
// 7. 根据就绪的文件描述符的种类决定如何处理
for (size_t i = 0; i < output.size(); ++i)
{
if (output[i].GetFd() == listen_sock.GetFd())
{
// 如果是 listen_sock, 就调用 accept
TcpSocket new_sock;
listen_sock.Accept(&new_sock);
epoll.Add(new_sock, true);
}
else
{
// 如果是 new_sock, 就进行一次读写
std::string req, resp;
bool ret = output[i].RecvNoBlock(&req);
if (!ret)
{
// [注意!!] 需要把不用的 socket 关闭
// 先后顺序别搞反. 不过在 epoll 删除的时候其实就已经关闭 socket 了
epoll.Del(output[i]);
output[i].Close();
continue;
}
handler(req, &resp);
output[i].SendNoBlock(resp);
printf("[client %d] req: %s, resp: %s\n",
output[i].GetFd(),
req.c_str(), resp.c_str());
} // end for
} // end for (;;)
}
return true;
}
private:
std::string ip_;
uint16_t port_;
};
复制代码
感谢各位大佬支持!!!
互三啦!!!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4