物联网之使用Vertx实现MQTT-Server最佳实践【响应式】

打印 上一主题 下一主题

主题 1970|帖子 1970|积分 5910

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
小伙伴们,你们好呀,我是老寇,跟我一起学习使用Vertx实现MQTT-Server
实现MQTT-Server【响应式】

vertx-mqtt地址
实现思路

1.启动MQTT Server并绑定多个端口,将端口记录到缓存,服务注册到Nacos,通过接口的方式获取IP和端口【负载均衡】
2.MQTT Client连接MQTT Server并上报数据
3.MQTT Server接收到数据并通过MQ转发出去
代码比较简单,懒得讲解啦

代码比较简单,懒得讲解啦

代码比较简单,懒得讲解啦

实现过程

查看源码
kafka安装

采用docker-compose一键式启动!!!
还没有安装docker的朋友,请参考下面两篇文章
# Ubuntu20.04安装Docker
# Centos7安装Docker 23.0.6
  1. services:
  2.     kafka:
  3.       image: bitnami/kafka:4.0.0
  4.       container_name: kafka
  5.       tty: true
  6.       ports:
  7.         - '9092:9092'
  8.         - '9093:9093'
  9.       environment:
  10.         # 节点ID
  11.         - KAFKA_BROKER_ID=1
  12.         # 启用KRaft模式,即由Kafka自身替代Zookeeper管理元数据
  13.         - KAFKA_ENABLE_KRAFT=yes
  14.         # kafka角色,做broker,也要做controller
  15.         - KAFKA_CFG_PROCESS_ROLES=broker,controller
  16.         # 指定供外部使用的控制类请求信息
  17.         - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
  18.         # 定义安全协议
  19.         - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
  20.         # 定义kafka服务端socket监听端口
  21.         - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
  22.         # 外网访问地址
  23.         - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
  24.         # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
  25.         - ALLOW_PLAINTEXT_LISTENER=yes
  26.         # 设置broker最大内存,和初始内存
  27.         - KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
  28.         # 集群地址
  29.         - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
  30.         # 节点ID
  31.         - KAFKA_CFG_NODE_ID=1
  32.       restart: always
  33.       privileged: true
  34.       networks:
  35.         - laokou_network
  36. networks:
  37.     laokou_network:
  38.         driver: bridge
复制代码
  1. # 创建topic【进入bin目录执行】 => 每个topic 3个分区和一个副本
  2. kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic laokou_mqtt_property_report --partitions 3 --replication-factor 1
  3. kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic laokou_mqtt_property_reply --partitions 3 --replication-factor 1
复制代码
kafka【响应式】

1.依赖
  1. <dependency>
  2.   <groupId>org.springframework.kafka</groupId>
  3.   <artifactId>spring-kafka</artifactId>
  4.   <version>3.3.5</version>
  5. </dependency>
  6. <dependency>
  7.   <groupId>io.projectreactor.kafka</groupId>
  8.   <artifactId>reactor-kafka</artifactId>
  9.   <version>1.3.23</version>
  10. </dependency>
复制代码
2.代码
KafkaAutoConfig
  1. /**
  2. * @author laokou
  3. */
  4. @Configuration
  5. public class KafkaAutoConfig {
  6.     @Bean("defaultKafkaTemplate")
  7.     @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
  8.     public DefaultKafkaTemplate defaultKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
  9.        return new DefaultKafkaTemplate(kafkaTemplate);
  10.     }
  11.     @Bean(value = "reactiveKafkaSender")
  12.     @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
  13.     public KafkaSender reactiveKafkaSender(SenderOptions<String, String> senderOptions) {
  14.        return new ReactiveKafkaSender(
  15.              new reactor.kafka.sender.internals.DefaultKafkaSender<>(ProducerFactory.INSTANCE, senderOptions));
  16.     }
  17.     @Bean
  18.     @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
  19.     public SenderOptions<String, String> senderOptions(KafkaProperties kafkaProperties) {
  20.        Map<String, Object> props = new HashMap<>();
  21.        KafkaProperties.Producer producer = kafkaProperties.getProducer();
  22.        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
  23.        props.put(ProducerConfig.ACKS_CONFIG, producer.getAcks());
  24.        props.put(ProducerConfig.RETRIES_CONFIG, producer.getRetries());
  25.        props.put(ProducerConfig.BATCH_SIZE_CONFIG, (int) producer.getBatchSize().toBytes());
  26.        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (int) producer.getBufferMemory().toBytes());
  27.        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  28.        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  29.        return SenderOptions.create(props);
  30.     }
  31.     @Bean
  32.     @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
  33.     public ReceiverOptions<String, String> receiverOptions(KafkaProperties kafkaProperties) {
  34.        Map<String, Object> props = new HashMap<>();
  35.        KafkaProperties.Consumer consumer = kafkaProperties.getConsumer();
  36.        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
  37.        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.getGroupId());
  38.        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.getMaxPollRecords());
  39.        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.getEnableAutoCommit());
  40.        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  41.        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  42.        return ReceiverOptions.create(props);
  43.     }
  44. }
复制代码
KafkaSender
  1. /**
  2. * @author laokou
  3. */
  4. public interface KafkaSender {
  5.     Flux<Boolean> send(String topic, String payload);
  6. }
复制代码
ReactiveKafkaSender
  1. /**
  2. * @author laokou
  3. */
  4. @Slf4j
  5. @RequiredArgsConstructor
  6. public class ReactiveKafkaSender implements KafkaSender {
  7.     private final DefaultKafkaSender<String, String> defaultKafkaSender;
  8.     @Override
  9.     public Flux<Boolean> send(String topic, String payload) {
  10.        return defaultKafkaSender.send(Mono.just(SenderRecord.create(topic, null, null, null, payload, null)))
  11.           .map(result -> {
  12.              Exception exception = result.exception();
  13.              if (ObjectUtils.isNotNull(exception)) {
  14.                 log.error("【Kafka】 => 发送消息失败,错误信息:{}", exception.getMessage(), exception);
  15.                 return false;
  16.              }
  17.              else {
  18.                 return true;
  19.              }
  20.           });
  21.     }
  22. }
复制代码
3.yaml设置【自动批量提交】
  1. spring:
  2.   kafka:
  3.     bootstrap-servers: kafka:9092
  4.     consumer:
  5.       group-id: laokou-mqtt
  6.       # 是否自动提交(按周期)已消费的offset,true表示开启自动提交
  7.       enable-auto-commit: true
  8.       # 单次poll()调用返回的记录数
  9.       max-poll-records: 50
  10.       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  11.       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  12.     producer:
  13.       # 发生错误后,消息重发的次数。
  14.       retries: 5
  15.       # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
  16.       batch-size: 16384
  17.       # 设置生产者内存缓冲区的大小。
  18.       buffer-memory: 33554432
  19.       # 键的序列化方式
  20.       key-serializer: org.apache.kafka.common.serialization.StringSerializer
  21.       # 值的序列化方式
  22.       value-serializer: org.apache.kafka.common.serialization.StringSerializer
  23.       # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
  24.       # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
  25.       # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
  26.       acks: 0
  27.     listener:
  28.       # 在侦听器容器中运行的线程数。
  29.       concurrency: 5
  30.       # 批量提交模式
  31.       ack-mode: batch
  32.       # 批量batch类型
  33.       type: batch
  34.       # topic不存在时是否报错(false表示不报错)
  35.       missing-topics-fatal: false
  36.     admin:
  37.       auto-create: false
复制代码
mqtt-server【响应式】

依赖
  1. <dependency>
  2.   <groupId>io.vertx</groupId>
  3.   <artifactId>vertx-mqtt</artifactId>
  4.   <version>4.5.14</version>
  5. </dependency>
复制代码
VertxConfig
  1. /**
  2. * @author laokou
  3. */
  4. @Configuration
  5. public class VertxConfig {
  6.     @Bean(destroyMethod = "close")
  7.     public Vertx vertx() {
  8.        VertxOptions vertxOptions = new VertxOptions();
  9.        vertxOptions.setMaxEventLoopExecuteTime(30);
  10.        vertxOptions.setWorkerPoolSize(40);
  11.        vertxOptions.setMaxWorkerExecuteTime(30);
  12.        vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS);
  13.        vertxOptions.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS);
  14.        vertxOptions.setPreferNativeTransport(true);
  15.        vertxOptions.setInternalBlockingPoolSize(40);
  16.        vertxOptions.setEventLoopPoolSize(Math.max(32, 2 * CpuCoreSensor.availableProcessors()));
  17.        return Vertx.vertx(vertxOptions);
  18.     }
  19. }
复制代码
MqttServerProperties【设置了账号和密码】
  1. /**
  2. * @author laokou
  3. */
  4. @Data
  5. @Component
  6. @ConfigurationProperties(prefix = "spring.mqtt-server")
  7. public class MqttServerProperties {
  8.     private boolean auth = true;
  9.     private String username = "vertx";
  10.     private String password = "laokou123";
  11.     private String host = "0.0.0.0";
  12.     private int port = 0;
  13.     private int threadSize = 32;
  14.     private int maxMessageSize = 8196;
  15.     private boolean isAutoClientId = true;
  16.     private int maxClientIdLength = 30;
  17.     private int timeoutOnConnect = 90;
  18.     private boolean useWebSocket = false;
  19.     private int webSocketMaxFrameSize = 65536;
  20.     private boolean perFrameWebSocketCompressionSupported = true;
  21.     private boolean perMessageWebSocketCompressionSupported = true;
  22.     private int webSocketCompressionLevel = 6;
  23.     private boolean webSocketAllowServerNoContext = false;
  24.     private boolean webSocketPreferredClientNoContext = false;
  25.     private boolean tcpNoDelay = true;
  26.     private boolean tcpKeepAlive = false;
  27.     private int tcpKeepAliveIdleSeconds = -1;
  28.     private int tcpKeepAliveCount = -1;
  29.     private int tcpKeepAliveIntervalSeconds = -1;
  30.     private int soLinger = -1;
  31.     private int idleTimeout = 0;
  32.     private int readIdleTimeout = 0;
  33.     private int writeIdleTimeout = 0;
  34.     private TimeUnit idleTimeoutUnit = TimeUnit.SECONDS;
  35.     private boolean ssl = false;
  36.     private boolean tcpFastOpen = false;
  37.     private boolean tcpCork = false;
  38.     private boolean tcpQuickAck = false;
  39.     private int tcpUserTimeout = 0;
  40. }
复制代码
VertxMqttServer
  1. /**
  2. * @author laokou
  3. */
  4. @Slf4j
  5. public final class VertxMqttServer {
  6.     private final Sinks.Many<MqttPublishMessage> messageSink = Sinks.many()
  7.        .multicast()
  8.        .onBackpressureBuffer(Integer.MAX_VALUE, false);
  9.     private volatile Flux<MqttServer> mqttServer;
  10.     private final Vertx vertx;
  11.     private final MqttServerProperties properties;
  12.     private final List<ReactiveMessageHandler> reactiveMessageHandlers;
  13.     private volatile boolean isClosed = false;
  14.     public VertxMqttServer(final Vertx vertx, final MqttServerProperties properties,
  15.           List<ReactiveMessageHandler> reactiveMessageHandlers) {
  16.        this.properties = properties;
  17.        this.vertx = vertx;
  18.        this.reactiveMessageHandlers = reactiveMessageHandlers;
  19.     }
  20.     public Flux<MqttServer> start() {
  21.        return mqttServer = getMqttServerOptions().map(mqttServerOption -> MqttServer.create(vertx, mqttServerOption)
  22.           .exceptionHandler(
  23.                 error -> log.error("【Vertx-MQTT-Server】 => MQTT服务启动失败,错误信息:{}", error.getMessage(), error))
  24.           .endpointHandler(endpoint -> Optional.ofNullable(authHandler(endpoint))
  25.              .ifPresent(e -> e.closeHandler(close -> log.info("【Vertx-MQTT-Server】 => MQTT客户端断开连接"))
  26.                 .subscribeHandler(subscribe -> {
  27.                    for (MqttTopicSubscription topicSubscription : subscribe.topicSubscriptions()) {
  28.                       log.info("【Vertx-MQTT-Server】 => MQTT客户端订阅主题:{}", topicSubscription.topicName());
  29.                    }
  30.                 })
  31.                 .disconnectHandler(disconnect -> log.info("【Vertx-MQTT-Server】 => MQTT客户端主动断开连接"))
  32.                 .pingHandler(ping -> log.info("【Vertx-MQTT-Server】 => MQTT客户端发送心跳"))
  33.                 .publishHandler(messageSink::tryEmitNext)
  34.                 // 不保留会话
  35.                 .accept(false)))
  36.           .listen(mqttServerOption.getPort(), mqttServerOption.getHost(), asyncResult -> {
  37.              if (isClosed) {
  38.                 return;
  39.              }
  40.              if (asyncResult.succeeded()) {
  41.                 log.info("【Vertx-MQTT-Server】 => MQTT服务启动成功,主机:{},端口:{}", mqttServerOption.getHost(),
  42.                       mqttServerOption.getPort());
  43.                 // 写入缓存
  44.                 PortCache.add(mqttServerOption.getPort());
  45.              }
  46.              else {
  47.                 log.error("【Vertx-MQTT-Server】 => MQTT服务启动失败,主机:{},端口:{},错误信息:{}", mqttServerOption.getHost(),
  48.                       mqttServerOption.getPort(), asyncResult.cause().getMessage(), asyncResult.cause());
  49.              }
  50.           }));
  51.     }
  52.     public Flux<MqttServer> stop() {
  53.        isClosed = true;
  54.        return mqttServer.doOnNext(server -> server.close(completionHandler -> {
  55.           if (completionHandler.succeeded()) {
  56.              log.info("【Vertx-MQTT-Server】 => MQTT服务停止成功");
  57.           }
  58.           else {
  59.              log.error("【Vertx-MQTT-Server】 => MQTT服务停止失败,错误信息:{}", completionHandler.cause().getMessage(),
  60.                    completionHandler.cause());
  61.           }
  62.        }));
  63.     }
  64.     public Flux<Boolean> publish() {
  65.        return messageSink.asFlux().flatMap(message -> {
  66.           // @formatter:off
  67.              // log.info("【Vertx-MQTT-Server】 => MQTT服务接收到消息,主题:{},内容:{}", message.topicName(), message.payload().toString());
  68.              // @formatter:on
  69.           return Flux
  70.              .fromStream(reactiveMessageHandlers.stream()
  71.                 .filter(reactiveMessageHandler -> reactiveMessageHandler.isSubscribe(message.topicName())))
  72.              .flatMap(reactiveMessageHandler -> reactiveMessageHandler
  73.                 .handle(new MqttMessage(message.payload(), message.topicName())));
  74.        });
  75.     }
  76.     private int detectAvailablePort(String host) {
  77.        try (ServerSocket socket = SSLServerSocketFactory.getDefault().createServerSocket()) {
  78.           socket.bind(new InetSocketAddress(host, properties.getPort()));
  79.           return socket.getLocalPort();
  80.        }
  81.        catch (IOException e) {
  82.           throw new RuntimeException("Port auto-detection failed", e);
  83.        }
  84.     }
  85.     private Flux<MqttServerOptions> getMqttServerOptions() {
  86.        return Flux.range(1, Math.max(properties.getThreadSize(), CpuCoreSensor.availableProcessors()))
  87.           .map(item -> getMqttServerOption());
  88.     }
  89.     /**
  90.      * 认证.
  91.      */
  92.     private MqttEndpoint authHandler(MqttEndpoint endpoint) {
  93.        MqttAuth mqttAuth = endpoint.auth();
  94.        if (properties.isAuth()) {
  95.           if (ObjectUtils.isNull(mqttAuth)) {
  96.              endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
  97.              return null;
  98.           }
  99.           if (!ObjectUtils.equals(mqttAuth.getUsername(), properties.getUsername())
  100.                 || !ObjectUtils.equals(mqttAuth.getPassword(), properties.getPassword())) {
  101.              endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
  102.              return null;
  103.           }
  104.        }
  105.        return endpoint;
  106.     }
  107.     // @formatter:off
  108.     private MqttServerOptions getMqttServerOption() {
  109.        MqttServerOptions mqttServerOptions = new MqttServerOptions();
  110.        mqttServerOptions.setHost(properties.getHost());
  111.        mqttServerOptions.setPort(detectAvailablePort(properties.getHost()));
  112.        mqttServerOptions.setMaxMessageSize(properties.getMaxMessageSize());
  113.        mqttServerOptions.setAutoClientId(properties.isAutoClientId());
  114.        mqttServerOptions.setMaxClientIdLength(properties.getMaxClientIdLength());
  115.        mqttServerOptions.setTimeoutOnConnect(properties.getTimeoutOnConnect());
  116.        mqttServerOptions.setUseWebSocket(properties.isUseWebSocket());
  117.        mqttServerOptions.setWebSocketMaxFrameSize(properties.getWebSocketMaxFrameSize());
  118.        mqttServerOptions.setPerFrameWebSocketCompressionSupported(properties.isPerFrameWebSocketCompressionSupported());
  119.        mqttServerOptions.setPerMessageWebSocketCompressionSupported(properties.isPerMessageWebSocketCompressionSupported());
  120.        mqttServerOptions.setWebSocketCompressionLevel(properties.getWebSocketCompressionLevel());
  121.        mqttServerOptions.setWebSocketAllowServerNoContext(properties.isWebSocketAllowServerNoContext());
  122.        mqttServerOptions.setWebSocketPreferredClientNoContext(properties.isWebSocketPreferredClientNoContext());
  123.        mqttServerOptions.setTcpNoDelay(properties.isTcpNoDelay());
  124.        mqttServerOptions.setTcpKeepAlive(properties.isTcpKeepAlive());
  125.        mqttServerOptions.setTcpKeepAliveIdleSeconds(properties.getTcpKeepAliveIdleSeconds());
  126.        mqttServerOptions.setTcpKeepAliveIntervalSeconds(properties.getTcpKeepAliveIntervalSeconds());
  127.        mqttServerOptions.setTcpKeepAliveCount(properties.getTcpKeepAliveCount());
  128.        mqttServerOptions.setSoLinger(properties.getSoLinger());
  129.        mqttServerOptions.setIdleTimeout(properties.getIdleTimeout());
  130.        mqttServerOptions.setReadIdleTimeout(properties.getReadIdleTimeout());
  131.        mqttServerOptions.setWriteIdleTimeout(properties.getWriteIdleTimeout());
  132.        mqttServerOptions.setIdleTimeoutUnit(properties.getIdleTimeoutUnit());
  133.        mqttServerOptions.setSsl(properties.isSsl());
  134.        mqttServerOptions.setTcpFastOpen(properties.isTcpFastOpen());
  135.        mqttServerOptions.setTcpCork(properties.isTcpCork());
  136.        mqttServerOptions.setTcpQuickAck(properties.isTcpQuickAck());
  137.        mqttServerOptions.setTcpUserTimeout(properties.getTcpUserTimeout());
  138.        return mqttServerOptions;
  139.     }
  140.     // @formatter:on
  141. }
复制代码
PortCache【缓存端口】
  /**
   * Process-wide registry of the ports the MQTT servers are listening on.
   *
   * <p>Backed by a {@link CopyOnWriteArrayList}, so it can be read and updated from several
   * threads.
   *
   * @author laokou
   */
  public final class PortCache {

      /** Backing list; kept public for compatibility with existing callers. */
      public static final List<Integer> PORTS = new CopyOnWriteArrayList<>();

      private PortCache() {
      }

      /**
       * Records a listening port.
       *
       * @param port the port to remember
       */
      public static void add(int port) {
          PORTS.add(port);
      }

      /**
       * Returns the recorded ports.
       *
       * @return a read-only live view of the cache; use {@link #add(int)} and {@link #clear()}
       *         to change it
       */
      public static List<Integer> get() {
          // Hand out a read-only view so callers cannot corrupt the cache through the getter.
          return java.util.Collections.unmodifiableList(PORTS);
      }

      /** Removes every recorded port. */
      public static void clear() {
          PORTS.clear();
      }
  }
复制代码
ReactiveMessageHandler【消息处理,没啥好说的,就是用来转发消息到MQ】
  1. /**
  2. * @author laokou
  3. */
  4. public interface ReactiveMessageHandler {
  5.     boolean isSubscribe(String topic);
  6.     Flux<Boolean> handle(MqttMessage mqttMessage);
  7. }
复制代码
  1. /**
  2. * 属性回复消息处理器.
  3. *
  4. * @author laokou
  5. */
  6. @Component
  7. @RequiredArgsConstructor
  8. public class ReactivePropertyReplyMessageHandler implements ReactiveMessageHandler {
  9.     private final KafkaSender kafkaSender;
  10.     @Override
  11.     public boolean isSubscribe(String topic) {
  12.        return TopicUtils.match("/+/+/property/reply", topic);
  13.     }
  14.     @Override
  15.     public Flux<Boolean> handle(MqttMessage mqttMessage) {
  16.        return kafkaSender.send(LAOKOU_MQTT_PROPERTY_REPLY, mqttMessage.getPayload().toString());
  17.     }
  18. }
复制代码
  1. /**
  2. * 属性上报消息处理.
  3. *
  4. * @author laokou
  5. */
  6. @Component
  7. @RequiredArgsConstructor
  8. public class ReactivePropertyReportMessageHandler implements ReactiveMessageHandler {
  9.     private final KafkaSender kafkaSender;
  10.     @Override
  11.     public boolean isSubscribe(String topic) {
  12.        return TopicUtils.match("/+/+/property/report", topic);
  13.     }
  14.     @Override
  15.     public Flux<Boolean> handle(MqttMessage mqttMessage) {
  16.        return kafkaSender.send(LAOKOU_MQTT_PROPERTY_REPORT, mqttMessage.getPayload().toString());
  17.     }
  18. }
复制代码
配置yaml
  1. spring:
  2.   application:
  3.     name: ${SERVICE_ID:laokou-mqtt}
  4.   threads:
  5.     virtual:
  6.       enabled: true
  7.   mqtt-server:
  8.     auth: true
  9.     username: vertx
  10.     password: laokou123
  11.     # 开启8196个端口
  12.     thread-size: 8196
复制代码
启动MQTT-Server
  1. /**
  2. * @author laokou
  3. */
  4. @Slf4j
  5. @EnableDiscoveryClient
  6. @RequiredArgsConstructor
  7. @EnableConfigurationProperties
  8. @SpringBootApplication(scanBasePackages = "org.laokou")
  9. public class MqttServerApp implements CommandLineRunner {
  10.     private final Vertx vertx;
  11.     private final MqttServerProperties properties;
  12.     private final List<ReactiveMessageHandler> reactiveMessageHandlers;
  13.     private final ExecutorService virtualThreadExecutor;
  14.     @Override
  15.     public void run(String... args) {
  16.        virtualThreadExecutor.execute(this::listenMessage);
  17.     }
  18.     private void listenMessage() {
  19.        VertxMqttServer vertxMqttServer = new VertxMqttServer(vertx, properties, reactiveMessageHandlers);
  20.        // 启动服务
  21.        vertxMqttServer.start().subscribeOn(Schedulers.boundedElastic()).subscribe();
  22.        // 推送数据
  23.        vertxMqttServer.publish().subscribeOn(Schedulers.boundedElastic()).subscribe();
  24.        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  25.           // 清除缓存
  26.           PortCache.clear();
  27.           // 停止服务
  28.           vertxMqttServer.stop().subscribeOn(Schedulers.boundedElastic()).subscribe();
  29.        }));
  30.     }
  31. }
复制代码
启动好之后,请自行测试,这个东西没啥好说,vertx帮我们都实现了,就是简单调用API,自己玩吧~
我是老寇,我们下次再见啦~

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

欢乐狗

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表