跳到主要内容

Node.js 进程与线程

问题

Node.js 如何实现多进程和多线程?child_process、cluster 和 worker_threads 有什么区别?

答案

Node.js 提供三种方式利用多核 CPU:

  • child_process:创建子进程,执行任意命令或脚本
  • cluster:创建多个工作进程,共享同一端口
  • worker_threads:创建工作线程,共享内存

child_process

spawn

import { spawn } from 'child_process';

// 创建子进程执行命令
const ls = spawn('ls', ['-la']);

ls.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
});

ls.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
});

ls.on('close', (code) => {
console.log(`子进程退出,退出码: ${code}`);
});

// 选项
const child = spawn('node', ['script.js'], {
cwd: '/path/to/dir', // 工作目录
env: { ...process.env, NODE_ENV: 'production' },
stdio: 'inherit', // 继承父进程的 stdio
detached: true, // 分离子进程
shell: true // 使用 shell 执行
});

exec

import { exec, execSync } from 'child_process';
import { promisify } from 'util';

const execAsync = promisify(exec);

// 异步执行
exec('ls -la', (error, stdout, stderr) => {
if (error) {
console.error('执行错误:', error);
return;
}
console.log('stdout:', stdout);
});

// Promise 版本
const { stdout, stderr } = await execAsync('ls -la');

// 同步执行
const result = execSync('ls -la', { encoding: 'utf8' });

execFile

import { execFile } from 'child_process';

// 直接执行可执行文件(不通过 shell)
execFile('node', ['--version'], (error, stdout) => {
console.log('Node version:', stdout);
});

fork

// parent.js
import { fork } from 'child_process';

const child = fork('./child.js');

// 发送消息给子进程
child.send({ type: 'start', data: [1, 2, 3] });

// 接收子进程消息
child.on('message', (message) => {
console.log('收到子进程消息:', message);
});

child.on('exit', (code) => {
console.log('子进程退出:', code);
});

// child.js
process.on('message', (message) => {
console.log('收到父进程消息:', message);

// 处理任务
const result = message.data.reduce((a, b) => a + b, 0);

// 发送结果给父进程
process.send({ type: 'result', data: result });
});

spawn vs exec vs fork

方法用途stdout适用场景
spawn执行命令流式输出大量输出、长时间运行
exec执行命令缓冲输出小量输出、简单命令
execFile执行文件缓冲输出直接执行,更安全
fork执行 Node.jsIPC 通信Node.js 脚本、进程通信

cluster

cluster 模块用于创建多个工作进程,共享同一端口,实现负载均衡。

import cluster from 'cluster';
import { createServer } from 'http';
import { cpus } from 'os';

const numCPUs = cpus().length;

if (cluster.isPrimary) {
console.log(`主进程 ${process.pid} 正在运行`);

// 创建工作进程
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}

// 工作进程退出时重启
cluster.on('exit', (worker, code, signal) => {
console.log(`工作进程 ${worker.process.pid} 退出`);
cluster.fork(); // 重新创建
});

} else {
// 工作进程共享同一 TCP 连接
createServer((req, res) => {
res.writeHead(200);
res.end(`来自进程 ${process.pid}\n`);
}).listen(3000);

console.log(`工作进程 ${process.pid} 已启动`);
}

进程间通信

if (cluster.isPrimary) {
const worker = cluster.fork();

// 发送消息给工作进程
worker.send({ type: 'config', data: { port: 3000 } });

// 接收工作进程消息
worker.on('message', (message) => {
console.log('收到工作进程消息:', message);
});

} else {
// 接收主进程消息
process.on('message', (message) => {
console.log('收到主进程消息:', message);

// 发送消息给主进程
process.send({ type: 'ready', pid: process.pid });
});
}

负载均衡策略

// 设置调度策略(在 fork 前设置)
cluster.schedulingPolicy = cluster.SCHED_RR; // 轮询(默认)
// 或
cluster.schedulingPolicy = cluster.SCHED_NONE; // 由操作系统决定

worker_threads

worker_threads 用于创建 JavaScript 工作线程,共享内存,适合 CPU 密集型任务。

基本用法

// main.ts
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';

if (isMainThread) {
// 主线程
const worker = new Worker(__filename, {
workerData: { n: 40 }
});

worker.on('message', (result) => {
console.log('计算结果:', result);
});

worker.on('error', (err) => {
console.error('Worker 错误:', err);
});

worker.on('exit', (code) => {
console.log('Worker 退出:', code);
});

} else {
// 工作线程
const { n } = workerData;

function fibonacci(n: number): number {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}

const result = fibonacci(n);
parentPort?.postMessage(result);
}

独立 Worker 文件

// main.ts
import { Worker } from 'worker_threads';
import { fileURLToPath } from 'url';
import { dirname, join } from 'path';

const __dirname = dirname(fileURLToPath(import.meta.url));

const worker = new Worker(join(__dirname, 'worker.ts'), {
workerData: { n: 40 }
});

