Zane Blog

CompletableFuture异步线程

2024-11-15
默认
java
java
最后更新:2024-11-15
16分钟
3146字

简单记录一下学习completableFuture的过程,记录的逻辑可能不太通顺,可以阅读末尾参考文章

作用

CompletableFuture继承了future,future可以获取线程执行的结果,线程执行后返回一个future,通过future获取线程执行结果。compaltedfuture可以获取异步线程的执行结果。可以进行并行操作

  • Future用于表示异步计算的结果,只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法,Java 8之前若要设置回调一般会使用guava的ListenableFuture,回调的引入又会导致臭名昭著的回调地狱(下面的例子会通过ListenableFuture的使用来具体进行展示)。
  • CompletableFuture对Future进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,支持进一步的编排,同时一定程度解决了回调地狱的问题。

创建

cf的参数是一个函数,因此可以实现函数式编程,使用lamba表达式,比较简便。

获取cf实例,可以通过new一个,也可以通过静态工厂方法。

  • 通过new关键字

    1
    CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();

    complete可以为cf传入一个结果值。

    1
    resultFuture.complete(rpcResponse);

    根据supplier创建CompletableFuture任务

    1
    //使用内置线程ForkJoinPool.commonPool(),根据supplier构建执行任务
    2
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    3
    //指定自定义线程,根据supplier构建执行任务
    4
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

    根据runnable创建CompletableFuture任务

    1
    //使用内置线程ForkJoinPool.commonPool(),根据runnable构建执行任务
    2
    public static CompletableFuture<Void> runAsync(Runnable runnable)
    3
    //指定自定义线程,根据runnable构建执行任务
    4
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
    • 使用示例
    1
    ExecutorService executor = Executors.newSingleThreadExecutor();
    2
    CompletableFuture<Void> rFuture = CompletableFuture
    3
    .runAsync(() -> System.out.println("hello siting"), executor);
    4
    //supplyAsync的使用
    5
    CompletableFuture<String> future = CompletableFuture
    6
    .supplyAsync(() -> {
    7
    System.out.print("hello ");
    8
    return "siting";
    9
    }, executor);
    10
    11
    //阻塞等待,runAsync 的future 无返回值,输出null
    12
    System.out.println(rFuture.join());
    13
    //阻塞等待
    14
    String name = future.join();
    15
    System.out.println(name);
    5 collapsed lines
    16
    executor.shutdown(); // 线程池需要关闭
    17
    --------输出结果--------
    18
    hello siting
    19
    null
    20
    hello siting

    常量值作为CompletableFuture返回

    1
    //有时候是需要构建一个常量的CompletableFuture
    2
    public static <U> CompletableFuture<U> completedFuture(U value)
1
public static CompletableFuture<Void> runAsync(Runnable runnable)
2
3
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
4
5
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
6
//带有executor参数的表示,可以自定义线程池作为参数,执行异步任务,不自定义则使用默认的线程池,推荐自定义。
7
8
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

run标识没有返回值,参数是runnable,supply标识有返回值。

获取结果

1
public T get()
2
public T get(long timeout, TimeUnit unit)
3
public T getNow(T valueIfAbsent)
4
public T join() //join和get一样会阻塞后面线程。

不使用上面方法去获取结果,线程任务依然执行。(单纯新建一个completableFuture,不去get获取结果,这个任务依然在建立的时候回去运行,类似于thread的start()方法) getNow有点特殊,如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent值。 join返回计算的结果或者抛出一个unchecked异常(CompletionException),它和get对抛出的异常的处理有些细微的区别,你可以运行下面的代码进行比较:

1
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
2
int i = 1/0;
3
return 100;
4
});
5
//future.join();
6
future.get()

计算结果完成时的处理

CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:

1
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
2
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
3
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
4
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

BiConsumer<? super T,? super Throwable>,可以处理两个参数,一个是结果,一个是异常。

方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。

注意这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常。

  • whenComplete与handle的区别在于,它不参与返回结果的处理,把它当成监听器即可
  • 即使异常被处理,在CompletableFuture外层,异常也会再次复现
  • 使用whenCompleteAsync时,返回结果则需要考虑多线程操作问题,毕竟会出现两个线程同时操作一个结果

使用示例

1
CompletableFuture<AtomicBoolean> first = CompletableFuture
2
.supplyAsync(() -> {
3
if (true) { throw new RuntimeException("main error!"); }
4
return "hello world";
5
})
6
.thenApply(data -> new AtomicBoolean(false))
7
.whenCompleteAsync((data,e) -> {
8
//异常捕捉处理, 但是异常还是会在外层复现
9
System.out.println(e.getMessage());
10
});
11
first.join();
12
--------输出结果--------
13
java.lang.RuntimeException: main error!
14
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: main error!
15
... 5 more
  • 如果之前的处理环节有异常问题,则会触发exceptionally的调用相当于 try…catch

    1
    CompletableFuture<Integer> first = CompletableFuture
    2
    .supplyAsync(() -> {
    3
    if (true) {
    4
    throw new RuntimeException("main error!");
    5
    }
    6
    return "hello world";
    7
    })
    8
    .thenApply(data -> 1)
    9
    .exceptionally(e -> {
    10
    e.printStackTrace(); // 异常捕捉处理,前面两个处理环节的日常都能捕获
    11
    return 0;
    12
    });

completeExceptionally

  • 如果你想让 CompletableFuture 的结果就是异常的话,可以使用 completeExceptionally() 方法为其赋值。

  • 1
    CompletableFuture<String> completableFuture = new CompletableFuture<>();
    2
    // ...
    3
    completableFuture.completeExceptionally(
    4
    new RuntimeException("Calculation failed!"));
    5
    // ...
    6
    completableFuture.get(); // ExecutionException

handle也返回CompletableFuture对象,但是对象的值和原来的CompletableFuture计算的值不同。当原先的CompletableFuture的值计算完成或者抛出异常的时候,会触发这个CompletableFuture对象的计算,结果由BiFunction参数计算而得。因此这组方法兼有whenComplete和转换的两个功能。

handle-任务完成或者异常时运行fn,返回值为fn的返回,相比exceptionally而言,即可处理上一环节的异常也可以处理其正常返回值

1
CompletableFuture<Integer> first = CompletableFuture
2
.supplyAsync(() -> {
3
if (true) { throw new RuntimeException("main error!"); }
4
return "hello world";
5
})
6
.thenApply(data -> 1)
7
.handleAsync((data,e) -> {
8
e.printStackTrace(); // 异常捕捉处理 第一个出现异常第二个不执行,直接进入hadle
9
return data;
10
});
11
System.out.println(first.join());
12
--------输出结果--------
13
java.util.concurrent.CompletionException: java.lang.RuntimeException: main error!
14
... 5 more
15
null
1
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
2
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
3
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

串行执行/一元依赖

  • thenApply()

    • 1
      public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)//非异步
      2
      public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)//异步,默认线程池
      3
      public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)//异步,自定义线程池,推荐。

      可以理解为转化,因为有返回值,把上一个cf转变成另一个。

  • thenAccept()

    • 1
      public CompletableFuture<Void> thenAccept(Consumer<? super T> action)//非异步
      2
      public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)//异步,默认线程池
      3
      public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)//异步,自定义线程池,推荐。

      可以理解为消费,没有返回值,只利用上一个的结果作为参数进行处理。

  • thenRun()

    • 1
      public CompletableFuture<Void> thenRun(Runnable action)
      2
      public CompletableFuture<Void> thenRunAsync(Runnable action)
      3
      public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

thenApply() 方法接受一个 Function 实例,用它来处理结果。有返回值

thenAccept() 方法的参数是 Consumer<? super T> 。接收cf的结果,但是没有返回值

thenRun() 的方法是的参数是 Runnable 。不接受结果,相当于上一个执行后,自己做自己的事情,有时间关系,没有数据传输关系。

因此,你可以根据方法的参数的类型来加速你的记忆。Runnable类型的参数会忽略计算的结果,Consumer是纯消费计算结果,BiConsumer会组合另外一个CompletionStage纯消费,Function会对计算结果做转换,BiFunction会组合另外一个CompletionStage的计算结果做转换。

CompletionStage表示cf的一个阶段,参数是CompletionStage,标识一个cf作为参数。

并行执行---组合/二元依赖,thencompose算一元依赖

