阻塞队列源码系列(六):DelayedWorkQueue

Posted by 陈树义 on 2021-06-20

DelayedWorkQueue 是 ScheduledThreadPoolExecutor 中的内部类,它与 DelayQueue 一样继承了 AbstractQueue 接口,实现了 BlockingQueue 接口。

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable>

其与 DelayQueue 区别不大,只不过队列元素是 RunnableScheduledFuture。所以对于 DelayedWorkQueue 来说,它要求所有入队元素都实现 RunnableScheduledFuture 接口。

private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;

原理

要了解 DelayedWorkQueue 是如何实现的,那么必须深入源码才行。下面我们将从类成员变量、构造方法、核心方法两个方面逐一介绍。

类成员变量

private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;

可以看到 DelayedWorkQueue 类中数组来存储数据,并且有初始化为 16 的容量大小属性,并且使用 ReentrantLock 来实现并发控制。

构造方法

对于 DelayedWorkQueue 来说,其只有默认的空构造方法。

核心方法

对于 DelayedWorkQueue 来说,最核心的就是那三对存取方法:

  • add/remove
  • offer/poll
  • put/take

add/remove方法

我们先看 add 方法的实现。

public boolean add(Runnable e) {
    return offer(e);
}

可以看到其直接调用了 offer 方法的实现,这里我们暂时不深入,等到 offer 方法中一并分析。

我们继续看 remove 方法的实现。

public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 1.获取元素下标
        int i = indexOf(x);
        if (i < 0)
            return false;
        setIndex(queue[i], -1);
        int s = --size;
        // 2.清除元素信息
        RunnableScheduledFuture<?> replacement = queue[s];
        queue[s] = null;
        // 3.维护二叉树结构
        if (s != i) {
            siftDown(i, replacement);
            if (queue[i] == replacement)
                siftUp(i, replacement);
        }
        return true;
    } finally {
        lock.unlock();
    }
}

上述代码的大致逻辑为:首先获取元素的下标,之后清除最后一个元素的值,接着拿最后一个元素插入移除元素的位置,最后调用 siftDown、siftUp 方法维护二叉树的结构。

我们继续看看 siftDown 方法。

private void siftDown(int k, RunnableScheduledFuture<?> key) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        int right = child + 1;
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo(c) <= 0)
            break;
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}

这里的下沉方法与我们之前讲的 PriorityBlockingQueue 类似,这里不再赘述。

我们继续看看 siftUp 方法。

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}

同样地,这里的 siftUp 方法与之前在 PriorityBlockingQueue 类似,不再赘述。

offer/poll方法

我们看一下 offer 方法的实现。

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        // 1.队列已满则扩容
        if (i >= queue.length)
            grow();
        size = i + 1;
        // 2.如果插入的元素不是第一个元素,那么进行上浮操作。
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);
        }
        // 3.如果插入的元素是第一个元素,那么唤醒其他等待线程
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

上述代码的大致逻辑为:首先,判断队列是否已满。如果队列已满,那么进行扩容。

private void grow() {
    int oldCapacity = queue.length;
    int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
    if (newCapacity < 0) // overflow
        newCapacity = Integer.MAX_VALUE;
    queue = Arrays.copyOf(queue, newCapacity);
}

从上述扩容代码可以看出,一般情况下队列会以 50% 的增速扩容。

接着会判断插入的元素是否是第一个元素。如果是,那么直接跳过。否则,需要进行上浮操作,维护二叉树的结构。

// 2.如果插入的元素不是第一个元素,那么进行上浮操作。
if (i == 0) {
    queue[0] = e;
    setIndex(e, 0);
} else {
    siftUp(i, e);
}

上浮操作我们上面已经说到,这里不再赘述。

最后如果插入的元素是第一个元素,那么说明可能会有其他线程在等待获取元素,那么需要将 leader 置为 null,并唤醒其他线程。

// 3.如果插入的元素是第一个元素,那么唤醒其他等待线程
if (queue[0] == e) {
    leader = null;
    available.signal();
}

我们继续看看 poll 方法的实现。

public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        RunnableScheduledFuture<?> first = queue[0];
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return finishPoll(first);
    } finally {
        lock.unlock();
    }
}

从上述代码可以看到,其与 DelayQueue 的实现几乎一致。首先都是判断队头是否为空或者队头元素是否未到期,如果任何一个为真,那么直接返回 null。否则,调用 finishPoll 方法弹出元素。

private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    int s = --size;
    RunnableScheduledFuture<?> x = queue[s];
    queue[s] = null;
    if (s != 0)
        siftDown(0, x);
    setIndex(f, -1);
    return f;
}

在 finishPoll 方法中,我们可以看到其逻辑大致是:拿最后一个元素替代队头元素,并进行二叉树结构的维护(下沉),最后返回队头元素。

put/take方法

我们先看看 put 方法的实现。

public void put(Runnable e) {
    offer(e);
}

可以看到其直接调用 offer 方法的实现,这里就不再赘述了。

继续看看 take 方法的实现。

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            // 1.如果队列为空,那么阻塞等待
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                // 2.如果队头已经到期,那么直接弹出
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                // 3.如果队头未到期,而又有其他线程在等待,那么阻塞等待
                if (leader != null)
                    available.await();
                else {
                    // 4.如果没有其他线程等待,那么阻塞指定秒数后开始自旋
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 5.如果没有线程在获取元素,而此时队列又不为空,那么唤醒可能在等待的线程
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

上述代码的逻辑与 DelayedQueue 的 take 方法完全一致,这里不再赘述。

总结

DelayedWorkQueue 类其实一个特殊的 DelayQueue 类,其唯一的不同是:DelayQueue 内部用 PriorityQueue 来维护元素的二叉树结构。而 DelayedWorkQueue 因为是专为线程池设计,所以其内部用 RunnableScheduledFuture 数组重新实现了一遍二叉树结构。

最后,让我们用几个关键词概括 DelayedWorkQueue:伪无界队列、数组实现、单锁实现、优先级队列。