ToB企服应用市场:ToB评测及商务社交产业平台

标题: 基于C语言从0开始手撸MQTT协议代码连接标准的MQTT服务器,完成数据上传和命 [打印本页]

作者: 饭宝    时间: 2024-6-28 21:36
标题: 基于C语言从0开始手撸MQTT协议代码连接标准的MQTT服务器,完成数据上传和命
一、前言

近年来,物联网的发展如火如荼,已经渗透到我们生活的方方面面。从智能家居到工业主动化,从智慧城市到智慧农业,物联网正在以前所未有的速度改变着我们的生活。 大家现在可能已经习惯了通过手机控制家里的灯光、空调和电视,这就是物联网在智能家居领域的应用,如果在10年前看到这种装备的应用肯定觉得很牛批,而现在只要是个装备都能上云,这种家电装备的远程控制已经成了大家家常便饭的配置了。而在工业领域,物联网技能可以帮助企业实现主动化生产、装备监控和防备性维护,提高生产效率和产品质量。在智慧城市建设中,物联网技能可以用于交通管理、环境监测和公共安全等方面,提拔城市管理和住民生活的质量。
从物联网开始兴起的时候,各大厂家都纷纷推出了自家的IOT物联网平台。 好比: 机敏云、中国移动的onenet、阿里云的IOT、百度的天工物接入、华为云的IOT、腾讯云IOT等等。 这些大厂家的物联网服务器都支持标准的MQTT协议接入,大家不用自己搭建MQTT服务器可以直接使用这些现成的服务器接入装备开发黑白常的方便的。
我在这几年也写了很多物联网开发的案例,不管是、中国移动的onenet、阿里云的IOT、百度的天工物接入、华为云的IOT、腾讯云IOT 这些服务器都写了很多教程,演示装备接入平台,完成装备上云,手机APP对接,电脑步调对接,微信小步调接入,实现远程数据监测控制等等。这些案例都放在了智能家居与物联网项目实战专栏里。 这些案例里装备实现上云的方式主要是两种方式:HTTP协议、MQTT协议方式上云。 MQTT协议是标准的物联网协议,支持双向数据传输,也就是可以上传数据到服务器,也可以接收服务器下发的控制命令完成远程控制。 我写的这些案例里硬件端联网的模块主要是用到了4G模块、ESP8266-WIFI模块、GSM模块、NBIOT模块等等,通过它们联网,让单片机装备实现上云。
这些装备中有些是支持MQTT协议的(也就是自己的固件就支持MQTT协议),有些不支持的(可能有固件支持,需要自己烧写)。 如果说固件不支持MQTT协议,但只要装备支持TCP协议,那么我们也可以自己封装MQTT协议完成与MQTT服务器之间的通讯。 好比:ESP8266-WIFI模块,正常的官方默认固件中,ESP8266-WIFI是不支持MQTT协议的,如果我们不烧写固件的情况下,怎样自己实现MQTT协议上云? 这篇文章就先容,通过TCP协议自己封装MQTT协议报文,完成数据上云。 直接从0开始手撸MQTT协议报文,组合报文,完成与服务器之间的通讯。
MQTT协议也是分为两种,分MQTT和MQTTS,就像HTTP协议一样也分HTTP和HTTPS,那么区别呢? 带S就是要支持SSL协议,支持认证,更加安全,那么复杂度自然就上来了。 MQTT协议的端口是1883,MQTTS的端口是8883。 当前这篇文章先容非加密的MQTT协议,也就是1883端口。MQTTS协议也手撸不了,这玩意涉及到SSL协议,那就很复杂了,如果要用,直接使用现成的开源库就行,但本篇文章不讨论这个,后面文章再单独先容怎样实现MQTTS协议。
本篇文章的环境是在windows下,使用VS2022开发步调,使用windows下网络编程接口作为基础,封装MQTT协议连接华为云MQTT服务器,完成数据上云。
以是,大家只要有一台windows电脑,电脑上安装了VS开发环境,任何版本都可以(VS2010、VS2013、VS2015、VS2017、VS2019、VS2022等等都可以的) 跟着这篇文章进行学习,不需要其他任何硬件装备,我们现在是单纯的去学习MQTT协议。
前提呢,大家还是要懂得一点网络编程的知识,相识TCP协议,大致知道TCP协议通讯的简单过程,如果网络编程知识是完全0基础,发起先看另一篇文章学习下网络编程(我博客有专门讲解网络编程相干知识的文章)。 这篇文章也会简单先容下TCP协议和根本网络编程知识
那么接下来,我们就开始动手学习吧。
二、搭建开发环境

如果大家电脑已经有开发环境,这章节直接忽略。 这里贴出来为了给 完全0基础 的小伙伴学习
我这里先容下我用的环境安装过程。 所有版本的VS都可以的。
我当前环境是在Windows下,IDE用的是地表最强IDE VS2022。
下载地点:https://visualstudio.microsoft.com/zh-hans/downloads/

由于我这里只需要用到C++和C语言编程,那么安装的时候可以自己选择需要安装的包。

安装好之后,创建项目。


三、网络编程基础概念科普

如果是老手了,这章节可以直接忽略。 如果对网络编程是 0基础 的小伙伴,那么就认真看一下,相识下根本知识。
3.1 什么是网络编程

网络编程是通过使用IP地点和端口号等网络信息,使两台以上的盘算机可以或许相互通讯,按照规定的协议交换数据的编程方式。
在网络编程中,步调员使用各种协议和技能,使得不同的装备可以通过网络进行数据交换和信息共享。
要实现网络编程,步调员需要相识并掌握各种网络通讯协议,好比TCP/IP协议族,包罗TCP、UDP、IP等,这些协议是实现装备间通讯的基础。网络编程内部涉及到数据的打包、组装、发送、接收、解析等一系列过程,以实现信息的正确传输。
在TCP/IP协议族中,TCP和UDP是位于IP协议之上的传输层协议。 在OSI模子中,传输层是第四层,负责总体数据传输和数据控制,为会话层等高三层提供可靠的传输服务,为网络层提供可靠的目的地点信息。在TCP/IP协议族中,TCP和UDP正是位于这一层的协议。
这篇文章主要先容 TCP 和 UDP 协议 以及 使用方法。

3.2 TCP 和 UDP协议先容

TCP协议
TCP(传输控制协议)是一种面向连接的、可靠的传输层协议。在传输数据之前需要先创建连接,确保数据的次序和完整性。TCP通过三次握手创建连接,并通过确认、超时和重传机制确保数据的可靠传输。TCP采用流量控制和拥塞控制机制,以避免网络拥塞,确保数据的顺利传输。由于TCP的这些特性,通常被应用于需要高可靠性和次序性的应用,如网页浏览、电子邮件等。
UDP协议
UDP(用户数据报协议)是一种无连接的、不可靠的传输层协议。与TCP不同,UDP在传输数据之前不需要创建连接,直接将数据打包成数据报并发送出去。因此,UDP没有TCP的那些确认、超时和重传机制,也就不包管数据的可靠传输。UDP也没有TCP的流量控制和拥塞控制机制。由于UDP的简单性和高效性,通常被应用于实时性要求较高,但对数据可靠性要求不高的应用,如语音通话、视频直播等。
3.3 TCP通讯的实现过程

要实现TCP通讯,两头必须要知道对方的IP和端口号:
(1)IP地点:TCP协议是基于IP协议进行通讯的,因此需要知道对方的IP地点,才能创建连接。
(2)端口号:每个TCP连接都有一个唯一的端口号,用于标识进程和应用步调。创建连接时,需要指定当地端口号和远端端口号。
(3)应用层协议:TCP协议只提供数据传输服务,应用步调需要界说自己的应用层协议,用于解析报文和处理数据。例如,HTTP协议就是基于TCP协议的应用层协议。
在正常的TCP通讯过程中,第一步需要创建连接,这个过程称为“三次握手”。创建连接时,客户端向服务器发送一个SYN包,表示哀求创建连接;服务器接收到SYN包后,向客户端发送一个ACK包,表示确认收到了SYN包;最后客户端再向服务器发送一个ACK包,表示确认收到了服务器的ACK包,此时连接创建乐成。创建连接后,数据传输就可以开始了。

四、Windows下的网络编程相干API先容

由于当前文章是在Windows下先容MQTT协议,要用到网络编程的知识,需要使用Windows系统提供的API完成网络编程。Windows自己就有一套原生的网络编程接口可以直接使用。 在Linux系统下也是一样,都有自己一套原生的网络编程接口。
如果没有打仗这些API的小伙伴不用慌~~~。 你至少用过C语言里的printf、scanf、strlen之类的函数吧? 下面先容的这些网络编程API函数其实和它们没什么区别,都是普通的函数,功能不一样而已。 对你来说,只是多学了几个库函数,只要相识每个函数的功能就可以调用了。
那么接下来就学习一下常用的网络编程相干的函数。
微软的官方文档地点:https://learn.microsoft.com/zh-cn/windows/win32/api/_winsock/

4.1 常用的函数先容

在Windows下进行网络编程,可以使用Winsock API(Windows Sockets API)来实现。Winsock API是Windows平台上的标准网络编程接口,提供了一系列函数和数据结构,用于创建、连接、发送和接收网络数据等操作。
下面是常用的Winsock API接口函数:
(1)WSAStartup:初始化Winsock库,必须在使用其他Winsock函数之前调用。
(2)socket:创建一个套接字,用于网络通讯。
(3)bind:将套接字与当地地点(IP地点和端口号)绑定。
(4)listen:开始监听连接哀求,将套接字设置为被动模式。
(5)accept:接受客户端的连接哀求,创建一个新的套接字用于与客户端通讯。
(6)connect:与远程服务器创建连接。
(7)send:发送数据到已连接的套接字。
(8)recv:从已连接的套接字接收数据。
(9)sendto:发送数据到指定的目的地点。
(10)recvfrom:从指定的地点接收数据。
(11)closesocket:关闭套接字。
(12)getaddrinfo:根据主机名和服务名获取地点信息。
(13)gethostbyname:根据主机名获取主机的IP地点。
(14)gethostname:获取当地主机名。
4.2 函数参数先容

下面是常用的几个Winsock API函数及其函数原型和参数含义的先容:
(1)WSAStartup:
  1. int WSAStartup(WORD wVersionRequested, LPWSADATA lpWSAData);
复制代码

(2)socket:
  1. SOCKET socket(int af, int type, int protocol);
复制代码

(3)bind:
  1. int bind(SOCKET s, const struct sockaddr* name, int namelen);
复制代码

(4)listen:
  1. int listen(SOCKET s, int backlog);
复制代码

(5)accept:
  1. SOCKET accept(SOCKET s, struct sockaddr* addr, int* addrlen);
复制代码

(6)connect:
  1. int connect(SOCKET s, const struct sockaddr* name, int namelen);
复制代码

(7)send:
  1. int send(SOCKET s, const char* buf, int len, int flags);
复制代码

(8)recv:
  1. int recv(SOCKET s, char* buf, int len, int flags);
复制代码

(9)sendto:
  1. int sendto(SOCKET s, const char* buf, int len, int flags, const struct sockaddr* to, int tolen);
复制代码

(10)recvfrom:
  1. int recvfrom(SOCKET s, char* buf, int len, int flags, struct sockaddr* from, int* fromlen);
复制代码

(11)closesocket:
  1. int closesocket(SOCKET s);
复制代码

(12)getaddrinfo:
  1. int getaddrinfo(const char* nodename, const char* servname, const struct addrinfo* hints, struct addrinfo** res);
复制代码

(13)gethostbyname:
  1. struct hostent* gethostbyname(const char* name);
复制代码

(14)gethostname:
  1. int gethostname(char* name, int namelen);
复制代码

4.3 编写代码体验网络编程

上面相识了这些函数,可能不知道怎样使用。 这里就写一个例子,以TCP客户端的身份去连接TCP服务器,完成数据传输。
**下面代码实现一个TCP客户端,连接到指定的服务器并完成通讯。 ** 可以直接将代码贴到你的工程里,运行,体验结果。
  1. #include <iostream>
  2. #include <winsock2.h>
  3. #include <ws2tcpip.h>
  4. #pragma comment(lib, "ws2_32.lib") //告诉编译器链接Winsock库
  5. int main()
  6. {
  7.     WSADATA wsaData; //创建一个结构体变量,用于存储关于Winsock库的信息
  8.     int result = WSAStartup(MAKEWORD(2, 2), &wsaData); //初始化Winsock库,指定版本号2.2,检查返回值
  9.     if (result != 0)
  10.     {
  11.         std::cout << "WSAStartup failed: " << result << std::endl; //输出错误信息并退出程序
  12.         return 1;
  13.     }
  14.     SOCKET connectSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); //创建一个TCP套接字,检查返回值
  15.     if (connectSocket == INVALID_SOCKET)
  16.     {
  17.         std::cout << "socket failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序
  18.         WSACleanup(); //清除Winsock库
  19.         return 1;
  20.     }
  21.     sockaddr_in service; //创建一个结构体变量,用于存储服务器地址信息
  22.     service.sin_family = AF_INET; //指定地址族为IPv4
  23.     inet_pton(AF_INET, "127.0.0.1", &service.sin_addr); //将字符串类型的IP地址转换为二进制网络字节序的IP地址,并存储在结构体中
  24.     service.sin_port = htons(12345); //将端口号从主机字节序转换为网络字节序,并存储在结构体中
  25.     result = connect(connectSocket, (SOCKADDR*)&service, sizeof(service)); //连接到服务器,检查返回值
  26.     if (result == SOCKET_ERROR)
  27.     {
  28.         std::cout << "connect failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序
  29.         closesocket(connectSocket); //关闭套接字
  30.         WSACleanup(); //清除Winsock库
  31.         return 1;
  32.     }
  33.     std::cout << "Connected to server." << std::endl; //连接成功,输出消息
  34.     char sendBuffer[1024] = "Hello, server!"; //创建发送缓冲区,存储待发送的数据
  35.     result = send(connectSocket, sendBuffer, sizeof(sendBuffer), 0); //向服务器发送数据,检查返回值
  36.     if (result == SOCKET_ERROR)
  37.     {
  38.         std::cout << "send failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序
  39.         closesocket(connectSocket); //关闭套接字
  40.         WSACleanup(); //清除Winsock库
  41.         return 1;
  42.     }
  43.     char recvBuffer[1024]; //创建接收缓冲区,用于存储从服务器接收到的数据
  44.     result = recv(connectSocket, recvBuffer, sizeof(recvBuffer), 0); //从服务器接收数据,检查返回值
  45.     if (result == SOCKET_ERROR)
  46.     {
  47.         std::cout << "recv failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序
  48.         closesocket(connectSocket); //关闭套接字
  49.         WSACleanup(); //清除Winsock库
  50.         return 1;
  51.     }
  52.     std::cout << "Received message from server: " << recvBuffer << std::endl; //输出从服务器收到的数据
  53.     closesocket(connectSocket); //关闭套接字
  54.     WSACleanup(); //清除Winsock库
  55.     return 0;
  56. }
复制代码
运行结果:

五、访问华为云IOT服务器创建一个产品和装备

5.2 开通物联网服务

地点: https://www.huaweicloud.com/product/iothub.html

现在可以免费创建的,按需付费。 只要是不高出规格,就可以免费使用,对于个人项目Demo来说,完全够用的

点击立即创建之后,会开始创建实例,需要等候片刻,再刷新浏览器就可以看到创建乐成了。

创建完成,点击实例,即可进入实例详情页面。

进入实例详情页面后,可以看到接入信息的描述,我们当前装备预备采用MQTT协议接入华为云平台,这里可以看到MQTT协议的地点和端口号等信息。

总结:
  1. 端口号:   MQTT (1883)
  2. 接入地址: 252d4bd608.st1.iotda-device.cn-north-4.myhuaweicloud.com
复制代码
**根据域名地点得到IP地点信息: ** 打开windows的CMD窗口
  1. Microsoft Windows [版本 10.0.19045.3693]
  2. (c) Microsoft Corporation。保留所有权利。
  3. C:\Users\11266>ping 252d4bd608.st1.iotda-device.cn-north-4.myhuaweicloud.com
  4. 正在 Ping 252d4bd608.st1.iotda-device.cn-north-4.myhuaweicloud.com [117.78.5.125] 具有 32 字节的数据:
  5. 来自 117.78.5.125 的回复: 字节=32 时间=41ms TTL=94
  6. 来自 117.78.5.125 的回复: 字节=32 时间=38ms TTL=94
  7. 来自 117.78.5.125 的回复: 字节=32 时间=37ms TTL=94
  8. 来自 117.78.5.125 的回复: 字节=32 时间=39ms TTL=94
  9. 117.78.5.125 的 Ping 统计信息:
  10.     数据包: 已发送 = 4,已接收 = 4,丢失 = 0 (0% 丢失),
  11. 往返行程的估计时间(以毫秒为单位):
  12.     最短 = 37ms,最长 = 41ms,平均 = 38ms
  13. C:\Users\11266>
复制代码

MQTT协议接入端口号有两个,1883黑白加密端口,8883是证书加密端口,单片机无法加载证书,以是使用1883端口比较符合。 接下来的ESP8266就采用1883端口连接华为云物联网平台。
5.3 创建产品

(1)创建产品

点击产品页,再点击左上角创建产品。

(2)填写产品信息

根据自己产品名字填写,下面的装备类型选择自界说类型。

(3)产品创建乐成

创建乐成之后,点击产品的名字就可以进入到产品的详情页。

(4)添加自界说模子

产品创建完成之后,点击进入产品详情页面,翻到最下面可以看到模子界说。

模子简单来说: 就是存放装备上传到云平台的数据。
先点击自界说模子。

再创建一个服务ID。

接着点击新增属性。

这里就创建一个温度的属性。我们这个装备用来测温的。


3.4 添加装备

产品是属于上层的抽象模子,接下来在产品模子下添加现实的装备。添加的装备终极需要与真实的装备关联在一起,完成数据交互。
(1)注册装备


(2)根据自己的装备填写


(3)保存装备信息

创建完毕之后,点击保存并关闭,得到创建的装备密匙信息。该信息在后续生成MQTT三元组的时候需要使用。
  1. {
  2.     "device_id": "65697df3585c81787ad4da82_stm32",
  3.     "secret": "12345678"
  4. }
复制代码
点击装备名称可以进入到装备详情页。

3.5 MQTT协议主题订阅与发布

(1)华为云平台MQTT协议使用限定

描述限定支持的MQTT协议版本3.1.1与标准MQTT协议的区别支持Qos 0和Qos 1支持Topic自界说不支持QoS2不支持will、retain msgMQTTS支持的安全品级采用TCP通道基础 + TLS协议(最高TLSv1.3版本)单帐号每秒最大MQTT连接哀求数无穷制单个装备每分钟支持的最大MQTT连接数1单个MQTT连接每秒的吞吐量,即带宽,包含直连装备和网关3KB/sMQTT单个发布消息最大长度,高出此大小的发布哀求将被直接拒绝1MBMQTT连接心跳时间发起值心跳时间限定为30至1200秒,保举设置为120秒产品是否支持自界说Topic支持消息发布与订阅装备只能对自己的Topic进行消息发布与订阅每个订阅哀求的最大订阅数无穷制 (2)主题订阅格式

帮助文档地点:https://support.huaweicloud.com/devg-iothub/iot_02_2200.html

对于装备而言,一般会订阅平台下发消息给装备 这个主题。
装备想接收平台下发的消息,就需要订阅平台下发消息给装备 的主题,订阅后,平台下发消息给装备,装备就会收到消息。
如果装备想要知道平台下发的消息,需要订阅上面图片里标注的主题。
  1. 以当前设备为例,最终订阅主题的格式如下:
  2. $oc/devices/{device_id}/sys/messages/down
  3. 最终的格式:
  4. $oc/devices/65697df3585c81787ad4da82_stm32/sys/messages/down
复制代码

(3)主题发布格式

对于装备来说,主题发布表示向云平台上传数据,将最新的传感器数据,装备状态上传到云平台。
这个操作称为:属性上报。
帮助文档地点:https://support.huaweicloud.com/usermanual-iothub/iot_06_v5_3010.html

根据帮助文档的先容, 当前装备发布主题,上报属性的格式总结如下:
  1. 发布的主题格式:
  2. $oc/devices/{device_id}/sys/properties/report
  3. 最终的格式:
  4. $oc/devices/65697df3585c81787ad4da82_stm32/sys/properties/report
  5. 发布主题时,需要上传数据,这个数据格式是JSON格式。
  6. 上传的JSON数据格式如下:
  7. {
  8.   "services": [
  9.     {
  10.       "service_id": <填服务ID>,
  11.       "properties": {
  12.         "<填属性名称1>": <填属性值>,
  13.         "<填属性名称2>": <填属性值>,
  14.         ..........
  15.       }
  16.     }
  17.   ]
  18. }
  19. 根据JSON格式,一次可以上传多个属性字段。 这个JSON格式里的,服务ID,属性字段名称,属性值类型,在前面创建产品的时候就已经介绍了,不记得可以翻到前面去查看。
  20. 根据这个格式,组合一次上传的属性数据:
  21. {"services": [{"service_id": "stm32","properties":{"TEMP":36.2}}]}
复制代码
3.6 MQTT三元组

MQTT协议登录需要填用户ID,装备ID,装备密码等信息,就像我们平时登录QQ,微信一样要输入账号密码才能登录。MQTT协议登录的这3个参数,一般称为MQTT三元组。
接下来先容,华为云平台的MQTT三元组参数怎样得到。
(1)MQTT服务器地点

要登录MQTT服务器,起首记得先知道服务器的地点是多少,端口是多少。
帮助文档地点:https://console.huaweicloud.com/iotdm/?region=cn-north-4#/dm-portal/home

MQTT协议的端口支持1883和8883,它们的区别是:8883 是加密端口更加安全。但是单片机上使用比较困难,以是当前的装备是采用1883端口进连接的。
根据上面的域名和端口号,得到下面的IP地点和端口号信息: 如果装备支持填写域名可以直接填域名,不支持就直接填写IP地点。 (IP地点就是域名解析得到的)
  1. 华为云的MQTT服务器地址:117.78.5.125
  2. 华为云的MQTT端口号:1883
复制代码
(2)生成MQTT三元组

华为云提供了一个在线工具,用来生成MQTT鉴权三元组: https://iot-tool.obs-website.cn-north-4.myhuaweicloud.com/
打开这个工具,填入装备的信息(也就是刚才创建完装备之后保存的信息),点击生成,就可以得到MQTT的登录信息了。
下面是打开的页面:

填入装备的信息: (上面两行就是装备创建完成之后保存得到的)
直接得到三元组信息。

得到三元组之后,装备端通过MQTT协议登录鉴权的时候,填入参数即可。
  1. ClientId  65697df3585c81787ad4da82_stm32_0_0_2023120106
  2. Username  65697df3585c81787ad4da82_stm32
  3. Password  12cc9b1f637da8d755fa2cbd007bb669e6f292e3e63017538b5e6e13eef0cf58
复制代码
到此,云平台的部署已经完成,装备已经可以正常上传数据了。
(3)MQTT登录测试参数总结

  1. IP地点:117.78.5.125端口号:1883ClientId  65697df3585c81787ad4da82_stm32_0_0_2023120106
  2. Username  65697df3585c81787ad4da82_stm32
  3. Password  12cc9b1f637da8d755fa2cbd007bb669e6f292e3e63017538b5e6e13eef0cf58
  4. 订阅主题:$oc/devices/65697df3585c81787ad4da82_stm32/sys/messages/down发布主题:$oc/devices/65697df3585c81787ad4da82_stm32/sys/properties/report发布数据:{"services": [{"service_id": "stm32","properties":{"TEMP":36.2}}]}
复制代码
六、开始学习MQTT协议

6.1 先相识下MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅模式的“轻量级”的消息协议,可在发布者和订阅者之间转达消息。MQTT协议构建于TCP/IP协议上,由IBM在1999年发布,当前已经成为了一种主流的物联网通讯协议。
MQTT最大的优点在于,可以或许以极少的代码和有限的带宽,为连接远程装备提供实时可靠的消息服务。它是一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型装备、移动应用等方面有较广泛的应用。由于其小巧、高效和可靠的特点,MQTT在物联网领域得到了广泛的应用。在很多情况下,包罗受限的环境中,如:机器与机器(M2M)通讯和物联网(IoT),且已经广泛应用于通过卫星链路通讯传感器、偶尔拨号的医疗装备、智能家居、及一些小型化装备中。
MQTT协议的工作原理是基于发布/订阅模式。在这种模式下,发布者可以向一个或多个主题发布消息,而订阅者可以订阅这些主题以接收相干消息。这种模式允很多个发布者和订阅者同时存在,实现了一种灵活的消息转达机制。别的,MQTT协议还支持三种消息转达质量品级,可根据需要进行选择。
MQTT协议的另一个重要特点是其轻量级和简单的设计。它的消息头非常小,只有2个字节,这意味着在网络带宽有限的环境下也可以或许实现高效的消息转达。别的,MQTT协议还支持长期化连接和消息队列等高级功能,可进一步提高消息的可靠性和转达效率。
MQTT协议的应用范围非常广泛。例如,在智能家居领域,可以使用MQTT协议将各种智能装备连接在一起,实现装备的远程控制和监测。在工业领域,MQTT协议可以用于实现装备的远程监控和维护,提高生产效率和产品质量。在智慧城市建设中,MQTT协议可以用于交通管理、环境监测和公共安全等方面,提拔城市管理和住民生活的质量。
6.2 MQTT协议官网先容

现在MQTT协议主要是3.1.1 和 5.0 两个版本。 本篇文章是先容3.1.1版本的MQTT协议。 各大标准的MQTT服务器都支持3.1.1.
链接:https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

在文档的下方就是先容MQTT协议的每个包怎样封装的。照着协议写代码就行了。

6.3 需要实现的3个函数

