手把手教大家写书写一个Mqtt网关

打印 上一主题 下一主题

主题 836|帖子 836|积分 2508

摘要:物联网是现在比较热门的软件领域,众多物联网厂商都有自己的物联网平台,而物联网平台其中一个核心的模块就是Mqtt网关。
本文分享自华为云社区《一文带你掌握物联网mqtt网关搭建背后的技术原理》,作者:张俭。
前言

物联网是现在比较热门的软件领域,众多物联网厂商都有自己的物联网平台,而物联网平台其中一个核心的模块就是Mqtt网关。这篇文章的目的是手把手教大家写书写一个mqtt网关,后端存储支持Kafka/Pulsar,支持mqtt 连接、断链、发送消息、订阅消息。技术选型:

  • Netty java最流行的网络框架
  • netty-codec-mqtt netty的子项目,mqtt编解码插件
  • Pulsar/Kafka 流行的消息中间件作为后端存储
核心pom依赖如下
  1. <dependency>
  2. <groupId>io.netty</groupId>
  3. <artifactId>netty-codec-mqtt</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>io.netty</groupId>
  7. <artifactId>netty-common</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>io.netty</groupId>
  11. <artifactId>netty-transport</artifactId>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.apache.pulsar</groupId>
  15. <artifactId>pulsar-client-original</artifactId>
  16. <version>${pulsar.version}</version>
  17. </dependency>
  18. <dependency>
  19. <groupId>org.apache.kafka</groupId>
  20. <artifactId>kafka-clients</artifactId>
  21. <version>${kafka.version}</version>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.eclipse.paho</groupId>
  25. <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  26. <version>${mqtt-client.version}</version>
  27. <scope>test</scope>
  28. </dependency>
复制代码
软件参数设计

软件参数可谓是非常常见,复杂的开源项目,参数甚至可以达到上百个、配置文件长达数千行。我们需要的配置有
MqttServer监听的端口

监听端口的配置即使是写demo也非常必要,常常用在单元测试中,由于单元测试跑完之后,即使网络服务器关闭,操作系统也不会立即释放端口,所以单元测试的时候指定随机端口非常关键,在java中,我们可以通过这样的工具类来获取一个空闲的端口。未配置的话,我们就使用mqtt的默认端口1883。
  1. package io.github.protocol.mqtt.broker.util;
  2. import java.io.IOException;
  3. import java.io.UncheckedIOException;
  4. import java.net.ServerSocket;
  5. public class SocketUtil {
  6. public static int getFreePort() {
  7. try (ServerSocket serverSocket = new ServerSocket(0)) {
  8. return serverSocket.getLocalPort();
  9.         } catch (IOException e) {
  10. throw new UncheckedIOException(e);
  11.         }
  12.     }
  13. }
复制代码
后端存储配置

我们的mqtt网关是没有可靠的存储能力的,依赖后端的消息中间件来做持久化处理。后端规划支持Pulsar、Kafka两种类型。定义枚举类如下
  1. public enum ProcessorType {
  2.     KAFKA,
  3.     PULSAR,
  4. }
复制代码
对应的KafkaProcessorConfig、PulsarProcessorConfig比较简单,包含基础的连接地址即可,如果后续要做性能调优、安全,这块还是会有更多的配置项
  1. @Setter
  2. @Getter
  3. public class KafkaProcessorConfig {
  4. private String bootstrapServers = "localhost:9092";
  5. public KafkaProcessorConfig() {
  6.     }
  7. }
  8. @Setter
  9. @Getter
  10. public class PulsarProcessorConfig {
  11. private String httpUrl = "http://localhost:8080";
  12. private String serviceUrl = "pulsar://localhost:6650";
  13. public PulsarProcessorConfig() {
  14.     }
  15. }
复制代码
启动netty MqttServer

我们通过netty启动一个mqttServer,添加mqtt解码器
  1. package io.github.protocol.mqtt.broker;
  2. import io.github.protocol.mqtt.broker.processor.KafkaProcessor;
  3. import io.github.protocol.mqtt.broker.processor.KafkaProcessorConfig;
  4. import io.github.protocol.mqtt.broker.processor.MqttProcessor;
  5. import io.github.protocol.mqtt.broker.processor.PulsarProcessor;
  6. import io.github.protocol.mqtt.broker.processor.PulsarProcessorConfig;
  7. import io.github.protocol.mqtt.broker.util.SocketUtil;
  8. import io.netty.bootstrap.ServerBootstrap;
  9. import io.netty.channel.ChannelFuture;
  10. import io.netty.channel.ChannelInitializer;
  11. import io.netty.channel.ChannelOption;
  12. import io.netty.channel.ChannelPipeline;
  13. import io.netty.channel.EventLoopGroup;
  14. import io.netty.channel.nio.NioEventLoopGroup;
  15. import io.netty.channel.socket.SocketChannel;
  16. import io.netty.channel.socket.nio.NioServerSocketChannel;
  17. import io.netty.handler.codec.mqtt.MqttDecoder;
  18. import io.netty.handler.codec.mqtt.MqttEncoder;
  19. import io.netty.handler.logging.LogLevel;
  20. import io.netty.handler.logging.LoggingHandler;
  21. import lombok.extern.slf4j.Slf4j;
  22. @Slf4j
  23. public class MqttServer {
  24. private final MqttServerConfig mqttServerConfig;
  25. public MqttServer() {
  26. this(new MqttServerConfig());
  27.     }
  28. public MqttServer(MqttServerConfig mqttServerConfig) {
  29. this.mqttServerConfig = mqttServerConfig;
  30. if (mqttServerConfig.getPort() == 0) {
  31.             mqttServerConfig.setPort(SocketUtil.getFreePort());
  32.         }
  33.     }
  34. public void start() throws Exception {
  35.         EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  36.         EventLoopGroup workerGroup = new NioEventLoopGroup();
  37. try {
  38.             ServerBootstrap b = new ServerBootstrap();
  39.             b.group(bossGroup, workerGroup)
  40.                     .channel(NioServerSocketChannel.class)
  41.                     .option(ChannelOption.SO_BACKLOG, 100)
  42.                     .handler(new LoggingHandler(LogLevel.INFO))
  43.                     .childHandler(new ChannelInitializer<SocketChannel>() {
  44.                         @Override
  45. public void initChannel(SocketChannel ch) throws Exception {
  46.                             ChannelPipeline p = ch.pipeline();
  47. // decoder
  48.                             p.addLast(new MqttDecoder());
  49.                             p.addLast(MqttEncoder.INSTANCE);
  50.                         }
  51.                     });
  52. // Start the server.
  53.             ChannelFuture f = b.bind(mqttServerConfig.getPort()).sync();
  54. // Wait until the server socket is closed.
  55.             f.channel().closeFuture().sync();
  56.         } finally {
  57. // Shut down all event loops to terminate all threads.
  58.             bossGroup.shutdownGracefully();
  59.             workerGroup.shutdownGracefully();
  60.         }
  61.     }
  62. private MqttProcessor processor(MqttServerConfig config) {
  63. return switch (config.getProcessorType()) {
  64. case KAFKA -> new KafkaProcessor(config.getMqttAuth(), config.getKafkaProcessorConfig());
  65. case PULSAR -> new PulsarProcessor(config.getMqttAuth(), config.getPulsarProcessorConfig());
  66.         };
  67.     }
  68. public int getPort() {
  69. return mqttServerConfig.getPort();
  70.     }
  71. }
复制代码
MqttserverStarter.java

我们写一个简单的main函数用来启动mqttServer,方便调测
  1. package io.github.protocol.mqtt.broker;
  2. public class MqttServerStarter {
  3. public static void main(String[] args) throws Exception {
  4. new MqttServer().start();
  5.     }
  6. }
复制代码
客户端使用eclipse mqtt client进行测试
  1. package io.github.protocol.mqtt;
  2. import lombok.extern.log4j.Log4j2;
  3. import org.eclipse.paho.client.mqttv3.MqttClient;
  4. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  5. import org.eclipse.paho.client.mqttv3.MqttException;
  6. import org.eclipse.paho.client.mqttv3.MqttMessage;
  7. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  8. @Log4j2
  9. public class MqttClientPublishExample {
  10. public static void main(String[] args) throws Exception {
  11.         String topic = "MQTT Examples";
  12.         String content = "Message from MqttPublishExample";
  13.         int qos = 2;
  14.         String broker = "tcp://127.0.0.1:1883";
  15.         String clientId = "JavaSample";
  16.         MemoryPersistence persistence = new MemoryPersistence();
  17. try {
  18.             MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  19.             MqttConnectOptions connOpts = new MqttConnectOptions();
  20.             connOpts.setCleanSession(true);
  21.             log.info("Connecting to broker: {}", broker);
  22.             sampleClient.connect(connOpts);
  23.             log.info("Connected");
  24.             log.info("Publishing message: {}", content);
  25.             MqttMessage message = new MqttMessage(content.getBytes());
  26.             message.setQos(qos);
  27.             sampleClient.publish(topic, message);
  28.             log.info("Message published");
  29.             sampleClient.disconnect();
  30.             log.info("Disconnected");
  31.             System.exit(0);
  32.         } catch (MqttException me) {
  33.             log.error("reason {} msg {}", me.getReasonCode(), me.getMessage(), me);
  34.         }
  35.     }
  36. }
复制代码
然后我们先运行MqttServer,再运行MqttClient,发现MqttClient卡住了
  1. Connecting to broker: tcp://127.0.0.1:1883
复制代码
这是为什么呢,我们通过抓包发现仅仅只有客户端发送了Mqtt connect信息,服务端并没有响应
但是根据mqtt标准协议,发送Connect消息,必须要有ConnAck响应
所以我们需要在接收到Connect后,返回connAck消息。我们创建一个MqttHandler,让他继承ChannelInboundHandlerAdapter, 用来接力MqttDecoder解码完成后的消息,这里要重点继承其中的channelRead方法,以及channelInactive方法,用来释放断链时需要释放的资源
  1. package com.github.shoothzj.mqtt;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.ChannelInboundHandlerAdapter;
  4. import lombok.extern.slf4j.Slf4j;
  5. @Slf4j
  6. public class MqttHandler extends ChannelInboundHandlerAdapter {
  7.     @Override
  8. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  9. super.channelRead(ctx, msg);
  10.     }
  11. }
复制代码
然后把这个handler加入到netty的职责链中,放到解码器的后面
在mqtt handler中插入我们的代码
  1. @Override
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  3. super.channelRead(ctx, msg);
  4. if (msg instanceof MqttConnectMessage) {
  5.             handleConnect(ctx, (MqttConnectMessage) msg);
  6.         } else {
  7.             log.error("Unsupported type msg [{}]", msg);
  8.         }
  9.     }
  10. private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
  11.         log.info("connect msg is [{}]", connectMessage);
  12.     }
复制代码
打印出connectMessage如下
  1. [MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=22], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=60], payload=MqttConnectPayload[clientIdentifier=JavaSample, willTopic=null, willMessage=null, userName=null, password=null]]]
复制代码
  1. 通常,mqtt connect message中会包含qos、用户名、密码等信息,由于我们启动客户端的时候也没有携带用户名和密码,这里获取到的都为null,我们先不校验这些消息,直接给客户端返回connack消息,代表连接成功
复制代码
  1. final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
  2.         ctx.channel().writeAndFlush(ackMessage);
复制代码
我们再运行起Server和Client,随后可以看到已经走过了Connect阶段,进入了publish message过程,接下来我们再实现更多的其他场景
附上此阶段的MqttHandler代码
  1. package com.github.shoothzj.mqtt;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.codec.mqtt.MqttConnAckMessage;import io.netty.handler.codec.mqtt.MqttConnectMessage;import io.netty.handler.codec.mqtt.MqttConnectPayload;import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;import io.netty.handler.codec.mqtt.MqttFixedHeader;import io.netty.handler.codec.mqtt.MqttMessageBuilders;import lombok.extern.slf4j.Slf4j;import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;@Slf4jpublic class MqttHandler extends ChannelInboundHandlerAdapter {    @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); if (msg instanceof MqttConnectMessage) {            handleConnect(ctx, (MqttConnectMessage) msg);        } else {            log.error("Unsupported type msg [{}]", msg);        }    } private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {        log.info("connect msg is [{}]", connectMessage);        final MqttFixedHeader fixedHeader = connectMessage.fixedHeader();        final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();        final MqttConnectPayload connectPayload = connectMessage.payload();        final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
  2.         ctx.channel().writeAndFlush(ackMessage);    }}
复制代码
我们当前把所有的逻辑都放在MqttHandler里面,不方便后续的扩展。抽象出一个MqttProcessor接口来处理具体的请求,MqttHandler负责解析MqttMessage的类型并分发。MqttProcess接口设计如下
  1. package io.github.protocol.mqtt.broker.processor;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.handler.codec.mqtt.MqttConnAckMessage;
  4. import io.netty.handler.codec.mqtt.MqttConnectMessage;
  5. import io.netty.handler.codec.mqtt.MqttMessage;
  6. import io.netty.handler.codec.mqtt.MqttPubAckMessage;
  7. import io.netty.handler.codec.mqtt.MqttPublishMessage;
  8. import io.netty.handler.codec.mqtt.MqttSubAckMessage;
  9. import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
  10. import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
  11. import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
  12. public interface MqttProcessor {
  13.     void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception;
  14.     void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception;
  15.     void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception;
  16.     void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception;
  17.     void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
  18.     void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
  19.     void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
  20.     void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception;
  21.     void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception;
  22.     void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception;
  23.     void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception;
  24.     void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
  25.     void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
  26.     void processDisconnect(ChannelHandlerContext ctx) throws Exception;
  27.     void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
  28. }
复制代码
我们允许这些方法抛出异常,当遇到极难处理的故障时,把mqtt连接断掉(如后端存储故障),等待客户端的重连。
MqttHandler中来调用MqttProcessor,相关MqttHandler代码如下
  1.    Preconditions.checkArgument(message instanceof MqttMessage);
  2.         MqttMessage msg = (MqttMessage) message;
  3. try {
  4. if (msg.decoderResult().isFailure()) {
  5.                 Throwable cause = msg.decoderResult().cause();
  6. if (cause instanceof MqttUnacceptableProtocolVersionException) {
  7. // Unsupported protocol version
  8.                     MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
  9. new MqttFixedHeader(MqttMessageType.CONNACK,
  10. false, MqttQoS.AT_MOST_ONCE, false, 0),
  11. new MqttConnAckVariableHeader(
  12.                                     MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION,
  13. false), null);
  14.                     ctx.writeAndFlush(connAckMessage);
  15.                     log.error("connection refused due to invalid protocol, client address [{}]",
  16.                             ctx.channel().remoteAddress());
  17.                     ctx.close();
  18. return;
  19.                 } else if (cause instanceof MqttIdentifierRejectedException) {
  20. // ineligible clientId
  21.                     MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
  22. new MqttFixedHeader(MqttMessageType.CONNACK,
  23. false, MqttQoS.AT_MOST_ONCE, false, 0),
  24. new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
  25. false), null);
  26.                     ctx.writeAndFlush(connAckMessage);
  27.                     log.error("ineligible clientId, client address [{}]", ctx.channel().remoteAddress());
  28.                     ctx.close();
  29. return;
  30.                 }
  31. throw new IllegalStateException(msg.decoderResult().cause().getMessage());
  32.             }
  33.             MqttMessageType messageType = msg.fixedHeader().messageType();
  34. if (log.isDebugEnabled()) {
  35.                 log.debug("Processing MQTT Inbound handler message, type={}", messageType);
  36.             }
  37. switch (messageType) {
  38. case CONNECT:
  39.                     Preconditions.checkArgument(msg instanceof MqttConnectMessage);
  40.                     processor.processConnect(ctx, (MqttConnectMessage) msg);
  41. break;
  42. case CONNACK:
  43.                     Preconditions.checkArgument(msg instanceof MqttConnAckMessage);
  44.                     processor.processConnAck(ctx, (MqttConnAckMessage) msg);
  45. break;
  46. case PUBLISH:
  47.                     Preconditions.checkArgument(msg instanceof MqttPublishMessage);
  48.                     processor.processPublish(ctx, (MqttPublishMessage) msg);
  49. break;
  50. case PUBACK:
  51.                     Preconditions.checkArgument(msg instanceof MqttPubAckMessage);
  52.                     processor.processPubAck(ctx, (MqttPubAckMessage) msg);
  53. break;
  54. case PUBREC:
  55.                     processor.processPubRec(ctx, msg);
  56. break;
  57. case PUBREL:
  58.                     processor.processPubRel(ctx, msg);
  59. break;
  60. case PUBCOMP:
  61.                     processor.processPubComp(ctx, msg);
  62. break;
  63. case SUBSCRIBE:
  64.                     Preconditions.checkArgument(msg instanceof MqttSubscribeMessage);
  65.                     processor.processSubscribe(ctx, (MqttSubscribeMessage) msg);
  66. break;
  67. case SUBACK:
  68.                     Preconditions.checkArgument(msg instanceof MqttSubAckMessage);
  69.                     processor.processSubAck(ctx, (MqttSubAckMessage) msg);
  70. break;
  71. case UNSUBSCRIBE:
  72.                     Preconditions.checkArgument(msg instanceof MqttUnsubscribeMessage);
  73.                     processor.processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
  74. break;
  75. case UNSUBACK:
  76.                     Preconditions.checkArgument(msg instanceof MqttUnsubAckMessage);
  77.                     processor.processUnsubAck(ctx, (MqttUnsubAckMessage) msg);
  78. break;
  79. case PINGREQ:
  80.                     processor.processPingReq(ctx, msg);
  81. break;
  82. case PINGRESP:
  83.                     processor.processPingResp(ctx, msg);
  84. break;
  85. case DISCONNECT:
  86.                     processor.processDisconnect(ctx);
  87. break;
  88. case AUTH:
  89.                     processor.processAuth(ctx, msg);
  90. break;
  91. default:
  92. throw new UnsupportedOperationException("Unknown MessageType: " + messageType);
  93.             }
  94.         } catch (Throwable ex) {
  95.             ReferenceCountUtil.safeRelease(msg);
  96.             log.error("Exception was caught while processing MQTT message, ", ex);
  97.             ctx.close();
  98.         }
复制代码
这里的代码,主要是针对MqttMessage的不同类型,调用MqttProcessor的不同方法,值得一提的有两点

  • 提前判断了一些解码异常,fast fail
  • 全局捕获异常,并进行断链处理
维护MqttSession

维护Mqtt会话的session,主要用来持续跟踪客户端会话信息,跟踪在系统中占用的资源等,考虑到无论是何种后端实现,都需要维护Mqtt的Session,我们构筑一个AbstractMqttProcessor来维护MqttSession
  1. package io.github.protocol.mqtt.broker.processor;
  2. import io.github.protocol.mqtt.broker.MqttSessionKey;
  3. import io.github.protocol.mqtt.broker.auth.MqttAuth;
  4. import io.github.protocol.mqtt.broker.util.ChannelUtils;
  5. import io.github.protocol.mqtt.broker.util.MqttMessageUtil;
  6. import io.netty.channel.ChannelHandlerContext;
  7. import io.netty.handler.codec.mqtt.MqttConnAckMessage;
  8. import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
  9. import io.netty.handler.codec.mqtt.MqttConnectMessage;
  10. import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
  11. import io.netty.handler.codec.mqtt.MqttFixedHeader;
  12. import io.netty.handler.codec.mqtt.MqttMessage;
  13. import io.netty.handler.codec.mqtt.MqttMessageFactory;
  14. import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
  15. import io.netty.handler.codec.mqtt.MqttMessageType;
  16. import io.netty.handler.codec.mqtt.MqttPubAckMessage;
  17. import io.netty.handler.codec.mqtt.MqttPublishMessage;
  18. import io.netty.handler.codec.mqtt.MqttQoS;
  19. import io.netty.handler.codec.mqtt.MqttSubAckMessage;
  20. import io.netty.handler.codec.mqtt.MqttSubAckPayload;
  21. import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
  22. import io.netty.handler.codec.mqtt.MqttSubscribePayload;
  23. import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
  24. import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
  25. import lombok.extern.slf4j.Slf4j;
  26. import org.apache.commons.lang3.StringUtils;
  27. import java.util.stream.IntStream;
  28. @Slf4j
  29. public abstract class AbstractProcessor implements MqttProcessor {
  30. protected final MqttAuth mqttAuth;
  31. public AbstractProcessor(MqttAuth mqttAuth) {
  32. this.mqttAuth = mqttAuth;
  33.     }
  34.     @Override
  35. public void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception {
  36.         String clientId = msg.payload().clientIdentifier();
  37.         String username = msg.payload().userName();
  38.         byte[] pwd = msg.payload().passwordInBytes();
  39. if (StringUtils.isBlank(clientId) || StringUtils.isBlank(username)) {
  40.             MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
  41. new MqttFixedHeader(MqttMessageType.CONNACK,
  42. false, MqttQoS.AT_MOST_ONCE, false, 0),
  43. new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
  44. false), null);
  45.             ctx.writeAndFlush(connAckMessage);
  46.             log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress());
  47.             ctx.close();
  48. return;
  49.         }
  50. if (!mqttAuth.connAuth(clientId, username, pwd)) {
  51.             MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
  52. new MqttFixedHeader(MqttMessageType.CONNACK,
  53. false, MqttQoS.AT_MOST_ONCE, false, 0),
  54. new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD,
  55. false), null);
  56.             ctx.writeAndFlush(connAckMessage);
  57.             log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress());
  58.             ctx.close();
  59. return;
  60.         }
  61.         MqttSessionKey mqttSessionKey = new MqttSessionKey();
  62.         mqttSessionKey.setUsername(username);
  63.         mqttSessionKey.setClientId(clientId);
  64.         ChannelUtils.setMqttSession(ctx.channel(), mqttSessionKey);
  65.         log.info("username {} clientId {} remote address {} connected",
  66.                 username, clientId, ctx.channel().remoteAddress());
  67.         onConnect(mqttSessionKey);
  68.         MqttConnAckMessage mqttConnectMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
  69. new MqttFixedHeader(MqttMessageType.CONNACK,
  70. false, MqttQoS.AT_MOST_ONCE, false, 0),
  71. new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false),
  72. null);
  73.         ctx.writeAndFlush(mqttConnectMessage);
  74.     }
  75. protected void onConnect(MqttSessionKey mqttSessionKey) {
  76.     }
  77.     @Override
  78. public void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception {
  79.         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  80. if (mqttSession == null) {
  81.             log.error("conn ack, client address {} not authed", ctx.channel().remoteAddress());
  82.             ctx.close();
  83.         }
  84.     }
  85.     @Override
  86. public void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception {
  87.         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  88. if (mqttSession == null) {
  89.             log.error("publish, client address {} not authed", ctx.channel().remoteAddress());
  90.             ctx.close();
  91. return;
  92.         }
  93. if (msg.fixedHeader().qosLevel() == MqttQoS.FAILURE) {
  94.             log.error("failure. clientId {}, username {} ", mqttSession.getClientId(), mqttSession.getUsername());
  95. return;
  96.         }
  97. if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
  98.             log.error("does not support QoS2 protocol. clientId {}, username {} ",
  99.                     mqttSession.getClientId(), mqttSession.getUsername());
  100. return;
  101.         }
  102.         onPublish(ctx, mqttSession, msg);
  103.     }
  104. protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
  105.                              MqttPublishMessage msg) throws Exception {
  106.     }
  107.     @Override
  108. public void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception {
  109.         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  110. if (mqttSession == null) {
  111.             log.error("pub ack, client address {} not authed", ctx.channel().remoteAddress());
  112.             ctx.close();
  113.         }
  114.     }
  115.     @Override
  116. public void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
  117.         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  118. if (mqttSession == null) {
  119.             log.error("pub rec, client address {} not authed", ctx.channel().remoteAddress());
  120.             ctx.close();
  121.         }
  122.     }
  123.     @Override
  124. public void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
  125.         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  126. if (mqttSession == null) {
  127.             log.error("pub rel, client address {} not authed", ctx.channel().remoteAddress());
  128.             ctx.close();
  129.         }
  130.     }
  131.     @Override
  132. public void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
  133.         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  134. if (mqttSession == null) {
  135.             log.error("pub comp, client address {} not authed", ctx.channel().remoteAddress());
  136.             ctx.close();
  137.         }
  138.     }
  139.     @Override
  140. public void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception {
  141.         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  142. if (mqttSession == null) {
  143.             log.error("sub, client address {} not authed", ctx.channel().remoteAddress());
  144.             ctx.close();
  145.         }
  146.         onSubscribe(ctx, mqttSession, msg.payload());
  147.         MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK,
  148. false, MqttQoS.AT_MOST_ONCE, false, 0);
  149.         IntStream intStream = msg.payload().topicSubscriptions().stream().mapToInt(s -> s.qualityOfService().value());
  150.         MqttSubAckPayload payload = new MqttSubAckPayload(intStream.toArray());
  151.         ctx.writeAndFlush(MqttMessageFactory.newMessage(
  152.                 fixedHeader,
  153.                 MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
  154.                 payload));
  155.     }
  156. protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
  157.                                MqttSubscribePayload subscribePayload) throws Exception {
  158.     }
  159.     @Override
  160. public void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception {
  161.         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  162. if (mqttSession == null) {
  163.             log.error("sub ack, client address {} not authed", ctx.channel().remoteAddress());
  164.             ctx.close();
  165.         }
  166.     }
  167.     @Override
  168. public void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception {
  169.         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  170. if (mqttSession == null) {
  171.             log.error("unsub, client address {} not authed", ctx.channel().remoteAddress());
  172.             ctx.close();
  173.         }
  174.     }
  175.     @Override
  176. public void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception {
  177.         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  178. if (mqttSession == null) {
  179.             log.error("unsub ack, client address {} not authed", ctx.channel().remoteAddress());
  180.             ctx.close();
  181.         }
  182.     }
  183.     @Override
  184. public void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
  185.         ctx.writeAndFlush(MqttMessageUtil.pingResp());
  186.     }
  187.     @Override
  188. public void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
  189.         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  190. if (mqttSession == null) {
  191.             log.error("ping resp, client address {} not authed", ctx.channel().remoteAddress());
  192.             ctx.close();
  193.         }
  194.     }
  195.     @Override
  196. public void processDisconnect(ChannelHandlerContext ctx) throws Exception {
  197.         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  198. if (mqttSession == null) {
  199.             log.error("disconnect, client address {} not authed", ctx.channel().remoteAddress());
  200.         }
  201.         onDisconnect(mqttSession);
  202.     }
  203. protected void onDisconnect(MqttSessionKey mqttSessionKey) {
  204.     }
  205.     @Override
  206. public void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
  207.         MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
  208. if (mqttSession == null) {
  209.             log.error("auth, client address {} not authed", ctx.channel().remoteAddress());
  210.             ctx.close();
  211.         }
  212.     }
  213. }
复制代码
可以看到,这里的AbstractProcessor主要是维护了MqttSessionKey,校验MqttSessionKey,并拦截publish中不支持的Qos2、Failure。同时,也影响了mqtt心跳请求。同样的,我们允许在onPublishonSubscribe中抛出异常。
基于消息队列实现的mqtt网关的基础思想也比较简单,简而言之就是,有publish消息的时候向消息队列中生产消息。有订阅的时候就从消息队列中拉取消息。由此延伸出来,我们可能需要维护每个mqtt topic和producer、consumer的对应关系,因为像kafka、pulsar这些消息中间件的消费者都是区分topic的,片段通用代码如下:
  1. protected final ReentrantReadWriteLock.ReadLock rLock;
  2. protected final ReentrantReadWriteLock.WriteLock wLock;
  3. protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;
  4. protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;
  5. protected final Map<MqttTopicKey, P> producerMap;
  6. protected final Map<MqttTopicKey, C> consumerMap;
  7. public AbstractMqProcessor(MqttAuth mqttAuth) {
  8. super(mqttAuth);
  9.         ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  10.         rLock = lock.readLock();
  11.         wLock = lock.writeLock();
  12. this.sessionProducerMap = new HashMap<>();
  13. this.sessionConsumerMap = new HashMap<>();
  14. this.producerMap = new HashMap<>();
  15. this.consumerMap = new HashMap<>();
  16.     }
  17.     @Override
  18. protected void onConnect(MqttSessionKey mqttSessionKey) {
  19.         wLock.lock();
  20. try {
  21.             sessionProducerMap.put(mqttSessionKey, new ArrayList<>());
  22.             sessionConsumerMap.put(mqttSessionKey, new ArrayList<>());
  23.         } finally {
  24.             wLock.unlock();
  25.         }
  26.     }
  27.     @Override
  28. protected void onDisconnect(MqttSessionKey mqttSessionKey) {
  29.         wLock.lock();
  30. try {
  31. // find producers
  32.             List<MqttTopicKey> produceTopicKeys = sessionProducerMap.get(mqttSessionKey);
  33. if (produceTopicKeys != null) {
  34. for (MqttTopicKey mqttTopicKey : produceTopicKeys) {
  35.                     P producer = producerMap.get(mqttTopicKey);
  36. if (producer != null) {
  37.                         ClosableUtils.close(producer);
  38.                         producerMap.remove(mqttTopicKey);
  39.                     }
  40.                 }
  41.             }
  42.             sessionProducerMap.remove(mqttSessionKey);
  43.             List<MqttTopicKey> consumeTopicKeys = sessionConsumerMap.get(mqttSessionKey);
  44. if (consumeTopicKeys != null) {
  45. for (MqttTopicKey mqttTopicKey : consumeTopicKeys) {
  46.                     C consumer = consumerMap.get(mqttTopicKey);
  47. if (consumer != null) {
  48.                         ClosableUtils.close(consumer);
  49.                         consumerMap.remove(mqttTopicKey);
  50.                     }
  51.                 }
  52.             }
  53.             sessionConsumerMap.remove(mqttSessionKey);
  54.         } finally {
  55.             wLock.unlock();
  56.         }
  57.     }
  58. }
复制代码
kafka processor实现
  1. 由于kafka producer不区分topic,我们可以在kafka processor中复用producer,在将来单个kafka producer的性能到达上限时,我们可以将kafka producer扩展为kafka producer列表进行轮询处理,消费者由于mqtt协议可能针对每个订阅topic有不同的行为,不合适复用同一个消费者实例。我们在构造函数中启动KafkaProducer
复制代码
  1. private final KafkaProcessorConfig kafkaProcessorConfig;
  2. private final KafkaProducer<String, ByteBuffer> producer;
  3. public KafkaProcessor(MqttAuth mqttAuth, KafkaProcessorConfig kafkaProcessorConfig) {
  4. super(mqttAuth);
  5. this.kafkaProcessorConfig = kafkaProcessorConfig;
  6. this.producer = createProducer();
  7.     }
  8. protected KafkaProducer<String, ByteBuffer> createProducer() {
  9.         Properties properties = new Properties();
  10.         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProcessorConfig.getBootstrapServers());
  11.         properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  12.         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class);
  13. return new KafkaProducer<>(properties);
  14.     }
复制代码
  1. 处理MqttPublish消息,MqttPublish消息包含如下几个关键参数
复制代码
  1. MqttQoS mqttQoS = publishMessage.fixedHeader().qosLevel();
  2. String topic = publishMessage.variableHeader().topicName();
  3. ByteBuffer byteBuffer = publishMessage.payload().nioBuffer();
复制代码
其中

  • qos代表这条消息的质量级别,0没有任何保障,1代表至少一次,2代表恰好一次。当前仅支持qos0、qos1
  • topicName就是topic的名称
  • ByteBuffer就是消息的内容
根据topic、qos发送消息,代码如下
  1.   String topic = msg.variableHeader().topicName();
  2.         ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(topic, msg.payload().nioBuffer());
  3. switch (msg.fixedHeader().qosLevel()) {
  4. case AT_MOST_ONCE -> producer.send(record, (metadata, exception) -> {
  5. if (exception != null) {
  6.                     log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, exception);
  7. return;
  8.                 }
  9.                 log.debug("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
  10.                         mqttSessionKey, metadata.topic(), metadata.partition(), metadata.offset());
  11.             });
  12. case AT_LEAST_ONCE -> {
  13. try {
  14.                     RecordMetadata recordMetadata = producer.send(record).get();
  15.                     log.info("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
  16.                             mqttSessionKey, recordMetadata.topic(),
  17.                             recordMetadata.partition(), recordMetadata.offset());
  18.                     ctx.writeAndFlush(MqttMessageUtil.pubAckMessage(msg.variableHeader().packetId()));
  19.                 } catch (Exception e) {
  20.                     log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, e);
  21.                 }
  22.             }
  23. case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
  24.                     String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
  25.         }
复制代码
处理订阅消息,我们暂时仅根据订阅的topic,创建topic进行消费即可,由于kafka原生客户端建议的消费代码模式如下
  1. while (true) {
  2.   ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
  3. for (ConsumerRecord<String, byte[]> record : records) {
  4. // do logic
  5.   }
  6. }
复制代码
我们需要切换到其他线程对consumer进行消息,书写一个KafkaConsumerListenerWrapper的wrapper,转换为listener异步消费模型
  1. package io.github.protocol.mqtt.broker.processor;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.kafka.clients.admin.AdminClient;
  4. import org.apache.kafka.clients.admin.AdminClientConfig;
  5. import org.apache.kafka.clients.admin.KafkaAdminClient;
  6. import org.apache.kafka.clients.admin.NewTopic;
  7. import org.apache.kafka.clients.admin.TopicDescription;
  8. import org.apache.kafka.clients.consumer.ConsumerConfig;
  9. import org.apache.kafka.clients.consumer.ConsumerRecord;
  10. import org.apache.kafka.clients.consumer.ConsumerRecords;
  11. import org.apache.kafka.clients.consumer.KafkaConsumer;
  12. import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
  13. import org.apache.kafka.common.errors.WakeupException;
  14. import org.apache.kafka.common.serialization.ByteArrayDeserializer;
  15. import org.apache.kafka.common.serialization.StringDeserializer;
  16. import java.time.Duration;
  17. import java.util.Collections;
  18. import java.util.Properties;
  19. import java.util.concurrent.ExecutionException;
  20. @Slf4j
  21. public class KafkaConsumerListenerWrapper implements AutoCloseable {
  22. private final AdminClient adminClient;
  23. private final KafkaConsumer<String, byte[]> consumer;
  24. public KafkaConsumerListenerWrapper(KafkaProcessorConfig config, String username) {
  25.         Properties adminProperties = new Properties();
  26.         adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
  27. this.adminClient = KafkaAdminClient.create(adminProperties);
  28.         Properties properties = new Properties();
  29.         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
  30.         properties.put(ConsumerConfig.GROUP_ID_CONFIG, username);
  31.         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  32.         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
  33. this.consumer = new KafkaConsumer<>(properties);
  34.     }
  35. public void start(String topic, KafkaMessageListener listener) throws Exception {
  36. try {
  37.             TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topic))
  38.                     .values().get(topic).get();
  39.             log.info("topic info is {}", topicDescription);
  40.         } catch (ExecutionException ee) {
  41. if (ee.getCause() instanceof UnknownTopicOrPartitionException) {
  42.                 log.info("topic {} not exist, create it", topic);
  43.                 adminClient.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1)));
  44.             } else {
  45.                 log.error("find topic info {} error", topic, ee);
  46.             }
  47.         } catch (Exception e) {
  48. throw new IllegalStateException("find topic info error", e);
  49.         }
  50.         consumer.subscribe(Collections.singletonList(topic));
  51.         log.info("consumer topic {} start", topic);
  52. new Thread(() -> {
  53. try {
  54. while (true) {
  55.                     ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
  56. for (ConsumerRecord<String, byte[]> record : records) {
  57.                         listener.messageReceived(record);
  58.                     }
  59.                 }
  60.             } catch (WakeupException we) {
  61.                 consumer.close();
  62.             } catch (Exception e) {
  63.                 log.error("consumer topic {} consume error", topic, e);
  64.                 consumer.close();
  65.             }
  66.         }).start();
  67.         Thread.sleep(5_000);
  68.     }
  69.     @Override
  70. public void close() throws Exception {
  71.         log.info("wake up {} consumer", consumer);
  72.         consumer.wakeup();
  73.     }
  74. }
  75.     @Override
  76. protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
  77.                                MqttSubscribePayload subscribePayload) throws Exception {
  78. for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {
  79.             KafkaConsumerListenerWrapper consumer = createConsumer(mqttSessionKey, topicSubscription.topicName());
  80.             subscribe(ctx, consumer, topicSubscription.topicName());
  81.         }
  82.     }
  83. private KafkaConsumerListenerWrapper createConsumer(MqttSessionKey mqttSessionKey, String topic) {
  84.         MqttTopicKey mqttTopicKey = new MqttTopicKey();
  85.         mqttTopicKey.setTopic(topic);
  86.         mqttTopicKey.setMqttSessionKey(mqttSessionKey);
  87.         wLock.lock();
  88. try {
  89.             KafkaConsumerListenerWrapper consumer = consumerMap.get(mqttTopicKey);
  90. if (consumer == null) {
  91.                 consumer = new KafkaConsumerListenerWrapper(kafkaProcessorConfig, mqttSessionKey.getUsername());
  92.                 sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
  93. if (mqttTopicKeys == null) {
  94.                         mqttTopicKeys = new ArrayList<>();
  95.                     }
  96.                     mqttTopicKeys.add(mqttTopicKey);
  97. return mqttTopicKeys;
  98.                 });
  99.                 consumerMap.put(mqttTopicKey, consumer);
  100.             }
  101. return consumer;
  102.         } finally {
  103.             wLock.unlock();
  104.         }
  105.     }
  106. protected void subscribe(ChannelHandlerContext ctx,
  107.                              KafkaConsumerListenerWrapper consumer, String topic) throws Exception {
  108.         BoundInt boundInt = new BoundInt(65535);
  109.         consumer.start(topic, record -> {
  110.             log.info("receive message from kafka, topic {}, partition {}, offset {}",
  111.                     record.topic(), record.partition(), record.offset());
  112.             MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
  113.                     MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), record.value());
  114.             ctx.writeAndFlush(mqttPublishMessage);
  115.         });
  116.     }
复制代码
在上述的代码中,有一个需要通篇注意的点:日志打印的时候,注意要将关键的信息携带,比如:topic、mqtt username、mqtt clientId等,在写demo的时候没有感觉,但是海量请求下需要定位问题的时候,就知道这些信息的关键之处了。
使用BountInt这个简单的工具类来生成从0~65535的packageId,满足协议的要求
pulsar processor实现

pulsar相比kafka来说,更适合作为mqtt协议的代理。原因有如下几点:

  • pulsar支持百万topic、topic实现更轻量
  • pulsar原生支持listener的消费模式,不需要每个消费者启动一个线程
  • pulsar支持share的消费模式,消费模式更灵活
  • pulsar消费者的subscribe可确保成功创建订阅,相比kafka的消费者没有这样的语义保障
  1. protected final ReentrantReadWriteLock.ReadLock rLock;
  2. protected final ReentrantReadWriteLock.WriteLock wLock;
  3. protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;
  4. protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;
  5. protected final Map<MqttTopicKey, Producer<byte[]>> producerMap;
  6. protected final Map<MqttTopicKey, Consumer<byte[]>> consumerMap;
  7. private final PulsarProcessorConfig pulsarProcessorConfig;
  8. private final PulsarAdmin pulsarAdmin;
  9. private final PulsarClient pulsarClient;
  10. public PulsarProcessor(MqttAuth mqttAuth, PulsarProcessorConfig pulsarProcessorConfig) {
  11. super(mqttAuth);
  12.         ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  13.         rLock = lock.readLock();
  14.         wLock = lock.writeLock();
  15. this.sessionProducerMap = new HashMap<>();
  16. this.sessionConsumerMap = new HashMap<>();
  17. this.producerMap = new HashMap<>();
  18. this.consumerMap = new HashMap<>();
  19. this.pulsarProcessorConfig = pulsarProcessorConfig;
  20. try {
  21. this.pulsarAdmin = PulsarAdmin.builder()
  22.                     .serviceHttpUrl(pulsarProcessorConfig.getHttpUrl())
  23.                     .build();
  24. this.pulsarClient = PulsarClient.builder()
  25.                     .serviceUrl(pulsarProcessorConfig.getServiceUrl())
  26.                     .build();
  27.         } catch (Exception e) {
  28. throw new IllegalStateException("Failed to create pulsar client", e);
  29.         }
  30.     }
复制代码
处理publish消息
  1.    @Override
  2. protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
  3.                              MqttPublishMessage msg) throws Exception {
  4.         String topic = msg.variableHeader().topicName();
  5.         Producer<byte[]> producer = getOrCreateProducer(mqttSessionKey, topic);
  6.         int len = msg.payload().readableBytes();
  7.         byte[] messageBytes = new byte[len];
  8.         msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
  9. switch (msg.fixedHeader().qosLevel()) {
  10. case AT_MOST_ONCE -> producer.sendAsync(messageBytes).
  11.                     thenAccept(messageId -> log.info("clientId [{}],"
  12.                                     + " username [{}]. send message to pulsar success messageId: {}",
  13.                             mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId))
  14.                     .exceptionally((e) -> {
  15.                         log.error("clientId [{}], username [{}]. send message to pulsar fail: ",
  16.                                 mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e);
  17. return null;
  18.                     });
  19. case AT_LEAST_ONCE -> {
  20. try {
  21.                     MessageId messageId = producer.send(messageBytes);
  22.                     MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK,
  23. false, MqttQoS.AT_MOST_ONCE, false, 0);
  24.                     MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(fixedHeader,
  25.                             MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()), null);
  26.                     log.info("clientId [{}], username [{}]. send pulsar success. messageId: {}",
  27.                             mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId);
  28.                     ctx.writeAndFlush(pubAckMessage);
  29.                 } catch (PulsarClientException e) {
  30.                     log.error("clientId [{}], username [{}]. send pulsar error: {}",
  31.                             mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e.getMessage());
  32.                 }
  33.             }
  34. case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
  35.                     String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
  36.         }
  37.     }
  38. private Producer<byte[]> getOrCreateProducer(MqttSessionKey mqttSessionKey, String topic) throws Exception {
  39.         MqttTopicKey mqttTopicKey = new MqttTopicKey();
  40.         mqttTopicKey.setTopic(topic);
  41.         mqttTopicKey.setMqttSessionKey(mqttSessionKey);
  42.         rLock.lock();
  43. try {
  44.             Producer<byte[]> producer = producerMap.get(mqttTopicKey);
  45. if (producer != null) {
  46. return producer;
  47.             }
  48.         } finally {
  49.             rLock.unlock();
  50.         }
  51.         wLock.lock();
  52. try {
  53.             Producer<byte[]> producer = producerMap.get(mqttTopicKey);
  54. if (producer == null) {
  55.                 producer = createProducer(topic);
  56.                 sessionProducerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
  57. if (mqttTopicKeys == null) {
  58.                         mqttTopicKeys = new ArrayList<>();
  59.                     }
  60.                     mqttTopicKeys.add(mqttTopicKey);
  61. return mqttTopicKeys;
  62.                 });
  63.                 producerMap.put(mqttTopicKey, producer);
  64.             }
  65. return producer;
  66.         } finally {
  67.             wLock.unlock();
  68.         }
  69.     }
  70. protected Producer<byte[]> createProducer(String topic) throws Exception {
  71. return pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
  72.     }
复制代码
处理subscribe消息
  1.     @Override
  2. protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
  3.                                MqttSubscribePayload subscribePayload) throws Exception {
  4. for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {
  5.             subscribe(ctx, mqttSessionKey, topicSubscription.topicName());
  6.         }
  7.     }
  8. protected void subscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
  9.                              String topic) throws Exception {
  10.         MqttTopicKey mqttTopicKey = new MqttTopicKey();
  11.         mqttTopicKey.setTopic(topic);
  12.         mqttTopicKey.setMqttSessionKey(mqttSessionKey);
  13.         wLock.lock();
  14. try {
  15.             Consumer<byte[]> consumer = consumerMap.get(mqttTopicKey);
  16. if (consumer == null) {
  17.                 consumer = createConsumer(ctx, mqttSessionKey.getUsername(), topic);
  18.                 sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
  19. if (mqttTopicKeys == null) {
  20.                         mqttTopicKeys = new ArrayList<>();
  21.                     }
  22.                     mqttTopicKeys.add(mqttTopicKey);
  23. return mqttTopicKeys;
  24.                 });
  25.                 consumerMap.put(mqttTopicKey, consumer);
  26.             }
  27.         } finally {
  28.             wLock.unlock();
  29.         }
  30.     }
  31. protected Consumer<byte[]> createConsumer(ChannelHandlerContext ctx, String username,
  32.                                               String topic) throws Exception {
  33.         BoundInt boundInt = new BoundInt(65535);
  34. try {
  35.             PartitionedTopicStats partitionedStats = pulsarAdmin.topics().getPartitionedStats(topic, false);
  36.             log.info("topic {} partitioned stats {}", topic, partitionedStats);
  37.         } catch (PulsarAdminException.NotFoundException nfe) {
  38.             log.info("topic {} not found", topic);
  39.             pulsarAdmin.topics().createPartitionedTopic(topic, 1);
  40.         }
  41. return pulsarClient.newConsumer(Schema.BYTES).topic(topic)
  42.                 .messageListener((consumer, msg) -> {
  43.                     log.info("receive message from pulsar, topic {}, message {}", topic, msg.getMessageId());
  44.                     MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
  45.                             MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), msg.getData());
  46.                     ctx.writeAndFlush(mqttPublishMessage);
  47.                 })
  48.                 .subscriptionName(username).subscribe();
  49.     }
复制代码
集成测试

kafka

我们可以通过embedded-kafka-java这个项目来启动用做单元测试的kafka broker。通过如下的group引入依赖
  1. <dependency>
  2. <groupId>io.github.embedded-middleware</groupId>
  3. <artifactId>embedded-kafka-core</artifactId>
  4. <version>0.0.2</version>
  5. <scope>test</scope>
  6. </dependency>
