MQTT从入门到醒目之 MQTT 客户端编程

打印 上一主题 下一主题

主题 1183|帖子 1183|积分 3549

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
MQTT 客户端编程

1 在VUE中利用MQTT

详细步骤如下所示:
1、初始化vue项目
  1. // 创建一个使用vite构建的前端项目
  2. npm create vite@latest
  3. // 进入到项目中,执行如下命令安装项目依赖
  4. npm install
复制代码
2、安装element plus
  1. // 安装element plus
  2. npm install element-plus --save
  3. // 安装mqtt.js依赖
  4. npm install mqtt --save
  5. // 在main.js中添加如下代码
  6. import { createApp } from 'vue'
  7. import App from './App.vue'
  8. import ElementPlus from 'element-plus'
  9. import 'element-plus/dist/index.css'
  10. const app = createApp(App)
  11. app.use(ElementPlus)
  12. app.mount('#app')
复制代码
3、导入课程资料中的MqttDemo.vue页面到components文件夹下
4、更改App.vue页面代码如下所示
  1. <script setup>
  2. import MqttDemo from "./components/MqttDemo.vue";
  3. </script>
  4. <template>
  5.   <MqttDemo/>
  6. </template>
  7. <style>
  8. </style>
复制代码
5、创建和关闭毗连
  1. <script setup>
  2. import mqtt from "mqtt";
  3. // 定义链接信息对象
  4. const connectionInfo = ref({
  5.   protocol: 'ws',
  6.   host: "192.168.136.147",
  7.   port: 8083,
  8.   clientId: "emqx_vue3_" + Math.random().toString(16).substring(2, 8),
  9.   username: "zhangsan",
  10.   password: "123",
  11.   clean: true,
  12.   connectTimeout: 10 * 1000, // ms
  13.   reconnectPeriod: 4000, // ms
  14. })
  15. // 创建链接对象
  16. const client = ref({})
  17. const clientInitData = ref({      // 链接初始化相关数据
  18.   connnected: false
  19. })
  20. // 建立连接事件处理函数
  21. const createConnection = () => {
  22.   const { protocol, host, port , ...options } = connectionInfo.value;
  23.   const connectUrl = `${protocol}://${host}:${port}/mqtt`;
  24.   console.log(connectUrl)
  25.   client.value = mqtt.connect(connectUrl , options);   // 建立连接
  26.   clientInitData.value.connnected = true ;
  27.   console.info("createConnection successful...")
  28. }
  29. // 端口链接事件处理函数
  30. const closeConnection = () => {
  31.   // 关闭链接
  32.   client.value.end(false , () => {   // 如果设置为true,将会立即关闭套接字,并且不发送MQTT DISCONNECT包。如果设置为false(默认值),则会发送MQTT DISCONNECT包给代理,然后关闭套接字。
  33.     clientInitData.value.connnected = false;
  34.     console.info("closeConnection successful...")
  35.   })
  36. }
  37. </script>
  38. <template>
  39.   <div class="mqtt-demo">
  40.     <el-card>
  41.       <h1>配置信息</h1>
  42.       <el-form label-position="top" >
  43.         <el-row :gutter="20">
  44.           <el-col :span="8">
  45.             <el-form-item prop="protocol" label="选择协议">
  46.               <el-select v-model="connectionInfo.protocol">
  47.                 <el-option label="ws://" value="ws"></el-option>
  48.                 <el-option label="wss://" value="wss"></el-option>
  49.               </el-select>
  50.             </el-form-item>
  51.           </el-col>
  52.           <el-col :span="8">
  53.             <el-form-item prop="host" label="主机地址">
  54.               <el-input v-model="connectionInfo.host" ></el-input>
  55.             </el-form-item>
  56.           </el-col>
  57.           <el-col :span="8">
  58.             <el-form-item prop="port" label="端口号">
  59.               <el-input type="number" v-model="connectionInfo.port" placeholder="8083/8084"></el-input>
  60.             </el-form-item>
  61.           </el-col>
  62.           <el-col :span="8">
  63.             <el-form-item prop="clientId" label="客户端ID">
  64.               <el-input v-model="connectionInfo.clientId"> </el-input>
  65.             </el-form-item>
  66.           </el-col>
  67.           <el-col :span="8">
  68.             <el-form-item prop="username" label="用户名">
  69.               <el-input v-model="connectionInfo.username"></el-input>
  70.             </el-form-item>
  71.           </el-col>
  72.           <el-col :span="8">
  73.             <el-form-item prop="password" label="密码">
  74.               <el-input v-model="connectionInfo.password"></el-input>
  75.             </el-form-item>
  76.           </el-col>
  77.           <el-col :span="24">
  78.             <el-button type="primary" :disabled="clientInitData.connnected" @click="createConnection">建立连接</el-button>
  79.             <el-button type="danger" :disabled="!clientInitData.connnected" @click="closeConnection">断开连接</el-button>
  80.           </el-col>
  81.         </el-row>
  82.       </el-form>
  83.     </el-card>
  84.    </div>
  85. </template>
