SynchronousQueue介绍
【1】SynchronousQueue是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take。
【2】如图所示,SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。
【3】需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递(direct handoff)。由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。
SynchronousQueue的源码分析
【1】构造函数- //默认采用非公平
- public SynchronousQueue() {
- this(false);
- }
- //可以选择模式
- public SynchronousQueue(boolean fair) {
- transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
- }
复制代码 【2】核心方法分析- //这些方法本质上都是调用属性值transferer的transfer方法
- public void put(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (transferer.transfer(e, false, 0) == null) {
- Thread.interrupted();
- throw new InterruptedException();
- }
- }
- public boolean offer(E e) {
- if (e == null) throw new NullPointerException();
- return transferer.transfer(e, true, 0) != null;
- }
- public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
- return true;
- if (!Thread.interrupted())
- return false;
- throw new InterruptedException();
- }
- public E take() throws InterruptedException {
- E e = transferer.transfer(null, false, 0);
- if (e != null)
- return e;
- Thread.interrupted();
- throw new InterruptedException();
- }
- public E poll(long timeout, TimeUnit unit) throws InterruptedException {
- E e = transferer.transfer(null, true, unit.toNanos(timeout));
- if (e != null || !Thread.interrupted())
- return e;
- throw new InterruptedException();
- }
- public E poll() {
- return transferer.transfer(null, true, 0);
- }
复制代码 s
Transferer分析
【1】Transferer是SynchronousQueue的内部抽象类,双栈和双队列算法共享该类。他只有一个transfer方法,用于转移元素,从生产者转移到消费者;或者消费者调用该方法从生产者取数据。
【2】Transferer有两个实现类:TransferQueue和TransferStack。
【3】这两个类的区别就在于是否公平。TransferQueue是公平的,TransferStack非公平。
【4】源码展示- // 堆栈和队列共同的接口,负责执行 put or take
- abstract static class Transferer<E> {
- // e 为空的,会直接返回特殊值,不为空会传递给消费者
- // timed 为 true,说明会有超时时间
- abstract E transfer(E e, boolean timed, long nanos);
- }
复制代码
TransferQueue分析
【1】节点元素- //队列节点元素
- static final class QNode {
- // 当前元素的下一个元素
- volatile QNode next;
- // 当前元素的值,如果当前元素被阻塞住了,等其他线程来唤醒自己时,其他线程会把自己 set 到 item 里面
- volatile Object item;
- // 可以阻塞住的当前线程
- volatile Thread waiter;
- // 节点类型:true是 put,false是 take
- final boolean isData;
- ....
- }
复制代码 【2】构造方法- //队列头结点指针
- transient volatile QNode head;
- //队列尾结点指针
- transient volatile QNode tail;
- TransferQueue() {
- QNode h = new QNode(null, false); // initialize to dummy node.
- head = h;
- tail = h;
- }
复制代码 【3】核心方法
[code]@SuppressWarnings("unchecked")E transfer(E e, boolean timed, long nanos) { QNode s = null; //根据是否传入数据 判断是获取还是存放 boolean isData = (e != null); for (;;) { // 队列头和尾的临时变量,队列是空的时候,t=h QNode t = tail; QNode h = head; // tail 和 head 没有初始化时,无限循环,虽然这种 continue 非常耗cpu,但感觉不会碰到这种情况 // 因为 tail 和 head 在 TransferQueue 初始化的时候,就已经被赋值空节点了 if (t == null || h == null) // saw uninitialized value continue; // spin // 首尾节点相同,说明是空队列 // 或者尾节点的操作和当前节点操作一致 if (h == t || t.isData == isData) { // empty or same-mode QNode tn = t.next; if (t != tail) //直至拿到尾节点 continue; if (tn != null) { // lagging tail advanceTail(t, tn); continue; } //超时直接返回 null if (timed && nanos |