worker.on('message', (result) => {
console.log('结果:', result);
});

// worker.ts
import { parentPort, workerData } from 'worker_threads';

const { n } = workerData;
// ... 计算
parentPort?.postMessage(result);

SharedArrayBuffer(共享内存)

import { Worker, isMainThread } from 'worker_threads';

if (isMainThread) {
// 创建共享内存
const sharedBuffer = new SharedArrayBuffer(4);
const sharedArray = new Int32Array(sharedBuffer);

sharedArray[0] = 0;

const worker = new Worker(__filename, {
workerData: { sharedBuffer }
});

// 修改共享内存
setTimeout(() => {
Atomics.add(sharedArray, 0, 10);
console.log('主线程: 加了 10');
}, 100);

} else {
const { sharedBuffer } = workerData;
const sharedArray = new Int32Array(sharedBuffer);

// 原子操作
Atomics.add(sharedArray, 0, 5);
console.log('Worker: 加了 5,当前值:', sharedArray[0]);
}

MessageChannel

import { Worker, MessageChannel } from 'worker_threads';

const worker = new Worker('./worker.js');

// 创建通信通道
const { port1, port2 } = new MessageChannel();

// 将 port2 发送给 worker
worker.postMessage({ port: port2 }, [port2]);

// 通过 port1 通信
port1.on('message', (message) => {
console.log('收到:', message);
});

port1.postMessage('Hello from main');

对比

特性child_processclusterworker_threads
隔离级别进程进程线程
内存独立独立可共享
通信IPCIPCpostMessage/SharedArrayBuffer
开销
适用场景执行外部命令HTTP 服务负载均衡CPU 密集型计算
稳定性崩溃不影响主进程崩溃不影响主进程崩溃可能影响主进程

常见面试问题

Q1: Node.js 如何利用多核 CPU?

答案

// 方式一:cluster(HTTP 服务)
import cluster from 'cluster';
import { cpus } from 'os';

if (cluster.isPrimary) {
for (let i = 0; i < cpus().length; i++) {
cluster.fork();
}
} else {
// 启动 HTTP 服务
}

// 方式二:worker_threads(CPU 密集型)
import { Worker } from 'worker_threads';

const workers = cpus().map(() => new Worker('./task.js'));

// 方式三:child_process
import { fork } from 'child_process';

const children = cpus().map(() => fork('./task.js'));

// 方式四:PM2
// pm2 start app.js -i max

Q2: cluster 的工作原理是什么?

答案

cluster 通过主进程监听端口,将连接分发给工作进程:

  1. 端口共享:主进程监听端口
  2. 连接分发:新连接到达时,主进程选择一个工作进程处理
  3. 负载均衡:默认使用轮询(Round-Robin)策略
// 简化原理
if (cluster.isPrimary) {
const server = net.createServer();
server.listen(3000);

server.on('connection', (socket) => {
// 选择一个 worker
const worker = selectWorker();
// 将 socket 发送给 worker
worker.send('socket', socket);
});
}

Q3: worker_threads 和 Web Workers 有什么区别?

答案

特性worker_threadsWeb Workers
环境Node.js浏览器
共享内存SharedArrayBuffer + AtomicsSharedArrayBuffer(需 COOP/COEP)
可转移对象MessagePort、ArrayBuffer 等类似
require/import✅ 支持❌ 需 importScripts
Node.js API✅ 可访问❌ 无
// worker_threads
import { Worker } from 'worker_threads';
const worker = new Worker('./task.js');

// Web Workers
const worker = new Worker('/task.js');
worker.postMessage(data);

Q4: 如何实现进程间通信(IPC)?

答案

// 方式一:child_process/cluster 的 IPC
// parent.js
const child = fork('./child.js');
child.send({ type: 'data', payload: [1, 2, 3] });
child.on('message', (msg) => console.log(msg));

// child.js
process.on('message', (msg) => {
process.send({ type: 'result', payload: 6 });
});

// 方式二:共享内存(worker_threads)
const shared = new SharedArrayBuffer(1024);
const worker = new Worker('./worker.js', {
workerData: { shared }
});

// 方式三:消息队列(Redis、RabbitMQ 等)
// 适用于独立进程

// 方式四:Socket
// 使用 TCP/Unix Socket 通信

Q5: cluster 和 worker_threads 应该如何选择?

答案

使用 cluster

  • HTTP/TCP 服务器负载均衡
  • 需要进程隔离(一个崩溃不影响其他)
  • 与 PM2 配合管理

使用 worker_threads

  • CPU 密集型计算
  • 需要共享内存
  • 需要低开销线程
// HTTP 服务 → cluster
import cluster from 'cluster';

if (cluster.isPrimary) {
// fork workers
} else {
app.listen(3000);
}

// CPU 密集型 → worker_threads
import { Worker } from 'worker_threads';

app.get('/compute', async (req, res) => {
const worker = new Worker('./heavy-task.js');
worker.on('message', (result) => res.json(result));
});

相关链接