深入理解Java中的FutureTask:用法和原理

鼠扑  金牌会员 | 2024-10-30 06:16:56 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 925|帖子 925|积分 2775

媒介

Callable、Future和FutureTask是jdk1.5,java.util.concurrent包提供的异步框架
这里先讲一下什么是异步?异步是指起多个线程,多个线程之间互不干扰,各自执行各自的任务,在代码中大概书写顺序有先有后,但有大概写在后面的线程会比写在前面的线程先执行任务,异步对应并行的概念,常见的异步操作有线程池、Callable、completeFuture等。
同步是多线程里针对竞争现象的一个处置惩罚,竞争是指同一时刻有多个线程访问临界资源,大概会引发程序执行错误的效果,同步就是包管某一时刻仅能有一个线程访问临界资源,同步对应串行的概念,常见的同步操作有synchronized关键字、Lock、线程变量、Atomic原子类等。
什么时候必要异步?比如要执行一个任务,该任务执行完后会返回一个效果供其他任务使用,但是该任务很耗时,如果我们把程序计划成串行执行,先执行这个耗时任务,等他结束后再把执行效果给下一个任务使用,这样会耗时,且在这个任务执行期间,其他任务都被阻塞了。那么就可以把程序计划成异步,起一个线程执行这个耗时任务,此外主线程做其他事变,等这个耗时任务执行完毕后,主线程再把效果拿到,使用这个效果继续做其他事变,这样在这个耗时任务执行的过程中,主线程可以去做其他事变而不是等他执行完,这样效率会很高,因此异步编程在进步并发量上使用广泛。
Callable接口

先看Callable接口的源码:
  1. @FunctionalInterface
  2. public interface Callable<V> {
  3.     /**
  4.      * Computes a result, or throws an exception if unable to do so.
  5.      *
  6.      * @return computed result
  7.      * @throws Exception if unable to compute a result
  8.      */
  9.     V call() throws Exception;
  10. }
复制代码
首先是注解是函数式接口,意味着可以用lambda表达式更简洁地使用它。Callable是个泛型接口,只有一个方法call,该方法返回类型就是传递进来的V类型。call方法还支持抛出异常.
与Callable对应的是Runnable接口,实现了这两个接口的类都可以当做线程任务递交给线程池执行,Runnable接口的源码如下:
  1. @FunctionalInterface
  2. public interface Runnable {
  3.     /**
  4.      * When an object implementing interface Runnable is used
  5.      * to create a thread, starting the thread causes the object's
  6.      * run method to be called in that separately executing
  7.      * thread.
  8.      * <p>
  9.      * The general contract of the method run is that it may
  10.      * take any action whatsoever.
  11.      *
  12.      * @see     java.lang.Thread#run()
  13.      */
  14.     public abstract void run();
  15. }
复制代码
既然实现了这两个接口的类都可以当做线程任务,那么这两个接口有什么区别呢?

  • Runnable接口是java1.1就有的,Callable接口是java1.5才有的,可以以为Callable接口是升级版的Runnable接口;
  • Runnable接口里线程任务是在run方法里写的,Callable接口里线程任务是在call方法里写;
  • Callable接口的任务执行后会有返回值,Runnable接口的任务无返回值(void);
  • Callable接口的call方法支持抛出异常,Runnable接口的run方法不可以;
  • 加入线程池运行,Runnable使用ExecutorService的execute方法,Callable使用ExecutorService的submit方法;
  • 运行Callable任务可以拿到一个Future对象,表现异步计算的效果。Future对象封装了检查计算是否完成、检索计算的效果的方法,而Runnable接口没有。
Callable使用ExecutorService的submit方法,这里看一下ExecutorService接口里的submit方法的重载情况:
  1. <T> Future<T> submit(Callable<T> task);
  2. <T> Future<T> submit(Runnable task, T result);
  3. Future<?> submit(Runnable task);
