Buffer 与 Stream
问题
Node.js 中的 Buffer 和 Stream 是什么?它们有什么用途和使用场景?
答案
Buffer 是 Node.js 中用于处理二进制数据的类,Stream 是处理流式数据的抽象接口。两者是 Node.js 高效处理 I/O 的核心。
Buffer
什么是 Buffer
Buffer 是 Node.js 中的全局对象,用于表示固定长度的字节序列,直接操作内存中的数据。
// 创建 Buffer
const buf1 = Buffer.alloc(10); // 10 字节,初始化为 0
const buf2 = Buffer.allocUnsafe(10); // 10 字节,未初始化(更快)
const buf3 = Buffer.from([1, 2, 3]); // 从数组创建
const buf4 = Buffer.from('Hello', 'utf8'); // 从字符串创建
const buf5 = Buffer.from(buf4); // 复制
console.log(buf4); // <Buffer 48 65 6c 6c 6f>
console.log(buf4.toString()); // 'Hello'
常用操作
const buf = Buffer.alloc(10);
// 写入
buf.write('Hello');
buf.writeInt32BE(12345, 5); // 大端序写入
// 读取
console.log(buf.toString('utf8', 0, 5)); // 'Hello'
console.log(buf.readInt32BE(5)); // 12345
// 切片(共享内存)
const slice = buf.slice(0, 5);
slice[0] = 97; // 修改会影响原 buf
// 复制(独立内存)
const copy = Buffer.alloc(5);
buf.copy(copy, 0, 0, 5);
// 比较
const a = Buffer.from('abc');
const b = Buffer.from('abd');
console.log(a.compare(b)); // -1 (a < b)
console.log(a.equals(Buffer.from('abc'))); // true
// 连接
const combined = Buffer.concat([buf1, buf2, buf3]);
// 填充
buf.fill(0); // 全部填充为 0
编码转换
const str = '你好';
// 字符串 → Buffer
const buf = Buffer.from(str, 'utf8');
console.log(buf); // <Buffer e4 bd a0 e5 a5 bd>
// Buffer → 字符串
console.log(buf.toString('utf8')); // '你好'
console.log(buf.toString('base64')); // '5L2g5aW9'
console.log(buf.toString('hex')); // 'e4bda0e5a5bd'
// 支持的编码
// utf8, utf16le, latin1, base64, base64url, hex, ascii, binary
Stream
四种流类型
| 类型 | 说明 | 示例 |
|---|---|---|
| Readable | 可读流 | fs.createReadStream、http 请求体 |
| Writable | 可写流 | fs.createWriteStream、http 响应体 |
| Duplex | 双工流(可读可写) | net.Socket、TCP 连接 |
| Transform | 转换流(读写时转换) | zlib.createGzip、crypto |
import { Readable, Writable, Duplex, Transform } from 'stream';
// 可读流
const readable = new Readable({
read(size) {
this.push('hello');
this.push(null); // 结束信号
}
});
// 可写流
const writable = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});
// 管道
readable.pipe(writable);
可读流 (Readable)
import { createReadStream } from 'fs';
const readable = createReadStream('large-file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB 缓冲区
});
// 流动模式 - data 事件
readable.on('data', (chunk) => {
console.log(`接收 ${chunk.length} 字节`);
});
readable.on('end', () => {
console.log('读取完成');
});
readable.on('error', (err) => {
console.error('读取错误:', err);
});
// 暂停模式 - read() 方法
readable.on('readable', () => {
let chunk;
while ((chunk = readable.read()) !== null) {
console.log(chunk);
}
});
// 暂停和恢复
readable.pause();
readable.resume();
可写流 (Writable)
import { createWriteStream } from 'fs';
const writable = createWriteStream('output.txt', {
encoding: 'utf8',
highWaterMark: 16 * 1024 // 16KB
});
// 写入
const canContinue = writable.write('Hello ');
console.log('缓冲区未满:', canContinue);
// 背压处理
if (!writable.write('World')) {
console.log('缓冲区已满,等待排空');
writable.once('drain', () => {
console.log('缓冲区已排空,可以继续写入');
});
}
// 结束
writable.end('Done!', () => {
console.log('写入完成');
});
// 事件
writable.on('finish', () => console.log('全部写入完成'));
writable.on('error', (err) => console.error('写入错误:', err));
双工流 (Duplex)
import { Duplex } from 'stream';
const duplex = new Duplex({
read(size) {
this.push('from read');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('写入:', chunk.toString());
callback();
}
});
// 读和写是独立的
duplex.write('to write');
duplex.on('data', (chunk) => console.log('读取:', chunk.toString()));
转换流 (Transform)
import { Transform } from 'stream';
// 自定义转换流 - 大写转换
const uppercase = new Transform({
transform(chunk, encoding, callback) {
const upper = chunk.toString().toUpperCase();
callback(null, upper);
}
});
// 使用
process.stdin
.pipe(uppercase)
.pipe(process.stdout);
// 内置转换流
import { createGzip, createGunzip } from 'zlib';
import { createCipheriv, createDecipheriv } from 'crypto';
// 压缩
createReadStream('input.txt')
.pipe(createGzip())
.pipe(createWriteStream('input.txt.gz'));
管道 (Pipe)
基本用法
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
// 简单管道
createReadStream('input.txt')
.pipe(createWriteStream('output.txt'));
// 链式管道
createReadStream('input.txt')
.pipe(createGzip())
.pipe(createWriteStream('input.txt.gz'));
pipeline (推荐)
import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';
// 异步 pipeline,自动处理错误和清理
async function compress(input: string, output: string) {
await pipeline(
createReadStream(input),
createGzip(),
createWriteStream(output)
);
console.log('压缩完成');
}
// 回调版本
import { pipeline as pipelineCb } from 'stream';
pipelineCb(
createReadStream('input.txt'),
createGzip(),
createWriteStream('input.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline 失败:', err);
} else {
console.log('Pipeline 成功');
}
}
);
背压 (Backpressure)
背压是当可写流消费速度跟不上可读流生产速度时的一种反馈机制。
import { createReadStream, createWriteStream } from 'fs';
const readable = createReadStream('large-file.txt');
const writable = createWriteStream('output.txt');
// ❌ 不处理背压可能导致内存溢出
readable.on('data', (chunk) => {
writable.write(chunk); // 忽略返回值
});
// ✅ 正确处理背压
readable.on('data', (chunk) => {
const canContinue = writable.write(chunk);
if (!canContinue) {
// 暂停读取,等待写入完成
readable.pause();
writable.once('drain', () => {
readable.resume();
});
}
});
// ✅✅ 最佳方案:使用 pipe/pipeline
readable.pipe(writable); // 自动处理背压
实际应用
文件复制
import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';
async function copyFile(src: string, dest: string): Promise<void> {
await pipeline(
createReadStream(src),
createWriteStream(dest)
);
}
大文件处理
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
// 逐行读取大文件
async function processLargeFile(filepath: string): Promise<void> {
const rl = createInterface({
input: createReadStream(filepath),
crlfDelay: Infinity
});
for await (const line of rl) {
// 处理每一行
console.log(line);
}
}
HTTP 流式响应
import { createServer } from 'http';
import { createReadStream } from 'fs';
const server = createServer((req, res) => {
// 流式发送大文件
const stream = createReadStream('large-video.mp4');
res.writeHead(200, {
'Content-Type': 'video/mp4'
});
stream.pipe(res);
stream.on('error', (err) => {
res.writeHead(500);
res.end('Server Error');
});
});
常见面试问题
Q1: Buffer 和字符串有什么区别?
答案:
| 特性 | Buffer | String |
|---|---|---|
| 内容 | 二进制字节序列 | Unicode 字符序列 |
| 长度 | 字节数 | 字符数 |
| 可变性 | 可修改 | 不可变 |
| 编码 | 无编码,原始字节 | UTF-16 |
| 用途 | 二进制数据、I/O | 文本处理 |
const str = '你好';
const buf = Buffer.from(str);
console.log(str.length); // 2(字符数)
console.log(buf.length); // 6(字节数,UTF-8 编码)
// Buffer 可修改
buf[0] = 0x00;
// String 不可变
// str[0] = 'x'; // 无效
Q2: 什么是背压?如何处理?
答案:
背压(Backpressure)是当数据生产速度超过消费速度时的一种流控机制。
// 不处理背压的问题
readable.on('data', (chunk) => {
// 如果 writable 较慢,数据会堆积在内存中
writable.write(chunk);
});
// 处理背压
readable.on('data', (chunk) => {
if (!writable.write(chunk)) {
readable.pause();
writable.once('drain', () => readable.resume());
}
});
// 最佳实践:使用 pipeline
import { pipeline } from 'stream/promises';
await pipeline(readable, transform, writable);
Q3: Stream 的流动模式和暂停模式有什么区别?
答案:
| 模式 | 触发方式 | 数据获取 | 控制方式 |
|---|---|---|---|
| 流动模式 | data 事件 | 自动推送 | pause/resume |
| 暂停模式 | readable 事件 | 手动 read() | 按需读取 |
// 流动模式
readable.on('data', (chunk) => {
// 数据自动推送
});
// 暂停模式
readable.on('readable', () => {
let chunk;
while ((chunk = readable.read()) !== null) {
// 手动读取
}
});
// 切换到流动模式
readable.resume();
readable.pipe(writable);
// 切换到暂停模式
readable.pause();
Q4: 为什么要用 Stream 而不是直接读取整个文件?
答案:
// ❌ 读取整个文件到内存
const data = await fs.promises.readFile('1GB-file.txt');
// 问题:1GB 文件需要 1GB+ 内存
// ✅ 使用 Stream 流式处理
const stream = fs.createReadStream('1GB-file.txt');
// 优势:只需要 highWaterMark(默认 64KB)的内存
Stream 的优势:
- 内存效率:只缓冲一小部分数据
- 时间效率:可以边读边处理,不等待全部加载
- 可组合:通过 pipe 链接多个处理步骤
- 背压处理:自动控制生产消费速度
Q5: 如何实现一个自定义的 Transform 流?
答案:
import { Transform, TransformCallback } from 'stream';
// 方式一:继承 Transform
class UpperCaseTransform extends Transform {
_transform(
chunk: Buffer,
encoding: BufferEncoding,
callback: TransformCallback
): void {
const upper = chunk.toString().toUpperCase();
this.push(upper);
callback();
}
_flush(callback: TransformCallback): void {
// 流结束时调用,可以输出剩余数据
this.push('\n--- END ---\n');
callback();
}
}
// 方式二:构造函数选项
const transform = new Transform({
transform(chunk, encoding, callback) {
callback(null, chunk.toString().toUpperCase());
}
});
// 使用
process.stdin
.pipe(new UpperCaseTransform())
.pipe(process.stdout);