ToB企服应用市场:ToB评测及商务社交产业平台

标题: C++网络编程之利用Boost库搭建简单的异步服务器 [打印本页]

作者: 千千梦丶琪    时间: 2024-8-11 11:15
标题: C++网络编程之利用Boost库搭建简单的异步服务器
C++网络编程之利用Boost库搭建简单的异步服务器

技能概述


技能详述

配置

由于主要在Windows下编程训练,所以只先容Windows下如和配置Boost库。
下载库


项目中配置Boost库


注意,自建属性页目次要和当前项目的版本同等,如当前是Debug的x64程序,自建属性页就要在Debug | x64目次下。

末了的配置过程和配置项目的属性页一样。
利用Boost库

俗话说得好,饭要一口一口吃,所以先从Boost库中网络编程的比较主要和重要的开始利用。演示程序就是一个回显客户端消息的服务器。架构也很简单。下面我将自顶向下的先容怎样完成其编写及其细节。在此之前,必须要注意到的是,异步操作的执行时机总是不确定,所以代码中会大量地用到shared_ptr等智能指针来主动托管一个对象的生命周期。
基本架构


main

  1. #include "Server.h"
  2. int main(void) {
  3.         try {
  4.         //创建一个boost库的IO服务,这是boost使用库进行各种IO操作的核心,io_context的旧版本是io_service
  5.                 boost::asio::io_context ioc;
  6.         //实例化化Server类
  7.                 Server server(ioc, 10086);
  8.         //启动IO服务
  9.                 ioc.run();
  10.         }
  11.         catch (std::exception& e) {
  12.                 std::cerr << "Exception: " << e.what() << std::endl;
  13.         }
  14.         return 0;
  15. }
复制代码
Server

头文件

  1. #pragma once
  2. #include <boost/asio.hpp>
  3. #include <iostream>
  4. #include "Session.h"
  5. #include <memory>
  6. #include <map>
  7. #include "Session.h"
  8. class Session;
  9. class Server
  10. {
  11. public:
  12.     //构造函数,引用一个IO服务,同时指定监听的端口
  13.         Server(boost::asio::io_context& ioc, unsigned short port);
  14.     //按照Session的uuid删除uuid
  15.         void ClearSession(const std::string& uuid) {
  16.                 _sessions.erase(uuid);
  17.         }
  18. private:
  19.     //开始一个异步的监听socket连接
  20.         void start_accept();
  21.     //处理收到的socket连接
  22.         void handle_accept(std::shared_ptr<Session> new_session, const boost::system::error_code& ec);
  23.        
  24.     //io_context不支持拷贝构造,使用引用方式
  25.         boost::asio::io_context& _ioc;
  26.     //用于监听连接
  27.         boost::asio::ip::tcp::acceptor _acceptor;
  28.         //通过智能指针延长Session对象的生命周期
  29.         std::map<std::string, std::shared_ptr<Session>> _sessions;
  30. };
复制代码
定义代码

  1. #include "Server.h"
  2. //注意这里的_acceptor的构造方式,接受一个io_context,以及一个端点。这个端点通过一个默认可用的ip和指定端口进行构造。
  3. Server::Server(boost::asio::io_context& ioc, unsigned short port): _ioc(ioc),
  4.         _acceptor(ioc, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) {
  5.         std::cout << "Server starts on success, using port: " << port << std::endl;
  6.     //启动监听
  7.     //这么说不是一个严谨的说法,稍后将看到为什么
  8.         start_accept();
  9. }
  10. void Server::start_accept() {
  11.         //创建一个Session,注意到这里的构造又用到了io_context
  12.         auto new_session = std::make_shared<Session>(_ioc, this);
  13.     //这里_acceptor执行异步的accept函数
  14.     //这里做的事情不是accept,而是往io_context中注册一个accept事件,以及这个事件的处理函数
  15.         _acceptor.async_accept(
  16.                 new_session->Socket(), //传入Session的socket,事件发生时,_acceptor会初始化成连接进来的socket
  17.                 std::bind( //修饰处理收到连接的方法
  18.                         &Server::handle_accept,
  19.                         this,
  20.                         new_session,
  21.                         std::placeholders::_1
  22.                 )
  23.         );
  24. }
  25. void Server::handle_accept(std::shared_ptr<Session> new_session, const boost::system::error_code& ec) {
  26.         if (ec) {
  27.                 std::cout << "Error occured in accepting. Errmessage: " << ec.message() << std::endl;
  28.                 //delete new_session;
  29.         }
  30.         else { //没有错误
  31.         //启动Session
  32.                 new_session->Start();
  33.                 _sessions.insert(std::make_pair(new_session->GetUuid(), new_session));
  34.         }
  35.        
  36.     //这里无论成功还是失败,都要重新注册一个监听事件给io_context
  37.     //以便能够继续监听连接
  38.         start_accept();
  39. }
