阻塞队列源码系列(七):SynchronousQueue

Posted by 陈树义 on 2021-06-20

SynchronousQueue 是 JDK 1.5 时,随着J.U.C包一起引入的一种阻塞队列,它实现了BlockingQueue接口。

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable

SynchronousQueue 一般用于交换数据,当一个线程插入数据时,必须等待另一个线程插入数据匹配后才返回,否则会一直阻塞等待。

原理

下面我们将从类成员变量、构造方法、核心方法两个方面逐一介绍。

类成员变量

private transient volatile Transferer<E> transferer;

SynchronousQueue 类中的 Transferer 对象是实现数据交换的具体实现。

构造方法

SynchronousQueue 一共有两个构造函数。

public SynchronousQueue() {
    this(false);
}
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

从第二个构造函数可以知道看出:如果是公平模式的话,那么使用 TransferQueue 实现。如果是非公平模式的话,那么使用 TransferStack 实现。

我们继续看看 TransferQueue 的结构。

/** Head of queue */
transient volatile QNode head;
/** Tail of queue */
transient volatile QNode tail;
TransferQueue() {
    QNode h = new QNode(null, false); // initialize to dummy node.
    head = h;
    tail = h;
}
static final class QNode {
    volatile QNode next;          // 下个节点
    volatile Object item;         // 节点值
    volatile Thread waiter;       // 等待匹配的线程
    final boolean isData;         // 是否是生产者
}

可以看出 TransferQueue 是使用 QNode 作为链表节点。

我们继续看看 TransferStack 的结构。

// 请求节点
static final int REQUEST    = 0;
// 数据节点
static final int DATA       = 1;
// 正在匹配节点
static final int FULFILLING = 2;
// 头结点
volatile SNode head;
static final class SNode {
    volatile SNode next;        // next node in stack
    volatile SNode match;       // the node matched to this
    volatile Thread waiter;     // to control park/unpark
    Object item;                // data; or null for REQUESTs
    int mode;   // 节点类型,对应上述的 REQEUST/DATA/FULFILLING
}

TransferStack 使用 SNode 作为其链表节点。

核心方法

对于 SynchronousQueue 来说,最核心的就是那三对存取方法:

  • add/remove
  • offer/poll
  • put/take

add/remove方法

我们先看 add 方法的实现。

因为 SynchronousQueue 并没有重写 add 方法,所以其会通过模板方法回调子类的 offer 方法,这里我们暂时不深入,放到 offer 方法中一起分析。

我们看看 remove 方法的实现。

public boolean remove(Object o) {
    return false;
}

因为 SynchronousQueue 类设计于数据交换,所以其 remove 方法是无效的,所以总是返回 false。

offer/poll方法

我们先看 offer 方法的实现。

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    return transferer.transfer(e, true, 0) != null;
}

可以看到其直接调用了 Transferer 对象的 transfer 方法。

我们继续看 poll 方法的实现。

public E poll() {
    return transferer.transfer(null, true, 0);
}

可以看到 poll 方法也直接调用了 Transferer 对象的 transfer 方法。

put/take方法

我们继续看 put 方法的实现,其也直接调用 Transferer 对象的 transfer 方法。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

与 put 方法一样,take 方法同样调用 Transferer 对象的 transfer 方法。

public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

通过上述分析我们可以知道,所以方法的入口其实都在 Transfer 接口的 transfer 方法中。而 Transfer 接口有两个实现,即:TransferQueue 和 TransferStack。

TransferQueue

我们看看 TransferQueue 中的 transfer 方法是如何实现的。

E transfer(E e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);
    for (;;) {   
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin
        // 1.如果队列为空或模式相同,那么插入队列
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read
                continue;
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            if (timed && nanos <= 0)        // can't wait
                return null;
            if (s == null)
                s = new QNode(e, isData);
            if (!t.casNext(null, s))        // failed to link in
                continue;
            advanceTail(t, s);              // swing tail and wait 
            Object x = awaitFulfill(s, e, timed, nanos);    
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            // 2.如果走到这里,那说明不为空,并且队头模式不相同,那么进行匹配
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

上面的逻辑比较复杂,这里先简单说一下 TransferQueue 的交换数据逻辑:首先,调用 transfer 方法时会判断队列是否为空或者插入元素与队头元素模式是否一样。如果队列为空或者插入元素与队头元素模式一致,那么就直接插入到链表队列中等待匹配。如果队列不为空并且插入元素与队头元素模式不一致,那么说明队列中已经有可以匹配的线程了,那么进行匹配。

我们先看看第一个逻辑,即队列为空或者模式一样的情况。

// 1.如果队列为空或模式相同,那么插入队列
if (h == t || t.isData == isData) {  
    // 1.1 将元素插入尾节点
    QNode tn = t.next;
    if (t != tail)      // 尾节点变化了,说明有其他线程插入,重来。
        continue;
    if (tn != null) {   // 尾节点的后继节点不为空,说明尾节点不是最新的,重来
        advanceTail(t, tn);
        continue;
    }
    if (timed && nanos <= 0)  // 时间到了,直接返回空。
        return null;
    if (s == null)      // 初始化节点
        s = new QNode(e, isData);
    if (!t.casNext(null, s))  // 将元素插入尾节点,如果失败就重来
        continue;
    advanceTail(t, s);      // 将尾节点设置为最新元素
    // 1.2 等待匹配
    Object x = awaitFulfill(s, e, timed, nanos);    
    if (x == s) {          // x==s 表示 s 节点被取消
        clean(t, s);
        return null;
    }

    if (!s.isOffList()) {           // not already unlinked
        advanceHead(t, s);          // unlink if head
        if (x != null)              // and forget fields
            s.item = s;
        s.waiter = null;
    }
    return (x != null) ? (E)x : e;
}

我们继续看看 awaitFulfill 方法的实现:

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 自旋次数。如果s是头结点,那么设置自旋次数,否则自旋次数为0
    // 主要是为了减少无谓的CPU浪费
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    // 不断自旋
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel(e);
        Object x = s.item;
        // 1.1 x != e 表示已经被匹配了,那么直接返回匹配到的值
        if (x != e)
            return x;
        // 如果有时间限制,那么判断是否到期,到期了则取消匹配
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        // 1.1 如果需要自旋,则不断自旋。
        // 否则判断是否有线程在这个节点上等待(s.waiter == null)
        // 如果没有线程等待,就将等待线程设置为当前线程。
        // 如果有线程等待,那么判断是否有时间限制,如果没有,那么挂起线程。
        // 如果有时间限制且超时时间大于spinForTimeoutThreshold,那么阻塞nano秒
        if (spins > 0)
            --spins;
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

从上述代码中我们可以知道其大致逻辑为:首先,如果等待的节点 s 是头结点的话,那么设置自旋次数,否则设置自旋次数为0,即不自旋。这是因为队列头的总是先匹配,所以为了避免无谓的自旋浪费CPU资源,非队列头将直接进入阻塞。

// 自旋次数。如果s是头结点,那么设置自旋次数,否则自旋次数为0
// 主要是为了减少无谓的CPU浪费
int spins = ((head.next == s) ?
         (timed ? maxTimedSpins : maxUntimedSpins) : 0);

接着,会判断是否完成匹配,即 x != e。如果 x != e,那么表示已经完成了匹配或者节点被取消。那么直接返回 s 节点的 item 值,即 x。这里可以说是自旋的唯一出口。

// 1.1 x != e 表示已经被匹配了,那么直接返回匹配到的值
if (x != e)
    return x;

接着, 会判断是否需要自旋。如果不需要自旋,那么判断是否有线程在 s 节点上等待。如果没有,那么设置等待线程为本线程。否则判断是否有超时限制,如果没有,那么直接挂起线程。如果有超时限制且超时时间大于spinForTimeoutThreshold,那么阻塞nano秒后继续自旋。

// 1.1 如果需要自旋,则不断自旋。
// 否则判断是否有线程在这个节点上等待(s.waiter == null)
// 如果没有线程等待,就将等待线程设置为当前线程。
// 如果有线程等待,那么判断是否有时间限制,如果没有,那么挂起线程。
// 如果有时间限制且超时时间大于spinForTimeoutThreshold,那么阻塞nano秒
if (spins > 0)
    --spins;
else if (s.waiter == null)
    s.waiter = w;
else if (!timed)
    LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
    LockSupport.parkNanos(this, nanos);

awaitFulfill 方法分析完后,我们继续回到 transfer 方法。

当线程从 awaitFulfill 中被唤醒,并返回匹配值后。会判断返回值是否与自身相同,如果相同,那么表示该节点被取消,并返回 null。

// 1.2 等待匹配
Object x = awaitFulfill(s, e, timed, nanos);    
if (x == s) {          // x==s 表示 s 节点被取消
    clean(t, s);
    return null;
} 

如果节点顺利匹配,那么继续进入下面的判断。其判断 s 节点是否已经脱离链表,如果还未脱离,那么需要 s 节点脱离。

if (!s.isOffList()) {           // not already unlinked
    advanceHead(t, s);          // unlink if head
    if (x != null)              // and forget fields
        s.item = s;
    s.waiter = null;
}
return (x != null) ? (E)x : e;

我们继续看第二个逻辑,即队列不为空且队列头与插入元素模式不一致。那么说明我们有了可以匹配的节点,于是进入下面的逻辑进行匹配。

// 2.如果走到这里,那说明不为空,并且队头模式不相同,那么进行匹配
QNode m = h.next;               // m是准备匹配的节点
if (t != tail || m == null || h != head)
    continue;                   // 发生了多线程读写,导致数据不一致,重来。
// 2.1 尝试设置m节点的item值
Object x = m.item;
if (isData == (x != null) ||    // m 已经匹配过
    x == m ||                   // x==m 表示 m 节点已经匹配
    !m.casItem(x, e)) {         // 尝试设置e为m的item值
    advanceHead(h, m);          // CAS失败,那么丢弃m节点,获取下个节点尝试
    continue;
}
// 2.2 设置m节点为头结点,如果成功,那么匹配结束
advanceHead(h, m);              // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;

上述代码的大致逻辑为:首先,尝试设置m节点的item值。如果不成功,那么直接丢弃m节点,获取下个节点尝试。如果成功,那么继续设置m节点为头结点。如果设置m节点为头结点成功,那么唤醒等待在m节点上的线程。

经过分析可以知道:QNode 的头结点其实是一个已经匹配完成的节点,而不是正在等待匹配的节点,有点类似于哨兵节点的味道。

TransferStack

我们看看 TransferStack 中的 transfer 方法是如何实现的。

E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA;
    
    for (;;) {
        SNode h = head;
        // 1.队列为空或者模式相同
        if (h == null || h.mode == mode) {  // empty or same-mode
            if (timed && nanos <= 0) {      // can't wait
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill
        // 2.头结点还未开始匹配
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
        // 3.头结点正在匹配
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

可以看到 TransferStack 的 transfer 方法一共有三个分支,分别是:

  • 第一,队列为空或者插入元素与头结点模式相同。此时,将插入元素插入链表中。
  • 第二,队列不为空且插入元素模式不同,并且头结点还未开始匹配。此时,开始与头结点匹配。
  • 第三,队列不为空且插入元素模式不同,但头结点正在匹配。此时,帮助头结点进行匹配。

我们先看第一个分支:队列为空或者插入元素与头结点模式相同。

// 1.队列为空或者模式相同
if (h == null || h.mode == mode) {  // empty or same-mode
    // 1.1 如果超时了,那么弹出头结点,将后继节点设置为头结点
    if (timed && nanos <= 0) {      // can't wait
        if (h != null && h.isCancelled())
            casHead(h, h.next);     // pop cancelled node
        else
            return null;
    // 1.2 将新插入节点 s 设置为头结点,后继节点为旧的头结点(是栈结构)
    } else if (casHead(h, s = snode(s, e, h, mode))) {
        // 1.3 进入等待
        SNode m = awaitFulfill(s, timed, nanos);
        if (m == s) {               // s 节点被取消
            clean(s);
            return null;
        }
        // 1.4 如果有节点与s节点匹配,那么设置头结点为s的尾节点。
        if ((h = head) != null && h.next == s)
            casHead(h, s.next);     // help s's fulfiller
        return (E) ((mode == REQUEST) ? m.item : s.item);
    }
}

上述代码的逻辑大致与 TransferQueue 的相同,但因为其实栈的结构,所以其在插入节点时(即1.2)是将其设置为头结点,而不是设置为尾节点。此外,1.4 中的代码场景是其被唤醒,此时有一个节点在s节点之上,且此节点是head节点。结合这个假设去理解就会很快明白。

接着我们看第二个分支:头结点还未开始匹配。

if (!isFulfilling(h.mode)) { // try to fulfill
    // 2.头结点还未开始匹配
    // 2.1 头结点被取消,那么直接弹出头结点
    if (h.isCancelled())            // already cancelled
        casHead(h, h.next);         // pop and retry
    // 2.2 设置头结点为s节点,并且设置其mode为正在匹配
    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
        for (;;) { // 不断尝试匹配,直到成功匹配或者等待匹配节点消失
            SNode m = s.next;       // m is s's match
            // 2.3 如果s节点的后继节点为空,那么表示其没有可匹配的节点了
            // 于是直接退出循环,重新来。
            if (m == null) {        // all waiters are gone
                casHead(s, null);   // pop fulfill node
                s = null;           // use new node next time
                break;              // restart main loop
            }
            SNode mn = m.next;
            // 2.4 尝试用m节点匹配s节点,即将m节点的match属性设为s节点
            if (m.tryMatch(s)) {
                casHead(s, mn);     // pop both s and m
                return (E) ((mode == REQUEST) ? m.item : s.item);
            } else                  // lost match
                s.casNext(m, mn);   // help unlink
        }
    }
}

接下来我们看看第三个分支:队列不为空且插入元素模式不同,但头结点正在匹配。此时,帮助头结点进行匹配。

SNode m = h.next;               // m is h's match
if (m == null)                  // waiter is gone
    casHead(h, null);           // pop fulfilling node
else {
    SNode mn = m.next;
    if (m.tryMatch(h))          // 匹配成功,那么直接跳过两个节点
        casHead(h, mn);         // pop both h and m
    else                        // 匹配失败,直接丢掉
        h.casNext(m, mn);       // help unlink
}

总结

SynchronousQueue 用于交换数据,但是其本身并不保存数据。其有两种策略,分别是:公平策略和非公平策略。如果是公平策略,那么使用 TransferQueue 来实现,表示先到的先交换数据。如果是非公平策略,那么使用 TransferStack 实现,表示先到的反而后获取数据。

最后,让我们用几个关键词概括 SynchronousQueue:无界队列、链表实现、无锁实现。