跳到主要内容

WebSocket

问题

Python 中如何实现 WebSocket 通信?有哪些库可选?

答案

websockets 库(最流行)

import asyncio
import websockets

# 服务端
async def handler(websocket):
async for message in websocket:
# 收到消息后回显
await websocket.send(f"Echo: {message}")

async def main():
async with websockets.serve(handler, "localhost", 8765):
await asyncio.Future() # 永不完成,保持运行

asyncio.run(main())
# 客户端
async with websockets.connect("ws://localhost:8765") as ws:
await ws.send("Hello")
response = await ws.recv()
print(response) # Echo: Hello

广播模式(聊天室)

connected: set[websockets.WebSocketServerProtocol] = set()

async def handler(websocket):
connected.add(websocket)
try:
async for message in websocket:
# 广播给所有连接的客户端
websockets.broadcast(connected, message)
finally:
connected.discard(websocket)

FastAPI WebSocket

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

class ConnectionManager:
def __init__(self):
self.active: list[WebSocket] = []

async def connect(self, ws: WebSocket):
await ws.accept()
self.active.append(ws)

def disconnect(self, ws: WebSocket):
self.active.remove(ws)

async def broadcast(self, message: str):
for ws in self.active:
await ws.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"[{room_id}] {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)

常见面试问题

Q1: WebSocket 和 HTTP 轮询的区别?

答案

方案连接方式实时性资源消耗
短轮询不断发 HTTP 请求延迟高
长轮询HTTP 挂起等响应中等
SSE单向服务端推送
WebSocket全双工持久连接最高最低

Q2: WebSocket 如何做心跳保活?

答案

# websockets 库内置 ping/pong 心跳(默认 20 秒)
async with websockets.serve(
handler, "localhost", 8765,
ping_interval=20, # 每 20 秒发 ping
ping_timeout=10, # 10 秒内未收到 pong 则断开
):
await asyncio.Future()

Q3: WebSocket 如何做认证?

答案

在握手阶段通过查询参数或 Cookie 认证:

async def handler(websocket):
# 从查询参数获取 token
token = websocket.path_params.get("token") # ws://host/ws?token=xxx
user = await verify_token(token)
if not user:
await websocket.close(1008, "Unauthorized")
return
# 认证通过,开始通信
async for message in websocket:
await websocket.send(f"Hello {user.name}: {message}")

相关链接