# Troubleshooting Slow API Responses

## Question

How do you troubleshoot and optimize a Python web API whose response time is too long?

## Answer

### Troubleshooting workflow
#### 1. Time every request with middleware

`middleware/timing.py`:

```python
import logging
import time

from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

logger = logging.getLogger(__name__)


class TimingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start
        # Alert on slow endpoints
        if duration > 1.0:
            logger.warning(
                f"Slow API: {request.method} {request.url.path} "
                f"took {duration:.3f}s"
            )
        response.headers["X-Response-Time"] = f"{duration:.3f}"
        return response

# Register on the app: app.add_middleware(TimingMiddleware)
```
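The same `perf_counter` pattern also works at function level, which helps narrow a slow request down to a single call. A minimal stdlib sketch (the threshold value and `timed` name are assumptions, not a library API):

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)


def timed(threshold: float = 0.5):
    """Log a warning whenever the wrapped call takes longer than `threshold` seconds."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                duration = time.perf_counter() - start
                if duration > threshold:
                    logger.warning(f"Slow call: {fn.__name__} took {duration:.3f}s")
        return wrapper
    return decorator


@timed(threshold=0.01)
def lookup():
    time.sleep(0.02)  # simulated slow work, will trip the threshold
    return 42
```

Decorating individual suspects this way is cheaper than profiling the whole process and can stay enabled in production.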
#### 2. Profile hot paths with cProfile

`debug/profile_api.py`:

```python
import cProfile
import pstats
from io import StringIO


def profile_endpoint():
    """Profile a slow endpoint with cProfile."""
    profiler = cProfile.Profile()
    profiler.enable()
    result = slow_business_logic()  # the suspect endpoint logic
    profiler.disable()

    stream = StringIO()
    stats = pstats.Stats(profiler, stream=stream).sort_stats("cumulative")
    stats.print_stats(20)  # top 20 functions by cumulative time
    print(stream.getvalue())
    return result
```
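In practice it is convenient to wrap this as a decorator so any suspect function can be profiled without editing its body. A stdlib-only sketch (the `profiled` name is made up for illustration):

```python
import cProfile
import functools
import pstats
from io import StringIO


def profiled(top: int = 10):
    """Profile each call of the wrapped function and print the hottest entries."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            profiler = cProfile.Profile()
            profiler.enable()
            try:
                return fn(*args, **kwargs)
            finally:
                profiler.disable()
                stream = StringIO()
                pstats.Stats(profiler, stream=stream) \
                    .sort_stats("cumulative") \
                    .print_stats(top)
                print(stream.getvalue())
        return wrapper
    return decorator


@profiled(top=5)
def handler():
    # stand-in for real endpoint work
    return sum(i * i for i in range(10_000))
```

For production processes where you cannot restart with instrumentation, a sampling profiler such as py-spy can attach to a running PID instead.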
#### 3. Catch slow database queries

`debug/slow_query.py`:

```python
import logging
import time

from sqlalchemy import event
from sqlalchemy.engine import Engine

logger = logging.getLogger(__name__)

# Built-in option: log every statement SQLAlchemy executes
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

# Custom slow-query detection
@event.listens_for(Engine, "before_cursor_execute")
def before_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info["query_start"] = time.perf_counter()

@event.listens_for(Engine, "after_cursor_execute")
def after_execute(conn, cursor, statement, parameters, context, executemany):
    duration = time.perf_counter() - conn.info["query_start"]
    if duration > 0.5:  # slower than 500 ms
        logger.warning(f"Slow query ({duration:.3f}s): {statement[:200]}")
```
### Common optimizations

`optimizations.py`:

```python
# 1. Fix N+1 queries
# ❌ N+1: one query for the users, then one more per user
users = session.query(User).all()
for user in users:
    print(user.orders)  # each access fires another SQL query

# ✅ eager-load the relationship instead
from sqlalchemy.orm import joinedload

users = session.query(User).options(joinedload(User.orders)).all()

# 2. Run external calls concurrently
import asyncio
import httpx

async def get_user_data(user_id: int):
    async with httpx.AsyncClient() as client:
        # Fire the three requests concurrently instead of sequentially
        profile, orders, notifications = await asyncio.gather(
            client.get(f"/api/profile/{user_id}"),
            client.get(f"/api/orders/{user_id}"),
            client.get(f"/api/notifications/{user_id}"),
        )
    return {
        **profile.json(),
        "orders": orders.json(),
        "notifications": notifications.json(),
    }

# 3. Speed up serialization with orjson
import orjson
from fastapi import Response

@app.get("/users")
async def list_users():
    users = await get_users()
    return Response(
        content=orjson.dumps(users),
        media_type="application/json",
    )

# 4. Cache hot data
from functools import lru_cache

@lru_cache(maxsize=256)
def get_config(key: str) -> str:
    return db.query(Config).filter_by(key=key).first().value
```
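One caveat with `lru_cache`: entries never expire, so a changed config row stays invisible until the process restarts. A small TTL wrapper avoids that; this is a sketch (the `ttl_cache` name and TTL values are assumptions), not a library API:

```python
import time
from functools import wraps


def ttl_cache(ttl: float = 60.0):
    """Cache results per positional-argument tuple, recomputing after `ttl` seconds."""
    def decorator(fn):
        store = {}  # args -> (expires_at, value)

        @wraps(fn)
        def wrapper(*args):
            now = time.monotonic()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]          # still fresh: serve from cache
            value = fn(*args)
            store[args] = (now + ttl, value)
            return value
        return wrapper
    return decorator


calls = []

@ttl_cache(ttl=0.05)
def get_config(key: str) -> str:
    calls.append(key)                  # track real computations
    return f"value-for-{key}"


get_config("a")
get_config("a")    # served from cache, no new computation
time.sleep(0.06)
get_config("a")    # TTL expired, recomputed
print(len(calls))  # 2
```

For multi-process deployments the same pattern is usually moved into Redis with `SETEX`, since an in-process dict is not shared between workers.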
### Common interview questions

**Q1: What are the common causes of a slow endpoint?**

Answer:

| Cause | How to diagnose | How to fix |
|---|---|---|
| Slow SQL | SQLAlchemy query logging | Add indexes, rewrite the query |
| N+1 queries | Count SQL statements per request | `joinedload` / `selectinload` |
| Slow external API | Timing middleware | Concurrent async calls, caching, timeouts |
| Slow serialization | cProfile | orjson, trim response fields |
| Heavy computation | cProfile / py-spy | Cache results, move to background tasks |
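The "count SQL statements per request" check in the table maps onto the `after_cursor_execute` hook shown earlier; the core pattern, stripped of SQLAlchemy (a stdlib sketch with a fake `execute` callable and an arbitrary limit), looks like:

```python
class QueryCounter:
    """Wraps a DB execute callable and counts invocations to flag suspected N+1."""

    def __init__(self, execute, limit: int = 10):
        self._execute = execute
        self.limit = limit
        self.count = 0

    def __call__(self, statement, *args, **kwargs):
        self.count += 1
        return self._execute(statement, *args, **kwargs)

    @property
    def suspicious(self) -> bool:
        return self.count > self.limit


# Fake execute standing in for a real cursor.execute
execute = QueryCounter(lambda stmt: f"rows for {stmt}", limit=3)

execute("SELECT * FROM users")
for user_id in range(5):  # classic N+1 shape: one query per returned row
    execute(f"SELECT * FROM orders WHERE user_id={user_id}")

print(execute.count, execute.suspicious)  # 6 True
```

In tests, asserting a per-request query budget this way catches N+1 regressions before they reach production.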
**Q2: How do you set sensible timeouts?**

Answer:

```python
import asyncio

import httpx
from fastapi.responses import JSONResponse
from sqlalchemy import create_engine

# Database: pool checkout and connection timeouts
engine = create_engine(url, pool_timeout=5, connect_args={"connect_timeout": 3})

# HTTP client timeouts (httpx.Timeout needs a default, or all four phases set)
async with httpx.AsyncClient(timeout=httpx.Timeout(10.0, connect=3)) as client:
    ...

# Whole-request deadline via middleware
@app.middleware("http")
async def timeout_middleware(request, call_next):
    try:
        return await asyncio.wait_for(call_next(request), timeout=30)
    except asyncio.TimeoutError:
        return JSONResponse({"error": "timeout"}, status_code=504)
```
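The `wait_for` cutoff is easy to verify standalone, outside any framework. A stdlib sketch (handler name, delays, and payload shape are made up for illustration):

```python
import asyncio


async def slow_handler():
    await asyncio.sleep(0.2)  # simulated slow endpoint work
    return {"status": 200}


async def call_with_deadline(coro, deadline: float):
    """Return the handler result, or a 504-style payload if it misses the deadline."""
    try:
        return await asyncio.wait_for(coro, timeout=deadline)
    except asyncio.TimeoutError:
        return {"status": 504, "error": "timeout"}


fast = asyncio.run(call_with_deadline(slow_handler(), deadline=1.0))
slow = asyncio.run(call_with_deadline(slow_handler(), deadline=0.05))
print(fast["status"], slow["status"])  # 200 504
```

Note that `wait_for` cancels the inner task on timeout; any cleanup the handler needs should live in `finally` blocks so a cancelled request does not leak resources.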
**Q3: Which layer should the cache live in?**

Answer:

- In-process: `lru_cache` or a plain dict (fastest, but not shared between processes)
- Redis: shared across processes and hosts, millisecond-level latency
- CDN: static assets and rarely-changing API responses