golang开源的可嵌入应用步伐高性能的MQTT服务

[复制链接]
发表于 2026-2-25 12:12:42 | 显示全部楼层 |阅读模式
golang开源的可嵌入应用步伐高性能的MQTT服务

什么是MQTT?

MQTT(Message Queuing Telemetry Transport)是一种轻量级的、开放的消息传输协议,计划用于在低带宽、高延长或不可靠的网络环境中举行通讯。MQTT最初由IBM开辟,现已成为OASIS标准。
MQTT的计划目的是提供一种简朴、轻量、可扩展的协议,实用于各种装备和网络条件。它通常用于物联网物联网(IoT)和传感器网络,此中装备须要以有效的方式举行通讯,而且资源(如带宽和电池寿命)大概受到限定。
MQTT的简朴计划和实用性使其成为物联网物联网中常用的通讯协议之一。它被广泛用于传感器网络、嵌入式装备、移动应用步伐和其他场景中,提供了一种可靠、高效的消息传输机制。
什么是Mochi-MQTT

代码地点:https://github.com/mochi-mqtt/server
Mochi MQTT 是一个完全兼容 MQTT v5的可嵌入的中心件/服务器,完全利用 Go 语言编写,旨在用于遥测和物联网物联网项目的开辟。它可以作为独立的二进制文件利用,也可以嵌入到你本身的应用步伐中库来利用,颠末提出的计划以实现标题的轻量化和快速摆设,同时也非常器重代码的质量和可维护性。
用途

物联网项目开辟时,经常须要利用MQTT协议对装备接入,在很多场景中,私有化摆设物联网体系时资源比力少,性能要求高,一些大型的MQTT服务不满意要求,而且代码不可控。
尚有在边沿场景下,须要在边沿网关,边沿控制器装备上摆设物联网体系,但是边沿网关的资源很少,内存约莫只有4G,以是利用java开辟的物联网体系就很难摆设上去;利用C/C++开辟服从又很低,以是Go语言是最符合的,
Mochi-MQTT刚好又美满是Go编写的开源的,可以嵌入到本身的步伐启动。
Mochi MQTT独立摆设

Golang的环境设置这里不做分析,请看我前面的博文分析
Mochi MQTT 可以作为独立的中心件利用。只需拉取此堆栈代码,然后在 cmd 文件夹中运行 cmd/main.go ,默认将开启下面几个服务端口, tcp (:1883)、websocket (:1882) 和服务状态监控监控 (:8080) 。
  1. cd cmd
  2. go build -o mqtt && ./mqtt
复制代码
docker摆设

可以从 Docker Hub 堆栈中拉取并运行Mochi MQTT官方镜像:
  1. docker pull mochimqtt/server
  2. 或者
  3. docker run mochimqtt/server
复制代码
也提供了一个简朴的 Dockerfile,用于运行 cmd/main.go 中的 Websocket(:1882)、TCP(:1883) 和服务端状态信息(:8080)这三个服务监听:
  1. docker build -t mochi:latest .
  2. docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest
复制代码
嵌入本身项目运行和开辟

下载Mochi MQTT包
  1. go get github.com/mochi-mqtt/server/v2
复制代码
将Mochi MQTT作为包导入利用, 示例代码如下
  1. import (
  2.   mqttServer "github.com/mochi-mqtt/server/v2"
  3.         "github.com/mochi-mqtt/server/v2/listeners"
  4.         "github.com/mochi-mqtt/server/v2/packets"
  5. )
  6. var Server *mqttServer.Server
  7. func ServerMqttInit() {
  8.         // 创建新的 MQTT 服务器
  9.         Server = mqttServer.New(&mqttServer.Options{
  10.                 InlineClient: true, // 启动内联客户端
  11.         })
  12.        
  13.         // 初始化数据库实例
  14.         edge := &edgeHook{deviceDao: deviceDao.NewDeviceRepository(),
  15.                 productDao:     productDao.NewProductRepository(),
  16.         }
  17.         // 添加自定义权限方法
  18.         err := Server.AddHook(edge, nil)
  19.         if err != nil {
  20.                 log.Fatal(err)
  21.         }
  22.         // 在1883端口上创建一个 TCP 服务端。
  23.         tcp := listeners.NewTCP("t1", ":1883", nil)
  24.         err = Server.AddListener(tcp)
  25.         if err != nil {
  26.                 log.Fatal(err)
  27.         }
  28.         // 在1882端口上创建一个 Websocket 服务端。
  29.         ws := listeners.NewWebsocket("ws1", ":1882", nil)
  30.         err = server.AddListener(ws)
  31.         if err != nil {
  32.                 log.Fatal(err)
  33.         }
  34.         go func() {
  35.                 err := Server.Serve()
  36.                 if err != nil {
  37.                         log.Fatal(err)
  38.                 }
  39.         }()
  40. }
  41. type edgeHook struct {
  42.         mqttServer.HookBase
  43.         deviceDao      deviceDao.DeviceRepository
  44.         productDao     productDao.ProductRepository
  45. }
  46. func (h *edgeHook) ID() string {
  47.         return "mqtt-auth"
  48. }
  49. func (h *edgeHook) Provides(b byte) bool {
  50.         // 实现钩子函数
  51.         return bytes.Contains([]byte{
  52.                 //MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
  53.                 mqttServer.OnConnectAuthenticate,
  54.                 //MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
  55.                 mqttServer.OnACLCheck,
  56.                 //在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
  57.                 mqttServer.OnSessionEstablish,
  58.                 //当客户端因任何原因断开连接时调用。
  59.                 mqttServer.OnDisconnect,
  60.                 //当客户端向订阅者发布消息后调用。
  61.                 mqttServer.OnPublished,
  62.         }, []byte{b})
  63. }
  64. // OnConnectAuthenticate MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
  65. func (h *edgeHook) OnConnectAuthenticate(cl *mqttServer.Client, pk packets.Packet) bool {
  66.         username := string(pk.Connect.Username)
  67.         password := string(pk.Connect.Password)
  68.         if username == "" || len(username) == 0 {
  69.                 return false
  70.         }
  71.         if password == "" || len(password) == 0 {
  72.                 return false
  73.         }
  74.         return true
  75. }
  76. // OnACLCheck MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
  77. func (h *edgeHook) OnACLCheck(cl *mqttServer.Client, topic string, write bool) bool {
  78.         username := string(cl.Properties.Username)
  79.         if username == "" || len(username) == 0 {
  80.                 return false
  81.         }
  82.         if topic == "" || len(topic) == 0 {
  83.                 return false
  84.         }
  85.         return true
  86. }
  87. // OnSessionEstablish 在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
  88. func (h *edgeHook) OnSessionEstablish(cl *mqttServer.Client, pk packets.Packet) {
  89.         username := string(cl.Properties.Username)
  90.         if username == "" || len(username) == 0 {
  91.                 return
  92.         }
  93.         //设备连接MQTT成功后保存设备在线状态
  94. }
  95. // OnDisconnect 当客户端因任何原因断开连接时调用。
  96. func (h *edgeHook) OnDisconnect(cl *mqttServer.Client, err error, expire bool) {
  97.         username := string(cl.Properties.Username)
  98.         if username == "" || len(username) == 0 {
  99.                 return
  100.         }
  101.         //设备断开MQTT成功后保存设备离线状态
  102. }
  103. // OnPublished 当客户端向订阅者发布消息后调用。
  104. func (h *edgeHook) OnPublished(cl *mqttServer.Client, pk packets.Packet) {
  105.         Log.Infof("mqtt server OnPublished info topic=%s, msg=%s", pk.TopicName, string(pk.Payload))
  106.         //收到客户端消息后做业务逻辑处理
  107. }
  108. // 使用内联客户端方式,向MQTT发送消息
  109. func PublishMsg(topic string, msg []byte) bool {
  110.         err := Server.Publish(topic, msg, false, 0)
  111.         if err != nil {
  112.                 Log.Errorf("mqtt EdgePublish error=%v, topic=%s, msg=%s", err, topic, msg)
  113.                 return false
  114.         }
  115.         return true
  116. }
  117. // 使用内联客户端方式,订阅边缘MQTT消息topic
  118. func SubscribeTopic(topic string, subscriptionId int, callback func(topic string, msg []byte)) {
  119.         callbackFn := func(cl *mqttServer.Client, sub packets.Subscription, pk packets.Packet) {
  120.                 Log.Info("mqtt EdgeSubscribe received message", "client", cl.ID, "subscriptionId", sub.Identifier,
  121.                         "topic", pk.TopicName, "payload", string(pk.Payload))
  122.                 callback(pk.TopicName, pk.Payload)
  123.         }
  124.         _ = Server.Subscribe(topic, subscriptionId, callbackFn)
  125. }
  126. // 使用内联客户端方式,取消订阅边缘MQTT消息topic
  127. func UnsubscribeTopic(topic string, subscriptionId int) {
  128.         _ = Server.Unsubscribe(topic, subscriptionId)
  129. }
  130. func main() {
  131.         // 创建信号用于等待服务端关闭信号
  132.   sigs := make(chan os.Signal, 1)
  133.   done := make(chan bool, 1)
  134.   signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  135.   go func() {
  136.     <-sigs
  137.     done <- true
  138.   }()
  139.   
  140.   <-done
  141.         Log.Error("caught signal, stopping...")
  142.         Server.Close()
  143.         Log.Error("main.go finished")
  144. }
复制代码
监控监控MQTT指标信息
  1. mqttRouters := r.Group("/mqtt", func(context *gin.Context) {})
  2.         {
  3.                 mqttRouters.GET("stats", func(c *gin.Context) {
  4.                         util.R(c, nil, mqtt.Server.Info)
  5.                 })
  6.         }
复制代码

详情利用指南请看:https://github.com/mochi-mqtt/server

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

登录后关闭弹窗

登录参与点评抽奖  加入IT实名职场社区
去登录
快速回复 返回顶部 返回列表