CompletableFuture任务编排
背景
CompletableFuture
是对Future
的扩展,弥补了Future
的局限性,同时又实现了对任务进行编排的能力。
CompletableFuture
实现于Future
和CompletionStage
,对于Future
的扩展内容,都在CompletionStage
接口中。在这个接口中定义了任务编排的方法,执行某个任务完成或者进行并行执行等操作的方法。
CompletableFutrue
在对任务编排的基础上,结合Futrue
的特性,做到可以将某个任务异步到其他线程执行,内部通过Futrue
的阻塞相关接口来等待任务的完成,再执行编排的接下来任务。默认使用ForkJoinPool
。
任务编排主要分为4大类
1.依赖关系
thenApply()
:入参为以上一个任务的执行结果为入参的Function
方法,也就是thenApply()
中的Function
方法的返回值是上一个任务执行结果的类型。
thenCompose()
:入参为以上一个任务的执行结果为Function
方法的入参类型,以CompletableFuture
为出参类型。也就是thenCompose()
中的Function
方法的返回值是CompletableFuture
类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| private static void thenApplyTest(){ CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> { sleep(2000); System.out.println("1:" + Thread.currentThread().getId()); return "abc"; }).thenApply(value -> { return "abcd"; }); System.out.println(result.join()); }
private static void thenComposeTest(){ CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> { sleep(2000); System.out.println("1:" + Thread.currentThread().getId()); return "abc"; }).thenCompose(param -> CompletableFuture.supplyAsync(() -> param + "d")); System.out.println(result.join()); }
|
2.合并关系
合并A、B任务,当A、B任务完成之后执行C任务。
thenCombine
:合并A、B任务,当A、B任务完成之后执行thenCombine()
方法的第二个参数对应的任务。第二个参数是个BiFunction
方法,接收A、B任务的返回值作为入参,且有返回值。
thenAccepetBoth
:合并A、B任务,当A、B任务完成之后执行thenAccepetBoth()
方法的第二个参数对应的任务。第二个参数是个BiConsumer
方法,接收A、B任务的返回值作为入参,且无返回值。
runAfterBoth
:合并A、B任务,当A、B任务完成之后执行runAfterBoth()
方法的第二个参数对应的任务。第二个参数是个Runnable
,不接收参数,且无返回值。
综上:其实三个方法的主要却别在于第二个参数所使用的的是BiFunction
、BiConsumer
、Runnable
来控制是否需要接收前面任务的参数、是否需要有返回值。其他核心逻辑并无二至。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public static void thenCombineTest(){ CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { sleep(1000); System.out.println("1:" + Thread.currentThread().getId()); return "abc"; }).thenCombine(CompletableFuture.supplyAsync(() -> { sleep(2000); System.out.println("2:" + Thread.currentThread().getId()); return "def"; }), (s1, s2) -> { System.out.println(s1 + s2); System.out.println("3:" + Thread.currentThread().getId()); return "hij"; }); } public static void thenAcceptBothTest(){ CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { sleep(1000); System.out.println("1:" + Thread.currentThread().getId()); return "abc"; }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> { sleep(2000); System.out.println("2:" + Thread.currentThread().getId()); return "def"; }), (s1, s2) -> { System.out.println(s1 + s2); System.out.println("3:" + Thread.currentThread().getId()); }); System.out.println(future.join()); } public static void runAfterBothTest(){ CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { sleep(1000); System.out.println("1:" + Thread.currentThread().getId()); return "abc"; }).runAfterBoth(CompletableFuture.supplyAsync(() -> { sleep(2000); System.out.println("2:" + Thread.currentThread().getId()); return "def"; }), () -> { System.out.println("3:" + Thread.currentThread().getId()); }); System.out.println(future.join()); }
|
3.并联关系
A、B两个任务,只要有一个完成,就执行C任务。
applyToEither()
:A、B两个任务,哪个任务先完成就用那个任务的结果执行C任务。applyToEither()
的第二个参数是Function
,表示使用前一个任务的返回值为入口参数,且有返回值。
acceptEither()
:A、B两个任务,哪个任务先完成就用那个任务的结果执行C任务。acceptEither()
的第二个参数是Consumer
,表示使用前一个任务的返回值为入口参数,无返回值。
runAfterEither()
:A、B两个任务,哪个任务先完成就用那个任务的结果执行C任务。acceptEither()
的第二个参数是Runnable
,表示AB任意一个任务完成后,就执行一个任务,这个任务无入参无返回值。
综上:与合并关系其实差不多,三个方法上都是在对第二个函数式接口参数上做文章,主要为了用户在根据任务的参数需要和返回值需要选择特定的方法。
4.并行关系
多个任务时,多个任务都完成或者其中一个任务完成时,进行下一个任务的执行,返回的是新的CompletableFuture
。
allOf()
:当所有给定的CompletableFuture
完成时,返回一个新的CompletableFuture
anyOf()
:当任何一个给定的CompletablFuture
完成时,返回一个新的CompletableFuture
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public static void anyOfTest(){ CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { sleep(3000); return "hello"; });
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { sleep(2000); return "world"; }); CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2); System.out.println(result.join()); } public static void allOfTest(){ CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { sleep(3000); return "hello"; });
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { sleep(2000); return "world"; }); CompletableFuture<Void> result = CompletableFuture.allOf(future1, future2); System.out.println(result.join()); }
|
5.结果处理
当任务执行结束可以对任务执行的结果或者任务抛出的异常进行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public static void anyOfTest(){ CompletableFuture.supplyAsync(() -> { System.out.println("1:" + Thread.currentThread().getId()); return "abc"; }) .whenComplete((s, throwable) -> { System.out.println(s); System.out.println("2:" + Thread.currentThread().getId()); }); } public static void exceptionallyTest(){ CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> { if (Math.random() < 0.5) { throw new RuntimeException("抛出异常"); } System.out.println("正常结束"); return 1.1; }).thenApply(result -> { System.out.println("thenApply接收到的参数 = " + result); return result; }).exceptionally(new Function<Throwable, Double>() { @Override public Double apply(Throwable throwable) { System.out.println("异常:" + throwable.getMessage()); return 0.0; } }); System.out.println("最终返回的结果 = " + future.join()); }
|
异步
在CompletableFuture
中针对每个方法都定义了两个xxxAsync
接口,以whenComplete
方法为了,如下所示,两个xxxAsync
接口的区别在于是否显示指定使用的线程池。如果不使用xxxAsync
接口,则使用当前线程执行任务,如果使用xxxAysnc
没有显示指定线程池,则使用默认的ForkJoinPool
提供的线程池。但ForkJoinPool
是公共线程池,如果所有的CompletableFuture
都共享一个线程池,如果某一些任务执行较慢,会导致线程池中的所有线程阻塞,造成线程饥饿,从而影响整个系统的性能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(null, action); }
public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action) { return uniWhenCompleteStage(asyncPool, action); }
public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor) { return uniWhenCompleteStage(screenExecutor(executor), action); }
|
扩展
在京东的开源框架AsyncTool
,主要就是对CompletableFuture
进行了组装和包装,使得任务的编排和回调在使用上变更更加方便。并对任务回调相关内容做了一个优化。比如在CompletableFuture
中,如果你编排了多个任务,当前执行到那个任务,每一个任务的执行结果实际上是不知道的,只有等任务都执行完毕,最总汇总结果。AysncTool
认为这种方式不是友好的,一个并行框架需要对每一步的执行有能进行监控,每一步无论执行成功与失败,都应该有一个回调,才算完整。比如在某些场景中,某些任务单元是需要被跳过不执行的,但是不执行的这个任务是不是也需要一个回调,进行类似于一些通知之类的动作?