复制代码
我们就可以通过如下的代码启动基于kafka的mqtt broker
  1. @Slf4j
  2. public class MqttKafkaTestUtil {
  3. public static MqttServer setupMqttKafka() throws Exception {
  4.         EmbeddedKafkaServer embeddedKafkaServer = new EmbeddedKafkaServer();
  5. new Thread(() -> {
  6. try {
  7.                 embeddedKafkaServer.start();
  8.             } catch (Exception e) {
  9.                 log.error("kafka broker started exception ", e);
  10.             }
  11.         }).start();
  12.         Thread.sleep(5_000);
  13.         MqttServerConfig mqttServerConfig = new MqttServerConfig();
  14.         mqttServerConfig.setPort(0);
  15.         mqttServerConfig.setProcessorType(ProcessorType.KAFKA);
  16.         KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig();
  17.         kafkaProcessorConfig.setBootstrapServers(String.format("localhost:%d", embeddedKafkaServer.getKafkaPort()));
  18.         mqttServerConfig.setKafkaProcessorConfig(kafkaProcessorConfig);
  19.         MqttServer mqttServer = new MqttServer(mqttServerConfig);
  20. new Thread(() -> {
  21. try {
  22.                 mqttServer.start();
  23.             } catch (Exception e) {
  24.                 log.error("mqsar broker started exception ", e);
  25.             }
  26.         }).start();
  27.         Thread.sleep(5000L);
  28. return mqttServer;
  29.     }
  30. }
复制代码
kafka端到端测试用例,比较简单,通过mqtt client publish一条消息,然后消费出来
  1. @Log4j2
  2. public class MqttKafkaPubSubTest {
  3.     @Test
  4. public void pubSubTest() throws Exception {
  5.         MqttServer mqttServer = MqttKafkaTestUtil.setupMqttKafka();
  6.         String topic = UUID.randomUUID().toString();
  7.         String content = "test-msg";
  8.         String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
  9.         String clientId = UUID.randomUUID().toString();
  10.         MemoryPersistence persistence = new MemoryPersistence();
  11.         MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  12.         MqttConnectOptions connOpts = new MqttConnectOptions();
  13.         connOpts.setUserName(UUID.randomUUID().toString());
  14.         connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
  15.         connOpts.setCleanSession(true);
  16.         log.info("Mqtt connecting to broker");
  17.         sampleClient.connect(connOpts);
  18.         CompletableFuture<String> future = new CompletableFuture<>();
  19.         log.info("Mqtt subscribing");
  20.         sampleClient.subscribe(topic, (s, mqttMessage) -> {
  21.             log.info("messageArrived");
  22.             future.complete(mqttMessage.toString());
  23.         });
  24.         log.info("Mqtt subscribed");
  25.         MqttMessage message = new MqttMessage(content.getBytes());
  26.         message.setQos(1);
  27.         log.info("Mqtt message publishing");
  28.         sampleClient.publish(topic, message);
  29.         log.info("Mqtt message published");
  30.         TimeUnit.SECONDS.sleep(3);
  31.         sampleClient.disconnect();
  32.         String msg = future.get(5, TimeUnit.SECONDS);
  33.         Assertions.assertEquals(content, msg);
  34.     }
  35. }
复制代码
pulsar

我们可以通过embedded-pulsar-java这个项目来启动用做单元测试的pulsar broker。通过如下的group引入依赖
  1. <dependency>
  2. <groupId>io.github.embedded-middleware</groupId>
  3. <artifactId>embedded-pulsar-core</artifactId>
  4. <version>0.0.2</version>
  5. <scope>test</scope>
  6. </dependency>
复制代码
我们就可以通过如下的代码启动基于pulsar的mqtt broker
  1. @Slf4j
  2. public class MqttPulsarTestUtil {
  3. public static MqttServer setupMqttPulsar() throws Exception {
  4.         EmbeddedPulsarServer embeddedPulsarServer = new EmbeddedPulsarServer();
  5.         embeddedPulsarServer.start();
  6.         MqttServerConfig mqttServerConfig = new MqttServerConfig();
  7.         mqttServerConfig.setPort(0);
  8.         mqttServerConfig.setProcessorType(ProcessorType.PULSAR);
  9.         PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig();
  10.         pulsarProcessorConfig.setHttpUrl(String.format("http://localhost:%d", embeddedPulsarServer.getWebPort()));
  11.         pulsarProcessorConfig.setServiceUrl(String.format("pulsar://localhost:%d", embeddedPulsarServer.getTcpPort()));
  12.         mqttServerConfig.setPulsarProcessorConfig(pulsarProcessorConfig);
  13.         MqttServer mqttServer = new MqttServer(mqttServerConfig);
  14. new Thread(() -> {
  15. try {
  16.                 mqttServer.start();
  17.             } catch (Exception e) {
  18.                 log.error("mqsar broker started exception ", e);
  19.             }
  20.         }).start();
  21.         Thread.sleep(5000L);
  22. return mqttServer;
  23.     }
  24. }
复制代码
pulsar端到端测试用例,比较简单,通过mqtt client publish一条消息,然后消费出来
  1. @Log4j2
  2. public class MqttPulsarPubSubTest {
  3.     @Test
  4. public void pubSubTest() throws Exception {
  5.         MqttServer mqttServer = MqttPulsarTestUtil.setupMqttPulsar();
  6.         String topic = UUID.randomUUID().toString();
  7.         String content = "test-msg";
  8.         String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
  9.         String clientId = UUID.randomUUID().toString();
  10.         MemoryPersistence persistence = new MemoryPersistence();
  11.         MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
  12.         MqttConnectOptions connOpts = new MqttConnectOptions();
  13.         connOpts.setUserName(UUID.randomUUID().toString());
  14.         connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
  15.         connOpts.setCleanSession(true);
  16.         log.info("Mqtt connecting to broker");
  17.         sampleClient.connect(connOpts);
  18.         CompletableFuture<String> future = new CompletableFuture<>();
  19.         log.info("Mqtt subscribing");
  20.         sampleClient.subscribe(topic, (s, mqttMessage) -> {
  21.             log.info("messageArrived");
  22.             future.complete(mqttMessage.toString());
  23.         });
  24.         log.info("Mqtt subscribed");
  25.         MqttMessage message = new MqttMessage(content.getBytes());
  26.         message.setQos(1);
  27.         log.info("Mqtt message publishing");
  28.         sampleClient.publish(topic, message);
  29.         log.info("Mqtt message published");
  30.         TimeUnit.SECONDS.sleep(3);
  31.         sampleClient.disconnect();
  32.         String msg = future.get(5, TimeUnit.SECONDS);
  33.         Assertions.assertEquals(content, msg);
  34.     }
  35. }
复制代码
性能优化

这里我们简单描述几个性能优化点,像一些调整线程数、buffer大小这类的参数调整就不在这里赘述了,这些需要具体的性能压测来决定参数的设置。
在linux上使用Epoll网络模型

[code]public class EventLoopUtil { /**     * @return an EventLoopGroup suitable for the current platform */ public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) { if (Epoll.isAvailable()) { return new EpollEventLoopGroup(nThreads, threadFactory);        } else { return new NioEventLoopGroup(nThreads, threadFactory);        }    } public static Class
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

曹旭辉

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表