009 rocketmq延时消息

打印 上一主题 下一主题

主题 1013|帖子 1013|积分 3039

延迟消息

两个关键要素:暂时存储 延时服务
内置队列,RocketMQ包罗多个内置队列。
特点:对消费者不可见,消费者不能消费
SCHEDULE_TOPIC_XXXX对应的队列是一种内置队列
注意:RocketMQ不⽀持任意时间的延时,只⽀持以下⼏个固定的延时品级
  1. String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
复制代码
RocketMQ的延迟品级可以进⾏修改,以满⾜⾃⼰的业务需求,可以修改/添加新的level。
比方:你想⽀持1天的延迟,修改末了⼀个level的值为1d,这个时候依然是18个level;也可以增加⼀个1d,这个时候统共就有19个level。
  1. messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
复制代码
实现原理
延迟队列的核⼼思路是:全部的延迟消息由producer发出之后,都会存放到同⼀个topic(SCHEDULE_TOPIC_XXXX)下,根据延迟level的个数,创建对应数目的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由
定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可⻅,从⽽被consumer消费。
⽣产者在发送延迟消息⾮常简单,只需要设置⼀个延迟级别即可,注意不是具体的延迟时间,如:
  1. Message msg=new Message();
  2. msg.setTopic("TopicA");
  3. msg.setTags("Tag");
  4. msg.setBody("this is a delay message".getBytes());
  5. //设置延迟level为5,对应延迟1分钟
  6. msg.setDelayTimeLevel(5);
  7. producer.send(msg);
复制代码
延迟消息在RocketMQ Broker端的流转如下图所示:

可以看到,统共有6个步骤,下⾯会对这6个步骤进⾏具体的讲解:

  • 修改消息Topic名称和队列信息
  • 转发消息到延迟主题SCHEDULE_TOPIC_XXXX的CosumeQueue中
  • 延迟服务消费SCHEDULE_TOPIC_XXXX消息
  • 将信息重新存储到CommitLog中
  • 将消息投递到⽬标Topic中
  • 消费者消费⽬标topic中的数据
第⼀步:修改消息Topic名称和队列信息

RocketMQ Broker端在存储⽣产者写⼊的消息时,⾸先都会将其写⼊到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到⽬标Topic的指定队列(ConsumeQueue)中。
由于消息⼀旦存储到ConsumeQueue中,消费者就能消费到,⽽延迟消息不能被⽴即消费,所以这⾥将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。
同时,还会将消息原来要发送到的⽬标Topic和队列信息存储到消息的属性中。
   如果是延迟消息,修改topic,通过timelevel判断是延时消息
如果设置的级别凌驾了最大级别,重置延迟级别
更改topic
雷同的延迟品级发送到同一个队列,不同品级不同队列
记载真正的topic和队列
  /store/consumequeue/SCHEDULE_TOPIC_XXXX/2
相干源码如下所示:
org.apache.rocketmq.store.CommitLog#putMessage
  1. public PutMessageResult putMessage(final MessageExtBrokerInner msg) {}
复制代码
第⼆步:转发消息到延迟主题的CosumeQueue中

CommitLog中的消息转发到CosumeQueue中是异步进⾏的。在转发过程中,会对延迟消息进⾏特殊处理,重要是计算这条延迟消息需要在什么时候进⾏投递。
  1. 投递时间=消息存储时间(storeTimestamp) + 延迟级别对应的时间
复制代码
需要注意的是,会将计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,CosumeQueue单个存储单元构成结构如下图所示:

此中:
Commit Log Offset:记载在CommitLog中的位置。
Size:记载消息的⼤⼩
Message Tag HashCode:记载消息Tag的哈希值,⽤于消息过滤。特别的,对于延迟消息,这个字段记载的是消息的投递时间戳。这也是为什么java中hashCode⽅法返回⼀个int型,只占⽤4个字节,⽽这⾥Message Tag HashCode字段却设计成8个字节的原因。
相干源码参⻅:
CommitLog#checkMessageAndReturnSize
  1. public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer
  2. byteBuffer, final boolean checkCRC,
  3. final boolean readBody) {}
复制代码
第三步:延迟服务消费SCHEDULE_TOPIC_XXXX消息

Broker内部有⼀个ScheduleMessageService类,其充当延迟服务,消费SCHEDULE_TOPIC_XXXX中的消息,并投递到⽬标Topic中。
ScheduleMessageService在启动时,其会创建⼀个定时器Timer,并根据延迟级别的个数,启动对应数目的TimerTask,每个TimerTask负责⼀个延迟级别的消费与投递。
相干源码如下所示:
ScheduleMessageService#start
  1. public void start() {}
复制代码
需要注意的是,每个TimeTask在检查消息是否到期时,⾸先检查对应队列中尚未投递第⼀条消息,如果这条消息没到期,那么之后的消息都不会检查。如果到期了,则进⾏投递,并检查之后的消息是否到期。
第四步:将信息重新存储到CommitLog中

在将消息到期后,需要投递到⽬标Topic。由于在第⼀步已经记载了原来的Topic和队列信息,因此这⾥重新设置,再存储到CommitLog即可。此外,由于之前Message Tag HashCode字段存储的是消息的投递时间,这⾥需要重新计算tag的哈希值后再存储。
源码拜见:DeliverDelayedMessageTimerTask#messageTimeup⽅法
  1. private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {}
复制代码
第五步:将消息投递到目的Topic中

第六步:消费者消费目的topic中的数据

不同的延迟级别放在不同的队列序号下(queueId=delayLevel-1)。每⼀个延迟级别对应的延迟消息转换为普通消息的位置标识存放在~/store/config/delayOffset.json⽂件内。
key为对应的延迟级别,value对应不同延迟级别转换为普通消息的offset值。
  1. {
  2. "offsetTable":{3:202,4:2,5:2,6:2,7:2,8:2,9:2,10:2,11:2
  3. }
复制代码
ScheduledMessageProducer.java

  1. package com.example.rocketmq.demo.scheduled;
  2. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  3. import org.apache.rocketmq.common.message.Message;
  4. public class ScheduledMessageProducer {
  5.     public static void main(String[] args) throws Exception {
  6.         // Instantiate a producer to send scheduled messages
  7.         DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
  8.         producer.setNamesrvAddr("localhost:9876");
  9.         // Launch producer
  10.         producer.start();
  11.         for (int i = 0; i < 2; i++) {
  12.             Message message = new Message("TopicTest", ("Hello scheduled message " + i).getBytes());
  13.             // This message will be delivered to consumer 10 seconds later.
  14.             //设置延时等级,3 是等级 不是3s
  15.             message.setDelayTimeLevel(3);
  16.             // Send the message
  17.             producer.send(message);
  18.         }
  19.         // Shutdown producer after use.
  20.         producer.shutdown();
  21.     }
  22. }
复制代码
ScheduledMessageConsumer.java

  1. package com.example.rocketmq.demo.scheduled;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.common.message.MessageExt;
  7. import java.util.List;
  8. public class ScheduledMessageConsumer {
  9.     public static void main(String[] args) throws Exception {
  10.         // Instantiate message consumer
  11.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
  12.         // Subscribe topics
  13.         consumer.subscribe("TopicTest", "*");
  14.         // Register message listener
  15.         consumer.registerMessageListener(new MessageListenerConcurrently() {
  16.             @Override
  17.             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
  18.                 for (MessageExt message : messages) {
  19.                     // Print approximate delay time period
  20.                     System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
  21.                             + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
  22.                 }
  23.                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  24.             }
  25.         });
  26.         // Launch consumer
  27.         consumer.start();
  28.     }
  29. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

玛卡巴卡的卡巴卡玛

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表