1. Spring AMQP 简介
Spring AMQP(Spring for Advanced Message Queuing Protocol) 是 Spring 框架的一个子项目,用于简化与消息署理(如 RabbitMQ)的集成。Spring AMQP 提供了基于 AMQP 协议的抽象层,使得 Java 程序员能够更轻松地使用消息队列完成异步通讯、消息分发和数据流处置处罚。Spring AMQP 的核心模块是 spring-rabbit,它封装了与 RabbitMQ 的交互。
未来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。而且RabbitMQ官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一样平常生产情况下我们更多会联合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。而且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
官方地点:https://spring.io/projects/spring-amqp/
2. Spring AMQP 的核心组件
- RabbitTemplate:Spring AMQP 提供的主要操纵类,用于向队列发送和接收消息。它提供了简单且易用的 API 进行消息发送和接收。
- AmqpAdmin:用于管理 RabbitMQ 的资源,例如创建和删除队列、交换机、绑定等。
- @RabbitListener 和 @RabbitHandler:注解驱动的消息监听机制,可以在类或方法上使用 @RabbitListener 注解来监听指定队列的消息。
- MessageConverter:用于将 Java 对象和消息之间进行相互转换,Spring AMQP 提供了多种消息转换器,例如 Jackson JSON、SimpleMessageConverter 等,方便消息的序列化和反序列化。
发送消息示例:
- // 交换机名称
- String exchangeName = "cyt.topic"; // 指定要发送消息的交换机名称。这里使用了一个名为 "cyt.topic" 的 Topic 类型交换机
- // 消息内容
- String message = "喜报!孙悟空大战哥斯拉,胜!"; // 要发送的消息内容,可以是任何文本,当前内容为示例新闻
- // 发送消息到指定交换机和路由键
- rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
- // 使用 RabbitTemplate 将消息发送到指定的交换机。
- // 参数解释:
- // - exchangeName:要发送消息的交换机名称 "cyt.topic"。
- // - "china.news":消息的路由键,适用于 Topic 交换机,用于匹配绑定队列的路由模式。
- // - message:要发送的消息内容。
- // 如果有队列绑定到 "cyt.topic" 交换机,并且匹配 "china.news" 的路由键(例如 "china.*"),
- // 则消息会被路由到该队列并被消费者接收处理。
复制代码 3. 交换机类型
交换机的类型有四种:
- Fanout:广播,将消息交给全部绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
- Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
- Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
3.1 Fanout交换机
- 1) 可以有多个队列
- 2) 每个队列都要绑定到Exchange(交换机)
- 3) 生产者发送的消息,只能发送到交换机
- 4) 交换机把消息发送给绑定过的全部队列
- 5) 订阅队列的消费者都能拿到消息
3.2 Direct交换机
在Fanout模式中,一条消息,会被全部订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
- 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
- Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全同等,才会接收到消息
3.3 Topic交换机
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!
BindingKey 一样平常都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
通配符规则:
4. 声明交换机和队列
程序启动时检查队列和交换机是否存在,假如不存在自动创建。
4.1 Direct模式的交换机和队列
- // 消费者1 - 监听绑定的 direct.queue1 队列
- @RabbitListener(bindings = @QueueBinding(
- // 定义并绑定队列
- value = @Queue(name = "direct.queue1"), // 创建一个名称为 direct.queue1 的队列
- // 定义并绑定交换机
- exchange = @Exchange(name = "cyt.direct", type = ExchangeTypes.DIRECT), // 指定交换机名称 cyt.direct,类型为 Direct 交换机
- // 绑定的路由键
- key = {"red", "blue"} // 路由键,指定该消费者会接收 "red" 和 "blue" 路由键的消息
- ))
- public void listenDirectQueue1(String msg){
- // 接收到消息时打印输出
- System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
- }
- // 消费者2 - 监听绑定的 direct.queue2 队列
- @RabbitListener(bindings = @QueueBinding(
- // 定义并绑定队列
- value = @Queue(name = "direct.queue2"), // 创建一个名称为 direct.queue2 的队列
- // 定义并绑定交换机
- exchange = @Exchange(name = "cyt.direct", type = ExchangeTypes.DIRECT), // 指定交换机名称 cyt.direct,类型为 Direct 交换机
- // 绑定的路由键
- key = {"red", "yellow"} // 路由键,指定该消费者会接收 "red" 和 "yellow" 路由键的消息
- ))
- public void listenDirectQueue2(String msg){
- // 接收到消息时打印输出
- System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
- }
复制代码 4.2 Topic模式的交换机和队列
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic.queue1"),
- exchange = @Exchange(name = "cyt.topic", type = ExchangeTypes.TOPIC),
- key = "china.#"
- ))
- public void listenTopicQueue1(String msg){
- System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
- }
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "topic.queue2"),
- exchange = @Exchange(name = "cyt.topic", type = ExchangeTypes.TOPIC),
- key = "#.news"
- ))
- public void listenTopicQueue2(String msg){
- System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
- }
复制代码 5. 消息转换器
Spring的消息发送代码接收的消息体是一个Object。
而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列题目:
显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。
5.1 引入依靠
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-xml</artifactId>
- <version>2.9.10</version>
- </dependency>
复制代码 注意,假如项目中引入了spring-boot-starter-web依靠,则无需再次引入Jackson依靠。
5.2 配置JSON转换器
配置消息转换器,在服务的启动类中添加一个Bean即可:
- @Bean
- public MessageConverter messageConverter(){
- // 1.定义消息转换器
- Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
- // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
- jackson2JsonMessageConverter.setCreateMessageIds(true);
- return jackson2JsonMessageConverter;
- }
复制代码 6. 使用示例
6.1 引入依靠
- <!--消息发送-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
复制代码 6.2 配置
- spring:
- rabbitmq:
- host: 192.168.1.101 # 你的虚拟机IP
- port: 5672 # 端口
- virtual-host: /cyt# 虚拟主机
- username: cyt# 用户名
- password: 123 # 密码
复制代码 6.3 接收消息
在服务中界说一个消息监听类:
- package com.cyt.trade.listener;
- @Component
- @RequiredArgsConstructor
- public class PayStatusListener {
- // 注入订单服务,用于更新订单状态
- private final IOrderService orderService;
- /**
- * 监听支付成功的消息队列,当接收到支付成功的消息时,更新订单状态。
- *
- * @RabbitListener 注解用于声明这是一个消息监听方法。
- * @QueueBinding 注解定义队列、交换机和路由键的绑定关系。
- * - @Queue 用于定义队列的属性,例如队列名和是否持久化。
- * - @Exchange 用于定义交换机的属性,例如交换机名和类型。
- * - key 指定路由键,将匹配该路由键的消息发送到此队列。
- *
- * @param orderId 支付成功的订单ID,从消息中提取的参数。
- */
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "trade.pay.success.queue", durable = "true"), // 定义队列名称和持久化
- exchange = @Exchange(name = "pay.topic"), // 指定交换机名称
- key = "pay.success" // 路由键,用于匹配支付成功的消息
- ))
- public void listenPaySuccess(Long orderId) {
- // 调用订单服务,更新订单为支付成功状态
- orderService.markOrderPaySuccess(orderId);
- }
- }
复制代码 6.4 发送消息
在需要发送消息的地方使用rabbitTemplate即可发送消息。
- rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |