题目:在生产环境中由于一些不明缘故起因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,必要手动处置惩罚和规复。于是,我们开始思考,怎样才华举行 RabbitMQ 的消息可靠投递呢? 特别是在如许比力极度的环境,RabbitMQ 集群不可用的时间,无法投递的消息该如那边理呢?
办理方案就是缓存,比如当生产者发送消息到互换机时,但互换机不存在,我们应该将消息放入缓存中;大概互换机存在,队列不存在了,当互换机发送不到队列中也应该将消息放入缓存。然后在缓存中设置一个定时使命,对没有发送乐成的消息重新举行投递。如许就制止了消息丢失的环境。
回调接口——消息确认
接下来我们通过代码实现以上机制,架构图如下所示:我们要办理题目就是如果图中的互换机大概队列出现题目,应该将消息举行缓存处置惩罚,防止消息丢失,具体的实现就是通过生产者的回调接口ConfirmCallback来实现。
1️⃣ 修改设置文件
在设置文件当中必要添加设置表现开开导布消息乐成到互换器后会触发回调方法
NONE:禁用发布确认模式,是默认值
CORRELATED:发布消息乐成到互换器后会触发回调方法
SIMPLE:经测试有两种结果,其一结果和 CORRELATED 值一样会触发回调方法;其二在发布消息乐成后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法期待 broker 节点返回发送结果,根据返回结果来判断下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
👂这个条理是在互换机条理做的工作,包管消息被准确发送到了互换机。
通过实现一个RabbitTemplate.ConfirmCallback接口,将接口注入到RabbitTemplate中,当消息发送到互换机后就会触发这个回调。如果失败了可以思量进入死信队列大概重新发送。但是做不到队列层面的工作。- /**
- * 交换机确认回调方法
- *
- * @param correlationData 保存回调消息的ID以及相关信息
- * @param ack 表示交换机是否收到消息(true表示收到)
- * @param cause 表示消息接收失败的原因(收到消息为null)
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- String id = correlationData != null ? correlationData.getId() : "";
- if (ack) {
- log.info("交换机已经收到ID为:{}的消息", id);
- } else {
- log.info("交换机还未收到ID为:{}的消息,原因为:{}", id, cause);
- }
- }
复制代码 回调接口——消息回退
我们知道在仅开启了生产者确认机制的环境下,互换机吸收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接抛弃,但此时生产者是不知道消息被抛弃这个变乱的。那么怎样让无法被路由的消息可以或许让生产者感知并做出处置惩罚呢
我们可以通过设置 mandatory 参数可以在当消息通报过程中不可达目标地时将消息返回给生产者。
在设置文件当中必要添加设置表现开启消息路由失败后会触发消息回退回调方法
👂通过实现 RabbitTemplate.ReturnsCallback 接口- @Override
- public void returnedMessage(ReturnedMessage returned) {
- log.info("消息{}:,被交换机{}退回,退回原因:{},路由key:{}",
- new String(returned.getMessage().getBody()),
- returned.getExchange(),
- returned.getReplyText(),
- returned.getRoutingKey());
- }
复制代码 备份互换机
前面我们提到互换机如果出现了题目担当不到消息,我们就让互换机举行消息确认,让生产者重新发消息。如果队列出题目收不到消息,我们就举行消息回退,也是让生产者重新发消息。别的,尚有一种办理方法就是给互换机添加一个备份互换机,有了备份互换机之后可以不消讲消息回退给生产者,而是将无法投递的消息交给备份互换机,让备份互换机通过自己的路由以及自己的队列发送给斲丧者,如许也能到达一个消息不丢失的目标。而且这种方式还能创建一个报警队列,用独立的斲丧者举行监测和报警。
当回调函数和备用互换机一起使用的时间,备份互换机优先级高。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!qidao123.com:ToB企服之家,中国第一个企服评测及软件市场,开放入驻,技术点评得现金 |