IT评测·应用市场-qidao123.com技术社区

标题: muduo库源码分析:TcpConnection 类 [打印本页]

作者: 立聪堂德州十三局店    时间: 2025-3-6 07:10
标题: muduo库源码分析:TcpConnection 类
一.TcpConnection 成员及方法


源码:

TcpConnection.h

  1. // Copyright 2010, Shuo Chen.  All rights reserved.
  2. // http://code.google.com/p/muduo/
  3. //
  4. // Use of this source code is governed by a BSD-style license
  5. // that can be found in the License file.
  6. // Author: Shuo Chen (chenshuo at chenshuo dot com)
  7. //
  8. // This is a public header file, it must only include public header files.
  9. #ifndef MUDUO_NET_TCPCONNECTION_H
  10. #define MUDUO_NET_TCPCONNECTION_H
  11. #include "muduo/base/noncopyable.h"
  12. #include "muduo/base/StringPiece.h"
  13. #include "muduo/base/Types.h"
  14. #include "muduo/net/Callbacks.h"
  15. #include "muduo/net/Buffer.h"
  16. #include "muduo/net/InetAddress.h"
  17. #include <memory>
  18. #include <boost/any.hpp>
  19. // struct tcp_info is in <netinet/tcp.h>
  20. struct tcp_info;
  21. namespace muduo
  22. {
  23. namespace net
  24. {
  25. class Channel;
  26. class EventLoop;
  27. class Socket;
  28. ///
  29. /// TCP connection, for both client and server usage.
  30. ///
  31. /// This is an interface class, so don't expose too much details.
  32. class TcpConnection : noncopyable,
  33.                       public std::enable_shared_from_this<TcpConnection>
  34. {
  35. public:
  36.   /// Constructs a TcpConnection with a connected sockfd
  37.   ///
  38.   /// User should not create this object.
  39.   TcpConnection(EventLoop* loop,
  40.                 const string& name,
  41.                 int sockfd,
  42.                 const InetAddress& localAddr,
  43.                 const InetAddress& peerAddr);
  44.   ~TcpConnection();
  45.   EventLoop* getLoop() const { return loop_; }
  46.   const string& name() const { return name_; }
  47.   const InetAddress& localAddress() const { return localAddr_; }
  48.   const InetAddress& peerAddress() const { return peerAddr_; }
  49.   bool connected() const { return state_ == kConnected; }
  50.   bool disconnected() const { return state_ == kDisconnected; }
  51.   // return true if success.
  52.   bool getTcpInfo(struct tcp_info*) const;
  53.   string getTcpInfoString() const;
  54.   // void send(string&& message); // C++11
  55.   void send(const void* message, int len);
  56.   void send(const StringPiece& message);
  57.   // void send(Buffer&& message); // C++11
  58.   void send(Buffer* message);  // this one will swap data
  59.   void shutdown(); // NOT thread safe, no simultaneous calling
  60.   // void shutdownAndForceCloseAfter(double seconds); // NOT thread safe, no simultaneous calling
  61.   void forceClose();
  62.   void forceCloseWithDelay(double seconds);
  63.   void setTcpNoDelay(bool on);
  64.   // reading or not
  65.   void startRead();
  66.   void stopRead();
  67.   bool isReading() const { return reading_; }; // NOT thread safe, may race with start/stopReadInLoop
  68.   void setContext(const boost::any& context)
  69.   { context_ = context; }
  70.   const boost::any& getContext() const
  71.   { return context_; }
  72.   boost::any* getMutableContext()
  73.   { return &context_; }
  74.   void setConnectionCallback(const ConnectionCallback& cb)
  75.   { connectionCallback_ = cb; }
  76.   void setMessageCallback(const MessageCallback& cb)
  77.   { messageCallback_ = cb; }
  78.   void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  79.   { writeCompleteCallback_ = cb; }
  80.   void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
  81.   { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
  82.   /// Advanced interface
  83.   Buffer* inputBuffer()
  84.   { return &inputBuffer_; }
  85.   Buffer* outputBuffer()
  86.   { return &outputBuffer_; }
  87.   /// Internal use only.
  88.   void setCloseCallback(const CloseCallback& cb)
  89.   { closeCallback_ = cb; }
  90.   // called when TcpServer accepts a new connection
  91.   void connectEstablished();   // should be called only once
  92.   // called when TcpServer has removed me from its map
  93.   void connectDestroyed();  // should be called only once
  94. private:
  95.   enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
  96.   void handleRead(Timestamp receiveTime);
  97.   void handleWrite();
  98.   void handleClose();
  99.   void handleError();
  100.   // void sendInLoop(string&& message);
  101.   void sendInLoop(const StringPiece& message);
  102.   void sendInLoop(const void* message, size_t len);
  103.   void shutdownInLoop();
  104.   // void shutdownAndForceCloseInLoop(double seconds);
  105.   void forceCloseInLoop();
  106.   void setState(StateE s) { state_ = s; }
  107.   const char* stateToString() const;
  108.   void startReadInLoop();
  109.   void stopReadInLoop();
  110.   EventLoop* loop_;
  111.   const string name_;
  112.   StateE state_;  // FIXME: use atomic variable
  113.   bool reading_;
  114.   // we don't expose those classes to client.
  115.   std::unique_ptr<Socket> socket_;
  116.   std::unique_ptr<Channel> channel_;
  117.   const InetAddress localAddr_;
  118.   const InetAddress peerAddr_;
  119.   ConnectionCallback connectionCallback_;
  120.   MessageCallback messageCallback_;
  121.   WriteCompleteCallback writeCompleteCallback_;
  122.   HighWaterMarkCallback highWaterMarkCallback_;
  123.   CloseCallback closeCallback_;
  124.   size_t highWaterMark_;
  125.   Buffer inputBuffer_;
  126.   Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer.
  127.   boost::any context_;
  128.   // FIXME: creationTime_, lastReceiveTime_
  129.   //        bytesReceived_, bytesSent_
  130. };
  131. typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
  132. }  // namespace net
  133. }  // namespace muduo
  134. #endif  // MUDUO_NET_TCPCONNECTION_H
复制代码
TcpConnection.cc

  1. // Copyright 2010, Shuo Chen.  All rights reserved.
  2. // http://code.google.com/p/muduo/
  3. //
  4. // Use of this source code is governed by a BSD-style license
  5. // that can be found in the License file.
  6. // Author: Shuo Chen (chenshuo at chenshuo dot com)
  7. #include "muduo/net/TcpConnection.h"
  8. #include "muduo/base/Logging.h"
  9. #include "muduo/base/WeakCallback.h"
  10. #include "muduo/net/Channel.h"
  11. #include "muduo/net/EventLoop.h"
  12. #include "muduo/net/Socket.h"
  13. #include "muduo/net/SocketsOps.h"
  14. #include <errno.h>
  15. using namespace muduo;
  16. using namespace muduo::net;
  17. void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)
  18. {
  19.   LOG_TRACE << conn->localAddress().toIpPort() << " -> "
  20.             << conn->peerAddress().toIpPort() << " is "
  21.             << (conn->connected() ? "UP" : "DOWN");
  22.   // do not call conn->forceClose(), because some users want to register message callback only.
  23. }
  24. void muduo::net::defaultMessageCallback(const TcpConnectionPtr&,
  25.                                         Buffer* buf,
  26.                                         Timestamp)
  27. {
  28.   buf->retrieveAll();
  29. }
  30. TcpConnection::TcpConnection(EventLoop* loop,
  31.                              const string& nameArg,
  32.                              int sockfd,
  33.                              const InetAddress& localAddr,
  34.                              const InetAddress& peerAddr)
  35.   : loop_(CHECK_NOTNULL(loop)),
  36.     name_(nameArg),
  37.     state_(kConnecting),
  38.     reading_(true),
  39.     socket_(new Socket(sockfd)),
  40.     channel_(new Channel(loop, sockfd)),
  41.     localAddr_(localAddr),
  42.     peerAddr_(peerAddr),
  43.     highWaterMark_(64*1024*1024)
  44. {
  45.   channel_->setReadCallback(
  46.       std::bind(&TcpConnection::handleRead, this, _1));
  47.   channel_->setWriteCallback(
  48.       std::bind(&TcpConnection::handleWrite, this));
  49.   channel_->setCloseCallback(
  50.       std::bind(&TcpConnection::handleClose, this));
  51.   channel_->setErrorCallback(
  52.       std::bind(&TcpConnection::handleError, this));
  53.   LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
  54.             << " fd=" << sockfd;
  55.   socket_->setKeepAlive(true);
  56. }
  57. TcpConnection::~TcpConnection()
  58. {
  59.   LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
  60.             << " fd=" << channel_->fd()
  61.             << " state=" << stateToString();
  62.   assert(state_ == kDisconnected);
  63. }
  64. bool TcpConnection::getTcpInfo(struct tcp_info* tcpi) const
  65. {
  66.   return socket_->getTcpInfo(tcpi);
  67. }
  68. string TcpConnection::getTcpInfoString() const
  69. {
  70.   char buf[1024];
  71.   buf[0] = '\0';
  72.   socket_->getTcpInfoString(buf, sizeof buf);
  73.   return buf;
  74. }
  75. void TcpConnection::send(const void* data, int len)
  76. {
  77.   send(StringPiece(static_cast<const char*>(data), len));
  78. }
  79. void TcpConnection::send(const StringPiece& message)
  80. {
  81.   if (state_ == kConnected)
  82.   {
  83.     if (loop_->isInLoopThread())
  84.     {
  85.       sendInLoop(message);
  86.     }
  87.     else
  88.     {
  89.       void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
  90.       loop_->runInLoop(
  91.           std::bind(fp,
  92.                     this,     // FIXME
  93.                     message.as_string()));
  94.                     //std::forward<string>(message)));
  95.     }
  96.   }
  97. }
  98. // FIXME efficiency!!!
  99. void TcpConnection::send(Buffer* buf)
  100. {
  101.   if (state_ == kConnected)
  102.   {
  103.     if (loop_->isInLoopThread())
  104.     {
  105.       sendInLoop(buf->peek(), buf->readableBytes());
  106.       buf->retrieveAll();
  107.     }
  108.     else
  109.     {
  110.       void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
  111.       loop_->runInLoop(
  112.           std::bind(fp,
  113.                     this,     // FIXME
  114.                     buf->retrieveAllAsString()));
  115.                     //std::forward<string>(message)));
  116.     }
  117.   }
  118. }
  119. void TcpConnection::sendInLoop(const StringPiece& message)
  120. {
  121.   sendInLoop(message.data(), message.size());
  122. }
  123. void TcpConnection::sendInLoop(const void* data, size_t len)
  124. {
  125.   loop_->assertInLoopThread();
  126.   ssize_t nwrote = 0;
  127.   size_t remaining = len;
  128.   bool faultError = false;
  129.   if (state_ == kDisconnected)
  130.   {
  131.     LOG_WARN << "disconnected, give up writing";
  132.     return;
  133.   }
  134.   // if no thing in output queue, try writing directly
  135.   if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  136.   {
  137.     nwrote = sockets::write(channel_->fd(), data, len);
  138.     if (nwrote >= 0)
  139.     {
  140.       remaining = len - nwrote;
  141.       if (remaining == 0 && writeCompleteCallback_)
  142.       {
  143.         loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
  144.       }
  145.     }
  146.     else // nwrote < 0
  147.     {
  148.       nwrote = 0;
  149.       if (errno != EWOULDBLOCK)
  150.       {
  151.         LOG_SYSERR << "TcpConnection::sendInLoop";
  152.         if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
  153.         {
  154.           faultError = true;
  155.         }
  156.       }
  157.     }
  158.   }
  159.   assert(remaining <= len);
  160.   if (!faultError && remaining > 0)
  161.   {
  162.     size_t oldLen = outputBuffer_.readableBytes();
  163.     if (oldLen + remaining >= highWaterMark_
  164.         && oldLen < highWaterMark_
  165.         && highWaterMarkCallback_)
  166.     {
  167.       loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
  168.     }
  169.     outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
  170.     if (!channel_->isWriting())
  171.     {
  172.       channel_->enableWriting();
  173.     }
  174.   }
  175. }
  176. void TcpConnection::shutdown()
  177. {
  178.   // FIXME: use compare and swap
  179.   if (state_ == kConnected)
  180.   {
  181.     setState(kDisconnecting);
  182.     // FIXME: shared_from_this()?
  183.     loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
  184.   }
  185. }
  186. void TcpConnection::shutdownInLoop()
  187. {
  188.   loop_->assertInLoopThread();
  189.   if (!channel_->isWriting())
  190.   {
  191.     // we are not writing
  192.     socket_->shutdownWrite();
  193.   }
  194. }
  195. // void TcpConnection::shutdownAndForceCloseAfter(double seconds)
  196. // {
  197. //   // FIXME: use compare and swap
  198. //   if (state_ == kConnected)
  199. //   {
  200. //     setState(kDisconnecting);
  201. //     loop_->runInLoop(std::bind(&TcpConnection::shutdownAndForceCloseInLoop, this, seconds));
  202. //   }
  203. // }
  204. // void TcpConnection::shutdownAndForceCloseInLoop(double seconds)
  205. // {
  206. //   loop_->assertInLoopThread();
  207. //   if (!channel_->isWriting())
  208. //   {
  209. //     // we are not writing
  210. //     socket_->shutdownWrite();
  211. //   }
  212. //   loop_->runAfter(
  213. //       seconds,
  214. //       makeWeakCallback(shared_from_this(),
  215. //                        &TcpConnection::forceCloseInLoop));
  216. // }
  217. void TcpConnection::forceClose()
  218. {
  219.   // FIXME: use compare and swap
  220.   if (state_ == kConnected || state_ == kDisconnecting)
  221.   {
  222.     setState(kDisconnecting);
  223.     loop_->queueInLoop(std::bind(&TcpConnection::forceCloseInLoop, shared_from_this()));
  224.   }
  225. }
  226. void TcpConnection::forceCloseWithDelay(double seconds)
  227. {
  228.   if (state_ == kConnected || state_ == kDisconnecting)
  229.   {
  230.     setState(kDisconnecting);
  231.     loop_->runAfter(
  232.         seconds,
  233.         makeWeakCallback(shared_from_this(),
  234.                          &TcpConnection::forceClose));  // not forceCloseInLoop to avoid race condition
  235.   }
  236. }
  237. void TcpConnection::forceCloseInLoop()
  238. {
  239.   loop_->assertInLoopThread();
  240.   if (state_ == kConnected || state_ == kDisconnecting)
  241.   {
  242.     // as if we received 0 byte in handleRead();
  243.     handleClose();
  244.   }
  245. }
  246. const char* TcpConnection::stateToString() const
  247. {
  248.   switch (state_)
  249.   {
  250.     case kDisconnected:
  251.       return "kDisconnected";
  252.     case kConnecting:
  253.       return "kConnecting";
  254.     case kConnected:
  255.       return "kConnected";
  256.     case kDisconnecting:
  257.       return "kDisconnecting";
  258.     default:
  259.       return "unknown state";
  260.   }
  261. }
  262. void TcpConnection::setTcpNoDelay(bool on)
  263. {
  264.   socket_->setTcpNoDelay(on);
  265. }
  266. void TcpConnection::startRead()
  267. {
  268.   loop_->runInLoop(std::bind(&TcpConnection::startReadInLoop, this));
  269. }
  270. void TcpConnection::startReadInLoop()
  271. {
  272.   loop_->assertInLoopThread();
  273.   if (!reading_ || !channel_->isReading())
  274.   {
  275.     channel_->enableReading();
  276.     reading_ = true;
  277.   }
  278. }
  279. void TcpConnection::stopRead()
  280. {
  281.   loop_->runInLoop(std::bind(&TcpConnection::stopReadInLoop, this));
  282. }
  283. void TcpConnection::stopReadInLoop()
  284. {
  285.   loop_->assertInLoopThread();
  286.   if (reading_ || channel_->isReading())
  287.   {
  288.     channel_->disableReading();
  289.     reading_ = false;
  290.   }
  291. }
  292. void TcpConnection::connectEstablished()
  293. {
  294.   loop_->assertInLoopThread();
  295.   assert(state_ == kConnecting);
  296.   setState(kConnected);
  297.   channel_->tie(shared_from_this());
  298.   channel_->enableReading();
  299.   connectionCallback_(shared_from_this());
  300. }
  301. void TcpConnection::connectDestroyed()
  302. {
  303.   loop_->assertInLoopThread();
  304.   if (state_ == kConnected)
  305.   {
  306.     setState(kDisconnected);
  307.     channel_->disableAll();
  308.     connectionCallback_(shared_from_this());
  309.   }
  310.   channel_->remove();
  311. }
  312. void TcpConnection::handleRead(Timestamp receiveTime)
  313. {
  314.   loop_->assertInLoopThread();
  315.   int savedErrno = 0;
  316.   ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  317.   if (n > 0)
  318.   {
  319.     messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  320.   }
  321.   else if (n == 0)
  322.   {
  323.     handleClose();
  324.   }
  325.   else
  326.   {
  327.     errno = savedErrno;
  328.     LOG_SYSERR << "TcpConnection::handleRead";
  329.     handleError();
  330.   }
  331. }
  332. void TcpConnection::handleWrite()
  333. {
  334.   loop_->assertInLoopThread();
  335.   if (channel_->isWriting())
  336.   {
  337.     ssize_t n = sockets::write(channel_->fd(),
  338.                                outputBuffer_.peek(),
  339.                                outputBuffer_.readableBytes());
  340.     if (n > 0)
  341.     {
  342.       outputBuffer_.retrieve(n);
  343.       if (outputBuffer_.readableBytes() == 0)
  344.       {
  345.         channel_->disableWriting();
  346.         if (writeCompleteCallback_)
  347.         {
  348.           loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
  349.         }
  350.         if (state_ == kDisconnecting)
  351.         {
  352.           shutdownInLoop();
  353.         }
  354.       }
  355.     }
  356.     else
  357.     {
  358.       LOG_SYSERR << "TcpConnection::handleWrite";
  359.       // if (state_ == kDisconnecting)
  360.       // {
  361.       //   shutdownInLoop();
  362.       // }
  363.     }
  364.   }
  365.   else
  366.   {
  367.     LOG_TRACE << "Connection fd = " << channel_->fd()
  368.               << " is down, no more writing";
  369.   }
  370. }
  371. void TcpConnection::handleClose()
  372. {
  373.   loop_->assertInLoopThread();
  374.   LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
  375.   assert(state_ == kConnected || state_ == kDisconnecting);
  376.   // we don't close fd, leave it to dtor, so we can find leaks easily.
  377.   setState(kDisconnected);
  378.   channel_->disableAll();
  379.   TcpConnectionPtr guardThis(shared_from_this());
  380.   connectionCallback_(guardThis);
  381.   // must be the last line
  382.   closeCallback_(guardThis);
  383. }
  384. void TcpConnection::handleError()
  385. {
  386.   int err = sockets::getSocketError(channel_->fd());
  387.   LOG_ERROR << "TcpConnection::handleError [" << name_
  388.             << "] - SO_ERROR = " << err << " " << strerror_tl(err);
  389. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/) Powered by Discuz! X3.4