并发工具类
在JDK的并发包里提供了几个非常有用的并发容器和并发工具类。供我们在多线程开辟中进行使用。
5.1 ConcurrentHashMap
5.1.1 概述以及基本使用
在集合类中HashMap是比较常用的集合对象,但是HashMap是线程不安全的(多线程环境下可能会存在问题)。为了包管数据的安全性我们可以使用Hashtable,但是Hashtable的服从低下。
基于以上两个原因我们可以使用JDK1.5以后所提供的ConcurrentHashMap。
案例1:演示HashMap线程不安全
实现步骤
- 创建一个HashMap集合对象
- 创建两个线程对象,第一个线程对象向集合中添加元素(1-24),第二个线程对象向集合中添加元素(25-50);
- 主线程休眠1秒,以便让其他两个线程将数据填装完毕
- 从集合中找出键和值不相同的数据
测试类- public class HashMapDemo01 {
- public static void main(String[] args) {
- // 创建一个HashMap集合对象
- HashMap<String , String> hashMap = new HashMap<String , String>() ;
- // 创建两个线程对象,我们本次使用匿名内部类的方式去常见线程对象
- Thread t1 = new Thread() {
- @Override
- public void run() {
- // 第一个线程对象向集合中添加元素(1-24)
- for(int x = 1 ; x < 25 ; x++) {
- hashMap.put(String.valueOf(x) , String.valueOf(x)) ;
- }
- }
- };
- // 线程t2
- Thread t2 = new Thread() {
- @Override
- public void run() {
- // 第二个线程对象向集合中添加元素(25-50)
- for(int x = 25 ; x < 51 ; x++) {
- hashMap.put(String.valueOf(x) , String.valueOf(x)) ;
- }
- }
- };
- // 启动线程
- t1.start();
- t2.start();
- System.out.println("----------------------------------------------------------");
- try {
- // 主线程休眠2s,以便让其他两个线程将数据填装完毕
- TimeUnit.SECONDS.sleep(2);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // 从集合中找出键和值不相同的数据
- for(int x = 1 ; x < 51 ; x++) {
- // HashMap中的键就是当前循环变量的x这个数据的字符串表现形式 , 根据键找到值,然后在进行判断
- if( !String.valueOf(x).equals( hashMap.get(String.valueOf(x)) ) ) {
- System.out.println(String.valueOf(x) + ":" + hashMap.get(String.valueOf(x)));
- }
- }
- }
- }
复制代码 控制台输出结果- ----------------------------------------------------------
- 5:null
复制代码 通过控制台的输出结果,我们可以看到在多线程操纵HashMap的时候,可能会出现线程安全问题。
注1:必要多次运行才可以看到详细的效果; 可以使用循环将代码进行改造,以便让问题方便的暴露出来!
案例2:演示Hashtable线程安全
测试类- public class HashtableDemo01 {
- public static void main(String[] args) {
- // 创建一个Hashtable集合对象
- Hashtable<String , String> hashtable = new Hashtable<String , String>() ;
- // 创建两个线程对象,我们本次使用匿名内部类的方式去常见线程对象
- Thread t1 = new Thread() {
- @Override
- public void run() {
- // 第一个线程对象向集合中添加元素(1-24)
- for(int x = 1 ; x < 25 ; x++) {
- hashtable.put(String.valueOf(x) , String.valueOf(x)) ;
- }
- }
- };
- // 线程t2
- Thread t2 = new Thread() {
- @Override
- public void run() {
- // 第二个线程对象向集合中添加元素(25-50)
- for(int x = 25 ; x < 51 ; x++) {
- hashtable.put(String.valueOf(x) , String.valueOf(x)) ;
- }
- }
- };
- // 启动线程
- t1.start();
- t2.start();
- System.out.println("----------------------------------------------------------");
- try {
- // 主线程休眠2s,以便让其他两个线程将数据填装完毕
- TimeUnit.SECONDS.sleep(2);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // 从集合中找出键和值不相同的数据
- for(int x = 1 ; x < 51 ; x++) {
- // Hashtable中的键就是当前循环变量的x这个数据的字符串表现形式 , 根据键找到值,然后在进行判断
- if( !String.valueOf(x).equals( hashtable.get(String.valueOf(x)) ) ) {
- System.out.println(String.valueOf(x) + ":" + hashtable.get(String.valueOf(x)));
- }
- }
-
- }
- }
复制代码 岂论该步伐运行多少次,都不会产生数据问题。因此也就证明Hashtable是线程安全的。
Hashtable包管线程安全的原理:
查看Hashtable的源码- public class Hashtable<K,V> extends Dictionary<K,V> implements Map<K,V>, Cloneable, java.io.Serializable {
-
- // Entry数组,一个Entry就相当于一个元素
- private transient Entry<?,?>[] table;
-
- // Entry类的定义
- private static class Entry<K,V> implements Map.Entry<K,V> {
- final int hash; // 当前key的hash码值
- final K key; // 键
- V value; // 值
- Entry<K,V> next; // 下一个节点
- }
-
- // 存储数据
- public synchronized V put(K key, V value){...}
-
- // 获取数据
- public synchronized V get(Object key){...}
-
- // 获取长度
- public synchronized int size(){...}
-
- ...
-
- }
复制代码 对应的结构如下图所示
Hashtable包管线程安全性的是使用方法全局锁进行实现的。在线程竞争激烈的情况下HashTable的服从非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable
的同步方法时,会进入阻塞状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以竞争越激烈服从越低。
案例3:演示ConcurrentHashMap线程安全
测试类- public class ConcurrentHashMapDemo01 {
- public static void main(String[] args) {
- // 创建一个ConcurrentHashMap集合对象
- ConcurrentHashMap<String , String> concurrentHashMap = new ConcurrentHashMap<String , String>() ;
- // 创建两个线程对象,我们本次使用匿名内部类的方式去常见线程对象
- Thread t1 = new Thread() {
- @Override
- public void run() {
- // 第一个线程对象向集合中添加元素(1-24)
- for(int x = 1 ; x < 25 ; x++) {
- concurrentHashMap.put(String.valueOf(x) , String.valueOf(x)) ;
- }
- }
- };
- // 线程t2
- Thread t2 = new Thread() {
- @Override
- public void run() {
- // 第二个线程对象向集合中添加元素(25-50)
- for(int x = 25 ; x < 51 ; x++) {
- concurrentHashMap.put(String.valueOf(x) , String.valueOf(x)) ;
- }
- }
- };
- // 启动线程
- t1.start();
- t2.start();
- System.out.println("----------------------------------------------------------");
- try {
- // 主线程休眠2s,以便让其他两个线程将数据填装完毕
- TimeUnit.SECONDS.sleep(2);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- // 从集合中找出键和值不相同的数据
- for(int x = 1 ; x < 51 ; x++) {
- // concurrentHashMap中的键就是当前循环变量的x这个数据的字符串表现形式 , 根据键找到值,然后在进行判断
- if( !String.valueOf(x).equals( concurrentHashMap.get(String.valueOf(x)) ) ) {
- System.out.println(String.valueOf(x) + ":" + concurrentHashMap.get(String.valueOf(x)));
- }
- }
- }
- }
复制代码 岂论该步伐运行多少次,都不会产生数据问题。因此也就证明ConcurrentHashMap是线程安全的。
5.1.2 源码分析
由于ConcurrentHashMap在jdk1.7和jdk1.8的时候实现原理不太相同,因此必要分别来讲解一下两个差别版本的实现原理。
1) jdk1.7版本
ConcurrentHashMap中的重要成员变量- public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
-
- /**
- * Segment翻译中文为"段" , 段数组对象
- */
- final Segment<K,V>[] segments;
-
- // Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色,将一个大的table分割成多个小的table进行加锁。
- static final class Segment<K,V> extends ReentrantLock implements Serializable {
-
- transient volatile int count; // Segment中元素的数量,由volatile修饰,支持内存可见性;
- transient int modCount; // 对table的大小造成影响的操作的数量(比如put或者remove操作);
- transient int threshold; // 扩容阈值;
- transient volatile HashEntry<K,V>[] table; // 链表数组,数组中的每一个元素代表了一个链表的头部;
- final float loadFactor; // 负载因子
-
- }
-
- // Segment中的元素是以HashEntry的形式存放在数组中的,其结构与普通HashMap的HashEntry基本一致,不同的是Segment的HashEntry,其value由 // volatile修饰,以支持内存可见性,即写操作对其他读线程即时可见。
- static final class HashEntry<K,V> {
- final int hash; // 当前节点key对应的哈希码值
- final K key; // 存储键
- volatile V value; // 存储值
- volatile HashEntry<K,V> next; // 下一个节点
- }
-
- }
复制代码 对应的结构如下图所示
简单来讲,就是ConcurrentHashMap比HashMap多了一次hash过程,第1次hash定位到Segment,第2次hash定位到HashEntry,然后链表搜索找到指定节点。在进行写操纵时,只需锁住写
元素地点的Segment即可(这种锁被称为分段锁),其他Segment无需加锁,从而产生锁竞争的概率大大减小,进步了并发读写的服从。该种实现方式的缺点是hash过程比普通的HashMap要长
(因为必要进行两次hash操纵)。
ConcurrentHashMap的put方法源码分析- public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
-
- public V put(K key, V value) {
-
- // 定义一个Segment对象
- Segment<K,V> s;
-
- // 如果value的值为空,那么抛出异常
- if (value == null) throw new NullPointerException();
-
- // hash函数获取key的hashCode,然后做了一些处理
- int hash = hash(key);
-
- // 通过key的hashCode定位segment
- int j = (hash >>> segmentShift) & segmentMask;
-
- // 对定位的Segment进行判断,如果Segment为空,调用ensureSegment进行初始化操作(第一次hash定位)
- if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
- s = ensureSegment(j);
-
- // 调用Segment对象的put方法添加元素
- return s.put(key, hash, value, false);
- }
-
- // Segment是一种可ReentrantLock,在ConcurrentHashMap里扮演锁的角色,将一个大的table分割成多个小的table进行加锁。
- static final class Segment<K,V> extends ReentrantLock implements Serializable {
-
- // 添加元素
- final V put(K key, int hash, V value, boolean onlyIfAbsent) {
-
- // 尝试对该段进行加锁,如果加锁失败,则调用scanAndLockForPut方法;在该方法中就要进行再次尝试或者进行自旋等待
- HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
- V oldValue;
- try {
-
- // 获取HashEntry数组对象
- HashEntry<K,V>[] tab = table;
-
- // 根据key的hashCode值计算索引(第二次hash定位)
- int index = (tab.length - 1) & hash;
- HashEntry<K,V> first = entryAt(tab, index);
- for (HashEntry<K,V> e = first;;)
-
- // 若不为null
- if (e != null) {
- K k;
-
- // 判读当前节点的key是否和链表头节点的key相同(依赖于hashCode方法和equals方法)
- // 如果相同,值进行更新
- if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
- oldValue = e.value;
- if (!onlyIfAbsent) {
- e.value = value;
- ++modCount;
- }
- break;
- }
-
- e = e.next;
- } else { // 若头结点为null
-
- // 将新节点添加到链表中
- if (node != null)
- node.setNext(first);
- else
- node = new HashEntry<K,V>(hash, key, value, first);
- int c = count + 1;
-
- // 如果超过阈值,则进行rehash操作
- if (c > threshold && tab.length < MAXIMUM_CAPACITY)
- rehash(node);
- else
- setEntryAt(tab, index, node);
- ++modCount;
- count = c;
- oldValue = null;
- break;
- }
- }
- } finally {
- unlock();
- }
-
- return oldValue;
- }
-
- }
-
- }
复制代码注:源代码进行简单讲解即可(核心:进行了两次哈希定位以及加锁过程)
2) jdk1.8版本
在JDK1.8中为了进一步优化ConcurrentHashMap的性能,去掉了Segment分段锁的计划。在数据结构方面,则是跟HashMap一样,使用一个哈希表table数组。(数组 + 链表 + 红黑树)
而线程安全方面是结合CAS机制 + 局部锁实现的,减低锁的粒度,进步性能。同时在HashMap的基础上,对哈希表table数组和链表节点的value,next指针等使用volatile来修饰,从而
实现线程可见性。
ConcurrentHashMap中的重要成员变量- public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
-
- // Node数组
- transient volatile Node<K,V>[] table;
-
- // Node类的定义
- static class Node<K,V> implements Map.Entry<K,V> {
-
- final int hash; // 当前key的hashCode值
- final K key; // 键
- volatile V val; // 值
- volatile Node<K,V> next; // 下一个节点
-
- }
-
- // TreeNode类的定义
- static final class TreeNode<K,V> extends Node<K,V> {
- TreeNode<K,V> parent; // 父节点
- TreeNode<K,V> left; // 左子节点
- TreeNode<K,V> right; // 右子节点
- TreeNode<K,V> prev; // needed to unlink next upon deletion
- boolean red; // 节点的颜色状态
- }
-
- }
复制代码 对应的结构如下图
ConcurrentHashMap的put方法源码分析
[code]public class ConcurrentHashMap extends AbstractMap implements ConcurrentMap, Serializable { // 添加元素 public V put(K key, V value) { return putVal(key, value, false); } // putVal方法定义 final V putVal(K key, V value, boolean onlyIfAbsent) { // key为null直接抛出非常 if (key == null || value == null) throw new NullPointerException(); // 计算key所对应的hashCode值 int hash = spread(key.hashCode()); int binCount = 0; for (Node[] tab = table;;) { Node f; int n, i, fh; // 哈希表假如不存在,那么此时初始化哈希表 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 通过hash值计算key在table表中的索引,将其值赋值给变量i,然后根据索引找到对应的Node,假如Node为null,做出处置惩罚 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 新增链表头结点,cas方式添加到哈希表table if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; // f为链表头结点,使用synchronized加锁 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node e = f;; ++binCount) { K ek; // 节点已经存在,更新value即可 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 该key对应的节点不存在,则新增节点并添加到该链表的末尾 Node pred = e; if ((e = e.next) == null) { pred.next = new Node(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 红黑树节点,则往该红黑树更新或添加该节点即可 Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 判断是否必要将链表转为红黑树 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; } // CAS算法的核心类 private static final sun.misc.Unsafe U; static { try { U = sun.misc.Unsafe.getUnsafe(); ... } catch (Exception e) { throw new Error(e); } } // 原子获取链表节点 static final Node tabAt(Node[] tab, int i) { return (Node)U.getObjectVolatile(tab, ((long)i |