简单记录一下学习completableFuture的过程,记录的逻辑可能不太通顺,可以阅读末尾参考文章
作用
CompletableFuture继承了future,future可以获取线程执行的结果,线程执行后返回一个future,通过future获取线程执行结果。compaltedfuture可以获取异步线程的执行结果。可以进行并行操作
- Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法,Java 8之前若要设置回调一般会使用guava的ListenableFuture,回调的引入又会导致臭名昭著的回调地狱(下面的例子会通过ListenableFuture的使用来具体进行展示)。
- CompletableFuture对Future进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。
创建
cf的参数是一个函数,因此可以实现函数式编程,使用lamba表达式,比较简便。
获取cf实例,可以通过new一个,也可以通过静态工厂方法。
-
通过new关键字
1CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();complete可以为cf传入一个结果值。
1resultFuture.complete(rpcResponse);根据supplier创建CompletableFuture任务
1//使用内置线程ForkJoinPool.commonPool(),根据supplier构建执行任务2public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)3//指定自定义线程,根据supplier构建执行任务4public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)根据runnable创建CompletableFuture任务
1//使用内置线程ForkJoinPool.commonPool(),根据runnable构建执行任务2public static CompletableFuture<Void> runAsync(Runnable runnable)3//指定自定义线程,根据runnable构建执行任务4public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)- 使用示例
1ExecutorService executor = Executors.newSingleThreadExecutor();2CompletableFuture<Void> rFuture = CompletableFuture3.runAsync(() -> System.out.println("hello siting"), executor);4//supplyAsync的使用5CompletableFuture<String> future = CompletableFuture6.supplyAsync(() -> {7System.out.print("hello ");8return "siting";9}, executor);1011//阻塞等待,runAsync 的future 无返回值,输出null12System.out.println(rFuture.join());13//阻塞等待14String name = future.join();15System.out.println(name);5 collapsed lines16executor.shutdown(); // 线程池需要关闭17--------输出结果--------18hello siting19null20hello siting常量值作为CompletableFuture返回
1//有时候是需要构建一个常量的CompletableFuture2public static <U> CompletableFuture<U> completedFuture(U value)
1public static CompletableFuture<Void> runAsync(Runnable runnable)2
3public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)4
5public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)6 //带有executor参数的表示,可以自定义线程池作为参数,执行异步任务,不自定义则使用默认的线程池,推荐自定义。7
8public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)run标识没有返回值,参数是runnable,supply标识有返回值。
获取结果
1public T get()2public T get(long timeout, TimeUnit unit)3public T getNow(T valueIfAbsent)4public T join() //join和get一样会阻塞后面线程。不使用上面方法去获取结果,线程任务依然执行。(单纯新建一个completableFuture,不去get获取结果,这个任务依然在建立的时候回去运行,类似于thread的start()方法)
getNow有点特殊,如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent值。
join返回计算的结果或者抛出一个unchecked异常(CompletionException),它和get对抛出的异常的处理有些细微的区别,你可以运行下面的代码进行比较:
1CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {2 int i = 1/0;3 return 100;4});5//future.join();6future.get()计算结果完成时的处理
当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:
1public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)2public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)3public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)4public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)BiConsumer<? super T,? super Throwable>,可以处理两个参数,一个是结果,一个是异常。
方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
注意这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。
- whenComplete与handle的区别在于,它不参与返回结果的处理,把它当成监听器即可
- 即使异常被处理,在CompletableFuture外层,异常也会再次复现
- 使用whenCompleteAsync时,返回结果则需要考虑多线程操作问题,毕竟会出现两个线程同时操作一个结果
使用示例
1CompletableFuture<AtomicBoolean> first = CompletableFuture2 .supplyAsync(() -> {3 if (true) { throw new RuntimeException("main error!"); }4 return "hello world";5 })6 .thenApply(data -> new AtomicBoolean(false))7 .whenCompleteAsync((data,e) -> {8 //异常捕捉处理, 但是异常还是会在外层复现9 System.out.println(e.getMessage());10 });11first.join();12--------输出结果--------13java.lang.RuntimeException: main error!14Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: main error!15 ... 5 more-
如果之前的处理环节有异常问题,则会触发exceptionally的调用相当于 try…catch
1CompletableFuture<Integer> first = CompletableFuture2.supplyAsync(() -> {3if (true) {4throw new RuntimeException("main error!");5}6return "hello world";7})8.thenApply(data -> 1)9.exceptionally(e -> {10e.printStackTrace(); // 异常捕捉处理,前面两个处理环节的日常都能捕获11return 0;12});
completeExceptionally
-
如果你想让
CompletableFuture的结果就是异常的话,可以使用completeExceptionally()方法为其赋值。 -
1CompletableFuture<String> completableFuture = new CompletableFuture<>();2// ...3completableFuture.completeExceptionally(4new RuntimeException("Calculation failed!"));5// ...6completableFuture.get(); // ExecutionException
handle也返回CompletableFuture对象,但是对象的值和原来的CompletableFuture计算的值不同。当原先的CompletableFuture的值计算完成或者抛出异常的时候,会触发这个CompletableFuture对象的计算,结果由BiFunction参数计算而得。因此这组方法兼有whenComplete和转换的两个功能。
handle-任务完成或者异常时运行fn,返回值为fn的返回,相比exceptionally而言,即可处理上一环节的异常也可以处理其正常返回值
1CompletableFuture<Integer> first = CompletableFuture2 .supplyAsync(() -> {3 if (true) { throw new RuntimeException("main error!"); }4 return "hello world";5 })6 .thenApply(data -> 1)7 .handleAsync((data,e) -> {8 e.printStackTrace(); // 异常捕捉处理 第一个出现异常第二个不执行,直接进入hadle9 return data;10 });11System.out.println(first.join());12--------输出结果--------13java.util.concurrent.CompletionException: java.lang.RuntimeException: main error!14 ... 5 more15null1public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)2public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)3public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)串行执行/一元依赖
-
thenApply()-
1public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)//非异步2public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)//异步,默认线程池3public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)//异步,自定义线程池,推荐。
可以理解为转化,因为有返回值,把上一个cf转变成另一个。
-
-
thenAccept()-
1public CompletableFuture<Void> thenAccept(Consumer<? super T> action)//非异步2public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)//异步,默认线程池3public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)//异步,自定义线程池,推荐。
可以理解为消费,没有返回值,只利用上一个的结果作为参数进行处理。
-
-
thenRun()-
1public CompletableFuture<Void> thenRun(Runnable action)2public CompletableFuture<Void> thenRunAsync(Runnable action)3public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
-
thenApply() 方法接受一个 Function 实例,用它来处理结果。有返回值
thenAccept() 方法的参数是 Consumer<? super T> 。接收cf的结果,但是没有返回值
thenRun() 的方法是的参数是 Runnable 。不接受结果,相当于上一个执行后,自己做自己的事情,有时间关系,没有数据传输关系。
因此,你可以根据方法的参数的类型来加速你的记忆。
Runnable类型的参数会忽略计算的结果,Consumer是纯消费计算结果,BiConsumer会组合另外一个CompletionStage纯消费,Function会对计算结果做转换,BiFunction会组合另外一个CompletionStage的计算结果做转换。CompletionStage表示cf的一个阶段,参数是CompletionStage,标识一个cf作为参数。
并行执行---组合/二元依赖,thencompose算一元依赖
**thenCompose() **
-
1public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)2public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)3public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
这一组方法接受一个Function作为参数,这个Function的输入是当前的CompletableFuture的计算值,返回结果将是一个新的CompletableFuture,这个新的CompletableFuture会组合原来的CompletableFuture和函数返回的CompletableFuture。
接受上一个任务的返回值作为参数,存在先后顺序有返回值。
- 类似thenApply(区别是thenCompose的返回值是CompletionStage,thenApply则是返回 U),提供该方法为了和其他CompletableFuture任务更好地配套组合使用。Function<T,R>,T是输入,R是返回。
因此它的功能类似:
1A +--> B +---> C1//第一个异步任务,常量任务2CompletableFuture<String> f = CompletableFuture.completedFuture("OK");3//第二个异步任务4ExecutorService executor = Executors.newSingleThreadExecutor();5CompletableFuture<String> future = CompletableFuture6.supplyAsync(() -> "hello world", executor)7.thenComposeAsync(data -> {8System.out.println(data); return f; //使用第一个任务作为返回,也可以自定义一个completableFuture9}, executor);10System.out.println(future.join());11executor.shutdown();12--------输出结果--------13hello world14OK //返回的是第一任务,得到的值就是第一个的ok159 collapsed lines16CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {17return 100;18});19CompletableFuture<String> f = future.thenCompose( i -> {20return CompletableFuture.supplyAsync(() -> {21return (i * 10) + ""; //compose返回值是CompletionStage。retuen22});23});24System.out.println(f.get()); //1000
thenCombine()
-
1public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)3public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序,
other并不会等待先前的CompletableFuture执行完毕后再执行。thenCombine是,两个CompletableFuture并行执行完,然后执行fn,依赖上两个任务的结果,有返回值
那
thenCompose()和thenCombine()有什么区别呢?thenCompose()可以两个CompletableFuture对象,并将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。thenCombine()会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。
runAfterBoth
1public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)2public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)3public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)两个CompletableFuture并行执行完,然后执行action,不依赖上两个任务的结果,无返回值
thenAcceptBoth
1//调用方任务和other并行完成后执行action,action再依赖消费两个任务的结果,无返回值2public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,3 BiConsumer<? super T, ? super U> action)4//两个任务异步完成,fn再依赖消费两个任务的结果,无返回值,使用默认线程池5public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,6 BiConsumer<? super T, ? super U> action)7//两个任务异步完成,fn(用指定线程池执行)再依赖消费两个任务的结果,无返回值8public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,9 BiConsumer<? super T, ? super U> action, Executor executor)两个CompletableFuture并行执行完,然后执行action,依赖上两个任务的结果,无返回值
1thenAcceptBoth和runAfterBoth是当两个CompletableFuture都计算完成,either是当任意一个CompletableFuture计算完成的时候就会执行。Either
1public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)2public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)3public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)4public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)5public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)6public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)acceptEither方法是当任意一个CompletionStage完成的时候,action这个消费者就会被执行。这个方法返回CompletableFuture
applyToEither方法是当任意一个CompletionStage完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。
多元依赖:依赖多个CF

