Java并发编程(十四)-CompletableFuture
参考文献
Future
-
通常只需要使用将耗时的操作封装在一个Callable对象中,再将它提交给
ExecutorService
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public class FutureTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
public Double call() throws Exception {
return doSomeLongComputation();
}
});
doSomethingElse();
try {
Double result = future.get(1, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
}
}
Future
接口的局限性
- 合并异步计算: 如果你需要将两个异步计算合并为一个,其中第二个计算依赖于第一个计算的结果,
Future
接口本身无法提供直接的支持.你可能需要使用其他机制,如CompletableFuture
,以实现更复杂的任务组合和依赖关系. - 等待所有任务完成:
Future
接口没有提供一个直接的方法来等待一个Future
集合中的所有任务都完成.你可以通过编写自己的等待逻辑来实现这一功能,或者使用CompletableFuture
中的allOf
方法来等待一组CompletableFuture
对象都完成. - 等待最快任务完成:
Future
接口没有提供内置的机制来等待Future
集合中最快的任务完成,并返回其结果.你可以使用CompletableFuture
的anyOf
方法来实现这个需求. - 手动指定
Future
结果:Future
接口没有提供以编程方式手动设置异步操作的结果的方法.一旦Future
被创建,其结果通常由异步操作负责生成.如果你需要手动设置Future
的结果,你可能需要使用CompletableFuture
并结合complete
方法来实现. - 处理
Future
的完成事件:Future
接口没有提供明确的机制来处理Future
的完成事件,并在完成时收到通知.你可以尝试使用CompletableFuture
中的thenApply
,thenAccept
或thenRun
等方法,来进行链式调用或操作Future
的结果.
CompletableFuture
- 它提供了一种方便的异步编程方式,可以以函数式的方式处理异步任务的结果.
CompletableFuture
类实现了Future
和CompletionStage
接口,因此具备了Future
和CompletionStage
接口的所有方法和特性.CompletableFuture
类的主要特点有:- 异步执行任务:
CompletableFuture
可以异步执行任务,不会阻塞当前线程. - 异步任务完成后自动回调:
CompletableFuture
可以在异步任务完成后自动回调指定的函数,以处理异步任务的结果. - 串行和并行执行:
CompletableFuture
可以以串行或并行的方式执行多个异步任务. - 异常处理:
CompletableFuture
可以方便地处理异步任务抛出的异常. - 组合操作:
CompletableFuture
支持对多个异步任务的结果进行组合操作,例如等待所有异步任务完成后再执行下一步操作等.
- 异步执行任务:
创建类
completeFuture
可以用于创建默认返回值runAsync
异步执行,无返回值supplyAsync
异步执行,有返回值anyOf
任意一个执行完成,就可以进行下一步动作allOf
全部完成所有任务,才可以进行下一步任务
1 | CompletableFuture<Double> future = new CompletableFuture<>(); |
使用supplyAsync
1 | CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> calculatePrice(product)); |
supplyAsync
方法接受一个生产者(Supplier
)作为参数,返回一个CompletableFuture
对象,该对象完成异步执行后会读取调用生产者方法的返回值.- 生产者方法会交由
ForkJoinPool
池中某个执行线程(Executor
)运行,但是也可以使用supplyAsync
方法重载版本,传入第二个参数指定不同的执行线程执行生产者方法.
1 | List<CompletableFuture<String>> priceFuture = shops.stream() |
状态取值类
join
阻塞式获取结果,抛出受检异常get
合并等待结果,可以增加超时时间;get和join区别,join只会抛出unchecked异常,get会返回具体的异常getNow
如果结果计算完成或者异常了,则返回结果或异常;否则,返回valueIfAbsent的值isCancelled
isCompletedExceptionally
isDone
控制类
-
用于主动控制CompletableFuture的完成行为
-
complete
-
completeExceptionally
-
cancel
异步回调
-
thenApply, thenApplyAsync
- 如果中间过程出错,不会继续执行下个任务,会直接走到whenComplete ,结果为null,且有异常
- 任务A执行完成执行B,B需要A的结果,同时任务B有返回值
1
2
3
4
5CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(r -> r + 1).thenApply(r -> r + 1).thenRun(() -> {
System.out.println("run 了");
}).join(); -
thenAccept, thenAcceptAsync
- 任务A执行完成执行B,B需要A的结果,但是任务B无返回值
1
2
3
4
5CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(r -> r + 1).thenApply(r -> r + 1).thenAccept(r -> {
System.out.println("accept " + r);
}); -
thenRun, thenRunAsync
- 任务A执行完执行B,并且B不需要A的结果
1
2
3
4
5CompletableFuture.supplyAsync(() -> {
return 1;
}).thenApply(r -> r + 1).thenApply(r -> r + 1).thenRun(() -> {
System.out.println("run 了");
}).join(); -
thenCombine, thenCombineAsync
1
2
3
4CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> shop.getPrice(product))
.thenCombine(CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)),
(price, rate) -> price * rate
); -
thenAcceptBoth, thenAcceptBothAsync
-
runAfterBoth, runAfterBothAsync
-
applyToEither, applyToEitherAsync
- applyToEither 比较两个cf哪个快,哪个快用哪个的结果
- 取效率最优
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public static void main(String[] args) {
CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(11);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task1 run...");
return 1;
});
CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task2 run...");
return 2;
});
CompletableFuture<Integer> cf = cf1.applyToEither(cf2, result -> {
System.out.println("最终结果: " + result);
return result;
});
System.out.println(cf.join());
} -
acceptEither, acceptEitherAsync
-
runAfterEither, runAfterEitherAsync
-
thenCompose, thenComposeAsync
- 两个cf异步执行,合并cf结果
- 异步结果合并
1
2
3
4
5
6
7
8
9
10
11
12
13
14//使用多个异步操作完成流水线操作
public static List<String> findDiscountPriceByCompletableFuture(List<Shop> shops, String productName) {
List<CompletableFuture<String>> collect = shops.stream()
//1. 第一步异步操作: 从远程商店查询商品的原始价格以及对应的折扣
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getProductPrice(productName), EXECUTOR))
//2. 这里可以使用同步操作,因为仅仅涉及到数据的转换
.map(future -> future.thenApply(PriceResult::parse))
//3. 第二个异步操作: 向远程折扣服务传递指定数据并返回折扣后的价格
.map(future -> future.thenCompose(priceResult -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(priceResult), EXECUTOR)))
.collect(Collectors.toList());
return collect.stream().map(CompletableFuture::join).collect(Collectors.toList());
} -
whenComplete, whenCompleteAsync
-
handle, handleAsync
- handle 方法与 thenApply 类似,但它可以处理异常情况.
- 无论前一个阶段是否出现异常,handle 方法都会被执行.
- 如果前一个阶段出现异常,则异常被传递给 handle 方法的异常参数,并执行异常处理.
- 如果前一个阶段正常完成,则前一个阶段的结果将传递给 handle 方法的结果参数,并执行正常结果处理.
-
exceptionally
-
以Async结尾的方法,都是异步方法,对应的没有Async则是同步方法,一般都是一个异步方法对应一个同步方法.
- 后缀带Async的方法,它的意思是,另起一个线程来执行这个任务,不带Async就不是当前任务异步那么默认沿用上个任务的线程执行,如果不传线程池,默认会使用ForkJoin线程池
- 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行.所以有时间测试发现不带Async的使用Main线程执行,正是因为上一步cf已经执行完了
- 如果注册时被依赖的操作还未执行完,则由回调线程执行.
- 后缀带Async的方法,它的意思是,另起一个线程来执行这个任务,不带Async就不是当前任务异步那么默认沿用上个任务的线程执行,如果不传线程池,默认会使用ForkJoin线程池
-
以run开头的方法,其入口参数一定是无参的,并且没有返回值,类似于执行Runnable方法.
-
以supply开头的方法,入口也是没有参数的,但是有返回值
-
以Accept开头或者结尾的方法,入口参数是有参数,但是没有返回值
-
以Apply开头或者结尾的方法,入口有参数,有返回值
-
带有either后缀的方法,表示谁先完成就消费谁
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 HoleLin's Blog!