DelayedWorkQueue 是 ScheduledThreadPoolExecutor 中的内部类,它与 DelayQueue 一样继承了 AbstractQueue 接口,实现了 BlockingQueue 接口。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable>
其与 DelayQueue 区别不大,只不过队列元素是 RunnableScheduledFuture。所以对于 DelayedWorkQueue 来说,它要求所有入队元素都实现 RunnableScheduledFuture 接口。
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
原理
要了解 DelayedWorkQueue 是如何实现的,那么必须深入源码才行。下面我们将从类成员变量、构造方法、核心方法两个方面逐一介绍。
类成员变量
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
可以看到 DelayedWorkQueue 类中数组来存储数据,并且有初始化为 16 的容量大小属性,并且使用 ReentrantLock 来实现并发控制。
构造方法
对于 DelayedWorkQueue 来说,其只有默认的空构造方法。
核心方法
对于 DelayedWorkQueue 来说,最核心的就是那三对存取方法:
- add/remove
- offer/poll
- put/take
add/remove方法
我们先看 add 方法的实现。
public boolean add(Runnable e) {
return offer(e);
}
可以看到其直接调用了 offer 方法的实现,这里我们暂时不深入,等到 offer 方法中一并分析。
我们继续看 remove 方法的实现。
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 1.获取元素下标
int i = indexOf(x);
if (i < 0)
return false;
setIndex(queue[i], -1);
int s = --size;
// 2.清除元素信息
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
// 3.维护二叉树结构
if (s != i) {
siftDown(i, replacement);
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
上述代码的大致逻辑为:首先获取元素的下标,之后清除最后一个元素的值,接着拿最后一个元素插入移除元素的位置,最后调用 siftDown、siftUp 方法维护二叉树的结构。
我们继续看看 siftDown 方法。
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
这里的下沉方法与我们之前讲的 PriorityBlockingQueue 类似,这里不再赘述。
我们继续看看 siftUp 方法。
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
同样地,这里的 siftUp 方法与之前在 PriorityBlockingQueue 类似,不再赘述。
offer/poll方法
我们看一下 offer 方法的实现。
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 1.队列已满则扩容
if (i >= queue.length)
grow();
size = i + 1;
// 2.如果插入的元素不是第一个元素,那么进行上浮操作。
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
// 3.如果插入的元素是第一个元素,那么唤醒其他等待线程
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
上述代码的大致逻辑为:首先,判断队列是否已满。如果队列已满,那么进行扩容。
private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}
从上述扩容代码可以看出,一般情况下队列会以 50% 的增速扩容。
接着会判断插入的元素是否是第一个元素。如果是,那么直接跳过。否则,需要进行上浮操作,维护二叉树的结构。
// 2.如果插入的元素不是第一个元素,那么进行上浮操作。
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
上浮操作我们上面已经说到,这里不再赘述。
最后如果插入的元素是第一个元素,那么说明可能会有其他线程在等待获取元素,那么需要将 leader 置为 null,并唤醒其他线程。
// 3.如果插入的元素是第一个元素,那么唤醒其他等待线程
if (queue[0] == e) {
leader = null;
available.signal();
}
我们继续看看 poll 方法的实现。
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return finishPoll(first);
} finally {
lock.unlock();
}
}
从上述代码可以看到,其与 DelayQueue 的实现几乎一致。首先都是判断队头是否为空或者队头元素是否未到期,如果任何一个为真,那么直接返回 null。否则,调用 finishPoll 方法弹出元素。
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
在 finishPoll 方法中,我们可以看到其逻辑大致是:拿最后一个元素替代队头元素,并进行二叉树结构的维护(下沉),最后返回队头元素。
put/take方法
我们先看看 put 方法的实现。
public void put(Runnable e) {
offer(e);
}
可以看到其直接调用 offer 方法的实现,这里就不再赘述了。
继续看看 take 方法的实现。
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 1.如果队列为空,那么阻塞等待
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// 2.如果队头已经到期,那么直接弹出
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
// 3.如果队头未到期,而又有其他线程在等待,那么阻塞等待
if (leader != null)
available.await();
else {
// 4.如果没有其他线程等待,那么阻塞指定秒数后开始自旋
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 5.如果没有线程在获取元素,而此时队列又不为空,那么唤醒可能在等待的线程
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
上述代码的逻辑与 DelayedQueue 的 take 方法完全一致,这里不再赘述。
总结
DelayedWorkQueue 类其实一个特殊的 DelayQueue 类,其唯一的不同是:DelayQueue 内部用 PriorityQueue 来维护元素的二叉树结构。而 DelayedWorkQueue 因为是专为线程池设计,所以其内部用 RunnableScheduledFuture 数组重新实现了一遍二叉树结构。
最后,让我们用几个关键词概括 DelayedWorkQueue:伪无界队列、数组实现、单锁实现、优先级队列。