Java中「Future」接口详解

打印 上一主题 下一主题

主题 587|帖子 587|积分 1761

目录

主打一手结果导向;
一、背景

在系统中,异步执行任务,是很常见的功能逻辑,但是在不同的场景中,又存在很多细节差异;
有的任务只强调「执行过程」,并不需要追溯任务自身的「执行结果」,这里并不是指对系统和业务产生的效果,比如定时任务、消息队列等场景;
但是有些任务即强调「执行过程」,又需要追溯任务自身的「执行结果」,在流程中依赖某个异步结果,判断流程是否中断,比如「并行」处理;
串行处理】整个流程按照逻辑逐步推进,如果出现异常会导致流程中断;

并行处理】主流程按照逻辑逐步推进,其他「异步」交互的流程执行完毕后,将结果返回到主流程,如果「异步」流程异常,会影响部分结果;

此前在《「订单」业务》的内容中,聊过关于「串行」和「并行」的应用对比,即在订单详情的加载过程中,通过「并行」的方式读取:商品、商户、订单、用户等信息,提升接口的响应时间;
二、Future接口

1、入门案例

异步是对流程的解耦,但是有的流程中又依赖异步执行的最终结果,此时就可以使用「Future」接口来达到该目的,先来看一个简单的入门案例;
  1. public class ServerTask implements Callable<Integer> {
  2.     @Override
  3.     public Integer call() throws Exception {
  4.         Thread.sleep(2000);
  5.         return 3;
  6.     }
  7. }
  8. public class FutureBase01 {
  9.     public static void main(String[] args) throws Exception {
  10.         TimeInterval timer = DateUtil.timer();
  11.         // 线程池
  12.         ExecutorService executor = Executors.newFixedThreadPool(3);
  13.         // 批量任务
  14.         List<ServerTask> serverTasks = new ArrayList<>() ;
  15.         for (int i=0;i<3;i++){
  16.             serverTasks.add(new ServerTask());
  17.         }
  18.         List<Future<Integer>> taskResList = executor.invokeAll(serverTasks) ;
  19.         // 结果输出
  20.         for (Future<Integer> intFuture:taskResList){
  21.             System.out.println(intFuture.get());
  22.         }
  23.         // 耗时统计
  24.         System.out.println("timer...interval = "+timer.interval());
  25.     }
  26. }
复制代码
2、核心方法

2.1 实例方法
  1. public class FutureBase02 {
  2.     public static void main(String[] args) throws Exception {
  3.         // 线程池执行任务
  4.         ExecutorService executor = Executors.newFixedThreadPool(3);
  5.         FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
  6.             @Override
  7.             public String call() throws Exception {
  8.                 Thread.sleep(3000);
  9.                 return "task...OK";
  10.             }
  11.         }) ;
  12.         executor.execute(futureTask);
  13.         // 任务信息获取
  14.         System.out.println("是否完成:"+futureTask.isDone());
  15.         System.out.println("是否取消:"+futureTask.isCancelled());
  16.         System.out.println("获取结果:"+futureTask.get());
  17.         System.out.println("尝试取消:"+futureTask.cancel(Boolean.TRUE));
  18.     }
  19. }
复制代码
2.2 计算方法
  1. public class CompletableBase01 {
  2.     public static void main(String[] args) throws Exception {
  3.         // 线程池
  4.         ExecutorService executor = Executors.newFixedThreadPool(3);
  5.         // 任务执行
  6.         CompletableFuture<String> cft = CompletableFuture.supplyAsync(() -> {
  7.             try {
  8.                 Thread.sleep(3000);
  9.             } catch (InterruptedException e) {
  10.                 e.printStackTrace();
  11.             }
  12.             return "Res...OK";
  13.         }, executor);
  14.         // 结果输出
  15.         System.out.println(cft.get());
  16.     }
  17. }
复制代码
2.3 结果获取方法
  1. public class Completable01 {
  2.     public static void main(String[] args) throws Exception {
  3.         // 线程池
  4.         ExecutorService executor = Executors.newFixedThreadPool(3);
  5.         // 1、创建未完成的CompletableFuture,通过complete()方法完成
  6.         CompletableFuture<Integer> cft01 = new CompletableFuture<>() ;
  7.         cft01.complete(99) ;
  8.         // 2、创建已经完成CompletableFuture,并且给定结果
  9.         CompletableFuture<String> cft02 = CompletableFuture.completedFuture("given...value");
  10.         // 3、有返回值,默认ForkJoinPool线程池
  11.         CompletableFuture<String> cft03 = CompletableFuture.supplyAsync(() -> {return "OK-3";});
  12.         // 4、有返回值,采用Executor自定义线程池
  13.         CompletableFuture<String> cft04 = CompletableFuture.supplyAsync(() -> {return "OK-4";},executor);
  14.         // 5、无返回值,默认ForkJoinPool线程池
  15.         CompletableFuture<Void> cft05 = CompletableFuture.runAsync(() -> {});
  16.         // 6、无返回值,采用Executor自定义线程池
  17.         CompletableFuture<Void> cft06 = CompletableFuture.runAsync(()-> {}, executor);
  18.     }
  19. }
复制代码
2.4 任务编排方法
  1. public class Completable02 {
  2.     public static void main(String[] args) throws Exception {
  3.         // 线程池
  4.         ExecutorService executor = Executors.newFixedThreadPool(3);
  5.         CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {
  6.             try {
  7.                 Thread.sleep(2000);
  8.             } catch (InterruptedException e) {
  9.                 e.printStackTrace();
  10.             }
  11.             return "OK";
  12.         },executor);
  13.         // 1、计算完成后,执行后续处理
  14.         // cft01.whenComplete((res, ex) -> System.out.println("Result:"+res+";Exe:"+ex));
  15.         // 2、触发计算,如果没有完成,则get设定的值,如果已完成,则get任务返回值
  16.         // boolean completeFlag = cft01.complete("given...value");
  17.         // if (completeFlag){
  18.         //     System.out.println(cft01.get());
  19.         // } else {
  20.         //     System.out.println(cft01.get());
  21.         // }
  22.         // 3、开启新CompletionStage,重新获取线程执行任务
  23.         cft01.whenCompleteAsync((res, ex) -> System.out.println("Result:"+res+";Exe:"+ex),executor);
  24.     }
  25. }
复制代码
2.5 异常处理方法
  1. public class Completable03 {
  2.     public static void main(String[] args) throws Exception {
  3.         // 线程池
  4.         ExecutorService executor = Executors.newFixedThreadPool(3);
  5.         CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {
  6.             try {
  7.                 Thread.sleep(2000);
  8.             } catch (InterruptedException e) {
  9.                 e.printStackTrace();
  10.             }
  11.             return "Res...OK";
  12.         },executor);
  13.         // 1、阻塞直到获取结果
  14.         // System.out.println(cft01.get());
  15.         // 2、设定超时的阻塞获取结果
  16.         // System.out.println(cft01.get(4, TimeUnit.SECONDS));
  17.         // 3、非阻塞获取结果,如果任务已经完成,则返回结果,如果任务未完成,返回给定的值
  18.         // System.out.println(cft01.getNow("given...value"));
  19.         // 4、get获取抛检查异常,join获取非检查异常
  20.         System.out.println(cft01.join());
  21.     }
  22. }
复制代码
3、线程池问题


  • 在实践中,通常不使用ForkJoinPool#commonPool()公共线程池,会出现线程竞争问题,从而形成系统瓶颈;
  • 在任务编排中,如果出现依赖情况或者父子任务,尽量使用多个线程池,从而避免任务请求同一个线程池,规避死锁情况发生;
四、CompletableFuture原理

1、核心结构

在分析「CompletableFuture」其原理之前,首先看一下涉及的核心结构;

CompletableFuture
在该类中有两个关键的字段:「result」存储当前CF的结果,「stack」代表栈顶元素,即当前CF计算完成后会触发的依赖动作;从上面案例中可知,依赖动作可以没有或者有多个;
Completion
依赖动作的封装类;
UniCompletion
继承Completion类,一元依赖的基础类,「executor」指线程池,「dep」指依赖的计算,「src」指源动作;
BiCompletion
继承UniCompletion类,二元或者多元依赖的基础类,「snd」指第二个源动作;
2、零依赖

