阻塞队列源码系列(八):LinkedTransferQueue

Posted by 陈树义 on 2021-06-20

LinkedTransferQueue 是在 JDK1.7 时,J.U.C 包新增的一种比较特殊的阻塞队列,它除了具备阻塞队列的常用功能外,还有一个比较特殊的 transfer 方法。

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable

从其继承结构来看,我们可以看到其实现了 TransferQueue 接口。

public interface TransferQueue<E> extends BlockingQueue<E>{
    boolean tryTransfer(E e);
    void transfer(E e) throws InterruptedException;
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
}

从 TransferQueue 接口中可以看到其继承了 BlockingQueue 接口,所以是间接实现了 BlockingQueue 接口。在 TransferQueue 接口中多了 3 个方法:

void transfer(E e)

当生产者线程调用 transfer 方法时,如果没有消费者等待接收元素,那么一直阻塞等待到消费者消费后才返回。

tryTransfer(E e)

当生产者线程调用 tryTransfer 方法时,如果没有消费者等待接收元素,则会立即返回false。该方法和 transfer 方法的区别就是 tryTransfer 方法无论消费者是否接收,方法立即返回,而 transfer 方法必须等到消费者消费后才返回。

tryTransfer(E e, long timeout, TimeUnit unit)

tryTransfer(E e,long timeout,TimeUnit unit)方法则是加上了限时等待功能,如果没有消费者消费该元素,则等待指定的时间再返回。如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

原理

下面我们将从类成员变量、构造方法、核心方法两个方面逐一介绍。

类成员变量

transient volatile Node head;
private transient volatile Node tail;
private static final int NOW   = 0; // for untimed poll, tryTransfer
private static final int ASYNC = 1; // for offer, put, add
private static final int SYNC  = 2; // for transfer, take
private static final int TIMED = 3; // for timed poll, tryTransfer

可以看到其有一个 head 和 tail 的节点元素,指向队列的头尾。除此之外我们看到有 4 个属性(NOW/ASYNC/SYNC/TIMED),它们是 xfer 方法 how 参数的值。

NOW 表示即时操作(可能失败),即不会阻塞调用线程。

poll(获取并移除队首元素,如果队列为空,直接返回null);tryTransfer(尝试将元素传递给消费者,如果没有等待的消费者,则立即返回false,也不会将元素入队)

ASYNC 表示异步操作(必然成功)

offer(插入指定元素至队尾,由于是无界队列,所以会立即返回true);put(插入指定元素至队尾,由于是无界队列,所以会立即返回);add(插入指定元素至队尾,由于是无界队列,所以会立即返回true)。

SYNC 表示同步操作(阻塞调用线程)

transfer(阻塞直到出现一个消费者线程);take(从队首移除一个元素,如果队列为空,则阻塞线程)

TIMED表示限时同步操作(限时阻塞调用线程)

poll(long timeout, TimeUnit unit);tryTransfer(E e, long timeout, TimeUnit unit)

而队列节点为一个单向链表,我们可以看下 Node 节点的声明。

static final class Node {
    final boolean isData;   // false if this is a request node
    volatile Object item;       
    volatile Node next;
    volatile Thread waiter;
}

关于Node结点,有以下几点需要特别注意:

  • Node结点有两种类型:数据结点、请求结点,通过字段isData区分,只有不同类型的结点才能相互匹配。
  • Node结点的值保存在item字段,匹配前后值会发生变化;

Node结点的状态变化如下表:

节点/状态数据节点请求节点
匹配前isData = true; item = 数据结点值isData = false; item = null
匹配后isData = true; item = nullisData = false; item = this

从上表也可以看出,对于一个数据结点,当item == null表示匹配成功;对于一个请求结点,当item == this表示匹配成功。归纳起来,匹配成功的结点Node就是满足(Node.item == this) || ((Node.item == null) == Node.isData)。

核心方法

对于 LinkedTransferQueue 来说,最核心的就是那三对存取方法:

  • add/remove
  • offer/poll
  • put/take

add/remove方法

add 方法调用 xfer 实现,我们后续分析 xfer 方法时再详细讲解。

public boolean add(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

我们继续看看 remove 方法的实现。

public boolean remove(Object o) {
    return findAndRemove(o);
}

在看上面的 findAndRemove 方法之前,我们应该先了解一下背景。LinkedTransferQueue 是一个交换数据的阻塞队列,与 SynchronousQueue 一样,其每次都会将等待获取或插入数据的线程排列在队列中。因此在队列中,只可能出现一种模式的节点(即REQUEST或DATA)。

我们继续看看 findAndRemove 方法的实现。

private boolean findAndRemove(Object e) {
    if (e != null) {
        // 1.从头开始遍历,寻找值为e的数据节点
        for (Node pred = null, p = head; p != null; ) {
            Object item = p.item; 
            // 2.如果头结点是数据节点,那么比较p节点item值与e节点是否一致
            // 如果一致,那么直接移出队列。
            if (p.isData) {
                if (item != null && item != p && e.equals(item) &&
                    p.tryMatchData()) {
                    unsplice(pred, p);
                    return true;
                }
            }
            // 3.如果头结点是请求节点,并且item值为空
            // 那么表示整个队列都是请求节点,那么不需要比较了,直接返回false
            else if (item == null)
                break;
            // 4.如果头结点是请求节点,但是item值不为空
            // 那么表示头结点是刚刚完成匹配,那么继续寻找头结点的后继节点比较
            pred = p;
            if ((p = p.next) == pred) { // stale
                // 如果 p = p.next == pred
                // 就是表示 p 节点指向了自己,这种情况下是 p 节点被移除
                // 那么就直接初始化 pred 和 p 节点,p 节点直接指向头结点
                // 重新开始比较
                pred = null;
                p = head;
            }
        }
    }
    return false;
}

offer/poll方法

offer 方法调用 xfer 方法实现。

public boolean offer(E e) {
    xfer(e, true, ASYNC, 0);
    return true;
}

poll 方法同样调用 xfer 方法实现。

public E poll() {
    return xfer(null, false, NOW, 0);
}

put/take方法

put 方法调用 xfer 方法实现。

public void put(E e) {
    xfer(e, true, ASYNC, 0);
}

take 方法调用 xfer 方法实现。

public E take() throws InterruptedException {
    E e = xfer(null, false, SYNC, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

那么下一步我们直接分析 xfer 方法。

xfer方法

private E xfer(E e, boolean haveData, int how, long nanos) { 
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;      

    retry:
    for (;;) {                            // restart on append race
        // 1.进行匹配
        for (Node h = head, p = h; p != null;) {             
            boolean isData = p.isData;
            Object item = p.item;
            // 1.1 从头结点开始,寻找未匹配的节点。
            // 如果节点已经匹配,那么直接跳过该节点。
            if (item != p && (item != null) == isData) { // unmatched
                // 1.2 p节点未匹配,那么开始进行匹配。
                if (isData == haveData)   // can't match
                    break;
                // 1.3 尝试将将p节点与e节点匹配,如果匹配失败,那么直接跳过
                if (p.casItem(item, e)) { // match
                    // 1.4 匹配成功,那么判断是否需要修改头结点
                    // 这里的 q != h 表示其并不是第一次就找到了未匹配的节点
                    // 因为如果第一次就找到了匹配的点,那么 q=p=h=head
                    // 正是 LinkedTransferQueue 并不需要每次都改变
                    // 头结点的位置,而是让至少将 slack 保持在 2 之内
                    // 所以才需要做这么一个判断
                    for (Node q = p; q != h;) {
                        Node n = q.next;  
                        // 1.5 head == h 表示是否有其他线程修改过 head 节点
                        // 如果head != h,那么表示有其他线程修改过 head 节点
                        // 那么可能其他线程已经设置好head节点,slack<2了。
                        if (head == h && casHead(h, n == null ? q : n))             
                        {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        // 这里是判断头结点的slack是否在2之内
                        // 如果是,那么直接跳过,否则重新判断。
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    // 1.6 唤醒等待在p节点上的线程
                    LockSupport.unpark(p.waiter);
                    return LinkedTransferQueue.<E>cast(item);
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }
        // 2.插入队列
        if (how != NOW) {                 // No matches available
            // 2.1 如果不要求立即返回,那么将其放入队列尾部
            if (s == null)
                s = new Node(e, haveData);
            Node pred = tryAppend(s, haveData);
            if (pred == null)
                continue retry;           // lost race vs opposite mode
            // 2.2 如果是SYNC或TIMED,即要求同步或超时返回,那么就阻塞等待
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

对比

对比 LinkedTransferQueue 与 SynchronousQueue 的实现,我们会发现许多地方都很相似。它们都是首先判断队列是否为空或插入元素与队头元素是否一致,如果是的话就插入队列。否则寻找未匹配的节点,与他们进行匹配。

但 LinkedTransferQueue 与 SynchronousQueue 的不同点在于,LinkedTransferQueue 的实现是公平模式,即先到先得。而 SynchronousQueue 则是有两种模式,即公平和非公平,分别对应 TransferQueue 和 TransferStack。

此外,为了节省 CAS 操作的开销,LinkedTransferQueue 使用了松弛(slack)操作:
在结点被匹配(被删除)之后,不会立即更新队列的head、tail,而是当 head、tail结点与最近一个未匹配的结点之间的距离超过“松弛阀值”后才会更新(默认为 2)。这个「松弛阀值」一般为1到3,如果太大会增加沿链表查找未匹配结点的时间,太小会增加 CAS 的开销。

查了不少资料,有许多博客都说:SynchronousQueue 是没有大小的,而 LinkedTransferQueue 则是有大小的,能存储节点数据。但实际上无论是 SynchronousQueue 还是 LinkedTransferQueue 的节点类中,都有 item 属性来存储节点值。所以说,为什么 SynchronousQueue 是没有大小的,而 LinkedTransferQueue 则是有大小的呢?

感兴趣的朋友可以思考一下。

总结

LinkedTransferQueue 的实现思路基本上与 SynchronousQueue 非常一致,就连设计思路都非常一致。但是它们最大的区别是什么呢?这个问题需要留给我们思考一下。

最后,让我们用几个关键词概括 LinkedTransferQueue:无界队列、链表实现、无锁实现。