如上图红色链路所示,整个流程的结束依赖于三个步骤CF3、CF4、CF5,这种多元依赖可以通过allOf或anyOf方法来实现,区别是当需要多个依赖全部完成时使用allOf,当多个依赖中的任意一个完成即可时使用anyOf,如下代码所示:
1CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);2CompletableFuture<String> result = cf6.thenApply(v -> {3 //这里的join并不会阻塞,因为传给thenApply的函数是在CF3、CF4、CF5全部完成时,才会执行 。4 result3 = cf3.join();5 result4 = cf4.join();6 result5 = cf5.join();7 //根据result3、result4、result5组装最终result;8 return "result";9});注意点
1. Future需要获取返回值,才能获取异常信息
1ExecutorService executorService = new ThreadPoolExecutor(5, 10, 5L,2 TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));3CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {4 int a = 0;5 int b = 666;6 int c = b / a;7 return true;8 },executorService).thenAccept(System.out::println);9
10 //如果不加 get()方法这一行,看不到异常信息11 //future.get();Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。小伙伴们使用的时候,注意一下哈,考虑是否加try…catch…或者使用exceptionally方法。
参考文章
基础篇:异步编程不会?我教你啊!CompletableFuture(JDK1.8) - 掘金 (juejin.cn)
CompletableFuture原理与实践-外卖商家端API的异步化 - SegmentFault 思否
Java CompletableFuture 详解 (colobu.com)