ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ(七)ACK 消息确认机制 [打印本页]

作者: 诗林    时间: 2024-8-18 20:53
标题: RabbitMQ(七)ACK 消息确认机制

   当我们在项目中引入了新的中心件之后,数据的风险性就要多一层考虑。那么,RabbitMQ 的消息是怎么知道有没有被消费者消费的呢?生产者又怎么确保自己发送成功了呢?这些问题将在文章中进行解答。
  一、简介

1.1 配景

在 MQ 中,消费者和生产者并不直接进行通信,生产者只负责把消息发送到队列,消费者只负责从队列获取消息。

1.2 定义

为了包管消息从队列可靠地到达消费者,RabbitMQ 提供了 消息确认机制(Message Acknowledgement)
消费者在订阅队列时,可以指定 autoAck 参数:

采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有富足的时间处置惩罚消息(任务),不用担心处置惩罚消息过程中消费者进程挂掉后消息丢失的问题,由于 RabbitMQ 会一直等待持有消息知道消费者显式调用 Basic.Ack 下令为止。
对于 RabbitMQ 服务器端而言,当 autoAck 参数为 false 时,队列中的消息分成了两部分:


如果 RabbitMQ 服务器端 一直没有收到消费者简直认信息,并且 消费此消息的消费者已经断开毗连,则服务器端会安排 该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
RabbitMQ 不会为未确认的消息设置逾期时间,它 判断此消息是否需要重新投递给消费者的唯一依据是该消息毗连是否已经断开,这个计划的原因是 RabbitMQ 允许消费者消费一条消息的时间可以好久好久。
1.3 如何查看确认/未确认的消息数?

RabbitMQ 的 Web 管理平台上可以看到当前队列中的 “Ready” 状态和 “Unacknowledged” 状态的消息数:



二、消息确认机制的分类

RabbitMQ 消息确认机制分为两大类:
2.1 消息发送确认

RabbitMQ 的消息发送确认有两种实现方式:ConfirmCallback 方法、ReturnCallback 方法。
1)ConfirmCallback方法

ConfirmCallback 是一个回调接口,用于确认消息否是到达互换机中
配置方式:
  1. spring.rabbitmq.publisher-confirm-type=correlated
复制代码
它有三个值:

2)ReturnCallback方法

ReturnCallback 也是一个回调接口,用于确认消息是否在互换机中路由到了队列
(该方法可以不使用,由于互换机和队列是在代码里面绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非代码写错了。)
配置方式:
  1. spring.rabbitmq.publisher-returns=true
复制代码
3)代码实现方式一:同一配置

a.配置类

RabbitDirectConfig.java
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  7. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9. import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
  10. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  11. import org.springframework.context.annotation.Bean;
  12. import org.springframework.context.annotation.Configuration;
  13. /**
  14. * <p> @Title RabbitDirectConfig
  15. * <p> @Description 直连交换机配置
  16. * Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
  17. *
  18. * @author ACGkaka
  19. * @date 2023/1/12 15:09
  20. */
  21. @Slf4j
  22. @Configuration
  23. public class RabbitDirectConfig {
  24.     public static final String DIRECT_EXCHANGE_NAME = "TEST_DIRECT_EXCHANGE";
  25.     public static final String DIRECT_ROUTING_NAME = "TEST_DIRECT_ROUTING";
  26.     public static final String DIRECT_QUEUE_NAME = "TEST_DIRECT_QUEUE";
  27.     @Bean
  28.     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  29.         RabbitTemplate rabbitTemplate = new RabbitTemplate();
  30.         rabbitTemplate.setConnectionFactory(connectionFactory);
  31.         // 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
  32.         rabbitTemplate.setMandatory(true);
  33.         //设置message序列化方法
  34.         rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  35.         // 设置消息发送到交换机(Exchange)回调
  36.         rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  37.             if (ack) {
  38.                 log.info(">>>>>>>>>>【INFO】消息发送到交换机(Exchange)成功, 相关数据: {}", correlationData);
  39.             } else {
  40.                 log.error(">>>>>>>>>>【ERROR】消息发送到交换机(Exchange)失败, 错误原因: {}, 相关数据: {}", cause, correlationData);
  41.             }
  42.         });
  43.         // 设置消息发送到队列(Queue)回调(经测试,只有失败才会调用)
  44.         rabbitTemplate.setReturnsCallback((returnedMessage) -> {
  45.             log.error(">>>>>>>>>>【ERROR】消息发送到队列(Queue)失败:响应码: {}, 响应信息: {}, 交换机: {}, 路由键: {}, 消息内容: {}",
  46.                     returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage());
  47.         });
  48.         return rabbitTemplate;
  49.     }
  50.     /**
  51.      * 消息监听-反序列化
  52.      */
  53.     @Bean
  54.     public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
  55.         SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  56.         factory.setConnectionFactory(connectionFactory);
  57.         factory.setMessageConverter(new Jackson2JsonMessageConverter());
  58.         return factory;
  59.     }
  60.     /**
  61.      * 队列,命名:testDirectQueue
  62.      *
  63.      * @return 队列
  64.      */
  65.     @Bean
  66.     public Queue testDirectQueue() {
  67.         // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
  68.         // exclusive:默认false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable。
  69.         // autoDelete:是否自动删除,当没有生产者或消费者使用此队列,该队列会自动删除。
  70.         // 一般设置一下队列的持久化就好,其余两个默认false
  71.         return new Queue(DIRECT_QUEUE_NAME, true);
  72.     }
  73.     /**
  74.      * Direct交换机,命名:testDirectExchange
  75.      * @return Direct交换机
  76.      */
  77.     @Bean
  78.     DirectExchange testDirectExchange() {
  79.         return new DirectExchange(DIRECT_EXCHANGE_NAME, true, false);
  80.     }
  81.     /**
  82.      * 绑定 将队列和交换机绑定,并设置用于匹配键:testDirectRouting
  83.      * @return 绑定
  84.      */
  85.     @Bean
  86.     Binding bindingDirect() {
  87.         return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(DIRECT_ROUTING_NAME);
  88.     }
  89. }
复制代码
a.生产者

SendMessageController.java
  1. import com.demo.config.RabbitDirectConfig;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import java.time.LocalDateTime;
  8. import java.time.format.DateTimeFormatter;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. import java.util.UUID;
  12. /**
  13. * <p> @Title SendMessageController
  14. * <p> @Description 推送消息接口
  15. *
  16. * @author ACGkaka
  17. * @date 2023/1/12 15:23
  18. */
  19. @Slf4j
  20. @RestController
  21. public class SendMessageController {
  22.     /**
  23.      * 使用 RabbitTemplate,这提供了接收/发送等方法。
  24.      */
  25.     @Autowired
  26.     private RabbitTemplate rabbitTemplate;
  27.     @GetMapping("/sendDirectMessage")
  28.     public String sendDirectMessage() {
  29.         String messageId = String.valueOf(UUID.randomUUID());
  30.         String messageData = "Hello world.";
  31.         String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  32.         Map<String, Object> map = new HashMap<>();
  33.         map.put("messageId", messageId);
  34.         map.put("messageData", messageData);
  35.         map.put("createTime", createTime);
  36.         // 将消息携带绑定键值:TEST_DIRECT_ROUTING,发送到交换机:TEST_DIRECT_EXCHANGE
  37.         rabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME, RabbitDirectConfig.DIRECT_ROUTING_NAME, map);
  38.         return "OK";
  39.     }
  40. }
复制代码
c.消费者

DirectReceiver.java
  1. import com.demo.config.RabbitDirectConfig;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import java.util.Map;
  6. /**
  7. * <p> @Title DirectReceiver
  8. * <p> @Description 直连交换机监听类
  9. *
  10. * @author ACGkaka
  11. * @date 2023/1/12 15:59
  12. */
  13. @Slf4j
  14. @Component
  15. public class DirectReceiver {
  16.     @RabbitListener(queues = RabbitDirectConfig.DIRECT_QUEUE_NAME)
  17.     public void process(Map<String, Object> testMessage) {
  18.         System.out.println("DirectReceiver消费者收到消息:" + testMessage.toString());
  19.     }
  20. }
复制代码
d.测试结果

成功发送时,执行结果:

互换机错误时,执行结果:

路由键错误时,执行结果:

4)代码实现方式二:单独配置

除了在配置类里面同一设置回调方法外,还可以在每次推送消息到队列时,手动使用 CorrelationData 指定回调方法。
  1. @GetMapping("/sendDirectMessage2")
  2. public String sendDirectMessage2() {
  3.     String messageId = String.valueOf(UUID.randomUUID());
  4.     String messageData = "Hello world.";
  5.     String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  6.     Map<String, Object> map = new HashMap<>();
  7.     map.put("messageId", messageId);
  8.     map.put("messageData", messageData);
  9.     map.put("createTime", createTime);
  10.     //生成唯一标识
  11.     CorrelationData correlationData = new CorrelationData(messageId);
  12.     //不管成功失败都会调用confirm或者throwable,这是异步调用
  13.     correlationData.getFuture().addCallback(
  14.             confirm -> {
  15.                 // 设置消息发送到交换机(Exchange)回调
  16.                 if (confirm != null && confirm.isAck()) {
  17.                     log.info(">>>>>>>>>>【INFO】发送成功ACK,msgId: {}, message: {}", correlationData.getId(), map);
  18.                 } else {
  19.                     log.error(">>>>>>>>>>【ERROR】发送失败NACK,msgId: {}, message: {}", correlationData.getId(), map);
  20.                 }
  21.             },
  22.             throwable -> {
  23.                 //发生错误,链接mq异常,mq未打开等...报错回调
  24.                 System.out.println("发送失败throwable = " + throwable + ",  id:" + correlationData.getId());
  25.             }
  26.     );
  27.     // 将消息携带绑定键值:TEST_DIRECT_ROUTING,发送到交换机:TEST_DIRECT_EXCHANGE
  28.     rabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME, RabbitDirectConfig.DIRECT_ROUTING_NAME, map, correlationData);
  29.     return "OK";
  30. }
复制代码


2.2 消息吸收确认

消费者确认发生在 监听队列的消费者处置惩罚业务失败,如:发生了异常、不符合要求的数据等。这些场景就 需要我们手动处置惩罚消息,比如:重新发送消息或者丢弃消息。
RabbitMQ 的 消息确认机制(ACK) 默认是自动确认的。自动确认会 在消息发送给消费者后立刻确认,但 存在丢失消息的可能。如果消费端消费逻辑抛出了异常,如果我们使用了事务的回滚,也只是包管了数据的同等性,消息还是丢失了。也就是消费端没有处置惩罚成功这条消息,那么就相当于丢失了消息。
消息简直认模式有三种:
消费者收到消息后,手动调用 Channel 的 basicAck()/basicReject()/basicNack() 方法后,RabbitMQ 收到消息后,才以为本次投递完成。
1)basicAck() 方法

basicAck() 方法 用于确认当前消息Channel 类中的方法定义如下:
  1. void basicAck(long deliveryTag, boolean multiple) throws IOException;
复制代码
参数阐明:

2)basicReject() 方法

basicReject() 方法 用于明确拒绝当前的消息。RabbitMQ 在 2.0.0 版本开始引入,Channel 类中的方法定义如下:
  1. void basicReject(long deliveryTag, boolean requeue) throws IOException;
复制代码
参数阐明:

3)basicNack() 方法

basicNack() 方法 用于批量拒绝消息。由于 basicReject() 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack() 方法。Channel 类中的方法定义如下:
参数阐明:

4)代码实现

a.配置方式一:代码配置

如果我们之前配置了 Jackson2JsonMessageConverter.java 的序列化方式,那么我们可以接着指定消费方的消息确认模式为 AcknowledgeMode.MANUL。
  1. /**
  2. * 消息监听配置
  3. */
  4. @Bean
  5. public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
  6.     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  7.     // 设置连接工厂
  8.     factory.setConnectionFactory(connectionFactory);
  9.     // 设置消息确认模式
  10.     factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  11.     // 设置反序列化
  12.     factory.setMessageConverter(new Jackson2JsonMessageConverter());
  13.     return factory;
  14. }
复制代码
b.配置方式二:配置文件

我们可以直接在 application.yml 中进行如下配置:
  1. # 确认模式,默认auto,自动确认;manual:手动确认
  2. spring.rabbitmq.listener.simple.acknowledge-mode=manual
复制代码
  注意: yaml中指定的是消费端容器的默认配置,如果我们在代码中有自定义注入 RabbitListenerContainerFactory 示例之后,还需要使用默认配置,需要在代码中进行设置,如下所示:
  1. @Autowired
  2. private SimpleRabbitListenerContainerFactoryConfigurer configurer;
  3. /**
  4. * 消息监听配置
  5. */
  6. @Bean
  7. public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
  8.     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  9.     // 设置连接工厂
  10.     factory.setConnectionFactory(connectionFactory);
  11.     // 采用yaml中的配置
  12.     configurer.configure(factory, connectionFactory);
  13.     // 设置反序列化
  14.     factory.setMessageConverter(new Jackson2JsonMessageConverter());
  15.     return factory;
  16. }
复制代码
c.生产者

SendMessageController.java
  1. import com.demo.config.RabbitDirectConfig;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import java.time.LocalDateTime;
  8. import java.time.format.DateTimeFormatter;
  9. import java.util.HashMap;
  10. import java.util.Map;
  11. import java.util.UUID;
  12. /**
  13. * <p> @Title SendMessageController
  14. * <p> @Description 推送消息接口
  15. *
  16. * @author ACGkaka
  17. * @date 2023/1/12 15:23
  18. */
  19. @Slf4j
  20. @RestController
  21. public class SendMessageController {
  22.     /**
  23.      * 使用 RabbitTemplate,这提供了接收/发送等方法。
  24.      */
  25.     @Autowired
  26.     private RabbitTemplate rabbitTemplate;
  27.     @GetMapping("/sendDirectMessage")
  28.     public String sendDirectMessage() {
  29.         String messageId = String.valueOf(UUID.randomUUID());
  30.         String messageData = "Hello world.";
  31.         String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  32.         Map<String, Object> map = new HashMap<>();
  33.         map.put("messageId", messageId);
  34.         map.put("messageData", messageData);
  35.         map.put("createTime", createTime);
  36.         // 将消息携带绑定键值:TEST_DIRECT_ROUTING,发送到交换机:TEST_DIRECT_EXCHANGE
  37.         rabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME, RabbitDirectConfig.DIRECT_ROUTING_NAME, map);
  38.         return "OK";
  39.     }
  40. }
复制代码
d.消费者

DirectReceiver.java
  1. import com.demo.config.RabbitDirectConfig;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.Message;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.stereotype.Component;
  7. import java.io.IOException;
  8. import java.util.Map;
  9. /**
  10. * <p> @Title DirectReceiver
  11. * <p> @Description 直连交换机监听类
  12. *
  13. * @author ACGkaka
  14. * @date 2023/1/12 15:59
  15. */
  16. @Slf4j
  17. @Component
  18. public class DirectReceiver {
  19.     @RabbitListener(queues = RabbitDirectConfig.DIRECT_QUEUE_NAME)
  20.     public void process(Map<String, Object> testMessage, Message message, Channel channel) throws IOException {
  21.         try {
  22.             log.info("DirectReceiver消费者收到消息: {}", testMessage.toString());
  23.             // 手动答应消费完成,从队列中删除该消息
  24.             channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
  25.         } catch (Exception e) {
  26.             log.error("DirectReceiver消费者消费失败,原因: {}", e.getMessage(), e);
  27.             // 手动答应消费完成,从队列中删除该消息(不重回队列)
  28.             channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
  29.         }
  30.     }
  31. }
复制代码
e.测试结果

场景一:消费者进行手动确认,生产者推送2条消息:
可以看到,生产者推送2条消息后立马被消费了。

场景二:消费者不进行手动确认,生产者推送2条消息:
虽然消费者消费完毕,但是由于没有进行手动确认,所以2条消息会一直处于 Unacked 状态,直到消费者下线。

关闭 SpringBoot 步伐,消费者下线后,消息由 Unacked 状态转为 Ready 状态,等待下一个消费者上线后重新进行消费。

整理完毕,完结撒花~




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4