Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit f7e495e

Browse files
committed
增加 线程池之ScheduledThreadPoolExecutor
1 parent 3676d6c commit f7e495e

File tree

1 file changed

+199
-0
lines changed

1 file changed

+199
-0
lines changed
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
2+
# 1. ScheduledThreadPoolExecutor简介 #
3+
ScheduledThreadPoolExecutor可以用来在给定延时后执行异步任务或者周期性执行任务,相对于任务调度的Timer来说,其功能更加强大,Timer只能使用一个后台线程执行任务,而ScheduledThreadPoolExecutor则可以通过构造函数来指定后台线程的个数。ScheduledThreadPoolExecutor类的UML图如下:
4+
5+
6+
![ScheduledThreadPoolExecutor类的UML图.png](https://upload-images.jianshu.io/upload_images/2615789-adf418781bb01bf1.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
7+
8+
9+
10+
1. 从UML图可以看出,ScheduledThreadPoolExecutor继承了`ThreadPoolExecutor`,也就是说ScheduledThreadPoolExecutor拥有execute()和submit()提交异步任务的基础功能,关于ThreadPoolExecutor[可以看这篇文章](https://juejin.im/post/5aeec0106fb9a07ab379574f)。但是,ScheduledThreadPoolExecutor类实现了`ScheduledExecutorService`,该接口定义了ScheduledThreadPoolExecutor能够延时执行任务和周期执行任务的功能;
11+
2. ScheduledThreadPoolExecutor也两个重要的内部类:**DelayedWorkQueue****ScheduledFutureTask**。可以看出DelayedWorkQueue实现了BlockingQueue接口,也就是一个阻塞队列,ScheduledFutureTask则是继承了FutureTask类,也表示该类用于返回异步任务的结果。这两个关键类,下面会具体详细来看。
12+
13+
14+
## 1.1 构造方法 ##
15+
16+
ScheduledThreadPoolExecutor有如下几个构造方法:
17+
18+
public ScheduledThreadPoolExecutor(int corePoolSize) {
19+
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
20+
new DelayedWorkQueue());
21+
};
22+
23+
public ScheduledThreadPoolExecutor(int corePoolSize,
24+
ThreadFactory threadFactory) {
25+
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
26+
new DelayedWorkQueue(), threadFactory);
27+
};
28+
public ScheduledThreadPoolExecutor(int corePoolSize,
29+
RejectedExecutionHandler handler) {
30+
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
31+
new DelayedWorkQueue(), handler);
32+
};
33+
34+
public ScheduledThreadPoolExecutor(int corePoolSize,
35+
ThreadFactory threadFactory,
36+
RejectedExecutionHandler handler) {
37+
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
38+
new DelayedWorkQueue(), threadFactory, handler);
39+
}
40+
41+
可以看出由于ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,它的构造方法实际上是调用了ThreadPoolExecutor,对ThreadPoolExecutor的介绍可以[可以看这篇文章](https://juejin.im/post/5aeec0106fb9a07ab379574f),理解ThreadPoolExecutor构造方法的几个参数的意义后,理解这就很容易了。可以看出,ScheduledThreadPoolExecutor的核心线程池的线程个数为指定的corePoolSize,当核心线程池的线程个数达到corePoolSize后,就会将任务提交给有界阻塞队列DelayedWorkQueue,对DelayedWorkQueue在下面进行详细介绍,线程池允许最大的线程个数为Integer.MAX_VALUE,也就是说理论上这是一个大小无界的线程池。
42+
43+
## 1.2 特有方法
44+
45+
ScheduledThreadPoolExecutor实现了`ScheduledExecutorService`接口,该接口定义了**可延时执行异步任务和可周期执行异步任务的特有功能**,相应的方法分别为:
46+
47+
//达到给定的延时时间后,执行任务。这里传入的是实现Runnable接口的任务,
48+
//因此通过ScheduledFuture.get()获取结果为null
49+
public ScheduledFuture<?> schedule(Runnable command,
50+
long delay, TimeUnit unit);
51+
//达到给定的延时时间后,执行任务。这里传入的是实现Callable接口的任务,
52+
//因此,返回的是任务的最终计算结果
53+
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
54+
long delay, TimeUnit unit);
55+
56+
//是以上一个任务开始的时间计时,period时间过去后,
57+
//检测上一个任务是否执行完毕,如果上一个任务执行完毕,
58+
//则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行
59+
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
60+
long initialDelay,
61+
long period,
62+
TimeUnit unit);
63+
//当达到延时时间initialDelay后,任务开始执行。上一个任务执行结束后到下一次
64+
//任务执行,中间延时时间间隔为delay。以这种方式,周期性执行任务。
65+
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
66+
long initialDelay,
67+
long delay,
68+
TimeUnit unit);
69+
70+
71+
# 2. 可周期性执行的任务---ScheduledFutureTask #
72+
73+
ScheduledThreadPoolExecutor最大的特色是能够周期性执行异步任务,当调用`schedule,scheduleAtFixedRate和scheduleWithFixedDelay方法`时,实际上是将提交的任务转换成的ScheduledFutureTask类,从源码就可以看出。以schedule方法为例:
74+
75+
public ScheduledFuture<?> schedule(Runnable command,
76+
long delay,
77+
TimeUnit unit) {
78+
if (command == null || unit == null)
79+
throw new NullPointerException();
80+
RunnableScheduledFuture<?> t = decorateTask(command,
81+
new ScheduledFutureTask<Void>(command, null,
82+
triggerTime(delay, unit)));
83+
delayedExecute(t);
84+
return t;
85+
}
86+
87+
可以看出,通过`decorateTask`会将传入的Runnable转换成`ScheduledFutureTask`类。线程池最大作用是将任务和线程进行解耦,线程主要是任务的执行者,而任务也就是现在所说的ScheduledFutureTask。紧接着,会想到任何线程执行任务,总会调用`run()`方法。为了保证ScheduledThreadPoolExecutor能够延时执行任务以及能够周期性执行任务,ScheduledFutureTask重写了run方法:
88+
89+
public void run() {
90+
boolean periodic = isPeriodic();
91+
if (!canRunInCurrentRunState(periodic))
92+
cancel(false);
93+
else if (!periodic)
94+
//如果不是周期性执行任务,则直接调用run方法
95+
ScheduledFutureTask.super.run();
96+
//如果是周期性执行任务的话,需要重设下一次执行任务的时间
97+
else if (ScheduledFutureTask.super.runAndReset()) {
98+
setNextRunTime();
99+
reExecutePeriodic(outerTask);
100+
}
101+
}
102+
103+
从源码可以很明显的看出,在重写的run方法中会先`if (!periodic)`判断当前任务是否是周期性任务,如果不是的话就直接调用`run()方法`;否则的话执行`setNextRunTime()`方法重设下一次任务执行的时间,并通过`reExecutePeriodic(outerTask)`方法将下一次待执行的任务放置到`DelayedWorkQueue`中。
104+
105+
因此,可以得出结论:**`ScheduledFutureTask`最主要的功能是根据当前任务是否具有周期性,对异步任务进行进一步封装。如果不是周期性任务(调用schedule方法)则直接通过`run()`执行,若是周期性任务,则需要在每一次执行完后,重设下一次执行的时间,然后将下一次任务继续放入到阻塞队列中。**
106+
107+
# 3. DelayedWorkQueue #
108+
109+
在ScheduledThreadPoolExecutor中还有另外的一个重要的类就是DelayedWorkQueue。为了实现其ScheduledThreadPoolExecutor能够延时执行异步任务以及能够周期执行任务,DelayedWorkQueue进行相应的封装。DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面。
110+
111+
> 为什么要使用DelayedWorkQueue呢?
112+
113+
定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时一定要是当前队列中执行时间最靠前的,所以自然要使用优先级队列。
114+
115+
DelayedWorkQueue是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是 O(logN)。
116+
117+
> DelayedWorkQueue的数据结构
118+
119+
//初始大小
120+
private static final int INITIAL_CAPACITY = 16;
121+
//DelayedWorkQueue是由一个大小为16的数组组成,数组元素为实现RunnableScheduleFuture接口的类
122+
//实际上为ScheduledFutureTask
123+
private RunnableScheduledFuture<?>[] queue =
124+
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
125+
private final ReentrantLock lock = new ReentrantLock();
126+
private int size = 0;
127+
128+
可以看出DelayedWorkQueue底层是采用数组构成的,关于[DelayedWorkQueue可以看这篇博主的文章](https://juejin.im/post/5aeebd02518825672f19c546),很详细。
129+
130+
关于DelayedWorkQueue我们可以得出这样的结论:**DelayedWorkQueue是基于堆的数据结构,按照时间顺序将每个任务进行排序,将待执行时间越近的任务放在在队列的队头位置,以便于最先进行执行**
131+
132+
# 4.ScheduledThreadPoolExecutor执行过程
133+
134+
现在我们对ScheduledThreadPoolExecutor的两个内部类ScheduledFutueTask和DelayedWorkQueue进行了了解,实际上这也是线程池工作流程中最重要的两个关键因素:**任务以及阻塞队列**。现在我们来看下ScheduledThreadPoolExecutor提交一个任务后,整体的执行过程。以ScheduledThreadPoolExecutor的schedule方法为例,具体源码为:
135+
136+
public ScheduledFuture<?> schedule(Runnable command,
137+
long delay,
138+
TimeUnit unit) {
139+
if (command == null || unit == null)
140+
throw new NullPointerException();
141+
//将提交的任务转换成ScheduledFutureTask
142+
RunnableScheduledFuture<?> t = decorateTask(command,
143+
new ScheduledFutureTask<Void>(command, null,
144+
triggerTime(delay, unit)));
145+
//延时执行任务ScheduledFutureTask
146+
delayedExecute(t);
147+
return t;
148+
}
149+
150+
方法很容易理解,为了满足ScheduledThreadPoolExecutor能够延时执行任务和能周期执行任务的特性,会先将实现Runnable接口的类转换成ScheduledFutureTask。然后会调用`delayedExecute`方法进行执行任务,这个方法也是关键方法,来看下源码:
151+
152+
153+
private void delayedExecute(RunnableScheduledFuture<?> task) {
154+
if (isShutdown())
155+
//如果当前线程池已经关闭,则拒绝任务
156+
reject(task);
157+
else {
158+
//将任务放入阻塞队列中
159+
super.getQueue().add(task);
160+
if (isShutdown() &&
161+
!canRunInCurrentRunState(task.isPeriodic()) &&
162+
remove(task))
163+
task.cancel(false);
164+
else
165+
//保证至少有一个线程启动,即使corePoolSize=0
166+
ensurePrestart();
167+
}
168+
}
169+
170+
`delayedExecute`方法的主要逻辑请看注释,可以看出该方法的重要逻辑会是在`ensurePrestart()`方法中,它的源码为:
171+
172+
void ensurePrestart() {
173+
int wc = workerCountOf(ctl.get());
174+
if (wc < corePoolSize)
175+
addWorker(null, true);
176+
else if (wc == 0)
177+
addWorker(null, false);
178+
}
179+
180+
可以看出该方法逻辑很简单,关键在于它所调用的`addWorker方法`,该方法主要功能:**新建`Worker类`,当执行任务时,就会调用被`Worker所重写的run方法`,进而会继续执行`runWorker`方法。在`runWorker`方法中会调用`getTask`方法从阻塞队列中不断的去获取任务进行执行,直到从阻塞队列中获取的任务为null的话,线程结束终止**。addWorker方法是ThreadPoolExecutor类中的方法,[对ThreadPoolExecutor的源码分析可以看这篇文章,很详细。](http://www.ideabuffer.cn/2017/04/04/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3Java%E7%BA%BF%E7%A8%8B%E6%B1%A0%EF%BC%9AThreadPoolExecutor/#addWorker%E6%96%B9%E6%B3%95)
181+
182+
# 5.总结 #
183+
184+
1. ScheduledThreadPoolExecutor继承了ThreadPoolExecutor类,因此,整体上功能一致,线程池主要负责创建线程(Worker类),线程从阻塞队列中不断获取新的异步任务,直到阻塞队列中已经没有了异步任务为止。但是相较于ThreadPoolExecutor来说,ScheduledThreadPoolExecutor具有延时执行任务和可周期性执行任务的特性,ScheduledThreadPoolExecutor重新设计了任务类`ScheduleFutureTask`,ScheduleFutureTask重写了`run`方法使其具有可延时执行和可周期性执行任务的特性。另外,阻塞队列`DelayedWorkQueue`是可根据优先级排序的队列,采用了堆的底层数据结构,使得与当前时间相比,待执行时间越靠近的任务放置队头,以便线程能够获取到任务进行执行;
185+
2. 线程池无论是ThreadPoolExecutor还是ScheduledThreadPoolExecutor,在设计时的三个关键要素是:任务,执行者以及任务结果。它们的设计思想也是完全将这三个关键要素进行了解耦。
186+
187+
**执行者**
188+
189+
任务的执行机制,完全交由`Worker类`,也就是进一步了封装了Thread。向线程池提交任务,无论为ThreadPoolExecutor的execute方法和submit方法,还是ScheduledThreadPoolExecutor的schedule方法,都是先将任务移入到阻塞队列中,然后通过addWork方法新建了Work类,并通过runWorker方法启动线程,并不断的从阻塞对列中获取异步任务执行交给Worker执行,直至阻塞队列中无法取到任务为止。
190+
191+
**任务**
192+
193+
在ThreadPoolExecutor和ScheduledThreadPoolExecutor中任务是指实现了Runnable接口和Callable接口的实现类。ThreadPoolExecutor中会将任务转换成`FutureTask`类,而在ScheduledThreadPoolExecutor中为了实现可延时执行任务和周期性执行任务的特性,任务会被转换成`ScheduledFutureTask`类,该类继承了FutureTask,并重写了run方法。
194+
195+
**任务结果**
196+
197+
在ThreadPoolExecutor中提交任务后,获取任务结果可以通过Future接口的类,在ThreadPoolExecutor中实际上为FutureTask类,而在ScheduledThreadPoolExecutor中则是`ScheduledFutureTask`类
198+
199+

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /