【RocketMQ】消息的消费

打印 上一主题 下一主题

主题 874|帖子 874|积分 2622

上一讲【RocketMQ】消息的拉取
消息消费

当RocketMQ进行消息消费的时候,是通过ConsumeMessageConcurrentlyService的submitConsumeRequest方法,将消息提交到线程池中进行消费,具体的处理逻辑如下:

  • 如果本次消息的个数小于等于批量消费的大小consumeBatchSize,构建消费请求ConsumeRequest,直接提交到线程池中进行消费即可
  • 如果本次消息的个数大于批量消费的大小consumeBatchSize,说明需要分批进行提交,每次构建consumeBatchSize个消息提交到线程池中进行消费
  • 如果出现拒绝提交的异常,调用submitConsumeRequestLater方法延迟进行提交
RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求将消费任务提交到线程池处理即可,否则需要分批进行提交。
  1. public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
  2.     @Override
  3.     public void submitConsumeRequest(
  4.         final List<MessageExt> msgs,
  5.         final ProcessQueue processQueue,
  6.         final MessageQueue messageQueue,
  7.         final boolean dispatchToConsume) {
  8.         final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
  9.         // 如果消息的个数小于等于批量消费的大小
  10.         if (msgs.size() <= consumeBatchSize) {
  11.             // 构建消费请求
  12.             ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
  13.             try {
  14.                 // 加入到消费线程池中
  15.                 this.consumeExecutor.submit(consumeRequest);
  16.             } catch (RejectedExecutionException e) {
  17.                 this.submitConsumeRequestLater(consumeRequest);
  18.             }
  19.         } else {
  20.             // 遍历消息
  21.             for (int total = 0; total < msgs.size(); ) {
  22.                 // 创建消息列表,大小为consumeBatchSize,用于批量提交使用
  23.                 List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
  24.                 for (int i = 0; i < consumeBatchSize; i++, total++) {
  25.                     if (total < msgs.size()) {
  26.                         // 加入到消息列表中
  27.                         msgThis.add(msgs.get(total));
  28.                     } else {
  29.                         break;
  30.                     }
  31.                 }
  32.                 // 创建ConsumeRequest
  33.                 ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
  34.                 try {
  35.                     // 加入到消费线程池中
  36.                     this.consumeExecutor.submit(consumeRequest);
  37.                 } catch (RejectedExecutionException e) {
  38.                     for (; total < msgs.size(); total++) {
  39.                         msgThis.add(msgs.get(total));
  40.                     }
  41.                     // 如果出现拒绝提交异常,延迟进行提交
  42.                     this.submitConsumeRequestLater(consumeRequest);
  43.                 }
  44.             }
  45.         }
  46.     }
  47. }
复制代码
处理消费结果

一、设置ackIndex
ackIndex的值用来判断失败消息的个数,在processConsumeResult方法中根据消费结果状态进行判断,对ackIndex的值进行设置,前面可知消费结果状态有以下两种:

  • CONSUME_SUCCESS:消息消费成功,此时ackIndex设置为消息大小 - 1,表示消息都消费成功。
  • RECONSUME_LATER:消息消费失败,返回延迟消费状态,此时ackIndex置为-1,表示消息都消费失败。
二、处理消费失败的消息
广播模式
广播模式下,如果消息消费失败,只将失败的消息打印出来不做其他处理。
集群模式
开启for循环,初始值为i = ackIndex + 1,结束条件为i < consumeRequest.getMsgs().size(),上面可知ackIndex有两种情况:

  • 消费成功:ackIndex值为消息大小-1,此时ackIndex + 1的值等于消息的个数大小,不满足for循环的执行条件,相当于消息都消费成功,不需要进行失败的消息处理。
  • 延迟消费:ackIndex值为-1,此时ackIndex+1为0,满足for循环的执行条件,从第一条消息开始遍历到最后一条消息,调用sendMessageBack方法向Broker发送CONSUMER_SEND_MSG_BACK消息,如果发送成功Broker会根据延迟等级,放入不同的延迟队列中,到达延迟时间后,消费者将会重新进行拉取,如果发送失败,加入到失败消息列表中,稍后重新提交消费任务进行处理。
三、移除消息,更新拉取偏移量
以上步骤处理完毕后,首先调用removeMessage从处理队列中移除消息并返回拉取消息的偏移量,然后调用updateOffset更新拉取偏移量。
  1. public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
  2.     class ConsumeRequest implements Runnable {
  3.         private final List<MessageExt> msgs;
  4.         private final ProcessQueue processQueue; // 处理队列
  5.         private final MessageQueue messageQueue; // 消息队列
  6.       
  7.         @Override
  8.         public void run() {
  9.             // 如果处理队列已被删除
  10.             if (this.processQueue.isDropped()) {
  11.                 log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
  12.                 return;
  13.             }
  14.             // 获取消息监听器
  15.             MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
  16.             ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
  17.             ConsumeConcurrentlyStatus status = null;
  18.             // 重置消息重试主题名称
  19.             defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
  20.             ConsumeMessageContext consumeMessageContext = null;
  21.             // 如果设置了钩子函数
  22.             if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  23.                 // ...
  24. // 执行钩子函数            
  25.               ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
  26.             }
  27.             long beginTimestamp = System.currentTimeMillis();
  28.             boolean hasException = false;
  29.             ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
  30.             try {
  31.                 if (msgs != null && !msgs.isEmpty()) {
  32.                     for (MessageExt msg : msgs) {
  33.                         // 设置消费开始时间戳
  34.                         MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
  35.                     }
  36.                 }
  37.                 // 通过消息监听器的consumeMessage进行消息消费,并返回消费结果状态
  38.                 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
  39.             } catch (Throwable e) {
  40.                 log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
  41.                     RemotingHelper.exceptionSimpleDesc(e),
  42.                     ConsumeMessageConcurrentlyService.this.consumerGroup,
  43.                     msgs,
  44.                     messageQueue), e);
  45.                 hasException = true;
  46.             }
  47.             // 计算消费时长
  48.             long consumeRT = System.currentTimeMillis() - beginTimestamp;
  49.             if (null == status) {
  50.                 if (hasException) {
  51.                     // 出现异常
  52.                     returnType = ConsumeReturnType.EXCEPTION;
  53.                 } else {
  54.                     // 返回NULL
  55.                     returnType = ConsumeReturnType.RETURNNULL;
  56.                 }
  57.             } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { // 判断超时
  58.                 returnType = ConsumeReturnType.TIME_OUT; // 返回类型置为超时
  59.             } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { // 如果延迟消费
  60.                 returnType = ConsumeReturnType.FAILED; // 返回类置为失败
  61.             } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { // 如果成功状态
  62.                 returnType = ConsumeReturnType.SUCCESS; // 返回类型为成功
  63.             }
  64.             // ...
  65.             // 如果消费状态为空
  66.             if (null == status) {
  67.                 log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
  68.                     ConsumeMessageConcurrentlyService.this.consumerGroup,
  69.                     msgs,
  70.                     messageQueue);
  71.                 // 状态置为延迟消费
  72.                 status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
  73.             }
  74.             // 如果设置了钩子函数
  75.             if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  76.                 consumeMessageContext.setStatus(status.toString());
  77.                 consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
  78.                 // 执行executeHookAfter方法
  79.                 ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
  80.             }
  81.             ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
  82.                 .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
  83.             if (!processQueue.isDropped()) {
  84.                 // 处理消费结果
  85.                 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
  86.             } else {
  87.                 log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
  88.             }
  89.         }
  90.     }
  91. }
  92. // 重置消息重试主题
  93. public class DefaultMQPushConsumerImpl implements MQConsumerInner {
  94.    public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
  95.         // 获取消费组的重试主题:%RETRY% + 消费组名称
  96.         final String groupTopic = MixAll.getRetryTopic(consumerGroup);
  97.         for (MessageExt msg : msgs) {
  98.             // 获取消息的重试主题名称
  99.             String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
  100.             // 如果重试主题不为空并且与消费组的重试主题一致
  101.             if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
  102.                 // 设置重试主题
  103.                 msg.setTopic(retryTopic);
  104.             }
  105.             if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
  106.                 msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
  107.             }
  108.         }
  109.     }
  110.   
  111. }
  112. // 消费结果状态
  113. public enum ConsumeConcurrentlyStatus {
  114.     /**
  115.      * 消费成功
  116.      */
  117.     CONSUME_SUCCESS,
  118.     /**
  119.      * 消费失败,延迟进行消费
  120.      */
  121.     RECONSUME_LATER;
  122. }
复制代码
发送CONSUMER_SEND_MSG_BACK消息

延迟级别

RocketMQ的延迟级别对应的延迟时间常量定义在MessageStoreConfig的messageDelayLevel变量中:
  1. public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
  2.     public void processConsumeResult(
  3.         final ConsumeConcurrentlyStatus status,
  4.         final ConsumeConcurrentlyContext context,
  5.         final ConsumeRequest consumeRequest
  6.     ) {
  7.         // 获取ackIndex
  8.         int ackIndex = context.getAckIndex();
  9.         if (consumeRequest.getMsgs().isEmpty())
  10.             return;
  11.         switch (status) {
  12.             case CONSUME_SUCCESS: // 如果消费成功
  13.                 // 如果ackIndex大于等于消息的大小
  14.                 if (ackIndex >= consumeRequest.getMsgs().size()) {
  15.                     // 设置为消息大小-1
  16.                     ackIndex = consumeRequest.getMsgs().size() - 1;
  17.                 }
  18.                 // 计算消费成功的的个数
  19.                 int ok = ackIndex + 1;
  20.                 // 计算消费失败的个数
  21.                 int failed = consumeRequest.getMsgs().size() - ok;
  22.                 this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
  23.                 this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
  24.                 break;
  25.             case RECONSUME_LATER: // 如果延迟消费
  26.                 // ackIndex置为-1
  27.                 ackIndex = -1;
  28.                 this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
  29.                     consumeRequest.getMsgs().size());
  30.                 break;
  31.             default:
  32.                 break;
  33.         }
  34.         // 判断消费模式
  35.         switch (this.defaultMQPushConsumer.getMessageModel()) {
  36.             case BROADCASTING: // 广播模式
  37.                 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
  38.                     MessageExt msg = consumeRequest.getMsgs().get(i);
  39.                     log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
  40.                 }
  41.                 break;
  42.             case CLUSTERING: // 集群模式
  43.                 List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
  44.                 // 遍历消费失败的消息
  45.                 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
  46.                     // 获取消息
  47.                     MessageExt msg = consumeRequest.getMsgs().get(i);
  48.                     // 向Broker发送延迟消息
  49.                     boolean result = this.sendMessageBack(msg, context);
  50.                     // 如果发送失败
  51.                     if (!result) {
  52.                         // 消费次数+1
  53.                         msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
  54.                         // 加入失败消息列表中
  55.                         msgBackFailed.add(msg);
  56.                     }
  57.                 }
  58.                 // 如果不为空
  59.                 if (!msgBackFailed.isEmpty()) {
  60.                     consumeRequest.getMsgs().removeAll(msgBackFailed);
  61.                     // 稍后重新进行消费
  62.                     this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
  63.                 }
  64.                 break;
  65.             default:
  66.                 break;
  67.         }
  68.         // 从处理队列中移除消息
  69.         long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
  70.         if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
  71.             // 更新拉取偏移量
  72.             this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
  73.         }
  74.     }
  75. }
复制代码
延迟级别与延迟时间对应关系:
延迟级别0 ---> 对应延迟时间1s,也就是延迟1秒后消费者重新从Broker拉取进行消费
延迟级别1 --->  延迟时间5s
延迟级别2 --->  延迟时间10s
...
以此类推,最大的延迟时间为2h
在sendMessageBack方法中,首先从上下文中获取了延迟级别(ConsumeConcurrentlyContext中可以看到,延迟级别默认为0),并对主题加上Namespace,然后调用defaultMQPushConsumerImpl的sendMessageBack发送消息:
  1. public class MessageStoreConfig {
  2.     private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  3. }
复制代码
DefaultMQPushConsumerImp的sendMessageBack方法中又调用了MQClientAPIImpl的consumerSendMessageBack方法进行发送:
  1. public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
  2.    public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
  3.         // 获取延迟级别
  4.         int delayLevel = context.getDelayLevelWhenNextConsume();
  5.         // 对主题添加上Namespace
  6.         msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
  7.         try {
  8.             // 向Broker发送消息
  9.             this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
  10.             return true;
  11.         } catch (Exception e) {
  12.             log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
  13.         }
  14.         return false;
  15.     }
  16. }
  17. // 并发消费上下文
  18. public class ConsumeConcurrentlyContext {
  19.     /**
  20.      * -1,不进行重试,加入DLQ队列
  21.      * 0, Broker控制重试频率
  22.      * >0, 客户端控制
  23.      */
  24.     private int delayLevelWhenNextConsume = 0; // 默认为0
  25. }
复制代码
在MQClientAPIImpl的consumerSendMessageBack方法中,可以看到设置的请求类型是CONSUMER_SEND_MSG_BACK,然后设置了消息的相关信息,向Broker发送请求:
  1. public class DefaultMQPushConsumerImpl implements MQConsumerInner {
  2.     public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
  3.         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
  4.         try {
  5.             // 获取Broker地址
  6.             String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
  7.                 : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
  8.             // 调用consumerSendMessageBack方法发送消息
  9.             this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
  10.                 this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
  11.         } catch (Exception e) {
  12.             // ...
  13.         } finally {
  14.             msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
  15.         }
  16.     }
  17. }
复制代码
Broker对请求的处理

Broker对CONSUMER_SEND_MSG_BACK类型的请求在SendMessageProcessor中,处理逻辑如下:

  • 根据消费组获取订阅信息配置,如果获取为空,记录错误信息,直接返回
  • 获取消费组的重试主题,然后从重试队列中随机选取一个队列,并创建TopicConfig主题配置信息
  • 根据消息的物理偏移量从commitlog中获取消息
  • 判断消息的消费次数是否大于等于最大消费次数 或者 延迟等级小于0

    • 如果条件满足,表示需要把消息放入到死信队列DLQ中,此时设置DLQ队列ID
    • 如果不满足,判断延迟级别是否为0,如果为0,使用3 + 消息的消费次数作为新的延迟级别

  • 新建消息MessageExtBrokerInner,设置消息的相关信息,此时相当于生成了一个全新的消息(会设置之前消息的ID),会重新添加到CommitLog中,消息主题的设置有两种情况:

    • 达到了加入DLQ队列的条件,此时主题为DLQ主题(%DLQ% + 消费组名称),消息之后会添加到选取的DLQ队列中
    • 未达到DLQ队列的条件,此时主题为重试主题(%RETRY% + 消费组名称),之后重新进行消费

  • 调用asyncPutMessage添加消息,详细过程可参考之前的文章【消息的存储】
  1. public class MQClientAPIImpl {
  2.     public void consumerSendMessageBack(
  3.         final String addr,
  4.         final MessageExt msg,
  5.         final String consumerGroup,
  6.         final int delayLevel,
  7.         final long timeoutMillis,
  8.         final int maxConsumeRetryTimes
  9.     ) throws RemotingException, MQBrokerException, InterruptedException {
  10.         // 创建请求头
  11.         ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
  12.         // 设置请求类型为CONSUMER_SEND_MSG_BACK
  13.         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
  14.         // 设置消费组
  15.         requestHeader.setGroup(consumerGroup);
  16.         requestHeader.setOriginTopic(msg.getTopic());
  17.         // 设置消息物理偏移量
  18.         requestHeader.setOffset(msg.getCommitLogOffset());
  19.         // 设置延迟级别
  20.         requestHeader.setDelayLevel(delayLevel);
  21.         // 设置消息ID
  22.         requestHeader.setOriginMsgId(msg.getMsgId());
  23.         // 设置最大消费次数
  24.         requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
  25.         // 向Broker发送请求
  26.         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
  27.             request, timeoutMillis);
  28.         assert response != null;
  29.         switch (response.getCode()) {
  30.             case ResponseCode.SUCCESS: {
  31.                 return;
  32.             }
  33.             default:
  34.                 break;
  35.         }
  36.         throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
  37.     }
  38. }
复制代码
延迟消息处理

【消息的存储】文章可知,消息添加会进入到asyncPutMessage方法中,首先获取了事务类型,如果未使用事务或者是提交事务的情况下,对延迟时间级别进行判断,如果延迟时间级别大于0,说明消息需要延迟消费,此时做如下处理:

  • 判断消息的延迟级别是否超过了最大延迟级别,如果超过了就使用最大延迟级别
  • 获取RMQ_SYS_SCHEDULE_TOPIC,它是在TopicValidator中定义的常量,值为SCHEDULE_TOPIC_XXXX:
    1. public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    2.     // 处理请求
    3.     public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
    4.                                                                   RemotingCommand request) throws RemotingCommandException {
    5.         final SendMessageContext mqtraceContext;
    6.         switch (request.getCode()) {
    7.             case RequestCode.CONSUMER_SEND_MSG_BACK:
    8.                 // 处理请求
    9.                 return this.asyncConsumerSendMsgBack(ctx, request);
    10.             default:
    11.                 // ...
    12.         }
    13.     }
    14.   
    15.     private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
    16.                                                                         RemotingCommand request) throws RemotingCommandException {
    17.         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    18.         final ConsumerSendMsgBackRequestHeader requestHeader =
    19.                 (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
    20.         // ...
    21.         // 根据消费组获取订阅信息配置
    22.         SubscriptionGroupConfig subscriptionGroupConfig =
    23.             this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
    24.         // 如果为空,直接返回
    25.         if (null == subscriptionGroupConfig) {
    26.             response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
    27.             response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
    28.                 + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
    29.             return CompletableFuture.completedFuture(response);
    30.         }
    31.         // ...
    32.    
    33.         // 获取消费组的重试主题
    34.         String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    35.         // 从重试队列中随机选取一个队列
    36.         int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();
    37.         int topicSysFlag = 0;
    38.         if (requestHeader.isUnitMode()) {
    39.             topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
    40.         }
    41.         // 创建TopicConfig主题配置信息
    42.         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
    43.             newTopic,
    44.             subscriptionGroupConfig.getRetryQueueNums(),
    45.             PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    46.         //...
    47.    
    48.         // 根据消息物理偏移量从commitLog文件中获取消息
    49.         MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
    50.         if (null == msgExt) {
    51.             response.setCode(ResponseCode.SYSTEM_ERROR);
    52.             response.setRemark("look message by offset failed, " + requestHeader.getOffset());
    53.             return CompletableFuture.completedFuture(response);
    54.         }
    55.         // 获取消息的重试主题
    56.         final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    57.         if (null == retryTopic) {
    58.             MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
    59.         }
    60.         msgExt.setWaitStoreMsgOK(false);
    61.         // 延迟等级获取
    62.         int delayLevel = requestHeader.getDelayLevel();
    63.         // 获取最大消费重试次数
    64.         int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    65.         if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
    66.             Integer times = requestHeader.getMaxReconsumeTimes();
    67.             if (times != null) {
    68.                 maxReconsumeTimes = times;
    69.             }
    70.         }
    71.         // 判断消息的消费次数是否大于等于最大消费次数 或者 延迟等级小于0
    72.         if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
    73.             || delayLevel < 0) {
    74.             // 获取DLQ主题
    75.             newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
    76.             // 选取一个队列
    77.             queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
    78.             // 创建DLQ的topicConfig
    79.             topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
    80.                     DLQ_NUMS_PER_GROUP,
    81.                     PermName.PERM_WRITE | PermName.PERM_READ, 0);
    82.             // ...
    83.         } else {
    84.              // 如果延迟级别为0
    85.             if (0 == delayLevel) {
    86.                 // 更新延迟级别
    87.                 delayLevel = 3 + msgExt.getReconsumeTimes();
    88.             }
    89.             // 设置延迟级别
    90.             msgExt.setDelayTimeLevel(delayLevel);
    91.         }
    92.         // 新建消息
    93.         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    94.         msgInner.setTopic(newTopic); // 设置主题
    95.         msgInner.setBody(msgExt.getBody()); // 设置消息
    96.         msgInner.setFlag(msgExt.getFlag());
    97.         MessageAccessor.setProperties(msgInner, msgExt.getProperties()); // 设置消息属性
    98.         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    99.         msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
    100.         msgInner.setQueueId(queueIdInt); // 设置队列ID
    101.         msgInner.setSysFlag(msgExt.getSysFlag());
    102.         msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    103.         msgInner.setBornHost(msgExt.getBornHost());
    104.         msgInner.setStoreHost(msgExt.getStoreHost());
    105.         msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);// 设置消费次数
    106.         // 原始的消息ID
    107.         String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
    108.         // 设置消息ID
    109.         MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
    110.         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    111.         // 添加重试消息
    112.         CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    113.         return putMessageResult.thenApply((r) -> {
    114.             if (r != null) {
    115.                 switch (r.getPutMessageStatus()) {
    116.                     case PUT_OK:
    117.                         // ...
    118.                         return response;
    119.                     default:
    120.                         break;
    121.                 }
    122.                 response.setCode(ResponseCode.SYSTEM_ERROR);
    123.                 response.setRemark(r.getPutMessageStatus().name());
    124.                 return response;
    125.             }
    126.             response.setCode(ResponseCode.SYSTEM_ERROR);
    127.             response.setRemark("putMessageResult is null");
    128.             return response;
    129.         });
    130.     }
    131. }
    复制代码
  • 根据延迟级别选取对应的队列,一般会把相同延迟级别的消息放在同一个队列中
  • 备份之前的TOPIC和队列ID
  • 更改消息队列的主题为RMQ_SYS_SCHEDULE_TOPIC,所以延迟消息的主题最终被设置为RMQ_SYS_SCHEDULE_TOPIC,放在对应的延迟队列中进行处理
  1. public class TopicValidator {
  2.     // ...
  3.     public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
  4. }
复制代码
拉取进度持久化

RocketMQ消费模式分为广播模式和集群模式,广播模式下消费进度保存在每个消费者端,集群模式下消费进度保存在Broker端。
广播模式

更新进度

LocalFileOffsetStore中使用了一个ConcurrentMap类型的变量offsetTable存储消息队列对应的拉取偏移量,KEY为消息队列,value为该消息队列对应的拉取偏移量。
在更新拉取进度的时候,从offsetTable中获取当前消息队列的拉取偏移量,如果为空,则新建并保存到offsetTable中,否则获取之前已经保存的偏移量,对值进行更新,需要注意这里只是更新了offsetTable中的数据,并没有持久化到磁盘,持久化的操作在persistAll方法中
  1. public class CommitLog {
  2.     public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
  3.         // ...
  4.         // 获取事务类型
  5.         final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
  6.         // 如果未使用事务或者提交事务
  7.         if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
  8.                 || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
  9.             // 判断延迟级别
  10.             if (msg.getDelayTimeLevel() > 0) {
  11.                 // 如果超过了最大延迟级别
  12.                 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
  13.                     msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
  14.                 }
  15.                 // 获取RMQ_SYS_SCHEDULE_TOPIC
  16.                 topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
  17.                 // 根据延迟级别选取对应的队列
  18.                 int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
  19.                 // 备份之前的TOPIC和队列ID
  20.                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
  21.                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
  22.                 msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
  23.                 // 设置SCHEDULE_TOPIC
  24.                 msg.setTopic(topic);
  25.                 msg.setQueueId(queueId);
  26.             }
  27.         }
  28.         // ...
  29.     }
  30. }
复制代码
加载进度

由于广播模式下消费进度保存在消费者端,所以需要从本地磁盘加载之前保存的消费进度文件。
LOCAL_OFFSET_STORE_DIR:消费进度文件所在的根路径
  1. public class LocalFileOffsetStore implements OffsetStore {
  2.     // offsetTable:KEY为消息队列,value为该消息队列的拉取偏移量
  3.     private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
  4.         new ConcurrentHashMap<MessageQueue, AtomicLong>();
  5.   
  6.     @Override
  7.     public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
  8.         if (mq != null) {
  9.             // 获取之前的拉取进度
  10.             AtomicLong offsetOld = this.offsetTable.get(mq);
  11.             if (null == offsetOld) {
  12.                 // 如果之前不存在,进行创建
  13.                 offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
  14.             }
  15.             // 如果不为空
  16.             if (null != offsetOld) {
  17.                 if (increaseOnly) {
  18.                     MixAll.compareAndIncreaseOnly(offsetOld, offset);
  19.                 } else {
  20.                     // 更新拉取偏移量
  21.                     offsetOld.set(offset);
  22.                 }
  23.             }
  24.         }
  25.     }
  26. }
复制代码
在LocalFileOffsetStore的构造函数中可以看到,对拉取偏移量的保存文件路径进行了设置,为LOCAL_OFFSET_STORE_DIR + 客户端ID + 消费组名称 + offsets.json,从名字上看,消费进度的数据格式是以JSON的形式进行保存的:
  1. public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
  2.         "rocketmq.client.localOffsetStoreDir", System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
复制代码
在load方法中,首先从本地读取 offsets.json文件,并序列化为OffsetSerializeWrapper对象,然后将保存的消费进度加入到offsetTable中:
  1. this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator +
  2.             this.groupName + File.separator + "offsets.json";
复制代码
OffsetSerializeWrapper
OffsetSerializeWrapper中同样使用了ConcurrentMap,从磁盘的offsets.json文件中读取数据后,将JSON转为OffsetSerializeWrapper对象,就可以通过OffsetSerializeWrapper的offsetTable获取到之前保存的每个消息队列的消费进度,然后加入到LocalFileOffsetStore的offsetTable中:
  1. public class LocalFileOffsetStore implements OffsetStore {
  2.    
  3.     // 文件路径
  4.     public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
  5.         "rocketmq.client.localOffsetStoreDir",
  6.         System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
  7.     private final String storePath;
  8.     // ...
  9.    
  10.     public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
  11.         this.mQClientFactory = mQClientFactory;
  12.         this.groupName = groupName;
  13.         // 设置拉取进度文件的路径
  14.         this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
  15.             this.mQClientFactory.getClientId() + File.separator +
  16.             this.groupName + File.separator +
  17.             "offsets.json";
  18.     }
  19.     @Override
  20.     public void load() throws MQClientException {
  21.         // 从本地读取拉取偏移量
  22.         OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
  23.         if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
  24.             // 加入到offsetTable中
  25.             offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
  26.             for (Entry<MessageQueue, AtomicLong> mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {
  27.                 AtomicLong offset = mqEntry.getValue();
  28.                 log.info("load consumer's offset, {} {} {}",
  29.                         this.groupName,
  30.                         mqEntry.getKey(),
  31.                         offset.get());
  32.             }
  33.         }
  34.     }
  35.   
  36.     // 从本地加载文件
  37.     private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
  38.         String content = null;
  39.         try {
  40.             // 读取文件
  41.             content = MixAll.file2String(this.storePath);
  42.         } catch (IOException e) {
  43.             log.warn("Load local offset store file exception", e);
  44.         }
  45.         if (null == content || content.length() == 0) {
  46.             return this.readLocalOffsetBak();
  47.         } else {
  48.             OffsetSerializeWrapper offsetSerializeWrapper = null;
  49.             try {
  50.                 // 序列化
  51.                 offsetSerializeWrapper =
  52.                     OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
  53.             } catch (Exception e) {
  54.                 log.warn("readLocalOffset Exception, and try to correct", e);
  55.                 return this.readLocalOffsetBak();
  56.             }
  57.             return offsetSerializeWrapper;
  58.         }
  59.     }
  60. }
复制代码
持久化进度

updateOffset更新只是将内存中的数据进行了更改,并未保存到磁盘中,持久化的操作是在persistAll方法中实现的:

  • 创建OffsetSerializeWrapper对象
  • 遍历LocalFileOffsetStore的offsetTable,将数据加入到OffsetSerializeWrapper的OffsetTable中
  • 将OffsetSerializeWrapper转为JSON
  • 调用string2File方法将JSON数据保存到磁盘文件
  1. public class OffsetSerializeWrapper extends RemotingSerializable {
  2.     private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
  3.         new ConcurrentHashMap<MessageQueue, AtomicLong>();
  4.     public ConcurrentMap<MessageQueue, AtomicLong> getOffsetTable() {
  5.         return offsetTable;
  6.     }
  7.     public void setOffsetTable(ConcurrentMap<MessageQueue, AtomicLong> offsetTable) {
  8.         this.offsetTable = offsetTable;
  9.     }
  10. }
复制代码
集群模式

集群模式下消费进度保存在Broker端。
更新进度

集群模式下的更新进度与广播模式下的更新类型,都是只更新了offsetTable中的数据:
  1. public class LocalFileOffsetStore implements OffsetStore {
  2.     @Override
  3.     public void persistAll(Set<MessageQueue> mqs) {
  4.         if (null == mqs || mqs.isEmpty())
  5.             return;OffsetSerializeWrapper
  6.         // 创建
  7.         OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
  8.         // 遍历offsetTable
  9.         for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
  10.             if (mqs.contains(entry.getKey())) {
  11.                 // 获取拉取偏移量
  12.                 AtomicLong offset = entry.getValue();
  13.                 // 加入到OffsetSerializeWrapper的OffsetTable中
  14.                 offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
  15.             }
  16.         }
  17.         // 将对象转为JSON
  18.         String jsonString = offsetSerializeWrapper.toJson(true);
  19.         if (jsonString != null) {
  20.             try {
  21.                 // 将JSON数据保存到磁盘文件
  22.                 MixAll.string2File(jsonString, this.storePath);
  23.             } catch (IOException e) {
  24.                 log.error("persistAll consumer offset Exception, " + this.storePath, e);
  25.             }
  26.         }
  27.     }
  28. }
复制代码
加载

集群模式下加载消费进度需要从Broker获取,在消费者发送消息拉取请求的时候,Broker会计算消费偏移量,所以RemoteBrokerOffsetStore的load方法为空,什么也没有干:
  1. public class RemoteBrokerOffsetStore implements OffsetStore {
  2.    
  3.     private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
  4.         new ConcurrentHashMap<MessageQueue, AtomicLong>();
  5.     @Override
  6.     public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
  7.         if (mq != null) {
  8.             // 获取消息队列的进度
  9.             AtomicLong offsetOld = this.offsetTable.get(mq);
  10.             if (null == offsetOld) {
  11.                 // 将消费进度保存在offsetTable中
  12.                 offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
  13.             }
  14.             if (null != offsetOld) {
  15.                 if (increaseOnly) {
  16.                     MixAll.compareAndIncreaseOnly(offsetOld, offset);
  17.                 } else {
  18.                     // 更新拉取偏移量
  19.                     offsetOld.set(offset);
  20.                 }
  21.             }
  22.         }
  23.     }
  24. }
复制代码
持久化

由于集群模式下消费进度保存在Broker端,所以persistAll方法中调用了updateConsumeOffsetToBroker向Broker发送请求进行消费进度保存:
  1. public class RemoteBrokerOffsetStore implements OffsetStore {
  2.     @Override
  3.     public void load() {
  4.     }
  5. }
复制代码
持久化的触发

MQClientInstance在启动定时任务的方法startScheduledTask中注册了定时任务,定时调用persistAllConsumerOffset对拉取进度进行持久化,persistAllConsumerOffset中又调用了MQConsumerInner的persistConsumerOffset方法:
  1. public class RemoteBrokerOffsetStore implements OffsetStore {
  2.     @Override
  3.     public void persistAll(Set<MessageQueue> mqs) {
  4.         if (null == mqs || mqs.isEmpty())
  5.             return;
  6.         final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
  7.         for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
  8.             MessageQueue mq = entry.getKey();
  9.             AtomicLong offset = entry.getValue();
  10.             if (offset != null) {
  11.                 if (mqs.contains(mq)) {
  12.                     try {
  13.                         // 向Broker发送请求更新拉取偏移量
  14.                         this.updateConsumeOffsetToBroker(mq, offset.get());
  15.                         log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
  16.                             this.groupName,
  17.                             this.mQClientFactory.getClientId(),
  18.                             mq,
  19.                             offset.get());
  20.                     } catch (Exception e) {
  21.                         log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
  22.                     }
  23.                 } else {
  24.                     unusedMQ.add(mq);
  25.                 }
  26.             }
  27.         }
  28.         // ...
  29.     }
  30. }
复制代码
DefaultMQPushConsumerImpl是MQConsumerInner的一个子类,以它为例可以看到在persistConsumerOffset方法中调用了offsetStore的persistAll方法进行持久化:
  1. public class MQClientInstance {
  2.     private void startScheduledTask() {
  3.         // ...
  4.         // 注册定时任务,定时持久化拉取进度
  5.         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  6.             @Override
  7.             public void run() {
  8.                 try {
  9.                     // 持久化
  10.                     MQClientInstance.this.persistAllConsumerOffset();
  11.                 } catch (Exception e) {
  12.                     log.error("ScheduledTask persistAllConsumerOffset exception", e);
  13.                 }
  14.             }
  15.         }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
  16.         // ...
  17.     }
  18.    
  19.     private void persistAllConsumerOffset() {
  20.         Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
  21.         while (it.hasNext()) {
  22.             Entry<String, MQConsumerInner> entry = it.next();
  23.             MQConsumerInner impl = entry.getValue();
  24.             // 调用persistConsumerOffset进行持久化
  25.             impl.persistConsumerOffset();
  26.         }
  27.     }
  28. }
复制代码
总结

参考
丁威、周继锋《RocketMQ技术内幕》
RocketMQ版本:4.9.3

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

忿忿的泥巴坨

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表