立聪堂德州十三局店 发表于 2024-8-21 12:09:21

RabbitMQ-高级

RabbitMQ-高级



1 发送者的可靠性

1.1 发送者重连

有的时间由于网络颠簸,大概会出现发送者连接MQ失败的情况,通过配置我们可以开启连接失败后的重连机制:
https://i-blog.csdnimg.cn/direct/8c9d39902cf049aea2594b3b1bc52011.png
注意: 当网络不稳定的时间,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等候的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,发起禁用重试机制。如果一定要利用,请合理配置等候时长和重试次数,当然也可以考虑利用异步线程来实行发送消息的代码。
1.2 发送者确认

SpringAMOP提供了 Publisher Confirm 和 Publisher Return 两种确认机制。开启确机制认后,当发送者发送消息给MQ后,MO会返回确认效果给发送者。返回的效果有以下几种情况:


[*]消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由非常缘故原由,然后返回ACK,告知投递成功(这种错误是程序员造成了,是路由代码的错误,不是MQ的题目)
[*]临时消息(非持久化,没有写入磁盘)投递到了MQ,而且入队成功,返回ACK,告知投递成功。
[*]持久消息投递到了MQ,而且入队完成持久化,返回ACK,告知投递成功。
[*]其它情况都会返回NACK,告知投递失败
https://i-blog.csdnimg.cn/direct/d413d7e57a324b73b183e06a5dec8b21.png
1.2.1 SpringAMQP实现发送者确认


[*]在publisher这个微服务的application.yml中添加配置
https://i-blog.csdnimg.cn/direct/5e9b34f1a0e14199bcb8f60a69ac02f7.png
​ 配置说明:


[*]这里publisher-confirm-type有三种模式可选:

[*]none:关闭confirm机制
[*]simple:同步阻塞等候MQ的回执消息
[*]correlated:MQ异步回调方式返回回执消息


[*]每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
https://i-blog.csdnimg.cn/direct/73f9178bf2ab4495a9353713bae996bc.png

[*] 发送消息,指定消息ID、消息ConfirmCallback
https://i-blog.csdnimg.cn/direct/7a9df5d91ab1400893ae558ad417b6a3.png
发送成功与否都会实行 ConfirmCallback 发送失败才会实行 ReturnCallback 而且ReturnCallback只需要初始化一次就行,以是就在配置类中初始化一次,利用PostConstruct
发送者:
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqConfig {

    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
      rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.error("监听到了消息return callback");
                log.debug("exchange:{}",returnedMessage.getExchange());
                log.debug("routingKey:{}",returnedMessage.getRoutingKey());
                log.debug("message:{}",returnedMessage.getMessage());
                log.debug("replyCode:{}",returnedMessage.getReplyCode());
                log.debug("replyText:{}",returnedMessage.getReplyText());
            }
      });
    }
}
@Test
    public void testConfirmCallback()throws InterruptedException {
      //0. 创建correlationData
      CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
      cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("Spring amqp 处理确认结果异常",ex);
            }

            @Override
            public void onSuccess(CorrelationData.Confirm result) {
                // 判断是否成功
                if(result.isAck()){
                  //成功
                  log.debug("收到ConfirmCallback ack,消息发送成功");
                }else {
                  //失败
                  log.error("收到ConfirmCallback nack,消息发送失败!reason:{}",result.getReason());
                }
            }
      });

      //1. 交换机名
      String exchangeName = "hmall.direct";
      //2. 消息
      String message = "hello,Spring amqp";
      //3. 发送消息
      rabbitTemplate.convertAndSend(exchangeName, "red", message + "red",cd);
      rabbitTemplate.convertAndSend(exchangeName, "yellow", message + "yellow",cd);
      rabbitTemplate.convertAndSend(exchangeName, "blue2", message + "blue",cd);        //路由失败
      Thread.sleep(2000);
    }
效果:
https://i-blog.csdnimg.cn/direct/1fcf2fa0ec25424eb181c47c519584fd.png
2 MQ的可靠性

在默认情况下,RabbitMQ会将吸收到的信息保存在内存中以降低消息收发的耽误。这样会导致两个题目:


[*]一旦MQ宕机,内存中的消息将会丢失
[*]内存空间有限,当斲丧者故障或处置惩罚过慢时,会导致消息积压,引发MQ阻塞。
2.1 数据持久化

RabbitMQ实现数据持久化包括三个方面:


[*] 交换机持久化
[*] 队列持久化
[*] 消息持久化(如果交换机队列是持久的,但是消息是临时,那么MQ重启后消息也会丢失)
https://i-blog.csdnimg.cn/direct/7528d1d578c14e1c951f01d88ac81747.png
java代码:将消息转化为非持久的
    @Test
    public void testSendMessage() {
      //1. 自定义构建消息
      Message msg = MessageBuilder.withBody("hello,SpringAMQP".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)        //非持久
                    //.setDeliveryMode(MessageDeliveryMode.PERSISTENT)        //持久
                .build();
      //2. 发送消息
      rabbitTemplate.convertAndSend("object.queue",msg);
    }
2.2 Lazy Queue

在默认情况下,RabbitMQ会将吸收到的信息保存在内存中以降低消息收发的耽误。但在某些特殊情况下,这会导致消息积压,比如:


[*]斲丧者宕机或出现网络故障
[*]消息发送量激增,凌驾了斲丧者处置惩罚速度
[*]斲丧者处置惩罚业务发生阻塞
一旦出现消息堆积题目,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会泯灭一段时间,而且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处置惩罚新的消息,生产者的全部请求都会被阻塞。
为了解决这个题目,从RabbitMQ的3.6.0版本开始,就增长了Lazy Queues的模式,也就是惰性队列。惰性队列的特性如下:


[*]吸收到消息后直接存入磁盘而非内存
[*]斲丧者要斲丧消息时才会 从磁盘中读取并加载到内存(也就是写得快,但是懒加载,不过可以提前缓存部分消息到内存,最多2048条)
[*]支持数百万条的消息存储
而在3.12版本之后,LazyQueue已经成为全部队列的默认格式。因此官方推荐升级MQ为3.12版本或者全部队列都设置为LazyQueue模式。
控制台利用: 要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:
https://i-blog.csdnimg.cn/direct/8a68e8f2fa28467f8ecc3d4206fb8630.png
Java客户端利用:
https://i-blog.csdnimg.cn/direct/44a4e835d99942fe964999f137216c1b.png
https://i-blog.csdnimg.cn/direct/04eae098c93849429ace26adb4b5e59f.png
https://i-blog.csdnimg.cn/direct/05dd21fcec4a439e8598239d61ff0c7e.png
3 斲丧者的可靠性

3.1 斲丧者确认机制

斲丧者确实机制(Consumer Acknowledgement)是为了确认斲丧者是否成功处置惩罚消息。当斲丧者处置惩罚消息竣事后,应该向RabbitMQ发送一个回执,告知RabbitMQ本身消息处置惩罚状态:


[*] ack:成功处置惩罚消息,RabbitMQ从队列中删除该消息
[*] nack:消息处置惩罚失败,RabbitMO需要再次投递消息
[*] reject:消息处置惩罚失败并拒绝该消息,RabbitMQ从队列中删除该消息
https://i-blog.csdnimg.cn/direct/47c15e5e6ba142c5aa5af54c2bc592a8.png
SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处置惩罚方式,有三种方式:


[*] none:不处置惩罚。即消息投递给斲丧者后立即ack,消息会立即从MQ删除。非常不安全,不发起利用
[*] manual:手动模式。需要本身在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
[*] auto:自动模式。SpringAMQP利用AOP对我们的消息处置惩罚逻辑做了围绕增强,当业务正常实行时则自动返回ack.
当业务出现非常时,根据非常判定返回不同效果:

[*]如果是业务非常,比如RuntimeException,会自动返回nack
[*]如果是消息处置惩罚或校验非常,比如MessageConversionException,自动返回reject

https://i-blog.csdnimg.cn/direct/0fa6b072827247b8be5490fb3c96c932.png
3.2 失败重试策略

SpringAMQP提供了斲丧者失败重试机制,在斲丧者出现非常时利用本地重试,而不是无限的requeue到mg。我们可以通过在application.yaml文件中添加配置来开启重试机制:
https://i-blog.csdnimg.cn/direct/2d657b0c8532486295587c46f5e513a9.png
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处置惩罚,它包含三种不同的实现:


[*]RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
[*]ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
[*]RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
https://i-blog.csdnimg.cn/direct/f3d2bda0ea5e439dbf39cb16c4b0cc76.png
失败消息处置惩罚策略
将失败处置惩罚策略改为RepublishMessageRecoverer:


[*]首先,定义吸收失败消息的交换机、队列及其绑定关系,此处略:
[*]然后,定义RepublishMessageRecoverer:
https://i-blog.csdnimg.cn/direct/7f2ad44d119c4525990a524594404a29.png
@Configuration
@RequiredArgsConstructor
public class ErrorMessageConfiguration {

   private final RabbitTemplate rabbitTemplate;

   @Bean
   public DirectExchange errorExchange(){
       return new DirectExchange("error.direct");
   }

   @Bean
   public Queue errorQueue(){
       return new Queue("error.queue");
   }

   @Bean
   public Binding errorQueueBinding(){
       return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
   }

   @Bean
   public MessageRecoverer messageRecoverer(){
       return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
   }
}
https://i-blog.csdnimg.cn/direct/013ad41251244c67b12fcb3b7d91e5d1.png
3.3 业务幂等性

幂等是一个数学概念,用函数表达式来形貌是这样的:f(x) = f(f(x)),例如求绝对值函数。
在程序开发中,则是指同一个业务,实行一次或多次对业务状态的影响是同等的。例如:


[*]根据id删除数据
[*]查询数据
[*]新增数据
但数据的更新每每不是幂等的,如果重复实行大概造成不一样的后果。比如:


[*]取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增长的情况
[*]退款业务。重复退款对商家而言会有经济丧失。
以是,我们要尽大概避免业务被重复实行。
然而在实际业务场景中,由于意外常常会出现业务被重复实行的情况,例如:


[*]页面卡顿时频繁刷新导致表单重复提交
[*]服务间调用的重试
[*]MQ消息的重复投递
以是,我们要尽大概避免业务被重复实行。
然而在实际业务场景中,由于意外常常会出现业务被重复实行的情况,例如:


[*]页面卡顿时频繁刷新导致表单重复提交
[*]服务间调用的重试
[*]MQ消息的重复投递
我们在用户支付成功后会发送MQ消息到生意业务服务,修改订单状态为已支付,就大概出现消息重复投递的情况。如果斲丧者不做判定,很有大概导致消息被斲丧多次,出现业务故障。
举例:

[*]假如用户刚刚支付完成,而且投递消息到生意业务服务,生意业务服务更改订单为已支付状态。
[*]由于某种缘故原由,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给生意业务服务。
[*]但是,在新投递的消息被斲丧之前,用户选择了退款,将订单状态改为了已退款状态。
[*]退款完成后,新投递的消息才被斲丧,那么订单状态会被再次改为已支付。业务非常。
因此,我们必须想办法保证消息处置惩罚的幂等性。这里给出两种方案:


[*]唯一消息ID
[*]业务状态判定
3.3.1 唯一消息id

方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息


[*]每一条消息都生成一个唯一的id,与消息一起投递给斲丧者。
[*]斲丧者吸收到消息后处置惩罚本身的业务,业务处置惩罚成功后将消息ID保存到数据库
[*]如果下次又收到相同消息,去数据库查询判定是否存在,存在则为重复消息放弃处置惩罚。
SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:
@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}
此时吸收对象可以直接用Message吸收
https://i-blog.csdnimg.cn/direct/af19699fe7834b46868990506e175d55.png
3.3.2 业务判定

方案二:是结合业务逻辑,基于业务本身做判定。以余额支付业务为例:
https://i-blog.csdnimg.cn/direct/45e09caa5310495db4ee83ecd73a26fc.png
https://i-blog.csdnimg.cn/direct/224146ec60cf46eba9d1b5b89052b95f.png
java代码多了第1、2步
    public void listenPaySuccess(Long orderId){
      //1.查询订单
      Order order = orderService.getById(orderId);
      //2. 判断订单状态 是否为未支付
      if(order == null || order.getStatus() != 1){
            //不做处理
            return;
      }
      //3. 标记订单状态为已支付
      orderService.markOrderPaySuccess(orderId);
    }
https://i-blog.csdnimg.cn/direct/de544b7be4dd4797841d39d8f3cc07bf.png
4 耽误消息

耽误消息: 发送者发送消息时指定一个时间,斲丧者不会立即收到消息,而是在指定时间之后才收到消息。
耽误任务: 设置在一定时间之后才实行的任务。
https://i-blog.csdnimg.cn/direct/37076dd2940d420099752677faf43b2b.png
4.1 死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):


[*]斲丧者利用basic.reject或 basic.nack声明斲丧失败,而且消息的requeue参数设置为false
[*]消息是一个逾期消息(达到了队列或消息本身设置的逾期时间),超时无人斲丧
[*]要投递的队列消息堆积满了,最早的消息大概成为死信
如果 队列 通过dead-letter-exchange属性指定了一个 交换机 ,那么该队列中的死信就会投递到这个交换机中。这个交换机 称为死信交换机(Dead LetterExchange,简称DLX)。
https://i-blog.csdnimg.cn/direct/4487abbcc50846cc89bce506f9d21ed3.png
dlx交换机代码
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dlx.queue", durable = "true"),
            exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),
            key = {"hi"}
    ))
    public void listendlxQueue(String message) {
      log.info("消费者监听到dlx.queue的消息:{}", message);

    }
normal交换机
@Slf4j
@Configuration
public class NormalConfiguration {

    @Bean
    public DirectExchange normalExchange(){
      return new DirectExchange("normal.direct");
    }

    @Bean
    public Queue normalQueue(){
      return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build();
    }

    @Bean
    public Binding normalExchangeBinding(Queue normalQueue,DirectExchange normalExchange){

      return BindingBuilder.bind(normalQueue).to(normalExchange).with("hi");
    }

}
值得一提的是,两个交换机中绑定的key要同等,比如例中都是"hi"
发送方添加了耽误时间的消息
    @Test
    public void testSendDelayMessage() {
//      //1. 自定义构建消息
//      Message msg = MessageBuilder.withBody("hello,SpringAMQP".getBytes(StandardCharsets.UTF_8))
//                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)    //非持久化
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)      //持久化
//                .build();
      //2. 发送消息
      rabbitTemplate.convertAndSend("normal.direct", "hi", "hello", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("10000");
                return message;
            }
      });
    }
   注意,自定义构建的消息是不具备将消息转化为JSON格式的,而我们之前设置了发送消息是JSON格式的,以是这里不能自定义消息。
当消息从normal.direct交换机到normal.queue队列,过了10s后,发现还没有人吸收,则会将消息由dlx.direct交换机发送给dlx.queue队列,这样就真正的斲丧者就可以担当耽误了10s后的消息。
4.2 耽误消息插件

这个插件可以将平凡交换机改造为支持耽误消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列
https://i-blog.csdnimg.cn/direct/52764a9f39a64707b979b4f2f0cd2723.png
4.2.1 声明耽误交换机

基于注解方式:
https://i-blog.csdnimg.cn/direct/a57df397439d4d018ebb994a5da6a297.png
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true",type = ExchangeTypes.DIRECT),
            key = {"hi"}
    ))
    public void listenDelayQueue(String message) {
      log.info("消费者监听到dlx.queue的消息:{}", message);

    }
基于@Bean方式
https://i-blog.csdnimg.cn/direct/481000bac0994c4d8e163ccc9496239e.png
4.2.2 发送耽误消息

发送消息时需要通过消息头x-delay来设置逾期时间:
https://i-blog.csdnimg.cn/direct/422619ac7036465bb72e38fe2c2c9d25.png
    @Test
    public void testSendDelayMessageByPlugin() {
      //发送消息
      rabbitTemplate.convertAndSend("delay.direct", "hi", "hello", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(10000);
                return message;
            }
      });
    }

耽误插件更方便,只要声明一个耽误交换机、队列及其斲丧者即可,不消声明两套,值得注意的是发送消息中耽误函数变成了setDelay
4.3 取消超时订单

https://i-blog.csdnimg.cn/direct/8dea1fc800a547019e2b039ebcd3cfa5.png
https://i-blog.csdnimg.cn/direct/c61f527951e3405583db3b8bf5d6eb95.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: RabbitMQ-高级