并发编程-FutureTask解析

打印 上一主题 下一主题

主题 937|帖子 937|积分 2811

1、FutureTask对象介绍

Future对象大家都不陌生,是JDK1.5提供的接口,是用来以阻塞的方式获取线程异步执行完的结果。
在Java中想要通过线程执行一个任务,离不开Runnable与Callable这两个接口。
Runnable与Callable的区别在于,Runnable接口只有一个run方法,该方法用来执行逻辑,但是并没有返回值;而Callable的call方法,同样用来执行业务逻辑,但是是有一个返回值的。
Callable执行任务过程中可以通过FutureTask获得任务的执行状态,并且可以在执行完成后通过Future.get()方式获取执行结果。
Future是一个接口,而FutureTask就是Future的实现类。并且FutureTask实现了 RunnableFuture(Runnable + Future),说明我们可以创建一个FutureTask并直接把它放到线程池执行,然后获取FutureTask的执行结果。
2、FutureTask源码解析

2.1 主要方法和属性

那么FutureTask是如何通过阻塞的方式来获取到异步线程执行的结果的呢?我们看下FutureTask中的属性。
  1. // FutureTask的状态及其常量
  2. private volatile int state;
  3.     private static final int NEW          = 0;
  4.     private static final int COMPLETING   = 1;
  5.     private static final int NORMAL       = 2;
  6.     private static final int EXCEPTIONAL  = 3;
  7.     private static final int CANCELLED    = 4;
  8.     private static final int INTERRUPTING = 5;
  9.     private static final int INTERRUPTED  = 6;
  10.    
  11.     // callable对象,执行完后置空
  12.     private Callable<V> callable;
  13.     // 要返回的结果或要引发的异常来自 get() 方法
  14.     private Object outcome; // non-volatile, protected by state reads/writes
  15.     // 执行Callable的线程
  16.     private volatile Thread runner;
  17.     // 等待线程的一个链表结构
  18.     private volatile WaitNode waiters;
复制代码
FutureTask中几个比较重要的方法。
  1. // 取消任务的执行
  2. boolean cancel(boolean mayInterruptIfRunning);
  3. // 返回任务是否已经被取消
  4. boolean isCancelled();
  5. // 返回任务是否已经完成,任务状态不为NEW即为完成
  6. boolean isDone();
  7. // 通过get方法获取任务的执行结果
  8. V get() throws InterruptedException, ExecutionException;
  9. // 通过get方法获取任务的执行结果,带有超时,如果超过给定时间则抛出异常
  10. V get(long timeout, TimeUnit unit)
  11.         throws InterruptedException, ExecutionException, TimeoutException;
复制代码
2.2 FutureTask执行

当我们在线程池中执行一个Callable方法时,其实是将Callable任务封装成一个RunnableFuture对象去执行,同时将这个RunnableFuture对象返回,这样我们就拿到了FutureTask的引用,可以随时获取到任务执行的状态,并且可以在任务执行完成后通过该对象获取执行结果。
以下为ThreadPoolExecutor线程池提交一个callable方法的源码。
  1. public <T> Future<T> submit(Callable<T> task) {
  2.         if (task == null) throw new NullPointerException();
  3.         RunnableFuture<T> ftask = newTaskFor(task);
  4.         execute(ftask);
  5.         return ftask;
  6.     }
  7.        
  8.         protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  9.         return new FutureTask<T>(callable);
  10.     }
复制代码
2.3 run方法介绍

RunnableFuture其实也是一个可以执行的runnable,我们看下他的run方法。其主要流程就是执行call方法,正常执行完毕后将result结果赋值到outcome属性上。
  1. public void run() {
  2.         if (state != NEW ||
  3.             !UNSAFE.compareAndSwapObject(this, runnerOffset,
  4.                                          null, Thread.currentThread()))
  5.             return;
  6.         try {
  7.             // 将callable赋值到本地变量
  8.             Callable<V> c = callable;
  9.             // 判断callable不为空并且FutureTask的状态必须为新创建
  10.             if (c != null && state == NEW) {
  11.                 V result;
  12.                 boolean ran;
  13.                 try {
  14.                     // 执行call方法(用户自己实现的call逻辑),并获取到result结果
  15.                     result = c.call();
  16.                     ran = true;
  17.                 } catch (Throwable ex) {
  18.                     result = null;
  19.                     ran = false;
  20.                     // 如果执行过程出现异常,则将异常对象赋值到outcome上
  21.                     setException(ex);
  22.                 }
  23.                 // 如果正常执行完毕,则将result赋值到outcome属性上
  24.                 if (ran)
  25.                     set(result);
  26.             }
  27.         } finally {
  28.             // runner must be non-null until state is settled to
  29.             // prevent concurrent calls to run()
  30.             runner = null;
  31.             // state must be re-read after nulling runner to prevent
  32.             // leaked interrupts
  33.             int s = state;
  34.             if (s >= INTERRUPTING)
  35.                 handlePossibleCancellationInterrupt(s);
  36.         }
  37.     }
复制代码
以下逻辑为正常执行完成后赋值的逻辑。
  1. // 如果任务没有被取消,将future执行完的返回值赋值给result结果
  2. // FutureTask任务的执行状态是通过CAS的方式进行赋值的,并且由此可知,COMPLETING其实是一个瞬时状态
  3. // 当将线程执行结果赋值给outcome后,状态会修改为对应的NORMAL,即正常结束
  4. protected void set(V v) {
  5.         if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  6.             outcome = v;
  7.             UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
  8.             finishCompletion();
  9.         }
  10.     }
复制代码
以下为执行异常时赋值逻辑,直接将Throwable对象赋值到outcome属性上。
  1. protected void setException(Throwable t) {
  2.         if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  3.             outcome = t;
  4.             UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
  5.             finishCompletion();
  6.         }
  7.     }
复制代码
无论是正常执行还是异常执行,最终都会调用一个finishCompletion方法,用来做工作的收尾工作。
2.4 get方法介绍

Future的get方法有两个重载的方法,一个是get()获取结果,一个是get(long, TimeUnit)带有超时时间的获取结果,我们看下FutureTask中的这两个方法是如何实现的。
  1. // 不带有超时时间,一直阻塞直到获取结果
  2. public V get() throws InterruptedException, ExecutionException {
  3.         int s = state;
  4.         if (s <= COMPLETING)
  5.             // 等待结果完成,带有超时的get方法也是调用的awaitDone方法
  6.             s = awaitDone(false, 0L);
  7.         // 返回结果
  8.         return report(s);
  9.     }
  10. // 带有超时时间的获取结果,如果超过时间还没有获取到结果则抛出异常
  11. public V get(long timeout, TimeUnit unit)
  12.         throws InterruptedException, ExecutionException, TimeoutException {
  13.         if (unit == null)
  14.             throw new NullPointerException();
  15.         int s = state;
  16.         // 如果任务未中断,调用awaitDone方法等待任务结果
  17.         if (s <= COMPLETING &&
  18.             (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
  19.             throw new TimeoutException();
  20.         // 返回结果
  21.         return report(s);
  22.     }
复制代码
我们看到done方法是一个受保护的空方法,此处没有任何逻辑,由其子类去根据自己的业务去实现相应的逻辑。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。
  1. // timed:是否需要超时获取
  2. // nanos:超时时间单位纳秒
  3. private int awaitDone(boolean timed, long nanos)
  4.         throws InterruptedException {
  5.         final long deadline = timed ? System.nanoTime() + nanos : 0L;
  6.         WaitNode q = null;
  7.         boolean queued = false;
  8.         // 此方法会一直for循环判断任务状态是否已经完成,是Future.get阻塞的原因
  9.         for (;;) {
  10.             if (Thread.interrupted()) {
  11.                 removeWaiter(q);
  12.                 throw new InterruptedException();
  13.             }
  14.             int s = state;
  15.             // 任务状态大于COMPLETING,则表明任务结束,直接返回
  16.             if (s > COMPLETING) {
  17.                 if (q != null)
  18.                     q.thread = null;
  19.                 return s;
  20.             }
  21.             else if (s == COMPLETING) // cannot time out yet
  22.                 // Thread.yield() 方法,使当前线程由执行状态,变成为就绪状态,让出cpu时间,在下一个线程执行时候,此线程有可能被执行,也有可能没有被执行。
  23.                 // COMPLETING状态为瞬时状态,任务执行完成,要么是正常结束,要么异常结束,后续会被置为NORMAL或者EXCEPTIONAL
  24.                 Thread.yield();
  25.             else if (q == null)
  26.                 // 每调用一次get方法,都会创建一个WaitNode等待节点
  27.                 q = new WaitNode();
  28.             else if (!queued)
  29.                 // 将该等待节点添加到链表结构waiters中,q.next = waiters 即在waiters的头部插入
  30.                 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
  31.                                                      q.next = waiters, q);
  32.             // 如果方法带有超时判断,则判断当前时间是否已经超过了截止时间,如果超过了及截止日期,则退出循环直接返回当前状态,此时任务状态一定是NEW
  33.             else if (timed) {
  34.                 nanos = deadline - System.nanoTime();
  35.                 if (nanos <= 0L) {
  36.                     removeWaiter(q);
  37.                     return state;
  38.                 }
  39.                 LockSupport.parkNanos(this, nanos);
  40.             }
  41.             else
  42.                 LockSupport.park(this);
  43.         }
  44.     }
复制代码
3、总结

通过源码解读可以了解到Future的原理:
第一步:主线程将任务封装成一个Callable对象,通过submit方法提交到线程池去执行。
第二步:线程池执行任务的run方法,主线程则可以继续执行其他逻辑。
第三步:线程池中方法执行完成后将结果赋值到outcome属性上,并修改任务状态。
第四步:主线程在需要拿到异步任务结果的时候,主动调用fugure.get()方法来获取结果。
第五步:如果异步线程在执行过程中发生异常,则会在调用future.get()方法的时候抛出来。
以上就是对于FutureTask的分析,我们可以了解FutureTask任务执行的方式以及Future.get已阻塞的方式获取线程执行的结果原理,并且从代码中可以了解FutureTask的任务执行状态以及状态的变化过程。
作者:京东物流 丁冬
来源:京东云开发者社区 自猿其说Tech

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

反转基因福娃

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表