LangGraph 设计与实现

第7章 任务调度与并行执行

作者 杨艺韬 · 12,292 字

第7章 任务调度与并行执行

7.1 引言

上一章我们剖析了 Pregel 执行循环的宏观架构——tick()after_tick() 和 BSP 超步模型。但在每个超步内部,还有一个同样复杂的世界:多个任务如何被并行调度?任务失败时如何重试?缓存如何避免重复计算?PUSH 任务和 PULL 任务在运行时有何不同?

本章将深入 LangGraph 的任务执行层,涉及以下核心组件:

  • PregelExecutableTasktypes.py)—— 可执行任务的数据结构
  • PregelRunnerpregel/_runner.py)—— 任务调度器,管理并行执行和结果收集
  • BackgroundExecutor / AsyncBackgroundExecutorpregel/_executor.py)—— 线程池和 asyncio 并行原语
  • run_with_retry / arun_with_retrypregel/_retry.py)—— 重试逻辑
  • 缓存匹配机制 —— cache_policyCacheKey 的协作

这些组件共同实现了一个高效的并行执行框架,在保证正确性的前提下最大化吞吐量。

本章要点

  1. PregelExecutableTask 是任务执行的最小单元,包含输入、处理器、写入缓冲、配置等全部信息
  2. PregelRunner 通过 FuturesDict 管理并发任务,支持”任一失败则全部停止”的语义
  3. PULL 任务由 Channel 版本变更触发,输入从 Channel 读取;PUSH 任务由 Send API 创建,输入由调用者指定
  4. BackgroundExecutor 使用线程池实现同步并行,AsyncBackgroundExecutor 使用 asyncio 任务实现异步并行
  5. 重试策略支持指数退避、抖动、最大重试次数,以及按异常类型匹配的多策略组合
  6. 缓存策略通过 CacheKey 关联节点身份和输入哈希,支持 TTL 过期

7.2 PregelExecutableTask:任务的全貌

PregelExecutableTask 定义在 types.py 中,是一个不可变的 dataclass:

@dataclass(frozen=True)
class PregelExecutableTask:
    name: str                          # 节点名称
    input: Any                         # 任务输入
    proc: Runnable                     # 可执行处理器(bound + writers 的组合)
    writes: deque[tuple[str, Any]]     # 写入缓冲区
    config: RunnableConfig             # 完整的运行配置
    triggers: Sequence[str]            # 触发此任务的 Channel 列表
    retry_policy: Sequence[RetryPolicy] # 重试策略
    cache_key: CacheKey | None         # 缓存键(如果启用了缓存)
    id: str                            # 全局唯一的任务 ID
    path: tuple[str | int | tuple, ...] # 任务路径(用于排序和标识)
    writers: Sequence[Runnable] = ()    # 写入器引用
    subgraphs: Sequence[PregelProtocol] = ()  # 子图引用

虽然标记为 frozen=True(不可变),但 writes 字段是一个 deque——它的引用不可变,但内容可变。这个设计使得任务执行过程中可以向 writes 追加数据,同时防止意外替换整个 writes 对象。

7.2.1 任务 ID 的生成

任务 ID 是通过确定性哈希函数生成的,确保同一个 Checkpoint 状态下,相同的任务总是获得相同的 ID:

# 对于 PULL 任务
task_id = task_id_func(
    checkpoint_id_bytes,    # Checkpoint ID 的字节表示
    checkpoint_ns,          # 命名空间(如 "parent|agent")
    str(step),              # 步数
    name,                   # 节点名称
    PULL,                   # 任务类型
    *triggers,              # 触发 Channel
)

# 对于 PUSH 任务(Send API)
task_id = task_id_func(
    checkpoint_id_bytes,
    checkpoint_ns,
    str(step),
    name,
    PUSH,
    task_path_str(parent_path),  # 父任务路径
    str(idx),                    # 在父任务写入中的索引
)

LangGraph 1.1.6 支持两种哈希函数:xxhash(v2 Checkpoint 格式,更快)和 uuid5(v1 格式,兼容旧版)。确定性的 ID 是 Checkpoint 恢复的关键——恢复后重新计算的任务 ID 与保存的 pending writes 中的 task ID 必须匹配,这样 _match_writes 才能正确地将已保存的写入结果关联到重建的任务。

7.2.2 proc 的构成

PregelExecutableTask.proc 是一个 RunnableSeq,它将用户逻辑和写入器串联:

flowchart LR
    INPUT[任务输入] --> BOUND[bound: 用户函数]
    BOUND --> W1[ChannelWrite: 状态更新]
    W1 --> W2[ChannelWrite: 边路由]

    subgraph "proc = RunnableSeq(bound, *writers)"
        BOUND
        W1
        W2
    end

执行 task.proc.invoke(task.input, task.config) 时:

  1. 首先调用用户函数,传入从 Channel 读取的状态
  2. 用户函数返回状态更新(如 {"count": 5}
  3. 第一个 ChannelWrite 将更新转化为 Channel 写入元组,通过 CONFIG_KEY_SEND 发送
  4. 第二个 ChannelWrite(如果有边)将路由信号写入目标节点的触发 Channel

7.2.3 config 中注入的关键函数

每个任务的 config 中注入了几个关键回调,使得任务执行过程中能与 PregelLoop 交互:

config = patch_config(
    config,
    configurable={
        CONFIG_KEY_TASK_ID: task_id,
        CONFIG_KEY_SEND: writes.extend,     # 写入收集器
        CONFIG_KEY_READ: partial(            # 状态读取器
            local_read, scratchpad, channels, managed,
            PregelTaskWrites(path, name, writes, triggers),
        ),
        CONFIG_KEY_CHECKPOINTER: checkpointer,
        CONFIG_KEY_CHECKPOINT_NS: task_checkpoint_ns,
        CONFIG_KEY_SCRATCHPAD: scratchpad,
        CONFIG_KEY_RUNTIME: runtime,
    },
)
  • CONFIG_KEY_SEND:绑定到 writes.extend——当 ChannelWrite.do_write 被调用时,写入元组被追加到任务的 writes deque。deque.extend 是线程安全的。
  • CONFIG_KEY_READ:绑定到 local_read 函数——条件边通过此函数读取”应用了当前任务写入后”的状态快照。这确保条件判断基于最新状态。

7.3 PULL 任务 vs PUSH 任务

LangGraph 中有两种根本不同的任务触发方式:

flowchart TB
    subgraph "PULL 任务"
        direction TB
        EDGE["边/条件边写入\nbranch:to:agent"] --> CHAN_VER["Channel 版本更新"]
        CHAN_VER --> TRIGGERS["_triggers() 检测\n版本 > seen"]
        TRIGGERS --> PULL_PREP["prepare_single_task\npath = (PULL, name)"]
        PULL_PREP --> PULL_INPUT["输入 = Channel 读取结果\n读取 node.channels 指定的 Channel"]
        PULL_INPUT --> PULL_TASK["PregelExecutableTask"]
    end

    subgraph "PUSH 任务"
        direction TB
        SEND_API["节点返回 Send('tool', data)\n或条件边返回 Send"] --> TASKS_CH["__pregel_tasks Topic"]
        TASKS_CH --> PUSH_PREP["prepare_single_task\npath = (PUSH, idx)"]
        PUSH_PREP --> PUSH_INPUT["输入 = Send.arg\n由调用者直接指定"]
        PUSH_INPUT --> PUSH_TASK["PregelExecutableTask"]
    end

    style PULL_TASK fill:#c8e6c9
    style PUSH_TASK fill:#fff3e0

PULL 任务

PULL 任务是标准的 BSP 触发方式。在 prepare_single_task 中,对于 (PULL, name) 路径:

if task_path[0] == PULL:
    name = task_path[1]
    proc = processes[name]
    # 检查触发条件
    if _triggers(channels, checkpoint["channel_versions"],
                 checkpoint["versions_seen"].get(name),
                 null_version, proc):
        # 读取输入
        val = _proc_input(proc, managed, channels,
                          for_execution=True, ...)
        if val is MISSING:
            return  # Channel 为空,跳过
        # 创建任务
        return PregelExecutableTask(name, val, node, writes, ...)

PULL 任务的输入来自 Channel:_proc_input 根据 proc.channels 配置读取指定的 Channel 值,如果有 mapper 则进行类型转换。

PUSH 任务

PUSH 任务通过两种途径创建:

  1. Send API(prepare_push_task_send:当 __pregel_tasks Topic Channel 中有 Send 对象时
  2. Functional API(prepare_push_task_functional:当任务路径以 Call 对象结尾时

对于 Send API 的 PUSH 任务:

if task_path[0] == PUSH:
    # 获取 Send 对象
    send = tasks_channel.get()[task_path[1]]
    name = send.node
    val = send.arg  # 直接使用 Send 的参数作为输入
    proc = processes[name]
    # 创建任务(不检查 _triggers)
    return PregelExecutableTask(name, val, node, writes, ...)

关键区别:PUSH 任务不检查 _triggers——它们总是被执行。输入直接来自 Send.arg,而非从 Channel 读取。这使得同一个节点可以被多次调用,每次使用不同的输入。

7.4 PregelRunner:并行调度器

PregelRunner 定义在 pregel/_runner.py 中,负责在每个超步中并行执行所有任务:

class PregelRunner:
    def __init__(self, *, submit, put_writes,
                 use_astream=False, node_finished=None):
        self.submit = submit          # 提交函数(弱引用)
        self.put_writes = put_writes  # 写入保存函数(弱引用)
        self.use_astream = use_astream
        self.node_finished = node_finished

7.4.1 同步 tick 的执行流程

def tick(self, tasks, *, reraise=True, timeout=None,
         retry_policy=None, get_waiter=None, schedule_task):
    tasks = tuple(tasks)
    futures = FuturesDict(
        callback=weakref.WeakMethod(self.commit),
        event=threading.Event(),
        future_type=concurrent.futures.Future,
    )
    # 让出控制权给调用者
    yield

    # 快速路径:单任务无超时
    if len(tasks) == 1 and timeout is None and get_waiter is None:
        t = tasks[0]
        try:
            run_with_retry(t, retry_policy, ...)
            self.commit(t, None)
        except Exception as exc:
            self.commit(t, exc)
            ...
        return

    # 调度所有任务到线程池
    for t in tasks:
        fut = self.submit()(
            run_with_retry, t, retry_policy, ...
        )
        futures[fut] = t

    # 等待任务完成,逐个处理
    while len(futures) > 0:
        done, inflight = concurrent.futures.wait(
            futures,
            return_when=concurrent.futures.FIRST_COMPLETED,
            timeout=...,
        )
        for fut in done:
            futures.pop(fut)
        if _should_stop_others(done):
            break
        yield  # 让出控制权给调用者处理流式输出

    # 等待所有回调完成
    futures.event.wait(timeout=...)
    yield

    # 检查异常
    _panic_or_proceed(futures.done, panic=reraise)

7.4.2 FuturesDict:智能的并发管理

FuturesDict 是一个自定义的 dict,它在 Future 完成时自动调用回调并管理计数器:

class FuturesDict(dict):
    event: threading.Event  # 所有任务完成时设置
    callback: weakref.ref   # commit 回调
    counter: int            # 活跃任务计数
    done: set              # 已完成的 Future 集合

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        if value is not None:
            self.event.clear()
            self.counter += 1
            key.add_done_callback(partial(self.on_done, value))

    def on_done(self, task, fut):
        try:
            if cb := self.callback():
                cb(task, _exception(fut))
        finally:
            self.done.add(fut)
            self.counter -= 1
            if self.counter == 0 or _should_stop_others(self.done):
                self.event.set()
sequenceDiagram
    participant Runner as PregelRunner
    participant FD as FuturesDict
    participant Pool as 线程池
    participant T1 as 任务 A
    participant T2 as 任务 B
    participant PL as PregelLoop

    Runner->>FD: futures[fut_a] = task_a
    FD->>Pool: submit(run_with_retry, task_a)
    Runner->>FD: futures[fut_b] = task_b
    FD->>Pool: submit(run_with_retry, task_b)

    Pool->>T1: 执行
    Pool->>T2: 执行

    T1-->>FD: on_done(task_a, result)
    FD->>Runner: commit(task_a, None)
    Runner->>PL: put_writes(task_a.id, task_a.writes)

    Note over Runner: yield - 让出控制权
    Note over PL: 处理流式输出

    T2-->>FD: on_done(task_b, result)
    FD->>Runner: commit(task_b, None)
    Runner->>PL: put_writes(task_b.id, task_b.writes)

    FD-->>FD: counter == 0, event.set()

7.4.3 单任务快速路径

当只有一个任务且没有超时时,PregelRunner 使用快速路径——直接在当前线程执行,避免线程池的开销:

if len(tasks) == 1 and timeout is None and get_waiter is None:
    t = tasks[0]
    try:
        run_with_retry(t, retry_policy, ...)
        self.commit(t, None)
    except Exception as exc:
        self.commit(t, exc)

这个优化在简单的线性图(每步只有一个节点执行)中显著减少了开销。

7.4.4 commit:结果提交

commit 方法根据任务结果的不同情况执行不同的处理:

def commit(self, task, exception):
    if isinstance(exception, asyncio.CancelledError):
        # 被取消的任务:保存错误
        task.writes.append((ERROR, exception))
        self.put_writes()(task.id, task.writes)
    elif exception:
        if isinstance(exception, GraphInterrupt):
            # 中断:保存中断信息
            writes = [(INTERRUPT, exception.args[0])]
            self.put_writes()(task.id, writes)
        elif isinstance(exception, GraphBubbleUp):
            # 冒泡异常:不保存,将在 _panic_or_proceed 中处理
            pass
        else:
            # 普通错误:保存错误信息
            task.writes.append((ERROR, exception))
            self.put_writes()(task.id, task.writes)
    else:
        # 成功:通知节点完成,保存写入
        if self.node_finished:
            self.node_finished(task.name)
        if not task.writes:
            task.writes.append((NO_WRITES, None))
        self.put_writes()(task.id, task.writes)

注意 NO_WRITES 标记——即使任务没有产生任何写入,也会标记一个占位写入。这确保了 Checkpointer 知道该任务已经执行过,在恢复时不会重复执行。

7.4.5 错误传播与任务取消

_should_stop_others 函数检查是否有任务失败:

def _should_stop_others(done):
    for fut in done:
        if fut.cancelled():
            continue
        elif exc := fut.exception():
            if not isinstance(exc, GraphBubbleUp) and \
               fut not in SKIP_RERAISE_SET:
                return True
    return False

当一个任务失败时(非 GraphBubbleUp 异常),所有其他任务会被取消。GraphBubbleUp(包括 GraphInterrupt)是特殊的——它们不被视为错误,不会导致其他任务被取消。

_panic_or_proceed 在所有任务完成后进行最终检查:

def _panic_or_proceed(futs, *, panic=True):
    interrupts = []
    while done:
        fut = done.pop()
        if exc := _exception(fut):
            if isinstance(exc, GraphInterrupt):
                interrupts.append(exc)
            elif fut not in SKIP_RERAISE_SET:
                raise exc
    if interrupts:
        # 合并所有中断
        raise GraphInterrupt(
            tuple(i for exc in interrupts for i in exc.args[0])
        )
    if inflight:
        raise TimeoutError("Timed out")

多个任务的 GraphInterrupt 会被合并为一个,确保所有中断信息都被保留。

7.4.6 pregel/_algo.py 1150 行:任务规划器的真实源码账本

PregelRunner 解决的是”如何把一批任务跑起来”——但”这一批任务从哪来”是另一个同等重要的问题、答案藏在 pregel/_algo.py。截至 main 分支 2026-04-22、该文件 1150 行、是 pregel 目录第三大文件(仅次于 main.py 3718 和 _loop.py 1423)。它有两个主函数是本章理解任务调度必须对齐的外部接口——

prepare_next_tasks 函数签名(真实源码,截至 main 分支 2026-04-22):

def prepare_next_tasks(
    checkpoint: Checkpoint,
    pending_writes: list[PendingWrite],
    processes: Mapping[str, PregelNode],
    channels: Mapping[str, BaseChannel],
    managed: ManagedValueMapping,
    config: RunnableConfig,
    step: int,
    stop: int,
    *,
    for_execution: bool,
    store: BaseStore | None = None,
    checkpointer: BaseCheckpointSaver | None = None,
    manager: None | ParentRunManager | AsyncParentRunManager = None,
    trigger_to_nodes: Mapping[str, Sequence[str]] | None = None,
    updated_channels: set[str] | None = None,
    retry_policy: Sequence[RetryPolicy] = (),
    cache_policy: CachePolicy | None = None,
) -> dict[str, PregelTask] | dict[str, PregelExecutableTask]:
    """Prepare the set of tasks that will make up the next Pregel step.
    ...
    """

签名里17 个参数每一个都有来路——

参数来自作用
checkpointPregelLoop.checkpoint读取 channel_versions / versions_seen 决定 PULL 激活
pending_writesPregelLoop.checkpoint_pending_writes用来在恢复时 _match_writes——已写入的 task 直接填 writes
processescompile 阶段固化的 node_name → PregelNode决定可调度节点集合
channelsPregelLoop.channelsPULL 任务读输入的数据源
managedPregelLoop.managed外部 reducer / store 等注入对象
config用户 invoke 时传入注入各种 CONFIG_KEY_* 回调
step + stop超步计数器 + 终止条件进入任务 config 的 scratchpad
for_executionTrue 出执行任务、False 出只读 PregelTask两种调用方共用一套规划
trigger_to_nodescompile 阶段算好的倒排索引O(1) 查找”这个 channel 变了激活哪些节点”
updated_channels上一步 apply_writes 返回值增量激活、不遍历所有 channel
retry_policy / cache_policy编译期配置注入到生成的 PregelExecutableTask

trigger_to_nodesupdated_channels 这两个增量结构是 LangGraph 规划器 1.1.6 的性能关键——早期实现每步遍历所有 node × 所有 channel、是 O(N·M)、节点数 20 时尚可、上百节点的大图立刻显著变慢。1.1.6 改成”只看上步改过的 channel 通过倒排索引激活的节点”、典型场景下 O(updated_channels 的度)。这种”增量规划”的思想和第 6 章 tick() 的”增量 checkpoint”是同源的。

apply_writes 函数签名(真实源码,截至 main 分支 2026-04-22):

def apply_writes(
    checkpoint: Checkpoint,
    channels: Mapping[str, BaseChannel],
    tasks: Iterable[WritesProtocol],
    get_next_version: GetNextVersion | None,
    trigger_to_nodes: Mapping[str, Sequence[str]],
) -> set[str]:
    """Apply writes from a set of tasks (usually the tasks from a Pregel step)
    to the checkpoint and channels, and return managed values writes to be
    applied externally.
    ...
    """

返回值 set[str] 就是下一次 prepare_next_tasksupdated_channels——两个函数首尾咬合形成闭环:prepare_next_tasks → run → apply_writes → updated_channels → prepare_next_tasks。这是 §7.4 的 Runner 为什么能完全不知道”下一步做什么”的原因——规划和执行被干净切分、规划算法全部锁在 _algo.py 里、Runner 只负责”把给我的任务跑完并 commit”。

源码文件分工表的对照(line 数据来自 wc -l)——

职责文件主要对外接口
规划”下一步跑什么”_algo.py1150prepare_next_tasks / apply_writes
调度”当前这一批怎么跑”_runner.py745PregelRunner.tick / atick
并发原语_executor.py223BackgroundExecutor / AsyncBackgroundExecutor
单任务重试_retry.py318run_with_retry / arun_with_retry
主循环串联上面四者_loop.py1423PregelLoop.tick / after_tick

规划层(algo)不知道并行——执行层(runner)不知道如何生成任务——重试层(retry)不知道调度——执行器(executor)不知道任务语义。四层正交、通过数据结构(PregelExecutableTaskCheckpointRetryPolicy)而非继承关系耦合——这是典型的Pregel paper 式架构、从 Google 2010 年论文传承至今、LangGraph 用 Python 实现时保留了这个骨架。

7.5 BackgroundExecutor:线程池并行

BackgroundExecutor 定义在 pregel/_executor.py 中,是同步执行的后台任务管理器:

class BackgroundExecutor(AbstractContextManager):
    def __init__(self, config: RunnableConfig):
        self.stack = ExitStack()
        self.executor = self.stack.enter_context(
            get_executor_for_config(config)
        )
        self.tasks: dict[Future, tuple[bool, bool]] = {}

    def submit(self, fn, *args,
               __cancel_on_exit__=False,
               __reraise_on_exit__=True,
               __next_tick__=False,
               **kwargs) -> Future:
        ctx = copy_context()
        if __next_tick__:
            task = self.executor.submit(
                next_tick, ctx.run, fn, *args, **kwargs
            )
        else:
            task = self.executor.submit(ctx.run, fn, *args, **kwargs)
        self.tasks[task] = (__cancel_on_exit__, __reraise_on_exit__)
        task.add_done_callback(self.done)
        return task

7.5.1 关键参数

参数含义使用场景
__cancel_on_exit__退出时是否取消任务waiter 任务(流式输出等待器)
__reraise_on_exit__退出时是否重新抛出异常PUSH 子任务设为 False(由父任务处理)
__next_tick__是否在下一个”tick”执行动态创建的子任务,确保当前步的写入先提交

7.5.2 next_tick 机制

__next_tick__ 参数触发 next_tick 函数:

def next_tick(fn, *args, **kwargs):
    time.sleep(0)  # 让出 CPU 时间片
    return fn(*args, **kwargs)

time.sleep(0) 看起来无意义,但它实际上让出了当前线程的执行权,允许线程池中的其他线程(特别是正在执行 commit 回调的线程)先完成。这确保了动态创建的子任务在父任务的当前写入提交到 PregelLoop 之后才开始执行。

7.5.3 上下文管理器退出

def __exit__(self, exc_type, exc_value, traceback):
    tasks = self.tasks.copy()
    # 取消标记为 cancel_on_exit 的任务
    for task, (cancel, _) in tasks.items():
        if cancel:
            task.cancel()
    # 等待所有任务完成
    if pending := {t for t in tasks if not t.done()}:
        concurrent.futures.wait(pending)
    # 关闭线程池
    self.stack.__exit__(exc_type, exc_value, traceback)
    # 重新抛出标记为 reraise 的任务异常
    if exc_type is None:
        for task, (_, reraise) in tasks.items():
            if not reraise:
                continue
            try:
                task.result()
            except concurrent.futures.CancelledError:
                pass

退出时的处理顺序很重要:先取消,再等待,再清理,最后处理异常。这确保了所有资源被正确释放。

7.6 AsyncBackgroundExecutor:异步并行

AsyncBackgroundExecutor 是异步版本,使用 asyncio 任务替代线程:

class AsyncBackgroundExecutor(AbstractAsyncContextManager):
    def __init__(self, config: RunnableConfig):
        self.tasks: dict[asyncio.Future, tuple[bool, bool]] = {}
        self.loop = asyncio.get_running_loop()
        if max_concurrency := config.get("max_concurrency"):
            self.semaphore = asyncio.Semaphore(max_concurrency)
        else:
            self.semaphore = None

    def submit(self, fn, *args, __cancel_on_exit__=False,
               __reraise_on_exit__=True, __next_tick__=False,
               **kwargs) -> asyncio.Future:
        coro = fn(*args, **kwargs)
        if self.semaphore:
            coro = gated(self.semaphore, coro)
        task = run_coroutine_threadsafe(
            coro, self.loop,
            context=copy_context(),
            lazy=__next_tick__,
        )
        self.tasks[task] = (__cancel_on_exit__, __reraise_on_exit__)
        return task

7.6.1 并发控制:Semaphore

flowchart TB
    subgraph "无并发限制"
        T1A[任务 A] --> EXEC1[立即执行]
        T1B[任务 B] --> EXEC1
        T1C[任务 C] --> EXEC1
    end

    subgraph "max_concurrency=2"
        T2A[任务 A] --> SEM["Semaphore(2)"]
        T2B[任务 B] --> SEM
        T2C[任务 C] --> SEM
        SEM -->|获取 permit| EXEC2A[执行 A]
        SEM -->|获取 permit| EXEC2B[执行 B]
        SEM -->|等待 permit| WAIT[任务 C 等待]
        EXEC2A -->|释放| SEM
        SEM -->|获取 permit| EXEC2C[执行 C]
    end

max_concurrency 通过 gated 协程实现:

async def gated(semaphore, coro):
    async with semaphore:
        return await coro

这提供了细粒度的并发控制——在 AI 工作流中,过多的并发可能导致 API 限流或内存溢出,max_concurrency 提供了安全阀。

7.6.2 同步 vs 异步对比

特性BackgroundExecutorAsyncBackgroundExecutor
并行原语ThreadPoolExecutorasyncio.Task
并发控制线程池大小asyncio.Semaphore
上下文传递copy_context() + ctx.runcopy_context() 参数
取消机制Future.cancel()(仅未启动)Task.cancel()(立即)
next_ticktime.sleep(0)lazy=True 延迟调度

异步版本的取消更加彻底——asyncio 任务可以在执行过程中被取消(通过 CancelledError),而线程池中的 Future 只能在未开始时被取消。

7.7 重试策略:run_with_retry

重试逻辑定义在 pregel/_retry.py 中,run_with_retry 是同步版本:

def run_with_retry(task, retry_policy, configurable=None):
    retry_policy = task.retry_policy or retry_policy
    attempts = 0
    config = task.config
    if configurable is not None:
        config = patch_configurable(config, configurable)

    while True:
        try:
            # 清除上次尝试的写入
            task.writes.clear()
            # 执行任务
            return task.proc.invoke(task.input, config)
        except ParentCommand as exc:
            # Command 路由到当前图或父图
            ns = config[CONF][CONFIG_KEY_CHECKPOINT_NS]
            cmd = exc.args[0]
            if cmd.graph in (ns, recast_checkpoint_ns(ns), task.name):
                for w in task.writers:
                    w.invoke(cmd, config)
                break
            elif cmd.graph == Command.PARENT:
                exc.args = (replace(cmd, graph=parent_ns),)
            raise
        except GraphBubbleUp:
            # 中断信号,直接向上传播
            raise
        except Exception as exc:
            if not retry_policy:
                raise

            # 查找匹配的重试策略
            matching_policy = None
            for policy in retry_policy:
                if _should_retry_on(policy, exc):
                    matching_policy = policy
                    break

            if not matching_policy:
                raise

            attempts += 1
            if attempts >= matching_policy.max_attempts:
                raise

            # 计算退避时间
            interval = matching_policy.initial_interval
            interval = min(
                matching_policy.max_interval,
                interval * (matching_policy.backoff_factor ** (attempts - 1)),
            )
            sleep_time = (
                interval + random.uniform(0, 1)
                if matching_policy.jitter else interval
            )
            time.sleep(sleep_time)

            # 标记为恢复模式
            config = patch_configurable(
                config, {CONFIG_KEY_RESUMING: True}
            )

7.7.1 RetryPolicy 数据结构

class RetryPolicy(NamedTuple):
    initial_interval: float = 0.5   # 首次重试等待时间(秒)
    backoff_factor: float = 2.0     # 退避倍数
    max_interval: float = 128.0     # 最大等待时间(秒)
    max_attempts: int = 3           # 最大重试次数
    jitter: bool = True             # 是否添加随机抖动
    retry_on: type[Exception] | Sequence[type[Exception]] | Callable = default_retry_on

7.7.2 退避算法可视化

flowchart TB
    subgraph "指数退避 + 抖动"
        A1["尝试 1: 执行失败"] --> W1["等待 0.5s + jitter"]
        W1 --> A2["尝试 2: 执行失败"]
        A2 --> W2["等待 1.0s + jitter"]
        W2 --> A3["尝试 3: 执行失败"]
        A3 --> FAIL["达到 max_attempts=3\n抛出异常"]
    end

    subgraph "退避时间计算"
        direction TB
        F["interval = initial_interval * backoff_factor^(attempts-1)"]
        F --> C["interval = min(interval, max_interval)"]
        C --> J{"jitter?"}
        J -->|是| JV["sleep = interval + random(0,1)"]
        J -->|否| NJ["sleep = interval"]
    end

7.7.3 多策略匹配

LangGraph 支持为同一个节点配置多个重试策略,按照异常类型进行匹配:

def _should_retry_on(retry_policy, exc):
    if isinstance(retry_policy.retry_on, Sequence):
        return isinstance(exc, tuple(retry_policy.retry_on))
    elif isinstance(retry_policy.retry_on, type):
        return isinstance(exc, retry_policy.retry_on)
    elif callable(retry_policy.retry_on):
        return retry_policy.retry_on(exc)

这允许精细的重试控制:

# 对 API 限流重试更多次,对网络错误快速重试
graph.add_node(
    "llm_call", llm_fn,
    retry_policy=[
        RetryPolicy(max_attempts=5, initial_interval=2.0,
                     retry_on=RateLimitError),
        RetryPolicy(max_attempts=3, initial_interval=0.1,
                     retry_on=ConnectionError),
    ]
)

7.7.4 重试中的状态管理

重试时有两个关键的状态操作:

  1. task.writes.clear():清除上次尝试的写入。这确保失败的写入不会被保留——只有成功执行的写入才会被提交。
  2. CONFIG_KEY_RESUMING: True:标记为恢复模式。这通知子图”你正在被重试”,子图可以据此跳过已完成的步骤。

7.7.5 可观测性和分布式 runtime 的两个额外细节

上面的伪代码抓住了重试主干,但真实 pregel/_retry.py 还有两处真实工程细节——没有它们 LangGraph 的生产可用性会差一大截

1. Python 3.11+ 的 exc.add_note 注入任务身份_retry.py:145-146):

# _retry.py:30
SUPPORTS_EXC_NOTES = sys.version_info >= (3, 11)

# _retry.py:145 (在 except Exception 分支里)
if SUPPORTS_EXC_NOTES:
    exc.add_note(f"During task with name '{task.name}' and id '{task.id}'")

生产环境里一个链路几十个节点、每个节点可能多次重试——一个原始的 TimeoutErrorValidationError 在日志里显示的回溯栈没法直接指出”是哪个节点的哪次尝试出错”add_notePEP 678 给 Python 3.11 引入的 API——允许在异常对象上附加任意字符串注解、在 traceback 打印时自动显示。LangGraph 把任务名和 ID 注入进去、让运维能一眼看到:

Traceback (most recent call last):
  ...
httpx.TimeoutException: Request timed out after 30.0 seconds
During task with name 'llm_call_node' and id '7f3a1b2c-...'

这一行注解没改变异常类型、没破坏栈、也不影响 except 匹配——是纯增量的可观测性。SUPPORTS_EXC_NOTES 常量用 sys.version_info 在模块加载时判断、Python 3.10 及以下直接跳过这行——是完美的向后兼容设计。logger.info(...) 的 retry 日志(_retry.py:180)也带 exc_info=exc 让完整堆栈进监控。

2. _checkpoint_ns_for_parent_command——27 行处理分布式 runtime 的 namespace 特例_retry.py:58-84):

chapter 7.7 的代码简化写 recast_checkpoint_ns(ns) 就用来比较 parent graph——但真实源码在 Command.PARENT 分支里调的不是它、而是一个叫 _checkpoint_ns_for_parent_command 的专用辅助函数。看它的注释和实现:

# _retry.py:58
def _checkpoint_ns_for_parent_command(ns: str) -> str:
    """The checkpoint namespace is a `|`-separated path. Each segment is usually
    of the form `name:task_id` (e.g. `parent_first:<uuid>|node:<uuid>`), but the
    runtime may also insert a purely-numeric segment (e.g. `|1`) to disambiguate
    concurrent tasks (e.g. `parent_first:<uuid>|node:<uuid>|1`).

    Numeric segments are not real path levels, so we drop them before computing
    the parent namespace."""
    parts = ns.split(NS_SEP)
    # Drop any trailing numeric selectors for the current frame
    while parts and parts[-1].isdigit():
        parts.pop()
    # Drop the current frame segment itself
    if parts:
        parts.pop()
    # Drop any trailing numeric selectors for the parent frame
    while parts and parts[-1].isdigit():
        parts.pop()
    return NS_SEP.join(parts)

这 20 行代码在解决什么问题?LangGraph Platform(托管分布式 runtime)里多个并发任务需要被唯一区分——当两个 Send 任务目标是同一个节点、命名空间光靠 node_name:task_id 不够唯一(若 task_id 复用)、runtime 会插一个纯数字段|1|2)来消歧。但这些数字段不是真正的图层级——计算父图 namespace 时得跳过它们。

所以parts.pop() 要做三轮:先剥掉当前帧的数字后缀、再剥掉当前帧本身、再剥掉父帧的数字后缀recast_checkpoint_ns 是通用的 ns 规整、但对这个特殊的”父图 Command 路由”场景不够——需要专门函数。

本地开发通常看不到 |1 段、这个函数执行路径短;但一旦部署到 LangGraph Platform、同一节点的多并发实例立刻触发此处逻辑。跑本地单机跑得好好的图一上分布式可能因为 Command 路由错传到错误的父图的 bug 就是这种地方。这种”本地 vs 分布式 runtime 不对称”的处理是 LangGraph Platform 能从 open-source LangGraph 无缝升级的关键——代码里为它留了明确的接口点。

7.8 缓存策略

LangGraph 1.1.6 引入了节点级别的缓存支持,避免对确定性节点的重复计算。

7.8.1 CachePolicy 和 CacheKey

@dataclass
class CachePolicy:
    key_func: Callable = default_cache_key  # 缓存键生成函数
    ttl: int | None = None                  # 过期时间(秒)

class CacheKey(NamedTuple):
    ns: tuple[str, ...]   # 命名空间(标识节点身份)
    key: str              # 缓存键(输入哈希)
    ttl: int | None       # 过期时间

缓存键由三部分组成:

  • 命名空间(CACHE_NS_WRITES, identifier(proc), name) —— 标识是哪个节点的哪个实现
  • xxh3_128_hexdigest(key_func(input)) —— 输入数据的哈希
  • TTL:可选的过期时间

