1)RabbitMQ启用MQTT插件
- root@mq:/# rabbitmq-plugins enable rabbitmq_mqtt
- Enabling plugins on node rabbit@mq:
- rabbitmq_mqtt
- The following plugins have been configured:
- rabbitmq_management
- rabbitmq_management_agent
- rabbitmq_mqtt
- rabbitmq_web_dispatch
- Applying plugin configuration to rabbit@mq...
- The following plugins have been enabled:
- rabbitmq_mqtt
- started 1 plugins.
- root@mq:/# rabbitmq-plugins enable rabbitmq_web_mqtt
- Enabling plugins on node rabbit@mq:
- rabbitmq_web_mqtt
- The following plugins have been configured:
- rabbitmq_management
- rabbitmq_management_agent
- rabbitmq_mqtt
- rabbitmq_web_dispatch
- rabbitmq_web_mqtt
- Applying plugin configuration to rabbit@mq...
- The following plugins have been enabled:
- rabbitmq_web_mqtt
- started 1 plugins.
- root@mq:/#
复制代码 2)RabbitMQ管理控制台查看
假如插件启动乐成,rabbitmq会打开1883和15675端口:
3)用MQTTX工具测试
4)用eclipse paho客户端测试
添加依赖
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
- <version>1.2.5</version>
- </dependency>
复制代码 收发消息测试
- @RestController
- public class DemoController {
- @GetMapping("/publish")
- public String publish() throws MqttException {
- MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化
- MqttClient client = new MqttClient("tcp://192.168.137.138:1883", "abc", persistence);
- //连接选项中定义用户名密码和其它配置
- MqttConnectOptions options = new MqttConnectOptions();
- options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
- options.setAutomaticReconnect(true);//是否自动重连
- options.setConnectionTimeout(30);//连接超时时间 秒
- options.setKeepAliveInterval(10);//连接保持检查周期 秒
- options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
- options.setUserName("xjs1919");
- options.setPassword("123321".toCharArray());
- // client.setManualAcks(true);
- client.connect(options);//连接
- //订阅topic
- client.subscribe("demoTopic", 2);
- // 设置回调,将来收到消息的时候会被回调
- client.setCallback(new MqttCallbackExtended() {
- @Override
- public void connectComplete(boolean reconnect, String serverURI) {
- System.out.println("连接完成");
- }
- @Override
- public void connectionLost(Throwable cause) {
- System.out.println("连接丢失");
- }
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- System.out.println("收到消息,topic:"+topic + ", msg:" + new String(message.getPayload()));
- //client.messageArrivedComplete(message.getId(),message.getQos());
- }
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- // Qos0: 当消息发送出去就回调
- // Qos1: 当发送者收到了puback的时候的回调
- // Qos2: 当发送者收到了pubcomp的时候的回调
- System.out.println("消息发送完成");
- }
- });
- client.publish("demoTopic", "hello,这是一个测试消息!".getBytes(), 2, false);
- return "ok";
- }
- }
复制代码 参考文章:
https://www.cnblogs.com/motion/p/14974024.html
https://blog.csdn.net/u013615903/article/details/131395264
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |