0%

CompletableFuture任务编排

CompletableFuture任务编排

背景

CompletableFuture是对Future的扩展,弥补了Future的局限性,同时又实现了对任务进行编排的能力。

CompletableFuture实现于FutureCompletionStage,对于Future的扩展内容,都在CompletionStage接口中。在这个接口中定义了任务编排的方法,执行某个任务完成或者进行并行执行等操作的方法。

CompletableFuture类图.png

CompletableFutrue在对任务编排的基础上,结合Futrue的特性,做到可以将某个任务异步到其他线程执行,内部通过Futrue的阻塞相关接口来等待任务的完成,再执行编排的接下来任务。默认使用ForkJoinPool

任务编排主要分为4大类

1.依赖关系

  • thenApply():入参为以上一个任务的执行结果为入参的Function方法,也就是thenApply()中的Function方法的返回值是上一个任务执行结果的类型。
  • thenCompose():入参为以上一个任务的执行结果为Function方法的入参类型,以CompletableFuture为出参类型。也就是thenCompose()中的Function方法的返回值是CompletableFuture类型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static void thenApplyTest(){
// 结果转换,thenApply使用上一轮结果进行计算
// thenApply 加不加async仅表示是否使用异步线程来完成任务(可指定线程池)
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
sleep(2000);
System.out.println("1:" + Thread.currentThread().getId());
return "abc";
}).thenApply(value -> {
return "abcd";
});
System.out.println(result.join());
}

private static void thenComposeTest(){
// thenCompose 与 thenApply 类似,但是thenCompose的返回值必须是CompletableFuture
// thenCompose是将上一次计算的结果作为参数传进来,然后需要返回一个新的CompletableFuture
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
sleep(2000);
System.out.println("1:" + Thread.currentThread().getId());
return "abc";
}).thenCompose(param -> CompletableFuture.supplyAsync(() -> param + "d"));
System.out.println(result.join());
}

2.合并关系

合并A、B任务,当A、B任务完成之后执行C任务。

  • thenCombine:合并A、B任务,当A、B任务完成之后执行thenCombine()方法的第二个参数对应的任务。第二个参数是个BiFunction方法,接收A、B任务的返回值作为入参,且有返回值。
  • thenAccepetBoth:合并A、B任务,当A、B任务完成之后执行thenAccepetBoth()方法的第二个参数对应的任务。第二个参数是个BiConsumer方法,接收A、B任务的返回值作为入参,且无返回值。
  • runAfterBoth:合并A、B任务,当A、B任务完成之后执行runAfterBoth()方法的第二个参数对应的任务。第二个参数是个Runnable,不接收参数,且无返回值。

综上:其实三个方法的主要却别在于第二个参数所使用的的是BiFunctionBiConsumerRunnable来控制是否需要接收前面任务的参数、是否需要有返回值。其他核心逻辑并无二至。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public static void thenCombineTest(){
// 联合
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
sleep(1000);
System.out.println("1:" + Thread.currentThread().getId());
return "abc";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
sleep(2000);
System.out.println("2:" + Thread.currentThread().getId());
return "def";
}), (s1, s2) -> {
System.out.println(s1 + s2);
System.out.println("3:" + Thread.currentThread().getId());
return "hij";
});
}
public static void thenAcceptBothTest(){
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
sleep(1000);
System.out.println("1:" + Thread.currentThread().getId());
return "abc";
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
sleep(2000);
System.out.println("2:" + Thread.currentThread().getId());
return "def";
}), (s1, s2) -> {
System.out.println(s1 + s2);
System.out.println("3:" + Thread.currentThread().getId());
});
System.out.println(future.join());
}
public static void runAfterBothTest(){
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
sleep(1000);
System.out.println("1:" + Thread.currentThread().getId());
return "abc";
}).runAfterBoth(CompletableFuture.supplyAsync(() -> {
sleep(2000);
System.out.println("2:" + Thread.currentThread().getId());
return "def";
}), () -> {
//System.out.println(s1 + s2);
System.out.println("3:" + Thread.currentThread().getId());
});
System.out.println(future.join());
}

3.并联关系

A、B两个任务,只要有一个完成,就执行C任务。

  • applyToEither():A、B两个任务,哪个任务先完成就用那个任务的结果执行C任务。applyToEither()的第二个参数是Function,表示使用前一个任务的返回值为入口参数,且有返回值。
  • acceptEither():A、B两个任务,哪个任务先完成就用那个任务的结果执行C任务。acceptEither()的第二个参数是Consumer,表示使用前一个任务的返回值为入口参数,无返回值。
  • runAfterEither():A、B两个任务,哪个任务先完成就用那个任务的结果执行C任务。acceptEither()的第二个参数是Runnable,表示AB任意一个任务完成后,就执行一个任务,这个任务无入参无返回值。

综上:与合并关系其实差不多,三个方法上都是在对第二个函数式接口参数上做文章,主要为了用户在根据任务的参数需要和返回值需要选择特定的方法。

4.并行关系

多个任务时,多个任务都完成或者其中一个任务完成时,进行下一个任务的执行,返回的是新的CompletableFuture

  • allOf():当所有给定的CompletableFuture完成时,返回一个新的CompletableFuture
  • anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void anyOfTest(){
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(3000);
return "hello";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "world";
});
// 只要有一个有结果就得到result, 但是存在一种情况,如果future1先完成,那么future2还是在执行,
// 如果在future1执行完成后,future2执行完成前获取result的值,那么result的值就是future1的值
// 获取值时如果都完成,那么由前往后取,也就是都完成的情况下,根据anyOf()方法参数定义的顺序获取。
CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
System.out.println(result.join());
}
public static void allOfTest(){
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(3000);
return "hello";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "world";
});
// 两个都完成了
CompletableFuture<Void> result = CompletableFuture.allOf(future1, future2);
System.out.println(result.join());
}

5.结果处理

当任务执行结束可以对任务执行的结果或者任务抛出的异常进行处理。

  • whenComplete:当任务完成时,将使用结果(或null)和此阶段的异常(或null)执行给定操作

  • exceptionally:为了捕获任务中抛出的异常进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public static void anyOfTest(){
// 当某个任务完成之后,进行下一个任务
CompletableFuture.supplyAsync(() -> {
System.out.println("1:" + Thread.currentThread().getId());
return "abc";
})
// 上一个任务若有异常则这里的s=null, throwable=异常; 若正常执行完成则s=上一个任务的返回值, throwable=null
// async表示使用可能会去使用其他线程如果使用同一线程池也可能被同一个线程执行
// 不指定线程池的情况下使用的是ForkJoinPool.commonPool()
.whenComplete((s, throwable) -> {
System.out.println(s);
System.out.println("2:" + Thread.currentThread().getId());
});
}
public static void exceptionallyTest(){
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("抛出异常");
}
System.out.println("正常结束");
return 1.1;
}).thenApply(result -> {
System.out.println("thenApply接收到的参数 = " + result);
return result;
}).exceptionally(new Function<Throwable, Double>() {
@Override
public Double apply(Throwable throwable) {
System.out.println("异常:" + throwable.getMessage());
return 0.0;
}
});
System.out.println("最终返回的结果 = " + future.join());
}

异步

​ 在CompletableFuture中针对每个方法都定义了两个xxxAsync接口,以whenComplete方法为了,如下所示,两个xxxAsync接口的区别在于是否显示指定使用的线程池。如果不使用xxxAsync接口,则使用当前线程执行任务,如果使用xxxAysnc没有显示指定线程池,则使用默认的ForkJoinPool提供的线程池。但ForkJoinPool是公共线程池,如果所有的CompletableFuture都共享一个线程池,如果某一些任务执行较慢,会导致线程池中的所有线程阻塞,造成线程饥饿,从而影响整个系统的性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}

public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}

public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}

扩展

​ 在京东的开源框架AsyncTool,主要就是对CompletableFuture进行了组装和包装,使得任务的编排和回调在使用上变更更加方便。并对任务回调相关内容做了一个优化。比如在CompletableFuture中,如果你编排了多个任务,当前执行到那个任务,每一个任务的执行结果实际上是不知道的,只有等任务都执行完毕,最总汇总结果。AysncTool认为这种方式不是友好的,一个并行框架需要对每一步的执行有能进行监控,每一步无论执行成功与失败,都应该有一个回调,才算完整。比如在某些场景中,某些任务单元是需要被跳过不执行的,但是不执行的这个任务是不是也需要一个回调,进行类似于一些通知之类的动作?

-------------本文结束感谢您的阅读-------------