参考文献

LinkedBlockingQueue

  • LinkedBlockingQueue是在JDK1.5时,随着J.U.C包引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于单链表实现.
  • LinkedBlockingQueue除了底层数据结构(单链表)与ArrayBlockingQueue不同外,另外一个特点就是:它维护了两把锁——takeLockputLock
    • takeLock用于控制出队的并发,putLock用于入队的并发。这也就意味着,同一时刻,只能只有一个线程能执行入队/出队操作,其余入队/出队线程会被阻塞;但是,入队和出队之间可以并发执行,即同一时刻,可以同时有一个线程进行入队,另一个线程进行出队,这样就可以提升吞吐量。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

/**
* 队列容量
* 如果不指定, 则为Integer.MAX_VALUE
*/
private final int capacity;

/**
* 队列中的元素个数
* 使用了一个原子变量AtomicInteger记录队列中元素的个数,以保证入队/出队并发修改元素时的数据一致性
*/
private final AtomicInteger count = new AtomicInteger();

/**
* 队首指针
* head.item == null
*/
transient Node<E> head;

/**
* 队尾指针
* last.next == null
*/
private transient Node<E> last;

/**
* 出队锁
*/
private final ReentrantLock takeLock = new ReentrantLock();

/**
* 队列空时,出队线程在该条件队列等待
*/
private final Condition notEmpty = takeLock.newCondition();

/**
* 入队锁
*/
private final ReentrantLock putLock = new ReentrantLock();

/**
* 队列满时,入队线程在该条件队列等待
*/
private final Condition notFull = putLock.newCondition();

/**
* 链表结点定义
*/
static class Node<E> {
E item;
Node<E> next; // 后驱指针
Node(E x) {
item = x;
}
}

//...
}

核心方法

插入元素 put(E e)
  • 插入元素时,首先需要获得**“入队锁”,如果队列满了,则当前线程需要在notFull**条件队列等待;否则,将新元素链接到队列尾部
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 在队尾插入指定的元素.
* 如果队列已满,则阻塞线程.
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly(); // 获取“入队锁”
try {
while (count.get() == capacity) { // 队列已满, 则线程在notFull上等待
notFull.await();
}
enqueue(node); // 将新结点链接到“队尾”

/**
* c+1 表示的元素个数.
* 如果,则唤醒一个“入队线程”
*/
c = count.getAndIncrement(); // c表示入队前的队列元素个数
if (c + 1 < capacity) // 入队后队列未满, 则唤醒一个“入队线程”
notFull.signal();
} finally {
putLock.unlock();
}

if (c == 0) // 入队前队列为空, 则唤醒一个“出队线程”
signalNotEmpty();
}
  • 每入队一个元素后,如果队列还没满,则需要唤醒其它可能正在等待的“入队线程”

    1
    2
    3
    4
    5
    6
    7
    /**
    * c+1 表示的元素个数.
    * 如果,则唤醒一个“入队线程”
    */
    c = count.getAndIncrement(); // c表示入队前的队列元素个数
    if (c + 1 < capacity) // 入队后队列未满, 则唤醒一个“入队线程”
    notFull.signal();
  • 向队列中放入元素后,判断队列是否为空。如果队列在放入元素之前为空(即原来的元素个数为0),则需要唤醒一个等待的"出队线程"来处理元素的消费。

1
2
if (c == 0)                             // 入队前队列为空, 则唤醒一个“出队线程”
signalNotEmpty();
删除元素 take()
  • 删除元素的逻辑和插入元素类似。删除元素时,首先需要获得**“出队锁”,如果队列为空,则当前线程需要在notEmpty**条件队列等待;否则,从队首出队一个元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 从队首出队一个元素
*/
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock; // 获取“出队锁”
takeLock.lockInterruptibly();
try {
while (count.get() == 0) { // 队列为空, 则阻塞线程
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement(); // c表示出队前的元素个数
if (c > 1) // 出队前队列非空, 则唤醒一个出队线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity) // 出队前队列为满,则唤醒一个入队线程
signalNotFull();
return x;
}
1
2
3
4
5
6
7
8
9
10
11
12
/**
* 队首出队一个元素.
*/
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
  • 每出队一个元素前,如果队列非空,则需要唤醒其它可能正在等待的“出队线程”

    1
    2
    3
    c = count.getAndDecrement();                // c表示出队前的元素个数
    if (c > 1) // 出队前队列非空, 则唤醒一个出队线程
    notEmpty.signal();
  • 检查队列在出队操作之前是否已满。如果队列在出队操作之前为满(即原来的元素个数等于队列容量),则需要唤醒一个等待的"入队线程"来处理元素的生产

    1
    2
    if (c == capacity)                              // 队列初始为满,则唤醒一个入队线程
    signalNotFull();

总结

  • **LinkedBlockingQueueArrayBlockingQueue**比较主要有以下区别
    • 队列大小:ArrayBlockingQueue在创建时需要指定固定的容量大小,而LinkedBlockingQueue可以选择不指定大小(默认为Integer.MAX_VALUE),使其近似于无界队列。
    • 底层数据结构:ArrayBlockingQueue使用数组作为底层数据存储容器,而LinkedBlockingQueue使用单链表作为底层数据存储容器。
    • 加锁机制:ArrayBlockingQueue使用一个全局锁(ReentrantLock)来控制入队和出队操作的互斥访问。即同一时间只允许一个线程进行入队或出队操作。而LinkedBlockingQueue使用锁分离的机制,使用两个独立的锁(putLocktakeLock)来控制入队和出队操作的并发访问。这样可以提高并发性能,因为同时可以有一个线程进行入队操作,另一个线程进行出队操作。
    • 公平性策略:ArrayBlockingQueue可以通过构造函数参数指定公平或非公平的访问策略。而LinkedBlockingQueue没有提供直接设置公平性策略的选项,它默认为非公平。公平队列会按照线程的到达顺序来获取锁,而非公平队列则允许线程的抢占。