口试题:如何能够包管T2在T1执行完后执行,T3在T2执行完后执行?——CountD ...

打印 上一主题 下一主题

主题 892|帖子 892|积分 2676

CountDownLatch的使用方式

CountDownLatch用于某个线程等候其他线程执行完使命再执行,与thread.join()功能雷同。常见的应用场景是开启多个线程同时执行某个使命,比及所有使命执行完再执行特定利用,如汇总统计结果。
口试题:如何能够包管T2在T1执行完后执行,T3在T2执行完后执行?
join方法

可以使用join方法解决这个问题。比如在线程A中,调用线程B的join方法表示的意思就是: A等候B线程执行完毕后(开释CPU执行权),在继续执行。
  1. public class RunnableJob {
  2.     public static void main(String[] args) throws InterruptedException {
  3.         Worker runnableJob = new Worker();
  4.         Thread t1 = new Thread(runnableJob, "T1");
  5.         Thread t2 = new Thread(runnableJob, "T2");
  6.         Thread t3 = new Thread(runnableJob, "T3");
  7.         t1.start();
  8.         //这里就是在main主线程中,调用t1线程的join方法。
  9.         //也就是main主线程要等待t1执行完成后才能继续往下执行
  10.         t1.join();
  11.         t2.start();
  12.         t2.join();
  13.         t3.start();
  14.         t3.join();
  15.         System.out.println("主线程执行完毕----");
  16.     }
  17. }
  18. class Worker implements Runnable{
  19.     public void run() {
  20.         Thread thread = Thread.currentThread();
  21.         try {
  22.             Thread.sleep(1000);
  23.             System.out.println(thread.getName()+"正在执行");
  24.         } catch (InterruptedException e) {
  25.             e.printStackTrace();
  26.         }
  27.     }
  28. }
  29. //输出
  30. T1正在执行
  31. T2正在执行
  32. T3正在执行
  33. 主线程执行完毕----
复制代码
CountDownLatch

倒计时计数器
CountDownLatch用于某个线程等候其他线程执行完使命再执行,可以被认为是加强版的join()。
  1. public class CountDownLatchTest {
  2.     public static void main(String[] args) {
  3.         final CountDownLatch countDownLatch = new CountDownLatch(3);
  4.         new Thread("T1"){
  5.             public void run() {
  6.                 try {
  7.                     Thread.sleep(3000);
  8.                     System.out.println(Thread.currentThread().getName()+"正在执行");
  9.                     countDownLatch.countDown();
  10.                 } catch (InterruptedException e) {
  11.                     e.printStackTrace();
  12.                 }
  13.             };
  14.         }.start();
  15.         new Thread("T2"){
  16.             public void run() {
  17.                 try {
  18.                     Thread.sleep(3000);
  19.                     System.out.println(Thread.currentThread().getName()+"正在执行");
  20.                     countDownLatch.countDown();
  21.                 } catch (InterruptedException e) {
  22.                     e.printStackTrace();
  23.                 }
  24.             };
  25.         }.start();
  26.         new Thread("T3"){
  27.             public void run() {
  28.                 try {
  29.                     Thread.sleep(3000);
  30.                     System.out.println(Thread.currentThread().getName()+"正在执行");
  31.                     countDownLatch.countDown();
  32.                 } catch (InterruptedException e) {
  33.                     e.printStackTrace();
  34.                 }
  35.             };
  36.         }.start();
  37.         System.out.println("等待三个线程执行完,主线程才能执行");
  38.         try {
  39.             //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行;
  40.             //或者等待timeout时间后count值还没变为0的话也会继续执行
  41.             countDownLatch.await();
  42. //            countDownLatch.await(20000, TimeUnit.MILLISECONDS);
  43.         } catch (InterruptedException e) {
  44.             e.printStackTrace();
  45.         }
  46.         System.out.println("主线程执行完毕");
  47.     }
  48. }
  49. //输出
  50. 等待三个线程执行完,主线程才能执行
  51. T1正在执行
  52. T3正在执行
  53. T2正在执行
  54. 主线程执行完毕
复制代码
调用了await后,主线程被挂起,它会等候直到count值为0才继续执行;因此只影响主线程的执行次序一定要在T1 T2 T3之后,但T1 T2 T3之间的次序互不影响
应用场景: 开启多个线程同时执行某个使命,比及所有使命执行完再执行特定利用,如汇总统计结果。
两者区别

相同点:都能等候一个或者多个线程执行完成利用,比如等候三个线程执行完毕后,第四个线程才能执行
不同点:join能让线程按我们预想的的次序执行,比如线程1执行完了,线程2才能执行,线程2执行完,线程3才能执行,但是CountDownLatch就做不到.
当调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当火线程,直到N变为零(也就是线程都执行完了),由于countDown方法可以用在任何地方,以是这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多线程时,只需把这个CountDownLatch的引用传递到线程中即可
CountDownLatch原理

从源码可知,其底层是由AQS提供支持,以是其数据结构可以参考AQS的数据结构,而AQS的数据结构核心就是两个虚拟队列: 同步队列sync queue 和条件队列condition queue,不同的条件会有不同的条件队列。
CountDownLatch对AQS的共享方式实现为:CountDownLatch 将使命分为N个子线程去执行,将 state 初始化为 N, N与线程的个数一致,N个子线程是井行执行的,每个子线程都在执行完成后 countDown() 1次, state 执行 CAS 利用并减1。在所有子线程都执行完成(state=0)时会unpark()主线程,然后主线程会从 await()返回,继续执行后续的动作。

CountDownLatch源码分析

类的继承关系

CountDownLatch没有显示继承哪个父类或者实现哪个父接口, 它底层是AQS是通过内部类Sync来实现的。
  1. public class CountDownLatch {}
复制代码

类的内部类

CountDownLatch类存在一个内部类Sync,继承自AbstractQueuedSynchronizer,其源代码如下。
  1. private static final class Sync extends AbstractQueuedSynchronizer {
  2.     // 版本号
  3.     private static final long serialVersionUID = 4982264981922014374L;
  4.    
  5.     // 构造器
  6.     Sync(int count) {
  7.         setState(count);
  8.     }
  9.    
  10.     // 返回当前计数
  11.     int getCount() {
  12.         return getState();
  13.     }
  14.     // 试图在共享模式下获取对象状态
  15.     protected int tryAcquireShared(int acquires) {
  16.         return (getState() == 0) ? 1 : -1;
  17.     }
  18.     // 试图设置状态来反映共享模式下的一个释放
  19.     protected boolean tryReleaseShared(int releases) {
  20.         // Decrement count; signal when transition to zero
  21.         // 无限循环
  22.         for (;;) {
  23.             // 获取状态
  24.             int c = getState();
  25.             if (c == 0) // 没有被线程占有
  26.                 return false;
  27.             // 下一个状态
  28.             int nextc = c-1;
  29.             if (compareAndSetState(c, nextc)) // 比较并且设置成功
  30.                 return nextc == 0;
  31.         }
  32.     }
  33. }
复制代码
说明: 对CountDownLatch方法的调用会转发到对Sync或AQS的方法的调用,以是,AQS对CountDownLatch提供支持。
类的属性

可以看到CountDownLatch类的内部只有一个Sync类型的属性:
  1. public class CountDownLatch {
  2.     // 同步队列
  3.     private final Sync sync;
  4. }
复制代码
类的构造函数
  1. public CountDownLatch(int count) {
  2.     if (count < 0) throw new IllegalArgumentException("count < 0");
  3.     // 初始化状态数
  4.     this.sync = new Sync(count);
  5. }
复制代码
说明: 该构造函数可以构造一个用给定计数初始化的CountDownLatch,而且构造函数内完成了sync的初始化,并设置了状态数。
核心函数 - await函数

此函数将会使当火线程在锁存器倒计数至零之前不绝等候,除非线程被停止。其源码如下
  1. public void await() throws InterruptedException {
  2.     // 转发到sync对象上
  3.     sync.acquireSharedInterruptibly(1);
  4. }
复制代码
说明: 由源码可知,对CountDownLatch对象的await的调用会转发为对Sync的acquireSharedInterruptibly(从AQS继承的方法)方法的调用。

  • acquireSharedInterruptibly源码如下:
  1. public final void acquireSharedInterruptibly(int arg)
  2.         throws InterruptedException {
  3.     if (Thread.interrupted())
  4.         throw new InterruptedException();
  5.     if (tryAcquireShared(arg) < 0)
  6.         doAcquireSharedInterruptibly(arg);
  7. }
复制代码
说明: 从源码中可知,acquireSharedInterruptibly又调用了CountDownLatch的内部类Sync的tryAcquireShared和AQS的doAcquireSharedInterruptibly函数。

  • tryAcquireShared函数的源码如下:
  1. protected int tryAcquireShared(int acquires) {
  2.     return (getState() == 0) ? 1 : -1;
  3. }
复制代码
说明: 该函数只是简单的判断AQS的state是否为0,为0则返回1,不为0则返回-1。

  • doAcquireSharedInterruptibly函数的源码如下:
  1. private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  2.     // 添加节点至等待队列
  3.     final Node node = addWaiter(Node.SHARED);
  4.     boolean failed = true;
  5.     try {
  6.         for (;;) { // 无限循环
  7.             // 获取node的前驱节点
  8.             final Node p = node.predecessor();
  9.             if (p == head) { // 前驱节点为头节点
  10.                 // 试图在共享模式下获取对象状态
  11.                 int r = tryAcquireShared(arg);
  12.                 if (r >= 0) { // 获取成功
  13.                     // 设置头节点并进行繁殖
  14.                     setHeadAndPropagate(node, r);
  15.                     // 设置节点next域
  16.                     p.next = null; // help GC
  17.                     failed = false;
  18.                     return;
  19.                 }
  20.             }
  21.             if (shouldParkAfterFailedAcquire(p, node) &&
  22.                 parkAndCheckInterrupt()) // 在获取失败后是否需要禁止线程并且进行中断检查
  23.                 // 抛出异常
  24.                 throw new InterruptedException();
  25.         }
  26.     } finally {
  27.         if (failed)
  28.             cancelAcquire(node);
  29.     }
  30. }
复制代码
说明: 在AQS的doAcquireSharedInterruptibly中可能会再次调用CountDownLatch的内部类Sync的tryAcquireShared方法和AQS的setHeadAndPropagate方法。

  • setHeadAndPropagate方法源码如下。
  1. private void setHeadAndPropagate(Node node, int propagate) {
  2.     // 获取头节点
  3.     Node h = head; // Record old head for check below
  4.     // 设置头节点
  5.     setHead(node);
  6.     /*
  7.         * Try to signal next queued node if:
  8.         *   Propagation was indicated by caller,
  9.         *     or was recorded (as h.waitStatus either before
  10.         *     or after setHead) by a previous operation
  11.         *     (note: this uses sign-check of waitStatus because
  12.         *      PROPAGATE status may transition to SIGNAL.)
  13.         * and
  14.         *   The next node is waiting in shared mode,
  15.         *     or we don't know, because it appears null
  16.         *
  17.         * The conservatism in both of these checks may cause
  18.         * unnecessary wake-ups, but only when there are multiple
  19.         * racing acquires/releases, so most need signals now or soon
  20.         * anyway.
  21.         */
  22.     // 进行判断
  23.     if (propagate > 0 || h == null || h.waitStatus < 0 ||
  24.         (h = head) == null || h.waitStatus < 0) {
  25.         // 获取节点的后继
  26.         Node s = node.next;
  27.         if (s == null || s.isShared()) // 后继为空或者为共享模式
  28.             // 以共享模式进行释放
  29.             doReleaseShared();
  30.     }
  31. }   
