LangGraph 设计与实现
第17章 多 Agent 模式实战
第17章 多 Agent 模式实战
17.1 引言
单个 Agent 能力有限——它只有一组工具、一种提示、一个执行循环。当任务复杂度上升时(如”帮我分析这家公司的财报,然后写一份投资建议书”),单个 Agent 要么因为工具太多导致 LLM 选择困难,要么因为系统提示过长影响推理质量。多 Agent 系统通过分工协作解决这个问题:不同的 Agent 负责不同的能力域,通过明确的通信机制协同完成复杂任务。
LangGraph 的底层基础设施——StateGraph、Send、Command、子图——天然支持多 Agent 模式的构建。图的节点可以是子图(即嵌套的 Agent),条件边可以实现动态路由,Send 支持一对多的任务分发,Command 支持跨图的状态更新和导航。
本章将系统介绍基于 LangGraph 实现的四种典型多 Agent 架构:Supervisor(监督者)、Swarm(蜂群)、分层 Agent 和协作 Agent。每种模式都有其适用场景和权衡,我们会结合源码和实战代码深入分析它们的设计原理。
本章要点
- Supervisor 模式——中央调度者分配任务给专业 Agent
- Swarm 模式——Agent 之间点对点的控制权交接
- 分层 Agent——多层级的 Supervisor 树形结构
- 协作 Agent——共享状态的对等协作
- 基于 LangGraph 原语实现各种模式的核心技术
17.2 多 Agent 的核心抽象
17.2.1 Agent 作为子图
在 LangGraph 中,一个 Agent 本质上就是一个 CompiledStateGraph。多 Agent 系统就是一个图中嵌套了多个子图:
# 每个 Agent 是一个独立的图
research_agent = create_react_agent(model, research_tools, name="researcher")
writer_agent = create_react_agent(model, writing_tools, name="writer")
# 多 Agent 系统是一个包含子图的父图
builder = StateGraph(TeamState)
builder.add_node("researcher", research_agent)
builder.add_node("writer", writer_agent)
17.2.2 通信机制的三种模式
graph TB
subgraph "模式 1:共享状态"
A1[Agent A] -->|写入状态| S1[共享 State]
S1 -->|读取状态| B1[Agent B]
end
subgraph "模式 2:Command 导航"
A2[Agent A] -->|"Command(goto='B')"| B2[Agent B]
end
subgraph "模式 3:Send 分发"
Router[路由器] -->|"Send('A', task1)"| A3[Agent A]
Router -->|"Send('B', task2)"| B3[Agent B]
end
17.2.1 重要迁移:create_react_agent 和 AgentState 自 v1.0 起已 deprecated
在深入具体模式前必须告知的一件事——本章所有示例里的 from langgraph.prebuilt import create_react_agent 和 AgentState 在 LangGraph v1.0 已经被标记 deprecated。打开 langgraph/prebuilt/chat_agent_executor.py:
# chat_agent_executor.py:274
@deprecated(
"create_react_agent has been moved to `langchain.agents`. "
"Please update your import to `from langchain.agents import create_agent`.",
category=LangGraphDeprecatedSinceV10,
)
def create_react_agent(...):
...
对应的 AgentState / AgentStatePydantic / AgentStateWithStructuredResponse / AgentStateWithStructuredResponsePydantic 四个 state 类都挂了同样的 @deprecated 装饰器(chat_agent_executor.py:53-107),迁移指引统一是”移到 langchain.agents”。
这次迁移的设计意图(可从源码注释和 LangGraph 1.0 release notes 推断):
- LangChain 负责 “LLM 应用的通用原语”——包括 agent、tool、prompt、memory 这些与具体编排引擎无关的概念
- LangGraph 专注于 “低层图执行引擎”——StateGraph、Pregel、checkpointer 这些编排层概念
create_react_agent本质是 “基于 LangGraph 实现的一个预设 agent 形态”——它跨两边但 “agent 概念” 的归属应该在 LangChain 层- 所以 1.0 把它搬到
langchain.agents.create_agent、让 LangGraph 库聚焦到更底层
用户现在的迁移路径:
# 旧(本章示例,仍能用但会警告)
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(model, tools)
# 新(v1.0+ 推荐)
from langchain.agents import create_agent
agent = create_agent(model, tools)
两者短期内都存在并工作——create_react_agent 仍然是 create_agent 的薄 wrapper、只是挂了 @deprecated 装饰器让 IDE/linter 产生警告。LangGraphDeprecatedSinceV10 类别让用户可以 warnings.filterwarnings("ignore", category=LangGraphDeprecatedSinceV10) 暂时屏蔽——但长期建议尽快迁移。
本章接下来的示例代码为了讲清楚核心概念、继续使用 create_react_agent 的传统写法——因为多数生产代码库也还在用、读者会在真实环境里遇到。看到 deprecation warning 请按上面指引迁移、核心架构模式(Supervisor/Swarm/分层)本身不变。
这种 “预设组件从执行引擎迁到应用框架” 的分离也和第 7 章 TraitDef 的 edition 补丁、第 10 章 serde_derive 的 @deprecated、第 15 章 vLLM 注册表的废弃接口一样——大型软件生态演进里分层重构的常态,用 @deprecated + 迁移指引做平滑过渡。
17.2.3 源码核对:Send 类的真实定义与 map-reduce 动机
Send 和 Command 是多 Agent 编排的两个最重要原语。打开 langgraph/types.py:574-646,Send 的真实定义极其简单:
@dataclass(frozen=True)
class Send:
"""A message or packet to send to a specific node in the graph."""
__slots__ = ("node", "arg")
node: str
arg: Any
def __init__(self, /, node: str, arg: Any) -> None:
self.node = node
self.arg = arg
def __hash__(self) -> int:
return hash((self.node, self.arg))
def __eq__(self, value: object) -> bool:
return (
isinstance(value, Send)
and self.node == value.node
and self.arg == value.arg
)
四个关键设计:
1、__slots__ 限制属性为两个。Send 只承载”往哪个节点 + 什么 payload”两件事——slots 让它成为 Python 里最轻的对象之一(没有 dict)。在 Pregel 一次调度创建数百个 Send 对象的 map-reduce 场景下,这个轻量化是实打实的内存优势。
2、frozen=True:Send 不可变——创建后 node 和 arg 都不能改。这条约束保证了 Pregel 在做任务去重、缓存、hash 时不会被 Send 的意外 mutation 打破。
3、__hash__ 基于 (node, arg) —— 两个 Send 只有节点名和参数都相同才相等。这让你可以用 set[Send] 去重——如果用户不小心在 Supervisor 里对同一子任务生成两个相同 Send,Pregel 能识别并合并成一次调度。
4、/ 第一位置参数语法:def __init__(self, /, node: str, arg: Any) ——Python 3.8+ 的 positional-only 参数。意味着 Send(node="x", arg=y) 写法不被支持,必须 Send("x", y)。这是一个显式语义选择——“Send 是位置参数为主的数据结构、不像 Command 那样有太多可选关键字”。
docstring 里的 map-reduce 例子(第 591-616 行)点出了 Send 最重要的用途:“the sent state can differ from the core graph’s state”——你可以给每个 Send 传不同的 arg,每个 target 节点看到的是自己的那份独立 state。Supervisor 给 researcher 和 writer 分别派任务时,不是让两个 Agent 看同一份全局 state——而是各自拿到定制的 task description。
17.2.4 源码核对:Command 的四个字段和它们之间的组合语义
Command(types.py:652-702)的定义比 Send 复杂得多——它是 LangGraph 多 Agent 通信的瑞士军刀:
@dataclass(**_DC_KWARGS)
class Command(Generic[N], ToolOutputMixin):
graph: str | None = None
update: Any | None = None
resume: dict[str, Any] | Any | None = None
goto: Send | Sequence[Send | N] | N = ()
PARENT: ClassVar[Literal["__parent__"]] = "__parent__"
四个字段各司其职:
- graph:目标图。
None=当前图;Command.PARENT(即字符串"__parent__")=上一级父图 - update:要对 state 做的更新——dict / NamedTuple / pydantic instance 都可
- resume:配合
interrupt()的恢复值 - goto:下一步跳到哪——单节点名、节点名序列、Send、Send 序列都可
这四个字段可以任意组合——Command 不是 “do one thing” 的简单原语,是”一次动作里表达多种意图”的复合命令。三种最常见的组合:
# 1. 纯导航(Swarm 的 handoff 工具)
Command(goto="billing_agent")
# 2. 导航 + state 更新(Supervisor 决策)
Command(goto="researcher", update={"messages": [decision_msg]})
# 3. 跨图导航(子图内 handoff 到父图的别的 agent)
Command(goto="writer_agent", graph=Command.PARENT)
三组合 1/2 是 § 17.3/17.4 的模式;组合 3 是分层模式的关键——子图内部的 Agent 能直接跳到父图的兄弟节点,不用层层向上通过 Supervisor 中转。
注意 goto 类型是 Send | Sequence[Send | N] | N ——节点名和 Send 可以混合列表:
Command(goto=[
"supervisor", # 先去 supervisor
Send("async_task", {...}), # 同时给 async_task 发任务
])
这种”一条 Command 同时触发多条路径”的能力让 Pregel 一步就能完成复杂的多目标调度——比发多条连续 Command 更高效,因为 Pregel 知道可以并行这些目标。
17.2.5 源码核对:Command._update_as_tuples 的五种 update 类型归一
Command.update 接受多种类型——dict、list[tuple]、NamedTuple、Pydantic、普通值。真实的 _update_as_tuples(types.py:687-700)把它们归一到 list[tuple[str, value]]:
def _update_as_tuples(self) -> Sequence[tuple[str, Any]]:
if isinstance(self.update, dict):
return list(self.update.items())
elif isinstance(self.update, (list, tuple)) and all(
isinstance(t, tuple) and len(t) == 2 and isinstance(t[0], str)
for t in self.update
):
return self.update
elif keys := get_cached_annotated_keys(type(self.update)):
# NamedTuple / TypedDict / Pydantic / dataclass with __annotations__
return get_update_as_tuples(self.update, keys)
elif self.update is not None:
return [("__root__", self.update)]
else:
return []
五个分支分别处理:
dict→items()list[tuple[str, Any]]→ 原样使用- 带
__annotations__的类(NamedTuple/TypedDict/Pydantic/dataclass)→ 用缓存的 annotated keys 提取字段 - 任何其他非 None 值 → 包成
[("__root__", self.update)]——供只有单一字段的 state schema 使用 None→ 空列表
第 3 种特别值得看——get_cached_annotated_keys(type(self.update))。它根据类型的 __annotations__ 缓存每个类对应的字段 keys——同一个 state 类型只做一次 annotation 扫描,后续都命中缓存。这是一个对频繁创建 Command 场景的典型优化——你在多 agent 系统里一秒发几十次 Command,key 提取开销应该摊到每个类型上只付一次。
第 4 种 "__root__" 是个特殊约定——某些 state schema 是 “只有一个字段、或者这个字段就是 state 本身” 的简化形式(比如 StateGraph(list[str]) 直接用 list 作 state),这时 Command.update 传一个 list 而不是 dict——框架自动包成 ("__root__", list) 喂给 reducer。
17.3 Supervisor 模式
17.3.1 架构概述
Supervisor 模式是最直观的多 Agent 架构:一个”主管”Agent 接收用户请求,分析任务需求,然后将子任务分配给专业 Agent。每个专业 Agent 完成后将结果返回给 Supervisor,Supervisor 决定是否需要更多工作。
graph TB
User[用户] --> Sup[Supervisor Agent]
Sup -->|"分配研究任务"| RA[Research Agent]
Sup -->|"分配写作任务"| WA[Writer Agent]
Sup -->|"分配代码任务"| CA[Coder Agent]
RA -->|"研究结果"| Sup
WA -->|"文章草稿"| Sup
CA -->|"代码片段"| Sup
Sup -->|"最终回复"| User
style Sup fill:#ffe6e6,stroke:#333,stroke-width:2px
17.3.2 实现方式
from typing import Annotated, Literal, TypedDict
from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import create_react_agent
from langgraph.types import Command
class SupervisorState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
next_agent: str
# 创建专业 Agent
research_agent = create_react_agent(model, research_tools, name="researcher")
writer_agent = create_react_agent(model, writing_tools, name="writer")
def supervisor_node(state: SupervisorState) -> Command:
"""Supervisor 决定下一步交给哪个 Agent"""
response = supervisor_llm.invoke([
SystemMessage(content="""你是一个任务分配主管。
分析用户请求,决定交给哪个 Agent:
- researcher: 负责信息检索和数据分析
- writer: 负责内容创作和文档编写
- FINISH: 任务完成,返回最终结果"""),
*state["messages"]
])
# 解析 LLM 的决策
next_agent = parse_decision(response.content)
if next_agent == "FINISH":
return Command(goto=END, update={"messages": [response]})
else:
return Command(
goto=next_agent,
update={"messages": [response]}
)
# 构建 Supervisor 图
builder = StateGraph(SupervisorState)
builder.add_node("supervisor", supervisor_node)
builder.add_node("researcher", research_agent)
builder.add_node("writer", writer_agent)
builder.add_edge(START, "supervisor")
builder.add_edge("researcher", "supervisor") # 完成后返回 Supervisor
builder.add_edge("writer", "supervisor")
graph = builder.compile()
17.3.3 Supervisor 的关键设计
- 中心化决策:Supervisor 是唯一的决策者,专业 Agent 只负责执行
- 循环结构:Agent 完成后总是返回 Supervisor,由 Supervisor 决定下一步
- Command 导航:使用
Command(goto=next_agent)实现动态路由
17.3.4 适用场景与局限
适用场景:
- 子任务之间有明确的先后依赖
- 需要中央协调来决定任务顺序
- Agent 数量较少(3-5 个)
局限:
- Supervisor 是单点瓶颈
- 每次决策都需要额外的 LLM 调用
- 子任务之间无法直接通信
17.3.5 源码核对:ParentCommand 如何穿越图边界——pregel/_retry.py 的异常冒泡机制
§17.2.4 讲了 Command(graph=Command.PARENT) 可以”导航到父图的兄弟节点”。这个能力背后是一套完整的异常冒泡机制,值得完整拆开。
真实实现在 langgraph/pregel/_retry.py:127-140:
try:
return task.proc.invoke(task.input, config)
except ParentCommand as exc:
ns: str = config[CONF][CONFIG_KEY_CHECKPOINT_NS]
cmd = exc.args[0]
# strip task_ids from namespace for comparison
if cmd.graph in (ns, recast_checkpoint_ns(ns), task.name):
# this command is for the current graph, handle it
for w in task.writers:
w.invoke(cmd, config)
break
elif cmd.graph == Command.PARENT:
# this command is for the parent graph, assign it to the parent.
exc.args = (replace(cmd, graph=_checkpoint_ns_for_parent_command(ns)),)
# bubble up
raise
分三步理解:
1、子图里 return Command(goto="foo", graph=Command.PARENT) 时,Pregel 把它包装成 ParentCommand 异常向上 raise——和 §16.3.3-ter 讲过的 GraphBubbleUp 是同一个异常家族。
2、在每一层捕获时,Pregel 比较 cmd.graph 和当前图的 ns。如果相等,说明这条 Command 已经冒泡到了目标图——执行它;如果是 Command.PARENT,把 graph 字段改写成父图的真实 ns,继续 raise 让它向上冒。
**3、_checkpoint_ns_for_parent_command(_retry.py:57-83)**负责把子图的 ns 字符串转换成父图的 ns:
def _checkpoint_ns_for_parent_command(ns: str) -> str:
"""Return the checkpoint namespace for the parent graph.
The checkpoint namespace is a `|`-separated path. Each segment is usually
of the form `name:task_id` (e.g. `parent_first:<uuid>|node:<uuid>`), but the
runtime may also insert a purely-numeric segment (e.g. `|1`) to disambiguate
concurrent tasks.
Numeric segments are not real path levels, so we drop them before computing
the parent namespace.
"""
parts = ns.split(NS_SEP)
# Drop any trailing numeric selectors
while parts and parts[-1].isdigit():
parts.pop()
# Drop the current frame segment itself
if parts:
parts.pop()
# Drop any trailing numeric selectors for the parent frame
while parts and parts[-1].isdigit():
parts.pop()
return NS_SEP.join(parts)
这个函数体现了 checkpoint ns 结构的一个微妙细节——ns 里可能有纯数字段(比如 parent:<uuid>|1|node:<uuid> 里的 |1|)——代表 concurrent task 的 disambiguator。计算父 ns 时要跳过数字段,只看 name:task_id 形式的真实层级段。
这条设计让你能在 Swarm 或分层模式里从任意深度的子 Agent 直接 Command(goto="sibling_agent", graph=Command.PARENT)——不用手动算层级、不用知道自己嵌套多深——Pregel 自动把 Command 沿着调用栈冒泡到对应父图执行。
这条机制和第 6 章 React 的 “throw-to-boundary” 异常机制是同一思路——用 exception 实现”脱离当前上下文、跳到更外层处理器”——在 React 里是 Suspense/Error Boundary、在 LangGraph 里是 ParentCommand。两者都选择用 JavaScript/Python 的异常栈做控制流转移,因为这是最简洁的”单向向上传递、由最近的匹配者处理”的语义。
17.4 Swarm 模式
17.4.1 架构概述
Swarm 模式(蜂群模式)没有中央调度者。每个 Agent 在完成自己的工作后,可以直接将控制权交给另一个 Agent。这类似于客服系统中的”转接”——当前 Agent 判断用户的需求超出自己的能力范围时,主动转接给更合适的 Agent。
graph LR
User[用户] --> A1[接待 Agent]
A1 -->|"转接"| A2[技术支持 Agent]
A2 -->|"转接"| A3[退款处理 Agent]
A3 -->|"完成"| User
A1 -.->|"也可以直接完成"| User
A2 -.->|"也可以转回"| A1
style A1 fill:#e6f3ff,stroke:#333
style A2 fill:#f3ffe6,stroke:#333
style A3 fill:#ffe6f3,stroke:#333
17.4.2 实现方式
Swarm 的核心是”handoff”工具——每个 Agent 有一个特殊工具用于将控制权转交给其他 Agent:
from langchain_core.tools import tool
from langgraph.types import Command
# 创建 handoff 工具
def create_handoff_tool(target_agent: str, description: str):
@tool(name=f"transfer_to_{target_agent}")
def handoff() -> Command:
f"""将对话转交给 {target_agent}。{description}"""
return Command(goto=target_agent)
return handoff
# 定义 Agent 和它们的 handoff 能力
greeting_tools = [
create_handoff_tool("tech_support", "当用户有技术问题时转交"),
create_handoff_tool("billing", "当用户有账单问题时转交"),
]
tech_tools = [
search_docs_tool,
create_handoff_tool("greeting", "当问题解决后转回"),
]
billing_tools = [
check_balance_tool,
process_refund_tool,
create_handoff_tool("greeting", "当问题解决后转回"),
]
# 创建各 Agent
greeting_agent = create_react_agent(model, greeting_tools, name="greeting")
tech_agent = create_react_agent(model, tech_tools, name="tech_support")
billing_agent = create_react_agent(model, billing_tools, name="billing")
# 构建 Swarm 图
builder = StateGraph(SwarmState)
builder.add_node("greeting", greeting_agent)
builder.add_node("tech_support", tech_agent)
builder.add_node("billing", billing_agent)
builder.add_edge(START, "greeting") # 入口始终是接待 Agent
# 每个 Agent 可以通过 Command 跳转到任意其他 Agent
# 或者到达 END 结束对话
graph = builder.compile()
17.4.3 Swarm 的关键设计
- 去中心化:没有 Supervisor,Agent 之间是对等关系
- Handoff 工具:通过返回
Command(goto=...)实现控制权转移 - 工具即路由:LLM 通过选择”转交”工具来决定路由
17.4.4 适用场景与局限
适用场景:
- 客服系统、对话路由
- Agent 之间是专业领域的分工
- 同一时间只有一个 Agent 活跃
局限:
- 难以协调并行任务
- 可能出现循环转交
- 需要每个 Agent 都了解其他 Agent 的能力
17.4.5 源码核对:Command.goto 被 StateGraph 转换成什么——_CHANNEL_BRANCH_TO
一个 node 返回 Command(goto="billing") 之后,StateGraph 怎么把它变成实际的跳转?打开 graph/state.py:1540-1563:
def _control_branch(value):
commands: list[Command] = []
if isinstance(value, Command):
commands.append(value)
elif isinstance(value, (list, tuple)):
for cmd in value:
if isinstance(cmd, Command):
commands.append(cmd)
rtn: list[tuple[str, Any]] = []
for command in commands:
if command.graph == Command.PARENT:
raise ParentCommand(command)
goto_targets = (
[command.goto] if isinstance(command.goto, (Send, str)) else command.goto
)
for go in goto_targets:
if isinstance(go, Send):
rtn.append((TASKS, go))
elif isinstance(go, str) and go != END:
# END is a special case, it's not actually a node in a practical sense
# but rather a special terminal node that we don't need to branch to
rtn.append((_CHANNEL_BRANCH_TO.format(go), None))
return rtn
三条关键处理:
1、Command.PARENT → 立刻 raise ParentCommand。这条代码是 §17.3.5 讲过的异常冒泡的起点——在子图内部一检测到 graph=Command.PARENT,就把 Command 包成 ParentCommand 异常抛出。这一层不处理、向上冒。
2、Send 对象 → 写入 TASKS 通道。Pregel 有一个特殊通道叫 TASKS——凡是 Send 对象最终都写进这个通道。Pregel 下一个超步启动时会从 TASKS 通道读取所有待执行的 Send、为每个 Send 创建一个 task 实例。这是 Send 机制能实现”并行分发”的底层。
3、节点名字符串 → 写入 _CHANNEL_BRANCH_TO.format(go) 通道。每个可跳转节点有一个专属的”branch to” channel。当这个 channel 被写入时,Pregel 知道”下一步要激活这个节点”。这条机制让条件分支和直接跳转复用同一套 channel 触发逻辑——跳转是 channel 写入、被触发是 channel 订阅。
4、END 是特殊字符串,不走 branch。源码注释 “END is a special case, it’s not actually a node in a practical sense”——END 是个哨兵节点,表示”结束”。不需要跳转到它,只需要让 Pregel 知道”没有下一个活跃节点”就等同于结束。
这套设计让 Command 从用户视角的”导航+更新”语义,翻译到 Pregel 视角的”channel 写入”——Command 是前端 API、channel writes 是执行引擎的单一抽象。所有外部语义最终都塌缩到 channel 写入——这是 LangGraph 架构统一性的核心。
17.5 分层 Agent
17.5.1 架构概述
分层 Agent 是 Supervisor 模式的扩展:多个 Supervisor 形成树形结构,每个 Supervisor 管理一组专业 Agent 或下级 Supervisor。这种模式适用于大规模复杂任务的分解。
graph TB
CEO[CEO Agent<br/>总调度] --> PM1[项目经理 Agent<br/>研究方向]
CEO --> PM2[项目经理 Agent<br/>创作方向]
PM1 --> R1[数据分析 Agent]
PM1 --> R2[文献检索 Agent]
PM2 --> W1[文案撰写 Agent]
PM2 --> W2[校对编辑 Agent]
style CEO fill:#ffe6e6,stroke:#333,stroke-width:2px
style PM1 fill:#fff3e6,stroke:#333
style PM2 fill:#fff3e6,stroke:#333
17.5.2 实现方式
# 第一层:专业 Agent
data_agent = create_react_agent(model, data_tools, name="data_analyst")
search_agent = create_react_agent(model, search_tools, name="searcher")
writing_agent = create_react_agent(model, writing_tools, name="writer")
editing_agent = create_react_agent(model, editing_tools, name="editor")
# 第二层:子 Supervisor
def research_supervisor(state):
"""研究方向的中层主管"""
response = llm.invoke([
SystemMessage(content="你管理数据分析和文献检索。决定分配给谁。"),
*state["messages"]
])
next_agent = parse_decision(response.content)
return Command(goto=next_agent, update={"messages": [response]})
research_team = StateGraph(TeamState)
research_team.add_node("supervisor", research_supervisor)
research_team.add_node("data_analyst", data_agent)
research_team.add_node("searcher", search_agent)
research_team.add_edge(START, "supervisor")
research_team.add_edge("data_analyst", "supervisor")
research_team.add_edge("searcher", "supervisor")
research_graph = research_team.compile(name="research_team")
# 类似地创建 writing_team...
# 第三层:顶层 Supervisor
top_builder = StateGraph(TopState)
top_builder.add_node("ceo", ceo_supervisor)
top_builder.add_node("research_team", research_graph) # 子图作为节点
top_builder.add_node("writing_team", writing_graph)
top_builder.add_edge(START, "ceo")
top_builder.add_edge("research_team", "ceo")
top_builder.add_edge("writing_team", "ceo")
top_graph = top_builder.compile()
17.5.3 子图作为节点的运行时行为
当 Pregel 执行到子图节点时:
- 子图的输入从父图状态中提取
- 子图内部按自己的超步循环运行
- 子图的 checkpoint_ns 自动嵌套(如
research_team:supervisor:task_id) - 子图完成后,输出合并回父图状态
sequenceDiagram
participant Parent as 父图 (CEO)
participant Sub as 子图 (Research Team)
participant Agent as 子 Agent (Data Analyst)
Parent->>Sub: 输入子任务
Note over Sub: checkpoint_ns = "research_team"
Sub->>Agent: Supervisor 分配任务
Note over Agent: checkpoint_ns = "research_team:data_analyst:task_id"
Agent-->>Sub: 返回结果
Sub-->>Parent: 合并输出到父图状态
17.5.4 源码核对:子图的 checkpoint_ns 嵌套格式与 concurrent task disambiguation
§17.5.3 提到了子图 checkpoint_ns 自动嵌套(形如 research_team:supervisor:task_id)。真实的 ns 结构比这个简介更微妙——打开 _retry.py:57-83 对 _checkpoint_ns_for_parent_command 的注释:
The checkpoint namespace is a
|-separated path. Each segment is usually of the formname:task_id(e.g.parent_first:<uuid>|node:<uuid>), but the runtime may also insert a purely-numeric segment (e.g.|1) to disambiguate concurrent tasks (e.g.parent_first:<uuid>|1|node:<uuid>).
三条从这段注释能读出的细节:
1、分隔符是 | 不是 :。注释明确用 “|-separated path”——我们教学版说的 research_team:supervisor:task_id 简化了语义。真实 ns 是 research_team:<uuid>|supervisor:<uuid>——| 分层、: 分段内的 name 和 task_id。
2、: 后面是 task_id(UUID)。每个节点执行都产生一个独立的 task_id——即使同一个节点被多次调用,每次 task_id 都不同。这让 Pregel 能精确追踪”某个特定执行”的 checkpoint——比如”这个 supervisor 的这次特定 dispatch”。
3、中间可能插入纯数字段(|1|)做并发消歧。当你用 Send 同时给一个节点分发多个任务(map-reduce 场景)——同节点同时并发时,每个并发 task 需要不同的 ns 前缀防止 checkpoint 冲突。Pregel 为并发任务插入 |1|, |2|, |3| 等数字段——research_team:<uuid>|1|worker:<uuid>、research_team:<uuid>|2|worker:<uuid>——数字对应并发槽位。
这条设计让 checkpoint 能精确到”顶层图的这次运行的这个子图的第 2 个并发任务的这个节点的这次执行”——路径长得像 Linux 进程树路径。看起来复杂,但它是多 Agent 系统里精确 observability 的基础——你可以通过 ns 唯一定位一条执行轨迹,做精细的日志聚合、性能分析、错误归因。
17.5.5 源码核对:_get_root 对 Command.PARENT 的特殊处理
graph/state.py:1580-1600 里的 _get_root 函数在处理节点返回值时对 Command.PARENT 做了一个不起眼但重要的分支:
def _get_root(input: Any):
if isinstance(input, Command):
if input.graph == Command.PARENT:
return () # ← 返回空 tuple
return input._update_as_tuples()
elif isinstance(input, (list, tuple)) and input and any(isinstance(i, Command) for i in input):
updates = []
for i in input:
if isinstance(i, Command):
if i.graph == Command.PARENT:
continue # ← 跳过
updates.extend(i._update_as_tuples())
else:
updates.append(("__root__", i))
return updates
为什么 Command.PARENT 返回空或跳过? 因为 Command.PARENT 的语义是”这个 Command 不属于当前图”——它即将被 raise 成 ParentCommand 异常向上冒泡。当前图不应该把它的 update 应用到自己的 state——update 会在父图处理时被应用到父图的 state。
这条分支如果不写——当前图就会错误地把父图的 update 应用到自己的 state 上,导致 state 污染。这种”看起来无关紧要的一行代码”实际上是维护”Command.PARENT 路由语义”的关键 invariant。
17.6 协作 Agent
17.6.1 架构概述
协作 Agent 模式中,多个 Agent 共享同一个状态空间,按照预定义的顺序或条件轮流执行。每个 Agent 都能读取和修改共享状态,形成一种”接力赛”式的协作。
graph LR
Start[START] --> P[规划 Agent]
P --> R[研究 Agent]
R --> W[写作 Agent]
W --> Rev[审核 Agent]
Rev -->|需要修改| R
Rev -->|通过| End[END]
style P fill:#e6f3ff
style R fill:#f3ffe6
style W fill:#ffe6f3
style Rev fill:#fff3e6
17.6.2 实现方式
class CollaborativeState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
plan: str
research_results: list[str]
draft: str
review_feedback: str
iteration: int
def planner(state: CollaborativeState) -> dict:
"""规划 Agent:分析需求,制定计划"""
response = planner_llm.invoke([
SystemMessage(content="你是一个项目规划者。分析用户需求,制定详细的执行计划。"),
*state["messages"]
])
return {"plan": response.content, "messages": [response]}
def researcher(state: CollaborativeState) -> dict:
"""研究 Agent:根据计划收集信息"""
response = research_llm.invoke([
SystemMessage(content=f"执行以下研究计划:{state['plan']}"),
*state["messages"]
])
return {"research_results": [response.content], "messages": [response]}
def writer(state: CollaborativeState) -> dict:
"""写作 Agent:基于研究结果撰写"""
context = "\n".join(state["research_results"])
response = writer_llm.invoke([
SystemMessage(content=f"基于以下研究结果撰写:\n{context}"),
*state["messages"]
])
return {"draft": response.content, "messages": [response]}
def reviewer(state: CollaborativeState) -> dict:
"""审核 Agent:评审草稿质量"""
response = reviewer_llm.invoke([
SystemMessage(content=f"审核以下草稿的质量:\n{state['draft']}"),
*state["messages"]
])
return {
"review_feedback": response.content,
"messages": [response],
"iteration": state.get("iteration", 0) + 1
}
def review_decision(state: CollaborativeState) -> str:
"""决定是否需要修改"""
if "approved" in state["review_feedback"].lower():
return END
if state.get("iteration", 0) >= 3:
return END # 最多迭代 3 次
return "researcher"
builder = StateGraph(CollaborativeState)
builder.add_node("planner", planner)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_node("reviewer", reviewer)
builder.add_edge(START, "planner")
builder.add_edge("planner", "researcher")
builder.add_edge("researcher", "writer")
builder.add_edge("writer", "reviewer")
builder.add_conditional_edges("reviewer", review_decision)
graph = builder.compile()
17.6.3 协作的关键设计
- 共享状态:所有 Agent 读写同一个
CollaborativeState - 专用字段:每个 Agent 有自己的输出字段(plan、research_results、draft、review_feedback)
- 迭代循环:reviewer 可以触发新一轮的 research -> write -> review
17.6.4 源码核对:Send 如何在 Pregel 内部变成一个独立 task
§17.2.3 讲了 Send 的数据结构、§17.4.5 讲了它被写入 TASKS 通道。真正把 Send 变成一个可执行 task 的代码在 _algo.py:900-937(这部分是 Pregel 启动每一个超步时的核心逻辑):
def prepare_push_task(...) -> PregelTask | PregelExecutableTask | None:
if len(task_path) == 2:
# SEND tasks, executed in superstep n+1
# (PUSH, idx of pending send)
idx = cast(int, task_path[1])
if not channels[TASKS].is_available():
return
sends: Sequence[Send] = channels[TASKS].get()
if idx < 0 or idx >= len(sends):
return
packet = sends[idx]
if not isinstance(packet, Send):
logger.warning(f"Ignoring invalid packet type {type(packet)} in pending sends")
return
if packet.node not in processes:
logger.warning(f"Ignoring unknown node name {packet.node} in pending sends")
return
# find process
proc = processes[packet.node]
proc_node = proc.node
if proc_node is None:
return
# create task id
triggers = PUSH_TRIGGER
checkpoint_ns = (
f"{parent_ns}{NS_SEP}{packet.node}" if parent_ns else packet.node
)
task_id = task_id_func(
checkpoint_id_bytes,
checkpoint_ns,
str(step),
packet.node,
PUSH,
str(idx),
)
五条从这段代码能读出的工程细节:
1、Send 在上一超步写入 TASKS 通道,下一超步才执行(注释 “executed in superstep n+1”)。这意味着用户写 return Send("worker", {...}) 产出的任务不会立刻执行——需要等当前超步完成、所有 channel 更新生效、Pregel 开始下一超步。Send 是跨超步的异步分发,不是同步调用。
2、三道防御性校验:
not channels[TASKS].is_available()→ TASKS 通道没有待处理 Send,直接返回idx < 0 or idx >= len(sends)→ 索引越界(可能因为 checkpoint 恢复时 Send 列表已变),忽略not isinstance(packet, Send)→ 类型错误,log warning 并忽略packet.node not in processes→ 未知节点名,log warning 并忽略
第四条特别值得注意——如果你写 Send("nonexistent_node", ...),Pregel 不会 throw——只会 log warning 并静默跳过。这是 Pregel 的容错设计:一个错误的 Send 不应该让整个超步失败。代价是用户可能不知道自己拼错了节点名——所以 DEV 时要留意 warning。
3、每个 Send 生成一个独立 task_id。task_id 的输入包含 idx——Send 在列表中的位置——两个 Send 给同一节点但不同 idx 会生成不同 task_id。这让 Pregel 能精确追踪每个 Send 对应的执行。
4、checkpoint_ns = f"{parent_ns}{NS_SEP}{packet.node}"——Send 产生的 task 的 ns 追加到父 ns 后面,形成层级路径。这保证了 Send 调度在嵌套图里也有正确的 checkpoint 定位(§17.5.4 讲过)。
5、triggers = PUSH_TRIGGER——Send 产生的 task 用一个特殊的 trigger 标记。Pregel 区分 “通过 channel 订阅被触发” 的普通节点 和 “通过 Send PUSH 被触发” 的任务——两者在 checkpoint 恢复、调度优先级、错误处理上有微妙差别。
这 40 行代码让 return [Send("a", ...), Send("b", ...)] 这种简单的 Python 返回值,变成了 Pregel 并发调度的基础——一行 Send = 一个异步任务、带精确 ns、带独立 checkpoint、带类型安全。用户不用自己写调度代码,这是 LangGraph 作为编排框架的核心价值。
17.7 模式对比与选择
17.7.1 四种模式对比
graph TB
subgraph Supervisor
S_Hub[中央调度者]
S_A[Agent A]
S_B[Agent B]
S_Hub --> S_A
S_Hub --> S_B
S_A --> S_Hub
S_B --> S_Hub
end
subgraph Swarm
SW_A[Agent A]
SW_B[Agent B]
SW_C[Agent C]
SW_A <-->|handoff| SW_B
SW_B <-->|handoff| SW_C
end
subgraph 分层
H_Top[顶层 Sup]
H_Mid1[中层 Sup 1]
H_Mid2[中层 Sup 2]
H_A[Agent A]
H_B[Agent B]
H_Top --> H_Mid1
H_Top --> H_Mid2
H_Mid1 --> H_A
H_Mid2 --> H_B
end
subgraph 协作
C_A[Agent A] --> C_B[Agent B]
C_B --> C_C[Agent C]
C_C -.->|可选循环| C_A
end
| 维度 | Supervisor | Swarm | 分层 | 协作 |
|---|---|---|---|---|
| 决策方式 | 中央决策 | 自主决策 | 层级决策 | 预定义顺序 |
| 通信模式 | 星形(hub-spoke) | 点对点 | 树形 | 链式/环形 |
| 并行能力 | 可通过 Send | 单一活跃 | 层内可并行 | 可通过 Send |
| 适用规模 | 小型(3-5 Agent) | 中型 | 大型 | 任意 |
| 复杂度 | 低 | 中 | 高 | 低 |
| 灵活性 | 中 | 高 | 高 | 低 |
17.7.2 选择建议
flowchart TB
Q1{任务可以分解为<br/>独立子任务?}
Q1 -->|是| Q2{需要动态决策<br/>任务分配?}
Q1 -->|否| Q3{Agent 之间需要<br/>直接交接?}
Q2 -->|是| Sup[Supervisor 模式]
Q2 -->|否| Collab[协作 Agent 模式]
Q3 -->|是| Swarm[Swarm 模式]
Q3 -->|否| Q4{Agent 数量 > 5?}
Q4 -->|是| Hier[分层 Agent 模式]
Q4 -->|否| Sup
17.8 高级技巧
17.8.1 并行 Agent 执行
使用 Send API 让 Supervisor 同时分配多个任务:
def supervisor_with_parallel(state):
"""Supervisor 同时分配多个任务"""
tasks = analyze_subtasks(state["messages"])
return [
Send(task["agent"], {"messages": state["messages"], "task": task["description"]})
for task in tasks
]
17.8.2 Agent 间消息传递的状态设计
class MultiAgentState(TypedDict):
# 公共消息历史
messages: Annotated[list[BaseMessage], add_messages]
# 各 Agent 的中间结果(使用 reducer 合并)
agent_outputs: Annotated[dict[str, Any], merge_dicts]
# 当前活跃的 Agent
active_agent: str
# 全局上下文
shared_context: dict[str, Any]
17.8.3 跨 Agent 的 Store 共享
def research_agent_node(state, runtime: Runtime):
"""研究 Agent:结果保存到 Store"""
results = do_research(state["messages"])
runtime.store.put(
("shared", "research"),
state["task_id"],
{"results": results}
)
return {"messages": [AIMessage(content="研究完成")]}
def writer_agent_node(state, runtime: Runtime):
"""写作 Agent:从 Store 读取研究结果"""
research = runtime.store.search(
("shared", "research"),
limit=10
)
context = "\n".join(r.value["results"] for r in research)
draft = write_with_context(context, state["messages"])
return {"messages": [AIMessage(content=draft)]}
17.9 常见陷阱与最佳实践
17.9.1 避免无限循环
多 Agent 系统最常见的问题是循环转交——Agent A 认为应该交给 Agent B,Agent B 又认为应该交回 Agent A。解决方案:
class SafeState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
handoff_count: int
max_handoffs: int
def safe_handoff(state: SafeState, target: str) -> Command:
"""带有次数限制的安全转交"""
if state.get("handoff_count", 0) >= state.get("max_handoffs", 10):
return Command(
goto=END,
update={"messages": [AIMessage(content="已达到最大转交次数,结束对话。")]}
)
return Command(
goto=target,
update={"handoff_count": state.get("handoff_count", 0) + 1}
)
17.9.2 状态 Schema 设计原则
多 Agent 系统的状态设计需要平衡共享与隔离:
class WellDesignedState(TypedDict):
# 公共字段:所有 Agent 共享
messages: Annotated[list[BaseMessage], add_messages]
task_description: str
# Agent 特有字段:使用 NotRequired 避免其他 Agent 被强制提供
research_notes: NotRequired[str]
draft_content: NotRequired[str]
review_score: NotRequired[float]
# 元数据字段:追踪执行过程
current_agent: str
iteration_count: int
17.9.3 消息历史管理
多 Agent 系统中,消息历史会快速膨胀。建议使用 pre_model_hook 或专门的消息管理节点:
from langgraph.graph.message import REMOVE_ALL_MESSAGES, RemoveMessage
def trim_messages_hook(state):
"""保留最近 20 条消息,加上系统消息"""
messages = state["messages"]
if len(messages) > 20:
# 保留系统消息 + 最近 20 条
system_msgs = [m for m in messages if isinstance(m, SystemMessage)]
recent = messages[-20:]
return {
"messages": [RemoveMessage(id=REMOVE_ALL_MESSAGES)] + system_msgs + recent
}
return {"messages": messages}
17.9.4 源码核对:interrupt() 的 resume 机制与多 Agent 人审批场景
多 Agent 系统最需要的能力之一是”某个 Agent 执行到敏感操作时等待人审批”——比如写作 Agent 写完后等审核员 +OK 再发出。langgraph/types.py:705 的 interrupt() 函数是这个机制的实现:
def interrupt(value: Any) -> Any:
"""Interrupt the graph with a resumable exception from within a node.
The `interrupt` function enables human-in-the-loop workflows by pausing graph
execution and surfacing a value to the client. ...
In a given node, the first invocation of this function raises a `GraphInterrupt`
exception, halting execution. ...
A client resuming the graph must use the `Command` primitive to specify a value
for the interrupt and continue execution. The graph resumes from the start of
the node, **re-executing** all logic.
"""
三条容易被忽略的语义:
1、interrupt 发生时,当前节点抛 GraphInterrupt——整个超步停止、checkpoint 保存完整状态。这条异常通过 §17.3.5 讲过的 GraphBubbleUp 家族冒泡。§16.3.3-ter 讲过 GraphBubbleUp 绝不被 ToolNode 吞——就是为了保证 interrupt 能正确穿透。
2、恢复时用 Command(resume=value):客户端用 graph.invoke(Command(resume="approved"), config=...) 告诉 Pregel “从中断处恢复、这个值作为 interrupt 的返回”——graph 从当前节点重新开始执行,但 interrupt() 调用会直接返回 resume 值(不再抛异常)。
3、节点从头重跑:官方注释明确写了 “The graph resumes from the start of the node, re-executing all logic.”。这意味着节点里 interrupt 之前的代码会被跑第二次——任何副作用(数据库写、API 调用)会被重复执行。这条陷阱让用户必须把 interrupt 前的操作做成幂等的、或者把 interrupt 尽可能放在节点最开始。
多 Agent 系统下,interrupt 配合 Swarm/Supervisor 能实现复杂的人审批工作流——比如:
def sensitive_agent_node(state):
# ... 收集信息 ...
decision = interrupt({
"action": "approve_refund",
"amount": state["refund_amount"],
"user": state["user_id"],
})
if decision == "approved":
return Command(goto="refund_processor", update={"approved": True})
else:
return Command(goto="denial_handler", update={"reason": decision})
客户端收到 interrupt 值(包含申请详情),人工审核后调 graph.invoke(Command(resume="approved"), ...)——Agent 恢复、走批准路径。这套 pattern 是 LangGraph Platform 人审批工作流的核心——所有用户可见的”等待决策”UI 背后都是这条机制。
17.9.5 源码核对:RetryPolicy 与多 Agent 的失败恢复
多 Agent 系统涉及多次 LLM 调用、多次 API 调用——任何一次都可能失败。langgraph/types.py:404-423 的 RetryPolicy 给每个节点配置精细的重试策略:
class RetryPolicy(NamedTuple):
initial_interval: float = 0.5
"""Amount of time that must elapse before the first retry occurs. In seconds."""
backoff_factor: float = 2.0
"""Multiplier by which the interval increases after each retry."""
max_interval: float = 128.0
"""Maximum amount of time that may elapse between retries. In seconds."""
max_attempts: int = 3
"""Maximum number of attempts to make before giving up, including the first."""
jitter: bool = True
"""Whether to add random jitter to the interval between retries."""
retry_on: (
type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool]
) = default_retry_on
五个字段表达了一套指数退避 + jitter的成熟重试策略:
- 第 1 次重试等 0.5s
- 第 2 次等 1s(0.5 × 2)
- 第 3 次等 2s(1 × 2)
- 最多等 128s(max_interval 封顶)
- jitter=True 加随机抖动避免惊群效应
retry_on 接受 Exception 类型、序列或 callable——默认的 default_retry_on 只重试网络相关错误(超时、503 等),不重试应用逻辑错误(KeyError、ValueError 等——这些重试多少次都是同样结果)。
多 Agent 场景下的用法:
builder.add_node(
"llm_heavy_node",
llm_node,
retry=RetryPolicy(max_attempts=5, retry_on=(httpx.TimeoutException,)),
)
只对 httpx.TimeoutException 重试 5 次——LLM API 调用偶尔超时很正常、多重试能救;应用层错误不重试避免浪费 token。
这条设计和第 16.3.4 节讲过的 _infer_handled_types(ToolNode 的错误分类)是同一哲学——精确控制哪些错误被吞、哪些向上抛。多 Agent 系统的可靠性建立在每一层都有明确的错误处理策略上。
17.10 设计决策
17.9.1 为什么用子图而不是函数调用?
将 Agent 实现为子图(而非简单的函数调用)的优势:
- 独立 Checkpoint:每个子图有自己的 checkpoint 命名空间,支持独立的中断恢复
- 可视化:子图在图的可视化中自然展示层级关系
- 复用:同一个 Agent 子图可以在不同的多 Agent 系统中复用
- 流式输出:子图的流式事件自动携带命名空间前缀,便于追踪
17.9.2 为什么 Handoff 用 Command 而不是状态更新?
Swarm 模式中使用 Command(goto=target) 而不是 return {"next_agent": target} 的原因:
- 原子性:Command 的 goto 和 update 在同一步中执行
- 类型安全:goto 的目标必须是已注册的节点名
- 跨图能力:Command 可以通过
graph=Command.PARENT导航到父图
17.9.3 状态隔离 vs 状态共享
不同的多 Agent 模式对状态共享有不同的需求:
- Supervisor:子 Agent 的状态可以通过子图隔离
- Swarm:所有 Agent 共享同一个状态(消息历史)
- 协作:共享状态是核心机制,每个 Agent 写入不同的字段
- 分层:每层有自己的状态,通过子图接口传递数据
17.10 小结
本章系统介绍了基于 LangGraph 实现的四种多 Agent 模式。每种模式都是底层原语(StateGraph、Send、Command、子图)的不同组合方式:
- Supervisor 通过
Command(goto=agent)实现中央调度 - Swarm 通过 handoff 工具返回
Command实现点对点交接 - 分层 Agent 通过子图嵌套实现多级管理
- 协作 Agent 通过共享 State 的不同字段实现接力式协作
选择哪种模式取决于具体需求:任务是否可分解、是否需要动态路由、Agent 数量和并行需求。在实践中,这些模式也可以混合使用——例如,顶层使用 Supervisor 模式,底层使用协作模式。
LangGraph 的优势在于它不绑定任何特定的 Agent 框架模式。无论选择哪种多 Agent 架构,底层都是相同的 StateGraph + Pregel 调度器,享有相同的 Checkpoint 持久化、中断恢复、流式输出和类型安全能力。这使得开发者可以根据业务需求自由组合,而不是被框架限制在某个特定的模式中。
下一章是本书的最后一章,我们将从更高的视角审视 LangGraph 的设计模式与架构决策——Pregel 计算模型的选择、Channel 版本追踪的巧妙、Checkpoint 时间旅行的实现、中断/恢复机制的全貌,以及如何基于这些思想构建你自己的工作流引擎。
17.10.2 源码核对:multi_agent supervisor 的 Send 并行派发如何和 messages reducer 配合
§17.8.1 给了 Supervisor 用 Send 并行派发多个任务的示例。实现这种”并行+结果聚合”的完整模式需要 state 设计和 reducer 配合:
from typing import Annotated
from operator import add
class ParallelSupervisorState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
task_results: Annotated[list[str], add] # ← 关键:用 add reducer
completed_tasks: Annotated[int, operator.add]
def supervisor(state) -> Sequence[Send]:
"""一次派发多个并行任务"""
tasks = analyze_and_split(state["messages"])
return [Send("worker", {"messages": state["messages"], "task": t}) for t in tasks]
def worker(state) -> dict:
result = do_task(state["task"])
return {"task_results": [result], "completed_tasks": 1}
三条关键点:
1、reducer 必须是幂等的 merge 类型。Annotated[list, add] 用 operator.add——两个 list 合并;Annotated[int, operator.add] 让并发 worker 的 completed_tasks 累加。如果用 Annotated[list, lambda old, new: new](即后写覆盖前写),三个并发 worker 的 result 只会保留最后一个。
2、Send 发给同一个 node 名是允许的:上面代码里 3 个 Send 都指向 "worker"——Pregel 会为每个 Send 独立实例化一个 worker 任务(task_id 不同,§17.6.4 讲过)。3 个 worker 并发执行、各自产出 task_results 更新、都被 add reducer 合并到总 state。
3、后续节点能读到完整聚合结果:supervisor 派发完立即返回 Sequence[Send],当前超步结束;下一超步 worker 们并发执行;再下一超步回到 supervisor(假设 worker 都有边连回 supervisor),此时 state["task_results"] 是完整的合并列表。三次超步完成”分发-并行执行-聚合”的 map-reduce。
这条 pattern 是 LangGraph 实现”Swarm of workers”并行多 Agent 的标准写法——Pregel 的并发 + Send 的分发 + reducer 的聚合三件套。读完你应该能写出生产级的 map-reduce Agent pipeline。
17.10.3 源码核对:handoff 工具里 return Command(goto=...) 在 ToolNode 的执行路径
§17.4 讲 Swarm 的 handoff 工具。真实的 handoff 能生效,依赖 ToolNode 识别工具返回值类型的能力。第 16 章 §16.3.5 讲过 ToolNode._execute_tool_sync(tool_node.py:989-997)的结尾有这样一段:
# Process successful response
if isinstance(response, Command):
# Validate Command before returning to handler
return self._validate_tool_command(response, request.tool_call, input_type)
if isinstance(response, ToolMessage):
response.content = cast("str | list", msg_content_output(response.content))
return response
msg = f"Tool {call['name']} returned unexpected type: {type(response)}"
raise TypeError(msg)
工具返回的不是 ToolMessage 就必须是 Command——否则 ToolNode 立刻 TypeError。这条规则是 Swarm 能工作的基础:handoff 工具返回 Command(goto="agent_name"),ToolNode 识别它是 Command 类型、经过 _validate_tool_command 校验(检查 goto 目标是否是已知节点等)、然后原样往上传到 Pregel。Pregel 看到 Command 执行跳转。
_validate_tool_command 还做了一件事——它检查 Command.update 的结构是否和当前 StateGraph 的 schema 兼容。如果工具用 dict 形式传了 update 但图用的是 list schema(或反之),这里会提前抛错——让错误在构建时就暴露、不要等到 Pregel commit update 时才发现不兼容。
这条链路一环扣一环:
- 用户写
@tool def handoff() -> Command: return Command(goto="billing") - LLM 决定调用 handoff 工具
- ToolNode._run_one 执行 handoff
- 返回 Command 被识别、经
_validate_tool_command校验 - 上传到 Pregel、Pregel 走 §17.4.5 讲过的
_control_branch - 转换成 channel 写入、下一超步激活 billing 节点
6 步转换让用户视角的”工具返回跳转意图”变成执行引擎的”channel 状态变化”——一条完整的抽象漏斗。
17.10.3.5 源码核对:ToolOutputMixin 的双重角色——Command 既是 graph 原语也是 tool 返回值
§17.2.4 讲 Command 时提到它是 Generic[N], ToolOutputMixin——为什么要继承 ToolOutputMixin?打开 langchain_core(langgraph 依赖)的 ToolOutputMixin 定义:
# langchain_core/tools/base.py 附近
class ToolOutputMixin:
"""Marker class for tool outputs that should bypass message serialization."""
pass
它是一个marker class——什么都不做,只作为一个”类型信号”存在。ToolNode 和 ChatModel 在处理工具返回值时会检查 isinstance(response, ToolOutputMixin)——如果是,跳过默认的 “把返回值 str(x) 成 ToolMessage content” 逻辑、直接原样上传。
这条设计让 Command 能同时扮演两个角色:
- 作为 graph 节点返回值:走 §17.4.5 的
_control_branch - 作为工具函数返回值:绕过 ToolNode 的字符串转换、被识别为”跳转意图”
一个 class、两条识别路径——这是 Python 多重继承 + marker class 模式的典型应用。对比一下如果不用 ToolOutputMixin——ToolNode 会把 Command 对象 str() 化为 "Command(goto='billing')" 字符串、塞进 ToolMessage.content——LLM 看到的是一段毫无意义的字符串、handoff 彻底失效。
marker class 是 Python 里一条”不需要额外代码、只需要类型信息”的扩展机制。你在自己的工具系统里如果也需要”某些工具返回值有特殊含义”——用 marker class 比用字典或枚举类型检查更干净。
17.10.3.7 读完本章能回答的 10 个具体问题
作为本章掌握度的自测表:
- Send 和 Command 的数据结构差异?(§17.2.3-4——Send 2 字段轻量、Command 4 字段复合)
- Command.PARENT 如何跨图跳转?(§17.3.5——raise ParentCommand 异常冒泡 + 计算父图 ns)
- checkpoint_ns 里为什么会有数字段?(§17.5.4——并发 task 消歧)
- Send(“unknown_node”, …) 会怎样?(§17.6.4——Pregel log warning 并静默忽略)
- Send 的任务什么时候执行?(§17.6.4——下一超步,不是同步)
- 工具返回非 ToolMessage/Command 会怎样?(§17.10.3——ToolNode 立刻 TypeError)
- Command 的 update 字段接受哪几种类型?(§17.2.5——dict / list of tuples / NamedTuple / Pydantic / 单一值)
- interrupt() 后节点恢复时会重跑吗?(§17.9.4——从头重跑节点,副作用可能重复)
- RetryPolicy 默认对什么异常重试?(§17.9.5——default_retry_on 只重试网络类错误)
- ToolOutputMixin 的作用?(§17.10.3.5——marker class 让 Command 绕过 ToolNode 的字符串序列化)
能回答 8 条以上说明源码细节已经被吃透;不清楚的条目直接回到对应小节按行号核对源码。
17.10.4 常见陷阱与工程经验总结
多 Agent 系统在生产环境里会遇到单 Agent 不会有的一类独特问题。把本章所有讲过的源码细节+工程经验汇总成6 条生产级 checklist:
1、 handoff 工具必须返回 Command 而不是字符串:return "transfer to billing" 对 Pregel 毫无意义——Pregel 看不懂自然语言路由意图。必须 return Command(goto="billing")——Pregel 才能真的跳转。这条低级错误在多 Agent 社区 issue 里出现过几百次。
2、子 Agent 和父图的 state schema 类型要对齐:子图的 input_schema 定义了它能接受什么 state,父图传入不匹配的字段会静默丢失。推荐用 TypedDict 继承或共享 schema。state: TypedDict 里的 total=True 会导致”字段必须全部存在”的严格模式——多 Agent 场景用 total=False 或 NotRequired[...] 更宽松。
3、messages 历史会爆炸:§17.9.3 讲过 trim_messages_hook。实际生产系统必须在关键节点加消息修剪——否则 10 轮对话后上下文就爆了 100K tokens。可以考虑用 summary reducer 压缩历史、或者只传当前任务相关的上下文切片。
4、循环转交必须有 circuit breaker:§17.9.1 讲过 handoff_count。真实生产里建议三层防护:handoff_count 上限、total_step 上限(Pregel 自带 recursion_limit)、timeout 上限(通过 config)。任何一层触发就返回”任务超限,人工介入”。
5、并发 Send 时要注意 reducer 的幂等性:Supervisor 用 Send 同时派发 10 个子任务、每个子任务都往 messages reducer 写——如果 reducer 不是 merge/append 类型,后写会覆盖先写。必须用 Annotated[list, add_messages] 或 Annotated[dict, merge_dicts] 这种幂等 reducer。
6、ParentCommand 穿透子图时要处理 state 合并语义:当子 Agent 用 Command(goto="parent_sibling", graph=Command.PARENT, update={...}) 跳到父图时,update 是应用到父图 state 的,不是子图的。用户常误以为 update 会传到两侧——实际只作用于目标图。这条语义在 §17.5.5 的 _get_root 里体现得最清楚。
这 6 条 checklist 是从真实生产 bug 里提炼出来的——每一条都对应过一次线上事故。多 Agent 系统从”玩具”走到”生产”的距离,主要就是这 6 条的逐一兑现。
17.11 本章与全书体系的呼应
多 Agent 模式不是空中楼阁——它建立在本书前面所有章节讲过的原语之上。梳理一下本章和全书的关联能让你看到 LangGraph 架构的整体性:
与第 2-3 章(StateGraph、Channel)的基础:Supervisor 的中央调度、Swarm 的 handoff、协作 Agent 的共享状态——它们全部是 StateGraph 的节点 + 条件边 + reducer 的不同组合。分层 Agent 用子图作为节点,子图也是 StateGraph——没有任何一种多 Agent 模式引入了 StateGraph 之外的新原语。
与第 7 章(Pregel 调度)的支撑:Send 的并行分发、Command 的跨图导航、子图的 ns 嵌套——全部通过 Pregel 的超步模型实现。第 7 章讲过的 “每超步从 channel 读、节点执行、写回 channel、再次触发” 循环是多 Agent 协作的底层物理。
与第 13 章(Checkpoint)的结合:多 Agent 的长流程天然需要持久化。Supervisor 每次决策、Swarm 每次 handoff、协作 Agent 每次迭代——都是独立的 checkpoint。checkpoint_ns 的嵌套(§17.5.4)让你能在复杂多 Agent 系统里精确定位到”这次运行的第 3 步的 research_team 子图的第 2 个并发 worker 的状态”。
与第 15 章(Store)的共享记忆:§17.8.3 讲过跨 Agent 的 Store 共享。Research Agent 把研究结果写 Store、Writer Agent 从 Store 读——这是多 Agent 系统里”共享知识”最自然的实现。比通过 messages 传递效率高、比通过 state 传递解耦好。
与第 16 章(ToolNode + create_react_agent)的衔接:§17.2.1 讲的 “Agent = CompiledStateGraph” 实际上引用了 §16.2 的 create_react_agent——里面的 agent 节点、tools 节点、should_continue 条件都是 §16 细讲过的。多 Agent 系统里每个单 Agent 都是这个 pattern 的实例。
这种”每一章的原语在后一章组合出新的模式”的递进结构是 LangGraph 教学最有效的路径——你不需要学习 N 个独立概念,真正的概念只有 4 个(StateGraph、Pregel、Checkpoint、Store),剩下的全是组合。从简单 ReAct Agent(§16)到四种多 Agent 模式(本章)再到完整生产工作流(§18),其实用的都是同一组积木——只是搭成了不同形状。
这是 LangGraph 作为编排引擎最可贵的特质——极简原语 + 无限组合。对比一些框架动辄提供 20 个 “特化 API”(tool_agent、reasoning_agent、planning_agent、multi_agent_executor…),LangGraph 坚持用 4 个原语硬扛所有场景——代码量更少、心智负担更小、灵活性反而更高。读完本章后,你应该能意识到:没有”Supervisor Agent 框架”这种东西——只有 StateGraph + Command。学会用 4 个原语组合,你就能实现任何见过或没见过的多 Agent 模式。
17.12 源码定位索引
为便于读者按图索骥核实本章说法:
| 小节 | 源文件 | 关键行号 |
|---|---|---|
| §17.2.3 Send | libs/langgraph/langgraph/types.py | class Send:574-646 |
| §17.2.4-5 Command | 同上 | class Command:652-702 / _update_as_tuples:687-700 |
| §17.3.5 ParentCommand | libs/langgraph/langgraph/pregel/_retry.py | 捕获:127-140 / _checkpoint_ns_for_parent_command:57-83 |
| §17.4.5 _control_branch | libs/langgraph/langgraph/graph/state.py | 1540-1563 |
| §17.5.4 ns 格式 | libs/langgraph/langgraph/pregel/_retry.py | 注释:57-67 |
| §17.5.5 _get_root | libs/langgraph/langgraph/graph/state.py | 1580-1600 |
| §17.6.4 Send → task | libs/langgraph/langgraph/pregel/_algo.py | prepare_push_task:900-937 |
| §17.9.4 interrupt | libs/langgraph/langgraph/types.py | interrupt:705+ |
| §17.9.5 RetryPolicy | 同上 | class RetryPolicy:404-423 |
源码版本:langgraph-latest 本地仓库。
实操建议:把本章 4 种模式各写一个最小 repro 项目跑起来,观察 checkpoint_ns 的演化——state schema 设计、消息历史膨胀、循环转交防护、错误冒泡等边角问题会在动手时集中暴露。
17.13 下一步建议:从本章走向生产
读完本章后,你可以按以下顺序扩展学习:
短期(1-2 周):
- 实现一个自己的 Supervisor + 2 Agent 的 demo,观察 LangGraph Studio 里的图可视化
- 把本章 §17.8.3 的 Store 共享 demo 跑起来,理解跨 Agent 状态共享的细节
- 读一遍
langgraph/types.py全文,把 Send/Command/Interrupt/RetryPolicy 的类定义看透
中期(1-2 月):
- 实现一个 Swarm 客服系统,用真实 LLM 跑起来,故意触发循环转交看看 handoff_count 保护机制
- 加上 Checkpoint 持久化到 Postgres,测试中断恢复场景
- 加上 Store 做跨会话用户偏好记忆
长期(3-6 月):
- 读完
langgraph/pregel/所有源码(一万行左右),理解完整调度模型 - 基于 LangGraph 写一个垂直领域的 Agent SDK(比如”数据分析 Agent”、“客服 Agent”)
- 为 langgraph-latest 仓库提一个 PR(修 bug 或加小特性)
下一章(第 18 章)将从更高的视角对 LangGraph 做架构复盘——Pregel 计算模型、Channel 版本追踪、Checkpoint 时间旅行、中断恢复机制如何汇聚成一套一致的设计哲学。