ToB企服应用市场:ToB评测及商务社交产业平台

标题: RocketMQ架构 [打印本页]

作者: 南飓风    时间: 6 天前
标题: RocketMQ架构
目次
一、简介
二、工作流程
三、四大核心组件
1、NameServer
2、Broker
2.1、部署方式
2.2、高可用与负载均衡
3、生产者(Producer)
3.1、生产者类型
3.2、发送方式
3.3、生产者组
3.4、启动流程
3.5、高可用
4、消费者
4.1、消费者类型
4.2、消费者组
4.3、高可用
四、设计架构
1、NameServer 集群
2、Broker 集群
3、Producer 集群
4、Consumer 集群
5、集群间的交互方式
6、RocketMQ的消息
6.1、顺序消息
6.2、消息过滤
6.3、消息存储
6.4、延迟消息
五、Docker下安装部署
1、准备镜像
2、挂载目次
3、启动namesvr
4、启动broker
5、启动控制面板
6、访问


一、简介

RocketMQ可以或许提供更高的吞吐量和更低的延迟,单机吞吐十万级,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。其功能重要围绕 NameServer、Broker、Producer、Consumer 四大核心组件。
核心特性:
二、工作流程


三、四大核心组件

四大核心组件为:NameServer、Broker、Producer、Consumer

1、NameServer

NameServer 相称于服务注册中央的脚色,管理 Broker,用来保存 Broker 相干元信息并给 Producer 和 Comsumer 查找 Broker 信息。
NamerServer 是无状态的,可横向扩展,节点之间相互无通讯。
每个 Broker 在启动的时候都会到 NameServer 注册,Producer 在发送消息前会根据 topic 到 NameServer 获取到 Broker 的路由信息进而和 Broker 取得联系,Consumer 也会定时获取 topic 的路由信息。
NameServer 之所以被称为无状态,是由于它不依赖长期化存储,节点间不进行状态同步,仅在内存中保存由 Broker 等脚色主动上报的动态状态信息,且这些信息在 NameServer 重启后不会保留,这种设计简化了系统架构,提升了服务的可伸缩性和容错能力,同时也低落了运维复杂度:
2、Broker

消息服务器(Broker)是消息存储中央,用于接收来自 Producer 的消息并存储,Consumer 从这里获取消息并消费。他还存储与消息相干的元数据,包括用户组、消费进度偏移量、队列信息等。
Broker 分为 master 和 slave ,master 既可以写又可以读,slave 不可以写只可以读。
2.1、部署方式

Broker 的部署相对复杂,分为 master 和 slave ,一个 master 可以对应多个 slave,但是一个 slave 只能对应一个 master。master 和 slave 的对应关系通过指定雷同的 BrokerName 差异的 BrokerId来定义,BrokerId为0表示 master,非0表示 slave。Broker 集群有四种属方式:
2.2、高可用与负载均衡

Broker 的高可用与负载均衡重要依赖于:NameServer和自动故障转移实现的。

3、生产者(Producer)

3.1、生产者类型


3.2、发送方式


3.3、生产者组

生产者组(Producer Group)是一类 Producer 的聚集,这类 Producer 通常发送一类消息并且发送逻辑同等。从部署结构上看,生产者通过 Producer Group 的名字来标记本身是一个集群。
3.4、启动流程

3.5、高可用

Producer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 topic 路由信息,并向提供服务的 master 建立长连接,且定时向 master 发送心跳。
Producer 完全无状态,可集群部署。
Producer 每隔30s(由 ClientConfig 的 pollNameServerInterval)从 NameServer 获取全部 topic 队列的最新环境,这意味着假如 Broker 不可用,Producer 最多30s可以或许感知,在此期间发往该 Broker 的全部消息都会失败。
Producer 每隔30s(由 ClientConfig 的 hearbeatBrokerInterval决定)向全部关联的 Broker 发送心跳,Broker 每隔10s扫描全部存活的连接,假如 Broker 在2分钟内没有收到心跳数据,则关闭与 Producer 的连接。
4、消费者

消息订阅者,负责从 Topic 接收并消费消息,它从 Broker 拉取消息或者由 Broker 推送消息给消费者,具体是推还是拉取取决于所使用的消费模式。
4.1、消费者类型


4.2、消费者组

消费者组一类 Consumer 的聚集名称,这类 Consumer 通常消费同一类消息并且消费逻辑同等,所以将这些 Consumer 分组在一起。消费者与生产者类似,都是将雷同脚色的分组在一起并名。
RocketMQ中的消息有个特点:同一条消息,只能被某一消费组其中的一台机器消费,但是可以同时被差异的消费组消费。
4.3、高可用

与生产者一样,Consumer 与 NameServer 集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 取 topic 路由信息,并向提供服务的 master、slave 建立长连接,且定时向 master、slave 发送心跳。
Consumer 既可以从 master 订阅消息,也可以从 slave 订阅消息,订阅规则由 Broker 设置决定。
Consumer 每隔30s从 NameServer 获取 topic 的最新队列环境,这意味着 Broker 不可用时,Consumer 最多需要30s才感知。
Consumer 每隔30s(由 ClientConfig 中 heartbeatBrokerInterval 决定)向全部关联的 Broker 发送心跳,Broker 每隔10s扫描全部存活的连接,若某个连接2分钟内没有发送心跳数据,则关闭连接;并向该 Consumer Group 的全部 Consumer 发出通知,Group 内的 Consumer 重新分配队列,然后消费。
当 Consumer 的到 master 宕机通知后,转向 slave 消费,slave 不能包管 master 的消息100%都同步过来了,因此会有少量的消息丢失。但是一旦 master 规复,未同步过去的消息会被终极消费掉。
四、设计架构

RocketMQ 的设计基于主题的发布与订阅模式(观察者模式),团体设计追求简单与性能第一,重要体如今以下三个方面:

1、NameServer 集群

提供轻量级的服务发现及路由,每个 NameServer 记录完整的路由信息,提供相应的读写服务,支持快速扩展。其重要包含两个功能:

2、Broker 集群

通过提供轻量级的 topic 和 queue 机制处置惩罚消息存储,同时支持推(push)和拉(pull)两种模型。提供强盛的峰值添补和以原始时间顺序累积数千亿条消息的能力。此外还提供灾难规复,丰富的指标统计数据和警报机制,这些都是传统的消息系统缺乏的。
Broker 有几个重要的子模块:

3、Producer 集群

消息生产者支持分布式部署,通过多种负载均衡模式向 Broker 集群发送消息。
4、Consumer 集群

消费者也支持 push 和 pull 模型的分布式部署,还支持集群消费和广播消费。
提供了及时的消息订阅机制,可以满足大多数消费者的需求。
5、集群间的交互方式


6、RocketMQ的消息

6.1、顺序消息

一种严格按照顺序进行发布和消费的消息类型,要求消息的发布和消费都按照顺序进行,RocketMQ可以包管消息有序。但是这个顺序是有限定的。
6.2、消息过滤

消息过滤是指在消息消费时,消费者可以对同一主题下的消息按照规则只消费本身感爱好的消息。RocketMQ 消息过滤支持在服务端与消费端的消息过滤机制:

6.3、消息存储

消息中央件一个核心的能力是对消息的存储,一般有两个维度的考量:

RocketMQ 追求消息存储的高性能,引入内存映射机制,全部主题的消息顺序存储在同一个文件中。同时为了避免消息无穷在消息存储服务器中累积,引入了消息文件过期机制与文件存储空间报警机制。RocketMQ 的消息存储机制是其高性能、高可靠性的核心之一,重要采用了文件系统存储方式,并设计了经心优化的数据结构来包管消息的高效存储和检索。有以下几个关键构成部门和机制:
综上所述,RocketMQ的消息存储机制通过CommitLog、ConsumeQueue、IndexFile等多种文件结构和高效的刷盘计谋,结合主从复制,既包管了消息的高性能存储与检索,也确保了消息的高可靠性和数据安全性。
6.4、延迟消息

RocketMQ 支持延迟消息发送,但并非任意时间而是特定的延迟等级。延迟等级共有18个:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。默认的18个等级对应的时间可以修改,在 broker.conf中修改设置。