复制代码
说明: 该方法设置头节点而且开释头节点后面的满足条件的结点,该方法中可能会调用到AQS的doReleaseShared方法,其源码如下。
  1. private void doReleaseShared() {
  2.     /*
  3.         * Ensure that a release propagates, even if there are other
  4.         * in-progress acquires/releases.  This proceeds in the usual
  5.         * way of trying to unparkSuccessor of head if it needs
  6.         * signal. But if it does not, status is set to PROPAGATE to
  7.         * ensure that upon release, propagation continues.
  8.         * Additionally, we must loop in case a new node is added
  9.         * while we are doing this. Also, unlike other uses of
  10.         * unparkSuccessor, we need to know if CAS to reset status
  11.         * fails, if so rechecking.
  12.         */
  13.     // 无限循环
  14.     for (;;) {
  15.         // 保存头节点
  16.         Node h = head;
  17.         if (h != null && h != tail) { // 头节点不为空并且头节点不为尾结点
  18.             // 获取头节点的等待状态
  19.             int ws = h.waitStatus;
  20.             if (ws == Node.SIGNAL) { // 状态为SIGNAL
  21.                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就继续
  22.                     continue;            // loop to recheck cases
  23.                 // 释放后继结点
  24.                 unparkSuccessor(h);
  25.             }
  26.             else if (ws == 0 &&
  27.                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 状态为0并且不成功,继续
  28.                 continue;                // loop on failed CAS
  29.         }
  30.         if (h == head) // 若头节点改变,继续循环  
  31.             break;
  32.     }
  33. }
复制代码
说明: 该方法在共享模式下开释。
以是,对CountDownLatch的await调用大致会有如下的调用链。

说明: 上图给出了可能会调用到的紧张方法,并非一定会调用到
核心函数 - countDown函数

此函数将递减锁存器的计数,如果计数到达零,则开释所有等候的线程
  1. public void countDown() {
  2.     sync.releaseShared(1);
  3. }
复制代码
说明: 对countDown的调用转换为对Sync对象的releaseShared(从AQS继承而来)方法的调用。

  • releaseShared源码如下
  1. public final boolean releaseShared(int arg) {
  2.     if (tryReleaseShared(arg)) {
  3.         // 当state状态为0了,才会执行这里
  4.         doReleaseShared();
  5.         return true;
  6.     }
  7.     return false;
  8. }
复制代码
说明: 此函数会以共享模式开释对象,而且在函数中会调用到CountDownLatch的tryReleaseShared函数,而且可能会调用AQS的doReleaseShared函数。

  • tryReleaseShared源码如下
  1. protected boolean tryReleaseShared(int releases) {
  2.     // Decrement count; signal when transition to zero
  3.     // 无限循环
  4.     for (;;) {
  5.         // 获取状态
  6.         int c = getState();
  7.         if (c == 0) // 没有被线程占有
  8.             return false;
  9.         // 下一个状态
  10.         int nextc = c-1;
  11.         if (compareAndSetState(c, nextc)) // 比较并且设置成功
  12.             return nextc == 0;
  13.     }
  14. }
复制代码
说明: 此函数会试图设置状态来反映共享模式下的一个开释。具体的流程在下面的示例中会举行分析。

  • AQS的doReleaseShared的源码如下
  1. private void doReleaseShared() {
  2.     /*
  3.         * Ensure that a release propagates, even if there are other
  4.         * in-progress acquires/releases.  This proceeds in the usual
  5.         * way of trying to unparkSuccessor of head if it needs
  6.         * signal. But if it does not, status is set to PROPAGATE to
  7.         * ensure that upon release, propagation continues.
  8.         * Additionally, we must loop in case a new node is added
  9.         * while we are doing this. Also, unlike other uses of
  10.         * unparkSuccessor, we need to know if CAS to reset status
  11.         * fails, if so rechecking.
  12.         */
  13.     // 无限循环
  14.     for (;;) {
  15.         // 保存头节点
  16.         Node h = head;
  17.         if (h != null && h != tail) { // 头节点不为空并且头节点不为尾结点
  18.             // 获取头节点的等待状态
  19.             int ws = h.waitStatus;
  20.             if (ws == Node.SIGNAL) { // 状态为SIGNAL
  21.                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 不成功就继续
  22.                     continue;            // loop to recheck cases
  23.                 // 释放后继结点
  24.                 unparkSuccessor(h);
  25.             }
  26.             else if (ws == 0 &&
  27.                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 状态为0并且不成功,继续
  28.                 continue;                // loop on failed CAS
  29.         }
  30.         if (h == head) // 若头节点改变,继续循环  
  31.             break;
  32.     }
  33. }
复制代码
说明: 此函数在共享模式下开释资源。
以是,对CountDownLatch的countDown调用大致会有如下的调用链

示例

下面给出了一个使用CountDownLatch的示例。
  1. import java.util.concurrent.CountDownLatch;
  2. class MyThread extends Thread {
  3.     private CountDownLatch countDownLatch;
  4.    
  5.     public MyThread(String name, CountDownLatch countDownLatch) {
  6.         super(name);
  7.         this.countDownLatch = countDownLatch;
  8.     }
  9.    
  10.     public void run() {
  11.         System.out.println(Thread.currentThread().getName() + " doing something");
  12.         try {
  13.             Thread.sleep(1000);
  14.         } catch (InterruptedException e) {
  15.             e.printStackTrace();
  16.         }
  17.         System.out.println(Thread.currentThread().getName() + " finish");
  18.         countDownLatch.countDown();
  19.     }
  20. }
  21. public class CountDownLatchDemo {
  22.     public static void main(String[] args) {
  23.         CountDownLatch countDownLatch = new CountDownLatch(2);
  24.         MyThread t1 = new MyThread("t1", countDownLatch);
  25.         MyThread t2 = new MyThread("t2", countDownLatch);
  26.         t1.start();
  27.         t2.start();
  28.         System.out.println("Waiting for t1 thread and t2 thread to finish");
  29.         try {
  30.             countDownLatch.await();
  31.         } catch (InterruptedException e) {
  32.             e.printStackTrace();
  33.         }            
  34.         System.out.println(Thread.currentThread().getName() + " continue");        
  35.     }
  36. }
复制代码
运行结果(某一次):
  1. Waiting for t1 thread and t2 thread to finish
  2. t1 doing something
  3. t2 doing something
  4. t1 finish
  5. t2 finish
  6. main continue
复制代码
说明: 本程序首先计数器初始化为2。根据结果,可能会存在如下的一种时序图。

说明: 首先main线程会调用await利用,此时main线程会被阻塞,等候被唤醒,之后t1线程执行了countDown利用,末了,t2线程执行了countDown利用,此时main线程就被唤醒了,可以继续运行。下面,举行具体分析。

  • main线程执行countDownLatch.await利用,紧张调用的函数如下。

说明: 在末了,main线程就被park了,即克制运行了。此时Sync queue(同步队列)中有两个节点,AQS的state为2,包含main线程的结点的nextWaiter指向SHARED结点。

  • t1线程执行countDownLatch.countDown利用,紧张调用的函数如下。

说明: 此时,Sync queue队列里的结点个数未发生变革,但是此时,AQS的state已经变为1了。

  • t2线程执行countDownLatch.countDown利用,紧张调用的函数如下。

说明: 经过调用后,AQS的state为0,而且此时,main线程会被unpark,可以继续运行。当main线程获取cpu资源后,继续运行。

  • main线程获取cpu资源,继续运行,由于main线程是在parkAndCheckInterrupt函数中被克制的,以是此时,继续在parkAndCheckInterrupt函数运行。

说明: main线程恢复,继续在parkAndCheckInterrupt函数中运行,之后又会回到最终达到的状态为AQS的state为0,而且head与tail指向同一个结点,该节点的额nextWaiter域还是指向SHARED结点
口试题专栏

Java口试题专栏已上线,欢迎访问。

  • 如果你不知道简历怎么写,简历项目不知道怎么包装;
  • 如果简历中有些内容你不知道该不应写上去;
  • 如果有些综合性问题你不知道怎么答;
那么可以私信我,我会尽我所能帮助你。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

滴水恩情

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表