异步串行与并行
问题
实现异步任务的串行执行(顺序执行)和并行执行(同时执行)。
答案
异步控制是前端开发的核心技能,需要掌握多种实现方式。
异步串行执行
串行执行:下一个任务等待上一个任务完成后才开始。
方式一:async/await
type AsyncTask<T> = () => Promise<T>;
async function serial<T>(tasks: AsyncTask<T>[]): Promise<T[]> {
const results: T[] = [];
for (const task of tasks) {
const result = await task();
results.push(result);
}
return results;
}
// 使用
const tasks = [
() => delay(1000, 'A'),
() => delay(500, 'B'),
() => delay(300, 'C'),
];
await serial(tasks);
// 输出: A, B, C(按顺序,共 1800ms)
function delay<T>(ms: number, value: T): Promise<T> {
return new Promise(resolve => {
setTimeout(() => {
console.log(value);
resolve(value);
}, ms);
});
}
方式二:reduce + Promise 链
function serialReduce<T>(tasks: AsyncTask<T>[]): Promise<T[]> {
return tasks.reduce<Promise<T[]>>(
(promiseChain, task) =>
promiseChain.then((results) =>
task().then((result) => [...results, result])
),
Promise.resolve([])
);
}
// 等价于
// Promise.resolve([])
// .then(results => task1().then(r => [...results, r]))
// .then(results => task2().then(r => [...results, r]))
// .then(results => task3().then(r => [...results, r]))
方式三:递归
function serialRecursive<T>(tasks: AsyncTask<T>[]): Promise<T[]> {
return new Promise((resolve, reject) => {
const results: T[] = [];
let index = 0;
const next = () => {
if (index >= tasks.length) {
resolve(results);
return;
}
tasks[index++]()
.then((result) => {
results.push(result);
next();
})
.catch(reject);
};
next();
});
}
方式四:Generator
function* taskGenerator<T>(tasks: AsyncTask<T>[]): Generator<Promise<T>, void, T> {
for (const task of tasks) {
yield task();
}
}
async function serialGenerator<T>(tasks: AsyncTask<T>[]): Promise<T[]> {
const results: T[] = [];
const gen = taskGenerator(tasks);
for (const promise of gen) {
results.push(await promise);
}
return results;
}
异步并行执行
并行执行:所有任务同时开始执行。
方式一:Promise.all
async function parallel<T>(tasks: AsyncTask<T>[]): Promise<T[]> {
return Promise.all(tasks.map((task) => task()));
}
// 使用
await parallel(tasks);
// 输出: A, B, C(不按顺序,共 1000ms)
方式二:Promise.allSettled(不会因单个失败而中断)
async function parallelSettled<T>(
tasks: AsyncTask<T>[]
): Promise<PromiseSettledResult<T>[]> {
return Promise.allSettled(tasks.map((task) => task()));
}
// 使用
const results = await parallelSettled([
() => Promise.resolve(1),
() => Promise.reject(new Error('fail')),
() => Promise.resolve(3),
]);
// [
// { status: 'fulfilled', value: 1 },
// { status: 'rejected', reason: Error('fail') },
// { status: 'fulfilled', value: 3 }
// ]
方式三:手写 Promise.all
function promiseAll<T>(promises: Promise<T>[]): Promise<T[]> {
return new Promise((resolve, reject) => {
const results: T[] = [];
let completed = 0;
if (promises.length === 0) {
resolve([]);
return;
}
promises.forEach((promise, index) => {
Promise.resolve(promise)
.then((value) => {
results[index] = value;
completed++;
if (completed === promises.length) {
resolve(results);
}
})
.catch(reject);
});
});
}
执行流程对比
限流并行(控制并发数)
介于串行和并行之间,限制同时执行的任务数。
async function parallelLimit<T>(
tasks: AsyncTask<T>[],
limit: number
): Promise<T[]> {
const results: T[] = [];
const executing: Promise<void>[] = [];
for (let i = 0; i < tasks.length; i++) {
const current = i;
const p = tasks[current]().then((result) => {
results[current] = result;
});
const e = p.then(() => {
executing.splice(executing.indexOf(e), 1);
});
executing.push(e);
if (executing.length >= limit) {
await Promise.race(executing);
}
}
await Promise.all(executing);
return results;
}
// 使用:最多同时执行 2 个任务
await parallelLimit(tasks, 2);
竞速执行
多个任务同时执行,返回最先完成的结果。
Promise.race
async function race<T>(tasks: AsyncTask<T>[]): Promise<T> {
return Promise.race(tasks.map(task => task()));
}
// 使用:返回最快的结果
const fastest = await race([
() => delay(1000, 'slow'),
() => delay(100, 'fast'),
() => delay(500, 'medium'),
]);
console.log(fastest); // 'fast'
Promise.any(返回第一个成功的)
async function any<T>(tasks: AsyncTask<T>[]): Promise<T> {
return Promise.any(tasks.map(task => task()));
}
// 使用:返回第一个成功的结果
const first = await any([
() => Promise.reject('error 1'),
() => delay(100, 'success'),
() => Promise.reject('error 2'),
]);
console.log(first); // 'success'
管道执行
每个任务的输出作为下一个任务的输入。
type PipeTask<T, R> = (input: T) => Promise<R>;
async function pipe<T>(
initial: T,
...tasks: PipeTask<unknown, unknown>[]
): Promise<unknown> {
let result: unknown = initial;
for (const task of tasks) {
result = await task(result);
}
return result;
}
// 使用
const result = await pipe(
1,
async (x: number) => x + 1, // 2
async (x: number) => x * 2, // 4
async (x: number) => `Result: ${x}` // 'Result: 4'
);
类型安全的管道
// Compose 类型实现
function compose<A, B>(ab: (a: A) => Promise<B>): (a: A) => Promise<B>;
function compose<A, B, C>(
ab: (a: A) => Promise<B>,
bc: (b: B) => Promise<C>
): (a: A) => Promise<C>;
function compose<A, B, C, D>(
ab: (a: A) => Promise<B>,
bc: (b: B) => Promise<C>,
cd: (c: C) => Promise<D>
): (a: A) => Promise<D>;
function compose(...fns: Array<(x: unknown) => Promise<unknown>>) {
return (initial: unknown) =>
fns.reduce(
(promise, fn) => promise.then(fn),
Promise.resolve(initial)
);
}
// 使用
const process = compose(
async (x: number) => x + 1,
async (x: number) => x * 2,
async (x: number) => x.toString()
);
const result = await process(1); // '4'
条件执行
根据条件决定执行顺序。
interface ConditionalTask<T> {
condition: () => boolean | Promise<boolean>;
task: AsyncTask<T>;
}
async function conditionalSerial<T>(
tasks: ConditionalTask<T>[]
): Promise<(T | null)[]> {
const results: (T | null)[] = [];
for (const { condition, task } of tasks) {
const shouldRun = await condition();
if (shouldRun) {
results.push(await task());
} else {
results.push(null);
}
}
return results;
}
// 使用
const results = await conditionalSerial([
{
condition: () => true,
task: () => Promise.resolve('A'),
},
{
condition: () => false,
task: () => Promise.resolve('B'),
},
{
condition: async () => {
const result = await fetch('/api/check');
return result.ok;
},
task: () => Promise.resolve('C'),
},
]);
重试与超时
interface RetryOptions {
maxRetries: number;
delay: number;
timeout?: number;
}
async function withRetry<T>(
task: AsyncTask<T>,
options: RetryOptions
): Promise<T> {
const { maxRetries, delay: retryDelay, timeout } = options;
const executeWithTimeout = async (): Promise<T> => {
if (!timeout) return task();
return Promise.race([
task(),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), timeout)
),
]);
};
let lastError: Error | null = null;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await executeWithTimeout();
} catch (error) {
lastError = error as Error;
if (attempt < maxRetries) {
await delay(retryDelay, null);
}
}
}
throw lastError;
}
// 使用
const result = await withRetry(
() => fetch('/api/data').then(r => r.json()),
{ maxRetries: 3, delay: 1000, timeout: 5000 }
);
常见面试问题
Q1: 串行和并行的使用场景?
答案:
| 执行方式 | 场景 | 示例 |
|---|---|---|
| 串行 | 有依赖关系 | 登录 → 获取用户信息 → 加载数据 |
| 并行 | 无依赖关系 | 同时加载多个独立资源 |
| 限流并行 | 大量请求 | 批量上传文件 |
Q2: for...of 和 forEach 的区别?
答案:
// ❌ forEach 不会等待
tasks.forEach(async (task) => {
await task(); // 不会顺序执行
});
// ✅ for...of 正确等待
for (const task of tasks) {
await task(); // 顺序执行
}
forEach 不支持 await,因为它的回调是同步调用的,不会等待返回的 Promise。
Q3: Promise.all 和 Promise.allSettled 的区别?
答案:
| 方法 | 失败处理 | 返回值 |
|---|---|---|
Promise.all | 任一失败就 reject | 成功值数组 |
Promise.allSettled | 等待所有完成 | 状态对象数组 |
// Promise.all: 一个失败全部失败
try {
await Promise.all([
Promise.resolve(1),
Promise.reject('error'),
Promise.resolve(3),
]);
} catch (e) {
console.log(e); // 'error'
}
// Promise.allSettled: 获取所有结果
const results = await Promise.allSettled([
Promise.resolve(1),
Promise.reject('error'),
Promise.resolve(3),
]);
// 可以分别处理成功和失败
Q4: 如何实现一个 compose/pipe 函数?支持异步函数?
答案:
compose 从右到左组合函数,pipe 从左到右组合函数。核心实现是 reduce,异步版本只需让中间结果用 await 串联。
同步版本:
// compose: 从右到左执行 f(g(h(x)))
function compose<T>(...fns: Array<(arg: T) => T>): (arg: T) => T {
return (arg: T) => fns.reduceRight((acc, fn) => fn(acc), arg);
}
// pipe: 从左到右执行 h(g(f(x)))
function pipe<T>(...fns: Array<(arg: T) => T>): (arg: T) => T {
return (arg: T) => fns.reduce((acc, fn) => fn(acc), arg);
}
// 使用
const add1 = (x: number) => x + 1;
const double = (x: number) => x * 2;
const square = (x: number) => x * x;
const composed = compose(square, double, add1);
console.log(composed(3)); // square(double(add1(3))) = square(double(4)) = square(8) = 64
const piped = pipe(add1, double, square);
console.log(piped(3)); // square(double(add1(3))) = 同上 = 64
异步版本(支持 async 函数):
// 异步 compose:从右到左,自动 await 每个函数的结果
function asyncCompose<T>(
...fns: Array<(arg: T) => T | Promise<T>>
): (arg: T) => Promise<T> {
return (arg: T) =>
fns.reduceRight<Promise<T>>(
(promise, fn) => promise.then(fn),
Promise.resolve(arg)
);
}
// 异步 pipe:从左到右,自动 await 每个函数的结果
function asyncPipe<T>(
...fns: Array<(arg: T) => T | Promise<T>>
): (arg: T) => Promise<T> {
return (arg: T) =>
fns.reduce<Promise<T>>(
(promise, fn) => promise.then(fn),
Promise.resolve(arg)
);
}
// 使用示例
const fetchUser = async (id: number): Promise<number> => {
console.log('fetching user', id);
return id;
};
const transform = (id: number): number => id * 10;
const save = async (data: number): Promise<number> => {
console.log('saving', data);
return data;
};
const process = asyncPipe(fetchUser, transform, save);
await process(1); // fetching user 1 -> saving 10 -> 10
类型安全的 pipe(函数重载):
function typedPipe<A, B>(f1: (a: A) => B | Promise<B>): (a: A) => Promise<B>;
function typedPipe<A, B, C>(
f1: (a: A) => B | Promise<B>,
f2: (b: B) => C | Promise<C>
): (a: A) => Promise<C>;
function typedPipe<A, B, C, D>(
f1: (a: A) => B | Promise<B>,
f2: (b: B) => C | Promise<C>,
f3: (c: C) => D | Promise<D>
): (a: A) => Promise<D>;
function typedPipe<A, B, C, D, E>(
f1: (a: A) => B | Promise<B>,
f2: (b: B) => C | Promise<C>,
f3: (c: C) => D | Promise<D>,
f4: (d: D) => E | Promise<E>
): (a: A) => Promise<E>;
function typedPipe(
...fns: Array<(arg: unknown) => unknown>
): (arg: unknown) => Promise<unknown> {
return (arg: unknown) =>
fns.reduce<Promise<unknown>>(
(promise, fn) => promise.then(fn),
Promise.resolve(arg)
);
}
// 每一步的输入输出类型都是安全的
const process2 = typedPipe(
(id: number) => fetch(`/api/users/${id}`), // number => Promise<Response>
(res: Response) => res.json(), // Response => Promise<any>
(data: any) => data.name as string // any => string
);
| 特性 | compose | pipe |
|---|---|---|
| 执行方向 | 右 → 左 | 左 → 右 |
| 实现方式 | reduceRight | reduce |
| 可读性 | 数学风格 | 数据流风格 |
| 常见场景 | 函数式编程、Redux middleware | 数据处理管道 |
Q5: 如何实现一个 retry + 并发限制的请求函数?
答案:
这道题综合了 retry(重试机制) 和 限流调度器 两个能力:每个请求独立重试,同时限制整体并发数,最终通过 Promise.allSettled 收集所有结果。
interface RetryLimitOptions {
maxConcurrent: number; // 最大并发数
maxRetries: number; // 最大重试次数
retryDelay: number; // 重试间隔(ms)
timeout?: number; // 单次请求超时(ms)
}
type TaskResult<T> =
| { status: 'fulfilled'; value: T; retries: number }
| { status: 'rejected'; reason: Error; retries: number };
async function batchRequestWithRetry<T>(
tasks: Array<() => Promise<T>>,
options: RetryLimitOptions
): Promise<TaskResult<T>[]> {
const { maxConcurrent, maxRetries, retryDelay, timeout } = options;
// 带重试的单个任务执行器
async function executeWithRetry(
task: () => Promise<T>
): Promise<TaskResult<T>> {
let lastError: Error = new Error('Unknown error');
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
let result: T;
if (timeout) {
// 带超时控制
result = await Promise.race([
task(),
new Promise<never>((_, reject) =>
setTimeout(
() => reject(new Error('Request timeout')),
timeout
)
),
]);
} else {
result = await task();
}
return { status: 'fulfilled', value: result, retries: attempt };
} catch (error) {
lastError = error as Error;
if (attempt < maxRetries) {
// 指数退避 + 随机抖动
const delay = retryDelay * Math.pow(2, attempt);
const jitter = delay * 0.5 * Math.random();
await new Promise((r) => setTimeout(r, delay + jitter));
}
}
}
return { status: 'rejected', reason: lastError, retries: maxRetries };
}
// 并发限制调度
const results: TaskResult<T>[] = [];
const executing = new Set<Promise<void>>();
for (let i = 0; i < tasks.length; i++) {
const index = i;
const p = executeWithRetry(tasks[index]).then((result) => {
results[index] = result;
});
const e: Promise<void> = p.finally(() => executing.delete(e));
executing.add(e);
if (executing.size >= maxConcurrent) {
await Promise.race(executing);
}
}
await Promise.all(executing);
return results;
}
使用示例:
const urls = ['/api/1', '/api/2', '/api/3', '/api/4', '/api/5'];
const tasks = urls.map(
(url) => () =>
fetch(url).then((res) => {
if (!res.ok) throw new Error(`HTTP ${res.status}`);
return res.json();
})
);
const results = await batchRequestWithRetry(tasks, {
maxConcurrent: 2, // 同时最多 2 个请求
maxRetries: 3, // 每个请求最多重试 3 次
retryDelay: 1000, // 初始重试间隔 1 秒
timeout: 5000, // 单次请求超时 5 秒
});
// 处理结果
results.forEach((result, index) => {
if (result.status === 'fulfilled') {
console.log(
`请求 ${index} 成功(重试了 ${result.retries} 次):`,
result.value
);
} else {
console.error(
`请求 ${index} 失败(共重试 ${result.retries} 次):`,
result.reason.message
);
}
});
// 统计
const succeeded = results.filter((r) => r.status === 'fulfilled').length;
const failed = results.filter((r) => r.status === 'rejected').length;
console.log(`成功: ${succeeded}, 失败: ${failed}`);
要点
- 独立重试:每个请求的重试互不影响,某个请求重试不会阻塞其他请求
- 指数退避 + 抖动:
delay * 2^attempt + random jitter,避免重试雪崩 - 超时控制:用
Promise.race实现单次请求超时 - 结果收集:使用类似
Promise.allSettled的结构,不会因单个失败丢失其他结果 - 并发池:通过
Set<Promise>+Promise.race维护固定大小的并发池