LangChain 设计与实现

第3章 Runnable 与 LCEL 表达式语言

作者 杨艺韬 · 9,445 字

第3章 Runnable 与 LCEL 表达式语言

本章基于 LangChain 1.0.3 / langchain-core 1.2.26 源码分析。核心源码文件:libs/core/langchain_core/runnables/base.py(约 6200 行)。

如果说 LangChain 是一座建筑,那么 Runnable 就是它的钢筋骨架,而 LCEL(LangChain Expression Language)就是将钢筋拼接成结构的焊接方式。本章将从源码层面彻底剖析这两个核心概念。

我们将逐一拆解 Runnable 协议的每一个方法,深入 | 操作符背后的魔法,然后详细分析六个核心组合原语——RunnableSequenceRunnableParallelRunnableLambdaRunnableBranchRunnablePassthroughRunnableBinding——它们各自的设计动机、实现细节和使用场景。

本章要点

  • Runnable 协议invoke/ainvoke/batch/abatch/stream/astream 六大方法的接口设计与默认实现策略
  • | 操作符实现__or____ror__ 如何将 Python 管道语法转化为 RunnableSequence
  • RunnableSequence:管道组合的核心,first/middle/last 的类型安全设计,流式传播的 _transform 实现
  • RunnableParallel:并行执行的实现,ThreadPoolExecutor 的使用,字典字面量的自动转换
  • RunnableLambda:函数到 Runnable 的桥梁,类型推断、依赖追踪、@chain 装饰器
  • RunnableBranch / RunnablePassthrough / RunnableBinding:条件分支、数据透传、参数绑定的实现
  • RunnableConfig:配置在管道中的流转机制,ensure_config / patch_config / set_config_context 三位一体

3.1 Runnable 协议:万物皆可 Runnable

Runnable 是定义在 libs/core/langchain_core/runnables/base.py 中的抽象基类,它是 LangChain 中最核心的抽象。理解它的设计,等于理解了 LangChain 的一半。

3.1.1 泛型签名与抽象方法

# 源码文件:libs/core/langchain_core/runnables/base.py (第124行)

class Runnable(ABC, Generic[Input, Output]):
    """A unit of work that can be invoked, batched, streamed,
    transformed and composed."""

    name: str | None

Runnable 有两个泛型参数:InputOutput。这使得类型检查器可以在编译时验证管道的数据流是否类型正确。例如,一个 Runnable[str, int] 只能通过 | 连接到一个 Runnable[int, ...]

在整个 Runnable 类中,只有一个方法是真正的抽象方法:

@abstractmethod
def invoke(
    self,
    input: Input,
    config: RunnableConfig | None = None,
    **kwargs: Any,
) -> Output:
    """Transform a single input into an output."""

这意味着,要实现一个 Runnable,你只需要实现 invoke,其他所有方法都有基于 invoke 的默认实现。 这是一个非常务实的设计——降低了接入门槛,同时允许性能敏感的子类覆写特定方法。

3.1.2 六大方法的默认实现策略

graph TB
    subgraph "同步方法"
        invoke["invoke(input)<br/>[抽象方法]"]
        batch["batch(inputs)<br/>默认: 线程池并行调用 invoke"]
        stream["stream(input)<br/>默认: yield invoke(input)"]
        transform["transform(inputs)<br/>默认: 累积输入后 invoke"]
    end

    subgraph "异步方法"
        ainvoke["ainvoke(input)<br/>默认: run_in_executor(invoke)"]
        abatch["abatch(inputs)<br/>默认: asyncio.gather(ainvoke)"]
        astream["astream(input)<br/>默认: yield await ainvoke(input)"]
        atransform["atransform(inputs)<br/>默认: 累积输入后 ainvoke"]
    end

    invoke -.->|"线程池执行"| ainvoke
    invoke -.->|"并行执行"| batch
    invoke -.->|"单值 yield"| stream
    ainvoke -.->|"gather"| abatch
    ainvoke -.->|"单值 yield"| astream

    style invoke fill:#4CAF50,color:#fff
    style ainvoke fill:#2196F3,color:#fff

让我们逐一分析这些默认实现:

ainvoke 默认实现:线程池委托

# 源码文件:libs/core/langchain_core/runnables/base.py (第844行)

async def ainvoke(self, input, config=None, **kwargs):
    return await run_in_executor(config, self.invoke, input, config, **kwargs)

run_in_executor 使用 asyncio.get_running_loop().run_in_executor() 将同步方法放入线程池执行。这意味着即使你的 Runnable 没有原生的异步实现,ainvoke 也能在 async 上下文中安全使用(不会阻塞事件循环)。性能敏感的子类(如直接调用异步 HTTP API 的 ChatOpenAI)会覆写为原生异步实现。

batch 默认实现:线程池并行

# 源码文件:libs/core/langchain_core/runnables/base.py (第867行)

def batch(self, inputs, config=None, *, return_exceptions=False, **kwargs):
    if not inputs:
        return []

    configs = get_config_list(config, len(inputs))

    def invoke(input_, config):
        if return_exceptions:
            try:
                return self.invoke(input_, config, **kwargs)
            except Exception as e:
                return e
        else:
            return self.invoke(input_, config, **kwargs)

    if len(inputs) == 1:
        return [invoke(inputs[0], configs[0])]

    with get_executor_for_config(configs[0]) as executor:
        return list(executor.map(invoke, inputs, configs))

注意两个设计细节:

stream 默认实现:单值 yield

# 源码文件:libs/core/langchain_core/runnables/base.py (第1130行)

def stream(self, input, config=None, **kwargs):
    yield self.invoke(input, config, **kwargs)

默认实现只是 yield 一次 invoke 的结果。这意味着默认情况下 stream 并不是真正的流式——它等价于 invoke。只有覆写了 stream(或 transform)的子类才有真正的流式能力。这就是为什么 LLM 类会覆写 stream 来逐 token 输出。

3.1.3 Schema 推断机制

Runnable 提供了自动的输入/输出 schema 推断:

# 源码文件:libs/core/langchain_core/runnables/base.py (第300行)

@property
def InputType(self) -> type[Input]:
    # 首先从 Pydantic 的 __pydantic_generic_metadata__ 获取
    for base in self.__class__.mro():
        if hasattr(base, "__pydantic_generic_metadata__"):
            metadata = base.__pydantic_generic_metadata__
            if "args" in metadata and len(metadata["args"]) == 2:
                return metadata["args"][0]

    # 回退到 __orig_bases__ (非 Pydantic 的 Runnable)
    for cls in self.__class__.__orig_bases__:
        type_args = get_args(cls)
        if type_args and len(type_args) == 2:
            return type_args[0]

    raise TypeError(f"Runnable {self.get_name()} doesn't have an inferable InputType.")

这段代码的精妙之处在于它处理了两种不同的情况:

input_schema 属性进一步将 InputType 包装为 Pydantic BaseModel,使其可以生成 JSON Schema:

@property
def input_schema(self) -> type[BaseModel]:
    return self.get_input_schema()

def get_input_schema(self, config=None):
    root_type = self.InputType
    if inspect.isclass(root_type) and issubclass(root_type, BaseModel):
        return root_type
    return create_model_v2(self.get_name("Input"), root=root_type)

3.2 管道操作符 | 的实现

LCEL 的核心语法是 | 操作符。在 Python 中,a | b 会尝试调用 a.__or__(b);如果返回 NotImplemented,则尝试 b.__ror__(a)

3.2.1 __or__:正向管道

# 源码文件:libs/core/langchain_core/runnables/base.py (第618行)

def __or__(
    self,
    other: Runnable[Any, Other]
        | Callable[[Iterator[Any]], Iterator[Other]]
        | Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
        | Callable[[Any], Other]
        | Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]:
    return RunnableSequence(self, coerce_to_runnable(other))

注意 other 参数的类型签名——它接受五种类型:

  1. Runnable:直接使用
  2. 同步生成器函数:包装为 RunnableGenerator
  3. 异步生成器函数:包装为 RunnableGenerator
  4. 普通可调用对象:包装为 RunnableLambda
  5. 字典:包装为 RunnableParallel

3.2.2 __ror__:反向管道

# 源码文件:libs/core/langchain_core/runnables/base.py (第639行)

def __ror__(
    self,
    other: Runnable[Other, Any] | Callable[[Other], Any]
        | Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]:
    return RunnableSequence(coerce_to_runnable(other), self)

__ror__ 使得非 Runnable 对象可以出现在 | 的左侧。例如:

# 字典字面量在左侧,触发右侧 Runnable 的 __ror__
chain = {"context": retriever, "question": RunnablePassthrough()} | prompt
# 等价于
chain = RunnableParallel(context=retriever, question=RunnablePassthrough()) | prompt

3.2.3 coerce_to_runnable:万物转换

# 源码文件:libs/core/langchain_core/runnables/base.py (第6176行)

def coerce_to_runnable(thing: RunnableLike) -> Runnable[Input, Output]:
    if isinstance(thing, Runnable):
        return thing
    if is_async_generator(thing) or inspect.isgeneratorfunction(thing):
        return RunnableGenerator(thing)
    if callable(thing):
        return RunnableLambda(cast(..., thing))
    if isinstance(thing, dict):
        return cast(..., RunnableParallel(thing))
    raise TypeError(f"Expected a Runnable, callable or dict. "
                    f"Instead got an unsupported type: {type(thing)}")

这个函数是 LCEL 灵活性的关键——它使得 | 操作符的两侧可以接受各种 Python 对象,而不仅仅是 Runnable 实例。

graph TD
    INPUT["coerce_to_runnable(thing)"]

    INPUT --> C1{是 Runnable?}
    C1 -->|是| R1["直接返回"]

    C1 -->|否| C2{是生成器函数?}
    C2 -->|是| R2["RunnableGenerator(thing)<br/>保留流式能力"]

    C2 -->|否| C3{是 callable?}
    C3 -->|是| R3["RunnableLambda(thing)<br/>包装为 Runnable"]

    C3 -->|否| C4{是 dict?}
    C4 -->|是| R4["RunnableParallel(thing)<br/>并行执行"]

    C4 -->|否| R5["抛出 TypeError"]

    style INPUT fill:#e8f5e9
    style R1 fill:#c8e6c9
    style R2 fill:#c8e6c9
    style R3 fill:#c8e6c9
    style R4 fill:#c8e6c9
    style R5 fill:#ffcdd2

3.3 RunnableSequence:管道的骨架

RunnableSequence 是 LCEL 中最重要的组合原语。每一次 | 操作都会创建一个 RunnableSequence(或将步骤追加到已有的 RunnableSequence)。

3.3.1 构造与展平

# 源码文件:libs/core/langchain_core/runnables/base.py (第2911行)

class RunnableSequence(RunnableSerializable[Input, Output]):
    first: Runnable[Input, Any]
    middle: list[Runnable[Any, Any]] = Field(default_factory=list)
    last: Runnable[Any, Output]

    def __init__(self, *steps: RunnableLike, name=None, first=None, middle=None, last=None):
        steps_flat: list[Runnable] = []
        if not steps and first is not None and last is not None:
            steps_flat = [first] + (middle or []) + [last]
        for step in steps:
            if isinstance(step, RunnableSequence):
                steps_flat.extend(step.steps)  # 关键:展平嵌套
            else:
                steps_flat.append(coerce_to_runnable(step))
        super().__init__(
            first=steps_flat[0],
            middle=list(steps_flat[1:-1]),
            last=steps_flat[-1],
            name=name,
        )

展平的重要性:当你写 (a | b) | c 时,a | b 先创建一个 RunnableSequence(a, b),然后 | c 时检测到左操作数是 RunnableSequence,会将其展平为 RunnableSequence(a, b, c),而不是 RunnableSequence(RunnableSequence(a, b), c)。这避免了不必要的嵌套,保持了执行图的扁平。

RunnableSequence 还覆写了 __or____ror__ 来优化这个展平过程:

# 源码文件:libs/core/langchain_core/runnables/base.py (第3077行)

def __or__(self, other):
    if isinstance(other, RunnableSequence):
        return RunnableSequence(
            self.first, *self.middle, self.last,
            other.first, *other.middle, other.last,
            name=self.name or other.name,
        )
    return RunnableSequence(
        self.first, *self.middle, self.last,
        coerce_to_runnable(other),
        name=self.name,
    )

当两个 RunnableSequence 通过 | 连接时,直接将两者的步骤列表拼接,而不是嵌套。这使得无论你写多长的管道,内部都只有一层 RunnableSequence

3.3.2 invoke 的实现

# 源码文件:libs/core/langchain_core/runnables/base.py (第3131行)

