Appearance
第6章 Pregel 执行引擎
6.1 引言
Pregel 是 LangGraph 的心脏。当你调用 compiled_graph.invoke() 或 compiled_graph.stream() 时,真正驱动计算的是 Pregel 执行引擎——一个基于 Google Pregel 论文思想、专门为 AI 工作流定制的 BSP(Bulk Synchronous Parallel)运行时。
Google 在 2010 年发表的 Pregel 论文描述了一种用于大规模图计算的编程模型:计算以超步(superstep)为单位推进,每个超步中所有活跃节点并行执行,执行结果在超步之间通过消息传递可见。LangGraph 将这个模型适配到了 AI 工作流场景——"节点"是 AI Agent 或工具调用,"消息"是 Channel 中的状态更新,"超步"是一轮任务执行与状态同步。
本章将深入剖析以下核心组件:
Pregel类(pregel/main.py)—— 执行引擎的公共接口PregelLoop(pregel/_loop.py)—— 执行循环的核心状态机SyncPregelLoop/AsyncPregelLoop—— 同步和异步的具体实现prepare_next_tasks算法 —— 决定每个超步执行哪些任务apply_writes算法 —— 在超步之间更新 Channel 状态- 版本追踪机制 —— 高效判定哪些节点需要被触发
max_steps停止条件 —— 防止无限循环的安全阀
本章要点
- Pregel 类是 LangGraph 的统一运行时接口,
invoke()和stream()都构建在同一个执行循环上 PregelLoop是一个状态机,核心循环为tick()-> 执行任务 ->after_tick()prepare_next_tasks通过 Channel 版本比较决定哪些节点在下一步执行apply_writes在超步之间原子地更新所有 Channel,确保步内隔离- 版本追踪使用
channel_versions和versions_seen两张表实现高效的变更检测 recursion_limit通过step > stop条件提供安全停止保证
6.2 Pregel 类:执行引擎的入口
Pregel 类定义在 pregel/main.py 中,是 CompiledStateGraph 的父类。它持有执行所需的全部配置:
python
class Pregel(PregelProtocol, Generic[StateT, ContextT, InputT, OutputT]):
nodes: dict[str, PregelNode] # 编译后的节点
channels: dict[str, BaseChannel | ManagedValueSpec] # 通道定义
input_channels: str | Sequence[str] # 输入通道
output_channels: str | Sequence[str] # 输出通道
stream_channels: str | Sequence[str] | None
trigger_to_nodes: Mapping[str, Sequence[str]] # 优化映射
checkpointer: Checkpointer # 检查点存储
store: BaseStore | None # 持久化存储
cache: BaseCache | None # 节点结果缓存
retry_policy: Sequence[RetryPolicy] # 全局重试策略
cache_policy: CachePolicy | None # 全局缓存策略
interrupt_before_nodes: All | Sequence[str]
interrupt_after_nodes: All | Sequence[str]
step_timeout: float | None # 步超时6.2.1 invoke() 和 stream() 的关系
在 Pregel 的设计中,invoke() 是基于 stream() 实现的——它调用 stream() 收集所有输出,然后返回最终值:
python
def invoke(self, input, config=None, *, stream_mode="values", ...):
latest = None
for chunk in self.stream(
input, config,
stream_mode=["updates", "values"] if stream_mode == "values"
else stream_mode,
...
):
if stream_mode == "values":
mode, payload = chunk
if mode == "values":
latest = payload
return latest这意味着 stream() 才是真正的执行入口。所有的执行逻辑都围绕流式输出构建。
6.2.2 stream() 的执行框架
stream() 方法的核心结构如下:
python
def stream(self, input, config=None, *, stream_mode=None, ...):
# 1. 准备配置
stream_modes, output_keys, interrupt_before_, interrupt_after_, \
checkpointer, store, cache, durability_ = self._defaults(...)
# 2. 构建流式队列
stream = SyncQueue()
# 3. 进入 PregelLoop 上下文
with SyncPregelLoop(
input, stream=StreamProtocol(stream.put, stream_modes),
config=config, checkpointer=checkpointer,
nodes=self.nodes, specs=self.channels,
...
) as loop:
# 4. 创建 Runner
runner = PregelRunner(
submit=weakref.WeakMethod(loop.submit),
put_writes=weakref.WeakMethod(loop.put_writes),
)
# 5. BSP 主循环
while loop.tick():
for _ in runner.tick(
[t for t in loop.tasks.values() if not t.writes],
timeout=self.step_timeout,
schedule_task=loop.accept_push,
):
yield from _output(stream.get, ...)
loop.after_tick()这段代码精确地体现了 BSP 模型的三个阶段: