RabbitMQ-如何保证消息不丢失

打印 上一主题 下一主题

主题 665|帖子 665|积分 1995

RabbitMQ常用于 异步发送,mysql,redis,es之间的数据同步 ,分布式事务,削峰填谷等.....
在微服务中,rabbitmq是我们经常用到的消息中间件。它能够异步的在各个业务之中进行消息的接受和发送,那么如何保证rabbitmq的消息不丢失就显得尤为重要。
起首要分析题目,我们就要明确rabbitmq在什么时间可能会出现消息丢失的情况呢?
我们直接说结果
RabbitMQ在每个阶段都有可能使消息发生丢失

我们在这里把他们简单归结为三个层面
层面一 :生产者发送消息没有到达交换机大概没有到达绑定的队列。
层面二:RabbitMQ宕机可能导致的消息的丢失。
层面三:消耗者宕机导致消息丢失。


层面一的办理方法常见的是
1.生产者确认机制
RabbitMQ提供了publisher confirm机制来制止消息发送到Mq的过程中丢失,消息发送到Mq以后,会返回一个结果给发送者,表现消息的发送乐成。
情况一:发送乐成 生产者正常发送消息到队列之后会返回一个publish-confirm ack 这个意思是告诉生产者已经吸收到消息了。
情况二:发送失败 这里的发送失败有两种,一种是生产者发送到交换机失败 此时返回 publish-confirm nack  。第二种是生产者发送到队列失败 返回 publish-return ack。

开启生产者确认机制的代码如下 ,在生产者的配置文件中参加以下配置
 
  1. spring:
  2.   rabbitmq:
  3.     publisher-confirm-type: correlated #开启生产者确认机制
  4.     publisher-returns: true
复制代码
这里的
  1. publisher-confirm-type:有三种模式可以选择:
复制代码
第一种是none:代表关闭confirm机制
第二种是 simple:表现同步壅闭并等候mq的回执消息,即发送完消息后不能干其他的事情,只能等候mq的回执,很显然这样服从很低。
第三种是correlated:MQ异步回调方式返回回执消息,即生产者发送完消息后可以干其他的事情,直到吸收到mq的回执。很显着这种服从要优于第二种。
配置return callback的代码如下,每个RabbitTemplate只能配置一个 代码如下
 
  1. package com.itheima.publisher.com.it.heima.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.ReturnedMessage;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.BeansException;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.context.ApplicationContext;
  8. import org.springframework.context.ApplicationContextAware;
  9. import org.springframework.context.annotation.Configuration;
  10. /**
  11. * @Auther: QuJingChuan
  12. * @Date: 2024/1/13 10:34
  13. * @Description:
  14. */
  15. @Slf4j
  16. @Configuration
  17. public class MqConfirmConfig implements ApplicationContextAware {
  18.     @Autowired
  19.     private RabbitTemplate rabbitTemplate;
  20.     @Override
  21.     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  22.         //配置回调
  23.         rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
  24.             @Override
  25.             public void returnedMessage(ReturnedMessage returnedMessage) {
  26.                 log.debug("收到消息return的callback,  {},{},{},{},{}",
  27.                         returnedMessage.getExchange(),
  28.                         returnedMessage.getRoutingKey(),
  29.                         returnedMessage.getMessage(),
  30.                         returnedMessage.getReplyCode(),
  31.                         returnedMessage.getReplyText());
  32.             }
  33.         });
  34.     }
  35. }
复制代码

Confirm Callback必要每次发消息的时间都要配置(要订定发消息的id方便回执的时间直到是谁发的消息)这里写一个测试类方便各人看。
  1. @Test
  2.     void testConfirmCallback() throws InterruptedException {
  3.         //创建cd 参数为每次发送消息的id
  4.         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
  5.         //添加confirmCallBack
  6.         correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
  7.             @Override
  8.             public void onFailure(Throwable ex) {
  9.                 //这种情况一般是运行出现bug,一般不会发生。
  10.                 log.error("消息回调失败",ex);
  11.             }
  12.             @Override
  13.             public void onSuccess(CorrelationData.Confirm result) {
  14.                 log.debug("收到confirm callback 回执");
  15.                 if (result.isAck()){
  16.                     //消息发送成功
  17.                     log.debug("消息发送成功收到ack");
  18.                 }else {
  19.                     //消息发送失败
  20.                     log.debug("消息发送失败收到nack,原因:{}",result.getReason());
  21.                     //TODO 重发消息等业务
  22.                 }
  23.             }
  24.         });
  25.         rabbitTemplate.convertAndSend("amqp.test","amqptest","hello qjc",correlationData);
  26.         Thread.sleep(2000);
  27.     }
