
Debugging Slow API Responses

Question

How do you diagnose and optimize a Python web API whose response time is too long?

Answer

Diagnosis workflow

Timing middleware

middleware/timing.py
import time
import logging
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

logger = logging.getLogger(__name__)

class TimingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start

        # Alert on slow endpoints
        if duration > 1.0:
            logger.warning(
                f"Slow API: {request.method} {request.url.path} "
                f"took {duration:.3f}s"
            )
        response.headers["X-Response-Time"] = f"{duration:.3f}"
        return response
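The same measurement pattern works outside an ASGI stack. A minimal stdlib-only sketch as a plain decorator (the threshold mirrors the 1.0 s default above; `slow_task` is an invented example function):

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)

def timed(threshold: float = 1.0):
    """Log a warning when the wrapped function exceeds `threshold` seconds."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                duration = time.perf_counter() - start
                if duration > threshold:
                    logger.warning("Slow call: %s took %.3fs",
                                   func.__name__, duration)
        return wrapper
    return decorator

@timed(threshold=0.01)
def slow_task() -> str:
    time.sleep(0.05)  # simulate slow work
    return "done"

print(slow_task())  # -> done (plus a "Slow call" warning in the log)
```

Measuring with `time.perf_counter()` in a `finally` block ensures the duration is recorded even when the wrapped call raises.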

Profiling with cProfile

debug/profile_api.py
import cProfile
import pstats
from io import StringIO

def profile_endpoint():
    """Run cProfile against a slow endpoint."""
    profiler = cProfile.Profile()
    profiler.enable()

    # Simulated endpoint logic
    result = slow_business_logic()

    profiler.disable()
    stream = StringIO()
    stats = pstats.Stats(profiler, stream=stream).sort_stats("cumulative")
    stats.print_stats(20)
    print(stream.getvalue())
    return result
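A self-contained version of the same workflow, with a deliberately quadratic `build_report` standing in for `slow_business_logic` (the function name and workload are invented for the example):

```python
import cProfile
import io
import pstats

def build_report(n: int) -> int:
    # Deliberately quadratic work so it dominates the profile
    total = 0
    for i in range(n):
        total += sum(range(i))
    return total

profiler = cProfile.Profile()
profiler.enable()
result = build_report(500)
profiler.disable()

stream = io.StringIO()
pstats.Stats(profiler, stream=stream).sort_stats("cumulative").print_stats(10)
report = stream.getvalue()
print("build_report" in report)  # -> True: the hot function tops the listing
```

Sorting by "cumulative" surfaces the functions whose call trees consume the most time, which is usually what you want when hunting a slow endpoint.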

Slow database queries

debug/slow_query.py
import logging
import time

from sqlalchemy import event
from sqlalchemy.engine import Engine

logger = logging.getLogger(__name__)

# Turn on SQLAlchemy query logging
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

# Custom slow-query detection
@event.listens_for(Engine, "before_cursor_execute")
def before_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info["query_start"] = time.perf_counter()

@event.listens_for(Engine, "after_cursor_execute")
def after_execute(conn, cursor, statement, parameters, context, executemany):
    duration = time.perf_counter() - conn.info["query_start"]
    if duration > 0.5:  # more than 500 ms
        logger.warning(f"Slow query ({duration:.3f}s): {statement[:200]}")
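Once a slow statement is identified, the usual next step is checking whether it can use an index. A toy illustration with the stdlib sqlite3 module (the `users` table, `email` column, and index name are invented for the example):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.executemany(
    "INSERT INTO users (email) VALUES (?)",
    [(f"user{i}@example.com",) for i in range(1000)],
)

query = "SELECT id FROM users WHERE email = ?"

# Before indexing: SQLite falls back to a full table scan
plan_before = conn.execute("EXPLAIN QUERY PLAN " + query,
                           ("user42@example.com",)).fetchone()

conn.execute("CREATE INDEX idx_users_email ON users (email)")

# After indexing: the plan switches to an index search
plan_after = conn.execute("EXPLAIN QUERY PLAN " + query,
                          ("user42@example.com",)).fetchone()

print(plan_before[-1])  # e.g. "SCAN users"
print(plan_after[-1])   # e.g. "SEARCH users USING INDEX idx_users_email (email=?)"
```

Other databases expose the same idea through their own EXPLAIN / EXPLAIN ANALYZE commands.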

Common optimizations

optimizations.py
# 1. Fix N+1 queries
# ❌ N+1: one extra query per user
users = session.query(User).all()
for user in users:
    print(user.orders)  # each user triggers a separate SQL query

# ✅ joinedload eager-loads the relationship in one query
from sqlalchemy.orm import joinedload
users = session.query(User).options(joinedload(User.orders)).all()

# 2. Make external calls async and concurrent
import asyncio
import httpx

async def get_user_data(user_id: int):
    async with httpx.AsyncClient() as client:
        # Fire the three requests concurrently
        profile, orders, notifications = await asyncio.gather(
            client.get(f"/api/profile/{user_id}"),
            client.get(f"/api/orders/{user_id}"),
            client.get(f"/api/notifications/{user_id}"),
        )
    return {
        **profile.json(),
        "orders": orders.json(),
        "notifications": notifications.json(),
    }

# 3. Speed up serialization with orjson
import orjson

@app.get("/users")
async def list_users():
    users = await get_users()
    return Response(
        content=orjson.dumps(users),
        media_type="application/json",
    )

# 4. Cache hot data
from functools import lru_cache

@lru_cache(maxsize=256)
def get_config(key: str) -> str:
    return db.query(Config).filter_by(key=key).first().value
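The payoff of point 2 can be checked with a stdlib-only measurement, with asyncio.sleep standing in for network latency: three simulated 50 ms calls gathered concurrently finish in roughly 50 ms rather than 150 ms.

```python
import asyncio
import time

async def fake_api_call(name: str) -> str:
    await asyncio.sleep(0.05)  # stand-in for a 50 ms network round trip
    return name

async def main() -> float:
    start = time.perf_counter()
    results = await asyncio.gather(
        fake_api_call("profile"),
        fake_api_call("orders"),
        fake_api_call("notifications"),
    )
    elapsed = time.perf_counter() - start
    assert results == ["profile", "orders", "notifications"]
    return elapsed

elapsed = asyncio.run(main())
print(f"{elapsed:.3f}s")  # close to 0.05s, not 0.15s
```

Sequential awaits would pay the latencies in sum; gather pays only the slowest one.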

Common interview questions

Q1: What are the common causes of a slow endpoint?

Answer

| Cause | How to find it | Fix |
| --- | --- | --- |
| Slow SQL | SQLAlchemy logging | Add indexes, rewrite the query |
| N+1 queries | Count emitted SQL statements | joinedload / selectinload |
| Slow external APIs | Timing middleware | Async concurrency, caching, timeouts |
| Slow serialization | cProfile | orjson, trim response fields |
| Heavy computation | cProfile / py-spy | Cache results, move to background tasks |

Q2: How do you set sensible timeouts?

Answer

# Database query timeouts
engine = create_engine(url, pool_timeout=5, connect_args={"connect_timeout": 3})

# HTTP client timeout (httpx requires a default when setting per-phase values)
async with httpx.AsyncClient(timeout=httpx.Timeout(10.0, connect=3.0)) as client:
    ...

# Whole-request timeout
@app.middleware("http")
async def timeout_middleware(request, call_next):
    try:
        return await asyncio.wait_for(call_next(request), timeout=30)
    except asyncio.TimeoutError:
        return JSONResponse({"error": "timeout"}, status_code=504)
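The asyncio.wait_for pattern in that middleware can be exercised in isolation; a sketch where the 504 response is replaced by a plain dict and the handler is simulated with asyncio.sleep:

```python
import asyncio

async def handler(delay: float) -> dict:
    await asyncio.sleep(delay)  # stand-in for real request handling
    return {"status": "ok"}

async def with_timeout(delay: float, timeout: float) -> dict:
    try:
        return await asyncio.wait_for(handler(delay), timeout=timeout)
    except asyncio.TimeoutError:
        return {"error": "timeout", "status_code": 504}

fast = asyncio.run(with_timeout(delay=0.01, timeout=0.1))
slow = asyncio.run(with_timeout(delay=0.2, timeout=0.05))
print(fast)  # {'status': 'ok'}
print(slow)  # {'error': 'timeout', 'status_code': 504}
```

Note that asyncio.wait_for cancels the inner task on timeout, so the handler does not keep running after the 504 is returned.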

Q3: At which layer should you cache?

Answer

  1. In-process: lru_cache or an in-process dict (fastest, but not shared across processes)
  2. Redis: shared across processes, millisecond-level latency
  3. CDN: static assets and rarely-changing API responses
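For layer 1, functools.lru_cache exposes hit/miss counters, which makes the cache easy to verify. A sketch where the database lookup is faked with a counter (the keys and values are invented for the example):

```python
from functools import lru_cache

calls = {"db": 0}

@lru_cache(maxsize=256)
def get_config(key: str) -> str:
    calls["db"] += 1  # stands in for the real DB query
    return f"value-for-{key}"

get_config("feature_flag")
get_config("feature_flag")  # served from cache, no second "DB" hit
get_config("page_size")

print(calls["db"])              # -> 2: two distinct keys, two underlying calls
print(get_config.cache_info())  # hits=1, misses=2
```

The in-process cache never expires on its own; for data that can go stale, pair it with an explicit `get_config.cache_clear()` or move to Redis with a TTL.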
