ToB企服应用市场:ToB评测及商务社交产业平台

标题: RocketMQ(一):消息中央件缘起,一览整体架构及核心组件 [打印本页]

作者: 知者何南    时间: 2025-1-7 14:20
标题: RocketMQ(一):消息中央件缘起,一览整体架构及核心组件
RocketMQ(一):消息中央件缘起,一览整体架构及核心组件

消息队列MessageQueue,简称MQ
在队列的基础上,参加生产者与消耗者模型,使用队列作为载体就可以大概组成简单的消息队列,在队列中“运输”的数据被称为消息

消息队列可以在单节点内存中使用,也可以作为分布式存储的中央件来使用
由于项目的架构组织,现在常接触的消息队列往往是作为分布式存储的消息中央件来使用,比如:RabbitMQ、RocketMQ、Kafka等
内存队列相比于消息中央件往往有轻量、低延迟(无需网络通信)、简单易用的特点,但也存在不能持久化(消息丢了怎么办?)、无法扩展(消息量太大怎么办?)的缺陷
消息中央件的特点较多如:持久化、高可用、集群扩展、负载均衡、系统解耦等特点,但同时也会增加调用链路、提升系统复杂度,因此常用于分布式系统中
   特点
  异步通信:MQ提供异步通信,无需同步等待,得当必要异步场景
持久化:消息会进行持久化,持久化后无需担心异步通信的消息会丢失
削峰填谷:面对突发流量,MQ相当于缓冲区,防止后端服务短时间内接收过多请求导致服务崩溃
系统解耦:松耦合,生产者(调用方)、消耗者(被调用方)可以独立升级/扩展
集群:与其他中央件集群类似,方便水平/垂直扩展,提高系统吞吐量/可用性
消息中央件除了这些特点外,另有它们独有的功能与特点,本文就从RocketMQ开始,快速入门消息中央件专栏
在专栏中一步步分析消息中央件的架构、流程、原理、源码等,再分析各种消息中央件的上风以及适用场景
RocketMQ架构概念


通过下面两个主节点的Broker图,很容易的可以明白它们的关系:


通过以下的架构图,可以大概容易明白NameServer、Broker、Product、Consumer集群之间的关系:

在这个流程中,小菜有个疑问:为什么Product、Consumer获取Broker数据要通过NameServer通信?
Spring Boot 快速上手

RocketMQ的broker作为服务端,NameServer作为注册中央,与编写代码的接触比较少,较多的照旧生产者与消耗者(客户端)
经过大量的理论知识,我们知道MQ的大致流程,接下来使用SpringBoot编写代码实现Product和Consumer客户端
原生RocketMQ提供的生产者与消耗者API繁多并且使用时必要try catch、使用起来贫苦,企业级开辟通常会在其基础上进行封装常用的API
Spring Boot 框架作为脚手架,整合RocketMQ会非常快,并且还提供对应的RocketMQTemplate对原生API进行封装简化开辟
  1. <dependency>
  2.     <groupId>org.apache.rocketmq</groupId>
  3.     <artifactId>rocketmq-spring-boot-starter</artifactId>
  4.     <version>2.2.3</version>
  5. </dependency>
复制代码
企业级开辟经常会对原生API进行封装,而其中的ServerProduct是自界说的类,组合原生默认的生产者DefaultMQProducer来封装API简化开辟
在这个过程中,通常会用设置文件的方式设置有关生产者的参数如:组名、nameserver地址、发送消息失败重试次数、发送消息超时时间等
设置完参数后,启动生产者 producer.start()
  1. public class ServerProduct {
  2.     private DefaultMQProducer producer;
  3.     public ServerProduct(String producerGroup) {
  4.         producer = new DefaultMQProducer(producerGroup);
  5.         init();
  6.     }
  7.     public ServerProduct() {
  8.         producer = new DefaultMQProducer("Default_Server_Producer_Group");
  9.         init();
  10.     }
  11.     private void init() {
  12.         //初始化 主要通过配置文件的值进行set 最后启动生产者
  13.         producer.setNamesrvAddr("127.0.0.1:9876");
  14.         //...
  15.         try {
  16.             producer.start();
  17.         } catch (MQClientException e) {
  18.             throw new RuntimeException(e);
  19.         }
  20.     }
  21. }
