ArrayBlockingQueue 是在JDK1.5时,随着J.U.C包引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于数组实现:
在深入了解其源码之前,我们先用一个例子看看 ArrayBlockingQueue 是怎么使用的。
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(10);
blockingQueue.offer("apple");
System.out.println(blockingQueue.poll());
从上面的例子可以看出,ArrayBlockingQueue 的使用非常简单,只需要先声明队列容量,之后使用 offer/take 等方法即可。
特性
ArrayBlockingQueue是一种有界阻塞队列,在初始构造的时候需要指定队列的容量。具有如下特点:
- 1、队列的容量一旦在构造时指定,后续不能改变。
- 2、插入元素时,在队尾进行;删除元素时,在队首进行。
- 3、队列满时,调用特定方法插入元素会阻塞线程;队列空时,删除元素也会阻塞线程。
- 4、支持公平/非公平策略,默认为非公平策略。
这里的公平策略,是指当线程从阻塞到唤醒后,以最初请求的顺序(FIFO)来添加或删除元素;非公平策略指线程被唤醒后,谁先抢占到锁,谁就能往队列中添加/删除顺序,是随机的。
原理
要了解 ArrayBlockingQueue 是如何实现的,那么必须深入源码才行。下面我们将从类成员变量、构造方法、核心方法两个方面逐一介绍。
类成员
ArrayBlockingQueue 中一共有下面这些类成员变量:
/** The queued items 存储队列元素的数组 */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
从上面列出的类成员变量,我们可以得出几个显而易见的结论:
- ArrayBlockingQueue 底层使用数组实现其逻辑。
- ArrayBlockingQueue 底层使用 AQS 的 Condition 实现并发控制。
构造方法
了解构造方法是如何实现的,可以知道其实怎么初始化数据的,这对于后续的深入研究其实现有着非常重要的作用。对于 ArrayBlockingQueue 来说,其有三个构造方法:
/**
* 指定队列初始容量的构造器.
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* 指定队列初始容量和公平/非公平策略的构造器.
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair); // 利用独占锁的策略
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* 根据已有集合构造队列
*/
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // 这里加锁是用于保证items数组的可见性
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e); // 不能有null元素
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i; // 如果队列已满,则重置puIndex索引为0
} finally {
lock.unlock();
}
}
看了上面三个构造方法,虽然它们有些略微不同,但其主体内容都一致。基本上都是初始化 ReentrantLock 对象、Condition 对象以及 putIndex 等类成员变量。
核心方法
在之前讲解 BlockingQueue 的时候,我们说过阻塞队列最核心的就是那三对存取方法:
- add/remove
- offer/poll
- put/take
不同的阻塞队列可能会有所不同,但大致都类似。对于 ArrayBlockingQueue 来说,其核心方法就是这三对存储方法。
add/remove方法
ArrayBlockingQueue 的 add 方法调用了父类 AbstractQueue 的 add 方法。
public boolean add(E e) {
return super.add(e);
}
而 AbstractQueue 则是利用模板设计模板,调回了子类的 offer 方法。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
接下来我们继续看看 remove 方法。
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length) // CASE 1
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
我们知道队列是先进先出的,我们都是从队头取数据(takeIndex),从队尾存数据(putIndex)。在这样一个基础之上我们再看上面的代码就容易得多:
- 首先,上锁避免其他线程并发修改。
- 之后,从队头(takeIndex)开始一直遍历到队尾(putIndex),找到需要移除的元素。
其中有一个细节需要注意,那便是 CASE 1 标记的这段代码:
if (++i == items.length) // CASE 1
i = 0;
在遍历队列的时候会去判断 i 是否到达了数组尾部,如果是则重置 i 的值为零。从这我们可以看出 ArrayBlockingQueue 是一个环形队列,头尾是连接在一起的。
offer/poll方法
我们先看看 offer 方法的实现:
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
其实现也比较简单,先上锁之后做一些简单的判断,最后调用 enqueue 方法将元素如队列。那我们来看看 enqueue 方法的具体逻辑。
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) // CASE 2
putIndex = 0;
count++;
notEmpty.signal();
}
enqueue 方法的逻辑也比较简单,首先插入元素,最后唤醒正在等待元素的线程。唯一需要注意的细节就是 CASE 2 这个点。其会判断 putIndex 的位置是否到达了数组尾端,如果到了则置为 0。这和我们上面说的逻辑是一致的,因为 ArrayBlockingQueue 是一个环形队列。
接下来我们看看 poll 方法的源码。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
poll 方法的源码也很简单,主要的逻辑在 dequeue 方法中。
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) // CASE 3
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued(); // CASE 4
notFull.signal();
return x;
}
这里的逻辑也非常简单,首先删除元素,之后唤醒插入元素的线程。唯一需要注意的两个点是 CASE 3 和 CASE 4。CASE 3 是对环形队列元素的维护,CASE 4 则是为了确保在修改内容时,遍历该队列的线程能够看到变化。
put/take方法
我们先看看 put 方法的源码:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
可以看到其首先判断队列是否已满,如果满了就阻塞等待,否则调用 enqueue 方法入队。enqueue 方法上面我们已经讲过,这里不再讲解。
我们继续看 take 方法的源码:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
可以看到其首先判断队列是否为空,如果为空,那么阻塞等待,否则直接调用 dequeue 方法获取元素退出。dequeue 方法我们上面解析过,就是直接获取队尾元素,这里不再深入讲解。
看完了这三组核心方法,我们发现其核心在与 enqueue 和 dequeue 方法。这三组方法中大多数都是调用者两个核心方法完成的,因此如果想掌握 ArrayBlockingQueue 的实现精髓,就需要掌握 enqueue 和 dequeue 方法。
总结
ArrayBlockingQueue 用于比较简单的阻塞场景,其使用 ReentrantLock 的 Condition 条件实现线程安全,其实现相对比较简单,不是很难掌握。
下面我们用几个关键词概括 ArrayBlockingQueue:有界队列、数组实现、单锁实现。