RocketMQ 必知概念

张裕  金牌会员 | 2024-10-6 21:50:54 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 571|帖子 571|积分 1713

延迟消息

延迟等级

官方默认设置了 18 哥延迟等级
1s        5s        10s        30s        1m        2m        3m        4m        5m        6m        7m        8m        9m        10m        20m        30m        1h        2h
发送延迟消息:按照默认顺序 1-18 数字就对应上面的延迟时间
  1. Message msg = new Message (TOPIC, TAG, "OrderID199", "ok", getBytes(StandardCharsets.UTF_8));
  2. //设置延迟等级
  3. msg.setDelayTimeLevel(3);
  4. producer.send(msg);
复制代码
基本原理

延迟消息都会被存储到 RocketMQ 的一个内部 Topic:SCHEDULE_TOPIC_XXXX 中
SCHEDULE_TOPIC_XXXX 共有 18 个 MessageQueue:

  • 对应延迟消息的 18 个等级,根据指定的 DelayTimeLevel 来决定选择哪个 MessageQueue
  • 有一个定时任务,每 100 ms 执行一次判定 SCHEDULE_TOPIC_XXXX Topic 中的 MessageQueue 的消息是否到达延迟时间
  • 若到达延迟时间,将 SCHEDULE_TOPIC_XXXX 中的消息投递到消息最初必要投递的 Topic 之中
为什么不支持任意时间?
RocketMQ 并不支持任意时间的延迟,大概的主要缘故原由就是如果提供任意时间,就会涉及到消息的排序,会有一定的性能损耗
事件消息

RocketMQ 采用了 2PC 的思想来实现了提交事件消息,同时增加一个补偿逻辑来处置惩罚二阶段超时或失败的消息

基本流程

第一阶段:

  • 发送 Message,Half Message ,即半事件消息
  • 此范例的 Message 是不会被 Consumer 消费的
第二阶段:如果半事件消息投递成功,则会开始执行本地事件
分为如下三种 Case:

  • 本地事件执行成功:会为 Broker 发送 commit 消息,被 commit 过后的 Message 才能被 Consumer 消费
  • 本地事件执行失败:

    • 会为 Broker 发送 rollback 消息,Broker 则会将刚刚投递的半事件消息删除,从而保证上下游数据的一致性
    • 如果 Producer 实例或者网络出现题目,Producer 没能实时 de 将本地事件执行的结果通知 Broker,Broker 会通过扫描发现某条 Message 长时间处于半事件状态,Broker 会自动 de 给 Producer 询问此 Message 对应的事件状态

基本设计

采用 2PC 两阶段设计:

将 Message 原本真实的 Topic 和 MessageQueue 进行备份

  • 存入到 PROPERTY_REALTOPC、PROPERTY_REAL_QUEUE_ID 中
将消息投递到一个内部 Topic 中 RMQ_SYS_TRANS_HALF_TOPIC,该队列专门存储事件消息
所有的 Half Message 全部都写入到 queueId 为 0 的 MessageQueue,因为一个 Topic 下只有 1 个 MessageQueue:

  • 这个 Topic 下的所有 Message 就是全局有序的,ta 们会按照先来后到的顺序被消费
如果本地事件执行成功进行 Commit,则将 RMQ_SYS_TRANS_HALF_TOPIC 队列中的消息投递到真实的 Topic 中,供后续流程执行

  • 并删除这条 Half Message,但删除也是假删除,只是给 Message 打上一个删除的 tag
如果本地事件执行失败进行 rollback,则直接删除这条 Half Message,但删除也是假删除
如果本地事件吃吃没有返回结果(默认时间 6s),则会触发事件回查机制

  • 执行回查之前必要校验检查次数是否到达了最大值(必要手动设置,没有默认值)
  • 或者是当前 Half Message 存在是否凌驾了 Message 保存的上限,即 3 天
  • 如果满足上面条件中的一种 Half Message 会被放进 TRANS_CHECK_MAX_TOPIC Topic 中
  • 一旦判定为必要执行事件回查逻辑,那么当前这条 Half Message 就算已经被消费了
  • 在没有到达最大的校验次数之前,都还必要将其投递到事件队列当中,以便下次重试时再次执行 Check 逻辑
  • 如果回查成功,则删除投递的 Half Message
消息重试

重试时间

消息消费失败后,并不会立即重试,而是一个递增的时间间隔来进行重试的,重试次数默以为 16 次
只比延迟消息的时间间隔等级少了前两个,延迟消息总共有 18 个等级,而消息重试使用了延迟消息的第 3-18 等级
10s        30s        1m        2m        3m        4m        5m        6m        7m        8m        9m        10m        20m        30m        1h        2h
基本原理

重试的 Message,RocketMQ 的做法并不是将其投递回原来的 Topic,而是重试队列
每个 ConsumerGroup 都有自己的重试队列:

  • 其名称是由特定的前缀拼接上 ConsumerGroup 所组成,默认 %RETR% +  消费者组名称
  • 所有在 Consumer 启动时,就会同时消费其 ConsumerGroup 对应的重试队列和平凡队列
消费失败的 Message,Consumer 会将其投回 Broker:

  • 相当于这条 Message 已经被消费掉了,之后重试的只是内容雷同,但现实不是同一个的 Message
  • 然后会校验重试的次数,如果到达 16 次,则会进入死信队列,组成为 %DLQ% + 消费者组名称
  • 未到达最大重试次数,则会根据重试间隔时间等级将其投递到延迟队列 SCHEDULE_TOPIC_XXXX 中
  • 然后比及了延迟等级对应的时间后,在投递到 ConsumerGroup 所对应的重试队列当中,供后续消费
消息存储

整体架构

RocketMQ 的混合性存储结构(多个 Topic 的消息实体内容都存储于一个 CommitLog中)
针对 Producer 和 Consumer 分别采用了数据和索引部分相分离的存储结构
Producer 发送消息至 Broker 端,然后 Broker 端使用同步或者异步的方式对消息刷盘长期化,保存至 CommitLog 中

核心步调:

  • 首先,生产者根据 topic 发送消息,消息存储在 commitLog中,1 G一个文件,当文件满了,写入下一个文件
  • 其次,ReputMessageService 重写消息服务执行 2 个分发操作:

    • 创建 ConsumerQueue 逻辑消费队列:

      • 参数:commitLogOffset 物理偏移量、msgSize 消息长度、tagsCode tag 哈希

    • 创建 IndexFile 索引文件:

      • 以创建时的时间戳定名


  • 最后,消费者根据 topic、tag 拉取消息消费,根据 key 查询消息
重要文件

commitLog 消息日记:

  • 消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容
consumequeue 逻辑消费队列:

  • 存储了 commitLog 的起始物理 offset,目标是进步消费的性能
indexFile 索引文件:

  • 提供了一种可以通过 key 或者时间区间来查询消息的方法
consumequeue 文件:
consumequeue 文件采取定长设计,每一个条目共 20 个字节,分别为:

  • 8 字节的 commmitLog 物理偏移量
  • 4 字节 的消息长度
  • 8 字节 tag hashcode
单个文件由 30w 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件巨细约 5.72M

  • 默认一个 topic 对应 4 个 queueId,即 4 个 messageQueue
每个 messageQueue 文件夹下有多个 consumeQueue,所以:messageQueue 1 :N consumeQueue

通信机制

通信架构图


基本通讯流程如下:

  • Broker 启动后必要完成一次将自己注册至 NameServer 的操作,随后每隔 30s 时间定时向 NameServer 上报 Topic 路由信息
  • 消息生成者 Producer 作为客户端发送消息时,必要根据消息的 Topic 从本地缓存的 TopicPublishInfoTable 获取路由信息

    • 如果没有则更新路由信息会从 NameServer 上重新拉取,同时 Producer 会默认每隔 30s 向 NameServer 拉取一次路由信息

  • 消息生产者 Producer 根据获取的路由信息选择一个队列(MessageQueue) 进行消息发送

    • Broker 作为消息的吸收者吸收消息并落盘存储

  • 消息消费者 Consumer 根据获取的路由信息,并再完成客户端的负载均衡后,选择此中的某一个或者某几个消息队列来拉取消息并进行消费
为了实现客户端与服务器之间高效的数据请求与吸收:

  • RocketMQ-Remoting 包自定义了通信协议并在 Netty 的底子之上扩展了通信模块

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张裕

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表