并发编程--下篇

打印 上一主题 下一主题

主题 2013|帖子 2013|积分 6039

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Java并发探索--下篇

承接上文:
博客园【上篇】:https://www.cnblogs.com/jackjavacpp/p/18852416
csdn:【上篇】:https://blog.csdn.net/okok__TXF/article/details/147595101
1. AQS实现锁

AQS前传
网址:https://www.cnblogs.com/jackjavacpp/p/18787832
1) aqs分析

AQS 的核心原理是通过一个 int 类型的状态变量 state 来表示同步状态,使用一个 FIFO 队列来管理等待的线程。通过 CAS 操作来保证状态的原子性更新,同时提供了独占模式和共享模式的获取与释放方法。子类可以通过重写 tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared 等方法来实现具体的同步逻辑。
  1. // 关键的属性:
  2. // 同步状态,0 表示未锁定,大于 0 表示已锁定[>1表示可重入锁的重入次数]
  3. private volatile int state;
  4. // 队列的头节点
  5. private transient volatile Node head;
  6. // 队列的尾节点
  7. private transient volatile Node tail;
  8. // 其中Node的重要变量
  9. //节点已取消: 表示该节点关联的线程已放弃等待(如超时、被中断),需从队列中移除
  10. static final int CANCELLED =  1;
  11. //需唤醒后继节点: 当前节点的线程释放锁或取消时,必须唤醒其后继节点。
  12. //节点入队后需确保前驱节点的waitStatus为SIGNAL,否则需调整前驱状态。
  13. static final int SIGNAL    = -1;
  14. //节点在条件队列中: 表示节点处于条件队列(如Condition的等待队列),而非同步队列(CLH队列)。
  15. //状态转换:当调用Condition.signal()时,节点从条件队列转移到同步队列,状态重置为0
  16. static final int CONDITION = -2;
  17. //共享模式下唤醒需传播: 在共享锁(如Semaphore)释放时,确保唤醒动作传播给所有后续节点。
  18. static final int PROPAGATE = -3;
  19. //通过状态值控制线程的阻塞、唤醒和队列管理
  20. volatile int waitStatus;
复制代码
aqs独占锁、共享锁的获取和释放分析

  • 独占锁
独占锁的获取:
  1. // AbstractQueuedSynchronizer.java
  2. public final void acquire(int arg) {
  3.     if (!tryAcquire(arg) &&
  4.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  5.         selfInterrupt();
  6. }
复制代码
tryAcquire:尝试直接获取锁,这是一个必要子类实现的方法,  尝试直接获取锁(如CAS修改state)。【非公平锁:直接尝试CAS抢占资源;公平锁:先检查队列中是否有等待线程(hasQueuedPredecessors()),避免插队】
如果第一步返回false, 则进入第二步: addWaiter:将当前线程封装成一个 Node 节点,并添加到等待队列的尾部。
  1. private Node addWaiter(Node mode) {
  2.     Node node = new Node(Thread.currentThread(), mode);
  3.     // 尝试快速插入到队列尾部
  4.     Node pred = tail;
  5.     if (pred != null) {
  6.         node.prev = pred;
  7.         if (compareAndSetTail(pred, node)) {
  8.             pred.next = node;
  9.             return node;
  10.         }
  11.     }
  12.     // 快速插入失败,使用 enq 方法插入
  13.     enq(node);//enq 方法会通过循环和 CAS 操作确保节点成功插入到队列尾部。
  14.     return node;
  15. }
  16. private Node enq(final Node node) {
  17.     // 死循环
  18.     for (;;) {
  19.         Node t = tail;
  20.         if (t == null) {
  21.             // 使用CAS设置头结点 -- 这里是设置了一个普通的node
  22.             // 下次循环才会把传进来的node放到队列尾部
  23.             if (compareAndSetHead(new Node()))
  24.                 // 首尾指向同一个节点
  25.                 tail = head;
  26.         } else { // 尾部tail不为空,说明队列中有节点
  27.             node.prev = t;
  28.             if (compareAndSetTail(t, node)) {
  29.                 t.next = node;
  30.                 return t;
  31.             }
  32.         }
  33.     }
  34. }
复制代码
然后,结点添加到队列尾部之后,acquireQueued:让当前线程在同步队列中阻塞,然后在被其他线程唤醒时去获取锁;【让线程在同步队列中阻塞,直到它成为头节点的下一个节点,被头节点对应的线程唤醒,然后开始获取锁,若获取成功才会从方法中返回】。
  1. final boolean acquireQueued(final Node node, int arg) {
  2.         boolean failed = true;
  3.     try {
  4.         boolean interrupted = false;
  5.         for (;;) {
  6.             // 获取当前线程节点的前一个节点
  7.             final Node p = node.predecessor();
  8.             if (p == head && tryAcquire(arg)) {// 前驱是头节点且获取锁成功
  9.                 setHead(node);
  10.                 p.next = null; // help GC
  11.                 failed = false;
  12.                 return interrupted;
  13.             }
  14.             if (shouldParkAfterFailedAcquire(p, node) &&
  15.                 parkAndCheckInterrupt())
  16.                 interrupted = true;
  17.         }
  18.     } finally {
  19.         if (failed)
  20.             cancelAcquire(node);
  21.     }
  22. }
  23. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  24.     int ws = pred.waitStatus;
  25.     if (ws == Node.SIGNAL)
  26.         // 说明前驱节点会在释放锁时唤醒当前节点,当前线程可以安全地阻塞
  27.         return true;
  28.     // 如果前驱节点的等待状态大于 0,即 CANCELLED 状态
  29.     if (ws > 0) {
  30.         // 前驱节点已取消,需要跳过这些已取消的节点
  31.         do {
  32.             // 将当前节点的前驱节点指向前驱节点的前驱节点
  33.             node.prev = pred = pred.prev;
  34.         } while (pred.waitStatus > 0);
  35.         // 更新跳过已取消节点后的前驱节点的后继节点为当前节点
  36.         pred.next = node;
  37.     } else {
  38.         // 前驱节点的等待状态为 0 或 PROPAGATE,将其状态设置为 SIGNAL
  39.         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  40.     }
  41.     // 当前线程不应该被阻塞,需要再次尝试获取锁
  42.     return false;
  43. }
复制代码
shouldParkAfterFailedAcquire():确保前驱节点的waitStatus为SIGNAL(表示会唤醒后继节点),否则清理已取消的节点;它通过检查前驱节点的等待状态,决定当前线程在获取锁失败后是否应该被阻塞。它处理了前驱节点的不同状态,确保等待队列的正确性和线程的正确阻塞与唤醒,
parkAndCheckInterrupt() :让当前线程阻塞,并且在被唤醒之后检查该线程是否被中断 【里面用到了LockSupport,见后面并发工具】
独占锁的释放:
  1. public final boolean release(int arg) {
  2.     //调用tryRelease【子类里面实现】
  3.     //尝试修改state释放锁,若成功,将返回true,否则false
  4.     if (tryRelease(arg)) {
  5.         Node h = head;
  6.         // 检查头节点不为空且头节点的等待状态不为 0
  7.         if (h != null && h.waitStatus != 0)
  8.             // 唤醒头节点的后继节点
  9.             unparkSuccessor(h);
  10.         return true; // 释放成功,返回 true
  11.     }
  12.     return false;// 释放失败,返回 false
  13. }
  14. private void unparkSuccessor(Node node) {
  15.     // 获取节点的等待状态
  16.     int ws = node.waitStatus;
  17.     if (ws < 0)
  18.         compareAndSetWaitStatus(node, ws, 0);
  19.     // 获取节点的后继节点
  20.     Node s = node.next;
  21.     // 如果后继节点为空或者后继节点的等待状态大于 0(已取消)
  22.     if (s == null || s.waitStatus > 0) {
  23.         s = null;
  24.         // 从队列尾部开始向前查找,找到第一个等待状态小于等于 0 的节点
  25.         for (Node t = tail; t != null && t != node; t = t.prev)
  26.             if (t.waitStatus <= 0)
  27.                 s = t;
  28.     }
  29.     // 如果找到了合适的后继节点,唤醒该节点对应的线程
  30.     if (s != null)
  31.         LockSupport.unpark(s.thread);
  32. }
复制代码
setHeadAndPropagate(node, r); 先把当前获取到同步状态的节点设置为新的头节点,接着根据不同条件判断是否要将共享状态的获取传播给后续节点。要是满足传播条件,就会调用 doReleaseShared 方法去唤醒后续等待的共享节点。
共享锁的释放
  1. public final void acquireShared(int arg) {
  2.     if (tryAcquireShared(arg) < 0)//这个方法子类实现
  3.         //若返回值小于0,表示获取共享锁失败,则线程需要进入到同步队列中等待
  4.         doAcquireShared(arg);
  5. }
  6. private void doAcquireShared(int arg) {
  7.     final Node node = addWaiter(Node.SHARED); // 以SHARED加入一个结点
  8.     boolean failed = true;
  9.     try {
  10.         boolean interrupted = false;
  11.         for (;;) {
  12.             final Node p = node.predecessor();
  13.             if (p == head) {
  14.                 int r = tryAcquireShared(arg);
  15.                 if (r >= 0) { // 获取共享锁成功
  16.                     setHeadAndPropagate(node, r); // 传播给其他线程
  17.                     p.next = null; // help GC
  18.                     if (interrupted)
  19.                         selfInterrupt();
  20.                     failed = false;
  21.                     return;
  22.                 }
  23.             }
  24.             // 判断是否阻塞,唤醒后是否被中断--同上
  25.             if (shouldParkAfterFailedAcquire(p, node) &&
  26.                 parkAndCheckInterrupt())
  27.                 interrupted = true;
  28.         }
  29.     } finally {
  30.         if (failed)
  31.             cancelAcquire(node);
  32.     }
  33. }
  34. private void setHeadAndPropagate(Node node, int propagate) {
  35.     Node h = head; // 记录旧的头节点
  36.     setHead(node); // 将当前节点设置为新的头节点
  37.     if (propagate > 0 || h == null || h.waitStatus < 0 ||
  38.         (h = head) == null || h.waitStatus < 0) {
  39.         Node s = node.next;
  40.         if (s == null || s.isShared())
  41.             doReleaseShared();
  42.     }
  43. }
复制代码
2) 自定义锁


  • 自定义一个读写锁
学一下jdk源码,写一个内置的Sync同步器,低位16位记录写锁重入次数,高位16位记录读锁获取次数
  1. public final boolean release(int arg) {
  2.     // 调用tryRelease:【子类实现】
  3.     if (tryRelease(arg)) {
  4.         // 若释放锁成功,需要将当前线程移出同步队列
  5.         Node h = head;
  6.         // 若head不是null,且waitStatus不为0,表示它是一个装有线程的正常节点,
  7.         // 在之前提到的addWaiter方法中,若同步队列为空,则会创建一个默认的节点放入head
  8.         // 这个默认的节点不包含线程,它的waitStatus就是0,所以不能释放锁
  9.         if (h != null && h.waitStatus != 0)
  10.             // 若head是一个正常的节点,则调用unparkSuccessor唤醒它的下一个节点所对应的线程
  11.             unparkSuccessor(h);
  12.         // 释放成功
  13.         return true;
  14.     }
  15.     // 释放锁失败
  16.     return false;
  17. }
复制代码
上面的同步器中,必要注意的点如下:
高16位和低16位是啥情况?
  1. // 内置同步器
  2. private static class Sync extends AbstractQueuedSynchronizer {
  3.     static final int SHARED_SHIFT = 16;
  4.     static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
  5.     // 写锁方法(tryAcquire/tryRelease)-- 独占
  6.     protected boolean tryAcquire(int acquires) {
  7.         Thread current = Thread.currentThread();
  8.         int state = getState();
  9.         int writeCount = getWriteCount(state);
  10.         // 如果存在读锁或写锁(且持有者不是当前线程),获取失败
  11.         if (state != 0) {
  12.             // writeCount是0,但是state不是0,说明有线程获取到了读锁
  13.             if (writeCount == 0 || current != getExclusiveOwnerThread())
  14.                 return false;
  15.         }
  16.         // 检查是否超过最大重入次数(低16位是否溢出)
  17.         if (writeCount + acquires > EXCLUSIVE_MASK)
  18.             throw new Error("超出最大重入次数");
  19.         // CAS更新写锁状态
  20.         if (compareAndSetState(state, state + acquires)) {
  21.             setExclusiveOwnerThread(current);
  22.             return true;
  23.         }
  24.         return false;
  25.     }
  26.     protected boolean tryRelease(int releases) {
  27.         if (Thread.currentThread() != getExclusiveOwnerThread())
  28.             throw new IllegalMonitorStateException();
  29.         int newState = getState() - releases;
  30.         boolean free = (getWriteCount(newState) == 0);
  31.         if (free)
  32.             setExclusiveOwnerThread(null);
  33.         setState(newState);
  34.         return free;
  35.     }
  36.     // 读锁方法(tryAcquireShared/tryReleaseShared)
  37.     protected int tryAcquireShared(int acquires) {
  38.         Thread current = Thread.currentThread();
  39.         int state = getState();
  40.         // 如果有其他线程持有写锁,且不是当前线程(允许锁降级),则获取失败
  41.         if (getWriteCount(state) != 0 && getExclusiveOwnerThread() != current)
  42.             return -1;
  43.         // 计算读锁数量
  44.         int readCount = getReadCount(state);
  45.         if (readCount == (1 << SHARED_SHIFT) - 1)
  46.             throw new Error("超出最大读锁数量");
  47.         // CAS增加读锁计数(高16位)
  48.         if (compareAndSetState(state, state + (1 << SHARED_SHIFT))) {
  49.             return 1; // 成功获取读锁
  50.         }
  51.         return -1; // 需要进入队列等待
  52.     }
  53.     protected boolean tryReleaseShared(int releases) {
  54.         for (;;) {
  55.             int state = getState();
  56.             int readCount = getReadCount(state);
  57.             if (readCount == 0)
  58.                 throw new IllegalMonitorStateException();
  59.             // CAS减少读锁计数
  60.             int newState = state - (1 << SHARED_SHIFT);
  61.             if (compareAndSetState(state, newState)) {
  62.                 return readCount == 1; // 最后一个读锁释放时可能触发唤醒
  63.             }
  64.         }
  65.     }
  66.     // 其他辅助方法
  67.     int getReadCount(int state) { return state >>> SHARED_SHIFT; }
  68.     int getWriteCount(int state) { return state & EXCLUSIVE_MASK; }
  69. }
复制代码
知道了这些然后就好理解了
  1. 1. 从state获取写重入次数 和 读锁持有数====================
  2. 先说低16位,我们都知道int是32位的整数,用低16位的二进制位表示写锁的重入次数,如下:
  3. 32位二进制:
  4. [高位16位]11111111 11111111 | [低位16位]11111111 11111111
  5. 16位二进制全部是1,那么其表示的数字就是 2^16 - 1 = 65535【也就是说最大可重入次数是65535次】
  6. 既然现在是用的state的低位16位来记录的写锁重入次数,我们要怎么获取state的低位16位表示的数字呢?
  7. 很明显: state & ( 65535 ) 就行了: 也就是上面的 state & EXCLUSIVE_MASK
  8. 高位16位呢?【读锁获取的次数】
  9. 是不是state无符号右移16位就行了,剩下的不就是高位的16位了吗
  10. 也就是上面的:state >>> SHARED_SHIFT
  11. 2. 增加/减少重入次数 和 读锁持有数====================
  12. 写锁的话,直接state加减就可以了,因为直接加减就是从最低位开始的;
  13. 读呢? 因为需要把数字加到高位部分的那16位去,所以把需要加的数左移16位就好了;减的话同理。
复制代码
这样自定义了一个简单的读写锁就完成了, 然后测试一下
  1. public class TReadWriteLock {
  2.     private final Sync sync;
  3.     private final ReadLock readLock;
  4.     private final WriteLock writeLock;
  5.     public TReadWriteLock() {
  6.         sync = new Sync();
  7.         readLock = new ReadLock(sync);
  8.         writeLock = new WriteLock(sync);
  9.     }
  10.     // 对外暴露读写锁
  11.     public Lock readLock() {return readLock;}
  12.     public Lock writeLock() {return writeLock;}
  13.     // 同步器Sync
  14.     ....
  15.     // 读锁(共享)
  16.     public static class ReadLock implements Lock {
  17.         private final Sync sync;
  18.         public ReadLock(Sync sync) { this.sync = sync; }
  19.         public void lock() { sync.acquireShared(1); }
  20.         public void unlock() { sync.releaseShared(1); }
  21.         // 其他方法(略)
  22.     }
  23.     // 写锁(独占)
  24.     public static class WriteLock implements Lock {
  25.         private final Sync sync;
  26.         public WriteLock(Sync sync) { this.sync = sync; }
  27.         public void lock() { sync.acquire(1); }
  28.         public void unlock() { sync.release(1); }
  29.         // 其他方法(略)
  30.     }
  31. }
复制代码
2. 探索并发工具

①ConcurrentHashMap

- jdk1.8

Java 提供了一个多线程版本的ConcurrentHashMap。不仅线程安全,还能保持肯定的性能。平凡版本的HashMap看这里:
平凡的Map --网址:https://www.cnblogs.com/jackjavacpp/p/18787832
本文这里主要看其put方法和get方法:  我这里就写在表明里面了
先看put方法:
  1. public class CustomLockTest {
  2.     private TReadWriteLock readWriteLock;
  3.     private Lock readLock;
  4.     private Lock writeLock;
  5.     private Map<String, String> data;
  6.     public CustomLockTest() {
  7.         readWriteLock = new TReadWriteLock();
  8.         readLock = readWriteLock.readLock();
  9.         writeLock = readWriteLock.writeLock();
  10.         data = new HashMap<>();
  11.     }
  12.     public static void main(String[] args) {
  13.         CustomLockTest obj = new CustomLockTest();
  14.         // 两个线程写
  15.         new Thread(() -> obj.write("key", "value"), "写Thread-1").start();
  16.         new Thread(() -> obj.write("key", "value5"), "写Thread-2").start();
  17.         // 4个线程读
  18.         for (int i = 0; i < 4; i++)
  19.             new Thread(() -> System.out.println(obj.read("key")), "读" + i).start();
  20.         try {
  21.             TimeUnit.SECONDS.sleep(5);
  22.             System.out.println("main线程结束");
  23.         } catch (InterruptedException e) {
  24.             throw new RuntimeException(e);
  25.         }
  26.     }
  27.     public void write(String key, String value) {
  28.         writeLock.lock();
  29.         try {
  30.             System.out.println( Thread.currentThread().getName() + "写入中~~~");
  31.             TimeUnit.SECONDS.sleep(1);
  32.             data.put(key, value);
  33.             System.out.println( Thread.currentThread().getName() + "写入ok~~~");
  34.         } catch (InterruptedException e) {
  35.             throw new RuntimeException(e);
  36.         } finally {
  37.             writeLock.unlock();
  38.         }
  39.     }
  40.     public String read(String key) {
  41.         readLock.lock();
  42.         try {
  43.             System.out.println( Thread.currentThread().getName() + "读取中~~~");
  44.             TimeUnit.SECONDS.sleep(2);
  45.             System.out.println( Thread.currentThread().getName() + "读取ok~~~" + data.get(key));
  46.             return data.get(key);
  47.         } catch (InterruptedException e) {
  48.             throw new RuntimeException(e);
  49.         } finally {
  50.             readLock.unlock();
  51.         }
  52.     }
  53. }
复制代码
值得注意的是:tabAt方法是以原子操作的方式获取 ConcurrentHashMap 底层数组中指定索引位置的节点,以此保证数据的一致性和线程安全。
  1. final V putVal(K key, V value, boolean onlyIfAbsent) {
  2.     // 熟悉HashMap的都知道,HashMap是允许key为null的!
  3.     // 这里key、value都不能为null!!!!
  4.     if (key == null || value == null) throw new NullPointerException();
  5.     int hash = spread(key.hashCode());
  6.     int binCount = 0;// 用于记录链表或红黑树中节点的数量
  7.     // 熟悉HashMap的都知道,HashMap.put东西最外层是没有循环的
  8.     for (Node<K,V>[] tab = table;;) {
  9.         Node<K,V> f; int n, i, fh;
  10.         if (tab == null || (n = tab.length) == 0)
  11.             tab = initTable(); // 初始化底层table
  12.         // hash计算出的index上的位置是空的
  13.         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  14.             // cas修改这个位置--那么看到这里应该很清楚了,外面为什么会有for循环了
  15.             // 这一看就是cas的自旋锁嘛
  16.             if (casTabAt(tab, i, null,
  17.                          new Node<K,V>(hash, key, value, null)))
  18.                 break;  // cas修改ok,就break了
  19.         }
  20.         // 如果该位置的节点的哈希值为 MOVED,说明正在进行扩容操作,当前线程协助进行扩容
  21.         else if ((fh = f.hash) == MOVED)
  22.             tab = helpTransfer(tab, f);
  23.         else { //  hash计算出的index上的位置不是空的
  24.             V oldVal = null;
  25.             // f是table[(n - 1) & hash]的元素,
  26.             // 可以理解为f表示某一个桶,这里锁某一个桶,减小了锁的粒度
  27.             synchronized (f) {
  28.                 // 判断一下该位置是不是被别人动过了
  29.                 if (tabAt(tab, i) == f) {
  30.                     // fh是f的hash值
  31.                     if (fh >= 0) {
  32.                         binCount = 1;
  33.                         // 遍历链表
  34.                         for (Node<K,V> e = f;; ++binCount) {
  35.                             K ek;
  36.                             // 检查当前节点的键是否与要插入的键相同
  37.                             if (e.hash == hash &&
  38.                                 ((ek = e.key) == key ||
  39.                                  (ek != null && key.equals(ek)))) {
  40.                                 oldVal = e.val;// 记录旧值
  41.                                 if (!onlyIfAbsent)// 如果 onlyIfAbsent 为 false,更新节点的值
  42.                                     e.val = value;
  43.                                 break;
  44.                             }
  45.                             Node<K,V> pred = e;
  46.                             // 如果遍历到链表末尾,将新节点插入到链表尾部
  47.                             if ((e = e.next) == null) {
  48.                                 pred.next = new Node<K,V>(hash, key,
  49.                                                           value, null);
  50.                                 break;
  51.                             }
  52.                         }
  53.                     }
  54.                     // 如果该位置的节点是 TreeBin 类型,说明该位置是一个红黑树
  55.                     else if (f instanceof TreeBin) {
  56.                         Node<K,V> p;
  57.                         binCount = 2;
  58.                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  59.                                                        value)) != null) {
  60.                             oldVal = p.val;
  61.                             if (!onlyIfAbsent)
  62.                                 p.val = value;
  63.                         }
  64.                     }
  65.                 }
  66.             }
  67.             // 如果 binCount 不为 0,说明已经完成插入或更新操作
  68.             if (binCount != 0) {
  69.                 // 如果链表长度达到树化阈值,将链表转换为红黑树
  70.                 if (binCount >= TREEIFY_THRESHOLD)
  71.                     treeifyBin(tab, i);
  72.                 if (oldVal != null)
  73.                     return oldVal;
  74.                 break;
  75.             }
  76.         }
  77.     }
  78.     // 更新元素数量并检查是否需要扩容
  79.     addCount(1L, binCount);
  80.     return null;
  81. }
复制代码
可以看到get方法并没有加锁。
ConcurrentHashMap的新方法:
putlfAbsent(K key,Vvalue):只有当key不存在时才插入。
此外,ConcurrentHashMap中map.put(key, map.get(key) + 1);并不会保证原子性。为了保证复合操作的原子性,ConcurrentHashMap在1.8中还有HashMap里面没有的新方法:

<strong>compute(K key, BiFunction
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

尚未崩坏

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表