RabbitMQ开启MQTT协议支持

打印 上一主题 下一主题

主题 597|帖子 597|积分 1791

1)RabbitMQ启用MQTT插件
  1. root@mq:/# rabbitmq-plugins enable rabbitmq_mqtt
  2. Enabling plugins on node rabbit@mq:
  3. rabbitmq_mqtt
  4. The following plugins have been configured:
  5.   rabbitmq_management
  6.   rabbitmq_management_agent
  7.   rabbitmq_mqtt
  8.   rabbitmq_web_dispatch
  9. Applying plugin configuration to rabbit@mq...
  10. The following plugins have been enabled:
  11.   rabbitmq_mqtt
  12. started 1 plugins.
  13. root@mq:/# rabbitmq-plugins enable rabbitmq_web_mqtt
  14. Enabling plugins on node rabbit@mq:
  15. rabbitmq_web_mqtt
  16. The following plugins have been configured:
  17.   rabbitmq_management
  18.   rabbitmq_management_agent
  19.   rabbitmq_mqtt
  20.   rabbitmq_web_dispatch
  21.   rabbitmq_web_mqtt
  22. Applying plugin configuration to rabbit@mq...
  23. The following plugins have been enabled:
  24.   rabbitmq_web_mqtt
  25. started 1 plugins.
  26. root@mq:/#
复制代码
2)RabbitMQ管理控制台查看
假如插件启动乐成,rabbitmq会打开1883和15675端口:

3)用MQTTX工具测试

4)用eclipse paho客户端测试
添加依赖
  1. <dependency>
  2.     <groupId>org.eclipse.paho</groupId>
  3.     <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  4.     <version>1.2.5</version>
  5. </dependency>
复制代码
收发消息测试
  1. @RestController
  2. public class DemoController {
  3.     @GetMapping("/publish")
  4.     public String publish() throws MqttException {
  5.         MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化
  6.         MqttClient client = new MqttClient("tcp://192.168.137.138:1883", "abc", persistence);
  7.         //连接选项中定义用户名密码和其它配置
  8.         MqttConnectOptions options = new MqttConnectOptions();
  9.         options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
  10.         options.setAutomaticReconnect(true);//是否自动重连
  11.         options.setConnectionTimeout(30);//连接超时时间  秒
  12.         options.setKeepAliveInterval(10);//连接保持检查周期  秒
  13.         options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
  14.         options.setUserName("xjs1919");
  15.         options.setPassword("123321".toCharArray());
  16.         // client.setManualAcks(true);
  17.         client.connect(options);//连接
  18.         //订阅topic
  19.         client.subscribe("demoTopic", 2);
  20.         // 设置回调,将来收到消息的时候会被回调
  21.         client.setCallback(new MqttCallbackExtended() {
  22.             @Override
  23.             public void connectComplete(boolean reconnect, String serverURI) {
  24.                 System.out.println("连接完成");
  25.             }
  26.             @Override
  27.             public void connectionLost(Throwable cause) {
  28.                 System.out.println("连接丢失");
  29.             }
  30.             @Override
  31.             public void messageArrived(String topic, MqttMessage message) throws Exception {
  32.                 System.out.println("收到消息,topic:"+topic + ", msg:" + new String(message.getPayload()));
  33.                 //client.messageArrivedComplete(message.getId(),message.getQos());
  34.             }
  35.             @Override
  36.             public void deliveryComplete(IMqttDeliveryToken token) {
  37.                 // Qos0: 当消息发送出去就回调
  38.                 // Qos1: 当发送者收到了puback的时候的回调
  39.                 // Qos2: 当发送者收到了pubcomp的时候的回调
  40.                 System.out.println("消息发送完成");
  41.             }
  42.         });
  43.         client.publish("demoTopic", "hello,这是一个测试消息!".getBytes(), 2, false);
  44.         return "ok";
  45.     }
  46. }
复制代码
参考文章:
https://www.cnblogs.com/motion/p/14974024.html
https://blog.csdn.net/u013615903/article/details/131395264

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

自由的羽毛

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表