复制代码
启动生产者重要会去启动定时使命同步nameserver、broker数据,并初始化一些组件,后续用于客户端网络通信、负载均衡等
这些原理放到后文源码分析再具体聊聊~
然后再封装一个发送消息的API:
sendSyncMsg 发送同步消息API中第一个参数为topic(一级分类),第二个传输为tag(二级分类),第三个传输为消息体
内部会通过参数构建Message并使用原生API发送给Broker
  1. public SendResult sendSyncMsg(String topic, String tag, String jsonBody) {
  2.     Message message = new Message(topic, tag, jsonBody.getBytes(StandardCharsets.UTF_8));
  3.     SendResult sendResult;
  4.     try {
  5.         sendResult = producer.send(message);
  6.     } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  7.         throw new RuntimeException(e);
  8.     }
  9.     return sendResult;
  10. }
复制代码
  1. @RequestMapping("/warn")
  2. @RestController
  3. @Slf4j
  4. public class WarnController {
  5.     private static final String topic = "TopicTest";
  6.     @Autowired
  7.     private ServerProduct producer;
  8.     @GetMapping("/syncSend")
  9.     public SendResult syncSend() {
  10.         return producer.sendSyncMsg(topic, "tag", "sync hello world!");
  11.     }
  12. }
复制代码
发送完消息后,消息会持久化到broker中,因此我们必要使用消耗者获取消息并进行消耗
企业级开辟时通常会使用注解的方式标识consumer必要订阅的信息,再通过分析注解的方式将数据注入的消耗者中,我们这里直接使用spring提供的注解@RocketMQMessageListener
  1. @Component
  2. @RocketMQMessageListener(topic = "TopicTest", consumerGroup = "warn_consumer_group")
  3. public class WarnConsumer implements RocketMQListener<String> {
  4.     @Override
  5.     public void onMessage(String message) {
  6.         // 处理消息
  7.         System.out.println("Received message: " + message);
  8.     }
  9. }
复制代码
使用Dashboard手动创建Topic
NameServer、Broker的摆设可以查看官方文档
至此我们履历过消息的生产与消耗,但在消息的“一生”中还可能出现各种各样的环境:
这些环境后文都会由浅入深一一解决,查看RocketMQ实现原理,并从原理中体验出计划的思想,再去查看其他的消息中央件~
总结

消息中央件通常有削峰填谷、异步通信、架构解耦、高性能、高可用、集群扩展、负载均衡等相干特点,同时项目中引入消息中央件也会增加调用链路、系统复杂度
RocketMQ由NameServer、Broker、Product、Consumer等集群组成
其中Broker作为服务端,负责接收消息、对消息进行高效持久化、消耗消息时高效查询
界说Topic对消息进行分类,为了提升水平扩展写入本领,Topic下可以设置MessageQueue队列,消息作为数据载体存储在队列中,等待被消耗
为了满足高可用,相同的Topic会被放到不同的master broker,避免”所有坤蛋都在同一个篮子“中
NameServer集群作为“注册中央”,节点无状态之间互不通信,只与Broker集群心跳同时更新路由信息,比及Product、Consumer定时通信时再将Broker信息进行传输
Product为消息生产方,通过与NameServer获取的Broker中Topic、队列ID等信息,使用负载均衡算法后找到对应Broker进行通信
Consumer为消息消耗者,根据再均衡负载均衡得到自己负责消耗的队列,再通过Broker获取消息进行消耗
最后(点赞、收藏、关注求求啦~)

本篇文章被收入专栏 消息中央件,感爱好的同学可以连续关注喔
本篇文章笔记以及案例被收入 Gitee-CaiCaiJava、 Github-CaiCaiJava,除此之外另有更多Java进阶相干知识,感爱好的同学可以starred连续关注喔~
有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~
关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4