五、Docker下安装部署

1、准备镜像


  1. # 拉取rmq镜像
  2. # 最好带着版本号
  3. docker pull apache/rocketmq:4.7.1
  4. # 拉取控制台镜像
  5. docker pull styletang/rocketmq-console-ng:1.0.0
复制代码
2、挂载目次

  1. # 创建namesrv挂载目录
  2. mkdir -p /docker/rocketmq/data/namesrv/logs /docker/rocketmq/data/namesrv/store
  3. # 创建broker挂载目录
  4. mkdir -p /docker/rocketmq/data/broker/logs /docker/rocketmq/data/broker/store
  5. # 创建broker配置文件
  6. mkdir -p /docker/rocketmq/data/conf
  7. touch broker.conf
  8. # broker 配置
  9. # 所属集群名称,如果节点较多可以配置多个
  10. brokerClusterName = DefaultCluster
  11. #broker名称,master和slave使用相同的名称,表明他们的主从关系
  12. brokerName = broker-a
  13. #0表示Master,大于0表示不同的slave
  14. brokerId = 0
  15. #表示几点做消息删除动作,默认是凌晨4点
  16. deleteWhen = 04
  17. #在磁盘上保留消息的时长,单位是小时
  18. fileReservedTime = 48
  19. #有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
  20. brokerRole = ASYNC_MASTER
  21. #刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
  22. flushDiskType = ASYNC_FLUSH
  23. # 设置broker节点所在服务器的ip地址,也就是centosOS7的服务ip
  24. # broker暴露的IP:端口,容器环境注意IP的映射
  25. # 外网IP,报漏服务,可外部访问
  26. brokerIP1 = 120.77.19.44
  27. # 磁盘使用达到95%之后,生产者再写入消息会报错 CODE: 14 DESC: service not available now, maybe disk full
  28. diskMaxUsedSpaceRatio=95
复制代码
3、启动namesvr

  1. docker run -d -p 9876:9876 \
  2. -itd --network=docker-net --ip=172.20.0.11 \
  3. --name rocketmq_nameserver \
  4. --restart=always \
  5. -v /docker/rocketmq/data/namesrv/logs:/root/logs \
  6. -v /docker/rocketmq/data/namesrv/store:/root/store \
  7. -e "MAX_POSSIBLE_HEAP=100000000" \
  8. apache/rocketmq:4.7.1 \
  9. sh mqnamesrv
复制代码

看到这个启动成功。
4、启动broker

  1. docker run -d \
  2. -itd --network=docker-net --ip=172.20.0.12 \
  3. --restart=always \
  4. --name rocketmq_broker \
  5. --link rocketmq_nameserver:namesrv \
  6. -p 10911:10911 \
  7. -p 10909:10909 \
  8. -v /docker/rocketmq/data/broker/logs:/root/logs \
  9. -v /docker/rocketmq/data/broker/store:/root/store \
  10. -v /docker/rocketmq/data/conf/broker.conf:/opt/rocketmq-4.7.1/conf/broker.conf \
  11. -e "NAMESRV_ADDR=namesrv:9876" \
  12. -e "MAX_POSSIBLE_HEAP=200000000" \
  13. -e "-Xms128m -Xmx128m -Xmn64m" \
  14. apache/rocketmq:4.7.1 sh mqbroker \
  15. -c /opt/rocketmq-4.7.1/conf/broker.conf
复制代码

看到这个启动成功。
注:namesvr和broker调整JVM内存大小

namesvr和broker用的是同一个镜像,启动后再镜像内可以看到namesvr和broker的启动文件,

修改设置文件参数即可。
5、启动控制面板

  1. docker run -d \
  2. -itd --network=docker-net --ip=172.20.0.13 \
  3. --restart=always \
  4. --name rocketmq_console \
  5. -e "JAVA_OPTS=-Drocketmq.namesrv.addr=172.20.0.11:9876 \
  6. -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
  7. -p 7100:8080 \
  8. styletang/rocketmq-console-ng:1.0.0
复制代码

看到这个启动成功。
6、访问



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4