Skip to content

第7章 工具编排与并发执行

当模型在一次响应中同时调用 Grep、Read、Glob 三个工具搜索代码库时,这些调用是顺序执行还是并行执行?当模型一边读文件一边写文件时,系统如何保证写操作不会和读操作产生竞态条件?当某个 Bash 命令执行失败时,还在排队的其他工具调用该怎么处理?

这些问题的答案,隐藏在 Claude Code 的工具编排层中。这一层位于 API 响应解析和工具实际执行之间,它决定了哪些工具可以并行运行、哪些必须串行等待,以及在整个过程中如何维护上下文一致性。

本章将深入这个编排层的完整实现,从批次分区算法到并发执行引擎,从 Pre/Post Hooks 管道到文件状态追踪,揭示一个生产级 Agent 系统是如何高效且安全地管理数十个工具的并发调度的。理解工具编排的设计,不仅有助于理解 Claude Code 本身的行为特征(比如为什么有些操作明显更快),更能为构建自己的 Agent 系统提供可直接复用的工程模式。

本章要点

  • 工具编排的核心是 runTools() async generator:它从 API 响应中提取工具调用块,分批执行并流式产出结果
  • 批次分区算法:连续的读操作合并为一个并发批次,写操作独占一个串行批次
  • 双模式执行引擎runToolsConcurrently() 利用 all() 并发调度器实现有上限的并行执行,runToolsSerially() 保证写操作的顺序一致性
  • 完整的工具执行管道:每个工具调用经历输入校验、Pre-Hook、权限检查、实际执行、Post-Hook、结果收集六个阶段
  • 文件状态缓存FileStateCache 基于 LRU 策略缓存已读文件,避免重复 IO;FileHistory 在每次写操作前创建备份,支持回退
  • Hook 系统PreToolUsePostToolUse 钩子允许用户自定义命令在工具执行前后介入,实现权限控制、日志记录、输入改写等高级功能

7.1 工具编排总览

从 API 响应到工具执行

在第5章中,我们分析了 Claude Code 的流式响应处理。当 API 返回的助手消息中包含 tool_use 类型的内容块时,查询引擎需要提取这些工具调用并编排它们的执行。这个提取过程发生在 query.ts 的主循环中:

typescript
// 源码位置: src/query.ts
const toolUseBlocks: ToolUseBlock[] = []

// 流式处理中,每当检测到 tool_use 块就收集起来
const assistantMessages: AssistantMessage[] = []

// 流结束后,将工具调用交给编排层
const toolUpdates = streamingToolExecutor
  ? streamingToolExecutor.getRemainingResults()
  : runTools(toolUseBlocks, assistantMessages, canUseTool, toolUseContext)

for await (const update of toolUpdates) {
  if (update.message) {
    // 将工具执行结果加入消息流
  }
}

这里有一个关键的分支:系统可以使用 StreamingToolExecutor(流式工具执行器,在 API 响应流式传入过程中就开始执行工具)或者 runTools()(批量编排,等所有工具调用解析完毕后统一调度)。两者的编排逻辑相似,但 runTools() 是更核心的抽象,它清晰地展示了编排层的完整决策链路。

之所以存在两种模式,是因为实际场景中存在一个权衡:流式模式可以让工具尽早开始执行(API 还在返回后续 tool_use 块时,前面的工具已经在运行了),带来更低的端到端延迟;而批量模式拥有全部工具调用的全局视图,可以做出更优的分区决策。对于大多数交互式场景,流式模式的延迟优势更有价值;在某些需要精确控制执行顺序的场景(如查询辅助工具调用),批量模式更为可靠。

runTools() 的结构

runTools() 是整个编排层的入口,定义在 src/services/tools/toolOrchestration.ts 中。它是一个 async generator 函数,接收工具调用块列表,产出执行结果流:

typescript
// 源码位置: src/services/tools/toolOrchestration.ts
export async function* runTools(
  toolUseMessages: ToolUseBlock[],
  assistantMessages: AssistantMessage[],
  canUseTool: CanUseToolFn,
  toolUseContext: ToolUseContext,
): AsyncGenerator<MessageUpdate, void> {
  let currentContext = toolUseContext
  for (const { isConcurrencySafe, blocks } of partitionToolCalls(
    toolUseMessages,
    currentContext,
  )) {
    if (isConcurrencySafe) {
      // 并发执行读操作批次
      const queuedContextModifiers: Record<
        string,
        ((context: ToolUseContext) => ToolUseContext)[]
      > = {}
      for await (const update of runToolsConcurrently(
        blocks, assistantMessages, canUseTool, currentContext,
      )) {
        if (update.contextModifier) {
          const { toolUseID, modifyContext } = update.contextModifier
          if (!queuedContextModifiers[toolUseID]) {
            queuedContextModifiers[toolUseID] = []
          }
          queuedContextModifiers[toolUseID].push(modifyContext)
        }
        yield { message: update.message, newContext: currentContext }
      }
      // 批次结束后,按工具顺序应用上下文修改
      for (const block of blocks) {
        const modifiers = queuedContextModifiers[block.id]
        if (!modifiers) continue
        for (const modifier of modifiers) {
          currentContext = modifier(currentContext)
        }
      }
      yield { newContext: currentContext }
    } else {
      // 串行执行写操作批次
      for await (const update of runToolsSerially(
        blocks, assistantMessages, canUseTool, currentContext,
      )) {
        if (update.newContext) {
          currentContext = update.newContext
        }
        yield { message: update.message, newContext: currentContext }
      }
    }
  }
}

这段代码的核心逻辑可以归纳为三步:分区 -> 判断 -> 执行。首先调用 partitionToolCalls() 将工具调用分为若干批次,然后根据每个批次的 isConcurrencySafe 标志选择并发或串行执行路径。

值得注意的是 currentContext 的管理策略。对于并发批次,上下文修改器(contextModifier)不会在执行过程中立即生效,而是先缓存起来,等整个批次执行完毕后再按照工具的原始顺序依次应用。这保证了并发执行不会导致上下文状态的不确定性。对于串行批次,每个工具执行完毕后上下文修改立即生效,后续工具看到的是更新后的上下文。

使用 async generator 的设计哲学

选择 async generator 作为编排层的核心抽象并非偶然。这种模式带来三个关键优势:

第一,流式产出。工具执行的结果不需要等所有工具跑完才返回,而是执行一个就产出一个。对于 UI 层来说,用户可以实时看到每个工具的执行进度和结果。

第二,可组合性runTools() 产出的是 AsyncGenerator<MessageUpdate>,而 runToolsConcurrently()runToolsSerially() 也是同样类型的 generator。它们可以自由嵌套和组合,而不需要复杂的回调或事件机制。

第三,背压控制。当消费者(UI 层或查询引擎)处理不过来时,generator 会自然地暂停产出,避免内存无限增长。这在工具大量并发执行时尤为重要。

第四,取消语义。async generator 天然支持取消——消费者只需要停止迭代(break 出 for-await-of 循环),generator 就会被垃圾回收。相比之下,基于 Promise.all 的并发方案在需要提前终止时会面临很大的复杂性。

在 Claude Code 的整个代码库中,async generator 是一种无处不在的模式。从 API 流式响应、工具编排、Hook 执行到 UI 更新,几乎所有涉及"随时间逐步产出结果"的场景都使用了这种抽象。这不是偶然的选择,而是经过反复验证的架构决策——它为复杂的异步流程提供了一种线性的、可组合的编程模型。

下图展示了 runTools() 的完整编排流程,从 API 响应中的工具调用块到最终结果产出的全链路:

7.2 批次分区与并发决策

partitionToolCalls 算法

批次分区是编排层最关键的决策点。partitionToolCalls() 函数将一组工具调用分为若干批次,每个批次要么全部并发执行,要么包含一个串行执行的工具:

typescript
// 源码位置: src/services/tools/toolOrchestration.ts
type Batch = { isConcurrencySafe: boolean; blocks: ToolUseBlock[] }

function partitionToolCalls(
  toolUseMessages: ToolUseBlock[],
  toolUseContext: ToolUseContext,
): Batch[] {
  return toolUseMessages.reduce((acc: Batch[], toolUse) => {
    const tool = findToolByName(toolUseContext.options.tools, toolUse.name)
    const parsedInput = tool?.inputSchema.safeParse(toolUse.input)
    const isConcurrencySafe = parsedInput?.success
      ? (() => {
          try {
            return Boolean(tool?.isConcurrencySafe(parsedInput.data))
          } catch {
            return false
          }
        })()
      : false
    if (isConcurrencySafe && acc[acc.length - 1]?.isConcurrencySafe) {
      acc[acc.length - 1]!.blocks.push(toolUse)
    } else {
      acc.push({ isConcurrencySafe, blocks: [toolUse] })
    }
    return acc
  }, [])
}

这个算法使用了一次遍历的贪心策略,通过 reduce 累加器模式实现:从头到尾扫描所有工具调用,对每个工具判断其是否并发安全。如果当前工具是并发安全的,并且上一个批次也是并发安全的,就将当前工具追加到上一个批次(合并连续的读操作);否则开启一个新批次。注意这个算法是在线的(一次遍历),时间复杂度为 O(n),不需要回溯或全局优化。

基于 VitePress 构建