23
RabbitMQ消息发送过程中可能会出现消息丢失,Case1:我们是基于网络发送,可能网络出现故障,导致消息未投递到MQ,出现用户已经付出了订单,却显示订单未完成的情况,给用户造成很不好的体验。Case2:付出服务成功发送消息给MQ,而MQ此时出现宕机,导致交易服务无法成功监听。
一、发送者可靠性
1.1 发送者重连
有的时候由于网络颠簸,可能会出现发送者连接MQ失败的情况。发送者重连默认是关闭。
通过设置实现重连
- spring:
- rabbitmq:
- connection-timeout: 1s # 设置MQ的连接超时时间
- template:
- retry:
- enabled: true # 开启超时重传
- initial-interval: 1000ms # 失败后的初始等待时间
- multiplier: 1 # 失败后下次的等待时长倍数
- max-attempts: 3 # 最大重试次数
复制代码 网络颠簸出现的重连需设置等待时间,若立刻重连,大概率照旧重连失败。
1.2 发送者确认ACK
SpringAMQP实现发送者确认:
1.在publisher这个微服务的application.yaml种添加设置
- spring:
- rabbitmq:
- publisher-confirm-type: correlated
- publisher-returns: true
复制代码 publisher-confirm-type有三种模式可选:
· none:关闭confirm机制
· simple:同步壅闭等待MQ的绘制信息
· correlated: MQ异步回调方式返回回执消息
2.每个RabbitTemplate只需设置一个ReturnCallback,因此必要在项目启动过程中设置:
- @Slf4j
- @AllArgsConstructor
- @Configuration
- public class MqConig{
- private final RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void init(){
- rabbitTemplate.setReturnsCallBack(new RabbitTemplate.ReturnsCallback(){
- @Override
- public void returnedMessage(ReturnedMessage returned){
- log.error("触发return callback");
- log.debug("exchange:{}",returned.getExchange());
- log.debug("routingKey:{}",returned.getRoutingKey());
- log.debug("message: {}", returned.getMessage());
- log.debug("replyCode: {}", returned.getReplyCode());
- log.debug("replyText: {}", returned.getReplyText());
- }
- });
-
- }
- }
复制代码 3.发送消息,指定消息ID、消息ConfirmCallback,ConfirmCallBack需每次发消息时指定,指定ID对象。
- @Test
- public void testconfirmCallback(){
- // 0.创建correlationData
- CorrelationData cd = new CorrelationData(UUID.randomUUID().toString);
- cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>(){
- @Override
- public void onFailure(Throwable ex){
- log.error("spring amqp 处理确认结果异常",ex);_
- }
- @Override
- public void onSuccess(CorrelationData.confirm result){
- //判断是否成功
- if(result.isAck()){
- log.debug("收到ConfirmCallback ack,消息发送成功!");
- }else{
- log.error("收到ConfirmCallback nack,消息发送失败 reason:{}",result.getReasso n());
- //理论上应该做消息重发这里不做处理
- }
- }
- String exchangeName ="shangpin.direct";
- String message = "zzydemo";
- rabbitTemplate.convertAndSend(exchangeName,"zzy",message,cd);
- //这里与之前比多了以一个cd
- Thread.sleep(2000);
- //给其充足的时间去响应回调。
- })
- }
复制代码 如果routingkey写错也会返回ack,同时通过returncallback返回失败的原因。
发送者确认必要额外的网络和系统资源开销,非必要,尽量不要使用 !对于nack信息可以进行有限次重试。
发送方把消息放到MQ就一定安全吗?
二、MQ可靠性
在默认情况下,RabbitMQ会将吸收到的信息保存在内存中以降低信息收发的延迟。如许会导致两个标题:
1) 一旦MQ宕机,内存中的信息会丢失。
2) 内存空间有限,当斲丧者故障处置惩罚过慢时,会导致信息积压引发MQ壅闭。此时MQ把先进入队列的消息写入磁盘,但写磁盘在数据量大的情况下比力耗时较长,写时间内,发送方无法发送数据,需重发数据,而这会降低MQ性能,而且依然存在MQ壅闭标题。
解决方案:
2.1 数据持久化
RabbitMQ实现数据持久化包括3各方面:
1) 交换机持久化 控制台设置时默认为durable
2) 队列持久化 控制台设置时默认为durable
3)消息持久化 publish message的Delivery-mode
持久化的信息哪怕重启了依然在,推荐各人如许做。
Tips:convertAndSend会把String转化成Message对象,需自界说构建消息。
2.2 LazyQueue
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列。
惰性队列吸收到消息后直接存入磁盘,不再存储到内存
斲丧者要斲丧消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)
开启方式:
显然LazyQueue的高并发本事比数据持久化高。
2.3 小结
下面 我们来综合分析一下不接纳数据持久化、接纳持久化、使用LazyQueue这三种情况
不接纳数据持久化:
当memory存储达到上限时会通过Paged out写入磁盘,在写入磁盘的那一块刹时降低到0,使MQ出现壅闭状态,性能急剧降落。出现波浪线效果。
接纳数据持久化:
每次memory里有多少,持久化persistent有多少,内存里每发一条消息就持久化一次,不会使得MQ壅闭、出现消息丢失的情况。
MQ中的消息不会出现在内存和持久化,而是直接Paged out,在内存中。
开启持久化和发送者确认时, RabbitMQ只有在消息持久化完成后才会给发送者返回ACK回执
三、斲丧者可靠性
3.1 斲丧者确认机制
SpringAMQP已经实现了消息确认功能。并答应我们通过设置文件选择ACK处置惩罚方式,有三种方式:
3.2 失败重传机制
为了解决开启斲丧者确认后,斲丧者异常无限重试导致资源耗尽、扬弃消息的标题。
3.3 业务幂等性
以余额付出案例为例,用户下单时,MQ通知交易服务成功,MQ标记订单为已付出,在返回ack时出现网络颠簸,导致MQ无法收到ack,MQ准备消息入队重发;可此时,用户准备取消订单,取消订单业务由交易服务处置惩罚,无需调用其他微服务。此时网络恢复正常,MQ准备重发消息给交易服务,让交易服务标记订单为已付出,覆盖了之前用户的取消订单信息。出现了消息错误。
解决方法一:
给每个消息都设置一个唯一id,利用id区分是否是重复消息:
1.每一条消息都生成一个唯一的id,与消息一起投递给斲丧者。
2.斲丧者担当到消息后处置惩罚本身的业务,业务处置惩罚成功后将消息ID保存到数据库。
3.如果下次又收到类似消息,去数据库查询判断是否存在,重复消息则放弃处置惩罚。
然而,方法一存在业务入侵,把消息ID保存到数据库,以及去数据库查询判断是否存在,本身和业务无关。
解决方法二:优化业务逻辑
3.4 小结
Q1:怎样保证付出服务与交易服务之间的订单同等性?
首先,付出服务会正在用户付出成功以后利用MQ消息通知交易服务,完成订单状态同步。
其次,为了保证MQ消息的可靠性,我们接纳了生产者确认、斲丧者确认、斲丧者失败重试机制等策略,确保消息投递和处置惩罚的可靠性;同时开启了MQ的持久化,避免因服务宕机导致消息丢失。
最后,我们还在交易服务更新订单状态时做了等幂判断,避免因消息重复斲丧导致订单状态异常。
Q2:如果交易服务消息处置惩罚失败,有没有什么兜底方案?
我们可以在交易服务设置定时任务,定期查询订单付出状态。如许即便MQ通知失败,还可以利用定时任务当做保底方案,确保最终订单状态的同等性。从而引入了延迟消息。
四、延迟消息
在电商的付出业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。比方电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。
但是如许就存在一个标题,如果用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户长处受损!
因此,电商中通常的做法就是:对于超过一定时间未付出的订单,应该立刻取消订单并释放占用的库存。
4.1 死信交换机
死信交换机有什么作用呢?
收集那些因处置惩罚失败而被拒绝的消息。
收集那些因队列满了而被拒绝的消息。
收集TTL过期的消息。
4.2 DelayExchange插件
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |