9张图深入剖析ConcurrentHashMap

打印 上一主题 下一主题

主题 846|帖子 846|积分 2540

前言

在日常的开发中,我们经常使用key-value键值对的HashMap,其使用哈希表实现,用空间换取时间,提升查询性能
但在多线程的并发场景中,HashMap并不是线程安全的
如果想使用线程安全的,可以使用ConcurrentHashMap、HashTable、Collections.synchronizedMap等
但由于后面二者使用synchronized的粒度太大,因此一般不使用,而使用并发包中的ConcurrentHashMap
在ConcurrentHashMap中,使用volatile保证内存可见性,使得读场景下不需要“加锁”保证原子性
在写场景下使用CAS+synchronized,synchronized只锁哈希表某个索引位置上的首节点,相当于细粒度加锁,增大并发性能
本篇文章将从ConcurrentHashMap的使用,读、写、扩容的实现原理,设计思想等方面进行剖析
查看本文前需要了解哈希表、volatile、CAS、synchronized等知识
volatile可以查看这篇文章:5个案例和流程图让你从0到1搞懂volatile关键字
CAS、synchronized可以查看这篇文章:15000字、6个代码案例、5个原理图让你彻底搞懂Synchronized
使用ConcurrentHashMap

ConcurrentHashMap是并发场景下线程安全的Map,可以在并发场景下查询存储K、V键值对
不可变对象是绝对线程安全的,无论外界如何使用,都线程安全
ConcurrentHashMap并不是绝对线程安全的,只提供方法的线程安全,如果在外层使用错误依旧会导致线程不安全
来看下面的案例,使用value存储自增调用次数,开启10个线程每个执行100次,最终结果应该是1000次,但错误的使用导致不足1000
  1.     public void test() {
  2. //        Map<String, Integer> map = new HashMap(16);
  3.         Map<String, Integer> map = new ConcurrentHashMap(16);
  4.         String key = "key";
  5.         CountDownLatch countDownLatch = new CountDownLatch(10);
  6.         for (int i = 0; i < 10; i++) {
  7.             new Thread(() -> {
  8.                 for (int j = 0; j < 100; j++) {
  9.                     incr(map, key);
  10. //                    incrCompute(map, key);
  11.                 }
  12.                 countDownLatch.countDown();
  13.             }).start();
  14.         }
  15.         try {
  16.             //阻塞到线程跑完
  17.             countDownLatch.await();
  18.         } catch (InterruptedException e) {
  19.             e.printStackTrace();
  20.         }
  21.         //1000不到
  22.         System.out.println(map.get(key));
  23.     }
  24.         private void incr(Map<String, Integer> map, String key) {
  25.         map.put(key, map.getOrDefault(key, 0) + 1);
  26.     }
复制代码
在自增方法incr中,先进行读操作,再计算,最后进行写操作,这种复合操作没有保证原子性,导致最终所有结果累加一定不为1000
正确的使用方式是使用JDK8提供的默认方法compute
ConcurrentHashMap实现compute的原理是在put中使用同步手段后再进行计算
  1.         private void incrCompute(Map<String, Integer> map, String key) {
  2.         map.compute(key, (k, v) -> Objects.isNull(v) ? 1 : v + 1);
  3.     }
复制代码
数据结构

与HashMap类似,使用哈希表+链表/红黑树实现
哈希表

哈希表的实现由数组构成,当发生哈希冲突(哈希算法得到同一索引)时使用链地址法构建成链表

当链表上的节点太长,遍历寻找开销大,超过阈值时(链表节点超过8个、哈希表长度大于64),树化成红黑树减少遍历寻找开销,时间复杂度从O(n)优化为(log n)

ConcurrentHashMap由Node数组构成,在扩容时会存在新旧两个哈希表:table、nextTable
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
  2.     implements ConcurrentMap<K,V>, Serializable {       
  3.         //哈希表 node数组
  4.         transient volatile Node<K,V>[] table;
  5.    
  6.     //扩容时为了兼容读写,会存在两个哈希表,这个是新哈希表
  7.     private transient volatile Node<K,V>[] nextTable;
  8.    
  9.     // 默认为 0
  10.     // 当初始化时, 为 -1
  11.     // 当扩容时, 为 -(1 + 扩容线程数)
  12.     // 当初始化或扩容完成后,为 下一次的扩容的阈值大小
  13.     private transient volatile int sizeCtl;
  14.    
  15.     //扩容时 用于指定迁移区间的下标
  16.     private transient volatile int transferIndex;
  17.    
  18.     //统计每个哈希槽中的元素数量
  19.     private transient volatile CounterCell[] counterCells;
  20. }
复制代码
节点

Node用于实现哈希表数组的节点和发生哈希冲突时,构建成链表的节点
  1. //实现哈希表的节点,数组和链表时使用
  2. static class Node<K,V> implements Map.Entry<K,V> {
  3.     //节点哈希值
  4.         final int hash;
  5.         final K key;
  6.         volatile V val;
  7.     //作为链表时的 后续指针
  8.         volatile Node<K,V> next;           
  9. }
  10. // 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
  11. static final class ForwardingNode<K,V> extends Node<K,V> {}
  12. // 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
  13. static final class ReservationNode<K,V> extends Node<K,V> {}
  14. // 作为 treebin 的头节点, 存储 root 和 first
  15. static final class TreeBin<K,V> extends Node<K,V> {}
  16. // 作为 treebin 的节点, 存储 parent, left, right
  17. static final class TreeNode<K,V> extends Node<K,V> {}
复制代码
节点哈希值
  1. //转发节点
  2. static final int MOVED     = -1;
  3. //红黑树在数组中的节点
  4. static final int TREEBIN   = -2;
  5. //占位节点
  6. static final int RESERVED  = -3;
复制代码
转发节点:继承Node,用于扩容时设置在旧哈希表某索引的首节点,遇到转发节点要去新哈希表中寻找
  1. static final class ForwardingNode<K,V> extends Node<K,V> {
  2.             //新哈希表
  3.         final Node<K,V>[] nextTable;
  4.            
  5.         ForwardingNode(Node<K,V>[] tab) {
  6.             //哈希值设置为-1
  7.             super(MOVED, null, null, null);
  8.             this.nextTable = tab;
  9.         }
  10. }
复制代码
红黑树在数组中的节点 TreeBin:继承Node,first指向红黑树的首节点
  1. static final class TreeBin<K,V> extends Node<K,V> {
  2.         TreeNode<K,V> root;
  3.             //红黑树首节点
  4.         volatile TreeNode<K,V> first;
  5. }   
复制代码

红黑树节点TreeNode
  1. static final class TreeNode<K,V> extends Node<K,V> {
  2.         TreeNode<K,V> parent;  
  3.         TreeNode<K,V> left;
  4.         TreeNode<K,V> right;
  5.         TreeNode<K,V> prev;
  6.             boolean red;
  7. }
复制代码
占位节点:继承Node,需要计算时(使用computer方法),先使用占位节点占位,计算完再构建节点取代占位节点
  1.         static final class ReservationNode<K,V> extends Node<K,V> {
  2.         ReservationNode() {
  3.             super(RESERVED, null, null, null);
  4.         }
  5.         Node<K,V> find(int h, Object k) {
  6.             return null;
  7.         }
  8.     }
复制代码
实现原理

构造

在构造时会检查入参,然后根据需要存储的数据容量、负载因子计算哈希表容量,最后将哈希表容量调整成2次幂
构造时并不会初始化,而是等到使用再进行创建(懒加载)
  1.         public ConcurrentHashMap(int initialCapacity,
  2.                              float loadFactor, int concurrencyLevel) {
  3.         //检查负载因子、初始容量
  4.         if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
  5.             throw new IllegalArgumentException();
  6.         
  7.         //concurrencyLevel:1
  8.         if (initialCapacity < concurrencyLevel)   // Use at least as many bins
  9.             initialCapacity = concurrencyLevel;   // as estimated threads
  10.         //计算大小 = 容量/负载因子 向上取整
  11.         long size = (long)(1.0 + (long)initialCapacity / loadFactor);
  12.         //如果超过最大值就使用最大值
  13.         //tableSizeFor 将大小调整为2次幂
  14.         int cap = (size >= (long)MAXIMUM_CAPACITY) ?
  15.             MAXIMUM_CAPACITY : tableSizeFor((int)size);
  16.         
  17.         //设置容量
  18.         this.sizeCtl = cap;
  19.     }
复制代码
读-get

读场景使用volatile保证可见性,即使其他线程修改也是可见的,不用使用其他手段保证同步
读操作需要在哈希表中寻找元素,经过扰动算法打乱哈希值,再使用哈希值通过哈希算法得到索引,根据索引上的首节点分为多种情况处理

  • 扰动算法将哈希值充分打乱(避免造成太多的哈希冲突),符号位&0保证结果正数
    int h = spread(key.hashCode())
    扰动算法:哈希值高低16位异或运算
    经过扰动算法后,&HASH_BITS = 0x7fffffff (011111...),符号位为0保证结果为正数
    负数的哈希值表示特殊的作用,比如:转发节点、树的首节点、占位节点等
    1.         static final int spread(int h) {
    2.         return (h ^ (h >>> 16)) & HASH_BITS;
    3.     }
    复制代码
  • 使用打乱的哈希值经过哈希算法得到数组中的索引(下标)
    n 为哈希表长度:(n = tab.length)
    (e = tabAt(tab, (n - 1) & h)
    h为计算后的哈希值,哈希值%(哈希表长度-1) 就能求出索引位置
    为了性能提升,规定哈希表长度为2的n次幂,哈希表长度二进制一定是1000....,而(n-1)的二进制一定是0111...
    因此(n - 1) & h计算索引,进行与运算的结果一定在0~n-1之间 使用位运算提升性能
  • 得到数组上的节点后,需要进行比较
    找到哈希表上的首个节点后,进行比较key 查看是否是当前节点
    比较规则:先对哈希值进行比较,如果对象哈希值相同,那么可能是同一个对象,还需要比较key(==与equals),如果哈希值都不相同,那么肯定不是同一个对象
    先比较哈希值的好处就是提升查找性能,如果直接使用equals 可能时间复杂度会上升(比如String的equals)
  • 使用链地址法解决哈希冲突,因此找到节点后可能遍历链表或树;由于哈希表存在扩容,也可能要去新节点上寻找
    4.1 首节点比较成功,直接返回
    4.2 首节点哈希值为负,说明该节点是特殊情况的:转发节点、树的首节点 、计算的预订占位节点

    • 如果是转发节点,正在扩容则去新数组上找
    • 如果是TreeBin则去红黑树中寻找
    • 如果是占位节点 直接返回空
    4.3 遍历该链表依次比较

get代码
  1. public V get(Object key) {
  2.     Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3.     //1.spread:扰动算法 + 让key的哈希值不能为负数,因为负数哈希值代表红黑树或ForwardingNode
  4.     int h = spread(key.hashCode());
  5.     //2.(n - 1) & h:下标、索引 实际上就是数组长度模哈希值 位运算效率更高
  6.     //e:哈希表中对应索引位置上的节点
  7.     if ((tab = table) != null && (n = tab.length) > 0 &&
  8.         (e = tabAt(tab, (n - 1) & h)) != null) {
  9.         //3.如果哈希值相等,说明可能找到,再比较key
  10.         if ((eh = e.hash) == h) {
  11.             //4.1 key相等说明找到 返回
  12.             if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  13.                 return e.val;
  14.         }
  15.         //4.2 首节点哈希值为负,说明该节点是转发节点,当前正在扩容则去新数组上找
  16.         else if (eh < 0)
  17.             return (p = e.find(h, key)) != null ? p.val : null;
  18.         
  19.         //4.3 遍历该链表,能找到就返回值,不能返回null
  20.         while ((e = e.next) != null) {
  21.             if (e.hash == h &&
  22.                 ((ek = e.key) == key || (ek != null && key.equals(ek))))
  23.                 return e.val;
  24.         }
  25.     }
  26.     return null;
  27. }
复制代码
写-put

添加元素时,使用CAS+synchronized(只锁住哈希表中某个首节点)的同步方式保证原子性

  • 获取哈希值:扰动算法+确保哈希值为正数
  • 哈希表为空,CAS保证一个线程初始化
  1.         private final Node<K,V>[] initTable() {
  2.         Node<K,V>[] tab; int sc;
  3.         while ((tab = table) == null || tab.length == 0) {
  4.             //小于0 说明其他线程在初始化 让出CPU时间片 后续初始化完退出
  5.             if ((sc = sizeCtl) < 0)
  6.                 Thread.yield();
  7.             else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  8.                 //CAS将SIZECTL设置成-1 (表示有线程在初始化)成功后 初始化
  9.                 try {
  10.                     if ((tab = table) == null || tab.length == 0) {
  11.                         int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  12.                         @SuppressWarnings("unchecked")
  13.                         Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  14.                         table = tab = nt;
  15.                         sc = n - (n >>> 2);
  16.                     }
  17.                 } finally {
  18.                     sizeCtl = sc;
  19.                 }
  20.                 break;
  21.             }
  22.         }
  23.         return tab;
  24.     }
复制代码

  • 将哈希值通过哈希算法获取索引上的节点 f = tabAt(tab, i = (n - 1) & hash)
  • 根据不同情况进行处理

    • 4.1 首节点为空时,直接CAS往索引位置添加节点 casTabAt(tab, i, null,new Node(hash, key, value, null))

    • 4.2 首节点hash为MOVED -1时,表示该节点是转发节点,说明正在扩容,帮助扩容
    • 4.3 首节点加锁

      • 4.3.1 遍历链表寻找并添加/覆盖

      • 4.3.2 遍历树寻找并添加/覆盖


  • addCount统计每个节点上的数据,并检查扩容
put代码
  1. //onlyIfAbsent为true时,如果原来有k,v则这次不会覆盖
  2. final V putVal(K key, V value, boolean onlyIfAbsent) {
  3.     if (key == null || value == null) throw new NullPointerException();
  4.     //1.获取哈希值:扰动算法+确保哈希值为正数
  5.     int hash = spread(key.hashCode());
  6.     int binCount = 0;
  7.     //乐观锁思想 CSA+失败重试
  8.     for (Node<K,V>[] tab = table;;) {
  9.         Node<K,V> f; int n, i, fh;
  10.         //2.哈希表为空 CAS保证只有一个线程初始化
  11.         if (tab == null || (n = tab.length) == 0)
  12.             tab = initTable();
  13.         //3. 哈希算法求得索引找到索引上的首节点
  14.         //4.1 节点为空时,直接CAS构建节点
  15.         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
  16.             if (casTabAt(tab, i, null,
  17.                          new Node<K,V>(hash, key, value, null)))
  18.                 break;                   // no lock when adding to empty bin
  19.         }
  20.         //4.2 索引首节点hash 为MOVED 说明该节点是转发节点,当前正在扩容,去帮助扩容
  21.         else if ((fh = f.hash) == MOVED)
  22.             tab = helpTransfer(tab, f);
  23.         else {
  24.             V oldVal = null;
  25.             //4.3 首节点 加锁
  26.             synchronized (f) {
  27.                 //首节点没变
  28.                 if (tabAt(tab, i) == f) {
  29.                     //首节点哈希值大于等于0 说明节点是链表上的节点  
  30.                     //4.3.1 遍历链表寻找然后添加/覆盖
  31.                     if (fh >= 0) {
  32.                         //记录链表上有几个节点
  33.                         binCount = 1;
  34.                         //遍历链表找到则替换,如果遍历完了还没找到就添加(尾插)
  35.                         for (Node<K,V> e = f;; ++binCount) {
  36.                             K ek;
  37.                             //替换
  38.                             if (e.hash == hash &&
  39.                                 ((ek = e.key) == key ||
  40.                                  (ek != null && key.equals(ek)))) {
  41.                                 oldVal = e.val;
  42.                                 //onlyIfAbsent为false允许覆盖(使用xxIfAbsent方法时,有值就不覆盖)
  43.                                 if (!onlyIfAbsent)
  44.                                     e.val = value;
  45.                                 break;
  46.                             }
  47.                             Node<K,V> pred = e;
  48.                             //添加
  49.                             if ((e = e.next) == null) {
  50.                                 pred.next = new Node<K,V>(hash, key,
  51.                                                           value, null);
  52.                                 break;
  53.                             }
  54.                         }
  55.                     }
  56.                     //如果是红黑树首节点,则找到对应节点再覆盖
  57.                     //4.3.2 遍历树寻找然后添加/覆盖
  58.                     else if (f instanceof TreeBin) {
  59.                         Node<K,V> p;
  60.                         binCount = 2;
  61.                         //如果是添加返回null,返回不是null则出来添加
  62.                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  63.                                                        value)) != null) {
  64.                             oldVal = p.val;
  65.                             //覆盖
  66.                             if (!onlyIfAbsent)
  67.                                 p.val = value;
  68.                         }
  69.                     }
  70.                 }
  71.             }
  72.             
  73.             if (binCount != 0) {
  74.                 if (binCount >= TREEIFY_THRESHOLD)
  75.                     //链表上的节点超过TREEIFY_THRESHOLD 8个(不算首节点) 并且 数组长度超过64才树化,否则扩容
  76.                     treeifyBin(tab, i);
  77.                 if (oldVal != null)
  78.                     return oldVal;
  79.                 break;
  80.             }
  81.         }
  82.     }
  83.     //5.添加计数,用于统计元素(添加节点的情况)
  84.     addCount(1L, binCount);
  85.     return null;
  86. }
复制代码
扩容

为了避免频繁发生哈希冲突,当哈希表中的元素数量 / 哈希表长度 超过负载因子时,进行扩容(增大哈希表的长度)
一般来说扩容都是增大哈希表长度的2倍,比如从32到64保证长度是2次幂;如果扩容长度达到整型上限则使用整型最大值
当发生扩容时,需要将数组中每个槽里的链表或树迁移到新数组中
如果处理器是多核,那么这个迁移的操作并不是一个线程单独完成的,而是会让其他线程也来帮助迁移
在迁移时让每个线程从右往左的每次迁移多个槽,迁移完再判断是否全部迁移完,如果没迁移完则继续循环迁移
扩容操作主要在transfer方法中,扩容主要在三个场景下:

  • addCount:添加完节点增加计数检查扩容
  • helpTransfer:线程put时发现正在迁移,来帮忙扩容
  • tryPresize:尝试调整容量(批量添加putAll,树化数组长度没超过64时treeifyBin调用)
分为以下3个步骤

  • 根据CPU核数、哈希表总长度计算每次迁移多少个槽,最小16个
  • 新哈希表为空,说明是初始化
  • 循环迁移

    • 3.1 分配负责迁移的区间 [bround,i](可能存在多线程同时迁移)

    • 3.2 迁移:分为链表迁移、树迁移
      链表迁移

      • 将链表上的节点充分散列到新哈希表的index、index+旧哈希表长度的两个下标中(与HashMap类似)
      • 将index位置链表中的节点 (hash & 哈希表长度),结果为0的放到新数组的index位置,结果为1放到新数组index+旧哈希表长度的位置

        比如旧哈希表长度为16,在索引3的位置上,16的二进制是10000,hash&16 =>  hash& 10000 ,也就是说节点哈希值第5位是0就放到新哈希表的3位置上,是1就放到新哈希表的3+16下标
      • 使用头插法将计算结果为0构建成ln链表,为1构建成hn链表,为方便构建链表,会先寻找lastRun节点:lastRun节点及后续节点都为同一链表上的节点,方便迁移
        构建链表前先构建lastRun,比如图中lastRun e->f ,先将lastRun放到ln链表上,在遍历原始链表,遍历到a :a->e->f,遍历到b:b->a->e->f


      • 每迁移完一个索引位置就将转发节点设置到原哈希表对应位置,当其他线程进行读get操作时,根据转发节点来新哈希表中寻找,进行写put操作时,来帮助扩容(其他区间进行迁移)



扩容代码
[code]//tab 旧哈希表//nextTab 新哈希表private final void transfer(Node[] tab, Node[] nextTab) {        //1.计算每次迁移多少个槽        //n:哈希表长度(多少个槽)        int n = tab.length, stride;        //stride:每次负责迁移多少个槽        //NCPU: CPU核数        //如果是多核,每次迁移槽数 = 总槽数无符号右移3位(n/8)再除CPU核数          //每次最小迁移槽数 = MIN_TRANSFER_STRIDE = 16        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)            stride = MIN_TRANSFER_STRIDE; // subdivide range            //2.如果新哈希表为空,说明是初始化        if (nextTab == null) {            // initiating            try {                @SuppressWarnings("unchecked")                Node[] nt = (Node[])new Node[n = bound || finishing)                    advance = false;                //transferIndex= n || i + n >= nextn) {                int sc;                //如果完成迁移,设置哈希表、数量                if (finishing) {                    nextTable = null;                    table = nextTab;                    sizeCtl = (n >> 1);                    return;                }                //CAS 将sizeCtl数量-1 表示 一个线程迁移完成                 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {                    //如果不是最后一条线程直接返回                    if ((sc - 2) != resizeStamp(n) = 0) {                            int runBit = fh & n;                            Node lastRun = f;                            //寻找lastRun节点                             for (Node p = f.next; p != null; p = p.next) {                                int b = p.hash & n;                                if (b != runBit) {                                    runBit = b;                                    lastRun = p;                                }                            }                            //如果最后一次计算值是0                            //lastRun节点以及后续节点计算值都是0构建成ln链表 否则 都是1构建成hn链表                            if (runBit == 0) {                                ln = lastRun;                                hn = null;                            }                            else {                                hn = lastRun;                                ln = null;                            }                                                        //遍历构建ln、hn链表 (头插)                            for (Node p = f; p != lastRun; p = p.next) {                                int ph = p.hash; K pk = p.key; V pv = p.val;                                //头插:Node构造第四个参数是后继节点                                if ((ph & n) == 0)                                    ln = new Node(ph, pk, pv, ln);                                else                                    hn = new Node(ph, pk, pv, hn);                            }                            //设置ln链表到i位置                            setTabAt(nextTab, i, ln);                            //设置hn链表到i+n位置                            setTabAt(nextTab, i + n, hn);                            //设置转发节点                            setTabAt(tab, i, fwd);                            advance = true;                        }                        //3.2.4.2 树迁移                        else if (f instanceof TreeBin) {                            TreeBin t = (TreeBin)f;                            TreeNode lo = null, loTail = null;                            TreeNode hi = null, hiTail = null;                            int lc = 0, hc = 0;                            for (Node e = t.first; e != null; e = e.next) {                                int h = e.hash;                                TreeNode p = new TreeNode                                    (h, e.key, e.val, null, null);                                if ((h & n) == 0) {                                    if ((p.prev = loTail) == null)                                        lo = p;                                    else                                        loTail.next = p;                                    loTail = p;                                    ++lc;                                }                                else {                                    if ((p.prev = hiTail) == null)                                        hi = p;                                    else                                        hiTail.next = p;                                    hiTail = p;                                    ++hc;                                }                            }                            ln = (lc

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

祗疼妳一个

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表