MQTT客户端在HarmonyOS应用中的搭建

莱莱  金牌会员 | 2024-10-31 02:27:11 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 837|帖子 837|积分 2511

本文先容了MQTT服务端部署,测试到MQTT客户端搭建的全过程,全文篇幅有点长,盼望各位大佬能耐烦查阅,多多指教。
什么是MQTT?

在上个项目的开辟中,碰到一个很特殊的需求,在鸿蒙中搭建MQTT的客户端,刚接到的需求的时间也是一脸懵,MQTT是啥东西,都没打仗过,然后经过一系列的查询,终于搞清楚了什么是MQTT:
   MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT最大优点在于,用极少的代码和有限的带宽,为毗连长途设备提供实时可靠的消息服务
  于是下个困难出现了,这个协议在鸿蒙中该如何搭建,于是我去 鸿蒙开辟者论坛 一搜索,发现鸿蒙中好像根本不支持这协议:

于是我寻思在Android和IOS中竟然能实现,那在鸿蒙中应该也是能实现,经过了大量的资料查询,在鸿蒙开源三方库资源汇总 中终于找到了基于鸿蒙的MQTT客户端第三方库 : @ohos/mqtt
   鸿蒙开源三方库资源汇总:https://gitee.com/openharmony-tpc/tpc_resource
@ohos/mqtt:https://gitee.com/openharmony-tpc/ohos_mqtt
  搭建MQTT服务端并测试

由于 @ohos/mqtt 只能应用于鸿蒙客户端的搭建,对于初次使用且没有MQTT服务端情况的同学不是特殊友好,以是本文会在开始前先容如何部署一个简易MQTT服务器用来生产消息,且用可视化客户端工具毗连服务器,测试服务器是否部署成功 ,若是有服务端情况的同学则可以跳过本节。
一、开通云服务(免费)

笔者这边推荐一个免费的MQTT云服务,EMQX Cloud,这个网站新人注册会有免费开通云服务的资格,直接注册即可
   EMQX Cloud:https://www.emqx.com/zh/cloud
  步骤如下:
1.点击免费试用

2.点击购买专有版,输入部署名称,其他选项默认,初次注册购买会有14天免费使用期限,到期删除部署即可

3.检察毗连信息
云服务开通后会进入如何页面,点击 概览,下图红圈中的毗连信息即使我们客户端必要的信息,如图所示可以使用各种毗连协议,我们本次使用的是 MQTT over TCP 端口为 1883

4.添加客户端认证
如下图所示,点击添加,输入所必要登录的客户端账号密码,这个后面会用到,写个自己认识的就好

至此,MQTT云服务已部署完毕,很简单吧,哈哈哈哈
接下来我们来测试云服务是否能正常发送消息
二、在MQTTX中毗连服务器

首先我们必要下载一个可视化客户端工具:MQTTX
   MQTTX :https://mqttx.app/zh
  下载完成之后打开工具,先设置成中文表现,如下图所示

然后点击配置进入配置页面,配置毗连地点,用户名,密码等

名称随意设置一个,ClientID随机即可,协议选择mqtt://,ip在刚刚云服务中的毗连信息可以检察,端口为:1883,用户名和密码即我们刚刚设置的,至此毗连信息配置完毕,点击右上角毗连

毗连成功之后,我们点击添加订阅,设置订阅主题,主题名为随意设置,记着即可,我这边设置的是:topic1,qos设置为1,至少一次

回到主页面,点击左侧主题,变色即表示订阅成功,然后点击右下角发送消息,在发送视图中如果我们能看到吸收到的服务端的消息,则表示测试成功,我们部署的云服务可以成功毗连。

在鸿蒙应用中搭建MQTT客户端

好了,铺垫了这么多,终于到了写代码的阶段了
一、安装@ohos/mqtt

1.安装
  1. ohpm install @ohos/mqtt
复制代码
2.开启网络权限
在 module.json5 中开启网络服务权限
  1. "requestPermissions": [
  2.    {
  3.       "name": "ohos.permission.INTERNET",
  4.     }
  5. ]
