f 数据仓库与分析-RabbitMQ-高级 - Powered by qidao123.com技术社区

RabbitMQ-高级

宁睿  论坛元老 | 2025-5-20 11:57:47 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 2150|帖子 2150|积分 6460

RabbitMQ-高级


  
前言:消息可靠性问题

如图,在实现业务中可能会出现以下几个问题:


  • 支付服务向MQ发送消息时网络故障,导致消息消息丢失
  • MQ还没有来得及发送至好易服务,自己宕机导致消息丢失
  • 交易服务执行时抛出异常或者宕机
这三个任意一部门出现问题都会导致业务的执行失败,因此我们来一起学习MQ怎样保证消息可靠性。
1.生产者可靠性

1.生产者重连

有的时候由于网络颠簸,可能会出现客户端毗连MQ失败的情况。通过设置我们可以开启毗连失败后的重连机制
  1. spring:
  2.   rabbitmq:
  3.     connection-timeout: ls # 设置MQ的连接超时时间
  4.     template:
  5.         retry:
  6.         enabled: true # 开启超时重试机制
  7.         initial-interval: 1000ms # 失败后的初始等待时间
  8.         multiplier: l # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
  9.         max-atempts: 3 # 最大重试次数
复制代码
留意(客户端毗连的重试,不是消息发送的重试)
当网络不稳固的时候,利用重试机制可以有用提高消息发送的乐成率。不过SpringAMQ提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当火线程是被阻塞的,会影响业务性能
如果对于业务性能有要求,建议禁用重试机制。如果一定要利用,请合理设置等待时长和重试次数,当然也可以考虑利用异步线程来执行发送消息的代码。
2.生产者确认机制

RabbitMq实现了Publisher ConfirmPublisher Return两种确认机制。开启确认机制后,在MQ乐成收到消息后会返回确认消息给生产者。返回的效果有以下几种:


  • 消息投递到了MQ,但是路由失败(一样平常会是代码或者路由设置的问题)。此时会通过Publisher Return返回路由异常原因,然后返回ACK,告知投递乐成。
  • 临时消息投递到了MQ,并且乐成进入队列,返回ACK,告知投递乐成
  • 长期消息投递到了MQ,并且乐成进入队列和长期化,返回ACK,告知投递乐成
  • 其他情况都会返回NACK,告知投递失败

3.生产者代码实现原理

利用SpringAMQP实现生产者确认

  • 在publisher这个微服务的application.yml中添加设置:
    1. spring:
    2.     rabbitmq:
    3.     publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    4.     publisher-returns: true # 开启publisher return机制
    5. #配置说明:
    6. #这里publisher-confirm-type有三种模式可选:
    7.   #none:关闭confirm机制
    8.   #simple:同步阻塞等待MQ的回执消息
    9.   #correlated:MQ异步回调方式返回回执消息
    复制代码
  • 每个RabbitTemplate只能设置一个ReturnCallback,因此需要在项目启动过程中设置:
  1. @S1f4j
  2. @Configuration
  3. public class CommonConfig implements ApplicationContextAware {
  4. @Override
  5. public void setApplicationContext(ApplicationContext applicationContext)
  6.     throws BeansException
  7. {
  8.     // 获取RabbitTemplate
  9.     RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
  10.     // 设置ReturnCallback
  11.     rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
  12.             @Override
  13.             public void returnedMessage(ReturnedMessage returnedMessage) {
  14.                 log.info("收到消息的ReturnBack: exchange:{}, route:{}, replyCode:{}, replyText:{}",
  15.                         returnedMessage.getExchange(),
  16.                         returnedMessage.getRoutingKey(),
  17.                         returnedMessage.getReplyCode(),
  18.                         returnedMessage.getReplyText());
  19.             }
  20.         });
  21. }}
复制代码
关于ApplicationContextAware :


  • 界说
    ApplicationContextAware 是 Spring 框架提供的一个接口,允许 Bean 感知并获取 Spring 容器的引用(即 ApplicationContext)。当一个类实现该接口后,Spring 会在初始化该 Bean 时自动调用 setApplicationContext() 方法,并传入当前的 ApplicationContext 对象。
  • 作用
    获取 Spring 容器(ApplicationContext),用于动态管理 Bean、读取设置、发布事件等。
    在非依赖注入场景下访问 Spring 功能,例如在工具类或设置类中手动获取 Bean。
  • 利用场景
    动态获取 Bean(如运行时根据条件加载不同的组件)。
    设置全局回调(如示例中的 RabbitTemplate 设置 ReturnCallback)。
    框架扩展(如自界说 Starter 需要与 Spring 容器交互)
​ 3.发送消息,指定消息Id,消息ConfirmCallBack(每次发消息时指定)
  1. @Test
  2. void testPublisherConfirm() throws InterruptedException {
  3.     // 1. 创建CorrelationData
  4.     CorrelationData cd = new CorrelationData();
  5.     // 2. 给Future添加ConfirmCallback
  6.     cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
  7.         @Override
  8.         public void onFailure(Throwable ex) {
  9.             // 2.1. Future发生异常时的处理逻辑,基本不会触发
  10.             log.error("Handle message ack fail", ex);
  11.         }
  12.         @Override
  13.         public void onSuccess(CorrelationData.Confirm result) {
  14.             // 2.2. Future接收到回执的处理逻辑,参数中的result就是回执内容
  15.             if(result.isAck()){ // result.IsAck(), boolean类型, true代表ack回执, false 代表 nack回执
  16.                 log.debug("发送消息成功,收到 ack!");
  17.             }else{ // result.getReason(), String类型, 返回nack时的异常描述
  18.                 log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
  19.             }
  20.         }
  21.     });
  22.     // 3. 发送消息
  23.     rabbitTemplate.convertAndSend("ExchangeName", "RoutingKey", "hello", cd);
  24. }
复制代码
关于CorrelationData:

  • 唯一标识消息

    • CorrelationData 通常包含一个唯一的 ID(如 correlationId),用于标识某条消息,以便在收到 ACK/NACK 回执时能精确匹配到对应的消息。
    • 如果不手动设置 ID,RabbitMQ 会自动生成一个。

  • 接收消息确认效果

    • 通过 cd.getFuture() 可以获取一个 ListenableFuture,用于异步监听该消息的 ACK(乐成)NACK(失败) 回执。
    • 示例代码中通过 addCallback() 方法注册回调,处理消息简直认效果。

  • 与 rabbitTemplate 配合利用

    • 在发送消息时(convertAndSend),将 CorrelationData 传入,RabbitMQ 会在 Broker 确认消息后,通过该对象返回效果

2.MQ的可靠性

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的耽误。如许会导致两个问题:


  • 一旦MQ宕机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞

如图当MQ的消息堆积时,MQ会执行PageOut,将老的消息落到磁盘上,给内存腾出空间,而这个过程MQ是阻塞执行的,此时来新的消息就可能无法执行,严重影响效率。
1.数据长期化

RabbitMQ实现数据长期化的3个方面:

  • 互换机长期化(默认临时,在java代码中实现则是默认长期化,队列一样)

  • 队列长期化

  • 消息长期化
    Delivery mode为2时则为长期模式

这里给出消息长期和非长期代码,各人可以尝试测试MQ的运行过程:
  1.     @Test
  2.     void testPageOut(){
  3.         Message message = MessageBuilder
  4.                 .withBody(("Hello,world!").getBytes())
  5.             //持久化
  6.                 .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
  7.                 //非持久化
  8.                 //.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();
  9.         for (int i=0;i<1000000;i++){
  10.             rabbitTemplate.convertAndSend("study.direct","blue",message);
  11.         }
  12.     }
复制代码
如图为开启了生产者确认机制下消息非长期化,会发现大约三十秒左右只发送了十分之一。(看In memory)

此时我们关闭后再次执行会发现消息处理快许多,此时出现Paged Out,图中折线降为0处即执行Paged Out出现阻塞

下图为消息长期化,会发现并未执行Paged OutPersistent体现长期化到磁盘中,每当In memory到达阈值会删除一些信息,此时折线图的暂时下降一些(效率暂时降低)。

2.LazyQueue

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queue的概念,也就是惰性队列
惰性队列的特性如下:


  • 接收到消息后直接存入磁盘而非内存(内存中只保留近来的消息,默认2048条)
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储
在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

  • 要设置一个队列为惰性队列,只需要在声明队列时,订定x-queue-mode属性为lazy即可:(3.12后的默以为lazy)

  • 在java代码中

    基于注解

如图为我们向lazy队列发送一百万条数据:

会发现所有消息直接写进Paged Out,且险些一直在峰值,执行时间也只有19s

相比消息长期化快了许多(虽然是写入磁盘,但底层对IO有特别处理)。
3.消费者可靠性

1.消费者确认机制

1.确认机制

为了确认消费者是否乐成处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。
回执有三种可选值


  • ack: 乐成处理消息,RabbitMQ从队列中删除该消息
  • nack: 消息处理失败,RabbitMQ需要再次投递消息
  • reject: 消息处理失败并拒绝该消息,RabbitMQ**从队列中删除**该消息
2.确认功能

SpringAMQP实现了消息确认功能。并允许我们通过设置文件选择ACK处理方式
有以下三种方式:


  • none:**不处理。**即消息发送给消费者后立即ack,消息会立即从MQ删除。非常不安全,不建议利用
  • manual:**手动模式。**需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
  • auto自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了围绕加强,当业务正常执行时则自动返回ack。
    业务异常时,根据异常判断返回不同效果:

    • 如果是业务异常,会自动返回nack
    • 如果是消息处理或检验异常,会自动返回reject
      1. spring:
      2.   rabbitmq:
      3.     listener:
      4.       simple:
      5.         prefetch: 1
      6.         acknowledge-mode: none #关闭ack;manual,手动ack;auto,自动ack
      复制代码

各人可以在编写代码测试效果,这里不做演示。
2.失败重试机制

1.开启失败重试机制

消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理频升,带来不必要的压力。
我们可以利用Spring的retry机制,在消费者出现异常时利用当地重试,而不是无限制的requeue到mq队列:
  1. spring:  
  2.   rabbitmq:  
  3.     listener:  
  4.       simple:  
  5.         prefetch: 1
  6.         acknowledge-mode: auto
  7.         retry:  
  8.           enabled: true # 开启消费者失败重试  
  9.           initial-interval: 1000ms # 初始的失败等待时长为1秒  
  10.           multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval  
  11.           max-attempts: 3 # 最大重试次数  
  12.           stateless: true # true无状态; false有状态。如果业务中包含事务,这里改为false  
  13.           #如果消息处理逻辑不涉及事务(如纯计算、查询等),用 stateless: true(默认值)。
  14.                   #如果消息处理包含事务(如订单支付等),需设为 stateless: false,否则重试时事务可能失效
复制代码
2.多次失败处理

开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer 接口来处理,它包含三种不同的实现:


  • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject抛弃消息。默认就是这种方式。
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队。
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的互换机,后续人工参与。

    这里我们演示一下第三种方式:

  • 将失败处理策略改为RepublishMessageRecoverer
    a. 起首,界说接收失败消息的互换机,队列及其绑定关系
    b. 然后,界说RepublishMessageRecoverer
    1. @Configuration
    2. public class DirectConfiguration {
    3.      @Bean
    4.      public DirectExchange directExchange() {
    5.          return new DirectExchange("error.exchange");
    6.      }
    7.      @Bean
    8.      public Queue errorQueue() {
    9.          return new Queue("error.queue");
    10.      }
    11.     @Bean
    12.      public Binding errorBinding(Queue errorQueue,
    13.                            DirectExchange directExchange)
    14.      {
    15.         return BindingBuilder.bind(errorQueue).
    16.                to(directExchange).with("error");
    17.      }
    18.      @Bean
    19.      public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate)
    20.      {
    21.          return new RepublishMessageRecoverer(
    22.              rabbitTemplate, "error.exchange", "error");
    23.      }
    24. }
    复制代码
  • 启动消费者并利用生产者发送消息
    1. //消费者
    2.     @RabbitListener(queues = "simple.queue")
    3.     public void listener2(String msg){
    4.         log.info("我是超人收到消息:{}",msg);
    5.         throw new RuntimeException("故意的");
    6.     }
    7. //-----------------------------------------------------------------
    8. //生产者
    9.   @Test
    10.     void testSendMessage2Queue(){
    11.         String queueName="simple.queue";
    12.         String msg="Hello,world!";
    13.         rabbitTemplate.convertAndSend(queueName,msg);
    14.     }
    复制代码
  • 运行效果

    error.queue收到的消息

3.业务幂等性

幂等是一个数学概念,用函数表达式来形貌是如许的:f(x)=f(f(x))。在程序开发中,则是只同一个业务,执行一次或多次对业务状态的影响是一致的。

因此为了保证业务幂等性我们应该怎么做呢?
这里给出个解决方案:

  • 唯一消息Id

    • 每一条消息都生成一个唯一的Id,与消息一起投递给消费者。
    • 消费者接收到消息后处理自己的业务,业务处理乐成后将Id保存到数据库
    • 如果下次又收到雷同的消息,去数据库查询是都存在,存在则为重复消息并放弃处理
    1. @Bean
    2.     public MessageConverter jackaonMessageConvert(){
    3.         Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    4.         jjmc.setCreateMessageIds(true);
    5.         return jjmc;
    6.     }
    7. // jjmc.setCreateMessageIds(true);这个方法会给转化的消息加入一个UUID确保Id唯一
    复制代码

  • 业务判断
    联合业务逻辑,基于业务本身做判断,以修改订单状态为例:我们需要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否为未支付。只有未支付订单才需要修改,其他状态不做处理:

根据本节学习内容回答问题:

  • 怎样保证支付服务与交易服务之间的订单状态一致性?

    • 起首,支付服务会正在用户支付乐成以后利用MQ消息通知交易服务,完成订单状态同步。
    • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的长期化,避免因服务有机导致消息丢失。
    • 最后,我们还在交易服务更新订单状态时做了业务量等判断,避免因消息重复消费导致订单状态异常。

  • 如果交易服务消息处理失败,有没有什么兜底方案?

    • 我们可以在交易服务设置定时任务(主动方案,可以想象为高考结束学校(MQ)一直不通知查分(不发送订单信息,可能发送失败),我们自己查(定时任务)),定期查询订单支付状态。如许即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的终极一致性。

