立聪堂德州十三局店 发表于 2025-3-6 07:10:11

muduo库源码分析:TcpConnection 类

一.TcpConnection 成员及方法

https://i-blog.csdnimg.cn/direct/d81bf91cbe58496e89827ccdd3a6bd0f.png
源码:

TcpConnection.h

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)
//
// This is a public header file, it must only include public header files.

#ifndef MUDUO_NET_TCPCONNECTION_H
#define MUDUO_NET_TCPCONNECTION_H

#include "muduo/base/noncopyable.h"
#include "muduo/base/StringPiece.h"
#include "muduo/base/Types.h"
#include "muduo/net/Callbacks.h"
#include "muduo/net/Buffer.h"
#include "muduo/net/InetAddress.h"

#include <memory>

#include <boost/any.hpp>

// struct tcp_info is in <netinet/tcp.h>
struct tcp_info;

namespace muduo
{
namespace net
{

class Channel;
class EventLoop;
class Socket;

///
/// TCP connection, for both client and server usage.
///
/// This is an interface class, so don't expose too much details.
class TcpConnection : noncopyable,
                      public std::enable_shared_from_this<TcpConnection>
{
public:
/// Constructs a TcpConnection with a connected sockfd
///
/// User should not create this object.
/// The new object starts in state kConnecting; connectEstablished() moves it
/// to kConnected.
TcpConnection(EventLoop* loop,
                const string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
/// Asserts that the connection has already reached kDisconnected.
~TcpConnection();

// Accessors. connected()/disconnected() read state_ without any
// synchronization (see the FIXME on state_ below).
EventLoop* getLoop() const { return loop_; }
const string& name() const { return name_; }
const InetAddress& localAddress() const { return localAddr_; }
const InetAddress& peerAddress() const { return peerAddr_; }
bool connected() const { return state_ == kConnected; }
bool disconnected() const { return state_ == kDisconnected; }
// return true if success.
// Forwards to the owned Socket.
bool getTcpInfo(struct tcp_info*) const;
// Returns the Socket's textual TCP info as a string.
string getTcpInfoString() const;

// send(): all overloads do nothing unless the state is kConnected.
// When called on the loop thread the data is handed to sendInLoop() directly;
// otherwise the payload is copied into a string and sendInLoop() is scheduled
// on the loop with runInLoop().
// void send(string&& message); // C++11
void send(const void* message, int len);
void send(const StringPiece& message);
// void send(Buffer&& message); // C++11
void send(Buffer* message);// this one will swap data
// Moves kConnected -> kDisconnecting and schedules shutdownInLoop(), which
// shuts down the write side once the channel is no longer writing.
void shutdown(); // NOT thread safe, no simultaneous calling
// void shutdownAndForceCloseAfter(double seconds); // NOT thread safe, no simultaneous calling
// Queues forceCloseInLoop() when the state is kConnected or kDisconnecting.
void forceClose();
// Same precondition as forceClose(); runs forceClose() after `seconds` through
// a weak callback.
void forceCloseWithDelay(double seconds);
// Forwards to Socket::setTcpNoDelay().
void setTcpNoDelay(bool on);
// reading or not
// startRead()/stopRead() schedule the corresponding *InLoop() method on the
// loop, which enables/disables reading on the channel and updates reading_.
void startRead();
void stopRead();
bool isReading() const { return reading_; }; // NOT thread safe, may race with start/stopReadInLoop

// User-defined context attached to the connection; stored as a copy.
void setContext(const boost::any& context)
{ context_ = context; }

const boost::any& getContext() const
{ return context_; }

boost::any* getMutableContext()
{ return &context_; }

// Invoked from connectEstablished(), connectDestroyed() and handleClose().
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }

// Invoked from handleRead() whenever at least one byte was read.
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }

// Queued on the loop when a send has been written out completely, either
// directly in sendInLoop() or when handleWrite() drains the output buffer.
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }

// Queued from sendInLoop() when the bytes pending in the output buffer would
// reach highWaterMark while the buffer was below it before this send.
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
{ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }

/// Advanced interface
Buffer* inputBuffer()
{ return &inputBuffer_; }

Buffer* outputBuffer()
{ return &outputBuffer_; }

/// Internal use only.
/// Invoked as the last step of handleClose().
void setCloseCallback(const CloseCallback& cb)
{ closeCallback_ = cb; }

// called when TcpServer accepts a new connection
void connectEstablished();   // should be called only once
// called when TcpServer has removed me from its map
void connectDestroyed();// should be called only once

private:
// Connection life cycle: kConnecting (constructor) -> kConnected
// (connectEstablished) -> kDisconnecting (shutdown/forceClose) ->
// kDisconnected (handleClose/connectDestroyed).
enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
// Channel event handlers, registered in the constructor.
void handleRead(Timestamp receiveTime);
void handleWrite();
void handleClose();
void handleError();
// Loop-thread halves of the public operations above; each asserts that it
// runs on the loop thread.
// void sendInLoop(string&& message);
void sendInLoop(const StringPiece& message);
void sendInLoop(const void* message, size_t len);
void shutdownInLoop();
// void shutdownAndForceCloseInLoop(double seconds);
void forceCloseInLoop();
void setState(StateE s) { state_ = s; }
// Name of the current state, for logging.
const char* stateToString() const;
void startReadInLoop();
void stopReadInLoop();

EventLoop* loop_;
const string name_;
StateE state_;// FIXME: use atomic variable
bool reading_;  // initialized to true by the constructor
// we don't expose those classes to client.
std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
const InetAddress localAddr_;
const InetAddress peerAddr_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
HighWaterMarkCallback highWaterMarkCallback_;
CloseCallback closeCallback_;
size_t highWaterMark_;  // the constructor sets this to 64*1024*1024
Buffer inputBuffer_;
Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer.
boost::any context_;
// FIXME: creationTime_, lastReceiveTime_
//      bytesReceived_, bytesSent_
};

// Shared-ownership handle; this is the type handed to the connection,
// message, write-complete, high-water-mark and close callbacks.
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;

}// namespace net
}// namespace muduo

#endif// MUDUO_NET_TCPCONNECTION_H
TcpConnection.cc

// Copyright 2010, Shuo Chen.  All rights reserved.
// http://code.google.com/p/muduo/
//
// Use of this source code is governed by a BSD-style license
// that can be found in the License file.

// Author: Shuo Chen (chenshuo at chenshuo dot com)

#include "muduo/net/TcpConnection.h"

#include "muduo/base/Logging.h"
#include "muduo/base/WeakCallback.h"
#include "muduo/net/Channel.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/Socket.h"
#include "muduo/net/SocketsOps.h"

#include <errno.h>

using namespace muduo;
using namespace muduo::net;

// Default connection callback: logs, at TRACE level, the local and peer
// addresses of `conn` and whether it is currently connected ("UP") or not
// ("DOWN"). It takes no other action.
void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)
{
LOG_TRACE << conn->localAddress().toIpPort() << " -> "
            << conn->peerAddress().toIpPort() << " is "
            << (conn->connected() ? "UP" : "DOWN");
// do not call conn->forceClose(), because some users want to register message callback only.
}

// Default message callback: ignores the connection and the receive time and
// calls retrieveAll() on the buffer (presumably discarding all readable
// bytes - confirm against Buffer).
void muduo::net::defaultMessageCallback(const TcpConnectionPtr&,
                                        Buffer* buf,
                                        Timestamp)
{
buf->retrieveAll();
}

TcpConnection::TcpConnection(EventLoop* loop,
                           const string& nameArg,
                           int sockfd,
                           const InetAddress& localAddr,
                           const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),
    reading_(true),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr),
    highWaterMark_(64*1024*1024)
{
channel_->setReadCallback(
      std::bind(&TcpConnection::handleRead, this, _1));
channel_->setWriteCallback(
      std::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(
      std::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(
      std::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" <<name_ << "] at " << this
            << " fd=" << sockfd;
socket_->setKeepAlive(true);
}

// Logs the connection name, object address, fd and state at DEBUG level and
// asserts that the connection already reached kDisconnected. Nothing is
// released explicitly here; socket_ and channel_ are unique_ptr members and
// are destroyed after this body runs.
TcpConnection::~TcpConnection()
{
LOG_DEBUG << "TcpConnection::dtor[" <<name_ << "] at " << this
            << " fd=" << channel_->fd()
            << " state=" << stateToString();
assert(state_ == kDisconnected);
}

// Forwards to Socket::getTcpInfo() with the caller's tcp_info and returns
// its result (true on success, per the declaration's comment).
bool TcpConnection::getTcpInfo(struct tcp_info* tcpi) const
{
return socket_->getTcpInfo(tcpi);
}

// Returns the socket's TCP info rendered as text.
//
// Socket::getTcpInfoString() formats into a caller-supplied character buffer,
// so a stack buffer is provided and pre-terminated: if the socket writes
// nothing, the result is an empty string. The returned string is a copy of
// the NUL-terminated buffer contents.
string TcpConnection::getTcpInfoString() const
{
  char buf[1024];
  buf[0] = '\0';
  socket_->getTcpInfoString(buf, sizeof buf);
  return buf;
}

// Sends `len` bytes starting at `data` by wrapping them in a StringPiece and
// delegating to send(const StringPiece&).
void TcpConnection::send(const void* data, int len)
{
send(StringPiece(static_cast<const char*>(data), len));
}

void TcpConnection::send(const StringPiece& message)
{
if (state_ == kConnected)
{
    if (loop_->isInLoopThread())
    {
      sendInLoop(message);
    }
    else
    {
      void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
      loop_->runInLoop(
          std::bind(fp,
                  this,   // FIXME
                  message.as_string()));
                  //std::forward<string>(message)));
    }
}
}

// FIXME efficiency!!!
void TcpConnection::send(Buffer* buf)
{
if (state_ == kConnected)
{
    if (loop_->isInLoopThread())
    {
      sendInLoop(buf->peek(), buf->readableBytes());
      buf->retrieveAll();
    }
    else
    {
      void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
      loop_->runInLoop(
          std::bind(fp,
                  this,   // FIXME
                  buf->retrieveAllAsString()));
                  //std::forward<string>(message)));
    }
}
}

// Adapter used by the cross-thread send() paths: unpacks the StringPiece and
// delegates to sendInLoop(const void*, size_t).
void TcpConnection::sendInLoop(const StringPiece& message)
{
sendInLoop(message.data(), message.size());
}

// Writes `len` bytes from `data` on the loop thread.
//
// If nothing is pending (channel not writing and output buffer empty) the
// data is written to the socket immediately. Whatever could not be written
// is appended to outputBuffer_ and write events are enabled on the channel so
// that handleWrite() can send the rest. Gives up with a warning if the
// connection is already kDisconnected.
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;       // bytes written directly to the socket by this call
size_t remaining = len;   // bytes that still have to be buffered
bool faultError = false;  // set on EPIPE/ECONNRESET: do not buffer the rest
if (state_ == kDisconnected)
{
    LOG_WARN << "disconnected, give up writing";
    return;
}
// if no thing in output queue, try writing directly
// (when data is already queued, everything is appended behind it below)
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
      remaining = len - nwrote;
      // Everything went out in one call: notify asynchronously via the loop.
      if (remaining == 0 && writeCompleteCallback_)
      {
      loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
    else // nwrote < 0
    {
      // Nothing was written; reset so the offset used below is 0.
      nwrote = 0;
      // EWOULDBLOCK is not logged: the data is simply buffered below.
      if (errno != EWOULDBLOCK)
      {
      LOG_SYSERR << "TcpConnection::sendInLoop";
      if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
      {
          faultError = true;
      }
      }
    }
}

assert(remaining <= len);
if (!faultError && remaining > 0)
{
    size_t oldLen = outputBuffer_.readableBytes();
    // Fire the high-water-mark callback only when this append makes the
    // pending size reach the mark from below.
    if (oldLen + remaining >= highWaterMark_
      && oldLen < highWaterMark_
      && highWaterMarkCallback_)
    {
      loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    // Buffer the unwritten tail, skipping the nwrote bytes already sent.
    outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
    if (!channel_->isWriting())
    {
      channel_->enableWriting();
    }
}
}

void TcpConnection::shutdown()
{
// FIXME: use compare and swap
if (state_ == kConnected)
{
    setState(kDisconnecting);
    // FIXME: shared_from_this()?
    loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
}
}

void TcpConnection::shutdownInLoop()
{
loop_->assertInLoopThread();
if (!channel_->isWriting())
{
    // we are not writing
    socket_->shutdownWrite();
}
}

// void TcpConnection::shutdownAndForceCloseAfter(double seconds)
// {
//   // FIXME: use compare and swap
//   if (state_ == kConnected)
//   {
//   setState(kDisconnecting);
//   loop_->runInLoop(std::bind(&TcpConnection::shutdownAndForceCloseInLoop, this, seconds));
//   }
// }

// void TcpConnection::shutdownAndForceCloseInLoop(double seconds)
// {
//   loop_->assertInLoopThread();
//   if (!channel_->isWriting())
//   {
//   // we are not writing
//   socket_->shutdownWrite();
//   }
//   loop_->runAfter(
//       seconds,
//       makeWeakCallback(shared_from_this(),
//                        &TcpConnection::forceCloseInLoop));
// }

void TcpConnection::forceClose()
{
// FIXME: use compare and swap
if (state_ == kConnected || state_ == kDisconnecting)
{
    setState(kDisconnecting);
    loop_->queueInLoop(std::bind(&TcpConnection::forceCloseInLoop, shared_from_this()));
}
}

void TcpConnection::forceCloseWithDelay(double seconds)
{
if (state_ == kConnected || state_ == kDisconnecting)
{
    setState(kDisconnecting);
    loop_->runAfter(
      seconds,
      makeWeakCallback(shared_from_this(),
                         &TcpConnection::forceClose));// not forceCloseInLoop to avoid race condition
}
}

void TcpConnection::forceCloseInLoop()
{
loop_->assertInLoopThread();
if (state_ == kConnected || state_ == kDisconnecting)
{
    // as if we received 0 byte in handleRead();
    handleClose();
}
}

const char* TcpConnection::stateToString() const
{
switch (state_)
{
    case kDisconnected:
      return "kDisconnected";
    case kConnecting:
      return "kConnecting";
    case kConnected:
      return "kConnected";
    case kDisconnecting:
      return "kDisconnecting";
    default:
      return "unknown state";
}
}

// Forwards `on` to Socket::setTcpNoDelay() on the owned socket.
void TcpConnection::setTcpNoDelay(bool on)
{
socket_->setTcpNoDelay(on);
}

// Requests that reading be enabled: runs startReadInLoop() on the loop.
// Note that the task binds the raw `this` pointer, not a shared_ptr.
void TcpConnection::startRead()
{
loop_->runInLoop(std::bind(&TcpConnection::startReadInLoop, this));
}

// Loop-thread half of startRead(): enables read events on the channel and
// sets reading_, unless both already indicate that reading is on.
void TcpConnection::startReadInLoop()
{
  loop_->assertInLoopThread();
  const bool alreadyReading = reading_ && channel_->isReading();
  if (alreadyReading)
  {
    return;
  }
  channel_->enableReading();
  reading_ = true;
}

// Requests that reading be disabled: runs stopReadInLoop() on the loop.
// Note that the task binds the raw `this` pointer, not a shared_ptr.
void TcpConnection::stopRead()
{
loop_->runInLoop(std::bind(&TcpConnection::stopReadInLoop, this));
}

// Loop-thread half of stopRead(): disables read events on the channel and
// clears reading_, unless both already indicate that reading is off.
void TcpConnection::stopReadInLoop()
{
  loop_->assertInLoopThread();
  const bool alreadyStopped = !reading_ && !channel_->isReading();
  if (alreadyStopped)
  {
    return;
  }
  channel_->disableReading();
  reading_ = false;
}

// Completes connection setup on the loop thread. Requires state kConnecting.
// Moves to kConnected, ties the channel to a shared_ptr of this object,
// enables read events and finally invokes the connection callback.
void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
// Tie before enabling events (presumably so the channel can keep this object
// alive while it dispatches an event - confirm against Channel::tie).
channel_->tie(shared_from_this());
channel_->enableReading();

connectionCallback_(shared_from_this());
}

// Final teardown on the loop thread. If the connection is still kConnected
// (handleClose() has not run, since that sets kDisconnected), it is marked
// kDisconnected, all channel events are disabled and the connection callback
// is invoked. In every case the channel is then removed.
void TcpConnection::connectDestroyed()
{
loop_->assertInLoopThread();
if (state_ == kConnected)
{
    setState(kDisconnected);
    channel_->disableAll();

    connectionCallback_(shared_from_this());
}
channel_->remove();
}

// Read-event handler. Reads from the socket into inputBuffer_ and dispatches
// on the result: a negative count is logged (with errno restored from the
// read) and passed to handleError(); zero is handled as a close; a positive
// count invokes the message callback with the input buffer and
// `receiveTime`.
void TcpConnection::handleRead(Timestamp receiveTime)
{
  loop_->assertInLoopThread();
  int readErrno = 0;
  const ssize_t bytesRead = inputBuffer_.readFd(channel_->fd(), &readErrno);

  if (bytesRead < 0)
  {
    // LOG_SYSERR reports errno, so put back the value captured by readFd().
    errno = readErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
    return;
  }

  if (bytesRead == 0)
  {
    handleClose();
    return;
  }

  messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}

// Write-event handler. While the channel is watching for writability, tries
// to write the pending contents of outputBuffer_ to the socket. Once the
// buffer is empty it stops watching write events, queues the write-complete
// callback (if set) and, when a shutdown is pending (kDisconnecting),
// finishes it via shutdownInLoop().
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
    ssize_t n = sockets::write(channel_->fd(),
                               outputBuffer_.peek(),
                               outputBuffer_.readableBytes());
    if (n > 0)
    {
      // Drop the bytes that were actually written; the rest stays queued.
      outputBuffer_.retrieve(n);
      if (outputBuffer_.readableBytes() == 0)
      {
      // Nothing left to send: stop watching for writability.
      channel_->disableWriting();
      if (writeCompleteCallback_)
      {
          loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
      }
      // shutdown() was requested while data was still pending.
      if (state_ == kDisconnecting)
      {
          shutdownInLoop();
      }
      }
    }
    else
    {
      // n <= 0: only logged; the buffer and the channel are left unchanged.
      LOG_SYSERR << "TcpConnection::handleWrite";
      // if (state_ == kDisconnecting)
      // {
      //   shutdownInLoop();
      // }
    }
}
else
{
    LOG_TRACE << "Connection fd = " << channel_->fd()
            << " is down, no more writing";
}
}

// Close handler; also called from handleRead() on a zero-byte read and from
// forceCloseInLoop(). Requires state kConnected or kDisconnecting. Marks the
// connection kDisconnected, disables all channel events, then invokes the
// connection callback followed by the close callback.
void TcpConnection::handleClose()
{
loop_->assertInLoopThread();
LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
assert(state_ == kConnected || state_ == kDisconnecting);
// we don't close fd, leave it to dtor, so we can find leaks easily.
setState(kDisconnected);
channel_->disableAll();

// Local shared_ptr holds a reference to this object for the duration of both
// callbacks.
TcpConnectionPtr guardThis(shared_from_this());
connectionCallback_(guardThis);
// must be the last line
closeCallback_(guardThis);
}

// Error handler: fetches the socket's pending error through
// sockets::getSocketError() and logs it with its textual description at
// ERROR level. It does not change the state or close the connection.
void TcpConnection::handleError()
{
int err = sockets::getSocketError(channel_->fd());
LOG_ERROR << "TcpConnection::handleError [" << name_
            << "] - SO_ERROR = " << err << " " << strerror_tl(err);
}



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: muduo库源码分析:TcpConnection 类