LangGraph 设计与实现
第12章 Send 与动态并行
第12章 Send 与动态并行
12.1 引言
在前面的章节中,我们已经了解了 LangGraph 的静态图编译流程:节点通过边连接、Channel 传递状态、Pregel 按超步调度。然而,现实世界中的许多工作流并不是在编译时就能确定所有执行路径的。考虑这样一个场景——用户输入一篇文章,我们需要对文章中的每个段落分别进行翻译、摘要或情感分析,然后把所有结果汇总。段落的数量在编译时未知,只有在运行时读到输入后才能确定。
这就是 动态并行 的核心需求:在运行时根据数据决定要派生多少个并行任务,每个任务可以携带不同的输入,最终所有任务的输出通过 reducer 汇聚回主图状态。LangGraph 通过 Send 对象和 Topic Channel 精巧地解决了这个问题,实现了经典的 map-reduce 模式。
本章将从 Send 类的源码出发,追踪它在条件边中的返回、写入 TASKS Channel、被 prepare_next_tasks 消费、最终派生为并行 PregelExecutableTask 的完整链路。我们会深入 Topic Channel 的 pub/sub 语义,理解它如何支撑动态 fanout;我们还会分析条件边如何返回多个 Send 对象,以及 Command.goto 中嵌入 Send 的高级用法。
本章要点
Send对象的数据结构与语义——向指定节点发送自定义输入TopicChannel 的 pub/sub 机制——支持多值累积与逐步消费- Map-Reduce 模式的完整实现——从 fanout 到 aggregation
- 动态 fanout 的调度细节——
prepare_push_task_send的执行链路 - 条件边返回多个 Send 的工程实践与限制
12.2 Send 对象的设计
12.2.1 数据结构
Send 是 LangGraph 中最简洁的数据结构之一,定义在 langgraph/types.py 中:
class Send:
"""A message or packet to send to a specific node in the graph."""
__slots__ = ("node", "arg")
node: str
arg: Any
def __init__(self, /, node: str, arg: Any) -> None:
self.node = node
self.arg = arg
def __hash__(self) -> int:
return hash((self.node, self.arg))
def __repr__(self) -> str:
return f"Send(node={self.node!r}, arg={self.arg!r})"
def __eq__(self, value: object) -> bool:
return (
isinstance(value, Send)
and self.node == value.node
and self.arg == value.arg
)
Send 只有两个字段:node 指定目标节点名,arg 是传递给该节点的自定义输入。使用 __slots__ 优化内存占用,实现了 __hash__ 和 __eq__ 以支持去重和集合操作。
关键设计决策在于 arg 的类型是 Any——这意味着 Send 携带的输入可以与图的主状态 schema 完全不同。当一个节点通过 Send 被调用时,它接收的不是完整的图状态,而是 Send 中指定的 arg。这是实现动态并行的核心:每个并行任务可以有自己独立的输入。
12.2.2 Send 与普通边的本质区别
在静态图中,边定义了数据流的拓扑关系,所有节点共享同一份状态。而 Send 改变了这个规则:
graph LR
subgraph 静态边模式
A1[NodeA] -->|完整图状态| B1[NodeB]
end
subgraph 动态Send模式
A2[条件边函数] -->|arg1| B2a["NodeB #0"]
A2 -->|arg2| B2b["NodeB #1"]
A2 -->|arg3| B2c["NodeB #2"]
end
静态边下,NodeB 的每次执行都读取完整的图状态。而在动态 Send 模式下,同一个节点可以被实例化多次,每个实例接收不同的输入。这种区别在源码中体现为两种不同的任务类型——PULL 任务(由 Channel 更新触发)和 PUSH 任务(由 Send 对象创建)。
12.2.3 Send 的哈希与去重
Send 实现了 __hash__ 和 __eq__,这使得它可以被放入集合或用作字典键。哈希基于 (node, arg) 的元组,这意味着:
- 两个
Send("node_a", {"x": 1})是相等的 Send("node_a", {"x": 1})和Send("node_a", {"x": 2})是不同的- 这在 checkpoint 恢复时用于比对已执行的任务
需要注意的是,如果 arg 包含不可哈希的对象(如嵌套的列表),__hash__ 会抛出 TypeError。这是 Python 标准行为的自然延伸。
12.3 Topic Channel:动态并行的基础设施
12.3.1 Topic 的 Pub/Sub 语义
Send 对象最终被写入一个名为 __pregel_tasks(即常量 TASKS)的特殊 Channel,这个 Channel 的类型就是 Topic[Send]。Topic Channel 定义在 langgraph/channels/topic.py 中,它实现了经典的发布/订阅模式:
class Topic(
Generic[Value],
BaseChannel[Sequence[Value], Value | list[Value], list[Value]],
):
"""A configurable PubSub Topic."""
__slots__ = ("values", "accumulate")
def __init__(self, typ: type[Value], accumulate: bool = False) -> None:
super().__init__(typ)
self.accumulate = accumulate
self.values = list[Value]()
def update(self, values: Sequence[Value | list[Value]]) -> bool:
updated = False
if not self.accumulate:
updated = bool(self.values)
self.values = list[Value]()
if flat_values := tuple(_flatten(values)):
updated = True
self.values.extend(flat_values)
return updated
def get(self) -> Sequence[Value]:
if self.values:
return list(self.values)
else:
raise EmptyChannelError
与 LastValue Channel(只保留最新值)不同,Topic 可以在一个超步内接收多个值。_flatten 辅助函数将嵌套列表展平,使得无论是单个 Send 还是 Send 列表都能正确处理。
12.3.2 _flatten 的展平逻辑
def _flatten(values: Sequence[Value | list[Value]]) -> Iterator[Value]:
for value in values:
if isinstance(value, list):
yield from value
else:
yield value
这个简洁的生成器函数实现了一层展平。它的设计意图是:Channel 的 update 方法接收的是一个”来自各任务的写入值列表”。每个写入值本身可以是单个 Send,也可以是一个 Send 列表。_flatten 把这两层统一为一个平坦的 Send 序列。
12.3.3 accumulate 模式
accumulate 参数控制 Topic 的跨超步行为:
accumulate=False(默认):每个超步开始时清空旧值,只保留本步写入的新值。这是 TASKS Channel 使用的模式——每一轮只处理当前步产生的 Send。accumulate=True:值跨超步累积,适用于需要收集所有历史消息的场景。
sequenceDiagram
participant Step1 as 超步 1
participant Topic as Topic Channel
participant Step2 as 超步 2
Step1->>Topic: update([Send_A, Send_B])
Note over Topic: values = [Send_A, Send_B]
Topic-->>Step2: get() -> [Send_A, Send_B]
Step2->>Topic: update([Send_C])
Note over Topic: accumulate=false<br/>清空旧值后追加<br/>values = [Send_C]
12.3.4 Topic 与其他 Channel 的对比
graph TB
subgraph Channel类型对比
LV[LastValue<br/>保留最新单值<br/>覆盖语义]
BOA[BinaryOperatorAggregate<br/>通过 reducer 聚合<br/>如 operator.add]
TP[Topic<br/>多值列表<br/>pub/sub 语义]
NB[NamedBarrierValue<br/>同步屏障<br/>等待所有写入者]
end
LV -->|用于| S1[普通状态字段]
BOA -->|用于| S2["Annotated[list, op.add]"]
TP -->|用于| S3[TASKS 分发 Send]
NB -->|用于| S4[节点同步]
| 特性 | LastValue | BinaryOperatorAggregate | Topic |
|---|---|---|---|
| 单步值数量 | 1 | 1(经 reducer 合并) | N |
| 更新语义 | 覆盖 | 聚合 | 追加 |
| 跨步保留 | 是 | 是 | 可选(accumulate) |
| 典型用途 | 普通状态字段 | Annotated[list, operator.add] | TASKS 分发 |
12.4 Map-Reduce 模式的完整实现
12.4.1 经典用法
让我们通过一个完整的 map-reduce 示例来理解 Send 的工作流程:
from typing import Annotated, TypedDict
from langgraph.types import Send
from langgraph.graph import END, START, StateGraph
import operator
class OverallState(TypedDict):
subjects: list[str]
jokes: Annotated[list[str], operator.add]
def continue_to_jokes(state: OverallState):
"""条件边函数:为每个 subject 生成一个 Send"""
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
def generate_joke(state: dict) -> dict:
"""工作节点:注意它接收的不是 OverallState,而是 Send 的 arg"""
return {"jokes": [f"Joke about {state['subject']}"]}
builder = StateGraph(OverallState)
builder.add_node("generate_joke", generate_joke)
builder.add_conditional_edges(START, continue_to_jokes)
builder.add_edge("generate_joke", END)
graph = builder.compile()
result = graph.invoke({"subjects": ["cats", "dogs", "robots"]})
# {'subjects': ['cats', 'dogs', 'robots'],
# 'jokes': ['Joke about cats', 'Joke about dogs', 'Joke about robots']}
这里的关键在于:
continue_to_jokes返回一个Send列表(而非字符串节点名)- 每个
Send都指向"generate_joke"节点,但携带不同的输入 generate_joke接收的state是{"subject": "cats"}这样的小字典,而不是完整的OverallState- 输出通过
Annotated[list[str], operator.add]的 reducer 自动合并
12.4.2 执行时序
sequenceDiagram
participant Input as 用户输入
participant Start as __start__
participant Cond as 条件边函数
participant TASKS as TASKS Channel
participant J1 as generate_joke #0
participant J2 as generate_joke #1
participant J3 as generate_joke #2
participant Agg as Reducer(operator.add)
participant EndN as __end__
Input->>Start: {subjects: [cats, dogs, robots]}
Start->>Cond: 读取状态
Cond->>TASKS: 写入 [Send_0, Send_1, Send_2]
Note over TASKS: Topic Channel 存储三个 Send
par 并行执行超步
TASKS->>J1: Send("generate_joke", {subject: cats})
TASKS->>J2: Send("generate_joke", {subject: dogs})
TASKS->>J3: Send("generate_joke", {subject: robots})
end
J1->>Agg: {jokes: ["Joke about cats"]}
J2->>Agg: {jokes: ["Joke about dogs"]}
J3->>Agg: {jokes: ["Joke about robots"]}
Agg->>EndN: jokes 合并为完整列表
12.5 Send 的调度链路深度剖析
12.5.1 条件边写入 TASKS Channel
当条件边函数返回 Send 对象时,分支处理逻辑会识别返回值的类型。如果返回值是 Send 或包含 Send 的列表,它们会被写入 TASKS Channel。在 Command 的处理路径中,这个逻辑体现在 map_command 函数里:
# langgraph/pregel/_io.py
def map_command(cmd: Command) -> Iterator[tuple[str, str, Any]]:
"""Map input chunk to a sequence of pending writes."""
if cmd.graph == Command.PARENT:
raise InvalidUpdateError("There is no parent graph")
if cmd.goto:
if isinstance(cmd.goto, (tuple, list)):
sends = cmd.goto
else:
sends = [cmd.goto]
for send in sends:
if isinstance(send, Send):
yield (NULL_TASK_ID, TASKS, send) # 写入 TASKS Channel
elif isinstance(send, str):
yield (NULL_TASK_ID, f"branch:to:{send}", START)
注意这里的 NULL_TASK_ID——这表示写入操作来自图的控制流层面,而非某个具体的运行中的任务。Send 对象被写入到 TASKS Channel(__pregel_tasks),而普通的字符串节点名被写入到 branch:to:{node} Channel。
12.5.2 prepare_next_tasks 中的 PUSH 任务创建
在 prepare_next_tasks 中,TASKS Channel 中的 Send 对象被逐一转化为可执行任务:
# langgraph/pregel/_algo.py
def prepare_next_tasks(...):
input_cache: dict[INPUT_CACHE_KEY_TYPE, Any] = {}
tasks: list[PregelTask | PregelExecutableTask] = []
# 第一步:消费 TASKS Channel 中的 Send 对象(PUSH 任务)
tasks_channel = cast(Topic[Send] | None, channels.get(TASKS))
if tasks_channel and tasks_channel.is_available():
for idx, _ in enumerate(tasks_channel.get()):
if task := prepare_single_task(
(PUSH, idx), # task_path 标记为 PUSH 类型
None,
checkpoint=checkpoint,
...
):
tasks.append(task)
# 第二步:处理 PULL 类型的任务(常规节点触发)
for name in candidate_nodes:
if task := prepare_single_task(
(PULL, name),
...
):
tasks.append(task)
return {t.id: t for t in tasks}
这里的核心区分是 PUSH vs PULL:
- PULL 任务:由 Channel 版本更新触发的常规节点执行
- PUSH 任务:由 Send 对象显式创建的动态任务
PUSH 任务总是在 PULL 任务之前被创建,两者在同一个超步内并行执行。
12.5.3 prepare_push_task_send 的核心逻辑
当 prepare_single_task 识别到 task_path[0] == PUSH 且不是函数式 API 的 Call 时,它调用 prepare_push_task_send:
def prepare_push_task_send(task_path, task_id_checksum, *, ...):
# 从 TASKS Channel 中按索引取出 Send 对象
idx = cast(int, task_path[1])
sends: Sequence[Send] = channels[TASKS].get()
if idx < 0 or idx >= len(sends):
return
packet = sends[idx]
if not isinstance(packet, Send):
logger.warning(f"Ignoring invalid packet type {type(packet)}")
return
# 验证目标节点存在
if packet.node not in processes:
logger.warning(f"Ignoring unknown node name {packet.node}")
return
proc = processes[packet.node]
# 生成确定性任务 ID
triggers = PUSH_TRIGGER # (PUSH,)
checkpoint_ns = f"{parent_ns}{NS_SEP}{packet.node}" if parent_ns else packet.node
task_id = task_id_func(
checkpoint_id_bytes,
checkpoint_ns,
str(step),
packet.node,
PUSH,
str(idx),
)
任务 ID 的生成是确定性的——基于 checkpoint ID、节点名、步数和 Send 索引。这保证了相同的图在相同的状态下会生成完全相同的任务 ID,这对于 checkpoint 恢复和幂等性至关重要。
12.5.4 Send 的输入如何传递给节点
创建 PregelExecutableTask 时,Send 的 arg 直接作为任务的 input:
# 在 prepare_push_task_send 中
return PregelExecutableTask(
packet.node, # name:目标节点名
packet.arg, # input:这就是 Send.arg
proc_node, # proc:节点的 Runnable
writes, # writes:输出 deque
config, # 配置
triggers, # (PUSH,) 触发器标记
proc.retry_policy or retry_policy,
cache_key,
task_id,
task_path[:3],
writers=proc.flat_writers,
subgraphs=proc.subgraphs,
)
这意味着 generate_joke 函数接收到的 state 参数实际上就是 packet.arg,即我们在 Send("generate_joke", {"subject": "cats"}) 中传入的字典。
12.6 动态 Fanout 的完整数据流
flowchart TB
subgraph 编译期
CE[条件边注册] --> BranchSpec[BranchSpec 存储分支函数]
BranchSpec --> Writer[生成 ChannelWrite 写入器]
end
subgraph "运行期 - 超步 N"
CondFn[条件边函数执行] -->|返回 Send 列表| WriteTasks[写入 TASKS Channel]
WriteTasks --> TopicCh["Topic<Send> Channel"]
end
subgraph "运行期 - 超步 N+1"
TopicCh --> PNT[prepare_next_tasks]
PNT --> PPTS1["prepare_push_task_send(idx=0)"]
PNT --> PPTS2["prepare_push_task_send(idx=1)"]
PNT --> PPTSN["prepare_push_task_send(idx=N)"]
PPTS1 --> Task1[PregelExecutableTask 1]
PPTS2 --> Task2[PregelExecutableTask 2]
PPTSN --> TaskN[PregelExecutableTask N]
end
subgraph "运行期 - 执行与聚合"
Task1 -->|并行执行| Runner[PregelRunner.tick]
Task2 --> Runner
TaskN --> Runner
Runner -->|写入输出 Channel| AW[apply_writes]
AW -->|BinaryOperatorAggregate| Merged[合并后的状态]
end
12.7 条件边返回多个 Send 的工程细节
12.7.1 分支处理中的 Send 识别
在编译过程中,条件边的返回值会经过一个写入器函数处理。这个写入器能够区分三种返回类型:
- 字符串:表示一个目标节点名,转化为对应的
branch:to:{node}Channel 写入 - Send 对象:直接写入 TASKS Channel
- 列表:可以包含字符串和 Send 的混合
# 条件边可以混合返回字符串和 Send
def route(state):
sends = [Send("worker", {"task": t}) for t in state["tasks"]]
sends.append("summary_node") # 同时也触发一个常规节点
return sends
这种混合返回的能力非常强大——你可以在一个条件边中同时触发多个动态并行任务和静态节点。
12.7.2 Command.goto 中的 Send
Command 是 LangGraph 更强大的控制流原语,它的 goto 字段也支持 Send:
from langgraph.types import Command, Send
def my_node(state):
return Command(
update={"status": "dispatched"},
goto=[
Send("processor", {"item": item})
for item in state["items"]
]
)
在 map_command 函数中,Command.goto 中的 Send 对象以完全相同的方式被写入 TASKS Channel。这使得动态并行不仅可以在条件边中触发,还可以在任何节点的返回值中触发。
Command 的完整形状比”一个带 goto 的对象”要大。源码 langgraph/types.py:652 的真实定义揭示了它的四个字段——每一个都对应一个不同的控制流维度:
# langgraph/types.py:652
@dataclass(**_DC_KWARGS)
class Command(Generic[N], ToolOutputMixin):
graph: str | None = None # 目标图:None / Command.PARENT
update: Any | None = None # 状态更新
resume: dict[str, Any] | Any | None = None # 中断恢复值
goto: Send | Sequence[Send | N] | N = () # 下一跳
四个字段是正交的——任何组合都合法。你可以同时 update 状态 + goto 下一个节点 + 回复一组 resume 值——一次节点返回就承担”改状态 + 走路由 + 解中断”三件事。这种”合一表达”能力让 LangGraph 能用单一返回类型实现 LangSmith 那套复杂 Agent 行为,不需要设计一堆不同的 sentinel 值让节点区别返回。
graph 字段是子图逃逸通道。graph=Command.PARENT(Command 类属性里的特殊标记)告诉 Pregel”这个命令目标不是当前图、是父图”。用途:子图作为 tool 被父图调用时,子图内某个节点想直接改变父图的路由或状态,不需要”返回值→父图再解析”的两级处理,可以用 Command(graph=Command.PARENT, goto="finalize") 一步到位。注意 graph 只接受 None 或 Command.PARENT 两个值——不是任意图名——因为 Pregel 只知道直接父图的存在、无法定位到更远祖先。
update 的类型很宽松 是 Any | None——实际可以是 dict、tuple list(走 _update_as_tuples,line 687 会拆成 [(channel, value), ...])、甚至更奇怪的结构。_update_as_tuples 内部做一次规整:dict 直接 items()、list of 2-tuple 原样保留、其它形态抛错。这让 Command 对不同 state_schema(TypedDict / Pydantic / dataclass / 自定义 reducer)都能自然适配。
resume 的双态(dict[str, Any] | Any | None):dict 形态是”按 interrupt id 精确路由”——用于单节点里有多个 interrupt 点;单值形态是”下一个待恢复的中断直接拿这个”——简化场景常用。这个二态是 interrupt() 机制的对称设计:interrupt 单次触发就用单值 resume、多次触发就用 dict 按 id 配对。
回到 Send 这个主题:Send 是 Command 的一个合法 goto 值(类型注解 Send | Sequence[Send | N] | N),意味着”直接返回 Send 列表” 和 “返回 Command(goto=[Send, ...])” 在 Pregel 看来最终都走 map_command 同一条路径。差别只在”节点除了派生任务以外还要不要顺便改状态/解中断”。这种统一又分层的设计——简单场景零开销(裸 Send 即可),复杂场景也不需要新语法(Command 容纳一切)——是 LangGraph 控制流 API 简洁度的来源。
12.7.3 Send 与中断的交互
当 Send 派生的任务遇到 interrupt() 时,情况变得有趣。每个 PUSH 任务都有自己独立的 checkpoint_ns(格式为 {parent_ns}:{node_name}:{task_id}),这意味着:
- 每个 Send 任务的中断是独立追踪的
- 恢复时,框架能准确地将
resume值路由到正确的任务 - 尚未完成的 Send 任务可以被单独恢复,已完成的不会重新执行
stateDiagram-v2
[*] --> Dispatched: Send 列表写入 TASKS
Dispatched --> Running: prepare_push_task_send
Running --> Completed: 正常完成
Running --> Interrupted: 遇到 interrupt()
Interrupted --> Resumed: Command(resume=value)
Resumed --> Completed: 从头重新执行节点
Completed --> Aggregated: apply_writes 合并输出
Aggregated --> [*]
12.8 设计决策
12.8.1 为什么 Send 不是 Channel?
一个自然的疑问是:为什么不把 Send 设计为一种新的 Channel 类型?答案在于关注点分离:
- Send 是数据:它只是一个”请求执行某节点”的消息包
- Topic 是基础设施:它负责存储和分发这些消息包
- TASKS 是约定:它是一个预定义的系统级 Channel 名
这种分层让 Send 保持简洁(仅两个字段),同时复用了 Topic Channel 的全部能力。如果未来需要支持新的分发语义(如优先级队列),只需替换 TASKS Channel 的实现,而 Send 本身无需改变。
12.8.2 确定性任务 ID 的重要性
prepare_push_task_send 中的任务 ID 生成使用了确定性哈希:
task_id = task_id_func(
checkpoint_id_bytes,
checkpoint_ns,
str(step),
packet.node,
PUSH,
str(idx), # Send 在列表中的索引
)
这个设计确保了:
- 幂等性:相同状态下重新执行会生成相同的任务 ID
- Checkpoint 恢复:从 checkpoint 恢复时,已完成的任务不会被重复创建
- 并发安全:不同的 Send 索引产生不同的任务 ID,避免冲突
task_id_func 根据 checkpoint 版本选择不同的哈希算法——v2+ 使用 xxhash(更快),旧版本使用 UUID5(兼容性)。
12.8.3 Send.arg 为 Any 类型的权衡
Send.arg 类型为 Any 带来了极大的灵活性——你可以发送字典、Pydantic 模型、甚至原始字符串。但这也意味着类型安全完全依赖于开发者:
# 正确:节点期望 dict,Send 发送 dict
Send("process", {"key": "value"})
# 也正确:节点期望 str,Send 发送 str
Send("process", "hello")
# 运行时错误:类型不匹配不会在编译期被检查到
Send("process", 42) # 如果节点期望 dict,运行时会失败
这是一个典型的灵活性 vs 安全性权衡。LangGraph 选择了灵活性,因为动态并行的场景下,输入类型往往在运行时才确定。
12.8.4 Reduce 阶段的隐式实现
LangGraph 的 map-reduce 模式中,reduce 阶段是隐式的——通过 Channel 的 reducer 函数自动完成:
class OverallState(TypedDict):
jokes: Annotated[list[str], operator.add] # reducer 就是 reduce 逻辑
这与 MapReduce 框架中显式定义 reduce 函数不同。在 apply_writes 中,所有并行任务的输出被收集后,通过 BinaryOperatorAggregate Channel 的 update 方法依次应用 reducer:
# apply_writes 中的关键逻辑
pending_writes_by_channel: dict[str, list[Any]] = defaultdict(list)
for task in tasks:
for chan, val in task.writes:
if chan in channels:
pending_writes_by_channel[chan].append(val)
for chan, vals in pending_writes_by_channel.items():
if chan in channels:
channels[chan].update(vals) # BinaryOperatorAggregate 应用 reducer
这种隐式设计的优势在于,同一个 reducer 同时服务于静态边和动态 Send 的输出合并,无需开发者区分两种场景。
12.9 高级模式
12.9.1 Send 到子图节点
Send 可以将数据发送到一个子图节点,此时 arg 会成为子图的输入:
def dispatch_to_subgraph(state):
return [
Send("analysis_subgraph", {"document": doc, "mode": "sentiment"})
for doc in state["documents"]
]
子图会以 arg 作为输入开始执行,它的 checkpoint_ns 会自动嵌套在父图的命名空间下。
12.9.2 动态 Fan-in 的注意事项
所有 Send 任务在同一个超步内并行执行,它们的输出在下一个超步通过 apply_writes 合并。这意味着:
- 如果需要等待所有并行任务完成后才进行下一步,只需让所有 Send 任务有相同的后续边
- 如果某个并行任务失败,其错误会被记录但不会阻止其他任务的执行(除非配置了
interrupt行为)
graph TB
Start[START] -->|条件边| Fan{fanout}
Fan -->|"Send(arg1)"| W1[worker]
Fan -->|"Send(arg2)"| W2[worker]
Fan -->|"Send(arg3)"| W3[worker]
W1 -->|reducer 聚合| Agg[aggregator]
W2 -->|reducer 聚合| Agg
W3 -->|reducer 聚合| Agg
Agg --> End[END]
style Fan fill:#f0e6ff,stroke:#333,stroke-width:2px
style Agg fill:#e6f0ff,stroke:#333,stroke-width:2px
12.9.3 Send 的性能考量
每个 Send 都会创建一个独立的 PregelExecutableTask,包括独立的配置、写入 deque、checkpoint 命名空间等。对于大量并行任务(如数百个 Send),需要注意:
- 每个任务都会产生 checkpoint 写入(如果启用了 checkpointer)
- 线程池的大小可能成为瓶颈
- 所有任务的输出在内存中累积直到
apply_writes
对于极大规模的并行场景,建议在条件边中控制 Send 的数量,或使用批处理策略。
12.9.4 实战:文档批量处理流水线
以下是一个更完整的 map-reduce 实战示例——批量处理文档列表,每个文档独立分析后汇总:
from typing import Annotated, TypedDict
from langgraph.types import Send
from langgraph.graph import END, START, StateGraph
import operator
class PipelineState(TypedDict):
documents: list[dict] # 输入文档列表
results: Annotated[list[dict], operator.add] # 分析结果(reducer 合并)
summary: str # 最终汇总
class DocInput(TypedDict):
"""Send 传递给 analyze_doc 的输入"""
doc_id: str
content: str
analysis_type: str
def dispatch_documents(state: PipelineState):
"""条件边:为每个文档创建一个 Send"""
return [
Send("analyze_doc", DocInput(
doc_id=doc["id"],
content=doc["content"],
analysis_type="sentiment"
))
for doc in state["documents"]
]
def analyze_doc(state: DocInput) -> dict:
"""工作节点:分析单个文档"""
# 注意:state 的类型是 DocInput,不是 PipelineState
result = {
"doc_id": state["doc_id"],
"sentiment": "positive", # 实际中调用 LLM
"length": len(state["content"])
}
return {"results": [result]}
def summarize(state: PipelineState) -> dict:
"""汇总节点:整合所有分析结果"""
total = len(state["results"])
positive = sum(1 for r in state["results"] if r["sentiment"] == "positive")
return {"summary": f"Analyzed {total} docs, {positive} positive"}
builder = StateGraph(PipelineState)
builder.add_node("analyze_doc", analyze_doc)
builder.add_node("summarize", summarize)
builder.add_conditional_edges(START, dispatch_documents)
builder.add_edge("analyze_doc", "summarize")
builder.add_edge("summarize", END)
graph = builder.compile()
这个例子展示了 Send 的完整生命周期:条件边创建 Send 列表、每个 Send 携带独立的输入类型、工作节点独立执行、输出通过 reducer 自动合并、最终汇总节点处理合并后的结果。
12.9.5 Send 与 Checkpoint 的交互细节
当启用了 checkpointer 时,Send 任务的 checkpoint 行为值得深入理解:
- 任务写入持久化:每个 PUSH 任务的
writes会通过put_writes异步持久化 - 恢复时的匹配:从 checkpoint 恢复时,
pending_writes中已有结果的任务不会被重新执行 - Send 的序列化:Send 对象本身作为 TASKS Channel 的值被保存在 checkpoint 中,恢复时用于重建任务
flowchart LR
subgraph "首次执行"
S1["Send 列表写入 TASKS"] --> T1["创建 PUSH 任务"]
T1 --> E1["执行任务"]
E1 --> W1["写入 pending_writes"]
W1 --> CP1["保存 Checkpoint"]
end
subgraph "恢复执行"
CP1 --> R1["加载 Checkpoint"]
R1 --> R2["重建 TASKS Channel"]
R2 --> R3["prepare_next_tasks"]
R3 --> R4{"pending_writes<br/>中有结果?"}
R4 -->|是| Skip["跳过已完成的任务"]
R4 -->|否| Exec["重新执行任务"]
end
12.9.6 langgraph/channels/ 8 文件 918 行的真实拆分
§12.3 讨论 Topic 时给出了和其他 channel 的对比表——把整个 channels/ 目录的真实账本展开——
| 文件 | 行 | 角色 |
|---|---|---|
named_barrier_value.py | 167 | 本目录最大——多 source 同步屏障(等所有命名输入到齐才触发) |
last_value.py | 151 | 默认 reducer——后写覆盖前写、单值 |
binop.py | 134 | BinaryOperatorAggregate(§12.4 reduce 阶段的复用基础——operator.add 拼 list 就靠它) |
base.py | 121 | BaseChannel ABC——定义 update / get / consume / checkpoint 协议 |
topic.py | 94 | Topic Channel(本章 §12.3 主角)——pub/sub 语义,Send 收集走它 |
ephemeral_value.py | 79 | 单步生命周期值(下一步消失,不持久化) |
untracked_value.py | 73 | 不参与 checkpoint/version 的值 |
any_value.py | 72 | ”任意一个写入即可”(first-write-wins 反语义) |
__init__.py | 27 | 公共 export |
三条值得记住的物理事实——
- Topic Channel 仅 94 行——本章用了一节(§12.3)讨论它的 pub/sub + flatten + accumulate 三种模式——94 行就实现完——印证 LangGraph 对channel 抽象的设计纪律:每种 channel 只负责”如何接收 update + 何时 emit”两件事、其余复杂度全在 Pregel loop 里
named_barrier_value.py167 行是最大——但本章没提到它——因为 NamedBarrier 是 LangGraph 处理 “多分支汇总到同一节点” 时的隐式 channel,例如add_edge(["a", "b"], "c")让 c 等 a 和 b 都到达;用户感知不到、但运行时比 Topic 还重binop.py134 行 =BinaryOperatorAggregate——是 §12.4 map-reduce 模式 reduce 阶段的实际 channel——Annotated[list[str], operator.add]这种类型注解在 compile 时就实例化为一个 binop channel 实例——reducer 不是新概念、只是 binop channel 的别名
Send 类的真实定义(types.py:574-650 实测)——
class Send:
__slots__ = ("node", "arg") # 严格 2 字段、节省 dict 开销
def __init__(self, /, node: str, arg: Any) -> None: ...
def __hash__(self) -> int:
return hash((self.node, self.arg))
def __eq__(self, value: object) -> bool: ...
两点细节——
__slots__限定 2 字段——Send 实例没有__dict__——一个 Send 在内存里只占几十字节、跑 1000 次 Map-Reduce 也不会因 dict 开销爆炸/让node是 positional-only 参数——用户不能写Send(node="x", arg=...)——只能写Send("x", {...})——这是 Python 3.8+ 的特性、目的是降低 API 表面的兼容负担(未来如果改字段名node → target、关键字写法的代码会全崩、positional 不会)
StreamMode 实测 7 种 (types.py:118)——values / updates / checkpoints / tasks / debug / messages / custom——和小结里那一句”下一章七种 StreamMode”对得上。
12.9A 源码账本(截至 main 2026-04-22)
前面章节把 Send 链路讲清楚了——但**“链路”不等于”账本”。以下表格是对照仓库当前 main 分支 grep 得来的真实文件 + 行号清单**,供校对和跳读用。
12.9A.1 Send/PUSH 链路涉及的真实文件
| 文件 | 行 | 职责 |
|---|---|---|
libs/langgraph/langgraph/types.py | 748-792 | Send 类定义——__slots__ = ("node", "arg") + positional-only / |
libs/langgraph/langgraph/types.py | 796+ | Command dataclass——goto 字段类型为 Send | Sequence[Send | N] | N |
libs/langgraph/langgraph/types.py | 99-101 | StreamMode Literal——7 种(values/updates/checkpoints/tasks/debug/messages/custom) |
libs/langgraph/langgraph/constants.py | 总 61 | 公共 re-export 入口——PUSH/TASKS 等已迁到 _internal |
libs/langgraph/langgraph/_internal/_constants.py | 总 90 | 系统级常量本体——Send 派发相关的所有字符串 |
libs/langgraph/langgraph/_internal/_constants.py | 17 | TASKS = "__pregel_tasks" |
libs/langgraph/langgraph/_internal/_constants.py | 58 | PUSH = "__pregel_push" |
libs/langgraph/langgraph/_internal/_constants.py | 61 | PULL = "__pregel_pull" |
libs/langgraph/langgraph/_internal/_constants.py | 72 | NULL_TASK_ID = "00000000-0000-0000-0000-000000000000" |
libs/langgraph/langgraph/_internal/_constants.py | 64 | NS_SEP = "|"——checkpoint 命名空间分隔符 |
libs/langgraph/langgraph/_internal/_constants.py | 28 | CONFIG_KEY_SEND = "__pregel_send" |
libs/langgraph/langgraph/channels/topic.py | 总 99 | Topic Channel + _flatten helper |
libs/langgraph/langgraph/channels/topic.py | 13-17 | _flatten 生成器——一层展开 Sequence[Value | list[Value]] |
libs/langgraph/langgraph/pregel/_algo.py | 总 909 | Pregel 调度核心 |
libs/langgraph/langgraph/pregel/_algo.py | 395 | prepare_single_task 入口 |
libs/langgraph/langgraph/pregel/_algo.py | 410 | task_id_func = _xxhash_str if checkpoint["v"] > 1 else _uuid5_str |
libs/langgraph/langgraph/pregel/_algo.py | 633 | prepare_push_task_send——把 Send 转成 PregelExecutableTask |
libs/langgraph/langgraph/pregel/_algo.py | 854 | _uuid5_str 兼容老 checkpoint v1 |
libs/langgraph/langgraph/pregel/_algo.py | 862 | _xxhash_str v2+ 快速哈希 |
12.9A.2 几条值得记住的物理事实
constants.py只有 61 行——大部分系统常量已迁到_internal/_constants.py(90 行)——TASKS / PUSH / PULL / NULL_TASK_ID / NS_SEP / CONFIG_KEY_SEND都在后者——说明 LangGraph 把”实现细节”和”对外 API”做了严格分层:用户from langgraph.constants import START, END看到的是稳定表面,内部 re-export 通过__getattr__动态代理(constants.py line 28-53),让重构自由度最大化NS_SEP是"|"不是":"——本章 §12.5.3 写checkpoint_ns = f"{parent_ns}{NS_SEP}{packet.node}"时这个分隔符就是竖线——子图嵌套命名空间在 checkpoint store 里形如"parent|analyze_doc|xxh3_abc"——选|的原因是节点名、task_id 都不含它、不会歧义- task_id v1 vs v2 分叉(
_algo.py:410)——checkpoint["v"] > 1走 xxhash、否则 UUID5——2024 下半年 LangGraph 从 v1 升 v2 的时候保留了 UUID5 老路径给旧 checkpoint 做向后兼容——这意味着线上长跑的图如果 checkpoint 版本很老,恢复时仍然用慢但稳定的 UUID5——不会因为版本新旧产生 task_id 漂移、从而把已完成任务误判成未执行 TASKS的字符串值"__pregel_tasks"以双下划线开头——这是 LangGraph 的系统 Channel 命名约定——用户定义的 Channel 名不允许以__pregel_开头——避免和内部 Channel 冲突——同样约定的还有__pregel_push / __pregel_pull / __pregel_send / __interrupt__CONFIG_KEY_SEND = "__pregel_send"——这是节点运行时拿到 Send 派发器的 config 键——即在节点内部可以通过config["configurable"]["__pregel_send"]直接取出一个send(packet: Send)回调——不经过条件边也能写入 TASKS——这是 functional API 的Send运行时入口
12.9B Fan-out 三种机制对比
LangGraph 里能触发”多分支”的机制不止 Send 一种——以下把 三种 fan-out 路径 放一起横向看,避免实战时选错工具。
12.9B.1 机制对照表
| 维度 | add_conditional_edges(返回节点名) | Send 列表(本章主角) | add_edge(list, target)(多源汇聚) |
|---|---|---|---|
| 决策时机 | 运行时(超步末) | 运行时(超步末) | 编译期(静态拓扑) |
| 分支数量 | 编译期固定(候选节点集) | 运行时任意(可 0、可 N) | 编译期固定 |
| 目标节点 | 一个或多个候选中的子集 | 任意可派发节点(含子图) | 多个 source → 一个 target |
| 输入载荷 | 读完整图状态 | 读 Send.arg(可完全脱离主 state) | 读完整图状态 |
| 承载 Channel | branch:to:{node}(EphemeralValue) | __pregel_tasks(Topic) | NamedBarrierValue |
| Channel 行号 | channels/ephemeral_value.py 79 行 | channels/topic.py 99 行 | channels/named_barrier_value.py 167 行(目录最大) |
| 同步语义 | 触发即走 | 每个 Send 独立 PUSH 任务 | 等所有 source 到齐才触发 target |
| 任务类型 | PULL | PUSH | PULL |
| 典型用途 | 路由(if/switch) | Map-Reduce、动态批处理 | 多分支汇合(join/barrier) |
| 与 checkpoint 关系 | 条件边结果写 branch Channel | 每个 Send 独立任务 ID | barrier 状态写 checkpoint |
| 可发送到子图 | 是 | 是(arg 成为子图 input) | 是 |
| 可混合返回 | 只能返回字符串 | 可混合 Send + 字符串(§12.7.1) | N/A |
12.9B.2 选择决策图
flowchart TD
Q1{分支数量在编译期<br/>能否确定?}
Q1 -->|能| Q2{是否需要<br/>多 source 同步?}
Q1 -->|不能/依赖输入| SEND[Send 列表<br/>PUSH 动态派发]
Q2 -->|需要| BARRIER[add_edge#40;list, target#41;<br/>NamedBarrierValue 同步]
Q2 -->|不需要| Q3{目标节点是<br/>候选集子集?}
Q3 -->|是| COND[add_conditional_edges<br/>返回节点名]
Q3 -->|否| SEND
SEND --> Q4{节点期望收到<br/>完整主 state?}
Q4 -->|是| WARN[警告:Send.arg 必须<br/>手工构造完整 state<br/>或考虑换条件边]
Q4 -->|否| OK[正确选择 Send]
style SEND fill:#f0e6ff,stroke:#333
style BARRIER fill:#e6f0ff,stroke:#333
style COND fill:#fff0e6,stroke:#333
12.9B.3 易错点:Send 下游节点的状态读取
最常见的坑——generate_joke(state) 收到的 state 是 Send.arg 而不是主 state——这意味着:
def generate_joke(state): # state 就是 {"subject": "cats"}
subjects = state["subjects"] # KeyError!主 state 里的字段这里没有
return {"jokes": [...]}
如果 Send 下游节点同时也需要主 state 的其它字段,有两条路径——
- 在条件边里把需要的上下文塞进 Send.arg:
[Send("worker", {**s, "user_id": state["user_id"]}) for s in items] - 在节点签名里显式声明
InjectedState(LangGraph 新版 functional API 提供)——框架会把主 state 作为第二参数注入、不经过 Send.arg——这让Send 负责路由、主 state 负责共享上下文、职责分离
12.9C Send 链路跨章校对
本书多章涉及 PUSH/PULL 任务、超步调度、Channel 写入——以下把和 Send 强相关的章节锚点梳理一次,方便来回查:
| 章节 | 锚点 | 和本章的关系 |
|---|---|---|
| 第 07 章 任务调度 | PregelRunner.tick / submit / commit 三阶段 | 本章 §12.5 创建的 PUSH PregelExecutableTask 最终交给 07 章讲的 PregelRunner 执行——Send 派生多个任务后,submit 阶段并行提交、commit 阶段把 writes 回写 Channel |
| 第 07 章 | PULL vs PUSH 任务区分 | 本章 §12.5.2 的”PUSH 任务先于 PULL 创建”这条规则源头在 07 章——prepare_next_tasks 按 PUSH-first 顺序生成任务列表 |
| 第 08 章 Channel 体系 | BinaryOperatorAggregate / LastValue / Topic | 本章 §12.4 map-reduce 的 reduce 阶段复用 08 章的 BinaryOperatorAggregate(binop.py 134 行)——Annotated[list[str], operator.add] 编译时即实例化为 binop channel |
| 第 08 章 | Channel 检查点 checkpoint / from_checkpoint 协议 | 本章 §12.9.5 的”Send 对象作为 TASKS Channel 值被保存”实现在 Topic.checkpoint(topic.py)——values 列表直接序列化 |
| 第 09 章 Checkpoint | put_writes / get_writes | 本章 §12.9.5 的 pending_writes 恢复逻辑对应 09 章的 checkpointer.get_writes——PUSH 任务因为 task_id 确定性、重启时能精确匹配到之前未完成的写入 |
| 第 10 章 中断 | interrupt() + Command(resume=...) | 本章 §12.7.3 的”每个 Send 任务独立 checkpoint_ns”决定了 10 章的 interrupt 能按任务独立追踪——dict 形态 resume 按 interrupt id 路由、单值形态对应单任务恢复 |
| 第 11 章 条件边 | add_conditional_edges + BranchSpec | 本章 §12.7.1 的”条件边识别 Send 并写 TASKS”扩展 11 章的 BranchSpec 写入器——同一个分支函数可以混合返回 Send + 字符串 |
| 第 13 章 流式输出(下一章) | StreamMode 7 种(types.py:99-101) | 本章小结提到的”七种 StreamMode”详情见 13 章——其中 tasks mode 能观测到每个 PUSH 任务的创建与完成事件、对调试动态并行极关键 |
| 第 14+ 章 子图 / Functional API | Command(graph=PARENT) + CONFIG_KEY_SEND | 本章 §12.7.2 的父图逃逸和 §12.9A.2 第 5 条的 CONFIG_KEY_SEND 运行时入口——在子图/functional API 中作为非条件边场景的 Send 触发方式 |
一张图总结 Send 在 LangGraph 全栈的位置:
flowchart LR
subgraph Ch11[第11章 条件边]
BR[BranchSpec 写入器]
end
subgraph Ch12[第12章 本章 Send]
SEND[Send 对象]
TOPIC[TASKS = Topic Channel]
PUSH_TASK[PUSH PregelExecutableTask]
end
subgraph Ch07[第07章 调度]
RUNNER[PregelRunner.tick]
SUBMIT[submit 并发]
COMMIT[commit 写回]
end
subgraph Ch08[第08章 Channel]
BINOP[BinaryOperatorAggregate]
end
subgraph Ch09[第09章 Checkpoint]
CP[put_writes/get_writes]
end
subgraph Ch10[第10章 中断]
INT[interrupt+resume]
end
BR -->|识别 Send| TOPIC
SEND --> TOPIC
TOPIC -->|prepare_push_task_send| PUSH_TASK
PUSH_TASK --> RUNNER
RUNNER --> SUBMIT
SUBMIT --> COMMIT
COMMIT -->|reducer 合并| BINOP
PUSH_TASK -.独立 ns.-> CP
PUSH_TASK -.独立 ns.-> INT
12.10 小结
本章深入分析了 LangGraph 的动态并行机制。Send 对象以极简的两字段设计(node + arg)实现了强大的运行时任务分发能力。它通过 Topic Channel 的 pub/sub 语义被收集和分发,通过 prepare_push_task_send 被转化为可执行任务,最终在 PregelRunner 中并行执行。
整个 map-reduce 模式的实现体现了 LangGraph 的核心哲学:复用而非重复发明。Send 复用了 Topic Channel 的基础设施,reducer 复用了 BinaryOperatorAggregate 的聚合能力,任务调度复用了 Pregel 的超步机制。开发者只需关注两件事:在条件边中返回 Send 列表(map),在状态定义中声明 reducer(reduce),框架会处理其余一切。
物理事实:channels/ 8 文件 918 行,Topic 仅 94 行实现 pub/sub+flatten+accumulate 三模式;Send 用 __slots__ 限定 2 字段保证内存极小、__init__ 用 / 让 node 是 positional-only 屏蔽未来 API 改名的兼容负担。