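Designing a Search Service
Question
How would you design a search service in Python? What are the core principles behind Elasticsearch?
Answer
Architecture
Elasticsearch operations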
search/es_client.py
```python
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

es = AsyncElasticsearch(["http://localhost:9200"])


# Create the index (ik_smart / ik_max_word require the IK analysis plugin)
async def create_index():
    await es.indices.create(
        index="articles",
        body={
            "settings": {
                "analysis": {
                    "analyzer": {
                        "ik_smart_analyzer": {
                            "type": "custom",
                            "tokenizer": "ik_smart",
                        }
                    }
                }
            },
            "mappings": {
                "properties": {
                    "title": {
                        "type": "text",
                        "analyzer": "ik_smart_analyzer",
                        # Sub-field queried as "title.suggest" by the completion suggester
                        "fields": {"suggest": {"type": "completion"}},
                    },
                    "content": {"type": "text", "analyzer": "ik_max_word"},
                    "tags": {"type": "keyword"},
                    "created_at": {"type": "date"},
                    "author_id": {"type": "integer"},
                }
            },
        },
    )


# Index a single document
async def index_article(article: dict):
    await es.index(index="articles", id=article["id"], body=article)


# Bulk-index many documents in one request
async def bulk_index(articles: list[dict]):
    actions = [
        {"_index": "articles", "_id": a["id"], "_source": a}
        for a in articles
    ]
    await async_bulk(es, actions)
```
Search service
search/service.py
```python
from dataclasses import dataclass

# Shared client; assumes search/ is an importable package
from search.es_client import es


@dataclass
class SearchResult:
    total: int
    items: list[dict]
    aggregations: dict


class SearchService:
    async def search(
        self,
        query: str,
        page: int = 1,
        size: int = 20,
        tags: list[str] | None = None,
        sort_by: str = "_score",
    ) -> SearchResult:
        body = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "multi_match": {
                                "query": query,
                                "fields": ["title^3", "content"],  # title weighted 3x
                                "type": "best_fields",
                            }
                        }
                    ],
                    "filter": [],
                }
            },
            "highlight": {
                "fields": {"title": {}, "content": {"fragment_size": 150}},
                "pre_tags": ["<em>"],
                "post_tags": ["</em>"],
            },
            "aggs": {
                "tag_stats": {"terms": {"field": "tags", "size": 20}}
            },
            "from": (page - 1) * size,
            "size": size,
        }

        # Filter by tags (filter context: no effect on scoring)
        if tags:
            body["query"]["bool"]["filter"].append(
                {"terms": {"tags": tags}}
            )

        # Apply a non-default sort; descending order is an assumption,
        # with _score kept as the tie-breaker
        if sort_by != "_score":
            body["sort"] = [{sort_by: {"order": "desc"}}, "_score"]

        result = await es.search(index="articles", body=body)

        # Attach score and highlight fragments to each returned document
        items = []
        for hit in result["hits"]["hits"]:
            item = hit["_source"]
            item["score"] = hit["_score"]
            item["highlight"] = hit.get("highlight", {})
            items.append(item)

        return SearchResult(
            total=result["hits"]["total"]["value"],
            items=items,
            aggregations=result.get("aggregations", {}),
        )
```
Search suggestions (autocomplete)
search/suggest.py
```python
# Shared client; assumes search/ is an importable package
from search.es_client import es


async def suggest(prefix: str, size: int = 5) -> list[str]:
    """Search suggestions based on the completion suggester."""
    result = await es.search(
        index="articles",
        body={
            "suggest": {
                "title_suggest": {
                    "prefix": prefix,
                    "completion": {
                        # Must be mapped with type "completion" in the index
                        "field": "title.suggest",
                        "size": size,
                        "skip_duplicates": True,
                    },
                }
            }
        },
    )
    options = result["suggest"]["title_suggest"][0]["options"]
    return [opt["text"] for opt in options]
```
Data synchronization
search/sync.py
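```python
from search.es_client import bulk_index

# db, Article, get_last_sync_time, set_last_sync_time: defined elsewhere in the app
async def sync_from_mysql():
    """Incremental sync based on update time."""
    last_sync = get_last_sync_time()
    # Order by updated_at so the last row carries the newest timestamp
    query = db.query(Article).filter(Article.updated_at > last_sync)
    articles = query.order_by(Article.updated_at).all()
    if articles:
        await bulk_index([a.to_dict() for a in articles])
        set_last_sync_time(articles[-1].updated_at)
```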
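The watermark logic behind the incremental sync can be tried without a database. The following is a minimal sketch under stated assumptions: `select_changed` and the sample `rows` are hypothetical names introduced for illustration, and plain dicts stand in for `Article` rows.

```python
from datetime import datetime


def select_changed(rows: list[dict], last_sync: datetime) -> tuple[list[dict], datetime]:
    """Return rows updated after last_sync (oldest first) and the new watermark."""
    changed = sorted(
        (r for r in rows if r["updated_at"] > last_sync),
        key=lambda r: r["updated_at"],
    )
    # Keep the old watermark when nothing changed
    new_watermark = changed[-1]["updated_at"] if changed else last_sync
    return changed, new_watermark


# Hypothetical sample data, deliberately out of order
rows = [
    {"id": 1, "updated_at": datetime(2024, 1, 1, 10, 0)},
    {"id": 2, "updated_at": datetime(2024, 1, 1, 12, 0)},
    {"id": 3, "updated_at": datetime(2024, 1, 1, 11, 0)},
]
changed, watermark = select_changed(rows, datetime(2024, 1, 1, 10, 30))
print([r["id"] for r in changed], watermark)
```

Note that a strict `>` comparison skips rows whose timestamp equals the watermark exactly; whether that matters depends on the timestamp resolution and write rate of the source table.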
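Common interview questions
Q1: How does an inverted index work?
Answer:
A forward (ordinary) index maps document ID → content. An inverted index maps term → list of document IDs.
"Python" → [doc1, doc3, doc7]
"async" → [doc2, doc3]
Searching for "Python async" → intersect the two lists → [doc3]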
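The lookup above can be sketched in a few lines of Python. This is an illustrative in-memory model, not how Elasticsearch stores postings on disk; `build_inverted_index`, `search_all`, and the sample documents are hypothetical names chosen to mirror the example.

```python
from collections import defaultdict


def build_inverted_index(docs: dict[str, list[str]]) -> dict[str, set[str]]:
    """Map each term to the set of document IDs that contain it."""
    index: dict[str, set[str]] = defaultdict(set)
    for doc_id, terms in docs.items():
        for term in terms:
            index[term].add(doc_id)
    return dict(index)


def search_all(index: dict[str, set[str]], terms: list[str]) -> set[str]:
    """Return the IDs of documents containing every query term (AND semantics)."""
    if not terms:
        return set()
    postings = [index.get(term, set()) for term in terms]
    return set.intersection(*postings)


# Documents are assumed to be tokenized already
docs = {
    "doc1": ["Python", "web"],
    "doc2": ["async", "IO"],
    "doc3": ["Python", "async"],
    "doc7": ["Python", "testing"],
}
index = build_inverted_index(docs)
print(sorted(search_all(index, ["Python", "async"])))  # ['doc3']
```

In a real engine the analyzer produces the terms, and the posting lists are kept sorted so they can be intersected efficiently.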
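Q2: How can search relevance be improved?
Answer:
- Field boosting: `title^3` gives title matches a higher weight
- BM25: the ES default scoring algorithm; it accounts for term frequency and document length
- Synonym expansion: configure a synonym dictionary
- Pinyin / typo tolerance: a pinyin plugin, fuzzy queries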
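To make the BM25 point concrete, here is a small sketch of the textbook BM25 formula. It is not Lucene's exact implementation; the function name `bm25_score` and the toy corpus are assumptions for illustration, while `k1=1.2` and `b=0.75` match the Elasticsearch defaults.

```python
import math


def bm25_score(
    query_terms: list[str],
    doc: list[str],
    corpus: list[list[str]],
    k1: float = 1.2,
    b: float = 0.75,
) -> float:
    """Score one tokenized document against a query using textbook BM25."""
    n_docs = len(corpus)
    avgdl = sum(len(d) for d in corpus) / n_docs  # average document length
    score = 0.0
    for term in query_terms:
        df = sum(1 for d in corpus if term in d)  # documents containing the term
        if df == 0:
            continue
        # Rare terms get a higher inverse document frequency
        idf = math.log(1 + (n_docs - df + 0.5) / (df + 0.5))
        tf = doc.count(term)
        # Length normalization: long documents are penalized, controlled by b
        norm = tf + k1 * (1 - b + b * len(doc) / avgdl)
        score += idf * tf * (k1 + 1) / norm
    return score


# Hypothetical toy corpus of pre-tokenized documents
corpus = [
    ["python", "async", "tutorial"],
    ["python", "web", "framework", "guide", "for", "beginners"],
    ["database", "indexing"],
]
short_doc, long_doc, other_doc = corpus
print(bm25_score(["python"], short_doc, corpus))
print(bm25_score(["python"], long_doc, corpus))
```

Both documents contain "python" once, but the shorter one scores higher because of length normalization; a document without the term scores zero.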
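Q3: How do you keep ES and MySQL data consistent?
Answer:
| Approach | Latency | Complexity |
|---|---|---|
| Synchronous dual write | Low | High (transactional consistency is hard) |
| Asynchronous messaging | Seconds | Medium (requires an MQ) |
| Scheduled sync | Minutes | Low |
| Binlog listening | Seconds | Medium (Canal/Debezium) |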
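The asynchronous messaging row can be illustrated with a toy model. In this sketch an `asyncio.Queue` stands in for the message queue and plain dicts stand in for MySQL and the ES index; `save_article`, `index_worker`, and `demo` are hypothetical names, and a production setup would use a real broker with retries and idempotent indexing.

```python
import asyncio


async def save_article(db: dict, queue: asyncio.Queue, article: dict) -> None:
    """Write to the primary store, then publish a change event."""
    db[article["id"]] = article      # stand-in for the MySQL write
    await queue.put(article["id"])   # stand-in for publishing to the MQ


async def index_worker(db: dict, queue: asyncio.Queue, search_index: dict) -> None:
    """Consume change events and copy the latest row into the search index."""
    while True:
        article_id = await queue.get()
        try:
            # Re-read the row so the index always receives the newest version
            search_index[article_id] = db[article_id]
        finally:
            queue.task_done()


async def demo() -> dict:
    db: dict = {}
    search_index: dict = {}
    queue: asyncio.Queue = asyncio.Queue()
    worker = asyncio.create_task(index_worker(db, queue, search_index))
    await save_article(db, queue, {"id": 1, "title": "Hello"})
    await queue.join()   # wait until the worker has processed every event
    worker.cancel()
    return search_index
```

Running `asyncio.run(demo())` returns the search index after the worker has caught up. The request path only pays for the primary write and the publish; the index lags behind by however long the consumer takes, which is where the seconds-level latency in the table comes from.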