1. 消息确认机制
1.1 先容
我们可以看到RabbitMQ的消息流转图:
当消息从Broker投递给消耗者的时候会存在以下两种环境:
- consumer消耗消息成功
- consumer消耗消息异常
如果说RabbitMQ在每次将消息投递给消耗者的时候就将消息从Broker中删除,此时如果消息处置惩罚异常,就会造成消息丢失的环境!因此RabbitMQ提供了消息确认机制(Message Acknowledge),消耗者可以设置autoAck参数来举行确认:
- 主动确认:当设置autoAck参数为true时,RabbitMQ就会将本身发送出去的消息置为确认,并从内存和硬盘上移除,不管消耗者是否消耗消息成功,实用于消息可靠性要求不高的场景
- 手动确认:当设置autoAck参数为false时,RabbitMQ会等待消耗者表现调用Basic.Ack命令,如果确认消耗成功则举行删除消息操作,实用于消息可靠性较高的场景
当autoAck参数设置为false的时候,消息会被分为两部分:一部分是等待举行投递的消息,另一部分是已经投递但是还没有等到消耗者复兴的消息,其布局如下:
从RabbitMQ的Web管理平台也可以看到这两种状态:
1.2 手动确认方法
消耗者在收到消息之后,可以举行确认应答,也可以举行拒绝确认,RabbitMQ也提供的不同简直认方法API,在消耗者端可以使用channel的以下三种不同API举行应答:
- 肯定应答:channel.basicAck(long deliveryTag, boolean multiple)
表示消息已经被消耗者精确处置惩罚,通知RabbitMQ可以将消息举行移除了
参数说明:
- deliveryTag:是消息的唯一标识,是一个64位递增的长整数,该参数由每个channel举行单独维护,即在每个channel内部deliveryTag是不重复的
- multiple:是否举行批量确认,在某些环境下为了减少网络传输带宽,可以对连续的多个deliveryTag举行批量确认,当值设置为true的时候则会将ack<=deliveryTag的消息全部确认;如果值设置为false则只会将对应deliveryTag的消息举行确认
- 否定确认:channel.basicReject(long deliveryTag, boolean requeue)
表示消耗者拒绝该消息
参数说明:
- deliveryTag:参考basicAck
- requeue:表示拒绝该消息之后该消息怎样处置惩罚,如果设置为true,则RabbitMQ会重新将该消息放入队列,以便投递给下一个订阅的消耗者;如果设置为false,则RabbitMQ会将该消息从队列中移除
- 否定确认:channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
表示消耗者拒绝该消息,并且可以批量拒绝消息
参数说明(参考上方)
1.3 代码演示
下面我们基于Spring-AMQP演示消息简直认机制,该确认机制有三种模式可以配置(必要注意与上述client模式有些不同):
1.3.1 NONE模式
该模式类似于上述讲的主动确认模式:即只要Broker将消息投递给消耗者就会删除队列中的消息,而不管消耗者有没有消耗成功,可能会造成消息丢失场景!
- spring:
- application:
- name: mq-advanced
- rabbitmq:
- username: guest
- password: guest
- host: 127.0.0.1
- port: 5672
- virtual-host: springboot-mq
- listener:
- simple:
- acknowledge-mode: NONE # NONE模式
复制代码- @RequestMapping("/none")
- public String testNone() {
- rabbitTemplate.convertAndSend("", QueueConstant.ACK_QUEUE, "test none mode");
- return "消息发送成功!";
- }
复制代码- @Component
- public class AckListener {
- @RabbitListener(queues = QueueConstant.ACK_QUEUE)
- public void ackListener(Message message, Channel channel) {
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- String body = new String(message.getBody(), StandardCharsets.UTF_8);
- System.out.println("接收到消息: " + body + " deliveryTag: " + deliveryTag);
- System.out.println("开始处理消息...");
- int ret = 3 / 0;
- System.out.println("消息处理完毕...");
- }
- }
复制代码 此时就会出现以下环境:在消耗者中抛出异常,但是RabbitMQ中消息已经丢失!
1.3.2 AUTO模式(默认)
该模式作用如下:
- 当消耗者业务代码处置惩罚正常时就会对消息举行确认
- 但是如果消耗者业务代码中抛出了异常,就会对消息举行否定确认并重新投递
- spring:
- application:
- name: mq-advanced
- rabbitmq:
- username: guest
- password: guest
- host: 127.0.0.1
- port: 5672
- virtual-host: springboot-mq
- listener:
- simple:
- acknowledge-mode: AUTO # AUTO模式
复制代码- @RequestMapping("/none")
- public String testNone() {
- rabbitTemplate.convertAndSend("", QueueConstant.ACK_QUEUE, "test none mode");
- return "消息发送成功!";
- }
复制代码- @Component
- public class AckListener {
- @RabbitListener(queues = QueueConstant.ACK_QUEUE)
- public void ackListener(Message message, Channel channel) {
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- String body = new String(message.getBody(), StandardCharsets.UTF_8);
- System.out.println("接收到消息: " + body + " deliveryTag: " + deliveryTag);
- System.out.println("开始处理消息...");
- int ret = 3 / 0;
- System.out.println("消息处理完毕...");
- }
- }
复制代码 此时就会出现以下环境:在消耗者中抛出异常,但是消息不会丢失,而是源源不停投递给可用的消耗者!
1.3.3 MANUAL模式
该模式就可以举行手动确认:
- spring:
- application:
- name: mq-advanced
- rabbitmq:
- username: guest
- password: guest
- host: 127.0.0.1
- port: 5672
- virtual-host: springboot-mq
- listener:
- simple:
- acknowledge-mode: MANUAL # MANUAL模式
复制代码- @RequestMapping("/none")
- public String testNone() {
- rabbitTemplate.convertAndSend("", QueueConstant.ACK_QUEUE, "test none mode");
- return "消息发送成功!";
- }
复制代码- @Component
- public class AckListener {
- @RabbitListener(queues = QueueConstant.ACK_QUEUE)
- public void ackListener(Message message, Channel channel) throws IOException {
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- String body = new String(message.getBody(), StandardCharsets.UTF_8);
- System.out.println("接收到消息: " + body + " deliveryTag: " + deliveryTag);
- System.out.println("开始处理消息...");
- int ret = 3 / 0;
- System.out.println("消息处理完毕...");
- channel.basicAck(deliveryTag, false);
- } catch (Exception e) {
- channel.basicReject(deliveryTag, true);
- }
- }
- }
复制代码 此时就会出现以下环境:如果处置惩罚成功,就会举行basicAck肯定确认,但是如果捕获到了异常就举行拒绝确认,并将消息重新入队投递给下一个消耗者使用!
2. 长期化机制
2.1 先容
我们再次回看RabbitMQ的消息流转图:
前面我们通过消息确认机制包管了Broker可以大概将消息可靠地投递给Consumer消耗者端,但是如今还存在一个问题:当消息存储在Broker中,但是RabbitMQ服务器遇到断电重启的环境怎样包管将消息恢复呢?RabbitMQ就提供了 长期化机制 ,在RabbitMQ中有以下三种长期化:
2.2 队列长期化
队列的长期化是通过在声明队列的时候设置参数durable为true实现的
- 如果队列不举行长期化,那么在重启的时候关于队列的元数据信息就会丢失(此时哪怕消息举行了长期化也无法恢复消息了,因为消息生存在队列中)
- 如果将队列设置为长期化,此时队列相关的元数据就可以从硬盘上举行恢复,但是并不能包管内部的消息不丢失,如果想要让消息不丢失,还必要设置消息的长期化
我们之前所创建队列的代码默认设置为长期化:
- /**
- * 声明持久化队列
- */
- @Bean("persistQueue")
- public Queue persistQueue() {
- return QueueBuilder
- .durable(QueueConstant.PERSIST_QUEUE)
- .build();
- }
复制代码 追踪durable方法源码:
继续追踪setDurable方法源码可以发现默认是举行长期化的!
如果我们想要设置队列为非长期化,可以使用如下代码:
- /**
- * 声明非持久化队列
- */
- @Bean("nonPersistQueue")
- public Queue nonPersistQueue() {
- return QueueBuilder
- .nonDurable(QueueConstant.NON_PERSIST_QUEUE)
- .build();
- }
复制代码 2.3 交换机长期化
交换机的长期化是通过在声明交换机的时候设置参数durable为true实现的
同队列一样,只有设置为长期化,才会将有关交换机的元数据信息生存在硬盘上,在重启RabbitMQ服务器的时候才会读取然后恢复交换机数据信息,我们可以通过在声明交换机的时候设置durable(true | false)表现声明是否长期化:
- /**
- * 声明持久化交换机
- */
- @Bean("persistDirectExchange")
- public DirectExchange persistDirectExchange() {
- return ExchangeBuilder
- .directExchange(ExchangeConstant.PERSIST_EXCHANGE)
- .durable(true)
- .build();
- }
- /**
- * 声明非持久化交换机
- */
- @Bean("nonPersistDirectExchange")
- public DirectExchange nonPersistDirectExchange() {
- return ExchangeBuilder
- .directExchange(ExchangeConstant.NON_PERSIST_EXCHANGE)
- .durable(false)
- .build();
- }
复制代码 2.4 消息长期化
如果想要让消息举行长期化,我们就必要设置消息的投递模式MessageProperties.deliveryMode为PERSISITENT,使用RabbitTemplate发送长期化消息代码如下:
- @RestController
- public class PersistController {
- @Resource
- private RabbitTemplate rabbitTemplate;
- @RequestMapping("/persist")
- public String sendPersist() {
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- Message message = new Message("persist info".getBytes(), messageProperties);
- rabbitTemplate.convertAndSend(ExchangeConstant.PERSIST_EXCHANGE, "persist", message);
- return "发送成功!";
- }
- }
复制代码 如果想要设置消息的不长期化,则对应代码如下:
- @RequestMapping("/nonPersist")
- public String sendNonPersist() {
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
- Message message = new Message("non-persist info".getBytes(), messageProperties);
- rabbitTemplate.convertAndSend(ExchangeConstant.NON_PERSIST_EXCHANGE, "non-persist", message);
- return "发送成功!";
- }
复制代码 3. 发送方确认机制
3.1 先容
我们再次回看RabbitMQ的消息流转图:
如今我们通过消息确认机制包管从Broker到Consumer链路上消息可靠性,通过长期化机制包管Broker内部消息可靠性,但是此时还存在着问题:如果说消息在生产者投递给Broker过程中由于网络等问题导致消息丢失、或者Broker处于重启等服务不可用状态该怎么办呢?即生产者怎样包管消息可以大概可靠到达RabbitMQ服务器?
RabbitMQ为相识决这个问题,提供了以下两种机制:
- 事故机制(性能较低,此处不先容)
- 发送方确认机制(Publisher Confirm)
在发送方确认机制中,可以配置以下两种模式:
确认模式指的是在发送者发送消息时设置一个ConfirmCallback的监听器,无论消息是否到达对应的Exchange,这个监听都会实行。如果消息到达对应的Exchange,则对应ACK参数为true,反之没有到达Exchange则ACK参数为false
我们等待Exchange可以大概依据特定的路由规则将消息投递给对应的队列,但是如果设置的路由键错误或者队列不存在时导致消息迟迟没有投递给队列,此时我们希望可以将消息退回给生产者,退回模式指的是在发送者发送消息时设置一个ReturnsCallback的监听器对退回的消息举行处置惩罚
|