—————————— Yesterday is history, tomorrow is a mystery, but today is a gift. That is why it’s called the present. ——————————
Java中经常使用异步执行任务来提高效率,但是获取异步结果需要阻塞主线程,同时可能导致回调地狱问题(多个future对象之间存在依赖组合关系)。于是在Java 1.8中新增了CompletableFuture,让我们可以随意组合各种异步任务的结果。通俗来说就是我们将各个异步任务组成一个任务链,当异步任务完成时会触发这个任务链的下一步执行。首先看下官方说明:
内容翻译如下,这篇文章不探究CompletableFuture的实现过程,只了解CompletableFuture如何使用。
当两个或者多个任务完成、抛出异常、取消、或者只有其中一个成功时,我们可以显式地设置future对象的值和状态,将其当做一个CompletionStage对象,并且支持在任务完成时触发的相关函数和操作。
除了这些可以直接操作对象状态和结果的方法之外,CompletableFuture实现了CompletionStage接口,具有以下规则:
1.非异步方法可能由完成当前CompletableFuture的线程执行,也可以由完成方法的任何其他调用者执行。(也就是说非异步方法,例如thenApply可能由主线程完成,也可能由调用方的线程完成)
2.所有没有显式指定线程池的异步方法都会使用ForkJoinPool.commonPool执行,为了简化代码监控、调试和追踪,所有生成的异步任务都是CompletableFuture.AsynchronousCompletionTask接口的实例。
同时CompletableFuture也实现了Future接口,具有以下规则:
1.因为CompletableFuture不能直接控制需要完成的任务(和FutureTask不同),所以任务取消被视为另一种形式的异常完成。因此方法cancel与completeexception (new CancellationException())是同样的效果,方法iscompletedexception可用于确定CompletableFuture是否以任何异常方式完成。(这里的意思是说用iscompletedexception判断cancel同样会返回true,因为cancel也属于异常完成)
2.在CompletableFuture任务异常完成的情况下,方法get()和get(long, TimeUnit)会抛出一个检查异常,异常原因也就是对应的CompletionException中的原因。为了简化CompletableFuture在大多数代码中的使用,这个类还定义了join()和getNow方法,它们在这些情况下直接抛出CompletionException。(这里的CompletionException是运行时异常,不需要手动处理,也就是不用自己去try-catch)
接下来介绍CompletableFuture中的各种方法,首先新建两个线程池和两个方法用于测试
private static ThreadPoolExecutor poolExecutor1 = new ThreadPoolExecutor(8, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new DefaultThreadFactory("pool1"), new ThreadPoolExecutor.DiscardPolicy());
private static ThreadPoolExecutor poolExecutor2 = new ThreadPoolExecutor(8, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new DefaultThreadFactory("pool2"), new ThreadPoolExecutor.DiscardPolicy());
@SneakyThrows
private static void sleep(long millis) {
Thread.sleep(millis);
}
private static void printf(String msg) {
System.out.println(msg + ", time is " + new Date().getSeconds() + ", thead is " + Thread.currentThread().getName());
}
主要有以下三种方法:
runAsync和supplyAsync可以指定线程池,如果不指定,则使用ForkJoinPool的commonPool线程池,从下面例子中future2和future3使用的线程池可以看出。
<U> CompletableFuture<U> completedFuture(U value)
CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
<U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
private static void createDemo() {
CompletableFuture<String> future1 = CompletableFuture.completedFuture("hello world");
System.out.println(future1.join());
CompletableFuture future2 = CompletableFuture.runAsync(() -> {
printf("runAsync " + "random number: " + new Random().nextInt(10));
}, poolExecutor1);
future2.join();
CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
printf("supplyAsync");
return "random number: " + new Random().nextInt(10);
});
System.out.println(future3.join());
}
输出结果:
hello world
runAsync random number: 8, time is 24, thead is pool1-1-1
supplyAsync, time is 24, thead is ForkJoinPool.commonPool-worker-9
random number: 4
主要有两种方法,get和join。二者的区别在于join方法抛出的是RuntimeException,不需要显式进行处理,而使用get就需要显式捕获异常。get可以设定超时时间,getNow可以设定默认值,当未获取到future值或者出现异常时,则返回设定的默认值,如下示例。
private static void getDemo() {
CompletableFuture future = CompletableFuture.completedFuture("hello world");
try {
System.out.println(future.get());
System.out.println(future.get(1000, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
System.out.println(future.join());
System.out.println(future.getNow("default"));
}
分为两类进行介绍,一类是基础动作,包括thenApply、thenAccept和thenRun,和其对应的异步方法thenApplyAsync、thenAcceptAsync和thenRunAsync。
需要注意的是,如果thenApply、thenAccept和thenRun是第一个任务,前面没有其他任务,会使用主线程,可能影响性能,并且之后所有的thenApply、thenAccept和thenRun都会使用主线程,其他情况下会用上一个任务的线程池。
当thenApplyAsync、thenAcceptAsync和thenRunAsync没有指定线程池时,会使用ForkJoinPool的commonPool线程池,不会使用上一个任务的线程池。
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenRun(Runnable action)
private static void thenDemo() {
CompletableFuture<String> future1 = CompletableFuture.completedFuture("hello world");
CompletableFuture future2 = future1
.thenApplyAsync(s -> {
printf("thenApplyAsync");
return s + ", java";
}, poolExecutor1)
.thenAcceptAsync(s -> {
printf("thenAcceptAsync, " + s);
})
.thenRun(() -> {
printf("thenRun just run");
});
future2.join();
}
输出结果:
thenApplyAsync, time is 34, thead is pool1-1-1
thenAcceptAsync, hello world, java, time is 34, thead is ForkJoinPool.commonPool-worker-9
thenRunAsync just run, time is 34, thead is ForkJoinPool.commonPool-worker-9
其他动作则包括applyToEither和acceptEither,以及其对应的异步方法applyToEitherAsync和acceptEitherAsync。applyToEither和acceptEither都表示任意一个future完成即可,区别在于applyToEither接收入参有返回值,acceptEither接收入参但是没有返回值。
<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
private static void otherThenDemo() {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "hello world";
}, poolExecutor1);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "java";
}, poolExecutor2);
CompletableFuture<String> future3 = future2.applyToEither(CompletableFuture.supplyAsync(() -> "test"), s -> {
printf("applyToEither");
return "applyToEither result: " + s;
});
System.out.println(future3.join());
CompletableFuture future4 = future2.acceptEither(future1, s -> printf("acceptEither string is " + s));
future4.join();
}
输出结果:
applyToEither, time is 52, thead is main
applyToEither result: test
acceptEither string is java, time is 53, thead is pool2-2-1
可以看到这里applyToEither用的是主线程,而acceptEither用的是future2的线程。需要注意,尽量使用异步方法并且指定线程池,避免因使用主线程影响性能。
还有一些其他操作,这里一起进行介绍。包括控制CompletableFuture对象状态的方法,cancel、complete、completeExceptionally,获取结果的方法allof、anyof,和其他方法,例如whenComplete。
boolean cancel(boolean mayInterruptIfRunning)
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
private static void otherDemo() {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "hello world";
});
sleep(1000);
future1.complete("java");
System.out.println(future1.join());
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "hello world";
});
future2.completeExceptionally(new RuntimeException("completeExceptionally"));
try {
System.out.println(future2.join());
} catch (Exception e) {
System.out.println("error message is: " + e.getMessage());
}
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "hello world";
}).whenComplete((s, throwable) -> System.out.println("future3 is ok, result is " + s));
sleep(2000);
}
输出结果:
java
error message is: java.lang.RuntimeException: completeExceptionally
future3 is ok, result is hello world
需要注意的是,由于allof是获取所有CompletableFuture对象的结果,因此无法确认返回的类型,所以allof返回的是CompletableFuture<void>对象,需要使用thenApply方法来处理,示例如下:
private static void allOfDemo() {
CompletableFuture<Integer> num1 = CompletableFuture.supplyAsync(() -> new Random().nextInt(10));
CompletableFuture<Integer> num2 = CompletableFuture.supplyAsync(() -> new Random().nextInt(10));
CompletableFuture<Integer> num3 = CompletableFuture.supplyAsync(() -> new Random().nextInt(10));
System.out.println(CompletableFuture.allOf(num1, num2, num3)
.thenApply((Function<Void, Object>) unused -> {
return "num1: " + num1.join() + " num2: " + num2.join() + " num3: " + num3.join();
}).join());
}
输出结果:
num1: 0 num2: 1 num3: 9
至此,CompletableFuture中的常用方法都已介绍完毕,后续将会介绍RxJava和Spring Reactor,讲述如何使用“反应式编程”。
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- awee.cn 版权所有 湘ICP备2023022495号-5
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务