第三方 API 集成问题
问题
集成第三方 API 时常见哪些问题?如何设计健壮的 API 调用?
答案
核心问题
第三方 API 不受我方控制,常见问题包括:网络超时与瞬时故障、触发限流(HTTP 429)、对方服务不可用,以及接口版本变更。下文分别用超时与重试、限流处理、熔断降级和版本管理来应对。
健壮的 HTTP 客户端
api/client.py
import httpx
import asyncio
import logging
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
logger = logging.getLogger(__name__)
def _is_transient_failure(retry_state) -> bool:
    """Decide whether the attempt that just finished should be retried.

    Used as the ``retry=`` callback of the tenacity decorator on
    ``APIClient.get``. Only failures that another attempt can plausibly fix
    are retried: timeouts and 5xx responses. 4xx responses (bad request,
    auth failure, not found, 429 ...) propagate immediately so the caller can
    react to them, e.g. by honoring ``Retry-After``.

    Args:
        retry_state: tenacity's per-call state; ``outcome`` holds the result
            or exception of the last attempt.

    Returns:
        True to schedule another attempt, False to stop.
    """
    outcome = retry_state.outcome
    if outcome is None or not outcome.failed:
        return False
    exc = outcome.exception()
    if isinstance(exc, httpx.TimeoutException):
        return True
    if isinstance(exc, httpx.HTTPStatusError):
        return exc.response.status_code >= 500
    return False


class APIClient:
    """Async JSON client for a single third-party API.

    Wraps one shared ``httpx.AsyncClient`` configured with timeouts, a bearer
    token header and connection-pool limits. Call ``close()`` when done.
    """

    def __init__(self, base_url: str, api_key: str):
        """Create the underlying HTTP client.

        Args:
            base_url: Prefix that request paths are resolved against.
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
        """
        self.base_url = base_url
        self.client = httpx.AsyncClient(
            base_url=base_url,
            # httpx.Timeout requires either a default or all four phases;
            # omitting ``pool`` raises ValueError at construction time.
            timeout=httpx.Timeout(connect=5, read=30, write=10, pool=5),
            headers={"Authorization": f"Bearer {api_key}"},
            limits=httpx.Limits(max_connections=20, max_keepalive_connections=10),
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=30),
        retry=_is_transient_failure,
        # Re-raise the original httpx exception after the last attempt
        # instead of tenacity's RetryError, so callers can catch
        # httpx.HTTPStatusError / httpx.TimeoutException directly.
        reraise=True,
        before_sleep=lambda retry_state: logger.warning(
            f"重试第 {retry_state.attempt_number} 次"
        ),
    )
    async def get(self, path: str, **kwargs) -> dict:
        """GET ``path`` and return the decoded JSON body.

        Makes up to 3 attempts with exponential backoff when the failure is
        a timeout or a 5xx response.

        Args:
            path: Request path, resolved against ``base_url``.
            **kwargs: Passed through to ``httpx.AsyncClient.get``.

        Returns:
            The response body parsed as JSON.

        Raises:
            httpx.HTTPStatusError: On a 4xx response, or a 5xx response that
                persists through all attempts.
            httpx.TimeoutException: If every attempt times out.
        """
        response = await self.client.get(path, **kwargs)
        response.raise_for_status()
        return response.json()

    async def close(self) -> None:
        """Close the underlying HTTP client and release its connections."""
        await self.client.aclose()
限流处理
api/rate_limit.py
import asyncio
import time
class RateLimitedClient(APIClient):
    """APIClient that paces its own requests and reacts to HTTP 429.

    Requests are spaced at least ``1 / max_per_second`` seconds apart, and at
    most ``max(1, int(max_per_second))`` requests are in flight at once. On a
    429 response the client waits for the server's ``Retry-After`` and tries
    once more.

    NOTE(review): the 429 branch only runs if the parent's ``get()`` lets
    ``httpx.HTTPStatusError`` propagate to this class — confirm against the
    parent's retry configuration.
    """

    def __init__(self, base_url: str, api_key: str, max_per_second: float = 10):
        """Create the client.

        Args:
            base_url: Passed to ``APIClient``.
            api_key: Passed to ``APIClient``.
            max_per_second: Maximum request rate; must be positive.

        Raises:
            ValueError: If ``max_per_second`` is not positive.
        """
        if max_per_second <= 0:
            raise ValueError("max_per_second must be positive")
        super().__init__(base_url, api_key)
        # int() truncates rates below 1 to 0, and Semaphore(0) would block
        # every request forever — always allow at least one in flight.
        self.semaphore = asyncio.Semaphore(max(1, int(max_per_second)))
        self.min_interval = 1.0 / max_per_second
        self.last_request = 0.0
        # Serializes the pacing check so concurrent callers cannot all read
        # the same ``last_request`` and fire at the same instant.
        self._pace_lock = asyncio.Lock()

    @staticmethod
    def _retry_after_seconds(value, default: float = 60) -> float:
        """Convert a ``Retry-After`` header value into seconds to wait.

        Accepts both forms the header may take: an integer number of seconds
        or an HTTP-date. Returns ``default`` when the header is missing or
        unparseable; never returns a negative number.
        """
        if value is None:
            return default
        try:
            return max(0, int(value))
        except ValueError:
            pass

        from datetime import datetime, timezone
        from email.utils import parsedate_to_datetime

        try:
            when = parsedate_to_datetime(value)
        except (TypeError, ValueError):
            return default
        if when.tzinfo is None:
            when = when.replace(tzinfo=timezone.utc)
        return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())

    async def _wait_for_slot(self) -> None:
        """Sleep until ``min_interval`` has passed since the previous request."""
        async with self._pace_lock:
            remaining = self.min_interval - (time.monotonic() - self.last_request)
            if remaining > 0:
                await asyncio.sleep(remaining)
            self.last_request = time.monotonic()

    async def get(self, path: str, **kwargs) -> dict:
        """GET ``path`` under the rate limit, retrying once after a 429.

        Args:
            path: Request path, resolved against ``base_url``.
            **kwargs: Passed through to the parent ``get``.

        Returns:
            The response body parsed as JSON.

        Raises:
            httpx.HTTPStatusError: For non-429 error statuses, or if the
                single retry after a 429 fails again.
        """
        async with self.semaphore:
            # Proactive client-side throttling.
            await self._wait_for_slot()
            try:
                return await super().get(path, **kwargs)
            except httpx.HTTPStatusError as e:
                if e.response.status_code != 429:
                    raise
                # Server-side throttling: honor the Retry-After header.
                retry_after = self._retry_after_seconds(
                    e.response.headers.get("Retry-After")
                )
            logger.warning(f"限流,等待 {retry_after}s")
            await asyncio.sleep(retry_after)
            await self._wait_for_slot()
            return await super().get(path, **kwargs)
熔断降级
api/circuit_breaker.py
import time
from enum import Enum
class State(Enum):
    """States of a CircuitBreaker."""

    CLOSED = "closed"  # calls are passed through
    OPEN = "open"  # calls are rejected until the recovery timeout elapses
    HALF_OPEN = "half_open"  # recovery timeout elapsed; next call is a probe


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the breaker is open.

    Subclasses RuntimeError so existing ``except RuntimeError`` handlers keep
    working, while letting callers tell a rejection apart from a RuntimeError
    raised by the protected function itself.
    """


class CircuitBreaker:
    """Stops calling a failing dependency after repeated failures.

    After ``failure_threshold`` failures without an intervening success the
    breaker opens and rejects calls. Once ``recovery_timeout`` seconds have
    passed since the last failure, the next call is let through; a success
    closes the breaker again.
    """

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30):
        """Create a closed breaker.

        Args:
            failure_threshold: Failure count at which the breaker opens.
            recovery_timeout: Seconds to stay open before probing again.
        """
        self.state = State.CLOSED
        self.failures = 0
        self.threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure = 0.0

    async def call(self, func, *args, fallback=None, **kwargs):
        """Await ``func(*args, **kwargs)`` under circuit-breaker protection.

        Args:
            func: Async callable to protect.
            *args: Positional arguments for ``func``.
            fallback: Optional zero-argument callable. Its return value is
                returned instead of raising when the breaker is open or
                ``func`` fails.
            **kwargs: Keyword arguments for ``func``.

        Returns:
            The result of ``func``, or of ``fallback`` when it is used.

        Raises:
            CircuitOpenError: The breaker is open and no fallback was given.
            Exception: Whatever ``func`` raised, when no fallback was given.
        """
        if self.state == State.OPEN:
            if time.monotonic() - self.last_failure > self.recovery_timeout:
                self.state = State.HALF_OPEN
            elif fallback is not None:
                return fallback()
            else:
                raise CircuitOpenError("Circuit breaker is open")

        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._on_failure()
            # Test for presence, not truthiness: a callable object may
            # legitimately be falsy (e.g. it defines __len__ or __bool__).
            if fallback is not None:
                return fallback()
            raise

        self._on_success()
        return result

    def _on_success(self):
        """Reset the failure count and close the breaker."""
        self.failures = 0
        self.state = State.CLOSED

    def _on_failure(self):
        """Record a failure and open the breaker once the threshold is hit.

        The count is not reset when moving to HALF_OPEN, so a failed probe
        is still at or above the threshold and re-opens the breaker.
        """
        self.failures += 1
        self.last_failure = time.monotonic()
        if self.failures >= self.threshold:
            self.state = State.OPEN
# Usage
# NOTE(review): `client` is not defined in this snippet — presumably an
# APIClient / RateLimitedClient instance created elsewhere; confirm.
# The `await` below must run inside a coroutine (or an asyncio REPL).
breaker = CircuitBreaker()
result = await breaker.call(
client.get, "/api/data",
# Returned instead of raising when the breaker is open or the call fails.
fallback=lambda: {"data": [], "source": "cache"},
)
常见面试问题
Q1: 重试策略怎么选?
答案:
| 策略 | 说明 | 适用场景 |
|---|---|---|
| 固定间隔 | 每次等相同时间 | 简单场景 |
| 指数退避 | 1s → 2s → 4s → 8s | 推荐默认 |
| 指数退避+抖动 | 加随机偏移 | 多客户端避免雪崩 |
Q2: 如何处理 API 版本变更?
答案:
- 响应数据用 Pydantic 模型校验,字段缺失时快速发现
- 版本号固定在 URL 中(`/v2/users`)
- 定期跑集成测试检测 API 变化
Q3: 如何测试第三方 API 集成?
答案:
# 使用 respx mock HTTP 请求
import respx
@respx.mock
async def test_api_client():
    """APIClient.get returns the decoded JSON body of a mocked 200 response.

    NOTE(review): an async test needs an async-capable runner (for example
    pytest-asyncio) — confirm the project's test configuration.
    """
    respx.get("https://api.example.com/users").mock(
        return_value=httpx.Response(200, json={"users": []})
    )
    # Build the client inside the test so the example is self-contained and
    # the base URL visibly matches the mocked route.
    client = APIClient("https://api.example.com", api_key="test-key")
    try:
        result = await client.get("/users")
    finally:
        # Always release the connection pool, even if the request fails.
        await client.close()
    assert result == {"users": []}