跳到主要内容

WebSocket 与 SSE 实时通信

问题

WebSocket 和 Server-Sent Events(SSE)有什么区别?如何选择和使用?

答案

现代 Web 应用常需要实时通信能力,主要有三种技术方案:轮询WebSocketSSE

技术对比

特性轮询长轮询SSEWebSocket
通信方向单向单向单向(服务端→客户端)双向
连接短连接长连接长连接长连接
协议HTTPHTTPHTTPWS/WSS
实时性
服务端复杂度
浏览器支持全部全部IE 不支持IE10+

WebSocket

连接过程

基本使用

// 创建连接
const ws = new WebSocket('wss://example.com/socket');

// 连接建立
ws.addEventListener('open', (event: Event) => {
console.log('连接已建立');
ws.send('Hello Server!');
});

// 接收消息
ws.addEventListener('message', (event: MessageEvent) => {
console.log('收到消息:', event.data);
});

// 连接关闭
ws.addEventListener('close', (event: CloseEvent) => {
console.log('连接关闭:', event.code, event.reason);
});

// 错误处理
ws.addEventListener('error', (event: Event) => {
console.error('WebSocket 错误:', event);
});

// 发送数据
ws.send('文本消息');
ws.send(JSON.stringify({ type: 'chat', content: 'hi' }));
ws.send(new Blob(['binary data']));
ws.send(new ArrayBuffer(8));

// 关闭连接
ws.close(1000, '正常关闭');

封装 WebSocket 类

type MessageHandler = (data: any) => void;
type ConnectionHandler = () => void;

interface WebSocketClientOptions {
url: string;
reconnect?: boolean;
reconnectInterval?: number;
maxReconnectAttempts?: number;
heartbeatInterval?: number;
}

class WebSocketClient {
private ws: WebSocket | null = null;
private url: string;
private reconnect: boolean;
private reconnectInterval: number;
private maxReconnectAttempts: number;
private reconnectAttempts: number = 0;
private heartbeatInterval: number;
private heartbeatTimer: number | null = null;
private messageHandlers: Set<MessageHandler> = new Set();
private isManualClose: boolean = false;

constructor(options: WebSocketClientOptions) {
this.url = options.url;
this.reconnect = options.reconnect ?? true;
this.reconnectInterval = options.reconnectInterval ?? 3000;
this.maxReconnectAttempts = options.maxReconnectAttempts ?? 5;
this.heartbeatInterval = options.heartbeatInterval ?? 30000;
}

connect(): Promise<void> {
return new Promise((resolve, reject) => {
this.isManualClose = false;
this.ws = new WebSocket(this.url);

this.ws.onopen = () => {
console.log('WebSocket 连接成功');
this.reconnectAttempts = 0;
this.startHeartbeat();
resolve();
};

this.ws.onmessage = (event) => {
let data = event.data;
try {
data = JSON.parse(event.data);
} catch {
// 保持原始数据
}

// 心跳响应
if (data.type === 'pong') return;

this.messageHandlers.forEach(handler => handler(data));
};

this.ws.onclose = (event) => {
console.log('WebSocket 关闭:', event.code);
this.stopHeartbeat();

if (!this.isManualClose && this.reconnect) {
this.tryReconnect();
}
};

this.ws.onerror = (error) => {
console.error('WebSocket 错误:', error);
reject(error);
};
});
}

private startHeartbeat(): void {
this.heartbeatTimer = window.setInterval(() => {
this.send({ type: 'ping' });
}, this.heartbeatInterval);
}

private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}

private tryReconnect(): void {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.log('达到最大重连次数,停止重连');
return;
}

this.reconnectAttempts++;
console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`);

setTimeout(() => {
this.connect();
}, this.reconnectInterval);
}

send(data: any): void {
if (this.ws?.readyState === WebSocket.OPEN) {
const message = typeof data === 'string' ? data : JSON.stringify(data);
this.ws.send(message);
} else {
console.warn('WebSocket 未连接');
}
}

onMessage(handler: MessageHandler): () => void {
this.messageHandlers.add(handler);
return () => this.messageHandlers.delete(handler);
}

close(): void {
this.isManualClose = true;
this.stopHeartbeat();
this.ws?.close(1000, '客户端关闭');
}

get readyState(): number {
return this.ws?.readyState ?? WebSocket.CLOSED;
}
}

// 使用
const client = new WebSocketClient({
url: 'wss://example.com/ws',
reconnect: true,
heartbeatInterval: 30000,
});

await client.connect();

const unsubscribe = client.onMessage((data) => {
console.log('收到:', data);
});

client.send({ type: 'subscribe', channel: 'news' });

服务端实现(Node.js)

import { WebSocketServer, WebSocket } from 'ws';

const wss = new WebSocketServer({ port: 8080 });

// 存储所有连接
const clients = new Set<WebSocket>();

wss.on('connection', (ws: WebSocket) => {
console.log('新连接');
clients.add(ws);

ws.on('message', (message: Buffer) => {
const data = JSON.parse(message.toString());

// 心跳响应
if (data.type === 'ping') {
ws.send(JSON.stringify({ type: 'pong' }));
return;
}

console.log('收到消息:', data);

// 广播给所有客户端
broadcast(data);
});

ws.on('close', () => {
clients.delete(ws);
console.log('连接关闭');
});

ws.on('error', (error) => {
console.error('WebSocket 错误:', error);
});
});

function broadcast(data: any): void {
const message = JSON.stringify(data);
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(message);
}
});
}

Server-Sent Events (SSE)

SSE 是一种单向的服务端推送技术,使用标准 HTTP 协议。

基本使用

// 创建 SSE 连接
const eventSource = new EventSource('/api/events');

// 接收消息
eventSource.addEventListener('message', (event: MessageEvent) => {
console.log('收到消息:', event.data);
});

// 自定义事件
eventSource.addEventListener('notification', (event: MessageEvent) => {
const data = JSON.parse(event.data);
console.log('通知:', data);
});

// 连接打开
eventSource.addEventListener('open', () => {
console.log('SSE 连接已建立');
});

// 错误处理
eventSource.addEventListener('error', (event: Event) => {
console.error('SSE 错误:', event);

if (eventSource.readyState === EventSource.CLOSED) {
console.log('连接已关闭');
}
});

// 关闭连接
eventSource.close();

封装 SSE 类

interface SSEClientOptions {
url: string;
withCredentials?: boolean;
retry?: number;
}

type SSEHandler = (data: any) => void;

class SSEClient {
private eventSource: EventSource | null = null;
private url: string;
private options: EventSourceInit;
private handlers: Map<string, Set<SSEHandler>> = new Map();
private isConnected: boolean = false;

constructor(options: SSEClientOptions) {
this.url = options.url;
this.options = {
withCredentials: options.withCredentials ?? false,
};
}

connect(): void {
this.eventSource = new EventSource(this.url, this.options);

this.eventSource.onopen = () => {
this.isConnected = true;
console.log('SSE 连接成功');
};

this.eventSource.onerror = () => {
this.isConnected = false;
console.log('SSE 连接错误,将自动重连');
};

// 默认消息处理
this.eventSource.onmessage = (event) => {
this.dispatch('message', event.data);
};
}

on(event: string, handler: SSEHandler): () => void {
if (!this.handlers.has(event)) {
this.handlers.set(event, new Set());

// 添加自定义事件监听
this.eventSource?.addEventListener(event, (e: Event) => {
const messageEvent = e as MessageEvent;
this.dispatch(event, messageEvent.data);
});
}

this.handlers.get(event)!.add(handler);

return () => {
this.handlers.get(event)?.delete(handler);
};
}

private dispatch(event: string, data: string): void {
let parsedData;
try {
parsedData = JSON.parse(data);
} catch {
parsedData = data;
}

this.handlers.get(event)?.forEach(handler => handler(parsedData));
}

close(): void {
this.eventSource?.close();
this.isConnected = false;
this.handlers.clear();
}

get connected(): boolean {
return this.isConnected;
}
}

// 使用
const sse = new SSEClient({
url: '/api/events',
withCredentials: true,
});

sse.connect();

sse.on('message', (data) => {
console.log('消息:', data);
});

sse.on('notification', (data) => {
showNotification(data);
});

服务端实现(Node.js)

import express, { Request, Response } from 'express';

const app = express();

app.get('/api/events', (req: Request, res: Response) => {
// 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('Access-Control-Allow-Origin', '*');

// 发送初始消息
res.write('data: {"type": "connected"}\n\n');

// 定时发送消息
const interval = setInterval(() => {
const data = {
time: new Date().toISOString(),
message: 'Hello from server',
};

// SSE 格式: "event: name\ndata: content\n\n"
res.write(`data: ${JSON.stringify(data)}\n\n`);
}, 1000);

// 发送自定义事件
setTimeout(() => {
res.write(`event: notification\n`);
res.write(`data: {"title": "New notification"}\n\n`);
}, 5000);

// 客户端断开连接
req.on('close', () => {
clearInterval(interval);
console.log('客户端断开连接');
});
});

app.listen(3000);

SSE 消息格式

# 基本消息
data: Hello World

# 多行数据
data: line 1
data: line 2

# JSON 数据
data: {"name": "Alice", "age": 25}

# 自定义事件
event: notification
data: {"message": "You have a new message"}

# 消息 ID(用于断线重连)
id: 123
data: message with id

# 重试间隔(毫秒)
retry: 5000
data: reconnect in 5 seconds

长轮询

class LongPolling {
private url: string;
private isPolling: boolean = false;
private abortController: AbortController | null = null;
private handlers: Set<(data: any) => void> = new Set();

constructor(url: string) {
this.url = url;
}

start(): void {
this.isPolling = true;
this.poll();
}

private async poll(): Promise<void> {
while (this.isPolling) {
try {
this.abortController = new AbortController();

const response = await fetch(this.url, {
signal: this.abortController.signal,
});

if (response.ok) {
const data = await response.json();
this.handlers.forEach(handler => handler(data));
}
} catch (error) {
if ((error as Error).name !== 'AbortError') {
console.error('轮询错误:', error);
// 错误后延迟重试
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
}
}

onMessage(handler: (data: any) => void): () => void {
this.handlers.add(handler);
return () => this.handlers.delete(handler);
}

stop(): void {
this.isPolling = false;
this.abortController?.abort();
}
}

// 使用
const polling = new LongPolling('/api/poll');

polling.onMessage((data) => {
console.log('收到数据:', data);
});

polling.start();

选型建议

场景推荐

场景推荐技术原因
聊天应用WebSocket双向实时通信
实时数据看板SSE单向推送,简单
股票行情WebSocket高频双向
通知推送SSE单向推送
在线协作WebSocket双向实时
日志监控SSE持续单向

Socket.IO

Socket.IO 是一个封装了 WebSocket 的库,提供了更多功能:

// 客户端
import { io, Socket } from 'socket.io-client';

const socket: Socket = io('http://localhost:3000', {
transports: ['websocket', 'polling'],
reconnection: true,
reconnectionAttempts: 5,
});

// 连接事件
socket.on('connect', () => {
console.log('连接成功:', socket.id);
});

// 发送消息
socket.emit('chat', { message: 'Hello' });

// 接收消息
socket.on('chat', (data: any) => {
console.log('收到:', data);
});

// 带确认的消息
socket.emit('message', { text: 'hi' }, (response: any) => {
console.log('服务端确认:', response);
});

// 加入房间
socket.emit('join', { room: 'room1' });

// 断开连接
socket.on('disconnect', (reason: string) => {
console.log('断开连接:', reason);
});
// 服务端
import { Server, Socket } from 'socket.io';

const io = new Server(3000, {
cors: { origin: '*' },
});

io.on('connection', (socket: Socket) => {
console.log('新连接:', socket.id);

// 接收消息
socket.on('chat', (data: any) => {
// 广播给所有人
io.emit('chat', data);
});

// 加入房间
socket.on('join', ({ room }: { room: string }) => {
socket.join(room);
// 向房间广播
io.to(room).emit('userJoined', { userId: socket.id });
});

// 带确认的消息
socket.on('message', (data: any, callback: Function) => {
console.log('收到消息:', data);
callback({ status: 'ok' });
});

socket.on('disconnect', () => {
console.log('断开连接');
});
});

Protocol Buffers(protobuf)

Protocol Buffers 是 Google 开发的一种语言无关、平台无关的二进制序列化格式,常与 WebSocket 搭配使用来提升传输效率。

JSON vs Protocol Buffers

特性JSONProtocol Buffers
格式文本(人类可读)二进制(不可读)
体积较大(含字段名)小 3-10 倍
解析速度较慢快 20-100 倍
Schema无(可选 JSON Schema)必须定义 .proto 文件
类型安全强(编译时检查)
调试方便(直接可读)不便(需工具解码)
浏览器支持原生需要库(protobuf.js)
何时使用 protobuf
  • 高频消息传输:如直播弹幕、实时行情、游戏状态同步(每秒数十到数百条消息)
  • 带宽敏感场景:移动端弱网、大规模推送
  • 强类型需求:前后端共享 Schema,减少联调成本
  • JSON 足够好的场景:低频通信、调试优先、快速原型

定义 Schema(.proto 文件)

protobuf 的核心是 .proto 文件,定义消息的结构和字段类型:

proto/chat.proto
// 指定 proto3 语法
syntax = "proto3";

// 包名,避免命名冲突
package chat;

// 聊天消息
message ChatMessage {
string id = 1; // 字段编号 1,唯一标识(不是值)
string sender = 2;
string content = 3;
int64 timestamp = 4; // Unix 时间戳(毫秒)
MessageType type = 5;
}

// 枚举类型
enum MessageType {
TEXT = 0; // 枚举第一个值必须为 0
IMAGE = 1;
VIDEO = 2;
SYSTEM = 3;
}

// 嵌套消息
message ChatRoom {
string room_id = 1;
string name = 2;
repeated ChatMessage messages = 3; // repeated = 数组
map<string, string> metadata = 4; // map 类型
}

// WebSocket 通信的统一消息包装
message WebSocketFrame {
string event = 1; // 事件名
oneof payload { // oneof = 联合类型,同时只有一个字段有值
ChatMessage chat = 2;
JoinRequest join = 3;
HeartbeatPing ping = 4;
}
}

message JoinRequest {
string room_id = 1;
string user_id = 2;
}

message HeartbeatPing {
int64 timestamp = 1;
}
protobuf 字段编号规则
  • 字段编号是唯一标识,序列化时用编号而非字段名,这是 protobuf 体积小的关键原因
  • 1-15 编号只占 1 字节,高频字段应使用小编号
  • 16-2047 占 2 字节
  • 编号一旦使用不可更改,删除字段后编号也不能复用(用 reserved 标记)
  • 这种设计允许向前/向后兼容:新增字段不影响旧客户端解析

前端使用(protobuf.js)

前端最常用的库是 protobuf.js,支持从 .proto 文件生成 TypeScript 代码:

npm install protobufjs

方式一:静态生成(推荐用于生产环境)

先用 pbjs / pbts.proto 文件生成 JS + TS 类型代码:

# 生成 JS 代码(静态模块)
npx pbjs -t static-module -w es6 -o src/proto/chat.js proto/chat.proto

# 生成 TypeScript 类型定义
npx pbts -o src/proto/chat.d.ts src/proto/chat.js
src/ws-client.ts
// 导入生成的 protobuf 模块
import { chat } from './proto/chat';

// ====== 编码:对象 → 二进制 ======

// 1. 创建消息对象
const message = chat.ChatMessage.create({
id: 'msg-001',
sender: 'user-123',
content: '你好!',
timestamp: Date.now(),
type: chat.MessageType.TEXT,
});

// 2. 验证消息(可选,开发阶段推荐)
const error = chat.ChatMessage.verify(message);
if (error) throw new Error(`消息验证失败: ${error}`);

// 3. 编码为二进制(Uint8Array)
const buffer: Uint8Array = chat.ChatMessage.encode(message).finish();

console.log('JSON 大小:', JSON.stringify(message).length, '字节');
console.log('Protobuf 大小:', buffer.length, '字节');
// 典型结果:JSON ~120 字节,Protobuf ~45 字节

// ====== 解码:二进制 → 对象 ======

const decoded = chat.ChatMessage.decode(buffer);
console.log(decoded.content); // "你好!"
console.log(decoded.type); // 0 (TEXT)

方式二:动态加载(适合开发/快速原型)

src/dynamic-proto.ts
import protobuf from 'protobufjs';

// 运行时加载 .proto 文件
const root = await protobuf.load('proto/chat.proto');

// 获取消息类型
const ChatMessage = root.lookupType('chat.ChatMessage');

// 编码
const payload = { id: '1', sender: 'Alice', content: 'Hi', timestamp: Date.now() };
const buffer = ChatMessage.encode(ChatMessage.create(payload)).finish();

// 解码
const decoded = ChatMessage.decode(buffer);
console.log(ChatMessage.toObject(decoded)); // 转为普通对象

WebSocket + protobuf 完整示例

src/protobuf-ws-client.ts
import { chat } from './proto/chat';

class ProtobufWebSocket {
private ws: WebSocket | null = null;
private handlers = new Map<string, Set<(data: any) => void>>();

connect(url: string): Promise<void> {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(url);
this.ws.binaryType = 'arraybuffer'; // 关键:指定接收二进制数据

this.ws.onopen = () => resolve();
this.ws.onerror = () => reject(new Error('连接失败'));

this.ws.onmessage = (event: MessageEvent) => {
// 接收到的是 ArrayBuffer,需要转为 Uint8Array 再解码
const frame = chat.WebSocketFrame.decode(
new Uint8Array(event.data as ArrayBuffer)
);

// 根据 event 字段分发到对应的处理器
const handlers = this.handlers.get(frame.event);
handlers?.forEach((handler) => handler(frame[frame.payload!]));
};
});
}

// 发送 protobuf 编码的消息
sendChat(message: chat.IChatMessage): void {
const frame = chat.WebSocketFrame.create({
event: 'chat',
chat: message,
});

const buffer = chat.WebSocketFrame.encode(frame).finish();
this.ws?.send(buffer); // 直接发送二进制
}

sendJoin(roomId: string, userId: string): void {
const frame = chat.WebSocketFrame.create({
event: 'join',
join: { roomId, userId },
});

this.ws?.send(chat.WebSocketFrame.encode(frame).finish());
}

on(event: string, handler: (data: any) => void): () => void {
if (!this.handlers.has(event)) {
this.handlers.set(event, new Set());
}
this.handlers.get(event)!.add(handler);
return () => this.handlers.get(event)?.delete(handler);
}

close(): void {
this.ws?.close(1000);
}
}

// 使用
const client = new ProtobufWebSocket();
await client.connect('wss://example.com/ws');

client.on('chat', (msg: chat.IChatMessage) => {
console.log(`${msg.sender}: ${msg.content}`);
});

client.sendJoin('room-1', 'user-123');
client.sendChat({
id: crypto.randomUUID(),
sender: 'user-123',
content: '大家好!',
timestamp: Date.now(),
type: chat.MessageType.TEXT,
});

服务端解析(Node.js)

server/ws-server.ts
import { WebSocketServer, WebSocket } from 'ws';
import { chat } from './proto/chat';

const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (ws: WebSocket) => {
ws.on('message', (data: Buffer) => {
const frame = chat.WebSocketFrame.decode(new Uint8Array(data));

switch (frame.event) {
case 'chat':
console.log('聊天消息:', frame.chat?.content);
// 广播给所有客户端
const response = chat.WebSocketFrame.encode(frame).finish();
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(response);
}
});
break;

case 'join':
console.log(`用户 ${frame.join?.userId} 加入房间 ${frame.join?.roomId}`);
break;
}
});
});

protobuf 编码原理

protobuf 之所以体积小、速度快,核心在于 Varint 编码 + Tag-Value 格式

编码格式示意
// JSON 编码 → 文本,包含字段名
// {"id":"1","sender":"Alice","content":"Hi"} // 43 字节

// protobuf 编码 → 二进制,用字段编号代替字段名
// [0a 01 31] [12 05 41 6c 69 63 65] [1a 02 48 69]
// ↑ 字段1 ↑ 字段2 ↑ 字段3
// tag+长度+值 tag+长度+"Alice" tag+长度+"Hi"
// 共 16 字节,节省 ~63%
protobuf 使用注意事项
  1. 调试不便:二进制数据在 DevTools Network 面板中不可读,建议开发环境提供 JSON fallback 或使用 protobuf-decoder 工具
  2. Schema 同步:前后端必须使用相同版本的 .proto 文件,建议放在共享仓库或 npm 包中
  3. 字段兼容性:永远不要修改已有字段的编号或类型,只能新增字段或用 reserved 标记废弃字段
  4. 默认值:proto3 中所有字段都有默认值(string→""、int→0、bool→false),值为默认值时不会被序列化,这意味着你无法区分"字段值为 0"和"字段未设置"
  5. 包体积protobufjs 完整版约 80KB(gzip 后 ~20KB),如果只需编解码可用轻量版 protobufjs/light

常见面试问题

Q1: WebSocket 和 HTTP 有什么区别?

答案

特性WebSocketHTTP
连接持久连接短连接
通信全双工请求-响应
头部轻量每次请求带完整头部
协议ws/wsshttp/https
发起方双方都可只能客户端
// HTTP: 每次请求都要建立连接
fetch('/api/data'); // 连接 → 请求 → 响应 → 关闭

// WebSocket: 一次连接,持续通信
const ws = new WebSocket('wss://...');
ws.send('message1');
ws.send('message2'); // 复用同一连接

Q2: WebSocket 如何实现心跳检测?

答案

class HeartbeatWebSocket {
private ws: WebSocket;
private heartbeatTimer: number | null = null;
private pongTimeout: number | null = null;

connect(url: string): void {
this.ws = new WebSocket(url);

this.ws.onopen = () => {
this.startHeartbeat();
};

this.ws.onmessage = (e) => {
if (e.data === 'pong') {
this.clearPongTimeout();
}
};

this.ws.onclose = () => {
this.stopHeartbeat();
};
}

private startHeartbeat(): void {
this.heartbeatTimer = window.setInterval(() => {
this.ws.send('ping');

// 等待 pong 响应
this.pongTimeout = window.setTimeout(() => {
console.log('心跳超时,断开连接');
this.ws.close();
}, 5000);
}, 30000);
}

private clearPongTimeout(): void {
if (this.pongTimeout) {
clearTimeout(this.pongTimeout);
this.pongTimeout = null;
}
}

private stopHeartbeat(): void {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
}
this.clearPongTimeout();
}
}

Q3: SSE 和 WebSocket 如何选择?

答案

需求SSEWebSocket
单向推送✅ 推荐可以
双向通信✅ 必须
简单实现✅ HTTP 协议需要专门服务
自动重连✅ 内置需要自己实现
二进制数据❌ 仅文本✅ 支持
高频消息一般✅ 更适合

Q4: WebSocket 断线重连怎么实现?

答案

class ReconnectingWebSocket {
private url: string;
private ws: WebSocket | null = null;
private reconnectAttempts = 0;
private maxAttempts = 5;
private baseDelay = 1000;

connect(): void {
this.ws = new WebSocket(this.url);

this.ws.onopen = () => {
this.reconnectAttempts = 0;
};

this.ws.onclose = (e) => {
if (e.code !== 1000) { // 非正常关闭
this.reconnect();
}
};
}

private reconnect(): void {
if (this.reconnectAttempts >= this.maxAttempts) {
console.log('达到最大重连次数');
return;
}

// 指数退避
const delay = this.baseDelay * Math.pow(2, this.reconnectAttempts);
this.reconnectAttempts++;

setTimeout(() => {
console.log(`重连中... (${this.reconnectAttempts})`);
this.connect();
}, delay);
}
}

Q5: 如何处理 WebSocket 消息顺序?

答案

interface OrderedMessage {
seq: number; // 序列号
data: any;
}

class OrderedWebSocket {
private expectedSeq = 0;
private buffer: Map<number, any> = new Map();

handleMessage(msg: OrderedMessage): void {
if (msg.seq === this.expectedSeq) {
// 顺序正确,处理消息
this.process(msg.data);
this.expectedSeq++;

// 处理缓冲区中的后续消息
while (this.buffer.has(this.expectedSeq)) {
this.process(this.buffer.get(this.expectedSeq));
this.buffer.delete(this.expectedSeq);
this.expectedSeq++;
}
} else if (msg.seq > this.expectedSeq) {
// 消息提前到达,缓冲
this.buffer.set(msg.seq, msg.data);
}
// seq < expectedSeq: 重复消息,丢弃
}

private process(data: any): void {
console.log('处理消息:', data);
}
}

Q6: WebSocket 为什么要用 protobuf 而不是 JSON?

答案

JSON 是文本格式,每条消息都要携带完整的字段名,且解析需要逐字符处理。protobuf 是二进制格式,用字段编号代替字段名,用 Varint 变长编码压缩整数,体积通常只有 JSON 的 1/3~1/10,解析速度快 20-100 倍。

// 同一条消息的体积对比
const message = { id: 1, sender: 'Alice', content: 'Hello', timestamp: 1711180800000 };

// JSON: '{"id":1,"sender":"Alice","content":"Hello","timestamp":1711180800000}'
// → 70 字节,包含字段名 "id"、"sender" 等冗余文本

// Protobuf: 二进制编码
// → ~28 字节,字段名替换为 1 字节的编号,整数用 Varint 压缩

适合 protobuf 的场景:直播弹幕(每秒数百条)、实时行情、游戏同步等高频 + 带宽敏感场景。对于低频 API 调用或需要快速调试的场景,JSON 更合适。

Q7: protobuf 如何实现前后端向后兼容?

答案

protobuf 的兼容性依赖于字段编号不变这一原则:

message User {
string name = 1;
int32 age = 2;
// 新增字段:旧客户端解析时会忽略未知编号
string email = 3;
// 删除字段:用 reserved 防止编号被复用
reserved 4; // 编号 4 不可再使用
reserved "phone"; // 字段名 "phone" 不可再使用
}
  • 新增字段:旧客户端遇到未知编号会自动跳过,不会报错
  • 删除字段:用 reserved 标记,防止后续开发者误用相同编号导致解析错误
  • 修改字段类型或编号绝对不行,会导致解析错误
  • 这种设计使得前后端可以独立发版,只要不破坏已有字段

相关链接