CompletableFuture 异步编排详解

CompletableFuture 异步编排详解

杰子学编程 26 2022-06-01

CompletableFuture 异步编排详解

业务场景:

查询商品详情页面逻辑比较复杂,有些数据需要远程调用,必然需要花费更多的时间。

假如商品详情每个页面查询,需要的如下的标准时间完成,那么用户需要10s才能完成。这里我们需采用异步查询,但是比如接口A查询商品信息,而接口B需要查询商品的SKU,接口C需要查询商品供应商等信息,如接口C必须依赖接口A或接口B的返回值。那么我们就需要使用CompletableFuture接口来实现。

一、开启异步编程

runAsync:无入参、无返回值

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) {
        System.out.println("main start ...");
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("开启异步任务...");
        }, service);
        System.out.println("main end ...");
    }
}

执行结果:

main start ...

main end ...

开启异步任务...

supplyAsync :无入参,可以获取返回值

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    @SneakyThrows
    public static void main(String[] args) {
        System.out.println("main start ...");
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务...");
            return "开启异步任务,我是返回值";
        }, service);
        System.out.println("获取异步任务返回值:" + future.get());
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
开启异步任务...
获取异步任务返回值:开启异步任务,我是返回值
main end ...

二、计算完成回调

当我们想第一个异步任务执行完成后,还需要做其他的事情。我们的CompletableFuture提供了计算完成时回调方法,whenCompletewhenCompleteAsyncexceptionally等接口。

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

whenComplete 可以处理正常和异常的计算结果,exceptionally: 处理异常情况。

whenCompletewhenCompleteAsync 的区别是whenComplete 是执行当前任务的线程继续执行whenComplete的任务。

whenCompleteAsync: 是把whenCompleteAsync的任务继续提交给线程池来进行执行。

whenCompleteAsync

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    @SneakyThrows
    public static void main(String[] args) {
        System.out.println("main start ...");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务...");
            int i = 10 / 2;
            return i;
        }, service).whenCompleteAsync((res, exc) -> {
            System.out.println("异步任务完成了,执行结果是:" + res + "  异常是:" + exc);
        });
        System.out.println("获取异步任务返回值:" + future.get());
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
开启异步任务...
异步任务完成了,执行结果是:5  异常是:null
获取异步任务返回值:5
main end ...	

如果异步任务出现了异常,可以通过exc打印异常,我们在程序中设置一个运行时异常 ,如下:

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务...");
            int i = 10 / 2;
            if (i == 5) {
                throw new RuntimeException("远程服务调用失败");
            }
            return i;
        }, service).whenCompleteAsync((res, exc) -> {
            System.out.println("异步任务完成了,执行结果是:" + res + "  异常是:" + exc);
        });
        System.out.println("获取异步任务返回值:" + future.get());
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
开启异步任务...
异步任务完成了,执行结果是:null  异常是:java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
main end ...

exceptionally

上面异步任务出现了异常,我们可以使用exceptionally进行异常处理。exceptionally 接口可以接收一个异常,返回异常处理结果。

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务...");
            int i = 10 / 2;
            if (i == 5) {
                throw new RuntimeException("远程服务调用失败");
            }
            return i;
        }, service).whenCompleteAsync((res, exc) -> {
            System.out.println("异步任务完成了,执行结果是:" + res + "  异常是:" + exc);
        }).exceptionally(throwable -> {
            System.out.println("进入了异常处理,捕获了" + throwable.getMessage() + "异常");
            return 5;
        });
        System.out.println("获取异步任务返回值:" + future.get());
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
开启异步任务...
异步任务完成了,执行结果是:null  异常是:java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
进入了异常处理,捕获了java.lang.RuntimeException: 远程服务调用失败异常
获取异步任务返回值:5
main end ...

我们可以看到,通过exceptionally可以捕获异步任务抛出来的异常信息,并对异常进行处理,并可以将处理结果返回。

whenComplete虽然可以得到异常信息,但是无法修改结果,exceptionally可以感知异常,同时可以返回默认值。

三、handle最终处理

handle和whenComplete方法类似,但是whenComplete能感知异常但是不能返回结果。只能通过exceptionally进行处理。

而handle即可以获取执行结果,也可以感知异常信息,并能处理执行结果并返回。

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务...");
            int i = 10 / 2;
            if (i == 5) {
                throw new RuntimeException("远程服务调用失败");
            }
            return i;
        }, service).handleAsync((res, thr) -> {
            System.out.println("进入handleAsync方法");
            if (res != null) {
                return res * 2;
            }
            if (thr != null) {
                System.out.println("捕获到异常" + thr);
                return 0;
            }
            return 0;
        }, service);
        System.out.println("获取异步任务返回值:" + future.get());
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
开启异步任务...
进入handleAsync方法
捕获到异常java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
获取异步任务返回值:0
main end ...

如果我们去掉异常信息,可以看到如下返回值,最终异步执行结果为10;

main start ...
开启异步任务...
进入handleAsync方法
获取异步任务返回值:10
main end ...

四、线程串行化

在CompletableFuture中有以下方法:

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
  • thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回结果,并返回当前任务的返回值
  • thenAccept方法:消费处理结果,接收任务的处理结果,并消费处理,无返回结果
  • thenRun方法:只要上面的任务执行完成,就开始执行thenRun,只是处理完任务后,执行thenRun的后续操作。
    • thenRun 获取不到上个任务的执行结果,无返回值。

thenRun

thenRun 不能获取上一步的执行结果,并无返回值。

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务...");
            int i = 10 / 2;
            return i;
        }, service).thenRun(() -> {
            System.out.println("任务2启动了...");
        });
        System.out.println("获取异步任务返回值:" + future.get());
        System.out.println("main end ...");
    }
}

运行结果:

main start ...
开启异步任务...
任务2启动了...
获取异步任务返回值:null
main end ...

如果我们需要获取上一步的执行结果,我们使用thenAccept;

thenAccept

消费处理结果,接收任务的处理结果,并消费处理,无返回结果

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务...");
            int i = 10 / 2;
            return i;
        }, service).thenAcceptAsync((res) -> {
            System.out.println("任务2启动了... 上一步的结果是:" + res);
        }, service);
        System.out.println("获取异步任务返回值:" + future.get());
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
开启异步任务...
任务2启动了... 上一步的结果是:5
获取异步任务返回值:null
main end ...

如果我们即需要上一步执行结果,并需要返回值供别人使用,那么我们使用thenApply方法

thenApply

当一个线程依赖另一个线程时,获取上一个任务返回结果,并返回当前任务的返回值

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务...");
            int i = 10 / 2;
            return i;
        }, service).thenApplyAsync((res) -> {
            System.out.println("任务2启动了... 上一步的结果是:" + res);
            return res * 2;
        }, service);
        System.out.println("获取异步任务最终返回值:" + future.get());
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
开启异步任务...
任务2启动了... 上一步的结果是:5
获取异步任务最终返回值:10
main end ...

五、两任务组合-两个任务都完才

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,Executor executor);         

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action, Executor executor)


public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)}
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action) }
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,
                                                     Executor executor)}

两个任务必须都完成,触发该任务。

  • runAfterBoth 没有返回值,入参CompletionStage、action;第一个异步任务.runAfterBoth(第二个异步任务,第三个异步任务)
  • thenAcceptBoth 可以获取两个任务的返回值。
  • thenCombine 可以获取两个任务的返回值,并可以将任务三结果返回。

runAfterBoth

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务1...");
            int i = 10 / 2;
            return i;
        }, service);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务2...");
            return "hello";
        }, service);
        future1.runAfterBothAsync(future2, () -> {
            System.out.println("任务3 启动了....");
        }, service);
//        System.out.println("获取异步任务最终返回值:" + future.get());
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
开启异步任务1...
开启异步任务2...
main end ...
任务3 启动了....

可以看到,任务3是在任务1和任务2执行完成后,才执行的。

thenAcceptBoth

我们使用thenAcceptBoth可以感知任务1和任务2的返回值,但是thenAcceptBoth没有返回值。我们看下案例。

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务1...");
            int i = 10 / 2;
            return i;
        }, service);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务2...");
            return "hello";
        }, service);
        future1.thenAcceptBothAsync(future2, (res1, res2) -> {
            System.out.println("任务3 启动了.... 任务1的返回值:" + res1 + " 任务2的返回值:" + res2);
        }, service);
//        System.out.println("获取异步任务最终返回值:" + future.get());
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
开启异步任务1...
开启异步任务2...
main end ...
任务3 启动了.... 任务1的返回值:5 任务2的返回值:hello

我们可以看到,任务3在任务1和任务2执行后执行了,并获取了任务1和任务2的返回值。

thenCombine