复制代码
6、订阅和取消订阅
  1. <script setup>
  2.    
  3. // 消息质量取值数组
  4. const qosList = [0, 1, 2];
  5. const receivedMessages = ref(null)
  6. const subscriptionInfo = ref({     // 订阅参数数据模型
  7.   topic: '' ,
  8.   qos: 0
  9. })
  10. const subscriptionInitData = ref({        // 订阅初始化数据
  11.   subscription: false
  12. })
  13. // 定义定义主题的事件处理函数
  14. const subscriptionTopicHandler = () => {
  15.   const { topic, qos } = subscriptionInfo.value
  16.   console.info(qos)
  17.   client.value.subscribe(topic, { qos } , (error , res) => {
  18.     if(error) {
  19.       console.info("subscriptionTopic Error:", error);
  20.       return ;
  21.     }
  22.     subscriptionInitData.value.subscription = true ;
  23.     console.info("subscriptionTopic successful.... ");
  24.     // 订阅成功以后,监听发送消息事件
  25.     client.value.on('message' , (topic , message) => {
  26.       console.info("topic -----> " + topic + ", message -----> " + message)
  27.       receivedMessages.value = "topic -----> " + topic + ", message -----> " + message ;
  28.     })
  29.   })
  30. }
  31. // 定义取消订阅的事件处理函数
  32. const unSubscriptionTopicHandler = () => {
  33.   const {topic, qos } = subscriptionInfo.value
  34.   client.value.unsubscribe(topic, { qos } , (error , res) => {
  35.     if(error) {
  36.       console.info("unSubscriptionTopic Error:", error);
  37.       return ;
  38.     }
  39.     subscriptionInitData.value.subscription = false ;
  40.     console.info("unSubscriptionTopic successful.... ");
  41.   })
  42. }
  43. </script>
  44. <template>
  45.             <el-card>
  46.           <h1>订阅主题</h1>
  47.           <el-form label-position="top" >
  48.             <el-row :gutter="20">
  49.               <el-col :span="8">
  50.                 <el-form-item prop="topic" label="Topic">
  51.                   <el-input v-model="subscriptionInfo.topic"></el-input>
  52.                 </el-form-item>
  53.               </el-col>
  54.               <el-col :span="8">
  55.                 <el-form-item prop="qos" label="QoS">
  56.                   <el-select v-model="subscriptionInfo.qos">
  57.                     <el-option
  58.                         v-for="qos in qosList"
  59.                         :key="qos"
  60.                         :label="qos"
  61.                         :value="qos"
  62.                     ></el-option>
  63.                   </el-select>
  64.                 </el-form-item>
  65.               </el-col>
  66.               <el-col :span="8">
  67.                 <el-button  type="primary" class="sub-btn" :disabled="subscriptionInitData.subscription" @click="subscriptionTopicHandler">订阅主题</el-button>
  68.                 <el-button type="primary" class="sub-btn" :disabled="!subscriptionInitData.subscription" @click="unSubscriptionTopicHandler">取消订阅</el-button>
  69.               </el-col>
  70.             </el-row>
  71.           </el-form>
  72.         </el-card>
  73. </template>
复制代码
7、发布消息
  1. <script setup>
  2.    
  3. // 发布消息参数
  4. const publishInfo = ref({
  5.   topic: '' ,
  6.   qos: 0,
  7.   payLoad: ''
  8. })
  9.    
  10. // 定义发布消息的事件处理函数
  11. const doPublish = () => {
  12.   const {topic , payLoad , qos } = publishInfo.value ;
  13.   client.value.publish(topic , payLoad , { qos } , (error , res) => {
  14.     if(error) {
  15.       console.info("publish msg info error...." , error)
  16.       return ;
  17.     }
  18.     console.info("publish msg info successful....")
  19.   }) ;
  20. }
  21. </script>
  22. <template>
  23.         <el-card>
  24.       <h1>发布消息</h1>
  25.       <el-form label-position="top" >
  26.         <el-row :gutter="20">
  27.           <el-col :span="8">
  28.             <el-form-item prop="topic" label="Topic">
  29.               <el-input v-model="publishInfo.topic"></el-input>
  30.             </el-form-item>
  31.           </el-col>
  32.           <el-col :span="8">
  33.             <el-form-item prop="payload" label="Payload">
  34.               <el-input v-model="publishInfo.payLoad"></el-input>
  35.             </el-form-item>
  36.           </el-col>
  37.           <el-col :span="8">
  38.             <el-form-item prop="qos" label="QoS">
  39.               <el-select v-model="publishInfo.qos">
  40.                 <el-option
  41.                     v-for="qos in qosList"
  42.                     :key="qos"
  43.                     :label="qos"
  44.                     :value="qos"
  45.                 ></el-option>
  46.               </el-select>
  47.             </el-form-item>
  48.           </el-col>
  49.         </el-row>
  50.       </el-form>
  51.       <el-col :span="24" class="text-right">
  52.         <el-button type="primary" @click="doPublish">发布消息</el-button>
  53.       </el-col>
  54.     </el-card>
  55. </template>
复制代码
2 在Java中利用MQTT

2.1 Eclipse Paho Java Client

详细步骤:
1、创建一个Spring Boot项目,添加如下依赖
  1. <parent>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-parent</artifactId>
  4.     <version>3.0.5</version>
  5. </parent>
  6. <dependencies>
  7.     <!-- spring boot整合junit单元测试的起步依赖 -->
  8.     <dependency>
  9.         <groupId>org.springframework.boot</groupId>
  10.         <artifactId>spring-boot-starter-test</artifactId>
  11.     </dependency>
  12.     <!-- mqtt java客户端依赖 -->
  13.     <dependency>
  14.         <groupId>org.eclipse.paho</groupId>
  15.         <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  16.         <version>1.2.5</version>
  17.     </dependency>
  18. </dependencies>
复制代码
2、创建毗连代码实现
  1. @Test
  2. public void createConnection() throws MqttException {
  3.     // 定义链接相关参数
  4.     String broker = "tcp://192.168.136.147:1883";
  5.     String username = "zhangsan";
  6.     String password = "123";
  7.     String clientid = "mqtt_java_client_01";
  8.     // 创建MqttJava客户端对象
  9.     // MqttClientPersistence: 代表一个持久的数据存储,用于在传输过程中存储出站和入站的信息
  10.     MqttClient client = new MqttClient(broker, clientid , new MemoryPersistence());   
  11.     MqttConnectOptions options = new MqttConnectOptions();
  12.     options.setUserName(username);
  13.     options.setPassword(password.toCharArray());
  14.     client.connect(options);
  15.     // 阻塞当前线程
  16.     while (true) ;
  17. }
复制代码
3、发布消息代码演示
  1. @Test
  2. public void sendMessage() throws MqttException {
  3.     // 定义链接相关参数
  4.     String broker = "tcp://192.168.136.147:1883";
  5.     String username = "zhangsan";
  6.     String password = "123";
  7.     String clientid = "mqtt_java_client_01";
  8.     // 创建MqttJava客户端对象
  9.     MqttClient client = new MqttClient(broker, clientid , new MemoryPersistence());
  10.     MqttConnectOptions options = new MqttConnectOptions();
  11.     options.setUserName(username);
  12.     options.setPassword(password.toCharArray());
  13.     client.connect(options);
  14.     // 创建消息对象QoS
  15.     String content = "hello mqtt";
  16.     MqttMessage message = new MqttMessage(content.getBytes());
  17.     message.setQos(2);
  18.     message.setRetained(true);
  19.     // 发送消息
  20.     client.publish("a/c" , message);
  21.     // 关闭链接释放资源
  22.     client.disconnect();
  23.     client.close();
  24. }
复制代码
4、订阅主题获取消息
  1. @Test
  2. public void receiveMessage() throws MqttException {
  3.     // 定义链接相关参数
  4.     String broker = "tcp://192.168.136.147:1883";
  5.     String username = "zhangsan";
  6.     String password = "123";
  7.     String clientid = "mqtt_java_client_02";
  8.     // 创建MqttJava客户端对象
  9.     MqttClient client = new MqttClient(broker, clientid , new MemoryPersistence());
  10.     MqttConnectOptions options = new MqttConnectOptions();
  11.     options.setUserName(username);
  12.     options.setPassword(password.toCharArray());
  13.     // 添加回调函数获取主题消息
  14.     client.setCallback(new MqttCallback() {
  15.         
  16.         @Override
  17.         public void connectionLost(Throwable cause) {  // 连接丢失时被调用
  18.             System.out.println("connectionLost: " + cause.getMessage());
  19.         }
  20.         @Override
  21.         public void messageArrived(String topic, MqttMessage message) throws Exception {  // 接收到消息时被调用
  22.             System.out.println("topic: " + topic);
  23.             System.out.println("Qos: " + message.getQos());
  24.             System.out.println("message content: " + new String(message.getPayload()));
  25.         }
  26.         @Override
  27.         public void deliveryComplete(IMqttDeliveryToken token) {  // 消息接收完成时被调用
  28.             System.out.println("deliveryComplete---------" + token.isComplete());
  29.         }
  30.         
  31.     });
  32.     // 订阅主题
  33.     client.connect(options);
  34.     client.subscribe("a/d" , 2);
  35.     while(true) ;
  36. }
复制代码
2.2 spring-integration-mqtt

2.2.1 底子环境搭建

1、创建一个Spring Boot项目,并加入如下依赖:
  1. <dependencies>
  2.        
  3.     <!-- spring boot项目web开发的起步依赖 -->
  4.     <dependency>
  5.         <groupId>org.springframework.boot</groupId>
  6.         <artifactId>spring-boot-starter-web</artifactId>
  7.     </dependency>
  8.         <!-- spring boot项目集成消息中间件基础依赖 -->
  9.     <dependency>
  10.         <groupId>org.springframework.boot</groupId>
  11.         <artifactId>spring-boot-starter-integration</artifactId>
  12.     </dependency>
  13.         <!-- spring boot项目和mqtt客户端集成起步依赖 -->
  14.     <dependency>
  15.         <groupId>org.springframework.integration</groupId>
  16.         <artifactId>spring-integration-mqtt</artifactId>
  17.         <version>5.4.3</version>
  18.     </dependency>
  19.     <!-- lombok依赖 -->
  20.     <dependency>
  21.         <groupId>org.projectlombok</groupId>
  22.         <artifactId>lombok</artifactId>
  23.     </dependency>
  24.     <!-- fastjson依赖 -->
  25.     <dependency>
  26.         <groupId>com.alibaba</groupId>
  27.         <artifactId>fastjson</artifactId>
  28.         <version>1.2.83</version>
  29.     </dependency>
  30. </dependencies>
