重试机制与 CompletableFuture 拓展

打印 上一主题 下一主题

主题 995|帖子 995|积分 2985

重试机制与 CompletableFuture 拓展

禁止转载。
本文旨在讨论重试机制的特点和策略,分析常用重试类库的实现,讨论为 CompletableFuture 添加重试机制的方法。文章首发同名公众号,接待关注。
重试示例

以下是一个常见的使用异步重试的例子,当我们需要重试功能时,只需调用 retry 方法,传入相应的重试策略即可。这里的重试策略为重试 2 次,使用回退策略(backoff),重试间隔为 100ms,抖动因子为 0.75,同时指定了调度器。
  1. // Project Reactor 提供的重试方法
  2. public Mono<String> getUsername(String userId) {
  3.           // backoff 重试策略
  4.     var backoffRetry = Retry
  5.         .backoff(2, Duration.ofMillis(100))
  6.         .jitter(0.75)
  7.         .scheduler(Schedulers.newSingle("retry scheduler"));
  8.     return webClient.get()
  9.         .uri("localhost:8080/user/", userId)
  10.         .accept(MediaType.APPLICATION_JSON)
  11.         .retrieve()
  12.         .bodyToMono(String.class)
  13.               // 若为简单重试可改为调用 retry(n)
  14.         .retryWhen(backoffRetry);
  15. }
复制代码
以下图片摘自 Mono#retryWhen 文档注释

Project Reactor 是基于发布-订阅模子的相应式组件。从图中可以看出,每次获取数据失败后,会等候一段时间,然后再次订阅发布者以获取数据,反复以上过程直到达到最终重试次数大概出现成功结果。
Spring Retry 类库提供了重试模版:
  1. RetryTemplate template = RetryTemplate.builder()
  2.                                 .maxAttempts(3)
  3.                                 .fixedBackoff(1000)
  4.                                 .retryOn(RemoteAccessException.class)
  5.                                 .build();
  6. // 重试
  7. template.execute(ctx -> {
  8.     // ... do something
  9. });
复制代码
重试模版需要传入使命,而 Project Reactor 中发布者-订阅者两者解耦,可以实现多次订阅,因此不影响链式调用。
若想为 CompletableFuture 增长重试功能,最好是使用类似 Spring-Retry 的模式,添加工具类方法 retry,参数包括使命、重试策略等。
重试策略


  • 触发重试策略
    特定非常(如支持好坏名单)、特定返回值、自定义
  • 等候策略(backoff 算法)
    无等候、固定时间(fixed)、等量增长时间(incremental)、指数增长时间(exponentail backoff)、随机时间(random)、斐波那契数列(fibnonacci) 、自定义
  • 停止策略
    尝试次数(maxAttempts)、超时停止、自定义
重试策略应该留意区分有状态重试和无状态重试:
有状态重试表示各个重试之间存在相互依赖,好比

  • 每次访问网站信息时,返回错误信息包含了下一次可以正常访问的时间
  • 输入密码多次错误后,需要等候若干时间再重试
  • 共用相同的限流组件;
无状态重试表示每次重试不依赖其他重试的结果,实现容易,某些复杂的有状态重试可以使用无状态重试实现。
重试上下文信息

常见的重试上下文有:重试次数、每次返回结果、日记记录、回调。
回调方法包括每次返回结果时回调、最终返回结果时回调。
简易实现代码

手动实现最简单的方法是调用 exceptionally 大概 exceptionallyCompose 方法,多次传入重试使命。
1. 迭代实现 N 次重试

以下代码使用了迭代法,缺点是造成 CompletableFuture 内部维护的 stack 过深,增长不须要的内存开销;无法实现无限次重试。
  1. public static <T> CompletableFuture<T> retry(Supplier<T> supplier, int attempts) {
  2.     var cf = supplyAsync(supplier);
  3.     for (int i = 0; i < attempts; i++) {
  4.         cf = cf.exceptionally(ex -> supplier.get());
  5.     }
  6.     return cf;
  7. }
复制代码
2. 递归实现 N 次重试

使用递归办理了以上问题:
  1. @Slf4j
  2. class RetryNAttemptsDemo {
  3.           // 演示用,忽略线程池配置
  4.     public static void main(String[] args) {
  5.               // 任务3次重试后返回正确结果
  6.         var times = new AtomicInteger();
  7.         Supplier<Integer> task = () -> {
  8.             if (times.getAndIncrement() < 3) {
  9.                 throw new RuntimeException("异常结果");
  10.             } else {
  11.                 return 42;
  12.             }
  13.         };
  14.               // 使用重试
  15.         retry(4, () -> supplyAsync(task))
  16.             .thenAcceptAsync(r -> log.info("获取结果: {}", r))
  17.             .whenComplete((__, ex) -> log.error("最终获取结果异常", ex))
  18.             .join();
  19.     }
  20.     public static <T> CompletableFuture<T> retry(int attempts, Supplier<CompletionStage<T>> supplier) {
  21.               // 使用CompletableFuture的写功能
  22.         var result = new CompletableFuture<T>();
  23.         retryNAttempts(result, attempts, supplier);
  24.         return result;
  25.     }
  26.     private static <T> void retryNAttempts(CompletableFuture<T> result, int attempts, Supplier<CompletionStage<T>> supplier) {
  27.         supplier.get()
  28.             .thenAccept(result::complete)
  29.             .whenComplete((__, throwable) -> {
  30.                 if (attempts > 0L) {
  31.                     log.warn("异常重试");
  32.                     retryNAttempts(result, attempts - 1, supplier);
  33.                 } else {
  34.                     log.error("多次重试异常结果", throwable);
  35.                     result.completeExceptionally(throwable);
  36.                 }
  37.             });
  38.     }
  39. }
复制代码
执行结果如下,符合预期。
  1. > Task :RetryNAttemptsDemo.main()
  2. 23:18:32.042 [main] WARN com.example.demo.futures.RetryNAttemptsDemo -- 异常重试
  3. 23:18:32.043 [main] WARN com.example.demo.futures.RetryNAttemptsDemo -- 异常重试
  4. 23:18:32.044 [main] WARN com.example.demo.futures.RetryNAttemptsDemo -- 异常重试
  5. 23:18:32.044 [ForkJoinPool.commonPool-worker-1] INFO com.example.demo.futures.RetryNAttemptsDemo -- 获取结果: 42
复制代码
3. 递归实现 backoff

思绪:

  • 正常结果和非常结果分别处理,如有最闭幕果则记录到 result
  • 处理结果为重试等候时间
  • 执行重试(使用 ScheduledExecutorService#schedule)
  1. @Slf4j
  2. class BackoffRetryDemo {
  3.     public static final long STOP_RETRY = -1L;
  4.   
  5.     private final int maxAttempts;
  6.     private final AtomicInteger attempts = new AtomicInteger();
  7.     // 延迟时间(ms)
  8.     private final long delay;
  9.     BackoffRetryDemo(int maxAttempts, long delay) {
  10.         this.maxAttempts = maxAttempts;
  11.         this.delay = delay;
  12.     }
  13.     public <T> CompletableFuture<T> retry(Supplier<CompletionStage<T>> stageSupplier, ScheduledExecutorService delayer) {
  14.         CompletableFuture<T> result = new CompletableFuture<>();
  15.         retry(stageSupplier, delayer, result);
  16.         return result;
  17.     }
  18.     private <T> void retry(Supplier<CompletionStage<T>> stageSupplier, ScheduledExecutorService delayer, CompletableFuture<T> result) {
  19.         attempts.incrementAndGet();
  20.         stageSupplier.get()
  21.             .thenApply(r -> {
  22.                 result.complete(r);
  23.                 return STOP_RETRY;
  24.             })
  25.             .exceptionally(throwable -> {
  26.                 if (attempts.get() >= maxAttempts) {
  27.                     result.completeExceptionally(throwable);
  28.                     return STOP_RETRY;
  29.                 }
  30.                 log.warn("异常重试");
  31.                 return delay;
  32.             })
  33.             .thenAccept(delay -> {
  34.                 if (delay == 0L)
  35.                     delayer.execute(() -> retry(stageSupplier, delayer, result));
  36.                 else if (delay > 0L)
  37.                     delayer.schedule(() -> retry(stageSupplier, delayer, result), delay, TimeUnit.MILLISECONDS);
  38.             });
  39.     }
  40.     public static void main(String[] args) {
  41.         var times = new AtomicInteger();
  42.         Supplier<Integer> task = () -> {
  43.             if (times.getAndIncrement() < 3) {
  44.                 throw new RuntimeException("异常结果");
  45.             } else {
  46.                 return 42;
  47.             }
  48.         };
  49.         var backoffRetry = new BackoffRetryDemo(4, 500);
  50.         backoffRetry.retry(() -> supplyAsync(task), Executors.newSingleThreadScheduledExecutor())
  51.             .thenAcceptAsync(r -> log.info("获取结果: {}", r))
  52.             .exceptionallyAsync(throwable -> {
  53.                 log.error("最终获取结果异常", throwable);
  54.                 return null;
  55.             })
  56.             .join();
  57.     }
  58. }
复制代码
执行日记如下:
  1. > Task :BackoffRetryDemo.main()
  2. 23:54:12.099 [main] WARN com.example.demo.futures.BackoffRetryDemo -- 异常重试
  3. 23:54:12.610 [pool-1-thread-1] WARN com.example.demo.futures.BackoffRetryDemo -- 异常重试
  4. 23:54:13.113 [ForkJoinPool.commonPool-worker-1] WARN com.example.demo.futures.BackoffRetryDemo -- 异常重试
  5. 23:54:13.621 [ForkJoinPool.commonPool-worker-1] INFO com.example.demo.futures.BackoffRetryDemo -- 获取结果: 42
复制代码
从结果可以看出,实现了延迟重试,重试等候时间为 500ms,三次尝试后获取到了正确结果。
差别类库的实现浅析

1. Resiliance4J

将 Retry 视为高阶函数装饰器,可以实现对任意方法的加强,如 Supplier, Consumer, CompletableFuture
  1. CheckedFunction0<String> retryableSupplier = Retry
  2.   .decorateCheckedSupplier(retry, helloWorldService::sayHelloWorld);
复制代码
  1. // 线程安全类
  2. public interface Retry {
  3.           // 装饰器方法,为 supplier 增加可重试功能
  4.     static <T> Supplier<CompletionStage<T>> decorateCompletionStage(
  5.         Retry retry,
  6.         ScheduledExecutorService scheduler,
  7.         Supplier<CompletionStage<T>> supplier
  8.     ) {
  9.         return () -> {
  10.                   // 这里使用 final 可能是为了兼容 JDK7
  11.             final CompletableFuture<T> promise = new CompletableFuture<>();
  12.             final Runnable block = new AsyncRetryBlock<>(scheduler, retry.asyncContext(), supplier,
  13.                 promise);
  14.             block.run();
  15.             return promise;
  16.         };
  17.     }
  18.   
  19.           // 全局管理 Retry 支持
  20.     String getName();
  21.           Map<String, String> getTags();
  22.           // 上下文支持回调
  23.     <T> Retry.Context<T> context();
  24.     <T> Retry.AsyncContext<T> asyncContext();
  25.           // 重试策略
  26.     RetryConfig getRetryConfig();
  27.     // 事件支持
  28.     EventPublisher getEventPublisher();
  29.          
  30.     default <T> CompletionStage<T> executeCompletionStage(ScheduledExecutorService scheduler,
  31.                                                           Supplier<CompletionStage<T>> supplier) {
  32.         return decorateCompletionStage(this, scheduler, supplier).get();
  33.     }
  34.           // 略去其他执行方法,如 executeSupplier,executeRunnable
  35.   
  36.                 // 监控信息
  37.     Metrics getMetrics();
  38.     interface Metrics {
  39.         long getNumberOfSuccessfulCallsWithoutRetryAttempt();
  40.         long getNumberOfFailedCallsWithoutRetryAttempt();
  41.         long getNumberOfSuccessfulCallsWithRetryAttempt();
  42.         long getNumberOfFailedCallsWithRetryAttempt();
  43.     }
  44.                 // 回调支持
  45.     interface AsyncContext<T> {
  46.         void onComplete();
  47.         long onError(Throwable throwable);
  48.         long onResult(T result);
  49.     }
  50.     interface Context<T> {
  51.         void onComplete();
  52.         boolean onResult(T result);
  53.         void onError(Exception exception) throws Exception;
  54.         void onRuntimeError(RuntimeException runtimeException);
  55.     }
  56.                
  57.           // 事件支持,发布订阅模式,实现回调或者异步的另一种机制,发布者和订阅者(消费者)解耦
  58.     interface EventPublisher extends io.github.resilience4j.core.EventPublisher<RetryEvent> {
  59.         EventPublisher onRetry(EventConsumer<RetryOnRetryEvent> eventConsumer);
  60.         EventPublisher onSuccess(EventConsumer<RetryOnSuccessEvent> eventConsumer);
  61.         EventPublisher onError(EventConsumer<RetryOnErrorEvent> eventConsumer);
  62.         EventPublisher onIgnoredError(EventConsumer<RetryOnIgnoredErrorEvent> eventConsumer);
  63.     }
  64.                 // 这个类不知为何放在接口里面,实际上可以提出来
  65.     class AsyncRetryBlock<T> implements Runnable {
  66.               // 下一部分分析
  67.     }
  68. }
复制代码
不过异步加强的 CompletableFuture 不支持 Error 类型 fallback,封装了异步执行逻辑,实现逻辑和上一节 backoff 简易实现同等。
  1. class AsyncRetryBlock<T> implements Runnable {
  2.     private final ScheduledExecutorService scheduler;
  3.           // 调用其回调方法 onResult, onError
  4.     private final Retry.AsyncContext<T> retryContext;
  5.     private final Supplier<CompletionStage<T>> supplier;
  6.           // 最终结果,使用 CompletableFuture 的写功能
  7.     private final CompletableFuture<T> promise;
  8.           // 略去构造器代码
  9.     @Override
  10.     public void run() {
  11.         final CompletionStage<T> stage = supplier.get();
  12.         stage.whenComplete((result, throwable) -> {
  13.             if (throwable != null) {
  14.                       // 支持 Exception 类型 fallback
  15.                 if (throwable instanceof Exception) {
  16.                     onError((Exception) throwable);
  17.                 } else {
  18.                     promise.completeExceptionally(throwable);
  19.                 }
  20.             } else {
  21.                 onResult(result);
  22.             }
  23.         });
  24.     }
  25.                
  26.           // 重试或结束
  27.     private void onError(Exception t) {
  28.         final long delay = retryContext.onError(t);
  29.         if (delay < 1) {
  30.             promise.completeExceptionally(t);
  31.         } else {
  32.             scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
  33.         }
  34.     }
  35.           // 重试或结束
  36.     private void onResult(T result) {
  37.         final long delay = retryContext.onResult(result);
  38.         if (delay < 1) {
  39.             try {
  40.                 retryContext.onComplete();
  41.                 promise.complete(result);
  42.             } catch (Exception e) {
  43.                 promise.completeExceptionally(e);
  44.             }
  45.         } else {
  46.             scheduler.schedule(this, delay, TimeUnit.MILLISECONDS);
  47.         }
  48.     }
  49. }
复制代码
再来看 Context 的具体实现,总结为以下几点:

  • 记录执行统计信息(如 numOfAttempts, lastException, succeededWithoutRetryCounter)
  • 发布相关变乱(publishRetryEvent)
  • 每次执行前后支持回调, 如 consumeResultBeforeRetryAttempt
  • 代码执行时调用 RetryConfig 指定的策略(策略模式)
  1. // RetryImpl 的内部类, RetryImpl 持有统计信息相关字段,重试策略相关字段
  2. public final class AsyncContextImpl implements Retry.AsyncContext<T> {
  3.     private final AtomicInteger numOfAttempts = new AtomicInteger(0);
  4.     private final AtomicReference<Throwable> lastException = new AtomicReference<>();
  5.     @Override
  6.     public long onError(Throwable throwable) {
  7.         totalAttemptsCounter.increment();
  8.         // Handle the case if the completable future throw CompletionException wrapping the original exception
  9.         // where original exception is the one to retry not the CompletionException.
  10.               // 异常解包
  11.         if (throwable instanceof CompletionException || throwable instanceof ExecutionException) {
  12.             Throwable cause = throwable.getCause();
  13.             return handleThrowable(cause);
  14.         } else {
  15.             return handleThrowable(throwable);
  16.         }
  17.     }
  18.                
  19.           // handleThrowable 和 handleOnError 做了类似的逻辑,从名字上无法区分,还不如直接合并成一个方法
  20.     private long handleThrowable(Throwable throwable) {
  21.               // 自定义方法判断是否需要 retry,exceptionPredicate 来自 RetryConfig
  22.         if (!exceptionPredicate.test(throwable)) {
  23.             failedWithoutRetryCounter.increment();
  24.             publishRetryEvent(() -> new RetryOnIgnoredErrorEvent(getName(), throwable));
  25.             return -1;
  26.         }
  27.         return handleOnError(throwable);
  28.     }
  29.     private long handleOnError(Throwable throwable) {
  30.         lastException.set(throwable);
  31.         int attempt = numOfAttempts.incrementAndGet();
  32.         if (attempt >= maxAttempts) {
  33.             failedAfterRetryCounter.increment();
  34.             publishRetryEvent(() -> new RetryOnErrorEvent(name, attempt, throwable));
  35.             return -1;
  36.         }
  37.                                 // backoff 策略, 来自 RetryConfig
  38.         long interval = intervalBiFunction.apply(attempt, Either.left(throwable));
  39.         if (interval < 0) {
  40.             publishRetryEvent(() -> new RetryOnErrorEvent(getName(), attempt, throwable));
  41.         } else {
  42.             publishRetryEvent(() -> new RetryOnRetryEvent(getName(), attempt, throwable, interval));
  43.         }
  44.         return interval;
  45.     }
  46.           // 略去其他方法
  47. }
复制代码
2. Spring Retry

这里不讨论 AOP 实现的重试加强,仅讨论命令式代码实现。
Spring Retry 实现了有状态的重试,很多方法需要显式传参数 RetryContext,有多种 RetryContext 支持,RetrySynchronizationManager 提供了全局 RetryContext 上下文支持,底层使用 ThreadLocal,提供获取上下文、注册上下文等方法。
使命封装为 RetryCallback,不直接支持 CompletableFuture。
  1. // 封装的重试任务
  2. public interface RetryCallback<T, E extends Throwable> {
  3.                 // 无状态重试不需要使用context
  4.     /**
  5.      * Execute an operation with retry semantics.
  6.      */
  7.     T doWithRetry(RetryContext context) throws E;
  8.     /**
  9.      * A logical identifier for this callback to distinguish retries around business
  10.      * operations.
  11.      */
  12.     default String getLabel() {
  13.        return null;
  14.     }
  15. }
复制代码
RetryOperation 定义了重试操纵:
  1. public interface RetryOperations {
  2.         <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback) throws E;
  3.         <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) throws E;
  4.         <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RetryState retryState) throws E, ExhaustedRetryException;
  5.         <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback, RetryState retryState) throws E;
  6. }
复制代码
回调接口定义了回调操纵:
  1. public interface RetryListener {
  2.                 // 开始重试时回调
  3.     /**
  4.      * Called before the first attempt in a retry. For instance, implementers can set up
  5.      * state that is needed by the policies in the {@link RetryOperations}. The whole
  6.      * retry can be vetoed by returning false from this method, in which case a
  7.      * {@link TerminatedRetryException} will be thrown.
  8.      */
  9.     default <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
  10.        return true;
  11.     }
  12.                
  13.                 // 结束重试时回调
  14.     /**
  15.      * Called after the final attempt (successful or not). Allow the listener to clean up
  16.      * any resource it is holding before control returns to the retry caller.
  17.      */
  18.     default <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
  19.           Throwable throwable) {
  20.     }
  21.                 // 成功时回调
  22.     /**
  23.      * Called after a successful attempt; allow the listener to throw a new exception to
  24.      * cause a retry (according to the retry policy), based on the result returned by the
  25.      * {@link RetryCallback#doWithRetry(RetryContext)}
  26.      */
  27.     default <T, E extends Throwable> void onSuccess(RetryContext context, RetryCallback<T, E> callback, T result) {
  28.     }
  29.                 // 失败时回调
  30.     /**
  31.      * Called after every unsuccessful attempt at a retry.
  32.      */
  33.     default <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
  34.           Throwable throwable) {
  35.     }
  36. }
复制代码
这里仅讨论第一个 execute 方法的实现:
  1. // 不可变类,线程安全类
  2. public class RetryTemplate implements RetryOperations {
  3.   // 略去 execute 语义外方法,如对象创建与初始化
  4.   protected final Log logger = LogFactory.getLog(getClass());
  5.   private volatile BackOffPolicy backOffPolicy = new NoBackOffPolicy();
  6.   private volatile RetryPolicy retryPolicy = new SimpleRetryPolicy(3);
  7.   private volatile RetryListener[] listeners = new RetryListener[0];
  8.   private RetryContextCache retryContextCache = new MapRetryContextCache();
  9.   private boolean throwLastExceptionOnExhausted;
  10.   @Override
  11.   public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback) throws E {
  12.     return doExecute(retryCallback, null, null);
  13.   }
  14.   
  15.         // 方法比较长,模版方法模式
  16.   protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
  17.                                                  RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {
  18.     RetryPolicy retryPolicy = this.retryPolicy;
  19.     BackOffPolicy backOffPolicy = this.backOffPolicy;
  20.    
  21.     // Allow the retry policy to initialise itself...
  22.     // 重试过程中,context 不断变化,每次重试需要初始化
  23.     RetryContext context = open(retryPolicy, state);
  24.     if (this.logger.isTraceEnabled()) {
  25.       this.logger.trace("RetryContext retrieved: " + context);
  26.     }
  27.    
  28.     // Make sure the context is available globally for clients who need
  29.     // it...
  30.     // 保证重试执行时可以随时获得 context,使用了 ThreadLocal, context 和线程绑定
  31.     RetrySynchronizationManager.register(context);
  32.     Throwable lastException = null;
  33.     boolean exhausted = false;
  34.     try {
  35.       // 一些准备工作
  36.       // 回调,可提前终止重试
  37.       // Give clients a chance to enhance the context...
  38.       boolean running = doOpenInterceptors(retryCallback, context);
  39.       if (!running) {
  40.         throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
  41.       }
  42.                         // 设置 context 最大重试数
  43.       if (!context.hasAttribute(RetryContext.MAX_ATTEMPTS)) {
  44.         context.setAttribute(RetryContext.MAX_ATTEMPTS, retryPolicy.getMaxAttempts());
  45.       }
  46.       // Get or Start the backoff context...
  47.       BackOffContext backOffContext = null;
  48.       Object resource = context.getAttribute("backOffContext");
  49.       if (resource instanceof BackOffContext) {
  50.         backOffContext = (BackOffContext) resource;
  51.       }
  52.       if (backOffContext == null) {
  53.         backOffContext = backOffPolicy.start(context);
  54.         if (backOffContext != null) {
  55.           context.setAttribute("backOffContext", backOffContext);
  56.         }
  57.       }
  58.       Object label = retryCallback.getLabel();
  59.       String labelMessage = (label != null) ? "; for: '" + label + "'" : "";
  60.       // 准备工作结束,开始执行 retry 核心代码
  61.       // 循环内部为任务执行的完整 try-catch 过程,基本思想和函数式基于轨道编程(Railway-Oriented-Programming)的 CompletableFuture 不同
  62.       /*
  63.       * We allow the whole loop to be skipped if the policy or context already
  64.       * forbid the first try. This is used in the case of external retry to allow a
  65.       * recovery in handleRetryExhausted without the callback processing (which
  66.       * would throw an exception).
  67.       */
  68.       while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
  69.         try {
  70.           if (this.logger.isDebugEnabled()) {
  71.             this.logger.debug("Retry: count=" + context.getRetryCount() + labelMessage);
  72.           }
  73.           // Reset the last exception, so if we are successful
  74.           // the close interceptors will not think we failed...
  75.           lastException = null;
  76.           // 任务执行
  77.           T result = retryCallback.doWithRetry(context);
  78.           // 成功回调
  79.           doOnSuccessInterceptors(retryCallback, context, result);
  80.           return result;
  81.         }
  82.         catch (Throwable e) {
  83.           lastException = e;
  84.           try {
  85.             // 每次异常回调
  86.             // 进行的操作一般有:失败次数 + 1, 记录 lastException
  87.             registerThrowable(retryPolicy, state, context, e);
  88.           }
  89.           catch (Exception ex) {
  90.             throw new TerminatedRetryException("Could not register throwable", ex);
  91.           }
  92.           finally {
  93.                    // RetryListener 失败回调
  94.             doOnErrorInterceptors(retryCallback, context, e);
  95.           }
  96.           // 执行 backoff 策略
  97.           if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
  98.             try {
  99.               backOffPolicy.backOff(backOffContext);
  100.             }
  101.             catch (BackOffInterruptedException ex) {
  102.               // back off was prevented by another thread - fail the retry
  103.               if (this.logger.isDebugEnabled()) {
  104.                 this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount()
  105.                                   + labelMessage);
  106.               }
  107.               throw ex;
  108.             }
  109.           }
  110.          
  111.           if (this.logger.isDebugEnabled()) {
  112.             this.logger.debug("Checking for rethrow: count=" + context.getRetryCount() + labelMessage);
  113.           }
  114.           if (shouldRethrow(retryPolicy, context, state)) {
  115.             if (this.logger.isDebugEnabled()) {
  116.               this.logger
  117.                 .debug("Rethrow in retry for policy: count=" + context.getRetryCount() + labelMessage);
  118.             }
  119.             throw RetryTemplate.<E>wrapIfNecessary(e);
  120.           }
  121.         } // while 循环内 try-catch 结束
  122.                        
  123.         // 仅考虑无状态重试(state is null),可以忽略这段代码
  124.         /*
  125.         * A stateful attempt that can retry may rethrow the exception before now,
  126.         * but if we get this far in a stateful retry there's a reason for it,
  127.         * like a circuit breaker or a rollback classifier.
  128.         */
  129.         if (state != null && context.hasAttribute(GLOBAL_STATE)) {
  130.           break;
  131.         }
  132.       } // while 循环末尾
  133.       if (state == null && this.logger.isDebugEnabled()) {
  134.         this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount() + labelMessage);
  135.       }
  136.       exhausted = true;
  137.       return handleRetryExhausted(recoveryCallback, context, state);
  138.     }
  139.     catch (Throwable e) {
  140.       // 重试代码抛出异常,无法处理,rethrow
  141.       throw RetryTemplate.<E>wrapIfNecessary(e);
  142.     }
  143.     finally {
  144.       close(retryPolicy, context, state, lastException == null || exhausted);
  145.       // RetryListener 关闭回调
  146.       doCloseInterceptors(retryCallback, context, lastException);
  147.       RetrySynchronizationManager.clear();
  148.     }
  149.   }
  150. }
