阻塞队列源码系列(五):DelayQueue

Posted by 陈树义 on 2021-06-20

DelayQueue 是 JDK 1.5 时,随着 J.U.C 包一起引入的一种阻塞队列,它实现了 BlockingQueue 接口,底层基于已有的 PriorityBlockingQueue 实现:

DelayQueue 也是一种比较特殊的阻塞队列,从类声明也可以看出,DelayQueue 中的所有元素必须实现 Delayed 接口:

/**
 * 一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。
 * <p>
 * 此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。
 */
public interface Delayed extends Comparable<Delayed> {

    /**
     * 返回与此对象相关的剩余有效时间,以给定的时间单位表示.
     */
    long getDelay(TimeUnit unit);
}

可以看到,Delayed 接口除了自身的 getDelay 方法外,还实现了 Comparable 接口。 getDelay 方法用于返回对象的剩余有效时间,实现 Comparable 接口则是为了能够比较两个对象,以便排序。

也就是说,如果一个类实现了 Delayed 接口,当创建该类的对象并添加到 DelayQueue 中后,只有当该对象的 getDalay 方法返回的剩余时间 ≤0 时才会出队。

特性

DelayQueue的特点简要概括如下:

  • DelayQueue 使用 PriorityBlockingQueue 实现,所以其本质上还是无界阻塞队列。
  • 队列中的元素必须实现 Delayed 接口,元素过期后才会从队列中取走。
  • 由于 DelayQueue 是按照元素的权重进入排序,所以队列中的元素必须是可以比较的,也就是说元素必须实现 Comparable 接口。

示例

为了理解 DelayQueue 的功能,我们用一个例子来看看 DelayQueue 是如何使用的。

首先,定义一个 DelayNode 类,作为队列元素,该类必须实现 Delayed 接口。

class DelayNode implements Delayed{

    private Integer seq;

    private long expireTime;

    public DelayNode(long expireTime, Integer seq) {
        this.expireTime = expireTime;
        this.seq = seq;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTime - new Date().getTime(), TimeUnit.MILLISECONDS);
    }

    /**
     * 按照序号优先级排序
     * @param other
     * @return
     */
    @Override
    public int compareTo(Delayed other) {
        if (other == this)
            return 0;
        if (other instanceof DelayNode) {
            DelayNode x = (DelayNode) other;
            if (this.getSeq() > x.getSeq()) {
                return 1;
            } else if (this.getSeq() == x.getSeq()) {
                return 0;
            } else if (this.getSeq() < x.getSeq()) {
                return -1;
            }
        }
        // 一般不会执行到这里
        System.out.println("error.");
        return 0;
    }

    public Integer getSeq() {
        return seq;
    }

    @Override
    public String toString() {
        return "DelayNode{" +
                "expireTime=" + expireTime +
                ", seq=" + seq +
                '}';
    }
}

在实现 Delayed 接口的 getDelay 接口时要注意:必须要将失效时间转成传入的时间单位,即要用 unit.convert() 方法进行一次转化,否则会出现不可预见的错误。

接着我们写一个简单的例子来测试 DelayQueue 的使用。

public class DelayQueueDemo {
    public static void main(String[] args) throws Exception{
        putTakeDemo();
    }

    public static void putTakeDemo() throws Exception{
        BlockingQueue delayQueue = new DelayQueue<>();
        long now = new Date().getTime();
        delayQueue.put(new DelayNode(now+3000, 5));
        delayQueue.put(new DelayNode(now+4000, 1));
        delayQueue.put(new DelayNode(now+6000, 14));
        delayQueue.put(new DelayNode(now+8000, 8));
        System.out.println(delayQueue);
        for(int i=0; i<4; i++) {
            System.out.println(delayQueue.take());
        }
    }
}

在上面的例子,我们可以看到我们插入了 4 个元素,并且设置了不同的序号。因为我们 DelayNode 中 comparable 的实现是根据序号大小,所以无论过期时间设置成多少。其元素的输出顺序永远都是:1、5、8、14。下面是其中一次的输出结果:

DelayNode{expireTime=1553212254636, seq=1}
DelayNode{expireTime=1553212253636, seq=5}
DelayNode{expireTime=1553212258636, seq=8}
DelayNode{expireTime=1553212256636, seq=14}

原理

要了解 DelayQueue 是如何实现的,那么必须深入源码才行。下面我们将从类成员变量、构造方法、核心方法两个方面逐一介绍。

类成员变量

private final transient ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;

可以看到 DelayQueue 类中使用了 PriorityQueue 来保存数据,使用 ReentrantLock 来实现并发控制。

构造方法

了解构造方法是如何实现的,可以知道其实怎么初始化数据的,这对于后续的深入研究其实现有着非常重要的作用。对于 PriorityBlockingQueue 来说,其有两个构造方法:

public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
    this.addAll(c);
}

上述方法也比较简单,这里不深入介绍。

核心方法

在之前讲解 BlockingQueue 的时候,我们说过阻塞队列最核心的就是那三对存取方法:

  • add/remove
  • offer/poll
  • put/take

add/remove方法

我们先看看 add 方法的实现:

public boolean add(E e) {
    return offer(e);
}

可以看到其直接调用了 offer 方法的实现,我们一起留在后面看。

我们继续看 remove 方法的实现。

public boolean remove(Object o) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return q.remove(o);
    } finally {
        lock.unlock();
    }
}

可以看到其直接调用了 PriorityQueue 的 remove 方法将元素移除了。关于 PriorityQueue 的 remove 方法这里就不再深入,有兴趣的可以自己了解。

offer/poll方法

我们看一下 offer 方法的实现。

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        // 如果入队元素在队首, 则唤醒一个出队线程
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

可以看到其获取锁之后,直接调用了 PriorityQueue 的 offer 方法将其入列。之后判断队列头是否是插入的元素?如果插入的元素就在队头,那么表示插入前队列为空,此时可能有出队线程在等待获取元素了。那么将 leader 属性设置为空,并且唤醒其他等待获取元素的线程。

我们继续看下 poll 方法的实现。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

可以看到其在获取锁之后,判断队头元素是否为空或是否到期?如果队头为空或者时间未到,那么直接返回 null,否则直接将元素弹出。

put/take方法

put 方法的实现是调用了 offer 方法的实现,这里不赘述。

public void put(E e) {
    offer(e);
}

我们继续看看 take 方法的实现。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 1.获取锁
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            // 2.如果队列为空,那么等待
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                // 3.如果队列不为空,并且元素过期了,那么直接弹出
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                // 4.如果 leader 不为空,表示有其他线程在等待,那么阻塞等待
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        // 被唤醒后重置leader属性为null
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

上述代码的大致逻辑为:获取锁之后进去自旋循环中。如果队列为空,那么阻塞等待。如果队列不为空,那么继续判断元素是否到期。如果元素到期了,那么直接弹出。如果元素未到期,并且已经有其他线程在等待了,那么直接阻塞等待唤醒,否则将 leader 属性设置为当前线程并阻塞等待至元素过期。

从这块的代码我们可以看出 leader 属性的作用。其实它的存在是为了避免高并发时自旋浪费 CPU 资源。试想一下,如果现在已经有线程在自旋进行入列或出列操作了,那么这时候再继续进行自旋只会增加 CPU 消耗,所以最好的办法是等待该线程完成之后再进行操作,这样可以减少竞争,提高效率。

在上面代码中,线程自旋结束后一段代码:

if (leader == null && q.peek() != null)
    available.signal();

其中 leader == null && q.peek() != null 的含义是:没有 leader 线程但队列中存在元素,那么就唤醒等待的元素。因为此时可能有其他等待获取元素的线程。如果不唤醒,那么出队线程永远无法执行。

对比

DelayQueue 与 PriorityBlockingQueue 有相似之处,它们都具有优先级队列的特性,因为它们底层都使用了二叉树数组实现。但 DelayQueue 比起 PriorityBlockingQueue 还多了一个延迟属性,可以设置延迟到某个时间再出列。

总结

DelayQueue 的重点在于掌握其使用方法以及其原理。

DelayQueue 的每个元素都需要实现 Delayed 接口,并且实现 getDelay() 方法时,需要将返回值其转化指定的时间单位。这一点是非常重要的,如果没有进行转化,那么会导致不可预知的问题。此外,元素出队的顺序并不是由其剩余时间决定,而是由元素实现的 comparble 接口决定。这一点也是许多人会搞错的。

对于 DelayQueue 的实现,其实也并不复杂。其直接使用了 PriorityBlockingQueue 来实现数据的维护,使用 ReentrantLock 实现并发控制。因为其使用 PriorityBlockingQueue 实现,而本质上 PriorityBlockingQueue 是有最大值限制的,所以其并不是无界队列,而是有界队列。

在 DelayQueue 的具体实现上,其使用了 leader 属性保存了第一个等待获取元素的线程,从而避免了过多线程进行 CPU 自旋浪费资源。此外,还使用了 awaitNano 方法最大限度地避免 CPU 无效空转,这些都是非常好的设计思路。

最后,让我们用几个关键词概括 DelayQueue:伪无界队列、数组实现、单锁实现、优先级队列。