IT评测·应用市场-qidao123.com

标题: 【RabbitMQ高级特性】消息可靠性原理 [打印本页]

作者: tsx81429    时间: 2024-8-28 23:01
标题: 【RabbitMQ高级特性】消息可靠性原理
1. 消息确认机制

1.1 先容

我们可以看到RabbitMQ的消息流转图:

当消息从Broker投递给消耗者的时候会存在以下两种环境:
如果说RabbitMQ在每次将消息投递给消耗者的时候就将消息从Broker中删除,此时如果消息处置惩罚异常,就会造成消息丢失的环境!因此RabbitMQ提供了消息确认机制(Message Acknowledge),消耗者可以设置autoAck参数来举行确认:

当autoAck参数设置为false的时候,消息会被分为两部分:一部分是等待举行投递的消息,另一部分是已经投递但是还没有等到消耗者复兴的消息,其布局如下:

从RabbitMQ的Web管理平台也可以看到这两种状态:

1.2 手动确认方法

消耗者在收到消息之后,可以举行确认应答,也可以举行拒绝确认,RabbitMQ也提供的不同简直认方法API,在消耗者端可以使用channel的以下三种不同API举行应答:
表示消息已经被消耗者精确处置惩罚,通知RabbitMQ可以将消息举行移除了
参数说明:

表示消耗者拒绝该消息
参数说明:

表示消耗者拒绝该消息,并且可以批量拒绝消息
参数说明(参考上方)
1.3 代码演示

下面我们基于Spring-AMQP演示消息简直认机制,该确认机制有三种模式可以配置(必要注意与上述client模式有些不同):
1.3.1 NONE模式

该模式类似于上述讲的主动确认模式:即只要Broker将消息投递给消耗者就会删除队列中的消息,而不管消耗者有没有消耗成功,可能会造成消息丢失场景!
  1. spring:
  2.   application:
  3.     name: mq-advanced
  4.   rabbitmq:
  5.     username: guest
  6.     password: guest
  7.     host: 127.0.0.1
  8.     port: 5672
  9.     virtual-host: springboot-mq
  10.     listener:
  11.       simple:
  12.         acknowledge-mode: NONE # NONE模式
复制代码
  1. @RequestMapping("/none")
  2. public String testNone() {
  3.     rabbitTemplate.convertAndSend("", QueueConstant.ACK_QUEUE, "test none mode");
  4.     return "消息发送成功!";
  5. }
复制代码
  1. @Component
  2. public class AckListener {
  3.     @RabbitListener(queues = QueueConstant.ACK_QUEUE)
  4.     public void ackListener(Message message, Channel channel) {
  5.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  6.         String body = new String(message.getBody(), StandardCharsets.UTF_8);
  7.         System.out.println("接收到消息: " + body + " deliveryTag: " + deliveryTag);
  8.         System.out.println("开始处理消息...");
  9.         int ret = 3 / 0;
  10.         System.out.println("消息处理完毕...");
  11.     }
  12. }
复制代码
此时就会出现以下环境:在消耗者中抛出异常,但是RabbitMQ中消息已经丢失!
1.3.2 AUTO模式(默认)

该模式作用如下:

  1. spring:
  2.   application:
  3.     name: mq-advanced
  4.   rabbitmq:
  5.     username: guest
  6.     password: guest
  7.     host: 127.0.0.1
  8.     port: 5672
  9.     virtual-host: springboot-mq
  10.     listener:
  11.       simple:
  12.         acknowledge-mode: AUTO # AUTO模式
复制代码
  1. @RequestMapping("/none")
  2. public String testNone() {
  3.     rabbitTemplate.convertAndSend("", QueueConstant.ACK_QUEUE, "test none mode");
  4.     return "消息发送成功!";
  5. }
复制代码
  1. @Component
  2. public class AckListener {
  3.     @RabbitListener(queues = QueueConstant.ACK_QUEUE)
  4.     public void ackListener(Message message, Channel channel) {
  5.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  6.         String body = new String(message.getBody(), StandardCharsets.UTF_8);
  7.         System.out.println("接收到消息: " + body + " deliveryTag: " + deliveryTag);
  8.         System.out.println("开始处理消息...");
  9.         int ret = 3 / 0;
  10.         System.out.println("消息处理完毕...");
  11.     }
  12. }
复制代码
此时就会出现以下环境:在消耗者中抛出异常,但是消息不会丢失,而是源源不停投递给可用的消耗者!

1.3.3 MANUAL模式

该模式就可以举行手动确认:
  1. spring:
  2.   application:
  3.     name: mq-advanced
  4.   rabbitmq:
  5.     username: guest
  6.     password: guest
  7.     host: 127.0.0.1
  8.     port: 5672
  9.     virtual-host: springboot-mq
  10.     listener:
  11.       simple:
  12.         acknowledge-mode: MANUAL # MANUAL模式
复制代码
  1. @RequestMapping("/none")
  2. public String testNone() {
  3.     rabbitTemplate.convertAndSend("", QueueConstant.ACK_QUEUE, "test none mode");
  4.     return "消息发送成功!";
  5. }
复制代码
  1. @Component
  2. public class AckListener {
  3.     @RabbitListener(queues = QueueConstant.ACK_QUEUE)
  4.     public void ackListener(Message message, Channel channel) throws IOException {
  5.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  6.         try {
  7.             String body = new String(message.getBody(), StandardCharsets.UTF_8);
  8.             System.out.println("接收到消息: " + body + " deliveryTag: " + deliveryTag);
  9.             System.out.println("开始处理消息...");
  10.             int ret = 3 / 0;
  11.             System.out.println("消息处理完毕...");
  12.             channel.basicAck(deliveryTag, false);
  13.         } catch (Exception e) {
  14.             channel.basicReject(deliveryTag, true);
  15.         }
  16.     }
  17. }
复制代码
此时就会出现以下环境:如果处置惩罚成功,就会举行basicAck肯定确认,但是如果捕获到了异常就举行拒绝确认,并将消息重新入队投递给下一个消耗者使用!

2. 长期化机制

2.1 先容

我们再次回看RabbitMQ的消息流转图:

前面我们通过消息确认机制包管了Broker可以大概将消息可靠地投递给Consumer消耗者端,但是如今还存在一个问题:当消息存储在Broker中,但是RabbitMQ服务器遇到断电重启的环境怎样包管将消息恢复呢?RabbitMQ就提供了 长期化机制 ,在RabbitMQ中有以下三种长期化:
2.2 队列长期化

队列的长期化是通过在声明队列的时候设置参数durable为true实现的

我们之前所创建队列的代码默认设置为长期化:
  1. /**
  2. * 声明持久化队列
  3. */
  4. @Bean("persistQueue")
  5. public Queue persistQueue() {
  6.     return QueueBuilder
  7.             .durable(QueueConstant.PERSIST_QUEUE)
  8.             .build();
  9. }
复制代码
追踪durable方法源码:

继续追踪setDurable方法源码可以发现默认是举行长期化的!

如果我们想要设置队列为非长期化,可以使用如下代码:
  1. /**
  2. * 声明非持久化队列
  3. */
  4. @Bean("nonPersistQueue")
  5. public Queue nonPersistQueue() {
  6.     return QueueBuilder
  7.             .nonDurable(QueueConstant.NON_PERSIST_QUEUE)
  8.             .build();
  9. }
复制代码
2.3 交换机长期化

交换机的长期化是通过在声明交换机的时候设置参数durable为true实现的
同队列一样,只有设置为长期化,才会将有关交换机的元数据信息生存在硬盘上,在重启RabbitMQ服务器的时候才会读取然后恢复交换机数据信息,我们可以通过在声明交换机的时候设置durable(true | false)表现声明是否长期化:
  1. /**
  2. * 声明持久化交换机
  3. */
  4. @Bean("persistDirectExchange")
  5. public DirectExchange persistDirectExchange() {
  6.     return ExchangeBuilder
  7.             .directExchange(ExchangeConstant.PERSIST_EXCHANGE)
  8.             .durable(true)
  9.             .build();
  10. }
  11. /**
  12. * 声明非持久化交换机
  13. */
  14. @Bean("nonPersistDirectExchange")
  15. public DirectExchange nonPersistDirectExchange() {
  16.     return ExchangeBuilder
  17.             .directExchange(ExchangeConstant.NON_PERSIST_EXCHANGE)
  18.             .durable(false)
  19.             .build();
  20. }
复制代码
2.4 消息长期化

如果想要让消息举行长期化,我们就必要设置消息的投递模式MessageProperties.deliveryMode为PERSISITENT,使用RabbitTemplate发送长期化消息代码如下:
  1. @RestController
  2. public class PersistController {
  3.     @Resource
  4.     private RabbitTemplate rabbitTemplate;
  5.     @RequestMapping("/persist")
  6.     public String sendPersist() {
  7.         MessageProperties messageProperties = new MessageProperties();
  8.         messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  9.         Message message = new Message("persist info".getBytes(), messageProperties);
  10.         rabbitTemplate.convertAndSend(ExchangeConstant.PERSIST_EXCHANGE, "persist", message);
  11.         return "发送成功!";
  12.     }
  13. }
复制代码
如果想要设置消息的不长期化,则对应代码如下:
  1. @RequestMapping("/nonPersist")
  2. public String sendNonPersist() {
  3.     MessageProperties messageProperties = new MessageProperties();
  4.     messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
  5.     Message message = new Message("non-persist info".getBytes(), messageProperties);
  6.     rabbitTemplate.convertAndSend(ExchangeConstant.NON_PERSIST_EXCHANGE, "non-persist", message);
  7.     return "发送成功!";
  8. }
复制代码
3. 发送方确认机制

3.1 先容

我们再次回看RabbitMQ的消息流转图:

如今我们通过消息确认机制包管从Broker到Consumer链路上消息可靠性,通过长期化机制包管Broker内部消息可靠性,但是此时还存在着问题:如果说消息在生产者投递给Broker过程中由于网络等问题导致消息丢失、或者Broker处于重启等服务不可用状态该怎么办呢?即生产者怎样包管消息可以大概可靠到达RabbitMQ服务器?
RabbitMQ为相识决这个问题,提供了以下两种机制:
在发送方确认机制中,可以配置以下两种模式:
确认模式指的是在发送者发送消息时设置一个ConfirmCallback的监听器,无论消息是否到达对应的Exchange,这个监听都会实行。如果消息到达对应的Exchange,则对应ACK参数为true,反之没有到达Exchange则ACK参数为false
我们等待Exchange可以大概依据特定的路由规则将消息投递给对应的队列,但是如果设置的路由键错误或者队列不存在时导致消息迟迟没有投递给队列,此时我们希望可以将消息退回给生产者,退回模式指的是在发送者发送消息时设置一个ReturnsCallback的监听器对退回的消息举行处置惩罚
   




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4