复制代码
源程序

Session

在处置惩罚读写数据时,我们不知道客户端会发多少数据来,同时还要处置惩罚数据粘包的题目。所以服务器和服务端之间的通信采用tlv(type length value)的协议举行通信。由于是回显服务器,type可用省略,在处置惩罚读到的数据时,要先解析头部,再读取头部看客户端发来多少数据。
头文件

  1. #pragma once
  2. #include <iostream>
  3. #include <boost/asio.hpp>
  4. #include <boost/uuid/uuid_generators.hpp>
  5. #include <boost/uuid/uuid_io.hpp>
  6. #include "Server.h"
  7. #include <mutex>
  8. #include <queue>
  9. #include "MsgNode.h"
  10. #include <iomanip>
  11. class Server;
  12. class MsgNode;
  13. //注意到这里继承了std::enable_shared_from_this<Session>
  14. //这样可以使得类内部能够使用shared_from_this()统一获取this指针的shared_ptr,shared_ptr的唯一性,防止不同的shared_ptr托管同一个this指针的资源
  15. class Session: public std::enable_shared_from_this<Session>
  16. {
  17. public:
  18.     //构造函数
  19.         Session(boost::asio::io_context& ioc, Server* server);
  20.         boost::asio::ip::tcp::socket& Socket();
  21.     //启动读写
  22.         void Start();
  23.     //发送数据
  24.         void Send(char* buf, std::size_t len);
  25.     //返回uuid
  26.         const std::string& GetUuid() {
  27.                 return _uuid;
  28.         }
  29.         ~Session() {
  30.                 std::cout << "delete session: uuid=" << _uuid << std::endl;
  31.         }
  32. private:
  33.     //处理读事件
  34.         void HandleRead(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<Session> _self_ptr);
  35.     //处理写事件
  36.         void HandleWrite(const boost::system::error_code& ec, std::shared_ptr<Session> _self_ptr);
  37.         boost::asio::ip::tcp::socket _socket;
  38.         enum {max_length = 1024};
  39.     //接受数据的缓冲区
  40.         char _data[max_length] = {0};
  41.     //发送队列
  42.         std::queue<std::shared_ptr<MsgNode>> _send_que;
  43.     //队列的锁
  44.         std::mutex _send_lock;
  45.     //用于标识连接
  46.         std::string _uuid;
  47.     //引用server,以便能够在读写出错时,清除自身的连接
  48.         Server* _server;
  49.     //接受消息的节点
  50.         std::shared_ptr<MsgNode> _recv_msg_node;
  51.     //标识消息头部是否解析完成
  52.         bool _b_head_parse;
  53.     //头部消息节点
  54.         std::shared_ptr<MsgNode> _recv_head_node;
  55. };
复制代码
定义代码

  1. #include "Session.h"
  2. Session::Session(boost::asio::io_context& ioc, Server* server) : _socket(ioc),
  3.         _server(server), _b_head_parse(false) {
  4.     //为Session生成uuid   
  5.         boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
  6.         _uuid = boost::uuids::to_string(a_uuid);
  7.     //初始化消息头,通常头的长度是一个short类型的长度
  8.         _recv_head_node = std::make_shared<MsgNode>(HEAD_LENGTH);
  9. }
  10. //返回使用的socket
  11. boost::asio::ip::tcp::socket& Session::Socket() {
  12.         return _socket;
  13. }
  14. void Session::Start() {
  15.         //对象创建时已初始化_data
  16.     //往io_context注册读事件,区别于另一个async_read函数,这里是能读就返回
  17.         _socket.async_read_some(
  18.                 boost::asio::buffer( //构造一个buffer
  19.                         _data,
  20.                         max_length
  21.                 ),
  22.                 std::bind( //修饰处理
  23.                         &Session::HandleRead,
  24.                         this,
  25.                         std::placeholders::_1,
  26.                         std::placeholders::_2,
  27.                         //std::make_shared<Session>(this)导致引用计数不同步
  28.                         shared_from_this()
  29.                 )
  30.         );
  31. }
  32. void Session::Send(char* buf, std::size_t len) {
  33.     bool pending = false;
  34.         std::lock_guard<std::mutex> guard(_send_lock);
  35.     //如果在为加入消息节点之前,队列不为空,说明这个队列的消息正在发送(写完一个消息后,写事件的处理函数还会继续注册写事件,直到队列为空,所以,如果此时有消息,就说明后续还会继续回调写事件,不再需要在这里注册写事件)
  36.         if (_send_que.size() > 0) {
  37.                 pending = true;
  38.         }
  39.     //往队列投递消息
  40.         _send_que.emplace(std::make_shared<MsgNode>(buf, len));
  41.         if (pending) {
  42.                 return;
  43.         }
  44.     //注册写事件
  45.         boost::asio::async_write(
  46.                 _socket,
  47.                 boost::asio::buffer(
  48.                         buf,
  49.                         len
  50.                 ),
  51.                 std::bind(
  52.                         &Session::HandleWrite,
  53.                         this,
  54.                         std::placeholders::_1,
  55.                         shared_from_this()
  56.                 )
  57.         );
  58. }
  59. //由于这里使用async_read_some,一读到数据就返回,所以处理的过程比较繁琐。还可以可以使用async_read,读取指定字节在返回,其内部实现依然是异步的,和async_read_some一样高效。
  60. void Session::HandleRead(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<Session> _self_ptr) {
  61.         if (ec) {
  62.                 std::cout << "Error occured in reading. ErrMessage: " << ec.message() << std::endl;
  63.        
  64.                 _server->ClearSession(_uuid);
  65.         }
  66.         else {
  67.                 int copy_len = 0;
  68.                 while (bytes_transferred > 0) {
  69.                         if (!_b_head_parse) {
  70.                                 //收到的数据不足头部大小
  71.                                 if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) {
  72.                                         memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
  73.                                         _recv_head_node->_cur_len += bytes_transferred;
  74.                                         ::memset(_data, 0, max_length);
  75.                                         _socket.async_read_some(boost::asio::buffer(_data, max_length
  76.                                         ),
  77.                                                 std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr));
  78.                                         return;
  79.                                 }
  80.                                 //收到的数据比头部多
  81.                                 //头部剩余未复制的长度
  82.                                 int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len;
  83.                                 memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, head_remain);
  84.                                 //更新已处理的data长度和剩余未处理的长度
  85.                                 copy_len += head_remain;
  86.                                 bytes_transferred -= head_remain;
  87.                                 //获取头部数据
  88.                                 int data_len = 0;
  89.                                 memcpy(&data_len, _recv_head_node->_msg, HEAD_LENGTH);
  90.                                 std::cout << "data_len is " << data_len << std::endl;
  91.                                 //头部长度非法
  92.                                 if (data_len > max_length) {
  93.                                         std::cout << "invalid data length is " << data_len << std::endl;
  94.                                         _server->ClearSession(_uuid);
  95.                                         return;
  96.                                 }
  97.                                 _recv_msg_node = std::make_shared<MsgNode>(data_len);
  98.                                 //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
  99.                                 if (bytes_transferred < data_len) {
  100.                                         memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
  101.                                         _recv_msg_node->_cur_len += bytes_transferred;
  102.                                         ::memset(_data, 0, max_length);
  103.                                         _socket.async_read_some(boost::asio::buffer(_data, max_length),
  104.                                                 std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr));
  105.                                         //头部处理完成
  106.                                         _b_head_parse = true;
  107.                                         return;
  108.                                 }
  109.                                 memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, data_len);
  110.                                 _recv_msg_node->_cur_len += data_len;
  111.                                 copy_len += data_len;
  112.                                 bytes_transferred -= data_len;
  113.                                 _recv_msg_node->_msg[_recv_msg_node->_tot_len] = '\0';
  114.                                 std::cout << "receive data is " << _recv_msg_node->_msg << std::endl;
  115.                                 //此处可以调用Send发送测试
  116.                                 Send(_recv_msg_node->_msg, _recv_msg_node->_tot_len);
  117.                                 //继续轮询剩余未处理数据
  118.                                 _b_head_parse = false;
  119.                                 _recv_head_node->Clear();
  120.                                 if (bytes_transferred <= 0) {
  121.                                         ::memset(_data, 0, max_length);
  122.                                         _socket.async_read_some(boost::asio::buffer(_data, max_length),
  123.                                                 std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr));
  124.                                         return;
  125.                                 }
  126.                                 continue;
  127.                         }
  128.                         //已经处理完头部,处理上次未接受完的消息数据
  129.                         //接收的数据仍不足剩余未处理的
  130.                         int remain_msg = _recv_msg_node->_tot_len - _recv_msg_node->_cur_len;
  131.                         if (bytes_transferred < remain_msg) {
  132.                                 memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
  133.                                 _recv_msg_node->_cur_len += bytes_transferred;
  134.                                 ::memset(_data, 0, max_length);
  135.                                 _socket.async_read_some(boost::asio::buffer(_data, max_length),
  136.                                         std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr));
  137.                                 return;
  138.                         }
  139.                         memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
  140.                         _recv_msg_node->_cur_len += remain_msg;
  141.                         bytes_transferred -= remain_msg;
  142.                         copy_len += remain_msg;
  143.                         _recv_msg_node->_msg[_recv_msg_node->_tot_len] = '\0';
  144.                         std::cout << "receive data is " << _recv_msg_node->_msg << std::endl;
  145.                         //此处可以调用Send发送测试
  146.                         Send(_recv_msg_node->_msg, _recv_msg_node->_tot_len);
  147.                         //继续轮询剩余未处理数据
  148.                         _b_head_parse = false;
  149.                         _recv_head_node->Clear();
  150.                         if (bytes_transferred <= 0) {
  151.                                 ::memset(_data, 0, max_length);
  152.                                 _socket.async_read_some(boost::asio::buffer(_data, max_length),
  153.                                         std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr));
  154.                                 return;
  155.                         }
  156.                         continue;
  157.                 }
  158.         }
  159. }
  160. //处理写事件
  161. void Session::HandleWrite(const boost::system::error_code& ec, std::shared_ptr<Session> _self_ptr) {
  162.         if (ec) {
  163.                 std::cout << "Error occured in writting. ErrMessage: " << ec.message() << std::endl;
  164.                 //delete this;
  165.                 _server->ClearSession(_uuid);
  166.         }
  167.         else {
  168.                 std::lock_guard<std::mutex> guard(_send_lock);
  169.                 _send_que.pop(); //使用async_write,保证发完指定的字节数
  170.                 if (!_send_que.empty()) {
  171.                         auto& msg = _send_que.front();
  172.                         boost::asio::async_write(
  173.                                 _socket,
  174.                                 boost::asio::buffer(
  175.                                         msg->_msg,
  176.                                         msg->_tot_len
  177.                                 ), //使用async_write,可以不做偏移
  178.                                 std::bind(
  179.                                         &Session::HandleWrite,
  180.                                         this,
  181.                                         std::placeholders::_1,
  182.                                         _self_ptr
  183.                                 )
  184.                         );
  185.                 }
  186.         }
  187. }
