马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
小伙伴们,你们好呀,我是老寇,跟我一起学习使用Vertx实现MQTT-Server
实现MQTT-Server【相应式】
vertx-mqtt地址
实现思路
1.启动MQTT Server并绑定许多端口记录到缓存,服务注册到Nacos,通过接口的方式获取IP和端口【负载均衡】
2.MQTT Client连接MQTT Server并上报数据
3.MQTT Server吸收到数据并通过MQ转发出去
代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
代码比较简单,懒得讲解啦
实现过程
查看源码
kafka安装
接纳docker-compose一键式启动!!!
还没有安装docker朋友,参考文章下面两篇文章
# Ubuntu20.04安装Docker
# Centos7安装Docker 23.0.6- services:
- kafka:
- image: bitnami/kafka:4.0.0
- container_name: kafka
- tty: true
- ports:
- - '9092:9092'
- - '9093:9093'
- environment:
- # 节点ID
- - KAFKA_BROKER_ID=1
- # 允许使用kraft,即Kafka替代Zookeeper
- - KAFKA_ENABLE_KRAFT=yes
- # kafka角色,做broker,也要做controller
- - KAFKA_CFG_PROCESS_ROLES=broker,controller
- # 指定供外部使用的控制类请求信息
- - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- # 定义安全协议
- - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- # 定义kafka服务端socket监听端口
- - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- # 外网访问地址
- - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- # 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用
- - ALLOW_PLAINTEXT_LISTENER=yes
- # 设置broker最大内存,和初始内存
- - KAFKA_HEAP_OPTS=-Xmx512M -Xms512M
- # 集群地址
- - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
- # 节点ID
- - KAFKA_CFG_NODE_ID=1
- restart: always
- privileged: true
- networks:
- - laokou_network
- networks:
- laokou_network:
- driver: bridge
复制代码- # 创建topic【进入bin目录执行】 => 每个topic 3个分区和一个副本
- kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic laokou_mqtt_property_report --partitions 3 --replication-factor 1
- kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic laokou_mqtt_property_reply --partitions 3 --replication-factor 1
复制代码 kafka【相应式】
1.依赖- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <version>3.3.5</version>
- </dependency>
- <dependency>
- <groupId>io.projectreactor.kafka</groupId>
- <artifactId>reactor-kafka</artifactId>
- <version>1.3.23</version>
- </dependency>
复制代码 2.代码
KafkaAutoConfig- /**
- * @author laokou
- */
- @Configuration
- public class KafkaAutoConfig {
- @Bean("defaultKafkaTemplate")
- @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
- public DefaultKafkaTemplate defaultKafkaTemplate(KafkaTemplate<String, String> kafkaTemplate) {
- return new DefaultKafkaTemplate(kafkaTemplate);
- }
- @Bean(value = "reactiveKafkaSender")
- @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
- public KafkaSender reactiveKafkaSender(SenderOptions<String, String> senderOptions) {
- return new ReactiveKafkaSender(
- new reactor.kafka.sender.internals.DefaultKafkaSender<>(ProducerFactory.INSTANCE, senderOptions));
- }
- @Bean
- @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
- public SenderOptions<String, String> senderOptions(KafkaProperties kafkaProperties) {
- Map<String, Object> props = new HashMap<>();
- KafkaProperties.Producer producer = kafkaProperties.getProducer();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
- props.put(ProducerConfig.ACKS_CONFIG, producer.getAcks());
- props.put(ProducerConfig.RETRIES_CONFIG, producer.getRetries());
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, (int) producer.getBatchSize().toBytes());
- props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (int) producer.getBufferMemory().toBytes());
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- return SenderOptions.create(props);
- }
- @Bean
- @ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
- public ReceiverOptions<String, String> receiverOptions(KafkaProperties kafkaProperties) {
- Map<String, Object> props = new HashMap<>();
- KafkaProperties.Consumer consumer = kafkaProperties.getConsumer();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer.getGroupId());
- props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumer.getMaxPollRecords());
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer.getEnableAutoCommit());
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return ReceiverOptions.create(props);
- }
- }
复制代码 KafkaSender- /**
- * @author laokou
- */
- public interface KafkaSender {
- Flux<Boolean> send(String topic, String payload);
- }
复制代码 ReactiveKafkaSender- /**
- * @author laokou
- */
- @Slf4j
- @RequiredArgsConstructor
- public class ReactiveKafkaSender implements KafkaSender {
- private final DefaultKafkaSender<String, String> defaultKafkaSender;
- @Override
- public Flux<Boolean> send(String topic, String payload) {
- return defaultKafkaSender.send(Mono.just(SenderRecord.create(topic, null, null, null, payload, null)))
- .map(result -> {
- Exception exception = result.exception();
- if (ObjectUtils.isNotNull(exception)) {
- log.error("【Kafka】 => 发送消息失败,错误信息:{}", exception.getMessage(), exception);
- return false;
- }
- else {
- return true;
- }
- });
- }
- }
复制代码 3.yaml设置【自动批量提交】- spring:
- kafka:
- bootstrap-servers: kafka:9092
- consumer:
- group-id: laokou-mqtt
- # 禁用自动提交(按周期)已消费offset
- enable-auto-commit: true
- # 单次poll()调用返回的记录数
- max-poll-records: 50
- key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- producer:
- # 发生错误后,消息重发的次数。
- retries: 5
- # 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
- batch-size: 16384
- # 设置生产者内存缓冲区的大小。
- buffer-memory: 33554432
- # 键的序列化方式
- key-serializer: org.apache.kafka.common.serialization.StringSerializer
- # 值的序列化方式
- value-serializer: org.apache.kafka.common.serialization.StringSerializer
- # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
- # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
- # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
- acks: 0
- listener:
- # 在侦听器容器中运行的线程数。
- concurrency: 5
- # 批量提交模式
- ack-mode: batch
- # 批量batch类型
- type: batch
- # topic不存在报错
- missing-topics-fatal: false
- admin:
- auto-create: false
复制代码 mqtt-server【相应式】
依赖- <dependency>
- <groupId>io.vertx</groupId>
- <artifactId>vertx-mqtt</artifactId>
- <version>4.5.14</version>
- </dependency>
复制代码 VertxConfig- /**
- * @author laokou
- */
- @Configuration
- public class VertxConfig {
- @Bean(destroyMethod = "close")
- public Vertx vertx() {
- VertxOptions vertxOptions = new VertxOptions();
- vertxOptions.setMaxEventLoopExecuteTime(30);
- vertxOptions.setWorkerPoolSize(40);
- vertxOptions.setMaxWorkerExecuteTime(30);
- vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS);
- vertxOptions.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS);
- vertxOptions.setPreferNativeTransport(true);
- vertxOptions.setInternalBlockingPoolSize(40);
- vertxOptions.setEventLoopPoolSize(Math.max(32, 2 * CpuCoreSensor.availableProcessors()));
- return Vertx.vertx(vertxOptions);
- }
- }
复制代码 MqttServerProperties【设置了账号和暗码】- /**
- * @author laokou
- */
- @Data
- @Component
- @ConfigurationProperties(prefix = "spring.mqtt-server")
- public class MqttServerProperties {
- private boolean auth = true;
- private String username = "vertx";
- private String password = "laokou123";
- private String host = "0.0.0.0";
- private int port = 0;
- private int threadSize = 32;
- private int maxMessageSize = 8196;
- private boolean isAutoClientId = true;
- private int maxClientIdLength = 30;
- private int timeoutOnConnect = 90;
- private boolean useWebSocket = false;
- private int webSocketMaxFrameSize = 65536;
- private boolean perFrameWebSocketCompressionSupported = true;
- private boolean perMessageWebSocketCompressionSupported = true;
- private int webSocketCompressionLevel = 6;
- private boolean webSocketAllowServerNoContext = false;
- private boolean webSocketPreferredClientNoContext = false;
- private boolean tcpNoDelay = true;
- private boolean tcpKeepAlive = false;
- private int tcpKeepAliveIdleSeconds = -1;
- private int tcpKeepAliveCount = -1;
- private int tcpKeepAliveIntervalSeconds = -1;
- private int soLinger = -1;
- private int idleTimeout = 0;
- private int readIdleTimeout = 0;
- private int writeIdleTimeout = 0;
- private TimeUnit idleTimeoutUnit = TimeUnit.SECONDS;
- private boolean ssl = false;
- private boolean tcpFastOpen = false;
- private boolean tcpCork = false;
- private boolean tcpQuickAck = false;
- private int tcpUserTimeout = 0;
- }
复制代码 VertxMqttServer- /**
- * @author laokou
- */
- @Slf4j
- public final class VertxMqttServer {
- private final Sinks.Many<MqttPublishMessage> messageSink = Sinks.many()
- .multicast()
- .onBackpressureBuffer(Integer.MAX_VALUE, false);
- private volatile Flux<MqttServer> mqttServer;
- private final Vertx vertx;
- private final MqttServerProperties properties;
- private final List<ReactiveMessageHandler> reactiveMessageHandlers;
- private volatile boolean isClosed = false;
- public VertxMqttServer(final Vertx vertx, final MqttServerProperties properties,
- List<ReactiveMessageHandler> reactiveMessageHandlers) {
- this.properties = properties;
- this.vertx = vertx;
- this.reactiveMessageHandlers = reactiveMessageHandlers;
- }
- public Flux<MqttServer> start() {
- return mqttServer = getMqttServerOptions().map(mqttServerOption -> MqttServer.create(vertx, mqttServerOption)
- .exceptionHandler(
- error -> log.error("【Vertx-MQTT-Server】 => MQTT服务启动失败,错误信息:{}", error.getMessage(), error))
- .endpointHandler(endpoint -> Optional.ofNullable(authHandler(endpoint))
- .ifPresent(e -> e.closeHandler(close -> log.info("【Vertx-MQTT-Server】 => MQTT客户端断开连接"))
- .subscribeHandler(subscribe -> {
- for (MqttTopicSubscription topicSubscription : subscribe.topicSubscriptions()) {
- log.info("【Vertx-MQTT-Server】 => MQTT客户端订阅主题:{}", topicSubscription.topicName());
- }
- })
- .disconnectHandler(disconnect -> log.info("【Vertx-MQTT-Server】 => MQTT客户端主动断开连接"))
- .pingHandler(ping -> log.info("【Vertx-MQTT-Server】 => MQTT客户端发送心跳"))
- .publishHandler(messageSink::tryEmitNext)
- // 不保留会话
- .accept(false)))
- .listen(mqttServerOption.getPort(), mqttServerOption.getHost(), asyncResult -> {
- if (isClosed) {
- return;
- }
- if (asyncResult.succeeded()) {
- log.info("【Vertx-MQTT-Server】 => MQTT服务启动成功,主机:{},端口:{}", mqttServerOption.getHost(),
- mqttServerOption.getPort());
- // 写入缓存
- PortCache.add(mqttServerOption.getPort());
- }
- else {
- log.error("【Vertx-MQTT-Server】 => MQTT服务启动失败,主机:{},端口:{},错误信息:{}", mqttServerOption.getHost(),
- mqttServerOption.getPort(), asyncResult.cause().getMessage(), asyncResult.cause());
- }
- }));
- }
- public Flux<MqttServer> stop() {
- isClosed = true;
- return mqttServer.doOnNext(server -> server.close(completionHandler -> {
- if (completionHandler.succeeded()) {
- log.info("【Vertx-MQTT-Server】 => MQTT服务停止成功");
- }
- else {
- log.error("【Vertx-MQTT-Server】 => MQTT服务停止失败,错误信息:{}", completionHandler.cause().getMessage(),
- completionHandler.cause());
- }
- }));
- }
- public Flux<Boolean> publish() {
- return messageSink.asFlux().flatMap(message -> {
- // @formatter:off
- // log.info("【Vertx-MQTT-Server】 => MQTT服务接收到消息,主题:{},内容:{}", message.topicName(), message.payload().toString());
- // @formatter:on
- return Flux
- .fromStream(reactiveMessageHandlers.stream()
- .filter(reactiveMessageHandler -> reactiveMessageHandler.isSubscribe(message.topicName())))
- .flatMap(reactiveMessageHandler -> reactiveMessageHandler
- .handle(new MqttMessage(message.payload(), message.topicName())));
- });
- }
- private int detectAvailablePort(String host) {
- try (ServerSocket socket = SSLServerSocketFactory.getDefault().createServerSocket()) {
- socket.bind(new InetSocketAddress(host, properties.getPort()));
- return socket.getLocalPort();
- }
- catch (IOException e) {
- throw new RuntimeException("Port auto-detection failed", e);
- }
- }
- private Flux<MqttServerOptions> getMqttServerOptions() {
- return Flux.range(1, Math.max(properties.getThreadSize(), CpuCoreSensor.availableProcessors()))
- .map(item -> getMqttServerOption());
- }
- /**
- * 认证.
- */
- private MqttEndpoint authHandler(MqttEndpoint endpoint) {
- MqttAuth mqttAuth = endpoint.auth();
- if (properties.isAuth()) {
- if (ObjectUtils.isNull(mqttAuth)) {
- endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
- return null;
- }
- if (!ObjectUtils.equals(mqttAuth.getUsername(), properties.getUsername())
- || !ObjectUtils.equals(mqttAuth.getPassword(), properties.getPassword())) {
- endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
- return null;
- }
- }
- return endpoint;
- }
- // @formatter:off
- private MqttServerOptions getMqttServerOption() {
- MqttServerOptions mqttServerOptions = new MqttServerOptions();
- mqttServerOptions.setHost(properties.getHost());
- mqttServerOptions.setPort(detectAvailablePort(properties.getHost()));
- mqttServerOptions.setMaxMessageSize(properties.getMaxMessageSize());
- mqttServerOptions.setAutoClientId(properties.isAutoClientId());
- mqttServerOptions.setMaxClientIdLength(properties.getMaxClientIdLength());
- mqttServerOptions.setTimeoutOnConnect(properties.getTimeoutOnConnect());
- mqttServerOptions.setUseWebSocket(properties.isUseWebSocket());
- mqttServerOptions.setWebSocketMaxFrameSize(properties.getWebSocketMaxFrameSize());
- mqttServerOptions.setPerFrameWebSocketCompressionSupported(properties.isPerFrameWebSocketCompressionSupported());
- mqttServerOptions.setPerMessageWebSocketCompressionSupported(properties.isPerMessageWebSocketCompressionSupported());
- mqttServerOptions.setWebSocketCompressionLevel(properties.getWebSocketCompressionLevel());
- mqttServerOptions.setWebSocketAllowServerNoContext(properties.isWebSocketAllowServerNoContext());
- mqttServerOptions.setWebSocketPreferredClientNoContext(properties.isWebSocketPreferredClientNoContext());
- mqttServerOptions.setTcpNoDelay(properties.isTcpNoDelay());
- mqttServerOptions.setTcpKeepAlive(properties.isTcpKeepAlive());
- mqttServerOptions.setTcpKeepAliveIdleSeconds(properties.getTcpKeepAliveIdleSeconds());
- mqttServerOptions.setTcpKeepAliveIntervalSeconds(properties.getTcpKeepAliveIntervalSeconds());
- mqttServerOptions.setTcpKeepAliveCount(properties.getTcpKeepAliveCount());
- mqttServerOptions.setSoLinger(properties.getSoLinger());
- mqttServerOptions.setIdleTimeout(properties.getIdleTimeout());
- mqttServerOptions.setReadIdleTimeout(properties.getReadIdleTimeout());
- mqttServerOptions.setWriteIdleTimeout(properties.getWriteIdleTimeout());
- mqttServerOptions.setIdleTimeoutUnit(properties.getIdleTimeoutUnit());
- mqttServerOptions.setSsl(properties.isSsl());
- mqttServerOptions.setTcpFastOpen(properties.isTcpFastOpen());
- mqttServerOptions.setTcpCork(properties.isTcpCork());
- mqttServerOptions.setTcpQuickAck(properties.isTcpQuickAck());
- mqttServerOptions.setTcpUserTimeout(properties.getTcpUserTimeout());
- return mqttServerOptions;
- }
- // @formatter:on
- }
复制代码 PortCache【缓存端口】- /**
- * @author laokou
- */
- public final class PortCache {
- private PortCache() {
- }
- public static final List<Integer> PORTS = new CopyOnWriteArrayList<>();
- public static void add(int port) {
- PORTS.add(port);
- }
- public static List<Integer> get() {
- return PORTS;
- }
- public static void clear() {
- PORTS.clear();
- }
- }
复制代码 ReactiveMessageHandler【消息处理,没啥好说的,就是用来转发消息到MQ】- /**
- * @author laokou
- */
- public interface ReactiveMessageHandler {
- boolean isSubscribe(String topic);
- Flux<Boolean> handle(MqttMessage mqttMessage);
- }
复制代码- /**
- * 属性回复消息处理器.
- *
- * @author laokou
- */
- @Component
- @RequiredArgsConstructor
- public class ReactivePropertyReplyMessageHandler implements ReactiveMessageHandler {
- private final KafkaSender kafkaSender;
- @Override
- public boolean isSubscribe(String topic) {
- return TopicUtils.match("/+/+/property/reply", topic);
- }
- @Override
- public Flux<Boolean> handle(MqttMessage mqttMessage) {
- return kafkaSender.send(LAOKOU_MQTT_PROPERTY_REPLY, mqttMessage.getPayload().toString());
- }
- }
复制代码- /**
- * 属性上报消息处理.
- *
- * @author laokou
- */
- @Component
- @RequiredArgsConstructor
- public class ReactivePropertyReportMessageHandler implements ReactiveMessageHandler {
- private final KafkaSender kafkaSender;
- @Override
- public boolean isSubscribe(String topic) {
- return TopicUtils.match("/+/+/property/report", topic);
- }
- @Override
- public Flux<Boolean> handle(MqttMessage mqttMessage) {
- return kafkaSender.send(LAOKOU_MQTT_PROPERTY_REPORT, mqttMessage.getPayload().toString());
- }
- }
复制代码 设置yaml- spring:
- application:
- name: ${SERVICE_ID:laokou-mqtt}
- threads:
- virtual:
- enabled: true
- mqtt-server:
- auth: true
- username: vertx
- password: laokou123
- # 开启8196个端口
- thread-size: 8196
复制代码 启动MQTT-Server- /**
- * @author laokou
- */
- @Slf4j
- @EnableDiscoveryClient
- @RequiredArgsConstructor
- @EnableConfigurationProperties
- @SpringBootApplication(scanBasePackages = "org.laokou")
- public class MqttServerApp implements CommandLineRunner {
- private final Vertx vertx;
- private final MqttServerProperties properties;
- private final List<ReactiveMessageHandler> reactiveMessageHandlers;
- private final ExecutorService virtualThreadExecutor;
- @Override
- public void run(String... args) {
- virtualThreadExecutor.execute(this::listenMessage);
- }
- private void listenMessage() {
- VertxMqttServer vertxMqttServer = new VertxMqttServer(vertx, properties, reactiveMessageHandlers);
- // 启动服务
- vertxMqttServer.start().subscribeOn(Schedulers.boundedElastic()).subscribe();
- // 推送数据
- vertxMqttServer.publish().subscribeOn(Schedulers.boundedElastic()).subscribe();
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- // 清除缓存
- PortCache.clear();
- // 停止服务
- vertxMqttServer.stop().subscribeOn(Schedulers.boundedElastic()).subscribe();
- }));
- }
- }
复制代码 启动好之后,请自行测试,这个东西没啥好说,vertx帮我们都实现了,就是简单调用API,自己玩吧~
我是老寇,我们下次再见啦~
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |