跳到主要内容

实现延迟队列

问题

如何用 Go 实现延迟队列?支持定时执行任务。

答案

方案对比

方案精度持久化分布式复杂度
time.AfterFunc
最小堆
Redis ZSet秒级
时间轮槽精度

方案一:最小堆

type DelayTask struct {
ExecuteAt time.Time
Fn func()
}

type DelayQueue struct {
mu sync.Mutex
tasks []DelayTask // 小顶堆
notify chan struct{}
}

func NewDelayQueue() *DelayQueue {
dq := &DelayQueue{notify: make(chan struct{}, 1)}
go dq.run()
return dq
}

func (dq *DelayQueue) Add(delay time.Duration, fn func()) {
dq.mu.Lock()
task := DelayTask{ExecuteAt: time.Now().Add(delay), Fn: fn}
heap.Push(dq, task)
dq.mu.Unlock()

// 通知可能有更早的任务
select {
case dq.notify <- struct{}{}:
default:
}
}

func (dq *DelayQueue) run() {
for {
dq.mu.Lock()
if dq.Len() == 0 {
dq.mu.Unlock()
<-dq.notify // 等待新任务
continue
}

next := dq.tasks[0]
delay := time.Until(next.ExecuteAt)
dq.mu.Unlock()

if delay <= 0 {
// 到时间了,执行
dq.mu.Lock()
task := heap.Pop(dq).(DelayTask)
dq.mu.Unlock()
go task.Fn()
continue
}

// 等到执行时间或有新任务
select {
case <-time.After(delay):
case <-dq.notify:
}
}
}

// 实现 heap.Interface
func (dq *DelayQueue) Len() int { return len(dq.tasks) }
func (dq *DelayQueue) Less(i, j int) bool { return dq.tasks[i].ExecuteAt.Before(dq.tasks[j].ExecuteAt) }
func (dq *DelayQueue) Swap(i, j int) { dq.tasks[i], dq.tasks[j] = dq.tasks[j], dq.tasks[i] }
func (dq *DelayQueue) Push(x interface{}) { dq.tasks = append(dq.tasks, x.(DelayTask)) }
func (dq *DelayQueue) Pop() interface{} {
old := dq.tasks
n := len(old)
task := old[n-1]
dq.tasks = old[:n-1]
return task
}

方案二:Redis ZSet(分布式)

// 添加延迟任务:score = 执行时间戳
func AddDelayTask(rdb *redis.Client, taskID string, payload string, delay time.Duration) {
executeAt := float64(time.Now().Add(delay).Unix())
rdb.ZAdd(context.Background(), "delay_queue", redis.Z{
Score: executeAt,
Member: taskID + ":" + payload,
})
}

// 消费者:轮询到期任务
func ConsumeDelayTasks(rdb *redis.Client, handler func(string)) {
for {
now := float64(time.Now().Unix())
// 取出所有已到期的任务
results, _ := rdb.ZRangeByScore(context.Background(), "delay_queue", &redis.ZRangeBy{
Min: "-inf",
Max: fmt.Sprintf("%f", now),
Count: 10,
}).Result()

for _, item := range results {
// 原子删除(防止多消费者重复消费)
removed, _ := rdb.ZRem(context.Background(), "delay_queue", item).Result()
if removed > 0 {
handler(item)
}
}

if len(results) == 0 {
time.Sleep(500 * time.Millisecond) // 无任务时短暂休眠
}
}
}

使用示例

func main() {
// 内存版
dq := NewDelayQueue()
dq.Add(3*time.Second, func() {
fmt.Println("3秒后执行")
})
dq.Add(1*time.Second, func() {
fmt.Println("1秒后执行")
})

// Redis 版
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
AddDelayTask(rdb, "order:cancel", `{"order_id":"123"}`, 15*time.Minute)

go ConsumeDelayTasks(rdb, func(item string) {
fmt.Printf("执行延迟任务: %s\n", item)
})
}

常见面试问题

Q1: 延迟队列的典型应用场景?

答案

  • 订单超时未支付自动取消(15 分钟)
  • 延迟重试(发送失败的消息)
  • 定时提醒(会议通知)
  • 延迟删除(软删除后 30 天清理)

Q2: Redis ZSet 轮询的缺点?

答案

  • 轮询有延迟(取决于 sleep 间隔)
  • 高频轮询浪费 Redis 资源
  • 改进:用 Redis Stream 或 BLPOP 阻塞等待

相关链接