WD
Classnote Docs课程课件
20

CompletableFuture

它解决了传统 Future 的局限性,提供了更灵活、强大的功能。

提供有序包含执行结果的异步执行方案

一个CompletableFuture对象主要有两个功能:

  • 表示一个Future对象,即可以表示一个异步任务的执行结果
  • 可以实现多个异步任务之间的功能依赖,对多个异步任务之间进行"组合"。

什么是组合呢?比如"异步任务A需要在异步任务B之后执行,那么可以通过CompletableFuture实现任务A执行完毕后,自动触发任务B的执行"

image-20240519161150253
image-20240519161150253

如上图所示,这里描绘的是一个业务接口的流程,其中包括CF1\CF2\CF3\CF4\CF5\CF6 共6个步骤,并描绘了这些步骤之间的依赖关系,每个步骤可以是一次数据库操作或者是一次本地方法调用,或者是一次服务间的调用等,在使用CompletableFuture进行异步化编程时,图中的每个步骤都会产生一个CompletableFuture对象,最终结果也会用一个CompletableFuture来进行表示。

根据CompletableFuture依赖数量,可以分为以下几类:零依赖、一元依赖、二元依赖和多元依赖。

  • runsupply
  • run:无返回值
  • supply:包含执行结果
  • accept:获得前面的执行结果无返回值
  • apply:获得前面的执行结果有返回值
  • 有无Async
  • 无Async:沿用之前的线程
  • 有Async:强制开启新的线程,也可以传参,指定线程
  • then:一元依赖
01 / Section

零依赖

零依赖,其实也就是CompletableFuture对象的创建,我们可以先来看看如何不依赖其他的CompletableFuture对象来创建新的CompletableFuture。

image-20240519161519164
image-20240519161519164

如上图CF1和CF2的创建,其实就是直接创建CompletableFuture对象。

runAsync-无返回值

第一种方式是获取一个表示无返回值异步任务执行结果的CompletableFuture对象

接口

java
// 获取一个异步任务CompletableFuture对象
// 这里没有指定线程池,默认会把当前任务交给CompletableFuture内部的线程池来执行
static CompletableFuture<Void> runAsync(Runnable runnable)

// 在指定的线程池中,执行下一个异步任务
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

*区别*

第一个方法没有传入线程池,那么在执行异步任务的时候,就会由CompletableFuture默认的线程池(ForkJoinPool)来执行 第二个方法传入了线程池,那么执行的时候 由传入的线程池来执行异步任务

而默认的ForkJoinPool中的线程都是守护线程(守护线程的特点是会随着主线程的停止而停止),ForkJoinPool适合于把一个大的任务,拆分成若干个小的任务来执行

使用案例

java
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("Run零依赖无参无返回值的异步任务");
});

CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> System.out.println("异步任务2"));

cf1.join();
cf2.join();

注意事项

  • CompletableFuture<Void>,泛型类型为Void,表示异步任务没有执行结果
  • 一个异步任务虽然没有返回值,但是会有执行状态,而这个执行状态,也被封装在CompletableFuture<Void>对象中,因此我们可以做如下操作
java
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("执行任务");
});

// 异步任务是否被取消了
boolean isCancelled = future.isCancelled();
// 是否在执行过程中出现了异常
boolean isExceptional = future.isCompletedExceptionally();
// 是否正常执行完毕了
boolean done = future.isDone();

supplyAsync-有返回值

第二种方式是获取一个表示有返回值的异步任务执行结果的CompletableFuture对象、

接口

java
/*
	Supplier<U> supplier 提供者接口,产生一个结果
	CompletableFuture<U> 表示supplier在子线程中执行的结果,包含U类型的对象
*/
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    

// 在指定的线程池中,执行下一个异步任务
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
java
public interface Supplier<T> {
    T get();
}

使用案例

java
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
    String result = "hello world";
    return result;
});

CompletableFuture<User> cf2 = CompletableFuture.supplyAsync(() -> {
    User user = User.builder().username("张三").gender("男").birthday(new Date()).build();
    return user;
});

String cf1Result = cf1.join();
User cf2Result = cf2.join();
System.out.println("cf1Result = " + cf1Result);
System.out.println("cf2Result = " + cf2Result);

注意事项

  • CompletableFuture<String>, 泛型类型为String,表示异步任务最终返回的结果是一个String对象
  • CompletableFuture<String>中除了封装异步任务执行的状态,还封装了异步任务的执行结果,可以通过如下方式获取
java
// CompletableFuture的泛型是什么类型,get方法获取的就是什么类型的值,若为Void,则get的结果是null
String result = completableFuture.get();
  • 关于CompletableFuture.get()方法,是一个阻塞方法,会阻塞调用线程,直到CompletableFuture对象所代表的的异步任务执行结束。
02 / Section

一元依赖

image-20240519162415395
image-20240519162415395

如上图所示,CF3和CF5都需要依赖于前一个CompletableFuture对象。

thenRun & thenRunAsync

接口

java
// thenRun方法:在当前异步任务结束后,触发下一个异步任务(thenRun方法传递的参数)无参且没有返回值的异步任务
CompletableFuture<Void> thenRun(Runnable action)

//强制切换线程,如果后续操作非常轻量,使用 thenRunAsync 的线程切换开销可能高于操作本身。
// thenRunAync方法:方法参数和执行结果同thenRun,区别是thenRunAsync方法传递的异步任务所执行的线程是一个新的线程.
// thenRunAsync方法还有一个重载方法,executor指定下一个异步任务执行所使用的线程池
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

使用案例

java
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
    System.out.println("hello world");
});

CompletableFuture<Void> cf2 = cf1.thenRun(() -> {
    System.out.println("无参无返回值的一元依赖异步任务");
});

cf2.join();

或者我们可以使用链式调用的方式,重新书写以上的代码(效果等效),实际开发中我们通常习惯使用链式调用方式

java
CompletableFuture.runAsync(() -> {
    System.out.println("hello world");
}).thenRun(() -> {
    System.out.println("无参无返回值的一元依赖异步任务");
}).join();

thenAccept & thenAcceptAsync

接口

thenAccept方法: 在当前异步任务执行结束后,触发下一个异步任务的执行,且下一个异步任务,接收当前异步任务的结果作为参数执行,下一个异步任务无返回值

java
/*
	参数:
	 action的accept方法的参数表示下一个异步任务所接受的当前异步任务执行的结果
	返回值: 
	 CompletableFuture<Void>表示下一个异步任务的执行结果无返回值
*/
CompletableFuture<Void> thenAccept(Consumer<? super T> action)

// thenAcceptAsync方法: 方法参数和执行结果同thenAccept,区别是thenAcceptAsync方法传递的异步任务所执行的线程是一个新的线程
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
    
// thenAcceptAsync还有一个重载方法,executor指定下一个异步任务执行所使用的线程池
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
java
public interface Consumer<T> {
    
    void accept(T t);
}
java
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "hello ");
CompletableFuture<Void> cf2 = cf1.thenAccept((cf1Result) -> {
    System.out.println(cf1Result + "world");
});

cf2.join();

thenApply & thenApplyAsync

接口

thenApply方法: 在当前异步任务执行结束后,触发下一个异步任务的执行,下一个异步任务接收当前异步任务的结果作为参数,且下一个异步任务有自己的返回值

thenApplyAsync方法:方法参数和执行结果同thenApply,区别是thenApplyAsync方法传递的异步任务所执行的线程是一个新的线程

java
/*
	参数:
	 Function的apply方法第一个参数表示接收的当前异步的结果
	 Function的apply方法返回值表示下一个异步任务的结果
	返回值: 
	 CompletableFuture<U>表示下一个异步任务的执行结果包含下一个异步任务的返回值
*/
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    
// 把异步任务交给默认的线程池来执行
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
    
// 把异步任务交给指定的线程池来执行
CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
java
public interface Function<T, R> {
    // 将一个参数值转化为另一个类型的值并返回
    R apply(T t);
}

使用案例

java
CompletableFuture
    .supplyAsync(() -> "张三")
    .thenApply(name -> {
        User user = User.builder().username(name).gender("男").birthday(new Date()).build();
        return user;
    }).thenAccept(user -> {
        user.setGender("女");
        System.out.println(user);
    }).join();
03 / Section

二元依赖

image-20240519201159136
image-20240519201159136

如上图红色链路所示,CF4同时依赖于两个CF1和CF2,这种二元依赖可以通过thenCombine等方法来实现,如下代码所示

thenCombine & thenCombineAsync

接口

java
/*
	参数:
	 other: 待合并的另一个异步任务
	 T: 第一个任务的返回结果
	 U: 第二个任务的返回结果
	 fn: 组合之后的返回类型
	返回值: 
	 CompletableFuture<U>表示下一个异步任务的执行结果包含下一个异步任务的返回值
*/
public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);

public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
java
@FunctionalInterface
public interface BiFunction<T, U, R> {
	// T 第一个任务的返回结果
	// U 第二个任务的返回结果
	// R 组合之后的返回类型
	R apply(T t, U u);
}

使用

java
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "李四");
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> "男");

CompletableFuture<User> cf3 = cf1.thenCombine(cf2, (v1, v2) -> {
    User user = User.builder()
            .username(v1)
            .gender(v2)
            .build();
    return user;
});

CompletableFuture<User> cf4 = cf3.thenApply(user -> {
    user.setBirthday(new Date());
    return user;
});

User user = cf4.join();
System.out.println("user = " + user);
04 / Section

多元依赖

image-20240519202449036
image-20240519202449036

如上图红色链路所示,整个流程的结束依赖于三个步骤CF3、CF4、CF5,这种多元依赖可以通过allOfanyOf方法来实现,区别是当需要多个依赖全部完成时使用allOf,当多个依赖中的任意一个完成即可时使用anyOf

allOf

allOf方法: 可以将多个异步任务合并为一个新的异步任务,当且仅当所有的异步任务都执行完,新的异步任务才算执行完

java
/*
   参数表示待合并的多个异步任务
   返回值为CompletableFuture<Void>类型,说明合并之后的异步任务不包含任何返回值
*/
CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
java
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
    sleep(3000);
    System.out.println("异步任务1完成");
});
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
    sleep(1000);
    System.out.println("异步任务2完成");
});
CompletableFuture<Void> cf3 = CompletableFuture.runAsync(() -> {
    sleep(5000);
    System.out.println("异步任务3完成");
});
CompletableFuture<Void> cf4 = CompletableFuture.runAsync(() -> {
    sleep(2000);
    System.out.println("异步任务4完成");
});

CompletableFuture<Void> cfAll = CompletableFuture.allOf(cf1, cf2, cf3, cf4);
cfAll.join();
System.out.println("执行完成");

private static void sleep(long time) {
    try {
        Thread.sleep(time);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

anyOf (了解)

allOf方法: 可以将多个异步任务合并为一个新的异步任务,其中只要有一个任务执行完,新的异步任务就算执行完,且会将该任务的执行结果,作为新的异步任务的结果

java
/*
   参数表示待合并的多个异步任务
   返回值为CompletableFuture<Object> 合并之后的异步任务的结果,同先执行完的那个异步任务的结果
*/
CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

注意:如果这些待合并的异步任务中,同时有多个异步任务执行完毕,那么合并之后的新的异步任务的执行结果就是不确定的,具有很大的不确定性,所以实际开发中很少用。

java
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> {
    sleep(3000);
    System.out.println("异步任务1完成");
});
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
    sleep(1000);
    System.out.println("异步任务2完成");
});
CompletableFuture<Void> cf3 = CompletableFuture.runAsync(() -> {
    sleep(5000);
    System.out.println("异步任务3完成");
});
CompletableFuture<Void> cf4 = CompletableFuture.runAsync(() -> {
    sleep(2000);
    System.out.println("异步任务4完成");
});

CompletableFuture<Object> anyOfCf = CompletableFuture.anyOf(cf1, cf2, cf3, cf4);
anyOfCf.join();
System.out.println("执行完成");

private static void sleep(long time) {
    try {
        Thread.sleep(time);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
05 / Section

异常处理

CompletableFuture还给我们提供了一个专门用来处理异常的方法:exceptionally

java
/*
	注意,该方法当且仅当 当前异步任务出现异常的时候才会执行
	作用:
	  通过执行fn,将当前异步任务的异常转化为某一个默认值返回
	返回值:异常处理的结果,即当发生异常时所返回的默认值,默认值的数据类型同当前异步任务的返回值类型
*/
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
java
CompletableFuture<int[]> exceptionally = CompletableFuture.supplyAsync(() -> {
    int[] a = null;
    System.out.println(a[3]);
    return a;
})
.exceptionally(e -> new int[0]);

int[] ints = exceptionally.get();
System.out.println(ints.length);
image-20250513115104402
image-20250513115104402