复制代码
2、编写启动类
  1. @EnableConfigurationProperties(value = MqttConfigurationProperties.class)
  2. @SpringBootApplication
  3. public class MqttDemoApplication {
  4.     public static void main(String[] args) {
  5.         SpringApplication.run(MqttDemoApplication.class , args) ;
  6.     }
  7. }
复制代码
3、在application.yml文件中添加如下配置
  1. spring:
  2.   mqtt:
  3.     username: zhangsan
  4.     password: 123
  5.     url: tcp://192.168.136.147:1883
  6.     subClientId: sub_client_id_123
  7.     subTopic: liujh/iot/lamp/line
  8.     pubClientId: pub_client_id_123
复制代码
4、创建实体类读取自定义配置
  1. @Data
  2. @ConfigurationProperties(prefix = "spring.mqtt")
  3. public class MqttConfigurationProperties {
  4.     private String username;
  5.     private String password;
  6.     private String url;
  7.     private String subClientId ;
  8.     private String subTopic ;
  9.     private String pubClientId ;
  10. }
复制代码
5、创建配置类配置链接工厂
  1. @Configuration
  2. public class MqttConfiguration {
  3.     @Autowired
  4.     private MqttConfigurationProperties mqttConfigurationProperties ;
  5.     @Bean
  6.     public MqttPahoClientFactory mqttClientFactory(){
  7.         // 创建客户端工厂
  8.         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  9.         // 创建MqttConnectOptions对象
  10.         MqttConnectOptions options = new MqttConnectOptions();
  11.         options.setCleanSession(true);
  12.         options.setUserName(mqttConfigurationProperties.getUsername());
  13.         options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());
  14.         options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});
  15.         factory.setConnectionOptions(options);
  16.         // 返回
  17.         return factory;
  18.     }
  19. }
复制代码
2.2.2 订阅主题获取消息

详细步骤:
1、配置入站适配器
  1. @Configuration
  2. public class MqttInboundConfiguration {
  3.     @Autowired
  4.     private MqttConfigurationProperties mqttConfigurationProperties ;
  5.     @Autowired
  6.     private ReceiverMessageHandler receiverMessageHandler;
  7.     /**
  8.      * 配置消息传输通道
  9.      * @return
  10.      */
  11.     @Bean
  12.     public MessageChannel mqttInputChannel() {
  13.         return new DirectChannel();
  14.     }
  15.     /**
  16.      * 配置入站适配器
  17.      */
  18.     @Bean
  19.     public MessageProducer messageProducer(MqttPahoClientFactory mqttPahoClientFactory) {
  20.         MqttPahoMessageDrivenChannelAdapter adapter  =
  21.                 new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getUrl() ,
  22.                         mqttConfigurationProperties.getSubClientId() ,
  23.                         mqttPahoClientFactory , mqttConfigurationProperties.getSubTopic().split(",")) ;
  24.         adapter.setConverter(new DefaultPahoMessageConverter());
  25.         adapter.setQos(1);
  26.         adapter.setOutputChannel(mqttInputChannel());
  27.         return adapter ;
  28.     }
  29.     /**
  30.      * 配置入站消息处理器
  31.      * @return
  32.      */
  33.     @Bean
  34.     @ServiceActivator(inputChannel = "mqttInputChannel")
  35.     public MessageHandler messageHandler() {
  36.         return this.receiverMessageHandler ;
  37.     }
  38. }
复制代码
2、定义监听主题消息的处理器
  1. @Component
  2. public class ReceiverMessageHandler implements MessageHandler {
  3.     @Override
  4.     public void handleMessage(Message<?> message) throws MessagingException {
  5.         MessageHeaders headers = message.getHeaders();
  6.         String receivedTopicName = (String) headers.get("mqtt_receivedTopic");
  7.         if("liujh/iot/lamp/line".equals(receivedTopicName)) {
  8.             System.out.println("接收到消息:" + message.getPayload());
  9.         }
  10.     }
  11. }
复制代码
测试:通过MQTTX向liujh/iot/lamp/line主题发送消息
2.2.3 向指定主题发送消息

详细步骤:
1、配置出站消息处理器
  1. @Configuration
  2. public class MqttOutboundConfiguration {
  3.     @Autowired
  4.     private MqttConfigurationProperties mqttConfigurationProperties ;
  5.     @Autowired
  6.     private MqttPahoClientFactory pahoClientFactory ;
  7.     @Bean
  8.     public MessageChannel mqttOutputChannel() {
  9.         return new DirectChannel();
  10.     }
  11.     @Bean
  12.     @ServiceActivator(inputChannel = "mqttOutputChannel")
  13.     public MessageHandler mqttOutboundMassageHandler() {
  14.         MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigurationProperties.getUrl() ,
  15.                 mqttConfigurationProperties.getPubClientId() , pahoClientFactory ) ;
  16.         messageHandler.setAsync(true);
  17.         messageHandler.setDefaultQos(0);
  18.         messageHandler.setDefaultTopic("default");
  19.         return messageHandler ;
  20.     }
  21. }
复制代码
2、定义发送消息的网关接口
  1. @MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
  2. public interface MqttGateway {
  3.     /**
  4.      * 发送mqtt消息
  5.      * @param topic 主题
  6.      * @param payload 内容
  7.      */
  8.     void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
  9.     /**
  10.      * 发送包含qos的消息
  11.      * @param topic 主题
  12.      * @param qos 对消息处理的几种机制。
  13.      *          * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>
  14.      *          * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>
  15.      *          * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
  16.      * @param payload 消息体
  17.      */
  18.     void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
  19.    
  20. }
复制代码
3、定义发送消息的服务类
  1. @Component
  2. @AllArgsConstructor
  3. public class MqttMessageSender {
  4.     private MqttGateway mqttGateway;
  5.     /**
  6.      * 发送mqtt消息
  7.      * @param topic 主题
  8.      * @param message 内容
  9.      */
  10.     public void send(String topic, String message) {
  11.         mqttGateway.sendToMqtt(topic, message);
  12.     }
  13.     /**
  14.      * 发送包含qos的消息
  15.      * @param topic 主题
  16.      * @param qos 质量
  17.      * @param message 消息体
  18.      */
  19.     public void send(String topic, int qos, byte[] message){
  20.         mqttGateway.sendToMqtt(topic, qos, message);
  21.     }
  22. }
复制代码
3 智能灯泡案例

需求:
1、智能灯泡装备上线以后向MQTT服务端发送消息,后端服务从MQTT中获取消息记录装备信息到数据库中
2、后端微服务向MQTT服务端发送开灯或者关灯消息,装备端从MQTT中获取消息控制灯泡的开和关
3、装备端对灯泡举行开和关操作的时候向MQTT中发送消息,后端服务获取MQTT消息记录灯泡的开关状态
3.1 服务端获取装备上线消息

3.1.1 环境准备

详细步骤:
1、创建对应的数据库表
  1. -- 智能灯泡设备表
  2. CREATE TABLE `tb_lamp` (
  3.   `id` bigint NOT NULL AUTO_INCREMENT,
  4.   `deviceId` varchar(50) DEFAULT NULL,
  5.   `status` int DEFAULT NULL COMMENT '1:上线  0:下线',
  6.   `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ,
  7.   `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  8.   PRIMARY KEY (`id`)
  9. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
  10. -- 智能灯泡设备状态表
  11. CREATE TABLE `tb_lamp_status` (
  12.   `id` int NOT NULL AUTO_INCREMENT,
  13.   `deviceId` varchar(50) DEFAULT NULL,
  14.   `status` int DEFAULT NULL COMMENT '0: 关灯   1:开灯',
  15.   `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  16.   PRIMARY KEY (`id`)
  17. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
复制代码
2、在spring-integration-mqtt案例中加入如下依赖
  1. <dependency>
  2.     <groupId>com.baomidou</groupId>
  3.     <artifactId>mybatis-plus-boot-starter</artifactId>
  4.     <version>3.5.3.1</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>mysql</groupId>
  8.     <artifactId>mysql-connector-java</artifactId>
  9.     <version>8.0.32</version>
  10. </dependency>
复制代码
3、在application.yml文件中加入如下依赖
  1. spring:
  2.   datasource:
  3.     driver-class-name: com.mysql.cj.jdbc.Driver
  4.     url: jdbc:mysql://192.168.136.147:3306/lamp_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
  5.     username: root
  6.     password: 1234
  7. mybatis-plus:
  8.   configuration:
  9.     log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
  10.     map-underscore-to-camel-case: true
  11.   mapper-locations: classpath*:mapper/*Mapper.xml
复制代码
4、通过mybatis的逆向工程天生tb_lamp和tb_lamp_status表对应的底子代码
5、在启动类上添加@MapperScan注解指定Mapper接口的包路径
3.1.2 接口分析

   接口一:装备上线
  当终端装备毗连上EMQX以后,发奉上线消息到EMQX服务端,分析如下:
  1. 主题: liujh/iot/lamp/line
  2. 消息内容:
  3.     {
  4.         "deviceId": "xxxxxx",
  5.         "online": 1
  6.     }
  7. 数据说明:
  8.         deviceId: 设备id
  9.         online:   上线状态,1表示上线,0表示离线
复制代码
3.1.3 业务代码

对ReceiverMessageHandler类的代码举行如下改造:
  1. @Component
  2. public class ReceiverMessageHandler implements MessageHandler {
  3.     @Autowired
  4.     private TbLampService tbLampService ;
  5.     @Override
  6.     public void handleMessage(Message<?> message) throws MessagingException {
  7.         MessageHeaders headers = message.getHeaders();
  8.         String receivedTopicName = (String) headers.get("mqtt_receivedTopic");
  9.         if("liujh/iot/lamp/line".equals(receivedTopicName)) {
  10.             tbLampService.updateLampOnlineStatus(message.getPayload().toString()) ;        // 更新智能灯泡的上线状态
  11.         }
  12.     }
  13. }
复制代码
对TbLampServiceImpl类的代码举行如下改造:
  1. @Service
  2. public class TbLampServiceImpl extends ServiceImpl<TbLampMapper, TbLamp> implements TbLampService {
  3.     @Override
  4.     public void updateLampOnlineStatus(String jsonInfo) {
  5.         // 解析消息获取设备id和上线状态
  6.         Map<String ,  Object> map = JSON.parseObject(jsonInfo, Map.class);
  7.         String deviceId = map.get("deviceId").toString();
  8.         Integer status = Integer.parseInt(map.get("online").toString());
  9.         // 根据设备的id查询设备数据
  10.         LambdaQueryWrapper<TbLamp> lambdaQueryWrapper = new LambdaQueryWrapper<>() ;
  11.         lambdaQueryWrapper.eq(TbLamp::getDeviceid , deviceId) ;
  12.         TbLamp tbLamp = this.getOne(lambdaQueryWrapper);
  13.         if(tbLamp == null) {        // 设备不存在,新增设备
  14.            tbLamp = new TbLamp() ;
  15.            tbLamp.setDeviceid(deviceId);
  16.            tbLamp.setStatus(status);
  17.            this.save(tbLamp) ;
  18.         }else {     // 设备已经存在,修改设备的状态
  19.             tbLamp.setStatus(status);
  20.             tbLamp.setUpdateTime(new Date());
  21.             this.updateById(tbLamp) ;
  22.         }
  23.     }
  24. }
复制代码
3.2 服务端发送关灯开灯消息到MQTT

3.2.1 接口分析

   接口三:后端发送消息控制智能灯泡开关
  后端可以发送控制灯泡状态消息到EMQX中,装备端监听指定主题获取消息,控制灯泡的开关状态,分析如下:
  1. 主题: liujh/iot/lamp/server/status
  2. 消息内容:
  3.         {
  4.                 "deviceId": "xxxxxx",
  5.                 "status": 0
  6.         }
  7. 数据说明:               
  8.         status:        0:关灯   , 1:开灯
复制代码
3.2.2 业务代码

在spring-integration-mqtt案例中添加如下controller接口方法
  1. @RestController
  2. @RequestMapping(value = "/api/lamp")
  3. public class LampApiController {
  4.     @Autowired
  5.     private MqttMessageSender mqttMessageSender;
  6.     @GetMapping(value = "/{deviceId}/{status}")
  7.     public String sendStatusLampMsg(@PathVariable(value = "deviceId") String deviceId , @PathVariable(value = "status") Integer status) {
  8.         Map<String , Object> map = new HashMap<>() ;
  9.         map.put("deviceId" , deviceId) ;
  10.         map.put("status" , status) ;
  11.         String json = JSON.toJSONString(map);
  12.         mqttMessageSender.send("liujh/iot/lamp/server/status" , json);
  13.         return "ok" ;
  14.     }
  15. }
复制代码
3.3 服务端获取装备开灯关灯消息

3.3.1 接口分析

   接口四:装备端改变智能灯泡开关的状态,状态发给给后端,后端记录状态
  1. 主题:liujh/iot/lamp/device/status
  2. 消息内容:
  3.         {
  4.                 "deviceId": "xxxxx"  
  5.                 "status": 0
  6.         }
  7. 数据说明:       
  8.         deviceId:设备id
  9.         status:0:关灯   , 1:开灯
复制代码
3.3.2 业务代码

对ReceiverMessageHandler类的代码举行如下改造:
  1. // com.liujh.mqtt.receiver.ReceiverMessageHandler
  2. @Override
  3. public void handleMessage(Message<?> message) throws MessagingException {
  4.     MessageHeaders headers = message.getHeaders();
  5.     String receivedTopicName = (String) headers.get("mqtt_receivedTopic");
  6.     if("liujh/iot/lamp/line".equals(receivedTopicName)) {
  7.         tbLampService.updateLampOnlineStatus(message.getPayload().toString()) ;        // 更新智能灯泡的上线状态
  8.     }else if("liujh/iot/lamp/device/status".equals(receivedTopicName)) {
  9.         tbLampStatusService.saveDeviceStatus(message.getPayload().toString()) ;
  10.     }
  11. }
复制代码
对TbLampStatusServiceImpl类的代码举行如下改造:
  1. @Service
  2. public class TbLampStatusServiceImpl extends ServiceImpl<TbLampStatusMapper, TbLampStatus> implements TbLampStatusService {
  3.     @Override
  4.     public void saveDeviceStatus(String json) {
  5.         // 获取消息内容
  6.         Map<String , Object> map = JSON.parseObject(json, Map.class);
  7.         String deviceId = map.get("deviceId").toString();
  8.         Integer status = Integer.parseInt(map.get("status").toString());
  9.         // 创建对象封装消息
  10.         TbLampStatus tbLampStatus = new TbLampStatus() ;
  11.         tbLampStatus.setDeviceid(deviceId);
  12.         tbLampStatus.setStatus(status);
  13.         this.save(tbLampStatus) ;
  14.     }
  15. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

魏晓东

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表