RabbitMQ实现即时通讯居然如此简朴!后端代码都省得写了 ...

打印 上一主题 下一主题

主题 924|帖子 924|积分 2772


  • QoS(消息质量):全称Quality of Service,即消息的发送质量,主要有QoS 0、QoS 1、QoS 2三个等级,下面分别介绍下:
  • QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;
  • QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;
  • QoS 2(Exactly Once):只有一次,确保消息只到达一次。
RabbitMQ启用MQTT功能
=======================================================================================
   RabbitMQ启用MQTT功能,需要先安装然RabbitMQ然后再启用MQTT插件。
  

  • 首先我们需要安装并启动RabbitMQ,对RabbitMQ不了解的朋友可以参考《花了3天总结的RabbitMQ实用技巧,有点东西!》;
  • 接下来就是启用RabbitMQ的MQTT插件了,默认是不启用的,使用如下命令开启即可;
rabbitmq-plugins enable rabbitmq_mqtt


  • 开启成功后,查察管理控制台,我们可以发现MQTT服务运行在1883端口上了。

MQTT客户端
==============================================================================
   我们可以使用MQTT客户端来测试MQTT的即时通讯功能,这里使用的是MQTTBox这个客户端工具。
  

  • 首先下载并安装好MQTTBox,下载地址:http://workswithweb.com/mqttbox.html



  • 点击Create MQTT Client按钮来创建一个MQTT客户端;



  • 接下来对MQTT客户端进行设置,主要是设置好协议端口、毗连用户名密码和QoS即可;



  • 再设置一个订阅者,订阅者订阅testTopicA这个主题,我们会向这个主题发送消息;



  • 发布者向主题中发布消息,订阅者可以实时接收到。

前端直接实现即时通讯
=================================================================================
   既然MQTTBox客户端可以直接通过RabbitMQ实现即时通讯,那我们是不是直接使用前端技术也可以实现即时通讯?答案是肯定的!下面我们将通过html+javascript实现一个简朴的聊天功能,真正不写一行后端代码实现即时通讯!
  

  • 由于RabbitMQ与Web端交互底层使用的是WebSocket,所以我们需要开启RabbitMQ的MQTT WEB支持,使用如下命令开启即可;
rabbitmq-plugins enable rabbitmq_web_mqtt


  • 开启成功后,查察管理控制台,我们可以发现MQTT的WEB服务运行在15675端口上了;



  • WEB端与MQTT服务进行通讯需要使用一个叫MQTT.js的库,项目地址:https://github.com/mqttjs/MQTT.js



  • 实现的功能非常简朴,一个单聊功能,需要留意的是设置好MQTT服务的访问地址为:ws://localhost:15675/ws
         Title         目标Topic:  
      发送消息:  
    发送    清空       

  • 接下来我们订阅不同的主题开启两个页面测试下功能(页面放在了SpringBoot应用的resource目次下了,需要先启动应用再访问):
  • 第一个订阅主题testTopicA,访问地址:http://localhost:8088/page/index?topic=testTopicA
  • 第二个订阅主题testTopicB,访问地址:http://localhost:8088/page/index?topic=testTopicB
  • 之后互相发送消息,让我们来看看效果吧!

在SpringBoot中使用
=====================================================================================
   没有特别业务需求的时候,前端可以直接和RabbitMQ对接实现即时通讯。但是有时候我们需要通过服务端去关照前端,此时就需要在应用中集成MQTT了,接下来我们来讲讲怎样在SpringBoot应用中使用MQTT。
  

  • 首先我们需要在pom.xml中添加MQTT相干依赖;
    org.springframework.integration    spring-integration-mqtt

  • 在application.yml中添加MQTT相干设置,主要是访问地址、用户名密码、默认主题信息;
rabbitmq:  mqtt:    url: tcp://localhost:1883    username: guest    password: guest    defaultTopic: testTopic


  • 编写一个Java设置类从设置文件中读取设置便于使用;
/** * MQTT相干设置 * Created by macro on 2020/9/15. /@Data@EqualsAndHashCode(callSuper = false)@Component@ConfigurationProperties(prefix = “rabbitmq.mqtt”)public class MqttConfig {    /*     * RabbitMQ毗连用户名     /    private String username;    /*     * RabbitMQ毗连密码     /    private String password;    /*     * RabbitMQ的MQTT默认topic     /    private String defaultTopic;    /*     * RabbitMQ的MQTT毗连地址     */    private String url;}


  • 添加MQTT消息订阅者相干设置,使用@ServiceActivator注解声明一个服务激活器,通过MessageHandler来处置惩罚订阅消息;
/** * MQTT消息订阅者相干设置 * Created by macro on 2020/9/15. */@Slf4j@Configurationpublic class MqttInboundConfig {    @Autowired    private MqttConfig mqttConfig;    @Bean    public MessageChannel mqttInputChannel() {        return new DirectChannel();    }    @Bean    public MessageProducer inbound() {        MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), “subscriberClient”,                        mqttConfig.getDefaultTopic());        adapter.setCompletionTimeout(5000);        adapter.setConverter(new DefaultPahoMessageConverter());        //设置消息质量:0->至多一次;1->至少一次;2->只有一次        adapter.setQos(1);        adapter.setOutputChannel(mqttInputChannel());        return adapter;    }    @Bean    @ServiceActivator(inputChannel = “mqttInputChannel”)    public MessageHandler handler() {        return new MessageHandler() {            @Override            public void handleMessage(Message<?> message) throws MessagingException {                //处置惩罚订阅消息                log.info(“handleMessage : {}”,message.getPayload());            }        };    }}


  • 添加MQTT消息发布者相干设置;
/** * MQTT消息发布者相干设置 * Created by macro on 2020/9/15. */@Configurationpublic class MqttOutboundConfig {    @Autowired    private MqttConfig mqttConfig;    @Bean    public MqttPahoClientFactory mqttClientFactory() {        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        MqttConnectOptions options = new MqttConnectOptions();        options.setServerURIs(new String[] { mqttConfig.getUrl()});        options.setUserName(mqttConfig.getUsername());        options.setPassword(mqttConfig.getPassword().toCharArray());        factory.setConnectionOptions(options);        return factory;    }    @Bean    @ServiceActivator(inputChannel = “mqttOutboundChannel”)    public MessageHandler mqttOutbound() {        MqttPahoMessageHandler messageHandler =                new MqttPahoMessageHandler(“publisherClient”, mqttClientFactory());        messageHandler.setAsync(true);        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());        return messageHandler;    }    @Bean    public MessageChannel mqttOutboundChannel() {        return new DirectChannel();    }}


  • 添加MQTT网关,用于向主题中发送消息;
/** * MQTT网关,通过接口将数据通报到集成流 * Created by macro on 2020/9/15. /@Component@MessagingGateway(defaultRequestChannel = “mqttOutboundChannel”)public interface MqttGateway {    /*     * 发送消息到默认topic     /    void sendToMqtt(String payload);    /*     * 发送消息到指定topic     /    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);    /*     * 发送消息到指定topic并设置QOS     */    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}


  • 添加MQTT测试接口,使用MQTT网关向特定主题中发送消息;
/** * MQTT测试接口 * Created by macro on 2020/9/15. */@Api(tags = “MqttController”, description = “MQTT测试接口”)@RestController@RequestMapping(“/mqtt”)public class MqttController {    @Autowired    private MqttGateway mqttGateway;    @PostMapping(“/sendToDefaultTopic”)    @ApiOperation(“向默认主题发送消息”)    public CommonResult sendToDefaultTopic(String payload) {        mqttGateway.sendToMqtt(payload);        return CommonResult.success(null);    }    @PostMapping(“/sendToTopic”)    @ApiOperation(“向指定主题发送消息”)    public CommonResult sendToTopic(String payload, String topic) {        mqttGateway.sendToMqtt(payload, topic);        return CommonResult.success(null);    }}


  • 调用接口向主题中发送消息进行测试;


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

汕尾海湾

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表