参考文献

ArrayBlockingQueue(基于数组的阻塞队列)

  • 队列的容量一旦在构造时指定,后续不能改变;
  • 插入元素时,在队尾进行;删除元素时,在队首进行;
  • 队列满时,调用特定方法插入元素会阻塞线程;队列空时,删除元素也会阻塞线程;
  • 支持公平/非公平策略,默认为非公平策略。
    • 公平策略,是指当线程从阻塞到唤醒后,以最初请求的顺序(FIFO)来添加或删除元素;
    • 非公平策略指线程被唤醒后,谁先抢占到锁,谁就能往队列中添加/删除顺序,是随机的。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

/**
* 内部数组
* 这个数据就是ArrayBlockingQueue队列集合用来存储数据的数组
*/
final Object[] items;

/**
* 下一个待删除位置的索引: take, poll, peek, remove方法使用
* 该变量指向的索引位上的元素是下一次将从队列集合中移除的元素,这个移除操作可以是take, poll, peek 或者 remove。
*/
int takeIndex;

/**
* 下一个待插入位置的索引: put, offer, add方法使用
* 该变量指向的索引位是下一个添加到队列的元素存储的索引位,这个添加操作可以是 put, offer 或者 add
*/
int putIndex;

/**
* 队列中的元素个数
* 该变量标示当前在队列集合中的总的元素数量
*/
int count;

/**
* 全局锁
* ArrayBlockingQueue队列使用基于AQS机制的可重入锁ReentrantLock进行线程安全性控制并采用双条件控制方式对移除、添加操作进行交互控制
*/
final ReentrantLock lock;

/**
* 非空条件队列:当队列空时,线程在该队列等待获取
* 控制着移除操作条件
*/
private final Condition notEmpty;

/**
* 非满条件队列:当队列满时,线程在该队列等待插入
* 控制着添加操作条件
*/
private final Condition notFull;

//...
}
  • 实际上ArrayBlockingQueue是一个可循环使用数组空间有界阻塞队列,使用可复用的环形数组进行数据记录。其内部使用一个takeIndex变量代表队列头(队列头可在数组的任何有效索引位),使用一个putIndex``变量代码队列尾(队列为可不是数组最后一个索引位);从takeIndexputIndex的索引位置,是数组中已经放置了元素的位置,从putIndextakeIndex`的索引位置是数组中还可以放置新的元素的位置。

核心方法

插入元素 put(E e)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 在队尾插入指定元素,如果队列已满,则阻塞线程.

*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁
try {
while (count == items.length) // 队列已满。这里必须用while,防止虚假唤醒
notFull.await(); // 在notFull队列上等待
enqueue(e); // 队列未满, 直接入队
} finally {
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 该方法负责在putIndex变量指定的索引位置添加新的数据
* 该方法内部虽然没有做线程安全性的操作,但是对该方法的调用者都有“持有锁”的要求:
* 在putIndex的索引位置放入新的元素,并将putIndex指向的索引位向后移动一位,如果移动后超出数组边界,则重新指向0号索引位。
*/
private void enqueue(E x) {
final Object[] items = this.items;
// 将入参的x元素添加到指定的数组索引位置
items[putIndex] = x;
// 将入参的x元素添加到指定的数组索引位置
if (++putIndex == items.length)
putIndex = 0;
// 集合总数据量的计数器 + 1
count++;
// 发出信号,帮助那些在集合为空时处于阻塞状态的线程(消费者线程),解除阻塞状态
notEmpty.signal();
}

删除元素 take()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 从队首删除一个元素, 如果队列为空, 则阻塞线程
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) // 队列为空, 则线程在notEmpty条件队列等待
notEmpty.await();
return dequeue(); // 队列非空,则出队一个元素
} finally {
lock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 该方法负责从takeIndex指向的索引位移除一个元素
* 该方法内部虽然没有做线程安全性的操作,但是对该方法的调用者都有“持有锁”的要求:
* 在takeIndex的索引位的数据将被移除,并将takeIndex指向的索引位向后移动一位,如果移动后超出数组边界,则重新指向0号索引位。
*/
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex];
// 将已经移除了数据的索引位置为null,以便帮助可能的GC动作
items[takeIndex] = null;
// 移除后,如果下一个索引位超出了边界,则索引位置重新指向0
if (++takeIndex == items.length) // 如果队列已空
takeIndex = 0;
// 集合总数据量的计数器 - 1
count--;
// 如果存在迭代器(们),则迭代器也需要进行数据清理
if (itrs != null)
itrs.elementDequeued();
// 发出信号,帮助那些在集合已满时进入阻塞状态的线程(生产者线程),解除阻塞状态
notFull.signal();
return x;
}

ArrayBlockingQueue的迭代器

  • ArrayBlockingQueue为了管理一个和多个迭代器,专门设立了一个Itrs迭代器组的概念,除了detached(独立/无效)工作模式下的迭代器外,ArrayBlockingQueue队列中目前所有正在被使用的迭代器都基于Itrs迭代器组构造成一个单向链表结构,列表中的每个节点使用“弱引用”方式进行对象引用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
 class Itrs {

/**
* Node in a linked list of weak iterator references.
* 这是Itrs迭代器组的一个Node节点定义
*/
private class Node extends WeakReference<Itr> {
// next属性指向Itrs迭代器组单向链表中的下一个Node节点
Node next;
// 每一个Node节点都弱引用一个iterator迭代器
Node(Itr iterator, Node next) {
super(iterator);
this.next = next;
}
}
/**
* Incremented whenever takeIndex wraps around to 0
* 该属性非常重要,它记录takeIndex索引重新回到0号索引位的次数
* 由此来描述takeIndex索引位的“圈数”
*/
int cycles = 0;

/**
* Linked list of weak iterator references
* 这是Itrs迭代器的第一个Node节点的,以便进行整个单向链表的构建、遍历和管理
*
*/
private Node head;

/**
* Used to expunge stale iterators
* Itrs迭代器组在特定的场景下会进行Node单向链表的清理,该属性表示上次一清理到的Node节点
* 以便在下一次清理时使用(不用回到head处重新遍历了)
*
*/
private Node sweeper = null;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
  private class Itr implements Iterator<E> {
/** Index to look for new nextItem; NONE at end */
// 当前游标索引位
private int cursor;

/** Element to be returned by next call to next(); null if none */
// 专门为支持hashNext方法和next方法配合所使用的属性,用于在调用next方法返回数据
private E nextItem;

/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
// 专门为支持hashNext方法和next方法配合所使用的属性,记录调用next方法返回数据的索引位
private int nextIndex;

/** Last element returned; null if none or not detached. */
// 最后一次(上一次)迭代器遍历操作时返回的元素
private E lastItem;

/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
// 最后一次(上一次)迭代器遍历操作时返回的元素的索引位
private int lastRet;

/** Previous value of takeIndex, or DETACHED when detached */
// 该变量表示本迭代器最后一次(上一次)从ArrayBlockingQueue队列中获取到的takeIndex索引位
// 该属性还有一个重要的作用,用来表示当前迭代器是否是“独立”工作模式(或者迭代器是否失效)
private int prevTakeIndex;

/** Previous value of iters.cycles */
// 最后一次(上一次)从ArrayBlockingQueue队列获取的takeIndex索引位回到0号索引位的次数
// 这个值非常重要,是判定迭代器是否有效的重要依据
private int prevCycles;

/** Special index value indicating "not available" or "undefined" */
private static final int NONE = -1;

/**
* Special index value indicating "removed elsewhere", that is,
* removed by some operation other than a call to this.remove().
*/
// 该常量表示索引位所表示的值已经被remove()方法以外的操作移除
private static final int REMOVED = -2;

/** Special value for prevTakeIndex indicating "detached mode" */
// 该常量值赋值到prevTakeIndex,以表示当前迭代器变成“独立”(无效)工作模式
private static final int DETACHED = -3;
}
  • NONE:一般用来表示指定的索引位已完成任务或者不可用(主要用于Itr迭代器的lastRet索引、nextIndex索引);
  • REMOVED:一般用来表示指定的索引位上的元素已经被其它线程的操作移除(用于Itr迭代器的lastRet索引、nextIndex索引)
  • DETACHED:一般标识在prevTakeIndex变量上,表示当前迭代器为“独立/无效”工作模式(主要用于Itr迭代器的prevTakeIndex索引)。

Itr迭代器实例化过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
Itr() {
// assert lock.getHoldCount() == 0;
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
// 进行迭代器的初始化,也需要获取操作锁
lock.lock();
try {
// 如果当前条件成立,说明这时ArrayBlockingQueue队列集合没有任何元素
// 这时将当前迭代器作为独立模式进行创建。
if (count == 0) {
// assert itrs == null;
// 当前游标索引无意义
cursor = NONE;
// 下一迭代索引位无意义
nextIndex = NONE;
// 使用该变量,标识当前迭代器独立工作,无需注册到Itrs迭代器组中
prevTakeIndex = DETACHED;
} else {
// 将当前ArrayBlockingQueue队列集合的takeIndex值(下一个取数索引位)
// 记录到prevTakeIndex变量中,作为当前迭代器开始遍历的索引位置
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
// 取出当前开始遍历的索引位上的数据,记录到nextItem变量中(nextIndex值也同时设定),作为将要调用的next()方法的返回值
nextItem = itemAt(nextIndex = takeIndex);
// 确定下一个游标位(incCursor(int)方法很重要,具体过程请参见该方法上的注释)
// 迭代器初始化时,第一个游标位是takeIndex索引位的下一个索引位
// 这是因为遍历起始索引位已经记录在了prevTakeIndex变量中
cursor = incCursor(takeIndex);
// 通过以上过程,迭代器的初始化过程基本完成,现在将这个迭代器对象注册到Itrs迭代器组中
// 如果Itrs迭代器组还没有初始化,则进行Itrs组的初始化,并将当前迭代器对象作为Itrs迭代器组的第一个Node节点
if (itrs == null) {
itrs = new Itrs(this);
}
// 其它情况则将当前迭代器注册到Itrs迭代器组中,并清理Itrs迭代器组中过期/无效的迭代器节点。
else {
itrs.register(this); // in this order
itrs.doSomeSweeping(false);
}
// Itrs迭代器组中最重要的一个数值就是当前ArrayBlockingQueue队列集合takeIndex变量通过循环数组0号索引位的次数
// 这个次数记录在Itrs迭代器组的cycles变量中,前者将在这里被赋值给迭代器的prevCycles变量
prevCycles = itrs.cycles;
// assert takeIndex >= 0;
// assert prevTakeIndex == takeIndex;
// assert nextIndex >= 0;
// assert nextItem != null;
}
} finally {
lock.unlock();
}
}

/**
* 该私有方法根据ArrayBlockingQueue队列集合的固定长度和状态
* 确定下一个游标索引值。
*/
private int incCursor(int index) {
// 有几种情况:
// a、如果下一个索引值等于当前队列容量,说明当前遍历的位置跨过了环形数组的0号索引位,这时设置下一游标位置为0
// b、如果下一个索引值等于ArrayBlockingQueue队列putIndex索引值,说明已经没有能遍历的数据了,这时设置下一游标位置为NONE
// c、其它情况下,index简单+1,就是下一个游标位置
// assert lock.getHoldCount() == 1;
if (++index == items.length)
index = 0;
if (index == putIndex)
index = NONE;
return index;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
public E next() {
final E e = nextItem;
// 注意,如果nextItem中没有数据,则直接抛出异常,这就是为什么在执行next()方法前,
// 一定要先使用hasNext()方法检查迭代器的有效性
if (e == null)
throw new NoSuchElementException();
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
// 只有在获得锁的情况下才能执行next遍历操作
lock.lock();
try {
// 如果当前迭代器不是“独立”模式(也就是说没有失效)
// 则通过incorporateDequeues方法对lastRet、nextIndex、cursor、prevCycles、prevTakeIndex属性进行修正
// 保证以上这些属性的状态值,和当前ArrayBlockingQueue队列集合的状态一致。
// incorporateDequeues方法很重要
if (!isDetached()) {
incorporateDequeues();
}
// assert nextIndex != NONE;
// assert lastItem == null;
// 将nextIndex索引位置赋值给lastRet,表示之前取next元素的索引位已经变成了“上一个”取出数据的索引位
lastRet = nextIndex;
final int cursor = this.cursor;
// 如果当前游标有效(不为NONE)
if (cursor >= 0) {
// 那么游标的索引位置就成为下一个取数的位置
nextItem = itemAt(nextIndex = cursor);
// assert nextItem != null;
// 接着游标索引位+1,注意:这是游标索引位可能为None
// 代表取出下一个数后,就再无数可取,遍历结束
this.cursor = incCursor(cursor);
}
// 否则就认为已无数可取,迭代器工作结束
else {
// 这时设定nextIndex为NONE,,设定nextItem为Null
nextIndex = NONE;
nextItem = null;
// 如果条件成立,则标识当前迭代器为“独立”(无效)工作状态
if (lastRet == REMOVED) {
detach();
}
}
} finally {
lock.unlock();
}
return e;
}

// 该方法负责将当前Itr迭代器置为“独立/失效”工作状态,既将prevTakeIndex设置为DETACHED
// 这个动作可能发生在以下多种场景下:
// 1、当Itrs迭代器组要停止对某个Itr迭代器进行状态跟踪时。
// 2、当迭代器中已经没有更多的索引位可以遍历时。
// 3、当迭代器发生了一些处理异常时,
// 4、当incorporateDequeues()方法中判定三个关键索引位全部失效时(cursor < 0 && nextIndex < 0 && lastRet < 0)
private void detach() {
// Switch to detached mode
// assert lock.isHeldByCurrentThread();
// assert cursor == NONE;
// assert nextIndex < 0;
// assert lastRet < 0 || nextItem == null;
// assert lastRet < 0 ^ lastItem != null;
if (prevTakeIndex >= 0) {
// assert itrs != null;
// 设定一个Itr迭代器失效,就是设定prevTakeIndex属性为DETACHED常量
prevTakeIndex = DETACHED;
// try to unlink from itrs (but not too hard)
// 一旦该迭代器被标识为“独立”(无效)工作模式,则试图清理该迭代器对象在Itrs迭代器组中的监控信息
itrs.doSomeSweeping(true);
}
}
/**
* 该方法在用于在Itr迭代器多次操作的间歇间,ArrayBlockingQueue队列状态发生变化的情况下
* 对Itr的重要索引位置进行修正(甚至是让Itr在极端情况下无效)
*/
private void incorporateDequeues() {
// assert lock.isHeldByCurrentThread();
// assert itrs != null;
// assert !isDetached();
// assert count > 0;

// 这是ArrayBlockingQueue目前记录的takeIndex索引位回到0号索引位的次数
final int cycles = itrs.cycles;
// 这是ArrayBlockingQueue目前记录的takeIndex索引位的值
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
// 这是本迭代器中上一次获取到的takeIndex索引位回到0号索引位的次数(值为0)
final int prevCycles = this.prevCycles;
// 这是本迭代器中上一次获取到的takeIndex索引位的值
final int prevTakeIndex = this.prevTakeIndex;

// 如果发现cycles和prevCycles存在差异,或者takeIndex和prevTakeIndex存在差异
// 则说明在迭代器的两次操作间隔中,ArrayBlockingQueue中的数据发生了变化,那么需要进行修正
if (cycles != prevCycles || takeIndex != prevTakeIndex) {
// ArrayBlockingQueue队列中循环数组的容量长度
// 和代码配套的示意图中,该值为X+1
final int len = items.length;
// how far takeIndex has advanced since the previous
// operation of this iterator
// 这句计算非常重要,就是计算在所有读取操作后,两次takeIndex索引产生的索引距离(已出队的数据量)
long dequeues = (long) (cycles - prevCycles) * len + (takeIndex - prevTakeIndex);

// Check indices for invalidation
// 判定lastRet索引位置是否失效,如果失效则赋值为-2
if (invalidated(lastRet, prevTakeIndex, dequeues, len)) {
lastRet = REMOVED;
}
// 判定nextIndex索引位是否失效,如果失效则赋值为-2
if (invalidated(nextIndex, prevTakeIndex, dequeues, len)) {
nextIndex = REMOVED;
}
// 判定nextIndex索引位是否失效,如果失效则将ArrayBlockingQueue目前记录的takeIndex索引位的值赋给它
// 让cursor游标索引位,指向当前ArrayBlockingQueue队列的head位置
if (invalidated(cursor, prevTakeIndex, dequeues, len)) {
cursor = takeIndex;
}

// 如果cursor索引、nextIndex索引、lastRet索引,则表示当前Itr游标失效
// 调用detach()方法将当前Itr迭代器标记为失效,并清理Itrs迭代器组中的Node信息
if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
detach();
}
// 否则(大部分情况)修正Itr迭代器中的状态,以便其能从修正的位置开始进行遍历
else {
this.prevCycles = cycles;
this.prevTakeIndex = takeIndex;
}
}

/**
* Returns true if index is invalidated by the given number of
* dequeues, starting from prevTakeIndex.
* 该方法依据prevTakeIndex索引的位置、两次takeIndex索引移动的距离(已出队的数据量),以便判定给定的index索引位置是否已经失效
* 如果失效,则返回true,其它情况返回false。
*/
private boolean invalidated(int index, int prevTakeIndex, long dequeues, int length) {
// 如果需要判定的索引位本来就已经失效了(NONE、REMOVED、DETACHED这些常量都为负数)
if (index < 0) {
return false;
}
// 计算index索引位置和prevTakeIndex索引位置的距离
// 最简单的就是当前index的索引位减去prevTakeIndex的索引位值
int distance = index - prevTakeIndex;
// 如果以上计算出来是一个负值,说明index的索引位已经“绕场一周”
// 这时在distance的基础上面,增加一个队列长度值,
if (distance < 0) {
distance += length;
}
return dequeues > distance;
}

总结

  • ArrayBlockingQueue维护了一把全局锁,无论是出队还是入队,都共用这把锁,这就导致任一时间点只有一个线程能够执行。那么对于“生产者-消费者”模式来说,意味着生产者和消费者不能并发执行。