qidao123.com技术社区-IT企服评测·应用市场
标题:
并发编程--下篇
[打印本页]
作者:
尚未崩坏
时间:
2025-5-2 20:58
标题:
并发编程--下篇
Java并发探索--下篇
承接上文:
博客园【
上篇
】:
https://www.cnblogs.com/jackjavacpp/p/18852416
csdn:【
上篇
】:
https://blog.csdn.net/okok__TXF/article/details/147595101
1. AQS实现锁
AQS前传
网址:
https://www.cnblogs.com/jackjavacpp/p/18787832
1) aqs分析
AQS 的核心原理是通过一个 int 类型的状态变量 state 来表示同步状态,使用一个 FIFO 队列来管理等待的线程。通过 CAS 操作来保证状态的原子性更新,同时提供了独占模式和共享模式的获取与释放方法。子类可以通过重写 tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared 等方法来实现具体的同步逻辑。
// 关键的属性:
// 同步状态,0 表示未锁定,大于 0 表示已锁定[>1表示可重入锁的重入次数]
private volatile int state;
// 队列的头节点
private transient volatile Node head;
// 队列的尾节点
private transient volatile Node tail;
// 其中Node的重要变量
//节点已取消: 表示该节点关联的线程已放弃等待(如超时、被中断),需从队列中移除
static final int CANCELLED = 1;
//需唤醒后继节点: 当前节点的线程释放锁或取消时,必须唤醒其后继节点。
//节点入队后需确保前驱节点的waitStatus为SIGNAL,否则需调整前驱状态。
static final int SIGNAL = -1;
//节点在条件队列中: 表示节点处于条件队列(如Condition的等待队列),而非同步队列(CLH队列)。
//状态转换:当调用Condition.signal()时,节点从条件队列转移到同步队列,状态重置为0
static final int CONDITION = -2;
//共享模式下唤醒需传播: 在共享锁(如Semaphore)释放时,确保唤醒动作传播给所有后续节点。
static final int PROPAGATE = -3;
//通过状态值控制线程的阻塞、唤醒和队列管理
volatile int waitStatus;
复制代码
aqs独占锁、共享锁的获取和释放分析
独占锁
独占锁的获取:
// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
复制代码
tryAcquire:尝试直接获取锁,这是一个必要
子类实现
的方法, 尝试直接获取锁(如CAS修改state)。【非公平锁:直接尝试CAS抢占资源;公平锁:先检查队列中是否有等待线程(hasQueuedPredecessors()),避免插队】
如果第一步返回false, 则进入第二步: addWaiter:将当前线程封装成一个 Node 节点,并添加到等待队列的尾部。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速插入到队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 快速插入失败,使用 enq 方法插入
enq(node);//enq 方法会通过循环和 CAS 操作确保节点成功插入到队列尾部。
return node;
}
private Node enq(final Node node) {
// 死循环
for (;;) {
Node t = tail;
if (t == null) {
// 使用CAS设置头结点 -- 这里是设置了一个普通的node
// 下次循环才会把传进来的node放到队列尾部
if (compareAndSetHead(new Node()))
// 首尾指向同一个节点
tail = head;
} else { // 尾部tail不为空,说明队列中有节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
复制代码
然后,结点添加到队列尾部之后,acquireQueued:让当前线程在同步队列中阻塞,然后在被其他线程唤醒时去获取锁;【
让线程在同步队列中阻塞,直到它成为头节点的下一个节点,被头节点对应的线程唤醒,然后开始获取锁,若获取成功才会从方法中返回
】。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前线程节点的前一个节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {// 前驱是头节点且获取锁成功
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 说明前驱节点会在释放锁时唤醒当前节点,当前线程可以安全地阻塞
return true;
// 如果前驱节点的等待状态大于 0,即 CANCELLED 状态
if (ws > 0) {
// 前驱节点已取消,需要跳过这些已取消的节点
do {
// 将当前节点的前驱节点指向前驱节点的前驱节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 更新跳过已取消节点后的前驱节点的后继节点为当前节点
pred.next = node;
} else {
// 前驱节点的等待状态为 0 或 PROPAGATE,将其状态设置为 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 当前线程不应该被阻塞,需要再次尝试获取锁
return false;
}
复制代码
shouldParkAfterFailedAcquire()
:确保前驱节点的waitStatus为SIGNAL(表示会唤醒后继节点),否则清理已取消的节点;它通过检查前驱节点的等待状态,决定当前线程在获取锁失败后是否应该被阻塞。它处理了前驱节点的不同状态,确保等待队列的正确性和线程的正确阻塞与唤醒,
parkAndCheckInterrupt()
:让当前线程阻塞,并且在被唤醒之后检查该线程是否被中断 【里面用到了LockSupport,见后面并发工具】
独占锁的释放:
public final boolean release(int arg) {
//调用tryRelease【子类里面实现】
//尝试修改state释放锁,若成功,将返回true,否则false
if (tryRelease(arg)) {
Node h = head;
// 检查头节点不为空且头节点的等待状态不为 0
if (h != null && h.waitStatus != 0)
// 唤醒头节点的后继节点
unparkSuccessor(h);
return true; // 释放成功,返回 true
}
return false;// 释放失败,返回 false
}
private void unparkSuccessor(Node node) {
// 获取节点的等待状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取节点的后继节点
Node s = node.next;
// 如果后继节点为空或者后继节点的等待状态大于 0(已取消)
if (s == null || s.waitStatus > 0) {
s = null;
// 从队列尾部开始向前查找,找到第一个等待状态小于等于 0 的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果找到了合适的后继节点,唤醒该节点对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}
复制代码
setHeadAndPropagate(node, r); 先把当前获取到同步状态的节点设置为新的头节点,接着根据不同条件判断是否要将共享状态的获取传播给后续节点。要是满足传播条件,就会调用 doReleaseShared 方法去唤醒后续等待的共享节点。
共享锁的释放
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//这个方法子类实现
//若返回值小于0,表示获取共享锁失败,则线程需要进入到同步队列中等待
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 以SHARED加入一个结点
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { // 获取共享锁成功
setHeadAndPropagate(node, r); // 传播给其他线程
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断是否阻塞,唤醒后是否被中断--同上
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录旧的头节点
setHead(node); // 将当前节点设置为新的头节点
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
复制代码
2) 自定义锁
自定义一个读写锁
学一下jdk源码,写一个内置的Sync同步器,低位16位记录写锁重入次数,高位16位记录读锁获取次数
public final boolean release(int arg) {
// 调用tryRelease:【子类实现】
if (tryRelease(arg)) {
// 若释放锁成功,需要将当前线程移出同步队列
Node h = head;
// 若head不是null,且waitStatus不为0,表示它是一个装有线程的正常节点,
// 在之前提到的addWaiter方法中,若同步队列为空,则会创建一个默认的节点放入head
// 这个默认的节点不包含线程,它的waitStatus就是0,所以不能释放锁
if (h != null && h.waitStatus != 0)
// 若head是一个正常的节点,则调用unparkSuccessor唤醒它的下一个节点所对应的线程
unparkSuccessor(h);
// 释放成功
return true;
}
// 释放锁失败
return false;
}
复制代码
上面的同步器中,必要注意的点如下:
高16位和低16位是啥情况?
// 内置同步器
private static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 写锁方法(tryAcquire/tryRelease)-- 独占
protected boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int state = getState();
int writeCount = getWriteCount(state);
// 如果存在读锁或写锁(且持有者不是当前线程),获取失败
if (state != 0) {
// writeCount是0,但是state不是0,说明有线程获取到了读锁
if (writeCount == 0 || current != getExclusiveOwnerThread())
return false;
}
// 检查是否超过最大重入次数(低16位是否溢出)
if (writeCount + acquires > EXCLUSIVE_MASK)
throw new Error("超出最大重入次数");
// CAS更新写锁状态
if (compareAndSetState(state, state + acquires)) {
setExclusiveOwnerThread(current);
return true;
}
return false;
}
protected boolean tryRelease(int releases) {
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
int newState = getState() - releases;
boolean free = (getWriteCount(newState) == 0);
if (free)
setExclusiveOwnerThread(null);
setState(newState);
return free;
}
// 读锁方法(tryAcquireShared/tryReleaseShared)
protected int tryAcquireShared(int acquires) {
Thread current = Thread.currentThread();
int state = getState();
// 如果有其他线程持有写锁,且不是当前线程(允许锁降级),则获取失败
if (getWriteCount(state) != 0 && getExclusiveOwnerThread() != current)
return -1;
// 计算读锁数量
int readCount = getReadCount(state);
if (readCount == (1 << SHARED_SHIFT) - 1)
throw new Error("超出最大读锁数量");
// CAS增加读锁计数(高16位)
if (compareAndSetState(state, state + (1 << SHARED_SHIFT))) {
return 1; // 成功获取读锁
}
return -1; // 需要进入队列等待
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int state = getState();
int readCount = getReadCount(state);
if (readCount == 0)
throw new IllegalMonitorStateException();
// CAS减少读锁计数
int newState = state - (1 << SHARED_SHIFT);
if (compareAndSetState(state, newState)) {
return readCount == 1; // 最后一个读锁释放时可能触发唤醒
}
}
}
// 其他辅助方法
int getReadCount(int state) { return state >>> SHARED_SHIFT; }
int getWriteCount(int state) { return state & EXCLUSIVE_MASK; }
}
复制代码
知道了这些然后就好理解了
1. 从state获取写重入次数 和 读锁持有数====================
先说低16位,我们都知道int是32位的整数,用低16位的二进制位表示写锁的重入次数,如下:
32位二进制:
[高位16位]11111111 11111111 | [低位16位]11111111 11111111
16位二进制全部是1,那么其表示的数字就是 2^16 - 1 = 65535【也就是说最大可重入次数是65535次】
既然现在是用的state的低位16位来记录的写锁重入次数,我们要怎么获取state的低位16位表示的数字呢?
很明显: state & ( 65535 ) 就行了: 也就是上面的 state & EXCLUSIVE_MASK
高位16位呢?【读锁获取的次数】
是不是state无符号右移16位就行了,剩下的不就是高位的16位了吗
也就是上面的:state >>> SHARED_SHIFT
2. 增加/减少重入次数 和 读锁持有数====================
写锁的话,直接state加减就可以了,因为直接加减就是从最低位开始的;
读呢? 因为需要把数字加到高位部分的那16位去,所以把需要加的数左移16位就好了;减的话同理。
复制代码
这样自定义了一个简单的读写锁就完成了, 然后测试一下
public class TReadWriteLock {
private final Sync sync;
private final ReadLock readLock;
private final WriteLock writeLock;
public TReadWriteLock() {
sync = new Sync();
readLock = new ReadLock(sync);
writeLock = new WriteLock(sync);
}
// 对外暴露读写锁
public Lock readLock() {return readLock;}
public Lock writeLock() {return writeLock;}
// 同步器Sync
....
// 读锁(共享)
public static class ReadLock implements Lock {
private final Sync sync;
public ReadLock(Sync sync) { this.sync = sync; }
public void lock() { sync.acquireShared(1); }
public void unlock() { sync.releaseShared(1); }
// 其他方法(略)
}
// 写锁(独占)
public static class WriteLock implements Lock {
private final Sync sync;
public WriteLock(Sync sync) { this.sync = sync; }
public void lock() { sync.acquire(1); }
public void unlock() { sync.release(1); }
// 其他方法(略)
}
}
复制代码
2. 探索并发工具
①ConcurrentHashMap
- jdk1.8
Java 提供了一个多线程版本的ConcurrentHashMap。不仅线程安全,还能保持肯定的性能。平凡版本的HashMap看这里:
平凡的Map
--网址:
https://www.cnblogs.com/jackjavacpp/p/18787832
本文这里主要看其put方法和get方法: 我这里就写在表明里面了
先看put方法:
public class CustomLockTest {
private TReadWriteLock readWriteLock;
private Lock readLock;
private Lock writeLock;
private Map<String, String> data;
public CustomLockTest() {
readWriteLock = new TReadWriteLock();
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
data = new HashMap<>();
}
public static void main(String[] args) {
CustomLockTest obj = new CustomLockTest();
// 两个线程写
new Thread(() -> obj.write("key", "value"), "写Thread-1").start();
new Thread(() -> obj.write("key", "value5"), "写Thread-2").start();
// 4个线程读
for (int i = 0; i < 4; i++)
new Thread(() -> System.out.println(obj.read("key")), "读" + i).start();
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("main线程结束");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void write(String key, String value) {
writeLock.lock();
try {
System.out.println( Thread.currentThread().getName() + "写入中~~~");
TimeUnit.SECONDS.sleep(1);
data.put(key, value);
System.out.println( Thread.currentThread().getName() + "写入ok~~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
writeLock.unlock();
}
}
public String read(String key) {
readLock.lock();
try {
System.out.println( Thread.currentThread().getName() + "读取中~~~");
TimeUnit.SECONDS.sleep(2);
System.out.println( Thread.currentThread().getName() + "读取ok~~~" + data.get(key));
return data.get(key);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
readLock.unlock();
}
}
}
复制代码
值得注意的是:tabAt方法是以原子操作的方式获取 ConcurrentHashMap 底层数组中指定索引位置的节点,以此保证数据的一致性和线程安全。
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 熟悉HashMap的都知道,HashMap是允许key为null的!
// 这里key、value都不能为null!!!!
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;// 用于记录链表或红黑树中节点的数量
// 熟悉HashMap的都知道,HashMap.put东西最外层是没有循环的
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 初始化底层table
// hash计算出的index上的位置是空的
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// cas修改这个位置--那么看到这里应该很清楚了,外面为什么会有for循环了
// 这一看就是cas的自旋锁嘛
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // cas修改ok,就break了
}
// 如果该位置的节点的哈希值为 MOVED,说明正在进行扩容操作,当前线程协助进行扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else { // hash计算出的index上的位置不是空的
V oldVal = null;
// f是table[(n - 1) & hash]的元素,
// 可以理解为f表示某一个桶,这里锁某一个桶,减小了锁的粒度
synchronized (f) {
// 判断一下该位置是不是被别人动过了
if (tabAt(tab, i) == f) {
// fh是f的hash值
if (fh >= 0) {
binCount = 1;
// 遍历链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 检查当前节点的键是否与要插入的键相同
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;// 记录旧值
if (!onlyIfAbsent)// 如果 onlyIfAbsent 为 false,更新节点的值
e.val = value;
break;
}
Node<K,V> pred = e;
// 如果遍历到链表末尾,将新节点插入到链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果该位置的节点是 TreeBin 类型,说明该位置是一个红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 如果 binCount 不为 0,说明已经完成插入或更新操作
if (binCount != 0) {
// 如果链表长度达到树化阈值,将链表转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 更新元素数量并检查是否需要扩容
addCount(1L, binCount);
return null;
}
复制代码
可以看到get方法并没有加锁。
ConcurrentHashMap的新方法:
putlfAbsent(K key,Vvalue)
:只有当key不存在时才插入。
此外,ConcurrentHashMap中map.put(key, map.get(key) + 1);并不会保证原子性。为了保证
复合操作的原子性
,ConcurrentHashMap在1.8中还有HashMap里面没有的新方法:
<strong>compute(K key, BiFunction
欢迎光临 qidao123.com技术社区-IT企服评测·应用市场 (https://dis.qidao123.com/)
Powered by Discuz! X3.4