微服务架构 --- 使用RabbitMQ进行异步处置惩罚

打印 上一主题 下一主题

主题 902|帖子 902|积分 2706

目次
一.什么是RabbitMQ?
二.异步调用处置惩罚逻辑:
三.RabbitMQ的基本使用:
1.安装:
2.架构图:
 3.RabbitMQ控制台的使用:
(1)Exchanges 交换机:
(2)Queues 队列:
 (3)Admin :
①Users(用户管理):
②Virtual Hosts(虚拟主机):
 四.SpringAMOP的使用:
1.导入依赖:
2.添加配置:
3.在publisher服务中使用RabbitTemplate实现消息发送:
4.界说消费者实现异步调用:
①@RequiredArgsConstructor:
②@RabbitListener:
③QueueBinding(队列绑定):
5.总流程处置惩罚过程:
五.使用配置类管理界说交换机,队列及两者关系:
1.创建队列:Queue:
2.创建交换机:Exchange:
 3.创建绑定关系:Binding:
 4.多个队列绑定到同一个交换机:
5.配置差别范例的交换机:
(1)Direct Exchange:
(2)Fanout Exchange:
(3)Headers Exchange:
总结:


一.什么是RabbitMQ?

RabbitMQ 是一种流行的消息队列(Message Queue)实现,基于 AMQP 协议(Advanced Message Queuing Protocol)。它支持异步通讯,使多个系统之间以非阻塞的方式交换数据。
在我们使用微服务的时候,微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于 OpenFeign 的调用。这种调用中,调用者发起请求后必要等待服务提供者执行业务返回结果后,才气继续执行背面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用,也可以叫同步通
假如我们的业务必要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而假如我们追求更高的服从,而且不必要实时响应,则应该选择异步通讯(异步调用)。
二.异步调用处置惩罚逻辑:

   异步调用方式实在就是基于消息通知的方式,一般包含三个脚色:
  

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息Broker:管理、暂存、转发消息,你可以把它明白成微信服务器
  • 消息接收者:接收和处置惩罚消息的人,就是原来的服务提供方
  

 在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据本身的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处置惩罚。这样,发送消息的人和接收消息的人就完全解耦了。
   异步调用的优势包括:
  

  • 耦合度更低
  • 性能更好
  • 业务拓展性强
  • 故障隔离,克制级联失败
    固然,异步通讯也并非完善无缺,它存在下列缺点:
  

  • 完全依赖于Broker的可靠性、安全性和性能
  • 架构复杂,后期维护和调试麻烦
  三.RabbitMQ的基本使用:

下面是RabbitMQ的官网:https://www.rabbitmq.com/
1.安装:

起首将RabbitMQ的镜像拉取下来,然后运行下面命令:
  1. docker run \
  2. -e RABBITMQ_DEFAULT_USER=itheima \
  3. -e RABBITMQ_DEFAULT_PASS=123321 \
  4. -v mq-plugins:/plugins \
  5. --name mq \
  6. --hostname mq \
  7. -p 15672:15672 \
  8. -p 5672:5672 \
  9. --network hm-net\
  10. -d \
  11. rabbitmq:3.8-management
复制代码
随后我们访问http://虚拟机IP地址:15672来打开RabbitMQ的控制台。

   在控制台上主要可以关注三个信息:Exchanges(交换机),Queues(队列),Admin(用户管理)。
  2.架构图:

 

   其中包含几个概念:
  

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处置惩罚
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
   3.RabbitMQ控制台的使用:

RabbitMQ 中,交换机(Exchange)队列(Queue) 是焦点概念。它们之间的关系决定了消息的路由和存储方式。
(1)Exchanges 交换机:



  • 交换机是 消息的路由器,负责决定消息应该被发送到哪个队列。
  • 生产者将消息发送给交换机,而不是直接发送到队列。
  • 交换机根据路由规则 决定消息的走向(即发往哪些队列)。
Exchange(交换机)只负责转发消息,不具备存储消息的本领,因此假如没有任何队列与Exchange绑定,大概没有符合路由规则的队列,那么消息会丢失!
   RabbitMQ 提供了四种常用的交换机范例,每种范例的路由规则差别:
  

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不外RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。
  

我们可以再这里创建交换机,Name表现创建的交换机的名字,Type表现可以选择交换机的四种范例。创建成功后就可以在上面看到创建的交换机名字:

好比我们点击amq.fanout查看交换机数据而且可以发送消息给消费者。
注意!!!假如我们不将交换机指定队列的话,由于没有消费者存在,终极消息丢失了,这样阐明交换机没有存储消息的本领。
 所以下面我们要先创建队列,然后让生产者推送的消息经过交换机的通报后,到达消息队列,然后再给消费者。所以生产者无需知道队列的存在以此来达到解耦的结果。
(2)Queues 队列:



  • 队列用于 存储消息,直到消费者消费它们。
  • 队列与消费者逐一对应,即一个消费者从一个队列读取消息。
  • 队列按 FIFO(First In, First Out)的顺序存储消息。

在这里我们填写队列名字即可,其他临时可以不用填写。 
随后我们向交换机进行绑定(bind)队列,随后通过队列传输给消费者。

这里的Routing key的出现是为了让 Direct (交换机的范例)可以或许选择队列而存在的。
我们在绑定队列完成后会出现下面这样,这样证明我们成功为交换机绑定好两个队列:

随后我们在下面窗口推送消息:

 (3)Admin :


①Users(用户管理):

管理 RabbitMQ 中的用户账号,在这里 添加、删除用户,并设置每个用户的权限。
每个用户可分配差别的 脚色


  • administrator:管理员,具有所有权限。
  • monitoring:可以监控和查看信息,但不能管理。
  • policymaker:可以设置策略和参数。
  • management:可以访问管理界面但没有策略权限。

   

  • Name:itheima,也就是用户名
  • Tags:administrator,阐明itheima用户是超等管理员,拥有所有权限
  • Can access virtual host: /,可以访问的virtual host,这里的/是默认的virtual host
  ②Virtual Hosts(虚拟主机):

将 RabbitMQ 服务器分别为多个 虚拟主机(vhost),类似于一个独立的命名空间。


  • 差别的应用可以使用差别的虚拟主机,彼此隔离。
  • 每个虚拟主机都有本身的 交换机、队列和用户权限

 四.SpringAMOP的使用:

Spring AMQPSpring for Advanced Message Queuing Protocol)是 Spring 提供的一个消息队列集成模块,主要用于简化与 RabbitMQ 的集成。它通过 AMQP 协议来实现消息的生产和消费。

   

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如那里置惩罚消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的范例。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不外队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化
  1.导入依赖:

  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
复制代码
2.添加配置:

在publisher以及consumer服务的application.yml中添加配置:
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.150.101 # 你的虚拟机IP
  4.     port: 5672 # 端口
  5.     virtual-host: /hmall # 虚拟主机
  6.     username: hmall # 用户名
  7.     password: 123 # 密码
  8.     listener:
  9.       simple:
  10.         prefetch: 1 # (能者多劳)每次只能获取一条消息,处理完成才能获取下一个消息
复制代码
3.在publisher服务中使用RabbitTemplate实现消息发送:

  1. @SpringBootTest
  2. public class SpringAmqpTest {
  3.     @Autowired
  4.     private RabbitTemplate rabbitTemplate;
  5.     @Test
  6.     public void testSimpleQueue() {
  7.         // 队列名称
  8.         String queueName = "simple.queue";
  9.         // 消息
  10.         String message = "hello, spring amqp!";
  11.         // 发送消息
  12.         rabbitTemplate.convertAndSend(queueName, message);
  13.     }
  14. }
复制代码
4.界说消费者实现异步调用:

  1. @Component
  2. @RequiredArgsConstructor
  3. public class PayStatusListener {
  4.     private final IOrderService orderService;
  5.     @RabbitListener(bindings = @QueueBinding(
  6.             value = @Queue(name = "trade.pay.success.queue", durable = "true"),
  7.             exchange = @Exchange(name = "pay.topic"),
  8.             key = "pay.success"
  9.     ))
  10.     public void listenPaySuccess(Long orderId){
  11.         //调用方法
  12.         orderService.markOrderPaySuccess(orderId);
  13.     }
  14. }
复制代码
①@RequiredArgsConstructor:

这是 Lombok 提供的注解,主动为类中所有 final 修饰的字段天生一个包含这些字段的构造函数。使用这个注解可以免除手动编写构造函数的麻烦,尤其是在使用依赖注入时(例如注入 IOrderService)。
②@RabbitListener:

@RabbitListener 注解用于监听来自 RabbitMQ 队列的消息。它会主动监听指定的队列,当有消息到达时,会触发 listenPaySuccess 方法进行处置惩罚。
③QueueBinding(队列绑定):

通过 @QueueBinding 注解,绑定了 队列交换机,并指定了 路由键
   

  • @Queue

    • name = "trade.pay.success.queue":指定队列的名称为 trade.pay.success.queue。
    • durable = "true":表现队列是 恒久化 的,即 RabbitMQ 重启后队列依然存在。
    • 队列的作用:队列是消息的临时存储地,消费者会从队列中拉取消息并处置惩罚。

  • @Exchange

    • name = "pay.topic":指定交换机的名称为 pay.topic,这是一个 Topic Exchange(主题交换机)。
    • 交换机的作用:交换机决定消息如何路由到队列。Topic Exchange 可以根据路由键的匹配规则将消息路由到符合的队列。

  • key = "pay.success"

    • 路由键:指定了路由键为 pay.success。这意味着当生产者发送的消息路由键是 pay.success 时,消息将被路由到 trade.pay.success.queue 队列。

  5.总流程处置惩罚过程:



  • 生产者:在支付成功后,生产者会发送一条消息到 pay.topic 交换机,消息的路由键为 pay.success。
  • 交换机:pay.topic 交换时机根据路由键 pay.success 将消息路由到 trade.pay.success.queue 队列。
  • 消费者:PayStatusListener 作为消费者监听 trade.pay.success.queue,当有消息到达队列时,它会接收到订单 ID 并调用订单服务更新订单状态。
五.使用配置类管理界说交换机,队列及两者关系:

在 Spring AMQP 中,交换机(Exchange)、队列(Queue)、以及绑定(Binding)可以通过配置类来界说和管理。配置类可以资助你灵活地创建和绑定交换机与队列,而且可以根据业务需求自界说各种参数。
创建配置类结果展示:
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.Queue;
  4. import org.springframework.amqp.core.TopicExchange;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class RabbitMQConfig {
  9.     // 创建队列
  10.     @Bean
  11.     public Queue queue() {
  12.         return new Queue("trade.pay.success.queue", true); // durable=true 表示队列持久化
  13.     }
  14.     // 创建交换机
  15.     @Bean
  16.     public TopicExchange exchange() {
  17.         return new TopicExchange("pay.topic"); // 创建主题交换机
  18.     }
  19.     // 创建绑定关系(队列与交换机通过 routing key 绑定)
  20.     @Bean
  21.     public Binding binding(Queue queue, TopicExchange exchange) {
  22.         return BindingBuilder.bind(queue).to(exchange).with("pay.success"); // 路由键是 pay.success
  23.     }
  24. }
复制代码
1.创建队列:Queue:

  1. @Bean
  2. public Queue queue() {
  3.     return new Queue("trade.pay.success.queue", true);
  4. }
复制代码


  • 作用:通过 @Bean 注解界说了一个队列 Bean。Spring 容器会主动管理这个队列,并在 RabbitMQ 上创建该队列。
  • 参数

    • "trade.pay.success.queue":这是队列的名称。每个队列在 RabbitMQ 中必须有唯一的名称。
    • true:表现这个队列是 恒久化 的。恒久化的队列在 RabbitMQ 服务重启后依然存在。

2.创建交换机:Exchange:

  1. @Bean
  2. public TopicExchange exchange() {
  3.     return new TopicExchange("pay.topic");
  4. }
复制代码


  • 作用:通过 @Bean 注解界说了一个 Topic Exchange 范例的交换机。
  • 参数

    • "pay.topic":这是交换机的名称。同样,交换机在 RabbitMQ 中也必须有唯一的名称。

   Topic Exchange 是一种交换机范例,它答应使用通配符来进行路由。例如,路由键可以是 "pay.*",可以匹配 "pay.success" 或 "pay.failure"。在这里可以使用四种交换机范例来界说交换机,具体场景具体分析使用。
   3.创建绑定关系:Binding:

  1. @Bean
  2. public Binding binding(Queue queue, TopicExchange exchange) {
  3.     return BindingBuilder.bind(queue).to(exchange).with("pay.success");
  4. }
复制代码


  • 作用:通过 @Bean 注解界说了队列和交换机的绑定关系。这个绑定决定了消息在何种条件下会从交换机路由到队列。
  • 参数

    • queue:我们界说的 trade.pay.success.queue 队列。
    • exchange:我们界说的 pay.topic 交换机。
    • "pay.success":这是路由键(Routing Key)。它用于告诉交换机:只有当消息的路由键是 pay.success 时,消息才会被路由到 trade.pay.success.queue 队列。

 4.多个队列绑定到同一个交换机:

我们可以将多个队列绑定到同一个交换机,并使用差别的路由键。这样可以实现根据差别的路由键来发送差别范例的消息到各自的队列。
  1. @Bean
  2. public Queue paySuccessQueue() {
  3.     return new Queue("pay.success.queue", true);
  4. }
  5. @Bean
  6. public Queue payFailureQueue() {
  7.     return new Queue("pay.failure.queue", true);
  8. }
  9. @Bean
  10. public Binding paySuccessBinding(Queue paySuccessQueue, TopicExchange exchange) {
  11.     return BindingBuilder.bind(paySuccessQueue).to(exchange).with("pay.success");
  12. }
  13. @Bean
  14. public Binding payFailureBinding(Queue payFailureQueue, TopicExchange exchange) {
  15.     return BindingBuilder.bind(payFailureQueue).to(exchange).with("pay.failure");
  16. }
复制代码


  • 在这个例子中,pay.success.queue 和 pay.failure.queue 都绑定到同一个交换机 pay.topic,但使用差别的路由键。
  • 消息路由逻辑

    • 当生产者发送路由键为 pay.success 的消息时,消息会路由到 pay.success.queue 队列。
    • 当生产者发送路由键为 pay.failure 的消息时,消息会路由到 pay.failure.queue 队列。

5.配置差别范例的交换机:

除了 Topic Exchange,RabbitMQ 还支持其他几种常见的交换机范例。这里分别演示如何创建 Direct ExchangeFanout ExchangeHeaders Exchange
(1)Direct Exchange:

  1. @Bean
  2. public DirectExchange directExchange() {
  3.     return new DirectExchange("direct.exchange");
  4. }
  5. @Bean
  6. public Binding directBinding(Queue queue, DirectExchange directExchange) {
  7.     return BindingBuilder.bind(queue).to(directExchange).with("direct.routing.key");
  8. }
复制代码
 Direct Exchange:直接交换时机根据 完全匹配的路由键 将消息发送到队列。只有当消息的路由键和绑定的路由键 完全一致 时,消息才会被路由到指定队列。
(2)Fanout Exchange:

  1. @Bean
  2. public FanoutExchange fanoutExchange() {
  3.     return new FanoutExchange("fanout.exchange");
  4. }
  5. @Bean
  6. public Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {
  7.     return BindingBuilder.bind(queue).to(fanoutExchange);  // 不需要路由键
  8. }
复制代码
 Fanout Exchange:扇出交换时机将消息发送到所有绑定的队列,不必要考虑路由键。这个交换机通常用于广播消息。
(3)Headers Exchange:

  1. @Bean
  2. public HeadersExchange headersExchange() {
  3.     return new HeadersExchange("headers.exchange");
  4. }
  5. @Bean
  6. public Binding headersBinding(Queue queue, HeadersExchange headersExchange) {
  7.     return BindingBuilder.bind(queue).to(headersExchange).where("header-key").matches("header-value");
  8. }
复制代码
 Headers Exchange:头交换机根据消息头的内容进行路由,而不是依赖路由键。适用于按消息的元数据进行路由的场景。
总结:

通过 Spring AMQP 的配置类,你可以非常灵活地界说 RabbitMQ 的 交换机队列绑定关系,并通过差别的路由键和交换机范例实现复杂的消息路由逻辑。以下是一些关键要点:

  • 队列(Queue):消息的临时存储地,可以是恒久化的。
  • 交换机(Exchange):控制消息如何分发到差别的队列。

    • Direct Exchange:严格匹配路由键。
    • Topic Exchange:支持通配符匹配路由键。
    • Fanout Exchange:广播消息到所有绑定的队列。
    • Headers Exchange:根据消息头的内容进行路由。

  • 绑定(Binding):将队列与交换机连接起来,使用路由键来决定消息的流向。
通过配置类来界说这些组件,可以或许简化 RabbitMQ 与 Spring 应用的集成,而且通过灵活的路由规则支持复杂的消息通报需求。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

科技颠覆者

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表