复制代码
总结一下 Spring-Retry 的特点

  • 支持回调(RetryListener) 和有状态上下文(RetryContext、backoffContext、RetryState)
  • 缺点:不支持异步 backoff,backoff 在同一线程内。
  • 上下文和线程绑定,底层使用 ThreadLocal,代码中会有隐式传参问题。
CompletableFuture 和重试机制有关的特点


  • 若想实现特定返回值触发重试策略,CompletableFuture 存在成功运算管道和非常管道,保举的做法是:thenCompose 转化某些错误值到特定非常,配置特定非常触发重试策略。
  • ComletableFuture 中的结果为非常时,需要解包才能获取真实的代码执行时非常。
  • CompletableFuture 提供了限时获取值方法,可以轻松实现超时停止策略。
  • 取消机制,上文中的简易实现没有考虑 retry 方法返回结果被取消的情况,此时运行中的使命应该主动 cancel。
  • 可以天然地支持异步重试(重试使命执行不限于同一线程中)
  • 在单线程中sleep一段时间,再重试也是一种能担当的办理方案
CFFU

CFFU(CompletableFuture Fu )是一个小小的 CompletableFuture(CF)辅助加强库,提升 CF 使用体验并淘汰误用,在业务中更方便高效安全地使用 CF。
CFFU 并不支持重试,假如你想实现 CompletableFuture 的重试功能,可以使用 Resilience4J。
本文由博客一文多发平台 OpenWrite 发布!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

半亩花草

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表