线程池顶级类及其接口
转载自《线程池系列三》-线程池顶级类及其接口源码分析 (opens new window)
在了解ThreadPoolExecutor之前,我们必须先弄清楚其父类及其父接口的定义,了解这部分知识后,我们也可以自定义自己的执行器实现。
# 集成关系
首先看一下ThreadPoolExecutor的集成关系,如下所示:
public interface Executor
public interface ExecutorService extends Executor
public abstract class AbstractExecutorService implements ExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService
2
3
4
# Executor接口
该接口表示可执行的接口,只提供了一个方法execute(Runnable) 需要一个Runnable对象的参数,代码如下:
void execute(Runnable command);
# ExecutorService接口
该接口继承了Executor接口,并添加了执行器(这里指的是实现ExecutorService的类,不一定是线程池)相关的操作,比如关闭任务、提交任务、等待执行器关闭、执行器状态的判断等操作。
- 提交任务操作
submit(),提供了三种提交任务的方式,不管是哪一种提交操作,都是将传入的参数转换为RunnableFuture对象
- callable对象 在线程池中就是封装为FutureTask
- runnable对象 调用RunnableFuture的get()方法时,返回值为null
- runnable对象 + 泛型返回值T 调用RunnableFuture的get()方法时,返回值为T的值
其源码如下:
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
2
3
- 执行器的关闭操作
- shutdown() 关闭操作
- shutdownNow() 关闭操作,与shutdown()的返回值不同,其他的区别在讲解源码时进行分析
- awaitTermination() 等待执行器关闭,等待最长时间为参数时间
其源码如下:
void shutdown();
List<Runnable> shutdownNow();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
2
3
- 判断执行器状态操作
- isShutdown() 判断状态是否为关闭装填
- isTerminated() 判断执行器是否已经完全终止
其源码如下:
boolean isShutdown();
boolean isTerminated();
2
- 批量执行任务操作
与submit操作不同,该类型的任务是需要等等待任务执行完才会返回
- invokeAll(Collection) 等待提交的所有的任务都执行完,或者可以指定最长等待时间
- invokeAny(Collection) 执行完一个任务就可以返回,也可以制定最长的等待时间
这两种方法的实现是在AbstractExecutorService类中实现的,两者的具体含义在讲解源码时进行分析,其方法定义源码如下:
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
2
3
4
5
6
7
8
# AbstractExecutorService类
该类重要实现了ExecutorService中的一些公用方法,例如提交操作、批量执行任务操作,除此之外,还提供了讲Callable对象和Runnable对象转化为RunnableFuture的操作。
- 任务转化操作, 方法为protected方法,只能子类可以调用
都是调用的FutureTask的构造方法,可以看出该类中返回的RunnableFuture类型都是FutureTask类型
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
2
3
4
5
6
- 提交任务操作submit
这里只看一个方法就可以,思想就是将Runnable+T返回值,或者Callable对象转化为FutureTask类型,然后调用execute(Runnable)方法(FutureTask继承了Runnable接口,也是Runnable对象),返回FutureTask对象。其源代码如下:
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
2
3
4
5
6
- 批量处理任务操作
(1) invokeAny方法
功能:提交多个任务,只有一个任务正常执行完成(异常不算正常完成)即可返回(但是任务的执行并不是一次性全部启动执行的)
阻塞方法invokeAny(Colletion)和非阻塞方法invokeAny(Colletion, timeout, unit)都调用doInvokeAny()方法
其执行逻辑如下:
- 先执行一个任务
- 检查任务有没有执行完成,如果执行完成,并且是正常执行完,则直接返回结果
- 如果任务没有执行完成,并且还有可执行的任务,那么再启动一个任务,再次检查有没有任务执行完,循环2,3步
- 如果没有可执行任务了,只能等待任务执行,调用take()阻塞方法
具体的流程参考源码,源码中有详细的注释,源码如下:
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
//检查tasks集合参数
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
//用于存放提交任务的返回结果值
List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
//定义ExecutorCompletionService,讲this作为其构造参数,因为要用该执行器执行任务
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
ExecutionException ee = null;
//判断超时时间
long lastTime = timed ? System.nanoTime() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();
// 提交第一个任务
futures.add(ecs.submit(it.next()));
//记录任务数
--ntasks;
//记录正在执行的任务数量
int active = 1;
for (;;) {
//注意:这里使用的是poll()非阻塞方法(没有执行完的任务时,返回null),而不是take()
Future<T> f = ecs.poll();
//说明没有执行完成的任务
if (f == null) {
//如果还存在可执行的任务,就添加任务执行
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
//所有的任务都不是正常结束的
else if (active == 0)
break;
//如果设定了超时,计算是否超时,如果已超时,抛出超时异常
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
//所有任务都提交了,等待有执行完成的任务
else
f = ecs.take();
}
// 存在已经执行完成的任务
if (f != null) {
--active;
try {
//返回执行任务结果
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
//执行到这说明,没有通过return返回结果,也就是出现了问题,抛出异常
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
//取消所有的任务
for (Future<T> f : futures)
f.cancel(true);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
从该方法中可以看出,任务并不是一次性启动的,然是先启动一个,判断有没有结束,如果没有结束那么再启动一个任务,如果已经结束则返回
再看invokeAny(Collection<? extends Callable<T>> tasks)
阻塞方法,其源码如下,只是简单的调用了doInvokeAny()方法,设置不超时:
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
2
3
4
5
6
7
8
9
那么invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
非阻塞方法,也是简单调用了doInvokeAny()方法而已,其源码如下:
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
2
3
4
5
(2) invokeAll方法
功能:执行完所有的任务才会返回
其执行逻辑:
- 执行所有的任务
- 等待所有任务执行完
详细的注释在源码中,源码如下:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
//参数检查
if (tasks == null)
throw new NullPointerException();
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
//一次性启动并执行所有的任务
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
//判断任务有没有执行完,如果没有执行完,则等待
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
//get()方法为阻塞方法,只有执行完成才会返回
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
//表明所有的任务都已经执行完成,但是不能保证是否正常执行完
done = true;
return futures;
} finally {
//如果是异常终止,那么取消所有的任务
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
该方法不同于invokeAny, 该方法是一次性启动执行所有的任务,而且不管任务是否执行成功,执行完成即可
invokeAll(Collection<? extends Callable<T>> task, long timeout, TimeUnit unit)
方法只是简单的条件了判断超时的逻辑,在调用get方法时,不在是调用阻塞get(),而是调用可超时的get(timeout)方法,其源码如下:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null || unit == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
//先将任务都放到list中,返回的list不管超时与否都是需要返回的
//所以先把任务都加到list中
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
long lastTime = System.nanoTime();
Iterator<Future<T>> it = futures.iterator();
//在启动执行过程中,也判断是否超时
while (it.hasNext()) {
execute((Runnable)(it.next()));
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0)
return futures;
}
for (Future<T> f : futures) {
if (!f.isDone()) {
if (nanos <= 0)
return futures;
try {
//每次都计算超时时间,调用非阻塞的get()方法
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
至此,ThreadPoolExecutor其父类和接口的源码已经分析完了,而线程池的源码分析才刚刚开始,还需要耐心学习下去。