复制代码

那么我们如何办理这个题目呢
方案一:重发消息 
方案二:记录日志
方案三:保存到数据库中定时发送,发送乐成后删除表中的数据。
方案四:交给人工处置处罚。
~生产者确认机制必要额外的网络和系统的资源开销,尽量不要使用。
~假如业务必要,那么无需开启publisher-return机制,因为一般路由失败都是自己业务的原因。
~对于nack消息可以有限次数的重试,依然失败则记录异常消息。

层面二的办理方法常见的是
2.消息持久化
由于mq是基于内存存储消息的,那么在mq服务宕机等一些情况下可能导致消息的丢失。同时内存空间有限,当消耗者出现故障大概处置处罚过慢,会导致消息积存,mq会对消息做迁移(page out 写入磁盘)从而引发mq壅闭。我们将消息存储在磁盘上就制止了这个题目。
一 :持久化交换机。

这里要选择Durable,因为Transient是临时交换机,当mq宕机后会消失。

代码展示
 
  1. @Bean
  2.     public DirectExchange simpleExchange(){
  3.         //分别是三个参数 交换机名称 是否持久化 当没有队列绑定时是否自动删除
  4.         return new DirectExchange("qjc.exchange",true,false);
  5.     }
复制代码
二 :持久化队列。

这个与交换机类似,在此不做赘述。

代码展示
  1. @Bean
  2.     public Queue simpleQueue(){
  3.         //springamqp在使用QueueBuilder来创建队列的时候,默认就是持久化的
  4.         return QueueBuilder.durable("qjc.queue").build();
  5.     }
复制代码
三 :持久化消息。

这里选择delivery mode 选择2 ,1是不持久的。

代码展示
  1. Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8))
  2.                 .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
  3.                 .build();
复制代码
假如不选择持久化队列,交换机,消息的话我们尚有另一种方案

Lazy Queue(惰性队列)
惰性队列的特征如下
~接受到消息的时间直接存入磁盘而非内存(内存中只保存最近的消息)
~消耗者必要消息的时间才会从磁盘中取出数据加载到内存
~支持数百万条的消息存储
在mq3.12版本后,全部的队列都是Lazy Queue模式,无法更改。
假如各位小伙伴的版本低于3.12那我这里提供了两种方式创建惰性队列



或用注解声明
  1.     @RabbitListener(queuesToDeclare = @Queue(
  2.             name = "lazy.queue",
  3.             durable = "true",
  4.             arguments = @Argument(name = "x-queue-mode",value = "lazy")
  5.     ))
  6.     public void listenLazyQueue(String msg){
  7.         log.debug("接收到lazyqueue的消息" + msg);
  8.     }
复制代码

3.消耗者确认机制
RabbitMQ支持消耗者确认机制,即:当消耗者处置处罚消息后可以向mq发送ack回执,mq收到消息后会在队列中删除该消息。

SpringAMQP已经实现了消息确认的功能,而且允许我们通过配置文件选择ack的处置处罚方式,有三种方式。
- none: 不处置处罚。即消息投递给消耗者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用  
- manual: 手动模式。必要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活  
- auto: 主动模式。SpringAMQP使用AOP对我们的消息处置处罚逻辑做了环绕增强,当业务正常执行时则主动返回ack.  
当业务出现异常时,根据异常判断返回差别结果:  
- 假如是业务异常,会主动返回nack  
- 假如是消息处置处罚或校验异常,主动返回reject
注意我们必要再消耗者的配置文件中参加参数

这就是mq保证消息不丢失的一些方式和办理方案。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

干翻全岛蛙蛙

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表