阻塞队列源码系列(一):ArrayBlockingQueue

Posted by 陈树义 on 2021-06-20

ArrayBlockingQueue 是在JDK1.5时,随着J.U.C包引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于数组实现:

在深入了解其源码之前,我们先用一个例子看看 ArrayBlockingQueue 是怎么使用的。

ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(10);
blockingQueue.offer("apple");
System.out.println(blockingQueue.poll());

从上面的例子可以看出,ArrayBlockingQueue 的使用非常简单,只需要先声明队列容量,之后使用 offer/take 等方法即可。

特性

ArrayBlockingQueue是一种有界阻塞队列,在初始构造的时候需要指定队列的容量。具有如下特点:

  • 1、队列的容量一旦在构造时指定,后续不能改变。
  • 2、插入元素时,在队尾进行;删除元素时,在队首进行。
  • 3、队列满时,调用特定方法插入元素会阻塞线程;队列空时,删除元素也会阻塞线程。
  • 4、支持公平/非公平策略,默认为非公平策略。

这里的公平策略,是指当线程从阻塞到唤醒后,以最初请求的顺序(FIFO)来添加或删除元素;非公平策略指线程被唤醒后,谁先抢占到锁,谁就能往队列中添加/删除顺序,是随机的。

原理

要了解 ArrayBlockingQueue 是如何实现的,那么必须深入源码才行。下面我们将从类成员变量、构造方法、核心方法两个方面逐一介绍。

类成员

ArrayBlockingQueue 中一共有下面这些类成员变量:

/** The queued items 存储队列元素的数组 */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count; 

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

从上面列出的类成员变量,我们可以得出几个显而易见的结论:

  • ArrayBlockingQueue 底层使用数组实现其逻辑。
  • ArrayBlockingQueue 底层使用 AQS 的 Condition 实现并发控制。

构造方法

了解构造方法是如何实现的,可以知道其实怎么初始化数据的,这对于后续的深入研究其实现有着非常重要的作用。对于 ArrayBlockingQueue 来说,其有三个构造方法:

/**
 * 指定队列初始容量的构造器.
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
/**
 * 指定队列初始容量和公平/非公平策略的构造器.
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();

    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);     // 利用独占锁的策略
   notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
/**
 * 根据已有集合构造队列
 */
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock();    // 这里加锁是用于保证items数组的可见性
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);    // 不能有null元素
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;     // 如果队列已满,则重置puIndex索引为0
    } finally {
        lock.unlock();
    }
}

看了上面三个构造方法,虽然它们有些略微不同,但其主体内容都一致。基本上都是初始化 ReentrantLock 对象、Condition 对象以及 putIndex 等类成员变量。

核心方法

在之前讲解 BlockingQueue 的时候,我们说过阻塞队列最核心的就是那三对存取方法:

  • add/remove
  • offer/poll
  • put/take

不同的阻塞队列可能会有所不同,但大致都类似。对于 ArrayBlockingQueue 来说,其核心方法就是这三对存储方法。

add/remove方法

ArrayBlockingQueue 的 add 方法调用了父类 AbstractQueue 的 add 方法。

public boolean add(E e) {
    return super.add(e);
}

而 AbstractQueue 则是利用模板设计模板,调回了子类的 offer 方法。

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

接下来我们继续看看 remove 方法。

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)    // CASE 1
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

我们知道队列是先进先出的,我们都是从队头取数据(takeIndex),从队尾存数据(putIndex)。在这样一个基础之上我们再看上面的代码就容易得多:

  • 首先,上锁避免其他线程并发修改。
  • 之后,从队头(takeIndex)开始一直遍历到队尾(putIndex),找到需要移除的元素。

其中有一个细节需要注意,那便是 CASE 1 标记的这段代码:

if (++i == items.length)    // CASE 1
    i = 0;

在遍历队列的时候会去判断 i 是否到达了数组尾部,如果是则重置 i 的值为零。从这我们可以看出 ArrayBlockingQueue 是一个环形队列,头尾是连接在一起的。

offer/poll方法

我们先看看 offer 方法的实现:

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

其实现也比较简单,先上锁之后做一些简单的判断,最后调用 enqueue 方法将元素如队列。那我们来看看 enqueue 方法的具体逻辑。

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length) // CASE 2 
        putIndex = 0;
    count++;
    notEmpty.signal();
}

enqueue 方法的逻辑也比较简单,首先插入元素,最后唤醒正在等待元素的线程。唯一需要注意的细节就是 CASE 2 这个点。其会判断 putIndex 的位置是否到达了数组尾端,如果到了则置为 0。这和我们上面说的逻辑是一致的,因为 ArrayBlockingQueue 是一个环形队列。

接下来我们看看 poll 方法的源码。

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

poll 方法的源码也很简单,主要的逻辑在 dequeue 方法中。

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)    // CASE 3
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued(); // CASE 4
    notFull.signal();
    return x;
}

这里的逻辑也非常简单,首先删除元素,之后唤醒插入元素的线程。唯一需要注意的两个点是 CASE 3 和 CASE 4。CASE 3 是对环形队列元素的维护,CASE 4 则是为了确保在修改内容时,遍历该队列的线程能够看到变化。

put/take方法

我们先看看 put 方法的源码:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

可以看到其首先判断队列是否已满,如果满了就阻塞等待,否则调用 enqueue 方法入队。enqueue 方法上面我们已经讲过,这里不再讲解。

我们继续看 take 方法的源码:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

可以看到其首先判断队列是否为空,如果为空,那么阻塞等待,否则直接调用 dequeue 方法获取元素退出。dequeue 方法我们上面解析过,就是直接获取队尾元素,这里不再深入讲解。

看完了这三组核心方法,我们发现其核心在与 enqueue 和 dequeue 方法。这三组方法中大多数都是调用者两个核心方法完成的,因此如果想掌握 ArrayBlockingQueue 的实现精髓,就需要掌握 enqueue 和 dequeue 方法。

总结

ArrayBlockingQueue 用于比较简单的阻塞场景,其使用 ReentrantLock 的 Condition 条件实现线程安全,其实现相对比较简单,不是很难掌握。

下面我们用几个关键词概括 ArrayBlockingQueue:有界队列、数组实现、单锁实现。

参考资料