跳到主要内容

数据管道问题排查

问题

Python 数据处理管道中常见的性能和准确性问题有哪些?如何排查和优化?

答案

常见问题

大数据内存优化

pipeline/memory.py
import pandas as pd

# ❌ 一次加载全量(内存爆炸)
df = pd.read_csv("huge_file.csv") # 10GB 文件直接 OOM

# ✅ 分块读取
def process_in_chunks(filepath: str, chunk_size: int = 10_000):
results = []
for chunk in pd.read_csv(filepath, chunksize=chunk_size):
# 每个 chunk 是 DataFrame
processed = transform(chunk)
results.append(processed)
return pd.concat(results, ignore_index=True)


# ✅ 指定数据类型减少内存
dtypes = {
"user_id": "int32", # 默认 int64,节省 50%
"amount": "float32",
"status": "category", # 枚举值用 category
}
df = pd.read_csv("data.csv", dtype=dtypes)


# ✅ 用 Polars 替代 Pandas(更快更省内存)
import polars as pl

df = pl.scan_csv("huge_file.csv") # 延迟加载
result = (
df.filter(pl.col("status") == "active")
.group_by("category")
.agg(pl.col("amount").sum())
.collect() # 最终执行
)

数据质量检查

pipeline/quality.py
import pandas as pd
from dataclasses import dataclass

@dataclass
class QualityReport:
total_rows: int
null_counts: dict
duplicate_count: int
anomalies: list[str]

def check_quality(df: pd.DataFrame) -> QualityReport:
anomalies = []

# 空值检查
null_counts = df.isnull().sum().to_dict()
for col, count in null_counts.items():
if count > len(df) * 0.1: # 空值超过 10%
anomalies.append(f"列 {col} 空值率 {count/len(df):.1%}")

# 重复行检查
dup_count = df.duplicated().sum()
if dup_count > 0:
anomalies.append(f"发现 {dup_count} 行重复数据")

# 异常值检查(IQR 方法)
for col in df.select_dtypes(include="number"):
q1, q3 = df[col].quantile([0.25, 0.75])
iqr = q3 - q1
outliers = ((df[col] < q1 - 1.5 * iqr) | (df[col] > q3 + 1.5 * iqr)).sum()
if outliers > 0:
anomalies.append(f"列 {col}{outliers} 个异常值")

return QualityReport(
total_rows=len(df),
null_counts=null_counts,
duplicate_count=dup_count,
anomalies=anomalies,
)

幂等性保证

pipeline/idempotent.py
import hashlib

def idempotent_load(df: pd.DataFrame, table: str, engine):
"""幂等写入:按日期分区覆盖"""
date = df["date"].iloc[0]
# 先删除该分区
engine.execute(f"DELETE FROM {table} WHERE date = :date", {"date": date})
# 再插入
df.to_sql(table, engine, if_exists="append", index=False)


def dedup_by_key(df: pd.DataFrame, keys: list[str]) -> pd.DataFrame:
"""按业务主键去重(保留最新)"""
return df.sort_values("updated_at").drop_duplicates(subset=keys, keep="last")

常见面试问题

Q1: Pandas vs Polars vs Dask?

答案

数据量特点
Pandas<1GB最通用,生态最好
Polars<100GB快、延迟执行、Rust 实现
Dask>100GB分布式 Pandas,集群计算
PySparkTB级Spark 生态,集群必选

Q2: 管道失败如何重试?

答案

  • 幂等设计:重跑不会产生重复数据
  • 检查点:每一步完成后记录状态,失败从断点恢复
  • Airflow 重试retries=3, retry_delay=timedelta(minutes=5)

Q3: 如何监控数据管道?

答案

  1. 处理行数、成功率指标(Prometheus)
  2. 执行时间趋势(Grafana 看板)
  3. 数据质量报告(自动邮件)
  4. 异常告警(Slack/钉钉通知)

相关链接