并发工具类
问题
CountDownLatch、CyclicBarrier、Semaphore 的区别和应用场景是什么?它们的底层实现原理是什么?
答案
CountDownLatch(倒计时门闩)
一次性的倒计时器,让一个或多个线程等待其他线程完成操作:
// 场景:主线程等待 3 个子任务全部完成
CountDownLatch latch = new CountDownLatch(3); // 计数器初始值 3
for (int i = 0; i < 3; i++) {
final int taskId = i;
new Thread(() -> {
try {
System.out.println("任务 " + taskId + " 执行中...");
Thread.sleep(1000 + taskId * 500);
System.out.println("任务 " + taskId + " 完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 计数器 -1
}
}).start();
}
System.out.println("主线程等待所有任务完成...");
latch.await(); // 阻塞直到计数器为 0
// latch.await(5, TimeUnit.SECONDS); // 超时等待
System.out.println("所有任务已完成!");
原理:基于 AQS 共享模式。state 初始化为 count,每次 countDown() 将 state 减 1,当 state 减到 0 时唤醒所有等待线程。
CountDownLatch 是一次性的,计数器减到 0 后无法重置。如果需要重复使用,请用 CyclicBarrier。
典型场景:
- 主线程等待多个子任务完成(并行初始化)
- 多个线程同时开始(将 count 设为 1,所有线程 await,主线程 countDown 后同时放行)
CyclicBarrier(循环栅栏)
让一组线程互相等待,全部到达栅栏点后一起继续执行,可循环使用:
// 场景:3 个线程各自准备数据,全部就绪后一起处理
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
// 所有线程到达后执行的回调(可选,由最后到达的线程执行)
System.out.println("所有线程已就绪,开始合并处理");
});
for (int i = 0; i < 3; i++) {
final int taskId = i;
new Thread(() -> {
try {
System.out.println("线程 " + taskId + " 准备数据中...");
Thread.sleep(1000 + taskId * 500);
System.out.println("线程 " + taskId + " 准备完毕,等待其他线程");
barrier.await(); // 等待其他线程到达栅栏
// 所有线程到达后继续执行
System.out.println("线程 " + taskId + " 继续执行");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}).start();
}
原理:基于 ReentrantLock + Condition。内部维护一个计数器 count,每个线程调用 await() 时 count 减 1 并阻塞。最后一个线程(count = 0)唤醒所有线程,并重置计数器(可循环使用)。
典型场景:
- 多线程分段计算后汇总(MapReduce)
- 多轮比赛,每轮所有选手就绪后同时开始
CountDownLatch vs CyclicBarrier
| 对比 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 等待方 | 一个或多个线程等待其他线程 | 线程之间互相等待 |
| 计数变化 | countDown() 减计数 | await() 减计数 |
| 可重用 | 不可(一次性) | 可循环使用(reset()) |
| 回调 | 无 | 支持(所有线程到达后执行) |
| 底层 | AQS 共享模式 | ReentrantLock + Condition |
| 异常 | 无特殊处理 | BrokenBarrierException |
Semaphore(信号量)
控制同时访问资源的线程数量,常用于限流:
// 场景:限制同时最多 3 个线程访问资源(如数据库连接池)
Semaphore semaphore = new Semaphore(3); // 3 个许可
for (int i = 0; i < 10; i++) {
final int taskId = i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取一个许可(阻塞)
// semaphore.tryAcquire(1, TimeUnit.SECONDS); // 超时获取
System.out.println("线程 " + taskId + " 获取到许可,正在执行...");
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
System.out.println("线程 " + taskId + " 释放许可");
}
}).start();
}
原理:基于 AQS 共享模式。state 表示可用许可数。acquire() 时 state 减 1,如果 state < 0 则阻塞。release() 时 state 加 1 并唤醒等待线程。
当许可数为 1 时,Semaphore 等价于一个互斥锁。与 ReentrantLock 不同的是,Semaphore 可以由不同线程释放(不要求获取和释放是同一线程)。
典型场景:
- 限流(控制并发数)
- 资源池(连接池、线程池的许可管理)
Exchanger(交换器)
两个线程之间交换数据:
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
String data = "来自线程A的数据";
String received = exchanger.exchange(data); // 交换数据
System.out.println("线程A收到: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
new Thread(() -> {
try {
String data = "来自线程B的数据";
String received = exchanger.exchange(data); // 交换数据
System.out.println("线程B收到: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 输出:
// 线程A收到: 来自线程B的数据
// 线程B收到: 来自线程A的数据
典型场景:遗传算法中两个线程交换染色体数据、双缓冲技术。
Phaser(阶段器,JDK 7)
Phaser 是 CyclicBarrier 和 CountDownLatch 的增强版,支持动态注册/注销参与者和多阶段同步:
Phaser phaser = new Phaser(3); // 3 个参与者
for (int i = 0; i < 3; i++) {
final int id = i;
new Thread(() -> {
// 阶段 1
System.out.println("线程 " + id + " 完成阶段1");
phaser.arriveAndAwaitAdvance(); // 等待所有参与者到达
// 阶段 2
System.out.println("线程 " + id + " 完成阶段2");
phaser.arriveAndAwaitAdvance(); // 等待所有参与者到达
// 注销
phaser.arriveAndDeregister();
}).start();
}
常见面试问题
Q1: CountDownLatch 和 CyclicBarrier 的区别?
答案:
核心区别在于等待关系和可重用性:
- CountDownLatch:一个线程等待其他线程完成,一次性不可重用
- CyclicBarrier:线程之间互相等待,可循环使用
CountDownLatch 的 countDown() 和 await() 可以是不同线程调用,而 CyclicBarrier 的 await() 由参与者线程调用。
Q2: Semaphore 的使用场景?
答案:
- 限流:控制接口的并发访问数量
- 资源池:管理有限资源(数据库连接、文件句柄)
- 生产者-消费者:空位信号量 + 产品信号量(但通常用 BlockingQueue 更方便)
Q3: Semaphore 是公平的还是非公平的?
答案:
Semaphore 支持公平和非公平两种模式:
new Semaphore(3); // 非公平(默认)
new Semaphore(3, true); // 公平
底层与 ReentrantLock 类似:公平模式先检查 AQS 队列是否有等待线程,非公平模式直接 CAS 竞争。
Q4: CountDownLatch 的 countDown() 和 await() 可以在不同线程调用吗?
答案:
可以。CountDownLatch 的设计就是让 countDown() 和 await() 分离:
- 工作线程调用
countDown(),完成一个任务 - 等待线程调用
await(),等待所有任务完成
一个线程也可以多次调用 countDown()。
Q5: CyclicBarrier 的 BrokenBarrierException 何时抛出?
答案:
以下情况会导致栅栏"损坏",其他等待线程收到 BrokenBarrierException:
- 某个等待线程被中断
- 某个等待线程超时
- 栅栏的回调动作抛出异常
- 调用了
barrier.reset()
栅栏损坏后可以调用 reset() 重置。
Q6: 这些工具类的底层实现分别是什么?
答案:
| 工具类 | 底层实现 |
|---|---|
| CountDownLatch | AQS 共享模式,state = count |
| CyclicBarrier | ReentrantLock + Condition |
| Semaphore | AQS 共享模式,state = permits |
| Exchanger | CAS + LockSupport.park/unpark |
| Phaser | 内部状态机 + CAS |
相关链接
- CountDownLatch - Java 17 API
- CyclicBarrier - Java 17 API
- Semaphore - Java 17 API
- Lock 接口与 AQS - AQS 核心原理
- 线程池 - 线程池中的并发控制
- 阻塞队列 - 另一种并发协作工具