rocketmq架构

[复制链接]
发表于 2025-5-30 02:59:58 | 显示全部楼层 |阅读模式
1.架构图 


RocketMQ的四层架构:
1.1 客户端层(Clients):

Producer集群:
(1)DefaultMQProducer:普通消息生产者
(2)TransactionMQProducer:事务消息生产者
(3)支持同步、异步、单向发送
Consumer集群:
(1)DefaultMQPushConsumer:服务端推送模式
(2)DefaultMQPullConsumer:客户端拉取模式
(3)支持集群消费和广播消费
1.2 定名服务层(NameServer)

(1)无状态设计,多节点互不通讯
(2)Broker管理:
- 吸收Broker心跳注册(每30s)
- 查抄Broker存活状态(每10s)
- 剔除不活跃的Broker(120s未收到心跳)
(3)路由管理:
- 维护Topic队列信息
- 维护Broker存活信息
- 提供路由信息查询服务
(4)提供服务发现能力
(5)Producer和Consumer会连接多个NameServer
1.3 消息服务层(Brokers)

(1)Broker Master:提供读写服务
(2)Broker Slave:提供读服务
(3)异步刷盘和同步刷盘
1.4 存储层(Store)

(1)CommitLog:消息存储文件
(2) ConsumeQueue:消息消费队列,相当于CommitLog索引
(3) IndexFile:消息索引文件,提供消息Key查询

2.RocketMQ的焦点流程

RocketMQ的焦点流程,重要包括生产、存储、消费三大环节
2.1消息生产流程

  1. // 核心代码结构
  2. public class DefaultMQProducer {
  3.     private SendResult send(Message msg) {
  4.         // 1. 检查消息
  5.         this.checkMessage(msg);
  6.         
  7.         // 2. 查找Topic路由信息
  8.         TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  9.         
  10.         // 3. 选择消息队列(默认轮询策略)
  11.         MessageQueue messageQueue = selectOneMessageQueue(topicPublishInfo);
  12.         
  13.         // 4. 发送消息
  14.         SendResult sendResult = this.sendKernelImpl(msg, messageQueue, communicationMode, sendCallback, timeout);
  15.     }
  16. }
复制代码

2.2消息存储流程:

  1. // Broker存储核心代码
  2. public class CommitLog {
  3.     public PutMessageResult putMessage(MessageExtBrokerInner msg) {
  4.         // 1. 获取写入锁
  5.         putMessageLock.lock();
  6.         try {
  7.             // 2. 验证Broker状态
  8.             if (!this.checkMessage(msg)) {
  9.                 return null;
  10.             }
  11.             
  12.             // 3. 写入消息
  13.             result = this.mappedFile.appendMessage(msg);
  14.             
  15.             // 4. 刷盘
  16.             handleDiskFlush(result);
  17.             
  18.             // 5. 复制到Slave
  19.             handleHA(result);
  20.             
  21.         } finally {
  22.             putMessageLock.unlock();
  23.         }
  24.     }
  25. }
复制代码
存储结构:
  1. - commitLog:
  2.   |-- 00000000000000000000 (文件名代表偏移量)
  3.   |-- 00000000001073741824
  4.   |-- ...
  5. - consumeQueue:
  6.   |-- topic
  7.       |-- queue0
  8.           |-- 00000000000000000000
  9.           |-- 00000000000000000001
  10.       |-- queue1
  11.           |-- ...
  12. - index:
  13.   |-- 20240205123456789
  14.   |-- 20240205234567890
复制代码
2.3消息消费流程

  1. public class DefaultMQPushConsumerImpl {
  2.     private void pullMessage(final PullRequest pullRequest) {
  3.         // 1. 构建消费请求
  4.         PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
  5.         requestHeader.setConsumerGroup(this.consumerGroup);
  6.         requestHeader.setTopic(pullRequest.getMessageQueue().getTopic());
  7.         requestHeader.setQueueId(pullRequest.getMessageQueue().getQueueId());
  8.         requestHeader.setQueueOffset(pullRequest.getNextOffset());
  9.         
  10.         // 2. 执行拉取
  11.         PullCallback pullCallback = new PullCallback() {
  12.             @Override
  13.             public void onSuccess(PullResult pullResult) {
  14.                 // 处理拉取结果
  15.                 processQueue.putMessage(pullResult.getMsgFoundList());
  16.                 // 提交消费请求
  17.                 consumeMessageService.submitConsumeRequest(
  18.                     pullResult.getMsgFoundList(),
  19.                     processQueue,
  20.                     pullRequest.getMessageQueue());
  21.             }
  22.         };
  23.     }
  24. }
复制代码
消费者重平衡流程:
  1. public class RebalanceImpl {
  2.     private void rebalanceByTopic() {
  3.         // 1. 获取Topic下所有消费者
  4.         List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic);
  5.         
  6.         // 2. 对消费者排序
  7.         Collections.sort(cidAll);
  8.         
  9.         // 3. 分配队列
  10.         AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
  11.         List<MessageQueue> allocateResult = strategy.allocate(
  12.             this.consumerGroup,
  13.             this.mQClientFactory.getClientId(),
  14.             mqAll,
  15.             cidAll);
  16.     }
  17. }
复制代码
3.生产者三种发送方式 

  1. a. 同步发送:
  2.    Producer → 等待Broker响应 → 继续业务流程
  3. b. 异步发送:
  4.    Producer → 注册回调函数 → 继续业务流程
  5.              ↓
  6.    收到响应后触发回调
  7. c. 单向发送:
  8.    Producer → 发送消息 → 继续业务流程(不关心结果)
复制代码
4.故障处理 

  1. a. Broker故障:
  2.    - 自动切换到其他Broker
  3.    - 触发重试机制
  4.    - 更新故障延迟信息
  5. b. 网络故障:
  6.    - 通过重试机制保证消息投递
  7.    - 支持消息轨迹查询
复制代码
5.连接方式 

各个角色之间的连接关系:
  1. Producer -> NameServer:长连接
  2. Producer -> Broker:长连接
  3. Consumer -> NameServer:长连接
  4. Consumer -> Broker:长连接
  5. Broker -> NameServer:长连接(心跳)
复制代码
 连接断开处理:
  1. 如果心跳超时(默认120秒):
  2. - NameServer会删除Broker的路由信息
  3. - Producer会重新建立连接
  4. - Consumer会触发重平衡
复制代码
长连接机制的优点:
(1)减少连接创建的开销
(2)提高消息收发服从
(3)及时感知各个组件的存活状态
(4)包管集群的可用性和稳定性

6.关键的性能优化点

  1. // 1. 文件预分配
  2. public void preallocation() {
  3.     File file = new File(storePath + File.separator + nextFilePath);
  4.     try {
  5.         file.createNewFile();
  6.         RandomAccessFile raFile = new RandomAccessFile(file, "rw");
  7.         raFile.setLength(fileSize);
  8.     }
  9. }
  10. // 2. 零拷贝发送
  11. public SendResult sendMessage(final Channel channel, final Message msg) {
  12.     // 使用FileChannel.transferTo实现零拷贝
  13.     fileChannel.transferTo(position, count, socketChannel);
  14. }
  15. // 3. 批量消息处理
  16. public PullResult pullMessage(final int batchSize) {
  17.     // 一次拉取多条消息
  18.     List<MessageExt> msgFoundList = new ArrayList<>(batchSize);
  19. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
继续阅读请点击广告

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
回复

使用道具 举报

© 2001-2025 Discuz! Team. Powered by Discuz! X3.5

GMT+8, 2025-7-4 05:08 , Processed in 0.231213 second(s), 35 queries 手机版|qidao123.com技术社区-IT企服评测▪应用市场 ( 浙ICP备20004199 )|网站地图

快速回复 返回顶部 返回列表