Fork/Join 框架
问题
Fork/Join 框架的工作原理是什么?什么是工作窃取算法?ForkJoinPool 和普通线程池有什么区别?
答案
Fork/Join 框架概述
Fork/Join 框架是 JDK 7 引入的一个并行执行框架,基于分治(Divide and Conquer) 思想:
- Fork:将大任务拆分为多个小任务,递归直到足够小
- Join:合并子任务的结果
核心类
| 类 | 说明 |
|---|---|
ForkJoinPool | 执行 ForkJoinTask 的线程池 |
ForkJoinTask<V> | 在 ForkJoinPool 中执行的任务基类 |
RecursiveTask<V> | 有返回值的 ForkJoinTask |
RecursiveAction | 无返回值的 ForkJoinTask |
使用示例:并行求和
ParallelSum.java
public class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000; // 拆分阈值
private final long[] array;
private final int start;
private final int end;
public SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
// 基本情况:任务足够小,直接计算
if (length <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 分治:将任务拆分为两半
int mid = start + length / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
// fork:异步执行左半部分
leftTask.fork();
// 当前线程直接执行右半部分(避免浪费当前线程)
Long rightResult = rightTask.compute();
// join:等待左半部分完成并获取结果
Long leftResult = leftTask.join();
// 合并结果
return leftResult + rightResult;
}
}
// 使用
ForkJoinPool pool = new ForkJoinPool(); // 默认线程数 = CPU 核数
long[] array = LongStream.rangeClosed(1, 10_000_000).toArray();
long result = pool.invoke(new SumTask(array, 0, array.length));
System.out.println("Sum = " + result);
fork/compute 顺序
推荐先 fork() 一个子任务,然后在当前线程 compute() 另一个子任务,最后 join() 等待 fork 的结果。这样可以充分利用当前线程,避免空等。
leftTask.fork(); // 异步执行左任务
Long right = rightTask.compute(); // 当前线程执行右任务
Long left = leftTask.join(); // 等待左任务结果
工作窃取算法(Work-Stealing)
ForkJoinPool 中每个工作线程都有自己的双端队列(Deque):
工作流程:
- 线程 fork 产生的子任务放入自己的 Deque 头部
- 线程从自己的 Deque 头部取任务执行(LIFO,优先处理最近的任务)
- 当自己的 Deque 为空时,从其他线程的 Deque 尾部窃取任务(FIFO)
为什么用双端队列?
- 自己从头部取(LIFO):优先执行最近 fork 的大任务,便于递归分解
- 窃取从尾部取(FIFO):窃取最老的大任务,可以继续分解,减少竞争
ForkJoinPool vs ThreadPoolExecutor
| 对比 | ForkJoinPool | ThreadPoolExecutor |
|---|---|---|
| 任务类型 | 可分解的递归任务 | 独立的异步任务 |
| 队列 | 每线程一个 Deque | 共享一个 BlockingQueue |
| 调度 | 工作窃取 | 从共享队列取任务 |
| 适用场景 | 计算密集型、分治 | 通用异步任务 |
| 线程数默认 | CPU 核数 | 需手动指定 |
| JDK 版本 | JDK 7 | JDK 5 |
ForkJoinPool.commonPool()
JDK 8 引入了一个静态共享的 ForkJoinPool:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
// 线程数 = Runtime.getRuntime().availableProcessors() - 1
以下功能默认使用 commonPool:
CompletableFuture.supplyAsync()/runAsync()Stream.parallel()
commonPool 的风险
所有使用 commonPool 的代码共享同一个线程池,某个慢任务可能拖累所有其他任务。I/O 密集型任务不应使用 commonPool。
在生产环境中,建议为 CompletableFuture 传入自定义线程池。
parallelStream 与 Fork/Join
ParallelStream.java
// parallelStream 底层使用 ForkJoinPool.commonPool()
long sum = LongStream.rangeClosed(1, 10_000_000)
.parallel()
.sum();
// 如果需要使用自定义 ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(8);
long result = customPool.submit(() ->
LongStream.rangeClosed(1, 10_000_000)
.parallel()
.sum()
).get();
使用注意事项
Fork/Join 注意事项
- 避免在 ForkJoinTask 中阻塞 I/O:工作窃取算法假设任务是 CPU 密集型的
- 合理设置阈值:阈值太小导致任务拆分过多(开销大于收益),太大则并行度不够
- 避免在 ForkJoinTask 中使用 synchronized:可能导致工作窃取线程阻塞
- 先 fork 再 compute 再 join:充分利用当前线程
常见面试问题
Q1: 什么是工作窃取算法?
答案:
工作窃取(Work-Stealing)是 ForkJoinPool 的核心调度策略。每个工作线程维护一个双端队列,从自己队列的头部取任务执行。当自己的队列为空时,从其他线程队列的尾部"窃取"任务执行。
好处:
- 充分利用 CPU,减少线程空闲
- 窃取从尾部取,自己从头部取,减少竞争
- 窃取大粒度任务,可继续分解
Q2: Fork/Join 框架适用于什么场景?
答案:
适用于可递归拆分的计算密集型任务,典型场景:
- 大数组排序/求和
- 递归搜索(如盘点文件系统)
- 矩阵运算
- 图遍历
不适用于 I/O 密集型任务(阻塞会浪费工作线程)。
Q3: RecursiveTask 和 RecursiveAction 的区别?
答案:
| 类 | 返回值 | 方法 | 场景 |
|---|---|---|---|
RecursiveTask<V> | 有 | V compute() | 需要返回结果(如求和) |
RecursiveAction | 无 | void compute() | 不需要返回结果(如排序) |
Q4: ForkJoinPool 和普通线程池的区别?
答案:
- 普通线程池(ThreadPoolExecutor):所有线程竞争一个共享队列,适合独立任务
- ForkJoinPool:每个线程有独立的双端队列 + 工作窃取,适合可分解的递归任务
ForkJoinPool 在分治场景下能更高效地利用 CPU。
Q5: CompletableFuture 和 parallelStream 默认用什么线程池?有什么风险?
答案:
都默认使用 ForkJoinPool.commonPool(),这是一个全局共享的线程池。
风险:
- 慢任务拖累所有使用 commonPool 的功能
- I/O 操作阻塞工作线程
- 在 Web 应用中,多个请求共享同一个 commonPool
建议:重要的异步任务使用自定义线程池。
相关链接
- ForkJoinPool - Java 17 API
- RecursiveTask - Java 17 API
- 线程池 - ThreadPoolExecutor 对比
- CompletableFuture - 使用 ForkJoinPool.commonPool
- Stream API - parallelStream 底层原理