IT评测·应用市场-qidao123.com

标题: Muduo网络库学习与源码分析 [打印本页]

作者: 北冰洋以北    时间: 2025-1-23 21:51
标题: Muduo网络库学习与源码分析
目次
5 muduo网络库
5.1 muduo库核心:multi-Reator架构
5.1.1 前置知识:Reator模式
5.1.2 Multi-Reator架构
1. Channel类
2. Poller/EpollPoller类
3. EventLoop类
4. 其余主要类的介绍
5.1.3 Muduo库学习与实践
5.1.3.1 用Muduo库搭一个最简单的Echo服务器
5.1.3.2 Muduo连接建立思想
5.1.3.3 Muduo消息读取逻辑
5.1.3.4 Muduo消息发送逻辑
5.1.3.5 Muduo连接断开逻辑
5.1.3.5.1 连接被动断开
5.1.3.5.2 服务器主动关闭导致连接断开
5.1.3.6 线程思想-One Loop Per Thread
原文链接:https://zhuanlan.zhihu.com/p/495016351 
5 muduo网络库

5.1 muduo库核心:multi-Reator架构

muduo库是基于Reator模式实现的TCP网络编程库。
5.1.1 前置知识:Reator模式

反应器模式,本质是一个事件转发器,是网络模块核心中枢,负责将读写事件分发给对应的读写事件处理者。
I/O多路复用(阻塞型):用户线程请求数据时,先向内核态发起select调用,询问数据是否预备就绪,如果返回已预备就绪,调用read。相比同步阻塞IO和同步非阻塞IO一个显著的改进是将数据请求分为两步走,并且一个select可以向内核态多个channel发起轮询查询,当某些socket数据总网卡到达内核时就可以返回到用户态,完成复用。read不是立刻返回,阻塞态。

异步IO(非阻塞):当read调用同时注册回调函数,read立刻返回,当数据预备好后执行回调将数据返回,用户线程非阻塞。

select、poll、epoll:



Reactor模型的核心就是事件驱动型,即将「连接」和「业务线程」分开处理,当「连接层」有事件触发时提交给「业务线程」,制止了业务线程因「网络数据处于预备中」导致的长时间等待问题,节省线程资源。
Reactor模式主要解决的问题是连接建立、IO事件的监听和读写和分发、业务逻辑处理(计算等)。主要由两大脚色组成:


  
三种处理模式,对业务逻辑、连接建立以及IO事件处理举行优化。





5.1.2 Multi-Reator架构

基于Reactor主从多线程模式实现。Muduo库有三个核心模块实现Reactor。主要实现持续监听一组fd,并根据每个fd发生的事件调用相应的处理函数(回调函数)。三大模块分别是:Channel类、Poller/EpollPoller类以及EventLoop类。


EventLoop起到一个驱动循环的功能,Poller负责从事件监听器上获取监听结果。而Channel类则在此中起到了将fd及其相关属性封装的作用,将fd及其感兴趣事件和发生的事件以及差异事件对应的回调函数封装在一起,如许在各个模块中通报更加方便。接着EventLoop调用。


在Muduo库中,实现One Loop Per Thread ,也就是每一个EventLoop都绑定了一个线程,充份利用了多核cpu的能力,每一个核的线程负责循环监听一组文件形貌符的集合。
1. Channel类

Channel类封装了事件监听器、fd处理器、处理函数。具体来说,事件监听器定义监听fd组、fd事件集合以及实际发生的事件集合;fd处理器实现实际发生事件的注册到事件监听器或者移除;处理函数实现对fd每种事件对应的处理函数。其功能类似于epoll_ctl。


  1. //move语法:属于移动语义的应用
  2. /*
  3. std::move 是一个标准库函数,用于将其参数转换为右值引用。
  4. 右值引用是C++11引入的一种引用类型,用于标识临时对象。
  5. 这种引用允许我们安全地从一个对象转移资源到另一个对象,而不是进行深拷贝。
  6. 移动赋值通常比深拷贝更有效率,因为它避免了不必要的资源复制,直接利用已存在的资源。
  7. */
  8. //read_callback_ = std::move(cb); 表示使用 cb 的资源来设置或更新 read_callback_ 的状态。
  9. //这通常涉及资源的转移,如内存、文件句柄、套接字等,而不是复制这些资源。
  10. void setReadCallback(ReadEventCallback cb) {read_callback_ = std::move(cb);}
  11. void setWriteCallback(Eventcallback cb) {write_callback_ = std::move(cb);}
  12. void setCloseCallback(EventCallback cb) {close_callback_ = std::move(cb);}
  13. void setErrorCallback(EventCallback cb) {error_callback_ = std::move(cb);}
复制代码

  1. /*
  2. 关于语法:
  3. 位或操作(|=):
  4. 操作符 |= 是位或赋值操作符的简写。
  5. 它的作用是将左侧变量 events_ 与右侧表达式 kReadEvent 进行位或操作,并将结果存回 events_。
  6. 位或操作的特点是,对应的位中只要有一个为1,结果位就为1。这使得 |= 操作符非常适合用于添加(设置)标志位。
  7. update()私有成员方法,这个update其实本质上就是调用了epoll_ctl()。
  8. */
  9. void enableReading() {events_ |= kReadEvent; upadte();}
  10. void disableReading() {events_ &= ~kReadEvent; update();}
  11. void enableWriting() {events_ |= kWriteEvent; update();}
  12. void disableWriting() {events_ &= ~kWriteEvent; update();}
  13. void disableAll() {events_ |= kNonEvent; update();}   
