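The Internet of Things (IoT) is a popular area of software today. Many IoT vendors run their own IoT platform, and one of the core modules of such a platform is the MQTT gateway. This article walks through writing an MQTT gateway step by step. The gateway uses Kafka or Pulsar as its backend storage and supports MQTT connect, disconnect, publish, and subscribe. Technology choices:
- Netty: the most popular network framework for Java
- netty-codec-mqtt: a Netty subproject that provides the MQTT codec
- Pulsar/Kafka: popular message middleware, used as the backend storage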
The core pom dependencies are as follows:
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-mqtt</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-common</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-transport</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar-client-original</artifactId>
- <version>${pulsar.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${kafka.version}</version>
- </dependency>
- <dependency>
- <groupId>org.eclipse.paho</groupId>
- <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
- <version>${mqtt-client.version}</version>
- <scope>test</scope>
- </dependency>
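Software parameter design
Making the listening port configurable is essential even for a demo, because unit tests rely on it. After a unit test finishes, the operating system may not release the port immediately even though the server has been shut down, so tests should bind to a random free port. In Java, a utility class like the one below returns a free port. If no port is configured, we use the default MQTT port 1883.
- package io.github.protocol.mqtt.broker.util;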
- import java.io.IOException;
- import java.io.UncheckedIOException;
- import java.net.ServerSocket;
- public class SocketUtil {
- public static int getFreePort() {
- try (ServerSocket serverSocket = new ServerSocket(0)) {
- return serverSocket.getLocalPort();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- }
- }
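As a usage sketch, a test could probe for a free port and bind a server to it straight away. The class below is a hypothetical, standard-library-only illustration (`FreePortDemo` and `bindOnFreePort` are not part of the original project). Note the small race it leaves open: another process could take the port between the probe and the bind, which is usually acceptable in tests.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical demo class; getFreePort() mirrors SocketUtil.getFreePort() from the article.
class FreePortDemo {

    // Ask the OS for an ephemeral port by binding to port 0, then release it.
    static int getFreePort() {
        try (ServerSocket probe = new ServerSocket(0)) {
            return probe.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Bind a server socket to a freshly probed port and return the port actually used.
    static int bindOnFreePort() {
        int port = getFreePort();
        try (ServerSocket server = new ServerSocket(port)) {
            return server.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bound to port " + bindOnFreePort());
    }
}
```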
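Backend storage configuration
Our MQTT gateway has no reliable storage of its own; it relies on the backend message middleware for persistence. Two backend types are planned: Pulsar and Kafka. The enum is defined as follows:
- public enum ProcessorType {
- KAFKA,
- PULSAR,
- }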
The corresponding KafkaProcessorConfig and PulsarProcessorConfig are simple and only hold the basic connection addresses. Performance tuning or security would add more options here later.
- @Setter
- @Getter
- public class KafkaProcessorConfig {
- private String bootstrapServers = "localhost:9092";
- public KafkaProcessorConfig() {
- }
- }
- @Setter
- @Getter
- public class PulsarProcessorConfig {
- private String httpUrl = "http://localhost:8080";
- private String serviceUrl = "pulsar://localhost:6650";
- public PulsarProcessorConfig() {
- }
- }
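The server code in the next section reads its settings from an MqttServerConfig class that the article does not list. The following is only a guess at its shape, written without Lombok so it compiles on its own. The class name `MqttServerConfigSketch`, the flattened address fields, and the KAFKA default are assumptions; only the default port 1883 and the "port 0 means pick a free port" convention come from the text and the MqttServer constructor.

```java
// Hypothetical sketch of MqttServerConfig; names and defaults are assumptions
// inferred from the getters that MqttServer calls.
class MqttServerConfigSketch {

    enum ProcessorType { KAFKA, PULSAR }

    // 1883 is the default MQTT port; 0 asks the server to pick a random free port.
    private int port = 1883;

    // Which backend to use; defaulting to KAFKA is an assumption.
    private ProcessorType processorType = ProcessorType.KAFKA;

    // Flattened stand-ins for KafkaProcessorConfig and PulsarProcessorConfig.
    private String kafkaBootstrapServers = "localhost:9092";
    private String pulsarServiceUrl = "pulsar://localhost:6650";

    int getPort() { return port; }

    void setPort(int port) { this.port = port; }

    ProcessorType getProcessorType() { return processorType; }

    void setProcessorType(ProcessorType processorType) { this.processorType = processorType; }

    String getKafkaBootstrapServers() { return kafkaBootstrapServers; }

    String getPulsarServiceUrl() { return pulsarServiceUrl; }
}
```

In a unit test, calling `setPort(0)` before constructing the server would then select a random port.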
Starting the Netty MqttServer
We start an MqttServer with Netty and add the MQTT decoder and encoder:
- package io.github.protocol.mqtt.broker;
- import io.github.protocol.mqtt.broker.processor.KafkaProcessor;
- import io.github.protocol.mqtt.broker.processor.KafkaProcessorConfig;
- import io.github.protocol.mqtt.broker.processor.MqttProcessor;
- import io.github.protocol.mqtt.broker.processor.PulsarProcessor;
- import io.github.protocol.mqtt.broker.processor.PulsarProcessorConfig;
- import io.github.protocol.mqtt.broker.util.SocketUtil;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.mqtt.MqttDecoder;
- import io.netty.handler.codec.mqtt.MqttEncoder;
- import io.netty.handler.logging.LogLevel;
- import io.netty.handler.logging.LoggingHandler;
- import lombok.extern.slf4j.Slf4j;
- @Slf4j
- public class MqttServer {
- private final MqttServerConfig mqttServerConfig;
- public MqttServer() {
- this(new MqttServerConfig());
- }
- public MqttServer(MqttServerConfig mqttServerConfig) {
- this.mqttServerConfig = mqttServerConfig;
- if (mqttServerConfig.getPort() == 0) {
- mqttServerConfig.setPort(SocketUtil.getFreePort());
- }
- }
- public void start() throws Exception {
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 100)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline p = ch.pipeline();
- // decoder
- p.addLast(new MqttDecoder());
- p.addLast(MqttEncoder.INSTANCE);
- }
- });
- // Start the server.
- ChannelFuture f = b.bind(mqttServerConfig.getPort()).sync();
- // Wait until the server socket is closed.
- f.channel().closeFuture().sync();
- } finally {
- // Shut down all event loops to terminate all threads.
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
- private MqttProcessor processor(MqttServerConfig config) {
- return switch (config.getProcessorType()) {
- case KAFKA -> new KafkaProcessor(config.getMqttAuth(), config.getKafkaProcessorConfig());
- case PULSAR -> new PulsarProcessor(config.getMqttAuth(), config.getPulsarProcessorConfig());
- };
- }
- public int getPort() {
- return mqttServerConfig.getPort();
- }
- }
MqttServerStarter.java
We write a simple main function to start the MqttServer, which makes debugging easier:
- package io.github.protocol.mqtt.broker;
- public class MqttServerStarter {
- public static void main(String[] args) throws Exception {
- new MqttServer().start();
- }
- }
On the client side we test with the Eclipse Paho MQTT client:
- package io.github.protocol.mqtt;
- import lombok.extern.log4j.Log4j2;
- import org.eclipse.paho.client.mqttv3.MqttClient;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- @Log4j2
- public class MqttClientPublishExample {
- public static void main(String[] args) throws Exception {
- String topic = "MQTT Examples";
- String content = "Message from MqttPublishExample";
- int qos = 2;
- String broker = "tcp://";
- String clientId = "JavaSample";
- MemoryPersistence persistence = new MemoryPersistence();
- try {
- MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
- MqttConnectOptions connOpts = new MqttConnectOptions();
- connOpts.setCleanSession(true);
- log.info("Connecting to broker: {}", broker);
- sampleClient.connect(connOpts);
- log.info("Connected");
- log.info("Publishing message: {}", content);
- MqttMessage message = new MqttMessage(content.getBytes());
- message.setQos(qos);
- sampleClient.publish(topic, message);
- log.info("Message published");
- sampleClient.disconnect();
- log.info("Disconnected");
- System.exit(0);
- } catch (MqttException me) {
- log.error("reason {} msg {}", me.getReasonCode(), me.getMessage(), me);
- }
- }
- }
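We start MqttServer first and then run the MqttClient, and find that the client hangs:
- Connecting to broker: tcp://
Why? A packet capture shows that only the client sent an MQTT CONNECT packet; the server never responded.
So after receiving CONNECT we need to return a CONNACK message. We create an MqttHandler that extends ChannelInboundHandlerAdapter and takes over the messages once MqttDecoder has decoded them. The key methods to override are channelRead, and channelInactive, which releases the resources that must be freed when the connection drops:
- package com.github.shoothzj.mqtt;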
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import lombok.extern.slf4j.Slf4j;
- @Slf4j
- public class MqttHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- super.channelRead(ctx, msg);
- }
- }
Then we add this handler to the Netty pipeline (its chain of responsibility), after the decoder.
We insert our code into the MQTT handler:
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- super.channelRead(ctx, msg);
- if (msg instanceof MqttConnectMessage) {
- handleConnect(ctx, (MqttConnectMessage) msg);
- } else {
- log.error("Unsupported type msg [{}]", msg);
- }
- }
- private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
- log.info("connect msg is [{}]", connectMessage);
- }
The printed connectMessage looks like this:
- [MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=22], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=60], payload=MqttConnectPayload[clientIdentifier=JavaSample, willTopic=null, willMessage=null, userName=null, password=null]]]
An MQTT CONNECT message usually carries information such as the Will QoS, username, and password. Since our client was started without a username or password, these fields are all null. For now we skip validation and simply return a CONNACK message to the client to indicate that the connection succeeded:
- final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
- ctx.channel().writeAndFlush(ackMessage);
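For reference, an MQTT 3.1.1 CONNACK is only four bytes on the wire. The sketch below builds those bytes by hand to show roughly what the encoder should emit for the message above. It is an illustration only: `ConnAckBytes` is a hypothetical class, and in the gateway itself MqttEncoder does this work.

```java
// Hypothetical helper that hand-encodes an MQTT 3.1.1 CONNACK packet.
class ConnAckBytes {

    // Return code 0x00 means "connection accepted" in MQTT 3.1.1.
    static final int CONNECTION_ACCEPTED = 0x00;

    static byte[] encode(boolean sessionPresent, int returnCode) {
        return new byte[] {
            0x20,                                  // packet type CONNACK (2) in the high nibble, flags 0
            0x02,                                  // remaining length: always 2 for an MQTT 3.1.1 CONNACK
            (byte) (sessionPresent ? 0x01 : 0x00), // acknowledge flags, bit 0 = session present
            (byte) returnCode                      // connect return code
        };
    }
}
```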
We run the server and client again and can see that the CONNECT phase now completes and the client moves on to publishing a message. Next we implement the remaining scenarios.
The MqttHandler code at this stage is attached below:
- package com.github.shoothzj.mqtt;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.handler.codec.mqtt.MqttConnAckMessage;
- import io.netty.handler.codec.mqtt.MqttConnectMessage;
- import io.netty.handler.codec.mqtt.MqttConnectPayload;
- import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
- import io.netty.handler.codec.mqtt.MqttFixedHeader;
- import io.netty.handler.codec.mqtt.MqttMessageBuilders;
- import lombok.extern.slf4j.Slf4j;
- import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
- @Slf4j
- public class MqttHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- super.channelRead(ctx, msg);
- if (msg instanceof MqttConnectMessage) {
- handleConnect(ctx, (MqttConnectMessage) msg);
- } else {
- log.error("Unsupported type msg [{}]", msg);
- }
- }
- private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
- log.info("connect msg is [{}]", connectMessage);
- final MqttFixedHeader fixedHeader = connectMessage.fixedHeader();
- final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
- final MqttConnectPayload connectPayload = connectMessage.payload();
- final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
- ctx.channel().writeAndFlush(ackMessage);
- }
- }
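At the moment all the logic lives in MqttHandler, which makes later extension awkward. We extract an MqttProcessor interface to handle the concrete requests, while MqttHandler only determines the type of each MqttMessage and dispatches it. The MqttProcessor interface is designed as follows:
- package io.github.protocol.mqtt.broker.processor;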
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.mqtt.MqttConnAckMessage;
- import io.netty.handler.codec.mqtt.MqttConnectMessage;
- import io.netty.handler.codec.mqtt.MqttMessage;
- import io.netty.handler.codec.mqtt.MqttPubAckMessage;
- import io.netty.handler.codec.mqtt.MqttPublishMessage;
- import io.netty.handler.codec.mqtt.MqttSubAckMessage;
- import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
- import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
- import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
- public interface MqttProcessor {
- void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception;
- void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception;
- void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception;
- void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception;
- void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
- void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
- void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
- void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception;
- void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception;
- void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception;
- void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception;
- void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
- void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
- void processDisconnect(ChannelHandlerContext ctx) throws Exception;
- void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;
- }
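We allow these methods to throw exceptions. When a failure is very hard to handle (for example, a backend storage outage), we drop the MQTT connection and wait for the client to reconnect.
MqttHandler calls MqttProcessor. The relevant MqttHandler code is as follows:
- Preconditions.checkArgument(message instanceof MqttMessage);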
- MqttMessage msg = (MqttMessage) message;
- try {
- if (msg.decoderResult().isFailure()) {
- Throwable cause = msg.decoderResult().cause();
- if (cause instanceof MqttUnacceptableProtocolVersionException) {
- // Unsupported protocol version
- MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
- new MqttFixedHeader(MqttMessageType.CONNACK,
- false, MqttQoS.AT_MOST_ONCE, false, 0),
- new MqttConnAckVariableHeader(
- MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION,
- false), null);
- ctx.writeAndFlush(connAckMessage);
- log.error("connection refused due to invalid protocol, client address [{}]",
- ctx.channel().remoteAddress());
- ctx.close();
- return;
- } else if (cause instanceof MqttIdentifierRejectedException) {
- // ineligible clientId
- MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
- new MqttFixedHeader(MqttMessageType.CONNACK,
- false, MqttQoS.AT_MOST_ONCE, false, 0),
- new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
- false), null);
- ctx.writeAndFlush(connAckMessage);
- log.error("ineligible clientId, client address [{}]", ctx.channel().remoteAddress());
- ctx.close();
- return;
- }
- throw new IllegalStateException(msg.decoderResult().cause().getMessage());
- }
- MqttMessageType messageType = msg.fixedHeader().messageType();
- if (log.isDebugEnabled()) {
- log.debug("Processing MQTT Inbound handler message, type={}", messageType);
- }
- switch (messageType) {
- case CONNECT:
- Preconditions.checkArgument(msg instanceof MqttConnectMessage);
- processor.processConnect(ctx, (MqttConnectMessage) msg);
- break;
- case CONNACK:
- Preconditions.checkArgument(msg instanceof MqttConnAckMessage);
- processor.processConnAck(ctx, (MqttConnAckMessage) msg);
- break;
- case PUBLISH:
- Preconditions.checkArgument(msg instanceof MqttPublishMessage);
- processor.processPublish(ctx, (MqttPublishMessage) msg);
- break;
- case PUBACK:
- Preconditions.checkArgument(msg instanceof MqttPubAckMessage);
- processor.processPubAck(ctx, (MqttPubAckMessage) msg);
- break;
- case PUBREC:
- processor.processPubRec(ctx, msg);
- break;
- case PUBREL:
- processor.processPubRel(ctx, msg);
- break;
- case PUBCOMP:
- processor.processPubComp(ctx, msg);
- break;
- case SUBSCRIBE:
- Preconditions.checkArgument(msg instanceof MqttSubscribeMessage);
- processor.processSubscribe(ctx, (MqttSubscribeMessage) msg);
- break;
- case SUBACK:
- Preconditions.checkArgument(msg instanceof MqttSubAckMessage);
- processor.processSubAck(ctx, (MqttSubAckMessage) msg);
- break;
- case UNSUBSCRIBE:
- Preconditions.checkArgument(msg instanceof MqttUnsubscribeMessage);
- processor.processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
- break;
- case UNSUBACK:
- Preconditions.checkArgument(msg instanceof MqttUnsubAckMessage);
- processor.processUnsubAck(ctx, (MqttUnsubAckMessage) msg);
- break;
- case PINGREQ:
- processor.processPingReq(ctx, msg);
- break;
- case PINGRESP:
- processor.processPingResp(ctx, msg);
- break;
- case DISCONNECT:
- processor.processDisconnect(ctx);
- break;
- case AUTH:
- processor.processAuth(ctx, msg);
- break;
- default:
- throw new UnsupportedOperationException("Unknown MessageType: " + messageType);
- }
- } catch (Throwable ex) {
- ReferenceCountUtil.safeRelease(msg);
- log.error("Exception was caught while processing MQTT message, ", ex);
- ctx.close();
- }
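The code above mainly calls a different MqttProcessor method for each MqttMessage type. Two points are worth noting:
- Some decoding errors are checked up front, so that we fail fast
- Exceptions are caught globally and handled by dropping the connection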
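The two points above can be sketched without Netty. In this hypothetical, standard-library-only example, `Connection` stands in for the Netty channel and `Processor` for MqttProcessor; none of these names come from the original code. A decode failure is rejected before dispatch, and any exception thrown by the processor closes the connection.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Netty-free illustration of "fail fast, then catch everything and disconnect".
class DispatchSketch {

    enum MessageType { CONNECT, PUBLISH, PINGREQ }

    // Stand-in for a network connection (the Netty channel in the real handler).
    static class Connection {
        boolean closed = false;
        final List<String> handled = new ArrayList<>();

        void close() {
            closed = true;
        }
    }

    // Stand-in for MqttProcessor: processing is allowed to throw.
    interface Processor {
        void process(Connection conn, MessageType type) throws Exception;
    }

    static void dispatch(Connection conn, MessageType type, boolean decodeFailed, Processor processor) {
        try {
            // Fail fast: refuse to dispatch a message that did not decode cleanly.
            if (decodeFailed) {
                throw new IllegalStateException("decode failure");
            }
            processor.process(conn, type);
        } catch (Throwable t) {
            // Global catch: any failure drops the connection; the client is expected to reconnect.
            conn.close();
        }
    }
}
```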
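An MQTT session is maintained mainly to keep track of client session information and of the resources the client occupies in the system. Whatever the backend implementation, the MQTT session has to be maintained, so we build an AbstractProcessor to maintain the MqttSession:
- package io.github.protocol.mqtt.broker.processor;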
- import io.github.protocol.mqtt.broker.MqttSessionKey;
- import io.github.protocol.mqtt.broker.auth.MqttAuth;
- import io.github.protocol.mqtt.broker.util.ChannelUtils;
- import io.github.protocol.mqtt.broker.util.MqttMessageUtil;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.mqtt.MqttConnAckMessage;
- import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
- import io.netty.handler.codec.mqtt.MqttConnectMessage;
- import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
- import io.netty.handler.codec.mqtt.MqttFixedHeader;
- import io.netty.handler.codec.mqtt.MqttMessage;
- import io.netty.handler.codec.mqtt.MqttMessageFactory;
- import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
- import io.netty.handler.codec.mqtt.MqttMessageType;
- import io.netty.handler.codec.mqtt.MqttPubAckMessage;
- import io.netty.handler.codec.mqtt.MqttPublishMessage;
- import io.netty.handler.codec.mqtt.MqttQoS;
- import io.netty.handler.codec.mqtt.MqttSubAckMessage;
- import io.netty.handler.codec.mqtt.MqttSubAckPayload;
- import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
- import io.netty.handler.codec.mqtt.MqttSubscribePayload;
- import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
- import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import java.util.stream.IntStream;
- @Slf4j
- public abstract class AbstractProcessor implements MqttProcessor {
- protected final MqttAuth mqttAuth;
- public AbstractProcessor(MqttAuth mqttAuth) {
- this.mqttAuth = mqttAuth;
- }
- @Override
- public void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception {
- String clientId = msg.payload().clientIdentifier();
- String username = msg.payload().userName();
- byte[] pwd = msg.payload().passwordInBytes();
- if (StringUtils.isBlank(clientId) || StringUtils.isBlank(username)) {
- MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
- new MqttFixedHeader(MqttMessageType.CONNACK,
- false, MqttQoS.AT_MOST_ONCE, false, 0),
- new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
- false), null);
- ctx.writeAndFlush(connAckMessage);
- log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress());
- ctx.close();
- return;
- }
- if (!mqttAuth.connAuth(clientId, username, pwd)) {
- MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
- new MqttFixedHeader(MqttMessageType.CONNACK,
- false, MqttQoS.AT_MOST_ONCE, false, 0),
- new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD,
- false), null);
- ctx.writeAndFlush(connAckMessage);
- log.error("auth failed, client address[{}]", ctx.channel().remoteAddress());
- ctx.close();
- return;
- }
- MqttSessionKey mqttSessionKey = new MqttSessionKey();
- mqttSessionKey.setUsername(username);
- mqttSessionKey.setClientId(clientId);
- ChannelUtils.setMqttSession(ctx.channel(), mqttSessionKey);
- log.info("username {} clientId {} remote address {} connected",
- username, clientId, ctx.channel().remoteAddress());
- onConnect(mqttSessionKey);
- MqttConnAckMessage mqttConnectMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
- new MqttFixedHeader(MqttMessageType.CONNACK,
- false, MqttQoS.AT_MOST_ONCE, false, 0),
- new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false),
- null);
- ctx.writeAndFlush(mqttConnectMessage);
- }
- protected void onConnect(MqttSessionKey mqttSessionKey) {
- }
- @Override
- public void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception {
- MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
- if (mqttSession == null) {
- log.error("conn ack, client address {} not authed", ctx.channel().remoteAddress());
- ctx.close();
- }
- }
- @Override
- public void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception {
- MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
- if (mqttSession == null) {
- log.error("publish, client address {} not authed", ctx.channel().remoteAddress());
- ctx.close();
- return;
- }
- if (msg.fixedHeader().qosLevel() == MqttQoS.FAILURE) {
- log.error("failure. clientId {}, username {} ", mqttSession.getClientId(), mqttSession.getUsername());
- return;
- }
- if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
- log.error("does not support QoS2 protocol. clientId {}, username {} ",
- mqttSession.getClientId(), mqttSession.getUsername());
- return;
- }
- onPublish(ctx, mqttSession, msg);
- }
- protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
- MqttPublishMessage msg) throws Exception {
- }
- @Override
- public void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception {
- MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
- if (mqttSession == null) {
- log.error("pub ack, client address {} not authed", ctx.channel().remoteAddress());
- ctx.close();
- }
- }
- @Override
- public void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
- MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
- if (mqttSession == null) {
- log.error("pub rec, client address {} not authed", ctx.channel().remoteAddress());
- ctx.close();
- }
- }
- @Override
- public void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
- MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
- if (mqttSession == null) {
- log.error("pub rel, client address {} not authed", ctx.channel().remoteAddress());
- ctx.close();
- }
- }
- @Override
- public void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
- MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
- if (mqttSession == null) {
- log.error("pub comp, client address {} not authed", ctx.channel().remoteAddress());
- ctx.close();
- }
- }
- @Override
- public void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception {
- MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
- if (mqttSession == null) {
- log.error("sub, client address {} not authed", ctx.channel().remoteAddress());
- ctx.close();
- return;
- }
- onSubscribe(ctx, mqttSession, msg.payload());
- MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK,
- false, MqttQoS.AT_MOST_ONCE, false, 0);
- IntStream intStream = msg.payload().topicSubscriptions().stream().mapToInt(s -> s.qualityOfService().value());
- MqttSubAckPayload payload = new MqttSubAckPayload(intStream.toArray());
- ctx.writeAndFlush(MqttMessageFactory.newMessage(
- fixedHeader,
- MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
- payload));
- }
- protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
- MqttSubscribePayload subscribePayload) throws Exception {
- }
- @Override
- public void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception {
- MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
- if (mqttSession == null) {
- log.error("sub ack, client address {} not authed", ctx.channel().remoteAddress());
- ctx.close();
- }
- }
- @Override
- public void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception {
- MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
- if (mqttSession == null) {
- log.error("unsub, client address {} not authed", ctx.channel().remoteAddress());
- ctx.close();
- }
- }
- @Override
- public void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception {
- MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
- if (mqttSession == null) {
- log.error("unsub ack, client address {} not authed", ctx.channel().remoteAddress());
- ctx.close();
- }
- }
- @Override
- public void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
- ctx.writeAndFlush(MqttMessageUtil.pingResp());
- }
- @Override
- public void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
- MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
- if (mqttSession == null) {
- log.error("ping resp, client address {} not authed", ctx.channel().remoteAddress());
- ctx.close();
- }
- }
- @Override
- public void processDisconnect(ChannelHandlerContext ctx) throws Exception {
- MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
- if (mqttSession == null) {
- log.error("disconnect, client address {} not authed", ctx.channel().remoteAddress());
- }
- onDisconnect(mqttSession);
- }
- protected void onDisconnect(MqttSessionKey mqttSessionKey) {
- }
- @Override
- public void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
- MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
- if (mqttSession == null) {
- log.error("auth, client address {} not authed", ctx.channel().remoteAddress());
- ctx.close();
- }
- }
- }
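As you can see, AbstractProcessor mainly maintains and validates the MqttSessionKey, and rejects the unsupported QoS 2 and FAILURE levels in publish. It also responds to MQTT heartbeat (PINGREQ) requests. As before, onPublish and onSubscribe are allowed to throw exceptions.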
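MqttSessionKey itself is not listed in the article. Because it identifies a session by username and clientId and is used as a map key further on, it needs value-based equals and hashCode. The original may well generate these with Lombok; the plain-Java version below is an assumption-laden sketch (`MqttSessionKeySketch` is a hypothetical name) that shows the required behavior.

```java
import java.util.Objects;

// Hypothetical stand-in for MqttSessionKey: a value object usable as a HashMap key.
class MqttSessionKeySketch {

    private String username;
    private String clientId;

    String getUsername() { return username; }

    void setUsername(String username) { this.username = username; }

    String getClientId() { return clientId; }

    void setClientId(String clientId) { this.clientId = clientId; }

    // Two keys are equal when both username and clientId match.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MqttSessionKeySketch)) {
            return false;
        }
        MqttSessionKeySketch other = (MqttSessionKeySketch) o;
        return Objects.equals(username, other.username) && Objects.equals(clientId, other.clientId);
    }

    // Must be consistent with equals so that map lookups find the same bucket.
    @Override
    public int hashCode() {
        return Objects.hash(username, clientId);
    }

    @Override
    public String toString() {
        return "MqttSessionKey{username=" + username + ", clientId=" + clientId + "}";
    }
}
```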
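The basic idea behind an MQTT gateway built on a message queue is simple: when a publish message arrives, produce a message to the queue; when a client subscribes, pull messages from the queue. It follows that we may need to maintain the mapping between each MQTT topic and its producer and consumer, because the consumers of message middleware such as Kafka and Pulsar are bound to topics. The shared code fragment is as follows:
- protected final ReentrantReadWriteLock.ReadLock rLock;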
- protected final ReentrantReadWriteLock.WriteLock wLock;
- protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;
- protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;
- protected final Map<MqttTopicKey, P> producerMap;
- protected final Map<MqttTopicKey, C> consumerMap;
- public AbstractMqProcessor(MqttAuth mqttAuth) {
- super(mqttAuth);
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- rLock = lock.readLock();
- wLock = lock.writeLock();
- this.sessionProducerMap = new HashMap<>();
- this.sessionConsumerMap = new HashMap<>();
- this.producerMap = new HashMap<>();
- this.consumerMap = new HashMap<>();
- }
- @Override
- protected void onConnect(MqttSessionKey mqttSessionKey) {
- wLock.lock();
- try {
- sessionProducerMap.put(mqttSessionKey, new ArrayList<>());
- sessionConsumerMap.put(mqttSessionKey, new ArrayList<>());
- } finally {
- wLock.unlock();
- }
- }
- @Override
- protected void onDisconnect(MqttSessionKey mqttSessionKey) {
- wLock.lock();
- try {
- // find producers
- List<MqttTopicKey> produceTopicKeys = sessionProducerMap.get(mqttSessionKey);
- if (produceTopicKeys != null) {
- for (MqttTopicKey mqttTopicKey : produceTopicKeys) {
- P producer = producerMap.get(mqttTopicKey);
- if (producer != null) {
- ClosableUtils.close(producer);
- producerMap.remove(mqttTopicKey);
- }
- }
- }
- sessionProducerMap.remove(mqttSessionKey);
- List<MqttTopicKey> consumeTopicKeys = sessionConsumerMap.get(mqttSessionKey);
- if (consumeTopicKeys != null) {
- for (MqttTopicKey mqttTopicKey : consumeTopicKeys) {
- C consumer = consumerMap.get(mqttTopicKey);
- if (consumer != null) {
- ClosableUtils.close(consumer);
- consumerMap.remove(mqttTopicKey);
- }
- }
- }
- sessionConsumerMap.remove(mqttSessionKey);
- } finally {
- wLock.unlock();
- }
- }
- }
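The fragment above shows how the maps are cleaned up on disconnect, but not how entries get into them. One plausible way for a subclass to look up or lazily create a producer or consumer under the same read/write lock is sketched below. `LockedRegistry` and `getOrCreate` are hypothetical names, and the sketch uses only the standard library so it can be run in isolation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

// Hypothetical registry: a plain HashMap guarded by a read/write lock.
class LockedRegistry<K, V> {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<K, V> map = new HashMap<>();

    V getOrCreate(K key, Function<K, V> factory) {
        // Fast path: most calls only need the shared read lock.
        lock.readLock().lock();
        try {
            V existing = map.get(key);
            if (existing != null) {
                return existing;
            }
        } finally {
            lock.readLock().unlock();
        }
        // Slow path: take the exclusive lock and re-check, because another
        // thread may have created the value between the two lock sections.
        lock.writeLock().lock();
        try {
            return map.computeIfAbsent(key, factory);
        } finally {
            lock.writeLock().unlock();
        }
    }

    V remove(K key) {
        lock.writeLock().lock();
        try {
            return map.remove(key);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

The read lock is released before the write lock is taken because ReentrantReadWriteLock does not support upgrading a read lock to a write lock.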
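Kafka processor implementation
Since a Kafka producer is not bound to a topic, the Kafka processor can reuse a single producer. If a single Kafka producer ever reaches its performance limit, it can be extended into a list of producers used in round-robin fashion. Consumers are different: the MQTT protocol may require different behavior for each subscribed topic, so sharing one consumer instance is not suitable. We start the KafkaProducer in the constructor:
- private final KafkaProcessorConfig kafkaProcessorConfig;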
- private final KafkaProducer<String, ByteBuffer> producer;
- public KafkaProcessor(MqttAuth mqttAuth, KafkaProcessorConfig kafkaProcessorConfig) {
- super(mqttAuth);
- this.kafkaProcessorConfig = kafkaProcessorConfig;
- this.producer = createProducer();
- }
- protected KafkaProducer<String, ByteBuffer> createProducer() {
- Properties properties = new Properties();
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProcessorConfig.getBootstrapServers());
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class);
- return new KafkaProducer<>(properties);
- }
Handling MqttPublish messages: an MqttPublish message contains the following key parameters:
- MqttQoS mqttQoS = publishMessage.fixedHeader().qosLevel();
- String topic = publishMessage.variableHeader().topicName();
- ByteBuffer byteBuffer = publishMessage.payload().nioBuffer();
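Where:
- qos is the quality-of-service level of the message: 0 gives no delivery guarantee (at most once), 1 means at least once, and 2 means exactly once. Only QoS 0 and QoS 1 are currently supported
- topicName is the name of the topic
- ByteBuffer is the content of the message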
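To make the QoS field concrete: in the first byte of a PUBLISH packet, the high nibble is the packet type (3), bit 3 is the DUP flag, bits 2 and 1 carry the QoS level, and bit 0 is RETAIN. The decoder already extracts these for us; the sketch below only illustrates the bit layout, and `PublishFlags` is a hypothetical helper name.

```java
// Hypothetical helper that reads the flags from the first byte of an MQTT fixed header.
class PublishFlags {

    // Packet type lives in the high nibble; PUBLISH is type 3.
    static int packetType(int firstByte) {
        return (firstByte >> 4) & 0x0F;
    }

    // QoS occupies bits 2..1: 0 = at most once, 1 = at least once, 2 = exactly once.
    static int qos(int firstByte) {
        return (firstByte >> 1) & 0x03;
    }

    // Bit 3 marks a redelivered (duplicate) PUBLISH.
    static boolean dup(int firstByte) {
        return (firstByte & 0x08) != 0;
    }

    // Bit 0 asks the broker to retain the message for future subscribers.
    static boolean retain(int firstByte) {
        return (firstByte & 0x01) != 0;
    }
}
```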
We send the message according to its topic and QoS:
- String topic = msg.variableHeader().topicName();
- ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(topic, msg.payload().nioBuffer());
- switch (msg.fixedHeader().qosLevel()) {
- case AT_MOST_ONCE -> producer.send(record, (metadata, exception) -> {
- if (exception != null) {
- log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, exception);
- return;
- }
- log.debug("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
- mqttSessionKey, metadata.topic(), metadata.partition(), metadata.offset());
- });
- case AT_LEAST_ONCE -> {
- try {
- RecordMetadata recordMetadata = producer.send(record).get();
- log.info("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
- mqttSessionKey, recordMetadata.topic(),
- recordMetadata.partition(), recordMetadata.offset());
- ctx.writeAndFlush(MqttMessageUtil.pubAckMessage(msg.variableHeader().packetId()));
- } catch (Exception e) {
- log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, e);
- }
- }
- case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
- String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
- }
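One design note on the QoS 1 branch: `producer.send(record).get()` blocks the calling thread until Kafka acknowledges the write, and if that thread is a Netty event loop, other connections served by the same loop wait as well. A possible alternative is to send the PUBACK from a completion callback instead. The sketch below is not the article's implementation; it uses a CompletableFuture as a stand-in for the asynchronous send result, and `AsyncAckSketch`, `pubAckSender`, and `onFailure` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.IntConsumer;

// Hypothetical illustration: acknowledge a QoS 1 publish without blocking the caller.
class AsyncAckSketch {

    // sendResult completes when the backend write finishes (normally or exceptionally).
    static void publishQos1(CompletableFuture<Void> sendResult, int packetId,
                            IntConsumer pubAckSender, Runnable onFailure) {
        sendResult.whenComplete((ignored, error) -> {
            if (error == null) {
                // Only acknowledge once the message is safely stored.
                pubAckSender.accept(packetId);
            } else {
                // No PUBACK on failure, so the client can redeliver the message.
                onFailure.run();
            }
        });
    }
}
```

With the real Kafka client, the same idea would map onto the callback overload of `send` that the QoS 0 branch already uses.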
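Handling subscribe messages: for now we simply take the subscribed topic, create it if necessary, and consume from it. The consumption pattern recommended by the native Kafka client looks like this:
- while (true) {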
- ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
- for (ConsumerRecord<String, byte[]> record : records) {
- // do logic
- }
- }
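A loop like this occupies its calling thread for as long as it runs. A common way to turn such a pull loop into a push-style listener is to run it on a dedicated thread and hand each record to a callback. The standard-library-only sketch below shows the pattern with a BlockingQueue standing in for the Kafka consumer; `PollToListenerSketch` is a hypothetical class and is not part of the original code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration of wrapping a blocking poll loop in a listener model.
class PollToListenerSketch implements AutoCloseable {

    private final BlockingQueue<String> source;
    private volatile boolean running = true;
    private Thread worker;

    PollToListenerSketch(BlockingQueue<String> source) {
        this.source = source;
    }

    // Start a background thread that polls the source and pushes records to the listener.
    void start(Consumer<String> listener) {
        worker = new Thread(() -> {
            try {
                while (running) {
                    // Poll with a timeout so the loop can notice a shutdown request.
                    String record = source.poll(100, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        listener.accept(record);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Ask the loop to stop and wait briefly for the worker to exit.
    @Override
    public void close() throws InterruptedException {
        running = false;
        if (worker != null) {
            worker.interrupt();
            worker.join(1000);
        }
    }
}
```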
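Because this loop blocks the thread it runs on, we need to consume on a separate thread. We write a KafkaConsumerListenerWrapper that turns the poll loop into an asynchronous listener model:
- package io.github.protocol.mqtt.broker.processor;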
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.admin.AdminClient;
- import org.apache.kafka.clients.admin.AdminClientConfig;
- import org.apache.kafka.clients.admin.KafkaAdminClient;
- import org.apache.kafka.clients.admin.NewTopic;
- import org.apache.kafka.clients.admin.TopicDescription;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
- import org.apache.kafka.common.errors.WakeupException;
- import org.apache.kafka.common.serialization.ByteArrayDeserializer;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import java.time.Duration;
- import java.util.Collections;
- import java.util.Properties;
- import java.util.concurrent.ExecutionException;
- @Slf4j
- public class KafkaConsumerListenerWrapper implements AutoCloseable {
- private final AdminClient adminClient;
- private final KafkaConsumer<String, byte[]> consumer;
- public KafkaConsumerListenerWrapper(KafkaProcessorConfig config, String username) {
- Properties adminProperties = new Properties();
- adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
- this.adminClient = KafkaAdminClient.create(adminProperties);
- Properties properties = new Properties();
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, username);
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
- this.consumer = new KafkaConsumer<>(properties);
- }
- public void start(String topic, KafkaMessageListener listener) throws Exception {
- try {
- TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topic))
- .values().get(topic).get();
- log.info("topic info is {}", topicDescription);
- } catch (ExecutionException ee) {
- if (ee.getCause() instanceof UnknownTopicOrPartitionException) {
- log.info("topic {} not exist, create it", topic);
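- // createTopics is asynchronous; block until the topic really exists before subscribing
- adminClient.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))).all().get();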
- } else {
- log.error("find topic info {} error", topic, ee);
- }
- } catch (Exception e) {
- throw new IllegalStateException("find topic info error", e);
- }
- consumer.subscribe(Collections.singletonList(topic));
- log.info("consumer topic {} start", topic);
- new Thread(() -> {
- try {
- while (true) {
- ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
- for (ConsumerRecord<String, byte[]> record : records) {
- listener.messageReceived(record);
- }
- }
- } catch (WakeupException we) {
- consumer.close();
- } catch (Exception e) {
- log.error("consumer topic {} consume error", topic, e);
- consumer.close();
- }
- }).start();
- Thread.sleep(5_000);
- }
- @Override
- public void close() throws Exception {
- log.info("wake up {} consumer", consumer);
- consumer.wakeup();
- // Release the admin client's network resources as well
- adminClient.close();
- }
- }
- @Override
- protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
- MqttSubscribePayload subscribePayload) throws Exception {
- for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {
- KafkaConsumerListenerWrapper consumer = createConsumer(mqttSessionKey, topicSubscription.topicName());
- subscribe(ctx, consumer, topicSubscription.topicName());
- }
- }
- private KafkaConsumerListenerWrapper createConsumer(MqttSessionKey mqttSessionKey, String topic) {
- MqttTopicKey mqttTopicKey = new MqttTopicKey();
- mqttTopicKey.setTopic(topic);
- mqttTopicKey.setMqttSessionKey(mqttSessionKey);
- wLock.lock();
- try {
- KafkaConsumerListenerWrapper consumer = consumerMap.get(mqttTopicKey);
- if (consumer == null) {
- consumer = new KafkaConsumerListenerWrapper(kafkaProcessorConfig, mqttSessionKey.getUsername());
- sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
- if (mqttTopicKeys == null) {
- mqttTopicKeys = new ArrayList<>();
- }
- mqttTopicKeys.add(mqttTopicKey);
- return mqttTopicKeys;
- });
- consumerMap.put(mqttTopicKey, consumer);
- }
- return consumer;
- } finally {
- wLock.unlock();
- }
- }
- protected void subscribe(ChannelHandlerContext ctx,
- KafkaConsumerListenerWrapper consumer, String topic) throws Exception {
- BoundInt boundInt = new BoundInt(65535);
- consumer.start(topic, record -> {
- log.info("receive message from kafka, topic {}, partition {}, offset {}",
- record.topic(), record.partition(), record.offset());
- MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
- MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), record.value());
- ctx.writeAndFlush(mqttPublishMessage);
- });
- }
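The core idea of the wrapper above, turning a blocking poll loop into listener callbacks on a dedicated thread, can be shown without any Kafka dependency. The following is a minimal sketch under stated assumptions: a `BlockingQueue` stands in for the Kafka consumer, and `PollingListenerWrapper` is a hypothetical name that does not exist in the project.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Sketch only: a BlockingQueue stands in for the Kafka consumer,
 * and PollingListenerWrapper is a hypothetical name, not part of the project.
 */
public class PollingListenerWrapper<T> implements AutoCloseable {
    private final BlockingQueue<T> source;
    private volatile boolean running = true;
    private Thread worker;

    public PollingListenerWrapper(BlockingQueue<T> source) {
        this.source = source;
    }

    /** Starts a dedicated thread that polls the source and hands each item to the listener. */
    public synchronized void start(Consumer<T> listener) {
        worker = new Thread(() -> {
            try {
                while (running) {
                    // Bounded wait, playing the role of consumer.poll(Duration.ofSeconds(1))
                    T item = source.poll(100, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        listener.accept(item);
                    }
                }
            } catch (InterruptedException e) {
                // Interrupted by close(): restore the flag and let the thread exit
                Thread.currentThread().interrupt();
            }
        }, "polling-listener");
        worker.setDaemon(true);
        worker.start();
    }

    /** Stops the loop and waits briefly for the worker to exit. */
    @Override
    public synchronized void close() throws InterruptedException {
        running = false;
        if (worker != null) {
            worker.interrupt();
            worker.join(1000);
        }
    }
}
```

With the real Kafka consumer, `consumer.wakeup()` takes the place of the interrupt here, because `KafkaConsumer` is not thread-safe and `wakeup()` is the method intended to be called from another thread.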
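One point in the code above deserves attention throughout: when logging, always include the key identifiers, such as the topic, the MQTT username, and the MQTT clientId. This seems unimportant while writing a demo, but once you have to troubleshoot a problem under heavy request volume, you will see how much this information matters.
Pulsar processor implementation
- Pulsar supports millions of topics, and its topics are more lightweight
- Pulsar natively supports listener-based consumption, so there is no need to start a thread for each consumer
- Pulsar supports the Shared subscription mode, which makes consumption more flexible
- Pulsar's consumer subscribe call guarantees that the subscription has been created once it returns; the Kafka consumer offers no such semantic guarantee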
- protected final ReentrantReadWriteLock.ReadLock rLock;
- protected final ReentrantReadWriteLock.WriteLock wLock;
- protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;
- protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;
- protected final Map<MqttTopicKey, Producer<byte[]>> producerMap;
- protected final Map<MqttTopicKey, Consumer<byte[]>> consumerMap;
- private final PulsarProcessorConfig pulsarProcessorConfig;
- private final PulsarAdmin pulsarAdmin;
- private final PulsarClient pulsarClient;
- public PulsarProcessor(MqttAuth mqttAuth, PulsarProcessorConfig pulsarProcessorConfig) {
- super(mqttAuth);
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- rLock = lock.readLock();
- wLock = lock.writeLock();
- this.sessionProducerMap = new HashMap<>();
- this.sessionConsumerMap = new HashMap<>();
- this.producerMap = new HashMap<>();
- this.consumerMap = new HashMap<>();
- this.pulsarProcessorConfig = pulsarProcessorConfig;
- try {
- this.pulsarAdmin = PulsarAdmin.builder()
- .serviceHttpUrl(pulsarProcessorConfig.getHttpUrl())
- .build();
- this.pulsarClient = PulsarClient.builder()
- .serviceUrl(pulsarProcessorConfig.getServiceUrl())
- .build();
- } catch (Exception e) {
- throw new IllegalStateException("Failed to create pulsar client", e);
- }
- }
Handling publish messages
- @Override
- protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
- MqttPublishMessage msg) throws Exception {
- String topic = msg.variableHeader().topicName();
- Producer<byte[]> producer = getOrCreateProducer(mqttSessionKey, topic);
- int len = msg.payload().readableBytes();
- byte[] messageBytes = new byte[len];
- msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);
- switch (msg.fixedHeader().qosLevel()) {
- case AT_MOST_ONCE -> producer.sendAsync(messageBytes).
- thenAccept(messageId -> log.info("clientId [{}],"
- + " username [{}]. send message to pulsar success messageId: {}",
- mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId))
- .exceptionally((e) -> {
- log.error("clientId [{}], username [{}]. send message to pulsar fail: ",
- mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e);
- return null;
- });
- case AT_LEAST_ONCE -> {
- try {
- MessageId messageId = producer.send(messageBytes);
- MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK,
- false, MqttQoS.AT_MOST_ONCE, false, 0);
- MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(fixedHeader,
- MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()), null);
- log.info("clientId [{}], username [{}]. send pulsar success. messageId: {}",
- mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId);
- ctx.writeAndFlush(pubAckMessage);
- } catch (PulsarClientException e) {
- // Pass the exception itself so the stack trace is logged, not only the message
- log.error("clientId [{}], username [{}]. send pulsar error: ",
- mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e);
- }
- }
- case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
- String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
- }
- }
- private Producer<byte[]> getOrCreateProducer(MqttSessionKey mqttSessionKey, String topic) throws Exception {
- MqttTopicKey mqttTopicKey = new MqttTopicKey();
- mqttTopicKey.setTopic(topic);
- mqttTopicKey.setMqttSessionKey(mqttSessionKey);
- rLock.lock();
- try {
- Producer<byte[]> producer = producerMap.get(mqttTopicKey);
- if (producer != null) {
- return producer;
- }
- } finally {
- rLock.unlock();
- }
- wLock.lock();
- try {
- Producer<byte[]> producer = producerMap.get(mqttTopicKey);
- if (producer == null) {
- producer = createProducer(topic);
- sessionProducerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
- if (mqttTopicKeys == null) {
- mqttTopicKeys = new ArrayList<>();
- }
- mqttTopicKeys.add(mqttTopicKey);
- return mqttTopicKeys;
- });
- producerMap.put(mqttTopicKey, producer);
- }
- return producer;
- } finally {
- wLock.unlock();
- }
- }
- protected Producer<byte[]> createProducer(String topic) throws Exception {
- return pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
- }
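The getOrCreateProducer method above follows a "check under the read lock, then re-check under the write lock" pattern. A minimal, dependency-free sketch of that pattern is shown below; `LockedCache` is a hypothetical name, and the factory function stands in for `createProducer`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

/** Sketch only: LockedCache is a hypothetical name; the factory stands in for createProducer. */
public class LockedCache<K, V> {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<K, V> map = new HashMap<>();

    public V getOrCreate(K key, Function<K, V> factory) {
        // Fast path: many threads may hold the read lock at the same time.
        lock.readLock().lock();
        try {
            V existing = map.get(key);
            if (existing != null) {
                return existing;
            }
        } finally {
            lock.readLock().unlock();
        }
        // Slow path: take the write lock and check again, because another
        // thread may have created the value between the two lock sections.
        lock.writeLock().lock();
        try {
            V value = map.get(key);
            if (value == null) {
                value = factory.apply(key);
                map.put(key, value);
            }
            return value;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

The key type must implement `equals` and `hashCode` consistently, which matters for a composite key such as MqttTopicKey. For a single map, `ConcurrentHashMap.computeIfAbsent` would be a simpler option; an explicit lock is a reasonable choice here because the processor updates two maps together.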
Handling subscribe messages
- @Override
- protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
- MqttSubscribePayload subscribePayload) throws Exception {
- for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {
- subscribe(ctx, mqttSessionKey, topicSubscription.topicName());
- }
- }
- protected void subscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
- String topic) throws Exception {
- MqttTopicKey mqttTopicKey = new MqttTopicKey();
- mqttTopicKey.setTopic(topic);
- mqttTopicKey.setMqttSessionKey(mqttSessionKey);
- wLock.lock();
- try {
- Consumer<byte[]> consumer = consumerMap.get(mqttTopicKey);
- if (consumer == null) {
- consumer = createConsumer(ctx, mqttSessionKey.getUsername(), topic);
- sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {
- if (mqttTopicKeys == null) {
- mqttTopicKeys = new ArrayList<>();
- }
- mqttTopicKeys.add(mqttTopicKey);
- return mqttTopicKeys;
- });
- consumerMap.put(mqttTopicKey, consumer);
- }
- } finally {
- wLock.unlock();
- }
- }
- protected Consumer<byte[]> createConsumer(ChannelHandlerContext ctx, String username,
- String topic) throws Exception {
- BoundInt boundInt = new BoundInt(65535);
- try {
- PartitionedTopicStats partitionedStats = pulsarAdmin.topics().getPartitionedStats(topic, false);
- log.info("topic {} partitioned stats {}", topic, partitionedStats);
- } catch (PulsarAdminException.NotFoundException nfe) {
- log.info("topic {} not found", topic);
- pulsarAdmin.topics().createPartitionedTopic(topic, 1);
- }
- return pulsarClient.newConsumer(Schema.BYTES).topic(topic)
- .messageListener((consumer, msg) -> {
- log.info("receive message from pulsar, topic {}, message {}", topic, msg.getMessageId());
- MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(
- MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), msg.getData());
- ctx.writeAndFlush(mqttPublishMessage);
- })
- .subscriptionName(username).subscribe();
- }
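The code above draws the packet identifier of each outgoing PUBLISH from `new BoundInt(65535)`. MQTT packet identifiers are 16-bit values and must be non-zero for QoS 1 messages, so the counter has to wrap around within 1..65535. The sketch below shows one way such a counter could look; `PacketIdCounter` is a hypothetical stand-in, and the project's own BoundInt may behave differently.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch only: PacketIdCounter is a hypothetical stand-in; the project's BoundInt may differ. */
public class PacketIdCounter {
    private static final int MAX_PACKET_ID = 65535;
    private final AtomicInteger current = new AtomicInteger(0);

    /** Returns the next id in 1..65535, wrapping back to 1 after 65535. */
    public int next() {
        return current.updateAndGet(v -> v >= MAX_PACKET_ID ? 1 : v + 1);
    }
}
```

An atomic counter is used because listener callbacks may be invoked from a thread other than the one that created the counter.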
Integration tests
We can use the embedded-kafka-java project to start a Kafka broker for unit tests. Add the dependency with the following coordinates:
- <dependency>
- <groupId>io.github.embedded-middleware</groupId>
- <artifactId>embedded-kafka-core</artifactId>
- <version>0.0.2</version>
- <scope>test</scope>
- </dependency>
With that in place, the following code starts a Kafka-backed MQTT broker:
- @Slf4j
- public class MqttKafkaTestUtil {
- public static MqttServer setupMqttKafka() throws Exception {
- EmbeddedKafkaServer embeddedKafkaServer = new EmbeddedKafkaServer();
- new Thread(() -> {
- try {
- embeddedKafkaServer.start();
- } catch (Exception e) {
- log.error("kafka broker started exception ", e);
- }
- }).start();
- Thread.sleep(5_000);
- MqttServerConfig mqttServerConfig = new MqttServerConfig();
- mqttServerConfig.setPort(0);
- mqttServerConfig.setProcessorType(ProcessorType.KAFKA);
- KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig();
- kafkaProcessorConfig.setBootstrapServers(String.format("localhost:%d", embeddedKafkaServer.getKafkaPort()));
- mqttServerConfig.setKafkaProcessorConfig(kafkaProcessorConfig);
- MqttServer mqttServer = new MqttServer(mqttServerConfig);
- new Thread(() -> {
- try {
- mqttServer.start();
- } catch (Exception e) {
- log.error("mqsar broker started exception ", e);
- }
- }).start();
- Thread.sleep(5000L);
- return mqttServer;
- }
- }
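The fixed `Thread.sleep` calls in the helper above are simple but either waste time or, on a slow machine, may not wait long enough. One possible alternative is to poll the listening port until it accepts connections. The sketch below uses only the JDK; `PortWaiter` is a hypothetical helper, not part of the project or of the embedded-kafka library.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Sketch only: PortWaiter is a hypothetical helper, not part of the project. */
public class PortWaiter {
    /** Polls until a TCP connect to host:port succeeds or the timeout elapses. */
    public static boolean waitForPort(String host, int port, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 200);
                return true;
            } catch (IOException e) {
                // Nothing is listening yet; retry shortly.
                Thread.sleep(100);
            }
        }
        return false;
    }
}
```

An open port only shows that the server socket is bound; it does not prove that the broker has finished initializing, so a short extra wait or a protocol-level check may still be needed.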
The Kafka end-to-end test case is simple: publish a message through an MQTT client, then consume it.
- @Log4j2
- public class MqttKafkaPubSubTest {
- @Test
- public void pubSubTest() throws Exception {
- MqttServer mqttServer = MqttKafkaTestUtil.setupMqttKafka();
- String topic = UUID.randomUUID().toString();
- String content = "test-msg";
- String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
- String clientId = UUID.randomUUID().toString();
- MemoryPersistence persistence = new MemoryPersistence();
- MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
- MqttConnectOptions connOpts = new MqttConnectOptions();
- connOpts.setUserName(UUID.randomUUID().toString());
- connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
- connOpts.setCleanSession(true);
- log.info("Mqtt connecting to broker");
- sampleClient.connect(connOpts);
- CompletableFuture<String> future = new CompletableFuture<>();
- log.info("Mqtt subscribing");
- sampleClient.subscribe(topic, (s, mqttMessage) -> {
- log.info("messageArrived");
- future.complete(mqttMessage.toString());
- });
- log.info("Mqtt subscribed");
- MqttMessage message = new MqttMessage(content.getBytes());
- message.setQos(1);
- log.info("Mqtt message publishing");
- sampleClient.publish(topic, message);
- log.info("Mqtt message published");
- TimeUnit.SECONDS.sleep(3);
- sampleClient.disconnect();
- String msg = future.get(5, TimeUnit.SECONDS);
- Assertions.assertEquals(content, msg);
- }
- }
Pulsar
We can use the embedded-pulsar-java project to start a Pulsar broker for unit tests. Add the dependency with the following coordinates:
- <dependency>
- <groupId>io.github.embedded-middleware</groupId>
- <artifactId>embedded-pulsar-core</artifactId>
- <version>0.0.2</version>
- <scope>test</scope>
- </dependency>
With that in place, the following code starts a Pulsar-backed MQTT broker:
- @Slf4j
- public class MqttPulsarTestUtil {
- public static MqttServer setupMqttPulsar() throws Exception {
- EmbeddedPulsarServer embeddedPulsarServer = new EmbeddedPulsarServer();
- embeddedPulsarServer.start();
- MqttServerConfig mqttServerConfig = new MqttServerConfig();
- mqttServerConfig.setPort(0);
- mqttServerConfig.setProcessorType(ProcessorType.PULSAR);
- PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig();
- pulsarProcessorConfig.setHttpUrl(String.format("http://localhost:%d", embeddedPulsarServer.getWebPort()));
- pulsarProcessorConfig.setServiceUrl(String.format("pulsar://localhost:%d", embeddedPulsarServer.getTcpPort()));
- mqttServerConfig.setPulsarProcessorConfig(pulsarProcessorConfig);
- MqttServer mqttServer = new MqttServer(mqttServerConfig);
- new Thread(() -> {
- try {
- mqttServer.start();
- } catch (Exception e) {
- log.error("mqsar broker started exception ", e);
- }
- }).start();
- Thread.sleep(5000L);
- return mqttServer;
- }
- }
The Pulsar end-to-end test case is simple: publish a message through an MQTT client, then consume it.
- @Log4j2
- public class MqttPulsarPubSubTest {
- @Test
- public void pubSubTest() throws Exception {
- MqttServer mqttServer = MqttPulsarTestUtil.setupMqttPulsar();
- String topic = UUID.randomUUID().toString();
- String content = "test-msg";
- String broker = String.format("tcp://localhost:%d", mqttServer.getPort());
- String clientId = UUID.randomUUID().toString();
- MemoryPersistence persistence = new MemoryPersistence();
- MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
- MqttConnectOptions connOpts = new MqttConnectOptions();
- connOpts.setUserName(UUID.randomUUID().toString());
- connOpts.setPassword(UUID.randomUUID().toString().toCharArray());
- connOpts.setCleanSession(true);
- log.info("Mqtt connecting to broker");
- sampleClient.connect(connOpts);
- CompletableFuture<String> future = new CompletableFuture<>();
- log.info("Mqtt subscribing");
- sampleClient.subscribe(topic, (s, mqttMessage) -> {
- log.info("messageArrived");
- future.complete(mqttMessage.toString());
- });
- log.info("Mqtt subscribed");
- MqttMessage message = new MqttMessage(content.getBytes());
- message.setQos(1);
- log.info("Mqtt message publishing");
- sampleClient.publish(topic, message);
- log.info("Mqtt message published");
- TimeUnit.SECONDS.sleep(3);
- sampleClient.disconnect();
- String msg = future.get(5, TimeUnit.SECONDS);
- Assertions.assertEquals(content, msg);
- }
- }
Performance optimization
- public class EventLoopUtil {
- /**
- * @return an EventLoopGroup suitable for the current platform
- */
- public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
- if (Epoll.isAvailable()) {
- return new EpollEventLoopGroup(nThreads, threadFactory);
- } else {
- return new NioEventLoopGroup(nThreads, threadFactory);
- }
- }
- public static Class