马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
Java并发探索--下篇
承接上文:
博客园【上篇】:https://www.cnblogs.com/jackjavacpp/p/18852416
csdn:【上篇】:https://blog.csdn.net/okok__TXF/article/details/147595101
1. AQS实现锁
AQS前传
网址:https://www.cnblogs.com/jackjavacpp/p/18787832
1) aqs分析
AQS 的核心原理是通过一个 int 类型的状态变量 state 来表示同步状态,使用一个 FIFO 队列来管理等待的线程。通过 CAS 操作来保证状态的原子性更新,同时提供了独占模式和共享模式的获取与释放方法。子类可以通过重写 tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared 等方法来实现具体的同步逻辑。- // 关键的属性:
- // 同步状态,0 表示未锁定,大于 0 表示已锁定[>1表示可重入锁的重入次数]
- private volatile int state;
- // 队列的头节点
- private transient volatile Node head;
- // 队列的尾节点
- private transient volatile Node tail;
- // 其中Node的重要变量
- //节点已取消: 表示该节点关联的线程已放弃等待(如超时、被中断),需从队列中移除
- static final int CANCELLED = 1;
- //需唤醒后继节点: 当前节点的线程释放锁或取消时,必须唤醒其后继节点。
- //节点入队后需确保前驱节点的waitStatus为SIGNAL,否则需调整前驱状态。
- static final int SIGNAL = -1;
- //节点在条件队列中: 表示节点处于条件队列(如Condition的等待队列),而非同步队列(CLH队列)。
- //状态转换:当调用Condition.signal()时,节点从条件队列转移到同步队列,状态重置为0
- static final int CONDITION = -2;
- //共享模式下唤醒需传播: 在共享锁(如Semaphore)释放时,确保唤醒动作传播给所有后续节点。
- static final int PROPAGATE = -3;
- //通过状态值控制线程的阻塞、唤醒和队列管理
- volatile int waitStatus;
复制代码 aqs独占锁、共享锁的获取和释放分析
独占锁的获取:- // AbstractQueuedSynchronizer.java
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
复制代码 tryAcquire:尝试直接获取锁,这是一个必要子类实现的方法, 尝试直接获取锁(如CAS修改state)。【非公平锁:直接尝试CAS抢占资源;公平锁:先检查队列中是否有等待线程(hasQueuedPredecessors()),避免插队】
如果第一步返回false, 则进入第二步: addWaiter:将当前线程封装成一个 Node 节点,并添加到等待队列的尾部。- private Node addWaiter(Node mode) {
- Node node = new Node(Thread.currentThread(), mode);
- // 尝试快速插入到队列尾部
- Node pred = tail;
- if (pred != null) {
- node.prev = pred;
- if (compareAndSetTail(pred, node)) {
- pred.next = node;
- return node;
- }
- }
- // 快速插入失败,使用 enq 方法插入
- enq(node);//enq 方法会通过循环和 CAS 操作确保节点成功插入到队列尾部。
- return node;
- }
- private Node enq(final Node node) {
- // 死循环
- for (;;) {
- Node t = tail;
- if (t == null) {
- // 使用CAS设置头结点 -- 这里是设置了一个普通的node
- // 下次循环才会把传进来的node放到队列尾部
- if (compareAndSetHead(new Node()))
- // 首尾指向同一个节点
- tail = head;
- } else { // 尾部tail不为空,说明队列中有节点
- node.prev = t;
- if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
- }
- }
- }
- }
复制代码 然后,结点添加到队列尾部之后,acquireQueued:让当前线程在同步队列中阻塞,然后在被其他线程唤醒时去获取锁;【让线程在同步队列中阻塞,直到它成为头节点的下一个节点,被头节点对应的线程唤醒,然后开始获取锁,若获取成功才会从方法中返回】。- final boolean acquireQueued(final Node node, int arg) {
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- // 获取当前线程节点的前一个节点
- final Node p = node.predecessor();
- if (p == head && tryAcquire(arg)) {// 前驱是头节点且获取锁成功
- setHead(node);
- p.next = null; // help GC
- failed = false;
- return interrupted;
- }
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int ws = pred.waitStatus;
- if (ws == Node.SIGNAL)
- // 说明前驱节点会在释放锁时唤醒当前节点,当前线程可以安全地阻塞
- return true;
- // 如果前驱节点的等待状态大于 0,即 CANCELLED 状态
- if (ws > 0) {
- // 前驱节点已取消,需要跳过这些已取消的节点
- do {
- // 将当前节点的前驱节点指向前驱节点的前驱节点
- node.prev = pred = pred.prev;
- } while (pred.waitStatus > 0);
- // 更新跳过已取消节点后的前驱节点的后继节点为当前节点
- pred.next = node;
- } else {
- // 前驱节点的等待状态为 0 或 PROPAGATE,将其状态设置为 SIGNAL
- compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
- }
- // 当前线程不应该被阻塞,需要再次尝试获取锁
- return false;
- }
复制代码 shouldParkAfterFailedAcquire():确保前驱节点的waitStatus为SIGNAL(表示会唤醒后继节点),否则清理已取消的节点;它通过检查前驱节点的等待状态,决定当前线程在获取锁失败后是否应该被阻塞。它处理了前驱节点的不同状态,确保等待队列的正确性和线程的正确阻塞与唤醒,
parkAndCheckInterrupt() :让当前线程阻塞,并且在被唤醒之后检查该线程是否被中断 【里面用到了LockSupport,见后面并发工具】
独占锁的释放:- public final boolean release(int arg) {
- //调用tryRelease【子类里面实现】
- //尝试修改state释放锁,若成功,将返回true,否则false
- if (tryRelease(arg)) {
- Node h = head;
- // 检查头节点不为空且头节点的等待状态不为 0
- if (h != null && h.waitStatus != 0)
- // 唤醒头节点的后继节点
- unparkSuccessor(h);
- return true; // 释放成功,返回 true
- }
- return false;// 释放失败,返回 false
- }
- private void unparkSuccessor(Node node) {
- // 获取节点的等待状态
- int ws = node.waitStatus;
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
- // 获取节点的后继节点
- Node s = node.next;
- // 如果后继节点为空或者后继节点的等待状态大于 0(已取消)
- if (s == null || s.waitStatus > 0) {
- s = null;
- // 从队列尾部开始向前查找,找到第一个等待状态小于等于 0 的节点
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- // 如果找到了合适的后继节点,唤醒该节点对应的线程
- if (s != null)
- LockSupport.unpark(s.thread);
- }
复制代码 setHeadAndPropagate(node, r); 先把当前获取到同步状态的节点设置为新的头节点,接着根据不同条件判断是否要将共享状态的获取传播给后续节点。要是满足传播条件,就会调用 doReleaseShared 方法去唤醒后续等待的共享节点。
共享锁的释放- public final void acquireShared(int arg) {
- if (tryAcquireShared(arg) < 0)//这个方法子类实现
- //若返回值小于0,表示获取共享锁失败,则线程需要进入到同步队列中等待
- doAcquireShared(arg);
- }
- private void doAcquireShared(int arg) {
- final Node node = addWaiter(Node.SHARED); // 以SHARED加入一个结点
- boolean failed = true;
- try {
- boolean interrupted = false;
- for (;;) {
- final Node p = node.predecessor();
- if (p == head) {
- int r = tryAcquireShared(arg);
- if (r >= 0) { // 获取共享锁成功
- setHeadAndPropagate(node, r); // 传播给其他线程
- p.next = null; // help GC
- if (interrupted)
- selfInterrupt();
- failed = false;
- return;
- }
- }
- // 判断是否阻塞,唤醒后是否被中断--同上
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- interrupted = true;
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
- private void setHeadAndPropagate(Node node, int propagate) {
- Node h = head; // 记录旧的头节点
- setHead(node); // 将当前节点设置为新的头节点
- if (propagate > 0 || h == null || h.waitStatus < 0 ||
- (h = head) == null || h.waitStatus < 0) {
- Node s = node.next;
- if (s == null || s.isShared())
- doReleaseShared();
- }
- }
复制代码 2) 自定义锁
学一下jdk源码,写一个内置的Sync同步器,低位16位记录写锁重入次数,高位16位记录读锁获取次数- public final boolean release(int arg) {
- // 调用tryRelease:【子类实现】
- if (tryRelease(arg)) {
- // 若释放锁成功,需要将当前线程移出同步队列
- Node h = head;
- // 若head不是null,且waitStatus不为0,表示它是一个装有线程的正常节点,
- // 在之前提到的addWaiter方法中,若同步队列为空,则会创建一个默认的节点放入head
- // 这个默认的节点不包含线程,它的waitStatus就是0,所以不能释放锁
- if (h != null && h.waitStatus != 0)
- // 若head是一个正常的节点,则调用unparkSuccessor唤醒它的下一个节点所对应的线程
- unparkSuccessor(h);
- // 释放成功
- return true;
- }
- // 释放锁失败
- return false;
- }
复制代码 上面的同步器中,必要注意的点如下:
高16位和低16位是啥情况?- // 内置同步器
- private static class Sync extends AbstractQueuedSynchronizer {
- static final int SHARED_SHIFT = 16;
- static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
- // 写锁方法(tryAcquire/tryRelease)-- 独占
- protected boolean tryAcquire(int acquires) {
- Thread current = Thread.currentThread();
- int state = getState();
- int writeCount = getWriteCount(state);
- // 如果存在读锁或写锁(且持有者不是当前线程),获取失败
- if (state != 0) {
- // writeCount是0,但是state不是0,说明有线程获取到了读锁
- if (writeCount == 0 || current != getExclusiveOwnerThread())
- return false;
- }
- // 检查是否超过最大重入次数(低16位是否溢出)
- if (writeCount + acquires > EXCLUSIVE_MASK)
- throw new Error("超出最大重入次数");
- // CAS更新写锁状态
- if (compareAndSetState(state, state + acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- return false;
- }
- protected boolean tryRelease(int releases) {
- if (Thread.currentThread() != getExclusiveOwnerThread())
- throw new IllegalMonitorStateException();
- int newState = getState() - releases;
- boolean free = (getWriteCount(newState) == 0);
- if (free)
- setExclusiveOwnerThread(null);
- setState(newState);
- return free;
- }
- // 读锁方法(tryAcquireShared/tryReleaseShared)
- protected int tryAcquireShared(int acquires) {
- Thread current = Thread.currentThread();
- int state = getState();
- // 如果有其他线程持有写锁,且不是当前线程(允许锁降级),则获取失败
- if (getWriteCount(state) != 0 && getExclusiveOwnerThread() != current)
- return -1;
- // 计算读锁数量
- int readCount = getReadCount(state);
- if (readCount == (1 << SHARED_SHIFT) - 1)
- throw new Error("超出最大读锁数量");
- // CAS增加读锁计数(高16位)
- if (compareAndSetState(state, state + (1 << SHARED_SHIFT))) {
- return 1; // 成功获取读锁
- }
- return -1; // 需要进入队列等待
- }
- protected boolean tryReleaseShared(int releases) {
- for (;;) {
- int state = getState();
- int readCount = getReadCount(state);
- if (readCount == 0)
- throw new IllegalMonitorStateException();
- // CAS减少读锁计数
- int newState = state - (1 << SHARED_SHIFT);
- if (compareAndSetState(state, newState)) {
- return readCount == 1; // 最后一个读锁释放时可能触发唤醒
- }
- }
- }
- // 其他辅助方法
- int getReadCount(int state) { return state >>> SHARED_SHIFT; }
- int getWriteCount(int state) { return state & EXCLUSIVE_MASK; }
- }
复制代码 知道了这些然后就好理解了- 1. 从state获取写重入次数 和 读锁持有数====================
- 先说低16位,我们都知道int是32位的整数,用低16位的二进制位表示写锁的重入次数,如下:
- 32位二进制:
- [高位16位]11111111 11111111 | [低位16位]11111111 11111111
- 16位二进制全部是1,那么其表示的数字就是 2^16 - 1 = 65535【也就是说最大可重入次数是65535次】
- 既然现在是用的state的低位16位来记录的写锁重入次数,我们要怎么获取state的低位16位表示的数字呢?
- 很明显: state & ( 65535 ) 就行了: 也就是上面的 state & EXCLUSIVE_MASK
- 高位16位呢?【读锁获取的次数】
- 是不是state无符号右移16位就行了,剩下的不就是高位的16位了吗
- 也就是上面的:state >>> SHARED_SHIFT
- 2. 增加/减少重入次数 和 读锁持有数====================
- 写锁的话,直接state加减就可以了,因为直接加减就是从最低位开始的;
- 读呢? 因为需要把数字加到高位部分的那16位去,所以把需要加的数左移16位就好了;减的话同理。
复制代码 这样自定义了一个简单的读写锁就完成了, 然后测试一下- public class TReadWriteLock {
- private final Sync sync;
- private final ReadLock readLock;
- private final WriteLock writeLock;
- public TReadWriteLock() {
- sync = new Sync();
- readLock = new ReadLock(sync);
- writeLock = new WriteLock(sync);
- }
- // 对外暴露读写锁
- public Lock readLock() {return readLock;}
- public Lock writeLock() {return writeLock;}
- // 同步器Sync
- ....
- // 读锁(共享)
- public static class ReadLock implements Lock {
- private final Sync sync;
- public ReadLock(Sync sync) { this.sync = sync; }
- public void lock() { sync.acquireShared(1); }
- public void unlock() { sync.releaseShared(1); }
- // 其他方法(略)
- }
- // 写锁(独占)
- public static class WriteLock implements Lock {
- private final Sync sync;
- public WriteLock(Sync sync) { this.sync = sync; }
- public void lock() { sync.acquire(1); }
- public void unlock() { sync.release(1); }
- // 其他方法(略)
- }
- }
复制代码 2. 探索并发工具
①ConcurrentHashMap
- jdk1.8
Java 提供了一个多线程版本的ConcurrentHashMap。不仅线程安全,还能保持肯定的性能。平凡版本的HashMap看这里:
平凡的Map --网址:https://www.cnblogs.com/jackjavacpp/p/18787832
本文这里主要看其put方法和get方法: 我这里就写在表明里面了
先看put方法:- public class CustomLockTest {
- private TReadWriteLock readWriteLock;
- private Lock readLock;
- private Lock writeLock;
- private Map<String, String> data;
- public CustomLockTest() {
- readWriteLock = new TReadWriteLock();
- readLock = readWriteLock.readLock();
- writeLock = readWriteLock.writeLock();
- data = new HashMap<>();
- }
- public static void main(String[] args) {
- CustomLockTest obj = new CustomLockTest();
- // 两个线程写
- new Thread(() -> obj.write("key", "value"), "写Thread-1").start();
- new Thread(() -> obj.write("key", "value5"), "写Thread-2").start();
- // 4个线程读
- for (int i = 0; i < 4; i++)
- new Thread(() -> System.out.println(obj.read("key")), "读" + i).start();
- try {
- TimeUnit.SECONDS.sleep(5);
- System.out.println("main线程结束");
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- public void write(String key, String value) {
- writeLock.lock();
- try {
- System.out.println( Thread.currentThread().getName() + "写入中~~~");
- TimeUnit.SECONDS.sleep(1);
- data.put(key, value);
- System.out.println( Thread.currentThread().getName() + "写入ok~~~");
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } finally {
- writeLock.unlock();
- }
- }
- public String read(String key) {
- readLock.lock();
- try {
- System.out.println( Thread.currentThread().getName() + "读取中~~~");
- TimeUnit.SECONDS.sleep(2);
- System.out.println( Thread.currentThread().getName() + "读取ok~~~" + data.get(key));
- return data.get(key);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } finally {
- readLock.unlock();
- }
- }
- }
复制代码 值得注意的是:tabAt方法是以原子操作的方式获取 ConcurrentHashMap 底层数组中指定索引位置的节点,以此保证数据的一致性和线程安全。- final V putVal(K key, V value, boolean onlyIfAbsent) {
- // 熟悉HashMap的都知道,HashMap是允许key为null的!
- // 这里key、value都不能为null!!!!
- if (key == null || value == null) throw new NullPointerException();
- int hash = spread(key.hashCode());
- int binCount = 0;// 用于记录链表或红黑树中节点的数量
- // 熟悉HashMap的都知道,HashMap.put东西最外层是没有循环的
- for (Node<K,V>[] tab = table;;) {
- Node<K,V> f; int n, i, fh;
- if (tab == null || (n = tab.length) == 0)
- tab = initTable(); // 初始化底层table
- // hash计算出的index上的位置是空的
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
- // cas修改这个位置--那么看到这里应该很清楚了,外面为什么会有for循环了
- // 这一看就是cas的自旋锁嘛
- if (casTabAt(tab, i, null,
- new Node<K,V>(hash, key, value, null)))
- break; // cas修改ok,就break了
- }
- // 如果该位置的节点的哈希值为 MOVED,说明正在进行扩容操作,当前线程协助进行扩容
- else if ((fh = f.hash) == MOVED)
- tab = helpTransfer(tab, f);
- else { // hash计算出的index上的位置不是空的
- V oldVal = null;
- // f是table[(n - 1) & hash]的元素,
- // 可以理解为f表示某一个桶,这里锁某一个桶,减小了锁的粒度
- synchronized (f) {
- // 判断一下该位置是不是被别人动过了
- if (tabAt(tab, i) == f) {
- // fh是f的hash值
- if (fh >= 0) {
- binCount = 1;
- // 遍历链表
- for (Node<K,V> e = f;; ++binCount) {
- K ek;
- // 检查当前节点的键是否与要插入的键相同
- if (e.hash == hash &&
- ((ek = e.key) == key ||
- (ek != null && key.equals(ek)))) {
- oldVal = e.val;// 记录旧值
- if (!onlyIfAbsent)// 如果 onlyIfAbsent 为 false,更新节点的值
- e.val = value;
- break;
- }
- Node<K,V> pred = e;
- // 如果遍历到链表末尾,将新节点插入到链表尾部
- if ((e = e.next) == null) {
- pred.next = new Node<K,V>(hash, key,
- value, null);
- break;
- }
- }
- }
- // 如果该位置的节点是 TreeBin 类型,说明该位置是一个红黑树
- else if (f instanceof TreeBin) {
- Node<K,V> p;
- binCount = 2;
- if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
- value)) != null) {
- oldVal = p.val;
- if (!onlyIfAbsent)
- p.val = value;
- }
- }
- }
- }
- // 如果 binCount 不为 0,说明已经完成插入或更新操作
- if (binCount != 0) {
- // 如果链表长度达到树化阈值,将链表转换为红黑树
- if (binCount >= TREEIFY_THRESHOLD)
- treeifyBin(tab, i);
- if (oldVal != null)
- return oldVal;
- break;
- }
- }
- }
- // 更新元素数量并检查是否需要扩容
- addCount(1L, binCount);
- return null;
- }
复制代码 可以看到get方法并没有加锁。
ConcurrentHashMap的新方法:
putlfAbsent(K key,Vvalue):只有当key不存在时才插入。
此外,ConcurrentHashMap中map.put(key, map.get(key) + 1);并不会保证原子性。为了保证复合操作的原子性,ConcurrentHashMap在1.8中还有HashMap里面没有的新方法:
<strong>compute(K key, BiFunction |