光之使者 发表于 2025-2-19 20:05:53

JUC并发—7.AQS源码分析三

大纲
1.等待多线程完成的CountDownLatch先容
2.CountDownLatch.await()方法源码
3.CountDownLatch.coutDown()方法源码
4.CountDownLatch总结
5.控制并发线程数的Semaphore先容
6.Semaphore的令牌获取过程
7.Semaphore的令牌释放过程
8.同步屏障CyclicBarrier先容
9.CyclicBarrier的await()方法源码
10.使用CountDownLatch等待注册的完成
11.使用CyclicBarrier将工作任务多线程分而治之
12.使用CyclicBarrier聚合服务接口的返回效果
13.使用Semaphore等待指定数量线程完成任务
 
volatile、synchronized、CAS、AQS、读写锁、锁优化和锁故障、并发集合、线程池、同步组件
 
1.等待多线程完成的CountDownLatch
(1)CountDownLatch的简介
(2)CountDownLatch的应用
(3)CountDownLatch的例子
 
(1)CountDownLatch的简介
CountDownLatch答应一个或多个线程等待其他线程完成操纵。CountDownLatch提供了两个焦点方法,分别是await()方法和countDown()方法。CountDownLatch.await()方法让调用线程举行阻塞进入等待状态,CountDownLatch.countDown()方法用于对计数器举行递减。
 
CountDownLatch在构造时需要传入一个正整数作为计数器初始值。线程每调用一次countDown()方法,都会对该计数器减一。当计数器为0时,会唤醒所有执行await()方法时被阻塞的线程。
 
(2)CountDownLatch的应用
应用一:
使用多线程去解析一个Excel里多个sheet的数据,每个线程解析一个sheet里的数据,等所有sheet解析完再提示处理完成。此时便可以使用CountDownLatch来实现,当然可以使用Thread.join()方法。
 
留意:Thread.join()方法是基于wait()和notify()来实现的。在main线程里开启一个线程A,main线程假如执行了线程A的join()方法,那么就会导致main线程被阻塞,main线程会等待线程A执行完毕才会继续往下执行。
 
应用二:
微服务注册中心的register-client,为了在注册线程执行成功后,才发送心跳。可以使用CountDownLatch,当然也可以使用Thread.join()方法。
 
应用三:
可以通过CountDownLatch实现类似并发的效果。把CountDownLatch的计数器设置为1,然后让1000个线程调用await()方法。当1000个线程初始化完成后,在main线程调用countDown()让计数器归零。这样这1000个线程就会在一个for()循环中,依次被唤醒。
 
(3)CountDownLatch的例子
public class CountDownLatchDemo {
    public static void main(String[] args) throws Exception {
      final CountDownLatch latch = new CountDownLatch(2);
      new Thread() {
            public void run() {
                try {
                  Thread.sleep(1000);
                  System.out.println("线程1开始执行,休眠2秒...");
                  Thread.sleep(1000);

                  System.out.println("线程1准备执行countDown操作...");
                  latch.countDown();

                  System.out.println("线程1完成执行countDown操作...");
                } catch (Exception e) {
                  e.printStackTrace();
                }
            }
      }.start();

      new Thread() {
            public void run() {
                try {
                  Thread.sleep(1000);
                  System.out.println("线程2开始执行,休眠2秒...");
                  Thread.sleep(1000);

                  System.out.println("线程2准备执行countDown操作...");
                  latch.countDown();

                  System.out.println("线程2完成执行countDown操作...");
                } catch (Exception e) {
                  e.printStackTrace();
                }
            }
      }.start();

      System.out.println("main线程准备执行countDownLatch的await操作,将会同步阻塞等待...");
      latch.await();
      System.out.println("所有线程都完成countDown操作,结束同步阻塞等待...");
    }

2.CountDownLatch.await()方法源码
(1)CountDownLatch.await()方法的阻塞流程
(2)CountDownLatch.await()方法的唤醒流程
(3)CountDownLatch.await()方法的阻塞总结
 
(1)CountDownLatch.await()方法的阻塞流程
CountDownLatch是基于AQS中的共享锁来实现的。从CountDownLatch的构造方法可知,CountDownLatch的count就是AQS的state。
 
调用CountDownLatch的await()方法时,会先调用AQS的acquireSharedInterruptibly()模版方法,然后会调用CountDownLatch的内部类Sync实现的tryAcquireShared()方法。tryAcquireShared()方法会判断state的值是否为0,假如为0,才返回1,否则返回-1。
 
当调用CountDownLatch内部类Sync的tryAcquireShared()方法得到的返回值是-1时,才会调用AQS的doAcquireSharedInterruptibly()方法,将当火线程封装成Node结点加入等待队列,然后挂起当火线程举行阻塞。
//A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
public class CountDownLatch {
    private final Sync sync;
   
    public CountDownLatch(int count) {
      if (count < 0) {
            throw new IllegalArgumentException("count < 0");
      }
      this.sync = new Sync(count);
    }
   
    //Synchronization control For CountDownLatch.
    //Uses AQS state to represent count.
    private static final class Sync extends AbstractQueuedSynchronizer {
      Sync(int count) {
            setState(count);
      }
      
      int getCount() {
            return getState();
      }
      
      protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
      }
      
      protected boolean tryReleaseShared(int releases) {
            //Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0) {
                  return false;
                }
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) {
                  return nextc == 0;
                }
            }
      }
    }
   
    //Causes the current thread to wait until the latch has counted down to zero,
    //unless the thread is Thread#interrupt interrupted.
    public void await() throws InterruptedException {
      //执行AQS的acquireSharedInterruptibly()方法
      sync.acquireSharedInterruptibly(1);
    }
    ...
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    //Acquires in shared mode, aborting if interrupted.
    //Implemented by first checking interrupt status, then invoking at least once #tryAcquireShared, returning on success.
    //Otherwise the thread is queued, possibly repeatedly blocking and unblocking,
    //invoking #tryAcquireShared until success or the thread is interrupted.
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
      if (Thread.interrupted()) {
            throw new InterruptedException();
      }
      //执行CountDownLatch的内部类Sync实现的tryAcquireShared()方法,抢占共享锁
      if (tryAcquireShared(arg) < 0) {
            //执行AQS的doAcquireSharedInterruptibly()方法
            doAcquireSharedInterruptibly(arg);
      }
    }
   
    //Acquires in shared interruptible mode.
    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
      final Node node = addWaiter(Node.SHARED);//封装当前线程为Shared类型的Node结点
      boolean failed = true;
      try {
            //第一次循环r = -1,所以会执行AQS的shouldParkAfterFailedAcquire()方法
            //将node结点的有效前驱结点的状态设置为SIGNAL
            for (;;) {
                final Node p = node.predecessor();//node结点的前驱结点
                if (p == head) {
                  int r = tryAcquireShared(arg);
                  if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                  }
                }
                //执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL
                //执行parkAndCheckInterrupt()方法挂起当前线程
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                  throw new InterruptedException();
                }
            }
      } finally {
            if (failed) {
                cancelAcquire(node);
            }
      }
    }
   
    //Checks and updates status for a node that failed to acquire.
    //Returns true if thread should block. This is the main signal control in all acquire loops.
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus;
      if (ws == Node.SIGNAL) {
            //This node has already set status asking a release to signal it, so it can safely park.
            return true;
      }
      if (ws > 0) {
            //Predecessor was cancelled. Skip over predecessors and indicate retry.
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
      } else {
            //waitStatus must be 0 or PROPAGATE.
            //Indicate that we need a signal, but don't park yet.
            //Caller will need to retry to make sure it cannot acquire before parking.
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
      }
      return false;
    }
   
    //设置头结点和唤醒后续线程
    //Sets head of queue, and checks if successor may be waiting in shared mode,
    //if so propagating if either propagate > 0 or PROPAGATE status was set.
    private void setHeadAndPropagate(Node node, int propagate) {
      Node h = head;
      setHead(node);//将node结点设置为头结点
      if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared()) {
                doReleaseShared();
            }
      }
    }
   
    private void setHead(Node node) {
      head = node;
      node.thread = null;
      node.prev = null;
    }
    ...
}(2)CountDownLatch.await()方法的唤醒流程
调用await()方法时,起首会将当火线程封装成Node结点并添加到等待队列中,然后在执行第一次for循环时会设置该Node结点的前驱结点状态为SIGNAL,接着在执行第二次for循环时才会将当火线程举行挂起阻塞。
 
当该线程后续被唤醒时,该线程又会进入下一次for循环。假如该线程对应的node结点的前驱结点是等待队列的头结点且state值已为0,那么就执行AQS的setHeadAndPropagate()方法设置头结点 + 唤醒后续线程。
 
其中setHeadAndPropagate()方法有两个工作(设置头结点 + 唤醒通报):
工作一:设置当前被唤醒线程对应的结点为头结点
工作二:当满意如下这两个条件的时候需要调用doReleaseShared()方法唤醒后续的线程
条件一:propagate > 0,表现当前是共享锁,需要举行唤醒通报
条件二:s.isShared()判断当前结点为共享模式
 
CountDownLatch的实现中会在以下两个场景调用doReleaseShared()方法:
场景一:state为1时调用的countDown()方法会调用doReleaseShared()方法
场景二:当阻塞的线程被唤醒时,会调用setHeadAndPropagate()方法,进而调用doReleaseShared()方法,这样可以提升唤醒共享结点的速率
 
(3)CountDownLatch.await()方法的阻塞总结
只要state != 0,就会举行如下处理:
一.将当火线程封装成一个Node结点,然后添加到AQS的等待队列中
二.调用LockSupport.park()方法,挂起当火线程
 
3.CountDownLatch.coutDown()方法源码
(1)CountDownLatch.coutDown()的唤醒流程
(2)CountDownLatch.tryReleaseShared()
(3)AQS的doReleaseShared()方法
 
(1)CountDownLatch.coutDown()的唤醒流程
调用CountDownLatch的countDown()方法时,会先调用AQS的releaseShared()模版方法,然后会执行CountDownLatch的内部类Sync实现的tryReleaseShared()方法。
 
假如tryReleaseShared()方法返回true,则执行AQS的doReleaseShared()方法,通过AQS的doReleaseShared()方法唤醒共享锁模式下的等待队列中的线程。
//A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
public class CountDownLatch {
    private final Sync sync;
   
    public CountDownLatch(int count) {
      if (count < 0) {
            throw new IllegalArgumentException("count < 0");
      }
      this.sync = new Sync(count);
    }
   
    //Synchronization control For CountDownLatch.
    //Uses AQS state to represent count.
    private static final class Sync extends AbstractQueuedSynchronizer {
      Sync(int count) {
            setState(count);
      }
      
      int getCount() {
            return getState();
      }
      
      protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
      }
      
      protected boolean tryReleaseShared(int releases) {
            //Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0) {
                  return false;
                }
                int nextc = c-1;
                if (compareAndSetState(c, nextc)) {
                  return nextc == 0;
                }
            }
      }
    }
         
    //Decrements the count of the latch, releasing all waiting threads if the count reaches zero.
    public void countDown() {
      //执行AQS的releaseShared()方法
      sync.releaseShared(1);
    }
    ...
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    ...
    //Releases in shared mode.
    //Implemented by unblocking one or more threads if #tryReleaseShared returns true.
    public final boolean releaseShared(int arg) {
      //执行CountDownLatch的内部类Sync实现的tryReleaseShared()方法,释放共享锁
      if (tryReleaseShared(arg)) {
            //执行AQS的doReleaseShared()方法
            doReleaseShared();
            return true;
      }
      return false;
    }
   
    //Release action for shared mode -- signals successor and ensures propagation.
    //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.
    private void doReleaseShared() {
      for (;;) {
            //每次循环时头结点都会发生变化
            //因为调用unparkSuccessor()方法会唤醒doAcquireSharedInterruptibly()方法中阻塞的线程
            //然后阻塞的线程会在执行setHeadAndPropagate()方法时通过setHead()修改头结点
            Node h = head;//获取最新的头结点
            if (h != null && h != tail) {//等待队列中存在挂起线程的结点
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//头结点的状态正常,表示对应的线程可以被唤醒
                  if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {
                        continue;//loop to recheck cases
                  }
                  //唤醒头结点的后继结点
                  //唤醒的线程会在doAcquireSharedInterruptibly()方法中执行setHeadAndPropagate()方法修改头结点
                  unparkSuccessor(h);
                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                  //如果ws = 0表示初始状态,则修改结点为PROPAGATE状态
                  continue;//loop on failed CAS
                }
            }
            if (h == head) {//判断头结点是否有变化
                break;//loop if head changed
            }
      }
    }
   
    //Wakes up node's successor, if one exists.
    private void unparkSuccessor(Node node) {
      int ws = node.waitStatus;
      if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
      }

      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) {
                if (t.waitStatus <= 0) {
                  s = t;
                }
            }
      }
      if (s != null) {
            LockSupport.unpark(s.thread);
      }
    }
    ...

7.Semaphore的令牌释放过程
(1)Semaphore的令牌释放过程
(2)Semaphore的令牌释放本质
 
(1)Semaphore的令牌释放过程
在调用Semaphore的release()方法去释放令牌时:起首会执行AQS的模版方法releaseShared(),然后执行Sync实现的tryReleaseShared()方法来释放锁(累加state值)。假如释放锁成功,则执行AQS的doReleaseShared()方法去唤醒线程。
 
(2)Semaphore的令牌释放本质
Semaphore的release()方法释放令牌的本质就是对state字段举行累加,然后唤醒等待队列头结点的后继结点 + 唤醒通报来唤醒等待的线程。
 
留意:并非一定要执行acquire()方法的线程才能调用release()方法,任意一个线程都可以调用release()方法,也可以通过reducePermits()方法来减少令牌数。
public class Semaphore implements java.io.Serializable {    private final Sync sync;      //Creates a Semaphore with the given number of permits and nonfair fairness setting.    public Semaphore(int permits) {      sync = new NonfairSync(permits);    }      //Releases a permit, returning it to the semaphore.    public void release() {      //执行AQS的模版方法releaseShared()      sync.releaseShared(1);    }      //Synchronization implementation for semaphore.      //Uses AQS state to represent permits. Subclassed into fair and nonfair versions.    abstract static class Sync extends AbstractQueuedSynchronizer {      Sync(int permits) {            //设置state的值为传入的令牌数            setState(permits);      }                //实验释放锁,也就是对state值举行累加      protected final boolean tryReleaseShared(int releases) {            for (;;) {                int current = getState();                int next = current + releases;                if (next < current) {                  throw new Error("Maximum permit count exceeded");                }                if (compareAndSetState(current, next)) {                  return true;                }            }      }      ...    }    ...}public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    ...      //Releases in shared mode.      //Implemented by unblocking one or more threads if #tryReleaseShared returns true.    public final boolean releaseShared(int arg) {      //执行Semaphore的内部类Sync实现的tryReleaseShared()方法,释放共享锁      if (tryReleaseShared(arg)) {            //执行AQS的doReleaseShared()方法,唤醒等待队列中的线程            doReleaseShared();            return true;      }      return false;    }      //Release action for shared mode -- signals successor and ensures propagation.   //Note: For exclusive mode, release just amounts to calling unparkSuccessor of head if it needs signal.    private void doReleaseShared() {      for (;;) {            //每次循环时头结点都会发生变化            //因为调用unparkSuccessor()方法会唤醒doAcquireSharedInterruptibly()方法中阻塞的线程            //然后阻塞的线程会在执行setHeadAndPropagate()方法时通过setHead()修改头结点            Node h = head;//获取最新的头结点            if (h != null && h != tail) {//等待队列中存在挂起线程的结点                int ws = h.waitStatus;                if (ws == Node.SIGNAL) {//头结点的状态正常,表现对应的线程可以被唤醒                  if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) {                        continue;//loop to recheck cases                  }                  //唤醒头结点的后继结点                  //唤醒的线程会在doAcquireSharedInterruptibly()方法中执行setHeadAndPropagate()方法修改头结点                  unparkSuccessor(h);                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {                  //假如ws = 0表现初始状态,则修改结点为PROPAGATE状态                  continue;//loop on failed CAS                }            }            if (h == head) {//判断头结点是否有变化                break;//loop if head changed            }      }    }      //Wakes up node's successor, if one exists.    private void unparkSuccessor(Node node) {      int ws = node.waitStatus;      if (ws < 0) {            compareAndSetWaitStatus(node, ws, 0);      }      Node s = node.next;      if (s == null || s.waitStatus > 0) {            s = null;            for (Node t = tail; t != null && t != node; t = t.prev) {                if (t.waitStatus0L) {                        nanos = trip.awaitNanos(nanos);                  }                } catch (InterruptedException ie) {                  if (g == generation && ! g.broken) {                        breakBarrier();                        throw ie;                  } else {                        Thread.currentThread().interrupt();                  }                }                if (g.broken) {                  throw new BrokenBarrierException();                }                if (g != generation) {                  return index;                }                if (timed && nanos
页: [1]
查看完整版本: JUC并发—7.AQS源码分析三