千千梦丶琪 发表于 2024-8-11 11:15:55

C++网络编程之利用Boost库搭建简单的异步服务器

C++网络编程之利用Boost库搭建简单的异步服务器

技能概述



[*]做什么的:C++ Boost库是一个开源的C++软件库集合,旨在提升C++的标准库的性能和功能。在C++编程中,Boost库通常用于解决一些通用但复杂的题目。
[*]应用:网络通信、异步IO操作、跨平台方案、进步完成任务效率。
[*]学习动机:扩展标准库的功能、进步开发效率。
[*]技能难点:API复杂,学习成本高;相对于主流技能,学习资源少。
技能详述

配置

由于主要在Windows下编程训练,所以只先容Windows下如和配置Boost库。
下载库



[*] Boost库官网下载
https://img-blog.csdnimg.cn/direct/0890897194c843449822a67bba57559d.png#pic_center
[*] 解压后文件夹下有个一个bootstrap.bat文件,双击运行会生成b2.exe
https://img-blog.csdnimg.cn/direct/e1ace97538b44f98ae710fafde5b2f27.png#pic_center
[*] 然后在boost文件夹下启动cmd,执行 “.\b2.exe toolset=msvc”。执行编译事后,会在stage文件夹下生成lib文件夹,内里就是我们要用到的lib库。
项目中配置Boost库



[*] 右键点击项目,选择“属性”打开项目的属性页。在VC++的包含目次中配置boost库的目次位置(就是下载后解压的位置),库目次配置boost库中的stage\lib即可。
https://img-blog.csdnimg.cn/direct/d1136db99f5f40f8b728537ff926e571.png#pic_center
[*] 或者可以打开属性管理器,自建属性页配置(自建属性页配置好后会有一个文件,可以复用到其他项目,自建属性页所在目次要和)
https://img-blog.csdnimg.cn/direct/f9b7ae03cd1143b59525daa0dc674cd3.png#pic_center
注意,自建属性页目次要和当前项目的版本同等,如当前是Debug的x64程序,自建属性页就要在Debug | x64目次下。
https://img-blog.csdnimg.cn/direct/6c56cf944c5040c4ab1d74a58a953f5a.png#pic_center
末了的配置过程和配置项目的属性页一样。
利用Boost库

俗话说得好,饭要一口一口吃,所以先从Boost库中网络编程的比较主要和重要的开始利用。演示程序就是一个回显客户端消息的服务器。架构也很简单。下面我将自顶向下的先容怎样完成其编写及其细节。在此之前,必须要注意到的是,异步操作的执行时机总是不确定,所以代码中会大量地用到shared_ptr等智能指针来主动托管一个对象的生命周期。
基本架构

https://img-blog.csdnimg.cn/direct/df8258906c47464f9f72f602c07b7dad.png#pic_center
main

#include "Server.h"

int main(void) {

        try {
      //创建一个boost库的IO服务,这是boost使用库进行各种IO操作的核心,io_context的旧版本是io_service
                boost::asio::io_context ioc;
      //实例化化Server类
                Server server(ioc, 10086);
      //启动IO服务
                ioc.run();
        }
        catch (std::exception& e) {
                std::cerr << "Exception: " << e.what() << std::endl;
        }

        return 0;
}
Server

头文件

#pragma once
#include <boost/asio.hpp>
#include <iostream>
#include "Session.h"
#include <memory>
#include <map>
#include "Session.h"

class Session;

class Server
{
public:
    //构造函数,引用一个IO服务,同时指定监听的端口
        Server(boost::asio::io_context& ioc, unsigned short port);

    //按照Session的uuid删除uuid
        void ClearSession(const std::string& uuid) {
                _sessions.erase(uuid);
        }

private:
    //开始一个异步的监听socket连接
        void start_accept();
    //处理收到的socket连接
        void handle_accept(std::shared_ptr<Session> new_session, const boost::system::error_code& ec);
       
    //io_context不支持拷贝构造,使用引用方式
        boost::asio::io_context& _ioc;
    //用于监听连接
        boost::asio::ip::tcp::acceptor _acceptor;

        //通过智能指针延长Session对象的生命周期
        std::map<std::string, std::shared_ptr<Session>> _sessions;

};
定义代码

#include "Server.h"
//注意这里的_acceptor的构造方式,接受一个io_context,以及一个端点。这个端点通过一个默认可用的ip和指定端口进行构造。
Server::Server(boost::asio::io_context& ioc, unsigned short port): _ioc(ioc),
        _acceptor(ioc, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) {


        std::cout << "Server starts on success, using port: " << port << std::endl;

    //启动监听
    //这么说不是一个严谨的说法,稍后将看到为什么
        start_accept();

}

void Server::start_accept() {
        //创建一个Session,注意到这里的构造又用到了io_context
        auto new_session = std::make_shared<Session>(_ioc, this);
    //这里_acceptor执行异步的accept函数
    //这里做的事情不是accept,而是往io_context中注册一个accept事件,以及这个事件的处理函数
        _acceptor.async_accept(
                new_session->Socket(), //传入Session的socket,事件发生时,_acceptor会初始化成连接进来的socket
                std::bind( //修饰处理收到连接的方法
                        &Server::handle_accept,
                        this,
                        new_session,
                        std::placeholders::_1
                )
        );
}

void Server::handle_accept(std::shared_ptr<Session> new_session, const boost::system::error_code& ec) {
        if (ec) {
                std::cout << "Error occured in accepting. Errmessage: " << ec.message() << std::endl;
                //delete new_session;
        }
        else { //没有错误
      //启动Session
                new_session->Start();
                _sessions.insert(std::make_pair(new_session->GetUuid(), new_session));
        }
       
    //这里无论成功还是失败,都要重新注册一个监听事件给io_context
    //以便能够继续监听连接
        start_accept();

}
源程序

Session

在处置惩罚读写数据时,我们不知道客户端会发多少数据来,同时还要处置惩罚数据粘包的题目。所以服务器和服务端之间的通信采用tlv(type length value)的协议举行通信。由于是回显服务器,type可用省略,在处置惩罚读到的数据时,要先解析头部,再读取头部看客户端发来多少数据。
头文件

#pragma once
#include <iostream>
#include <boost/asio.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include "Server.h"
#include <mutex>
#include <queue>
#include "MsgNode.h"
#include <iomanip>

class Server;

class MsgNode;

//注意到这里继承了std::enable_shared_from_this<Session>
//这样可以使得类内部能够使用shared_from_this()统一获取this指针的shared_ptr,shared_ptr的唯一性,防止不同的shared_ptr托管同一个this指针的资源
class Session: public std::enable_shared_from_this<Session>
{
public:
    //构造函数
        Session(boost::asio::io_context& ioc, Server* server);
        boost::asio::ip::tcp::socket& Socket();

    //启动读写
        void Start();

    //发送数据
        void Send(char* buf, std::size_t len);

    //返回uuid
        const std::string& GetUuid() {
                return _uuid;
        }

        ~Session() {
                std::cout << "delete session: uuid=" << _uuid << std::endl;
        }


private:
    //处理读事件
        void HandleRead(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<Session> _self_ptr);
    //处理写事件
        void HandleWrite(const boost::system::error_code& ec, std::shared_ptr<Session> _self_ptr);
        boost::asio::ip::tcp::socket _socket;
        enum {max_length = 1024};
    //接受数据的缓冲区
        char _data = {0};

    //发送队列
        std::queue<std::shared_ptr<MsgNode>> _send_que;
    //队列的锁
        std::mutex _send_lock;

    //用于标识连接
        std::string _uuid;
    //引用server,以便能够在读写出错时,清除自身的连接
        Server* _server;

    //接受消息的节点
        std::shared_ptr<MsgNode> _recv_msg_node;
    //标识消息头部是否解析完成
        bool _b_head_parse;
    //头部消息节点
        std::shared_ptr<MsgNode> _recv_head_node;

};


定义代码

#include "Session.h"
Session::Session(boost::asio::io_context& ioc, Server* server) : _socket(ioc),
        _server(server), _b_head_parse(false) {
    //为Session生成uuid   
        boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
        _uuid = boost::uuids::to_string(a_uuid);
    //初始化消息头,通常头的长度是一个short类型的长度
        _recv_head_node = std::make_shared<MsgNode>(HEAD_LENGTH);
}
//返回使用的socket
boost::asio::ip::tcp::socket& Session::Socket() {
        return _socket;
}

void Session::Start() {

        //对象创建时已初始化_data
    //往io_context注册读事件,区别于另一个async_read函数,这里是能读就返回
        _socket.async_read_some(
                boost::asio::buffer( //构造一个buffer
                        _data,
                        max_length
                ),
                std::bind( //修饰处理
                        &Session::HandleRead,
                        this,
                        std::placeholders::_1,
                        std::placeholders::_2,
                        //std::make_shared<Session>(this)导致引用计数不同步
                        shared_from_this()
                )
        );
}

void Session::Send(char* buf, std::size_t len) {

    bool pending = false;

        std::lock_guard<std::mutex> guard(_send_lock);
    //如果在为加入消息节点之前,队列不为空,说明这个队列的消息正在发送(写完一个消息后,写事件的处理函数还会继续注册写事件,直到队列为空,所以,如果此时有消息,就说明后续还会继续回调写事件,不再需要在这里注册写事件)
        if (_send_que.size() > 0) {
                pending = true;
        }
    //往队列投递消息
        _send_que.emplace(std::make_shared<MsgNode>(buf, len));
        if (pending) {
                return;
        }


    //注册写事件
        boost::asio::async_write(
                _socket,
                boost::asio::buffer(
                        buf,
                        len
                ),
                std::bind(
                        &Session::HandleWrite,
                        this,
                        std::placeholders::_1,
                        shared_from_this()
                )
        );

}

//由于这里使用async_read_some,一读到数据就返回,所以处理的过程比较繁琐。还可以可以使用async_read,读取指定字节在返回,其内部实现依然是异步的,和async_read_some一样高效。
void Session::HandleRead(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<Session> _self_ptr) {
        if (ec) {
                std::cout << "Error occured in reading. ErrMessage: " << ec.message() << std::endl;
       
                _server->ClearSession(_uuid);
        }
        else {

                int copy_len = 0;
                while (bytes_transferred > 0) {
                        if (!_b_head_parse) {
                                //收到的数据不足头部大小
                                if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) {
                                        memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
                                        _recv_head_node->_cur_len += bytes_transferred;
                                        ::memset(_data, 0, max_length);
                                        _socket.async_read_some(boost::asio::buffer(_data, max_length
                                        ),
                                                std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr));
                                        return;
                                }
                                //收到的数据比头部多
                                //头部剩余未复制的长度
                                int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len;
                                memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, head_remain);
                                //更新已处理的data长度和剩余未处理的长度
                                copy_len += head_remain;
                                bytes_transferred -= head_remain;
                                //获取头部数据
                                int data_len = 0;
                                memcpy(&data_len, _recv_head_node->_msg, HEAD_LENGTH);
                                std::cout << "data_len is " << data_len << std::endl;
                                //头部长度非法
                                if (data_len > max_length) {
                                        std::cout << "invalid data length is " << data_len << std::endl;
                                        _server->ClearSession(_uuid);
                                        return;
                                }
                                _recv_msg_node = std::make_shared<MsgNode>(data_len);
                                //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
                                if (bytes_transferred < data_len) {
                                        memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                                        _recv_msg_node->_cur_len += bytes_transferred;
                                        ::memset(_data, 0, max_length);
                                        _socket.async_read_some(boost::asio::buffer(_data, max_length),
                                                std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr));
                                        //头部处理完成
                                        _b_head_parse = true;
                                        return;
                                }
                                memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, data_len);
                                _recv_msg_node->_cur_len += data_len;
                                copy_len += data_len;
                                bytes_transferred -= data_len;
                                _recv_msg_node->_msg = '\0';
                                std::cout << "receive data is " << _recv_msg_node->_msg << std::endl;
                                //此处可以调用Send发送测试
                                Send(_recv_msg_node->_msg, _recv_msg_node->_tot_len);
                                //继续轮询剩余未处理数据
                                _b_head_parse = false;
                                _recv_head_node->Clear();
                                if (bytes_transferred <= 0) {
                                        ::memset(_data, 0, max_length);
                                        _socket.async_read_some(boost::asio::buffer(_data, max_length),
                                                std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr));
                                        return;
                                }
                                continue;
                        }
                        //已经处理完头部,处理上次未接受完的消息数据
                        //接收的数据仍不足剩余未处理的
                        int remain_msg = _recv_msg_node->_tot_len - _recv_msg_node->_cur_len;
                        if (bytes_transferred < remain_msg) {
                                memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                                _recv_msg_node->_cur_len += bytes_transferred;
                                ::memset(_data, 0, max_length);
                                _socket.async_read_some(boost::asio::buffer(_data, max_length),
                                        std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr));
                                return;
                        }
                        memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
                        _recv_msg_node->_cur_len += remain_msg;
                        bytes_transferred -= remain_msg;
                        copy_len += remain_msg;
                        _recv_msg_node->_msg = '\0';
                        std::cout << "receive data is " << _recv_msg_node->_msg << std::endl;
                        //此处可以调用Send发送测试
                        Send(_recv_msg_node->_msg, _recv_msg_node->_tot_len);
                        //继续轮询剩余未处理数据
                        _b_head_parse = false;
                        _recv_head_node->Clear();
                        if (bytes_transferred <= 0) {
                                ::memset(_data, 0, max_length);
                                _socket.async_read_some(boost::asio::buffer(_data, max_length),
                                        std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr));
                                return;
                        }
                        continue;
                }
        }
}
//处理写事件
void Session::HandleWrite(const boost::system::error_code& ec, std::shared_ptr<Session> _self_ptr) {
        if (ec) {
                std::cout << "Error occured in writting. ErrMessage: " << ec.message() << std::endl;
                //delete this;
                _server->ClearSession(_uuid);
        }
        else {
                std::lock_guard<std::mutex> guard(_send_lock);
                _send_que.pop(); //使用async_write,保证发完指定的字节数
                if (!_send_que.empty()) {
                        auto& msg = _send_que.front();
                        boost::asio::async_write(
                                _socket,
                                boost::asio::buffer(
                                        msg->_msg,
                                        msg->_tot_len
                                ), //使用async_write,可以不做偏移
                                std::bind(
                                        &Session::HandleWrite,
                                        this,
                                        std::placeholders::_1,
                                        _self_ptr
                                )
                        );
                }
        }
}
MsgNode

#pragma once

#include <memory>
#include "Session.h"

const int HEAD_LENGTH = sizeof(int);

class MsgNode
{
public:

        friend class Session;

        MsgNode(char* msg, int tot_len) :_cur_len(0), _tot_len(tot_len) {
                _msg = new char;
                memcpy(_msg, &tot_len, HEAD_LENGTH);
                memcpy(_msg + HEAD_LENGTH, msg, _tot_len);
                _msg = '\0';
        }

        MsgNode(int tot_len) : _cur_len(0), _tot_len(tot_len) {
                _msg = new char;
                memset(_msg, 0, _tot_len + 1);
        }

        ~MsgNode() {
                delete[] _msg;
                _msg = nullptr;
        }

        void Clear() {
                memset(_msg, 0, _tot_len);
                _cur_len = 0;
        }

private:

        char* _msg;
        int _cur_len;
        int _tot_len;

};


技能利用中碰到的题目息争决过程。

题目

实际上,在项目开始时尽管知道了异步函数执行的时机不能确定,但是怎样延长Session的生命周期使其恰到利益的和注册的读写事件同步,也就是读写事件发生时,Session必须有用,它的读写事件不会再注册后,必须回收它的资源。
解决过程



[*]利用shared_ptr管理Session,同时Session要注册到Server的_sessions。在Session堕落或者对端关闭socket的写端之后,再从Server中利用ClearSession()清除自己。
[*]仅仅这样还是不够的,为什么?因为这个毗连在仅仅在这个事件清除了自己,但是后续大概另有这个毗连的其他事件,这样会使后续事件碰到无效的对象。所以对于每个事件回调,还要拷贝这个Session的shared_ptr,以保证这个Session在这个事件的回调中是有用的。
举行总结。

根据自己的学习路线,这属于项目实战的内容。尽管只先容了一个简单的回显服务器,但是这个例子已经能够很好的体现异步服务器的层次和框架。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: C++网络编程之利用Boost库搭建简单的异步服务器