1.架构图
RocketMQ的四层架构:
1.1 客户端层(Clients):
Producer集群:
(1)DefaultMQProducer:普通消息生产者
(2)TransactionMQProducer:事务消息生产者
(3)支持同步、异步、单向发送
Consumer集群:
(1)DefaultMQPushConsumer:服务端推送模式
(2)DefaultMQPullConsumer:客户端拉取模式
(3)支持集群消费和广播消费
1.2 定名服务层(NameServer)
(1)无状态设计,多节点互不通讯
(2)Broker管理:
- 吸收Broker心跳注册(每30s)
- 查抄Broker存活状态(每10s)
- 剔除不活跃的Broker(120s未收到心跳)
(3)路由管理:
- 维护Topic队列信息
- 维护Broker存活信息
- 提供路由信息查询服务
(4)提供服务发现能力
(5)Producer和Consumer会连接多个NameServer
1.3 消息服务层(Brokers)
(1)Broker Master:提供读写服务
(2)Broker Slave:提供读服务
(3)异步刷盘和同步刷盘
1.4 存储层(Store)
(1)CommitLog:消息存储文件
(2) ConsumeQueue:消息消费队列,相当于CommitLog索引
(3) IndexFile:消息索引文件,提供消息Key查询
2.RocketMQ的焦点流程
RocketMQ的焦点流程,重要包括生产、存储、消费三大环节
2.1消息生产流程
- // 核心代码结构
- public class DefaultMQProducer {
- private SendResult send(Message msg) {
- // 1. 检查消息
- this.checkMessage(msg);
-
- // 2. 查找Topic路由信息
- TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
-
- // 3. 选择消息队列(默认轮询策略)
- MessageQueue messageQueue = selectOneMessageQueue(topicPublishInfo);
-
- // 4. 发送消息
- SendResult sendResult = this.sendKernelImpl(msg, messageQueue, communicationMode, sendCallback, timeout);
- }
- }
复制代码
2.2消息存储流程:
- // Broker存储核心代码
- public class CommitLog {
- public PutMessageResult putMessage(MessageExtBrokerInner msg) {
- // 1. 获取写入锁
- putMessageLock.lock();
- try {
- // 2. 验证Broker状态
- if (!this.checkMessage(msg)) {
- return null;
- }
-
- // 3. 写入消息
- result = this.mappedFile.appendMessage(msg);
-
- // 4. 刷盘
- handleDiskFlush(result);
-
- // 5. 复制到Slave
- handleHA(result);
-
- } finally {
- putMessageLock.unlock();
- }
- }
- }
复制代码 存储结构:
- - commitLog:
- |-- 00000000000000000000 (文件名代表偏移量)
- |-- 00000000001073741824
- |-- ...
- - consumeQueue:
- |-- topic
- |-- queue0
- |-- 00000000000000000000
- |-- 00000000000000000001
- |-- queue1
- |-- ...
- - index:
- |-- 20240205123456789
- |-- 20240205234567890
复制代码 2.3消息消费流程
- public class DefaultMQPushConsumerImpl {
- private void pullMessage(final PullRequest pullRequest) {
- // 1. 构建消费请求
- PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
- requestHeader.setConsumerGroup(this.consumerGroup);
- requestHeader.setTopic(pullRequest.getMessageQueue().getTopic());
- requestHeader.setQueueId(pullRequest.getMessageQueue().getQueueId());
- requestHeader.setQueueOffset(pullRequest.getNextOffset());
-
- // 2. 执行拉取
- PullCallback pullCallback = new PullCallback() {
- @Override
- public void onSuccess(PullResult pullResult) {
- // 处理拉取结果
- processQueue.putMessage(pullResult.getMsgFoundList());
- // 提交消费请求
- consumeMessageService.submitConsumeRequest(
- pullResult.getMsgFoundList(),
- processQueue,
- pullRequest.getMessageQueue());
- }
- };
- }
- }
复制代码 消费者重平衡流程:
- public class RebalanceImpl {
- private void rebalanceByTopic() {
- // 1. 获取Topic下所有消费者
- List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic);
-
- // 2. 对消费者排序
- Collections.sort(cidAll);
-
- // 3. 分配队列
- AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
- List<MessageQueue> allocateResult = strategy.allocate(
- this.consumerGroup,
- this.mQClientFactory.getClientId(),
- mqAll,
- cidAll);
- }
- }
复制代码 3.生产者三种发送方式
- a. 同步发送:
- Producer → 等待Broker响应 → 继续业务流程
- b. 异步发送:
- Producer → 注册回调函数 → 继续业务流程
- ↓
- 收到响应后触发回调
- c. 单向发送:
- Producer → 发送消息 → 继续业务流程(不关心结果)
复制代码 4.故障处理
- a. Broker故障:
- - 自动切换到其他Broker
- - 触发重试机制
- - 更新故障延迟信息
- b. 网络故障:
- - 通过重试机制保证消息投递
- - 支持消息轨迹查询
复制代码 5.连接方式
各个角色之间的连接关系:
- Producer -> NameServer:长连接
- Producer -> Broker:长连接
- Consumer -> NameServer:长连接
- Consumer -> Broker:长连接
- Broker -> NameServer:长连接(心跳)
复制代码 连接断开处理:
- 如果心跳超时(默认120秒):
- - NameServer会删除Broker的路由信息
- - Producer会重新建立连接
- - Consumer会触发重平衡
复制代码 长连接机制的优点:
(1)减少连接创建的开销
(2)提高消息收发服从
(3)及时感知各个组件的存活状态
(4)包管集群的可用性和稳定性
6.关键的性能优化点
- // 1. 文件预分配
- public void preallocation() {
- File file = new File(storePath + File.separator + nextFilePath);
- try {
- file.createNewFile();
- RandomAccessFile raFile = new RandomAccessFile(file, "rw");
- raFile.setLength(fileSize);
- }
- }
- // 2. 零拷贝发送
- public SendResult sendMessage(final Channel channel, final Message msg) {
- // 使用FileChannel.transferTo实现零拷贝
- fileChannel.transferTo(position, count, socketChannel);
- }
- // 3. 批量消息处理
- public PullResult pullMessage(final int batchSize) {
- // 一次拉取多条消息
- List<MessageExt> msgFoundList = new ArrayList<>(batchSize);
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
|