RabbitMQ进阶--包管消息的可靠性

打印 上一主题 下一主题

主题 894|帖子 894|积分 2682

1.使用rabbitmq可能存在的问题

在我们使用消息队列时,是否考虑过一个问题,如果在发送消息的时候存在网络波动,会引发哪些问题?


  • 无法正确的发送和接收消息
  • 重复多次的消耗同一条消息
举个例子,我们在购物的时候,已经付出完成,但是消息没有正确的被消耗,前端发送哀求查询付出状态时,肯定是查询生意业务服务状态,会发现业务订单未付出,而用户本身知道已经付出乐成,这就导致用户体验不一致。
因此,这里我们必须尽可能确保MQ消息的可靠性,即:消息应该至少被消耗者处理1次(这里为什么是至少一次在反面业务的幂等性判断会解说)
那么问题来了:


  • 我们该如何确保MQ消息的可靠性?
  • 如果真的发送失败,有没有其它的兜底方案?
2.包管MQ消息可靠性的三种方式

2.1.发送者可靠性

2.1.1. 生产者重试机制

   通过在配置文件中添加干系配置打开重试机制
  1. spring:
  2. rabbitmq:
  3. connection-timeout: 1s # 设置MQ的连接超时时间
  4. template:
  5.    retry:
  6.      enabled: true # 开启超时重试机制
  7.      initial-interval: 1000ms # 失败后的初始等待时间
  8.      multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
  9.      max-attempts: 3 # 最大重试次数
复制代码
注意:当网络不稳固的时候,使用重试机制可以有用提高消息发送的乐成率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试期待的过程中,当前线程是被阻塞的。
  如果对于业务性能有要求,发起禁用重试机制。如果肯定要使用,请合理配置期待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
  2.1.2. 生产者确认机制

我个人认为,生产者确认机制对性能影响较大,无特殊必要不要开启
一般情况下,只要生产者与MQ之间的网路毗连顺畅,根本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

不过,在少数情况下,也会出现消息发送到MQ之后丢失的征象,比如:


  • MQ内部处理消息的历程发生了异常
  • 生产者发送消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由
针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回差别的回执



  • 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递乐成
  • 临时消息投递到了MQ,并且入队乐成,返回ACK,告知投递乐成
  • 长期消息投递到了MQ,并且入队完成长期化,返回ACK ,告知投递乐成
  • 其它情况都会返回NACK,告知投递失败
此中ack和nack属于Publisher Confirm机制,ack是投递乐成;nack是投递失败。而return则属于Publisher Return机制。
默认两种机制都是关闭状态,必要通过配置文件来开启。
在生产者中添加配置
  1. spring:
  2.   rabbitmq:
  3.     publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
  4.     publisher-returns: true # 开启publisher return机制
复制代码
这里publisher-confirm-type有三种模式可选:


  • none:关闭confirm机制
  • simple:同步阻塞期待MQ的回执
  • correlated:MQ异步回调返回回执
添加之后必要去配置一个ReturnCallback,每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中同一设置。我们在publisher模块定义一个配置类(如果返回 nack则会调用这个方法:
  1. package com.itheima.publisher.config;
  2. import lombok.RequiredArgsConstructor;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.core.ReturnedMessage;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.context.annotation.Configuration;
  7. import javax.annotation.PostConstruct;
  8. @Slf4j
  9. @Configuration
  10. @RequiredArgsConstructor
  11. public class MqConfig {
  12.     private final RabbitTemplate rabbitTemplate;
  13.     @PostConstruct
  14.     public void init(){
  15.         rabbitTemplate.setReturnsCallback(returnedMessage -> {
  16.             log.error("监听到消息return callback");
  17.             log.debug("exchange:{}", returnedMessage.getExchange());
  18.             log.debug("routingKey:{}", returnedMessage.getRoutingKey());
  19.             log.debug("message:{}", returnedMessage.getMessage());
  20.             log.debug("replyCode:{}", returnedMessage.getReplyCode());
  21.             log.debug("replyText:{}", returnedMessage.getReplyText());
  22.         });
  23.     }
  24. }
复制代码
ConfirmCallBack

由于每个消息发送时的处理逻辑不肯定相同,因此ConfirmCallback必要在每次发消息时定义。详细来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

这里的CorrelationData中包含两个核心的东西:


  • id:消息的唯一标示,MQ对差别的消息的回执以此做判断,避免混淆,我们这里使用UUID
  • SettableListenableFuture:回执效果的Future对象
  1. @Test
  2. public void testConfirmCallback() throws InterruptedException {
  3.     // 0.创建correlationData
  4.     CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
  5.     cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
  6.         @Override
  7.         public void onFailure(Throwable ex) {
  8.             log.error("spring amqp 处理确认结果异常", ex);
  9.         }
  10.         @Override
  11.         public void onSuccess(CorrelationData.Confirm result) {
  12.             // 判断是否成功
  13.             if (result.isAck()){
  14.                 log.debug("收到ConfirmCallback ack,消息发送成功!");
  15.             }else {
  16.                 log.error("收到ConfirmCallback nack,消息发送失败!reason:{}", result.getReason());
  17.                 // 重发根据自己情况定义
  18.                
  19.             }
  20.         }
  21.     });
  22.     // 1.交换机名
  23.     String exchangeName = "hmall.direct";
  24.     // 2.消息
  25.     String message = "test...";
  26.     // 3.发送
  27.     rabbitTemplate.convertAndSend(exchangeName, "blue", message, cd);
  28.     Thread.sleep(2000);
  29. }
