一、ForkJoinPool
ForkJoinPool 是 JDK7 引入的,由 Doug Lea 编写的高性能线程池。核心思想是将大的任务拆分成多个小任务(即fork),然后在将多个小任务处理汇总到一个结果上(即join),非常像MapReduce处理原理。同时,它提供基本的线程池功能,支持设置最大并发线程数,支持任务排队,支持线程池停止,支持线程池使用情况监控,也是AbstractExecutorService的子类,主要引入了“工作窃取”机制,在多CPU计算机上处理性能更佳。其广泛用在java8的stream中。
ForkJoinPool是 java 7 中新增的线程池类,它的继承体系如下:
ForkJoinPool 并不是为了替代 ThreadPoolExecutor 而出现的,而是作为一种它的补充。在处理 CPU 密集型任务的时候,它的性能比 ThreadPoolExecutor 更好,而如果你是 I/O 密集型任务的时候,除非配置 ManagedBlocker 一起使用,否则不建议使用它。
ForkJoinPool 可以根据CPU的核数并行的执行,适合使用在很耗时的操作,可以充分的利用CPU执行任务。
ForkJoinPool 的UML类图:
ForkJoinPool采取工作窃取算法,以避免工作线程由于拆分了任务之后的join等待过程。这样处于空闲的工作线程将从其他工作线程的队列中主动去窃取任务来执行。这里涉及到的两个基本知识点是分治法和工作窃取。
1.1 分治法
分治法的基本思想是一个规模为N的问题分解为K个规模较小的子问题,这些子问题的相互独立且与原问题的性质相同,求出子问题的解之后,将这些解合并,就可以得到原有问题的解。是一种分目标完成的程序算法。简单的问题,可以用二分法来完成。
二分法,就是我们之前在检索的时候经常用到的Binary Search 。这样可以迅速将时间复杂度从O(n)降低到O(log n)。那么对应到ForkJoinPool对问题的处理也如此。基本原理如下图:
这只是一个简化版本的Fork-Join,实际上我们在日常工作中的应用可能比这要复杂很多。但是基本原理类似。这样就将一个大的任务,通过fork方法不断拆解,直到能够计算为止,之后,再将这些结果用join合并。这样逐次递归,就得到了我们想要的结果。这就是再ForkJoinPool中的分治法。
1.2 工作窃取
工作窃取是指当某个线程的任务队列中没有可执行任务的时候,从其他线程的任务队列中窃取任务来执行,以充分利用工作线程的计算能力,减少线程由于获取不到任务而造成的空闲浪费。在ForkJoinpool中,工作任务的队列都采用双端队列Deque容器。我们知道,在通常使用队列的过程中,我们都在队尾插入,而在队头消费以实现FIFO。而为了实现工作窃取。一般我们会改成工作线程在工作队列上LIFO,而窃取其他线程的任务的时候,从队列头部取获取。示意图如下:
工作线程worker1、worker2以及worker3都从taskQueue的尾部popping获取task,而任务也从尾部Pushing,当worker3队列中没有任务的时候,就会从其他线程的队列中取stealing,这样就使得worker3不会由于没有任务而空闲。这就是工作窃取算法的基本原理。
可以想象,要是不使用工作窃取算法,那么我们在不断fork的过程中,可能某些worker就会一直处于join的等待中。
-
(1)每个工作线程都有自己的工作队列WorkQueue;
-
(2)这是一个双端队列,它是线程私有的;
-
(3)ForkJoinTask中fork的子任务,将放入运行该任务的工作线程的队头,工作线程将以LIFO的顺序来处理工作队列中的任务;
-
(4)为了最大化地利用CPU,空闲的线程将从其它线程的队列中“窃取”任务来执行;
-
(5)从工作队列的尾部窃取任务,以减少竞争;
-
(6)双端队列的操作:push()/pop()仅在其所有者工作线程中调用,poll()是由其它线程窃取任务时调用的;
-
(7)当只剩下最后一个任务时,还是会存在竞争,是通过CAS来实现的;
1.3 构造方法
ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池ForkJoinPool提供了如下两个常用的构造器。
@sun.misc.Contended public class ForkJoinPool extends AbstractExecutorService { //以Runtime.getRuntime().availableProcessors()的返回值作为parallelism来创建ForkJoinPool public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); } // 创建一个包含parallelism个并行线程的ForkJoinPool public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } }创建ForkJoinPool实例后,可以调用ForkJoinPool的submit(ForkJoinTask task) 、execute(ForkJoinTask<?> task) 或者 invoke(ForkJoinTask task) 来执行指定任务。其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它有两个抽象子类:RecursiveAction和RecursiveTask。
ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask:用于有返回结果的任务。
根据需要,继承这两个相关类之一,重写compute()方法,在compute()方法中分解任务,并且将任务结果合并。
ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
非fork/join端调用(Call from non-fork/join clients) fork/join内部调用(Call from within fork/join computations) 安排异步执行(Arrange async execution) ForkJoinPool.execute(ForkJoinTask) ForkJoinTask.fork() 等待并获取结果(Arrange async execution) ForkJoinPool.invoke(ForkJoinTask) ForkJoinTask.invoke() 安排执行并获取Future(Arrange exec and obtain Future) ForkJoinPool.submit(ForkJoinTask) ForkJoinTask.fork() (ForkJoinTasks are Futures)1.4 使用场景
- ForkJoinPool 非常适合执行任务比较多、执行事件比较短的程序,比如过滤集合中的元素(JDK1.8 stream 底层就是 ForkJoinPool )。
- 单个任务很大,执行起来很耗时间,这时,就可以把任务进行拆分,拆分成多个小任务去执行,然后小任务执行完毕后再把每个小任务执行的结果合并起来,这样就可以节省时间。
- ForkJoinPool 的出现不是为了替换 ThreadPoolExecutor 这一类的线程池,而是为了做功能上的补充(为了解决 CPU 负载不均衡的问题。如某个较大的任务,被一个线程去执行,而其他线程处于空闲状态),两者各有使用场景,根据不同的场景使用不同的线程池即可。
- ForkJoinPool 在处理 CPU 密集型任务的时候,它的性能比 ThreadPoolExecutor 更好,而如果你是 I/O 密集型任务的时候,除非配置 ManagedBlocker 一起使用,否则不建议使用它。
- ForkJoinPool 可以根据CPU的核数并行的执行,适合使用在很耗时的操作,可以充分的利用CPU执行任务。
二、源码分析
2.1 FokJoinPool的字段
2.1.1 常量
// Bounds 控制线程池相关边界的常量 // 二进制形式0b00000000_00000000_11111111_11111111 线程池索引掩码 static final int SMASK = 0xffff; // short bits == max index // 二进制形式0b00000000_000000000_1111111_11111111 工作者线程数的最大值 static final int MAX_CAP = 0x7fff; // max #workers - 1 //二进制形式0b00000000_00000000_11111111_11111110 //用来取workQueues偶数槽下标,(二进制最低位强置为0,当最低位为0时,表示偶数) static final int EVENMASK = 0xfffe; // even short bits //二进制形式0b00000000_00000000_00000000_01111110 最大的槽位值(限制最多只有64们偶数槽位) static final int SQMASK = 0x007e; // max 64 (even) slots // Masks and units for WorkQueue.scanState and ctl sp subfield //控制WorkQueue的scanState属性 、及ForkJoinPool的ctl属性的低32位sp子属性 //二进制形式0b00000000_00000000_00000000_00000001 ,runState的最低位,1表示正在扫描,0表示正在执行任务 static final int SCANNING = 1; // false when running tasks // 二进制形式0b10000000_00000000_00000000_00000000 runState最高位为1,表示此线程是INACTIVE空闲的 static final int INACTIVE = 1 << 31; // must be negative // 0b00000000_00000001_00000000_00000000 版本号,让线程池索引加1,取下一个线程 static final int SS_SEQ = 1 << 16; // version count // Mode bits for ForkJoinPool.config and WorkQueue.config 控制ForkJoinPool.config和WorkQueue.config属性的常量 // 二进制形式0b11111111_11111111_00000000_00000000 获取当前队列mode的掩码,只取config的高16位 static final int MODE_MASK = 0xffff << 16; // top half of int // 二进制形式0b00000000_00000000_00000000_00000000 先进先出模式(异步模式),将WorkQueue看作(内部任务)队列 static final int LIFO_QUEUE = 0; // 二进制形式0b00000000_00000001_00000000_00000000 后时先出模式(非异步模式),将WorkQueue看作(内部任务)栈 static final int FIFO_QUEUE = 1 << 16; // 二进制形式0b10000000_00000000_00000000_00000000 共离模式,提交外部任务放入的队列 static final int SHARED_QUEUE = 1 << 31; // must be negative // Lower and upper word masks 分别取ctl的高32位和低32位的掩码 private static final long SP_MASK = 0xffffffffL; private static final long UC_MASK = ~SP_MASK; // Active counts 获取活动线程数的相关掩码 private static final int AC_SHIFT = 48; private static final long AC_UNIT = 0x0001L << AC_SHIFT; private static final long AC_MASK = 0xffffL << AC_SHIFT; // Total counts 获取部线程数的相关掩码 private static final int TC_SHIFT = 32; private static final long TC_UNIT = 0x0001L << TC_SHIFT; private static final long TC_MASK = 0xffffL << TC_SHIFT; private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign // runState bits: SHUTDOWN must be negative, others arbitrary powers of two //执行器运行状态的掩码。 private static final int RSLOCK = 1; private static final int RSIGNAL = 1 << 1; private static final int STARTED = 1 << 2; private static final int STOP = 1 << 29; private static final int TERMINATED = 1 << 30; private static final int SHUTDOWN = 1 << 31;上面这些常量,大多都是获取相关属性的各分割位的掩码。SMASK 、SQMASK是获取线程池中线程数相关边界的掩码,SCANNING 、INACTIVE等又是设置获取runState及 ctl的sp子属性相关分割位含义的掩码。
2.1.2 成员变量
// 用来配置ctl在控制线程数量使用 private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign //控制线程池数量(ctl & ADD_WORKER) != 0L 时创建线程, // 也就是当ctl的第16位不为0时,可以继续创建线程 volatile long ctl; // main pool control //全局锁控制,全局运行状态 volatile int runState; // lockable status //config二进制形式的低16位表示parallelism, //config二进制形式的第高16位表示mode,1表示异步模式, 使用先进先出队列, 0表示同步模式, 使用先进后出栈 //低16位表示workerQueue在pool中的索引,高16位表示mode, 有FIFI LIFL final int config; // parallelism, mode //生成workerQueue索引的重要依据 int indexSeed; // to generate worker index //工作者队列数组,内部线程ForkJoinWorkerThread启动时会注册一个WorkerQueue对象到这个数组中 volatile WorkQueue[] workQueues; // main registry //工作者线程线程工厂,创建ForkJoinWorkerThread的策略 final ForkJoinWorkerThreadFactory factory; //在线程因未捕异常而退出时,java虚拟机将回调的异常处理策略 final UncaughtExceptionHandler ueh; // per-worker UEH //工作者线程名的前缀 final String workerNamePrefix; // to create worker name string //执行器所有线程窃取的任务总数,也作为监视runState的锁 volatile AtomicLong stealCounter; // also used as sync monitor //通用的执行器,它在静态块中初始化 static final ForkJoinPool common;ctl:执行器的主要控制属性。它可以可以分割为4个子字段。
AC:ctl最高的16位,它表示"active workers — parallelism"(活动线程数减去预设的并行度),因此它为负数时表明活动线程不够。 TC: ctl的33位至47位,它表示“total workers — parallelism” (部线程数送去预设的并行度),因此它为负数时表明总线程数太少。 SS:ctl的低32位,ctl的16位至1位表示版本号,ctl的32-17位表示处于栈顶(此时将WorkQueue看作栈)的空闲线程的状态 ID : ctl的低32位, 表示下次使用的线程池索引可见ctl的低32位同时表示SS和ID,因此将ctl的低32位可统称为sp,sp非零表示当前池中没有空闲线程。
-
runState:表示线程池的运行状态,它有RSLOCK、 RSIGNAL 、STOP 、 TERMINATED 、SHUTDOWN这几种状态。
-
config:执行器的配置信息,包括并行度、模式。此属性在构造方法中初始化后就不再变化。并行度即预先估计的线程数,而模式表示是否为异步模式。若是异步模式就将WorkQueue当作队列,反之将WorkQueue当作栈。
-
indexSeed:生成工作者的随机索引(workQueues的槽位索引)的重要依据。
-
factory:线程工厂,创建ForkJoinWorkerThread的策略。
-
ueh:在线程因未捕异常而退出时,java虚拟机将回调的异常处理策略。
-
workerNamePrefix:工作者线程名的前缀。
-
stealCounter:当前执行器所有线程所窃取的任务总数,也作为监视runState的锁。
-
common:通用的执行器,它在静态块中初始化。当workQueue未指定执行器时就使用这个执行器。
2.1.3 WorkerQueue的字段
@sun.misc.Contended static final class WorkQueue { //队列的初始容量 static final int INITIAL_QUEUE_CAPACITY = 1 << 13; // 64M 队列的最大容量 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M // Instance fields volatile int scanState; // versioned, <0: inactive; odd:scanning int stackPred; // pool stack (ctl) predecessor int nsteals; // number of steals int hint; // randomization and stealer index hint int config; // pool index and mode volatile int qlock; // 1: locked, < 0: terminate; else 0 volatile int base; // index of next slot for poll int top; // index of next slot for push ForkJoinTask<?>[] array; // the elements (initially unallocated) final ForkJoinPool pool; // the containing pool (may be null) final ForkJoinWorkerThread owner; // owning thread or null if shared volatile Thread parker; // == owner during call to park; else null volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer }-
scanState:它可以看作是乐观锁的版本号,另外它还有此其他功能,它为负数时,表示工作者线程非活动,它为奇数是表示,正在扫描(准备窃取)任务,它为偶数是表示正在执行任务。
-
stackPred:表示在线程池栈当前工作线程的前驱线程的索引。在唤醒线程时常用到此属性。
-
nsteals:表示owner线程窃取的任务数。
-
hint:任务窃取时的随机定位种子。
-
config:低16位表示,当前WorkerQueue对象在外部类的数组属性workQueues中的索引(下标) 。高16位表示当前WorkerQueue对象的模式。对于内部任务,若构造方法配置为异步模式就将WorkQueue当作先进先出的队列,反之将WorkQueue当作后进先出的栈。对于外部任务,将WorkQueue视为共享队列。
-
qlock:初始值为0,”=1“时表示当前WorkerQueue对象被锁住,” < 0“时 表示当前WorkerQueue对象已终止,队列中的其他未完成任务将不再被执行。
-
base:表示下次对任务数组array进行poll出队操作(窃取任务)的槽位索引(队尾)。
-
top:表示下次任务数组array进行push入栈操作(添加任务)的槽位索引(栈顶)。
-
array:非学重要的属性,这用是保存任务的数组(容器)。
-
pool:与之关联的ForkJoinPool执行器,它可能为空。若为空,就使用静态变量common作为执行器。
-
owner:当前队列对应的工作者线程,它一般不为空。若从外部提交任务时,当前WorkerQueue对象表示共享队列,owner为空。
-
parker:阻塞的线程。在被阻塞的时候,它等于owner,其他时候它为空。
-
currentJoin:表示当前正在join的任务,主要在awaitJoin方法使用。
-
currentSteal:表示当前被窃取的任务,主要在helpStealer方法中使用。
2.2 ForkJoinPool 源码分析
2.2.1 runState锁的获取与释放
lockRunState():获取runState锁
(当前锁已被持有)或(当前未被持有且再获取锁状态还失败)时,等待awaitRunStateLock方法(自旋或阻塞)完成后返回,当前锁未被持有且获取锁状态成功,就直接返回。
private int lockRunState() { int rs; return ((((rs = runState) & RSLOCK) != 0 || //当前runState锁未被持有 !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ? //不是锁状态再去获取锁状态失败 awaitRunStateLock() : rs); }awaitRunStateLock()
核心逻辑:
- 当前锁未被持有,就CAS自旋获取锁状态(CAS更新为锁定状态),
- 已获取到锁状态,尝试CAS将runState更新为等待信号RSIGNAL的状态 , CAS更新成功,就让当前线程阻塞等待 ,CAS更新失败,就重新自旋。
unlockRunState
unlockRunState方法释放锁并重设runState. 其方法逻辑简单
- 先使用CAS更新runState为SINGAL,即CAS释放锁状态;若CAS失败,就使用java原生的“等待/通知”模型唤醒线程。这里刚好与上面的awaitRunStateLock对应上。
2.2.2 创建、注册、注销Worker
tryAddWorker(long c)
tryAddWorker(long c):尝试创建一个ForkJoinWorkerThread线程。
主要逻辑:如果可用线程太少、执行器未终止且线程数预更新成功,就创建并启动一个ForkJoinWorkerThread线程。
private void tryAddWorker(long c) { boolean add = false; do { //AC(活动线程数)和TC(总线程数)均加1 long nc = ((AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT))); if (ctl == c) { int rs, stop; // check if terminating if ((stop = (rs = lockRunState()) & STOP) == 0) //获取runState锁后检查执行器是否停止 //更新AC 与TC add = U.compareAndSwapLong(this, CTL, c, nc); unlockRunState(rs, rs & ~RSLOCK); if (stop != 0) break; if (add) { createWorker(); //创建ForkJoinWorkerThread工作者,并启动(Thread.start()) break; } } //"((c = ctl) & ADD_WORKER) != 0L" ADD_WORKER取TC的最高位,即符号数, //符号位非零说明TC为负数,即池中的线程总数小于配置的并行度,线程数太少 //(c只取ctl的低32位)c == 0表示池中没有空闲线程 } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);//可用线程数不够就自旋,直到启动一个ForkJoinWorkerThread }createWorker()
createWorker()用来创建一个工作者线程,其核心逻辑是:构建并启动一个ForkJoinWorkerThread,若失败就回滚注销这个线程。
private boolean createWorker() { ForkJoinWorkerThreadFactory fac = factory; Throwable ex = null; ForkJoinWorkerThread wt = null; try { //构建并启动一个Worker线程, //构建线程时构造方法ForkJoinWorkerThread()会执行registerWorker()注册工作者线程 if (fac != null && (wt = fac.newThread(this)) != null) { wt.start(); return true; } } catch (Throwable rex) { ex = rex; } deregisterWorker(wt, ex);//注销这个Worker线程 return false; }registerWorker
registerWorker 用来注册一个工作者线程。其核心逻辑是:
- 将工作者线程wt放入workQueues中,并初始化wt的其他相关属性。而要将wt放入workQueues,就必须先计算wt的槽位索引,如果对应的槽位已被占用(索引碰撞),就要重新计算索引(碰撞特别严重时需要对workQueues扩容),
deregisterWorker
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { WorkQueue[] ws; // remove index from array int idx = w.config & SMASK;//获取queue在workQueues中的索引 int rs = lockRunState(); if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) //workQueues中存在这个workerQueue, // 就将wt对应的workerQueue从workQueues中移除 ws[idx] = null; unlockRunState(rs, rs & ~RSLOCK); } long c; // decrement counts //将AC TC均减1(线程数少1),ctl的低32位不变 do {} while (!U.compareAndSwapLong (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c)))); if (w != null) { w.qlock = -1;//qlock<0 表示此workerqueue已terminated ensure set w.transferStealCount(this); //将此workerQueue的stealCount转移同步到外部类对应的属性上 w.cancelAll();//取消此workerQueue上所有(未完成)任务 cancel remaining tasks } //如果有需要,就用一个线程来替补这个即将终止的线程 for (;;) { // possibly replace WorkQueue[] ws; int m, sp; if (tryTerminate(false, false) || w == null || w.array == null || (runState & STOP) != 0 || (ws = workQueues) == null || (m = ws.length - 1) < 0) // already terminating //如果执行器已(正)终止或没有可执行的任务,就不需要替补 break; if ((sp = (int)(c = ctl)) != 0) { //还有空闲线程就唤醒一个线程来替补 wake up replacement if (tryRelease(c, ws[sp & m], AC_UNIT)) break; } //若池中没有空闲线程、线程数少(不足以支持配置的并行度)且这个线程是因异常而终止的, //就创建一个线程 else if (ex != null && (c & ADD_WORKER) != 0L) { tryAddWorker(c); // create replacement break; } else //其他情况不需要替代终止的线程 don't need replacement break; } //处理异常 if (ex == null) // help clean on way out ForkJoinTask.helpExpungeStaleExceptions(); else // rethrow ForkJoinTask.rethrow(ex); }2.2.3 Scanning for tasks
runWorker:此方法是worker线程执行任务的最上层循环,此方法直接被ForkJoinWorkerThread.run调用
runWorker的主要逻辑:先到其他队列中窃取任务,如能窃取到任务,就先执行窃取的任务,然后执行自己本地的任务。若无法窃取到阻塞等待被窃取的任务
runWorker
final void runWorker(WorkQueue w) { w.growArray(); // allocate queue int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift for (ForkJoinTask<?> t;;) { if ((t = scan(w, r)) != null) //随机窃取任务 w.runTask(t);//先执行窃取来的任务t,然后执行w.array中的本地任务 else if (!awaitWork(w, r))//阻塞等待被窃取的任务 break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift 异或计算,使随机数更随机 } }scan
scan()扫描并尝试窃取任务。
从一个随机的槽位开始扫描,然后此槽位对应的任务队列尾部窃取任务。若此槽位上没有任务或线程竞争激烈就再次随机方式移动槽位,反之以线性方式移动槽位,直到在两轮遍历中有相同的校验和就退出自旋(队尾索引连续为空时,对每个队列的队尾索引进行累加的结果称为校验和checkSum)返回null。 若扫描到的队列是活动的,将尾部任务返回,任务窃取成功。若扫描到的队列是非活动的,就尝试激活它,重新自旋。
private ForkJoinTask<?> scan(WorkQueue w, int r) { WorkQueue[] ws; int m; if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { int ss = w.scanState; // initially non-negative for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c; if ((q = ws[k]) != null) { //随机获取一个workerqueue //保证随机的q中含有任务 if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { // non-empty long i = (((a.length - 1) & b) << ASHIFT) + ABASE; //在队列的尾部获取一个任务(此时array看作”队列“,只能从尾部窃取任务,) if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && q.base == b) {//保证队列尾部索引未被其他线程修改 if (ss >= 0) { //w.owner线程是活动的(scanState是负数表示INACTIVE) //将q.array队列队尾的任务清空,并通知工作者线程 if (U.compareAndSwapObject(a, i, t, null)) { q.base = b + 1; if (n < -1) // signal others signalWork(ws, q); return t; } } //w.owner线程是INACTIVE,就激活线程w.owner线程 else if (oldSum == 0 && // try to activate w.scanState < 0) //唤醒w.owner线程,并将AC(活动线程数)加1 tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); } //队列q.array中没有任务或线程竞争激烈CAS更新失败, //或w.owner是INACTIVE,就重新获取scanState、随机移动到一个槽位 if (ss < 0) // refresh ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; //随机获取workerQueue的槽位(r是随机的) origin = k = r & m; // move and rescan oldSum = checkSum = 0;//重置为0 continue; } checkSum += b;//队列连续为空时,将队尾索引累加 } //q是空或q.array中没有任务时,无法窃取任务,重新计算索引 //"(k + 1) & m”让索引位置线性移动,上次索引加1后用作除数再取余 //“(k + 1) & m) == origin”表明经过索引若干次移动后跳回了最初始的索引位置 if ((k = (k + 1) & m) == origin) { // continue until stable //ss >= 0 表明w.owner线程之前是活动的(局部变量保存它之前的状态,后来其他线程可能已经将之修改,它可能不是最新的状态), // "(ss == (ss = w.scanState)"w.owner线程之前是inactivate时就重新获取scanState //"oldSum == (oldSum = checkSum)"本轮与上轮的“队列连续为空时队尾索引累加值"相等 //(每一轮的定义是:索引经若干次移动后第一次跳回最初始的索引位置) //若两者相等,可能就要出退出自旋,窃取任务失败 if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) {//更新上次的检验和oldSum //此w.owner是inactive,或此队列w已经terminated(终止),退出自旋,任务窃取失败 if (ss < 0 || w.qlock < 0) // already inactive break; //w.owner是活动的,就将其设为inactivate int ns = ss | INACTIVE; // try to inactivate long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); //保存前一个栈顶 w.stackPred = (int)c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); //尝试更新ctl,并将更新runSate if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns;//CAS更新成功,使用最新scanState else w.scanState = ss; //CAS更新失败,使用之前的scanState back out } checkSum = 0;//将下次校验和初始化置为零 } } } return null; }awaitWork
awaitWork 方法为任务的窃取而阻塞工作者w(若w应终止返回false)。
主要逻辑:若w应终止返回false ;若w状态是ACTIVE的就返回true。
而若w状态是INACTIVE则要进行进一步的处理。 如果当前执行器停滞(没有活动线程),就检查执行器是否终止,给当前线程设置给定的超时时间(若还有活动线程就不限时等待)。 在超时后,若w状态是INACTIVE的就返回true, 若ctl尚未被更改,就重置ctl,返回false。
private boolean awaitWork(WorkQueue w, int r) { if (w == null || w.qlock < 0) // w is terminating //w.qlock<0表示,此WorkQueue已经终止 return false; for (int pred = w.stackPred, spins = SPINS, ss;;) { if ((ss = w.scanState) >= 0) //此w.owner线程是活动的active,当前方法返回true。而 runWorker方法会自旋重新窃取任务 break; else if (spins > 0) { r ^= r << 6; r ^= r >>> 21; r ^= r << 7; if (r >= 0 && --spins == 0) { WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc; //若w前驱对应线程没有被阻塞或是active的(active状态能确定它不可能被阻塞),就继续自旋 if (pred != 0 && (ws = workQueues) != null && (j = pred & SMASK) < ws.length && (v = ws[j]) != null && // see if pred parking (v.parker == null || v.scanState >= 0)) spins = SPINS; // continue spinning } } else if (w.qlock < 0) // recheck after spins return false; //此WorkQueue已经终止 else if (!Thread.interrupted()) {//当前线程未中断(若中断了就清除中断标志) long c, prevctl, parkTime, deadline; int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK); if ((ac <= 0 && tryTerminate(false, false)) ||//活动线程不够且执行器(已经或正在)终止 (runState & STOP) != 0) //执行器正在终止 pool terminating return false; //没有空闲的工作者线程 if (ac <= 0 && ss == (int)c) { // is last waiter prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); int t = (short)(c >>> TC_SHIFT); // shrink excess spares //增加一个活动线程、并记录w前驱的索引(空闲线程过多,减少空闲线程) if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl)) return false; // else use timed wait //cas失败,将执行下面的超时等待 parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; } else//有其他的空闲线程,可能要不限时的休眠等待 prevctl = parkTime = deadline = 0L; // Thread wt = Thread.currentThread(); //wt线程记录阻塞对象 U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport w.parker = wt; if (w.scanState < 0 && ctl == c) // recheck before park //wt是INACTIVE就休眠等待 U.park(false, parkTime); //超时或被唤醒,将相关的parker清空 U.putOrderedObject(w, QPARKER, null); U.putObject(wt, PARKBLOCKER, null); if (w.scanState >= 0) break;//wt是ACTIVE的,返回true if (parkTime != 0L && ctl == c && deadline - System.nanoTime() <= 0L && U.compareAndSwapLong(this, CTL, c, prevctl)) //减少空闲线程成功,执行器将终止,返回false return false; // shrink pool } } return true; }2.2.4 Joining tasks
helpComplete 尝试窃取并执行指定的任务task。helpComplete 只能窃取CountedCompleter类型任务,CountedCompleter是ForJoinTask的子类。maxTasks参数表示从(除w之外)其他队列中窃取任务task的总数.
helpComplete 的主要逻辑: 先从w的本地队列中出队并执行(popCC方法)任务task,然后到其他队列中(队尾)窃取任务task执行。若在其他队列中窃取任务成功但还达到指定的窃取数目,就随机移动槽位,继续自旋;若在其他队列中没有窃取到任务,就线性移动槽位,直到两轮校验和相等就退出自旋。
final int helpComplete(WorkQueue w, CountedCompleter<?> task, int maxTasks) { WorkQueue[] ws; int s = 0, m; if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && task != null && w != null) { int mode = w.config; // for popCC int r = w.hint ^ w.top; // arbitrary seed for origin int origin = r & m; // first queue to scan //h=1,表示窃取任务并执行成功,h>1表示多线程竞争,h<0表示队列队尾上不存在此任务 int h = 1; //1:ran, >1:contended, <0:hash for (int k = origin, oldSum = 0, checkSum = 0;;) { CountedCompleter<?> p; WorkQueue q; if ((s = task.status) < 0) //任务完成,直接返回 break; if (h == 1 && (p = w.popCC(task, mode)) != null) { //上次窃取任务成功,并在w的栈顶中成功出队,执行此任务 p.doExec(); // run local task if (maxTasks != 0 && --maxTasks == 0) //完成了给定数目的任务窃取工作,退出自旋 break; //重置初始槽位、校验和() origin = k; // reset oldSum = checkSum = 0; } //上次任务窃取失败或w中不存在task,继续到其他队列中窃取任务 else { // poll other queues if ((q = ws[k]) == null)//队列不存在 h = 0; else if ((h = q.pollAndExecCC(task)) < 0)//队列的尾部没这个任务 checkSum += h;//校验和累加 if (h > 0) {//任务被成功窃取或存在多线程竞争需要重试 if (h == 1 && maxTasks != 0 && --maxTasks == 0) //本次成功窃取任务且已完成给定数目的任务窃取工作,退出自旋 break; //随机移动槽位 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift origin = k = r & m; // move and restart oldSum = checkSum = 0; } //队列q的尾部上没有指定任务,窃取失败,“(k + 1) & m”表示需要线性移动槽位 //两轮连续窃取任务失败的校验和相等,就退出自旋 else if ((k = (k + 1) & m) == origin) { //“k=origin”表示遍历一轮workerQueues if (oldSum == (oldSum = checkSum))//两轮校验和相等 break; checkSum = 0; } } } } return s; }2.2.5 external
externalPush
final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; int r = ThreadLocalRandom.getProbe();//探针值,用于计算q在workQueues中的索引槽位 int rs = runState; //运行状态 if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && //workQueues非空,且workQueues可放入任务(长度大于1) //与HashMap类似,m&r是用来确定数组的索引(取余,这里的r相当于HashMap中Node的hash属性), //SQMASK=Ob1111110,(SQMASK十进制为126,)它限制了槽位索引只能是0-126 //而SQMASK的二进制最低位为0,又相当于强制将"m & r'的最低位设为0(二进制最低位为零时表示偶数), //因此"m & r & SQMASK"的结果取0-126之间的偶数(共有64个偶数)。 (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { //锁定q,这里CAS更新成功后q.qlock为1,其他线程就不能CAS更新q.qlock了 ForkJoinTask<?>[] a; int am, n, s; if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) {//q.array还可以放入新任务 //am二进制位全是1,所以am&s ==s int j = ((am & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task);//将task放入到成员变量ForkJoinTask类型数组array中 U.putOrderedInt(q, QTOP, s + 1);//更新下次入队位置的索引 U.putIntVolatile(q, QLOCK, 0);//无条件更新q.qlock,解除对q的锁定 if (n <= 1) //队列中最多只有一个任务了,可以唤醒一个线程或创建一个新线程来执行任务 signalWork(ws, q); return; } U.compareAndSwapInt(q, QLOCK, 1, 0);//q.array无法容纳新任务时,也要解除对q的锁定,因数之前CAS成功将q.qlock更新为1 } //完整版本的externalPush // workQueues 或workQueues[m & r & SQMASK]是空的,需要初始化相关属性,并提交任务 externalSubmit(task); }externalSubmit
externalSubmit 是完整版的externalPush,核心逻辑是:
- 若执行器还未开始、workQueues未被初始化时,将执行器状态更新、并初始化相关属性。
- 若workQueues相应槽位的queue不为空,就将任务添加到这个queue中。
- 若workQueues相应槽位的queue为空,创建一个新的共享队列放入workQueues中。
externalInterruptibleAwaitDone
private int externalInterruptibleAwaitDone() throws InterruptedException { int s; if (Thread.interrupted()) throw new InterruptedException(); if ((s = status) >= 0 && (s = ((this instanceof CountedCompleter) ? ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>) this, 0) : ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0)) >= 0) { while ((s = status) >= 0) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) wait(0L); else notifyAll(); } } } } return s; } //翻译后的方法 private int externalInterruptibleAwaitDone1() throws InterruptedException { int s; if (Thread.interrupted()) throw new InterruptedException(); if((s = status) >= 0 ){ if(this instanceof CountedCompleter){ s= ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>) this, 0); }else{ s=ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0; } } if(s>=0){ while ((s = status) >= 0) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) wait(0L); else notifyAll(); } } } } return s; }2.3 WorkerQueue
工作人员队列处在奇数索引处。共享(提交)队列的索引为偶数,最多64个插槽,以限制增长。
“ ctl”字段原子地维护活动和总工作人员计数,以及用于放置等待线程的队列,以便可以定位它们以发出信号。主动计数也起着静态指标的作用,因此当工作人员认为没有更多要执行的任务时,主动计数就会减少。 “队列”实际上是Treiber堆栈的一种形式。堆栈是按最近使用的顺序激活线程的理想选择。这改善了性能和局部性,克服了易于争用和无法释放工作程序(除非其位于堆栈的最顶部)的缺点。当工人找不到工作时,他们推入闲置的工人堆栈(由ctl的较低的32bit子字段表示)后,我们将工人停放。最高堆栈状态保存工作程序的“ scanState”字段的值:其索引和状态,以及一个版本计数器,该计数器添加。
WorkQueue也以类似的方式用于提交给池的任务。我们不能将这些任务混合在工人使用的相同队列中。相反,我们使用哈希形式将提交队列与提交线程随机关联。 ThreadLocalRandom探针值用作选择现有队列的哈希码,并且在与其他提交者竞争时可以随机重新放置。从本质上讲,提交者的行为类似于工作人员,只是他们只能执行提交的本地任务(或者在CountedCompleters的情况下,其他任务具有相同的根任务)。在共享模式下插入任务需要一个锁(主要是在调整大小的情况下进行保护),但是我们仅使用简单的自旋锁(使用字段qlock),因为遇到繁忙队列的提交者会继续尝试或创建其他队列-它们阻止了仅在创建和注册新队列时。此外,“ qlock”在关闭时会饱和到可解锁的值(-1)。在成功的情况下,仍可以通过更便宜的有序写入“ qlock”来执行解锁,但是在不成功的情况下使用CAS。
工作者和池都使用字段scanState来管理和跟踪工作者是不活动的(可能是阻塞的,等待信号),还是对任务进行扫描(当两个都不持有它正在忙于运行任务时)。禁用工作程序后,将设置其scanState字段,并阻止其执行任务,即使它必须对其进行扫描一次也可以避免排队。请注意,scanState更新延迟队列CAS释放,因此使用时需要注意。排队时,scanState的低16位必须保留其池索引。因此,我们在初始化时将索引放置在此处(请参见registerWorker),否则将其保留在该索引中或在必要时将其还原。
2.3.1 WorkerQueue的字段
@sun.misc.Contended static final class WorkQueue { //队列的初始容量 static final int INITIAL_QUEUE_CAPACITY = 1 << 13; // 64M 队列的最大容量 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M // Instance fields volatile int scanState; // versioned, <0: inactive; odd:scanning int stackPred; // pool stack (ctl) predecessor int nsteals; // number of steals int hint; // randomization and stealer index hint int config; // pool index and mode volatile int qlock; // 1: locked, < 0: terminate; else 0 volatile int base; // index of next slot for poll int top; // index of next slot for push ForkJoinTask<?>[] array; // the elements (initially unallocated) final ForkJoinPool pool; // the containing pool (may be null) final ForkJoinWorkerThread owner; // owning thread or null if shared volatile Thread parker; // == owner during call to park; else null volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer }-
scanState:它可以看作是乐观锁的版本号,另外它还有此其他功能,它为负数时,表示工作者线程非活动,它为奇数是表示,正在扫描(准备窃取)任务,它为偶数是表示正在执行任务。
-
stackPred:表示在线程池栈当前工作线程的前驱线程的索引。在唤醒线程时常用到此属性。
-
nsteals:表示owner线程窃取的任务数。
-
hint:任务窃取时的随机定位种子。
-
config:低16位表示,当前WorkerQueue对象在外部类的数组属性workQueues中的索引(下标) 。高16位表示当前WorkerQueue对象的模式。对于内部任务,若构造方法配置为异步模式就将WorkQueue当作先进先出的队列,反之将WorkQueue当作后进先出的栈。对于外部任务,将WorkQueue视为共享队列。
-
qlock:初始值为0,”=1“时表示当前WorkerQueue对象被锁住,” < 0“时 表示当前WorkerQueue对象已终止,队列中的其他未完成任务将不再被执行。
-
base:表示下次对任务数组array进行poll出队操作(窃取任务)的槽位索引(队尾)。
-
top:表示下次任务数组array进行push入栈操作(添加任务)的槽位索引(栈顶)。
-
array:非学重要的属性,这用是保存任务的数组(容器)。
-
pool:与之关联的ForkJoinPool执行器,它可能为空。若为空,就使用静态变量common作为执行器。
-
owner:当前队列对应的工作者线程,它一般不为空。若从外部提交任务时,当前WorkerQueue对象表示共享队列,owner为空。
-
parker:阻塞的线程。在被阻塞的时候,它等于owner,其他时候它为空。
-
currentJoin:表示当前正在join的任务,主要在awaitJoin方法使用。
-
currentSteal:表示当前被窃取的任务,主要在helpStealer方法中使用。
config:线程池的索引 , 第16位是1,表示为先进先出的队列,第16位为1,表示后进先出的栈。
获取线程池索引,实际上是取config的二进制的2-16位。
final int getPoolIndex() { //先取低16位,但最低位是表示mode,将奇偶标志去掉。 return (config & 0xffff) >>> 1; // ignore odd/even tag bit }获取队列中的任务数。
final int queueSize() { //栈底-栈顶,正常情况下为负。 int n = base - top; // non-owner callers must read base first return (n >= 0) ? 0 : -n; // ignore transient negative }runTask(ForkJoinTask) 先执行指定的从外部窃取的任务,然后执行当前队列中的所有本地任务。
final void runTask(ForkJoinTask<?> task) { if (task != null) { //scanState&=0b11111111_11111111_11111111_11111110 scanState &= ~SCANNING; // mark as busy 将scanState的二进制最低位强制置为0,位运算后scanState为偶数,表示工作者线程正执行任务 (currentSteal = task).doExec(); //更新‘窃取任务’属性currentSteal U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC ‘窃取任务’完成后,将currentSteal清空 execLocalTasks(); ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow 正数在超出最大可表示范围后,将变成负数, //将nsteals同步到外部类ForkJoinPool的stealCounter属性上,并将nsteals重置为0. transferStealCount(pool); //将scanState的二进制最低位强制置为,位运算后scanState为奇数,表示工作者线程处于空闲状态 scanState |= SCANNING; //scanState|=0b00000000_00000000_00000000_00000001 , if (thread != null) //thread若是InnocuousForkJoinWorkerThread类型,就清空父类Thread的threadLocals 、inheritableThreadLocals这两个属性。 //thread若是ForkJoinWorkerThread,则啥也不做 thread.afterTopLevelExec(); } }tryRemoveAndExec移除并执行指定的任务。只有当队列为空或给定任务完成状态未知时返回true,在队列中没有此任务和此任务已经完成返回false。
final boolean tryRemoveAndExec(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; int m, s, b, n; if ((a = array) != null && (m = a.length - 1) >= 0 && task != null) { //任务队列已经初始化 且 给定的任务也非空 while ((n = (s = top) - (b = base)) > 0) { // 任务队列非空 for (ForkJoinTask<?> t;;) { // traverse from s to b long j = ((--s & m) << ASHIFT) + ABASE; if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)//当前遍历到的任务为null //若栈顶为null,返回true return s + 1 == top; // shorter than expected else if (t == task) {//在栈(队列)中找到了这个任务 boolean removed = false; if (s + 1 == top) {// pop //若待移除任务在栈顶,将队列中的对应任务引用清空,并更新栈顶 if (U.compareAndSwapObject(a, j, task, null)) { U.putOrderedInt(this, QTOP, s); removed = true; } } // //若待移除任务在栈底,则用EmptyTask代替原任务 else if (base == b) // replace with proxy removed = U.compareAndSwapObject( a, j, task, new EmptyTask()); if (removed)//移除后,执行此任务 task.doExec(); break; } //当前遍历到的任务不是给定的任务 else if (t.status < 0 && s + 1 == top) {//当前遍历到的任务已完成且处于栈顶,将此任务从队列中移除,并更新栈顶 if (U.compareAndSwapObject(a, j, t, null)) U.putOrderedInt(this, QTOP, s); break; // was cancelled } if (--n == 0) //遍历过程中队列变为空,返回false return false; } if (task.status < 0)//指定任务已经完成返回false return false; } } return true; }参考: https://blog.csdn.net/tyrroo/article/details/81483608
https://blog.csdn.net/weixin_30696755/article/details/114140368
https://blog.csdn.net/u010841296/article/details/83963637
https://blog.csdn.net/Xiaowu_First/article/details/122407019
https://www.cnblogs.com/juniorMa/articles/14234472.html
https://www.cnblogs.com/juniorMa/articles/14241296.html
https://www.cnblogs.com/juniorMa/articles/14241534.html
https://www.cnblogs.com/juniorMa/articles/14242659.html
https://blog.csdn.net/weixin_42039228/article/details/123206215