这可能是最简短的线程池分析文章了。
顶层设计,定义执行接口
Interface Executor(){ void execute(Runnable command); }
ExecutorService,定义控制接口
interface ExecutorService extends Executor{ }
抽象实现ExecutorService中的大部分方法
abstract class AbstractExecutorService implements ExecutorService{ //此处把ExecutorService中的提交方法都实现了 }
我们看下提交中的核心
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // ① //核心线程数没有满就继续添加核心线程 if (addWorker(command, true)) // ② return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { // ③ int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))// ④ reject(command); //⑦ else if (workerCountOf(recheck) == 0) // ⑤ //如果worker为0,则添加一个非核心worker,所以线程池里至少有一个线程 addWorker(null, false);// ⑥ } //队列满了以后,添加非核心线程 else if (!addWorker(command, false))// ⑧ reject(command);//⑦ }
这里就会有几道常见的面试题
1,什么时候用核心线程,什么时候启用非核心线程?
添加任务时优先使用核心线程,核心线程满了以后,任务放入队列中。只要队列不填满,就一直使用核心线程执行任务(代码①②)。
当队列满了以后开始使用增加非核心线程来执行队列中的任务(代码⑧)。
2,0个核心线程,2个非核心线程,队列100,添加99个任务是否会执行?
会执行,添加队列成功后,如果worker的数量为0,会添加非核心线程执行任务(见代码⑤⑥)
3,队列满了会怎么样?
队列满了,会优先启用非核心线程执行任务,如果非核心线程也满了,那就执行拒绝策略。
4,submit 和execute的区别是?
submit将执行任务包装成了RunnableFuture,最终返回了Future,executor 方法执行无返回值。
addworker实现
ThreadPoolExecutor extends AbstractExecutorService{ //保存所有的执行线程(worker) HashSet<Worker> workers = new HashSet<Worker>(); //存放待执行的任务,这块具体由指定的队列实现 BlockingQueue<Runnable> workQueue; public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){ } //添加执行worker private boolean addWorker(Runnable firstTask, boolean core) { //这里每次都会基础校验和cas校验,防止并发无法创建线程, retry: for(;;){ for(;;){ if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } try{ //创建一个worker w = new Worker(firstTask); final Thread t = w.thread; try{ //加锁校验,添加到workers集合中 workers.add(w); } //添加成功,将对应的线程启动,执行任务 t.start(); }finally{ //失败执行进行释放资源 addWorkerFailed(Worker w) } } //Worker 是对任务和线程的封装 private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ //线程启动后会循环执行任务 public void run() { runWorker(this); } } //循环执行 final void runWorker(Worker w) { try{ while (task != null || (task = getTask()) != null) { //执行前的可扩展点 beforeExecute(wt, task); try{ //执行任务 task.run(); }finally{ //执行后的可扩展点,这块也把异常给吃了 afterExecute(task, thrown); } } //这里会对执行的任务进行统计 }finally{ //异常或者是循环退出都会走这里 processWorkerExit(w, completedAbruptly); } } //获取执行任务,此处决定runWorker的状态 private Runnable getTask() { //worker的淘汰策略:允许超时或者工作线程>核心线程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //满足淘汰策略且...,就返回null,交由processWorkerExit去处理线程 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } // 满足淘汰策略,就等一定的时间poll(),不满足,就一直等待take() Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take(); } //处理任务退出(循环获取不到任务的时候) private void processWorkerExit(Worker w, boolean completedAbruptly) { //异常退出的,不能调整线程数的 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); //不管成功或失败,都执行以下逻辑 //1,计数,2,减去一个线程 completedTaskCount += w.completedTasks; //将线程移除,并不关心是否非核心 workers.remove(w); //如果是还是运行状态 if (!completedAbruptly) { //正常终止的,处理逻辑 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //核心线程为0 ,最小值也是1 if (min == 0 && ! workQueue.isEmpty()) min = 1; //总线程数大于min就不再添加 if (workerCountOf(c) >= min) return; // replacement not needed } //异常退出一定还会添加worker,正常退出一般不会再添加线程,除非核心线程数为0 addWorker(null, false); } }
这里涉及到几个点:
1,任务异常以后虽然有throw异常,但是外面有好几个finally代码;
2,在finally中,进行了任务的统计以及worker移除;
3,如果还有等待处理的任务,最少添加一个worker(不管核心线程数是否为0)
这里会引申出来几个面试题:
1, 线程池中核心线程数如何设置?
cpu密集型:一般为核心线程数+1,尽可能减少cpu的并行;
IO密集型:可以设置核心线程数稍微多些,将IO等待期间的空闲cpu充分利用起来。
2,线程池使用队列的意义?
a)线程的资源是有限的,且线程的创建成本比较高;
b) 要保证cpu资源的合理利用(不能直接给cpu提一堆任务,cpu处理不过来,大家都慢了)
c) 利用了削峰填谷的思想(保证任务执行的可用性);
d) 队列过大也会把内存撑爆。
3,为什么要用阻塞队列?而不是非阻塞队列?
a) 利用阻塞的特性,在没有任务时阻塞一定的时间,防止资源被释放(getTask和processWorkExit);
b) 阻塞队列在阻塞时,CPU状态是wait,等有任务时,会被唤醒,不会占用太多的资源;
线程池有两个地方:
1,在execute方法中(提交任务时),只要工作线程为0,就至少添加一个Worker;
2,在processWorkerExit中(正常或异常结束时),只要有待处理的任务,就会增加Worker
所以正常情况下线程池一定会保证所有任务的执行。
我们在看下ThreadPoolExecutor中以下几个方法
public boolean prestartCoreThread() { return workerCountOf(ctl.get()) < corePoolSize && addWorker(null, true); } void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; }
确保了核心线程数必须是满的,这些方法特别是在批处理的时候,或者动态调整核心线程数的大小时很有用。
我们再看下Executors中常见的创建线程池的方法:
一、newFixedThreadPool 与newSingleThreadExecutor
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
特点:
1,核心线程数和最大线程数大小一样(唯一不同的是,一个是1,一个是自定义);
2,队列用的是LinkedBlockingQueue(长度是Integer.Max_VALUE)
当任务的生产速度大于消费速度后,很容易将系统内存撑爆。
二、 newCachedThreadPool 和
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
特点:最大线程数为Integer.MAX_VALUE
当任务提交过多时,线程创建过多容易导致无法创建
三、 newWorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
这个主要是并行度,默认为cpu的核数。
四、newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
封装起来的要么最大线程数不可控,要么队列长度不可控,所以阿里规约里也不建议使用Executors方法创建线程池。
ps:
生产上使用线程池,最好是将关键任务和非关键任务分开设立线程池,非关键业务影响关键业务的执行。
总结
到此这篇关于jdk8线程池的文章就介绍到这了,更多相关jdk8线程池内容请搜索自由互联以前的文章或继续浏览下面的相关文章希望大家以后多多支持自由互联!