Harness Engineering
第6章 工具编排与并发执行
第6章 工具编排与并发执行
“Concurrency is not parallelism, and in Agent systems, understanding the difference is the difference between a sluggish bot and a responsive teammate.” —— 杨艺韬
上一章我们讨论了如何设计好的工具。但设计出来的工具终归要被执行。一个 Agent 在一次对话中可能调用几十甚至上百次工具,如何编排这些调用——按什么顺序、是否并发、如何处理依赖关系、怎么控制资源消耗——这些问题直接决定了 Agent 的执行效率和可靠性。
本章的核心论点是:工具编排不是简单的”调用函数”,而是一个调度系统的设计问题。它和操作系统的进程调度、分布式系统的任务编排有着深层的结构相似性。
本章要点
- 从顺序到并发是效率的关键跳跃——现代 LLM API 原生支持多工具调用
- 异步生成器模式——结果按完成顺序流式返回,用户体验关键
- 显式 vs 隐式依赖:模型管理前者,Harness 管理后者
- 信号量限流——按工具类型设独立并发上限
- 超时两层防御:单工具超时 + 全局循环超时
- 批处理——当相似操作可合并时,减少重复推理和重复工具开销
- Claude Code vs LangGraph——循环内嵌编排 vs 图节点化编排的权衡
flowchart TD
subgraph Sequential["顺序执行"]
S1[Tool A] --> S2[Tool B] --> S3[Tool C]
end
subgraph Parallel["并行执行"]
P1[Tool A]
P2[Tool B]
P3[Tool C]
end
subgraph Dependency["依赖图"]
D1[Tool A] --> D3[Tool C]
D2[Tool B] --> D3
end
Sequential ---|"简单但慢"| Parallel
Parallel ---|"快但需处理依赖"| Dependency
6.1 顺序执行:最朴素的模式
最简单的工具编排方式是顺序执行:模型返回一个工具调用请求,Harness 执行它,把结果喂回模型,模型再决定下一步。
// 最基本的顺序执行循环
async function agentLoop(messages: Message[]) {
while (true) {
const response = await llm.chat(messages);
if (response.stopReason === "end_turn") {
return response.content;
}
for (const toolCall of response.toolCalls) {
const result = await executeTool(toolCall);
messages.push({ role: "tool", content: result, toolCallId: toolCall.id });
}
}
}
顺序执行的优势
顺序执行的优势显而易见:简单、可预测、易调试。
- 每个工具调用的上下文是明确的
- 出了问题只需要看上一次调用的结果
- 状态变更的顺序清晰
- 不需要考虑并发安全
对于大多数简单场景——比如”读一个文件,然后修改它”——顺序执行完全够用。
顺序执行的代价
但它的代价也很明显:慢。
考虑这样的场景:Agent 需要理解一个模块的架构,要读取多个相关文件。
场景:顺序执行多个文件读取
第 1 轮: 模型决策 "先读 file1"
工具执行: Read(file1)
第 2 轮: 模型看到 file1 后再决定 "接下来读 file2"
工具执行: Read(file2)
...
每次读取之间都要重新走一遍模型推理!
不仅慢,还浪费 token:多个独立读取被拆成了多轮模型调用。
这不仅慢,还浪费 token——因为每读完一个文件,模型都需要重新决定下一步读哪个文件。
实际上,第 3 章讨论的 Agent Loop 在早期原型中几乎都采用顺序执行。LangChain 最早的 AgentExecutor 就是这个模式。它能跑,但在真实场景中,用户会明显感受到 Agent 的”迟钝”。
什么场景仍然应该顺序执行
不是所有场景都需要并发。以下情况更适合顺序执行:
- 状态依赖强:下一步必须基于上一步的具体结果
- 调试期:并发会让日志和堆栈混乱,顺序更好排查
- 低并发开销的工具:像简单数学计算这种,并发收益不大
- 操作同一资源:避免并发写冲突
6.2 并发执行:从串行到并行
一些 LLM API 支持在一次响应中返回多个工具调用请求,例如通过多个 tool-use block 或 tool-call 数组表达。这为并发执行提供了基础。
并发执行的核心思路是:如果模型一次返回了多个工具调用,说明它认为这些调用之间没有依赖关系,可以同时执行。
基础实现
async function agentLoopWithConcurrency(messages: Message[]) {
while (true) {
const response = await llm.chat(messages);
if (response.stopReason === "end_turn") {
return response.content;
}
// 并发执行所有工具调用
const results = await Promise.allSettled(
response.toolCalls.map(toolCall => executeTool(toolCall))
);
// 将所有结果按顺序推入消息
for (let i = 0; i < results.length; i++) {
const result = results[i];
const content = result.status === "fulfilled"
? result.value
: `Error: ${result.reason}`;
messages.push({
role: "tool",
content,
toolCallId: response.toolCalls[i].id,
});
}
}
}
Promise.allSettled vs Promise.all
这里用 Promise.allSettled 而不是 Promise.all 是一个关键的工程决策。
| 行为 | Promise.all | Promise.allSettled |
|---|---|---|
| 失败处理 | 任一失败立即 reject | 等所有完成 |
| 部分结果 | 丢失 | 保留 |
| 适用场景 | 事务性操作 | 独立并发任务 |
对于工具执行来说,一个工具失败不应该阻止其他工具返回结果——模型需要看到所有结果(包括错误)来做下一步决策。
并发的效率模型
场景:多个独立文件读取
顺序执行:
1 次模型推理决定第一个文件
+ N 次 "读完一个文件后再让模型决定下一个文件"
并发执行:
1 次模型推理决定 N 个独立文件
+ N 个文件读取并发执行
收益来自两部分:
1. 少走多轮模型推理
2. 工具 I/O 等待重叠
这里不需要假设固定耗时。只要工具调用之间没有数据依赖,并发就能减少等待链路;如果模型推理本身比工具 I/O 贵,并发还能减少重复推理造成的 token 和延迟开销。
6.2.1 Claude Code 的异步生成器模式
Claude Code 的工具编排比上面的简单示例复杂得多。它使用了异步生成器(async generator)模式来实现流式工具执行。
async function* executeToolsConcurrently(
toolCalls: ToolCall[]
): AsyncGenerator<ToolResult> {
const promises = toolCalls.map(async (toolCall) => {
const result = await executeTool(toolCall);
return { toolCall, result };
});
// 使用竞速模式,谁先完成谁先 yield
const pending = new Set(promises);
while (pending.size > 0) {
const { promise, value } = await Promise.race(
[...pending].map(p => p.then(v => ({ promise: p, value: v })))
);
pending.delete(promise);
yield value;
}
}
流式返回的用户体验
这个模式的精妙之处在于:结果按完成顺序流式返回,而不是按请求顺序。
请求顺序: tool_A (慢), tool_B (快), tool_C (中)
Promise.all: 等最慢的 tool_A 完成后一起返回
流式异步生成器: tool_B (完成先) → tool_C → tool_A
用户体验:
Promise.all: "思考中..." → 最慢工具完成后 "全部完成"
流式: "tool_B 完成" → "tool_C 完成" → "tool_A 完成"
如果 5 个文件读取中有一个特别快,它的结果会立即被处理,而不需要等待其他 4 个。对用户体验至关重要——用户可以在终端中实时看到工具执行的进度,而不是盯着空白屏幕等所有工具执行完毕。
与 Agent Loop 的兼容性
异步生成器的另一个优势是它与 Agent Loop 的消息流天然兼容。Agent Loop 本身就是一个持续产出消息的流式过程,工具执行结果可以无缝汇入这个流:
async function* runAgent(input: string) {
yield { type: 'user_message', content: input };
while (true) {
yield { type: 'thinking' };
const response = await llm.chat(messages);
yield { type: 'assistant_message', content: response.text };
if (response.toolCalls.length === 0) return;
// 工具执行也是流式的,无缝融入
for await (const result of executeToolsConcurrently(response.toolCalls)) {
yield { type: 'tool_result', ...result };
}
}
}
6.3 工具依赖图:当并发遇到依赖
并非所有工具调用都可以并发执行。考虑这个场景:Agent 需要先搜索文件找到目标路径,再读取该文件,最后修改它。搜索 → 读取 → 修改之间存在严格的数据依赖。
两种依赖类型
在实践中,依赖关系有两种来源:
显式依赖——一个工具的输入参数来自另一个工具的输出。
Grep("待办标记") → files: [a.ts, b.ts]
↓(显式依赖:下一步需要 Grep 的结果)
Read(a.ts)
↓
Edit(a.ts, old_marker, new_marker)
这种依赖由模型自然管理:模型不会在一次响应中同时请求 grep 和 read(因为它还不知道 grep 的结果),它会先请求 grep,拿到结果后再请求 read。
隐式依赖——两个工具操作同一资源,存在竞态条件。
Edit(file.ts, "foo", "bar") ← 写入
|
↓ 同时
Read(file.ts) ← 读取
→ 谁先谁后不确定,可能读到旧值也可能读到新值
这种依赖模型未必能识别,需要 Harness 层来管理。
隐式依赖检测
// 隐式依赖检测的简化实现
function detectConflicts(toolCalls: ToolCall[]): Map<string, ToolCall[]> {
const resourceMap = new Map<string, ToolCall[]>();
for (const call of toolCalls) {
const resources = extractResources(call); // 提取工具操作的资源
for (const resource of resources) {
if (!resourceMap.has(resource)) {
resourceMap.set(resource, []);
}
resourceMap.get(resource)!.push(call);
}
}
// 返回有冲突的资源及其相关工具调用
return new Map(
[...resourceMap.entries()].filter(([_, calls]) => {
// 多个写操作或读写混合都算冲突
return calls.length > 1 && calls.some(c => isWriteOperation(c));
})
);
}
function extractResources(call: ToolCall): string[] {
switch (call.name) {
case 'Read':
case 'Edit':
case 'Write':
return [call.params.file_path];
case 'Bash':
// Bash 本身就是一个全局资源
return ['_shell'];
default:
return [];
}
}
Claude Code 的务实策略
Claude Code 的真实实现比”构建完整依赖图”更简单。toolOrchestration.ts 让每个工具通过 isConcurrencySafe(parsedInput) 自己声明当前输入是否适合并发;编排层只负责把连续的并发安全调用合成 batch,不安全调用则单独串行执行(../claude-code-main/src/services/tools/toolOrchestration.ts:86-116)。
这个策略的关键不是”完全相信模型”,而是”相信工具自己的并发安全判断”:
- Read、Glob、Grep 这类只读工具通常可以并发
- Bash、写文件、修改状态的工具更倾向于串行
- 如果输入解析失败,或
isConcurrencySafe自身抛错,编排层保守地视为不安全
这种设计把复杂度推给最了解资源语义的工具层。编排层不需要知道 Bash 命令是否只读、某个 LSP 操作是否会修改状态;工具自己判断,编排层只按判断结果分组。
冲突解决策略
当检测到冲突时,有三种处理策略:
graph TD
Conflict[检测到资源冲突]
Conflict --> S1[策略 1: 串行化<br/>冲突调用按顺序执行]
Conflict --> S2[策略 2: 拒绝<br/>返回错误让模型重试]
Conflict --> S3[策略 3: 分组并发<br/>冲突组内串行,组间并发]
S1 --> U1[适用: 自动可解决]
S2 --> U2[适用: 需模型重新规划]
S3 --> U3[适用: 大批量操作]
style S1 fill:#dcfce7,stroke:#22c55e
style S2 fill:#fef3c7,stroke:#f59e0b
style S3 fill:#dbeafe,stroke:#3b82f6
6.4 速率限制与资源管理
并发执行带来效率提升的同时,也带来了资源管理问题。如果不加限制,一个 Agent 可能同时发起 100 个文件读取、50 个网络请求、20 个子进程。这不仅会耗尽系统资源,还可能触发外部 API 的速率限制。
信号量模式
资源管理的核心模式是信号量(Semaphore):
class Semaphore {
private permits: number;
private waiting: Array<() => void> = [];
constructor(permits: number) {
this.permits = permits;
}
async acquire(): Promise<void> {
if (this.permits > 0) {
this.permits--;
return;
}
return new Promise(resolve => this.waiting.push(resolve));
}
release(): void {
if (this.waiting.length > 0) {
this.waiting.shift()!();
} else {
this.permits++;
}
}
}
// 按工具类型设置不同的并发上限
const limits: Record<string, Semaphore> = {
file_read: new Semaphore(10), // 最多 10 个并发文件读取
bash: new Semaphore(1), // bash 串行执行
web_fetch: new Semaphore(5), // 最多 5 个并发网络请求
default: new Semaphore(20), // 默认上限
};
async function executeToolWithLimit(toolCall: ToolCall): Promise<ToolResult> {
const semaphore = limits[toolCall.name] ?? limits.default;
await semaphore.acquire();
try {
return await executeTool(toolCall);
} finally {
semaphore.release();
}
}
不同工具的并发策略
| 工具类型 | 并发上限 | 理由 |
|---|---|---|
| 文件读取 | 中到高 | I/O 密集,OS 能处理 |
| 文件写入 | 低 | 降低冲突概率 |
| Shell 命令 | 低 | 全局状态共享,倾向串行 |
| 网络请求 | 中 | 适配 API rate limit |
| 数据库查询 | 连接池大小 | 不超过 DB 连接池 |
| 子 Agent | 低到中 | LLM API、上下文复制和费用都需要控制 |
全局配额
在生产环境中,资源管理还需要考虑全局配额。如果一个用户同时运行多个 Agent 会话,它们共享的系统资源需要在会话之间做公平调度:
class GlobalQuotaManager {
private userLimits = new Map<string, Semaphore>();
private globalLimit = new Semaphore(GLOBAL_LIMIT);
async acquire(userId: string): Promise<void> {
await this.globalLimit.acquire();
if (!this.userLimits.has(userId)) {
this.userLimits.set(userId, new Semaphore(PER_USER_LIMIT));
}
await this.userLimits.get(userId)!.acquire();
}
}
这已经超出了单个 Agent 的范畴,进入了平台工程的领域。
背压(Backpressure)
除了并发限制,还需要考虑背压——当下游处理不过来时,上游自然慢下来。典型场景:
场景:Agent 生成工具调用速度 > Harness 执行速度
↓ 如果不处理
Harness 积压大量待执行任务 → 内存爆炸
正确做法:
Harness 处理满了 → 让模型推理等一等
用并发信号量天然实现:模型申请工具执行时会阻塞
6.5 工具调用验证
在执行工具之前,验证调用参数是一个容易被忽略但极其重要的环节。模型生成的参数可能存在各种问题:
- 类型错误:期望数字,传了字符串
- 路径穿越:
../../etc/passwd这样的恶意路径 - 参数缺失:必填参数没有提供
- 超出范围:文件内容超过合理长度
- 危险命令:
rm -rf /这种显而易见的危险操作
三层验证
function validateToolCall(toolCall: ToolCall): ValidationResult {
// 层 1: Schema 验证(类型、必填)
const schemaResult = validateSchema(toolCall.name, toolCall.params);
if (!schemaResult.valid) {
return { valid: false, error: schemaResult.errors, layer: 'schema' };
}
// 层 2: 语义验证(合理性)
if (toolCall.name === "Read" || toolCall.name === "Edit") {
const path = toolCall.params.file_path as string;
if (path.includes("..")) {
return { valid: false, error: "Path traversal detected", layer: 'semantic' };
}
if (!path.startsWith(allowedRoot)) {
return { valid: false, error: "Path outside allowed directory", layer: 'semantic' };
}
}
if (toolCall.name === "Bash") {
const command = toolCall.params.command as string;
const dangerCheck = detectDangerousCommand(command);
if (dangerCheck.blocked) {
return { valid: false, error: dangerCheck.reason, layer: 'security' };
}
}
// 层 3: 输入消毒(清洁化)
toolCall.params = sanitize(toolCall.name, toolCall.params);
return { valid: true };
}
验证失败的处理
验证失败时的处理策略值得思考:
| 策略 | 优势 | 劣势 | 适用 |
|---|---|---|---|
| 直接拒绝 | 简单清晰 | 模型不知原因 | Schema 错误 |
| 返回详细错误 | 模型可恢复 | 成本稍高 | 语义错误 |
| 自动修正 | 用户无感 | 可能修错 | 路径规范化 |
| 升级确认 | 最安全 | 阻塞 | 安全敏感操作 |
Claude Code 在验证方面做了大量工作:
- 检查文件路径是否在项目目录内
- 验证 bash 命令是否包含危险操作
- 确认编辑操作的目标文件确实存在
- 检查 Write 前是否已 Read
这些验证在第 14 章的权限模型中会有更详细的讨论。
6.6 超时管理
工具执行可能因为各种原因卡住:网络请求超时、子进程死锁、文件系统挂起。超时管理是工具编排中不可或缺的一环。
超时的两个层次
flowchart TD
Start[工具开始执行] --> T1{单工具超时}
T1 -->|超时| Kill[终止进程<br/>返回 Timeout 错误]
T1 -->|完成| Check{全局循环超时}
Check -->|超时| Stop[强制终止整个循环<br/>返回部分结果]
Check -->|未超时| Next[继续下一个工具]
style Kill fill:#fee2e2,stroke:#ef4444
style Stop fill:#fee2e2,stroke:#ef4444
style Next fill:#dcfce7,stroke:#22c55e
单工具超时
async function executeWithTimeout(
toolCall: ToolCall,
timeoutMs: number
): Promise<ToolResult> {
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeoutMs);
try {
return await executeTool(toolCall, { signal: controller.signal });
} catch (error) {
if (error instanceof AbortError) {
return {
toolCallId: toolCall.id,
content: `Tool execution timed out after ${timeoutMs}ms`,
isError: true,
};
}
throw error;
} finally {
clearTimeout(timer);
}
}
全局循环超时
async function agentLoopWithGlobalTimeout(
messages: Message[],
globalTimeoutMs: number
) {
const deadline = Date.now() + globalTimeoutMs;
while (Date.now() < deadline) {
const remaining = deadline - Date.now();
const response = await withTimeout(llm.chat(messages), remaining);
if (response.stopReason === "end_turn") return response.content;
// 执行工具时传入剩余时间
await executeTools(response.toolCalls, remaining);
}
return "Agent loop timed out";
}
不同工具的超时策略
| 工具类型 | 超时策略 | 理由 |
|---|---|---|
| 文件读取 | 短超时 | 本地 I/O 通常很快 |
| 文件搜索(Glob/Grep) | 中等超时 | 大项目可能慢 |
| Bash 命令 | 默认超时 + 用户可配置上限 | 编译/测试可能耗时 |
| 网络请求 | 按外部 API SLA 配置 | 需要兼顾重试和用户等待 |
| 子 Agent | 长超时 + 可中断 | 可能涉及多轮推理 |
Claude Code 的 BashTool prompt 会把默认超时和最大超时写进工具说明中(../claude-code-main/src/tools/BashTool/prompt.ts:331-335)。默认值和最大值由 utils/timeouts.ts 定义,并支持通过环境变量覆盖(../claude-code-main/src/utils/timeouts.ts:1-39)。这比把超时数字散落在 prompt 中更容易维护。
取消与清理
超时只是第一步,真正难的是取消后的清理。网络请求要取消未完成连接,子进程要回收句柄,文件写入要避免半写状态,数据库事务要回滚。工具接口如果只返回 Timeout 字符串,不暴露取消信号和清理钩子,Harness 就很难保证资源不会泄漏。
全局超时则防止 Agent 进入无限循环。即使每次单独的工具调用都没有超时,Agent 可能因为无法完成任务而反复调用工具。设置一个全局时间上限是必要的安全措施。
6.7 批处理:当量变引起质变
有些场景下,Agent 需要对大量相似的目标执行相同操作——比如重命名一个在 50 个文件中出现的变量,或者在一个大型项目中搜索所有匹配某个模式的文件。逐个调用工具既低效又浪费 token。
批处理的核心思想
将多个相似的工具调用合并为一个:
// ❌ 不好:50 次独立的 grep 调用
for (const pattern of patterns) {
await grep({ pattern, path: projectRoot });
}
// ✅ 好:一次 grep 调用覆盖多个模式
await grep({ pattern: patterns.join("|"), path: projectRoot });
两层批处理
批处理不仅发生在工具内部,也可以发生在 Harness 层。如果 Harness 检测到模型连续请求了多个相同类型的工具调用,可以将它们合并执行:
function batchToolCalls(toolCalls: ToolCall[]): ToolCall[] {
const groups = groupBy(toolCalls, call => call.name);
const batched: ToolCall[] = [];
for (const [name, calls] of Object.entries(groups)) {
if (name === "file_read" && calls.length > 3) {
// 将多个文件读取合并为一个批量读取
batched.push({
name: "batch_file_read",
params: { paths: calls.map(c => c.params.path) },
id: generateId(),
});
} else {
batched.push(...calls);
}
}
return batched;
}
Claude Code 的批处理设计
Claude Code 在文件操作上大量使用了批处理思维:
- Glob:一次匹配多个模式(
{**/*.ts,**/*.tsx}) - Grep:在多个文件类型中搜索(
--type js) - Edit 的 replace_all:一次替换所有匹配,不用逐个替换
- 批量 Read:通过 Agent 工具让子 Agent 并发读取大量文件
这些工具的设计本身就是批处理友好的——它们的参数空间允许单次调用覆盖广泛的操作范围。
批处理的挑战:结果映射
批处理的挑战在于结果映射。原始调用和合并后的调用之间存在一对多的关系,需要正确地将批量结果拆分回每个原始调用的上下文中。
场景: 批量读取
原始调用: [Read(a.ts), Read(b.ts), Read(c.ts)]
批量后: batch_read(['a.ts', 'b.ts', 'c.ts'])
结果: { 'a.ts': content_a, 'b.ts': content_b, 'c.ts': content_c }
需要:把批量结果拆回 3 个 tool_result 消息
注意:顺序、ID 映射都不能错
如果不处理好这一点,模型可能无法正确关联工具调用和结果。
6.8 教学模型:完整编排流程
把上面的各个组件组合起来,可以得到一个教学版的工具编排模型:
async function* orchestrateTools(
toolCalls: ToolCall[],
config: OrchestrationConfig
): AsyncGenerator<ToolEvent> {
// 第一步:验证所有工具调用
const validated = toolCalls.map(call => {
const result = validateToolCall(call);
return { call, error: result.valid ? null : result.error };
});
// 立即 yield 验证失败的结果
for (const v of validated.filter(v => v.error)) {
yield {
type: "tool_error",
toolCallId: v.call.id,
error: v.error,
};
}
const validCalls = validated
.filter(v => !v.error)
.map(v => v.call);
// 第二步:检测冲突,拆分为可并发组
const groups = splitIntoConcurrentGroups(validCalls);
// 第三步:按组顺序执行,组内并发
for (const group of groups) {
const promises = group.map(async (call) => {
// 获取信号量
const semaphore = getSemaphore(call.name);
await semaphore.acquire();
try {
// 带超时执行
const timeout = getTimeout(call.name, config);
const result = await executeWithTimeout(call, timeout);
return { type: "tool_result" as const, ...result };
} catch (error) {
return {
type: "tool_error" as const,
toolCallId: call.id,
error: String(error),
};
} finally {
semaphore.release();
}
});
// 竞速 yield 结果
const pending = new Set(promises);
while (pending.size > 0) {
const settled = await Promise.race(
[...pending].map(p => p.then(v => ({ p, v })))
);
pending.delete(settled.p);
yield settled.v;
}
}
}
五层机制回顾
这个实现体现了几个关键设计决策:
- 验证前置:在任何执行之前完成所有验证,尽早失败
- 分组并发:有冲突的调用被分到不同组,组间串行、组内并发
- 信号量限流:每种工具类型有独立的并发上限
- 带超时执行:每个工具调用都有独立的超时控制
- 流式返回:结果按完成顺序 yield,不阻塞
这五层机制叠加起来,形成了一个既高效又安全的工具执行引擎。实际产品不一定显式实现这五个抽象,但它们对应的责任不能消失:验证、分组、限流、超时、流式返回都要有落点。
6.8.1 本地快照:Claude Code 真实工具编排比教学模型更朴素
§6.8 的 orchestrateTools 是教学伪代码,展示”五层叠加”的概念。打开本地 ../claude-code-main/src/services/tools/ 看真实代码,可以看到更紧凑的实现:
| 文件 | 行 | 角色 |
|---|---|---|
toolOrchestration.ts | 188 | 本章主角的真身——runTools / partitionToolCalls / runToolsSerially / runToolsConcurrently |
toolExecution.ts | 1745 | 单个工具执行(runToolUse)—— 远比编排重 |
toolHooks.ts | 650 | 工具前后 hook |
StreamingToolExecutor.ts | 530 | 流式工具结果聚合 |
| 合计 | 3113 | — |
toolOrchestration.ts 只有 188 行,核心逻辑集中在两段:runTools() 根据 batch 类型选择并发或串行(../claude-code-main/src/services/tools/toolOrchestration.ts:19-82),partitionToolCalls() 用一次 reduce 把工具调用分成 batch(../claude-code-main/src/services/tools/toolOrchestration.ts:86-116)。
// toolOrchestration.ts:91-116(本地快照)
function partitionToolCalls(toolUseMessages, toolUseContext): Batch[] {
return toolUseMessages.reduce((acc, toolUse) => {
const tool = findToolByName(...)
const parsedInput = tool?.inputSchema.safeParse(toolUse.input)
const isConcurrencySafe = parsedInput?.success
? (() => {
try { return Boolean(tool?.isConcurrencySafe(parsedInput.data)) }
catch { return false } // 抛错就当成不安全 ← 防御性编码
})()
: false
// 连续 safe 的合并到同一 batch、否则新开 batch
if (isConcurrencySafe && acc[acc.length - 1]?.isConcurrencySafe) {
acc[acc.length - 1]!.blocks.push(toolUse)
} else {
acc.push({ isConcurrencySafe, blocks: [toolUse] })
}
return acc
}, [])
}
三条源码事实可以校正 §6.8 的伪代码理解:
- 没有”冲突图”或
splitIntoConcurrentGroups这种复杂算法。Claude Code 只是维护一个连续序列,碰到isConcurrencySafe = false就开新 batch,连续 safe 的合并到当前 batch(../claude-code-main/src/services/tools/toolOrchestration.ts:95-115)。 - 工具自己声明并发安全。
tool.isConcurrencySafe(parsedInput)是每个工具自带的运行时方法;如果输入解析失败或判断过程抛错,编排层保守地视为不安全(../claude-code-main/src/services/tools/toolOrchestration.ts:96-108)。 - 并发上限集中配置。
CLAUDE_CODE_MAX_TOOL_USE_CONCURRENCY环境变量缺省为 10(../claude-code-main/src/services/tools/toolOrchestration.ts:8-11),并作为 concurrency 参数传给all(...)(../claude-code-main/src/services/tools/toolOrchestration.ts:152-176)。
对比下来,教学伪代码把验证、分组、信号量、超时、流式都摊开讲;真实代码把并发分组做得很薄,把复杂度推到工具执行、工具 schema、权限和工具自己的 isConcurrencySafe 中。这不是教学伪代码错了,而是”概念清晰”和”工程精简”的取舍。读者建立心智模型用前者,读源码理解实际实现用后者。
6.8.2 编排事件模型
工具编排如果只返回最终结果,调试会非常困难。生产系统应该把工具执行过程拆成事件流,让 UI、日志、评估和恢复逻辑都能订阅同一组事实:
| 事件 | 触发时机 | 用途 |
|---|---|---|
tool_scheduled | 工具通过验证、进入队列 | 记录模型想做什么 |
tool_started | 工具开始执行 | UI 显示进度,计时开始 |
tool_progress | 长任务产生中间状态 | 避免用户误以为卡死 |
tool_succeeded | 工具正常结束 | 记录输出摘要和资源变化 |
tool_failed | 工具返回错误 | 供模型恢复或触发降级 |
tool_cancelled | 用户中断或全局超时 | 清理资源、停止后续 batch |
batch_completed | 一个并发组全部结束 | 决定是否进入下一组 |
事件模型的关键是不要把大结果直接塞进日志。文件内容、命令输出、网络响应都可能很大,也可能包含敏感信息。更稳的做法是日志记录摘要、大小、耗时、退出码、资源路径和错误类型;完整内容只进入受控的消息上下文或临时存储。
type ToolEvent =
| { type: "tool_scheduled"; id: string; name: string; resourceKeys: string[] }
| { type: "tool_started"; id: string; startedAt: number }
| { type: "tool_succeeded"; id: string; durationMs: number; summary: string }
| { type: "tool_failed"; id: string; durationMs: number; errorType: string }
| { type: "tool_cancelled"; id: string; reason: "user" | "timeout" | "budget" }
有了事件流,很多能力会自然出现:终端可以实时显示进度,可观测性系统可以计算慢工具和失败工具,恢复逻辑可以判断是否重试,评估系统可以复盘模型是否发起了多余工具调用。并发编排不是只追求快,它还要让执行过程可解释、可中断、可恢复。没有事件流的并发系统,出了问题只能看最终错误;有事件流的系统,才能定位是哪一个 batch、哪一个工具、哪一种资源冲突导致失败。
6.9 与 LangGraph 的对比
LangGraph 对工具执行采取了截然不同的架构思路。在 LangGraph 中,工具执行被建模为图中的一个节点(ToolNode):
from langgraph.prebuilt import ToolNode
tool_node = ToolNode([search_tool, calculator_tool, file_tool])
graph = StateGraph(AgentState)
graph.add_node("agent", call_model)
graph.add_node("tools", tool_node)
graph.add_edge("agent", "tools")
graph.add_edge("tools", "agent")
ToolNode 内部也支持并发执行——当 LLM 返回多个工具调用时,ToolNode 会并发执行它们。但关键区别在于抽象层次。
两种架构的权衡
Claude Code 把工具编排看作 Agent Loop 的内部实现细节。工具执行嵌在循环体内部,编排逻辑和循环逻辑紧密耦合。
LangGraph 则把工具执行外化为图的一个可组合节点。好处是清晰的职责分离和强大的可组合性——你可以在 ToolNode 前后插入任意的预处理和后处理节点(比如权限检查节点、日志节点)。
| 维度 | Claude Code | LangGraph |
|---|---|---|
| 抽象模型 | 循环内嵌编排 | 图节点 |
| 并发机制 | async generator + Promise.allSettled | asyncio.gather |
| 流式支持 | 原生,按完成顺序 yield | 需要额外配置 |
| 可组合性 | 低,紧耦合 | 高,节点可插拔 |
| 性能开销 | 低 | 中等 |
| 状态管理 | 手动 | 图状态自动传递 |
| 学习曲线 | 低 | 中 |
| 调试难度 | 低(线性代码) | 中(图拓扑) |
如何选择
两种方式没有绝对的优劣:
- Claude Code 方式 更适合单体 Agent 应用,追求极致的执行效率和用户体验
- LangGraph 方式 更适合复杂的多步编排场景,追求架构的清晰和灵活
graph TD
Q{场景?}
Q -->|单用户 / CLI / IDE| CC[Claude Code 模式<br/>循环内嵌]
Q -->|多租户服务 / 复杂工作流| LG[LangGraph 模式<br/>图节点化]
Q -->|简单工具调用| CC
Q -->|需要持久化状态| LG
Q -->|需要时间旅行调试| LG
style CC fill:#dcfce7,stroke:#22c55e
style LG fill:#dbeafe,stroke:#3b82f6
6.10 四个反模式
反模式一:一律顺序执行
现象:即使是独立的工具调用也强制串行。
后果:独立 I/O 被串成等待链,用户感知延迟和重复推理成本都会上升。
对策:默认并发,只在检测到资源冲突时串行。
反模式二:无限制并发
现象:Promise.all(1000 个工具调用),没有信号量。
后果:系统资源爆炸、API rate limit 触发、OOM。
对策:信号量限流,按工具类型设上限。
反模式三:一把锁锁全局
现象:一个全局 Mutex 保护所有工具调用。
后果:等价于顺序执行,失去并发的价值。
对策:细粒度锁——按资源(文件、URL)加锁,而非全局。
反模式四:没有超时
现象:工具执行没有超时上限。
后果:一个卡住的网络请求冻结整个 Agent。
对策:每个工具调用都有超时;Agent 循环有全局超时。
6.11 本章小结:工具编排的七条原则
工具编排是 Harness Engineering 中最”工程”的部分之一。它不涉及 AI 理论,纯粹是软件工程问题——并发控制、资源管理、超时处理、输入验证。但正是这些”无聊”的工程细节,决定了 Agent 在生产环境中是流畅运行还是频繁卡死。
核心要点
- 从顺序到并发是效率的关键跳跃——当模型能一次表达多个独立工具调用时,Harness 应该利用并发减少等待链路
- 异步生成器 + 流式返回——不要让用户等最慢的工具
- 依赖管理可以信任模型但要兜底——模型通常能正确处理显式依赖,但隐式依赖需要 Harness 层保护
- 资源管理用信号量模式——不同工具类型设置不同的并发上限,防止资源耗尽
- 验证是最便宜的保险——在执行前验证参数,远比在执行后处理错误成本低
- 超时要分层——单工具超时防止个别调用卡死,全局超时防止 Agent 无限循环
- 批处理是减少重复开销的优化——当相似操作可以合并时,合并执行能减少重复推理、重复 I/O 和结果映射成本
核心口号
Orchestrate, don’t just execute. The difference between
await tool()andawait orchestrate(tools)is the difference between a toy and a product.
下一章我们将讨论当工具执行出错时怎么办——工具错误恢复是工具编排的另一面。设计得再好的编排系统,也无法避免工具在运行时失败,关键是如何优雅地处理这些失败。