SynchronousQueue 是 JDK 1.5 时,随着J.U.C包一起引入的一种阻塞队列,它实现了BlockingQueue接口。
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
SynchronousQueue 一般用于交换数据,当一个线程插入数据时,必须等待另一个线程插入数据匹配后才返回,否则会一直阻塞等待。
原理
下面我们将从类成员变量、构造方法、核心方法两个方面逐一介绍。
类成员变量
private transient volatile Transferer<E> transferer;
SynchronousQueue 类中的 Transferer 对象是实现数据交换的具体实现。
构造方法
SynchronousQueue 一共有两个构造函数。
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
从第二个构造函数可以知道看出:如果是公平模式的话,那么使用 TransferQueue 实现。如果是非公平模式的话,那么使用 TransferStack 实现。
我们继续看看 TransferQueue 的结构。
/** Head of queue */
transient volatile QNode head;
/** Tail of queue */
transient volatile QNode tail;
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
static final class QNode {
volatile QNode next; // 下个节点
volatile Object item; // 节点值
volatile Thread waiter; // 等待匹配的线程
final boolean isData; // 是否是生产者
}
可以看出 TransferQueue 是使用 QNode 作为链表节点。
我们继续看看 TransferStack 的结构。
// 请求节点
static final int REQUEST = 0;
// 数据节点
static final int DATA = 1;
// 正在匹配节点
static final int FULFILLING = 2;
// 头结点
volatile SNode head;
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode; // 节点类型,对应上述的 REQEUST/DATA/FULFILLING
}
TransferStack 使用 SNode 作为其链表节点。
核心方法
对于 SynchronousQueue 来说,最核心的就是那三对存取方法:
- add/remove
- offer/poll
- put/take
add/remove方法
我们先看 add 方法的实现。
因为 SynchronousQueue 并没有重写 add 方法,所以其会通过模板方法回调子类的 offer 方法,这里我们暂时不深入,放到 offer 方法中一起分析。
我们看看 remove 方法的实现。
public boolean remove(Object o) {
return false;
}
因为 SynchronousQueue 类设计于数据交换,所以其 remove 方法是无效的,所以总是返回 false。
offer/poll方法
我们先看 offer 方法的实现。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
可以看到其直接调用了 Transferer 对象的 transfer 方法。
我们继续看 poll 方法的实现。
public E poll() {
return transferer.transfer(null, true, 0);
}
可以看到 poll 方法也直接调用了 Transferer 对象的 transfer 方法。
put/take方法
我们继续看 put 方法的实现,其也直接调用 Transferer 对象的 transfer 方法。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
与 put 方法一样,take 方法同样调用 Transferer 对象的 transfer 方法。
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
通过上述分析我们可以知道,所以方法的入口其实都在 Transfer 接口的 transfer 方法中。而 Transfer 接口有两个实现,即:TransferQueue 和 TransferStack。
TransferQueue
我们看看 TransferQueue 中的 transfer 方法是如何实现的。
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
// 1.如果队列为空或模式相同,那么插入队列
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
if (t != tail) // inconsistent read
continue;
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // can't wait
return null;
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // failed to link in
continue;
advanceTail(t, s); // swing tail and wait
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
} else { // complementary-mode
// 2.如果走到这里,那说明不为空,并且队头模式不相同,那么进行匹配
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
上面的逻辑比较复杂,这里先简单说一下 TransferQueue 的交换数据逻辑:首先,调用 transfer 方法时会判断队列是否为空或者插入元素与队头元素模式是否一样。如果队列为空或者插入元素与队头元素模式一致,那么就直接插入到链表队列中等待匹配。如果队列不为空并且插入元素与队头元素模式不一致,那么说明队列中已经有可以匹配的线程了,那么进行匹配。
我们先看看第一个逻辑,即队列为空或者模式一样的情况。
// 1.如果队列为空或模式相同,那么插入队列
if (h == t || t.isData == isData) {
// 1.1 将元素插入尾节点
QNode tn = t.next;
if (t != tail) // 尾节点变化了,说明有其他线程插入,重来。
continue;
if (tn != null) { // 尾节点的后继节点不为空,说明尾节点不是最新的,重来
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // 时间到了,直接返回空。
return null;
if (s == null) // 初始化节点
s = new QNode(e, isData);
if (!t.casNext(null, s)) // 将元素插入尾节点,如果失败就重来
continue;
advanceTail(t, s); // 将尾节点设置为最新元素
// 1.2 等待匹配
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // x==s 表示 s 节点被取消
clean(t, s);
return null;
}
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
}
我们继续看看 awaitFulfill 方法的实现:
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 自旋次数。如果s是头结点,那么设置自旋次数,否则自旋次数为0
// 主要是为了减少无谓的CPU浪费
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
// 不断自旋
for (;;) {
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
// 1.1 x != e 表示已经被匹配了,那么直接返回匹配到的值
if (x != e)
return x;
// 如果有时间限制,那么判断是否到期,到期了则取消匹配
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 1.1 如果需要自旋,则不断自旋。
// 否则判断是否有线程在这个节点上等待(s.waiter == null)
// 如果没有线程等待,就将等待线程设置为当前线程。
// 如果有线程等待,那么判断是否有时间限制,如果没有,那么挂起线程。
// 如果有时间限制且超时时间大于spinForTimeoutThreshold,那么阻塞nano秒
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
从上述代码中我们可以知道其大致逻辑为:首先,如果等待的节点 s 是头结点的话,那么设置自旋次数,否则设置自旋次数为0,即不自旋。这是因为队列头的总是先匹配,所以为了避免无谓的自旋浪费CPU资源,非队列头将直接进入阻塞。
// 自旋次数。如果s是头结点,那么设置自旋次数,否则自旋次数为0
// 主要是为了减少无谓的CPU浪费
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
接着,会判断是否完成匹配,即 x != e。如果 x != e,那么表示已经完成了匹配或者节点被取消。那么直接返回 s 节点的 item 值,即 x。这里可以说是自旋的唯一出口。
// 1.1 x != e 表示已经被匹配了,那么直接返回匹配到的值
if (x != e)
return x;
接着, 会判断是否需要自旋。如果不需要自旋,那么判断是否有线程在 s 节点上等待。如果没有,那么设置等待线程为本线程。否则判断是否有超时限制,如果没有,那么直接挂起线程。如果有超时限制且超时时间大于spinForTimeoutThreshold,那么阻塞nano秒后继续自旋。
// 1.1 如果需要自旋,则不断自旋。
// 否则判断是否有线程在这个节点上等待(s.waiter == null)
// 如果没有线程等待,就将等待线程设置为当前线程。
// 如果有线程等待,那么判断是否有时间限制,如果没有,那么挂起线程。
// 如果有时间限制且超时时间大于spinForTimeoutThreshold,那么阻塞nano秒
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
awaitFulfill 方法分析完后,我们继续回到 transfer 方法。
当线程从 awaitFulfill 中被唤醒,并返回匹配值后。会判断返回值是否与自身相同,如果相同,那么表示该节点被取消,并返回 null。
// 1.2 等待匹配
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // x==s 表示 s 节点被取消
clean(t, s);
return null;
}
如果节点顺利匹配,那么继续进入下面的判断。其判断 s 节点是否已经脱离链表,如果还未脱离,那么需要 s 节点脱离。
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
我们继续看第二个逻辑,即队列不为空且队列头与插入元素模式不一致。那么说明我们有了可以匹配的节点,于是进入下面的逻辑进行匹配。
// 2.如果走到这里,那说明不为空,并且队头模式不相同,那么进行匹配
QNode m = h.next; // m是准备匹配的节点
if (t != tail || m == null || h != head)
continue; // 发生了多线程读写,导致数据不一致,重来。
// 2.1 尝试设置m节点的item值
Object x = m.item;
if (isData == (x != null) || // m 已经匹配过
x == m || // x==m 表示 m 节点已经匹配
!m.casItem(x, e)) { // 尝试设置e为m的item值
advanceHead(h, m); // CAS失败,那么丢弃m节点,获取下个节点尝试
continue;
}
// 2.2 设置m节点为头结点,如果成功,那么匹配结束
advanceHead(h, m); // successfully fulfilled
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
上述代码的大致逻辑为:首先,尝试设置m节点的item值。如果不成功,那么直接丢弃m节点,获取下个节点尝试。如果成功,那么继续设置m节点为头结点。如果设置m节点为头结点成功,那么唤醒等待在m节点上的线程。
经过分析可以知道:QNode 的头结点其实是一个已经匹配完成的节点,而不是正在等待匹配的节点,有点类似于哨兵节点的味道。
TransferStack
我们看看 TransferStack 中的 transfer 方法是如何实现的。
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
// 1.队列为空或者模式相同
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
// 2.头结点还未开始匹配
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
// 3.头结点正在匹配
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
可以看到 TransferStack 的 transfer 方法一共有三个分支,分别是:
- 第一,队列为空或者插入元素与头结点模式相同。此时,将插入元素插入链表中。
- 第二,队列不为空且插入元素模式不同,并且头结点还未开始匹配。此时,开始与头结点匹配。
- 第三,队列不为空且插入元素模式不同,但头结点正在匹配。此时,帮助头结点进行匹配。
我们先看第一个分支:队列为空或者插入元素与头结点模式相同。
// 1.队列为空或者模式相同
if (h == null || h.mode == mode) { // empty or same-mode
// 1.1 如果超时了,那么弹出头结点,将后继节点设置为头结点
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
// 1.2 将新插入节点 s 设置为头结点,后继节点为旧的头结点(是栈结构)
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 1.3 进入等待
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // s 节点被取消
clean(s);
return null;
}
// 1.4 如果有节点与s节点匹配,那么设置头结点为s的尾节点。
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
上述代码的逻辑大致与 TransferQueue 的相同,但因为其实栈的结构,所以其在插入节点时(即1.2)是将其设置为头结点,而不是设置为尾节点。此外,1.4 中的代码场景是其被唤醒,此时有一个节点在s节点之上,且此节点是head节点。结合这个假设去理解就会很快明白。
接着我们看第二个分支:头结点还未开始匹配。
if (!isFulfilling(h.mode)) { // try to fulfill
// 2.头结点还未开始匹配
// 2.1 头结点被取消,那么直接弹出头结点
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
// 2.2 设置头结点为s节点,并且设置其mode为正在匹配
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // 不断尝试匹配,直到成功匹配或者等待匹配节点消失
SNode m = s.next; // m is s's match
// 2.3 如果s节点的后继节点为空,那么表示其没有可匹配的节点了
// 于是直接退出循环,重新来。
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
// 2.4 尝试用m节点匹配s节点,即将m节点的match属性设为s节点
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
}
接下来我们看看第三个分支:队列不为空且插入元素模式不同,但头结点正在匹配。此时,帮助头结点进行匹配。
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // 匹配成功,那么直接跳过两个节点
casHead(h, mn); // pop both h and m
else // 匹配失败,直接丢掉
h.casNext(m, mn); // help unlink
}
总结
SynchronousQueue 用于交换数据,但是其本身并不保存数据。其有两种策略,分别是:公平策略和非公平策略。如果是公平策略,那么使用 TransferQueue 来实现,表示先到的先交换数据。如果是非公平策略,那么使用 TransferStack 实现,表示先到的反而后获取数据。
最后,让我们用几个关键词概括 SynchronousQueue:无界队列、链表实现、无锁实现。