Appearance
第7章 工具编排与并发执行
当模型在一次响应中同时调用 Grep、Read、Glob 三个工具搜索代码库时,这些调用是顺序执行还是并行执行?当模型一边读文件一边写文件时,系统如何保证写操作不会和读操作产生竞态条件?当某个 Bash 命令执行失败时,还在排队的其他工具调用该怎么处理?
这些问题的答案,隐藏在 Claude Code 的工具编排层中。这一层位于 API 响应解析和工具实际执行之间,它决定了哪些工具可以并行运行、哪些必须串行等待,以及在整个过程中如何维护上下文一致性。
本章将深入这个编排层的完整实现,从批次分区算法到并发执行引擎,从 Pre/Post Hooks 管道到文件状态追踪,揭示一个生产级 Agent 系统是如何高效且安全地管理数十个工具的并发调度的。理解工具编排的设计,不仅有助于理解 Claude Code 本身的行为特征(比如为什么有些操作明显更快),更能为构建自己的 Agent 系统提供可直接复用的工程模式。
本章要点
- 工具编排的核心是
runTools()async generator:它从 API 响应中提取工具调用块,分批执行并流式产出结果 - 批次分区算法:连续的读操作合并为一个并发批次,写操作独占一个串行批次
- 双模式执行引擎:
runToolsConcurrently()利用all()并发调度器实现有上限的并行执行,runToolsSerially()保证写操作的顺序一致性 - 完整的工具执行管道:每个工具调用经历输入校验、Pre-Hook、权限检查、实际执行、Post-Hook、结果收集六个阶段
- 文件状态缓存:
FileStateCache基于 LRU 策略缓存已读文件,避免重复 IO;FileHistory在每次写操作前创建备份,支持回退 - Hook 系统:
PreToolUse和PostToolUse钩子允许用户自定义命令在工具执行前后介入,实现权限控制、日志记录、输入改写等高级功能
7.1 工具编排总览
从 API 响应到工具执行
在第5章中,我们分析了 Claude Code 的流式响应处理。当 API 返回的助手消息中包含 tool_use 类型的内容块时,查询引擎需要提取这些工具调用并编排它们的执行。这个提取过程发生在 query.ts 的主循环中:
typescript
// 源码位置: src/query.ts
const toolUseBlocks: ToolUseBlock[] = []
// 流式处理中,每当检测到 tool_use 块就收集起来
const assistantMessages: AssistantMessage[] = []
// 流结束后,将工具调用交给编排层
const toolUpdates = streamingToolExecutor
? streamingToolExecutor.getRemainingResults()
: runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)
for await (const update of toolUpdates) {
if (update.message) {
// 将工具执行结果加入消息流
}
}这里有一个关键的分支:系统可以使用 StreamingToolExecutor(流式工具执行器,在 API 响应流式传入过程中就开始执行工具)或者 runTools()(批量编排,等所有工具调用解析完毕后统一调度)。两者的编排逻辑相似,但 runTools() 是更核心的抽象,它清晰地展示了编排层的完整决策链路。
之所以存在两种模式,是因为实际场景中存在一个权衡:流式模式可以让工具尽早开始执行(API 还在返回后续 tool_use 块时,前面的工具已经在运行了),带来更低的端到端延迟;而批量模式拥有全部工具调用的全局视图,可以做出更优的分区决策。对于大多数交互式场景,流式模式的延迟优势更有价值;在某些需要精确控制执行顺序的场景(如查询辅助工具调用),批量模式更为可靠。
runTools() 的结构
runTools() 是整个编排层的入口,定义在 src/services/tools/toolOrchestration.ts 中。它是一个 async generator 函数,接收工具调用块列表,产出执行结果流:
typescript
// 源码位置: src/services/tools/toolOrchestration.ts
export async function* runTools(
toolUseMessages: ToolUseBlock[],
assistantMessages: AssistantMessage[],
canUseTool: CanUseToolFn,
toolUseContext: ToolUseContext,
): AsyncGenerator<MessageUpdate, void> {
let currentContext = toolUseContext
for (const { isConcurrencySafe, blocks } of partitionToolCalls(
toolUseMessages,
currentContext,
)) {
if (isConcurrencySafe) {
// 并发执行读操作批次
const queuedContextModifiers: Record<
string,
((context: ToolUseContext) => ToolUseContext)[]
> = {}
for await (const update of runToolsConcurrently(
blocks, assistantMessages, canUseTool, currentContext,
)) {
if (update.contextModifier) {
const { toolUseID, modifyContext } = update.contextModifier
if (!queuedContextModifiers[toolUseID]) {
queuedContextModifiers[toolUseID] = []
}
queuedContextModifiers[toolUseID].push(modifyContext)
}
yield { message: update.message, newContext: currentContext }
}
// 批次结束后,按工具顺序应用上下文修改
for (const block of blocks) {
const modifiers = queuedContextModifiers[block.id]
if (!modifiers) continue
for (const modifier of modifiers) {
currentContext = modifier(currentContext)
}
}
yield { newContext: currentContext }
} else {
// 串行执行写操作批次
for await (const update of runToolsSerially(
blocks, assistantMessages, canUseTool, currentContext,
)) {
if (update.newContext) {
currentContext = update.newContext
}
yield { message: update.message, newContext: currentContext }
}
}
}
}这段代码的核心逻辑可以归纳为三步:分区 -> 判断 -> 执行。首先调用 partitionToolCalls() 将工具调用分为若干批次,然后根据每个批次的 isConcurrencySafe 标志选择并发或串行执行路径。
值得注意的是 currentContext 的管理策略。对于并发批次,上下文修改器(contextModifier)不会在执行过程中立即生效,而是先缓存起来,等整个批次执行完毕后再按照工具的原始顺序依次应用。这保证了并发执行不会导致上下文状态的不确定性。对于串行批次,每个工具执行完毕后上下文修改立即生效,后续工具看到的是更新后的上下文。
使用 async generator 的设计哲学
选择 async generator 作为编排层的核心抽象并非偶然。这种模式带来三个关键优势:
第一,流式产出。工具执行的结果不需要等所有工具跑完才返回,而是执行一个就产出一个。对于 UI 层来说,用户可以实时看到每个工具的执行进度和结果。
第二,可组合性。runTools() 产出的是 AsyncGenerator<MessageUpdate>,而 runToolsConcurrently() 和 runToolsSerially() 也是同样类型的 generator。它们可以自由嵌套和组合,而不需要复杂的回调或事件机制。
第三,背压控制。当消费者(UI 层或查询引擎)处理不过来时,generator 会自然地暂停产出,避免内存无限增长。这在工具大量并发执行时尤为重要。
第四,取消语义。async generator 天然支持取消——消费者只需要停止迭代(break 出 for-await-of 循环),generator 就会被垃圾回收。相比之下,基于 Promise.all 的并发方案在需要提前终止时会面临很大的复杂性。
在 Claude Code 的整个代码库中,async generator 是一种无处不在的模式。从 API 流式响应、工具编排、Hook 执行到 UI 更新,几乎所有涉及"随时间逐步产出结果"的场景都使用了这种抽象。这不是偶然的选择,而是经过反复验证的架构决策——它为复杂的异步流程提供了一种线性的、可组合的编程模型。
下图展示了 runTools() 的完整编排流程,从 API 响应中的工具调用块到最终结果产出的全链路:
7.2 批次分区与并发决策
partitionToolCalls 算法
批次分区是编排层最关键的决策点。partitionToolCalls() 函数将一组工具调用分为若干批次,每个批次要么全部并发执行,要么包含一个串行执行的工具:
typescript
// 源码位置: src/services/tools/toolOrchestration.ts
type Batch = { isConcurrencySafe: boolean; blocks: ToolUseBlock[] }
function partitionToolCalls(
toolUseMessages: ToolUseBlock[],
toolUseContext: ToolUseContext,
): Batch[] {
return toolUseMessages.reduce((acc: Batch[], toolUse) => {
const tool = findToolByName(toolUseContext.options.tools, toolUse.name)
const parsedInput = tool?.inputSchema.safeParse(toolUse.input)
const isConcurrencySafe = parsedInput?.success
? (() => {
try {
return Boolean(tool?.isConcurrencySafe(parsedInput.data))
} catch {
return false
}
})()
: false
if (isConcurrencySafe && acc[acc.length - 1]?.isConcurrencySafe) {
acc[acc.length - 1]!.blocks.push(toolUse)
} else {
acc.push({ isConcurrencySafe, blocks: [toolUse] })
}
return acc
}, [])
}这个算法使用了一次遍历的贪心策略,通过 reduce 累加器模式实现:从头到尾扫描所有工具调用,对每个工具判断其是否并发安全。如果当前工具是并发安全的,并且上一个批次也是并发安全的,就将当前工具追加到上一个批次(合并连续的读操作);否则开启一个新批次。注意这个算法是在线的(一次遍历),时间复杂度为 O(n),不需要回溯或全局优化。