JUC并发—7.AQS源码分析三

打印 上一主题 下一主题

主题 866|帖子 866|积分 2598

大纲
1.等待多线程完成的CountDownLatch先容
2.CountDownLatch.await()方法源码
3.CountDownLatch.coutDown()方法源码
4.CountDownLatch总结
5.控制并发线程数的Semaphore先容
6.Semaphore的令牌获取过程
7.Semaphore的令牌释放过程
8.同步屏障CyclicBarrier先容
9.CyclicBarrier的await()方法源码
10.使用CountDownLatch等待注册的完成
11.使用CyclicBarrier将工作任务多线程分而治之
12.使用CyclicBarrier聚合服务接口的返回效果
13.使用Semaphore等待指定数量线程完成任务
 
volatile、synchronized、CAS、AQS、读写锁、锁优化和锁故障、并发集合、线程池、同步组件
 
1.等待多线程完成的CountDownLatch
(1)CountDownLatch的简介
(2)CountDownLatch的应用
(3)CountDownLatch的例子
 
(1)CountDownLatch的简介
CountDownLatch答应一个或多个线程等待其他线程完成操纵。CountDownLatch提供了两个焦点方法,分别是await()方法和countDown()方法。CountDownLatch.await()方法让调用线程举行阻塞进入等待状态,CountDownLatch.countDown()方法用于对计数器举行递减。
 
CountDownLatch在构造时需要传入一个正整数作为计数器初始值。线程每调用一次countDown()方法,都会对该计数器减一。当计数器为0时,会唤醒所有执行await()方法时被阻塞的线程。
 
(2)CountDownLatch的应用
应用一:
使用多线程去解析一个Excel里多个sheet的数据,每个线程解析一个sheet里的数据,等所有sheet解析完再提示处理完成。此时便可以使用CountDownLatch来实现,当然可以使用Thread.join()方法。
 
留意:Thread.join()方法是基于wait()和notify()来实现的。在main线程里开启一个线程A,main线程假如执行了线程A的join()方法,那么就会导致main线程被阻塞,main线程会等待线程A执行完毕才会继续往下执行。
 
应用二:
微服务注册中心的register-client,为了在注册线程执行成功后,才发送心跳。可以使用CountDownLatch,当然也可以使用Thread.join()方法。
 
应用三:
可以通过CountDownLatch实现类似并发的效果。把CountDownLatch的计数器设置为1,然后让1000个线程调用await()方法。当1000个线程初始化完成后,在main线程调用countDown()让计数器归零。这样这1000个线程就会在一个for()循环中,依次被唤醒。
 
(3)CountDownLatch的例子
  1. public class CountDownLatchDemo {
  2.     public static void main(String[] args) throws Exception {
  3.         final CountDownLatch latch = new CountDownLatch(2);
  4.         new Thread() {
  5.             public void run() {
  6.                 try {
  7.                     Thread.sleep(1000);
  8.                     System.out.println("线程1开始执行,休眠2秒...");
  9.                     Thread.sleep(1000);
  10.                     System.out.println("线程1准备执行countDown操作...");
  11.                     latch.countDown();
  12.                     System.out.println("线程1完成执行countDown操作...");
  13.                 } catch (Exception e) {
  14.                     e.printStackTrace();
  15.                 }
  16.             }
  17.         }.start();
  18.         new Thread() {
  19.             public void run() {
  20.                 try {
  21.                     Thread.sleep(1000);
  22.                     System.out.println("线程2开始执行,休眠2秒...");
  23.                     Thread.sleep(1000);
  24.                     System.out.println("线程2准备执行countDown操作...");
  25.                     latch.countDown();
  26.                     System.out.println("线程2完成执行countDown操作...");
  27.                 } catch (Exception e) {
  28.                     e.printStackTrace();
  29.                 }
  30.             }
  31.         }.start();
  32.         System.out.println("main线程准备执行countDownLatch的await操作,将会同步阻塞等待...");
  33.         latch.await();
  34.         System.out.println("所有线程都完成countDown操作,结束同步阻塞等待...");
  35.     }
  36. }
复制代码
 
2.CountDownLatch.await()方法源码
(1)CountDownLatch.await()方法的阻塞流程
(2)CountDownLatch.await()方法的唤醒流程
(3)CountDownLatch.await()方法的阻塞总结
 
(1)CountDownLatch.await()方法的阻塞流程
CountDownLatch是基于AQS中的共享锁来实现的。从CountDownLatch的构造方法可知,CountDownLatch的count就是AQS的state。
 
调用CountDownLatch的await()方法时,会先调用AQS的acquireSharedInterruptibly()模版方法,然后会调用CountDownLatch的内部类Sync实现的tryAcquireShared()方法。tryAcquireShared()方法会判断state的值是否为0,假如为0,才返回1,否则返回-1。
 
当调用CountDownLatch内部类Sync的tryAcquireShared()方法得到的返回值是-1时,才会调用AQS的doAcquireSharedInterruptibly()方法,将当火线程封装成Node结点加入等待队列,然后挂起当火线程举行阻塞。
  1. //A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
  2. public class CountDownLatch {
  3.     private final Sync sync;
  4.    
  5.     public CountDownLatch(int count) {
  6.         if (count < 0) {
  7.             throw new IllegalArgumentException("count < 0");
  8.         }
  9.         this.sync = new Sync(count);
  10.     }
  11.    
  12.     //Synchronization control For CountDownLatch.
  13.     //Uses AQS state to represent count.
  14.     private static final class Sync extends AbstractQueuedSynchronizer {
  15.         Sync(int count) {
  16.             setState(count);
  17.         }
  18.         
  19.         int getCount() {
  20.             return getState();
  21.         }
  22.         
  23.         protected int tryAcquireShared(int acquires) {
  24.             return (getState() == 0) ? 1 : -1;
  25.         }
  26.         
  27.         protected boolean tryReleaseShared(int releases) {
  28.             //Decrement count; signal when transition to zero
  29.             for (;;) {
  30.                 int c = getState();
  31.                 if (c == 0) {
  32.                     return false;
  33.                 }
  34.                 int nextc = c-1;
  35.                 if (compareAndSetState(c, nextc)) {
  36.                     return nextc == 0;
  37.                 }
  38.             }
  39.         }
  40.     }
  41.    
  42.     //Causes the current thread to wait until the latch has counted down to zero,
  43.     //unless the thread is Thread#interrupt interrupted.
  44.     public void await() throws InterruptedException {
  45.         //执行AQS的acquireSharedInterruptibly()方法
  46.         sync.acquireSharedInterruptibly(1);
  47.     }
  48.     ...
  49. }
  50. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  51.     ...
  52.     //Acquires in shared mode, aborting if interrupted.
  53.     //Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.
  54.     //Otherwise the thread is queued, possibly repeatedly blocking and unblocking,
  55.     //invoking #tryAcquireShared until success or the thread is interrupted.
  56.     public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  57.         if (Thread.interrupted()) {
  58.             throw new InterruptedException();
  59.         }
  60.         //执行CountDownLatch的内部类Sync实现的tryAcquireShared()方法,抢占共享锁
  61.         if (tryAcquireShared(arg) < 0) {
  62.             //执行AQS的doAcquireSharedInterruptibly()方法
  63.             doAcquireSharedInterruptibly(arg);
  64.         }
  65.     }
  66.    
  67.     //Acquires in shared interruptible mode.
  68.     private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
  69.         final Node node = addWaiter(Node.SHARED);//封装当前线程为Shared类型的Node结点
  70.         boolean failed = true;
  71.         try {
  72.             //第一次循环r = -1,所以会执行AQS的shouldParkAfterFailedAcquire()方法
  73.             //将node结点的有效前驱结点的状态设置为SIGNAL
  74.             for (;;) {
  75.                 final Node p = node.predecessor();//node结点的前驱结点
  76.                 if (p == head) {
  77.                     int r = tryAcquireShared(arg);
  78.                     if (r >= 0) {
  79.                         setHeadAndPropagate(node, r);
  80.                         p.next = null; // help GC
  81.                         failed = false;
  82.                         return;
  83.                     }
  84.                 }
  85.                 //执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL
  86.                 //执行parkAndCheckInterrupt()方法挂起当前线程
  87.                 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
  88.                     throw new InterruptedException();
  89.                 }
  90.             }
  91.         } finally {
  92.             if (failed) {
  93.                 cancelAcquire(node);
  94.             }
  95.         }
  96.     }
  97.    
  98.     //Checks and updates status for a node that failed to acquire.
  99.     //Returns true if thread should block. This is the main signal control in all acquire loops.
  100.     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  101.         int ws = pred.waitStatus;
  102.         if (ws == Node.SIGNAL) {
  103.             //This node has already set status asking a release to signal it, so it can safely park.
  104.             return true;
  105.         }
  106.         if (ws > 0) {
  107.             //Predecessor was cancelled. Skip over predecessors and indicate retry.
  108.             do {
  109.                 node.prev = pred = pred.prev;
  110.             } while (pred.waitStatus > 0);
  111.             pred.next = node;
  112.         } else {
  113.             //waitStatus must be 0 or PROPAGATE.  
  114.             //Indicate that we need a signal, but don't park yet.  
  115.             //Caller will need to retry to make sure it cannot acquire before parking.
  116.             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  117.         }
  118.         return false;
  119.     }
  120.    
  121.     //设置头结点和唤醒后续线程
  122.     //Sets head of queue, and checks if successor may be waiting in shared mode,
  123.     //if so propagating if either propagate > 0 or PROPAGATE status was set.
  124.     private void setHeadAndPropagate(Node node, int propagate) {
  125.         Node h = head;
  126.         setHead(node);//将node结点设置为头结点
  127.         if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
  128.             Node s = node.next;
  129.             if (s == null || s.isShared()) {
  130.                 doReleaseShared();
  131.             }
  132.         }
  133.     }
  134.    
  135.     private void setHead(Node node) {
  136.         head = node;
  137.         node.thread = null;
  138.         node.prev = null;
  139.     }
  140.     ...
  141. }
复制代码
(2)CountDownLatch.await()方法的唤醒流程
调用await()方法时,起首会将当火线程封装成Node结点并添加到等待队列中,然后在执行第一次for循环时会设置该Node结点的前驱结点状态为SIGNAL,接着在执行第二次for循环时才会将当火线程举行挂起阻塞。
 
当该线程后续被唤醒时,该线程又会进入下一次for循环。假如该线程对应的node结点的前驱结点是等待队列的头结点且state值已为0,那么就执行AQS的setHeadAndPropagate()方法设置头结点 + 唤醒后续线程。
 
其中setHeadAndPropagate()方法有两个工作(设置头结点 + 唤醒通报):
工作一:设置当前被唤醒线程对应的结点为头结点
工作二:当满意如下这两个条件的时候需要调用doReleaseShared()方法唤醒后续的线程
条件一:propagate > 0,表现当前是共享锁,需要举行唤醒通报
条件二:s.isShared()判断当前结点为共享模式
 
CountDownLatch的实现中会在以下两个场景调用doReleaseShared()方法:
场景一:state为1时调用的countDown()方法会调用doReleaseShared()方法
场景二:当阻塞的线程被唤醒时,会调用setHeadAndPropagate()方法,进而调用doReleaseShared()方法,这样可以提升唤醒共享结点的速率
 
(3)CountDownLatch.await()方法的阻塞总结
只要state != 0,就会举行如下处理:
一.将当火线程封装成一个Node结点,然后添加到AQS的等待队列中
二.调用LockSupport.park()方法,挂起当火线程
 
3.CountDownLatch.coutDown()方法源码
(1)CountDownLatch.coutDown()的唤醒流程
(2)CountDownLatch.tryReleaseShared()
(3)AQS的doReleaseShared()方法
 
(1)CountDownLatch.coutDown()的唤醒流程
调用CountDownLatch的countDown()方法时,会先调用AQS的releaseShared()模版方法,然后会执行CountDownLatch的内部类Sync实现的tryReleaseShared()方法。
 
假如tryReleaseShared()方法返回true,则执行AQS的doReleaseShared()方法,通过AQS的doReleaseShared()方法唤醒共享锁模式下的等待队列中的线程。
  1. //A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
  2. public class CountDownLatch {
  3.     private final Sync sync;
  4.    
  5.     public CountDownLatch(int count) {
  6.         if (count < 0) {
  7.             throw new IllegalArgumentException("count < 0");
  8.         }
  9.         this.sync = new Sync(count);
  10.     }
  11.    
  12.     //Synchronization control For CountDownLatch.
  13.     //Uses AQS state to represent count.
  14.     private static final class Sync extends AbstractQueuedSynchronizer {
  15.         Sync(int count) {
  16.             setState(count);
  17.         }
  18.         
  19.         int getCount() {
  20.             return getState();
  21.         }
  22.         
  23.         protected int tryAcquireShared(int acquires) {
  24.             return (getState() == 0) ? 1 : -1;
  25.         }
  26.         
  27.         protected boolean tryReleaseShared(int releases) {
  28.             //Decrement count; signal when transition to zero
  29.             for (;;) {
  30.                 int c = getState();
  31.                 if (c == 0) {
  32.                     return false;
  33.                 }
  34.                 int nextc = c-1;
  35.                 if (compareAndSetState(c, nextc)) {
  36.                     return nextc == 0;
  37.                 }
  38.             }
  39.         }
  40.     }
  41.            
  42.     //Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
  43.     public void countDown() {
  44.         //执行AQS的releaseShared()方法
  45.         sync.releaseShared(1);
  46.     }
  47.     ...
  48. }
  49. public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  50.     ...
  51.     //Releases in shared mode.  
  52.     //Implemented by unblocking one or more threads if #tryReleaseShared returns true.
  53.     public final boolean releaseShared(int arg) {
  54.         //执行CountDownLatch的内部类Sync实现的tryReleaseShared()方法,释放共享锁
  55.         if (tryReleaseShared(arg)) {
  56.             //执行AQS的doReleaseShared()方法
  57.             doReleaseShared();
  58.             return true;
  59.         }
  60.         return false;
  61.     }
  62.    
  63.     //Release action for shared mode -- signals successor and ensures propagation.
  64.     //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.
  65.     private void doReleaseShared() {
  66.         for (;;) {
  67.             //每次循环时头结点都会发生变化
  68.             //因为调用unparkSuccessor()方法会唤醒doAcquireSharedInterruptibly()方法中阻塞的线程
  69.             //然后阻塞的线程会在执行setHeadAndPropagate()方法时通过setHead()修改头结点
  70.             Node h = head;//获取最新的头结点
  71.             if (h != null && h != tail) {//等待队列中存在挂起线程的结点
  72.                 int ws = h.waitStatus;
  73.                 if (ws == Node.SIGNAL) {//头结点的状态正常,表示对应的线程可以被唤醒
  74.                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
  75.                         continue;//loop to recheck cases
  76.                     }
  77.                     //唤醒头结点的后继结点
  78.                     //唤醒的线程会在doAcquireSharedInterruptibly()方法中执行setHeadAndPropagate()方法修改头结点
  79.                     unparkSuccessor(h);
  80.                 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
  81.                     //如果ws = 0表示初始状态,则修改结点为PROPAGATE状态
  82.                     continue;//loop on failed CAS
  83.                 }
  84.             }
  85.             if (h == head) {//判断头结点是否有变化
  86.                 break;//loop if head changed
  87.             }
  88.         }
  89.     }
  90.    
  91.     //Wakes up node's successor, if one exists.
  92.     private void unparkSuccessor(Node node) {
  93.         int ws = node.waitStatus;
  94.         if (ws < 0) {
  95.             compareAndSetWaitStatus(node, ws, 0);
  96.         }
  97.         Node s = node.next;
  98.         if (s == null || s.waitStatus > 0) {
  99.             s = null;
  100.             for (Node t = tail; t != null && t != node; t = t.prev) {
  101.                 if (t.waitStatus <= 0) {
  102.                     s = t;
  103.                 }
  104.             }
  105.         }
  106.         if (s != null) {
  107.             LockSupport.unpark(s.thread);
  108.         }
  109.     }
  110.     ...
  111. }
