一. AQS概述
AQS 的全称为(AbstractQueuedSynchronizer),这个类在 java.util.concurrent.locks 包下面。
AQS(AbstractQueuedSynchronizer)是 Java 并发库中非常核心的一个类,它为构建锁(如 ReentrantLock)和其他同步工具(如 CountDownLatch, Semaphore 等)提供了框架。AQS的设计理念是通过一个先辈先出(FIFO)队列来管理线程的排队和同步操作。
- AQS 重要负责管理同步状态(即锁的持有和释放状态)和线程的排队,底层使用了 CAS(Compare And Swap)来实现原子操作,从而提高并发性能。
- AQS 并没有直接提供同步工具,而是提供了一些抽象方法,供我们在继续 AQS 时举行实现。
二. AQS工作原理
2.1 原理概览
AQS 核心头脑是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,而且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程壅闭等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。
AQS(AbstractQueuedSynchronizer)原理图:
AQS 内部维护了一个 state 变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。AQS 使用 CAS对该同步状态举行原子操作实现对其值的修改。
- private volatile int state; //共享变量,使用volatile修饰保证线程可见性
复制代码 状态信息通过 protected 范例的getState,setState,compareAndSetState举行操作
- //返回同步状态的当前值
- protected final int getState() {
- return state;
- }
- // 设置同步状态的值
- protected final void setState(int newState) {
- state = newState;
- }
- //原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
- protected final boolean compareAndSetState(int expect, int update) {
- return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
- }
复制代码 2.2 AQS 对资源的共享方式
AQS 提供了两种资源共享方式,分别是 独占式和共享式资源访问方式,这两种方式分别适用于不同的同步场景:
1. 独占式资源访问(Exclusive Mode)
独占式资源访问模式指的是同一时候只有一个线程能够获取资源并执行任务。当资源被一个线程占用时,其他线程必须等待资源释放才气继续获取。这种模式适用于锁等需要线程独占资源的场景。
又可分为公平锁和非公平锁,ReentrantLock 同时支持两种锁,下面以 ReentrantLock 对这两种锁的界说做介绍:
- 公平锁:按照线程在队列中的排队次序,先到者先拿到锁
- 非公平锁:当线程要获取锁时,先通过两次 CAS 操作去抢锁,如果没抢到,当火线程再加入到队列中等待唤醒。
下面来看 ReentrantLock 中干系的源代码:
ReentrantLock 默认采用非公平锁,因为思量得到更好的性能,通过 boolean 来决定是否用公平锁(传入 true 用公平锁)。
- /** Synchronizer providing all implementation mechanics */
- private final Sync sync;
- public ReentrantLock() {
- // 默认非公平锁
- sync = new NonfairSync();
- }
- public ReentrantLock(boolean fair) {
- sync = fair ? new FairSync() : new NonfairSync();
- }
复制代码 非公平锁的 lock 方法:
- static final class NonfairSync extends Sync {
- final void lock() {
- // 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
- if (compareAndSetState(0, 1))
- setExclusiveOwnerThread(Thread.currentThread());
- else
- acquire(1);
- }
- // AbstractQueuedSynchronizer.acquire(int arg)
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
- protected final boolean tryAcquire(int acquires) {
- return nonfairTryAcquire(acquires);
- }
- }
- /**
- * Performs non-fair tryLock. tryAcquire is implemented in
- * subclasses, but both need nonfair try for trylock method.
- */
- final boolean nonfairTryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- // 这里没有对阻塞队列进行判断
- if (compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0) // overflow
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
复制代码 公平锁的 lock 方法:
- static final class FairSync extends Sync {
- final void lock() {
- acquire(1);
- }
- // AbstractQueuedSynchronizer.acquire(int arg)
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
- protected final boolean tryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- // 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
- if (!hasQueuedPredecessors() &&
- compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
- }
复制代码 总结
公平锁和非公平锁只有两处不同:
- 非公平锁在调用 lock 后,起首就会调用 CAS 举行一次抢锁,如果这个时候可巧锁没有被占用,那么直接就获取到锁返回了。
- 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。
公平锁和非公平锁就这两点区别,如果这两次 CAS 都不乐成,那么后面非公平锁和公平锁是一样的,都要进入到壅闭队列等待唤醒。
相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。固然,非公平锁让获取锁的时间变得更加不确定,大概会导致在壅闭队列中的线程长期处于饥饿状态。
Synchonized黑白公平锁,因为它采用非公平的锁获取机制。释放锁时,会唤醒等待线程,但不包管次序,导致先到的线程大概被后续线程插队。
2. 共享式资源访问(Shared Mode)
共享式资源访问模式答应多个线程同时访问资源。在这种模式下,多个线程可以并发地使用资源,直到资源被占用的最大限定为止。适用于那些答应多个线程同时执行某些任务的场景,如信号量(Semaphore)、读写锁(ReadWriteLock)等。
3. AQS中的模版模式应用
在模板模式(Template Pattern)中,一个抽象类公开界说了执行它的方法的方式/模板。它的子类可以按需要重写方法实现,但调用将以抽象类中界说的方式举行。这种范例的设计模式属于举动型模式。
同步器的设计是基于模板方法模式的,如果需要自界说同步器一般的方式是这样(模板方法模式很经典的一个应用):
- 使用者继续 AbstractQueuedSynchronizer 并重写指定的方法。(这些重写方法很简单,无非是对于共享资源 state 的获取和释放)
- 将 AQS 组合在自界说同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
AQS 提供的模板方法:
- isHeldExclusively() //该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int) //独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int) //独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int) //共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int) //共享方式。尝试释放资源,成功则返回true,失败则返回false。
复制代码
- 子类(如 ReentrantLock)继续 AQS,并重写上述方法以界说详细的同步逻辑。
- 默认环境下,这些方法会抛出 UnsupportedOperationException,必须重写以实现功能。
- AQS中的其他方法通常是 final,不可重写,确保算法框架的固定结构。
- 用户只需专注于资源的获取和释放逻辑,别的复杂管理(如线程排队、状态维护)由 AQS 处理。
示例:ReentrantLock中的应用
- state变量:用于表示资源的持有状态。初始化为 0,表示未锁定。
- 锁的获取与释放:
- 获取锁(lock()):通过 tryAcquire() 独占获取。乐成则 state+1,多个线程只能有一个乐成。
- 释放锁(unlock()):通过 tryRelease() 独占释放。state 淘汰,回零时释放资源给其他线程。
- 可重入特性:同一线程可以多次获取锁,state 递增,释放时同样递减,确保最终能释放乐成。
示例:CountDownLatch中的应用
- 初始化state:设置为N,表示需要完成的子线程数量。
- 子线程完成:每个子线程完成后调用 countDown(),使用 CAS(比较并交换)操作将 state减 1。
- 主线程等待:直到 state 为 0,主线程被 unpark(),继续执行后续操作。
自界说同步器的选择
- 独占方式:当资源只能由一个线程使用时,实现 tryAcquire() 和 tryRelease()。
- 共享方式:当资源可以被多个线程共享时(如读锁),实现 tryAcquireShared() 和tryReleaseShared()。
- 结合使用:AQS 支持同时实现独占和共享方式,如 ReentrantReadWriteLock,其中读锁共享,写锁独占。
三. Semaphore(信号量)-答应多个线程同时访问
Semaphore 是 Java 并发包中用于控制同时访问某个资源的线程数量的工具。它通过许可证机制,答应多个线程(特定数量)同时访问共享资源,超过数量的线程会被壅闭,直到有许可证被释放。
3.1 核心方法
- acquire(): 获取一个许可证。如果没有可用许可证,线程会壅闭,直到有许可证被释放。
- release(): 释放一个许可证,增长可用许可证的数量,答应更多的线程继续执行。
- tryAcquire(): 尝试获取一个许可证,若获取失败则立即返回 false,不壅闭线程。
Semaphore 有两种模式,公平模式和非公平模式:
- 公平模式:线程按调用 acquire() 的次序依次获取许可证,包管先辈先得(FIFO)。
- 非公平模式:默认模式,线程尝试直接获取许可证,大概引起“抢断”现象,导致某些线程长时间等待。
3.2 内部实现
- 基于 AbstractQueuedSynchronizer(AQS),通过状态变量 state 管理许可证数量。
- acquire 方法淘汰 state,若不敷则壅闭线程;release 方法增长 state,唤醒壅闭线程。
3.3 示例
- public class SemaphoreExample1 {
- private static final int threadCount = 550;
- public static void main(String[] args) throws InterruptedException {
- ExecutorService threadPool = Executors.newFixedThreadPool(300);
- final Semaphore semaphore = new Semaphore(20);
- for (int i = 0; i < threadCount; i++) {
- final int threadnum = i;
- threadPool.execute(() -> {
- try {
- semaphore.acquire(); // 获取一个许可
- test(threadnum);
- semaphore.release(); // 释放一个许可
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- }
- threadPool.shutdown();
- System.out.println("finish");
- }
- public static void test(int threadnum) throws InterruptedException {
- Thread.sleep(1000);
- System.out.println("threadnum:" + threadnum);
- Thread.sleep(1000);
- }
- }
复制代码 四. CountDownLatch (倒计时器)
CountDownLatch 是 Java 并发工具包中的一个计数器工具,答应主线程等待多个线程完成任务后才继续执行。它通过一个计数器来实现同步,当计数器减到 0 时,主线程被唤醒。
4.1 工作原理
- 初始化计数器:使用构造方法传入一个初始值 count,表示需要完成的任务数。
- 任务完成通知:每个任务完成后,调用 countDown() 方法,计数器减 1。
- 主线程等待:主线程调用 await() 方法壅闭,直到计数器为 0 方可继续执行。
4.2 两种典范用法
- 应用场景:启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
- 某一线程在开始运行前等待 n 个线程执行完毕。将 CountDownLatch 的计数器初始化为 n :new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减 1 countdownlatch.countDown(),当计数器的值变为 0 时,在CountDownLatch上 await() 的线程就会被唤醒。
- 应用场景:多个线程同时开始执行,好比并发测试。
初始化一个共享的 CountDownLatch 对象,将其计数器初始化为 1 :new CountDownLatch(1),多个线程在开始执行任务前起首 coundownlatch.await(),当主线程调用 countDown() 时,计数器变为 0,多个线程同时被唤醒。
4.3 示例
- /**
- *
- * @
- * @date 2025年2月17日
- * @Description: CountDownLatch 使用方法示例
- */
- public class CountDownLatchExample1 {
- // 请求的数量
- private static final int threadCount = 550;
- public static void main(String[] args) throws InterruptedException {
- // 创建一个具有固定线程数量的线程池对象(如果这里线程池的线程数量给太少的话你会发现执行的很慢)
- ExecutorService threadPool = Executors.newFixedThreadPool(300);
- final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
- for (int i = 0; i < threadCount; i++) {
- final int threadnum = i;
- threadPool.execute(() -> {// Lambda 表达式的运用
- try {
- test(threadnum);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } finally {
- countDownLatch.countDown();// 表示一个请求已经被完成
- }
- });
- }
- countDownLatch.await();
- threadPool.shutdown();
- System.out.println("finish");
- }
- public static void test(int threadnum) throws InterruptedException {
- Thread.sleep(1000);// 模拟请求的耗时操作
- System.out.println("threadnum:" + threadnum);
- Thread.sleep(1000);// 模拟请求的耗时操作
- }
- }
复制代码 上面的代码中,我们界说了请求的数量为 550,当这 550 个请求被处理完成之后,才会执行System.out.println("finish");。
与 CountDownLatch 的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用 CountDownLatch.await() 方法。这样主线程的操作就会在这个方法上壅闭,直到其他线程完成各自的任务。
其他 N 个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的 count 值就减 1。以是当 N 个线程都调 用了这个方法,count 的值等于 0,然后主线程就能通过 await()方法,恢复执行本身的任务。
4.4 CountDownLatch 的不敷
CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。
五. CyclicBarrier(循环栅栏)
5.2 原理
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被壅闭,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier 默认的构造方法是 CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉 CyclicBarrier 我已经到达了屏障,然后当火线程被壅闭。
- CyclicBarrier 和 CountDownLatch 非常类似,它也可以实现线程间的技能等待,但是它的功能比 CountDownLatch 更加复杂和强大。重要应用场景和 CountDownLatch 类似。
- CountDownLatch 的实现是基于 AQS 的,而 CycliBarrier 是基于 ReentrantLock(ReentrantLock 也属于 AQS 同步器)和 Condition 的。
构造函数:
- public CyclicBarrier(int parties) {
- this(parties, null);
- }
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- this.parties = parties;
- this.count = parties;
- this.barrierCommand = barrierAction;
- }
复制代码 parties 就代表了有拦截的线程的数量,当拦截的线程数量到达这个值的时候就打开栅栏,让所有线程通过。
5.2 应用场景
- 多线程数据处理:如分块处理大数据集,每个线程处理一部门,屏障点用于汇总效果。
- 分布式系统:多个节点需同步处理数据或状态更新。
- 模拟与测试:需多个线程协同执行特定阶段的场景。
5.3 实例
示例一:
- /**
- *
- * @
- * @date 2025年2月17日
- * @Description: 测试 CyclicBarrier 类中带参数的 await() 方法
- */
- public class CyclicBarrierExample2 {
- // 请求的数量
- private static final int threadCount = 550;
- // 需要同步的线程数量
- private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
- public static void main(String[] args) throws InterruptedException {
- // 创建线程池
- ExecutorService threadPool = Executors.newFixedThreadPool(10);
- for (int i = 0; i < threadCount; i++) {
- final int threadNum = i;
- Thread.sleep(1000);
- threadPool.execute(() -> {
- try {
- test(threadNum);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- });
- }
- threadPool.shutdown();
- }
- public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
- System.out.println("threadnum:" + threadnum + "is ready");
- try {
- /**等待60秒,保证子线程完全执行结束*/
- cyclicBarrier.await(60, TimeUnit.SECONDS);
- } catch (Exception e) {
- System.out.println("-----CyclicBarrierException------");
- }
- System.out.println("threadnum:" + threadnum + "is finish");
- }
- }
复制代码 运行效果,如下:

可以看到当线程数量也就是请求数量到达我们界说的 5 个的时候, await方法之后的方法才被执行。
别的,CyclicBarrier 还提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。示例代码如下:
- /**
- *
- * @
- * @date 2025年2月17日
- * @Description: 新建 CyclicBarrier 的时候指定一个 Runnable
- */
- public class CyclicBarrierExample3 {
- // 请求的数量
- private static final int threadCount = 550;
- // 需要同步的线程数量
- private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
- System.out.println("------当线程数达到之后,优先执行------");
- });
- public static void main(String[] args) throws InterruptedException {
- // 创建线程池
- ExecutorService threadPool = Executors.newFixedThreadPool(10);
- for (int i = 0; i < threadCount; i++) {
- final int threadNum = i;
- Thread.sleep(1000);
- threadPool.execute(() -> {
- try {
- test(threadNum);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- });
- }
- threadPool.shutdown();
- }
- public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
- System.out.println("threadnum:" + threadnum + "is ready");
- cyclicBarrier.await();
- System.out.println("threadnum:" + threadnum + "is finish");
- }
- }
复制代码 运行效果,如下:

程序的输出将分为多个批次,每批次处理5个线程。每个批次会依次打印5个线程的“is ready”信息,随后由屏障触发打印一次提示信息,然后这5个线程依次打印“is finish”信息,如此反复,直到所有550个线程处理完毕。
5.4 CyclicBarrier源码分析
当调用 CyclicBarrier 对象调用 await() 方法时,实际上调用的是dowait(false, 0L)方法。 await() 方法就像树立起一个栅栏的举动一样,将线程挡住了,当拦住的线程数量到达 parties 的值时,栅栏才会打开,线程才得以通过执行。
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
复制代码- // 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。
- private int count;
- /**
- * Main barrier code, covering the various policies.
- */
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- final ReentrantLock lock = this.lock;
- // 锁住
- lock.lock();
- try {
- final Generation g = generation;
- if (g.broken)
- throw new BrokenBarrierException();
- // 如果线程中断了,抛出异常
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- // cout减1
- int index = --count;
- // 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行await 方法之后的条件
- if (index == 0) { // tripped
- boolean ranAction = false;
- try {
- final Runnable command = barrierCommand;
- if (command != null)
- command.run();
- ranAction = true;
- // 将 count 重置为 parties 属性的初始化值
- // 唤醒之前等待的线程
- // 下一波执行开始
- nextGeneration();
- return 0;
- } finally {
- if (!ranAction)
- breakBarrier();
- }
- }
- // loop until tripped, broken, interrupted, or timed out
- for (;;) {
- try {
- if (!timed)
- trip.await();
- else if (nanos > 0L)
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw ie;
- } else {
- // We're about to finish waiting even if we had not
- // been interrupted, so this interrupt is deemed to
- // "belong" to subsequent execution.
- Thread.currentThread().interrupt();
- }
- }
- if (g.broken)
- throw new BrokenBarrierException();
- if (g != generation)
- return index;
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- lock.unlock();
- }
- }
复制代码 总结:CyclicBarrier 内部通过一个 count 变量作为计数器,cout 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减一。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。
5.5 CyclicBarrier 和 CountDownLatch 的区别
特性CyclicBarrierCountDownLatch用途多线程到达屏障点后一起继续等待多个线程完成任务使用方式双向等待,多次使用单向等待,只能使用一次计数机制可重置,支持循环使用一次性,不可重置屏障动作支持自界说屏障动作不支持屏障动作
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |