
Designing a Log Collection System

Problem

How do you design a log collection and analysis system in Python? How do you implement structured logging?

Answer

Architecture

Structured logging

logger.py

import structlog
import logging

def setup_logger():
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    )

logger = structlog.get_logger()

# Usage
logger.info("user.login", user_id=12345, ip="192.168.1.1")
# Output: {"event": "user.login", "user_id": 12345, "ip": "192.168.1.1",
#          "level": "info", "timestamp": "2024-01-01T00:00:00Z"}

# Bind request-scoped context
log = logger.bind(request_id="abc-123", trace_id="xyz-789")
log.info("order.created", order_id=1001)

Request tracing middleware

middleware/trace.py

import uuid
import structlog
from starlette.middleware.base import BaseHTTPMiddleware

logger = structlog.get_logger()

class TraceMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        trace_id = request.headers.get("X-Trace-ID", str(uuid.uuid4()))
        # Bind trace_id to the current context, clearing anything left
        # over from a previous request handled in this context
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            trace_id=trace_id,
            method=request.method,
            path=request.url.path,
        )
        logger.info("request.start")
        response = await call_next(request)
        logger.info("request.end", status=response.status_code)
        response.headers["X-Trace-ID"] = trace_id
        return response
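The middleware above leans on `structlog.contextvars`, which is built on the standard library's `contextvars` module. A dependency-free sketch of the underlying mechanism (the `handle_request`/`business_logic` names are illustrative, not from the original code):

```python
import contextvars
import uuid

# A ContextVar set once per request is visible everywhere in the same
# (async) call context -- this is what lets trace_id appear in every log
# line without being passed as a parameter.
trace_id_var = contextvars.ContextVar("trace_id", default="-")

def handle_request(incoming_trace_id=None):
    # Reuse the caller's trace id if present, otherwise mint a new one
    tid = incoming_trace_id or str(uuid.uuid4())
    trace_id_var.set(tid)
    return business_logic()

def business_logic():
    # Deep in the call stack: no trace_id argument is threaded through
    return f"trace={trace_id_var.get()}"

print(handle_request("abc-123"))  # trace=abc-123
```

In an async server each request runs in its own context copy, so concurrent requests do not see each other's `trace_id`.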

Log collection SDK

log_collector/sdk.py

import requests
from collections import deque
from threading import Thread, Event

class LogCollector:
    """SDK that ships logs to a collector endpoint in background batches."""

    def __init__(self, endpoint: str, batch_size: int = 50, flush_interval: float = 5):
        self.endpoint = endpoint
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer: deque[dict] = deque(maxlen=10000)  # oldest entries drop on overflow
        self._stop = Event()
        self._thread = Thread(target=self._flush_loop, daemon=True)
        self._thread.start()

    def log(self, event: str, **kwargs):
        self.buffer.append({"event": event, **kwargs})
        if len(self.buffer) >= self.batch_size:
            self._flush()

    def _flush_loop(self):
        # Background thread: flush every flush_interval seconds
        while not self._stop.is_set():
            self._stop.wait(self.flush_interval)
            if self.buffer:
                self._flush()

    def _flush(self):
        batch = []
        while self.buffer and len(batch) < self.batch_size:
            batch.append(self.buffer.popleft())
        if batch:
            self._send(batch)

    def _send(self, batch: list[dict]):
        try:
            requests.post(self.endpoint, json=batch, timeout=5)
        except Exception:
            # Send failed: put the batch back at the front of the queue
            self.buffer.extendleft(reversed(batch))

    def close(self):
        self._stop.set()
        self._thread.join(timeout=self.flush_interval)
        self._flush()

Common interview questions

Q1: logging vs loguru vs structlog?

Answer

| Library   | Characteristics                  | Best for                   |
|-----------|----------------------------------|----------------------------|
| logging   | Standard library; verbose config | Simple projects            |
| loguru    | Zero config; concise API         | Scripts, small projects    |
| structlog | Structured logs; JSON output     | Microservices, production  |
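To make the comparison concrete, here is roughly what JSON output costs with the standard library alone; this is a minimal sketch of the boilerplate that structlog's `JSONRenderer` (or loguru's `serialize=True`) removes:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Minimal JSON formatter: hand-rolled version of structured output."""
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "event": record.getMessage(),
            "level": record.levelname.lower(),
            "logger": record.name,
        }
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("user.login")
# emits: {"event": "user.login", "level": "info", "logger": "demo"}
```

Note the stdlib version has no natural place for key-value fields like `user_id=12345`; that is the gap structlog's bound loggers fill.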

Q2: How should log levels be designed?

Answer

  • DEBUG: development debugging; disabled in production
  • INFO: key business events (user login, order creation)
  • WARNING: abnormal but recoverable (retry succeeded, degradation triggered)
  • ERROR: errors that need attention (API failure, data anomaly)
  • CRITICAL: severe failures (service unavailable, data loss)
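One common way to enforce "DEBUG off in production" is to drive the level from the environment, so each deployment chooses its own threshold. A minimal stdlib sketch (the `LOG_LEVEL` variable name is an assumption, not a standard):

```python
import logging
import os

def make_logger(name: str = "app") -> logging.Logger:
    # Default to INFO so DEBUG noise never reaches production unless
    # LOG_LEVEL=DEBUG is set explicitly in that environment.
    level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
    logger = logging.getLogger(name)
    logger.setLevel(getattr(logging, level_name, logging.INFO))
    return logger

log = make_logger()
log.debug("cache.miss")  # suppressed at the default INFO level
log.info("user.login")   # emitted
```

The structlog setup earlier achieves the same cutoff with `make_filtering_bound_logger(logging.INFO)`; the environment lookup could feed that call instead.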

Q3: What if the log volume is too large?

Answer

  1. Sampling: sample high-frequency logs at a fixed rate (e.g. 1%)
  2. Tiered retention: keep ERROR for 30 days, INFO for 7 days
  3. Async writes: never block the main thread
  4. Structured indexing: index only key fields
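Point 1 can be sketched as a severity-aware sampler. Shown dependency-free here; in the structlog stack above the same check could live in a processor that drops the event (this class and its `should_log` method are illustrative, not part of any library):

```python
import random

class LogSampler:
    """Keep every WARNING-and-above event, sample the rest at a fixed rate."""

    ALWAYS_KEEP = {"WARNING", "ERROR", "CRITICAL"}

    def __init__(self, rate: float):
        self.rate = rate  # e.g. 0.01 keeps roughly 1% of DEBUG/INFO events

    def should_log(self, level: str) -> bool:
        # Never sample away warnings and errors -- only high-volume,
        # low-severity events are candidates for dropping.
        if level in self.ALWAYS_KEEP:
            return True
        return random.random() < self.rate
```

The key design choice is sampling by severity: dropping 99% of INFO chatter is usually acceptable, while dropping even one ERROR is not.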

Related links