Appearance
第12章 Send 与动态并行
12.1 引言
在前面的章节中,我们已经了解了 LangGraph 的静态图编译流程:节点通过边连接、Channel 传递状态、Pregel 按超步调度。然而,现实世界中的许多工作流并不是在编译时就能确定所有执行路径的。考虑这样一个场景——用户输入一篇文章,我们需要对文章中的每个段落分别进行翻译、摘要或情感分析,然后把所有结果汇总。段落的数量在编译时未知,只有在运行时读到输入后才能确定。
这就是 动态并行 的核心需求:在运行时根据数据决定要派生多少个并行任务,每个任务可以携带不同的输入,最终所有任务的输出通过 reducer 汇聚回主图状态。LangGraph 通过 Send 对象和 Topic Channel 精巧地解决了这个问题,实现了经典的 map-reduce 模式。
本章将从 Send 类的源码出发,追踪它在条件边中的返回、写入 TASKS Channel、被 prepare_next_tasks 消费、最终派生为并行 PregelExecutableTask 的完整链路。我们会深入 Topic Channel 的 pub/sub 语义,理解它如何支撑动态 fanout;我们还会分析条件边如何返回多个 Send 对象,以及 Command.goto 中嵌入 Send 的高级用法。
本章要点
Send对象的数据结构与语义——向指定节点发送自定义输入TopicChannel 的 pub/sub 机制——支持多值累积与逐步消费- Map-Reduce 模式的完整实现——从 fanout 到 aggregation
- 动态 fanout 的调度细节——
prepare_push_task_send的执行链路 - 条件边返回多个 Send 的工程实践与限制
12.2 Send 对象的设计
12.2.1 数据结构
Send 是 LangGraph 中最简洁的数据结构之一,定义在 langgraph/types.py 中:
python
class Send:
"""A message or packet to send to a specific node in the graph."""
__slots__ = ("node", "arg")
node: str
arg: Any
def __init__(self, /, node: str, arg: Any) -> None:
self.node = node
self.arg = arg
def __hash__(self) -> int:
return hash((self.node, self.arg))
def __repr__(self) -> str:
return f"Send(node={self.node!r}, arg={self.arg!r})"
def __eq__(self, value: object) -> bool:
return (
isinstance(value, Send)
and self.node == value.node
and self.arg == value.arg
)Send 只有两个字段:node 指定目标节点名,arg 是传递给该节点的自定义输入。使用 __slots__ 优化内存占用,实现了 __hash__ 和 __eq__ 以支持去重和集合操作。
关键设计决策在于 arg 的类型是 Any——这意味着 Send 携带的输入可以与图的主状态 schema 完全不同。当一个节点通过 Send 被调用时,它接收的不是完整的图状态,而是 Send 中指定的 arg。这是实现动态并行的核心:每个并行任务可以有自己独立的输入。
12.2.2 Send 与普通边的本质区别
在静态图中,边定义了数据流的拓扑关系,所有节点共享同一份状态。而 Send 改变了这个规则:
静态边下,NodeB 的每次执行都读取完整的图状态。而在动态 Send 模式下,同一个节点可以被实例化多次,每个实例接收不同的输入。这种区别在源码中体现为两种不同的任务类型——PULL 任务(由 Channel 更新触发)和 PUSH 任务(由 Send 对象创建)。
12.2.3 Send 的哈希与去重
Send 实现了 __hash__ 和 __eq__,这使得它可以被放入集合或用作字典键。哈希基于 (node, arg) 的元组,这意味着:
- 两个
Send("node_a", {"x": 1})是相等的 Send("node_a", {"x": 1})和Send("node_a", {"x": 2})是不同的- 这在 checkpoint 恢复时用于比对已执行的任务
需要注意的是,如果 arg 包含不可哈希的对象(如嵌套的列表),__hash__ 会抛出 TypeError。这是 Python 标准行为的自然延伸。
12.3 Topic Channel:动态并行的基础设施
12.3.1 Topic 的 Pub/Sub 语义
Send 对象最终被写入一个名为 __pregel_tasks(即常量 TASKS)的特殊 Channel,这个 Channel 的类型就是 Topic[Send]。Topic Channel 定义在 langgraph/channels/topic.py 中,它实现了经典的发布/订阅模式:
python
class Topic(
Generic[Value],
BaseChannel[Sequence[Value], Value | list[Value], list[Value]],
):
"""A configurable PubSub Topic."""
__slots__ = ("values", "accumulate")
def __init__(self, typ: type[Value], accumulate: bool = False) -> None:
super().__init__(typ)
self.accumulate = accumulate
self.values = list[Value]()
def update(self, values: Sequence[Value | list[Value]]) -> bool:
updated = False
if not self.accumulate:
updated = bool(self.values)
self.values = list[Value]()
if flat_values := tuple(_flatten(values)):
updated = True
self.values.extend(flat_values)
return updated
def get(self) -> Sequence[Value]:
if self.values:
return list(self.values)
else:
raise EmptyChannelError