RabbitMQ----进阶篇

打印 上一主题 下一主题

主题 580|帖子 580|积分 1740

23
RabbitMQ消息发送过程中可能会出现消息丢失,Case1:我们是基于网络发送,可能网络出现故障,导致消息未投递到MQ,出现用户已经付出了订单,却显示订单未完成的情况,给用户造成很不好的体验。Case2:付出服务成功发送消息给MQ,而MQ此时出现宕机,导致交易服务无法成功监听。

一、发送者可靠性

1.1 发送者重连

有的时候由于网络颠簸,可能会出现发送者连接MQ失败的情况。发送者重连默认是关闭。
通过设置实现重连
  1. spring:
  2.    rabbitmq:
  3.       connection-timeout: 1s # 设置MQ的连接超时时间
  4.       template:
  5.          retry:
  6.             enabled: true # 开启超时重传
  7.             initial-interval: 1000ms # 失败后的初始等待时间
  8.             multiplier: 1  # 失败后下次的等待时长倍数
  9.             max-attempts: 3    # 最大重试次数
复制代码
网络颠簸出现的重连需设置等待时间,若立刻重连,大概率照旧重连失败。
1.2 发送者确认ACK



  SpringAMQP实现发送者确认:
1.在publisher这个微服务的application.yaml种添加设置
  1. spring:
  2.    rabbitmq:
  3.      publisher-confirm-type: correlated
  4.      publisher-returns: true
复制代码
publisher-confirm-type有三种模式可选:
· none:关闭confirm机制
· simple:同步壅闭等待MQ的绘制信息
· correlated: MQ异步回调方式返回回执消息
2.每个RabbitTemplate只需设置一个ReturnCallback,因此必要在项目启动过程中设置:
  1. @Slf4j
  2. @AllArgsConstructor
  3. @Configuration
  4. public class MqConig{
  5.   private final RabbitTemplate rabbitTemplate;
  6.    
  7.   @PostConstruct
  8.   public void init(){
  9.        rabbitTemplate.setReturnsCallBack(new RabbitTemplate.ReturnsCallback(){
  10.          @Override
  11.          public void returnedMessage(ReturnedMessage returned){
  12.              log.error("触发return callback");
  13.              log.debug("exchange:{}",returned.getExchange());
  14.              log.debug("routingKey:{}",returned.getRoutingKey());
  15.              log.debug("message: {}", returned.getMessage());
  16.              log.debug("replyCode: {}", returned.getReplyCode());
  17.              log.debug("replyText: {}", returned.getReplyText());
  18.          }
  19.     });
  20.   
  21.   }
  22. }
复制代码
3.发送消息,指定消息ID、消息ConfirmCallback,ConfirmCallBack需每次发消息时指定,指定ID对象。
  1. @Test
  2. public void testconfirmCallback(){
  3. // 0.创建correlationData
  4.     CorrelationData cd = new CorrelationData(UUID.randomUUID().toString);
  5.     cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>(){
  6.         @Override
  7.         public void onFailure(Throwable ex){
  8.            log.error("spring amqp 处理确认结果异常",ex);_
  9.         }
  10.         @Override
  11.         public void onSuccess(CorrelationData.confirm result){
  12.         //判断是否成功
  13.            if(result.isAck()){
  14.              log.debug("收到ConfirmCallback ack,消息发送成功!");
  15.            }else{
  16.              log.error("收到ConfirmCallback nack,消息发送失败 reason:{}",result.getReasso n());
  17.               //理论上应该做消息重发这里不做处理
  18.            }
  19.         }
  20.      String exchangeName ="shangpin.direct";
  21.      String message = "zzydemo";
  22.      rabbitTemplate.convertAndSend(exchangeName,"zzy",message,cd);
  23.      //这里与之前比多了以一个cd
  24.       Thread.sleep(2000);
  25. //给其充足的时间去响应回调。
  26.     })
  27. }
复制代码
如果routingkey写错也会返回ack,同时通过returncallback返回失败的原因。
发送者确认必要额外的网络和系统资源开销,非必要,尽量不要使用 !对于nack信息可以进行有限次重试。
发送方把消息放到MQ就一定安全吗?
二、MQ可靠性

在默认情况下,RabbitMQ会将吸收到的信息保存在内存中以降低信息收发的延迟。如许会导致两个标题:
1) 一旦MQ宕机,内存中的信息会丢失。
2) 内存空间有限,当斲丧者故障处置惩罚过慢时,会导致信息积压引发MQ壅闭。此时MQ把先进入队列的消息写入磁盘,但写磁盘在数据量大的情况下比力耗时较长,写时间内,发送方无法发送数据,需重发数据,而这会降低MQ性能,而且依然存在MQ壅闭标题。
解决方案:
2.1 数据持久化

RabbitMQ实现数据持久化包括3各方面:
  1) 交换机持久化  控制台设置时默认为durable
  2) 队列持久化   控制台设置时默认为durable
  3)消息持久化    publish message的Delivery-mode

持久化的信息哪怕重启了依然在,推荐各人如许做。
Tips:convertAndSend会把String转化成Message对象,需自界说构建消息。


2.2 LazyQueue

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。
惰性队列吸收到消息后直接存入磁盘,不再存储到内存
斲丧者要斲丧消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)
开启方式:


显然LazyQueue的高并发本事比数据持久化高。
2.3 小结

下面 我们来综合分析一下不接纳数据持久化、接纳持久化、使用LazyQueue这三种情况
不接纳数据持久化:

当memory存储达到上限时会通过Paged out写入磁盘,在写入磁盘的那一块刹时降低到0,使MQ出现壅闭状态,性能急剧降落。出现波浪线效果。
接纳数据持久化:

每次memory里有多少,持久化persistent有多少,内存里每发一条消息就持久化一次,不会使得MQ壅闭、出现消息丢失的情况。

MQ中的消息不会出现在内存和持久化,而是直接Paged out,在内存中。
开启持久化和发送者确认时, RabbitMQ只有在消息持久化完成后才会给发送者返回ACK回执
三、斲丧者可靠性

3.1 斲丧者确认机制


SpringAMQP已经实现了消息确认功能。并答应我们通过设置文件选择ACK处置惩罚方式,有三种方式:

3.2 失败重传机制

为了解决开启斲丧者确认后,斲丧者异常无限重试导致资源耗尽、扬弃消息的标题。


3.3 业务幂等性

以余额付出案例为例,用户下单时,MQ通知交易服务成功,MQ标记订单为已付出,在返回ack时出现网络颠簸,导致MQ无法收到ack,MQ准备消息入队重发;可此时,用户准备取消订单,取消订单业务由交易服务处置惩罚,无需调用其他微服务。此时网络恢复正常,MQ准备重发消息给交易服务,让交易服务标记订单为已付出,覆盖了之前用户的取消订单信息。出现了消息错误。

解决方法一:
给每个消息都设置一个唯一id,利用id区分是否是重复消息:
1.每一条消息都生成一个唯一的id,与消息一起投递给斲丧者。
2.斲丧者担当到消息后处置惩罚本身的业务,业务处置惩罚成功后将消息ID保存到数据库。
3.如果下次又收到类似消息,去数据库查询判断是否存在,重复消息则放弃处置惩罚。

然而,方法一存在业务入侵,把消息ID保存到数据库,以及去数据库查询判断是否存在,本身和业务无关。
解决方法二:优化业务逻辑

3.4 小结

Q1:怎样保证付出服务与交易服务之间的订单同等性?
首先,付出服务会正在用户付出成功以后利用MQ消息通知交易服务,完成订单状态同步。
其次,为了保证MQ消息的可靠性,我们接纳了生产者确认、斲丧者确认、斲丧者失败重试机制等策略,确保消息投递和处置惩罚的可靠性;同时开启了MQ的持久化,避免因服务宕机导致消息丢失。
最后,我们还在交易服务更新订单状态时做了等幂判断,避免因消息重复斲丧导致订单状态异常。
Q2:如果交易服务消息处置惩罚失败,有没有什么兜底方案?
我们可以在交易服务设置定时任务,定期查询订单付出状态。如许即便MQ通知失败,还可以利用定时任务当做保底方案,确保最终订单状态的同等性。从而引入了延迟消息。
四、延迟消息

在电商的付出业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。比方电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。
但是如许就存在一个标题,如果用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户长处受损!
因此,电商中通常的做法就是:对于超过一定时间未付出的订单,应该立刻取消订单并释放占用的库存
4.1 死信交换机


死信交换机有什么作用呢?
收集那些因处置惩罚失败而被拒绝的消息。
收集那些因队列满了而被拒绝的消息。
收集TTL过期的消息。
4.2 DelayExchange插件







免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

梦应逍遥

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表