Skip to content

第6章 Pregel 执行引擎

6.1 引言

Pregel 是 LangGraph 的心脏。当你调用 compiled_graph.invoke()compiled_graph.stream() 时,真正驱动计算的是 Pregel 执行引擎——一个基于 Google Pregel 论文思想、专门为 AI 工作流定制的 BSP(Bulk Synchronous Parallel)运行时。

Google 在 2010 年发表的 Pregel 论文描述了一种用于大规模图计算的编程模型:计算以超步(superstep)为单位推进,每个超步中所有活跃节点并行执行,执行结果在超步之间通过消息传递可见。LangGraph 将这个模型适配到了 AI 工作流场景——"节点"是 AI Agent 或工具调用,"消息"是 Channel 中的状态更新,"超步"是一轮任务执行与状态同步。

本章将深入剖析以下核心组件:

  • Pregel 类(pregel/main.py)—— 执行引擎的公共接口
  • PregelLooppregel/_loop.py)—— 执行循环的核心状态机
  • SyncPregelLoop / AsyncPregelLoop —— 同步和异步的具体实现
  • prepare_next_tasks 算法 —— 决定每个超步执行哪些任务
  • apply_writes 算法 —— 在超步之间更新 Channel 状态
  • 版本追踪机制 —— 高效判定哪些节点需要被触发
  • max_steps 停止条件 —— 防止无限循环的安全阀

本章要点

  1. Pregel 类是 LangGraph 的统一运行时接口,invoke()stream() 都构建在同一个执行循环上
  2. PregelLoop 是一个状态机,核心循环为 tick() -> 执行任务 -> after_tick()
  3. prepare_next_tasks 通过 Channel 版本比较决定哪些节点在下一步执行
  4. apply_writes 在超步之间原子地更新所有 Channel,确保步内隔离
  5. 版本追踪使用 channel_versionsversions_seen 两张表实现高效的变更检测
  6. recursion_limit 通过 step > stop 条件提供安全停止保证

6.2 Pregel 类:执行引擎的入口

Pregel 类定义在 pregel/main.py 中,是 CompiledStateGraph 的父类。它持有执行所需的全部配置:

python
class Pregel(PregelProtocol, Generic[StateT, ContextT, InputT, OutputT]):
    nodes: dict[str, PregelNode]         # 编译后的节点
    channels: dict[str, BaseChannel | ManagedValueSpec]  # 通道定义
    input_channels: str | Sequence[str]  # 输入通道
    output_channels: str | Sequence[str] # 输出通道
    stream_channels: str | Sequence[str] | None
    trigger_to_nodes: Mapping[str, Sequence[str]]  # 优化映射
    checkpointer: Checkpointer           # 检查点存储
    store: BaseStore | None              # 持久化存储
    cache: BaseCache | None              # 节点结果缓存
    retry_policy: Sequence[RetryPolicy]  # 全局重试策略
    cache_policy: CachePolicy | None     # 全局缓存策略
    interrupt_before_nodes: All | Sequence[str]
    interrupt_after_nodes: All | Sequence[str]
    step_timeout: float | None           # 步超时

6.2.1 invoke() 和 stream() 的关系

在 Pregel 的设计中,invoke() 是基于 stream() 实现的——它调用 stream() 收集所有输出,然后返回最终值:

python
def invoke(self, input, config=None, *, stream_mode="values", ...):
    latest = None
    for chunk in self.stream(
        input, config,
        stream_mode=["updates", "values"] if stream_mode == "values"
        else stream_mode,
        ...
    ):
        if stream_mode == "values":
            mode, payload = chunk
            if mode == "values":
                latest = payload
    return latest

这意味着 stream() 才是真正的执行入口。所有的执行逻辑都围绕流式输出构建。

6.2.2 stream() 的执行框架

stream() 方法的核心结构如下:

python
def stream(self, input, config=None, *, stream_mode=None, ...):
    # 1. 准备配置
    stream_modes, output_keys, interrupt_before_, interrupt_after_, \
        checkpointer, store, cache, durability_ = self._defaults(...)

    # 2. 构建流式队列
    stream = SyncQueue()

    # 3. 进入 PregelLoop 上下文
    with SyncPregelLoop(
        input, stream=StreamProtocol(stream.put, stream_modes),
        config=config, checkpointer=checkpointer,
        nodes=self.nodes, specs=self.channels,
        ...
    ) as loop:
        # 4. 创建 Runner
        runner = PregelRunner(
            submit=weakref.WeakMethod(loop.submit),
            put_writes=weakref.WeakMethod(loop.put_writes),
        )
        # 5. BSP 主循环
        while loop.tick():
            for _ in runner.tick(
                [t for t in loop.tasks.values() if not t.writes],
                timeout=self.step_timeout,
                schedule_task=loop.accept_push,
            ):
                yield from _output(stream.get, ...)
            loop.after_tick()

这段代码精确地体现了 BSP 模型的三个阶段:

基于 VitePress 构建