- 如果本次消息的个数小于等于批量消费的大小consumeBatchSize,构建消费请求ConsumeRequest,直接提交到线程池中进行消费即可
- 如果本次消息的个数大于批量消费的大小consumeBatchSize,说明需要分批进行提交,每次构建consumeBatchSize个消息提交到线程池中进行消费
- 如果出现拒绝提交的异常,调用submitConsumeRequestLater方法延迟进行提交
RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求将消费任务提交到线程池处理即可,否则需要分批进行提交。- public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
- @Override
- public void submitConsumeRequest(
- final List<MessageExt> msgs,
- final ProcessQueue processQueue,
- final MessageQueue messageQueue,
- final boolean dispatchToConsume) {
- final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
- // 如果消息的个数小于等于批量消费的大小
- if (msgs.size() <= consumeBatchSize) {
- // 构建消费请求
- ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
- try {
- // 加入到消费线程池中
- this.consumeExecutor.submit(consumeRequest);
- } catch (RejectedExecutionException e) {
- this.submitConsumeRequestLater(consumeRequest);
- }
- } else {
- // 遍历消息
- for (int total = 0; total < msgs.size(); ) {
- // 创建消息列表,大小为consumeBatchSize,用于批量提交使用
- List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
- for (int i = 0; i < consumeBatchSize; i++, total++) {
- if (total < msgs.size()) {
- // 加入到消息列表中
- msgThis.add(msgs.get(total));
- } else {
- break;
- }
- }
- // 创建ConsumeRequest
- ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
- try {
- // 加入到消费线程池中
- this.consumeExecutor.submit(consumeRequest);
- } catch (RejectedExecutionException e) {
- for (; total < msgs.size(); total++) {
- msgThis.add(msgs.get(total));
- }
- // 如果出现拒绝提交异常,延迟进行提交
- this.submitConsumeRequestLater(consumeRequest);
- }
- }
- }
- }
- }
复制代码 处理消费结果
- CONSUME_SUCCESS:消息消费成功,此时ackIndex设置为消息大小 - 1,表示消息都消费成功。
- RECONSUME_LATER:消息消费失败,返回延迟消费状态,此时ackIndex置为-1,表示消息都消费失败。
开启for循环,初始值为i = ackIndex + 1,结束条件为i < consumeRequest.getMsgs().size(),上面可知ackIndex有两种情况:
- 消费成功:ackIndex值为消息大小-1,此时ackIndex + 1的值等于消息的个数大小,不满足for循环的执行条件,相当于消息都消费成功,不需要进行失败的消息处理。
- 延迟消费:ackIndex值为-1,此时ackIndex+1为0,满足for循环的执行条件,从第一条消息开始遍历到最后一条消息,调用sendMessageBack方法向Broker发送CONSUMER_SEND_MSG_BACK消息,如果发送成功Broker会根据延迟等级,放入不同的延迟队列中,到达延迟时间后,消费者将会重新进行拉取,如果发送失败,加入到失败消息列表中,稍后重新提交消费任务进行处理。
以上步骤处理完毕后,首先调用removeMessage从处理队列中移除消息并返回拉取消息的偏移量,然后调用updateOffset更新拉取偏移量。- public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
- class ConsumeRequest implements Runnable {
- private final List<MessageExt> msgs;
- private final ProcessQueue processQueue; // 处理队列
- private final MessageQueue messageQueue; // 消息队列
- @Override
- public void run() {
- // 如果处理队列已被删除
- if (this.processQueue.isDropped()) {
- log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
- return;
- }
- // 获取消息监听器
- MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
- ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
- ConsumeConcurrentlyStatus status = null;
- // 重置消息重试主题名称
- defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
- ConsumeMessageContext consumeMessageContext = null;
- // 如果设置了钩子函数
- if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
- // ...
- // 执行钩子函数
- ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
- }
- long beginTimestamp = System.currentTimeMillis();
- boolean hasException = false;
- ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
- try {
- if (msgs != null && !msgs.isEmpty()) {
- for (MessageExt msg : msgs) {
- // 设置消费开始时间戳
- MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
- }
- }
- // 通过消息监听器的consumeMessage进行消息消费,并返回消费结果状态
- status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
- } catch (Throwable e) {
- log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
- RemotingHelper.exceptionSimpleDesc(e),
- ConsumeMessageConcurrentlyService.this.consumerGroup,
- msgs,
- messageQueue), e);
- hasException = true;
- }
- // 计算消费时长
- long consumeRT = System.currentTimeMillis() - beginTimestamp;
- if (null == status) {
- if (hasException) {
- // 出现异常
- returnType = ConsumeReturnType.EXCEPTION;
- } else {
- // 返回NULL
- returnType = ConsumeReturnType.RETURNNULL;
- }
- } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { // 判断超时
- returnType = ConsumeReturnType.TIME_OUT; // 返回类型置为超时
- } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { // 如果延迟消费
- returnType = ConsumeReturnType.FAILED; // 返回类置为失败
- } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { // 如果成功状态
- returnType = ConsumeReturnType.SUCCESS; // 返回类型为成功
- }
- // ...
- // 如果消费状态为空
- if (null == status) {
- log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
- ConsumeMessageConcurrentlyService.this.consumerGroup,
- msgs,
- messageQueue);
- // 状态置为延迟消费
- status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
- }
- // 如果设置了钩子函数
- if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
- consumeMessageContext.setStatus(status.toString());
- consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
- // 执行executeHookAfter方法
- ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
- }
- ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
- .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
- if (!processQueue.isDropped()) {
- // 处理消费结果
- ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
- } else {
- log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
- }
- }
- }
- }
- // 重置消息重试主题
- public class DefaultMQPushConsumerImpl implements MQConsumerInner {
- public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
- // 获取消费组的重试主题:%RETRY% + 消费组名称
- final String groupTopic = MixAll.getRetryTopic(consumerGroup);
- for (MessageExt msg : msgs) {
- // 获取消息的重试主题名称
- String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
- // 如果重试主题不为空并且与消费组的重试主题一致
- if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
- // 设置重试主题
- msg.setTopic(retryTopic);
- }
- if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
- msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
- }
- }
- }
- }
- // 消费结果状态
- public enum ConsumeConcurrentlyStatus {
- /**
- * 消费成功
- */
- /**
- * 消费失败,延迟进行消费
- */
- }
RocketMQ的延迟级别对应的延迟时间常量定义在MessageStoreConfig的messageDelayLevel变量中:- public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
- public void processConsumeResult(
- final ConsumeConcurrentlyStatus status,
- final ConsumeConcurrentlyContext context,
- final ConsumeRequest consumeRequest
- ) {
- // 获取ackIndex
- int ackIndex = context.getAckIndex();
- if (consumeRequest.getMsgs().isEmpty())
- return;
- switch (status) {
- case CONSUME_SUCCESS: // 如果消费成功
- // 如果ackIndex大于等于消息的大小
- if (ackIndex >= consumeRequest.getMsgs().size()) {
- // 设置为消息大小-1
- ackIndex = consumeRequest.getMsgs().size() - 1;
- }
- // 计算消费成功的的个数
- int ok = ackIndex + 1;
- // 计算消费失败的个数
- int failed = consumeRequest.getMsgs().size() - ok;
- this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
- this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
- break;
- case RECONSUME_LATER: // 如果延迟消费
- // ackIndex置为-1
- ackIndex = -1;
- this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
- consumeRequest.getMsgs().size());
- break;
- default:
- break;
- }
- // 判断消费模式
- switch (this.defaultMQPushConsumer.getMessageModel()) {
- case BROADCASTING: // 广播模式
- for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
- MessageExt msg = consumeRequest.getMsgs().get(i);
- log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
- }
- break;
- case CLUSTERING: // 集群模式
- List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
- // 遍历消费失败的消息
- for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
- // 获取消息
- MessageExt msg = consumeRequest.getMsgs().get(i);
- // 向Broker发送延迟消息
- boolean result = this.sendMessageBack(msg, context);
- // 如果发送失败
- if (!result) {
- // 消费次数+1
- msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
- // 加入失败消息列表中
- msgBackFailed.add(msg);
- }
- }
- // 如果不为空
- if (!msgBackFailed.isEmpty()) {
- consumeRequest.getMsgs().removeAll(msgBackFailed);
- // 稍后重新进行消费
- this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
- }
- break;
- default:
- break;
- }
- // 从处理队列中移除消息
- long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
- if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
- // 更新拉取偏移量
- this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
- }
- }
- }
延迟级别0 ---> 对应延迟时间1s,也就是延迟1秒后消费者重新从Broker拉取进行消费
延迟级别1 ---> 延迟时间5s
延迟级别2 ---> 延迟时间10s
在sendMessageBack方法中,首先从上下文中获取了延迟级别(ConsumeConcurrentlyContext中可以看到,延迟级别默认为0),并对主题加上Namespace,然后调用defaultMQPushConsumerImpl的sendMessageBack发送消息:- public class MessageStoreConfig {
- private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
- }
复制代码 DefaultMQPushConsumerImp的sendMessageBack方法中又调用了MQClientAPIImpl的consumerSendMessageBack方法进行发送:- public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
- public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
- // 获取延迟级别
- int delayLevel = context.getDelayLevelWhenNextConsume();
- // 对主题添加上Namespace
- msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
- try {
- // 向Broker发送消息
- this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
- return true;
- } catch (Exception e) {
- log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
- }
- return false;
- }
- }
- // 并发消费上下文
- public class ConsumeConcurrentlyContext {
- /**
- * -1,不进行重试,加入DLQ队列
- * 0, Broker控制重试频率
- * >0, 客户端控制
- */
- private int delayLevelWhenNextConsume = 0; // 默认为0
- }
复制代码 在MQClientAPIImpl的consumerSendMessageBack方法中,可以看到设置的请求类型是CONSUMER_SEND_MSG_BACK,然后设置了消息的相关信息,向Broker发送请求:- public class DefaultMQPushConsumerImpl implements MQConsumerInner {
- public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
- try {
- // 获取Broker地址
- String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
- : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
- // 调用consumerSendMessageBack方法发送消息
- this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
- this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
- } catch (Exception e) {
- // ...
- } finally {
- msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
- }
- }
- }
复制代码 Broker对请求的处理
- 根据消费组获取订阅信息配置,如果获取为空,记录错误信息,直接返回
- 获取消费组的重试主题,然后从重试队列中随机选取一个队列,并创建TopicConfig主题配置信息
- 根据消息的物理偏移量从commitlog中获取消息
- 判断消息的消费次数是否大于等于最大消费次数 或者 延迟等级小于0:
- 如果条件满足,表示需要把消息放入到死信队列DLQ中,此时设置DLQ队列ID
- 如果不满足,判断延迟级别是否为0,如果为0,使用3 + 消息的消费次数作为新的延迟级别
- 新建消息MessageExtBrokerInner,设置消息的相关信息,此时相当于生成了一个全新的消息(会设置之前消息的ID),会重新添加到CommitLog中,消息主题的设置有两种情况:
- 达到了加入DLQ队列的条件,此时主题为DLQ主题(%DLQ% + 消费组名称),消息之后会添加到选取的DLQ队列中
- 未达到DLQ队列的条件,此时主题为重试主题(%RETRY% + 消费组名称),之后重新进行消费
- 调用asyncPutMessage添加消息,详细过程可参考之前的文章【消息的存储】
- public class MQClientAPIImpl {
- public void consumerSendMessageBack(
- final String addr,
- final MessageExt msg,
- final String consumerGroup,
- final int delayLevel,
- final long timeoutMillis,
- final int maxConsumeRetryTimes
- ) throws RemotingException, MQBrokerException, InterruptedException {
- // 创建请求头
- ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
- // 设置消费组
- requestHeader.setGroup(consumerGroup);
- requestHeader.setOriginTopic(msg.getTopic());
- // 设置消息物理偏移量
- requestHeader.setOffset(msg.getCommitLogOffset());
- // 设置延迟级别
- requestHeader.setDelayLevel(delayLevel);
- // 设置消息ID
- requestHeader.setOriginMsgId(msg.getMsgId());
- // 设置最大消费次数
- requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
- // 向Broker发送请求
- RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
- request, timeoutMillis);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- return;
- }
- default:
- break;
- }
- throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
- }
- }
复制代码 延迟消息处理
- 判断消息的延迟级别是否超过了最大延迟级别,如果超过了就使用最大延迟级别
- public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
- // 处理请求
- public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
- RemotingCommand request) throws RemotingCommandException {
- final SendMessageContext mqtraceContext;
- switch (request.getCode()) {
- case RequestCode.CONSUMER_SEND_MSG_BACK:
- // 处理请求
- return this.asyncConsumerSendMsgBack(ctx, request);
- default:
- // ...
- }
- }
- private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
- RemotingCommand request) throws RemotingCommandException {
- final RemotingCommand response = RemotingCommand.createResponseCommand(null);
- final ConsumerSendMsgBackRequestHeader requestHeader =
- (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
- // ...
- // 根据消费组获取订阅信息配置
- SubscriptionGroupConfig subscriptionGroupConfig =
- this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
- // 如果为空,直接返回
- if (null == subscriptionGroupConfig) {
- response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
- response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
- return CompletableFuture.completedFuture(response);
- }
- // ...
- // 获取消费组的重试主题
- String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
- // 从重试队列中随机选取一个队列
- int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();
- int topicSysFlag = 0;
- if (requestHeader.isUnitMode()) {
- topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
- }
- // 创建TopicConfig主题配置信息
- TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
- newTopic,
- subscriptionGroupConfig.getRetryQueueNums(),
- PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
- //...
- // 根据消息物理偏移量从commitLog文件中获取消息
- MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
- if (null == msgExt) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("look message by offset failed, " + requestHeader.getOffset());
- return CompletableFuture.completedFuture(response);
- }
- // 获取消息的重试主题
- final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
- if (null == retryTopic) {
- MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
- }
- msgExt.setWaitStoreMsgOK(false);
- // 延迟等级获取
- int delayLevel = requestHeader.getDelayLevel();
- // 获取最大消费重试次数
- int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
- if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
- Integer times = requestHeader.getMaxReconsumeTimes();
- if (times != null) {
- maxReconsumeTimes = times;
- }
- }
- // 判断消息的消费次数是否大于等于最大消费次数 或者 延迟等级小于0
- if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
- || delayLevel < 0) {
- // 获取DLQ主题
- newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
- // 选取一个队列
- queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
- // 创建DLQ的topicConfig
- topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
- PermName.PERM_WRITE | PermName.PERM_READ, 0);
- // ...
- } else {
- // 如果延迟级别为0
- if (0 == delayLevel) {
- // 更新延迟级别
- delayLevel = 3 + msgExt.getReconsumeTimes();
- }
- // 设置延迟级别
- msgExt.setDelayTimeLevel(delayLevel);
- }
- // 新建消息
- MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
- msgInner.setTopic(newTopic); // 设置主题
- msgInner.setBody(msgExt.getBody()); // 设置消息
- msgInner.setFlag(msgExt.getFlag());
- MessageAccessor.setProperties(msgInner, msgExt.getProperties()); // 设置消息属性
- msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
- msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
- msgInner.setQueueId(queueIdInt); // 设置队列ID
- msgInner.setSysFlag(msgExt.getSysFlag());
- msgInner.setBornTimestamp(msgExt.getBornTimestamp());
- msgInner.setBornHost(msgExt.getBornHost());
- msgInner.setStoreHost(msgExt.getStoreHost());
- msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);// 设置消费次数
- // 原始的消息ID
- String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
- // 设置消息ID
- MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
- msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
- // 添加重试消息
- CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
- return putMessageResult.thenApply((r) -> {
- if (r != null) {
- switch (r.getPutMessageStatus()) {
- case PUT_OK:
- // ...
- return response;
- default:
- break;
- }
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark(r.getPutMessageStatus().name());
- return response;
- }
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("putMessageResult is null");
- return response;
- });
- }
- }
复制代码 - 根据延迟级别选取对应的队列,一般会把相同延迟级别的消息放在同一个队列中
- 备份之前的TOPIC和队列ID
- 更改消息队列的主题为RMQ_SYS_SCHEDULE_TOPIC,所以延迟消息的主题最终被设置为RMQ_SYS_SCHEDULE_TOPIC,放在对应的延迟队列中进行处理
- public class TopicValidator {
- // ...
- public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
- }
复制代码 拉取进度持久化
在更新拉取进度的时候,从offsetTable中获取当前消息队列的拉取偏移量,如果为空,则新建并保存到offsetTable中,否则获取之前已经保存的偏移量,对值进行更新,需要注意这里只是更新了offsetTable中的数据,并没有持久化到磁盘,持久化的操作在persistAll方法中:- public class CommitLog {
- public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
- // ...
- // 获取事务类型
- final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
- // 如果未使用事务或者提交事务
- if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
- || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
- // 判断延迟级别
- if (msg.getDelayTimeLevel() > 0) {
- // 如果超过了最大延迟级别
- if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
- msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
- }
- topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
- // 根据延迟级别选取对应的队列
- int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
- // 备份之前的TOPIC和队列ID
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
- msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
- msg.setTopic(topic);
- msg.setQueueId(queueId);
- }
- }
- // ...
- }
- }
复制代码 加载进度
LOCAL_OFFSET_STORE_DIR:消费进度文件所在的根路径- public class LocalFileOffsetStore implements OffsetStore {
- // offsetTable:KEY为消息队列,value为该消息队列的拉取偏移量
- private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
- new ConcurrentHashMap<MessageQueue, AtomicLong>();
- @Override
- public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
- if (mq != null) {
- // 获取之前的拉取进度
- AtomicLong offsetOld = this.offsetTable.get(mq);
- if (null == offsetOld) {
- // 如果之前不存在,进行创建
- offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
- }
- // 如果不为空
- if (null != offsetOld) {
- if (increaseOnly) {
- MixAll.compareAndIncreaseOnly(offsetOld, offset);
- } else {
- // 更新拉取偏移量
- offsetOld.set(offset);
- }
- }
- }
- }
- }
复制代码 在LocalFileOffsetStore的构造函数中可以看到,对拉取偏移量的保存文件路径进行了设置,为LOCAL_OFFSET_STORE_DIR + 客户端ID + 消费组名称 + offsets.json,从名字上看,消费进度的数据格式是以JSON的形式进行保存的:- public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
- "rocketmq.client.localOffsetStoreDir", System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
复制代码 在load方法中,首先从本地读取 offsets.json文件,并序列化为OffsetSerializeWrapper对象,然后将保存的消费进度加入到offsetTable中:- this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator +
- this.groupName + File.separator + "offsets.json";
复制代码 OffsetSerializeWrapper
OffsetSerializeWrapper中同样使用了ConcurrentMap,从磁盘的offsets.json文件中读取数据后,将JSON转为OffsetSerializeWrapper对象,就可以通过OffsetSerializeWrapper的offsetTable获取到之前保存的每个消息队列的消费进度,然后加入到LocalFileOffsetStore的offsetTable中:- public class LocalFileOffsetStore implements OffsetStore {
- // 文件路径
- public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
- "rocketmq.client.localOffsetStoreDir",
- System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
- private final String storePath;
- // ...
- public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
- this.mQClientFactory = mQClientFactory;
- this.groupName = groupName;
- // 设置拉取进度文件的路径
- this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
- this.mQClientFactory.getClientId() + File.separator +
- this.groupName + File.separator +
- "offsets.json";
- }
- @Override
- public void load() throws MQClientException {
- // 从本地读取拉取偏移量
- OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
- if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
- // 加入到offsetTable中
- offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
- for (Entry<MessageQueue, AtomicLong> mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {
- AtomicLong offset = mqEntry.getValue();
- log.info("load consumer's offset, {} {} {}",
- this.groupName,
- mqEntry.getKey(),
- offset.get());
- }
- }
- }
- // 从本地加载文件
- private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
- String content = null;
- try {
- // 读取文件
- content = MixAll.file2String(this.storePath);
- } catch (IOException e) {
- log.warn("Load local offset store file exception", e);
- }
- if (null == content || content.length() == 0) {
- return this.readLocalOffsetBak();
- } else {
- OffsetSerializeWrapper offsetSerializeWrapper = null;
- try {
- // 序列化
- offsetSerializeWrapper =
- OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
- } catch (Exception e) {
- log.warn("readLocalOffset Exception, and try to correct", e);
- return this.readLocalOffsetBak();
- }
- return offsetSerializeWrapper;
- }
- }
- }
复制代码 持久化进度
- 创建OffsetSerializeWrapper对象
- 遍历LocalFileOffsetStore的offsetTable,将数据加入到OffsetSerializeWrapper的OffsetTable中
- 将OffsetSerializeWrapper转为JSON
- 调用string2File方法将JSON数据保存到磁盘文件
- public class OffsetSerializeWrapper extends RemotingSerializable {
- private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
- new ConcurrentHashMap<MessageQueue, AtomicLong>();
- public ConcurrentMap<MessageQueue, AtomicLong> getOffsetTable() {
- return offsetTable;
- }
- public void setOffsetTable(ConcurrentMap<MessageQueue, AtomicLong> offsetTable) {
- this.offsetTable = offsetTable;
- }
- }
复制代码 集群模式
集群模式下的更新进度与广播模式下的更新类型,都是只更新了offsetTable中的数据:- public class LocalFileOffsetStore implements OffsetStore {
- @Override
- public void persistAll(Set<MessageQueue> mqs) {
- if (null == mqs || mqs.isEmpty())
- return;OffsetSerializeWrapper
- // 创建
- OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
- // 遍历offsetTable
- for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
- if (mqs.contains(entry.getKey())) {
- // 获取拉取偏移量
- AtomicLong offset = entry.getValue();
- // 加入到OffsetSerializeWrapper的OffsetTable中
- offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
- }
- }
- // 将对象转为JSON
- String jsonString = offsetSerializeWrapper.toJson(true);
- if (jsonString != null) {
- try {
- // 将JSON数据保存到磁盘文件
- MixAll.string2File(jsonString, this.storePath);
- } catch (IOException e) {
- log.error("persistAll consumer offset Exception, " + this.storePath, e);
- }
- }
- }
- }
复制代码 加载
集群模式下加载消费进度需要从Broker获取,在消费者发送消息拉取请求的时候,Broker会计算消费偏移量,所以RemoteBrokerOffsetStore的load方法为空,什么也没有干:- public class RemoteBrokerOffsetStore implements OffsetStore {
- private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
- new ConcurrentHashMap<MessageQueue, AtomicLong>();
- @Override
- public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
- if (mq != null) {
- // 获取消息队列的进度
- AtomicLong offsetOld = this.offsetTable.get(mq);
- if (null == offsetOld) {
- // 将消费进度保存在offsetTable中
- offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
- }
- if (null != offsetOld) {
- if (increaseOnly) {
- MixAll.compareAndIncreaseOnly(offsetOld, offset);
- } else {
- // 更新拉取偏移量
- offsetOld.set(offset);
- }
- }
- }
- }
- }
复制代码 持久化
由于集群模式下消费进度保存在Broker端,所以persistAll方法中调用了updateConsumeOffsetToBroker向Broker发送请求进行消费进度保存:- public class RemoteBrokerOffsetStore implements OffsetStore {
- @Override
- public void load() {
- }
- }
复制代码 持久化的触发
MQClientInstance在启动定时任务的方法startScheduledTask中注册了定时任务,定时调用persistAllConsumerOffset对拉取进度进行持久化,persistAllConsumerOffset中又调用了MQConsumerInner的persistConsumerOffset方法:- public class RemoteBrokerOffsetStore implements OffsetStore {
- @Override
- public void persistAll(Set<MessageQueue> mqs) {
- if (null == mqs || mqs.isEmpty())
- return;
- final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
- for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
- MessageQueue mq = entry.getKey();
- AtomicLong offset = entry.getValue();
- if (offset != null) {
- if (mqs.contains(mq)) {
- try {
- // 向Broker发送请求更新拉取偏移量
- this.updateConsumeOffsetToBroker(mq, offset.get());
- log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
- this.groupName,
- this.mQClientFactory.getClientId(),
- mq,
- offset.get());
- } catch (Exception e) {
- log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
- }
- } else {
- unusedMQ.add(mq);
- }
- }
- }
- // ...
- }
- }
复制代码 DefaultMQPushConsumerImpl是MQConsumerInner的一个子类,以它为例可以看到在persistConsumerOffset方法中调用了offsetStore的persistAll方法进行持久化:- public class MQClientInstance {
- private void startScheduledTask() {
- // ...
- // 注册定时任务,定时持久化拉取进度
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- // 持久化
- MQClientInstance.this.persistAllConsumerOffset();
- } catch (Exception e) {
- log.error("ScheduledTask persistAllConsumerOffset exception", e);
- }
- }
- }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
- // ...
- }
- private void persistAllConsumerOffset() {
- Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, MQConsumerInner> entry = it.next();
- MQConsumerInner impl = entry.getValue();
- // 调用persistConsumerOffset进行持久化
- impl.persistConsumerOffset();
- }
- }
- }
复制代码 总结

