Harness Engineering

第3章 Agent Loop:心跳与决策循环

作者 杨艺韬 · 10,845 字

第3章 Agent Loop:心跳与决策循环

每一个 AI Agent 的核心都是一个循环——观察、思考、行动、再观察。这个循环的工程质量,决定了 Agent 是一个惊艳的 demo 还是一个可靠的生产系统。

3.1 从 OODA 到 Agent Loop

军事理论家 John Boyd 提出的 OODA 循环(Observe → Orient → Decide → Act)被广泛应用于决策理论。AI Agent 的核心循环本质上是 OODA 的工程化实现:

flowchart TD
    O["🔍 Observe\n接收输入/工具结果"] --> T["🧠 Think\n模型推理决策"]
    T --> D{"需要工具?"}
    D -->|是| A["⚡ Act\n执行工具调用"]
    A --> F["📥 Feedback\n工具返回结果"]
    F --> O
    D -->|否| R["✅ 输出最终回答"]

    style T fill:#fef3c7,stroke:#f59e0b
    style A fill:#dbeafe,stroke:#3b82f6
    style R fill:#dcfce7,stroke:#22c55e
  • Observe:接收用户输入或上一轮工具执行的结果
  • Think:大模型基于当前上下文进行推理,决定下一步动作
  • Act:调用工具、生成代码、发送请求
  • Feedback:工具返回结果,成为下一轮 Observe 的输入

看起来简单,但魔鬼藏在每一个箭头里。模型可能产生幻觉调用不存在的工具,工具可能超时或失败,循环可能陷入无限重试。一个生产级的 Agent Loop 需要处理所有这些情况。

让我们先看两个真实系统是怎么做的。

3.2 Claude Code 的 Agent Loop 实现

Claude Code 的 Agent Loop 是一个经典的 while 循环实现。剥去日志、遥测等非核心逻辑后,其核心骨架如下:

async function agentLoop(
  userMessage: string,
  context: ConversationContext
): Promise<void> {
  // 将用户消息加入对话历史
  context.messages.push({ role: "user", content: userMessage });

  let shouldContinue = true;

  while (shouldContinue) {
    // 1. Think: 调用模型
    const response = await queryModel({
      messages: context.messages,
      tools: context.availableTools,
      system: context.systemPrompt,
    });

    // 2. 将模型响应加入对话历史
    context.messages.push({ role: "assistant", content: response.content });

    // 3. 检查是否有工具调用
    const toolUses = response.content.filter(
      (block) => block.type === "tool_use"
    );

    if (toolUses.length === 0) {
      // 模型没有调用工具,说明它认为任务完成了
      shouldContinue = false;
      break;
    }

    // 4. Act: 执行工具调用(支持并行)
    const toolResults = await Promise.all(
      toolUses.map(async (toolUse) => {
        const result = await executeTool(toolUse.name, toolUse.input);
        return {
          type: "tool_result",
          tool_use_id: toolUse.id,
          content: result,
        };
      })
    );

    // 5. Feedback: 将工具结果加入对话历史
    context.messages.push({ role: "user", content: toolResults });

    // 6. 检查终止条件
    if (context.messages.length > MAX_TURNS) {
      shouldContinue = false;
    }
  }
}

这段代码揭示了几个关键设计决策:

决策一:循环的驱动力是工具调用。 模型不调用工具 = 任务完成。这是一个优雅的终止条件——不需要额外的”done”信号,模型自己通过行为表达”我做完了”。

决策二:工具并行执行。 Promise.all 意味着同一轮迭代中的多个工具调用是并发的。当模型同时请求读取三个文件时,三次 IO 并行发生,而不是串行等待。这在实践中能将某些迭代的耗时缩短数倍。

决策三:工具结果以 user 角色回传。 这是 Anthropic API 的约定——工具结果被包装成 user 消息,让模型在下一轮能”看到”工具的输出。这保持了对话的 user/assistant 交替结构。

但真实的 Claude Code 远比这复杂。让我们逐一看它处理的边界情况。

3.2.1 查询引擎与消息调度

Claude Code 的查询引擎不是简单地把消息丢给 API。它在发送之前要做大量预处理:

async function queryModelWithPreprocessing(
  context: ConversationContext
): Promise<ModelResponse> {
  // 1. 上下文窗口管理:如果消息太多,截断早期内容
  const messages = truncateToFitContext(context.messages, {
    maxTokens: MODEL_CONTEXT_LIMIT,
    strategy: "keep-recent-and-system",
  });

  // 2. 注入系统提醒(system reminder)
  //    在最后一条 user 消息中附加实时上下文
  const enrichedMessages = injectSystemReminder(messages, {
    currentDir: process.cwd(),
    gitStatus: await getGitStatus(),
    activeFile: context.activeFile,
  });

  // 3. 工具过滤:根据权限模型过滤可用工具
  const tools = filterToolsByPermission(
    context.availableTools,
    context.permissionLevel
  );

  // 4. 发送请求,启用流式响应
  const stream = await anthropic.messages.stream({
    model: context.model,
    messages: enrichedMessages,
    tools,
    system: context.systemPrompt,
    max_tokens: 16384,
  });

  return stream;
}

注意第 2 步的 injectSystemReminder。这是 Claude Code 的一个精妙设计:每次循环迭代都会在消息末尾注入最新的环境状态(当前目录、git 状态等)。这确保模型始终基于最新信息做决策,而不是依赖几轮之前的过时上下文。这个机制我们会在第 8 章”提示词架构”中详细讨论。

3.2.2 流式处理与实时反馈

在生产系统中,模型的推理可能需要 10-30 秒。如果让用户干等一个 loading 动画,体验是灾难性的。Claude Code 通过流式处理解决这个问题:

async function processStreamingResponse(
  stream: MessageStream,
  onText: (text: string) => void,
  onToolStart: (tool: ToolUse) => void,
  onToolEnd: (result: ToolResult) => void
): Promise<ModelResponse> {
  const contentBlocks: ContentBlock[] = [];
  let currentBlock: Partial<ContentBlock> | null = null;

  for await (const event of stream) {
    switch (event.type) {
      case "content_block_start":
        currentBlock = event.content_block;
        if (currentBlock.type === "tool_use") {
          onToolStart(currentBlock as ToolUse);
        }
        break;

      case "content_block_delta":
        if (event.delta.type === "text_delta") {
          // 文本流式输出——用户立刻看到模型的思考过程
          onText(event.delta.text);
        } else if (event.delta.type === "input_json_delta") {
          // 工具参数逐步到达——可以提前展示工具调用意图
          accumulateToolInput(currentBlock, event.delta.partial_json);
        }
        break;

      case "content_block_stop":
        contentBlocks.push(currentBlock as ContentBlock);
        currentBlock = null;
        break;

      case "message_stop":
        return { content: contentBlocks, stopReason: event.stop_reason };
    }
  }
}

流式处理不只是”好看”。它有两个关键工程价值:

  1. 用户感知延迟降低:第一个 token 到达的时间(TTFT)通常在 1-2 秒内,用户立刻知道系统在工作。
  2. 提前执行工具:某些实现会在工具参数完整但消息尚未结束时就开始执行工具,进一步压缩端到端延迟。

3.3 LangGraph 的状态机方法

如果说 Claude Code 的 Agent Loop 是”命令式的 while 循环”,那 LangGraph 就是”声明式的状态图”。LangGraph 基于 Pregel 执行引擎,将 Agent Loop 建模为一个有向图的遍历过程。

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    iteration_count: int

def call_model(state: AgentState) -> dict:
    """Think 节点:调用模型"""
    messages = state["messages"]
    response = model.invoke(messages)
    return {
        "messages": [response],
        "iteration_count": state["iteration_count"] + 1,
    }

def call_tools(state: AgentState) -> dict:
    """Act 节点:执行工具"""
    last_message = state["messages"][-1]
    tool_calls = last_message.tool_calls

    results = []
    for call in tool_calls:
        tool = tool_registry[call["name"]]
        result = tool.invoke(call["args"])
        results.append(ToolMessage(content=result, tool_call_id=call["id"]))

    return {"messages": results}

def should_continue(state: AgentState) -> str:
    """路由函数:决定下一步去哪个节点"""
    last_message = state["messages"][-1]

    # 终止条件 1:模型没有调用工具
    if not last_message.tool_calls:
        return "end"

    # 终止条件 2:超过最大迭代次数
    if state["iteration_count"] >= MAX_ITERATIONS:
        return "end"

    return "tools"

# 构建状态图
graph = StateGraph(AgentState)
graph.add_node("model", call_model)
graph.add_node("tools", call_tools)
graph.set_entry_point("model")
graph.add_conditional_edges("model", should_continue, {
    "tools": "tools",
    "end": END,
})
graph.add_edge("tools", "model")

# 编译为可执行的 Runnable
agent = graph.compile()

这种方法和 while 循环在运行时行为上是等价的,但在工程特性上有显著差异:

特性while 循环(Claude Code)状态图(LangGraph)
可视化需要额外日志图结构天然可视化
中断恢复需要手动序列化状态快照内置
分支逻辑if/else 嵌套条件边,声明式
调试断点+日志节点级别 trace
并发控制开发者自己管理Pregel 引擎管理
灵活性极高受图结构约束

3.3.1 Pregel 执行引擎

LangGraph 的执行引擎借鉴了 Google 的 Pregel 图计算模型。每个”超级步”(superstep)中,所有就绪的节点并行执行,结果汇入共享状态,然后进入下一个超级步。

class PregelExecutor:
    def __init__(self, graph, state):
        self.graph = graph
        self.state = state
        self.checkpoint_store = CheckpointStore()

    async def run(self):
        current_node = self.graph.entry_point

        while current_node != END:
            # 1. 保存检查点(支持中断恢复)
            self.checkpoint_store.save(self.state, step=current_node)

            # 2. 执行当前节点
            node_fn = self.graph.nodes[current_node]
            updates = await node_fn(self.state)

            # 3. 合并状态更新
            self.state = merge_state(self.state, updates)

            # 4. 评估出边,决定下一个节点
            edges = self.graph.get_edges(current_node)
            current_node = evaluate_edges(edges, self.state)

        return self.state

检查点机制是 LangGraph 的杀手级特性之一。当 Agent 执行到一半需要人工审批时(比如确认是否执行一个危险的数据库操作),可以保存当前状态、挂起执行、等待人工介入、然后从断点恢复。这种能力在 while 循环模式下需要大量额外工程来实现。

3.4 循环终止:最被低估的工程问题

一个失控的 Agent Loop 能在几分钟内烧掉几十美元的 API 费用,更糟糕的是可能执行数百次无意义的工具调用对外部系统造成副作用。循环终止条件的设计是 Agent Loop 中最被低估的工程问题。

flowchart TD
    L["循环迭代完成"] --> C1{"模型停止调用工具?"}
    C1 -->|是| T1["✅ 自然终止"]
    C1 -->|否| C2{"迭代次数 ≥ 上限?"}
    C2 -->|是| T2["⛔ 强制终止\n(max iterations)"]
    C2 -->|否| C3{"Token 预算耗尽?"}
    C3 -->|是| T3["⛔ 预算终止\n(token budget)"]
    C3 -->|否| C4{"墙钟超时?"}
    C4 -->|是| T4["⛔ 超时终止\n(wall time)"]
    C4 -->|否| C5{"检测到死循环?"}
    C5 -->|是| T5["⛔ 熔断终止\n(circuit breaker)"]
    C5 -->|否| N["继续下一轮迭代"]

    style T1 fill:#dcfce7,stroke:#22c55e
    style T2 fill:#fee2e2,stroke:#ef4444
    style T3 fill:#fee2e2,stroke:#ef4444
    style T4 fill:#fee2e2,stroke:#ef4444
    style T5 fill:#fee2e2,stroke:#ef4444

3.4.1 终止条件的完整清单

一个生产级的 Agent Loop 至少需要以下终止条件:

function checkTermination(context: LoopContext): TerminationReason | null {
  // 1. 自然终止:模型不再调用工具
  if (context.lastResponse.stopReason === "end_turn") {
    return { reason: "natural", message: "模型认为任务完成" };
  }

  // 2. 迭代上限
  if (context.iterationCount >= context.maxIterations) {
    return {
      reason: "max_iterations",
      message: `达到最大迭代次数 ${context.maxIterations}`,
    };
  }

  // 3. Token 预算耗尽
  if (context.totalTokensUsed >= context.tokenBudget) {
    return {
      reason: "token_budget",
      message: `Token 使用量 ${context.totalTokensUsed} 超出预算`,
    };
  }

  // 4. 时间超限
  if (Date.now() - context.startTime >= context.timeoutMs) {
    return {
      reason: "timeout",
      message: `执行时间超过 ${context.timeoutMs / 1000} 秒`,
    };
  }

  // 5. 循环检测:连续相同的工具调用
  if (detectRepeatingPattern(context.toolCallHistory)) {
    return {
      reason: "loop_detected",
      message: "检测到重复的工具调用模式",
    };
  }

  // 6. 致命错误累积
  if (context.consecutiveErrors >= context.maxConsecutiveErrors) {
    return {
      reason: "error_threshold",
      message: `连续 ${context.consecutiveErrors} 次错误`,
    };
  }

  return null; // 继续执行
}

3.4.2 循环检测的工程实现

循环检测值得单独讨论。最简单的方法是检查连续 N 次工具调用是否完全相同:

function detectRepeatingPattern(
  history: ToolCall[],
  windowSize: number = 3
): boolean {
  if (history.length < windowSize * 2) return false;

  const recent = history.slice(-windowSize);
  const previous = history.slice(-windowSize * 2, -windowSize);

  // 比较最近的 N 次调用和之前的 N 次调用是否相同
  return recent.every(
    (call, i) =>
      call.name === previous[i].name &&
      JSON.stringify(call.input) === JSON.stringify(previous[i].input)
  );
}

但实际中更常见的是”语义循环”——模型不断尝试略有不同的参数调用同一个工具,每次都失败。对付这种情况需要更高级的策略:

function detectSemanticLoop(
  history: ToolCall[],
  windowSize: number = 5
): boolean {
  if (history.length < windowSize) return false;

  const recent = history.slice(-windowSize);

  // 统计最近 N 次中同一个工具被调用的比例
  const toolCounts = new Map<string, number>();
  for (const call of recent) {
    toolCounts.set(call.name, (toolCounts.get(call.name) || 0) + 1);
  }

  // 如果同一个工具占了 80% 以上的调用,且都返回了错误
  for (const [toolName, count] of toolCounts) {
    if (count / windowSize >= 0.8) {
      const recentResults = recent
        .filter((c) => c.name === toolName)
        .map((c) => c.result);
      const allFailed = recentResults.every((r) => r.isError);
      if (allFailed) return true;
    }
  }

  return false;
}

3.5 错误处理:让循环具备韧性

在生产环境中,Agent Loop 中的每一步都可能失败。工具可能超时,API 可能限流,模型可能返回畸形的 JSON。一个健壮的循环需要在不同层次处理错误。

3.5.1 工具级错误处理

最常见的错误发生在工具执行阶段。关键原则是:不要让工具的失败终止循环,而是把错误信息交给模型,让模型决定下一步。

async function executeToolSafely(
  toolName: string,
  toolInput: unknown
): Promise<ToolResult> {
  try {
    // 设置单个工具的执行超时
    const result = await withTimeout(
      executeTool(toolName, toolInput),
      TOOL_TIMEOUT_MS
    );

    return { content: result, isError: false };
  } catch (error) {
    if (error instanceof TimeoutError) {
      return {
        content: `工具 ${toolName} 执行超时(${TOOL_TIMEOUT_MS / 1000}秒)。` +
          `请尝试减小操作范围或使用其他方式。`,
        isError: true,
      };
    }

    if (error instanceof PermissionError) {
      return {
        content: `权限不足:${error.message}。该操作需要用户确认。`,
        isError: true,
      };
    }

    // 未知错误:返回有用的错误信息,但不暴露内部细节
    return {
      content: `工具 ${toolName} 执行失败:${error.message}。` +
        `你可以尝试不同的参数或使用其他工具。`,
      isError: true,
    };
  }
}

注意错误消息的措辞。我们不只是说”失败了”,而是给模型提供行动建议:“请尝试减小操作范围”、“你可以尝试不同的参数”。这种设计让模型有机会自我纠正,而不是在相同的错误上反复碰壁。

3.5.2 模型级错误处理

模型本身也可能出错——返回无效的工具调用、产生幻觉工具名、或者 API 请求失败:

async function queryModelWithRetry(
  messages: Message[],
  tools: Tool[],
  maxRetries: number = 3
): Promise<ModelResponse> {
  let lastError: Error | null = null;

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      const response = await queryModel(messages, tools);

      // 验证响应中的工具调用是否合法
      for (const block of response.content) {
        if (block.type === "tool_use") {
          if (!tools.some((t) => t.name === block.name)) {
            // 模型幻觉了一个不存在的工具
            // 不重试,而是将错误信息反馈给模型
            return createErrorFeedback(
              `工具 "${block.name}" 不存在。可用工具:${tools.map((t) => t.name).join(", ")}`
            );
          }
        }
      }

      return response;
    } catch (error) {
      lastError = error;

      if (error instanceof RateLimitError) {
        // 限流:指数退避重试
        const backoffMs = Math.min(1000 * 2 ** attempt, 30000);
        await sleep(backoffMs);
        continue;
      }

      if (error instanceof OverloadedError) {
        // 服务过载:等待更长时间
        await sleep(5000 * (attempt + 1));
        continue;
      }

      // 其他错误不重试
      throw error;
    }
  }

  throw new MaxRetriesError(`模型调用失败 ${maxRetries} 次`, lastError);
}

这里有一个细微但重要的区别:网络层错误(限流、过载)通过重试处理,逻辑层错误(幻觉工具名)通过反馈处理。 重试是对外部环境的容错;反馈是对模型行为的纠正。把它们混为一谈是很多 Agent 系统不稳定的根源。

3.6 并发:单次迭代内的并行执行

Claude Code 支持模型在一次响应中返回多个工具调用,并且并行执行它们。这个能力在实际使用中非常重要——想象模型需要读取 5 个文件来理解一个 bug,如果串行读取需要 5 次 IO,并行只需要 1 次。

但并行执行带来了新的工程问题:

async function executeToolsConcurrently(
  toolUses: ToolUse[],
  concurrencyLimit: number = 10
): Promise<ToolResult[]> {
  // 使用信号量控制并发数
  const semaphore = new Semaphore(concurrencyLimit);
  const results: ToolResult[] = [];

  const promises = toolUses.map(async (toolUse, index) => {
    await semaphore.acquire();
    try {
      const result = await executeToolSafely(toolUse.name, toolUse.input);
      results[index] = {
        type: "tool_result",
        tool_use_id: toolUse.id,
        content: result.content,
        is_error: result.isError,
      };
    } finally {
      semaphore.release();
    }
  });

  // 使用 allSettled 而非 all——一个工具失败不应阻塞其他工具
  await Promise.allSettled(promises);

  return results;
}

关键设计点:

  1. 并发上限:不设上限可能导致系统资源耗尽。Claude Code 默认限制为合理的并发数。
  2. allSettled 而非 allPromise.all 在任一 Promise 失败时就会 reject。但工具 A 的失败不应该影响工具 B 的结果。allSettled 确保所有工具都有机会完成。
  3. 结果顺序保持:通过 results[index] 保证结果数组的顺序与工具调用数组一致,这对模型理解结果至关重要。

3.6.1 工具间的依赖与冲突

并行执行还要考虑工具之间的语义冲突。比如模型同时调用”写入文件 A”和”读取文件 A”——执行顺序会影响结果的正确性。

Claude Code 的策略是简洁的:信任模型。 如果模型同时发出了两个工具调用,它应该对并行执行有预期。在实践中,模型几乎总是在同一批次中发出无依赖的并行请求(比如同时读取多个不同的文件),很少出现冲突情况。

但如果你在构建自己的 Agent 系统且无法完全信任模型的判断,可以引入工具冲突检测:

function partitionByConflict(toolUses: ToolUse[]): ToolUse[][] {
  const groups: ToolUse[][] = [];
  const resourceLocks = new Map<string, number>();

  for (const toolUse of toolUses) {
    const resources = getAffectedResources(toolUse);
    let conflictGroup = -1;

    for (const resource of resources) {
      if (resourceLocks.has(resource)) {
        conflictGroup = Math.max(conflictGroup, resourceLocks.get(resource)!);
      }
    }

    const targetGroup = conflictGroup + 1;
    if (!groups[targetGroup]) groups[targetGroup] = [];
    groups[targetGroup].push(toolUse);

    for (const resource of resources) {
      resourceLocks.set(resource, targetGroup);
    }
  }

  return groups; // 组内并行,组间串行
}

3.7 心跳机制:让用户知道”我还活着”

Agent 执行复杂任务可能需要几分钟。在这段时间里,用户最大的焦虑来源不是”慢”,而是”不知道在干什么”。心跳机制(heartbeat)解决的就是这个问题。

3.7.1 多层级的进度反馈

一个完善的心跳系统应该提供多个层级的信息:

interface HeartbeatSystem {
  // 第一层:循环级别——当前是第几轮迭代
  onIterationStart(iteration: number, totalEstimate?: number): void;

  // 第二层:工具级别——正在执行什么工具
  onToolStart(toolName: string, toolInput: unknown): void;
  onToolEnd(toolName: string, result: ToolResult): void;

  // 第三层:文本级别——模型正在生成什么内容
  onTextDelta(text: string): void;

  // 第四层:元信息——资源消耗情况
  onMetrics(metrics: {
    tokensUsed: number;
    elapsedMs: number;
    toolCallCount: number;
  }): void;
}

Claude Code 的实现是终端 UI 驱动的。每次工具开始执行时,界面上会显示一个旋转的 spinner 和工具名称;执行完成后显示结果摘要。模型生成的文本则实时流式输出。这看起来是 UI 细节,但它本质上是 Agent Loop 的一部分——反馈回路不只包括给模型的反馈,也包括给用户的反馈。

3.7.2 进度估算

心跳的一个进阶需求是进度估算。用户不仅想知道”在干什么”,还想知道”大概还要多久”。

class ProgressEstimator {
  private history: { taskType: string; iterations: number; durationMs: number }[] = [];

  estimateProgress(
    currentIteration: number,
    taskType: string
  ): { percent: number; remainingMs: number } | null {
    // 基于历史数据估算
    const similar = this.history.filter((h) => h.taskType === taskType);
    if (similar.length < 3) return null; // 数据不足,不估算

    const avgIterations = similar.reduce((s, h) => s + h.iterations, 0) / similar.length;
    const avgDuration = similar.reduce((s, h) => s + h.durationMs, 0) / similar.length;

    const percent = Math.min((currentIteration / avgIterations) * 100, 95);
    const remainingMs = Math.max(
      avgDuration * (1 - currentIteration / avgIterations),
      0
    );

    return { percent, remainingMs };
  }

  recordCompletion(taskType: string, iterations: number, durationMs: number): void {
    this.history.push({ taskType, iterations, durationMs });
    // 只保留最近 100 条记录
    if (this.history.length > 100) this.history.shift();
  }
}

注意 Math.min(..., 95) 这个细节。永远不要告诉用户”99% 完成”然后让他们再等 2 分钟——这比不给进度条更让人抓狂。上限设为 95%,直到真正完成才跳到 100%。

3.8 完整的生产级 Agent Loop

把前面讨论的所有机制组合起来,一个生产级的 Agent Loop 长这样:

async function productionAgentLoop(
  userMessage: string,
  context: ConversationContext,
  heartbeat: HeartbeatSystem
): Promise<AgentResult> {
  context.messages.push({ role: "user", content: userMessage });

  const startTime = Date.now();
  let iterationCount = 0;

  while (true) {
    iterationCount++;
    heartbeat.onIterationStart(iterationCount);

    // ---- Think ----
    let response: ModelResponse;
    try {
      response = await queryModelWithRetry(
        preprocessMessages(context),
        getPermittedTools(context),
        3 // max retries
      );
    } catch (error) {
      return {
        status: "error",
        message: `模型调用失败:${error.message}`,
        iterations: iterationCount,
      };
    }

    // 流式输出模型的文本内容
    for (const block of response.content) {
      if (block.type === "text") {
        heartbeat.onTextDelta(block.text);
      }
    }

    context.messages.push({ role: "assistant", content: response.content });

    // ---- 提取工具调用 ----
    const toolUses = response.content.filter(
      (block) => block.type === "tool_use"
    );

    // ---- 检查终止条件 ----
    const termination = checkTermination({
      lastResponse: response,
      iterationCount,
      startTime,
      maxIterations: context.config.maxIterations ?? 50,
      timeoutMs: context.config.timeoutMs ?? 600_000,
      totalTokensUsed: context.totalTokens,
      tokenBudget: context.config.tokenBudget ?? 1_000_000,
      toolCallHistory: context.toolCallHistory,
      consecutiveErrors: context.consecutiveErrors,
      maxConsecutiveErrors: 5,
    });

    if (termination) {
      heartbeat.onMetrics({
        tokensUsed: context.totalTokens,
        elapsedMs: Date.now() - startTime,
        toolCallCount: context.toolCallHistory.length,
      });
      return {
        status: termination.reason === "natural" ? "success" : "terminated",
        message: termination.message,
        iterations: iterationCount,
      };
    }

    // ---- Act: 并行执行工具 ----
    for (const toolUse of toolUses) {
      heartbeat.onToolStart(toolUse.name, toolUse.input);
    }

    const toolResults = await executeToolsConcurrently(toolUses);

    for (let i = 0; i < toolUses.length; i++) {
      heartbeat.onToolEnd(toolUses[i].name, toolResults[i]);
      context.toolCallHistory.push({
        name: toolUses[i].name,
        input: toolUses[i].input,
        result: toolResults[i],
      });

      // 更新连续错误计数
      if (toolResults[i].is_error) {
        context.consecutiveErrors++;
      } else {
        context.consecutiveErrors = 0;
      }
    }

    // ---- Feedback: 工具结果回传 ----
    context.messages.push({ role: "user", content: toolResults });
  }
}

3.9 方法论提炼

回顾本章内容,我们可以提炼出关于 Agent Loop 设计的几条核心方法论:

方法论一:终止条件是首要设计对象。 大多数人设计 Agent Loop 时先想”怎么跑起来”,但生产系统中最先爆出问题的是”怎么停下来”。你的终止条件清单至少应该包括:自然终止、迭代上限、token 预算、时间超限、循环检测、错误累积阈值。

方法论二:错误分层处理,不要一刀切。 网络层错误用重试解决,逻辑层错误用反馈解决,资源层错误用终止解决。把工具失败的信息交给模型而不是直接抛异常,这是让 Agent 具备自愈能力的关键。

方法论三:心跳不是锦上添花,是基础设施。 对于任何执行时间超过 5 秒的 Agent 操作,都应该有进度反馈机制。流式输出、工具执行状态、迭代计数——这些信息让用户从”等待黑盒”变成”观察同事工作”。

方法论四:并发是性能的乘数,但要有控制。 并行工具执行可以显著缩短端到端延迟,但需要设置并发上限、使用 allSettled 容错、保持结果顺序。在无法确保工具间无冲突时,按资源分组串行执行。

方法论五:循环和状态图是两种等价的表达方式,选择取决于你的需求。 如果需要中断恢复、可视化调试、复杂的分支逻辑,状态图(LangGraph)更合适。如果追求极致的灵活性和最小的抽象开销,while 循环(Claude Code)更直接。没有”更好”的方案,只有”更适合”的方案。

下一章我们将讨论 Agent Loop 中最关键的输入——上下文。如何在有限的上下文窗口中塞入最有用的信息,是决定 Agent 能力上限的核心工程问题。

3.10 Claude Code 真实的 query.ts:1729 行是怎样炼成的

本章前面给的伪代码是 while (true) + 一个终止条件——Claude Code 的真实实现src/query.ts是一个 async generator,1729 行。打开它你会震惊——看似简单的”loop”里藏了 12 种终止路径、多层 fallback、reactive compact、token budget tracking

核心签名(query.ts:219-239)——

export async function* query(params: QueryParams):
  AsyncGenerator<StreamEvent | RequestStartEvent | Message | ... , Terminal>
{
  const terminal = yield* queryLoop(params, consumedCommandUuids)
  for (const uuid of consumedCommandUuids) {
    notifyCommandLifecycle(uuid, 'completed')
  }
  return terminal
}

三个值得记住的决策——

  • async function* 不是 async function——生成器让每一条 stream event、每一条 message、每一次 transition 都能立即 yield 给调用方——用户不等一整轮结束就看到进展
  • yield* queryLoop(...)——把内部循环委托给另一个生成器——外层只做生命周期管理(完成通知)、业务逻辑隔离在 queryLoop
  • 返回 Terminal 类型——不是简单的 void——每种终止都带原因(见下节)

3.10.1 十二种终止路径——一个生产 Agent 的真实”出口”

grep -n "return { reason:"query.ts 里能找到 12 种终止路径

reason触发条件含义
completed模型不再调 tool自然终止(最常见)
max_turns达到 maxTurns 上限强制终止
blocking_limitcontext 使用超过 blocking 限压缩都来不及了
image_error图片 validation / resize 失败输入问题
model_error模型 API 返回错误上游故障
aborted_streaming流式响应期间用户 abort用户取消
aborted_tools工具执行期间 abort用户取消(晚一步)
prompt_too_longAPI 返回 context_length_exceeded压缩已不可挽救
stop_hook_preventedonStop hook 阻止继续用户扩展
hook_stopped其他 hook 拦截用户扩展
collapse_drain_retrycontext collapse 重试 transition内部
reactive_compact_retry反应式压缩重试内部

这 12 种终止路径告诉你什么——生产 Agent 不是能跑就行”——每种可能的结束情况都要有名字、有处理、有 trace”——本章§3.4 的 6 种终止条件只是起步”、真实系统是它的 2 倍复杂。

3.10.2 maxTurns 的精细度

本章前面给的 maxTurns 是一个简单常量——Claude Code 的实现更微妙query.ts:1704-1711):

if (maxTurns && nextTurnCount > maxTurns) {
  yield createAttachmentMessage({ type: 'max_turns_reached', maxTurns, turnCount: nextTurnCount })
  return { reason: 'max_turns', turnCount: nextTurnCount }
}
  • maxTurns 可为 undefined——内部主循环默认不限、外部调用者(SDK / subagent)才限
  • yield max_turns_reached attachment message——让模型/UI 看到是因为轮数到了”、而不是无声地停
  • 返回 turnCount——便于上层统计”实际跑了几轮”——不是 assume 等于 maxTurns

这种”带情境的终止”——比一刀切 if (i > N) break 强得多——调用方能精确区分自然完成vs被限制打断”。

3.11 reactive_compact_retry:反应式压缩的秘密

§3.4 没讲的一种”终止 → 恢复”转换——reactive compact

背景——

  • 正常 compact 由 shouldAutoCompact() 主动触发(本书第 11 章)
  • 某些场景下 Agent 请求响应前 compact 没触发、结果请求发出后因 context 过长被 API 拒绝
  • 这时就要反应式(reactive)——错误发生后、立即触发一次 compact、然后重试

Claude Code 实现query.ts:1100-1170)——

  • 捕获 API 返回的 prompt_too_long 错误
  • reactiveCompact 模块压缩历史
  • state transition: { reason: 'reactive_compact_retry' }
  • 回到循环开头重跑

和 proactive compact 的区别——

  • proactive:预防性、在触发前就压(本书第 11 章§11.2 的 13K buffer 就是干这个)
  • reactive:补救性、已经撞墙后补压

生产 Agent 必须两者都有——只有 proactive 会漏长尾场景(比如动态生成的 tool_result 意外巨大)、只有 reactive 会每次都让用户等一次失败 + 一次重试——组合起来才稳

3.12 Token Budget 跨 compact 的会计难题

queryLoop 的 state 里有个特殊字段 task_budget.remaining——跨越 compact 边界的 token 预算追踪

源码注释(query.ts:275-285)——

task_budget.remaining tracking across compaction boundaries. Undefined until first compact fires — while context is uncompacted the server can see the full history and handles the countdown from {total} itself. After a compact, the server sees only the summary and would under-count spend; remaining tells it the pre-compact final window that got summarized away.

翻译——

  • compact 之前——server 看得到完整历史、自己能算”你已经用掉多少 token”
  • compact 之后——server 只能看到摘要、以为你”才刚开始花钱”
  • client 必须主动告诉 server:摘要之前我已经花了 X、请从 total − X 算起

这是分布式计数的经典难题——两端都在累加、某一刻一端做了 checkpoint、另一端必须把 checkpoint 之前的值补上——Paxos / Raft 的 snapshot_lag 同源——Agent 经济会计从 DB 工程师的工具箱里借了一招

读者启示——任何跨模块的数字累加(token / cost / retry count / latency)——compact / snapshot / restart 之后都要考虑累积值怎么传递——否则数字会漂

3.13 Terminal 类型的工程哲学

§3.10 列的 12 种终止原因——都是 Terminal 类型的tagged union

type Terminal =
  | { reason: 'completed' }
  | { reason: 'max_turns'; turnCount: number }
  | { reason: 'model_error'; error: Error }
  | { reason: 'blocking_limit' }
  // ... 其余 8 种

为什么用 tagged union 而不是 string + any——

  • TypeScript 能验证每种终止的字段max_turns 必带 turnCountmodel_error 必带 error
  • switch-exhaustiveness 检查——加新的终止原因时、所有处理它的地方编译器强制提示要处理——永远不漏分支
  • 序列化友好——可以直接 JSON 存到 trace、不丢信息

和 Rust 的 enum Result<T, E>、Haskell 的 ADT 同源——tagged union 是类型安全和数据完整性的双赢——Agent 的 Terminal 类型就是它的生产案例

本章§3.4 的 TerminationReason 建议立刻升级到 tagged union——不要只用 string

3.14 stopHookActiveonStopHook:让用户扩展终止策略

queryLoop state 还有 stopHookActive——让用户注册自定义终止回调

典型用法——

  • 用户设置”每次 LLM 响应完毕时调一次 onStop”(例如上传 embedding 到向量库)
  • hook 返回 { continue: true } 允许继续、{ continue: false, reason: '...' } 强制停
  • stop hook 本身是异步的、阻塞循环直到返回(由源码中的 stop_hook_active 状态位控制)

这呼应本书第 15 章§15.17 讨论过的 pre/post compact hooks——hook 是”可扩展性”的统一接口——Agent 的每个阶段都应该有 hook 点——给用户插手的能力、不用改平台代码

3.15 并发执行的真实细节:executeTools 内部

§3.6 给的 executeToolsConcurrently 是简化版——Claude Code 实际行为(查 executeTools.tsquery.ts 内 tool 执行段):

  • Read-only 工具(Read/Grep/Glob)——完全并行、默认并发 10
  • Write 工具(Edit/Write/Bash)——串行(防止 file state 竞态)
  • MCP 工具——按 server 串行、跨 server 并行——单个 MCP server 不一定并发安全、多个 server 肯定可以独立
  • 所有工具共享 AbortController——用户 Ctrl-C 时所有正在跑的工具同时 abort

这种”按类别分组并发”——无脑 Promise.all更符合真实工具语义——读文件不会互相干扰、写文件可能干扰——Harness 层知道这点、比让模型判断更可靠

3.16 Heartbeat 不只是 UX——也是安全信号

本章§3.7 讨论 heartbeat 从 UX 角度——再加一个角度heartbeat 是 Agent 没有”卡死”的证据

生产经验——Agent 偶尔会陷入请求发出 + 永远不返回的僵尸态——可能是网络僵死、可能是模型内部 deadlock——没 heartbeat 你不知道

工程做法——

  • heartbeat 除了 UI 更新、每次 tick 都 touch 一个最后活跃时间戳
  • 外层看门狗(watchdog)定期扫这个时间戳——连续 N 分钟无活跃 → 强制 abort
  • abort 时上报 zombie_detected event——trace 里能直接看到zombie rate

这是 heartbeat 的双重职责”——对人是反馈、对系统是 liveness 信号——少了任一个都不完整

3.17 四章打通:loop / context / tool / heartbeat

  • 本章 Agent Loop——心跳频率 + 终止条件
  • 第 4 章上下文——每次迭代的观察是什么
  • 第 5-7 章工具设计——每次迭代的行动能做什么
  • 第 19 章可观测性——每次心跳 trace 给谁看

四章合起来 = 完整的 Agent Runtime——每章单看都有价值、合起来才理解”Agent 运行时不是简单循环”

3.18 和 LangGraph Pregel 的深度对比

本章§3.3 介绍了 LangGraph 的状态图范式——这里做一次更深的对比

维度Claude Code query.tsLangGraph Pregel
抽象单位turn(一次 LLM 调用 + 工具执行)superstep(图的一个遍历步)
状态演进单一 state 对象、每 turn 更新多个 channel 各自版本追踪
并发模型tool 之间按语义分组并发节点之间由 pregel 引擎调度
终止12 种 Terminal tagged unionEND 节点 + 条件边
可恢复靠 session storage 存消息内置 checkpoint + time-travel
侵入性调用方手写 while 风格调用方声明 graph
最佳场景单 agent 高灵活度主循环多 agent 编排 / 长期工作流

两者没有”谁更好”——是两种范式解决不同问题

  • Claude Code 的选择——单用户交互式 CLI——query.ts 的命令式风格更契合
  • LangGraph 的选择——企业多步工作流——状态图 + checkpoint 更契合

读过本书第 13 章 LangGraph 《Streaming》、第 18 章《Design Patterns》——会发现 LangGraph 的 Pregel 模型正是针对**“长时间运行 + 多节点协作优化的——query.ts 的设计目标互补、不冲突

3.19 写一个玩具 Agent Loop 的 20 行

把本章所有概念压缩到最小 demo——20 行能跑的 Agent Loop

async function* toyAgentLoop(history: Message[], maxTurns = 10): AsyncGenerator<Message, Terminal> {
  for (let turn = 1; turn <= maxTurns; turn++) {
    const response = await llm.chat(history, { tools })
    history.push({ role: 'assistant', content: response.content })
    yield response
    const toolUses = response.content.filter(b => b.type === 'tool_use')
    if (toolUses.length === 0) return { reason: 'completed' }
    const results = await Promise.allSettled(toolUses.map(tu => executeTool(tu)))
    history.push({ role: 'user', content: results.map((r, i) =>
      ({ type: 'tool_result', tool_use_id: toolUses[i].id,
         content: r.status === 'fulfilled' ? r.value : `Error: ${r.reason}`,
         is_error: r.status === 'rejected' })
    ) })
    yield { role: 'tool', content: results }
  }
  return { reason: 'max_turns', turnCount: maxTurns }
}

20 行里的关键决策——

  • async function*——流式 yield 每个 message
  • Promise.allSettled——一个工具失败不阻塞其他
  • 工具错误被翻译成 tool_result 反馈给模型不抛异常
  • 返回 Terminal——调用方能精确区分完成方式

20 行的 demo1729 行的 Claude Code query.ts相同骨架、10 倍工程加固——你现在能读懂这条演进路径

3.20 五条”我在 Agent Loop 上踩过的坑”

坑 1:用 Promise.all 而不是 allSettled——某工具偶尔报错就让整轮失败——模型看不到其他工具的成功结果、只能从零重试——成本和延迟双爆炸。

坑 2:终止条件只有 max_turns——没装 token budget / wall-time / consecutive_errors——遇上”死循环但每轮只消耗几千 token”的场景、跑 3 小时才被 max_turns 救——多花 $100+

坑 3:工具错误用 throw 不用 tool_result——模型永远不知道错了、只能看到一个 assistant message 突然消失——没办法自我修正

坑 4:heartbeat 只做 UI、不做 watchdog——用户 Ctrl-C 后僵尸进程没清理——$5-10 的沉默消耗直到 session timeout

坑 5:信任模型返回的 tool 名合法——没校验就执行——API 返回不存在 tool 时异常堆栈冒泡到用户——应该返回tool ‘X’ doesn’t exist反馈给模型让它换一个再试

这 5 条——每条我都是”线上事故 → git blame → 补丁”——读者现在可以跳过这些事故、直接从我已经知道开始

3.21 本章压轴:Agent Loop 的 “12 诫”

12 条你立刻能抄走的规则——每条都对应本章某节

  1. async function* 而不是 async function(§3.10)
  2. 终止用 tagged union 而不是 string(§3.13)
  3. 至少 12 种 terminal reason(§3.10.1)
  4. max_turns 要 yield max_turns_reached attachment(§3.10.2)
  5. Proactive compact + reactive compact 必须双轨(§3.11)
  6. 跨 compact 边界要追踪 token budget(§3.12)
  7. 工具执行用 allSettled 不是 all(§3.6 / §3.19)
  8. 工具错误翻译成 tool_result、不抛(§3.5.1)
  9. 并发按语义分组(读并行、写串行)(§3.15)
  10. hook 机制开放给用户扩展(§3.14)
  11. Heartbeat 兼做 liveness watchdog(§3.16)
  12. 模型返回的 tool 名要校验、失败反馈(§3.5.2)

打勾 12/12 的 Agent Loop = 生产可用
打勾 < 8 = 下次事故在路上

3.22 尾声

Agent Loop 是 Agent 工程的心跳——心跳稳、Agent 才能跑

本章从 while 循环 demo → 12 种终止路径 → reactive compact → token budget 会计——层层深入、每一层都对应 Claude Code 里的一段真实代码

读者下次写 Agent Loop 时、脑子里应该浮现的是 query.ts 的 1729 行——知道哪里藏着陷阱、知道哪里值得抄、知道 12 诫每条的来源

下一章——上下文窗口管理——Agent 的视野”。

3.23 stream vs AsyncGenerator:为什么 Claude Code 选后者

Anthropic SDK 提供 messages.stream() API——返回一个 MessageStream、支持 for await——看起来和 AsyncGenerator 等价。但 Claude Code 的 query.ts 在此之上又套了一层 async function*——为什么

三个差异——

  • MessageStream 只流 SSE 事件(text_delta、tool_use_start 等 API 层信号)——不流业务层事件(turn_complete、tool_executed、compact_triggered 等)
  • 调用方需要统一消费接口——用户不关心是”SDK 的 stream 事件”还是”Claude Code 自己的 transition 事件”——都用 for await 消费就好
  • 生成器支持提前终止.return())——Ctrl-C 触发时、一次 return 清掉整条管线——Stream 手动管理起来繁琐

这是包装层的经典模式——把下游多个抽象统一成一个给上游一个干净的接口——React 的 Concurrent Rendering 调度器、Tokio 的 Stream combinator 都是同源思路

3.24 QueryEngine.ts 1295 行做的事

src/QueryEngine.tsquery.ts 更上层的封装——1295 行。它的职责——

  • Session 生命周期(创建 / 持久化 / 恢复)
  • 消息预处理processUserInput
  • Permission context 注入
  • Analytics / telemetry hook
  • File state cache 管理(第 15 章讨论过的 FileStateCache
  • Attribution 追踪(本书第 19 章讨论的 trace attribution)

QueryEngine : query.ts = Web 服务器的 HTTP handler : 业务逻辑函数——一层处理协议 / 认证 / 日志、另一层处理业务循环

读者启示——生产 Agent 从不把”loop”和”session 管理”混在一个文件——两层分开、各自不超过 2000 行——维护性好得多

3.25 一个真实事故:maxOutputTokensRecoveryCount

query.ts 的 state 里有个不起眼的字段 maxOutputTokensRecoveryCount——背后有一个真实的生产事故

现象——模型偶尔在生成过程中触发 API 的 max_output_tokens 错误——响应被截断、tool_use 参数只到一半。

naive 处理——直接失败、上报错误。

Claude Code 的处理query.ts:1200-1250 transition: { reason: 'max_output_tokens_escalate' })——

  • 捕获错误
  • 增加 max_tokens 上限(比如从 4096 升到 8192)
  • 重新发送请求、从中断处继续
  • 记录 maxOutputTokensRecoveryCount——连续 3 次都没成功就放弃

这是自适应重试”——第 1 次用默认限、失败就加倍、最多 3 次——比固定 max_tokens 稳、比无限 max_tokens 省

读者启示——任何上游资源限制都值得渐进放宽 + 上限兜底——不是一次失败就罢手”、也不是拼命重试”——指数退避的另一种形式

3.26 关于 transition 的 state machine 哲学

query.ts 的 state 里有 transition?: { reason: '...' }——每次循环迭代带一个上一步怎么进来的标记

transition.reason 可能是:

  • undefined——首次迭代
  • next_turn——正常进入下一轮
  • reactive_compact_retry——刚压缩完重试
  • collapse_drain_retry——context collapse 重试
  • stop_hook_blocking——stop hook 暂缓继续
  • token_budget_continuation——token 预算重续
  • max_output_tokens_escalate——刚升级 max_tokens
  • max_output_tokens_recovery——刚恢复 max_tokens

为什么每种都要 tag——

  • 分支逻辑依赖——“上一轮是不是刚 retry 过”决定当前要不要跳过某些步骤(比如”反复 retry 已经第 3 次、不要再 retry 了”)
  • telemetry 语义——trace 里能看到这个 turn 是怎么来的”、而不是只看到一条消息
  • debugging 友好——事后复盘为什么这里突然转了有显式字段可查

这是可观察的状态机——不是隐式状态 + 乱分支——而是每种 transition 都有名字、都能 trace”——分布式系统里的 Raft / Paxos 状态机也是这样

3.27 给新手的 5 分钟启动模板

如果读者是第一次构建 Agent Loop——用下面这个模板开始

type Terminal = { reason: 'completed' } | { reason: 'max_turns'; turns: number } | { reason: 'error'; error: Error }

async function* agentLoop(
  messages: Message[],
  tools: Tool[],
  maxTurns = 20
): AsyncGenerator<Message, Terminal> {
  for (let turn = 1; turn <= maxTurns; turn++) {
    try {
      const response = await llm.chat(messages, { tools, stream: true })
      messages.push({ role: 'assistant', content: response.content })
      yield { role: 'assistant', content: response.content }

      const toolUses = response.content.filter(b => b.type === 'tool_use')
      if (toolUses.length === 0) return { reason: 'completed' }

      const results = await Promise.allSettled(
        toolUses.map(tu => executeTool(tu.name, tu.input, { timeout: 30000 }))
      )

      const toolResults = results.map((r, i) => ({
        type: 'tool_result' as const,
        tool_use_id: toolUses[i].id,
        content: r.status === 'fulfilled' ? r.value : `Error: ${(r.reason as Error).message}`,
        is_error: r.status === 'rejected'
      }))

      messages.push({ role: 'user', content: toolResults })
      yield { role: 'user', content: toolResults }
    } catch (e) {
      return { reason: 'error', error: e as Error }
    }
  }
  return { reason: 'max_turns', turns: maxTurns }
}

这个模板已经覆盖§3.21 的 12 诫里的 7 条——剩下 5 条(reactive compact、token budget、hook、heartbeat、tool 名校验)在你的 Agent 规模化时再补——不要一开始就过度工程化

从 20 行到 1729 行是一条渐进路径——前期 KISS、中期加固、后期全面——本章讲完这条路径

3.29 categorizeRetryableAPIError:重试策略的真相

QueryEngine.ts:33 引用 services/api/errors.jscategorizeRetryableAPIError——把 API 错误分类到不同 retry 策略

真实的分类——

  • rate_limit_error(429)——指数退避 + Retry-After header 优先
  • overloaded_error(Anthropic 特有)——固定 10 秒等待后重试、最多 3 次
  • api_error(5xx)——指数退避、最多 5 次
  • invalid_request_error(4xx 非 429)——不重试、直接报错
  • authentication_error(401)——不重试、刷新 token 后再调
  • permission_error(403)——不重试、用户决策

每种错误的 retry 曲线都不一样——本章§3.5.2 给的”全部指数退避过于简化——生产系统必须按错误类型定制

读者参考——下次写 retry 层时、先写一张错误码 → retry 策略——比代码里 if-else 更易维护

3.30 stripSignatureBlocks 和工具调用签名

query.ts:54 引用 stripSignatureBlocks——处理模型响应里的 signature 字段(Anthropic extended thinking 的一部分)。

背景——Extended thinking 模式下、模型的 thinking block 会带一个 cryptographic signature——防止用户在重跑时篡改 thinking 内容

工程细节——

  • signature 的原始 bytes不能丢(否则下一轮模型拒绝处理)
  • 不应该送进 cache_control 的 hashing(会让缓存前缀永远变)
  • stripSignatureBlocks 只在缓存键计算时剥离、实际 send 时保留——一石二鸟

**这是”协议层字段 vs 用户层字段的典型区分——用户关心的是文本、协议关心的是签名——两层不能混——设计 API wrapper 时要有这种意识

3.31 buildPostCompactMessages 和 §3.11 的接口

本章§3.11 讨论 reactive compact——query.ts:14 引用 buildPostCompactMessages——就是这个函数在压缩后重建 messages 数组

export function buildPostCompactMessages(
  preCompactMessages: Message[],
  summary: string,
  keepRecentN: number
): Message[]

三个字段的设计意图——

  • preCompactMessages——压缩前的完整历史——为 fallback + trace 保留
  • summary——压缩出的摘要(第 11 章§11.5 的 9 字段结构)
  • keepRecentN——保留最近 N 条消息(第 11 章§11.4.4 默认 6)

这个函数的输出——一条”compact boundary”system message(打了 SDKCompactBoundaryMessage 类型)+ 最近 N 条原消息——模型看到的是过去被压缩了、这里是摘要”——明确的分界

跨章呼应——本章 + 第 11 章合起来——你就能读懂压缩 → buildPostCompactMessages → 继续 loop全链路

3.32 STREAM_EVENT 的 3 层嵌套

query.ts yield 的事件类型是 tagged union 三层嵌套——

  • 顶层StreamEvent | RequestStartEvent | Message | ToolUseSummaryMessage | TombstoneMessage
  • Message 内部UserMessage | AssistantMessage | SystemMessage | AttachmentMessage | HookResultMessage | ToolUseSummaryMessage | TombstoneMessage
  • AssistantMessage.content 内部text | tool_use | thinking | image | signature

三层 tagged union——每个事件、每条消息、每个 content block——都有明确类型——调用方的 switch 分支结构和源码的 switch 分支一一对应

这就是 TypeScript 在工程实践里的价值——不是写 any 省事、是写精确类型强制你考虑每种情况——和本章§3.13 讨论的 Terminal tagged union 一脉相承

3.33 feature() 条件编译

本章多次提到的 feature('REACTIVE_COMPACT')feature('CONTEXT_COLLAPSE')——bun:bundle 提供的条件编译——编译期把 feature('X') ? importX : null 里不走的分支 DCE 掉

为什么——

  • Claude Code 是开源仓库 + 内部功能双构建
  • 内部 feature(CONTEXT_COLLAPSEKAIROSMARBLE_ORIGAMI不能出现在公开 bundle
  • DCE 靠常量折叠——feature('X') 在编译时已知——整个 if (feature('X')) 分支直接移除

这是单仓库多产品的工程模式——Rust 的 #[cfg]、Java 的 @Conditional、C 的 #ifdef——JS/TS 靠 bundler 做——效果一样

读者启示——如果你的 Agent 有免费版/付费版内部/外部双轨——编译期 DCE 比运行期 if 更干净——免费版 bundle 里连 paid feature 的代码都没有

3.35 本章速查卡

一张速查表——

  • 主循环async function* queryLoop 1729 行(Claude Code src/query.ts
  • 终止:12 种 reason(completed / max_turns / model_error / blocking_limit / image_error / prompt_too_long / aborted_streaming / aborted_tools / stop_hook_prevented / hook_stopped / collapse_drain_retry / reactive_compact_retry)
  • Compact 双轨:proactive(autoCompact §3.11)+ reactive(捕获 prompt_too_long 后)
  • Token budget:跨 compact 边界追踪 task_budget.remaining
  • 并发:tool 按语义分组、Read 并行、Write 串行
  • Hook:onStop、onPreCompact、onPostCompact 三大扩展点
  • Heartbeat:UI 反馈 + liveness watchdog 双职责
  • Retry:按错误类型分策略(429 退避、503 固定等、4xx 不重试)
  • 类型:Terminal / StreamEvent / Message 三层 tagged union
  • DCEfeature(...) 条件编译剥离内部代码

3.36 notifyCommandLifecycle 的设计:完成事件的”非对称”语义

query.ts:223for 循环——把 consumedCommandUuids 里的每个命令打上completed 通知

关键细节——注释写得很清楚:

Only reached if queryLoop returned normally. Skipped on throw (error propagates through yield*) and on .return() (Return completion closes both generators). This gives the same asymmetric started-without-completed signal as print.ts’s drainCommandQueue when the turn fails.

翻译——

  • started 通知——命令开始时立即发
  • completed 通知——只在正常结束时发——错误、中断、取消都不发

这是非对称生命周期——不是 match-pair——下游消费者能靠started 但没 completed的不平衡统计失败率”。

对比传统的start/end 必须成对设计——不对称版省掉了一种 failure modeend 发了但命令其实没完)——缺失表达失败”——比用新 event 简洁

这是事件 API 设计里的一条小智慧——缺失的 completed 天然表达失败。

3.37 SYNTHETIC_OUTPUT_TOOL_NAME 的用途

QueryEngine.ts:41 引用 SYNTHETIC_OUTPUT_TOOL_NAME——Claude Code 内部的合成工具”——模型从没 call 过、但 Harness 会伪造一次 tool_use

用途——

  • 用户想让 Agent 输出结构化 JSON(不是自然语言)
  • 给一个虚拟 tooloutput(result: JSON)让模型以 tool_use 形式返回
  • 这个”tool”不真执行、Harness 直接把参数当成最终输出返回给用户

好处——

  • 复用 tool_use 的 schema 机制——不用单独写 JSON validator
  • 模型输出天然结构化——不用解析 markdown 代码块
  • 对调用方 API 干净——结果是 { status: 'completed', output: {...} } 而不是 { output: 'a markdown string with a ```json block' }

这是借协议实现功能的妙手——tool_use 原本是为”调工具”设计Claude Code 把它扩展成强制结构化输出”——同一 API、两种用法

本书第 18 章《评估》讨论过 “structured output enforcement”——本章的 synthetic tool 就是那章的实现路径之一

3.39 源码锚点速查

本章引用到的 Claude Code 文件——一张表方便读者深挖:

话题源码位置行数
主循环(async generator)src/query.ts1729
上层 session 管理src/QueryEngine.ts1295
Compact 模块src/services/compact/*3960(本书第 11 章)
Auto-compact 阈值src/services/compact/autoCompact.ts351
Post-compact 重建src/services/compact/compact.ts:buildPostCompactMessages-
Retry 策略src/services/api/errors.ts:categorizeRetryableAPIError-
Signature 处理src/utils/messages.ts:stripSignatureBlocks-
合成输出工具src/tools/SyntheticOutputTool/*-
Feature 条件编译bun:bundle.feature()-

以上