一、MQTT协议
MQTT协议在lot领域是利用的最广泛的通用协议,MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
mqtt协议的优点,官方的解释是:用极少的代码和有限的带宽,为毗连远程设备提供实时可靠的消息服务;简朴明白就是,实现起来简朴,而且在传输上无效的数据也很少,而且能够包管数据传输的可靠性。
二、协议详解
2.1 协议结构
MQTT协议由三部分构成,固定报头,可变报头,有效载荷;固定报头是所有的报文同一的格式,可变抱头则根据固定抱头中的报文类型不同基本不同,每个报文类型基本上都有自己的可变报头格式。
2.1.1 固定报头
固定报头是MQTT协议开头,2个字节,分为三个部分:标志位、报文类型、剩余长度,下图为固定报头结构表示图:
固定报头结构
第一个字节分为2个部分:标志位和报文类型,一个字节有8位二进制,前面4位用来做标志位
后面4位用来表现表现具体的报文类型,一共有16种,有部分类型是预留未利用的类型,在代码实现时,可以暂时忽略;具体的报文类型如下表所示:
第二个字节是剩余长度,其表现的就是在剩余长度之后的数据长度(还有多少个字节),剩余长度的字节数是不固定的,至少一个字节,最多4个字节,所以固定报头中包含的一个字节就是剩余长度的第一个字节;这里需要阐明一下,剩余长度的每一个字节的最高位是一个标志位,用来表现下一个字节是否也属于剩余长度!
举例阐明:
比如:
0100 0000(最高位为0) 则阐明后面的字节数据不属于剩余长度了
1010 0000
(第一个字节,最高位为1)
0000 0001
(第二个字节,最高位为0)
则阐明后面的字节数据属于剩余长度,第2个字节最高位为0,则阐明后面没有属于剩余长度的字节了,那么此剩余长度则为这2个字节
因为剩余长度的每个字节的最高位为标志位,所以其真实值并不是这些字节直接合并而成,需要另外进行盘算;
如果第一个字节的最高位为0,则阐明剩余长度只有一个字节,而且其值就是这个字节的值,如:
那么剩余长度的真实值就是:64
如果剩余长度高出一个字节,那么就需要将每个字节的高字节去掉,然后构成一个新的数据,盘算其值。
下面举例阐明:
2个字节长度的剩余长度盘算
第一个字节第二个字节1010 00000000 0001 去掉标志位后,其新值为
第一个字节第二个字节010 0000000 0001 合并时,后面的字节在高位,则合并后值为:1010 0000,则剩余长度的值为十进制的160;
三个字节长度的剩余长度盘算
第一个字节第二个字节第三个字节1001 00011100 10010100 1001 去掉标志位后,其新值为
第一个字节第二个字节第三个字节001 0001100 1001100 1001 后面的字节在高位,则合并后值为:100 1001 100 1001 001 0001,则剩余长度的值为十进制的1205393;
四个字节长度的剩余长度盘算
第一个字节第二个字节第三个字节第四个个字节1001 00011100 10011100 10010100 1101 去掉标志位后,其新值为
第一个字节第二个字节第三个字节第四个个字节001 0001100 1001100 1001100 1101
后面的字节在高位,则合并后值为:100 1101 100 1001 100 1001 001 0001,则剩余长度的值为十进制的162686097;
2.1.2 可变报头
可变报头的数据依据固定报头中的报文类型而不同,一般是包含和报文类型相关的数据;
比方客户端毗连服务器报文,包含了协议名,协议级别(用来表现mqtt的版本信息)、毗连标志、保持毗连;可变报头内容表示图如下:
发布消息报文可变报头,则内容仅包含了主题名和报文标识符(仅当标志位Qos>0)
2.1.3 有效载荷
有效载荷内容也是根据报文类型的不同而不同,在上述可变报头的2张表示图中也能看出,毗连服务器报文的有效载荷则包含了:客户端标识符、遗属主题、遗属消息、用户、暗码,而发布消息报文,则仅仅包含了应用消息;
不同的有效载荷剖析方式有所不同,具体要看每一条报文的规定,详情可以参见末了一个章节提供的Mqtt官方手册。
三、具体协议报文详解(重要)
3.1 毗连服务器报文详解
起首来看看毗连服务器报文协议,先贴出具体的一条毗连服务器报文协议数据(16进制数,每2个数字表现一个字节)
- 10ab0100044d51545404c2001400177061686f31363735313537353030373437303030303030000464656d6f00803846334238444532464443384244334437393242453737454143343132303130393731373635453542444436433439394144434545383430434534343142444546313745333036383442443935434137303846353530323232323243433631363144304432334332444643423132463841433939384635394537323133333933
复制代码
先把团体的剖析数据罗列出,再进行详细的剖析阐明:
3.1.1、固定报文
第1个字节:10,二进制则为:0001 0000(位置从后往前数) ,前4位为0000,则标志位全为0,标志位数据表现的意思需要根据报文类型来确定;后4位0001,值为1,则表现报文类型为客户端毗连服务器(CONNECT)
第2个字节:ab,二进制为:1010 1011,第2个字节为剩余长度的起始字节,其最高位为1,则阐明后面一个字节也属于剩余长度
3.1.2、可变报文
第3个字节:01,二进制为:0000 0001 最高位为0,则剩余长度结束,共包含了2个字节,依据前面剩余长度的算法解释,可得出剩余长度等于171,表现该报文后面还有171个字节。
根据官方文档的解释:毗连服务器报文可变报头的起始2个字节为协议名的长度;前面已经剖析了3个字节;
第4个字节至第5个字节则为协议名称的长度:00 04 协议名长度为4,
第6-9个字节为协议名,6-9字节为:4d 51 54 54 ,根据协议规定,协议名为ascii码值,则协议名剖析为:MQTT
第10个字节为协议级别:04 (官方解释:0x04 表现3.1.1版本)
接下来的第11个字节为 毗连标志,毗连标志位的详细解释如下图所示:
第11个字节:c2 二进制为:1100 0010,那么会话清理(clean session)为1,表现丢弃之前的会话,开始一个新的会话;另外用户名和暗码标志位为1,则阐明在有效载荷中携带了用户名和暗码信息,遗属标志位为0,则有效载荷中就没有遗属主题和遗属消息
第12-13字节:00 14,用来表现保持毗连的时间(秒),即为10十进制:20秒
到此处可变报头就剖析完了,接下来就是剖析有效载荷部分了,毗连服务器报文有效载荷格式为前个字节为字段长度,然后接字段值的格式
3.1.3、有效载荷
第14-15字节:00 17,用来表现客户端标识符长度,即为10十进制:23个字节
第16-38字节:7061686f31363735313537353030373437303030303030 ,表现客户端标识符,客服端标识符为ascii码格式,则此处为:paho1675157500747000000
第39-40字节:00 04,用来表现用户长度,即为10十进制:4个字节
第41-44字节:64 65 6d 6f,用来表现用户名,ascii码为(demo)
第45-46字节:00 80 ,用来表现暗码长度 ,即为10进制:128
第47-174字节:
3846334238444532464443384244334437393242453737454143343132303130393731373635453542444436433439394144434545383430434534343142444546313745333036383442443935434137303846353530323232323243433631363144304432334332444643423132463841433939384635394537323133333933
即为暗码,也是利用ascii码
至此我们已经完整的剖析了一条mqtt客户端毗连服务器报文,源代码如下:
- int Z_MQTT_ConnectBroker(const char* ID, const char* username,const char* password,\
- const char* will_topic, const char* will_data){
- size_t ID_len = strlen(ID);//客户端标识符
- size_t username_len = strlen(username);
- size_t password_len = strlen(password);
- size_t will_topic_len = strlen(will_topic);
- size_t will_data_len = strlen(will_data);//will_data:遗嘱消息内容
- size_t index = 1; // 填写数据用的索引,从1开始是因为第0位固定为0x10
-
- //如果客户端标识符为空,函数返回错误码1
- if(ID_len <= 0) return 1; // ID必须有
-
- // 可变报头和有效载荷的长度(剩余长度),其中12是可变报头固定 10byte 和记录ID的长度的 2byte
- //12:这是CONNECT报文固定报头之后的长度。它包括:
- //协议名“MQTT”(4字节)及其长度(2字节)。
- //协议级别(1字节)。
- //连接标志(1字节)。
- //保持连接时间(2字节)。
- //
- /*(username_len == 0 ? 0 : 2):如果用户名非空,则加上2字节的长度前缀。
- (password_len == 0 ? 0 : 2):如果密码非空,则加上2字节的长度前缀。
- (will_topic_len == 0 ? 0 : 2):如果遗嘱主题非空,则加上2字节的长度前缀。
- (will_data_len == 0 ? 0 : 2):如果遗嘱消息非空,则加上2字节的长度前缀。*/
- uint32_t send_size = 12 + ID_len + username_len + password_len + will_topic_len + \
- will_data_len + (username_len == 0 ? 0 : 2) + (password_len == 0 ? 0 : 2) + \
- (will_topic_len == 0 ? 0 : 2) + (will_data_len == 0 ? 0 : 2);
-
- // 填入固定报头 0001 0000 CONNECT固定第一字节
- Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_CONNECT;
-
- //剩余长度字段的解码算法
- do{ // 将剩余长度填进CONNECT报文的固定报头中
- uint8_t encodedByte = send_size % 128;
- send_size /= 128;
- if ( send_size > 0 ) encodedByte = encodedByte | 0x80;
- Z_Mqtt_Send_Buffer[index++] = encodedByte;
- }while ( send_size > 0 );
- // 可变报头-协议名(固定) utf-8字符串形式
- Load_Utf8_String(&index,"MQTT");
- // 可变报头-协议级别(3.1.1固定为0x04)
- Z_Mqtt_Send_Buffer[index++] = 0x04;
- // 可变报头-连接标志
- uint8_t connect_flag = 0x00;
- if(strlen(username) > 0) connect_flag |= 0x80; //设置用户名标志
- if(strlen(password) > 0) connect_flag |= 0x40; //设置密码标志
- if(strlen(will_topic) > 0) connect_flag |= 0x24; // 设置遗嘱标志与遗嘱信息保留标志
- Z_Mqtt_Send_Buffer[index++] = connect_flag;
- // 可变报头-保持连接时间
- Z_Mqtt_Send_Buffer[index++] = Z_MQTT_KEEP_ALIVE_TIME/128;
- Z_Mqtt_Send_Buffer[index++] = Z_MQTT_KEEP_ALIVE_TIME%128;
- // 有效载荷-客户端标识符(ID 必须有)
- Load_Utf8_String(&index,ID);
- // 有效载荷-遗嘱主题(will_topic 如果有)
- if(strlen(will_topic) > 0) Load_Utf8_String(&index,will_topic);
- // 有效载荷-遗嘱内容(will_data 如果有)
- if(strlen(will_data) > 0) Load_Utf8_String(&index,will_data);
- // 有效载荷-用户名(username 如果有)
- if(strlen(username) > 0) Load_Utf8_String(&index,username);
- // 有效载荷-密码(password 如果有)
- if(strlen(password) > 0) Load_Utf8_String(&index,password);
- write(mqtt_client, Z_Mqtt_Send_Buffer, index);
-
- ssize_t receive_len = Z_MQTT_TryReceiveData();
- if(receive_len <= 0){
- printf("no receive mqssage\n");
- return -1;
- }
复制代码 3.2 发布消息报文详解
发布消息报文,此报文可变报头仅有2个部分,一个是主题名,一个是报文标识符,报文标识符仅当Qos大于0时才有,然后就是有效载荷,有效载荷都是自己定义的内容,收到数据后按自定义规则剖析即可;
来看一条具体报文数据(报文都是16进制表现,2个字符表现一个字节)
- 301200047465737468656c6c6f2c776f726c64
复制代码 先把团体的剖析数据罗列出,再进行详细的剖析阐明:
3.2.1、固定报文
第1个字节是30,二进制则为:0011 0000(位置从后往前数) ,前4位为0000,则标志位全为0,标志位数据表现的意思需要根据报文类型来确定;后4位0011 值为3,则表现报文类型为发布消息(PUBLISH)
第2个字节:11,二进制为:0001 0001,第2个字节为剩余长度的起始字节,其最高位为0,则阐明剩余长度就只有这一个字节,10进制值为17,表现该报文后面还有17个字节。
3.2.2、可变报文(主题)
第一个就是主题,格式内容为2个字节的主题名称长度,然后跟上主题名;
第3-4个字节为主题名长度:00 04,表现有4个字节的主题名
第5-8字节为主题名:74657374 ,主题名为asicc码格式,则为:test
因为固定报头中标志位为0,则Qos=0,则没有报文标识符
3.2.3、有效载荷(内容)
第9-19字节为有效载荷:68656c6c6f2c776f726c64 (ascii码:hello,world)
再看一条带有Qos=1的发布消息报文数据
- 3213000474657374000168656c6c6f2c776f726c64
复制代码 先列出团体的剖析数据
MQTT的服务质量提供3个品级:
QoS0:最多发送一次消息,在消息发送出去后,接收者不会发送回应,发送者也不会重发消息,消息可能送达一次也可能根本没送达,这个服务质量常用在不重要的消息传递中,因为即使消息丢了也没有太大关系。
QoS1:最少发送一次消息(消息最少需要送达一次,也有可送达多次),QoS 1的PUBLISH报文的可变报头中包含一个报文标识符,需要PUBACK报文确认。即需要接收者返回PUBACK应答报文。
QoS2:只发送一次,这是最高品级的服务质量,消息丢失和重复都是不可担当的,只不外利用这个服务质量品级会有额外的开销,这个品级常用于付出中,因为付出是必须有且仅有一次乐成,总不能没给钱或者给了多次钱吧。
其中,利用 QoS 0 可能丢失消息,利用 QoS 1 可以包管收到消息,但消息可能重复,利用 QoS 2 可以包管消息既不丢失也不重复。QoS 品级从低到高,不仅意味着消息可靠性的提拔,也意味着传输复杂程度的提拔。在发布者到订阅者的消息投递流程中,QoS 品级是由发布者在 PUBLISH 报文中指定的。
3.2.4、固定报文
第1个字节是32,二进制则为:0011 0010(位置从后往前数) ,第2-3位表现的是Qos,则Qos=1那么后面的可变报文中,就会添加报文标识符了;后4位0011 值为3,则表现报文类型为发布消息(PUBLISH)
第2个字节:13,二进制为:0001 0011,第2个字节为剩余长度的起始字节,其最高位为0,则阐明剩余长度就只有这一个字节,10进制值为19,表现该报文后面还有19个字节。
3.2.5、可变报文(主题)
第3-4个字节为主题名长度:00 04,表现有4个字节的主题名
第5-8个字节为主题名:74657374 ,主题名为asicc码格式,则为:test
因为标志位中Qos=1,则下面的2个字节表现报文标识符,报文标识符一般都是用来表现消息数,用来区分不同的消息,避免处理惩罚重复消息
第9-10字节:00 01,则报文标识符为1
3.2.6、有效载荷(内容)
第11-21字节为有效载荷:68656c6c6f2c776f726c64 (ascii码:hello,world)
两条报文对比一下,因为固定报头中的标志位不同,影响了可变报头的内容,多了一个报文标识符。
- // 发布主题信息
- // topic 主题名 data 发布内容 retain 保留标志位
- // 成功返回0, 失败返回-1, 参数错误返回1;
- int Z_MQTT_Publish(const char* topic, const char* data, uint8_t retain){
- size_t index = 1;
- size_t topic_len = strlen(topic);
- size_t data_len = strlen(data);
- printf("this is publish topic is %s,data is %s\n",topic,data);
- if(topic_len == 0) return 1;
- Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_PUBLISH;
- if(retain) Z_Mqtt_Send_Buffer[0] |= 0x01;
- // 可变报头和有效载荷的长度(剩余长度)
- uint32_t send_size = topic_len + data_len + 2;
- do{ // 将剩余长度填进CONNECT报文的固定报头中
- uint8_t encodedByte = send_size % 128;
- send_size /= 128;
- if ( send_size > 0 ) encodedByte = encodedByte | 0x80;
- Z_Mqtt_Send_Buffer[index++] = encodedByte;
- }while ( send_size > 0 );
- Load_Utf8_String(&index,topic); // 装载topic
- memcpy((void*)&Z_Mqtt_Send_Buffer[index],(void*)data,data_len); // 装载data
- write(mqtt_client, Z_Mqtt_Send_Buffer, index + data_len);
- return 0;
- }
复制代码 3.3、 订阅主题报文
客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅。每个订阅注册客户端关心的一个或多个主题。为了将应用消息转发给那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。SUBSCRIBE报文也(为每个订阅)指定了最大的QoS品级,服务端根据这个发送应用消息给客户端。
3.3.1 固定报头
SUBSCRIBE控制报固定报头的第3,2,1,0位是保留位,必须分别设置为0,0,1,0。服务端必须将其它的任何值都当做是不合法的并关闭网络毗连。
剩余长度字段:等于SUBSCRIBE可变报头的长度(2字节)加上有效载荷的长度。
3.3.2 可变报头
可变报头包含客户端报文标识符。
报文标识符相当于自定义的Topic的ID,用ID号去代替具体的Topic,而不是字段,使得区分发来的Topic的同时又可以节省流量,可自定义,发起自己预先制定一个服务ID表。
订阅返回,返回Topic订阅乐成信息,返回的不是具体Topic,返回的就是报文标识符。
3.3.3 有效载荷
SUBSCRIBE报文的有效载荷包含了一个主题过滤器列表,它们表现客户端想要订阅的主题。SUBSCRIBE报文有效载荷中的主题过滤器列表必须是UTF-8字符串。服务端应该支持包含通配符的主题过滤器。如果服务端选择不支持包含通配符的主题过滤器,必须拒绝任何包含通配符过滤器的订阅请求。每一个过滤器后面跟着一个字节,这个字节被叫做 服务质量要求(Requested QoS)。它给出了服务端向客户端发送应用消息所答应的最大QoS品级。
SUBSCRIBE报文的有效载荷必须包含至少一对主题过滤器 和 QoS品级字段组合。没有有效载荷的SUBSCRIBE报文是违反协议的。
当前版本的协议没有效到服务质量要求(Requested QoS)字节的高六位。如果有效载荷中的任何位是非零值,或者QoS不等于0,1或2,服务端必须以为SUBSCRIBE报文是不合法的并关闭网络毗连。
响应:服务端收到客户端发送的一个SUBSCRIBE报文时,必须利用SUBACK报文响应。
服务端发送给客户端的SUBACK(确认主题订阅)报文对每一对主题过滤器 和QoS品级都必须包含一个返回码。这个返回码必须表现谁人订阅被授予的最大QoS品级,或者表现这个订阅失败。服务端可以授予比订阅者要求的低一些的QoS品级。为响应订阅而发出的消息的有效载荷的QoS必须是原始发布消息的QoS和服务端授予的QoS两者中的最小值。如果原始消息的QoS是1而被授予的最大QoS是0,答应服务端重复发送一个消息的副本给订阅者。
- // 订阅主题
- // 成功返回0,失败-1,参数错误返回1
- int Z_MQTT_Subscribe(const char* topic){
- size_t index = 1;
- size_t topic_len = strlen(topic);
- if(topic_len == 0){
- printf("parameter error\n");
- return 1;
- }
- Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_SUBSCRIBE | 0x02;
- // 可变报头和有效载荷的长度(剩余长度)
- uint32_t send_size = topic_len + 5;
- do{ // 将剩余长度填进CONNECT报文的固定报头中
- uint8_t encodedByte = send_size % 128;
- send_size /= 128;
- if ( send_size > 0 ) encodedByte = encodedByte | 0x80;
- Z_Mqtt_Send_Buffer[index++] = encodedByte;
- }while ( send_size > 0 );
- Z_Mqtt_Send_Buffer[index++] = 0x00; // 报文标识符,咱随便给个非0数就行
- Z_Mqtt_Send_Buffer[index++] = 0x01; // 后面需要和返回的报文作对比
- Load_Utf8_String(&index,topic);
- Z_Mqtt_Send_Buffer[index++] = 0x00; // Qos0 等级
- write(mqtt_client,Z_Mqtt_Send_Buffer,index);
- ssize_t receive_len = Z_MQTT_TryReceiveData();
- if(receive_len <= 0){
- printf("no receive mqssage\n");
- return -1;
- }
- Printf_Receive(receive_len);
- if(receive_len != 5){
- printf("unknow error\n");
- return -1;
- }
- if(Z_Mqtt_Receive_Buffer[0] != Z_MQTT_FLAG_SUBACK){
- printf("message type error\n");
- return -1;
- }
- // 报文标识符对不上
- if(Z_Mqtt_Receive_Buffer[2] != 0x00 || Z_Mqtt_Receive_Buffer[3] != 0x01){
- printf("message ID error\n");
- return -1;
- }
- if(Z_Mqtt_Receive_Buffer[4] != 0x00){
- printf("Qos error\n");
- return -1;
- }
- return 0;
- }
复制代码
四、全部源码
4.1 main.c
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <unistd.h>
- #include <sys/socket.h>
- #include <arpa/inet.h>
- #include <pthread.h>
- #include "Z_Mqtt_Tool.h"
- // 获取的topic最长长度
- #define MQTT_TOPIC_SIZE 128
- // 获取的data最长长度
- #define MQTT_DATA_SIZE 1024
- //客户端ID
- #define MQTT_ID "zhetu"
- #define MQTT_USERNAME "EMS"
- #define MQTT_PASSWORD "EMS"
- //遗嘱主题
- #define MQTT_WILLTOPIC "EMS"
- //遗嘱消息
- #define MQTT_WILLDATA "EMS"
- //存储客户端的socket连接
- int client = 0;
- //用于存储接收到的主题和数据。
- char Mqtt_Topic_Buffer[MQTT_TOPIC_SIZE];
- char Mqtt_Data_Buffer[MQTT_DATA_SIZE];
- //这个函数用于从标准输入读取主题或数据。根据传入的 flag 参数,它将读取主题或数据,并存储在相应的缓冲区中
- void get_str(char flag){
- if(flag == 't'){
- memset(Mqtt_Topic_Buffer,0,MQTT_TOPIC_SIZE);
- printf("input topic: \n");
- fgets(Mqtt_Topic_Buffer,MQTT_TOPIC_SIZE,stdin);
- Mqtt_Topic_Buffer[strlen(Mqtt_Topic_Buffer) - 1] = '\0'; // 消除回车
- }else if(flag == 'd'){
- memset(Mqtt_Data_Buffer,0,MQTT_DATA_SIZE);
- printf("input data: \n");
- fgets(Mqtt_Data_Buffer,MQTT_DATA_SIZE,stdin);
- Mqtt_Data_Buffer[strlen(Mqtt_Data_Buffer) - 1] = '\0'; // 消除回车
- }
- }
- //Get_Mqtt_Message 函数:这是一个线程函数,它在一个无限循环中不断调用 Z_MQTT_ReceivePush 函数来接收MQTT代理推送的消息,并打印出来。使用 usleep 函数在每次循环后暂停100毫秒
- void* Get_Mqtt_Message(void* arg){
- while(1){
- //它将接收到的主题存储在 Mqtt_Topic_Buffer 中,将数据存储在 Mqtt_Data_Buffer 中
- if(0 == Z_MQTT_ReceivePush(Mqtt_Topic_Buffer,Mqtt_Data_Buffer,MQTT_TOPIC_SIZE,MQTT_DATA_SIZE)){
- //printf 函数用于打印接收到的消息。它显示主题和数据内容
- printf("receive MQTT pubish,topic is "%s",data is "%s"\n",Mqtt_Topic_Buffer,Mqtt_Data_Buffer);
- }
- usleep(100000); // 100ms检测一次
- }
- return NULL;
- }
- int main(void){
- printf("hello world!\n");
- int res = 0;
- char inputchar = 0;
- char inputstr[1024] = {0};
- pthread_t tid;
- //用于创建一个新线程,该线程将执行 Get_Mqtt_Message 函数,以便在后台接收MQTT消息
- /*
- int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
- void *(*start_routine)(void *), void *arg);
- pthread_t *thread:指向 pthread_t 类型的指针,用于存储新创建线程的标识符。
- const pthread_attr_t *attr:指向线程属性对象的指针,可以为 NULL,表示使用默认属性。
- void *(*start_routine)(void *):新线程开始执行时调用的函数,即线程函数。
- void *arg:传递给线程函数的参数,可以为 NULL。
- */
- int ret = pthread_create(&tid,NULL,Get_Mqtt_Message,NULL);
- while(1){
- printf("\
- /****************************************************\n\
- \ta\tconnect TCP server\n\
- \tb\tconnect MQTT Broker\n\
- \td\tdisconnect MQTT Broker\n\
- \ts\tsubscribe topic\n\
- \tu\tunsubscribe topic\n\
- \tp\tpublish data\n\
- \tj\tsend ping to MQTT broker\n\
- \tq\tquit\n\
- ****************************************************/\n");
- inputchar = getchar();
- getchar();
- if(inputchar == 'a'){
- if(0 == Z_Mqtt_Init()){
- printf("connect TCP server success!\n");
- }else{
- printf("connect TCP error,try again\n");
- }
- }else if(inputchar == 'b'){
- if(0 == Z_MQTT_ConnectBroker(MQTT_ID,MQTT_USERNAME,MQTT_PASSWORD,MQTT_WILLTOPIC,MQTT_WILLDATA)){
- printf("connect MQTT broker success!\n");
- }else{
- printf("connect MQTT broker error,rty again\n");
- }
- }else if(inputchar == 'd'){
- Z_MQTT_DisconnectBroker();
- }else if(inputchar == 's'){
- get_str('t');
- if(0 == Z_MQTT_Subscribe(Mqtt_Topic_Buffer)){
- printf("subscribe success!\n");
- }else{
- printf("subscribe error,try again\n");
- }
- }else if(inputchar == 'u'){
- get_str('t');
- if(0 == Z_MQTT_Unsubscribe(Mqtt_Topic_Buffer)){
- printf("unsubscribe success!\n");
- }else{
- printf("unsubscribe error,try again\n");
- }
- }else if(inputchar == 'p'){
- get_str('t');
- get_str('d');
- if(0 == Z_MQTT_Publish(Mqtt_Topic_Buffer,Mqtt_Data_Buffer,0)){
- printf("send publish success!\n");
- }else{
- printf("parameter error,try agagin\n");
- }
- }else if(inputchar == 'j'){
- if(0 == Z_MQTT_Ping()){
- printf("send ping success!\n");
- }else{
- printf("unknown error,try agagin\n");
- }
- }else if(inputchar == 'q'){
- Z_MQTT_Close_TCP();
- break;
- }
- }
- printf("good bye~\n");
- return 0;
- }
复制代码 4.2 Z_Mqtt_Tool.h
- #ifndef __Z_MQTT_TOOL_H
- #define __Z_MQTT_TOOL_H
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <unistd.h>
- #include <sys/socket.h>
- #include <arpa/inet.h>
- #include <pthread.h>
- // 报文类型
- #define Z_MQTT_FLAG_CONNECT 0x10
- #define Z_MQTT_FLAG_CONNACK 0x20
- #define Z_MQTT_FLAG_PUBLISH 0x30
- #define Z_MQTT_FLAG_PUBACK 0x40
- #define Z_MQTT_FLAG_PUBREC 0x50
- #define Z_MQTT_FLAG_PUBREL 0x60
- #define Z_MQTT_FLAG_PUBCOMP 0x70
- #define Z_MQTT_FLAG_SUBSCRIBE 0x80
- #define Z_MQTT_FLAG_SUBACK 0x90
- #define Z_MQTT_FLAG_UNSUBSCRIBE 0xA0
- #define Z_MQTT_FLAG_UNSUBACK 0xB0
- #define Z_MQTT_FLAG_PINGREQ 0xC0
- #define Z_MQTT_FLAG_PINGRESP 0xD0
- #define Z_MQTT_FLAG_DISCONNECT 0xE0
- int Z_Mqtt_Init(void);
- void Z_MQTT_Close_TCP(void);
- void Load_Utf8_String(size_t* index, const char* data);
- int Z_MQTT_ConnectBroker(const char* ID, const char* username,const char* password,const char* will_topic, const char* will_data);
- int Z_MQTT_Publish(const char* topic, const char* data, uint8_t retain);
- int Z_MQTT_Subscribe(const char* topic);
- int Z_MQTT_Unsubscribe(const char* topic);
- int Z_MQTT_ReceivePush(char* topic, char* data, size_t topic_len, size_t data_len);
- void Z_MQTT_DisconnectBroker(void);
- int Z_MQTT_Ping(void);
- #endif
复制代码 4.3 Z_Mqtt_Tool.c
- #include "Z_Mqtt_Tool.h"// 接收缓冲区巨细以及发送缓冲区巨细#define Z_MQTT_RECEIVE_SIZE 2048#define Z_MQTT_SEND_SIZE 2048// MQTT服务器IP以及端口#define Z_MQTT_BROKER_IP "10.1.193.87"#define Z_MQTT_BROKER_PORT 1883// 心跳隔断时间,单元秒#define Z_MQTT_KEEP_ALIVE_TIME 100// 等待复兴报文时间,单元秒#define Z_MQTT_WAIT_TIME 10#include <arpa/inet.h> // 包含inet_ntoa函数的头文件//Z_MQTT_RECEIVE_SIZE 和 Z_MQTT_SEND_SIZE:分别定义了接收和发送缓冲区的巨细 //全局变量Z_Mqtt_Send_Buffer 和 Z_Mqtt_Receive_Buffer:分别用于存储发送和接收的数据static uint8_t Z_Mqtt_Send_Buffer[Z_MQTT_SEND_SIZE] = {0}; static uint8_t Z_Mqtt_Receive_Buffer[Z_MQTT_RECEIVE_SIZE] = {0}; mqtt_client:存储客户端的socket文件描述符static int mqtt_client;static pthread_mutex_t Mqtt_Receive_Mutex;// 调试用void Printf_Receive(ssize_t receive_len){ printf("-------------------------------------------------\n"); printf("receive_len is %ld,receive data is ",receive_len); size_t i;//%02x 是格式化字符串,它表现打印一个两位的十六进制数,如果数字不敷两位,则在前面补零 for(i = 0; i < receive_len; ++i){ printf("%02x\t",(unsigned int)Z_Mqtt_Receive_Buffer[i]); } printf("\n"); printf("-------------------------------------------------\n");}// 创建TCP毗连 失败返回-1,乐成返回0int Z_Mqtt_Init(void){ // 初始化发送和接收的缓冲区(buff)以及互斥锁 memset(Z_Mqtt_Send_Buffer,0,Z_MQTT_SEND_SIZE); memset(Z_Mqtt_Receive_Buffer,0,Z_MQTT_RECEIVE_SIZE); pthread_mutex_init(&Mqtt_Receive_Mutex,NULL); // PF_INET 和 AF_INET 差不多,可以混用(发起同一,方便自己看) mqtt_client = socket(AF_INET, SOCK_STREAM, 0); // 利用IPv4协议族,TCP if(-1 == mqtt_client){ printf("create socket error\n"); return -1; } // 毗连服务器 struct sockaddr_in addr;/*addr.sin_family 设置为 AF_INET,表现利用IPv4地点族。addr.sin_port 设置为 htons(Z_MQTT_BROKER_PORT),将端口号从主机字节序转换为网络字节序。inet_pton 函数用于将点分十进制的IP地点字符串转换为网络字节序的二进制形式*/ addr.sin_family = AF_INET; // 利用IPv4协议族 addr.sin_port = htons(Z_MQTT_BROKER_PORT); // 设置端口,需用htons转成大端 // 将IP地点(点分十进制字符串)转换为网络字节序(IPv4)写进addr.sin_addr.s_addr inet_pton(AF_INET, Z_MQTT_BROKER_IP, &addr.sin_addr.s_addr); // 将IP地点(点分十进制字符串)转换为网络字节序(IPv4)写进addr.sin_addr.s_addrinet_pton(AF_INET, Z_MQTT_BROKER_IP, &addr.sin_addr.s_addr);// 打印IP地点char ip_str[INET_ADDRSTRLEN]; // INET_ADDRSTRLEN 是IPv4地点字符串的最大长度struct in_addr ip_addr; // 用于存储网络字节序的IP地点ip_addr.s_addr = addr.sin_addr.s_addr; // 将网络字节序的IP地点赋值给ip_addrstrcpy(ip_str, inet_ntoa(ip_addr)); // 将网络字节序的IP地点转换为点分十进制字符串printf("MQTT Broker IP: %s\n", ip_str); // 打印IP地点 /*int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);int sockfd:这是客户端的套接字文件描述符(socket file descriptor),它必须是一个有效的、已经创建但尚未毗连的套接字。const struct sockaddr *addr:这是一个指向 sockaddr 结构体的指针,它包含了服务器的地点信息。对于IPv4,这通常是 struct sockaddr_in 结构体;addr 参数指定了服务器的协议族、IP地点和端口号socklen_t addrlen:这是 addr 参数指向的 sockaddr 结构体的巨细,以字节为单元。这个参数告诉 connect 函数 addr 结构体中有多少字节是有效的,需要被读取。*/ if(-1 == connect(mqtt_client, (struct sockaddr*)&addr, sizeof(addr))){ // 毗连目标服务器 printf("connect error\n"); return -1; } return 0;}void Z_MQTT_Close_TCP(void){ close(mqtt_client);}// 以utf-8编码字符串的形式把数据装载进发送缓冲区 MQTT协议规定,所有的字符串都必须利用UTF-8编码//一个UTF-8字符串可以非常长。因此,在发送之前,需要在字符串前附加一个长度前缀,这样接收方就能知道字符串的确切长度void Load_Utf8_String(size_t* index, const char* data){ //装载长度 Z_Mqtt_Send_Buffer[(*index)++] = strlen(data)/128; Z_Mqtt_Send_Buffer[(*index)++] = strlen(data)%128; // 装载数据 memcpy((void*)&Z_Mqtt_Send_Buffer[*index], (void*)data, strlen(data)); (*index) += strlen(data);}// 实验获取来自MQTT的报文int Z_MQTT_TryReceiveData(void){ pthread_mutex_lock(&Mqtt_Receive_Mutex); ssize_t receive_len = -1; int count = 0; do{ receive_len = recv(mqtt_client,Z_Mqtt_Receive_Buffer,Z_MQTT_RECEIVE_SIZE,MSG_DONTWAIT); usleep(10000); // 等待10ms if(++count >= (Z_MQTT_WAIT_TIME * 100)){ printf("wait to long\n"); pthread_mutex_unlock(&Mqtt_Receive_Mutex); return -1; } }while(receive_len <= 0); pthread_mutex_unlock(&Mqtt_Receive_Mutex); return receive_len;} // ID:毗连MQTT服务器所利用的ID// username password 毗连MQTT服务器所利用的用户名和暗码(可以为空字符串)// will_topic will_data 遗嘱主题与遗嘱内容(可以为空字符串)// 毗连乐成返回0,失败返回-1,参数错误返回1int Z_MQTT_ConnectBroker(const char* ID, const char* username,const char* password,\
- const char* will_topic, const char* will_data){
- size_t ID_len = strlen(ID);//客户端标识符
- size_t username_len = strlen(username);
- size_t password_len = strlen(password);
- size_t will_topic_len = strlen(will_topic);
- size_t will_data_len = strlen(will_data);//will_data:遗嘱消息内容
- size_t index = 1; // 填写数据用的索引,从1开始是因为第0位固定为0x10
-
- //如果客户端标识符为空,函数返回错误码1
- if(ID_len <= 0) return 1; // ID必须有
-
- // 可变报头和有效载荷的长度(剩余长度),其中12是可变报头固定 10byte 和记录ID的长度的 2byte
- //12:这是CONNECT报文固定报头之后的长度。它包括:
- //协议名“MQTT”(4字节)及其长度(2字节)。
- //协议级别(1字节)。
- //连接标志(1字节)。
- //保持连接时间(2字节)。
- //
- /*(username_len == 0 ? 0 : 2):如果用户名非空,则加上2字节的长度前缀。
- (password_len == 0 ? 0 : 2):如果密码非空,则加上2字节的长度前缀。
- (will_topic_len == 0 ? 0 : 2):如果遗嘱主题非空,则加上2字节的长度前缀。
- (will_data_len == 0 ? 0 : 2):如果遗嘱消息非空,则加上2字节的长度前缀。*/
- uint32_t send_size = 12 + ID_len + username_len + password_len + will_topic_len + \
- will_data_len + (username_len == 0 ? 0 : 2) + (password_len == 0 ? 0 : 2) + \
- (will_topic_len == 0 ? 0 : 2) + (will_data_len == 0 ? 0 : 2);
-
- // 填入固定报头 0001 0000 CONNECT固定第一字节
- Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_CONNECT;
-
- //剩余长度字段的解码算法
- do{ // 将剩余长度填进CONNECT报文的固定报头中
- uint8_t encodedByte = send_size % 128;
- send_size /= 128;
- if ( send_size > 0 ) encodedByte = encodedByte | 0x80;
- Z_Mqtt_Send_Buffer[index++] = encodedByte;
- }while ( send_size > 0 );
- // 可变报头-协议名(固定) utf-8字符串形式
- Load_Utf8_String(&index,"MQTT");
- // 可变报头-协议级别(3.1.1固定为0x04)
- Z_Mqtt_Send_Buffer[index++] = 0x04;
- // 可变报头-连接标志
- uint8_t connect_flag = 0x00;
- if(strlen(username) > 0) connect_flag |= 0x80; //设置用户名标志
- if(strlen(password) > 0) connect_flag |= 0x40; //设置密码标志
- if(strlen(will_topic) > 0) connect_flag |= 0x24; // 设置遗嘱标志与遗嘱信息保留标志
- Z_Mqtt_Send_Buffer[index++] = connect_flag;
- // 可变报头-保持连接时间
- Z_Mqtt_Send_Buffer[index++] = Z_MQTT_KEEP_ALIVE_TIME/128;
- Z_Mqtt_Send_Buffer[index++] = Z_MQTT_KEEP_ALIVE_TIME%128;
- // 有效载荷-客户端标识符(ID 必须有)
- Load_Utf8_String(&index,ID);
- // 有效载荷-遗嘱主题(will_topic 如果有)
- if(strlen(will_topic) > 0) Load_Utf8_String(&index,will_topic);
- // 有效载荷-遗嘱内容(will_data 如果有)
- if(strlen(will_data) > 0) Load_Utf8_String(&index,will_data);
- // 有效载荷-用户名(username 如果有)
- if(strlen(username) > 0) Load_Utf8_String(&index,username);
- // 有效载荷-密码(password 如果有)
- if(strlen(password) > 0) Load_Utf8_String(&index,password);
- write(mqtt_client, Z_Mqtt_Send_Buffer, index);
-
- ssize_t receive_len = Z_MQTT_TryReceiveData();
- if(receive_len <= 0){
- printf("no receive mqssage\n");
- return -1;
- } // 调试 Printf_Receive(receive_len); if(receive_len != 4){ // 返回字节数不为4,未知错误 printf("unknow error\n"); return -1; } if(Z_Mqtt_Receive_Buffer[0] != Z_MQTT_FLAG_CONNACK){ printf("message type error\n"); // 第一字节不为0x20,服务器发送的报文类型错误 return -1; } if(Z_Mqtt_Receive_Buffer[3] != 0x00){ // 第四字节不为0,一般为参数错误 printf("parameter error\n"); return 1; } return 0;}// 断开与MQTT服务器的毗连void Z_MQTT_DisconnectBroker(void){ Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_DISCONNECT; Z_Mqtt_Send_Buffer[1] = 0x00; write(mqtt_client,Z_Mqtt_Send_Buffer,2); pthread_mutex_destroy(&Mqtt_Receive_Mutex);}// 发送心跳数据包,发送乐成返回0,失败返回1int Z_MQTT_Ping(void){ Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_PINGREQ; Z_Mqtt_Send_Buffer[1] = 0x00; write(mqtt_client,Z_Mqtt_Send_Buffer,2); ssize_t receive_len = Z_MQTT_TryReceiveData(); if(receive_len <= 0){ printf("no receive mqssage\n"); return -1; } Printf_Receive(receive_len); if(receive_len != 2){ printf("unknown error\n"); return 1; } if(Z_Mqtt_Receive_Buffer[0] != Z_MQTT_FLAG_PINGRESP){ printf("message type error\n"); // 第一字节不为0xD0,服务器发的报文类型错误 return 1; } return 0;}// 发布主题信息
- // topic 主题名 data 发布内容 retain 保留标志位
- // 成功返回0, 失败返回-1, 参数错误返回1;
- int Z_MQTT_Publish(const char* topic, const char* data, uint8_t retain){
- size_t index = 1;
- size_t topic_len = strlen(topic);
- size_t data_len = strlen(data);
- printf("this is publish topic is %s,data is %s\n",topic,data);
- if(topic_len == 0) return 1;
- Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_PUBLISH;
- if(retain) Z_Mqtt_Send_Buffer[0] |= 0x01;
- // 可变报头和有效载荷的长度(剩余长度)
- uint32_t send_size = topic_len + data_len + 2;
- do{ // 将剩余长度填进CONNECT报文的固定报头中
- uint8_t encodedByte = send_size % 128;
- send_size /= 128;
- if ( send_size > 0 ) encodedByte = encodedByte | 0x80;
- Z_Mqtt_Send_Buffer[index++] = encodedByte;
- }while ( send_size > 0 );
- Load_Utf8_String(&index,topic); // 装载topic
- memcpy((void*)&Z_Mqtt_Send_Buffer[index],(void*)data,data_len); // 装载data
- write(mqtt_client, Z_Mqtt_Send_Buffer, index + data_len);
- return 0;
- }// 订阅主题
- // 成功返回0,失败-1,参数错误返回1
- int Z_MQTT_Subscribe(const char* topic){
- size_t index = 1;
- size_t topic_len = strlen(topic);
- if(topic_len == 0){
- printf("parameter error\n");
- return 1;
- }
- Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_SUBSCRIBE | 0x02;
- // 可变报头和有效载荷的长度(剩余长度)
- uint32_t send_size = topic_len + 5;
- do{ // 将剩余长度填进CONNECT报文的固定报头中
- uint8_t encodedByte = send_size % 128;
- send_size /= 128;
- if ( send_size > 0 ) encodedByte = encodedByte | 0x80;
- Z_Mqtt_Send_Buffer[index++] = encodedByte;
- }while ( send_size > 0 );
- Z_Mqtt_Send_Buffer[index++] = 0x00; // 报文标识符,咱随便给个非0数就行
- Z_Mqtt_Send_Buffer[index++] = 0x01; // 后面需要和返回的报文作对比
- Load_Utf8_String(&index,topic);
- Z_Mqtt_Send_Buffer[index++] = 0x00; // Qos0 等级
- write(mqtt_client,Z_Mqtt_Send_Buffer,index);
- ssize_t receive_len = Z_MQTT_TryReceiveData();
- if(receive_len <= 0){
- printf("no receive mqssage\n");
- return -1;
- }
- Printf_Receive(receive_len);
- if(receive_len != 5){
- printf("unknow error\n");
- return -1;
- }
- if(Z_Mqtt_Receive_Buffer[0] != Z_MQTT_FLAG_SUBACK){
- printf("message type error\n");
- return -1;
- }
- // 报文标识符对不上
- if(Z_Mqtt_Receive_Buffer[2] != 0x00 || Z_Mqtt_Receive_Buffer[3] != 0x01){
- printf("message ID error\n");
- return -1;
- }
- if(Z_Mqtt_Receive_Buffer[4] != 0x00){
- printf("Qos error\n");
- return -1;
- }
- return 0;
- }// 取消订阅topic// 乐成返回0 失败返回-1 参数错误返回1int Z_MQTT_Unsubscribe(const char* topic){ size_t index = 1; size_t topic_len = strlen(topic); if(topic_len == 0){ printf("parameter error\n"); return 1; } Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_UNSUBSCRIBE | 0x02; // 可变报头和有效载荷的长度(剩余长度) uint32_t send_size = topic_len + 4; do{ // 将剩余长度填进CONNECT报文的固定报头中 uint8_t encodedByte = send_size % 128; send_size /= 128; if ( send_size > 0 ) encodedByte = encodedByte | 0x80; Z_Mqtt_Send_Buffer[index++] = encodedByte; }while ( send_size > 0 ); Z_Mqtt_Send_Buffer[index++] = 0x00; // 报文标识符,咱恣意给个非0数就行 Z_Mqtt_Send_Buffer[index++] = 0x01; Load_Utf8_String(&index,topic); write(mqtt_client,Z_Mqtt_Send_Buffer,index); ssize_t receive_len = Z_MQTT_TryReceiveData(); if(receive_len <= 0){ printf("no receive mqssage\n"); return -1; } Printf_Receive(receive_len); if(receive_len != 4){ printf("unknow error\n"); return -1; } if(Z_Mqtt_Receive_Buffer[2] != 0x00 || Z_Mqtt_Receive_Buffer[3] != 0x01){ printf("meaasge ID error\n"); return -1; } return 0;}// 检测是否收到推送数据// 乐成收取数据返回0,无数据返回-1,参数错误返回1,接收缓冲区错误返回2//size_t tl:主题缓冲区的长度//size_t dl:数据缓冲区的长度int Z_MQTT_ReceivePush(char* topic, char* data, size_t tl, size_t dl){//在实验接收数据之前,获取互斥锁以确保线程安全。这是为了防止多个线程同时访问共享资源(如网络套接字) pthread_mutex_lock(&Mqtt_Receive_Mutex);/*ssize_t recv(int sockfd, void *buf, size_t len, int flags);int sockfd:套接字文件描述符,表现要从中读取数据的socket。void *buf:指向一个缓冲区的指针,用于存储接收到的数据。size_t len:缓冲区的长度,即最多可以接收的数据字节数。int flags:一些控制标志,如 MSG_DONTWAIT 表现非阻塞操作。*///ssize_t:返回值表现接收到的字节数//mqtt_client 是一个全局变量,它存储了MQTT客户端的socket文件描述符//Z_Mqtt_Receive_Buffer 是一个全局缓冲区,用于存储接收到的数据 //MSG_DONTWAIT 使得 recv 调用非阻塞,这样如果当前没有数据可读,recv 会立刻返回 ssize_t receive_len = recv(mqtt_client, Z_Mqtt_Receive_Buffer, Z_MQTT_RECEIVE_SIZE, MSG_DONTWAIT);//pthread_mutex_unlock(&Mqtt_Receive_Mutex);:在数据接收完毕后释放互斥锁 pthread_mutex_unlock(&Mqtt_Receive_Mutex); if(receive_len <= 0) return -1; Printf_Receive(receive_len);//数组的第一个元素(索引为0)包含了消息的固定头部的第一个字节,这个字节包含了消息的类型和其他一些标志位 #define Z_MQTT_FLAG_PUBLISH 0x30 if(Z_Mqtt_Receive_Buffer[0] != Z_MQTT_FLAG_PUBLISH){ printf("message type error\n"); return 2; } size_t index = 1; int multiplier = 1; uint8_t encodedByte = 0; receive_len = 0; do{ //从接收缓冲区中读取下一个字节 encodedByte = Z_Mqtt_Receive_Buffer[index++]; //encodedByte & 127 将 encodedByte 的最高位清零,提取出7位的数值 receive_len += (encodedByte & 127) * multiplier; //multiplier *= 128;:这行代码将乘数翻倍,为下一个7位段的解码做准备 multiplier *= 128; //MQTT协议规定,剩余长度不能高出4个字节,这意味着 multiplier 的最大值应该是 128^4=268435456 if (multiplier > 128*128*128){ printf("unknow error\n"); return 2; } }while ((encodedByte & 128) != 0);///保了所有剩余长度的字节都被读取和处理惩罚 //MQTT协议中,主题长度是两个字节,第一个字节乘以128,然后加上第二个字节,得到主题的总长度 ssize_t topic_len = Z_Mqtt_Receive_Buffer[index++] * 128 + Z_Mqtt_Receive_Buffer[index++]; if(topic_len > tl){ printf("topic buffer so sort\n"); return 1; } //查抄剖析出的主题长度是否高出了提供的主题缓冲区巨细 //析出的数据长度(剩余长度减去主题长度再减去固定头部的2个字节)是否高出了提供的数据缓冲区巨细 if(receive_len - 2 - topic_len > dl){ printf("data buffer so sort\n"); return 1; } memcpy(topic,&Z_Mqtt_Receive_Buffer[index],topic_len); //在主题的末尾添加空字符,确保它是一个精确终止的字符串 topic[topic_len] = '\0'; index += topic_len; memcpy(data,&Z_Mqtt_Receive_Buffer[index],receive_len - 2 - topic_len); data[receive_len - 2 - topic_len] = '\0'; return 0;}
复制代码 文章参考大佬小黑侠kary MQTT协议图解,一文看懂MQTT协议数据包(真实报文数据剖析解释)-CSDN博客
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |