跳到主要内容

Buffer 与 Stream

问题

Node.js 中的 Buffer 和 Stream 是什么?它们有什么用途和使用场景?

答案

Buffer 是 Node.js 中用于处理二进制数据的类,Stream 是处理流式数据的抽象接口。两者是 Node.js 高效处理 I/O 的核心。


Buffer

什么是 Buffer

Buffer 是 Node.js 中的全局对象,用于表示固定长度的字节序列,直接操作内存中的数据。

// 创建 Buffer
const buf1 = Buffer.alloc(10); // 10 字节,初始化为 0
const buf2 = Buffer.allocUnsafe(10); // 10 字节,未初始化(更快)
const buf3 = Buffer.from([1, 2, 3]); // 从数组创建
const buf4 = Buffer.from('Hello', 'utf8'); // 从字符串创建
const buf5 = Buffer.from(buf4); // 复制

console.log(buf4); // <Buffer 48 65 6c 6c 6f>
console.log(buf4.toString()); // 'Hello'

常用操作

const buf = Buffer.alloc(10);

// 写入
buf.write('Hello');
buf.writeInt32BE(12345, 5); // 大端序写入

// 读取
console.log(buf.toString('utf8', 0, 5)); // 'Hello'
console.log(buf.readInt32BE(5)); // 12345

// 切片(共享内存)
const slice = buf.slice(0, 5);
slice[0] = 97; // 修改会影响原 buf

// 复制(独立内存)
const copy = Buffer.alloc(5);
buf.copy(copy, 0, 0, 5);

// 比较
const a = Buffer.from('abc');
const b = Buffer.from('abd');
console.log(a.compare(b)); // -1 (a < b)
console.log(a.equals(Buffer.from('abc'))); // true

// 连接
const combined = Buffer.concat([buf1, buf2, buf3]);

// 填充
buf.fill(0); // 全部填充为 0

编码转换

const str = '你好';

// 字符串 → Buffer
const buf = Buffer.from(str, 'utf8');
console.log(buf); // <Buffer e4 bd a0 e5 a5 bd>

// Buffer → 字符串
console.log(buf.toString('utf8')); // '你好'
console.log(buf.toString('base64')); // '5L2g5aW9'
console.log(buf.toString('hex')); // 'e4bda0e5a5bd'

// 支持的编码
// utf8, utf16le, latin1, base64, base64url, hex, ascii, binary

Stream

四种流类型

类型说明示例
Readable可读流fs.createReadStream、http 请求体
Writable可写流fs.createWriteStream、http 响应体
Duplex双工流(可读可写)net.Socket、TCP 连接
Transform转换流(读写时转换)zlib.createGzip、crypto
import { Readable, Writable, Duplex, Transform } from 'stream';

// 可读流
const readable = new Readable({
read(size) {
this.push('hello');
this.push(null); // 结束信号
}
});

// 可写流
const writable = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});

// 管道
readable.pipe(writable);

可读流 (Readable)

import { createReadStream } from 'fs';

const readable = createReadStream('large-file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB 缓冲区
});

// 流动模式 - data 事件
readable.on('data', (chunk) => {
console.log(`接收 ${chunk.length} 字节`);
});

readable.on('end', () => {
console.log('读取完成');
});

readable.on('error', (err) => {
console.error('读取错误:', err);
});

// 暂停模式 - read() 方法
readable.on('readable', () => {
let chunk;
while ((chunk = readable.read()) !== null) {
console.log(chunk);
}
});

// 暂停和恢复
readable.pause();
readable.resume();

可写流 (Writable)

import { createWriteStream } from 'fs';

const writable = createWriteStream('output.txt', {
encoding: 'utf8',
highWaterMark: 16 * 1024 // 16KB
});

// 写入
const canContinue = writable.write('Hello ');
console.log('缓冲区未满:', canContinue);

// 背压处理
if (!writable.write('World')) {
console.log('缓冲区已满,等待排空');
writable.once('drain', () => {
console.log('缓冲区已排空,可以继续写入');
});
}

// 结束
writable.end('Done!', () => {
console.log('写入完成');
});

// 事件
writable.on('finish', () => console.log('全部写入完成'));
writable.on('error', (err) => console.error('写入错误:', err));

双工流 (Duplex)

import { Duplex } from 'stream';

const duplex = new Duplex({
read(size) {
this.push('from read');
this.push(null);
},
write(chunk, encoding, callback) {
console.log('写入:', chunk.toString());
callback();
}
});

// 读和写是独立的
duplex.write('to write');
duplex.on('data', (chunk) => console.log('读取:', chunk.toString()));

转换流 (Transform)

import { Transform } from 'stream';

// 自定义转换流 - 大写转换
const uppercase = new Transform({
transform(chunk, encoding, callback) {
const upper = chunk.toString().toUpperCase();
callback(null, upper);
}
});

// 使用
process.stdin
.pipe(uppercase)
.pipe(process.stdout);

// 内置转换流
import { createGzip, createGunzip } from 'zlib';
import { createCipheriv, createDecipheriv } from 'crypto';

// 压缩
createReadStream('input.txt')
.pipe(createGzip())
.pipe(createWriteStream('input.txt.gz'));

管道 (Pipe)

基本用法

import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

// 简单管道
createReadStream('input.txt')
.pipe(createWriteStream('output.txt'));

// 链式管道
createReadStream('input.txt')
.pipe(createGzip())
.pipe(createWriteStream('input.txt.gz'));

pipeline (推荐)

import { pipeline } from 'stream/promises';
import { createReadStream, createWriteStream } from 'fs';
import { createGzip } from 'zlib';

// 异步 pipeline,自动处理错误和清理
async function compress(input: string, output: string) {
await pipeline(
createReadStream(input),
createGzip(),
createWriteStream(output)
);
console.log('压缩完成');
}

// 回调版本
import { pipeline as pipelineCb } from 'stream';

pipelineCb(
createReadStream('input.txt'),
createGzip(),
createWriteStream('input.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline 失败:', err);
} else {
console.log('Pipeline 成功');
}
}
);

背压 (Backpressure)

背压是当可写流消费速度跟不上可读流生产速度时的一种反馈机制。

import { createReadStream, createWriteStream } from 'fs';

const readable = createReadStream('large-file.txt');
const writable = createWriteStream('output.txt');

// ❌ 不处理背压可能导致内存溢出
readable.on('data', (chunk) => {
writable.write(chunk); // 忽略返回值
});

// ✅ 正确处理背压
readable.on('data', (chunk) => {
const canContinue = writable.write(chunk);

if (!canContinue) {
// 暂停读取,等待写入完成
readable.pause();
writable.once('drain', () => {
readable.resume();
});
}
});

// ✅✅ 最佳方案:使用 pipe/pipeline
readable.pipe(writable); // 自动处理背压

实际应用

文件复制

import { createReadStream, createWriteStream } from 'fs';
import { pipeline } from 'stream/promises';

async function copyFile(src: string, dest: string): Promise<void> {
await pipeline(
createReadStream(src),
createWriteStream(dest)
);
}

大文件处理

import { createReadStream } from 'fs';
import { createInterface } from 'readline';

// 逐行读取大文件
async function processLargeFile(filepath: string): Promise<void> {
const rl = createInterface({
input: createReadStream(filepath),
crlfDelay: Infinity
});

for await (const line of rl) {
// 处理每一行
console.log(line);
}
}

HTTP 流式响应

import { createServer } from 'http';
import { createReadStream } from 'fs';

const server = createServer((req, res) => {
// 流式发送大文件
const stream = createReadStream('large-video.mp4');

res.writeHead(200, {
'Content-Type': 'video/mp4'
});

stream.pipe(res);

stream.on('error', (err) => {
res.writeHead(500);
res.end('Server Error');
});
});

常见面试问题

Q1: Buffer 和字符串有什么区别?

答案

特性BufferString
内容二进制字节序列Unicode 字符序列
长度字节数字符数
可变性可修改不可变
编码无编码,原始字节UTF-16
用途二进制数据、I/O文本处理
const str = '你好';
const buf = Buffer.from(str);

console.log(str.length); // 2(字符数)
console.log(buf.length); // 6(字节数,UTF-8 编码)

// Buffer 可修改
buf[0] = 0x00;

// String 不可变
// str[0] = 'x'; // 无效

Q2: 什么是背压?如何处理?

答案

背压(Backpressure)是当数据生产速度超过消费速度时的一种流控机制。

// 不处理背压的问题
readable.on('data', (chunk) => {
// 如果 writable 较慢,数据会堆积在内存中
writable.write(chunk);
});

// 处理背压
readable.on('data', (chunk) => {
if (!writable.write(chunk)) {
readable.pause();
writable.once('drain', () => readable.resume());
}
});

// 最佳实践:使用 pipeline
import { pipeline } from 'stream/promises';
await pipeline(readable, transform, writable);

Q3: Stream 的流动模式和暂停模式有什么区别?

答案

模式触发方式数据获取控制方式
流动模式data 事件自动推送pause/resume
暂停模式readable 事件手动 read()按需读取
// 流动模式
readable.on('data', (chunk) => {
// 数据自动推送
});

// 暂停模式
readable.on('readable', () => {
let chunk;
while ((chunk = readable.read()) !== null) {
// 手动读取
}
});

// 切换到流动模式
readable.resume();
readable.pipe(writable);

// 切换到暂停模式
readable.pause();

Q4: 为什么要用 Stream 而不是直接读取整个文件?

答案

// ❌ 读取整个文件到内存
const data = await fs.promises.readFile('1GB-file.txt');
// 问题:1GB 文件需要 1GB+ 内存

// ✅ 使用 Stream 流式处理
const stream = fs.createReadStream('1GB-file.txt');
// 优势:只需要 highWaterMark(默认 64KB)的内存

Stream 的优势

  1. 内存效率:只缓冲一小部分数据
  2. 时间效率:可以边读边处理,不等待全部加载
  3. 可组合:通过 pipe 链接多个处理步骤
  4. 背压处理:自动控制生产消费速度

Q5: 如何实现一个自定义的 Transform 流?

答案

import { Transform, TransformCallback } from 'stream';

// 方式一:继承 Transform
class UpperCaseTransform extends Transform {
_transform(
chunk: Buffer,
encoding: BufferEncoding,
callback: TransformCallback
): void {
const upper = chunk.toString().toUpperCase();
this.push(upper);
callback();
}

_flush(callback: TransformCallback): void {
// 流结束时调用,可以输出剩余数据
this.push('\n--- END ---\n');
callback();
}
}

// 方式二:构造函数选项
const transform = new Transform({
transform(chunk, encoding, callback) {
callback(null, chunk.toString().toUpperCase());
}
});

// 使用
process.stdin
.pipe(new UpperCaseTransform())
.pipe(process.stdout);

相关链接