ToB企服应用市场:ToB评测及商务社交产业平台
标题:
RabbitMQ 高级特性——重试机制
[打印本页]
作者:
南七星之家
时间:
2024-10-23 00:01
标题:
RabbitMQ 高级特性——重试机制
前言
前面我们学习了 RabbitMQ 包管消息通报可靠性的机制——消息确认、长期化和发送发确认,那么对于消息确认和发送方确认,如果吸收方没有收到消息,那么这时该怎么做呢?这就要提到 RabbitMQ 的重试机制了,当发送方发送的消息没有成功到达吸收方,那么发送方就会使用重试机制来向吸收方重新发送消息,如果吸收方由于步伐逻辑错误引起的,那么不管发送方重新发送多少次,吸收方都不能正确的处置惩罚这个消息,那么发送方也不能无休止的向吸收方发送消息,当发送的次数超过某个数目的时候,那么这个消息就会被标记为死信,然后被处置惩罚掉。那么这篇文章将详细的说明一下 RabbitMQ 的重试机制。
重试机制
RabbitMQ的重试机制是一种强大的功能,它允许在消息处置惩罚失败时自动重试,从而提高体系的可靠性和稳定性。
重试机制的触发条件
消息发送失败:当消息发送到RabbitMQ服务器时,如果由于网络题目、认证失败或其他缘故原由导致发送失败,RabbitMQ会尝试重试发送。
消息消费失败:当消费者从队列中获取消息并尝试处置惩罚时,如果由于某种缘故原由(如业务逻辑错误、体系异常等)导致处置惩罚失败,RabbitMQ会根据预设的重试策略进行重试。
我们都知道 Spring AMQP 提供了四种确认策略:NONE、MANUAL、AUTO。当确认策略为 NONE 的时候队列不管这个消息是否被成功处置惩罚都会将这个消息从队列中删除,而确认策略为 MANUAL 的时候则更多的靠步伐本身处置惩罚,那么重试机制更多的使用在确认策略为 AUTO 的环境下。
配置文件设置
spring:
rabbitmq:
addresses: amqp://admin:admin@x.x.x.x:5672/test
listener:
simple:
acknowledge-mode: auto # 确认模式
retry:
enabled: true # 开始重试机制
initial-interval: 5000ms # 每次重试的间隔时间
max-attempts: 5 # 最大重试次数
复制代码
生命交换机、队列和绑定关系
public class Constants {
public static final String RETRY_EXCHANGE = "retry.exchange";
public static final String RETRY_QUEUE = "retry.queue";
}
复制代码
@Configuration
public class RabbitConfig {
@Bean("retryExchange")
public DirectExchange retryExchange() {
return ExchangeBuilder.directExchange(Constants.RETRY_EXCHANGE).durable(true).build();
}
@Bean("retryQueue")
public Queue retryQueue() {
return QueueBuilder.durable(Constants.RETRY_QUEUE).build();
}
@Bean("retryBinding")
public Binding retryBinding(@Qualifier("retryQueue") Queue queue, @Qualifier("retryExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("retry");
}
}
复制代码
生产者发送消息
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/retry")
public String retry() {
rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"retry","rabbitmq retry");
return "消息发送成功";
}
}
复制代码
消费消息
@Component
public class RetryListener {
@RabbitListener(queues = Constants.RETRY_QUEUE)
public void listener(Message message, Channel channel) {
System.out.println("接收到消息:" + message + channel);
//制造出异常使得消费者不能正常处理消息
int ret = 3/0;
System.out.println("消息处理成功");
}
}
复制代码
我们先将重试机制的配置给注释掉,然后运行之后会发现消费者会重复读取一条消息而且不停抛异常,这是由于:
由于监听器方法抛出了异常而且没有被捕获,Spring AMQP将不会发送ACK,RabbitMQ将不停重新投递这条消息给消费者,直到它终极被确认、被拒绝(并可能发送到死信队列)或队列被删除。
那么我们配置重试机制的重试次数之后再看看什么效果:
可以发现,当配置了重试次数之后,包括第一次投递消息,队列一共给消费者发送了五次消息,而且这个五次的消息的 deliveryTag 是一样的,也就说明是队列的重试机制投递的,五次之后队列便不再向该消费者发送该消息,这样就制止了因消费者的步伐逻辑题目而导致队列无休止的向消费者发送消息的题目,而且还包管了消息通报的可靠性。
那么我们将消息简直认策略更换为 MANUAL 手动确认的方式,然后看看这个重试机制的配置会不会见效:
@RabbitListener(queues = Constants.RETRY_QUEUE)
public void listener(Message message, Channel channel) throws IOException, InterruptedException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("接收到消息:" + message + channel);
//制造出异常使得消费者不能正常处理消息
int ret = 3/0;
System.out.println("消息处理成功");
channel.basicAck(deliveryTag,false);
} catch (Exception e) {
Thread.sleep(1000);
//第三个参数requeue表示被拒绝的消息是否重新进入队列,true表示重新进入队列并且重新投递这个消息,否则直接丢弃
channel.basicNack(deliveryTag,false,true);
}
}
复制代码
发现当确认策略为 manual 手动确认的时候,我们的重试机制的配置是不见效的,而且可以发现我们的消息的 deliveryTag 是不停递增的,也就是说之前拒绝的消息每次都会重新入队列然后重新投递,跟重试机制是不一样的。
而如果我们 basicNack 方法的第三个参数 requeue 参数的值为 fasle 时候,那么这个被拒绝的消息就会被丢弃或者投递到的死信队列中。
使用重试机制时需要注意:
自动确认模式下:步伐逻辑出现异常,即使多次重试还是失败,消息也会被自动确认,那么这条消息就会丢失。
在手动确认模式下,消费者需要显式地发送ACK(Acknowledgment)或NACK来告诉RabbitMQ消息是否已被成功处置惩罚。如果发生异常且没有发送ACK,消息将保持为unacked状态,这允许RabbitMQ重新将消息发送给其他消费者(如果配置了多个消费者)或根据队列的其他配置(如死信队列)来处置惩罚未确认的消息。然而,如果全部消费者都无法处置惩罚该消息,它将导致消息在队列中积压,除非配置了得当的超时机制或死信队列来处置惩罚这些长时间未确认的消息。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4