当前位置 : 主页 > 编程语言 > java >

Java并发编程——CompletableFuture源码解析

来源:互联网 收集:自由互联 发布时间:2023-02-04
前言 JDK8 为我们带来了 CompletableFuture 这个有意思的新类,它提供比 Future 更灵活更强大的回调功能,借助 CompletableFuture 我们可以更方便的编排异步任务。   由于 CompletableFuture 默认的线

前言

JDK8 为我们带来了 CompletableFuture 这个有意思的新类,它提供比 Future 更灵活更强大的回调功能,借助 CompletableFuture 我们可以更方便的编排异步任务。

 

由于 CompletableFuture 默认的线程池是 ForkJoinPool,在讲 CompletableFuture 之前觉得有必要先简单介绍一下 ForkJoinPool。

一、ForkJoinPool 工作原理

ForkJoin 框架,另一种风格的线程池(相比于 ThreadPoolExecutor),采用分治算法,以及工作窃取策略,极大地提高了并行性。对于那种大任务分割小任务的(分治)又或者并行计算场景尤其有用。

1.1 ForkJoinPool

线程池,ForkJoinPool 类中有一个commonPool,默认并发数为逻辑处理器个数 - 1;

ForkJoinPool commonPool = ForkJoinPool.commonPool();//makeCommonPool //看源码发现 int parallelism = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"); // 如果有自定义,那么取自定义的作为 parallelism // 如果没有 逻辑处理器个数-1 parallelism = Runtime.getRuntime().availableProcessors() - 1

当然了,如果你不想使用 commonPool,你完全可以直接new 一个

ForkJoinPool fjp = new ForkJoinPool(4);// 最大并发数4

其实我当时有个疑问,明明可以直接 new 一个 ForkJoinPool,还可以很方便的指定parallelism(我写的ForkJoinTest就是通过这种方式),为什么 ForkJoinPool 中还定义一个static的commonPool?

 

commonPool一般用于 Java8 中的并发流计算中或者 CompletableFuture 没有指定线程池时使用的一个commonPool。

1.2 ForkJoinTask

ForkJoinPool 与 ForkJoinTask 之间的关系,可以类比 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。它是一个抽象类。主要有两个实现RecursiveAction(有返回值) 和 RecursiveTask(无返回值)

1.3 ForkJoinWorkerThread

运行 ForkJoinTask 任务的工作线程(worker),最大并发数不会超过parallelism。

1.4 WorkQueue

任务队列,每个worker对应一个queue,这是和 ThreadPoolExecutor 最大不同的地方之一。

1.5 WorkQueue[]

ForkJoinPool 中的任务分为两种,一种是本地提交的任务Submission task,通过execute()、submit()、invoke()等方法提交的任务;比如 CompletableFuture 中不提供线程池时,提交的都是Submission task。

 

另外一种是工作线程fork出的子任务Worker task。

 

两种任务都会存放在WorkQueue数组中,Submission task存放在WorkQueue数组的偶数索引位置,Worker task存放在奇数索引位置。工作线程只会分配在奇数索引的工作队列。

1.6 工作窃取机制

工作窃取是指当某个线程的任务队列中没有可执行任务的时候,从其他线程的任务队列中窃取任务来执行,以充分利用工作线程的计算能力,减少线程由于获取不到任务而造成的空闲浪费。在 ForkJoinPool 中,工作任务的队列都采用双端队列容器。我们知道,在通常使用队列的过程中,我们都在队尾插入,而在队头消费以实现 FIFO。而为了实现工作窃取。一般我们会改成工作线程在工作队列上 LIFO,而窃取其他线程的任务的时候,从队列头部取获取。

工作线程worker1、worker2以及worker3都从WorkQueue的尾部popping获取task,而任务也从尾部Pushing,当worker3队列中没有任务的时候,就会从其他线程的队列中窃取stealing,这样就使得 worker3 不会由于没有任务而空闲。这就是工作窃取算法的基本原理。 可以想象,要是不使用工作窃取算法,那么我们在不断 fork 的过程中,可能某些 worker 就会一直处于 join 的等待中。

 

因为这种机制,它能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。

  • 1、为什么说 ForkJoinPool 并发访问一个 IO 计算可能会拖垮整个系统? 这主要说的是 ForkJoinPool 中的 commonPool,commonPool 是整个·系统共享的。比如你在 Stream 并行流中并发访问一个 IO 计算,某一时刻会导致commonPool 中大部分线程甚至所有线程都阻碍在这里。这可能就会造成其他程序使用到 commonPool 的程序线程饥饿,比如 CompletableFuture 中没有指定线程池时。

  • 2、为什么说 cpu 密集型的任务使用 ForkJoinPool 性能更好? 网上说主要有工作窃取机制。工作窃取的目的不就是充分利用 cpu,那普通线程不也可以吗,把核心线程调成和 ForkJoinPool 默认线程。

CompletableFuture 使用总结

关于 CompletableFuture(异步计算)的使用,比如订单列表的并发请求远程服务,服务中异步处理一些任务,又或者异步导出,并发处理数据等等。

按功能分类

  • xxx():表示该方法将继续在已有的线程中执行;
  • xxxAsync():表示将异步在线程池中执行。
  • 异步执行方法默认一个参数的话任务是在 ForkJoinPool.commonPool() 线程池中执行的(带看源码),带executor 参数的使用 executor线程池异步执行。

按逻辑和组织方式来分的话(completableFuture 中大约有50个来方法)

  • 一种是 then 的逻辑,即前一个计算完成的时候调度后一个计算
  • 一种是 both 的逻辑,即等待两个计算都完成之后执行下一个计算,只要能组合一个和另一个,我们就可以无限复用这个 +1 的逻辑组合任意多的计算。(如果任务多,可以考虑可用allOf)
  • 另一种是 either 的逻辑,即等待两个计算的其中一个完成之后执行下一个计算。注意这样的计算可以说是非确定性的。因为被组合的两个计算中先触发下一个计算执行的那个会被作为前一个计算,而这两个前置的计算到底哪一个先完成是不可预知的(anyOf)

从依赖关系和出入参数类型区别,基本分为三类

  • apply 字样的方式意味着组合方式是 Function,即接受前一个计算的结果,应用函数之后返回一个新的结果。
  • accept 字样的方式意味着组合方式是 Consumer,即接受前一个计算的结果,执行消费后不返回有意义的值。
  • run 字样的方式意味着组合方式是 Runnable,即忽略前一个计算的结果,仅等待它完成后执行动作。

其中出入参数主要有 Java8 Function,Consumer 或 Runnable三中函数型接口,每一种都决定了是怎么样一种依赖关系

二、数据结构

2.1、CompletableFuture

CompletableFuture 实现了 Future 接口和 CompletionStage,Future 不必多说,而 CompletionStage 定义了各种任务编排的 API:

CompletableFuture<T> implements Future<T>, CompletionStage<T> { volatile Object result; // Either the result or boxed AltResult volatile Completion stack; // Top of Treiber stack of dependent actions }

CompletableFuture 的数据结构包括用于记录结果的 result,以及用于持有任务以及任务间依赖关系的 Completion 类型的成员变量 stack。

 

如果阅读过 spring 注解相关功能的源码的同学,对于 CompletableFuture 和 Completion 应该会有一种 TypeMappedAnnotation 和 AnnotationTypeMapping 的既视感,实际上他们两者之间的关系确实也非常相似。

2.2、Completion

数据结构

Completion 是 CompletableFuture 中的一个内部类,我们可以简单的认为它就是我们一般所说的“操作”。

abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { volatile Completion next; }

它通过 next 指针在 CompletableFuture 中形成一个链表结构。

依赖关系

它还有两个抽象的实现类 UniCompletion 和 BiCompletion:

abstract static class UniCompletion<T,V> extends Completion { Executor executor; // executor to use (null if none) CompletableFuture<V> dep; // the dependent to complete CompletableFuture<T> src; // source for action } abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> { CompletableFuture<U> snd; // second source for action }

其中 executor 表示该操作的执行者,而 src 和 snd 两个指针表示要执行的操作对应的 CompletableFuture 实例,而 dep 则表示要执行的操作依赖的前置操作对应的 CompletableFuture 实例。多个 Completion 彼此之间通过这些指针维护彼此的依赖关系。

实现类

在 CompletableFuture,我们会看到很多格式为 UniXXX 或者 BiXXX 的内部类,它们大多数都基于上述两抽象类实现,分别对应不同的操作。我们以 UniApply 为例:

static final class UniApply<T,V> extends UniCompletion<T,V> { Function<? super T,? extends V> fn; }

其本质上就是一个额外挂载了 Function 接口的 UniCompletion,同理,XXXAccept 就是挂载了 Consumer 的 Completion,而 XXXRun 就是挂载的 Runnable 接口的 Completion。

三、源码分析

样例

后面介绍的源码都会以下面的用例为切入点,循着调用轨迹理解源码。如果任务很耗时,记得传Executor, 或者方法末尾加上future.get(); 因为CompletableFuture默认使用ForkJoinPool, 而ForkJoinPool里面的线程都是daemon线程,主线程跑完了,虚拟机也就over了。

public void whenComplete() { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); future.whenComplete((l, r) -> System.out.println(l)); } public void thenApply() { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); future.thenApply(i -> -i); } public void thenAccept() { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); future.thenAccept(System.out::println); } public void thenRun() { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); future.thenRun(() -> System.out.println("Done")); } public void thenAcceptBoth() { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200); future.thenAcceptBoth(other, (x, y) -> System.out.println(x + y)); } public void acceptEither() { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); CompletableFuture<Integer> other = CompletableFuture.supplyAsync(() -> 200); future.acceptEither(other, System.out::println); } public void allOf() { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200); CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300); CompletableFuture.allOf(future, second, third); } public void anyOf() throws InterruptedException, ExecutionException { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100); CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 200); CompletableFuture<Integer> third = CompletableFuture.supplyAsync(() -> 300); CompletableFuture.anyOf(future, second, third); }

3.1 supplyAsync(Supplier<U> supplier)

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { // asyncPool, ForkJoinPool.commonPool()或者ThreadPerTaskExecutor(实现了Executor接口,里面的内容是{new Thread(r).start();}) return asyncSupplyStage(asyncPool, supplier); }

3.2 asyncSupplyStage(Executor e, Supplier<U> f)

static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) { if (f == null) throw new NullPointerException(); // 构建一个新的CompletableFuture, 以此构建AsyncSupply作为Executor的执行参数 CompletableFuture<U> d = new CompletableFuture<U>(); // AsyncSupply继承了ForkJoinTask, 实现了Runnable, AsynchronousCompletionTask接口 e.execute(new AsyncSupply<U>(d, f)); // 返回d,立返 return d; }

3.3 AsyncSupply

// CompletableFuture的静态内部类,作为一个ForkJoinTask static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { // AsyncSupply作为一个依赖Task,dep作为这个Task的Future CompletableFuture<T> dep; // fn作为这个Task的具体执行逻辑,函数式编程 Supplier<T> fn; AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { this.dep = dep; this.fn = fn; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { run(); return true; } public void run() { CompletableFuture<T> d; Supplier<T> f; if ((d = dep) != null && (f = fn) != null) { // 非空判断 dep = null; fn = null; // 查看任务是否结束,如果已经结束(result != null),直接调用postComplete()方法 if (d.result == null) { try { // 等待任务结束,并设置结果 d.completeValue(f.get()); } catch (Throwable ex) { d.completeThrowable(ex); // 异常 } } // 任务结束后,会执行所有依赖此任务的其他任务,这些任务以一个无锁并发栈的形式存在 d.postComplete(); } } }

3.4 postComplete()

final void postComplete() { // 当前CompletableFuture CompletableFuture<?> f = this; // 无锁并发栈,(Completion next), 保存的是依靠当前的CompletableFuture一串任务,完成即触发(回调) Completion h; // 当f的stack为空时,使f重新指向当前的CompletableFuture,继续后面的结点 while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; Completion t; // 从头遍历stack,并更新头元素 if (f.casStack(h, t = h.next)) { if (t != null) { // 如果f不是当前CompletableFuture,则将它的头结点压入到当前CompletableFuture的stack中,使树形结构变成链表结构,避免递归层次过深 if (f != this) { pushStack(h); // 继续下一个结点,批量压入到当前栈中 continue; } // 如果是当前CompletableFuture, 解除头节点与栈的联系 h.next = null; } // 调用头节点的tryFire()方法,该方法可看作Completion的钩子方法,执行完逻辑后,会向后传播的 f = (d = h.tryFire(NESTED)) == null ? this : d; } } }

示意图

每个CompletableFuture持有一个Completion栈stack, 每个Completion持有一个CompletableFuture -> dep, 如此递归循环下去,是层次很深的树形结构,所以想办法将其变成链表结构。

首先取出头结点,下图中灰色Completion结点,它会返回一个CompletableFuture, 同样也拥有一个stack,策略是遍历这个CompletableFuture的stack的每个结点,依次压入到当前CompletableFuture的stack中,关系如下箭头所示,灰色结点指的是处理过的结点。

第一个Completion结点返回的CompletableFuture, 将拥有的stack里面的所有结点都压入了当前CompletableFuture的stack里面。

后续的Completion结点返回的CompletableFuture, 将拥有的stack里面的所有结点都压入了当前CompletableFuture的stack里面,重新构成了一个链表结构,后续也按照前面的逻辑操作,如此反复,便会遍历完所有的CompletableFuture, 这些CompletableFuture(叶子结点)的stack为空,也是结束条件。

postComplete()最后调用的是Completion#tryFire()方法,先看下Completion的数据结构

Completion

abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { volatile Completion next; // 无锁并发栈 /** * 钩子方法,有三种模式,postComplete()方法里面使用的是NESTED模式,避免过深的递归调用 SYNC, ASYNC, or NESTED */ abstract CompletableFuture<?> tryFire(int mode); // run()和exec()都调用了这个钩子方法 /** cleanStack()方法里有用到 */ abstract boolean isLive(); public final void run() { tryFire(ASYNC); } public final boolean exec() { tryFire(ASYNC); return true; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } }

static final int SYNC = 0; 同步 static final int ASYNC = 1; 异步 static final int NESTED = -1; 嵌套

 

继承了ForkJoinTask, 实现了Runnable, AsynchronousCompletionTask接口,它有诸多子类,如下图

后面的方法都对应着不同的子类。

先看一个子类UniCompletion

abstract static class UniCompletion<T,V> extends Completion { Executor executor; // 执行器 CompletableFuture<V> dep; // 依赖的任务 CompletableFuture<T> src; // 被依赖的任务 UniCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src) { this.executor = executor; this.dep = dep; this.src = src; } // 如果当前任务可以被执行,返回true,否则,返回false; 保证任务只被执行一次 final boolean claim() { Executor e = executor; if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { if (e == null) return true; executor = null; // 设置为不可用 e.execute(this); } return false; } final boolean isLive() { return dep != null; } }

claim()方法保证任务只被执行一次。

whenComplete

whenComplete()/whenCompleteAsync()

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(null, action); } public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(asyncPool, action); }

xxx和xxxAsync方法的区别是,有没有asyncPool作为入参,有的话,任务直接入参,不检查任务是否完成。uniWhenCompleteStage方法有说明。

uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f)

private CompletableFuture<T> uniWhenCompleteStage(Executor e, BiConsumer<? super T, ? super Throwable> f) { if (f == null) throw new NullPointerException(); // 构建future CompletableFuture<T> d = new CompletableFuture<T>(); if (e != null || !d.uniWhenComplete(this, f, null)) { // 如果线程池不为空,直接构建任务入栈,并调用tryFire()方法;否则,调用uniWhenComplete()方法,检查依赖的那个任务是否完成,没有完成返回false, // 完成了返回true, 以及后续一些操作。 UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); // UniWhenComplete继承了UniCompletion push(c); c.tryFire(SYNC); // 先调一下钩子方法,检查一下任务是否结束 } return d; }

uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c)

final boolean uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T, ? super Throwable> f, UniWhenComplete<T> c) { Object r; T t; Throwable x = null; if (a == null || (r = a.result) == null || f == null) // 被依赖的任务还未完成 return false; if (result == null) { // 被依赖的任务完成了 try { if (c != null && !c.claim()) // 判断任务是否能被执行 return false; if (r instanceof AltResult) { // 判断异常,AltResult类型很简单,里面只有一个属性Throwable ex; x = ((AltResult) r).ex; t = null; } else { @SuppressWarnings("unchecked") T tr = (T) r; // 正常的结果 t = tr; } f.accept(t, x); // 执行任务 if (x == null) { internalComplete(r); // 任务的结果设置为被依赖任务的结果 return true; } } catch (Throwable ex) { if (x == null) x = ex; // 记录异常 } completeThrowable(x, r); // 设置异常和结果 } return true; }

push()

final void push(UniCompletion<?, ?> c) { if (c != null) { while (result == null && !tryPushStack(c)) lazySetNext(c, null); // 失败重置c的next域 } } final boolean tryPushStack(Completion c) { Completion h = stack; lazySetNext(c, h); return UNSAFE.compareAndSwapObject(this, STACK, h, c); } static void lazySetNext(Completion c, Completion next) { UNSAFE.putOrderedObject(c, NEXT, next); }

UniWhenComplete

static final class UniWhenComplete<T> extends UniCompletion<T, T> { BiConsumer<? super T, ? super Throwable> fn; UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, BiConsumer<? super T, ? super Throwable> fn) { super(executor, dep, src); this.fn = fn; } final CompletableFuture<T> tryFire(int mode) { // 钩子方法 CompletableFuture<T> d; // 依赖的任务 CompletableFuture<T> a; // 被依赖的任务 if ((d = dep) == null || !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) // 如果是异步模式(mode = 1),就不判断任务是否结束 return null; // dep为空,说明已经调用过了 dep = null; src = null; fn = null; return d.postFire(a, mode); // 钩子方法之后的处理 } }

postFire(CompletableFuture<?> a, int mode)

final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { if (a != null && a.stack != null) { // 被依赖的任务存在,且stack不为空,先处理它 if (mode < 0 || a.result == null) // 如果是嵌套模式(mode = -1), 或者任务的结果为空,直接清空栈 a.cleanStack(); else a.postComplete(); // 否则,调用postComplete()方法 } if (result != null && stack != null) { // 再处理当前任务 if (mode < 0) // 嵌套模式,直接返回自身(树 -> 链表,避免过深的递归调用) return this; else postComplete(); // 调用postComplete()方法 } return null; }

cleanStack()

final void cleanStack() { // 过滤掉已经死掉的结点(Not isLive) for (Completion p = null, q = stack; q != null;) { // q指针从头节点开始,向右移动,s一直执行q的下一个结点,p要么为空,要么指向遍历过的最后一个活着的结点,一旦发现q死掉了,就断开q, 连接p, s Completion s = q.next; if (q.isLive()) { // 还活着,p指向遍历过的最后一个结点,q向右移动 p = q; q = s; } else if (p == null) { // 说明第一个结点就是死掉的,cas stack, q指向stack casStack(q, s); q = stack; } else { // 否则的话,连接p, s p.next = s; if (p.isLive()) // 再次判断p结点是否还或者(在这期间是否有别的线程改动了) q = s; // 还活着,q继续向右移动 else { p = null; // 过期的值,从新开始 q = stack; } } } }

如下图

  • 第1个结点是无效结点,更新stack,更新指针
  • 第2个结点是有效结点,更新指针
  • 第3个结点是无效结点,更新指针
  • 第4个结点是有效结点,更新指针

thenApply

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) { return uniApplyStage(null, fn); } public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) { return uniApplyStage(asyncPool, fn); } private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T, ? extends V> f) { if (f == null) throw new NullPointerException(); CompletableFuture<V> d = new CompletableFuture<V>(); if (e != null || !d.uniApply(this, f, null)) { UniApply<T, V> c = new UniApply<T, V>(e, d, this, f); push(c); c.tryFire(SYNC); } return d; } final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S, ? extends T> f, UniApply<S, T> c) { Object r; Throwable x; if (a == null || (r = a.result) == null || f == null) return false; tryComplete: if (result == null) { if (r instanceof AltResult) { if ((x = ((AltResult) r).ex) != null) { completeThrowable(x, r); // 有异常,直接跳出 break tryComplete; } r = null; } try { if (c != null && !c.claim()) return false; @SuppressWarnings("unchecked") S s = (S) r; completeValue(f.apply(s)); } catch (Throwable ex) { completeThrowable(ex); } } return true; } static final class UniApply<T, V> extends UniCompletion<T, V> { Function<? super T, ? extends V> fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T, ? extends V> fn) { super(executor, dep, src); this.fn = fn; } final CompletableFuture<V> tryFire(int mode) { CompletableFuture<V> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this)) return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } }

一样的套路,thenApply/thenApplyAsync -> uniApplyStage -> uniApply -> tryFire -> postFire

thenAccept

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { return uniAcceptStage(null, action); } public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { return uniAcceptStage(asyncPool, action); } private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) { if (f == null) throw new NullPointerException(); CompletableFuture<Void> d = new CompletableFuture<Void>(); if (e != null || !d.uniAccept(this, f, null)) { UniAccept<T> c = new UniAccept<T>(e, d, this, f); push(c); c.tryFire(SYNC); } return d; } final <S> boolean uniAccept(CompletableFuture<S> a, Consumer<? super S> f, UniAccept<S> c) { Object r; Throwable x; if (a == null || (r = a.result) == null || f == null) return false; tryComplete: if (result == null) { if (r instanceof AltResult) { if ((x = ((AltResult) r).ex) != null) { completeThrowable(x, r); // 有异常直接跳出 break tryComplete; } r = null; } try { if (c != null && !c.claim()) return false; @SuppressWarnings("unchecked") S s = (S) r; f.accept(s); completeNull(); } catch (Throwable ex) { completeThrowable(ex); } } return true; } static final class UniAccept<T> extends UniCompletion<T, Void> { Consumer<? super T> fn; UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn) { super(executor, dep, src); this.fn = fn; } final CompletableFuture<Void> tryFire(int mode) { CompletableFuture<Void> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniAccept(a = src, fn, mode > 0 ? null : this)) return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } }

thenAccept/thenAcceptAsync -> uniAcceptStage -> uniAccept -> tryFire -> postFire

thenRun

public CompletableFuture<Void> thenRun(Runnable action) { return uniRunStage(null, action); } public CompletableFuture<Void> thenRunAsync(Runnable action) { return uniRunStage(asyncPool, action); } private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { if (f == null) throw new NullPointerException(); CompletableFuture<Void> d = new CompletableFuture<Void>(); if (e != null || !d.uniRun(this, f, null)) { UniRun<T> c = new UniRun<T>(e, d, this, f); push(c); c.tryFire(SYNC); } return d; } final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) { Object r; Throwable x; if (a == null || (r = a.result) == null || f == null) return false; if (result == null) { if (r instanceof AltResult && (x = ((AltResult) r).ex) != null) completeThrowable(x, r); else try { if (c != null && !c.claim()) return false; f.run(); completeNull(); } catch (Throwable ex) { completeThrowable(ex); } } return true; } static final class UniRun<T> extends UniCompletion<T, Void> { Runnable fn; UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn) { super(executor, dep, src); this.fn = fn; } final CompletableFuture<Void> tryFire(int mode) { CompletableFuture<Void> d; CompletableFuture<T> a; if ((d = dep) == null || !d.uniRun(a = src, fn, mode > 0 ? null : this)) return null; dep = null; src = null; fn = null; return d.postFire(a, mode); } }

thenRun/thenRunAsync -> uniRunStage -> uniRun -> tryFire -> postFire

thenAcceptBoth

thenAcceptBoth

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) { return biAcceptStage(null, other, action); } public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) { return biAcceptStage(asyncPool, other, action); }

biAcceptStage

private <U> CompletableFuture<Void> biAcceptStage(Executor e, CompletionStage<U> o, BiConsumer<? super T, ? super U> f) { CompletableFuture<U> b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); CompletableFuture<Void> d = new CompletableFuture<Void>(); if (e != null || !d.biAccept(this, b, f, null)) { BiAccept<T, U> c = new BiAccept<T, U>(e, d, this, b, f); bipush(b, c); c.tryFire(SYNC); } return d; }

bipush

final void bipush(CompletableFuture<?> b, BiCompletion<?, ?, ?> c) { if (c != null) { Object r; while ((r = result) == null && !tryPushStack(c)) // a的result还没准备好,c压入栈 lazySetNext(c, null); // 失败重置c的next域 if (b != null && b != this && b.result == null) { // b的result也还没准备好 Completion q = (r != null) ? c : new CoCompletion(c); // 根据a的result决定是否构建CoCompletion, 如果a未结束,则构建一个CoCompletion, CoCompletion最后调用的也是BiCompletion的tryFire while (b.result == null && !b.tryPushStack(q)) // 将q压入栈 lazySetNext(q, null); // 失败重置q的next域 } } }

CoCompletion

static final class CoCompletion extends Completion { BiCompletion<?, ?, ?> base; CoCompletion(BiCompletion<?, ?, ?> base) { this.base = base; } final CompletableFuture<?> tryFire(int mode) { BiCompletion<?, ?, ?> c; CompletableFuture<?> d; if ((c = base) == null || (d = c.tryFire(mode)) == null) // 调用的还是BiCompletion的tryFire方法 return null; base = null; return d; } final boolean isLive() { BiCompletion<?, ?, ?> c; return (c = base) != null && c.dep != null; } }

biAccept

final <R, S> boolean biAccept(CompletableFuture<R> a, CompletableFuture<S> b, BiConsumer<? super R, ? super S> f, BiAccept<R, S> c) { Object r, s; Throwable x; if (a == null || (r = a.result) == null || b == null || (s = b.result) == null || f == null) return false; // a和b都完成了,才会往下走 tryComplete: if (result == null) { if (r instanceof AltResult) { if ((x = ((AltResult) r).ex) != null) { // a的异常检查 completeThrowable(x, r); break tryComplete; } r = null; } if (s instanceof AltResult) { if ((x = ((AltResult) s).ex) != null) { // b的异常检查 completeThrowable(x, s); break tryComplete; } s = null; } try { if (c != null && !c.claim()) return false; @SuppressWarnings("unchecked") R rr = (R) r; @SuppressWarnings("unchecked") S ss = (S) s; f.accept(rr, ss); // 执行任务 completeNull(); } catch (Throwable ex) { completeThrowable(ex); } } return true; }

BiAccept

static final class BiAccept<T, U> extends BiCompletion<T, U, Void> { BiConsumer<? super T, ? super U> fn; BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiConsumer<? super T, ? super U> fn) { super(executor, dep, src, snd); this.fn = fn; } final CompletableFuture<Void> tryFire(int mode) { CompletableFuture<Void> d; CompletableFuture<T> a; CompletableFuture<U> b; if ((d = dep) == null || !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this)) return null; dep = null; src = null; snd = null; fn = null; return d.postFire(a, b, mode); } } abstract static class BiCompletion<T, U, V> extends UniCompletion<T, V> { CompletableFuture<U> snd; // second source for action BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd) { super(executor, dep, src); this.snd = snd; } }

thenAcceptBoth/thenAcceptBothAsync -> biAcceptStage -> biAccept -> tryFire -> postFire

acceptEither

public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) { return orAcceptStage(null, other, action); } public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) { return orAcceptStage(asyncPool, other, action); } private <U extends T> CompletableFuture<Void> orAcceptStage(Executor e, CompletionStage<U> o, Consumer<? super T> f) { CompletableFuture<U> b; if (f == null || (b = o.toCompletableFuture()) == null) throw new NullPointerException(); CompletableFuture<Void> d = new CompletableFuture<Void>(); if (e != null || !d.orAccept(this, b, f, null)) { OrAccept<T, U> c = new OrAccept<T, U>(e, d, this, b, f); orpush(b, c); c.tryFire(SYNC); } return d; } final <R, S extends R> boolean orAccept(CompletableFuture<R> a, CompletableFuture<S> b, Consumer<? super R> f, OrAccept<R, S> c) { Object r; Throwable x; if (a == null || b == null || ((r = a.result) == null && (r = b.result) == null) || f == null) return false; // a和b有一个完成了就往下走 tryComplete: if (result == null) { try { if (c != null && !c.claim()) return false; if (r instanceof AltResult) { // 异常 if ((x = ((AltResult) r).ex) != null) { completeThrowable(x, r); break tryComplete; } r = null; } @SuppressWarnings("unchecked") R rr = (R) r; f.accept(rr); // 执行 completeNull(); } catch (Throwable ex) { completeThrowable(ex); } } return true; } static final class OrAccept<T, U extends T> extends BiCompletion<T, U, Void> { Consumer<? super T> fn; OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Consumer<? super T> fn) { super(executor, dep, src, snd); this.fn = fn; } final CompletableFuture<Void> tryFire(int mode) { CompletableFuture<Void> d; CompletableFuture<T> a; CompletableFuture<U> b; if ((d = dep) == null || !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this)) return null; dep = null; src = null; snd = null; fn = null; return d.postFire(a, b, mode); } } final void orpush(CompletableFuture<?> b, BiCompletion<?, ?, ?> c) { if (c != null) { while ((b == null || b.result == null) && result == null) { // a和b的result都没好,才会考虑入栈 if (tryPushStack(c)) { // 先入a的栈 if (b != null && b != this && b.result == null) { // 入a的栈成功,b的result还没好 Completion q = new CoCompletion(c); // a还未结束,用c构建CoCompletion while (result == null && b.result == null && !b.tryPushStack(q)) // 再次判断,a和b的result都没好,才会考虑入栈 lazySetNext(q, null); // 失败置空q的next域 } break; } lazySetNext(c, null); // 失败置空c的next域 } } }

acceptEither/acceptEitherAsync -> orAcceptStage -> orAccept -> tryFire -> postFire

allOf

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { return andTree(cfs, 0, cfs.length - 1); } static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, int lo, int hi) { // 将一个数组构建成一棵树,二叉树,动态规划 CompletableFuture<Void> d = new CompletableFuture<Void>(); if (lo > hi) // empty d.result = NIL; else { CompletableFuture<?> a, b; int mid = (lo + hi) >>> 1; if ((a = (lo == mid ? cfs[lo] : andTree(cfs, lo, mid))) == null || (b = (lo == hi ? a : (hi == mid + 1) ? cfs[hi] : andTree(cfs, mid + 1, hi))) == null) throw new NullPointerException(); if (!d.biRelay(a, b)) { BiRelay<?, ?> c = new BiRelay<>(d, a, b); a.bipush(b, c); // both c.tryFire(SYNC); } } return d; } static final class BiRelay<T, U> extends BiCompletion<T, U, Void> { // for And BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd) { super(null, dep, src, snd); } final CompletableFuture<Void> tryFire(int mode) { CompletableFuture<Void> d; CompletableFuture<T> a; CompletableFuture<U> b; if ((d = dep) == null || !d.biRelay(a = src, b = snd)) return null; src = null; snd = null; dep = null; return d.postFire(a, b, mode); } } boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) { Object r, s; Throwable x; if (a == null || (r = a.result) == null || b == null || (s = b.result) == null) return false; // a和b都结束了才往下执行 if (result == null) { if (r instanceof AltResult && (x = ((AltResult) r).ex) != null) completeThrowable(x, r); else if (s instanceof AltResult && (x = ((AltResult) s).ex) != null) completeThrowable(x, s); else completeNull(); // 辅助结点,什么都不做 } return true; }

allOf -> andTree -> biRelay -> tryFire -> postFire

anyOf

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { return orTree(cfs, 0, cfs.length - 1); } static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, int lo, int hi) { // 将一个数组构建成一棵树,二叉树,动态规划 CompletableFuture<Object> d = new CompletableFuture<Object>(); if (lo <= hi) { CompletableFuture<?> a, b; int mid = (lo + hi) >>> 1; if ((a = (lo == mid ? cfs[lo] : orTree(cfs, lo, mid))) == null || (b = (lo == hi ? a : (hi == mid + 1) ? cfs[hi] : orTree(cfs, mid + 1, hi))) == null) throw new NullPointerException(); if (!d.orRelay(a, b)) { OrRelay<?, ?> c = new OrRelay<>(d, a, b); a.orpush(b, c); c.tryFire(SYNC); } } return d; } static final class OrRelay<T, U> extends BiCompletion<T, U, Object> { // for Or OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, CompletableFuture<U> snd) { super(null, dep, src, snd); } final CompletableFuture<Object> tryFire(int mode) { CompletableFuture<Object> d; CompletableFuture<T> a; CompletableFuture<U> b; if ((d = dep) == null || !d.orRelay(a = src, b = snd)) return null; src = null; snd = null; dep = null; return d.postFire(a, b, mode); } } final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) { Object r; if (a == null || b == null || ((r = a.result) == null && (r = b.result) == null)) return false; // a和b有一个结束就往下进行 if (result == null) completeRelay(r); return true; }

anyOf -> orTree -> orRelay -> tryFire -> postFire

数组构建树

allOf和anyOf都用到了数组构建成树的策略。

 

假设有一个任务Z(虚拟的,什么都不做),依赖一组任务[A, B, C, D, E, F, G, H]

 

对于allOf, 当这组任务都完成时,才会执行Z;对于anyOf, 当这组任务中有任何一个完成,就执行任务Z。

 

如果这组任务是数组结构或者链表结构,我们该如何解决呢?遍历数组或者是链表,当任务都完成或者有一个完成时,就执行Z,需要不停地遍历,这是轮询的方法,不合适。

 

整个基调是回调,是指,当一个任务完成时,会接着执行所有依赖于它的任务。

 

作为一个数组或者链表,该如何应用回调呢?谁在先,谁在后呢?因为不知道哪个任务会先完成,所以没法确定次序。而且这组任务之间也不应该相互依赖,它们只不过都是被Z依赖。

 

如果这组任务只有一个的话,那就演变成了X.thenXXX(Z), 如果这组任务有两个的话,allOf -> Both,anyOf -> Either

 

如果Z依赖Z1,Z2两个个任务,Z1和Z2依赖Z11,Z12和Z21,Z22四个任务,依次类推,当虚拟的任务的个数达到真实任务的个数的一半时,就让虚拟任务监听真实的任务,动态规划加二叉树,时间复杂度也只是logn级别的。

static String array2Tree(String[] cfs, int lo, int hi) { String d = new String(cfs[lo] + cfs[hi]); if (lo <= hi) { String a, b; int mid = (lo + hi) >>> 1; // 二分 if (lo == mid) { // a作为左半部分的的结果 a = cfs[lo]; // 当只有不超过两个元素时,a直接取第一个值 } else { a = array2Tree(cfs, lo, mid); } if (lo == hi) { // 当只有一个元素的时候,b取a的值 b = a; } else { if (hi == mid + 1) { // 右半部分只有两个元素时,b取第二个元素的值 b = cfs[hi]; } else { b = array2Tree(cfs, mid + 1, hi); } } if (a == null || b == null) { throw new NullPointerException(); } System.out.println("[" + a + "][" + b + "]->[" + d + "]"); } return d; }

Console

[A][B]->[AB] [C][D]->[CD] [AB][CD]->[AD] [E][F]->[EF] [G][H]->[GH] [EF][GH]->[EH] [AD][EH]->[AH]

如下图

对于allOf, Z只要保证Z1和Z2都完成了就行,Z1和Z2分别保证Z11,Z12 和 Z21,Z22都完成了就像,而Z11,Z12,Z21,Z22则分别保证了A-H任务都完成。

 

对应anyOf, Z 只要保证Z1和Z2有一个完成了就像,Z1和Z2联合保证了Z11,Z12,Z21,Z22这4个任务只要有一个完成了就行,同理,Z11,Z12,Z21,Z22则联合保证了A-H中有一个任务完成了就行。

 

然后,Z就可以执行了,其实Z什么也没做,只是从这组任务里得出一个结果。

 

参考: https://www.cnblogs.com/Createsequence/p/16963895.html

https://www.cnblogs.com/aniao/p/aniao_cf.html

https://blog.csdn.net/PNGYUL/article/details/119838961

https://blog.csdn.net/qq_33512765/article/details/126427491

网友评论