ToB企服应用市场:ToB评测及商务社交产业平台

标题: 微服务架构 --- 使用RabbitMQ进行异步处置惩罚 [打印本页]

作者: 科技颠覆者    时间: 2024-11-17 17:41
标题: 微服务架构 --- 使用RabbitMQ进行异步处置惩罚
目次
一.什么是RabbitMQ?
二.异步调用处置惩罚逻辑:
三.RabbitMQ的基本使用:
1.安装:
2.架构图:
 3.RabbitMQ控制台的使用:
(1)Exchanges 交换机:
(2)Queues 队列:
 (3)Admin :
①Users(用户管理):
②Virtual Hosts(虚拟主机):
 四.SpringAMOP的使用:
1.导入依赖:
2.添加配置:
3.在publisher服务中使用RabbitTemplate实现消息发送:
4.界说消费者实现异步调用:
①@RequiredArgsConstructor:
②@RabbitListener:
③QueueBinding(队列绑定):
5.总流程处置惩罚过程:
五.使用配置类管理界说交换机,队列及两者关系:
1.创建队列:Queue:
2.创建交换机:Exchange:
 3.创建绑定关系:Binding:
 4.多个队列绑定到同一个交换机:
5.配置差别范例的交换机:
(1)Direct Exchange:
(2)Fanout Exchange:
(3)Headers Exchange:
总结:


一.什么是RabbitMQ?

RabbitMQ 是一种流行的消息队列(Message Queue)实现,基于 AMQP 协议(Advanced Message Queuing Protocol)。它支持异步通讯,使多个系统之间以非阻塞的方式交换数据。
在我们使用微服务的时候,微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于 OpenFeign 的调用。这种调用中,调用者发起请求后必要等待服务提供者执行业务返回结果后,才气继续执行背面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用,也可以叫同步通
假如我们的业务必要实时得到服务提供方的响应,则应该选择同步通讯(同步调用)。而假如我们追求更高的服从,而且不必要实时响应,则应该选择异步通讯(异步调用)。
二.异步调用处置惩罚逻辑:

   异步调用方式实在就是基于消息通知的方式,一般包含三个脚色:
  
  

 在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据本身的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处置惩罚。这样,发送消息的人和接收消息的人就完全解耦了。
   异步调用的优势包括:
  
    固然,异步通讯也并非完善无缺,它存在下列缺点:
  
  三.RabbitMQ的基本使用:

下面是RabbitMQ的官网:https://www.rabbitmq.com/
1.安装:

起首将RabbitMQ的镜像拉取下来,然后运行下面命令:
  1. docker run \
  2. -e RABBITMQ_DEFAULT_USER=itheima \
  3. -e RABBITMQ_DEFAULT_PASS=123321 \
  4. -v mq-plugins:/plugins \
  5. --name mq \
  6. --hostname mq \
  7. -p 15672:15672 \
  8. -p 5672:5672 \
  9. --network hm-net\
  10. -d \
  11. rabbitmq:3.8-management
复制代码
随后我们访问http://虚拟机IP地址:15672来打开RabbitMQ的控制台。

   在控制台上主要可以关注三个信息:Exchanges(交换机),Queues(队列),Admin(用户管理)。
  2.架构图:

 

   其中包含几个概念:
  
   3.RabbitMQ控制台的使用:

RabbitMQ 中,交换机(Exchange)队列(Queue) 是焦点概念。它们之间的关系决定了消息的路由和存储方式。
(1)Exchanges 交换机:


Exchange(交换机)只负责转发消息,不具备存储消息的本领,因此假如没有任何队列与Exchange绑定,大概没有符合路由规则的队列,那么消息会丢失!
   RabbitMQ 提供了四种常用的交换机范例,每种范例的路由规则差别:
  
  

我们可以再这里创建交换机,Name表现创建的交换机的名字,Type表现可以选择交换机的四种范例。创建成功后就可以在上面看到创建的交换机名字:

好比我们点击amq.fanout查看交换机数据而且可以发送消息给消费者。
注意!!!假如我们不将交换机指定队列的话,由于没有消费者存在,终极消息丢失了,这样阐明交换机没有存储消息的本领。
 所以下面我们要先创建队列,然后让生产者推送的消息经过交换机的通报后,到达消息队列,然后再给消费者。所以生产者无需知道队列的存在以此来达到解耦的结果。
(2)Queues 队列:



在这里我们填写队列名字即可,其他临时可以不用填写。 
随后我们向交换机进行绑定(bind)队列,随后通过队列传输给消费者。

这里的Routing key的出现是为了让 Direct (交换机的范例)可以或许选择队列而存在的。
我们在绑定队列完成后会出现下面这样,这样证明我们成功为交换机绑定好两个队列:

随后我们在下面窗口推送消息:

 (3)Admin :


①Users(用户管理):

管理 RabbitMQ 中的用户账号,在这里 添加、删除用户,并设置每个用户的权限。
每个用户可分配差别的 脚色


   
  ②Virtual Hosts(虚拟主机):

将 RabbitMQ 服务器分别为多个 虚拟主机(vhost),类似于一个独立的命名空间。


 四.SpringAMOP的使用:

Spring AMQPSpring for Advanced Message Queuing Protocol)是 Spring 提供的一个消息队列集成模块,主要用于简化与 RabbitMQ 的集成。它通过 AMQP 协议来实现消息的生产和消费。

   
  1.导入依赖:

  1. <dependency>
  2.     <groupId>org.springframework.boot</groupId>
  3.     <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
复制代码
2.添加配置:

在publisher以及consumer服务的application.yml中添加配置:
  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.150.101 # 你的虚拟机IP
  4.     port: 5672 # 端口
  5.     virtual-host: /hmall # 虚拟主机
  6.     username: hmall # 用户名
  7.     password: 123 # 密码
  8.     listener:
  9.       simple:
  10.         prefetch: 1 # (能者多劳)每次只能获取一条消息,处理完成才能获取下一个消息
复制代码
3.在publisher服务中使用RabbitTemplate实现消息发送:

  1. @SpringBootTest
  2. public class SpringAmqpTest {
  3.     @Autowired
  4.     private RabbitTemplate rabbitTemplate;
  5.     @Test
  6.     public void testSimpleQueue() {
  7.         // 队列名称
  8.         String queueName = "simple.queue";
  9.         // 消息
  10.         String message = "hello, spring amqp!";
  11.         // 发送消息
  12.         rabbitTemplate.convertAndSend(queueName, message);
  13.     }
  14. }
复制代码
4.界说消费者实现异步调用:

  1. @Component
  2. @RequiredArgsConstructor
  3. public class PayStatusListener {
  4.     private final IOrderService orderService;
  5.     @RabbitListener(bindings = @QueueBinding(
  6.             value = @Queue(name = "trade.pay.success.queue", durable = "true"),
  7.             exchange = @Exchange(name = "pay.topic"),
  8.             key = "pay.success"
  9.     ))
  10.     public void listenPaySuccess(Long orderId){
  11.         //调用方法
  12.         orderService.markOrderPaySuccess(orderId);
  13.     }
  14. }
复制代码
①@RequiredArgsConstructor:

这是 Lombok 提供的注解,主动为类中所有 final 修饰的字段天生一个包含这些字段的构造函数。使用这个注解可以免除手动编写构造函数的麻烦,尤其是在使用依赖注入时(例如注入 IOrderService)。
②@RabbitListener:

@RabbitListener 注解用于监听来自 RabbitMQ 队列的消息。它会主动监听指定的队列,当有消息到达时,会触发 listenPaySuccess 方法进行处置惩罚。
③QueueBinding(队列绑定):

通过 @QueueBinding 注解,绑定了 队列交换机,并指定了 路由键
   
  5.总流程处置惩罚过程:


五.使用配置类管理界说交换机,队列及两者关系:

在 Spring AMQP 中,交换机(Exchange)、队列(Queue)、以及绑定(Binding)可以通过配置类来界说和管理。配置类可以资助你灵活地创建和绑定交换机与队列,而且可以根据业务需求自界说各种参数。
创建配置类结果展示:
  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.Queue;
  4. import org.springframework.amqp.core.TopicExchange;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. @Configuration
  8. public class RabbitMQConfig {
  9.     // 创建队列
  10.     @Bean
  11.     public Queue queue() {
  12.         return new Queue("trade.pay.success.queue", true); // durable=true 表示队列持久化
  13.     }
  14.     // 创建交换机
  15.     @Bean
  16.     public TopicExchange exchange() {
  17.         return new TopicExchange("pay.topic"); // 创建主题交换机
  18.     }
  19.     // 创建绑定关系(队列与交换机通过 routing key 绑定)
  20.     @Bean
  21.     public Binding binding(Queue queue, TopicExchange exchange) {
  22.         return BindingBuilder.bind(queue).to(exchange).with("pay.success"); // 路由键是 pay.success
  23.     }
  24. }
复制代码
1.创建队列:Queue:

  1. @Bean
  2. public Queue queue() {
  3.     return new Queue("trade.pay.success.queue", true);
  4. }
复制代码

2.创建交换机:Exchange:

  1. @Bean
  2. public TopicExchange exchange() {
  3.     return new TopicExchange("pay.topic");
  4. }
复制代码

   Topic Exchange 是一种交换机范例,它答应使用通配符来进行路由。例如,路由键可以是 "pay.*",可以匹配 "pay.success" 或 "pay.failure"。在这里可以使用四种交换机范例来界说交换机,具体场景具体分析使用。
   3.创建绑定关系:Binding:

  1. @Bean
  2. public Binding binding(Queue queue, TopicExchange exchange) {
  3.     return BindingBuilder.bind(queue).to(exchange).with("pay.success");
  4. }
复制代码

 4.多个队列绑定到同一个交换机:

我们可以将多个队列绑定到同一个交换机,并使用差别的路由键。这样可以实现根据差别的路由键来发送差别范例的消息到各自的队列。
  1. @Bean
  2. public Queue paySuccessQueue() {
  3.     return new Queue("pay.success.queue", true);
  4. }
  5. @Bean
  6. public Queue payFailureQueue() {
  7.     return new Queue("pay.failure.queue", true);
  8. }
  9. @Bean
  10. public Binding paySuccessBinding(Queue paySuccessQueue, TopicExchange exchange) {
  11.     return BindingBuilder.bind(paySuccessQueue).to(exchange).with("pay.success");
  12. }
  13. @Bean
  14. public Binding payFailureBinding(Queue payFailureQueue, TopicExchange exchange) {
  15.     return BindingBuilder.bind(payFailureQueue).to(exchange).with("pay.failure");
  16. }
复制代码

5.配置差别范例的交换机:

除了 Topic Exchange,RabbitMQ 还支持其他几种常见的交换机范例。这里分别演示如何创建 Direct ExchangeFanout ExchangeHeaders Exchange
(1)Direct Exchange:

  1. @Bean
  2. public DirectExchange directExchange() {
  3.     return new DirectExchange("direct.exchange");
  4. }
  5. @Bean
  6. public Binding directBinding(Queue queue, DirectExchange directExchange) {
  7.     return BindingBuilder.bind(queue).to(directExchange).with("direct.routing.key");
  8. }
复制代码
 Direct Exchange:直接交换时机根据 完全匹配的路由键 将消息发送到队列。只有当消息的路由键和绑定的路由键 完全一致 时,消息才会被路由到指定队列。
(2)Fanout Exchange:

  1. @Bean
  2. public FanoutExchange fanoutExchange() {
  3.     return new FanoutExchange("fanout.exchange");
  4. }
  5. @Bean
  6. public Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {
  7.     return BindingBuilder.bind(queue).to(fanoutExchange);  // 不需要路由键
  8. }
复制代码
 Fanout Exchange:扇出交换时机将消息发送到所有绑定的队列,不必要考虑路由键。这个交换机通常用于广播消息。
(3)Headers Exchange:

  1. @Bean
  2. public HeadersExchange headersExchange() {
  3.     return new HeadersExchange("headers.exchange");
  4. }
  5. @Bean
  6. public Binding headersBinding(Queue queue, HeadersExchange headersExchange) {
  7.     return BindingBuilder.bind(queue).to(headersExchange).where("header-key").matches("header-value");
  8. }
复制代码
 Headers Exchange:头交换机根据消息头的内容进行路由,而不是依赖路由键。适用于按消息的元数据进行路由的场景。
总结:

通过 Spring AMQP 的配置类,你可以非常灵活地界说 RabbitMQ 的 交换机队列绑定关系,并通过差别的路由键和交换机范例实现复杂的消息路由逻辑。以下是一些关键要点:
通过配置类来界说这些组件,可以或许简化 RabbitMQ 与 Spring 应用的集成,而且通过灵活的路由规则支持复杂的消息通报需求。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4