【Qt】tcp服务器、tcp多线程服务器、心跳保持、服务端组包 ...

打印 上一主题 下一主题

主题 552|帖子 552|积分 1656

背景:

局域网内,客户端会进行udp广播,服务端会监听udp广播并复兴,以此可以让客户端发现服务端。
发现之后,客户端可以发起建立tcp毗连的请求,服务端响应请求,建立tcp毗连。
然后客户端可以与服务端进行tcp消息的收发(数据以json包格式传输)。
注意:本文仅是服务端代码
代码实现(服务端):

封装了一个networkmanager类来完成网络通讯功能。
networkmanager.h
  1. #pragma once
  2. #include <QtWidgets/QWidget>
  3. #include <QUdpSocket>
  4. #include <QTcpServer>
  5. #include <QTcpSocket>
  6. #include <QDebug>
  7. #include <QByteArray>
  8. #include <QMap>
  9. #include <QString>
  10. #include <QJsonDocument>
  11. #include <QJsonObject>
  12. class NetworkManager : public QWidget
  13. {
  14.         Q_OBJECT
  15. public:
  16.         NetworkManager(QWidget* parent = nullptr);
  17.         void tcpSendCommand(QString ip, QString Command);//tcp发送指令
  18.         void removeTcpConnect(QString ip);
  19. signals:
  20.         void signalProjectData(QString ip, QJsonObject jsonObject);
  21.         void signalPerformanceData(QString ip, QJsonObject jsonObject);
  22.         void signalStatusData(QString ip, QJsonObject jsonObject);
  23.         void signalDisconnect(QString ip);
  24. private:
  25.         class Impl;
  26.         std::unique_ptr<Impl> _impl = nullptr;
  27. };
复制代码
networkmanager.cpp
  1. #include "networkmanager.h"
  2. class NetworkManager::Impl : public QObject
  3. {
  4.     Q_OBJECT
  5. public:
  6.     explicit Impl(NetworkManager* obj);
  7.     ~Impl() final;
  8.     NetworkManager* _self = nullptr;
  9.     QUdpSocket* _udpSocket = nullptr;
  10.     QTcpServer* _tcpServer = nullptr;
  11.     QMap<QString, QTcpSocket*> _tcpSocketList;//tcp连接列表
  12. public:
  13.     void udpStartListening();//udp开始监听
  14.     void tcpStartListening();// tcp开始监听
  15. public slots:
  16.     void slotUdpReadData();
  17.     void slotTcpProcessConnection();
  18.     void slotReadSocket();
  19.     void slotDisconnectSocket();
  20. };
  21. NetworkManager::Impl::Impl(NetworkManager* obj) :_self(obj)
  22. {}
  23. NetworkManager::Impl::~Impl() = default;
  24. NetworkManager::NetworkManager(QWidget* parent): QWidget(parent), _impl(std::make_unique<Impl>(this))
  25. {
  26.         _impl->_udpSocket = new QUdpSocket(this);
  27.     _impl->_tcpServer = new QTcpServer(this);
  28.     _impl->tcpStartListening();
  29.     _impl->udpStartListening();
  30. }
  31. void NetworkManager::Impl::tcpStartListening()
  32. {
  33.     // 有新的连接 绑定连接处理函数
  34.     connect(_tcpServer, &QTcpServer::newConnection, this, &Impl::slotTcpProcessConnection);
  35.    
  36.     if (_tcpServer->listen(QHostAddress::Any, 5556)) { // 端口号
  37.         qDebug() << "tcp Listening on port 5555";
  38.     }
  39.     else {
  40.         qDebug() << "TCP Error" << "Failed to Listen to port 5555";
  41.     }
  42. }
  43. void NetworkManager::Impl::udpStartListening()// udp监听
  44. {
  45.     connect(_udpSocket, &QUdpSocket::readyRead, this, &Impl::slotUdpReadData);
  46.     if (_udpSocket->bind(QHostAddress::Any, 5555)) { // 端口号
  47.         qDebug() << "udp Listening on port 5555";
  48.     }
  49.     else {
  50.         qDebug()<< "UDP Error"<< "Failed to bind to port 5555";
  51.     }
  52. }
  53. void NetworkManager::Impl::slotUdpReadData()
  54. {
  55.     while (_udpSocket->hasPendingDatagrams()) {
  56.         QByteArray datagram;
  57.         datagram.resize(_udpSocket->pendingDatagramSize());
  58.         QHostAddress sender;
  59.         quint16 senderPort;
  60.         // 读取数据包
  61.         _udpSocket->readDatagram(datagram.data(), datagram.size(), &sender, &senderPort);
  62.         // 打印接收到的数据
  63.         qDebug() << "Received datagram from" << sender.toString() << ":" << senderPort;
  64.         // 发送响应消息
  65.         QByteArray response = "udp responsed" ;
  66.         _udpSocket->writeDatagram(response, QHostAddress(sender), senderPort);
  67.     }
  68. }
  69. void NetworkManager::Impl::slotTcpProcessConnection()// tcp 连接处理
  70. {
  71.     // 通过TcpServer拿到一个socket对象clientsocket,用它来和客户端通信;
  72.     QTcpSocket* newTcpSocket = _tcpServer->nextPendingConnection();
  73.    
  74.     QString ip = newTcpSocket->peerAddress().toString();
  75.     QString ipv4 = QHostAddress(ip.mid(7)).toString();// 获取ip地址
  76.    
  77.     qDebug() << ipv4;
  78.     // 将连接存到map中
  79.     _tcpSocketList.insert(ipv4, newTcpSocket);
  80.     connect(newTcpSocket, &QTcpSocket::readyRead, this, &Impl::slotReadSocket);
  81.     connect(newTcpSocket, &QTcpSocket::disconnected, this, &Impl::slotDisconnectSocket);
  82. }
  83. void NetworkManager::Impl::slotReadSocket()
  84. {
  85.     qDebug() << "tcp read data";
  86.     QTcpSocket* tempSocket = qobject_cast<QTcpSocket*>(sender());
  87.     if (tempSocket)
  88.     {
  89.         //先获取发送消息方的ip地址
  90.         QString ip = tempSocket->peerAddress().toString();
  91.         QString ipv4 = QHostAddress(ip.mid(7)).toString();
  92.         if (_tcpSocketList.contains(ipv4))
  93.         {
  94.             //解析
  95.             QByteArray jsonData = _tcpSocketList[ipv4]->readAll();// 读取数据
  96.             QJsonDocument document = QJsonDocument::fromJson(jsonData);
  97.             if (document.isNull()) {
  98.                 // 处理JSON解析错误
  99.                 qDebug()<<"document is Null";
  100.             }
  101.             else {
  102.                 if (document.isObject()) {
  103.                     QJsonObject jsonObject = document.object();
  104.                     // 解析JSON文件 根据type 做不同处理
  105.                     QString type = jsonObject["Type"].toString();
  106.                     if (type == "Project")
  107.                     {
  108.                         emit _self->signalProjectData(ipv4, jsonObject);
  109.                     }
  110.                     else if (type == "Performance")
  111.                     {
  112.                         emit _self->signalPerformanceData(ipv4, jsonObject);
  113.                     }
  114.                     else if (type == "Status")
  115.                     {
  116.                         emit _self->signalStatusData(ipv4, jsonObject);
  117.                     }
  118.                 }
  119.             }
  120.         }
  121.     }
  122. }
  123. // 处理断连
  124. void NetworkManager::Impl::slotDisconnectSocket()
  125. {
  126.     QTcpSocket* tempSocket = qobject_cast<QTcpSocket*>(sender());
  127.     QString ip = tempSocket->peerAddress().toString();
  128.     QString ipv4 = QHostAddress(ip.mid(7)).toString();
  129.     qDebug() << "tcp disconnect:" << ipv4;
  130.     _tcpSocketList[ipv4]->close();
  131.     _tcpSocketList.remove(ipv4);//移除列表
  132.     // 发出信号  提示断连
  133.     emit _self->signalDisconnect(ipv4);
  134. }
  135. // 通信 发送指令
  136. void NetworkManager::tcpSendCommand(QString ip, QString Command)
  137. {
  138.     qDebug() << Command<<"   "<<ip;
  139.     if (_impl->_tcpSocketList.contains(ip))
  140.     {
  141.         // 发送指令
  142.         qDebug() << "send command:"<< Command;
  143.         _impl->_tcpSocketList[ip]->write(Command.toUtf8());
  144.     }
  145. }
  146. void NetworkManager::removeTcpConnect(QString ip)
  147. {
  148.     // 移除该tcp连接
  149.     _impl->_tcpSocketList.remove(ip);
  150. }
  151. #include "networkmanager.moc"
  152.    
复制代码
总结

这个服务端的网络通信代码,虽然实现了功能,但是尚有以下不足:
1、收发消息是串行的。如果多台客户端毗连,会将每个客户端的ip以及socket存入map,收发消息都是主线程在做,如果多个同时来消息,实在是串行处置惩罚的。
2、没有做心跳保持。如果客户端断线,服务器并不知道。
改进方案:多线程tcp服务器

将通信交给子线程去做,实现并行消息收发,而且在子线程内加上心跳保持。
实现方案:
封装TcpServer类,重写TcpServer的incomingConnection方法,每当有新的毗连进来,会自动调用这个函数,我们重写这个函数,在这个函数中进行子线程的创建,利用子线程进行通信。
将子线程与通信的标识符,存在一个map中,方便对多个客户端通信。
重写Thread,封装为TcpThread,重写run函数,在内里初始化该线程的socket,并毗连相应的收发消息信号和槽函数。
重写TcpSocket类,主要是在内部实现,收发消息信号的转发和处置惩罚。

代码实现(服务端)

networkmanager.h
  1. #include "mytcpsocket.h"
  2. #include "tcpthread.h"
  3. #include <QtWidgets/QWidget>
  4. #include <QUdpSocket>
  5. #include <QTcpServer>
  6. #include <QTcpSocket>
  7. #include <QDebug>
  8. #include <QByteArray>
  9. #include <QMap>
  10. #include <QString>
  11. #include <QJsonDocument>
  12. #include <QJsonObject>
  13. class NetworkManager : public QWidget
  14. {
  15.         Q_OBJECT
  16. public:
  17.         NetworkManager(QWidget* parent = nullptr);
  18.         void tcpSendCommand(QString ip, QString Command);//tcp发送指令
  19.         void removeTcpConnect(QString ip);
  20. signals:
  21.         void signalProjectData(QString ip, const QJsonObject& jsonObject);
  22.         void signalPerformanceData(QString ip, const QJsonObject& jsonObject);
  23.         void signalStatusData(QString ip, const QJsonObject& jsonObject);
  24.         void signalDisconnect(QString ip);
  25. private:
  26.         class Impl;
  27.         std::unique_ptr<Impl> _impl = nullptr;
  28. };
复制代码
networkmanager.cpp
  1. #include "networkmanager.h"
  2. class NetworkManager::Impl : public QObject
  3. {
  4.     Q_OBJECT
  5. public:
  6.     explicit Impl(NetworkManager* obj);
  7.     ~Impl() final;
  8.     void udpStartListening();//udp开始监听
  9.     void tcpStartListening();// tcp开始监听
  10. public:
  11.     NetworkManager* _self = nullptr;
  12.     QUdpSocket* _udpSocket = nullptr;
  13.     MyTcpServer* _tcpServer = nullptr;
  14.     QMap<QString, qintptr> _tcpList;//tcp连接列表  ip和描述符
  15. public slots:
  16.     void slotUdpReadData();
  17.     void slotTcpProcessConnection(QString ip, qintptr sockDesc);
  18.     void slotReadSocket(QString ip, qintptr sockDesc, const QByteArray& msg);
  19.     void slotDisconnectSocket(QString ip, qintptr sockDesc);
  20. };
  21. NetworkManager::Impl::Impl(NetworkManager* obj) :_self(obj)
  22. {}
  23. NetworkManager::Impl::~Impl() = default;
  24. NetworkManager::NetworkManager(QWidget* parent): QWidget(parent), _impl(std::make_unique<Impl>(this))
  25. {
  26.         _impl->_udpSocket = new QUdpSocket(this);
  27.     _impl->_tcpServer = new MyTcpServer(this);// 将网络管理类设置为父对象
  28.     _impl->tcpStartListening();
  29.     _impl->udpStartListening();
  30. }
  31. // tcp 开始监听
  32. void NetworkManager::Impl::tcpStartListening()
  33. {
  34.     // 有新的连接 绑定连接处理函数
  35.     connect(_tcpServer, &MyTcpServer::signalNewTcpConnect, this, &Impl::slotTcpProcessConnection);
  36.     // 收到新的消息
  37.     connect(_tcpServer, &MyTcpServer::signalTcpGetMsg, this, &Impl::slotReadSocket);
  38.     // 客户端断连消息
  39.     connect(_tcpServer, &MyTcpServer::signalTcpDisconnect, this, &Impl::slotDisconnectSocket);
  40.     if (_tcpServer->listen(QHostAddress::Any, 5556)) { // 端口号
  41.         qDebug() << "tcp Listening on port 5556";
  42.     }
  43.     else {
  44.         qDebug() << "TCP Error" << "Failed to Listen to port 5556";
  45.     }
  46. }
  47. // udp开始监听
  48. void NetworkManager::Impl::udpStartListening()
  49. {
  50.     // 收到udp消息
  51.     connect(_udpSocket, &QUdpSocket::readyRead, this, &Impl::slotUdpReadData);
  52.     if (_udpSocket->bind(QHostAddress::Any, 5555)) { // 端口号
  53.         qDebug() << "udp Listening on port 5555";
  54.     }
  55.     else {
  56.         qDebug()<< "UDP Error"<< "Failed to bind to port 5555";
  57.     }
  58. }
  59. // udp消息读取槽函数
  60. void NetworkManager::Impl::slotUdpReadData()
  61. {
  62.     while (_udpSocket->hasPendingDatagrams())
  63.     {
  64.         QByteArray datagram;
  65.         datagram.resize(_udpSocket->pendingDatagramSize());
  66.         QHostAddress sender;
  67.         quint16 senderPort;
  68.         // 读取数据包
  69.         _udpSocket->readDatagram(datagram.data(), datagram.size(), &sender, &senderPort);
  70.         // 回复响应消息
  71.         QByteArray response = "udp responsed" ;
  72.         _udpSocket->writeDatagram(response, QHostAddress(sender), senderPort);
  73.     }
  74. }
  75. // tcp 连接处理
  76. void NetworkManager::Impl::slotTcpProcessConnection(QString ip, qintptr sockDesc)
  77. {
  78.     qDebug() << "new connect:"<<ip;
  79.     _tcpList.insert(ip, sockDesc);// 将ip地址和描述符存起来
  80.     // ps: 此处业务逻辑。新的连接建立 主界面并不做响应  建立连接后客户端会发送消息 收到消息后主界面才做响应
  81. }
  82. // tcp读取消息槽函数
  83. void NetworkManager::Impl::slotReadSocket(QString ip, qintptr sockDesc, const QByteArray& msg)
  84. {
  85.     QJsonDocument document = QJsonDocument::fromJson(msg);
  86.     if (document.isNull()) {
  87.         // 处理JSON解析错误
  88.         qDebug() << "JSON is NULL";
  89.     }
  90.     else {
  91.         if (document.isObject()) {
  92.             
  93.             QJsonObject jsonObject = document.object();
  94.             // 根据key获取value
  95.             QString type = jsonObject["Type"].toString();
  96.             if (type == "Project")
  97.             {
  98.                 emit _self->signalProjectData(ip, jsonObject);
  99.             }
  100.             else if (type == "Performance")
  101.             {
  102.                 emit _self->signalPerformanceData(ip, jsonObject);
  103.             }
  104.             else if (type == "Status")
  105.             {
  106.                 emit _self->signalStatusData(ip, jsonObject);
  107.             }
  108.         }
  109.     }
  110. }
  111. // 处理断开连接
  112. void NetworkManager::Impl::slotDisconnectSocket(QString ip, qintptr sockDesc)
  113. {
  114.     qDebug() << "tcp disconnect:" << ip;
  115.     // 发出信号  提示断连
  116.     emit _self->signalDisconnect(ip);
  117. }
  118. // 通信 发送指令
  119. void NetworkManager::tcpSendCommand(QString ip, QString Command)
  120. {
  121.     if (_impl->_tcpList.contains(ip))
  122.     {
  123.         // 发送指令
  124.         qDebug() << "send command:" << Command << " to " << ip;
  125.         _impl->_tcpServer->sendData(_impl->_tcpList[ip], Command.toUtf8());
  126.     }
  127.     else
  128.     {
  129.         qDebug() << "The ip address is not in the _tcpList. Unable to send message";
  130.     }
  131. }
  132. void NetworkManager::removeTcpConnect(QString ip)
  133. {
  134.     // 移除该tcp连接
  135.     _impl->_tcpList.remove(ip);
  136. }
  137. #include "networkmanager.moc"
复制代码
mytcpserver.h
  1. #pragma once
  2. #include "mytcpsocket.h"
  3. #include "tcpthread.h"
  4. #include "networkmanager.h"
  5. #include <QObject>
  6. #include <QMap>
  7. #include <QTcpServer>
  8. class MyTcpServer :
  9.     public QTcpServer
  10. {
  11.     Q_OBJECT
  12. public:
  13.     explicit MyTcpServer(QObject* parent = nullptr);
  14.     void sendData(qintptr sockDesc, const QByteArray& msg);
  15.     void slotTcpDisconnect(QString ip, qintptr sockDesc);
  16. signals:
  17.     void signalNewTcpConnect(QString ip, qintptr sockDesc);// 有新的tcp连接
  18.     void signalTcpGetMsg(QString ip, qintptr Desc, const QByteArray& msg);// 收到新的消息
  19.     void signalTcpDisconnect(QString ip, qintptr sockDesc);// tcp断连 传给网络管理类
  20. private:
  21.     void incomingConnection(qintptr sockDesc) override;
  22.     QMap<qintptr, TcpThread*> _socketMap;//用来保存每一个客户端的通信线程
  23. };
复制代码
mytcpserver.cpp
重写了incomingConnection方法
  1. #include "mytcpserver.h"
  2. MyTcpServer::MyTcpServer(QObject* parent) : QTcpServer(parent)
  3. {
  4. }
  5. //当有新的连接进来的时候会自动调用这个函数,不需要你去绑定信号槽
  6. void MyTcpServer::incomingConnection(qintptr sockDesc)
  7. {
  8.     // 用于注册一个类型,使其可以被Qt的元系统识别(不写这个,线程通信会报错)
  9.     qRegisterMetaType<qintptr>("qintptr");
  10.     //产生线程用于通信
  11.     TcpThread* tcpthread = new TcpThread(sockDesc);
  12.     // 转移
  13.     tcpthread->moveToThread(tcpthread);
  14.     //将标识符和对应的线程 存在map中
  15.     _socketMap.insert(sockDesc, tcpthread);
  16.     //将新的tcp连接的ip发送给网络管理类
  17.     connect(tcpthread, &TcpThread::signalSendIp, this, &MyTcpServer::signalNewTcpConnect);
  18.     // 底层socket收到消息时触发readyread,通过signalGetMsg将消息传给线程
  19.     // 线程通过signalThreadGetMsg将消息转发给tcpserver
  20.     // tcpserver通过signalTcpGetMsg再将消息转发给网络管理类
  21.     connect(tcpthread, &TcpThread::signalThreadGetMsg, this, &MyTcpServer::signalTcpGetMsg);
  22.     //线程中发出断开tcp连接,传给网络管理类,tcp断开连接
  23.     connect(tcpthread, &TcpThread::signalTcpDisconnect, this, &MyTcpServer::slotTcpDisconnect);
  24.     // 开启线程
  25.     tcpthread->start();
  26. }
  27. // 发送消息
  28. void MyTcpServer::sendData(qintptr sockDesc, const QByteArray& msg)
  29. {
  30.     if(_socketMap.contains(sockDesc))
  31.     {
  32.         // 触发对应线程发送消息
  33.         emit _socketMap[sockDesc]->signalSendData(sockDesc, msg);
  34.     }
  35. }
  36. void MyTcpServer::slotTcpDisconnect(QString ip, qintptr sockDesc)
  37. {
  38.     _socketMap.remove(sockDesc); // 移除
  39.     emit signalTcpDisconnect(ip, sockDesc); // 发信号告诉网络管理类 tcp断连
  40. }
复制代码
tcpthread.h
  1. #pragma once
  2. #include "mytcpsocket.h"
  3. #include <QThread>
  4. #include <QTimer>
  5. #include <QJsonDocument>
  6. #include <QJsonObject>
  7. #include <QHostAddress>
  8. class TcpThread :
  9.     public QThread
  10. {
  11.     Q_OBJECT
  12. public:
  13.     //构造函数初始化套接字标识符
  14.     explicit TcpThread(qintptr sockDesc, QObject* parent = nullptr);
  15.     ~TcpThread();
  16.     void run() override;
  17. signals:
  18.     void signalTcpDisconnect(QString ip, qintptr sockDesc);// 线程给tcpserver发信号 tcp断连
  19.     void signalSendData(qintptr Desc, const QByteArray& msg); // 线程发送消息
  20.     void signalThreadGetMsg(QString ip, qintptr Desc, const QByteArray& msg); // 线程收到消息
  21.     void signalSendIp(QString ip, qintptr Desc); // 线程给tcpserver发信号 有新的连接(主要是发ip)
  22. public  slots:
  23.     void slotSendData(qintptr Desc, const QByteArray& msg);
  24.     void slotReceiveData(qintptr Desc, const QByteArray& msg);
  25.     void slotDisconnect();
  26.     void slotSendHeartbeat();
  27. private:
  28.     qintptr _socketDesc;
  29.     QString _ip;
  30.     QByteArray _imgData;// 用于大文件组包
  31.     MyTcpSocket* _socket = nullptr;
  32.     QTimer* _heartbeatTimer;// 心跳包发送间隔
  33.     int _checkTime = 5;// 控制心跳断联时间
  34. };
复制代码
tcpthread.cpp
ps:这个内里的实现逻辑必要注意以下,在run中,主要是四个操作:收消息、发消息、断连处置惩罚、心跳保持
由于我这里收到的消息都是json包(和客户端约定好的格式),所以我拿收到的消息解析一下json,判断一下字段,即可知道是不是心跳包,如果是心跳包,我就直接对我的心跳计数更新,如果不是,那就分析是数据包,我就转发给主线程。
这个tcpthread类实在相称于mytcpsocket和mytcpserver的中间层,底层的mytcpsocket是真正完成收发消息的,tcpthread主要是负责管理这个socket的线程,它主要做的就是将收到的消息转发出去,以及接收外界的发消息指令,将指令转给socekt。
分析两个实现思路:
心跳保持:

我定义了一个计时器,每隔两秒我会发送一条心跳消息emit _socket->signalSocketSendMsg(this->_socketDesc, "heartbeat");
客户端接收到心跳消息后,也会回我一个心跳复兴,是个json,内里type字段是heartbeat。我收到这个消息,这就完成了心跳保持。
但是代码里有个_checkTime是什么意思呢?
如果我充公到心跳复兴,那么并不能立刻分析客户端掉线了,有的时候可能网络颠簸,或者别的什么原因。所以我们想给心跳保持加一个容错时间。
我设置了一个变量:_checkTime初始化=5(可根据情况本身定义)
当我发送一次心跳包,我就_checkTime--
我收到一次心跳包,我就_checkTime++
这样只要双方正常收发心跳,那么_checkTime就不停是5,如果没有收到客户端的复兴,那么_checkTime就会不停减少。
而我在每次发送心跳包之前都会检查一下_checkTime,如果小于0了,那么分析客户端掉线了。也就是说,我给客户端留的冗余时间是10秒
容错时间=_checkTime*心跳包发送频率
大文件收发

我们的业务中,收发的都是json包,而且其中有的时候可能json包中有图片(编码后的图片)
这个图片比较大,可能64kb-170kb
而tcpsocket的readyRead是有接收限定的(似乎是最大可以接收64kb的数据,记不清了,可以去搜刮一下)
这样的话图片一次就发不完,客户端必要多次发送。
一样寻常来讲,这种大文件传输的情况精确的处置惩罚方法是:
客户端拆包,服务器组包。而且客户端发送之前必要先发送如文件名,文件大小等信息,客户端接收到后,再向客户端请求文件数据。
客户端接收到请求后,再发送文件数据,而且双方约定好每次传输的字节大小,接收完了之后,服务端以此为依据完成组包。
详细我就不赘述了,可以搜刮别的去看。
我这里用的不是正常的处置惩罚方法,我这里取了个巧。只实用于我的业务场景。
由于我这里收发都是json包,所以我利用了json包的解析来完成了一个简易的组包。
详细实现思路:客户端不必要管,文件大不大,直接发就行,文件小的话一次就发了,文件大的话socket内部会自动帮你分为几次发送的。
而服务端接收到数据以后,拿去解析json,如果小文件,直接解析成功了。如果这次是大文件,一次没发完,那么就会解析失败。ok,关键就是这里,解析失败我就把这次的数据存起来,然后下次数据来了,由于不完备天然还会解析失败,我就累加上去(这也就相称于组包了),每次解析失败我都累加,而且累加完了我再次解析,如果解析成功了,那就分析发完了。那就正常转发消息即可,注意要清空用来组包的变量。
由此就可以完成一个简易的组包,这只实用于特别情况下。
  1. #include "tcpthread.h"
  2. TcpThread::TcpThread(qintptr sockDesc, QObject* parent) : QThread(parent)
  3. {
  4.     this->_socketDesc = sockDesc;
  5. }
  6. void TcpThread::run()
  7. {
  8.     // 打印线程id  测试用
  9.     /*qDebug() << "run:";
  10.     qDebug() << "Current thread ID:" << QThread::currentThreadId();*/
  11.     _socket = new MyTcpSocket(this->_socketDesc);
  12.     //绑定套接字标识符绑定给自定义套接字对象
  13.     _socket->setSocketDescriptor(this->_socketDesc);
  14.     // 拿到ip地址
  15.     QString ipHostAddress = _socket->peerAddress().toString();
  16.     _ip = QHostAddress(ipHostAddress.mid(7)).toString();
  17.     // 告诉tcpserver 有新的连接建立 并将ip传递出去
  18.     emit signalSendIp(_ip, this->_socketDesc);
  19.    
  20.     // 收到消息
  21.     // socket发出有消息的信号,然后触发线程中发出有消息的信号
  22.     connect(_socket, &MyTcpSocket::signalSocketGetMsg, this, &TcpThread::slotReceiveData);
  23.     // 发送消息
  24.     // 当线程收到sendData信号时候,通知socket发送消息
  25.     connect(this, &TcpThread::signalSendData, _socket, &MyTcpSocket::slotSendData);
  26.     // 断开连接
  27.     // 当套接字断开时,发送底层的disconnected信号
  28.     connect(_socket, &MyTcpSocket::disconnected, this, &TcpThread::slotDisconnect);
  29.     // 心跳连接
  30.     _heartbeatTimer = new QTimer(this);
  31.     connect(_heartbeatTimer, &QTimer::timeout, this, &TcpThread::slotSendHeartbeat);
  32.     _heartbeatTimer->start(2*1000); // 2秒发送一次心跳
  33.     this->exec();//在Qt中启动消息机制
  34. }
  35. TcpThread::~TcpThread()
  36. {
  37.     delete  _socket;
  38. }
  39. // 发送消息
  40. void TcpThread::slotSendData(qintptr Desc, const QByteArray& msg)
  41. {
  42.     // 触发socket发送消息的信号 将消息传递进去
  43.     if (_socket)
  44.     {
  45.         emit _socket->signalSocketSendMsg(Desc, msg);
  46.     }
  47. }
  48. // 收到消息
  49. void TcpThread::slotReceiveData(qintptr Desc, const QByteArray& msg)
  50. {
  51.     // 解析一下
  52.     QJsonDocument document = QJsonDocument::fromJson(msg);
  53.     if (document.isNull()) { // 如果是大数据(如图片) 可能一次发不完 需要进行合包
  54.         // 处理JSON解析错误
  55.         qDebug() << "JSON is NULL(Thread)";
  56.         
  57.         _imgData += msg;// 累加合包
  58.         QJsonDocument imgDocument = QJsonDocument::fromJson(_imgData);
  59.         if (!imgDocument.isNull())
  60.         {
  61.             if (imgDocument.isObject())
  62.             {
  63.                 emit signalThreadGetMsg(_ip, Desc, _imgData);// 转发出去
  64.                 qDebug() << "QByteArray:img data size" << _imgData.size();
  65.                 _imgData.clear();// 清除
  66.             }
  67.         }
  68.     }
  69.     else { //不是大文件 一次可以接收完
  70.         if (document.isObject())
  71.         {
  72.             QJsonObject jsonObject = document.object();
  73.             QString type = jsonObject["Type"].toString();
  74.             if (type == "Heartbeat") // 如果是心跳包 直接拦截
  75.             {
  76.                 _checkTime++; // 是心跳回复 计数加1
  77.             }
  78.             else
  79.             {
  80.                 // 如果不是心跳包 说明是数据包 转发出去
  81.                 emit signalThreadGetMsg(_ip, Desc, msg);
  82.             }
  83.         }
  84.     }
  85. }
  86. // 断开连接
  87. void TcpThread::slotDisconnect()
  88. {
  89.     // 告诉tcpserver tcp断联
  90.     emit signalTcpDisconnect(_ip, this->_socketDesc);
  91.     //让该线程中的套接字断开连接
  92.     _socket->disconnectFromHost();//断开连接
  93.     //线程退出
  94.     qDebug() << "thread quit";
  95.     this->quit();
  96. }
  97. // 心跳保持
  98. void TcpThread::slotSendHeartbeat()
  99. {
  100.     qDebug() << "heartbeat-----------"<<_checkTime;
  101.     if(_checkTime <= 0) // 检查心跳计时
  102.     {
  103.         // 客户端掉线
  104.         qDebug() << "heartbeat:client disconnect";
  105.         _socket->disconnectFromHost();//断开连接  执行这个会触发socket的disconnect
  106.         //线程退出
  107.         qDebug() << "thread quit";
  108.         this->quit();
  109.     }
  110.     else
  111.     {
  112.         // 发送心跳包
  113.         emit _socket->signalSocketSendMsg(this->_socketDesc, "heartbeat");
  114.         _checkTime--;
  115.     }
  116. }      
