ThreadPoolExecutor
线程池的意义
提升性能:它们通常在执行大量异步任务时,由于减少了每个任务的调用开销,并且它们提供了一种限制和管理资源(包括线程)的方法,使得性能提升明显;
统计信息:每个ThreadPoolExecutor保持一些基本的统计信息,例如完成的任务数量。
ThreadPoolExecutor系列重载方法
源码
1 public ThreadPoolExecutor(int corePoolSize, // 1
2 int maximumPoolSize, // 2
3 long keepAliveTime, // 3
4 TimeUnit unit, // 4
5 BlockingQueue<Runnable> workQueue, // 5
6 ThreadFactory threadFactory, // 6
7 RejectedExecutionHandler handler ) { //7
8 if (corePoolSize < 0 ||
9 maximumPoolSize <= 0 ||
10 maximumPoolSize < corePoolSize ||
11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.corePoolSize = corePoolSize; 16 this.maximumPoolSize = maximumPoolSize; 17 this.workQueue = workQueue; 18 this.keepAliveTime = unit.toNanos(keepAliveTime); 19 this.threadFactory = threadFactory; 20 this.handler = handler; 21 }
ThreadPoolExecutor参数
参数详解
1,corePoolSize和maximumPoolSize
线程池执行器将会根据corePoolSize和maximumPoolSize自动地调整线程池大小。
当在execute(Runnable)方法中提交新任务并且少于corePoolSize线程正在运行时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。 如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则仅当队列已满时才会创建新线程。 通过设置corePoolSize和maximumPoolSize相同,您可以创建一个固定大小的线程池。 通过将maximumPoolSize设置为基本上无界的值,例如Integer.MAX_VALUE,您可以允许池容纳任意数量的并发任务。 通常,核心和最大池大小仅在构建时设置,但也可以使用setCorePoolSize
和setMaximumPoolSize
进行动态更改。
2,keepAliveTime和unit
如果线程池当前拥有超过corePoolSize的线程,那么多余的线程在空闲时间超过keepAliveTime时会被终止。这提供了一种在不积极使用线程池时减少资源消耗的方法。如果池在以后变得更加活跃,则应构建新线程。 也可以使用方法setKeepAliveTime(long,TimeUnit)
进行动态调整。
防止空闲线程在关闭之前终止,可以使用如下方法:setKeepAliveTime(Long.MAX_VALUE,TimeUnit.NANOSECONDS);
默认情况下,keep-alive策略仅适用于存在超过corePoolSize线程的情况。 但是,只要keepAliveTime值不为零,方法allowCoreThreadTimeOut(boolean)
也可用于将此超时策略应用于核心线程。
3,workQueue
BlockingQueu用于存放提交的任务,队列的实际容量与线程池大小相关联。
Direct handoffs 直接握手队列
Direct handoffs 的一个很好的默认选择是 SynchronousQueue,它将任务交给线程而不需要保留。这里,如果没有线程立即可用来运行它,那么排队任务的尝试将失败,因此将构建新的线程。此策略在处理可能具有内部依赖关系的请求集时避免锁定。Direct handoffs 通常需要无限制的maximumPoolSizes来避免拒绝新提交的任务。 但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致线程数量会无限增长问题。
Unbounded queues 无界队列
当所有corePoolSize线程繁忙时,使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将导致新任务在队列中等待,从而导致maximumPoolSize的值没有任何作用。当每个任务互不影响,完全独立于其他任务时,这可能是合适的; 例如,在网页服务器中, 这种队列方式可以用于平滑瞬时大量请求。但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致队列无限增长问题。
Bounded queues 有界队列
一个有界的队列(例如,一个ArrayBlockingQueue)和有限的maximumPoolSizes配置有助于防止资源耗尽,但是难以控制。队列大小和maximumPoolSizes需要 相互权衡:
使用大队列和较小的maximumPoolSizes可以最大限度地减少CPU使用率,操作系统资源和上下文切换开销,但会导致人为的低吞吐量。如果任务经常被阻塞(比如I/O限制),那么系统可以调度比我们允许的更多的线程。
使用小队列通常需要较大的maximumPoolSizes,这会使CPU更繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。
4,threadFactory线程工厂
新线程使用ThreadFactory创建。 如果未另行指定,则使用Executors.defaultThreadFactory默认工厂,使其全部位于同一个ThreadGroup中,并且具有相同的NORM_PRIORITY优先级和非守护进程状态。
通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护进程状态等。如果ThreadCactory在通过从newThread返回null询问时未能创建线程,则执行程序将继续,但可能无法执行任何任务。
线程应该有modifyThread权限。 如果工作线程或使用该池的其他线程不具备此权限,则服务可能会降级:配置更改可能无法及时生效,并且关闭池可能会保持可终止但尚未完成的状态。
5,handler(Rejected tasks 拒绝任务)
拒绝任务有两种情况:1. 线程池已经被关闭;2. 任务队列已满且maximumPoolSizes已满;
无论哪种情况,都会调用RejectedExecutionHandler的rejectedExecution方法。预定义了四种处理策略:
AbortPolicy:默认测策略,抛出RejectedExecutionException运行时异常;
CallerRunsPolicy:这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度;
DiscardPolicy:直接丢弃新提交的任务;
DiscardOldestPolicy:如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程);
我们可以自己定义RejectedExecutionHandler,以适应特殊的容量和队列策略场景中。
预定义线程池解析
1,FixedThreadPool
适用场景:可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 }
corePoolSize与maximumPoolSize相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;
keepAliveTime = 0 该参数默认对核心线程无效,而FixedThreadPool全部为核心线程;
workQueue 为LinkedBlockingQueue(无界阻塞队列),队列最大值为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;
FixedThreadPool的任务执行是无序的;
2,CachedThreadPool
适用场景:快速处理大量耗时较短的任务,如Netty的NIO接受请求时,可使用CachedThreadPool。
1 public static ExecutorService newCachedThreadPool() { 2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3 60L, TimeUnit.SECONDS, 4 new SynchronousQueue<Runnable>()); 5 }
corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;
keepAliveTime = 60s,线程空闲60s后自动结束。
workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;
3,SingleThreadExecutor
1 public static ExecutorService newSingleThreadExecutor() { 2 return new FinalizableDelegatedExecutorService 3 (new ThreadPoolExecutor(1, 1, 4 0L, TimeUnit.MILLISECONDS, 5 new LinkedBlockingQueue<Runnable>())); 6 }
FixedThreadPool可以向下转型为ThreadPoolExecutor,并对其线程池进行配置,而SingleThreadExecutor被包装后,无法成功向下转型。因此,SingleThreadExecutor被定以后,无法修改,做到了真正的Single。
4,ScheduledThreadPool
1 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 2 return new ScheduledThreadPoolExecutor(corePoolSize); 3 }
newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,构造是还是调用了其父类的构造方法。
1 public ScheduledThreadPoolExecutor(int corePoolSize) { 2 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 3 new DelayedWorkQueue()); 4 }
自定义线程池
1 package io.guangsoft.erp; 2
3 import java.util.concurrent.*; 4 import java.util.concurrent.atomic.AtomicInteger; 5
6 public class ThreadPoolTest { 7
8 static class NameTreadFactory implements ThreadFactory { 9
10 private final AtomicInteger mThreadNum = new AtomicInteger(1); 11
12 @Override 13 public Thread newThread(Runnable r) { 14 Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement()); 15 System.out.println(t.getName() + " has been created"); 16 return t; 17 } 18 } 19
20 public static class MyIgnorePolicy implements RejectedExecutionHandler { 21
22 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 23 doLog(r, e); 24 } 25
26 private void doLog(Runnable r, ThreadPoolExecutor e) { 27 System.err.println( r.toString() + " rejected"); 28 System.out.println("completedTaskCount: " + e.getCompletedTaskCount()); 29 } 30 } 31
32 static class MyTask implements Runnable { 33 private String name; 34
35 public MyTask(String name) { 36 this.name = name; 37 } 38
39 @Override 40 public void run() { 41 try { 42 System.out.println(this.toString() + " is running!"); 43 Thread.sleep(3000); //让任务执行慢点
44 } catch (InterruptedException e) { 45 e.printStackTrace(); 46 } 47 } 48
49 public String getName() { 50 return name; 51 } 52
53 @Override 54 public String toString() { 55 return "MyTask [name=" + name + "]"; 56 } 57 } 58
59 public static void main(String[] args) throws Exception { 60 int corePoolSize = 2; 61 int maximumPoolSize = 4; 62 long keepAliveTime = 10; 63 TimeUnit unit = TimeUnit.SECONDS; 64 BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2); 65 ThreadFactory threadFactory = new NameTreadFactory(); 66 RejectedExecutionHandler handler = new MyIgnorePolicy(); 67 ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, 68 workQueue, threadFactory, handler); 69 executor.prestartAllCoreThreads(); // 预启动所有核心线程
70
71 for (int i = 1; i <= 10; i++) { 72 MyTask task = new MyTask(String.valueOf(i)); 73 executor.execute(task); 74 } 75
76 System.in.read(); //阻塞主线程
77 } 78 }