设计文件存储服务
问题
如何用 Python 设计一个文件存储服务?如何实现大文件分片上传?
答案
架构
分片上传
storage/upload.py
import hashlib
import os
from pathlib import Path
from fastapi import UploadFile
CHUNK_DIR = Path("/tmp/chunks")
class ChunkedUploader:
    """Collects file chunks on local disk, merges them, and ships the result to S3.

    Chunks live under CHUNK_DIR/<upload_id>/ as zero-padded six-digit
    filenames, so lexical order equals chunk order.
    """

    def __init__(self, upload_id: str, total_chunks: int, filename: str):
        self.upload_id = upload_id
        self.total_chunks = total_chunks
        self.filename = filename
        # Per-upload scratch directory; created eagerly so later chunk
        # writes never race against mkdir.
        self.chunk_dir = CHUNK_DIR / upload_id
        self.chunk_dir.mkdir(parents=True, exist_ok=True)

    async def upload_chunk(
        self,
        chunk_index: int,
        file: UploadFile,
        expected_md5: str | None = None,
    ) -> bool:
        """Persist a single chunk to disk.

        Args:
            chunk_index: zero-based position of this chunk within the file.
            file: the uploaded chunk payload.
            expected_md5: optional client-supplied MD5 hex digest. When
                provided, the chunk is verified before being written.
                (The original code computed the digest but never checked it.)

        Raises:
            ValueError: if expected_md5 is given and does not match.
        """
        chunk_path = self.chunk_dir / f"{chunk_index:06d}"
        content = await file.read()
        # Verify chunk integrity when the caller supplied a digest.
        received_md5 = hashlib.md5(content).hexdigest()
        if expected_md5 is not None and received_md5 != expected_md5:
            raise ValueError(
                f"chunk {chunk_index} md5 mismatch: "
                f"got {received_md5}, expected {expected_md5}"
            )
        chunk_path.write_bytes(content)
        return True

    def is_complete(self) -> bool:
        """Return True once the chunk directory holds all expected chunks.

        NOTE(review): counts directory entries rather than checking each
        index exists — a stray file could make this true early; merge()
        will then fail on the missing index.
        """
        uploaded = len(list(self.chunk_dir.iterdir()))
        return uploaded >= self.total_chunks

    async def merge(self) -> str:
        """Concatenate all chunks in index order, upload to S3, clean up.

        Returns:
            The S3 object key of the merged file.

        Raises:
            FileNotFoundError: if any expected chunk index is missing.
        """
        output_path = CHUNK_DIR / f"merged_{self.filename}"
        with open(output_path, "wb") as out:
            for i in range(self.total_chunks):
                chunk_path = self.chunk_dir / f"{i:06d}"
                out.write(chunk_path.read_bytes())
        # Content hash doubles as the S3 key, enabling dedup ("instant upload").
        file_hash = self._compute_hash(output_path)
        s3_key = await self._upload_to_s3(output_path, file_hash)
        # Remove the per-upload chunk directory; best effort.
        self._cleanup()
        return s3_key

    def _compute_hash(self, path: Path) -> str:
        """SHA-256 of the file at *path*, read in 8 KiB chunks to bound memory."""
        h = hashlib.sha256()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                h.update(chunk)
        return h.hexdigest()

    async def _upload_to_s3(self, path: Path, key: str) -> str:
        """Upload the merged file to S3 under files/<key> and return the key.

        NOTE(review): boto3's upload_file is blocking; consider running it
        in a thread pool so it does not stall the event loop.
        """
        import boto3
        s3 = boto3.client("s3")
        s3.upload_file(str(path), "my-bucket", f"files/{key}")
        return f"files/{key}"

    def _cleanup(self):
        """Delete the chunk scratch directory, ignoring errors (best effort)."""
        import shutil
        shutil.rmtree(self.chunk_dir, ignore_errors=True)
FastAPI 接口
storage/api.py
import uuid
from fastapi import FastAPI, UploadFile, HTTPException
# FastAPI application instance for the storage HTTP API.
app = FastAPI()
# In-memory registry of active upload sessions, keyed by upload_id.
# NOTE(review): not persistent and not shared across workers — sessions
# are lost on restart; a real deployment would back this with Redis/DB.
uploads: dict[str, ChunkedUploader] = {}
@app.post("/upload/init")
def init_upload(filename: str, total_chunks: int, file_size: int):
    """Start a chunked-upload session and return its id.

    `file_size` is accepted for future quota/size validation but is not
    consumed anywhere yet.

    Raises:
        HTTPException: 400 if total_chunks is not a positive integer.
    """
    # Guard: total_chunks <= 0 would make is_complete() immediately true
    # and merge() would then produce an empty file.
    if total_chunks < 1:
        raise HTTPException(400, "total_chunks must be >= 1")
    upload_id = str(uuid.uuid4())
    uploads[upload_id] = ChunkedUploader(upload_id, total_chunks, filename)
    return {"upload_id": upload_id}
@app.post("/upload/chunk")
async def upload_chunk(upload_id: str, chunk_index: int, file: UploadFile):
    """Accept one chunk; when the final chunk arrives, merge and finalize."""
    try:
        uploader = uploads[upload_id]
    except KeyError:
        raise HTTPException(404, "Upload not found")
    await uploader.upload_chunk(chunk_index, file)
    # Not all chunks in yet — acknowledge this one and wait for more.
    if not uploader.is_complete():
        return {"status": "partial", "chunk_index": chunk_index}
    # Last chunk: assemble the file, push to S3, and drop the session.
    s3_key = await uploader.merge()
    uploads.pop(upload_id)
    return {"status": "complete", "key": s3_key}
@app.get("/download/{file_key:path}")
async def download(file_key: str):
    """Return a time-limited (1 hour) presigned S3 URL for the given object key."""
    import boto3

    client = boto3.client("s3")
    presigned = client.generate_presigned_url(
        ClientMethod="get_object",
        Params={"Bucket": "my-bucket", "Key": file_key},
        ExpiresIn=3600,
    )
    return {"download_url": presigned}
秒传(Hash 去重)
storage/dedup.py
class FileService:
    def check_duplicate(self, file_hash: str) -> str | None:
        """Instant upload: return the stored S3 key if this hash already exists, else None."""
        record = db.query(FileRecord).filter_by(hash=file_hash).first()
        return record.s3_key if record else None
常见面试问题
Q1: 分片上传 vs 整体上传?
答案:
| 方案 | 适用场景 | 优势 |
|---|---|---|
| 整体上传 | 小文件 <5MB | 简单 |
| 分片上传 | 大文件 | 断点续传、并发上传 |
| 预签名直传 | 绕过服务器 | 减轻服务端压力 |
Q2: 如何实现断点续传?
答案:
- 客户端向服务端查询已完成的分片列表
- 只上传缺失的分片
- 分片编号决定顺序
Q3: 文件安全如何保障?
答案:
- 访问控制:预签名 URL 带过期时间
- 传输加密:HTTPS
- 存储加密:S3 SSE-S3 / SSE-KMS
- 防盗链:CDN Referer 白名单