ConcurrentLinkedQueue详解(图文并茂)

打印 上一主题 下一主题

主题 915|帖子 915|积分 2745

前言

ConcurrentLinkedQueue是基于链接节点的无界限程安全队列。此队列按照FIFO(先辈先出)原则对元素进行排序。队列的头部是队列中存在时间最长的元素,而队列的尾部则是最近添加的元素。新的元素总是被插入到队列的尾部,而队列的获取操纵(例如poll或peek)则是从队列头部开始。
与传统的LinkedList差别,ConcurrentLinkedQueue使用了一种高效的非壅闭算法,被称为无锁编程(Lock-Free programming),它通过原子变量和CAS(Compare-And-Swap)操纵来保证线程安全,而不是通过传统的锁机制。这使得它在高并发场景下具有出色的性能表现。
可以看做一个线程安全的LinkedList,是一个线程安全的无界队列,但LinkedList是一个双向链表,而ConcurrentLinkedQueue是单向链表。
ConcurrentLinkedQueue线程安全在于设置head、tail以及next指针时都用的cas操纵,而且node里的item和next变量都是用volatile修饰,保证了多线程下变量的可见性。而ConcurrentLinkedQueue的所有读操纵都是无锁的,所以可能读会存在不一致性。
应用场景

假如对队列加锁的成本较高则适合使用无锁的 ConcurrentLinkedQueue 来替换。适合在对性能要求相对较高,同时有多个线程对队列进行读写的场景。
ConcurrentLinkedQueue通过无锁来做到了更高的并发量,是个高性能的队列,但是使用场景相对不如壅闭队列常见,毕竟取数据也要不绝的去循环,不如壅闭的计划,但是在并发量特殊大的情况下,是个不错的选择,性能上好很多,而且这个队列的计划也是特殊费力,尤其的使用的改良算法和对哨兵的处理。
紧张方法

ConcurrentLinkedQueue提供了丰富的方法来操纵队列,包括:

  • offer(E e):将指定的元素插入此队列的尾部。
  • add(E e):将指定的元素插入此队列的尾部(与offer方法功能雷同,但在失败时抛出异常)。
  • poll():获取并移除此队列的头部,假云云队列为空,则返回null。
  • peek():获取但不移除此队列的头部,假云云队列为空,则返回null。
  • size():返回此队列中的元素数量。需要注意的是,由于并发的原因,这个方法返回的结果可能并禁绝确。假如需要在并发情况下获取准确的元素数量,建议使用java.util.concurrent.atomic包中的原子变量进行计数。
  • isEmpty():检查此队列是否为空。与size()方法类似,由于并发的原因,这个方法返回的结果也可能禁绝确。
需要注意的是,在并发情况下使用size()和isEmpty()方法时需要特殊小心,因为它们的结果可能并禁绝确。假如需要准确的元素数量或空队列检测,建议使用额外的同步机制或原子变量来实现。
底层源码

类的内部类
  1. private static class Node<E> {
  2.     // 元素
  3.     volatile E item;
  4.     // next域
  5.     volatile Node<E> next;
  6.     /**
  7.     * Constructs a new node.  Uses relaxed write because item can
  8.     * only be seen after publication via casNext.
  9.     */
  10.     // 构造函数
  11.     Node(E item) {
  12.         // 设置item的值
  13.         UNSAFE.putObject(this, itemOffset, item);
  14.     }
  15.     // 比较并替换item值
  16.     boolean casItem(E cmp, E val) {
  17.         return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
  18.     }
  19.    
  20.     void lazySetNext(Node<E> val) {
  21.         // 设置next域的值,并不会保证修改对其他线程立即可见
  22.         UNSAFE.putOrderedObject(this, nextOffset, val);
  23.     }
  24.     // 比较并替换next域的值
  25.     boolean casNext(Node<E> cmp, Node<E> val) {
  26.         return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
  27.     }
  28.     // Unsafe mechanics
  29.     // 反射机制
  30.     private static final sun.misc.Unsafe UNSAFE;
  31.     // item域的偏移量
  32.     private static final long itemOffset;
  33.     // next域的偏移量
  34.     private static final long nextOffset;
  35.     static {
  36.         try {
  37.             UNSAFE = sun.misc.Unsafe.getUnsafe();
  38.             Class<?> k = Node.class;
  39.             itemOffset = UNSAFE.objectFieldOffset
  40.                 (k.getDeclaredField("item"));
  41.             nextOffset = UNSAFE.objectFieldOffset
  42.                 (k.getDeclaredField("next"));
  43.         } catch (Exception e) {
  44.             throw new Error(e);
  45.         }
  46.     }
  47. }
复制代码
说明: Node类表示链表结点,用于存放元素,包含item域和next域,item域表示元素,next域表示下一个结点,其使用反射机制和CAS机制来更新item域和next域,保证原子性。
类的属性
  1. public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
  2.         implements Queue<E>, java.io.Serializable {
  3.     // 版本序列号        
  4.     private static final long serialVersionUID = 196745693267521676L;
  5.     // 反射机制
  6.     private static final sun.misc.Unsafe UNSAFE;
  7.     // head域的偏移量
  8.     private static final long headOffset;
  9.     // tail域的偏移量
  10.     private static final long tailOffset;
  11.     static {
  12.         try {
  13.             UNSAFE = sun.misc.Unsafe.getUnsafe();
  14.             Class<?> k = ConcurrentLinkedQueue.class;
  15.             headOffset = UNSAFE.objectFieldOffset
  16.                 (k.getDeclaredField("head"));
  17.             tailOffset = UNSAFE.objectFieldOffset
  18.                 (k.getDeclaredField("tail"));
  19.         } catch (Exception e) {
  20.             throw new Error(e);
  21.         }
  22.     }
  23.    
  24.     // 头节点
  25.     private transient volatile Node<E> head;
  26.     // 尾结点
  27.     private transient volatile Node<E> tail;
  28. }
复制代码
说明: 属性中包含了head域和tail域,表示链表的头节点和尾结点,同时,ConcurrentLinkedQueue也使用了反射机制和CAS机制来更新头节点和尾结点,保证原子性。
类的构造函数


  • ConcurrentLinkedQueue()
  1. public ConcurrentLinkedQueue() {
  2.     // 初始化头节点与尾结点
  3.     head = tail = new Node<E>(null);
  4. }
复制代码
说明: 该构造函数用于创建一个最初为空的 ConcurrentLinkedQueue,头节点与尾结点指向同一个结点,该结点的item域为null,next域也为null。
<ul>ConcurrentLinkedQueue(Collection
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

我可以不吃啊

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表