Java8异步编程之CompletableFuture源码解读
一、引言
一说到异步任务,很多人上来咔咔新建个线程池。为了防止线程数量肆虐,一般还会考虑使用单例模式创建线程池,具体使用方法大都如下面的代码所示:
@Test
public void demo1() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
Future<Object> future1 = executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
return Thread.currentThread().getName();
}
});
System.out.println(future1.get());
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
经常使用 JavaScript 的同学相信对于异步回调的用法相当熟悉了,毕竟 JavaScript 拥有“回调地狱”的美誉。
我们大 Java 又开启了新一轮模仿之旅。🐂🍺
java.util.concurrent 包新增了 CompletableFuture 类可以实现类似 JavaScript 的连续回调。
二、两种基本用法
先来看下 CompletableFuture 的两种基本用法,代码如下:
@Test
public void index1() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> Thread.currentThread().getName());
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> Thread.currentThread().getName());
System.out.println(completableFuture1.get());
System.out.println(completableFuture2.get());
}
打印输出:
ForkJoinPool.commonPool-worker-1
null
初看代码,第一反应是代码简洁。直接调用 CompletableFuture 类的静态方法,提交任务方法就完事了。但是,随之而来的疑问就是,异步任务执行的背后是一套什么逻辑呢?是一对一使用new Thread()
还是依赖线程池去执行的呢。
三、探索线程池原理
翻阅 CompletableFuture 类的源码,我们找到答案。关键代码如下:
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
可以看到 CompletableFuture 类默认使用的是 ForkJoinPool.commonPool()
方法返回的线程池。当然啦,前提是 ForkJoinPool 线程池的数量大于 1 。否则,则使用 CompletableFuture 类自定义的 ThreadPerTaskExecutor 线程池。
ThreadPerTaskExecutor 线程池的实现逻辑非常简单,一行代码简单实现了 Executor 接口,内部执行逻辑是一条任务对应一条线程。代码如下:
/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); }
}
四、两种异步接口
之前我们使用线程池执行异步任务时,当不需要任务执行完毕后返回结果的,我们都是实现 Runnable 接口。而当需要实现返回值时,我们使用的则是 Callable 接口。
同理,使用 CompletableFuture 类的静态方法执行异步任务时,不需要返回结果的也是实现 Runnable 接口。而当需要实现返回值时,我们使用的则是 Supplier 接口。其实,Callable 接口和 Supplier 接口并没有什么区别。
接下来,我们来分析一下 CompletableFuture 是如何实现异步任务执行的。
runAsync
CompletableFuture 执行无返回值任务的是 runAsync() 方法。该方法的关键执行代码如下:
static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
if (f == null) throw new NullPointerException();
CompletableFuture<Void> d = new CompletableFuture<Void>();
e.execute(new AsyncRun(d, f));
return d;
}
可以看到,该方法将 Runnable 实例作为参数封装至 AsyncRun 类。实际上, AsyncRun 类是对 Runnable 接口的进一步封装。实际上,AsyncRun 类也是实现了 Runnable 接口。观察下方 AsyncRun 类的源码,可以看到 AsyncRun 类的 run() 方法中调用了 Runnable 参数的 run() 方法。
public void run() {
CompletableFuture<Void> d; Runnable f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
f.run();
d.completeNull();
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
当提交的任务执行完毕后,即f.run()
方法执行完毕。调用d.completeNull()
方法设置任务执行结果为空。代码如下:
/** The encoding of the null value. */
static final AltResult NIL = new AltResult(null);
/** Completes with the null value, unless already completed. */
final boolean completeNull() {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
NIL);
}
可以看到,对于任务返回值为 null 的执行结果,被封装为 new AltResult(null)
对象。而且,还是调用的 CAS 本地方法实现了原子操作。
为什么需要对 null 值进行单独封装呢?观察 get() 方法的源码:
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
原来原因是便于使用 null 值区分异步任务是否执行完毕。
如果你对 CAS 不太了解的话,可以查阅 compareAndSwapObject 方法的四个参数的含义。该方法的参数 RESULT 是什么呢?查看代码如下:
RESULT = u.objectFieldOffset(k.getDeclaredField("result"));
原来,RESULT 是获取 CompletableFuture 对象中 result 字段的偏移地址。这个 result 字段又是啥呢?就是任务执行完毕后的结果值。代码如下:
// Either the result or boxed AltResult
volatile Object result;
supplyAsync
CompletableFuture 执行有返回值任务的是 supplyAsync() 方法。该方法的关键执行代码如下:
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
与 AsyncRun 类对 Runnable 接口的封装相同的是,AsyncSupply 类也是对 Runnable 接口的 run() 方法进行了一层封装。代码如下:
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
当异步任务执行完毕后,返回结果会经d.completeValue()
方法进行封装。与d.completeNull()
方法不同的是,该方法具有一个参数。代码如下:
/** Completes with a non-exceptional result, unless already completed. */
final boolean completeValue(T t) {
return UNSAFE.compareAndSwapObject(this, RESULT, null,
(t == null) ? NIL : t);
}
无论是类 AsyncRun 还是类 AsyncSupply ,run() 方法都会在执行结束之际调用 CompletableFuture 对象的 postComplete()
方法。顾名思义,该方法将通知后续回调函数的执行。
五、探究回调函数原理
前面我们提到了 CompletableFuture 具有连续回调的特性。举个例子:
@Test
public void demo2() throws ExecutionException, InterruptedException {
CompletableFuture<ArrayList> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return new ArrayList();
})
.whenCompleteAsync((list, throwable) -> {
System.out.println(Thread.currentThread().getName());
list.add(1);
})
.whenCompleteAsync((list, throwable) -> {
System.out.println(Thread.currentThread().getName());
list.add(2);
})
.whenCompleteAsync((list, throwable) -> {
System.out.println(Thread.currentThread().getName());
list.add(3);
});
System.out.println(completableFuture.get());
}
打印输出:
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
[1, 2, 3]
上面的测试方法中,通过 supplyAsync 方法提交异步任务,当异步任务运行结束,对结果值添加三个回调函数进一步处理。
观察打印输出,可以初步得出如下结论:
- 异步任务与回调函数均运行在同一个线程中。
- 回调函数的调用顺序与添加回调函数的顺序一致。
那么问题来了,CompletableFuture 内部是如何处理连续回调函数的呢?
AsyncSupply
当我们提交异步任务时,等价于向线程池提交 AsyncSupply 对象或者 AsyncRun 对象。观察这两个类的唯一构造方法都是相同的,代码如下:
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}
这就将 AsyncSupply 异步任务与返回给用户的 CompletableFuture 对象进行绑定,用于在执行结束后回填结果到 CompletableFuture 对象,以及通知后续回调函数的运行。
Completion
回调函数均是 Completion 类的子类,抽取 Completion 类与子类的关键代码:
Completion next;
CompletableFuture<V> dep;
CompletableFuture<T> src;
Function fn;
Completion 类含有 next 字段,很明显是一个链表。
Completion 的子类含有两个 CompletableFuture 类型的参数,dep 是新建的、用于下一步的 CompletableFuture 对象,src 则是引用它的 CompletableFuture 对象。
当 Completion 执行完回调方法后,一般会返回 dep 对象,用于迭代遍历。
CompletableFuture
观察源码,CompletableFuture 主要包含下面两个参数:
volatile Object result; //结果
volatile Completion stack; //回调方法栈
Completion 类型封装了回调方法,但为什么要起名为 stack (栈)呢?
因为 CompletableFuture 借助 Completion 的链表结构实现了栈。每当调用 CompletableFuture 对象的 whenCompleteAsync() 或其它回调方法时,都会新建一个 Completion 对象,并压到栈顶。代码如下:
final boolean tryPushStack(Completion c) {
Completion h = stack;
lazySetNext(c, h);
return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}
postComplete
回顾上面两种异步任务类的实现,当异步任务执行完毕之后,都会调用postComplete()
方法通知回调方法的执行。代码如下:
final void postComplete() {
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
这段代码是本文的核心部分,大致逻辑如下:
当异步任务执行结束后,CompletableFuture 会查看自身是否含有回调方法栈,如果含有,会通过 casStack()
方法拿出栈顶元素 h ,此时的栈顶是原来栈的第二位元素 t。如果 t 等于 null,那么直接执行回调方法 h,并返回下一个 CompletableFuture 对象。然后一直迭代这个过程。
简化上述思路,我更想称其为通过 Completion 对象实现桥接的 CompletableFuture 链表,流程图如下:
上面的过程是属于正常情况下的,也就是一个 CompletableFuture 对象只提交一个回调方法的情况。如果我们使用同一个 CompletableFuture 对象连续调用多次回调方法,那么就会形成 Completion 栈。
你以为 Completion 栈内元素会依次调用,不会的。从代码中来看,当回调方法 t 不等于 null,有两种情况:
情况 1:如果当前迭代到的 CompletableFuture 对象是 this (也就是 CompletableFuture 链表头),会令 h.next = null
,因为 h.next 也就是 t 通过 CAS 的方式压到了 this 对象的 stack 栈顶。
情况 2:如果当前迭代到的 CompletableFuture 对象 f 不是 this (不是链表头)的话,会将回调函数 h 压入 this (链表头)的 stack 中。然后从链表头再次迭代遍历。这样下去,对象 f 中的回调方法栈假设为 3-2-1,从 f 的栈顶推出再压入 this 的栈顶,顺序就变为了 1-2-3。这时候,情况就变成了第 1 种。
这样,当回调方法 t = h.next 等于 null 或者 f 等于 this 时,都会对栈顶的回调方法进行调用。
简单来说,就是将拥有多个回调方法的 CompletableFuture 对象的多余的回调方法移到到 this 对象的栈内。
回调方法执行结束要么返回下一个 CompletableFuture 对象,要么返回 null 然后手动设置为 f = this,再次从头遍历。
Async
回调函数的执行其实分为两种,区别在于带不带 Async 后缀。例如:
@Test
public void demo3() throws ExecutionException, InterruptedException {
CompletableFuture<ArrayList> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return new ArrayList();
})
.whenComplete((arrayList, throwable) -> {
System.out.println(Thread.currentThread().getName());
arrayList.add(1);
}).whenCompleteAsync((arrayList, throwable) -> {
System.out.println(Thread.currentThread().getName());
arrayList.add(2);
});
System.out.println(completableFuture.get());
}
打印输出:
ForkJoinPool.commonPool-worker-1
main
ForkJoinPool.commonPool-worker-1
[1, 2]
whenComplete() 和 whenCompleteAsync() 方法的区别在于是否在立即执行。源码如下:
private CompletableFuture<T> uniWhenCompleteStage(
Executor e, BiConsumer<? super T, ? super Throwable> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<T> d = new CompletableFuture<T>();
if (e != null || !d.uniWhenComplete(this, f, null)) {
UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
push(c);
c.tryFire(SYNC);
}
return d;
}
两个方法都是调用的 uniWhenCompleteStage() ,区别在于参数 Executor e 是否为 null。从而控制是否调用 d.uniWhenComplete() 方法,该方法会判断 result 是否为 null,从而尝试是否立即执行该回调方法。若是 supplyAsync() 方法提交的异步任务耗时相对长一些,那么就不建议使用 whenComplete() 方法了。此时由 whenComplete() 和 whenCompleteAsync() 方法提交的异步任务都会由线程池执行。
本章小结
通过本章节的源码分析,我们明白了 Completion 之所以将自身设置为链表结构,是因为 CompletableFuture 需要借助 Completion 的链表结构实现栈。也明白了同一个 CompletableFuture 对象如果多次调用回调方法时执行顺序会与调用的顺序不符合。换言之,一个 CompletableFuture 对象只调用一个回调方法才是 CompletableFuture 设计的初衷,我们在编程中也可以利用这一特性来保证回调方法的调用顺序。
因篇幅有限,本文并没有分析更多的 CompletableFuture 源码,感兴趣的小伙伴可以自行查看。
六、用法集锦
异常处理
方法:
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
示例:
@Test
public void index2() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2 / 0)
.exceptionally((e) -> {
System.out.println(e.getMessage());
return 0;
});
System.out.println(completableFuture.get());
}
输出:
java.lang.ArithmeticException: / by zero
0 ### 任务完成后对结果的处理
方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
示例:
@Test
public void index3() throws ExecutionException, InterruptedException {
CompletableFuture<HashMap> completableFuture = CompletableFuture.supplyAsync(() -> new HashMap())
.whenComplete((map, throwable) -> {
map.put("key1", "value1");
});
System.out.println(completableFuture.get());
}
输出:
{key=value}
任务完成后对结果的转换
方法:
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
示例:
@Test
public void index4() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2)
.thenApply((r) -> r + 1);
System.out.println(completableFuture.get());
}
输出:
3
任务完成后对结果的消费
方法:
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
示例:
@Test
public void index5() throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> 2)
.thenAccept(System.out::println);
System.out.println(completableFuture.get());
}
输出:
2
null ### 任务的组合(需等待上一个任务完成)
方法:
public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
示例:
@Test
public void index6() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2)
.thenCompose(integer -> CompletableFuture.supplyAsync(() -> integer + 1));
System.out.println(completableFuture.get());
}
输出:
3
任务的组合(不需等待上一步完成)
方法:
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
示例:
@Test
public void index7() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 2)
.thenCombine(CompletableFuture.supplyAsync(() -> 1), (x, y) -> x + y);
System.out.println(completableFuture.get());
}
输出:
3
消费最先执行完毕的其中一个任务,不返回结果
方法:
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
示例:
@Test
public void index8() throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2;
})
.acceptEither(CompletableFuture.supplyAsync(() -> 1), System.out::println);
System.out.println(completableFuture.get());
}
输出:
1
null
消费最先执行完毕的其中一个任务,并返回结果
方法:
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
示例:
@Test
public void index9() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 2;
})
.applyToEither(CompletableFuture.supplyAsync(() -> 1), x -> x + 10);
System.out.println(completableFuture.get());
}
输出:
11
等待所有任务完成
方法:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
示例:
@Test
public void index10() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(completableFuture1, completableFuture2);
System.out.println("waiting all task finish..");
System.out.println(completableFuture.get());
System.out.println("all task finish");
}
输出:
waiting all task finish..
null
all task finish
返回最先完成的任务结果
方法:
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
示例:
@Test
public void index11() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Object> completableFuture = CompletableFuture.anyOf(completableFuture1, completableFuture2);
System.out.println(completableFuture.get());
}
输出:
2
版权声明:凡未经本网站书面授权,任何媒体、网站及个人不得转载、复制、重制、改动、展示或使用本网站的局部或全部的内容或服务,或在非本网站所属服务器上建立镜像。如果已转载,请自行删除。同时,我们保留进一步追究相关行为主体的法律责任的权利。我们希望与各媒体合作,签订著作权有偿使用许可合同,故转载方须书面/邮件申请,以待商榷。