Executor框架是指java5中引入的一系列并发库中与executor相关的功能类,包括Executor、Executors、ExecutorService、CompletionService、Future、Callable等。(图片引用自http://www.javaclubcn.com/a/jichuzhishi/2012/1116/170.html)
本篇博文分析Executor中几个比较重要的接口和类。
Executor
1 public interface Executor {
2 void execute(Runnable command);
3 }
Executor接口是Executor框架中最基础的部分,定义了一个用于执行Runnable的execute方法。它没有直接的实现类,有一个重要的子接口ExecutorService。
ExecutorService
1 //继承自Executor接口
2 public interface ExecutorService extends Executor {
3 /**
4 * 关闭方法,调用后执行之前提交的任务,不再接受新的任务
5 */
6 void shutdown();
7 /**
8 * 从语义上可以看出是立即停止的意思,将暂停所有等待处理的任务并返回这些任务的列表
9 */
10 List<Runnable> shutdownNow();
11 /**
12 * 判断执行器是否已经关闭
13 */
14 boolean isShutdown();
15 /**
16 * 关闭后所有任务是否都已完成
17 */
18 boolean isTerminated();
19 /**
20 * 中断
21 */
22 boolean awaitTermination(long timeout, TimeUnit unit)
23 throws InterruptedException;
24 /**
25 * 提交一个Callable任务
26 */
27 <T> Future<T> submit(Callable<T> task);
28 /**
29 * 提交一个Runable任务,result要返回的结果
30 */
31 <T> Future<T> submit(Runnable task, T result);
32 /**
33 * 提交一个任务
34 */
35 Future<?> submit(Runnable task);
36 /**
37 * 执行所有给定的任务,当所有任务完成,返回保持任务状态和结果的Future列表
38 */
39 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
40 throws InterruptedException;
41 /**
42 * 执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
43 */
44 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
45 long timeout, TimeUnit unit)
46 throws InterruptedException;
47 /**
48 * 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。
49 */
50 <T> T invokeAny(Collection<? extends Callable<T>> tasks)
51 throws InterruptedException, ExecutionException;
52 /**
53 * 执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果。
54 */
55 <T> T invokeAny(Collection<? extends Callable<T>> tasks,
56 long timeout, TimeUnit unit)
57 throws InterruptedException, ExecutionException, TimeoutException;
58 }
ExecutorService接口继承自Executor接口,定义了终止、提交任务、跟踪任务返回结果等方法。
ExecutorService涉及到Runnable、Callable、Future接口,这些接口的具体内容如下。
1 // 实现Runnable接口的类将被Thread执行,表示一个基本的任务
2 public interface Runnable {
3 // run方法就是它所有的内容,就是实际执行的任务
4 public abstract void run();
5 }
6 // Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容
7 public interface Callable<V> {
8 // 相对于run方法的带有返回值的call方法
9 V call() throws Exception;
10 }
Future
1 // Future代表异步任务的执行结果
2 public interface Future<V> {
3
4 /**
5 * 尝试取消一个任务,如果这个任务不能被取消(通常是因为已经执行完了),返回false,否则返回true。
6 */
7 boolean cancel(boolean mayInterruptIfRunning);
8
9 /**
10 * 返回代表的任务是否在完成之前被取消了
11 */
12 boolean isCancelled();
13
14 /**
15 * 如果任务已经完成,返回true
16 */
17 boolean isDone();
18
19 /**
20 * 获取异步任务的执行结果(如果任务没执行完将等待)
21 */
22 V get() throws InterruptedException, ExecutionException;
23
24 /**
25 * 获取异步任务的执行结果(有最常等待时间的限制)
26 *
27 * timeout表示等待的时间,unit是它时间单位
28 */
29 V get(long timeout, TimeUnit unit)
30 throws InterruptedException, ExecutionException, TimeoutException;
31 }
ExecutorService有一个子接口ScheduledExecutorService和一个抽象实现类AbstractExecutorService。
ScheduledExecutorService
1 // 可以安排指定时间或周期性的执行任务的ExecutorService
2 public interface ScheduledExecutorService extends ExecutorService {
3 /**
4 * 在指定延迟后执行一个任务,只执行一次
5 */
6 public ScheduledFuture<?> schedule(Runnable command,
7 long delay, TimeUnit unit);
8 /**
9 * 与上面的方法相同,只是接受的是Callable任务
10 */
11 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
12 long delay, TimeUnit unit);
13 /**
14 * 创建并执行一个周期性的任务,在initialDelay延迟后每间隔period个单位执行一次,时间单位都是unit
15 * 每次执行任务的时间点是initialDelay, initialDelay+period, initialDelay + 2 * period...
16 */
17 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
18 long initialDelay,
19 long period,
20 TimeUnit unit);
21 /**
22 * 创建并执行一个周期性的任务,在initialDelay延迟后开始执行,在执行结束后再延迟delay个单位开始执行下一次任务,时间单位都是unit
23 * 每次执行任务的时间点是initialDelay, initialDelay+(任务运行时间+delay), initialDelay + 2 * (任务运行时间+delay)...
24 */
25 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
26 long initialDelay,
27 long delay,
28 TimeUnit unit);
29 }
ScheduledExecutorService定义了四个方法,已经在上面给出基本的解释。ScheduledExecutorService有两个实现类,分别是DelegatedScheduledExecutorService和ScheduledThreadPoolExecutor,将在后面介绍。还需要解释的是ScheduledFuture。
ScheduledFuture继承自Future和Delayed接口,自身没有添加方法。Delayed接口定义了一个获取剩余延迟的方法。
AbstractExecutorService
1 // 提供ExecutorService的默认实现
2 public abstract class AbstractExecutorService implements ExecutorService {
3 /*
4 * 为指定的Runnable和value构造一个FutureTask,value表示默认被返回的Future
5 */
6 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
7 return new FutureTask<T>(runnable, value);
8 }
9 /*
10 * 为指定的Callable创建一个FutureTask
11 */
12 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
13 return new FutureTask<T>(callable);
14 }
15 /*
16 * 提交Runnable任务
17 */
18 public Future<?> submit(Runnable task) {
19 if (task == null) throw new NullPointerException();
20 // 通过newTaskFor方法构造RunnableFuture,默认的返回值是null
21 RunnableFuture<Object> ftask = newTaskFor(task, null);
22 // 调用具体实现的execute方法
23 execute(ftask);
24 return ftask;
25 }
26 /*
27 * 提交Runnable任务
28 */
29 public <T> Future<T> submit(Runnable task, T result) {
30 if (task == null) throw new NullPointerException();
31 // 通过newTaskFor方法构造RunnableFuture,默认的返回值是result
32 RunnableFuture<T> ftask = newTaskFor(task, result);
33 execute(ftask);
34 return ftask;
35 }
36 /*
37 * 提交Callable任务
38 */
39 public <T> Future<T> submit(Callable<T> task) {
40 if (task == null) throw new NullPointerException();
41 RunnableFuture<T> ftask = newTaskFor(task);
42 execute(ftask);
43 return ftask;
44 }
45
46 /*
47 * doInvokeAny的具体实现(核心内容),其它几个方法都是重载方法,都对这个方法进行调用
48 * tasks 是被执行的任务集,timed标志是否定时的,nanos表示定时的情况下执行任务的限制时间
49 */
50 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
51 boolean timed, long nanos)
52 throws InterruptedException, ExecutionException, TimeoutException {
53 // tasks空判断
54 if (tasks == null)
55 throw new NullPointerException();
56 // 任务数量
57 int ntasks = tasks.size();
58 if (ntasks == 0)
59 throw new IllegalArgumentException();
60 // 创建对应数量的Future返回集
61 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
62 ExecutorCompletionService<T> ecs =
63 new ExecutorCompletionService<T>(this);
64 try {
65 // 执行异常
66 ExecutionException ee = null;
67 // System.nanoTime()根据系统计时器当回当前的纳秒值
68 long lastTime = (timed)? System.nanoTime() : 0;
69 // 获取任务集的遍历器
70 Iterator<? extends Callable<T>> it = tasks.iterator();
71
72 // 向执行器ExecutorCompletionService提交一个任务,并将结果加入futures中
73 futures.add(ecs.submit(it.next
74 // 修改任务计数器
75 --ntasks;
76 // 活跃任务计数器
77 int active = 1;
78 for (;;) {
79 // 获取并移除代表已完成任务的Future,如果不存在,返回null
80 Future<T> f = ecs.poll();
81 if (f == null) {
82 // 没有任务完成,且任务集中还有未提交的任务
83 if (ntasks > 0) {
84 // 剩余任务计数器减1
85 --ntasks;
86 // 提交任务并添加结果
87 futures.add(ecs.submit(it.next()));
88 // 活跃任务计数器加1
89 ++active;
90 }
91 // 没有剩余任务,且没有活跃任务(所有任务可能都会取消),跳过这一次循环
92 else if (active == 0)
93 break;
94 else if (timed) {
95 // 获取并移除代表已完成任务的Future,如果不存在,会等待nanos指定的纳秒数
96 f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
97 if (f == null)
98 throw new TimeoutException();
99 // 计算剩余可用时间
100 long now = System.nanoTime();
101 nanos -= now - lastTime;
102 lastTime = now;
103 }
104 else
105 // 获取并移除表示下一个已完成任务的未来,等待,如果目前不存在。
106 // 执行到这一步说明已经没有任务任务可以提交,只能等待某一个任务的返回
107 f = ecs.take();
108 }
109 // f不为空说明有一个任务完成了
110 if (f != null) {
111 // 已完成一个任务,所以活跃任务计数减1
112 --active;
113 try {
114 // 返回该任务的结果
115 return f.get();
116 } catch (InterruptedException ie) {
117 throw ie;
118 } catch (ExecutionException eex) {
119 ee = eex;
120 } catch (RuntimeException rex) {
121 ee = new ExecutionException(rex);
122 }
123 }
124 }
125 // 如果没有成功返回结果则抛出异常
126 if (ee == null)
127 ee = new ExecutionException();
128 throw ee;
129
130 } finally {
131 // 无论执行中发生异常还是顺利结束,都将取消剩余未执行的任务
132 for (Future<T> f : futures)
133 f.cancel(true);
134 }
135 }
136
137 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
138 throws InterruptedException, ExecutionException {
139 try {
140 // 非定时任务的doInvokeAny调用
141 return doInvokeAny(tasks, false, 0);
142 } catch (TimeoutException cannotHappen) {
143 assert false;
144 return null;
145 }
146 }
147 // 定时任务的invokeAny调用,timeout表示超时时间,unit表示时间单位
148 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
149 long timeout, TimeUnit unit)
150 throws InterruptedException, ExecutionException, TimeoutException {
151 return doInvokeAny(tasks, true, unit.toNanos(timeout));
152 }
153 // 无超时设置的invokeAll方法
154 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
155 throws InterruptedException {
156 // 空任务判断
157 if (tasks == null)
158 throw new NullPointerException();
159 // 创建大小为任务数量的结果集
160 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
161 // 是否完成所有任务的标记
162 boolean done = false;
163 try {
164 // 遍历并执行任务
165 for (Callable<T> t : tasks) {
166 RunnableFuture<T> f = newTaskFor(t);
167 futures.add(f);
168 execute(f);
169 }
170 // 遍历结果集
171 for (Future<T> f : futures) {
172 // 如果某个任务没完成,通过f调用get()方法
173 if (!f.isDone()) {
174 try {
175 // get方法等待计算完成,然后获取结果(会等待)。所以调用get后任务就会完成计算,否则会等待
176 f.get();
177 } catch (CancellationException ignore) {
178 } catch (ExecutionException ignore) {
179 }
180 }
181 }
182 // 标志所有任务执行完成
183 done = true;
184 // 返回结果
185 return futures;
186 } finally {
187 // 假如没有完成所有任务(可能是发生异常等情况),将任务取消
188 if (!done)
189 for (Future<T> f : futures)
190 f.cancel(true);
191 }
192 }
193 // 超时设置的invokeAll方法
194 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
195 long timeout, TimeUnit unit)
196 throws InterruptedException {
197 // 需要执行的任务集为空或时间单位为空,抛出异常
198 if (tasks == null || unit == null)
199 throw new NullPointerException();
200 // 将超时时间转为纳秒单位
201 long nanos = unit.toNanos(timeout);
202 // 创建任务结果集
203 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
204 // 是否全部完成的标志
205 boolean done = false;
206 try {
207 // 遍历tasks,将任务转为RunnableFuture
208 for (Callable<T> t : tasks)
209 futures.add(newTaskFor(t));
210 // 记录当前时间(单位是纳秒)
211 long lastTime = System.nanoTime();
212 // 获取迭代器
213 Iterator<Future<T>> it = futures.iterator();
214 // 遍历
215 while (it.hasNext()) {
216 // 执行任务
217 execute((Runnable)(it.next()));
218 // 记录当前时间
219 long now = System.nanoTime();
220 // 计算剩余可用时间
221 nanos -= now - lastTime;
222 // 更新上一次执行时间
223 lastTime = now;
224 // 超时,返回保存任务状态的结果集
225 if (nanos <= 0)
226 return futures;
227 }
228
229 for (Future<T> f : futures) {
230 // 如果有任务没完成
231 if (!f.isDone()) {
232 // 时间已经用完,返回保存任务状态的结果集
233 if (nanos <= 0)
234 return futures;
235 try {
236 // 获取计算结果,最多等待给定的时间nanos,单位是纳秒
237 f.get(nanos, TimeUnit.NANOSECONDS);
238 } catch (CancellationException ignore) {
239 } catch (ExecutionException ignore) {
240 } catch (TimeoutException toe) {
241 return futures;
242 }
243 // 计算可用时间
244 long now = System.nanoTime();
245 nanos -= now - lastTime;
246 lastTime = now;
247 }
248 }
249 // 修改是否全部完成的标记
250 done = true;
251 // 返回结果集
252 return futures;
253 } finally {
254 // 假如没有完成所有任务(可能是时间已经用完、发生异常等情况),将任务取消
255 if (!done)
256 for (Future<T> f : futures)
257 f.cancel(true);
258 }
259 }
260 }
AbstractExecutor实现了ExecutorService接口的部分方法。具体代码的分析在上面已经给出。
AbstractExecutor有两个子类:DelegatedExecutorService、ThreadPoolExecutor。将在后面介绍。
下面是AbstractExecutor中涉及到的RunnableFuture、FutureTask、ExecutorCompletionService。
RunnableFuture继承自Future和Runnable,只有一个run()方法(Runnable中已经有一个run方法了,为什么RunnableFuture还要重新写一个run方法呢?求高手指教)。RunnableFuture接口看上去就像是Future和Runnable两个接口的组合。
FutureTask实现了RunnableFuture接口,除了实现了Future和Runnable中的方法外,它还有自己的方法和一个内部类Sync。
ExecutorCompletionService实现了CompletionService接口,将结果从复杂的一部分物种解耦出来。这些内容后续会介绍,不过这里先介绍框架中的其它内容,弄清整体框架。
下面看继承自AbstractExecutorService的ThreadPoolExecutor。
ThreadPoolExecutor
ThreadPoolExecutor(好长)
1 public class ThreadPoolExecutor extends AbstractExecutorService {
2 // 检查关闭的权限
3 private static final RuntimePermission shutdownPerm =
4 new RuntimePermission("modifyThread");
5 /* runState提供了主要的生命周期控制,可取值有以下几个:
6 * RUNNING:接受新的任务,处理队列中的任务
7 * SHUTDOWN:不再接受新的任务,但是处理队列中的任务
8 * STOP:不接受新任务,也不处理队列中的任务,打断正在处理的任务
9 * TERMINATED:和STOP类似,同时终止所有线程
10 * RUNNING -> SHUTDOWN
11 * On invocation of shutdown(), perhaps implicitly in finalize()
12 * (RUNNING or SHUTDOWN) -> STOP
13 * On invocation of shutdownNow()
14 * SHUTDOWN -> TERMINATED
15 * When both queue and pool are empty
16 * STOP -> TERMINATED
17 * When pool is empty
18 *
19 */
20 volatile int runState;
21 static final int RUNNING = 0;
22 static final int SHUTDOWN = 1;
23 static final int STOP = 2;
24 static final int TERMINATED = 3;
25
26 // 用于保持任务的队列
27 private final BlockingQueue<Runnable> workQueue;
28 // poolSize, corePoolSize, maximumPoolSize, runState, workers set的更新锁
29 private final ReentrantLock mainLock = new ReentrantLock();
30 // mainLock锁的一个Condition实例
31 private final Condition termination = mainLock.newCondition();
32 // 保持线程池中所有的工作线程。只有获取mainLock锁后才能访问。
33 private final HashSet<Worker> workers = new HashSet<Worker>();
34 // 空闲线程的等待时间,大为是纳秒
35 private volatile long keepAliveTime;
36 // 是否允许核心线程“活着” false(默认值)允许,哪怕空闲;true则使用keepAliveTime来控制等待超时时间
37 private volatile boolean allowCoreThreadTimeOut;
38 // 核心线程池的大小
39 private volatile int corePoolSize;
40 // pool size最大值
41 private volatile int maximumPoolSize;
42 // 当前pool大小
43 private volatile int poolSize;
44 // 拒绝执行的处理器 顾名思义,当一个任务被拒绝执行后将同个这个处理器进行处理
45 private volatile RejectedExecutionHandler handler;
46 // 线程工厂,用于创建线程
47 private volatile ThreadFactory threadFactory;
48 // 最终pool size达到的最大值
49 private int largestPoolSize;
50 // 已完成任务计数
51 private long completedTaskCount;
52 // 默认的拒绝执行的处理器
53 private static final RejectedExecutionHandler defaultHandler =
54 new AbortPolicy();
55 /**
56 * 关于借个size的说明:
57 * 线程池数量poolSize指工作线程Worker对象的集合workers的实际大小,通过workers.size()可直接获得。
58 * 核心线程池数量corePoolSize,可理解为工作线程Worker对象的集合workers的目标大小。
59 * 如果poolSize > corePoolSize,那么ThreadPoolExecutor就会有机制在适当的时候回收闲置的线程。
60 * 最大线程池数量maxPoolSize,就是工作线程Worker对象的集合workers的大小上限。
61 * 假如说任务队列满了,再来新任务时,若poolSize还没达到maxPoolSize,则继续创建新的线程来执行新任务,
62 * 若不幸poolSize达到了上限maxPoolSize,那不能再创建新的线程了,只能采取reject策略来拒绝新任务。
63 */
64 /** 构造方法 开始*/
65 public ThreadPoolExecutor(int corePoolSize,
66 int maximumPoolSize,
67 long keepAliveTime,
68 TimeUnit unit,
69 BlockingQueue<Runnable> workQueue) {
70 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
71 Executors.defaultThreadFactory(), defaultHandler);
72 }
73 public ThreadPoolExecutor(int corePoolSize,
74 int maximumPoolSize,
75 long keepAliveTime,
76 TimeUnit unit,
77 BlockingQueue<Runnable> workQueue,
78 ThreadFactory threadFactory) {
79 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
80 threadFactory, defaultHandler);
81 }
82 public ThreadPoolExecutor(int corePoolSize,
83 int maximumPoolSize,
84 long keepAliveTime,
85 TimeUnit unit,
86 BlockingQueue<Runnable> workQueue,
87 RejectedExecutionHandler handler) {
88 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
89 Executors.defaultThreadFactory(), handler);
90 }
91 // 主要的构造方法,其它构造方法都是对这个方法的调用
92 public ThreadPoolExecutor(int corePoolSize,
93 int maximumPoolSize,
94 long keepAliveTime,
95 TimeUnit unit,
96 BlockingQueue<Runnable> workQueue,
97 ThreadFactory threadFactory,
98 RejectedExecutionHandler handler) {
99 // 非法输入(明显这些值都是不能小于0的)
100 if (corePoolSize < 0 ||
101 maximumPoolSize <= 0 ||
102 maximumPoolSize < corePoolSize ||
103 keepAliveTime < 0)
104 throw new IllegalArgumentException();
105 // 空判断
106 if (workQueue == null || threadFactory == null || handler == null)
107 throw new NullPointerException();
108 this.corePoolSize = corePoolSize;
109 this.maximumPoolSize = maximumPoolSize;
110 this.workQueue = workQueue;
111 this.keepAliveTime = unit.toNanos(keepAliveTime);
112 this.threadFactory = threadFactory;
113 this.handler = handler;
114 }
115 /** 构造方法 结束*/
116
117
118 // 执行Runnable任务
119 public void execute(Runnable command) {
120 if (command == null)
121 throw new NullPointerException();
122 /*如果当前线程数量poolSize>=核心线程数量corePoolSize,
123 那当然无法再把当前任务加入到核心线程池中执行了,于是进花括号选择其他的策略执行;
124 如果poolSize没有达到corePoolSize,那很自然是把当前任务放到核心线程池执行,
125 也就是执行逻辑或运算符后的方法addIfUnderCorePoolSize(command)。
126 “放到核心线程池执行”是什么意思呢?
127 就是new 一个新工作线程放到workers集合中,让这个新线程来执行当前的任务command,而这个新线程可以认为是核心线程池中的其中一个线程。*/
128 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
129 // 线程池状态时RUNNING且能将任务添加到worker队列中
130 if (runState == RUNNING && workQueue.offer(command)) {
131 // 加入了队列以后,只要保证有工作线程就ok了,工作线程会自动去执行任务队列的。
132 // 所以判断一下if ( runState != RUNNING || poolSize == 0),
133 // 在这个if为true时候,去保证一下任务队列有线程会执行,即执行ensureQueuedTaskHandled(command)方法。
134 // 这里有两种情况,情况一:runState != RUNNING,这种情况在ensureQueuedTaskHandled方法中会把任务丢给reject拒绝策略处理,
135 // 情况二:poolSize == 0,这种情况是new一个新线程加入到工作线程集合workers中。
136 if (runState != RUNNING || poolSize == 0)
137 ensureQueuedTaskHandled(command);
138 }
139 // 程序执行到这个分支,说明当前状态runState != RUNNING,或者任务队列workQueue已经满了。
140 // 先看第一个条件下,前面解释过runState,除了RUNNING状态,其他三个状态都不能接收新任务,
141 // 所以当runState != RUNNING时新任务只能根据reject策略拒绝,
142 // 而这个拒绝的逻辑是在addIfUnderMaximumPoolSize方法中实现的;
143 // 再看第二个条件下,workQueue已经满,潜在的条件是runState == RUNNING,这种情况怎么处理新任务呢?
144 // 很简单,若当前线程数量poolSize没有达到最大线程数量maxPoolSize,
145 // 则创建新的线程去执行这个无法加入任务队列的新任务,
146 // 否则就根据reject策略拒绝
147 else if (!addIfUnderMaximumPoolSize(command))
148 reject(command); // is shutdown or saturated
149 }
150 }
151
152 private Thread addThread(Runnable firstTask) {
153 Worker w = new Worker(firstTask);
154 // 创建一个新Thread t
155 Thread t = threadFactory.newThread(w);
156 if (t != null) {
157 w.thread = t;
158 workers.add(w);
159 int nt = ++poolSize;
160 // 跟踪线程池大小的最大值
161 if (nt > largestPoolSize)
162 largestPoolSize = nt;
163 }
164 return t;
165 }
166
167 // 创建并启动新线程执行firstTask(在运行线程数小于核心线程池大小的情况且状态是RUNNING)
168 private boolean addIfUnderCorePoolSize(Runnable firstTask) {
169 Thread t = null;
170 final ReentrantLock mainLock = this.mainLock;
171 // 获取锁
172 mainLock.lock();
173 try {
174 if (poolSize < corePoolSize && runState == RUNNING)
175 // 创建一个新线程
176 t = addThread(firstTask);
177 } finally {
178 // 释放锁
179 mainLock.unlock();
180 }
181 if (t == null)
182 return false;
183 // 启动线程执行任务
184 t.start();
185 return true;
186 }
187
188 // 创建并启动新线程执行firstTask(在运行线程数小于pool size的最大值的情况且状态是RUNNING)
189 private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
190 Thread t = null;
191 final ReentrantLock mainLock = this.mainLock;
192 mainLock.lock();
193 try {
194 if (poolSize < maximumPoolSize && runState == RUNNING)
195 t = addThread(firstTask);
196 } finally {
197 mainLock.unlock();
198 }
199 if (t == null)
200 return false;
201 t.start();
202 return true;
203 }
204
205 // 确保任务被处理
206 private void ensureQueuedTaskHandled(Runnable command) {
207 final ReentrantLock mainLock = this.mainLock;
208 mainLock.lock();
209 // 拒绝标记
210 boolean reject = false;
211 Thread t = null;
212 try {
213 int state = runState;
214 // 如果状态不是RUNNING,能成功从worker队列中移除,则拒绝这个任务执行
215 if (state != RUNNING && workQueue.remove(command))
216 reject = true;
217 else if (state < STOP &&
218 poolSize < Math.max(corePoolSize, 1) &&
219 !workQueue.isEmpty())
220 t = addThread(null);
221 } finally {
222 mainLock.unlock();
223 }
224 if (reject)
225 reject(command);
226 else if (t != null)
227 // 不用拒绝任务则启动线程执行任务
228 t.start();
229 }
230
231 // 调用RejectedExecutionHandler决绝任务
232 void reject(Runnable command) {
233 handler.rejectedExecution(command, this);
234 }
235 // 工作线程,实现了Runnable接口
236 private final class Worker implements Runnable {
237 // 每个任务执行都必须获取和释放runLock。这主要是防止中断的目的是取消工作线程,而不是中断正在运行的任务。
238 private final ReentrantLock runLock = new ReentrantLock();
239 // 要执行的任务
240 private Runnable firstTask;
241 // 每个线程完成任务的计数器,最后会统计到completedTaskCount中
242 volatile long completedTasks;
243 // 用于执行Runnable的线程
244 Thread thread;
245 // 构造方法
246 Worker(Runnable firstTask) {
247 this.firstTask = firstTask;
248 }
249 // 判断这个线程是否活动
250 boolean isActive() {
251 return runLock.isLocked();
252 }
253 // 中断闲置线程
254 void interruptIfIdle() {
255 final ReentrantLock runLock = this.runLock;
256 if (runLock.tryLock()) {
257 try {
258 if (thread != Thread.currentThread())
259 thread.interrupt();
260 } finally {
261 runLock.unlock();
262 }
263 }
264 }
265 // 中断
266 void interruptNow() {
267 thread.interrupt();
268 }
269
270
271 private void runTask(Runnable task) {
272 final ReentrantLock runLock = this.runLock;
273 runLock.lock();
274 try {
275
276 if (runState < STOP &&
277 Thread.interrupted() &&
278 runState >= STOP)
279 thread.interrupt();
280
281 boolean ran = false;
282 beforeExecute(thread, task);
283 try {
284 task.run();
285 ran = true;
286 afterExecute(task, null);
287 ++completedTasks;
288 } catch (RuntimeException ex) {
289 if (!ran)
290 afterExecute(task, ex);
291 throw ex;
292 }
293 } finally {
294 runLock.unlock();
295 }
296 }
297
298
299 public void run() {
300 try {
301 Runnable task = firstTask;
302 firstTask = null;
303 /**
304 * 注意这段while循环的执行逻辑,每执行完一个核心线程后,就会去线程池
305 * 队列中取下一个核心线程,如取出的核心线程为null,则当前工作线程终止
306 */
307 while (task != null || (task = getTask()) != null) {
308 //你所提交的核心线程(任务)的运行逻辑
309 runTask(task);
310 task = null;
311 }
312 } finally {
313 // 当前工作线程退出
314 workerDone(this);
315 }
316 }
317 }
318
319 // 从池队列中取的核心线程(任务)的方法
320 Runnable getTask() {
321 for (;;) {
322 try {
323 // 获取运行状态
324 int state = runState;
325 // 大于SHUTDOWN,即STOP和TERMINATED状态,没有任务
326 if (state > SHUTDOWN)
327 return null;
328 Runnable r;
329 // SHUTDOWN状态
330 if (state == SHUTDOWN) // 帮助清空队列
331 r = workQueue.poll();
332 // 状态时RUNNING,且poolSize > corePoolSize或allowCoreThreadTimeOut
333 else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
334 // 获取并移除元素(等待指定的时间)
335 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
336 else
337 // 获取并移除元素(会一直等待知道获取到有效元素)
338 r = workQueue.take();
339 // 获取结果不为空,返回
340 if (r != null)
341 return r;
342 // 检查一个获取任务失败的线程能否退出
343 if (workerCanExit()) {
344 if (runState >= SHUTDOWN) // 中断其他线程
345 interruptIdleWorkers();
346 return null;
347 }
348 // Else retry
349 } catch (InterruptedException ie) {
350 // On interruption, re-check runState
351 }
352 }
353 }
354
355 // 检查一个获取任务失败的线程能否退出
356 private boolean workerCanExit() {
357 final ReentrantLock mainLock = this.mainLock;
358 mainLock.lock();
359 boolean canExit;
360 try {
361 // 可以退出的条件是状态为STOP或TERMINATED或至少有一个处理非空队列的线程(在允许超时的情况下)
362 canExit = runState >= STOP ||
363 workQueue.isEmpty() ||
364 (allowCoreThreadTimeOut &&
365 poolSize > Math.max(1, corePoolSize));
366 } finally {
367 mainLock.unlock();
368 }
369 return canExit;
370 }
371
372 // 中断其他线程
373 void interruptIdleWorkers() {
374 final ReentrantLock mainLock = this.mainLock;
375 mainLock.lock();
376 try {
377 // 遍历工作线程
378 for (Worker w : workers)
379 // 尝试中断闲置线程
380 w.interruptIfIdle();
381 } finally {
382 mainLock.unlock();
383 }
384 }
385 // 工作线程退出要处理的逻辑
386 void workerDone(Worker w) {
387 final ReentrantLock mainLock = this.mainLock;
388 mainLock.lock();
389 try {
390 completedTaskCount += w.completedTasks;
391 workers.remove(w);//从工作线程缓存中删除
392 if (--poolSize == 0)//poolSize减一,这时其实又可以创建工作线程了
393 tryTerminate();//尝试终止
394 } finally {
395 mainLock.unlock();
396 }
397 }
398
399 // 尝试终止
400 private void tryTerminate() {
401 //终止的前提条件就是线程池里已经没有工作线程(Worker)了
402 if (poolSize == 0) {
403 int state = runState;
404 /**
405 * 如果当前已经没有了工作线程(Worker),但是线程队列里还有等待的线程任务,则创建一个
406 * 工作线程来执行线程队列中等待的任务
407 */
408 if (state < STOP && !workQueue.isEmpty()) {
409 state = RUNNING; // disable termination check below
410 Thread t = addThread(null);
411 if (t != null)
412 t.start();
413 }
414 // 设置池状态为终止状态
415 if (state == STOP || state == SHUTDOWN) {
416 runState = TERMINATED;
417 termination.signalAll();
418 terminated();
419 }
420 }
421 }
422 // 发起一个有序的关闭在以前已提交任务的执行,但不接受新任务。如果已经关闭,调用不会有其他影响。
423 public void shutdown() {
424 // Gets the system security interface.
425 SecurityManager security = System.getSecurityManager();
426 if (security != null)
427 // 检查权限(以抛出异常的形式)
428 security.checkPermission(shutdownPerm);
429 final ReentrantLock mainLock = this.mainLock;
430 mainLock.lock();
431 try {
432 if (security != null) { // 检查调用者是否能修改线程
433 for (Worker w : workers)
434 security.checkAccess(w.thread);
435 }
436 // 获取运行状态
437 int state = runState;
438 // 小于SHUTDOWN的不就是RUNNING么。。。
439 if (state < SHUTDOWN)
440 runState = SHUTDOWN;
441
442 try {
443 for (Worker w : workers) {
444 // 中断线程
445 w.interruptIfIdle();
446 }
447 } catch (SecurityException se) { // Try to back out
448 runState = state;
449 // tryTerminate() here would be a no-op
450 throw se;
451 }
452 // 尝试终止
453 tryTerminate(); // Terminate now if pool and queue empty
454 } finally {
455 mainLock.unlock();
456 }
457 }
458
459
460 public List<Runnable> shutdownNow() {
461 SecurityManager security = System.getSecurityManager();
462 if (security != null)
463 security.checkPermission(shutdownPerm);
464
465 final ReentrantLock mainLock = this.mainLock;
466 mainLock.lock();
467 try {
468 if (security != null) { // Check if caller can modify our threads
469 for (Worker w : workers)
470 security.checkAccess(w.thread);
471 }
472
473 int state = runState;
474 // 与上一个方法主要区别在于状态和interruptNow方法
475 if (state < STOP)
476 runState = STOP;
477
478 try {
479 for (Worker w : workers) {
480 w.interruptNow();
481 }
482 } catch (SecurityException se) { // Try to back out
483 runState = state;
484 // tryTerminate() here would be a no-op
485 throw se;
486 }
487
488 List<Runnable> tasks = drainQueue();
489 tryTerminate(); // Terminate now if pool and queue empty
490 return tasks;
491 } finally {
492 mainLock.unlock();
493 }
494 }
495
496 // 清空队列
497 private List<Runnable> drainQueue() {
498 List<Runnable> taskList = new ArrayList<Runnable>();
499 // 将队列中的所有元素一到taskList中
500 workQueue.drainTo(taskList);
501 while (!workQueue.isEmpty()) {
502 Iterator<Runnable> it = workQueue.iterator();
503 try {
504 if (it.hasNext()) {
505 Runnable r = it.next();
506 // 从workQueue中移除,并添加到taskList中
507 if (workQueue.remove(r))
508 taskList.add(r);
509 }
510 } catch (ConcurrentModificationException ignore) {
511 }
512 }
513 return taskList;
514 }
515
516 public boolean isShutdown() {
517 return runState != RUNNING;
518 }
519
520
521 boolean isStopped() {
522 return runState == STOP;
523 }
524
525
526 public boolean isTerminating() {
527 int state = runState;
528 return state == SHUTDOWN || state == STOP;
529 }
530
531 public boolean isTerminated() {
532 return runState == TERMINATED;
533 }
534
535 public boolean awaitTermination(long timeout, TimeUnit unit)
536 throws InterruptedException {
537 long nanos = unit.toNanos(timeout);
538 final ReentrantLock mainLock = this.mainLock;
539 mainLock.lock();
540 try {
541 for (;;) {
542 if (runState == TERMINATED)
543 return true;
544 if (nanos <= 0)
545 return false;
546 nanos = termination.awaitNanos(nanos);
547 }
548 } finally {
549 mainLock.unlock();
550 }
551 }
552
553
554 protected void finalize() {
555 shutdown();
556 }
557
558
559 public void setThreadFactory(ThreadFactory threadFactory) {
560 if (threadFactory == null)
561 throw new NullPointerException();
562 this.threadFactory = threadFactory;
563 }
564
565
566 public ThreadFactory getThreadFactory() {
567 return threadFactory;
568 }
569
570
571 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
572 if (handler == null)
573 throw new NullPointerException();
574 this.handler = handler;
575 }
576
577
578 public RejectedExecutionHandler getRejectedExecutionHandler() {
579 return handler;
580 }
581
582 // 设置核心线程数 这里的设置将覆盖构造方法中的设置
583 // 如果小于构造方法的设置,多余的线程将被闲置
584 // 如果大于构造方法的设置,新线程将被用于执行排队的任务
585 public void setCorePoolSize(int corePoolSize) {
586 if (corePoolSize < 0)
587 throw new IllegalArgumentException();
588 final ReentrantLock mainLock = this.mainLock;
589 mainLock.lock();
590 try {
591 int extra = this.corePoolSize - corePoolSize;
592 this.corePoolSize = corePoolSize;
593 // 大于构造方法的设置
594 if (extra < 0) {
595 int n = workQueue.size();
596 while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) {
597 Thread t = addThread(null);
598 if (t != null)
599 t.start();
600 else
601 break;
602 }
603 }
604 // 小于构造方法的设置
605 else if (extra > 0 && poolSize > corePoolSize) {
606 try {
607 Iterator<Worker> it = workers.iterator();
608 while (it.hasNext() &&
609 extra-- > 0 &&
610 poolSize > corePoolSize &&
611 workQueue.remainingCapacity() == 0)
612 it.next().interruptIfIdle();
613 } catch (SecurityException ignore) {
614 // Not an error; it is OK if the threads stay live
615 }
616 }
617 } finally {
618 mainLock.unlock();
619 }
620 }
621
622
623 public int getCorePoolSize() {
624 return corePoolSize;
625 }
626
627
628 public boolean prestartCoreThread() {
629 return addIfUnderCorePoolSize(null);
630 }
631
632
633 public int prestartAllCoreThreads() {
634 int n = 0;
635 while (addIfUnderCorePoolSize(null))
636 ++n;
637 return n;
638 }
639
640
641 public boolean allowsCoreThreadTimeOut() {
642 return allowCoreThreadTimeOut;
643 }
644
645
646 public void allowCoreThreadTimeOut(boolean value) {
647 if (value && keepAliveTime <= 0)
648 throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
649
650 allowCoreThreadTimeOut = value;
651 }
652
653 // 设置所允许的最大的线程数。这将覆盖在构造函数中设置的任何值。如果新值小于当前值,多余的现有线程将被终止时,他们成为闲置。
654 public void setMaximumPoolSize(int maximumPoolSize) {
655 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
656 throw new IllegalArgumentException();
657 final ReentrantLock mainLock = this.mainLock;
658 mainLock.lock();
659 try {
660 int extra = this.maximumPoolSize - maximumPoolSize;
661 this.maximumPoolSize = maximumPoolSize;
662 if (extra > 0 && poolSize > maximumPoolSize) {
663 try {
664 Iterator<Worker> it = workers.iterator();
665 while (it.hasNext() &&
666 extra > 0 &&
667 poolSize > maximumPoolSize) {
668 it.next().interruptIfIdle();
669 --extra;
670 }
671 } catch (SecurityException ignore) {
672 // Not an error; it is OK if the threads stay live
673 }
674 }
675 } finally {
676 mainLock.unlock();
677 }
678 }
679
680
681 public int getMaximumPoolSize() {
682 return maximumPoolSize;
683 }
684
685
686 public void setKeepAliveTime(long time, TimeUnit unit) {
687 if (time < 0)
688 throw new IllegalArgumentException();
689 if (time == 0 && allowsCoreThreadTimeOut())
690 throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
691 this.keepAliveTime = unit.toNanos(time);
692 }
693
694
695 public long getKeepAliveTime(TimeUnit unit) {
696 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
697 }
698
699
700 public BlockingQueue<Runnable> getQueue() {
701 return workQueue;
702 }
703
704
705 public boolean remove(Runnable task) {
706 return getQueue().remove(task);
707 }
708
709 // 移除所有被取消的任务
710 public void purge() {
711 // Fail if we encounter interference during traversal
712 try {
713 Iterator<Runnable> it = getQueue().iterator();
714 while (it.hasNext()) {
715 Runnable r = it.next();
716 if (r instanceof Future<?>) {
717 Future<?> c = (Future<?>)r;
718 if (c.isCancelled())
719 it.remove();
720 }
721 }
722 }
723 catch (ConcurrentModificationException ex) {
724 return;
725 }
726 }
727
728
729 public int getPoolSize() {
730 return poolSize;
731 }
732
733 // 获取活跃线程数
734 public int getActiveCount() {
735 final ReentrantLock mainLock = this.mainLock;
736 mainLock.lock();
737 try {
738 int n = 0;
739 for (Worker w : workers) {
740 if (w.isActive())
741 ++n;
742 }
743 return n;
744 } finally {
745 mainLock.unlock();
746 }
747 }
748
749
750 public int getLargestPoolSize() {
751 final ReentrantLock mainLock = this.mainLock;
752 mainLock.lock();
753 try {
754 return largestPoolSize;
755 } finally {
756 mainLock.unlock();
757 }
758 }
759
760
761 public long getTaskCount() {
762 final ReentrantLock mainLock = this.mainLock;
763 mainLock.lock();
764 try {
765 long n = completedTaskCount;
766 for (Worker w : workers) {
767 // 统计已经完成的任务
768 n += w.completedTasks;
769 // 如果w是活跃线程,说明正在执行一个任务,所以n加一
770 if (w.isActive())
771 ++n;
772 }
773 // 加上队列中的任务
774 return n + workQueue.size();
775 } finally {
776 mainLock.unlock();
777 }
778 }
779
780 // 获取已完成的任务数
781 public long getCompletedTaskCount() {
782 final ReentrantLock mainLock = this.mainLock;
783 mainLock.lock();
784 try {
785 long n = completedTaskCount;
786 for (Worker w : workers)
787 n += w.completedTasks;
788 return n;
789 } finally {
790 mainLock.unlock();
791 }
792 }
793
794
795 protected void beforeExecute(Thread t, Runnable r) { }
796
797
798 protected void afterExecute(Runnable r, Throwable t) { }
799
800
801 protected void terminated() { }
802
803 // 实现了RejectedExecutionHandler,即是一个拒绝执行的Handler
804 public static class CallerRunsPolicy implements RejectedExecutionHandler {
805
806 public CallerRunsPolicy() { }
807
808
809 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
810 if (!e.isShutdown()) {
811 r.run();
812 }
813 }
814 }
815
816
817 public static class AbortPolicy implements RejectedExecutionHandler {
818
819 public AbortPolicy() { }
820
821
822 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
823 throw new RejectedExecutionException();
824 }
825 }
826
827
828 public static class DiscardPolicy implements RejectedExecutionHandler {
829
830 public DiscardPolicy() { }
831
832
833 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
834 }
835 }
836
837
838 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
839
840 public DiscardOldestPolicy() { }
841
842
843 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
844 if (!e.isShutdown()) {
845 e.getQueue().poll();
846 e.execute(r);
847 }
848 }
849 }
850 }
可以参考http://xtu-xiaoxin.iteye.com/blog/647744
从上面的框架结构图中可以可以看出剩下的就是ScheduledThreadPoolExecutor和Executors。Executors是一个工具类,提供一些工厂和实用方法。
下面看ScheduledThreadPoolExecutor,它继承自ThreadPoolExecutor并实现了ScheduledExecutorService接口。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor
// 可以安排指定时间或周期性的执行任务的ExecutorService
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
// 在Shutdown的时候如果要取消或关闭任务,设置为false;true表示继续执行任务,在Shutdown之后
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
// false表示在Shutdown的时候取消Delayed的任务;true表示执行这个任务
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
// 打破调度联系,进而保证先进先出的顺序捆绑项目中的序列号
private static final AtomicLong sequencer = new AtomicLong(0);
// 基准时间
private static final long NANO_ORIGIN = System.nanoTime();
// 当前时间(相对于基准时间的值)
final long now() {
return System.nanoTime() - NANO_ORIGIN;
}
// RunnableScheduledFuture接口表示是否是周期性的
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
private final long sequenceNumber;
// 预定安排执行的时刻
private long time;
// 表示重复任务,0表示不重复,正数表示固定比率,负数表示固定延时
private final long period;
// 构造方法,构造一个只执行一次的任务
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 构造方法,构造一个按指定ns开始执行,指定period周期性执行
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 构造方法,构造一个延时执行的任务
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 按指定单位获取延时时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
// 判断传入延时和这个任务延时之间的大小关系
public int compareTo(Delayed other) {
// 为什么可以和Delayed比较?因为这个类实现了RunnableScheduledFuture接口,而RunnableScheduledFuture接口继承自Delayed接口
if (other == this) // compare zero ONLY if same object
return 0;
// other是ScheduledFutureTask实例
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
// 比较大小
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0)? 0 : ((d < 0)? -1 : 1);
}
// 是否周期性的(包括延时的情况)
public boolean isPeriodic() {
return period != 0;
}
// 执行周期性的任务
private void runPeriodic() {
// 执行任务
boolean ok = ScheduledFutureTask.super.runAndReset();
// 判断是否已经shutdown
boolean down = isShutdown();
// 重新安排任务(如果没有shutdown或在没有关闭且允许在shutdown之后执行已存在的任务)
if (ok && (!down ||
(getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
!isStopped()))) {
long p = period;
if (p > 0)
// 计算下一次执行的时间
time += p;
else
// 计算触发时间
time = triggerTime(-p);
// 将任务添加到队列中
ScheduledThreadPoolExecutor.super.getQueue().add(this);
}
else if (down)
interruptIdleWorkers();
}
// 执行任务,根据是否周期性调用不同的方法
public void run() {
if (isPeriodic())
runPeriodic();
else
ScheduledFutureTask.super.run();
}
}
// 延迟执行
private void delayedExecute(Runnable command) {
// 如果已经shutdown,决绝任务
if (isShutdown()) {
reject(command);
return;
}
if (getPoolSize() < getCorePoolSize())
// 预启动线程
prestartCoreThread();
super.getQueue().add(command);
}
// 取消和清除关闭政策不允许运行的任务
private void cancelUnwantedTasks() {
// 获取shutdown策略
boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
if (!keepDelayed && !keepPeriodic)
super.getQueue().clear();
else if (keepDelayed || keepPeriodic) {
Object[] entries = super.getQueue().toArray();
for (int i = 0; i < entries.length; ++i) {
Object e = entries[i];
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
// 根据是否周期性的任务通过制定的值判断进行取消操作
if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
t.cancel(false);
}
}
entries = null;
// 净化,移除已经取消的任务
purge();
}
}
public boolean remove(Runnable task) {
if (!(task instanceof RunnableScheduledFuture))
return false;
return getQueue().remove(task);
}
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), handler);
}
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
// 避免移除,返回延迟的值
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(TimeUnit.NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
// 根据执行的延时时间执行任务
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// ScheduledFutureTask的result为null
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
// 延时执行
delayedExecute(t);
return t;
}
// 上一个方法的重载形式,接收的是Callable
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
/**
* 创建并执行一个周期性的任务,在initialDelay延迟后每间隔period个单位执行一次,时间单位都是unit
* 每次执行任务的时间点是initialDelay, initialDelay+period, initialDelay + 2 * period...
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Object>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period)));
delayedExecute(t);
return t;
}
/**
* 创建并执行一个周期性的任务,在initialDelay延迟后开始执行,在执行结束后再延迟delay个单位开始执行下一次任务,时间单位都是unit
* 每次执行任务的时间点是initialDelay, initialDelay+(任务运行时间+delay), initialDelay + 2 * (任务运行时间+delay)...
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Boolean>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay)));
delayedExecute(t);
return t;
}
// 执行任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 立即执行,延时时间是0
schedule(command, 0, TimeUnit.NANOSECONDS);
}
// 重新 AbstractExecutorService 的方法
public Future<?> submit(Runnable task) {
return schedule(task, 0, TimeUnit.NANOSECONDS);
}
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result),
0, TimeUnit.NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, TimeUnit.NANOSECONDS);
}
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
continueExistingPeriodicTasksAfterShutdown = value;
if (!value && isShutdown())
cancelUnwantedTasks();
}
public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
return continueExistingPeriodicTasksAfterShutdown;
}
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
executeExistingDelayedTasksAfterShutdown = value;
if (!value && isShutdown())
cancelUnwantedTasks();
}
public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
return executeExistingDelayedTasksAfterShutdown;
}
// 关闭
public void shutdown() {
// 取消任务
cancelUnwantedTasks();
super.shutdown();
}
// 立即关闭,调用的是父类立即关闭的方法
public List<Runnable> shutdownNow() {
return super.shutdownNow();
}
// 返回使用这个执行器的任务队列
public BlockingQueue<Runnable> getQueue() {
return super.getQueue();
}
// 将DelayQueue<RunnableScheduledFuture> 包装为 BlockingQueue<Runnable>的类
// 类似于代理
private static class DelayedWorkQueue
extends AbstractCollection<Runnable>
implements BlockingQueue<Runnable> {
private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
public Runnable poll() { return dq.poll(); }
public Runnable peek() { return dq.peek(); }
public Runnable take() throws InterruptedException { return dq.take(); }
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
return dq.poll(timeout, unit);
}
public boolean add(Runnable x) {
return dq.add((RunnableScheduledFuture)x);
}
public boolean offer(Runnable x) {
return dq.offer((RunnableScheduledFuture)x);
}
public void put(Runnable x) {
dq.put((RunnableScheduledFuture)x);
}
public boolean offer(Runnable x, long timeout, TimeUnit unit) {
return dq.offer((RunnableScheduledFuture)x, timeout, unit);
}
public Runnable remove() { return dq.remove(); }
public Runnable element() { return dq.element(); }
public void clear() { dq.clear(); }
public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
public int drainTo(Collection<? super Runnable> c, int maxElements) {
return dq.drainTo(c, maxElements);
}
public int remainingCapacity() { return dq.remainingCapacity(); }
public boolean remove(Object x) { return dq.remove(x); }
public boolean contains(Object x) { return dq.contains(x); }
public int size() { return dq.size(); }
public boolean isEmpty() { return dq.isEmpty(); }
public Object[] toArray() { return dq.toArray(); }
public <T> T[] toArray(T[] array) { return dq.toArray(array); }
public Iterator<Runnable> iterator() {
return new Iterator<Runnable>() {
private Iterator<RunnableScheduledFuture> it = dq.iterator();
public boolean hasNext() { return it.hasNext(); }
public Runnable next() { return it.next(); }
public void remove() { it.remove(); }
};
}
}
}
在代码中都加了注释,我想大致能解释清楚吧。
Executor涉及的类还是比较多的,到此为止剩下的还有Executors
Executors
Executors中所定义的 Executor
、ExecutorService
、ScheduledExecutorService
、ThreadFactory
和 Callable
类的工厂和实用方法。此类支持以下各种方法:
- 创建并返回设置有常用配置字符串的
ExecutorService
的方法。 - 创建并返回设置有常用配置字符串的
ScheduledExecutorService
的方法。 - 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。
- 创建并返回
ThreadFactory
的方法,它可将新创建的线程设置为已知的状态。 - 创建并返回非闭包形式的
Callable
的方法,这样可将其用于需要 Callable
Executors提供的都是工具形式的方法,所以都是static的,并且这个类也没有必要实例化,所以它的构造方法时private的。下面主要看一下几个内部类。
RunnableAdapter
1 static final class RunnableAdapter<T> implements Callable<T> {
2 final Runnable task;
3 final T result;
4 RunnableAdapter(Runnable task, T result) {
5 this.task = task;
6 this.result = result;
7 }
8 public T call() {
9 task.run();
10 return result;
11 }
12 }
适配器。以Callable的形式执行Runnable并且返回给定的result。
PrivilegedCallable
1 static final class PrivilegedCallable<T> implements Callable<T> {
2 private final AccessControlContext acc;
3 private final Callable<T> task;
4 private T result;
5 private Exception exception;
6 PrivilegedCallable(Callable<T> task) {
7 this.task = task;
8 this.acc = AccessController.getContext();
9 }
10
11 public T call() throws Exception {
12 AccessController.doPrivileged(new PrivilegedAction<T>() {
13 public T run() {
14 try {
15 result = task.call();
16 } catch (Exception ex) {
17 exception = ex;
18 }
19 return null;
20 }
21 }, acc);
22 if (exception != null)
23 throw exception;
24 else
25 return result;
26 }
27 }
在访问控制下运行的Callable。涉及到Java.security包中的内容。
PrivilegedCallableUsingCurrentClassLoader类与上面的PrivilegedCallable类似,只是使用的是CurrentClassLoader。
DefaultThreadFactory
1 static class DefaultThreadFactory implements ThreadFactory {
2 static final AtomicInteger poolNumber = new AtomicInteger(1);
3 final ThreadGroup group;
4 final AtomicInteger threadNumber = new AtomicInteger(1);
5 final String namePrefix;
6
7 DefaultThreadFactory() {
8 SecurityManager s = System.getSecurityManager();
9 group = (s != null)? s.getThreadGroup() :
10 Thread.currentThread().getThreadGroup();
11 namePrefix = "pool-" +
12 poolNumber.getAndIncrement() +
13 "-thread-";
14 }
15
16 public Thread newThread(Runnable r) {
17 // 调用Thread构造方法创建线程
18 Thread t = new Thread(group, r,
19 namePrefix + threadNumber.getAndIncrement(),
20 0);
21 // 取消守护线程设置
22 if (t.isDaemon())
23 t.setDaemon(false);
24 // 设置默认优先级
25 if (t.getPriority() != Thread.NORM_PRIORITY)
26 t.setPriority(Thread.NORM_PRIORITY);
27 return t;
28 }
29 }
DefaultThreadFactory 是默认的线程工程,提供创建线程的方法。
PrivilegedThreadFactory继承自DefaultThreadFactory,区别在于线程执行的run方法指定了classLoader并受到权限的控制。
DelegatedExecutorService继承自AbstractExecutorService,是一个包装类,暴露ExecutorService的方法。
DelegatedScheduledExecutorService继承自DelegatedExecutorService,实现了ScheduledExecutorService接口。它也是一个包装类,公开ScheduledExecutorService方法。