EMQX构建浅易的云服务

打印 上一主题 下一主题

主题 1034|帖子 1034|积分 3102

根本思路:


  • 使用EMQX作为Mqtt broker
  • mqtt-receive-server服务,用于接收设备上报的数据
  • mqtt-sender-service服务,用于下发数据给设备
  • KafKa实现数据解耦,mqtt-receive-server服务接收的数据简朴处理下直接扔到Kafka中
  • 云服务各业务体系从KafKa中消费数据,各业务须要下发数据的话,调用mqtt-sender-service接口下发数据给设备
根本流程


DashBoard 界说认证用户


界说Mqtt协议主题

  1. // 设备激活
  2. public final static String ACTIVATE = "mqtt/0/1";
  3. // 设备重置
  4. public final static String RESET = "mqtt/0/0";
  5. // 上线
  6. public final static String ONLINE = "mqtt/1/1";
  7. // 下线
  8. public final static String OFFLINE = "mqtt/1/0";
  9. // 上行-设备上报数据到平台
  10. public final static String REPORT = "mqtt/2/1";
  11. // 下行-平台下发数据给设备
  12. public final static String ISSUED = "%s/2/0";
复制代码
设备认证流程

   首先在云平台创建产品,生成PK/PS,用于Mqtt Broker的连接认证
将PK/PS烧录到设备中
设备开机启动,初次连接平台携带PK/PS/DK,mqtt连接成功后,云服务端会下发DS给到设备,并标识设备已激活
设备再次连接云服务,mqtt连接成功后,会校验DK/DS是否正当,不正当将设备踢下线。
设备订阅${clientId}/2/0主题
  1. @PostConstruct
  2. public void init() throws MqttException {
  3.      client.setCallback(new MqttCallbackHandler());
  4.      client.subscribe(String.format(MqttTopicConstant.ISSUED, client.getClientId()));
  5. }
复制代码
mqtt-receive-server服务

   使用EMQX内置的用户,连接Mqtt Broker,clientId=mqtt_receive_server
订阅ACTIVATE 、RESET 、ONLINE 、OFFLINE 、REPORT 等主题
将接收的数据简朴处理,转发到KafKa
  1. mqtt:
  2.     broker-url: tcp://42.194.132.44:1883
  3.     client-id: mqtt_receive_server
  4.     username: mqtt_server
  5.     password: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
复制代码
  1. @PostConstruct
  2. public void init() throws MqttException {
  3.     client.setCallback(new MqttCallbackHandler(kafkaService));
  4.     subscribe(MqttTopicConstant.ACTIVATE);
  5.     subscribe(MqttTopicConstant.RESET);
  6.     subscribe(MqttTopicConstant.ONLINE);
  7.     subscribe(MqttTopicConstant.OFFLINE);
  8.     subscribe(MqttTopicConstant.REPORT);
  9. }
复制代码
  1. @Override
  2. public void messageArrived(String topic, MqttMessage message) throws Exception {
  3.     String data = new String(message.getPayload());
  4.     log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), data);
  5.     UpData upData = JSONObject.parseObject(data, UpData.class);
  6.     UpKafKaData upKafKaData = new UpKafKaData(topic, data);
  7.     log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));
  8.     kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));
  9. }
复制代码
mqtt-sender-service服务

   使用EMQX内置的用户,连接Mqtt Broker,clientId=mqtt_sender_server
不订阅主题,只下发数据,下发数据主题为${clientId}/2/0
提供API给给业务子体系使用,用于下发数据给设备
  1. mqtt:
  2.     broker-url: tcp://42.194.132.44:1883
  3.     client-id: mqtt_sender_server
  4.     username: mqtt_server
  5.     password: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
复制代码
  1. package com.angel.ocean.listener;
  2. import com.alibaba.fastjson2.JSONObject;
  3. import com.angel.ocean.contants.MqttTopicConstant;
  4. import com.angel.ocean.domain.UpKafKaData;
  5. import com.angel.ocean.domain.client.ActivateData;
  6. import com.angel.ocean.mqtt.MqttService;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.apache.kafka.clients.consumer.ConsumerRecord;
  9. import org.springframework.kafka.annotation.KafkaListener;
  10. import org.springframework.kafka.support.Acknowledgment;
  11. import org.springframework.stereotype.Component;
  12. import javax.annotation.Resource;
  13. import java.util.List;
  14. import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;
  15. @Slf4j
  16. @Component
  17. public class UpDataConsumerListener {
  18.     @Resource
  19.     private MqttService mqttService;
  20.     /**
  21.      * 批量消费
  22.      */
  23.     @KafkaListener(topics = UP_DATA_TOPIC, containerFactory = "batchFactory")
  24.     public void batchListen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
  25.         try {
  26.             log.info("UpDataConsumerListener.batchListen(), records.size: {}", records.size());
  27.             for (ConsumerRecord<String, String> record : records) {
  28.                 UpKafKaData data = JSONObject.parseObject(record.value(), UpKafKaData.class);
  29.                 log.info("{}", record.value());
  30.                 handler(data.getTopic(), data.getData());
  31.             }
  32.         } catch (Exception e) {
  33.             log.error("UpDataConsumerListener.batchListen() Exception:{}", e.getMessage(), e);
  34.         } finally {
  35.             // 手动确认
  36.             ack.acknowledge();
  37.         }
  38.     }
  39.     private void handler(String topic, String data) {
  40.         switch (topic) {
  41.             case MqttTopicConstant.ACTIVATE:
  42.                 activateHandler(data);
  43.                 break;
  44.             case MqttTopicConstant.RESET:
  45.                 otherHandler(data);
  46.                 break;
  47.             case MqttTopicConstant.OFFLINE:
  48.                 otherHandler(data);
  49.                 break;
  50.             case MqttTopicConstant.ONLINE:
  51.                 otherHandler(data);
  52.                 break;
  53.             case MqttTopicConstant.REPORT:
  54.                 otherHandler(data);
  55.                 break;
  56.             default:
  57.                 otherHandler(data);
  58.         }
  59.     }
  60.     private void activateHandler(String data) {
  61.         ActivateData activateData = JSONObject.parseObject(data, ActivateData.class);
  62.         String clientId = activateData.getClientId();
  63.         mqttService.publish(String.format(MqttTopicConstant.ISSUED, clientId), "200");
  64.     }
  65.     private void otherHandler(String data) {
  66.         log.info("{}", data);
  67.     }
  68. }
复制代码
  1. package com.angel.ocean.controller;
  2. import com.angel.ocean.common.ApiResult;
  3. import com.angel.ocean.contants.MqttTopicConstant;
  4. import com.angel.ocean.mqtt.MqttService;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.eclipse.paho.client.mqttv3.MqttClient;
  7. import org.eclipse.paho.client.mqttv3.MqttException;
  8. import org.eclipse.paho.client.mqttv3.MqttMessage;
  9. import org.eclipse.paho.client.mqttv3.MqttTopic;
  10. import org.springframework.web.bind.annotation.RequestMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import javax.annotation.Resource;
  13. @Slf4j
  14. @RestController
  15. @RequestMapping("/mqtt/server")
  16. public class MqttController {
  17.     @Resource
  18.     private MqttClient server;
  19.     @Resource
  20.     private MqttService mqttService;
  21.     /**
  22.      * 数据下发接口
  23.      * @param clientId
  24.      * @param data
  25.      * @return
  26.      */
  27.     @RequestMapping("/sender")
  28.     public ApiResult<?> publish(String clientId, String data) {
  29.         String topic = String.format(MqttTopicConstant.ISSUED, clientId);
  30.         mqttService.publish(topic, data);
  31.         if(server.isConnected()) {
  32.             MqttMessage message = new MqttMessage(data.getBytes());
  33.             message.setQos(0);
  34.             try {
  35.                 server.publish(topic, message);
  36.                 log.info("Message published, topic:{}, data:{}", topic, data);
  37.             } catch (MqttException e) {
  38.                 log.error("Message publish failed, topic:{}", topic, e);
  39.                 return ApiResult.error();
  40.             }
  41.             return ApiResult.success();
  42.         }
  43.         log.info("Message publish failed, not online.");
  44.         return ApiResult.error();
  45.     }
  46. }
复制代码
代码验证

   场景:设备上报消息,云服务端复兴消息给设备; 云服务自动下发数据给设备。
  模仿设备上报消息, 接收云平台复兴

发了两次:
mqtt-client 当地客户端日记:

mqtt-receive-server云服务日记:

mqtt-sender-server云服务日记:

模仿云平台自动下发数据

mqtt-sender-server云服务自动下发的日记:
mqtt-client数据接收日记:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王海鱼

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表