复制代码
MsgNode

  1. #pragma once
  2. #include <memory>
  3. #include "Session.h"
  4. const int HEAD_LENGTH = sizeof(int);
  5. class MsgNode
  6. {
  7. public:
  8.         friend class Session;
  9.         MsgNode(char* msg, int tot_len) :_cur_len(0), _tot_len(tot_len) {
  10.                 _msg = new char[_tot_len + 1];
  11.                 memcpy(_msg, &tot_len, HEAD_LENGTH);
  12.                 memcpy(_msg + HEAD_LENGTH, msg, _tot_len);
  13.                 _msg[_tot_len] = '\0';
  14.         }
  15.         MsgNode(int tot_len) : _cur_len(0), _tot_len(tot_len) {
  16.                 _msg = new char[_tot_len + 1];
  17.                 memset(_msg, 0, _tot_len + 1);
  18.         }
  19.         ~MsgNode() {
  20.                 delete[] _msg;
  21.                 _msg = nullptr;
  22.         }
  23.         void Clear() {
  24.                 memset(_msg, 0, _tot_len);
  25.                 _cur_len = 0;
  26.         }
  27. private:
  28.         char* _msg;
  29.         int _cur_len;
  30.         int _tot_len;
  31. };
复制代码
技能利用中碰到的题目息争决过程。

题目

实际上,在项目开始时尽管知道了异步函数执行的时机不能确定,但是怎样延长Session的生命周期使其恰到利益的和注册的读写事件同步,也就是读写事件发生时,Session必须有用,它的读写事件不会再注册后,必须回收它的资源。
解决过程


举行总结。

根据自己的学习路线,这属于项目实战的内容。尽管只先容了一个简单的回显服务器,但是这个例子已经能够很好的体现异步服务器的层次和框架。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4