复制代码
二、封装MQTTClient方法

根据 @ohos/mqtt文档,搭建MQTT客户端的第一步是创建一个MQTT实例
1.创建实例
  1. import {
  2.   MqttAsync,
  3.   MqttClient,
  4.   MqttMessage,
  5.   MqttQos,
  6.   MqttResponse,
  7.   MqttSubscribeOptions,
  8. } from '@ohos/mqtt';
  9. import emitter from '@ohos.events.emitter';
  10. public createMqttClient() {
  11.    this.mqttClient = MqttAsync.createMqtt({
  12.      url: this.url,
  13.      clientId: this.clientId,
  14.      persistenceType: 1 // 客户端使用的持久性类型,0=默认值:使用默认(基于文件系统)持久性机制。1=在内存持久性中使用。2=使用特定于应用程序的持久性实现
  15.    })
  16. }
复制代码
2.毗连服务器
  1.   public async connectMqtt() {
  2.     if (!this.mqttClient) {
  3.       return
  4.     }
  5.     await this.mqttClient.connect({
  6.       userName: this.userName,
  7.       password: this.password,
  8.       connectTimeout: 30, // 设置连接超时时间,默认
  9.       automaticReconnect: true, // 是否在连接丢失的情况下自动重新连接
  10.       MQTTVersion: 0, // 设置要在连接上使用的MQTT版本, 0=默认值:从3.1.1开始,如果失败,则返回到3.1,3=仅尝试版本3.1,4=仅尝试3.1.1版本
  11.       // sslOptions:{
  12.       //   enableServerCertAuth: true, // 是否开启服务端证书认证,默认为false
  13.       //    trustStore: fileDir + "/ca.crt" // CA证书的沙箱路径
  14.       // }
  15.     }).then((res: MqttResponse) => {
  16.       console.info('MQTT服务器连接连接成功:' + JSON.stringify(res.message))
  17.     }).catch((err: Error) => {
  18.       console.error('MQTT服务器连接连接失败:' + JSON.stringify(err))
  19.     })
  20.   }
复制代码
3.订阅及监听主题
  1. public async subscribe() {
  2.     if (!this.mqttClient) {
  3.       return
  4.     }
  5.     let subscribeOption: MqttSubscribeOptions = {
  6.       topic: this.topic,
  7.       qos: this.qos as MqttQos
  8.     }
  9.     await this.mqttClient.subscribe(subscribeOption).then((res: MqttResponse) => {
  10.       console.info('MQTT订阅主题成功:', JSON.stringify(res.message))
  11.     }).catch((err: Error) => {
  12.       console.error('MQTT订阅主题失败:', JSON.stringify(err))
  13.     })
  14.     // 监听订阅主题
  15.     this.messageArrived()
  16.   }
  17.   // 监听订阅主题
  18.   public messageArrived() {
  19.     if (!this.mqttClient) {
  20.       return
  21.     }
  22.     // 监听主题发送消息
  23.     this.mqttClient.messageArrived((err: Error, data: MqttMessage) => {
  24.       if (err) {
  25.         console.error('MQTT接收消息失败:', JSON.stringify(err))
  26.       } else {
  27.         console.info('MQTT接收消息成功:', JSON.stringify(data))
  28.         // 发送消息至线程
  29.         emitter.emit({
  30.           eventId: EVENTID,
  31.           priority: emitter.EventPriority.LOW // 表示事件优于IDLE优先级投递,事件的默认优先级是LOW。
  32.         }, {
  33.           data: {
  34.             content: data.payload,
  35.           }
  36.         })
  37.       }
  38.     })
  39.   }
复制代码
4.发送消息
  1. public async pushMessage(msg: string, pic: string = this.topic, qo: MqttQos = this.qos as MqttQos) {
  2.     if (!this.mqttClient) {
  3.       return
  4.     }
  5.     await this.mqttClient.publish({
  6.       topic: pic,
  7.       qos: qo,
  8.       payload: msg
  9.     }).then((data: MqttResponse) => {
  10.       console.info("MQTT消息发送成功:", JSON.stringify(data))
  11.     }).catch((err: Error) => {
  12.       console.error("MQTT消息发送失败:", JSON.stringify(err))
  13.     })
  14.   }
复制代码
理解了这四步的代码即可搭建一个简单的客户端了,但现实项目中,仅仅如许写是不敷的,我们必要把以上代码封装成一个类,便于页面的调用及复用,借助与ts的特性,我们还可以把这个类优化一下,封装成一个单例,如许的利益在于,应用有且仅有一个实例,在应用入口一次声明,在应用出口一次销毁即可,防止开辟者多次声明的情况发生,好了,让我们结合上面的代码封装一个 MQTTClient 类
  1. import {  MqttAsync,  MqttClient,  MqttMessage,  MqttQos,  MqttResponse,  MqttSubscribeOptions,} from '@ohos/mqtt';import emitter from '@ohos.events.emitter';// 事件idconst EVENTID = 1001interface MQTTOptionsType {  url?: string  clientId?: string  userName?: string  password?: string  topic?: string  qos?: MqttQos | undefined}class MQTTClient {  private static instance: MQTTClient  private mqttClient: MqttClient | null = null  private url: string =    '' // 以null结尾的字符串,指定客户端将毗连到的服务器。它接纳的形式protocol://host:port.protocol必须是tcp、ssl、ws或wss。对于主机,可以指定IP地点或主机名。例如,tcp://localhost:1883  private clientId: string = '' // 客户端毗连到服务器时传递给服务器的客户端标识符。它是一个以空结尾的UTF-8编码字符串  private userName: string = '' // 客户端用户名  private password: string = '' // 客户端密码  private topic: string = '' // 主题名称  private qos: MqttQos | undefined = undefined // 消息的服务质量设置 0 最多一次 1 至少一次 2 近一次  constructor(mqttOptions: MQTTOptionsType) {    mqttOptions.url && (this.url = mqttOptions.url)    mqttOptions.clientId && (this.clientId = mqttOptions.clientId)    mqttOptions.userName && (this.userName = mqttOptions.userName)    mqttOptions.password && (this.password = mqttOptions.password)    mqttOptions.topic && (this.topic = mqttOptions.topic)    mqttOptions.qos && (this.qos = mqttOptions.qos)    this.init()  }  // 静态方法获取唯一实例  public static getInstance(mqttOptions: MQTTOptionsType): MQTTClient {    if (!MQTTClient.instance) {      MQTTClient.instance = new MQTTClient(mqttOptions)    }    return MQTTClient.instance  }  // 初始化方法  async init() {    this.createMqttClient() // 创建mqtt实例    await this.connectMqtt() // 毗连服务器    await this.subscribe() // 订阅主题    this.messageArrived() // 监听订阅主题  }  // 创建mqtt实例  public createMqttClient() {    this.mqttClient = MqttAsync.createMqtt({      url: this.url,      clientId: this.clientId,      persistenceType: 1 // 客户端使用的持久性类型,0=默认值:使用默认(基于文件体系)持久性机制。1=在内存持久性中使用。2=使用特定于应用步伐的持久性实现    })  }  // 毗连服务器  public async connectMqtt() {
  2.     if (!this.mqttClient) {
  3.       return
  4.     }
  5.     await this.mqttClient.connect({
  6.       userName: this.userName,
  7.       password: this.password,
  8.       connectTimeout: 30, // 设置连接超时时间,默认
  9.       automaticReconnect: true, // 是否在连接丢失的情况下自动重新连接
  10.       MQTTVersion: 0, // 设置要在连接上使用的MQTT版本, 0=默认值:从3.1.1开始,如果失败,则返回到3.1,3=仅尝试版本3.1,4=仅尝试3.1.1版本
  11.       // sslOptions:{
  12.       //   enableServerCertAuth: true, // 是否开启服务端证书认证,默认为false
  13.       //    trustStore: fileDir + "/ca.crt" // CA证书的沙箱路径
  14.       // }
  15.     }).then((res: MqttResponse) => {
  16.       console.info('MQTT服务器连接连接成功:' + JSON.stringify(res.message))
  17.     }).catch((err: Error) => {
  18.       console.error('MQTT服务器连接连接失败:' + JSON.stringify(err))
  19.     })
  20.   }
  21.   // 订阅主题  public async subscribe() {
  22.     if (!this.mqttClient) {
  23.       return
  24.     }
  25.     let subscribeOption: MqttSubscribeOptions = {
  26.       topic: this.topic,
  27.       qos: this.qos as MqttQos
  28.     }
  29.     await this.mqttClient.subscribe(subscribeOption).then((res: MqttResponse) => {
  30.       console.info('MQTT订阅主题成功:', JSON.stringify(res.message))
  31.     }).catch((err: Error) => {
  32.       console.error('MQTT订阅主题失败:', JSON.stringify(err))
  33.     })
  34.     // 监听订阅主题
  35.     this.messageArrived()
  36.   }
  37.   // 监听订阅主题
  38.   public messageArrived() {
  39.     if (!this.mqttClient) {
  40.       return
  41.     }
  42.     // 监听主题发送消息
  43.     this.mqttClient.messageArrived((err: Error, data: MqttMessage) => {
  44.       if (err) {
  45.         console.error('MQTT接收消息失败:', JSON.stringify(err))
  46.       } else {
  47.         console.info('MQTT接收消息成功:', JSON.stringify(data))
  48.         // 发送消息至线程
  49.         emitter.emit({
  50.           eventId: EVENTID,
  51.           priority: emitter.EventPriority.LOW // 表示事件优于IDLE优先级投递,事件的默认优先级是LOW。
  52.         }, {
  53.           data: {
  54.             content: data.payload,
  55.           }
  56.         })
  57.       }
  58.     })
  59.   }
  60.   /**   * 发送消息   * @param pic 订阅的主题   * @param msg 消息内容   * @param qo qos   */  public async pushMessage(msg: string, pic: string = this.topic, qo: MqttQos = this.qos as MqttQos) {
  61.     if (!this.mqttClient) {
  62.       return
  63.     }
  64.     await this.mqttClient.publish({
  65.       topic: pic,
  66.       qos: qo,
  67.       payload: msg
  68.     }).then((data: MqttResponse) => {
  69.       console.info("MQTT消息发送成功:", JSON.stringify(data))
  70.     }).catch((err: Error) => {
  71.       console.error("MQTT消息发送失败:", JSON.stringify(err))
  72.     })
  73.   }
  74.   // 销毁客户端实例  public async destroy() {    if (!this.mqttClient) {      return    }    await this.mqttClient.destroy().then((data: boolean) => {      if (data) {        console.info('MQTT实例销毁成功')        emitter.off(EVENTID)      } else {        console.error('MQTT实例销毁失败')      }    })  }}export {  MQTTClient, EVENTID, MQTTOptionsType}
