MQTT协议图解,一文看懂MQTT协议数据包MQTT协议史上最全剖析以及 手写 Linu ...

打印 上一主题 下一主题

主题 836|帖子 836|积分 2508

一、MQTT协议

MQTT协议在lot领域是利用的最广泛的通用协议,MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
mqtt协议的优点,官方的解释是:用极少的代码和有限的带宽,为毗连远程设备提供实时可靠的消息服务;简朴明白就是,实现起来简朴,而且在传输上无效的数据也很少,而且能够包管数据传输的可靠性。
二、协议详解


2.1 协议结构



MQTT协议由三部分构成,固定报头,可变报头,有效载荷;固定报头是所有的报文同一的格式,可变抱头则根据固定抱头中的报文类型不同基本不同,每个报文类型基本上都有自己的可变报头格式。

MQTT结构表示图
 

2.1.1 固定报头


固定报头是MQTT协议开头,2个字节,分为三个部分:标志位、报文类型、剩余长度,下图为固定报头结构表示图:

固定报头结构

第一个字节分为2个部分:标志位和报文类型,一个字节有8位二进制,前面4位用来做标志位
后面4位用来表现表现具体的报文类型,一共有16种,有部分类型是预留未利用的类型,在代码实现时,可以暂时忽略;具体的报文类型如下表所示:

 
第二个字节是剩余长度,其表现的就是在剩余长度之后的数据长度(还有多少个字节),剩余长度的字节数是不固定的,至少一个字节,最多4个字节,所以固定报头中包含的一个字节就是剩余长度的第一个字节;这里需要阐明一下,剩余长度的每一个字节的最高位是一个标志位,用来表现下一个字节是否也属于剩余长度!

举例阐明:
比如:
0100 0000(最高位为0) 则阐明后面的字节数据不属于剩余长度了
1010 0000
(第一个字节,最高位为1)
0000 0001
(第二个字节,最高位为0)
则阐明后面的字节数据属于剩余长度,第2个字节最高位为0,则阐明后面没有属于剩余长度的字节了,那么此剩余长度则为这2个字节
因为剩余长度的每个字节的最高位为标志位,所以其真实值并不是这些字节直接合并而成,需要另外进行盘算;

如果第一个字节的最高位为0,则阐明剩余长度只有一个字节,而且其值就是这个字节的值,如:
那么剩余长度的真实值就是:64
如果剩余长度高出一个字节,那么就需要将每个字节的高字节去掉,然后构成一个新的数据,盘算其值。

下面举例阐明:
2个字节长度的剩余长度盘算
第一个字节第二个字节1010 00000000 0001 去掉标志位后,其新值为
第一个字节第二个字节010 0000000 0001 合并时,后面的字节在高位,则合并后值为:1010 0000,则剩余长度的值为十进制的160;
三个字节长度的剩余长度盘算
第一个字节第二个字节第三个字节1001 00011100 10010100 1001 去掉标志位后,其新值为
第一个字节第二个字节第三个字节001 0001100 1001100 1001 后面的字节在高位,则合并后值为:100 1001 100 1001 001 0001,则剩余长度的值为十进制的1205393;
四个字节长度的剩余长度盘算
第一个字节第二个字节第三个字节第四个个字节1001 00011100 10011100 10010100 1101 去掉标志位后,其新值为
第一个字节第二个字节第三个字节第四个个字节001 0001100 1001100 1001100 1101
后面的字节在高位,则合并后值为:100 1101 100 1001 100 1001 001 0001,则剩余长度的值为十进制的162686097;
2.1.2 可变报头


可变报头的数据依据固定报头中的报文类型而不同,一般是包含和报文类型相关的数据;
比方客户端毗连服务器报文,包含了协议名,协议级别(用来表现mqtt的版本信息)、毗连标志、保持毗连;可变报头内容表示图如下:

发布消息报文可变报头,则内容仅包含了主题名和报文标识符(仅当标志位Qos>0)

2.1.3 有效载荷



有效载荷内容也是根据报文类型的不同而不同,在上述可变报头的2张表示图中也能看出,毗连服务器报文的有效载荷则包含了:客户端标识符、遗属主题、遗属消息、用户、暗码,而发布消息报文,则仅仅包含了应用消息;
不同的有效载荷剖析方式有所不同,具体要看每一条报文的规定,详情可以参见末了一个章节提供的Mqtt官方手册。
三、具体协议报文详解(重要)


3.1 毗连服务器报文详解


起首来看看毗连服务器报文协议,先贴出具体的一条毗连服务器报文协议数据(16进制数,每2个数字表现一个字节)
  1. 10ab0100044d51545404c2001400177061686f31363735313537353030373437303030303030000464656d6f00803846334238444532464443384244334437393242453737454143343132303130393731373635453542444436433439394144434545383430434534343142444546313745333036383442443935434137303846353530323232323243433631363144304432334332444643423132463841433939384635394537323133333933
复制代码

先把团体的剖析数据罗列出,再进行详细的剖析阐明:

3.1.1、固定报文

第1个字节10,二进制则为:0001 0000(位置从后往前数) ,前4位为0000,则标志位全为0,标志位数据表现的意思需要根据报文类型来确定;后4位0001,值为1,则表现报文类型为客户端毗连服务器(CONNECT)
第2个字节:ab,二进制为:1010 1011,第2个字节为剩余长度的起始字节,其最高位为1,则阐明后面一个字节也属于剩余长度
3.1.2、可变报文

第3个字节:01,二进制为:0000 0001 最高位为0,则剩余长度结束,共包含了2个字节,依据前面剩余长度的算法解释,可得出剩余长度等于171,表现该报文后面还有171个字节。
根据官方文档的解释:毗连服务器报文可变报头的起始2个字节为协议名的长度;前面已经剖析了3个字节;
第4个字节至第5个字节则为协议名称的长度:00 04 协议名长度为4,
第6-9个字节为协议名,6-9字节为:4d 51 54 54 ,根据协议规定,协议名为ascii码值,则协议名剖析为:MQTT
第10个字节为协议级别:04 (官方解释:0x04 表现3.1.1版本)
接下来的第11个字节为 毗连标志,毗连标志位的详细解释如下图所示:

第11个字节:c2 二进制为:1100 0010,那么会话清理(clean session)为1,表现丢弃之前的会话,开始一个新的会话;另外用户名和暗码标志位为1,则阐明在有效载荷中携带了用户名和暗码信息,遗属标志位为0,则有效载荷中就没有遗属主题和遗属消息
第12-13字节:00 14,用来表现保持毗连的时间(秒),即为10十进制:20秒
到此处可变报头就剖析完了,接下来就是剖析有效载荷部分了,毗连服务器报文有效载荷格式为前个字节为字段长度,然后接字段值的格式
3.1.3、有效载荷

第14-15字节:00 17,用来表现客户端标识符长度,即为10十进制:23个字节
第16-38字节:7061686f31363735313537353030373437303030303030 ,表现客户端标识符,客服端标识符为ascii码格式,则此处为:paho1675157500747000000
第39-40字节:00 04,用来表现用户长度,即为10十进制:4个字节
第41-44字节:64 65 6d 6f,用来表现用户名,ascii码为(demo)
第45-46字节:00 80 ,用来表现暗码长度 ,即为10进制:128
第47-174字节
3846334238444532464443384244334437393242453737454143343132303130393731373635453542444436433439394144434545383430434534343142444546313745333036383442443935434137303846353530323232323243433631363144304432334332444643423132463841433939384635394537323133333933
      即为暗码,也是利用ascii码
至此我们已经完整的剖析了一条mqtt客户端毗连服务器报文,源代码如下:
  1. int Z_MQTT_ConnectBroker(const char* ID, const char* username,const char* password,\
  2.     const char* will_topic, const char* will_data){
  3.     size_t ID_len = strlen(ID);//客户端标识符
  4.     size_t username_len = strlen(username);
  5.     size_t password_len = strlen(password);
  6.     size_t will_topic_len = strlen(will_topic);
  7.     size_t will_data_len = strlen(will_data);//will_data:遗嘱消息内容
  8.     size_t index = 1;         // 填写数据用的索引,从1开始是因为第0位固定为0x10
  9.         
  10. //如果客户端标识符为空,函数返回错误码1
  11.     if(ID_len <= 0) return 1;   // ID必须有
  12.    
  13.     // 可变报头和有效载荷的长度(剩余长度),其中12是可变报头固定 10byte 和记录ID的长度的 2byte
  14. //12:这是CONNECT报文固定报头之后的长度。它包括:
  15. //协议名“MQTT”(4字节)及其长度(2字节)。
  16. //协议级别(1字节)。
  17. //连接标志(1字节)。
  18. //保持连接时间(2字节)。
  19. //
  20. /*(username_len == 0 ? 0 : 2):如果用户名非空,则加上2字节的长度前缀。
  21. (password_len == 0 ? 0 : 2):如果密码非空,则加上2字节的长度前缀。
  22. (will_topic_len == 0 ? 0 : 2):如果遗嘱主题非空,则加上2字节的长度前缀。
  23. (will_data_len == 0 ? 0 : 2):如果遗嘱消息非空,则加上2字节的长度前缀。*/
  24.     uint32_t send_size = 12 + ID_len + username_len + password_len + will_topic_len + \
  25.         will_data_len + (username_len == 0 ? 0 : 2) + (password_len == 0 ? 0 : 2) + \
  26.         (will_topic_len == 0 ? 0 : 2) + (will_data_len == 0 ? 0 : 2);   
  27.    
  28.     // 填入固定报头 0001 0000 CONNECT固定第一字节
  29.     Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_CONNECT;
  30.         
  31.          //剩余长度字段的解码算法
  32.     do{ // 将剩余长度填进CONNECT报文的固定报头中
  33.         uint8_t encodedByte = send_size % 128;
  34.         send_size /= 128;
  35.         if ( send_size > 0 )    encodedByte = encodedByte | 0x80;
  36.         Z_Mqtt_Send_Buffer[index++] = encodedByte;
  37.     }while ( send_size > 0 );
  38.     // 可变报头-协议名(固定) utf-8字符串形式
  39.     Load_Utf8_String(&index,"MQTT");
  40.     // 可变报头-协议级别(3.1.1固定为0x04)
  41.     Z_Mqtt_Send_Buffer[index++] = 0x04;
  42.     // 可变报头-连接标志
  43.     uint8_t connect_flag = 0x00;
  44.     if(strlen(username) > 0) connect_flag |= 0x80;      //设置用户名标志
  45.     if(strlen(password) > 0) connect_flag |= 0x40;      //设置密码标志
  46.     if(strlen(will_topic) > 0) connect_flag |= 0x24;    // 设置遗嘱标志与遗嘱信息保留标志
  47.     Z_Mqtt_Send_Buffer[index++] = connect_flag;
  48.     // 可变报头-保持连接时间
  49.     Z_Mqtt_Send_Buffer[index++] = Z_MQTT_KEEP_ALIVE_TIME/128;
  50.     Z_Mqtt_Send_Buffer[index++] = Z_MQTT_KEEP_ALIVE_TIME%128;
  51.     // 有效载荷-客户端标识符(ID 必须有)
  52.     Load_Utf8_String(&index,ID);
  53.     // 有效载荷-遗嘱主题(will_topic 如果有)
  54.     if(strlen(will_topic) > 0)  Load_Utf8_String(&index,will_topic);
  55.     // 有效载荷-遗嘱内容(will_data 如果有)
  56.     if(strlen(will_data) > 0)   Load_Utf8_String(&index,will_data);
  57.     // 有效载荷-用户名(username 如果有)
  58.     if(strlen(username) > 0)    Load_Utf8_String(&index,username);
  59.     // 有效载荷-密码(password 如果有)
  60.     if(strlen(password) > 0)    Load_Utf8_String(&index,password);
  61.     write(mqtt_client, Z_Mqtt_Send_Buffer, index);
  62.    
  63.     ssize_t receive_len = Z_MQTT_TryReceiveData();
  64.     if(receive_len <= 0){
  65.         printf("no receive mqssage\n");
  66.         return -1;
  67.     }
复制代码
3.2 发布消息报文详解


发布消息报文,此报文可变报头仅有2个部分,一个是主题名一个是报文标识符,报文标识符仅当Qos大于0时才有,然后就是有效载荷,有效载荷都是自己定义的内容,收到数据后按自定义规则剖析即可;

发布消息报文结构表示图

来看一条具体报文数据(报文都是16进制表现,2个字符表现一个字节)
  1. 301200047465737468656c6c6f2c776f726c64
复制代码
先把团体的剖析数据罗列出,再进行详细的剖析阐明:

3.2.1、固定报文

第1个字节是30,二进制则为:0011 0000(位置从后往前数) ,前4位为0000,则标志位全为0,标志位数据表现的意思需要根据报文类型来确定;后4位0011 值为3,则表现报文类型为发布消息(PUBLISH)
第2个字节:11,二进制为:0001 0001,第2个字节为剩余长度的起始字节,其最高位为0,则阐明剩余长度就只有这一个字节,10进制值为17,表现该报文后面还有17个字节。
3.2.2、可变报文(主题)

第一个就是主题,格式内容为2个字节的主题名称长度,然后跟上主题名;
第3-4个字节为主题名长度:00 04,表现有4个字节的主题名
第5-8字节为主题名:74657374 ,主题名为asicc码格式,则为:test
因为固定报头中标志位为0,则Qos=0,则没有报文标识符
3.2.3、有效载荷(内容)


第9-19字节为有效载荷:68656c6c6f2c776f726c64 (ascii码:hello,world)

再看一条带有Qos=1的发布消息报文数据
  1. 3213000474657374000168656c6c6f2c776f726c64
复制代码
先列出团体的剖析数据

MQTT的服务质量提供3个品级:
QoS0:最多发送一次消息,在消息发送出去后,接收者不会发送回应,发送者也不会重发消息,消息可能送达一次也可能根本没送达,这个服务质量常用在不重要的消息传递中,因为即使消息丢了也没有太大关系。
QoS1:最少发送一次消息(消息最少需要送达一次,也有可送达多次),QoS 1的PUBLISH报文的可变报头中包含一个报文标识符,需要PUBACK报文确认。即需要接收者返回PUBACK应答报文。
QoS2:只发送一次,这是最高品级的服务质量,消息丢失和重复都是不可担当的,只不外利用这个服务质量品级会有额外的开销,这个品级常用于付出中,因为付出是必须有且仅有一次乐成,总不能没给钱或者给了多次钱吧。
其中,利用 QoS 0 可能丢失消息,利用 QoS 1 可以包管收到消息,但消息可能重复,利用 QoS 2 可以包管消息既不丢失也不重复。QoS 品级从低到高,不仅意味着消息可靠性的提拔,也意味着传输复杂程度的提拔。在发布者到订阅者的消息投递流程中,QoS 品级是由发布者在 PUBLISH 报文中指定的。
3.2.4、固定报文

第1个字节是32,二进制则为:0011 0010(位置从后往前数) ,第2-3位表现的是Qos,则Qos=1那么后面的可变报文中,就会添加报文标识符了;后4位0011 值为3,则表现报文类型为发布消息(PUBLISH)
第2个字节:13,二进制为:0001 0011,第2个字节为剩余长度的起始字节,其最高位为0,则阐明剩余长度就只有这一个字节,10进制值为19,表现该报文后面还有19个字节。
3.2.5、可变报文(主题)

第3-4个字节为主题名长度:00 04,表现有4个字节的主题名
第5-8个字节为主题名:74657374 ,主题名为asicc码格式,则为:test
因为标志位中Qos=1,则下面的2个字节表现报文标识符,报文标识符一般都是用来表现消息数,用来区分不同的消息,避免处理惩罚重复消息
第9-10字节:00 01,则报文标识符为1
3.2.6、有效载荷(内容)

第11-21字节为有效载荷:68656c6c6f2c776f726c64 (ascii码:hello,world)
两条报文对比一下,因为固定报头中的标志位不同,影响了可变报头的内容,多了一个报文标识符。
  1. // 发布主题信息
  2. // topic 主题名    data 发布内容    retain 保留标志位
  3. // 成功返回0, 失败返回-1, 参数错误返回1;
  4. int Z_MQTT_Publish(const char* topic, const char* data, uint8_t retain){
  5.     size_t index = 1;
  6.     size_t topic_len = strlen(topic);
  7.     size_t data_len = strlen(data);
  8.     printf("this is publish topic is %s,data is %s\n",topic,data);
  9.     if(topic_len == 0) return 1;
  10.     Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_PUBLISH;
  11.     if(retain) Z_Mqtt_Send_Buffer[0] |= 0x01;
  12.     // 可变报头和有效载荷的长度(剩余长度)
  13.     uint32_t send_size = topic_len + data_len + 2;   
  14.     do{ // 将剩余长度填进CONNECT报文的固定报头中
  15.         uint8_t encodedByte = send_size % 128;
  16.         send_size /= 128;
  17.         if ( send_size > 0 )    encodedByte = encodedByte | 0x80;
  18.         Z_Mqtt_Send_Buffer[index++] = encodedByte;
  19.     }while ( send_size > 0 );
  20.     Load_Utf8_String(&index,topic);     // 装载topic
  21.     memcpy((void*)&Z_Mqtt_Send_Buffer[index],(void*)data,data_len); // 装载data
  22.     write(mqtt_client, Z_Mqtt_Send_Buffer, index + data_len);   
  23.     return 0;
  24. }
复制代码
3.3、 订阅主题报文

客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅。每个订阅注册客户端关心的一个或多个主题。为了将应用消息转发给那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。SUBSCRIBE报文也(为每个订阅)指定了最大的QoS品级,服务端根据这个发送应用消息给客户端。
3.3.1 固定报头


SUBSCRIBE控制报固定报头的第3,2,1,0位是保留位,必须分别设置为0,0,1,0。服务端必须将其它的任何值都当做是不合法的并关闭网络毗连。
剩余长度字段:等于SUBSCRIBE可变报头的长度(2字节)加上有效载荷的长度。
3.3.2 可变报头



可变报头包含客户端报文标识符。
报文标识符相当于自定义的Topic的ID,用ID号去代替具体的Topic,而不是字段,使得区分发来的Topic的同时又可以节省流量,可自定义,发起自己预先制定一个服务ID表。
订阅返回,返回Topic订阅乐成信息,返回的不是具体Topic,返回的就是报文标识符
3.3.3 有效载荷



SUBSCRIBE报文的有效载荷包含了一个主题过滤器列表,它们表现客户端想要订阅的主题。SUBSCRIBE报文有效载荷中的主题过滤器列表必须是UTF-8字符串。服务端应该支持包含通配符的主题过滤器。如果服务端选择不支持包含通配符的主题过滤器,必须拒绝任何包含通配符过滤器的订阅请求。每一个过滤器后面跟着一个字节,这个字节被叫做 服务质量要求(Requested QoS)。它给出了服务端向客户端发送应用消息所答应的最大QoS品级。
SUBSCRIBE报文的有效载荷必须包含至少一对主题过滤器 和 QoS品级字段组合。没有有效载荷的SUBSCRIBE报文是违反协议的。
当前版本的协议没有效到服务质量要求(Requested QoS)字节的高六位。如果有效载荷中的任何位是非零值,或者QoS不等于0,1或2,服务端必须以为SUBSCRIBE报文是不合法的并关闭网络毗连。
响应:服务端收到客户端发送的一个SUBSCRIBE报文时,必须利用SUBACK报文响应。
服务端发送给客户端的SUBACK(确认主题订阅)报文对每一对主题过滤器 和QoS品级都必须包含一个返回码。这个返回码必须表现谁人订阅被授予的最大QoS品级,或者表现这个订阅失败。服务端可以授予比订阅者要求的低一些的QoS品级。为响应订阅而发出的消息的有效载荷的QoS必须是原始发布消息的QoS和服务端授予的QoS两者中的最小值。如果原始消息的QoS是1而被授予的最大QoS是0,答应服务端重复发送一个消息的副本给订阅者。
  1. // 订阅主题
  2. // 成功返回0,失败-1,参数错误返回1
  3. int Z_MQTT_Subscribe(const char* topic){
  4.     size_t index = 1;
  5.     size_t topic_len = strlen(topic);
  6.     if(topic_len == 0){
  7.         printf("parameter error\n");
  8.         return 1;
  9.     }
  10.     Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_SUBSCRIBE | 0x02;
  11.     // 可变报头和有效载荷的长度(剩余长度)
  12.     uint32_t send_size = topic_len + 5;   
  13.     do{ // 将剩余长度填进CONNECT报文的固定报头中
  14.         uint8_t encodedByte = send_size % 128;
  15.         send_size /= 128;
  16.         if ( send_size > 0 )    encodedByte = encodedByte | 0x80;
  17.         Z_Mqtt_Send_Buffer[index++] = encodedByte;
  18.     }while ( send_size > 0 );
  19.     Z_Mqtt_Send_Buffer[index++] = 0x00;     // 报文标识符,咱随便给个非0数就行
  20.     Z_Mqtt_Send_Buffer[index++] = 0x01;     // 后面需要和返回的报文作对比
  21.     Load_Utf8_String(&index,topic);
  22.     Z_Mqtt_Send_Buffer[index++] = 0x00;     // Qos0 等级
  23.     write(mqtt_client,Z_Mqtt_Send_Buffer,index);
  24.     ssize_t receive_len = Z_MQTT_TryReceiveData();
  25.     if(receive_len <= 0){
  26.         printf("no receive mqssage\n");
  27.         return -1;
  28.     }
  29.     Printf_Receive(receive_len);
  30.     if(receive_len != 5){
  31.         printf("unknow error\n");
  32.         return -1;
  33.     }
  34.     if(Z_Mqtt_Receive_Buffer[0] != Z_MQTT_FLAG_SUBACK){
  35.         printf("message type error\n");
  36.         return -1;
  37.     }
  38.     // 报文标识符对不上
  39.     if(Z_Mqtt_Receive_Buffer[2] != 0x00 || Z_Mqtt_Receive_Buffer[3] != 0x01){
  40.         printf("message ID error\n");
  41.         return -1;
  42.     }
  43.     if(Z_Mqtt_Receive_Buffer[4] != 0x00){
  44.         printf("Qos error\n");
  45.         return -1;
  46.     }
  47.     return 0;
  48. }
复制代码

四、全部源码

4.1 main.c

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <unistd.h>
  5. #include <sys/socket.h>
  6. #include <arpa/inet.h>
  7. #include <pthread.h>
  8. #include "Z_Mqtt_Tool.h"
  9. // 获取的topic最长长度
  10. #define MQTT_TOPIC_SIZE 128
  11. // 获取的data最长长度
  12. #define MQTT_DATA_SIZE 1024
  13. //客户端ID
  14. #define MQTT_ID         "zhetu"
  15. #define MQTT_USERNAME   "EMS"
  16. #define MQTT_PASSWORD   "EMS"
  17. //遗嘱主题
  18. #define MQTT_WILLTOPIC  "EMS"
  19. //遗嘱消息
  20. #define MQTT_WILLDATA   "EMS"
  21. //存储客户端的socket连接
  22. int client = 0;
  23. //用于存储接收到的主题和数据。
  24. char Mqtt_Topic_Buffer[MQTT_TOPIC_SIZE];
  25. char Mqtt_Data_Buffer[MQTT_DATA_SIZE];
  26. //这个函数用于从标准输入读取主题或数据。根据传入的 flag 参数,它将读取主题或数据,并存储在相应的缓冲区中
  27. void get_str(char flag){
  28.     if(flag == 't'){
  29.         memset(Mqtt_Topic_Buffer,0,MQTT_TOPIC_SIZE);
  30.         printf("input topic: \n");
  31.         fgets(Mqtt_Topic_Buffer,MQTT_TOPIC_SIZE,stdin);
  32.         Mqtt_Topic_Buffer[strlen(Mqtt_Topic_Buffer) - 1] = '\0';    // 消除回车
  33.     }else if(flag == 'd'){
  34.         memset(Mqtt_Data_Buffer,0,MQTT_DATA_SIZE);
  35.         printf("input data: \n");
  36.         fgets(Mqtt_Data_Buffer,MQTT_DATA_SIZE,stdin);
  37.         Mqtt_Data_Buffer[strlen(Mqtt_Data_Buffer) - 1] = '\0';    // 消除回车
  38.     }
  39. }
  40. //Get_Mqtt_Message 函数:这是一个线程函数,它在一个无限循环中不断调用 Z_MQTT_ReceivePush 函数来接收MQTT代理推送的消息,并打印出来。使用 usleep 函数在每次循环后暂停100毫秒
  41. void* Get_Mqtt_Message(void* arg){
  42.     while(1){
  43.                  //它将接收到的主题存储在 Mqtt_Topic_Buffer 中,将数据存储在 Mqtt_Data_Buffer 中
  44.         if(0 == Z_MQTT_ReceivePush(Mqtt_Topic_Buffer,Mqtt_Data_Buffer,MQTT_TOPIC_SIZE,MQTT_DATA_SIZE)){
  45.                           //printf 函数用于打印接收到的消息。它显示主题和数据内容
  46.             printf("receive MQTT pubish,topic is "%s",data is "%s"\n",Mqtt_Topic_Buffer,Mqtt_Data_Buffer);
  47.         }
  48.         usleep(100000);     // 100ms检测一次
  49.     }
  50.     return NULL;
  51. }
  52. int main(void){
  53.     printf("hello world!\n");
  54.     int res = 0;
  55.     char inputchar = 0;
  56.     char inputstr[1024] = {0};
  57.     pthread_t tid;
  58. //用于创建一个新线程,该线程将执行 Get_Mqtt_Message 函数,以便在后台接收MQTT消息
  59. /*
  60. int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
  61.                    void *(*start_routine)(void *), void *arg);
  62. pthread_t *thread:指向 pthread_t 类型的指针,用于存储新创建线程的标识符。
  63. const pthread_attr_t *attr:指向线程属性对象的指针,可以为 NULL,表示使用默认属性。
  64. void *(*start_routine)(void *):新线程开始执行时调用的函数,即线程函数。
  65. void *arg:传递给线程函数的参数,可以为 NULL。
  66. */
  67.     int ret = pthread_create(&tid,NULL,Get_Mqtt_Message,NULL);
  68.     while(1){
  69.         printf("\
  70. /****************************************************\n\
  71. \ta\tconnect TCP server\n\
  72. \tb\tconnect MQTT Broker\n\
  73. \td\tdisconnect MQTT Broker\n\
  74. \ts\tsubscribe topic\n\
  75. \tu\tunsubscribe topic\n\
  76. \tp\tpublish data\n\
  77. \tj\tsend ping to MQTT broker\n\
  78. \tq\tquit\n\
  79. ****************************************************/\n");
  80.         inputchar = getchar();
  81.         getchar();
  82.         if(inputchar == 'a'){
  83.             if(0 == Z_Mqtt_Init()){
  84.                 printf("connect TCP server success!\n");
  85.             }else{
  86.                 printf("connect TCP error,try again\n");
  87.             }
  88.         }else if(inputchar == 'b'){
  89.             if(0 == Z_MQTT_ConnectBroker(MQTT_ID,MQTT_USERNAME,MQTT_PASSWORD,MQTT_WILLTOPIC,MQTT_WILLDATA)){
  90.                 printf("connect MQTT broker success!\n");   
  91.             }else{
  92.                 printf("connect MQTT broker error,rty again\n");
  93.             }
  94.         }else if(inputchar == 'd'){
  95.             Z_MQTT_DisconnectBroker();
  96.         }else if(inputchar == 's'){
  97.             get_str('t');
  98.             if(0 == Z_MQTT_Subscribe(Mqtt_Topic_Buffer)){
  99.                 printf("subscribe success!\n");
  100.             }else{
  101.                 printf("subscribe error,try again\n");
  102.             }
  103.         }else if(inputchar == 'u'){
  104.             get_str('t');
  105.             if(0 == Z_MQTT_Unsubscribe(Mqtt_Topic_Buffer)){
  106.                 printf("unsubscribe success!\n");
  107.             }else{
  108.                 printf("unsubscribe error,try again\n");
  109.             }
  110.         }else if(inputchar == 'p'){
  111.             get_str('t');
  112.             get_str('d');
  113.             if(0 == Z_MQTT_Publish(Mqtt_Topic_Buffer,Mqtt_Data_Buffer,0)){
  114.                 printf("send publish success!\n");
  115.             }else{
  116.                 printf("parameter error,try agagin\n");
  117.             }
  118.         }else if(inputchar == 'j'){
  119.             if(0 == Z_MQTT_Ping()){
  120.                 printf("send ping success!\n");
  121.             }else{
  122.                 printf("unknown error,try agagin\n");
  123.             }
  124.         }else if(inputchar == 'q'){
  125.             Z_MQTT_Close_TCP();
  126.             break;
  127.         }
  128.     }
  129.     printf("good bye~\n");
  130.     return 0;
  131. }
复制代码
4.2 Z_Mqtt_Tool.h

  1. #ifndef __Z_MQTT_TOOL_H
  2. #define __Z_MQTT_TOOL_H
  3. #include <stdio.h>
  4. #include <stdlib.h>
  5. #include <string.h>
  6. #include <unistd.h>
  7. #include <sys/socket.h>
  8. #include <arpa/inet.h>
  9. #include <pthread.h>
  10. // 报文类型
  11. #define Z_MQTT_FLAG_CONNECT      0x10
  12. #define Z_MQTT_FLAG_CONNACK      0x20
  13. #define Z_MQTT_FLAG_PUBLISH      0x30
  14. #define Z_MQTT_FLAG_PUBACK       0x40
  15. #define Z_MQTT_FLAG_PUBREC       0x50
  16. #define Z_MQTT_FLAG_PUBREL       0x60
  17. #define Z_MQTT_FLAG_PUBCOMP      0x70
  18. #define Z_MQTT_FLAG_SUBSCRIBE    0x80
  19. #define Z_MQTT_FLAG_SUBACK       0x90
  20. #define Z_MQTT_FLAG_UNSUBSCRIBE  0xA0
  21. #define Z_MQTT_FLAG_UNSUBACK     0xB0
  22. #define Z_MQTT_FLAG_PINGREQ      0xC0
  23. #define Z_MQTT_FLAG_PINGRESP     0xD0
  24. #define Z_MQTT_FLAG_DISCONNECT   0xE0
  25. int Z_Mqtt_Init(void);
  26. void Z_MQTT_Close_TCP(void);
  27. void Load_Utf8_String(size_t* index, const char* data);
  28. int Z_MQTT_ConnectBroker(const char* ID, const char* username,const char* password,const char* will_topic, const char* will_data);
  29. int Z_MQTT_Publish(const char* topic, const char* data, uint8_t retain);
  30. int Z_MQTT_Subscribe(const char* topic);
  31. int Z_MQTT_Unsubscribe(const char* topic);
  32. int Z_MQTT_ReceivePush(char* topic, char* data, size_t topic_len, size_t data_len);
  33. void Z_MQTT_DisconnectBroker(void);
  34. int Z_MQTT_Ping(void);
  35. #endif
复制代码
4.3 Z_Mqtt_Tool.c

  1. #include "Z_Mqtt_Tool.h"// 接收缓冲区巨细以及发送缓冲区巨细#define Z_MQTT_RECEIVE_SIZE 2048#define Z_MQTT_SEND_SIZE    2048// MQTT服务器IP以及端口#define Z_MQTT_BROKER_IP    "10.1.193.87"#define Z_MQTT_BROKER_PORT  1883// 心跳隔断时间,单元秒#define Z_MQTT_KEEP_ALIVE_TIME 100// 等待复兴报文时间,单元秒#define Z_MQTT_WAIT_TIME 10#include <arpa/inet.h> // 包含inet_ntoa函数的头文件//Z_MQTT_RECEIVE_SIZE 和 Z_MQTT_SEND_SIZE:分别定义了接收和发送缓冲区的巨细  //全局变量Z_Mqtt_Send_Buffer 和 Z_Mqtt_Receive_Buffer:分别用于存储发送和接收的数据static uint8_t Z_Mqtt_Send_Buffer[Z_MQTT_SEND_SIZE] = {0}; static uint8_t Z_Mqtt_Receive_Buffer[Z_MQTT_RECEIVE_SIZE] = {0}; mqtt_client:存储客户端的socket文件描述符static int mqtt_client;static pthread_mutex_t Mqtt_Receive_Mutex;// 调试用void Printf_Receive(ssize_t receive_len){    printf("-------------------------------------------------\n");    printf("receive_len is %ld,receive data is ",receive_len);     size_t i;//%02x 是格式化字符串,它表现打印一个两位的十六进制数,如果数字不敷两位,则在前面补零    for(i = 0; i < receive_len; ++i){        printf("%02x\t",(unsigned int)Z_Mqtt_Receive_Buffer[i]);    }    printf("\n");    printf("-------------------------------------------------\n");}// 创建TCP毗连 失败返回-1,乐成返回0int Z_Mqtt_Init(void){    // 初始化发送和接收的缓冲区(buff)以及互斥锁    memset(Z_Mqtt_Send_Buffer,0,Z_MQTT_SEND_SIZE);    memset(Z_Mqtt_Receive_Buffer,0,Z_MQTT_RECEIVE_SIZE);    pthread_mutex_init(&Mqtt_Receive_Mutex,NULL);    // PF_INET 和 AF_INET 差不多,可以混用(发起同一,方便自己看)    mqtt_client = socket(AF_INET, SOCK_STREAM, 0);       // 利用IPv4协议族,TCP    if(-1 == mqtt_client){        printf("create socket error\n");         return -1;    }    // 毗连服务器    struct sockaddr_in addr;/*addr.sin_family 设置为 AF_INET,表现利用IPv4地点族。addr.sin_port 设置为 htons(Z_MQTT_BROKER_PORT),将端口号从主机字节序转换为网络字节序。inet_pton 函数用于将点分十进制的IP地点字符串转换为网络字节序的二进制形式*/    addr.sin_family = AF_INET;          // 利用IPv4协议族    addr.sin_port = htons(Z_MQTT_BROKER_PORT);        // 设置端口,需用htons转成大端    // 将IP地点(点分十进制字符串)转换为网络字节序(IPv4)写进addr.sin_addr.s_addr    inet_pton(AF_INET, Z_MQTT_BROKER_IP, &addr.sin_addr.s_addr);        // 将IP地点(点分十进制字符串)转换为网络字节序(IPv4)写进addr.sin_addr.s_addrinet_pton(AF_INET, Z_MQTT_BROKER_IP, &addr.sin_addr.s_addr);// 打印IP地点char ip_str[INET_ADDRSTRLEN]; // INET_ADDRSTRLEN 是IPv4地点字符串的最大长度struct in_addr ip_addr; // 用于存储网络字节序的IP地点ip_addr.s_addr = addr.sin_addr.s_addr; // 将网络字节序的IP地点赋值给ip_addrstrcpy(ip_str, inet_ntoa(ip_addr)); // 将网络字节序的IP地点转换为点分十进制字符串printf("MQTT Broker IP: %s\n", ip_str); // 打印IP地点 /*int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);int sockfd:这是客户端的套接字文件描述符(socket file descriptor),它必须是一个有效的、已经创建但尚未毗连的套接字。const struct sockaddr *addr:这是一个指向 sockaddr 结构体的指针,它包含了服务器的地点信息。对于IPv4,这通常是 struct sockaddr_in 结构体;addr 参数指定了服务器的协议族、IP地点和端口号socklen_t addrlen:这是 addr 参数指向的 sockaddr 结构体的巨细,以字节为单元。这个参数告诉 connect 函数 addr 结构体中有多少字节是有效的,需要被读取。*/    if(-1 == connect(mqtt_client, (struct sockaddr*)&addr, sizeof(addr))){   // 毗连目标服务器        printf("connect error\n");        return -1;       }    return 0;}void Z_MQTT_Close_TCP(void){    close(mqtt_client);}// 以utf-8编码字符串的形式把数据装载进发送缓冲区  MQTT协议规定,所有的字符串都必须利用UTF-8编码//一个UTF-8字符串可以非常长。因此,在发送之前,需要在字符串前附加一个长度前缀,这样接收方就能知道字符串的确切长度void Load_Utf8_String(size_t* index, const char* data){    //装载长度    Z_Mqtt_Send_Buffer[(*index)++] = strlen(data)/128;    Z_Mqtt_Send_Buffer[(*index)++] = strlen(data)%128;        // 装载数据    memcpy((void*)&Z_Mqtt_Send_Buffer[*index], (void*)data, strlen(data));    (*index) += strlen(data);}// 实验获取来自MQTT的报文int Z_MQTT_TryReceiveData(void){    pthread_mutex_lock(&Mqtt_Receive_Mutex);    ssize_t receive_len = -1;    int count = 0;    do{        receive_len = recv(mqtt_client,Z_Mqtt_Receive_Buffer,Z_MQTT_RECEIVE_SIZE,MSG_DONTWAIT);        usleep(10000);      // 等待10ms        if(++count >= (Z_MQTT_WAIT_TIME * 100)){            printf("wait to long\n");             pthread_mutex_unlock(&Mqtt_Receive_Mutex);            return -1;        }    }while(receive_len <= 0);    pthread_mutex_unlock(&Mqtt_Receive_Mutex);    return receive_len;} // ID:毗连MQTT服务器所利用的ID// username password 毗连MQTT服务器所利用的用户名和暗码(可以为空字符串)// will_topic will_data 遗嘱主题与遗嘱内容(可以为空字符串)// 毗连乐成返回0,失败返回-1,参数错误返回1int Z_MQTT_ConnectBroker(const char* ID, const char* username,const char* password,\
  2.     const char* will_topic, const char* will_data){
  3.     size_t ID_len = strlen(ID);//客户端标识符
  4.     size_t username_len = strlen(username);
  5.     size_t password_len = strlen(password);
  6.     size_t will_topic_len = strlen(will_topic);
  7.     size_t will_data_len = strlen(will_data);//will_data:遗嘱消息内容
  8.     size_t index = 1;         // 填写数据用的索引,从1开始是因为第0位固定为0x10
  9.         
  10. //如果客户端标识符为空,函数返回错误码1
  11.     if(ID_len <= 0) return 1;   // ID必须有
  12.    
  13.     // 可变报头和有效载荷的长度(剩余长度),其中12是可变报头固定 10byte 和记录ID的长度的 2byte
  14. //12:这是CONNECT报文固定报头之后的长度。它包括:
  15. //协议名“MQTT”(4字节)及其长度(2字节)。
  16. //协议级别(1字节)。
  17. //连接标志(1字节)。
  18. //保持连接时间(2字节)。
  19. //
  20. /*(username_len == 0 ? 0 : 2):如果用户名非空,则加上2字节的长度前缀。
  21. (password_len == 0 ? 0 : 2):如果密码非空,则加上2字节的长度前缀。
  22. (will_topic_len == 0 ? 0 : 2):如果遗嘱主题非空,则加上2字节的长度前缀。
  23. (will_data_len == 0 ? 0 : 2):如果遗嘱消息非空,则加上2字节的长度前缀。*/
  24.     uint32_t send_size = 12 + ID_len + username_len + password_len + will_topic_len + \
  25.         will_data_len + (username_len == 0 ? 0 : 2) + (password_len == 0 ? 0 : 2) + \
  26.         (will_topic_len == 0 ? 0 : 2) + (will_data_len == 0 ? 0 : 2);   
  27.    
  28.     // 填入固定报头 0001 0000 CONNECT固定第一字节
  29.     Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_CONNECT;
  30.         
  31.          //剩余长度字段的解码算法
  32.     do{ // 将剩余长度填进CONNECT报文的固定报头中
  33.         uint8_t encodedByte = send_size % 128;
  34.         send_size /= 128;
  35.         if ( send_size > 0 )    encodedByte = encodedByte | 0x80;
  36.         Z_Mqtt_Send_Buffer[index++] = encodedByte;
  37.     }while ( send_size > 0 );
  38.     // 可变报头-协议名(固定) utf-8字符串形式
  39.     Load_Utf8_String(&index,"MQTT");
  40.     // 可变报头-协议级别(3.1.1固定为0x04)
  41.     Z_Mqtt_Send_Buffer[index++] = 0x04;
  42.     // 可变报头-连接标志
  43.     uint8_t connect_flag = 0x00;
  44.     if(strlen(username) > 0) connect_flag |= 0x80;      //设置用户名标志
  45.     if(strlen(password) > 0) connect_flag |= 0x40;      //设置密码标志
  46.     if(strlen(will_topic) > 0) connect_flag |= 0x24;    // 设置遗嘱标志与遗嘱信息保留标志
  47.     Z_Mqtt_Send_Buffer[index++] = connect_flag;
  48.     // 可变报头-保持连接时间
  49.     Z_Mqtt_Send_Buffer[index++] = Z_MQTT_KEEP_ALIVE_TIME/128;
  50.     Z_Mqtt_Send_Buffer[index++] = Z_MQTT_KEEP_ALIVE_TIME%128;
  51.     // 有效载荷-客户端标识符(ID 必须有)
  52.     Load_Utf8_String(&index,ID);
  53.     // 有效载荷-遗嘱主题(will_topic 如果有)
  54.     if(strlen(will_topic) > 0)  Load_Utf8_String(&index,will_topic);
  55.     // 有效载荷-遗嘱内容(will_data 如果有)
  56.     if(strlen(will_data) > 0)   Load_Utf8_String(&index,will_data);
  57.     // 有效载荷-用户名(username 如果有)
  58.     if(strlen(username) > 0)    Load_Utf8_String(&index,username);
  59.     // 有效载荷-密码(password 如果有)
  60.     if(strlen(password) > 0)    Load_Utf8_String(&index,password);
  61.     write(mqtt_client, Z_Mqtt_Send_Buffer, index);
  62.    
  63.     ssize_t receive_len = Z_MQTT_TryReceiveData();
  64.     if(receive_len <= 0){
  65.         printf("no receive mqssage\n");
  66.         return -1;
  67.     }    // 调试    Printf_Receive(receive_len);    if(receive_len != 4){                   // 返回字节数不为4,未知错误        printf("unknow error\n");        return -1;    }    if(Z_Mqtt_Receive_Buffer[0] != Z_MQTT_FLAG_CONNACK){        printf("message type error\n");     // 第一字节不为0x20,服务器发送的报文类型错误        return -1;            }    if(Z_Mqtt_Receive_Buffer[3] != 0x00){   // 第四字节不为0,一般为参数错误        printf("parameter error\n");        return 1;    }        return 0;}// 断开与MQTT服务器的毗连void Z_MQTT_DisconnectBroker(void){    Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_DISCONNECT;    Z_Mqtt_Send_Buffer[1] = 0x00;    write(mqtt_client,Z_Mqtt_Send_Buffer,2);    pthread_mutex_destroy(&Mqtt_Receive_Mutex);}// 发送心跳数据包,发送乐成返回0,失败返回1int Z_MQTT_Ping(void){    Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_PINGREQ;    Z_Mqtt_Send_Buffer[1] = 0x00;    write(mqtt_client,Z_Mqtt_Send_Buffer,2);    ssize_t receive_len = Z_MQTT_TryReceiveData();    if(receive_len <= 0){        printf("no receive mqssage\n");        return -1;    }        Printf_Receive(receive_len);    if(receive_len != 2){        printf("unknown error\n");        return 1;    }    if(Z_Mqtt_Receive_Buffer[0] != Z_MQTT_FLAG_PINGRESP){        printf("message type error\n");     // 第一字节不为0xD0,服务器发的报文类型错误        return 1;    }     return 0;}// 发布主题信息
  68. // topic 主题名    data 发布内容    retain 保留标志位
  69. // 成功返回0, 失败返回-1, 参数错误返回1;
  70. int Z_MQTT_Publish(const char* topic, const char* data, uint8_t retain){
  71.     size_t index = 1;
  72.     size_t topic_len = strlen(topic);
  73.     size_t data_len = strlen(data);
  74.     printf("this is publish topic is %s,data is %s\n",topic,data);
  75.     if(topic_len == 0) return 1;
  76.     Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_PUBLISH;
  77.     if(retain) Z_Mqtt_Send_Buffer[0] |= 0x01;
  78.     // 可变报头和有效载荷的长度(剩余长度)
  79.     uint32_t send_size = topic_len + data_len + 2;   
  80.     do{ // 将剩余长度填进CONNECT报文的固定报头中
  81.         uint8_t encodedByte = send_size % 128;
  82.         send_size /= 128;
  83.         if ( send_size > 0 )    encodedByte = encodedByte | 0x80;
  84.         Z_Mqtt_Send_Buffer[index++] = encodedByte;
  85.     }while ( send_size > 0 );
  86.     Load_Utf8_String(&index,topic);     // 装载topic
  87.     memcpy((void*)&Z_Mqtt_Send_Buffer[index],(void*)data,data_len); // 装载data
  88.     write(mqtt_client, Z_Mqtt_Send_Buffer, index + data_len);   
  89.     return 0;
  90. }// 订阅主题
  91. // 成功返回0,失败-1,参数错误返回1
  92. int Z_MQTT_Subscribe(const char* topic){
  93.     size_t index = 1;
  94.     size_t topic_len = strlen(topic);
  95.     if(topic_len == 0){
  96.         printf("parameter error\n");
  97.         return 1;
  98.     }
  99.     Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_SUBSCRIBE | 0x02;
  100.     // 可变报头和有效载荷的长度(剩余长度)
  101.     uint32_t send_size = topic_len + 5;   
  102.     do{ // 将剩余长度填进CONNECT报文的固定报头中
  103.         uint8_t encodedByte = send_size % 128;
  104.         send_size /= 128;
  105.         if ( send_size > 0 )    encodedByte = encodedByte | 0x80;
  106.         Z_Mqtt_Send_Buffer[index++] = encodedByte;
  107.     }while ( send_size > 0 );
  108.     Z_Mqtt_Send_Buffer[index++] = 0x00;     // 报文标识符,咱随便给个非0数就行
  109.     Z_Mqtt_Send_Buffer[index++] = 0x01;     // 后面需要和返回的报文作对比
  110.     Load_Utf8_String(&index,topic);
  111.     Z_Mqtt_Send_Buffer[index++] = 0x00;     // Qos0 等级
  112.     write(mqtt_client,Z_Mqtt_Send_Buffer,index);
  113.     ssize_t receive_len = Z_MQTT_TryReceiveData();
  114.     if(receive_len <= 0){
  115.         printf("no receive mqssage\n");
  116.         return -1;
  117.     }
  118.     Printf_Receive(receive_len);
  119.     if(receive_len != 5){
  120.         printf("unknow error\n");
  121.         return -1;
  122.     }
  123.     if(Z_Mqtt_Receive_Buffer[0] != Z_MQTT_FLAG_SUBACK){
  124.         printf("message type error\n");
  125.         return -1;
  126.     }
  127.     // 报文标识符对不上
  128.     if(Z_Mqtt_Receive_Buffer[2] != 0x00 || Z_Mqtt_Receive_Buffer[3] != 0x01){
  129.         printf("message ID error\n");
  130.         return -1;
  131.     }
  132.     if(Z_Mqtt_Receive_Buffer[4] != 0x00){
  133.         printf("Qos error\n");
  134.         return -1;
  135.     }
  136.     return 0;
  137. }// 取消订阅topic// 乐成返回0 失败返回-1 参数错误返回1int Z_MQTT_Unsubscribe(const char* topic){    size_t index = 1;    size_t topic_len = strlen(topic);    if(topic_len == 0){        printf("parameter error\n");        return 1;    }    Z_Mqtt_Send_Buffer[0] = Z_MQTT_FLAG_UNSUBSCRIBE | 0x02;    // 可变报头和有效载荷的长度(剩余长度)    uint32_t send_size = topic_len + 4;        do{ // 将剩余长度填进CONNECT报文的固定报头中        uint8_t encodedByte = send_size % 128;        send_size /= 128;        if ( send_size > 0 )    encodedByte = encodedByte | 0x80;        Z_Mqtt_Send_Buffer[index++] = encodedByte;    }while ( send_size > 0 );    Z_Mqtt_Send_Buffer[index++] = 0x00;     // 报文标识符,咱恣意给个非0数就行    Z_Mqtt_Send_Buffer[index++] = 0x01;    Load_Utf8_String(&index,topic);    write(mqtt_client,Z_Mqtt_Send_Buffer,index);    ssize_t receive_len = Z_MQTT_TryReceiveData();    if(receive_len <= 0){        printf("no receive mqssage\n");        return -1;    }    Printf_Receive(receive_len);    if(receive_len != 4){        printf("unknow error\n");        return -1;    }    if(Z_Mqtt_Receive_Buffer[2] != 0x00 || Z_Mqtt_Receive_Buffer[3] != 0x01){        printf("meaasge ID error\n");        return -1;    }    return 0;}// 检测是否收到推送数据// 乐成收取数据返回0,无数据返回-1,参数错误返回1,接收缓冲区错误返回2//size_t tl:主题缓冲区的长度//size_t dl:数据缓冲区的长度int Z_MQTT_ReceivePush(char* topic, char* data, size_t tl, size_t dl){//在实验接收数据之前,获取互斥锁以确保线程安全。这是为了防止多个线程同时访问共享资源(如网络套接字)    pthread_mutex_lock(&Mqtt_Receive_Mutex);/*ssize_t recv(int sockfd, void *buf, size_t len, int flags);int sockfd:套接字文件描述符,表现要从中读取数据的socket。void *buf:指向一个缓冲区的指针,用于存储接收到的数据。size_t len:缓冲区的长度,即最多可以接收的数据字节数。int flags:一些控制标志,如 MSG_DONTWAIT 表现非阻塞操作。*///ssize_t:返回值表现接收到的字节数//mqtt_client 是一个全局变量,它存储了MQTT客户端的socket文件描述符//Z_Mqtt_Receive_Buffer 是一个全局缓冲区,用于存储接收到的数据  //MSG_DONTWAIT 使得 recv 调用非阻塞,这样如果当前没有数据可读,recv 会立刻返回    ssize_t receive_len = recv(mqtt_client, Z_Mqtt_Receive_Buffer, Z_MQTT_RECEIVE_SIZE, MSG_DONTWAIT);//pthread_mutex_unlock(&Mqtt_Receive_Mutex);:在数据接收完毕后释放互斥锁    pthread_mutex_unlock(&Mqtt_Receive_Mutex);        if(receive_len <= 0)    return -1;        Printf_Receive(receive_len);//数组的第一个元素(索引为0)包含了消息的固定头部的第一个字节,这个字节包含了消息的类型和其他一些标志位 #define Z_MQTT_FLAG_PUBLISH      0x30    if(Z_Mqtt_Receive_Buffer[0] != Z_MQTT_FLAG_PUBLISH){        printf("message type error\n");        return 2;    }    size_t index = 1;    int multiplier = 1;    uint8_t encodedByte = 0;    receive_len = 0;    do{                //从接收缓冲区中读取下一个字节        encodedByte = Z_Mqtt_Receive_Buffer[index++];                //encodedByte & 127 将 encodedByte 的最高位清零,提取出7位的数值        receive_len += (encodedByte & 127) * multiplier;                //multiplier *= 128;:这行代码将乘数翻倍,为下一个7位段的解码做准备        multiplier *= 128;                //MQTT协议规定,剩余长度不能高出4个字节,这意味着 multiplier 的最大值应该是 128^4=268435456            if (multiplier > 128*128*128){                        printf("unknow error\n");                        return 2;                 }    }while ((encodedByte & 128) != 0);///保了所有剩余长度的字节都被读取和处理惩罚                //MQTT协议中,主题长度是两个字节,第一个字节乘以128,然后加上第二个字节,得到主题的总长度    ssize_t topic_len = Z_Mqtt_Receive_Buffer[index++] * 128 + Z_Mqtt_Receive_Buffer[index++];    if(topic_len > tl){        printf("topic buffer so sort\n");        return 1;    }         //查抄剖析出的主题长度是否高出了提供的主题缓冲区巨细         //析出的数据长度(剩余长度减去主题长度再减去固定头部的2个字节)是否高出了提供的数据缓冲区巨细    if(receive_len - 2 - topic_len > dl){        printf("data buffer so sort\n");        return 1;    }    memcpy(topic,&Z_Mqtt_Receive_Buffer[index],topic_len);          //在主题的末尾添加空字符,确保它是一个精确终止的字符串    topic[topic_len] = '\0';    index += topic_len;    memcpy(data,&Z_Mqtt_Receive_Buffer[index],receive_len - 2 - topic_len);    data[receive_len - 2 - topic_len] = '\0';    return 0;}
复制代码
 文章参考大佬小黑侠kary MQTT协议图解,一文看懂MQTT协议数据包(真实报文数据剖析解释)-CSDN博客

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

商道如狼道

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表