SynchronousQueue详解

打印 上一主题 下一主题

主题 781|帖子 781|积分 2343

SynchronousQueue介绍

  【1】SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take。
             
  【2】如图所示,SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。
  【3】需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。
SynchronousQueue的源码分析

  【1】构造函数
  1. //默认采用非公平
  2. public SynchronousQueue() {
  3.     this(false);
  4. }
  5. //可以选择模式
  6. public SynchronousQueue(boolean fair) {
  7.     transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
  8. }
复制代码
  【2】核心方法分析
  1. //这些方法本质上都是调用属性值transferer的transfer方法
  2. public void put(E e) throws InterruptedException {
  3.     if (e == null) throw new NullPointerException();
  4.     if (transferer.transfer(e, false, 0) == null) {
  5.         Thread.interrupted();
  6.         throw new InterruptedException();
  7.     }
  8. }
  9. public boolean offer(E e) {
  10.     if (e == null) throw new NullPointerException();
  11.     return transferer.transfer(e, true, 0) != null;
  12. }
  13. public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
  14.     if (e == null) throw new NullPointerException();
  15.     if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
  16.         return true;
  17.     if (!Thread.interrupted())
  18.         return false;
  19.     throw new InterruptedException();
  20. }
  21. public E take() throws InterruptedException {
  22.     E e = transferer.transfer(null, false, 0);
  23.     if (e != null)
  24.         return e;
  25.     Thread.interrupted();
  26.     throw new InterruptedException();
  27. }
  28. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  29.     E e = transferer.transfer(null, true, unit.toNanos(timeout));
  30.     if (e != null || !Thread.interrupted())
  31.         return e;
  32.     throw new InterruptedException();
  33. }
  34. public E poll() {
  35.     return transferer.transfer(null, true, 0);
  36. }
复制代码
s
Transferer分析

  【1】Transferer是SynchronousQueue的内部抽象类,双栈和双队列算法共享该类。他只有一个transfer方法,用于转移元素,从生产者转移到消费者;或者消费者调用该方法从生产者取数据。
  【2】Transferer有两个实现类:TransferQueue和TransferStack。
  【3】这两个类的区别就在于是否公平。TransferQueue是公平的,TransferStack非公平。
  【4】源码展示
  1. // 堆栈和队列共同的接口,负责执行 put or take
  2. abstract static class Transferer<E> {
  3.     // e 为空的,会直接返回特殊值,不为空会传递给消费者
  4.     // timed 为 true,说明会有超时时间
  5.     abstract E transfer(E e, boolean timed, long nanos);
  6. }
复制代码
 
TransferQueue分析

  【1】节点元素
  1. //队列节点元素
  2. static final class QNode {
  3.     // 当前元素的下一个元素
  4.     volatile QNode next;         
  5.     // 当前元素的值,如果当前元素被阻塞住了,等其他线程来唤醒自己时,其他线程会把自己 set 到 item 里面
  6.     volatile Object item;         
  7.     // 可以阻塞住的当前线程
  8.     volatile Thread waiter;      
  9.     // 节点类型:true是 put,false是 take
  10.     final boolean isData;         
  11.    ....
  12. }
复制代码
  【2】构造方法
  1. //队列头结点指针
  2. transient volatile QNode head;
  3. //队列尾结点指针
  4. transient volatile QNode tail;
  5. TransferQueue() {
  6.     QNode h = new QNode(null, false); // initialize to dummy node.
  7.     head = h;
  8.     tail = h;
  9. }
复制代码
  【3】核心方法
[code]@SuppressWarnings("unchecked")E transfer(E e, boolean timed, long nanos) {    QNode s = null;     //根据是否传入数据 判断是获取还是存放     boolean isData = (e != null);    for (;;) {        // 队列头和尾的临时变量,队列是空的时候,t=h        QNode t = tail;        QNode h = head;        // tail 和 head 没有初始化时,无限循环,虽然这种 continue 非常耗cpu,但感觉不会碰到这种情况        // 因为 tail 和 head 在 TransferQueue 初始化的时候,就已经被赋值空节点了        if (t == null || h == null)         // saw uninitialized value            continue;                       // spin        // 首尾节点相同,说明是空队列        // 或者尾节点的操作和当前节点操作一致        if (h == t || t.isData == isData) { // empty or same-mode            QNode tn = t.next;            if (t != tail)                  //直至拿到尾节点                continue;            if (tn != null) {               // lagging tail                advanceTail(t, tn);                continue;            }            //超时直接返回 null            if (timed && nanos
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

羊蹓狼

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表