def invoke(self, input, config=None, **kwargs):
    config = ensure_config(config)
    callback_manager = get_callback_manager_for_config(config)
    run_manager = callback_manager.on_chain_start(
        None, input,
        name=config.get("run_name") or self.get_name(),
        run_id=config.pop("run_id", None),
    )
    input_ = input

    try:
        for i, step in enumerate(self.steps):
            config = patch_config(
                config,
                callbacks=run_manager.get_child(f"seq:step:{i + 1}")
            )
            with set_config_context(config) as context:
                if i == 0:
                    input_ = context.run(step.invoke, input_, config, **kwargs)
                else:
                    input_ = context.run(step.invoke, input_, config)
    except BaseException as e:
        run_manager.on_chain_error(e)
        raise
    else:
        run_manager.on_chain_end(input_)
        return cast("Output", input_)

逐行分析:

  1. ensure_config:初始化配置,合并 ContextVar 中的父级配置
  2. callback_manager.on_chain_start:通知追踪系统"序列开始执行"
  3. 循环执行每个步骤
    • patch_config:为当前步骤创建子级回调(seq:step:1seq:step:2 ...)
    • set_config_context:将配置写入 ContextVar,创建隔离的执行上下文
    • context.run(step.invoke, ...):在隔离上下文中执行步骤
    • 第一步传递 **kwargs,后续步骤不传递(kwargs 只对首步有效)
  4. 错误处理:捕获 BaseException(包括 KeyboardInterrupt),上报给追踪系统

3.3.3 stream 与 _transform

RunnableSequence.stream 的实现是整个 LCEL 流式能力的核心:

# 源码文件:libs/core/langchain_core/runnables/base.py (第3528行)

def stream(self, input, config=None, **kwargs):
    yield from self.transform(iter([input]), config, **kwargs)

stream 将单个输入包装为迭代器,然后委托给 transform。而 transform 调用内部的 _transform

# 源码文件:libs/core/langchain_core/runnables/base.py (第3465行)

def _transform(self, inputs, run_manager, config, **kwargs):
    steps = [self.first, *self.middle, self.last]
    final_pipeline = cast("Iterator[Output]", inputs)
    for idx, step in enumerate(steps):
        config = patch_config(
            config, callbacks=run_manager.get_child(f"seq:step:{idx + 1}")
        )
        if idx == 0:
            final_pipeline = step.transform(final_pipeline, config, **kwargs)
        else:
            final_pipeline = step.transform(final_pipeline, config)
    yield from final_pipeline

这段代码实现了惰性管道串联:它并不立即执行任何步骤,而是将每个步骤的 transform 方法串联成一个嵌套的迭代器链。只有当最终消费者(yield from)开始拉取数据时,数据才开始逐步流经管道。

graph LR
    subgraph "stream 管道"
        I["输入: iter([input])"] --> T1["step1.transform()"]
        T1 --> T2["step2.transform()"]
        T2 --> T3["step3.transform()"]
        T3 --> O["yield from<br/>输出流"]
    end

    subgraph "惰性求值"
        direction TB
        N1["消费者拉取"] --> N2["step3 向 step2 请求"]
        N2 --> N3["step2 向 step1 请求"]
        N3 --> N4["step1 从输入迭代器读取"]
    end

    style I fill:#e8f5e9
    style O fill:#e8f5e9

3.3.4 batch 的优化实现

RunnableSequence 覆写了 batch,实现了"按步骤批量"而非"按输入逐个"的执行策略:

# 源码文件:libs/core/langchain_core/runnables/base.py (第3207行)

def batch(self, inputs, config=None, *, return_exceptions=False, **kwargs):
    # ... 设置 callback_managers 和 run_managers ...

    # 关键:对每个步骤调用 batch,而非对整个序列多次 invoke
    for i, step in enumerate(self.steps):
        inputs = step.batch(
            inputs,
            [
                patch_config(config, callbacks=rm.get_child(f"seq:step:{i + 1}"))
                for rm, config in zip(run_managers, configs)
            ],
            return_exceptions=return_exceptions,
            **(kwargs if i == 0 else {}),
        )
    # ...

这个设计的好处是:如果某个步骤(如 ChatOpenAI)有原生的批量 API,它可以在一次 HTTP 请求中处理多个输入,而不是发送多次单独的请求。这在实际应用中可以带来数倍的性能提升。

3.4 RunnableParallel:分叉与汇聚

RunnableParallel 实现了数据的"一进多出"——将同一个输入分发给多个 Runnable 并行执行,然后将各自的输出收集到一个字典中。

3.4.1 构造方式

RunnableParallel 支持三种构造方式:

# 方式1:字典参数
parallel = RunnableParallel({"key1": runnable1, "key2": runnable2})

# 方式2:关键字参数
parallel = RunnableParallel(key1=runnable1, key2=runnable2)

# 方式3:在管道中使用字典字面量(通过 coerce_to_runnable 自动转换)
chain = some_runnable | {"key1": runnable1, "key2": runnable2}

内部实现:

# 源码文件:libs/core/langchain_core/runnables/base.py (第3651行)

class RunnableParallel(RunnableSerializable[Input, dict[str, Any]]):
    steps__: Mapping[str, Runnable[Input, Any]]

    def __init__(self, steps__=None, **kwargs):
        merged = {**steps__} if steps__ is not None else {}
        merged.update(kwargs)
        super().__init__(
            steps__={key: coerce_to_runnable(r) for key, r in merged.items()}
        )

注意字段名 steps__ 使用了双下划线后缀——这是为了避免与用户传入的关键字参数冲突。如果字段名是 steps,那么 RunnableParallel(steps=some_runnable) 就会产生歧义。

3.4.2 invoke 的并行执行

# 源码文件:libs/core/langchain_core/runnables/base.py (第3834行)

def invoke(self, input, config=None, **kwargs):
    config = ensure_config(config)
    callback_manager = CallbackManager.configure(
        inheritable_callbacks=config.get("callbacks"), ...
    )
    run_manager = callback_manager.on_chain_start(None, input, ...)

    def _invoke_step(step, input_, config, key):
        child_config = patch_config(
            config,
            callbacks=run_manager.get_child(f"map:key:{key}"),
        )
        with set_config_context(child_config) as context:
            return context.run(step.invoke, input_, child_config)

    try:
        steps = dict(self.steps__)
        with get_executor_for_config(config) as executor:
            futures = [
                executor.submit(_invoke_step, step, input, config, key)
                for key, step in steps.items()
            ]
            output = {
                key: future.result()
                for key, future in zip(steps, futures)
            }
    except BaseException as e:
        run_manager.on_chain_error(e)
        raise
    else:
        run_manager.on_chain_end(output)
        return output

核心执行逻辑:

  1. 使用 ThreadPoolExecutor 并行提交所有步骤
  2. 每个步骤接收相同的 input,但各自拥有独立的子回调管理器(map:key:xxx
  3. 收集所有 Future.result(),组装成字典返回

3.4.3 Schema 合并

RunnableParallel 的输入 schema 是所有子步骤输入 schema 的合并:

# 源码文件:libs/core/langchain_core/runnables/base.py (第3722行)

def get_input_schema(self, config=None):
    if all(
        s.get_input_schema(config).model_json_schema().get("type", "object") == "object"
        for s in self.steps__.values()
    ):
        # 所有子步骤都接受对象输入时,合并它们的字段
        return create_model_v2(
            self.get_name("Input"),
            field_definitions={
                k: (v.annotation, v.default)
                for step in self.steps__.values()
                for k, v in step.get_input_schema(config).model_fields.items()
                if k != "__root__"
            },
        )
    return super().get_input_schema(config)

这意味着,如果并行步骤中的 Runnable A 需要字段 "question",Runnable B 需要字段 "context",那么 RunnableParallel 的输入 schema 将包含 "question""context" 两个字段。

graph TB
    subgraph "RunnableParallel 执行模型"
        INPUT["输入: {question: '...', context: '...'}"]

        INPUT -->|同一输入| S1["步骤 A: retriever"]
        INPUT -->|同一输入| S2["步骤 B: RunnablePassthrough"]
        INPUT -->|同一输入| S3["步骤 C: some_transform"]

        S1 -->|"context: [doc1, doc2]"| OUTPUT
        S2 -->|"question: '...'"| OUTPUT
        S3 -->|"extra: '...'"| OUTPUT

        OUTPUT["输出字典:<br/>{context: [...], question: '...', extra: '...'}"]
    end

    subgraph "线程池"
        direction LR
        T1["Thread 1: A.invoke()"]
        T2["Thread 2: B.invoke()"]
        T3["Thread 3: C.invoke()"]
    end

    S1 -.-> T1
    S2 -.-> T2
    S3 -.-> T3

    style INPUT fill:#e8f5e9
    style OUTPUT fill:#e8f5e9

3.5 RunnableLambda:函数的桥梁

RunnableLambda 是将普通 Python 函数接入 Runnable 生态的桥梁。它的设计比看起来复杂得多——需要处理类型推断、同步/异步兼容、依赖追踪等问题。

3.5.1 构造与同步/异步检测

# 源码文件:libs/core/langchain_core/runnables/base.py (第4579行)

class RunnableLambda(Runnable[Input, Output]):

    def __init__(self, func, afunc=None, name=None):
        if afunc is not None:
            self.afunc = afunc
            func_for_name = afunc

        if is_async_callable(func) or is_async_generator(func):
            if afunc is not None:
                raise TypeError(
                    "Func was provided as a coroutine function, but afunc was "
                    "also provided. If providing both, func should be a regular "
                    "function to avoid ambiguity."
                )
            self.afunc = func
            func_for_name = func
        elif callable(func):
            self.func = cast(..., func)
            func_for_name = func
        else:
            raise TypeError(...)

        # 自动从函数名推断 Runnable 名称
        try:
            if name is not None:
                self.name = name
            elif func_for_name.__name__ != "<lambda>":
                self.name = func_for_name.__name__
        except AttributeError:
            pass

关键设计:

3.5.2 类型推断

RunnableLambda 从函数签名中推断输入和输出类型:

# 源码文件:libs/core/langchain_core/runnables/base.py (第4654行)

@property
def InputType(self):
    func = getattr(self, "func", None) or self.afunc
    try:
        params = inspect.signature(func).parameters
        first_param = next(iter(params.values()), None)
        if first_param and first_param.annotation != inspect.Parameter.empty:
            return first_param.annotation
    except ValueError:
        pass
    return Any

@property
def OutputType(self):
    func = getattr(self, "func", None) or self.afunc
    try:
        sig = inspect.signature(func)
        if sig.return_annotation != inspect.Signature.empty:
            # 解包迭代器类型:Iterator[X] -> X
            if getattr(sig.return_annotation, "__origin__", None) in {
                collections.abc.Iterator,
                collections.abc.AsyncIterator,
            }:
                return getattr(sig.return_annotation, "__args__", (Any,))[0]
            return sig.return_annotation
    except ValueError:
        pass
    return Any

一个巧妙的细节:如果函数返回 Iterator[X]AsyncIterator[X]OutputType 会自动解包为 X。这是因为生成器函数的"输出"是每个 yield 的值,而非迭代器本身。

3.5.3 invoke 的实现

# 源码文件:libs/core/langchain_core/runnables/base.py (第4997行)

def invoke(self, input, config=None, **kwargs):
    if hasattr(self, "func"):
        return self._call_with_config(
            self._invoke,
            input,
            ensure_config(config),
            **kwargs,
        )
    raise TypeError("Cannot invoke a coroutine function synchronously.")

_call_with_configRunnable 基类提供的模板方法,它包装了追踪逻辑。实际的函数调用发生在 _invoke 中(通过 call_func_with_variable_args 自动适配函数签名)。

3.5.4 依赖追踪

RunnableLambda 有一个被低估的能力——自动追踪闭包中引用的其他 Runnable

# 源码文件:libs/core/langchain_core/runnables/base.py (第4763行)

@functools.cached_property
def deps(self) -> list[Runnable]:
    if hasattr(self, "func"):
        objects = get_function_nonlocals(self.func)
    elif hasattr(self, "afunc"):
        objects = get_function_nonlocals(self.afunc)
    else:
        objects = []

    deps: list[Runnable] = []
    for obj in objects:
        if isinstance(obj, Runnable):
            deps.append(obj)
        elif isinstance(getattr(obj, "__self__", None), Runnable):
            deps.append(obj.__self__)
    return deps

get_function_nonlocals 检查函数闭包(__closure__)中的非局部变量,如果其中包含 Runnable 对象,就将它们记录为依赖。这使得图可视化工具能正确显示 RunnableLambda 与其闭包引用的 Runnable 之间的关系。

3.5.5 @chain 装饰器

@chain 装饰器是 RunnableLambda 的语法糖,使函数定义更具声明性:

# 源码文件:libs/core/langchain_core/runnables/base.py (第6227行)

def chain(func):
    """Decorate a function to make it a Runnable.
    Sets the name of the Runnable to the name of the function."""
    return RunnableLambda(func)

使用示例:

from langchain_core.runnables import chain

@chain
def format_output(text: str) -> dict:
    return {"formatted": text.strip().upper()}

# format_output 现在是一个 Runnable,名称为 "format_output"
chain = some_llm | format_output

3.6 RunnableBranch:条件分支

RunnableBranch 实现了"if-elif-else"风格的条件路由。

3.6.1 结构设计

# 源码文件:libs/core/langchain_core/runnables/branch.py (第42行)

class RunnableBranch(RunnableSerializable[Input, Output]):
    branches: Sequence[tuple[Runnable[Input, bool], Runnable[Input, Output]]]
    default: Runnable[Input, Output]

    def __init__(self, *branches):
        if len(branches) < 2:
            raise ValueError("RunnableBranch requires at least two branches")

        default = branches[-1]  # 最后一个参数是 default 分支
        default_ = coerce_to_runnable(default)

        branches_ = []
        for branch in branches[:-1]:
            condition, runnable = branch
            condition = coerce_to_runnable(condition)
            runnable = coerce_to_runnable(runnable)
            branches_.append((condition, runnable))

        super().__init__(branches=branches_, default=default_)

构造参数是一系列 (条件, Runnable) 元组,最后一个参数是默认分支。条件本身也是 Runnable(或函数,会被 coerce_to_runnable 包装)。

3.6.2 invoke 的实现

# 源码文件:libs/core/langchain_core/runnables/branch.py (第189行)

def invoke(self, input, config=None, **kwargs):
    config = ensure_config(config)
    callback_manager = get_callback_manager_for_config(config)
    run_manager = callback_manager.on_chain_start(
        None, input, name=config.get("run_name") or self.get_name(), ...
    )

    try:
        for idx, branch in enumerate(self.branches):
            condition, runnable = branch
            expression_value = condition.invoke(
                input,
                config=patch_config(
                    config,
                    callbacks=run_manager.get_child(f"branch:{idx + 1}:condition:"),
                ),
            )
            if expression_value:
                output = runnable.invoke(
                    input,
                    config=patch_config(
                        config,
                        callbacks=run_manager.get_child(f"branch:{idx + 1}:then:"),
                    ),
                    **kwargs,
                )
                break
        else:
            output = self.default.invoke(
                input,
                config=patch_config(
                    config, callbacks=run_manager.get_child("branch:default:"),
                ),
                **kwargs,
            )
    except BaseException as e:
        run_manager.on_chain_error(e)
        raise
    run_manager.on_chain_end(output)
    return output

执行逻辑清晰:依次评估每个条件,第一个为 True 的条件对应的 Runnable 被执行;如果所有条件都为 False,执行默认分支。注意 Python 的 for-else 语法——else 块在循环没有被 break 中断时执行。

graph TD
    INPUT["输入数据"]

    INPUT --> C1{"条件1: isinstance(x, str)?"}
    C1 -->|True| B1["分支1: x.upper()"]
    C1 -->|False| C2{"条件2: isinstance(x, int)?"}
    C2 -->|True| B2["分支2: x + 1"]
    C2 -->|False| C3{"条件3: isinstance(x, float)?"}
    C3 -->|True| B3["分支3: x * 2"]
    C3 -->|False| BD["默认分支: 'goodbye'"]

    B1 --> OUTPUT["输出"]
    B2 --> OUTPUT
    B3 --> OUTPUT
    BD --> OUTPUT

    style INPUT fill:#e8f5e9
    style OUTPUT fill:#e8f5e9
    style BD fill:#fff3e0

3.7 RunnablePassthrough:数据透传与增强

RunnablePassthrough 看起来像是一个什么都不做的组件,但它在 LCEL 管道中扮演着至关重要的角色——透传原始数据的同时,允许执行副作用或增强字段。

3.7.1 基本透传

# 源码文件:libs/core/langchain_core/runnables/passthrough.py (第74行)

class RunnablePassthrough(RunnableSerializable[Other, Other]):
    func: Callable | None = None    # 可选的副作用函数
    afunc: Callable | None = None   # 可选的异步副作用函数

    def invoke(self, input, config=None, **kwargs):
        # 如果有副作用函数,先执行它
        if self.func is not None:
            call_func_with_variable_args(
                self.func, input, ensure_config(config), **kwargs
            )
        # 然后透传原始输入
        return self._call_with_config(identity, input, config)

核心设计:无论副作用函数做了什么,返回值始终是原始输入。 这使得 RunnablePassthrough 成为了一个"观察点"——你可以在管道中插入日志、统计等副作用,而不影响数据流。

3.7.2 assign:字段增强

RunnablePassthrough.assign 是 LCEL 中最实用的方法之一:

# 源码文件:libs/core/langchain_core/runnables/passthrough.py (第207行)

@classmethod
def assign(cls, **kwargs) -> RunnableAssign:
    return RunnableAssign(RunnableParallel[dict[str, Any]](kwargs))

assign 创建了一个 RunnableAssign,它在保留原始输入字典所有字段的基础上,添加(或覆盖)新的字段。这在 RAG 管道中极为常见:

# 典型 RAG 管道
chain = (
    RunnablePassthrough.assign(
        context=lambda x: retriever.invoke(x["question"])
    )
    | prompt
    | model
    | StrOutputParser()
)

# 输入: {"question": "什么是LangChain?"}
# assign 后: {"question": "什么是LangChain?", "context": [doc1, doc2, ...]}

3.7.3 RunnablePick:字段选择

assign 相对,pick 用于从字典中选择特定字段:

# 用法
chain = some_parallel | RunnablePick(["key1", "key2"])
# 等价于
chain = some_parallel.pick(["key1", "key2"])

RunnablePickRunnable.pick 方法的底层实现,它从字典输入中提取指定的键值对。

3.8 RunnableBinding:参数绑定与配置固化

RunnableBinding 通过装饰器模式为 Runnable 附加额外的参数或配置。

3.8.1 结构设计

# 源码文件:libs/core/langchain_core/runnables/base.py (第5530行)

class RunnableBindingBase(RunnableSerializable[Input, Output]):
    bound: Runnable[Input, Output]
    kwargs: Mapping[str, Any] = Field(default_factory=dict)
    config: RunnableConfig = Field(default_factory=RunnableConfig)
    config_factories: list[Callable[[RunnableConfig], RunnableConfig]] = Field(
        default_factory=list
    )
    custom_input_type: Any | None = None
    custom_output_type: Any | None = None

RunnableBindingBase 持有一个 bound(被包装的 Runnable)和一组固定的 kwargs。每次调用 invoke 时,这些 kwargs 会自动传递给 bound.invoke

# 源码文件:libs/core/langchain_core/runnables/base.py (第5689行)

def invoke(self, input, config=None, **kwargs):
    return self.bound.invoke(
        input,
        self._merge_configs(config),
        **{**self.kwargs, **kwargs},
    )

3.8.2 常见用法

RunnableBinding 通常通过 Runnable.bind 方法创建:

# 为 LLM 固定 temperature 参数
model_creative = model.bind(temperature=0.9)
model_precise = model.bind(temperature=0.1)

# 为 LLM 绑定工具
model_with_tools = model.bind(tools=[search_tool, calculator_tool])

# 固定配置
chain_tagged = chain.with_config({"tags": ["production"], "metadata": {"version": "1.0"}})

3.9 RunnableConfig 的完整生命周期

让我们通过一个完整的例子,跟踪 RunnableConfig 在一次管道执行中的完整生命周期:

chain = prompt | model | parser
chain.invoke(
    {"question": "Hello"},
    config={"tags": ["user-request"], "metadata": {"user_id": "123"}}
)
sequenceDiagram
    participant User as 用户
    participant Seq as RunnableSequence
    participant EC as ensure_config
    participant PC as patch_config
    participant SC as set_config_context
    participant CV as ContextVar
    participant P as Prompt
    participant M as Model
    participant Pa as Parser

    User->>Seq: invoke(input, config={tags:["user-request"]})
    Seq->>EC: ensure_config(config)
    Note over EC: 1. 创建默认配置<br/>2. 从ContextVar继承<br/>3. 用用户config覆盖
    EC-->>Seq: full_config

    Seq->>PC: patch_config(config, callbacks=child_1)
    Note over PC: 替换callbacks为seq:step:1
    PC-->>Seq: step1_config

    Seq->>SC: set_config_context(step1_config)
    SC->>CV: 写入ContextVar
    Note over SC: copy_context()
    Seq->>P: prompt.invoke(input, step1_config)
    Note over P: 内部ensure_config<br/>自动继承step1_config
    P-->>Seq: prompt_output

    Seq->>PC: patch_config(config, callbacks=child_2)
    Seq->>SC: set_config_context(step2_config)
    SC->>CV: 写入ContextVar
    Seq->>M: model.invoke(prompt_output, step2_config)
    M-->>Seq: model_output

    Seq->>PC: patch_config(config, callbacks=child_3)
    Seq->>SC: set_config_context(step3_config)
    SC->>CV: 写入ContextVar
    Seq->>Pa: parser.invoke(model_output, step3_config)
    Pa-->>Seq: final_output

    Seq-->>User: final_output

关键观察:

  1. tags 和 metadata 贯穿始终:用户设置的 tagsmetadata 通过 ensure_configCOPIABLE_KEYS 机制,在每一步的 config 中都存在(通过 copy 避免共享引用)
  2. callbacks 逐步替换:每一步的 callbackspatch_config 替换为该步骤特有的子回调管理器,使得追踪系统能构建正确的父子关系树
  3. ContextVar 提供隐式传播:即使某些内部调用没有显式传递 config,ensure_config 也能从 ContextVar 中恢复父级配置

3.10 RunnableWithFallbacks:优雅降级

虽然不是 LCEL 的核心组合原语,RunnableWithFallbacks 是生产环境中不可或缺的容错机制:

# 源码文件:libs/core/langchain_core/runnables/fallbacks.py (第36行)

class RunnableWithFallbacks(RunnableSerializable[Input, Output]):
    runnable: Runnable[Input, Output]
    fallbacks: Sequence[Runnable[Input, Output]]
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,)
    exception_key: str | None = None

执行逻辑:先尝试 runnable,如果抛出 exceptions_to_handle 中的异常,依次尝试 fallbacks 中的 Runnable,直到有一个成功或全部失败。

# 使用示例
model = ChatOpenAI(model="gpt-4o").with_fallbacks(
    [ChatAnthropic(model="claude-sonnet-4-6"), ChatOllama(model="llama3")]
)
# GPT-4o 故障时自动降级到 Claude,Claude 故障时降级到本地 Ollama

3.10.1 invoke 实现里的四条设计约束

上面的 4 个字段看起来平凡,但 fallbacks.py:166invoke 实现藏着四条生产环境必须了解的约束:

# fallbacks.py:187-213 主循环
first_error = None
last_error = None
for runnable in self.runnables:
    try:
        if self.exception_key and last_error is not None:
            input[self.exception_key] = last_error        # ← ①
        ...
        output = context.run(runnable.invoke, input, config, **kwargs)
    except self.exceptions_to_handle as e:                # ← ②
        if first_error is None:
            first_error = e
        last_error = e
    except BaseException as e:                            # ← ③
        run_manager.on_chain_error(e)
        raise
    else:
        return output
# 全部失败:抛 first_error
run_manager.on_chain_error(first_error)
raise first_error                                         # ← ④

exception_key 是一个反馈通道、不是标签。如果设了 exception_key="last_error",当 fallback #1 也失败时、框架会在调用 fallback #2 前把 input["last_error"] = fallback_1_的异常对象 写进输入。下一级的 prompt 模板可以读这个字段做自我修正——比如给 LLM 提示"上一个模型因为 RateLimit 失败、请用更短的回答"。这让多级 fallback 不只是"换个模型重试"、而是"带着失败原因换一个模型问"的升级尝试。这个字段的使用前提是 input 必须是 dict(line 169-174 有守卫、不是 dict 直接 ValueError),所以 .with_fallbacks(..., exception_key="err") 只能搭配字典类型的输入使用。

exceptions_to_handle 精确圈定可接管的异常。默认 (Exception,) 会接所有 Exception 子类、但不接 BaseException ——也就是说 KeyboardInterruptSystemExitasyncio.CancelledError(Python 3.8+ 把它从 Exception 改成 BaseException 子类)不会被当作"可重试的失败"。这是防生产环境 bug:用户 Ctrl+C 想停进程、或者 task 被取消、或者应用想 graceful exit——不能因为"包了 fallback"就把这些控制信号吞掉继续重试。line 203-205 的 except BaseException 明确把控制流信号放走,fallback 机制只拦业务错误。

③ 控制流异常优先透传,并调 run_manager.on_chain_error(e) 通知 LangSmith trace "这个 run 被中断了"——trace 不会丢失。用户在监控里看到的不是"神秘成功"、而是"这里有个 KeyboardInterrupt、链路被打断"。

④ 全部失败时抛的是 first_error、不是 last_error。这条设计逻辑很关键:生产环境排障时你想问的通常是"为什么主模型失败了"——不是"备用模型也挂了"。如果抛的是 last_error,日志里堆的是"ollama 连不上"或"claude rate limit"——掩盖了真正的主因"gpt-4o 返回了 5xx"。first_error = None; if first_error is None: first_error = e 这个简单记录,让事后 debug 时能直接看见第一张倒下的多米诺骨牌。

callback tracing 的完整性是另一条隐形约束——每次 fallback 调用都走 patch_config(config, callbacks=run_manager.get_child())(line 191)。在 LangSmith 上你会看到一棵 trace 树:根节点是 RunnableWithFallbacks、子节点是"主模型(失败)" + "fallback #1(失败)" + "fallback #2(成功)"——每次尝试都有独立 trace、输入输出完整记录。这对生产环境的事后分析至关重要:你想知道"哪个请求的哪次降级花了多少时间"时,LangSmith 已经替你把每一步都存下来了。

3.11 组合原语的协作模式

让我们通过一个实际的 RAG 管道示例,展示所有组合原语如何协作:

from langchain_core.runnables import (
    RunnablePassthrough, RunnableParallel, RunnableBranch, RunnableLambda
)

# 构建一个带有条件路由和并行检索的 RAG 管道
chain = (
    # 1. RunnableParallel: 并行执行问题分类和上下文检索
    RunnableParallel(
        question=RunnablePassthrough(),
        context=lambda x: retriever.invoke(x["question"]),
        category=lambda x: classifier.invoke(x["question"]),
    )
    # 2. RunnableBranch: 根据问题类别选择不同的 Prompt
    | RunnableBranch(
        (lambda x: x["category"] == "technical", technical_prompt),
        (lambda x: x["category"] == "creative", creative_prompt),
        general_prompt,  # 默认
    )
    # 3. RunnableSequence: 通过管道连接 Model 和 Parser
    | model
    | StrOutputParser()
)
graph TD
    INPUT["用户输入<br/>{question: '...'}"]

    INPUT --> PAR["RunnableParallel"]

    PAR -->|"question"| PP["RunnablePassthrough<br/>透传原始问题"]
    PAR -->|"context"| RET["RunnableLambda<br/>retriever.invoke()"]
    PAR -->|"category"| CLS["RunnableLambda<br/>classifier.invoke()"]

    PP --> MERGE["合并结果"]
    RET --> MERGE
    CLS --> MERGE

    MERGE --> BRANCH["RunnableBranch"]
    BRANCH -->|"category == 'technical'"| TP["technical_prompt"]
    BRANCH -->|"category == 'creative'"| CP["creative_prompt"]
    BRANCH -->|"默认"| GP["general_prompt"]

    TP --> MODEL["ChatModel"]
    CP --> MODEL
    GP --> MODEL

    MODEL --> PARSER["StrOutputParser"]
    PARSER --> OUTPUT["最终文本输出"]

    style INPUT fill:#e8f5e9
    style OUTPUT fill:#e8f5e9
    style PAR fill:#e3f2fd
    style BRANCH fill:#fff3e0

在这个管道中:

3.12 设计决策深度分析

为什么 Runnable 同时是 ABC 和 Generic?

Runnable(ABC, Generic[Input, Output]) 的多继承看似简单,但解决了一个深层问题:如何在不牺牲类型安全的前提下允许多态?

ABC 确保子类必须实现 invoke(通过 @abstractmethod);Generic[Input, Output] 确保管道中的类型能被静态检查器验证。如果只有 ABC,管道的类型检查将退化为 Any;如果只有 Generic,就无法强制实现 invoke

为什么用 Pydantic BaseModel 作为 RunnableSerializable 的基类?

RunnableSerializable 继承了 Serializable(间接继承 Pydantic BaseModel)和 Runnable。这不是多余的——Pydantic 提供了:

但这也带来了限制——RunnableLambda 因为包含不可序列化的 Callable,所以不继承 RunnableSerializable

transform vs stream:为什么需要两个方法?

stream(input) 接受一个单值输入,产生一个输出流;transform(inputs) 接受一个输入流,产生一个输出流。stream 本质上是 transform(iter([input])) 的语法糖。

分离这两个方法的原因是语义清晰性:用户通常调用 stream(单输入),而框架内部的管道串联使用 transform(流到流)。

3.12.1 langchain_core/runnables/ 16 文件 13730 行的真实拆分

把整个 runnables/ 目录按文件大小排序——

文件 角色
base.py 6261 本目录最大、占 45.6%——Runnable ABC + RunnableSequence / RunnableParallel / RunnableLambda / RunnableBinding / RunnableEachBase 等核心组合原语全部塞在一文件
passthrough.py 841 RunnablePassthrough + RunnableAssign + RunnablePick(§3.7)
utils.py 759 类型推断工具、get_unique_config_specs、Schema 推断
graph.py 739 Graph / Node / Edge 数据类 + 图算法
configurable.py 716 RunnableConfigurableFields + RunnableConfigurableAlternatives——§3.8 提到的"配置覆写"实现
fallbacks.py 664 RunnableWithFallbacks(§3.10 主角)
config.py 641 RunnableConfig TypedDict + merge_configs + get_executor_for_config(§3.9 主角)
history.py 622 RunnableWithMessageHistory(ch13 §13.9 主角,见 §13.10.4 实测)
graph_mermaid.py 503 Graph → mermaid 字符串渲染
branch.py 461 RunnableBranch(§3.6 主角)
retry.py 379 RunnableRetry——本章未深入讨论
graph_ascii.py 366 Graph → ASCII 字符画渲染
router.py 239 本章完全没提——RouterRunnable 是 RunnableBranch 的进化版(按 route_name 字段路由、不是按谓词链),用 Mapping[str, Runnable] 表达
graph_png.py 215 Graph → PNG 通过 mermaid.ink CDN 渲染(不是本地)
schema.py 188 EventData / StreamEvent 等 streaming 事件类型
__init__.py 136 公共 export

两条值得记住的物理事实——

  1. base.py 6261 行 = 整目录 46%——一个单文件塞了 7~8 个核心组合原语类——是 LangChain"单文件大方法集"风格的极端体现——比 ch12 的 callbacks/manager.py 2697 行还大 1.3 倍——Python 调试时跨类共享 _serialize_kwargs / _validate / _coerce_* 这些 helpers 比拆文件成本低、但代价是 LSP 跳转/git blame 体验差
  2. 3 个 graph_ 文件 1084 行*(mermaid 503 + ascii 366 + png 215)+ graph.py 739 = 可视化基础设施 1823 行 = 13.3%——langchain_core 把 "render graph as mermaid/ascii/png" 当成核心一等公民、而不是开发工具——印证"调试可观测性是基础设施投资"(同款思路在 ch12 §12.9.5 callbacks/tracers/ 9734 行也成立)

router.py 239 行是本章未覆盖的板块——RouterRunnableMapping[str, Runnable](按字符串 key 选分支)取代 RunnableBranch 的"谓词函数链"——适合 LLM 输出已经分类好的场景("intent classifier 返回 weather / news / chat、再路由到对应 Runnable");下一版章节应补一节。

3.12.2 从 | 到 Router:LCEL 的两种组合边界

本章已经讲过 | 是 LCEL 的视觉核心。源码里它有两个入口:langchain_core/runnables/base.py:618-637Runnable.__or__ 把右侧对象交给 coerce_to_runnable,再构造成 RunnableSequencebase.py:639-650__ror__ 处理"普通对象在左、Runnable 在右"的反向组合。真正的宽容入口在 base.py:6176-6200:如果对象已经是 Runnable,就原样返回;如果是生成器函数,包成 RunnableGenerator;如果是普通 callable,包成 RunnableLambda;如果是 dict,包成 RunnableParallel;否则抛 TypeError

flowchart TD
  A[LCEL 右侧对象] --> B{coerce_to_runnable}
  B -->|Runnable| C[原样使用]
  B -->|callable| D[RunnableLambda]
  B -->|generator| E[RunnableGenerator]
  B -->|dict| F[RunnableParallel]
  B -->|其他| G[TypeError]

这个入口解释了为什么 LCEL 用起来像"魔法":prompt | model | parser{"context": retriever, "question": RunnablePassthrough()} | promptlambda x: x["query"] 都能进入同一个协议。魔法不是运行时反射,而是一组明确的 coercion 规则。

但 LCEL 不只有顺序组合。RunnableSequence.__or__base.py:3077-3101 还有一个重要优化:如果右侧已经是 RunnableSequence,它会把两条 sequence 展平,避免嵌套 sequence 套 sequence;base.py:3104-3128 的反向组合也做同样处理。调用时,RunnableSequence.invokebase.py:3131-3144 先建立 callback manager 和根 run,再逐步执行 steps。这就是第 12 章回调树能覆盖整条链的原因:组合结构从一开始就被保留为一个可追踪的 Runnable。

router.py 则提供另一条边界。langchain_core/runnables/router.py:46 定义 RouterRunnable,它接收一个 Mapping[str, Runnable]router.py:107invoke 会读取输入里的 key,选中对应 runnable 后转发;router.py:209stream 也按同样路由进入子 runnable。它和 RunnableBranch 的区别是:

组合原语 路由依据 适合场景
RunnableBranch 一组谓词函数按顺序判断 条件来自程序逻辑,如分数阈值、字段存在性
RouterRunnable 输入里已经有 route key 上游模型或分类器已经给出意图标签
RunnableParallel 无路由,所有分支都执行 扇出、多路特征提取、RAG context/question 组合

工程上这三者不能混用。你如果已经有一个 intent classifier 输出 "weather" / "news" / "chat",用 RouterRunnable 比写一串 lambda predicate 更直接;如果判断条件依赖复杂业务状态,RunnableBranch 更清楚;如果需要同时计算多个字段,才是 RunnableParallel。LCEL 的价值不只是写法短,而是把串行、并行、条件、路由都纳入同一套 Runnable 协议。

这套协议的真正价值,是把"组件怎么接"和"组件做什么"分开。模型调用、检索器、解析器、普通函数、字典并行分支,在业务语义上完全不同;但只要被规整成同一种可运行对象,外层就能统一处理配置、回调、批量、异步和流式。框架用户感受到的是写法简洁,框架作者真正维护的是组合代数。

排查 LCEL 问题时也应沿着这条边界走。第一步看对象是否被正确规整:普通函数是不是变成函数包装器,字典是不是变成并行分支,生成器是不是变成流式生成器。第二步看组合结构是否被展平:嵌套管道是否变成一条步骤列表。第三步才看每个组件自己的业务逻辑。很多"链路不工作"的问题,根因并不在模型或提示词,而在某个普通对象没有被正确规整成协议对象。

这和丛书里的其他框架形成呼应。LangGraph 把状态机节点规整成图执行单元,MCP 把工具和资源规整成协议消息,Serde 把用户类型规整成序列化数据模型。成熟框架的共同点,都是先定义一个足够窄的协议,再让各种复杂对象进入这个协议。

所以学 LCEL 时,不要只记住管道符。管道符只是入口,真正要掌握的是对象规整、组合展平、配置传播、回调追踪这四个机制。它们合起来,才让一段短链具备生产系统需要的可执行性。

掌握这四个机制后,短链和长链的理解方式是一样的:都是协议对象在组合结构中流动。

3.13 小结

本章深入分析了 LangChain 的两大核心支柱:Runnable 协议和 LCEL 表达式语言。

Runnable 协议通过一个抽象方法(invoke)和一组带有合理默认实现的标准方法(ainvoke/batch/stream 等),使得任何组件只需实现 invoke 就能自动获得异步、批量、流式的全部能力。类型安全通过 Generic[Input, Output] 保证,Schema 推断通过 Pydantic 和 Python 反射实现。

LCEL 的 | 操作符利用 Python 的 __or__/__ror__ 魔术方法,将组件组合的代码转化为类似 Unix 管道的声明式语法。coerce_to_runnable 函数使得 | 两侧可以接受普通函数、字典等非 Runnable 对象,极大提升了 LCEL 的灵活性。

六个核心组合原语各司其职:

物理事实:langchain_core/runnables/ 16 文件 13730 行——base.py 单文件 6261 行占 46% 是"单文件大方法集"极端体现;可视化(graph + 3 个 graph_*)1823 行占 13.3% 体现 LangChain 把调试可观测性当一等公民;router.py 239 行的 RouterRunnable 是本章未覆盖板块。