根本思路:
- 使用EMQX作为Mqtt broker
- mqtt-receive-server服务,用于接收设备上报的数据
- mqtt-sender-service服务,用于下发数据给设备
- KafKa实现数据解耦,mqtt-receive-server服务接收的数据简朴处理下直接扔到Kafka中
- 云服务各业务体系从KafKa中消费数据,各业务须要下发数据的话,调用mqtt-sender-service接口下发数据给设备
根本流程
DashBoard 界说认证用户
界说Mqtt协议主题
- // 设备激活
- public final static String ACTIVATE = "mqtt/0/1";
- // 设备重置
- public final static String RESET = "mqtt/0/0";
- // 上线
- public final static String ONLINE = "mqtt/1/1";
- // 下线
- public final static String OFFLINE = "mqtt/1/0";
- // 上行-设备上报数据到平台
- public final static String REPORT = "mqtt/2/1";
- // 下行-平台下发数据给设备
- public final static String ISSUED = "%s/2/0";
复制代码 设备认证流程
首先在云平台创建产品,生成PK/PS,用于Mqtt Broker的连接认证
将PK/PS烧录到设备中
设备开机启动,初次连接平台携带PK/PS/DK,mqtt连接成功后,云服务端会下发DS给到设备,并标识设备已激活
设备再次连接云服务,mqtt连接成功后,会校验DK/DS是否正当,不正当将设备踢下线。
设备订阅${clientId}/2/0主题
- @PostConstruct
- public void init() throws MqttException {
- client.setCallback(new MqttCallbackHandler());
- client.subscribe(String.format(MqttTopicConstant.ISSUED, client.getClientId()));
- }
复制代码 mqtt-receive-server服务
使用EMQX内置的用户,连接Mqtt Broker,clientId=mqtt_receive_server
订阅ACTIVATE 、RESET 、ONLINE 、OFFLINE 、REPORT 等主题
将接收的数据简朴处理,转发到KafKa
- mqtt:
- broker-url: tcp://42.194.132.44:1883
- client-id: mqtt_receive_server
- username: mqtt_server
- password: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
复制代码- @PostConstruct
- public void init() throws MqttException {
- client.setCallback(new MqttCallbackHandler(kafkaService));
- subscribe(MqttTopicConstant.ACTIVATE);
- subscribe(MqttTopicConstant.RESET);
- subscribe(MqttTopicConstant.ONLINE);
- subscribe(MqttTopicConstant.OFFLINE);
- subscribe(MqttTopicConstant.REPORT);
- }
复制代码- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- String data = new String(message.getPayload());
- log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), data);
- UpData upData = JSONObject.parseObject(data, UpData.class);
- UpKafKaData upKafKaData = new UpKafKaData(topic, data);
- log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));
- kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));
- }
复制代码 mqtt-sender-service服务
使用EMQX内置的用户,连接Mqtt Broker,clientId=mqtt_sender_server
不订阅主题,只下发数据,下发数据主题为${clientId}/2/0
提供API给给业务子体系使用,用于下发数据给设备
- mqtt:
- broker-url: tcp://42.194.132.44:1883
- client-id: mqtt_sender_server
- username: mqtt_server
- password: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
复制代码- package com.angel.ocean.listener;
- import com.alibaba.fastjson2.JSONObject;
- import com.angel.ocean.contants.MqttTopicConstant;
- import com.angel.ocean.domain.UpKafKaData;
- import com.angel.ocean.domain.client.ActivateData;
- import com.angel.ocean.mqtt.MqttService;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.support.Acknowledgment;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- import java.util.List;
- import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;
- @Slf4j
- @Component
- public class UpDataConsumerListener {
- @Resource
- private MqttService mqttService;
- /**
- * 批量消费
- */
- @KafkaListener(topics = UP_DATA_TOPIC, containerFactory = "batchFactory")
- public void batchListen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
- try {
- log.info("UpDataConsumerListener.batchListen(), records.size: {}", records.size());
- for (ConsumerRecord<String, String> record : records) {
- UpKafKaData data = JSONObject.parseObject(record.value(), UpKafKaData.class);
- log.info("{}", record.value());
- handler(data.getTopic(), data.getData());
- }
- } catch (Exception e) {
- log.error("UpDataConsumerListener.batchListen() Exception:{}", e.getMessage(), e);
- } finally {
- // 手动确认
- ack.acknowledge();
- }
- }
- private void handler(String topic, String data) {
- switch (topic) {
- case MqttTopicConstant.ACTIVATE:
- activateHandler(data);
- break;
- case MqttTopicConstant.RESET:
- otherHandler(data);
- break;
- case MqttTopicConstant.OFFLINE:
- otherHandler(data);
- break;
- case MqttTopicConstant.ONLINE:
- otherHandler(data);
- break;
- case MqttTopicConstant.REPORT:
- otherHandler(data);
- break;
- default:
- otherHandler(data);
- }
- }
- private void activateHandler(String data) {
- ActivateData activateData = JSONObject.parseObject(data, ActivateData.class);
- String clientId = activateData.getClientId();
- mqttService.publish(String.format(MqttTopicConstant.ISSUED, clientId), "200");
- }
- private void otherHandler(String data) {
- log.info("{}", data);
- }
- }
复制代码- package com.angel.ocean.controller;
- import com.angel.ocean.common.ApiResult;
- import com.angel.ocean.contants.MqttTopicConstant;
- import com.angel.ocean.mqtt.MqttService;
- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.MqttClient;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.eclipse.paho.client.mqttv3.MqttTopic;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
- import javax.annotation.Resource;
- @Slf4j
- @RestController
- @RequestMapping("/mqtt/server")
- public class MqttController {
- @Resource
- private MqttClient server;
- @Resource
- private MqttService mqttService;
- /**
- * 数据下发接口
- * @param clientId
- * @param data
- * @return
- */
- @RequestMapping("/sender")
- public ApiResult<?> publish(String clientId, String data) {
- String topic = String.format(MqttTopicConstant.ISSUED, clientId);
- mqttService.publish(topic, data);
- if(server.isConnected()) {
- MqttMessage message = new MqttMessage(data.getBytes());
- message.setQos(0);
- try {
- server.publish(topic, message);
- log.info("Message published, topic:{}, data:{}", topic, data);
- } catch (MqttException e) {
- log.error("Message publish failed, topic:{}", topic, e);
- return ApiResult.error();
- }
- return ApiResult.success();
- }
- log.info("Message publish failed, not online.");
- return ApiResult.error();
- }
- }
复制代码 代码验证
场景:设备上报消息,云服务端复兴消息给设备; 云服务自动下发数据给设备。
模仿设备上报消息, 接收云平台复兴
发了两次:
mqtt-client 当地客户端日记:
mqtt-receive-server云服务日记:
mqtt-sender-server云服务日记:
模仿云平台自动下发数据
mqtt-sender-server云服务自动下发的日记:
mqtt-client数据接收日记:
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |