跳到主要内容

阻塞队列

问题

BlockingQueue 有哪些实现?各自的特点和适用场景是什么?阻塞队列如何实现生产者-消费者模式?

答案

BlockingQueue 接口

BlockingQueue 扩展了 Queue 接口,增加了阻塞的入队/出队操作:

操作抛异常返回特殊值阻塞超时
插入add(e)offer(e) → falseput(e)offer(e, timeout, unit)
移除remove()poll() → nulltake()poll(timeout, unit)
检查element()peek() → null--
核心方法
  • put():队列满时阻塞等待
  • take():队列空时阻塞等待

这是阻塞队列的核心价值——自动实现生产者-消费者的等待/通知机制。

7 种 BlockingQueue 实现

1. ArrayBlockingQueue

基于数组的有界阻塞队列,FIFO 顺序:

ArrayBlockingQueue
// 必须指定容量
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

// 支持公平锁(默认非公平)
BlockingQueue<String> fairQueue = new ArrayBlockingQueue<>(100, true);
  • 底层:数组 + 一把 ReentrantLock + 两个 Condition(notEmpty / notFull)
  • 特点:容量固定,不能扩容,公平/非公平可选
  • 适用:已知容量上限,内存敏感场景

2. LinkedBlockingQueue

基于单向链表的可选有界阻塞队列:

LinkedBlockingQueue
// 默认容量 Integer.MAX_VALUE(近似无界,慎用!)
BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();

// 推荐指定容量
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(1000);
  • 底层:单向链表 + 两把锁(putLock + takeLock),生产和消费可并行
  • 特点:吞吐量通常高于 ArrayBlockingQueue(双锁),不指定容量则近似无界
  • 适用:吞吐量要求高的场景
Executors 中的隐患

Executors.newFixedThreadPool() 使用的就是不指定容量的 LinkedBlockingQueue(容量 = Integer.MAX_VALUE),可能任务堆积导致 OOM。详见 线程池

3. SynchronousQueue

不存储元素的队列,每个入队操作必须等待一个出队操作:

SynchronousQueue
BlockingQueue<String> queue = new SynchronousQueue<>(); // 非公平
BlockingQueue<String> fairQueue = new SynchronousQueue<>(true); // 公平

// put 会阻塞,直到有线程 take
// 相当于生产者和消费者直接"交接"
  • 底层:非公平模式用栈(TransferStack),公平模式用队列(TransferQueue)
  • 特点:容量为 0,不存储元素,直接传递
  • 适用:Executors.newCachedThreadPool() 使用此队列

4. PriorityBlockingQueue

基于的无界优先级阻塞队列:

PriorityBlockingQueue
// 元素必须实现 Comparable 或提供 Comparator
BlockingQueue<Task> queue = new PriorityBlockingQueue<>(11,
Comparator.comparingInt(Task::getPriority));

// 注意:只保证出队时取到优先级最高的,遍历不保证顺序
  • 底层:数组实现的最小堆 + 一把 ReentrantLock
  • 特点:无界(自动扩容),put() 永不阻塞,take() 在空时阻塞
  • 适用:任务有优先级的场景

5. DelayQueue

元素需要延迟一定时间后才能出队:

DelayQueueExample.java
public class DelayedTask implements Delayed {
private final String name;
private final long expireTime; // 到期时间

public DelayedTask(String name, long delay, TimeUnit unit) {
this.name = name;
this.expireTime = System.currentTimeMillis() + unit.toMillis(delay);
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS),
o.getDelay(TimeUnit.MILLISECONDS));
}
}

DelayQueue<DelayedTask> queue = new DelayQueue<>();
queue.put(new DelayedTask("任务A", 5, TimeUnit.SECONDS));
queue.put(new DelayedTask("任务B", 2, TimeUnit.SECONDS));