**thenCompose() **

  • 1
    public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
    2
    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
    3
    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

    这一组方法接受一个Function作为参数,这个Function的输入是当前的CompletableFuture的计算值,返回结果将是一个新的CompletableFuture,这个新的CompletableFuture会组合原来的CompletableFuture和函数返回的CompletableFuture。

    接受上一个任务的返回值作为参数,存在先后顺序有返回值。

    • 类似thenApply(区别是thenCompose的返回值是CompletionStage,thenApply则是返回 U),提供该方法为了和其他CompletableFuture任务更好地配套组合使用。Function<T,R>,T是输入,R是返回。

    因此它的功能类似:

    1
    A +--> B +---> C
    1
    //第一个异步任务,常量任务
    2
    CompletableFuture<String> f = CompletableFuture.completedFuture("OK");
    3
    //第二个异步任务
    4
    ExecutorService executor = Executors.newSingleThreadExecutor();
    5
    CompletableFuture<String> future = CompletableFuture
    6
    .supplyAsync(() -> "hello world", executor)
    7
    .thenComposeAsync(data -> {
    8
    System.out.println(data); return f; //使用第一个任务作为返回,也可以自定义一个completableFuture
    9
    }, executor);
    10
    System.out.println(future.join());
    11
    executor.shutdown();
    12
    --------输出结果--------
    13
    hello world
    14
    OK //返回的是第一任务,得到的值就是第一个的ok
    15
    9 collapsed lines
    16
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    17
    return 100;
    18
    });
    19
    CompletableFuture<String> f = future.thenCompose( i -> {
    20
    return CompletableFuture.supplyAsync(() -> {
    21
    return (i * 10) + ""; //compose返回值是CompletionStage。retuen
    22
    });
    23
    });
    24
    System.out.println(f.get()); //1000

thenCombine()

  • 1
    public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
    2
    public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
    3
    public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

    两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序,other并不会等待先前的CompletableFuture执行完毕后再执行。

    thenCombine是,两个CompletableFuture并行执行完,然后执行fn,依赖上两个任务的结果,有返回值

    thenCompose()thenCombine() 有什么区别呢?

    • thenCompose() 可以两个 CompletableFuture 对象,并将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。
    • thenCombine() 会在两个任务都执行完成后,把两个任务的结果合并。两个任务是并行执行的,它们之间并没有先后依赖顺序。

runAfterBoth

1
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
2
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
3
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)

两个CompletableFuture并行执行完,然后执行action,不依赖上两个任务的结果,无返回值

thenAcceptBoth

1
//调用方任务和other并行完成后执行action,action再依赖消费两个任务的结果,无返回值
2
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
3
BiConsumer<? super T, ? super U> action)
4
//两个任务异步完成,fn再依赖消费两个任务的结果,无返回值,使用默认线程池
5
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
6
BiConsumer<? super T, ? super U> action)
7
//两个任务异步完成,fn(用指定线程池执行)再依赖消费两个任务的结果,无返回值
8
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
9
BiConsumer<? super T, ? super U> action, Executor executor)

两个CompletableFuture并行执行完,然后执行action,依赖上两个任务的结果,无返回值

1
thenAcceptBoth和runAfterBoth是当两个CompletableFuture都计算完成,either是当任意一个CompletableFuture计算完成的时候就会执行。

Either

1
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
2
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
3
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
4
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
5
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
6
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

acceptEither方法是当任意一个CompletionStage完成的时候,action这个消费者就会被执行。这个方法返回CompletableFuture

applyToEither方法是当任意一个CompletionStage完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。

多元依赖:依赖多个CF

default

如上图红色链路所示,整个流程的结束依赖于三个步骤CF3、CF4、CF5,这种多元依赖可以通过allOfanyOf方法来实现,区别是当需要多个依赖全部完成时使用allOf,当多个依赖中的任意一个完成即可时使用anyOf,如下代码所示:

1
CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);
2
CompletableFuture<String> result = cf6.thenApply(v -> {
3
//这里的join并不会阻塞,因为传给thenApply的函数是在CF3、CF4、CF5全部完成时,才会执行 。
4
result3 = cf3.join();
5
result4 = cf4.join();
6
result5 = cf5.join();
7
//根据result3、result4、result5组装最终result;
8
return "result";
9
});

注意点

1. Future需要获取返回值,才能获取异常信息

1
ExecutorService executorService = new ThreadPoolExecutor(5, 10, 5L,
2
TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
3
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
4
int a = 0;
5
int b = 666;
6
int c = b / a;
7
return true;
8
},executorService).thenAccept(System.out::println);
9
10
//如果不加 get()方法这一行,看不到异常信息
11
//future.get();

Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。小伙伴们使用的时候,注意一下哈,考虑是否加try…catch…或者使用exceptionally方法。

参考文章

基础篇:异步编程不会?我教你啊!CompletableFuture(JDK1.8) - 掘金 (juejin.cn)

CompletableFuture原理与实践-外卖商家端API的异步化 - SegmentFault 思否

Java CompletableFuture 详解 (colobu.com)

CompletableFuture入门 | JavaGuide

异步编程利器:CompletableFuture详解 |Java 开发实战 - 掘金 (juejin.cn)

本文标题:CompletableFuture异步线程
文章作者:Zane
发布时间:2024-11-15