CompletableFuture详解

打印 上一主题 下一主题

主题 799|帖子 799|积分 2397

CompletableFuture详解

学习链接:https://juejin.cn/post/7124124854747398175?searchId=20240806151438B643DF2AAD2FC5E6F11E
一、CompletableFuture简介

       在JAVA8开始引入了全新的CompletableFuture类,它是Future接口的一个实现类。也就是在Future接口的基础上,额外封装提供了一些执行方法,用来办理Future使用场景中的一些不足,对流水线处理能力提供了支持。

        当我们必要进行异步处理的时候,我们可以通过CompletableFuture.supplyAsync方法,传入一个具体的要执行的处理逻辑函数,如许就轻松的完成了CompletableFuture的创建与触发执行。
方法名称作用描述supplyAsync静态方法,用于构建一个CompletableFuture<T> 对象,并异步执行传入的参数,允许执行函数有返回值runAsync静态方法,用于构建一个CompletableFuture<Void> 对象,并异步执行传入函数,与supplyAsync的区别在于此方法传入的是Callable范例,仅执行,没有返回值 使用示例:
  1. public void testCreateFuture(String product) {
  2.     // supplyAsync, 执行逻辑有返回值PriceResult
  3.     CompletableFuture<PriceResult> supplyAsyncResult =
  4.             CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product));
  5.     // runAsync, 执行逻辑没有返回值
  6.     CompletableFuture<Void> runAsyncResult =
  7.             CompletableFuture.runAsync(() -> System.out.println(product));
  8. }
复制代码
特别补充:
   supplyAsync大概runAsync创建后便会立即执行,无需手动调用触发。
  二、环环相扣处理

       在流水线处理场景中,往往都是一个任务环节处理完成后,下一个任务环节接着上一环节处理效果继续处理。CompletableFuture用于这种流水线环节驱动类的方法有许多,相互之间主要是在返回值大概给到下一环节的入参上有些许差异,使用时必要留意区分:

具体的方法的描述归纳如下:
方法名称作用描述thenApply对CompletableFuture的执行后的效果进行追加处理,并将当前的CompletableFuture泛型对象更改为处理后新的对象范例,返回当前CompletableFuture对象thenCompose与thenApply类似,区别点在于:此方法的入参函数返回一个CompletableFuture范例对象thenAccept在全部异步任务完成后执行一系列操作,与thenApply类似,区别点在于thenApply返回void范例,没有具体效果输出,适合无需返回值的场景thenRun与thenAccept类似,区别点在于thenAccept可以将前面CompletableFuture执行的实际效果作为参数进行传入并使用,但是thenRun方法没有任何入参,只能执行一个Runnable函数,并且返回void范例        由于上述thenApply、thenCompose方法的输出仍然都是一个CompletableFuture对象,所以各个方法是可以一环接一环的进行调用,形成流水线式的处理逻辑:

       期望总是美好的,但是实际情况却总不尽如人意。在我们编排流水线的时候,如果某一个环节执行抛出非常了,会导致整个流水线后续的环节就没法再继续下去了,比如下面的例子:
  1. public void testExceptionHandle() {
  2.     CompletableFuture.supplyAsync(() -> {
  3.         throw new RuntimeException("supplyAsync excetion occurred...");
  4.     }).thenApply(obj -> {
  5.         System.out.println("thenApply executed...");
  6.         return obj;
  7.     }).join();
  8. }
复制代码
       执行之后会发现,supplyAsync抛出非常后,后面的thenApply并没有被执行。
那如果我们想要让流水线的每个环节处理失败之后都能让流水线继续往下面环节处理,让后续环节可以拿到前面环节的效果大概是抛出的非常并进行对应的应对处理,就必要用到handle和whenCompletable方法了。
先看下两个方法的作用描述:
方法名称方法描述handle与thenApply类似,区别点在于handle执行函数的入参有两个,一个是CompletableFuture执行的实际效果,一个是是Throwable对象,如许如果前面执行出现非常的时候,可以通过handle获取到非常并进行处理。whenComplete与handle类似,区别点在于whenComplete执行后无返回值。 我们对上面一段代码示例修改使用handle方法来处理:
  1. public void testExceptionHandle() {
  2.     CompletableFuture.supplyAsync(() -> {
  3.         throw new RuntimeException("supplyAsync excetion occurred...");
  4.     }).handle((obj, e) -> {
  5.         if (e != null) {
  6.             System.out.println("thenApply executed, exception occurred...
  7. ");
  8.         }
  9.         return obj;
  10.     }).join();
  11. }
复制代码
再执行可以发现,纵然前面环节出现非常,后面环节也可以继续处理,且可以拿到前一环节抛出的非常信息:
  1. thenApply executed, exception occurred...
复制代码
三、多个CompletableFuture组合操作

       前面不停在先容流水线式的处理场景,但是许多时候,流水线处理场景也不会是一个链路顺序往下走的情况,许多时候为了提拔并行效率,一些没有依靠的环节我们会让他们同时去执行,然后在某些环节必要依靠的时候,进行效果的依靠归并处理,类似如下图的效果。

CompletableFuture相比于Future的一大优势,就是可以方便的实现多个并行环节的归并处理。相关涉及方法先容归纳如下:
方法名称方法描述thenCombine将两个CompletableFuture对象组合起来进行下一步处理,可以拿到两个执行效果,并传给自己的执行函数进行下一步处理,最后返回一个新的CompletableFuture对象。thenAcceptBoth与thenCombine类似,区别点在于thenAcceptBoth传入的执行函数没有返回值,即thenAcceptBoth返回值为CompletableFuture<Void>。runAfterBoth等候两个CompletableFuture都执行完成后再执行某个Runnable对象,再执行下一个的逻辑,类似thenRun。applyToEither两个CompletableFuture中任意一个完成的时候,继续执行后面给定的新的函数处理。再执行后面给定函数的逻辑,类似thenApply。acceptEither两个CompletableFuture中任意一个完成的时候,继续执行后面给定的新的函数处理。再执行后面给定函数的逻辑,类似thenAccept。runAfterEither等候两个CompletableFuture中任意一个执行完成后再执行某个Runnable对象,可以理解为thenRun的升级版,留意与runAfterBoth对比理解。allOf静态方法,阻塞等候全部给定的CompletableFuture执行结束后,返回一个CompletableFuture<Void>效果。anyOf静态方法,阻塞等候任意一个给定的CompletableFuture对象执行结束后,返回一个CompletableFuture<Void>效果。 四、效果等候与获取

       在执行线程中将任务放到工作线程中进行处理的时候,执行线程与工作线程之间是异步执行的模式,如果执行线程必要获取到共工作线程的执行效果,则可以通过get大概join方法,阻塞等候并从CompletableFuture中获取对应的值。

对get和join的方法功能寄义阐明归纳如下:
方法名称作用描述get()等候CompletableFuture执行完成并获取其具体执行效果,大概会抛出非常,必要代码调用的地方手动try...catch进行处理。get(long, TimeUnit)与get()雷同,只是允许设定阻塞等候超时时间,如果等候凌驾设定时间,则会抛出非常终止阻塞等候。join()等候CompletableFuture执行完成并获取其具体执行效果,大概会抛出运行时非常,无需代码调用的地方手动try...catch进行处理。 从先容上可以看出,两者的区别就在于是否必要调用方显式的进行try…catch处理逻辑,使用代码示例如下:
  1. public void testGetAndJoin(String product) {
  2.     // join无需显式try...catch...
  3.     PriceResult joinResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
  4.             .join();
  5.    
  6.     try {
  7.         // get显式try...catch...
  8.         PriceResult getResult = CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
  9.                 .get(5L, TimeUnit.SECONDS);
  10.     } catch (Exception e) {
  11.         e.printStackTrace();
  12.     }
  13. }
复制代码
五、CompletableFuture方法及其Async版本

       我们在使用CompletableFuture的时候会发现,有许多的方法,都会同时有两个以Async定名结尾的方法版本。以前面我们用的比较多的thenCombine方法为例:

  • thenCombine(CompletionStage, BiFunction)
  • thenCombineAsync(CompletionStage, BiFunction)
  • thenCombineAsync(CompletionStage, BiFunction, Executor)
从参数上看,区别并不大,仅第三个方法入参中多了线程池Executor对象。看下三个方法的源码实现,会发现其整体实现逻辑都是一致的,仅仅是使用线程池这个地方的逻辑有一点点的差异:

有爱好的可以去翻一下此部分的源码实现,这里概括下三者的区别:

  • thenCombine方法,沿用上一个执行任务所使用的线程池进行处理
  • thenCombineAsync两个入参的方法,使用默认的ForkJoinPool线程池中的工作线程进行处理
  • themCombineAsync三个入参的方法,支持自定义线程池并指定使用自定义线程池中的线程作为工作线程行止理待执行任务。
为了更好的理解下上述的三个差异点,我们通过下面的代码来演示下:
用法1:其中一个supplyAsync方法以及thenCombineAsync指定使用自定义线程池,另一个supplyAsync方法不指定线程池(使用默认线程池)
  1. public PriceResult getCheapestPlatAndPrice4(String product) {
  2.     // 构造自定义线程池
  3.     ExecutorService executor = Executors.newFixedThreadPool(5);
  4.    
  5.     return
  6.         CompletableFuture.supplyAsync(
  7.             () -> HttpRequestMock.getMouXiXiPrice(product),
  8.             executor
  9.         ).thenCombineAsync(
  10.             CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)),
  11.             this::computeRealPrice,
  12.             executor
  13.         ).join();
  14. }
复制代码
对上述代码实现策略的解读,以及与执行效果的关系展示如下图所示,可以看出,没有指定自定义线程池的supplyAsync方法,其使用了默认的ForkJoinPool工作线程来运行,而另外两个指定了自定义线程池的方法,则使用了自定义线程池来执行。

用法2: 不指定自定义线程池,使用默认线程池策略,使用thenCombine方法
  1. public PriceResult getCheapestPlatAndPrice5(String product) {
  2.     return
  3.         CompletableFuture.supplyAsync(
  4.             () -> HttpRequestMock.getMouXiXiPrice(product)
  5.         ).thenCombine(
  6.             CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)),
  7.             this::computeRealPrice
  8.         ).join();
  9. }
复制代码
执行效果如下,可以看到执行线程名称与用法1示例相比发生了变化。由于没有指定线程池,所以两个supplyAsync方法都是用的默认的ForkJoinPool线程池,而thenCombine使用的是上一个任务所使用的线程池,所以也是用的ForkJoinPool。
  1. 14:34:27.815[ForkJoinPool.commonPool-worker-1|12]获取某夕夕上 Iphone13的价格
  2. 14:34:27.815[ForkJoinPool.commonPool-worker-2|13]获取某夕夕上 Iphone13的优惠
  3. 14:34:28.831[ForkJoinPool.commonPool-worker-2|13]获取某夕夕上 Iphone13的优惠完成: -5300
  4. 14:34:28.831[ForkJoinPool.commonPool-worker-1|12]获取某夕夕上 Iphone13的价格完成: 5399
  5. 14:34:28.831[ForkJoinPool.commonPool-worker-2|13]某夕夕最终价格计算完成:99
  6. 获取最优价格信息:【平台:某夕夕, 原价:5399, 折扣:0, 实付价:99】
  7. -----执行耗时: 1083ms  ------
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

冬雨财经

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表