Skip to content

第12章 Send 与动态并行

12.1 引言

在前面的章节中,我们已经了解了 LangGraph 的静态图编译流程:节点通过边连接、Channel 传递状态、Pregel 按超步调度。然而,现实世界中的许多工作流并不是在编译时就能确定所有执行路径的。考虑这样一个场景——用户输入一篇文章,我们需要对文章中的每个段落分别进行翻译、摘要或情感分析,然后把所有结果汇总。段落的数量在编译时未知,只有在运行时读到输入后才能确定。

这就是 动态并行 的核心需求:在运行时根据数据决定要派生多少个并行任务,每个任务可以携带不同的输入,最终所有任务的输出通过 reducer 汇聚回主图状态。LangGraph 通过 Send 对象和 Topic Channel 精巧地解决了这个问题,实现了经典的 map-reduce 模式。

本章将从 Send 类的源码出发,追踪它在条件边中的返回、写入 TASKS Channel、被 prepare_next_tasks 消费、最终派生为并行 PregelExecutableTask 的完整链路。我们会深入 Topic Channel 的 pub/sub 语义,理解它如何支撑动态 fanout;我们还会分析条件边如何返回多个 Send 对象,以及 Command.goto 中嵌入 Send 的高级用法。

本章要点

  1. Send 对象的数据结构与语义——向指定节点发送自定义输入
  2. Topic Channel 的 pub/sub 机制——支持多值累积与逐步消费
  3. Map-Reduce 模式的完整实现——从 fanout 到 aggregation
  4. 动态 fanout 的调度细节——prepare_push_task_send 的执行链路
  5. 条件边返回多个 Send 的工程实践与限制

12.2 Send 对象的设计

12.2.1 数据结构

Send 是 LangGraph 中最简洁的数据结构之一,定义在 langgraph/types.py 中:

python
class Send:
    """A message or packet to send to a specific node in the graph."""

    __slots__ = ("node", "arg")

    node: str
    arg: Any

    def __init__(self, /, node: str, arg: Any) -> None:
        self.node = node
        self.arg = arg

    def __hash__(self) -> int:
        return hash((self.node, self.arg))

    def __repr__(self) -> str:
        return f"Send(node={self.node!r}, arg={self.arg!r})"

    def __eq__(self, value: object) -> bool:
        return (
            isinstance(value, Send)
            and self.node == value.node
            and self.arg == value.arg
        )

Send 只有两个字段:node 指定目标节点名,arg 是传递给该节点的自定义输入。使用 __slots__ 优化内存占用,实现了 __hash____eq__ 以支持去重和集合操作。

关键设计决策在于 arg 的类型是 Any——这意味着 Send 携带的输入可以与图的主状态 schema 完全不同。当一个节点通过 Send 被调用时,它接收的不是完整的图状态,而是 Send 中指定的 arg。这是实现动态并行的核心:每个并行任务可以有自己独立的输入。

12.2.2 Send 与普通边的本质区别

在静态图中,边定义了数据流的拓扑关系,所有节点共享同一份状态。而 Send 改变了这个规则:

静态边下,NodeB 的每次执行都读取完整的图状态。而在动态 Send 模式下,同一个节点可以被实例化多次,每个实例接收不同的输入。这种区别在源码中体现为两种不同的任务类型——PULL 任务(由 Channel 更新触发)和 PUSH 任务(由 Send 对象创建)。

12.2.3 Send 的哈希与去重

Send 实现了 __hash____eq__,这使得它可以被放入集合或用作字典键。哈希基于 (node, arg) 的元组,这意味着:

  • 两个 Send("node_a", {"x": 1}) 是相等的
  • Send("node_a", {"x": 1})Send("node_a", {"x": 2}) 是不同的
  • 这在 checkpoint 恢复时用于比对已执行的任务

需要注意的是,如果 arg 包含不可哈希的对象(如嵌套的列表),__hash__ 会抛出 TypeError。这是 Python 标准行为的自然延伸。

12.3 Topic Channel:动态并行的基础设施

12.3.1 Topic 的 Pub/Sub 语义

Send 对象最终被写入一个名为 __pregel_tasks(即常量 TASKS)的特殊 Channel,这个 Channel 的类型就是 Topic[Send]Topic Channel 定义在 langgraph/channels/topic.py 中,它实现了经典的发布/订阅模式:

python
class Topic(
    Generic[Value],
    BaseChannel[Sequence[Value], Value | list[Value], list[Value]],
):
    """A configurable PubSub Topic."""

    __slots__ = ("values", "accumulate")

    def __init__(self, typ: type[Value], accumulate: bool = False) -> None:
        super().__init__(typ)
        self.accumulate = accumulate
        self.values = list[Value]()

    def update(self, values: Sequence[Value | list[Value]]) -> bool:
        updated = False
        if not self.accumulate:
            updated = bool(self.values)
            self.values = list[Value]()
        if flat_values := tuple(_flatten(values)):
            updated = True
            self.values.extend(flat_values)
        return updated

    def get(self) -> Sequence[Value]:
        if self.values:
            return list(self.values)
        else:
            raise EmptyChannelError

基于 VitePress 构建