1.使用rabbitmq可能存在的问题
在我们使用消息队列时,是否考虑过一个问题,如果在发送消息的时候存在网络波动,会引发哪些问题?
- 无法正确的发送和接收消息
- 重复多次的消耗同一条消息
举个例子,我们在购物的时候,已经付出完成,但是消息没有正确的被消耗,前端发送哀求查询付出状态时,肯定是查询生意业务服务状态,会发现业务订单未付出,而用户本身知道已经付出乐成,这就导致用户体验不一致。
因此,这里我们必须尽可能确保MQ消息的可靠性,即:消息应该至少被消耗者处理1次(这里为什么是至少一次在反面业务的幂等性判断会解说)
那么问题来了:
- 我们该如何确保MQ消息的可靠性?
- 如果真的发送失败,有没有其它的兜底方案?
2.包管MQ消息可靠性的三种方式
2.1.发送者可靠性
2.1.1. 生产者重试机制
通过在配置文件中添加干系配置打开重试机制
- spring:
- rabbitmq:
- connection-timeout: 1s # 设置MQ的连接超时时间
- template:
- retry:
- enabled: true # 开启超时重试机制
- initial-interval: 1000ms # 失败后的初始等待时间
- multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
- max-attempts: 3 # 最大重试次数
复制代码 注意:当网络不稳固的时候,使用重试机制可以有用提高消息发送的乐成率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试期待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,发起禁用重试机制。如果肯定要使用,请合理配置期待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
2.1.2. 生产者确认机制
我个人认为,生产者确认机制对性能影响较大,无特殊必要不要开启
一般情况下,只要生产者与MQ之间的网路毗连顺畅,根本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
不过,在少数情况下,也会出现消息发送到MQ之后丢失的征象,比如:
- MQ内部处理消息的历程发生了异常
- 生产者发送消息到达MQ后未找到Exchange
- 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由
针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回差别的回执。
- 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递乐成
- 临时消息投递到了MQ,并且入队乐成,返回ACK,告知投递乐成
- 长期消息投递到了MQ,并且入队完成长期化,返回ACK ,告知投递乐成
- 其它情况都会返回NACK,告知投递失败
此中ack和nack属于Publisher Confirm机制,ack是投递乐成;nack是投递失败。而return则属于Publisher Return机制。
默认两种机制都是关闭状态,必要通过配置文件来开启。
在生产者中添加配置
- spring:
- rabbitmq:
- publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
- publisher-returns: true # 开启publisher return机制
复制代码 这里publisher-confirm-type有三种模式可选:
- none:关闭confirm机制
- simple:同步阻塞期待MQ的回执
- correlated:MQ异步回调返回回执
添加之后必要去配置一个ReturnCallback,每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中同一设置。我们在publisher模块定义一个配置类(如果返回 nack则会调用这个方法:
- package com.itheima.publisher.config;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.ReturnedMessage;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.context.annotation.Configuration;
- import javax.annotation.PostConstruct;
- @Slf4j
- @Configuration
- @RequiredArgsConstructor
- public class MqConfig {
- private final RabbitTemplate rabbitTemplate;
- @PostConstruct
- public void init(){
- rabbitTemplate.setReturnsCallback(returnedMessage -> {
- log.error("监听到消息return callback");
- log.debug("exchange:{}", returnedMessage.getExchange());
- log.debug("routingKey:{}", returnedMessage.getRoutingKey());
- log.debug("message:{}", returnedMessage.getMessage());
- log.debug("replyCode:{}", returnedMessage.getReplyCode());
- log.debug("replyText:{}", returnedMessage.getReplyText());
- });
- }
- }
复制代码 ConfirmCallBack
由于每个消息发送时的处理逻辑不肯定相同,因此ConfirmCallback必要在每次发消息时定义。详细来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
这里的CorrelationData中包含两个核心的东西:
- id:消息的唯一标示,MQ对差别的消息的回执以此做判断,避免混淆,我们这里使用UUID
- SettableListenableFuture:回执效果的Future对象
- @Test
- public void testConfirmCallback() throws InterruptedException {
- // 0.创建correlationData
- CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
- cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
- @Override
- public void onFailure(Throwable ex) {
- log.error("spring amqp 处理确认结果异常", ex);
- }
- @Override
- public void onSuccess(CorrelationData.Confirm result) {
- // 判断是否成功
- if (result.isAck()){
- log.debug("收到ConfirmCallback ack,消息发送成功!");
- }else {
- log.error("收到ConfirmCallback nack,消息发送失败!reason:{}", result.getReason());
- // 重发根据自己情况定义
-
- }
- }
- });
- // 1.交换机名
- String exchangeName = "hmall.direct";
- // 2.消息
- String message = "test...";
- // 3.发送
- rabbitTemplate.convertAndSend(exchangeName, "blue", message, cd);
- Thread.sleep(2000);
- }
复制代码 2.2.MQ可靠性(数据长期化)
2.2.1. 交换机长期化
在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数:
设置为Durable就是长期化模式,Transient就是临时模式。
2.2.2. 队列长期化
在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:
2.3.消耗者可靠性
2.3.1. 消耗者确认机制
为了确认消耗者是否乐成处理消息,RabbitMQ提供了消耗者确认机制(Consumer Acknowledgement)。即:当消耗者处理消息竣事后,应该向RabbitMQ发送一个回执,告知RabbitMQ本身消息处理状态。回执有三种可选值:
- ack:乐成处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ必要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
一般reject方式用的较少,除非是消息格式有问题,那就是开辟问题了。因此大多数情况下我们必要将消息处理的代码通过try catch机制捕获,消息处理乐成时返回ack,处理失败时返回nack.
由于消息回执的处理代码比较同一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
- none:不处理。即消息投递给消耗者后立刻ack,消息会立刻从MQ删除。非常不安全,不发起使用
- manual:手动模式。必要本身在业务代码中调用api,发送ack或reject,存在业务入侵,但更机动
- auto:主动模式。SpringAMQP使用AOP对我们的消息处理逻辑做了围绕增强,当业务正常执行时则主动返回ack. 当业务出现异常时,根据异常判断返回差别效果:
- 如果是业务异常,会主动返回nack;
- 如果是消息处理或校验异常,主动返回reject;
通过下面的配置可以修改SpringAMQP的ACK处理方式:
- spring:
- rabbitmq:
- listener:
- simple:
- acknowledge-mode: none # 不做处理
复制代码 修改consumer服务的SpringRabbitListener类中的方法,模仿一个消息处理的异常:
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "wang.queue3", durable = "true"),
- exchange = @Exchange(name = "wang.fanout", type = ExchangeTypes.FANOUT)
- ))
- public void listenWangQueue2(String message){
- log.info("spring 消费者接收到消息:【" + message + "】");
- if (true) {
- throw new MessageConversionException("故意的");
- }
- log.info("消息处理完成");
- }
复制代码
测试可以发现:当消息处理发生异常时,消息依然被RabbitMQ删除了。
我们再次把确认机制修改为auto:
- spring:
- rabbitmq:
- listener:
- simple:
- acknowledge-mode: auto # 自动ack
复制代码 在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态):
放行以后,由于抛出的是消息转换异常,因此Spring会主动返回reject,以是消息依然会被删除:
我们将异常改为RuntimeException类型:
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "wang.queue3", durable = "true"),
- exchange = @Exchange(name = "wang.fanout", type = ExchangeTypes.FANOUT)
- ))
- public void listenWangQueue2(String message){
- log.info("spring 消费者接收到消息:【" + message + "】");
- if (true) {
- throw new RuntimeException("故意的");
- }
- log.info("消息处理完成");
- }
复制代码 在异常位置打断点,然后再次发送消息测试,程序卡在断点时,可以发现此时消息状态为unacked(未确定状态)
放行以后,由于抛出的是业务异常,以是Spring返回nack,最终消息恢复至Ready状态,并且没有被RabbitMQ删除:
当我们把配置改为auto时,消息处理失败后,会回到RabbitMQ,并重新投递到消耗者。
2.3.2. 消耗者重连机制
当消耗者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消耗者。如果消耗者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理乐成为止。
极端情况就是消耗者不停无法执行乐成,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力:
当然,上述极端情况发生的概率还黑白常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消耗者失败重试机制:在消耗者出现异常时使用本地重试,而不是无限定的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
- spring:
- rabbitmq:
- listener:
- simple:
- retry:
- enabled: true # 开启消费者失败重试
- initial-interval: 1000ms # 初识的失败等待时长为1秒
- multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
- max-attempts: 3 # 最大重试次数
- stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
复制代码 重启consumer服务,重复之前的测试。可以发现:
- 消耗者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
- 本地重试3次以后,抛出了AmqpRejectAndDontRequeueException异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject
结论:
- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消耗者本地重试
- 重试到达最大次数后,Spring会返回reject,消息会被抛弃
2.4.失败处理计谋
在之前的测试中,本地测试到达最大重试次数后,消息会被抛弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理计谋,这个计谋是由MessageRecovery接口来定义的,它有3个差别实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,抛弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
我个人认为,比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
1)在consumer服务中定义处理失败消息的交换机和队列
- @Bean
- public DirectExchange errorMessageExchange(){
- return new DirectExchange("error.direct");
- }
- @Bean
- public Queue errorQueue(){
- return new Queue("error.queue", true);
- }
- @Bean
- public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
- return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
- }
复制代码 2)定义一个RepublishMessageRecoverer,关联队列和交换机
- @Bean
- public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
- return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
- }
复制代码 完整代码如下:
- package com.itheima.consumer.config;
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.retry.MessageRecoverer;
- import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
- import org.springframework.context.annotation.Bean;
- @Configuration
- @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
- public class ErrorMessageConfig {
- @Bean
- public DirectExchange errorMessageExchange(){
- return new DirectExchange("error.direct");
- }
- @Bean
- public Queue errorQueue(){
- return new Queue("error.queue", true);
- }
- @Bean
- public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
- return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
- }
- @Bean
- public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
- return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
- }
- }
复制代码 2.5.业务幂等性判断
何为幂等性?
幂等是一个数学概念,用函数表达式来形貌是这样的:f(x) = f(f(x)),例如求绝对值函数。
在程序开辟中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:
但数据的更新通常不是幂等的,如果重复执行可能造成不一样的后果。比如:
- 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
- 退款业务。重复退款对商家而言会有经济损失。
以是,我们要尽可能避免业务被重复执行。
然而在现实业务场景中,由于意外经常会出现业务被重复执行的情况,例如:
- 页面卡顿时频仍刷新导致表单重复提交
- 服务间调用的重试
- MQ消息的重复投递
我们在用户付出乐成后会发送MQ消息到生意业务服务,修改订单状态为已付出,就可能出现消息重复投递的情况。如果消耗者不做判断,很有可能导致消息被消耗多次,出现业务故障。
举例:
- 如果用户刚刚付出完成,并且投递消息到生意业务服务,生意业务服务更改订单为已付出状态。
- 由于某种缘故起因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给生意业务服务。
- 但是,在新投递的消息被消耗之前,用户选择了退款,将订单状态改为了已退款状态。
- 退款完成后,新投递的消息才被消耗,那么订单状态会被再次改为已付出。业务异常。
因此,我们必须想办法包管消息处理的幂等性。这里给出两种方案:
这里提供唯一消息ID的方法:
这个思路非常简单:
- 每一条消息都生成一个唯一的id,与消息一起投递给消耗者。
- 消耗者接收到消息后处理本身的业务,业务处理乐成后将消息ID保存到数据库
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
我们该如何给消息添加唯一ID呢?
其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:
- @Bean
- public MessageConverter messageConverter(){
- // 1.定义消息转换器
- Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
- // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
- jjmc.setCreateMessageIds(true);
- return jjmc;
- }
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |