ToB企服应用市场:ToB评测及商务社交产业平台

标题: 多线程系列(二十) -CompletableFuture使用详解 [打印本页]

作者: 嚴華    时间: 2024-5-14 04:53
标题: 多线程系列(二十) -CompletableFuture使用详解
一、摘要

在上篇文章中,我们介绍了Future相关的用法,使用它可以获取异步任务执行的返回值。
我们再次回首一下Future相关的用法。
  1. public class FutureTest {
  2.     public static void main(String[] args) throws Exception {
  3.         long startTime = System.currentTimeMillis();
  4.         // 创建一个线程池
  5.         ExecutorService executor = Executors.newFixedThreadPool(1);
  6.         // 提交任务并获得Future的实例
  7.         Future<String> future = executor.submit(new Callable<String>() {
  8.             @Override
  9.             public String call() throws Exception {
  10.                 // 执行下载某文件任务,并返回文件名称
  11.                 System.out.println("thread name:" +  Thread.currentThread().getName() + " 开始执行下载任务");
  12.                 Thread.sleep(200);
  13.                 return "xxx.png";
  14.             }
  15.         });
  16.         //模拟主线程其它操作耗时
  17.         Thread.sleep(300);
  18.         // 通过阻塞方式,从Future中获取异步执行返回的结果
  19.         String result = future.get();
  20.         System.out.println("任务执行结果:" +  result);
  21.         System.out.println("总共用时:" + (System.currentTimeMillis() - startTime) + "ms");
  22.         // 任务执行完毕之后,关闭线程池
  23.         executor.shutdown();
  24.     }
  25. }
复制代码
运行效果如下:
  1. thread name:pool-1-thread-1 开始执行下载任务
  2. 任务执行结果:xxx.png
  3. 总共用时:308ms
复制代码
假如不采用线程执行,那么总共用时应该会是 200 + 300 = 500 ms,而采用线程来异步执行,总共用时是 308 ms。不难发现,通过Future和线程池的搭配使用,可以有效的提拔程序的执行效率。
但是Future对异步执行效果的获取并不是很友好,要么调用阻塞方法get()获取效果,要么轮训调用isDone()方法是否等于true来判定任务是否执行完毕来获取效果,这两种方法都不算很好,由于主线程会被迫等待。
因此,从 Java 8 开始引入了CompletableFuture,它针对Future做了很多的改进,在实现Future接口相关功能之外,还支持传入回调对象,当异步任务完成大概发生异常时,主动调用回调对象方法。
下面我们一起来看看CompletableFuture相关的用法!
二、CompletableFuture 用法介绍

我们照旧以上面的例子为例,改用CompletableFuture来实现,内容如下:
  1. public class FutureTest2 {
  2.     public static void main(String[] args) throws Exception {
  3.         // 创建异步执行任务
  4.         CompletableFuture<String> cf = CompletableFuture.supplyAsync(FutureTest2::download);
  5.         // 如果执行成功,回调此方法
  6.         cf.thenAccept((result) -> {
  7.             System.out.println("任务执行成功,返回结果值:" +  result);
  8.         });
  9.         // 如果执行异常,回调此方法
  10.         cf.exceptionally((e) -> {
  11.             System.out.println("任务执行失败,原因:" +  e.getMessage());
  12.             return null;
  13.         });
  14.         //模拟主线程其它操作耗时
  15.         Thread.sleep(300);
  16.     }
  17.     /**
  18.      * 下载某个任务
  19.      * @return
  20.      */
  21.     private static String download(){
  22.         // 执行下载某文件任务,并返回文件名称
  23.         System.out.println("thread name:" +  Thread.currentThread().getName() + " 开始执行下载任务");
  24.         try {
  25.             Thread.sleep(200);
  26.         } catch (InterruptedException e) {
  27.             e.printStackTrace();
  28.         }
  29.         return "xxx.png";
  30.     }
  31. }
复制代码
运行效果如下:
  1. thread name:ForkJoinPool.commonPool-worker-1 开始执行下载任务
  2. 任务执行成功,返回结果值:xxx.png
复制代码
可以发现,采用CompletableFuture类的supplyAsync()方法进行异步编程,代码上简便了很多,不必要单独创建线程池。
实际上,CompletableFuture也使用了线程池来执行任务,部门焦点源码如下:
  1. public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
  2.     // 判断当前机器 cpu 可用逻辑核心数是否大于1
  3.     private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1);
  4.    
  5.     // 默认采用的线程池
  6.     // 如果useCommonPool = true,采用 ForkJoinPool.commonPool 线程池
  7.     // 如果useCommonPool = false,采用 ThreadPerTaskExecutor 执行器
  8.     private static final Executor asyncPool = useCommonPool ?
  9.         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
  10.     // ThreadPerTaskExecutor执行器类
  11.     static final class ThreadPerTaskExecutor implements Executor {
  12.         public void execute(Runnable r) { new Thread(r).start(); }
  13.     }
  14.     // 异步执行任务的方法
  15.     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
  16.         return asyncSupplyStage(asyncPool, supplier);
  17.     }
  18.     // 异步执行任务的方法,支持传入自定义线程池
  19.     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
  20.                                                        Executor executor) {
  21.         return asyncSupplyStage(screenExecutor(executor), supplier);
  22.     }
  23. }
复制代码
从源码上可以分析出如下几点:
其中ForkJoinPool线程池是从 JDK 1.7 版本引入的,它是一个全新的线程池,后面在介绍Fork/Join框架文章中对其进行介绍。
除此之外,CompletableFuture为开发者还提供了几十种方法,以便满足更多的异步任务执行的场景。这些方法包罗创建异步任务、任务异步回调、多个任务组合处理等内容,下面我们就一起来学习一下相关的使用方式。
2.1、创建异步任务

CompletableFuture创建异步任务,常用的方法有两个。
runAsync()和supplyAsync()方法相关的源码如下:
  1. // 使用默认内置线程池执行任务,根据runnable构建执行任务,无返回值
  2. public static CompletableFuture<Void> runAsync(Runnable runnable)
  3. // 使用自定义线程池执行任务,根据runnable构建执行任务,无返回值
  4. public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  5. // 使用默认内置线程池执行任务,根据supplyAsync构建执行任务,可以带返回值
  6. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
  7. // 使用自定义线程池执行任务,根据supplyAsync构建执行任务,可以带返回值
  8. public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
复制代码
两者都支持使用自定义的线程池来执行任务,稍有不同的是supplyAsync()方法的入参使用的是Supplier接口,它表示效果的提供者,该效果返回一个对象且不担当任何参数,支持通过 lambda 语法简写
下面我们一起来看看相关的使用示例!
2.1.1、runAsync 使用示例
  1. public static void main(String[] args) throws Exception {
  2.     // 创建异步执行任务
  3.     CompletableFuture<Void> cf = CompletableFuture.runAsync(new Runnable() {
  4.         @Override
  5.         public void run() {
  6.             System.out.println("runAsync,执行完毕");
  7.         }
  8.     });
  9.     System.out.println("runAsync,任务执行结果:" + cf.get());
  10. }
复制代码
输出效果:
  1. runAsync,执行完毕
  2. runAsync,任务执行结果:null
复制代码
2.1.2、supplyAsync 使用示例
  1. public static void main(String[] args) throws Exception {
  2.     // 创建异步执行任务
  3.     CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
  4.         System.out.println("supplyAsync,执行完毕");
  5.         return "hello world";
  6.     });
  7.     System.out.println("supplyAsync,任务执行结果:" + cf.get());
  8. }
复制代码
输出效果:
  1. supplyAsync,执行完毕
  2. supplyAsync,任务执行结果:hello world
复制代码
2.2、任务异步回调

当创建的异步任务执行完毕之后,我们盼望拿着上一个任务的执行效果,继承执行后续的任务,此时就可以采用回调方法来处理。
CompletableFuture针对任务异步回调做了很多的支持,常用的方法如下:
下面我们一起来看看相关的使用示例!
2.2.1、thenRun/thenRunAsync

thenRun()/thenRunAsync()方法,都表示上一个任务执行成功后的回调处理,无入参,无返回值。稍有不同的是,thenRunAsync()方法会采用独立的线程池来执行任务。
相关的源码方法如下:
  1. // 默认线程池
  2. private static final Executor asyncPool = useCommonPool ?
  3.         ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
  4. // 采用与上一个任务的线程池来执行任务
  5. public CompletableFuture<Void> thenRun(Runnable action) {
  6.     return uniRunStage(null, action);
  7. }
  8. // 采用默认线程池来执行任务
  9. public CompletableFuture<Void> thenRunAsync(Runnable action) {
  10.     return uniRunStage(asyncPool, action);
  11. }
复制代码
从源码上可以清晰的看到,thenRun()/thenRunAsync()方法都调用了uniRunStage()方法,不同的是thenRunAsync()使用了asyncPool参数,也就是默认的线程池;而thenRun()方法使用的是null,底层采用上一个任务的线程池来执行,总结下来就是:
thenAccept()/thenAcceptAsync()、thenApply()/thenApplyAsync()、whenComplete()/whenCompleteAsync()、handle()/handleAsync()方法之间的区别也类似,下文不再重复讲解。
下面我们一起来看看thenRun()方法的使用示例。
  1. public static void main(String[] args) throws Exception {
  2.     // 创建异步执行任务
  3.     CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  4.         System.out.println("supplyAsync,执行完毕");
  5.         return "hello world";
  6.     });
  7.     // 当上一个任务执行成功,会继续回调当前方法
  8.     CompletableFuture<Void> cf2 = cf1.thenRun(() -> {
  9.         System.out.println("thenRun1,执行完毕");
  10.     });
  11.     CompletableFuture<Void> cf3 = cf2.thenRun(() -> {
  12.         System.out.println("thenRun2,执行完毕");
  13.     });
  14.     System.out.println("任务执行结果:" + cf3.get());
  15. }
复制代码
输出效果:
  1. supplyAsync,执行完毕
  2. thenRun1,执行完毕
  3. thenRun2,执行完毕
  4. 任务执行结果:null
复制代码
假如上一个任务执行异常,是不会回调thenRun()方法的,示比方下:
  1. public static void main(String[] args) throws Exception {
  2.     // 创建异步执行任务
  3.     CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
  4.         System.out.println("supplyAsync,执行完毕");
  5.         if(1 == 1){
  6.             throw new RuntimeException("执行异常");
  7.         }
  8.         return "hello world";
  9.     });
  10.     // 当上一个任务执行成功,会继续回调当前方法
  11.     CompletableFuture<Void> cf1 = cf.thenRun(() -> {
  12.         System.out.println("thenRun1,执行完毕");
  13.     });
  14.     // 监听执行时异常的回调方法
  15.     CompletableFuture<Void> cf2 = cf1.exceptionally((e) -> {
  16.         System.out.println("发生异常,错误信息:" + e.getMessage());
  17.         return null;
  18.     });
  19.     System.out.println("任务执行结果:" + cf2.get());
  20. }
复制代码
输出效果:
  1. supplyAsync,执行完毕
  2. 发生异常,错误信息:java.lang.RuntimeException: 执行异常
  3. 任务执行结果:null
复制代码
可以清晰的看到,thenRun()方法没有回调。
thenAccept()、thenAcceptAsync()、thenApply()、thenApplyAsync()方法也类似,当上一个任务执行异常,不会回调这些方法。
2.2.2、thenAccept/thenAcceptAsync

thenAccept()/thenAcceptAsync()方法,表示上一个任务执行成功后的回调方法,有入参,无返回值。
相关的示比方下。
  1. public static void main(String[] args) throws Exception {
  2.     // 创建异步执行任务
  3.     CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  4.         System.out.println("supplyAsync,执行完毕");
  5.         return "hello world";
  6.     });
  7.     // 当上一个任务执行成功,会继续回调当前方法
  8.     CompletableFuture<Void> cf2 = cf1.thenAccept((r) -> {
  9.         System.out.println("thenAccept,执行完毕,上一个任务执行结果值:" + r);
  10.     });
  11.     System.out.println("任务执行结果:" + cf2.get());
  12. }
复制代码
输出效果:
  1. supplyAsync,执行完毕
  2. thenAccept,执行完毕,上一个任务执行结果值:hello world
  3. 任务执行结果:null
复制代码
2.2.3、thenApply/thenApplyAsync

thenApply()/thenApplyAsync()方法,表示上一个任务执行成功后的回调方法,有入参,有返回值。
相关的示比方下。
  1. public static void main(String[] args) throws Exception {
  2.     // 创建异步执行任务
  3.     CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  4.         System.out.println("supplyAsync,执行完毕");
  5.         return "hello world";
  6.     });
  7.     // 当上一个任务执行成功,会继续回调当前方法
  8.     CompletableFuture<String> cf2 = cf1.thenApply((r) -> {
  9.         System.out.println("thenApply,执行完毕,上一个任务执行结果值:" + r);
  10.         return "gogogo";
  11.     });
  12.     System.out.println("任务执行结果:" + cf2.get());
  13. }
复制代码
输出效果:
  1. supplyAsync,执行完毕
  2. thenApply,执行完毕,上一个任务执行结果值:hello world
  3. 任务执行结果:gogogo
复制代码
2.2.4、whenComplete/whenCompleteAsync

whenComplete()/whenCompleteAsync()方法,表示任务执行完成后的回调方法,有入参,无返回值。
稍有不同的是:无论任务执行成功照旧失败,它都会回调。
相关的示比方下。
  1. public static void main(String[] args) throws Exception {
  2.     // 创建异步执行任务
  3.     CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  4.         System.out.println("supplyAsync,执行完毕");
  5.         if(1 == 1){
  6.             throw new RuntimeException("执行异常");
  7.         }
  8.         return "hello world";
  9.     });
  10.     // 当任务执行完成,会继续回调当前方法
  11.     CompletableFuture<String> cf2 = cf1.whenComplete((r, e) -> {
  12.         System.out.println("whenComplete,执行完毕,上一个任务执行结果值:" + r + ",异常信息:" + e.getMessage());
  13.     });
  14.     // 监听执行时异常的回调方法
  15.     CompletableFuture<String> cf3 = cf2.exceptionally((e) -> {
  16.         System.out.println("发生异常,错误信息:" + e.getMessage());
  17.         return e.getMessage();
  18.     });
  19.     System.out.println("任务执行结果:" + cf3.get());
  20. }
复制代码
输出效果:
  1. supplyAsync,执行完毕
  2. whenComplete,执行完毕,上一个任务执行结果值:null,异常信息:java.lang.RuntimeException: 执行异常
  3. 发生异常,错误信息:java.lang.RuntimeException: 执行异常
  4. 任务执行结果:java.lang.RuntimeException: 执行异常
复制代码
2.2.5、handle/handleAsync

handle()/handleAsync()方法,表示任务执行完成后的回调方法,有入参,有返回值。
同样的,无论任务执行成功照旧失败,它都会回调。
相关的示比方下。
  1. public static void main(String[] args) throws Exception {
  2.     // 创建异步执行任务
  3.     CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  4.         System.out.println("supplyAsync,执行完毕");
  5.         if(1 == 1){
  6.             throw new RuntimeException("执行异常");
  7.         }
  8.         return "hello world";
  9.     });
  10.     // 当任务执行完成,会继续回调当前方法
  11.     CompletableFuture<String> cf2 = cf1.handle((r, e) -> {
  12.         System.out.println("handle,执行完毕,上一个任务执行结果值:" + r + ",异常信息:" + e.getMessage());
  13.         return "handle";
  14.     });
  15.    
  16.     System.out.println("任务执行结果:" + cf2.get());
  17. }
复制代码
输出效果:
  1. supplyAsync,执行完毕
  2. handle,执行完毕,上一个任务执行结果值:null,异常信息:java.lang.RuntimeException: 执行异常
  3. 任务执行结果:handle
复制代码
2.2.6、exceptionally

exceptionally()方法,表示任务执行异常后的回调方法。在上文的示例中有所介绍。
末了我们照旧简单的看下示例。
  1. public static void main(String[] args) throws Exception {
  2.     // 创建异步执行任务
  3.     CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  4.         System.out.println("supplyAsync,执行开始");
  5.         if(1 == 1){
  6.             throw new RuntimeException("执行异常");
  7.         }
  8.         return "hello world";
  9.     });
  10.     // 监听执行时异常的回调方法
  11.     CompletableFuture<String> cf2 = cf1.exceptionally((e) -> {
  12.         System.out.println("发生异常,错误信息:" + e.getMessage());
  13.         return e.getMessage();
  14.     });
  15.     System.out.println("任务执行结果:" + cf2.get());
  16. }
复制代码
输出效果:
  1. supplyAsync,执行开始
  2. 发生异常,错误信息:java.lang.RuntimeException: 执行异常
  3. 任务执行结果:java.lang.RuntimeException: 执行异常
复制代码
2.3、多个任务组合处理

某些场景下,假如盼望获取两个不同的异步执行效果进行组合处理,可以采用多个任务组合处理方式。
CompletableFuture针对多个任务组合处理做了很多的支持,常用的组合方式有以下几种。
下面我们一起来看看相关的使用示例!
2.3.1、AND组合

实现AND组合的操纵方法有很多,比如runAfterBoth()、thenAcceptBoth()、thenCombine()等方法,它们之间的区别在于:是否带有入参、是否带有返回值。
其中thenCombine()方法支持传入参、带返回值。
相关示比方下:
  1. public static void main(String[] args) throws Exception {
  2.     // 创建异步执行任务
  3.     CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  4.         System.out.println("supplyAsync1,执行完毕");
  5.         return "supplyAsync1";
  6.     });
  7.     CompletableFuture<String> cf2 = CompletableFuture
  8.             .supplyAsync(() -> {
  9.                 System.out.println("supplyAsync2,执行完毕");
  10.                 return "supplyAsync2";
  11.             })
  12.             .thenCombine(cf1, (r1, r2) -> {
  13.                 System.out.println("r1任务执行结果:" + r1);
  14.                 System.out.println("r2任务执行结果:" + r2);
  15.                 return r1 + "_" + r2;
  16.             });
  17.     System.out.println("任务执行结果:" + cf2.get());
  18. }
复制代码
输出效果:
  1. supplyAsync1,执行完毕
  2. supplyAsync2,执行完毕
  3. r1任务执行结果:supplyAsync2
  4. r2任务执行结果:supplyAsync1
  5. 任务执行结果:supplyAsync2_supplyAsync1
复制代码
2.3.2、OR组合

实现OR组合的操纵方法有很多,比如runAfterEither()、acceptEither()、applyToEither()等方法,区别同上。
其中applyToEither()方法支持传入参、带返回值。
相关示比方下:
  1. public static void main(String[] args) throws Exception {
  2.     // 创建异步执行任务
  3.     CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  4.         System.out.println("supplyAsync1,执行完毕");
  5.         return "supplyAsync1";
  6.     });
  7.     CompletableFuture<String> cf2 = CompletableFuture
  8.             .supplyAsync(() -> {
  9.                 System.out.println("supplyAsync2,执行完毕");
  10.                 return "supplyAsync2";
  11.             })
  12.             .applyToEither(cf1, (r) -> {
  13.                 System.out.println("第一个执行成功的任务结果:" + r);
  14.                 return r + "_applyToEither";
  15.             });
  16.     System.out.println("任务执行结果:" + cf2.get());
  17. }
复制代码
输出效果:
  1. supplyAsync1,执行完毕
  2. supplyAsync2,执行完毕
  3. 第一个执行成功的任务结果:supplyAsync2
  4. 任务执行结果:supplyAsync2_applyToEither
复制代码
2.3.2、AllOf组合

实现AllOf组合的操纵就一个方法allOf(),可以将多个任务进行组合,只有都执行成功才会回调,回调入参为空值。
相关示比方下:
  1. public static void main(String[] args) throws Exception {
  2.     // 创建异步执行任务
  3.     CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  4.         System.out.println("supplyAsync1,执行完毕");
  5.         return "supplyAsync1";
  6.     });
  7.     CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
  8.         System.out.println("supplyAsync2,执行完毕");
  9.         return "supplyAsync2";
  10.     });
  11.     // 将多个任务,进行AND组合
  12.     CompletableFuture<String> cf3 = CompletableFuture
  13.             .allOf(cf1, cf2)
  14.             .handle((r, e) -> {
  15.                 System.out.println("所有任务都执行成功,result:" +  r);
  16.                 return "over";
  17.             });
  18.     System.out.println(cf3.get());
  19. }
复制代码
输出效果:
  1. supplyAsync1,执行完毕
  2. supplyAsync2,执行完毕
  3. 所有任务都执行成功,result:null
  4. over
复制代码
2.3.3、AnyOf组合

实现AnyOf组合的操纵,同样就一个方法anyOf(),可以将多个任务进行组合,只要一个执行成功就会回调,回调入参有值。
相关示比方下:
  1. public static void main(String[] args) throws Exception {
  2.     // 创建异步执行任务
  3.     CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  4.         System.out.println("supplyAsync1,执行完毕");
  5.         return "supplyAsync1";
  6.     });
  7.     CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
  8.         System.out.println("supplyAsync2,执行完毕");
  9.         return "supplyAsync2";
  10.     });
  11.     // 将多个任务,进行AND组合
  12.     CompletableFuture<String> cf3 = CompletableFuture
  13.             .anyOf(cf1, cf2)
  14.             .handle((r, e) -> {
  15.                 System.out.println("某个任务执行成功,返回值:" + r);
  16.                 return "over";
  17.             });
  18.     System.out.println(cf3.get());
  19. }
复制代码
输出效果:
  1. supplyAsync1,执行完毕
  2. supplyAsync2,执行完毕
  3. 某个任务执行成功,返回值:supplyAsync1
  4. over
复制代码
三、小结

本文主要围绕CompletableFuture类相关用法进行了一次知识总结,通过CompletableFuture类可以简化异步编程,同时支持多种异步任务,按照条件组合处理,相比别的的并发工具类,操纵更加强大、实用。
本篇内容比较多,假如有描述不对的地方,欢迎网友留言指出,盼望本文知识总结能帮助到大家。
四、参考

1.https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
2.https://juejin.cn/post/6970558076642394142

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4