IT评测·应用市场-qidao123.com技术社区
标题:
RabbitMQ 的 Ack 机制是什么?怎么合理使用它?
[打印本页]
作者:
熊熊出没
时间:
2025-4-9 17:49
标题:
RabbitMQ 的 Ack 机制是什么?怎么合理使用它?
本文是博主在记录使用 RabbitMQ 在执行业务时遇到的问题和办理办法,因此查阅了相干资料并做了以下记载,记录了 Ack 的机制和使用要点,以及所带来的危害。
1、消息确认 ACK 机制
在大型项目中的开发中必然会经历到数据的处置惩罚,会使用到消息中间件,而 RabbitMQ 作为主流消息中间件被应用于各大项目。RabbitMQ 焦点架构包罗互换机(Exchange)和队列(Queue)两大组件。互换机负责将消息按预设路由规则分发至对应队列,队列则作为消息暂存容器实现生产者与消耗者的解耦。
在消息处置惩罚流程中,当消耗者在处置惩罚队列中的消息时发生非常,极有可能导致当前处置惩罚的消息未完成消耗确认,进而造成数据丢失。为办理这一问题,RabbitMQ提供了消息确认(ACK)机制。该机制通过要求消耗者在完成消息处置惩罚后发送显式确认,确保纵然发生非常情况,消息仍能保存在队列中举行重新投递,从而构建起可靠的消息通报体系。
ACK 机制是消息可靠性的保障!
那么具体的ACK的消息确认机制是什么?
ACK 机制是消耗者从 MQ 收到消息并处置惩罚完成后反馈给 MQ ,MQ 收到反馈后才将此消息从队列中删除。
上面的表明通报了几种信息,下面开始分析。
触发时机是在消耗者消耗消息后。
形式是消耗者对 RabbitMQ 的一个反馈举动。
ACK 机制乐成会导致被消耗的消息从队枚举行清除。
不乐成则不会举行清除消息。
上面的分析可以看出 ACK 机制能保障了消息通报的可靠性。
2、机制工作原理
消耗者从 MQ 队列中获取消息,当收到消息时会使用设定好的业务逻辑开始处置惩罚消息,转换或存储。
如果消耗者乐成处置惩罚消息,会向 MQ 发送一个 ACK 信号,告知消息已被乐成消耗。
MQ 收到 ACK 信号反馈后,会将该消息从队列中删除。
如果消耗者在处置惩罚消息时出现非常(如网络不稳固、服务器非常等),无法发送 ACK 信号,MQ 会认为该消息未被正常消耗,将其重新放回队列。
在集群环境下,MQ 会将该消息推送给其他在线的消耗者,在消耗者都失败后或没有消耗者也会视为未被正常消耗。
这种机制包管了在消耗者服务端故障的时间,不丢失任何消息和任务。消息永远不会从 MQ 中删除,只有当消耗者正确发送 ACK 反馈,MQ 确认收到后,消息才会从队列的中删除。
3、手动确认与主动确认
消息的 ACK 确认机制默认是打开的。有手动确认和主动确认两种。
手动确认:消耗者在调用 channel.basicConsume() 消耗消息时,设置 autoAck=false,进入手动确认模式。此时,消耗者需要显式调用 channel.basicAck() 方法来确认消息。如果处置惩罚失败,可以调用 channel.basicNack() 或 channel.basicReject() 方法拒绝消息,RabbitMQ 会将消息重新入队。
主动确认:当 autoAck=true 时,RabbitMQ 会主动将发送出去的消息置为确认状态,并从队列中删除。
主动确认方式不推荐,因为它无法包管消息真正被消耗者处置惩罚乐成,可能会导致消息丢失。
参数说明
deliveryTag:消息的唯一标识,用于确认具体的消息。
multiple:当设置为 true 时,表示确认当前 deliveryTag 及之前所有未确认的消息;设置为 false 时,仅确认当前的 deliveryTag。
requeue:在拒绝消息时,如果设置为 true,消息会重新入队;如果设置为 false,消息可能会被丢弃或进入死信队列。
4、机制的注意事项
如果消耗者忘记发送 ACK 信号,那么后果很严峻。当消耗者退出时间,消息会被一直重新分发。然后 MQ会占用越来越多的内存,从而引发 内存走漏 ,由于 MQ 长时间运行,因此这个内存走漏是致命的。
为克制内存走漏,可以在开发中举行非常捕获,确保消耗者程序正常执行。同时,可以通过配置 重试次数 来防止消息无穷重试。
在我的另一片文章中有具体的代码示例,是关于怎样使用 SpringBoot 实现 RabbitMQ 的示例代码,里面有这么几个代码片断:
@Bean
public SimpleMessageListenerContainer contactSyncContainer() {
if (!rabbitMqConfig.isUse()) {
return null;
}
log.info("contact begin");
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
//设置一个队列
container.setQueueNames(rabbitMqConfig.getQueueOnlineInfo());
container.setMessageListener(infoConsumer);
log.info("contact end");
return container;
}
复制代码
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); 这段代码是实现手动确认消息。
AcknowledgeMode 这个枚举类中存在三种方式:NONE、MANUAL 和 AUTO。
container 中具有参数 prefetchCount ,这个参数是计算在 ACK 机制前可以允许取多少条消息。
infoConsumer 就是我们定义的消耗者,接收消息并反馈 ACK。
channel.basicAck(deliveryTag, false); 接收消息并返回 ACK 的方法,删除队列消息。
channel.basicReject(deliveryTag, false); 接收消息失败,不删除队列消息。
综上所述:RabbitMQ 的 ACK 机制通过确认消息的处置惩罚状态,确保消息的可靠通报,克制消息丢失,是 RabbitMQ 包管消息可靠性的关键机制之一。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 IT评测·应用市场-qidao123.com技术社区 (https://dis.qidao123.com/)
Powered by Discuz! X3.4