Muduo网络库解析--网络模块(2)

打印 上一主题 下一主题

主题 1017|帖子 1017|积分 3051

前文

重写Muduo库实现核心模块的Git仓库
注:本文将重点剖析 Muduo 网络库的核心框架,深入探讨作者精妙的代码计划思绪,并针对核心代码部分举行重写,将原本依赖 boost 的实现更换为原生的 C++11 语法。需要阐明的是,本文并不打算对整个 Muduo 库举行完备的重写。Muduo库源码链接
在上文中,我们对Muduo网络库的核心网络模块中的Socket、InetAddress以及Acceptor举行了解析。这节我们将对剩余的核心网络模块中的TcpConnection以及TcpServer举行解析。
TcpConnection

Muduo 网络库 中,TcpConnection 是一个非常重要的类,重要用于表示并管理一个 TCP 连接。它抽象了应用层和网络层之间的交互,负责处理一个具体的 TCP 连接的生命周期以及数据的发送和接收。
TcpConnection 的重要作用

  • 抽象 TCP 连接

    • TcpConnection 表示一个具体的 TCP 连接,隐藏了底层的 socket 形貌符和 epoll 等细节,使得用户只需要关注逻辑层面。
    • 每一个客户端连接都会对应一个 TcpConnection 实例。

  • 管理 TCP 连接的生命周期

    • 包括连接的建立、数据的收发、连接的关闭等。
    • 在连接的不同阶段会触发对应的回调(callback),如连接建立回调、消息回调和关闭回调。

  • 提供高效的异步 I/O

    • 通过事件驱动模型,联合 Channel 和 EventLoop,实现异步非阻塞的 I/O 操作。

  • 数据缓冲

    • 提供输入缓冲区和输出缓冲区(Buffer),用于存储接收和发送的数据。

  • 支持用户自定义回调

    • 用户可以设置各种回调函数,比如连接建立的回调(ConnectionCallback)、消息到来的回调(MessageCallback)、写完成回调等。

  • 线程安全

    • TcpConnection 的大部分操作是线程安全的,支持跨线程调用,比如关闭连接时可以跨线程调用 shutdown。

TcpConnection 的核心功能

  • 连接受理

    • 提供方法来开启和关闭连接(connectEstablished() 和 connectDestroyed())。
    • 判断连接状态(isConnected() 等)。

  • 数据传输

    • 接收数据:通过 MessageCallback 回调函数处理接收到的数据。
    • 发送数据:提供 send() 方法,用于发送字符串或二进制数据。发送过程黑白阻塞的,数据会先存入输出缓冲区。

  • 回调设置

    • 支持用户设置各种回调函数,如:

      • ConnectionCallback:连接状态变革时的回调。
      • MessageCallback:收到数据时的回调。
      • WriteCompleteCallback:数据发送完毕时的回调。
      • CloseCallback:连接关闭时的回调。


  • 与事件循环集成

    • 每个 TcpConnection 实例绑定一个 EventLoop,并通过 Channel 监听和处理其对应 socket 的事件(如可读、可写等)。

类图如下:

类的关键成员变量和方法

  • 重要成员变量

    • EventLoop* loop_:所属的事件循环
    • StateE state_:表示连接的状态(如连接中、已连接、正在关闭、未连接)
    • unqiue_ptr<Socket> socket_:表示该TCP连接的socket
    • unique_ptr<Channel> channel_:表示该套接字形貌的channel
    • InetAddress localAddr_:本地的IP和端口
    • InetAddress peerAddr_:对端的IP和端口
    • 输入缓冲区和输出缓冲区
    • 一系列回调函数:connectionCallback_、messageCallback_等

  • 重要方法

    • void send(const std::string& message):发送数据。
    • void shutdown():关闭连接的写端。
    • void connectEstablished():在连接建立后被调用,初始化连接。
    • void connectDestroyed():在连接关闭后被调用,清理资源。
    • 回调设置方法:setConnectionCallback()、setMessageCallback() 等。

TcpConnection.h

  1. class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
  2. {
  3. public:
  4.     TcpConnection(EventLoop* loop,
  5.                 const std::string& name,
  6.                 int sockfd,
  7.                 const InetAddress& localAddr,
  8.                 const InetAddress& peerAddr);
  9.     ~TcpConnection();
  10.     EventLoop* getLoop() const { return loop_; }
  11.     const std::string& name() const { return name_; }
  12.     const InetAddress& localAddress() const { return localAddr_; }
  13.     const InetAddress& peerAddress() const { return peerAddr_; }
  14.     bool conncted() const { return state_ == StateE::kConnected; }
  15.     // 发送数据
  16.     void send(const std::string& buf);
  17.     // 关闭连接
  18.     void shutdown();
  19.    
  20.     void setConnectionCallback(const ConnectionCallback& cb)
  21.     { connectionCallback_ = cb; }
  22.     void setMessageCallback(const MessageCallback& cb)
  23.     { messageCallback_ = cb; }
  24.     void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  25.     { writeCompleteCallback_ = cb; }
  26.     void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
  27.     { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
  28.     void setCloseCallback(const CloseCallback& cb)
  29.     { closeCallback_ = cb; }
  30.     // 建立连接
  31.     void connectEstablished();
  32.     // 销毁连接
  33.     void connectDestroyed();
  34. private:
  35.     enum StateE
  36.     {
  37.         kDisconnected,        // 已关闭连接
  38.         kConnecting,        // 正在连接
  39.         kConnected,                // 已连接
  40.         kDisconnecting        // 正在关闭连接
  41.     };
  42.     void handleRead(Timestamp receiveTime);
  43.     void handleWrite();
  44.     void handleClose();
  45.     void handleError();
  46.     void sendInLoop(const char* message, size_t len);
  47.     void shutdownInLoop();
  48.     void setState(StateE state) { state_ = state; }
  49.     EventLoop* loop_;
  50.     const std::string name_;
  51.     std::atomic_int state_;
  52.     bool reading_;
  53.     std::unique_ptr<Socket> socket_;
  54.     std::unique_ptr<Channel> channel_;
  55.     const InetAddress localAddr_;
  56.     const InetAddress peerAddr_;
  57.     ConnectionCallback connectionCallback_; // 有新连接的回调
  58.     MessageCallback messageCallback_;       // 有读写消息的回调
  59.     WriteCompleteCallback writeCompleteCallback_;    // 消息发送完成以后的回调
  60.     CloseCallback closeCallback_;
  61.     HighWaterMarkCallback highWaterMarkCallback_;   // 高水位回调
  62.    
  63.     // 高水位的值
  64.     size_t highWaterMark_;
  65.     Buffer inputBuffer_;
  66.     Buffer outputBuffer_;
  67. };
复制代码
TcpConnection.cc

构造函数

TcpConnection 表示并管理一个 TCP 连接,在事件就绪时,会主动调用用户注册的回调函数。与此相对应,channel_ 中负责注册和管理这些用户定义的回调函数。
  1. static EventLoop* CheckLoopNotNull(EventLoop* loop)
  2. {
  3.     if(loop == nullptr)
  4.     {
  5.         LOG_FATAL("%s:%s:%d TcpConnection is null!", __FILE__, __FUNCTION__, __LINE__);
  6.     }
  7.     return loop;
  8. }
  9. TcpConnection::TcpConnection(EventLoop *loop, std::string const &name,
  10.                              int sockfd, InetAddress const &localAddr,
  11.                              InetAddress const &peerAddr) :
  12.     loop_(CheckLoopNotNull(loop)),
  13.     name_(name),
  14.     state_(StateE::kConnecting),
  15.     reading_(true),
  16.     socket_(new Socket(sockfd)),
  17.     channel_(new Channel(loop_, sockfd)),
  18.     localAddr_(localAddr),
  19.     peerAddr_(peerAddr),
  20.     highWaterMark_(64*1024*1024)
  21. {
  22.     channel_->setReadCallback(
  23.         std::bind(&TcpConnection::handleRead, this, std::placeholders::_1)
  24.     );
  25.     channel_->setWriteCallback(
  26.         std::bind(&TcpConnection::handleWrite, this)
  27.     );
  28.     channel_->setCloseCallback(
  29.         std::bind(&TcpConnection::handleClose, this)
  30.     );
  31.     channel_->setErrorCallback(
  32.         std::bind(&TcpConnection::handleError, this)
  33.     );
  34.     LOG_INFO("TcpConnection::ctor[%s] at fd=%d", name_.c_str(), sockfd);
  35.     socket_->setKeepAlive(true);
  36. }
复制代码
新连接

在接受到新的客户端连接后,构造了一个新的TcpConnection,紧接着会实行一些后续操作(设置状态等),就是在MainLoop中实行TcpConnection::connectEstablished。在构造函数中,状态设置为正在连接,在connectEstablish设置为已连接,后续可以注册读写事件了。
  1. // 建立连接
  2. void TcpConnection::connectEstablished()
  3. {
  4.     setState(kConnected);
  5.     channel_->tie(shared_from_this()); // 将TcpConnection绑定到Channel上
  6.     channel_->enableReading();      // 向Poller注册EPOLLIN事件
  7.     // 新连接建立,执行回调
  8.     connectionCallback_(shared_from_this());
  9. }
复制代码
connectionCallback_为用户注册的回调函数。
读事件

Poller 在监听到读事件就绪后,会将活跃的 Channel 集合返回给 EventLoop,即 activeChannels。随后,EventLoop 遍历 activeChannels 中的每一个 Channel,并调用其对应的就绪事件回调函数。理解了前面这段话,我们可以思考谁把各种回调函数注册到了Channel?
   根据 TcpConnection 的构造函数,我们可以得出结论:TcpConnection 负责管理一个 TCP 连接的完备生命周期,而 Channel 则负责管理已连接后 socket 的各种回调函数、感兴趣的事件以及就绪事件等。因此,为了实现这一管理职责,TcpConnection 需要负责 Channel 的生命周期管理;
  并在构造函数中,调用Channel的公有接口注册回调函数。
  注册用于处理读事件的成员函数是 TcpConnection::handleRead,这意味着当读事件就绪时,会主动调用该函数。第一步先读数据,若数据已乐成读出则实行messageCallback_(),messageCallback_()实在就是我们作为用户利用TcpServer::setMessageCallback注册的回调函数,在此函数就负责业务逻辑处理,将读数据和业务处理解耦。
  1. void TcpConnection::handleRead(Timestamp receiveTime)
  2. {
  3.     int saveErrno = 0;
  4.     ssize_t n = inputBuffer_.readFd(channel_->fd(), &saveErrno);
  5.     if(n > 0)
  6.     {
  7.         // 已建立连接的用户,有可读事件发生
  8.         messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  9.     }
  10.     else if(n == 0)
  11.     {
  12.         // 客户端断开连接
  13.         handleClose();
  14.     }
  15.     else
  16.     {
  17.         errno = saveErrno;
  18.         LOG_ERROR("TcpConnection::handleRead");
  19.         handleError();
  20.     }
  21. }
复制代码
写事件

作为用户,我们渴望只调用简单的接口就可以实现将数据发送出去,而不用关心其内部细节。在Muduo中提供了如许简单的接口:void TcpConnection::send(std::string const &msg)
其内部帮我们实现了线程安全:


  • 若调用send的线程与loop_所属的线程雷同,则直接调用sendInLoop
  • 否则调用EventLoop::runInLoop,实在也等同于EventLoop::queueInLoop
  1. void TcpConnection::send(std::string const &msg)
  2. {
  3.     if(state_ == kConnected)
  4.     {
  5.         if(loop_->isInLoopThread())
  6.         {
  7.             // 在一个线程
  8.             sendInLoop(msg.c_str(), msg.size());   
  9.         }
  10.         else
  11.         {
  12.             loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, msg.c_str(), msg.size()));
  13.         }
  14.     }
  15. }
复制代码
Send只是向用户提供了一个向对端写数据的浅易接口,真正做出向对端socketwirte操作的是TcpConnection::sendInLoop。
此函数思量了从应用缓冲区向内核缓冲区拷贝与网卡从内核缓冲区发送数据之间的速度,设置了一个水位标记,只有应用缓冲区的数据量越过水位标记才会发送数据。
调用 sendInLoop 会通过系统调用 write 将数据发送到对端。如果数据全部发送乐成,则会触发回调函数 writeCompleteCallback_;如果未能全部发送完,则会将剩余数据存入写缓冲区,并为 Channel 设置 EPOLLOUT 事件。待内核缓冲区就绪时,Channel 会触发写事件,进而调用已注册的写回调函数(即 TcpConnection::handleWrite)来继承发送数据。
  1. /*
  2.     发送数据,应用程序写的快,内核发送慢,需要把带发送数据写入缓冲区,而且设置了水位回调
  3. */
  4. void TcpConnection::sendInLoop(const char *message, size_t len)
  5. {
  6.     ssize_t nwrote = 0;         // 本次已写字节
  7.     size_t remaining = len;     // 剩余字节
  8.     bool faultError = false;    // 是否发生错误
  9.     if(state_ == kDisconnected)
  10.     {
  11.         LOG_ERROR("disconnected, give up writing!");
  12.         return;
  13.     }
  14.     // 表示channel第一次写数据=> fd未注册写事件&&发送缓冲区可读字节为0
  15.     if(!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  16.     {
  17.         nwrote = ::write(channel_->fd(), message, len); // 向内核缓冲区写数据
  18.         if(nwrote >= 0)
  19.         {
  20.             remaining = len - nwrote;
  21.             if(remaining == 0 && writeCompleteCallback_)   
  22.             {
  23.                 // 表示数据已全部发送,调用发送完毕回调
  24.                 loop_->queueInLoop(
  25.                     std::bind(writeCompleteCallback_, shared_from_this())
  26.                 );
  27.             }
  28.         }
  29.         else // 出错
  30.         {
  31.             nwrote = 0;
  32.             if(errno != EWOULDBLOCK)
  33.             {
  34.                 LOG_ERROR("TcpConnection::sendInLoop");
  35.                 if(errno == EPIPE || errno == ECONNRESET)
  36.                 {
  37.                     faultError = true;
  38.                 }
  39.             }
  40.         }
  41.     }
  42.     /*
  43.     下面此判断逻辑说明:
  44.         1.当前这一次write并没有全部发送完毕,需要将剩余的数据保存到缓冲区outputBuffer_中
  45.         2.给Channel注册EPOLLOUT事件,poller发现tcp的发送缓冲区有空间,会通知sock - channel,调用writeCallback_回调
  46.         3.就是调用TcpConnection::handleWrite方法,把发送缓冲区中的数据全部发送完为止
  47.     */
  48.     if(!faultError && remaining > 0)    // 这次write系统调用无错误 && 还有剩余数据待发送
  49.     {
  50.         /*
  51.             如果在某次调用sendInLoop并未一次性地把数据全部发送完,会把数据存到缓冲区;
  52.             待下一次调用sendInLoop会取到上次未读完的数据
  53.         */
  54.         size_t oldLen = outputBuffer_.readableBytes();  
  55.         if(oldLen + remaining >= highWaterMark_
  56.             && oldLen < highWaterMark_
  57.             && highWaterMarkCallback_)  
  58.             // 旧数据 + 此次未写数据 >= 高水位标志 && 旧数据 < 高水位标志
  59.             // => 意味着 此次未写数据要使写缓冲区待发送数据(缓冲区待发送数据 = 旧数据 + 此次未写数据)>=高水位标志
  60.         {
  61.             loop_->queueInLoop(
  62.                 std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining)
  63.             );
  64.         }
  65.         // 将此次未写数据添加到 缓冲区
  66.         outputBuffer_.append(static_cast<const char*>(message) + nwrote, remaining);
  67.         if(!channel_->isWriting())
  68.         {
  69.             channel_->enableWriting();  // 设置EPOLLOUT事件
  70.         }
  71.     }
  72. }
复制代码
待内核缓冲区就绪时,Channel 会触发写事件,进而调用已注册的写回调函数(即 TcpConnection::handleWrite)来继承发送数据。
为什么在 handleWrite 中判断 state_ == kDisconnecting?
   写事件触发的机遇:当内核发送缓冲区有空间时,写事件会触发。这时,handleWrite 会实行继承发送缓冲区中的数据。
  判断是否完成发送:如果此时缓冲区中的数据已全部发送完成,而且连接状态是 kDisconnecting,阐明可以安全地关闭连接。
  1. void TcpConnection::handleWrite()
  2. {
  3.     if(channel_->isWriting())
  4.     {
  5.         int saveErrno = 0;
  6.         ssize_t n = outputBuffer_.writeFd(channel_->fd(), &saveErrno);
  7.         if(n > 0)
  8.         {
  9.             outputBuffer_.retrieve(n);
  10.             if(outputBuffer_.readableBytes() == 0) // 表示已发送完
  11.             {
  12.                 channel_->disableWriting();
  13.                 // 消息发送完之后的回调函数
  14.                 if(writeCompleteCallback_)
  15.                 {
  16.                     loop_->queueInLoop(
  17.                         std::bind(writeCompleteCallback_, shared_from_this())
  18.                     );
  19.                 }
  20.                 /*
  21.                     为什么要判断连接状态?
  22.                     1.保证在断开连接前,所有待发送的数据都已发送完毕。
  23.                     2.实现优雅关闭(半关闭)
  24.                 */
  25.                 if(state_ == kDisconnecting)
  26.                 {
  27.                     shutdownInLoop();
  28.                 }
  29.             }
  30.         }
  31.         else
  32.         {
  33.             LOG_ERROR("TcpConnection::handleWrite");
  34.         }
  35.     }
  36.     else
  37.     {
  38.         LOG_ERROR("TcpConnection fd=%d is down, no more writing", channel_->fd());
  39.     }
  40. }
复制代码
关闭

作为用户,可以在恣意线程内(并非是TcpConnection所属的EventLoop线程内)调用 TcpConnection::shutdown() 来优雅地关闭与已连接客户端的连接。该方法会确保全部未发送的数据被完备发送后,再关闭连接的写端,从而实现对客户端的安全关闭操作。
  1. // 关闭连接
  2. void TcpConnection::。shutdown()
  3. {   
  4.     if(state_ == kConnected)
  5.     {
  6.         setState(kDisconnecting);
  7.         loop_->runInLoop(std::bind(
  8.             &TcpConnection::shutdownInLoop, this
  9.         ));
  10.     }
  11. }
复制代码
TcpConnection::shutdownInLoop 会确保在 EventLoop 所属的线程内实行,并起首检查数据是否已全部发送:


  • 若数据已全部发送:直接关闭写端,触发 EPOLLHUP 事件,关照对端连接已关闭。
  • 若数据未发送完:跳过关闭写端的逻辑,同时为 Channel 注册 EPOLLOUT 事件。随后,在 EventLoop 所属线程中,当内核发送缓冲区可用时触发写事件,实行 TcpConnection::handleWrite。
在 handleWrite 中:


  • 如果缓冲区中的数据被完全发送,则会再次调用 TcpConnection::shutdownInLoop,完成闭环。
  • 最终,当全部数据发送完毕,关闭写端并触发 EPOLLOUT 事件,实行 Channel 注册的关闭事件回调,完成优雅关闭流程。
  1. // 此方法会确保在EventLoop所属的线程内执行
  2. void TcpConnection::shutdownInLoop()
  3. {
  4.     if(!channel_->isWriting()) // 表示写缓冲区内的数据全部发送完
  5.     {
  6.         socket_->shutdownWrite();// 关闭写端,触发EPOLLHUP;
  7.         // =》channel::closeCallback_->TcpConnection::handleClose
  8.     }
  9. }
复制代码
在TcpConnection构造函数可知,Channel的关闭事件回调也就是TcpConnection::handleClose()。
  1. void TcpConnection::handleClose()
  2. {
  3.     LOG_INFO("TcpConnection::handleClose fd=%d state=%d", channel_->fd(), (int)state_);
  4.     state_ = kDisconnected;
  5.     channel_->disableAll();
  6.     TcpConnectionPtr connPtr(shared_from_this());
  7.     connectionCallback_(connPtr);   // 执行连接关闭的回调
  8.     closeCallback_(connPtr);        // 执行关闭连接的回调
  9. }
复制代码
connectionCallback_为用户注册的回调函数,closeCallaback_为TcpServer注册的回调函数,最终会调用TcpConnection::connectDestroyed。
  1. // 连接销毁
  2. void TcpConnection::connectDestroyed()
  3. {
  4.     if(state_ == kConnected)
  5.     {
  6.         setState(kDisconnected);
  7.         channel_->disableAll(); // 把channel的所以有感兴趣的事件从Poller中delete
  8.         connectionCallback_(shared_from_this());
  9.     }
  10.     channel_->remove();// 在Poller中移除remove
  11. }
复制代码
错误

发生错误时,即Channel触发了EPOLLERR事件,会调用Channel中注册的错误回调函数,此函数是在TcpConnection构造函数中设置的。
  1. void TcpConnection::handleError()
  2. {
  3.     int optval;
  4.     socklen_t optlen = sizeof optval;
  5.     int err;
  6.     if(::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)
  7.     {
  8.         err = errno;
  9.     }
  10.     else
  11.     {
  12.         err = optval;
  13.     }
  14.     LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d", name_.c_str(), err);
  15. }
复制代码
至此,TcpConnection 的核心部分以及与其他组件的关联已经完备串联和解析。
TcpServer

TcpServer是Muduo网络库的最顶层模块,它抽象了服务端的网络通信流程,包括监听端口、接收客户端连接、创建 TcpConnection 实例,以及管理多个连接的生命周期和事件回调。
具体如何利用它来构建一个建议聊天服务器,大家可以看看我的这篇文章Muduo架构计划剖析
TcpServer的组成

  • Acceptor

    • 负责监听指定的地点和端口,并接收新连接。
       

    • 为每个连接分配一个文件形貌符(fd),并将其交给主线程或线程池中的事件循环(EventLoop)处理。

  • EventLoop

    • MainLoop,管理 TcpServer 和全部连接的事件驱动逻辑。

  • EventLoopThreadPool

    • 可以将新连接分配给不同的线程,进步并发处理本领。

  • 用户回调函数

    • 用户可以通过 TcpServer 注册各种回调函数(如连接回调、消息回调、写完成回调等),用于处理应用层逻辑。

类图如下:

Tcpconnection.h

  1. class TcpServer : noncopyable
  2. {
  3. public:
  4.     using ThreadInitCallback = std::function<void(EventLoop*)>;
  5.     enum Option
  6.     {
  7.         KnoReusePort,
  8.         kReusePort
  9.     };
  10.     TcpServer(EventLoop* loop,
  11.                 const InetAddress& listenAddr,
  12.                 const std::string& argName,
  13.                 Option option = KnoReusePort);
  14.    
  15.     ~TcpServer();
  16.     void setThreadInitCallback(const ThreadInitCallback& cb) { threadInitCallback_ = std::move(cb); }
  17.     void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = std::move(cb); }
  18.     void setMessageCallback(const MessageCallback& cb) { messageCallback_ = std::move(cb); }
  19.     void setWriteCallback(const WriteCompleteCallback& cb) { writeCompleteCallback_ = std::move(cb); }
  20.     // 设置subloop数量
  21.     void setThreadNum(int numThreads);
  22.     // 开启监听
  23.     void start();
  24. private:
  25.     void NewConnection(int sockfd, const InetAddress& peerAddr);
  26.     void removeConnection(const TcpConnectionPtr& conn);
  27.     void removeConnectionInLoop(const TcpConnectionPtr& conn);
  28. private:
  29.     using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;
  30.     EventLoop* loop_;   //用户定义的mainloop
  31.     const std::string ipPort_;
  32.     const std::string name_;
  33.     std::unique_ptr<Acceptor> accpetor_;
  34.     std::shared_ptr<EventLoopThreadPool> threadPool_;   // one loop per thread
  35.     ConnectionCallback connectionCallback_; // 有新连接的回调
  36.     MessageCallback messageCallback_;       // 有读写消息的回调
  37.     WriteCompleteCallback writeCompleteCallback_;    // 消息发送完成以后的回调
  38.     ThreadInitCallback threadInitCallback_; // loop线程初始化回调
  39.     std::atomic_int  started_;
  40.     int nextConnId_;
  41.     ConnectionMap connections_;
  42. };
复制代码
TcpConnection.cc

构造函数

  1. static EventLoop* CheckLoopNotNull(EventLoop* loop)
  2. {
  3.     if(loop == nullptr)
  4.     {
  5.         LOG_FATAL("%s:%s:%d mainloop is null!", __FILE__, __FUNCTION__, __LINE__);
  6.     }
  7.     return loop;
  8. }
  9. TcpServer::TcpServer(EventLoop *loop, InetAddress const &listenAddr, const std::string& argName, Option option) :
  10.     loop_(loop),
  11.     ipPort_(listenAddr.toIpPort()),
  12.     name_(argName),
  13.     accpetor_(new Acceptor(loop, listenAddr, option==kReusePort)),
  14.     threadPool_(new EventLoopThreadPool(loop, name_)),
  15.     connectionCallback_(),
  16.     messageCallback_(),
  17.     started_(0),
  18.     nextConnId_(1)
  19. {
  20.     // 当有新用户连接时,此函数作为回调函数
  21.     accpetor_->setNewConnectionCallback(
  22.         std::bind(&TcpServer::NewConnection,
  23.         this, std::placeholders::_1, std::placeholders::_2));
  24. }
复制代码
新用户连接回调函数

  1. // 在构造TcpServer时,创建了accpetor_,并把TcpServer::NewConnection绑定到Acceptor
  2. // 当有新连接时->在MainLoop中的Acceptor::handleRead()->TcpServer::NewConnection
  3. void TcpServer::NewConnection(int sockfd, InetAddress const &peerAddr)
  4. {
  5.     // 根据轮询算法选择一个subloop来管理对应的channel
  6.     EventLoop* ioloop = threadPool_->getNextLoop();
  7.     char buf[64] = {0};
  8.     snprintf(buf, sizeof(buf), "-%s#%d", ipPort_.c_str(), nextConnId_);
  9.     ++nextConnId_;
  10.     // TcpConnection的名字
  11.     std::string connName = name_ + buf;
  12.     LOG_INFO("TcpServer::newConnecton [%s] - new connection[%s] from %s\n",
  13.         name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str()
  14.     );
  15.    // 通过sofkfd,获取其绑定的本地的ip地址和端口
  16.    sockaddr_in local;
  17.    ::bzero(&local, sizeof local);
  18.    socklen_t addrlen = sizeof local;
  19.    if(::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
  20.    {
  21.         LOG_ERROR("sockets::getLocalAddr");
  22.    }
  23.    InetAddress localAddr(local);
  24.    // 根据连接成功的sockfd, 创建TcpConnection对象
  25.    TcpConnectionPtr conn(
  26.         new TcpConnection(
  27.             ioloop,
  28.             connName,
  29.             sockfd,
  30.             localAddr,
  31.             peerAddr
  32.     ));
  33.     connections_[connName] = conn;
  34.     // 以下回调都是用户设置给 TcpServer
  35.     // TcpServer -> Channel -> poller => notify channel 调用回调
  36.     conn->setConnectionCallback(connectionCallback_);
  37.     conn->setMessageCallback(messageCallback_);
  38.     conn->setWriteCompleteCallback(writeCompleteCallback_);
  39.    
  40.     conn->setCloseCallback(
  41.         std::bind(&TcpServer::removeConnection,
  42.         this, std::placeholders::_1)
  43.     );
  44.     // 执行此语句时是在mainLoop,将其入队到ioloop的任务队列,调用TcpConnection::connectionEstablish
  45.     ioloop->runInLoop(
  46.         std::bind(&TcpConnection::connectEstablished,
  47.         conn)
  48.     );
  49. }
复制代码
构造完TcpConnection并设置好回调函数后,会在MainLoop中实行TcpConnection::connectEstablished表示此TCP连接建立乐成。
移除连接

TcpServer为每个TcpConnction设置了关闭回调conn->setCloseCallback(),为其绑定的函数是TcpServer::removeConnection,也就意味着当socket关闭时,就会触发EPOLLEHUP事件,Channel会调用其关闭回调函数,这个关闭回调函数就是TcpServer::removeConnection。
此函数会在其他线程调用(即不是loop_绑定的线程),通过runInLoop函数进而一定会在MainLoop中实行TcpServer::removeConnectionInLoop。
  1. void TcpServer::removeConnection(TcpConnectionPtr const &conn)
  2. {
  3.     loop_->runInLoop(
  4.         std::bind(&TcpServer::removeConnectionInLoop, this, conn)
  5.     );
  6. }
复制代码
在MainLoop运行的移除TCP连接的操作
此函数会在MainLoop上运行,就是移除connections_中的映射关系。但此中的TcpConnection::connectDestroyed会在TcpConnection对应的SubLoop中运行。
  1. void TcpServer::removeConnectionInLoop(TcpConnectionPtr const &conn)
  2. {
  3.     LOG_INFO("TcpServer::removeConnection [%s] - connection %s",
  4.         name_.c_str(), conn->name().c_str());
  5.     connections_.erase(conn->name());
  6.     EventLoop* ioLoop = conn->getLoop();
  7.     ioLoop->queueInLoop(
  8.         std::bind(&TcpConnection::connectDestroyed, conn)
  9.     );
  10. }
复制代码
设置SubLoop数量

  1. void TcpServer::setThreadNum(int numThreads)
  2. {
  3.     threadPool_->setThreadNum(numThreads);
  4. }
复制代码
开始监听客户端连接

  1. void TcpServer::start()
  2. {
  3.     if(started_++ == 0)// 防止一个TCPServer对象被start多次
  4.     {
  5.         // 启动底层的线程池
  6.         threadPool_->start(threadInitCallback_);
  7.         loop_->runInLoop(std::bind(&Acceptor::listen, accpetor_.get()));
  8.     }
  9. }
复制代码
设置SubLoop数量

  1. void TcpServer::setThreadNum(int numThreads)
  2. {
  3.     threadPool_->setThreadNum(numThreads);
  4. }
复制代码
开始监听客户端连接

  1. void TcpServer::start()
  2. {
  3.     if(started_++ == 0)// 防止一个TCPServer对象被start多次
  4.     {
  5.         // 启动底层的线程池
  6.         threadPool_->start(threadInitCallback_);
  7.         loop_->runInLoop(std::bind(&Acceptor::listen, accpetor_.get()));
  8.     }
  9. }
复制代码
多线程模型



  • 主线程

    • 负责管理 Acceptor 和接受新连接。
    • 新连接被分配给线程池中的工作线程。

  • 工作线程

    • 每个线程运行一个独立的 EventLoop,处理分配到的 TcpConnection 的读写事件和回调逻辑。

这种计划实现了主线程的轻量化,同时利用线程池处理大量并发连接。
总结

TcpServer 是 Muduo 的核心组件之一,为用户提供了简单易用的接口来实现高效的 TCP 服务器。它将底层复杂的网络操作(如 socket、epoll 等)封装为事件驱动的编程模型,并支持多线程,从而利用户能够专注于实现业务逻辑。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

天空闲话

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表