LangGraph 设计与实现
第6章 Pregel 执行引擎
第6章 Pregel 执行引擎
6.1 引言
Pregel 是 LangGraph 的心脏。当你调用 compiled_graph.invoke() 或 compiled_graph.stream() 时,真正驱动计算的是 Pregel 执行引擎——一个基于 Google Pregel 论文思想、专门为 AI 工作流定制的 BSP(Bulk Synchronous Parallel)运行时。
Google 在 2010 年发表的 Pregel 论文描述了一种用于大规模图计算的编程模型:计算以超步(superstep)为单位推进,每个超步中所有活跃节点并行执行,执行结果在超步之间通过消息传递可见。LangGraph 将这个模型适配到了 AI 工作流场景——“节点”是 AI Agent 或工具调用,“消息”是 Channel 中的状态更新,“超步”是一轮任务执行与状态同步。
本章将深入剖析以下核心组件:
Pregel类(pregel/main.py)—— 执行引擎的公共接口PregelLoop(pregel/_loop.py)—— 执行循环的核心状态机SyncPregelLoop/AsyncPregelLoop—— 同步和异步的具体实现prepare_next_tasks算法 —— 决定每个超步执行哪些任务apply_writes算法 —— 在超步之间更新 Channel 状态- 版本追踪机制 —— 高效判定哪些节点需要被触发
max_steps停止条件 —— 防止无限循环的安全阀
本章要点
- Pregel 类是 LangGraph 的统一运行时接口,
invoke()和stream()都构建在同一个执行循环上 PregelLoop是一个状态机,核心循环为tick()-> 执行任务 ->after_tick()prepare_next_tasks通过 Channel 版本比较决定哪些节点在下一步执行apply_writes在超步之间原子地更新所有 Channel,确保步内隔离- 版本追踪使用
channel_versions和versions_seen两张表实现高效的变更检测 recursion_limit通过step > stop条件提供安全停止保证
6.2 Pregel 类:执行引擎的入口
Pregel 类定义在 pregel/main.py 中,是 CompiledStateGraph 的父类。它持有执行所需的全部配置:
class Pregel(PregelProtocol, Generic[StateT, ContextT, InputT, OutputT]):
nodes: dict[str, PregelNode] # 编译后的节点
channels: dict[str, BaseChannel | ManagedValueSpec] # 通道定义
input_channels: str | Sequence[str] # 输入通道
output_channels: str | Sequence[str] # 输出通道
stream_channels: str | Sequence[str] | None
trigger_to_nodes: Mapping[str, Sequence[str]] # 优化映射
checkpointer: Checkpointer # 检查点存储
store: BaseStore | None # 持久化存储
cache: BaseCache | None # 节点结果缓存
retry_policy: Sequence[RetryPolicy] # 全局重试策略
cache_policy: CachePolicy | None # 全局缓存策略
interrupt_before_nodes: All | Sequence[str]
interrupt_after_nodes: All | Sequence[str]
step_timeout: float | None # 步超时
6.2.1 invoke() 和 stream() 的关系
在 Pregel 的设计中,invoke() 是基于 stream() 实现的——它调用 stream() 收集所有输出,然后返回最终值:
def invoke(self, input, config=None, *, stream_mode="values", ...):
latest = None
for chunk in self.stream(
input, config,
stream_mode=["updates", "values"] if stream_mode == "values"
else stream_mode,
...
):
if stream_mode == "values":
mode, payload = chunk
if mode == "values":
latest = payload
return latest
这意味着 stream() 才是真正的执行入口。所有的执行逻辑都围绕流式输出构建。
6.2.2 stream() 的执行框架
stream() 方法的核心结构如下:
def stream(self, input, config=None, *, stream_mode=None, ...):
# 1. 准备配置
stream_modes, output_keys, interrupt_before_, interrupt_after_, \
checkpointer, store, cache, durability_ = self._defaults(...)
# 2. 构建流式队列
stream = SyncQueue()
# 3. 进入 PregelLoop 上下文
with SyncPregelLoop(
input, stream=StreamProtocol(stream.put, stream_modes),
config=config, checkpointer=checkpointer,
nodes=self.nodes, specs=self.channels,
...
) as loop:
# 4. 创建 Runner
runner = PregelRunner(
submit=weakref.WeakMethod(loop.submit),
put_writes=weakref.WeakMethod(loop.put_writes),
)
# 5. BSP 主循环
while loop.tick():
for _ in runner.tick(
[t for t in loop.tasks.values() if not t.writes],
timeout=self.step_timeout,
schedule_task=loop.accept_push,
):
yield from _output(stream.get, ...)
loop.after_tick()
这段代码精确地体现了 BSP 模型的三个阶段:
flowchart TB
subgraph "BSP 超步循环"
TICK["loop.tick()\n计划阶段"] --> EXEC["runner.tick()\n执行阶段"]
EXEC --> AFTER["loop.after_tick()\n更新阶段"]
AFTER -->|有新任务| TICK
AFTER -->|无新任务| DONE[结束]
end
subgraph "计划阶段详情"
T1[检查步数限制] --> T2[prepare_next_tasks]
T2 --> T3[匹配已有写入]
T3 --> T4[检查 interrupt_before]
T4 --> T5[发出调试事件]
end
subgraph "更新阶段详情"
A1[apply_writes 更新 Channel] --> A2[发出 values 事件]
A2 --> A3[清空 pending_writes]
A3 --> A4[保存 Checkpoint]
A4 --> A5[检查 interrupt_after]
A5 --> A6[步数 + 1]
end
TICK -.-> T1
AFTER -.-> A1
6.3 PregelLoop:执行循环的状态机
PregelLoop 是执行循环的核心,定义在 pregel/_loop.py 中。它不是一个简单的 while 循环,而是一个精心设计的状态机。
6.3.1 状态定义
class PregelLoop:
# 配置
config: RunnableConfig
nodes: Mapping[str, PregelNode]
specs: Mapping[str, BaseChannel | ManagedValueSpec]
input_keys: str | Sequence[str]
output_keys: str | Sequence[str]
stream_keys: str | Sequence[str]
# 运行时状态
step: int # 当前超步编号
stop: int # 最大步数
status: str # 状态机状态
tasks: dict[str, PregelExecutableTask] # 当前步的任务
output: Any | None # 最终输出
updated_channels: set[str] | None # 上一步更新的通道
# Checkpoint 状态
checkpoint: Checkpoint
checkpoint_config: RunnableConfig
checkpoint_metadata: CheckpointMetadata
checkpoint_pending_writes: list[PendingWrite]
checkpoint_previous_versions: dict[str, str | float | int]
# Channel 和 Managed Values
channels: Mapping[str, BaseChannel]
managed: ManagedValueMapping
6.3.2 状态机转换
PregelLoop 的 status 字段可以是以下值之一:
stateDiagram-v2
[*] --> input : __enter__
input --> pending : _first() 成功
pending --> pending : tick() + after_tick()
pending --> done : tick() 返回 False(无任务)
pending --> out_of_steps : tick() 返回 False(超步数)
pending --> interrupt_before : tick() 中触发中断
pending --> interrupt_after : after_tick() 中触发中断
done --> [*] : __exit__
out_of_steps --> [*] : 抛出 GraphRecursionError
interrupt_before --> [*] : 抛出 GraphInterrupt
interrupt_after --> [*] : 抛出 GraphInterrupt
6.3.3 SyncPregelLoop 的生命周期
SyncPregelLoop 实现为 Python 上下文管理器,其 __enter__ 方法执行完整的初始化:
def __enter__(self) -> Self:
# 1. 获取 Checkpoint
if not self.checkpointer:
saved = None
elif self.checkpoint_config[CONF].get(CONFIG_KEY_CHECKPOINT_ID):
saved = self.checkpointer.get_tuple(self.checkpoint_config)
else:
saved = self.checkpointer.get_tuple(self.checkpoint_config)
if saved is None:
saved = CheckpointTuple(
self.checkpoint_config, empty_checkpoint(),
{"step": -2}, None, []
)
elif self._migrate_checkpoint is not None:
self._migrate_checkpoint(saved.checkpoint)
# 2. 恢复 Checkpoint 状态
self.checkpoint = saved.checkpoint
self.checkpoint_metadata = saved.metadata
self.checkpoint_pending_writes = [...]
# 3. 初始化后台执行器
self.submit = self.stack.enter_context(
BackgroundExecutor(self.config)
)
# 4. 从 Checkpoint 恢复 Channel 状态
self.channels, self.managed = channels_from_checkpoint(
self.specs, self.checkpoint
)
# 5. 计算步数边界
self.step = self.checkpoint_metadata["step"] + 1
self.stop = self.step + self.config["recursion_limit"] + 1
# 6. 处理首步输入
self.updated_channels = self._first(
input_keys=self.input_keys,
updated_channels=...
)
return self
这里的关键点:
- Checkpoint 恢复:如果存在之前的 Checkpoint(例如从中断点恢复),直接加载而非从空状态开始
step的计算:从 Checkpoint 的元数据中恢复步数,确保恢复执行时步数连续stop的计算:step + recursion_limit + 1。+1是因为比较条件是step > stop(严格大于),所以需要多一步的余量_first():处理首步输入——将用户输入写入 Channel,或者在恢复执行时跳过输入处理
6.4 _first():首步输入处理
_first() 方法是执行循环中最复杂的初始化逻辑,它需要区分三种场景:
flowchart TD
START[_first 开始] --> CHECK{是否从 Checkpoint 恢复?}
CHECK -->|是: is_resuming=True| RESUME[恢复模式]
CHECK -->|否| INPUT{输入是 Command?}
INPUT -->|是| CMD[处理 Command]
INPUT -->|否| FRESH[新鲜输入模式]
RESUME --> VSEEN["更新 versions_seen\n标记所有 Channel 为已见"]
VSEEN --> EMIT_V[发出 values 事件]
CMD --> MAP_CMD[map_command 解析]
MAP_CMD --> WRITE_CMD[写入 Channel]
FRESH --> MAP_INPUT[map_input 解析]
MAP_INPUT --> DISCARD[丢弃未完成任务]
DISCARD --> APPLY[apply_writes 应用输入]
APPLY --> SAVE[保存输入 Checkpoint]
恢复模式
当从 Checkpoint 恢复时(is_resuming=True),_first 将 versions_seen[INTERRUPT] 设置为当前所有 Channel 的版本号。这告诉 should_interrupt() 函数:“我已经看到了所有当前的更新”,防止恢复后立即再次触发中断。
if is_resuming:
self.checkpoint["versions_seen"].setdefault(INTERRUPT, {})
for k in self.channels:
if k in self.checkpoint["channel_versions"]:
version = self.checkpoint["channel_versions"][k]
self.checkpoint["versions_seen"][INTERRUPT][k] = version
新鲜输入模式
新输入通过 map_input 转化为 Channel 写入,然后通过 apply_writes 应用:
elif input_writes := deque(map_input(input_keys, self.input)):
# 丢弃任何未完成的任务
discard_tasks = prepare_next_tasks(...)
# 应用输入写入
updated_channels = apply_writes(
self.checkpoint, self.channels,
[*discard_tasks.values(),
PregelTaskWrites((), INPUT, input_writes, [])],
self.checkpointer_get_next_version,
self.trigger_to_nodes,
)
# 保存输入 Checkpoint
self._put_checkpoint({"source": "input"})
注意 discard_tasks 的处理——如果 Checkpoint 中有之前未完成的任务,它们会在这里被”消费”,确保不会在新执行中被重复触发。
6.5 tick():计划阶段
tick() 方法是 BSP 模型的计划阶段,它决定当前超步要执行哪些任务:
def tick(self) -> bool:
# 1. 检查步数限制
if self.step > self.stop:
self.status = "out_of_steps"
return False
# 2. 准备下一步任务
self.tasks = prepare_next_tasks(
self.checkpoint,
self.checkpoint_pending_writes,
self.nodes, self.channels, self.managed,
self.config, self.step, self.stop,
for_execution=True,
manager=self.manager,
store=self.store,
checkpointer=self.checkpointer,
trigger_to_nodes=self.trigger_to_nodes,
updated_channels=self.updated_channels,
retry_policy=self.retry_policy,
cache_policy=self.cache_policy,
)
# 3. 如果没有任务,图执行完毕
if not self.tasks:
self.status = "done"
return False
# 4. 匹配已有写入(从 Checkpoint 恢复时)
if not self.is_replaying and self.checkpoint_pending_writes:
self._match_writes(self.tasks)
# 5. 检查是否需要在执行前中断
if self.interrupt_before and should_interrupt(
self.checkpoint, self.interrupt_before, self.tasks.values()
):
self.status = "interrupt_before"
raise GraphInterrupt()
return True
tick() 返回 True 表示”有任务需要执行”,False 表示”执行结束”。调用者(stream() 方法)根据返回值决定是否继续循环。
6.6 prepare_next_tasks:任务准备算法
prepare_next_tasks 是整个执行引擎中最核心的调度算法,定义在 pregel/_algo.py 中。它的职责是确定下一个超步应该执行哪些任务。
6.6.1 算法概览
def prepare_next_tasks(
checkpoint, pending_writes, processes, channels, managed,
config, step, stop, *, for_execution, store, checkpointer,
manager, trigger_to_nodes, updated_channels,
retry_policy, cache_policy,
):
input_cache = {}
tasks = []
# 阶段一:处理 PUSH 任务(Send API 产生的动态任务)
tasks_channel = channels.get(TASKS)
if tasks_channel and tasks_channel.is_available():
for idx, _ in enumerate(tasks_channel.get()):
if task := prepare_single_task((PUSH, idx), ...):
tasks.append(task)
# 阶段二:确定候选节点(优化路径)
if updated_channels and trigger_to_nodes:
triggered_nodes = set()
for channel in updated_channels:
if node_ids := trigger_to_nodes.get(channel):
triggered_nodes.update(node_ids)
candidate_nodes = sorted(triggered_nodes)
elif not checkpoint["channel_versions"]:
candidate_nodes = ()
else:
candidate_nodes = processes.keys()
# 阶段三:处理 PULL 任务(常规节点触发)
for name in candidate_nodes:
if task := prepare_single_task((PULL, name), ...):
tasks.append(task)
return {t.id: t for t in tasks}
6.6.2 PUSH 任务 vs PULL 任务
这两种任务类型对应了两种不同的触发机制:
flowchart LR
subgraph "PULL 任务(常规触发)"
CH_UPDATE["Channel 版本更新"] --> VER_CHECK["版本比较\n_triggers()"]
VER_CHECK --> PULL_TASK["创建 PULL 任务\n节点名称 -> 任务"]
end
subgraph "PUSH 任务(Send API)"
SEND["Send('node', data)"] --> TASKS_CH["__pregel_tasks\nTopic Channel"]
TASKS_CH --> PUSH_TASK["创建 PUSH 任务\n带自定义输入"]
end
style PULL_TASK fill:#c8e6c9
style PUSH_TASK fill:#fff3e0
- PULL 任务:由 Channel 版本变更触发。当节点订阅的 Channel 在上一步被更新时,该节点在下一步被”拉入”执行。这是 BSP 模型的标准触发方式。
- PUSH 任务:由
SendAPI 显式创建。节点可以通过返回Send("target", data)来动态创建任务,“推送”自定义数据到目标节点。PUSH 任务不经过 Channel 版本检查。
6.6.3 优化路径:trigger_to_nodes
阶段二的候选节点确定包含了一个重要的优化。当同时满足以下条件时,引擎使用 trigger_to_nodes 映射表快速定位需要检查的节点:
updated_channels不为空——知道上一步更新了哪些 Channeltrigger_to_nodes不为空——有预构建的映射表
if updated_channels and trigger_to_nodes:
triggered_nodes = set()
for channel in updated_channels:
if node_ids := trigger_to_nodes.get(channel):
triggered_nodes.update(node_ids)
candidate_nodes = sorted(triggered_nodes)
当这两个条件不满足时(例如首次执行时 updated_channels 可能为 None),退化为遍历所有节点。sorted() 确保了确定性的执行顺序。
6.6.4 版本比较:_triggers() 函数
对于每个候选 PULL 节点,prepare_single_task 调用 _triggers() 检查该节点是否真的需要被触发:
def _triggers(channels, channel_versions, versions_seen,
null_version, proc):
"""检查节点的任何触发 Channel 是否在上一步被更新过。"""
if versions_seen is None:
# 节点从未被执行过
return any(
channel_versions.get(chan, null_version) > null_version
for chan in proc.triggers
if chan in channels and channels[chan].is_available()
)
else:
return any(
channel_versions.get(chan, null_version)
> versions_seen.get(chan, null_version)
for chan in proc.triggers
if chan in channels and channels[chan].is_available()
)
这个函数是 Pregel 调度机制的核心。它使用两张版本表来做比较:
channel_versions[chan]—— Channel 的当前版本号versions_seen[node][chan]—— 该节点上次执行时看到的 Channel 版本号
如果任何一个触发 Channel 的当前版本大于节点上次看到的版本,说明有新数据,节点需要被触发。
6.7 apply_writes:更新阶段算法
apply_writes 在每个超步结束后被调用(在 after_tick() 中),负责将所有任务的写入原子地应用到 Channel:
def apply_writes(checkpoint, channels, tasks,
get_next_version, trigger_to_nodes):
# 排序任务,确保确定性
tasks = sorted(tasks, key=lambda t: task_path_str(t.path[:3]))
bump_step = any(t.triggers for t in tasks)
# 1. 更新 versions_seen
for task in tasks:
checkpoint["versions_seen"].setdefault(task.name, {}).update({
chan: checkpoint["channel_versions"][chan]
for chan in task.triggers
if chan in checkpoint["channel_versions"]
})
# 2. 计算下一个版本号
next_version = get_next_version(
max(checkpoint["channel_versions"].values())
if checkpoint["channel_versions"] else None,
None,
)
# 3. 消费已读 Channel
for chan in {chan for task in tasks for chan in task.triggers
if chan not in RESERVED and chan in channels}:
if channels[chan].consume() and next_version is not None:
checkpoint["channel_versions"][chan] = next_version
# 4. 按 Channel 分组写入
pending_writes_by_channel = defaultdict(list)
for task in tasks:
for chan, val in task.writes:
if chan in channels:
pending_writes_by_channel[chan].append(val)
# 5. 应用写入到 Channel
updated_channels = set()
for chan, vals in pending_writes_by_channel.items():
if channels[chan].update(vals) and next_version is not None:
checkpoint["channel_versions"][chan] = next_version
if channels[chan].is_available():
updated_channels.add(chan)
# 6. 通知未更新的 Channel 新步开始
if bump_step:
for chan in channels:
if channels[chan].is_available() and chan not in updated_channels:
if channels[chan].update(EMPTY_SEQ):
...
# 7. 尝试触发 finish()
if bump_step and updated_channels.isdisjoint(trigger_to_nodes):
for chan in channels:
if channels[chan].finish():
...
updated_channels.add(chan)
return updated_channels
6.7.1 算法详解
让我们逐步解析这个算法:
步骤 1 - 更新 versions_seen:记录每个任务”看到”了它触发 Channel 的哪个版本。这是下一次 _triggers() 比较的基准。
步骤 2 - 计算 next_version:所有在本步中发生变更的 Channel 都会被赋予同一个版本号。默认的版本函数 increment 简单地将整数加 1。
步骤 3 - 消费已读 Channel:某些 Channel(如 EphemeralValue)在被消费后会清除自己的值。这确保了路由信号是一次性的。consume() 方法返回 True 表示 Channel 状态发生了变化。
步骤 4-5 - 分组并应用写入:将同一 Channel 的所有写入收集在一起,然后一次性调用 channel.update(values)。这保证了 Channel 的更新是原子的——要么全部应用,要么因 InvalidUpdateError 全部拒绝。
步骤 6 - 空通知:即使没有被写入的 Channel,也会收到一个空的 update([]) 调用。这允许 EphemeralValue 在超步之间清除自己——如果上一步有值但本步没有写入,update([]) 会将 value 设为 MISSING,Channel 变为不可用。
步骤 7 - finish() 触发:当所有更新的 Channel 都不在 trigger_to_nodes 中时(即没有节点会被这些更新触发),算法认为这是最后一个超步,调用所有 Channel 的 finish() 方法。LastValueAfterFinish Channel 在此时变为可用,触发 defer 节点。
sequenceDiagram
participant PL as PregelLoop
participant Tasks as 任务集合
participant Channels as Channels
participant CP as Checkpoint
Note over PL: after_tick() 开始
PL->>Tasks: 收集所有 writes
PL->>CP: 更新 versions_seen
PL->>Channels: 消费已读 Channel (consume)
Note over Channels: EphemeralValue 清除旧值
PL->>Channels: 分组应用写入 (update)
Channels-->>PL: 返回 updated_channels
PL->>Channels: 空通知未更新 Channel (update([]))
Note over Channels: EphemeralValue 清除未被写入的值
alt 无触发节点
PL->>Channels: finish()
Note over Channels: LastValueAfterFinish 变为可用
end
PL->>CP: 保存 Checkpoint
Note over PL: step += 1
6.8 after_tick():超步结束处理
after_tick() 在每个超步的所有任务执行完成后被调用:
def after_tick(self) -> None:
# 1. 应用所有写入
self.updated_channels = apply_writes(
self.checkpoint, self.channels,
self.tasks.values(),
self.checkpointer_get_next_version,
self.trigger_to_nodes,
)
# 2. 发出 values 流式事件(如果输出 Channel 被更新)
if not self.updated_channels.isdisjoint(
(self.output_keys,) if isinstance(self.output_keys, str)
else self.output_keys
):
self._emit("values", map_output_values,
self.output_keys, writes, self.channels)
# 3. 清空 pending_writes
self.checkpoint_pending_writes.clear()
# 4. 标记不再是重放模式
self.is_replaying = False
# 5. 保存 Checkpoint
self._put_checkpoint({"source": "loop"})
# 6. 检查 interrupt_after
if self.interrupt_after and should_interrupt(
self.checkpoint, self.interrupt_after, self.tasks.values()
):
self.status = "interrupt_after"
raise GraphInterrupt()
after_tick 中有几个细节值得关注:
is_replaying = False:只在第一个 tick 中可能为 True(从 Checkpoint 恢复时重放),之后的 tick 都是新执行- Checkpoint 保存时机:每个超步结束后都保存 Checkpoint(除非
durability="exit"),这确保了即使进程崩溃也能从最近的超步恢复 - interrupt_after 检查:在写入应用之后、下一步开始之前检查,确保中断时状态已经更新
6.9 版本追踪机制
版本追踪是 Pregel 调度机制的基石。它通过两张表实现:
6.9.1 channel_versions
channel_versions 是 Checkpoint 的一部分,记录每个 Channel 的当前版本号:
checkpoint["channel_versions"] = {
"messages": 3,
"count": 3,
"branch:to:agent": 2,
"branch:to:tool": 3,
# ...
}
每次 apply_writes 更新 Channel 时,对应的版本号会被设置为 next_version(所有本步更新的 Channel 共享同一个版本号)。
6.9.2 versions_seen
versions_seen 记录每个节点(以及特殊的 INTERRUPT 标识)上次执行时看到的 Channel 版本:
checkpoint["versions_seen"] = {
"agent": {
"branch:to:agent": 2,
},
"tool": {
"branch:to:tool": 2,
},
INTERRUPT: {
"messages": 3,
"count": 3,
# ...
}
}
6.9.3 触发判定
节点是否需要被触发,取决于以下比较:
channel_versions[trigger_chan] > versions_seen[node][trigger_chan]
如果节点的任何触发 Channel 的当前版本大于该节点上次看到的版本,说明有新数据需要处理,节点被触发。
flowchart TB
subgraph "版本追踪示例"
direction TB
CV["channel_versions:\nbranch:to:agent = 4\nbranch:to:tool = 3"]
VS["versions_seen:\nagent: branch:to:agent = 2\ntool: branch:to:tool = 3"]
CV --> CMP{比较}
VS --> CMP
CMP --> R1["agent: 4 > 2 = True\n需要触发"]
CMP --> R2["tool: 3 > 3 = False\n不触发"]
end
style R1 fill:#c8e6c9
style R2 fill:#fce4ec
这个机制的优雅之处在于:
- 无需全局锁:每个节点只关心自己订阅的 Channel 版本,不需要全局协调
- 幂等性:同一版本不会触发重复执行
- 自然停止:当没有任何 Channel 被更新时,所有版本比较都返回 False,循环自然终止
6.10 停止条件:max_steps 与 recursion_limit
Pregel 执行循环有两种正常停止条件和一种安全停止条件:
正常停止 1 - 无任务:prepare_next_tasks 返回空字典,说明没有节点需要被触发,图执行完成。
正常停止 2 - 中断:遇到 interrupt_before 或 interrupt_after 节点,抛出 GraphInterrupt。
安全停止 - 步数限制:
def tick(self) -> bool:
if self.step > self.stop:
self.status = "out_of_steps"
return False
self.stop 在初始化时计算:
self.stop = self.step + self.config["recursion_limit"] + 1
默认的 recursion_limit 是 25,意味着最多执行 25 个超步。如果超过这个限制,stream() 方法会抛出 GraphRecursionError:
if loop.status == "out_of_steps":
raise GraphRecursionError(
f"Recursion limit of {config['recursion_limit']} reached "
"without hitting a stop condition."
)
这个安全阀防止了无限循环——在 AI 工作流中,循环依赖和非终止条件是常见的 bug,步数限制确保了图总会终止。
6.11 SyncPregelLoop vs AsyncPregelLoop
LangGraph 为同步和异步场景提供了两个 PregelLoop 实现。它们共享 PregelLoop 基类的所有逻辑(tick、after_tick、_first 等),区别仅在于 I/O 操作的同步/异步形式:
| 组件 | SyncPregelLoop | AsyncPregelLoop |
|---|---|---|
| 上下文管理器 | AbstractContextManager | AbstractAsyncContextManager |
| 后台执行器 | BackgroundExecutor(线程池) | AsyncBackgroundExecutor(asyncio) |
| Checkpoint 读取 | checkpointer.get_tuple() | checkpointer.aget_tuple() |
| Checkpoint 写入 | checkpointer.put() | checkpointer.aput() |
| Writes 保存 | checkpointer.put_writes() | checkpointer.aput_writes() |
| 缓存操作 | cache.get() / cache.set() | cache.aget() / cache.aset() |
class SyncPregelLoop(PregelLoop, AbstractContextManager):
def __init__(self, ...):
super().__init__(...)
self.stack = ExitStack()
if checkpointer:
self.checkpointer_get_next_version = checkpointer.get_next_version
self.checkpointer_put_writes = checkpointer.put_writes
else:
self.checkpointer_get_next_version = increment
self.checkpointer_put_writes = None
class AsyncPregelLoop(PregelLoop, AbstractAsyncContextManager):
def __init__(self, ...):
super().__init__(...)
self.stack = AsyncExitStack()
if checkpointer:
self.checkpointer_get_next_version = checkpointer.get_next_version
self.checkpointer_put_writes = checkpointer.aput_writes # 注意 a 前缀
else:
self.checkpointer_get_next_version = increment
self.checkpointer_put_writes = None
注意:即使没有 Checkpointer,版本追踪仍然工作。increment 函数作为默认的版本生成器确保了 Channel 版本在没有持久化的情况下仍然正确递增。
6.12 Checkpoint 保存策略
PregelLoop 支持三种持久化模式(Durability),通过 durability 参数控制:
flowchart LR
subgraph "sync 模式"
S1[超步完成] --> S2[保存 Checkpoint]
S2 --> S3[等待保存完成]
S3 --> S4[开始下一步]
end
subgraph "async 模式(默认)"
A1[超步完成] --> A2[异步保存 Checkpoint]
A1 --> A3[开始下一步]
A2 -.->|后台| A4[保存完成]
end
subgraph "exit 模式"
E1[超步完成] --> E2[继续下一步]
E2 --> E3[...]
E3 --> E4[图退出时保存]
end
在 _put_checkpoint 方法中,保存操作被提交给后台执行器:
def _put_checkpoint(self, metadata):
# ...
self._put_checkpoint_fut = self.submit(
self._checkpointer_put_after_previous,
getattr(self, "_put_checkpoint_fut", None), # 前一个保存的 Future
self.checkpoint_config,
copy_checkpoint(self.checkpoint),
self.checkpoint_metadata,
new_versions,
)
_checkpointer_put_after_previous 确保 Checkpoint 按顺序保存——它会等待前一个保存操作完成后再执行当前保存。这个”链式等待”设计避免了并发写入导致的顺序问题,同时不阻塞主执行线程。
在 sync 持久化模式下,stream() 方法会在每步结束后显式等待保存完成:
loop.after_tick()
if durability_ == "sync":
loop._put_checkpoint_fut.result() # 阻塞等待
6.13 put_writes:增量写入机制
put_writes 方法处理任务执行过程中产生的写入。它不仅更新内存中的 checkpoint_pending_writes,还会在非 exit 模式下即时保存到 Checkpointer:
def put_writes(self, task_id, writes):
if not writes:
return
# 去重特殊 Channel 的写入
if all(w[0] in WRITES_IDX_MAP for w in writes):
writes = list({w[0]: w for w in writes}.values())
# 更新内存中的 pending writes
self.checkpoint_pending_writes.extend(
(task_id, c, v) for c, v in writes
)
# 即时保存到 Checkpointer(非 exit 模式)
if self.durability != "exit" and self.checkpointer_put_writes:
self.submit(
self.checkpointer_put_writes,
config, writes_to_save, task_id,
)
# 输出流式事件
if hasattr(self, "tasks"):
self.output_writes(task_id, writes)
这种”写入即保存”的策略确保了即使进程在超步中途崩溃,已完成任务的结果也不会丢失。恢复时,_match_writes 方法会将保存的 pending writes 重新匹配到对应的任务。
6.13.1 崩溃恢复的三件套:is_replaying / checkpoint_pending_writes / _match_writes
上一节提到 _match_writes,但它本身的作用要放进一个更完整的故事里才能看清楚。LangGraph 的”死而复生”能力依赖三个看似独立、实际协同的状态量:
1. is_replaying(_loop.py:157,249) 是一个 bool 标志,构造时从 config 读出来:
# _loop.py:249
self.is_replaying = CONFIG_KEY_CHECKPOINT_ID in config[CONF]
含义简单直接——用户传入了一个具体的 checkpoint_id 就是在 replay,否则是 fresh run。这不是推断来的——是 API 契约:graph.invoke(input, {"configurable": {"thread_id": "x", "checkpoint_id": "..."}}) 的第二个 key 决定了 loop 走 replay 路径还是 fresh 路径。
2. checkpoint_pending_writes: list[PendingWrite](_loop.py:191) 是一个已持久化但尚未被”消化”(还没 apply_writes 应用到 channel)的写入列表。元素形如 (task_id, channel, value)。它的内容来源两处:
- Fresh run 中:本超步里已经完成的任务把结果通过
put_writes写进来(_loop.py:348的extend),等当前超步结束调after_tick时一并apply_writes到 channels。 - Replay 时:从 checkpointer 读取上次死掉前已经保存但没来得及 apply 的 writes——这些就是要被”原样接回”的进度。
3. _match_writes(_loop.py:583) 的实现只有 5 行,但意图非常精准:
def _match_writes(self, tasks: Mapping[str, PregelExecutableTask]) -> None:
for tid, k, v in self.checkpoint_pending_writes:
if k in (ERROR, INTERRUPT, RESUME):
continue
if task := tasks.get(tid):
task.writes.append((k, v))
遍历 checkpoint_pending_writes,跳过三类”控制”写入(ERROR/INTERRUPT/RESUME 是编排信号不是业务数据),找到还在本次 prepare_next_tasks 结果里的同 task_id,把该任务上次已经产出的 writes 直接塞进 task.writes 属性。
三者怎么合成”崩溃恢复”? 场景是:某个超步 10 个 task 并发执行,其中 6 个已经调用了 put_writes 让结果进了 checkpointer(假设 durability != "exit")、剩余 4 个还在跑的时候进程被 kill。用户后来带着那次 checkpoint_id 重新 invoke:
is_replaying = True——构造时从 config 识别出来。tick()调prepare_next_tasks重新列出本超步要执行的 10 个 task。tick()执行if not self.is_replaying and self.checkpoint_pending_writes: self._match_writes(...)——在 replay 模式下这行不执行,因为 replay 的语义是”重新从头推一次这个超步”,需要让那 10 个 task 自己 run 一遍才能确定性地产生一致的 output。- 但在非 replay 模式下(用户要的是”继续上次的流程”而不是”从某个 checkpoint 重放”——两个不同 API 姿态),
_match_writes会把已保存的那 6 个 task 的 writes 挂到 task 对象上,配合tick()line 534-536 的output_writes(task.id, task.writes, cached=True)直接发”我其实已经做完了”的信号,这 6 个 task 本轮不会真正执行,只剩 4 个未完成的被重新运行。
这个**“replay 走确定性重算、resume 走 write-level cache”** 的双路径设计让 LangGraph 在同一套代码里既支持”回到过去调试”(replay)又支持”从崩溃恢复生产流”(resume)——两种需求在教科书上经常被混淆,源码里用 is_replaying 一个 bool 把它们彻底分开。
_loop.py:520 的条件 if not self.is_replaying and self.checkpoint_pending_writes 是这一切的开关——把 bool 和 list 非空同时作为前置,保证”既不是回放、又有上次遗留写入”时才走 resume 分支。这一行代码不长、但它背后的”三件套协同”是 LangGraph 生产可用性的核心。
6.14 设计决策分析
为什么选择 BSP 模型而非 Actor 模型?
纯 Actor 模型中,消息发送是异步的,接收是即时的。但在 AI 工作流中,我们需要更强的一致性保证:
- 状态一致性:同一步中所有节点看到的是同一个状态快照,不会出现”读到半更新状态”
- 可重放性:BSP 的确定性执行顺序使得从 Checkpoint 恢复后能够精确重现执行过程
- 调试友好:步的概念使得”在第 3 步之后中断”这样的调试操作变得自然
为什么 apply_writes 中有 finish() 机制?
finish() 是为 defer 节点设计的。考虑一个场景:所有正常节点都执行完毕,但还有一个 defer 节点等待触发。此时 updated_channels 与 trigger_to_nodes 没有交集(正常节点的路由 Channel 不触发任何节点),finish() 被调用,LastValueAfterFinish Channel 变为可用,defer 节点在下一步被触发。
为什么版本号采用全局递增而非 Channel 独立递增?
统一的版本号简化了比较逻辑,同时支持一个重要特性:should_interrupt 函数检查”自上次中断以来是否有任何更新”,这需要跨 Channel 比较,全局递增的版本号使这种比较成为可能。
6.13.2 _loop.py 与 _algo.py 实测拆分:1423 + 1258 = 2681 行
本章贯穿讨论 PregelLoop 和 prepare_next_tasks 两块——它们的物理布局——
_loop.py 1423 行——
| 类 / 函数 | 起始行 | 行数 | 角色 |
|---|---|---|---|
PregelLoop(基类) | 142 | 878 | 全部 BSP 状态机逻辑(tick / after_tick / _first / put_writes)—— 占 _loop.py 62% |
SyncPregelLoop | 1020 | 196 | 同步上下文管理器、阻塞 IO |
AsyncPregelLoop | 1216 | 207 | 异步上下文管理器、__aenter__ / __aexit__ |
DuplexStream(top-level fn) | 133 | 9 | 多输出 stream 的合并器 |
_algo.py 1258 行——
| 函数 / 类 | 起始行 | 行数 | 角色 |
|---|---|---|---|
prepare_single_task | 502 | 239 | 单个 task 的全部生命周期逻辑——本文件最大、是 prepare_next_tasks 的内部循环 |
prepare_next_tasks | 327/349/370 | 128(3 个签名) | 2 个 @overload + 1 impl——按 for_execution 切两套返回类型(PregelTask 规划态 / PregelExecutableTask 执行态) |
prepare_push_task_send | 878 | 171 | Send → Task 转换(ch12 已讨论) |
prepare_push_task_functional | 741 | 137 | functional API(@task 装饰器)的 Send 路径 |
apply_writes | 218 | 109 | 把 pending_writes 应用到 channels、更新 versions(§6.7) |
local_read / should_interrupt / _triggers / _proc_input 等 helpers | - | < 100 each | 一系列纯函数 |
两条值得记住的物理事实——
PregelLoop基类 878 行 + Sync/Async 子类各 ~200 行——典型的”模板方法 + 两态实现” 模式——基类把 BSP 状态机骨架完整定义、子类只接管”如何在 sync/async 上下文里 acquire / release checkpoint / store 资源”——是 LangGraph 把”协议 vs 执行模式”切开的范例(同款思路体现在BaseTracer / AsyncBaseTracerch12 §12.5)prepare_next_tasks用 2 个@overload——签名按for_execution: Literal[False]vsLiteral[True]切换返回类型——False路径不需要store / checkpointer / manager(因为只规划)、返回PregelTask;True路径要全套依赖、返回PregelExecutableTask——用 Literal 类型而不是运行时 if 让调用方在静态类型层面就拿到对的返回类型——和 LangChain@tool的 4 overload(ch08 §8.4.4)是同款 Python 类型学手艺;mypy 能精确推断”我现在拿到的是规划态 task 还是执行态 task”
一条规模对比——本章主讲两个文件 2681 行 = ch07 §7.12.5 测得的 pregel/ 目录 11392 行的 23.5%——是理解 LangGraph 调度器最核心的子集;剩下 76.5% 是 main.py(用户入口 3718 行)+ remote.py(跨进程 1191)+ _runner.py(ch07 主角 768)+ 其它薄壳。
6.13.3 从源码闭环看一次超步的“可解释性”
把本章几个函数连起来看,LangGraph 的 Pregel 运行时有一个非常强的性质:每一次节点执行都能被解释为“某些 Channel 版本变化触发了某些任务,任务写入又推进了下一批 Channel 版本”。这不是文档层面的抽象,而是源码里明确保存下来的因果链。
第一环是计划阶段。libs/langgraph/langgraph/pregel/_loop.py:461-491 的 tick() 先检查 self.step > self.stop,然后调用 prepare_next_tasks(...),把 checkpoint、pending writes、nodes、channels、managed values、step、stop、trigger_to_nodes、updated_channels 全部交给算法层。也就是说,调度器并不根据“上一步跑了哪个节点”直接推下一步,而是根据“上一步哪些 Channel 更新了、这些 Channel 能触发哪些节点”来推下一步。节点名只是结果,Channel 版本才是因。
第二环是候选节点裁剪。pregel/_algo.py:446-464 有一个很关键的优化分支:如果同时有 updated_channels 和 trigger_to_nodes,就只遍历这些更新 Channel 对应的节点;否则才退回遍历全部 processes.keys()。这让大图中的一次局部更新不必扫描所有节点。更重要的是,它保持语义等价,因为 trigger_to_nodes 来自 pregel/main.py:3563-3569,只是把每个 PregelNode.triggers 反向索引了一遍,没有引入新的业务判断。
第三环是真正的触发判定。即使某个节点进入候选集合,prepare_single_task 仍会在 pregel/_algo.py:575-590 调 _triggers(...) 检查它是否应该运行;_triggers 本身在 pregel/_algo.py:1058-1075,逻辑很朴素:如果这个节点从未见过版本,且触发 Channel 可用,就运行;如果它见过版本,只有当前 channel_versions[chan] 大于 versions_seen[name][chan] 时才运行。这里没有隐藏的队列,也没有额外的全局状态,只有两个版本表。
第四环是更新阶段。pregel/_loop.py:540-565 的 after_tick() 收集所有 task.writes,调用 apply_writes(...),发出 values 事件,清空 pending writes,并保存 checkpoint。apply_writes 在 pregel/_algo.py:239-323 做了几件事:先按 task path 排序保证写入顺序确定;再把每个 task 的 triggers 写入 versions_seen;然后把普通写入按 Channel 分组,调用 channels[chan].update(vals);如果更新成功,就推进 checkpoint["channel_versions"][chan],并把可用 Channel 放进 updated_channels。
这个闭环解释了 LangGraph 为什么适合调试长流程 Agent。你可以从 checkpoint 里读出 channel_versions 和 versions_seen,知道某个节点为什么这一步被触发;可以从 pending writes 里知道某个任务已经完成但还没应用到 Channel;可以从 updated_channels 推出下一步候选节点。很多工作流框架只能告诉你“节点 X 执行了”,LangGraph 能告诉你“节点 X 是因为它订阅的 Channel Y 的版本超过了它上次见过的版本而执行”。
这也解释了为什么 Reducer 必须是确定性的。apply_writes 会把同一超步里所有写入集中应用,且按 task path 做确定性排序;如果 Reducer 内部依赖当前时间、随机数、外部服务或可变全局变量,那么相同 checkpoint 重放时就可能得到不同状态,版本表仍然看起来一致,但状态内容已经分叉。生产图里的 Reducer 应该只做纯粹的合并:追加消息、取最大值、集合并集、覆盖明确字段,尽量不要把副作用藏进 Reducer。
最后,finish() 机制也要放在这个闭环里理解。apply_writes 在 pregel/_algo.py:313-320 判断如果本轮 updated_channels 与 trigger_to_nodes 没有交集,就会对所有 Channel 调 finish();这不是“运行结束后清理资源”,而是给 after-finish Channel 一个机会,让 defer 节点在普通节点不再触发时变得可用。也就是说,LangGraph 的“收尾任务”不是特殊 if 分支,而仍然通过 Channel 生命周期进入同一套调度算法。
从工程实践看,排查 Pregel 执行问题可以按四步走。先看本步 tasks 里节点的 triggers,确认它为什么进入执行;再看节点 writes,确认它写了哪些 Channel;再看 apply_writes 后的 updated_channels,确认哪些 Channel 真正推进版本;最后看下一步 prepare_next_tasks 选出的节点,确认版本表是否按预期触发。这个顺序和源码调用顺序一致,比从业务日志里倒推控制流可靠得多。
这套排查顺序也适合写测试:断言最终输出之外,再断言关键步骤的 stream 事件和 checkpoint 版本变化,才能覆盖调度语义。这样测试失败时也更容易定位。
6.15 小结
本章深入剖析了 Pregel 执行引擎的完整实现。核心要点回顾:
- BSP 超步模型:计算以超步为单位推进,每步包含计划(
tick)、执行(runner.tick)、更新(after_tick)三个阶段 - PregelLoop 状态机:从
input经pending到done/out_of_steps/interrupt,管理整个生命周期 - prepare_next_tasks:通过 Channel 版本比较和
trigger_to_nodes优化,高效确定每步要执行的任务 - apply_writes:在超步之间原子地更新 Channel,同时维护版本追踪表,支持
consume/finish等生命周期方法 - 版本追踪:
channel_versions和versions_seen两张表协同工作,实现高效的变更检测和幂等调度 - 安全停止:
recursion_limit通过步数比较提供硬性终止保证
物理事实:本章主讲 _loop.py(1423 行)+ _algo.py(1258 行)合计 2681 行 = 整个 pregel/ 目录 11392 行的 23.5%——是理解 LangGraph 调度器最核心的子集;PregelLoop 基类 878 行 + Sync/Async 子类 ~200 行各印证”模板方法 + 两态实现”模式;prepare_next_tasks 2 个 @overload 按 for_execution Literal 切返回类型让 mypy 静态推断——和 LangChain @tool 同款类型学手艺。