实现延迟队列
问题
如何用 Go 实现延迟队列?支持定时执行任务。
答案
方案对比
| 方案 | 精度 | 持久化 | 分布式 | 复杂度 |
|---|---|---|---|---|
| time.AfterFunc | 高 | 无 | 否 | 低 |
| 最小堆 | 高 | 无 | 否 | 中 |
| Redis ZSet | 秒级 | 有 | 是 | 中 |
| 时间轮 | 槽精度 | 无 | 否 | 中 |
方案一:最小堆
type DelayTask struct {
ExecuteAt time.Time
Fn func()
}
type DelayQueue struct {
mu sync.Mutex
tasks []DelayTask // 小顶堆
notify chan struct{}
}
func NewDelayQueue() *DelayQueue {
dq := &DelayQueue{notify: make(chan struct{}, 1)}
go dq.run()
return dq
}
func (dq *DelayQueue) Add(delay time.Duration, fn func()) {
dq.mu.Lock()
task := DelayTask{ExecuteAt: time.Now().Add(delay), Fn: fn}
heap.Push(dq, task)
dq.mu.Unlock()
// 通知可能有更早的任务
select {
case dq.notify <- struct{}{}:
default:
}
}
func (dq *DelayQueue) run() {
for {
dq.mu.Lock()
if dq.Len() == 0 {
dq.mu.Unlock()
<-dq.notify // 等待新任务
continue
}
next := dq.tasks[0]
delay := time.Until(next.ExecuteAt)
dq.mu.Unlock()
if delay <= 0 {
// 到时间了,执行
dq.mu.Lock()
task := heap.Pop(dq).(DelayTask)
dq.mu.Unlock()
go task.Fn()
continue
}
// 等到执行时间或有新任务
select {
case <-time.After(delay):
case <-dq.notify:
}
}
}
// 实现 heap.Interface
func (dq *DelayQueue) Len() int { return len(dq.tasks) }
func (dq *DelayQueue) Less(i, j int) bool { return dq.tasks[i].ExecuteAt.Before(dq.tasks[j].ExecuteAt) }
func (dq *DelayQueue) Swap(i, j int) { dq.tasks[i], dq.tasks[j] = dq.tasks[j], dq.tasks[i] }
func (dq *DelayQueue) Push(x interface{}) { dq.tasks = append(dq.tasks, x.(DelayTask)) }
func (dq *DelayQueue) Pop() interface{} {
old := dq.tasks
n := len(old)
task := old[n-1]
dq.tasks = old[:n-1]
return task
}
方案二:Redis ZSet(分布式)
// 添加延迟任务:score = 执行时间戳
func AddDelayTask(rdb *redis.Client, taskID string, payload string, delay time.Duration) {
executeAt := float64(time.Now().Add(delay).Unix())
rdb.ZAdd(context.Background(), "delay_queue", redis.Z{
Score: executeAt,
Member: taskID + ":" + payload,
})
}
// 消费者:轮询到期任务
func ConsumeDelayTasks(rdb *redis.Client, handler func(string)) {
for {
now := float64(time.Now().Unix())
// 取出所有已到期的任务
results, _ := rdb.ZRangeByScore(context.Background(), "delay_queue", &redis.ZRangeBy{
Min: "-inf",
Max: fmt.Sprintf("%f", now),
Count: 10,
}).Result()
for _, item := range results {
// 原子删除(防止多消费者重复消费)
removed, _ := rdb.ZRem(context.Background(), "delay_queue", item).Result()
if removed > 0 {
handler(item)
}
}
if len(results) == 0 {
time.Sleep(500 * time.Millisecond) // 无任务时短暂休眠
}
}
}
使用示例
func main() {
// 内存版
dq := NewDelayQueue()
dq.Add(3*time.Second, func() {
fmt.Println("3秒后执行")
})
dq.Add(1*time.Second, func() {
fmt.Println("1秒后执行")
})
// Redis 版
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
AddDelayTask(rdb, "order:cancel", `{"order_id":"123"}`, 15*time.Minute)
go ConsumeDelayTasks(rdb, func(item string) {
fmt.Printf("执行延迟任务: %s\n", item)
})
}
常见面试问题
Q1: 延迟队列的典型应用场景?
答案:
- 订单超时未支付自动取消(15 分钟)
- 延迟重试(发送失败的消息)
- 定时提醒(会议通知)
- 延迟删除(软删除后 30 天清理)
Q2: Redis ZSet 轮询的缺点?
答案:
- 轮询有延迟(取决于 sleep 间隔)
- 高频轮询浪费 Redis 资源
- 改进:用 Redis Stream 或
BLPOP阻塞等待