asyncio 协程
问题
asyncio 的工作原理是什么?如何使用 async/await?事件循环是怎么回事?
答案
协程基础
import asyncio
async def fetch_data(url: str) -> str:
"""async 函数返回协程对象"""
print(f"开始请求 {url}")
await asyncio.sleep(1) # 模拟 IO 操作,释放控制权
print(f"完成请求 {url}")
return f"数据来自 {url}"
async def main():
# 串行执行
result1 = await fetch_data("url1")
result2 = await fetch_data("url2")
# 总耗时 ~2s
# 并发执行
result1, result2 = await asyncio.gather(
fetch_data("url1"),
fetch_data("url2"),
)
# 总耗时 ~1s
asyncio.run(main())
事件循环
事件循环是 asyncio 的核心,它:
- 维护一个任务队列
- 执行就绪的协程直到遇到
await - 在 IO 完成时唤醒挂起的协程
- 不断循环直到所有任务完成
Task 与并发
import asyncio
async def task(name: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"{name} done"
async def main():
# 创建 Task 实现并发
task1 = asyncio.create_task(task("A", 2))
task2 = asyncio.create_task(task("B", 1))
# 两个任务并发执行
result1 = await task1
result2 = await task2
# gather:收集多个协程的结果
results = await asyncio.gather(
task("C", 1),
task("D", 2),
task("E", 1),
)
# TaskGroup(3.11+,推荐):自动取消
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(task("F", 1))
t2 = tg.create_task(task("G", 2))
# 自动等待所有任务完成,任一失败则取消其余
asyncio.run(main())
超时与取消
import asyncio
async def slow_operation():
await asyncio.sleep(10)
async def main():
# 超时控制
try:
async with asyncio.timeout(3): # 3.11+
await slow_operation()
except TimeoutError:
print("超时了!")
# 取消任务
task = asyncio.create_task(slow_operation())
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("任务被取消")
异步迭代与异步生成器
import asyncio
async def async_range(n: int):
"""异步生成器"""
for i in range(n):
await asyncio.sleep(0.1)
yield i
async def main():
# 异步 for 循环
async for i in async_range(5):
print(i)
# 异步推导式
values = [i async for i in async_range(5)]
常见面试问题
Q1: await 的作用是什么?
答案:
await 做两件事:
- 暂停当前协程的执行,将控制权交还事件循环
- 等待被 await 的协程/Future 完成,然后恢复执行
只有在 async def 函数内才能使用 await,只能 await 可等待对象(协程、Task、Future)。
Q2: asyncio.gather 和 asyncio.TaskGroup 的区别?
答案:
| 特性 | gather | TaskGroup(3.11+) |
|---|---|---|
| 错误处理 | return_exceptions=True 收集 | 任一失败自动取消其余 |
| 结构 | 函数调用 | 上下文管理器 |
| 取消行为 | 需手动处理 | 自动取消和清理 |
| 推荐度 | 兼容旧版本 | 3.11+ 推荐 |
Q3: 如何在同步代码中调用异步函数?
答案:
import asyncio
async def async_func():
return "hello"
# 方式 1:asyncio.run()(顶层入口)
result = asyncio.run(async_func())
# 方式 2:在已有事件循环中
loop = asyncio.get_event_loop()
result = loop.run_until_complete(async_func())
Q4: asyncio 中如何限制并发数?
答案:
import asyncio
async def fetch(url: str, sem: asyncio.Semaphore):
async with sem: # 信号量限制并发
await asyncio.sleep(1)
return f"done: {url}"
async def main():
sem = asyncio.Semaphore(5) # 最多 5 个并发
tasks = [fetch(f"url_{i}", sem) for i in range(20)]
results = await asyncio.gather(*tasks)