本文先容了MQTT服务端部署,测试到MQTT客户端搭建的全过程,全文篇幅有点长,盼望各位大佬能耐烦查阅,多多指教。
什么是MQTT?
在上个项目的开辟中,碰到一个很特殊的需求,在鸿蒙中搭建MQTT的客户端,刚接到的需求的时间也是一脸懵,MQTT是啥东西,都没打仗过,然后经过一系列的查询,终于搞清楚了什么是MQTT:
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT最大优点在于,用极少的代码和有限的带宽,为毗连长途设备提供实时可靠的消息服务
于是下个困难出现了,这个协议在鸿蒙中该如何搭建,于是我去 鸿蒙开辟者论坛 一搜索,发现鸿蒙中好像根本不支持这协议:
于是我寻思在Android和IOS中竟然能实现,那在鸿蒙中应该也是能实现,经过了大量的资料查询,在鸿蒙开源三方库资源汇总 中终于找到了基于鸿蒙的MQTT客户端第三方库 : @ohos/mqtt
鸿蒙开源三方库资源汇总:https://gitee.com/openharmony-tpc/tpc_resource
@ohos/mqtt:https://gitee.com/openharmony-tpc/ohos_mqtt
搭建MQTT服务端并测试
由于 @ohos/mqtt 只能应用于鸿蒙客户端的搭建,对于初次使用且没有MQTT服务端情况的同学不是特殊友好,以是本文会在开始前先容如何部署一个简易MQTT服务器用来生产消息,且用可视化客户端工具毗连服务器,测试服务器是否部署成功 ,若是有服务端情况的同学则可以跳过本节。
一、开通云服务(免费)
笔者这边推荐一个免费的MQTT云服务,EMQX Cloud,这个网站新人注册会有免费开通云服务的资格,直接注册即可
EMQX Cloud:https://www.emqx.com/zh/cloud
步骤如下:
1.点击免费试用
2.点击购买专有版,输入部署名称,其他选项默认,初次注册购买会有14天免费使用期限,到期删除部署即可
3.检察毗连信息
云服务开通后会进入如何页面,点击 概览,下图红圈中的毗连信息即使我们客户端必要的信息,如图所示可以使用各种毗连协议,我们本次使用的是 MQTT over TCP 端口为 1883
4.添加客户端认证
如下图所示,点击添加,输入所必要登录的客户端账号密码,这个后面会用到,写个自己认识的就好
至此,MQTT云服务已部署完毕,很简单吧,哈哈哈哈
接下来我们来测试云服务是否能正常发送消息
二、在MQTTX中毗连服务器
首先我们必要下载一个可视化客户端工具:MQTTX
MQTTX :https://mqttx.app/zh
下载完成之后打开工具,先设置成中文表现,如下图所示
然后点击配置进入配置页面,配置毗连地点,用户名,密码等
名称随意设置一个,ClientID随机即可,协议选择mqtt://,ip在刚刚云服务中的毗连信息可以检察,端口为:1883,用户名和密码即我们刚刚设置的,至此毗连信息配置完毕,点击右上角毗连
毗连成功之后,我们点击添加订阅,设置订阅主题,主题名为随意设置,记着即可,我这边设置的是:topic1,qos设置为1,至少一次
回到主页面,点击左侧主题,变色即表示订阅成功,然后点击右下角发送消息,在发送视图中如果我们能看到吸收到的服务端的消息,则表示测试成功,我们部署的云服务可以成功毗连。
在鸿蒙应用中搭建MQTT客户端
好了,铺垫了这么多,终于到了写代码的阶段了
一、安装@ohos/mqtt
1.安装
2.开启网络权限
在 module.json5 中开启网络服务权限
- "requestPermissions": [
- {
- "name": "ohos.permission.INTERNET",
- }
- ]
复制代码 二、封装MQTTClient方法
根据 @ohos/mqtt文档,搭建MQTT客户端的第一步是创建一个MQTT实例
1.创建实例
- import {
- MqttAsync,
- MqttClient,
- MqttMessage,
- MqttQos,
- MqttResponse,
- MqttSubscribeOptions,
- } from '@ohos/mqtt';
- import emitter from '@ohos.events.emitter';
- public createMqttClient() {
- this.mqttClient = MqttAsync.createMqtt({
- url: this.url,
- clientId: this.clientId,
- persistenceType: 1 // 客户端使用的持久性类型,0=默认值:使用默认(基于文件系统)持久性机制。1=在内存持久性中使用。2=使用特定于应用程序的持久性实现
- })
- }
复制代码 2.毗连服务器
- public async connectMqtt() {
- if (!this.mqttClient) {
- return
- }
- await this.mqttClient.connect({
- userName: this.userName,
- password: this.password,
- connectTimeout: 30, // 设置连接超时时间,默认
- automaticReconnect: true, // 是否在连接丢失的情况下自动重新连接
- MQTTVersion: 0, // 设置要在连接上使用的MQTT版本, 0=默认值:从3.1.1开始,如果失败,则返回到3.1,3=仅尝试版本3.1,4=仅尝试3.1.1版本
- // sslOptions:{
- // enableServerCertAuth: true, // 是否开启服务端证书认证,默认为false
- // trustStore: fileDir + "/ca.crt" // CA证书的沙箱路径
- // }
- }).then((res: MqttResponse) => {
- console.info('MQTT服务器连接连接成功:' + JSON.stringify(res.message))
- }).catch((err: Error) => {
- console.error('MQTT服务器连接连接失败:' + JSON.stringify(err))
- })
- }
复制代码 3.订阅及监听主题
- public async subscribe() {
- if (!this.mqttClient) {
- return
- }
- let subscribeOption: MqttSubscribeOptions = {
- topic: this.topic,
- qos: this.qos as MqttQos
- }
- await this.mqttClient.subscribe(subscribeOption).then((res: MqttResponse) => {
- console.info('MQTT订阅主题成功:', JSON.stringify(res.message))
- }).catch((err: Error) => {
- console.error('MQTT订阅主题失败:', JSON.stringify(err))
- })
- // 监听订阅主题
- this.messageArrived()
- }
- // 监听订阅主题
- public messageArrived() {
- if (!this.mqttClient) {
- return
- }
- // 监听主题发送消息
- this.mqttClient.messageArrived((err: Error, data: MqttMessage) => {
- if (err) {
- console.error('MQTT接收消息失败:', JSON.stringify(err))
- } else {
- console.info('MQTT接收消息成功:', JSON.stringify(data))
- // 发送消息至线程
- emitter.emit({
- eventId: EVENTID,
- priority: emitter.EventPriority.LOW // 表示事件优于IDLE优先级投递,事件的默认优先级是LOW。
- }, {
- data: {
- content: data.payload,
- }
- })
- }
- })
- }
复制代码 4.发送消息
- public async pushMessage(msg: string, pic: string = this.topic, qo: MqttQos = this.qos as MqttQos) {
- if (!this.mqttClient) {
- return
- }
- await this.mqttClient.publish({
- topic: pic,
- qos: qo,
- payload: msg
- }).then((data: MqttResponse) => {
- console.info("MQTT消息发送成功:", JSON.stringify(data))
- }).catch((err: Error) => {
- console.error("MQTT消息发送失败:", JSON.stringify(err))
- })
- }
复制代码 理解了这四步的代码即可搭建一个简单的客户端了,但现实项目中,仅仅如许写是不敷的,我们必要把以上代码封装成一个类,便于页面的调用及复用,借助与ts的特性,我们还可以把这个类优化一下,封装成一个单例,如许的利益在于,应用有且仅有一个实例,在应用入口一次声明,在应用出口一次销毁即可,防止开辟者多次声明的情况发生,好了,让我们结合上面的代码封装一个 MQTTClient 类
- import { MqttAsync, MqttClient, MqttMessage, MqttQos, MqttResponse, MqttSubscribeOptions,} from '@ohos/mqtt';import emitter from '@ohos.events.emitter';// 事件idconst EVENTID = 1001interface MQTTOptionsType { url?: string clientId?: string userName?: string password?: string topic?: string qos?: MqttQos | undefined}class MQTTClient { private static instance: MQTTClient private mqttClient: MqttClient | null = null private url: string = '' // 以null结尾的字符串,指定客户端将毗连到的服务器。它接纳的形式protocol://host:port.protocol必须是tcp、ssl、ws或wss。对于主机,可以指定IP地点或主机名。例如,tcp://localhost:1883 private clientId: string = '' // 客户端毗连到服务器时传递给服务器的客户端标识符。它是一个以空结尾的UTF-8编码字符串 private userName: string = '' // 客户端用户名 private password: string = '' // 客户端密码 private topic: string = '' // 主题名称 private qos: MqttQos | undefined = undefined // 消息的服务质量设置 0 最多一次 1 至少一次 2 近一次 constructor(mqttOptions: MQTTOptionsType) { mqttOptions.url && (this.url = mqttOptions.url) mqttOptions.clientId && (this.clientId = mqttOptions.clientId) mqttOptions.userName && (this.userName = mqttOptions.userName) mqttOptions.password && (this.password = mqttOptions.password) mqttOptions.topic && (this.topic = mqttOptions.topic) mqttOptions.qos && (this.qos = mqttOptions.qos) this.init() } // 静态方法获取唯一实例 public static getInstance(mqttOptions: MQTTOptionsType): MQTTClient { if (!MQTTClient.instance) { MQTTClient.instance = new MQTTClient(mqttOptions) } return MQTTClient.instance } // 初始化方法 async init() { this.createMqttClient() // 创建mqtt实例 await this.connectMqtt() // 毗连服务器 await this.subscribe() // 订阅主题 this.messageArrived() // 监听订阅主题 } // 创建mqtt实例 public createMqttClient() { this.mqttClient = MqttAsync.createMqtt({ url: this.url, clientId: this.clientId, persistenceType: 1 // 客户端使用的持久性类型,0=默认值:使用默认(基于文件体系)持久性机制。1=在内存持久性中使用。2=使用特定于应用步伐的持久性实现 }) } // 毗连服务器 public async connectMqtt() {
- if (!this.mqttClient) {
- return
- }
- await this.mqttClient.connect({
- userName: this.userName,
- password: this.password,
- connectTimeout: 30, // 设置连接超时时间,默认
- automaticReconnect: true, // 是否在连接丢失的情况下自动重新连接
- MQTTVersion: 0, // 设置要在连接上使用的MQTT版本, 0=默认值:从3.1.1开始,如果失败,则返回到3.1,3=仅尝试版本3.1,4=仅尝试3.1.1版本
- // sslOptions:{
- // enableServerCertAuth: true, // 是否开启服务端证书认证,默认为false
- // trustStore: fileDir + "/ca.crt" // CA证书的沙箱路径
- // }
- }).then((res: MqttResponse) => {
- console.info('MQTT服务器连接连接成功:' + JSON.stringify(res.message))
- }).catch((err: Error) => {
- console.error('MQTT服务器连接连接失败:' + JSON.stringify(err))
- })
- }
- // 订阅主题 public async subscribe() {
- if (!this.mqttClient) {
- return
- }
- let subscribeOption: MqttSubscribeOptions = {
- topic: this.topic,
- qos: this.qos as MqttQos
- }
- await this.mqttClient.subscribe(subscribeOption).then((res: MqttResponse) => {
- console.info('MQTT订阅主题成功:', JSON.stringify(res.message))
- }).catch((err: Error) => {
- console.error('MQTT订阅主题失败:', JSON.stringify(err))
- })
- // 监听订阅主题
- this.messageArrived()
- }
- // 监听订阅主题
- public messageArrived() {
- if (!this.mqttClient) {
- return
- }
- // 监听主题发送消息
- this.mqttClient.messageArrived((err: Error, data: MqttMessage) => {
- if (err) {
- console.error('MQTT接收消息失败:', JSON.stringify(err))
- } else {
- console.info('MQTT接收消息成功:', JSON.stringify(data))
- // 发送消息至线程
- emitter.emit({
- eventId: EVENTID,
- priority: emitter.EventPriority.LOW // 表示事件优于IDLE优先级投递,事件的默认优先级是LOW。
- }, {
- data: {
- content: data.payload,
- }
- })
- }
- })
- }
- /** * 发送消息 * @param pic 订阅的主题 * @param msg 消息内容 * @param qo qos */ public async pushMessage(msg: string, pic: string = this.topic, qo: MqttQos = this.qos as MqttQos) {
- if (!this.mqttClient) {
- return
- }
- await this.mqttClient.publish({
- topic: pic,
- qos: qo,
- payload: msg
- }).then((data: MqttResponse) => {
- console.info("MQTT消息发送成功:", JSON.stringify(data))
- }).catch((err: Error) => {
- console.error("MQTT消息发送失败:", JSON.stringify(err))
- })
- }
- // 销毁客户端实例 public async destroy() { if (!this.mqttClient) { return } await this.mqttClient.destroy().then((data: boolean) => { if (data) { console.info('MQTT实例销毁成功') emitter.off(EVENTID) } else { console.error('MQTT实例销毁失败') } }) }}export { MQTTClient, EVENTID, MQTTOptionsType}
复制代码 在上面的代码中,我们还用到了一个ArkTS中的内置模块 @ohos.events.emitter 线程通信,它可以在应用的任何地方分发事件,在页面中监听事件并吸收数据,稍后的页面中我们会用到
三、使用MQTTClient方法
首先我们必要构建一个页面视图,视图中可以表现发送的消息,吸收到的消息等,因封装的方法中声明实例即自动毗连服务器,以是不必要毗连按钮
- @Entry
- @Component
- struct Index {
- @State receiveMsg: string = ''
- @State sendMsg: string = ''
- build() {
- Column({ space: 24 }) {
- Text('MQTT客户端')
- .fontSize(32)
- .fontWeight(500)
- .margin({ bottom: 20 })
- Row() {
- Text('接收到的消息:')
- TextInput({ text: $$this.receiveMsg, placeholder: '接收到的消息' })
- .layoutWeight(1)
- }
- Row({ space: 16 }) {
- Button('发送消息')
- TextInput({ text: $$this.sendMsg, placeholder: '请填写需要发送的消息' })
- .layoutWeight(1)
- }
- .width('100%')
- Button('销毁客户端')
- .width('100%')
- }
- .padding(20)
- .width('100%')
- }
- }
复制代码
然后我们引入并声明刚刚封装的 MQTTClient 类,并配置毗连信息,与MQTTX工具中配置的一样即可,必要注意的是url,必须是 mqtt协议+ip + 端口的格式,如:mqtt://0.0.0.0:1883
- import { MQTTClient, MQTTOptionsType, EVENTID } from '../utils/MQTTClientInstance'
- import { emitter } from '@kit.BasicServicesKit'
- const MQTTOption: MQTTOptionsType = {
- url: 'mqtt:/${ip}:1883', // 注意:填写自己部署的连接地址
- clientId: 'mqtt123456@', // 客户端连接到服务器时传递给服务器的客户端标识符。它是一个以空结尾的UTF-8编码字符串
- userName: 'root', // 客户端用户名
- password: 'admin', // 客户端密码
- topic: 'topic1', // 主题名称
- qos: 1 // 消息的服务质量设置 0 最多一次 1 至少一次 2 近一次
- }
- const MQTTInstance: MQTTClient = MQTTClient.getInstance(MQTTOption)
复制代码 引入之后即可给视图绑定事件,并在声明周期中监听服务端发送的消息,完整代码如下:
- import { MQTTClient, MQTTOptionsType, EVENTID } from '../utils/MQTTClientInstance'
- import { emitter } from '@kit.BasicServicesKit'
- const MQTTOption: MQTTOptionsType = {
- url: 'mqtt://k178b5e8.ala.dedicated.aliyun.emqxcloud.cn:1883',
- clientId: 'mqtt123456@',
- userName: 'root',
- password: 'admin',
- topic: 'topic1',
- qos: 1
- }
- const MQTTInstance: MQTTClient = MQTTClient.getInstance(MQTTOption)
- @Entry
- @Component
- struct Index {
- @State receiveMsg: string = ''
- @State sendMsg: string = ''
- onPageShow(): void {
- emitter.on({
- eventId: EVENTID
- }, (data) => {
- this.receiveMsg = JSON.stringify(data)
- });
- }
- build() {
- Column({ space: 24 }) {
- Text('MQTT客户端')
- .fontSize(32)
- .fontWeight(500)
- .margin({ bottom: 20 })
- Row() {
- Text('接收到的消息:')
- TextInput({ text: $$this.receiveMsg, placeholder: '接收到的消息' })
- .layoutWeight(1)
- }
- Row({ space: 16 }) {
- Button('发送消息')
- .onClick(() => {
- if (this.sendMsg) {
- MQTTInstance.pushMessage(this.sendMsg)
- this.sendMsg = ''
- }
- })
- TextInput({ text: $$this.sendMsg, placeholder: '请填写需要发送的消息' })
- .layoutWeight(1)
- }
- .width('100%')
- Button('销毁客户端')
- .width('100%')
- .onClick(() => {
- MQTTInstance.destroy()
- })
- }
- .padding(20)
- .width('100%')
- }
- }
复制代码 在模仿器中运行视图代码,然后打开日记就可以看到自己的打印信息,如毗连成功,订阅主题成功等
在输入框中填写必要发送给服务端的信息,点击发送,即可在吸收信息框中看到服务端发送到客户端的信息
点击销毁客户端可销毁当前实例,这个步骤可以推出应用时触发,至此,MQTT客户端搭建完毕。
总结
第一次写文章,全文总计一万多字,才知道原创码字原来这么累,盼望能得到各位大佬的一个点赞。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |