MQTT客户端在HarmonyOS应用中的搭建
本文先容了MQTT服务端部署,测试到MQTT客户端搭建的全过程,全文篇幅有点长,盼望各位大佬能耐烦查阅,多多指教。什么是MQTT?
在上个项目的开辟中,碰到一个很特殊的需求,在鸿蒙中搭建MQTT的客户端,刚接到的需求的时间也是一脸懵,MQTT是啥东西,都没打仗过,然后经过一系列的查询,终于搞清楚了什么是MQTT:
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT最大优点在于,用极少的代码和有限的带宽,为毗连长途设备提供实时可靠的消息服务
于是下个困难出现了,这个协议在鸿蒙中该如何搭建,于是我去 鸿蒙开辟者论坛 一搜索,发现鸿蒙中好像根本不支持这协议:
https://i-blog.csdnimg.cn/direct/ed4891472f7b450a8374bc7ad53f32fb.jpeg#pic_center
于是我寻思在Android和IOS中竟然能实现,那在鸿蒙中应该也是能实现,经过了大量的资料查询,在鸿蒙开源三方库资源汇总 中终于找到了基于鸿蒙的MQTT客户端第三方库 : @ohos/mqtt
鸿蒙开源三方库资源汇总:https://gitee.com/openharmony-tpc/tpc_resource
@ohos/mqtt:https://gitee.com/openharmony-tpc/ohos_mqtt
搭建MQTT服务端并测试
由于 @ohos/mqtt 只能应用于鸿蒙客户端的搭建,对于初次使用且没有MQTT服务端情况的同学不是特殊友好,以是本文会在开始前先容如何部署一个简易MQTT服务器用来生产消息,且用可视化客户端工具毗连服务器,测试服务器是否部署成功 ,若是有服务端情况的同学则可以跳过本节。
一、开通云服务(免费)
笔者这边推荐一个免费的MQTT云服务,EMQX Cloud,这个网站新人注册会有免费开通云服务的资格,直接注册即可
EMQX Cloud:https://www.emqx.com/zh/cloud
步骤如下:
1.点击免费试用
https://i-blog.csdnimg.cn/direct/ee500c3e339745f79857691bf4929aad.png
2.点击购买专有版,输入部署名称,其他选项默认,初次注册购买会有14天免费使用期限,到期删除部署即可
https://i-blog.csdnimg.cn/direct/7312f65fc07b4156bc0aae6fa2fe174d.png
3.检察毗连信息
云服务开通后会进入如何页面,点击 概览,下图红圈中的毗连信息即使我们客户端必要的信息,如图所示可以使用各种毗连协议,我们本次使用的是 MQTT over TCP 端口为 1883
https://i-blog.csdnimg.cn/direct/86fb6dcef6624b02a640e0ca3739f7dd.png
4.添加客户端认证
如下图所示,点击添加,输入所必要登录的客户端账号密码,这个后面会用到,写个自己认识的就好
https://i-blog.csdnimg.cn/direct/95dbefd5b7dd4d429193aee6b7aab455.png
至此,MQTT云服务已部署完毕,很简单吧,哈哈哈哈
接下来我们来测试云服务是否能正常发送消息
二、在MQTTX中毗连服务器
首先我们必要下载一个可视化客户端工具:MQTTX
MQTTX :https://mqttx.app/zh
下载完成之后打开工具,先设置成中文表现,如下图所示
https://i-blog.csdnimg.cn/direct/657316735fb64b2da2a5d7975d70e68a.png
然后点击配置进入配置页面,配置毗连地点,用户名,密码等
https://i-blog.csdnimg.cn/direct/e2c3940506634688b7e912cdb0238f84.png
名称随意设置一个,ClientID随机即可,协议选择mqtt://,ip在刚刚云服务中的毗连信息可以检察,端口为:1883,用户名和密码即我们刚刚设置的,至此毗连信息配置完毕,点击右上角毗连
https://i-blog.csdnimg.cn/direct/f8238e3f3f62404ab6e007ba45e39712.png
毗连成功之后,我们点击添加订阅,设置订阅主题,主题名为随意设置,记着即可,我这边设置的是:topic1,qos设置为1,至少一次
https://i-blog.csdnimg.cn/direct/c1f3cef014ef4e3098ba8b008ea897aa.png
回到主页面,点击左侧主题,变色即表示订阅成功,然后点击右下角发送消息,在发送视图中如果我们能看到吸收到的服务端的消息,则表示测试成功,我们部署的云服务可以成功毗连。
https://i-blog.csdnimg.cn/direct/d5db60f010454da098741e5bbf7fc21b.png
在鸿蒙应用中搭建MQTT客户端
好了,铺垫了这么多,终于到了写代码的阶段了
一、安装@ohos/mqtt
1.安装
ohpm install @ohos/mqtt
2.开启网络权限
在 module.json5 中开启网络服务权限
"requestPermissions": [
{
"name": "ohos.permission.INTERNET",
}
]
二、封装MQTTClient方法
根据 @ohos/mqtt文档,搭建MQTT客户端的第一步是创建一个MQTT实例
1.创建实例
import {
MqttAsync,
MqttClient,
MqttMessage,
MqttQos,
MqttResponse,
MqttSubscribeOptions,
} from '@ohos/mqtt';
import emitter from '@ohos.events.emitter';
public createMqttClient() {
this.mqttClient = MqttAsync.createMqtt({
url: this.url,
clientId: this.clientId,
persistenceType: 1 // 客户端使用的持久性类型,0=默认值:使用默认(基于文件系统)持久性机制。1=在内存持久性中使用。2=使用特定于应用程序的持久性实现
})
}
2.毗连服务器
public async connectMqtt() {
if (!this.mqttClient) {
return
}
await this.mqttClient.connect({
userName: this.userName,
password: this.password,
connectTimeout: 30, // 设置连接超时时间,默认
automaticReconnect: true, // 是否在连接丢失的情况下自动重新连接
MQTTVersion: 0, // 设置要在连接上使用的MQTT版本, 0=默认值:从3.1.1开始,如果失败,则返回到3.1,3=仅尝试版本3.1,4=仅尝试3.1.1版本
// sslOptions:{
// enableServerCertAuth: true, // 是否开启服务端证书认证,默认为false
// trustStore: fileDir + "/ca.crt" // CA证书的沙箱路径
// }
}).then((res: MqttResponse) => {
console.info('MQTT服务器连接连接成功:' + JSON.stringify(res.message))
}).catch((err: Error) => {
console.error('MQTT服务器连接连接失败:' + JSON.stringify(err))
})
}
3.订阅及监听主题
public async subscribe() {
if (!this.mqttClient) {
return
}
let subscribeOption: MqttSubscribeOptions = {
topic: this.topic,
qos: this.qos as MqttQos
}
await this.mqttClient.subscribe(subscribeOption).then((res: MqttResponse) => {
console.info('MQTT订阅主题成功:', JSON.stringify(res.message))
}).catch((err: Error) => {
console.error('MQTT订阅主题失败:', JSON.stringify(err))
})
// 监听订阅主题
this.messageArrived()
}
// 监听订阅主题
public messageArrived() {
if (!this.mqttClient) {
return
}
// 监听主题发送消息
this.mqttClient.messageArrived((err: Error, data: MqttMessage) => {
if (err) {
console.error('MQTT接收消息失败:', JSON.stringify(err))
} else {
console.info('MQTT接收消息成功:', JSON.stringify(data))
// 发送消息至线程
emitter.emit({
eventId: EVENTID,
priority: emitter.EventPriority.LOW // 表示事件优于IDLE优先级投递,事件的默认优先级是LOW。
}, {
data: {
content: data.payload,
}
})
}
})
}
4.发送消息
public async pushMessage(msg: string, pic: string = this.topic, qo: MqttQos = this.qos as MqttQos) {
if (!this.mqttClient) {
return
}
await this.mqttClient.publish({
topic: pic,
qos: qo,
payload: msg
}).then((data: MqttResponse) => {
console.info("MQTT消息发送成功:", JSON.stringify(data))
}).catch((err: Error) => {
console.error("MQTT消息发送失败:", JSON.stringify(err))
})
}
理解了这四步的代码即可搭建一个简单的客户端了,但现实项目中,仅仅如许写是不敷的,我们必要把以上代码封装成一个类,便于页面的调用及复用,借助与ts的特性,我们还可以把这个类优化一下,封装成一个单例,如许的利益在于,应用有且仅有一个实例,在应用入口一次声明,在应用出口一次销毁即可,防止开辟者多次声明的情况发生,好了,让我们结合上面的代码封装一个 MQTTClient 类
import {MqttAsync,MqttClient,MqttMessage,MqttQos,MqttResponse,MqttSubscribeOptions,} from '@ohos/mqtt';import emitter from '@ohos.events.emitter';// 事件idconst EVENTID = 1001interface MQTTOptionsType {url?: stringclientId?: stringuserName?: stringpassword?: stringtopic?: stringqos?: MqttQos | undefined}class MQTTClient {private static instance: MQTTClientprivate mqttClient: MqttClient | null = nullprivate url: string = '' // 以null结尾的字符串,指定客户端将毗连到的服务器。它接纳的形式protocol://host:port.protocol必须是tcp、ssl、ws或wss。对于主机,可以指定IP地点或主机名。例如,tcp://localhost:1883private clientId: string = '' // 客户端毗连到服务器时传递给服务器的客户端标识符。它是一个以空结尾的UTF-8编码字符串private userName: string = '' // 客户端用户名private password: string = '' // 客户端密码private topic: string = '' // 主题名称private qos: MqttQos | undefined = undefined // 消息的服务质量设置 0 最多一次 1 至少一次 2 近一次constructor(mqttOptions: MQTTOptionsType) { mqttOptions.url && (this.url = mqttOptions.url) mqttOptions.clientId && (this.clientId = mqttOptions.clientId) mqttOptions.userName && (this.userName = mqttOptions.userName) mqttOptions.password && (this.password = mqttOptions.password) mqttOptions.topic && (this.topic = mqttOptions.topic) mqttOptions.qos && (this.qos = mqttOptions.qos) this.init()}// 静态方法获取唯一实例public static getInstance(mqttOptions: MQTTOptionsType): MQTTClient { if (!MQTTClient.instance) { MQTTClient.instance = new MQTTClient(mqttOptions) } return MQTTClient.instance}// 初始化方法async init() { this.createMqttClient() // 创建mqtt实例 await this.connectMqtt() // 毗连服务器 await this.subscribe() // 订阅主题 this.messageArrived() // 监听订阅主题}// 创建mqtt实例public createMqttClient() { this.mqttClient = MqttAsync.createMqtt({ url: this.url, clientId: this.clientId, persistenceType: 1 // 客户端使用的持久性类型,0=默认值:使用默认(基于文件体系)持久性机制。1=在内存持久性中使用。2=使用特定于应用步伐的持久性实现 })}// 毗连服务器public async connectMqtt() {
if (!this.mqttClient) {
return
}
await this.mqttClient.connect({
userName: this.userName,
password: this.password,
connectTimeout: 30, // 设置连接超时时间,默认
automaticReconnect: true, // 是否在连接丢失的情况下自动重新连接
MQTTVersion: 0, // 设置要在连接上使用的MQTT版本, 0=默认值:从3.1.1开始,如果失败,则返回到3.1,3=仅尝试版本3.1,4=仅尝试3.1.1版本
// sslOptions:{
// enableServerCertAuth: true, // 是否开启服务端证书认证,默认为false
// trustStore: fileDir + "/ca.crt" // CA证书的沙箱路径
// }
}).then((res: MqttResponse) => {
console.info('MQTT服务器连接连接成功:' + JSON.stringify(res.message))
}).catch((err: Error) => {
console.error('MQTT服务器连接连接失败:' + JSON.stringify(err))
})
}
// 订阅主题public async subscribe() {
if (!this.mqttClient) {
return
}
let subscribeOption: MqttSubscribeOptions = {
topic: this.topic,
qos: this.qos as MqttQos
}
await this.mqttClient.subscribe(subscribeOption).then((res: MqttResponse) => {
console.info('MQTT订阅主题成功:', JSON.stringify(res.message))
}).catch((err: Error) => {
console.error('MQTT订阅主题失败:', JSON.stringify(err))
})
// 监听订阅主题
this.messageArrived()
}
// 监听订阅主题
public messageArrived() {
if (!this.mqttClient) {
return
}
// 监听主题发送消息
this.mqttClient.messageArrived((err: Error, data: MqttMessage) => {
if (err) {
console.error('MQTT接收消息失败:', JSON.stringify(err))
} else {
console.info('MQTT接收消息成功:', JSON.stringify(data))
// 发送消息至线程
emitter.emit({
eventId: EVENTID,
priority: emitter.EventPriority.LOW // 表示事件优于IDLE优先级投递,事件的默认优先级是LOW。
}, {
data: {
content: data.payload,
}
})
}
})
}
/** * 发送消息 * @param pic 订阅的主题 * @param msg 消息内容 * @param qo qos */public async pushMessage(msg: string, pic: string = this.topic, qo: MqttQos = this.qos as MqttQos) {
if (!this.mqttClient) {
return
}
await this.mqttClient.publish({
topic: pic,
qos: qo,
payload: msg
}).then((data: MqttResponse) => {
console.info("MQTT消息发送成功:", JSON.stringify(data))
}).catch((err: Error) => {
console.error("MQTT消息发送失败:", JSON.stringify(err))
})
}
// 销毁客户端实例public async destroy() { if (!this.mqttClient) { return } await this.mqttClient.destroy().then((data: boolean) => { if (data) { console.info('MQTT实例销毁成功') emitter.off(EVENTID) } else { console.error('MQTT实例销毁失败') } })}}export {MQTTClient, EVENTID, MQTTOptionsType} 在上面的代码中,我们还用到了一个ArkTS中的内置模块 @ohos.events.emitter 线程通信,它可以在应用的任何地方分发事件,在页面中监听事件并吸收数据,稍后的页面中我们会用到
三、使用MQTTClient方法
首先我们必要构建一个页面视图,视图中可以表现发送的消息,吸收到的消息等,因封装的方法中声明实例即自动毗连服务器,以是不必要毗连按钮
@Entry
@Component
struct Index {
@State receiveMsg: string = ''
@State sendMsg: string = ''
build() {
Column({ space: 24 }) {
Text('MQTT客户端')
.fontSize(32)
.fontWeight(500)
.margin({ bottom: 20 })
Row() {
Text('接收到的消息:')
TextInput({ text: $$this.receiveMsg, placeholder: '接收到的消息' })
.layoutWeight(1)
}
Row({ space: 16 }) {
Button('发送消息')
TextInput({ text: $$this.sendMsg, placeholder: '请填写需要发送的消息' })
.layoutWeight(1)
}
.width('100%')
Button('销毁客户端')
.width('100%')
}
.padding(20)
.width('100%')
}
}
https://i-blog.csdnimg.cn/direct/099fcb9900d0438fa1ac20e61e2a5df6.png
然后我们引入并声明刚刚封装的 MQTTClient类,并配置毗连信息,与MQTTX工具中配置的一样即可,必要注意的是url,必须是 mqtt协议+ip + 端口的格式,如:mqtt://0.0.0.0:1883
import { MQTTClient, MQTTOptionsType, EVENTID } from '../utils/MQTTClientInstance'
import { emitter } from '@kit.BasicServicesKit'
const MQTTOption: MQTTOptionsType = {
url: 'mqtt:/${ip}:1883',// 注意:填写自己部署的连接地址
clientId: 'mqtt123456@', // 客户端连接到服务器时传递给服务器的客户端标识符。它是一个以空结尾的UTF-8编码字符串
userName: 'root', // 客户端用户名
password: 'admin', // 客户端密码
topic: 'topic1', // 主题名称
qos: 1 // 消息的服务质量设置 0 最多一次 1 至少一次 2 近一次
}
const MQTTInstance: MQTTClient = MQTTClient.getInstance(MQTTOption)
引入之后即可给视图绑定事件,并在声明周期中监听服务端发送的消息,完整代码如下:
import { MQTTClient, MQTTOptionsType, EVENTID } from '../utils/MQTTClientInstance'
import { emitter } from '@kit.BasicServicesKit'
const MQTTOption: MQTTOptionsType = {
url: 'mqtt://k178b5e8.ala.dedicated.aliyun.emqxcloud.cn:1883',
clientId: 'mqtt123456@',
userName: 'root',
password: 'admin',
topic: 'topic1',
qos: 1
}
const MQTTInstance: MQTTClient = MQTTClient.getInstance(MQTTOption)
@Entry
@Component
struct Index {
@State receiveMsg: string = ''
@State sendMsg: string = ''
onPageShow(): void {
emitter.on({
eventId: EVENTID
}, (data) => {
this.receiveMsg = JSON.stringify(data)
});
}
build() {
Column({ space: 24 }) {
Text('MQTT客户端')
.fontSize(32)
.fontWeight(500)
.margin({ bottom: 20 })
Row() {
Text('接收到的消息:')
TextInput({ text: $$this.receiveMsg, placeholder: '接收到的消息' })
.layoutWeight(1)
}
Row({ space: 16 }) {
Button('发送消息')
.onClick(() => {
if (this.sendMsg) {
MQTTInstance.pushMessage(this.sendMsg)
this.sendMsg = ''
}
})
TextInput({ text: $$this.sendMsg, placeholder: '请填写需要发送的消息' })
.layoutWeight(1)
}
.width('100%')
Button('销毁客户端')
.width('100%')
.onClick(() => {
MQTTInstance.destroy()
})
}
.padding(20)
.width('100%')
}
}
在模仿器中运行视图代码,然后打开日记就可以看到自己的打印信息,如毗连成功,订阅主题成功等
https://i-blog.csdnimg.cn/direct/9e046d1a6fb54ac3a9e0af12291aa1f1.png
在输入框中填写必要发送给服务端的信息,点击发送,即可在吸收信息框中看到服务端发送到客户端的信息
https://i-blog.csdnimg.cn/direct/daca1ea91ac7482ab787552700bf95c4.png
点击销毁客户端可销毁当前实例,这个步骤可以推出应用时触发,至此,MQTT客户端搭建完毕。
https://i-blog.csdnimg.cn/direct/40a274a86a3541399074fa6ac949d011.png
总结
第一次写文章,全文总计一万多字,才知道原创码字原来这么累,盼望能得到各位大佬的一个点赞。
https://i-blog.csdnimg.cn/direct/013ce0a24e8a4bbea5723568b8487b62.png
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]