复制代码
在上面的代码中,我们还用到了一个ArkTS中的内置模块 @ohos.events.emitter 线程通信,它可以在应用的任何地方分发事件,在页面中监听事件并吸收数据,稍后的页面中我们会用到
三、使用MQTTClient方法

首先我们必要构建一个页面视图,视图中可以表现发送的消息,吸收到的消息等,因封装的方法中声明实例即自动毗连服务器,以是不必要毗连按钮
  1. @Entry
  2. @Component
  3. struct Index {
  4.   @State receiveMsg: string = ''
  5.   @State sendMsg: string = ''
  6.   build() {
  7.     Column({ space: 24 }) {
  8.       Text('MQTT客户端')
  9.         .fontSize(32)
  10.         .fontWeight(500)
  11.         .margin({ bottom: 20 })
  12.       Row() {
  13.         Text('接收到的消息:')
  14.         TextInput({ text: $$this.receiveMsg, placeholder: '接收到的消息' })
  15.           .layoutWeight(1)
  16.       }
  17.       Row({ space: 16 }) {
  18.         Button('发送消息')
  19.         TextInput({ text: $$this.sendMsg, placeholder: '请填写需要发送的消息' })
  20.           .layoutWeight(1)
  21.       }
  22.       .width('100%')
  23.       Button('销毁客户端')
  24.         .width('100%')
  25.     }
  26.     .padding(20)
  27.     .width('100%')
  28.   }
  29. }
复制代码

然后我们引入并声明刚刚封装的 MQTTClient  类,并配置毗连信息,与MQTTX工具中配置的一样即可,必要注意的是url,必须是 mqtt协议+ip + 端口的格式,如:mqtt://0.0.0.0:1883
  1. import { MQTTClient, MQTTOptionsType, EVENTID } from '../utils/MQTTClientInstance'
  2. import { emitter } from '@kit.BasicServicesKit'
  3. const MQTTOption: MQTTOptionsType = {
  4.   url: 'mqtt:/${ip}:1883',  // 注意:填写自己部署的连接地址
  5.   clientId: 'mqtt123456@', // 客户端连接到服务器时传递给服务器的客户端标识符。它是一个以空结尾的UTF-8编码字符串
  6.   userName: 'root', // 客户端用户名
  7.   password: 'admin', // 客户端密码
  8.   topic: 'topic1', // 主题名称
  9.   qos: 1 // 消息的服务质量设置 0 最多一次 1 至少一次 2 近一次
  10. }
  11. const MQTTInstance: MQTTClient = MQTTClient.getInstance(MQTTOption)
复制代码
引入之后即可给视图绑定事件,并在声明周期中监听服务端发送的消息,完整代码如下:
  1. import { MQTTClient, MQTTOptionsType, EVENTID } from '../utils/MQTTClientInstance'
  2. import { emitter } from '@kit.BasicServicesKit'
  3. const MQTTOption: MQTTOptionsType = {
  4.   url: 'mqtt://k178b5e8.ala.dedicated.aliyun.emqxcloud.cn:1883',
  5.   clientId: 'mqtt123456@',
  6.   userName: 'root',
  7.   password: 'admin',
  8.   topic: 'topic1',
  9.   qos: 1
  10. }
  11. const MQTTInstance: MQTTClient = MQTTClient.getInstance(MQTTOption)
  12. @Entry
  13. @Component
  14. struct Index {
  15.   @State receiveMsg: string = ''
  16.   @State sendMsg: string = ''
  17.   onPageShow(): void {
  18.     emitter.on({
  19.       eventId: EVENTID
  20.     }, (data) => {
  21.       this.receiveMsg = JSON.stringify(data)
  22.     });
  23.   }
  24.   build() {
  25.     Column({ space: 24 }) {
  26.       Text('MQTT客户端')
  27.         .fontSize(32)
  28.         .fontWeight(500)
  29.         .margin({ bottom: 20 })
  30.       Row() {
  31.         Text('接收到的消息:')
  32.         TextInput({ text: $$this.receiveMsg, placeholder: '接收到的消息' })
  33.           .layoutWeight(1)
  34.       }
  35.       Row({ space: 16 }) {
  36.         Button('发送消息')
  37.           .onClick(() => {
  38.             if (this.sendMsg) {
  39.               MQTTInstance.pushMessage(this.sendMsg)
  40.               this.sendMsg = ''
  41.             }
  42.           })
  43.         TextInput({ text: $$this.sendMsg, placeholder: '请填写需要发送的消息' })
  44.           .layoutWeight(1)
  45.       }
  46.       .width('100%')
  47.       Button('销毁客户端')
  48.         .width('100%')
  49.         .onClick(() => {
  50.           MQTTInstance.destroy()
  51.         })
  52.     }
  53.     .padding(20)
  54.     .width('100%')
  55.   }
  56. }
复制代码
在模仿器中运行视图代码,然后打开日记就可以看到自己的打印信息,如毗连成功,订阅主题成功等

在输入框中填写必要发送给服务端的信息,点击发送,即可在吸收信息框中看到服务端发送到客户端的信息

点击销毁客户端可销毁当前实例,这个步骤可以推出应用时触发,至此,MQTT客户端搭建完毕。

总结

第一次写文章,全文总计一万多字,才知道原创码字原来这么累,盼望能得到各位大佬的一个点赞。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

莱莱

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表