跳到主要内容

concurrent.futures

问题

concurrent.futures 模块怎么用?ThreadPoolExecutorProcessPoolExecutor 有什么区别?

答案

统一的接口

concurrent.futures 提供了线程池和进程池的统一高级接口:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

def task(n: int) -> int:
return n * n

# 线程池(IO 密集型)
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(task, i) for i in range(10)]

# as_completed:按完成顺序获取结果
for future in as_completed(futures):
print(future.result())

# map:按提交顺序返回结果
results = list(executor.map(task, range(10)))

# 进程池(CPU 密集型)——接口完全相同
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(task, range(10)))

Future 对象

from concurrent.futures import ThreadPoolExecutor, Future

def slow_task() -> str:
import time
time.sleep(2)
return "完成"

with ThreadPoolExecutor() as executor:
future: Future = executor.submit(slow_task)

print(future.done()) # False — 还没完成
print(future.running()) # True
result = future.result(timeout=5) # 阻塞等待结果
print(future.done()) # True

# 添加回调
future2 = executor.submit(slow_task)
future2.add_done_callback(lambda f: print(f"回调: {f.result()}"))

常见面试问题

Q1: 线程池的 max_workers 设置多少合适?

答案

类型推荐值原因
IO 密集型CPU 核心数 × 5IO 等待时线程不占 CPU
CPU 密集型CPU 核心数更多线程反而增加切换开销
import os
cpu_count = os.cpu_count() # 如 8
# IO 密集型:ThreadPoolExecutor(max_workers=cpu_count * 5)
# CPU 密集型:ProcessPoolExecutor(max_workers=cpu_count)

Q2: submitmap 的区别?

答案

  • submit:提交单个任务,返回 Future,更灵活
  • map:批量提交,按顺序返回结果,更简洁

submit + as_completed 适合想按完成顺序处理的场景;map 适合想保持输入顺序的场景。

相关链接