MQTT协议及其使用案例

打印 上一主题 下一主题

主题 934|帖子 934|积分 2802

MQTT 概述

MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布、订阅信息传输协议。 可以在不可靠的网络环境中进行扩展,适用于设备硬件存储空间或网络带宽有限的场景。 使用MQTT协议,消息发送者与接收者不受时间和空间的限制。
Docker 部署 MQTT(采用docker-compose.yml)
  1. version: "3"
  2. services:
  3.     mqtt:
  4.         image: eclipse-mosquitto
  5.         container_name: mqtt
  6.         privileged: true
  7.         ports:
  8.             - 1883:1883
  9.             - 9001:9001
  10.         volumes:
  11.             - ./config:/mosquitto/config
  12.             - ./data:/mosquitto/data
  13.             - ./log:/mosquitto/log
复制代码

  • 文件夹

  • 创建 config/mosquitto.conf
  1. persistence true
  2. listener 1883
  3. persistence_location /mosquitto/data
  4. log_dest file /mosquitto/log/mosquitto.log
  5. # 关闭匿名模式
  6. # allow_anonymous true
  7. # 指定密码文件
  8. password_file /mosquitto/config/pwfile.conf
复制代码

  • docker部署执行:docker compose up -d
  • 设置访问权限(用户名:admin,密码:admin123)
  1. docker exec -it mqtt sh
  2. touch /mosquitto/config/pwfile.conf
  3. chmod -R 755 /mosquitto/config/pwfile.conf
  4. mosquitto_passwd -b /mosquitto/config/pwfile.conf admin admin123
复制代码

  • 重启mqtt容器:docker compose restart
Springboot 整合


  • 依赖
  1.     <parent>
  2.         <groupId>org.springframework.boot</groupId>
  3.         <artifactId>spring-boot-starter-parent</artifactId>
  4.         <version>2.5.5</version>
  5.         <relativePath/>
  6.     </parent>
  7.    
  8.     <dependencies>
  9.                    
  10.         <dependency>
  11.             <groupId>org.springframework.integration</groupId>
  12.             <artifactId>spring-integration-mqtt</artifactId>
  13.         </dependency>
  14.         
  15.         <dependency>
  16.             <groupId>org.projectlombok</groupId>
  17.             <artifactId>lombok</artifactId>
  18.         </dependency>
  19.         
  20.         <dependency>
  21.             <groupId>org.springframework.boot</groupId>
  22.             <artifactId>spring-boot-starter</artifactId>
  23.         </dependency>
  24.         <dependency>
  25.             <groupId>org.springframework.boot</groupId>
  26.             <artifactId>spring-boot-starter-web</artifactId>
  27.         </dependency>
  28.         <dependency>
  29.             <groupId>com.alibaba</groupId>
  30.             <artifactId>fastjson</artifactId>
  31.             <version>1.2.62</version>
  32.         </dependency>
  33.         
  34.         <dependency>
  35.             <groupId>org.apache.httpcomponents</groupId>
  36.             <artifactId>httpclient</artifactId>
  37.         </dependency>
  38.         
  39.         <dependency>
  40.             <groupId>junit</groupId>
  41.             <artifactId>junit</artifactId>
  42.             <version>4.12</version>
  43.         </dependency>
  44.         <dependency>
  45.             <groupId>org.springframework</groupId>
  46.             <artifactId>spring-test</artifactId>
  47.         </dependency>
  48.         <dependency>
  49.             <groupId>org.springframework.boot</groupId>
  50.             <artifactId>spring-boot-test</artifactId>
  51.         </dependency>
  52.     </dependencies>
复制代码

  • 配置文件
  1. mqtt.host=tcp://127.0.0.1:1883
  2. mqtt.clientId=mqttx_a071ba88
  3. mqtt.username=admin
  4. mqtt.password=admin123
  5. mqtt.topic=test_topic
  6. mqtt.timeout=36000
  7. mqtt.keepAlive=6000