复制代码
2.2.MQ可靠性(数据长期化)

2.2.1. 交换机长期化

在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:

设置为Durable就是长期化模式,Transient就是临时模式。
2.2.2. 队列长期化

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:

2.3.消耗者可靠性

2.3.1. 消耗者确认机制

为了确认消耗者是否乐成处理消息,RabbitMQ提供了消耗者确认机制(Consumer Acknowledgement)。即:当消耗者处理消息竣事后,应该向RabbitMQ发送一个回执,告知RabbitMQ本身消息处理状态。回执有三种可选值:


  • ack:乐成处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ必要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
一般reject方式用的较少,除非是消息格式有问题,那就是开辟问题了。因此大多数情况下我们必要将消息处理的代码通过try catch机制捕获,消息处理乐成时返回ack,处理失败时返回nack.
由于消息回执的处理代码比较同一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:


  • none:不处理。即消息投递给消耗者后立刻ack,消息会立刻从MQ删除。非常不安全,不发起使用
  • manual:手动模式。必要本身在业务代码中调用api,发送ack或reject,存在业务入侵,但更机动
  • auto:主动模式。SpringAMQP使用AOP对我们的消息处理逻辑做了围绕增强,当业务正常执行时则主动返回ack. 当业务出现异常时,根据异常判断返回差别效果:

    • 如果是业务异常,会主动返回nack;
    • 如果是消息处理或校验异常,主动返回reject;


通过下面的配置可以修改SpringAMQP的ACK处理方式:
  1. spring:
  2.   rabbitmq:
  3.     listener:
  4.       simple:
  5.         acknowledge-mode: none # 不做处理
复制代码
修改consumer服务的SpringRabbitListener类中的方法,模仿一个消息处理的异常:
  1.     @RabbitListener(bindings = @QueueBinding(
  2.             value = @Queue(name = "wang.queue3", durable = "true"),
  3.             exchange = @Exchange(name = "wang.fanout", type = ExchangeTypes.FANOUT)
  4.     ))
  5.     public void listenWangQueue2(String message){
  6.         log.info("spring 消费者接收到消息:【" + message + "】");
  7.         if (true) {
  8.             throw new MessageConversionException("故意的");
  9.         }
  10.         log.info("消息处理完成");
  11.     }
复制代码

测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。
我们再次把确认机制修改为auto:
  1. spring:
  2.   rabbitmq:
  3.     listener:
  4.       simple:
  5.         acknowledge-mode: auto # 自动ack
复制代码
在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):

放行以后,由于抛出的是消息转换异常,因此Spring会主动返回reject,以是消息依然会被删除:
我们将异常改为RuntimeException类型:
  1.         @RabbitListener(bindings = @QueueBinding(
  2.             value = @Queue(name = "wang.queue3", durable = "true"),
  3.             exchange = @Exchange(name = "wang.fanout", type = ExchangeTypes.FANOUT)
  4.     ))
  5.     public void listenWangQueue2(String message){
  6.         log.info("spring 消费者接收到消息:【" + message + "】");
  7.         if (true) {
  8.             throw new RuntimeException("故意的");
  9.         }
  10.         log.info("消息处理完成");
  11.     }
复制代码
在异常位置打断点,然后再次发送消息测试,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态)
放行以后,由于抛出的是业务异常,以是Spring返回nack,最终消息恢复至Ready状态,并且没有被RabbitMQ删除:

​ 当我们把配置改为auto时,消息处理失败后,会回到RabbitMQ,并重新投递到消耗者。
2.3.2. 消耗者重连机制

当消耗者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消耗者。如果消耗者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理乐成为止。
极端情况就是消耗者不停无法执行乐成,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力:

当然,上述极端情况发生的概率还黑白常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消耗者失败重试机制:在消耗者出现异常时使用本地重试,而不是无限定的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
  1. spring:
  2. rabbitmq:
  3. listener:
  4.    simple:
  5.      retry:
  6.        enabled: true # 开启消费者失败重试
  7.        initial-interval: 1000ms # 初识的失败等待时长为1秒
  8.        multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
  9.        max-attempts: 3 # 最大重试次数
  10.        stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
复制代码
重启consumer服务,重复之前的测试。可以发现:


  • 消耗者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
  • 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject
结论:


  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消耗者本地重试
  • 重试到达最大次数后,Spring会返回reject,消息会被抛弃
2.4.失败处理计谋

在之前的测试中,本地测试到达最大重试次数后,消息会被抛弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理计谋,这个计谋是由MessageRecovery接口来定义的,它有3个差别实现:


  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,抛弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
我个人认为,比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
  1. @Bean
  2. public DirectExchange errorMessageExchange(){
  3.     return new DirectExchange("error.direct");
  4. }
  5. @Bean
  6. public Queue errorQueue(){
  7.     return new Queue("error.queue", true);
  8. }
  9. @Bean
  10. public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
  11.     return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
  12. }
复制代码
2)定义一个RepublishMessageRecoverer,关联队列和交换机
  1. @Bean
  2. public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
  3.     return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
  4. }
复制代码
完整代码如下:
  1. package com.itheima.consumer.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.DirectExchange;
  5. import org.springframework.amqp.core.Queue;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.amqp.rabbit.retry.MessageRecoverer;
  8. import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
  9. import org.springframework.context.annotation.Bean;
  10. @Configuration
  11. @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
  12. public class ErrorMessageConfig {
  13.     @Bean
  14.     public DirectExchange errorMessageExchange(){
  15.         return new DirectExchange("error.direct");
  16.     }
  17.     @Bean
  18.     public Queue errorQueue(){
  19.         return new Queue("error.queue", true);
  20.     }
  21.     @Bean
  22.     public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
  23.         return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
  24.     }
  25.     @Bean
  26.     public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
  27.         return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
  28.     }
  29. }
复制代码
2.5.业务幂等性判断

何为幂等性?
幂等是一个数学概念,用函数表达式来形貌是这样的:f(x) = f(f(x)),例如求绝对值函数。
在程序开辟中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:


  • 根据id删除数据
  • 查询数据
  • 新增数据
但数据的更新通常不是幂等的,如果重复执行可能造成不一样的后果。比如:


  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。
以是,我们要尽可能避免业务被重复执行。
然而在现实业务场景中,由于意外经常会出现业务被重复执行的情况,例如:


  • 页面卡顿时频仍刷新导致表单重复提交
  • 服务间调用的重试
  • MQ消息的重复投递
我们在用户付出乐成后会发送MQ消息到生意业务服务,修改订单状态为已付出,就可能出现消息重复投递的情况。如果消耗者不做判断,很有可能导致消息被消耗多次,出现业务故障。
举例:

  • 如果用户刚刚付出完成,并且投递消息到生意业务服务,生意业务服务更改订单为已付出状态。
  • 由于某种缘故起因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给生意业务服务。
  • 但是,在新投递的消息被消耗之前,用户选择了退款,将订单状态改为了已退款状态。
  • 退款完成后,新投递的消息才被消耗,那么订单状态会被再次改为已付出。业务异常。
因此,我们必须想办法包管消息处理的幂等性。这里给出两种方案:


  • 唯一消息ID
  • 业务状态判断
这里提供唯一消息ID的方法:
这个思路非常简单:

  • 每一条消息都生成一个唯一的id,与消息一起投递给消耗者。
  • 消耗者接收到消息后处理本身的业务,业务处理乐成后将消息ID保存到数据库
  • 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
我们该如何给消息添加唯一ID呢?
其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:
  1. @Bean
  2. public MessageConverter messageConverter(){
  3.     // 1.定义消息转换器
  4.     Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
  5.     // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
  6.     jjmc.setCreateMessageIds(true);
  7.     return jjmc;
  8. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

络腮胡菲菲

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表