LangGraph 设计与实现

第14章 Runtime 与 Context

作者 杨艺韬 · 12,491 字

第14章 Runtime 与 Context

14.1 引言

在构建 LLM 应用的图时,节点函数往往需要访问一些”运行时依赖”——当前用户的身份信息、数据库连接池、API 密钥、或者一个全局的向量存储。这些依赖既不属于图的状态(它们不随步骤变化),也不应该被硬编码在节点函数中(它们因调用而异)。传统做法是通过闭包或全局变量传递,但这在测试、多租户和类型安全方面都存在痛点。

LangGraph 1.1.6 引入了 Runtime 类和 ContextT 泛型来解决这个问题。Runtime 是一个不可变的数据容器,在图执行开始时由调用方创建,自动注入到每个节点函数中。它携带了 context(用户自定义的运行时上下文)、store(持久化存储)、stream_writer(流式写入器)、execution_info(执行元数据)等运行时信息。

本章将从 Runtime 的数据类定义出发,分析 ContextT 泛型的设计理念、ExecutionInfoServerInfo 的信息模型、context 与 state 的本质区别,以及 Runtime 在 Pregel 循环中的注入机制。

本章要点

  1. Runtime 类的完整字段定义——context、store、stream_writer、previous、execution_info、server_info
  2. ContextT 泛型的类型传播——从 StateGraph 到节点函数的端到端类型安全
  3. ExecutionInfoServerInfo 的信息模型——执行元数据的结构化表达
  4. Context vs State 的本质区别——不可变依赖 vs 可变状态
  5. Runtime 注入机制——从编译到执行的完整链路

14.2 Runtime 类的设计

14.2.1 数据类定义

Runtime 定义在 langgraph/runtime.py 中,是一个泛型冻结数据类:

@dataclass(**_DC_KWARGS)  # kw_only=True, slots=True, frozen=True
class Runtime(Generic[ContextT]):
    """Convenience class that bundles run-scoped context and other runtime utilities."""

    context: ContextT = field(default=None)
    """Static context for the graph run, like user_id, db_conn, etc."""

    store: BaseStore | None = field(default=None)
    """Store for the graph run, enabling persistence and memory."""

    stream_writer: StreamWriter = field(default=_no_op_stream_writer)
    """Function that writes to the custom stream."""

    previous: Any = field(default=None)
    """The previous return value for the given thread (functional API only)."""

    execution_info: ExecutionInfo | None = field(default=None)
    """Read-only execution information/metadata for the current node run."""

    server_info: ServerInfo | None = field(default=None)
    """Metadata injected by LangGraph Server. None for open-source."""

_DC_KWARGS 展开为 kw_only=True, slots=True, frozen=True,这意味着:

  • kw_only:所有字段必须通过关键字参数传递,避免位置参数的歧义
  • slots:使用 __slots__ 优化内存和属性访问速度
  • frozen:实例创建后不可修改,确保运行时安全

14.2.2 字段语义

graph TB
    Runtime[Runtime 对象]
    Runtime --> Context["context: ContextT<br/>用户自定义上下文<br/>如 user_id, db_conn"]
    Runtime --> Store["store: BaseStore | None<br/>持久化存储<br/>跨线程记忆"]
    Runtime --> SW["stream_writer: StreamWriter<br/>自定义流式写入<br/>发射中间结果"]
    Runtime --> Prev["previous: Any<br/>上次执行的返回值<br/>仅函数式 API"]
    Runtime --> EI["execution_info: ExecutionInfo<br/>执行元数据<br/>checkpoint_id, task_id 等"]
    Runtime --> SI["server_info: ServerInfo<br/>服务器元数据<br/>assistant_id, user 等"]

六个字段覆盖了节点函数可能需要的所有运行时信息:

字段类型来源可变性
contextContextT(泛型)调用方传入整个执行期间不变
storeBaseStore图编译时配置引用不变,内容可变
stream_writerStreamWriter框架自动注入每个任务独立
previousAnyCheckpoint 读取只读
execution_infoExecutionInfo框架生成每个任务独立
server_infoServerInfoLangGraph Server只读

14.2.3 不可变性与 override/merge

虽然 Runtime 是 frozen 的,但它提供了两个方法来创建修改后的副本:

def merge(self, other: Runtime[ContextT]) -> Runtime[ContextT]:
    """Merge two runtimes together. If a value is not provided in other,
    the value from self is used."""
    return Runtime(
        context=other.context or self.context,
        store=other.store or self.store,
        stream_writer=other.stream_writer
            if other.stream_writer is not _no_op_stream_writer
            else self.stream_writer,
        previous=self.previous if other.previous is None else other.previous,
        execution_info=other.execution_info or self.execution_info,
        server_info=other.server_info or self.server_info,
    )

def override(self, **overrides) -> Runtime[ContextT]:
    """Replace the runtime with a new runtime with the given overrides."""
    return replace(self, **overrides)

merge 用于子图继承父图的 Runtime 时,合并两个 Runtime 对象。override 用于框架在任务准备阶段注入特定字段(如 execution_info)。

merge 的三种”未提供”判定

merge 的实现乍看只是”6 个字段各选一个非空的”、实际源码里三种字段用了三种不同的判定方式——每一种都在处理 Python None/falsy 歧义的不同坑:

# runtime.py:188 真实实现
def merge(self, other: Runtime[ContextT]) -> Runtime[ContextT]:
    return Runtime(
        context=other.context or self.context,                    # ① or
        store=other.store or self.store,                          # ① or
        stream_writer=other.stream_writer
            if other.stream_writer is not _no_op_stream_writer    # ② is not 哨兵
            else self.stream_writer,
        previous=self.previous if other.previous is None          # ③ is None 精确检测
                 else other.previous,
        execution_info=other.execution_info or self.execution_info,  # ① or
        server_info=other.server_info or self.server_info,        # ① or
    )

context/store/execution_info/server_infoor——标准 Python 惯用法,None 或 falsy(包括 False、空字典、0)时都走右边。对这四个字段安全,因为它们的值永远是对象实例或 None,不会是 False/0/{}

stream_writeris not _no_op_stream_writer 哨兵检查——因为 stream_writer 的默认值是一个函数 _no_op_stream_writer(定义在 line 77:def _no_op_stream_writer(_: Any) -> None: ...)、函数在 Python 里永远是 truthy、or 会把默认函数当成”真实设置过的值”保留下来。必须用 is not 身份比较——只有参考同一个哨兵函数对象时才算”未提供”。一旦用户传了自定义 writer(哪怕是个 noop 的 lambda)、identity 就不同了、会正确地覆盖默认。

previousis None 精确检测——这是三种里最严格的判定。因为 previous 是 functional API 里上次执行的用户返回值、可能是任意值:空字典 {}、空字符串 ""、数字 0、布尔 False。如果用 orother.previous = 0 时会错误地退回 self.previous——用户会觉得”我明明返回了 0 但下次 runtime 里看到的是上上次的返回值”。用 is None 只在严格未设置时才退回、0/""/{} 等 falsy 值都被如实保留。

这种**“同一个 merge 函数里用三种不同的判定”的设计不是疏忽、是对每个字段真实语义的精确建模。Python 里 None vs falsy 的区别是许多 subtle bug 的根源——LangGraph 用 6 个字段 3 种判定展示了怎么写”不被 Python 语义坑”的合并函数**。每种判定背后都是一个”如果误用另一种会怎样”的反问。

14.2.4 _DC_KWARGS 的三件套:为什么 Runtime 不能用普通 @dataclass

Runtime 不是写成 @dataclass(frozen=True)、而是 @dataclass(**_DC_KWARGS)——_DC_KWARGS 定义在 types.py:401、只有一行但内涵浓缩:

_DC_KWARGS = {"kw_only": True, "slots": True, "frozen": True}

三个标志同时开、少一个都有问题:

kw_only=True——强制所有字段都必须以关键字传入。意味着用户写 Runtime(some_ctx)(希望按位置传 context)会直接 TypeError必须写成 Runtime(context=some_ctx)。为什么要这么严?因为 Runtime 的字段顺序是按语义分组的(context/store/stream_writer/previous/execution_info/server_info)、不是按”最重要的排第一”——如果允许位置参数、用户会下意识地 Runtime(ctx, store) 这样写、一旦未来在字段间插入新字段(比如把 previous 挪到 stream_writer 前面)、所有调用代码就都错位了。kw_only 把”字段顺序”从公开 API 里隐藏了、未来怎么重排字段都不破坏兼容。

slots=True——生成 __slots__ 而不是 __dict__。Runtime 在一次图执行里可能被 override 上千次(每个任务、每次重试都 replace 一次)、每个 Runtime 实例少 ~56 字节(__dict__ 的典型大小)、在长时间运行的服务端累积就是几 MB 级别的节省。而且 __slots__ 让属性查找走 C 层的 descriptor 而不是 Python dict 查找、单次访问快 ~15-20%。但 slots 有代价:不能动态加属性——runtime.my_extra = "foo"AttributeError。对 Runtime 这是特性不是 bug——Runtime 的字段就应该是固定的 6 个、谁想在 runtime 上塞临时变量都是错的、slots 在编译期就拦住了。

frozen=True——__setattr__ 会 raise。这是并发安全的核心保险丝:多个任务并发执行时可能共享同一个 Runtime 引用、如果任何一个任务能 self.runtime.store = None 改掉、其他任务就看到脏数据。frozen 让这种改动在类型层面就不可能发生、想”改”只能 replace() 返回新对象——原对象不变、老引用依然安全。

三个标志组合起来的语义是”不可变的、有内存约束的、带命名字段的运行时依赖容器”——每一条都是从”Runtime 要承担什么角色”反推出来的。这不是过度工程、每个 flag 关掉都会引入一种真实的 bug。

14.2.5 _RuntimeOverrides TypedDict:override 的类型签名秘密

override 方法在 runtime.py:204 的签名不是 **overrides: Any、而是:

def override(
    self, **overrides: Unpack[_RuntimeOverrides[ContextT]]
) -> Runtime[ContextT]:
    return replace(self, **overrides)

关键是 Unpack[_RuntimeOverrides[ContextT]]——Unpack 是 PEP 646/692 里的类型操作符、把一个 TypedDict 展开为 kwargs 类型签名。_RuntimeOverrides 定义在 runtime.py:80

class _RuntimeOverrides(TypedDict, Generic[ContextT], total=False):
    context: ContextT
    store: BaseStore | None
    stream_writer: StreamWriter
    previous: Any
    execution_info: ExecutionInfo
    server_info: ServerInfo | None

total=False 表示所有字段都是可选的。Unpack 让 IDE 和 mypy 知道”这个 **overrides 只接受这 6 个键、且每个键有固定类型”——写 runtime.override(contxet=x)(拼错)会在类型检查时就被捕获、runtime.override(context=123)(类型错)同样被拦截。

为什么不直接写 def override(self, context=..., store=..., ...) 六个具名参数?——因为那样调用 runtime.override(execution_info=ei) 就必须写出所有默认值或者漏掉的被 replace 吞掉。replace(self, **overrides)dataclasses 的标准做法——未传的字段保持原值、传了的字段覆盖。只有 **overrides 能干净地表达”只替换我给的这几个字段”。Unpack[_RuntimeOverrides]可变字段集合类型安全同时拿到了。

这个设计在 Python 里算较新(UnpackTypedDict 支持是 PEP 692、Python 3.12 才标准化、typing_extensions backport 到老版本)——LangGraph 愿意用它、是因为 Runtime 的使用频率太高、类型错误的成本实在太大。

14.2.6 第三个方法 patch_execution_info

章节只展示了 mergeoverride、但真实 Runtime 还有第三个方法(line 210-218):

def patch_execution_info(self, **overrides: Any) -> Runtime[ContextT]:
    """Return a new runtime with selected execution_info fields replaced."""
    if self.execution_info is None:
        msg = "Cannot patch execution_info before it has been set"
        raise RuntimeError(msg)
    return replace(
        self,
        execution_info=self.execution_info.patch(**overrides),
    )

这个方法只改 execution_info 里的字段、不改 Runtime 其他字段——是”二级 replace”:先找到 execution_info 子对象、再对它做 replace、再用 replace 后的值覆盖回 Runtime。语义上等价于 runtime.override(execution_info=runtime.execution_info.patch(**overrides))、但明确命名表达”我只是在更新执行元数据、不是在换整个 runtime”。

if self.execution_info is None: raise RuntimeError 是另一道保险丝——execution_info 只有在任务被 Pregel 实际调度后才存在、未调度前 patch 它是逻辑错误。这种错误在并发场景下难调试(看到的 Runtime 可能来自不同 code path),明确抛 RuntimeError 而不是 silent None 覆盖能在最早的错误发生点报出。

这个方法在第 7 章讲的 _retry.py 里被反复调用——每次重试都通过它给 execution_info 注入 node_attempt(递增的尝试次数)和 node_first_attempt_time(首次尝试时间)。重试不该改 context/store/stream_writer——只该改 “这是第几次尝试”patch_execution_info 正是为此而生、名字和调用路径都精确反映了意图。

14.2.7 _ensure_execution_info:分布式运行时的”缺口填补”

Runtime.patch_execution_info 的前置条件很严——self.execution_info is None 就 raise。但在分布式运行时(LangGraph Platform)里、这个条件会被打破:任务是由服务端进程准备、序列化、再在 executor 进程里反序列化执行的、executor 不走 _algo.py 里的 OSS 路径、没有人来设置 execution_info

为此 _retry.py:33 引入了一个”兜底函数” _ensure_execution_info

def _ensure_execution_info(
    runtime: Runtime, config: RunnableConfig, task: PregelExecutableTask
) -> Runtime:
    if runtime.execution_info is not None:
        return runtime
    configurable = config.get(CONF, {})
    return runtime.override(
        execution_info=ExecutionInfo(
            checkpoint_id=configurable.get(CONFIG_KEY_CHECKPOINT_ID) or "",
            checkpoint_ns=configurable.get(CONFIG_KEY_CHECKPOINT_NS) or "",
            task_id=configurable.get(CONFIG_KEY_TASK_ID) or task.id,
            thread_id=configurable.get(CONFIG_KEY_THREAD_ID),
            run_id=str(rid) if (rid := config.get("run_id")) else None,
        ),
    )

逻辑是:如果 execution_info 已经有了(OSS 路径走过)、直接返回;否则从 config[CONF] 里凑——CONFIG_KEY_CHECKPOINT_ID/_NS/_TASK_ID/_THREAD_ID 都是任务 config 里一定有的字段(因为序列化传过来时保留了)、task.id 作为 task_id 的兜底。

这个函数只在 run_with_retryarun_with_retry 的入口被调用一次_retry.py:100:205)——在 patch_execution_info(node_first_attempt_time=...) 之前、确保 execution_info 存在。这是整个 Runtime 体系里唯一为分布式部署开的后门——如果没有它、LangGraph Platform 上运行任何带 RetryPolicy 的节点都会抛”Cannot patch execution_info before it has been set”。

为什么不在 Runtime 构造时就强制 execution_info 非空?——因为 OSS 本地运行时、Runtime 是在 Pregel 初始化阶段创建的、那时还没有”任务”的概念、execution_info 还没法填。强制非空就让用户在 graph.invoke 之前手动构造 Runtime——那是违反直觉的。让它能为 None、由框架在”任务准备阶段”填上、是最干净的分层。但这也意味着分布式路径绕过了任务准备阶段、所以需要 _ensure_execution_info 这一道补丁——是分布式架构在 OSS 抽象上撕开的口子、用最小侵入补回来。

14.3 ContextT 泛型

14.3.1 定义

# langgraph/typing.py
ContextT = TypeVar("ContextT", bound=StateLike | None, default=None)

ContextT 是一个带默认值的类型变量,约束为 StateLike | NoneStateLike 包括 TypedDictBaseModeldataclass 等结构化类型。默认值为 None,这意味着如果不指定 context_schema,Runtime 的 context 字段类型就是 None

细节是:这个 TypeVar 不是从标准 typing 导入、而是 typing_extensions.TypeVartyping.py:3)。原因是 default=NonePEP 696 的特性、Python 3.13 才进 stdlib、LangGraph 支持 3.10+ 所以必须用 typing_extensions 的 backport。这一点看似无关紧要、但它决定了**StateGraph 在 3.10/3.11/3.12 上都能写 StateGraph[MyState] 而不是 StateGraph[MyState, None]**——ContextT 有 default、省略它时自动 fallback 到 None。换任何标准 TypeVar 都做不到这点。

14.3.2 类型传播链路

flowchart LR
    Schema["context_schema=MyContext"] --> SG["StateGraph[State, MyContext]"]
    SG --> Compile["compile()"]
    Compile --> CSG["CompiledStateGraph[State, MyContext, ...]"]
    CSG --> Invoke["invoke(input, context=MyContext(...))"]
    Invoke --> RT["Runtime[MyContext]"]
    RT --> Node["node(state, runtime: Runtime[MyContext])"]

类型从 StateGraphcontext_schema 参数开始,贯穿编译、调用、注入的全过程。IDE 和类型检查器可以在每一步提供准确的类型补全。

14.3.3 使用示例

from dataclasses import dataclass
from langgraph.graph import StateGraph
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

@dataclass
class AppContext:
    user_id: str
    api_key: str
    is_admin: bool = False

class State(TypedDict, total=False):
    response: str

def my_node(state: State, runtime: Runtime[AppContext]) -> State:
    # IDE 知道 runtime.context 的类型是 AppContext
    user_id = runtime.context.user_id
    if runtime.context.is_admin:
        return {"response": f"Admin {user_id}: full access"}
    return {"response": f"User {user_id}: limited access"}

graph = (
    StateGraph(state_schema=State, context_schema=AppContext)
    .add_node("my_node", my_node)
    .set_entry_point("my_node")
    .set_finish_point("my_node")
    .compile()
)

result = graph.invoke({}, context=AppContext(user_id="alice", api_key="sk-..."))

14.4 ExecutionInfo:执行元数据

14.4.1 数据结构

@dataclass(frozen=True, slots=True)
class ExecutionInfo:
    """Read-only execution info/metadata for the current thread/run/node."""

    checkpoint_id: str
    """The checkpoint ID for the current execution."""

    checkpoint_ns: str
    """The checkpoint namespace for the current execution."""

    task_id: str
    """The task ID for the current execution."""

    thread_id: str | None = None
    """None when running without a checkpointer."""

    run_id: str | None = None
    """None when run_id is not provided in RunnableConfig."""

    node_attempt: int = 1
    """Current node execution attempt number (1-indexed)."""

    node_first_attempt_time: float | None = None
    """Unix timestamp for when the first attempt started."""

ExecutionInfo 提供了节点函数可能需要的所有执行上下文信息,而无需直接操作低层的 RunnableConfig

14.4.2 字段用途

graph TB
    EI[ExecutionInfo]
    EI --> CID["checkpoint_id<br/>当前检查点 ID<br/>用于状态追踪"]
    EI --> CNS["checkpoint_ns<br/>检查点命名空间<br/>标识子图层级"]
    EI --> TID["task_id<br/>任务 ID<br/>唯一标识本次执行"]
    EI --> ThID["thread_id<br/>线程 ID<br/>跨轮次对话标识"]
    EI --> RID["run_id<br/>运行 ID<br/>单次调用标识"]
    EI --> NA["node_attempt<br/>重试次数<br/>1 表示首次执行"]
    EI --> NFAT["node_first_attempt_time<br/>首次尝试时间<br/>用于超时计算"]

典型的使用场景:

def my_node(state: State, runtime: Runtime) -> State:
    info = runtime.execution_info
    # 日志中记录执行上下文
    logger.info(f"Thread={info.thread_id}, Task={info.task_id}, Attempt={info.node_attempt}")

    # 根据重试次数调整行为
    if info.node_attempt > 1:
        logger.warning("Retrying, using fallback strategy")

    # 使用 thread_id 做线程级缓存
    cache_key = f"{info.thread_id}:{info.task_id}"
    ...

14.4.3 patch 方法

ExecutionInfo 是 frozen 的,但提供了 patch 方法创建修改后的副本:

def patch(self, **overrides: Any) -> ExecutionInfo:
    """Return a new execution info object with selected fields replaced."""
    return replace(self, **overrides)

框架在重试时使用这个方法更新 node_attemptnode_first_attempt_time

14.5 ServerInfo:服务端元数据

14.5.1 数据结构

@dataclass(frozen=True, slots=True)
class ServerInfo:
    """Metadata injected by LangGraph Server."""

    assistant_id: str
    """The assistant ID for the current execution."""

    graph_id: str
    """The graph ID for the current execution."""

    user: BaseUser | None = None
    """The authenticated user, if any."""

ServerInfo 只在 LangGraph Platform(部署服务)环境中被填充。在本地开源运行时,runtime.server_info 始终为 None

14.5.2 _build_server_info:为什么 isinstance 不够、要 hasattr 兜底

server_info 不是节点函数传进来的、是 Pregel 在初始化时从 config["metadata"]config[CONF]["langgraph_auth_user"]重建出来的。重建逻辑在 main.py:3659

def _build_server_info(config, parent_runtime) -> ServerInfo | None:
    metadata = config.get("metadata") or {}
    configurable = config.get(CONF) or {}
    assistant_id = metadata.get("assistant_id")
    graph_id = metadata.get("graph_id")

    auth_user_data = configurable.get("langgraph_auth_user")
    user: BaseUser | None = None
    if auth_user_data is not None:
        if isinstance(auth_user_data, BaseUser) or hasattr(auth_user_data, "identity"):
            user = cast(BaseUser, auth_user_data)

    if assistant_id is not None or graph_id is not None or user is not None:
        return ServerInfo(
            assistant_id=str(assistant_id) if assistant_id else "",
            graph_id=str(graph_id) if graph_id else "",
            user=user,
        )
    return None

最耐人寻味的是这一行:

if isinstance(auth_user_data, BaseUser) or hasattr(auth_user_data, "identity"):

**为什么 isinstance(auth_user_data, BaseUser) 不够、要 hasattr(auth_user_data, "identity") 兜底?**源码注释给出了答案:

We prefer isinstance(BaseUser) but fall back to hasattr(“identity”) because the server’s ProxyUser provides permissions via __getattr__, which Python’s runtime_checkable Protocol check doesn’t see.

BaseUserruntime_checkableProtocol——isinstance 检查要求所有声明的属性都必须在对象的 __dict__ 或类层级上可见。但 ProxyUser(LangGraph Platform 用于跨进程传递用户的代理对象)通过 __getattr__ 动态代理属性访问——hasattr(proxy, "permissions") 返回 True(动态触发 __getattr__ 并成功返回)、但 isinstance(proxy, BaseUser) 返回 False(因为 runtime_checkable 的实现不走 __getattr__)。

解决办法就是这个 or 链:先用 isinstance 对”真正实现 BaseUser 的用户对象”做快速检查、再用 hasattr(..., "identity") 对 ProxyUser 兜底。identityBaseUser 最核心的属性(用户唯一 ID)、如果一个对象连 identity 都没有、它大概率不是用户对象。这一行体现了生产环境 Protocol 和动态代理的冲突、是分布式系统设计里常见的真实权衡。

14.5.3 BaseUser 协议

# 来自 langgraph_sdk.auth.types
class BaseUser:
    """认证用户协议,支持属性访问和字典访问"""
    identity: str  # 用户唯一标识
    # 支持 user.identity 和 user["identity"] 两种访问方式

这使得节点函数可以在有认证的环境中安全地获取用户信息:

def secure_node(state: State, runtime: Runtime) -> State:
    if runtime.server_info and runtime.server_info.user:
        user_id = runtime.server_info.user.identity
    else:
        user_id = "anonymous"
    ...

14.6 Context vs State 的本质区别

14.6.1 概念对比

这是理解 LangGraph 运行时模型的关键区分:

graph LR
    subgraph "State(状态)"
        direction TB
        S1[可变] --> S2[在节点间流动]
        S2 --> S3[被 Channel 管理]
        S3 --> S4[支持 reducer 合并]
        S4 --> S5[被 Checkpoint 持久化]
    end

    subgraph "Context(上下文)"
        direction TB
        C1[不可变] --> C2[在整个执行期间固定]
        C2 --> C3[由调用方提供]
        C3 --> C4[不参与状态管理]
        C4 --> C5[不被 Checkpoint 持久化]
    end
维度StateContext
可变性每个节点可以修改整个执行期间不变
流转方式通过 Channel 在节点间传递通过 Runtime 注入到所有节点
持久化被 Checkpoint 保存不被保存
典型内容消息列表、处理结果用户 ID、API 密钥
定义方式state_schema=Statecontext_schema=Context
传入方式graph.invoke(input)graph.invoke(input, context=ctx)

14.6.2 为什么 Context 不放在 State 中?

把运行时依赖放在 State 中存在几个问题:

  1. Checkpoint 污染:数据库连接、API 密钥不应被序列化到 checkpoint
  2. 类型混淆:状态字段应该是”数据”,而不是”工具”
  3. 安全风险:checkpoint 可能被导出或共享,敏感信息不应出现在其中
  4. 语义错误:reducer 不应该对”用户 ID”做 operator.add

Context 通过将依赖项与数据分离,彻底解决了这些问题。

14.6.3 为什么 Context 不放在 Config 中?

LangGraph 0.6.0 之前,运行时依赖通过 RunnableConfig.configurable 传递(即旧的 config_schema 参数)。这种方式有几个缺点:

  1. 类型不安全:config 是 dict[str, Any],失去了泛型类型信息
  2. API 混乱:config 的主要用途是传递 thread_id、checkpoint_id 等框架参数
  3. 嵌套访问:需要 config["configurable"]["user_id"] 这样的深层访问

context_schemaRuntime[ContextT] 提供了一流的、类型安全的替代方案:

# 旧方式(已弃用)
def my_node(state, config: RunnableConfig):
    user_id = config["configurable"]["user_id"]  # 无类型提示

# 新方式
def my_node(state, runtime: Runtime[AppContext]):
    user_id = runtime.context.user_id  # IDE 自动补全

14.7 Runtime 注入机制

14.7.1 注入链路总览

flowchart TB
    subgraph 调用层
        Caller["graph.invoke(input, context=ctx)"]
    end

    subgraph Pregel 初始化
        Caller --> CreateRT["创建 Runtime(context=ctx, store=store)"]
        CreateRT --> InjectConfig["写入 config[CONF][CONFIG_KEY_RUNTIME]"]
    end

    subgraph 任务准备
        InjectConfig --> PNT["prepare_next_tasks"]
        PNT --> PST["prepare_single_task / prepare_push_task_send"]
        PST --> Override["runtime.override(<br/>previous=...,<br/>store=...,<br/>execution_info=...)"]
        Override --> TaskConfig["写入 task config[CONF][CONFIG_KEY_RUNTIME]"]
    end

    subgraph 节点执行
        TaskConfig --> GetRT["节点接收 runtime 参数"]
        GetRT --> UseRT["runtime.context.user_id"]
    end

14.7.2 CONFIG_KEY_RUNTIME 为什么要 sys.intern

注入链路的每一环都通过 CONFIG_KEY_RUNTIME 这个字符串键访问 config。它在 _internal/_constants.py:65 定义:

CONFIG_KEY_RUNTIME = sys.intern("__pregel_runtime")

sys.intern 让 Python 把这个字符串加入全局驻留表——所有字面量相同的字符串在内存里指向同一个对象。这看似没必要(Python 对短的 ASCII 字符串会自动 intern),但加上 sys.intern 保证了无论字符串怎么来的(拼接、从 JSON 反序列化、跨模块传递)都是同一个对象。

**为什么对 Runtime key 特别做这件事?**因为每次任务准备都要做 configurable.get(CONFIG_KEY_RUNTIME)每次 dict 查找的第一步就是计算 key 的 hash——interned 字符串的 hash 是算一次缓存住的、非 interned 每次查找都重算 hash。在大型图里一次 stream 调用可能触发 10k+ 次 configurable.get、这个微优化累积起来很可观。更重要的是:intern 后的字符串比较是指针比较(identity)"__pregel_runtime" == CONFIG_KEY_RUNTIME 在 dict 内部可以 short-circuit 为 is 比较、完全不走 __eq__

LangGraph 对 _internal/_constants.py所有 CONFIG_KEY_*都做了 intern——这是一个”小优化不小”的系统设计:单次收益可忽略、乘以百万次调用就是毫秒级的差异、而实现成本就是一行 sys.intern(...)

14.7.3 Pregel 初始化阶段

当调用 graph.invoke(input, context=ctx) 时,Pregel 的 stream 方法将 context 封装到 Runtime 中,并存入配置:

# Pregel.stream 中的简化逻辑
runtime = Runtime(context=context, store=self.store)
config = patch_configurable(config, {CONFIG_KEY_RUNTIME: runtime})

14.7.4 _coerce_context:dict 到 dataclass/BaseModel 的自动转换

用户调用 graph.invoke({}, context={"user_id": "alice"}) 时、传的是 dict、但节点函数里 runtime.context.user_id 是属性访问——需要 dict 被转换成 AppContext dataclass。这个转换由 _coerce_context 完成(main.py:3691):

def _coerce_context(context_schema, context) -> ContextT | None:
    if context is None:
        return None
    if context_schema is None:
        return context
    schema_is_class = issubclass(context_schema, BaseModel) or is_dataclass(context_schema)
    if isinstance(context, dict) and schema_is_class:
        return context_schema(**context)
    return cast(ContextT, context)

四层分支:(1)None 直接返回 None——没 context 就是没 context。(2)没 context_schema 时原样返回——退化到兼容模式、用户没声明类型、框架不做任何转换。(3)dict + class schema → 调用构造函数——AppContext(**context) 会把 dict 展开成构造参数、字段名不匹配就 TypeError、字段类型不匹配(BaseModel 验证)就 ValidationError(4)其他情况原样 cast——比如用户已经传了 AppContext(...) 实例、不需要转换。

这里有一个微妙点:TypedDict 不走第三条路径——is_dataclass(TypedDict_subclass) 是 False、issubclass(td, BaseModel) 也是 False——所以走第四条”原样返回”。这是对的:TypedDict 在运行时就是 dict、节点里 runtime.context["user_id"] 是字典访问而不是属性访问、根本不需要”转换”。

这个函数把”怎么表达 Context”的自由度完全交给用户:dataclass、BaseModel、TypedDict、甚至自定义类都行。框架只在严格需要转换时介入(dict + 非 dict schema)、其他情况都尊重用户的数据形态。这种不霸道的设计让 Context 能适配任何已存在的数据模型。

14.7.5 任务准备阶段

prepare_single_task 中,框架从配置中取出 Runtime,注入任务级别的信息:

# prepare_single_task 中的 PULL 任务逻辑
runtime = cast(
    Runtime, configurable.get(CONFIG_KEY_RUNTIME, DEFAULT_RUNTIME)
)
runtime = runtime.override(
    previous=checkpoint["channel_values"].get(PREVIOUS, None),
    store=store,
    execution_info=ExecutionInfo(
        checkpoint_id=checkpoint["id"],
        checkpoint_ns=task_checkpoint_ns,
        task_id=task_id,
        thread_id=configurable.get(CONFIG_KEY_THREAD_ID),
        run_id=str(rid) if (rid := config.get("run_id")) else None,
    ),
)

每个任务都获得一个新的 Runtime 实例(因为 frozen,所以是 replace 创建的新对象),其中 execution_info 包含了该任务特有的元数据。

14.7.6 KWARGS_CONFIG_KEYS:基于签名的参数注入

节点函数可以这样声明签名:

def my_node(state: State) -> State: ...
def my_node(state: State, runtime: Runtime[Ctx]) -> State: ...
def my_node(state: State, *, store: BaseStore, writer: StreamWriter) -> State: ...
def my_node(state: State, *, config: RunnableConfig, runtime: Runtime) -> State: ...

框架怎么知道该注入哪些参数?答案在 _runnable.py:132KWARGS_CONFIG_KEYS 注册表——一组 (kwarg_name, accepted_types, runtime_attr, default) 元组:

KWARGS_CONFIG_KEYS = (
    ("config",   (RunnableConfig, "RunnableConfig", Optional[RunnableConfig], "Optional[RunnableConfig]", Parameter.empty),
                 "N/A", Parameter.empty),
    ("writer",   (StreamWriter, "StreamWriter", Parameter.empty),
                 "stream_writer", lambda _: None),
    ("store",    (BaseStore, "BaseStore", Parameter.empty),
                 "store", Parameter.empty),
    ("store",    (Optional[BaseStore], "Optional[BaseStore]"),
                 "store", None),
    ("previous", (ANY_TYPE,),
                 "previous", Parameter.empty),
    ("runtime",  (ANY_TYPE,),
                 "N/A", Parameter.empty),
)

六条里有三个精彩设计:

store 出现两次——一次是 BaseStore(无默认、若框架没提供 store 就报错)、一次是 Optional[BaseStore](默认 None、接受缺失)。这就让用户可以用声明的类型来表达”我要不要 store”:写 store: BaseStore 表示”必须有、否则是配置错误”、写 store: BaseStore | None 表示”最好有、没有我也能跑”。框架通过匹配精确的类型注解走不同分支——类型不仅是文档、还是运行时的契约

ANY_TYPE = object() 哨兵——用在 previousruntime 上。previous 的类型由 ContextT 决定(用户返回什么框架就传什么)、runtime 的类型是 Runtime[某个泛型]——这些都不能在元组里枚举完、用 ANY_TYPE 表示”不检查类型、只看名字”。检查逻辑在 _runnable.py:303if typ != (ANY_TYPE,) and p.annotation not in typ: continue——ANY_TYPE 就跳过类型检查。

"N/A" 表示 runtime_attr——configruntime 两行的 runtime_attr 都是 "N/A"。因为 config 直接来自 invoke 的参数、不从 Runtime 对象拿;runtime 则是”整个 Runtime 对象”本身。调用注入时(_runnable.py:349)对这两个有特殊分支:

if kw == "config":
    kw_value = config  # 直接用
elif runtime:
    if kw == "runtime":
        kw_value = runtime  # 注入整个对象
    else:
        kw_value = getattr(runtime, runtime_key)  # 从 runtime 拿属性

VALID_KINDS = (POSITIONAL_OR_KEYWORD, KEYWORD_ONLY) 进一步限制:只有”可按关键字传”的参数才参与注入——*args/**kwargs 那样的变参、以及 positional-only 参数都被排除。保证框架只注入到明确命名的槽位、不会意外破坏用户的变参收集逻辑。

这个注册表的设计价值在于添加新的可注入参数只改一行——比如以后要支持 tracer: Tracer、只要在元组里加一行、__init__ 里的签名检查自动扩展、不用改任何核心逻辑。是典型的数据驱动扩展

14.7.7 节点函数接收

框架通过检查节点函数的参数签名,自动注入 Runtime:

# 节点函数可以声明 runtime 参数
def my_node(state: State, runtime: Runtime[AppContext]) -> State:
    ...

# 或者通过 get_runtime() 手动获取
from langgraph.runtime import get_runtime

def my_node(state: State) -> State:
    runtime = get_runtime(AppContext)  # 返回 Runtime[AppContext]
    ...

get_runtime 函数从当前线程的配置中提取 Runtime:

def get_runtime(context_schema: type[ContextT] | None = None) -> Runtime[ContextT]:
    # 源码注释说明:理想情况下 runtime 应该有独立于 config 的
    # context manager;这依赖未来移除 configurable packing。
    runtime = cast(Runtime[ContextT], get_config()[CONF].get(CONFIG_KEY_RUNTIME))
    return runtime

注意源码里保留的待办注释——这透露了一个真实的设计张力:理想情况下 Runtime 应该有独立的 ContextVar/contextvars 管理、而不是塞在 config 里contextvars 是 Python 3.7+ 的标准异步感知的上下文栈、asyncio 任务切换时自动跟着走;而现在 Runtime 靠 config 传递、每次 patch_configurable 都要拷贝整个 CONF dict——是历史包袱

未来一旦 LangGraph 彻底移除 “configurable packing”(把运行时上下文从 RunnableConfig 里剥离),get_runtime 就会变成 CONTEXT_VAR.get() 这样的一行——更快、更干净、线程/协程安全。但现在为了兼容 LangChain Core 的 config 语义、还得用这种借道 config 的次优实现。这种维护者注释是一封”代码里的来信”、告诉后来者”这里还有一步要走”。

context_schema 参数其实完全没被使用——它的唯一作用是类型推导:帮 IDE 和 mypy 知道返回值是 Runtime[AppContext] 而不是 Runtime[Any]。运行时这个参数被忽略、因为 Runtime 对象里并没有存它的 schema 类型(类型信息在运行时已被擦除)。这是典型的类型幽灵参数(phantom type parameter)——只为了让静态类型系统工作、运行时无用。

14.7.8 stream_writer 的三分支选择:custom/CONFIG_KEY_STREAM/noop

Runtime 的 stream_writer 字段不是静态的——在 main.py:2628根据当前的流模式动态选择

if "custom" in stream_modes:
    def stream_writer(c: Any) -> None:
        stream.put(
            (
                tuple(get_config()[CONF][CONFIG_KEY_CHECKPOINT_NS].split(NS_SEP)[:-1]),
                "custom",
                c,
            )
        )
elif CONFIG_KEY_STREAM in config[CONF]:
    stream_writer = config[CONF][CONFIG_KEY_RUNTIME].stream_writer
else:
    def stream_writer(c: Any) -> None:
        pass

三种情况三种 writer:

① 用户传了 stream_mode="custom"——构造一个带命名空间的 writer:从 CONFIG_KEY_CHECKPOINT_NS 读出当前子图的命名空间、把写入的内容包装成 (namespace, "custom", content) 三元组、push 到 stream 通道。用户在 graph.stream(input, stream_mode="custom") 就能收到这些内容、并通过命名空间区分是哪个子图发出的。

② 当前是子图、且父图已经设置了 CONFIG_KEY_STREAM——直接复用父图的 stream_writer(父图的 config[CONF][CONFIG_KEY_RUNTIME].stream_writer)。这样子图通过 runtime.stream_writer(data) 写的内容、会沿着父图的流式管道一路冒上去、用户在顶层 stream 循环里就能看到。

③ 兜底(没有 custom 模式、也不是子图)——定义一个什么都不做的 lambda(注意这个是每次 stream 都新建的、不是 _no_op_stream_writer 那个全局哨兵)。这种情况下节点写 runtime.stream_writer(x) 完全没效果、x 静默丢弃。

这三个分支的优先级非常讲究——custom 模式显式表达了”用户要自己定义流输出”、优先级最高;没 custom 时退回到子图继承父图的流;再没有就 noop。永远不是”自动把所有 stream_writer 输出都发到默认 stream”——因为那会把 custom stream 和 debug stream 混在一起、用户无法区分。

注意 ② 里走的是 parent_runtime.stream_writer、不是 parent_runtime 本身——但这个 stream_writer 已经由父图按同样逻辑选择过了(可能是 parent 的 custom、可能是 parent 的 parent 的 custom……)。这样任意深度的子图嵌套、最终所有 runtime.stream_writer 调用都汇聚到根图的 stream 通道——除非中间某一层用 custom 模式切出了新的流。

14.7.9 async 路径的对称性与差异(main.py:3016

Sync 路径是 Pregel.stream、async 路径是 Pregel.astreammain.py:3016)——两边对 Runtime 的处理几乎完全对称:

# async 路径(main.py:3027-3039)
parent_runtime = config[CONF].get(CONFIG_KEY_RUNTIME, DEFAULT_RUNTIME)
server_info = _build_server_info(config, parent_runtime)

runtime = Runtime(
    context=_coerce_context(self.context_schema, context),
    store=store,
    stream_writer=stream_writer,
    previous=None,
    execution_info=None,
    server_info=server_info,
)
runtime = parent_runtime.merge(runtime)
config[CONF][CONFIG_KEY_RUNTIME] = runtime

和 sync 版本一字不差——同样的 _coerce_context 、同样的 _build_server_info、同样的 parent_runtime.merge(runtime) 顺序。为什么不抽成共享函数?——因为 sync/async 路径还有很多其他差异(SyncPregelLoop vs AsyncPregelLoop、stream 的构造方式、callback 管理器类型)、强行抽公共函数会牵连这些差异、反而让代码难懂。两边平行复制 13 行、是工程上的”冗余换清晰”——以可读性为代价换取两个流程各自闭合、修改时不需要担心”改了 sync 会不会打破 async”。

这是开源 Python 框架里常见的 sync/async 二元性处理——LangChain Core 也是同样风格(每个 Runnable 都有 invoke/ainvoke、stream/astream 两套对称实现)。接受这种重复、而不是强行用装饰器/元类消除它、是因为调试成本:抽象层每多一层、异步死锁或 task 丢失的排查都会多一层间接。

14.8 DEFAULT_RUNTIME 与空操作

DEFAULT_RUNTIME = Runtime(
    context=None,
    store=None,
    stream_writer=_no_op_stream_writer,
    previous=None,
    execution_info=None,
)

DEFAULT_RUNTIME 是当没有显式提供 context 时使用的默认值。它的所有字段都是”空”或”无操作”的,确保节点代码在没有 context 的情况下也能安全运行。

def _no_op_stream_writer(_: Any) -> None: ...

空操作的 stream_writer 意味着节点调用 runtime.stream_writer(data) 不会产生任何效果——数据会被静默丢弃。这个设计让节点代码不需要检查 runtime 是否”可用”。

14.8.1 DEFAULT_RUNTIME 是模块级单例、不是每次新建

仔细看 runtime.py:221

DEFAULT_RUNTIME = Runtime(
    context=None,
    store=None,
    stream_writer=_no_op_stream_writer,
    previous=None,
    execution_info=None,
)

这是一个模块级的全局单例——整个进程里所有”缺省 runtime”都共享这同一个对象。结合 Runtime 是 frozen 的、这个共享完全安全。

为什么做成单例、而不是每次 configurable.get(CONFIG_KEY_RUNTIME, Runtime()) 临时建?——因为 _algo.py 里的 prepare_single_task 里几乎每个任务都会走 configurable.get(CONFIG_KEY_RUNTIME, DEFAULT_RUNTIME)。如果每次都现建、一个大图的一次 step 会有成百上千次无谓的 Runtime 构造——哪怕 frozen + slots 已经把单次构造压到 ~100ns、累积也是 100μs 量级的浪费。单例让”缺省分支”免费——dict.get 的第二参数只是指针、没有任何构造开销。

但单例也带来一个陷阱:永远不要改 DEFAULT_RUNTIME 本身。不过这不是风险——Runtime 是 frozen 的、想改也改不了。这就是 frozen dataclass 单例的设计协同效应——不可变性让单例安全、单例让构造免费、两者相辅相成。

注意 DEFAULT_RUNTIME 没有 server_info 字段——它只写了 5 个字段、server_info 用了默认值 None(dataclass 的 field(default=None))。这意味着开源本地运行时、runtime.server_info 从头到尾都是 None、节点里任何 if runtime.server_info: 判断都会短路到 else 分支——这就是”开源和 Platform 共用同一个 Runtime 类型、但在字段值上分化”的实现方式。

14.8.2 _no_op_stream_writer 哨兵 vs 内联 lambda

§14.2.3 提到 mergestream_writeris not _no_op_stream_writer 哨兵判断。这个哨兵的唯一身份依据是函数对象的 id——runtime.py:77 定义的那个具体函数。

stream_writer 的第三分支(§14.7.8 ③)用的不是这个哨兵、而是内联 lambda

else:
    def stream_writer(c: Any) -> None:
        pass

每次 Pregel 启动都现建一个新的 pass-lambda、和 _no_op_stream_writer 不是同一个对象。这是有意为之——如果内联 lambda 是 _no_op_stream_writer 本身、那么 parent_runtime.merge(new_runtime) 时、new_runtime 的 stream_writer 会被当成”未提供”、被 parent_runtime 的老 writer 覆盖——完全违背了 Pregel 初始化”用新 writer”的意图。

内联 lambda 让 parent_runtime.merge 正确地把新建的 pass-lambda当成”已提供的 writer”、覆盖 parent_runtime 的老 writer——哪怕这个 lambda 实际上什么都不做。这就是**“语义上相同 ≠ 代码上相同”的微妙——两个 pass-lambda 的行为一样(都是 noop)、但身份**不同、所以 merge 会走不同分支。

这个设计细节体现了 LangGraph 对 is identity 判定的严格运用——身份不是多余信息、它承载着”这是框架默认值”还是”这是用户或运行时设置的值”的语义区分

14.8.3 previous 字段与函数式 API:为什么只在 checkpointer 存在时才有意义

previous 是 6 个字段里最特殊的一个——只有在函数式 API@entrypoint 装饰器)且配置了 checkpointer 的情况下才有值。普通的 StateGraph 节点里它永远是 None。

_algo.py:670 的赋值:

runtime = runtime.override(
    previous=checkpoint["channel_values"].get(PREVIOUS, None),
    store=store,
    execution_info=ExecutionInfo(...),
)

previouscheckpoint["channel_values"] 的特殊键 PREVIOUS 里读——这个键只在函数式 API 的 @entrypoint 返回时被写入。普通 StateGraph 的 checkpoint 里没有 PREVIOUS 键、get(..., None) 返回 None。

为什么需要 previous?——函数式 API 允许这样写:

@entrypoint(checkpointer=MemorySaver())
def my_flow(input_data, *, previous=None):
    count = (previous or {}).get("count", 0) + 1
    return {"count": count}

previous上一次整个 entrypoint 函数的返回值——这就把”函数式累积”做成了自动化:每次调用 my_flow 都能拿到上次的返回值、不用手动维护状态字段。这比 StateGraph 的 channel 机制更轻量——对一些”计数器”、“对话轮次”类的简单状态、函数式 API + previous 就够了。

previous 和 StateGraph 的 State 的关键区别是:State 是在节点间流动、previous 是在调用间流动。State 的生命周期是”一次 invoke 内”、previous 的生命周期是”同一个 thread 的跨 invoke”——时间尺度完全不同。

这也解释了为什么 merge 对 previous 用 is None 精确检测(§14.2.3):函数式 API 的用户可能返回 0/""/False 当作有意义的状态、不能用 or 把它们和”未设置”混为一谈。这个严格判定直接服务于 previous 的独特语义——在 6 个字段里、previous 是唯一可能承载普通 Python 值(而不是”对象或 None”)的字段。

14.9 子图中的 Runtime 传播

14.9.1 merge 语义

当执行进入子图时,子图可能有自己的 store 和 context。Runtime 的 merge 方法用于合并父图和子图的 Runtime:

def merge(self, other: Runtime[ContextT]) -> Runtime[ContextT]:
    return Runtime(
        context=other.context or self.context,        # 子图优先
        store=other.store or self.store,              # 子图优先
        stream_writer=other.stream_writer             # 子图优先
            if other.stream_writer is not _no_op_stream_writer
            else self.stream_writer,
        previous=self.previous if other.previous is None else other.previous,
        execution_info=other.execution_info or self.execution_info,
        server_info=other.server_info or self.server_info,
    )

合并策略是”子图覆盖父图”——如果子图提供了自己的 context,则使用子图的;否则继承父图的。

14.9.2 传播示意

graph TB
    subgraph 父图
        PR["Runtime[ParentCtx]<br/>context=ParentCtx(...)"]
        PR --> N1[Node A]
        PR --> SubEntry[子图入口]
    end

    subgraph 子图
        SubEntry --> Merge["merge(parent_rt, child_rt)"]
        Merge --> CR["Runtime[ChildCtx]<br/>context 继承或覆盖"]
        CR --> N2[Node B]
        CR --> N3[Node C]
    end

14.9.3 合并顺序决定的语义反转:一个真实的 bug 模式

§14.12 已经点出 parent_runtime.merge(runtime) 的顺序不能反、但在真实代码审查里还是经常看到有人写反——因为”合并”这个动词在中文和英文语境里都对称、没有方向直觉。一个真实的思维实验:

# 错误写法
runtime = runtime.merge(parent_runtime)  # 新 runtime 反被旧覆盖

后果:子图本次调用传入的 context/store 被父图传下来的老 context/store 吃掉。用户在 debug 时会看到:

  • 子图函数里 runtime.context 不是自己刚传的那个、而是父图的那个
  • 重启进程后问题消失(因为没有”父图”、parent_runtime 是 DEFAULT_RUNTIME、merge 顺序错误的负面效果不明显)
  • 只在嵌套子图场景里复现、难以最小化

LangGraph 在源码里用单侧调用和**# build server_info from metadata + parent runtime** 这样的注释来强化正确顺序——把”parent_runtime”放在主语位置、明确它是”底座”而不是”增量”。这是通过命名和结构**让错误代码”看起来明显不对”**的技巧——不是靠运行时检测、而是靠阅读时的语感。

14.10 设计决策

14.10.1 为什么 Runtime 是 frozen 的?

frozen dataclass 带来三个好处:

  1. 线程安全:并发执行的多个节点读取同一个 Runtime 实例时不会发生数据竞争
  2. 语义正确性:context 代表”不变的运行时依赖”,frozen 在类型层面强制了这个语义
  3. 可哈希性:frozen dataclass 默认可哈希,便于缓存和去重

14.10.2 为什么 execution_info 在 Runtime 中而不是单独注入?

execution_info 放在 Runtime 中而非作为独立参数注入,有两个原因:

  1. 减少参数数量:节点函数只需要一个 runtime 参数就能访问所有运行时信息
  2. 一致的生命周期:所有运行时信息在同一个对象中创建和传递,生命周期一致

14.10.3 ToolRuntime 与 Runtime 的关系

langgraph.prebuilt 中还有一个 ToolRuntime 类,它是专为工具函数设计的:

class ToolRuntime(_DirectlyInjectedToolArg, Generic[ContextT, StateT]):
    """Runtime context automatically injected into tools."""
    context: ContextT     # 与 Runtime 共享
    store: BaseStore      # 与 Runtime 共享
    stream_writer: StreamWriter  # 与 Runtime 共享
    config: RunnableConfig       # 工具特有
    state: StateT                # 工具特有
    tool_call_id: str            # 工具特有

ToolRuntime 共享了 Runtime 的 contextstorestream_writer 字段,但增加了工具特有的 configstatetool_call_id。它们之间的关系是互补而非继承——Runtime 服务于节点,ToolRuntime 服务于工具。

14.11 实战:完整的 Runtime 使用案例

14.11.1 多租户 Agent 系统

以下示例展示了如何使用 Runtime 构建一个支持多租户的 Agent 系统,每个用户有独立的数据隔离和权限控制:

from dataclasses import dataclass
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.runtime import Runtime
from langgraph.store.memory import InMemoryStore

@dataclass
class TenantContext:
    """多租户上下文"""
    tenant_id: str
    user_id: str
    role: str  # "admin" | "editor" | "viewer"
    db_connection_string: str

class AgentState(TypedDict, total=False):
    messages: list
    response: str

store = InMemoryStore()

def access_control_node(state: AgentState, runtime: Runtime[TenantContext]) -> dict:
    """访问控制节点:根据角色决定权限"""
    ctx = runtime.context
    info = runtime.execution_info

    # 记录审计日志到 Store
    if runtime.store:
        runtime.store.put(
            ("audit", ctx.tenant_id),
            f"access_{info.task_id}",
            {
                "user": ctx.user_id,
                "role": ctx.role,
                "action": "query",
                "thread_id": info.thread_id,
            }
        )

    if ctx.role == "viewer":
        return {"response": "You have read-only access."}
    return state

def process_node(state: AgentState, runtime: Runtime[TenantContext]) -> dict:
    """业务处理节点:使用租户隔离的数据"""
    ctx = runtime.context

    # 从租户命名空间读取配置
    if runtime.store:
        config = runtime.store.get(("tenants", ctx.tenant_id), "config")
        model_name = config.value["model"] if config else "default-model"
    else:
        model_name = "default-model"

    # 流式输出处理进度
    runtime.stream_writer({"status": "processing", "model": model_name})

    return {"response": f"Processed by {model_name} for tenant {ctx.tenant_id}"}

graph = (
    StateGraph(state_schema=AgentState, context_schema=TenantContext)
    .add_node("access_control", access_control_node)
    .add_node("process", process_node)
    .add_edge(START, "access_control")
    .add_edge("access_control", "process")
    .add_edge("process", END)
    .compile(store=store)
)

# 不同租户使用不同的 context
result = graph.invoke(
    {"messages": ["Hello"]},
    context=TenantContext(
        tenant_id="acme",
        user_id="alice",
        role="admin",
        db_connection_string="postgresql://acme:...",
    )
)

14.11.2 _retry.py 里的三阶段注入:first_attempt_time 只设一次、node_attempt 每轮更新

第 7 章讲过 RetryPolicy 的重试循环、这里专门看它对 Runtime 做了什么。run_with_retry_retry.py:86)的结构是:

def run_with_retry(task, retry_policy, configurable=None):
    ...
    node_first_attempt_time = time.time()     # ① 入口记一次时间
    ...
    runtime = config.get(CONF, {}).get(CONFIG_KEY_RUNTIME)
    if isinstance(runtime, Runtime):
        runtime = _ensure_execution_info(runtime, config, task)
        config = patch_configurable(
            config,
            {CONFIG_KEY_RUNTIME: runtime.patch_execution_info(
                node_first_attempt_time=node_first_attempt_time,  # ② 只在入口打一次
            )},
        )
    while True:                                # ③ 每轮循环
        runtime = config.get(CONF, {}).get(CONFIG_KEY_RUNTIME)
        if isinstance(runtime, Runtime):
            config = patch_configurable(
                config,
                {CONFIG_KEY_RUNTIME: runtime.patch_execution_info(
                    node_attempt=attempts + 1,  # ④ 每轮更新尝试次数
                )},
            )
        try:
            ...
            return task.proc.invoke(task.input, config)
        except Exception:
            attempts += 1
            ...
            time.sleep(sleep_time)

第一次 patch(入口、行 104):设置 node_first_attempt_time——这是整个重试会话的首次尝试时间戳、后续所有重试都看到同一个值。这能让节点逻辑判断”我已经在重试区间里花了多长时间”、做超时熔断。比如 if time.time() - runtime.execution_info.node_first_attempt_time > 60: raise——即使 RetryPolicy 设置了无限重试、节点自己也能在 60 秒后果断投降。

第二次 patch(循环内、行 118):每次重试前把 node_attempt 更新为 attempts + 1(1-indexed)。第一次进入循环 attempts = 0node_attempt = 1;每次失败后 attempts += 1、下一轮 node_attempt = 2/3/...。这样节点可以在代码里根据尝试次数降级

def my_node(state, runtime):
    attempt = runtime.execution_info.node_attempt
    if attempt == 1:
        return call_fast_model()    # 首次尝试用快模型
    elif attempt == 2:
        return call_slow_model()    # 重试用稳模型
    else:
        return return_cached()      # 再不行就返回缓存

这两个字段分开设置、而不是一次 patch 两个——是因为它们的变化频率不同node_first_attempt_time 只变一次(进入 retry 时)、node_attempt 每轮变一次。拆开后、两次 patch 各自只携带真正需要变的字段、patch_execution_info 内部的 replace 也只复制变化的 slot、无谓的重建开销降到最低。

这个小细节体现了**“不可变数据结构 + 精细 patch”**的设计哲学:frozen 保证安全、patch 保证效率、每次 replace 只动必须动的那一个 slot。

14.11.3 Runtime 在重试场景中的行为

当节点配置了 RetryPolicy 时,Runtime 的 execution_info 会在每次重试中更新:

def flaky_node(state: AgentState, runtime: Runtime) -> dict:
    info = runtime.execution_info
    print(f"Attempt {info.node_attempt}")  # 1, 2, 3...

    if info.node_attempt == 1:
        raise ConnectionError("Temporary failure")

    # 第二次尝试成功
    return {"response": "Success after retry"}

框架会通过 runtime.patch_execution_info(node_attempt=2) 创建新的 Runtime 副本,传递给重试的执行。

14.12 parent_runtime.merge 在 stream 入口的真实调用

§14.9 介绍了 merge 的三种未提供判定、这里看它在主流程里被怎么用Pregel.stream 初始化 Runtime 时(main.py:2657):

# build server_info from metadata + parent runtime
parent_runtime = config[CONF].get(CONFIG_KEY_RUNTIME, DEFAULT_RUNTIME)
server_info = _build_server_info(config, parent_runtime)

runtime = Runtime(
    context=_coerce_context(self.context_schema, context),
    store=store,
    stream_writer=stream_writer,
    previous=None,
    execution_info=None,
    server_info=server_info,
)
runtime = parent_runtime.merge(runtime)
config[CONF][CONFIG_KEY_RUNTIME] = runtime

三步:(1)从 config 里拿 parent_runtime——如果当前是子图、父图已经塞过 Runtime、拿出来用;如果是根图、configurable.get 返回 DEFAULT_RUNTIME(2)构造子图本次的 Runtime——context 通过 _coerce_context 规范化、store/stream_writer 来自调用参数、server_info 由 _build_server_info 从 metadata 重建。(3)merge 父子 Runtime——关键在这行、parent_runtime.merge(runtime) 而不是 runtime.merge(parent_runtime)顺序不能反

回看 merge 的实现(§14.2.3)、other 是优先的、self 是 fallback。parent_runtime.merge(runtime) 读作”以 parent_runtime 为底、用新 runtime 覆盖”——新 runtime 提供的字段会覆盖 parent、没提供的从 parent 继承。这是子图继承父图上下文的语义:子图调用方没传 context、就用父图的;子图调用方传了新 store、就用新的。

如果反过来写 runtime.merge(parent_runtime)、语义就变成”用 parent_runtime 覆盖新 runtime”——父图的老 context 会把子图调用方刚设好的新 context 覆盖掉、子图调用完全失去了传 context 的能力。参数顺序决定语义方向、两个看似对称的名字背后是严格的左右区分。这就是 API 设计里”动词 + 施事 + 受事”的微妙——a.merge(b)a 是旧态、b 是增量、b 胜。

14.13 本章与全书体系的呼应

与第 7 章(重试与错误处理)的呼应——本章展示了 patch_execution_info 是如何在 run_with_retry 的每一轮循环里被调用、把 node_attempt 递增注入节点。第 7 章讲的是”重试怎么发生”(何时重试、怎么退避、何时放弃),本章补齐了”重试怎么被节点感知”——节点通过 runtime.execution_info.node_attempt 拿到第几次尝试的信息、能做自适应降级。这两章叠在一起就是”重试”这件事的完整闭环。

与第 15 章(Store)的呼应——runtime.store 是 LangGraph 里跨线程持久化的唯一入口。本章只讲了 “store 通过 Runtime 传给节点”、第 15 章展开讲”store 本身怎么工作、BaseStore 4 个操作、AsyncBatchedBaseStore 的 drain”。Runtime 和 Store 是依赖注入关系——Runtime 是”依赖载体”、Store 是”依赖内容”。

与第 16 章(Prebuilt 与工具)的呼应——§14.10.3 提到 ToolRuntimeRuntime 的姊妹类、第 16 章详解 ToolRuntime 的 8 个字段(context/store/stream_writer 共享、config/state/tool_call_id 独有、previous/execution_info 都暴露)和 _inject_tool_args 如何把它注入到工具调用。Runtime 是节点的依赖容器、ToolRuntime 是工具的依赖容器——两者互补、共享基础字段但针对不同的调用场景扩展。

与第 17 章(Multi-Agent)的呼应——多 Agent 系统里每个 Agent 可以有自己的 Context(不同的 API key、不同的权限)、都通过 Runtime 传递。第 17 章讲的 Send/Command 是流程控制、本章的 Runtime 是运行时依赖——两者组合起来就是”一个 Agent 知道自己是谁(Runtime.context)、也知道该去哪(Send/Command)”。

跨书呼应:和 hyper-tower 的 HTTP 请求上下文——第 12 章讲 hyper 的 Dispatcher 如何在 poll_read/poll_write 之间传递请求状态。Runtime 在 LangGraph 里的角色和 hyper 里的 Conn 状态机异曲同工——都是执行期间不可变的会话级上下文、承载跨步骤共享的信息。不同之处在于 hyper 的状态机是同步可变的、LangGraph 的 Runtime 是 frozen 的——体现了异步并发世界单线程事件循环世界对”可变性”的不同偏好。

14.14 小结

本章深入分析了 LangGraph 的 Runtime 与 Context 机制。Runtime[ContextT] 通过泛型类型参数将运行时依赖注入从一个”约定”提升为一个”类型安全的协议”。六个字段——contextstorestream_writerpreviousexecution_infoserver_info——覆盖了节点函数可能需要的全部运行时信息。frozen 语义确保了并发安全,overridemerge 方法提供了不可变更新的能力。

Context 与 State 的分离是 LangGraph 架构中的关键决策:State 是”随步骤变化的数据”,通过 Channel 管理和 Checkpoint 持久化;Context 是”整个执行期间不变的依赖”,通过 Runtime 注入且不被持久化。这种分离让状态管理更纯粹,同时为敏感信息(如 API 密钥)提供了安全的传递通道。

下一章我们将探讨 BaseStore 接口和 InMemoryStore 实现,了解 LangGraph 如何提供跨线程的长期记忆能力。