复制代码

  • 配置类
  1. @Slf4j
  2. @Configuration
  3. public class MyMqttConfiguration {
  4.     @Value("${mqtt.host}")
  5.     String broker;
  6.     @Value("${mqtt.clientId}")
  7.     String clientId;
  8.     @Value("${mqtt.username}")
  9.     String username;
  10.     @Value("${mqtt.password}")
  11.     String password;
  12.     @Value("${mqtt.timeout}")
  13.     Integer timeout;
  14.     @Value("${mqtt.keepAlive}")
  15.     Integer keepAlive;
  16.     @Value("${mqtt.topic}")
  17.     String topic;
  18.     @Autowired
  19.     MyHandle myHandle;
  20.     @Bean
  21.     public MyMqttClient myMqttClient(){
  22.         MyMqttClient mqttClient = new MyMqttClient(broker, username, password, clientId, timeout, keepAlive,myHandle);
  23.         for (int i = 0; i < 10; i++) {
  24.             try {
  25.                 mqttClient.connect();
  26.                 mqttClient.subscribe(topic,0);
  27.                 return mqttClient;
  28.             } catch (MqttException e) {
  29.                 log.error("MQTT connect exception,connect time = " + i);
  30.                 try {
  31.                     Thread.sleep(2000);
  32.                 } catch (InterruptedException e1) {
  33.                     e1.printStackTrace();
  34.                 }
  35.             }
  36.         }
  37.         return mqttClient;
  38.     }
  39. }
复制代码

  • 客户端
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.eclipse.paho.client.mqttv3.*;
  3. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  4. import org.springframework.util.ObjectUtils;
  5. @Slf4j
  6. public class MyMqttClient {
  7.     private static MqttClient client;
  8.     private String host;
  9.     private String clientId;
  10.     private String username;
  11.     private String password;
  12.     private Integer timeout;
  13.     private Integer keepAlive;
  14.     private MyHandle myHandle;
  15.     public  MyMqttClient(){
  16.         System.out.println("MyMqttClient空构造函数");
  17.     }
  18.     public MyMqttClient(String host, String username, String password, String clientId, Integer timeOut, Integer keepAlive,MyHandle myHandle) {
  19.         System.out.println("MyMqttClient全参构造");
  20.         this.host = host;
  21.         this.username = username;
  22.         this.password = password;
  23.         this.clientId = clientId;
  24.         this.timeout = timeOut;
  25.         this.keepAlive = keepAlive;
  26.         this.myHandle = myHandle;
  27.     }
  28.     public static MqttClient getClient() {
  29.         return client;
  30.     }
  31.     public static void setClient(MqttClient client) {
  32.         MyMqttClient.client = client;
  33.     }
  34.     /**
  35.      * 设置mqtt连接参数
  36.      */
  37.      public MqttConnectOptions setMqttConnectOptions(String username,String password,Integer timeout, Integer keepAlive){
  38.          MqttConnectOptions options = new MqttConnectOptions();
  39.          options.setUserName(username);
  40.          options.setPassword(password.toCharArray());
  41.          options.setConnectionTimeout(timeout);
  42.          options.setKeepAliveInterval(keepAlive);
  43.          options.setCleanSession(true);
  44.          options.setAutomaticReconnect(true);
  45.          return options;
  46.      }
  47.     /**
  48.      * 连接mqtt服务端
  49.      */
  50.     public void connect() throws MqttException {
  51.         if(client == null){
  52.             client = new MqttClient(host,clientId,new MemoryPersistence());
  53.             client.setCallback(new MyMqttCallback(MyMqttClient.this,this.myHandle));
  54.         }
  55.         MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepAlive);
  56.         if(!client.isConnected()){
  57.             client.connect(mqttConnectOptions);
  58.         }else{
  59.             client.disconnect();
  60.             client.connect(mqttConnectOptions);
  61.         }
  62.         log.info("MQTT connect success");
  63.     }
  64.     /**
  65.      * 断开连接
  66.      * @throws MqttException
  67.      */
  68.     public void disconnect()throws MqttException{
  69.         if(null!=client && client.isConnected()){
  70.             client.disconnect();;
  71.         }
  72.     }
  73.     /**
  74.      * 发布,qos默认为0,非持久化
  75.      */
  76.      public void publish(String pushMessage,String topic,int qos){
  77.          publish(pushMessage, topic, qos, false);
  78.      }
  79.     /**
  80.      * 发布消息
  81.      *
  82.      * @param pushMessage
  83.      * @param topic
  84.      * @param qos
  85.      * @param retained:留存
  86.      */
  87.     public void publish(String pushMessage, String topic, int qos, boolean retained) {
  88.         MqttMessage mqttMessage = new MqttMessage();
  89.         mqttMessage.setPayload(pushMessage.getBytes());
  90.         mqttMessage.setQos(qos);
  91.         mqttMessage.setRetained(retained);
  92.         MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
  93.         if(ObjectUtils.isEmpty(mqttTopic)){
  94.             log.error("主题不存在");
  95.         }
  96.         synchronized (this){
  97.             try{
  98.                 MqttDeliveryToken mqttDeliveryToken = mqttTopic.publish(mqttMessage);
  99.                 mqttDeliveryToken.waitForCompletion(1000L);
  100.             }catch (MqttException e){
  101.                 e.printStackTrace();
  102.             }
  103.         }
  104.     }
  105.     /**
  106.      * 订阅
  107.      *
  108.      * @param topic
  109.      * @param qos
  110.      */
  111.     public void subscribe(String topic, int qos) {
  112.         try {
  113.             MyMqttClient.getClient().subscribe(topic, qos);
  114.             log.info("订阅主题:"+topic+"成功!");
  115.         } catch (MqttException e) {
  116.             log.error("订阅主题:"+topic+"失败!",e);
  117.         }
  118.     }
  119.     /**
  120.      * 取消订阅
  121.      */
  122.     public void cleanTopic(String topic){
  123.         if(!ObjectUtils.isEmpty(client) && client.isConnected()){
  124.             try{
  125.                 client.unsubscribe(topic);
  126.             }catch (MqttException e){
  127.                 log.error("取消订阅失败!"+e);
  128.             }
  129.         }else{
  130.             log.info("主题不存在或未连接!");
  131.         }
  132.     }
  133. }
复制代码

  • 回调类(消息发送和接收时响应)
  1. @Slf4j
  2. public class MyMqttCallback implements MqttCallbackExtended {
  3.     private MyMqttClient myMqttClient;
  4.     private MyHandle myHandle;
  5.     public MyMqttCallback(MyMqttClient myMqttClient,MyHandle myHandle) {
  6.         this.myMqttClient = myMqttClient;
  7.         this.myHandle = myHandle;
  8.     }
  9.     /**
  10.      * 连接完成
  11.      * @param reconnect
  12.      * @param serverURI
  13.      */
  14.     @Override
  15.     public void connectComplete(boolean reconnect,String serverURI) {
  16.         log.info("MQTT 连接成功,连接方式:{}",reconnect?"重连":"直连");
  17.         //订阅主题(可以在这里订阅主题)
  18.         try {
  19.             MyMqttClient.getClient().subscribe("topic1");
  20.         } catch (MqttException e) {
  21.             log.error("主题订阅失败");
  22.         }
  23.     }
  24.     /**
  25.      * 连接丢失 进行重连操作
  26.      * @param throwable
  27.      */
  28.     @Override
  29.     public void connectionLost(Throwable throwable) {
  30.         log.warn("mqtt connectionLost >>> 5S之后尝试重连: {}", throwable.getMessage());
  31.         long reconnectTimes = 1;
  32.         while (true){
  33.             try{
  34.                 Thread.sleep(5000);
  35.             }catch (InterruptedException ignored){}
  36.             try{
  37.                 if(MyMqttClient.getClient().isConnected()){ // 已连接
  38.                     return;
  39.                 }
  40.                 reconnectTimes+=1;
  41.                 log.warn("mqtt reconnect times = {} try again...  mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
  42.                 MyMqttClient.getClient().reconnect();
  43.             }catch (MqttException e){
  44.                 log.error("mqtt断链异常",e);
  45.             }
  46.         }
  47.     }
  48.     /**
  49.      * 订阅者收到消息之后执行
  50.      * @param topic
  51.      * @param mqttMessage
  52.      * @throws Exception
  53.      */
  54.     @Override
  55.     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  56.         log.info("接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));
  57.         myHandle.handle(topic,mqttMessage);
  58.     }
  59.     /**
  60.      * * 消息到达后
  61.      * subscribe后,执行的回调函数
  62.      * publish后,配送完成后回调的方法
  63.      *
  64.      * @param iMqttDeliveryToken
  65.      */
  66.     @Override
  67.     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
  68.         System.out.println("接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用");
  69.         log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
  70.     }
  71. }
复制代码
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.eclipse.paho.client.mqttv3.MqttMessage;
  3. import org.springframework.scheduling.annotation.Async;
  4. import org.springframework.stereotype.Service;
  5. @Service
  6. @Slf4j
  7. public class MyHandle {
  8.     @Async
  9.     public void handle(String topic, MqttMessage message) {
  10.         log.info("处理消息主题:" + topic + " 信息:" + message);
  11.     }
  12. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

鼠扑

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表