LangChain 设计与实现
第3章 Runnable 与 LCEL 表达式语言
第3章 Runnable 与 LCEL 表达式语言
本章基于 LangChain 1.0.3 / langchain-core 1.2.26 源码分析。核心源码文件:
libs/core/langchain_core/runnables/base.py(约 6200 行)。
如果说 LangChain 是一座建筑,那么 Runnable 就是它的钢筋骨架,而 LCEL(LangChain Expression Language)就是将钢筋拼接成结构的焊接方式。本章将从源码层面彻底剖析这两个核心概念。
我们将逐一拆解 Runnable 协议的每一个方法,深入 | 操作符背后的魔法,然后详细分析六个核心组合原语——RunnableSequence、RunnableParallel、RunnableLambda、RunnableBranch、RunnablePassthrough、RunnableBinding——它们各自的设计动机、实现细节和使用场景。
本章要点
- Runnable 协议:
invoke/ainvoke/batch/abatch/stream/astream六大方法的接口设计与默认实现策略 |操作符实现:__or__和__ror__如何将 Python 管道语法转化为RunnableSequence- RunnableSequence:管道组合的核心,
first/middle/last的类型安全设计,流式传播的_transform实现 - RunnableParallel:并行执行的实现,
ThreadPoolExecutor的使用,字典字面量的自动转换 - RunnableLambda:函数到 Runnable 的桥梁,类型推断、依赖追踪、
@chain装饰器 - RunnableBranch / RunnablePassthrough / RunnableBinding:条件分支、数据透传、参数绑定的实现
- RunnableConfig:配置在管道中的流转机制,
ensure_config/patch_config/set_config_context三位一体
3.1 Runnable 协议:万物皆可 Runnable
Runnable 是定义在 libs/core/langchain_core/runnables/base.py 中的抽象基类,它是 LangChain 中最核心的抽象。理解它的设计,等于理解了 LangChain 的一半。
3.1.1 泛型签名与抽象方法
# 源码文件:libs/core/langchain_core/runnables/base.py (第124行)
class Runnable(ABC, Generic[Input, Output]):
"""A unit of work that can be invoked, batched, streamed,
transformed and composed."""
name: str | None
Runnable 有两个泛型参数:Input 和 Output。这使得类型检查器可以在编译时验证管道的数据流是否类型正确。例如,一个 Runnable[str, int] 只能通过 | 连接到一个 Runnable[int, ...]。
在整个 Runnable 类中,只有一个方法是真正的抽象方法:
@abstractmethod
def invoke(
self,
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any,
) -> Output:
"""Transform a single input into an output."""
这意味着,要实现一个 Runnable,你只需要实现 invoke,其他所有方法都有基于 invoke 的默认实现。 这是一个非常务实的设计——降低了接入门槛,同时允许性能敏感的子类覆写特定方法。
3.1.2 六大方法的默认实现策略
graph TB
subgraph "同步方法"
invoke["invoke(input)<br/>[抽象方法]"]
batch["batch(inputs)<br/>默认: 线程池并行调用 invoke"]
stream["stream(input)<br/>默认: yield invoke(input)"]
transform["transform(inputs)<br/>默认: 累积输入后 invoke"]
end
subgraph "异步方法"
ainvoke["ainvoke(input)<br/>默认: run_in_executor(invoke)"]
abatch["abatch(inputs)<br/>默认: asyncio.gather(ainvoke)"]
astream["astream(input)<br/>默认: yield await ainvoke(input)"]
atransform["atransform(inputs)<br/>默认: 累积输入后 ainvoke"]
end
invoke -.->|"线程池执行"| ainvoke
invoke -.->|"并行执行"| batch
invoke -.->|"单值 yield"| stream
ainvoke -.->|"gather"| abatch
ainvoke -.->|"单值 yield"| astream
style invoke fill:#4CAF50,color:#fff
style ainvoke fill:#2196F3,color:#fff
让我们逐一分析这些默认实现:
ainvoke 默认实现:线程池委托
# 源码文件:libs/core/langchain_core/runnables/base.py (第844行)
async def ainvoke(self, input, config=None, **kwargs):
return await run_in_executor(config, self.invoke, input, config, **kwargs)
run_in_executor 使用 asyncio.get_running_loop().run_in_executor() 将同步方法放入线程池执行。这意味着即使你的 Runnable 没有原生的异步实现,ainvoke 也能在 async 上下文中安全使用(不会阻塞事件循环)。性能敏感的子类(如直接调用异步 HTTP API 的 ChatOpenAI)会覆写为原生异步实现。
batch 默认实现:线程池并行
# 源码文件:libs/core/langchain_core/runnables/base.py (第867行)
def batch(self, inputs, config=None, *, return_exceptions=False, **kwargs):
if not inputs:
return []
configs = get_config_list(config, len(inputs))
def invoke(input_, config):
if return_exceptions:
try:
return self.invoke(input_, config, **kwargs)
except Exception as e:
return e
else:
return self.invoke(input_, config, **kwargs)
if len(inputs) == 1:
return [invoke(inputs[0], configs[0])]
with get_executor_for_config(configs[0]) as executor:
return list(executor.map(invoke, inputs, configs))
注意两个设计细节:
- 当只有一个输入时,直接调用
invoke而不创建线程池——避免不必要的线程开销 return_exceptions=True模式类似于asyncio.gather的同名参数,将异常作为结果返回而非抛出
stream 默认实现:单值 yield
# 源码文件:libs/core/langchain_core/runnables/base.py (第1130行)
def stream(self, input, config=None, **kwargs):
yield self.invoke(input, config, **kwargs)
默认实现只是 yield 一次 invoke 的结果。这意味着默认情况下 stream 并不是真正的流式——它等价于 invoke。只有覆写了 stream(或 transform)的子类才有真正的流式能力。这就是为什么 LLM 类会覆写 stream 来逐 token 输出。
3.1.3 Schema 推断机制
Runnable 提供了自动的输入/输出 schema 推断:
# 源码文件:libs/core/langchain_core/runnables/base.py (第300行)
@property
def InputType(self) -> type[Input]:
# 首先从 Pydantic 的 __pydantic_generic_metadata__ 获取
for base in self.__class__.mro():
if hasattr(base, "__pydantic_generic_metadata__"):
metadata = base.__pydantic_generic_metadata__
if "args" in metadata and len(metadata["args"]) == 2:
return metadata["args"][0]
# 回退到 __orig_bases__ (非 Pydantic 的 Runnable)
for cls in self.__class__.__orig_bases__:
type_args = get_args(cls)
if type_args and len(type_args) == 2:
return type_args[0]
raise TypeError(f"Runnable {self.get_name()} doesn't have an inferable InputType.")
这段代码的精妙之处在于它处理了两种不同的情况:
- Pydantic 模型子类(如
RunnableSequence、RunnableParallel):从 Pydantic 的泛型元数据中提取类型参数 - 非 Pydantic 子类(如
RunnableLambda):从 Python 的__orig_bases__中提取类型参数
input_schema 属性进一步将 InputType 包装为 Pydantic BaseModel,使其可以生成 JSON Schema:
@property
def input_schema(self) -> type[BaseModel]:
return self.get_input_schema()
def get_input_schema(self, config=None):
root_type = self.InputType
if inspect.isclass(root_type) and issubclass(root_type, BaseModel):
return root_type
return create_model_v2(self.get_name("Input"), root=root_type)
3.2 管道操作符 | 的实现
LCEL 的核心语法是 | 操作符。在 Python 中,a | b 会尝试调用 a.__or__(b);如果返回 NotImplemented,则尝试 b.__ror__(a)。
3.2.1 __or__:正向管道
# 源码文件:libs/core/langchain_core/runnables/base.py (第618行)
def __or__(
self,
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]:
return RunnableSequence(self, coerce_to_runnable(other))
注意 other 参数的类型签名——它接受五种类型:
Runnable:直接使用- 同步生成器函数:包装为
RunnableGenerator - 异步生成器函数:包装为
RunnableGenerator - 普通可调用对象:包装为
RunnableLambda - 字典:包装为
RunnableParallel
3.2.2 __ror__:反向管道
# 源码文件:libs/core/langchain_core/runnables/base.py (第639行)
def __ror__(
self,
other: Runnable[Other, Any] | Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]:
return RunnableSequence(coerce_to_runnable(other), self)
__ror__ 使得非 Runnable 对象可以出现在 | 的左侧。例如:
# 字典字面量在左侧,触发右侧 Runnable 的 __ror__
chain = {"context": retriever, "question": RunnablePassthrough()} | prompt
# 等价于
chain = RunnableParallel(context=retriever, question=RunnablePassthrough()) | prompt
3.2.3 coerce_to_runnable:万物转换
# 源码文件:libs/core/langchain_core/runnables/base.py (第6176行)
def coerce_to_runnable(thing: RunnableLike) -> Runnable[Input, Output]:
if isinstance(thing, Runnable):
return thing
if is_async_generator(thing) or inspect.isgeneratorfunction(thing):
return RunnableGenerator(thing)
if callable(thing):
return RunnableLambda(cast(..., thing))
if isinstance(thing, dict):
return cast(..., RunnableParallel(thing))
raise TypeError(f"Expected a Runnable, callable or dict. "
f"Instead got an unsupported type: {type(thing)}")
这个函数是 LCEL 灵活性的关键——它使得 | 操作符的两侧可以接受各种 Python 对象,而不仅仅是 Runnable 实例。
graph TD
INPUT["coerce_to_runnable(thing)"]
INPUT --> C1{是 Runnable?}
C1 -->|是| R1["直接返回"]
C1 -->|否| C2{是生成器函数?}
C2 -->|是| R2["RunnableGenerator(thing)<br/>保留流式能力"]
C2 -->|否| C3{是 callable?}
C3 -->|是| R3["RunnableLambda(thing)<br/>包装为 Runnable"]
C3 -->|否| C4{是 dict?}
C4 -->|是| R4["RunnableParallel(thing)<br/>并行执行"]
C4 -->|否| R5["抛出 TypeError"]
style INPUT fill:#e8f5e9
style R1 fill:#c8e6c9
style R2 fill:#c8e6c9
style R3 fill:#c8e6c9
style R4 fill:#c8e6c9
style R5 fill:#ffcdd2
3.3 RunnableSequence:管道的骨架
RunnableSequence 是 LCEL 中最重要的组合原语。每一次 | 操作都会创建一个 RunnableSequence(或将步骤追加到已有的 RunnableSequence)。
3.3.1 构造与展平
# 源码文件:libs/core/langchain_core/runnables/base.py (第2911行)
class RunnableSequence(RunnableSerializable[Input, Output]):
first: Runnable[Input, Any]
middle: list[Runnable[Any, Any]] = Field(default_factory=list)
last: Runnable[Any, Output]
def __init__(self, *steps: RunnableLike, name=None, first=None, middle=None, last=None):
steps_flat: list[Runnable] = []
if not steps and first is not None and last is not None:
steps_flat = [first] + (middle or []) + [last]
for step in steps:
if isinstance(step, RunnableSequence):
steps_flat.extend(step.steps) # 关键:展平嵌套
else:
steps_flat.append(coerce_to_runnable(step))
super().__init__(
first=steps_flat[0],
middle=list(steps_flat[1:-1]),
last=steps_flat[-1],
name=name,
)
展平的重要性:当你写 (a | b) | c 时,a | b 先创建一个 RunnableSequence(a, b),然后 | c 时检测到左操作数是 RunnableSequence,会将其展平为 RunnableSequence(a, b, c),而不是 RunnableSequence(RunnableSequence(a, b), c)。这避免了不必要的嵌套,保持了执行图的扁平。
RunnableSequence 还覆写了 __or__ 和 __ror__ 来优化这个展平过程:
# 源码文件:libs/core/langchain_core/runnables/base.py (第3077行)
def __or__(self, other):
if isinstance(other, RunnableSequence):
return RunnableSequence(
self.first, *self.middle, self.last,
other.first, *other.middle, other.last,
name=self.name or other.name,
)
return RunnableSequence(
self.first, *self.middle, self.last,
coerce_to_runnable(other),
name=self.name,
)
当两个 RunnableSequence 通过 | 连接时,直接将两者的步骤列表拼接,而不是嵌套。这使得无论你写多长的管道,内部都只有一层 RunnableSequence。
3.3.2 invoke 的实现
# 源码文件:libs/core/langchain_core/runnables/base.py (第3131行)
def invoke(self, input, config=None, **kwargs):
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(
None, input,
name=config.get("run_name") or self.get_name(),
run_id=config.pop("run_id", None),
)
input_ = input
try:
for i, step in enumerate(self.steps):
config = patch_config(
config,
callbacks=run_manager.get_child(f"seq:step:{i + 1}")
)
with set_config_context(config) as context:
if i == 0:
input_ = context.run(step.invoke, input_, config, **kwargs)
else:
input_ = context.run(step.invoke, input_, config)
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(input_)
return cast("Output", input_)
逐行分析:
ensure_config:初始化配置,合并 ContextVar 中的父级配置callback_manager.on_chain_start:通知追踪系统"序列开始执行"- 循环执行每个步骤:
patch_config:为当前步骤创建子级回调(seq:step:1、seq:step:2...)set_config_context:将配置写入 ContextVar,创建隔离的执行上下文context.run(step.invoke, ...):在隔离上下文中执行步骤- 第一步传递
**kwargs,后续步骤不传递(kwargs 只对首步有效)
- 错误处理:捕获
BaseException(包括KeyboardInterrupt),上报给追踪系统
3.3.3 stream 与 _transform
RunnableSequence.stream 的实现是整个 LCEL 流式能力的核心:
# 源码文件:libs/core/langchain_core/runnables/base.py (第3528行)
def stream(self, input, config=None, **kwargs):
yield from self.transform(iter([input]), config, **kwargs)
stream 将单个输入包装为迭代器,然后委托给 transform。而 transform 调用内部的 _transform:
# 源码文件:libs/core/langchain_core/runnables/base.py (第3465行)
def _transform(self, inputs, run_manager, config, **kwargs):
steps = [self.first, *self.middle, self.last]
final_pipeline = cast("Iterator[Output]", inputs)
for idx, step in enumerate(steps):
config = patch_config(
config, callbacks=run_manager.get_child(f"seq:step:{idx + 1}")
)
if idx == 0:
final_pipeline = step.transform(final_pipeline, config, **kwargs)
else:
final_pipeline = step.transform(final_pipeline, config)
yield from final_pipeline
这段代码实现了惰性管道串联:它并不立即执行任何步骤,而是将每个步骤的 transform 方法串联成一个嵌套的迭代器链。只有当最终消费者(yield from)开始拉取数据时,数据才开始逐步流经管道。
graph LR
subgraph "stream 管道"
I["输入: iter([input])"] --> T1["step1.transform()"]
T1 --> T2["step2.transform()"]
T2 --> T3["step3.transform()"]
T3 --> O["yield from<br/>输出流"]
end
subgraph "惰性求值"
direction TB
N1["消费者拉取"] --> N2["step3 向 step2 请求"]
N2 --> N3["step2 向 step1 请求"]
N3 --> N4["step1 从输入迭代器读取"]
end
style I fill:#e8f5e9
style O fill:#e8f5e9
3.3.4 batch 的优化实现
RunnableSequence 覆写了 batch,实现了"按步骤批量"而非"按输入逐个"的执行策略:
# 源码文件:libs/core/langchain_core/runnables/base.py (第3207行)
def batch(self, inputs, config=None, *, return_exceptions=False, **kwargs):
# ... 设置 callback_managers 和 run_managers ...
# 关键:对每个步骤调用 batch,而非对整个序列多次 invoke
for i, step in enumerate(self.steps):
inputs = step.batch(
inputs,
[
patch_config(config, callbacks=rm.get_child(f"seq:step:{i + 1}"))
for rm, config in zip(run_managers, configs)
],
return_exceptions=return_exceptions,
**(kwargs if i == 0 else {}),
)
# ...
这个设计的好处是:如果某个步骤(如 ChatOpenAI)有原生的批量 API,它可以在一次 HTTP 请求中处理多个输入,而不是发送多次单独的请求。这在实际应用中可以带来数倍的性能提升。
3.4 RunnableParallel:分叉与汇聚
RunnableParallel 实现了数据的"一进多出"——将同一个输入分发给多个 Runnable 并行执行,然后将各自的输出收集到一个字典中。
3.4.1 构造方式
RunnableParallel 支持三种构造方式:
# 方式1:字典参数
parallel = RunnableParallel({"key1": runnable1, "key2": runnable2})
# 方式2:关键字参数
parallel = RunnableParallel(key1=runnable1, key2=runnable2)
# 方式3:在管道中使用字典字面量(通过 coerce_to_runnable 自动转换)
chain = some_runnable | {"key1": runnable1, "key2": runnable2}
内部实现:
# 源码文件:libs/core/langchain_core/runnables/base.py (第3651行)
class RunnableParallel(RunnableSerializable[Input, dict[str, Any]]):
steps__: Mapping[str, Runnable[Input, Any]]
def __init__(self, steps__=None, **kwargs):
merged = {**steps__} if steps__ is not None else {}
merged.update(kwargs)
super().__init__(
steps__={key: coerce_to_runnable(r) for key, r in merged.items()}
)
注意字段名 steps__ 使用了双下划线后缀——这是为了避免与用户传入的关键字参数冲突。如果字段名是 steps,那么 RunnableParallel(steps=some_runnable) 就会产生歧义。
3.4.2 invoke 的并行执行
# 源码文件:libs/core/langchain_core/runnables/base.py (第3834行)
def invoke(self, input, config=None, **kwargs):
config = ensure_config(config)
callback_manager = CallbackManager.configure(
inheritable_callbacks=config.get("callbacks"), ...
)
run_manager = callback_manager.on_chain_start(None, input, ...)
def _invoke_step(step, input_, config, key):
child_config = patch_config(
config,
callbacks=run_manager.get_child(f"map:key:{key}"),
)
with set_config_context(child_config) as context:
return context.run(step.invoke, input_, child_config)
try:
steps = dict(self.steps__)
with get_executor_for_config(config) as executor:
futures = [
executor.submit(_invoke_step, step, input, config, key)
for key, step in steps.items()
]
output = {
key: future.result()
for key, future in zip(steps, futures)
}
except BaseException as e:
run_manager.on_chain_error(e)
raise
else:
run_manager.on_chain_end(output)
return output
核心执行逻辑:
- 使用
ThreadPoolExecutor并行提交所有步骤 - 每个步骤接收相同的
input,但各自拥有独立的子回调管理器(map:key:xxx) - 收集所有
Future.result(),组装成字典返回
3.4.3 Schema 合并
RunnableParallel 的输入 schema 是所有子步骤输入 schema 的合并:
# 源码文件:libs/core/langchain_core/runnables/base.py (第3722行)
def get_input_schema(self, config=None):
if all(
s.get_input_schema(config).model_json_schema().get("type", "object") == "object"
for s in self.steps__.values()
):
# 所有子步骤都接受对象输入时,合并它们的字段
return create_model_v2(
self.get_name("Input"),
field_definitions={
k: (v.annotation, v.default)
for step in self.steps__.values()
for k, v in step.get_input_schema(config).model_fields.items()
if k != "__root__"
},
)
return super().get_input_schema(config)
这意味着,如果并行步骤中的 Runnable A 需要字段 "question",Runnable B 需要字段 "context",那么 RunnableParallel 的输入 schema 将包含 "question" 和 "context" 两个字段。
graph TB
subgraph "RunnableParallel 执行模型"
INPUT["输入: {question: '...', context: '...'}"]
INPUT -->|同一输入| S1["步骤 A: retriever"]
INPUT -->|同一输入| S2["步骤 B: RunnablePassthrough"]
INPUT -->|同一输入| S3["步骤 C: some_transform"]
S1 -->|"context: [doc1, doc2]"| OUTPUT
S2 -->|"question: '...'"| OUTPUT
S3 -->|"extra: '...'"| OUTPUT
OUTPUT["输出字典:<br/>{context: [...], question: '...', extra: '...'}"]
end
subgraph "线程池"
direction LR
T1["Thread 1: A.invoke()"]
T2["Thread 2: B.invoke()"]
T3["Thread 3: C.invoke()"]
end
S1 -.-> T1
S2 -.-> T2
S3 -.-> T3
style INPUT fill:#e8f5e9
style OUTPUT fill:#e8f5e9
3.5 RunnableLambda:函数的桥梁
RunnableLambda 是将普通 Python 函数接入 Runnable 生态的桥梁。它的设计比看起来复杂得多——需要处理类型推断、同步/异步兼容、依赖追踪等问题。
3.5.1 构造与同步/异步检测
# 源码文件:libs/core/langchain_core/runnables/base.py (第4579行)
class RunnableLambda(Runnable[Input, Output]):
def __init__(self, func, afunc=None, name=None):
if afunc is not None:
self.afunc = afunc
func_for_name = afunc
if is_async_callable(func) or is_async_generator(func):
if afunc is not None:
raise TypeError(
"Func was provided as a coroutine function, but afunc was "
"also provided. If providing both, func should be a regular "
"function to avoid ambiguity."
)
self.afunc = func
func_for_name = func
elif callable(func):
self.func = cast(..., func)
func_for_name = func
else:
raise TypeError(...)
# 自动从函数名推断 Runnable 名称
try:
if name is not None:
self.name = name
elif func_for_name.__name__ != "<lambda>":
self.name = func_for_name.__name__
except AttributeError:
pass
关键设计:
- 如果
func是协程函数(async def),自动将其设置为afunc而非func - 可以同时提供同步和异步两个实现:
RunnableLambda(sync_fn, afunc=async_fn) - 名称自动从函数名推断,lambda 函数除外
3.5.2 类型推断
RunnableLambda 从函数签名中推断输入和输出类型:
# 源码文件:libs/core/langchain_core/runnables/base.py (第4654行)
@property
def InputType(self):
func = getattr(self, "func", None) or self.afunc
try:
params = inspect.signature(func).parameters
first_param = next(iter(params.values()), None)
if first_param and first_param.annotation != inspect.Parameter.empty:
return first_param.annotation
except ValueError:
pass
return Any
@property
def OutputType(self):
func = getattr(self, "func", None) or self.afunc
try:
sig = inspect.signature(func)
if sig.return_annotation != inspect.Signature.empty:
# 解包迭代器类型:Iterator[X] -> X
if getattr(sig.return_annotation, "__origin__", None) in {
collections.abc.Iterator,
collections.abc.AsyncIterator,
}:
return getattr(sig.return_annotation, "__args__", (Any,))[0]
return sig.return_annotation
except ValueError:
pass
return Any
一个巧妙的细节:如果函数返回 Iterator[X] 或 AsyncIterator[X],OutputType 会自动解包为 X。这是因为生成器函数的"输出"是每个 yield 的值,而非迭代器本身。
3.5.3 invoke 的实现
# 源码文件:libs/core/langchain_core/runnables/base.py (第4997行)
def invoke(self, input, config=None, **kwargs):
if hasattr(self, "func"):
return self._call_with_config(
self._invoke,
input,
ensure_config(config),
**kwargs,
)
raise TypeError("Cannot invoke a coroutine function synchronously.")
_call_with_config 是 Runnable 基类提供的模板方法,它包装了追踪逻辑。实际的函数调用发生在 _invoke 中(通过 call_func_with_variable_args 自动适配函数签名)。
3.5.4 依赖追踪
RunnableLambda 有一个被低估的能力——自动追踪闭包中引用的其他 Runnable:
# 源码文件:libs/core/langchain_core/runnables/base.py (第4763行)
@functools.cached_property
def deps(self) -> list[Runnable]:
if hasattr(self, "func"):
objects = get_function_nonlocals(self.func)
elif hasattr(self, "afunc"):
objects = get_function_nonlocals(self.afunc)
else:
objects = []
deps: list[Runnable] = []
for obj in objects:
if isinstance(obj, Runnable):
deps.append(obj)
elif isinstance(getattr(obj, "__self__", None), Runnable):
deps.append(obj.__self__)
return deps
get_function_nonlocals 检查函数闭包(__closure__)中的非局部变量,如果其中包含 Runnable 对象,就将它们记录为依赖。这使得图可视化工具能正确显示 RunnableLambda 与其闭包引用的 Runnable 之间的关系。
3.5.5 @chain 装饰器
@chain 装饰器是 RunnableLambda 的语法糖,使函数定义更具声明性:
# 源码文件:libs/core/langchain_core/runnables/base.py (第6227行)
def chain(func):
"""Decorate a function to make it a Runnable.
Sets the name of the Runnable to the name of the function."""
return RunnableLambda(func)
使用示例:
from langchain_core.runnables import chain
@chain
def format_output(text: str) -> dict:
return {"formatted": text.strip().upper()}
# format_output 现在是一个 Runnable,名称为 "format_output"
chain = some_llm | format_output
3.6 RunnableBranch:条件分支
RunnableBranch 实现了"if-elif-else"风格的条件路由。
3.6.1 结构设计
# 源码文件:libs/core/langchain_core/runnables/branch.py (第42行)
class RunnableBranch(RunnableSerializable[Input, Output]):
branches: Sequence[tuple[Runnable[Input, bool], Runnable[Input, Output]]]
default: Runnable[Input, Output]
def __init__(self, *branches):
if len(branches) < 2:
raise ValueError("RunnableBranch requires at least two branches")
default = branches[-1] # 最后一个参数是 default 分支
default_ = coerce_to_runnable(default)
branches_ = []
for branch in branches[:-1]:
condition, runnable = branch
condition = coerce_to_runnable(condition)
runnable = coerce_to_runnable(runnable)
branches_.append((condition, runnable))
super().__init__(branches=branches_, default=default_)
构造参数是一系列 (条件, Runnable) 元组,最后一个参数是默认分支。条件本身也是 Runnable(或函数,会被 coerce_to_runnable 包装)。
3.6.2 invoke 的实现
# 源码文件:libs/core/langchain_core/runnables/branch.py (第189行)
def invoke(self, input, config=None, **kwargs):
config = ensure_config(config)
callback_manager = get_callback_manager_for_config(config)
run_manager = callback_manager.on_chain_start(
None, input, name=config.get("run_name") or self.get_name(), ...
)
try:
for idx, branch in enumerate(self.branches):
condition, runnable = branch
expression_value = condition.invoke(
input,
config=patch_config(
config,
callbacks=run_manager.get_child(f"branch:{idx + 1}:condition:"),
),
)
if expression_value:
output = runnable.invoke(
input,
config=patch_config(
config,
callbacks=run_manager.get_child(f"branch:{idx + 1}:then:"),
),
**kwargs,
)
break
else:
output = self.default.invoke(
input,
config=patch_config(
config, callbacks=run_manager.get_child("branch:default:"),
),
**kwargs,
)
except BaseException as e:
run_manager.on_chain_error(e)
raise
run_manager.on_chain_end(output)
return output
执行逻辑清晰:依次评估每个条件,第一个为 True 的条件对应的 Runnable 被执行;如果所有条件都为 False,执行默认分支。注意 Python 的 for-else 语法——else 块在循环没有被 break 中断时执行。
graph TD
INPUT["输入数据"]
INPUT --> C1{"条件1: isinstance(x, str)?"}
C1 -->|True| B1["分支1: x.upper()"]
C1 -->|False| C2{"条件2: isinstance(x, int)?"}
C2 -->|True| B2["分支2: x + 1"]
C2 -->|False| C3{"条件3: isinstance(x, float)?"}
C3 -->|True| B3["分支3: x * 2"]
C3 -->|False| BD["默认分支: 'goodbye'"]
B1 --> OUTPUT["输出"]
B2 --> OUTPUT
B3 --> OUTPUT
BD --> OUTPUT
style INPUT fill:#e8f5e9
style OUTPUT fill:#e8f5e9
style BD fill:#fff3e0
3.7 RunnablePassthrough:数据透传与增强
RunnablePassthrough 看起来像是一个什么都不做的组件,但它在 LCEL 管道中扮演着至关重要的角色——透传原始数据的同时,允许执行副作用或增强字段。
3.7.1 基本透传
# 源码文件:libs/core/langchain_core/runnables/passthrough.py (第74行)
class RunnablePassthrough(RunnableSerializable[Other, Other]):
func: Callable | None = None # 可选的副作用函数
afunc: Callable | None = None # 可选的异步副作用函数
def invoke(self, input, config=None, **kwargs):
# 如果有副作用函数,先执行它
if self.func is not None:
call_func_with_variable_args(
self.func, input, ensure_config(config), **kwargs
)
# 然后透传原始输入
return self._call_with_config(identity, input, config)
核心设计:无论副作用函数做了什么,返回值始终是原始输入。 这使得 RunnablePassthrough 成为了一个"观察点"——你可以在管道中插入日志、统计等副作用,而不影响数据流。
3.7.2 assign:字段增强
RunnablePassthrough.assign 是 LCEL 中最实用的方法之一:
# 源码文件:libs/core/langchain_core/runnables/passthrough.py (第207行)
@classmethod
def assign(cls, **kwargs) -> RunnableAssign:
return RunnableAssign(RunnableParallel[dict[str, Any]](kwargs))
assign 创建了一个 RunnableAssign,它在保留原始输入字典所有字段的基础上,添加(或覆盖)新的字段。这在 RAG 管道中极为常见:
# 典型 RAG 管道
chain = (
RunnablePassthrough.assign(
context=lambda x: retriever.invoke(x["question"])
)
| prompt
| model
| StrOutputParser()
)
# 输入: {"question": "什么是LangChain?"}
# assign 后: {"question": "什么是LangChain?", "context": [doc1, doc2, ...]}
3.7.3 RunnablePick:字段选择
与 assign 相对,pick 用于从字典中选择特定字段:
# 用法
chain = some_parallel | RunnablePick(["key1", "key2"])
# 等价于
chain = some_parallel.pick(["key1", "key2"])
RunnablePick 是 Runnable.pick 方法的底层实现,它从字典输入中提取指定的键值对。
3.8 RunnableBinding:参数绑定与配置固化
RunnableBinding 通过装饰器模式为 Runnable 附加额外的参数或配置。
3.8.1 结构设计
# 源码文件:libs/core/langchain_core/runnables/base.py (第5530行)
class RunnableBindingBase(RunnableSerializable[Input, Output]):
bound: Runnable[Input, Output]
kwargs: Mapping[str, Any] = Field(default_factory=dict)
config: RunnableConfig = Field(default_factory=RunnableConfig)
config_factories: list[Callable[[RunnableConfig], RunnableConfig]] = Field(
default_factory=list
)
custom_input_type: Any | None = None
custom_output_type: Any | None = None
RunnableBindingBase 持有一个 bound(被包装的 Runnable)和一组固定的 kwargs。每次调用 invoke 时,这些 kwargs 会自动传递给 bound.invoke:
# 源码文件:libs/core/langchain_core/runnables/base.py (第5689行)
def invoke(self, input, config=None, **kwargs):
return self.bound.invoke(
input,
self._merge_configs(config),
**{**self.kwargs, **kwargs},
)
3.8.2 常见用法
RunnableBinding 通常通过 Runnable.bind 方法创建:
# 为 LLM 固定 temperature 参数
model_creative = model.bind(temperature=0.9)
model_precise = model.bind(temperature=0.1)
# 为 LLM 绑定工具
model_with_tools = model.bind(tools=[search_tool, calculator_tool])
# 固定配置
chain_tagged = chain.with_config({"tags": ["production"], "metadata": {"version": "1.0"}})
3.9 RunnableConfig 的完整生命周期
让我们通过一个完整的例子,跟踪 RunnableConfig 在一次管道执行中的完整生命周期:
chain = prompt | model | parser
chain.invoke(
{"question": "Hello"},
config={"tags": ["user-request"], "metadata": {"user_id": "123"}}
)
sequenceDiagram
participant User as 用户
participant Seq as RunnableSequence
participant EC as ensure_config
participant PC as patch_config
participant SC as set_config_context
participant CV as ContextVar
participant P as Prompt
participant M as Model
participant Pa as Parser
User->>Seq: invoke(input, config={tags:["user-request"]})
Seq->>EC: ensure_config(config)
Note over EC: 1. 创建默认配置<br/>2. 从ContextVar继承<br/>3. 用用户config覆盖
EC-->>Seq: full_config
Seq->>PC: patch_config(config, callbacks=child_1)
Note over PC: 替换callbacks为seq:step:1
PC-->>Seq: step1_config
Seq->>SC: set_config_context(step1_config)
SC->>CV: 写入ContextVar
Note over SC: copy_context()
Seq->>P: prompt.invoke(input, step1_config)
Note over P: 内部ensure_config<br/>自动继承step1_config
P-->>Seq: prompt_output
Seq->>PC: patch_config(config, callbacks=child_2)
Seq->>SC: set_config_context(step2_config)
SC->>CV: 写入ContextVar
Seq->>M: model.invoke(prompt_output, step2_config)
M-->>Seq: model_output
Seq->>PC: patch_config(config, callbacks=child_3)
Seq->>SC: set_config_context(step3_config)
SC->>CV: 写入ContextVar
Seq->>Pa: parser.invoke(model_output, step3_config)
Pa-->>Seq: final_output
Seq-->>User: final_output
关键观察:
- tags 和 metadata 贯穿始终:用户设置的
tags和metadata通过ensure_config的COPIABLE_KEYS机制,在每一步的 config 中都存在(通过 copy 避免共享引用) - callbacks 逐步替换:每一步的
callbacks被patch_config替换为该步骤特有的子回调管理器,使得追踪系统能构建正确的父子关系树 - ContextVar 提供隐式传播:即使某些内部调用没有显式传递 config,
ensure_config也能从ContextVar中恢复父级配置
3.10 RunnableWithFallbacks:优雅降级
虽然不是 LCEL 的核心组合原语,RunnableWithFallbacks 是生产环境中不可或缺的容错机制:
# 源码文件:libs/core/langchain_core/runnables/fallbacks.py (第36行)
class RunnableWithFallbacks(RunnableSerializable[Input, Output]):
runnable: Runnable[Input, Output]
fallbacks: Sequence[Runnable[Input, Output]]
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,)
exception_key: str | None = None
执行逻辑:先尝试 runnable,如果抛出 exceptions_to_handle 中的异常,依次尝试 fallbacks 中的 Runnable,直到有一个成功或全部失败。
# 使用示例
model = ChatOpenAI(model="gpt-4o").with_fallbacks(
[ChatAnthropic(model="claude-sonnet-4-6"), ChatOllama(model="llama3")]
)
# GPT-4o 故障时自动降级到 Claude,Claude 故障时降级到本地 Ollama
3.10.1 invoke 实现里的四条设计约束
上面的 4 个字段看起来平凡,但 fallbacks.py:166 的 invoke 实现藏着四条生产环境必须了解的约束:
# fallbacks.py:187-213 主循环
first_error = None
last_error = None
for runnable in self.runnables:
try:
if self.exception_key and last_error is not None:
input[self.exception_key] = last_error # ← ①
...
output = context.run(runnable.invoke, input, config, **kwargs)
except self.exceptions_to_handle as e: # ← ②
if first_error is None:
first_error = e
last_error = e
except BaseException as e: # ← ③
run_manager.on_chain_error(e)
raise
else:
return output
# 全部失败:抛 first_error
run_manager.on_chain_error(first_error)
raise first_error # ← ④
① exception_key 是一个反馈通道、不是标签。如果设了 exception_key="last_error",当 fallback #1 也失败时、框架会在调用 fallback #2 前把 input["last_error"] = fallback_1_的异常对象 写进输入。下一级的 prompt 模板可以读这个字段做自我修正——比如给 LLM 提示"上一个模型因为 RateLimit 失败、请用更短的回答"。这让多级 fallback 不只是"换个模型重试"、而是"带着失败原因换一个模型问"的升级尝试。这个字段的使用前提是 input 必须是 dict(line 169-174 有守卫、不是 dict 直接 ValueError),所以 .with_fallbacks(..., exception_key="err") 只能搭配字典类型的输入使用。
② exceptions_to_handle 精确圈定可接管的异常。默认 (Exception,) 会接所有 Exception 子类、但不接 BaseException ——也就是说 KeyboardInterrupt、SystemExit、asyncio.CancelledError(Python 3.8+ 把它从 Exception 改成 BaseException 子类)不会被当作"可重试的失败"。这是防生产环境 bug:用户 Ctrl+C 想停进程、或者 task 被取消、或者应用想 graceful exit——不能因为"包了 fallback"就把这些控制信号吞掉继续重试。line 203-205 的 except BaseException 明确把控制流信号放走,fallback 机制只拦业务错误。
③ 控制流异常优先透传,并调 run_manager.on_chain_error(e) 通知 LangSmith trace "这个 run 被中断了"——trace 不会丢失。用户在监控里看到的不是"神秘成功"、而是"这里有个 KeyboardInterrupt、链路被打断"。
④ 全部失败时抛的是 first_error、不是 last_error。这条设计逻辑很关键:生产环境排障时你想问的通常是"为什么主模型失败了"——不是"备用模型也挂了"。如果抛的是 last_error,日志里堆的是"ollama 连不上"或"claude rate limit"——掩盖了真正的主因"gpt-4o 返回了 5xx"。first_error = None; if first_error is None: first_error = e 这个简单记录,让事后 debug 时能直接看见第一张倒下的多米诺骨牌。
callback tracing 的完整性是另一条隐形约束——每次 fallback 调用都走 patch_config(config, callbacks=run_manager.get_child())(line 191)。在 LangSmith 上你会看到一棵 trace 树:根节点是 RunnableWithFallbacks、子节点是"主模型(失败)" + "fallback #1(失败)" + "fallback #2(成功)"——每次尝试都有独立 trace、输入输出完整记录。这对生产环境的事后分析至关重要:你想知道"哪个请求的哪次降级花了多少时间"时,LangSmith 已经替你把每一步都存下来了。
3.11 组合原语的协作模式
让我们通过一个实际的 RAG 管道示例,展示所有组合原语如何协作:
from langchain_core.runnables import (
RunnablePassthrough, RunnableParallel, RunnableBranch, RunnableLambda
)
# 构建一个带有条件路由和并行检索的 RAG 管道
chain = (
# 1. RunnableParallel: 并行执行问题分类和上下文检索
RunnableParallel(
question=RunnablePassthrough(),
context=lambda x: retriever.invoke(x["question"]),
category=lambda x: classifier.invoke(x["question"]),
)
# 2. RunnableBranch: 根据问题类别选择不同的 Prompt
| RunnableBranch(
(lambda x: x["category"] == "technical", technical_prompt),
(lambda x: x["category"] == "creative", creative_prompt),
general_prompt, # 默认
)
# 3. RunnableSequence: 通过管道连接 Model 和 Parser
| model
| StrOutputParser()
)
graph TD
INPUT["用户输入<br/>{question: '...'}"]
INPUT --> PAR["RunnableParallel"]
PAR -->|"question"| PP["RunnablePassthrough<br/>透传原始问题"]
PAR -->|"context"| RET["RunnableLambda<br/>retriever.invoke()"]
PAR -->|"category"| CLS["RunnableLambda<br/>classifier.invoke()"]
PP --> MERGE["合并结果"]
RET --> MERGE
CLS --> MERGE
MERGE --> BRANCH["RunnableBranch"]
BRANCH -->|"category == 'technical'"| TP["technical_prompt"]
BRANCH -->|"category == 'creative'"| CP["creative_prompt"]
BRANCH -->|"默认"| GP["general_prompt"]
TP --> MODEL["ChatModel"]
CP --> MODEL
GP --> MODEL
MODEL --> PARSER["StrOutputParser"]
PARSER --> OUTPUT["最终文本输出"]
style INPUT fill:#e8f5e9
style OUTPUT fill:#e8f5e9
style PAR fill:#e3f2fd
style BRANCH fill:#fff3e0
在这个管道中:
RunnableParallel负责数据的扇出——将同一个输入分发给三个并行的处理流RunnablePassthrough负责保留原始数据——让问题文本在后续步骤中可用RunnableBranch负责条件路由——根据分类结果选择不同的 PromptRunnableSequence(由|创建)负责串行流转——将数据从一个组件传递到下一个
3.12 设计决策深度分析
为什么 Runnable 同时是 ABC 和 Generic?
Runnable(ABC, Generic[Input, Output]) 的多继承看似简单,但解决了一个深层问题:如何在不牺牲类型安全的前提下允许多态?
ABC 确保子类必须实现 invoke(通过 @abstractmethod);Generic[Input, Output] 确保管道中的类型能被静态检查器验证。如果只有 ABC,管道的类型检查将退化为 Any;如果只有 Generic,就无法强制实现 invoke。
为什么用 Pydantic BaseModel 作为 RunnableSerializable 的基类?
RunnableSerializable 继承了 Serializable(间接继承 Pydantic BaseModel)和 Runnable。这不是多余的——Pydantic 提供了:
- 自动验证:
RunnableSequence(first=..., middle=..., last=...)的参数会被 Pydantic 自动验证类型 - 序列化支持:
model_dump()/model_dump_json()为 JSON 序列化提供基础 - Schema 生成:
model_json_schema()自动生成 JSON Schema
但这也带来了限制——RunnableLambda 因为包含不可序列化的 Callable,所以不继承 RunnableSerializable。
transform vs stream:为什么需要两个方法?
stream(input) 接受一个单值输入,产生一个输出流;transform(inputs) 接受一个输入流,产生一个输出流。stream 本质上是 transform(iter([input])) 的语法糖。
分离这两个方法的原因是语义清晰性:用户通常调用 stream(单输入),而框架内部的管道串联使用 transform(流到流)。
3.12.1 langchain_core/runnables/ 16 文件 13730 行的真实拆分
把整个 runnables/ 目录按文件大小排序——
| 文件 | 行 | 角色 |
|---|---|---|
base.py |
6261 | 本目录最大、占 45.6%——Runnable ABC + RunnableSequence / RunnableParallel / RunnableLambda / RunnableBinding / RunnableEachBase 等核心组合原语全部塞在一文件 |
passthrough.py |
841 | RunnablePassthrough + RunnableAssign + RunnablePick(§3.7) |
utils.py |
759 | 类型推断工具、get_unique_config_specs、Schema 推断 |
graph.py |
739 | Graph / Node / Edge 数据类 + 图算法 |
configurable.py |
716 | RunnableConfigurableFields + RunnableConfigurableAlternatives——§3.8 提到的"配置覆写"实现 |
fallbacks.py |
664 | RunnableWithFallbacks(§3.10 主角) |
config.py |
641 | RunnableConfig TypedDict + merge_configs + get_executor_for_config(§3.9 主角) |
history.py |
622 | RunnableWithMessageHistory(ch13 §13.9 主角,见 §13.10.4 实测) |
graph_mermaid.py |
503 | Graph → mermaid 字符串渲染 |
branch.py |
461 | RunnableBranch(§3.6 主角) |
retry.py |
379 | RunnableRetry——本章未深入讨论 |
graph_ascii.py |
366 | Graph → ASCII 字符画渲染 |
router.py |
239 | 本章完全没提——RouterRunnable 是 RunnableBranch 的进化版(按 route_name 字段路由、不是按谓词链),用 Mapping[str, Runnable] 表达 |
graph_png.py |
215 | Graph → PNG 通过 mermaid.ink CDN 渲染(不是本地) |
schema.py |
188 | EventData / StreamEvent 等 streaming 事件类型 |
__init__.py |
136 | 公共 export |
两条值得记住的物理事实——
base.py6261 行 = 整目录 46%——一个单文件塞了 7~8 个核心组合原语类——是 LangChain"单文件大方法集"风格的极端体现——比 ch12 的callbacks/manager.py2697 行还大 1.3 倍——Python 调试时跨类共享_serialize_kwargs / _validate / _coerce_*这些 helpers 比拆文件成本低、但代价是 LSP 跳转/git blame 体验差- 3 个 graph_ 文件 1084 行*(mermaid 503 + ascii 366 + png 215)+
graph.py739 = 可视化基础设施 1823 行 = 13.3%——langchain_core把 "render graph as mermaid/ascii/png" 当成核心一等公民、而不是开发工具——印证"调试可观测性是基础设施投资"(同款思路在 ch12 §12.9.5 callbacks/tracers/ 9734 行也成立)
router.py 239 行是本章未覆盖的板块——RouterRunnable 用 Mapping[str, Runnable](按字符串 key 选分支)取代 RunnableBranch 的"谓词函数链"——适合 LLM 输出已经分类好的场景("intent classifier 返回 weather / news / chat、再路由到对应 Runnable");下一版章节应补一节。
3.12.2 从 | 到 Router:LCEL 的两种组合边界
本章已经讲过 | 是 LCEL 的视觉核心。源码里它有两个入口:langchain_core/runnables/base.py:618-637 的 Runnable.__or__ 把右侧对象交给 coerce_to_runnable,再构造成 RunnableSequence;base.py:639-650 的 __ror__ 处理"普通对象在左、Runnable 在右"的反向组合。真正的宽容入口在 base.py:6176-6200:如果对象已经是 Runnable,就原样返回;如果是生成器函数,包成 RunnableGenerator;如果是普通 callable,包成 RunnableLambda;如果是 dict,包成 RunnableParallel;否则抛 TypeError。
flowchart TD
A[LCEL 右侧对象] --> B{coerce_to_runnable}
B -->|Runnable| C[原样使用]
B -->|callable| D[RunnableLambda]
B -->|generator| E[RunnableGenerator]
B -->|dict| F[RunnableParallel]
B -->|其他| G[TypeError]
这个入口解释了为什么 LCEL 用起来像"魔法":prompt | model | parser、{"context": retriever, "question": RunnablePassthrough()} | prompt、lambda x: x["query"] 都能进入同一个协议。魔法不是运行时反射,而是一组明确的 coercion 规则。
但 LCEL 不只有顺序组合。RunnableSequence.__or__ 在 base.py:3077-3101 还有一个重要优化:如果右侧已经是 RunnableSequence,它会把两条 sequence 展平,避免嵌套 sequence 套 sequence;base.py:3104-3128 的反向组合也做同样处理。调用时,RunnableSequence.invoke 在 base.py:3131-3144 先建立 callback manager 和根 run,再逐步执行 steps。这就是第 12 章回调树能覆盖整条链的原因:组合结构从一开始就被保留为一个可追踪的 Runnable。
router.py 则提供另一条边界。langchain_core/runnables/router.py:46 定义 RouterRunnable,它接收一个 Mapping[str, Runnable];router.py:107 的 invoke 会读取输入里的 key,选中对应 runnable 后转发;router.py:209 的 stream 也按同样路由进入子 runnable。它和 RunnableBranch 的区别是:
| 组合原语 | 路由依据 | 适合场景 |
|---|---|---|
RunnableBranch |
一组谓词函数按顺序判断 | 条件来自程序逻辑,如分数阈值、字段存在性 |
RouterRunnable |
输入里已经有 route key | 上游模型或分类器已经给出意图标签 |
RunnableParallel |
无路由,所有分支都执行 | 扇出、多路特征提取、RAG context/question 组合 |
工程上这三者不能混用。你如果已经有一个 intent classifier 输出 "weather" / "news" / "chat",用 RouterRunnable 比写一串 lambda predicate 更直接;如果判断条件依赖复杂业务状态,RunnableBranch 更清楚;如果需要同时计算多个字段,才是 RunnableParallel。LCEL 的价值不只是写法短,而是把串行、并行、条件、路由都纳入同一套 Runnable 协议。
这套协议的真正价值,是把"组件怎么接"和"组件做什么"分开。模型调用、检索器、解析器、普通函数、字典并行分支,在业务语义上完全不同;但只要被规整成同一种可运行对象,外层就能统一处理配置、回调、批量、异步和流式。框架用户感受到的是写法简洁,框架作者真正维护的是组合代数。
排查 LCEL 问题时也应沿着这条边界走。第一步看对象是否被正确规整:普通函数是不是变成函数包装器,字典是不是变成并行分支,生成器是不是变成流式生成器。第二步看组合结构是否被展平:嵌套管道是否变成一条步骤列表。第三步才看每个组件自己的业务逻辑。很多"链路不工作"的问题,根因并不在模型或提示词,而在某个普通对象没有被正确规整成协议对象。
这和丛书里的其他框架形成呼应。LangGraph 把状态机节点规整成图执行单元,MCP 把工具和资源规整成协议消息,Serde 把用户类型规整成序列化数据模型。成熟框架的共同点,都是先定义一个足够窄的协议,再让各种复杂对象进入这个协议。
所以学 LCEL 时,不要只记住管道符。管道符只是入口,真正要掌握的是对象规整、组合展平、配置传播、回调追踪这四个机制。它们合起来,才让一段短链具备生产系统需要的可执行性。
掌握这四个机制后,短链和长链的理解方式是一样的:都是协议对象在组合结构中流动。
3.13 小结
本章深入分析了 LangChain 的两大核心支柱:Runnable 协议和 LCEL 表达式语言。
Runnable 协议通过一个抽象方法(invoke)和一组带有合理默认实现的标准方法(ainvoke/batch/stream 等),使得任何组件只需实现 invoke 就能自动获得异步、批量、流式的全部能力。类型安全通过 Generic[Input, Output] 保证,Schema 推断通过 Pydantic 和 Python 反射实现。
LCEL 的 | 操作符利用 Python 的 __or__/__ror__ 魔术方法,将组件组合的代码转化为类似 Unix 管道的声明式语法。coerce_to_runnable 函数使得 | 两侧可以接受普通函数、字典等非 Runnable 对象,极大提升了 LCEL 的灵活性。
六个核心组合原语各司其职:
- RunnableSequence:管道串联,自动展平嵌套,支持流式传播
- RunnableParallel:并行分发,线程池执行,Schema 自动合并
- RunnableLambda:函数包装,类型推断,依赖追踪
- RunnableBranch:条件路由,顺序评估,默认分支
- RunnablePassthrough:数据透传,副作用执行,字段增强(assign)
- RunnableBinding:参数固化,配置绑定,类型覆写
物理事实:langchain_core/runnables/ 16 文件 13730 行——base.py 单文件 6261 行占 46% 是"单文件大方法集"极端体现;可视化(graph + 3 个 graph_*)1823 行占 13.3% 体现 LangChain 把调试可观测性当一等公民;router.py 239 行的 RouterRunnable 是本章未覆盖板块。