RabbitMQ 高级特性解析:RabbitMQ 消息可靠性保障 (上)

打印 上一主题 下一主题

主题 1503|帖子 1503|积分 4509

RabbitMQ 核心功能

RabbitMQ 高级特性解析:RabbitMQ 消息可靠性保障 (上)-CSDN博客
RabbitMQ 高级特性:从 TTL 到消息分发的全面解析 (下)-CSDN博客

前言

   近来再看 RabbitMQ,看了看本身之前写的博客,诶,一言难尽,其时学的懵懵懂懂的。这里重新整理 RabbitMQ 的核心功能。
  在分布式体系中,消息队列是实现异步通讯、解耦服务的关键组件。RabbitMQ 作为一款功能强大的消息队列,其消息可靠性是确保体系稳定运行的紧张因素。这里将深入探讨 RabbitMQ 的消息确认机制、持久化策略、发送方确认机制以及重试机制!!
  
一、消息确认机制

1.1 消息确认机制概述

生产者发送消息到消费端后,大概出现消息处理乐成或异常的环境。假如 RabbitMQ 在发送消息后就将其删除,当消息处理异常时,就会造成消息丢失。为了确保消费端乐成接收并精确处理消息,RabbitMQ 提供了消息确认机制(message acknowledgement)。
   消费者在订阅队列时,可以指定autoAck参数,根据该参数设置,消息确认机制分为自动确认和手动确认两种:
  

  • 自动确认(autoAck=true):RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正消费到了这些消息。这种模式得当对消息可靠性要求不高的场景。
  • 手动确认(autoAck=false):RabbitMQ 会等待消费者显式地调用Basic.Ack下令,复兴确认信号后才从内存(或者磁盘)中移去消息。这种模式得当对消息可靠性要求比较高的场景。
  以下是basicConsume方法的界说:
  1. /**
  2. * Start a non-nolocal, non-exclusive consumer, with
  3. * a server-generated consumerTag.
  4. * @param queue the name of the queue
  5. * @param autoAck true if the server should consider messages
  6. * acknowledged once delivered; false if the server should expect
  7. * explicit acknowledgements
  8. * @param callback an interface to the consumer object
  9. * @return the consumerTag generated by the server
  10. * @throws java.io.IOException if an error is encountered
  11. * @see com.rabbitmq.client.AMQP.Basic.Consume
  12. * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
  13. * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
  14. */
  15. String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
复制代码
1.2 手动确认方法

消费者在收到消息之后,可以选择确认、直接拒绝或者跳过,RabbitMQ 提供了三种不同简直认应答方式:
   

  • 肯定确认:Channel.basicAck(long deliveryTag, boolean multiple):RabbitMQ 已知道该消息并且乐成处理消息,可以将其丢弃。

    • deliveryTag:消息的唯一标识,是一个单调递增的 64 位长整型值,每个通道(Channel)独立维护,所以在每个通道上都是唯一的。当消费者确认(ack)一条消息时,必须使用对应的通道举行确认。
    • multiple:是否批量确认。值为true则会一次性 ack 所有小于或等于指定deliveryTag的消息;值为false,则只确认当前指定deliveryTag的消息。

  • 否定确认:Channel.basicReject(long deliveryTag, boolean requeue):消费者客户端可以调用channel.basicReject方法来告诉 RabbitMQ 拒绝这个消息。

    • deliveryTag:参考channel.basicAck。
    • requeue:表示拒绝后,这条消息怎样处理。假如requeue参数设置为true,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;假如requeue参数设置为false,则 RabbitMQ 会把消息从队列中移除,而不会把它发送给新的消费者。

  • 否定确认:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):假如想要批量拒绝消息,可以使用Basic.Nack下令。

    • 参数介绍参考前面两个方法。multiple参数设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息。

  

1.3 代码示例

我们基于 Spring Boot 来演示消息简直认机制,Spring - AMQP 对消息确认机制提供了三种策略:
  1. public enum AcknowledgeMode {
  2.     NONE,
  3.     MANUAL,
  4.     AUTO;
  5. }
复制代码
1.3.1 AcknowledgeMode.NONE



  • 配置确认机制
  1. spring:
  2.   rabbitmq:
  3.     addresses: amqp://study:study@110.41.51.65:15673/bite
  4.     listener:
  5.       simple:
  6.         acknowledge-mode: none
复制代码


  • 发送消息
  1. public class Constant {
  2.     public static final String ACK_EXCHANGE_NAME = "ack_exchange";
  3.     public static final String ACK_QUEUE = "ack_queue";
  4. }
  5. @Configuration
  6. public class RabbitmqConfig {
  7.     @Bean("ackExchange")
  8.     public Exchange ackExchange(){
  9.         return ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();
  10.     }
  11.     @Bean("ackQueue")
  12.     public Queue ackQueue() {
  13.         return QueueBuilder.durable(Constant.ACK_QUEUE).build();
  14.     }
  15.     @Bean("ackBinding")
  16.     public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange,
  17.                               @Qualifier("ackQueue") Queue queue) {
  18.         return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();
  19.     }
  20. }
  21. @RestController
  22. @RequestMapping("/producer")
  23. public class ProductController {
  24.     @Autowired
  25.     private RabbitTemplate rabbitTemplate;
  26.     @RequestMapping("/ack")
  27.     public String ack(){
  28.         rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", "consumer ack test...");
  29.         return "发送成功!";
  30.     }
  31. }
复制代码


  • 消费端逻辑
  1. @Component
  2. public class AckQueueListener {
  3.     @RabbitListener(queues = Constant.ACK_QUEUE)
  4.     public void ListenerQueue(Message message, Channel channel) throws Exception {
  5.         System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"),
  6.                           message.getMessageProperties().getDeliveryTag());
  7.         //模拟处理失败
  8.         int num = 3/0;
  9.         System.out.println("处理完成");
  10.     }
  11. }
复制代码
运行结果:消息处理失败,但消息已从 RabbitMQ 中移除,因为NONE模式下消息一旦投递就会被自动确认。
1.3.2 AcknowledgeMode.AUTO



  • 配置确认机制
  1. spring:
  2.   rabbitmq:
  3.     addresses: amqp://study:study@110.41.51.65:15673/bite
  4.     listener:
  5.       simple:
  6.         acknowledge-mode: auto
复制代码
重新运行步伐,当消费者出现异常时,RabbitMQ 会不停重发消息,由于异常多次重试还是失败,消息没被确认,也无法 nack,就不停是unacked状态,导致消息积存。
1.3.3 AcknowledgeMode.MANUAL



  • 配置确认机制
  1. spring:
  2.   rabbitmq:
  3.     addresses: amqp://study:study@110.41.51.65:15673/bite
  4.     listener:
  5.       simple:
  6.         acknowledge-mode: manual
复制代码


  • 消费端手动确认逻辑
  1. @Component
  2. public class AckQueueListener {
  3.     @RabbitListener(queues = Constant.ACK_QUEUE)
  4.     public void ListenerQueue(Message message, Channel channel) throws Exception {
  5.         long deliveryTag = message.getMessageProperties().getDeliveryTag();
  6.         try {
  7.             //1. 接收消息
  8.             System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"),
  9.                               message.getMessageProperties().getDeliveryTag());
  10.             //2. 处理业务逻辑
  11.             System.out.println("处理业务逻辑");
  12.             //手动设置一个异常, 来测试异常拒绝机制
  13.             // int num = 3/0;
  14.             //3. 手动签收
  15.             channel.basicAck(deliveryTag, true);
  16.         } catch (Exception e) {
  17.             //4. 异常了就拒绝签收
  18.             //第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送, 若为false, 则直接丢弃
  19.             channel.basicNack(deliveryTag, true, true);
  20.         }
  21.     }
  22. }
复制代码
运行结果:消息正常处理时会被签收;异常时会不停重试。

二、持久性

2.1 交换机持久化

交换器的持久化是通过在声明交换机时将durable参数置为true实现的。这样当 MQ 的服务器发生不测或关闭之后,重启 RabbitMQ 时不需要重新去创建交换机,交换机会自动创建。假如交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相干的交换机元数据会丢失。
  1. ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();
复制代码
2.2 队列持久化

队列的持久化是通过在声明队列时将durable参数置为true实现的。假如队列不设置持久化,那么在 RabbitMQ 服务重启之后,该队列就会被删掉,数据也会丢失。但队列持久化不能包管内部所存储的消息不丢失,要确保消息不丢失,需要将消息设置为持久化。
  1. QueueBuilder.durable(Constant.ACK_QUEUE).build();
复制代码
创建非持久化队列:
  1. QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();
复制代码
2.3 消息持久化

消息实现持久化,需要把消息的投递模式(MessageProperties中的deliveryMode)设置为 2,也就是MessageDeliveryMode.PERSISTENT。
  1. // 要发送的消息内容
  2. String message = "This is a persistent message";
  3. // 创建一个Message对象,设置为持久化
  4. Message messageObject = new Message(message.getBytes(), new MessageProperties());
  5. messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  6. // 使用RabbitTemplate发送消息
  7. rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);
复制代码
  需要注意的是,将所有的消息都设置为持久化,会严峻影响 RabbitMQ 的性能,因为写入磁盘的速度比写入内存的速度慢很多。在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个衡量。
  纵然将交换器、队列、消息都设置了持久化,也不能百分之百包管数据不丢失。例如,消费者订阅队列时autoAck参数设置为true,消费者接收到消息后还没来得及处理就宕机,会导致数据丢失;持久化的消息存入 RabbitMQ 后,还需要一段时间才能存入磁盘,假如在这段时间内 RabbitMQ 服务节点发生异常,消息大概会丢失。可以通过引入 RabbitMQ 的仲裁队列或在发送端引入事务机制、发送方确认机制来提高可靠性。(后续都会讲到)
  
三、发送方确认

3.1 confirm 确认模式

Producer 在发送消息时,对发送端设置一个ConfirmCallback的监听,无论消息是否到达Exchange,这个监听都会被实行。假如Exchange乐成收到消息,ACK为true;假如充公到消息,ACK为false。
配置 RabbitMQ
  1. spring:
  2.   rabbitmq:
  3.     addresses: amqp://study:study@110.41.51.65:15673/bite
  4.     listener:
  5.       simple:
  6.         acknowledge-mode: manual
  7. #消息接收确认        publisher-confirm-type: correlated #消息发送确认
复制代码
设置确认回调逻辑并发送消息
  1. @Configuration
  2. public class RabbitTemplateConfig {
  3.     @Bean("confirmRabbitTemplate")
  4.     public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
  5.         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  6.         rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  7.             @Override
  8.             public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  9.                 if (ack){
  10.                     System.out.printf("消息接收成功, id:%s \n", correlationData.getId());
  11.                 }else {
  12.                     System.out.printf("消息接收失败, id:%s, cause: %s", correlationData.getId(), cause);
  13.                 }
  14.             }
  15.         });
  16.         return rabbitTemplate;
  17.     }
  18. }
  19. @RestController
  20. @RequestMapping("/product")
  21. public class ProductController {
  22.     @Resource(name = "confirmRabbitTemplate")
  23.     private RabbitTemplate confirmRabbitTemplate;
  24.     @RequestMapping("/confirm")
  25.     public String confirm() throws InterruptedException {
  26.         CorrelationData correlationData1 = new CorrelationData("1");
  27.         confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME, "confirm", "confirm test...", correlationData1);
  28.         return "确认成功";
  29.     }
  30. }
复制代码


  • 测试
    运行步伐,调用接口http://127.0.0.1:8080/product/confirm,观察控制台,消息确认乐成。修改交换机名称,重新运行,会触发消息发送失败的结果。
3.2 return 退回模式

消息到达Exchange之后,会根据路由规则匹配,把消息放入Queue中。假如一条消息无法被任何队列消费,可以选择把消息退还给发送者。
配置 RabbitMQ
  1. spring:
  2.   rabbitmq:
  3.     addresses: amqp://study:study@110.41.51.65:15673/bite
  4.     listener:
  5.       simple:
  6.         acknowledge-mode: manual
  7. #消息接收确认        publisher-confirm-type: correlated #消息发送确认
复制代码


  • 设置返回回调逻辑并发送消息
  1. @Configuration
  2. public class RabbitTemplateConfig {
  3.     @Bean("confirmRabbitTemplate")
  4.     public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactory connectionFactory){
  5.         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  6.         rabbitTemplate.setMandatory(true);
  7.         rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
  8.             @Override
  9.             public void returnedMessage(ReturnedMessage returned) {
  10.                 System.out.printf("消息被退回: %s", returned);
  11.             }
  12.         });
  13.         return rabbitTemplate;
  14.     }
  15. }
  16. @RestController
  17. @RequestMapping("/product")
  18. public class ProductController {
  19.     @Resource(name = "confirmRabbitTemplate")
  20.     private RabbitTemplate confirmRabbitTemplate;
  21.     @RequestMapping("/msgReturn")
  22.     public String msgReturn(){
  23.         CorrelationData correlationData = new CorrelationData("2");
  24.         confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm11", "message return test...", correlationData);
  25.         return "消息发送成功";
  26.     }
  27. }
复制代码
测试
运行步伐,调用接口http://127.0.0.1:8080/product/msgReturn,观察控制台,消息被退回。



四、重试机制

在消息转达过程中,大概会碰到各种问题,如网络故障、服务不可用、资源不足等,这些问题大概导致消息处理失败。为了办理这些问题,RabbitMQ 提供了重试机制,答应消息在处理失败后重新发送。但假如是步伐逻辑引起的错误,那么多次重试也是没有用的,可以设置重试次数。
4.1 重试配置

  1. spring:
  2.   rabbitmq:
  3.     addresses: amqp://study:study@110.41.51.65:15673/bite
  4.     listener:
  5.       simple:
  6.         acknowledge-mode: auto
  7. #消息接收确认        retry:          enabled: true # 开启消费者失败重试          initial-interval: 5000ms # 初始失败等待时长为5秒          max-attempts: 5 # 最大重试次数(包括自身消费的一次)
复制代码
4.2 配置交换机 & 队列

  1. //重试机制
  2. public static final String RETRY_QUEUE = "retry_queue";
  3. public static final String RETRY_EXCHANGE_NAME = "retry_exchange";
  4. //重试机制 发布订阅模式
  5. //1. 交换机
  6. @Bean("retryExchange")
  7. public Exchange retryExchange() {
  8.     return ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();
  9. }
  10. //2. 队列
  11. @Bean("retryQueue")
  12. public Queue retryQueue() {
  13.     return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
复制代码

五:怎样包管 RabbitMQ 消息的可靠传输?


消息大概丢失的场景以及办理方案如下:
   生产者将消息发送到 RabbitMQ 失败
  

  • 大概缘故原由是网络问题等,
  • 办理办法是参考发送方确认 - confirm确认模式
  消息在交换机中无法路由到指定队列
  

  • 大概缘故原由是代码或者配置层面错误,导致消息路由失败,
  • 办理办法是参考发送方确认 - return模式
  消息队列自身数据丢失
  

  • 大概缘故原由是消息到达 RabbitMQ 之后,RabbitMQ Server 宕机导致消息丢失,
  • 办理办法是参考持久化开启 RabbitMQ 持久化,也可以通过集群的方式提高可靠性。
  消费者异常,导致消息丢失
  

  • 大概缘故原由是消息到达消费者,还没来得及消费,消费者宕机或消费者逻辑有问题,
  • 办理办法是参考消息确认,开启手动确认,配置重试机制
  
以上就是四个RabbitMQ包管消息可靠性的四个机制,后续有更多核心机制的更新,感谢阅览!!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦见你的名字

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表