参考文献

PriorityBlockingQueue

  • PriorityBlockingQueue,是在JDK1.5时,随着J.U.C包引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于实现.
  • PriorityBlockingQueue是一种无界阻塞队列,它具有以下特点:
    1. 元素按照权重大小的顺序出队:与其他阻塞队列不同,PriorityBlockingQueue是一种优先级队列。元素不是按照FIFO(先进先出)的方式出队,而是根据元素的权重来确定优先级,权重较小的元素先出队。
    2. 无需指定最大容量:与ArrayBlockingQueue不同,创建PriorityBlockingQueue时无需指定最大容量,它是真正的无界队列。它只受系统内存大小的限制,没有预设的最大容量。
    3. 元素必须可比较:由于PriorityBlockingQueue是基于元素权重排序的,因此队列中的元素必须可比较。这意味着元素必须实现Comparable接口,以便进行排序。
    4. 插入元素不会阻塞线程:由于PriorityBlockingQueue是无界队列,插入元素永远不会导致线程阻塞。即使队列已经包含了很多元素,新的元素也可以被立即插入。
    5. 基于数组的堆结构:PriorityBlockingQueue的底层实现是基于数组的堆结构。堆是一种二叉树结构,它可以快速找到最大或最小元素,并保持元素的有序性。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
 
/**
* 默认容量.
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;
 
/**
* 最大容量.
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 
/**
* 内部堆数组, 保存实际数据, 可以看成一颗二叉树:
* 对于顶点queue[n], queue[2*n+1]表示左子结点, queue[2*(n+1)]表示右子结点.
*/
private transient Object[] queue;
 
/**
* 队列中的元素个数.
*/
private transient int size;
 
/**
* 比较器, 如果为null, 表示以元素自身的自然顺序进行比较(元素必须实现Comparable接口).
*/
private transient Comparator<? super E> comparator;
 
/**
* 全局锁.
*/
private final ReentrantLock lock = new ReentrantLock();
 
/**
* 当队列为空时,出队线程在该条件队列上等待.
*/
private final Condition notEmpty = lock.newCondition();

/**
* 分配时使用的自旋锁,通过CAS(比较并交换)操作获取
*/
private transient volatile int allocationSpinLock;

/**
* 仅用于序列化的普通PriorityQueue,以保持与此类先前版本的兼容性
*/
private PriorityQueue<E> q;
 
// ...
}

核心方法

插入元素 put(E e)
  • PriorityBlockingQueue插入元素不会阻塞线程,put(E e)方法内部其实是调用了offer(E e)方法:
    首先获取全局锁(对于队列的修改都要获取这把锁),然后判断下队列是否已经满了,如果满了就先进行一次内部数组的扩容
1
2
3
public void put(E e) {
offer(e); // never need to block
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] es;
// 若队列已满,则近些年扩容
while ((n = size) >= (cap = (es = queue).length))
tryGrow(es, cap);
try {
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
// 比较器为空,则按照元素的自然排序进行调整
siftUpComparable(n, e, es);
else
// 比较器不为空,则按照比较器进行堆调整
siftUpUsingComparator(n, e, es, cmp);
// 队列元素总数+1
size = n + 1;
// 唤醒一个可能正在等待"出队线程"
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 将元素x插入到array[k]的位置.
* 然后按照元素的自然顺序进行堆调整——"上浮",以维持"堆"有序.
* 最终的结果是一个"小顶堆".
*/
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1; // 相当于(k-1)除2, 就是求k结点的父结点索引parent
Object e = array[parent];
if (key.compareTo((T) e) >= 0) // 如果插入的结点值大于父结点, 则退出
break;
 
// 否则,交换父结点和当前结点的值
array[k] = e;
k = parent;
}
array[k] = key;
}
扩容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // 扩容和入队/出队可以同时进行, 所以先释放全局锁
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) { // allocationSpinLock置1表示正在扩容
try {
// 计算新的数组大小
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) :
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // 溢出判断
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap]; // 分配新数组
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // 扩容失败(可能有其它线程正在扩容,导致allocationSpinLock竞争失败)
Thread.yield();

lock.lock(); // 获取全局锁(因为要修改内部数组queue)
if (newArray != null && queue == array) {
queue = newArray; // 指向新的内部数组
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
删除元素 take()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
  /**
* 出队一个元素.
* 如果队列为空, 则阻塞线程.
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 获取全局锁
E result;
try {
while ((result = dequeue()) == null) // 队列为空
notEmpty.await(); // 线程在noEmpty条件队列等待
} finally {
lock.unlock();
}
return result;
}

private E dequeue() {
// assert lock.isHeldByCurrentThread();
final Object[] es;
final E result;
// array[0]是堆顶结点, 每次出队都删除堆顶结点
// array[n]是堆的最后一个结点, 也就是二叉树的最右下结点
if ((result = (E) ((es = queue)[0])) != null) {
final int n;
final E x = (E) es[(n = --size)];
es[n] = null;
if (n > 0) {
final Comparator<? super E> cmp;
if ((cmp = comparator) == null)
siftDownComparable(0, x, es, n);
else
siftDownUsingComparator(0, x, es, n, cmp);
}
}
return result;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
// assert n > 0;
Comparable<? super T> key = (Comparable<? super T>)x;
// 相当于n除2, 即找到索引n对应结点的父结点
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
/**
* c保存k的左右子结点中的较小结点值
* child保存较小结点对应的索引
*/
// k的左子结点
int child = (k << 1) + 1; // assume left child is least
Object c = es[child];
// k的右子结点
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
c = es[child = right];
if (key.compareTo((T) c) <= 0)
break;
es[k] = c;
k = child;
}
es[k] = key;
}