# Designing a Scheduled Task System

## Question

How would you design a scheduled task system in Python? What are the differences between APScheduler and Celery Beat?

## Answer

### APScheduler

`scheduler/apscheduler_demo.py`:
```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.jobstores.redis import RedisJobStore

scheduler = AsyncIOScheduler(
    jobstores={
        "default": RedisJobStore(host="localhost", port=6379, db=2),
    },
    job_defaults={
        "coalesce": True,          # collapse several missed runs into one
        "max_instances": 1,        # at most one concurrent instance per job
        "misfire_grace_time": 60,  # still run if up to 60 seconds late
    },
)

# Cron trigger
@scheduler.scheduled_job(CronTrigger(hour=2, minute=0))
async def daily_cleanup():
    """Purge expired records at 2:00 AM every day."""
    await cleanup_expired_records()

# Interval trigger
@scheduler.scheduled_job(IntervalTrigger(minutes=5))
async def health_check():
    """Run a health check every 5 minutes."""
    await check_all_services()

# Add a job dynamically at runtime
def add_job(func, trigger, job_id: str, **kwargs):
    scheduler.add_job(
        func,
        trigger=trigger,
        id=job_id,
        replace_existing=True,
        **kwargs,
    )

scheduler.start()  # requires a running asyncio event loop
```
### Celery Beat

`scheduler/celery_beat.py`:
```python
from celery import Celery
from celery.schedules import crontab

app = Celery("scheduler", broker="redis://localhost:6379/0")

app.conf.beat_schedule = {
    # Cron-style schedule: every morning at 8:00
    "report-every-morning": {
        "task": "tasks.generate_report",
        "schedule": crontab(hour=8, minute=0),
        "args": ("daily",),
    },
    # A plain float means "every N seconds"
    "sync-every-hour": {
        "task": "tasks.sync_data",
        "schedule": 3600.0,
    },
}

# Start the scheduler: celery -A scheduler beat --loglevel=info
# Start a worker:      celery -A scheduler worker --loglevel=info
```
### Custom Scheduler

`scheduler/custom.py`:
```python
import asyncio
import heapq
import time
from dataclasses import dataclass, field
from typing import Awaitable, Callable

@dataclass(order=True)
class ScheduledTask:
    next_run: float  # the heap is ordered by this field only
    interval: float = field(compare=False)
    func: Callable[[], Awaitable] = field(compare=False)
    name: str = field(compare=False, default="")

class SimpleScheduler:
    def __init__(self):
        self.tasks: list[ScheduledTask] = []
        self.running = False

    def add_task(self, func: Callable[[], Awaitable], interval: float, name: str = ""):
        task = ScheduledTask(
            next_run=time.monotonic(),  # first run fires immediately
            interval=interval,
            func=func,
            name=name,
        )
        heapq.heappush(self.tasks, task)

    async def run(self):
        self.running = True
        while self.running and self.tasks:
            task = self.tasks[0]
            now = time.monotonic()
            if task.next_run > now:
                await asyncio.sleep(task.next_run - now)
                continue  # re-check: the heap top may have changed while sleeping
            task = heapq.heappop(self.tasks)
            try:
                await task.func()
            except Exception as e:
                print(f"Task {task.name} failed: {e}")
            # Reschedule the task
            task.next_run = time.monotonic() + task.interval
            heapq.heappush(self.tasks, task)
```
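A quick standalone run of the scheduler (the class is repeated here in condensed form so the snippet executes on its own; the sub-second intervals are just for demonstration):

```python
import asyncio
import heapq
import time
from dataclasses import dataclass, field
from typing import Awaitable, Callable

# Condensed copy of ScheduledTask / SimpleScheduler from scheduler/custom.py
@dataclass(order=True)
class ScheduledTask:
    next_run: float
    interval: float = field(compare=False)
    func: Callable[[], Awaitable] = field(compare=False)
    name: str = field(compare=False, default="")

class SimpleScheduler:
    def __init__(self):
        self.tasks: list[ScheduledTask] = []
        self.running = False

    def add_task(self, func, interval, name=""):
        heapq.heappush(self.tasks, ScheduledTask(time.monotonic(), interval, func, name))

    async def run(self):
        self.running = True
        while self.running and self.tasks:
            now = time.monotonic()
            if self.tasks[0].next_run > now:
                await asyncio.sleep(self.tasks[0].next_run - now)
                continue  # the heap top may have changed while sleeping
            task = heapq.heappop(self.tasks)
            try:
                await task.func()
            except Exception as e:
                print(f"Task {task.name} failed: {e}")
            task.next_run = time.monotonic() + task.interval
            heapq.heappush(self.tasks, task)

runs: list[str] = []

async def fast():
    runs.append("fast")

async def slow():
    runs.append("slow")

async def main():
    sched = SimpleScheduler()
    sched.add_task(fast, interval=0.05, name="fast")
    sched.add_task(slow, interval=0.12, name="slow")
    runner = asyncio.create_task(sched.run())
    await asyncio.sleep(0.35)  # let the scheduler tick a few times
    sched.running = False
    runner.cancel()

asyncio.run(main())
print(runs)
```

Both tasks fire once immediately, then repeat on their own intervals; the heap always surfaces whichever task is due next.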
## Common Interview Questions

### Q1: APScheduler vs Celery Beat?

Answer:

| Feature | APScheduler | Celery Beat |
|---|---|---|
| Dependencies | Standalone library | Requires Celery |
| Dynamic jobs | Add/remove at runtime | Needs django-celery-beat |
| Persistence | SQLAlchemy/Redis job stores | Database |
| Distribution | Needs extra coordination | Naturally distributed workers |
| Best for | Small and medium projects | Projects already using Celery |
### Q2: How do you prevent duplicate execution of scheduled tasks in a distributed deployment?

Answer:

- Distributed lock: each node attempts a Redis `SET NX`; only the node that acquires the lock executes
- Single scheduler node: deploy exactly one Beat process
- Database lock: claim tasks with `SELECT ... FOR UPDATE`

```python
import redis

r = redis.Redis(host="localhost", port=6379)

def ensure_single_execution(task_id: str, ttl: int = 300) -> bool:
    """Redis distributed lock to prevent duplicate runs across nodes."""
    lock_key = f"scheduler:lock:{task_id}"
    # SET NX: only the first node succeeds; EX expires the lock after ttl seconds
    acquired = r.set(lock_key, "1", nx=True, ex=ttl)
    return bool(acquired)
```
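The lock pattern can be exercised without a live Redis server by injecting the client. `FakeRedis` below is a hypothetical in-memory stand-in that mimics only the `SET key value NX` behavior (expiry is ignored); in production the lock is released by the `EX` ttl:

```python
class FakeRedis:
    """Hypothetical in-memory stand-in mimicking only `SET key value NX`."""

    def __init__(self):
        self.store: dict[str, str] = {}

    def set(self, key, value, nx=False, ex=None):
        if nx and key in self.store:
            return None  # key already set: the lock is held by another node
        self.store[key] = value  # ex (expiry) is ignored in this stand-in
        return True

def run_if_first(client, task_id: str, fn, ttl: int = 300) -> bool:
    """Execute fn only on the node that wins the lock; later callers skip."""
    if not client.set(f"scheduler:lock:{task_id}", "1", nx=True, ex=ttl):
        return False  # another node already claimed this run
    fn()
    return True

# Two "nodes" racing for the same task: only the first executes.
client = FakeRedis()
calls: list[str] = []
a = run_if_first(client, "daily-report", lambda: calls.append("A"))
b = run_if_first(client, "daily-report", lambda: calls.append("B"))
print(a, b, calls)
```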
### Q3: What if a task misses its scheduled time?

Answer:

- `coalesce`: collapse the intermediate missed runs and execute only once
- `misfire_grace_time`: still execute if the delay is within the grace period
- Compensating execution: log every run, then check the log on startup and replay what was missed
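The coalesce-vs-replay decision above reduces to simple bookkeeping over the run log. The function names below are illustrative, not APScheduler API:

```python
def missed_runs(last_run: float, interval: float, now: float) -> int:
    """Number of scheduled fire times that fell in (last_run, now]."""
    if now <= last_run:
        return 0
    return int((now - last_run) // interval)

def runs_to_execute(last_run: float, interval: float, now: float,
                    coalesce: bool = True) -> int:
    """On startup, decide how many catch-up runs to perform."""
    missed = missed_runs(last_run, interval, now)
    if missed == 0:
        return 0
    # coalesce: one catch-up run no matter how many were missed
    return 1 if coalesce else missed
```

For example, with a 60-second interval and 185 seconds of downtime, three fire times were missed: coalescing runs the task once, replaying runs it three times.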