MQTT 概述
MQTT(Message Queuing Telemetry Transport)是构建在 TCP/IP 协议栈之上的异步消息通信协议,是一种轻量级的发布/订阅(publish/subscribe)消息传输协议。它能够在不可靠的网络环境中工作并易于扩展,适用于设备硬件存储空间或网络带宽有限的场景。使用 MQTT 协议时,消息发送者与接收者通过 Broker 解耦,不受时间和空间的限制。
Docker 部署 MQTT(采用docker-compose.yml)
# Compose file that runs a single Eclipse Mosquitto broker.
version: "3"
services:
  mqtt:
    image: eclipse-mosquitto
    container_name: mqtt
    privileged: true
    ports:
      # Port mappings are quoted so YAML always reads them as strings.
      - "1883:1883"
      # NOTE(review): 9001 is published, but the mosquitto.conf shown in this
      # article only declares "listener 1883" - confirm whether it is needed.
      - "9001:9001"
    volumes:
      # Host directories next to this file, mounted into the container.
      - ./config:/mosquitto/config
      - ./data:/mosquitto/data
      - ./log:/mosquitto/log
复制代码
- 创建文件夹:config、data、log(对应 docker-compose.yml 中挂载的三个目录)

- 创建 config/mosquitto.conf
# Keep broker state on disk, inside the mounted data directory.
persistence true
# Listener on port 1883, the port published by docker-compose.
listener 1883
persistence_location /mosquitto/data
# Write the broker log into the mounted log directory.
log_dest file /mosquitto/log/mosquitto.log

# Disable anonymous access explicitly: clients must present a username
# and password that exist in the password file below.
allow_anonymous false
# Password file maintained with mosquitto_passwd.
password_file /mosquitto/config/pwfile.conf
复制代码
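- docker 部署执行:docker compose up -d(注意:上面的 mosquitto.conf 已配置 password_file,若 config/pwfile.conf 尚不存在,broker 可能无法启动;可先在宿主机创建一个空的 config/pwfile.conf 再执行本命令)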
- 设置访问权限(用户名:admin,密码:admin123)
# Open a shell inside the running broker container.
docker exec -it mqtt sh
# Make sure the password file exists.
touch /mosquitto/config/pwfile.conf
# Add (or update) the user "admin" with password "admin123" in batch mode.
mosquitto_passwd -b /mosquitto/config/pwfile.conf admin admin123
# The file holds credentials: hand it to the broker user and remove access
# for everyone else (applied last so it covers the file as finally written).
chown mosquitto /mosquitto/config/pwfile.conf
chmod 0700 /mosquitto/config/pwfile.conf
复制代码
- 重启mqtt容器:docker compose restart
Springboot 整合
<!-- Inherit dependency and plugin management from Spring Boot. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.5.5</version>
    <relativePath/>
</parent>

<dependencies>

    <!-- Spring Integration MQTT support. -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>

    <!-- Lombok, used for the @Slf4j logger annotation. -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Raised from 1.2.62: older 1.2.x releases have known deserialization vulnerabilities. -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
    </dependency>

    <!-- Test-only libraries: kept out of the runtime classpath via test scope. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
# Broker URI; matches the 1883 port published by docker-compose.
mqtt.host=tcp://127.0.0.1:1883
# Client identifier sent to the broker.
mqtt.clientId=mqttx_a071ba88
# Credentials created with mosquitto_passwd.
mqtt.username=admin
mqtt.password=admin123
# Topic subscribed to at startup.
mqtt.topic=test_topic
# Connection timeout in SECONDS (passed to MqttConnectOptions.setConnectionTimeout).
mqtt.timeout=30
# Keep-alive interval in SECONDS (passed to MqttConnectOptions.setKeepAliveInterval).
mqtt.keepAlive=60
复制代码- @Slf4j
- @Configuration
- public class MyMqttConfiguration {
- @Value("${mqtt.host}")
- String broker;
- @Value("${mqtt.clientId}")
- String clientId;
- @Value("${mqtt.username}")
- String username;
- @Value("${mqtt.password}")
- String password;
- @Value("${mqtt.timeout}")
- Integer timeout;
- @Value("${mqtt.keepAlive}")
- Integer keepAlive;
- @Value("${mqtt.topic}")
- String topic;
- @Autowired
- MyHandle myHandle;
- @Bean
- public MyMqttClient myMqttClient(){
- MyMqttClient mqttClient = new MyMqttClient(broker, username, password, clientId, timeout, keepAlive,myHandle);
- for (int i = 0; i < 10; i++) {
- try {
- mqttClient.connect();
- mqttClient.subscribe(topic,0);
- return mqttClient;
- } catch (MqttException e) {
- log.error("MQTT connect exception,connect time = " + i);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- }
- }
- return mqttClient;
- }
- }
复制代码- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.*;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- import org.springframework.util.ObjectUtils;
- @Slf4j
- public class MyMqttClient {
- private static MqttClient client;
- private String host;
- private String clientId;
- private String username;
- private String password;
- private Integer timeout;
- private Integer keepAlive;
- private MyHandle myHandle;
- public MyMqttClient(){
- System.out.println("MyMqttClient空构造函数");
- }
- public MyMqttClient(String host, String username, String password, String clientId, Integer timeOut, Integer keepAlive,MyHandle myHandle) {
- System.out.println("MyMqttClient全参构造");
- this.host = host;
- this.username = username;
- this.password = password;
- this.clientId = clientId;
- this.timeout = timeOut;
- this.keepAlive = keepAlive;
- this.myHandle = myHandle;
- }
- public static MqttClient getClient() {
- return client;
- }
- public static void setClient(MqttClient client) {
- MyMqttClient.client = client;
- }
- /**
- * 设置mqtt连接参数
- */
- public MqttConnectOptions setMqttConnectOptions(String username,String password,Integer timeout, Integer keepAlive){
- MqttConnectOptions options = new MqttConnectOptions();
- options.setUserName(username);
- options.setPassword(password.toCharArray());
- options.setConnectionTimeout(timeout);
- options.setKeepAliveInterval(keepAlive);
- options.setCleanSession(true);
- options.setAutomaticReconnect(true);
- return options;
- }
- /**
- * 连接mqtt服务端
- */
- public void connect() throws MqttException {
- if(client == null){
- client = new MqttClient(host,clientId,new MemoryPersistence());
- client.setCallback(new MyMqttCallback(MyMqttClient.this,this.myHandle));
- }
- MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepAlive);
- if(!client.isConnected()){
- client.connect(mqttConnectOptions);
- }else{
- client.disconnect();
- client.connect(mqttConnectOptions);
- }
- log.info("MQTT connect success");
- }
- /**
- * 断开连接
- * @throws MqttException
- */
- public void disconnect()throws MqttException{
- if(null!=client && client.isConnected()){
- client.disconnect();;
- }
- }
- /**
- * 发布,qos默认为0,非持久化
- */
- public void publish(String pushMessage,String topic,int qos){
- publish(pushMessage, topic, qos, false);
- }
- /**
- * 发布消息
- *
- * @param pushMessage
- * @param topic
- * @param qos
- * @param retained:留存
- */
- public void publish(String pushMessage, String topic, int qos, boolean retained) {
- MqttMessage mqttMessage = new MqttMessage();
- mqttMessage.setPayload(pushMessage.getBytes());
- mqttMessage.setQos(qos);
- mqttMessage.setRetained(retained);
- MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
- if(ObjectUtils.isEmpty(mqttTopic)){
- log.error("主题不存在");
- }
- synchronized (this){
- try{
- MqttDeliveryToken mqttDeliveryToken = mqttTopic.publish(mqttMessage);
- mqttDeliveryToken.waitForCompletion(1000L);
- }catch (MqttException e){
- e.printStackTrace();
- }
- }
- }
- /**
- * 订阅
- *
- * @param topic
- * @param qos
- */
- public void subscribe(String topic, int qos) {
- try {
- MyMqttClient.getClient().subscribe(topic, qos);
- log.info("订阅主题:"+topic+"成功!");
- } catch (MqttException e) {
- log.error("订阅主题:"+topic+"失败!",e);
- }
- }
- /**
- * 取消订阅
- */
- public void cleanTopic(String topic){
- if(!ObjectUtils.isEmpty(client) && client.isConnected()){
- try{
- client.unsubscribe(topic);
- }catch (MqttException e){
- log.error("取消订阅失败!"+e);
- }
- }else{
- log.info("主题不存在或未连接!");
- }
- }
- }
复制代码- @Slf4j
- public class MyMqttCallback implements MqttCallbackExtended {
- private MyMqttClient myMqttClient;
- private MyHandle myHandle;
- public MyMqttCallback(MyMqttClient myMqttClient,MyHandle myHandle) {
- this.myMqttClient = myMqttClient;
- this.myHandle = myHandle;
- }
- /**
- * 连接完成
- * @param reconnect
- * @param serverURI
- */
- @Override
- public void connectComplete(boolean reconnect,String serverURI) {
- log.info("MQTT 连接成功,连接方式:{}",reconnect?"重连":"直连");
- //订阅主题(可以在这里订阅主题)
- try {
- MyMqttClient.getClient().subscribe("topic1");
- } catch (MqttException e) {
- log.error("主题订阅失败");
- }
- }
- /**
- * 连接丢失 进行重连操作
- * @param throwable
- */
- @Override
- public void connectionLost(Throwable throwable) {
- log.warn("mqtt connectionLost >>> 5S之后尝试重连: {}", throwable.getMessage());
- long reconnectTimes = 1;
- while (true){
- try{
- Thread.sleep(5000);
- }catch (InterruptedException ignored){}
- try{
- if(MyMqttClient.getClient().isConnected()){ // 已连接
- return;
- }
- reconnectTimes+=1;
- log.warn("mqtt reconnect times = {} try again... mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
- MyMqttClient.getClient().reconnect();
- }catch (MqttException e){
- log.error("mqtt断链异常",e);
- }
- }
- }
- /**
- * 订阅者收到消息之后执行
- * @param topic
- * @param mqttMessage
- * @throws Exception
- */
- @Override
- public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
- log.info("接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));
- myHandle.handle(topic,mqttMessage);
- }
- /**
- * * 消息到达后
- * subscribe后,执行的回调函数
- * publish后,配送完成后回调的方法
- *
- * @param iMqttDeliveryToken
- */
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- System.out.println("接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用");
- log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
- }
- }
复制代码- import lombok.extern.slf4j.Slf4j;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Service;
- @Service
- @Slf4j
- public class MyHandle {
- @Async
- public void handle(String topic, MqttMessage message) {
- log.info("处理消息主题:" + topic + " 信息:" + message);
- }
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |