WebSocket
问题
Python 中如何实现 WebSocket 通信?有哪些库可选?
答案
websockets 库(最流行)
import asyncio
import websockets
# 服务端
async def handler(websocket):
async for message in websocket:
# 收到消息后回显
await websocket.send(f"Echo: {message}")
async def main():
async with websockets.serve(handler, "localhost", 8765):
await asyncio.Future() # 永不完成,保持运行
asyncio.run(main())
# 客户端
async with websockets.connect("ws://localhost:8765") as ws:
await ws.send("Hello")
response = await ws.recv()
print(response) # Echo: Hello
广播模式(聊天室)
connected: set[websockets.WebSocketServerProtocol] = set()
async def handler(websocket):
connected.add(websocket)
try:
async for message in websocket:
# 广播给所有连接的客户端
websockets.broadcast(connected, message)
finally:
connected.discard(websocket)
FastAPI WebSocket
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active: list[WebSocket] = []
async def connect(self, ws: WebSocket):
await ws.accept()
self.active.append(ws)
def disconnect(self, ws: WebSocket):
self.active.remove(ws)
async def broadcast(self, message: str):
for ws in self.active:
await ws.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"[{room_id}] {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
常见面试问题
Q1: WebSocket 和 HTTP 轮询的区别?
答案:
| 方案 | 连接方式 | 实时性 | 资源消耗 |
|---|---|---|---|
| 短轮询 | 不断发 HTTP 请求 | 延迟高 | 高 |
| 长轮询 | HTTP 挂起等响应 | 中等 | 中 |
| SSE | 单向服务端推送 | 高 | 低 |
| WebSocket | 全双工持久连接 | 最高 | 最低 |
Q2: WebSocket 如何做心跳保活?
答案:
# websockets 库内置 ping/pong 心跳(默认 20 秒)
async with websockets.serve(
handler, "localhost", 8765,
ping_interval=20, # 每 20 秒发 ping
ping_timeout=10, # 10 秒内未收到 pong 则断开
):
await asyncio.Future()
Q3: WebSocket 如何做认证?
答案:
在握手阶段通过查询参数或 Cookie 认证:
async def handler(websocket):
# 从查询参数获取 token
token = websocket.path_params.get("token") # ws://host/ws?token=xxx
user = await verify_token(token)
if not user:
await websocket.close(1008, "Unauthorized")
return
# 认证通过,开始通信
async for message in websocket:
await websocket.send(f"Hello {user.name}: {message}")