ToB企服应用市场:ToB评测及商务社交产业平台

标题: 【RocketMQ】消息的存储 [打印本页]

作者: 反转基因福娃    时间: 2022-8-21 13:49
标题: 【RocketMQ】消息的存储
Broker对消息的处理

BrokerController初始化的过程中,调用registerProcessor方法注册了处理器,在注册处理器的代码中可以看到创建了处理消息发送的处理器对象SendMessageProcessor,然后将其注册到远程服务中:
  1. public class BrokerController {
  2.     // 初始化
  3.     public boolean initialize() throws CloneNotSupportedException {
  4.         // ...
  5.         // 注册处理器
  6.         this.registerProcessor();
  7.         // ...
  8.     }
  9.   
  10.     // 注册处理器
  11.     public void registerProcessor() {
  12.         /**
  13.          * 发送消息处理器
  14.          */
  15.         SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
  16.         // ...
  17.         // 注册消息发送处理器
  18.         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
  19.         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
  20.         // 省略其他注册...
  21.     }
  22. }
复制代码
在Broker收到生产者的发送消息请求时,会进入到SendMessageProcessor的processRequest方法中处理请求,然后又会调用asyncProcessRequest异步处理消息,然后从请求中解析请求头数据,并判断是否是批量发送消息的请求,如果是批量发送消息调用asyncSendBatchMessage方法处理,否则调用asyncSendMessage方法处理单个消息:
  1. public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
  2.     // 处理请求
  3.     @Override
  4.     public RemotingCommand processRequest(ChannelHandlerContext ctx,
  5.                                           RemotingCommand request) throws RemotingCommandException {
  6.         RemotingCommand response = null;
  7.         try {
  8.             // 处理请求
  9.             response = asyncProcessRequest(ctx, request).get();
  10.         } catch (InterruptedException | ExecutionException e) {
  11.             log.error("process SendMessage error, request : " + request.toString(), e);
  12.         }
  13.         return response;
  14.     }
  15.   
  16.     // 异步处理请求
  17.     public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
  18.                                                                   RemotingCommand request) throws RemotingCommandException {
  19.         final SendMessageContext mqtraceContext;
  20.         switch (request.getCode()) {
  21.             case RequestCode.CONSUMER_SEND_MSG_BACK:
  22.                 return this.asyncConsumerSendMsgBack(ctx, request);
  23.             default:
  24.                 // 解析请求头
  25.                 SendMessageRequestHeader requestHeader = parseRequestHeader(request);
  26.                 // ...
  27.                 if (requestHeader.isBatch()) {
  28.                     // 批量消息发送处理
  29.                     return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
  30.                 } else {
  31.                     // 单个消息发送处理
  32.                     return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
  33.                 }
  34.         }
  35.     }
  36.   
  37.     // 单个消息发送处理
  38.     private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
  39.                                                                 SendMessageContext mqtraceContext,
  40.                                                                 SendMessageRequestHeader requestHeader) {
  41.         // ...
  42.         CompletableFuture<PutMessageResult> putMessageResult = null;
  43.         String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
  44.         // 是否使用事务
  45.         if (transFlag != null && Boolean.parseBoolean(transFlag)) {
  46.             if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
  47.                 response.setCode(ResponseCode.NO_PERMISSION);
  48.                 response.setRemark(
  49.                         "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
  50.                                 + "] sending transaction message is forbidden");
  51.                 return CompletableFuture.completedFuture(response);
  52.             }
  53.             // 事务处理
  54.             putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
  55.         } else {
  56.             // 消息持久化
  57.             putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
  58.         }
  59.         return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
  60.     }
  61. }
复制代码
以单个消息的发送处理方法asyncSendMessage为例看一下消息的接收过程:
  1. public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
  2.     // 单个消息发送处理
  3.     private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
  4.                                                                 SendMessageContext mqtraceContext,
  5.                                                                 SendMessageRequestHeader requestHeader) {
  6.         // ...
  7.         // 创建MessageExtBrokerInner对象,之后使用这个对象来操纵消息
  8.         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
  9.         // 设置主题
  10.         msgInner.setTopic(requestHeader.getTopic());
  11.         // 设置消息所在的队列ID
  12.         msgInner.setQueueId(queueIdInt);
  13.         if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
  14.             return CompletableFuture.completedFuture(response);
  15.         }
  16.         // 设置消息内容
  17.         msgInner.setBody(body);
  18.         msgInner.setFlag(requestHeader.getFlag());
  19.         // 设置属性
  20.         Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
  21.         MessageAccessor.setProperties(msgInner, origProps);
  22.         // 设置发送消息时间
  23.         msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
  24.         // 设置发送消息的主机地址
  25.         msgInner.setBornHost(ctx.channel().remoteAddress());
  26.         // 设置存储消息的主机地址
  27.         msgInner.setStoreHost(this.getStoreHost());
  28.         msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
  29.         String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
  30.         // 属性中添加集群名称
  31.         MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
  32.         // 如果属性中包含PROPERTY_WAIT_STORE_MSG_OK
  33.         if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
  34.             String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
  35.             // 设置消息属性
  36.             msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
  37.             origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
  38.         } else {
  39.             msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
  40.         }
  41.         CompletableFuture<PutMessageResult> putMessageResult = null;
  42.         String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
  43.         // 是否使用事务
  44.         if (transFlag != null && Boolean.parseBoolean(transFlag)) {
  45.             if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
  46.                 response.setCode(ResponseCode.NO_PERMISSION);
  47.                 response.setRemark(
  48.                         "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
  49.                                 + "] sending transaction message is forbidden");
  50.                 return CompletableFuture.completedFuture(response);
  51.             }
  52.             // 事务处理
  53.             putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
  54.         } else {
  55.             // 消息写入
  56.             putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
  57.         }
  58.         // 返回消息持久化结果
  59.         return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
  60.     }
  61. }
复制代码
MessageStore是一个接口,在BrokerController的初始化方法中可以看到,具体使用的是DefaultMessageStore:
  1. public class BrokerController {
  2.     private MessageStore messageStore;
  3.     public boolean initialize() throws CloneNotSupportedException {
  4.         boolean result = this.topicConfigManager.load();
  5.         // ...
  6.         if (result) {
  7.             try {
  8.                 // 创建DefaultMessageStore
  9.                 this.messageStore =
  10.                     new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
  11.                         this.brokerConfig);
  12.                 // ...
  13.             } catch (IOException e) {
  14.                 result = false;
  15.                 log.error("Failed to initialize", e);
  16.             }
  17.     }
  18.          
  19.     // 获取MessageStore
  20.     public MessageStore getMessageStore() {
  21.         return messageStore;
  22.     }
  23. }
复制代码
消息存储

DefaultMessageStore中有一个CommitLog类型的成员变量,在DefaultMessageStore中的构造函数中可以看到,如果启用了Dleger,使用的是DLedgerCommitLog,DLedgerCommitLog是CommitLog的子类,如果未启用Dleger,就使用CommitLog自己(接下来会以CommitLog为例)。
在DefaultMessageStore的asyncPutMessage方法中,首先进行了一系列的合法性校验,校验通过后会调用CommitLog的asyncPutMessage进行消息写入:
  1. public class DefaultMessageStore implements MessageStore {
  2.   
  3.    private final CommitLog commitLog; // CommitLog
  4.   
  5.    public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
  6.         final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
  7.         // ...
  8.         // 如果启用了Dleger
  9.         if (messageStoreConfig.isEnableDLegerCommitLog()) {
  10.             // 使用DLedgerCommitLog
  11.             this.commitLog = new DLedgerCommitLog(this);
  12.         } else {
  13.             // 否则使用CommitLog
  14.             this.commitLog = new CommitLog(this);
  15.         }
  16.         // ...
  17.     }
  18.    
  19.     @Override
  20.     public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
  21.         // 校验存储状态
  22.         PutMessageStatus checkStoreStatus = this.checkStoreStatus();
  23.         if (checkStoreStatus != PutMessageStatus.PUT_OK) {
  24.             return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
  25.         }
  26.         // 校验消息合法性
  27.         PutMessageStatus msgCheckStatus = this.checkMessage(msg);
  28.         if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
  29.             return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
  30.         }
  31.         // 进行一系列校验
  32.         PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);
  33.         if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) {
  34.             return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));
  35.         }
  36.         long beginTime = this.getSystemClock().now();
  37.         // 调用CommitLog的asyncPutMessage方法写入消息
  38.         CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
  39.         putResultFuture.thenAccept((result) -> {
  40.             long elapsedTime = this.getSystemClock().now() - beginTime;
  41.             if (elapsedTime > 500) {
  42.                 log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
  43.             }
  44.             this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
  45.             if (null == result || !result.isOk()) {
  46.                 this.storeStatsService.getPutMessageFailedTimes().add(1);
  47.             }
  48.         });
  49.         return putResultFuture;
  50.     }
  51.    
  52. }
复制代码
合法性校验

Broker存储检查

checkStoreStatus主要对Broker是否可以写入消息进行检查,包含以下几个方面:
  1.    private PutMessageStatus checkStoreStatus() {
  2.         // 是否处于停止状态
  3.         if (this.shutdown) {
  4.             log.warn("message store has shutdown, so putMessage is forbidden");
  5.             return PutMessageStatus.SERVICE_NOT_AVAILABLE;
  6.         }
  7.         // 是否SLAVE角色
  8.         if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
  9.             long value = this.printTimes.getAndIncrement();
  10.             if ((value % 50000) == 0) {
  11.                 log.warn("broke role is slave, so putMessage is forbidden");
  12.             }
  13.             return PutMessageStatus.SERVICE_NOT_AVAILABLE;
  14.         }
  15.         // 是否可写
  16.         if (!this.runningFlags.isWriteable()) {
  17.             long value = this.printTimes.getAndIncrement();
  18.             if ((value % 50000) == 0) {
  19.                 log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
  20.                     "the broker's disk is full, write to logic queue error, write to index file error, etc");
  21.             }
  22.             return PutMessageStatus.SERVICE_NOT_AVAILABLE;
  23.         } else {
  24.             this.printTimes.set(0);
  25.         }
  26.         // 操作系统是否处于PAGECACHE繁忙状态
  27.         if (this.isOSPageCacheBusy()) {
  28.             return PutMessageStatus.OS_PAGECACHE_BUSY;
  29.         }
  30.         return PutMessageStatus.PUT_OK;
  31.     }
复制代码
消息长度检查

checkMessage方法主要是对主题的长度校验和消息属性的长度校验:
  1.   private PutMessageStatus checkMessage(MessageExtBrokerInner msg) {
  2.         // 如果主题的长度大于最大值
  3.         if (msg.getTopic().length() > Byte.MAX_VALUE) {
  4.             log.warn("putMessage message topic length too long " + msg.getTopic().length());
  5.             return PutMessageStatus.MESSAGE_ILLEGAL;
  6.         }
  7.         // 如果消息属性长度大于最大值
  8.         if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
  9.             log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
  10.             return PutMessageStatus.MESSAGE_ILLEGAL;
  11.         }
  12.         return PutMessageStatus.PUT_OK;
  13.     }
复制代码
checkLmqMessage

checkLmqMessage主要判断在开启LMQ(Light Message Queue)时是否超过了最大消费数量:
  1.   private PutMessageStatus checkLmqMessage(MessageExtBrokerInner msg) {
  2.         // 如果消息属性不为空、存在PROPERTY_INNER_MULTI_DISPATCH属性、并且超过了最大消费数量
  3.         if (msg.getProperties() != null
  4.             && StringUtils.isNotBlank(msg.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))
  5.             && this.isLmqConsumeQueueNumExceeded()) {
  6.             return PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED;
  7.         }
  8.         return PutMessageStatus.PUT_OK;
  9.    }
  10.    private boolean isLmqConsumeQueueNumExceeded() {
  11.         // 开启了LMQ && 开启了多个队列分发 && 消费数量大于了限定值
  12.         if (this.getMessageStoreConfig().isEnableLmq() && this.getMessageStoreConfig().isEnableMultiDispatch()
  13.             && this.lmqConsumeQueueNum.get() > this.messageStoreConfig.getMaxLmqConsumeQueueNum()) {
  14.             return true;
  15.         }
  16.         return false;
  17.     }
复制代码
消息写入

对消息进行校验完毕之后,调用了CommitLog的asyncPutMessage进行消息写入,为了简单起见,这里我们先不考虑事务,处理流程如下:
  1. public class CommitLog {
  2.     // 所有mappedFile集合
  3.     protected final MappedFileQueue mappedFileQueue;
  4.    
  5.     // ThreadLocal
  6.     private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
  7.     // 写入消息的回调函数
  8.     private final AppendMessageCallback appendMessageCallback;
  9.     public CommitLog(final DefaultMessageStore defaultMessageStore) { // 构造函数
  10.         //...
  11.         // 创建回调函数
  12.         this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
  13.         //...
  14.     }
  15.   
  16.     public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
  17.         // 设置存储时间
  18.         msg.setStoreTimestamp(System.currentTimeMillis());
  19.         // 设置消息的CRC值
  20.         msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
  21.         // 写入结果
  22.         AppendMessageResult result = null;
  23.         // 获取存储统计服务
  24.         StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
  25.         // 获取主题
  26.         String topic = msg.getTopic();
  27.         // 获取事务类型
  28.         final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
  29.         if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
  30.                 || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
  31.             // 省略事务相关处理
  32.         }
  33.         // 获取发送消息的主机地址
  34.         InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
  35.         if (bornSocketAddress.getAddress() instanceof Inet6Address) { // 如果是IPV6
  36.             msg.setBornHostV6Flag(); // 设置IPV6标识
  37.         }
  38.         // 获取存储消息的主机地址
  39.         InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
  40.         if (storeSocketAddress.getAddress() instanceof Inet6Address) {
  41.             msg.setStoreHostAddressV6Flag(); // 设置IPV6标识
  42.         }
  43.         // 获取当前线程绑定的PutMessageThreadLocal对象
  44.         PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
  45.         // 调用encode方法对消息进行编码,并写入buffer
  46.         PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
  47.         if (encodeResult != null) {
  48.             return CompletableFuture.completedFuture(encodeResult);
  49.         }
  50.         // 将存储编码消息的buffer设置到msg中
  51.         msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
  52.         // 创建PutMessageContext
  53.         PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
  54.         long elapsedTimeInLock = 0;
  55.         MappedFile unlockMappedFile = null;
  56.         // 加锁
  57.         putMessageLock.lock();
  58.         try {
  59.             // 获取上一次写入的文件
  60.             MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
  61.             // 获取系统时间戳
  62.             long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
  63.             this.beginTimeInLock = beginLockTimestamp;
  64.             // 再次更新存储时间戳,保证全局顺序
  65.             msg.setStoreTimestamp(beginLockTimestamp);
  66.             // 如果mapppedFile为空或者已满,说明是第一次写入消息还没有创建文件或者上一次写入的文件已满,需要新建一个文件
  67.             if (null == mappedFile || mappedFile.isFull()) {
  68.                 // 使用偏移量0创建一个新的文件
  69.                 mappedFile = this.mappedFileQueue.getLastMappedFile(0);
  70.             }
  71.             // 如果依旧为空
  72.             if (null == mappedFile) {
  73.                 // 提示错误
  74.                 log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
  75.                 return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
  76.             }
  77.             // 写入消息
  78.             result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
  79.             // ...
  80.             elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
  81.         } finally {
  82.             beginTimeInLock = 0;
  83.             putMessageLock.unlock();
  84.         }
  85.         // ...
  86.         PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
  87.         // 统计相关
  88.         storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
  89.         storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
  90.         // 执行刷盘
  91.         CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
  92.         CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
  93.         return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
  94.             if (flushStatus != PutMessageStatus.PUT_OK) {
  95.                 putMessageResult.setPutMessageStatus(flushStatus);
  96.             }
  97.             if (replicaStatus != PutMessageStatus.PUT_OK) {
  98.                 putMessageResult.setPutMessageStatus(replicaStatus);
  99.             }
  100.             // 返回结果
  101.             return putMessageResult;
  102.         });
  103.     }
  104. }
复制代码
写入内存Buffer

编码消息

MessageExtEncoder是CommitLog的一个内部类,它被CommitLog的另外一个内部类PutMessageThreadLocal所引用,ThreadLocal一般用于多线程环境下,为每个线程创建自己的副本变量,从而互不影响,PutMessageThreadLocal在构造函数中对MessageExtEncoder进行了实例化,并指定了创建缓冲区的大小:
  1. public class CommitLog {
  2.    
  3.     // ThreadLocal
  4.     private final ThreadLocal<PutMessageThreadLocal> putMessageThreadLocal;
  5.    
  6.     // 添加消息的ThreadLocal对象
  7.     static class PutMessageThreadLocal {
  8.         private MessageExtEncoder encoder; // 引用MessageExtEncoder
  9.         private StringBuilder keyBuilder;
  10.         PutMessageThreadLocal(int size) {
  11.             // 创建MessageExtEncoder,size用来指定分配内存的大小
  12.             encoder = new MessageExtEncoder(size);
  13.             keyBuilder = new StringBuilder();
  14.         }
  15.         // ...
  16.     }
  17. }
复制代码
MessageExtEncoder中使用了ByteBuffer作为消息内容存放的缓冲区,上面可知缓冲区的大小是在PutMessageThreadLocal的构造函数中指定的,MessageExtEncoder的encode方法中对消息进了编码并将数据写入分配的缓冲区
  1. public class CommitLog {
  2.    
  3.     // MessageExtEncoder
  4.     public static class MessageExtEncoder {
  5.         // 字节缓冲区,存储消息内容的buffer
  6.         private final ByteBuffer encoderBuffer;
  7.         
  8.         MessageExtEncoder(final int size) {
  9.             // 分配内存
  10.             this.encoderBuffer = ByteBuffer.allocateDirect(size);
  11.             this.maxMessageSize = size;
  12.         }
  13.         // 对消息进行编码并写入buffer
  14.         protected PutMessageResult encode(MessageExtBrokerInner msgInner) {
  15.             // 消息属性数据
  16.             final byte[] propertiesData =
  17.                     msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
  18.             // 属性数据长度
  19.             final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
  20.             // 校验长度是否超过最大值
  21.             if (propertiesLength > Short.MAX_VALUE) {
  22.                 log.warn("putMessage message properties length too long. length={}", propertiesData.length);
  23.                 return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
  24.             }
  25.             // 获取主题数据
  26.             final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
  27.             final int topicLength = topicData.length;// 主题数据长度
  28.             // 获取消息体内容长度
  29.             final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
  30.             // 总消息内容长度
  31.             final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);
  32.             // 是否超过最大长度限制
  33.             if (msgLen > this.maxMessageSize) {
  34.                 CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
  35.                         + ", maxMessageSize: " + this.maxMessageSize);
  36.                 return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
  37.             }
  38.             // 初始化
  39.             this.resetByteBuffer(encoderBuffer, msgLen);
  40.             // 1 写入消息长度
  41.             this.encoderBuffer.putInt(msgLen);
  42.             // 2 写入魔数
  43.             this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
  44.             // 3 写入消息体CRC校验和
  45.             this.encoderBuffer.putInt(msgInner.getBodyCRC());
  46.             // 4 写入队列ID
  47.             this.encoderBuffer.putInt(msgInner.getQueueId());
  48.             // 5 写入标识
  49.             this.encoderBuffer.putInt(msgInner.getFlag());
  50.             // 6 队列的偏移量, 稍后写入
  51.             this.encoderBuffer.putLong(0);
  52.             // 7 文件的物理偏移量, 稍后写入
  53.             this.encoderBuffer.putLong(0);
  54.             // 8 写入系统标识
  55.             this.encoderBuffer.putInt(msgInner.getSysFlag());
  56.             // 9 写入发送消息的时间戳
  57.             this.encoderBuffer.putLong(msgInner.getBornTimestamp());
  58.             // 10 写入发送消息的主机地址
  59.             socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
  60.             // 11 写入存储时间戳
  61.             this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
  62.             // 12 写入存储消息的主机地址
  63.             socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
  64.             // 13 RECONSUMETIMES
  65.             this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
  66.             // 14 Prepared Transaction Offset
  67.             this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
  68.             // 15 写入消息体长度
  69.             this.encoderBuffer.putInt(bodyLength);
  70.             if (bodyLength > 0)
  71.                 this.encoderBuffer.put(msgInner.getBody());// 写入消息内容
  72.             // 16 写入主题长度
  73.             this.encoderBuffer.put((byte) topicLength);
  74.             // 写入主题
  75.             this.encoderBuffer.put(topicData);
  76.             // 17 写入属性长度
  77.             this.encoderBuffer.putShort((short) propertiesLength);
  78.             if (propertiesLength > 0)
  79.                 this.encoderBuffer.put(propertiesData); // 写入属性数据
  80.             encoderBuffer.flip();
  81.             return null;
  82.         }
  83.     }
  84. }
复制代码
写入内存映射文件

前面提到MappedFile可以看做是每一个Commitlog文件的映射,里面记录了文件的大小以及数据已经写入的位置,还有两个字节缓冲区ByteBuffer和MappedByteBuffer,它们的继承关系如下:

ByteBuffer:字节缓冲区,用于在内存中分配空间,可以在JVM堆中分配内存(HeapByteBuffer),也可以在堆外分配内存(DirectByteBuffer)。
MappedByteBuffer:是ByteBuffer的子类,它是将磁盘的文件内容映射到虚拟地址空间,通过虚拟地址访问物理内存中映射的文件内容,也叫文件映射,可以减少数据的拷贝。
MappedFile提供了两种方式来进行内容的写入,对应不同的init方法:
第一种通过ByteBuffer分配缓冲区并将内容写入缓冲区,并且使用了池化技术对内存进行管理,需要时进行申请,使用完毕后回收,类似于数据库连接池。
第二种是通过MappedByteBuffer,对CommitLog进行文件映射,然后进行消息写入。
  1. public class MappedFile extends ReferenceResource {
  2.     // 记录文件的写入位置
  3.     protected final AtomicInteger wrotePosition = new AtomicInteger(0);
  4.     // 文件大小
  5.     protected int fileSize;
  6.     // 字节buffer
  7.     protected ByteBuffer writeBuffer = null;
  8.     // 文件映射
  9.     private MappedByteBuffer mappedByteBuffer;
  10.     // 池化技术,类似线程池,只不过池中存放的是申请的内存
  11.     protected TransientStorePool transientStorePool = null;
  12.     // 初始化
  13.     public void init(final String fileName, final int fileSize,
  14.         final TransientStorePool transientStorePool) throws IOException {
  15.         init(fileName, fileSize);
  16.         // 使用池化技术,从池中获取一块内存
  17.         this.writeBuffer = transientStorePool.borrowBuffer();
  18.         this.transientStorePool = transientStorePool;
  19.     }
  20.    
  21.     // 初始化
  22.     private void init(final String fileName, final int fileSize) throws IOException {
  23.         // ...
  24.         try {
  25.             // 获取文件
  26.             this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
  27.             // 进行文件映射
  28.             this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
  29.             TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
  30.             TOTAL_MAPPED_FILES.incrementAndGet();
  31.             ok = true;
  32.         } catch (FileNotFoundException e) {
  33.             // ...
  34.         } catch (IOException e) {
  35.             // ...
  36.         } finally {
  37.             if (!ok && this.fileChannel != null) {
  38.                 this.fileChannel.close();
  39.             }
  40.         }
  41.     }
  42. }
复制代码
经过之前的步骤,消息内容已经写入到内存缓冲区中,并且也知道准备进行写入的CommitLog对应的映射文件,接下来就可以调用MappedFile的appendMessagesInner方法将内存中的内容写入映射文件,处理逻辑如下:
  1. public class MappedFile extends ReferenceResource {
  2.     // 记录文件的写入位置
  3.     protected final AtomicInteger wrotePosition = new AtomicInteger(0);
  4.     // 文件大小
  5.     protected int fileSize;
  6.     // 字节buffer
  7.     protected ByteBuffer writeBuffer = null;
  8.     // 文件映射
  9.     private MappedByteBuffer mappedByteBuffer;
  10.     // 写入消息
  11.     public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb,
  12.             PutMessageContext putMessageContext) {
  13.         // 调用appendMessagesInner
  14.         return appendMessagesInner(msg, cb, putMessageContext);
  15.     }
  16.    
  17.     public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
  18.             PutMessageContext putMessageContext) {
  19.         assert messageExt != null;
  20.         assert cb != null;
  21.         // 获取写入位置
  22.         int currentPos = this.wrotePosition.get();
  23.         // 如果写指针小于文件大小
  24.         if (currentPos < this.fileSize) {
  25.             // 如果writeBuffer不为空,使用writeBuffer的slice方法创建共享内存区,否则使用mappedByteBuffer
  26.             ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
  27.             // 设置共享内存区的写入位置
  28.             byteBuffer.position(currentPos);
  29.             AppendMessageResult result;
  30.             if (messageExt instanceof MessageExtBrokerInner) { // 单个消息处理
  31.                 // 通过共享内存区byteBuffer写入数据
  32.                 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
  33.                         (MessageExtBrokerInner) messageExt, putMessageContext);
  34.             } else if (messageExt instanceof MessageExtBatch) { // 批量消息
  35.                 // 通过共享内存区byteBuffer写入数据
  36.                 result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
  37.                         (MessageExtBatch) messageExt, putMessageContext);
  38.             } else {
  39.                 return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
  40.             }
  41.             // 更新MappedFile的写入位置
  42.             this.wrotePosition.addAndGet(result.getWroteBytes());
  43.             this.storeTimestamp = result.getStoreTimestamp();
  44.             return result;
  45.         }
  46.         log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
  47.         return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
  48.     }
  49. }
复制代码
进入到DefaultAppendMessageCallback的doAppend方法中,首先来看方法的入参:
方法的处理逻辑如下:
消息写入结果
  1. public class CommitLog {
  2.     class DefaultAppendMessageCallback implements AppendMessageCallback {
  3.         // 预留空间大小,8个字节
  4.         private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
  5.         public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
  6.             final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
  7.             // 计算写入位置物理偏移量:文件起始位置 + 准备写入位置的偏移量
  8.             long wroteOffset = fileFromOffset + byteBuffer.position();
  9.             Supplier<String> msgIdSupplier = () -> {
  10.                 int sysflag = msgInner.getSysFlag();
  11.                 int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
  12.                 ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
  13.                 MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
  14.                 msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
  15.                 msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
  16.                 return UtilAll.bytes2string(msgIdBuffer.array());
  17.             };
  18.             // 获取消息队列信息
  19.             String key = putMessageContext.getTopicQueueTableKey();
  20.             // 从主题队列路由表中获取队列偏移量
  21.             Long queueOffset = CommitLog.this.topicQueueTable.get(key);
  22.             // 如果偏移量为空
  23.             if (null == queueOffset) {
  24.                 queueOffset = 0L; // 初始化为0
  25.                 // 添加到路由表中
  26.                 CommitLog.this.topicQueueTable.put(key, queueOffset);
  27.             }
  28.             boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);
  29.             if (!multiDispatchWrapResult) {
  30.                 return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
  31.             }
  32.             // 如果开启事务需要特殊处理
  33.             final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
  34.             // ...
  35.             // 获取之前已经写入到buffer的消息数据
  36.             ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
  37.             // 获取数据长度
  38.             final int msgLen = preEncodeBuffer.getInt(0);
  39.             // 校验是否有足够的空间写入数据,如果消息长度 + 预留空间大小 大于最大值
  40.             if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
  41.                 this.msgStoreItemMemory.clear();
  42.                 // 1 设置文件大小
  43.                 this.msgStoreItemMemory.putInt(maxBlank);
  44.                 // 2 写入魔数
  45.                 this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
  46.                 // 开始时间
  47.                 final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
  48.                 // 将文件大小和魔数写入buffer
  49.                 byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
  50.                 // 返回写入结果,由于剩余空间不足以写入消息内容,这里返回类型为END_OF_FILE
  51.                 return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
  52.                         maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
  53.                         msgIdSupplier, msgInner.getStoreTimestamp(),
  54.                         queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
  55.             }
  56.             // 计算队列偏移量的位置
  57.             int pos = 4 + 4 + 4 + 4 + 4;
  58.             // 6 写入队列偏移量
  59.             preEncodeBuffer.putLong(pos, queueOffset);
  60.             pos += 8;
  61.             // 7 写入物理偏移量
  62.             preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
  63.             int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
  64.             // 8 系统标识, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
  65.             pos += 8 + 4 + 8 + ipLen; // 计算存储时间戳的写入位置
  66.             // 更新新存储时间戳
  67.             preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
  68.             final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
  69.             // 将preEncodeBuffer的数据写入byteBuffer
  70.             byteBuffer.put(preEncodeBuffer);
  71.             // 清空buffer
  72.             msgInner.setEncodedBuff(null);
  73.             // 设置返回结果
  74.             AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
  75.                 msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
  76.             switch (tranType) {
  77.                 case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
  78.                 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
  79.                     break;
  80.                 case MessageSysFlag.TRANSACTION_NOT_TYPE:
  81.                 case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
  82.                     // The next update ConsumeQueue information
  83.                     CommitLog.this.topicQueueTable.put(key, ++queueOffset);
  84.                     CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
  85.                     break;
  86.                 default:
  87.                     break;
  88.             }
  89.             return result;
  90.         }
  91.     }
  92. }
复制代码
刷盘

由于篇幅原因,刷盘机制将另写一篇文章。
总结

参考
丁威、周继锋《RocketMQ技术内幕》
https://github.com/apache/rocketmq/blob/develop/docs/cn/Example_LMQ.md
RocketMQ版本:4.9.3

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4