LinkedBlockingQueue是在JDK1.5时,随着J.U.C包引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于单链表实现:
在深入了解其源码之前,我们先用一个例子看看 LinkedBlockingQueue 是怎么使用的。
LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
linkedBlockingQueue.offer("apple");
linkedBlockingQueue.take();
从上面的例子可以看出,LinkedBlockingQueue 的使用非常简单,声明对象之后使用 offer/take 等方法即可。
特性
LinkedBlockingQueue 也是有界阻塞队列,在初始构造时就指定队列的容量,也可以不指定。如果不指定,那么它的容量大小默认为Integer.MAX_VALUE。具有如下特点:
- 1、底层使用链表实现。
- 2、只支持非公平策略,不支持公平策略。
- 3、使用两把锁takeLock和putLock控制并发,takeLock用于控制出队的并发,putLock用于入队的并发。可以同时有一个线程进行入队操作,一个线程进行出队操作。
原理
要了解 LinkedBlockingQueue 是如何实现的,那么必须深入源码才行。下面我们将从类成员变量、构造方法、核心方法两个方面逐一介绍。
类成员变量
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
从上面列出的类成员变量,我们可以得出几个显而易见的结论:
- LinkedBlockingQueue 使用链表实现。
- LinkedBlockingQueue 使用两把锁(takeLock、putLock)实现并发控制。
我们可以看看 Node 节点的声明:
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
可以看到其内容非常简单,就一个保存节点元素值的 item,以及保存下一个节点引用的 next。
构造方法
了解构造方法是如何实现的,可以知道其实怎么初始化数据的,这对于后续的深入研究其实现有着非常重要的作用。对于 LinkedBlockingQueue 来说,其有三个构造方法:
/**
* 默认构造器.
* 队列容量为Integer.MAX_VALUE.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 显示指定队列容量的构造器
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
/**
* 从已有集合构造队列.
* 队列容量为Integer.MAX_VALUE
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // 这里加锁仅仅是为了保证可见性
try {
int n = 0;
for (E e : c) {
if (e == null) // 队列不能包含null元素
throw new NullPointerException();
if (n == capacity) // 队列已满
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e)); // 队尾插入元素
++n;
}
count.set(n); // 设置元素个数
} finally {
putLock.unlock();
}
}
从上面的三个构造方法我们可以看出,LinkedBlockingQueue 并没有提供参数设置公平策略,所以其默认策略是非公平策略。经过构造方法初始化后,LinkedBlockingQueue的初始结构如下:
插入部分元素后的 LinkedBlockingQueue 结构:
这里有一个细节需要注意:即 head 指向的节点,其 item 值是 null,我们一般称这样的节点为哨兵节点。而 last 指向的元素则是正常的,item 是有值的。
核心方法
在之前讲解 BlockingQueue 的时候,我们说过阻塞队列最核心的就是那三对存取方法:
- add/remove
- offer/poll
- put/take
不同的阻塞队列可能会有所不同,但大致都类似。对于 LinkedBlockingQueue 来说,其核心方法就是这三对存储方法。
add/remove方法
ArrayBlockingQueue 的 add 方法调用了父类 AbstractQueue 的 add 方法。
public boolean add(E e) {
return super.add(e);
}
而 AbstractQueue 则是利用模板设计模板,调回了子类的 offer 方法。
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
可以看到这一点的实现与 ArrayBlockingQueue 完全一致。应该说如果具体实现类没有重写 AbstractQueue 的 add 方法,那么其实现就是调用子类的 offer 方法。
我们看看 remove 方法的具体实现:
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
可以看到其首先调用 fullyLock 获取了一个全锁(putLock 和 takeLock),之后再进将元素移出队列。这是因为 remove 方法中包含了遍历操作,如果不锁定的话业务逻辑太过于复杂,很容易出现并发问题。
offer/poll方法
我们先看看 offer 的具体实现:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock; // 1.获取写锁
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement(); // c 表示入队前的队列元素个数
if (c + 1 < capacity) // 2.唤醒等待插入的线程
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0) // 3.唤醒等待获取元素的线程
signalNotEmpty();
return c >= 0;
}
首先,先获取入队锁(putLock)。接着,判断队列元素是否小于 capacity,即队列是否已满。如果队列未满,那么调用 enqueue 方法进行入队操作,否则直接退出。这里的 enqueue 方法非常简单,就是简单的修改元素引用,不做过多讲解。
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
接着,判断入队后的队列元素数量是否小于capacity,即队列是否未满。如果队列未满,那么说明可以继续插入元素,那么就唤醒等待插入元素的线程。
if (c + 1 < capacity) // CASE 1:唤醒等待插入的线程
notFull.signal();
最后,判断入队前的队列元素数量为0,即元素入队前队列是否为空。如果元素入队前队列为空,即可能有其他线程在等待获取数据,那么唤醒正在等待获取元素的线程。
if (c == 0) // CASE 2:唤醒等待获取元素的线程
signalNotEmpty();
这里唤醒获取元素的线程调用的是 signalNotEmpty 方法:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
可以看到,其首先获取了 takeLock,之后再进行出元素操作,而不是直接调用 notEmpty.signal 方法。这是因为每次调用 signal 方法,必须获取到该 Condition 所属的锁,否则会报错。
继续看一下 poll 方法:
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock(); // 1.获取出队锁
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1) // 2.唤醒等待获取元素的线程
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity) // 3.唤醒等待插入的线程
signalNotFull();
return x;
}
首先,先获取出队锁(takeLock)。接着判断队列容量是否大于0,即队列是否不为空。如果队列不为空,那么调用 dequeue 方法进行出队操作。从 dequeue 方法可以看到,方法中进行元素引用的修改,比较简单。
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
接着,判断出队前队列元素数量是否大于1,即出队前队列元素是否大于等于2。如果是,那么表明即使经过这次出列,队列中还是不为空。那么唤醒其他等待获取元素的线程。
if (c > 1) // 2.唤醒等待获取元素的线程
notEmpty.signal();
最后,判断元素出队前队列元素数量是否等于capacity,即元素出队前队列是否已满。如果队列已满,那么说明可能有在等待插入元素的线程,那么唤醒唤醒等待插入的线程。
if (c == capacity) // CASE 4:唤醒等待插入的线程
signalNotFull();
signalNotFull 方法的内容与 signalNotEmpty 类似,都是先获取锁再唤醒。
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
从我们上面的分析,我们可以看到 offer 和 poll 的实现逻辑非常相似。但是和 ArrayBlockingQueue 对比起来,会发现有许多不同之处。特别是因为 LinkedBlockingQueue 使用了两个锁进行出入队列,所以在一些细节处理上会有所不同。
put/take方法
我们先看看 put 方法的源码:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity) // CASE 1:唤醒其他正在等待插入的线程
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0) // CASE 2:唤醒其他正在等待获取元素的线程
signalNotEmpty();
}
put 方法的大致逻辑为:首先,获取入队锁(putLock)。接着,如果队列元素数量是否等于capacity,即队列是否已满。如果队列已满,那么在 notFull 队列等待,否则调用 enqueue 方法执行入队操作。这里调用的 enqueue 方法和 offer 方法中调用的 enqueue 方法是一样的,都是进行简单的引用修改。
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
接着,判断入队后的队列容量是否小于 capacity,即元素入队列后队列容量是否还未达到最大容量。如果未达到最大容量,则唤醒其他正在等待插入的线程。
if (c + 1 < capacity) // CASE 1:唤醒其他正在等待插入的线程
notFull.signal();
最后,判断入队列前的队列元素数量是否等于0,即元素入队列前队列是否为空。如果是,那么唤醒其他正在等待获取元素的线程。
if (c == 0) // CASE 2:唤醒其他正在等待获取元素的线程
signalNotEmpty();
我们继续看看 take 方法的源码。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); // 1.获取锁
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1) // 2.唤醒等待获取元素的线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity) // 3.唤醒等待插入元素的线程
signalNotFull();
return x;
}
take 方法的大致逻辑为:首先,获取出队锁(takeLock)。接着,判断队列大小是否等于0,即队列是否为空。如果队列为空,那么阻塞等待。否则调用 dequeue 方法进行出列操作。这里的 dequeue 方法与 poll 方法调用的 dequeue 方法是同一个方法。
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
接着,判断元素入队列前队列的大小是否大于1,即元素入队列前队列大小是否大于等于2。如果是,那么说明经过此次出列后,队列仍然不为空,那么就唤醒其他正在等待获取元素的线程。
if (c > 1) // 2.唤醒等待获取元素的线程
notEmpty.signal();
最后,判断元素入队列前队列的大小是否等于 capacity,即元素入队列前队列是否已满。如果队列已满,那么可能有线程在等待插入元素,那么就唤醒在等待插入元素的线程。
if (c == capacity) // 3.唤醒等待插入元素的线程
signalNotFull();
仔细分析我们可以知道,put/take 方法的实现与 offer/poll 方法的实现几乎一致。略微不同的是,如果队列非空或者非满,put/take 会阻塞等待,而 offer/poll 则会直接返回。
对比
与 ArrayBlockingQueue 相比,LinkedBlockingQueue 与其有下面几点不同:
- 第一,LinkedBlockingQueue 使用链表实现底层逻辑,而 ArrayBlockingQueue 使用数组实现。
- 第二,LinkedBlockingQueue 使用两个锁实现,而 ArrayBlockingQueue 使用单锁实现。
这里提出一个问题:为什么 LinkedBlockingQueue 要使用两个锁,而 ArrayBlockingQueue 则使用单个锁?如果改造 ArrayBlockingQueue 让其用两个锁去实现可以吗?
总结
LinkedBlockingQueue 使用了链表的数据结构实现,其大小虽然没有限制,但是有一个非常大的默认值。与 ArrayBlockingQueue 对比起来,其实现更加复杂。在并发控制上,其使用了两个锁,即 putLock 和 takeLock。这使得 LinkedBlockingQueue 允许两个线程同时进行入列和出列操作,极大地提高了并发性能。
在核心的 add/remove 方法中,我们发现其与 ArrayBlockingQueue 类似,都调用了父级接口 AbstractQueue 的 add 方法,之后回调子类的 offer 方法。而在 remove 方法中,与 offer/poll/put/take 方法获取单个锁不同,其同时获取了两个锁。这是为了避免遍历时发生线程安全问题。
而在 offer/poll/put/take 方法中,其业务逻辑几乎一致。与 ArrayBlockingQueue 相比,其使用两个锁进行并发控制,极大地提高了并发性能。但同时也使得其实现有了些许不同。而 offer/poll 与 put/take 的不同,仅仅在于其是否等待这一点。如果去除这一点,那么其实现逻辑完全一致。
最后,让我们用几个关键词概括 LinkedBlockingQueue:有界队列、链表实现、双锁实现。