设计日志收集系统
问题
如何用 Go 设计一个高吞吐、低延迟的日志收集系统,支持结构化日志、多级采集和查询?
答案
整体架构
日志模型
type LogEntry struct {
Timestamp time.Time `json:"timestamp"`
Level string `json:"level"` // DEBUG, INFO, WARN, ERROR
Service string `json:"service"`
TraceID string `json:"trace_id"`
SpanID string `json:"span_id"`
Message string `json:"message"`
Fields map[string]any `json:"fields"` // 结构化字段
Caller string `json:"caller"` // 调用位置
Host string `json:"host"`
}
Agent 采集端
Agent 部署在每台机器上,负责本地日志采集和批量转发:
type Agent struct {
buffer chan *LogEntry // 缓冲通道
producer *kafka.Writer
batch []*LogEntry
mu sync.Mutex
}
func NewAgent(kafkaBrokers []string, topic string) *Agent {
a := &Agent{
buffer: make(chan *LogEntry, 10000), // 缓冲区
producer: &kafka.Writer{
Addr: kafka.TCP(kafkaBrokers...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
// 批量发送配置
BatchSize: 1000,
BatchTimeout: 100 * time.Millisecond,
},
}
go a.flushLoop()
return a
}
// 接收日志
func (a *Agent) Collect(entry *LogEntry) {
select {
case a.buffer <- entry:
default:
// 缓冲满了,丢弃或写本地磁盘兜底
log.Println("日志缓冲区满,丢弃日志")
}
}
// 批量刷写到 Kafka
func (a *Agent) flushLoop() {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
batch := make([]kafka.Message, 0, 1000)
for {
select {
case entry := <-a.buffer:
data, _ := json.Marshal(entry)
batch = append(batch, kafka.Message{Value: data})
// 达到批量大小立即发送
if len(batch) >= 1000 {
a.sendBatch(batch)
batch = batch[:0]
}
case <-ticker.C:
if len(batch) > 0 {
a.sendBatch(batch)
batch = batch[:0]
}
}
}
}
func (a *Agent) sendBatch(batch []kafka.Message) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := a.producer.WriteMessages(ctx, batch...); err != nil {
log.Printf("发送 Kafka 失败: %v", err)
// 降级:写本地磁盘
}
}
消费处理服务
type LogConsumer struct {
reader *kafka.Reader
es *elasticsearch.Client
}
func (c *LogConsumer) Consume(ctx context.Context) {
for {
msg, err := c.reader.ReadMessage(ctx)
if err != nil {
log.Printf("读取消息失败: %v", err)
continue
}
var entry LogEntry
if err := json.Unmarshal(msg.Value, &entry); err != nil {
continue
}
// 写入 Elasticsearch
indexName := fmt.Sprintf("logs-%s-%s", entry.Service, entry.Timestamp.Format("2006.01.02"))
data, _ := json.Marshal(entry)
c.es.Index(indexName, bytes.NewReader(data))
}
}
日志采样
高流量场景对 DEBUG/INFO 日志做采样,降低存储量:
type Sampler struct {
counter atomic.Int64
rate int64 // 每 N 条采 1 条
}
func (s *Sampler) ShouldLog(level string) bool {
switch level {
case "ERROR", "WARN":
return true // 错误日志全量采集
default:
n := s.counter.Add(1)
return n%s.rate == 0
}
}
关键设计决策
| 问题 | 方案 |
|---|---|
| 日志丢失 | Agent 本地磁盘兜底 + Kafka 持久化 |
| 高吞吐 | 批量发送 + 异步 Channel |
| 存储膨胀 | 日志分级、采样、冷热分层(7天ES→归档到 S3) |
| 查询性能 | ES 按日期分索引 + 服务维度分片 |
| 敏感信息 | 消费端脱敏处理(手机号、身份证等) |
常见面试问题
Q1: Agent 挂了日志会丢吗?
答案:
- Agent 使用内存 Channel 缓冲,如果 Agent 进程崩溃,Channel 中的数据会丢失
- 可以加磁盘 WAL(Write-Ahead Log):先写本地文件,再异步发 Kafka
- Kafka 本身有持久化保障,发送成功的不会丢
Q2: 如何处理日志量激增(如故障时大量 ERROR)?
答案:
- Kafka 分区扩容 + 消费者水平扩展
- Agent 端限速(令牌桶限流)
- DEBUG/INFO 自动采样降级
- 告警触发后临时调整采样率
Q3: ES 索引策略如何设计?
答案:
- 按
service + 日期分索引:logs-user-service-2024.01.15 - 热数据用 SSD 节点,7 天以上转冷节点
- 30 天以上归档到对象存储
- 使用 ILM(Index Lifecycle Management)自动管理
Q4: Go 的日志库怎么选?
答案:
| 库 | 特点 |
|---|---|
log/slog | Go 1.21 标准库,结构化日志,推荐新项目 |
zap | Uber 出品,高性能,零分配 |
zerolog | 零分配 JSON 日志 |
logrus | 老牌库,API 友好但性能一般 |
生产环境推荐 zap 或 slog,配合 JSON 输出方便采集。