马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
MQ 消息发送可靠性包管 —— 整合 Spring Retry 重试框架 + 赔偿发送方案
RocketMQ Starter 本身提供重试机制较为简单,无法指定较复杂的重试策略
复杂的重试策略 RocketMQ 无法直接配置:
- 隔断和延迟策略: RocketMQ 本身的重试机制没有内建对重试隔断和延迟时间的高级控制。例如,你不能简单地配置每次重试的延迟时间和隔断时间,或者实现指数级回退的策略。全部的重试都是在一个固定的时间内进行的,缺少对每次重试隔断的控制。
- 定制化重试规则: 如果你想要一个更复杂的重试规则(如重试隔断时间渐渐增加、利用差别的隔断策略等),RocketMQ 的默认重试机制就比较难以满意。这种情况下,你需要自界说重试逻辑,比如利用 Spring Retry 来实现更复杂的策略。
Spring Retry 是一个用于为应用步伐提供主动重试功能的框架,特殊适用于实行大概会因暂时性题目失败的操作(如网络请求、数据库操作、消息队列操作等)。通过配置,Spring Retry 可以或许在失败时主动重试指定次数,且每次重试可以配置差别的延迟和隔断。
为什么利用 Spring Retry:
Spring Retry 的上风在于它可以或许提供比 RocketMQ 更细粒度的控制。你可以利用 Spring Retry 来设置如下复杂的策略:
- 自界说重试次数: 你可以机动设置最大重试次数。
- 延迟策略: 可以配置差别的延迟时间,支持指数回退、固定隔断、随机延迟等。
- 重试隔断的乘法: 支持每次重试隔断成倍增加的策略(如 2x、3x 等)。
- 失败回调: 可以界说失败后实行的回调,如写入数据库等操作。
因此,RocketMQ 的内建重试机制在某些特定场景下,尤其是需要复杂隔断、延迟或其他高级控制时,大概不如 Spring Retry 这么机动。在这种情况下,结合 Spring Retry 进行二次封装,可以或许提供更强大、更机动的重试控制。
1.添加依靠
- <!-- Spring Retry 重试框架 -->
- <dependency>
- <groupId>org.springframework.retry</groupId>
- <artifactId>spring-retry</artifactId>
- </dependency>
-
- <!-- AOP 切面(Spring Retry 重试框架需要) -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-aop</artifactId>
- </dependency>
复制代码 2.启用retry
启动类上加上@EnableRetry
3.重试配置
Retry-yaml
- retry:
- max-attempts: 3 # 最大重试次数
- init-interval: 1000 # 初始延迟时间,单位 ms
- multiplier: 2 # 每次重试间隔加倍(每次乘以 2)
复制代码 RetryProperties
- @ConfigurationProperties(prefix = RetryProperties.PREFIX)
- @Component
- @Data
- public class RetryProperties {
- public static final String PREFIX = "retry";
- /**
- * 最大重试次数
- */
- private Integer maxAttempts = 3;
- /**
- * 初始间隔时间,单位 ms
- */
- private Integer initInterval = 1000;
- /**
- * 乘积(每次乘以 2)
- */
- private Double multiplier = 2.0;
- }
复制代码 RetryTemplate
- @Configuration
- public class RetryConfig {
- @Resource
- private RetryProperties retryProperties;
- @Bean
- public RetryTemplate retryTemplate() {
- RetryTemplate retryTemplate = new RetryTemplate();
- // 定义重试策略(最多重试 3 次)
- SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
- retryPolicy.setMaxAttempts(retryProperties.getMaxAttempts()); // 最大重试次数
- // 定义间隔策略
- ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
- backOffPolicy.setInitialInterval(retryProperties.getInitInterval()); // 初始间隔 2000ms
- backOffPolicy.setMultiplier(retryProperties.getMultiplier()); // 每次乘以 2
- retryTemplate.setRetryPolicy(retryPolicy);
- retryTemplate.setBackOffPolicy(backOffPolicy);
- return retryTemplate;
- }
- }
复制代码 4.配置线程池
- @Configuration
- public class ThreadPoolConfig {
- @Bean(name = "taskExecutor")
- public Executor taskExecutor() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- // 核心线程数
- executor.setCorePoolSize(10);
- // 最大线程数
- executor.setMaxPoolSize(50);
- // 队列容量
- executor.setQueueCapacity(200);
- // 线程活跃时间(秒)
- executor.setKeepAliveSeconds(30);
- // 线程名前缀
- executor.setThreadNamePrefix("NoteExecutor-");
- // 拒绝策略:由调用线程处理(一般为主线程)
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
- // 等待所有任务结束后再关闭线程池
- executor.setWaitForTasksToCompleteOnShutdown(true);
- // 设置等待时间,如果超过这个时间还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是被没有完成的任务阻塞
- executor.setAwaitTerminationSeconds(60);
- executor.initialize();
- return executor;
- }
- }
复制代码 5.配置mq异步发送工具类
- @Component
- @Slf4j
- public class SendMqRetryHelper {
- @Resource
- private RocketMQTemplate rocketMQTemplate;
- @Resource
- private RetryTemplate retryTemplate;
- @Resource(name = "taskExecutor")
- private ThreadPoolTaskExecutor threadPoolTaskExecutor;
- /**
- * 异步发送 MQ
- * @param topic
- */
- public void asyncSend(String topic, String body) {
- log.info("==> 开始异步发送 MQ, Topic: {}, Body: {}", topic, body);
- // 构建消息对象,并将 DTO 转成 Json 字符串设置到消息体中
- Message<String> message = MessageBuilder.withPayload(body)
- .build();
- // 异步发送 MQ 消息,提升接口响应速度
- rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
- @Override
- public void onSuccess(SendResult sendResult) {
- log.info("==> 【评论发布】MQ 发送成功,SendResult: {}", sendResult);
- }
- @Override
- public void onException(Throwable throwable) {
- log.error("==> 【评论发布】MQ 发送异常: ", throwable);
- handleRetry(topic, message);
- }
- });
- }
- /**
- * 重试处理
- * @param topic
- * @param message
- */
- private void handleRetry(String topic, Message<String> message) {
- // 异步处理
- threadPoolTaskExecutor.submit(() -> {
- try {
- // 通过 retryTemplate 执行重试
- retryTemplate.execute((RetryCallback<Void, RuntimeException>) context -> {
- log.info("==> 开始重试 MQ 发送, 当前重试次数: {}, 时间: {}", context.getRetryCount() + 1, LocalDateTime.now());
- // 同步发送 MQ
- rocketMQTemplate.syncSend(topic, message);
- return null;
- });
- } catch (Exception e) {
- // 多次重试失败,进入兜底方案
- fallback(e, topic, message.getPayload());
- }
- });
- }
- /**
- * 兜底方案: 将发送失败的 MQ 写入数据库,之后,通过定时任务扫表,将发送失败的 MQ 再次发送,最终发送成功后,将该记录物理删除
- */
- private void fallback(Exception e, String topic, String bodyJson) {
- log.error("==> 多次发送失败, 进入兜底方案, Topic: {}, bodyJson: {}", topic, bodyJson);
- // TODO:
- }
- }
复制代码 首先,RocketMQ 会异步发送消息并进行重试(取决于你的配置)。
如果 RocketMQ 异步发送失败而且重试 3 次后依然失败,onException 方法被调用。
在 onException 中,handleRetry 方法会被触发,该方法会调用 retryTemplate.execute(...) 来进行同步重试(取决于你的配置)。
如果全部重试失败,则会调用 fallback 方法进行兜底处置惩罚。
通常的话,这里mq的重试就配置成0了因为我们已经自己封装了重试机制
6.服务层利用
- // 发送 MQ (包含重试机制)
- sendMqRetryHelper.asyncSend(MQConstants.TOPIC, JsonUtils.toJsonString(MqDTO));
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |