目次
5 muduo网络库
5.1 muduo库核心:multi-Reator架构
5.1.1 前置知识:Reator模式
5.1.2 Multi-Reator架构
1. Channel类
2. Poller/EpollPoller类
3. EventLoop类
4. 其余主要类的介绍
5.1.3 Muduo库学习与实践
5.1.3.1 用Muduo库搭一个最简单的Echo服务器
5.1.3.2 Muduo连接建立思想
5.1.3.3 Muduo消息读取逻辑
5.1.3.4 Muduo消息发送逻辑
5.1.3.5 Muduo连接断开逻辑
5.1.3.5.1 连接被动断开
5.1.3.5.2 服务器主动关闭导致连接断开
5.1.3.6 线程思想-One Loop Per Thread
原文链接:https://zhuanlan.zhihu.com/p/495016351
5 muduo网络库
5.1 muduo库核心:multi-Reator架构
muduo库是基于Reator模式实现的TCP网络编程库。
5.1.1 前置知识:Reator模式
反应器模式,本质是一个事件转发器,是网络模块核心中枢,负责将读写事件分发给对应的读写事件处理者。
I/O多路复用(阻塞型):用户线程请求数据时,先向内核态发起select调用,询问数据是否预备就绪,如果返回已预备就绪,调用read。相比同步阻塞IO和同步非阻塞IO一个显著的改进是将数据请求分为两步走,并且一个select可以向内核态多个channel发起轮询查询,当某些socket数据总网卡到达内核时就可以返回到用户态,完成复用。read不是立刻返回,阻塞态。
异步IO(非阻塞):当read调用同时注册回调函数,read立刻返回,当数据预备好后执行回调将数据返回,用户线程非阻塞。
select、poll、epoll:
- select:最大形貌符限制1024,需要遍历形貌符数组状态
- poll:最大形貌符大于select,也需要遍历形貌符数组状态
- epoll:没有文件形貌符的限制,同时epoll_wait方法可以直接返回就绪的形貌符,不需要遍历。
- epoll 实例内部维护了两个结构,分别是记录「要监听的文件形貌符」和「已经就绪的文件形貌符」,而对于已经就绪的文件形貌符来说,它们会被返回给用户步调举行处理(引用返回)。
- 值得分析的是,这里「已就绪的文件形貌符」是由内核主动添加至就绪文件形貌符集合中,我们从用户态调用 epoll_wait 就直接查询该集合是否有就绪 I/O 事件,如许下来,就减少了全遍历全部文件形貌符的操纵。
Reactor模型的核心就是事件驱动型,即将「连接」和「业务线程」分开处理,当「连接层」有事件触发时提交给「业务线程」,制止了业务线程因「网络数据处于预备中」导致的长时间等待问题,节省线程资源。
Reactor模式主要解决的问题是连接建立、IO事件的监听和读写和分发、业务逻辑处理(计算等)。主要由两大脚色组成:
- Reactor 线程的职责:主要负责连接建立、监听IO事件、IO事件读写以及将事件分发到Handlers 处理器。
- Handlers 处理器(业务处理)的职责:非阻塞的执行业务处理逻辑。
三种处理模式,对业务逻辑、连接建立以及IO事件处理举行优化。
- 单线程模型:全部IO操纵(包括连接建立、数据读写、事件分发)、业务处理在一个线程中完成。缺点是:连接数目有限,事件无法并行处理、事件积存。
- 多线程模型:由于整个过程中业务逻辑的处理是最泯灭资源的,因此改进利用线程池处理业务逻辑来提升性能,IO操纵照旧在单线程举行。
- 主从多线程模型:多线程模型的主要缺陷是无法应对大量新连接和IO就绪事件,因此在IO操纵这个过程用主从模型解决。
- 主Reactor处理新建立的连接,连接乐成后将新连接的对象注册到从Reactor
- 从Reactor处理IO读写事件/事件分发,由于比较耗时,采用线程池处理。
- 同样的,可以联合多线程模型,对业务逻辑处理(Handler)仍然保存线程池处理
5.1.2 Multi-Reator架构
基于Reactor主从多线程模式实现。Muduo库有三个核心模块实现Reactor。主要实现持续监听一组fd,并根据每个fd发生的事件调用相应的处理函数(回调函数)。三大模块分别是:Channel类、Poller/EpollPoller类以及EventLoop类。
EventLoop起到一个驱动循环的功能,Poller负责从事件监听器上获取监听结果。而Channel类则在此中起到了将fd及其相关属性封装的作用,将fd及其感兴趣事件和发生的事件以及差异事件对应的回调函数封装在一起,如许在各个模块中通报更加方便。接着EventLoop调用。
在Muduo库中,实现One Loop Per Thread ,也就是每一个EventLoop都绑定了一个线程,充份利用了多核cpu的能力,每一个核的线程负责循环监听一组文件形貌符的集合。
1. Channel类
Channel类封装了事件监听器、fd处理器、处理函数。具体来说,事件监听器定义监听fd组、fd事件集合以及实际发生的事件集合;fd处理器实现实际发生事件的注册到事件监听器或者移除;处理函数实现对fd每种事件对应的处理函数。其功能类似于epoll_ctl。
- int fd_这个Channel对象照看的文件形貌符
- int events_代表fd感兴趣的事件范例集合
- int revents_代表事件监听器实际监听到该fd发生的事件范例集合,当事件监听器监听到一个fd发生了什么事件,通过Channel::set_revents()函数来设置revents值。
- EventLoop* loop这个fd属于哪个EventLoop对象,这个暂时不解释。
- read_callback_ 、write_callback_、close_callback_、error_callback_:这些是std::function范例,代表着这个Channel为这个文件形貌符保存的各事件范例发生时的处理函数。
- 处理函数:向Channel对象注册各类事件的处理函数
- //move语法:属于移动语义的应用
- /*
- std::move 是一个标准库函数,用于将其参数转换为右值引用。
- 右值引用是C++11引入的一种引用类型,用于标识临时对象。
- 这种引用允许我们安全地从一个对象转移资源到另一个对象,而不是进行深拷贝。
- 移动赋值通常比深拷贝更有效率,因为它避免了不必要的资源复制,直接利用已存在的资源。
- */
- //read_callback_ = std::move(cb); 表示使用 cb 的资源来设置或更新 read_callback_ 的状态。
- //这通常涉及资源的转移,如内存、文件句柄、套接字等,而不是复制这些资源。
- void setReadCallback(ReadEventCallback cb) {read_callback_ = std::move(cb);}
- void setWriteCallback(Eventcallback cb) {write_callback_ = std::move(cb);}
- void setCloseCallback(EventCallback cb) {close_callback_ = std::move(cb);}
- void setErrorCallback(EventCallback cb) {error_callback_ = std::move(cb);}
复制代码
- fd处理器:将Channel中的文件形貌符及其感兴趣事件注册事件监听器上或从事件监听器上移除
- /*
- 关于语法:
- 位或操作(|=):
- 操作符 |= 是位或赋值操作符的简写。
- 它的作用是将左侧变量 events_ 与右侧表达式 kReadEvent 进行位或操作,并将结果存回 events_。
- 位或操作的特点是,对应的位中只要有一个为1,结果位就为1。这使得 |= 操作符非常适合用于添加(设置)标志位。
- update()私有成员方法,这个update其实本质上就是调用了epoll_ctl()。
- */
- void enableReading() {events_ |= kReadEvent; upadte();}
- void disableReading() {events_ &= ~kReadEvent; update();}
- void enableWriting() {events_ |= kWriteEvent; update();}
- void disableWriting() {events_ &= ~kWriteEvent; update();}
- void disableAll() {events_ |= kNonEvent; update();}
复制代码
int set_revents(int revt) {revents_ = revt;}
当事件监听器监听到某个文件形貌符发生了什么事件,通过这个函数可以将这个文件形貌符实际发生的事件封装进这个Channel中。
当调用epoll_wait()后,可以得知事件监听器上哪些Channel(文件形貌符)发生了哪些事件,事件发生后自然就要调用这些Channel对应的处理函数。
Channel::HandleEvent,让每个发生了事件的Channel调用本身保管的事件处理函数。每个Channel会根据本身文件形貌符实际发生的事件(通过Channel中的revents_变量得知)和感兴趣的事件(通过Channel中的events_变量得知)来选择调用read_callback_和/或write_callback_和/或close_callback_和/或error_callback_。
2. Poller/EpollPoller类
负责监听文件形貌符事件是否触发以及返回发生事件的文件形貌符以及具体事件。相当于事件监听器的具体实现。一个Poller对象对应一个事件监听器。在multi-reactor模型中,有多少reactor就有多少Poller。
muduo提供了epoll和poll两种IO多路复用方法来实现事件监听。不过默认是使用epoll来实现,也可以通过选项选择poll。
Poller是个抽象虚类,由EpollPoller和PollPoller继续实现,与监听文件形貌符和返回监听结果的具体方法也根本上是在这两个派生类中实现。EpollPoller就是封装了用epoll方法实现的与事件监听有关的各种方法,PollPoller就是封装了poll方法实现的与事件监听有关的各种方法。
- Poller/EpollPoller的重要成员变量
epollfd_就是用epoll_create方法返回的epoll句柄,也就是返回一个文件形貌符,代表一个epoll实例,这个文件形貌符可以用于管理和监视多个文件形貌符,以便在这些文件形貌符上发生 I/O 事件时举行高效的事件通知和处理。
channels_:这个变量是std::unordered_map<int, Channel*>范例,负责记录 文件形貌符 ---> Channel的映射,也帮助保管全部注册在你这个Poller上的Channel。
ownerLoop_:所属的EventLoop对象
TimeStamp poll(int timeoutMs, ChannelList *activeChannels)
这个函数是EpollPoller的核心。上文可知,文件形貌符fd以及其感兴趣事件和实际发生的事件封装在Channel类中。
当外部调用poll方法时,该方法底层实在是通过epoll_wait获取这个事件监听器上发生事件的fd及其对应发生的事件,通过哈希表channels_可以根据fd找到封装这个fd的Channel,然后将监听到该fd发生的事件写到Channel类的revents成员变量中。
然后把这个Channel装进activeChannels中(它是一个vector<Channel*>)。如许,当外界调用完poll之后就能拿到事件监听器的监听结果(activeChannels_)这个activeChannels就是事件监听器监听到的发生事件的fd,以及每个fd都发生了什么事件。
3. EventLoop类
调用一次Poll方法可以返回事件监听器的监听结果(发生事件的fd以及发生的事件),为了网络服务器具有持续监听、获取、处理事件的能力,需要循环调用poll方法,
调用Poller:poll方法获取实际发生事件的Channel集合,然后调用这些Channel内里保管的差异范例事件的处理函数(调用Channel::HandlerEvent方法)。
- void EventLoop::loop()
- { //EventLoop 所属线程执行
- 省略代码 省略代码 省略代码
- while(!quit_)
- {
- activeChannels_.clear();
- pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);//此时activeChannels已经填好了事件发生的channel
- for(Channel *channel : activeChannels_)
- channel->HandlerEvent(pollReturnTime_);
- 省略代码 省略代码 省略代码
- }
- LOG_INFO("EventLoop %p stop looping. \n", t_loopInThisThread);
- }
复制代码 每个EventLoop对象都唯一绑定了一个线程,这个线程实在就在一直执行这个函数内里的while循环。调用Poller::poll方法获取事件监听器上的监听结果。接下来在loop内里就会调用监听结果中每一个Channel的处理函数HandlerEvent( )。每一个Channel的处理函数会根据Channel类中封装的实际发生的事件,执行Channel类中封装的各事件处理函数。
4. 其余主要类的介绍
- Acceptor:接受新用户连接并分发连接给SubReactor(SubEventLoop)
- acceptSocket_:这个是服务器监听套接字的文件形貌符
- acceptChannel_:这是个Channel类,把acceptSocket_及其感兴趣事件和事件对应的处理函数都封装进去。
- EventLoop *loop:监听套接字的fd由哪个EventLoop负责循环监听以及处理相应事件,实在这个EventLoop就是main EventLoop。
- newConnectionCallback_: TcpServer构造函数中将TcpServer::newConnection( )函数注册给了这个成员变量。这个TcpServer::newConnection函数的功能是公平的选择一个subEventLoop,并把已经接受的连接分发给这个subEventLoop。
- listen( ):该函数底层调用了linux的函数listen( ),开启对acceptSocket_的监听同时将acceptChannel及其感兴趣事件(可读事件)注册到main EventLoop的事件监听器上。换言之就是让main EventLoop事件监听器去监听acceptSocket_
- handleRead( ):这是一个私有成员方法,这个方法是要注册到acceptChannel_上的, 同时handleRead( )方法内部还调用了成员变量newConnectionCallback_保存的函数。当main EventLoop监听到acceptChannel_上发生了可读事件时(新用户连接事件),就是调用这个handleRead( )方法。
简单来说,Acceptor利用listen()将新连接注册到main EventLoop上。当main EventLoop监听到新连接事件时,调用handleRead()方法,以负载均衡的选择方式选择一个sub EventLoop,并把这个新连接分发到这个subEventLoop上。
- Buffer类(读写配合,缓冲区内部调整以及动态扩容)
封装了一个用户缓冲区,以及向这个缓冲区写数据读数据等一系列控制方法。
这个Buffer类是vector<char>(方便动态扩容),对外表现出std::queue的特性,它的内部原理大概就是下图如许子的,用两个游标(readerIndex_和writerIndex_)标记可读缓冲区的起始位置和空闲空间的起始位置。
此中需要关注的一个思想就是,随着写入数据和读入数据,蓝色的空闲空间会越来越少,prependable空间会越来越大,当什么时候空用空间耗尽了,就会向步骤4一样,把全部数据拷贝前移,重新调整。
另外当整个缓冲区的prependable空间和蓝色的空闲空间都无法装下新来的数据时,那就会调用vector的resize,实现扩容机制。
- append(const char* data, size_t len):将data数据添加到缓冲区中。
- retrieveAsString(size_t len); :获取缓冲区中长度为len的数据,并以strnig返回。
- retrieveAllString();获取缓冲区全部数据,并以string返回。
- ensureWritableByts(size_t len);当你计划向缓冲区写入长度为len的数据之前,先调用这个函数,这个函数会查抄你的缓冲区可写空间能不能装下长度为len的数据,如果不能,就动态扩容。
下面两个方法主要是封装了调用了上面几个方法:
- ssize_t Buffer::readFd(int fd, int* saveErrno);:客户端发来数据,readFd从该TCP吸收缓冲区中将数据读出来并放到Buffer中。
- ssize_t Buffer::writeFd(int fd, int* saveErrno);:服务端要向这条TCP连接发送数据,通过该方法将Buffer中的数据拷贝到TCP发送缓冲区中。
TcpConnection类和Acceptor类是兄弟关系,Acceptor用于main EventLoop中,对服务器监听套接字fd及其相关方法举行封装(监听、接受连接、分发连接给SubEventLoop等),TcpConnection用于SubEventLoop中,对连接套接字fd及其相关方法举行封装(读消息事件、发送消息事件、连接关闭事件、错误事件等)。
- socket_:用于保存已连接套接字文件形貌符。
- channel_:封装了上面的socket_及其各类事件的处理函数(读、写、错误、关闭等事件处理函数)。这个Channel种保存的各类事件的处理函数是在TcpConnection对象构造函数中注册的。
- loop_:这是一个EventLoop*范例,该Tcp连接的Channel注册到了哪一个sub EventLoop上。这个loop_就是那一个sub EventLoop。
- inputBuffer_:这是一个Buffer类,是该TCP连接对应的用户吸收缓冲区。
- outputBuffer_:也是一个Buffer类,不过是用于暂存那些暂时发送不出去的待发送数据。因为Tcp发送缓冲区是有大小限制的,假如到达了高水位线,就没办法把发送的数据通过send()直接拷贝到Tcp发送缓冲区,而是暂存在这个outputBuffer_中,等TCP发送缓冲区有空间了,触发可写事件了,再把outputBuffer_中的数据拷贝到Tcp发送缓冲区中。
- state_:这个成员变量标识了当前TCP连接的状态(Connected、Connecting、Disconnecting、Disconnected)
- connetionCallback_、messageCallback_、writeCompleteCallback_、closeCallback_ : 用户会自定义 [连接建立/关闭后的处理函数] 、[收到消息后的处理函数]、[消息发送完后的处理函数]以及Muduo库中定义的[连接关闭后的处理函数]。这四个函数都会分别注册给这四个成员变量保存。
handleRead()、handleWrite()、handleClose()、handleError():
这四个函数都是私有成员方法,在一个已经建立好的Tcp连接上主要会发生四类事件:可读事件、可写事件、连接关闭事件、错误事件。当事件监听器监听到一个连接发生了以上的事件,那么就会在EventLoop中调用这些事件对应的处理函数。
- handleRead()负责处理Tcp连接的可读事件,它会将客户端发送来的数据拷贝到用户缓冲区中(inputBuffer_),然后再调用connectionCallback_保存的 [连接建立后的处理函数]。
- handleWrite( )负责处理Tcp连接的可写事件。这个函数的环境有些复杂,留到下一篇解说。
- handleClose( )负责处理Tcp连接关闭的事件。大概的处理逻辑就是将这个TcpConnection对象中的channel_从事件监听器中移除。然后调用connectionCallback_和closeCallback_保存的回调函数。这closeCallback_中保存的函数是由Muduo库提供的,connectionCallback_保存的回调函数则由用户提供的(无关紧急实在)
5.1.3 Muduo库学习与实践
5.1.3.1 用Muduo库搭一个最简单的Echo服务器
- Main EventLoop 负责循环监听连接事件
- TCPserver 负责处理连接后的IO事件,TCPsever中定义线程池(sub EventLoop)以及处理函数(回调)
- /*利用Muduo库实现一个简单的Eco服务器*/
- /*
- 头文件包含:
- muduo/net
- muduo/base
- 1、创建main Eventloop,循环事件监听器,负责循环监听新用户的连接情况
- 2、设置监听的地址和端口
- 3、创建sub Eventloop,封装为EchoServer类,包含Tcpserver和loop
- 4、向TcpServer注册各类事件的用户自定义的处理函数: setMessageCallback()、setConnectionCallback();并在private进行实现
- 5、创建EchoServer 对象,启动TcpServer
- 6、main Eventloop 执行loop();
- 工作流程:
- 主线程的EventLoop:
- 创建并运行主EventLoop。
- 监听新连接。
- 线程池中的EventLoop:
- TcpServer根据设置的线程数创建多个线程,每个线程运行一个EventLoop。
- 当有新连接到来时,主EventLoop会将连接分配给线程池中的某个EventLoop。
- 处理连接:
- 分配到线程池中的EventLoop会处理该连接的所有I/O事件(如读写数据)。
- */
- #include<iostream>
- using namespace std;
- #include<muduo/base/Logging.h>
- #include<muduo/net/EventLoop.h>
- #include<muduo/net/TcpServer.h>
- using namespace muduo;
- using namespace muduo::net;
- #include<functional>
- // #include<sting>
- class EchoServer{
- public:
- //构造函数
- EchoServer(EventLoop *loop,
- const InetAddress &addr,
- const string &name)
- : server_(loop, addr, name),
- loop_(loop)
- {
- //设置用户处理函数的回调函数,当服务器检测到新连接并调用回调函数时,会将参数传递给成员函数onConnection并调用
- //利用bind创建一个绑定对象作为回调函数的参数,
- //第一个参数是成员函数的指针,也就是要回调的函数,第二个参数为当前类的实例对象,也就是调用哪个对象的成员函数
- //第三个参数为占位符,表示将来调用回调函数时提供第一个参数,而不是在绑定时提供。换句话讲,当调用回调函数时提供TcpConnectionPtr &conn,进而作为onConnection的参数,而不是绑定时提供
- //下面这个类似于当调用回调函数时,执行this->onConnection(conn)
- server_.setConnectionCallback(bind(&EchoServer::onConnection,this,placeholders::_1));
- //将用户定义的可读事件处理函数注册进TcpServer中,TcpServer发生可读事件时会执行onMessage函数。
- //占位符的简单使用:回调的函数有几个参数就写几个占位符
- server_.setMessageCallback(bind(&EchoServer::onMessage, this,
- placeholders::_1, std::placeholders::_2, std::placeholders::_3));
- //设置sub reactor线程数,有几个线程就有几个subreactor,每个线程有一个独立的EventLoop。
- server_.setThreadNum(3);
- }
- void strat(){
- server_.start();
- }
- private:
- //用户自定义的处理函数
- //用户连接处理函数,当服务端接收到新连接建立请求,则打印Connection 对象的地址和端口,如果是关闭连接请求,则打印Connection Down
- //TcpConnectionPtr是一个共享所有权的智能指针,可以在多个地方共享同一个 TcpConnection 对象的所有权
- // TcpConnection 是一个类或结构体,表示一个TCP连接。它封装了与TCP连接相关的操作和状态,例如连接的建立、断开、数据传输等
- void onConnection(const TcpConnectionPtr &conn){
- if (conn->connected())
- {
- //使用 LOG_INFO 宏记录一条信息,内容是连接的对端地址和端口
- LOG_INFO << "Connection from " << conn->peerAddress().toIpPort();
- }
- else
- {
- LOG_INFO << "Connection disconnected " << conn->peerAddress().toIpPort();
- }
- }
- //用户可读事件处理函数:当一个TCP发生了可读事件,就把接收到的消息原封不动传回去
- void onMessage(const TcpConnectionPtr &conn,
- Buffer *buf,
- Timestamp time)
- {
- //从缓冲区检索所有接收到的数据
- string msg=buf->retrieveAllAsString();
- conn->send(msg);
- conn->shutdown();
- }
- //创建服务器对象
- TcpServer server_;
- //创建sub EventLoop
- EventLoop* loop_;
- };
- int main(){
- //定义main eventloop
- EventLoop loop;
- //封装socketaddr_in,设置监听地址和端口号,端口号为2007,ip地址为默认的0.0.0.0(INADDR_ANY,即服务器可以连接所有可用的网络接口)
- InetAddress addr(2007);
- EchoServer server(&loop,addr,"echoserver-01");
- server.strat();
- loop.loop();
- return 0;
- }
复制代码 测试结果:


5.1.3.2 Muduo连接建立思想
- TcpServer::TcpServer( ):实例化Acceptor对象
当创建Tcpserver对象时,即执行代码TcpServer server(&loop, listenAddr);调用构造函数实例化Acceptor对象,并向这个Acceptor对象注册一个回调函数TcpServer::newConnection()。
- Acceptor::Acceptor( ):实例化Channel对象
当在TcpServer构造函数实例化Acceptor对象时,Acceptor的构造函数中实例化了一个Channel对象,即acceptChannel_,该Channel对象封装了服务器监听套接字文件形貌符(尚未注册到main EventLoop的事件监听器上)。
接着Acceptor构造函数将Acceptor::handleRead( )方法注册进acceptChannel_中,这也意味着,日后如果事件监听器监听到acceptChannel_发生可读事件(新用户连接),将会调用AcceptorC::handleRead( )函数。
至此,TcpServer对象创建完毕,用户调用TcpServer::start( )方法,开启TcpServer。我们来直接看一下TcpServer::start( )方法都干了什么。
- /**** TcpServer.cc *******/
- void TcpServer::start() //开启服务器监听
- {
- 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略
- loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
- //让这个EventLoop,也就是mainloop来执行Acceptor的listen函数,开启服务端监听
- 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略
- }
- /***** Acceptor.cc ********/
- void Acceptor::listen()
- {
- listenning_ = true;
- acceptSocket_.listen();
- acceptChannel_.enableReading();
- }
复制代码 实在就是将实在主要就是调用Acceptor::listen( )函数(底层是调用了linux的函数listen( ))监听服务器套接字,以及将acceptChannel_注册到main EventLoop的事件监听器上监听它的可读事件(新用户连接事件)
接着用户调用loop.loop( ),即调用了EventLoop::loop( )函数,该函数就会循环的获取事件监听器的监听结果,并且根据监听结果调用注册在事件监听器上的Channel对象的事件处理函数。
- Acceptor::handleRead( ):处理新用户连接,调用accept()
当步调如果执行到了这个函数内里,分析acceptChannel_发生可读事件,步调处理新客户连接请求。
该函数起首调用了Linux的函数accept( )接受新客户连接。
接着调用了TcpServer::newConnection( )函数,这个函数是在步骤1中注册给Acceptor并由成员变量newConnectionCallback_保存。
- TcpServer::newConnection( ):将建立的连接公平分发到sub EventLoop
该函数的主要功能就是将建立好的连接举行封装(封装成TcpConnection对象),并使用选择算法公平的选择一个sub EventLoop,并调用TcpConnection::connectEstablished( )将TcpConnection::channel_注册到刚刚选择的sub EventLoop上。
- /******** Callbacks.h ********/
- using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
- /******** TcpServer.cc ********/
- void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
- {
- 代码省略
- TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
- connections_[connName] = conn;
- 代码省略
- ioLoop->runInLoop(bind(&TcpConnection::connectEstablished, conn));
- }
复制代码 在TcpServer::newConnection()函数中,当接受了一个新用户连接,就要把这个Tcp连接封装成一个TcpConnection对象,也就是上面代码中的new TcpConnection(...)。然后用一个共享型智能指针来管理这个对象。
这里使用智能指针管理TcpConnetion的最重要缘故原由在于防止指针悬空,而指针悬空可能会来自以下这两个方面:
a. TcpConnection会和用户直接交互,用户可能会误删除。
在我们编写Echo服务器的时候,我们用户可以自定义连接事件发生后的处理函数(如下所示),并将这个函数注册到TcpServer中。
- /**** 用户自定义的连接事件发生后的处理函数 *****/
- void onConnection(const TcpConnectionPtr &conn)
- {
- .......
- }
复制代码 假如这里的onConnection函数传入的是TcpConnection而不是TcpConnectionPtr,用户在onConnection函数中把TcpConnection对象给Delete了怎么办?删除了之后,步调内部还要好几处地方都在使用TcpConnection对象。结果这个对象的内存忽然消失了,服务器访问非法内存崩溃。虽然这一系列连锁反应会让人以为用户很笨。但是作为计划者的我们必须要包管,编程计划不可以依靠用户行为,肯定要尽可能地封死用户的误操纵。以是这里用了共享智能指针。
b. TcpConnection对象的多线程安全问题:
假如服务器要关闭了这个时候MainEventLoop线程中的TcpServer::~TcpServer()函数开始把全部TcpConnection对象都删掉。那么其他线程还在使用这个TcpConnection对象,如果你把它的内存空间都释放了,其他线程访问了非法内存,会直接崩溃。你可能会以为,反正我都要把服务器给关了,崩就崩了吧。这种想法是错的!因为可能在你关闭服务器的时候,其他线程正在处理TcpConnection的发送消息任务,这个时候你应该等它发完才释放TcpConnection对象的内存才对!
总结:muduo连接建立:TcpServer( )实例化Acceptor对象,Acceptor( )实例化Channel对象调用listen(),handleRead( ):处理新用户连接,调用accept(),调用TcpServer::newConnection()函数把新用户Tcp连接封装成TcpConnection对象,分发给subEventloop
5.1.3.3 Muduo消息读取逻辑
如上图所示,SubEventLoop中的EventLoop::loop()函数内部会循环的执行上图中的步骤1和步骤2。步骤1就是调用Poller::poll()方法获取事件监听结果,这个事件监听结果是一个Channel集合,每一个Channel封装着 [一个fd] 及 [fd感兴趣的事件] 和 [事件监听器监听到该fd实际发生的事件]。步骤2就是调用每一个Channel的Channel::HandlerEvent方法。该方法会根据每一个Channel的感兴趣事件以及实际发生的事件调用提前注册在Channel内的对应的事件处理函数(readCallback_、writeCallback_、closeCallback_、errorCallback_)。
readCallback_保存的函数实在是TcpConnection::handleRead( ),消息读取的处理逻辑也就是由这个函数提供的,我们稍微分析一下这个函数:
- void TcpConnection::handleRead(TimeStamp receiveTime)
- {
- int savedErrno = 0;
- ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
- if(n > 0) //从fd读到了数据,并且放在了inputBuffer_上
- {
- messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
- }
- else if(n == 0)
- handleClose();
- else
- {
- errno = savedErrno;
- LOG_ERROR("TcpConnection::handleRead");
- handleError();
- }
- }
复制代码 TcpConnection::handleRead( )函数起首调用Buffer_.readFd(channel_->fd(), &saveErrno),该函数底层调用Linux的函数readv( ),将Tcp吸收缓冲区数据拷贝到用户定义的缓冲区中(inputBuffer_)。如果在读取拷贝的过程中发生了什么错误,这个错误信息就会保存在savedErrno中。
当readFd( )返回值大于0,分析从吸收缓冲区中读取到了数据,那么会接着调用messageCallback_中保存的用户自定义的读取消息后的处理函数。
readFd( )返回值等于0,分析客户端连接关闭,这时候应该调用TcpConnection::handleClose( )来处理连接关闭事件
readFd( )返回值等于-1,分析发生了错误,调用TcpConnection::handleError( )来处理savedErrno的错误事件。Moduo库只支持LT模式,以是读事件不会出现EAGAIN的错误,以是一旦出现错误,分析肯定是比较不好的非正常错误了。而EAGAIN错误只不过是非阻塞IO调用时的一种常见错误而已。
补充:epoll LT 模式和 ET 模式
与 poll 的事件宏相比,epoll 新增了一个事件宏 EPOLLET,这就是所谓的边沿触发模式(Edge Trigger,ET),而默认的模式我们称为 水平触发模式(Level Trigger,LT)。这两种模式的区别在于:
- 对于水平触发模式,一个事件只要有,就会一直触发;
- 对于边沿触发模式,只有一个事件从无到有才会触发。
特性总结:
- LT 模式下,读事件触发后,可以按需收取想要的字节数,不用把本次吸收到的数据收取干净(即不用循环到 recv 或者 read 函数返回 -1,错误码为 EWOULDBLOCK 或 EAGAIN);ET 模式下,读事件必须把数据收取干净,因为你不肯定有下一次机会再收取数据了,纵然有机会,也可能存在上次没读完的数据没有及时处理,造成客户端相应耽误。
- LT 模式下,不需要写事件肯定要及时移除,制止不须要的触发,浪费 CPU 资源;ET 模式下,写事件触发后,如果还需要下一次的写事件触发来驱动任务(例如发上次剩余的数据),你需要继续注册一次检测可写事件。
- LT 模式和 ET 模式各有优缺点,无所谓孰优孰劣。使用 LT 模式,我们可以自由决定每次收取多少字节(对于平凡 socket)或何时吸收连接(对于侦听 socket),但是可能会导致多次触发;使用 ET 模式,我们必须每次都要将数据收完(对于平凡 socket)或必须理解调用 accept 吸收连接(对于侦听socket),其优点是触发次数少。
原文链接:epoll LT 模式和 ET 模式详解(文末赠书)-腾讯云开辟者社区-腾讯云
Buffer::readFd()函数分析:
分析这个函数是因为这个函数的计划有可取之处。这个readFd巧妙的计划,可以让用户一次性把全部TCP吸收缓冲区的全部数据全部都读出来并放到用户自定义的缓冲区Buffer中。
用户自定义缓冲区Buffer是有大小限制的,我们一开始不知道TCP吸收缓冲区中的数据量有多少,如果一次性读出来会不会导致Buffer装不下而溢出。以是在readFd( )函数中会在栈上创建一个临时空间extrabuf,然后使用readv的分散读特性,将TCP缓冲区中的数据先拷贝到Buffer中,如果Buffer容量不敷,就把剩余的数据都拷贝到extrabuf中,然后再调整Buffer的容量(动态扩容),再把extrabuf的数据拷贝到Buffer中。当这个函数结束后,extrabuf也会被释放。另外extrabuf是在栈上开辟的空间,速度比在堆上开辟还要快。
- ssize_t Buffer::readFd(int fd, int* saveErrno)
- {
- char extrabuf[65536] = {0}; //栈上的内存空间
- struct iovec vec[2];
- const size_t writableSpace = writableBytes(); //可写缓冲区的大小
- vec[0].iov_base = begin() + writerIndex_; //第一块缓冲区
- vec[0].iov_len = writableSpace; //当我们用readv从socket缓冲区读数据,首先会先填满这个vec[0],也就是我们的Buffer缓冲区
- vec[1].iov_base = extrabuf; //第二块缓冲区,如果Buffer缓冲区都填满了,那就填到我们临时创建的
- vec[1].iov_len = sizeof(extrabuf); //栈空间上。
- const int iovcnt = (writableSpace < sizeof(extrabuf) ? 2 : 1);
- const ssize_t n = ::readv(fd, vec, iovcnt);
- if(n < 0){
- *saveErrno = errno; //出错了!!
- }
- else if(n <= writableSpace){ //说明Buffer空间足够存了
- writerIndex_ += n; //
- }
- else{ //Buffer空间不够存,需要把溢出的部分(extrabuf)倒到Buffer中(会先触发扩容机制)
- writerIndex_ = buffer_.size();
- append(extrabuf, n-writableSpace);
- }
- return n;
- }
复制代码 5.1.3.4 Muduo消息发送逻辑
当用户调用了TcpConnetion::send(buf)函数时,相当于要求muduo库把数据buf发送给该Tcp连接的客户端。此时该TcpConnection注册在事件监听器上的感兴趣事件中是没有可写事件的。TcpConnection::send(buf)函数内部实在是调用了Linux的函数write( )(别说你不知道write( )是啥)
如果TCP发送缓冲区能一次性容纳buf,那这个write( )函数将buf全部拷贝到发送缓冲区中。
如果TCP发送缓冲区内不能一次性容纳buf:
- 这时候write( )函数buf数据尽可能地拷贝到TCP发送缓冲区中,并且将errno设置为EWOULDBLOCK。
- 剩余未拷贝到TCP发送缓冲区中的buf数据会被存放在TcpConnection:
utputBuffer_中。并且向事件监听器上注册该TcpConnection::channel_的可写事件。
- 事件监听器监听到该Tcp连接可写事件,就会调用TcpConnection::handleWrite( )函数把TcpConnection:
utputBuffer_中剩余的数据发送出去。
- 在TcpConnection::handleWrite( )函数中,通过调用Buffer::writeFd()函数将outputBuffer_的数据写入到Tcp发送缓冲区,如果Tcp发送缓冲区能容纳全部剩余的未发送数据,那最好不过了。如果Tcp发送缓冲区依旧没法容纳剩余的未发送数据,那就尽可能地将数据拷贝到Tcp发送缓冲区中,继续保持可写事件的监听。
- 当数据全部拷贝到Tcp发送缓冲区之后,就会调用用户自定义的【写完后的事件处理函数】,并且移除该TcpConnection在事件监听器上的可写事件。(移除可写事件是为了进步服从,不会让epoll_wait() 毫无意义的频繁触发可写事件。因为大多数时候是没有数据需要发送的,频繁触发可写事件但又没有数据可写。)
5.1.3.5 Muduo连接断开逻辑
5.1.3.5.1 连接被动断开
服务端TcpConnection::handleRead()中感知到客户端把连接断开了。
TcpConnection::handleRead( )函数内部调用了Linux的函数readv( ),当readv( )返回0的时候,服务端就知道客户端断开连接了。然后就接着调用TcpConnection::handleClose( )。
上图中的标号1、2、3是函数调用顺序,我们可以看到:
- 在执行TcpConnection::handle_Close()的时候,该函数照旧在SubEventLoop线程中运行的,接着调用closeCallback_(connPtr)回调函数,该函数保存的实在是TcpServer::removeConnection( )函数
- TcpServer::removeConnection( )函数调用了remvoveConnectionInLoop( )函数,该函数的运行是在MainEventLoop线程中执行的,这里涉及到线程切换技能,反面再讲。
- removeConnectionInLoop( )函数:TcpServer对象中有一个connections_成员变量,这是一个unordered_map,负责保存【string --> TcpConnection】的映射,实在就是保存着Tcp连接的名字到TcpConnection对象的映射。因为这个Tcp连接要关闭了,以是也要把这个TcpConnection对象从connections_中删掉。然后再调用TcpConnection::connectDestroyed函数。
另外为什么removeConnectionInLoop()要在MainEventLoop中运行,因为该函数主要是从TcpServer对象中删除某条数据。而TcpServer对象是属于MainEventLoop的。这也是贯彻了One Loop Per Thread的理念。
- TcpConnection::connectDestroyed( )函数的执行是又跳回到了subEventLoop线程中。该函数就是将Tcp连接的监听形貌符从事件监听器中移除。另外SubEventLoop中的Poller类对象还保存着这条Tcp连接的channel_,以是调用channel_.remove( )将这个Tcp连接的channel对象从Poller内的数据结构中删除。
5.1.3.5.2 服务器主动关闭导致连接断开
当服务器主动关闭时,调用TcpServer::~TcpServer()析构函数。
这里在提示一下EventLoop::runInLoop()函数的意义,假如你有一个EventLoop对象 loop_,当你调用了loop_->runInLoop(function)函数时,这个function函数的执行会在这个loop_绑定的线程上运行!
以是我们画了下面这幅图,在创建TcpConnection对象时,Acceptor都要将这个对象分发给一个SubEventLoop来管理。这个TcpConnection对象的一切函数执行都要在其管理的SubEventLoop线程中运行。再一次贯彻One Loop Per Thread的计划模式。比如要想彻底删除一个TcpConnection对象,就必须要调用这个对象的connecDestroyed()方法,这个方法执行完后才气释放这个对象的堆内存。每个TcpConnection对象的connectDestroyed()方法都必须在这个TcpConnection对象所属的SubEventLoop绑定的线程中执行。
全部上面的TcpServer::~TcpServer()函数就是干这事儿的,不断循环的让这个TcpConnection对象所属的SubEventLoop线程执行TcpConnection::connectDestroyed()函数,同时在MainEventLoop的TcpServer::~TcpServer()函数中调用item.second.reset()释放保管TcpConnection对象的共享智能指针,以到达释放TcpConnection对象的堆内存空间的目的。
但是这内里实在有一个问题需要解决,TcpConnection::connectDestroyed()函数的执行以及这个TcpConnection对象的堆内存释放操纵不在同一个线程中运行,以是要思量怎么包管一个TcpConnectino对象的堆内存释放操纵是在TcpConnection::connectDestroyed()调用完后。
这个析构函数巧妙利用了共享智能指针的特点,当没有共享智能指针指向这个TcpConnection对象时(引用计数为0),这个TcpConnection对象就会被析构删除(堆内存释放)。
- TcpServer::~TcpServer()
- {
- //connections类型为std::unordered_map<std::string, TcpConnectionPtr>;
- for(auto &item : connections_)
- {
- //创建一个临时的 TcpConnectionPtr 智能指针:
- //item.second 是 std::shared_ptr<TcpConnection>,它持有一个 TcpConnection 对象。
- //创建了一个新的 shared_ptr(即 conn)与 item.second 共享同一个 TcpConnection 对象。此时,TcpConnection 的引用计数加1。
- TcpConnectionPtr conn(item.second);
- //调用 reset 方法将 item.second 所持有的 TcpConnection 对象释放。
- //由于 conn 还持有该对象,TcpConnection 的引用计数减1,但不会被销毁。
- item.second.reset();
- /*
- conn->getLoop() 返回 TcpConnection 所属的事件循环对象。
- 调用 runInLoop 方法将一个任务(bind(&TcpConnection::connectDestroyed, conn))添加到事件循环中。
- 任务的逻辑是调用 TcpConnection::connectDestroyed,通知连接对象销毁。
- 此时,conn 依然持有连接对象,确保对象不会在事件循环中使用时被销毁。
- 每次循环结束时,conn 超出其作用域被销毁,TcpConnection 对象的引用计数减1。
- 如果没有其他地方再持有该对象(引用计数为0),则 TcpConnection 对象会被销毁。
- */
- conn->getLoop()->runInLoop(bind(&TcpConnection::connectDestroyed, conn));
- }
- }
复制代码
- 起首TcpServer::connections_是一个unordered_map<string, TcpConnectionPtr>,此中TcpConnectionPtr的寄义是指向TcpConnection的shared_ptr。
- 在一开始,每一个TcpConnection对象都被一个共享智能指针TcpConnetionPtr持有,当执行了TcpConnectionPtr conn(item.second)时,这个TcpConnetion对象就被conn和这个item.second共同持有,但是这个conn的生存周期很短,只要离开了当前的这一次for循环,conn就会被释放。
- 紧接着调用item.second.reset()释放掉TcpServer中保存的该TcpConnectino对象的智能指针。此时在当前环境下,只剩下conn还持有这个TcpConnection对象,因此当前TcpConnection对象还不会被析构。
- 接着调用了conn->getLoop()->runInLoop(bind(&TcpConnection::connectDestroyed, conn));
这句话的寄义是让SubEventLoop线程去执行TcpConnection::connectDestroyed()函数。当你把这个conn的成员函数传进去的时候,conn所指向的资源的引用计数会加1。因为传给runInLoop的不但有函数,另有这个函数所属的对象conn。
- SubEventLoop线程开始运行TcpConnection::connectDestroyed()
- MainEventLoop线程当前这一轮for循环跑完,共享智能指针conn离开代码块,因此被析构,但是TcpConnection对象还不会被释放,因为另有一个共享智能指针指向这个TcpConnection对象,而且这个智能指针在TcpConnection::connectDestroyed()中,只不过这个智能指针你看不到,它在这个函数中是一个隐式的this的存在。当这个函数执行完后,智能指针就真的被释放了。到此,就没有任何智能指针指向这个TcpConnection对象了。TcpConnection对象就彻底被析构删除了。
- 如果TcpConnection中有正在发送的数据,怎么包管在触发TcpConnection关闭机制后,能先让TcpConnection先把数据发送完再释放TcpConnection对象的资源?
- /***** TcpConnection.cc *****/
- void TcpConnection::connectEstablished()
- {
- setState(kConnected);
- channel_->tie(shared_from_this());
- channel_->enableReading(); //向poller注册channel的epollin事件
- //新连接建立,执行回调
- connectionCallback_(shared_from_this());
- }
复制代码 我们先相识一下shared_from_this()是什么意思,起首TcpConnection类继续了一个类,继续了这个类之后才气使用。
- class TcpConnection :public std::enable_shared_from_this<TcpConnection>
复制代码 假如我们在TcpConnection对象(我们管这个对象叫TCA)中的成员函数中调用了shared_from_this(),该函数可以返回一个shared_ptr,并且这个shared_ptr指向的对象是TCA。
接着这个shared_ptr就作为channel_的Channel::tie()函数的函数参数。
- /***** Channel.h ******/
- std::weak_ptr<void> tie_;
- /***** Channel.cc ******/
- void Channel::tie(const shared_ptr<void>& obj)
- {
- tie_ = obj;
- tied_ = true;
- }
- void Channel::HandlerEvent(TimeStamp receiveTime)
- {
- if(tied_){
- shared_ptr<void> guard = tie_.lock();
- if (guard)
- HandleEventWithGuard(receiveTime);
- }
- else{
- 。。。。一般不会执行到这里其实。我实在想不到正常运行的情况下怎么会执行到这里,可能是我比较菜。
- HandleEventWithGuard(receiveTime);
- }
- }
复制代码 当事件监听器返回监听结果,就要对每一个发生事件的channel对象调用他们的HandlerEvent()函数。在这个HandlerEvent函数中,会先把tie_这个weak_ptr提升为强共享智能指针。这个强共享智能指针会指向当前的TcpConnection对象。就算你表面调用删除析构了其他全部的指向该TcpConnection对象的智能指针。你只要HandleEventWithGuard()函数没执行完,你这个TcpConnetion对象都不会被析构释放堆内存。而HandleEventWithGuard()函数内里就有负责处理消息发送事件的逻辑。当HandleEventWithGuard()函数调用完毕,这个guard智能指针就会被释放。
5.1.3.6 线程思想-One Loop Per Thread
One Loop Per Thread的寄义就是,一个EventLoop和一个线程唯一绑定,和这个EventLoop有关的,被这个EventLoop管辖的一切操纵都必须在这个EventLoop绑定线程中执行,比如在MainEventLoop中,负责新连接建立的操纵都要在MainEventLoop线程中运行。已建立的连接分发到某个SubEventLoop上,这个已建立连接的任何操纵,比如吸收数据发送数据,连接断开等事件处理都必须在这个SubEventLoop线程上运行,还不准跑到别的SubEventLoop线程上运行。
- #include <sys/eventfd.h>
- int eventfd(unsigned int initval, int flags);
复制代码 调用函数eventfd()会创建一个eventfd对象,或者也可以理解打开一个eventfd范例的文件,类似平凡文件的open操纵。eventfd的在内核空间维护一个无符号64位整型计数器, 初始化为initval的值。
- EFD_CLOEXEC(2.6.27~) : eventfd()返回一个文件形貌符,如果该进程被fork的时候,这个文件形貌符也会被复制过去,这个时候就会有多个形貌符指向同一个eventfd对象,如果设置了这个标记,则子进程在执行exec的时候,会主动扫撤除父进程的这个文件形貌符。
- EFD_NONBLOCK(2.6.27~):文件形貌符会被设置为O_NONBLOCK,如果没有设置这个标记位,read操纵的时候将会阻塞直到计数器中有值,如果设置了这个这个标记位,计数器没有值得时候也会立刻返回-1。
- EFD_SEMAPHORE(2.6.30~): 这个标记位会影响read操纵。反面讲。
- 如果写入的值和小于0xFFFFFFFFFFFFFFFE则写入乐成
- 如果大于0xFFFFFFFFFFFFFFFE
- 如果设置了EFD_NONBLOCK标记位就直接返回-1
- 如果没有设置EFD_NONBLOCK标记位就会一直阻塞到read操纵执行。
- 如果设置了EFD_SEMAPHORE标记位,则返回1,且计数器的值减去1
- 如果没有设置EFD_SEMAPHORE标记位,则返回计数器中的值,并且设置计数器值为0.
- 设置了EFD_NONBLOCK标记位就直接返回-1.
- 没有设置EFD_NONBLOCK标记位就会一直阻塞直到计数器中的值大于0.
- 如何包管一个EventLoop对象和一个线程唯一绑定(该线程只能绑定一个EventLoop对象,该EventLoop对象也必须绑定一个线程)
下图是EventLoop构造函数(我把不相关的代码全删了。以是看上去可能会有一点点光秃秃)
- /***** EventLoop.cc *****/
- __thread EventLoop *t_loopInThisThread = nullptr;
- EventLoop::EventLoop() :
- wakeupFd_(createEventfd()), //生成一个eventfd,每个EventLoop对象,都会有自己的eventfd
- wakeupChannel_(new Channel(this, wakeupFd_))
- {
- LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);
- if(t_loopInThisThread) //如果当前线程已经绑定了某个EventLoop对象了,那么该线程就无法创建新的EventLoop对象了
- LOG_FATAL("Another EventLoop %p exits in this thread %d \n", t_loopInThisThread, threadId_);
- else
- t_loopInThisThread = this;
- wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
- wakeupChannel_->enableReading();
- }
复制代码 介绍一下这个__thread,这个__thread是一个关键字,被这个关键字修饰的全局变量 t_loopInThisThread会具备一个属性,那就是该变量在每一个线程内都会有一个独立的实体。因为一样平常的全局变量都是被同一个进程中的多个线程所共享,但是这里我们不希望如许。
在EventLoop对象的构造函数中,如果当前线程没有绑定EventLoop对象,那么t_loopInThisThread为nullptr,然后就让该指针变量指向EventLoop对象的地址。如果t_loopInThisThread不为nullptr,分析当前线程已经绑定了一个EventLoop对象了,这时候EventLoop对象构造失败!
- Muduo库如何实现每个EventLoop线程只运行从属于该EventLoop的操纵?
EventLoop构造函数的初始化列表中,如下所示:
- int createEventfd()
- {
- int evtfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
- if(evtfd < 0) //eventfd创建失败,一般不会失败,除非一个进程把文件描述符(Linux一个进程1024个最多)全用光了。
- LOG_FATAL("eventfd error: %d \n", errno);
-
- return evtfd;
- }
- EventLoop::EventLoop()
- : wakeupFd_(createEventfd()), //生成一个eventfd,每个EventLoop对象,都会有自己的eventfd
- ...
- {...}
复制代码 在EventLoop的初始化列表中:
- CreateEventfd()返回一个eventfd文件形貌符,并且该文件形貌符设置为非阻塞和子进程不拷贝模式。该eventfd文件形貌符赋给了EventLoop对象的成员变量wakeupFd_。
- 随即将wakeupFd_用Channel封装起来,得到wakeupChannel_。接着在EventLoop构造函数中
在EventLoop的构造函数体内:
- 先是给这个Channel注册一个读事件处理函数EventLoop::handleRead()
- 随即将这个wakeupChannel_注册到事件监听器上监听其可读事件。当事件监听器监听到wakeupChannel_的可读事件时就会调用EventLoop::handleRead()函数。
我们来刻画一个景象,我们知道每个EventLoop线程主要就是在执行其EventLoop对象的loop函数(该函数就是一个while循环,循环的获取事件监听器的结果以及调用每一个发生事件的Channel的事件处理函数)。此时SubEventLoop上注册的Tcp连接都没有任何动静,整个SubEventLoop线程就阻塞在epoll_wait()上。
此时MainEventLoop接受了一个新连接请求,并把这个新连接封装成一个TcpConnection对象,并且希望在SubEventLoop线程中执行TcpConnection::connectEstablished()函数,因为该函数的目的是将TcpConnection注册到SubEventLoop的事件监听器上,并且调用用户自定义的连接建立后的处理函数。当该TcpConnection对象注册到SubEventLoop之后,这个TcpConnection对象的任何操纵(包括调用用户自定义的连接建立后的处理函数。)都必须要在这个SubEventLoop线程中运行,以是TcpConnection::connectEstablished()函数必须要在SubEventLoop线程中运行。
那么我们怎么在MainEventLoop线程中通知SubEventLoop线程起来执行TcpConnection::connectEstablished()函数呢?这里就要好好研究一下EventLoop::runInLoop()函数了。
- void EventLoop::runInLoop(Functor cb)
- {//该函数保证了cb这个函数对象一定是在其EventLoop线程中被调用。
- if(isInLoopThread())//如果当前调用runInLoop的线程正好是EventLoop的运行线程,则直接执行此函数
- cb();
- else//否则调用 queueInLoop 函数
- queueInLoop(cb);
- }
- void EventLoop::queueInLoop(Functor cb)
- {
- {
- unique_lock<mutex> lock(mutex_);
- pendingFunctors_.emplace_back(cb);
- }
- if(!isInLoopThread() || callingPendingFunctors_)
- wakeup();
- }
- void EventLoop::wakeup()
- {
- uint64_t one = 1;
- ssize_t n = write(wakeupFd_, &one, sizeof(one));
- if(n != sizeof(n))
- LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
- }
- void EventLoop::loop()
- { //EventLoop 所属线程执行
- looping_ = true;
- quit_ = false;
- LOG_INFO("EventLoop %p start looping \n", this);
- while(!quit_)
- {
- activeChannels_.clear();
- pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);//此时activeChannels已经填好了事件发生的channel
- for(Channel *channel : activeChannels_)
- channel->HandlerEvent(pollReturnTime_);
- doPendingFunctors(); //执行当前EventLoop事件循环需要处理的回调操作。
- }
- }
- void EventLoop::doPendingFunctors()
- {
- std::vector<Functor> functors;
- callingPendingFunctors_ = true;
- {
- unique_lock<mutex> lock(mutex_);
- functors.swap(pendingFunctors_); //这里的swap其实只是交换的vector对象指向的内存空间的指针而已。
- }
- for(const Functor &functor:functors)
- {
- functor();
- }
- callingPendingFunctors_ = false;
- }
复制代码 起首EventLoop::runInLoop函数接受一个可调用的函数对象Functor cb,如果当前cpu正在运行的线程就是该EventLoop对象绑定的线程,那么就直接执行cb函数。否则就把cb传给queueInLoop()函数。
在queueInLoop()函数中主要就是把cb这个可调用对象保存在EventLoop对象的pendingFunctors_这个数组中,我们希望这个cb能在某个EventLoop对象所绑定的线程上运行,但是由于当前cpu执行的线程不是我们等待的这个EventLoop线程,我们只能把这个可调用对象先存在这个EventLoop对象的数组成员pendingFunctors_中。
我们再把目光转移到上面代码中的EventLoop::loop()函数中。我们知道EventLoop::loop()肯定是运行在其所绑定的EventLoop线程内,在该函数内会调用doPendingFunctors()函数,这个函数就是把本身这个EventLoop对象中的pendingFunctors_数组中保存的可调用对象拿出来执行。pendingFunctors_中保存的是其他线程希望你这个EventLoop线程执行的函数。
另有一个问题,假如EventLoop A线程阻塞在EventLoop::loop()中的epoll_wait()调用上(EventLoop A上监听的文件形貌符没有任何事件发生),这时候EventLoop线程要求EventLoopA赶紧执行某个函数,那其他线程要怎么唤醒这个阻塞住的EventLoopA线程呢?这时候我们就要把目光聚焦在上面的wakeup()函数了。
wakeup()函数就是向我们想唤醒的线程所绑定的EventLoop对象持有的wakeupFd_随便写入一个8字节数据,因为wakeupFd_已经注册到了这个EventLoop中的事件监听器上,这时候事件监听器监听到有文件形貌符的事件发生,epoll_wait()阻塞结束而返回。这就相当于起到了唤醒线程的作用!你这个EventLoop对象既然阻塞在事件监听上,那我就通过wakeup()函数给你这个EventLoop对象一个事件,让你结束监听阻塞。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |