AI SDK 与框架
问题
前端开发 AI 应用时有哪些主流 SDK 和框架?各自的内部原理、核心 API 和适用场景是什么?如何实现跨 Provider 的统一抽象、错误恢复、Token 追踪?
答案
AI 应用开发中,选择合适的 SDK 和框架可以大幅提升开发效率。目前前端生态中最重要的三个选择是 Vercel AI SDK、LangChain.js 和各 LLM 厂商的 官方 SDK(OpenAI / Anthropic)。本文从架构原理、核心 API、Provider 抽象、中间件机制、错误处理、Token 追踪等维度进行深度解析。
本文假设你已了解 前端接入大模型 API 中的基础 API 格式、流式渲染与 SSE 中的流式处理方案,以及 Function Calling 与 AI Agent 中的工具调用机制。这些基础知识是理解各 SDK 封装层的前提。
一、Vercel AI SDK 深入
Vercel AI SDK 是目前前端 AI 开发中最流行的全栈框架,提供从后端生成到前端渲染的完整解决方案。它的核心设计理念是 Provider 抽象 + 流式协议 + React Hooks。
1.1 整体架构
1.2 streamText 内部原理
streamText 是 Vercel AI SDK 最核心的函数。理解其内部流程对调试和扩展至关重要:
// 简化的 streamText 内部实现
async function streamText(options: StreamTextOptions) {
const { model, messages, tools, maxSteps, onChunk, onFinish } = options;
// 1. 通过 Provider 转换为统一格式
const providerRequest = model.convertToProviderRequest(messages, tools);
// 2. 调用 Provider 的底层 doStream 方法
const rawStream = await model.doStream(providerRequest);
// 3. 创建 TransformStream 管道处理原始流
const transformedStream = rawStream
.pipeThrough(new ToolCallParsingTransform()) // 解析工具调用
.pipeThrough(new UsageTrackingTransform()) // 追踪 Token 使用
.pipeThrough(new FinishDetectionTransform()); // 检测完成信号
// 4. Agent 循环:如果有工具调用且未达 maxSteps,继续循环
let currentStep = 0;
while (currentStep < maxSteps) {
const stepResult = await consumeStream(transformedStream);
if (stepResult.hasToolCalls) {
// 执行工具,将结果追加到 messages
const toolResults = await executeTools(stepResult.toolCalls, tools);
messages.push(...toolResults);
currentStep++;
// 重新发起 LLM 请求(带上工具结果)
continue;
}
break; // 没有工具调用,循环结束
}
// 5. 返回 StreamTextResult 对象
return {
toDataStreamResponse: () => encodeToDataStream(transformedStream),
toTextStreamResponse: () => encodeToTextStream(transformedStream),
textStream: createAsyncIterable(transformedStream),
usage: Promise<TokenUsage>, // Token 使用量(在流结束后 resolve)
finishReason: Promise<string>,
steps: Promise<StepResult[]>, // 所有步骤的详情
};
}
maxSteps 参数控制 Agent 循环的最大步数。每一步包含一次完整的 LLM 调用。如果设置 maxSteps: 5,意味着 LLM 最多可以连续调用 5 次工具再给出最终回答。注意:Token 消耗随步数线性增长,因为每步都会发送完整的历史消息。详见 Function Calling 与 AI Agent。
1.3 DataStream 协议格式
toDataStreamResponse() 返回的并不是普通的 SSE 文本流,而是 Vercel AI SDK 自定义的 DataStream 协议。前端 useChat 正是基于这个协议解析数据。
// DataStream 协议每一行格式:TYPE_CODE:JSON_VALUE\n
// 类型码定义:
// 0 — 文本 delta(最常见)
// "0:\"Hello \"\n"
// "0:\"World\"\n"
// 2 — data(自定义数据,通过 data option 发送)
// "2:[{\"sources\":[\"doc1.pdf\"]}]\n"
// 9 — 工具调用开始
// "9:{\"toolCallId\":\"call_1\",\"toolName\":\"getWeather\",\"args\":{\"city\":\"北京\"}}\n"
// a — 工具调用结果
// "a:{\"toolCallId\":\"call_1\",\"result\":{\"temp\":25}}\n"
// e — 错误
// "e:{\"message\":\"Rate limit exceeded\"}\n"
// d — 完成信号(包含 finishReason 和 usage)
// "d:{\"finishReason\":\"stop\",\"usage\":{\"promptTokens\":150,\"completionTokens\":80}}\n"
理解这个协议可以帮助你:
- 调试流式问题:在浏览器 Network 面板中查看原始响应
- 自定义解析:不使用
useChat时手动解析 DataStream - 注入自定义数据:通过
data通道向前端推送元数据
import { streamText, createDataStreamResponse } from 'ai';
import { openai } from '@ai-sdk/openai';
export async function POST(req: Request) {
const { messages } = await req.json();
return createDataStreamResponse({
execute: async (dataStream) => {
// 在流开始前注入自定义数据
dataStream.writeData({ type: 'session', id: 'sess_123' });
const result = streamText({
model: openai('gpt-4o'),
messages,
onChunk: ({ chunk }) => {
// 实时注入额外数据
if (chunk.type === 'tool-result') {
dataStream.writeData({
type: 'tool-status',
toolName: chunk.toolName,
status: 'completed',
});
}
},
});
// 将 AI 流合并到 DataStream
result.mergeIntoDataStream(dataStream);
},
});
}
1.4 useChat 状态管理原理
useChat 的内部状态管理比表面看起来复杂得多,理解这些机制有助于排查常见问题。
function useChat(options: UseChatOptions) {
const [messages, setMessages] = useState<Message[]>([]);
const [input, setInput] = useState('');
const [isLoading, setIsLoading] = useState(false);
const [error, setError] = useState<Error | undefined>();
// 关键:使用 ref 存储最新的 messages,避免闭包陷阱
const messagesRef = useRef(messages);
messagesRef.current = messages;
// 流式数据累积器
const abortControllerRef = useRef<AbortController | null>(null);
const handleSubmit = useCallback(async (e?: FormEvent) => {
e?.preventDefault();
if (!input.trim()) return;
const userMessage: Message = {
id: generateId(),
role: 'user',
content: input,
createdAt: new Date(),
};
// 乐观更新:立即显示用户消息 + 空的 assistant 消息
const assistantMessage: Message = {
id: generateId(),
role: 'assistant',
content: '',
createdAt: new Date(),
};
setMessages([...messagesRef.current, userMessage, assistantMessage]);
setInput('');
setIsLoading(true);
const abortController = new AbortController();
abortControllerRef.current = abortController;
try {
const response = await fetch(options.api ?? '/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
messages: [...messagesRef.current, userMessage],
}),
signal: abortController.signal,
});
// 解析 DataStream 协议
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let accumulatedContent = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split('\n').filter(Boolean);
for (const line of lines) {
const typeCode = line[0];
const data = JSON.parse(line.slice(2));
switch (typeCode) {
case '0': // 文本 delta
accumulatedContent += data;
// 更新最后一条 assistant 消息的 content
setMessages(prev => {
const updated = [...prev];
updated[updated.length - 1] = {
...updated[updated.length - 1],
content: accumulatedContent,
};
return updated;
});
break;
case '9': // 工具调用
// 追加到 parts 数组
break;
case 'e': // 错误
setError(new Error(data.message));
break;
case 'd': // 完成
options.onFinish?.(updated[updated.length - 1]);
break;
}
}
}
} catch (err) {
if (err instanceof DOMException && err.name === 'AbortError') return;
setError(err as Error);
options.onError?.(err as Error);
} finally {
setIsLoading(false);
}
}, [input, options]);
const stop = useCallback(() => {
abortControllerRef.current?.abort();
setIsLoading(false);
}, []);
const reload = useCallback(async () => {
// 移除最后一条 assistant 消息,重新发送
const lastUserIndex = messagesRef.current.findLastIndex(m => m.role === 'user');
if (lastUserIndex === -1) return;
setMessages(messagesRef.current.slice(0, lastUserIndex + 1));
// 重新触发请求...
}, []);
return {
messages, input, isLoading, error,
handleInputChange: (e: ChangeEvent<HTMLInputElement>) => setInput(e.target.value),
handleSubmit, stop, reload,
setMessages, // 允许外部直接修改 messages
append, // 追加消息并触发请求
};
}
1.5 核心 API 使用示例
import { streamText, tool } from 'ai';
import { openai } from '@ai-sdk/openai';
import { anthropic } from '@ai-sdk/anthropic';
import { z } from 'zod';
export async function POST(req: Request) {
const { messages, model } = await req.json();
// 多模型支持,只需切换 provider
const provider = model === 'claude'
? anthropic('claude-sonnet-4-20250514')
: openai('gpt-4o');
const result = streamText({
model: provider,
messages,
// 内置工具系统
tools: {
getWeather: tool({
description: '获取指定城市的天气信息',
parameters: z.object({
city: z.string().describe('城市名称'),
}),
execute: async ({ city }) => {
const weather = await fetchWeather(city);
return weather;
},
}),
},
// 最大工具调用步骤(Agent 循环)
maxSteps: 5,
});
return result.toDataStreamResponse();
}
'use client';
import { useChat } from '@ai-sdk/react';
export function Chat() {
const {
messages,
input,
handleInputChange,
handleSubmit,
isLoading,
error,
reload, // 重新生成
stop, // 停止生成
} = useChat({
api: '/api/chat',
// 流式回调
onFinish: (message) => {
console.log('生成完成:', message);
},
onError: (error) => {
console.error('出错:', error);
},
});
return (
<div>
{messages.map((m) => (
<div key={m.id}>
<strong>{m.role}:</strong>
{m.content}
{/* 工具调用结果自动包含在 parts 中 */}
{m.parts?.map((part, i) => {
if (part.type === 'tool-invocation') {
return (
<div key={i}>
调用工具: {part.toolInvocation.toolName}
{part.toolInvocation.state === 'result' && (
<pre>{JSON.stringify(part.toolInvocation.result)}</pre>
)}
</div>
);
}
return null;
})}
</div>
))}
<form onSubmit={handleSubmit}>
<input value={input} onChange={handleInputChange} />
<button type="submit" disabled={isLoading}>发送</button>
{isLoading && <button onClick={stop}>停止</button>}
</form>
</div>
);
}
1.6 结构化输出(Generative UI)
import { streamObject } from 'ai';
import { openai } from '@ai-sdk/openai';
import { z } from 'zod';
const recipeSchema = z.object({
name: z.string(),
ingredients: z.array(z.object({
name: z.string(),
amount: z.string(),
})),
steps: z.array(z.string()),
cookingTime: z.number().describe('烹饪时间(分钟)'),
});
export async function POST(req: Request) {
const { prompt } = await req.json();
const result = streamObject({
model: openai('gpt-4o'),
schema: recipeSchema,
prompt: `Extract recipe from: ${prompt}`,
});
return result.toTextStreamResponse();
}
'use client';
import { useObject } from '@ai-sdk/react';
import { recipeSchema } from '@/lib/schemas';
export function RecipeExtractor() {
const { object, submit, isLoading } = useObject({
api: '/api/extract',
schema: recipeSchema,
});
return (
<div>
<button onClick={() => submit('番茄炒蛋的做法')}>提取食谱</button>
{/* object 在流式过程中逐步填充 */}
{object && (
<div>
<h3>{object.name ?? '加载中...'}</h3>
<ul>
{object.ingredients?.map((ing, i) => (
<li key={i}>{ing?.name}: {ing?.amount}</li>
))}
</ul>
</div>
)}
</div>
);
}
二、Provider 抽象层深入
Provider 模式是 Vercel AI SDK 最关键的设计——它让上层代码完全不感知底层 LLM 的差异。
2.1 Provider 接口规范
// 每个 Provider 需要实现的核心接口
interface LanguageModelV1 {
// 元信息
readonly specificationVersion: 'v1';
readonly provider: string; // "openai" | "anthropic" | ...
readonly modelId: string; // "gpt-4o" | "claude-sonnet-4-20250514" | ...
// 核心方法:非流式生成
doGenerate(options: LanguageModelV1CallOptions): Promise<{
text?: string;
toolCalls?: ToolCallPart[];
finishReason: FinishReason;
usage: TokenUsage;
rawCall: { rawPrompt: unknown; rawSettings: Record<string, unknown> };
}>;
// 核心方法:流式生成
doStream(options: LanguageModelV1CallOptions): Promise<{
stream: ReadableStream<LanguageModelV1StreamPart>;
rawCall: { rawPrompt: unknown; rawSettings: Record<string, unknown> };
}>;
}
// 统一的调用选项
interface LanguageModelV1CallOptions {
inputFormat: 'prompt' | 'messages';
mode: { type: 'regular'; tools?: ToolDefinition[] }
| { type: 'object-json'; schema: JSONSchema }
| { type: 'object-tool'; tool: ToolDefinition };
prompt: LanguageModelV1Prompt;
maxTokens?: number;
temperature?: number;
topP?: number;
frequencyPenalty?: number;
presencePenalty?: number;
seed?: number;
abortSignal?: AbortSignal;
headers?: Record<string, string>;
}
// 流式事件类型
type LanguageModelV1StreamPart =
| { type: 'text-delta'; textDelta: string }
| { type: 'tool-call'; toolCallId: string; toolName: string; args: unknown }
| { type: 'tool-call-delta'; toolCallId: string; argsTextDelta: string }
| { type: 'finish'; finishReason: FinishReason; usage: TokenUsage }
| { type: 'error'; error: unknown };
2.2 自定义 Provider 实现
当需要接入私有部署的 LLM 或非官方支持的 Provider 时,可以实现自定义 Provider:
import type {
LanguageModelV1,
LanguageModelV1CallOptions,
LanguageModelV1StreamPart,
} from 'ai';
// 自定义 Provider:接入内部部署的 LLM 服务
export function createInternalLLM(modelId: string): LanguageModelV1 {
return {
specificationVersion: 'v1',
provider: 'internal',
modelId,
async doGenerate(options) {
const body = convertToInternalFormat(options);
const response = await fetch('https://llm.internal.company.com/v1/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.INTERNAL_LLM_KEY}`,
},
body: JSON.stringify(body),
});
const result = await response.json();
return {
text: result.output,
finishReason: mapFinishReason(result.stop_reason),
usage: {
promptTokens: result.usage.input_tokens,
completionTokens: result.usage.output_tokens,
},
rawCall: { rawPrompt: body, rawSettings: {} },
};
},
async doStream(options) {
const body = convertToInternalFormat(options);
body.stream = true;
const response = await fetch('https://llm.internal.company.com/v1/chat', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${process.env.INTERNAL_LLM_KEY}`,
},
body: JSON.stringify(body),
signal: options.abortSignal,
});
// 将内部 SSE 格式转换为 Vercel AI SDK 的统一流格式
const stream = new ReadableStream<LanguageModelV1StreamPart>({
async start(controller) {
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let totalUsage = { promptTokens: 0, completionTokens: 0 };
while (true) {
const { done, value } = await reader.read();
if (done) break;
const lines = decoder.decode(value).split('\n');
for (const line of lines) {
if (!line.startsWith('data: ') || line === 'data: [DONE]') continue;
const data = JSON.parse(line.slice(6));
if (data.type === 'content') {
controller.enqueue({
type: 'text-delta',
textDelta: data.text,
});
} else if (data.type === 'done') {
totalUsage = data.usage;
}
}
}
controller.enqueue({
type: 'finish',
finishReason: 'stop',
usage: totalUsage,
});
controller.close();
},
});
return {
stream,
rawCall: { rawPrompt: body, rawSettings: {} },
};
},
};
}
// 使用方式与官方 Provider 完全一致
import { streamText } from 'ai';
const result = streamText({
model: createInternalLLM('internal-gpt-7b'),
messages: [{ role: 'user', content: '你好' }],
});
2.3 Provider Registry 模式
import { experimental_createProviderRegistry as createProviderRegistry } from 'ai';
import { openai } from '@ai-sdk/openai';
import { anthropic } from '@ai-sdk/anthropic';
import { google } from '@ai-sdk/google';
import { createInternalLLM } from './custom-provider';
// 创建 Provider 注册表:统一管理所有可用模型
export const registry = createProviderRegistry({
openai,
anthropic,
google,
// 自定义 Provider 也可以注册
internal: {
languageModel: (modelId: string) => createInternalLLM(modelId),
},
});
// 通过字符串 ID 动态获取模型
// 格式:"provider:modelId"
const model = registry.languageModel('openai:gpt-4o');
const model2 = registry.languageModel('anthropic:claude-sonnet-4-20250514');
const model3 = registry.languageModel('internal:gpt-7b');
// 在路由中动态选择模型
export async function POST(req: Request) {
const { messages, modelId } = await req.json();
const result = streamText({
model: registry.languageModel(modelId), // "openai:gpt-4o"
messages,
});
return result.toDataStreamResponse();
}
三、Vercel AI SDK Middleware
Middleware 是 Vercel AI SDK 的高级特性,允许在 LLM 调用前后插入自定义逻辑,类似 Express 中间件。
import {
type Experimental_LanguageModelV1Middleware as LanguageModelMiddleware,
wrapLanguageModel,
} from 'ai';
// 中间件 1:请求/响应日志
const loggingMiddleware: LanguageModelMiddleware = {
transformParams: async ({ params }) => {
console.log('[AI Request]', {
model: params.model,
messageCount: params.prompt.length,
timestamp: new Date().toISOString(),
});
return params;
},
wrapGenerate: async ({ doGenerate, params }) => {
const startTime = Date.now();
const result = await doGenerate();
const duration = Date.now() - startTime;
console.log('[AI Response]', {
finishReason: result.finishReason,
usage: result.usage,
duration: `${duration}ms`,
});
// 上报到监控系统
reportMetrics({
model: params.model,
promptTokens: result.usage.promptTokens,
completionTokens: result.usage.completionTokens,
latency: duration,
});
return result;
},
};
// 中间件 2:语义缓存
const cachingMiddleware: LanguageModelMiddleware = {
wrapGenerate: async ({ doGenerate, params }) => {
const cacheKey = generateCacheKey(params.prompt);
const cached = await redis.get(cacheKey);
if (cached) {
console.log('[Cache HIT]', cacheKey);
return JSON.parse(cached);
}
const result = await doGenerate();
// 只缓存成功的、无工具调用的结果
if (result.finishReason === 'stop' && !result.toolCalls?.length) {
await redis.set(cacheKey, JSON.stringify(result), 'EX', 3600);
}
return result;
},
};
// 中间件 3:内容安全 Guardrail
const guardrailMiddleware: LanguageModelMiddleware = {
wrapGenerate: async ({ doGenerate }) => {
const result = await doGenerate();
// 检查输出是否包含敏感内容
if (result.text) {
const isSafe = await contentSafetyCheck(result.text);
if (!isSafe) {
return {
...result,
text: '抱歉,该回答包含不适当的内容,已被过滤。',
};
}
}
return result;
},
wrapStream: async ({ doStream }) => {
const { stream, ...rest } = await doStream();
let accumulatedText = '';
const filteredStream = stream.pipeThrough(
new TransformStream({
transform(chunk, controller) {
if (chunk.type === 'text-delta') {
accumulatedText += chunk.textDelta;
// 实时检测敏感词
if (containsSensitiveWord(accumulatedText)) {
controller.enqueue({
type: 'text-delta',
textDelta: '[内容已过滤]',
});
return;
}
}
controller.enqueue(chunk);
},
})
);
return { stream: filteredStream, ...rest };
},
};
// 组合中间件并包装模型
export function createGuardedModel(baseModel: LanguageModelV1) {
return wrapLanguageModel({
model: baseModel,
middleware: loggingMiddleware, // 先记录日志
// 可以链式叠加多个 middleware
});
}
// 使用
import { openai } from '@ai-sdk/openai';
import { streamText } from 'ai';
const guardedModel = createGuardedModel(openai('gpt-4o'));
const result = streamText({
model: guardedModel,
messages,
});
四、LangChain.js 深入
LangChain.js 是从 Python LangChain 生态移植的 JavaScript 版本,适合构建复杂的 AI Chain 和 Agent。其核心概念是 LCEL(LangChain Expression Language) 和 Runnable 协议。
4.1 LCEL 管道语法
LCEL 是 LangChain 中编排组件的声明式语法,所有组件都实现 Runnable 接口:
import { ChatOpenAI } from '@langchain/openai';
import { StringOutputParser } from '@langchain/core/output_parsers';
import { ChatPromptTemplate } from '@langchain/core/prompts';
import {
RunnableSequence,
RunnablePassthrough,
RunnableBranch,
RunnableParallel,
RunnableLambda,
} from '@langchain/core/runnables';
const model = new ChatOpenAI({ modelName: 'gpt-4o' });
const parser = new StringOutputParser();
// ---- 1. 基础 pipe 语法 ----
const prompt = ChatPromptTemplate.fromMessages([
['system', '你是一个{expertise}专家,用简洁的中文回答。'],
['human', '{question}'],
]);
// pipe() 是 LCEL 的核心:将组件串联为管道
const basicChain = prompt.pipe(model).pipe(parser);
const result = await basicChain.invoke({
expertise: '前端性能优化',
question: '如何减少首屏加载时间?',
});
// ---- 2. RunnablePassthrough:透传输入 ----
// RunnablePassthrough 将输入原样传递,常用于保留原始数据
const chainWithPassthrough = RunnableSequence.from([
{
// 并行执行:context 走检索,question 原样透传
context: retriever.pipe(formatDocs),
question: new RunnablePassthrough(),
},
prompt,
model,
parser,
]);
// 调用时只需传 question,context 会自动检索
await chainWithPassthrough.invoke('什么是虚拟 DOM?');
// ---- 3. RunnableParallel:并行执行 ----
const parallelChain = RunnableParallel.from({
summary: summaryChain,
translation: translationChain,
keywords: keywordsChain,
});
// 三条 Chain 并行执行,结果合并为对象
const results = await parallelChain.invoke({ text: '很长的文章...' });
// { summary: "...", translation: "...", keywords: ["..."] }
4.2 RunnableBranch 条件路由
import { RunnableBranch, RunnableLambda } from '@langchain/core/runnables';
import { ChatPromptTemplate } from '@langchain/core/prompts';
// RunnableBranch:根据条件选择不同的处理链
const routingChain = RunnableBranch.from([
// [条件函数, 对应的 Chain]
[
(input: { topic: string }) => input.topic === 'code',
ChatPromptTemplate.fromMessages([
['system', '你是一个代码专家,用代码示例回答。'],
['human', '{question}'],
]).pipe(model).pipe(parser),
],
[
(input: { topic: string }) => input.topic === 'math',
ChatPromptTemplate.fromMessages([
['system', '你是一个数学教授,用公式和推导回答。'],
['human', '{question}'],
]).pipe(model).pipe(parser),
],
// 默认分支(最后一个参数,不需要条件)
ChatPromptTemplate.fromMessages([
['system', '你是一个通用助手。'],
['human', '{question}'],
]).pipe(model).pipe(parser),
]);
// 自动路由到代码专家
await routingChain.invoke({
topic: 'code',
question: '如何实现防抖函数?',
});
// 使用 RunnableLambda 实现更复杂的路由逻辑
const smartRouter = new RunnableLambda({
func: async (input: { question: string }) => {
// 先用 LLM 判断问题类别
const category = await classificationChain.invoke(input);
// 根据类别选择不同的专家链
switch (category) {
case 'frontend': return frontendExpertChain.invoke(input);
case 'backend': return backendExpertChain.invoke(input);
default: return generalChain.invoke(input);
}
},
});
4.3 Memory 记忆类型
LangChain.js 提供多种 Memory 类型,适用于不同场景:
import { BufferMemory } from 'langchain/memory';
import { ConversationSummaryMemory } from 'langchain/memory';
import { VectorStoreRetrieverMemory } from 'langchain/memory';
import { ChatOpenAI, OpenAIEmbeddings } from '@langchain/openai';
import { MemoryVectorStore } from 'langchain/vectorstores/memory';
// 1. BufferMemory:完整保留所有对话历史
const bufferMemory = new BufferMemory({
memoryKey: 'chat_history',
returnMessages: true, // 返回 Message 对象而非字符串
inputKey: 'input',
outputKey: 'output',
});
await bufferMemory.saveContext(
{ input: '我叫张三' },
{ output: '你好,张三!' }
);
// 优点:信息无损 | 缺点:长对话 Token 消耗大
// 2. ConversationSummaryMemory:用 LLM 压缩历史为摘要
const summaryMemory = new ConversationSummaryMemory({
llm: new ChatOpenAI({ modelName: 'gpt-4o-mini' }), // 用小模型做摘要
memoryKey: 'chat_history',
returnMessages: true,
});
// 多轮对话后自动生成摘要:"用户名为张三,讨论了前端性能优化..."
// 优点:Token 消耗恒定 | 缺点:细节可能丢失
// 3. VectorStoreMemory:基于向量相似度检索相关记忆
const vectorStore = new MemoryVectorStore(new OpenAIEmbeddings());
const vectorMemory = new VectorStoreRetrieverMemory({
vectorStoreRetriever: vectorStore.asRetriever(3), // 检索 top 3 相关记忆
memoryKey: 'relevant_history',
});
// 只检索与当前问题相关的历史对话
// 优点:长期记忆、按需检索 | 缺点:需要向量存储基础设施
| Memory 类型 | Token 消耗 | 信息完整性 | 适用场景 |
|---|---|---|---|
| BufferMemory | 随对话线性增长 | 完全保留 | 短对话(< 20 轮) |
| BufferWindowMemory | 固定窗口(k 轮) | 只保留最近 k 轮 | 中等对话,关注近期上下文 |
| ConversationSummaryMemory | 近似恒定 | 摘要,细节可能丢失 | 长对话(> 50 轮) |
| VectorStoreMemory | 按检索数量 | 语义相关的完整记录 | 超长期记忆、知识助手 |
Vercel AI SDK 的 useChat 采用最简单的方式——将完整 messages 数组发送给后端,相当于 BufferMemory。如果需要更智能的记忆管理(摘要、向量检索),可以在后端结合 LangChain.js 的 Memory 组件处理后再传给 LLM。
4.4 RAG Chain
import { ChatOpenAI, OpenAIEmbeddings } from '@langchain/openai';
import { MemoryVectorStore } from 'langchain/vectorstores/memory';
import { RecursiveCharacterTextSplitter } from '@langchain/textsplitters';
import { createRetrievalChain } from 'langchain/chains/retrieval';
import { createStuffDocumentsChain } from 'langchain/chains/combine_documents';
import { ChatPromptTemplate } from '@langchain/core/prompts';
// 1. 文档分割
const splitter = new RecursiveCharacterTextSplitter({
chunkSize: 1000,
chunkOverlap: 200,
});
const docs = await splitter.createDocuments([documentContent]);
// 2. 向量存储
const vectorStore = await MemoryVectorStore.fromDocuments(
docs,
new OpenAIEmbeddings()
);
// 3. 构建 RAG Chain
const prompt = ChatPromptTemplate.fromMessages([
['system', '基于以下上下文回答问题:\n\n{context}'],
['human', '{input}'],
]);
const combineDocsChain = await createStuffDocumentsChain({
llm: new ChatOpenAI({ modelName: 'gpt-4o' }),
prompt,
});
const retrievalChain = await createRetrievalChain({
retriever: vectorStore.asRetriever({ k: 3 }),
combineDocsChain,
});
const result = await retrievalChain.invoke({
input: '什么是虚拟 DOM?',
});
更多 RAG 实现细节请参考 RAG 检索增强生成。
五、OpenAI SDK 深入
当需要最大控制力或使用最新 API 特性时,直接使用 OpenAI 官方 SDK。
5.1 结构化输出与流式辅助方法
import OpenAI from 'openai';
import { zodResponseFormat } from 'openai/helpers/zod';
import { z } from 'zod';
const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
// Structured Outputs:使用 response_format 确保 JSON 格式
const ProductReview = z.object({
sentiment: z.enum(['positive', 'negative', 'neutral']),
score: z.number().min(0).max(10),
pros: z.array(z.string()),
cons: z.array(z.string()),
summary: z.string(),
});
const completion = await client.chat.completions.create({
model: 'gpt-4o',
messages: [
{ role: 'system', content: '分析用户评论的情感。' },
{ role: 'user', content: '这个产品质量很好,但价格太贵了。' },
],
response_format: zodResponseFormat(ProductReview, 'review_analysis'),
});
// 返回的 content 是严格符合 Schema 的 JSON
const review = JSON.parse(completion.choices[0].message.content!);
// { sentiment: "neutral", score: 6, pros: ["质量好"], cons: ["价格贵"], summary: "..." }
两者都可以让 LLM 输出结构化 JSON,但用途不同:
- response_format:让 LLM 的回答内容是 JSON 格式(Structured Outputs)
- Function Calling:让 LLM 决定调用哪个工具并生成参数 JSON
简单来说:response_format 控制输出格式,Function Calling 控制行为决策。详见 Function Calling 与 AI Agent。
5.2 流式辅助方法
import OpenAI from 'openai';
const client = new OpenAI();
// 方式 1:.stream() 辅助方法(推荐)
const stream = client.chat.completions.stream({
model: 'gpt-4o',
messages: [{ role: 'user', content: '解释 React Fiber' }],
});
// 事件驱动 API
stream.on('content', (delta, snapshot) => {
// delta: 增量文本
// snapshot: 完整累积文本
process.stdout.write(delta);
});
stream.on('message', (message) => {
// 完整的 ChatCompletionMessage
console.log('完成:', message);
});
stream.on('totalUsage', (usage) => {
console.log('Token 使用:', usage);
});
// 也可以 await 最终结果
const finalMessage = await stream.finalChatCompletion();
// 方式 2:原始 stream(需要手动解析)
const rawStream = await client.chat.completions.create({
model: 'gpt-4o',
messages: [{ role: 'user', content: '你好' }],
stream: true,
stream_options: { include_usage: true }, // 流式也返回 usage
});
for await (const chunk of rawStream) {
const content = chunk.choices[0]?.delta?.content ?? '';
process.stdout.write(content);
// 最后一个 chunk 包含 usage 信息
if (chunk.usage) {
console.log('prompt_tokens:', chunk.usage.prompt_tokens);
console.log('completion_tokens:', chunk.usage.completion_tokens);
}
}
六、Anthropic SDK 深入
Anthropic SDK 有一些独特特性,特别是 Extended Thinking 和 Prompt Caching。
6.1 Extended Thinking(扩展思维)
Extended Thinking 让 Claude 在回答前进行深度推理,思维过程会以 thinking block 形式返回:
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic();
// 启用扩展思维
const response = await client.messages.create({
model: 'claude-sonnet-4-20250514',
max_tokens: 16000,
thinking: {
type: 'enabled',
budget_tokens: 10000, // 思维过程的 Token 预算
},
messages: [{
role: 'user',
content: '设计一个支持百万并发的 WebSocket 消息推送系统的前端架构',
}],
});
// 响应中包含 thinking 和 text 两种 content block
for (const block of response.content) {
if (block.type === 'thinking') {
// 思维过程(内部推理链,可用于调试)
console.log('[思考过程]:', block.thinking);
} else if (block.type === 'text') {
// 最终回答
console.log('[回答]:', block.text);
}
}
// 流式 + 扩展思维
const stream = client.messages.stream({
model: 'claude-sonnet-4-20250514',
max_tokens: 16000,
thinking: { type: 'enabled', budget_tokens: 10000 },
messages: [{ role: 'user', content: '比较 React 和 Vue 的 Diff 算法差异' }],
});
stream.on('thinking', (thinking) => {
// 思维过程逐步流出
process.stdout.write(`[思考] ${thinking}`);
});
stream.on('text', (text) => {
// 最终回答逐步流出
process.stdout.write(text);
});
await stream.finalMessage();
budget_tokens会消耗额外 Token(计费的),但不会出现在最终回答中- 思维内容可能包含不完整的中间推理,不建议直接展示给用户
- 适合复杂推理(数学、代码设计、系统架构),简单对话不需要开启
max_tokens必须大于budget_tokens,否则留给回答的 Token 不足
6.2 Prompt Caching
Prompt Caching 让 Anthropic 缓存长 system prompt 和上下文,大幅减少重复请求的 Token 消耗和延迟:
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic();
// 使用 cache_control 标记需要缓存的内容
const response = await client.messages.create({
model: 'claude-sonnet-4-20250514',
max_tokens: 1024,
system: [
{
type: 'text',
text: '你是一个前端面试官,基于以下知识库内容进行提问和评估...',
},
{
type: 'text',
text: longKnowledgeBase, // 很长的知识库文本(> 1024 tokens)
cache_control: { type: 'ephemeral' }, // 标记为可缓存
},
],
messages: [{ role: 'user', content: '请考察我关于 React Hooks 的知识' }],
});
// 查看缓存效果
console.log('缓存命中 tokens:', response.usage.cache_read_input_tokens);
console.log('缓存创建 tokens:', response.usage.cache_creation_input_tokens);
// 首次请求:cache_creation > 0, cache_read = 0
// 后续请求:cache_creation = 0, cache_read > 0(费用降低 90%)
| 缓存场景 | 输入 Token 费用 | 延迟改善 |
|---|---|---|
| 无缓存 | 100%(基准) | - |
| 首次缓存写入 | 125%(额外 25%) | 无 |
| 缓存命中 | 10%(节省 90%) | 减少 ~85% |
6.3 Batch API
Batch API 允许批量异步发送请求,费用减半(50% 折扣),适合离线批处理场景:
import Anthropic from '@anthropic-ai/sdk';
const client = new Anthropic();
// 创建批量请求
const batch = await client.messages.batches.create({
requests: [
{
custom_id: 'review-1',
params: {
model: 'claude-sonnet-4-20250514',
max_tokens: 1024,
messages: [{ role: 'user', content: '评审这段代码...' }],
},
},
{
custom_id: 'review-2',
params: {
model: 'claude-sonnet-4-20250514',
max_tokens: 1024,
messages: [{ role: 'user', content: '评审另一段代码...' }],
},
},
// 最多 10000 个请求
],
});
console.log('Batch ID:', batch.id);
// 批处理在 24 小时内完成
// 轮询或 Webhook 获取结果
const result = await client.messages.batches.retrieve(batch.id);
if (result.processing_status === 'ended') {
// 获取结果
for await (const entry of client.messages.batches.results(batch.id)) {
console.log(`[${entry.custom_id}]`, entry.result);
}
}
6.4 Tool Use 差异
Anthropic 的 Tool Use 与 OpenAI 的 Function Calling 在 API 格式上有差异:
// ---- OpenAI Function Calling ----
const openaiRequest = {
model: 'gpt-4o',
messages: [{ role: 'user', content: '北京今天天气怎么样?' }],
tools: [{
type: 'function',
function: {
name: 'getWeather',
description: '获取天气信息',
parameters: { // JSON Schema 格式
type: 'object',
properties: {
city: { type: 'string', description: '城市名' },
},
required: ['city'],
},
},
}],
};
// OpenAI 工具调用结果在 message.tool_calls 数组中
// response.choices[0].message.tool_calls[0]
// { id: "call_xxx", type: "function", function: { name: "getWeather", arguments: '{"city":"北京"}' } }
// 注意:arguments 是 JSON 字符串,需要 JSON.parse
// ---- Anthropic Tool Use ----
const anthropicRequest = {
model: 'claude-sonnet-4-20250514',
max_tokens: 1024,
messages: [{ role: 'user', content: '北京今天天气怎么样?' }],
tools: [{
name: 'getWeather',
description: '获取天气信息',
input_schema: { // 同样是 JSON Schema,但字段名不同
type: 'object',
properties: {
city: { type: 'string', description: '城市名' },
},
required: ['city'],
},
}],
};
// Anthropic 工具调用在 content blocks 中
// response.content[0]
// { type: "tool_use", id: "toolu_xxx", name: "getWeather", input: { city: "北京" } }
// 注意:input 已经是解析好的对象,不需要 JSON.parse
| 差异点 | OpenAI | Anthropic |
|---|---|---|
| 工具定义字段 | tools[].function.parameters | tools[].input_schema |
| 调用结果位置 | message.tool_calls[] | content[](type: tool_use) |
| 参数格式 | JSON 字符串(需 parse) | 已解析对象 |
| 结果返回 | role: "tool" 消息 | role: "user" + tool_result content block |
| 并行调用 | 支持(多个 tool_calls) | 支持(多个 tool_use blocks) |
| 停止原因 | finish_reason: "tool_calls" | stop_reason: "tool_use" |
这就是 Vercel AI SDK Provider 抽象层的价值所在——上层使用统一的 tool() 定义和 Zod schema,Provider 内部自动转换为 OpenAI 或 Anthropic 的格式。你不需要关心这些差异。
七、错误处理与重试策略
AI 应用的错误处理比传统 API 更复杂,因为 LLM 请求耗时长、费用高、错误种类多。
7.1 常见错误类型
7.2 跨 SDK 统一错误处理
import OpenAI from 'openai';
import Anthropic from '@anthropic-ai/sdk';
// 统一错误类型
interface AIError {
type: 'rate_limit' | 'auth' | 'server' | 'token_limit' | 'content_filter' | 'network' | 'unknown';
message: string;
retryable: boolean;
retryAfterMs?: number;
originalError: unknown;
}
// 将不同 SDK 的错误统一化
function normalizeError(error: unknown): AIError {
// OpenAI SDK 错误
if (error instanceof OpenAI.APIError) {
if (error.status === 429) {
const retryAfter = error.headers?.['retry-after'];
return {
type: 'rate_limit',
message: '请求频率超限,请稍后重试',
retryable: true,
retryAfterMs: retryAfter ? parseInt(retryAfter) * 1000 : 60000,
originalError: error,
};
}
if (error.status === 401 || error.status === 403) {
return {
type: 'auth',
message: 'API 认证失败',
retryable: false,
originalError: error,
};
}
if (error.status === 500 || error.status === 503) {
return {
type: 'server',
message: '服务暂时不可用',
retryable: true,
retryAfterMs: 5000,
originalError: error,
};
}
if (error.code === 'context_length_exceeded') {
return {
type: 'token_limit',
message: '输入内容过长,超出模型上下文限制',
retryable: false,
originalError: error,
};
}
}
// Anthropic SDK 错误
if (error instanceof Anthropic.APIError) {
if (error.status === 429) {
return {
type: 'rate_limit',
message: '请求频率超限',
retryable: true,
retryAfterMs: 60000,
originalError: error,
};
}
if (error.status === 529) {
// Anthropic 特有:API overloaded
return {
type: 'server',
message: 'Anthropic API 过载',
retryable: true,
retryAfterMs: 30000,
originalError: error,
};
}
}
// 网络错误
if (error instanceof TypeError && (error.message.includes('fetch') || error.message.includes('network'))) {
return {
type: 'network',
message: '网络连接失败',
retryable: true,
retryAfterMs: 3000,
originalError: error,
};
}
return {
type: 'unknown',
message: error instanceof Error ? error.message : '未知错误',
retryable: false,
originalError: error,
};
}
7.3 指数退避重试
interface RetryOptions {
maxRetries: number;
initialDelayMs: number;
maxDelayMs: number;
backoffMultiplier: number;
jitterMs: number; // 随机抖动,避免请求风暴
}
const DEFAULT_RETRY_OPTIONS: RetryOptions = {
maxRetries: 3,
initialDelayMs: 1000,
maxDelayMs: 60000,
backoffMultiplier: 2,
jitterMs: 500,
};
async function withRetry<T>(
fn: () => Promise<T>,
options: Partial<RetryOptions> = {},
): Promise<T> {
const config = { ...DEFAULT_RETRY_OPTIONS, ...options };
let lastError: AIError | undefined;
for (let attempt = 0; attempt <= config.maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
lastError = normalizeError(error);
// 不可重试的错误直接抛出
if (!lastError.retryable) {
throw lastError;
}
// 最后一次重试也失败了
if (attempt === config.maxRetries) {
throw lastError;
}
// 计算退避时间
const baseDelay = lastError.retryAfterMs ??
Math.min(
config.initialDelayMs * Math.pow(config.backoffMultiplier, attempt),
config.maxDelayMs,
);
const jitter = Math.random() * config.jitterMs;
const delay = baseDelay + jitter;
console.warn(
`[AI Retry] Attempt ${attempt + 1}/${config.maxRetries}, ` +
`type: ${lastError.type}, retrying in ${Math.round(delay)}ms`
);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
throw lastError;
}
// 使用示例
const result = await withRetry(
() => streamText({
model: openai('gpt-4o'),
messages,
}),
{ maxRetries: 3, initialDelayMs: 2000 },
);
7.4 Provider Fallback(多 Provider 容灾)
import { streamText, type LanguageModelV1 } from 'ai';
import { openai } from '@ai-sdk/openai';
import { anthropic } from '@ai-sdk/anthropic';
import { google } from '@ai-sdk/google';
interface FallbackConfig {
models: LanguageModelV1[];
maxRetriesPerModel: number;
}
// 多 Provider 容灾:依次尝试,直到成功
async function streamWithFallback(
config: FallbackConfig,
options: Omit<Parameters<typeof streamText>[0], 'model'>,
) {
const errors: Array<{ model: string; error: AIError }> = [];
for (const model of config.models) {
for (let attempt = 0; attempt < config.maxRetriesPerModel; attempt++) {
try {
console.log(`[Fallback] 尝试: ${model.provider}:${model.modelId} (attempt ${attempt + 1})`);
const result = streamText({
...options,
model,
});
// 验证流是否正常启动(读取第一个 chunk)
// 这一步很重要:有些错误只在开始读取流时才暴露
return result;
} catch (error) {
const aiError = normalizeError(error);
errors.push({ model: `${model.provider}:${model.modelId}`, error: aiError });
// 认证错误不重试,直接切换 Provider
if (aiError.type === 'auth') break;
// 限流等待后重试
if (aiError.retryAfterMs) {
await new Promise(r => setTimeout(r, aiError.retryAfterMs));
}
}
}
}
// 所有 Provider 都失败
throw new Error(
`所有 Provider 均失败:\n${errors.map(e => ` ${e.model}: ${e.error.message}`).join('\n')}`
);
}
// 使用示例
const result = await streamWithFallback(
{
models: [
anthropic('claude-sonnet-4-20250514'), // 首选
openai('gpt-4o'), // 备选 1
google('gemini-1.5-pro'), // 备选 2
],
maxRetriesPerModel: 2,
},
{
messages,
tools,
maxSteps: 5,
},
);
八、Token 使用追踪
Token 是 AI 应用的核心成本,精确追踪和控制 Token 使用是生产环境的基本要求。
8.1 各 SDK 的 Token 追踪方式
// ---- 1. Vercel AI SDK ----
import { streamText, generateText } from 'ai';
import { openai } from '@ai-sdk/openai';
// 非流式:直接在结果中获取
const genResult = await generateText({
model: openai('gpt-4o'),
prompt: '你好',
});
console.log('Vercel AI SDK usage:', genResult.usage);
// { promptTokens: 10, completionTokens: 5, totalTokens: 15 }
// 流式:通过 Promise 或回调获取
const streamResult = streamText({
model: openai('gpt-4o'),
messages,
onFinish: ({ usage }) => {
// 流结束后回调
console.log('Token 使用:', usage);
},
});
// 或者 await usage Promise(流结束后 resolve)
const usage = await streamResult.usage;
// ---- 2. OpenAI SDK ----
import OpenAI from 'openai';
const oaiClient = new OpenAI();
// 非流式
const oaiResult = await oaiClient.chat.completions.create({
model: 'gpt-4o',
messages: [{ role: 'user', content: '你好' }],
});
console.log('OpenAI usage:', oaiResult.usage);
// { prompt_tokens: 10, completion_tokens: 5, total_tokens: 15 }
// 流式:需要显式启用 stream_options
const oaiStream = await oaiClient.chat.completions.create({
model: 'gpt-4o',
messages: [{ role: 'user', content: '你好' }],
stream: true,
stream_options: { include_usage: true }, // 必须显式开启
});
for await (const chunk of oaiStream) {
if (chunk.usage) {
// 最后一个 chunk 包含 usage
console.log('流式 usage:', chunk.usage);
}
}
// ---- 3. Anthropic SDK ----
import Anthropic from '@anthropic-ai/sdk';
const antClient = new Anthropic();
const antResult = await antClient.messages.create({
model: 'claude-sonnet-4-20250514',
max_tokens: 1024,
messages: [{ role: 'user', content: '你好' }],
});
console.log('Anthropic usage:', antResult.usage);
// { input_tokens: 10, output_tokens: 5, cache_creation_input_tokens: 0, cache_read_input_tokens: 0 }
// 注意:Anthropic 还额外返回缓存相关的 Token 信息
8.2 Token 使用监控系统
interface TokenRecord {
requestId: string;
model: string;
provider: string;
promptTokens: number;
completionTokens: number;
totalTokens: number;
estimatedCost: number; // 美分
timestamp: Date;
userId?: string;
feature?: string; // 哪个功能消耗的
}
// 各模型定价(每百万 Token,美元)
const PRICING: Record<string, { input: number; output: number }> = {
'gpt-4o': { input: 2.5, output: 10 },
'gpt-4o-mini': { input: 0.15, output: 0.6 },
'claude-sonnet-4-20250514': { input: 3, output: 15 },
'claude-haiku-4-20250514': { input: 0.8, output: 4 },
'gemini-1.5-pro': { input: 1.25, output: 5 },
};
class TokenTracker {
private records: TokenRecord[] = [];
track(record: Omit<TokenRecord, 'estimatedCost' | 'timestamp'>) {
const pricing = PRICING[record.model];
const estimatedCost = pricing
? (record.promptTokens * pricing.input + record.completionTokens * pricing.output) / 1_000_000
: 0;
this.records.push({
...record,
estimatedCost,
timestamp: new Date(),
});
// 持久化到数据库
this.persist(record);
}
// 获取统计摘要
getSummary(filter?: { userId?: string; feature?: string; since?: Date }) {
let filtered = this.records;
if (filter?.userId) filtered = filtered.filter(r => r.userId === filter.userId);
if (filter?.feature) filtered = filtered.filter(r => r.feature === filter.feature);
if (filter?.since) filtered = filtered.filter(r => r.timestamp >= filter.since);
return {
totalRequests: filtered.length,
totalTokens: filtered.reduce((sum, r) => sum + r.totalTokens, 0),
totalCost: filtered.reduce((sum, r) => sum + r.estimatedCost, 0),
byModel: this.groupBy(filtered, 'model'),
byFeature: this.groupBy(filtered, 'feature'),
};
}
// 预算检查:超出预算时阻止请求
async checkBudget(userId: string, dailyLimitCents: number): Promise<boolean> {
const today = new Date();
today.setHours(0, 0, 0, 0);
const summary = this.getSummary({ userId, since: today });
return summary.totalCost < dailyLimitCents;
}
private groupBy(records: TokenRecord[], key: keyof TokenRecord) {
return records.reduce((acc, r) => {
const k = String(r[key] ?? 'unknown');
if (!acc[k]) acc[k] = { count: 0, tokens: 0, cost: 0 };
acc[k].count++;
acc[k].tokens += r.totalTokens;
acc[k].cost += r.estimatedCost;
return acc;
}, {} as Record<string, { count: number; tokens: number; cost: number }>);
}
private async persist(record: any) {
// 写入数据库...
}
}
export const tokenTracker = new TokenTracker();
九、框架综合对比
9.1 同一任务的多框架实现
以「带工具调用的流式对话」为例,对比各框架的实现差异:
- Vercel AI SDK
- LangChain.js
- OpenAI SDK
import { streamText, tool } from 'ai';
import { openai } from '@ai-sdk/openai';
import { z } from 'zod';
// Vercel AI SDK:最简洁,内置工具系统
const result = streamText({
model: openai('gpt-4o'),
messages: [{ role: 'user', content: '北京今天天气如何?' }],
tools: {
getWeather: tool({
description: '获取天气',
parameters: z.object({ city: z.string() }),
execute: async ({ city }) => fetchWeather(city),
}),
},
maxSteps: 3,
});
return result.toDataStreamResponse();
// 前端用 useChat 自动处理一切
import { ChatOpenAI } from '@langchain/openai';
import { DynamicStructuredTool } from '@langchain/core/tools';
import { AgentExecutor, createOpenAIFunctionsAgent } from 'langchain/agents';
import { ChatPromptTemplate } from '@langchain/core/prompts';
import { z } from 'zod';
// LangChain.js:更多配置,但灵活性更高
const weatherTool = new DynamicStructuredTool({
name: 'getWeather',
description: '获取天气',
schema: z.object({ city: z.string() }),
func: async ({ city }) => JSON.stringify(await fetchWeather(city)),
});
const model = new ChatOpenAI({ modelName: 'gpt-4o', streaming: true });
const prompt = ChatPromptTemplate.fromMessages([
['system', '你是一个有用的助手。'],
['placeholder', '{chat_history}'],
['human', '{input}'],
['placeholder', '{agent_scratchpad}'],
]);
const agent = await createOpenAIFunctionsAgent({ llm: model, tools: [weatherTool], prompt });
const executor = new AgentExecutor({ agent, tools: [weatherTool] });
const stream = await executor.stream({ input: '北京今天天气如何?' });
// 需要自行将结果转为 SSE 发送给前端
import OpenAI from 'openai';
const client = new OpenAI();
// OpenAI SDK:最底层,需要手动管理 Agent 循环
const tools: OpenAI.Chat.ChatCompletionTool[] = [{
type: 'function',
function: {
name: 'getWeather',
description: '获取天气',
parameters: {
type: 'object',
properties: { city: { type: 'string' } },
required: ['city'],
},
},
}];
const messages: OpenAI.Chat.ChatCompletionMessageParam[] = [
{ role: 'user', content: '北京今天天气如何?' },
];
// 手动 Agent 循环
while (true) {
const response = await client.chat.completions.create({
model: 'gpt-4o',
messages,
tools,
});
const choice = response.choices[0];
messages.push(choice.message);
if (choice.finish_reason === 'tool_calls') {
for (const toolCall of choice.message.tool_calls!) {
const args = JSON.parse(toolCall.function.arguments);
const result = await fetchWeather(args.city);
messages.push({
role: 'tool',
tool_call_id: toolCall.id,
content: JSON.stringify(result),
});
}
continue; // 继续循环
}
// 非工具调用,结束
return choice.message.content;
}
9.2 综合对比表
| 维度 | Vercel AI SDK | LangChain.js | OpenAI SDK | Anthropic SDK |
|---|---|---|---|---|
| 定位 | 全栈 AI UI 框架 | AI 应用编排框架 | OpenAI 底层封装 | Anthropic 底层封装 |
| 前端集成 | useChat/useObject/useCompletion | 需自行封装 | 需自行封装 | 需自行封装 |
| 多模型支持 | Provider 抽象层(30+ 模型) | 多 LLM Adapter | 仅 OpenAI | 仅 Anthropic |
| 流式处理 | DataStream 协议 + React Hooks | Runnable.stream() | .stream() 辅助方法 | .stream() + 事件 |
| 工具调用 | tool() + Zod | DynamicStructuredTool | tools[] + JSON Schema | tools[] + input_schema |
| Agent | maxSteps 循环 | AgentExecutor + 多种策略 | 手动循环 | 手动循环 |
| RAG | 需配合其他库 | 内置完整 RAG 管道 | 需自行实现 | 需自行实现 |
| Memory | messages 数组(最简) | Buffer/Summary/Vector | 手动管理 | 手动管理 |
| 中间件 | wrapLanguageModel | RunnableMiddleware | 无(需自行包装) | 无(需自行包装) |
| Token 追踪 | usage Promise/callback | callbacks 系统 | response.usage | response.usage |
| 错误处理 | 内置重试逻辑 | 可配置 fallback | APIError 类 | APIError 类 |
| TypeScript | 原生、类型安全 | 良好 | 原生、类型安全 | 原生、类型安全 |
| 学习曲线 | 低 | 中高 | 低 | 低 |
| 包体积 | ~50KB (core) | ~200KB+ | ~30KB | ~20KB |
| 适用场景 | Next.js AI 应用 | 复杂 Chain/Agent/RAG | 直接使用 OpenAI 最新特性 | 直接使用 Claude 最新特性 |
- Next.js 项目:首选 Vercel AI SDK,前后端一体化体验最好
- 复杂 RAG / Agent:选 LangChain.js,Chain 编排能力强
- 简单对接 / 最新 API 特性:直接用官方 SDK
- 混合方案:后端用 LangChain.js 编排,前端用 Vercel AI SDK 的 Hooks
- 需要 Extended Thinking:使用 Anthropic SDK 或 Vercel AI SDK(anthropic provider 支持)
- 成本敏感:Anthropic Batch API(50% 折扣)+ Prompt Caching(90% 节省)
常见面试问题
Q1: Vercel AI SDK 的 DataStream 协议是怎么工作的?Wire Format 是什么样的?
答案:
DataStream 是 Vercel AI SDK 自定义的流式传输协议,建立在 HTTP 流之上。每一行的格式为 TYPE_CODE:JSON_VALUE\n,类型码定义了数据类型:
0:文本 delta(最常见),如0:"Hello "\n2:自定义 data,如2:[{"key":"value"}]\n9:工具调用,如9:{"toolCallId":"call_1","toolName":"getWeather"}\na:工具结果e:错误信息d:完成信号,包含finishReason和usage
前端 useChat 通过 ReadableStream 逐行读取并解析这个协议。文本 delta 会累积到最后一条 assistant 消息的 content 中,工具调用会累积到 parts 数组中。d(finish)事件触发 onFinish 回调。
与普通 SSE 的区别是:SSE 用 event: / data: 字段分隔,而 DataStream 用更紧凑的单行 TYPE:VALUE 格式,减少了传输开销。你可以在浏览器 DevTools Network 面板中直接查看原始 DataStream 数据来调试流式问题。
Q2: useChat vs useCompletion vs useObject 分别在什么场景下使用?
答案:
三个 Hook 对应不同的 AI 交互模式:
| Hook | 后端 API | 数据格式 | 适用场景 |
|---|---|---|---|
| useChat | streamText → toDataStreamResponse() | Message[](多轮对话) | 聊天机器人、客服、AI 助手 |
| useCompletion | streamText → toTextStreamResponse() | string(纯文本) | 文本补全、单次生成、写作辅助 |
| useObject | streamObject → toTextStreamResponse() | Partial<T>(结构化对象) | 数据提取、表单填充、Generative UI |
关键区别:
- useChat 维护完整的消息历史(
messages数组),支持多轮对话、工具调用结果展示、reload(重新生成)、stop(停止生成) - useCompletion 只维护最新一次的
completion字符串,不管理对话历史,适合一问一答的场景 - useObject 返回的
object在流式过程中是Partial<T>类型——字段逐步被填充,你可以在 UI 上展示正在加载的状态
// useObject 的核心价值:流式结构化数据
const { object } = useObject({ api: '/api/extract', schema: mySchema });
// 流式过程中 object 逐步填充:
// 第 1 秒: { name: "番茄炒蛋" }
// 第 2 秒: { name: "番茄炒蛋", ingredients: [{ name: "番茄" }] }
// 第 3 秒: { name: "番茄炒蛋", ingredients: [{ name: "番茄", amount: "2个" }] }
Q3: 如何在 Vercel AI SDK 中实现自定义 Middleware?
答案:
Vercel AI SDK 的 Middleware 通过 wrapLanguageModel 包装模型实现,它拦截模型的 doGenerate 和 doStream 调用。Middleware 接口包含三个可选方法:
import {
type Experimental_LanguageModelV1Middleware as LanguageModelMiddleware,
wrapLanguageModel,
} from 'ai';
const myMiddleware: LanguageModelMiddleware = {
// 1. transformParams:修改请求参数(在调用 LLM 之前)
transformParams: async ({ params }) => {
// 例如:自动注入 system prompt
return {
...params,
prompt: [
{ role: 'system', content: '你始终使用中文回答。' },
...params.prompt,
],
};
},
// 2. wrapGenerate:包装非流式调用
wrapGenerate: async ({ doGenerate, params }) => {
const start = Date.now();
const result = await doGenerate();
// 例如:记录 Token 使用
tokenTracker.track({
model: params.model,
usage: result.usage,
latency: Date.now() - start,
});
return result;
},
// 3. wrapStream:包装流式调用
wrapStream: async ({ doStream, params }) => {
const { stream, ...rest } = await doStream();
// 例如:过滤流中的敏感内容
const filtered = stream.pipeThrough(new ContentFilterTransform());
return { stream: filtered, ...rest };
},
};
// 使用 wrapLanguageModel 将 middleware 应用到模型上
const enhancedModel = wrapLanguageModel({
model: openai('gpt-4o'),
middleware: myMiddleware,
});
// 使用方式不变
streamText({ model: enhancedModel, messages });
典型的 Middleware 用例:日志监控、缓存层、内容安全过滤、Token 追踪、请求/响应修改。Middleware 可以嵌套使用(多次 wrap),执行顺序为外层先执行。
Q4: LangChain.js 的 LCEL 与手动 Chain 相比有什么优劣?
答案:
LCEL(LangChain Expression Language)通过 .pipe() 语法将组件声明式串联,而手动 Chain 则是命令式地逐步调用。
LCEL 优势:
- 声明式:
prompt.pipe(model).pipe(parser)一行表达完整管道,代码更简洁 - 自动流式:LCEL 管道自动支持
.stream(),每个组件的输出实时流向下一个组件 - 批处理:自动支持
.batch()批量执行 - 并行:
RunnableParallel让多条 Chain 并行执行,结果自动合并 - 条件路由:
RunnableBranch实现声明式条件分支 - 类型传递:输入输出类型在管道中自动推导
LCEL 劣势:
- 调试困难:管道中间状态不透明,出错时难以定位是哪个环节
- 灵活性受限:复杂的业务逻辑(循环、异常处理、条件判断)用声明式表达不直观
- 学习曲线:
RunnablePassthrough、RunnableParallel、RunnableLambda等概念较抽象 - TypeScript 类型:复杂管道的类型推导偶尔会出问题
// LCEL 风格(声明式)
const chain = prompt.pipe(model).pipe(parser);
const result = await chain.invoke({ question: '...' });
// 手动风格(命令式)
const formatted = await prompt.format({ question: '...' });
const response = await model.invoke(formatted);
const result = await parser.parse(response);
// 手动风格更容易在每步之间加日志、判断、异常处理
建议:简单的线性管道用 LCEL,复杂的业务逻辑(需要循环、大量条件判断、自定义错误处理)用手动方式。两者可以混合使用——LCEL 管道中可以嵌入 RunnableLambda 来执行任意自定义代码。
Q5: 各 SDK 如何处理 Token 限制问题?
答案:
Token 限制是 AI 应用中最常见的问题之一。各模型有不同的上下文窗口大小(GPT-4o: 128K,Claude 3.5: 200K,Gemini 1.5: 1M),超出限制会导致请求失败。
1. 预估并截断消息历史:
// 使用 tiktoken(OpenAI)预估 Token 数
import { encoding_for_model } from 'tiktoken';
function estimateTokens(messages: Message[], model: string): number {
const enc = encoding_for_model(model as any);
let total = 0;
for (const msg of messages) {
total += enc.encode(msg.content as string).length;
total += 4; // 每条消息的额外 Token(role 标记等)
}
return total + 2; // 回复的起始标记
}
// 截断策略:保留 system prompt + 最近 N 条消息
function truncateMessages(messages: Message[], maxTokens: number): Message[] {
const systemMessages = messages.filter(m => m.role === 'system');
const otherMessages = messages.filter(m => m.role !== 'system');
let tokens = estimateTokens(systemMessages, 'gpt-4o');
const result = [...systemMessages];
// 从最新到最旧添加,直到接近限制
for (let i = otherMessages.length - 1; i >= 0; i--) {
const msgTokens = estimateTokens([otherMessages[i]], 'gpt-4o');
if (tokens + msgTokens > maxTokens * 0.8) break; // 留 20% 给输出
result.splice(systemMessages.length, 0, otherMessages[i]);
tokens += msgTokens;
}
return result;
}
2. 使用 LangChain.js 的 Memory 压缩:
长对话使用 ConversationSummaryMemory 自动将历史压缩为摘要,Token 消耗保持恒定。
3. Anthropic Prompt Caching: 即使不能减少 Token 数量,缓存可以减少 90% 的重复计费。
4. 动态选择模型: 短上下文用便宜的小模型(gpt-4o-mini),长上下文切换到大窗口模型(gemini-1.5-pro 的 1M 上下文)。
Q6: 如何实现多 Provider 的 Fallback 容灾?
答案:
多 Provider Fallback 是生产环境的标配,确保单个 Provider 宕机时服务不中断。核心思路是依次尝试多个 Provider,遇到可重试错误时切换到下一个。
关键实现要点:
- 区分可重试和不可重试错误:429(限流)、500/503(服务不可用)可重试;401(认证失败)、400(参数错误)不应重试
- 指数退避:重试间隔指数增长(1s → 2s → 4s),避免请求风暴
- 随机抖动(Jitter):在退避时间上加随机偏移,避免多个客户端同时重试
- 超时控制:每个 Provider 设置合理超时,避免长时间等待失败的 Provider
- 日志与告警:记录每次 Fallback 事件,用于监控各 Provider 的可用性
// Vercel AI SDK 的 Provider Fallback 实现
const result = await streamWithFallback({
models: [
anthropic('claude-sonnet-4-20250514'), // 首选
openai('gpt-4o'), // 备选 1
google('gemini-1.5-pro'), // 备选 2
],
maxRetriesPerModel: 2,
}, { messages, tools, maxSteps: 5 });
建议 Fallback 链中混合不同 Provider(而非同一 Provider 的不同模型),因为同一 Provider 的宕机通常影响所有模型。同时,不同 Provider 对工具调用的支持可能有差异,需要确保 Fallback 链中的所有模型都支持你使用的功能。
Q7: Anthropic 的 Extended Thinking 与 OpenAI 的推理 Token(o3/o4-mini)在前端处理上有什么区别?
答案:
两者都是「让模型先思考再回答」的机制,但 API 层面差异很大:
| 维度 | Anthropic Extended Thinking | OpenAI Reasoning (o3/o4-mini) |
|---|---|---|
| API 参数 | thinking: { type: 'enabled', budget_tokens: N } | 无显式参数,模型自动推理 |
| 思维可见性 | 返回 thinking content block,可查看 | 不返回思维内容,完全黑盒 |
| Token 计费 | 思维 Token 单独计费(有 budget 控制) | 推理 Token 计入 completion_tokens |
| 流式行为 | thinking 和 text 分别流出 | 思考阶段无输出(只有最终回答流出) |
| 前端展示 | 可以展示「思考过程」 | 只能展示「模型正在思考...」 |
前端处理差异:
// Anthropic:可以展示思考过程
stream.on('thinking', (text) => {
// 可以展示给用户或用于调试
setThinkingContent(prev => prev + text);
});
stream.on('text', (text) => {
setResponseContent(prev => prev + text);
});
// OpenAI o3/o4-mini:只能展示最终结果
// 思考阶段 TTFT(首 Token 时间)会很长(10-60秒)
// 前端需要展示加载状态,无法展示中间过程
for await (const chunk of stream) {
// 只有 text delta,没有 thinking
const content = chunk.choices[0]?.delta?.content ?? '';
setResponseContent(prev => prev + content);
}
Vercel AI SDK 的统一处理:
Vercel AI SDK 的 anthropic provider 通过 providerOptions 支持 Extended Thinking,思维内容会出现在 message 的 parts 中,类型为 reasoning。但 OpenAI 的 o3/o4-mini 由于不返回思维内容,parts 中不会有 reasoning block。前端需要根据模型特性适配 UI。
Q8: Vercel AI SDK 的 Provider 抽象层是怎么工作的?如何实现自定义 Provider?
答案:
Provider 抽象层的核心是 LanguageModelV1 接口。每个 Provider 包(如 @ai-sdk/openai)返回一个函数,该函数接收模型 ID,返回实现了 LanguageModelV1 接口的对象。
接口有两个核心方法:
doGenerate(options):非流式生成,返回完整结果doStream(options):流式生成,返回ReadableStream
Provider 内部负责将 Vercel AI SDK 的统一消息格式转换为各厂商的专有格式(如 OpenAI 的 messages 数组、Anthropic 的 system + messages 分离格式),并将厂商返回的流事件转换为统一的 LanguageModelV1StreamPart 类型。
自定义 Provider 实现步骤:
- 创建一个函数,接收 modelId,返回
LanguageModelV1对象 - 实现
doGenerate:将统一格式转换为你的 LLM API 格式,调用 API,转换回统一结果 - 实现
doStream:类似doGenerate,但返回ReadableStream,每个 SSE 事件转换为LanguageModelV1StreamPart
注册到 Provider Registry 后,就可以通过 registry.languageModel('custom:model-id') 使用,与官方 Provider 体验一致。
Q9: LangChain.js 中如何实现条件路由(不同问题走不同的处理链)?
答案:
LangChain.js 提供 RunnableBranch 用于声明式条件路由。它接受一组 [条件函数, Runnable] 对和一个默认 Runnable:
import { RunnableBranch } from '@langchain/core/runnables';
const router = RunnableBranch.from([
// [条件, 处理链] —— 按顺序匹配,第一个匹配的生效
[
(input) => input.topic === 'code',
codeExpertChain,
],
[
(input) => input.topic === 'math',
mathExpertChain,
],
// 最后一个参数是默认分支
generalChain,
]);
更灵活的方式是使用 RunnableLambda 配合 LLM 做分类:先用一个轻量 LLM 调用判断问题类别,再路由到对应的专家 Chain。这种方式适合类别无法预先硬编码的场景。
与 Vercel AI SDK 对比:Vercel AI SDK 没有内置路由机制,需要在服务端代码中手动用 if/switch 判断。LangChain.js 的路由是管道的一部分,可以与其他 Runnable 自由组合。
Q10: 如何在前端统一追踪不同 SDK 的 Token 使用量?
答案:
各 SDK 返回 Token 使用量的方式不同,需要统一收集:
- Vercel AI SDK:
streamText返回的result.usage(Promise)或onFinish({ usage })回调 - OpenAI SDK:非流式在
response.usage;流式需要设置stream_options: { include_usage: true },在最后一个 chunk 中获取 - Anthropic SDK:在
response.usage中,额外包含cache_creation_input_tokens和cache_read_input_tokens
统一做法是实现一个 TokenTracker 服务:
- 每次 LLM 调用完成后,将
{ model, provider, promptTokens, completionTokens }发送到 Tracker - Tracker 根据各模型定价计算费用
- 持久化到数据库,按用户/功能/模型维度统计
- 设置预算告警,超出阈值时阻止请求或降级到更便宜的模型
在 Vercel AI SDK 中,推荐通过 Middleware 的 wrapGenerate/wrapStream 统一拦截并记录,避免在每个 API 路由中重复编写追踪代码。
Q11: 各框架在错误处理上有什么差异?生产环境的最佳实践是什么?
答案:
各框架错误处理差异:
- OpenAI SDK:抛出
OpenAI.APIError,包含status、code、message,429 限流时 headers 中有retry-after - Anthropic SDK:抛出
Anthropic.APIError,结构类似,但有特有的 529(overloaded)状态码 - Vercel AI SDK:在 Middleware 层和回调中处理错误,
useChat的error状态自动捕获 API 错误 - LangChain.js:通过 callbacks 系统报告错误,
AgentExecutor有内置重试
生产环境最佳实践:
- 统一错误规范化:将各 SDK 的错误转换为统一的
AIError类型,包含type(rate_limit / auth / server / token_limit)、retryable、retryAfterMs - 指数退避 + 抖动:重试间隔为
min(initialDelay * 2^attempt, maxDelay) + random(0, jitter) - Provider Fallback:主 Provider 失败时自动切换备选 Provider
- 流中断恢复:流式传输中途断开时,保留已接收的内容,提示用户「点击继续生成」
- 前端错误展示:区分用户可操作的错误(「请缩短输入内容」)和系统错误(「服务暂时不可用,正在重试」)
- 监控告警:错误率超过阈值时触发告警,帮助快速发现 Provider 问题
Q12: 如何封装一个多 Provider 的统一 AI 服务层?
答案:
import { streamText, generateText, type CoreMessage } from 'ai';
import { openai } from '@ai-sdk/openai';
import { anthropic } from '@ai-sdk/anthropic';
import { google } from '@ai-sdk/google';
// 模型配置
type ModelId = 'gpt-4o' | 'gpt-4o-mini' | 'claude-sonnet' | 'claude-haiku' | 'gemini-pro';
const MODELS: Record<ModelId, ReturnType<typeof openai>> = {
'gpt-4o': openai('gpt-4o'),
'gpt-4o-mini': openai('gpt-4o-mini'),
'claude-sonnet': anthropic('claude-sonnet-4-20250514'),
'claude-haiku': anthropic('claude-haiku-4-20250514'),
'gemini-pro': google('gemini-1.5-pro'),
};
// Fallback 链配置
const FALLBACK_CHAINS: Record<string, ModelId[]> = {
default: ['claude-sonnet', 'gpt-4o', 'gemini-pro'],
fast: ['gpt-4o-mini', 'claude-haiku'],
budget: ['gpt-4o-mini', 'claude-haiku', 'gemini-pro'],
};
interface ChatOptions {
model: ModelId;
messages: CoreMessage[];
system?: string;
tools?: Record<string, any>;
fallbackChain?: keyof typeof FALLBACK_CHAINS;
userId?: string;
}
export async function createChatStream(options: ChatOptions) {
const models = options.fallbackChain
? FALLBACK_CHAINS[options.fallbackChain].map(id => MODELS[id])
: [MODELS[options.model]];
return streamWithFallback(
{ models, maxRetriesPerModel: 2 },
{
system: options.system,
messages: options.messages,
tools: options.tools,
maxSteps: 5,
onFinish: ({ usage }) => {
tokenTracker.track({
requestId: crypto.randomUUID(),
model: options.model,
provider: MODELS[options.model].provider,
promptTokens: usage.promptTokens,
completionTokens: usage.completionTokens,
totalTokens: usage.promptTokens + usage.completionTokens,
userId: options.userId,
});
},
},
);
}
这个服务层的设计要点:
- 模型注册表:统一管理所有可用模型,按需扩展
- Fallback 链:预定义多种容灾策略(default/fast/budget)
- Token 追踪:在
onFinish中自动记录使用量 - 成本控制:根据场景选择不同成本的 Fallback 链
相关链接
- Vercel AI SDK 文档
- LangChain.js 文档
- OpenAI Node SDK
- Anthropic SDK
- 前端接入大模型 API - API 调用详解
- 流式渲染与 SSE - 流式响应处理
- Function Calling 与 AI Agent - 工具调用
- RAG 检索增强生成 - RAG 架构与实现