Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 0a9d0b9

Browse files
committed
增加 ArrayBlockingQueue和LinkedBlockingQueue的实现原理
1 parent 368a2e7 commit 0a9d0b9

File tree

5 files changed

+373
-0
lines changed

5 files changed

+373
-0
lines changed
18.9 KB
Loading[フレーム]
9.74 KB
Loading[フレーム]
26 KB
Loading[フレーム]
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
2+
# 1. BlockingQueue简介 #
3+
在实际编程中,会经常使用到JDK中Collection集合框架中的各种容器类如实现List,Map,Queue接口的容器类,但是这些容器类基本上不是线程安全的,除了使用Collections可以将其转换为线程安全的容器,Doug Lea大师为我们都准备了对应的线程安全的容器,如实现List接口的CopyOnWriteArrayList([关于CopyOnWriteArrayList可以看这篇文章](https://juejin.im/post/5aeeb55f5188256715478c21)),实现Map接口的ConcurrentHashMap([关于ConcurrentHashMap可以看这篇文章](https://juejin.im/post/5aeeaba8f265da0b9d781d16)),实现Queue接口的ConcurrentLinkedQueue([关于ConcurrentLinkedQueue可以看这篇文章](https://juejin.im/post/5aeeae756fb9a07ab11112af))。
4+
5+
最常用的"**生产者-消费者**"问题中,队列通常被视作线程间操作的数据容器,这样,可以对各个模块的业务功能进行解耦,生产者将"生产"出来的数据放置在数据容器中,而消费者仅仅只需要在"数据容器"中进行获取数据即可,这样生产者线程和消费者线程就能够进行解耦,只专注于自己的业务功能即可。阻塞队列(BlockingQueue)被广泛使用在"生产者-消费者"问题中,其原因是BlockingQueue提供了可阻塞的插入和移除的方法。**当队列容器已满,生产者线程会被阻塞,直到队列未满;当队列容器为空时,消费者线程会被阻塞,直至队列非空时为止。**
6+
7+
# 2. 基本操作 #
8+
9+
BlockingQueue基本操作总结如下(此图来源于JAVA API文档):
10+
11+
![BlockingQueue基本操作.png](http://upload-images.jianshu.io/upload_images/2615789-19d06e0ba334fe52.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
12+
13+
BlockingQueue继承于Queue接口,因此,对数据元素的基本操作有:
14+
15+
> 插入元素
16+
17+
1. add(E e) :往队列插入数据,当队列满时,插入元素时会抛出IllegalStateException异常;
18+
2. offer(E e):当往队列插入数据时,插入成功返回`true`,否则则返回`false`。当队列满时不会抛出异常;
19+
20+
> 删除元素
21+
22+
1. remove(Object o):从队列中删除数据,成功则返回`true`,否则为`false`
23+
2. poll:删除数据,当队列为空时,返回null;
24+
25+
> 查看元素
26+
27+
1. element:获取队头元素,如果队列为空时则抛出NoSuchElementException异常;
28+
2. peek:获取队头元素,如果队列为空则抛出NoSuchElementException异常
29+
30+
BlockingQueue具有的特殊操作:
31+
32+
> 插入数据:
33+
34+
1. put:当阻塞队列容量已经满时,往阻塞队列插入数据的线程会被阻塞,直至阻塞队列已经有空余的容量可供使用;
35+
2. offer(E e, long timeout, TimeUnit unit):若阻塞队列已经满时,同样会阻塞插入数据的线程,直至阻塞队列已经有空余的地方,与put方法不同的是,该方法会有一个超时时间,若超过当前给定的超时时间,插入数据的线程会退出;
36+
37+
> 删除数据
38+
39+
1. take():当阻塞队列为空时,获取队头数据的线程会被阻塞;
40+
2. poll(long timeout, TimeUnit unit):当阻塞队列为空时,获取数据的线程会被阻塞,另外,如果被阻塞的线程超过了给定的时长,该线程会退出
41+
42+
43+
# 3. 常用的BlockingQueue #
44+
实现BlockingQueue接口的有`ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, LinkedTransferQueue, PriorityBlockingQueue, SynchronousQueue`,而这几种常见的阻塞队列也是在实际编程中会常用的,下面对这几种常见的阻塞队列进行说明:
45+
46+
> 1.ArrayBlockingQueue
47+
48+
**ArrayBlockingQueue**是由数组实现的有界阻塞队列。该队列命令元素FIFO(先进先出)。因此,对头元素时队列中存在时间最长的数据元素,而对尾数据则是当前队列最新的数据元素。ArrayBlockingQueue可作为"有界数据缓冲区",生产者插入数据到队列容器中,并由消费者提取。ArrayBlockingQueue一旦创建,容量不能改变。
49+
50+
当队列容量满时,尝试将元素放入队列将导致操作阻塞;尝试从一个空队列中取一个元素也会同样阻塞。
51+
52+
ArrayBlockingQueue默认情况下不能保证线程访问队列的公平性,所谓公平性是指严格按照线程等待的绝对时间顺序,即最先等待的线程能够最先访问到ArrayBlockingQueue。而非公平性则是指访问ArrayBlockingQueue的顺序不是遵守严格的时间顺序,有可能存在,一旦ArrayBlockingQueue可以被访问时,长时间阻塞的线程依然无法访问到ArrayBlockingQueue。**如果保证公平性,通常会降低吞吐量**。如果需要获得公平性的ArrayBlockingQueue,可采用如下代码:
53+
54+
private static ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(10,true);
55+
56+
关于ArrayBlockingQueue的实现原理,可以[看这篇文章](https://juejin.im/post/5aeebdb26fb9a07aa83ea17e)
57+
58+
59+
> 2.LinkedBlockingQueue
60+
61+
LinkedBlockingQueue是用链表实现的有界阻塞队列,同样满足FIFO的特性,与ArrayBlockingQueue相比起来具有更高的吞吐量,为了防止LinkedBlockingQueue容量迅速增,损耗大量内存。通常在创建LinkedBlockingQueue对象时,会指定其大小,如果未指定,容量等于Integer.MAX_VALUE
62+
63+
64+
> 3.PriorityBlockingQueue
65+
66+
PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采用自然顺序进行排序,也可以通过自定义类实现compareTo()方法来指定元素排序规则,或者初始化时通过构造器参数Comparator来指定排序规则。
67+
68+
> 4.SynchronousQueue
69+
70+
SynchronousQueue每个插入操作必须等待另一个线程进行相应的删除操作,因此,SynchronousQueue实际上没有存储任何数据元素,因为只有线程在删除数据时,其他线程才能插入数据,同样的,如果当前有线程在插入数据时,线程才能删除数据。SynchronousQueue也可以通过构造器参数来为其指定公平性。
71+
72+
73+
> 5.LinkedTransferQueue
74+
75+
LinkedTransferQueue是一个由链表数据结构构成的无界阻塞队列,由于该队列实现了TransferQueue接口,与其他阻塞队列相比主要有以下不同的方法:
76+
77+
**transfer(E e)**
78+
如果当前有线程(消费者)正在调用take()方法或者可延时的poll()方法进行消费数据时,生产者线程可以调用transfer方法将数据传递给消费者线程。如果当前没有消费者线程的话,生产者线程就会将数据插入到队尾,直到有消费者能够进行消费才能退出;
79+
80+
**tryTransfer(E e)**
81+
tryTransfer方法如果当前有消费者线程(调用take方法或者具有超时特性的poll方法)正在消费数据的话,该方法可以将数据立即传送给消费者线程,如果当前没有消费者线程消费数据的话,就立即返回`false`。因此,与transfer方法相比,transfer方法是必须等到有消费者线程消费数据时,生产者线程才能够返回。而tryTransfer方法能够立即返回结果退出。
82+
83+
**tryTransfer(E e,long timeout,imeUnit unit)**</br>
84+
与transfer基本功能一样,只是增加了超时特性,如果数据才规定的超时时间内没有消费者进行消费的话,就返回`false`
85+
86+
87+
> 6.LinkedBlockingDeque
88+
89+
LinkedBlockingDeque是基于链表数据结构的有界阻塞双端队列,如果在创建对象时为指定大小时,其默认大小为Integer.MAX_VALUE。与LinkedBlockingQueue相比,主要的不同点在于,LinkedBlockingDeque具有双端队列的特性。LinkedBlockingDeque基本操作如下图所示(来源于java文档)
90+
91+
![LinkedBlockingDeque的基本操作.png](http://upload-images.jianshu.io/upload_images/2615789-d51d940d30786e32.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/600)
92+
93+
94+
95+
如上图所示,LinkedBlockingDeque的基本操作可以分为四种类型:1.特殊情况,抛出异常;2.特殊情况,返回特殊值如null或者false;3.当线程不满足操作条件时,线程会被阻塞直至条件满足;4. 操作具有超时特性。
96+
97+
另外,LinkedBlockingDeque实现了BlockingDueue接口而LinkedBlockingQueue实现的是BlockingQueue,这两个接口的主要区别如下图所示(来源于java文档):
98+
99+
100+
![BlockingQueue和BlockingDeque的区别.png](http://upload-images.jianshu.io/upload_images/2615789-7316a2543b99caa2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/600)
101+
102+
从上图可以看出,两个接口的功能是可以等价使用的,比如BlockingQueue的add方法和BlockingDeque的addLast方法的功能是一样的。
103+
104+
> 7.DelayQueue
105+
106+
DelayQueue是一个存放实现Delayed接口的数据的无界阻塞队列,只有当数据对象的延时时间达到时才能插入到队列进行存储。如果当前所有的数据都还没有达到创建时所指定的延时期,则队列没有队头,并且线程通过poll等方法获取数据元素则返回null。所谓数据延时期满时,则是通过Delayed接口的`getDelay(TimeUnit.NANOSECONDS)`来进行判定,如果该方法返回的是小于等于0则说明该数据元素的延时期已满。
107+
108+
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
2+
# 1. ArrayBlockingQueue简介 #
3+
4+
在多线程编程过程中,为了业务解耦和架构设计,经常会使用并发容器用于存储多线程间的共享数据,这样不仅可以保证线程安全,还可以简化各个线程操作。例如在"生产者-消费者"问题中,会使用阻塞队列(BlockingQueue)作为数据容器,关于BlockingQueue可以[看这篇文章](https://juejin.im/post/5aeebd02518825672f19c546)。为了加深对阻塞队列的理解,唯一的方式是对其实验原理进行理解,这篇文章就主要来看看ArrayBlockingQueue和LinkedBlockingQueue的实现原理。
5+
6+
# 2. ArrayBlockingQueue实现原理 #
7+
8+
阻塞队列最核心的功能是,能够可阻塞式的插入和删除队列元素。当前队列为空时,会阻塞消费数据的线程,直至队列非空时,通知被阻塞的线程;当队列满时,会阻塞插入数据的线程,直至队列未满时,通知插入数据的线程(生产者线程)。那么,多线程中消息通知机制最常用的是lock的condition机制,关于condition可以[看这篇文章的详细介绍](https://juejin.im/post/5aeea5e951882506a36c67f0)。那么ArrayBlockingQueue的实现是不是也会采用Condition的通知机制呢?下面来看看。
9+
10+
## 2.1 ArrayBlockingQueue的主要属性
11+
12+
ArrayBlockingQueue的主要属性如下:
13+
14+
/** The queued items */
15+
final Object[] items;
16+
17+
/** items index for next take, poll, peek or remove */
18+
int takeIndex;
19+
20+
/** items index for next put, offer, or add */
21+
int putIndex;
22+
23+
/** Number of elements in the queue */
24+
int count;
25+
26+
/*
27+
* Concurrency control uses the classic two-condition algorithm
28+
* found in any textbook.
29+
*/
30+
31+
/** Main lock guarding all access */
32+
final ReentrantLock lock;
33+
34+
/** Condition for waiting takes */
35+
private final Condition notEmpty;
36+
37+
/** Condition for waiting puts */
38+
private final Condition notFull;
39+
40+
从源码中可以看出ArrayBlockingQueue内部是采用数组进行数据存储的(`属性items`),为了保证线程安全,采用的是`ReentrantLock lock`,为了保证可阻塞式的插入删除数据利用的是Condition,当获取数据的消费者线程被阻塞时会将该线程放置到notEmpty等待队列中,当插入数据的生产者线程被阻塞时,会将该线程放置到notFull等待队列中。而notEmpty和notFull等中要属性在构造方法中进行创建:
41+
42+
public ArrayBlockingQueue(int capacity, boolean fair) {
43+
if (capacity <= 0)
44+
throw new IllegalArgumentException();
45+
this.items = new Object[capacity];
46+
lock = new ReentrantLock(fair);
47+
notEmpty = lock.newCondition();
48+
notFull = lock.newCondition();
49+
}
50+
51+
接下来,主要看看可阻塞式的put和take方法是怎样实现的。
52+
53+
## 2.2 put方法详解
54+
55+
` put(E e)`方法源码如下:
56+
57+
public void put(E e) throws InterruptedException {
58+
checkNotNull(e);
59+
final ReentrantLock lock = this.lock;
60+
lock.lockInterruptibly();
61+
try {
62+
//如果当前队列已满,将线程移入到notFull等待队列中
63+
while (count == items.length)
64+
notFull.await();
65+
//满足插入数据的要求,直接进行入队操作
66+
enqueue(e);
67+
} finally {
68+
lock.unlock();
69+
}
70+
}
71+
72+
73+
该方法的逻辑很简单,当队列已满时(`count == items.length`)将线程移入到notFull等待队列中,如果当前满足插入数据的条件,就可以直接调用` enqueue(e)`插入数据元素。enqueue方法源码为:
74+
75+
private void enqueue(E x) {
76+
// assert lock.getHoldCount() == 1;
77+
// assert items[putIndex] == null;
78+
final Object[] items = this.items;
79+
//插入数据
80+
items[putIndex] = x;
81+
if (++putIndex == items.length)
82+
putIndex = 0;
83+
count++;
84+
//通知消费者线程,当前队列中有数据可供消费
85+
notEmpty.signal();
86+
}
87+
88+
enqueue方法的逻辑同样也很简单,先完成插入数据,即往数组中添加数据(`items[putIndex] = x`),然后通知被阻塞的消费者线程,当前队列中有数据可供消费(`notEmpty.signal()`)。
89+
90+
## 2.3 take方法详解
91+
92+
take方法源码如下:
93+
94+
95+
public E take() throws InterruptedException {
96+
final ReentrantLock lock = this.lock;
97+
lock.lockInterruptibly();
98+
try {
99+
//如果队列为空,没有数据,将消费者线程移入等待队列中
100+
while (count == 0)
101+
notEmpty.await();
102+
//获取数据
103+
return dequeue();
104+
} finally {
105+
lock.unlock();
106+
}
107+
}
108+
109+
take方法也主要做了两步:1. 如果当前队列为空的话,则将获取数据的消费者线程移入到等待队列中;2. 若队列不为空则获取数据,即完成出队操作`dequeue`。dequeue方法源码为:
110+
111+
private E dequeue() {
112+
// assert lock.getHoldCount() == 1;
113+
// assert items[takeIndex] != null;
114+
final Object[] items = this.items;
115+
@SuppressWarnings("unchecked")
116+
//获取数据
117+
E x = (E) items[takeIndex];
118+
items[takeIndex] = null;
119+
if (++takeIndex == items.length)
120+
takeIndex = 0;
121+
count--;
122+
if (itrs != null)
123+
itrs.elementDequeued();
124+
//通知被阻塞的生产者线程
125+
notFull.signal();
126+
return x;
127+
}
128+
129+
dequeue方法也主要做了两件事情:1. 获取队列中的数据,即获取数组中的数据元素(`(E) items[takeIndex]`);2. 通知notFull等待队列中的线程,使其由等待队列移入到同步队列中,使其能够有机会获得lock,并执行完成功退出。
130+
131+
从以上分析,可以看出put和take方法主要是通过condition的通知机制来完成可阻塞式的插入数据和获取数据。在理解ArrayBlockingQueue后再去理解LinkedBlockingQueue就很容易了。
132+
133+
134+
# 3. LinkedBlockingQueue实现原理 #
135+
LinkedBlockingQueue是用链表实现的有界阻塞队列,当构造对象时为指定队列大小时,队列默认大小为`Integer.MAX_VALUE`。从它的构造方法可以看出:
136+
137+
public LinkedBlockingQueue() {
138+
this(Integer.MAX_VALUE);
139+
}
140+
141+
142+
# 3.1 LinkedBlockingQueue的主要属性 #
143+
144+
145+
LinkedBlockingQueue的主要属性有:
146+
147+
/** Current number of elements */
148+
private final AtomicInteger count = new AtomicInteger();
149+
150+
/**
151+
* Head of linked list.
152+
* Invariant: head.item == null
153+
*/
154+
transient Node<E> head;
155+
156+
/**
157+
* Tail of linked list.
158+
* Invariant: last.next == null
159+
*/
160+
private transient Node<E> last;
161+
162+
/** Lock held by take, poll, etc */
163+
private final ReentrantLock takeLock = new ReentrantLock();
164+
165+
/** Wait queue for waiting takes */
166+
private final Condition notEmpty = takeLock.newCondition();
167+
168+
/** Lock held by put, offer, etc */
169+
private final ReentrantLock putLock = new ReentrantLock();
170+
171+
/** Wait queue for waiting puts */
172+
private final Condition notFull = putLock.newCondition();
173+
174+
可以看出与ArrayBlockingQueue主要的区别是,LinkedBlockingQueue在插入数据和删除数据时分别是由两个不同的lock(`takeLock``putLock`)来控制线程安全的,因此,也由这两个lock生成了两个对应的condition(`notEmpty``notFull`)来实现可阻塞的插入和删除数据。并且,采用了链表的数据结构来实现队列,Node结点的定义为:
175+
176+
static class Node<E> {
177+
E item;
178+
179+
/**
180+
* One of:
181+
* - the real successor Node
182+
* - this Node, meaning the successor is head.next
183+
* - null, meaning there is no successor (this is the last node)
184+
*/
185+
Node<E> next;
186+
187+
Node(E x) { item = x; }
188+
}
189+
190+
接下来,我们也同样来看看put方法和take方法的实现。
191+
192+
## 3.2 put方法详解 ##
193+
194+
put方法源码为:
195+
196+
public void put(E e) throws InterruptedException {
197+
if (e == null) throw new NullPointerException();
198+
// Note: convention in all put/take/etc is to preset local var
199+
// holding count negative to indicate failure unless set.
200+
int c = -1;
201+
Node<E> node = new Node<E>(e);
202+
final ReentrantLock putLock = this.putLock;
203+
final AtomicInteger count = this.count;
204+
putLock.lockInterruptibly();
205+
try {
206+
/*
207+
* Note that count is used in wait guard even though it is
208+
* not protected by lock. This works because count can
209+
* only decrease at this point (all other puts are shut
210+
* out by lock), and we (or some other waiting put) are
211+
* signalled if it ever changes from capacity. Similarly
212+
* for all other uses of count in other wait guards.
213+
*/
214+
//如果队列已满,则阻塞当前线程,将其移入等待队列
215+
while (count.get() == capacity) {
216+
notFull.await();
217+
}
218+
//入队操作,插入数据
219+
enqueue(node);
220+
c = count.getAndIncrement();
221+
//若队列满足插入数据的条件,则通知被阻塞的生产者线程
222+
if (c + 1 < capacity)
223+
notFull.signal();
224+
} finally {
225+
putLock.unlock();
226+
}
227+
if (c == 0)
228+
signalNotEmpty();
229+
}
230+
231+
put方法的逻辑也同样很容易理解,可见注释。基本上和ArrayBlockingQueue的put方法一样。take方法的源码如下:
232+
233+
public E take() throws InterruptedException {
234+
E x;
235+
int c = -1;
236+
final AtomicInteger count = this.count;
237+
final ReentrantLock takeLock = this.takeLock;
238+
takeLock.lockInterruptibly();
239+
try {
240+
//当前队列为空,则阻塞当前线程,将其移入到等待队列中,直至满足条件
241+
while (count.get() == 0) {
242+
notEmpty.await();
243+
}
244+
//移除队头元素,获取数据
245+
x = dequeue();
246+
c = count.getAndDecrement();
247+
//如果当前满足移除元素的条件,则通知被阻塞的消费者线程
248+
if (c > 1)
249+
notEmpty.signal();
250+
} finally {
251+
takeLock.unlock();
252+
}
253+
if (c == capacity)
254+
signalNotFull();
255+
return x;
256+
}
257+
258+
take方法的主要逻辑请见于注释,也很容易理解。
259+
260+
# 4. ArrayBlockingQueue与LinkedBlockingQueue的比较 #
261+
262+
**相同点**:ArrayBlockingQueue和LinkedBlockingQueue都是通过condition通知机制来实现可阻塞式插入和删除元素,并满足线程安全的特性;
263+
264+
**不同点**:1. ArrayBlockingQueue底层是采用的数组进行实现,而LinkedBlockingQueue则是采用链表数据结构;
265+
2. ArrayBlockingQueue插入和删除数据,只采用了一个lock,而LinkedBlockingQueue则是在插入和删除分别采用了`putLock``takeLock`,这样可以降低线程由于线程无法获取到lock而进入WAITING状态的可能性,从而提高了线程并发执行的效率。

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /