当前位置 : 主页 > 网络安全 > 测试自动化 >

ThreadPoolExecutor

来源:互联网 收集:自由互联 发布时间:2021-06-22
ThreadPoolExecutor 线程池的意义 提升性能:它们通常在执行大量异步任务时,由于减少了每个任务的调用开销,并且它们提供了一种限制和管理资源(包括线程)的方法,使得性能提升明

ThreadPoolExecutor

线程池的意义

  提升性能:它们通常在执行大量异步任务时,由于减少了每个任务的调用开销,并且它们提供了一种限制和管理资源(包括线程)的方法,使得性能提升明显;

  统计信息:每个ThreadPoolExecutor保持一些基本的统计信息,例如完成的任务数量。

ThreadPoolExecutor系列重载方法

源码

 1 public ThreadPoolExecutor(int corePoolSize, // 1
 2                               int maximumPoolSize,  // 2
 3                               long keepAliveTime,  // 3
 4                               TimeUnit unit,  // 4
 5                               BlockingQueue<Runnable> workQueue, // 5
 6                               ThreadFactory threadFactory,  // 6
 7                               RejectedExecutionHandler handler ) { //7
 8         if (corePoolSize < 0 ||
 9             maximumPoolSize <= 0 ||
10             maximumPoolSize < corePoolSize ||
11             keepAliveTime < 0) 12             throw new IllegalArgumentException(); 13         if (workQueue == null || threadFactory == null || handler == null) 14             throw new NullPointerException(); 15         this.corePoolSize = corePoolSize; 16         this.maximumPoolSize = maximumPoolSize; 17         this.workQueue = workQueue; 18         this.keepAliveTime = unit.toNanos(keepAliveTime); 19         this.threadFactory = threadFactory; 20         this.handler = handler; 21     }

ThreadPoolExecutor参数

参数详解

1,corePoolSize和maximumPoolSize

  线程池执行器将会根据corePoolSize和maximumPoolSize自动地调整线程池大小。

  当在execute(Runnable)方法中提交新任务并且少于corePoolSize线程正在运行时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。 如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则仅当队列已满时才会创建新线程。 通过设置corePoolSize和maximumPoolSize相同,您可以创建一个固定大小的线程池。 通过将maximumPoolSize设置为基本上无界的值,例如Integer.MAX_VALUE,您可以允许池容纳任意数量的并发任务。 通常,核心和最大池大小仅在构建时设置,但也可以使用setCorePoolSizesetMaximumPoolSize进行动态更改。

2,keepAliveTime和unit

  如果线程池当前拥有超过corePoolSize的线程,那么多余的线程在空闲时间超过keepAliveTime时会被终止。这提供了一种在不积极使用线程池时减少资源消耗的方法。如果池在以后变得更加活跃,则应构建新线程。 也可以使用方法setKeepAliveTime(long,TimeUnit)进行动态调整。

  防止空闲线程在关闭之前终止,可以使用如下方法:setKeepAliveTime(Long.MAX_VALUE,TimeUnit.NANOSECONDS);

  默认情况下,keep-alive策略仅适用于存在超过corePoolSize线程的情况。 但是,只要keepAliveTime值不为零,方法allowCoreThreadTimeOut(boolean)也可用于将此超时策略应用于核心线程。

3,workQueue

  BlockingQueu用于存放提交的任务,队列的实际容量与线程池大小相关联。

Direct handoffs 直接握手队列

  Direct handoffs 的一个很好的默认选择是 SynchronousQueue,它将任务交给线程而不需要保留。这里,如果没有线程立即可用来运行它,那么排队任务的尝试将失败,因此将构建新的线程。此策略在处理可能具有内部依赖关系的请求集时避免锁定。Direct handoffs 通常需要无限制的maximumPoolSizes来避免拒绝新提交的任务。 但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致线程数量会无限增长问题。

Unbounded queues 无界队列

  当所有corePoolSize线程繁忙时,使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将导致新任务在队列中等待,从而导致maximumPoolSize的值没有任何作用。当每个任务互不影响,完全独立于其他任务时,这可能是合适的; 例如,在网页服务器中, 这种队列方式可以用于平滑瞬时大量请求。但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致队列无限增长问题。

Bounded queues 有界队列

  一个有界的队列(例如,一个ArrayBlockingQueue)和有限的maximumPoolSizes配置有助于防止资源耗尽,但是难以控制。队列大小和maximumPoolSizes需要 相互权衡:

  使用大队列和较小的maximumPoolSizes可以最大限度地减少CPU使用率,操作系统资源和上下文切换开销,但会导致人为的低吞吐量。如果任务经常被阻塞(比如I/O限制),那么系统可以调度比我们允许的更多的线程。

  使用小队列通常需要较大的maximumPoolSizes,这会使CPU更繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。

4,threadFactory线程工厂

  新线程使用ThreadFactory创建。 如果未另行指定,则使用Executors.defaultThreadFactory默认工厂,使其全部位于同一个ThreadGroup中,并且具有相同的NORM_PRIORITY优先级和非守护进程状态。

  通过提供不同的ThreadFactory,您可以更改线程的名称,线程组,优先级,守护进程状态等。如果ThreadCactory在通过从newThread返回null询问时未能创建线程,则执行程序将继续,但可能无法执行任何任务。

  线程应该有modifyThread权限。 如果工作线程或使用该池的其他线程不具备此权限,则服务可能会降级:配置更改可能无法及时生效,并且关闭池可能会保持可终止但尚未完成的状态。

5,handler(Rejected tasks 拒绝任务)

  拒绝任务有两种情况:1. 线程池已经被关闭;2. 任务队列已满且maximumPoolSizes已满;

  无论哪种情况,都会调用RejectedExecutionHandler的rejectedExecution方法。预定义了四种处理策略:

  AbortPolicy:默认测策略,抛出RejectedExecutionException运行时异常;

  CallerRunsPolicy:这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度;

  DiscardPolicy:直接丢弃新提交的任务;

  DiscardOldestPolicy:如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程);

  我们可以自己定义RejectedExecutionHandler,以适应特殊的容量和队列策略场景中。

预定义线程池解析

1,FixedThreadPool

  适用场景:可用于Web服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。

1  public static ExecutorService newFixedThreadPool(int nThreads) { 2         return new ThreadPoolExecutor(nThreads, nThreads, 3                                       0L, TimeUnit.MILLISECONDS, 4                                       new LinkedBlockingQueue<Runnable>()); 5     }

  corePoolSize与maximumPoolSize相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势;

  keepAliveTime = 0 该参数默认对核心线程无效,而FixedThreadPool全部为核心线程;

  workQueue 为LinkedBlockingQueue(无界阻塞队列),队列最大值为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;

  FixedThreadPool的任务执行是无序的;

2,CachedThreadPool

  适用场景:快速处理大量耗时较短的任务,如Netty的NIO接受请求时,可使用CachedThreadPool。

1  public static ExecutorService newCachedThreadPool() { 2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 3                                       60L, TimeUnit.SECONDS, 4                                       new SynchronousQueue<Runnable>()); 5     }

  corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制;

  keepAliveTime = 60s,线程空闲60s后自动结束。

  workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为CachedThreadPool线程创建无限制,不会有队列等待,所以使用SynchronousQueue;

3,SingleThreadExecutor

1 public static ExecutorService newSingleThreadExecutor() { 2         return new FinalizableDelegatedExecutorService 3             (new ThreadPoolExecutor(1, 1, 4                                     0L, TimeUnit.MILLISECONDS, 5                                     new LinkedBlockingQueue<Runnable>())); 6     }

  FixedThreadPool可以向下转型为ThreadPoolExecutor,并对其线程池进行配置,而SingleThreadExecutor被包装后,无法成功向下转型。因此,SingleThreadExecutor被定以后,无法修改,做到了真正的Single。

4,ScheduledThreadPool

1  public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { 2         return new ScheduledThreadPoolExecutor(corePoolSize); 3     }

  newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,构造是还是调用了其父类的构造方法。

1  public ScheduledThreadPoolExecutor(int corePoolSize) { 2         super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 3               new DelayedWorkQueue()); 4     }

自定义线程池

 1 package io.guangsoft.erp;  2 
 3 import java.util.concurrent.*;  4 import java.util.concurrent.atomic.AtomicInteger;  5 
 6 public class ThreadPoolTest {  7 
 8     static class NameTreadFactory implements ThreadFactory {  9 
10         private final AtomicInteger mThreadNum = new AtomicInteger(1); 11 
12  @Override 13         public Thread newThread(Runnable r) { 14             Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement()); 15             System.out.println(t.getName() + " has been created"); 16             return t; 17  } 18  } 19 
20     public static class MyIgnorePolicy implements RejectedExecutionHandler { 21 
22         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 23  doLog(r, e); 24  } 25 
26         private void doLog(Runnable r, ThreadPoolExecutor e) { 27             System.err.println( r.toString() + " rejected"); 28             System.out.println("completedTaskCount: " + e.getCompletedTaskCount()); 29  } 30  } 31 
32     static class MyTask implements Runnable { 33         private String name; 34 
35         public MyTask(String name) { 36             this.name = name; 37  } 38 
39  @Override 40         public void run() { 41             try { 42                 System.out.println(this.toString() + " is running!"); 43                 Thread.sleep(3000); //让任务执行慢点
44             } catch (InterruptedException e) { 45  e.printStackTrace(); 46  } 47  } 48 
49         public String getName() { 50             return name; 51  } 52 
53  @Override 54         public String toString() { 55             return "MyTask [name=" + name + "]"; 56  } 57  } 58 
59     public static void main(String[] args) throws Exception { 60         int corePoolSize = 2; 61         int maximumPoolSize = 4; 62         long keepAliveTime = 10; 63         TimeUnit unit = TimeUnit.SECONDS; 64         BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2); 65         ThreadFactory threadFactory = new NameTreadFactory(); 66         RejectedExecutionHandler handler = new MyIgnorePolicy(); 67         ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, 68  workQueue, threadFactory, handler); 69         executor.prestartAllCoreThreads(); // 预启动所有核心线程
70 
71         for (int i = 1; i <= 10; i++) { 72             MyTask task = new MyTask(String.valueOf(i)); 73  executor.execute(task); 74  } 75 
76         System.in.read(); //阻塞主线程
77  } 78 }
网友评论