7.8.2 缓存匹配流程

缓存匹配在两个地方发生:

1. 超步开始时(tick 之后)

while loop.tick():
    for task in loop.match_cached_writes():
        loop.output_writes(task.id, task.writes, cached=True)
    # 只执行没有缓存命中的任务
    for _ in runner.tick(
        [t for t in loop.tasks.values() if not t.writes],
        ...
    ):

match_cached_writes 批量查询缓存:

def match_cached_writes(self):
    if self.cache is None:
        return ()
    cached = {
        (t.cache_key.ns, t.cache_key.key): t
        for t in self.tasks.values()
        if t.cache_key and not t.writes  # 只查没有写入的任务
    }
    matched = []
    for key, values in self.cache.get(tuple(cached)).items():
        task = cached[key]
        task.writes.extend(values)  # 将缓存的写入填充到任务
        matched.append(task)
    return matched

2. 任务写入保存时(put_writes)

def put_writes(self, task_id, writes):
    super().put_writes(task_id, writes)
    if self.cache is None or not hasattr(self, "tasks"):
        return
    task = self.tasks.get(task_id)
    if task is None or task.cache_key is None:
        return
    # 异步保存到缓存
    self.submit(
        self.cache.set,
        {(task.cache_key.ns, task.cache_key.key):
         (task.writes, task.cache_key.ttl)}
    )
flowchart TB
    START[超步开始] --> PREP[prepare_next_tasks\n创建任务,计算 cache_key]
    PREP --> MATCH[match_cached_writes\n批量查询缓存]
    MATCH --> HIT{缓存命中?}
    HIT -->|是| FILL[填充 task.writes\n发出 cached=True 事件]
    HIT -->|否| EXEC[runner.tick 执行任务]
    EXEC --> COMMIT[commit 提交结果]
    COMMIT --> SAVE[put_writes 保存写入]
    SAVE --> CACHE_SAVE[异步保存到缓存]
    FILL --> AFTER[after_tick 应用写入]
    CACHE_SAVE --> AFTER

7.8.3 缓存键的生成

默认的 default_cache_key 函数使用 pickle 序列化输入后进行哈希:

# _internal/_cache.py
def default_cache_key(input: Any) -> bytes:
    return pickle.dumps(input)

prepare_single_task 中,缓存键的完整计算过程:

if cache_policy:
    args_key = cache_policy.key_func(val)  # 用户定义或默认的键函数
    cache_key = CacheKey(
        (CACHE_NS_WRITES, identifier(proc) or "__dynamic__", name),
        xxh3_128_hexdigest(
            args_key.encode() if isinstance(args_key, str) else args_key
        ),
        cache_policy.ttl,
    )

identifier(proc) 返回处理器的唯一标识(通常基于函数的模块和名称),确保不同实现的同名节点不会共享缓存。

7.9 动态任务创建:accept_push

在 Functional API 中,任务可以在执行过程中动态创建子任务。这通过 PregelLoop.accept_push 方法实现:

def accept_push(self, task, write_idx, call=None):
    """接受一个来自正在执行的任务的 PUSH 请求。"""
    pushed = prepare_single_task(
        (PUSH, task.path, write_idx, task.id, call),
        None,
        checkpoint=self.checkpoint,
        ...
    )
    if pushed:
        # 发出调试事件
        self._emit("tasks", map_debug_tasks, [pushed])
        # 保存新任务
        self.tasks[pushed.id] = pushed
        # 匹配已有写入
        if not self.is_replaying:
            self._match_writes({pushed.id: pushed})
        return pushed

accept_push 作为 schedule_task 回调传递给 PregelRunner.tick,在任务执行过程中被调用:

# _runner.py 中的 _call 函数
if next_task := schedule_task(task(), scratchpad.call_counter(), call):
    if next_task.writes:
        # 已经有结果(从缓存或 Checkpoint 恢复),直接返回
        fut = concurrent.futures.Future()
        ret = next((v for c, v in next_task.writes if c == RETURN), MISSING)
        fut.set_result(ret)
    else:
        # 调度新任务到线程池
        fut = submit()(
            run_with_retry, next_task, retry_policy, ...,
            __next_tick__=True,  # 确保写入先提交
        )
        SKIP_RERAISE_SET.add(fut)
        futures()[fut] = next_task

动态创建的子任务使用 __next_tick__=True,确保父任务的当前写入先被提交到 PregelLoop,然后子任务才开始执行。这维护了写入的因果顺序。

7.10 异常处理的分层设计

LangGraph 的异常处理分为多个层次:

flowchart TB
    subgraph "异常分层"
        direction TB
        L1["GraphBubbleUp(基类)"]
        L2["GraphInterrupt\n(中断信号)"]
        L3["ParentCommand\n(跨图命令)"]
        L4["普通 Exception\n(可重试)"]
        L5["CancelledError\n(被取消)"]

        L1 --- L2
        L1 --- L3
    end

    subgraph "处理策略"
        direction TB
        S1["GraphBubbleUp: 不重试,不停止其他任务,向上传播"]
        S2["GraphInterrupt: 保存中断信息,合并后向上传播"]
        S3["ParentCommand: 路由到目标图,或继续冒泡"]
        S4["Exception: 匹配重试策略,超限后保存错误并停止其他任务"]
        S5["CancelledError: 保存错误到 Checkpoint"]
    end

    L1 --> S1
    L2 --> S2
    L3 --> S3
    L4 --> S4
    L5 --> S5

run_with_retry 中的异常处理

while True:
    try:
        task.writes.clear()
        return task.proc.invoke(task.input, config)
    except ParentCommand as exc:
        # 检查 Command 的目标图
        if cmd.graph in (ns, recast_checkpoint_ns(ns), task.name):
            # 当前图处理
            for w in task.writers:
                w.invoke(cmd, config)
            break
        elif cmd.graph == Command.PARENT:
            # 设置父图命名空间后冒泡
            exc.args = (replace(cmd, graph=parent_ns),)
        raise
    except GraphBubbleUp:
        raise  # 直接向上传播
    except Exception as exc:
        if not retry_policy:
            raise
        # 匹配并执行重试策略...

这种分层设计确保了:

  • 控制流信号(Interrupt、Command)不会被重试逻辑意外捕获
  • 普通异常可以根据策略重试
  • 异常信息会被保存到 Checkpoint,支持调试和恢复

7.11 执行过程中的写入流

让我们追踪一次完整的写入流,从节点函数返回到 Channel 更新:

sequenceDiagram
    participant Node as 用户函数
    participant CW as ChannelWrite
    participant Writes as task.writes (deque)
    participant Runner as PregelRunner
    participant PL as PregelLoop
    participant Chan as Channels

    Note over Node: return {"count": 5}

    Node->>CW: invoke({"count": 5})
    Note over CW: _get_updates 提取元组
    CW->>CW: do_write(config, writes)
    Note over CW: config[CONF][CONFIG_KEY_SEND]
    CW->>Writes: extend([("count", 5)])

    Note over CW: 第二个 ChannelWrite (路由)
    CW->>Writes: extend([("branch:to:next", None)])

    Note over Runner: task.proc.invoke 完成

    Runner->>Runner: commit(task, None)
    Runner->>PL: put_writes(task.id, task.writes)

    Note over PL: 保存到 checkpoint_pending_writes
    Note over PL: 异步保存到 Checkpointer

    Note over PL: after_tick()
    PL->>Chan: apply_writes(checkpoint, channels, tasks, ...)
    Note over Chan: channels["count"].update([5])
    Note over Chan: channels["branch:to:next"].update([None])

关键观察:写入在任务执行过程中被收集到 task.writes 中,但直到 after_tick() 调用 apply_writes 时才真正应用到 Channel。这就是 BSP 模型的”步内隔离”——同一超步中的不同任务看不到彼此的写入。

7.12 设计决策分析

为什么使用弱引用(weakref)?

PregelRunner 中的 submitput_writes 都使用弱引用:

class PregelRunner:
    def __init__(self, *, submit, put_writes, ...):
        self.submit = submit          # weakref.WeakMethod
        self.put_writes = put_writes  # weakref.WeakMethod

这防止了循环引用导致的内存泄漏。PregelRunner 引用 PregelLoop 的方法,而 PregelLoop 可能间接引用 PregelRunner(通过 config 中注入的回调)。弱引用确保了当 PregelLoop 退出上下文管理器后,所有相关对象都能被正确回收。

为什么 writes 使用 deque 而非 list?

deque.extend 是线程安全的(在 CPython 中),而 list.extend 不是。在并行执行场景中,多个 writer 可能同时向同一个 writes deque 追加数据,deque 的原子性保证了不会出现数据损坏。

为什么 PUSH 子任务使用 SKIP_RERAISE_SET?

当任务 A 动态创建子任务 B 时,B 的异常应该由 A 处理(通过返回的 Future),而不是由 PregelRunner_panic_or_proceed 处理。SKIP_RERAISE_SET 记录了哪些 Future 的异常应该被跳过:

# _runner.py
fut = submit()(run_with_retry, next_task, ...)
SKIP_RERAISE_SET.add(fut)  # 标记:异常由父任务处理
futures()[fut] = next_task

这实现了”异常沿调用链传播”的语义,而非”所有异常在顶层汇合”的语义。

为什么重试时清除 writes 而不是创建新 deque?

task.writes.clear()  # 而不是 task = replace(task, writes=deque())

因为 PregelExecutableTaskfrozen=True 的 dataclass,不能替换字段。同时,config 中注入的 CONFIG_KEY_SEND 引用的是原始 deque 的 extend 方法——如果创建新 deque,写入会丢失到旧对象中。clear() 在不改变引用的前提下清空内容。

7.12.5 langgraph/pregel/ 22 文件 11392 行的真实骨架

读到这里你已经接触了 runner、executor、retry、algo 四个核心模块——把它们放在整个 pregel/ 目录的尺度看——

文件本章涉及角色
main.py3718-Pregel 主类——compile、invoke、stream、astream 的用户入口
_loop.py1423间接PregelLoop 超步主循环——驱动 runner、处理 checkpoint、管理 channel 版本
_algo.py1258部分任务规划——prepare_next_tasks / apply_writes / channel version 递增算法
remote.py1191-RemoteGraph——跨进程调用其他 LangGraph Graph 的 client
_runner.py768核心本章 §7.4 主角——FuturesDict + PregelRunner.tick/atick
_retry.py318§7.7run_with_retry / arun_with_retry + 指数退避实现
_draw.py294-绘图为 mermaid / graphviz 的可视化
protocol.py288部分PregelProtocol 抽象——Pregel 和 RemoteGraph 的公共超类
debug.py279-print_checkpoint 等 debug helper
_read.py277间接PregelNode.read / channel 读路径
_call.py269-@task / @entrypoint 装饰器实现
_messages.py250-Message 相关的 reducer
_executor.py223§7.5-7.6BackgroundExecutor + AsyncBackgroundExecutor
_utils.py218间接小工具函数
_write.py192§7.11ChannelWrite 以及写入路径
_io.py174-stream mode 的 yield 路径
_validate.py / _checkpoint.py / types.py / __init__.py / _log.py / _config.py≤ 120-校验 / 公共类型 / 薄壳

三条值得记住的观察——

  1. main.py 3718 行占 32.6%——单一 Pregel 类把 compile / invoke / stream / astream / get_graph / get_state / update_state / get_history / with_types 等所有 public API 塞在一起——这是 LangGraph “一个类是主入口” 的设计取舍——对立面是 LangChain 把 LCEL 切成 Runnable / RunnableLambda / RunnableParallel 等十几个类
  2. _algo.py 1258 行是第三大——任务规划(看哪些 channel 有新值、对应激活哪些 node、生成 PULL 任务列表)的纯算法层独立成文件——这让 PregelLoop 可以不关心”下一步做什么”、只负责”怎么做”
  3. remote.py 1191 行接近 runner——LangGraph 把跨进程远程调用投入了和本地调度差不多的代码量——这不是玩具级 RPC、是承载 LangGraph Platform 生产集群的关键路径——对比 LangChain 没有等价设施、可见两个项目运营生态的不同定位

7.12-bis PregelRunner 源码的几个容易误读之处

_runner.py 的 745 行、有几个地方第一次读容易误解——集中澄清能省去后续大量试错。截至 main 分支 2026-04-22 的版本账本——

误读 1:yield 不是协程——是”让调用方插入流式消费”

PregelRunner.tick 的签名返回 Iterator[None]——方法体里散布着 yield 语句:

# _runner.py: PregelRunner.tick 内部
yield  # 1: 调度前让出,给调用方一次改变心意的机会
# ...
while len(futures) > 0:
    done, inflight = concurrent.futures.wait(...)
    # 处理完成的 futures
    if _should_stop_others(done):
        break
    yield  # 2: 每 wait 一批完成后让出
# ...
yield  # 3: 最后的收尾让出

这三个 yield 不是 asyncio 的”让出事件循环”——它们是 Python 生成器的 yield唯一作用是让 PregelLoop 调用方取回控制权PregelLoop 的主循环其实是这样消费:

# _loop.py 近似逻辑
for _ in runner.tick(tasks, ...):
    # 每一次 yield 后、在这里处理流式输出
    for chunk in self._stream_pending():
        self._caller_queue.put(chunk)

不是协程 = 不需要 async 即可实现”每完成一个 task 就输出一次”——这是 LangGraph 流式输出能在同步 API 里工作的关键设计。async 版本 atick 才用真正的 await、但机械上结构一致。新人最容易犯的错是把 Iterator[None] 理解成”异步”、结果 debug 时把断点打在错误位置。

误读 2:SKIP_RERAISE_SET 是模块级 WeakSet——不是实例属性

源码第 60 行定义——

SKIP_RERAISE_SET: weakref.WeakSet[concurrent.futures.Future | asyncio.Future] = (
    weakref.WeakSet()
)

这是模块级全局变量、所有 PregelRunner 实例共享一个 WeakSet。为什么能这样?因为 Future 对象全局唯一——A Runner 的 fut_1 不可能和 B Runner 的 fut_2 ID 冲突。WeakSet 的关键在于——当 Future 被垃圾回收、自动从 Set 里消失、不需要显式清理。这避免了内存泄漏——长时间运行的 LangGraph 服务如果用普通 set、每次 PUSH 子任务都往里塞 Future、永远不清理、最终会 OOM。这种”全局 WeakSet + 自动清理”的设计在分布式系统的 Future 追踪里很常见、但新手容易盯着实例属性找不到它。

误读 3:FuturesDict 继承 dict——不是 Mapping

第 63 行——

class FuturesDict(Generic[F, E], dict[F, PregelExecutableTask | None]):
    event: E
    callback: weakref.ref[...]
    counter: int
    done: set[F]
    lock: threading.Lock

它是 dict 的子类、有真实的 []pop()len() 能力——不是抽象接口。这带来两个真实后果——

  1. futures[fut] = t 触发 __setitem__ 重载、在 super 调用后加计数器递增和 add_done_callback
  2. futures.pop(fut) 是 dict 原生行为、不触发计数器递减——计数器递减在 on_done 回调里发生、独立于 pop

这个”set 时加计数、回调时减计数”的非对称设计很微妙——如果想自己改 Runner、不理解这个会把计数器搞乱、整个并发语义崩坏。线程安全靠 lock: threading.Lock 守护——event.wait() 的唤醒靠 on_doneevent.set()。源码里 lock 字段的使用范围故意收窄到”修改 counter 和 done”这两步、避免锁粒度过大。

误读 4:单任务快速路径不是优化——是正确性要求

§7.4.3 提到”单任务快速路径避免线程池开销”——但真正的动机更微妙。**在用户在节点里调 interrupt() 触发 GraphInterrupt 时、当前线程栈里才能保留足够信息让 traceback 指向用户代码。**如果经线程池跳一次、traceback 会断在 ThreadPoolExecutor._work 某一行、对用户没有 debug 价值。

快速路径还有一个额外好处——LangSmith 追踪时、OpenTelemetry 的 trace context 是 thread-local 的、快速路径让 span 自然挂在用户调用栈上、不用显式 copy_context()。多任务路径才需要 ctx.run(...) 做 context 传递。这是把”正确性的要求”包装成”性能优化”的典范——从外面看是快速路径、本质是保持 traceback / trace context 连续。

7.12a super-step vs DAG / 协程 / Actor——四种并发调度模式对照

LangGraph 的”super-step(超步)“来自 Google 2010 年 Pregel paper 的 BSP(Bulk Synchronous Parallel)模型——它不是常见的并发范式。把它和当代工程里最主流的三种调度模式对照、能更清楚看到 LangGraph 的取舍。

7.12a.1 模式四象限

flowchart LR
    subgraph "super-step (BSP)"
        direction TB
        BSPS["步 N:所有 task 并行执行"] --> BSPA["after_tick:原子 apply_writes"]
        BSPA --> BSPN["步 N+1:按新 channel 版本激活 task"]
    end
    subgraph "DAG (Airflow / Prefect)"
        direction TB
        DAGA["task A 完成"] --> DAGB["依赖 A 的 B 立即可跑"]
        DAGB --> DAGC["一跑到完、无重复访问同一节点"]
    end
    subgraph "协程 pipeline (asyncio / Go channel)"
        direction TB
        CO1["producer →"] --> CO2["stage1 →"]
        CO2 --> CO3["stage2 →"]
        CO3 --> CO4["consumer"]
    end
    subgraph "Actor (Akka / Erlang / Ray)"
        direction TB
        ACT1["actor A ←→ mailbox"]
        ACT2["actor B ←→ mailbox"]
        ACT1 -.-> ACT2
        ACT2 -.-> ACT1
    end

7.12a.2 语义对比表

维度super-step (LangGraph)DAG (Airflow)协程 (asyncio)Actor (Ray)
同一节点可多次激活是(多步循环)否(一次 run)流式、不谈”节点”身份是(actor 接收多消息)
状态模型共享 Channel + BSP 隔离每 task 独立输入输出隐式在队列里每 actor 私有可变状态
并行粒度步内全部 task 并行依赖满足即并行stage 间 pipeline 并行无限制、actor 间异步
步间屏障是(apply_writes 原子)无全局屏障
失败恢复Checkpoint → 超步粒度重放task 粒度重试无原生支持无原生支持、需 snapshot
中断 / 人类介入一等公民(GraphInterrupt)需手搭需手搭需手搭
循环支持一等公民(Channel 版本递增)困难(DAG 定义不含环)需手搭困难(消息循环易死锁)
典型规模10-100 节点的 agent 图100-1000 task 的 ETL生产者/消费者流1000+ actor 集群
调度开销中(每步屏障)低(纯拓扑)
观测粒度每 task 的 writes + checkpoint每 task 日志actor 消息追踪

7.12a.3 为什么 LangGraph 选择 super-step?

核心原因有三——

1. Agent 的天然循环属性——LLM agent 的执行是”思考 → 调工具 → 看结果 → 再思考”的循环、同一个 agent 节点会被激活 10 次甚至更多。DAG 模型不支持循环(或者要展开成重复 task、观测困难);Actor 模型支持但状态跟踪麻烦(每个 actor 实例各自的 state 要对账)。super-step 的 Channel 版本机制天然适配——Channel 值变了就再激活一次、同一个 agent 节点名在历史上有 N 次 PregelExecutableTask 实例、都被 checkpoint 完整记录。

2. 人类介入的确定性语义——用户在第 5 步触发 interrupt、系统应当知道”这一步没跑完、下次从这一步开始”。BSP 的””是天然的 checkpoint 粒度——每步之间原子 apply_writes、之间不保存半成品状态——所以恢复逻辑极其简单(从最近 checkpoint 重放一步)。DAG 如果中途中断、已跑过但未产出的 task 难以幂等恢复;协程被中断基本等于丢数据。

3. 写入隔离带来正确性保证——一步内的 N 个并行 task 看到的 Channel 值是这步开始时的快照、不会读到”同步被另一 task 刚写的值”。这避免了一类最难调试的 bug:并发 task 互相看到对方”半成品”的更新、导致不可复现的行为。代价是性能——没有 write-through——但对 Agent 场景(延迟由 LLM 主导、不是调度主导),这个代价几乎免费。

代价面也真实存在——

  1. 步间屏障限制了吞吐——纯数据流场景(ETL、推理流水)中 super-step 明显不如 DAG 或协程流水
  2. step 计数器隐含上限——默认 recursion_limit=25、agent 循环超过这个数就抛异常——需要显式拉大
  3. 观测的维度高——同一节点多步激活让日志量线性放大(Channel 历史、多个 task_id 对应同一 name)

LangGraph 不是”最快的”调度框架、它是”最适合 agent 循环 + 人类介入 + checkpoint 恢复”的调度框架——理解这三个约束才能理解为什么它选择 BSP 而非更常见的 DAG 或协程。这也是为什么数据工程团队把 LangGraph 当 ETL 引擎用时常感到”太重”——它不是解决那个问题的。

7.13 小结

本章深入分析了 LangGraph 的任务调度和并行执行机制。核心要点回顾:

  1. PregelExecutableTask 是不可变的任务容器,其 proc 串联了用户逻辑和写入器,writes deque 作为线程安全的写入缓冲区
  2. PULL vs PUSH:PULL 任务由 Channel 版本变更触发,输入从 Channel 读取;PUSH 任务由 Send API 创建,输入由调用者指定,不检查版本
  3. PregelRunner 通过 FuturesDict 管理并发任务,支持单任务快速路径、超时控制、“任一失败则全部停止”语义
  4. BackgroundExecutor 使用线程池 + copy_context 实现同步并行;AsyncBackgroundExecutor 使用 asyncio + Semaphore 实现异步并行和并发控制
  5. 重试策略支持指数退避、随机抖动、最大次数限制,以及按异常类型匹配的多策略组合。重试时清除旧写入并标记恢复模式
  6. 缓存策略通过节点身份 + 输入哈希生成 CacheKey,在超步开始时批量查询,命中后直接填充 writes 跳过执行
  7. 写入流遵循 BSP 模型:执行期间收集到 deque,commit 时保存到 Checkpointer,after_tick 时统一应用到 Channel

物理层面:整个 pregel/ 目录 22 文件 11392 行——本章主讲的 _runner.py(745)+ _executor.py(223)+ _retry.py(318)合计 1286 行、占 11.3%——是理解 LangGraph 运行时的最小必读子集

注:本章完稿时(2026-04-22)核对 GitHub main 分支 _runner.py 实际为 745 行而非先前版本的 768 行——上游在 1.1.x 期间对 _call 辅助函数做过精简。本书其余章节提到的行数以此版本账本为准。

7.14 与本书其他章节的串联——任务调度如何穿插全书

任务调度不是孤立的主题、它串联了本书几乎所有其他章节。把 §7 放在全书坐标系里、能更清楚每一节外接了谁——

7.14.1 向上依赖——state / channels / checkpoint

本章内容依赖章节关联点
PULL 任务从 Channel 读输入§3 StateGraph + §4 Channels_proc_input 通过 proc.channels 映射读取 Channel;reducer 是 Channel 的核心能力
task.writesapply_writes 里汇入 Channel§4 Channels写入的合并规则来自 Channel 类型(LastValue / Topic / BinaryOperatorAggregate 等)
任务 ID 由 checkpoint_id + step + name 生成§8 Checkpoint_match_writes 靠确定性 ID 把恢复的任务对回已持久化的 writes
CONFIG_KEY_RESUMING 通知重试§8 Checkpoint + §11 Subgraphs子图据此跳过已完成步骤
CachePolicy 存储的 (writes, ttl)§8 Checkpoint共用 put_writes 基础设施、但写的是 cache namespace 不是 checkpoint namespace

7.14.2 向下支持——interrupt / command / send / streaming / multi-agent

本章内容支撑章节关联点
GraphBubbleUp 不触发重试§9 InterruptGraphInterrupt 继承 GraphBubbleUp、被 runner 识别后合并后抛出
ParentCommand 的跨图路由§10 Commandrecast_checkpoint_ns / _checkpoint_ns_for_parent_command 专门为 Command 服务
PUSH 任务的 Send.arg 输入机制§12 SendSend API 是 PUSH 的 sole producer(除 Functional API 外)
yield 让出控制权给 PregelLoop§13 StreamingRunner 每完成一个 task 就 yield 一次、PregelLoop 趁机把 writes 流式输出
max_concurrency Semaphore§13 Streaming + §17 Multi-agent多 agent 并发调工具时、这个参数是防 API 限流的安全阀
schedule_task 回调的 PUSH 子任务§11 Subgraphs + §17 Multi-agent子图中的动态子任务、层层通过 accept_push 冒泡到顶层 Runner

7.14.3 API 索引——本章出现过的符号速查

用本章时作为速查表:把所有本章介绍的模块、类、函数列成一张表、方便回头查阅时按 Ctrl+F 精准定位。

符号类型来源文件本章小节
PregelExecutableTaskfrozen dataclasstypes.py§7.2
PregelRunnerpregel/_runner.py§7.4
PregelRunner.tick同步方法_runner.py§7.4.1
PregelRunner.atick异步方法_runner.py§7.6
PregelRunner.commit方法_runner.py§7.4.4
FuturesDictdict 子类_runner.py:63§7.4.2
_should_stop_others自由函数_runner.py:379§7.4.5
_panic_or_proceed自由函数_runner.py§7.4.5
SKIP_RERAISE_SET模块级 WeakSet_runner.py:60§7.12-bis
BackgroundExecutor上下文管理器pregel/_executor.py§7.5
AsyncBackgroundExecutor异步上下文管理器_executor.py§7.6
next_tick辅助函数_executor.py§7.5.2
gated协程包装器_executor.py§7.6.1
run_with_retry同步函数pregel/_retry.py§7.7
arun_with_retry异步函数_retry.py§7.7
_should_retry_on辅助函数_retry.py§7.7.3
_checkpoint_ns_for_parent_command辅助函数_retry.py:58§7.7.5
SUPPORTS_EXC_NOTES模块级常量_retry.py:30§7.7.5
RetryPolicyNamedTupletypes.py§7.7.1
CachePolicydataclasstypes.py§7.8.1
CacheKeyNamedTupletypes.py§7.8.1
default_cache_key自由函数_internal/_cache.py§7.8.3
prepare_next_tasks规划器入口pregel/_algo.py§7.4.6
apply_writes写入应用函数_algo.py§7.4.6
prepare_single_task单任务准备_algo.py§7.3
PregelLoop.accept_push方法pregel/_loop.py§7.9
PregelLoop.match_cached_writes方法_loop.py§7.8.2
GraphBubbleUp异常基类errors.py§7.10
GraphInterrupt异常子类errors.py§7.10
ParentCommand异常errors.py§7.10
CONFIG_KEY_SEND / CONFIG_KEY_READ / CONFIG_KEY_RESUMING / CONFIG_KEY_TASK_ID配置键常量constants.py§7.2.3 / §7.7.4
NO_WRITES / INTERRUPT / ERROR / RETURN写入标签常量constants.py§7.4.4 / §7.9

这个表是本章最硬核的工程资产——任何时候读 LangGraph 源码碰到本表中符号、可以直接回这里找所在章节。表中所有 行号/函数名 以 2026-04-22 的 main 分支为准、后续版本如有调整、以官方仓库 git blame 为准。

7.14.4 一个串联的心智图

flowchart TB
    subgraph "编译期 (§3 §4 §5)"
        SG[StateGraph] --> CMP[compile]
        CH[Channels + reducer] --> CMP
        CMP --> NODE["PregelNode 字典\ntrigger_to_nodes 倒排索引"]
    end

    subgraph "运行期核心 (§6 §7)"
        NODE --> LOOP["PregelLoop.tick\n(§6)"]
        LOOP --> ALGO["prepare_next_tasks\n(§7.4.6)"]
        ALGO --> TASKS[PregelExecutableTask 列表]
        TASKS --> CACHE["match_cached_writes\n(§7.8)"]
        CACHE --> RUNNER["PregelRunner.tick\n(§7.4)"]
        RUNNER --> EXEC["BackgroundExecutor\n(§7.5-7.6)"]
        EXEC --> RETRY["run_with_retry\n(§7.7)"]
        RETRY --> COMMIT["put_writes"]
        COMMIT --> APPLY["apply_writes\n→ updated_channels"]
        APPLY --> LOOP
    end

    subgraph "控制流 (§9 §10 §12)"
        RETRY -.->|"GraphInterrupt"| INT["§9 Interrupt\n合并中断"]
        RETRY -.->|"ParentCommand"| CMD["§10 Command\n跨图路由"]
        COMMIT -.->|"Send write"| SEND["§12 Send\nPUSH 新任务"]
        SEND -.-> ALGO
    end

    subgraph "持久化 (§8)"
        COMMIT -.-> CKPT["Checkpointer"]
        LOOP -.-> CKPT
        CKPT -.-> RESUME["恢复时 _match_writes"]
        RESUME -.-> ALGO
    end

    subgraph "输出 (§13)"
        RUNNER -.->|"yield"| STREAM["§13 Streaming\nstream_mode"]
        COMMIT -.-> STREAM
    end

这张图要传达的最重要一条信息是——任务调度(§7)不是一个”模块”、它是 LangGraph 运行时的中央神经系统。所有用户可观察的行为(interrupt 弹出、command 跳转、stream 输出、checkpoint 恢复)最终都要经过 PregelRunner.tick → run_with_retry → commit → put_writes 这条路径。反过来说——掌握了本章你就掌握了全书所有”运行期”章节的共同底座、后续读 §8-§13 都能迅速把新概念挂到本章的骨架上。

读完本章后建议的学习顺序:→ §8 Checkpoint(深化 put_writes 的持久化路径)→ §9 Interrupt(深化 GraphBubbleUp 的分支)→ §12 Send(深化 PUSH 任务的来源)→ §13 Streaming(深化 yield 的消费端)→ §11 Subgraphs / §17 Multi-agent(看整套机制在跨图和多 Agent 场景的组合)。这条路径按”由近及远”的耦合度排序、不会出现”前置章节未读导致卡壳”的情况。

7.14.5 本章与工程实践的三条直接收益

最后把抽象理论落到三条你做 LangGraph 项目时立即能用的工程结论上——

结论 1:线性图请勿多开线程。如果你的图是”开始 → A → B → C → 结束”这种无分叉的线性结构、每步只有一个节点激活、PregelRunner 的单任务快速路径不会占用线程池——给线程池配大了也没用、线程池大小只影响有分叉的步。配置 max_concurrency 对这种图也无效。线性图想提速、改异步(ainvoke / astream)比改 max_concurrency 更直接。

结论 2:重试策略要匹配失败类型retry_policy 支持按异常类型配多个策略——LLM API 的 RateLimitError 给 5 次、每次退避 2 秒;网络瞬时抖动的 ConnectionError 给 3 次、每次 0.1 秒——不要一股脑全配一个 max_attempts=3。生产环境中这个细节能把 agent 的实际成功率从 85% 推到 98%——底层是利用了 _should_retry_on 的多策略扫描逻辑(§7.7.3)。

结论 3:跨图 Command 路由如果在本地调通但上分布式出错——先看 _checkpoint_ns_for_parent_command。§7.7.5 讲过、LangGraph Platform 运行时会插数字段(|1|2)消歧、parent namespace 计算需要跳过这些段。本地单机几乎触发不到、但上了多并发的 Platform 立刻暴露。遇到”Command.PARENT 没路由到预期图”的 bug、优先追查 checkpoint_ns 的字符串格式——大概率问题在此。