最近一直畅游在RocketMQ的源码中,发现在RocketMQ中很多地方都使用到了CompletableFuture,所以今天就跟大家来聊一聊JDK1.8提供的异步神器CompletableFuture,并且最后会结合RocketMQ源码分析一下CompletableFuture的使用。
Future接口以及它的局限性
我们都知道,Java中创建线程的方式主要有两种方式,继承Thread或者实现Runnable接口。但是这两种都是有一个共同的缺点,那就是都无法获取到线程执行的结果,也就是没有返回值。于是在JDK1.5 以后为了解决这种没有返回值的问题,提供了Callable和Future接口以及Future对应的实现类FutureTask,通过FutureTask的就可以获取到异步执行的结果。
于是乎,我们想要开启异步线程,执行任务,获取结果,就可以这么实现。- FutureTask<String> futureTask = new FutureTask<>(() -> "三友");
- new Thread(futureTask).start();
- System.out.println(futureTask.get());
复制代码 或者使用线程池的方式- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Future<String> future = executorService.submit(() -> "三友");
- System.out.println(future.get());
- executorService.shutdown();
复制代码 线程池底层也是将提交的Callable的实现先封装成FutureTask,然后通过execute方法来提交任务,执行异步逻辑。
Future接口的局限性
虽然通过Future接口的get方法可以获取任务异步执行的结果,但是get方法会阻塞主线程,也就是异步任务没有完成,主线程会一直阻塞,直到任务结束。
Future也提供了isDone方法来查看异步线程任务执行是否完成,如果完成,就可以获取任务的执行结果,代码如下。- ExecutorService executorService = Executors.newSingleThreadExecutor();
- Future<String> future = executorService.submit(() -> "三友");
- while (!future.isDone()) {
- //任务有没有完成,没有就继续循环判断
- }
- System.out.println(future.get());
- executorService.shutdown();
复制代码 但是这种轮询查看异步线程任务执行状态,也是非常消耗cpu资源。
同时对于一些复杂的异步操作任务的处理,可能需要各种同步组件来一起完成。
所以,通过上面的介绍可以看出,Future在使用的过程中还是有很强的局限性,所以为了解决这种局限性,在JDK1.8的时候,Doug Lea 大神为我们提供了一种更为强大的类CompletableFuture。
什么是CompletableFuture?
CompletableFuture在JDK1.8提供了一种更加强大的异步编程的api。它实现了Future接口,也就是Future的功能特性CompletableFuture也有;除此之外,它也实现了CompletionStage接口,CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。
CompletableFuture相比于Future最大的改进就是提供了类似观察者模式的回调监听的功能,也就是当上一阶段任务执行结束之后,可以回调你指定的下一阶段任务,而不需要阻塞获取结果之后来处理结果。
CompletableFuture常见api详解
CompletableFuture的方法api多,但主要可以分为以下几类。
1、实例化CompletableFuture
构造方法创建
- CompletableFuture<String> completableFuture = new CompletableFuture<>();
- System.out.println(completableFuture.get());
复制代码 此时如果有其它线程执行如下代码,就能执行打印出 三友- completableFuture.complete("三友")
复制代码 静态方法创建
除了使用构造方法构造,CompletableFuture还提供了静态方法来创建- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
- public static CompletableFuture<Void> runAsync(Runnable runnable);
- public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
复制代码 supply 和 run 的主要区别就是 supply 可以有返回值,run 没有返回值。至于另一个参数Executor 就是用来执行异步任务的线程池,如果不传Executor 的话,默认是ForkJoinPool这个线程池的实现。
一旦通过静态方法来构造,会立马开启异步线程执行Supplier或者Runnable提交的任务。- CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "三友");
- System.out.println(completableFuture.get());
复制代码 一旦任务执行完成,就可以打印返回值,这里的使用方法跟Future是一样的。
所以对比两个两种实例化的方法,使用静态方法的和使用构造方法主要区别就是,使用构造方法需要其它线程主动调用complete来表示任务执行完成,因为很简单,因为在构造的时候没有执行异步的任务,所以需要其它线程主动调用complete来表示任务执行完成。
2、获取任务执行结果
- public T get();
- public T get(long timeout, TimeUnit unit);
- public T getNow(T valueIfAbsent);
- public T join();
复制代码 get()和get(long timeout, TimeUnit unit)是实现了Future接口的功能,两者主要区别就是get()会一直阻塞直到获取到结果,get(long timeout, TimeUnit unit)值可以指定超时时间,当到了指定的时间还未获取到任务,就会抛出TimeoutException异常。
getNow(T valueIfAbsent):就是获取任务的执行结果,但不会产生阻塞。如果任务还没执行完成,那么就会返回你传入的 valueIfAbsent 参数值,如果执行完成了,就会返回任务执行的结果。
join():跟get()的主要区别就是,get()会抛出检查时异常,join()不会。
3、主动触发任务完成
- public boolean complete(T value);
- public boolean completeExceptionally(Throwable ex);
复制代码 complete:主动触发当前异步任务的完成。调用此方法时如果你的任务已经完成,那么方法就会返回false;如果任务没完成,就会返回true,并且其它线程获取到的任务的结果就是complete的参数值。
completeExceptionally:跟complete的作用差不多,complete是正常结束任务,返回结果,而completeExceptionally就是触发任务执行的异常。
4、对任务执行结果进行下一步处理
只能接收任务正常执行后的回调
[code]public <U> CompletionStage<U> thenApply(Function |