可以获取两个任务的返回值,并可以将任务三结果返回

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务1...");
            int i = 10 / 2;
            return i;
        }, service);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务2...");
            return "hello";
        }, service);
        CompletableFuture<String> stringCompletableFuture = future1.thenCombineAsync(future2, (res1, res2) -> {
            System.out.println("任务3 启动了.... 任务1的返回值:" + res1 + " 任务2的返回值:" + res2);
            return res1 + "-->" + res2;
        }, service);
        System.out.println("获取异步任务最终返回值:" + stringCompletableFuture.get());
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
开启异步任务1...
开启异步任务2...
任务3 启动了.... 任务1的返回值:5 任务2的返回值:hello
获取异步任务最终返回值:5-->hello
main end ...

六、两任务组合-一个任务执行

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,
                                                       Executor executor)
                                                    
                                                    
public CompletableFuture<Void> acceptEither(
  			CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(
  			CompletionStage<? extends T> other, Consumer<? super T> action)}
public CompletableFuture<Void> acceptEitherAsync(
  			CompletionStage<? extends T> other, Consumer<? super T> action,Executor executor)}

public <U> CompletableFuture<U> applyToEither(
  			CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(
  			CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor)

当两个任务中,任意一个future任务完成的时候,执行任务。

  • applyToEither 两个任务有一个任务执行完成,获取它的返回值,处理任务并有新的返回值。
  • acceptEither 两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
  • runAfterEither 两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。

runAfterEitherAsync

不感知结果,自己没有返回值。

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务1...");
            int i = 10 / 2;
            return i;
        }, service);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("开启异步任务2...");
            return "hello";
        }, service);
        future1.runAfterEitherAsync(future2, () -> {
            System.out.println("任务3 启动了....");
        }, service);
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
开启异步任务1...
main end ...
任务3 启动了....
开启异步任务2...

我们可以看到,任务1执行完成后,任务3不需要等待任务2执行完成,即可启动任务3。但是使用runAfterEitherAsync不能感知任务的返回值,自身也无返回值。

acceptEither

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务1...");
            int i = 10 / 2;
            return i;
        }, service);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("开启异步任务2...");
            return 10;
        }, service);
        future1.acceptEitherAsync(future2, (res) -> {
            System.out.println("任务3 启动了...., 任务结果是:" + res);
        }, service);
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
开启异步任务1...
main end ...
任务3 启动了...., 任务结果是:5
开启异步任务2...

可以看到,可以获取任务1的执行结果,但不返回执行结果。

applyToEither

可以感知结果,并返回执行结果。

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("开启异步任务1...");
            int i = 10 / 2;
            return i;
        }, service);
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("开启异步任务2...");
            return 10;
        }, service);
        CompletableFuture<String> stringCompletableFuture = future1.applyToEitherAsync(future2, (res) -> {
            System.out.println("任务3 启动了...., 上个任务结果是:" + res);
            return "我是任务三的返回值, 上个任务的执行结果是:" + res;
        }, service);
        System.out.println(stringCompletableFuture.get());
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
开启异步任务1...
任务3 启动了...., 上个任务结果是:5
我是任务三的返回值, 上个任务的执行结果是:5
main end ...
开启异步任务2...

七、多任务组合

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
  • allOf:等待所有任务完成
  • anyOf: 只要有一个任务完成
public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("查询商品图片...");
            return "图片地址";
        }, service);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("查询商品属性...");
            return "黑色 256G";
        }, service);
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("查询商品品牌...");
            return "苹果手机";
        }, service);
        CompletableFuture<Void> future = CompletableFuture.allOf(future1, future2, future3);
        future.get();//等待索引结果完成
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
查询商品图片...
查询商品属性...
查询商品品牌...
main end ...

注:如果不使用future.get()阻塞,若其中一个任务执行时间较长,则可能会丢失任务信息。

anyOf

public class CompletableFutureDemo {
    /**
     * 定义线程池
     */
    public static ExecutorService service = Executors.newFixedThreadPool(5);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main start ...");
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("查询商品图片...");
            return "图片地址";
        }, service);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("查询商品属性...");
            return "黑色 256G";
        }, service);
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("查询商品品牌...");
            return "苹果手机";
        }, service);
        CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(future1, future2, future3);
        System.out.println("第一个执行成功的数据:" + objectCompletableFuture.get());
        System.out.println("main end ...");
    }
}

执行结果:

main start ...
查询商品图片...
查询商品属性...
查询商品品牌...
第一个执行成功的数据:图片地址
main end ...

# Java # 多线程