ConcurrentHashMap(并发工具类)

打印 上一主题 下一主题

主题 907|帖子 907|积分 2721

并发工具类

在JDK的并发包里提供了几个非常有用的并发容器和并发工具类。供我们在多线程开辟中进行使用。
5.1 ConcurrentHashMap

5.1.1 概述以及基本使用

在集合类中HashMap是比较常用的集合对象,但是HashMap是线程不安全的(多线程环境下可能会存在问题)。为了包管数据的安全性我们可以使用Hashtable,但是Hashtable的服从低下。
基于以上两个原因我们可以使用JDK1.5以后所提供的ConcurrentHashMap。
案例1:演示HashMap线程不安全
实现步骤

  • 创建一个HashMap集合对象
  • 创建两个线程对象,第一个线程对象向集合中添加元素(1-24),第二个线程对象向集合中添加元素(25-50);
  • 主线程休眠1秒,以便让其他两个线程将数据填装完毕
  • 从集合中找出键和值不相同的数据
测试类
  1. public class HashMapDemo01 {
  2.     public static void main(String[] args) {
  3.         // 创建一个HashMap集合对象
  4.         HashMap<String , String> hashMap = new HashMap<String , String>() ;
  5.         // 创建两个线程对象,我们本次使用匿名内部类的方式去常见线程对象
  6.         Thread t1 = new Thread() {
  7.             @Override
  8.             public void run() {
  9.                 // 第一个线程对象向集合中添加元素(1-24)
  10.                 for(int x = 1 ; x < 25 ; x++) {
  11.                     hashMap.put(String.valueOf(x) , String.valueOf(x)) ;
  12.                 }
  13.             }
  14.         };
  15.         // 线程t2
  16.         Thread t2 = new Thread() {
  17.             @Override
  18.             public void run() {
  19.                 // 第二个线程对象向集合中添加元素(25-50)
  20.                 for(int x = 25 ; x < 51 ; x++) {
  21.                     hashMap.put(String.valueOf(x) , String.valueOf(x)) ;
  22.                 }
  23.             }
  24.         };
  25.         // 启动线程
  26.         t1.start();
  27.         t2.start();
  28.         System.out.println("----------------------------------------------------------");
  29.         try {
  30.             // 主线程休眠2s,以便让其他两个线程将数据填装完毕
  31.             TimeUnit.SECONDS.sleep(2);
  32.         } catch (InterruptedException e) {
  33.             e.printStackTrace();
  34.         }
  35.         // 从集合中找出键和值不相同的数据
  36.         for(int x = 1 ; x < 51 ; x++) {
  37.             // HashMap中的键就是当前循环变量的x这个数据的字符串表现形式 , 根据键找到值,然后在进行判断
  38.             if( !String.valueOf(x).equals( hashMap.get(String.valueOf(x)) ) ) {
  39.                 System.out.println(String.valueOf(x) + ":" + hashMap.get(String.valueOf(x)));
  40.             }
  41.         }
  42.     }
  43. }
复制代码
控制台输出结果
  1. ----------------------------------------------------------
  2. 5:null
复制代码
通过控制台的输出结果,我们可以看到在多线程操纵HashMap的时候,可能会出现线程安全问题。
注1:必要多次运行才可以看到详细的效果; 可以使用循环将代码进行改造,以便让问题方便的暴露出来!
案例2:演示Hashtable线程安全
测试类
  1. public class HashtableDemo01 {
  2.     public static void main(String[] args) {
  3.         // 创建一个Hashtable集合对象
  4.         Hashtable<String , String> hashtable = new Hashtable<String , String>() ;
  5.         // 创建两个线程对象,我们本次使用匿名内部类的方式去常见线程对象
  6.         Thread t1 = new Thread() {
  7.             @Override
  8.             public void run() {
  9.                 // 第一个线程对象向集合中添加元素(1-24)
  10.                 for(int x = 1 ; x < 25 ; x++) {
  11.                     hashtable.put(String.valueOf(x) , String.valueOf(x)) ;
  12.                 }
  13.             }
  14.         };
  15.         // 线程t2
  16.         Thread t2 = new Thread() {
  17.             @Override
  18.             public void run() {
  19.                 // 第二个线程对象向集合中添加元素(25-50)
  20.                 for(int x = 25 ; x < 51 ; x++) {
  21.                     hashtable.put(String.valueOf(x) , String.valueOf(x)) ;
  22.                 }
  23.             }
  24.         };
  25.         // 启动线程
  26.         t1.start();
  27.         t2.start();
  28.         System.out.println("----------------------------------------------------------");
  29.         try {
  30.             // 主线程休眠2s,以便让其他两个线程将数据填装完毕
  31.             TimeUnit.SECONDS.sleep(2);
  32.         } catch (InterruptedException e) {
  33.             e.printStackTrace();
  34.         }
  35.         // 从集合中找出键和值不相同的数据
  36.         for(int x = 1 ; x < 51 ; x++) {
  37.             // Hashtable中的键就是当前循环变量的x这个数据的字符串表现形式 , 根据键找到值,然后在进行判断
  38.             if( !String.valueOf(x).equals( hashtable.get(String.valueOf(x)) ) ) {
  39.                 System.out.println(String.valueOf(x) + ":" + hashtable.get(String.valueOf(x)));
  40.             }
  41.         }
  42.         
  43.     }
  44. }
复制代码
岂论该步伐运行多少次,都不会产生数据问题。因此也就证明Hashtable是线程安全的。
Hashtable包管线程安全的原理
查看Hashtable的源码
  1. public class Hashtable<K,V> extends Dictionary<K,V> implements Map<K,V>, Cloneable, java.io.Serializable {
  2.    
  3.     // Entry数组,一个Entry就相当于一个元素
  4.     private transient Entry<?,?>[] table;
  5.    
  6.     // Entry类的定义
  7.     private static class Entry<K,V> implements Map.Entry<K,V> {
  8.         final int hash;                // 当前key的hash码值
  9.         final K key;                // 键
  10.         V value;                        // 值
  11.         Entry<K,V> next;        // 下一个节点
  12.     }
  13.    
  14.     // 存储数据
  15.     public synchronized V put(K key, V value){...}
  16.    
  17.     // 获取数据
  18.     public synchronized V get(Object key){...}
  19.    
  20.     // 获取长度
  21.     public synchronized int size(){...}
  22.    
  23.     ...
  24.    
  25. }
复制代码
对应的结构如下图所示

Hashtable包管线程安全性的是使用方法全局锁进行实现的。在线程竞争激烈的情况下HashTable的服从非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable
的同步方法时,会进入阻塞状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以竞争越激烈服从越低。
案例3:演示ConcurrentHashMap线程安全
测试类
  1. public class ConcurrentHashMapDemo01 {
  2.     public static void main(String[] args) {
  3.         // 创建一个ConcurrentHashMap集合对象
  4.         ConcurrentHashMap<String , String> concurrentHashMap = new ConcurrentHashMap<String , String>() ;
  5.         // 创建两个线程对象,我们本次使用匿名内部类的方式去常见线程对象
  6.         Thread t1 = new Thread() {
  7.             @Override
  8.             public void run() {
  9.                 // 第一个线程对象向集合中添加元素(1-24)
  10.                 for(int x = 1 ; x < 25 ; x++) {
  11.                     concurrentHashMap.put(String.valueOf(x) , String.valueOf(x)) ;
  12.                 }
  13.             }
  14.         };
  15.         // 线程t2
  16.         Thread t2 = new Thread() {
  17.             @Override
  18.             public void run() {
  19.                 // 第二个线程对象向集合中添加元素(25-50)
  20.                 for(int x = 25 ; x < 51 ; x++) {
  21.                     concurrentHashMap.put(String.valueOf(x) , String.valueOf(x)) ;
  22.                 }
  23.             }
  24.         };
  25.         // 启动线程
  26.         t1.start();
  27.         t2.start();
  28.         System.out.println("----------------------------------------------------------");
  29.         try {
  30.             // 主线程休眠2s,以便让其他两个线程将数据填装完毕
  31.             TimeUnit.SECONDS.sleep(2);
  32.         } catch (InterruptedException e) {
  33.             e.printStackTrace();
  34.         }
  35.         // 从集合中找出键和值不相同的数据
  36.         for(int x = 1 ; x < 51 ; x++) {
  37.             // concurrentHashMap中的键就是当前循环变量的x这个数据的字符串表现形式 , 根据键找到值,然后在进行判断
  38.             if( !String.valueOf(x).equals( concurrentHashMap.get(String.valueOf(x)) ) ) {
  39.                 System.out.println(String.valueOf(x) + ":" + concurrentHashMap.get(String.valueOf(x)));
  40.             }
  41.         }
  42.     }
  43. }
复制代码
岂论该步伐运行多少次,都不会产生数据问题。因此也就证明ConcurrentHashMap是线程安全的。
5.1.2 源码分析

由于ConcurrentHashMap在jdk1.7和jdk1.8的时候实现原理不太相同,因此必要分别来讲解一下两个差别版本的实现原理。
1) jdk1.7版本

ConcurrentHashMap中的重要成员变量
  1. public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
  2.    
  3.     /**
  4.      * Segment翻译中文为"段" , 段数组对象
  5.      */
  6.     final Segment<K,V>[] segments;
  7.    
  8.     // Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色,将一个大的table分割成多个小的table进行加锁。
  9.     static final class Segment<K,V> extends ReentrantLock implements Serializable {
  10.         
  11.         transient volatile int count;                            // Segment中元素的数量,由volatile修饰,支持内存可见性;
  12.         transient int modCount;                                                 // 对table的大小造成影响的操作的数量(比如put或者remove操作);
  13.         transient int threshold;                                         // 扩容阈值;
  14.         transient volatile HashEntry<K,V>[] table;  // 链表数组,数组中的每一个元素代表了一个链表的头部;
  15.         final float loadFactor;                                                 // 负载因子
  16.         
  17.     }
  18.    
  19.     // Segment中的元素是以HashEntry的形式存放在数组中的,其结构与普通HashMap的HashEntry基本一致,不同的是Segment的HashEntry,其value由                     // volatile修饰,以支持内存可见性,即写操作对其他读线程即时可见。
  20.     static final class HashEntry<K,V> {
  21.         final int hash;                                        // 当前节点key对应的哈希码值
  22.         final K key;                                        // 存储键
  23.         volatile V value;                                // 存储值
  24.         volatile HashEntry<K,V> next;        // 下一个节点
  25.     }
  26.    
  27. }
复制代码
对应的结构如下图所示

简单来讲,就是ConcurrentHashMap比HashMap多了一次hash过程,第1次hash定位到Segment,第2次hash定位到HashEntry,然后链表搜索找到指定节点。在进行写操纵时,只需锁住写
元素地点的Segment即可(这种锁被称为分段锁),其他Segment无需加锁,从而产生锁竞争的概率大大减小,进步了并发读写的服从。该种实现方式的缺点是hash过程比普通的HashMap要长
(因为必要进行两次hash操纵)。
ConcurrentHashMap的put方法源码分析
  1. public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
  2.    
  3.     public V put(K key, V value) {
  4.         
  5.         // 定义一个Segment对象
  6.         Segment<K,V> s;
  7.         
  8.         // 如果value的值为空,那么抛出异常
  9.         if (value == null) throw new NullPointerException();
  10.         
  11.         // hash函数获取key的hashCode,然后做了一些处理
  12.         int hash = hash(key);
  13.         
  14.         // 通过key的hashCode定位segment
  15.         int j = (hash >>> segmentShift) & segmentMask;
  16.         
  17.         // 对定位的Segment进行判断,如果Segment为空,调用ensureSegment进行初始化操作(第一次hash定位)
  18.         if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
  19.             s = ensureSegment(j);
  20.         
  21.         // 调用Segment对象的put方法添加元素
  22.         return s.put(key, hash, value, false);
  23.     }
  24.    
  25.     // Segment是一种可ReentrantLock,在ConcurrentHashMap里扮演锁的角色,将一个大的table分割成多个小的table进行加锁。
  26.     static final class Segment<K,V> extends ReentrantLock implements Serializable {
  27.         
  28.         // 添加元素
  29.         final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  30.             
  31.             // 尝试对该段进行加锁,如果加锁失败,则调用scanAndLockForPut方法;在该方法中就要进行再次尝试或者进行自旋等待
  32.             HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
  33.             V oldValue;
  34.             try {
  35.                
  36.                 // 获取HashEntry数组对象
  37.                 HashEntry<K,V>[] tab = table;
  38.                
  39.                 // 根据key的hashCode值计算索引(第二次hash定位)
  40.                 int index = (tab.length - 1) & hash;
  41.                 HashEntry<K,V> first = entryAt(tab, index);
  42.                 for (HashEntry<K,V> e = first;;)
  43.                     
  44.                     // 若不为null
  45.                     if (e != null) {
  46.                         K k;
  47.                         
  48.                         // 判读当前节点的key是否和链表头节点的key相同(依赖于hashCode方法和equals方法)
  49.                         // 如果相同,值进行更新
  50.                         if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
  51.                             oldValue = e.value;
  52.                             if (!onlyIfAbsent) {
  53.                                 e.value = value;
  54.                                 ++modCount;
  55.                             }
  56.                             break;
  57.                         }
  58.                         
  59.                         e = e.next;
  60.                     } else {  // 若头结点为null
  61.                         
  62.                         // 将新节点添加到链表中
  63.                         if (node != null)
  64.                             node.setNext(first);
  65.                         else
  66.                             node = new HashEntry<K,V>(hash, key, value, first);
  67.                         int c = count + 1;
  68.                         
  69.                         // 如果超过阈值,则进行rehash操作
  70.                         if (c > threshold && tab.length < MAXIMUM_CAPACITY)
  71.                             rehash(node);
  72.                         else
  73.                             setEntryAt(tab, index, node);
  74.                         ++modCount;
  75.                         count = c;
  76.                         oldValue = null;
  77.                         break;
  78.                     }
  79.                 }
  80.             } finally {
  81.                 unlock();
  82.             }
  83.         
  84.             return oldValue;
  85.         }        
  86.         
  87.     }
  88.    
  89. }
复制代码
注:源代码进行简单讲解即可(核心:进行了两次哈希定位以及加锁过程)
2) jdk1.8版本

在JDK1.8中为了进一步优化ConcurrentHashMap的性能,去掉了Segment分段锁的计划。在数据结构方面,则是跟HashMap一样,使用一个哈希表table数组。(数组 + 链表 + 红黑树)
而线程安全方面是结合CAS机制 + 局部锁实现的,减低锁的粒度,进步性能。同时在HashMap的基础上,对哈希表table数组和链表节点的value,next指针等使用volatile来修饰,从而
实现线程可见性。
ConcurrentHashMap中的重要成员变量
  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  2.    
  3.     // Node数组
  4.     transient volatile Node<K,V>[] table;
  5.    
  6.     // Node类的定义
  7.     static class Node<K,V> implements Map.Entry<K,V> {
  8.         
  9.         final int hash;                                // 当前key的hashCode值
  10.         final K key;                                // 键
  11.         volatile V val;                                // 值
  12.         volatile Node<K,V> next;        // 下一个节点
  13.         
  14.     }
  15.    
  16.     // TreeNode类的定义
  17.     static final class TreeNode<K,V> extends Node<K,V> {
  18.         TreeNode<K,V> parent;  // 父节点
  19.         TreeNode<K,V> left;           // 左子节点
  20.         TreeNode<K,V> right;   // 右子节点
  21.         TreeNode<K,V> prev;    // needed to unlink next upon deletion
  22.         boolean red;                   // 节点的颜色状态
  23.     }
  24.    
  25. }
复制代码
对应的结构如下图

ConcurrentHashMap的put方法源码分析
[code]public class ConcurrentHashMap extends AbstractMap implements ConcurrentMap, Serializable {        // 添加元素    public V put(K key, V value) {            return putVal(key, value, false);        }        // putVal方法定义    final V putVal(K key, V value, boolean onlyIfAbsent) {                // key为null直接抛出非常        if (key == null || value == null) throw new NullPointerException();                // 计算key所对应的hashCode值        int hash = spread(key.hashCode());        int binCount = 0;        for (Node[] tab = table;;) {            Node f; int n, i, fh;                        // 哈希表假如不存在,那么此时初始化哈希表            if (tab == null || (n = tab.length) == 0)                tab = initTable();                        // 通过hash值计算key在table表中的索引,将其值赋值给变量i,然后根据索引找到对应的Node,假如Node为null,做出处置惩罚            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {                                // 新增链表头结点,cas方式添加到哈希表table                if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break;                               }            else if ((fh = f.hash) == MOVED)                tab = helpTransfer(tab, f);            else {                V oldVal = null;                                // f为链表头结点,使用synchronized加锁                synchronized (f) {                    if (tabAt(tab, i) == f) {                        if (fh >= 0) {                            binCount = 1;                            for (Node e = f;; ++binCount) {                                K ek;                                                                // 节点已经存在,更新value即可                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {                                    oldVal = e.val;                                    if (!onlyIfAbsent)                                        e.val = value;                                    break;                                }                                                                // 该key对应的节点不存在,则新增节点并添加到该链表的末尾                                Node pred = e;                                if ((e = e.next) == null) {                                    pred.next = new Node(hash, key, value, null);                                    break;                                }                                                            }                                                    } else if (f instanceof TreeBin) { // 红黑树节点,则往该红黑树更新或添加该节点即可                            Node p;                            binCount = 2;                            if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) {                                oldVal = p.val;                                if (!onlyIfAbsent)                                    p.val = value;                            }                        }                    }                }                                // 判断是否必要将链表转为红黑树                if (binCount != 0) {                    if (binCount >= TREEIFY_THRESHOLD)                        treeifyBin(tab, i);                    if (oldVal != null)                        return oldVal;                    break;                }            }        }        addCount(1L, binCount);        return null;    }        // CAS算法的核心类    private static final sun.misc.Unsafe U;    static {        try {            U = sun.misc.Unsafe.getUnsafe();            ...        } catch (Exception e) {            throw new Error(e);        }    }        // 原子获取链表节点    static final  Node tabAt(Node[] tab, int i) {        return (Node)U.getObjectVolatile(tab, ((long)i

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

用户云卷云舒

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表