您好,欢迎来到爱问旅游网。
搜索
您的当前位置:首页一篇文章搞清楚Java中CompletableFuture的使用

一篇文章搞清楚Java中CompletableFuture的使用

来源:爱问旅游网

—————————— Yesterday is history, tomorrow is a mystery, but today is a gift. That is why it’s called the present. ——————————


Java中经常使用异步执行任务来提高效率,但是获取异步结果需要阻塞主线程,同时可能导致回调地狱问题(多个future对象之间存在依赖组合关系)。于是在Java 1.8中新增了CompletableFuture,让我们可以随意组合各种异步任务的结果。通俗来说就是我们将各个异步任务组成一个任务链,当异步任务完成时会触发这个任务链的下一步执行。首先看下官方说明:

内容翻译如下,这篇文章不探究CompletableFuture的实现过程,只了解CompletableFuture如何使用。

当两个或者多个任务完成、抛出异常、取消、或者只有其中一个成功时,我们可以显式地设置future对象的值和状态,将其当做一个CompletionStage对象,并且支持在任务完成时触发的相关函数和操作。
除了这些可以直接操作对象状态和结果的方法之外,CompletableFuture实现了CompletionStage接口,具有以下规则:
1.非异步方法可能由完成当前CompletableFuture的线程执行,也可以由完成方法的任何其他调用者执行。(也就是说非异步方法,例如thenApply可能由主线程完成,也可能由调用方的线程完成)
2.所有没有显式指定线程池的异步方法都会使用ForkJoinPool.commonPool执行,为了简化代码监控、调试和追踪,所有生成的异步任务都是CompletableFuture.AsynchronousCompletionTask接口的实例。
同时CompletableFuture也实现了Future接口,具有以下规则:
1.因为CompletableFuture不能直接控制需要完成的任务(和FutureTask不同),所以任务取消被视为另一种形式的异常完成。因此方法cancel与completeexception (new CancellationException())是同样的效果,方法iscompletedexception可用于确定CompletableFuture是否以任何异常方式完成。(这里的意思是说用iscompletedexception判断cancel同样会返回true,因为cancel也属于异常完成)
2.在CompletableFuture任务异常完成的情况下,方法get()和get(long, TimeUnit)会抛出一个检查异常,异常原因也就是对应的CompletionException中的原因。为了简化CompletableFuture在大多数代码中的使用,这个类还定义了join()和getNow方法,它们在这些情况下直接抛出CompletionException。(这里的CompletionException是运行时异常,不需要手动处理,也就是不用自己去try-catch)


接下来介绍CompletableFuture中的各种方法,首先新建两个线程池和两个方法用于测试

private static ThreadPoolExecutor poolExecutor1 = new ThreadPoolExecutor(8, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new DefaultThreadFactory("pool1"), new ThreadPoolExecutor.DiscardPolicy());
private static ThreadPoolExecutor poolExecutor2 = new ThreadPoolExecutor(8, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new DefaultThreadFactory("pool2"), new ThreadPoolExecutor.DiscardPolicy());
        
@SneakyThrows
private static void sleep(long millis) {
    Thread.sleep(millis);
}

private static void printf(String msg) {
    System.out.println(msg + ", time is " + new Date().getSeconds() + ", thead is " + Thread.currentThread().getName());
}
1、构建一个CompletableFuture对象

主要有以下三种方法:

  • completedFuture 用于构建一个现成的CompletableFuture对象;
  • runAsync 用于构建一个没有入参也没有出参的任务;
  • supplyAsync 用于构建一个没有入参但是有出参的任务。

runAsync和supplyAsync可以指定线程池,如果不指定,则使用ForkJoinPool的commonPool线程池,从下面例子中future2和future3使用的线程池可以看出。

<U> CompletableFuture<U> completedFuture(U value)
CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
<U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
private static void createDemo() {
    CompletableFuture<String> future1 = CompletableFuture.completedFuture("hello world");
    System.out.println(future1.join());
    CompletableFuture future2 = CompletableFuture.runAsync(() -> {
        printf("runAsync " + "random number: " + new Random().nextInt(10));
    }, poolExecutor1);
    future2.join();
    CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
        printf("supplyAsync");
        return "random number: " + new Random().nextInt(10);
    });
    System.out.println(future3.join());
}

输出结果:
hello world
runAsync random number: 8, time is 24, thead is pool1-1-1
supplyAsync, time is 24, thead is ForkJoinPool.commonPool-worker-9
random number: 4
2、获取CompletableFuture对象的结果

主要有两种方法,get和join。二者的区别在于join方法抛出的是RuntimeException,不需要显式进行处理,而使用get就需要显式捕获异常。get可以设定超时时间,getNow可以设定默认值,当未获取到future值或者出现异常时,则返回设定的默认值,如下示例。

private static void getDemo() {
    CompletableFuture future = CompletableFuture.completedFuture("hello world");
    try {
        System.out.println(future.get());
        System.out.println(future.get(1000, TimeUnit.MILLISECONDS));
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }
    System.out.println(future.join());
    System.out.println(future.getNow("default"));
}
3、下一步动作

分为两类进行介绍,一类是基础动作,包括thenApply、thenAccept和thenRun,和其对应的异步方法thenApplyAsync、thenAcceptAsync和thenRunAsync。
需要注意的是,如果thenApply、thenAccept和thenRun是第一个任务,前面没有其他任务,会使用主线程,可能影响性能,并且之后所有的thenApply、thenAccept和thenRun都会使用主线程,其他情况下会用上一个任务的线程池。
当thenApplyAsync、thenAcceptAsync和thenRunAsync没有指定线程池时,会使用ForkJoinPool的commonPool线程池,不会使用上一个任务的线程池。

  • thenApply 接收上一个future的结果用于计算,并返回一个future对象,有入参和出参;
  • thenAccept 有入参,但是没有出参;
  • thenRun 没有入参,也没有出参。
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenRun(Runnable action)
private static void thenDemo() {
    CompletableFuture<String> future1 = CompletableFuture.completedFuture("hello world");
    CompletableFuture future2 = future1
            .thenApplyAsync(s -> {
                printf("thenApplyAsync");
                return s + ", java";
            }, poolExecutor1)
            .thenAcceptAsync(s -> {
                printf("thenAcceptAsync, " + s);
            })
            .thenRun(() -> {
                printf("thenRun just run");
            });
    future2.join();
}

输出结果:
thenApplyAsync, time is 34, thead is pool1-1-1
thenAcceptAsync, hello world, java, time is 34, thead is ForkJoinPool.commonPool-worker-9
thenRunAsync just run, time is 34, thead is ForkJoinPool.commonPool-worker-9

其他动作则包括applyToEither和acceptEither,以及其对应的异步方法applyToEitherAsync和acceptEitherAsync。applyToEither和acceptEither都表示任意一个future完成即可,区别在于applyToEither接收入参有返回值,acceptEither接收入参但是没有返回值。

<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
private static void otherThenDemo() {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        sleep(2000);
        return "hello world";
    }, poolExecutor1);
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        sleep(1000);
        return "java";
    }, poolExecutor2);
    CompletableFuture<String> future3 = future2.applyToEither(CompletableFuture.supplyAsync(() -> "test"), s -> {
        printf("applyToEither");
        return "applyToEither result: " + s;
    });
    System.out.println(future3.join());
    CompletableFuture future4 = future2.acceptEither(future1, s -> printf("acceptEither string is " + s));
    future4.join();
}

输出结果:
applyToEither, time is 52, thead is main
applyToEither result: test
acceptEither string is java, time is 53, thead is pool2-2-1

可以看到这里applyToEither用的是主线程,而acceptEither用的是future2的线程。需要注意,尽量使用异步方法并且指定线程池,避免因使用主线程影响性能。

4、其他操作

还有一些其他操作,这里一起进行介绍。包括控制CompletableFuture对象状态的方法,cancel、complete、completeExceptionally,获取结果的方法allof、anyof,和其他方法,例如whenComplete。

  • complete和completeExceptionally可以更改CompletableFuture对象的状态,如果已完成则不进行操作,如果未完成,则将complete的值作为CompletableFuture对象的值,或者completeExceptionally抛出异常。
  • cancel则是取消任务,cancel方法有一个mayInterruptIfRunning参数,也就是是否能够打断正在进行的任务。cancel同样属于异常完成,因此isCompletedExceptionally方法会返回true
  • whenComplete则是在CompletableFuture对象完成时执行某些操作
  • allof和anyof分别是获取全部CompletableFuture对象的结果,和获取任意一个CompletableFuture对象的结果
boolean cancel(boolean mayInterruptIfRunning)
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
private static void otherDemo() {
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        sleep(2000);
        return "hello world";
    });
    sleep(1000);
    future1.complete("java");
    System.out.println(future1.join());
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        sleep(1000);
        return "hello world";
    });
    future2.completeExceptionally(new RuntimeException("completeExceptionally"));
    try {
        System.out.println(future2.join());
    } catch (Exception e) {
        System.out.println("error message is: " + e.getMessage());
    }
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        sleep(1000);
        return "hello world";
    }).whenComplete((s, throwable) -> System.out.println("future3 is ok, result is " + s));
    sleep(2000);
}

输出结果:
java
error message is: java.lang.RuntimeException: completeExceptionally
future3 is ok, result is hello world

需要注意的是,由于allof是获取所有CompletableFuture对象的结果,因此无法确认返回的类型,所以allof返回的是CompletableFuture<void>对象,需要使用thenApply方法来处理,示例如下:

private static void allOfDemo() {
    CompletableFuture<Integer> num1 = CompletableFuture.supplyAsync(() -> new Random().nextInt(10));
    CompletableFuture<Integer> num2 = CompletableFuture.supplyAsync(() -> new Random().nextInt(10));
    CompletableFuture<Integer> num3 = CompletableFuture.supplyAsync(() -> new Random().nextInt(10));
    System.out.println(CompletableFuture.allOf(num1, num2, num3)
            .thenApply((Function<Void, Object>) unused -> {
                return "num1: " + num1.join() + " num2: " + num2.join() + " num3: " + num3.join();
            }).join());
}

输出结果:
num1: 0 num2: 1 num3: 9

至此,CompletableFuture中的常用方法都已介绍完毕,后续将会介绍RxJava和Spring Reactor,讲述如何使用“反应式编程”。

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- awee.cn 版权所有 湘ICP备2023022495号-5

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务