跳到主要内容

实现日志收集器

问题

如何用 Go 实现一个高性能的日志收集器?支持异步写入、批量发送。

答案

核心设计

异步日志收集器

// LogEntry is a single structured log record, serialized to JSON by flush.
type LogEntry struct {
	Level     string                 `json:"level"`            // severity label, e.g. "INFO", "ERROR"
	Message   string                 `json:"message"`          // human-readable log text
	Fields    map[string]interface{} `json:"fields,omitempty"` // optional structured context; omitted when nil/empty
	Timestamp time.Time              `json:"timestamp"`        // set by Log at enqueue time
}

// AsyncLogger decouples producers from I/O: Log enqueues entries into a
// buffered channel, and a single background goroutine (consumeLoop) drains
// them and writes in batches to the underlying writer.
type AsyncLogger struct {
	ch            chan LogEntry // buffered queue between Log callers and the consumer goroutine
	writer        io.Writer     // destination: a file, or an adapter such as kafkaWriter
	wg            sync.WaitGroup // lets Close wait for the consumer goroutine to finish
	batchSize     int            // flush once this many entries have accumulated
	flushInterval time.Duration  // flush at least this often, bounding latency under low traffic
}

// NewAsyncLogger creates a logger writing to writer and starts its background
// consumer goroutine. bufSize is the channel capacity, batchSize the number
// of entries per flush, flushInterval the maximum delay before a partial
// batch is written. Call Close to drain pending entries and stop the logger.
//
// Non-positive parameters are normalized instead of propagated:
// time.NewTicker panics on a non-positive duration, make panics on a
// negative capacity, and batchSize < 1 would make the size check trivially
// true for an empty batch.
func NewAsyncLogger(writer io.Writer, bufSize, batchSize int, flushInterval time.Duration) *AsyncLogger {
	if bufSize < 0 {
		bufSize = 0
	}
	if batchSize < 1 {
		batchSize = 1
	}
	if flushInterval <= 0 {
		flushInterval = time.Second
	}
	l := &AsyncLogger{
		ch:            make(chan LogEntry, bufSize),
		writer:        writer,
		batchSize:     batchSize,
		flushInterval: flushInterval,
	}
	l.wg.Add(1)
	go l.consumeLoop()
	return l
}

// Log enqueues one entry without ever blocking the caller. If the channel
// buffer is full, the entry is dropped and a warning is written to stderr.
func (l *AsyncLogger) Log(level, message string, fields map[string]interface{}) {
	e := LogEntry{Level: level, Message: message, Fields: fields, Timestamp: time.Now()}

	// Non-blocking send: the default branch fires when ch is at capacity.
	select {
	case l.ch <- e:
		// Handed off; consumeLoop will batch and write it.
	default:
		// Drop policy — never stall the business path for logging.
		fmt.Fprintf(os.Stderr, "日志缓冲满,丢弃: %s\n", message)
	}
}

// consumeLoop is the single consumer goroutine: it accumulates entries into
// a batch and flushes either when the batch reaches batchSize or when the
// ticker fires, whichever comes first. It exits after draining the channel
// once Close closes it.
func (l *AsyncLogger) consumeLoop() {
	defer l.wg.Done()

	batch := make([]LogEntry, 0, l.batchSize)
	ticker := time.NewTicker(l.flushInterval)
	defer ticker.Stop()

	for {
		select {
		case entry, ok := <-l.ch:
			if !ok {
				// Channel closed: receive has already drained any buffered
				// entries, so flush what remains in the batch and exit.
				l.flush(batch)
				return
			}
			batch = append(batch, entry)
			if len(batch) >= l.batchSize {
				l.flush(batch)
				batch = batch[:0] // reuse the slice's backing array
			}

		case <-ticker.C:
			// Periodic flush so a partial batch never waits longer than
			// flushInterval under low traffic.
			if len(batch) > 0 {
				l.flush(batch)
				batch = batch[:0]
			}
		}
	}
}

// flush serializes each entry as one JSON line and writes it to the
// underlying writer. One Write call per entry is deliberate: the kafkaWriter
// adapter maps each Write to exactly one Kafka message.
//
// The original silently discarded both json.Marshal and Write errors; they
// are now reported to stderr so lost log data is at least observable.
func (l *AsyncLogger) flush(batch []LogEntry) {
	for _, entry := range batch {
		data, err := json.Marshal(entry)
		if err != nil {
			// Marshal can fail e.g. on unsupported Fields values (chan, func).
			fmt.Fprintf(os.Stderr, "marshal log entry: %v\n", err)
			continue
		}
		data = append(data, '\n')
		if _, err := l.writer.Write(data); err != nil {
			fmt.Fprintf(os.Stderr, "write log entry: %v\n", err)
		}
	}
}

// Close shuts the logger down: closing the channel makes consumeLoop drain
// remaining entries and return, and Wait blocks until that final flush is
// done, so buffered logs are not lost on exit.
// NOTE(review): Close must not race with Log — a send on a closed channel
// panics; ensure all producers have stopped before calling Close.
func (l *AsyncLogger) Close() {
	close(l.ch) // signal consumeLoop to drain and exit
	l.wg.Wait() // wait for the remaining logs to be written
}

使用示例

// main demonstrates wiring the AsyncLogger to an append-only log file.
func main() {
	// Bug fix: the OpenFile error was discarded, so a failed open (permission
	// denied, read-only FS, ...) left file nil and the logger writing nowhere.
	file, err := os.OpenFile("app.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		fmt.Fprintf(os.Stderr, "open log file: %v\n", err)
		return
	}
	defer file.Close()

	logger := NewAsyncLogger(
		file,
		10000,       // buffer up to 10,000 entries
		100,         // flush every 100 entries
		time.Second, // or at least once per second
	)
	defer logger.Close() // drain remaining entries before exit

	logger.Log("INFO", "服务启动", map[string]interface{}{"port": 8080})
	logger.Log("ERROR", "请求失败", map[string]interface{}{"url": "/api/user", "status": 500})
}

发送到 Kafka

// NewKafkaLogger builds an AsyncLogger whose entries are published to a
// Kafka topic, by wrapping a sarama producer in an io.Writer adapter.
// NOTE(review): newKafkaProducer is not defined in this snippet — presumably
// it constructs a sarama.SyncProducer; confirm against the real codebase.
func NewKafkaLogger(brokers []string, topic string) *AsyncLogger {
	producer := newKafkaProducer(brokers)
	writer := &kafkaWriter{producer: producer, topic: topic}
	return NewAsyncLogger(writer, 10000, 200, time.Second)
}

// kafkaWriter adapts a sarama.SyncProducer to io.Writer so it can be used
// as the AsyncLogger's destination: each Write becomes one Kafka message.
type kafkaWriter struct {
	producer sarama.SyncProducer // synchronous producer; SendMessage blocks until acked
	topic    string              // target Kafka topic for all messages
}

// Write implements io.Writer by publishing p as a single Kafka message on
// w.topic. AsyncLogger.flush calls Write once per entry, so log entries map
// 1:1 onto Kafka messages.
func (w *kafkaWriter) Write(p []byte) (int, error) {
	msg := &sarama.ProducerMessage{
		Topic: w.topic,
		Value: sarama.ByteEncoder(p),
	}
	if _, _, err := w.producer.SendMessage(msg); err != nil {
		// Bug fix: the original returned len(p) together with the error,
		// claiming a complete write on failure. Per the io.Writer contract,
		// report how many bytes were actually accepted — none.
		return 0, err
	}
	return len(p), nil
}

日志轮转

import "gopkg.in/natefinch/lumberjack.v2"

// lumberjack.Logger is an io.Writer that rotates the log file automatically,
// so it can be plugged straight into NewAsyncLogger.
writer := &lumberjack.Logger{
	Filename:   "/var/log/app.log",
	MaxSize:    100,  // rotate after the file reaches 100 MB
	MaxBackups: 7,    // keep at most 7 rotated files
	MaxAge:     30,   // keep rotated files for 30 days
	Compress:   true, // gzip-compress rotated files
}

logger := NewAsyncLogger(writer, 10000, 100, time.Second)

常见面试问题

Q1: 日志缓冲满了怎么办?

答案:三种策略:

  • 丢弃(select default):不影响业务性能
  • 阻塞(直接 ch <-):可能影响业务
  • 降级:丢弃低级别日志(DEBUG/INFO),保留 ERROR

Q2: 如何保证进程退出时日志不丢失?

答案:

  • Close() 关闭 channel 并 wg.Wait() 等待消费完
  • 配合优雅关闭(signal 处理),确保 logger.Close() 在最后执行

相关链接