Future源码一观-JUC系列

打印 上一主题 下一主题

主题 893|帖子 893|积分 2679

背景介绍

在程序中,主线程启动一个子线程进行异步计算,主线程是不阻塞继续执行的,这点看起来是非常自然的,都已经选择启动子线程去异步执行了,主线程如果是阻塞的话,那还不如主线程自己去执行不就好了。那会不会有一种场景,异步线程执行的结果主线程是需要使用的,或者说主线程先做一些工作,然后需要确认子线程执行情况来进行后续的操作。那么这里就需要子线程异步执行完任务能把结果告诉主线程,并且主线程还能访问到子线程执行任务的状态,比如是否执行完成或正在执行中。
Future就是上面概念的抽象,按照源码中的注释,它代表着一个异步计算的结果,提供的方法中可以通过get方法获取异步线程计算的结果,如果计算还没结束,就会阻塞等待返回成功;也可以通过cancel方法取消异步计算任务;还可以通过isCancelled和isDone获得异步执行的状态;如果一个异步执行的内容并没有返回值,但是希望可以使用Future来获得取消异步计算任务的能力,可以返回null。
FutureTask

FutureTask提供了对Future的基础实现,在进入FutureTask源码之前,我们先考虑下如果要实现Future的功能可以怎么设计呢?
1,异步线程进入执行任务的时候,主线程先阻塞住,等到一步线程任务完成有返回结果了,唤醒主线程,把返回结果给它。
2,需要有个标记,记录异步线程有没有执行结束,异步线程任务执行一结束,这个标记就要变更,通过这个标记就可以知道执行状态。
3,能获取异步线程,在执行还没完成先,对异步线程可以中断,这样就可以取消异步线程执行的任务了。
4,异步线程执行和取消操作是有并发竞争的,所以应该对标记的更新做锁保护处理。
对照Future的API,大致能想到这些,实际还有大量关键细节组合才能实现。可以带这个实现思路进入源码的学习。
Task

FutureTask本身就是继承Runnable,Runnable的run方法是没有返回参数的。那么既然FutureTask需要把异步线程执行结果返回,就意味着需要把结果拿到记录。
构造函数
  1. public FutureTask(Callable<V> callable) {
  2.     if (callable == null)
  3.         throw new NullPointerException();
  4.     this.callable = callable;
  5.     this.state = NEW;       // ensure visibility of callable
  6. }
  7. public FutureTask(Runnable runnable, V result) {
  8.     this.callable = Executors.callable(runnable, result);
  9.     this.state = NEW;       // ensure visibility of callable
  10. }
复制代码
当构造函数传的是Runnable的时候,会适配成Callable,所以对于自己的run方法需要返回结果的事那么就好办了,就是调用callable的run方法就行。我们再衍生开去看下Executors.callable(runnable, result);的实现。
  1. public static <T> Callable<T> callable(Runnable task, T result) {
  2.     if (task == null)
  3.         throw new NullPointerException();
  4.     return new RunnableAdapter<T>(task, result);
  5. }       
  6. static final class RunnableAdapter<T> implements Callable<T> {
  7.     final Runnable task;
  8.     final T result;
  9.     RunnableAdapter(Runnable task, T result) {
  10.         this.task = task;
  11.         this.result = result;
  12.     }
  13.     public T call() {
  14.         task.run();
  15.         return result;
  16.     }
  17. }
复制代码
这个适配没什么特殊,把一个result引用作为参数传入,然后作为结果返回。所以其实很少用这种方式来获取result,更多的是传一个null进来,因为更多的时候是想知道异步线程是否执行结束了,而不是要结果。
run方法

FutureTask既然本身就是Runnable,把它作为task提交给线程池执行,就是调用它的run方法。根据前面的分析,这个run方法需要调用内部属性callable的run获得result,然后保存result,以备get方法来获取的时候能直接返回,另外肯定也是要处理异常的场景。
以下是run方法的源码,再加上仔细关注一下状态的流转,就可以比较好的理解这个核心代码了。
  1. public void run() {
  2.           // 【1】
  3.     if (state != NEW ||
  4.         !UNSAFE.compareAndSwapObject(this, runnerOffset,
  5.                                      null, Thread.currentThread()))
  6.         return;
  7.     try {
  8.         Callable<V> c = callable;
  9.         if (c != null && state == NEW) {
  10.             V result;
  11.             boolean ran;
  12.             try {
  13.                 result = c.call();
  14.                 ran = true;
  15.             } catch (Throwable ex) {
  16.                 result = null;
  17.                 ran = false;
  18.                       // 【2】
  19.                 setException(ex);
  20.             }
  21.             if (ran)
  22.                       // 【3】
  23.                 set(result);
  24.         }
  25.     } finally {
  26.               //【4】
  27.         // runner must be non-null until state is settled to
  28.         // prevent concurrent calls to run()
  29.         runner = null;
  30.         // state must be re-read after nulling runner to prevent
  31.         // leaked interrupts
  32.         int s = state;
  33.         if (s >= INTERRUPTING)
  34.                   //【5】
  35.             handlePossibleCancellationInterrupt(s);
  36.     }
  37. }
  38. protected void set(V v) {
  39.   if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  40.     outcome = v;
  41.     UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
  42.     finishCompletion();
  43.   }
  44. }
  45. protected void setException(Throwable t) {
  46.   if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
  47.     outcome = t;
  48.     UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
  49.     finishCompletion();
  50.   }
  51. }
  52. private void handlePossibleCancellationInterrupt(int s) {
  53.   if (s == INTERRUPTING)
  54.     while (state == INTERRUPTING)
  55.       Thread.yield(); // wait out pending interrupt
  56. }
复制代码

  • 【1】执行的起始状态必须是NEW,初始化FutureTask的时候设置的NEW状态,如果不是NEW状态,就退出run方法;并且CAS设置runner字段为当前执行线程,设置失败表示已经设置过,就退出run方法。根据状态和CAS设置runner字段判断,确保了FutureTask实例同时只能有一个一个线程在执行。
  • 【2】执行callable的run方法异常,进行setException操作,先把状态从NEW设置成COMPLETING,设置成功后把outcome字段设置成异常结果,然后将状态设置成EXCEPTIONAL。finishCompletion方法在状态进入终态(final state)的时候都会被调用,他会唤醒等待的线程节点,是流程中的关键一环,在后续中详细分析。
  • 【3】正常执行callable的run方法会获得结果,进行set操作,老规矩,先把状态从NEW设置成COMPLIETING,设置成功后把outcome字段设置成返回结果result,以备等待线程来获取,然后把状态设置成NORMAL。NORMAL作为终态,也会调用finishCompletion方法。
  • 【4】finally代码块,前面有通过判断runner是否为空来避免并发执行,所以最后把runner设置成null,这个注释好理解,在状态确定之前,Runner必须是非空的,以防止对run()的并发调用,这一点结合【1】就可以解释。第二步的注释说,状态重新读取必须在将runner设置为null之后,以防止泄漏中断,这里需要结合cancel方法分析,cancel方法中执行的顺序是先将state修改成INTERRUPTING成功后再使用runner,这里就保证了先设置runner为null后再获取state的最新值。
  • 【5】handlePossibleCancellationInterrupt方法中用一个while循环加Thread.yield()来等待state在INTERRUPTING下变成INTERRUPTED。也就是说当cancel方法把state改成INTERRUPTING后,run方法就会等待cancel方法执行结束后自己才执行结束。
直到网上找到了这篇文章why outcome object in FutureTask is non-volatile?
这里有个很巧妙的设计,就是利用java的happends before中的传递原则,使得在不使用锁的情况下,保证其他线程读到state=NORMAL时,该线程一定能读到outcome的最新值
Task State

前面提到需要一个标记来记录任务的执行状态,源码实现中有一个volatile修饰的int类型state字段(和AQS一样的配方的感觉来了)。
  1.     /**
  2.      * NEW -> COMPLETING -> NORMAL
  3.      * NEW -> COMPLETING -> EXCEPTIONAL
  4.      * NEW -> CANCELLED
  5.      * NEW -> INTERRUPTING -> INTERRUPTED
  6.     **/
  7.                 private volatile int state;
  8.     private static final int NEW          = 0;
  9.     private static final int COMPLETING   = 1;
  10.     private static final int NORMAL       = 2;
  11.     private static final int EXCEPTIONAL  = 3;
  12.     private static final int CANCELLED    = 4;
  13.     private static final int INTERRUPTING = 5;
  14.     private static final int INTERRUPTED  = 6;
复制代码
注释提供了全部状态流转路径,核心逻辑就是一步步变更状态来进行的。
Treiber Stack

我们需要了解清楚这个Treiber Stack的概念,因为这在JUC源码很多地方有使用,有助于我们理解JUC其他组件代码的实现。
对于一个栈,我们并发往栈里放节点的时候如何处理竞争呢?比较简单的方式就是使用锁,放的时候锁,取的时候锁。
有个大佬他不想用锁,而是利用CAS并发原语设计了一个无锁堆栈,并发表了论文,他名字就叫Treiber,这个就是Treiber Stack的由来。在FutureTask的源码注释中专门提到,很多JDK源码中都用到了类似这种引用,表达这个实现是有坚实理论依据的,有一种做学术的专业氛围。
直接看《Java Concurrency in Practice》中提供的实现代码:
  1. public class TreiberStack <E> {
  2.     AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();
  3.     public void push(E item) {
  4.         Node<E> newHead = new Node<E>(item);
  5.         Node<E> oldHead;
  6.         do {
  7.             oldHead = top.get();
  8.             newHead.next = oldHead;
  9.         } while (!top.compareAndSet(oldHead, newHead));
  10.     }
  11.     public E pop() {
  12.         Node<E> oldHead;
  13.         Node<E> newHead;
  14.         do {
  15.             oldHead = top.get();
  16.             if (oldHead == null)
  17.                 return null;
  18.             newHead = oldHead.next;
  19.         } while (!top.compareAndSet(oldHead, newHead));
  20.         return oldHead.item;
  21.     }
  22.     private static class Node <E> {
  23.         public final E item;
  24.         public Node<E> next;
  25.         public Node(E item) {
  26.             this.item = item;
  27.         }
  28.     }
  29. }
复制代码
这个队列在入队和出队的时候都没有进行锁操作,而是CAS设置头节点是否成功,如果设置成功表示头节点没有被修改过,就没有竞争发生,直接设置头节点,如果CAS设置失败表示有竞争发生,则字段继续,知道设置头节点成功。
其实只要记住一点,操作这个数据结构的入口集中在头节点上,原子操作头节点保证不会发生并发引起的读写数据异常的问题。
下面看一下FutureTask是如何定义这个链表节点的:
WaitNode

使用WaitNode来表示链表节点,内部有记录阻塞等待的线程和下一个节点的引用。
  1. static final class WaitNode {
  2.     volatile Thread thread;
  3.     volatile WaitNode next;
  4.     WaitNode() { thread = Thread.currentThread(); }
  5. }
复制代码
以下是FutureTask中实现的Treiber Stack结构图:

get方法

前面已经提过,get方法是阻塞线程等待的,怎么阻塞的?多个线程都调用get方法阻塞的时候如何维护这些线程?带着这两问题继续阅读源码。
[code]public V get() throws InterruptedException, ExecutionException {    int s = state;    if (s

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

干翻全岛蛙蛙

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表