asyncio 常见陷阱
问题
使用 asyncio 开发时有哪些常见陷阱?如何避免?
答案
陷阱 1:忘记 await
# ❌ 忘记 await,协程没有执行
async def fetch_data():
return await httpx.get("https://api.example.com/data")
async def main():
result = fetch_data() # 返回的是协程对象,不是结果!
print(type(result)) # <class 'coroutine'>
# ✅ 正确
async def main():
result = await fetch_data()
陷阱 2:在 async 中调用同步阻塞函数
import time
import asyncio
# ❌ 阻塞整个事件循环
async def handler():
time.sleep(5) # 阻塞!其他协程都被卡住
requests.get(url) # 阻塞!
# ✅ 使用 run_in_executor 包装
async def handler():
loop = asyncio.get_event_loop()
# 在线程池中运行阻塞代码
await loop.run_in_executor(None, time.sleep, 5)
result = await loop.run_in_executor(None, requests.get, url)
# ✅ 更好:用异步库替代
async def handler():
await asyncio.sleep(5)
async with httpx.AsyncClient() as client:
result = await client.get(url)
陷阱 3:Task 被垃圾回收
# ❌ Task 没有被引用,可能被 GC 回收
async def main():
asyncio.create_task(background_work()) # 没有保存引用!
# ✅ 保持引用
async def main():
tasks = set()
task = asyncio.create_task(background_work())
tasks.add(task)
task.add_done_callback(tasks.discard)
陷阱 4:异常被吞掉
# ❌ Task 的异常不会自动抛出
async def risky():
raise ValueError("出错了")
async def main():
task = asyncio.create_task(risky())
await asyncio.sleep(1)
# 异常被静默忽略,只有在 await task 时才会抛出
# ✅ 总是 await 或添加回调
async def main():
task = asyncio.create_task(risky())
def handle_exception(t: asyncio.Task):
if t.exception():
logging.error(f"Task failed: {t.exception()}")
task.add_done_callback(handle_exception)
陷阱 5:用列表推导创建协程但不并发
# ❌ 串行执行
async def main():
results = []
for url in urls:
result = await fetch(url) # 一个一个等
results.append(result)
# ✅ 并发执行
async def main():
results = await asyncio.gather(*[fetch(url) for url in urls])
# ✅ 带并发限制
async def main():
sem = asyncio.Semaphore(10)
async def limited_fetch(url):
async with sem:
return await fetch(url)
results = await asyncio.gather(*[limited_fetch(url) for url in urls])
陷阱 6:共享可变状态
# ❌ 异步代码中的竞态条件
counter = 0
async def increment():
global counter
value = counter # 读
await asyncio.sleep(0) # 让出执行权
counter = value + 1 # 写(可能覆盖其他协程的写入)
# ✅ 使用 asyncio.Lock
lock = asyncio.Lock()
async def increment():
global counter
async with lock:
counter += 1
常见面试问题
Q1: asyncio.gather vs asyncio.TaskGroup?
答案:
| 特性 | gather | TaskGroup(3.11+) |
|---|---|---|
| 错误处理 | return_exceptions=True | 自动取消其他任务 |
| 结构化并发 | 否 | 是 |
| 推荐 | 兼容旧版本 | 新代码首选 |
# TaskGroup(推荐)
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(fetch(url1))
task2 = tg.create_task(fetch(url2))
# 任一失败,其余自动取消
Q2: 如何检测事件循环被阻塞?
答案:
# 开启 debug 模式
asyncio.run(main(), debug=True)
# 会打印警告:Executing <Task> took 0.XXX seconds
Q3: 如何优雅关闭异步服务?
答案:
import signal
async def shutdown(signal, loop):
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
loop = asyncio.get_event_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(shutdown(s, loop)))