BlockingQueue
BlockingQueue 是一个先进先出的队列(Queue),为什么说是阻塞(Blocking)的呢?是因为 BlockingQueue 支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。
BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。
BlockingQueue 对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用:1、抛出异常;2、返回特殊值(null 或 true/false,取决于具体的操作);3、阻塞等待此操作,直到这个操作成功;4、阻塞等待此操作,直到成功或者超时指定时间。
| | Throws exception | Special value | Blocks | Times out | | — | — | — | — | — | |Insert|add(e)|offer(e)|put(e)|offer(e, time, unit)| |Remove|remove()|poll()|take()|poll(time, unit)| |Examine|element()|peek()|not applicable|not applicable| BlockingQueue 不接受 null 值的插入,相应的方法在碰到 null 的插入时会抛出 NullPointerException 异常。null 值在这里通常用于作为特殊值返回
一个 BlockingQueue 可能是有界的,如果在插入的时候,发现队列满了,那么 put 操作将会阻塞。通常,在这里我们说的无界队列也不是说真正的无界,而是它的容量是 Integer.MAX_VALUE(21亿多)。
BlockingQueue 的实现都是线程安全的,但是批量的集合操作如 addAll, containsAll, retainAll 和 removeAll 不一定是原子操作
。如 addAll(c) 有可能在添加了一些元素后中途抛出异常,此时 BlockingQueue 中已经添加了部分元素,这个是允许的,取决于具体的实现。
ArrayBlockingQueue
它采用一个 ReentrantLock 和相应的两个 Condition 来实现。
属性
1
2
3
4
5
6
7
8
9
10
11
12
13
// 用于存放元素的数组
final Object[] items;
// 下一次读取操作的位置
int takeIndex;
// 下一次写入操作的位置
int putIndex;
// 队列中的元素数量
int count;
// 以下几个就是控制并发用的同步器
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
我们用个示意图来描述其同步机制: ArrayBlockingQueue 实现并发同步的原理就是,读操作和写操作都需要获取到 AQS 独占锁才能进行操作。如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。
对于 ArrayBlockingQueue,我们可以在构造的时候指定以下三个参数:
- 队列容量,其限制了队列中最多允许的元素个数;
- 指定独占锁是公平锁还是非公平锁。非公平锁的吞吐量比较高,公平锁可以保证每次都是等待最久的线程获取到锁;
- 可以指定用一个集合来初始化,将此集合中的元素在构造方法期间就先添加到队列中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class BoundedBuffer {
final Lock lock = new ReentrantLock();
// condition 依赖于 lock 来产生
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
// 生产
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); // 队列已满,等待,直到 not full 才能继续生产
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去
} finally {
lock.unlock();
}
}
// 消费
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去
return x;
} finally {
lock.unlock();
}
}
}
LinkedBlockingQueue
底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。看构造方法:
1
2
3
4
5
6
7
8
9
10
11
// 传说中的无界队列
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 传说中的有界队列
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 队列容量
private final int capacity;
// 队列中的元素数量
private final AtomicInteger count = new AtomicInteger(0);
// 队头
private transient Node<E> head;
// 队尾
private transient Node<E> last;
// take, poll, peek 等读操作的方法需要获取到这个锁
private final ReentrantLock takeLock = new ReentrantLock();
// 如果读操作的时候队列是空的,那么等待 notEmpty 条件
private final Condition notEmpty = takeLock.newCondition();
// put, offer 等写操作的方法需要获取到这个锁
private final ReentrantLock putLock = new ReentrantLock();
// 如果写操作的时候队列是满的,那么等待 notFull 条件
private final Condition notFull = putLock.newCondition();
- takeLock 和 notEmpty 搭配:如果要获取(take)一个元素,需要获取 takeLock 锁,但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
- putLock 和 notFull 搭配:如果要插入(put)一个元素,需要获取 putLock 锁,但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。 首先,这里用一个示意图来看看 LinkedBlockingQueue 的并发读写控制,然后再开始分析源码:
1
2
3
4
5
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
注意,这里会初始化一个空的头结点,那么第一个元素入队的时候,队列中就会有两个元素。读取元素时,也总是获取头节点后面的一个节点。count 的计数值不包括这个头节点。
我们来看下 put 方法是怎么将元素插入到队尾的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 如果你纠结这里为什么是 -1,可以看看 offer 方法。这就是个标识成功、失败的标志而已。
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 必须要获取到 putLock 才可以进行插入操作
putLock.lockInterruptibly();
try {
// 如果队列满,等待 notFull 的条件满足。
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// count 原子加 1,c 还是加 1 前的值
c = count.getAndIncrement();
// 如果这个元素入队后,还有至少一个槽可以使用,调用 notFull.signal() 唤醒等待线程。
// 哪些线程会等待在 notFull 这个 Condition 上呢?
if (c + 1 < capacity)
notFull.signal();
} finally {
// 入队后,释放掉 putLock
putLock.unlock();
}
// 如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),
// 那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作
if (c == 0)
signalNotEmpty();
}
// 入队的代码非常简单,就是将 last 属性指向这个新元素,并且让原队尾的 next 指向这个元素
// 这里入队没有并发问题,因为只有获取到 putLock 独占锁以后,才可以进行此操作
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
// 元素入队后,如果需要,调用这个方法唤醒读线程来读
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
我们再看看 take 方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 首先,需要获取到 takeLock 才能进行出队操作
takeLock.lockInterruptibly();
try {
// 如果队列为空,等待 notEmpty 这个条件满足再继续执行
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
// count 进行原子减 1
c = count.getAndDecrement();
// 如果这次出队后,队列中至少还有一个元素,那么调用 notEmpty.signal() 唤醒其他的读线程
if (c > 1)
notEmpty.signal();
} finally {
// 出队后释放掉 takeLock
takeLock.unlock();
}
// 如果 c == capacity,那么说明在这个 take 方法发生的时候,队列是满的
// 既然出队了一个,那么意味着队列不满了,唤醒写线程去写
if (c == capacity)
signalNotFull();
return x;
}
// 取队头,出队
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
// 之前说了,头结点是空的
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
// 设置这个为新的头结点
head = first;
E x = first.item;
first.item = null;
return x;
}
// 元素出队后,如果需要,调用这个方法唤醒写线程来写
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
ArrayBlockingQueue 底层是数组,有界队列,如果我们要使用生产者-消费者模式,这是非常好的选择。LinkedBlockingQueue 底层是链表,可以当做无界和有界队列来使用,不要以为它就是无界队列。