SpringAMQP — RabbitMQ操纵工具

打印 上一主题 下一主题

主题 848|帖子 848|积分 2544

1. Spring AMQP 简介

Spring AMQP(Spring for Advanced Message Queuing Protocol) 是 Spring 框架的一个子项目,用于简化与消息署理(如 RabbitMQ)的集成。Spring AMQP 提供了基于 AMQP 协议的抽象层,使得 Java 程序员能够更轻松地使用消息队列完成异步通讯、消息分发和数据流处置处罚。Spring AMQP 的核心模块是 spring-rabbit,它封装了与 RabbitMQ 的交互。
 未来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。而且RabbitMQ官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一样平常生产情况下我们更多会联合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。而且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
官方地点:https://spring.io/projects/spring-amqp/
2. Spring AMQP 的核心组件



  • RabbitTemplate:Spring AMQP 提供的主要操纵类,用于向队列发送和接收消息。它提供了简单且易用的 API 进行消息发送和接收。
  • AmqpAdmin:用于管理 RabbitMQ 的资源,例如创建和删除队列、交换机、绑定等。
  • @RabbitListener 和 @RabbitHandler:注解驱动的消息监听机制,可以在类或方法上使用 @RabbitListener 注解来监听指定队列的消息。
  • MessageConverter:用于将 Java 对象和消息之间进行相互转换,Spring AMQP 提供了多种消息转换器,例如 Jackson JSON、SimpleMessageConverter 等,方便消息的序列化和反序列化。
发送消息示例:
  1. // 交换机名称
  2. String exchangeName = "cyt.topic"; // 指定要发送消息的交换机名称。这里使用了一个名为 "cyt.topic" 的 Topic 类型交换机
  3. // 消息内容
  4. String message = "喜报!孙悟空大战哥斯拉,胜!"; // 要发送的消息内容,可以是任何文本,当前内容为示例新闻
  5. // 发送消息到指定交换机和路由键
  6. rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
  7. // 使用 RabbitTemplate 将消息发送到指定的交换机。
  8. // 参数解释:
  9. // - exchangeName:要发送消息的交换机名称 "cyt.topic"。
  10. // - "china.news":消息的路由键,适用于 Topic 交换机,用于匹配绑定队列的路由模式。
  11. // - message:要发送的消息内容。
  12. // 如果有队列绑定到 "cyt.topic" 交换机,并且匹配 "china.news" 的路由键(例如 "china.*"),
  13. // 则消息会被路由到该队列并被消费者接收处理。
复制代码
3. 交换机类型

交换机的类型有四种:


  • Fanout:广播,将消息交给全部绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符


  • Headers:头匹配,基于MQ的消息头匹配,用的较少。
3.1 Fanout交换机



  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的全部队列
  • 5) 订阅队列的消费者都能拿到消息
3.2 Direct交换机

在Fanout模式中,一条消息,会被全部订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:


  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全同等,才会接收到消息
3.3 Topic交换机

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!
BindingKey 一样平常都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
通配符规则:


  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词
4. 声明交换机和队列

程序启动时检查队列和交换机是否存在,假如不存在自动创建。
4.1 Direct模式的交换机和队列

  1. // 消费者1 - 监听绑定的 direct.queue1 队列
  2. @RabbitListener(bindings = @QueueBinding(
  3.     // 定义并绑定队列
  4.     value = @Queue(name = "direct.queue1"), // 创建一个名称为 direct.queue1 的队列
  5.     // 定义并绑定交换机
  6.     exchange = @Exchange(name = "cyt.direct", type = ExchangeTypes.DIRECT), // 指定交换机名称 cyt.direct,类型为 Direct 交换机
  7.     // 绑定的路由键
  8.     key = {"red", "blue"} // 路由键,指定该消费者会接收 "red" 和 "blue" 路由键的消息
  9. ))
  10. public void listenDirectQueue1(String msg){
  11.     // 接收到消息时打印输出
  12.     System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
  13. }
  14. // 消费者2 - 监听绑定的 direct.queue2 队列
  15. @RabbitListener(bindings = @QueueBinding(
  16.     // 定义并绑定队列
  17.     value = @Queue(name = "direct.queue2"), // 创建一个名称为 direct.queue2 的队列
  18.     // 定义并绑定交换机
  19.     exchange = @Exchange(name = "cyt.direct", type = ExchangeTypes.DIRECT), // 指定交换机名称 cyt.direct,类型为 Direct 交换机
  20.     // 绑定的路由键
  21.     key = {"red", "yellow"} // 路由键,指定该消费者会接收 "red" 和 "yellow" 路由键的消息
  22. ))
  23. public void listenDirectQueue2(String msg){
  24.     // 接收到消息时打印输出
  25.     System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
  26. }
复制代码
4.2 Topic模式的交换机和队列

  1. @RabbitListener(bindings = @QueueBinding(
  2.     value = @Queue(name = "topic.queue1"),
  3.     exchange = @Exchange(name = "cyt.topic", type = ExchangeTypes.TOPIC),
  4.     key = "china.#"
  5. ))
  6. public void listenTopicQueue1(String msg){
  7.     System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
  8. }
  9. @RabbitListener(bindings = @QueueBinding(
  10.     value = @Queue(name = "topic.queue2"),
  11.     exchange = @Exchange(name = "cyt.topic", type = ExchangeTypes.TOPIC),
  12.     key = "#.news"
  13. ))
  14. public void listenTopicQueue2(String msg){
  15.     System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
  16. }
复制代码
5. 消息转换器

Spring的消息发送代码接收的消息体是一个Object。
而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列题目:


  • 数据体积过大
  • 有安全漏洞
  • 可读性差
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
5.1 引入依靠

  1. <dependency>
  2.     <groupId>com.fasterxml.jackson.dataformat</groupId>
  3.     <artifactId>jackson-dataformat-xml</artifactId>
  4.     <version>2.9.10</version>
  5. </dependency>
复制代码
注意,假如项目中引入了spring-boot-starter-web依靠,则无需再次引入Jackson依靠。
5.2 配置JSON转换器

配置消息转换器,在服务的启动类中添加一个Bean即可:
  1. @Bean
  2. public MessageConverter messageConverter(){
  3.     // 1.定义消息转换器
  4.     Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
  5.     // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
  6.     jackson2JsonMessageConverter.setCreateMessageIds(true);
  7.     return jackson2JsonMessageConverter;
  8. }
复制代码
6. 使用示例

6.1 引入依靠

  1.   <!--消息发送-->
  2.   <dependency>
  3.       <groupId>org.springframework.boot</groupId>
  4.       <artifactId>spring-boot-starter-amqp</artifactId>
  5.   </dependency>
复制代码
6.2 配置

  1. spring:
  2.   rabbitmq:
  3.     host: 192.168.1.101 # 你的虚拟机IP
  4.     port: 5672 # 端口
  5.     virtual-host: /cyt# 虚拟主机
  6.     username: cyt# 用户名
  7.     password: 123 # 密码
复制代码
6.3 接收消息

在服务中界说一个消息监听类:
  1. package com.cyt.trade.listener;
  2. @Component
  3. @RequiredArgsConstructor
  4. public class PayStatusListener {
  5.     // 注入订单服务,用于更新订单状态
  6.     private final IOrderService orderService;
  7.     /**
  8.      * 监听支付成功的消息队列,当接收到支付成功的消息时,更新订单状态。
  9.      *
  10.      * @RabbitListener 注解用于声明这是一个消息监听方法。
  11.      * @QueueBinding 注解定义队列、交换机和路由键的绑定关系。
  12.      *   - @Queue 用于定义队列的属性,例如队列名和是否持久化。
  13.      *   - @Exchange 用于定义交换机的属性,例如交换机名和类型。
  14.      *   - key 指定路由键,将匹配该路由键的消息发送到此队列。
  15.      *
  16.      * @param orderId 支付成功的订单ID,从消息中提取的参数。
  17.      */
  18.     @RabbitListener(bindings = @QueueBinding(
  19.             value = @Queue(name = "trade.pay.success.queue", durable = "true"), // 定义队列名称和持久化
  20.             exchange = @Exchange(name = "pay.topic"), // 指定交换机名称
  21.             key = "pay.success" // 路由键,用于匹配支付成功的消息
  22.     ))
  23.     public void listenPaySuccess(Long orderId) {
  24.         // 调用订单服务,更新订单为支付成功状态
  25.         orderService.markOrderPaySuccess(orderId);
  26.     }
  27. }
复制代码
6.4 发送消息

在需要发送消息的地方使用rabbitTemplate即可发送消息。
  1. rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

悠扬随风

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表