马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
×
在 Spring Boot 集成 RabbitMQ 时,可以通过以下几种方式让 RabbitMQ 保存服务断开重连,以保证高可用:
配置自动重连
- application.properties 配置 :在 Spring Boot 的配置文件 application.properties 中,可以设置 RabbitMQ 的毗连工厂相干参数来开启自动重连功能。
- spring.rabbitmq.listener.simple.recovery-interval:设置自动重连的时间隔断,在指定的时间隔断后会尝试重新建立毗连。 spring.rabbitmq.connection-timeout:设置毗连超时时间,单位为毫秒。如果在指定的时间内无法完成毗连,就会以为毗连失败,之后会根据其他重连机制进行尝试。
- 代码配置 :可以在配置类中通过编程的方式配置毗连工厂的相干参数来实现自动重连。
- @Bean
- public ConnectionFactory rabbitConnectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- connectionFactory.setConnectionTimeout(5000);
- connectionFactory.setRequestedHeartbeat(20);
- connectionFactory.setAutomaticRecoveryEnabled(true);
- connectionFactory.setTopologyRecoveryEnabled(true);
- return connectionFactory;
- }
复制代码 此中,setAutomaticRecoveryEnabled(true) 启用自动重连功能,setTopologyRecoveryEnabled(true) 表示在重连时会自动恢复队列、交换器等拓扑结构。
利用重试机制
- @Retryable 注解 :在发送消息的方法上添加@Retryable注解,当发送消息失败时,会根据注解的配置进行重试。
- @Retryable(value = {AmqpException.class}, maxAttempts = 3, backoff = @Backoff(delay = 3000))
- public void sendMessage(String message) {
- rabbitTemplate.convertAndSend("exchange", "routingKey", message);
- }
复制代码 这里maxAttempts指定了最大重试次数为 3 次,backoff指定了重试的延迟时间为 3000 毫秒。
- RetryTemplate 配置 :可以通过配置RetryTemplate来自定义重试计谋,比方设置重试的隔断、重试的次数等,然后将其注入到消息发送相干的类中利用。
- @Bean
- public RetryTemplate retryTemplate() {
- RetryTemplate retryTemplate = new RetryTemplate();
- FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
- fixedBackOffPolicy.setBackOffPeriod(3000);
- retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
- SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
- retryPolicy.setMaxAttempts(3);
- retryTemplate.setRetryPolicy(retryPolicy);
- return retryTemplate;
- }
复制代码 在发送消息的方法中利用retryTemplate.execute来实验发送消息的操作。
集群模式
- 搭建 RabbitMQ 集群 :将多个 RabbitMQ 服务器组成一个集群,当一个节点出现故障时,客户端可以自动毗连到其他可用的节点上。在 Spring Boot 中,可以通过配置多个 RabbitMQ 节点的地址来实现与集群的毗连。
- @Bean
- public ConnectionFactory rabbitConnectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- List<String> addresses = new ArrayList<>();
- addresses.add("amqp://guest:guest@localhost:5672");
- addresses.add("amqp://guest:guest@localhost:5673");
- addresses.add("amqp://guest:guest@localhost:5674");
- connectionFactory.setAddresses(addresses);
- connectionFactory.setUsername("guest");
- connectionFactory.setPassword("guest");
- return connectionFactory;
- }
复制代码 这样,当毗连到某个节点失败时,会自动尝试毗连其他节点。
消息持久化
- 消息持久化 :将消息设置为持久化存储,这样在 RabbitMQ 服务重启后,消息不会丢失。可以通过在发送消息时设置消息的持久化标志来实现。
- rabbitTemplate.convertAndSend("exchange", "routingKey", message, messagePostProcessor -> {
- messagePostProcessor.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
- return messagePostProcessor;
- });
复制代码 同时,还需要将队列设置为持久化,在声明队列时指定durable参数为true。- Queue queue = new Queue("queueName", true);
复制代码 消费者确认机制
- 手动确认 :在消息消费完成后,手动发送确认消息,这样在服务断开重连后,RabbitMQ 会重新发送未确认的消息,保证消息的可靠性。
- @RabbitListener(queues = "queueName")
- public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
- try {
- // 消息处理逻辑
- channel.basicAck(tag, false);
- } catch (Exception e) {
- channel.basicNack(tag, false, true);
- }
- }
复制代码 如果消息处理乐成,则发送basicAck确认消息;如果处理失败,则发送basicNack拒绝消息,并将消息重新入队。
监控 和告警
- 监控
工具 :利用监控 工具如 Prometheus 和 Grafana 等来监控 RabbitMQ 的运行状态,包括毗连数、队列长度、消息速率等。当出现异常情况时,实时发现并进行处理。
- 告警机制 :配置告警规则,当监控指标超过设定的阈值时,触发告警通知,实时通知运维人员进行干预,确保 RabbitMQ 的稳固运行。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
|