Design a File Storage Service

Question

How do you design a file storage service in Python? How do you implement chunked uploads for large files?

Answer

Architecture

Chunked Upload

storage/upload.py
import hashlib
import shutil
from pathlib import Path

from fastapi import UploadFile

CHUNK_DIR = Path("/tmp/chunks")

class ChunkedUploader:
    def __init__(self, upload_id: str, total_chunks: int, filename: str):
        self.upload_id = upload_id
        self.total_chunks = total_chunks
        self.filename = filename
        self.chunk_dir = CHUNK_DIR / upload_id
        self.chunk_dir.mkdir(parents=True, exist_ok=True)

    async def upload_chunk(
        self, chunk_index: int, file: UploadFile, expected_md5: str | None = None
    ) -> bool:
        """Upload a single chunk."""
        chunk_path = self.chunk_dir / f"{chunk_index:06d}"
        content = await file.read()

        # Verify chunk integrity against the client-supplied checksum, if provided
        if expected_md5 is not None:
            received_md5 = hashlib.md5(content).hexdigest()
            if received_md5 != expected_md5:
                return False
        chunk_path.write_bytes(content)
        return True

    def is_complete(self) -> bool:
        """Check whether all chunks have arrived."""
        uploaded = len(list(self.chunk_dir.iterdir()))
        return uploaded >= self.total_chunks

    async def merge(self) -> str:
        """Merge the chunks into the complete file."""
        output_path = CHUNK_DIR / f"merged_{self.filename}"
        with open(output_path, "wb") as out:
            for i in range(self.total_chunks):
                chunk_path = self.chunk_dir / f"{i:06d}"
                out.write(chunk_path.read_bytes())

        # Compute the file hash
        file_hash = self._compute_hash(output_path)
        # Upload to S3
        s3_key = await self._upload_to_s3(output_path, file_hash)
        # Clean up temporary files
        self._cleanup()
        return s3_key

    def _compute_hash(self, path: Path) -> str:
        h = hashlib.sha256()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                h.update(chunk)
        return h.hexdigest()

    async def _upload_to_s3(self, path: Path, key: str) -> str:
        import boto3
        s3 = boto3.client("s3")
        s3.upload_file(str(path), "my-bucket", f"files/{key}")
        return f"files/{key}"

    def _cleanup(self):
        shutil.rmtree(self.chunk_dir, ignore_errors=True)
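For reference, a minimal client-side sketch of how a file might be split into chunks before calling the upload endpoints. The chunk size and the `split_file` helper are illustrative assumptions, not part of the service above:

```python
import hashlib
from pathlib import Path

CHUNK_SIZE = 5 * 1024 * 1024  # 5 MB per chunk (an illustrative choice)

def split_file(path: Path, chunk_size: int = CHUNK_SIZE) -> list[tuple[int, bytes, str]]:
    """Split a file into (index, data, md5) tuples for chunked upload."""
    chunks = []
    with open(path, "rb") as f:
        index = 0
        while True:
            data = f.read(chunk_size)
            if not data:
                break
            # The per-chunk MD5 can be sent alongside the chunk so the
            # server can verify integrity before writing it to disk
            chunks.append((index, data, hashlib.md5(data).hexdigest()))
            index += 1
    return chunks
```

Each tuple's index and checksum map directly onto the `chunk_index` and integrity check in `upload_chunk`.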

FastAPI Endpoints

storage/api.py
import uuid

from fastapi import FastAPI, HTTPException, UploadFile

from storage.upload import ChunkedUploader

app = FastAPI()
uploads: dict[str, ChunkedUploader] = {}

@app.post("/upload/init")
def init_upload(filename: str, total_chunks: int, file_size: int):
    upload_id = str(uuid.uuid4())
    uploads[upload_id] = ChunkedUploader(upload_id, total_chunks, filename)
    return {"upload_id": upload_id}

@app.post("/upload/chunk")
async def upload_chunk(upload_id: str, chunk_index: int, file: UploadFile):
    uploader = uploads.get(upload_id)
    if not uploader:
        raise HTTPException(404, "Upload not found")
    await uploader.upload_chunk(chunk_index, file)

    if uploader.is_complete():
        s3_key = await uploader.merge()
        del uploads[upload_id]
        return {"status": "complete", "key": s3_key}
    return {"status": "partial", "chunk_index": chunk_index}

@app.get("/download/{file_key:path}")
async def download(file_key: str):
    # Generate a presigned URL
    import boto3
    s3 = boto3.client("s3")
    url = s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": "my-bucket", "Key": file_key},
        ExpiresIn=3600,
    )
    return {"download_url": url}

Instant Upload (Hash Deduplication)

storage/dedup.py
class FileService:
    def check_duplicate(self, file_hash: str) -> str | None:
        """Instant upload: if the file already exists, return its key directly."""
        # Assumes a `db` session and `FileRecord` ORM model defined elsewhere
        existing = db.query(FileRecord).filter_by(hash=file_hash).first()
        if existing:
            return existing.s3_key
        return None

Common Interview Questions

Q1: Chunked upload vs. whole-file upload?

Answer

| Approach | Use case | Advantage |
| --- | --- | --- |
| Whole-file upload | Small files (<5 MB) | Simple |
| Chunked upload | Large files | Resumable, parallel uploads |
| Presigned direct upload | Bypassing the app server | Reduces server load |
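The trade-offs in the table can be sketched as a simple strategy selector. The 5 MB threshold comes from the table; the function and return values are illustrative assumptions:

```python
def choose_upload_strategy(file_size: int, client_can_sign: bool = False) -> str:
    """Pick an upload approach based on file size in bytes."""
    if client_can_sign:
        return "presigned"   # direct-to-S3, bypasses the app server entirely
    if file_size < 5 * 1024 * 1024:
        return "whole"       # small file: a single request is simplest
    return "chunked"         # large file: resumable, parallel uploads
```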

Q2: How do you implement resumable uploads?

Answer

  1. The client asks the server for the list of chunks already uploaded
  2. Only the missing chunks are uploaded
  3. Chunk indices determine the merge order
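Steps 1 and 2 above boil down to computing the set of missing chunk indices; a minimal sketch (the function name is an assumption):

```python
def missing_chunks(total_chunks: int, uploaded: set[int]) -> list[int]:
    """Return the chunk indices the client still needs to upload, in order."""
    return [i for i in range(total_chunks) if i not in uploaded]
```

The server can build the `uploaded` set by listing the chunk directory (as `ChunkedUploader.is_complete` does) and return it from a status endpoint; the client then resumes with only the missing indices.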

Q3: How is file security ensured?

Answer

  • Access control: presigned URLs with an expiry time
  • Transport encryption: HTTPS
  • Storage encryption: S3 SSE-S3 / SSE-KMS
  • Hotlink protection: CDN Referer allowlist
