设计限流器
问题
如何用 Python 实现一个限流器?令牌桶和滑动窗口算法的原理和实现?
答案
令牌桶算法
rate_limiter/token_bucket.py
import time
import threading
class TokenBucket:
def __init__(self, rate: float, capacity: int):
"""
rate: 每秒放入的令牌数
capacity: 桶的最大容量
"""
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.last_refill = time.monotonic()
self.lock = threading.Lock()
def allow(self) -> bool:
with self.lock:
now = time.monotonic()
elapsed = now - self.last_refill
# 补充令牌
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
self.last_refill = now
if self.tokens >= 1:
self.tokens -= 1
return True
return False
滑动窗口算法
rate_limiter/sliding_window.py
import time
from collections import deque
import threading
class SlidingWindowLog:
"""滑动窗口日志算法"""
def __init__(self, max_requests: int, window_seconds: float):
self.max_requests = max_requests
self.window = window_seconds
self.timestamps: deque[float] = deque()
self.lock = threading.Lock()
def allow(self) -> bool:
with self.lock:
now = time.monotonic()
# 移除过期时间戳
while self.timestamps and self.timestamps[0] <= now - self.window:
self.timestamps.popleft()
if len(self.timestamps) < self.max_requests:
self.timestamps.append(now)
return True
return False
class SlidingWindowCounter:
"""滑动窗口计数器(近似算法,更省内存)"""
def __init__(self, max_requests: int, window_seconds: float):
self.max_requests = max_requests
self.window = window_seconds
self.prev_count = 0
self.curr_count = 0
self.window_start = time.monotonic()
self.lock = threading.Lock()
def allow(self) -> bool:
with self.lock:
now = time.monotonic()
elapsed = now - self.window_start
if elapsed >= self.window:
self.prev_count = self.curr_count
self.curr_count = 0
self.window_start = now
elapsed = 0
# 加权计算
weight = 1 - elapsed / self.window
estimated = self.prev_count * weight + self.curr_count
if estimated < self.max_requests:
self.curr_count += 1
return True
return False
分布式限流(Redis)
rate_limiter/redis_limiter.py
import redis
import time
class RedisRateLimiter:
def __init__(self, redis_client: redis.Redis, key_prefix: str = "rl"):
self.redis = redis_client
self.prefix = key_prefix
def allow(self, key: str, max_requests: int, window: int) -> bool:
"""滑动窗口限流"""
redis_key = f"{self.prefix}:{key}"
now = time.time()
pipe = self.redis.pipeline()
# Sorted Set:score=时间戳,member=唯一标识
pipe.zremrangebyscore(redis_key, 0, now - window) # 移除过期
pipe.zcard(redis_key) # 当前计数
pipe.zadd(redis_key, {f"{now}": now}) # 添加当前请求
pipe.expire(redis_key, window)
results = pipe.execute()
current_count = results[1]
if current_count >= max_requests:
# 超限,移除刚添加的
self.redis.zrem(redis_key, f"{now}")
return False
return True
FastAPI 限流中间件
middleware/rate_limit.py
from fastapi import Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(self, app, limiter: RedisRateLimiter):
super().__init__(app)
self.limiter = limiter
async def dispatch(self, request: Request, call_next):
client_ip = request.client.host
if not self.limiter.allow(client_ip, max_requests=100, window=60):
raise HTTPException(
status_code=429,
detail="Too many requests",
headers={"Retry-After": "60"},
)
return await call_next(request)
常见面试问题
Q1: 四种限流算法对比?
答案:
| 算法 | 优点 | 缺点 |
|---|---|---|
| 固定窗口 | 简单 | 边界突发 |
| 滑动窗口日志 | 精确 | 内存高 |
| 滑动窗口计数 | 近似、省内存 | 不精确 |
| 令牌桶 | 允许突发、平滑 | 实现稍复杂 |
| 漏桶 | 固定速率 | 不允许突发 |
Q2: 分布式限流难点?
答案:
- 时钟偏差:各节点时间不一致,用 Redis 服务器时间
- 原子性:Lua 脚本保证 Redis 操作原子
- 性能:高频限流可用本地预分配 + Redis 全局兜底
Q3: 限流触发后如何通知客户端?
答案:
- HTTP 429 状态码
Retry-AfterHeader 告知等待时间X-RateLimit-Remaining剩余额度X-RateLimit-Reset重置时间