Appearance
第9章 中断与人机协作
9.1 引言
在 AI Agent 的实际应用中,纯自动化的执行流程往往不够。当 Agent 需要进行高风险操作(如转账、删除数据)、当决策需要人类专业判断(如医疗诊断确认)、或者当信息不足需要用户补充时,系统必须能够优雅地暂停执行,等待人类介入,然后从暂停点恢复。
这就是 LangGraph 中断(Interrupt)机制要解决的问题。与传统的回调或轮询方案不同,LangGraph 的中断是一种持久化的暂停——执行状态被完整保存到 Checkpoint 中,进程可以终止、重启,甚至迁移到另一台机器上,只要提供正确的 thread_id,就能从中断点恢复执行。
本章将从源码层面深入分析中断机制的完整实现:从 interrupt() 函数如何抛出异常、到 GraphInterrupt 如何被 Pregel 循环捕获、再到恢复时如何通过 Command(resume=...) 将值传回节点。我们还将探讨 interrupt_before/interrupt_after 的配置式中断,以及多中断场景下的匹配策略。
为了充分理解 LangGraph 中断机制的精妙设计,有必要先反思传统的暂停/恢复方案。最简单的做法是在每个可能中断的位置设置回调函数,让用户代码手动管理暂停逻辑。这种方法的问题在于:它将中断的责任分散到了业务代码中,增加了认知负担;而且回调模式难以持久化——一旦进程终止,回调的上下文就丢失了。另一种方案是使用协程或 async/await,但这要求所有节点函数都是异步的,而且 Python 的协程状态无法可靠地序列化。LangGraph 选择了第三种路径:基于异常的暂停加重新执行的恢复。这种方式让节点函数保持简单(可以是普通同步函数),同时通过 Checkpoint 实现真正的持久化暂停。代价是恢复时需要重新执行整个节点,但通过 scratchpad 的索引追踪,已解决的中断可以立即返回缓存值,实际的重复执行开销极小。
本章要点
- interrupt() 函数:理解其基于异常的暂停机制和 scratchpad 的索引追踪
- Interrupt 数据类型:掌握中断值的结构及其基于命名空间的确定性 ID 生成
- interrupt_before/interrupt_after:区分编译时配置的声明式中断与运行时的命令式中断
- 暂停与恢复机制:深入 Pregel 循环如何捕获中断、保存状态、以及恢复执行
- 与 Checkpoint 的配合:理解中断信息如何作为 pending_writes 持久化
- 多中断与按 ID 恢复:掌握同一节点中多个中断的索引匹配和按 ID 精确恢复
9.2 Interrupt 数据类型
9.2.1 Interrupt 类定义
Interrupt 是一个不可变的数据类,用于封装中断信息:
python
# langgraph/types.py
@final
@dataclass(init=False, slots=True)
class Interrupt:
value: Any
"""The value associated with the interrupt."""
id: str
"""The ID of the interrupt. Can be used to resume the interrupt directly."""
def __init__(self, value: Any, id: str = _DEFAULT_INTERRUPT_ID, **deprecated_kwargs):
self.value = value
if (
(ns := deprecated_kwargs.get("ns", MISSING)) is not MISSING
and (id == _DEFAULT_INTERRUPT_ID)
and (isinstance(ns, Sequence))
):
self.id = xxh3_128_hexdigest("|".join(ns).encode())
else:
self.id = id
@classmethod
def from_ns(cls, value: Any, ns: str) -> Interrupt:
return cls(value=value, id=xxh3_128_hexdigest(ns.encode()))两个核心属性:
value:中断携带的值,可以是任意类型。它被传递给客户端,用于展示中断原因或请求用户输入。例如 "请确认是否执行转账操作?" 或 {"question": "请选择以下选项", "options": ["A", "B", "C"]}。
id:中断的唯一标识符。注意它不是随机生成的 UUID,而是通过 xxh3_128_hexdigest 对检查点命名空间进行哈希计算得出的确定性 ID。这意味着同一个执行路径上的同一个中断,在不同运行中会生成相同的 ID。
python
# 中断 ID 的确定性生成
Interrupt.from_ns(
value="请确认操作",
ns=conf[CONFIG_KEY_CHECKPOINT_NS], # 例如 "agent:task-id-123"
)
# id = xxh3_128_hexdigest(b"agent:task-id-123")这个设计选择服务于多中断场景下的精确恢复:客户端可以通过中断 ID 指定要恢复哪个中断。由于 xxh3_128_hexdigest 是一个非密码学的高速哈希函数,ID 的计算几乎没有性能开销。128 位的输出空间足够大,碰撞概率可以忽略不计。
@final 装饰器和 slots=True 的使用也值得注意。@final 禁止了子类化,这确保了 Interrupt 的行为在整个系统中是一致和可预测的——没有用户代码可以通过继承来改变中断的序列化或比较行为。slots=True 则通过使用 __slots__ 而非 __dict__ 来存储属性,减少了内存开销并略微提升了属性访问速度。这些都是面向高性能场景的微优化,体现了 LangGraph 在底层基础设施上的精细打磨。
9.2.2 GraphInterrupt 异常
GraphInterrupt 是中断机制的传播载体:
python
# langgraph/errors.py
class GraphBubbleUp(Exception):
"""所有需要向上冒泡的异常的基类"""
pass
class GraphInterrupt(GraphBubbleUp):
"""Raised when a subgraph is interrupted,
suppressed by the root graph. Never raised directly."""
def __init__(self, interrupts: Sequence[Interrupt] = ()) -> None:
super().__init__(interrupts)GraphInterrupt 继承自 GraphBubbleUp,这是 LangGraph 中所有需要跨层传播的异常的基类。关键设计:GraphInterrupt 在根图中被抑制,不会泄漏到用户代码中。它只是一种内部的控制流机制。