ToB企服应用市场:ToB评测及商务社交产业平台

标题: RabbitMQ-高级 [打印本页]

作者: 立聪堂德州十三局店    时间: 2024-8-21 12:09
标题: RabbitMQ-高级
RabbitMQ-高级


  
1 发送者的可靠性

1.1 发送者重连

有的时间由于网络颠簸,大概会出现发送者连接MQ失败的情况,通过配置我们可以开启连接失败后的重连机制:

注意: 当网络不稳定的时间,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等候的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,发起禁用重试机制。如果一定要利用,请合理配置等候时长和重试次数,当然也可以考虑利用异步线程来实行发送消息的代码。
1.2 发送者确认

SpringAMOP提供了 Publisher ConfirmPublisher Return 两种确认机制。开启确机制认后,当发送者发送消息给MQ后,MO会返回确认效果给发送者。返回的效果有以下几种情况:


1.2.1 SpringAMQP实现发送者确认


​ 配置说明:



2 MQ的可靠性

在默认情况下,RabbitMQ会将吸收到的信息保存在内存中以降低消息收发的耽误。这样会导致两个题目:

2.1 数据持久化

RabbitMQ实现数据持久化包括三个方面:


java代码:将消息转化为非持久的
  1.     @Test
  2.     public void testSendMessage() {
  3.         //1. 自定义构建消息
  4.         Message msg = MessageBuilder.withBody("hello,SpringAMQP".getBytes(StandardCharsets.UTF_8))
  5.                 .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)        //非持久
  6.                     //.setDeliveryMode(MessageDeliveryMode.PERSISTENT)        //持久
  7.                 .build();
  8.         //2. 发送消息
  9.         rabbitTemplate.convertAndSend("object.queue",msg);
  10.     }
复制代码
2.2 Lazy Queue

在默认情况下,RabbitMQ会将吸收到的信息保存在内存中以降低消息收发的耽误。但在某些特殊情况下,这会导致消息积压,比如:

一旦出现消息堆积题目,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个行为成为PageOut. PageOut会泯灭一段时间,而且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处置惩罚新的消息,生产者的全部请求都会被阻塞。
为了解决这个题目,从RabbitMQ的3.6.0版本开始,就增长了Lazy Queues的模式,也就是惰性队列。惰性队列的特性如下:

而在3.12版本之后,LazyQueue已经成为全部队列的默认格式。因此官方推荐升级MQ为3.12版本或者全部队列都设置为LazyQueue模式。
控制台利用: 要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:

Java客户端利用:



3 斲丧者的可靠性

3.1 斲丧者确认机制

斲丧者确实机制(Consumer Acknowledgement)是为了确认斲丧者是否成功处置惩罚消息。当斲丧者处置惩罚消息竣事后,应该向RabbitMQ发送一个回执,告知RabbitMQ本身消息处置惩罚状态:


SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处置惩罚方式,有三种方式:


3.2 失败重试策略

SpringAMQP提供了斲丧者失败重试机制,在斲丧者出现非常时利用本地重试,而不是无限的requeue到mg。我们可以通过在application.yaml文件中添加配置来开启重试机制:

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处置惩罚,它包含三种不同的实现:


失败消息处置惩罚策略
将失败处置惩罚策略改为RepublishMessageRecoverer:


  1. @Configuration
  2. @RequiredArgsConstructor
  3. public class ErrorMessageConfiguration {
  4.    private final RabbitTemplate rabbitTemplate;
  5.    @Bean
  6.    public DirectExchange errorExchange(){
  7.        return new DirectExchange("error.direct");
  8.    }
  9.    @Bean
  10.    public Queue errorQueue(){
  11.        return new Queue("error.queue");
  12.    }
  13.    @Bean
  14.    public Binding errorQueueBinding(){
  15.        return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");
  16.    }
  17.    @Bean
  18.    public MessageRecoverer messageRecoverer(){
  19.        return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
  20.    }
  21. }
复制代码

3.3 业务幂等性

幂等是一个数学概念,用函数表达式来形貌是这样的:f(x) = f(f(x)),例如求绝对值函数。
在程序开发中,则是指同一个业务,实行一次或多次对业务状态的影响是同等的。例如:

但数据的更新每每不是幂等的,如果重复实行大概造成不一样的后果。比如:

以是,我们要尽大概避免业务被重复实行。
然而在实际业务场景中,由于意外常常会出现业务被重复实行的情况,例如:

以是,我们要尽大概避免业务被重复实行。
然而在实际业务场景中,由于意外常常会出现业务被重复实行的情况,例如:

我们在用户支付成功后会发送MQ消息到生意业务服务,修改订单状态为已支付,就大概出现消息重复投递的情况。如果斲丧者不做判定,很有大概导致消息被斲丧多次,出现业务故障。
举例:
因此,我们必须想办法保证消息处置惩罚的幂等性。这里给出两种方案:

3.3.1 唯一消息id

方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息

SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:
  1. @Bean
  2. public MessageConverter messageConverter(){
  3.     // 1.定义消息转换器
  4.     Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
  5.     // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
  6.     jjmc.setCreateMessageIds(true);
  7.     return jjmc;
  8. }
复制代码
此时吸收对象可以直接用Message吸收

3.3.2 业务判定

方案二:是结合业务逻辑,基于业务本身做判定。以余额支付业务为例:


java代码多了第1、2步
  1.     public void listenPaySuccess(Long orderId){
  2.         //1.查询订单
  3.         Order order = orderService.getById(orderId);
  4.         //2. 判断订单状态 是否为未支付
  5.         if(order == null || order.getStatus() != 1){
  6.             //不做处理
  7.             return;
  8.         }
  9.         //3. 标记订单状态为已支付
  10.         orderService.markOrderPaySuccess(orderId);
  11.     }
复制代码

4 耽误消息

耽误消息: 发送者发送消息时指定一个时间,斲丧者不会立即收到消息,而是在指定时间之后才收到消息。
耽误任务: 设置在一定时间之后才实行的任务。

4.1 死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

如果 队列 通过dead-letter-exchange属性指定了一个 交换机 ,那么该队列中的死信就会投递到这个交换机中。这个交换机 称为死信交换机(Dead LetterExchange,简称DLX)。

dlx交换机代码
  1.     @RabbitListener(bindings = @QueueBinding(
  2.             value = @Queue(name = "dlx.queue", durable = "true"),
  3.             exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),
  4.             key = {"hi"}
  5.     ))
  6.     public void listendlxQueue(String message) {
  7.         log.info("消费者监听到dlx.queue的消息:{}", message);
  8.     }
复制代码
normal交换机
  1. @Slf4j
  2. @Configuration
  3. public class NormalConfiguration {
  4.     @Bean
  5.     public DirectExchange normalExchange(){
  6.         return new DirectExchange("normal.direct");
  7.     }
  8.     @Bean
  9.     public Queue normalQueue(){
  10.         return QueueBuilder.durable("normal.queue").deadLetterExchange("dlx.direct").build();
  11.     }
  12.     @Bean
  13.     public Binding normalExchangeBinding(Queue normalQueue,DirectExchange normalExchange){
  14.         return BindingBuilder.bind(normalQueue).to(normalExchange).with("hi");
  15.     }
  16. }
复制代码
值得一提的是,两个交换机中绑定的key要同等,比如例中都是"hi"
发送方添加了耽误时间的消息
  1.     @Test
  2.     public void testSendDelayMessage() {
  3. //        //1. 自定义构建消息
  4. //        Message msg = MessageBuilder.withBody("hello,SpringAMQP".getBytes(StandardCharsets.UTF_8))
  5. //                .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)    //非持久化
  6.                 .setDeliveryMode(MessageDeliveryMode.PERSISTENT)      //持久化
  7. //                .build();
  8.         //2. 发送消息
  9.         rabbitTemplate.convertAndSend("normal.direct", "hi", "hello", new MessagePostProcessor() {
  10.             @Override
  11.             public Message postProcessMessage(Message message) throws AmqpException {
  12.                 message.getMessageProperties().setExpiration("10000");
  13.                 return message;
  14.             }
  15.         });
  16.     }
复制代码
  注意,自定义构建的消息是不具备将消息转化为JSON格式的,而我们之前设置了发送消息是JSON格式的,以是这里不能自定义消息。
  当消息从normal.direct交换机到normal.queue队列,过了10s后,发现还没有人吸收,则会将消息由dlx.direct交换机发送给dlx.queue队列,这样就真正的斲丧者就可以担当耽误了10s后的消息。
4.2 耽误消息插件

这个插件可以将平凡交换机改造为支持耽误消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列

4.2.1 声明耽误交换机

基于注解方式:

  1.     @RabbitListener(bindings = @QueueBinding(
  2.             value = @Queue(name = "delay.queue", durable = "true"),
  3.             exchange = @Exchange(name = "delay.direct", delayed = "true",type = ExchangeTypes.DIRECT),
  4.             key = {"hi"}
  5.     ))
  6.     public void listenDelayQueue(String message) {
  7.         log.info("消费者监听到dlx.queue的消息:{}", message);
  8.     }
复制代码
基于@Bean方式

4.2.2 发送耽误消息

发送消息时需要通过消息头x-delay来设置逾期时间:

  1.     @Test
  2.     public void testSendDelayMessageByPlugin() {
  3.         //发送消息
  4.         rabbitTemplate.convertAndSend("delay.direct", "hi", "hello", new MessagePostProcessor() {
  5.             @Override
  6.             public Message postProcessMessage(Message message) throws AmqpException {
  7.                 message.getMessageProperties().setDelay(10000);
  8.                 return message;
  9.             }
  10.         });
  11.     }
复制代码
耽误插件更方便,只要声明一个耽误交换机、队列及其斲丧者即可,不消声明两套,值得注意的是发送消息中耽误函数变成了setDelay
4.3 取消超时订单




免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4