4.耽误消息

1.明白耽误消息

耽误消息:生产者发送消息时指定一个时间,消费者不会立即收到消息,而是在指定时间之后才收到消息。
延时任务:设置在一定时间之后才执行的任务。

2.耽误消息的实现

1.死信互换机

什么是死信互换机?
当一个队列中的消息满意下列情况之一时,就会成为死信(dead letter)


  • 消费者利用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个逾期消息(达到了队列或消息本身设置的逾期时间),超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过dead-letter-exchange属性订定了一个互换机,那么该队列中的死信就会投递到这个互换机中。这个互换机成为死信互换机(Dead Letter Exchange,简称DLX)。

如图给simple.queue队列通过属性绑定了dlx.direct互换机(称作死信互换机,与平常互换机无异),当发布一个逾期时间为30s的消息进入队列后,没有消费者消费导致消息逾期,此时消息被被转发至死信互换机,终极被消费者消费,利用死信机制实现了耽误消息发送。
我们来简单实现一下:

  • 创建两组互换机和队列(图中的两组)并进行关系属性绑定(这里不做演示自行操纵)
  • 编写生产者与消费者代码,并设置超时时间
    1. //生产者
    2. @Test
    3. void testSendTTLMessage(){
    4. String exchangeName="simple.direct";
    5. String msg="死信";
    6. rabbitTemplate.convertAndSend(exchangeName, "dlx", msg, new MessagePostProcessor()
    7. {
    8.     @Override
    9.     public Message postProcessMessage(Message message) throws AmqpException
    10.     {
    11.         message.getMessageProperties().setExpiration("10000");//10s
    12.         return message;
    13.      }
    14.   });
    15.         log.info("发送成功!");
    16. }
    复制代码
    1. //消费者
    2. @RabbitListener(queues = "dlx.queue")
    3. public void listenerDlx(String msg)
    4. {
    5.   log.info("我是超人,收到死信消息:{}",msg);
    6. }
    复制代码
  • 消费者查察消息到达时间

    • 发送时间

    • 消费者收到时间


2.耽误消息插件

RabbitMQ官方推出的一个原生支持耽误消息功能的插件。该插件的原理是计划了一种支持耽误消息功能的互换机,当消息投递到互换机后可以暂存一定时间,到期后再投递到队列。
插件需要自己安装,详细安装方式自行学习。
在java代码中实现发送监听耽误队列的消息:
  1. //消费者
  2. @RabbitListener(bindings = @QueueBinding(
  3.   value=@Queue(name="delay.queue",durable ="true"),
  4.   exchange = @Exchange(name ="delay.direct",delayed = "true"),
  5.   key ="delay"))
  6. public void listenerDelay(String msg)
  7. {
  8.     log.info("收到delay.queue的延迟消息:{}"+msg);
  9. }
  10. //--------------------------------------------
  11. //生产者
  12. @Test
  13. void testSendDelayMessage()
  14. {
  15.    String exchangeName="delay.direct";
  16.    String msg="延迟999999ms+";
  17.    rabbitTemplate.convertAndSend(exchangeName, "delay", msg, new MessagePostProcessor()
  18.   {
  19.     @Override
  20.     public Message postProcessMessage(Message message) throws AmqpException
  21.     {
  22.       message.getMessageProperties().setDelay(5000);//延迟5s
  23.       return message;
  24.     }});
  25.   log.info("延迟消息发送成功!");
  26. }
复制代码
发送效果:

  • 生产者发送消息

  • 消费者5s后收到消息

如上我们保举实现耽误消息时利用插件实现,当然所有的定时功能都是有性能斲丧的(redis除外),MQ和Spring内部在程序内部维护一个时钟,始终每隔一会向前跳一次(精度高的可能毫秒以致纳秒级跳一次),每定一个定时任务,都需要维护自己的时钟,时钟的运行就需要cpu不断计算,因此定时任务是一种cpu密集型任务,定时任务越多,cpu斲丧越大,导致cpu压力增大,所以什么时候去利用它取决于业务的详细情况。
本文系统讲解了RabbitMQ高可靠方案,涵盖生产者确认、长期化机制、消费者重试策略及幂等性保障,同时解析了耽误消息的两种实现方式(死信互换机和插件)。通过设置重试、ACK机制和Lazy Queue优化性能与可靠性,联合业务场景计划兜底方案,确保消息终极一致性。学习后深刻体会到消息队列在分布式系统中保障数据安全与高效传输的核心价值。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

宁睿

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表