ToB企服应用市场:ToB评测及商务社交产业平台
标题:
MQ - RabbitMQ - 消息的可靠性 --学习笔记
[打印本页]
作者:
八卦阵
时间:
2024-7-19 19:51
标题:
MQ - RabbitMQ - 消息的可靠性 --学习笔记
消息的可靠性
RabbitMQ 提供了一系列的特性和机制来确保消息的可靠性,即确保消息不丢失、按需到达目的地。要实现在 RabbitMQ 中消息的可靠性,可通过以下几个方面举行操作:
一、发送者的可靠性
1、生产者重试机制
什么是生产者重试机制?
消息队列(MQ)的生产者重试机制是指当生产者尝试将消息发送到消息队列中时,如果由于某些原因(如网络题目、队列不可用或其他任何导致消息发送失败的题目)导致消息未能成功发送,那么生产者会根据预设的计谋尝试重新发送消息的过程。
如何实现生产者重试机制?
修改设置文件即可:
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
复制代码
2、生产者确认机制
什么是生产者确认机制?
生产者确认机制是一种确保消息从生产者安全达到消息队列的方法,尤其是在利用像RabbitMQ如许的消息中心件时,这种机制尤为重要。生产者确认(Publisher Confirms)也被视为是一种可靠性保障,使得生产者在发送消息后可以知晓该消息是否已被消息队列成功吸取。
在不利用生产者确认机制的情况下,生产者将消息发送出去之后,就没有办法知道消息是否正确到达队列。这在某些场景下大概会导致数据丢失。因此,通过实现生产者确认机制,可以显著增长消息体系的可靠性。
如何实现生产者确认机制?
RabbitMQ提供的生产者消息确认机制包罗Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处置处罚的情况返回不同的
回执
。
当消息投递到MQ,但是路由失败时,通过
Publisher Return
返回非常信息,同时返回ack简直认信息,代表投递成功
临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
长期消息投递到了MQ,并且入队完成长期化,返回ACK ,告知投递成功
别的情况都会返回NACK,告知投递失败
其中ack和nack属于
Publisher Confirm
机制,ack是投递成功;nack是投递失败。而return则属于
Publisher Return
机制。
默认两种机制都是关闭状态,必要通过设置文件来开启:
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
复制代码
这里publisher-confirm-type有三种模式可选:
none:关闭confirm机制
simple:同步阻塞等候MQ的回执
correlated:MQ异步回调返回回执
一样平常保举利用correlated模式,分身了性能可靠性
ReturnCallback
ReturnCallback是RabbitMQ中Java客户端库提供的一个接口,用于处置处罚不能被路由到合适队列的消息。每个RabbitTemplate只能设置一个ReturnCallback,因此我们可以在设置类中统一设置:
package com.itheima.publisher.config;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
复制代码
ConfirmCallback
ConfirmCallback是RabbitMQ在Java客户端库中提供的一个接口,用于处置处罚消息确认情况。当生产者将消息发送到RabbitMQ服务器时,可以通过实现ConfirmCallback接口来监听这些消息是否成功被服务器吸取(acknowledged)或被拒绝(nacked)。这是一种异步通信模式,允许生产者在不阻塞当火线程的情况下获得消息发送后的状态反馈。由于每个消息发送时的处置处罚逻辑不一定相同,因此ConfirmCallback必要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
这里的CorrelationData中包含两个核心的东西:
id:消息的唯一标示,MQ对不同的消息的回执以此做判定,制止混淆
SettableListenableFuture:回执结果的Future对象
将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处置处罚消息回执:
示例,新建一个测试,向体系自带的交换机发送消息,并且添加ConfirmCallback:
@Test
void testPublisherConfirm() {
// 1.创建CorrelationData
CorrelationData cd = new CorrelationData();
// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
log.debug("发送消息成功,收到 ack!");
}else{ // result.getReason(),String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.发送消息
rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
复制代码
二、MQ的可靠性
1、数据长期化
为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须设置数据长期化,包罗:
交换机长期化
队列长期化
消息长期化
在控制台设置数据长期化:
交换机&队列:添加时设置Durability参数为Durable就是长期化模式,Transient就是临时模式。
消息:发送消息时设置Dilivery mode参数为persistent即可
在java客户端设置数据长期化:
交换机&队列:创建时设置属性durable为true即可
消息:发送时设置MessageProperties.PERSISTENT_TEXT_PLAIN属性:
String message = "Hello World!";
channel.basicPublish("my_exchange", "routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
复制代码
注意:只管消息标记为长期化,RabbitMQ也不会为每条消息都做fsync操作,由于如许会严重影响性能。RabbitMQ会按照自己的节奏批量地将消息写入磁盘。
2、LazyQueue
在默认情况下,RabbitMQ会将吸取到的信息保存在内存中以降低消息收发的延迟。但在某些特别情况下,这会导致消息积压,比如:
消费者宕机或出现网络故障
消息发送量激增,凌驾了消费者处置处罚速度
消费者处置处罚业务发生阻塞
一旦出现消息堆积题目,RabbitMQ的内存占用就会越来越高,直到触发内存预警上限。此时RabbitMQ会将内存消息刷到磁盘上,这个举动成为PageOut. PageOut会泯灭一段时间,并且会阻塞队列历程。因此在这个过程中RabbitMQ不会再处置处罚新的消息,生产者的所有请求都会被阻塞。
为相识决这个题目,从RabbitMQ的3.6.0版本开始,就增长了Lazy Queues的模式,也就是惰性队列。惰性队列的特征如下:
吸取到消息后直接存入磁盘而非内存
消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
支持数百万条的消息存储
而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方保举升级MQ为3.12版本大概所有队列都设置为LazyQueue模式。
控制台设置Lazy模式
在添加队列的时候,添加x-queue-mod=lazy参数即可设置队列为Lazy模式:
代码设置Lazy模式
在利用SpringAMQP声明队列的时候,添加x-queue-mod=lazy参数也可设置队列为Lazy模式:
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启Lazy模式
.build();
}
复制代码
这里是通过QueueBuilder的lazy()函数设置Lazy模式,底层源码如下:
也可以基于注解来声明队列并设置为Lazy模式:
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);
}
复制代码
三、消费者的可靠性
1、消费者确认机制
为了确认消费者是否成功处置处罚消息,RabbitMQ提供了消费者确认机制(
Consumer Acknowledgement
)。即:当消费者处置处罚消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处置处罚状态。回执有三种可选值:
ack:成功处置处罚消息,RabbitMQ从队列中删除该消息
nack:消息处置处罚失败,RabbitMQ必要再次投递消息
reject:消息处置处罚失败并拒绝该消息,RabbitMQ从队列中删除该消息
一样平常reject方式用的较少,除非是消息格式有题目,那就是开发题目了。因此大多数情况下我们必要将消息处置处罚的代码通过try catch机制捕获,消息处置处罚成功时返回ack,处置处罚失败时返回nack.
由于消息回执的处置处罚代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过设置文件设置ACK处置处罚方式,有三种模式:
none
:不处置处罚。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不发起利用
manual
:手动模式。必要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更机动
auto
:主动模式。SpringAMQP利用AOP对我们的消息处置处罚逻辑做了环绕加强,当业务正常实行时则主动返回ack. 当业务出现非常时,根据非常判定返回不同结果:
如果是
业务非常
,会主动返回nack;
如果是
消息处置处罚或校验非常
,主动返回reject;
通过下面的设置可以修改SpringAMQP的ACK处置处罚方式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理
复制代码
2、失败重试机制
当消费者出现非常后,消息会不停requeue(重入队)到队列,再重新发送给消费者。如果消费者再次实行依然堕落,消息会再次requeue到队列,再次投递,直到消息处置处罚成功为止。
极端情况就是消费者一直无法实行成功,那么消息requeue就会无限循环,导致mq的消息处置处罚飙升,带来不必要的压力:
当然,上述极端情况发生的概率还是非常低的,不外不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现非常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
复制代码
3、失败处置处罚计谋
在之前的测试中,本地测试达到最大重试次数后,消息会被抛弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处置处罚计谋,这个计谋是由MessageRecovery接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,抛弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处置处罚方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放非常消息的队列,后续由人工会合处置处罚。
定义处置处罚失败消息的交换机和队列,定义一个RepublishMessageRecoverer,关联队列和交换机:
package com.itheima.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
复制代码
4、业务幂等性
何为幂等性?
幂等
是一个数学概念,用函数表达式来形貌是如许的:f(x) = f(f(x)),比方求绝对值函数。
在步伐开发中,则是指同一个业务,实行一次或多次对业务状态的影响是同等的。比方:
根据id删除数据
查询数据
新增数据
但数据的更新每每不是幂等的,如果重复实行大概造成不一样的后果。比如:
取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增长的情况
退款业务。重复退款对商家而言会有经济丧失。
所以,我们要尽大概制止业务被重复实行。
然而在实际业务场景中,由于不测经常会出现业务被重复实行的情况,比方:
页面卡顿时频仍刷新导致表单重复提交
服务间调用的重试
MQ消息的重复投递
因此,我们必须想办法保证消息处置处罚的幂等性。这里给出两种方案:
唯一消息ID
这个思绪非常简朴:
每一条消息都生成一个唯一的id,与消息一起投递给消费者。
消费者吸取到消息后处置处罚业务,业务处置处罚成功后将消息ID保存到数据库
如果下次又收到相同消息,去数据库查询判定是否存在,存在则为重复消息放弃处置处罚。
我们该如何给消息添加唯一ID呢?
实在很简朴,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
以Jackson的消息转换器为例:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
复制代码
业务状态判定
业务判定就是基于业务本身的逻辑或状态来判定是否是重复的请求或消息,不同的业务场景判定的思绪也不一样。
比方我们当前案例中,处置处罚消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在实行业务时判定订单状态是否是未支付,如果不是则证明订单已经被处置处罚过,无需重复处置处罚。
相比较而言,消息ID的方案必要改造原有的数据库,所以更保举利用业务判定的方案。
5、兜底方案
虽然我们利用各种机制尽大概增长了消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?
有没有别的兜底方案,能够确保订单的支付状态同等呢?
实在思想很简朴:既然MQ通知不一定发送到消费者,那就消费者定时去查询相关生产者状态,如许即便MQ通知失败,还可以利用定时使命作为兜底方案,确保状态的最终同等性。
而这就必要利用到延迟消息的技术:
延迟消息
延迟消息是指在消息队列(MQ)体系中允许消息在一定时间后才被消费者消费的消息。这意味着,消息被发送到消息队列后,并不会立刻被消费者处置处罚,而是在指定的延迟时间之后才可用于消费。这种机制允许开发者在设计体系时,能够处置处罚那些必要延迟实行的使命,比方定时使命、延迟通知、过期订单的处置处罚等。
在RabbitMQ中实现延迟消息也有两种方案:
死信交换机+TTL
延迟消息插件
死信交换机+TTL
首先,什么是死信?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
消费者利用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
消息是一个过期消息,超时无人消费
要投递的队列消息满了,无法投递
那么,什么是死信交换机呢?
如果一个队列中的消息已经成为死信,并且这个队列通过
dead-letter-exchange
属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为
死信交换机
(Dead Letter Exchange)。
TTL(Time-To-Live,生存时间):是指消息或队列存在的时长。
那么如何通过死信交换机+TTL实现延迟消息呢?
设想一下如许的场景:
如图,有一组绑定的交换机(ttl.fanout)和队列(ttl.queue)。但是ttl.queue没有消费者监听,而是设定了死信交换机hmall.direct,而队列direct.queue1则与死信交换机绑定,RoutingKey是blue:
假如我们现在发送一条消息到ttl.fanout,RoutingKey为blue,并设置消息的
有效期
为5000毫秒,消息肯定会被投递到ttl.queue之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信,死信被再次投递到死信交换机hmall.direct,并相沿之前的RoutingKey,也就是blue,由于direct.queue1与hmall.direct绑定的key是blue,因此最终消息被成功路由到direct.queue1,如果此时有消费者与direct.queue1绑定, 也就能成功消费消息了。但此时已经是5秒钟以后了:
也就是说,publisher发送了一条消息,但最终consumer在5秒后才收到消息。我们成功实现了
延迟消息
。
DelayExchange插件
基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
插件下载地址:rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ (github.com)
下载好后将插件上传到RabbitMQ的插件目录对应的数据卷,然后实行命令安装插件(docker):
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码
在java客户端声明延迟交换机
基于注解方式:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
复制代码
基于@Bean的方式:
package com.itheima.consumer.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class DelayExchangeConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}
@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}
@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
复制代码
发送延迟消息
发送消息时,必须通过x-delay属性设定延迟时间:
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
复制代码
注意:
延迟消息插件内部会维护一个本地数据库表,同时利用Elang Timers功能实现计时。如果消息的延迟时间设置较长,大概会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,
不发起设置延迟时间过长的延迟消息
。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4