DelayQueue 是 JDK 1.5 时,随着 J.U.C 包一起引入的一种阻塞队列,它实现了 BlockingQueue 接口,底层基于已有的 PriorityBlockingQueue 实现:
DelayQueue 也是一种比较特殊的阻塞队列,从类声明也可以看出,DelayQueue 中的所有元素必须实现 Delayed 接口:
/**
* 一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。
* <p>
* 此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。
*/
public interface Delayed extends Comparable<Delayed> {
/**
* 返回与此对象相关的剩余有效时间,以给定的时间单位表示.
*/
long getDelay(TimeUnit unit);
}
可以看到,Delayed 接口除了自身的 getDelay 方法外,还实现了 Comparable 接口。 getDelay 方法用于返回对象的剩余有效时间,实现 Comparable 接口则是为了能够比较两个对象,以便排序。
也就是说,如果一个类实现了 Delayed 接口,当创建该类的对象并添加到 DelayQueue 中后,只有当该对象的 getDalay 方法返回的剩余时间 ≤0 时才会出队。
特性
DelayQueue的特点简要概括如下:
- DelayQueue 使用 PriorityBlockingQueue 实现,所以其本质上还是无界阻塞队列。
- 队列中的元素必须实现 Delayed 接口,元素过期后才会从队列中取走。
- 由于 DelayQueue 是按照元素的权重进入排序,所以队列中的元素必须是可以比较的,也就是说元素必须实现 Comparable 接口。
示例
为了理解 DelayQueue 的功能,我们用一个例子来看看 DelayQueue 是如何使用的。
首先,定义一个 DelayNode 类,作为队列元素,该类必须实现 Delayed 接口。
class DelayNode implements Delayed{
private Integer seq;
private long expireTime;
public DelayNode(long expireTime, Integer seq) {
this.expireTime = expireTime;
this.seq = seq;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - new Date().getTime(), TimeUnit.MILLISECONDS);
}
/**
* 按照序号优先级排序
* @param other
* @return
*/
@Override
public int compareTo(Delayed other) {
if (other == this)
return 0;
if (other instanceof DelayNode) {
DelayNode x = (DelayNode) other;
if (this.getSeq() > x.getSeq()) {
return 1;
} else if (this.getSeq() == x.getSeq()) {
return 0;
} else if (this.getSeq() < x.getSeq()) {
return -1;
}
}
// 一般不会执行到这里
System.out.println("error.");
return 0;
}
public Integer getSeq() {
return seq;
}
@Override
public String toString() {
return "DelayNode{" +
"expireTime=" + expireTime +
", seq=" + seq +
'}';
}
}
在实现 Delayed 接口的 getDelay 接口时要注意:必须要将失效时间转成传入的时间单位,即要用 unit.convert() 方法进行一次转化,否则会出现不可预见的错误。
接着我们写一个简单的例子来测试 DelayQueue 的使用。
public class DelayQueueDemo {
public static void main(String[] args) throws Exception{
putTakeDemo();
}
public static void putTakeDemo() throws Exception{
BlockingQueue delayQueue = new DelayQueue<>();
long now = new Date().getTime();
delayQueue.put(new DelayNode(now+3000, 5));
delayQueue.put(new DelayNode(now+4000, 1));
delayQueue.put(new DelayNode(now+6000, 14));
delayQueue.put(new DelayNode(now+8000, 8));
System.out.println(delayQueue);
for(int i=0; i<4; i++) {
System.out.println(delayQueue.take());
}
}
}
在上面的例子,我们可以看到我们插入了 4 个元素,并且设置了不同的序号。因为我们 DelayNode 中 comparable 的实现是根据序号大小,所以无论过期时间设置成多少。其元素的输出顺序永远都是:1、5、8、14。下面是其中一次的输出结果:
DelayNode{expireTime=1553212254636, seq=1}
DelayNode{expireTime=1553212253636, seq=5}
DelayNode{expireTime=1553212258636, seq=8}
DelayNode{expireTime=1553212256636, seq=14}
原理
要了解 DelayQueue 是如何实现的,那么必须深入源码才行。下面我们将从类成员变量、构造方法、核心方法两个方面逐一介绍。
类成员变量
private final transient ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
可以看到 DelayQueue 类中使用了 PriorityQueue 来保存数据,使用 ReentrantLock 来实现并发控制。
构造方法
了解构造方法是如何实现的,可以知道其实怎么初始化数据的,这对于后续的深入研究其实现有着非常重要的作用。对于 PriorityBlockingQueue 来说,其有两个构造方法:
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
上述方法也比较简单,这里不深入介绍。
核心方法
在之前讲解 BlockingQueue 的时候,我们说过阻塞队列最核心的就是那三对存取方法:
- add/remove
- offer/poll
- put/take
add/remove方法
我们先看看 add 方法的实现:
public boolean add(E e) {
return offer(e);
}
可以看到其直接调用了 offer 方法的实现,我们一起留在后面看。
我们继续看 remove 方法的实现。
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
}
可以看到其直接调用了 PriorityQueue 的 remove 方法将元素移除了。关于 PriorityQueue 的 remove 方法这里就不再深入,有兴趣的可以自己了解。
offer/poll方法
我们看一下 offer 方法的实现。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
// 如果入队元素在队首, 则唤醒一个出队线程
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
可以看到其获取锁之后,直接调用了 PriorityQueue 的 offer 方法将其入列。之后判断队列头是否是插入的元素?如果插入的元素就在队头,那么表示插入前队列为空,此时可能有出队线程在等待获取元素了。那么将 leader 属性设置为空,并且唤醒其他等待获取元素的线程。
我们继续看下 poll 方法的实现。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
可以看到其在获取锁之后,判断队头元素是否为空或是否到期?如果队头为空或者时间未到,那么直接返回 null,否则直接将元素弹出。
put/take方法
put 方法的实现是调用了 offer 方法的实现,这里不赘述。
public void put(E e) {
offer(e);
}
我们继续看看 take 方法的实现。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 1.获取锁
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
// 2.如果队列为空,那么等待
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// 3.如果队列不为空,并且元素过期了,那么直接弹出
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
// 4.如果 leader 不为空,表示有其他线程在等待,那么阻塞等待
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
// 被唤醒后重置leader属性为null
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
上述代码的大致逻辑为:获取锁之后进去自旋循环中。如果队列为空,那么阻塞等待。如果队列不为空,那么继续判断元素是否到期。如果元素到期了,那么直接弹出。如果元素未到期,并且已经有其他线程在等待了,那么直接阻塞等待唤醒,否则将 leader 属性设置为当前线程并阻塞等待至元素过期。
从这块的代码我们可以看出 leader 属性的作用。其实它的存在是为了避免高并发时自旋浪费 CPU 资源。试想一下,如果现在已经有线程在自旋进行入列或出列操作了,那么这时候再继续进行自旋只会增加 CPU 消耗,所以最好的办法是等待该线程完成之后再进行操作,这样可以减少竞争,提高效率。
在上面代码中,线程自旋结束后一段代码:
if (leader == null && q.peek() != null)
available.signal();
其中 leader == null && q.peek() != null
的含义是:没有 leader 线程但队列中存在元素,那么就唤醒等待的元素。因为此时可能有其他等待获取元素的线程。如果不唤醒,那么出队线程永远无法执行。
对比
DelayQueue 与 PriorityBlockingQueue 有相似之处,它们都具有优先级队列的特性,因为它们底层都使用了二叉树数组实现。但 DelayQueue 比起 PriorityBlockingQueue 还多了一个延迟属性,可以设置延迟到某个时间再出列。
总结
DelayQueue 的重点在于掌握其使用方法以及其原理。
DelayQueue 的每个元素都需要实现 Delayed 接口,并且实现 getDelay() 方法时,需要将返回值其转化指定的时间单位。这一点是非常重要的,如果没有进行转化,那么会导致不可预知的问题。此外,元素出队的顺序并不是由其剩余时间决定,而是由元素实现的 comparble 接口决定。这一点也是许多人会搞错的。
对于 DelayQueue 的实现,其实也并不复杂。其直接使用了 PriorityBlockingQueue 来实现数据的维护,使用 ReentrantLock 实现并发控制。因为其使用 PriorityBlockingQueue 实现,而本质上 PriorityBlockingQueue 是有最大值限制的,所以其并不是无界队列,而是有界队列。
在 DelayQueue 的具体实现上,其使用了 leader 属性保存了第一个等待获取元素的线程,从而避免了过多线程进行 CPU 自旋浪费资源。此外,还使用了 awaitNano 方法最大限度地避免 CPU 无效空转,这些都是非常好的设计思路。
最后,让我们用几个关键词概括 DelayQueue:伪无界队列、数组实现、单锁实现、优先级队列。