LangGraph 设计与实现

第12章 Send 与动态并行

作者 杨艺韬 · 8,616 字

第12章 Send 与动态并行

12.1 引言

在前面的章节中,我们已经了解了 LangGraph 的静态图编译流程:节点通过边连接、Channel 传递状态、Pregel 按超步调度。然而,现实世界中的许多工作流并不是在编译时就能确定所有执行路径的。考虑这样一个场景——用户输入一篇文章,我们需要对文章中的每个段落分别进行翻译、摘要或情感分析,然后把所有结果汇总。段落的数量在编译时未知,只有在运行时读到输入后才能确定。

这就是 动态并行 的核心需求:在运行时根据数据决定要派生多少个并行任务,每个任务可以携带不同的输入,最终所有任务的输出通过 reducer 汇聚回主图状态。LangGraph 通过 Send 对象和 Topic Channel 精巧地解决了这个问题,实现了经典的 map-reduce 模式。

本章将从 Send 类的源码出发,追踪它在条件边中的返回、写入 TASKS Channel、被 prepare_next_tasks 消费、最终派生为并行 PregelExecutableTask 的完整链路。我们会深入 Topic Channel 的 pub/sub 语义,理解它如何支撑动态 fanout;我们还会分析条件边如何返回多个 Send 对象,以及 Command.goto 中嵌入 Send 的高级用法。

本章要点

  1. Send 对象的数据结构与语义——向指定节点发送自定义输入
  2. Topic Channel 的 pub/sub 机制——支持多值累积与逐步消费
  3. Map-Reduce 模式的完整实现——从 fanout 到 aggregation
  4. 动态 fanout 的调度细节——prepare_push_task_send 的执行链路
  5. 条件边返回多个 Send 的工程实践与限制

12.2 Send 对象的设计

12.2.1 数据结构

Send 是 LangGraph 中最简洁的数据结构之一,定义在 langgraph/types.py 中:

class Send:
    """A message or packet to send to a specific node in the graph."""

    __slots__ = ("node", "arg")

    node: str
    arg: Any

    def __init__(self, /, node: str, arg: Any) -> None:
        self.node = node
        self.arg = arg

    def __hash__(self) -> int:
        return hash((self.node, self.arg))

    def __repr__(self) -> str:
        return f"Send(node={self.node!r}, arg={self.arg!r})"

    def __eq__(self, value: object) -> bool:
        return (
            isinstance(value, Send)
            and self.node == value.node
            and self.arg == value.arg
        )

Send 只有两个字段:node 指定目标节点名,arg 是传递给该节点的自定义输入。使用 __slots__ 优化内存占用,实现了 __hash____eq__ 以支持去重和集合操作。

关键设计决策在于 arg 的类型是 Any——这意味着 Send 携带的输入可以与图的主状态 schema 完全不同。当一个节点通过 Send 被调用时,它接收的不是完整的图状态,而是 Send 中指定的 arg。这是实现动态并行的核心:每个并行任务可以有自己独立的输入。

12.2.2 Send 与普通边的本质区别

在静态图中,边定义了数据流的拓扑关系,所有节点共享同一份状态。而 Send 改变了这个规则:

graph LR
    subgraph 静态边模式
        A1[NodeA] -->|完整图状态| B1[NodeB]
    end
    subgraph 动态Send模式
        A2[条件边函数] -->|arg1| B2a["NodeB #0"]
        A2 -->|arg2| B2b["NodeB #1"]
        A2 -->|arg3| B2c["NodeB #2"]
    end

静态边下,NodeB 的每次执行都读取完整的图状态。而在动态 Send 模式下,同一个节点可以被实例化多次,每个实例接收不同的输入。这种区别在源码中体现为两种不同的任务类型——PULL 任务(由 Channel 更新触发)和 PUSH 任务(由 Send 对象创建)。

12.2.3 Send 的哈希与去重

Send 实现了 __hash____eq__,这使得它可以被放入集合或用作字典键。哈希基于 (node, arg) 的元组,这意味着:

  • 两个 Send("node_a", {"x": 1}) 是相等的
  • Send("node_a", {"x": 1})Send("node_a", {"x": 2}) 是不同的
  • 这在 checkpoint 恢复时用于比对已执行的任务

需要注意的是,如果 arg 包含不可哈希的对象(如嵌套的列表),__hash__ 会抛出 TypeError。这是 Python 标准行为的自然延伸。

12.3 Topic Channel:动态并行的基础设施

12.3.1 Topic 的 Pub/Sub 语义

Send 对象最终被写入一个名为 __pregel_tasks(即常量 TASKS)的特殊 Channel,这个 Channel 的类型就是 Topic[Send]Topic Channel 定义在 langgraph/channels/topic.py 中,它实现了经典的发布/订阅模式:

class Topic(
    Generic[Value],
    BaseChannel[Sequence[Value], Value | list[Value], list[Value]],
):
    """A configurable PubSub Topic."""

    __slots__ = ("values", "accumulate")

    def __init__(self, typ: type[Value], accumulate: bool = False) -> None:
        super().__init__(typ)
        self.accumulate = accumulate
        self.values = list[Value]()

    def update(self, values: Sequence[Value | list[Value]]) -> bool:
        updated = False
        if not self.accumulate:
            updated = bool(self.values)
            self.values = list[Value]()
        if flat_values := tuple(_flatten(values)):
            updated = True
            self.values.extend(flat_values)
        return updated

    def get(self) -> Sequence[Value]:
        if self.values:
            return list(self.values)
        else:
            raise EmptyChannelError

LastValue Channel(只保留最新值)不同,Topic 可以在一个超步内接收多个值。_flatten 辅助函数将嵌套列表展平,使得无论是单个 Send 还是 Send 列表都能正确处理。

12.3.2 _flatten 的展平逻辑

def _flatten(values: Sequence[Value | list[Value]]) -> Iterator[Value]:
    for value in values:
        if isinstance(value, list):
            yield from value
        else:
            yield value

这个简洁的生成器函数实现了一层展平。它的设计意图是:Channel 的 update 方法接收的是一个”来自各任务的写入值列表”。每个写入值本身可以是单个 Send,也可以是一个 Send 列表。_flatten 把这两层统一为一个平坦的 Send 序列。

12.3.3 accumulate 模式

accumulate 参数控制 Topic 的跨超步行为:

  • accumulate=False(默认):每个超步开始时清空旧值,只保留本步写入的新值。这是 TASKS Channel 使用的模式——每一轮只处理当前步产生的 Send。
  • accumulate=True:值跨超步累积,适用于需要收集所有历史消息的场景。
sequenceDiagram
    participant Step1 as 超步 1
    participant Topic as Topic Channel
    participant Step2 as 超步 2

    Step1->>Topic: update([Send_A, Send_B])
    Note over Topic: values = [Send_A, Send_B]
    Topic-->>Step2: get() -> [Send_A, Send_B]
    Step2->>Topic: update([Send_C])
    Note over Topic: accumulate=false<br/>清空旧值后追加<br/>values = [Send_C]

12.3.4 Topic 与其他 Channel 的对比

graph TB
    subgraph Channel类型对比
        LV[LastValue<br/>保留最新单值<br/>覆盖语义]
        BOA[BinaryOperatorAggregate<br/>通过 reducer 聚合<br/>如 operator.add]
        TP[Topic<br/>多值列表<br/>pub/sub 语义]
        NB[NamedBarrierValue<br/>同步屏障<br/>等待所有写入者]
    end

    LV -->|用于| S1[普通状态字段]
    BOA -->|用于| S2["Annotated[list, op.add]"]
    TP -->|用于| S3[TASKS 分发 Send]
    NB -->|用于| S4[节点同步]
特性LastValueBinaryOperatorAggregateTopic
单步值数量11(经 reducer 合并)N
更新语义覆盖聚合追加
跨步保留可选(accumulate)
典型用途普通状态字段Annotated[list, operator.add]TASKS 分发

12.4 Map-Reduce 模式的完整实现

12.4.1 经典用法

让我们通过一个完整的 map-reduce 示例来理解 Send 的工作流程:

from typing import Annotated, TypedDict
from langgraph.types import Send
from langgraph.graph import END, START, StateGraph
import operator

class OverallState(TypedDict):
    subjects: list[str]
    jokes: Annotated[list[str], operator.add]

def continue_to_jokes(state: OverallState):
    """条件边函数:为每个 subject 生成一个 Send"""
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

def generate_joke(state: dict) -> dict:
    """工作节点:注意它接收的不是 OverallState,而是 Send 的 arg"""
    return {"jokes": [f"Joke about {state['subject']}"]}

builder = StateGraph(OverallState)
builder.add_node("generate_joke", generate_joke)
builder.add_conditional_edges(START, continue_to_jokes)
builder.add_edge("generate_joke", END)
graph = builder.compile()

result = graph.invoke({"subjects": ["cats", "dogs", "robots"]})
# {'subjects': ['cats', 'dogs', 'robots'],
#  'jokes': ['Joke about cats', 'Joke about dogs', 'Joke about robots']}

这里的关键在于:

  1. continue_to_jokes 返回一个 Send 列表(而非字符串节点名)
  2. 每个 Send 都指向 "generate_joke" 节点,但携带不同的输入
  3. generate_joke 接收的 state{"subject": "cats"} 这样的小字典,而不是完整的 OverallState
  4. 输出通过 Annotated[list[str], operator.add] 的 reducer 自动合并

12.4.2 执行时序

sequenceDiagram
    participant Input as 用户输入
    participant Start as __start__
    participant Cond as 条件边函数
    participant TASKS as TASKS Channel
    participant J1 as generate_joke #0
    participant J2 as generate_joke #1
    participant J3 as generate_joke #2
    participant Agg as Reducer(operator.add)
    participant EndN as __end__

    Input->>Start: {subjects: [cats, dogs, robots]}
    Start->>Cond: 读取状态
    Cond->>TASKS: 写入 [Send_0, Send_1, Send_2]
    Note over TASKS: Topic Channel 存储三个 Send

    par 并行执行超步
        TASKS->>J1: Send("generate_joke", {subject: cats})
        TASKS->>J2: Send("generate_joke", {subject: dogs})
        TASKS->>J3: Send("generate_joke", {subject: robots})
    end

    J1->>Agg: {jokes: ["Joke about cats"]}
    J2->>Agg: {jokes: ["Joke about dogs"]}
    J3->>Agg: {jokes: ["Joke about robots"]}
    Agg->>EndN: jokes 合并为完整列表

12.5 Send 的调度链路深度剖析

12.5.1 条件边写入 TASKS Channel

当条件边函数返回 Send 对象时,分支处理逻辑会识别返回值的类型。如果返回值是 Send 或包含 Send 的列表,它们会被写入 TASKS Channel。在 Command 的处理路径中,这个逻辑体现在 map_command 函数里:

# langgraph/pregel/_io.py
def map_command(cmd: Command) -> Iterator[tuple[str, str, Any]]:
    """Map input chunk to a sequence of pending writes."""
    if cmd.graph == Command.PARENT:
        raise InvalidUpdateError("There is no parent graph")
    if cmd.goto:
        if isinstance(cmd.goto, (tuple, list)):
            sends = cmd.goto
        else:
            sends = [cmd.goto]
        for send in sends:
            if isinstance(send, Send):
                yield (NULL_TASK_ID, TASKS, send)  # 写入 TASKS Channel
            elif isinstance(send, str):
                yield (NULL_TASK_ID, f"branch:to:{send}", START)

注意这里的 NULL_TASK_ID——这表示写入操作来自图的控制流层面,而非某个具体的运行中的任务。Send 对象被写入到 TASKS Channel(__pregel_tasks),而普通的字符串节点名被写入到 branch:to:{node} Channel。

12.5.2 prepare_next_tasks 中的 PUSH 任务创建

prepare_next_tasks 中,TASKS Channel 中的 Send 对象被逐一转化为可执行任务:

# langgraph/pregel/_algo.py
def prepare_next_tasks(...):
    input_cache: dict[INPUT_CACHE_KEY_TYPE, Any] = {}
    tasks: list[PregelTask | PregelExecutableTask] = []

    # 第一步:消费 TASKS Channel 中的 Send 对象(PUSH 任务)
    tasks_channel = cast(Topic[Send] | None, channels.get(TASKS))
    if tasks_channel and tasks_channel.is_available():
        for idx, _ in enumerate(tasks_channel.get()):
            if task := prepare_single_task(
                (PUSH, idx),  # task_path 标记为 PUSH 类型
                None,
                checkpoint=checkpoint,
                ...
            ):
                tasks.append(task)

    # 第二步:处理 PULL 类型的任务(常规节点触发)
    for name in candidate_nodes:
        if task := prepare_single_task(
            (PULL, name),
            ...
        ):
            tasks.append(task)

    return {t.id: t for t in tasks}

这里的核心区分是 PUSH vs PULL

  • PULL 任务:由 Channel 版本更新触发的常规节点执行
  • PUSH 任务:由 Send 对象显式创建的动态任务

PUSH 任务总是在 PULL 任务之前被创建,两者在同一个超步内并行执行。

12.5.3 prepare_push_task_send 的核心逻辑

prepare_single_task 识别到 task_path[0] == PUSH 且不是函数式 API 的 Call 时,它调用 prepare_push_task_send

def prepare_push_task_send(task_path, task_id_checksum, *, ...):
    # 从 TASKS Channel 中按索引取出 Send 对象
    idx = cast(int, task_path[1])
    sends: Sequence[Send] = channels[TASKS].get()
    if idx < 0 or idx >= len(sends):
        return
    packet = sends[idx]
    if not isinstance(packet, Send):
        logger.warning(f"Ignoring invalid packet type {type(packet)}")
        return

    # 验证目标节点存在
    if packet.node not in processes:
        logger.warning(f"Ignoring unknown node name {packet.node}")
        return
    proc = processes[packet.node]

    # 生成确定性任务 ID
    triggers = PUSH_TRIGGER  # (PUSH,)
    checkpoint_ns = f"{parent_ns}{NS_SEP}{packet.node}" if parent_ns else packet.node
    task_id = task_id_func(
        checkpoint_id_bytes,
        checkpoint_ns,
        str(step),
        packet.node,
        PUSH,
        str(idx),
    )

任务 ID 的生成是确定性的——基于 checkpoint ID、节点名、步数和 Send 索引。这保证了相同的图在相同的状态下会生成完全相同的任务 ID,这对于 checkpoint 恢复和幂等性至关重要。

12.5.4 Send 的输入如何传递给节点

创建 PregelExecutableTask 时,Send 的 arg 直接作为任务的 input

# 在 prepare_push_task_send 中
return PregelExecutableTask(
    packet.node,       # name:目标节点名
    packet.arg,        # input:这就是 Send.arg
    proc_node,         # proc:节点的 Runnable
    writes,            # writes:输出 deque
    config,            # 配置
    triggers,          # (PUSH,) 触发器标记
    proc.retry_policy or retry_policy,
    cache_key,
    task_id,
    task_path[:3],
    writers=proc.flat_writers,
    subgraphs=proc.subgraphs,
)

这意味着 generate_joke 函数接收到的 state 参数实际上就是 packet.arg,即我们在 Send("generate_joke", {"subject": "cats"}) 中传入的字典。

12.6 动态 Fanout 的完整数据流

flowchart TB
    subgraph 编译期
        CE[条件边注册] --> BranchSpec[BranchSpec 存储分支函数]
        BranchSpec --> Writer[生成 ChannelWrite 写入器]
    end

    subgraph "运行期 - 超步 N"
        CondFn[条件边函数执行] -->|返回 Send 列表| WriteTasks[写入 TASKS Channel]
        WriteTasks --> TopicCh["Topic&lt;Send&gt; Channel"]
    end

    subgraph "运行期 - 超步 N+1"
        TopicCh --> PNT[prepare_next_tasks]
        PNT --> PPTS1["prepare_push_task_send(idx=0)"]
        PNT --> PPTS2["prepare_push_task_send(idx=1)"]
        PNT --> PPTSN["prepare_push_task_send(idx=N)"]
        PPTS1 --> Task1[PregelExecutableTask 1]
        PPTS2 --> Task2[PregelExecutableTask 2]
        PPTSN --> TaskN[PregelExecutableTask N]
    end

    subgraph "运行期 - 执行与聚合"
        Task1 -->|并行执行| Runner[PregelRunner.tick]
        Task2 --> Runner
        TaskN --> Runner
        Runner -->|写入输出 Channel| AW[apply_writes]
        AW -->|BinaryOperatorAggregate| Merged[合并后的状态]
    end

12.7 条件边返回多个 Send 的工程细节

12.7.1 分支处理中的 Send 识别

在编译过程中,条件边的返回值会经过一个写入器函数处理。这个写入器能够区分三种返回类型:

  1. 字符串:表示一个目标节点名,转化为对应的 branch:to:{node} Channel 写入
  2. Send 对象:直接写入 TASKS Channel
  3. 列表:可以包含字符串和 Send 的混合
# 条件边可以混合返回字符串和 Send
def route(state):
    sends = [Send("worker", {"task": t}) for t in state["tasks"]]
    sends.append("summary_node")  # 同时也触发一个常规节点
    return sends

这种混合返回的能力非常强大——你可以在一个条件边中同时触发多个动态并行任务和静态节点。

12.7.2 Command.goto 中的 Send

Command 是 LangGraph 更强大的控制流原语,它的 goto 字段也支持 Send:

from langgraph.types import Command, Send

def my_node(state):
    return Command(
        update={"status": "dispatched"},
        goto=[
            Send("processor", {"item": item})
            for item in state["items"]
        ]
    )

map_command 函数中,Command.goto 中的 Send 对象以完全相同的方式被写入 TASKS Channel。这使得动态并行不仅可以在条件边中触发,还可以在任何节点的返回值中触发。

Command 的完整形状比”一个带 goto 的对象”要大。源码 langgraph/types.py:652 的真实定义揭示了它的四个字段——每一个都对应一个不同的控制流维度:

# langgraph/types.py:652
@dataclass(**_DC_KWARGS)
class Command(Generic[N], ToolOutputMixin):
    graph:  str | None = None         # 目标图:None / Command.PARENT
    update: Any | None = None         # 状态更新
    resume: dict[str, Any] | Any | None = None  # 中断恢复值
    goto:   Send | Sequence[Send | N] | N = ()  # 下一跳

四个字段是正交的——任何组合都合法。你可以同时 update 状态 + goto 下一个节点 + 回复一组 resume 值——一次节点返回就承担”改状态 + 走路由 + 解中断”三件事。这种”合一表达”能力让 LangGraph 能用单一返回类型实现 LangSmith 那套复杂 Agent 行为,不需要设计一堆不同的 sentinel 值让节点区别返回。

graph 字段是子图逃逸通道graph=Command.PARENTCommand 类属性里的特殊标记)告诉 Pregel”这个命令目标不是当前图、是父图”。用途:子图作为 tool 被父图调用时,子图内某个节点想直接改变父图的路由或状态,不需要”返回值→父图再解析”的两级处理,可以用 Command(graph=Command.PARENT, goto="finalize") 一步到位。注意 graph 只接受 NoneCommand.PARENT 两个值——不是任意图名——因为 Pregel 只知道直接父图的存在、无法定位到更远祖先。

update 的类型很宽松Any | None——实际可以是 dict、tuple list(走 _update_as_tuples,line 687 会拆成 [(channel, value), ...])、甚至更奇怪的结构。_update_as_tuples 内部做一次规整:dict 直接 items()、list of 2-tuple 原样保留、其它形态抛错。这让 Command 对不同 state_schema(TypedDict / Pydantic / dataclass / 自定义 reducer)都能自然适配。

resume 的双态dict[str, Any] | Any | None):dict 形态是”按 interrupt id 精确路由”——用于单节点里有多个 interrupt 点;单值形态是”下一个待恢复的中断直接拿这个”——简化场景常用。这个二态是 interrupt() 机制的对称设计:interrupt 单次触发就用单值 resume、多次触发就用 dict 按 id 配对。

回到 Send 这个主题:Send 是 Command 的一个合法 goto(类型注解 Send | Sequence[Send | N] | N),意味着”直接返回 Send 列表” 和 “返回 Command(goto=[Send, ...])” 在 Pregel 看来最终都走 map_command 同一条路径。差别只在”节点除了派生任务以外还要不要顺便改状态/解中断”。这种统一又分层的设计——简单场景零开销(裸 Send 即可),复杂场景也不需要新语法(Command 容纳一切)——是 LangGraph 控制流 API 简洁度的来源。

12.7.3 Send 与中断的交互

当 Send 派生的任务遇到 interrupt() 时,情况变得有趣。每个 PUSH 任务都有自己独立的 checkpoint_ns(格式为 {parent_ns}:{node_name}:{task_id}),这意味着:

  • 每个 Send 任务的中断是独立追踪的
  • 恢复时,框架能准确地将 resume 值路由到正确的任务
  • 尚未完成的 Send 任务可以被单独恢复,已完成的不会重新执行
stateDiagram-v2
    [*] --> Dispatched: Send 列表写入 TASKS
    Dispatched --> Running: prepare_push_task_send
    Running --> Completed: 正常完成
    Running --> Interrupted: 遇到 interrupt()
    Interrupted --> Resumed: Command(resume=value)
    Resumed --> Completed: 从头重新执行节点
    Completed --> Aggregated: apply_writes 合并输出
    Aggregated --> [*]

12.8 设计决策

12.8.1 为什么 Send 不是 Channel?

一个自然的疑问是:为什么不把 Send 设计为一种新的 Channel 类型?答案在于关注点分离:

  • Send 是数据:它只是一个”请求执行某节点”的消息包
  • Topic 是基础设施:它负责存储和分发这些消息包
  • TASKS 是约定:它是一个预定义的系统级 Channel 名

这种分层让 Send 保持简洁(仅两个字段),同时复用了 Topic Channel 的全部能力。如果未来需要支持新的分发语义(如优先级队列),只需替换 TASKS Channel 的实现,而 Send 本身无需改变。

12.8.2 确定性任务 ID 的重要性

prepare_push_task_send 中的任务 ID 生成使用了确定性哈希:

task_id = task_id_func(
    checkpoint_id_bytes,
    checkpoint_ns,
    str(step),
    packet.node,
    PUSH,
    str(idx),  # Send 在列表中的索引
)

这个设计确保了:

  1. 幂等性:相同状态下重新执行会生成相同的任务 ID
  2. Checkpoint 恢复:从 checkpoint 恢复时,已完成的任务不会被重复创建
  3. 并发安全:不同的 Send 索引产生不同的任务 ID,避免冲突

task_id_func 根据 checkpoint 版本选择不同的哈希算法——v2+ 使用 xxhash(更快),旧版本使用 UUID5(兼容性)。

12.8.3 Send.arg 为 Any 类型的权衡

Send.arg 类型为 Any 带来了极大的灵活性——你可以发送字典、Pydantic 模型、甚至原始字符串。但这也意味着类型安全完全依赖于开发者:

# 正确:节点期望 dict,Send 发送 dict
Send("process", {"key": "value"})

# 也正确:节点期望 str,Send 发送 str
Send("process", "hello")

# 运行时错误:类型不匹配不会在编译期被检查到
Send("process", 42)  # 如果节点期望 dict,运行时会失败

这是一个典型的灵活性 vs 安全性权衡。LangGraph 选择了灵活性,因为动态并行的场景下,输入类型往往在运行时才确定。

12.8.4 Reduce 阶段的隐式实现

LangGraph 的 map-reduce 模式中,reduce 阶段是隐式的——通过 Channel 的 reducer 函数自动完成:

class OverallState(TypedDict):
    jokes: Annotated[list[str], operator.add]  # reducer 就是 reduce 逻辑

这与 MapReduce 框架中显式定义 reduce 函数不同。在 apply_writes 中,所有并行任务的输出被收集后,通过 BinaryOperatorAggregate Channel 的 update 方法依次应用 reducer:

# apply_writes 中的关键逻辑
pending_writes_by_channel: dict[str, list[Any]] = defaultdict(list)
for task in tasks:
    for chan, val in task.writes:
        if chan in channels:
            pending_writes_by_channel[chan].append(val)

for chan, vals in pending_writes_by_channel.items():
    if chan in channels:
        channels[chan].update(vals)  # BinaryOperatorAggregate 应用 reducer

这种隐式设计的优势在于,同一个 reducer 同时服务于静态边和动态 Send 的输出合并,无需开发者区分两种场景。

12.9 高级模式

12.9.1 Send 到子图节点

Send 可以将数据发送到一个子图节点,此时 arg 会成为子图的输入:

def dispatch_to_subgraph(state):
    return [
        Send("analysis_subgraph", {"document": doc, "mode": "sentiment"})
        for doc in state["documents"]
    ]

子图会以 arg 作为输入开始执行,它的 checkpoint_ns 会自动嵌套在父图的命名空间下。

12.9.2 动态 Fan-in 的注意事项

所有 Send 任务在同一个超步内并行执行,它们的输出在下一个超步通过 apply_writes 合并。这意味着:

  • 如果需要等待所有并行任务完成后才进行下一步,只需让所有 Send 任务有相同的后续边
  • 如果某个并行任务失败,其错误会被记录但不会阻止其他任务的执行(除非配置了 interrupt 行为)
graph TB
    Start[START] -->|条件边| Fan{fanout}
    Fan -->|"Send(arg1)"| W1[worker]
    Fan -->|"Send(arg2)"| W2[worker]
    Fan -->|"Send(arg3)"| W3[worker]
    W1 -->|reducer 聚合| Agg[aggregator]
    W2 -->|reducer 聚合| Agg
    W3 -->|reducer 聚合| Agg
    Agg --> End[END]

    style Fan fill:#f0e6ff,stroke:#333,stroke-width:2px
    style Agg fill:#e6f0ff,stroke:#333,stroke-width:2px

12.9.3 Send 的性能考量

每个 Send 都会创建一个独立的 PregelExecutableTask,包括独立的配置、写入 deque、checkpoint 命名空间等。对于大量并行任务(如数百个 Send),需要注意:

  • 每个任务都会产生 checkpoint 写入(如果启用了 checkpointer)
  • 线程池的大小可能成为瓶颈
  • 所有任务的输出在内存中累积直到 apply_writes

对于极大规模的并行场景,建议在条件边中控制 Send 的数量,或使用批处理策略。

12.9.4 实战:文档批量处理流水线

以下是一个更完整的 map-reduce 实战示例——批量处理文档列表,每个文档独立分析后汇总:

from typing import Annotated, TypedDict
from langgraph.types import Send
from langgraph.graph import END, START, StateGraph
import operator

class PipelineState(TypedDict):
    documents: list[dict]  # 输入文档列表
    results: Annotated[list[dict], operator.add]  # 分析结果(reducer 合并)
    summary: str  # 最终汇总

class DocInput(TypedDict):
    """Send 传递给 analyze_doc 的输入"""
    doc_id: str
    content: str
    analysis_type: str

def dispatch_documents(state: PipelineState):
    """条件边:为每个文档创建一个 Send"""
    return [
        Send("analyze_doc", DocInput(
            doc_id=doc["id"],
            content=doc["content"],
            analysis_type="sentiment"
        ))
        for doc in state["documents"]
    ]

def analyze_doc(state: DocInput) -> dict:
    """工作节点:分析单个文档"""
    # 注意:state 的类型是 DocInput,不是 PipelineState
    result = {
        "doc_id": state["doc_id"],
        "sentiment": "positive",  # 实际中调用 LLM
        "length": len(state["content"])
    }
    return {"results": [result]}

def summarize(state: PipelineState) -> dict:
    """汇总节点:整合所有分析结果"""
    total = len(state["results"])
    positive = sum(1 for r in state["results"] if r["sentiment"] == "positive")
    return {"summary": f"Analyzed {total} docs, {positive} positive"}

builder = StateGraph(PipelineState)
builder.add_node("analyze_doc", analyze_doc)
builder.add_node("summarize", summarize)
builder.add_conditional_edges(START, dispatch_documents)
builder.add_edge("analyze_doc", "summarize")
builder.add_edge("summarize", END)
graph = builder.compile()

这个例子展示了 Send 的完整生命周期:条件边创建 Send 列表、每个 Send 携带独立的输入类型、工作节点独立执行、输出通过 reducer 自动合并、最终汇总节点处理合并后的结果。

12.9.5 Send 与 Checkpoint 的交互细节

当启用了 checkpointer 时,Send 任务的 checkpoint 行为值得深入理解:

  1. 任务写入持久化:每个 PUSH 任务的 writes 会通过 put_writes 异步持久化
  2. 恢复时的匹配:从 checkpoint 恢复时,pending_writes 中已有结果的任务不会被重新执行
  3. Send 的序列化:Send 对象本身作为 TASKS Channel 的值被保存在 checkpoint 中,恢复时用于重建任务
flowchart LR
    subgraph "首次执行"
        S1["Send 列表写入 TASKS"] --> T1["创建 PUSH 任务"]
        T1 --> E1["执行任务"]
        E1 --> W1["写入 pending_writes"]
        W1 --> CP1["保存 Checkpoint"]
    end

    subgraph "恢复执行"
        CP1 --> R1["加载 Checkpoint"]
        R1 --> R2["重建 TASKS Channel"]
        R2 --> R3["prepare_next_tasks"]
        R3 --> R4{"pending_writes<br/>中有结果?"}
        R4 -->|是| Skip["跳过已完成的任务"]
        R4 -->|否| Exec["重新执行任务"]
    end

12.9.6 langgraph/channels/ 8 文件 918 行的真实拆分

§12.3 讨论 Topic 时给出了和其他 channel 的对比表——把整个 channels/ 目录的真实账本展开——

文件角色
named_barrier_value.py167本目录最大——多 source 同步屏障(等所有命名输入到齐才触发)
last_value.py151默认 reducer——后写覆盖前写、单值
binop.py134BinaryOperatorAggregate(§12.4 reduce 阶段的复用基础——operator.add 拼 list 就靠它)
base.py121BaseChannel ABC——定义 update / get / consume / checkpoint 协议
topic.py94Topic Channel(本章 §12.3 主角)——pub/sub 语义,Send 收集走它
ephemeral_value.py79单步生命周期值(下一步消失,不持久化)
untracked_value.py73不参与 checkpoint/version 的值
any_value.py72”任意一个写入即可”(first-write-wins 反语义)
__init__.py27公共 export

三条值得记住的物理事实——

  1. Topic Channel 仅 94 行——本章用了一节(§12.3)讨论它的 pub/sub + flatten + accumulate 三种模式——94 行就实现完——印证 LangGraph 对channel 抽象的设计纪律:每种 channel 只负责”如何接收 update + 何时 emit”两件事、其余复杂度全在 Pregel loop 里
  2. named_barrier_value.py 167 行是最大——但本章没提到它——因为 NamedBarrier 是 LangGraph 处理 “多分支汇总到同一节点” 时的隐式 channel,例如 add_edge(["a", "b"], "c") 让 c 等 a 和 b 都到达;用户感知不到、但运行时比 Topic 还重
  3. binop.py 134 行 = BinaryOperatorAggregate——是 §12.4 map-reduce 模式 reduce 阶段的实际 channel——Annotated[list[str], operator.add] 这种类型注解在 compile 时就实例化为一个 binop channel 实例——reducer 不是新概念、只是 binop channel 的别名

Send 类的真实定义(types.py:574-650 实测)——

class Send:
    __slots__ = ("node", "arg")    # 严格 2 字段、节省 dict 开销
    def __init__(self, /, node: str, arg: Any) -> None: ...
    def __hash__(self) -> int:
        return hash((self.node, self.arg))
    def __eq__(self, value: object) -> bool: ...

两点细节——

  1. __slots__ 限定 2 字段——Send 实例没有 __dict__——一个 Send 在内存里只占几十字节、跑 1000 次 Map-Reduce 也不会因 dict 开销爆炸
  2. /node 是 positional-only 参数——用户不能写 Send(node="x", arg=...)——只能写 Send("x", {...})——这是 Python 3.8+ 的特性、目的是降低 API 表面的兼容负担(未来如果改字段名 node → target、关键字写法的代码会全崩、positional 不会)

StreamMode 实测 7 种 (types.py:118)——values / updates / checkpoints / tasks / debug / messages / custom——和小结里那一句”下一章七种 StreamMode”对得上。

12.9A 源码账本(截至 main 2026-04-22)

前面章节把 Send 链路讲清楚了——但**“链路”不等于”账本”。以下表格是对照仓库当前 main 分支 grep 得来的真实文件 + 行号清单**,供校对和跳读用。

12.9A.1 Send/PUSH 链路涉及的真实文件

文件职责
libs/langgraph/langgraph/types.py748-792Send 类定义——__slots__ = ("node", "arg") + positional-only /
libs/langgraph/langgraph/types.py796+Command dataclass——goto 字段类型为 Send | Sequence[Send | N] | N
libs/langgraph/langgraph/types.py99-101StreamMode Literal——7 种(values/updates/checkpoints/tasks/debug/messages/custom)
libs/langgraph/langgraph/constants.py61公共 re-export 入口——PUSH/TASKS 等已迁到 _internal
libs/langgraph/langgraph/_internal/_constants.py90系统级常量本体——Send 派发相关的所有字符串
libs/langgraph/langgraph/_internal/_constants.py17TASKS = "__pregel_tasks"
libs/langgraph/langgraph/_internal/_constants.py58PUSH = "__pregel_push"
libs/langgraph/langgraph/_internal/_constants.py61PULL = "__pregel_pull"
libs/langgraph/langgraph/_internal/_constants.py72NULL_TASK_ID = "00000000-0000-0000-0000-000000000000"
libs/langgraph/langgraph/_internal/_constants.py64NS_SEP = "|"——checkpoint 命名空间分隔符
libs/langgraph/langgraph/_internal/_constants.py28CONFIG_KEY_SEND = "__pregel_send"
libs/langgraph/langgraph/channels/topic.py99Topic Channel + _flatten helper
libs/langgraph/langgraph/channels/topic.py13-17_flatten 生成器——一层展开 Sequence[Value | list[Value]]
libs/langgraph/langgraph/pregel/_algo.py909Pregel 调度核心
libs/langgraph/langgraph/pregel/_algo.py395prepare_single_task 入口
libs/langgraph/langgraph/pregel/_algo.py410task_id_func = _xxhash_str if checkpoint["v"] > 1 else _uuid5_str
libs/langgraph/langgraph/pregel/_algo.py633prepare_push_task_send——把 Send 转成 PregelExecutableTask
libs/langgraph/langgraph/pregel/_algo.py854_uuid5_str 兼容老 checkpoint v1
libs/langgraph/langgraph/pregel/_algo.py862_xxhash_str v2+ 快速哈希

12.9A.2 几条值得记住的物理事实

  1. constants.py 只有 61 行——大部分系统常量已迁到 _internal/_constants.py(90 行)——TASKS / PUSH / PULL / NULL_TASK_ID / NS_SEP / CONFIG_KEY_SEND 都在后者——说明 LangGraph 把”实现细节”和”对外 API”做了严格分层:用户 from langgraph.constants import START, END 看到的是稳定表面,内部 re-export 通过 __getattr__ 动态代理(constants.py line 28-53),让重构自由度最大化
  2. NS_SEP"|" 不是 ":"——本章 §12.5.3 写 checkpoint_ns = f"{parent_ns}{NS_SEP}{packet.node}" 时这个分隔符就是竖线——子图嵌套命名空间在 checkpoint store 里形如 "parent|analyze_doc|xxh3_abc"——选 | 的原因是节点名、task_id 都不含它、不会歧义
  3. task_id v1 vs v2 分叉_algo.py:410)——checkpoint["v"] > 1 走 xxhash、否则 UUID5——2024 下半年 LangGraph 从 v1 升 v2 的时候保留了 UUID5 老路径给旧 checkpoint 做向后兼容——这意味着线上长跑的图如果 checkpoint 版本很老,恢复时仍然用慢但稳定的 UUID5——不会因为版本新旧产生 task_id 漂移、从而把已完成任务误判成未执行
  4. TASKS 的字符串值 "__pregel_tasks" 以双下划线开头——这是 LangGraph 的系统 Channel 命名约定——用户定义的 Channel 名不允许以 __pregel_ 开头——避免和内部 Channel 冲突——同样约定的还有 __pregel_push / __pregel_pull / __pregel_send / __interrupt__
  5. CONFIG_KEY_SEND = "__pregel_send"——这是节点运行时拿到 Send 派发器的 config 键——即在节点内部可以通过 config["configurable"]["__pregel_send"] 直接取出一个 send(packet: Send) 回调——不经过条件边也能写入 TASKS——这是 functional API 的 Send 运行时入口

12.9B Fan-out 三种机制对比

LangGraph 里能触发”多分支”的机制不止 Send 一种——以下把 三种 fan-out 路径 放一起横向看,避免实战时选错工具。

12.9B.1 机制对照表

维度add_conditional_edges(返回节点名)Send 列表(本章主角)add_edge(list, target)(多源汇聚)
决策时机运行时(超步末)运行时(超步末)编译期(静态拓扑)
分支数量编译期固定(候选节点集)运行时任意(可 0、可 N)编译期固定
目标节点一个或多个候选中的子集任意可派发节点(含子图)多个 source → 一个 target
输入载荷完整图状态读 Send.arg(可完全脱离主 state读完整图状态
承载 Channelbranch:to:{node}(EphemeralValue)__pregel_tasks(Topic)NamedBarrierValue
Channel 行号channels/ephemeral_value.py 79 行channels/topic.py 99 行channels/named_barrier_value.py 167 行(目录最大)
同步语义触发即走每个 Send 独立 PUSH 任务等所有 source 到齐才触发 target
任务类型PULLPUSHPULL
典型用途路由(if/switch)Map-Reduce、动态批处理多分支汇合(join/barrier)
与 checkpoint 关系条件边结果写 branch Channel每个 Send 独立任务 IDbarrier 状态写 checkpoint
可发送到子图是(arg 成为子图 input)
可混合返回只能返回字符串可混合 Send + 字符串(§12.7.1)N/A

12.9B.2 选择决策图

flowchart TD
    Q1{分支数量在编译期<br/>能否确定?}
    Q1 -->|能| Q2{是否需要<br/>多 source 同步?}
    Q1 -->|不能/依赖输入| SEND[Send 列表<br/>PUSH 动态派发]

    Q2 -->|需要| BARRIER[add_edge#40;list, target#41;<br/>NamedBarrierValue 同步]
    Q2 -->|不需要| Q3{目标节点是<br/>候选集子集?}

    Q3 -->|是| COND[add_conditional_edges<br/>返回节点名]
    Q3 -->|否| SEND

    SEND --> Q4{节点期望收到<br/>完整主 state?}
    Q4 -->|是| WARN[警告:Send.arg 必须<br/>手工构造完整 state<br/>或考虑换条件边]
    Q4 -->|否| OK[正确选择 Send]

    style SEND fill:#f0e6ff,stroke:#333
    style BARRIER fill:#e6f0ff,stroke:#333
    style COND fill:#fff0e6,stroke:#333

12.9B.3 易错点:Send 下游节点的状态读取

最常见的坑——generate_joke(state) 收到的 stateSend.arg 而不是主 state——这意味着:

def generate_joke(state):           # state 就是 {"subject": "cats"}
    subjects = state["subjects"]    # KeyError!主 state 里的字段这里没有
    return {"jokes": [...]}

如果 Send 下游节点同时也需要主 state 的其它字段,有两条路径——

  1. 在条件边里把需要的上下文塞进 Send.arg
    [Send("worker", {**s, "user_id": state["user_id"]}) for s in items]
  2. 在节点签名里显式声明 InjectedState(LangGraph 新版 functional API 提供)——框架会把主 state 作为第二参数注入、不经过 Send.arg——这让Send 负责路由、主 state 负责共享上下文、职责分离

12.9C Send 链路跨章校对

本书多章涉及 PUSH/PULL 任务、超步调度、Channel 写入——以下把和 Send 强相关的章节锚点梳理一次,方便来回查:

章节锚点和本章的关系
第 07 章 任务调度PregelRunner.tick / submit / commit 三阶段本章 §12.5 创建的 PUSH PregelExecutableTask 最终交给 07 章讲的 PregelRunner 执行——Send 派生多个任务后,submit 阶段并行提交、commit 阶段把 writes 回写 Channel
第 07 章PULL vs PUSH 任务区分本章 §12.5.2 的”PUSH 任务先于 PULL 创建”这条规则源头在 07 章——prepare_next_tasks 按 PUSH-first 顺序生成任务列表
第 08 章 Channel 体系BinaryOperatorAggregate / LastValue / Topic本章 §12.4 map-reduce 的 reduce 阶段复用 08 章的 BinaryOperatorAggregate(binop.py 134 行)——Annotated[list[str], operator.add] 编译时即实例化为 binop channel
第 08 章Channel 检查点 checkpoint / from_checkpoint 协议本章 §12.9.5 的”Send 对象作为 TASKS Channel 值被保存”实现在 Topic.checkpoint(topic.py)——values 列表直接序列化
第 09 章 Checkpointput_writes / get_writes本章 §12.9.5 的 pending_writes 恢复逻辑对应 09 章的 checkpointer.get_writes——PUSH 任务因为 task_id 确定性、重启时能精确匹配到之前未完成的写入
第 10 章 中断interrupt() + Command(resume=...)本章 §12.7.3 的”每个 Send 任务独立 checkpoint_ns”决定了 10 章的 interrupt 能按任务独立追踪——dict 形态 resume 按 interrupt id 路由、单值形态对应单任务恢复
第 11 章 条件边add_conditional_edges + BranchSpec本章 §12.7.1 的”条件边识别 Send 并写 TASKS”扩展 11 章的 BranchSpec 写入器——同一个分支函数可以混合返回 Send + 字符串
第 13 章 流式输出(下一章)StreamMode 7 种(types.py:99-101)本章小结提到的”七种 StreamMode”详情见 13 章——其中 tasks mode 能观测到每个 PUSH 任务的创建与完成事件、对调试动态并行极关键
第 14+ 章 子图 / Functional APICommand(graph=PARENT) + CONFIG_KEY_SEND本章 §12.7.2 的父图逃逸和 §12.9A.2 第 5 条的 CONFIG_KEY_SEND 运行时入口——在子图/functional API 中作为非条件边场景的 Send 触发方式

一张图总结 Send 在 LangGraph 全栈的位置

flowchart LR
    subgraph Ch11[第11章 条件边]
        BR[BranchSpec 写入器]
    end
    subgraph Ch12[第12章 本章 Send]
        SEND[Send 对象]
        TOPIC[TASKS = Topic Channel]
        PUSH_TASK[PUSH PregelExecutableTask]
    end
    subgraph Ch07[第07章 调度]
        RUNNER[PregelRunner.tick]
        SUBMIT[submit 并发]
        COMMIT[commit 写回]
    end
    subgraph Ch08[第08章 Channel]
        BINOP[BinaryOperatorAggregate]
    end
    subgraph Ch09[第09章 Checkpoint]
        CP[put_writes/get_writes]
    end
    subgraph Ch10[第10章 中断]
        INT[interrupt+resume]
    end

    BR -->|识别 Send| TOPIC
    SEND --> TOPIC
    TOPIC -->|prepare_push_task_send| PUSH_TASK
    PUSH_TASK --> RUNNER
    RUNNER --> SUBMIT
    SUBMIT --> COMMIT
    COMMIT -->|reducer 合并| BINOP
    PUSH_TASK -.独立 ns.-> CP
    PUSH_TASK -.独立 ns.-> INT

12.10 小结

本章深入分析了 LangGraph 的动态并行机制。Send 对象以极简的两字段设计(node + arg)实现了强大的运行时任务分发能力。它通过 Topic Channel 的 pub/sub 语义被收集和分发,通过 prepare_push_task_send 被转化为可执行任务,最终在 PregelRunner 中并行执行。

整个 map-reduce 模式的实现体现了 LangGraph 的核心哲学:复用而非重复发明。Send 复用了 Topic Channel 的基础设施,reducer 复用了 BinaryOperatorAggregate 的聚合能力,任务调度复用了 Pregel 的超步机制。开发者只需关注两件事:在条件边中返回 Send 列表(map),在状态定义中声明 reducer(reduce),框架会处理其余一切。

物理事实:channels/ 8 文件 918 行,Topic 仅 94 行实现 pub/sub+flatten+accumulate 三模式;Send 用 __slots__ 限定 2 字段保证内存极小、__init__/ 让 node 是 positional-only 屏蔽未来 API 改名的兼容负担。