复制代码
常用的是第一个和第三个,这两个方法分别提交实现了Callable接口的类和实现了Runnable接口的类作为线程任务,返回异步计算效果Future,Future里面封装了一些实用方法可以对异步计算效果进行进一步处置惩罚。
Future接口

Future接口代表异步计算的效果,通过Future接口提供的方法可以查看异步计算是否执行完成,或者等待执行效果并获取执行效果,同时还可以取消执行。Future接口的定义如下:
  1. public interface Future<V> {
  2.     boolean cancel(boolean mayInterruptIfRunning);
  3.     boolean isCancelled();
  4.     boolean isDone();
  5.     V get() throws InterruptedException, ExecutionException;
  6.     V get(long timeout, TimeUnit unit)
  7.         throws InterruptedException, ExecutionException, TimeoutException;
  8. }
复制代码
下面对这五个方法介绍:

  • cancel():用来取消异步任务的执行。如果异步任务已经完成或者已经被取消,或者由于某些缘故起因不能取消,则会返回false。如果任务还没有被执行,则会返回true而且异步任务不会被执行。如果任务已经开始执行了但是还没有执行完成,若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程。
  • isCanceled():判定任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false。
  • isDone():判定任务是否已经完成,如果完成则返回true,否则返回false。必要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true。
  • get():获取任务执行效果,如果任务还没完成则会阻塞等待直到任务执行完成。如果任务被取消则会抛出CancellationException异常,如果任务执行过程发生异常则会抛出ExecutionException异常,如果阻塞等待过程中被中断则会抛出InterruptedException异常。
  • get(long timeout,Timeunit unit):带超时时间的get()版本,如果阻塞等待过程中超时则会抛出TimeoutException异常。
注意这里两个get方法都会抛出异常。
FutureTask

Future是一个接口,而FutureTask 为 Future 提供了底子实现,如获取任务执行效果(get)和取消任务(cancel)等。如果任务尚未完成,获取任务执行效果时将会阻塞。一旦执行结束,任务就不能被重启或取消(除非使用runAndReset执行计算)。FutureTask 常用来封装 Callable 和 Runnable,也可以作为一个任务提交到线程池中执行。除了作为一个独立的类之外,此类也提供了一些功能性函数供我们创建自定义 task 类使用。FutureTask 的线程安全由CAS来包管。
源码如下:
  1. public class FutureTask<V> implements RunnableFuture<V> {
  2. ...
  3. }
复制代码
FutureTask类实现的是RunnableFuture 接口,该接口的源码如下:
  1. public interface RunnableFuture<V> extends Runnable, Future<V> {
  2.     /**
  3.      * Sets this Future to the result of its computation
  4.      * unless it has been cancelled.
  5.      */
  6.     void run();
  7. }
复制代码
该接口继续了Runnable接口和Future接口,因此FutureTask类既可以当做线程任务递交给线程池执行,又能当Callable任务的计算效果。
Future VS FutureTask

Future与FutureTask的区别:

  • Future是一个接口,FutureTask是一个实现类;
  • 使用Future初始化一个异步任务效果一般必要搭配线程池的submit,且submit方法有返回值;而初始化一个FutureTask对象必要传入一个实现了Callable接口的类的对象,直接将FutureTask对象submit给线程池,无返回值;
  • Future + Callable获取效果必要Future对象的get,而FutureTask获取效果直接用FutureTask对象的get方法即可。
使用示例

Callable + Future

