参考文献

Future

  • 通常只需要使用将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public class FutureTest {
    public static void main(String[] args) {
    ExecutorService executor = Executors.newCachedThreadPool();
    Future<Double> future = executor.submit(new Callable<Double>() {
    @Override
    public Double call() throws Exception {
    return doSomeLongComputation();
    }
    });
    doSomethingElse();
    try {
    Double result = future.get(1, TimeUnit.SECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
    throw new RuntimeException(e);
    }
    }
    }

Future接口的局限性

  • 合并异步计算: 如果你需要将两个异步计算合并为一个,其中第二个计算依赖于第一个计算的结果,Future接口本身无法提供直接的支持.你可能需要使用其他机制,如CompletableFuture,以实现更复杂的任务组合和依赖关系.
  • 等待所有任务完成: Future接口没有提供一个直接的方法来等待一个Future集合中的所有任务都完成.你可以通过编写自己的等待逻辑来实现这一功能,或者使用CompletableFuture中的allOf方法来等待一组CompletableFuture对象都完成.
  • 等待最快任务完成: Future接口没有提供内置的机制来等待Future集合中最快的任务完成,并返回其结果.你可以使用CompletableFutureanyOf方法来实现这个需求.
  • 手动指定Future结果: Future接口没有提供以编程方式手动设置异步操作的结果的方法.一旦Future被创建,其结果通常由异步操作负责生成.如果你需要手动设置Future的结果,你可能需要使用CompletableFuture并结合complete方法来实现.
  • 处理Future的完成事件: Future接口没有提供明确的机制来处理Future的完成事件,并在完成时收到通知.你可以尝试使用CompletableFuture中的thenApply, thenAcceptthenRun等方法,来进行链式调用或操作Future的结果.

CompletableFuture

  • 它提供了一种方便的异步编程方式,可以以函数式的方式处理异步任务的结果.
  • CompletableFuture类实现了FutureCompletionStage接口,因此具备了FutureCompletionStage接口的所有方法和特性.
  • CompletableFuture类的主要特点有:
    1. 异步执行任务: CompletableFuture可以异步执行任务,不会阻塞当前线程.
    2. 异步任务完成后自动回调: CompletableFuture可以在异步任务完成后自动回调指定的函数,以处理异步任务的结果.
    3. 串行和并行执行: CompletableFuture可以以串行或并行的方式执行多个异步任务.
    4. 异常处理: CompletableFuture可以方便地处理异步任务抛出的异常.
    5. 组合操作: CompletableFuture支持对多个异步任务的结果进行组合操作,例如等待所有异步任务完成后再执行下一步操作等.

创建类

  • completeFuture可以用于创建默认返回值
  • runAsync 异步执行,无返回值
  • supplyAsync 异步执行,有返回值
  • anyOf任意一个执行完成,就可以进行下一步动作
  • allOf 全部完成所有任务,才可以进行下一步任务
1
2
3
4
5
6
7
8
9
10
CompletableFuture<Double> future = new CompletableFuture<>();
new Thread(() -> {
try {
double price = calculatePrice(product);
future.complete(price);
} catch (Exception e) {
future.completeExceptionally(e);
}
}).start();
return future;
使用supplyAsync
1
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> calculatePrice(product));
  • supplyAsync 方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值.
  • 生产者方法会交由ForkJoinPool池中某个执行线程(Executor)运行,但是也可以使用supplyAsync 方法重载版本,传入第二个参数指定不同的执行线程执行生产者方法.
1
2
3
4
5
6
7
List<CompletableFuture<String>> priceFuture = shops.stream()
// 使用CompletableFuture以异步的方式做耗时操作
.map(shop -> CompletableFuture.supplyAsync(() -> doSomeLongThing()))
.collect(Collectors.toList());
// 等待所有异步操作结束
return priceFuture.stream().map(CompletableFuture::join)
.collect(Collectors.toList());

状态取值类

  • join 阻塞式获取结果,抛出受检异常
  • get 合并等待结果,可以增加超时时间;get和join区别,join只会抛出unchecked异常,get会返回具体的异常
  • getNow 如果结果计算完成或者异常了,则返回结果或异常;否则,返回valueIfAbsent的值
  • isCancelled
  • isCompletedExceptionally
  • isDone

控制类

  • 用于主动控制CompletableFuture的完成行为

  • complete

  • completeExceptionally

  • cancel

异步回调

  • thenApply, thenApplyAsync

    • 如果中间过程出错,不会继续执行下个任务,会直接走到whenComplete ,结果为null,且有异常
    • 任务A执行完成执行B,B需要A的结果,同时任务B有返回值
    1
    2
    3
    4
    5
    CompletableFuture.supplyAsync(() -> {
    return 1;
    }).thenApply(r -> r + 1).thenApply(r -> r + 1).thenRun(() -> {
    System.out.println("run 了");
    }).join();
  • thenAccept, thenAcceptAsync

    • 任务A执行完成执行B,B需要A的结果,但是任务B无返回值
    1
    2
    3
    4
    5
    CompletableFuture.supplyAsync(() -> {
    return 1;
    }).thenApply(r -> r + 1).thenApply(r -> r + 1).thenAccept(r -> {
    System.out.println("accept " + r);
    });
  • thenRun, thenRunAsync

    • 任务A执行完执行B,并且B不需要A的结果
    1
    2
    3
    4
    5
    CompletableFuture.supplyAsync(() -> {
    return 1;
    }).thenApply(r -> r + 1).thenApply(r -> r + 1).thenRun(() -> {
    System.out.println("run 了");
    }).join();
  • thenCombine, thenCombineAsync

    1
    2
    3
    4
    CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> shop.getPrice(product))
    .thenCombine(CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)),
    (price, rate) -> price * rate
    );
  • thenAcceptBoth, thenAcceptBothAsync

  • runAfterBoth, runAfterBothAsync

  • applyToEither, applyToEitherAsync

    • applyToEither 比较两个cf哪个快,哪个快用哪个的结果
    • 取效率最优
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public static void main(String[] args) {
    CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {
    try {
    TimeUnit.MILLISECONDS.sleep(11);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("task1 run...");
    return 1;
    });
    CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {
    try {
    TimeUnit.MILLISECONDS.sleep(10);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println("task2 run...");
    return 2;
    });
    CompletableFuture<Integer> cf = cf1.applyToEither(cf2, result -> {
    System.out.println("最终结果: " + result);
    return result;
    });
    System.out.println(cf.join());
    }
  • acceptEither, acceptEitherAsync

  • runAfterEither, runAfterEitherAsync

  • thenCompose, thenComposeAsync

    • 两个cf异步执行,合并cf结果
    • 异步结果合并
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    //使用多个异步操作完成流水线操作
    public static List<String> findDiscountPriceByCompletableFuture(List<Shop> shops, String productName) {

    List<CompletableFuture<String>> collect = shops.stream()
    //1. 第一步异步操作: 从远程商店查询商品的原始价格以及对应的折扣
    .map(shop -> CompletableFuture.supplyAsync(() -> shop.getProductPrice(productName), EXECUTOR))
    //2. 这里可以使用同步操作,因为仅仅涉及到数据的转换
    .map(future -> future.thenApply(PriceResult::parse))
    //3. 第二个异步操作: 向远程折扣服务传递指定数据并返回折扣后的价格
    .map(future -> future.thenCompose(priceResult -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(priceResult), EXECUTOR)))
    .collect(Collectors.toList());

    return collect.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
  • whenComplete, whenCompleteAsync

  • handle, handleAsync

    • handle 方法与 thenApply 类似,但它可以处理异常情况.
    • 无论前一个阶段是否出现异常,handle 方法都会被执行.
    • 如果前一个阶段出现异常,则异常被传递给 handle 方法的异常参数,并执行异常处理.
    • 如果前一个阶段正常完成,则前一个阶段的结果将传递给 handle 方法的结果参数,并执行正常结果处理.
  • exceptionally

  • 以Async结尾的方法,都是异步方法,对应的没有Async则是同步方法,一般都是一个异步方法对应一个同步方法.

    • 后缀带Async的方法,它的意思是,另起一个线程来执行这个任务,不带Async就不是当前任务异步那么默认沿用上个任务的线程执行,如果不传线程池,默认会使用ForkJoin线程池
      • 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行.所以有时间测试发现不带Async的使用Main线程执行,正是因为上一步cf已经执行完了
      • 如果注册时被依赖的操作还未执行完,则由回调线程执行.
  • 以run开头的方法,其入口参数一定是无参的,并且没有返回值,类似于执行Runnable方法.

  • 以supply开头的方法,入口也是没有参数的,但是有返回值

  • 以Accept开头或者结尾的方法,入口参数是有参数,但是没有返回值

  • 以Apply开头或者结尾的方法,入口有参数,有返回值

  • 带有either后缀的方法,表示谁先完成就消费谁