LangChain 设计与实现
第11章 Chain 组合模式
第11章 Chain 组合模式
引言
在 LangChain 的早期架构中,Chain 是最核心的抽象之一。它提供了一种将多个组件(模型、提示词、输出解析器、检索器等)串联起来形成完整处理流程的标准化方式。Chain 的设计理念源自函数式编程中的管道思想:每个 Chain 接收结构化输入,执行内部逻辑,产出结构化输出,输出又可以作为下一个 Chain 的输入。
然而,随着 LangChain 的演进,传统 Chain 类正在被更灵活的 LCEL(LangChain Expression Language)所取代。在源码中,我们可以清晰地看到这一演变:Chain 基类继承自 RunnableSerializable,这意味着每个传统 Chain 本身就是一个 Runnable,可以无缝融入 LCEL 管道。与此同时,许多传统 Chain 子类已被标记为 @deprecated,并提供了基于 LCEL 的替代方案。
本章将深入剖析 Chain 体系的设计与实现,理解传统 Chain 的工作机制,并对比 LCEL 的现代替代方式,帮助读者在实际项目中做出正确的技术选型。
本章要点
- Chain 基类继承自
RunnableSerializable,是连接传统 API 与现代 Runnable 体系的桥梁 - 文档处理链族(Stuff/MapReduce/Refine/Reduce)为 RAG 场景提供了多种文档合并策略
RetrievalQA和ConversationalRetrievalChain是经典的检索问答链,现已被create_retrieval_chain取代SequentialChain和RouterChain提供了顺序编排和动态路由能力- LCEL 以管道操作符
|替代了大部分传统 Chain 的使用场景,是当前推荐的实践方式
11.1 Chain 基类:连接两个时代的桥梁
11.1.1 类继承结构
Chain 基类定义在 langchain_classic/chains/base.py 中,它的继承关系揭示了 LangChain 架构演进的脉络:
classDiagram
class Serializable {
+is_lc_serializable() bool
+to_json() dict
}
class Runnable {
+invoke(input, config) Output
+ainvoke(input, config) Output
+batch(inputs) list
+stream(input) Iterator
}
class RunnableSerializable {
<<同时具备序列化和Runnable能力>>
}
class Chain {
+memory: BaseMemory | None
+callbacks: Callbacks
+verbose: bool
+input_keys* list~str~
+output_keys* list~str~
+_call(inputs, run_manager)* dict
+invoke(input, config) dict
+prep_inputs(inputs) dict
+prep_outputs(inputs, outputs) dict
}
Serializable <|-- RunnableSerializable
Runnable <|-- RunnableSerializable
RunnableSerializable <|-- Chain
Chain 继承自 RunnableSerializable[dict[str, Any], dict[str, Any]],这个泛型参数明确了 Chain 的输入输出类型:接收字典、返回字典。这一设计既保留了传统 Chain 的接口契约,又使其自动获得了 Runnable 接口的全部能力。
11.1.2 核心属性与抽象方法
Chain 基类定义了四个核心属性和两个抽象方法:
class Chain(RunnableSerializable[dict[str, Any], dict[str, Any]], ABC):
memory: BaseMemory | None = None
callbacks: Callbacks = Field(default=None, exclude=True)
verbose: bool = Field(default_factory=_get_verbosity)
tags: list[str] | None = None
metadata: dict[str, Any] | None = None
@property
@abstractmethod
def input_keys(self) -> list[str]:
"""Keys expected to be in the chain input."""
@property
@abstractmethod
def output_keys(self) -> list[str]:
"""Keys expected to be in the chain output."""
@abstractmethod
def _call(
self,
inputs: dict[str, Any],
run_manager: CallbackManagerForChainRun | None = None,
) -> dict[str, Any]:
"""Execute the chain."""
input_keys 和 output_keys 构成了 Chain 的静态类型契约。每个子类必须声明它期望的输入键和将产出的输出键。_call 是实际执行逻辑的模板方法,由子类实现具体的业务逻辑。
11.1.2.5 new_arg_supported 的签名嗅探:向后兼容的巧思
翻开 langchain_classic/chains/base.py:156、有一行不起眼的代码、实则体现了 Chain 基类在 API 演进中保全旧用户代码的智慧:
new_arg_supported = inspect.signature(self._call).parameters.get("run_manager")
这是运行时签名内省——检查子类的 _call 方法是否有 run_manager 参数。如果有、后面就 self._call(inputs, run_manager=run_manager);如果没有、就 self._call(inputs)。_acall 也有同样的检查(line 212)。
为什么要做这个检查?——因为 Chain 的 _call 签名演进过。早期版本的 _call 只有 inputs 一个参数、用户自定义的 Chain 子类只写了 def _call(self, inputs)。后来 LangChain 引入了 callback 系统、给 _call 加了 run_manager 参数——但老用户代码里没 run_manager。
如果硬性要求新签名、所有继承 Chain 的用户代码都要同时改——这会造成大量破坏性变更。内省嗅探 + 条件分支调用、让新旧两种签名共存:老代码不改一行照常工作、新代码能接到 run_manager 做细粒度 callback。
这是向后兼容的"检测 + 分发"模式——不是靠"所有版本都保留老 API"、而是靠"运行时看看你能处理什么、再传对应的参数"。代价是一次内省开销 + 两条代码分支、收益是不破坏任何已有下游代码。
注意 inspect.signature 是有缓存的(CPython 内部对 __signature__ 有 functools.lru_cache)、第二次访问同一个方法不会重算——这就把开销压到"每个 Chain 类第一次 invoke 时一次内省、之后免费"。
11.1.3 invoke 方法的执行流程
invoke 方法是 Chain 与 Runnable 接口的对接点。它的实现揭示了一个精心编排的执行流程:
def invoke(
self, input: dict[str, Any], config: RunnableConfig | None = None, **kwargs: Any
) -> dict[str, Any]:
config = ensure_config(config)
callbacks = config.get("callbacks")
tags = config.get("tags")
metadata = config.get("metadata")
run_name = config.get("run_name") or self.get_name()
inputs = self.prep_inputs(input)
callback_manager = CallbackManager.configure(
callbacks, self.callbacks, self.verbose, tags, self.tags, metadata, self.metadata
)
run_manager = callback_manager.on_chain_start(None, inputs, name=run_name)
try:
self._validate_inputs(inputs)
outputs = self._call(inputs, run_manager=run_manager)
final_outputs = self.prep_outputs(inputs, outputs, return_only_outputs)
except BaseException as e:
run_manager.on_chain_error(e)
raise
run_manager.on_chain_end(outputs)
return final_outputs
这个流程包含六个关键步骤:
flowchart TD
A[invoke 被调用] --> B[prep_inputs: 注入 Memory 变量]
B --> C[CallbackManager.configure: 合并回调配置]
C --> D[on_chain_start: 通知回调开始]
D --> E[_validate_inputs: 校验输入键完整性]
E --> F["_call: 执行子类业务逻辑"]
F --> G{执行是否成功?}
G -->|成功| H[prep_outputs: 保存 Memory 上下文]
H --> I[on_chain_end: 通知回调结束]
G -->|异常| J[on_chain_error: 通知回调异常]
J --> K[重新抛出异常]
I --> L[返回最终输出]
其中 prep_inputs 和 prep_outputs 是 Memory 集成的关键:
def prep_inputs(self, inputs: dict[str, Any] | Any) -> dict[str, str]:
if not isinstance(inputs, dict):
_input_keys = set(self.input_keys)
if self.memory is not None:
_input_keys = _input_keys.difference(self.memory.memory_variables)
inputs = {next(iter(_input_keys)): inputs}
if self.memory is not None:
external_context = self.memory.load_memory_variables(inputs)
inputs = dict(inputs, **external_context)
return inputs
def prep_outputs(self, inputs, outputs, return_only_outputs=False):
self._validate_outputs(outputs)
if self.memory is not None:
self.memory.save_context(inputs, outputs)
if return_only_outputs:
return outputs
return {**inputs, **outputs}
这种设计使得 Memory 对子类完全透明:子类的 _call 方法无需感知 Memory 的存在,所有的记忆加载和保存都由基类自动处理。
11.1.4 _validate_inputs 的"单参数歧义解析"
_validate_inputs(base.py:289-309)逻辑看起来简单、真实实现里藏着一个对 "调用者传了单个字符串" 的特殊情况处理:
def _validate_inputs(self, inputs: Any) -> None:
if not isinstance(inputs, dict):
_input_keys = set(self.input_keys)
if self.memory is not None:
_input_keys = _input_keys.difference(self.memory.memory_variables)
if len(_input_keys) != 1:
msg = (
f"A single string input was passed in, but this chain expects "
f"multiple inputs ({_input_keys}). When a chain expects "
f"multiple inputs, please call it by passing in a dictionary, "
"eg `chain({'foo': 1, 'bar': 2})`"
)
raise ValueError(msg)
missing_keys = set(self.input_keys).difference(inputs)
if missing_keys:
msg = f"Missing some input keys: {missing_keys}"
raise ValueError(msg)
三步逻辑:
① 如果传的不是 dict(比如 chain("hello"))——把 input_keys 扣掉 memory 能提供的那些、看剩下的单键数是否是 1。只有剩下恰好一个键时、单字符串才能被无歧义地 map 到这个键上——否则没法知道字符串该放进哪个槽。
② 如果歧义(>1 个非 memory 键)——直接 ValueError、并给出详细建议:"请传 dict"。这种报错消息是 LangChain 文档性的体现——不只是说"错了"、还说"怎么改"。
③ 最后一定要检查 missing_keys——即使 inputs 是 dict、缺少必要键也得报错。
这个函数对"用户传单字符串"的处理呼应了 §11.1.1 prep_inputs 的自动字符串→dict 转换逻辑——两者配合:prep_inputs 把字符串包成 dict、validate_inputs 确保包好后键都齐备。两层加一起、Chain 才能既接受严格的 dict 输入、又允许便利的单字符串调用。
这种对"歧义输入"做显式检测、是 LangChain 基类对用户最友好的一面——隐蔽错误在最早的边界就被捕获。
11.1.5 _acall 的 run_in_executor 默认回退
Chain 提供了 _acall 默认实现(base.py:340-366):
async def _acall(
self,
inputs: builtins.dict[str, Any],
run_manager: AsyncCallbackManagerForChainRun | None = None,
) -> builtins.dict[str, Any]:
return await run_in_executor(
None,
self._call,
inputs,
run_manager.get_sync() if run_manager else None,
)
子类如果只实现了同步的 _call、没实现异步的 _acall、基类会自动把 _call 放进 executor(线程池)运行。这意味着所有 Chain 子类自动获得 async 能力——不用手写 _acall 的异步版本。
关键细节在 run_manager.get_sync() if run_manager else None——异步的 AsyncCallbackManagerForChainRun 被转换成同步等价物才传给 _call、因为 _call 是同步函数、不能接受异步 manager。这个转换由 get_sync 方法在 core 里实现、负责把 async callback 桥接到 sync 接口。
代价:在 executor 里跑同步代码、实际上还是同步的——只是从 asyncio 的主线程搬到了一个 worker 线程。如果 _call 里有阻塞 I/O(比如同步 HTTP 请求)、这样至少不会卡住整个 event loop。但如果 _call 里有异步友好的操作(比如 httpx 的异步客户端)、被 _call 的同步接口强制包成 sync、就失去了并发优势——这时候子类应该自己实现 _acall、绕过默认回退。
这是典型的"默认值可工作、但不是最优"——让新手不卡在"我必须写 async 版本"上、同时提醒性能敏感的用户"请自己实现 _acall"。LangChain 大量基类都采用这种"默认回退 + 可选优化"的 async 支持模式。
11.2 文档处理链族
RAG(检索增强生成)是 LangChain 最核心的应用场景之一,而文档处理链负责将检索到的文档转化为语言模型可以消费的格式。LangChain 提供了三种主要的文档合并策略。
11.2.1 BaseCombineDocumentsChain
所有文档处理链的基类定义在 chains/combine_documents/base.py 中:
class BaseCombineDocumentsChain(Chain, ABC):
input_key: str = "input_documents"
output_key: str = "output_text"
@abstractmethod
def combine_docs(self, docs: list[Document], **kwargs: Any) -> tuple[str, dict]:
"""Combine documents into a single string."""
def _call(self, inputs, run_manager=None):
_run_manager = run_manager or CallbackManagerForChainRun.get_noop_manager()
docs = inputs[self.input_key]
other_keys = {k: v for k, v in inputs.items() if k != self.input_key}
output, extra_return_dict = self.combine_docs(
docs, callbacks=_run_manager.get_child(), **other_keys
)
extra_return_dict[self.output_key] = output
return extra_return_dict
注意 combine_docs 返回一个元组 (str, dict),第一个元素是合并后的文本,第二个元素是额外的返回字典。这个设计允许子类在主输出之外返回附加信息(如中间结果)。
11.2.1.5 BaseCombineDocumentsChain 的 prompt_length "可选能力"
打开 combine_documents/base.py:80-96,会看到一个空壳方法:
def prompt_length(self, docs: list[Document], **kwargs: Any) -> int | None:
"""Return the prompt length given the documents passed in."""
return None
默认实现返回 None——意思是"我不知道这些文档跑到 prompt 里会是多长"。子类可以选择 override 这个方法(比如 StuffDocumentsChain 就实现了)、返回实际 token 数。
为什么基类不 abstractmethod?——因为不是所有 combine chain 都能有意义地计算长度。比如 MapReduce chain——它根本不是单次把文档塞进 prompt、而是分批处理、没有单一的"prompt length"概念。强制所有子类实现就违反了Liskov 替换原则(子类不能比父类承诺更少)。
为什么基类也不直接省略?——因为调用方(比如 ReduceDocumentsChain 的 _collapse)需要一个统一接口来问"你处理得动这堆文档吗"。如果没这个方法、调用方得对每种 combine chain 类型做 isinstance 分支——设计反模式。
返回 None 的契约是"基于长度的递归折叠不适用"——上层看到 None 就跳过长度检查、直接调 combine_docs。返回 int 才表示"请根据这个数决定是否需要 collapse"。这是Optional 返回值作为能力标记的经典用法——类似 Rust 里 trait 可选方法的模式。
ReduceDocumentsChain 的 _collapse(§11.2.3)正是这么用的:num_tokens = length_func(result_docs, **kwargs)、while num_tokens is not None and num_tokens > _token_max:——用 is not None 把"不支持长度"的 combine chain 绕过、不报错、只是不做折叠。这种"契约式可选能力"让接口既统一、又不强求所有实现。
11.2.2 StuffDocumentsChain:填充策略
Stuff(填充)是最简单直接的策略:将所有文档拼接成一个字符串,一次性传递给语言模型。
class StuffDocumentsChain(BaseCombineDocumentsChain):
llm_chain: LLMChain
document_prompt: BasePromptTemplate
document_variable_name: str
document_separator: str = "\n\n"
def combine_docs(self, docs, callbacks=None, **kwargs):
inputs = self._get_inputs(docs, **kwargs)
return self.llm_chain.predict(callbacks=callbacks, **inputs), {}
def _get_inputs(self, docs, **kwargs):
doc_strings = [format_document(doc, self.document_prompt) for doc in docs]
inputs = {k: v for k, v in kwargs.items()
if k in self.llm_chain.prompt.input_variables}
inputs[self.document_variable_name] = self.document_separator.join(doc_strings)
return inputs
flowchart LR
subgraph 输入
D1[文档1]
D2[文档2]
D3[文档3]
end
subgraph StuffDocumentsChain
FMT[format_document 格式化每个文档]
JOIN["join 用分隔符拼接"]
PROMPT[填入 prompt 模板]
LLM[调用 LLM]
end
D1 --> FMT
D2 --> FMT
D3 --> FMT
FMT --> JOIN
JOIN --> PROMPT
PROMPT --> LLM
LLM --> OUTPUT[输出结果]
Stuff 策略的优势在于简单高效,适合文档数量少、总长度不超过模型上下文窗口的场景。其劣势也很明显:当文档总量超过上下文限制时无法使用。
11.2.3 ReduceDocumentsChain:递归缩减策略
当文档总量超过上下文窗口时,需要分批处理。ReduceDocumentsChain 通过递归折叠实现这一目标:
class ReduceDocumentsChain(BaseCombineDocumentsChain):
combine_documents_chain: BaseCombineDocumentsChain
collapse_documents_chain: BaseCombineDocumentsChain | None = None
token_max: int = 3000
def _collapse(self, docs, token_max=None, callbacks=None, **kwargs):
result_docs = docs
length_func = self.combine_documents_chain.prompt_length
num_tokens = length_func(result_docs, **kwargs)
_token_max = token_max or self.token_max
while num_tokens is not None and num_tokens > _token_max:
new_result_doc_list = split_list_of_docs(
result_docs, length_func, _token_max, **kwargs
)
result_docs = [
collapse_docs(docs_, _collapse_docs_func, **kwargs)
for docs_ in new_result_doc_list
]
num_tokens = length_func(result_docs, **kwargs)
return result_docs, {}
flowchart TD
START[输入 N 个文档] --> CHECK{总 token 数超过阈值?}
CHECK -->|否| COMBINE[直接调用 combine_documents_chain]
CHECK -->|是| SPLIT[split_list_of_docs: 按 token 上限分组]
SPLIT --> COLLAPSE[每组调用 collapse_chain 压缩为单个文档]
COLLAPSE --> RECHECK{压缩后总 token 数仍超过阈值?}
RECHECK -->|是| SPLIT
RECHECK -->|否| COMBINE
COMBINE --> OUTPUT[最终输出]
split_list_of_docs 函数是分组的核心,它贪心地将文档添加到当前组中,当组的 token 数超过阈值时开始新的一组:
def split_list_of_docs(docs, length_func, token_max, **kwargs):
new_result_doc_list = []
_sub_result_docs = []
for doc in docs:
_sub_result_docs.append(doc)
_num_tokens = length_func(_sub_result_docs, **kwargs)
if _num_tokens > token_max:
if len(_sub_result_docs) == 1:
raise ValueError("A single document was longer than the context length")
new_result_doc_list.append(_sub_result_docs[:-1])
_sub_result_docs = _sub_result_docs[-1:]
new_result_doc_list.append(_sub_result_docs)
return new_result_doc_list
split_list_of_docs 的几个耐人寻味的细节:
① _sub_result_docs[:-1] 回退——当加入一个文档发现超限时、new_result_doc_list.append(_sub_result_docs[:-1]) 把不包括最后那个文档的列表存为一组、然后 _sub_result_docs = _sub_result_docs[-1:] 让最后那个文档变成新一组的第一个。贪心的 "装不下就换组"——和装箱算法里的 First Fit Decreasing 同构。
② 单文档超限的硬错——if len(_sub_result_docs) == 1: raise ValueError(...)——如果一个文档自己就超过 token_max、直接报错而不是偷偷接受。这是因为这种文档没法靠分组解决、必须先 split 成更小的文档——把错误扔给调用方、让他们知道"你得先 split_documents"。
③ 循环尾的 new_result_doc_list.append(_sub_result_docs)——循环结束后最后一组也要加进去(因为循环只在"超限"时 append、正常 append 一步可能没走到)。这是典型"最后一批不要漏"的模式——不加这行就会丢掉剩下的文档、还不报错、最隐蔽的 bug。
collapse_docs 函数将一组文档合并为一个文档,同时智能合并元数据:
def collapse_docs(docs, combine_document_func, **kwargs):
result = combine_document_func(docs, **kwargs)
combined_metadata = {k: str(v) for k, v in docs[0].metadata.items()}
for doc in docs[1:]:
for k, v in doc.metadata.items():
if k in combined_metadata:
combined_metadata[k] += f", {v}"
else:
combined_metadata[k] = str(v)
return Document(page_content=result, metadata=combined_metadata)
11.2.3.5 collapse_docs 的元数据合并:, 拼接 vs 覆盖
元数据合并的细节(chains/combine_documents/reduce.py 附近)里藏着一个看似无关紧要实际关乎语义的选择:
def collapse_docs(docs, combine_document_func, **kwargs):
result = combine_document_func(docs, **kwargs)
combined_metadata = {k: str(v) for k, v in docs[0].metadata.items()}
for doc in docs[1:]:
for k, v in doc.metadata.items():
if k in combined_metadata:
combined_metadata[k] += f", {v}" # ← 拼接、不覆盖
else:
combined_metadata[k] = str(v)
return Document(page_content=result, metadata=combined_metadata)
为什么是拼接、不是 覆盖?因为合并文档的元数据有不同的语义需求:
source: "doc1.pdf"+source: "doc2.pdf"→source: "doc1.pdf, doc2.pdf"——保留溯源、让下游知道合并文档来自哪几个源文件。page: "5"+page: "12"→page: "5, 12"——保留页码列表、虽然不再是单个数字、至少信息没丢。author: "Alice"+author: "Alice"→author: "Alice, Alice"——重复也保留、虽然冗余、但如果选择去重就得做 hash、简单实现里就不管了。
为什么不用 list 存多值?——因为元数据要能序列化到 prompt 里(通过 format_document 格式化)、字符串最通用。如果存 list、prompt 模板拿到 page = [5, 12] 还得自己处理格式。统一成 ", " 分隔的字符串、是可读性 + 可格式化的折中。
str(v) 的强制转换——即使 metadata 里原本是 int/bool、合并后也统一为 string。这是因为下一轮合并可能再 collapse、int + str 会报错、全部变成 str 才能持续合并下去。一次性付出"类型丢失"的代价、换来了递归合并的幂等性。
这个小函数体现了 LangChain 对"工程可用"的追求——不是设计最优雅、而是最能跑得通且不丢信息的方案。
11.2.4 三种策略的对比
graph TB
subgraph "Stuff 策略"
S1[所有文档] --> S2[拼接] --> S3[单次 LLM 调用]
end
subgraph "Map-Reduce 策略"
MR1[文档1] --> MR2[LLM 调用1]
MR3[文档2] --> MR4[LLM 调用2]
MR5[文档3] --> MR6[LLM 调用3]
MR2 --> MR7[合并中间结果]
MR4 --> MR7
MR6 --> MR7
MR7 --> MR8[最终 LLM 调用]
end
subgraph "Refine 策略"
R1[文档1 + 初始 prompt] --> R2[LLM 调用1 得到初始答案]
R2 --> R3[文档2 + 上一步答案] --> R4[LLM 调用2 精炼]
R4 --> R5[文档3 + 上一步答案] --> R6[LLM 调用3 精炼]
end
| 策略 | LLM 调用次数 | 适用场景 | 优势 | 劣势 |
|---|---|---|---|---|
| Stuff | 1 | 文档总量小 | 简单高效,上下文完整 | 受限于上下文窗口 |
| Map-Reduce | N+1 | 大量文档 | 可并行处理,扩展性好 | 可能丢失跨文档关联 |
| Refine | N | 需要深度理解 | 逐步精炼答案质量高 | 串行执行速度慢 |
| Reduce | 变化 | 超长文档集 | 自适应递归压缩 | 多轮压缩可能丢失信息 |
11.3 检索问答链
11.3.1 RetrievalQA:经典的检索问答
RetrievalQA 是最经典的 RAG Chain,它将检索器和文档处理链组合在一起:
class BaseRetrievalQA(Chain):
combine_documents_chain: BaseCombineDocumentsChain
input_key: str = "query"
output_key: str = "result"
return_source_documents: bool = False
def _call(self, inputs, run_manager=None):
_run_manager = run_manager or CallbackManagerForChainRun.get_noop_manager()
question = inputs[self.input_key]
docs = self._get_docs(question, run_manager=_run_manager)
answer = self.combine_documents_chain.run(
input_documents=docs, question=question,
callbacks=_run_manager.get_child()
)
if self.return_source_documents:
return {self.output_key: answer, "source_documents": docs}
return {self.output_key: answer}
class RetrievalQA(BaseRetrievalQA):
retriever: BaseRetriever
def _get_docs(self, question, *, run_manager):
return self.retriever.invoke(
question, config={"callbacks": run_manager.get_child()}
)
11.3.1.5 return_source_documents 的条件返回结构
RetrievalQA 里有一个常被忽略的字段 return_source_documents: bool = False。它决定 chain 返回 dict 里是否附带 source_documents 键:
if self.return_source_documents:
return {self.output_key: answer, "source_documents": docs}
return {self.output_key: answer}
为什么默认关?——因为带 source_documents 的返回字典大得多——包含检索到的完整文档(可能几 KB 到 MB)。如果调用方只关心答案、这些 doc 就是浪费。默认关、需要的人显式打开、是小-by-default的合理默认。
为什么不直接用 if 简化返回?——熟悉 Python 的读者可能会写:
return {self.output_key: answer, **({"source_documents": docs} if self.return_source_documents else {})}
这种 dict 展开写法功能等价、但可读性差——第一眼看不出返回值的 shape 有几种。LangChain 用两个分支明确的 return、IDE 类型推导和阅读都友好——{a} 和 {a, b} 两种结构在两行里一目了然。
类型层面——返回值类型实际上是一个 union type:dict[str, str] | dict[str, str | list[Document]]。但 LangChain 没有用 TypedDict 精确建模、而是用 dict[str, Any]——因为 Python 的类型系统对"条件 key 存在"支持得还不够优雅(NotRequired 是 3.11+ 才有的)。向下兼容的代价是返回类型不精确、但能跨 Python 3.9+ 正常工作。
这个 return_source_documents 字段的影响会一路传导到上层——RAG 应用的 UI 需不需要显示"答案基于哪些文档"、完全由这一个布尔位决定。它看似小、但决定了整个应用的展示层能做什么。
11.3.2 ConversationalRetrievalChain:会话式检索
ConversationalRetrievalChain 在 RetrievalQA 的基础上增加了会话历史处理能力。它的核心设计是使用一个独立的 question_generator 链将原始问题和聊天历史转化为独立的检索查询:
class BaseConversationalRetrievalChain(Chain):
combine_docs_chain: BaseCombineDocumentsChain
question_generator: LLMChain
rephrase_question: bool = True
return_source_documents: bool = False
def _call(self, inputs, run_manager=None):
question = inputs["question"]
chat_history_str = get_chat_history(inputs["chat_history"])
if chat_history_str:
new_question = self.question_generator.run(
question=question, chat_history=chat_history_str,
callbacks=_run_manager.get_child()
)
else:
new_question = question
docs = self._get_docs(new_question, inputs, run_manager=_run_manager)
if self.rephrase_question:
new_inputs["question"] = new_question
answer = self.combine_docs_chain.run(
input_documents=docs, callbacks=_run_manager.get_child(), **new_inputs
)
return {self.output_key: answer}
sequenceDiagram
participant User as 用户
participant CRC as ConversationalRetrievalChain
participant QG as question_generator
participant Retriever as 检索器
participant CDC as combine_docs_chain
participant LLM as 语言模型
User->>CRC: {question, chat_history}
CRC->>CRC: 格式化 chat_history 为字符串
alt 有聊天历史
CRC->>QG: {question, chat_history}
QG->>LLM: 生成独立查询
LLM-->>QG: standalone_question
QG-->>CRC: new_question
else 无聊天历史
CRC->>CRC: new_question = question
end
CRC->>Retriever: new_question
Retriever-->>CRC: relevant_documents
CRC->>CDC: {documents, question}
CDC->>LLM: 合并文档 + 生成回答
LLM-->>CDC: answer
CDC-->>CRC: answer
CRC-->>User: {answer, source_documents?}
11.3.2.5 rephrase_question 选项与 new_question 的两种身份
ConversationalRetrievalChain 里 rephrase_question: bool = True 控制一个非常精妙的行为:改写后的问题是只用于检索、还是同时传给合成链?
if chat_history_str:
new_question = self.question_generator.run(
question=question, chat_history=chat_history_str, ...
)
else:
new_question = question
docs = self._get_docs(new_question, inputs, ...) # ← 检索用改写后
if self.rephrase_question:
new_inputs["question"] = new_question # ← 合成也用改写后
# else: new_inputs["question"] 仍然是原始 question
answer = self.combine_docs_chain.run(...)
rephrase_question = True(默认)——LLM 同时看到改写后的独立问题。场景:用户说"它多大?"、改写后是"姚明有多高?"、合成链就直接给出"姚明身高 2.26 米"。
rephrase_question = False——LLM 看到原始的、带上下文歧义的问题"它多大?"、但有了 chat_history 和检索到的 docs——让 LLM 自己做语义关联。有些场景下这样结果更自然(保持对话流)——因为改写后的问题听起来像第三方发问、失去了对话的亲密感。
为什么给用户这个选项?——因为没有普适的正确答案。技术问答场景偏向 True(精确性重要)、聊天机器人场景偏向 False(口吻连贯)。把选择权交给应用层、是 LangChain 对"框架不替用户决断产品体验"的克制。
这就像编程语言里"显式 vs 隐式"的 switch——框架提供两种行为、各有其美、让用户根据业务场景选。LangGraph 的 interrupts、Vite 的 HMR 也都有类似"行为开关"——不是每个问题都有正确答案、给 knob是成熟框架的标志。
11.3.3 create_retrieval_chain:LCEL 时代的替代方案
create_retrieval_chain 是传统检索链的现代替代品,它返回一个纯粹的 LCEL Runnable:
def create_retrieval_chain(
retriever: BaseRetriever | Runnable[dict, RetrieverOutput],
combine_docs_chain: Runnable[dict[str, Any], str],
) -> Runnable:
if not isinstance(retriever, BaseRetriever):
retrieval_docs = retriever
else:
retrieval_docs = (lambda x: x["input"]) | retriever
return (
RunnablePassthrough.assign(
context=retrieval_docs.with_config(run_name="retrieve_documents"),
).assign(answer=combine_docs_chain)
).with_config(run_name="retrieval_chain")
这个实现体现了 LCEL 的精髓:通过 RunnablePassthrough.assign 在数据字典中逐步添加字段。首先执行检索将结果赋给 context 键,然后将整个字典(包含 input 和 context)传递给 combine_docs_chain 生成答案赋给 answer 键。
配合 create_stuff_documents_chain,一个完整的 RAG 管道可以这样构建:
def create_stuff_documents_chain(llm, prompt, *, output_parser=None,
document_prompt=None, document_separator="\n\n",
document_variable_name="context"):
_document_prompt = document_prompt or DEFAULT_DOCUMENT_PROMPT
_output_parser = output_parser or StrOutputParser()
def format_docs(inputs: dict) -> str:
return document_separator.join(
format_document(doc, _document_prompt)
for doc in inputs[document_variable_name]
)
return (
RunnablePassthrough.assign(**{document_variable_name: format_docs})
.with_config(run_name="format_inputs")
| prompt | llm | _output_parser
).with_config(run_name="stuff_documents_chain")
11.4 SequentialChain:顺序编排
SequentialChain 将多个 Chain 串联执行,前一个 Chain 的输出自动成为后续 Chain 的可用输入:
class SequentialChain(Chain):
chains: list[Chain]
input_variables: list[str]
output_variables: list[str]
return_all: bool = False
def _call(self, inputs, run_manager=None):
known_values = inputs.copy()
_run_manager = run_manager or CallbackManagerForChainRun.get_noop_manager()
for _i, chain in enumerate(self.chains):
callbacks = _run_manager.get_child()
outputs = chain(known_values, return_only_outputs=True, callbacks=callbacks)
known_values.update(outputs)
return {k: known_values[k] for k in self.output_variables}
SequentialChain 在构造时会执行严格的验证,确保每个 Chain 的输入键都能在已知变量中找到对应值,且不同 Chain 的输出键不会发生冲突:
@model_validator(mode="before")
@classmethod
def validate_chains(cls, values):
chains = values["chains"]
known_variables = set(values["input_variables"])
for chain in chains:
missing_vars = set(chain.input_keys).difference(known_variables)
if missing_vars:
raise ValueError(f"Missing required input keys: {missing_vars}")
overlapping_keys = known_variables.intersection(chain.output_keys)
if overlapping_keys:
raise ValueError(f"Chain returned keys that already exist: {overlapping_keys}")
known_variables |= set(chain.output_keys)
return values
11.4.1 SequentialChain.validate_chains 的构造期交叉检查
SequentialChain 的构造验证(base.py 里的 @model_validator(mode="before"))值得细看——它在 pydantic 模型构造阶段就做多链跨边界检查:
known_variables = set(values["input_variables"])
for chain in chains:
missing_vars = set(chain.input_keys).difference(known_variables)
if missing_vars:
raise ValueError(f"Missing required input keys: {missing_vars}")
overlapping_keys = known_variables.intersection(chain.output_keys)
if overlapping_keys:
raise ValueError(f"Chain returned keys that already exist: {overlapping_keys}")
known_variables |= set(chain.output_keys)
两种错误都在构造时抓:
① missing_vars——前一个 chain 没产出某个键、后一个 chain 却需要它。这就是数据流断链——如果不在构造期抓、运行时就会是一个 KeyError on inputs、离根因很远。构造期抓、错误消息直接指向"你忘了产出 X、但下游要"。
② overlapping_keys——某个 chain 要产出的键、已经在 known_variables 里。这就是数据流覆盖——要么 chain 顺序错了、要么有重复的 output_keys。不抓会发生"后一个 chain 悄悄覆盖前一个的输出"——典型的隐蔽 bug。构造期直接 ValueError、强迫用户重新设计 key 命名。
这两种检查结合 @model_validator(mode="before") 的早期执行时机、让 SequentialChain 的构造本身就是整条管道的数据流验证——一次性确保所有键名对齐、避免运行时调试的痛苦。
这种"把运行时错误变成构造期错误"是 LangChain 很多 chain 的通用模式——create_retrieval_chain 的 "input" not in prompt.input_variables 检查、ToolMessage 里的字段校验都类似。越早失败、越便宜——这是整个 LangChain 作为 pydantic 模型集合的一个核心价值主张。
SimpleSequentialChain 是简化版,要求每个子链只有一个输入键和一个输出键,上一个链的输出直接作为下一个链的输入:
class SimpleSequentialChain(Chain):
chains: list[Chain]
def _call(self, inputs, run_manager=None):
_input = inputs[self.input_key]
for i, chain in enumerate(self.chains):
_input = chain.run(_input, callbacks=_run_manager.get_child(f"step_{i + 1}"))
return {self.output_key: _input}
11.5 RouterChain:动态路由
RouterChain 实现了基于输入内容动态选择执行路径的能力:
class Route(NamedTuple):
destination: str | None
next_inputs: dict[str, Any]
class RouterChain(Chain, ABC):
@property
def output_keys(self) -> list[str]:
return ["destination", "next_inputs"]
def route(self, inputs, callbacks=None) -> Route:
result = self(inputs, callbacks=callbacks)
return Route(result["destination"], result["next_inputs"])
class MultiRouteChain(Chain):
router_chain: RouterChain
destination_chains: Mapping[str, Chain]
default_chain: Chain
silent_errors: bool = False
def _call(self, inputs, run_manager=None):
callbacks = _run_manager.get_child()
route = self.router_chain.route(inputs, callbacks=callbacks)
if not route.destination:
return self.default_chain(route.next_inputs, callbacks=callbacks)
if route.destination in self.destination_chains:
return self.destination_chains[route.destination](
route.next_inputs, callbacks=callbacks
)
if self.silent_errors:
return self.default_chain(route.next_inputs, callbacks=callbacks)
raise ValueError(f"Received invalid destination chain name '{route.destination}'")
flowchart TD
INPUT[用户输入] --> ROUTER[RouterChain]
ROUTER --> DECISION{路由决策}
DECISION -->|destination=A| CHAIN_A[Chain A]
DECISION -->|destination=B| CHAIN_B[Chain B]
DECISION -->|destination=C| CHAIN_C[Chain C]
DECISION -->|destination=None| DEFAULT[Default Chain]
DECISION -->|无效目标 + silent_errors| DEFAULT
CHAIN_A --> OUTPUT[输出]
CHAIN_B --> OUTPUT
CHAIN_C --> OUTPUT
DEFAULT --> OUTPUT
MultiRouteChain 的设计体现了策略模式:RouterChain 负责决策,destination_chains 映射表负责执行。silent_errors 参数提供了优雅降级能力,当路由到不存在的目标时自动回退到默认链。
11.5.1 silent_errors 的三级降级语义
MultiRouteChain 的 silent_errors 标志不是简单的"忽略错误"——它定义了三级降级策略:
if not route.destination: # ① destination 是 None
return self.default_chain(route.next_inputs, ...)
if route.destination in self.destination_chains: # ② 找到对应 chain
return self.destination_chains[route.destination](...)
if self.silent_errors: # ③ 找不到、但允许静默
return self.default_chain(route.next_inputs, ...)
raise ValueError(f"Received invalid destination chain name '{route.destination}'") # ④ 找不到且不允许静默
四种情况、三种降级:
① destination=None——RouterChain 显式不路由(可能 predicate 判断"不需要特殊处理")——直接走 default。这不是错误、是合法的路由决策。
② 匹配到具体 chain——正常路由、走对应 destination_chain。
③ destination 不匹配 + silent_errors——RouterChain 可能返回了幻觉目标(比如 LLM 的 RouterChain 输出了 chain 列表里没有的名字)——这是错误状态、但根据 silent_errors 决定是优雅降级到 default 还是 抛异常。
④ destination 不匹配 + silent_errors=False——显式抛 ValueError、让上层知道"RouterChain 出错了"。
silent_errors 默认是 False——因为默认情况下配置错误该大声抛出、而不是被偷偷吞掉。应用层如果判断"我宁可降级、不要崩溃"、才显式打开 silent_errors。
这种"默认严格、显式宽松"的 toggle 哲学是生产代码的黄金准则——默认行为偏向暴露问题、用户明确表达"我知道这可能错"再打开降级。和 throw vs try/catch 的分工类似——异常是 default、吞异常需要显式代码。
11.6 create_history_aware_retriever:历史感知检索
这是一个用 LCEL 构建的现代函数,替代了 ConversationalRetrievalChain 的问题改写逻辑:
def create_history_aware_retriever(llm, retriever, prompt):
retrieve_documents = RunnableBranch(
(
lambda x: not x.get("chat_history", False),
(lambda x: x["input"]) | retriever,
),
prompt | llm | StrOutputParser() | retriever,
).with_config(run_name="chat_retriever_chain")
return retrieve_documents
RunnableBranch 在这里实现了条件路由:如果没有聊天历史,直接将输入传递给检索器;如果有聊天历史,先通过 LLM 改写问题再检索。
11.6.1 create_history_aware_retriever 69 行全文解读
上面的伪代码省略了几处真实意义重大的细节。打开 langchain_classic/chains/history_aware_retriever.py 全文只有 69 行,每行都值得看:
def create_history_aware_retriever(
llm: LanguageModelLike,
retriever: RetrieverLike,
prompt: BasePromptTemplate,
) -> RetrieverOutputLike:
if "input" not in prompt.input_variables: # ← ①
msg = (f"Expected `input` to be a prompt variable, "
f"but got {prompt.input_variables}")
raise ValueError(msg)
retrieve_documents: RetrieverOutputLike = RunnableBranch(
(
# Both empty string and empty list evaluate to False
lambda x: not x.get("chat_history", False), # ← ②
# If no chat history, then we just pass input to retriever
(lambda x: x["input"]) | retriever,
),
# If chat history, then we pass inputs to LLM chain, then to retriever
prompt | llm | StrOutputParser() | retriever,
).with_config(run_name="chat_retriever_chain") # ← ③
return retrieve_documents
三个关键细节:
① 构造时的 prompt 契约校验:函数一进门就检查 "input" not in prompt.input_variables——如果你传的 prompt 没有 {input} 占位符、直接 ValueError。这是"fail-fast on misconfiguration"——构造函数阶段就拦住错误用法,不是到 invoke 才炸。对用户体验至关重要:LangChain 的错误如果在运行时冒出来、traceback 会经过 LCEL 的好几层组合,很难定位根因;构造时直接报错能精确指向"就是你 prompt 写错了"。
② "空字符串和空列表都算 False" 的有意选择(line 60-61 源码原注释):
# Both empty string and empty list evaluate to False
lambda x: not x.get("chat_history", False),
chat_history 为缺失、为空列表 []、为空字符串 "" 时统一走"无历史"分支。这避免了一个常见坑:前端传 {"chat_history": ""} 时、如果用 x.get("chat_history") is None 判断就走了"有历史"分支、prompt 空历史 + LLM 改写会产生"改写一个没有上下文的问题"的怪异行为。原注释明确写出来是给将来维护者的信号:"这不是 bug、别改"。
③ with_config(run_name="chat_retriever_chain") 给整条管道挂个名字——LangSmith 里就能看到这个节点叫 "chat_retriever_chain",不是一串下划线和随机 ID。这是第 12 章讲 callback 时 _get_ls_params 名字归一化的配套设计:在关键组合点显式命名、让 dashboard 可读。
11.6.2 create_retrieval_chain 的 RunnablePassthrough.assign 双步
另一个 LCEL 替代传统 Chain 的代表——create_retrieval_chain(同目录 retrieval.py)——全文 68 行、核心逻辑 5 行:
if not isinstance(retriever, BaseRetriever):
retrieval_docs: Runnable[dict, RetrieverOutput] = retriever
else:
retrieval_docs = (lambda x: x["input"]) | retriever # ← 适配层
return (
RunnablePassthrough.assign(
context=retrieval_docs.with_config(run_name="retrieve_documents"),
).assign(answer=combine_docs_chain)
).with_config(run_name="retrieval_chain")
三层精心设计:
1. 类型分叉适配:传入的 retriever 如果是 BaseRetriever 子类(第 10 章讲的),需要外加 (lambda x: x["input"]) | retriever 从 dict 里抽出 input 字段;如果传入的已经是 Runnable[dict, list[Doc]](已经接受整个 dict 的 runnable),直接用。这种对两种输入形态的对齐让函数能承接旧式 retriever 和新式 runnable 两种用法。
2. RunnablePassthrough.assign(context=...):把 retriever 的输出挂到 context 键、同时保留原始 input 和其他字段。这就是为什么后续 prompt 里既能用 {context} 又能用 {input} 和 {chat_history}——passthrough assign 的精髓。
3. 链式 .assign(answer=...):再跑一次 assign,用 combine_docs_chain 产出 answer 字段。最终 dict 形状是 {input, chat_history, context, answer}——所有中间值和最终答案都在同一个字典里、方便用户做错误分析、日志、或二次处理。
对比传统 RetrievalQA 的十几个属性和手写 _call 方法——现代 LCEL 版本用 60 行做了一个字典转换器。功能完全等价、可读性和可调试性好得多。这就是 @deprecated(since="0.1.17") 背后的设计自信:不是随便换 API、而是新 API 在每个工程维度都严格优于旧版。
11.6.3 RunnableBranch 的"第一个不匹配就尝试下一个" 语义
RunnableBranch 在 create_history_aware_retriever 里的用法暴露了它的设计:
RunnableBranch(
(
lambda x: not x.get("chat_history", False), # condition
(lambda x: x["input"]) | retriever, # branch
),
prompt | llm | StrOutputParser() | retriever, # default
)
位置参数模式:前 N 个是 (condition, runnable) 元组、最后一个是 default。RunnableBranch 按顺序依次尝试 condition、第一个返回 truthy 的对应 runnable 执行;都不匹配走 default。
为什么不是 if/elif/else 式的 switch-case?——因为 RunnableBranch 本身也是 Runnable、要进 LCEL 管道、要能被 .with_config 挂名字、要自动 streaming、要支持 sync/async。switch-case 不是 Runnable、没法承担这些。
condition 必须是 Runnable 或 callable——本质上是 lambda。LangChain 把 lambda 自动包成 RunnableLambda(通过 LCEL 的 __or__ 机制)——对用户透明。
这种"链式 branch"和传统 if-elif 最大的差异是可组合性——你可以把一个完整的 RunnableBranch 塞到更大的管道里、作为一个节点。而 if-elif 只能在函数体里用、没法作为"构件"传给下游。LCEL 的哲学——一切皆 Runnable——让条件逻辑也变成可组合的单元。
对比第 15 章 vite 的中间件链、第 12 章 hyper 的 Steer——都是**"按运行时信息选择分支"**的模式、但用不同语言/框架的原语实现。RunnableBranch 在 Python 里扮演 tower::Steer 在 Rust 里扮演的角色——动态路由的统一接口。
11.6.4 与本书第 9/10 章的呼应
与第 9 章(Runnable 协议)的呼应——第 9 章建立了 Runnable 是 LangChain 所有东西的共同协议——Chain、Retriever、PromptTemplate、LLM、OutputParser 全都是 Runnable。本章给出了 Chain 如何成为 Runnable 的具体实现:继承 RunnableSerializable、override invoke、保留 _call 作为子类扩展点。理解了第 9 章的协议、再看 Chain 的 invoke 就能发现每一步都对应 Runnable 的生命周期(prep_inputs → _call → prep_outputs)。
与第 10 章(Retriever 与文档加载)的呼应——本章讲的 StuffDocumentsChain、ReduceDocumentsChain 是第 10 章检索到的 Document 的下游消费者。第 10 章告诉你 "文档从哪来"、本章告诉你 "文档怎么用"——两章合成一个完整的 RAG 工作流。create_retrieval_chain 的 retriever 参数就是第 10 章构造的 Retriever。
与 LangGraph 第 14 章的呼应——LangGraph 的 Runtime 注入机制和 Chain 的 Memory/Callback 注入是同一个问题的不同解:都要"跨节点共享运行时依赖"。LangChain 选择了 Memory 字段 + Callback 参数、LangGraph 选择了 frozen Runtime 对象——两者体现了**"dynamic 配置" vs "type-safe 配置"**的设计张力。LangChain 灵活但需要运行时检查、LangGraph 严格但编译期就能验证。
11.7 从传统 Chain 到 LCEL 的迁移
11.7.1 设计决策:为何弃旧迎新
传统 Chain 存在几个固有限制:
- 输入输出格式固化:必须是
dict[str, Any],不够灵活 - 静态类型声明:
input_keys和output_keys是硬编码的属性 - 组合性受限:需要专门的 SequentialChain 来编排,不如
|操作符直观 - 流式支持不完善:传统 Chain 难以实现细粒度的流式输出
LCEL 通过 Runnable 协议解决了这些问题:任意类型的输入输出、自动类型推导、操作符组合、原生流式支持。
11.7.2 迁移对照表
| 传统 Chain | LCEL 替代方案 |
|---|---|
LLMChain(llm, prompt) |
prompt | llm | StrOutputParser() |
StuffDocumentsChain |
create_stuff_documents_chain(llm, prompt) |
RetrievalQA |
create_retrieval_chain(retriever, combine_docs) |
ConversationalRetrievalChain |
create_history_aware_retriever + create_retrieval_chain |
SequentialChain([c1, c2]) |
c1 | c2 |
RouterChain + MultiRouteChain |
RunnableBranch(...) |
11.7.2.5 迁移示例:LLMChain → RunnableSequence 的实际差异
最基础的迁移:LLMChain(llm=llm, prompt=prompt) → prompt | llm | StrOutputParser()。看起来一对一、实际上有几处真实差异值得迁移时注意:
① 输出类型变了——LLMChain 返回 {"text": "..."}(一个 dict)、LCEL 返回 "..."(一个字符串)。如果老代码写 result["text"]、迁移后要改成直接用 result。
② memory 没自动继承——LLMChain 有 memory 字段、prompt | llm 没有。如果要保留对话记忆、得显式用 RunnableWithMessageHistory 包一下:
chain = prompt | llm | StrOutputParser()
chain_with_memory = RunnableWithMessageHistory(
chain,
get_session_history,
input_messages_key="input",
history_messages_key="history",
)
③ callback 自动传递机制不同——LLMChain 在 invoke 内部聚合 self.callbacks + 参数 callbacks;LCEL 完全靠 config 参数传播。老代码可能依赖 LLMChain(callbacks=[my_handler]) 的行为、迁移后要改成 chain.invoke(..., config={"callbacks": [my_handler]})。
④ streaming 的不同——LLMChain 的 stream 方法是"一次性执行、然后把结果切片"的假流式;LCEL 的 | 链是真流式——llm 产出一个 token、解析器消费一个 token、下游实时得到。这是迁移能带来的真实增益——但也意味着老代码里对"完整输出"做后处理的逻辑可能不再适用。
LangChain 官方在 deprecated 消息里写了迁移路径、但真实迁移时上述四个陷阱需要逐一检查。不是替换一行代码、是重新思考数据流——这是为什么 LangChain 允许老 Chain 一直存在、不强制一夜迁移:让用户逐步理解 LCEL 的模型、再动手改代码。
11.7.3 源码中的弃用标记
在源码中,几乎所有传统 Chain 子类都已被标记为弃用:
@deprecated(since="0.1.17", alternative="RunnableSequence, e.g., `prompt | llm`", removal="1.0")
class LLMChain(Chain): ...
@deprecated(since="0.2.13", removal="1.0",
message="Use the `create_stuff_documents_chain` constructor instead.")
class StuffDocumentsChain(BaseCombineDocumentsChain): ...
@deprecated(since="0.1.17", removal="1.0",
message="Use the `create_retrieval_chain` constructor instead.")
class RetrievalQA(BaseRetrievalQA): ...
弃用消息中都提供了迁移指引的文档链接,这是一种负责任的 API 演进方式。
11.7.4 __call__ 的 deprecated 但保留:破坏性更迭的克制
Chain 基类里有一段 deprecated 但仍然存在的方法(base.py:368-418):
@deprecated("0.1.0", alternative="invoke", removal="1.0")
def __call__(
self,
inputs: dict[str, Any] | Any,
return_only_outputs: bool = False,
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
include_run_info: bool = False,
) -> dict[str, Any]:
...
return self.invoke(
inputs,
cast("RunnableConfig", {k: v for k, v in config.items() if v is not None}),
return_only_outputs=return_only_outputs,
include_run_info=include_run_info,
)
注意 @deprecated("0.1.0", ..., removal="1.0")——标记为从 0.1.0 版本开始过期、计划 1.0 版本移除。但实际发了多少次小版本这个方法还在——因为 1.0 还没到。API 弃用不是一刀切、有一个"deprecated 窗口"让用户有时间迁移。
__call__ 内部只是转发到 invoke——没有独立实现逻辑。用户老代码 chain(inputs) 能继续工作、但每次调用都会打印 deprecation warning。这种"代理到新 API"的兼容层是良性的 API 演进——保留表面兼容、让用户按自己节奏迁移。
为什么不直接删?——因为 LangChain 0.1.0 之后的用户基数太大、直接删会破坏无数第三方集成。1.0 的移除是明确的、宣告过的 breaking change——给用户足够时间(从 0.1.0 到 1.0 中间有 15+ 次小版本更新)。
弃用警告的力度——@deprecated 装饰器本身是 LangChain core 自己写的(langchain_core._api)、它在第一次调用时发 DeprecationWarning、不会每次都吵。这是对"不打扰但持续提醒"的平衡——足够让用户看到、但不至于刷屏到被过滤掉。
deprecated 窗口 + 转发实现、体现了 LangChain 作为大型 Python 库在 API 治理上的成熟度——兼容性是用户信任的基础、但不能永远挂着不砍——通过"宣告 + 保留 + 最终移除"的三阶段完成无痛迁移。
11.7.5 raise_callback_manager_deprecation 的迁移保护层
Chain 基类里还有一个"保护层"——@model_validator(mode="before") 装饰的 raise_callback_manager_deprecation(base.py:245-263):
@model_validator(mode="before")
@classmethod
def raise_callback_manager_deprecation(cls, values: dict) -> Any:
if values.get("callback_manager") is not None:
if values.get("callbacks") is not None:
msg = (
"Cannot specify both callback_manager and callbacks. "
"callback_manager is deprecated, callbacks is the preferred "
"parameter to pass in."
)
raise ValueError(msg)
warnings.warn(
"callback_manager is deprecated. Please use callbacks instead.",
DeprecationWarning,
stacklevel=4,
)
values["callbacks"] = values.pop("callback_manager", None)
return values
这段代码处理老字段 callback_manager(早期版本的名字)迁移到新字段 callbacks 的过程:
① 同时设了两个字段?直接拒绝——用户既传 callback_manager 又传 callbacks、说明他没搞清楚哪个是对的、报错强制他选一个。
② 只设了 callback_manager?警告 + 自动迁移——打印 DeprecationWarning、然后把 callback_manager 的值 pop 出来、赋给 callbacks。老代码不改一行、照常工作、但每次实例化都会看到警告。
③ stacklevel=4——让 warning 显示在用户调用 Chain 构造的那行、而不是 Chain 基类内部。这对用户调试非常重要——warning 的栈位置决定了用户能不能快速定位"哪里用了老 API"。
这种字段重命名保护层在 pydantic 模型里是常见惯用法——用 @model_validator(mode="before") 在字段验证前重写 values dict、把老字段名搬到新字段名。和 §11.7.4 的 __call__ 转发类似、是字段级别的弃用过渡。
这个验证器的存在让用户可以无痛升级——新版本的 LangChain 还接受老字段名、只是提醒应该改。等到 callback_manager 真正被移除的那个大版本、再拿掉这个验证器——届时老代码会 ValidationError(unknown field)、但至少有过一个警告窗口。
LangChain 的 API 弃用策略可以总结为三层防护:
@deprecated方法装饰器——调用级别的警告@model_validator字段迁移——构造级别的自动修复- 文档里的
removal版本标记——明确的最终期限
三层叠加、给用户最宽容的升级通道、同时把最终清理的决心写得很清楚。
11.7.6 include_run_info 与 RUN_KEY 的可选元信息
Chain.invoke 在最后有一段条件逻辑(base.py:182-184):
if include_run_info:
final_outputs[RUN_KEY] = RunInfo(run_id=run_manager.run_id)
return final_outputs
RUN_KEY 是 langchain_classic/schema.py 里定义的常量(字面量 "__run")。当用户传 include_run_info=True、Chain 会在返回的 dict 里塞一个特殊 key __run、值是 RunInfo 对象(包含 run_id)。
为什么不默认包含?——因为大部分调用方不关心 run_id、每次返回一个额外 key 会污染 dict 结构——用户 for k, v in result.items() 迭代输出时会看到一个"陌生"的 __run 键。默认关、需要的人显式打开、是"小-by-default"原则的又一体现。
为什么用下划线前缀的 key?——防止和业务 output_keys 冲突。用户定义 chain 时可能有 {"__run": "..."} 当作普通输出(虽然罕见)——但下划线双前缀是Python 的"私有"约定、应用代码一般不用。如果真有冲突、LangChain 会被用户的值覆盖——是约定式而非强制的命名空间隔离。
run_manager.run_id 是 UUID、跨进程唯一——它和 LangSmith 的 trace 系统关联。用户拿到 run_id 后可以程序化地查询 trace——比如发送给 LangSmith UI 生成可分享链接。这对调试生产问题很有用——能从应用日志反向追到具体一次执行的细节。
include_run_info 是 invoke 的 kwargs、不是构造字段——每次调用都可以单独决定是否要 run info。这比"全局开关"更灵活——正常调用不带、debug 时再带上、不需要重构整个 Chain 构造。
11.8 小结
Chain 体系是 LangChain 架构演进的一个缩影。从最初的面向对象 Chain 类继承体系,到如今基于 LCEL 的函数式管道组合,LangChain 在保持向后兼容的同时完成了范式转换。
Chain 基类通过继承 RunnableSerializable 巧妙地桥接了两个时代:传统 Chain 可以直接参与 LCEL 管道的组合。invoke 方法中精心编排的 Memory 注入、Callback 管理和异常处理,展示了一个生产级框架应有的健壮性。
文档处理链族(Stuff/Reduce)为不同规模的文档集提供了适当的合并策略。检索问答链(RetrievalQA/ConversationalRetrievalChain)虽已弃用,但其设计思想被 create_retrieval_chain 和 create_history_aware_retriever 完整继承。RouterChain 的动态路由能力在 LCEL 中由 RunnableBranch 接班。
对于新项目,建议直接使用 LCEL 的 create_* 工厂函数和 Runnable 管道。对于维护中的旧项目,由于传统 Chain 本身就是 Runnable,可以渐进式地将内部逻辑迁移到 LCEL,而不需要一次性重写整个应用。
11.9 本章源码巡礼的七个设计要点
整本章拆开的源码细节、可以归纳为七个贯穿 Chain 体系的设计要点:
① inspect.signature 做运行时签名嗅探——向后兼容老 Chain 子类(§11.1.2.5)。
② _validate_inputs 的单字符串歧义消解——只在 input_keys 单一时允许、否则给详细错误(§11.1.4)。
③ run_in_executor 的同步-异步桥——子类只写 _call、自动获得 _acall(§11.1.5)。
④ prompt_length 返回 None 作为"不支持"标记——Optional 返回值作为能力声明(§11.2.1.5)。
⑤ validate_chains 的构造期数据流验证——把运行时 KeyError 前移到构造 ValueError(§11.4.1)。
⑥ silent_errors 的三级降级——默认严格、显式宽松(§11.5.1)。
⑦ 三层 API 弃用防护——@deprecated + @model_validator + removal 版本标记(§11.7.5)。
这七个点都围绕一个核心——让框架在保持兼容的同时不失严谨。Chain 基类作为承载"新旧两代 API 过渡"的桥梁、把每一种可能的错误用法都做了友好的引导——单字符串歧义、字段重命名、签名演进、输出类型、依赖缺失——每一个都有对应的捕获和提示。
这是基础设施代码与应用代码的区别:基础设施要预判被误用的方式、并在每一处放上指向正确用法的引导。Chain 基类 200 多行代码里、真正执行业务逻辑的不到 50 行、剩下都是契约验证和迁移保护——这是支撑数十万应用的必要代价。
继承链 Serializable → RunnableSerializable → Chain → LLMChain/StuffDocumentsChain/RetrievalQA 每一层都在为下一层铺路、每一层的设计决策都服务于让传统 Chain 融入 LCEL 体系。Chain 内部的 run_manager 参数指向下一章——Callback 与可观测性,那里是整个执行模型的神经系统。