设计连接池
问题
如何用 Rust 设计一个通用的异步连接池?
答案
核心设计
use tokio::sync::{Semaphore, Mutex};
use std::collections::VecDeque;
use std::sync::Arc;
/// 通用异步连接池
pub struct Pool<C> {
idle: Mutex<VecDeque<C>>, // 空闲连接队列
semaphore: Arc<Semaphore>, // 控制最大连接数
factory: Box<dyn Fn() -> futures::future::BoxFuture<'static, Result<C, Box<dyn std::error::Error + Send>>> + Send + Sync>,
max_size: usize,
}
impl<C: Send + 'static> Pool<C> {
/// 获取连接(可能等待可用连接)
pub async fn get(&self) -> Result<PooledConnection<C>, Box<dyn std::error::Error + Send>> {
// 获取许可(限制总连接数)
let permit = self.semaphore.clone().acquire_owned().await?;
// 尝试从空闲队列取
let conn = {
let mut idle = self.idle.lock().await;
idle.pop_front()
};
let conn = match conn {
Some(c) => c,
None => (self.factory)().await?, // 创建新连接
};
Ok(PooledConnection {
conn: Some(conn),
pool: self,
_permit: permit,
})
}
/// 归还连接到池中
async fn return_conn(&self, conn: C) {
let mut idle = self.idle.lock().await;
idle.push_back(conn);
}
}
/// RAII 守卫:Drop 时自动归还连接
pub struct PooledConnection<'a, C> {
conn: Option<C>,
pool: &'a Pool<C>,
_permit: tokio::sync::OwnedSemaphorePermit,
}
impl<C: Send + 'static> Drop for PooledConnection<'_, C> {
fn drop(&mut self) {
if let Some(conn) = self.conn.take() {
let pool_idle = self.pool.idle.clone();
// 在 Drop 中无法 async,用 spawn 归还
tokio::spawn(async move {
pool_idle.lock().await.push_back(conn);
});
}
}
}
关键设计要素
| 要素 | 说明 | Rust 实现 |
|---|---|---|
| 最大连接数 | 防止资源耗尽 | Semaphore |
| 空闲连接管理 | LIFO 回收 | VecDeque |
| 连接归还 | 自动归还 | RAII Drop |
| 健康检查 | 检测断开的连接 | 取出时 ping |
| 超时 | 等待连接超时 | tokio::time::timeout |
| 最小空闲 | 预热连接 | 后台 task 维护 |
Rust 的 RAII 优势
连接池最怕忘记归还连接(连接泄漏)。Rust 的 Drop trait 保证连接离开作用域时自动归还——编译时杜绝连接泄漏。
生产级连接池推荐
| 库 | 特点 |
|---|---|
deadpool | 通用异步连接池,支持 Postgres/Redis |
bb8 | 基于 r2d2 的异步版 |
sqlx 内置 | SQLx 自带连接池 |
mobc | 异步连接池,API 简洁 |
常见面试问题
Q1: 连接池的大小怎么设置?
答案:
经验公式:pool_size = CPU 核心数 * 2 + 磁盘数
但实际需要根据场景调整:
- CPU 密集:等于 CPU 核心数
- IO 密集:可以更大(连接在等待 IO 时不占 CPU)
- 监控调整:观察等待队列长度和连接利用率