LangGraph 设计与实现
第14章 Runtime 与 Context
第14章 Runtime 与 Context
14.1 引言
在构建 LLM 应用的图时,节点函数往往需要访问一些”运行时依赖”——当前用户的身份信息、数据库连接池、API 密钥、或者一个全局的向量存储。这些依赖既不属于图的状态(它们不随步骤变化),也不应该被硬编码在节点函数中(它们因调用而异)。传统做法是通过闭包或全局变量传递,但这在测试、多租户和类型安全方面都存在痛点。
LangGraph 1.1.6 引入了 Runtime 类和 ContextT 泛型来解决这个问题。Runtime 是一个不可变的数据容器,在图执行开始时由调用方创建,自动注入到每个节点函数中。它携带了 context(用户自定义的运行时上下文)、store(持久化存储)、stream_writer(流式写入器)、execution_info(执行元数据)等运行时信息。
本章将从 Runtime 的数据类定义出发,分析 ContextT 泛型的设计理念、ExecutionInfo 和 ServerInfo 的信息模型、context 与 state 的本质区别,以及 Runtime 在 Pregel 循环中的注入机制。
本章要点
Runtime类的完整字段定义——context、store、stream_writer、previous、execution_info、server_infoContextT泛型的类型传播——从 StateGraph 到节点函数的端到端类型安全ExecutionInfo与ServerInfo的信息模型——执行元数据的结构化表达- Context vs State 的本质区别——不可变依赖 vs 可变状态
- Runtime 注入机制——从编译到执行的完整链路
14.2 Runtime 类的设计
14.2.1 数据类定义
Runtime 定义在 langgraph/runtime.py 中,是一个泛型冻结数据类:
@dataclass(**_DC_KWARGS) # kw_only=True, slots=True, frozen=True
class Runtime(Generic[ContextT]):
"""Convenience class that bundles run-scoped context and other runtime utilities."""
context: ContextT = field(default=None)
"""Static context for the graph run, like user_id, db_conn, etc."""
store: BaseStore | None = field(default=None)
"""Store for the graph run, enabling persistence and memory."""
stream_writer: StreamWriter = field(default=_no_op_stream_writer)
"""Function that writes to the custom stream."""
previous: Any = field(default=None)
"""The previous return value for the given thread (functional API only)."""
execution_info: ExecutionInfo | None = field(default=None)
"""Read-only execution information/metadata for the current node run."""
server_info: ServerInfo | None = field(default=None)
"""Metadata injected by LangGraph Server. None for open-source."""
_DC_KWARGS 展开为 kw_only=True, slots=True, frozen=True,这意味着:
- kw_only:所有字段必须通过关键字参数传递,避免位置参数的歧义
- slots:使用
__slots__优化内存和属性访问速度 - frozen:实例创建后不可修改,确保运行时安全
14.2.2 字段语义
graph TB
Runtime[Runtime 对象]
Runtime --> Context["context: ContextT<br/>用户自定义上下文<br/>如 user_id, db_conn"]
Runtime --> Store["store: BaseStore | None<br/>持久化存储<br/>跨线程记忆"]
Runtime --> SW["stream_writer: StreamWriter<br/>自定义流式写入<br/>发射中间结果"]
Runtime --> Prev["previous: Any<br/>上次执行的返回值<br/>仅函数式 API"]
Runtime --> EI["execution_info: ExecutionInfo<br/>执行元数据<br/>checkpoint_id, task_id 等"]
Runtime --> SI["server_info: ServerInfo<br/>服务器元数据<br/>assistant_id, user 等"]
六个字段覆盖了节点函数可能需要的所有运行时信息:
| 字段 | 类型 | 来源 | 可变性 |
|---|---|---|---|
| context | ContextT(泛型) | 调用方传入 | 整个执行期间不变 |
| store | BaseStore | 图编译时配置 | 引用不变,内容可变 |
| stream_writer | StreamWriter | 框架自动注入 | 每个任务独立 |
| previous | Any | Checkpoint 读取 | 只读 |
| execution_info | ExecutionInfo | 框架生成 | 每个任务独立 |
| server_info | ServerInfo | LangGraph Server | 只读 |
14.2.3 不可变性与 override/merge
虽然 Runtime 是 frozen 的,但它提供了两个方法来创建修改后的副本:
def merge(self, other: Runtime[ContextT]) -> Runtime[ContextT]:
"""Merge two runtimes together. If a value is not provided in other,
the value from self is used."""
return Runtime(
context=other.context or self.context,
store=other.store or self.store,
stream_writer=other.stream_writer
if other.stream_writer is not _no_op_stream_writer
else self.stream_writer,
previous=self.previous if other.previous is None else other.previous,
execution_info=other.execution_info or self.execution_info,
server_info=other.server_info or self.server_info,
)
def override(self, **overrides) -> Runtime[ContextT]:
"""Replace the runtime with a new runtime with the given overrides."""
return replace(self, **overrides)
merge 用于子图继承父图的 Runtime 时,合并两个 Runtime 对象。override 用于框架在任务准备阶段注入特定字段(如 execution_info)。
merge 的三种”未提供”判定
merge 的实现乍看只是”6 个字段各选一个非空的”、实际源码里三种字段用了三种不同的判定方式——每一种都在处理 Python None/falsy 歧义的不同坑:
# runtime.py:188 真实实现
def merge(self, other: Runtime[ContextT]) -> Runtime[ContextT]:
return Runtime(
context=other.context or self.context, # ① or
store=other.store or self.store, # ① or
stream_writer=other.stream_writer
if other.stream_writer is not _no_op_stream_writer # ② is not 哨兵
else self.stream_writer,
previous=self.previous if other.previous is None # ③ is None 精确检测
else other.previous,
execution_info=other.execution_info or self.execution_info, # ① or
server_info=other.server_info or self.server_info, # ① or
)
① context/store/execution_info/server_info 用 or——标准 Python 惯用法,None 或 falsy(包括 False、空字典、0)时都走右边。对这四个字段安全,因为它们的值永远是对象实例或 None,不会是 False/0/{}。
② stream_writer 用 is not _no_op_stream_writer 哨兵检查——因为 stream_writer 的默认值是一个函数 _no_op_stream_writer(定义在 line 77:def _no_op_stream_writer(_: Any) -> None: ...)、函数在 Python 里永远是 truthy、or 会把默认函数当成”真实设置过的值”保留下来。必须用 is not 身份比较——只有参考同一个哨兵函数对象时才算”未提供”。一旦用户传了自定义 writer(哪怕是个 noop 的 lambda)、identity 就不同了、会正确地覆盖默认。
③ previous 用 is None 精确检测——这是三种里最严格的判定。因为 previous 是 functional API 里上次执行的用户返回值、可能是任意值:空字典 {}、空字符串 ""、数字 0、布尔 False。如果用 or、other.previous = 0 时会错误地退回 self.previous——用户会觉得”我明明返回了 0 但下次 runtime 里看到的是上上次的返回值”。用 is None 只在严格未设置时才退回、0/""/{} 等 falsy 值都被如实保留。
这种**“同一个 merge 函数里用三种不同的判定”的设计不是疏忽、是对每个字段真实语义的精确建模。Python 里 None vs falsy 的区别是许多 subtle bug 的根源——LangGraph 用 6 个字段 3 种判定展示了怎么写”不被 Python 语义坑”的合并函数**。每种判定背后都是一个”如果误用另一种会怎样”的反问。
14.2.4 _DC_KWARGS 的三件套:为什么 Runtime 不能用普通 @dataclass
Runtime 不是写成 @dataclass(frozen=True)、而是 @dataclass(**_DC_KWARGS)——_DC_KWARGS 定义在 types.py:401、只有一行但内涵浓缩:
_DC_KWARGS = {"kw_only": True, "slots": True, "frozen": True}
三个标志同时开、少一个都有问题:
① kw_only=True——强制所有字段都必须以关键字传入。意味着用户写 Runtime(some_ctx)(希望按位置传 context)会直接 TypeError、必须写成 Runtime(context=some_ctx)。为什么要这么严?因为 Runtime 的字段顺序是按语义分组的(context/store/stream_writer/previous/execution_info/server_info)、不是按”最重要的排第一”——如果允许位置参数、用户会下意识地 Runtime(ctx, store) 这样写、一旦未来在字段间插入新字段(比如把 previous 挪到 stream_writer 前面)、所有调用代码就都错位了。kw_only 把”字段顺序”从公开 API 里隐藏了、未来怎么重排字段都不破坏兼容。
② slots=True——生成 __slots__ 而不是 __dict__。Runtime 在一次图执行里可能被 override 上千次(每个任务、每次重试都 replace 一次)、每个 Runtime 实例少 ~56 字节(__dict__ 的典型大小)、在长时间运行的服务端累积就是几 MB 级别的节省。而且 __slots__ 让属性查找走 C 层的 descriptor 而不是 Python dict 查找、单次访问快 ~15-20%。但 slots 有代价:不能动态加属性——runtime.my_extra = "foo" 会 AttributeError。对 Runtime 这是特性不是 bug——Runtime 的字段就应该是固定的 6 个、谁想在 runtime 上塞临时变量都是错的、slots 在编译期就拦住了。
③ frozen=True——__setattr__ 会 raise。这是并发安全的核心保险丝:多个任务并发执行时可能共享同一个 Runtime 引用、如果任何一个任务能 self.runtime.store = None 改掉、其他任务就看到脏数据。frozen 让这种改动在类型层面就不可能发生、想”改”只能 replace() 返回新对象——原对象不变、老引用依然安全。
三个标志组合起来的语义是”不可变的、有内存约束的、带命名字段的运行时依赖容器”——每一条都是从”Runtime 要承担什么角色”反推出来的。这不是过度工程、每个 flag 关掉都会引入一种真实的 bug。
14.2.5 _RuntimeOverrides TypedDict:override 的类型签名秘密
override 方法在 runtime.py:204 的签名不是 **overrides: Any、而是:
def override(
self, **overrides: Unpack[_RuntimeOverrides[ContextT]]
) -> Runtime[ContextT]:
return replace(self, **overrides)
关键是 Unpack[_RuntimeOverrides[ContextT]]——Unpack 是 PEP 646/692 里的类型操作符、把一个 TypedDict 展开为 kwargs 类型签名。_RuntimeOverrides 定义在 runtime.py:80:
class _RuntimeOverrides(TypedDict, Generic[ContextT], total=False):
context: ContextT
store: BaseStore | None
stream_writer: StreamWriter
previous: Any
execution_info: ExecutionInfo
server_info: ServerInfo | None
total=False 表示所有字段都是可选的。Unpack 让 IDE 和 mypy 知道”这个 **overrides 只接受这 6 个键、且每个键有固定类型”——写 runtime.override(contxet=x)(拼错)会在类型检查时就被捕获、runtime.override(context=123)(类型错)同样被拦截。
为什么不直接写 def override(self, context=..., store=..., ...) 六个具名参数?——因为那样调用 runtime.override(execution_info=ei) 就必须写出所有默认值或者漏掉的被 replace 吞掉。replace(self, **overrides) 是 dataclasses 的标准做法——未传的字段保持原值、传了的字段覆盖。只有 **overrides 能干净地表达”只替换我给的这几个字段”。Unpack[_RuntimeOverrides] 把可变字段集合和类型安全同时拿到了。
这个设计在 Python 里算较新(Unpack 的 TypedDict 支持是 PEP 692、Python 3.12 才标准化、typing_extensions backport 到老版本)——LangGraph 愿意用它、是因为 Runtime 的使用频率太高、类型错误的成本实在太大。
14.2.6 第三个方法 patch_execution_info
章节只展示了 merge 和 override、但真实 Runtime 还有第三个方法(line 210-218):
def patch_execution_info(self, **overrides: Any) -> Runtime[ContextT]:
"""Return a new runtime with selected execution_info fields replaced."""
if self.execution_info is None:
msg = "Cannot patch execution_info before it has been set"
raise RuntimeError(msg)
return replace(
self,
execution_info=self.execution_info.patch(**overrides),
)
这个方法只改 execution_info 里的字段、不改 Runtime 其他字段——是”二级 replace”:先找到 execution_info 子对象、再对它做 replace、再用 replace 后的值覆盖回 Runtime。语义上等价于 runtime.override(execution_info=runtime.execution_info.patch(**overrides))、但明确命名表达”我只是在更新执行元数据、不是在换整个 runtime”。
if self.execution_info is None: raise RuntimeError 是另一道保险丝——execution_info 只有在任务被 Pregel 实际调度后才存在、未调度前 patch 它是逻辑错误。这种错误在并发场景下难调试(看到的 Runtime 可能来自不同 code path),明确抛 RuntimeError 而不是 silent None 覆盖能在最早的错误发生点报出。
这个方法在第 7 章讲的 _retry.py 里被反复调用——每次重试都通过它给 execution_info 注入 node_attempt(递增的尝试次数)和 node_first_attempt_time(首次尝试时间)。重试不该改 context/store/stream_writer——只该改 “这是第几次尝试”。patch_execution_info 正是为此而生、名字和调用路径都精确反映了意图。
14.2.7 _ensure_execution_info:分布式运行时的”缺口填补”
Runtime.patch_execution_info 的前置条件很严——self.execution_info is None 就 raise。但在分布式运行时(LangGraph Platform)里、这个条件会被打破:任务是由服务端进程准备、序列化、再在 executor 进程里反序列化执行的、executor 不走 _algo.py 里的 OSS 路径、没有人来设置 execution_info。
为此 _retry.py:33 引入了一个”兜底函数” _ensure_execution_info:
def _ensure_execution_info(
runtime: Runtime, config: RunnableConfig, task: PregelExecutableTask
) -> Runtime:
if runtime.execution_info is not None:
return runtime
configurable = config.get(CONF, {})
return runtime.override(
execution_info=ExecutionInfo(
checkpoint_id=configurable.get(CONFIG_KEY_CHECKPOINT_ID) or "",
checkpoint_ns=configurable.get(CONFIG_KEY_CHECKPOINT_NS) or "",
task_id=configurable.get(CONFIG_KEY_TASK_ID) or task.id,
thread_id=configurable.get(CONFIG_KEY_THREAD_ID),
run_id=str(rid) if (rid := config.get("run_id")) else None,
),
)
逻辑是:如果 execution_info 已经有了(OSS 路径走过)、直接返回;否则从 config[CONF] 里凑——CONFIG_KEY_CHECKPOINT_ID/_NS/_TASK_ID/_THREAD_ID 都是任务 config 里一定有的字段(因为序列化传过来时保留了)、task.id 作为 task_id 的兜底。
这个函数只在 run_with_retry 和 arun_with_retry 的入口被调用一次(_retry.py:100 和 :205)——在 patch_execution_info(node_first_attempt_time=...) 之前、确保 execution_info 存在。这是整个 Runtime 体系里唯一为分布式部署开的后门——如果没有它、LangGraph Platform 上运行任何带 RetryPolicy 的节点都会抛”Cannot patch execution_info before it has been set”。
为什么不在 Runtime 构造时就强制 execution_info 非空?——因为 OSS 本地运行时、Runtime 是在 Pregel 初始化阶段创建的、那时还没有”任务”的概念、execution_info 还没法填。强制非空就让用户在 graph.invoke 之前手动构造 Runtime——那是违反直觉的。让它能为 None、由框架在”任务准备阶段”填上、是最干净的分层。但这也意味着分布式路径绕过了任务准备阶段、所以需要 _ensure_execution_info 这一道补丁——是分布式架构在 OSS 抽象上撕开的口子、用最小侵入补回来。
14.3 ContextT 泛型
14.3.1 定义
# langgraph/typing.py
ContextT = TypeVar("ContextT", bound=StateLike | None, default=None)
ContextT 是一个带默认值的类型变量,约束为 StateLike | None。StateLike 包括 TypedDict、BaseModel、dataclass 等结构化类型。默认值为 None,这意味着如果不指定 context_schema,Runtime 的 context 字段类型就是 None。
细节是:这个 TypeVar 不是从标准 typing 导入、而是 typing_extensions.TypeVar(typing.py:3)。原因是 default=None 是 PEP 696 的特性、Python 3.13 才进 stdlib、LangGraph 支持 3.10+ 所以必须用 typing_extensions 的 backport。这一点看似无关紧要、但它决定了**StateGraph 在 3.10/3.11/3.12 上都能写 StateGraph[MyState] 而不是 StateGraph[MyState, None]**——ContextT 有 default、省略它时自动 fallback 到 None。换任何标准 TypeVar 都做不到这点。
14.3.2 类型传播链路
flowchart LR
Schema["context_schema=MyContext"] --> SG["StateGraph[State, MyContext]"]
SG --> Compile["compile()"]
Compile --> CSG["CompiledStateGraph[State, MyContext, ...]"]
CSG --> Invoke["invoke(input, context=MyContext(...))"]
Invoke --> RT["Runtime[MyContext]"]
RT --> Node["node(state, runtime: Runtime[MyContext])"]
类型从 StateGraph 的 context_schema 参数开始,贯穿编译、调用、注入的全过程。IDE 和类型检查器可以在每一步提供准确的类型补全。
14.3.3 使用示例
from dataclasses import dataclass
from langgraph.graph import StateGraph
from langgraph.runtime import Runtime
from typing_extensions import TypedDict
@dataclass
class AppContext:
user_id: str
api_key: str
is_admin: bool = False
class State(TypedDict, total=False):
response: str
def my_node(state: State, runtime: Runtime[AppContext]) -> State:
# IDE 知道 runtime.context 的类型是 AppContext
user_id = runtime.context.user_id
if runtime.context.is_admin:
return {"response": f"Admin {user_id}: full access"}
return {"response": f"User {user_id}: limited access"}
graph = (
StateGraph(state_schema=State, context_schema=AppContext)
.add_node("my_node", my_node)
.set_entry_point("my_node")
.set_finish_point("my_node")
.compile()
)
result = graph.invoke({}, context=AppContext(user_id="alice", api_key="sk-..."))
14.4 ExecutionInfo:执行元数据
14.4.1 数据结构
@dataclass(frozen=True, slots=True)
class ExecutionInfo:
"""Read-only execution info/metadata for the current thread/run/node."""
checkpoint_id: str
"""The checkpoint ID for the current execution."""
checkpoint_ns: str
"""The checkpoint namespace for the current execution."""
task_id: str
"""The task ID for the current execution."""
thread_id: str | None = None
"""None when running without a checkpointer."""
run_id: str | None = None
"""None when run_id is not provided in RunnableConfig."""
node_attempt: int = 1
"""Current node execution attempt number (1-indexed)."""
node_first_attempt_time: float | None = None
"""Unix timestamp for when the first attempt started."""
ExecutionInfo 提供了节点函数可能需要的所有执行上下文信息,而无需直接操作低层的 RunnableConfig。
14.4.2 字段用途
graph TB
EI[ExecutionInfo]
EI --> CID["checkpoint_id<br/>当前检查点 ID<br/>用于状态追踪"]
EI --> CNS["checkpoint_ns<br/>检查点命名空间<br/>标识子图层级"]
EI --> TID["task_id<br/>任务 ID<br/>唯一标识本次执行"]
EI --> ThID["thread_id<br/>线程 ID<br/>跨轮次对话标识"]
EI --> RID["run_id<br/>运行 ID<br/>单次调用标识"]
EI --> NA["node_attempt<br/>重试次数<br/>1 表示首次执行"]
EI --> NFAT["node_first_attempt_time<br/>首次尝试时间<br/>用于超时计算"]
典型的使用场景:
def my_node(state: State, runtime: Runtime) -> State:
info = runtime.execution_info
# 日志中记录执行上下文
logger.info(f"Thread={info.thread_id}, Task={info.task_id}, Attempt={info.node_attempt}")
# 根据重试次数调整行为
if info.node_attempt > 1:
logger.warning("Retrying, using fallback strategy")
# 使用 thread_id 做线程级缓存
cache_key = f"{info.thread_id}:{info.task_id}"
...
14.4.3 patch 方法
ExecutionInfo 是 frozen 的,但提供了 patch 方法创建修改后的副本:
def patch(self, **overrides: Any) -> ExecutionInfo:
"""Return a new execution info object with selected fields replaced."""
return replace(self, **overrides)
框架在重试时使用这个方法更新 node_attempt 和 node_first_attempt_time。
14.5 ServerInfo:服务端元数据
14.5.1 数据结构
@dataclass(frozen=True, slots=True)
class ServerInfo:
"""Metadata injected by LangGraph Server."""
assistant_id: str
"""The assistant ID for the current execution."""
graph_id: str
"""The graph ID for the current execution."""
user: BaseUser | None = None
"""The authenticated user, if any."""
ServerInfo 只在 LangGraph Platform(部署服务)环境中被填充。在本地开源运行时,runtime.server_info 始终为 None。
14.5.2 _build_server_info:为什么 isinstance 不够、要 hasattr 兜底
server_info 不是节点函数传进来的、是 Pregel 在初始化时从 config["metadata"] 和 config[CONF]["langgraph_auth_user"] 里重建出来的。重建逻辑在 main.py:3659:
def _build_server_info(config, parent_runtime) -> ServerInfo | None:
metadata = config.get("metadata") or {}
configurable = config.get(CONF) or {}
assistant_id = metadata.get("assistant_id")
graph_id = metadata.get("graph_id")
auth_user_data = configurable.get("langgraph_auth_user")
user: BaseUser | None = None
if auth_user_data is not None:
if isinstance(auth_user_data, BaseUser) or hasattr(auth_user_data, "identity"):
user = cast(BaseUser, auth_user_data)
if assistant_id is not None or graph_id is not None or user is not None:
return ServerInfo(
assistant_id=str(assistant_id) if assistant_id else "",
graph_id=str(graph_id) if graph_id else "",
user=user,
)
return None
最耐人寻味的是这一行:
if isinstance(auth_user_data, BaseUser) or hasattr(auth_user_data, "identity"):
**为什么 isinstance(auth_user_data, BaseUser) 不够、要 hasattr(auth_user_data, "identity") 兜底?**源码注释给出了答案:
We prefer isinstance(BaseUser) but fall back to hasattr(“identity”) because the server’s ProxyUser provides
permissionsvia__getattr__, which Python’s runtime_checkable Protocol check doesn’t see.
BaseUser 是 runtime_checkable 的 Protocol——isinstance 检查要求所有声明的属性都必须在对象的 __dict__ 或类层级上可见。但 ProxyUser(LangGraph Platform 用于跨进程传递用户的代理对象)通过 __getattr__ 动态代理属性访问——hasattr(proxy, "permissions") 返回 True(动态触发 __getattr__ 并成功返回)、但 isinstance(proxy, BaseUser) 返回 False(因为 runtime_checkable 的实现不走 __getattr__)。
解决办法就是这个 or 链:先用 isinstance 对”真正实现 BaseUser 的用户对象”做快速检查、再用 hasattr(..., "identity") 对 ProxyUser 兜底。identity 是 BaseUser 最核心的属性(用户唯一 ID)、如果一个对象连 identity 都没有、它大概率不是用户对象。这一行体现了生产环境 Protocol 和动态代理的冲突、是分布式系统设计里常见的真实权衡。
14.5.3 BaseUser 协议
# 来自 langgraph_sdk.auth.types
class BaseUser:
"""认证用户协议,支持属性访问和字典访问"""
identity: str # 用户唯一标识
# 支持 user.identity 和 user["identity"] 两种访问方式
这使得节点函数可以在有认证的环境中安全地获取用户信息:
def secure_node(state: State, runtime: Runtime) -> State:
if runtime.server_info and runtime.server_info.user:
user_id = runtime.server_info.user.identity
else:
user_id = "anonymous"
...
14.6 Context vs State 的本质区别
14.6.1 概念对比
这是理解 LangGraph 运行时模型的关键区分:
graph LR
subgraph "State(状态)"
direction TB
S1[可变] --> S2[在节点间流动]
S2 --> S3[被 Channel 管理]
S3 --> S4[支持 reducer 合并]
S4 --> S5[被 Checkpoint 持久化]
end
subgraph "Context(上下文)"
direction TB
C1[不可变] --> C2[在整个执行期间固定]
C2 --> C3[由调用方提供]
C3 --> C4[不参与状态管理]
C4 --> C5[不被 Checkpoint 持久化]
end
| 维度 | State | Context |
|---|---|---|
| 可变性 | 每个节点可以修改 | 整个执行期间不变 |
| 流转方式 | 通过 Channel 在节点间传递 | 通过 Runtime 注入到所有节点 |
| 持久化 | 被 Checkpoint 保存 | 不被保存 |
| 典型内容 | 消息列表、处理结果 | 用户 ID、API 密钥 |
| 定义方式 | state_schema=State | context_schema=Context |
| 传入方式 | graph.invoke(input) | graph.invoke(input, context=ctx) |
14.6.2 为什么 Context 不放在 State 中?
把运行时依赖放在 State 中存在几个问题:
- Checkpoint 污染:数据库连接、API 密钥不应被序列化到 checkpoint
- 类型混淆:状态字段应该是”数据”,而不是”工具”
- 安全风险:checkpoint 可能被导出或共享,敏感信息不应出现在其中
- 语义错误:reducer 不应该对”用户 ID”做
operator.add
Context 通过将依赖项与数据分离,彻底解决了这些问题。
14.6.3 为什么 Context 不放在 Config 中?
LangGraph 0.6.0 之前,运行时依赖通过 RunnableConfig.configurable 传递(即旧的 config_schema 参数)。这种方式有几个缺点:
- 类型不安全:config 是
dict[str, Any],失去了泛型类型信息 - API 混乱:config 的主要用途是传递 thread_id、checkpoint_id 等框架参数
- 嵌套访问:需要
config["configurable"]["user_id"]这样的深层访问
context_schema 和 Runtime[ContextT] 提供了一流的、类型安全的替代方案:
# 旧方式(已弃用)
def my_node(state, config: RunnableConfig):
user_id = config["configurable"]["user_id"] # 无类型提示
# 新方式
def my_node(state, runtime: Runtime[AppContext]):
user_id = runtime.context.user_id # IDE 自动补全
14.7 Runtime 注入机制
14.7.1 注入链路总览
flowchart TB
subgraph 调用层
Caller["graph.invoke(input, context=ctx)"]
end
subgraph Pregel 初始化
Caller --> CreateRT["创建 Runtime(context=ctx, store=store)"]
CreateRT --> InjectConfig["写入 config[CONF][CONFIG_KEY_RUNTIME]"]
end
subgraph 任务准备
InjectConfig --> PNT["prepare_next_tasks"]
PNT --> PST["prepare_single_task / prepare_push_task_send"]
PST --> Override["runtime.override(<br/>previous=...,<br/>store=...,<br/>execution_info=...)"]
Override --> TaskConfig["写入 task config[CONF][CONFIG_KEY_RUNTIME]"]
end
subgraph 节点执行
TaskConfig --> GetRT["节点接收 runtime 参数"]
GetRT --> UseRT["runtime.context.user_id"]
end
14.7.2 CONFIG_KEY_RUNTIME 为什么要 sys.intern
注入链路的每一环都通过 CONFIG_KEY_RUNTIME 这个字符串键访问 config。它在 _internal/_constants.py:65 定义:
CONFIG_KEY_RUNTIME = sys.intern("__pregel_runtime")
sys.intern 让 Python 把这个字符串加入全局驻留表——所有字面量相同的字符串在内存里指向同一个对象。这看似没必要(Python 对短的 ASCII 字符串会自动 intern),但加上 sys.intern 保证了无论字符串怎么来的(拼接、从 JSON 反序列化、跨模块传递)都是同一个对象。
**为什么对 Runtime key 特别做这件事?**因为每次任务准备都要做 configurable.get(CONFIG_KEY_RUNTIME)、每次 dict 查找的第一步就是计算 key 的 hash——interned 字符串的 hash 是算一次缓存住的、非 interned 每次查找都重算 hash。在大型图里一次 stream 调用可能触发 10k+ 次 configurable.get、这个微优化累积起来很可观。更重要的是:intern 后的字符串比较是指针比较(identity)、"__pregel_runtime" == CONFIG_KEY_RUNTIME 在 dict 内部可以 short-circuit 为 is 比较、完全不走 __eq__。
LangGraph 对 _internal/_constants.py 里所有 CONFIG_KEY_*都做了 intern——这是一个”小优化不小”的系统设计:单次收益可忽略、乘以百万次调用就是毫秒级的差异、而实现成本就是一行 sys.intern(...)。
14.7.3 Pregel 初始化阶段
当调用 graph.invoke(input, context=ctx) 时,Pregel 的 stream 方法将 context 封装到 Runtime 中,并存入配置:
# Pregel.stream 中的简化逻辑
runtime = Runtime(context=context, store=self.store)
config = patch_configurable(config, {CONFIG_KEY_RUNTIME: runtime})
14.7.4 _coerce_context:dict 到 dataclass/BaseModel 的自动转换
用户调用 graph.invoke({}, context={"user_id": "alice"}) 时、传的是 dict、但节点函数里 runtime.context.user_id 是属性访问——需要 dict 被转换成 AppContext dataclass。这个转换由 _coerce_context 完成(main.py:3691):
def _coerce_context(context_schema, context) -> ContextT | None:
if context is None:
return None
if context_schema is None:
return context
schema_is_class = issubclass(context_schema, BaseModel) or is_dataclass(context_schema)
if isinstance(context, dict) and schema_is_class:
return context_schema(**context)
return cast(ContextT, context)
四层分支:(1)None 直接返回 None——没 context 就是没 context。(2)没 context_schema 时原样返回——退化到兼容模式、用户没声明类型、框架不做任何转换。(3)dict + class schema → 调用构造函数——AppContext(**context) 会把 dict 展开成构造参数、字段名不匹配就 TypeError、字段类型不匹配(BaseModel 验证)就 ValidationError。(4)其他情况原样 cast——比如用户已经传了 AppContext(...) 实例、不需要转换。
这里有一个微妙点:TypedDict 不走第三条路径——is_dataclass(TypedDict_subclass) 是 False、issubclass(td, BaseModel) 也是 False——所以走第四条”原样返回”。这是对的:TypedDict 在运行时就是 dict、节点里 runtime.context["user_id"] 是字典访问而不是属性访问、根本不需要”转换”。
这个函数把”怎么表达 Context”的自由度完全交给用户:dataclass、BaseModel、TypedDict、甚至自定义类都行。框架只在严格需要转换时介入(dict + 非 dict schema)、其他情况都尊重用户的数据形态。这种不霸道的设计让 Context 能适配任何已存在的数据模型。
14.7.5 任务准备阶段
在 prepare_single_task 中,框架从配置中取出 Runtime,注入任务级别的信息:
# prepare_single_task 中的 PULL 任务逻辑
runtime = cast(
Runtime, configurable.get(CONFIG_KEY_RUNTIME, DEFAULT_RUNTIME)
)
runtime = runtime.override(
previous=checkpoint["channel_values"].get(PREVIOUS, None),
store=store,
execution_info=ExecutionInfo(
checkpoint_id=checkpoint["id"],
checkpoint_ns=task_checkpoint_ns,
task_id=task_id,
thread_id=configurable.get(CONFIG_KEY_THREAD_ID),
run_id=str(rid) if (rid := config.get("run_id")) else None,
),
)
每个任务都获得一个新的 Runtime 实例(因为 frozen,所以是 replace 创建的新对象),其中 execution_info 包含了该任务特有的元数据。
14.7.6 KWARGS_CONFIG_KEYS:基于签名的参数注入
节点函数可以这样声明签名:
def my_node(state: State) -> State: ...
def my_node(state: State, runtime: Runtime[Ctx]) -> State: ...
def my_node(state: State, *, store: BaseStore, writer: StreamWriter) -> State: ...
def my_node(state: State, *, config: RunnableConfig, runtime: Runtime) -> State: ...
框架怎么知道该注入哪些参数?答案在 _runnable.py:132 的 KWARGS_CONFIG_KEYS 注册表——一组 (kwarg_name, accepted_types, runtime_attr, default) 元组:
KWARGS_CONFIG_KEYS = (
("config", (RunnableConfig, "RunnableConfig", Optional[RunnableConfig], "Optional[RunnableConfig]", Parameter.empty),
"N/A", Parameter.empty),
("writer", (StreamWriter, "StreamWriter", Parameter.empty),
"stream_writer", lambda _: None),
("store", (BaseStore, "BaseStore", Parameter.empty),
"store", Parameter.empty),
("store", (Optional[BaseStore], "Optional[BaseStore]"),
"store", None),
("previous", (ANY_TYPE,),
"previous", Parameter.empty),
("runtime", (ANY_TYPE,),
"N/A", Parameter.empty),
)
六条里有三个精彩设计:
① store 出现两次——一次是 BaseStore(无默认、若框架没提供 store 就报错)、一次是 Optional[BaseStore](默认 None、接受缺失)。这就让用户可以用声明的类型来表达”我要不要 store”:写 store: BaseStore 表示”必须有、否则是配置错误”、写 store: BaseStore | None 表示”最好有、没有我也能跑”。框架通过匹配精确的类型注解走不同分支——类型不仅是文档、还是运行时的契约。
② ANY_TYPE = object() 哨兵——用在 previous 和 runtime 上。previous 的类型由 ContextT 决定(用户返回什么框架就传什么)、runtime 的类型是 Runtime[某个泛型]——这些都不能在元组里枚举完、用 ANY_TYPE 表示”不检查类型、只看名字”。检查逻辑在 _runnable.py:303:if typ != (ANY_TYPE,) and p.annotation not in typ: continue——ANY_TYPE 就跳过类型检查。
③ "N/A" 表示 runtime_attr——config 和 runtime 两行的 runtime_attr 都是 "N/A"。因为 config 直接来自 invoke 的参数、不从 Runtime 对象拿;runtime 则是”整个 Runtime 对象”本身。调用注入时(_runnable.py:349)对这两个有特殊分支:
if kw == "config":
kw_value = config # 直接用
elif runtime:
if kw == "runtime":
kw_value = runtime # 注入整个对象
else:
kw_value = getattr(runtime, runtime_key) # 从 runtime 拿属性
VALID_KINDS = (POSITIONAL_OR_KEYWORD, KEYWORD_ONLY) 进一步限制:只有”可按关键字传”的参数才参与注入——*args/**kwargs 那样的变参、以及 positional-only 参数都被排除。保证框架只注入到明确命名的槽位、不会意外破坏用户的变参收集逻辑。
这个注册表的设计价值在于添加新的可注入参数只改一行——比如以后要支持 tracer: Tracer、只要在元组里加一行、__init__ 里的签名检查自动扩展、不用改任何核心逻辑。是典型的数据驱动扩展。
14.7.7 节点函数接收
框架通过检查节点函数的参数签名,自动注入 Runtime:
# 节点函数可以声明 runtime 参数
def my_node(state: State, runtime: Runtime[AppContext]) -> State:
...
# 或者通过 get_runtime() 手动获取
from langgraph.runtime import get_runtime
def my_node(state: State) -> State:
runtime = get_runtime(AppContext) # 返回 Runtime[AppContext]
...
get_runtime 函数从当前线程的配置中提取 Runtime:
def get_runtime(context_schema: type[ContextT] | None = None) -> Runtime[ContextT]:
# 源码注释说明:理想情况下 runtime 应该有独立于 config 的
# context manager;这依赖未来移除 configurable packing。
runtime = cast(Runtime[ContextT], get_config()[CONF].get(CONFIG_KEY_RUNTIME))
return runtime
注意源码里保留的待办注释——这透露了一个真实的设计张力:理想情况下 Runtime 应该有独立的 ContextVar/contextvars 管理、而不是塞在 config 里。contextvars 是 Python 3.7+ 的标准异步感知的上下文栈、asyncio 任务切换时自动跟着走;而现在 Runtime 靠 config 传递、每次 patch_configurable 都要拷贝整个 CONF dict——是历史包袱。
未来一旦 LangGraph 彻底移除 “configurable packing”(把运行时上下文从 RunnableConfig 里剥离),get_runtime 就会变成 CONTEXT_VAR.get() 这样的一行——更快、更干净、线程/协程安全。但现在为了兼容 LangChain Core 的 config 语义、还得用这种借道 config 的次优实现。这种维护者注释是一封”代码里的来信”、告诉后来者”这里还有一步要走”。
context_schema 参数其实完全没被使用——它的唯一作用是类型推导:帮 IDE 和 mypy 知道返回值是 Runtime[AppContext] 而不是 Runtime[Any]。运行时这个参数被忽略、因为 Runtime 对象里并没有存它的 schema 类型(类型信息在运行时已被擦除)。这是典型的类型幽灵参数(phantom type parameter)——只为了让静态类型系统工作、运行时无用。
14.7.8 stream_writer 的三分支选择:custom/CONFIG_KEY_STREAM/noop
Runtime 的 stream_writer 字段不是静态的——在 main.py:2628 它根据当前的流模式动态选择:
if "custom" in stream_modes:
def stream_writer(c: Any) -> None:
stream.put(
(
tuple(get_config()[CONF][CONFIG_KEY_CHECKPOINT_NS].split(NS_SEP)[:-1]),
"custom",
c,
)
)
elif CONFIG_KEY_STREAM in config[CONF]:
stream_writer = config[CONF][CONFIG_KEY_RUNTIME].stream_writer
else:
def stream_writer(c: Any) -> None:
pass
三种情况三种 writer:
① 用户传了 stream_mode="custom"——构造一个带命名空间的 writer:从 CONFIG_KEY_CHECKPOINT_NS 读出当前子图的命名空间、把写入的内容包装成 (namespace, "custom", content) 三元组、push 到 stream 通道。用户在 graph.stream(input, stream_mode="custom") 就能收到这些内容、并通过命名空间区分是哪个子图发出的。
② 当前是子图、且父图已经设置了 CONFIG_KEY_STREAM——直接复用父图的 stream_writer(父图的 config[CONF][CONFIG_KEY_RUNTIME].stream_writer)。这样子图通过 runtime.stream_writer(data) 写的内容、会沿着父图的流式管道一路冒上去、用户在顶层 stream 循环里就能看到。
③ 兜底(没有 custom 模式、也不是子图)——定义一个什么都不做的 lambda(注意这个是每次 stream 都新建的、不是 _no_op_stream_writer 那个全局哨兵)。这种情况下节点写 runtime.stream_writer(x) 完全没效果、x 静默丢弃。
这三个分支的优先级非常讲究——custom 模式显式表达了”用户要自己定义流输出”、优先级最高;没 custom 时退回到子图继承父图的流;再没有就 noop。永远不是”自动把所有 stream_writer 输出都发到默认 stream”——因为那会把 custom stream 和 debug stream 混在一起、用户无法区分。
注意 ② 里走的是 parent_runtime.stream_writer、不是 parent_runtime 本身——但这个 stream_writer 已经由父图按同样逻辑选择过了(可能是 parent 的 custom、可能是 parent 的 parent 的 custom……)。这样任意深度的子图嵌套、最终所有 runtime.stream_writer 调用都汇聚到根图的 stream 通道——除非中间某一层用 custom 模式切出了新的流。
14.7.9 async 路径的对称性与差异(main.py:3016)
Sync 路径是 Pregel.stream、async 路径是 Pregel.astream(main.py:3016)——两边对 Runtime 的处理几乎完全对称:
# async 路径(main.py:3027-3039)
parent_runtime = config[CONF].get(CONFIG_KEY_RUNTIME, DEFAULT_RUNTIME)
server_info = _build_server_info(config, parent_runtime)
runtime = Runtime(
context=_coerce_context(self.context_schema, context),
store=store,
stream_writer=stream_writer,
previous=None,
execution_info=None,
server_info=server_info,
)
runtime = parent_runtime.merge(runtime)
config[CONF][CONFIG_KEY_RUNTIME] = runtime
和 sync 版本一字不差——同样的 _coerce_context 、同样的 _build_server_info、同样的 parent_runtime.merge(runtime) 顺序。为什么不抽成共享函数?——因为 sync/async 路径还有很多其他差异(SyncPregelLoop vs AsyncPregelLoop、stream 的构造方式、callback 管理器类型)、强行抽公共函数会牵连这些差异、反而让代码难懂。两边平行复制 13 行、是工程上的”冗余换清晰”——以可读性为代价换取两个流程各自闭合、修改时不需要担心”改了 sync 会不会打破 async”。
这是开源 Python 框架里常见的 sync/async 二元性处理——LangChain Core 也是同样风格(每个 Runnable 都有 invoke/ainvoke、stream/astream 两套对称实现)。接受这种重复、而不是强行用装饰器/元类消除它、是因为调试成本:抽象层每多一层、异步死锁或 task 丢失的排查都会多一层间接。
14.8 DEFAULT_RUNTIME 与空操作
DEFAULT_RUNTIME = Runtime(
context=None,
store=None,
stream_writer=_no_op_stream_writer,
previous=None,
execution_info=None,
)
DEFAULT_RUNTIME 是当没有显式提供 context 时使用的默认值。它的所有字段都是”空”或”无操作”的,确保节点代码在没有 context 的情况下也能安全运行。
def _no_op_stream_writer(_: Any) -> None: ...
空操作的 stream_writer 意味着节点调用 runtime.stream_writer(data) 不会产生任何效果——数据会被静默丢弃。这个设计让节点代码不需要检查 runtime 是否”可用”。
14.8.1 DEFAULT_RUNTIME 是模块级单例、不是每次新建
仔细看 runtime.py:221:
DEFAULT_RUNTIME = Runtime(
context=None,
store=None,
stream_writer=_no_op_stream_writer,
previous=None,
execution_info=None,
)
这是一个模块级的全局单例——整个进程里所有”缺省 runtime”都共享这同一个对象。结合 Runtime 是 frozen 的、这个共享完全安全。
为什么做成单例、而不是每次 configurable.get(CONFIG_KEY_RUNTIME, Runtime()) 临时建?——因为 _algo.py 里的 prepare_single_task 里几乎每个任务都会走 configurable.get(CONFIG_KEY_RUNTIME, DEFAULT_RUNTIME)。如果每次都现建、一个大图的一次 step 会有成百上千次无谓的 Runtime 构造——哪怕 frozen + slots 已经把单次构造压到 ~100ns、累积也是 100μs 量级的浪费。单例让”缺省分支”免费——dict.get 的第二参数只是指针、没有任何构造开销。
但单例也带来一个陷阱:永远不要改 DEFAULT_RUNTIME 本身。不过这不是风险——Runtime 是 frozen 的、想改也改不了。这就是 frozen dataclass 单例的设计协同效应——不可变性让单例安全、单例让构造免费、两者相辅相成。
注意 DEFAULT_RUNTIME 没有 server_info 字段——它只写了 5 个字段、server_info 用了默认值 None(dataclass 的 field(default=None))。这意味着开源本地运行时、runtime.server_info 从头到尾都是 None、节点里任何 if runtime.server_info: 判断都会短路到 else 分支——这就是”开源和 Platform 共用同一个 Runtime 类型、但在字段值上分化”的实现方式。
14.8.2 _no_op_stream_writer 哨兵 vs 内联 lambda
§14.2.3 提到 merge 对 stream_writer 用 is not _no_op_stream_writer 哨兵判断。这个哨兵的唯一身份依据是函数对象的 id——runtime.py:77 定义的那个具体函数。
但 stream_writer 的第三分支(§14.7.8 ③)用的不是这个哨兵、而是内联 lambda:
else:
def stream_writer(c: Any) -> None:
pass
每次 Pregel 启动都现建一个新的 pass-lambda、和 _no_op_stream_writer 不是同一个对象。这是有意为之——如果内联 lambda 是 _no_op_stream_writer 本身、那么 parent_runtime.merge(new_runtime) 时、new_runtime 的 stream_writer 会被当成”未提供”、被 parent_runtime 的老 writer 覆盖——完全违背了 Pregel 初始化”用新 writer”的意图。
内联 lambda 让 parent_runtime.merge 正确地把新建的 pass-lambda当成”已提供的 writer”、覆盖 parent_runtime 的老 writer——哪怕这个 lambda 实际上什么都不做。这就是**“语义上相同 ≠ 代码上相同”的微妙——两个 pass-lambda 的行为一样(都是 noop)、但身份**不同、所以 merge 会走不同分支。
这个设计细节体现了 LangGraph 对 is identity 判定的严格运用——身份不是多余信息、它承载着”这是框架默认值”还是”这是用户或运行时设置的值”的语义区分。
14.8.3 previous 字段与函数式 API:为什么只在 checkpointer 存在时才有意义
previous 是 6 个字段里最特殊的一个——只有在函数式 API(@entrypoint 装饰器)且配置了 checkpointer 的情况下才有值。普通的 StateGraph 节点里它永远是 None。
看 _algo.py:670 的赋值:
runtime = runtime.override(
previous=checkpoint["channel_values"].get(PREVIOUS, None),
store=store,
execution_info=ExecutionInfo(...),
)
previous 从 checkpoint["channel_values"] 的特殊键 PREVIOUS 里读——这个键只在函数式 API 的 @entrypoint 返回时被写入。普通 StateGraph 的 checkpoint 里没有 PREVIOUS 键、get(..., None) 返回 None。
为什么需要 previous?——函数式 API 允许这样写:
@entrypoint(checkpointer=MemorySaver())
def my_flow(input_data, *, previous=None):
count = (previous or {}).get("count", 0) + 1
return {"count": count}
previous 是上一次整个 entrypoint 函数的返回值——这就把”函数式累积”做成了自动化:每次调用 my_flow 都能拿到上次的返回值、不用手动维护状态字段。这比 StateGraph 的 channel 机制更轻量——对一些”计数器”、“对话轮次”类的简单状态、函数式 API + previous 就够了。
previous 和 StateGraph 的 State 的关键区别是:State 是在节点间流动、previous 是在调用间流动。State 的生命周期是”一次 invoke 内”、previous 的生命周期是”同一个 thread 的跨 invoke”——时间尺度完全不同。
这也解释了为什么 merge 对 previous 用 is None 精确检测(§14.2.3):函数式 API 的用户可能返回 0/""/False 当作有意义的状态、不能用 or 把它们和”未设置”混为一谈。这个严格判定直接服务于 previous 的独特语义——在 6 个字段里、previous 是唯一可能承载普通 Python 值(而不是”对象或 None”)的字段。
14.9 子图中的 Runtime 传播
14.9.1 merge 语义
当执行进入子图时,子图可能有自己的 store 和 context。Runtime 的 merge 方法用于合并父图和子图的 Runtime:
def merge(self, other: Runtime[ContextT]) -> Runtime[ContextT]:
return Runtime(
context=other.context or self.context, # 子图优先
store=other.store or self.store, # 子图优先
stream_writer=other.stream_writer # 子图优先
if other.stream_writer is not _no_op_stream_writer
else self.stream_writer,
previous=self.previous if other.previous is None else other.previous,
execution_info=other.execution_info or self.execution_info,
server_info=other.server_info or self.server_info,
)
合并策略是”子图覆盖父图”——如果子图提供了自己的 context,则使用子图的;否则继承父图的。
14.9.2 传播示意
graph TB
subgraph 父图
PR["Runtime[ParentCtx]<br/>context=ParentCtx(...)"]
PR --> N1[Node A]
PR --> SubEntry[子图入口]
end
subgraph 子图
SubEntry --> Merge["merge(parent_rt, child_rt)"]
Merge --> CR["Runtime[ChildCtx]<br/>context 继承或覆盖"]
CR --> N2[Node B]
CR --> N3[Node C]
end
14.9.3 合并顺序决定的语义反转:一个真实的 bug 模式
§14.12 已经点出 parent_runtime.merge(runtime) 的顺序不能反、但在真实代码审查里还是经常看到有人写反——因为”合并”这个动词在中文和英文语境里都对称、没有方向直觉。一个真实的思维实验:
# 错误写法
runtime = runtime.merge(parent_runtime) # 新 runtime 反被旧覆盖
后果:子图本次调用传入的 context/store 被父图传下来的老 context/store 吃掉。用户在 debug 时会看到:
- 子图函数里
runtime.context不是自己刚传的那个、而是父图的那个 - 重启进程后问题消失(因为没有”父图”、parent_runtime 是 DEFAULT_RUNTIME、merge 顺序错误的负面效果不明显)
- 只在嵌套子图场景里复现、难以最小化
LangGraph 在源码里用单侧调用和**# build server_info from metadata + parent runtime** 这样的注释来强化正确顺序——把”parent_runtime”放在主语位置、明确它是”底座”而不是”增量”。这是通过命名和结构**让错误代码”看起来明显不对”**的技巧——不是靠运行时检测、而是靠阅读时的语感。
14.10 设计决策
14.10.1 为什么 Runtime 是 frozen 的?
frozen dataclass 带来三个好处:
- 线程安全:并发执行的多个节点读取同一个 Runtime 实例时不会发生数据竞争
- 语义正确性:context 代表”不变的运行时依赖”,frozen 在类型层面强制了这个语义
- 可哈希性:frozen dataclass 默认可哈希,便于缓存和去重
14.10.2 为什么 execution_info 在 Runtime 中而不是单独注入?
将 execution_info 放在 Runtime 中而非作为独立参数注入,有两个原因:
- 减少参数数量:节点函数只需要一个
runtime参数就能访问所有运行时信息 - 一致的生命周期:所有运行时信息在同一个对象中创建和传递,生命周期一致
14.10.3 ToolRuntime 与 Runtime 的关系
langgraph.prebuilt 中还有一个 ToolRuntime 类,它是专为工具函数设计的:
class ToolRuntime(_DirectlyInjectedToolArg, Generic[ContextT, StateT]):
"""Runtime context automatically injected into tools."""
context: ContextT # 与 Runtime 共享
store: BaseStore # 与 Runtime 共享
stream_writer: StreamWriter # 与 Runtime 共享
config: RunnableConfig # 工具特有
state: StateT # 工具特有
tool_call_id: str # 工具特有
ToolRuntime 共享了 Runtime 的 context、store、stream_writer 字段,但增加了工具特有的 config、state 和 tool_call_id。它们之间的关系是互补而非继承——Runtime 服务于节点,ToolRuntime 服务于工具。
14.11 实战:完整的 Runtime 使用案例
14.11.1 多租户 Agent 系统
以下示例展示了如何使用 Runtime 构建一个支持多租户的 Agent 系统,每个用户有独立的数据隔离和权限控制:
from dataclasses import dataclass
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import Runtime
from langgraph.store.memory import InMemoryStore
@dataclass
class TenantContext:
"""多租户上下文"""
tenant_id: str
user_id: str
role: str # "admin" | "editor" | "viewer"
db_connection_string: str
class AgentState(TypedDict, total=False):
messages: list
response: str
store = InMemoryStore()
def access_control_node(state: AgentState, runtime: Runtime[TenantContext]) -> dict:
"""访问控制节点:根据角色决定权限"""
ctx = runtime.context
info = runtime.execution_info
# 记录审计日志到 Store
if runtime.store:
runtime.store.put(
("audit", ctx.tenant_id),
f"access_{info.task_id}",
{
"user": ctx.user_id,
"role": ctx.role,
"action": "query",
"thread_id": info.thread_id,
}
)
if ctx.role == "viewer":
return {"response": "You have read-only access."}
return state
def process_node(state: AgentState, runtime: Runtime[TenantContext]) -> dict:
"""业务处理节点:使用租户隔离的数据"""
ctx = runtime.context
# 从租户命名空间读取配置
if runtime.store:
config = runtime.store.get(("tenants", ctx.tenant_id), "config")
model_name = config.value["model"] if config else "default-model"
else:
model_name = "default-model"
# 流式输出处理进度
runtime.stream_writer({"status": "processing", "model": model_name})
return {"response": f"Processed by {model_name} for tenant {ctx.tenant_id}"}
graph = (
StateGraph(state_schema=AgentState, context_schema=TenantContext)
.add_node("access_control", access_control_node)
.add_node("process", process_node)
.add_edge(START, "access_control")
.add_edge("access_control", "process")
.add_edge("process", END)
.compile(store=store)
)
# 不同租户使用不同的 context
result = graph.invoke(
{"messages": ["Hello"]},
context=TenantContext(
tenant_id="acme",
user_id="alice",
role="admin",
db_connection_string="postgresql://acme:...",
)
)
14.11.2 _retry.py 里的三阶段注入:first_attempt_time 只设一次、node_attempt 每轮更新
第 7 章讲过 RetryPolicy 的重试循环、这里专门看它对 Runtime 做了什么。run_with_retry(_retry.py:86)的结构是:
def run_with_retry(task, retry_policy, configurable=None):
...
node_first_attempt_time = time.time() # ① 入口记一次时间
...
runtime = config.get(CONF, {}).get(CONFIG_KEY_RUNTIME)
if isinstance(runtime, Runtime):
runtime = _ensure_execution_info(runtime, config, task)
config = patch_configurable(
config,
{CONFIG_KEY_RUNTIME: runtime.patch_execution_info(
node_first_attempt_time=node_first_attempt_time, # ② 只在入口打一次
)},
)
while True: # ③ 每轮循环
runtime = config.get(CONF, {}).get(CONFIG_KEY_RUNTIME)
if isinstance(runtime, Runtime):
config = patch_configurable(
config,
{CONFIG_KEY_RUNTIME: runtime.patch_execution_info(
node_attempt=attempts + 1, # ④ 每轮更新尝试次数
)},
)
try:
...
return task.proc.invoke(task.input, config)
except Exception:
attempts += 1
...
time.sleep(sleep_time)
第一次 patch(入口、行 104):设置 node_first_attempt_time——这是整个重试会话的首次尝试时间戳、后续所有重试都看到同一个值。这能让节点逻辑判断”我已经在重试区间里花了多长时间”、做超时熔断。比如 if time.time() - runtime.execution_info.node_first_attempt_time > 60: raise——即使 RetryPolicy 设置了无限重试、节点自己也能在 60 秒后果断投降。
第二次 patch(循环内、行 118):每次重试前把 node_attempt 更新为 attempts + 1(1-indexed)。第一次进入循环 attempts = 0、node_attempt = 1;每次失败后 attempts += 1、下一轮 node_attempt = 2/3/...。这样节点可以在代码里根据尝试次数降级:
def my_node(state, runtime):
attempt = runtime.execution_info.node_attempt
if attempt == 1:
return call_fast_model() # 首次尝试用快模型
elif attempt == 2:
return call_slow_model() # 重试用稳模型
else:
return return_cached() # 再不行就返回缓存
这两个字段分开设置、而不是一次 patch 两个——是因为它们的变化频率不同:node_first_attempt_time 只变一次(进入 retry 时)、node_attempt 每轮变一次。拆开后、两次 patch 各自只携带真正需要变的字段、patch_execution_info 内部的 replace 也只复制变化的 slot、无谓的重建开销降到最低。
这个小细节体现了**“不可变数据结构 + 精细 patch”**的设计哲学:frozen 保证安全、patch 保证效率、每次 replace 只动必须动的那一个 slot。
14.11.3 Runtime 在重试场景中的行为
当节点配置了 RetryPolicy 时,Runtime 的 execution_info 会在每次重试中更新:
def flaky_node(state: AgentState, runtime: Runtime) -> dict:
info = runtime.execution_info
print(f"Attempt {info.node_attempt}") # 1, 2, 3...
if info.node_attempt == 1:
raise ConnectionError("Temporary failure")
# 第二次尝试成功
return {"response": "Success after retry"}
框架会通过 runtime.patch_execution_info(node_attempt=2) 创建新的 Runtime 副本,传递给重试的执行。
14.12 parent_runtime.merge 在 stream 入口的真实调用
§14.9 介绍了 merge 的三种未提供判定、这里看它在主流程里被怎么用。Pregel.stream 初始化 Runtime 时(main.py:2657):
# build server_info from metadata + parent runtime
parent_runtime = config[CONF].get(CONFIG_KEY_RUNTIME, DEFAULT_RUNTIME)
server_info = _build_server_info(config, parent_runtime)
runtime = Runtime(
context=_coerce_context(self.context_schema, context),
store=store,
stream_writer=stream_writer,
previous=None,
execution_info=None,
server_info=server_info,
)
runtime = parent_runtime.merge(runtime)
config[CONF][CONFIG_KEY_RUNTIME] = runtime
三步:(1)从 config 里拿 parent_runtime——如果当前是子图、父图已经塞过 Runtime、拿出来用;如果是根图、configurable.get 返回 DEFAULT_RUNTIME。(2)构造子图本次的 Runtime——context 通过 _coerce_context 规范化、store/stream_writer 来自调用参数、server_info 由 _build_server_info 从 metadata 重建。(3)merge 父子 Runtime——关键在这行、parent_runtime.merge(runtime) 而不是 runtime.merge(parent_runtime)、顺序不能反。
回看 merge 的实现(§14.2.3)、other 是优先的、self 是 fallback。parent_runtime.merge(runtime) 读作”以 parent_runtime 为底、用新 runtime 覆盖”——新 runtime 提供的字段会覆盖 parent、没提供的从 parent 继承。这是子图继承父图上下文的语义:子图调用方没传 context、就用父图的;子图调用方传了新 store、就用新的。
如果反过来写 runtime.merge(parent_runtime)、语义就变成”用 parent_runtime 覆盖新 runtime”——父图的老 context 会把子图调用方刚设好的新 context 覆盖掉、子图调用完全失去了传 context 的能力。参数顺序决定语义方向、两个看似对称的名字背后是严格的左右区分。这就是 API 设计里”动词 + 施事 + 受事”的微妙——a.merge(b) 里 a 是旧态、b 是增量、b 胜。
14.13 本章与全书体系的呼应
与第 7 章(重试与错误处理)的呼应——本章展示了 patch_execution_info 是如何在 run_with_retry 的每一轮循环里被调用、把 node_attempt 递增注入节点。第 7 章讲的是”重试怎么发生”(何时重试、怎么退避、何时放弃),本章补齐了”重试怎么被节点感知”——节点通过 runtime.execution_info.node_attempt 拿到第几次尝试的信息、能做自适应降级。这两章叠在一起就是”重试”这件事的完整闭环。
与第 15 章(Store)的呼应——runtime.store 是 LangGraph 里跨线程持久化的唯一入口。本章只讲了 “store 通过 Runtime 传给节点”、第 15 章展开讲”store 本身怎么工作、BaseStore 4 个操作、AsyncBatchedBaseStore 的 drain”。Runtime 和 Store 是依赖注入关系——Runtime 是”依赖载体”、Store 是”依赖内容”。
与第 16 章(Prebuilt 与工具)的呼应——§14.10.3 提到 ToolRuntime 是 Runtime 的姊妹类、第 16 章详解 ToolRuntime 的 8 个字段(context/store/stream_writer 共享、config/state/tool_call_id 独有、previous/execution_info 都暴露)和 _inject_tool_args 如何把它注入到工具调用。Runtime 是节点的依赖容器、ToolRuntime 是工具的依赖容器——两者互补、共享基础字段但针对不同的调用场景扩展。
与第 17 章(Multi-Agent)的呼应——多 Agent 系统里每个 Agent 可以有自己的 Context(不同的 API key、不同的权限)、都通过 Runtime 传递。第 17 章讲的 Send/Command 是流程控制、本章的 Runtime 是运行时依赖——两者组合起来就是”一个 Agent 知道自己是谁(Runtime.context)、也知道该去哪(Send/Command)”。
跨书呼应:和 hyper-tower 的 HTTP 请求上下文——第 12 章讲 hyper 的 Dispatcher 如何在 poll_read/poll_write 之间传递请求状态。Runtime 在 LangGraph 里的角色和 hyper 里的 Conn 状态机异曲同工——都是执行期间不可变的会话级上下文、承载跨步骤共享的信息。不同之处在于 hyper 的状态机是同步可变的、LangGraph 的 Runtime 是 frozen 的——体现了异步并发世界和单线程事件循环世界对”可变性”的不同偏好。
14.14 小结
本章深入分析了 LangGraph 的 Runtime 与 Context 机制。Runtime[ContextT] 通过泛型类型参数将运行时依赖注入从一个”约定”提升为一个”类型安全的协议”。六个字段——context、store、stream_writer、previous、execution_info、server_info——覆盖了节点函数可能需要的全部运行时信息。frozen 语义确保了并发安全,override 和 merge 方法提供了不可变更新的能力。
Context 与 State 的分离是 LangGraph 架构中的关键决策:State 是”随步骤变化的数据”,通过 Channel 管理和 Checkpoint 持久化;Context 是”整个执行期间不变的依赖”,通过 Runtime 注入且不被持久化。这种分离让状态管理更纯粹,同时为敏感信息(如 API 密钥)提供了安全的传递通道。
下一章我们将探讨 BaseStore 接口和 InMemoryStore 实现,了解 LangGraph 如何提供跨线程的长期记忆能力。