设计日志收集器
问题
如何用 Rust 设计一个高吞吐的日志收集系统?
答案
架构设计
异步日志收集器
use tokio::sync::mpsc;
use tokio::io::AsyncWriteExt;
use tokio::fs::OpenOptions;
use std::time::Duration;
#[derive(Debug)]
pub struct LogEntry {
pub level: LogLevel,
pub message: String,
pub timestamp: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug)]
pub enum LogLevel { Info, Warn, Error }
pub struct LogCollector {
sender: mpsc::Sender<LogEntry>,
}
impl LogCollector {
pub fn new(output_path: &str) -> Self {
let (tx, rx) = mpsc::channel::<LogEntry>(10_000);
let path = output_path.to_string();
// 后台写入 task
tokio::spawn(async move {
Self::writer_loop(rx, &path).await;
});
Self { sender: tx }
}
pub async fn log(&self, level: LogLevel, message: String) {
let entry = LogEntry {
level,
message,
timestamp: chrono::Utc::now(),
};
// 非阻塞发送,channel 满了丢弃(日志不阻塞业务)
let _ = self.sender.try_send(entry);
}
/// 后台批量写入循环
async fn writer_loop(mut rx: mpsc::Receiver<LogEntry>, path: &str) {
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(path)
.await
.unwrap();
let mut buffer = Vec::with_capacity(100);
loop {
// 批量收集:等待第一条或超时
tokio::select! {
Some(entry) = rx.recv() => {
buffer.push(entry);
// 尽量多取(非阻塞)
while buffer.len() < 100 {
match rx.try_recv() {
Ok(entry) => buffer.push(entry),
Err(_) => break,
}
}
}
_ = tokio::time::sleep(Duration::from_millis(100)) => {}
}
// 批量写入
if !buffer.is_empty() {
let mut output = String::new();
for entry in buffer.drain(..) {
output.push_str(&format!(
"[{}] {:?}: {}\n",
entry.timestamp.format("%Y-%m-%d %H:%M:%S"),
entry.level,
entry.message
));
}
let _ = file.write_all(output.as_bytes()).await;
let _ = file.flush().await;
}
}
}
}
关键设计决策
| 决策 | 选择 | 原因 |
|---|---|---|
| 传输 | mpsc::channel | 多个生产者发送到一个写入线程 |
| 背压 | try_send(丢弃) | 日志不应阻塞业务逻辑 |
| 写入 | 批量 flush | 减少系统调用次数 |
| 文件轮转 | 按日期/大小 | 防止单文件过大 |
| 格式 | JSON / 结构化 | 便于后续分析 |
常见面试问题
Q1: 日志系统如何做到不影响业务性能?
答案:
- 异步写入:日志 API 只是往 channel 发消息,不等待写入完成
try_send:channel 满时丢弃日志,不阻塞业务- 批量刷盘:聚合多条日志后一次写入,减少 IO 系统调用
- 独立线程:写入在单独的 task 中,不竞争业务线程