本文是博主在记录使用 RabbitMQ 在执行业务时遇到的问题和办理办法,因此查阅了相干资料并做了以下记载,记录了 Ack 的机制和使用要点,以及所带来的危害。
1、消息确认 ACK 机制
在大型项目中的开发中必然会经历到数据的处置惩罚,会使用到消息中间件,而 RabbitMQ 作为主流消息中间件被应用于各大项目。RabbitMQ 焦点架构包罗互换机(Exchange)和队列(Queue)两大组件。互换机负责将消息按预设路由规则分发至对应队列,队列则作为消息暂存容器实现生产者与消耗者的解耦。
在消息处置惩罚流程中,当消耗者在处置惩罚队列中的消息时发生非常,极有可能导致当前处置惩罚的消息未完成消耗确认,进而造成数据丢失。为办理这一问题,RabbitMQ提供了消息确认(ACK)机制。该机制通过要求消耗者在完成消息处置惩罚后发送显式确认,确保纵然发生非常情况,消息仍能保存在队列中举行重新投递,从而构建起可靠的消息通报体系。
ACK 机制是消息可靠性的保障!
那么具体的ACK的消息确认机制是什么?
ACK 机制是消耗者从 MQ 收到消息并处置惩罚完成后反馈给 MQ ,MQ 收到反馈后才将此消息从队列中删除。
上面的表明通报了几种信息,下面开始分析。
- 触发时机是在消耗者消耗消息后。
- 形式是消耗者对 RabbitMQ 的一个反馈举动。
- ACK 机制乐成会导致被消耗的消息从队枚举行清除。
- 不乐成则不会举行清除消息。
上面的分析可以看出 ACK 机制能保障了消息通报的可靠性。
2、机制工作原理
- 消耗者从 MQ 队列中获取消息,当收到消息时会使用设定好的业务逻辑开始处置惩罚消息,转换或存储。
- 如果消耗者乐成处置惩罚消息,会向 MQ 发送一个 ACK 信号,告知消息已被乐成消耗。
- MQ 收到 ACK 信号反馈后,会将该消息从队列中删除。
- 如果消耗者在处置惩罚消息时出现非常(如网络不稳固、服务器非常等),无法发送 ACK 信号,MQ 会认为该消息未被正常消耗,将其重新放回队列。
- 在集群环境下,MQ 会将该消息推送给其他在线的消耗者,在消耗者都失败后或没有消耗者也会视为未被正常消耗。
这种机制包管了在消耗者服务端故障的时间,不丢失任何消息和任务。消息永远不会从 MQ 中删除,只有当消耗者正确发送 ACK 反馈,MQ 确认收到后,消息才会从队列的中删除。
3、手动确认与主动确认
消息的 ACK 确认机制默认是打开的。有手动确认和主动确认两种。
- 手动确认:消耗者在调用 channel.basicConsume() 消耗消息时,设置 autoAck=false,进入手动确认模式。此时,消耗者需要显式调用 channel.basicAck() 方法来确认消息。如果处置惩罚失败,可以调用 channel.basicNack() 或 channel.basicReject() 方法拒绝消息,RabbitMQ 会将消息重新入队。
- 主动确认:当 autoAck=true 时,RabbitMQ 会主动将发送出去的消息置为确认状态,并从队列中删除。
主动确认方式不推荐,因为它无法包管消息真正被消耗者处置惩罚乐成,可能会导致消息丢失。
参数说明
- deliveryTag:消息的唯一标识,用于确认具体的消息。
- multiple:当设置为 true 时,表示确认当前 deliveryTag 及之前所有未确认的消息;设置为 false 时,仅确认当前的 deliveryTag。
- requeue:在拒绝消息时,如果设置为 true,消息会重新入队;如果设置为 false,消息可能会被丢弃或进入死信队列。
4、机制的注意事项
如果消耗者忘记发送 ACK 信号,那么后果很严峻。当消耗者退出时间,消息会被一直重新分发。然后 MQ会占用越来越多的内存,从而引发 内存走漏 ,由于 MQ 长时间运行,因此这个内存走漏是致命的。
为克制内存走漏,可以在开发中举行非常捕获,确保消耗者程序正常执行。同时,可以通过配置 重试次数 来防止消息无穷重试。
在我的另一片文章中有具体的代码示例,是关于怎样使用 SpringBoot 实现 RabbitMQ 的示例代码,里面有这么几个代码片断:
- @Bean
- public SimpleMessageListenerContainer contactSyncContainer() {
- if (!rabbitMqConfig.isUse()) {
- return null;
- }
- log.info("contact begin");
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
- container.setConcurrentConsumers(1);
- container.setMaxConcurrentConsumers(1);
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
- //设置一个队列
- container.setQueueNames(rabbitMqConfig.getQueueOnlineInfo());
- container.setMessageListener(infoConsumer);
- log.info("contact end");
- return container;
- }
复制代码
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL); 这段代码是实现手动确认消息。
- AcknowledgeMode 这个枚举类中存在三种方式:NONE、MANUAL 和 AUTO。
- container 中具有参数 prefetchCount ,这个参数是计算在 ACK 机制前可以允许取多少条消息。
- infoConsumer 就是我们定义的消耗者,接收消息并反馈 ACK。
- channel.basicAck(deliveryTag, false); 接收消息并返回 ACK 的方法,删除队列消息。
- channel.basicReject(deliveryTag, false); 接收消息失败,不删除队列消息。
综上所述:RabbitMQ 的 ACK 机制通过确认消息的处置惩罚状态,确保消息的可靠通报,克制消息丢失,是 RabbitMQ 包管消息可靠性的关键机制之一。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |