CompletableFuture
问题
CompletableFuture 解决了 Future 的什么问题?如何使用 CompletableFuture 进行异步编排?异常处理怎么做?
答案
Future 的局限性
JDK 5 的 Future 接口只能实现最基本的异步:
FutureLimitations.java
Future<String> future = executor.submit(() -> fetchData());
// 问题 1:get() 阻塞当前线程
String result = future.get(); // 一直阻塞 ❌
// 问题 2:无法链式组合多个异步任务
// 问题 3:无法手动完成(设置结果)
// 问题 4:无法注册回调,只能轮询 isDone()
CompletableFuture(JDK 8)
CompletableFuture 实现了 Future 和 CompletionStage 接口,支持函数式的异步编排。
创建异步任务
CreateCompletableFuture.java
// 1. supplyAsync - 有返回值
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
return fetchData(); // 在 ForkJoinPool.commonPool 中执行
});
// 2. runAsync - 无返回值
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> {
sendNotification();
});
// 3. 指定线程池(推荐,避免使用公共 ForkJoinPool)
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> {
return fetchData();
}, executor);
// 4. 手动创建并完成
CompletableFuture<String> cf4 = new CompletableFuture<>();
cf4.complete("手动设置结果"); // 正常完成
cf4.completeExceptionally(new RuntimeException()); // 异常完成
指定线程池
默认使用 ForkJoinPool.commonPool()(共享线程池),不同业务的异步任务应使用独立线程池,避免相互影响。
链式转换(thenApply / thenAccept / thenRun)
ChainingExample.java
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> fetchUserId()) // 异步获取用户 ID
.thenApply(id -> fetchUserName(id)) // 用 ID 查名字(有入参有返回)
.thenApply(name -> "Hello, " + name) // 转换结果
.thenApply(String::toUpperCase); // 再次转换
// thenAccept:有入参,无返回值
CompletableFuture<Void> print = result.thenAccept(System.out::println);
// thenRun:无入参,无返回值(仅在前一步完成后执行)
CompletableFuture<Void> done = result.thenRun(() -> System.out.println("完成"));
| 方法 | 入参 | 返回值 | 用途 |
|---|---|---|---|
thenApply | 前一步结果 | 有 | 转换结果 |
thenAccept | 前一步结果 | 无 | 消费结果 |
thenRun | 无 | 无 | 执行动作 |
每个方法都有对应的 Async 版本(在另一个线程中执行):
// 同步版本:使用前一步的线程执行
cf.thenApply(x -> transform(x));
// 异步版本:使用默认线程池执行
cf.thenApplyAsync(x -> transform(x));
// 异步版本:使用指定线程池执行
cf.thenApplyAsync(x -> transform(x), customExecutor);
组合两个异步任务
thenCompose(flatMap,串行依赖)
ThenCompose.java
// 第二个任务依赖第一个任务的结果
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> fetchUserId())
// thenCompose:接收一个函数,返回 CompletableFuture(避免嵌套)
.thenCompose(id -> CompletableFuture.supplyAsync(() -> fetchUserDetail(id)));
thenCombine(并行合并)
ThenCombine.java
// 两个任务并行执行,结果合并
CompletableFuture<String> nameFuture = CompletableFuture.supplyAsync(() -> fetchName());
CompletableFuture<Integer> ageFuture = CompletableFuture.supplyAsync(() -> fetchAge());
// 两个都完成后合并结果
CompletableFuture<String> result = nameFuture.thenCombine(ageFuture,
(name, age) -> name + " is " + age + " years old");
多任务编排
allOf(全部完成)
AllOf.java
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> fetchA());
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> fetchB());
CompletableFuture<String> cf3 = CompletableFuture.supplyAsync(() -> fetchC());
// 等待所有任务完成(注意:allOf 返回 CompletableFuture<Void>)
CompletableFuture<Void> allDone = CompletableFuture.allOf(cf1, cf2, cf3);
// 获取所有结果
allDone.thenRun(() -> {
String a = cf1.join(); // join() 不抛受检异常
String b = cf2.join();
String c = cf3.join();
System.out.println(a + ", " + b + ", " + c);
});
anyOf(任一完成)
AnyOf.java
// 任一任务完成即返回
CompletableFuture<Object> fastest = CompletableFuture.anyOf(cf1, cf2, cf3);
fastest.thenAccept(result -> System.out.println("最快的结果: " + result));
异常处理
ExceptionHandling.java
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("出错了");
return "success";
})
// exceptionally:出错时返回默认值(类似 catch)
.exceptionally(ex -> {
System.out.println("异常: " + ex.getMessage());
return "default";
});
// handle:无论成功还是失败都执行(类似 try-catch-finally)
CompletableFuture<String> result2 = CompletableFuture
.supplyAsync(() -> riskyOperation())
.handle((value, ex) -> {
if (ex != null) {
return "出错了: " + ex.getMessage();
}
return "成功: " + value;
});
// whenComplete:观察结果,不修改返回值
CompletableFuture<String> result3 = CompletableFuture
.supplyAsync(() -> riskyOperation())
.whenComplete((value, ex) -> {
if (ex != null) {
log.error("任务失败", ex);
} else {
log.info("任务成功: {}", value);
}
});
| 方法 | 出错时 | 成功时 | 返回值 |
|---|---|---|---|
exceptionally | 执行,返回替代值 | 不执行 | 可修改 |
handle | 执行 | 执行 | 可修改 |
whenComplete | 执行 | 执行 | 不可修改 |
超时控制(JDK 9+)
Timeout.java
// JDK 9+
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> slowOperation())
.orTimeout(3, TimeUnit.SECONDS) // 超时抛 TimeoutException
.completeOnTimeout("默认值", 3, TimeUnit.SECONDS); // 超时返回默认值
实战示例:并行调用多个服务
ParallelServiceCalls.java
public UserProfile getUserProfile(long userId) {
// 并行调用三个微服务
CompletableFuture<UserInfo> userFuture =
CompletableFuture.supplyAsync(() -> userService.getUser(userId), executor);
CompletableFuture<List<Order>> orderFuture =
CompletableFuture.supplyAsync(() -> orderService.getOrders(userId), executor);
CompletableFuture<Integer> scoreFuture =
CompletableFuture.supplyAsync(() -> scoreService.getScore(userId), executor);
// 等待三个结果,组装返回
return CompletableFuture.allOf(userFuture, orderFuture, scoreFuture)
.thenApply(v -> {
UserInfo user = userFuture.join();
List<Order> orders = orderFuture.join();
int score = scoreFuture.join();
return new UserProfile(user, orders, score);
})
.join(); // 阻塞获取最终结果
}
常见面试问题
Q1: CompletableFuture 和 Future 的区别?
答案:
| 对比 | Future | CompletableFuture |
|---|---|---|
| 获取结果 | get() 阻塞 | 非阻塞回调 + get()/join() |
| 任务组合 | 不支持 | thenApply、thenCompose、thenCombine |
| 多任务编排 | 不支持 | allOf、anyOf |
| 异常处理 | 只能 try-catch | exceptionally、handle |
| 手动完成 | 不支持 | complete()、completeExceptionally() |
| API 风格 | 命令式 | 函数式 |
Q2: thenApply 和 thenCompose 的区别?
答案:
类似 Stream 的 map 和 flatMap:
- thenApply:
Function<T, U>,返回值直接作为新 CompletableFuture 的结果 - thenCompose:
Function<T, CompletableFuture<U>>,函数本身返回一个 CompletableFuture,避免嵌套
// thenApply → CompletableFuture<CompletableFuture<String>> 嵌套
cf.thenApply(id -> fetchAsync(id));
// thenCompose → CompletableFuture<String> 扁平化
cf.thenCompose(id -> fetchAsync(id));
Q3: get() 和 join() 的区别?
答案:
| 对比 | get() | join() |
|---|---|---|
| 受检异常 | 抛出 ExecutionException(需要 try-catch) | 抛出 CompletionException(非受检) |
| 超时 | 支持 get(timeout, unit) | 不支持 |
| 用途 | 需要超时控制时使用 | 链式调用中更方便 |
Q4: CompletableFuture 默认使用什么线程池?
答案:
默认使用 ForkJoinPool.commonPool()。这是一个全局共享的线程池,默认线程数为 Runtime.getRuntime().availableProcessors() - 1。
生产环境建议:为不同业务传入自定义线程池,避免不同任务竞争同一个线程池资源,也避免慢任务拖垮所有异步操作。
Q5: CompletableFuture 的异常处理策略?
答案:
- exceptionally:只处理异常,返回替代值
- handle:无论成功失败都执行,可修改返回值
- whenComplete:无论成功失败都执行,不可修改返回值(只做观察/记录)
链式调用中异常会传播,直到遇到异常处理方法。
Q6: 如何实现超时控制?
答案:
JDK 9+:orTimeout() 和 completeOnTimeout()。
JDK 8 需要手动实现:
public static <T> CompletableFuture<T> withTimeout(
CompletableFuture<T> future, long timeout, TimeUnit unit) {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.schedule(() -> {
future.completeExceptionally(new TimeoutException("超时"));
}, timeout, unit);
return future;
}
相关链接
- CompletableFuture - Java 17 API
- 线程池 - CompletableFuture 使用的线程池
- Fork/Join 框架 - ForkJoinPool.commonPool
- Stream API - 函数式编程风格对比