Appearance
第7章 任务调度与并行执行
7.1 引言
上一章我们剖析了 Pregel 执行循环的宏观架构——tick()、after_tick() 和 BSP 超步模型。但在每个超步内部,还有一个同样复杂的世界:多个任务如何被并行调度?任务失败时如何重试?缓存如何避免重复计算?PUSH 任务和 PULL 任务在运行时有何不同?
本章将深入 LangGraph 的任务执行层,涉及以下核心组件:
PregelExecutableTask(types.py)—— 可执行任务的数据结构PregelRunner(pregel/_runner.py)—— 任务调度器,管理并行执行和结果收集BackgroundExecutor/AsyncBackgroundExecutor(pregel/_executor.py)—— 线程池和 asyncio 并行原语run_with_retry/arun_with_retry(pregel/_retry.py)—— 重试逻辑- 缓存匹配机制 ——
cache_policy和CacheKey的协作
这些组件共同实现了一个高效的并行执行框架,在保证正确性的前提下最大化吞吐量。
本章要点
PregelExecutableTask是任务执行的最小单元,包含输入、处理器、写入缓冲、配置等全部信息PregelRunner通过FuturesDict管理并发任务,支持"任一失败则全部停止"的语义- PULL 任务由 Channel 版本变更触发,输入从 Channel 读取;PUSH 任务由 Send API 创建,输入由调用者指定
BackgroundExecutor使用线程池实现同步并行,AsyncBackgroundExecutor使用 asyncio 任务实现异步并行- 重试策略支持指数退避、抖动、最大重试次数,以及按异常类型匹配的多策略组合
- 缓存策略通过
CacheKey关联节点身份和输入哈希,支持 TTL 过期
7.2 PregelExecutableTask:任务的全貌
PregelExecutableTask 定义在 types.py 中,是一个不可变的 dataclass:
python
@dataclass(frozen=True)
class PregelExecutableTask:
name: str # 节点名称
input: Any # 任务输入
proc: Runnable # 可执行处理器(bound + writers 的组合)
writes: deque[tuple[str, Any]] # 写入缓冲区
config: RunnableConfig # 完整的运行配置
triggers: Sequence[str] # 触发此任务的 Channel 列表
retry_policy: Sequence[RetryPolicy] # 重试策略
cache_key: CacheKey | None # 缓存键(如果启用了缓存)
id: str # 全局唯一的任务 ID
path: tuple[str | int | tuple, ...] # 任务路径(用于排序和标识)
writers: Sequence[Runnable] = () # 写入器引用
subgraphs: Sequence[PregelProtocol] = () # 子图引用虽然标记为 frozen=True(不可变),但 writes 字段是一个 deque——它的引用不可变,但内容可变。这个设计使得任务执行过程中可以向 writes 追加数据,同时防止意外替换整个 writes 对象。
7.2.1 任务 ID 的生成
任务 ID 是通过确定性哈希函数生成的,确保同一个 Checkpoint 状态下,相同的任务总是获得相同的 ID:
python
# 对于 PULL 任务
task_id = task_id_func(
checkpoint_id_bytes, # Checkpoint ID 的字节表示
checkpoint_ns, # 命名空间(如 "parent|agent")
str(step), # 步数
name, # 节点名称
PULL, # 任务类型
*triggers, # 触发 Channel
)
# 对于 PUSH 任务(Send API)
task_id = task_id_func(
checkpoint_id_bytes,
checkpoint_ns,
str(step),
name,
PUSH,
task_path_str(parent_path), # 父任务路径
str(idx), # 在父任务写入中的索引
)LangGraph 1.1.6 支持两种哈希函数:xxhash(v2 Checkpoint 格式,更快)和 uuid5(v1 格式,兼容旧版)。确定性的 ID 是 Checkpoint 恢复的关键——恢复后重新计算的任务 ID 与保存的 pending writes 中的 task ID 必须匹配,这样 _match_writes 才能正确地将已保存的写入结果关联到重建的任务。
7.2.2 proc 的构成
PregelExecutableTask.proc 是一个 RunnableSeq,它将用户逻辑和写入器串联:
执行 task.proc.invoke(task.input, task.config) 时:
- 首先调用用户函数,传入从 Channel 读取的状态
- 用户函数返回状态更新(如
{"count": 5}) - 第一个
ChannelWrite将更新转化为 Channel 写入元组,通过CONFIG_KEY_SEND发送 - 第二个
ChannelWrite(如果有边)将路由信号写入目标节点的触发 Channel
7.2.3 config 中注入的关键函数
每个任务的 config 中注入了几个关键回调,使得任务执行过程中能与 PregelLoop 交互:
python
config = patch_config(
config,
configurable={
CONFIG_KEY_TASK_ID: task_id,
CONFIG_KEY_SEND: writes.extend, # 写入收集器
CONFIG_KEY_READ: partial( # 状态读取器
local_read, scratchpad, channels, managed,
PregelTaskWrites(path, name, writes, triggers),
),
CONFIG_KEY_CHECKPOINTER: checkpointer,
CONFIG_KEY_CHECKPOINT_NS: task_checkpoint_ns,
CONFIG_KEY_SCRATCHPAD: scratchpad,
CONFIG_KEY_RUNTIME: runtime,
},
)CONFIG_KEY_SEND:绑定到writes.extend——当ChannelWrite.do_write被调用时,写入元组被追加到任务的writesdeque。deque.extend是线程安全的。CONFIG_KEY_READ:绑定到local_read函数——条件边通过此函数读取"应用了当前任务写入后"的状态快照。这确保条件判断基于最新状态。
7.3 PULL 任务 vs PUSH 任务
LangGraph 中有两种根本不同的任务触发方式: