WebSocket 与 SSE 实时通信
问题
WebSocket 和 Server-Sent Events(SSE)有什么区别?如何选择和使用?
答案
现代 Web 应用常需要实时通信能力,主要有三种技术方案:轮询、WebSocket 和 SSE。
技术对比
| 特性 | 轮询 | 长轮询 | SSE | WebSocket |
|---|---|---|---|---|
| 通信方向 | 单向 | 单向 | 单向(服务端→客户端) | 双向 |
| 连接 | 短连接 | 长连接 | 长连接 | 长连接 |
| 协议 | HTTP | HTTP | HTTP | WS/WSS |
| 实时性 | 低 | 中 | 高 | 高 |
| 服务端复杂度 | 低 | 中 | 低 | 高 |
| 浏览器支持 | 全部 | 全部 | IE 不支持 | IE10+ |
WebSocket
连接过程
基本使用
// 创建连接
const ws = new WebSocket('wss://example.com/socket');
// 连接建立
ws.addEventListener('open', (event: Event) => {
console.log('连接已建立');
ws.send('Hello Server!');
});
// 接收消息
ws.addEventListener('message', (event: MessageEvent) => {
console.log('收到消息:', event.data);
});
// 连接关闭
ws.addEventListener('close', (event: CloseEvent) => {
console.log('连接关闭:', event.code, event.reason);
});
// 错误处理
ws.addEventListener('error', (event: Event) => {
console.error('WebSocket 错误:', event);
});
// 发送数据
ws.send('文本消息');
ws.send(JSON.stringify({ type: 'chat', content: 'hi' }));
ws.send(new Blob(['binary data']));
ws.send(new ArrayBuffer(8));
// 关闭连接
ws.close(1000, '正常关闭');
封装 WebSocket 类
type MessageHandler = (data: any) => void;
type ConnectionHandler = () => void;
interface WebSocketClientOptions {
url: string;
reconnect?: boolean;
reconnectInterval?: number;
maxReconnectAttempts?: number;
heartbeatInterval?: number;
}
class WebSocketClient {
private ws: WebSocket | null = null;
private url: string;
private reconnect: boolean;
private reconnectInterval: number;
private maxReconnectAttempts: number;
private reconnectAttempts: number = 0;
private heartbeatInterval: number;
private heartbeatTimer: number | null = null;
private messageHandlers: Set<MessageHandler> = new Set();
private isManualClose: boolean = false;
constructor(options: WebSocketClientOptions) {
this.url = options.url;
this.reconnect = options.reconnect ?? true;
this.reconnectInterval = options.reconnectInterval ?? 3000;
this.maxReconnectAttempts = options.maxReconnectAttempts ?? 5;
this.heartbeatInterval = options.heartbeatInterval ?? 30000;
}
connect(): Promise<void> {
return new Promise((resolve, reject) => {
this.isManualClose = false;
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('WebSocket 连接成功');
this.reconnectAttempts = 0;
this.startHeartbeat();
resolve();
};
this.ws.onmessage = (event) => {
let data = event.data;
try {
data = JSON.parse(event.data);
} catch {
// 保持原始数据
}
// 心跳响应
if (data.type === 'pong') return;
this.messageHandlers.forEach(handler => handler(data));
};
this.ws.onclose = (event) => {
console.log('WebSocket 关闭:', event.code);
this.stopHeartbeat();
if (!this.isManualClose && this.reconnect) {
this.tryReconnect();
}
};
this.ws.onerror = (error) => {
console.error('WebSocket 错误:', error);
reject(error);
};
});
}
private startHeartbeat(): void {
this.heartbeatTimer = window.setInterval(() => {
this.send({ type: 'ping' });
}, this.heartbeatInterval);
}
private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
private tryReconnect(): void {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.log('达到最大重连次数,停止重连');
return;
}
this.reconnectAttempts++;
console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);
setTimeout(() => {
this.connect();
}, this.reconnectInterval);
}
send(data: any): void {
if (this.ws?.readyState === WebSocket.OPEN) {
const message = typeof data === 'string' ? data : JSON.stringify(data);
this.ws.send(message);
} else {
console.warn('WebSocket 未连接');
}
}
onMessage(handler: MessageHandler): () => void {
this.messageHandlers.add(handler);
return () => this.messageHandlers.delete(handler);
}
close(): void {
this.isManualClose = true;
this.stopHeartbeat();
this.ws?.close(1000, '客户端关闭');
}
get readyState(): number {
return this.ws?.readyState ?? WebSocket.CLOSED;
}
}
// 使用
const client = new WebSocketClient({
url: 'wss://example.com/ws',
reconnect: true,
heartbeatInterval: 30000,
});
await client.connect();
const unsubscribe = client.onMessage((data) => {
console.log('收到:', data);
});
client.send({ type: 'subscribe', channel: 'news' });
服务端实现(Node.js)
import { WebSocketServer, WebSocket } from 'ws';
const wss = new WebSocketServer({ port: 8080 });
// 存储所有连接
const clients = new Set<WebSocket>();
wss.on('connection', (ws: WebSocket) => {
console.log('新连接');
clients.add(ws);
ws.on('message', (message: Buffer) => {
const data = JSON.parse(message.toString());
// 心跳响应
if (data.type === 'ping') {
ws.send(JSON.stringify({ type: 'pong' }));
return;
}
console.log('收到消息:', data);
// 广播给所有客户端
broadcast(data);
});
ws.on('close', () => {
clients.delete(ws);
console.log('连接关闭');
});
ws.on('error', (error) => {
console.error('WebSocket 错误:', error);
});
});
function broadcast(data: any): void {
const message = JSON.stringify(data);
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
}
Server-Sent Events (SSE)
SSE 是一种单向的服务端推送技术,使用标准 HTTP 协议。
基本使用
// 创建 SSE 连接
const eventSource = new EventSource('/api/events');
// 接收消息
eventSource.addEventListener('message', (event: MessageEvent) => {
console.log('收到消息:', event.data);
});
// 自定义事件
eventSource.addEventListener('notification', (event: MessageEvent) => {
const data = JSON.parse(event.data);
console.log('通知:', data);
});
// 连接打开
eventSource.addEventListener('open', () => {
console.log('SSE 连接已建立');
});
// 错误处理
eventSource.addEventListener('error', (event: Event) => {
console.error('SSE 错误:', event);
if (eventSource.readyState === EventSource.CLOSED) {
console.log('连接已关闭');
}
});
// 关闭连接
eventSource.close();
封装 SSE 类
interface SSEClientOptions {
url: string;
withCredentials?: boolean;
retry?: number;
}
type SSEHandler = (data: any) => void;
class SSEClient {
private eventSource: EventSource | null = null;
private url: string;
private options: EventSourceInit;
private handlers: Map<string, Set<SSEHandler>> = new Map();
private isConnected: boolean = false;
constructor(options: SSEClientOptions) {
this.url = options.url;
this.options = {
withCredentials: options.withCredentials ?? false,
};
}
connect(): void {
this.eventSource = new EventSource(this.url, this.options);
this.eventSource.onopen = () => {
this.isConnected = true;
console.log('SSE 连接成功');
};
this.eventSource.onerror = () => {
this.isConnected = false;
console.log('SSE 连接错误,将自动重连');
};
// 默认消息处理
this.eventSource.onmessage = (event) => {
this.dispatch('message', event.data);
};
}
on(event: string, handler: SSEHandler): () => void {
if (!this.handlers.has(event)) {
this.handlers.set(event, new Set());
// 添加自定义事件监听
this.eventSource?.addEventListener(event, (e: Event) => {
const messageEvent = e as MessageEvent;
this.dispatch(event, messageEvent.data);
});
}
this.handlers.get(event)!.add(handler);
return () => {
this.handlers.get(event)?.delete(handler);
};
}
private dispatch(event: string, data: string): void {
let parsedData;
try {
parsedData = JSON.parse(data);
} catch {
parsedData = data;
}
this.handlers.get(event)?.forEach(handler => handler(parsedData));
}
close(): void {
this.eventSource?.close();
this.isConnected = false;
this.handlers.clear();
}
get connected(): boolean {
return this.isConnected;
}
}
// 使用
const sse = new SSEClient({
url: '/api/events',
withCredentials: true,
});
sse.connect();
sse.on('message', (data) => {
console.log('消息:', data);
});
sse.on('notification', (data) => {
showNotification(data);
});
服务端实现(Node.js)
import express, { Request, Response } from 'express';
const app = express();
app.get('/api/events', (req: Request, res: Response) => {
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');
// 发送初始消息
res.write('data: {"type": "connected"}\n\n');
// 定时发送消息
const interval = setInterval(() => {
const data = {
time: new Date().toISOString(),
message: 'Hello from server',
};
// SSE 格式: "event: name\ndata: content\n\n"
res.write(`data: ${JSON.stringify(data)}\n\n`);
}, 1000);
// 发送自定义事件
setTimeout(() => {
res.write(`event: notification\n`);
res.write(`data: {"title": "New notification"}\n\n`);
}, 5000);
// 客户端断开连接
req.on('close', () => {
clearInterval(interval);
console.log('客户端断开连接');
});
});
app.listen(3000);
SSE 消息格式
# 基本消息
data: Hello World
# 多行数据
data: line 1
data: line 2
# JSON 数据
data: {"name": "Alice", "age": 25}
# 自定义事件
event: notification
data: {"message": "You have a new message"}
# 消息 ID(用于断线重连)
id: 123
data: message with id
# 重试间隔(毫秒)
retry: 5000
data: reconnect in 5 seconds
长轮询
class LongPolling {
private url: string;
private isPolling: boolean = false;
private abortController: AbortController | null = null;
private handlers: Set<(data: any) => void> = new Set();
constructor(url: string) {
this.url = url;
}
start(): void {
this.isPolling = true;
this.poll();
}
private async poll(): Promise<void> {
while (this.isPolling) {
try {
this.abortController = new AbortController();
const response = await fetch(this.url, {
signal: this.abortController.signal,
});
if (response.ok) {
const data = await response.json();
this.handlers.forEach(handler => handler(data));
}
} catch (error) {
if ((error as Error).name !== 'AbortError') {
console.error('轮询错误:', error);
// 错误后延迟重试
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
}
}
onMessage(handler: (data: any) => void): () => void {
this.handlers.add(handler);
return () => this.handlers.delete(handler);
}
stop(): void {
this.isPolling = false;
this.abortController?.abort();
}
}
// 使用
const polling = new LongPolling('/api/poll');
polling.onMessage((data) => {
console.log('收到数据:', data);
});
polling.start();
选型建议
场景推荐
| 场景 | 推荐技术 | 原因 |
|---|---|---|
| 聊天应用 | WebSocket | 双向实时通信 |
| 实时数据看板 | SSE | 单向推送,简单 |
| 股票行情 | WebSocket | 高频双向 |
| 通知推送 | SSE | 单向推送 |
| 在线协作 | WebSocket | 双向实时 |
| 日志监控 | SSE | 持续单向 |
Socket.IO
Socket.IO 是一个封装了 WebSocket 的库,提供了更多功能:
// 客户端
import { io, Socket } from 'socket.io-client';
const socket: Socket = io('http://localhost:3000', {
transports: ['websocket', 'polling'],
reconnection: true,
reconnectionAttempts: 5,
});
// 连接事件
socket.on('connect', () => {
console.log('连接成功:', socket.id);
});
// 发送消息
socket.emit('chat', { message: 'Hello' });
// 接收消息
socket.on('chat', (data: any) => {
console.log('收到:', data);
});
// 带确认的消息
socket.emit('message', { text: 'hi' }, (response: any) => {
console.log('服务端确认:', response);
});
// 加入房间
socket.emit('join', { room: 'room1' });
// 断开连接
socket.on('disconnect', (reason: string) => {
console.log('断开连接:', reason);
});
// 服务端
import { Server, Socket } from 'socket.io';
const io = new Server(3000, {
cors: { origin: '*' },
});
io.on('connection', (socket: Socket) => {
console.log('新连接:', socket.id);
// 接收消息
socket.on('chat', (data: any) => {
// 广播给所有人
io.emit('chat', data);
});
// 加入房间
socket.on('join', ({ room }: { room: string }) => {
socket.join(room);
// 向房间广播
io.to(room).emit('userJoined', { userId: socket.id });
});
// 带确认的消息
socket.on('message', (data: any, callback: Function) => {
console.log('收到消息:', data);
callback({ status: 'ok' });
});
socket.on('disconnect', () => {
console.log('断开连接');
});
});
Protocol Buffers(protobuf)
Protocol Buffers 是 Google 开发的一种语言无关、平台无关的二进制序列化格式,常与 WebSocket 搭配使用来提升传输效率。
JSON vs Protocol Buffers
| 特性 | JSON | Protocol Buffers |
|---|---|---|
| 格式 | 文本(人类可读) | 二进制(不可读) |
| 体积 | 较大(含字段名) | 小 3-10 倍 |
| 解析速度 | 较慢 | 快 20-100 倍 |
| Schema | 无(可选 JSON Schema) | 必须定义 .proto 文件 |
| 类型安全 | 弱 | 强(编译时检查) |
| 调试 | 方便(直接可读) | 不便(需工具解码) |
| 浏览器支持 | 原生 | 需要库(protobuf.js) |
何时使用 protobuf
- 高频消息传输:如直播弹幕、实时行情、游戏状态同步(每秒数十到数百条消息)
- 带宽敏感场景:移动端弱网、大规模推送
- 强类型需求:前后端共享 Schema,减少联调成本
- JSON 足够好的场景:低频通信、调试优先、快速原型
定义 Schema(.proto 文件)
protobuf 的核心是 .proto 文件,定义消息的结构和字段类型:
proto/chat.proto
// 指定 proto3 语法
syntax = "proto3";
// 包名,避免命名冲突
package chat;
// 聊天消息
message ChatMessage {
string id = 1; // 字段编号 1,唯一标识(不是值)
string sender = 2;
string content = 3;
int64 timestamp = 4; // Unix 时间戳(毫秒)
MessageType type = 5;
}
// 枚举类型
enum MessageType {
TEXT = 0; // 枚举第一个值必须为 0
IMAGE = 1;
VIDEO = 2;
SYSTEM = 3;
}
// 嵌套消息
message ChatRoom {
string room_id = 1;
string name = 2;
repeated ChatMessage messages = 3; // repeated = 数组
map<string, string> metadata = 4; // map 类型
}
// WebSocket 通信的统一消息包装
message WebSocketFrame {
string event = 1; // 事件名
oneof payload { // oneof = 联合类型,同时只有一个字段有值
ChatMessage chat = 2;
JoinRequest join = 3;
HeartbeatPing ping = 4;
}
}
message JoinRequest {
string room_id = 1;
string user_id = 2;
}
message HeartbeatPing {
int64 timestamp = 1;
}
protobuf 字段编号规则
- 字段编号是唯一标识,序列化时用编号而非字段名,这是 protobuf 体积小的关键原因
- 1-15 编号只占 1 字节,高频字段应使用小编号
- 16-2047 占 2 字节
- 编号一旦使用不可更改,删除字段后编号也不能复用(用
reserved标记) - 这种设计允许向前/向后兼容:新增字段不影响旧客户端解析
前端使用(protobuf.js)
前端最常用的库是 protobuf.js,支持从 .proto 文件生成 TypeScript 代码:
- npm
- Yarn
- pnpm
- Bun
npm install protobufjs
yarn add protobufjs
pnpm add protobufjs
bun add protobufjs
方式一:静态生成(推荐用于生产环境)
先用 pbjs / pbts 从 .proto 文件生成 JS + TS 类型代码:
# 生成 JS 代码(静态模块)
npx pbjs -t static-module -w es6 -o src/proto/chat.js proto/chat.proto
# 生成 TypeScript 类型定义
npx pbts -o src/proto/chat.d.ts src/proto/chat.js
src/ws-client.ts
// 导入生成的 protobuf 模块
import { chat } from './proto/chat';
// ====== 编码:对象 → 二进制 ======
// 1. 创建消息对象
const message = chat.ChatMessage.create({
id: 'msg-001',
sender: 'user-123',
content: '你好!',
timestamp: Date.now(),
type: chat.MessageType.TEXT,
});
// 2. 验证消息(可选,开发阶段推荐)
const error = chat.ChatMessage.verify(message);
if (error) throw new Error(`消息验证失败: ${error}`);
// 3. 编码为二进制(Uint8Array)
const buffer: Uint8Array = chat.ChatMessage.encode(message).finish();
console.log('JSON 大小:', JSON.stringify(message).length, '字节');
console.log('Protobuf 大小:', buffer.length, '字节');
// 典型结果:JSON ~120 字节,Protobuf ~45 字节
// ====== 解码:二进制 → 对象 ======
const decoded = chat.ChatMessage.decode(buffer);
console.log(decoded.content); // "你好!"
console.log(decoded.type); // 0 (TEXT)
方式二:动态加载(适合开发/快速原型)
src/dynamic-proto.ts
import protobuf from 'protobufjs';
// 运行时加载 .proto 文件
const root = await protobuf.load('proto/chat.proto');
// 获取消息类型
const ChatMessage = root.lookupType('chat.ChatMessage');
// 编码
const payload = { id: '1', sender: 'Alice', content: 'Hi', timestamp: Date.now() };
const buffer = ChatMessage.encode(ChatMessage.create(payload)).finish();
// 解码
const decoded = ChatMessage.decode(buffer);
console.log(ChatMessage.toObject(decoded)); // 转为普通对象
WebSocket + protobuf 完整示例
src/protobuf-ws-client.ts
import { chat } from './proto/chat';
class ProtobufWebSocket {
private ws: WebSocket | null = null;
private handlers = new Map<string, Set<(data: any) => void>>();
connect(url: string): Promise<void> {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(url);
this.ws.binaryType = 'arraybuffer'; // 关键:指定接收二进制数据
this.ws.onopen = () => resolve();
this.ws.onerror = () => reject(new Error('连接失败'));
this.ws.onmessage = (event: MessageEvent) => {
// 接收到的是 ArrayBuffer,需要转为 Uint8Array 再解码
const frame = chat.WebSocketFrame.decode(
new Uint8Array(event.data as ArrayBuffer)
);
// 根据 event 字段分发到对应的处理器
const handlers = this.handlers.get(frame.event);
handlers?.forEach((handler) => handler(frame[frame.payload!]));
};
});
}
// 发送 protobuf 编码的消息
sendChat(message: chat.IChatMessage): void {
const frame = chat.WebSocketFrame.create({
event: 'chat',
chat: message,
});
const buffer = chat.WebSocketFrame.encode(frame).finish();
this.ws?.send(buffer); // 直接发送二进制
}
sendJoin(roomId: string, userId: string): void {
const frame = chat.WebSocketFrame.create({
event: 'join',
join: { roomId, userId },
});
this.ws?.send(chat.WebSocketFrame.encode(frame).finish());
}
on(event: string, handler: (data: any) => void): () => void {
if (!this.handlers.has(event)) {
this.handlers.set(event, new Set());
}
this.handlers.get(event)!.add(handler);
return () => this.handlers.get(event)?.delete(handler);
}
close(): void {
this.ws?.close(1000);
}
}
// 使用
const client = new ProtobufWebSocket();
await client.connect('wss://example.com/ws');
client.on('chat', (msg: chat.IChatMessage) => {
console.log(`${msg.sender}: ${msg.content}`);
});
client.sendJoin('room-1', 'user-123');
client.sendChat({
id: crypto.randomUUID(),
sender: 'user-123',
content: '大家好!',
timestamp: Date.now(),
type: chat.MessageType.TEXT,
});
服务端解析(Node.js)
server/ws-server.ts
import { WebSocketServer, WebSocket } from 'ws';
import { chat } from './proto/chat';
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws: WebSocket) => {
ws.on('message', (data: Buffer) => {
const frame = chat.WebSocketFrame.decode(new Uint8Array(data));
switch (frame.event) {
case 'chat':
console.log('聊天消息:', frame.chat?.content);
// 广播给所有客户端
const response = chat.WebSocketFrame.encode(frame).finish();
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(response);
}
});
break;
case 'join':
console.log(`用户 ${frame.join?.userId} 加入房间 ${frame.join?.roomId}`);
break;
}
});
});
protobuf 编码原理
protobuf 之所以体积小、速度快,核心在于 Varint 编码 + Tag-Value 格式:
编码格式示意
// JSON 编码 → 文本,包含字段名
// {"id":"1","sender":"Alice","content":"Hi"} // 43 字节
// protobuf 编码 → 二进制,用字段编号代替字段名
// [0a 01 31] [12 05 41 6c 69 63 65] [1a 02 48 69]
// ↑ 字段1 ↑ 字段2 ↑ 字段3
// tag+长度+值 tag+长度+"Alice" tag+长度+"Hi"
// 共 16 字节,节省 ~63%
protobuf 使用注意事项
- 调试不便:二进制数据在 DevTools Network 面板中不可读,建议开发环境提供 JSON fallback 或使用 protobuf-decoder 工具
- Schema 同步:前后端必须使用相同版本的
.proto文件,建议放在共享仓库或 npm 包中 - 字段兼容性:永远不要修改已有字段的编号或类型,只能新增字段或用
reserved标记废弃字段 - 默认值:proto3 中所有字段都有默认值(string→
""、int→0、bool→false),值为默认值时不会被序列化,这意味着你无法区分"字段值为 0"和"字段未设置" - 包体积:
protobufjs完整版约 80KB(gzip 后 ~20KB),如果只需编解码可用轻量版protobufjs/light
常见面试问题
Q1: WebSocket 和 HTTP 有什么区别?
答案:
| 特性 | WebSocket | HTTP |
|---|---|---|
| 连接 | 持久连接 | 短连接 |
| 通信 | 全双工 | 请求-响应 |
| 头部 | 轻量 | 每次请求带完整头部 |
| 协议 | ws/wss | http/https |
| 发起方 | 双方都可 | 只能客户端 |
// HTTP: 每次请求都要建立连接
fetch('/api/data'); // 连接 → 请求 → 响应 → 关闭
// WebSocket: 一次连接,持续通信
const ws = new WebSocket('wss://...');
ws.send('message1');
ws.send('message2'); // 复用同一连接
Q2: WebSocket 如何实现心跳检测?
答案:
class HeartbeatWebSocket {
private ws: WebSocket;
private heartbeatTimer: number | null = null;
private pongTimeout: number | null = null;
connect(url: string): void {
this.ws = new WebSocket(url);
this.ws.onopen = () => {
this.startHeartbeat();
};
this.ws.onmessage = (e) => {
if (e.data === 'pong') {
this.clearPongTimeout();
}
};
this.ws.onclose = () => {
this.stopHeartbeat();
};
}
private startHeartbeat(): void {
this.heartbeatTimer = window.setInterval(() => {
this.ws.send('ping');
// 等待 pong 响应
this.pongTimeout = window.setTimeout(() => {
console.log('心跳超时,断开连接');
this.ws.close();
}, 5000);
}, 30000);
}
private clearPongTimeout(): void {
if (this.pongTimeout) {
clearTimeout(this.pongTimeout);
this.pongTimeout = null;
}
}
private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
}
this.clearPongTimeout();
}
}
Q3: SSE 和 WebSocket 如何选择?
答案:
| 需求 | SSE | WebSocket |
|---|---|---|
| 单向推送 | ✅ 推荐 | 可以 |
| 双向通信 | ❌ | ✅ 必须 |
| 简单实现 | ✅ HTTP 协议 | 需要专门服务 |
| 自动重连 | ✅ 内置 | 需要自己实现 |
| 二进制数据 | ❌ 仅文本 | ✅ 支持 |
| 高频消息 | 一般 | ✅ 更适合 |
Q4: WebSocket 断线重连怎么实现?
答案:
class ReconnectingWebSocket {
private url: string;
private ws: WebSocket | null = null;
private reconnectAttempts = 0;
private maxAttempts = 5;
private baseDelay = 1000;
connect(): void {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
this.reconnectAttempts = 0;
};
this.ws.onclose = (e) => {
if (e.code !== 1000) { // 非正常关闭
this.reconnect();
}
};
}
private reconnect(): void {
if (this.reconnectAttempts >= this.maxAttempts) {
console.log('达到最大重连次数');
return;
}
// 指数退避
const delay = this.baseDelay * Math.pow(2, this.reconnectAttempts);
this.reconnectAttempts++;
setTimeout(() => {
console.log(`重连中... (${this.reconnectAttempts})`);
this.connect();
}, delay);
}
}
Q5: 如何处理 WebSocket 消息顺序?
答案:
interface OrderedMessage {
seq: number; // 序列号
data: any;
}
class OrderedWebSocket {
private expectedSeq = 0;
private buffer: Map<number, any> = new Map();
handleMessage(msg: OrderedMessage): void {
if (msg.seq === this.expectedSeq) {
// 顺序正确,处理消息
this.process(msg.data);
this.expectedSeq++;
// 处理缓冲区中的后续消息
while (this.buffer.has(this.expectedSeq)) {
this.process(this.buffer.get(this.expectedSeq));
this.buffer.delete(this.expectedSeq);
this.expectedSeq++;
}
} else if (msg.seq > this.expectedSeq) {
// 消息提前到达,缓冲
this.buffer.set(msg.seq, msg.data);
}
// seq < expectedSeq: 重复消息,丢弃
}
private process(data: any): void {
console.log('处理消息:', data);
}
}
Q6: WebSocket 为什么要用 protobuf 而不是 JSON?
答案:
JSON 是文本格式,每条消息都要携带完整的字段名,且解析需要逐字符处理。protobuf 是二进制格式,用字段编号代替字段名,用 Varint 变长编码压缩整数,体积通常只有 JSON 的 1/3~1/10,解析速度快 20-100 倍。
// 同一条消息的体积对比
const message = { id: 1, sender: 'Alice', content: 'Hello', timestamp: 1711180800000 };
// JSON: '{"id":1,"sender":"Alice","content":"Hello","timestamp":1711180800000}'
// → 70 字节,包含字段名 "id"、"sender" 等冗余文本
// Protobuf: 二进制编码
// → ~28 字节,字段名替换为 1 字节的编号,整数用 Varint 压缩
适合 protobuf 的场景:直播弹幕(每秒数百条)、实时行情、游戏同步等高频 + 带宽敏感场景。对于低频 API 调用或需要快速调试的场景,JSON 更合适。
Q7: protobuf 如何实现前后端向后兼容?
答案:
protobuf 的兼容性依赖于字段编号不变这一原则:
message User {
string name = 1;
int32 age = 2;
// 新增字段:旧客户端解析时会忽略未知编号
string email = 3;
// 删除字段:用 reserved 防止编号被复用
reserved 4; // 编号 4 不可再使用
reserved "phone"; // 字段名 "phone" 不可再使用
}
- 新增字段:旧客户端遇到未知编号会自动跳过,不会报错
- 删除字段:用
reserved标记,防止后续开发者误用相同编号导致解析错误 - 修改字段类型或编号:绝对不行,会导致解析错误
- 这种设计使得前后端可以独立发版,只要不破坏已有字段