顾名思义,即各个CF之间不产生依赖关系;
  1. public class Completable04 {
  2.     public static void main(String[] args) throws Exception {
  3.         // 线程池
  4.         ExecutorService executor = Executors.newFixedThreadPool(3);
  5.         CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {
  6.             try {
  7.                 Thread.sleep(2000);
  8.             } catch (InterruptedException e) {
  9.                 e.printStackTrace();
  10.             }
  11.             System.out.println("OK-1");
  12.             return "OK";
  13.         },executor);
  14.         // 1、cft01任务执行完成后,执行之后的任务,此处不关注cft01的结果
  15.         // cft01.thenRun(() -> System.out.println("task...run")) ;
  16.         // 2、cft01任务执行完成后,执行之后的任务,可以获取cft01的结果
  17.         // cft01.thenAccept((res) -> {
  18.         //     System.out.println("cft01:"+res);
  19.         //     System.out.println("task...run");
  20.         // });
  21.         // 3、cft01任务执行完成后,执行之后的任务,获取cft01的结果,并且具有返回值
  22.         // CompletableFuture<Integer> cft02 = cft01.thenApply((res) -> {
  23.         //     System.out.println("cft01:"+res);
  24.         //     return 99 ;
  25.         // });
  26.         // System.out.println(cft02.get());
  27.         // 4、顺序执行cft01、cft02
  28.         // CompletableFuture<String> cft02 = cft01.thenCompose((res) ->  CompletableFuture.supplyAsync(() -> {
  29.         //     System.out.println("cft01:"+res);
  30.         //     return "OK-2";
  31.         // }));
  32.         // cft02.whenComplete((res,ex) -> System.out.println("Result:"+res+";Exe:"+ex));
  33.         // 5、对比任务的执行效率,由于cft02先完成,所以取cft02的结果
  34.         // CompletableFuture<String> cft02 = cft01.applyToEither(CompletableFuture.supplyAsync(() -> {
  35.         //     System.out.println("run...cft02");
  36.         //     try {
  37.         //         Thread.sleep(3000);
  38.         //     } catch (InterruptedException e) {
  39.         //         e.printStackTrace();
  40.         //     }
  41.         //     return "OK-2";
  42.         // }),(res) -> {
  43.         //     System.out.println("either...result:" + res);
  44.         //     return res;
  45.         // });
  46.         // System.out.println("finally...result:" + cft02.get());
  47.         // 6、两组任务执行完成后,对结果进行合并
  48.         // CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> "OK-2") ;
  49.         // String finallyRes = cft01.thenCombine(cft02,(res1,res2) -> {
  50.         //     System.out.println("res1:"+res1+";res2:"+res2);
  51.         //     return res1+";"+res2 ;
  52.         // }).get();
  53.         // System.out.println(finallyRes);
  54.         CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> {
  55.             System.out.println("OK-2");
  56.             return  "OK-2";
  57.         }) ;
  58.         CompletableFuture<String> cft03 = CompletableFuture.supplyAsync(() -> {
  59.             System.out.println("OK-3");
  60.             return "OK-3";
  61.         }) ;
  62.         // 7、等待批量任务执行完返回
  63.         // CompletableFuture.allOf(cft01,cft02,cft03).get();
  64.         // 8、任意一个任务执行完即返回
  65.         System.out.println("Sign:"+CompletableFuture.anyOf(cft01,cft02,cft03).get());
  66.     }
  67. }
复制代码
3、一元依赖

即CF之间的单个依赖关系;这里使用「thenApply」方法演示,为了看到效果,使「cft1」长时间休眠,断点查看「stack」结构;
  1. public class Completable05 {
  2.     public static void main(String[] args) throws Exception {
  3.         // 线程池
  4.         ExecutorService executor = Executors.newFixedThreadPool(3);
  5.         CompletableFuture<String> cft01 = CompletableFuture.supplyAsync(() -> {
  6.             if (1 > 0){
  7.                 throw new RuntimeException("task...exception");
  8.             }
  9.             return "OK";
  10.         },executor);
  11.         // 1、捕获cft01的异常信息,并提供返回值
  12.         String finallyRes = cft01.thenApply((res) -> {
  13.             System.out.println("cft01-res:" + res);
  14.             return res;
  15.         }).exceptionally((ex) -> {
  16.             System.out.println("cft01-exe:" + ex.getMessage());
  17.             return "error" ;
  18.         }).get();
  19.         System.out.println("finallyRes="+finallyRes);
  20.         CompletableFuture<String> cft02 = CompletableFuture.supplyAsync(() -> {
  21.             try {
  22.                 Thread.sleep(1000);
  23.             } catch (InterruptedException e) {
  24.                 e.printStackTrace();
  25.             }
  26.             return "OK-2";
  27.         },executor);
  28.         // 2、如果cft02未完成,则get时抛出指定异常信息
  29.         boolean exeFlag = cft02.completeExceptionally(new RuntimeException("given...exception"));
  30.         if (exeFlag){
  31.             System.out.println(cft02.get());
  32.         } else {
  33.             System.out.println(cft02.get());
  34.         }
  35.     }
  36. }
复制代码
断点截图

原理分析

观察者Completion注册到「cft1」,注册时会检查计算是否完成,未完成则观察者入栈,当「cft1」计算完成会弹栈;已完成则直接触发观察者;
可以调整断点代码,让「cft1」先处于完成状态,再查看其运行时结构,从而分析完整的逻辑;
4、二元依赖

即一个CF同时依赖两个CF;这里使用「thenCombine」方法演示;为了看到效果,使「cft1、cft2」长时间休眠,断点查看「stack」结构;
  1. public class DepZero {
  2.     public static void main(String[] args) throws Exception {
  3.         ExecutorService executor = Executors.newFixedThreadPool(3);
  4.         CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(()-> "OK-1",executor);
  5.         CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(()-> "OK-2",executor);
  6.         System.out.println(cft1.get()+";"+cft2.get());
  7.     }
  8. }
复制代码
断点截图

原理分析

在「cft1」和「cft2」未完成的状态下,尝试将BiApply压入「cft1」和「cft2」两个栈中,任意CF完成时,会尝试触发观察者,观察者检查「cft1」和「cft2」是否都完成,如果完成则执行;
5、多元依赖

即一个CF同时依赖多个CF;这里使用「allOf」方法演示;为了看到效果,使「cft1、cft2、cft3」长时间休眠,断点查看「stack」结构;
  1. public class DepOne {
  2.     public static void main(String[] args) throws Exception {
  3.         ExecutorService executor = Executors.newFixedThreadPool(3);
  4.         CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> {
  5.             try {
  6.                 Thread.sleep(30000);
  7.             } catch (InterruptedException e) {
  8.                 e.printStackTrace();
  9.             }
  10.             return "OK-1";
  11.         },executor);
  12.         CompletableFuture<String> cft2 = cft1.thenApply(res -> {
  13.             System.out.println("cft01-res"+res);
  14.             return "OK-2" ;
  15.         });
  16.         System.out.println("cft02-res"+cft2.get());
  17.     }
  18. }
复制代码
断点截图

原理分析

多元依赖的回调方法除了「allOf」还有「anyOf」,其实现原理都是将依赖的多个CF补全为平衡二叉树,从断点图可知会按照树的层级处理,核心结构参考二元依赖即可;
五、参考源码
  1. public class DepTwo {
  2.     public static void main(String[] args) throws Exception {
  3.         ExecutorService executor = Executors.newFixedThreadPool(3);
  4.         CompletableFuture<String> cft1 = CompletableFuture.supplyAsync(() -> {
  5.             try {
  6.                 Thread.sleep(30000);
  7.             } catch (InterruptedException e) {
  8.                 e.printStackTrace();
  9.             }
  10.             return "OK-1";
  11.         },executor);
  12.         CompletableFuture<String> cft2 = CompletableFuture.supplyAsync(() -> {
  13.             try {
  14.                 Thread.sleep(30000);
  15.             } catch (InterruptedException e) {
  16.                 e.printStackTrace();
  17.             }
  18.             return "OK-2";
  19.         },executor);
  20.         // cft3 依赖 cft1和cft2 的计算结果
  21.         CompletableFuture<String> cft3 = cft1.thenCombine(cft2,(res1,res2) -> {
  22.             System.out.println("cft01-res:"+res1);
  23.             System.out.println("cft02-res:"+res2);
  24.             return "OK-3" ;
  25.         });
  26.         System.out.println("cft03-res:"+cft3.get());
  27.     }
  28. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

曹旭辉

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表