MQ 消息发送可靠性包管 —— 整合 Spring Retry 重试框架 + 赔偿发送方案 ...

打印 上一主题 下一主题

主题 954|帖子 954|积分 2862

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
MQ 消息发送可靠性包管 —— 整合 Spring Retry 重试框架 + 赔偿发送方案

RocketMQ Starter 本身提供重试机制较为简单,无法指定较复杂的重试策略
复杂的重试策略 RocketMQ 无法直接配置:

  • 隔断和延迟策略: RocketMQ 本身的重试机制没有内建对重试隔断和延迟时间的高级控制。例如,你不能简单地配置每次重试的延迟时间和隔断时间,或者实现指数级回退的策略。全部的重试都是在一个固定的时间内进行的,缺少对每次重试隔断的控制。
  • 定制化重试规则: 如果你想要一个更复杂的重试规则(如重试隔断时间渐渐增加、利用差别的隔断策略等),RocketMQ 的默认重试机制就比较难以满意。这种情况下,你需要自界说重试逻辑,比如利用 Spring Retry 来实现更复杂的策略。
Spring Retry 是一个用于为应用步伐提供主动重试功能的框架,特殊适用于实行大概会因暂时性题目失败的操作(如网络请求、数据库操作、消息队列操作等)。通过配置,Spring Retry 可以或许在失败时主动重试指定次数,且每次重试可以配置差别的延迟和隔断。
为什么利用 Spring Retry:

Spring Retry 的上风在于它可以或许提供比 RocketMQ 更细粒度的控制。你可以利用 Spring Retry 来设置如下复杂的策略:


  • 自界说重试次数: 你可以机动设置最大重试次数。
  • 延迟策略: 可以配置差别的延迟时间,支持指数回退、固定隔断、随机延迟等。
  • 重试隔断的乘法: 支持每次重试隔断成倍增加的策略(如 2x、3x 等)。
  • 失败回调: 可以界说失败后实行的回调,如写入数据库等操作。
因此,RocketMQ 的内建重试机制在某些特定场景下,尤其是需要复杂隔断、延迟或其他高级控制时,大概不如 Spring Retry 这么机动。在这种情况下,结合 Spring Retry 进行二次封装,可以或许提供更强大、更机动的重试控制。
1.添加依靠

  1.      <!-- Spring Retry 重试框架  -->
  2.         <dependency>
  3.             <groupId>org.springframework.retry</groupId>
  4.             <artifactId>spring-retry</artifactId>
  5.         </dependency>
  6.         
  7.         <!-- AOP 切面(Spring Retry 重试框架需要) -->
  8.         <dependency>
  9.             <groupId>org.springframework.boot</groupId>
  10.             <artifactId>spring-boot-starter-aop</artifactId>
  11.         </dependency>
复制代码
2.启用retry

启动类上加上@EnableRetry
3.重试配置

Retry-yaml

  1. retry:
  2.   max-attempts: 3 # 最大重试次数
  3.   init-interval: 1000 # 初始延迟时间,单位 ms
  4.   multiplier: 2 # 每次重试间隔加倍(每次乘以 2)
复制代码
RetryProperties

  1. @ConfigurationProperties(prefix = RetryProperties.PREFIX)
  2. @Component
  3. @Data
  4. public class RetryProperties {
  5.     public static final String PREFIX = "retry";
  6.     /**
  7.      * 最大重试次数
  8.      */
  9.     private Integer maxAttempts = 3;
  10.     /**
  11.      * 初始间隔时间,单位 ms
  12.      */
  13.     private Integer initInterval = 1000;
  14.     /**
  15.      * 乘积(每次乘以 2)
  16.      */
  17.     private Double multiplier = 2.0;
  18. }
复制代码
RetryTemplate

  1. @Configuration
  2. public class RetryConfig {
  3.     @Resource
  4.     private RetryProperties retryProperties;
  5.     @Bean
  6.     public RetryTemplate retryTemplate() {
  7.         RetryTemplate retryTemplate = new RetryTemplate();
  8.         // 定义重试策略(最多重试 3 次)
  9.         SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
  10.         retryPolicy.setMaxAttempts(retryProperties.getMaxAttempts()); // 最大重试次数
  11.         // 定义间隔策略
  12.         ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
  13.         backOffPolicy.setInitialInterval(retryProperties.getInitInterval()); // 初始间隔 2000ms
  14.         backOffPolicy.setMultiplier(retryProperties.getMultiplier());       // 每次乘以 2
  15.         retryTemplate.setRetryPolicy(retryPolicy);
  16.         retryTemplate.setBackOffPolicy(backOffPolicy);
  17.         return retryTemplate;
  18.     }
  19. }
复制代码
4.配置线程池

  1. @Configuration
  2. public class ThreadPoolConfig {
  3.     @Bean(name = "taskExecutor")
  4.     public Executor taskExecutor() {
  5.         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  6.         // 核心线程数
  7.         executor.setCorePoolSize(10);
  8.         // 最大线程数
  9.         executor.setMaxPoolSize(50);
  10.         // 队列容量
  11.         executor.setQueueCapacity(200);
  12.         // 线程活跃时间(秒)
  13.         executor.setKeepAliveSeconds(30);
  14.         // 线程名前缀
  15.         executor.setThreadNamePrefix("NoteExecutor-");
  16.         // 拒绝策略:由调用线程处理(一般为主线程)
  17.         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  18.         // 等待所有任务结束后再关闭线程池
  19.         executor.setWaitForTasksToCompleteOnShutdown(true);
  20.         // 设置等待时间,如果超过这个时间还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞
  21.         executor.setAwaitTerminationSeconds(60);
  22.         executor.initialize();
  23.         return executor;
  24.     }
  25. }
复制代码
5.配置mq异步发送工具类

  1. @Component
  2. @Slf4j
  3. public class SendMqRetryHelper {
  4.     @Resource
  5.     private RocketMQTemplate rocketMQTemplate;
  6.     @Resource
  7.     private RetryTemplate retryTemplate;
  8.     @Resource(name = "taskExecutor")
  9.     private ThreadPoolTaskExecutor threadPoolTaskExecutor;
  10.     /**
  11.      * 异步发送 MQ
  12.      * @param topic
  13.      */
  14.     public void asyncSend(String topic, String body) {
  15.         log.info("==> 开始异步发送 MQ, Topic: {}, Body: {}", topic, body);
  16.         // 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中
  17.         Message<String> message = MessageBuilder.withPayload(body)
  18.                 .build();
  19.         // 异步发送 MQ 消息,提升接口响应速度
  20.         rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
  21.             @Override
  22.             public void onSuccess(SendResult sendResult) {
  23.                 log.info("==> 【评论发布】MQ 发送成功,SendResult: {}", sendResult);
  24.             }
  25.             @Override
  26.             public void onException(Throwable throwable) {
  27.                 log.error("==> 【评论发布】MQ 发送异常: ", throwable);
  28.                 handleRetry(topic, message);
  29.             }
  30.         });
  31.     }
  32.     /**
  33.      * 重试处理
  34.      * @param topic
  35.      * @param message
  36.      */
  37.     private void handleRetry(String topic, Message<String> message) {
  38.         // 异步处理
  39.         threadPoolTaskExecutor.submit(() -> {
  40.             try {
  41.                 // 通过 retryTemplate 执行重试
  42.                 retryTemplate.execute((RetryCallback<Void, RuntimeException>) context -> {
  43.                     log.info("==> 开始重试 MQ 发送, 当前重试次数: {}, 时间: {}", context.getRetryCount() + 1, LocalDateTime.now());
  44.                     // 同步发送 MQ
  45.                     rocketMQTemplate.syncSend(topic, message);
  46.                     return null;
  47.                 });
  48.             } catch (Exception e) {
  49.                 // 多次重试失败,进入兜底方案
  50.                 fallback(e, topic, message.getPayload());
  51.             }
  52.         });
  53.     }
  54.     /**
  55.      * 兜底方案: 将发送失败的 MQ 写入数据库,之后,通过定时任务扫表,将发送失败的 MQ 再次发送,最终发送成功后,将该记录物理删除
  56.      */
  57.     private void fallback(Exception e, String topic, String bodyJson) {
  58.         log.error("==> 多次发送失败, 进入兜底方案, Topic: {}, bodyJson: {}", topic, bodyJson);
  59.         // TODO:
  60.     }
  61. }
复制代码
首先,RocketMQ 会异步发送消息并进行重试(取决于你的配置)。
如果 RocketMQ 异步发送失败而且重试 3 次后依然失败,onException 方法被调用。
在 onException 中,handleRetry 方法会被触发,该方法会调用 retryTemplate.execute(...) 来进行同步重试(取决于你的配置)。
如果全部重试失败,则会调用 fallback 方法进行兜底处置惩罚。
通常的话,这里mq的重试就配置成0了因为我们已经自己封装了重试机制
6.服务层利用

  1. // 发送 MQ (包含重试机制)
  2.         sendMqRetryHelper.asyncSend(MQConstants.TOPIC, JsonUtils.toJsonString(MqDTO));
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大连密封材料

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表