实现Callable接口创建一个异步任务的类,在主线程中起一个线程池执行异步任务,然后在主线程里拿到异步任务的返回效果。
  1. import java.util.concurrent.*;
  2. public class AsynDemo {
  3.     public static void main(String[] args) {
  4.         // 初始化线程池
  5.         ExecutorService threadPool = Executors.newFixedThreadPool(5);
  6.         // 初始化线程任务
  7.         MyTask myTask = new MyTask(5);
  8.         // 向线程池递交线程任务
  9.         Future<Integer> result = threadPool.submit(myTask);
  10.         // 主线程休眠2秒,模拟主线程在做其他的事情
  11.         try {
  12.             Thread.sleep(2000);
  13.         } catch (InterruptedException e) {
  14.             e.printStackTrace();
  15.         }
  16.         // 主线程获取异步任务的执行结果
  17.         try {
  18.             System.out.println("异步线程任务执行结果 " + result.get());
  19.             System.out.println("检查异步线程任务是否执行完毕 " + result.isDone());
  20.         } catch (InterruptedException e) {
  21.             e.printStackTrace();
  22.         } catch (ExecutionException e) {
  23.             e.printStackTrace();
  24.         }
  25.     }
  26.     // 静态内部类,实现Callable接口的任务类
  27.     static class MyTask implements Callable<Integer> {
  28.         private int num;
  29.         public MyTask(int num) {
  30.             this.num = num;
  31.         }
  32.         @Override
  33.         public Integer call() throws Exception {
  34.             for (int i = 1; i < 10; ++i) {
  35.                 this.num *= i;
  36.             }
  37.             return this.num;
  38.         }
  39.     }
  40. }
复制代码
执行效果如下:
  1. 异步线程任务执行结果 1814400
  2. 检查异步线程任务是否执行完毕 true
复制代码
Callable + FutureTask

要做的事变跟4.1一样。
  1. import java.util.concurrent.*;
  2. public class FutureTaskDemo {
  3.     public static void main(String[] args) {
  4.         // 初始化线程池
  5.         ExecutorService threadPool = Executors.newFixedThreadPool(5);
  6.         // 初始化MyTask实例
  7.         MyTask myTask = new MyTask(5);
  8.         // 用MyTask的实例对象初始化一个FutureTask对象
  9.         FutureTask<Integer> myFutureTask = new FutureTask<>(myTask);
  10.         // 向线程池递交线程任务
  11.         threadPool.submit(myFutureTask);
  12.         // 关闭线程池
  13.         threadPool.shutdown();
  14.         // 主线程休眠2秒,模拟主线程在做其他的事情
  15.         try {
  16.             Thread.sleep(2000);
  17.         } catch (InterruptedException e) {
  18.             e.printStackTrace();
  19.         }
  20.         // 主线程获取异步任务的执行结果
  21.         try {
  22.             System.out.println("异步线程任务执行结果 " + myFutureTask.get());
  23.             System.out.println("检查异步线程任务是否执行完毕 " + myFutureTask.isDone());
  24.         } catch (InterruptedException e) {
  25.             e.printStackTrace();
  26.         } catch (ExecutionException e) {
  27.             e.printStackTrace();
  28.         }
  29.     }
  30.     // 静态内部类,实现Callable接口的任务类
  31.     static class MyTask implements Callable<Integer> {
  32.         private int num;
  33.         public MyTask(int num) {
  34.             this.num = num;
  35.         }
  36.         @Override
  37.         public Integer call() throws Exception {
  38.             System.out.println(Thread.currentThread().getName() + " 正在执行异步任务");
  39.             for (int i = 1; i < 10; ++i) {
  40.                 this.num *= i;
  41.             }
  42.             return this.num;
  43.         }
  44.     }
  45. }
复制代码
执行效果与4.1一样。
底层源码解析

FutureTask类关系


可以看到,FutureTask实现了RunnableFuture接口,则RunnableFuture接口继续了Runnable接口和Future接口,所以FutureTask既能当做一个Runnable直接被Thread执行,也能作为Future用来得到Callable的计算效果。
焦点属性
  1. //内部持有的callable任务,运行完毕后置空
  2. private Callable<V> callable;
  3. //从get()中返回的结果或抛出的异常
  4. private Object outcome; // non-volatile, protected by state reads/writes
  5. //运行callable的线程
  6. private volatile Thread runner;
  7. //使用Treiber栈保存等待线程
  8. private volatile WaitNode waiters;
  9. //任务状态
  10. private volatile int state;
  11. private static final int NEW          = 0;
  12. private static final int COMPLETING   = 1;
  13. private static final int NORMAL       = 2;
  14. private static final int EXCEPTIONAL  = 3;
  15. private static final int CANCELLED    = 4;
  16. private static final int INTERRUPTING = 5;
  17. private static final int INTERRUPTED  = 6;
复制代码
其中必要注意的是state是volatile类型的,也就是说只要有任何一个线程修改了这个变量,那么其他全部的线程都会知道最新的值。7种状态具体表现:

  • NEW:表现是个新的任务或者还没被执行完的任务。这是初始状态。
  • COMPLETING:任务已经执行完成或者执行任务的时候发生异常,但是任务执行效果或者异常缘故起因还没有保存到outcome字段(outcome字段用来保存任务执行效果,如果发生异常,则用来保存异常缘故起因)的时候,状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中心状态。
  • NORMAL:任务已经执行完成而且任务执行效果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL。这是一个最终态。
  • EXCEPTIONAL:任务执行发生异常而且异常缘故起因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL。这是一个最终态。
  • CANCELLED:任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态。这是一个最终态。
  • INTERRUPTING: 任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务而且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING。这是一个中心状态。
  • INTERRUPTED:调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终态。 有一点必要注意的是,全部值大于COMPLETING的状态都表现任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)。
各个状态之间的大概转换关系如下图所示:

构造函数


  • FutureTask(Callable callable)
  1. public FutureTask(Callable<V> callable) {
  2.     if (callable == null)
  3.         throw new NullPointerException();
  4.     this.callable = callable;
  5.     this.state = NEW;       // ensure visibility of callable
  6. }
复制代码
这个构造函数会把传入的Callable变量保存在this.callable字段中,该字段定义为private Callable callable;用来保存底层的调用,在被执行完成以后会指向null,接着会初始化state字段为NEW。

  • FutureTask(Runnable runnable, V result)
  1. public FutureTask(Runnable runnable, V result) {
  2.     this.callable = Executors.callable(runnable, result);
  3.     this.state = NEW;       // ensure visibility of callable
  4. }
复制代码
这个构造函数会把传入的Runnable封装成一个Callable对象保存在callable字段中,同时如果任务执行成功的话就会返回传入的result。这种情况下如果不必要返回值的话可以传入一个null。
顺带看下Executors.callable()这个方法,这个方法的功能是把Runnable转换成Callable,代码如下:
  1. public static <T> Callable<T> callable(Runnable task, T result) {
  2.     if (task == null)
  3.        throw new NullPointerException();
  4.     return new RunnableAdapter<T>(task, result);
  5. }
复制代码
可以看到这里接纳的是适配器模式,调用RunnableAdapter(task, result)方法来适配,实现如下:
  1. static final class RunnableAdapter<T> implements Callable<T> {
  2.     final Runnable task;
  3.     final T result;
  4.     RunnableAdapter(Runnable task, T result) {
  5.         this.task = task;
  6.         this.result = result;
  7.     }
  8.     public T call() {
  9.         task.run();
  10.         return result;
  11.     }
  12. }
复制代码
这个适配器很简单,就是简单的实现了Callable接口,在call()实现中调用Runnable.run()方法,然后把传入的result作为任务的效果返回。
在new了一个FutureTask对象之后,接下来就是在另一个线程中执行这个Task,无论是通过直接new一个Thread照旧通过线程池,执行的都是run()方法,接下来就看看run()方法的实现。
焦点方法 - run()
  1. public void run() {
  2.     //新建任务,CAS替换runner为当前线程
  3.     if (state != NEW ||
  4.         !UNSAFE.compareAndSwapObject(this, runnerOffset,
  5.                                      null, Thread.currentThread()))
  6.         return;
  7.     try {
  8.         Callable<V> c = callable;
  9.         if (c != null && state == NEW) {
  10.             V result;
  11.             boolean ran;
  12.             try {
  13.                 result = c.call();
  14.                 ran = true;
  15.             } catch (Throwable ex) {
  16.                 result = null;
  17.                 ran = false;
  18.                 setException(ex);
  19.             }
  20.             if (ran)
  21.                 set(result);//设置执行结果
  22.         }
  23.     } finally {
  24.         // runner must be non-null until state is settled to
  25.         // prevent concurrent calls to run()
  26.         runner = null;
  27.         // state must be re-read after nulling runner to prevent
  28.         // leaked interrupts
  29.         int s = state;
  30.         if (s >= INTERRUPTING)
  31.             handlePossibleCancellationInterrupt(s);//处理中断逻辑
  32.     }
  33. }
复制代码
说明:

  • 运行任务,如果任务状态为NEW状态,则利用CAS修改为当火线程。执行完毕调用set(result)方法设置执行效果。set(result)源码如下:
  1. protected void set(V v) {
  2.     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  3.         outcome = v;
  4.         UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
  5.         finishCompletion();//执行完毕,唤醒等待线程
  6.     }
  7. }
复制代码

  • 首先利用cas修改state状态为COMPLETING,设置返回效果,然后使用 lazySet(UNSAFE.putOrderedInt)的方式设置state状态为NORMAL。效果设置完毕后,调用finishCompletion()方法唤醒等待线程,源码如下:
  1. private void finishCompletion() {
  2.     // assert state > COMPLETING;
  3.     for (WaitNode q; (q = waiters) != null;) {
  4.         if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//移除等待线程
  5.             for (;;) {//自旋遍历等待线程
  6.                 Thread t = q.thread;
  7.                 if (t != null) {
  8.                     q.thread = null;
  9.                     LockSupport.unpark(t);//唤醒等待线程
  10.                 }
  11.                 WaitNode next = q.next;
  12.                 if (next == null)
  13.                     break;
  14.                 q.next = null; // unlink to help gc
  15.                 q = next;
  16.             }
  17.             break;
  18.         }
  19.     }
  20.     //任务完成后调用函数,自定义扩展
  21.     done();
  22.     callable = null;        // to reduce footprint
  23. }
复制代码

  • 回到run方法,如果在 run 期间被中断,此时必要调用handlePossibleCancellationInterrupt方法来处置惩罚中断逻辑,确保任何中断(例如cancel(true))只停顿在当前run或runAndReset的任务中,源码如下:
  1. private void handlePossibleCancellationInterrupt(int s) {
  2.     //在中断者中断线程之前可能会延迟,所以我们只需要让出CPU时间片自旋等待
  3.     if (s == INTERRUPTING)
  4.         while (state == INTERRUPTING)
  5.             Thread.yield(); // wait out pending interrupt
  6. }
复制代码
焦点方法 - get()
  1. //获取执行结果
  2. public V get() throws InterruptedException, ExecutionException {
  3.     int s = state;
  4.     if (s <= COMPLETING)
  5.         s = awaitDone(false, 0L);
  6.     return report(s);
  7. }
复制代码
说明:尝试取消任务。如果任务已经完成或已经被取消,此操作会失败。

  • 如果当前Future状态为NEW,根据参数修改Future状态为INTERRUPTING或CANCELLED。
  • 如果当前状态不为NEW,则根据参数mayInterruptIfRunning决定是否在任务运行中也可以中断。中断操作完成后,调用finishCompletion移除并唤醒全部等待线程
面试题专栏

Java面试题专栏已上线,欢迎访问。

  • 如果你不知道简历怎么写,简历项目不知道怎么包装;
  • 如果简历中有些内容你不知道该不该写上去;
  • 如果有些综合性问题你不知道怎么答;
那么可以私信我,我会尽我所能帮助你。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

鼠扑

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表