设计聊天服务器
问题
如何用 Rust 设计一个支持多房间的实时聊天服务器?
答案
架构设计
基于 Axum + broadcast 的聊天服务器
use axum::{
Router, routing::get,
extract::ws::{WebSocket, WebSocketUpgrade, Message},
extract::{State, Query},
response::IntoResponse,
};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
/// 房间管理器
struct RoomManager {
rooms: RwLock<HashMap<String, broadcast::Sender<ChatMessage>>>,
}
#[derive(Clone, Debug)]
struct ChatMessage {
room: String,
user: String,
content: String,
}
impl RoomManager {
fn new() -> Self {
Self { rooms: RwLock::new(HashMap::new()) }
}
/// 获取或创建房间的广播频道
async fn get_room(&self, name: &str) -> broadcast::Sender<ChatMessage> {
let rooms = self.rooms.read().await;
if let Some(tx) = rooms.get(name) {
return tx.clone();
}
drop(rooms);
let mut rooms = self.rooms.write().await;
let (tx, _) = broadcast::channel(1000);
rooms.entry(name.to_string())
.or_insert(tx)
.clone()
}
}
/// WebSocket 连接处理
async fn ws_handler(
ws: WebSocketUpgrade,
State(manager): State<Arc<RoomManager>>,
Query(params): Query<HashMap<String, String>>,
) -> impl IntoResponse {
let room = params.get("room").cloned().unwrap_or("lobby".into());
let user = params.get("user").cloned().unwrap_or("anonymous".into());
ws.on_upgrade(move |socket| handle_socket(socket, manager, room, user))
}
async fn handle_socket(
socket: WebSocket,
manager: Arc<RoomManager>,
room: String,
user: String,
) {
let tx = manager.get_room(&room).await;
let mut rx = tx.subscribe();
let (mut ws_sender, mut ws_receiver) = socket.split();
// 接收 WebSocket 消息 → 广播到房间
let tx_clone = tx.clone();
let user_clone = user.clone();
let room_clone = room.clone();
let send_task = tokio::spawn(async move {
while let Some(Ok(Message::Text(text))) = ws_receiver.next().await {
let _ = tx_clone.send(ChatMessage {
room: room_clone.clone(),
user: user_clone.clone(),
content: text,
});
}
});
// 接收房间广播 → 发送到 WebSocket
let recv_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
let text = format!("[{}] {}: {}", msg.room, msg.user, msg.content);
if ws_sender.send(Message::Text(text)).await.is_err() {
break;
}
}
});
// 任意一个 task 结束,取消另一个
tokio::select! {
_ = send_task => {},
_ = recv_task => {},
}
}
关键设计决策
| 决策 | 选择 | 原因 |
|---|---|---|
| 广播 | tokio::sync::broadcast | 多生产者多消费者 |
| 房间隔离 | 每个房间独立 channel | 消息不跨房间 |
| 连接管理 | tokio task per connection | 轻量,自动回收 |
| 消息序列化 | JSON | 前端友好 |
常见面试问题
Q1: 如何扩展到多服务器?
答案:
单机的 broadcast::channel 只在进程内有效。多服务器需要外部消息中间件:
- Redis Pub/Sub:每台服务器订阅房间频道,收到消息后转发给本地连接
- NATS:轻量级消息系统,天然支持发布订阅
- Kafka:持久化消息,支持消息回放
架构变为:Client ↔ Server ↔ Redis/NATS ↔ Server ↔ Client