复制代码
 
7.Semaphore的令牌释放过程
(1)Semaphore的令牌释放过程
(2)Semaphore的令牌释放本质
 
(1)Semaphore的令牌释放过程
在调用Semaphore的release()方法去释放令牌时:起首会执行AQS的模版方法releaseShared(),然后执行Sync实现的tryReleaseShared()方法来释放锁(累加state值)。假如释放锁成功,则执行AQS的doReleaseShared()方法去唤醒线程。
 
(2)Semaphore的令牌释放本质
Semaphore的release()方法释放令牌的本质就是对state字段举行累加,然后唤醒等待队列头结点的后继结点 + 唤醒通报来唤醒等待的线程。
 
留意:并非一定要执行acquire()方法的线程才能调用release()方法,任意一个线程都可以调用release()方法,也可以通过reducePermits()方法来减少令牌数。
[code]public class Semaphore implements java.io.Serializable {    private final Sync sync;        //Creates a Semaphore with the given number of permits and nonfair fairness setting.    public Semaphore(int permits) {        sync = new NonfairSync(permits);    }        //Releases a permit, returning it to the semaphore.    public void release() {        //执行AQS的模版方法releaseShared()        sync.releaseShared(1);    }        //Synchronization implementation for semaphore.      //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.    abstract static class Sync extends AbstractQueuedSynchronizer {        Sync(int permits) {            //设置state的值为传入的令牌数            setState(permits);        }                //实验释放锁,也就是对state值举行累加        protected final boolean tryReleaseShared(int releases) {            for (;;) {                int current = getState();                int next = current + releases;                if (next < current) {                    throw new Error("Maximum permit count exceeded");                }                if (compareAndSetState(current, next)) {                    return true;                }            }        }        ...    }    ...}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    ...        //Releases in shared mode.      //Implemented by unblocking one or more threads if #tryReleaseShared returns true.    public final boolean releaseShared(int arg) {        //执行Semaphore的内部类Sync实现的tryReleaseShared()方法,释放共享锁        if (tryReleaseShared(arg)) {            //执行AQS的doReleaseShared()方法,唤醒等待队列中的线程            doReleaseShared();            return true;        }        return false;    }        //Release action for shared mode -- signals successor and ensures propagation.     //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.    private void doReleaseShared() {        for (;;) {            //每次循环时头结点都会发生变化            //因为调用unparkSuccessor()方法会唤醒doAcquireSharedInterruptibly()方法中阻塞的线程            //然后阻塞的线程会在执行setHeadAndPropagate()方法时通过setHead()修改头结点            Node h = head;//获取最新的头结点            if (h != null && h != tail) {//等待队列中存在挂起线程的结点                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {//头结点的状态正常,表现对应的线程可以被唤醒                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {                        continue;//loop to recheck cases                    }                    //唤醒头结点的后继结点                    //唤醒的线程会在doAcquireSharedInterruptibly()方法中执行setHeadAndPropagate()方法修改头结点                    unparkSuccessor(h);                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {                    //假如ws = 0表现初始状态,则修改结点为PROPAGATE状态                    continue;//loop on failed CAS                }            }            if (h == head) {//判断头结点是否有变化                break;//loop if head changed            }        }    }        //Wakes up node's successor, if one exists.    private void unparkSuccessor(Node node) {        int ws = node.waitStatus;        if (ws < 0) {            compareAndSetWaitStatus(node, ws, 0);        }        Node s = node.next;        if (s == null || s.waitStatus > 0) {            s = null;            for (Node t = tail; t != null && t != node; t = t.prev) {                if (t.waitStatus  0L) {                        nanos = trip.awaitNanos(nanos);                    }                } catch (InterruptedException ie) {                    if (g == generation && ! g.broken) {                        breakBarrier();                        throw ie;                    } else {                        Thread.currentThread().interrupt();                    }                }                if (g.broken) {                    throw new BrokenBarrierException();                }                if (g != generation) {                    return index;                }                if (timed && nanos
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

光之使者

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表