Java ConcurrentHashMap 高并发安全实现原理分析
//最大容量private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认初始容量
private static final int DEFAULT_CAPACITY = 16;
//数组的最大容量,防止抛出OOM
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//最大并行度,仅用于兼容JDK1.7从前版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//扩容因子
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
//红黑树退化阈值
static final int UNTREEIFY_THRESHOLD = 6;
//链表转红黑树的最小总量
static final int MIN_TREEIFY_CAPACITY = 64;
//扩容搬运时批量搬运的最小槽位数
private static final int MIN_TRANSFER_STRIDE = 16;
//当前待扩容table的邮戳位,通常是高16位
private static final int RESIZE_STAMP_BITS = 16;
//同时搬运的线程数自增的最大值
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//搬运线程数的标识位,通常是低16位
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
static final int MOVED = -1; // 说明是forwardingNode
static final int TREEBIN = -2; // 红黑树
static final int RESERVED = -3; // 原子计算的占位Node
static final int HASH_BITS = 0x7fffffff; // 保证hashcode扰动计算结果为正数
//当前哈希表
transient volatile Node<K,V>[] table;
//下一个哈希表
private transient volatile Node<K,V>[] nextTable;
//计数的基准值
private transient volatile long baseCount;
//控制变量,不同场景有不同用途,参考下文
private transient volatile int sizeCtl;
//并发搬运过程中CAS获取区段的下限值
private transient volatile int transferIndex;
//计数cell初始化或者扩容时基于此字段利用自旋锁
private transient volatile int cellsBusy;
//加速多核CPU计数的cell数组
private transient volatile CounterCell[] counterCells;
四、安全操纵Node<K,V>数组
=====================
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
}static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);
}static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v);
}
对Node<K,V>[] 上恣意一个index的读取和写入都利用了Unsafe辅助类,table本身是volatile范例的并不能保证其下的每个元素的内存语义也是volatile范例;
需要借助于Unsafe来保证Node<K,V>[]元素的“读/写/CAS”操纵在多核并发环境下的原子或者可见性。
五、读操纵get为什么是线程安全的
=====================
首先需要明白的是,C13Map的读操纵一般是不加锁的(TreeBin的读解锁除外),而读操纵与写操纵有大概并行;可以保证的是,因为C13Map的写操纵都要获取bin头部的syncronized互斥锁,能保证最多只有一个线程在做更新,这实在是一个单线程写、多线程读的并发安全性的标题。
C13Map的get方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//执行扰动函数
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
1、如果当前哈希表table为null
=======================
哈希表未初始化或者正在初始化未完成,直接返回null;虽然line5和line18之间其它线程大概经历了千山万水,至少在判断tab==null的时间点key肯定是不存在的,返回null符合某一时刻的客观事实。
2、如果读取的bin头节点为null
======================
说明该槽位尚未有节点,直接返回null。
3、如果读取的bin是一个链表
===================
说明头节点是个普通Node。
(1)如果正在发生链表向红黑树的treeify工作,因为treeify本身并不粉碎旧的链表bin的布局,只是在全部treeify完成后将头节点一次性替换为新创建的TreeBin,可以放心读取。
(2)如果正在发生resize且当前bin正在被transfer,因为transfer本身并不粉碎旧的链表bin的布局,只是在全部transfer完成后将头节点一次性替换为ForwardingNode,可以放心读取。
(3)如果其它线程正在操纵链表,在当前线程遍历链表的恣意一个时间点,都有大概同时在发生add/replace/remove操纵。
[*] 如果是add操纵,因为链表的节点新增从JDK8以后都采用了后入式,无非是多遍历或者少遍历一个tailNode。
[*] 如果是remove操纵,存在遍历到某个Node时,正好有其它线程将其remove,导致其孤立于整个链表之外;但因为其next引用未发生变更,整个链表并没有断开,还是可以照常遍历链表直到tailNode。
[*] 如果是replace操纵,链表的布局未变,只是某个Node的value发生了变化,没有安全标题。
结论:对于链表这种线性数据布局,单线程写且插入操纵保证是后入式的条件下,并发读取是安全的;不会存在误读、链表断开导致的漏读、读到环状链表等标题。
4、如果读取的bin是一个红黑树
====================
说明头节点是个TreeBin节点。
(1)如果正在发生红黑树向链表的untreeify操纵,因为untreeify本身并不粉碎旧的红黑树布局,只是在全部untreeify完成后将头节点一次性替换为新创建的普通Node,可以放心读取。
(2)如果正在发生resize且当前bin正在被transfer,因为transfer本身并不粉碎旧的红黑树布局,只是在全部transfer完成后将头节点一次性替换为ForwardingNode,可以放心读取。
(3)如果其他线程在操纵红黑树,在当前线程遍历红黑树的恣意一个时间点,都大概有单个的其它线程发生add/replace/remove/红黑树的翻转等操纵,参考下面的红黑树的读写锁实现。
TreeBin中的读写锁实现
TreeNode<K,V> root;
volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; // values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
private final void lockRoot() { //如果一次性获取写锁失败,进入contendedLock循环体,循环获取写锁或者休眠等待
if (!U.compareAndSetInt(this, LOCKSTATE, 0, WRITER))
contendedLock(); // offload to separate method
} private final void unlockRoot() { lockState = 0;
} //对红黑树加互斥锁,也就是写锁
private final void contendedLock() { boolean waiting = false;
for (int s;
页:
[1]