复制代码

int set_revents(int revt) {revents_ = revt;}
当事件监听器监听到某个文件形貌符发生了什么事件,通过这个函数可以将这个文件形貌符实际发生的事件封装进这个Channel中。
当调用epoll_wait()后,可以得知事件监听器上哪些Channel(文件形貌符)发生了哪些事件,事件发生后自然就要调用这些Channel对应的处理函数。
Channel::HandleEvent,让每个发生了事件的Channel调用本身保管的事件处理函数。每个Channel会根据本身文件形貌符实际发生的事件(通过Channel中的revents_变量得知)和感兴趣的事件(通过Channel中的events_变量得知)来选择调用read_callback_和/或write_callback_和/或close_callback_和/或error_callback_。
2. Poller/EpollPoller类

负责监听文件形貌符事件是否触发以及返回发生事件的文件形貌符以及具体事件。相当于事件监听器的具体实现。一个Poller对象对应一个事件监听器。在multi-reactor模型中,有多少reactor就有多少Poller。
muduo提供了epoll和poll两种IO多路复用方法来实现事件监听。不过默认是使用epoll来实现,也可以通过选项选择poll。
Poller是个抽象虚类,由EpollPoller和PollPoller继续实现,与监听文件形貌符和返回监听结果的具体方法也根本上是在这两个派生类中实现。EpollPoller就是封装了用epoll方法实现的与事件监听有关的各种方法,PollPoller就是封装了poll方法实现的与事件监听有关的各种方法。
epollfd_就是用epoll_create方法返回的epoll句柄,也就是返回一个文件形貌符,代表一个epoll实例,这个文件形貌符可以用于管理和监视多个文件形貌符,以便在这些文件形貌符上发生 I/O 事件时举行高效的事件通知和处理。
channels_:这个变量是std::unordered_map<int, Channel*>范例,负责记录 文件形貌符 ---> Channel的映射,也帮助保管全部注册在你这个Poller上的Channel。
ownerLoop_:所属的EventLoop对象
TimeStamp poll(int timeoutMs, ChannelList *activeChannels)
这个函数是EpollPoller的核心。上文可知,文件形貌符fd以及其感兴趣事件和实际发生的事件封装在Channel类中。
当外部调用poll方法时,该方法底层实在是通过epoll_wait获取这个事件监听器上发生事件的fd及其对应发生的事件,通过哈希表channels_可以根据fd找到封装这个fd的Channel,然后将监听到该fd发生的事件写到Channel类的revents成员变量中。
然后把这个Channel装进activeChannels中(它是一个vector<Channel*>)。如许,当外界调用完poll之后就能拿到事件监听器的监听结果(activeChannels_这个activeChannels就是事件监听器监听到的发生事件的fd,以及每个fd都发生了什么事件。
3. EventLoop类

调用一次Poll方法可以返回事件监听器的监听结果(发生事件的fd以及发生的事件),为了网络服务器具有持续监听、获取、处理事件的能力,需要循环调用poll方法
调用Poller:poll方法获取实际发生事件的Channel集合,然后调用这些Channel内里保管的差异范例事件的处理函数(调用Channel::HandlerEvent方法)。
  1. void EventLoop::loop()
  2. { //EventLoop 所属线程执行
  3.     省略代码 省略代码 省略代码  
  4.     while(!quit_)
  5.     {
  6.         activeChannels_.clear();
  7.         pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);//此时activeChannels已经填好了事件发生的channel
  8.         for(Channel *channel : activeChannels_)
  9.             channel->HandlerEvent(pollReturnTime_);
  10.         省略代码 省略代码 省略代码
  11.     }
  12.     LOG_INFO("EventLoop %p stop looping. \n", t_loopInThisThread);
  13. }
复制代码
每个EventLoop对象都唯一绑定了一个线程,这个线程实在就在一直执行这个函数内里的while循环。调用Poller::poll方法获取事件监听器上的监听结果。接下来在loop内里就会调用监听结果中每一个Channel的处理函数HandlerEvent( )。每一个Channel的处理函数会根据Channel类中封装的实际发生的事件,执行Channel类中封装的各事件处理函数。
4. 其余主要类的介绍


简单来说,Acceptor利用listen()将新连接注册到main EventLoop上。当main EventLoop监听到新连接事件时,调用handleRead()方法,以负载均衡的选择方式选择一个sub EventLoop,并把这个新连接分发到这个subEventLoop上。


封装了一个用户缓冲区,以及向这个缓冲区写数据读数据等一系列控制方法。
这个Buffer类是vector<char>(方便动态扩容),对外表现出std::queue的特性,它的内部原理大概就是下图如许子的,用两个游标(readerIndex_和writerIndex_)标记可读缓冲区的起始位置和空闲空间的起始位置。


此中需要关注的一个思想就是,随着写入数据和读入数据,蓝色的空闲空间会越来越少,prependable空间会越来越大,当什么时候空用空间耗尽了,就会向步骤4一样,把全部数据拷贝前移,重新调整。
另外当整个缓冲区的prependable空间和蓝色的空闲空间都无法装下新来的数据时,那就会调用vector的resize,实现扩容机制。

下面两个方法主要是封装了调用了上面几个方法:

TcpConnection类和Acceptor类是兄弟关系,Acceptor用于main EventLoop中,对服务器监听套接字fd及其相关方法举行封装(监听、接受连接、分发连接给SubEventLoop等),TcpConnection用于SubEventLoop中,对连接套接字fd及其相关方法举行封装(读消息事件、发送消息事件、连接关闭事件、错误事件等)。

handleRead()、handleWrite()、handleClose()、handleError():
这四个函数都是私有成员方法,在一个已经建立好的Tcp连接上主要会发生四类事件:可读事件、可写事件、连接关闭事件、错误事件。当事件监听器监听到一个连接发生了以上的事件,那么就会在EventLoop中调用这些事件对应的处理函数。

5.1.3 Muduo库学习与实践

5.1.3.1 用Muduo库搭一个最简单的Echo服务器


  1. /*利用Muduo库实现一个简单的Eco服务器*/
  2. /*
  3. 头文件包含:
  4. muduo/net
  5. muduo/base
  6. 1、创建main Eventloop,循环事件监听器,负责循环监听新用户的连接情况
  7. 2、设置监听的地址和端口
  8. 3、创建sub Eventloop,封装为EchoServer类,包含Tcpserver和loop
  9. 4、向TcpServer注册各类事件的用户自定义的处理函数: setMessageCallback()、setConnectionCallback();并在private进行实现
  10. 5、创建EchoServer 对象,启动TcpServer
  11. 6、main Eventloop 执行loop();
  12. 工作流程:
  13. 主线程的EventLoop:
  14.     创建并运行主EventLoop。
  15.     监听新连接。
  16. 线程池中的EventLoop:
  17.     TcpServer根据设置的线程数创建多个线程,每个线程运行一个EventLoop。
  18.     当有新连接到来时,主EventLoop会将连接分配给线程池中的某个EventLoop。
  19. 处理连接:
  20.     分配到线程池中的EventLoop会处理该连接的所有I/O事件(如读写数据)。
  21. */
  22. #include<iostream>
  23. using namespace std;
  24. #include<muduo/base/Logging.h>
  25. #include<muduo/net/EventLoop.h>
  26. #include<muduo/net/TcpServer.h>
  27. using namespace muduo;
  28. using namespace muduo::net;
  29. #include<functional>
  30. // #include<sting>
  31. class EchoServer{
  32. public:
  33.     //构造函数
  34.     EchoServer(EventLoop *loop,
  35.                const InetAddress &addr,
  36.                const string &name)
  37.         : server_(loop, addr, name),
  38.           loop_(loop)
  39.     {
  40.         //设置用户处理函数的回调函数,当服务器检测到新连接并调用回调函数时,会将参数传递给成员函数onConnection并调用
  41.         //利用bind创建一个绑定对象作为回调函数的参数,
  42.         //第一个参数是成员函数的指针,也就是要回调的函数,第二个参数为当前类的实例对象,也就是调用哪个对象的成员函数
  43.         //第三个参数为占位符,表示将来调用回调函数时提供第一个参数,而不是在绑定时提供。换句话讲,当调用回调函数时提供TcpConnectionPtr &conn,进而作为onConnection的参数,而不是绑定时提供
  44.         //下面这个类似于当调用回调函数时,执行this->onConnection(conn)
  45.         server_.setConnectionCallback(bind(&EchoServer::onConnection,this,placeholders::_1));
  46.         //将用户定义的可读事件处理函数注册进TcpServer中,TcpServer发生可读事件时会执行onMessage函数。
  47.         //占位符的简单使用:回调的函数有几个参数就写几个占位符
  48.         server_.setMessageCallback(bind(&EchoServer::onMessage, this,
  49.                                         placeholders::_1, std::placeholders::_2, std::placeholders::_3));
  50.         //设置sub reactor线程数,有几个线程就有几个subreactor,每个线程有一个独立的EventLoop。
  51.         server_.setThreadNum(3);
  52.     }
  53.     void strat(){
  54.         server_.start();
  55.     }
  56. private:
  57.     //用户自定义的处理函数
  58.     //用户连接处理函数,当服务端接收到新连接建立请求,则打印Connection 对象的地址和端口,如果是关闭连接请求,则打印Connection Down
  59.     //TcpConnectionPtr是一个共享所有权的智能指针,可以在多个地方共享同一个 TcpConnection 对象的所有权
  60.     // TcpConnection 是一个类或结构体,表示一个TCP连接。它封装了与TCP连接相关的操作和状态,例如连接的建立、断开、数据传输等
  61.     void onConnection(const TcpConnectionPtr &conn){
  62.         if (conn->connected())
  63.         {
  64.             //使用 LOG_INFO 宏记录一条信息,内容是连接的对端地址和端口
  65.             LOG_INFO << "Connection from " << conn->peerAddress().toIpPort();
  66.         }
  67.         else
  68.         {
  69.             LOG_INFO << "Connection disconnected " << conn->peerAddress().toIpPort();
  70.         }
  71.     }
  72.     //用户可读事件处理函数:当一个TCP发生了可读事件,就把接收到的消息原封不动传回去
  73.     void onMessage(const TcpConnectionPtr &conn,
  74.                    Buffer *buf,
  75.                    Timestamp time)
  76.     {
  77.         //从缓冲区检索所有接收到的数据
  78.         string msg=buf->retrieveAllAsString();
  79.         conn->send(msg);
  80.         conn->shutdown();
  81.     }
  82.     //创建服务器对象
  83.     TcpServer server_;
  84.     //创建sub EventLoop
  85.     EventLoop* loop_;
  86. };
  87. int main(){
  88.     //定义main eventloop
  89.     EventLoop loop;
  90.     //封装socketaddr_in,设置监听地址和端口号,端口号为2007,ip地址为默认的0.0.0.0(INADDR_ANY,即服务器可以连接所有可用的网络接口)
  91.     InetAddress addr(2007);
  92.     EchoServer server(&loop,addr,"echoserver-01");
  93.     server.strat();
  94.     loop.loop();
  95.     return 0;
  96. }
复制代码
测试结果:




5.1.3.2 Muduo连接建立思想



当创建Tcpserver对象时,即执行代码TcpServer server(&loop, listenAddr);调用构造函数实例化Acceptor对象,并向这个Acceptor对象注册一个回调函数TcpServer::newConnection()。
当在TcpServer构造函数实例化Acceptor对象时,Acceptor的构造函数中实例化了一个Channel对象,即acceptChannel_,该Channel对象封装了服务器监听套接字文件形貌符(尚未注册到main EventLoop的事件监听器上)。
接着Acceptor构造函数将Acceptor::handleRead( )方法注册进acceptChannel_中,这也意味着,日后如果事件监听器监听到acceptChannel_发生可读事件(新用户连接),将会调用AcceptorC::handleRead( )函数。
至此,TcpServer对象创建完毕,用户调用TcpServer::start( )方法,开启TcpServer。我们来直接看一下TcpServer::start( )方法都干了什么。
  1. /**** TcpServer.cc *******/
  2. void TcpServer::start() //开启服务器监听
  3. {
  4.     省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略
  5.     loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
  6.     //让这个EventLoop,也就是mainloop来执行Acceptor的listen函数,开启服务端监听
  7.     省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略 省略
  8. }
  9. /***** Acceptor.cc ********/
  10. void Acceptor::listen()
  11. {
  12.     listenning_ = true;
  13.     acceptSocket_.listen();
  14.     acceptChannel_.enableReading();
  15. }
复制代码
实在就是将实在主要就是调用Acceptor::listen( )函数(底层是调用了linux的函数listen( ))监听服务器套接字,以及将acceptChannel_注册到main EventLoop的事件监听器上监听它的可读事件(新用户连接事件)
接着用户调用loop.loop( ),即调用了EventLoop::loop( )函数,该函数就会循环的获取事件监听器的监听结果,并且根据监听结果调用注册在事件监听器上的Channel对象的事件处理函数。
当步调如果执行到了这个函数内里,分析acceptChannel_发生可读事件,步调处理新客户连接请求。
该函数起首调用了Linux的函数accept( )接受新客户连接。
接着调用了TcpServer::newConnection( )函数,这个函数是在步骤1中注册给Acceptor并由成员变量newConnectionCallback_保存。
该函数的主要功能就是将建立好的连接举行封装(封装成TcpConnection对象),并使用选择算法公平的选择一个sub EventLoop,并调用TcpConnection::connectEstablished( )将TcpConnection::channel_注册到刚刚选择的sub EventLoop上。
  1. /******** Callbacks.h  ********/
  2. using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
  3. /******** TcpServer.cc ********/
  4. void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
  5. {
  6.     代码省略
  7.         TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
  8.     connections_[connName] = conn;
  9.     代码省略
  10.     ioLoop->runInLoop(bind(&TcpConnection::connectEstablished, conn));
  11. }
复制代码
在TcpServer::newConnection()函数中,当接受了一个新用户连接,就要把这个Tcp连接封装成一个TcpConnection对象,也就是上面代码中的new TcpConnection(...)。然后用一个共享型智能指针来管理这个对象。
这里使用智能指针管理TcpConnetion的最重要缘故原由在于防止指针悬空,而指针悬空可能会来自以下这两个方面:
a. TcpConnection会和用户直接交互,用户可能会误删除。
在我们编写Echo服务器的时候,我们用户可以自定义连接事件发生后的处理函数(如下所示),并将这个函数注册到TcpServer中。
  1. /**** 用户自定义的连接事件发生后的处理函数 *****/
  2. void onConnection(const TcpConnectionPtr &conn)
  3. {
  4.     .......
  5. }
复制代码
假如这里的onConnection函数传入的是TcpConnection而不是TcpConnectionPtr,用户在onConnection函数中把TcpConnection对象给Delete了怎么办?删除了之后,步调内部还要好几处地方都在使用TcpConnection对象。结果这个对象的内存忽然消失了,服务器访问非法内存崩溃。虽然这一系列连锁反应会让人以为用户很笨。但是作为计划者的我们必须要包管,编程计划不可以依靠用户行为,肯定要尽可能地封死用户的误操纵。以是这里用了共享智能指针。
b. TcpConnection对象的多线程安全问题:
假如服务器要关闭了这个时候MainEventLoop线程中的TcpServer::~TcpServer()函数开始把全部TcpConnection对象都删掉。那么其他线程还在使用这个TcpConnection对象,如果你把它的内存空间都释放了,其他线程访问了非法内存,会直接崩溃。你可能会以为,反正我都要把服务器给关了,崩就崩了吧。这种想法是错的!因为可能在你关闭服务器的时候,其他线程正在处理TcpConnection的发送消息任务,这个时候你应该等它发完才释放TcpConnection对象的内存才对!
总结:muduo连接建立:TcpServer( )实例化Acceptor对象,Acceptor( )实例化Channel对象调用listen(),handleRead( ):处理新用户连接,调用accept(),调用TcpServer::newConnection()函数把新用户Tcp连接封装成TcpConnection对象,分发给subEventloop
5.1.3.3 Muduo消息读取逻辑



如上图所示,SubEventLoop中的EventLoop::loop()函数内部会循环的执行上图中的步骤1和步骤2。步骤1就是调用Poller::poll()方法获取事件监听结果,这个事件监听结果是一个Channel集合,每一个Channel封装着 [一个fd] 及 [fd感兴趣的事件] 和 [事件监听器监听到该fd实际发生的事件]。步骤2就是调用每一个Channel的Channel::HandlerEvent方法。该方法会根据每一个Channel的感兴趣事件以及实际发生的事件调用提前注册在Channel内的对应的事件处理函数(readCallback_、writeCallback_、closeCallback_、errorCallback_)。
readCallback_保存的函数实在是TcpConnection::handleRead( ),消息读取的处理逻辑也就是由这个函数提供的,我们稍微分析一下这个函数:
  1. void TcpConnection::handleRead(TimeStamp receiveTime)
  2. {
  3.     int savedErrno = 0;
  4.     ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  5.     if(n > 0) //从fd读到了数据,并且放在了inputBuffer_上
  6.     {
  7.         messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  8.     }
  9.     else if(n == 0)
  10.         handleClose();
  11.     else
  12.     {
  13.         errno = savedErrno;
  14.         LOG_ERROR("TcpConnection::handleRead");
  15.         handleError();
  16.     }
  17. }
复制代码
TcpConnection::handleRead( )函数起首调用Buffer_.readFd(channel_->fd(), &saveErrno),该函数底层调用Linux的函数readv( ),将Tcp吸收缓冲区数据拷贝到用户定义的缓冲区中(inputBuffer_)。如果在读取拷贝的过程中发生了什么错误,这个错误信息就会保存在savedErrno中。
当readFd( )返回值大于0,分析从吸收缓冲区中读取到了数据,那么会接着调用messageCallback_中保存的用户自定义的读取消息后的处理函数。
readFd( )返回值等于0,分析客户端连接关闭,这时候应该调用TcpConnection::handleClose( )来处理连接关闭事件
readFd( )返回值等于-1,分析发生了错误,调用TcpConnection::handleError( )来处理savedErrno的错误事件。Moduo库只支持LT模式,以是读事件不会出现EAGAIN的错误,以是一旦出现错误,分析肯定是比较不好的非正常错误了。而EAGAIN错误只不过是非阻塞IO调用时的一种常见错误而已。
补充:epoll LT 模式和 ET 模式
与 poll 的事件宏相比,epoll 新增了一个事件宏 EPOLLET,这就是所谓的边沿触发模式Edge Trigger,ET),而默认的模式我们称为 水平触发模式Level Trigger,LT)。这两种模式的区别在于:

特性总结:

原文链接:epoll LT 模式和 ET 模式详解(文末赠书)-腾讯云开辟者社区-腾讯云
Buffer::readFd()函数分析:
分析这个函数是因为这个函数的计划有可取之处。这个readFd巧妙的计划,可以让用户一次性把全部TCP吸收缓冲区的全部数据全部都读出来并放到用户自定义的缓冲区Buffer中。
用户自定义缓冲区Buffer是有大小限制的,我们一开始不知道TCP吸收缓冲区中的数据量有多少,如果一次性读出来会不会导致Buffer装不下而溢出。以是在readFd( )函数中会在栈上创建一个临时空间extrabuf,然后使用readv的分散读特性,将TCP缓冲区中的数据先拷贝到Buffer中,如果Buffer容量不敷,就把剩余的数据都拷贝到extrabuf中,然后再调整Buffer的容量(动态扩容),再把extrabuf的数据拷贝到Buffer中。当这个函数结束后,extrabuf也会被释放。另外extrabuf是在栈上开辟的空间,速度比在堆上开辟还要快。
  1. ssize_t Buffer::readFd(int fd, int* saveErrno)
  2. {
  3.     char extrabuf[65536] = {0}; //栈上的内存空间
  4.     struct iovec vec[2];
  5.     const size_t writableSpace = writableBytes(); //可写缓冲区的大小
  6.     vec[0].iov_base = begin() + writerIndex_; //第一块缓冲区
  7.     vec[0].iov_len = writableSpace; //当我们用readv从socket缓冲区读数据,首先会先填满这个vec[0],也就是我们的Buffer缓冲区
  8.     vec[1].iov_base = extrabuf; //第二块缓冲区,如果Buffer缓冲区都填满了,那就填到我们临时创建的
  9.     vec[1].iov_len = sizeof(extrabuf); //栈空间上。
  10.     const int iovcnt = (writableSpace < sizeof(extrabuf) ? 2 : 1);
  11.     const ssize_t n = ::readv(fd, vec, iovcnt);
  12.     if(n < 0){
  13.         *saveErrno = errno; //出错了!!
  14.     }
  15.     else if(n <= writableSpace){ //说明Buffer空间足够存了
  16.         writerIndex_ += n;  //
  17.     }
  18.     else{ //Buffer空间不够存,需要把溢出的部分(extrabuf)倒到Buffer中(会先触发扩容机制)
  19.         writerIndex_ = buffer_.size();
  20.         append(extrabuf, n-writableSpace);
  21.     }
  22.     return n;
  23. }
复制代码
5.1.3.4 Muduo消息发送逻辑

当用户调用了TcpConnetion::send(buf)函数时,相当于要求muduo库把数据buf发送给该Tcp连接的客户端。此时该TcpConnection注册在事件监听器上的感兴趣事件中是没有可写事件的。TcpConnection::send(buf)函数内部实在是调用了Linux的函数write( )(别说你不知道write( )是啥)
如果TCP发送缓冲区能一次性容纳buf,那这个write( )函数将buf全部拷贝到发送缓冲区中。
如果TCP发送缓冲区内不能一次性容纳buf:



5.1.3.5 Muduo连接断开逻辑

5.1.3.5.1 连接被动断开

服务端TcpConnection::handleRead()中感知到客户端把连接断开了。
TcpConnection::handleRead( )函数内部调用了Linux的函数readv( ),当readv( )返回0的时候,服务端就知道客户端断开连接了。然后就接着调用TcpConnection::handleClose( )。


上图中的标号1、2、3是函数调用顺序,我们可以看到:
5.1.3.5.2 服务器主动关闭导致连接断开

当服务器主动关闭时,调用TcpServer::~TcpServer()析构函数。
这里在提示一下EventLoop::runInLoop()函数的意义,假如你有一个EventLoop对象 loop_,当你调用了loop_->runInLoop(function)函数时,这个function函数的执行会在这个loop_绑定的线程上运行!
以是我们画了下面这幅图,在创建TcpConnection对象时,Acceptor都要将这个对象分发给一个SubEventLoop来管理。这个TcpConnection对象的一切函数执行都要在其管理的SubEventLoop线程中运行。再一次贯彻One Loop Per Thread的计划模式。比如要想彻底删除一个TcpConnection对象,就必须要调用这个对象的connecDestroyed()方法,这个方法执行完后才气释放这个对象的堆内存。每个TcpConnection对象的connectDestroyed()方法都必须在这个TcpConnection对象所属的SubEventLoop绑定的线程中执行。


全部上面的TcpServer::~TcpServer()函数就是干这事儿的,不断循环的让这个TcpConnection对象所属的SubEventLoop线程执行TcpConnection::connectDestroyed()函数,同时在MainEventLoop的TcpServer::~TcpServer()函数中调用item.second.reset()释放保管TcpConnection对象的共享智能指针,以到达释放TcpConnection对象的堆内存空间的目的。
但是这内里实在有一个问题需要解决,TcpConnection::connectDestroyed()函数的执行以及这个TcpConnection对象的堆内存释放操纵不在同一个线程中运行,以是要思量怎么包管一个TcpConnectino对象的堆内存释放操纵是在TcpConnection::connectDestroyed()调用完后。
这个析构函数巧妙利用了共享智能指针的特点,当没有共享智能指针指向这个TcpConnection对象时(引用计数为0),这个TcpConnection对象就会被析构删除(堆内存释放)。
  1. TcpServer::~TcpServer()
  2. {
  3.     //connections类型为std::unordered_map<std::string, TcpConnectionPtr>;
  4.     for(auto &item : connections_)
  5.     {
  6.         //创建一个临时的 TcpConnectionPtr 智能指针:
  7.         //item.second 是 std::shared_ptr<TcpConnection>,它持有一个 TcpConnection 对象。
  8.         //创建了一个新的 shared_ptr(即 conn)与 item.second 共享同一个 TcpConnection 对象。此时,TcpConnection 的引用计数加1。
  9.         TcpConnectionPtr conn(item.second);
  10.         //调用 reset 方法将 item.second 所持有的 TcpConnection 对象释放。
  11.         //由于 conn 还持有该对象,TcpConnection 的引用计数减1,但不会被销毁。
  12.         item.second.reset();
  13.         /*
  14.         conn->getLoop() 返回 TcpConnection 所属的事件循环对象。
  15.         调用 runInLoop 方法将一个任务(bind(&TcpConnection::connectDestroyed, conn))添加到事件循环中。
  16.         任务的逻辑是调用 TcpConnection::connectDestroyed,通知连接对象销毁。
  17.         此时,conn 依然持有连接对象,确保对象不会在事件循环中使用时被销毁。
  18.         每次循环结束时,conn 超出其作用域被销毁,TcpConnection 对象的引用计数减1。
  19.         如果没有其他地方再持有该对象(引用计数为0),则 TcpConnection 对象会被销毁。
  20.         */
  21.         conn->getLoop()->runInLoop(bind(&TcpConnection::connectDestroyed, conn));
  22.     }
  23. }
复制代码


  1. /***** TcpConnection.cc *****/
  2. void TcpConnection::connectEstablished()
  3. {
  4.     setState(kConnected);
  5.     channel_->tie(shared_from_this());
  6.     channel_->enableReading(); //向poller注册channel的epollin事件
  7.     //新连接建立,执行回调
  8.     connectionCallback_(shared_from_this());
  9. }
复制代码
我们先相识一下shared_from_this()是什么意思,起首TcpConnection类继续了一个类,继续了这个类之后才气使用。
  1. class TcpConnection :public std::enable_shared_from_this<TcpConnection>
复制代码
假如我们在TcpConnection对象(我们管这个对象叫TCA)中的成员函数中调用了shared_from_this(),该函数可以返回一个shared_ptr,并且这个shared_ptr指向的对象是TCA。
接着这个shared_ptr就作为channel_的Channel::tie()函数的函数参数。
  1. /*****  Channel.h   ******/
  2. std::weak_ptr<void> tie_;
  3. /*****  Channel.cc   ******/
  4. void Channel::tie(const shared_ptr<void>& obj)
  5. {
  6.     tie_ = obj;
  7.     tied_ = true;
  8. }
  9. void Channel::HandlerEvent(TimeStamp receiveTime)
  10. {
  11.     if(tied_){
  12.         shared_ptr<void> guard = tie_.lock();
  13.         if (guard)
  14.             HandleEventWithGuard(receiveTime);
  15.     }
  16.     else{
  17.         。。。。一般不会执行到这里其实。我实在想不到正常运行的情况下怎么会执行到这里,可能是我比较菜。
  18.         HandleEventWithGuard(receiveTime);
  19.     }
  20. }
复制代码
当事件监听器返回监听结果,就要对每一个发生事件的channel对象调用他们的HandlerEvent()函数。在这个HandlerEvent函数中,会先把tie_这个weak_ptr提升为强共享智能指针。这个强共享智能指针会指向当前的TcpConnection对象。就算你表面调用删除析构了其他全部的指向该TcpConnection对象的智能指针。你只要HandleEventWithGuard()函数没执行完,你这个TcpConnetion对象都不会被析构释放堆内存。而HandleEventWithGuard()函数内里就有负责处理消息发送事件的逻辑。当HandleEventWithGuard()函数调用完毕,这个guard智能指针就会被释放。
5.1.3.6 线程思想-One Loop Per Thread

One Loop Per Thread的寄义就是,一个EventLoop和一个线程唯一绑定,和这个EventLoop有关的,被这个EventLoop管辖的一切操纵都必须在这个EventLoop绑定线程中执行,比如在MainEventLoop中,负责新连接建立的操纵都要在MainEventLoop线程中运行。已建立的连接分发到某个SubEventLoop上,这个已建立连接的任何操纵,比如吸收数据发送数据,连接断开等事件处理都必须在这个SubEventLoop线程上运行,还不准跑到别的SubEventLoop线程上运行。

  1. #include <sys/eventfd.h>
  2. int eventfd(unsigned int initval, int flags);
复制代码
调用函数eventfd()会创建一个eventfd对象,或者也可以理解打开一个eventfd范例的文件,类似平凡文件的open操纵。eventfd的在内核空间维护一个无符号64位整型计数器, 初始化为initval的值。











下图是EventLoop构造函数(我把不相关的代码全删了。以是看上去可能会有一点点光秃秃)
  1. /***** EventLoop.cc *****/
  2. __thread EventLoop *t_loopInThisThread = nullptr;
  3. EventLoop::EventLoop() :
  4.     wakeupFd_(createEventfd()), //生成一个eventfd,每个EventLoop对象,都会有自己的eventfd
  5.         wakeupChannel_(new Channel(this, wakeupFd_))
  6. {
  7.     LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);
  8.     if(t_loopInThisThread) //如果当前线程已经绑定了某个EventLoop对象了,那么该线程就无法创建新的EventLoop对象了
  9.         LOG_FATAL("Another EventLoop %p exits in this thread %d \n", t_loopInThisThread, threadId_);
  10.     else
  11.         t_loopInThisThread = this;
  12.     wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
  13.     wakeupChannel_->enableReading();
  14. }
复制代码
介绍一下这个__thread,这个__thread是一个关键字,被这个关键字修饰的全局变量 t_loopInThisThread会具备一个属性,那就是该变量在每一个线程内都会有一个独立的实体。因为一样平常的全局变量都是被同一个进程中的多个线程所共享,但是这里我们不希望如许。
在EventLoop对象的构造函数中,如果当前线程没有绑定EventLoop对象,那么t_loopInThisThread为nullptr,然后就让该指针变量指向EventLoop对象的地址。如果t_loopInThisThread不为nullptr,分析当前线程已经绑定了一个EventLoop对象了,这时候EventLoop对象构造失败!
EventLoop构造函数的初始化列表中,如下所示:
  1. int createEventfd()
  2. {
  3.     int evtfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  4.     if(evtfd < 0) //eventfd创建失败,一般不会失败,除非一个进程把文件描述符(Linux一个进程1024个最多)全用光了。
  5.         LOG_FATAL("eventfd error: %d \n", errno);
  6.    
  7.     return evtfd;
  8. }
  9. EventLoop::EventLoop()
  10.     : wakeupFd_(createEventfd()), //生成一个eventfd,每个EventLoop对象,都会有自己的eventfd
  11.         ...
  12. {...}
复制代码
在EventLoop的初始化列表中:

在EventLoop的构造函数体内:

我们来刻画一个景象,我们知道每个EventLoop线程主要就是在执行其EventLoop对象的loop函数(该函数就是一个while循环,循环的获取事件监听器的结果以及调用每一个发生事件的Channel的事件处理函数)。此时SubEventLoop上注册的Tcp连接都没有任何动静,整个SubEventLoop线程就阻塞在epoll_wait()上。
此时MainEventLoop接受了一个新连接请求,并把这个新连接封装成一个TcpConnection对象,并且希望在SubEventLoop线程中执行TcpConnection::connectEstablished()函数,因为该函数的目的是将TcpConnection注册到SubEventLoop的事件监听器上,并且调用用户自定义的连接建立后的处理函数。当该TcpConnection对象注册到SubEventLoop之后,这个TcpConnection对象的任何操纵(包括调用用户自定义的连接建立后的处理函数。)都必须要在这个SubEventLoop线程中运行,以是TcpConnection::connectEstablished()函数必须要在SubEventLoop线程中运行。
那么我们怎么在MainEventLoop线程中通知SubEventLoop线程起来执行TcpConnection::connectEstablished()函数呢?这里就要好好研究一下EventLoop::runInLoop()函数了。
  1. void EventLoop::runInLoop(Functor cb)
  2. {//该函数保证了cb这个函数对象一定是在其EventLoop线程中被调用。
  3.     if(isInLoopThread())//如果当前调用runInLoop的线程正好是EventLoop的运行线程,则直接执行此函数
  4.         cb();
  5.     else//否则调用 queueInLoop 函数
  6.         queueInLoop(cb);
  7. }
  8. void EventLoop::queueInLoop(Functor cb)
  9. {
  10.     {
  11.         unique_lock<mutex> lock(mutex_);
  12.         pendingFunctors_.emplace_back(cb);
  13.     }
  14.     if(!isInLoopThread() || callingPendingFunctors_)
  15.         wakeup();
  16. }
  17. void EventLoop::wakeup()
  18. {
  19.     uint64_t one = 1;
  20.     ssize_t n = write(wakeupFd_, &one, sizeof(one));
  21.     if(n != sizeof(n))
  22.         LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
  23. }
  24. void EventLoop::loop()
  25. { //EventLoop 所属线程执行
  26.     looping_ = true;
  27.     quit_ = false;
  28.     LOG_INFO("EventLoop %p start looping \n", this);   
  29.     while(!quit_)
  30.     {
  31.         activeChannels_.clear();
  32.         pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);//此时activeChannels已经填好了事件发生的channel
  33.         for(Channel *channel : activeChannels_)
  34.             channel->HandlerEvent(pollReturnTime_);
  35.         doPendingFunctors(); //执行当前EventLoop事件循环需要处理的回调操作。
  36.     }
  37. }
  38. void EventLoop::doPendingFunctors()
  39. {
  40.    std::vector<Functor> functors;
  41.    callingPendingFunctors_ = true;
  42.    {
  43.        unique_lock<mutex> lock(mutex_);
  44.        functors.swap(pendingFunctors_); //这里的swap其实只是交换的vector对象指向的内存空间的指针而已。
  45.    }
  46.    for(const Functor &functor:functors)
  47.    {
  48.        functor();
  49.    }
  50.    callingPendingFunctors_ = false;
  51. }
复制代码
起首EventLoop::runInLoop函数接受一个可调用的函数对象Functor cb,如果当前cpu正在运行的线程就是该EventLoop对象绑定的线程,那么就直接执行cb函数。否则就把cb传给queueInLoop()函数。
在queueInLoop()函数中主要就是把cb这个可调用对象保存在EventLoop对象的pendingFunctors_这个数组中,我们希望这个cb能在某个EventLoop对象所绑定的线程上运行,但是由于当前cpu执行的线程不是我们等待的这个EventLoop线程,我们只能把这个可调用对象先存在这个EventLoop对象的数组成员pendingFunctors_中。
我们再把目光转移到上面代码中的EventLoop::loop()函数中。我们知道EventLoop::loop()肯定是运行在其所绑定的EventLoop线程内,在该函数内会调用doPendingFunctors()函数,这个函数就是把本身这个EventLoop对象中的pendingFunctors_数组中保存的可调用对象拿出来执行。pendingFunctors_中保存的是其他线程希望你这个EventLoop线程执行的函数。
另有一个问题,假如EventLoop A线程阻塞在EventLoop::loop()中的epoll_wait()调用上(EventLoop A上监听的文件形貌符没有任何事件发生),这时候EventLoop线程要求EventLoopA赶紧执行某个函数,那其他线程要怎么唤醒这个阻塞住的EventLoopA线程呢?这时候我们就要把目光聚焦在上面的wakeup()函数了。
wakeup()函数就是向我们想唤醒的线程所绑定的EventLoop对象持有的wakeupFd_随便写入一个8字节数据,因为wakeupFd_已经注册到了这个EventLoop中的事件监听器上,这时候事件监听器监听到有文件形貌符的事件发生,epoll_wait()阻塞结束而返回。这就相当于起到了唤醒线程的作用!你这个EventLoop对象既然阻塞在事件监听上,那我就通过wakeup()函数给你这个EventLoop对象一个事件,让你结束监听阻塞。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4