设计消息队列
问题
如何用 Rust 设计一个进程内/分布式消息队列?
答案
进程内消息队列
use tokio::sync::mpsc;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
type TopicName = String;
type Subscriber = mpsc::Sender<Message>;
#[derive(Clone, Debug)]
pub struct Message {
pub topic: String,
pub payload: Vec<u8>,
pub timestamp: u64,
}
/// 简单的发布订阅消息队列
pub struct MessageBroker {
subscribers: Arc<RwLock<HashMap<TopicName, Vec<Subscriber>>>>,
}
impl MessageBroker {
pub fn new() -> Self {
Self {
subscribers: Arc::new(RwLock::new(HashMap::new())),
}
}
/// 订阅主题,返回接收端
pub async fn subscribe(&self, topic: &str) -> mpsc::Receiver<Message> {
let (tx, rx) = mpsc::channel(1024);
let mut subs = self.subscribers.write().await;
subs.entry(topic.to_string())
.or_default()
.push(tx);
rx
}
/// 发布消息到主题
pub async fn publish(&self, msg: Message) {
let subs = self.subscribers.read().await;
if let Some(subscribers) = subs.get(&msg.topic) {
for sub in subscribers {
// 发送失败(订阅者断开)跳过
let _ = sub.send(msg.clone()).await;
}
}
}
}
持久化消息队列架构
use std::fs::{File, OpenOptions};
use std::io::{Write, BufWriter, Read, BufReader, Seek, SeekFrom};
use std::path::Path;
/// 基于文件的 CommitLog
pub struct CommitLog {
writer: BufWriter<File>,
current_offset: u64,
}
impl CommitLog {
pub fn open(path: &Path) -> std::io::Result<Self> {
let file = OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
let current_offset = file.metadata()?.len();
Ok(Self {
writer: BufWriter::new(file),
current_offset,
})
}
/// 追加消息,返回 offset
pub fn append(&mut self, data: &[u8]) -> std::io::Result<u64> {
let offset = self.current_offset;
let len = data.len() as u32;
// 写入长度前缀 + 数据
self.writer.write_all(&len.to_be_bytes())?;
self.writer.write_all(data)?;
self.writer.flush()?;
self.current_offset += 4 + data.len() as u64;
Ok(offset)
}
}
核心设计决策
| 决策 | 选择 | 原因 |
|---|---|---|
| 存储 | 顺序写文件 | 磁盘顺序写速度接近内存 |
| 索引 | 稀疏索引 | 定期记录 offset → 文件位置 |
| 消费 | Pull 模型 | Consumer 控制消费速度 |
| 零拷贝 | sendfile | 跳过用户态,内核直接发送 |
| 序列化 | 长度前缀 | 简单高效,无需解析分隔符 |
常见面试问题
Q1: Rust 实现消息队列有什么优势?
答案:
- 无 GC 暂停:延迟稳定,P99 可控
- 零拷贝:直接操作字节切片,减少内存分配
- 所有权保证:消息传递自动转移所有权,无需手动管理
- channel 语义:
mpsc/broadcast已内置安全的消息传递 - 文件 IO:
tokio::fs+bytes提供高效的异步文件操作