LinkedTransferQueue 是在 JDK1.7 时,J.U.C 包新增的一种比较特殊的阻塞队列,它除了具备阻塞队列的常用功能外,还有一个比较特殊的 transfer 方法。
public class LinkedTransferQueue<E> extends AbstractQueue<E>
implements TransferQueue<E>, java.io.Serializable
从其继承结构来看,我们可以看到其实现了 TransferQueue 接口。
public interface TransferQueue<E> extends BlockingQueue<E>{
boolean tryTransfer(E e);
void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
}
从 TransferQueue 接口中可以看到其继承了 BlockingQueue 接口,所以是间接实现了 BlockingQueue 接口。在 TransferQueue 接口中多了 3 个方法:
void transfer(E e)
当生产者线程调用 transfer 方法时,如果没有消费者等待接收元素,那么一直阻塞等待到消费者消费后才返回。
tryTransfer(E e)
当生产者线程调用 tryTransfer 方法时,如果没有消费者等待接收元素,则会立即返回false。该方法和 transfer 方法的区别就是 tryTransfer 方法无论消费者是否接收,方法立即返回,而 transfer 方法必须等到消费者消费后才返回。
tryTransfer(E e, long timeout, TimeUnit unit)
tryTransfer(E e,long timeout,TimeUnit unit)方法则是加上了限时等待功能,如果没有消费者消费该元素,则等待指定的时间再返回。如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。
原理
下面我们将从类成员变量、构造方法、核心方法两个方面逐一介绍。
类成员变量
transient volatile Node head;
private transient volatile Node tail;
private static final int NOW = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer
可以看到其有一个 head 和 tail 的节点元素,指向队列的头尾。除此之外我们看到有 4 个属性(NOW/ASYNC/SYNC/TIMED),它们是 xfer 方法 how 参数的值。
NOW 表示即时操作(可能失败),即不会阻塞调用线程。
poll(获取并移除队首元素,如果队列为空,直接返回null);tryTransfer(尝试将元素传递给消费者,如果没有等待的消费者,则立即返回false,也不会将元素入队)
ASYNC 表示异步操作(必然成功)
offer(插入指定元素至队尾,由于是无界队列,所以会立即返回true);put(插入指定元素至队尾,由于是无界队列,所以会立即返回);add(插入指定元素至队尾,由于是无界队列,所以会立即返回true)。
SYNC 表示同步操作(阻塞调用线程)
transfer(阻塞直到出现一个消费者线程);take(从队首移除一个元素,如果队列为空,则阻塞线程)
TIMED表示限时同步操作(限时阻塞调用线程)
poll(long timeout, TimeUnit unit);tryTransfer(E e, long timeout, TimeUnit unit)
而队列节点为一个单向链表,我们可以看下 Node 节点的声明。
static final class Node {
final boolean isData; // false if this is a request node
volatile Object item;
volatile Node next;
volatile Thread waiter;
}
关于Node结点,有以下几点需要特别注意:
- Node结点有两种类型:数据结点、请求结点,通过字段isData区分,只有不同类型的结点才能相互匹配。
- Node结点的值保存在item字段,匹配前后值会发生变化;
Node结点的状态变化如下表:
节点/状态 | 数据节点 | 请求节点 |
---|---|---|
匹配前 | isData = true; item = 数据结点值 | isData = false; item = null |
匹配后 | isData = true; item = null | isData = false; item = this |
从上表也可以看出,对于一个数据结点,当item == null表示匹配成功;对于一个请求结点,当item == this表示匹配成功。归纳起来,匹配成功的结点Node就是满足(Node.item == this) || ((Node.item == null) == Node.isData)。
核心方法
对于 LinkedTransferQueue 来说,最核心的就是那三对存取方法:
- add/remove
- offer/poll
- put/take
add/remove方法
add 方法调用 xfer 实现,我们后续分析 xfer 方法时再详细讲解。
public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
我们继续看看 remove 方法的实现。
public boolean remove(Object o) {
return findAndRemove(o);
}
在看上面的 findAndRemove 方法之前,我们应该先了解一下背景。LinkedTransferQueue 是一个交换数据的阻塞队列,与 SynchronousQueue 一样,其每次都会将等待获取或插入数据的线程排列在队列中。因此在队列中,只可能出现一种模式的节点(即REQUEST或DATA)。
我们继续看看 findAndRemove 方法的实现。
private boolean findAndRemove(Object e) {
if (e != null) {
// 1.从头开始遍历,寻找值为e的数据节点
for (Node pred = null, p = head; p != null; ) {
Object item = p.item;
// 2.如果头结点是数据节点,那么比较p节点item值与e节点是否一致
// 如果一致,那么直接移出队列。
if (p.isData) {
if (item != null && item != p && e.equals(item) &&
p.tryMatchData()) {
unsplice(pred, p);
return true;
}
}
// 3.如果头结点是请求节点,并且item值为空
// 那么表示整个队列都是请求节点,那么不需要比较了,直接返回false
else if (item == null)
break;
// 4.如果头结点是请求节点,但是item值不为空
// 那么表示头结点是刚刚完成匹配,那么继续寻找头结点的后继节点比较
pred = p;
if ((p = p.next) == pred) { // stale
// 如果 p = p.next == pred
// 就是表示 p 节点指向了自己,这种情况下是 p 节点被移除
// 那么就直接初始化 pred 和 p 节点,p 节点直接指向头结点
// 重新开始比较
pred = null;
p = head;
}
}
}
return false;
}
offer/poll方法
offer 方法调用 xfer 方法实现。
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
poll 方法同样调用 xfer 方法实现。
public E poll() {
return xfer(null, false, NOW, 0);
}
put/take方法
put 方法调用 xfer 方法实现。
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
take 方法调用 xfer 方法实现。
public E take() throws InterruptedException {
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
那么下一步我们直接分析 xfer 方法。
xfer方法
private E xfer(E e, boolean haveData, int how, long nanos) {
if (haveData && (e == null))
throw new NullPointerException();
Node s = null;
retry:
for (;;) { // restart on append race
// 1.进行匹配
for (Node h = head, p = h; p != null;) {
boolean isData = p.isData;
Object item = p.item;
// 1.1 从头结点开始,寻找未匹配的节点。
// 如果节点已经匹配,那么直接跳过该节点。
if (item != p && (item != null) == isData) { // unmatched
// 1.2 p节点未匹配,那么开始进行匹配。
if (isData == haveData) // can't match
break;
// 1.3 尝试将将p节点与e节点匹配,如果匹配失败,那么直接跳过
if (p.casItem(item, e)) { // match
// 1.4 匹配成功,那么判断是否需要修改头结点
// 这里的 q != h 表示其并不是第一次就找到了未匹配的节点
// 因为如果第一次就找到了匹配的点,那么 q=p=h=head
// 正是 LinkedTransferQueue 并不需要每次都改变
// 头结点的位置,而是让至少将 slack 保持在 2 之内
// 所以才需要做这么一个判断
for (Node q = p; q != h;) {
Node n = q.next;
// 1.5 head == h 表示是否有其他线程修改过 head 节点
// 如果head != h,那么表示有其他线程修改过 head 节点
// 那么可能其他线程已经设置好head节点,slack<2了。
if (head == h && casHead(h, n == null ? q : n))
{
h.forgetNext();
break;
} // advance and retry
// 这里是判断头结点的slack是否在2之内
// 如果是,那么直接跳过,否则重新判断。
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 1.6 唤醒等待在p节点上的线程
LockSupport.unpark(p.waiter);
return LinkedTransferQueue.<E>cast(item);
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 2.插入队列
if (how != NOW) { // No matches available
// 2.1 如果不要求立即返回,那么将其放入队列尾部
if (s == null)
s = new Node(e, haveData);
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
// 2.2 如果是SYNC或TIMED,即要求同步或超时返回,那么就阻塞等待
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
对比
对比 LinkedTransferQueue 与 SynchronousQueue 的实现,我们会发现许多地方都很相似。它们都是首先判断队列是否为空或插入元素与队头元素是否一致,如果是的话就插入队列。否则寻找未匹配的节点,与他们进行匹配。
但 LinkedTransferQueue 与 SynchronousQueue 的不同点在于,LinkedTransferQueue 的实现是公平模式,即先到先得。而 SynchronousQueue 则是有两种模式,即公平和非公平,分别对应 TransferQueue 和 TransferStack。
此外,为了节省 CAS 操作的开销,LinkedTransferQueue 使用了松弛(slack)操作:
在结点被匹配(被删除)之后,不会立即更新队列的head、tail,而是当 head、tail结点与最近一个未匹配的结点之间的距离超过“松弛阀值”后才会更新(默认为 2)。这个「松弛阀值」一般为1到3,如果太大会增加沿链表查找未匹配结点的时间,太小会增加 CAS 的开销。
查了不少资料,有许多博客都说:SynchronousQueue 是没有大小的,而 LinkedTransferQueue 则是有大小的,能存储节点数据。但实际上无论是 SynchronousQueue 还是 LinkedTransferQueue 的节点类中,都有 item 属性来存储节点值。所以说,为什么 SynchronousQueue 是没有大小的,而 LinkedTransferQueue 则是有大小的呢?
感兴趣的朋友可以思考一下。
总结
LinkedTransferQueue 的实现思路基本上与 SynchronousQueue 非常一致,就连设计思路都非常一致。但是它们最大的区别是什么呢?这个问题需要留给我们思考一下。
最后,让我们用几个关键词概括 LinkedTransferQueue:无界队列、链表实现、无锁实现。