问题:
- 我们该如何确保MQ消息的可靠性?
- 假如真的发送失败,有没有别的的兜底方案?
可能出现的情况:
消息丢失的可能性:
发送消息的流程如图所示:
在消息发送的流程中从生产者到消耗者每一步都有可能导致消息丢失:
可能的情况:
- 生产者发送消息时丢失:
- **网络问题:**生产者发送消息时毗连MQ失败
- 生产者发送消息到达MQ后未找到**Exchange**
- 生产者发送消息到达MQ的**Exchange**后,未找到合适的**Queue**
- 消息到达MQ后,处理消息的历程发生非常
- MQ导致消息丢失:
- 消耗者处理消息时:
- 消耗者接收消息后还未处理,MQ突然宕机
- 消息接收后处理过程中抛出非常
要从三个方面保证MQ的可靠性:
- 确保生产者一定把消息发送到MQ
- 确保MQ不会将消息弄丢
- 确保消耗者一定要处理消息
解决流程与兜底方案
第一个方面:确保生产者一定把消息发送到MQ
确保生产者将消息发送到MQ有两种机制分别是:1.生产者重试机制、2.生产者确认机制
1.生产者重试机制
发生原因:
第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的毗连中断。
实现:
利用SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ断网后等情况下,毗连超时后,多次重试。
application.yaml文件配置
- spring:
- rabbitmq:
- connection-timeout: 1s # 设置MQ的连接超时时间
- template:
- retry:
- enabled: true # 开启超时重试机制
- initial-interval: 1000ms # 失败后的初始等待时间
- multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
- max-attempts: 3 # 最大重试次数
复制代码 :::info
留意事项:
在网络不稳固的情况下,利用重试机制可以有用提高消息发送的乐成率。但是SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
在项目中假如对于该业务性能有要求,发起禁用重试机制。假如一定要利用,请公道配置等待时长和重试次数,固然也可以思量利用异步线程来实验发送消息的代码。
:::
2.生产者确认机制
发生原因:
一般情况下生产者与MQ之间的网络没有问题,基本就不会出现消息丢失的问题,但在少数情况下也有可能出现消息发送到MQ之后消息丢失的问题。
少数情况:
- 在MQ的内部处理消息的历程出现了问题
- 生产者发送消息到达MQ之后没有找到对应的交换机(Exchange)
- 生产者发送消息到达MQ的交换机(Exchange)后,没有找到合适的Queue,因此无法举行路由
方案:
RabbitMQ的生产者消息确认机制包括有:Publisher Confirm和Publisher Return两种。开启消息确认机制情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
如图所示:
图解如下:
- 当消息投递到MQ,但是路由失败时,通过Publisher Return返回非常信息,同时返回ack的确认信息,代表投递乐成
- 暂时消息投递到了MQ,而且入队乐成,返回ACK,告知投递乐成
- 持久消息投递到了MQ,而且入队完成持久化,返回ACK ,告知投递乐成
- 别的情况都会返回NACK,告知投递失败
其中ack和nack属于Publisher Confirm机制,ack是投递乐成;nack是投递失败。而return则属于Publisher Return机制。
实现:
**在application.yaml**中添加配置,开启生产者确认:
- spring:
- rabbitmq:
- publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
- publisher-returns: true # 开启publisher return机制
复制代码 publisher-confirm-type有三种模式可选分别是:
- none:关闭confirm机制,
- 优点:性能高。
- 缺点:无法保证消息送达,可能丢失消息。
- simple:同步阻塞等待MQ的回执( 同步阻塞等待确认 ),会阻塞当前线程直到收到 confirm。
- correlated:MQ异步回调返回回执( 异步回调确认 )
- 优点**:** 性能更好,推荐利用
- 缺点**:** 实现略复杂,必要维护消息状态和回调处理
一般推荐利用:correlated机制,回调机制。
界说ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback,因此可以在配置类中统一设置。在publisher模块界说一个配置类:
- package com.publisher.config;
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.ReturnedMessage;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.context.annotation.Configuration;
- import javax.annotation.PostConstruct;
- @Slf4j
- @AllArgsConstructor
- @Configuration
- public class MqConfig {
- private final RabbitTemplate rabbitTemplate;
- @PostConstruct
- public void init(){
- rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
- @Override
- public void returnedMessage(ReturnedMessage returned) {
- log.error("触发return callback,");
- log.debug("exchange: {}", returned.getExchange());
- log.debug("routingKey: {}", returned.getRoutingKey());
- log.debug("message: {}", returned.getMessage());
- log.debug("replyCode: {}", returned.getReplyCode());
- log.debug("replyText: {}", returned.getReplyText());
- }
- });
- }
- }
复制代码 界说ConfirmCallback
由于每个消息发送时的处理逻辑不一定类似,因此ConfirmCallback必要在每次发消息时界说。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
CorrelationData中包罗两个核心的东西:
- id:消息的唯一标示,MQ对不同的消息的回执以此做判断,制止肴杂
- SettableListenableFuture:回执效果的Future对象
将来MQ的回执就会通过这个Future来返回,提前给CorrelationData中的Future添加回调函数来处理消息回执:
新建一个测试,向系统自带的交换机发送消息,而且添加ConfirmCallback:
- @Test
- void testPublisherConfirm() {
- // 1.创建CorrelationData
- CorrelationData cd = new CorrelationData();
- // 2.给Future添加ConfirmCallback
- cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
- @Override
- public void onFailure(Throwable ex) {
- // 2.1.Future发生异常时的处理逻辑,基本不会触发
- log.error("send message fail", ex);
- }
- @Override
- public void onSuccess(CorrelationData.Confirm result) {
- // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
- if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
- log.debug("发送消息成功,收到 ack!");
- }else{ // result.getReason(),String类型,返回nack时的异常描述
- log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
- }
- }
- });
- // 3.发送消息
- rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
- }
复制代码 实验效果如下:
可以看到,由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。
当修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。
而假如连交换机都是错误的,则只会收到nack。
:::info
留意事项:
- 开启生产者确认比力消耗MQ性能,一般不发起开启。
- 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
- 交换机名称错误:同样是编程错误导致
- MQ内部故障:这种必要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才必要开启,而且仅仅必要开启ConfirmCallback处理nack就可以了。
:::
第二个方面:确保MQ不会将消息丢失
可能出现的问题:
消息在生产者发送到达MQ之后,MQ不能及时举行保存就有可能会导致消息丢失。
数据持久化
为了提升性能,默认MQ的数据都是在内存存储的暂时数据,重启后就会消散。为了保证数据的可靠性,必须配置数据持久化
数据持久化方案有:
交换机持久化
控制台的Exchanges页面,添加交换机时配置交换机的Durability参数:
设置为Durable就是持久化模式,Transient就是暂时模式。
2.队列持久化
在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数:
3.消息持久化
控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个properties:
:::info
阐明:在开启持久化机制以后,假如同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。
不外出于性能思量,为了淘汰IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此发起生产者确认全部接纳异步方式。
:::
LazyQueue
可能出现的情况:
RabbitMQ默认将接收到的信息保存在内存中以低落消息收发的延迟。在某些特别情况下会导致消息积存:
- 消耗者宕机或出现网络故障
- 消息发送量激增,超过了消耗者处理速度
- 消耗者处理业务发生阻塞
出现消息堆积问题,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会泯灭一段时间,而且会阻塞队列历程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的全部哀求都会被阻塞。
配置Lazy Queue(惰性队列)作用:
- MQ接收到消息后将消息直接存入磁盘,不存入内存从而减小内存的占用,制止消息积存而导致的性能下降。
- 消耗者必要消耗消息时才会从磁盘中读取并加载到内存(懒加载)
- 支持数百万条消息的存储
控制台配置Lazy模式
添加队列的时候,添加x-queue-mod=lazy参数即可设置队列为Lazy模式:
SpringAMQP代码声明Lazy模式
SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:
- @Bean
- public Queue lazyQueue(){
- return QueueBuilder
- .durable("lazy.queue")
- .lazy() // 开启Lazy模式
- .build();
- }
复制代码 基于注解来声明队列并设置为Lazy模式:
- @RabbitListener(queuesToDeclare = @Queue(
- name = "lazy.queue",
- durable = "true",
- arguments = @Argument(name = "x-queue-mode", value = "lazy")
- ))
- public void listenLazyQueue(String msg){
- log.info("接收到 lazy.queue的消息:{}", msg);
- }
复制代码 更新已有队列为lazy模式
对于已经存在的队列,也可以配置为lazy模式,但是要通过设置policy实现。
可以基于下令行设置policy:
- rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
复制代码 下令解读:
- rabbitmqctl :RabbitMQ的下令行工具
- set_policy :添加一个策略
- Lazy :策略名称,可以自界说
- "^lazy-queue$" :用正则表达式匹配队列的名字
- '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
- --apply-to queues:策略的作用对象,是全部的队列
也可以在控制台配置policy,进入在控制台的Admin页面,点击Policies,即可添加配置:
第三方面.确保消耗者一定要处理消息
**可能出现的情况:**RabbitMQ发送消息给消耗者后,消息不一定被消耗者正确消耗可能出现的故障:
- 消息投递的过程中出现了网络故障
- 消耗者接收到消息后突然宕机
- 消耗者接收到消息后,因处理不妥导致非常
- …
这时候RabbitMQ就必要知道消耗者的处理状态,消息处理失败时重新投递消息。
消耗者确认机制(Consumer Acknowledgement)
当消耗者处理消息结束后,向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
- ack:乐成处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ必要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
:::info
留意:
reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们必要将消息处理的代码通过try catch机制捕获,消息处理乐成时返回ack,处理失败时返回nack.
:::
SpringAMQP实现:
配置文件:
- spring:
- rabbitmq:
- listener:
- simple:
- acknowledge-mode: none # 不做处理
复制代码 有三种处理模式:
- **none**:不处理。即消息投递给消耗者后立刻ack,消息会立刻从MQ删除。非常不安全,不发起利用
- **manual**:手动模式。必要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更机动
- **auto**:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了围绕加强,当业务正常实验时则自动返回ack. 当业务出现非常时,根据非常判断返回不同效果:
- 假如是业务非常,会自动返回nack,MQ重新发送消息;
- 假如是消息处理或校验非常,自动返回reject;
失败重试机制
问题:
当消耗者出现非常后,消息会不断requeue(重入队)到队列,再重新发送给消耗者。假如消耗者再次实验依然出错,消息会再次requeue到队列,再次投递,直到消息处理乐成为止。
非常情况就是消耗者一直无法实验乐成,那么消息requeue就会无限循环,导致mq的消息处理飙升,给CPU和内存带来较大的压力。
解决方案:
开启消耗者重试机制:在消耗者出现非常时利用当地重试,而不是无限制的requeue到mq队列。
application.yml文件,添加内容:
- spring:
- rabbitmq:
- listener:
- simple:
- retry:
- enabled: true # 开启消费者失败重试
- initial-interval: 1000ms # 初识的失败等待时长为1秒
- multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
- max-attempts: 3 # 最大重试次数
- stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
复制代码 :::info
留意事项:
- 开启当地重试时,消息处理过程中抛出非常,不会requeue到队列,而是在消耗者当地重试
- 重试到达最大次数后,Spring会返回reject,消息会被丢弃在某些对于消息可靠性要求较高的业务场景下,显然不太合适,必要结合失败处理策略举行处理。
:::
失败处理策略:
Spring允们自界说重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来界说的,有3个不同实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放非常消息的队列,后续由人工会集处理。
示例代码:
1.consumer服务中界说处理失败消息的交换机和队列
- public class ErrorMessageConfig {
- //定义处理失败消息的交换机
- @Bean
- public DirectExchange errorMessageExchange(){
- return new DirectExchange("error.direct");
- }
- //定义处理失败消息的队列
- @Bean
- public Queue errorQueue(){
- return new Queue("error.queue", true);
- }
- @Bean
- public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
- return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
- }
- //关联队列和交换机
- @Bean
- public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
- return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
- }
- }
复制代码 当出现失败消息时可以在控制台查看并在后续举行处理:
业务幂等性
幂等性指的是:指同一个业务,实验一次或多次对业务状态的影响是一致的。
但数据的更新往往不是幂等的,假如重复实验可能造成不一样的后果。比如:
- 取消订单,恢复库存的业务。假如多次恢复就会出现库存重复增加的情况
- 退款业务。重复退款对商家而言会有经济损失。
可能出现重复实验的情况:
- 页面卡顿时频繁刷新导致表单重复提交
- 服务间调用的重试
- MQ消息的重复投递
唯一消息id:
- 每一条消息都天生一个唯一的id,与消息一起投递给消耗者。
- 消耗者接收到消息后处理自己的业务,业务处理乐成后将消息ID保存到数据库
- 假如下次又收到类似消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
实现:
SpringAMQP的MessageConverter自带了MessageID的功能
以Jackson的消息转换器为例:
- @Bean
- public MessageConverter messageConverter(){
- // 1.定义消息转换器
- Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
- // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
- jjmc.setCreateMessageIds(true);
- return jjmc;
- }
复制代码 业务判断:
业务判断就是基于业务自己的逻辑或状态来判断是否是重复的哀求或消息,不同的业务场景判断的思路也不一样。
付出修改订单的业务为例,markOrderPaySuccess方法:
- @Override
- public void markOrderPaySuccess(Long orderId) {
- // 1.查询订单
- Order old = getById(orderId);
- // 2.判断订单状态
- if (old == null || old.getStatus() != 1) {
- // 订单不存在或者订单状态不是1,放弃处理
- return;
- }
- // 3.尝试更新订单
- Order order = new Order();
- order.setId(orderId);
- order.setStatus(2);
- order.setPayTime(LocalDateTime.now());
- updateById(order);
- }
复制代码 优化合并:
- @Override
- public void markOrderPaySuccess(Long orderId) {
- lambdaUpdate()
- .set(Order::getStatus, 2)
- .set(Order::getPayTime, LocalDateTime.now())
- .eq(Order::getId, orderId)
- .eq(Order::getStatus, 1)
- .update();
- //相当于这样的SQL语句
- // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
- }
复制代码 兜底方案
**最终实现目的:**只管MQ通知失败也要保证订单的付出状态一致。
实现:
通过自动查询保证订单的一致性
流程:
色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己自动去查询付出状态。
利用定时任务定期查询,比方每隔20秒就查询一次,并判断付出状态。假如发现订单已经付出,则立刻更新订单状态为已付出即可。
总结:
保证MQ消息的可靠性:
保证MQ消息的可靠性,接纳了生产者确认机制、消耗者确认、消耗者失败重试等策略,确保消息投递的可靠性
兜底方案:
设置了定时任务,这样即便MQ通知失败,还可以利用定时任务作为兜底方案。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |