LangGraph 设计与实现

第13章 流式输出与调试

作者 杨艺韬 · 11,771 字

第13章 流式输出与调试

13.1 引言

在构建 LLM 应用时,流式输出(streaming)不仅是用户体验的刚需,更是生产系统的可观测性基石。用户期望看到 AI 回复的”打字机效果”,而开发者需要实时监控图执行的每一步——哪个节点正在运行、状态如何变化、中间结果是什么。LangGraph 通过七种 StreamMode 和精心设计的 StreamPart 类型体系,提供了从粗粒度到细粒度的完整流式输出方案。

LangGraph 1.1.6 引入了 v2 Stream API,它将所有流式数据统一为带类型标签的 StreamPart 字典,���底解决了 v1 中多种 stream_mode 混合使用时类型模糊的问题。v2 API 使得每个流式事件都是自描述的:你可以通过 part["type"] 直接判断事件类型,无需依赖上下文推断。

本章将从 StreamMode 的枚举定义出发,逐一分析七种模式的语义和实现,深入 StreamProtocolStreamMessagesHandler 的源码,揭示流式输出在 Pregel 循环中的注入点和数据流转路径。

本章要点

  1. 七种 StreamMode 的语义区别——values/updates/custom/messages/checkpoints/tasks/debug
  2. v2 Stream API 的 StreamPart 类型体系——带类型标签的联合类型
  3. StreamWriter 的注入机制——如何在节点中发射自定义流式数据
  4. StreamMessagesHandler 的回调实现——LLM token 级别的流式捕获
  5. StreamProtocol 在 Pregel 循环中的集成方式

13.2 StreamMode 枚举

13.2.1 七种模式定义

StreamMode 定义在 langgraph/types.py 中,是一个字面量联合类型:

StreamMode = Literal[
    "values", "updates", "checkpoints", "tasks", "debug", "messages", "custom"
]

每种模式对应一种输出粒度和视角:

StreamMode输出内容触发时机典型消费者
values完整状态快照每个超步结束后前端展示完整状态
updates节点名 + 该节点的输出每个节点执行后调试、日志
custom用户自定义数据节点内调用 StreamWriter进度条、中间结果
messagesLLM 消息 tokenLLM 流式生成时打字机效果
checkpoints检查点快照检查点创建时持久化监控
tasks任务开始/结束事件任务生命周期变化执行监控
debugcheckpoints + tasks同上两者开发调试

13.2.2 模式组合

stream 方法支持同时订阅多种模式:

# v1 API:多模式返回元组
for mode, data in graph.stream(input, stream_mode=["values", "messages"]):
    if mode == "values":
        print(f"State: {data}")
    elif mode == "messages":
        print(f"Token: {data}")

# v2 API:统一的 StreamPart
for part in graph.stream(input, version="v2"):
    if part["type"] == "values":
        print(f"State: {part['data']}")
    elif part["type"] == "messages":
        msg, meta = part["data"]
        print(f"Token: {msg.content}")

13.3 StreamPart 类型体系

13.3.1 v2 API 的核心创新

v2 Stream API 将每个流式事件包装为一个带 type 字段的 TypedDict,使得类型判别成为可能:

StreamPart = TypeAliasType(
    "StreamPart",
    ValuesStreamPart[OutputT]
    | UpdatesStreamPart
    | MessagesStreamPart
    | CustomStreamPart
    | CheckpointStreamPart[StateT]
    | TasksStreamPart
    | DebugStreamPart[StateT],
    type_params=(StateT, OutputT),
)

每个具体的 StreamPart 类型都包含三个公共字段:

classDiagram
    class StreamPart {
        <<TypedDict>>
        +type: str
        +ns: tuple[str, ...]
        +data: Any
    }

    class ValuesStreamPart {
        +type: "values"
        +ns: tuple[str, ...]
        +data: OutputT
        +interrupts: tuple[Interrupt, ...]
    }

    class UpdatesStreamPart {
        +type: "updates"
        +ns: tuple[str, ...]
        +data: dict[str, Any]
    }

    class MessagesStreamPart {
        +type: "messages"
        +ns: tuple[str, ...]
        +data: tuple[AnyMessage, dict]
    }

    class CustomStreamPart {
        +type: "custom"
        +ns: tuple[str, ...]
        +data: Any
    }

    class TasksStreamPart {
        +type: "tasks"
        +ns: tuple[str, ...]
        +data: TaskPayload | TaskResultPayload
    }

    class CheckpointStreamPart {
        +type: "checkpoints"
        +ns: tuple[str, ...]
        +data: CheckpointPayload
    }

    class DebugStreamPart {
        +type: "debug"
        +ns: tuple[str, ...]
        +data: DebugPayload
    }

    StreamPart <|-- ValuesStreamPart
    StreamPart <|-- UpdatesStreamPart
    StreamPart <|-- MessagesStreamPart
    StreamPart <|-- CustomStreamPart
    StreamPart <|-- TasksStreamPart
    StreamPart <|-- CheckpointStreamPart
    StreamPart <|-- DebugStreamPart

13.3.2 ns 字段:命名空间追踪

每个 StreamPart 都包含 ns(namespace)字段,它是一个字符串元组,标识事件来自图的哪个层级:

  • ():来自顶层图
  • ("subgraph_name",):来自名为 subgraph_name 的子图
  • ("outer", "inner"):来自嵌套两层的子图

这使得在启用 subgraphs=True 时,消费者可以精确识别每个事件的来源。

13.3.3 ValuesStreamPart

class ValuesStreamPart(TypedDict, Generic[OutputT]):
    type: Literal["values"]
    ns: tuple[str, ...]
    data: OutputT
    interrupts: tuple[Interrupt, ...]

values 模式在每个超步结束后发射完整的状态快照。data 的类型与图的输出类型一致。interrupts 字段记录了该步中发生的中断——这是 v2 API 特有的增强,v1 中中断信息需要通过 __interrupt__ 键间接获取。

13.3.4 UpdatesStreamPart

class UpdatesStreamPart(TypedDict):
    type: Literal["updates"]
    ns: tuple[str, ...]
    data: dict[str, Any]

updates 模式在每个节点执行后发射该节点的输出。data 是一个字典,键是节点名,值是节点返回的更新。如果多个节点在同一超步并行执行,每个节点的更新会单独发射。

13.3.5 MessagesStreamPart

class MessagesStreamPart(TypedDict):
    type: Literal["messages"]
    ns: tuple[str, ...]
    data: tuple[AnyMessage, dict[str, Any]]

messages 模式捕获 LLM 的 token 级流式输出。data 是一个二元组:消息对象(通常是 AIMessageChunk)和元数据字典。元数据包含 langgraph_steplanggraph_nodelanggraph_triggers 等上下文信息。

13.3.6 CustomStreamPart

class CustomStreamPart(TypedDict):
    type: Literal["custom"]
    ns: tuple[str, ...]
    data: Any

custom 模式承载用户通过 StreamWriter 发射的任意数据。这是最灵活的模式——你可以用它发送进度百分比、中间计算结果、或任何自定义结构。

13.4 StreamWriter 注入机制

13.4.1 定义与签名

StreamWriter = Callable[[Any], None]

StreamWriter 的类型定义极为简洁——它就是一个接收任意参数、无返回值的可调用对象。它总是作为关键字参数注入到节点函数中:

def my_node(state: State, writer: StreamWriter) -> dict:
    writer({"progress": 0.5})  # 发射自定义流式数据
    # ... 执行业务逻辑 ...
    writer({"progress": 1.0})
    return {"result": "done"}

在 LangGraph 1.1.6 中,StreamWriter 也通过 Runtime 对象提供:

def my_node(state: State, runtime: Runtime) -> dict:
    runtime.stream_writer({"progress": 0.5})
    return {"result": "done"}

13.4.2 注入流程

StreamWriter 的注入发生在任务准备阶段。当创建 PregelExecutableTask 时,框架会检查节点函数的参数签名,如果发现 writerstream_writer 参数,就注入一个绑定到当前流式管道的写入器:

sequenceDiagram
    participant Node as 节点函数
    participant SW as StreamWriter
    participant SP as StreamProtocol
    participant Queue as 输出队列

    Node->>SW: writer({"progress": 0.5})
    SW->>SP: __call__((ns, "custom", data))
    SP->>Queue: 写入 StreamChunk
    Queue-->>Client: 消费者读取

13.4.3 无订阅时的空操作

当调用方没���订阅 custom 模式时,StreamWriter 会变成一个空操作(no-op):

def _no_op_stream_writer(_: Any) -> None: ...

这个设计确保了节点代码不需要检查 stream_mode 是否包含 "custom"——写入操作总是安全的,只是在没有订阅者时数据会被静默丢弃。

13.5 StreamProtocol 与 StreamChunk

13.5.1 StreamProtocol

StreamProtocol 是流式输出的传输层抽象,定义在 langgraph/pregel/protocol.py 中:

StreamChunk = tuple[tuple[str, ...], str, Any]
# (命名空间元组, 模式字符串, 数据)

class StreamProtocol:
    __slots__ = ("modes", "__call__")

    modes: set[StreamMode]
    __call__: Callable[[Self, StreamChunk], None]

    def __init__(
        self,
        __call__: Callable[[StreamChunk], None],
        modes: set[StreamMode],
    ) -> None:
        self.__call__ = cast(Callable[[Self, StreamChunk], None], __call__)
        self.modes = modes

StreamProtocol 只有两个属性:

  • modes:订阅的模式集合,用于过滤
  • __call__:接收 StreamChunk 的回调函数

13.5.1.5 __slots__ 优化:StreamProtocol 的内存布局

StreamProtocol.__slots__ = ("modes", "__call__") 这一行——给一个类加 slots

class StreamProtocol:
    __slots__ = ("modes", "__call__")
    modes: set[StreamMode]
    __call__: Callable[[Self, StreamChunk], None]

Python __slots__ 的作用——禁止动态加属性、且用紧凑的 C-array 代替 __dict__ 存储字段。

内存节省——没 slots 的对象每个要存一个 __dict__(在 CPython 里约 56 字节初始大小)——加了 slots 节省这 56 字节。StreamProtocol 在一次 streaming 中可能创建上千个实例(每个子图一个 + DuplexStream 包装)——节省的累积是 MB 级。

性能收益——属性访问 slots 比 dict 快——从 __dict__ 查找是 hash + 线性探测(~5-10 ns)、slots 是直接数组索引(<1 ns)。stream.modes in ... 这种 check 在热路径上——每个 token 都要 check 一次——节省的累积显著。

为什么把 __call__ 作为 slot?——因为 StreamProtocol 要像函数一样调用stream(chunk))——Python 的 call 协议会找 type(obj).__call__。通过把 __call__ 作为 instance 属性、每个 StreamProtocol 实例可以有自己的 callable——和闭包一起、实现动态绑定的 stream backend。

这是 Python 里少有的”slots 获得 C++ 级别性能 的地方——LangGraph 在这种高频创建的小对象上毫不含糊地用 slots 优化。

13.5.2 DuplexStream

在子图场景中,子图的流式输出需要同时发送到子图自己的消费者和父图的流式管道。DuplexStream 实现了这种多路复用:

def DuplexStream(*streams: StreamProtocol) -> StreamProtocol:
    def __call__(value: StreamChunk) -> None:
        for stream in streams:
            if value[1] in stream.modes:  # value[1] 是模式字符串
                stream(value)

    return StreamProtocol(__call__, {mode for s in streams for mode in s.modes})
graph LR
    Node[节点输出] --> Duplex[DuplexStream]
    Duplex -->|"mode in modes_A"| StreamA[子图 Stream]
    Duplex -->|"mode in modes_B"| StreamB[父图 Stream]

13.5.2.5 DuplexStream 的闭包 + 过滤合并

DuplexStream 实际实现(streams=多个 StreamProtocol):

def DuplexStream(*streams: StreamProtocol) -> StreamProtocol:
    def __call__(value: StreamChunk) -> None:
        for stream in streams:
            if value[1] in stream.modes:  # 按每个 stream 的 modes 过滤
                stream(value)

    return StreamProtocol(__call__, {mode for s in streams for mode in s.modes})

两个关键设计

① 内部 __call__ 闭包捕获 streams——每次调用 DuplexStream 都创建新闭包、streams 被 capture 进去。调用 duplex(chunk) 时按 modes 过滤分发——只有订阅了这个 mode 的 stream 才收到

modes 合并——用 set comprehension {mode for s in streams for mode in s.modes} 构造一个超集——返回的 StreamProtocol 订阅的 modes 是所有 streams 订阅的 mode 的并集。上层看到这个超集、往 duplex 投递任何 mode 的 chunk——duplex 内部再按子 stream 过滤。

这就是事件系统的”fan-out + filter 模式——一个入口、多个出口、每个出口独立过滤。典型应用是 pub/sub 系统。DuplexStream 用 20 行代码实现了 pub/sub 的核心——不需要专门的 message broker

子图场景下的完整数据流

  1. 子图节点 emit chunk
  2. 子图的 StreamProtocol 接收
  3. 但子图的 StreamProtocol 是 DuplexStream(own_stream, parent_stream)
  4. 同时分发到两个 stream——子图消费者收到 + 父图消费者也收到(如果订阅了)

这个机制让子图的 streaming 既对子图消费者透明、又能透过到父图——LangGraph 的嵌套 streaming 就靠这个。

13.5.3 流式数据在 PregelLoop 中的注入

PregelLooptick 方法在多个关键时刻发射流式事件:

class PregelLoop:
    def tick(self) -> bool:
        # 准备任务
        self.tasks = prepare_next_tasks(...)

        # 发射 checkpoints 事件
        self._emit("checkpoints", map_debug_checkpoint, ...)

        # 发射 tasks 开始事件
        self._emit("tasks", map_debug_tasks, self.tasks.values())

        # ... 执行任务 ...

        # 发射 tasks 结束事件
        self._emit("tasks", map_debug_task_results, ...)

        # 发射 values 和 updates 事件
        for chunk in map_output_values(...):
            self.stream((..., "values", chunk))
        for chunk in map_output_updates(...):
            self.stream((..., "updates", chunk))

13.5.4 StreamChunk = tuple[tuple[str, ...], str, Any] 的三元组设计

StreamChunk 是 LangGraph 内部流式事件的wire format

StreamChunk = tuple[tuple[str, ...], str, Any]
#              (命名空间元组,    模式字符串, 数据)

为什么是 tuple 而不是 dict 或 class?——因为:

① 性能——tuple 是 Python 里最轻量的复合数据结构。创建和解包比 dict / class 快 3-5 倍、比 dict 省 ~100 字节内存。

② 模式匹配简单——ns, mode, data = chunk 解构、比 chunk["ns"], chunk["mode"], chunk["data"] 少 3 次 dict 查找。

③ 类型别名能表达清楚——StreamChunk = tuple[tuple[str, ...], str, Any] 让类型 checker 知道结构——既简洁又类型安全

代价——字段顺序是 public API。如果未来想加第 4 个字段(比如 timestamp)——要么改 tuple 长度(broken)、要么放在 data 里(嵌套)。LangGraph 通过严格版本控制(v1 / v2 API) 来管这种演进——v2 的 StreamPart 就是 TypedDict、更灵活地加字段。

内部用 tuple、外部用 TypedDict 是 LangGraph 的分层设计——内部追求性能、对外保证 API 灵活。中间的 Pregel 层做 tuple → TypedDict 的转换——上层用户完全看不到 tuple。

13.5.5 PregelLoop._emit 的函数式事件发射

PregelLoop 里 emit 事件的模式是高阶函数 + mapper

def _emit(self, mode, mapper, *args):
    if mode in self.stream.modes:
        for chunk in mapper(*args):
            self.stream((self.ns, mode, chunk))

三层解耦

mode in self.stream.modes 的守卫——订阅了的模式才计算 mapper。没订阅的模式、连 mapper 都不调——lazy computation、省 CPU。

② mapper 是函数——map_debug_checkpoint / map_debug_tasks 等——每个 mode 有自己的专用 mapper。mapper 只负责 “把内部状态转成 stream chunk”、不关心订阅逻辑、不关心发送。

self.stream((ns, mode, chunk)) 的 uniform 发送——任何 mapper 产生的 chunk 走同一个 stream channel。

这种 “订阅守卫 + 懒计算 mapper + 统一发送 的模式让 PregelLoop 能发出上百种事件、但代码量极小。添加新 stream mode 只要:

  1. 定义一个新 mapper
  2. 在 PregelLoop 的合适位置调一次 _emit("new_mode", new_mapper, ...)

整个事件系统是 “组合型设计——不是每个 event type 写一份发射代码、而是统一框架 + 多种 mapper。这种模式在 LangChain Core 的 callback、React scheduler 的 markTaskXxx 都有体现——事件类型是数据、不是代码结构

13.6 StreamMessagesHandler:LLM Token 捕获

13.6.1 回调机制

messages 模式的实现依赖于 LangChain 的回调系统。StreamMessagesHandler 同时继承了 BaseCallbackHandler_StreamingCallbackHandler,在 LLM 调用的各个生命周期阶段捕获消息:

class StreamMessagesHandler(BaseCallbackHandler, _StreamingCallbackHandler):
    run_inline = True  # 在主线程运行,避免排序问题

    def __init__(self, stream, subgraphs, *, parent_ns=None):
        self.stream = stream
        self.subgraphs = subgraphs
        self.metadata: dict[UUID, Meta] = {}
        self.seen: set[int | str] = set()
        self.parent_ns = parent_ns

13.6.2 Token 流的捕获链路

sequenceDiagram
    participant LLM as ChatModel
    participant Handler as StreamMessagesHandler
    participant Stream as StreamProtocol
    participant Client as 消费者

    LLM->>Handler: on_chat_model_start(metadata)
    Note over Handler: 记录 run_id -> (ns, metadata)

    loop 每个 token
        LLM->>Handler: on_llm_new_token(chunk)
        Handler->>Handler: 检查 metadata[run_id]
        Handler->>Stream: _emit(meta, chunk.message)
        Stream->>Client: ("messages", (AIMessageChunk, metadata))
    end

    LLM->>Handler: on_llm_end(response)
    Note over Handler: 发射最终完整消息(去重)
    Handler->>Handler: 清理 metadata[run_id]

关键实现细节:

  1. 元数据追踪on_chat_model_start 时记录 run_id(ns, metadata) 的映射
  2. token 发射on_llm_new_token 时根据 run_id 找到元数据,构造 (message, metadata) 元组发射
  3. 去重:使用 seen 集合跟踪已发射的消息 ID,避免在 on_llm_end 时重复发射
  4. 子图过滤:如果 subgraphs=False,跳过命名空间深度 > 0 的事件

13.6.3 节点输出中的消息捕获

StreamMessagesHandler 不仅捕获 LLM 的 token,还捕获节点返回值中的消息对象:

def on_chain_end(self, response, *, run_id, **kwargs):
    if meta := self.metadata.pop(run_id, None):
        if isinstance(response, Command):
            self._find_and_emit_messages(meta, response.update)
        elif isinstance(response, Sequence) and any(
            isinstance(value, Command) for value in response
        ):
            for value in response:
                if isinstance(value, Command):
                    self._find_and_emit_messages(meta, value.update)
                else:
                    self._find_and_emit_messages(meta, value)
        else:
            self._find_and_emit_messages(meta, response)

_find_and_emit_messages 方法递归扫描返回值,从字典、BaseModel、dataclass 等结构中提取 BaseMessage 对象。这使得 messages 模式能够捕获所有经过节点的消息,而不仅是 LLM 直接输出的 token。

13.6.4 三个真实实现细节:ordering、ID 兜底、4 路 dispatch

上面的流程对了,但有三处pregel/_messages.py 里的工程细节容易漏看——没它们,token 流式不会这么可靠:

1. run_inline = True 为什么必要(line 50-51,源码原注释 “We want this callback to run in the main thread to avoid order/locking issues.”):

LangChain callback 默认可以异步/多线程执行——按”发生顺序”触发但不保证”处理顺序”。对 token 流式这个约束是致命的:如果 token Hel 先到但处理线程 A 被挂起、token lo 后到被处理线程 B 优先处理,最终发出顺序变成 lo-Hel——屏幕上打出 “loHel” 而不是 “Hello”。run_inline = True 把此 handler 强制在主事件循环线程内内联执行、保证发生顺序 ≡ 处理顺序 ≡ 发射顺序。这是 token streaming 能正确显示的第一道防线。

2. 消息 ID 的 UUID 兜底(line 94-95):

def _emit(self, meta, message, *, dedupe=False):
    if dedupe and message.id in self.seen:
        return
    else:
        if message.id is None:
            message.id = str(uuid4())   # ← 没 ID 就补一个
        self.seen.add(message.id)
        self.stream((meta[0], "messages", (message, meta[1])))

seen: set[int | str] 去重依赖 message.id——但 LangChain 的 BaseMessageid 字段是 Optional[str]、有些 provider 返回的 message 没有 ID。如果直接走 if message.id in seenNone in {"abc"} 总是 False、每次 emit 都会重复发出同一条消息(特别是 on_chat_model_end 调用时把 on_llm_new_token 已流出的 token 再拼成完整消息重发一次——用户就会看到一模一样的内容被打两遍)。

兜底逻辑:没 ID 就当场生成 UUID 填进去、再记 seen代价是轻微的 side-effect(修改了用户传入的 message 对象)但换来 dedup 能正常工作、消息不会被重复发射。这种”在用户忘记给 ID 时替他补一个”的友好设计是 LangGraph 能和各种 LLM provider(OpenAI/Anthropic/Ollama 等 id 约定不一致)无缝协作的基础。

3. _find_and_emit_messages 的 4 路 dispatch(line 99-113):

def _find_and_emit_messages(self, meta, response):
    if isinstance(response, BaseMessage):                # ① 直接是消息
        self._emit(meta, response, dedupe=True)
    elif isinstance(response, Sequence):                 # ② 消息序列
        for value in response:
            if isinstance(value, BaseMessage):
                self._emit(meta, value, dedupe=True)
    else:
        for value in _state_values(response):            # ③ state 字典值
            if isinstance(value, BaseMessage):
                self._emit(meta, value, dedupe=True)
            elif isinstance(value, Sequence):            # ④ state 里嵌套序列
                for item in value:
                    if isinstance(item, BaseMessage):
                        self._emit(meta, item, dedupe=True)

这 15 行代码覆盖 LangGraph 节点返回值的四种典型形态

  • ① 节点直接 return AIMessage(...)——极少但合法
  • ② 节点 return [msg1, msg2]——一些 agent 返回多消息
  • ③ 节点 return {"messages": [...], "other_state": ...}——经典 state dict 形态,_state_values 把 dict 展平成 values
  • ④ state dict 里某个 key 的 value 是 [msg, msg, ...]——messages 键就是这形态、4 路里专门处理嵌套序列

没有任何一路被忽略——用户写 return {"messages": [msg]}return msg 都能被流式机制感知。朴素的结构发现算法 + 递归降级 的设计让用户不用关心”消息该放哪儿”——想放哪就放哪、框架保证被发现。

但这种”发现式”处理也有代价——BaseMessage 以外的自定义消息类型无效。用户如果定义了 class MyMessage(BaseModel) 当消息用、不继承 BaseMessage——isinstance(..., BaseMessage) 永远 False、流式系统看不到这种消息。这个边界需要使用者自己清楚:想走流式就必须用 LangChain 的 BaseMessage 或其子类

13.6.5 on_chat_model_start 的 ns 提取和 TAG_NOSTREAM 过滤

_messages.py:123-143on_chat_model_start 有几段关键逻辑:

def on_chat_model_start(self, serialized, messages, *, run_id, tags=None, metadata=None, **kwargs):
    if metadata and (not tags or (TAG_NOSTREAM not in tags)):
        ns = tuple(cast(str, metadata["langgraph_checkpoint_ns"]).split(NS_SEP))[:-1]
        if not self.subgraphs and len(ns) > 0 and ns != self.parent_ns:
            return
        if tags:
            if filtered_tags := [t for t in tags if not t.startswith("seq:step")]:
                metadata["tags"] = filtered_tags
        self.metadata[run_id] = (ns, metadata)

三层过滤

TAG_NOSTREAM 检查——LangChain 允许用户给 Runnable 打 TAG_NOSTREAM tag、表示 这个 LLM 调用我不想流式化。典型场景:主 agent 流式、但内部的 tool-calling sub-model 不流式。handler 看到这个 tag 就跳过——metadata 不记录、后续 on_llm_new_token 也不会 emit。

len(ns) > 0 and ns != self.parent_ns——如果用户 stream(subgraphs=False) 且事件来自子图、跳过。但 parent_ns 例外——显式从子图启动 streaming 时保留(source docstring 里的 example 场景)。这种”默认不穿透子图、但显式启动的子图透过”的语义需要这个精细判断。

seq:step tag 过滤——LangChain 内部给 Runnable sequence 的每一步打 seq:step:N tag、对用户毫无意义的内部信息。handler 过滤掉这些、只保留业务相关的 tag——对外暴露的 metadata 更干净

ns 的 [:-1] 处理也值得看——langgraph_checkpoint_ns 的最后一段是当前 frame前面的段是父图 ns。streaming event 的 ns 应该是父 ns、不包括自己——所以 [:-1] 截掉最后一段。这和 §14.7.3 讲的 Runtime 命名空间处理是同一套逻辑。

13.6.6 on_chain_start 的预加载 seen 去重

看 _messages.py:207-215:

if self.metadata[run_id] = (ns, metadata)
for value in _state_values(inputs):
    if isinstance(value, BaseMessage):
        if value.id is not None:
            self.seen.add(value.id)
    elif isinstance(value, Sequence) and not isinstance(value, str):
        for item in value:
            if isinstance(item, BaseMessage):
                if item.id is not None:
                    self.seen.add(item.id)

on_chain_start 时把 input state 里所有已有消息的 ID 都塞进 seen——为什么?

因为用户节点可能返回 “原始 state 加了新消息”——比如 return {"messages": state["messages"] + [new_msg]}。如果不先标记老消息、_find_and_emit_messages重新 emit 所有老消息 + 新消息——客户端看到重复。

预加载 seen 让老消息在 on_chain_end 的遍历里被 dedup 跳过——只有真正新的消息被发出。这就是 LangGraph 能识别 “增量消息”的底层机制——依赖 BaseMessage.id 作为识别标识 + seen 集合记忆。

这里的 isinstance(value, Sequence) and not isinstance(value, str) 检查排除 str——因为 Python 里 strSequence[str]、不加排除会试图对字符串的每个 char 做 isinstance(BaseMessage)——无意义但不出错、就是浪费 CPU。显式排除 str 是防御性编程

13.6.7 tap_output_itertap_output_aiter 的空实现

_messages.py:115-121 有两个空方法:

def tap_output_aiter(self, run_id, output):
    return output

def tap_output_iter(self, run_id, output):
    return output

它们来自 _StreamingCallbackHandler 基类——handler 实现者可以用来 拦截并修改 stream 输出。StreamMessagesHandler 不需要拦截(它只观察、不修改)、所以原样返回 output

为什么要实现但什么都不做?——因为父 trait 可能要求必须有这两个方法(Python 的 ABC)。即使是空实现也得写、否则 instantiate 时会报 Can't instantiate abstract class

这种 “空实现作为 no-op 在 Rust 里是自动的(trait 方法有 default impl)——Python 没有 trait、abstract class 要求所有方法都实现。这是语言差异在具体代码里的体现。

13.6.8 on_chain_error 的 metadata 清理——防内存泄漏

_messages.py 里的 on_chain_error(line 242+):

def on_chain_error(self, error, *, run_id, parent_run_id=None, **kwargs):
    self.metadata.pop(run_id, None)

看似简单的一行、实际是防内存泄漏的关键

self.metadata: dict[UUID, Meta]on_chat_model_start / on_chain_start 时塞入、在对应的 on_end / on_error 里 pop 出。如果只有 on_end 清理、on_error 不清理——LLM 报错时 metadata 不清、run_id 永远留在 dict 里、累积几百万请求后 metadata dict 可能占 GB 级内存。

对称地定义 on_error + pop 保证无论正常结束还是异常结束、metadata 都被清理。这种 “try/finally 式的清理” 是 LangChain callback 生态的标准模式——所有 handler 都要成对实现 on_start + (on_end or on_error)

这和 hyper 第 14 章讲的 timer drop on scope exit、React 第 6 章讲的 restoreSelection 都是同一种 “所有路径都清理资源 的工程习惯——资源泄漏是最常见的生产 bug 之一、只能靠code review 时强制检查清理配对来防范。

13.7 各 StreamMode 的输出映射函数

13.7.0 mapper 函数的命名规律

_output.py 里的 mapper 函数有一致的命名模式:

map_output_values(...)    # → values mode
map_output_updates(...)   # → updates mode
map_debug_checkpoint(...)  # → debug / checkpoints mode
map_debug_tasks(...)      # → debug / tasks mode
map_debug_task_results(...)

命名模式

  • map_ 前缀——表明这是转换函数(functional programming vocabulary)
  • output_ / debug_ 中缀——事件来源分类
  • 后面描述具体事件类型

为什么要这么细分?——因为 debug mode 实际上是 checkpoints + tasks 的聚合。debug_checkpoint 生成的 chunk 会同时发给 checkpoints 订阅者和 debug 订阅者。共用 mapper 避免重复实现。

具体发射时、PregelLoop 里可能是:

self._emit("checkpoints", map_debug_checkpoint, ...)
self._emit("debug", map_debug_checkpoint, ...)  # 同一个 mapper、两个 mode

一个 mapper 服务多个 mode——这是 debug mode 设计合理的技术基础。

13.7.1 map_output_values

def map_output_values(
    output_channels: str | Sequence[str],
    pending_writes: Literal[True] | Sequence[tuple[str, Any]],
    channels: Mapping[str, BaseChannel],
) -> Iterator[dict[str, Any] | Any]:
    if isinstance(output_channels, str):
        if pending_writes is True or any(
            chan == output_channels for chan, _ in pending_writes
        ):
            yield read_channel(channels, output_channels)
    else:
        if pending_writes is True or {
            c for c, _ in pending_writes if c in output_channels
        }:
            yield read_channels(channels, output_channels)

values 模式读取所有输出 Channel 的当前值,构成完整的状态快照。只有当相关 Channel 在本步被写入时才发射。

13.7.2 map_output_updates

def map_output_updates(
    output_channels: str | Sequence[str],
    tasks: list[tuple[PregelExecutableTask, Sequence[tuple[str, Any]]]],
    cached: bool = False,
) -> Iterator[dict[str, Any | dict[str, Any]]]:
    output_tasks = [
        (t, ww) for t, ww in tasks
        if (not t.config or TAG_HIDDEN not in t.config.get("tags", EMPTY_SEQ))
        and ww[0][0] != ERROR
        and ww[0][0] != INTERRUPT
    ]
    ...

updates 模式将每个任务的写入按节点名分组。隐藏节点(标记了 TAG_HIDDEN)和错误/中断写入被过滤掉。输出格式为 {node_name: update_value}

13.7.3 Debug 输出

debug 模式组合了 checkpoints 和 tasks 的输出,包裹在带时间戳和步数的外层结构中:

class _DebugTaskPayload(TypedDict):
    step: int
    timestamp: str        # ISO 8601 格式
    type: Literal["task"]
    payload: TaskPayload

class _DebugTaskResultPayload(TypedDict):
    step: int
    timestamp: str
    type: Literal["task_result"]
    payload: TaskResultPayload

class _DebugCheckpointPayload(TypedDict, Generic[StateT]):
    step: int
    timestamp: str
    type: Literal["checkpoint"]
    payload: CheckpointPayload[StateT]
graph TB
    subgraph debug 模式输出
        D[DebugPayload] --> DC[type: checkpoint]
        D --> DT[type: task]
        D --> DTR[type: task_result]

        DC --> CP[CheckpointPayload<br/>config, metadata, values, next, tasks]
        DT --> TP[TaskPayload<br/>id, name, input, triggers]
        DTR --> TRP[TaskResultPayload<br/>id, name, error, interrupts, result]
    end

13.7.4 _state_values 的三类型 dispatch

_state_values(line 31-39)是 handler 里的一个小 utility、但设计精致:

def _state_values(obj: Any) -> Sequence[Any]:
    """Extract top-level field values from a state object (dict, BaseModel, or dataclass)."""
    if isinstance(obj, dict):
        return list(obj.values())
    elif isinstance(obj, BaseModel):
        return [getattr(obj, k) for k in type(obj).model_fields]
    elif is_dataclass(obj) and not isinstance(obj, type):
        return [getattr(obj, f.name) for f in fields(obj)]
    return ()

三种 state schema 的统一处理

  • dict——直接 list(values())、Python 原生容器
  • BaseModel(pydantic)——用 type(obj).model_fields 得字段名、然后 getattr 取值
  • dataclass——用 fields(obj) 得 Field 对象、取 .name 再 getattr

is_dataclass(obj) and not isinstance(obj, type)——dataclass 类本身也 is_dataclass() == True、但我们要的是 instance、不是 class。加 not isinstance(obj, type) 排除 class 对象。

为什么不用 fields(obj) 直接取 value?——因为 fields(obj) 返回的是 Field 对象、不是当前值。要通过 getattr(obj, f.name) 才能取当前值——和 BaseModel 路径同构。

三种 state 形态的支持是 LangGraph 对用户 state schema 自由度的体现——用户可以用 TypedDict(同构 dict)、BaseModel、dataclass 任一种——streaming 都能无感知地提取字段值

兜底返回 ()——对不认识的 obj 返回空序列、不 panic。这让 handler 能安全处理各种用户数据——未知类型就跳过、不报错

13.7.4 TAG_HIDDEN 过滤 vs TAG_NOSTREAM——两种过滤的语义差异

注意 map_output_updates 里有一个过滤条件:

output_tasks = [
    (t, ww) for t, ww in tasks
    if (not t.config or TAG_HIDDEN not in t.config.get("tags", EMPTY_SEQ))
    and ww[0][0] != ERROR
    and ww[0][0] != INTERRUPT
]

TAG_HIDDEN 和 §13.6.5 讲的 TAG_NOSTREAM两种不同的 tag

  • TAG_HIDDEN ——节点要对流式输出完全隐藏。常见于 框架内部节点(如 __start__ / __end__ sentinel 节点)——用户不关心这些。updates / tasks 等 mode 都过滤掉 TAG_HIDDEN 节点。
  • TAG_NOSTREAM ——仅对 messages 模式隐藏。这是 “这个 LLM 我不想流式化” 的语义——但 values / updates 等仍然会看到这个节点的输出。

两种 tag 的设计反映了 “可见性” 的多维度——同一个节点可能需要:

  • 不出现在 state 更新日志(TAG_HIDDEN)
  • 但 token 还是要流(TAG_NOSTREAM 不打)
  • 或反过来:显示进度但不流 token(TAG_HIDDEN 不打、TAG_NOSTREAM 打)

四种组合覆盖不同的 observability 需求。这就是 LangGraph 对流式细粒度控制的体现——用户可以按需调每个维度。

13.8 v2 Stream API 的使用

13.8.1 基本用法

# v2 API:version="v2" 参数
async for part in graph.astream(input, version="v2"):
    match part["type"]:
        case "values":
            state = part["data"]
            interrupts = part["interrupts"]
        case "updates":
            for node, update in part["data"].items():
                print(f"{node}: {update}")
        case "messages":
            message, metadata = part["data"]
            print(message.content, end="", flush=True)
        case "custom":
            handle_custom(part["data"])

13.8.1.5 version="v2" 返回的 async 迭代器语义

v2 API 的 async for part in graph.astream(..., version="v2") 里、part严格顺序的——LangGraph 保证:

① 同一条 stream 上的事件按发生顺序到达——不会 “后发生的 update 先到”。这是 StreamProtocol 的单线程消费者模式保证的——事件写入 queue 后消费者按 FIFO 读。

② 不同 mode 的事件交错——messagesvalues 可以同时订阅、交错到达——但同一种 mode 内部保证顺序。

③ async 迭代器本身有 backpressure——用户 await 下一个 part 之前、producer 不发新 part。如果用户消费慢、graph 执行也会被自动 pacing——不会 OOM。

backpressure 的关键实现——LangGraph 内部用 asyncio.Queue 作为 stream 的 buffer、当 queue 满时 producer 自动挂起。这和 Go 的 buffered channel、Rust 的 Tokio mpsc channel 是同构的——生产者消费者速率解耦、但用 buffer + block 保证不爆

用户代码 side

async for part in graph.astream(input, version="v2"):
    await save_to_database(part)  # 慢操作
    # ↑ 这个 await 会自然地 backpressure 到 graph 执行

没有 backpressure 的 streaming生产 OOM 的常见原因——producer 比 consumer 快、buffer 无限增长——最后把内存吃光。LangGraph 的 async stream 设计天然规避了这个陷阱。

13.8.2 v1 vs v2 对比

graph LR
    subgraph "v1 API"
        V1S["stream(mode='values')"] --> V1O["Iterator[dict]"]
        V1M["stream(mode=['values','messages'])"] --> V1T["Iterator[tuple[str, Any]]"]
    end

    subgraph "v2 API"
        V2["stream(version='v2')"] --> V2O["Iterator[StreamPart]"]
        V2O --> V2D["part.type 判别"]
    end

v2 的主要优势:

  1. 类型安全:每个 StreamPart 都有明确的 type 字段,可用于类型窄化
  2. 统一接口:无论订阅多少种模式,输出格式一致
  3. 中断信息内联ValuesStreamPart 直接包含 interrupts 字段
  4. 泛型支持StreamPart[StateT, OutputT] 携带完整的类型信息

13.8.3 GraphOutput:invoke 的 v2 返回值

v2 API 还改进了 invoke 的返回类型:

@dataclass(frozen=True)
class GraphOutput(Generic[OutputT]):
    value: OutputT
    interrupts: tuple[Interrupt, ...] = ()

不再返回裸字典,而是一个带类型的容器:

result = graph.invoke(input, version="v2")
print(result.value)       # 输出值
print(result.interrupts)  # 中断信息

13.9 TaskPayload 与 TaskResultPayload

13.9.1 任务生命周期事件

tasks 模式提供了细粒度的任务执行监控:

class TaskPayload(TypedDict):
    """任务开始事件"""
    id: str              # 唯一任务 ID
    name: str            # 节点名
    input: Any           # 任务输入
    triggers: list[str]  # 触发原因

class TaskResultPayload(TypedDict):
    """任务结束事件"""
    id: str              # 唯一任务 ID
    name: str            # 节点名
    error: str | None    # 错误信息
    interrupts: list[dict]  # 中断列表
    result: dict[str, Any]  # 输出结果
stateDiagram-v2
    [*] --> TaskStart: tasks 事件 (TaskPayload)
    TaskStart --> TaskEnd: tasks 事件 (TaskResultPayload)
    TaskEnd --> [*]

    state TaskStart {
        id: 任务ID
        name: 节点名
        input: 输入数据
        triggers: 触发器列表
    }

    state TaskEnd {
        id: 任务ID
        name: 节点名
        result: 输出数据
        error: 错误信息
        interrupts: 中断列表
    }

13.9.1.5 triggers: list[str] 字段的溯源价值

TaskPayloadtriggers 字段是一个常被忽略但超级有用的字段:

class TaskPayload(TypedDict):
    id: str
    name: str
    input: Any
    triggers: list[str]  # ← 触发原因列表

triggers 记录哪些上游更新触发了这个任务——例如:

  • ["__start__"]——用户 invoke 触发、起点
  • ["node_a", "node_b"]——两个上游节点都写了这个 channel
  • ["channel_xxx"]——某个 channel 的值变化触发

溯源场景:debug 时一个任务突然被触发、不知道为什么——打开 tasks stream、看 triggers 字段——立刻看到触发链

Pregel 的超步模型里、任务是 reactive 的(channel 写入触发下游节点)——但这种 reactive 关系对用户是半黑盒。triggers 字段把这个黑盒打开——让用户看到每个任务的”因果由来”。

这和 distributed tracing 的 “parent span id” 概念同构——每个 task 有它的 cause chain、通过 triggers 可以重建。对复杂 agent(多节点交互)的 debug、这是救命字段

13.9.2 实际应用

for part in graph.stream(input, stream_mode="tasks", version="v2"):
    payload = part["data"]
    if "input" in payload:
        # 任务开始
        print(f"Starting task {payload['name']} (id={payload['id']})")
    else:
        # 任务结束
        if payload.get("error"):
            print(f"Task {payload['name']} failed: {payload['error']}")
        else:
            print(f"Task {payload['name']} completed: {payload['result']}")

13.9.3 TaskPayload vs TaskResultPayload 的 input vs result 分开

注意 Task 的开始和结束用两个独立的 TypedDict——TaskPayloadTaskResultPayload——不是一个 Payload 带 status 字段

class TaskPayload(TypedDict):
    id: str
    name: str
    input: Any
    triggers: list[str]

class TaskResultPayload(TypedDict):
    id: str
    name: str
    error: str | None
    interrupts: list[dict]
    result: dict[str, Any]

为什么不合成 {id, name, status: "start"/"end", data: ...} 一个 dict?——因为:

① 字段异构——start 有 input + triggers、end 有 error + interrupts + result——合成一个 dict 会让两种状态都要检查所有字段、且很多字段 Optional。

② TypedDict 类型精度——两个独立的 TypedDict 让 type checker 能根据 start vs end 精确推导可用字段。合成一个则退化成 “all Optional”、类型检查无用。

③ 消费者清晰——用户代码 if "input" in payload 就能判别开始/结束——字段缺失天然表达语义。不需要额外 status 字段。

共用字段 id + name——因为 start 和 end 要配对——用户可以存 pending_tasks[id] = info_from_start、遇到 end 时 pending_tasks.pop(id)——形成任务生命周期的完整记录。

这就是**“用不同类型表达不同状态、避免 status enum + Optional”**的 Python 模式——和 Rust 的 enum variants 异曲同工。

13.10 设计决策

13.10.1 为什么有七种模式而不是一种?

设计多种流式模式的核心原因是消费者的多样性

  • 前端 UI 只关心 LLM 文本流(messages
  • 管理面板需要完整状态(values
  • 日志系统需要增量更新(updates
  • 监控系统需要任务级事件(tasks
  • 开发者调试需要全部信息(debug

一种模式无法同时满足所有需求,而如果把所有信息都塞进一种模式,消费者需要过滤大量不相关的数据。

13.10.2 回调 vs 返回值

messages 模式使用回调(StreamMessagesHandler)而非返回值来捕获 LLM token。这是因为 LLM 的流式输出发生在节点执行过程中,而不是执行完成后。回调机制允许在不修改节点代码的前提下拦截中间数据。

13.10.3 StreamWriter 作为 Callable 而非类

StreamWriter 被定义为 Callable[[Any], None] 而非一个类,这是刻意的简约设计。节点代码只需要调用 writer(data) 即可,不需要了解任何框架内部细节。这种设计也使得测试更容易——你可以用任意 lambda 替代:

# 测试时
collected = []
my_node(state, writer=collected.append)
assert collected == [{"progress": 0.5}, {"progress": 1.0}]

13.10.4 run_inline = True 的必要性

StreamMessagesHandler 设置了 run_inline = True,这意味着回调在主线程中同步执行。这个选择是为了避免两个问题:

  1. 顺序保证:token 必须按生成顺序到达消��者
  2. 锁竞争:异步回调可能导致与 Channel 写入的竞争条件

代价是 LLM 生成的速度会略微受到回调处理时间的影响,但对于 token 级别的事件,这个开销可以忽略。

13.10.4.5 为什么不给 StreamMode 加 errorswarnings

有一个合理的疑问——为什么没有专门的 errorswarnings stream mode?

原因是错误和警告在现有 mode 里已经被表达

  • 节点错误——在 TaskResultPayload.error 里,通过 tasks mode 流出来
  • 任务中断——在 TaskResultPayload.interrupts 里,同样通过 tasks mode
  • 框架级异常——通过 Python 原生的 exception 机制、不走 stream

单独加 errors mode 会造成信息碎片化——用户要同时订阅 tasks + errors 才能完整监控——违反”最小订阅”原则

LangGraph 团队的哲学——stream mode 粒度要正交、尽量避免冗余。每个 mode 承载一类语义、彼此独立、不重叠。

类似地、没有 state_changes mode——因为 updates 已经覆盖(每个节点的 update 是 state change)、没有 logs mode——因为 debug 覆盖了。7 种 mode 是精心裁剪后的正交基

用户要更细粒度的错误监控?——订阅 tasks 模式、自己在消费端分类。LangGraph 把复杂度推给消费者的灵活性——框架不猜用户要什么、提供原始信号让用户自组织

13.10.5 TypeAliasType vs Union——PEP 695 的应用

StreamPart 的定义用了 TypeAliasType

StreamPart = TypeAliasType(
    "StreamPart",
    ValuesStreamPart[OutputT]
    | UpdatesStreamPart
    | MessagesStreamPart
    | CustomStreamPart
    | CheckpointStreamPart[StateT]
    | TasksStreamPart
    | DebugStreamPart[StateT],
    type_params=(StateT, OutputT),
)

TypeAliasType 来自 Python 3.12 (PEP 695) 或 typing_extensions 的 backport——让 type alias 成为一等公民、而不是普通变量赋值。

老写法

StreamPart = Union[ValuesStreamPart, UpdatesStreamPart, ...]
# 或
StreamPart: TypeAlias = Union[...]

这种 “变量赋值式 type alias” 有些局限:

  • 不支持带泛型参数的 alias(要用 TypeVar + generic
  • 不是独立的 “type” 在 type system 里——只是一个引用
  • 某些复杂场景(forward ref、recursive type)处理不好

TypeAliasType

  • 一等公民——type checker 能把它当独立类型处理
  • 支持 generic type_params——type_params=(StateT, OutputT) 明确声明
  • 更好的 error 信息——类型错误时 tool 能精确指向”StreamPart 的哪个 variant

LangGraph 用 PEP 695——因为StreamPart 是对外暴露的关键类型、给用户最好的类型体验值得用最新语法。

13.10.6 version="v2" 作为 stream 方法参数

v1/v2 的切换不是不同的方法——而是 stream(version=...) 参数

# v1
for chunk in graph.stream(input, stream_mode=["values"]):
    ...

# v2
for part in graph.stream(input, version="v2"):
    ...

为什么同一个方法、不同返回格式?——因为stream 的核心逻辑(任务调度、checkpoint、channel 更新)完全一样——只是输出包装方式不同。用 version 参数切换、避免 stream_v1 / stream_v2 两个函数的代码重复。

内部实现

# 概念性
def stream(self, input, *, version="v1", stream_mode=None, ...):
    if version == "v2":
        # 包装成 StreamPart
        for chunk in self._stream_impl(input, ...):
            yield self._wrap_as_stream_part(chunk)
    else:
        # v1 格式直接返回
        yield from self._stream_impl(input, ...)

共享 _stream_impl 的核心、仅在输出层分叉——这是 API 版本演进的健康方式。用户代码里只改一个参数就能切换——迁移成本最小

默认 version="v1"——保持向后兼容。LangGraph 团队慢慢推广 v2、等大部分用户迁移后再考虑把默认切换到 v2。

13.11 小结

本章系统分析了 LangGraph 的流式输出架构。七种 StreamMode 覆盖了从完整状态快照到 LLM token 的各个粒度层级。v2 Stream API 通过 StreamPart 类型体系实现了类型安全的流式消费,每个事件都是自描述的。StreamWriter 以极简的函数签名为节点提供了自定义流式输出的能力,而 StreamMessagesHandler 通过 LangChain 回调系统在不侵入业务代码的前提下捕获 LLM 的 token 流。

整个流式系统的设计理念可以归结为:分层解耦,按需订阅。生产者(节点、LLM、框架内部)不关心消费者订阅了什么;消费者通过 stream_mode 声明自己感兴趣的事件类型;StreamProtocol 作为中间层负责路由和过滤。这种设计使得新增流式模式只需定义新的 StreamPart 类型和对应的映射函数,对现有代码零影响。

下一章我们将探讨 LangGraph 的 Runtime 与 Context 机制,了解如何将运行时依赖(如用户身份、数据库连接)安全地注入到图的执行过程中。

13.12 跨章节呼应

与 React 第 4 章 Scheduler 的呼应——LangGraph 的 streaming 走回调(StreamMessagesHandler)+ 事件合并(_find_and_emit_messages 的 4 路 dispatch)、Scheduler 走 task queue + 时间切片——都是在 “异步消费者消费异构事件流 的场景下用 统一入口 + 类型分支 的模式。

与 hyper 第 10 章 http-body 的呼应——LangGraph 的 StreamPart 7 种类型、http-body 的 Frame<T>{Data, Trailers}——都是 “用 type-tagged enum 统一异构流式事件。LangGraph 更丰富(7 种 vs 2 种)、但设计哲学完全一致。

与 Serde 第 16 章 flatten 的呼应——LangGraph 的 _state_values 把 dict/BaseModel/dataclass 统一抽取 values、和 Serde 的 flatten 把嵌套 struct 展平到父级——都是 “用同一套遍历逻辑处理多种 state schema

与 Vite 第 15 章 SSR Module Runner 的呼应——Module Runner 的 concurrentModuleNodePromises 去重、StreamMessagesHandler 的 seen set——都是 “用集合去重高频事件。前者去重并发 fetch、后者去重 emit 过的消息——机制同源、应用不同。

与 vllm 第 8 章 ModelRunner 的呼应——LangGraph 的 StreamChunk tuple 紧凑 wire format + StreamPart TypedDict 灵活对外,vllm 的 InputBatch numpy 紧凑 + Python 包装——都是 “内部压缩 + 外部友好 的两层设计。

13.13 十二条工程原则收束

把本章源码观察压缩成流式系统设计的 12 条原则:

① 类型标签 + 联合类型表达异构流(§13.3)——每个事件 type 字段、消费者精确分派。

② namespace 字段追踪层级(§13.3.2)——ns: tuple[str, ...] 让子图透明可见。

③ run_inline=True 保顺序(§13.6.4)——token 流必须按生成顺序发出、禁止异步乱序。

④ UUID 兜底补消息 ID(§13.6.4 point 2)——user message 缺 ID 时 handler 补上、让 dedup 机制工作。

⑤ 4 路 dispatch 覆盖所有节点返回形态(§13.6.4 point 3)——BaseMessage / Sequence / dict / nested Sequence 都能找到消息。

⑥ 预加载 seen 支持增量消息(§13.6.6)——on_chain_start 时把 state 里老消息的 ID 先塞进 seen、只发真正新的。

__slots__ 优化高频对象(§13.5.1.5)——StreamProtocol 大量创建、slots 省 56 字节 + 属性访问快 5-10x。

⑧ DuplexStream 闭包合并 fan-out(§13.5.2.5)——一个入口多个出口、每个出口独立 mode 过滤。

⑨ 三元组 wire format 最轻(§13.5.4)——StreamChunk 用 tuple 而非 dict/class、省 memory 省 CPU。

⑩ TAG_NOSTREAM 允许跳过流式(§13.6.5)——用户打 tag 表明”这个 LLM 不要流式”、handler 遵守。

⑪ 两个独立 TypedDict 代替 status enum(§13.9.3)——TaskPayload + TaskResultPayload 让字段类型精确。

⑫ TypeAliasType 一等公民 type alias(§13.10.5)——PEP 695 语法让 StreamPart 的泛型联合类型被 type checker 正确识别。

这 12 条加起来就是 LangGraph 流式系统的工程密度——每一条都是针对真实用户场景的精确投入。读完本章、下次你自己设计流式系统——这些原则都能直接复用。

13.14 流式系统与可观测性——从 streaming 到 tracing

LangGraph 的 streaming 不只是 “给前端看个打字机效果”——它是可观测性的基石

把 StreamMode 映射到 observability 术语:

  • messagestoken-level tracing(每个 token 的 span)
  • tasksfunction-level tracing(每个节点的 span)
  • checkpointsstate snapshot(任意时刻的完整状态)
  • updatesevent log(状态变化事件流)
  • debugdistributed trace(跨层的完整链路)

一个完整的 AI 应用观测栈可以建立在 LangGraph 的 streaming 上:

for part in graph.astream(input, version="v2", stream_mode=["messages", "tasks", "checkpoints"]):
    match part["type"]:
        case "messages":
            # 发给 LangSmith / Braintrust / Arize 做 LLM tracing
            langsmith_client.log_token(part["data"])
        case "tasks":
            # 发给 Datadog / New Relic 做 APM
            dd_client.log_span(part["data"])
        case "checkpoints":
            # 发给自己的审计日志
            audit_log.record(part["data"])

这就是 streaming 的战略价值——不是 UX 功能、是基础设施。一个好的 streaming 系统直接决定了 observability 栈能不能建起来。

LangGraph 的 streaming 设计之所以这么精细(7 种模式、带 namespace、回调 handler 等)——因为它预设了这套被用作 observability 基础的使用场景。产品级 AI 应用必须有完整 observability——否则生产环境出问题根本定位不到。

读到本章结尾你应该意识到——streaming 是一等公民、不是可选项。任何严肃的 AI 框架都要有类似的机制。LangGraph 把这件事做到了目前 Python 生态里最完整的水平——对比 LangChain 的 callback 系统(第 12 章讨论过)、LlamaIndex 的 event system——LangGraph 的类型精度最高、场景覆盖最全。

13.15 流式在生产环境的真实案例

案例 1:ChatGPT-style 前端——用 messages 模式、通过 SSE (Server-Sent Events) 推 token 给浏览器:

@app.post("/chat")
async def chat(req: ChatRequest):
    async def generate():
        async for part in graph.astream(req.input, stream_mode="messages", version="v2"):
            msg, meta = part["data"]
            yield f"data: {json.dumps({'content': msg.content})}\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")

用户在前端看到打字机效果——延迟 = 第一个 token 到达的时间(通常 ~200ms)、不是整个响应完成的时间(可能 30 秒)。

案例 2:Multi-step Agent 的进度条——用 tasks 模式:

async for part in graph.astream(input, stream_mode=["tasks"], version="v2"):
    payload = part["data"]
    if "input" in payload:
        ui.show(f"Starting: {payload['name']}")
    else:
        ui.show(f"Done: {payload['name']}")

用户看到 “Starting: search_web → Done: search_web → Starting: analyze → …”——透明的 agent 执行过程、而不是黑盒等待。

案例 3:分布式审计日志——用 checkpoints 模式、把每个 checkpoint 推到 Kafka:

async for part in graph.astream(input, stream_mode=["checkpoints"], version="v2"):
    cp = part["data"]
    await kafka.produce("langgraph-checkpoints", json.dumps(cp))

下游系统可以重放任意历史 checkpoint 复现问题——生产事故的 forensic 分析变得可能。

案例 4:开发调试——用 debug 模式、在 localhost 展示完整执行:

async for part in graph.astream(input, stream_mode=["debug"], version="v2"):
    payload = part["data"]
    print(f"[{payload['timestamp']}] step={payload['step']} type={payload['type']}")
    pprint(payload["payload"])

开发者看到每一步的完整状态——对比 print-debugging、信息密度高出几个数量级。

四个案例——四种 stream_mode 组合——覆盖从用户可见到开发者可见的完整 UX/DX 谱系。LangGraph 的 7 种 mode 看似多、但每一种都在真实场景里有不可替代的角色

13.16 从 streaming 看 LangGraph 的”渐进复杂度”哲学

最后一节——用一个更高的视角看 streaming 在 LangGraph 整体架构中的位置。

LangGraph 的设计有一个哲学叫 渐进复杂度”(progressive disclosure of complexity)

  • Level 1——.invoke()、返回最终结果——和 LangChain LLMChain 一样简单
  • Level 2——.stream() 默认 values 模式——拿到每步状态、开始有 observability
  • Level 3——指定多 stream_mode——开始细分不同消费者
  • Level 4——version="v2" 加 StreamPart 类型——开始写类型安全的消费代码
  • Level 5——自定义 handler、DuplexStream——嵌入到自己的 observability 栈

每一层的代码量和心智负担都是阶梯上升的——用户按需进入。一个 hackathon 原型用 .invoke() 就够、一个生产 agent 需要 v2 + checkpoints + messages、一个平台级 agent infrastructure 需要完整的自定义 handler 栈。

streaming 是 progressive disclosure 的第一跳——从 batch 思维到 streaming 思维的过渡。一旦跨过这一跳、后面所有 observability / customization 的高级特性都顺理成章。

这就是为什么所有严肃的 AI framework 都必须有 streaming——不只是功能、是打开复杂度阶梯的第一扇门。LangGraph 把这扇门做得很宽——7 种模式、v1/v2 双 API、精细到 token 的 handler——让用户在自己的成长阶段都能找到合适的抽象层。

13.16.1 一个具体的渐进场景——从 hackathon 到生产

用一个具体的 agent 项目演进来说明 LangGraph 各层抽象的用处:

Week 1(hackathon MVP)

result = graph.invoke({"input": "search weather"})
print(result)

3 行代码、一个 agent——够用、但没有任何 UX/observability。

Week 2(加前端)

for chunk in graph.stream({"input": query}, stream_mode=["messages"]):
    yield chunk.content  # SSE 推到浏览器

打字机效果有了——用户感知的延迟降了 10 倍。

Week 3(加监控)

for part in graph.astream(input, stream_mode=["messages", "tasks"], version="v2"):
    if part["type"] == "messages":
        yield_to_frontend(part["data"])
    elif part["type"] == "tasks":
        log_to_prometheus(part["data"])

业务可观测性——运营知道 agent 哪个节点慢、哪个 tool 常失败。

Week 6(接审计)

async for part in graph.astream(input, stream_mode=["messages", "tasks", "checkpoints"], version="v2"):
    # ... 同上 ...
    elif part["type"] == "checkpoints":
        await kafka_produce("agent-audit", part["data"])

每个 checkpoint 到审计系统——合规场景 ready、事后可以完整重放。

Week 12(专业化 observability)

async for part in graph.astream(input, stream_mode=["messages"], version="v2"):
    # 自定义 StreamMessagesHandler 子类、加密、多 backend 分发
    for handler in self.handlers:
        await handler.process(part)

企业级 observability 栈——几个不同系统各自从 LangGraph stream 取所需。

四周从 MVP 到企业生产——每一次演进只加几行代码、不推翻前面的工作。这就是 progressive disclosure 的真实价值——系统能陪着用户的需求一起长大

13.17 本章的方法论贡献——如何读流式系统的源码

读完本章你应该有一种新的技能——如何评估任何 “streaming / event system” 库的质量。下面是一个 checklist:

① 事件类型有无明确标签?——直接 any data 还是带 type 字段?前者消费者要靠 context 推断、后者 type-safe。

② 事件层级有没有 namespace?——嵌套场景能不能追踪来源?没有的话、子图/子模块的事件会混成一团。

③ 顺序是否保证?——同一 channel 的事件是否按发生顺序到达?有没有 run_inline=True 或等价机制?

④ 有没有消息去重?——消息可能因重新发送、cleanup、retry 等重复触发——有没有 ID-based dedup?

⑤ 有没有背压?——producer 比 consumer 快时、buffer 会不会无限增长?

⑥ 有没有过滤 tag?——能不能对某些节点/事件说”我不要流式化这个”?

⑦ 生命周期清理配对完整吗?——on_start / on_end / on_error 都清理资源?

⑧ mapper 和 emitter 解耦吗?——事件生成逻辑和发送逻辑有没有分开?方便加新事件类型。

⑨ 类型系统有多精确?——StreamPart 用 TypedDict + type tag 可以精确类型窄化、vs 使用 dict[str, Any] 完全丢类型。

⑩ 版本策略——API 演进是否平滑?——能不能支持 v1 和 v2 并存?用户能渐进迁移吗?

LangGraph 的 stream 系统在这 10 条上几乎全部命中——这是它作为事实标准的技术凭据。下次你评估任何其他 streaming lib(Kafka Streams、RxJS、各种 AI framework 的 event system)——用这 10 条对比、就能立刻看出短板和 strong point。