Designing an ETL Data Pipeline
Question
How do you design an ETL (Extract-Transform-Load) data pipeline in Python? What are Airflow's core concepts?
Answer
Architecture
The pipeline follows the classic Extract → Transform → Load flow: pull raw rows out of a source database (MySQL below), clean and reshape them with pandas, then write the result into the warehouse (PostgreSQL below).
Airflow DAG
dags/daily_etl.py
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
}

dag = DAG(
    "daily_user_etl",
    default_args=default_args,
    description="Daily user-data ETL",
    schedule_interval="0 2 * * *",  # every day at 02:00
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

def extract(**context):
    """Extract data from MySQL."""
    import pandas as pd
    from sqlalchemy import create_engine, text

    engine = create_engine("mysql+pymysql://user:pass@host/db")
    ds = context["ds"]  # logical date of this DAG run
    # Bind the date as a parameter rather than interpolating it into the SQL
    df = pd.read_sql(text("SELECT * FROM users WHERE date = :ds"), engine, params={"ds": ds})
    # Pass data between tasks via XCom; fine for small payloads, but for
    # large data write to shared storage and pass a path instead
    context["ti"].xcom_push(key="raw_data", value=df.to_json())

def transform(**context):
    """Clean and transform the data."""
    from io import StringIO
    import pandas as pd

    raw = context["ti"].xcom_pull(key="raw_data", task_ids="extract")
    df = pd.read_json(StringIO(raw))
    # Cleaning: drop rows without an email, normalize case, derive a date column
    df = df.dropna(subset=["email"])
    df["email"] = df["email"].str.lower()
    df["created_date"] = pd.to_datetime(df["created_at"]).dt.date
    context["ti"].xcom_push(key="clean_data", value=df.to_json())

def load(**context):
    """Write into the data warehouse."""
    from io import StringIO
    import pandas as pd
    from sqlalchemy import create_engine

    clean = context["ti"].xcom_pull(key="clean_data", task_ids="transform")
    df = pd.read_json(StringIO(clean))
    engine = create_engine("postgresql://user:pass@warehouse/dw")
    df.to_sql("dim_users", engine, if_exists="append", index=False)

# Define tasks and their dependencies
t_extract = PythonOperator(task_id="extract", python_callable=extract, dag=dag)
t_transform = PythonOperator(task_id="transform", python_callable=transform, dag=dag)
t_load = PythonOperator(task_id="load", python_callable=load, dag=dag)
t_notify = BashOperator(task_id="notify", bash_command='echo "ETL done"', dag=dag)

t_extract >> t_transform >> t_load >> t_notify
```
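With Airflow 2 the same pipeline can also be written with the TaskFlow API, where each task's return value is passed through XCom automatically instead of via explicit xcom_push/xcom_pull. A minimal sketch under the same assumptions as above (connection strings and table names are placeholders):

```python
from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule_interval="0 2 * * *", start_date=datetime(2024, 1, 1), catchup=False)
def daily_user_etl_taskflow():
    @task
    def extract() -> str:
        import pandas as pd
        from sqlalchemy import create_engine
        engine = create_engine("mysql+pymysql://user:pass@host/db")
        # The return value is serialized and pushed to XCom automatically
        return pd.read_sql("SELECT * FROM users", engine).to_json()

    @task
    def transform(raw: str) -> str:
        from io import StringIO
        import pandas as pd
        df = pd.read_json(StringIO(raw))
        df = df.dropna(subset=["email"])
        df["email"] = df["email"].str.lower()
        return df.to_json()

    @task
    def load(clean: str) -> None:
        from io import StringIO
        import pandas as pd
        from sqlalchemy import create_engine
        df = pd.read_json(StringIO(clean))
        engine = create_engine("postgresql://user:pass@warehouse/dw")
        df.to_sql("dim_users", engine, if_exists="append", index=False)

    # Chaining the calls builds the extract >> transform >> load dependency graph
    load(transform(extract()))

daily_user_etl_taskflow()
```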
Pure-Python ETL Pipeline
etl/pipeline.py
```python
from dataclasses import dataclass
from typing import Any, Callable
import logging

logger = logging.getLogger(__name__)

@dataclass
class Step:
    name: str
    func: Callable
    retry: int = 0

class Pipeline:
    def __init__(self, name: str):
        self.name = name
        self.steps: list[Step] = []

    def add_step(self, name: str, func: Callable, retry: int = 0):
        self.steps.append(Step(name=name, func=func, retry=retry))
        return self  # return self so calls can be chained

    def run(self, data: Any = None) -> Any:
        # Each step receives the previous step's output
        for step in self.steps:
            for attempt in range(step.retry + 1):
                try:
                    logger.info(f"[{step.name}] starting (attempt {attempt + 1})")
                    data = step.func(data)
                    break
                except Exception as e:
                    if attempt == step.retry:
                        logger.error(f"[{step.name}] failed after all retries: {e}")
                        raise
                    logger.warning(f"[{step.name}] retrying: {e}")
        return data

# Usage
pipeline = Pipeline("user_etl")
pipeline.add_step("extract", extract_users, retry=2)
pipeline.add_step("transform", clean_users)
pipeline.add_step("load", load_to_warehouse, retry=1)
pipeline.run()
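```

The usage above assumes extract_users, clean_users, and load_to_warehouse already exist. A self-contained demo with stub step functions (the stubs are illustrative, not a real data source):

```python
import logging
logging.basicConfig(level=logging.INFO)

def extract_users(_):
    # Stand-in for a real database read
    return [{"email": "Alice@Example.com"}, {"email": None}]

def clean_users(rows):
    # Drop rows without an email and normalize case
    return [{**r, "email": r["email"].lower()} for r in rows if r["email"]]

def load_to_warehouse(rows):
    print(f"loaded {len(rows)} rows")

# add_step returns self, so steps can be chained
pipeline = (
    Pipeline("user_etl")
    .add_step("extract", extract_users, retry=2)
    .add_step("transform", clean_users)
    .add_step("load", load_to_warehouse, retry=1)
)
pipeline.run()  # logs each step and prints "loaded 1 rows"
```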
Common Interview Questions
Q1: What are Airflow's core concepts?
Answer:
| Concept | Description |
|---|---|
| DAG | Directed acyclic graph; defines task dependencies |
| Task | A single unit of execution within a DAG |
| Operator | The type of a Task (Python/Bash/SQL) |
| XCom | Cross-task data passing |
| Scheduler | Triggers DAG Runs on schedule |
| Executor | Runs the tasks (Local/Celery/Kubernetes) |
Q2: ETL vs. ELT?
Answer:
| Dimension | ETL | ELT |
|---|---|---|
| Where transforms run | Intermediate layer | Inside the target warehouse |
| Typical use | Traditional data warehouses | Cloud warehouses (BigQuery/Snowflake) |
| Performance | Bounded by the intermediate layer | Leverages warehouse compute |
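For contrast, a minimal ELT sketch in the same stack (staging and table names are illustrative): the raw rows land in the warehouse unchanged, and the transformation runs as SQL inside it.

```python
import pandas as pd
from sqlalchemy import create_engine, text

source = create_engine("mysql+pymysql://user:pass@host/db")
warehouse = create_engine("postgresql://user:pass@warehouse/dw")

# E + L: dump raw rows into a staging table as-is
raw = pd.read_sql("SELECT * FROM users", source)
raw.to_sql("stg_users", warehouse, if_exists="replace", index=False)

# T: runs inside the warehouse, using its compute
with warehouse.begin() as conn:
    conn.execute(text("""
        INSERT INTO dim_users (email, created_date)
        SELECT LOWER(email), CAST(created_at AS DATE)
        FROM stg_users
        WHERE email IS NOT NULL
    """))
```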
Q3: How do you optimize ETL for large data volumes?
Answer:
- Batch processing: read in pages/partitions instead of loading everything at once
- Parallel execution: generate parallel tasks dynamically in Airflow
- Incremental extraction: by timestamp or CDC (Change Data Capture); see the sketch after this list
- Streaming ETL: Kafka + Flink/Spark Streaming for real-time processing
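As an example of the first and third points, incremental extraction combined with chunked reads keeps memory usage flat on large tables. A sketch; the watermark query and column names are illustrative:

```python
import pandas as pd
from sqlalchemy import create_engine, text

source = create_engine("mysql+pymysql://user:pass@host/db")
warehouse = create_engine("postgresql://user:pass@warehouse/dw")

# Watermark: only pull rows newer than what the warehouse already holds
with warehouse.connect() as conn:
    last = conn.execute(text("SELECT MAX(updated_at) FROM dim_users")).scalar()
last = last or "1970-01-01"  # first run: full load

query = text("SELECT * FROM users WHERE updated_at > :last ORDER BY updated_at")

# chunksize streams 10k rows at a time instead of loading the full result set
for chunk in pd.read_sql(query, source, params={"last": last}, chunksize=10_000):
    chunk["email"] = chunk["email"].str.lower()
    chunk.to_sql("dim_users", warehouse, if_exists="append", index=False)
```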