异步编程——CompletableFuture详解

打印 上一主题 下一主题

主题 906|帖子 906|积分 2718

Future

JDK5 新增了Future接口,用于描述一个异步计算的效果。
固然 Future 以及相关使用方法提供了异步实验任务的能力,但是对于效果的获取却是很不方便,我们必须使用Future.get()的方式阻塞调用线程,或者使用轮询方式判断 Future.isDone 任务是否结束,再获取效果。
而且,Future 无法解决多个异步任务相互依赖的场景,简单点说就是,主线程需要等候子线程任务实验完毕之后在进行实验,这个时候你大概想到了 「CountDownLatch」,没错确实可以解决,代码如下。
这里界说两个 Future,第一个通过用户 id 获取用户信息,第二个通过商品 id 获取商品信息。
  1. public void testCountDownLatch() throws InterruptedException, ExecutionException {
  2.     ExecutorService executorService = Executors.newFixedThreadPool(5);
  3.     CountDownLatch downLatch = new CountDownLatch(2);
  4.     long startTime = System.currentTimeMillis();
  5.     Future<String> userFuture = executorService.submit(() -> {
  6.         //模拟查询商品耗时500毫秒
  7.         Thread.sleep(500);
  8.         downLatch.countDown();
  9.         return "用户A";
  10.     });
  11.     Future<String> goodsFuture = executorService.submit(() -> {
  12.         //模拟查询商品耗时500毫秒
  13.         Thread.sleep(400);
  14.         downLatch.countDown();
  15.         return "商品A";
  16.     });
  17.     downLatch.await();
  18.     //模拟主程序耗时时间
  19.     Thread.sleep(600);
  20.     System.out.println("获取用户信息:" + userFuture.get());
  21.     System.out.println("获取商品信息:" + goodsFuture.get());
  22.     System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
  23. }
复制代码
Java8 以后这不再是一种优雅的解决方式,接下来来相识下 CompletableFuture 的使用。
CompletableFuture
  1. @Test
  2. public void testCompletableInfo() throws InterruptedException, ExecutionException {
  3.     long startTime = System.currentTimeMillis();
  4.     //调用用户服务获取用户基本信息
  5.     CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->
  6.             //模拟查询商品耗时500毫秒
  7.     {
  8.         try {
  9.             Thread.sleep(500);
  10.         } catch (InterruptedException e) {
  11.             e.printStackTrace();
  12.         }
  13.         return "用户A";
  14.     });
  15.     //调用商品服务获取商品基本信息
  16.     CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->
  17.             //模拟查询商品耗时500毫秒
  18.     {
  19.         try {
  20.             Thread.sleep(400);
  21.         } catch (InterruptedException e) {
  22.             e.printStackTrace();
  23.         }
  24.         return "商品A";
  25.     });
  26.     System.out.println("获取用户信息:" + userFuture.get());
  27.     System.out.println("获取商品信息:" + goodsFuture.get());
  28.     //模拟主程序耗时时间
  29.     Thread.sleep(600);
  30.     System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
  31. }
复制代码
CompletableFuture 创建方式

「supplyAsync」实验任务,支持返回值。
「runAsync」实验任务,没有返回值。
参数假如传了线程池就使用自界说的线程池,没传则使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建实验任务。(注意:默认内置线程池焦点数为机器焦点数减一,假如机器焦点数比2小时,会创建一个新线程去跑任务,发起在高并发场景使用自界说线程池
  1. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
  2. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
  3. public static CompletableFuture<Void> runAsync(Runnable runnable){..}
  4. public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor){..}
复制代码
CompletableFuture 获取方式
  1. //方式一
  2. public T get()
  3. //方式二
  4. public T get(long timeout, TimeUnit unit)
  5. //方式三
  6. public T getNow(T valueIfAbsent)
  7. //方式四
  8. public T join()
复制代码
说明:
「get()和 get(long timeout, TimeUnit unit)」 => 在 Future 中就已经提供了,后者提供超时处置处罚,假如在指定时间内未获取效果将抛出超时非常
「getNow」 => 立即获取效果不阻塞,效果计算已完成将返回效果或计算过程中的非常,假如未计算完成将返回设定的 valueIfAbsent 值
「join」 =>  方法里有非常不会抛出非常,但会抛出 CompletionException
异步回调方法

1、thenRun/thenRunAsync
平常点讲就是,「做完第一个任务后,再做第二个任务,第二个任务也没有返回值」。
【Async】加了则第一个任务使用的是你本身传入的线程池,第二个任务使用的是 ForkJoin 线程池,没加则第二个线程池也用传入的线程池。
2、thenAccept/thenAcceptAsync
第一个任务实验完成后,实验第二个回调方法任务,会将该任务的实验效果,作为入参,通报到回调方法中,但是回调方法是没有返回值的。
3、thenApply/thenApplyAsync
表示第一个任务实验完成后,实验第二个回调方法任务,会将该任务的实验效果,作为入参,通报到回调方法中,而且回调方法是有返回值的。
非常回调

whenComplete + exceptionally 示例
  1. public void testWhenCompleteExceptionally() throws ExecutionException, InterruptedException {
  2.     CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
  3.         if (Math.random() < 0.5) {
  4.             throw new RuntimeException("出错了");
  5.         }
  6.         System.out.println("正常结束");
  7.         return 0.11;
  8.     }).whenComplete((aDouble, throwable) -> {
  9.         if (aDouble == null) {
  10.             System.out.println("whenComplete aDouble is null");
  11.         } else {
  12.             System.out.println("whenComplete aDouble is " + aDouble);
  13.         }
  14.         if (throwable == null) {
  15.             System.out.println("whenComplete throwable is null");
  16.         } else {
  17.             System.out.println("whenComplete throwable is " + throwable.getMessage());
  18.         }
  19.     }).exceptionally((throwable) -> {
  20.         System.out.println("exceptionally中异常:" + throwable.getMessage());
  21.         return 0.0;
  22.     });
  23.     System.out.println("最终返回的结果 = " + future.get());
  24. }
复制代码
当出现非常时,exceptionally 中会捕获该非常,给出默认返回值 0.0。
而 「whenComplete」 这个回调函数:
「正常完成」:whenComplete 返回效果和上级任务一致,非常为 null;
「出现非常」:whenComplete 返回效果为 null,非常为上级任务的非常;
效果:
  1. whenComplete aDouble is null
  2. whenComplete throwable is java.lang.RuntimeException: 出错了
  3. exceptionally中异常:java.lang.RuntimeException: 出错了
  4. 最终返回的结果 = 0.0
复制代码
注意点

1、Future 需要获取返回值,才能获取非常信息

Future 需要获取返回值,才能获取到非常信息。假如不加 get()/join()方法,看不到非常信息。假如想要获取,思量是否加 try...catch...或者使用 exceptionally 方法。
2、CompletableFuture 的 get()方法是阻塞的

CompletableFuture 的 get()方法是阻塞的,假如使用它来获取异步调用的返回值,需要添加超时时间。
3、不发起使用默认线程池

CompletableFuture 代码中使用了默认的 「ForkJoin 线程池」, 处置处罚的线程个数是电脑 「CPU 核数-1」 。在大量请求过来的时候,处置处罚逻辑复杂的话,响应会很慢。一般发起使用自界说线程池,优化线程池配置参数。
4、自界说线程池时,注意拒绝策略

假如线程池拒绝策略是 DiscardPolicy(丢弃当前任务) 或者 DiscardOldestPolicy(丢弃最旧的那个任务),当线程池饱和时,会直接丢弃任务,不会抛弃非常。因此发起,CompletableFuture 线程池策略最好使用 AbortPolicy(抛出实验非常)或者CallerRunsPolicy(让主线程实验)。
联合业务代码使用示例

Util工具类
  1. public class CompletableFutureUtil {
  2.     private CompletableFutureUtil(){}
  3.     public static <R> CompletableFuture<R>  executeWithFallbackAndContextPropagation(@Nonnull Supplier<R> normalFunction,
  4.                                                  @Nonnull Supplier<R> exceptionFunction,
  5.                                                  @Nonnull ThreadPoolTaskExecutor taskExecutor,
  6.                                                  @Nonnull String exceptionMsg){
  7.         Thread mainThread = Thread.currentThread();
  8.         return CompletableFuture
  9.                 .supplyAsync(normalFunction,taskExecutor)
  10.                 .exceptionally(e -> {
  11.                     log.error(exceptionMsg, e);
  12.                     return exceptionFunction.get();
  13.                 })
  14.                 .whenComplete((data,e)->{
  15.                     if(!mainThread.equals(Thread.currentThread())){
  16.                         MallContextHolderManager.clearContext();
  17.                     }
  18.                 });
  19.     }
  20.    
  21. }
复制代码
使用Util创建任务代码
  1.     private CompletableFuture<Boolean> asyncQueryCommentPic(ProductDetailInfoNewDto detailInfoDto, ProductInfoQueryDTO productInfoQuery) {
  2.         ThreadPoolTaskExecutor taskExecutor = bizThreadPoolManager.getBizThreadPoolTaskExecutor(BIZ_THREAD_POOL_NAME);
  3.         // 兜底获取不到线程池时降级
  4.         if (taskExecutor == null) {
  5.             detailInfoDto.setShowPrimaryPic(Boolean.FALSE);
  6.             return null;
  7.         }
  8.         return CompletableFutureUtil.executeWithFallbackAndContextPropagation(
  9.                 () -> queryShowPrimaryPic(detailInfoDto, productInfoQuery),
  10.                 () -> Boolean.FALSE,
  11.                 taskExecutor,
  12.                 "异步任务执行异常");
  13.     }
复制代码
获取任务效果代码
  1.     private void handShowPrimaryPic(ProductDetailInfoNewDto detailInfoDto, CompletableFuture<Boolean> commentPicFuture) {
  2.         detailInfoDto.setShowPrimaryPic(Boolean.FALSE);
  3.         if (commentPicFuture != null) {
  4.             try {
  5.                 Boolean showPrimaryPic = commentPicFuture.get(asyncGetCommentPrimaryPicTimeout, TimeUnit.MILLISECONDS);
  6.                 detailInfoDto.setShowPrimaryPic(showPrimaryPic);
  7.             } catch (Exception e) {
  8.                 log.error("任务等待结果异常:future={}", JSON.toJSONString(commentPicFuture), e);
  9.             }
  10.         }
  11.     }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张国伟

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表