马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
一.TcpConnection 成员及方法
源码:
TcpConnection.h
- // Copyright 2010, Shuo Chen. All rights reserved.
- // http://code.google.com/p/muduo/
- //
- // Use of this source code is governed by a BSD-style license
- // that can be found in the License file.
- // Author: Shuo Chen (chenshuo at chenshuo dot com)
- //
- // This is a public header file, it must only include public header files.
- #ifndef MUDUO_NET_TCPCONNECTION_H
- #define MUDUO_NET_TCPCONNECTION_H
- #include "muduo/base/noncopyable.h"
- #include "muduo/base/StringPiece.h"
- #include "muduo/base/Types.h"
- #include "muduo/net/Callbacks.h"
- #include "muduo/net/Buffer.h"
- #include "muduo/net/InetAddress.h"
- #include <memory>
- #include <boost/any.hpp>
- // struct tcp_info is in <netinet/tcp.h>
- struct tcp_info;
- namespace muduo
- {
- namespace net
- {
- class Channel;
- class EventLoop;
- class Socket;
- ///
- /// TCP connection, for both client and server usage.
- ///
- /// This is an interface class, so don't expose too much details.
- class TcpConnection : noncopyable,
- public std::enable_shared_from_this<TcpConnection>
- {
- public:
- /// Constructs a TcpConnection with a connected sockfd
- ///
- /// User should not create this object.
- TcpConnection(EventLoop* loop,
- const string& name,
- int sockfd,
- const InetAddress& localAddr,
- const InetAddress& peerAddr);
- ~TcpConnection();
- // Plain accessors for the values handed to the constructor.
- EventLoop* getLoop() const { return loop_; }
- const string& name() const { return name_; }
- const InetAddress& localAddress() const { return localAddr_; }
- const InetAddress& peerAddress() const { return peerAddr_; }
- // State queries; they read state_ directly (see the FIXME on state_ below).
- bool connected() const { return state_ == kConnected; }
- bool disconnected() const { return state_ == kDisconnected; }
- // return true if success.
- bool getTcpInfo(struct tcp_info*) const;
- string getTcpInfoString() const;
- // void send(string&& message); // C++11
- void send(const void* message, int len);
- void send(const StringPiece& message);
- // void send(Buffer&& message); // C++11
- void send(Buffer* message); // this one will swap data
- void shutdown(); // NOT thread safe, no simultaneous calling
- // void shutdownAndForceCloseAfter(double seconds); // NOT thread safe, no simultaneous calling
- void forceClose();
- void forceCloseWithDelay(double seconds);
- void setTcpNoDelay(bool on);
- // reading or not
- void startRead();
- void stopRead();
- bool isReading() const { return reading_; } // NOT thread safe, may race with start/stopReadInLoop
- // Opaque per-connection user data, stored by value in a boost::any.
- void setContext(const boost::any& context)
- { context_ = context; }
- const boost::any& getContext() const
- { return context_; }
- boost::any* getMutableContext()
- { return &context_; }
- // Callback setters; each one simply stores a copy of the given callback.
- void setConnectionCallback(const ConnectionCallback& cb)
- { connectionCallback_ = cb; }
- void setMessageCallback(const MessageCallback& cb)
- { messageCallback_ = cb; }
- void setWriteCompleteCallback(const WriteCompleteCallback& cb)
- { writeCompleteCallback_ = cb; }
- // Stores both the callback and the threshold (in bytes of queued output) that triggers it.
- void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
- { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
- /// Advanced interface
- Buffer* inputBuffer()
- { return &inputBuffer_; }
- Buffer* outputBuffer()
- { return &outputBuffer_; }
- /// Internal use only.
- void setCloseCallback(const CloseCallback& cb)
- { closeCallback_ = cb; }
- // called when TcpServer accepts a new connection
- void connectEstablished(); // should be called only once
- // called when TcpServer has removed me from its map
- void connectDestroyed(); // should be called only once
- private:
- // Connection life-cycle states; the constructor starts in kConnecting.
- enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
- // Event handlers registered on channel_ by the constructor.
- void handleRead(Timestamp receiveTime);
- void handleWrite();
- void handleClose();
- void handleError();
- // Loop-thread counterparts of the public operations above.
- // void sendInLoop(string&& message);
- void sendInLoop(const StringPiece& message);
- void sendInLoop(const void* message, size_t len);
- void shutdownInLoop();
- // void shutdownAndForceCloseInLoop(double seconds);
- void forceCloseInLoop();
- void setState(StateE s) { state_ = s; }
- const char* stateToString() const;
- void startReadInLoop();
- void stopReadInLoop();
- EventLoop* loop_;
- const string name_;
- StateE state_; // FIXME: use atomic variable
- bool reading_;
- // we don't expose those classes to client.
- std::unique_ptr<Socket> socket_;
- std::unique_ptr<Channel> channel_;
- const InetAddress localAddr_;
- const InetAddress peerAddr_;
- ConnectionCallback connectionCallback_;
- MessageCallback messageCallback_;
- WriteCompleteCallback writeCompleteCallback_;
- HighWaterMarkCallback highWaterMarkCallback_;
- CloseCallback closeCallback_;
- size_t highWaterMark_;
- Buffer inputBuffer_;
- Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer.
- boost::any context_;
- // FIXME: creationTime_, lastReceiveTime_
- // bytesReceived_, bytesSent_
- };
- // Shared-ownership handle for TcpConnection objects.
- using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
- } // namespace net
- } // namespace muduo
- #endif // MUDUO_NET_TCPCONNECTION_H
复制代码 TcpConnection.cc
- // Copyright 2010, Shuo Chen. All rights reserved.
- // http://code.google.com/p/muduo/
- //
- // Use of this source code is governed by a BSD-style license
- // that can be found in the License file.
- // Author: Shuo Chen (chenshuo at chenshuo dot com)
- #include "muduo/net/TcpConnection.h"
- #include "muduo/base/Logging.h"
- #include "muduo/base/WeakCallback.h"
- #include "muduo/net/Channel.h"
- #include "muduo/net/EventLoop.h"
- #include "muduo/net/Socket.h"
- #include "muduo/net/SocketsOps.h"
- #include <errno.h>
- using namespace muduo;
- using namespace muduo::net;
- // Default connection callback: traces "<local> -> <peer> is UP|DOWN" and
- // otherwise leaves the connection alone.
- void muduo::net::defaultConnectionCallback(const TcpConnectionPtr& conn)
- {
- const char* status = conn->connected() ? "UP" : "DOWN";
- LOG_TRACE << conn->localAddress().toIpPort() << " -> "
- << conn->peerAddress().toIpPort() << " is "
- << status;
- // do not call conn->forceClose(), because some users want to register message callback only.
- }
- // Default message callback: ignores the connection and the timestamp and
- // discards everything currently held in the buffer.
- void muduo::net::defaultMessageCallback(const TcpConnectionPtr& /*conn*/,
- Buffer* buffer,
- Timestamp /*receiveTime*/)
- {
- buffer->retrieveAll();
- }
- // Wraps the connected descriptor sockfd in a Socket and a Channel, starts in
- // the kConnecting state, routes the channel's read/write/close/error events
- // to the matching handle*() members and turns keep-alive on for the socket.
- TcpConnection::TcpConnection(EventLoop* loop,
- const string& nameArg,
- int sockfd,
- const InetAddress& localAddr,
- const InetAddress& peerAddr)
- : loop_(CHECK_NOTNULL(loop)),
- name_(nameArg),
- state_(kConnecting),
- reading_(true),
- socket_(new Socket(sockfd)),
- channel_(new Channel(loop, sockfd)),
- localAddr_(localAddr),
- peerAddr_(peerAddr),
- highWaterMark_(64*1024*1024)
- {
- // The handlers capture the raw this pointer, exactly as the channel is owned by this object.
- channel_->setReadCallback(
- [this](Timestamp receiveTime) { handleRead(receiveTime); });
- channel_->setWriteCallback([this]() { handleWrite(); });
- channel_->setCloseCallback([this]() { handleClose(); });
- channel_->setErrorCallback([this]() { handleError(); });
- LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
- << " fd=" << sockfd;
- socket_->setKeepAlive(true);
- }
- // Logs the connection being destroyed and asserts that it has already
- // reached the kDisconnected state.
- TcpConnection::~TcpConnection()
- {
- const int fd = channel_->fd();
- const char* stateName = stateToString();
- LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this
- << " fd=" << fd
- << " state=" << stateName;
- assert(state_ == kDisconnected);
- }
- // Forwards to Socket::getTcpInfo() for this connection's socket and returns
- // its result (true on success, per the declaration in the header).
- bool TcpConnection::getTcpInfo(struct tcp_info* tcpi) const
- {
- const bool ok = socket_->getTcpInfo(tcpi);
- return ok;
- }
- // Asks the socket to format its TCP info into a 1 KiB stack buffer and
- // returns the text as a string; the buffer starts out as an empty C string.
- string TcpConnection::getTcpInfoString() const
- {
- char text[1024];
- text[0] = '\0';
- socket_->getTcpInfoString(text, sizeof(text));
- return string(text);
- }
- // Convenience overload: views the len bytes at data as a StringPiece and
- // delegates to send(const StringPiece&).
- void TcpConnection::send(const void* data, int len)
- {
- const char* bytes = static_cast<const char*>(data);
- const StringPiece piece(bytes, len);
- send(piece);
- }
- // Sends message if the connection is in the kConnected state; otherwise the
- // call is silently ignored.
- // On the loop thread the data is handed to sendInLoop() directly. From any
- // other thread the bytes are copied into a string and the send is queued on
- // the loop via runInLoop().
- void TcpConnection::send(const StringPiece& message)
- {
- if (state_ == kConnected)
- {
- if (loop_->isInLoopThread())
- {
- sendInLoop(message);
- }
- else
- {
- void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
- // Bind shared_from_this() rather than the raw this pointer so the
- // connection cannot be destroyed before the queued task runs
- // (forceClose() already queues its task the same way).
- loop_->runInLoop(
- std::bind(fp,
- shared_from_this(),
- message.as_string()));
- //std::forward<string>(message)));
- }
- }
- }
- // FIXME efficiency!!!
- // Sends all readable bytes of buf if the connection is in the kConnected
- // state; otherwise the call is ignored and buf is left untouched.
- // In both branches buf ends up drained: on the loop thread via retrieveAll()
- // after sendInLoop(), on other threads via retrieveAllAsString(), whose
- // result is carried by the queued task.
- void TcpConnection::send(Buffer* buf)
- {
- if (state_ == kConnected)
- {
- if (loop_->isInLoopThread())
- {
- sendInLoop(buf->peek(), buf->readableBytes());
- buf->retrieveAll();
- }
- else
- {
- void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
- // Bind shared_from_this() rather than the raw this pointer so the
- // connection cannot be destroyed before the queued task runs
- // (forceClose() already queues its task the same way).
- loop_->runInLoop(
- std::bind(fp,
- shared_from_this(),
- buf->retrieveAllAsString()));
- //std::forward<string>(message)));
- }
- }
- }
- // Unpacks the StringPiece into pointer + length and forwards to the
- // (const void*, size_t) overload.
- void TcpConnection::sendInLoop(const StringPiece& message)
- {
- const char* bytes = message.data();
- const size_t length = message.size();
- sendInLoop(bytes, length);
- }
- // Writes len bytes starting at data; must run on the loop thread.
- // If nothing is queued and the channel is not watching for writability, the
- // data is first written straight to the socket. Whatever could not be
- // written is appended to outputBuffer_ and write events are enabled so that
- // handleWrite() can finish the job. Nothing is queued after an EPIPE or
- // ECONNRESET write error, or when the connection is already kDisconnected.
- void TcpConnection::sendInLoop(const void* data, size_t len)
- {
- loop_->assertInLoopThread();
- ssize_t nwrote = 0;
- size_t remaining = len;
- bool faultError = false;
- if (state_ == kDisconnected)
- {
- LOG_WARN << "disconnected, give up writing";
- return;
- }
- // if no thing in output queue, try writing directly
- if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
- {
- nwrote = sockets::write(channel_->fd(), data, len);
- if (nwrote >= 0)
- {
- remaining = len - nwrote;
- if (remaining == 0 && writeCompleteCallback_)
- {
- // Everything went out in one call; notify asynchronously.
- loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
- }
- }
- else // nwrote < 0
- {
- // Snapshot errno immediately: the logging statement below runs other
- // library code before the EPIPE/ECONNRESET test, and that code may
- // overwrite errno.
- const int savedErrno = errno;
- nwrote = 0;
- if (savedErrno != EWOULDBLOCK)
- {
- LOG_SYSERR << "TcpConnection::sendInLoop";
- if (savedErrno == EPIPE || savedErrno == ECONNRESET) // FIXME: any others?
- {
- faultError = true;
- }
- }
- }
- }
- assert(remaining <= len);
- if (!faultError && remaining > 0)
- {
- size_t oldLen = outputBuffer_.readableBytes();
- // Fire the high-water-mark callback only when this append crosses the
- // threshold from below.
- if (oldLen + remaining >= highWaterMark_
- && oldLen < highWaterMark_
- && highWaterMarkCallback_)
- {
- loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
- }
- outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
- if (!channel_->isWriting())
- {
- channel_->enableWriting();
- }
- }
- }
- // Starts closing the write side of a kConnected connection: moves the state
- // to kDisconnecting and runs shutdownInLoop() on the loop thread.
- // Does nothing in any other state.
- void TcpConnection::shutdown()
- {
- // FIXME: use compare and swap
- if (state_ == kConnected)
- {
- setState(kDisconnecting);
- // Bind shared_from_this() rather than the raw this pointer so the
- // connection cannot be destroyed before the queued task runs
- // (forceClose() already queues its task the same way).
- loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, shared_from_this()));
- }
- }
- // Loop-thread half of shutdown(): shuts down the write side of the socket
- // unless the channel is still writing. In that case handleWrite() calls this
- // again once the output buffer is empty and the state is kDisconnecting.
- void TcpConnection::shutdownInLoop()
- {
- loop_->assertInLoopThread();
- if (channel_->isWriting())
- {
- // output still pending, leave the socket alone for now
- return;
- }
- socket_->shutdownWrite();
- }
- // void TcpConnection::shutdownAndForceCloseAfter(double seconds)
- // {
- // // FIXME: use compare and swap
- // if (state_ == kConnected)
- // {
- // setState(kDisconnecting);
- // loop_->runInLoop(std::bind(&TcpConnection::shutdownAndForceCloseInLoop, this, seconds));
- // }
- // }
- // void TcpConnection::shutdownAndForceCloseInLoop(double seconds)
- // {
- // loop_->assertInLoopThread();
- // if (!channel_->isWriting())
- // {
- // // we are not writing
- // socket_->shutdownWrite();
- // }
- // loop_->runAfter(
- // seconds,
- // makeWeakCallback(shared_from_this(),
- // &TcpConnection::forceCloseInLoop));
- // }
- // Requests an immediate close: for a kConnected or kDisconnecting connection
- // the state becomes kDisconnecting and forceCloseInLoop() is queued on the
- // loop, holding a shared_ptr to this connection. Other states are ignored.
- void TcpConnection::forceClose()
- {
- // FIXME: use compare and swap
- if (state_ != kConnected && state_ != kDisconnecting)
- {
- return;
- }
- setState(kDisconnecting);
- loop_->queueInLoop(std::bind(&TcpConnection::forceCloseInLoop, shared_from_this()));
- }
- // Like forceClose(), but the close is scheduled with loop_->runAfter(seconds, ...).
- // The timer callback is built with makeWeakCallback() from shared_from_this().
- // Only acts in the kConnected or kDisconnecting states.
- void TcpConnection::forceCloseWithDelay(double seconds)
- {
- if (state_ != kConnected && state_ != kDisconnecting)
- {
- return;
- }
- setState(kDisconnecting);
- // not forceCloseInLoop to avoid race condition
- loop_->runAfter(
- seconds,
- makeWeakCallback(shared_from_this(),
- &TcpConnection::forceClose));
- }
- // Loop-thread half of forceClose(): runs handleClose() if the connection is
- // still kConnected or kDisconnecting, and does nothing otherwise.
- void TcpConnection::forceCloseInLoop()
- {
- loop_->assertInLoopThread();
- const bool stillOpen = (state_ == kConnected || state_ == kDisconnecting);
- if (!stillOpen)
- {
- return;
- }
- // as if we received 0 byte in handleRead();
- handleClose();
- }
- // Returns the enumerator name of the current state as a string literal, or
- // "unknown state" if state_ holds none of the four StateE values.
- const char* TcpConnection::stateToString() const
- {
- const StateE current = state_;
- if (current == kDisconnected)
- {
- return "kDisconnected";
- }
- if (current == kConnecting)
- {
- return "kConnecting";
- }
- if (current == kConnected)
- {
- return "kConnected";
- }
- if (current == kDisconnecting)
- {
- return "kDisconnecting";
- }
- return "unknown state";
- }
- // Passes the flag straight through to Socket::setTcpNoDelay().
- void TcpConnection::setTcpNoDelay(bool on)
- {
- Socket& sock = *socket_;
- sock.setTcpNoDelay(on);
- }
- // Asks the loop to run startReadInLoop().
- // NOTE(review): the task captures the raw this pointer, so the caller has to
- // keep the connection alive until the loop has run it.
- void TcpConnection::startRead()
- {
- loop_->runInLoop([this]() { startReadInLoop(); });
- }
- // Loop-thread half of startRead(): enables read events on the channel and
- // sets reading_, unless both already say that reading is on.
- void TcpConnection::startReadInLoop()
- {
- loop_->assertInLoopThread();
- if (reading_ && channel_->isReading())
- {
- // already reading, nothing to do
- return;
- }
- channel_->enableReading();
- reading_ = true;
- }
- // Asks the loop to run stopReadInLoop().
- // NOTE(review): the task captures the raw this pointer, so the caller has to
- // keep the connection alive until the loop has run it.
- void TcpConnection::stopRead()
- {
- loop_->runInLoop([this]() { stopReadInLoop(); });
- }
- // Loop-thread half of stopRead(): disables read events on the channel and
- // clears reading_, unless both already say that reading is off.
- void TcpConnection::stopReadInLoop()
- {
- loop_->assertInLoopThread();
- if (!reading_ && !channel_->isReading())
- {
- // already stopped, nothing to do
- return;
- }
- channel_->disableReading();
- reading_ = false;
- }
- // Finishes bringing up a new connection; must run on the loop thread and
- // expects the state to still be kConnecting.
- // Sets the state to kConnected, hands shared_from_this() to channel_->tie(),
- // enables read events and then invokes connectionCallback_.
- void TcpConnection::connectEstablished()
- {
- loop_->assertInLoopThread();
- assert(state_ == kConnecting);
- setState(kConnected);
- channel_->tie(shared_from_this());
- channel_->enableReading();
- // Tell the user last, after the state and the channel are set up.
- connectionCallback_(shared_from_this());
- }
- // Final teardown step; must run on the loop thread.
- // If the connection is still kConnected it is marked kDisconnected, all
- // channel events are disabled and connectionCallback_ is invoked.
- // The channel is removed in every case.
- void TcpConnection::connectDestroyed()
- {
- loop_->assertInLoopThread();
- if (state_ == kConnected)
- {
- setState(kDisconnected);
- channel_->disableAll();
- connectionCallback_(shared_from_this());
- }
- // Runs whether or not the branch above was taken.
- channel_->remove();
- }
- // Channel read-event handler; runs on the loop thread.
- // Reads from the socket into inputBuffer_ and dispatches on the result:
- // n > 0 -> messageCallback_ with the buffer and receiveTime
- // n == 0 -> handleClose()
- // n < 0 -> restore errno from readFd(), log it, then handleError()
- void TcpConnection::handleRead(Timestamp receiveTime)
- {
- loop_->assertInLoopThread();
- int readErrno = 0;
- const ssize_t n = inputBuffer_.readFd(channel_->fd(), &readErrno);
- if (n > 0)
- {
- messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
- return;
- }
- if (n == 0)
- {
- handleClose();
- return;
- }
- // LOG_SYSERR is given the error reported by readFd() via errno.
- errno = readErrno;
- LOG_SYSERR << "TcpConnection::handleRead";
- handleError();
- }
- // Channel write-event handler; runs on the loop thread.
- // Writes as much of outputBuffer_ as the socket accepts. Once the buffer is
- // empty it stops watching for writability, queues writeCompleteCallback_ if
- // one is set, and completes a pending shutdown (state kDisconnecting).
- void TcpConnection::handleWrite()
- {
- loop_->assertInLoopThread();
- if (!channel_->isWriting())
- {
- LOG_TRACE << "Connection fd = " << channel_->fd()
- << " is down, no more writing";
- return;
- }
- const ssize_t written = sockets::write(channel_->fd(),
- outputBuffer_.peek(),
- outputBuffer_.readableBytes());
- if (written <= 0)
- {
- LOG_SYSERR << "TcpConnection::handleWrite";
- return;
- }
- outputBuffer_.retrieve(written);
- if (outputBuffer_.readableBytes() != 0)
- {
- // more data pending, keep waiting for the next write event
- return;
- }
- channel_->disableWriting();
- if (writeCompleteCallback_)
- {
- loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
- }
- if (state_ == kDisconnecting)
- {
- shutdownInLoop();
- }
- }
- // Close handler; runs on the loop thread and expects the state to be
- // kConnected or kDisconnecting.
- // Marks the connection kDisconnected, disables all channel events, then
- // invokes connectionCallback_ followed by closeCallback_.
- void TcpConnection::handleClose()
- {
- loop_->assertInLoopThread();
- LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
- assert(state_ == kConnected || state_ == kDisconnecting);
- // we don't close fd, leave it to dtor, so we can find leaks easily.
- setState(kDisconnected);
- channel_->disableAll();
- // Local shared_ptr to this object, passed to both callbacks below.
- TcpConnectionPtr guardThis(shared_from_this());
- connectionCallback_(guardThis);
- // must be the last line
- closeCallback_(guardThis);
- }
- void TcpConnection::handleError()
- {
- int err = sockets::getSocketError(channel_->fd());
- LOG_ERROR << "TcpConnection::handleError [" << name_
- << "] - SO_ERROR = " << err << " " << strerror_tl(err);
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。