整个MQTT协议里,主要是实现3个函数就行了(其他的接口看自己需求)。
下面列出的3个函数,在一般的MQTT通讯里是必备的。我们只要实现了这3个函数,那么完成根本的MQTT通讯就没有问题了。
  1. //发布主题
  2. unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos);
  3. //订阅或者取消订阅主题
  4. unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether);
  5. //登录MQTT服务器
  6. unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password);
复制代码
6.4 检察协议文档,相识怎样组合协议报文

【1】打开文档,打开目录,翻到第3章节—MQTT控制报文。

【2】在第3章,控制报文里,找到对应的子章节,也就是我们接线需要照着文档实现协议组包的章节。

6.5 实现MQTT_Connect函数

先认真阅读文档: 相识这个报文的规则,以及出现错误之后错误代码的含义。
客户端到服务端的网络连接创建后,客户端发送给服务端的第一个报文必须是 CONNECT 报文。 在一个网络连接上,客户端只能发送一次CONNECT 报文。服务端必须将客户端发送的第二个 CONNECT 报文看成协议违规处理并断开客户端的连接。

接下来就按次序检察文档,相识协议报文里每个字节怎样组成的。

接下来就开始编写代码,按照文档的提示,组合报文。
起首,界说一个数组,用来存放我们按照MQTT协议封装的数据。
  1. unsigned char mqtt_txbuf[256];//接收数据缓存区
复制代码
在界说一个变量,用来保存数组的下标,每赋值一次,就自增++;
  1. int mqtt_txlen = 0;
复制代码
【1】固定报文头

文档说了,数组的第一字节固定为:0x10。 (不懂为什么是0X10,仔细看下面文档里的红色框框,文档已经把二进制位的每个位都标注出来了,如果还是看不懂,就需要补习一下C语言的位运算,熟悉位运算之后,再来看应该就很容易了)。

那么第一行代码就是这么写:
  1. //固定报头
  2. //控制报文类型
  3. mqtt_txbuf[mqtt_txlen++] = 0x10;                //MQTT Message Type CONNECT
复制代码
接下来第2个字节,文档让看2.2.3小节的说明,那么翻到2.2.3小节,相识怎样填写剩余长度值。


根据文档说明,那么编写代码如下:
  1. //剩余长度(不包括固定头部)
  2. do
  3. {
  4.         unsigned char encodedByte = DataLen % 128;
  5.         DataLen = DataLen / 128;
  6.         // if there are more data to encode, set the top bit of this byte
  7.         if (DataLen > 0)
  8.                 encodedByte = encodedByte | 128;
  9.         mqtt_txbuf[mqtt_txlen++] = encodedByte;
  10. } while (DataLen > 0);
复制代码
好了,现在第2个字节赋值已经完毕。
【2】协议名

那么接着看剩下的字节怎样填写:

根据文档说明,编写代码如下: (继承按次序赋值就行了)
  1. //可变报头
  2. //协议名
  3. mqtt_txbuf[mqtt_txlen++] = 0;                // Protocol Name Length MSB   
  4. mqtt_txbuf[mqtt_txlen++] = 4;           // Protocol Name Length LSB   
  5. mqtt_txbuf[mqtt_txlen++] = 'M';                // ASCII Code for M   
  6. mqtt_txbuf[mqtt_txlen++] = 'Q';                // ASCII Code for Q   
  7. mqtt_txbuf[mqtt_txlen++] = 'T';                // ASCII Code for T   
  8. mqtt_txbuf[mqtt_txlen++] = 'T';                // ASCII Code for T   
复制代码
【3】协议级别

看文档说明。

编写代码:
  1. //协议级别
  2. mqtt_txbuf[mqtt_txlen++] = 4;                        // MQTT Protocol version = 4   
复制代码
【4】连接标志

关于每个标志的含义,文档向下翻,下面有详细的先容,每个标志位的含义。


编写代码:
  1. //连接标志
  2. mqtt_txbuf[mqtt_txlen++] = 0xc2;                // conn flags
复制代码
【5】保持连接的时间

检察文档说明:

编写代码:
  1. mqtt_txbuf[mqtt_txlen++] = 0;                        // Keep-alive Time Length MSB   
  2. mqtt_txbuf[mqtt_txlen++] = 100;                // Keep-alive Time Length LSB  100S心跳包
复制代码
【6】 可变报头非规范示例


【7】最后部门:填写客户端ID、用户名、密码。

这里面提到的客户端标识符、用户名、密码。 就是在前面章节创建华为云IOT服务器,得到的MQTT三元组信息。
检察文档:

编写代码:
  1. mqtt_txbuf[mqtt_txlen++] = BYTE1(ClientIDLen);// Client ID length MSB   
  2. mqtt_txbuf[mqtt_txlen++] = BYTE0(ClientIDLen);// Client ID length LSB         
  3. memcpy(&mqtt_txbuf[mqtt_txlen], ClientID, ClientIDLen);
  4. mqtt_txlen += ClientIDLen;
  5. if (UsernameLen > 0)
  6. {
  7.         mqtt_txbuf[mqtt_txlen++] = BYTE1(UsernameLen);                //username length MSB   
  8.         mqtt_txbuf[mqtt_txlen++] = BYTE0(UsernameLen);            //username length LSB   
  9.         memcpy(&mqtt_txbuf[mqtt_txlen], Username, UsernameLen);
  10.         mqtt_txlen += UsernameLen;
  11. }
  12. if (PasswordLen > 0)
  13. {
  14.         mqtt_txbuf[mqtt_txlen++] = BYTE1(PasswordLen);                //password length MSB   
  15.         mqtt_txbuf[mqtt_txlen++] = BYTE0(PasswordLen);            //password length LSB  
  16.         memcpy(&mqtt_txbuf[mqtt_txlen], Password, PasswordLen);
  17.         mqtt_txlen += PasswordLen;
  18. }
复制代码
【8】相应

上面报文封装完毕之后,直接就可以通过网络接口发送出去就行了。
前提是创建套接字,连接上MQTT服务器,然后再将上面封装好的报文发送过去就行了。
关于怎样Windows下怎样创建套接字,连接服务器,前面章节专门讲过了,忘记了可以归去再看看。
编写代码发送出去:
  1. MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
复制代码
这个函数里面的代码: 就是直接调用的网络接口函数发送的。
  1. int result = send(connectSocket,(const char*)buff, len, 0); //向服务器发送数据,检查返回值
  2. if (result == SOCKET_ERROR)
  3. {
  4.         std::cout << "send failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序
  5.         return -1;
  6. }
复制代码
发送过去之后,服务器肯定有返回值的。 失败?乐成? 那么我们得判定。
通过相应章节的文档说明,这里提到一个CONNACK相应报文,是由服务器下发给客户端的。

在3.2小节,讲解了CONNACK报文的字段含义。如果客户端的连接报文是正确的,服务器会下发0x20 0x00 的确认连接报文给客户端,告诉客户端连接乐成。

编写代码: 写代码接收服务器返回的数据,判定返回的数据是不是符合要求。
  1. Client_GetData(buff);//从服务器获取数据
  2. const unsigned char parket_connetAck[] = { 0x20,0x02};
  3. if (mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //连接成功                          
  4. {
  5.         return 0;//连接成功
  6. }
复制代码
CONNACK 报文除了固定报文头之外还有可变报头。也就是后面还有2个字节。 我们可以继承看文档下面的先容。
一个叫连接确认标志,一个叫连接返回码。正确的情况下,这两个值应该为0x00 0x00.

关于返回码的值,我们可以对它进行判定。如果连接失败,也可以知道具体的原因。

【9】完整代码

  1. /*函数功能: 登录服务器函数返回值: 0表示乐成 1表示失败*/unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password){        unsigned short i, j;        int ClientIDLen = (int)strlen(ClientID);        int UsernameLen = (int)strlen(Username);        int PasswordLen = (int)strlen(Password);        unsigned int DataLen;        mqtt_txlen = 0;        unsigned int size = 0;        unsigned char buff[256];                //可变报头+Payload  每个字段包含两个字节的长度标识        DataLen = 10 + (ClientIDLen + 2) + (UsernameLen + 2) + (PasswordLen + 2);        //固定报头        //控制报文类型        mqtt_txbuf[mqtt_txlen++] = 0x10;                //MQTT Message Type CONNECT        //剩余长度(不包罗固定头部)        do        {                unsigned char encodedByte = DataLen % 128;                DataLen = DataLen / 128;                // if there are more data to encode, set the top bit of this byte                if (DataLen > 0)                        encodedByte = encodedByte | 128;                mqtt_txbuf[mqtt_txlen++] = encodedByte;        } while (DataLen > 0);        //可变报头        //协议名        mqtt_txbuf[mqtt_txlen++] = 0;                // Protocol Name Length MSB            mqtt_txbuf[mqtt_txlen++] = 4;           // Protocol Name Length LSB            mqtt_txbuf[mqtt_txlen++] = 'M';                // ASCII Code for M            mqtt_txbuf[mqtt_txlen++] = 'Q';                // ASCII Code for Q            mqtt_txbuf[mqtt_txlen++] = 'T';                // ASCII Code for T            mqtt_txbuf[mqtt_txlen++] = 'T';                // ASCII Code for T            //协议级别        mqtt_txbuf[mqtt_txlen++] = 4;                        // MQTT Protocol version = 4            //连接标志        mqtt_txbuf[mqtt_txlen++] = 0xc2;                // conn flags         mqtt_txbuf[mqtt_txlen++] = 0;                        // Keep-alive Time Length MSB            mqtt_txbuf[mqtt_txlen++] = 100;                // Keep-alive Time Length LSB  100S心跳包          mqtt_txbuf[mqtt_txlen++] = BYTE1(ClientIDLen);// Client ID length MSB            mqtt_txbuf[mqtt_txlen++] = BYTE0(ClientIDLen);// Client ID length LSB                  memcpy(&mqtt_txbuf[mqtt_txlen], ClientID, ClientIDLen);        mqtt_txlen += ClientIDLen;        if (UsernameLen > 0)        {                mqtt_txbuf[mqtt_txlen++] = BYTE1(UsernameLen);                //username length MSB                    mqtt_txbuf[mqtt_txlen++] = BYTE0(UsernameLen);            //username length LSB                    memcpy(&mqtt_txbuf[mqtt_txlen], Username, UsernameLen);                mqtt_txlen += UsernameLen;        }        if (PasswordLen > 0)        {                mqtt_txbuf[mqtt_txlen++] = BYTE1(PasswordLen);                //password length MSB                    mqtt_txbuf[mqtt_txlen++] = BYTE0(PasswordLen);            //password length LSB                  memcpy(&mqtt_txbuf[mqtt_txlen], Password, PasswordLen);                mqtt_txlen += PasswordLen;        }        for (i = 0; i < 5; i++)        {                memset(mqtt_rxbuf, 0, mqtt_rxlen);                MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
  2.                 size = Client_GetData(buff);//从服务器获取数据                if (size <= 0)continue;                memcpy(mqtt_rxbuf, buff, size);                printf("登录应答:\r\n");                for (j = 0; j < size; j++)                {                        printf("%#X ", buff[j]);                }                printf("\r\n");                if (mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //连接乐成                                           {                        return 0;//连接乐成                }        }        return 1;}
复制代码
6.6 实现MQTT_PublishData函数

【1】检察文档说明

和前面一章节一样,看文档说明编写代码。

【2】固定报文头

通过文档相识到,发布消息的固定报文头由2个字节组成。第一个字节每个位的组成含义可看文档的表格先容。

最高4位是MQTT控制报文类型,固定的值:0x3
后面4个字节分为是DUP(重发标志)、QOS品级(服务质量品级)、RETAIN(消息的保存标志)。
DUP(重发标志):

QOS品级(服务质量品级):

RETAIN(保存标志–固定位0):

那么经过文档的解释,我们编写代码如下: (关于后面4个字节可以根据自己的需求设置)
  1. //固定报头
  2. //控制报文类型
  3. mqtt_txbuf[mqtt_txlen++] = 0x30;    // MQTT Message Type PUBLISH  
复制代码
【3】剩余字段长度

固定报文头的第2个字节是填写剩余字段长度。

在文档里对剩余长度字段盘算的先容:

剩余长度字段: 等于可变报头的长度加上有效载荷的长度。
这里先要相识: 什么是可变报头? 什么是有效载荷的长度?
往下翻文档,可看到可变报头的先容。 **可变报头是:按次序包含主题名和报文标识符。 ** 而报文标识符只有在QOS为1大概2的时候才有。

再往下翻文档,可看到有效载荷的长度的先容。有效载荷包含将被发布的应用消息。数据的内容和格式是应用特定的。有效载荷的长度这样盘算:用固定 报头中的剩余长度字段的值减去可变报头的长度。包含零长度有效载荷的 PUBLISH 报文是正当的。

经过文档的先容,我们知道了剩余长度字段怎样先容。
那么这个剩余长度字段盘算出来之后,怎样赋值到报文数组里去? 其着实Connect报文的固定字段第2个字节也是要天剩余长度字段,在Connect报文的章节已经先容过,在文档的2.2.3小节的有详细说明,那么翻到2.2.3小节,相识怎样填写剩余长度值。 (其实我们上一节已经先容了一遍)


接下来就编写代码:
  1. unsigned int topicLength = (int)strlen(topic);  //计算主题的长度
  2. unsigned int messageLength = (int)strlen(message); //计算发送的消息长度
  3. unsigned int DataLen; //保存最终长度
  4. //有效载荷的长度这样计算:用固定报头中的剩余长度字段的值减去可变报头的长度
  5. //QOS为0时没有标识符
  6. //数据长度             主题名   报文标识符   有效载荷
  7. if (qos)        DataLen = (2 + topicLength) + 2 + messageLength;
  8. else        DataLen = (2 + topicLength) + messageLength;
  9. //剩余长度
  10. do
  11. {
  12.     unsigned char encodedByte = DataLen % 128;
  13.     DataLen = DataLen / 128;
  14.     // if there are more data to encode, set the top bit of this byte
  15.     if (DataLen > 0)
  16.         encodedByte = encodedByte | 128;
  17.     mqtt_txbuf[mqtt_txlen++] = encodedByte;
  18. } while (DataLen > 0);
复制代码
【4】可变报头


PUBLISH 报文可变报头非规范示例:

编写代码:
  1. mqtt_txbuf[mqtt_txlen++] = BYTE1(topicLength);//主题长度MSB
  2. mqtt_txbuf[mqtt_txlen++] = BYTE0(topicLength);//主题长度LSB
  3. memcpy(&mqtt_txbuf[mqtt_txlen], topic, topicLength);//拷贝主题
  4. mqtt_txlen += topicLength;
  5. //报文标识符
  6. if (qos)
  7. {
  8.         mqtt_txbuf[mqtt_txlen++] = BYTE1(id);
  9.         mqtt_txbuf[mqtt_txlen++] = BYTE0(id);
  10.         id++;
  11. }
复制代码
接着就添加有效载荷,也就是现实发送的消息内容。

编写代码:
  1. memcpy(&mqtt_txbuf[mqtt_txlen], message, messageLength);
  2. mqtt_txlen += messageLength;
复制代码
最后将报文发送出去:
  1. MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
复制代码
【5】完整代码

  1. //MQTT发布数据打包函数//topic   主题 //message 消息//qos     消息品级 unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos){        unsigned int topicLength = (int)strlen(topic);        unsigned int messageLength = (int)strlen(message);        unsigned short id = 0;        unsigned int DataLen;        mqtt_txlen = 0;        printf("上报JSON消息长度:%d\r\n", messageLength);        printf("message=%s\r\n", message);        //有效载荷的长度这样盘算:用固定报头中的剩余长度字段的值减去可变报头的长度        //QOS为0时没有标识符        //数据长度             主题名   报文标识符   有效载荷        if (qos)        DataLen = (2 + topicLength) + 2 + messageLength;        else        DataLen = (2 + topicLength) + messageLength;        //固定报头        //控制报文类型        mqtt_txbuf[mqtt_txlen++] = 0x30;    // MQTT Message Type PUBLISH          //剩余长度        do        {                unsigned char encodedByte = DataLen % 128;                DataLen = DataLen / 128;                // if there are more data to encode, set the top bit of this byte                if (DataLen > 0)                        encodedByte = encodedByte | 128;                mqtt_txbuf[mqtt_txlen++] = encodedByte;        } while (DataLen > 0);        mqtt_txbuf[mqtt_txlen++] = BYTE1(topicLength);//主题长度MSB        mqtt_txbuf[mqtt_txlen++] = BYTE0(topicLength);//主题长度LSB         memcpy(&mqtt_txbuf[mqtt_txlen], topic, topicLength);//拷贝主题        mqtt_txlen += topicLength;        //报文标识符        if (qos)        {                mqtt_txbuf[mqtt_txlen++] = BYTE1(id);                mqtt_txbuf[mqtt_txlen++] = BYTE0(id);                id++;        }        memcpy(&mqtt_txbuf[mqtt_txlen], message, messageLength);        mqtt_txlen += messageLength;        MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
  2.         return mqtt_txlen;}
复制代码
【6】发布确认

如果消息质量大于0,那么可以继承看文档下面的发布确认,对本次发送的消息进行相应处理。


6.7 实现 MQTT_SubscribeTopic 函数

【1】检察文档:订阅主题的格式

和前面一样,检察文档的说明,编写代码。

【2】检察文档:取消订阅的格式


【3】固定报头

订阅主题和取消订阅主题格式一样的,只是固定报头不一样。
可以封装一个函数,传入一个参数实现两种功能。
编写判定代码:
  1. //固定报头
  2. //控制报文类型
  3. if (whether)mqtt_txbuf[mqtt_txlen++] = 0x82; //消息类型和标志订阅
  4. else        mqtt_txbuf[mqtt_txlen++] = 0xA2;    //取消订阅
复制代码
剩余长度字段:

编写代码:
  1. unsigned int topiclen = (int)strlen(topic);
  2. unsigned int DataLen = 2 + (topiclen + 2) + (whether ? 1 : 0);//可变报头的长度(2字节)加上有效载荷的长度
复制代码
剩余长度字段的填写规则与前面一样。
编写代码:
  1. //剩余长度
  2. do
  3. {
  4.     unsigned char encodedByte = DataLen % 128;
  5.     DataLen = DataLen / 128;
  6.     // if there are more data to encode, set the top bit of this byte
  7.     if (DataLen > 0)
  8.         encodedByte = encodedByte | 128;
  9.     mqtt_txbuf[mqtt_txlen++] = encodedByte;
  10. } while (DataLen > 0);
复制代码
【4】可变报头


编写代码:
  1. //可变报头
  2. mqtt_txbuf[mqtt_txlen++] = 0;                        //消息标识符 MSB
  3. mqtt_txbuf[mqtt_txlen++] = 0x01;        //消息标识符 LSB
复制代码

【5】完整代码

  1. /*函数功能: MQTT订阅/取消订阅数据打包函数函数参数:        topic       主题        qos         消息品级 0:最多分发一次  1: 至少分发一次  2: 仅分发一次        whether     订阅/取消订阅哀求包 (1表示订阅,0表示取消订阅)返回值: 0表示乐成 1表示失败*/unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether){        unsigned char i, j;        mqtt_txlen = 0;        unsigned int size = 0;        unsigned char buff[256];        unsigned int topiclen = (int)strlen(topic);        unsigned int DataLen = 2 + (topiclen + 2) + (whether ? 1 : 0);//可变报头的长度(2字节)加上有效载荷的长度        //固定报头        //控制报文类型        if (whether)mqtt_txbuf[mqtt_txlen++] = 0x82; //消息类型和标志订阅        else        mqtt_txbuf[mqtt_txlen++] = 0xA2;    //取消订阅        //剩余长度        do        {                unsigned char encodedByte = DataLen % 128;                DataLen = DataLen / 128;                // if there are more data to encode, set the top bit of this byte                if (DataLen > 0)                        encodedByte = encodedByte | 128;                mqtt_txbuf[mqtt_txlen++] = encodedByte;        } while (DataLen > 0);        //可变报头        mqtt_txbuf[mqtt_txlen++] = 0;                        //消息标识符 MSB        mqtt_txbuf[mqtt_txlen++] = 0x01;        //消息标识符 LSB        //有效载荷        mqtt_txbuf[mqtt_txlen++] = BYTE1(topiclen);//主题长度 MSB        mqtt_txbuf[mqtt_txlen++] = BYTE0(topiclen);//主题长度 LSB           memcpy(&mqtt_txbuf[mqtt_txlen], topic, topiclen);        mqtt_txlen += topiclen;        if (whether)        {                mqtt_txbuf[mqtt_txlen++] = qos;//QoS级别        }        for (i = 0; i < 100; i++)        {                memset(mqtt_rxbuf, 0, mqtt_rxlen);                MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
  2.                 //printf("订阅消息发布乐成\n");                size = Client_GetData(buff);//从服务器获取数据                if (size <= 0)                {                        continue;                }                memcpy(mqtt_rxbuf, buff, size);                printf("订阅应答:\r\n");                for (j = 0; j < size; j++)                {                        printf("%#X ", buff[j]);                }                printf("\r\n");                if (mqtt_rxbuf[0] == parket_subAck[0] && mqtt_rxbuf[1] == parket_subAck[1]) //连接乐成                                           {                        return 0;//连接乐成                }                Sleep(1000);        }        return 1; //失败}
复制代码
七、运行项目、连接华为云服务器

7.1 整个项目的完整代码

前面章节陆续已经编写好了重要的函数,那么这里就贴出我编写好的整体的代码,进行运行测试:

  1. #include <stdio.h>#include <stdlib.h>#include <time.h>#pragma warning(disable:4996)#include <string.h>#include <stdio.h>#include <iostream>#include <winsock2.h>#include <ws2tcpip.h>#pragma comment(lib, "ws2_32.lib") //告诉编译器链接Winsock库//---------------------------------------MQTT协议相干的子函数声明-------------------------------------------------------//发布主题
  2. unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos);
  3. //订阅或者取消订阅主题
  4. unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether);
  5. //登录MQTT服务器
  6. unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password);
  7. //MQTT协议缓冲区初始化void MQTT_Init(void);//调用底层接口发送数据包void MQTT_SendBuf(unsigned char* buf, unsigned short len);//MQTT协议里最底层的接口,最底层的如果要移植协议到其他地方运行,那么改这里就行了。其他地方不用改的。int Client_SendData(unsigned char* buff, unsigned int len);//发送数据到服务器int Client_GetData(unsigned char* buff);//从服务器获取数据//---------------------------------------全局变量界说--------------------------------------------------------------------#define BYTE0(dwTemp)       (*( char *)(&dwTemp))#define BYTE1(dwTemp)       (*((char *)(&dwTemp) + 1))#define BYTE2(dwTemp)       (*((char *)(&dwTemp) + 2))#define BYTE3(dwTemp)       (*((char *)(&dwTemp) + 3))unsigned char mqtt_rxbuf[1024 * 1024];//发送数据缓存区unsigned char mqtt_txbuf[256];//接收数据缓存区
  8. unsigned int mqtt_rxlen;unsigned int mqtt_txlen;typedef enum{        //名字             值                         报文流动方向         描述        M_RESERVED1 = 0,        //        禁止        保存        M_CONNECT,        //        客户端到服务端        客户端哀求连接服务端        M_CONNACK,        //        服务端到客户端        连接报文确认        M_PUBLISH,        //        两个方向都允许        发布消息        M_PUBACK,        //        两个方向都允许        QoS 1消息发布收到确认        M_PUBREC,        //        两个方向都允许        发布收到(包管交付第一步)        M_PUBREL,        //        两个方向都允许        发布开释(包管交付第二步)        M_PUBCOMP,        //        两个方向都允许        QoS 2消息发布完成(包管交互第三步)        M_SUBSCRIBE,        //        客户端到服务端        客户端订阅哀求        M_SUBACK,        //        服务端到客户端        订阅哀求报文确认        M_UNSUBSCRIBE,        //        客户端到服务端        客户端取消订阅哀求        M_UNSUBACK,        //        服务端到客户端        取消订阅报文确认        M_PINGREQ,        //        客户端到服务端        心跳哀求        M_PINGRESP,        //        服务端到客户端        心跳相应        M_DISCONNECT,        //        客户端到服务端        客户端断开连接        M_RESERVED2,        //        禁止        保存}_typdef_mqtt_message;//连接乐成服务器回应 20 02 00 00//客户端主动断开连接 e0 00const unsigned char parket_connetAck[] = { 0x20,0x02,0x00,0x00 };const unsigned char parket_disconnet[] = { 0xe0,0x00 };const unsigned char parket_heart[] = { 0xc0,0x00 };const unsigned char parket_heart_reply[] = { 0xc0,0x00 };const unsigned char parket_subAck[] = { 0x90,0x03 };void MQTT_Init(void){        //缓冲区赋值        mqtt_rxlen = sizeof(mqtt_rxbuf);        mqtt_txlen = sizeof(mqtt_txbuf);        memset(mqtt_rxbuf, 0, mqtt_rxlen);        memset(mqtt_txbuf, 0, mqtt_txlen);}/*函数功能: 登录服务器函数返回值: 0表示乐成 1表示失败*/unsigned char MQTT_Connect(char* ClientID, char* Username, char* Password){        unsigned short i, j;        int ClientIDLen = (int)strlen(ClientID);        int UsernameLen = (int)strlen(Username);        int PasswordLen = (int)strlen(Password);        unsigned int DataLen;        mqtt_txlen = 0;        unsigned int size = 0;        unsigned char buff[256];                //可变报头+Payload  每个字段包含两个字节的长度标识        DataLen = 10 + (ClientIDLen + 2) + (UsernameLen + 2) + (PasswordLen + 2);        //固定报头        //控制报文类型        mqtt_txbuf[mqtt_txlen++] = 0x10;                //MQTT Message Type CONNECT        //剩余长度(不包罗固定头部)        do        {                unsigned char encodedByte = DataLen % 128;                DataLen = DataLen / 128;                // if there are more data to encode, set the top bit of this byte                if (DataLen > 0)                        encodedByte = encodedByte | 128;                mqtt_txbuf[mqtt_txlen++] = encodedByte;        } while (DataLen > 0);        //可变报头        //协议名        mqtt_txbuf[mqtt_txlen++] = 0;                // Protocol Name Length MSB            mqtt_txbuf[mqtt_txlen++] = 4;           // Protocol Name Length LSB            mqtt_txbuf[mqtt_txlen++] = 'M';                // ASCII Code for M            mqtt_txbuf[mqtt_txlen++] = 'Q';                // ASCII Code for Q            mqtt_txbuf[mqtt_txlen++] = 'T';                // ASCII Code for T            mqtt_txbuf[mqtt_txlen++] = 'T';                // ASCII Code for T            //协议级别        mqtt_txbuf[mqtt_txlen++] = 4;                        // MQTT Protocol version = 4            //连接标志        mqtt_txbuf[mqtt_txlen++] = 0xc2;                // conn flags         mqtt_txbuf[mqtt_txlen++] = 0;                        // Keep-alive Time Length MSB            mqtt_txbuf[mqtt_txlen++] = 100;                // Keep-alive Time Length LSB  100S心跳包          mqtt_txbuf[mqtt_txlen++] = BYTE1(ClientIDLen);// Client ID length MSB            mqtt_txbuf[mqtt_txlen++] = BYTE0(ClientIDLen);// Client ID length LSB                  memcpy(&mqtt_txbuf[mqtt_txlen], ClientID, ClientIDLen);        mqtt_txlen += ClientIDLen;        if (UsernameLen > 0)        {                mqtt_txbuf[mqtt_txlen++] = BYTE1(UsernameLen);                //username length MSB                    mqtt_txbuf[mqtt_txlen++] = BYTE0(UsernameLen);            //username length LSB                    memcpy(&mqtt_txbuf[mqtt_txlen], Username, UsernameLen);                mqtt_txlen += UsernameLen;        }        if (PasswordLen > 0)        {                mqtt_txbuf[mqtt_txlen++] = BYTE1(PasswordLen);                //password length MSB                    mqtt_txbuf[mqtt_txlen++] = BYTE0(PasswordLen);            //password length LSB                  memcpy(&mqtt_txbuf[mqtt_txlen], Password, PasswordLen);                mqtt_txlen += PasswordLen;        }        for (i = 0; i < 5; i++)        {                memset(mqtt_rxbuf, 0, mqtt_rxlen);                MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
  9.                 size = Client_GetData(buff);//从服务器获取数据                if (size <= 0)continue;                memcpy(mqtt_rxbuf, buff, size);                printf("登录应答:\r\n");                for (j = 0; j < size; j++)                {                        printf("%#X ", buff[j]);                }                printf("\r\n");                if (mqtt_rxbuf[0] == parket_connetAck[0] && mqtt_rxbuf[1] == parket_connetAck[1]) //连接乐成                                           {                        return 0;//连接乐成                }        }        return 1;}/*函数功能: MQTT订阅/取消订阅数据打包函数函数参数:        topic       主题        qos         消息品级 0:最多分发一次  1: 至少分发一次  2: 仅分发一次        whether     订阅/取消订阅哀求包 (1表示订阅,0表示取消订阅)返回值: 0表示乐成 1表示失败*/unsigned char MQTT_SubscribeTopic(char* topic, unsigned char qos, unsigned char whether){        unsigned char i, j;        mqtt_txlen = 0;        unsigned int size = 0;        unsigned char buff[256];        unsigned int topiclen = (int)strlen(topic);        unsigned int DataLen = 2 + (topiclen + 2) + (whether ? 1 : 0);//可变报头的长度(2字节)加上有效载荷的长度        //固定报头        //控制报文类型        if (whether)mqtt_txbuf[mqtt_txlen++] = 0x82; //消息类型和标志订阅        else        mqtt_txbuf[mqtt_txlen++] = 0xA2;    //取消订阅        //剩余长度        do        {                unsigned char encodedByte = DataLen % 128;                DataLen = DataLen / 128;                // if there are more data to encode, set the top bit of this byte                if (DataLen > 0)                        encodedByte = encodedByte | 128;                mqtt_txbuf[mqtt_txlen++] = encodedByte;        } while (DataLen > 0);        //可变报头        mqtt_txbuf[mqtt_txlen++] = 0;                        //消息标识符 MSB        mqtt_txbuf[mqtt_txlen++] = 0x01;        //消息标识符 LSB        //有效载荷        mqtt_txbuf[mqtt_txlen++] = BYTE1(topiclen);//主题长度 MSB        mqtt_txbuf[mqtt_txlen++] = BYTE0(topiclen);//主题长度 LSB           memcpy(&mqtt_txbuf[mqtt_txlen], topic, topiclen);        mqtt_txlen += topiclen;        if (whether)        {                mqtt_txbuf[mqtt_txlen++] = qos;//QoS级别        }        for (i = 0; i < 100; i++)        {                memset(mqtt_rxbuf, 0, mqtt_rxlen);                MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
  10.                 //printf("订阅消息发布乐成\n");                size = Client_GetData(buff);//从服务器获取数据                if (size <= 0)                {                        continue;                }                memcpy(mqtt_rxbuf, buff, size);                printf("订阅应答:\r\n");                for (j = 0; j < size; j++)                {                        printf("%#X ", buff[j]);                }                printf("\r\n");                if (mqtt_rxbuf[0] == parket_subAck[0] && mqtt_rxbuf[1] == parket_subAck[1]) //连接乐成                                           {                        return 0;//连接乐成                }                Sleep(1000);        }        return 1; //失败}//MQTT发布数据打包函数//topic   主题 //message 消息//qos     消息品级 unsigned char MQTT_PublishData(char* topic, char* message, unsigned char qos){        unsigned int topicLength = (int)strlen(topic);        unsigned int messageLength = (int)strlen(message);        unsigned short id = 0;        unsigned int DataLen;        mqtt_txlen = 0;        printf("上报JSON消息长度:%d\r\n", messageLength);        printf("message=%s\r\n", message);        //有效载荷的长度这样盘算:用固定报头中的剩余长度字段的值减去可变报头的长度        //QOS为0时没有标识符        //数据长度             主题名   报文标识符   有效载荷        if (qos)        DataLen = (2 + topicLength) + 2 + messageLength;        else        DataLen = (2 + topicLength) + messageLength;        //固定报头        //控制报文类型        mqtt_txbuf[mqtt_txlen++] = 0x30;    // MQTT Message Type PUBLISH          //剩余长度        do        {                unsigned char encodedByte = DataLen % 128;                DataLen = DataLen / 128;                // if there are more data to encode, set the top bit of this byte                if (DataLen > 0)                        encodedByte = encodedByte | 128;                mqtt_txbuf[mqtt_txlen++] = encodedByte;        } while (DataLen > 0);        mqtt_txbuf[mqtt_txlen++] = BYTE1(topicLength);//主题长度MSB        mqtt_txbuf[mqtt_txlen++] = BYTE0(topicLength);//主题长度LSB         memcpy(&mqtt_txbuf[mqtt_txlen], topic, topicLength);//拷贝主题        mqtt_txlen += topicLength;        //报文标识符        if (qos)        {                mqtt_txbuf[mqtt_txlen++] = BYTE1(id);                mqtt_txbuf[mqtt_txlen++] = BYTE0(id);                id++;        }        memcpy(&mqtt_txbuf[mqtt_txlen], message, messageLength);        mqtt_txlen += messageLength;        MQTT_SendBuf(mqtt_txbuf, mqtt_txlen);
  11.         return mqtt_txlen;}void MQTT_SendBuf(unsigned char* buf, unsigned short len){        Client_SendData(buf, len);//发送数据到服务器}//-----------------------------------------MQTT服务器的参数------------------------------------------------------------//服务器IP#define SERVER_IP "117.78.5.125"#define SERVER_PORT 1883 //端口号//MQTT三元组#define ClientID "65697df3585c81787ad4da82_stm32_0_0_2023120106"#define Username "65697df3585c81787ad4da82_stm32"#define Password "12cc9b1f637da8d755fa2cbd007bb669e6f292e3e63017538b5e6e13eef0cf58"//密文 //订阅主题:#define SET_TOPIC  "$oc/devices/65697df3585c81787ad4da82_stm32/sys/messages/down"//订阅//发布主题:#define POST_TOPIC "$oc/devices/65697df3585c81787ad4da82_stm32/sys/properties/report"//发布//-----------------------------------------主函数------------------------------------------------------------char mqtt_message[1024];//数据缓存区SOCKET connectSocket; //网络套接字WSADATA wsaData; //创建一个结构体变量,用于存储关于Winsock库的信息double TEMP = 10.0;int main(){        int result = WSAStartup(MAKEWORD(2, 2), &wsaData); //初始化Winsock库,指定版本号2.2,检查返回值        if (result != 0)        {                printf("WSAStartup failed: %d\r\n", result);//输出错误信息并退出步调                return 1;        }         connectSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); //创建一个TCP套接字,检查返回值        if (connectSocket == INVALID_SOCKET)        {                printf("socket failed with error: %d", WSAGetLastError());//输出错误信息并退出步调                WSACleanup(); //扫除Winsock库                return 1;        }        sockaddr_in service; //创建一个结构体变量,用于存储服务器地点信息        service.sin_family = AF_INET; //指定地点族为IPv4        inet_pton(AF_INET, SERVER_IP, &service.sin_addr); //将字符串类型的IP地点转换为二进制网络字节序的IP地点,并存储在结构体中        service.sin_port = htons(SERVER_PORT); //将端口号从主机字节序转换为网络字节序,并存储在结构体中        result = connect(connectSocket, (SOCKADDR*)&service, sizeof(service)); //连接到服务器,检查返回值        if (result == SOCKET_ERROR)        {                std::cout << "connect failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出步调                closesocket(connectSocket); //关闭套接字                WSACleanup(); //扫除Winsock库                return 1;        }        std::cout << "Connected to server." << std::endl; //连接乐成,输出消息        MQTT_Init();        while (1)        {                /*登录服务器*/                if (MQTT_Connect((char*)ClientID, (char*)Username, (char*)Password) == 0)                {                        break;                }                // 延时1000毫秒,即1秒                Sleep(1000);                printf("MQTT服务器登录校验中....\n");        }        printf("连接乐成_666\r\n");        //订阅物联网平台数据        int stat = MQTT_SubscribeTopic((char*)SET_TOPIC, 1, 1);        if (stat)        {                printf("订阅失败\r\n");                closesocket(connectSocket); //关闭套接字                WSACleanup(); //扫除Winsock库                return 1;        }        printf("订阅乐成\r\n");        /*创建线程*/        while (1)        {                sprintf(mqtt_message, "{"services": [{"service_id": "stm32","properties":{"TEMP":%.1f}}]}", (double)(TEMP+=0.2));//温度                                //发布主题                MQTT_PublishData((char*)POST_TOPIC, mqtt_message, 0);                printf("发布消息乐成\r\n");                Sleep(5000);        }}/*发送数据到服务器*/int Client_SendData(unsigned char* buff, unsigned int len){        int result = send(connectSocket,(const char*)buff, len, 0); //向服务器发送数据,检查返回值        if (result == SOCKET_ERROR)        {                std::cout << "send failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出步调                return -1;        }        return 0;}/*获取服务器下发数据*/int Client_GetData(unsigned char* buff){        int result = recv(connectSocket, (char*)buff,200, 0); //从服务器接收数据,检查返回值        if (result == SOCKET_ERROR)        {                std::cout << "recv failed with error: " << WSAGetLastError() << std::endl; //输出错误信息并退出步调                return -1;        }        return result;}
复制代码
7.2 代码里核心的地方

这里填写MQTT服务器的信息,也就是前面创建华为云IOT服务器得到的信息。

这里是主函数,登录服务器后订阅主题,发布消息。

7.3 编译运行代码

按下Ctrl+F5 运行代码。 弹出控制台窗口之后,可以看到,我们已经连接了华为云MQTT服务器,并且完成数据上传。

7.4 登录华为云IOT云端检察数据

可以看到装备已经在线了。

可以看到我们的消息也在实时的上传。

到此,说明我们的MQTT协议已经封装完成,可以正常的运行了。
八、下发命令的处理

一般MQTT装备端除了上传数据以外,还需要接收MQTT服务器下发的控制命令。
那么我们接下来就完善一下代码,接收华为云MQTT服务器下发的命令,并进行回应。
8.1 添加命令

要测试命令下发,那么起首需要再华为云IOT平台添加命令。

添加一个控制命令。


命令添加完成:

8.2 下发命令测试

留意:下发命令是同步的,装备端必须在线才可以下发命令。

这个下发的命令是有反馈。装备端收到之后,可以向服务器反馈状态,这样服务器才能知道刚才的控制命令确实发送乐成了。
装备收到信息之后,上传回应给服务器的主题和内容格式:
  1. Topic:$oc/devices/{device_id}/sys/commands/response/request_id={request_id}
  2. 数据格式:  
  3. {
  4.     "result_code": 0,
  5.     "response_name": "COMMAND_RESPONSE",
  6.     "paras": {
  7.         "result": "success"
  8.     }
  9. }
复制代码
云端发送控制命令之后,装备收到的消息如下:
  1. $oc/devices/65697df3585c81787ad4da82_stm32/sys/commands/request_id=d49f0bb9-ba87-4c9b-b915-98a1f0fcf689{"paras":{"lock":true},"service_id":"lock","command_name":"锁开关控制"}
复制代码
其中request_id=d49f0bb9-ba87-4c9b-b915-98a1f0fcf689 就是本次的哀求ID。回应的时候需要加上哀求ID。服务器才好对应。
以当前装备为例:
  1. 发布的主题这样填: $oc/devices/65113d05a559fd7cd41435f8_lock1/sys/commands/response/request_id=ce49181e-7636-4b24-946d-c034ca083c1c
  2.    
  3. 发布的内容这样填:
  4. {"result_code":0,"response_name":"COMMAND_RESPONSE","paras":{"result":"success"}}
复制代码
8.3 编写代码

为了可以或许实时接收服务器的代码,我们单独增长一个线程来接收服务器的消息。
在主函数里MQTT服务器连接乐成之后,增长以下代码:
  1. /*创建线程*/
  2. HANDLE hThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)ReceiveData, NULL, 0, NULL);
  3. if (hThread == NULL) {
  4.         printf("CreateThread failed.\n");
  5.         return 1;
  6. }
复制代码
编写线程的工作函数:
  1. // 处理服务器下发的数据
  2. void ReceiveData(void)
  3. {
  4.         // 接收数据
  5.         char buffer[1024];
  6.         char request_id[100];
  7.         char send_cmd[500];
  8.         int recvSize;
  9.         while (1)
  10.         {
  11.                 //等待服务器下发消息
  12.                 recvSize = recv(connectSocket, buffer, 1024, 0);
  13.                 if (recvSize == SOCKET_ERROR)
  14.                 {
  15.                         std::cout << "网络断开连接: " << WSAGetLastError() << std::endl; //输出错误信息并退出程序
  16.                         return;
  17.                 }
  18.                 if (recvSize > 0)
  19.                 {
  20.                         printf("服务器下发消息:\r\n");
  21.                         //接收下发的数据
  22.                         for (int i = 0; i < recvSize; i++)
  23.                         {
  24.                                 printf("%c", buffer[i]);
  25.                         }
  26.                         printf("\r\n");
  27.                         //下发指令请求回应给服务器(命令下发)
  28.                         if (strstr((char*)&buffer[5], "sys/commands/request_id="))
  29.                         {
  30.                                 char* p = NULL;
  31.                                 p = strstr((char*)&buffer[5], "request_id=");
  32.                                 if (p)
  33.                                 {
  34.                                         //解析数据
  35.                                         //$oc/devices/65697df3585c81787ad4da82_stm32/sys/commands/request_id=6e925cc1-4a8d-4eab-8d85-6c7f15d72189
  36.                                         strncpy(request_id, p, 47);
  37.                                 }
  38.                                 //上报数据
  39.                                 sprintf(mqtt_message, "{"result_code":0,"response_name":"COMMAND_RESPONSE","paras":{"result":"success"}}");
  40.                                 sprintf(send_cmd, "$oc/devices/65697df3585c81787ad4da82_stm32/sys/commands/response/%s", request_id);
  41.                                 MQTT_PublishData(send_cmd, mqtt_message, 0);
  42.                                 printf("(命令)发布主题:%s\r\n", send_cmd);
  43.                                 printf("(命令)发布数据:%s\r\n", mqtt_message);
  44.                         }
  45.                 }               
  46.         }
  47. }
复制代码
8.4 运行代码测试

先运行客户端的代码,登录MQTT服务器。

然后,在华为云IOT平台下发命令。 如果点击下发之后,右上角弹出了 命令下发乐成,就表示我们客户端代码写OK了。

我们看装备端收到的消息:

九、总结

到此,我们的MQTT协议已经开发完成了。如果大家详细阅读了文章,并且跟着步骤操作了一次,相信你现在对MQTT协议应该有所认识了。我是DS小龙哥,欢迎关注我,后续会有更多的技能文章、项目文章发布。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4