// take() 会按延迟时间顺序取出(先到期的先出)
DelayedTask task = queue.take(); // 2 秒后返回"任务B"
  • 底层:PriorityQueue + 一把 ReentrantLock + Condition
  • 适用:定时任务、缓存过期、订单超时

6. LinkedTransferQueue(JDK 7)

无界队列,融合了 SynchronousQueue 的直接传递和 LinkedBlockingQueue 的存储能力:

LinkedTransferQueue
LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

// transfer:如果有消费者在等待则直接交给消费者,否则阻塞等待消费者
queue.transfer("data");

// tryTransfer:尝试直接交给消费者,没有消费者则返回 false(不阻塞)
queue.tryTransfer("data");

7. LinkedBlockingDeque

基于双向链表的可选有界双端阻塞队列:

LinkedBlockingDeque
LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(100);
deque.putFirst("头部插入");
deque.putLast("尾部插入");
deque.takeFirst(); // 从头部取
deque.takeLast(); // 从尾部取
  • 适用:工作窃取算法(ForkJoinPool)

实现对比

队列底层边界特点
ArrayBlockingQueue数组有界1 把锁简单可靠
LinkedBlockingQueue链表可选2 把锁吞吐量高
SynchronousQueue无存储0CAS直接传递
PriorityBlockingQueue无界1 把锁优先级排序
DelayQueue无界1 把锁延时出队
LinkedTransferQueue链表无界CAS直接传递+存储
LinkedBlockingDeque链表可选1 把锁双端操作

生产者-消费者模式

ProducerConsumer.java
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

// 生产者
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 100; i++) {
queue.put("产品-" + i); // 队列满则阻塞
System.out.println("生产: 产品-" + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

// 消费者
Thread consumer = new Thread(() -> {
try {
while (true) {
String product = queue.take(); // 队列空则阻塞
System.out.println("消费: " + product);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

producer.start();
consumer.start();

阻塞队列自动处理了生产者-消费者之间的等待/通知,不需要手动使用 wait/notify 或 Condition。


常见面试问题

Q1: ArrayBlockingQueue 和 LinkedBlockingQueue 的区别?

答案

对比ArrayBlockingQueueLinkedBlockingQueue
底层数组单向链表
容量必须指定(有界)可选,默认 Integer.MAX_VALUE
1 把锁(生产消费互斥)2 把锁(生产消费可并行)
内存预分配数组动态创建节点
GC无额外 GC 压力节点创建/回收有 GC 开销
公平性可选不支持公平
吞吐量较低较高(双锁并行)

Q2: SynchronousQueue 的特点?

答案

SynchronousQueue 不存储元素(容量为 0),每次 put() 必须等待一个 take() 匹配。适合直接传递任务的场景(如 CachedThreadPool),吞吐量高。

公平模式用 TransferQueue(FIFO),非公平模式用 TransferStack(LIFO)。

Q3: put() 和 offer() 的区别?

答案

  • put():队列满时阻塞等待,直到有空间
  • offer():队列满时立即返回 false,不阻塞
  • offer(e, timeout, unit):队列满时等待指定时间,超时返回 false

Q4: 阻塞队列在线程池中的作用?

答案

线程池中阻塞队列用于存放等待执行的任务。当核心线程数已满时,新提交的任务进入队列排队。队列的类型和大小直接影响线程池的行为:

  • 无界队列:maximumPoolSize 无效(永远不会创建非核心线程),可能 OOM
  • 有界队列:队列满后才会创建非核心线程,更可控
  • SynchronousQueue:不存储任务,直接创建线程处理

Q5: DelayQueue 的应用场景?

答案

  1. 定时任务调度ScheduledThreadPoolExecutor 内部使用类似原理
  2. 缓存过期:缓存项到期后从 DelayQueue 取出并清理
  3. 订单超时关闭:下单后放入 DelayQueue,30 分钟后取出检查是否支付
  4. 限流:请求放入 DelayQueue,间隔一定时间才能取出

相关链接