复制代码
mytcpsocket.h
  1. #pragma once
  2. #include <QObject>
  3. #include <QThread>
  4. #include <QTcpSocket>
  5. class MyTcpSocket :
  6.     public QTcpSocket
  7. {
  8.     Q_OBJECT
  9. public:
  10.     explicit MyTcpSocket(qintptr socketDesc, QObject* parent = nullptr);
  11. public slots:
  12.     void slotSendData(qintptr socketDesc, const QByteArray& data);
  13. signals:
  14.     void signalSocketSendMsg(qintptr socketDesc, const QByteArray& msg);// 发送消息
  15.     void signalSocketGetMsg(qintptr socketDesc, const QByteArray& msg); // 接收消息
  16. private:
  17.     qintptr _sockDesc;
  18. };
复制代码
mytcpsocket.cpp
  1. #include "mytcpsocket.h"
  2. MyTcpSocket::MyTcpSocket(qintptr socketDesc, QObject* parent) : QTcpSocket(parent)
  3. {
  4.     this->_sockDesc = socketDesc;
  5.     // 线程内触发socket的发送消息信号 则执行对应发送消息的槽函数
  6.     connect(this, &MyTcpSocket::signalSocketSendMsg, this, &MyTcpSocket::slotSendData);
  7.     // 收到消息  传递出去
  8.     connect(this, &MyTcpSocket::readyRead, this, [=] {
  9.         QByteArray msg = readAll();
  10.         emit signalSocketGetMsg(_sockDesc, msg);
  11.        });
  12. }
  13. // 发送消息
  14. void MyTcpSocket::slotSendData(qintptr socketDesc, const QByteArray& msg)
  15. {
  16.     if (socketDesc == _sockDesc && !msg.isEmpty()) {
  17.         this->write(msg);
  18.     }
  19. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

麻花痒

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表