阻塞队列源码系列(二):LinkedBlockingQueue

Posted by 陈树义 on 2021-06-20

LinkedBlockingQueue是在JDK1.5时,随着J.U.C包引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于单链表实现:

在深入了解其源码之前,我们先用一个例子看看 LinkedBlockingQueue 是怎么使用的。

LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
linkedBlockingQueue.offer("apple");
linkedBlockingQueue.take();

从上面的例子可以看出,LinkedBlockingQueue 的使用非常简单,声明对象之后使用 offer/take 等方法即可。

特性

LinkedBlockingQueue 也是有界阻塞队列,在初始构造时就指定队列的容量,也可以不指定。如果不指定,那么它的容量大小默认为Integer.MAX_VALUE。具有如下特点:

  • 1、底层使用链表实现。
  • 2、只支持非公平策略,不支持公平策略。
  • 3、使用两把锁takeLock和putLock控制并发,takeLock用于控制出队的并发,putLock用于入队的并发。可以同时有一个线程进行入队操作,一个线程进行出队操作。

原理

要了解 LinkedBlockingQueue 是如何实现的,那么必须深入源码才行。下面我们将从类成员变量、构造方法、核心方法两个方面逐一介绍。

类成员变量

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

从上面列出的类成员变量,我们可以得出几个显而易见的结论:

  • LinkedBlockingQueue 使用链表实现。
  • LinkedBlockingQueue 使用两把锁(takeLock、putLock)实现并发控制。

我们可以看看 Node 节点的声明:

static class Node<E> {
    E item;
    
    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;
    
    Node(E x) { item = x; }
}

可以看到其内容非常简单,就一个保存节点元素值的 item,以及保存下一个节点引用的 next。

构造方法

了解构造方法是如何实现的,可以知道其实怎么初始化数据的,这对于后续的深入研究其实现有着非常重要的作用。对于 LinkedBlockingQueue 来说,其有三个构造方法:

/**
 * 默认构造器.
 * 队列容量为Integer.MAX_VALUE.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
/**
 * 显示指定队列容量的构造器
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}
/**
 * 从已有集合构造队列.
 * 队列容量为Integer.MAX_VALUE
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();                     // 这里加锁仅仅是为了保证可见性
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)              // 队列不能包含null元素
                throw new NullPointerException();
            if (n == capacity)          // 队列已满
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e));    // 队尾插入元素
            ++n;
        }
        count.set(n);                   // 设置元素个数
    } finally {
        putLock.unlock();
    }
}

从上面的三个构造方法我们可以看出,LinkedBlockingQueue 并没有提供参数设置公平策略,所以其默认策略是非公平策略。经过构造方法初始化后,LinkedBlockingQueue的初始结构如下:

插入部分元素后的 LinkedBlockingQueue 结构:

这里有一个细节需要注意:即 head 指向的节点,其 item 值是 null,我们一般称这样的节点为哨兵节点。而 last 指向的元素则是正常的,item 是有值的。

核心方法

在之前讲解 BlockingQueue 的时候,我们说过阻塞队列最核心的就是那三对存取方法:

  • add/remove
  • offer/poll
  • put/take

不同的阻塞队列可能会有所不同,但大致都类似。对于 LinkedBlockingQueue 来说,其核心方法就是这三对存储方法。

add/remove方法

ArrayBlockingQueue 的 add 方法调用了父类 AbstractQueue 的 add 方法。

public boolean add(E e) {
    return super.add(e);
}

而 AbstractQueue 则是利用模板设计模板,调回了子类的 offer 方法。

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

可以看到这一点的实现与 ArrayBlockingQueue 完全一致。应该说如果具体实现类没有重写 AbstractQueue 的 add 方法,那么其实现就是调用子类的 offer 方法。

我们看看 remove 方法的具体实现:

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

可以看到其首先调用 fullyLock 获取了一个全锁(putLock 和 takeLock),之后再进将元素移出队列。这是因为 remove 方法中包含了遍历操作,如果不锁定的话业务逻辑太过于复杂,很容易出现并发问题。

offer/poll方法

我们先看看 offer 的具体实现:

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock; // 1.获取写锁
    putLock.lock();
    try {
        if (count.get() < capacity) {   
            enqueue(node);
            c = count.getAndIncrement(); // c 表示入队前的队列元素个数
            if (c + 1 < capacity)   // 2.唤醒等待插入的线程
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0) // 3.唤醒等待获取元素的线程
        signalNotEmpty();   
    return c >= 0;
}

首先,先获取入队锁(putLock)。接着,判断队列元素是否小于 capacity,即队列是否已满。如果队列未满,那么调用 enqueue 方法进行入队操作,否则直接退出。这里的 enqueue 方法非常简单,就是简单的修改元素引用,不做过多讲解。

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

接着,判断入队后的队列元素数量是否小于capacity,即队列是否未满。如果队列未满,那么说明可以继续插入元素,那么就唤醒等待插入元素的线程。

if (c + 1 < capacity)   // CASE 1:唤醒等待插入的线程
    notFull.signal();

最后,判断入队前的队列元素数量为0,即元素入队前队列是否为空。如果元素入队前队列为空,即可能有其他线程在等待获取数据,那么唤醒正在等待获取元素的线程。

if (c == 0) // CASE 2:唤醒等待获取元素的线程
    signalNotEmpty();   

这里唤醒获取元素的线程调用的是 signalNotEmpty 方法:

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

可以看到,其首先获取了 takeLock,之后再进行出元素操作,而不是直接调用 notEmpty.signal 方法。这是因为每次调用 signal 方法,必须获取到该 Condition 所属的锁,否则会报错。

继续看一下 poll 方法:

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();    // 1.获取出队锁
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)  // 2.唤醒等待获取元素的线程
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)  // 3.唤醒等待插入的线程
        signalNotFull();
    return x;
}

首先,先获取出队锁(takeLock)。接着判断队列容量是否大于0,即队列是否不为空。如果队列不为空,那么调用 dequeue 方法进行出队操作。从 dequeue 方法可以看到,方法中进行元素引用的修改,比较简单。

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

接着,判断出队前队列元素数量是否大于1,即出队前队列元素是否大于等于2。如果是,那么表明即使经过这次出列,队列中还是不为空。那么唤醒其他等待获取元素的线程。

if (c > 1)  // 2.唤醒等待获取元素的线程
    notEmpty.signal();

最后,判断元素出队前队列元素数量是否等于capacity,即元素出队前队列是否已满。如果队列已满,那么说明可能有在等待插入元素的线程,那么唤醒唤醒等待插入的线程。

if (c == capacity)  // CASE 4:唤醒等待插入的线程
    signalNotFull();

signalNotFull 方法的内容与 signalNotEmpty 类似,都是先获取锁再唤醒。

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

从我们上面的分析,我们可以看到 offer 和 poll 的实现逻辑非常相似。但是和 ArrayBlockingQueue 对比起来,会发现有许多不同之处。特别是因为 LinkedBlockingQueue 使用了两个锁进行出入队列,所以在一些细节处理上会有所不同。

put/take方法

我们先看看 put 方法的源码:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try { 
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)   // CASE 1:唤醒其他正在等待插入的线程
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)     // CASE 2:唤醒其他正在等待获取元素的线程
        signalNotEmpty();
}

put 方法的大致逻辑为:首先,获取入队锁(putLock)。接着,如果队列元素数量是否等于capacity,即队列是否已满。如果队列已满,那么在 notFull 队列等待,否则调用 enqueue 方法执行入队操作。这里调用的 enqueue 方法和 offer 方法中调用的 enqueue 方法是一样的,都是进行简单的引用修改。

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

接着,判断入队后的队列容量是否小于 capacity,即元素入队列后队列容量是否还未达到最大容量。如果未达到最大容量,则唤醒其他正在等待插入的线程。

if (c + 1 < capacity)   // CASE 1:唤醒其他正在等待插入的线程
    notFull.signal();

最后,判断入队列前的队列元素数量是否等于0,即元素入队列前队列是否为空。如果是,那么唤醒其他正在等待获取元素的线程。

if (c == 0)     // CASE 2:唤醒其他正在等待获取元素的线程
    signalNotEmpty();

我们继续看看 take 方法的源码。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();   // 1.获取锁
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)  // 2.唤醒等待获取元素的线程
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)  // 3.唤醒等待插入元素的线程
        signalNotFull();
    return x;
}

take 方法的大致逻辑为:首先,获取出队锁(takeLock)。接着,判断队列大小是否等于0,即队列是否为空。如果队列为空,那么阻塞等待。否则调用 dequeue 方法进行出列操作。这里的 dequeue 方法与 poll 方法调用的 dequeue 方法是同一个方法。

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

接着,判断元素入队列前队列的大小是否大于1,即元素入队列前队列大小是否大于等于2。如果是,那么说明经过此次出列后,队列仍然不为空,那么就唤醒其他正在等待获取元素的线程。

if (c > 1)  // 2.唤醒等待获取元素的线程
    notEmpty.signal();

最后,判断元素入队列前队列的大小是否等于 capacity,即元素入队列前队列是否已满。如果队列已满,那么可能有线程在等待插入元素,那么就唤醒在等待插入元素的线程。

if (c == capacity)  // 3.唤醒等待插入元素的线程
    signalNotFull();

仔细分析我们可以知道,put/take 方法的实现与 offer/poll 方法的实现几乎一致。略微不同的是,如果队列非空或者非满,put/take 会阻塞等待,而 offer/poll 则会直接返回。

对比

与 ArrayBlockingQueue 相比,LinkedBlockingQueue 与其有下面几点不同:

  • 第一,LinkedBlockingQueue 使用链表实现底层逻辑,而 ArrayBlockingQueue 使用数组实现。
  • 第二,LinkedBlockingQueue 使用两个锁实现,而 ArrayBlockingQueue 使用单锁实现。

这里提出一个问题:为什么 LinkedBlockingQueue 要使用两个锁,而 ArrayBlockingQueue 则使用单个锁?如果改造 ArrayBlockingQueue 让其用两个锁去实现可以吗?

总结

LinkedBlockingQueue 使用了链表的数据结构实现,其大小虽然没有限制,但是有一个非常大的默认值。与 ArrayBlockingQueue 对比起来,其实现更加复杂。在并发控制上,其使用了两个锁,即 putLock 和 takeLock。这使得 LinkedBlockingQueue 允许两个线程同时进行入列和出列操作,极大地提高了并发性能。

在核心的 add/remove 方法中,我们发现其与 ArrayBlockingQueue 类似,都调用了父级接口 AbstractQueue 的 add 方法,之后回调子类的 offer 方法。而在 remove 方法中,与 offer/poll/put/take 方法获取单个锁不同,其同时获取了两个锁。这是为了避免遍历时发生线程安全问题。

而在 offer/poll/put/take 方法中,其业务逻辑几乎一致。与 ArrayBlockingQueue 相比,其使用两个锁进行并发控制,极大地提高了并发性能。但同时也使得其实现有了些许不同。而 offer/poll 与 put/take 的不同,仅仅在于其是否等待这一点。如果去除这一点,那么其实现逻辑完全一致。

最后,让我们用几个关键词概括 LinkedBlockingQueue:有界队列、链表实现、双锁实现。

参考资料