设计任务调度器
问题
如何用 Rust 设计一个支持定时任务和延迟任务的调度器?
答案
时间轮 + 优先队列架构
核心实现
use std::collections::BinaryHeap;
use std::cmp::Reverse;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{Instant, Duration};
type Task = Box<dyn FnOnce() -> futures::future::BoxFuture<'static, ()> + Send>;
struct ScheduledTask {
run_at: Instant,
task: Task,
}
impl PartialEq for ScheduledTask {
fn eq(&self, other: &Self) -> bool {
self.run_at == other.run_at
}
}
impl Eq for ScheduledTask {}
impl PartialOrd for ScheduledTask {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ScheduledTask {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// 反转顺序,让最小的在堆顶
other.run_at.cmp(&self.run_at)
}
}
pub struct Scheduler {
tasks: Arc<Mutex<BinaryHeap<ScheduledTask>>>,
notify: tokio::sync::Notify,
}
impl Scheduler {
pub fn new() -> Self {
Self {
tasks: Arc::new(Mutex::new(BinaryHeap::new())),
notify: tokio::sync::Notify::new(),
}
}
/// 调度延迟任务
pub async fn schedule_after<F, Fut>(&self, delay: Duration, f: F)
where
F: FnOnce() -> Fut + Send + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
let task = ScheduledTask {
run_at: Instant::now() + delay,
task: Box::new(move || Box::pin(f())),
};
self.tasks.lock().await.push(task);
self.notify.notify_one(); // 通知调度循环
}
/// 调度循环
pub async fn run(&self) {
loop {
let next_wake = {
let mut tasks = self.tasks.lock().await;
if let Some(task) = tasks.peek() {
let now = Instant::now();
if task.run_at <= now {
// 到期,执行任务
let task = tasks.pop().unwrap();
tokio::spawn((task.task)());
continue;
}
Some(task.run_at - now)
} else {
None
}
};
match next_wake {
Some(duration) => {
tokio::select! {
_ = tokio::time::sleep(duration) => {},
_ = self.notify.notified() => {},
}
}
None => {
self.notify.notified().await;
}
}
}
}
}
使用示例
let scheduler = Arc::new(Scheduler::new());
// 5 秒后执行
scheduler.schedule_after(Duration::from_secs(5), || async {
println!("5 seconds later!");
}).await;
// 启动调度器
scheduler.run().await;
常见面试问题
Q1: 时间轮 vs 最小堆?
答案:
| 方案 | 添加任务 | 取下一个任务 | 适用场景 |
|---|---|---|---|
| 最小堆 | O(log n) | O(log n) | 任务数量中等 |
| 时间轮 | O(1) | O(1) | 大量定时任务 |
| 层级时间轮 | O(1) | O(1) | 跨度大的定时器 |
Tokio 内部使用层级时间轮管理所有 sleep/timeout。