MQ高级:RabbitMQ小细节
在之前的学习中,我们只介绍了消息的发送,但是没有思量到异常的情况,今天我们就介绍一些异常情况,和细节的部分。目录
生产者可靠性
生产者重连
生产者确认
MQ可靠性
长期化
Lazy Queue
消费者可靠性
消费者确认机制
失败重试机制
业务幂等性
延迟消息
死信交换机
延迟消息插件
生产者可靠性
生产者重连
有时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长=initial-interval * multiplier
max-attempts: 3 # 最大重试次数 在停止mq服务之后,运行代码,会发现测试失败,由于连接失败。最大重试次数设置的是3,此处就重试了3次再停止。https://i-blog.csdnimg.cn/direct/8b676262e91a4259b2e47cb344ba0b4e.png
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMOP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当火线程是被阻塞的,
会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。如果一定要利用,请合理配置等待时长和重试次数,当然也可以思量利用异步线程来实验发送消息的代码。
生产者确认
在开启了生产者确认机制后,在MQ成功收到消息后会返回确认消息ACK给生产者,如果有异常,会返回NACK。
[*]消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
[*]暂时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
[*]长期消息投递到了MQ,并且入队完成长期化,返回ACK,告知投递成功
[*]别的情况都会返回NACK,告知投递失败
https://i-blog.csdnimg.cn/direct/1de6fc8828064f57a61b0f24a1fc5b87.png
spring:
rabbitmq:
publisher-confirm-type: correlated
# 开启publisher confirm机制,并设置confirm类型为correlated
publisher-returns: true
# 开启publisher return机制 这里publisher-confirm-type有三种模式可选:
[*]none:关闭confirm机制
[*]simple:同步阻塞等待MO的回执消息
[*]correlated:MO异步回调方式返回回执消息
但是!生产者确认需要额外的网络和体系资源开销,只管不要利用。
如果一定要利用,无需开启Publisher-Return机制,由于一样平常路由失败是自己业务问题。
对于nack消息可以有限次数重试,依然失败则记录异常消息。
MQ可靠性
办理了生产者可靠性,还需要办理MQ的可靠性。
通过生产者发送的消息被MQ存放到内存中,经过某些特殊情况大概MQ重启后,这部分数据会丢失。并且内存空间是有限的,当消费者故障大概处理太慢时,会导致消息积存,导致MQ阻塞。
长期化
为了办理这个问题,MQ引入了数据长期化,包罗交换机长期化、队列长期化、消息长期化。
前两者只需要在创建的时候设置成Durable(默认)即可。
https://i-blog.csdnimg.cn/direct/dcb5ecba70df400896fa3f7fe463a1e7.png
消息长期化默认是不开启的,要手动开启。
https://i-blog.csdnimg.cn/direct/1336b512029b4c7ba375b2f1f225487e.png
当不开启磁盘长期化,消息会全部存放在内存中。但是发送消息过多,会占满内存。之后多出来的消息会存放到Paged out中,也就是磁盘中。等待内存中的消息被处理完后,会再把磁盘中的消息加载到内存中,再继续处理。
当开启了磁盘长期化,接收到的消息会在内存和磁盘中都存一份。此时处理的消息是从内存中处理。内存会在将要满的时候整理一次,再继续完成消息处理。磁盘则会把全部消息都保存下来。
Lazy Queue
Lazy Queue惰性队列的特征如下:
[*]接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条
[*]消费者要消费消息时才会从磁盘中读取并加载到内存
[*]支持数百万条的消息存储
https://i-blog.csdnimg.cn/direct/771ea885668e45e287d30fc0b62295d7.png
并且惰性队列的性能很高,比之前的几种性能都会好一些。
消费者可靠性
消费者确认机制
当不开启消费者确认机制,生产者投递了一条消息,不管消费者是否处理完了,会马上被RabbitMQ删除,当做已经处理完了。但是如果消费者出现网络波动大概其他异常情况,会导致没有接收到这条消息,生产者这边还会认为消费者已经接收到消息了。告知RabbitMQ自己消息处理状态。处理消息竣事后,应该向RabbitMQ发送一个回执,
回执有三种可选值:
[*]ack:成功处理消息,RabbitMO从队列中删除该消息
[*]nack:消息处理失败,RabbitMO需要再次投递消息
[*]reiect:消息处理失败并拒绝该消息,RabbitMO从队列中删除该消息
SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:
[*] none:不处理消息确认。消息投递给消费者后立即ack,消息会立即从MQ删除。这种方式非常不安全,不建议利用。
[*] manual:手动模式。需要在业务代码中手动调用API发送ack或reject。虽然存在业务侵入,但提供了更大的灵活性。
[*] auto:主动模式。SpringAMQP利用AOP对消息处理逻辑进行环绕增强。当业务正常实验时,主动返回ack。当业务出现异常时,根据异常类型主动返回差别的效果:
1.如果是业务异常,主动返回nack。
2.如果是消息处理或校验异常,主动返回reject。
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto# 可以设置为 none, manual, auto
失败重试机制
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,形成无穷循环。这会导致MQ的消息处理量飙升,给体系带来不须要的压力。
我们可以利用Spring的retry机制,在消费者出现异常时进行本地重试,而不是无穷定地requeue到MQ队列。并且指定最大重试次数。
spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry:
enabled: true# 开启消费者失败重试
initial-interval: 1000ms# 初始的失败等待时长为1秒
multiplier: 1# 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3# 最大重试次数
stateless: true# true无状态; false有状态。如果业务中包含事务,这里改为false
在开启重试模式后,如果重试次数耗尽且消息依然失败,则需要有MessageRecoverer接口来处理。MessageRecoverer包含三种差别的实现:
[*]RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
[*]ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
[*]RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
当通过末了一种方式重试耗尽后,我们可以额外设置一个队列,比如error.queue,当发送失败的消息进入这个队列后,再通过邮件强提醒如许的机制推送给工作职员,可以有效办理消息发送失败的极端情况。
业务幂等性
在步伐开发中,业务幂等性指的是同一个业务,实验一次大概实验多次对业务的状态是没有影响的。
唯一消息id
方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息:
[*]每一条消息都生成一个唯一的id,与消息一起投递给消费者。
[*]消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
[*]如果下次又收到相同消息,去数据库查询判定是否存在,存在则为重复消息放弃处理。
业务判定
方案二,是结合业务逻辑,基于业务自己做判定。以我们的业务为例:我们要在付出后修改订单状态为已付出,应该在修改订单状态前先查询订单状态,判定状态是否是未付出。只有未付出订单才需要修改,别的状态不做处理。
如何包管付出服务与生意业务服务之间的订单状态同等性?
为确保付出服务与生意业务服务之间的订单状态同等性,我们接纳了以下措施:
[*] 消息关照:
[*]付出服务在用户付出成功后,通过MQ(消息队列)发送消息关照生意业务服务,以完成订单状态的同步。
[*] 消息可靠性策略:
[*]接纳生产者确认机制、消费者确认和消费者失败重试等策略,确保消息的可靠投递和处理。
[*]开启MQ的长期化功能,避免因服务宕机导致消息丢失。
[*] 业务幂等性:
[*]在生意业务服务更新订单状态时进行业务幂等性判定,防止因消息重复消费导致订单状态异常。
如果生意业务服务消息处理失败,有什么兜底方案?
[*]定时任务:
[*]在生意业务服务中设置定时任务,定期查询订单的付出状态。
[*]即使MQ关照失败,定时任务也可以作为兜底方案,确保订单付出状态的最终同等性。
延迟消息
延迟消息是消息队列中的一种重要功能,它允许消息在被发送到消息队列后并不会立即被消费者消费,而是在经过特定的时间延迟后才华被消费者获取和处理。这种特性在很多业务场景中都非常有用,比如订单处理超时、定时提醒等。
比如,用户A下单某商品的末了一件,订单确认后,迟迟不付出,但是又占用着这个名额。比及很久以后取消这个订单,此时想买的人没买到,商家没有卖掉,而这个人又没有买。这种情况用延迟消息就能很好的办理这个问题。当用户下单商品后,会设置一个延迟消息,假设30分钟内没有下单,这个延迟消息就会被发送到MQ,提醒数据库这个人订单超时了,强制让这个人取消订单。
死信交换机
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
[*]消费者利用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
[*]消息是一个逾期消息(到达了队列或消息自己设置的逾期时间),超时无人消费
[*]要投递的队列消息堆积满了,最早的消息可能成为死信
死信消息,经过死信交换机可以酿成延迟消息。
https://i-blog.csdnimg.cn/direct/5463418696524d608d9ac5bc6d93c808.png
当publisher发布一个消息后,通过交换机进入队列。通过手动设置一个逾期时间,让消息酿成死信消息,此时消息会主动进入通过dead-letter-exchang设置的交换机dlx.direct,再一步步的进入到consumer。
延迟消息插件
RabbitMO的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机
当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
@RabbitListener(bindings=@QueueBinding(
value=@Queue(name="delay.queue", durable="true"),
exchange=@Exchange(name="delay.direct",delayed="true"),
key="delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}",msg);
} @Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct")
.delayed()//设置delay的属性为true
.durable(true)//持久化
.build();
} 发送消息时需要通过消息头x-delay来设置逾期时间:
@Test
void testPublisherDelayMessage() {
//1.创建消息
String message = "hello, delayed message";
//2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]