JUC并发—8.并发安全集合二

金歌  论坛元老 | 2025-3-24 06:55:23 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1027|帖子 1027|积分 3081

大纲
1.JDK 1.7的HashMap的死循环与数据丢失
2.ConcurrentHashMap的并发安全
3.ConcurrentHashMap的设计介绍
4.ConcurrentHashMap的put操作流程
5.ConcurrentHashMap的Node数组初始化
6.ConcurrentHashMap对Hash冲突的处置惩罚
7.ConcurrentHashMap的并发扩容机制
8.ConcurrentHashMap的分段锁统计元素数据
9.ConcurrentHashMap的查询操作是否涉及锁
10.ConcurrentHashMap中红黑树的利用

7.ConcurrentHashMap的并发扩容机制
(1)ConcurrentHashMap扩容的前置操作
(2)ConcurrentHashMap并发扩容的机制
(3)ConcurrentHashMap并发扩容的流程

(1)ConcurrentHashMap扩容的前置操作
ConcurrentHashMap的tryPresize()方法用于处置惩罚数组扩容前的前置操作,该方法主要分为四部门。

第一部门:
首先通过tableSizeFor()方法盘算传入size的最小的2的幂次方。

第二部门:
然后判断Node数组是否已初始化,假如还没初始化则要先举行初始化。初始化时会盘算扩容阈值为数组大小的0.75倍 + 将扩容阈值赋值给sizeCtl。

第三部门:
假如Node数组已经初始化,则判断是否需要举行扩容。假如Node数组已经被其他线程完成扩容,则当前线程退出循环,无需扩容。假如Node数组已到达最大容量,则无法再举行扩容,也需退出循环。

第四部门:
调用transfer()方法开始执行扩容操作。假如sizeCtl < 0,说明此时已经有其他线程在执行扩容了。假如sizeCtl >= 0,说明此时没有其他线程举行扩容。当前线程都会先通过CAS成功设置sizeCtl后,再调用transfer()方法来扩容。
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.     ...
  3.     //Returns a power of two table size for the given desired capacity.
  4.     private static final int tableSizeFor(int c) {
  5.         int n = c - 1;
  6.         n |= n >>> 1;
  7.         n |= n >>> 2;
  8.         n |= n >>> 4;
  9.         n |= n >>> 8;
  10.         n |= n >>> 16;
  11.         return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
  12.     }
  13.     //Tries to presize table to accommodate the given number of elements.
  14.     private final void tryPresize(int size) {
  15.         //一.首先通过tableSizeFor()方法计算传入size的最小的2的幂次方
  16.         int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
  17.         int sc;
  18.         while ((sc = sizeCtl) >= 0) {
  19.             Node<K,V>[] tab = table; int n;
  20.             //二.判断Node数组是否已经初始化,如果还没初始化,需要先进行初始化
  21.             if (tab == null || (n = tab.length) == 0) {
  22.                 n = (sc > c) ? sc : c;
  23.                 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  24.                     try {
  25.                         if (table == tab) {
  26.                             @SuppressWarnings("unchecked")
  27.                             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  28.                             table = nt;
  29.                             sc = n - (n >>> 2);//扩容阈值为数组大小的0.75倍
  30.                         }
  31.                     } finally {
  32.                         sizeCtl = sc;//将扩容阈值赋值给sizeCtl
  33.                     }
  34.                 }
  35.             }
  36.             //三.如果Node数组已经初始化,则判断是否需要进行扩容
  37.             else if (c <= sc || n >= MAXIMUM_CAPACITY) {
  38.                 //c <= sc,说明Node数组已经被其他线程完成扩容了,不需要再进行扩容
  39.                 //n >= MAXIMUM_CAPACITY,说明Node数组已达到最大容量,无法再进行扩容
  40.                 break;
  41.             }
  42.             //四.调用transfer()方法开始执行扩容操作
  43.             else if (tab == table) {
  44.                 int rs = resizeStamp(n);
  45.                 //如果sc < 0,说明此时已经有其他线程在执行扩容了
  46.                 //于是当前线程可以先通过CAS成功设置sizeCtl的值后,再调用transfer()方法协助扩容
  47.                 if (sc < 0) {
  48.                     Node<K,V>[] nt;
  49.                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) {
  50.                         break;
  51.                     }
  52.                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
  53.                         transfer(tab, nt);
  54.                     }
  55.                 }
  56.                 //如果sc >= 0,说明此时没有其他线程进行扩容
  57.                 //于是当前线程也是先通过CAS成功设置sizeCtl的值后,再调用transfer()方法进行扩容
  58.                 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) {
  59.                     transfer(tab, null);
  60.                 }
  61.             }
  62.         }
  63.     }
  64.     ...
  65. }
复制代码
(2)ConcurrentHashMap并发扩容的机制
一.ConcurrentHashMap中的扩容设计
二.多线程并发扩容的原理

一.ConcurrentHashMap中的扩容设计
扩容就是创建一个2倍原大小的数组,然后把原数组的数据迁徙到新数组中。但多线程环境下的扩容,需要思量其他线程会同时往数组添加元素的情况。假如简单地对扩容过程增加一把同步锁,包管扩容过程不存在其他线程操作,那么就会对性能的消耗特殊大,特殊是数据量比力大时,壅闭的线程会许多。

首先利用CAS来实现盘算每个线程的迁徙区间。然后利用synchronized把锁粒度控制到每个数组元素上。假如数组有16个元素就有16把锁,假如数组有32个元素就有32把锁。接着假如线程A在举行数组扩容时,线程B要修改数组的某个元素f。那么就让修改元素的线程参加迁徙,从而实现多线程并发扩容来进步服从。等数组扩容完成后,线程B才继续去修改元素f。最后通过高低位迁徙逻辑盘算出高位链和低位链,大大减少了数据迁徙次数。

二.多线程并发扩容的原理
当存在多个线程并发扩容及数据迁徙时,默认会给每个线程分配一个区间。这个区间的默认长度是16,每个线程会负责自己区间内的数据迁徙工作。假如只有两个线程对长度为64的数组迁徙数据,则每个线程要做2次迁徙,迁徙过程会依靠transferIndex来更新每个线程的迁徙区间。

(3)ConcurrentHashMap并发扩容的流程
ConcurrentHashMap的transfer()方法用于处置惩罚数组扩容时的流程细节,该方法主要分为五部门:
第一部门:创建扩容后的数组
第二部门:盘算当前线程的数据迁徙区间
第三部门:更新扩容标志advance
第四部门:开始数据迁徙和扩容
第五部门:完成迁徙后的判断

第一部门:创建扩容后的数组
这部门代码主要做两件事情:
一.盘算每个线程处置惩罚的迁徙区间长度,默认是16。
二.初始化一个新的数组nt,赋值给方法入参nextTab和全局变量nextTable。该数组的长度是原数组的2倍,并且设置transferIndex的值为为原数组大小。
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.     //The next table to use; non-null only while resizing.
  3.     private transient volatile Node<K,V>[] nextTable;
  4.     ...
  5.    
  6.     //Moves and/or copies the nodes in each bin to new table.
  7.     //tab是原数组,nextTab是扩容后的数组
  8.     private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  9.         int n = tab.length, stride;
  10.         //计算每个线程处理的迁移区间长度,默认是16
  11.         if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {
  12.             stride = MIN_TRANSFER_STRIDE;//subdivide range
  13.         }
  14.         //初始化一个新的数组nt,赋值给方法入参nextTab和全局变量nextTable,该数组的长度是原数组的2倍
  15.         //并且设置transferIndex的值为为原数组大小
  16.         if (nextTab == null) {//initiating
  17.             try {
  18.                 @SuppressWarnings("unchecked")
  19.                 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩容为2n
  20.                 nextTab = nt;//将创建的扩容数组赋值给nextTab
  21.             } catch (Throwable ex) {//try to cope with OOME
  22.                 sizeCtl = Integer.MAX_VALUE;
  23.                 return;
  24.             }
  25.             nextTable = nextTab;//将创建的扩容数组赋值给nextTable
  26.             transferIndex = n;//设置transferIndex为原来的数组大小
  27.         }
  28.         ...
  29.     }
  30. }
复制代码
第二部门:盘算当前线程的数据迁徙区间
下面的while循环会盘算每个执行到此处的线程需要负责的数据迁徙区间。假设当前数组长度是32,需要扩容到64。那么此时transferIndex = 32,nextn = 64,n = 32。

当前线程第一次for循环:nextIndex被transferIndex赋值为32,之后CAS修改transferIndex。CAS修改成功后,nextBound = 32 - 16 = 16,transferIndex = 16。以是bound = 16,i = 31,当前线程负责的迁徙区间为[bound, i] = [16, 31]。

当前线程第二次for循环,大概有其他线程进来第一次for循环:由于此时transferIndex = 16,以是nextIndex会被transferIndex赋值为16。之后CAS修改transferIndex为0,修改成功后,nextBound = 16 - 16 = 0。以是bound = 0,i = 15,此时线程负责的迁徙区间为[bound, i] = [0, 15]。

需要留意的是:每次循环都会通过if (--i >= bound || finishing)判断区间是否已迁徙完成。假如已完成,则会继续进入while循环中的CAS,获取新的迁徙区间。

数组从高位往低位举行迁徙,好比第一次for循环,处置惩罚的区间是[16, 31]。那么就会从位置为31开始往前举行遍历,对每个数组元素举行数据迁徙。
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.     //The next table to use; non-null only while resizing.
  3.     private transient volatile Node<K,V>[] nextTable;
  4.     ...
  5.    
  6.     //Moves and/or copies the nodes in each bin to new table.
  7.     //tab是原数组,nextTab是扩容后的数组
  8.     private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  9.         ...
  10.         int nextn = nextTab.length;//扩容后的数组长度
  11.         //继承自Node的ForwardingNode表示一个正在被迁移的Node
  12.         //当原数组中某位置的数据完成迁移后,会对该位置设置一个ForwardingNode,表示该位置已经处理过了
  13.         ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  14.         //advance字段用来判断是否还有待处理的数据迁移工作,也就是扩容标记
  15.         boolean advance = true;
  16.         boolean finishing = false; // to ensure sweep before committing nextTab
  17.         //当前线程负责的迁移区间是[bound, i]
  18.         for (int i = 0, bound = 0;;) {
  19.             Node<K,V> f; int fh;
  20.             //while循环会计算每个执行到此处的线程需要负责的数据迁移区间
  21.             while (advance) {
  22.                 //假设当前数组长度是32,需要扩容到64;
  23.                 //那么此时transferIndex = 32,nextn = 64,n = 32;
  24.                 //刚开始循环时i = 0,nextIndex被transferIndex赋值为32
  25.                 int nextIndex, nextBound;
  26.                 if (--i >= bound || finishing) {
  27.                     //一开始i = bound = 0,所以不会进入这里,而是进入U.compareAndSwapInt()的条件中
  28.                     //但后来bound = 16, i = 31后,就会进入这里,退出循环
  29.                     //此后,每次--i,当i = bound = 16时,就又会进入U.compareAndSwapInt()的条件中,重新获取数据迁移区间
  30.                     advance = false;
  31.                 } else if ((nextIndex = transferIndex) <= 0) {
  32.                     //判断当前线程是否已经分配到了新的迁移区间
  33.                     i = -1;
  34.                     advance = false;
  35.                 } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
  36.                     //如果CAS设置transferIndex从nextIndex=32变为nextIndex-16=16成功后,
  37.                     //那么advance设置为false,退出while循环
  38.                     //当前线程再次进入while循环或者其他线程也进入while循环,就会从transferIndex = 16开始,计算要负责的迁移区间
  39.                     bound = nextBound;//第一次for循环nextBound = 16
  40.                     i = nextIndex - 1;//第一次for循环i = 31
  41.                     advance = false;
  42.                 }
  43.             }
  44.             ...
  45.         }
  46.         ...
  47.     }
  48.    
  49.     //A node inserted at head of bins during transfer operations.
  50.     static final class ForwardingNode<K,V> extends Node<K,V> {
  51.         final Node<K,V>[] nextTable;
  52.         ForwardingNode(Node<K,V>[] tab) {
  53.             super(MOVED, null, null, null);
  54.             this.nextTable = tab;
  55.         }
  56.         ...
  57.     }
  58. }
复制代码
第三部门:更新扩容标志advance
假如位置i的数组元素Node为空,说明该Node对象不需要迁徙。以是通过casTabAt()方法修改原数组在位置i的元素为fwd对象,如许其他线程在举行put()操作的时候就可以发现当前数组正在扩容。

假如位置i的数组元素Node的hash值为MOVED,那么说明该Node对象已经被迁徙了。以是设置扩容标志位advance为true,等下次for循环时进入while循环--i。
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.     //The next table to use; non-null only while resizing.
  3.     private transient volatile Node<K,V>[] nextTable;
  4.    
  5.     //The array of bins. Lazily initialized upon first insertion.
  6.     //Size is always a power of two. Accessed directly by iterators.
  7.     transient volatile Node<K,V>[] table;
  8.    
  9.     ...
  10.     //Moves and/or copies the nodes in each bin to new table.
  11.     //tab是原数组,nextTab是扩容后的数组
  12.     private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  13.         ...
  14.         ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  15.         ...
  16.         //当前线程负责的迁移区间是[bound, i]
  17.         for (int i = 0, bound = 0;;) {
  18.             ...
  19.             } else if ((f = tabAt(tab, i)) == null) {
  20.                 //第三部分开始:更新扩容标记advance,这样其他线程在put()的时候就可以发现当前数组正在扩容
  21.                 advance = casTabAt(tab, i, null, fwd);
  22.             } else if ((fh = f.hash) == MOVED) {
  23.                 //设置扩容标记位advance为true,等下次for循环时进入while循环--i
  24.                 advance = true; // already processed
  25.                 //第三部分结束
  26.             } else {
  27.             ...
  28.         }
  29.         ...
  30.     }
  31.    
  32.     //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的
  33.     //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性
  34.     static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
  35.         return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
  36.     }
  37.     static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
  38.         return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
  39.     }
  40. }
复制代码
第四部门:开始数据迁徙和扩容
首先对当前要迁徙的Node结点f添加同步锁synchronized,避免多线程竞争。

假如结点f的哈希值大于0,则表现Node结点f为链表或平凡结点,那么此时就需要按照链表或平凡结点的方式来举行数据迁徙。

假如结点f属于TreeBin类型,则表现结点f为红黑树,那么此时就要按红黑树的规则举行数据迁徙。

需要留意的是,数据迁徙之后可能会存在红黑树转化为链表的情况,当链表长度小于等于6时,红黑树就会转化为链表。

接着利用高位链和低位链的盘算方法构造高位链和低位链,遍历链表的每一个结点,盘算p.hash & n的值。假如值为0,表现需要迁徙,属于高位链;否则不需要迁徙,属于低位链。

好比在数组长度为16的一个链表中,hash值为:4, 20, 52, 68, 84, 100。经过hash & (n - 1)得到的下标位置都是4,接着数组长度需要扩容到32。于是经过hash & (n - 1)盘算,发现20, 52, 84对应的下标变成了20。这就意味着,这个链表中hash值为20, 52, 84的结点需要迁徙到位置20。

最后把低位链设置到扩容后的数组的位置i,把高位链设置到位置i + n。此时当前线程已处置惩罚完位置为i的数据迁徙,于是设置advance为true,让后续的for循环可以进入while循环来实现对i的递减继续迁徙数据。

第五部门:完成迁徙后的判断
假如数据迁徙完成了,则把扩容后的数组赋值给table。假如还没完成数据迁徙,则通过CAS修改并发扩容的线程数。
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.     //The next table to use; non-null only while resizing.
  3.     private transient volatile Node<K,V>[] nextTable;
  4.     ...
  5.    
  6.     //Moves and/or copies the nodes in each bin to new table.
  7.     private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  8.         //第一部分开始:创建扩容后的数组
  9.         int n = tab.length, stride;//n就是原数组大小
  10.         //计算每个线程处理的迁移区间长度,默认是16
  11.         if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {
  12.             stride = MIN_TRANSFER_STRIDE;//subdivide range
  13.         }
  14.         //初始化一个新的数组nt,赋值给方法入参nextTab和全局变量nextTable,该数组的长度是原数组的2倍
  15.         //并且设置transferIndex的值为为原数组大小
  16.         if (nextTab == null) {//initiating
  17.             try {
  18.                 @SuppressWarnings("unchecked")
  19.                 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩容为2n
  20.                 nextTab = nt;//将创建的扩容数组赋值给nextTab
  21.             } catch (Throwable ex) {//try to cope with OOME
  22.                 sizeCtl = Integer.MAX_VALUE;
  23.                 return;
  24.             }
  25.             nextTable = nextTab;//将创建的扩容数组赋值给nextTable
  26.             transferIndex = n;//设置transferIndex为原来的数组大小
  27.         }
  28.         //第一部分结束
  29.         //第二部分开始:计算当前线程的数据迁移区间
  30.         int nextn = nextTab.length;//扩容后的数组长度
  31.         //继承自Node的ForwardingNode表示一个正在被迁移的Node
  32.         //当原数组中某位置的数据完成迁移后,会对该位置设置一个ForwardingNode,表示该位置已经处理过了
  33.         ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  34.         //advance字段用来判断是否还有待处理的数据迁移工作
  35.         boolean advance = true;
  36.         boolean finishing = false; // to ensure sweep before committing nextTab
  37.       
  38.         //当前线程负责的迁移区间是[bound, i]
  39.         for (int i = 0, bound = 0;;) {
  40.             Node<K,V> f; int fh;
  41.             //while循环会计算每个执行到此处的线程需要负责的数据迁移区间
  42.             while (advance) {
  43.                 //假设当前数组长度是32,需要扩容到64;
  44.                 //那么此时transferIndex = 32,nextn = 64,n = 32;
  45.                 //刚开始循环时i = 0,nextIndex被transferIndex赋值为32
  46.                 int nextIndex, nextBound;
  47.                 if (--i >= bound || finishing) {
  48.                     //一开始i = bound = 0,所以不会进入这里,而是进入U.compareAndSwapInt()的条件中
  49.                     //但后来bound = 16, i = 31后,就会进入这里,退出循环
  50.                     //此后,每次--i,当i = bound = 16时,就又会进入U.compareAndSwapInt()的条件中,重新获取数据迁移区间
  51.                     advance = false;
  52.                 } else if ((nextIndex = transferIndex) <= 0) {
  53.                     //判断当前线程是否已经分配到了新的迁移区间
  54.                     i = -1;
  55.                     advance = false;
  56.                 } else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
  57.                     //如果CAS设置transferIndex从nextIndex=32变为nextIndex-16=16成功后,
  58.                     //那么advance设置为false,退出while循环
  59.                     //当前线程再次进入while循环或者其他线程也进入while循环,就会从transferIndex = 16开始,计算要负责的迁移区间
  60.                     bound = nextBound;//第一次for循环nextBound = 16
  61.                     i = nextIndex - 1;//第一次for循环i = 31
  62.                     advance = false;
  63.                 }
  64.             }
  65.             //第二部分结束
  66.             if (i < 0 || i >= n || i + n >= nextn) {
  67.                 //第五部分开始:完成迁移后的判断
  68.                 int sc;
  69.                 //如果数据迁移完成了,则把扩容后的数组赋值给table
  70.                 if (finishing) {
  71.                     nextTable = null;
  72.                     table = nextTab;
  73.                     sizeCtl = (n << 1) - (n >>> 1);
  74.                     return;
  75.                 }
  76.                 //如果还没完成数据迁移,则通过CAS修改并发扩容的线程数
  77.                 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
  78.                     if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
  79.                         return;
  80.                     }
  81.                     finishing = advance = true;
  82.                     i = n; // recheck before commit
  83.                 }
  84.                 //第五部分结束
  85.             } else if ((f = tabAt(tab, i)) == null) {
  86.                 //第三部分开始:更新扩容标记advance,这样其他线程在put()的时候就可以发现当前数组正在扩容
  87.                 advance = casTabAt(tab, i, null, fwd);
  88.             } else if ((fh = f.hash) == MOVED) {
  89.                 //设置扩容标记位advance为true,等下次for循环时进入while循环--i
  90.                 advance = true; // already processed
  91.                 //第三部分结束
  92.             } else {
  93.                 //第四部分开始:开始数据迁移和扩容
  94.                 synchronized (f) {//首先对当前要迁移的Node结点f添加同步锁synchronized,避免多线程竞争
  95.                     if (tabAt(tab, i) == f) {
  96.                         Node<K,V> ln, hn;
  97.                         //如果fh >= 0,则表示Node结点f为链表或普通结点,此时需要按照链表或普通结点的方式来进行数据迁移
  98.                         if (fh >= 0) {
  99.                             int runBit = fh & n;
  100.                             Node<K,V> lastRun = f;
  101.                             //for循环遍历链表,计算出当前链表最后一个需要迁移或者不需要迁移的结点位置
  102.                             //遍历链表的每一个结点,计算p.hash & n,如果值为0,表示需要迁移,否则不需要迁移
  103.                             for (Node<K,V> p = f.next; p != null; p = p.next) {
  104.                                 int b = p.hash & n;
  105.                                 if (b != runBit) {
  106.                                     runBit = b;
  107.                                     lastRun = p;
  108.                                 }
  109.                             }
  110.                             if (runBit == 0) {
  111.                                 ln = lastRun;
  112.                                 hn = null;
  113.                             } else {
  114.                                 hn = lastRun;
  115.                                 ln = null;
  116.                             }
  117.                             for (Node<K,V> p = f; p != lastRun; p = p.next) {
  118.                                 int ph = p.hash; K pk = p.key; V pv = p.val;
  119.                                 if ((ph & n) == 0) {
  120.                                     //将ln作为参数,以ln为基础构造低位链,不需要迁移
  121.                                     ln = new Node<K,V>(ph, pk, pv, ln);
  122.                                 } else {
  123.                                     //将hn作为参数,以hn为基础构造高位链,需要迁移
  124.                                     hn = new Node<K,V>(ph, pk, pv, hn);
  125.                                 }
  126.                             }
  127.                             //把低位链设置到扩容后的数组的位置i
  128.                             setTabAt(nextTab, i, ln);
  129.                             //把高位链设置到扩容后的数组的位置i + n
  130.                             setTabAt(nextTab, i + n, hn);
  131.                             //当原数组中某位置的数据完成迁移后,会对该位置设置一个ForwardingNode,表示该位置已经处理过了
  132.                             setTabAt(tab, i, fwd);
  133.                             //当前线程已处理完位置为i的数据的迁移,于是设置advance为true,让后续的for循环继续进入while循环来实现对i的递减
  134.                             advance = true;
  135.                         } else if (f instanceof TreeBin) {
  136.                             //如果f instanceof TreeBin,则表示结点f为红黑树,需要按照红黑树的规则进行数据迁移
  137.                             //需要注意的是,数据迁移之后可能会存在红黑树转化为链表的情况,当链表长度小于等于6时,红黑树就会转化为链表
  138.                             TreeBin<K,V> t = (TreeBin<K,V>)f;
  139.                             TreeNode<K,V> lo = null, loTail = null;
  140.                             TreeNode<K,V> hi = null, hiTail = null;
  141.                             int lc = 0, hc = 0;
  142.                             for (Node<K,V> e = t.first; e != null; e = e.next) {
  143.                                 int h = e.hash;
  144.                                 TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);
  145.                                 if ((h & n) == 0) {
  146.                                     if ((p.prev = loTail) == null) {
  147.                                         lo = p;
  148.                                     } else {
  149.                                         loTail.next = p;
  150.                                     }
  151.                                     loTail = p;
  152.                                     ++lc;
  153.                                 } else {
  154.                                     if ((p.prev = hiTail) == null) {
  155.                                         hi = p;
  156.                                     } else {
  157.                                         hiTail.next = p;
  158.                                     }
  159.                                     hiTail = p;
  160.                                     ++hc;
  161.                                 }
  162.                             }
  163.                             ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
  164.                             hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
  165.                             setTabAt(nextTab, i, ln);
  166.                             setTabAt(nextTab, i + n, hn);
  167.                             setTabAt(tab, i, fwd);
  168.                             advance = true;
  169.                         }
  170.                     }
  171.                 }
  172.                 //第四部分结束
  173.             }
  174.         }
  175.     }
  176.     ...
  177. }
复制代码

8.ConcurrentHashMap的分段锁统计元素数据
(1)ConcurrentHashMap维护数组元素个数思绪
(2)ConcurrentHashMap维护数组元素个数流程
(3)维护数组元素个数的addCount()方法
(4)维护数组元素个数的fullAddCount()方法
(5)获取数组元素个数的size()方法

(1)ConcurrentHashMap维护数组元素个数思绪
当调用完put()方法后,ConcurrentHashMap必须增加当前元素的个数,以方便在size()方法中获得存储的元素个数。

在常规的集合中,只需要一个全局int类型的字段保存元素个数即可。每次添加一个元素,就对这个size变量 + 1。

在ConcurrentHashMap中,需要包管对该变量修改的并发安全。假如利用同步锁synchronized,那么性能开销比力大,不符合。以是ConcurrentHashMap利用了自旋 + 分段锁来维护元素个数。

(2)ConcurrentHashMap维护数组元素个数流程
ConcurrentHashMap采取了两种方式来保存元素的个数。当线程竞争不激烈时,直接利用baseCount + 1来增加元素个数。当线程竞争比力激烈时,构建一个CounterCell数组,默认长度为2。然后随机选择一个CounterCell,针对该CounterCell中的value举行保存。

增加元素个数的流程如下:


(3)维护数组元素个数的addCount()方法
addCount()方法的作用主要包括两部门:
一.累加ConcurrentHashMap中的元素个数
二.通过check >= 0判断是否需要举行数组扩容

此中增加数组元素个数的核心逻辑是:
首先通过CAS修改全局成员变量baseCount来举行累加。留意会先判断(as = counterCells) != null,再尝试对baseCount举行累加。这是因为假如一个集合发生过并发,那么后续发生并发的可能性会更大。假如CAS累加baseCount失败,则尝试利用CounterCell来举行累加。
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.     ...
  3.     //Base counter value, used mainly when there is no contention,
  4.     but also as a fallback during table initialization races. Updated via CAS.
  5.     private transient volatile long baseCount;
  6.    
  7.     //Table of counter cells. When non-null, size is a power of 2.
  8.     private transient volatile CounterCell[] counterCells;
  9.     private static final long BASECOUNT;
  10.     static {
  11.         try {
  12.             U = sun.misc.Unsafe.getUnsafe();
  13.             ...
  14.             BASECOUNT = U.objectFieldOffset(k.getDeclaredField("baseCount"));
  15.             ...
  16.         } catch (Exception e) {
  17.             throw new Error(e);
  18.         }
  19.     }
  20.    
  21.     //Maps the specified key to the specified value in this table.
  22.     //Neither the key nor the value can be null.
  23.     public V put(K key, V value) {
  24.         return putVal(key, value, false);
  25.     }
  26.    
  27.     final V putVal(K key, V value, boolean onlyIfAbsent) {
  28.         ...
  29.         //调用addCount()方法统计Node数组元素的个数
  30.         addCount(1L, binCount);
  31.         return null;
  32.     }
  33.    
  34.     //Adds to count, and if table is too small and not already resizing, initiates transfer.
  35.     //If already resizing, helps perform transfer if work is available.  
  36.     //Rechecks occupancy after a transfer to see if another resize is already needed because resizings are lagging additions.
  37.     //x是要增加的数组元素个数
  38.     private final void addCount(long x, int check) {
  39.         CounterCell[] as; long b, s;
  40.         //首先通过CAS修改全局成员变量baseCount来进行累加
  41.         //注意:这里先判断(as = counterCells) != null,再尝试对baseCount进行CAS累加
  42.         //这是因为如果一个集合发生过并发,那么后续发生并发的可能性会更大,这种思想在并发编程中很常见
  43.         if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
  44.             //增加数组元素个数
  45.             CounterCell a; long v; int m;
  46.             boolean uncontended = true;
  47.             //如果CAS修改baseCount失败,则尝试使用CounterCell来进行累加
  48.             //1.as == null,说明CounterCell数组还没初始化
  49.             //2.(m = as.length - 1) < 0,说明CounterCell数组还没初始化
  50.             //3.(a = as[ThreadLocalRandom.getProbe() & m]) == null,说明CounterCell数组已经创建了,
  51.             //但是Hash定位到的数组位置没有对象实例,说明这个数字还存在没有CounterCell实例对象的情况
  52.             //4.如果U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)返回false,说明存在多线程竞争
  53.             if (as == null || (m = as.length - 1) < 0
  54.                 || (a = as[ThreadLocalRandom.getProbe() & m]) == null
  55.                 || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
  56.                 //调用fullAddCount()方法实现数组元素个数的累加
  57.                 fullAddCount(x, uncontended);
  58.                 return;
  59.             }
  60.             if (check <= 1) {
  61.                 return;
  62.             }
  63.             //sumCount()方法返回总的元素个数,也就是CounterCell数组的元素个数和baseCount两者的和
  64.             s = sumCount();
  65.         }
  66.         if (check >= 0) {
  67.             //处理数组扩容
  68.             Node<K,V>[] tab, nt; int n, sc;
  69.             while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
  70.                 int rs = resizeStamp(n);
  71.                 if (sc < 0) {
  72.                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  73.                         sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
  74.                         transferIndex <= 0) {
  75.                         break;
  76.                     }
  77.                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
  78.                         transfer(tab, nt);
  79.                     }
  80.                 } else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) {
  81.                     transfer(tab, null);
  82.                 }
  83.                 s = sumCount();
  84.             }
  85.         }
  86.     }
  87.    
  88.     final long sumCount() {
  89.         CounterCell[] as = counterCells; CounterCell a;
  90.         long sum = baseCount;
  91.         if (as != null) {
  92.             for (int i = 0; i < as.length; ++i) {
  93.                 if ((a = as[i]) != null) {
  94.                     sum += a.value;
  95.                 }
  96.             }
  97.         }
  98.         return sum;
  99.     }
  100.     ...
  101. }
复制代码
(4)维护数组元素个数的fullAddCount()方法
fullAddCount()方法的作用主要包括三部门:初始化CounterCell数组、增加数组元素个数、对CounterCell数组扩容。

留意:为了定位当前线程添加的数组元素个数应落到CounterCell数组哪个元素,会利用ThreadLocalRandom确定当前线程对应的hash值,由该hash值和CounterCell数组大小举行类似于取模的位与运算来决定。
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.     ...
  3.     //Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
  4.     private transient volatile int cellsBusy;
  5.     //Table of counter cells. When non-null, size is a power of 2.
  6.     private transient volatile CounterCell[] counterCells;
  7.    
  8.     //x是要增加的数组元素个数
  9.     private final void fullAddCount(long x, boolean wasUncontended) {
  10.         //通过ThreadLocalRandom来确定当前线程对应的hash值
  11.         int h;
  12.         if ((h = ThreadLocalRandom.getProbe()) == 0) {
  13.             ThreadLocalRandom.localInit();//force initialization
  14.             h = ThreadLocalRandom.getProbe();
  15.             wasUncontended = true;
  16.         }
  17.         boolean collide = false;// True if last slot nonempty
  18.         for (;;) {
  19.             CounterCell[] as; CounterCell a; int n; long v;
  20.             //(as = counterCells) != null && (n = as.length) > 0,表示counterCells数组已经完成初始化
  21.             if ((as = counterCells) != null && (n = as.length) > 0) {
  22.                 //第二部分:增加数组元素个数,分两种情况
  23.                 if ((a = as[(n - 1) & h]) == null) {
  24.                     //情况一:(a = as[(n - 1) & h]) == null,表示将当前线程定位到counterCells数组的某位置的元素为null
  25.                     //此时直接把当前要增加的元素个数x保存到新创建的CounterCell对象,然后将该对象赋值到CounterCell数组的该位置即可
  26.                     if (cellsBusy == 0) {//Try to attach new Cell
  27.                         //先创建一个CounterCell对象,并把x保存进去
  28.                         CounterCell r = new CounterCell(x);//Optimistic create
  29.                         //U.compareAndSwapInt(this, CELLSBUSY, 0, 1)返回true,表示当前线程占有了锁
  30.                         if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
  31.                             boolean created = false;
  32.                             try {//Recheck under lock
  33.                                 CounterCell[] rs; int m, j;
  34.                                 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
  35.                                     //把新构建的保存了元素个数x的CounterCell对象保存到rs[j]的位置
  36.                                     rs[j] = r;
  37.                                     created = true;
  38.                                 }
  39.                             } finally {
  40.                                 cellsBusy = 0;
  41.                             }
  42.                             if (created) {
  43.                                 break;
  44.                             }
  45.                             continue;//Slot is now non-empty
  46.                         }
  47.                     }
  48.                     collide = false;
  49.                 } else if (!wasUncontended) {//CAS already known to fail
  50.                     wasUncontended = true;//Continue after rehash
  51.                 } else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) {
  52.                     //情况二:如果将当前线程定位到counterCells数组的某位置的元素不为null,
  53.                     //那么直接通过U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)操作,对counterCells数组的指定位置进行累加
  54.                     break;
  55.                 } else if (counterCells != as || n >= NCPU) {
  56.                     collide = false;//At max size or stale
  57.                 } else if (!collide) {
  58.                     collide = true;
  59.                 } else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
  60.                     //第三部分:counterCells数组扩容
  61.                     //需要先通过cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1),抢占锁
  62.                     try {
  63.                         if (counterCells == as) {// Expand table unless stale
  64.                             //在原有的基础上扩容一倍
  65.                             CounterCell[] rs = new CounterCell[n << 1];
  66.                             //通过for循环进行数据迁移
  67.                             for (int i = 0; i < n; ++i) {
  68.                                 rs[i] = as[i];
  69.                             }
  70.                             //把扩容后的对象赋值给counterCells
  71.                             counterCells = rs;
  72.                         }
  73.                     } finally {
  74.                         //恢复标识
  75.                         cellsBusy = 0;
  76.                     }
  77.                     collide = false;
  78.                     continue;//继续下一次自旋
  79.                 }
  80.                 h = ThreadLocalRandom.advanceProbe(h);
  81.             } else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
  82.                 //第一部分:初始化CounterCell数组
  83.                 //cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1),通过cellsBusy字段来抢占锁,通过CAS修改该字段值为1表示抢到锁
  84.                 boolean init = false;
  85.                 try {//Initialize table
  86.                     if (counterCells == as) {
  87.                         //构造一个元素个数为2的CounterCell数组
  88.                         CounterCell[] rs = new CounterCell[2];
  89.                         //把要增加的数组元素个数x,保存到CounterCell数组的某个元素中
  90.                         rs[h & 1] = new CounterCell(x);
  91.                         //把初始化的CounterCell数组赋值给全局对象counterCells
  92.                         counterCells = rs;
  93.                         init = true;
  94.                     }
  95.                 } finally {
  96.                     //恢复标识
  97.                     cellsBusy = 0;
  98.                 }
  99.                 if (init) {
  100.                     break;
  101.                 }
  102.             } else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) {
  103.                 break;//Fall back on using base
  104.            }
  105.         }
  106.     }
  107.     ...
  108. }
复制代码
(5)获取数组元素个数的size()方法
sumCount()方法会先得到baseCount的值,保存到sum字段中。然后遍历CounterCell数组,把每个value举行累加。

留意:size()方法在盘算总的元素个数时并没有加锁,以是size()方法返回的元素个数不肯定代表此时此刻总数量。
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.     ...
  3.     //Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
  4.     private transient volatile int cellsBusy;
  5.     //Table of counter cells. When non-null, size is a power of 2.
  6.     private transient volatile CounterCell[] counterCells;
  7.    
  8.     public int size() {
  9.         long n = sumCount();
  10.         return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
  11.     }
  12.    
  13.     final long sumCount() {
  14.         CounterCell[] as = counterCells; CounterCell a;
  15.         long sum = baseCount;
  16.         if (as != null) {
  17.             for (int i = 0; i < as.length; ++i) {
  18.                 if ((a = as[i]) != null) {
  19.                     sum += a.value;
  20.                 }
  21.             }
  22.         }
  23.         return sum;
  24.     }
  25.     ...
  26. }
复制代码

9.ConcurrentHashMap的查询操作是否涉及锁
(1)put操作会加锁
(2)size操作不会加锁
(3)get操作也不会加锁

(1)put操作会加锁
首先尝试通过CAS设置Node数组对应位置的Node元素。假如该位置的Node元素非空,大概CAS设置失败,则说明发生了哈希冲突。此时就会利用synchronized关键字对该数组元素加锁来处置惩罚链表大概红黑树。

实在JUC还可以继续优化,先用CAS尝试修改哈希冲突下的链表或红黑树。假如CAS修改失败,再通过利用synchronized对该数组元素加锁来处置惩罚。

(2)size操作不会加锁
size()方法在盘算总的元素个数时并没有加锁,以是size()方法返回的元素个数不肯定代表此时此刻数组元素的总数量。

(3)get操作也不会加锁
get()方法也利用了CAS操作,通过Unsafe类让数组中的元素具有可见性。包管线程根据tabAt()方法获取数组的某个位置的元素时,能获取最新的值。以是get不加锁,但通过volatile读数组,可以包管读到数组元素的最新值。

虽然table变量利用了volatile,但这只包管了table引用对所有线程的可见性,还不能包管table数组中的元素的修改对于所有线程是可见的。因此才通过Unsafe类的getObjectVolatile()来包管table数组中元素的可见性。
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.     ...
  3.     //The array of bins. Lazily initialized upon first insertion.
  4.     //Size is always a power of two. Accessed directly by iterators.
  5.     transient volatile Node<K,V>[] table;
  6.    
  7.     public V get(Object key) {
  8.         Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  9.         int h = spread(key.hashCode());
  10.         if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
  11.             if ((eh = e.hash) == h) {
  12.                 if ((ek = e.key) == key || (ek != null && key.equals(ek))) {
  13.                     return e.val;
  14.                 }
  15.             } else if (eh < 0) {
  16.                 return (p = e.find(h, key)) != null ? p.val : null;
  17.             }
  18.             while ((e = e.next) != null) {
  19.                 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
  20.                     return e.val;
  21.                 }
  22.             }
  23.         }
  24.         return null;
  25.     }
  26.    
  27.     //获取Node数组在位置i的元素,通过Unsafe类让数组中的元素具有可见性
  28.     //虽然table变量使用了volatile修饰,但这只保证了table引用对于所有线程的可见性,还不能保证table数组中的元素的修改对于所有线程是可见的
  29.     //因此需要通过Unsafe类的getObjectVolatile()来保证table数组中的元素的可见性
  30.     static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
  31.         return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
  32.     }
  33.     ...
  34. }
复制代码

10.ConcurrentHashMap中红黑树的利用
(1)treeifyBin()方法的逻辑
(2)TreeBin的成员变量和方法
(3)TreeBin在构造方法中将链表转为红黑树
(4)TreeBin在插入元素时实现红黑树的自均衡

(1)treeifyBin()方法的逻辑
put操作中当发现链表元素>=8时会调用treeifyBin()方法将链表转为红黑树。首先通过tabAt()方法从Node数组中获取位置为index的元素并赋值给变量b,然后利用synchronized对Node数组中位置为index的元素b举行加锁,接着通过for循环遍历Node数组中位置为index的元素b这个链表,并且根据链表中每个结点的数据封装成一个TreeNode对象来组成新链表,最后把新链表的头结点作为参数传给TreeBin构造方法来完成红黑树的构建。
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.     ...
  3.     //Replaces all linked nodes in bin at given index unless table is too small, in which case resizes instead.
  4.     //将Node数组tab中位置为index的元素,从链表转化为红黑树
  5.     private final void treeifyBin(Node<K,V>[] tab, int index) {
  6.         Node<K,V> b; int n, sc;
  7.         if (tab != null) {
  8.             if ((n = tab.length) < MIN_TREEIFY_CAPACITY) {
  9.                 tryPresize(n << 1);//数组扩容
  10.             } else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
  11.                 synchronized (b) {//b就是链表,先用synchronized对b加锁,保证并发安全
  12.                     if (tabAt(tab, index) == b) {
  13.                         TreeNode<K,V> hd = null, tl = null;//hd是新链表的头结点,tl是新链表的尾结点
  14.                         //将链表b赋值给e,然后遍历通过e.next赋值回给e来遍历链表
  15.                         for (Node<K,V> e = b; e != null; e = e.next) {
  16.                             //根据Node结点e来封装一个TreeNode对象
  17.                             TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);
  18.                             if ((p.prev = tl) == null) {
  19.                                 hd = p;
  20.                             } else {
  21.                                 //尾插法构造新链表
  22.                                 tl.next = p;
  23.                             }
  24.                             tl = p;
  25.                         }
  26.                         //将构造好的新链表的头结点hd作为参数,创建一个TreeBin对象
  27.                         setTabAt(tab, index, new TreeBin<K,V>(hd));
  28.                     }
  29.                 }
  30.             }
  31.         }
  32.     }
  33.     ...
  34. }
复制代码
(2)TreeBin的成员变量和方法
ConcurrentHashMap中红黑树用继承自Node的TreeNode来表现。TreeBin则主要提供了红黑树的一系列功能的实现,并且实现了读写锁。
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.     ...
  3.     //Nodes for use in TreeBins
  4.     static final class TreeNode<K,V> extends Node<K,V> {
  5.         TreeNode<K,V> parent;//red-black tree links
  6.         TreeNode<K,V> left;
  7.         TreeNode<K,V> right;
  8.         TreeNode<K,V> prev;//needed to unlink next upon deletion
  9.         boolean red;
  10.         TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) {
  11.             super(hash, key, val, next);
  12.             this.parent = parent;
  13.         }
  14.         ...
  15.     }
  16.    
  17.     //TreeNodes used at the heads of bins.
  18.     //TreeBins do not hold user keys or values, but instead point to list of TreeNodes and their root.
  19.     //They also maintain a parasitic read-write lock forcing writers (who hold bin lock)
  20.     //to wait for readers (who do not) to complete before tree restructuring operations.
  21.     static final class TreeBin<K,V> extends Node<K,V> {
  22.         TreeNode<K,V> root;//红黑树根结点
  23.         volatile TreeNode<K,V> first;//链表头结点,由构造函数传入
  24.         volatile Thread waiter;//保存最近一个抢占写锁的线程(如果有值,则说明lockState是读锁状态)
  25.         volatile int lockState;//表示锁的状态
  26.         // values for lockState
  27.         static final int WRITER = 1;//写锁状态
  28.         static final int WAITER = 2;//等待获取写锁状态
  29.         static final int READER = 4;//读锁状态
  30.         ...
  31.         //构造函数,将以b为头结点的链表转换为红黑树
  32.         //Creates bin with initial set of nodes headed by b.
  33.         TreeBin(TreeNode<K,V> b) {
  34.             ...
  35.         }
  36.    
  37.         //对红黑树的根结点加写锁
  38.         //Acquires write lock for tree restructuring.
  39.         private final void lockRoot() {
  40.             if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER)) {
  41.                 contendedLock(); // offload to separate method
  42.             }
  43.         }
  44.       
  45.         //释放写锁
  46.         //Releases write lock for tree restructuring.
  47.         private final void unlockRoot() {
  48.             lockState = 0;
  49.         }
  50.       
  51.         //根据key获取指定的结点
  52.         //Returns matching node or null if none.
  53.         //Tries to search using tree comparisons from root, but continues linear search when lock not available.
  54.         final Node<K,V> find(int h, Object k) {
  55.             ...
  56.         }
  57.         ...
  58.     }
  59.     ...
  60. }
复制代码
(3)TreeBin在构造方法中将链表转为红黑树
treeifyBin()方法在对链表举行转化时,会先构建一个双向链表,然后将该双向链表传入TreeBin的构造方法。

TreeBin的构造方法会通过如下处置惩罚将该双向链表转化为红黑树:
一.假如红黑树为空,则初始化红黑树的根结点
二.假如红黑树不为空,则按均衡二叉树逻辑插入
三.通过balanceInsertion()方法举行自均衡

TreeBin的构造方法可以分为三部门:

第一部门:初始化红黑树
遍历链表b,将链表b的头结点设置为红黑树的根结点,接着设置红黑树根结点的左右子结点为null,以及设置红黑树根结点为黑色。

第二部门:将链表中的结点添加到初始化好的红黑树
首先盘算dir的值。假如dir = -1,表现红黑树中被插入结点的hash值大于新添加结点x的hash值。假如dir = 1,表现红黑树中被插入结点的hash值小于新添加结点x的hash值。然后根据dir的值来决定新添加结点x是被插入结点的左结点还是右结点,最后调用TreeBin的balanceInsertion()方法对红黑树举行自均衡处置惩罚。

第三部门:对红黑树举行自均衡
调用TreeBin的balanceInsertion()方法对红黑树举行自均衡处置惩罚。
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.     ...
  3.     static final class TreeBin<K,V> extends Node<K,V> {
  4.         TreeNode<K,V> root;//红黑树根结点
  5.         volatile TreeNode<K,V> first;//链表头结点,由构造函数传入
  6.         volatile Thread waiter;//保存最近一个抢占写锁的线程(如果有值,则说明lockState是读锁状态)
  7.         volatile int lockState;//表示锁的状态
  8.         //values for lockState
  9.         static final int WRITER = 1;//写锁状态
  10.         static final int WAITER = 2;//等待获取写锁状态
  11.         static final int READER = 4;//读锁状态
  12.         ...
  13.         //构造函数,将以b为头结点的链表转换为红黑树
  14.         TreeBin(TreeNode<K,V> b) {
  15.             //第一部分开始:初始化红黑树
  16.             super(TREEBIN, null, null, null);
  17.             this.first = b;
  18.             //r表示红黑树的根结点
  19.             TreeNode<K,V> r = null;
  20.             //遍历链表b,x将作为新添加的红黑树结点
  21.             for (TreeNode<K,V> x = b, next; x != null; x = next) {
  22.                 next = (TreeNode<K,V>)x.next;
  23.                 //把新添加的红黑树结点x的左右子结点设置为null
  24.                 x.left = x.right = null;
  25.                 //r表示红黑树的根结点,r == null表示红黑树为空,将x结点设置为红黑树的根结点
  26.                 if (r == null) {
  27.                     x.parent = null;
  28.                     //把红黑树的根结点设置为黑色
  29.                     x.red = false;
  30.                     r = x;
  31.                     //第一部分结束
  32.                 } else {
  33.                     //第二部分开始:将链表中的结点添加到初始化好的红黑树中
  34.                     //x是新添加的红黑树结点
  35.                     K k = x.key;
  36.                     int h = x.hash;
  37.                     Class<?> kc = null;
  38.                     //p是红黑树中被插入的结点
  39.                     for (TreeNode<K,V> p = r;;) {
  40.                         int dir, ph;
  41.                         K pk = p.key;
  42.                         //首先计算dir的值
  43.                         //dir = -1,表示红黑树中被插入结点的hash值大于新添加结点x的hash值
  44.                         //dir = 1,表示红黑树中被插入结点的hash值小于新添加结点x的hash值
  45.                         if ((ph = p.hash) > h) {
  46.                             dir = -1;
  47.                         } else if (ph < h) {
  48.                             dir = 1;
  49.                         } else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) {
  50.                             dir = tieBreakOrder(k, pk);
  51.                             TreeNode<K,V> xp = p;
  52.                         }
  53.                         //然后根据dir的值来决定新添加的结点x是左结点还是右结点
  54.                         if ((p = (dir <= 0) ? p.left : p.right) == null) {
  55.                             x.parent = xp;
  56.                             if (dir <= 0) {
  57.                                 xp.left = x;
  58.                             } else {
  59.                                 xp.right = x;
  60.                             }
  61.                             //第二部分结束
  62.                             //第三部分开始:红黑树进行自平衡
  63.                             //r代表一棵红黑树,x代表往红黑树r添加的结点
  64.                             r = balanceInsertion(r, x);
  65.                             //第三部分结束
  66.                             break;
  67.                         }
  68.                     }
  69.                 }
  70.             }
  71.             this.root = r;
  72.             assert checkInvariants(root);
  73.         }
  74.         ...
  75.     }
  76.     ...
  77. }
复制代码
(4)TreeBin在插入元素时实现红黑树的自均衡
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.     ...
  3.     static final class TreeBin<K,V> extends Node<K,V> {
  4.         TreeNode<K,V> root;//红黑树根结点
  5.         volatile TreeNode<K,V> first;//链表头结点,由构造函数传入
  6.         volatile Thread waiter;//保存最近一个抢占写锁的线程(如果有值,则说明lockState是读锁状态)
  7.         volatile int lockState;//表示锁的状态
  8.         // values for lockState
  9.         static final int WRITER = 1;//写锁状态
  10.         static final int WAITER = 2;//等待获取写锁状态
  11.         static final int READER = 4;//读锁状态
  12.         ...
  13.         //root代表一棵红黑树,x代表往红黑树r添加的结点
  14.         static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root, TreeNode<K,V> x) {
  15.             //所有往红黑树添加的结点默认为红色
  16.             x.red = true;
  17.             //自旋,xp表示添加结点x的父结点,xpp表示添加结点x的爷结点,xppl表示爷结点的左结点,xppr表示爷结点的右结点
  18.             for (TreeNode<K,V> xp, xpp, xppl, xppr;;) {
  19.                 if ((xp = x.parent) == null) {//此处判断条件表示:x结点的父结点为空
  20.                     //由于只有根结点的父结点才会为空,所以此时x结点为根结点,于是设置根结点x为黑色
  21.                     x.red = false;
  22.                     return x;
  23.                 } else if (!xp.red || (xpp = xp.parent) == null) {//此处判断条件表示:表示x结点的父结点为黑色,或者x结点的爷结点为空
  24.                     //那么直接返回红黑树root,不需要处理
  25.                     return root;
  26.                 }
  27.                 //代码执行到这里,说明x结点的父结点为红色
  28.                 if (xp == (xppl = xpp.left)) {//此处判断条件表示:表示x结点的父结点xp是其爷结点xpp的左子结点xppl
  29.                     if ((xppr = xpp.right) != null && xppr.red) {//此处判断条件表示:x结点的叔结点存在且为红色
  30.                         //那么直接修改父结点和叔结点的颜色为黑色
  31.                         xppr.red = false;
  32.                         xp.red = false;
  33.                         xpp.red = true;
  34.                         x = xpp;
  35.                     } else {//此处判断条件表示:如果x结点的叔结点不存在,或者叔结点存在且为黑色
  36.                         if (x == xp.right) {//如果x结点是父结点的右子结点,则按x结点的父结点进行左旋
  37.                             root = rotateLeft(root, x = xp);//将x结点的父结点赋值给x结点
  38.                             xpp = (xp = x.parent) == null ? null : xp.parent;
  39.                         }
  40.                         if (xp != null) {
  41.                             xp.red = false;
  42.                             if (xpp != null) {
  43.                                 xpp.red = true;
  44.                                 root = rotateRight(root, xpp);
  45.                             }
  46.                         }
  47.                     }
  48.                 } else {//表示x结点的父结点是其爷结点的右子结点
  49.                     if (xppl != null && xppl.red) {
  50.                         xppl.red = false;
  51.                         xp.red = false;
  52.                         xpp.red = true;
  53.                         x = xpp;
  54.                     } else {
  55.                         if (x == xp.left) {
  56.                             root = rotateRight(root, x = xp);
  57.                             xpp = (xp = x.parent) == null ? null : xp.parent;
  58.                         }
  59.                         if (xp != null) {
  60.                             xp.red = false;
  61.                             if (xpp != null) {
  62.                                 xpp.red = true;
  63.                                 root = rotateLeft(root, xpp);
  64.                             }
  65.                         }
  66.                     }
  67.                 }
  68.             }
  69.         }
  70.       
  71.         static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root, TreeNode<K,V> p) {
  72.             TreeNode<K,V> r, pp, rl;
  73.             if (p != null && (r = p.right) != null) {
  74.                 if ((rl = p.right = r.left) != null) {
  75.                     rl.parent = p;
  76.                 }
  77.                 if ((pp = r.parent = p.parent) == null) {
  78.                     (root = r).red = false;
  79.                 } else if (pp.left == p) {
  80.                     pp.left = r;
  81.                 } else {
  82.                     pp.right = r;
  83.                 }
  84.                 r.left = p;
  85.                 p.parent = r;
  86.             }
  87.             return root;
  88.         }
  89.         static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root, TreeNode<K,V> p) {
  90.             TreeNode<K,V> l, pp, lr;
  91.             if (p != null && (l = p.left) != null) {
  92.                 if ((lr = p.left = l.right) != null) {
  93.                     lr.parent = p;
  94.                 }
  95.                 if ((pp = l.parent = p.parent) == null) {
  96.                     (root = l).red = false;
  97.                 } else if (pp.right == p) {
  98.                     pp.right = l;
  99.                 } else {
  100.                     pp.left = l;
  101.                 }
  102.                 l.right = p;
  103.                 p.parent = l;
  104.             }
  105.             return root;
  106.         }
  107.         ...
  108.     }
  109. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

金歌

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表