LangChain 设计与实现
第10章 向量存储与检索器
第10章 向量存储与检索器
在 RAG 系统的架构中,向量存储(VectorStore)和检索器(Retriever)是连接知识库与语言模型的关键桥梁。前者负责将文档嵌入为向量并高效存储,后者负责根据查询从知识库中检索最相关的文档。LangChain 为这两个层次分别构建了精心设计的抽象,使得开发者可以在不改变应用代码的前提下,轻松切换底层的向量数据库实现。
本章将从 Embeddings 接口开始,逐步展开 VectorStore 的核心抽象和 InMemoryVectorStore 的参考实现,深入分析 BaseRetriever 的 Runnable 集成设计,以及 VectorStoreRetriever 的适配器模式。最后,我们将探讨多种高级检索策略的实现思路。
本章要点
- 理解
Embeddings接口的embed_documents/embed_query双方法设计 - 掌握
VectorStore抽象的核心方法族:similarity_search / MMR / relevance_scores - 深入理解
InMemoryVectorStore的完整实现,包括余弦相似度和 MMR 算法 - 理解
BaseRetriever的 Runnable 协议集成与回调机制 - 掌握
VectorStoreRetriever的适配器模式与搜索策略选择 - 了解高级检索器(ParentDocumentRetriever / MultiVectorRetriever / ContextualCompressionRetriever / MergerRetriever)的设计思路
10.1 Embeddings:向量化接口
Embeddings 是向量存储系统的前置依赖 -- 它负责将文本转换为高维向量表示。
# langchain_core/embeddings/embeddings.py
class Embeddings(ABC):
@abstractmethod
def embed_documents(self, texts: list[str]) -> list[list[float]]:
"""Embed search docs."""
@abstractmethod
def embed_query(self, text: str) -> list[float]:
"""Embed query text."""
async def aembed_documents(self, texts: list[str]) -> list[list[float]]:
return await run_in_executor(None, self.embed_documents, texts)
async def aembed_query(self, text: str) -> list[float]:
return await run_in_executor(None, self.embed_query, text)
10.1.1 为什么区分 embed_documents 和 embed_query?
这个双方法设计看似冗余,实际上有深刻的技术原因。某些嵌入模型(如 Google 的 embedding-001、BGE 系列模型)会根据文本的角色使用不同的编码策略:
- 文档嵌入:优化为捕捉文档的全局语义,可能使用更长的上下文窗口
- 查询嵌入:优化为捕捉用户意图,可能添加特殊的前缀指令
大多数模型(如 OpenAI 的 text-embedding-3-small)的两个方法实现相同,但这种接口分离确保了在需要区分时有扩展空间。
flowchart LR
subgraph 文档索引阶段
A[文档列表] --> B[embed_documents]
B --> C[向量列表]
C --> D[存入 VectorStore]
end
subgraph 查询检索阶段
E[用户查询] --> F[embed_query]
F --> G[查询向量]
G --> H[VectorStore 相似搜索]
H --> I[最相关文档]
end
10.1.2 异步接口的默认实现
aembed_documents 和 aembed_query 的默认实现使用 run_in_executor 将同步方法包装到线程池中。这遵循了 LangChain 的一贯模式:提供同步的基础实现,通过线程池桥接异步接口,同时允许子类覆盖为原生异步实现。
10.2 VectorStore:向量存储抽象
VectorStore 是 LangChain 中体量最大的抽象类之一,它定义了向量存储系统的完整接口。
# langchain_core/vectorstores/base.py
class VectorStore(ABC):
# 写入接口
def add_texts(self, texts, metadatas=None, *, ids=None, **kwargs) -> list[str]: ...
def add_documents(self, documents, **kwargs) -> list[str]: ...
def delete(self, ids=None, **kwargs) -> bool | None: ...
def get_by_ids(self, ids: Sequence[str], /) -> list[Document]: ...
# 搜索接口
@abstractmethod
def similarity_search(self, query, k=4, **kwargs) -> list[Document]: ...
def similarity_search_with_score(self, *args, **kwargs) -> list[tuple[Document, float]]: ...
def similarity_search_with_relevance_scores(self, query, k=4, **kwargs) -> list[tuple[Document, float]]: ...
def max_marginal_relevance_search(self, query, k=4, fetch_k=20, lambda_mult=0.5, **kwargs) -> list[Document]: ...
# 构造接口
@classmethod
@abstractmethod
def from_texts(cls, texts, embedding, metadatas=None, **kwargs) -> Self: ...
def as_retriever(self, **kwargs) -> VectorStoreRetriever: ...
classDiagram
class VectorStore {
<<abstract>>
+embeddings: Embeddings
+add_texts(texts, metadatas, ids) list~str~
+add_documents(documents) list~str~
+delete(ids) bool
+get_by_ids(ids) list~Document~
+similarity_search(query, k) list~Document~*
+similarity_search_with_score() list~tuple~
+similarity_search_with_relevance_scores(query, k) list~tuple~
+max_marginal_relevance_search(query, k, fetch_k, lambda_mult) list~Document~
+search(query, search_type) list~Document~
+from_texts(texts, embedding) Self*
+from_documents(documents, embedding) Self
+as_retriever() VectorStoreRetriever
#_select_relevance_score_fn() Callable
#_euclidean_relevance_score_fn(distance) float$
#_cosine_relevance_score_fn(distance) float$
#_max_inner_product_relevance_score_fn(distance) float$
}
class InMemoryVectorStore {
+store: dict
+embedding: Embeddings
+similarity_search(query, k) list~Document~
+max_marginal_relevance_search(query, k) list~Document~
+dump(path) void
+load(path, embedding) InMemoryVectorStore$
}
VectorStore <|-- InMemoryVectorStore
10.2.1 写入接口的双路径设计
VectorStore 的写入接口存在 add_texts 和 add_documents 两个方法,它们之间有一个精巧的互相委托关系:
def add_texts(self, texts, metadatas=None, *, ids=None, **kwargs):
if type(self).add_documents != VectorStore.add_documents:
# 子类实现了 add_documents,委托给它
docs = [Document(id=id_, page_content=text, metadata=metadata_)
for text, metadata_, id_ in zip(texts, metadatas_, ids_, strict=False)]
return self.add_documents(docs, **kwargs)
raise NotImplementedError(...)
def add_documents(self, documents, **kwargs):
if type(self).add_texts != VectorStore.add_texts:
# 子类实现了 add_texts,委托给它
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
return self.add_texts(texts, metadatas, **kwargs)
raise NotImplementedError(...)
这种设计允许子类只实现其中一个方法,另一个会自动适配。通过 type(self).method != VectorStore.method 检测子类是否覆盖了特定方法,避免了无限递归。
10.2.2 搜索方法族
VectorStore 提供了三种搜索策略:
| 方法 | 返回类型 | 特点 |
|---|---|---|
similarity_search |
list[Document] |
最基础的相似度搜索 |
similarity_search_with_relevance_scores |
list[tuple[Document, float]] |
返回归一化到 [0,1] 的相关性分数 |
max_marginal_relevance_search |
list[Document] |
MMR 算法,平衡相关性和多样性 |
相关性分数归一化
不同的向量数据库返回不同度量的原始分数(余弦距离、欧氏距离、内积等)。LangChain 通过 _select_relevance_score_fn 模板方法,让子类选择合适的归一化函数:
@staticmethod
def _euclidean_relevance_score_fn(distance: float) -> float:
return 1.0 - distance / math.sqrt(2)
@staticmethod
def _cosine_relevance_score_fn(distance: float) -> float:
return 1.0 - distance
@staticmethod
def _max_inner_product_relevance_score_fn(distance: float) -> float:
if distance > 0:
return 1.0 - distance
return -1.0 * distance
所有归一化函数都将分数映射到 [0, 1] 区间,其中 1 表示最相似。这使得 similarity_search_with_relevance_scores 的返回值在不同后端之间具有可比性。
分数阈值过滤
similarity_search_with_relevance_scores 支持 score_threshold 参数:
def similarity_search_with_relevance_scores(self, query, k=4, **kwargs):
score_threshold = kwargs.pop("score_threshold", None)
docs_and_similarities = self._similarity_search_with_relevance_scores(query, k=k, **kwargs)
if score_threshold is not None:
docs_and_similarities = [
(doc, similarity)
for doc, similarity in docs_and_similarities
if similarity >= score_threshold
]
return docs_and_similarities
这个过滤机制使得检索系统可以动态控制结果质量 -- 当没有足够相关的文档时,返回空列表优于返回不相关的结果。
10.2.3 as_retriever:从存储到检索的桥梁
def as_retriever(self, **kwargs) -> VectorStoreRetriever:
tags = kwargs.pop("tags", None) or [*self._get_retriever_tags()]
return VectorStoreRetriever(vectorstore=self, tags=tags, **kwargs)
这个方法是适配器模式的经典应用 -- 它将 VectorStore 包装为符合 BaseRetriever 接口的对象,使其能够直接用于 LCEL 链。_get_retriever_tags 自动生成包含 VectorStore 类名和 Embeddings 类名的标签,用于追踪和调试。
10.3 InMemoryVectorStore:参考实现
InMemoryVectorStore 是 LangChain 内置的基于内存的向量存储实现。虽然不适用于生产环境的大规模数据,但它作为参考实现完整展示了 VectorStore 接口的正确用法。
10.3.1 数据存储结构
class InMemoryVectorStore(VectorStore):
def __init__(self, embedding: Embeddings) -> None:
self.store: dict[str, dict[str, Any]] = {}
self.embedding = embedding
每条记录存储为一个字典:
self.store[doc_id] = {
"id": doc_id,
"vector": vector, # list[float] - 嵌入向量
"text": doc.page_content,
"metadata": doc.metadata,
}
10.3.2 余弦相似度搜索
核心搜索方法使用 numpy 实现的余弦相似度:
def _similarity_search_with_score_by_vector(self, embedding, k=4, filter=None):
docs = list(self.store.values())
if filter is not None:
docs = [doc for doc in docs
if filter(Document(id=doc["id"], page_content=doc["text"], metadata=doc["metadata"]))]
if not docs:
return []
similarity = cosine_similarity([embedding], [doc["vector"] for doc in docs])[0]
top_k_idx = similarity.argsort()[::-1][:k]
return [
(Document(id=doc_dict["id"], page_content=doc_dict["text"], metadata=doc_dict["metadata"]),
float(similarity[idx].item()),
doc_dict["vector"])
for idx in top_k_idx
if (doc_dict := docs[idx])
]
这段代码的执行流程:
- 如果提供了
filter函数,先过滤文档 - 计算查询向量与所有文档向量的余弦相似度
- 按相似度降序排序,取 top-k
filter 参数接受一个 Callable[[Document], bool],允许在向量搜索的基础上进行元数据过滤。这比在搜索后再过滤更高效,因为它减少了需要排序的候选集大小。
flowchart TD
A[查询向量] --> B[计算与所有文档的余弦相似度]
B --> C{有 filter?}
C -->|是| D[先按元数据过滤]
D --> E[对过滤后的文档计算相似度]
C -->|否| E[对所有文档计算相似度]
E --> F[argsort 降序排序]
F --> G[取 top-k 个索引]
G --> H[构造 Document + score 返回]
10.3.3 余弦相似度的实现
# langchain_core/vectorstores/utils.py
def _cosine_similarity(x: Matrix, y: Matrix) -> np.ndarray:
x = np.array(x)
y = np.array(y)
if not _HAS_SIMSIMD:
x_norm = np.linalg.norm(x, axis=1)
y_norm = np.linalg.norm(y, axis=1)
with np.errstate(divide="ignore", invalid="ignore"):
similarity = np.dot(x, y.T) / np.outer(x_norm, y_norm)
similarity[np.isnan(similarity) | np.isinf(similarity)] = 0.0
return similarity
# 使用 simsimd 加速
x = np.array(x, dtype=np.float32)
y = np.array(y, dtype=np.float32)
return 1 - np.array(simd.cdist(x, y, metric="cosine"))
实现中有几个值得注意的细节:
- simsimd 加速:优先使用
simsimd库,它是用 C 实现的 SIMD 优化距离计算库,比纯 numpy 快数倍 - NaN/Inf 处理:对于零向量等边界情况,相似度计算会产生 NaN/Inf,统一替换为 0.0
- 安全检查:提前检测输入中的 NaN 和 Inf,发出警告
10.3.4 MMR 算法
最大边际相关性(Maximal Marginal Relevance)算法在保证相关性的同时增加结果的多样性:
def maximal_marginal_relevance(query_embedding, embedding_list, lambda_mult=0.5, k=4):
if min(k, len(embedding_list)) <= 0:
return []
similarity_to_query = _cosine_similarity(query_embedding, embedding_list)[0]
most_similar = int(np.argmax(similarity_to_query))
idxs = [most_similar]
selected = np.array([embedding_list[most_similar]])
while len(idxs) < min(k, len(embedding_list)):
best_score = -np.inf
idx_to_add = -1
similarity_to_selected = _cosine_similarity(embedding_list, selected)
for i, query_score in enumerate(similarity_to_query):
if i in idxs:
continue
redundant_score = max(similarity_to_selected[i])
equation_score = lambda_mult * query_score - (1 - lambda_mult) * redundant_score
if equation_score > best_score:
best_score = equation_score
idx_to_add = i
idxs.append(idx_to_add)
selected = np.append(selected, [embedding_list[idx_to_add]], axis=0)
return idxs
MMR 公式:score = lambda * sim(doc, query) - (1 - lambda) * max(sim(doc, selected_docs))
flowchart TD
A[计算所有文档与查询的相似度] --> B[选择最相似的文档加入结果集]
B --> C{结果集达到 k?}
C -->|否| D[对每个未选文档计算 MMR 分数]
D --> E["MMR = lambda * 查询相似度 - (1-lambda) * 与已选最大相似度"]
E --> F[选择 MMR 分数最高的文档]
F --> G[加入结果集]
G --> C
C -->|是| H[返回结果索引]
lambda_mult 参数控制相关性和多样性的平衡:
lambda_mult = 1.0:等同于普通相似度搜索lambda_mult = 0.0:最大化多样性lambda_mult = 0.5:默认值,相关性和多样性各占一半
10.3.5 持久化支持
InMemoryVectorStore 提供了简单的持久化方法:
def dump(self, path: str) -> None:
path_ = Path(path)
path_.parent.mkdir(exist_ok=True, parents=True)
with path_.open("w", encoding="utf-8") as f:
json.dump(dumpd(self.store), f, indent=2)
@classmethod
def load(cls, path, embedding, **kwargs):
with Path(path).open("r", encoding="utf-8") as f:
store = load(json.load(f), allowed_objects=[Document])
vectorstore = cls(embedding=embedding, **kwargs)
vectorstore.store = store
return vectorstore
使用 LangChain 的序列化系统(dumpd/load)确保了 Document 对象的正确序列化和反序列化。
10.4 BaseRetriever:检索器的 Runnable 抽象
BaseRetriever 是 LangChain 检索系统的核心抽象。它继承自 RunnableSerializable[str, list[Document]],这意味着检索器接收字符串查询,返回文档列表,并且完全兼容 LCEL。
# langchain_core/retrievers.py
RetrieverInput = str
RetrieverOutput = list[Document]
class BaseRetriever(RunnableSerializable[RetrieverInput, RetrieverOutput], ABC):
tags: list[str] | None = None
metadata: dict[str, Any] | None = None
@abstractmethod
def _get_relevant_documents(
self, query: str, *, run_manager: CallbackManagerForRetrieverRun
) -> list[Document]:
"""Get documents relevant to a query."""
async def _aget_relevant_documents(
self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun
) -> list[Document]:
return await run_in_executor(
None, self._get_relevant_documents, query, run_manager=run_manager.get_sync(),
)
10.4.1 invoke 方法的完整回调集成
BaseRetriever.invoke 的实现展示了 LangChain 回调系统的完整工作流:
def invoke(self, input: str, config: RunnableConfig | None = None, **kwargs) -> list[Document]:
config = ensure_config(config)
inheritable_metadata = {
**(config.get("metadata") or {}),
**self._get_ls_params(**kwargs),
}
callback_manager = CallbackManager.configure(
config.get("callbacks"), None,
verbose=kwargs.get("verbose", False),
inheritable_tags=config.get("tags"),
local_tags=self.tags,
inheritable_metadata=inheritable_metadata,
local_metadata=self.metadata,
)
run_manager = callback_manager.on_retriever_start(
None, input, name=config.get("run_name") or self.get_name(), ...
)
try:
result = self._get_relevant_documents(input, run_manager=run_manager, ...)
except Exception as e:
run_manager.on_retriever_error(e)
raise
else:
run_manager.on_retriever_end(result)
return result
sequenceDiagram
participant C as Caller
participant R as BaseRetriever
participant CM as CallbackManager
participant I as Implementation
C->>R: invoke(query, config)
R->>CM: configure(callbacks, tags, metadata)
CM->>CM: on_retriever_start(query)
R->>I: _get_relevant_documents(query, run_manager)
alt 成功
I-->>R: documents
R->>CM: on_retriever_end(documents)
R-->>C: documents
else 异常
I-->>R: Exception
R->>CM: on_retriever_error(exception)
R-->>C: raise Exception
end
这段代码有几个关键设计点:
- LangSmith 集成:
_get_ls_params返回LangSmithRetrieverParams,包含检索器名称、向量存储提供商、嵌入模型等信息,用于追踪 - 标签继承:通过
inheritable_tags和local_tags的区分,支持标签在链式调用中的传播 - 错误追踪:无论成功还是失败,都会通知回调管理器,确保可观测性
10.4.2 init_subclass 的兼容性处理
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
parameters = signature(cls._get_relevant_documents).parameters
cls._new_arg_supported = parameters.get("run_manager") is not None
# 如果子类未实现 run_manager 参数,自动适配
if not cls._new_arg_supported and cls._aget_relevant_documents == BaseRetriever._aget_relevant_documents:
async def _aget_relevant_documents(self, query):
return await run_in_executor(None, self._get_relevant_documents, query)
cls._aget_relevant_documents = _aget_relevant_documents
这段代码处理了一个向后兼容问题:早期版本的 _get_relevant_documents 不接受 run_manager 参数。__init_subclass__ 在子类定义时自动检测其签名,如果发现旧式签名(没有 run_manager),就会自动生成一个适配版本的 _aget_relevant_documents。
10.4.3 _expects_other_args 与 LangSmith 名字归一化:另外两处容易漏看的子类检测
__init_subclass__ 还多做了一件 chapter 里没提的事——检测签名里有没有第三种参数(retrievers.py:162-165):
# retrievers.py:162
# If a V1 retriever broke the interface and expects additional arguments
cls._expects_other_args = (
len(set(parameters.keys()) - {"self", "query", "run_manager"}) > 0
)
集合减法 parameters - {self, query, run_manager}、非空即 _expects_other_args = True。这个 flag 在 invoke 里有关键作用(retrievers.py:220):
kwargs_ = kwargs if self._expects_other_args else {}
"用户通过 invoke 传的 kwargs 要不要转发给 _get_relevant_documents"——取决于它自己的签名接不接。这样设计的原因:如果一个 V1 时代的 retriever 只写了 def _get_relevant_documents(self, query, run_manager),你 invoke(query, config, foo="bar") 的 foo 不能强塞——会报 unexpected keyword argument。_expects_other_args=False 时把 kwargs 丢掉是对老 retriever 的保护。反之新式 retriever 声明 def _get_relevant_documents(self, query, run_manager, **kw) 或带具体参数的、_expects_other_args=True、kwargs 透传。这是 Python 动态类型下最优雅的"新旧接口无缝共存"手法——不需要版本号、不需要 decorator、靠 inspect.signature 在子类定义时做一次静态分析就完成。
另一处隐蔽细节——_get_ls_params 里的名字归一化(retrievers.py:169-174):
default_retriever_name = self.get_name()
if default_retriever_name.startswith("Retriever"):
default_retriever_name = default_retriever_name[9:]
elif default_retriever_name.endswith("Retriever"):
default_retriever_name = default_retriever_name[:-9]
default_retriever_name = default_retriever_name.lower()
如果你的 class 叫 CustomRetriever、LangSmith trace 里显示的名字是 "custom"(去 "Retriever" 后缀 + 小写);叫 RetrieverMCP 显示成 "mcp"(去前缀)。这个处理让 LangSmith dashboard 里几十种 retriever 的名字统一成类似 tag 的短名——容易聚合、容易比较延迟分布。细节到数毫秒的可视化差异都被它贡献。
这两处和 __init_subclass__ 的 _new_arg_supported 检测合起来——**都属于"在子类定义时做一次轻量静态分析、运行时零开销"**的范式。Python 不像 Rust 那样能用类型系统在编译期表达接口演化,但 __init_subclass__ + inspect.signature 这组工具在 runtime 早期(类定义那一瞬间)做等价的事——让新老 retriever 在同一个进程里共存、用户代码完全感知不到差异。这是大型 Python 生态库维护数年 API 稳定性的关键手法。
10.5 VectorStoreRetriever:适配器模式
VectorStoreRetriever 将 VectorStore 适配为 BaseRetriever 接口:
class VectorStoreRetriever(BaseRetriever):
vectorstore: VectorStore
search_type: str = "similarity"
search_kwargs: dict = Field(default_factory=dict)
allowed_search_types: ClassVar[Collection[str]] = (
"similarity", "similarity_score_threshold", "mmr",
)
@model_validator(mode="before")
@classmethod
def validate_search_type(cls, values):
search_type = values.get("search_type", "similarity")
if search_type not in cls.allowed_search_types:
raise ValueError(...)
if search_type == "similarity_score_threshold":
score_threshold = values.get("search_kwargs", {}).get("score_threshold")
if not isinstance(score_threshold, float):
raise ValueError("score_threshold must be a float")
return values
10.5.1 搜索策略路由
def _get_relevant_documents(self, query, *, run_manager, **kwargs):
kwargs_ = self.search_kwargs | kwargs
if self.search_type == "similarity":
docs = self.vectorstore.similarity_search(query, **kwargs_)
elif self.search_type == "similarity_score_threshold":
docs_and_similarities = self.vectorstore.similarity_search_with_relevance_scores(query, **kwargs_)
docs = [doc for doc, _ in docs_and_similarities]
elif self.search_type == "mmr":
docs = self.vectorstore.max_marginal_relevance_search(query, **kwargs_)
return docs
三种搜索策略的适用场景:
flowchart TD
A[VectorStoreRetriever] --> B{search_type?}
B -->|similarity| C["similarity_search(query, k=4)"]
C --> D[返回 top-k 最相似文档]
B -->|similarity_score_threshold| E["similarity_search_with_relevance_scores(query, score_threshold=0.8)"]
E --> F[返回分数高于阈值的文档]
B -->|mmr| G["max_marginal_relevance_search(query, k=4, fetch_k=20, lambda_mult=0.5)"]
G --> H[返回兼顾相关性和多样性的文档]
10.5.2 使用示例
# 基本相似度搜索
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
# MMR 搜索
retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.5}
)
# 分数阈值过滤
retriever = vectorstore.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"score_threshold": 0.8}
)
# 在 LCEL 链中使用
chain = retriever | format_docs | prompt | model | StrOutputParser()
10.6 高级检索策略
LangChain 在基础检索器之上构建了多种高级检索策略,每种策略都针对特定的检索质量问题提供解决方案。
10.6.1 MultiVectorRetriever:多向量检索
MultiVectorRetriever 的核心思想是:用于搜索的向量和最终返回的文档可以是不同的。
class MultiVectorRetriever(BaseRetriever):
vectorstore: VectorStore # 存储子文档向量
docstore: BaseStore[str, Document] # 存储父文档
id_key: str = "doc_id" # 子文档 metadata 中关联父文档的键
search_type: SearchType = SearchType.similarity
def _get_relevant_documents(self, query, *, run_manager):
if self.search_type == SearchType.mmr:
sub_docs = self.vectorstore.max_marginal_relevance_search(query, **self.search_kwargs)
else:
sub_docs = self.vectorstore.similarity_search(query, **self.search_kwargs)
# 从子文档的 metadata 中提取父文档 ID
ids = []
for d in sub_docs:
if self.id_key in d.metadata and d.metadata[self.id_key] not in ids:
ids.append(d.metadata[self.id_key])
# 从 docstore 中获取父文档
docs = self.docstore.mget(ids)
return [d for d in docs if d is not None]
flowchart TD
subgraph 索引阶段
A[原始文档] --> B[分割为子文档]
B --> C[为每个子文档生成嵌入]
C --> D[存入 VectorStore]
A --> E[存入 DocStore]
B --> F["子文档 metadata 记录 doc_id"]
end
subgraph 检索阶段
G[查询] --> H[在 VectorStore 中搜索子文档]
H --> I[提取唯一的 doc_id 集合]
I --> J[从 DocStore 获取父文档]
J --> K[返回完整父文档]
end
10.6.2 ParentDocumentRetriever:父文档检索
ParentDocumentRetriever 继承自 MultiVectorRetriever,自动化了"小块搜索、大块返回"的过程:
class ParentDocumentRetriever(MultiVectorRetriever):
child_splitter: TextSplitter # 创建子文档的分割器
parent_splitter: TextSplitter | None = None # 创建父文档的分割器
child_metadata_fields: Sequence[str] | None = None
def add_documents(self, documents, ids=None, add_to_docstore=True, **kwargs):
# 可选:先用 parent_splitter 分割为中等大小的父文档
if self.parent_splitter is not None:
documents = self.parent_splitter.split_documents(documents)
# 为每个父文档生成唯一 ID
doc_ids = [str(uuid.uuid4()) for _ in documents]
docs = []
full_docs = []
for i, doc in enumerate(documents):
_id = doc_ids[i]
# 用 child_splitter 进一步分割为子文档
sub_docs = self.child_splitter.split_documents([doc])
for _doc in sub_docs:
_doc.metadata[self.id_key] = _id # 关联父文档 ID
docs.extend(sub_docs)
full_docs.append((_id, doc))
# 子文档存入向量库,父文档存入文档库
self.vectorstore.add_documents(docs, **kwargs)
if add_to_docstore:
self.docstore.mset(full_docs)
这种设计解决了 RAG 中的经典矛盾:
- 小块嵌入更精确地捕捉语义,提高检索准确性
- 大块(父文档)提供更完整的上下文,提高生成质量
10.6.3 ContextualCompressionRetriever:压缩检索
ContextualCompressionRetriever 在基础检索器的结果上应用后处理压缩:
class ContextualCompressionRetriever(BaseRetriever):
base_compressor: BaseDocumentCompressor
base_retriever: RetrieverLike
def _get_relevant_documents(self, query, *, run_manager, **kwargs):
docs = self.base_retriever.invoke(
query, config={"callbacks": run_manager.get_child()}, **kwargs,
)
if docs:
compressed_docs = self.base_compressor.compress_documents(
docs, query, callbacks=run_manager.get_child(),
)
return list(compressed_docs)
return []
这是装饰器模式的典型应用。base_compressor 可以是:
- LLM 摘要压缩器:用 LLM 提取文档中与查询最相关的段落
- 重排序器(Reranker):使用交叉编码器模型对结果重新排序
- 冗余过滤器:去除语义重复的文档
10.6.4 MergerRetriever:融合检索
MergerRetriever 将多个检索器的结果合并:
class MergerRetriever(BaseRetriever):
retrievers: list[BaseRetriever]
def merge_documents(self, query, run_manager):
retriever_docs = [
retriever.invoke(query, config={"callbacks": run_manager.get_child(f"retriever_{i + 1}")})
for i, retriever in enumerate(self.retrievers)
]
# 交叉合并:轮流从每个检索器取文档
merged_documents = []
max_docs = max(map(len, retriever_docs), default=0)
for i in range(max_docs):
for _retriever, doc in zip(self.retrievers, retriever_docs, strict=False):
if i < len(doc):
merged_documents.append(doc[i])
return merged_documents
flowchart TD
Q[查询] --> R1[检索器 1: 稠密向量]
Q --> R2[检索器 2: 稀疏 BM25]
Q --> R3[检索器 3: 关键词]
R1 --> M[MergerRetriever]
R2 --> M
R3 --> M
M --> D["交叉合并排序"]
D --> F["融合结果"]
交叉合并策略(round-robin)确保了每个检索器的高优先级结果都能靠前出现。异步版本使用 asyncio.gather 并行调用所有检索器,显著减少延迟。
10.7 LangSmith 检索追踪
BaseRetriever 集成了 LangSmith 追踪参数:
class LangSmithRetrieverParams(TypedDict, total=False):
ls_retriever_name: str
ls_vector_store_provider: str | None
ls_embedding_provider: str | None
ls_embedding_model: str | None
VectorStoreRetriever 覆盖了 _get_ls_params 方法,自动填充这些追踪参数:
def _get_ls_params(self, **kwargs):
ls_params = super()._get_ls_params(**kwargs)
ls_params["ls_vector_store_provider"] = self.vectorstore.__class__.__name__
if self.vectorstore.embeddings:
ls_params["ls_embedding_provider"] = self.vectorstore.embeddings.__class__.__name__
return ls_params
这使得在 LangSmith 中可以按向量存储提供商和嵌入模型进行过滤和分析检索性能。
10.8 设计决策分析
VectorStore 为什么不是 Runnable?
与 BaseRetriever 不同,VectorStore 不继承 Runnable。这是因为 VectorStore 的操作语义更复杂 -- 它不仅仅是"输入查询、输出文档",还涉及写入、删除、ID 查询等操作。as_retriever() 方法将 VectorStore 的搜索能力投射为 Runnable 接口,保持了 VectorStore 自身接口的完整性。
为什么 similarity_search 是唯一的抽象方法?
在 VectorStore 的众多搜索方法中,只有 similarity_search 和 from_texts 是抽象方法。其他方法(MMR、relevance_scores 等)都有基于 similarity_search_with_score 的默认实现。这降低了实现一个新 VectorStore 后端的门槛 -- 只需实现最基本的相似度搜索,其他搜索策略就能自动可用。
MMR 的 fetch_k 参数
MMR 搜索有一个 fetch_k 参数(默认 20),它决定了先获取多少候选文档,再从中通过 MMR 算法选取 k 个。fetch_k > k 确保了 MMR 有足够的候选集来优化多样性。fetch_k 过小会限制多样性的空间,过大会增加计算成本。
BaseRetriever 的 _new_arg_supported 机制
这是 LangChain 处理 API 演进的一个典型模式。当框架需要为已有的抽象方法添加新参数时,不能简单地修改签名(会破坏所有现有实现)。通过 __init_subclass__ 动态检测子类签名并自动适配,既保持了新 API 的功能,又不破坏旧代码。
10.9 小结
LangChain 的向量存储与检索器系统构建了从嵌入到检索的完整链路。Embeddings 接口通过 embed_documents/embed_query 的分离,为不同角色的文本嵌入留下了优化空间。VectorStore 抽象通过精心设计的方法族,支持从简单相似度搜索到 MMR 多样性搜索的全方位检索能力。InMemoryVectorStore 作为参考实现,完整展示了余弦相似度计算、MMR 算法、持久化等核心功能。
BaseRetriever 作为 Runnable 的子类,使得检索器天然融入 LCEL 链。其 invoke 方法中精心编排的回调管理、LangSmith 追踪参数、以及 __init_subclass__ 的兼容性处理,体现了 LangChain 在可观测性和向后兼容方面的深思熟虑。VectorStoreRetriever 通过适配器模式优雅地连接了存储层和检索层。
高级检索策略 -- MultiVectorRetriever 的"小块搜索、大块返回"、ParentDocumentRetriever 的自动化父子文档管理、ContextualCompressionRetriever 的后处理压缩、MergerRetriever 的多源融合 -- 展示了 LangChain 检索系统的丰富生态。这些组件通过统一的 BaseRetriever 接口可以自由组合,构建出适应各种复杂检索需求的解决方案。
检索质量是 RAG 系统的生命线。理解这些抽象的设计意图和实现细节,是构建高质量 RAG 应用的基础。
10.10 源码实证:retrievers.py 328 行 + vectorstores/base.py 1111 行
本章前面讲了抽象——真实代码就这么多:
| 文件 | 行 | 内容 |
|---|---|---|
libs/core/langchain_core/retrievers.py |
328 | BaseRetriever + 两个内置辅助 |
libs/core/langchain_core/vectorstores/base.py |
1111 | VectorStore + VectorStoreRetriever |
libs/core/langchain_core/vectorstores/in_memory.py |
546 | 参考实现 InMemoryVectorStore |
libs/core/langchain_core/vectorstores/utils.py |
157 | MMR 算法 + 相似度 helpers |
总计 2195 行——撑起整个 LangChain 的检索抽象。
10.11 __init_subclass__ 的**"向后兼容"**黑魔法
§10.8 提了 _new_arg_supported 机制——源码 retrievers.py:146-160 的实现值得逐行拆:
@override
def __init_subclass__(cls, **kwargs: Any) -> None:
super().__init_subclass__(**kwargs)
parameters = signature(cls._get_relevant_documents).parameters
cls._new_arg_supported = parameters.get("run_manager") is not None
if (
not cls._new_arg_supported
and cls._aget_relevant_documents == BaseRetriever._aget_relevant_documents
):
async def _aget_relevant_documents(self, query):
return await run_in_executor(None, self._get_relevant_documents, query)
cls._aget_relevant_documents = _aget_relevant_documents
四段魔法——
1——__init_subclass__ 是 Python 3.6+ 的 hook——子类"被定义时" 触发——不是实例化时——所以是类属性初始化。
2——signature(cls._get_relevant_documents).parameters 用 inspect 模块反射子类方法签名——判断是否有 run_manager 参数——新 API 的标记。
3——cls._new_arg_supported 记在类上——运行 invoke 时用这个 flag 选择调用路径。
4——没新 API 的老子类——自动 monkey-patch 一个 async 版本——用 run_in_executor 在线程池里跑同步方法——老代码不改也能跑 async。
这 14 行代码——承载了 LangChain 3+ 年的 API 演进——旧代码零迁移。
10.12 VectorStoreRetriever.allowed_search_types 的三档
vectorstores/base.py:976-980——三种 search_type:
similarity——标准余弦相似度 + top-ksimilarity_score_threshold——只返回 score ≥ threshold 的mmr——Maximum Marginal Relevance——兼顾相关性 + 多样性
对应的 validate_search_type 函数(base.py:988-1008)——enforce score_threshold 必须是 0-1 的 float——**防止 **"忘配 threshold" 的典型踩坑。
10.13 MMR 算法的**"lambda"**参数——多样性 vs 相关性
MMR(Carbonell & Goldstein, 1998)的公式——
MMR = argmax [λ * Sim(q, d) - (1-λ) * max_j Sim(d, d_j)]
λ 取值——
λ = 1.0:纯相关性(等同于标准 similarity_search)λ = 0.5:平衡(LangChain 默认)λ = 0.0:纯多样性
典型取值参考——
- News summarization——
λ ≈ 0.7(偏相关性、避免信息重复) - 代码 snippet 检索——
λ ≈ 0.3(多样性优先、覆盖不同语言/库) - 问答召回——
λ ≈ 0.5(LangChain 默认值)
注意 fetch_k 必须 > k——否则 MMR 算法没候选集——fetch_k=20, k=5 是常见甜点。
10.14 InMemoryVectorStore 546 行:最小参考实现
性能极限——N < 10,000 文档 + d < 2048 维——秒级响应——再多就得上 FAISS / Chroma / Pinecone。
何时值得用 InMemory——
- 原型验证——不想装 vector db
- 小数据集 RAG——个人知识库 < 1 万条
- 单元测试——mock 掉真实 vector db
- CI 集成测试——快速、无外部依赖
生产规模——必须换真实 vector db——但接口一致、换实现只改一行构造代码。
10.15 from_texts 的工厂方法——为什么它是 @classmethod
vectorstores/base.py:848-902 的 from_texts——abstract class method:
@classmethod
@abstractmethod
def from_texts(cls, texts, embedding, metadatas=None, ...) -> VST: ...
为什么要 classmethod 工厂——
- VectorStore 的构造过程复杂——embedding 模型初始化 + 底层客户端连接 + schema 创建
- 不同 VectorStore 的
__init__参数完全不同(Pinecone 要 API key、FAISS 要 index_type、Chroma 要 persist_directory) from_texts把复杂构造标准化——用户写ChromaStore.from_texts([...])就建好 store 并插入了初始文档
10.16 ContextualCompressionRetriever 的"后处理"流水线
§10.1 提到 ContextualCompressionRetriever——它是一个"retriever + compressor" 组合:
流水线——
- base_retriever 召回 k=20 文档(原文可能很长)
- compressor 逐个 doc 压缩——去掉无关段落——只留精华
- 最终塞给 LLM 的是 20 个 doc 但每个被压缩过——总 token 可能从 50K 降到 8K
三类 compressor——
- LLMChainExtractor——LLM 抽取相关句
- LLMChainFilter——LLM 判断"这个 doc 整体相关吗"(过滤级)
- EmbeddingsFilter——向量相似度二次过滤(不用 LLM,便宜)
组合——先 EmbeddingsFilter 粗过滤、再 LLMChainExtractor 精抽取——成本和质量平衡。
这一模式和本书 MCP 第 20 章§20.2.3 讨论的"工具结果截断"同源。
10.17 MergerRetriever:多源检索的 Reciprocal Rank Fusion
MergerRetriever 把多个 retriever 的结果合并成一个:
合并策略——Reciprocal Rank Fusion (RRF):
score(d) = sum over i: 1 / (k + rank_i(d))
k通常 = 60- 每个 retriever 给 d 一个 rank(第几名)
- RRF 的优点——不需要分数标准化——每个 retriever 的分数尺度可以完全不同
这就是"hybrid retrieval"的核心——dense vector + sparse keyword (BM25) 两路召回、RRF 合并。它常常优于单一 retriever,但提升幅度取决于语料、查询分布、评测集和合并参数,不能脱离具体 benchmark 给固定百分比。
10.18 ParentDocumentRetriever:大 index 小、大 retrieve 大
经典场景——你的文档是 100 页 PDF、chunk 成 500 token/chunk 存向量库——检索后只返回 500 token 太短、LLM 没上下文——但如果存 5000 token/chunk、embedding 又"太糊"、精度下降。
ParentDocumentRetriever 的解法——
- "子 chunk" 入 vector store——小、精(embedding 准)
- "父 chunk" 入 docstore(key-value 存储)——大、含上下文
- 检索时——先用子 chunk 找匹配、再用它的 parent_id 把父 chunk 拉回来——精度 + 上下文兼得
这是 RAG 里最有效的"chunk size 权衡"——任何生产级 RAG 都值得试这个 retriever。
10.19 Async 支持的陷阱:同步阻塞破坏 event loop
retrievers.py 里的 _aget_relevant_documents——如果用户懒得实现、默认会调 run_in_executor 把同步版塞进线程池。
优点——老同步代码自动 async 可用
陷阱——如果 _get_relevant_documents 里有 CPU-bound 操作(比如跑 embedding 模型)——会占用 executor 的线程池——高并发时 executor 耗尽、新请求排队
最佳实践——
- I/O-bound retriever(访问 DB / API)——必须实现
_aget_relevant_documents、用 async I/O 原语(aiohttp / asyncpg) - CPU-bound retriever(本地 embedding)——可以用 executor fallback、但 pool size 要调大或用 multiprocessing pool
10.20 本章的四层升华
Retriever 抽象的分层:
- 最底层:
Embeddings(embed_query / embed_documents)——把文本变向量 - 第二层:
VectorStore(similarity_search / MMR)——向量相似度检索 - 第三层:
BaseRetriever(Runnable)——把 VectorStore 的能力通过 Runnable 暴露 - 第四层:
高级 Retriever(MultiVector / Parent / Compression / Merger)——组合多个 Retriever 构建复杂检索流程
四层关系——底层换 provider(OpenAI → Cohere embedding)不影响上层、换 vector store(FAISS → Pinecone)不影响 Retriever 接口、换 Retriever 不影响 chain 的下游消费。
**这就是"分层抽象"的工程红利——每层可替换、总体不崩。
本书第 2 章讲的 Runnable + 本章讲的 Retriever——在第四层自然汇合——Retriever 是 Runnable、可以retriever | llm | parser 拼出完整 RAG chain。
10.21 实战:8 行代码搭起最小 RAG
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
store = InMemoryVectorStore(embedding=OpenAIEmbeddings())
store.add_texts(["RAG stands for ...", "LangChain is ...", "..."])
prompt = ChatPromptTemplate.from_messages([
("system", "Answer based on docs:\n{context}"),
("human", "{question}")
])
chain = {"context": store.as_retriever(), "question": RunnablePassthrough()} | prompt | ChatOpenAI() | StrOutputParser()
answer = chain.invoke("What is RAG?")
8 行代码、完整 RAG——涵盖本章讨论的所有抽象——Embeddings + VectorStore + as_retriever() + Runnable | prompt | llm | parser。
可替换性——
- 换 DB:
InMemoryVectorStore→PineconeVectorStore(一行) - 换 embedding:
OpenAIEmbeddings→HuggingFaceEmbeddings(一行) - 换 llm:
ChatOpenAI→ChatAnthropic(一行)
一套架构、多种组合——这就是 LangChain 的"乐高玩法"。
10.22 RAG 质量评估的**"四个维度"**
Retriever 效果如何测量?—— 四个维度:
维度 1:召回率 (Recall@k)——相关文档是否在 top-k 结果里——召回率低 = 信息漏掉、LLM 答不对
维度 2:准确率 (Precision@k)——top-k 里有多少真相关——准确率低 = 噪声多、LLM 被干扰
维度 3:MRR (Mean Reciprocal Rank)——第一个相关结果在第几名——MRR 高 = 相关文档排前面、LLM 用得到
维度 4:NDCG——考虑相关性等级(不只 0/1、还有 0.5、0.3 等)——更细粒度
业界实践——
- 快速迭代期——Recall@10 + MRR
- 深度调优期——NDCG@10 + 人工评估
- LLM 端到端评测——本书第 18 章讲的 Judge-based eval
RAGAS / TruLens / DeepEval 是三个常用的 RAG 评测框架——分别有不同侧重——读者选一个内化进 CI 就够。
10.24 Embedding 模型的**"三大流派"**
本章主要讲 VectorStore 和 Retriever——但 embedding 模型本身值得一节:
流派 1:OpenAI / 商用 API——
text-embedding-3-small1536 维、$0.02/M tokenstext-embedding-3-large3072 维、$0.13/M tokens- 优点:质量高、无需自建
- 缺点:按量付费、数据出境
流派 2:开源 sentence-transformers——
BAAI/bge-large-en-v1.51024 维——英文 SOTA 之一BAAI/bge-m31024 维——多语言 + dense/sparse 混合- 优点:本地跑、免费、可 fine-tune
- 缺点:GPU 推理成本、维护负担
流派 3:Cohere / Voyage AI——
cohere-embed-v3—— 支持 input_type 区分 query/document(LangChain Embeddings 接口的天然契合)voyage-3-lite—— 性价比高- 优点:质量顶尖
- 缺点:API 调用、国内访问麻烦
选型三原则——
- 质量优先 → OpenAI / Cohere
- 成本优先 → 本地 bge / e5
- 合规优先 → 本地 open-source
一个典型事故——某团队"模型升级" 时把 OpenAI 从 ada-002 换到 text-embedding-3-small——维度从 1536 变了但索引没重建——全库检索结果混乱——半天查到原因。
教训——embedding 模型的升级 = 整个向量库的重建——不可滚动升级——提前规划停机窗口。
10.27 BaseRetriever.invoke 调用栈的全貌
retrievers.py:175-240 的 invoke 方法——一行简单签名、背后十多步工作:
def invoke(self, input: str, config=None, **kwargs) -> list[Document]:
config = ensure_config(config)
callback_manager = CallbackManager.configure(
config.get("callbacks"),
inheritable_tags=config.get("tags") or [] + self.tags or [],
inheritable_metadata=config.get("metadata") or {} | self.metadata or {},
)
run_manager = callback_manager.on_retriever_start(
self._get_ls_params(**kwargs), input, run_id=config.get("run_id"),
name=config.get("run_name") or self.get_name(),
)
try:
_kwargs = {"run_manager": run_manager} if self._new_arg_supported else {}
if self._expects_other_args:
result = self._get_relevant_documents(input, **_kwargs, **kwargs)
else:
result = self._get_relevant_documents(input, **_kwargs)
except Exception as e:
run_manager.on_retriever_error(e)
raise
else:
run_manager.on_retriever_end(result)
return result
十一步——
- ensure_config——把
config=None变成空 dict - CallbackManager.configure——继承 tag / metadata
- on_retriever_start——LangSmith 事件起点、生成 run_id
- 判断
_new_arg_supported(§10.11) - 判断
_expects_other_args - 调用子类的
_get_relevant_documents - 失败 → on_retriever_error + raise
- 成功 → on_retriever_end
- 返回结果
注意——每个步骤都给 LangSmith 发事件——一次 invoke 在 trace 里能看到完整 retriever span(含输入 query、输出文档 id、耗时、错误)。
这呼应本书第 19 章§19.28 讨论的 PromQL 仪表盘——每次 retriever 调用都是一条可观测数据——RAG 产品的健康度就看这些事件。
10.28 _get_ls_params 的 LangSmith 集成
retrievers.py 和 vectorstores/base.py:1018-1036 的 _get_ls_params——给 LangSmith 追踪加"专用参数":
ls_params["ls_vector_store_provider"] = self.vectorstore.__class__.__name__
ls_params["ls_embedding_provider"] = self.vectorstore.embeddings.__class__.__name__
这两个字段在 LangSmith UI 上会单独显示——用户能按"vector store 提供商" 和 "embedding 模型" 切片分析:
- "Pinecone 的检索准确率 vs Weaviate 是多少"
- "text-embedding-3-small vs text-embedding-3-large 在我 agent 上的召回差多少"
没有这两个字段——你只能看到"一次 retriever 调用"——不知道是哪个 provider 做的。
这是 LangChain 和 LangSmith 的深度集成——框架原生支持追踪切片——不需要用户手动埋点。
10.29 VectorStore.add_texts vs add_documents
两个看起来相似的方法——实际略有语义差别:
add_texts(texts: list[str], metadatas: list[dict])——用户手动拆开 text 和 metadataadd_documents(documents: list[Document])——用户传入 Document 对象、内部自动拆
为什么两个都有——历史兼容——老 API 是 add_texts、新 API 是 add_documents——后者更 type-safe(Document 类验证过 page_content / metadata 字段)。
推荐用法——
- 新代码用
add_documents(类型安全) - 快速脚本用
add_texts(简洁) - Document 是 Runnable 链里的通用类型——用 add_documents 和 chain 输出天然契合
10.30 VectorStore 性能 benchmark 的基本功
读者想比较"Pinecone vs Chroma vs Qdrant"——哪些指标要测:
指标 1——Insert QPS——每秒能插多少文档——build 阶段关键(1M 文档要插几小时)
指标 2——Search P99 latency——检索延迟——交互 RAG 关键(用户等待 > 500ms 就感觉卡)
指标 3——Recall@k——相同 ground truth 下召回率——质量关键
指标 4——Recall@k vs ef_search 的曲线——近似搜索(HNSW)的参数调优——权衡精度 vs 速度
指标 5——Metadata filter 的性能损失——过滤条件多了变不变慢——很多 vector db 的 metadata filter 是 post-filter、不是 pre-filter、性能差异明显
指标 6——内存/磁盘占用——1M × 1536 维 float 是多少 GB——FAISS 默认 ~6GB、Pinecone 托管不需要操心
6 项测完——才能"数据驱动"选 vector db——别被营销文案骗。
10.31 vectorstores/utils.py 157 行——MMR 算法真身
vectorstores/utils.py:88-157 的 maximal_marginal_relevance 函数——MMR 算法 Python 实现:
def maximal_marginal_relevance(
query_embedding: np.ndarray,
embedding_list: list,
lambda_mult: float = 0.5,
k: int = 4,
) -> list[int]:
# Step 1: 余弦相似度计算(query vs all docs)
similarity_to_query = cosine_similarity(query_embedding, embedding_list)[0]
# Step 2: 先选 similarity 最高的
most_similar = int(np.argmax(similarity_to_query))
idxs = [most_similar]
selected = np.array([embedding_list[most_similar]])
# Step 3: 迭代选剩下的 k-1 个
while len(idxs) < min(k, len(embedding_list)):
best_score = -np.inf
idx_to_add = -1
similarity_to_selected = cosine_similarity(embedding_list, selected)
for i, query_score in enumerate(similarity_to_query):
if i in idxs: continue
redundant_score = max(similarity_to_selected[i])
equation_score = (
lambda_mult * query_score
- (1 - lambda_mult) * redundant_score
)
if equation_score > best_score:
best_score = equation_score
idx_to_add = i
idxs.append(idx_to_add)
selected = np.append(selected, [embedding_list[idx_to_add]], axis=0)
return idxs
50 行 Python——教科书级 MMR 实现——你可以直接抄到自己的项目。
时间复杂度——O(k × N)——k 个 iteration、每次遍历 N 个候选——N < 100 时毫秒级、N > 1000 时秒级。
所以 fetch_k 一般设 20-50——太大会让 MMR 本身变瓶颈。
10.32 一段额外的工程提醒
RAG 系统上生产前——务必检查三件事:
检查 1:chunk 是否合理——500-1000 token 是甜点——太小上下文不够、太大 embedding 糊——用 RecursiveCharacterTextSplitter 按语义拆。
检查 2:embedding 和检索用同一模型——用 text-embedding-3-small 建索引、检索时也必须用同一模型生成 query 向量——用错会得"接近随机" 的结果——看似检索出来、实际乱搞。
检查 3:metadata filter 能用 index——Pinecone、Qdrant 等支持 metadata index——不建索引的 filter 会全表扫描——性能退化。
这 3 项核对表过一遍——90% RAG "为什么不工作" 的问题能被解决。
10.34 Chunking 策略的**"五大流派"
RAG 上游最被忽视的环节是 chunking——5 种常见策略:
策略 1——定长 chunk(CharacterTextSplitter)——按字符数切——最快、最糙——语义边界可能被砍断
策略 2——递归 chunk(RecursiveCharacterTextSplitter)——按分隔符层级(段落 → 句子 → 词)——LangChain 默认、质量好
策略 3——语义 chunk(SemanticChunker)——用 embedding 相似度找语义边界——质量最高、成本也高
策略 4——结构 chunk——Markdown 按 header、HTML 按 tag、Code 按函数——场景专用
策略 5——层级 chunk(parent/child 双层)——见§10.18 ParentDocumentRetriever——小 chunk 入索引、大 chunk 做 retrieve
选择原则——
- 通用文档 → 递归
- 技术文档 → 结构(按 header)
- 高质量问答 → 语义
- 长文档 RAG → 层级
一个具体数字——chunk_size=500 + chunk_overlap=50——是 LangChain RecursiveCharacterTextSplitter 的"默认甜点"——值得作为起点。
10.35 Rerank——RAG 质量的**"临门一脚"
Retriever 召回 top-20——再用 reranker 重排挑 top-5 喂 LLM——这一步能让 RAG 效果"再上一个台阶"。
两类 rerank 模型——
- Cross-encoder(如
BAAI/bge-reranker-large)——(query, doc) → score直接打分——质量最高、速度较慢 - LLM-based(让 GPT-4o-mini 给每个 doc 打 1-10 分)——质量好、成本高、可定制
LangChain 集成——ContextualCompressionRetriever + CrossEncoderReranker ——一行组合。
典型效果——MRR 提升 20-40%——尤其是 long-tail query——投入产出比极高。
工程实践——生产 RAG 默认"retriever 召回 20 + reranker 取 5"——成为新标准。
10.36 查询改写——**"用户 query 不等于好 retrieval query"
用户问 "那个我上次说的 bug 怎么样了"——直接用这句 embedding 去检索——废。
查询改写——在 retrieve 前用 LLM 把 query 重写成"适合检索的形式":
- HyDE(Hypothetical Document Embeddings)——先让 LLM 生成一段"假想答案"、用这段 embedding 去检索——比原 query embedding 更准
- Query Decomposition——把复杂问题拆成多个子问题、各自检索、合并结果
- Multi-Query——LLM 生成 5 种 query 变体、并发检索、RRF 合并
LangChain 集成——MultiQueryRetriever / HyDERetriever ——几行代码。
性能代价——每次检索多一次 LLM 调用——成本 + 延迟——但精度提升通常值得(10-20% 召回增益)。
10.37 **"Document 的 metadata 是第一公民"
Document.metadata 字段常被当附属信息——实际它是生产 RAG 的"钥匙":
metadata 的生产用途——
- 过滤——
similarity_search(query, filter={"source": "internal_docs"})—— 只搜内部文档 - 权限——
filter={"user_id": current_user.id}—— 按用户隔离知识库 - 多租户——
filter={"tenant": "acme_corp"}—— 同一 vector store 服务多客户 - 时效性——
filter={"updated_after": last_week}—— 只搜最近更新 - 评分加权——metadata.priority 在合并时作为 boost factor
metadata 设计原则——
- 小——metadata 字段太大会让向量 db 慢(一般 < 1KB/doc)
- 可索引——常用 filter 字段"建索引"
- 标准化——枚举值统一(
type: "pdf" | "html" | "md")——方便过滤
一个常见错误——把整个原文 page_content 复制进 metadata——vector store 存储 × 2、费用 × 2——保持 metadata 精简。
10.38 最后一张**"RAG 系统工程 checklist"
12 项生产 RAG 上线前的自检——
- chunking 策略选对(§10.34)
- embedding 模型和检索 query 用同一个(§10.32)
- chunk_size + overlap 跑过实验(500 + 50 起步)
- 用 RecursiveCharacterTextSplitter 而不是 CharacterTextSplitter
- metadata 设计含权限/租户字段(§10.37)
- retrieve 用 MMR 或混合召回(§10.13 / §10.17)
- 加 reranker(§10.35)
- 尝试 HyDE / MultiQuery 查询改写(§10.36)
- 用 RAGAS 跑 eval(§10.22)
- LangSmith trace 开启(§10.28)
- 成本监控:按 tenant 切片 embedding + LLM 成本
- 有 fallback 策略(retriever 挂了是否有备选)
打勾 ≥ 10 项——你的 RAG 生产级——少于 7 项——上线前再调。
10.40 一段额外的**"embedding provider 成本实测"**
2026 年 4 月各家 embedding 官方定价——
| Provider | 模型 | 维度 | $/M tokens | 1M 文档 (×500 tok) 成本 |
|---|---|---|---|---|
| OpenAI | text-embedding-3-small | 1536 | $0.02 | $10 |
| OpenAI | text-embedding-3-large | 3072 | $0.13 | $65 |
| Cohere | embed-v3 | 1024 | $0.10 | $50 |
| Voyage | voyage-3-lite | 512 | $0.02 | $10 |
| HuggingFace bge-large | 1024 | 本地 | 0 + GPU 电费 | ≈ $1-5 |
三个关键观察——
- text-embedding-3-small 是"性价比之王"——$10 能嵌完 1M 文档
- text-embedding-3-large 比 small 贵 6.5 倍——但质量提升 ~10%(MTEB benchmark)——值不值得看场景
- 本地 bge 对 10M+ 规模最划算——一次 GPU 跑 12 小时完事
成本优化技巧——只嵌入 changed docs、不重嵌没变的——每次文档更新只产生小增量 embedding 调用。节省比例取决于文档变更率;如果每天只改少量文档,收益很大,如果语料经常全量重建,收益就有限。
这呼应本书第 20 章§20.15 讨论的 Meta-Router——同样的"只为必要的计算付费"理念。
10.41 自建 vector db 的**"一个 weekend 路线"
有读者问——能不能不用任何外部 vector db、自己写一个?—— 能、一个周末:
Day 1——最简
dict[str, np.ndarray]存 embeddingnumpy.dot算余弦- 就是本章§10.14 InMemoryVectorStore 的简化版 < 100 行
Day 2——加 ANN
- 引入
hnswlib库(HNSW 算法的 Python 实现) - 10M 文档秒级检索
- 100-200 行代码
结果——你有了一个"self-hosted vector search"——性能接近 Qdrant/Weaviate、免费。
代价——持久化、分片、副本、一致性——这些才是商用 vector db 的"真正价值"——自建不划算。
教训——核心算法很简单(HNSW / IVF)、产品化才是价值所在——买商用 vector db 买的是"运维省心"。
10.43 主流 vector db 的**"决策矩阵"
2026 年生产选型最常被问的问题——"我应该用哪个 vector db"——6 个候选 + 5 个维度:
| DB | 托管 | 规模 | 性能 | 价格 | 特色 |
|---|---|---|---|---|---|
| Pinecone | 纯托管 | 10B+ | 高 | 贵 | 零运维、企业友好 |
| Weaviate | 自建/托管 | 1B+ | 高 | 中 | 开源、GraphQL 风 |
| Qdrant | 自建/托管 | 1B+ | 高 | 中 | Rust 写的、快 |
| Chroma | 自建为主 | 10M | 中 | 免费 | SQLite 级简单 |
| Milvus | 自建 | 10B+ | 高 | 免费 | 中国团队主力 |
| pgvector | Postgres 插件 | 100M | 中 | 免费 | 不用额外组件 |
典型选型路径——
- 原型 → Chroma / InMemoryVectorStore
- 小规模生产 (< 1M docs) → pgvector(复用现有 PG)
- 中规模 (1M-100M) → Qdrant / Weaviate 自建
- 大规模 (100M+) → Pinecone 托管 / Milvus 自建
- 企业合规 → 自建 Weaviate / Milvus(数据不出境)
一次真实的 cost 对比(10M 文档、$1M tokens/月检索)——
- Pinecone:$500-2000/月(按 pod)
- Qdrant 自建:$200/月 AWS 实例
- pgvector:$100/月(PG 实例就能跑)
省钱档位——pgvector → Qdrant → Pinecone——多的钱换的是运维省心。
10.44 附录:三本其他书的呼应
本章多次提到其他书——一次性集中:
- 第 2 章 LangChain Runnable——Retriever 是 Runnable 的子类——学完 Runnable 再看 Retriever 如鱼得水
- 第 18 章 LangGraph Design Patterns——RAG 场景里 retriever 通常作为 LangGraph 的一个 node——那本书的 tool-using agent 模式直接用
- 第 20 章 MCP Build Server——MCP Server 经常作为 retriever 的"数据源"——本章 retriever 可以通过 MCP 对接外部知识库
三本书交叉读——你就能构建"MCP retriever + LangChain wrap + LangGraph 编排"的企业级 RAG 架构——这是 2026 年的标杆方案。
10.45 **"用 RAG 代替 fine-tune"**的工程辩证
2023-2024 年业界流行"用 RAG 代替 fine-tune"——2025 开始有更细致的辩证:
RAG 的优势——
- 冷启动快——不需要训练数据、几天上线
- 知识可控——每个答案都能 trace 到源文档
- 更新及时——新增文档立刻生效、不用重训
- 成本低——只花 retrieval + LLM API 钱、不花训练钱
RAG 的劣势——
- 推理成本高——每次查询都要 retrieval + LLM,不 scalable 到超高 QPS
- 上下文窗口限制——再多文档也塞不下全部
- 复杂推理弱——多步推理时 chunk 间关系丢失
- 领域适配慢——专业术语、推理模式用 fine-tune 更彻底
2026 年主流做法——混合:
- 事实类查询 → RAG(快、准、可控)
- 风格 / 推理模式 → Fine-tune(内化、规模化)
- 长尾 / 罕见场景 → RAG + few-shot prompt
一个具体数字——某公司 RAG 月成本 2k——规模化后 fine-tune 更省——但早期 RAG 是必经之路。
这个决策——和本书第 20 章§20.15 Meta-Router 思路一致——根据任务类型用最适合的技术。
10.47 **"RAG 事故"**的三种典型形态
生产 RAG 系统里最容易出现的三种故障——
事故 A:chunk_overlap 设 0——chunk 切在句子中间、上下文被砍断、检索召回了"半句话"、LLM 答得莫名其妙。修复:overlap 设 ~10% 的 chunk_size。
事故 B:embedding 模型升级 embed 全库但忘 reindex query 模型——query 端还用旧模型、向量空间不一致、召回"像是随机"。修复:embed 模型变更必须同时更新 build + query 两端。
事故 C:metadata 不加 user_id 隔离——A 客户的文档被检索给了 B 客户、合规事故。修复:filter={"tenant": current_tenant} 必须成为强制约束。
10.48 延伸资料
LangChain 官方 RAG tutorial——github.com/langchain-ai/langchain/tree/master/docs/docs/tutorials/rag——10+ 篇从入门到进阶的 Jupyter notebook。
langchain_community/retrievers/ 目录——包含 BM25Retriever、EnsembleRetriever、ContextualCompressionRetriever 等在 core 里看不到的实现、是本章接口抽象的具体落地。
10.52 一段凝练:把整章浓缩成一句话 + 三组数字
一句话——"Retriever 是 RAG 的"输入输出控制台"——精度决定 LLM 上限、成本决定商业可行。
三组数字——
- chunk_size=500, overlap=50(RecursiveCharacterTextSplitter 甜点)
- fetch_k=20, k=5, lambda_mult=0.5(MMR 默认参数)
- retriever 召回 20 + reranker 取 5(生产 RAG 新标准)
这三组数字是 LangChain 源码和社区实践里最常被复用的缺省参数——调参调优时先从默认值出发、再按业务场景微调、避免一上来就改一堆参数找不到基准。
10.54 "embedding vs rerank 预算分配"
给工程师最常问的成本分配问题一个答案——
100% 预算给不同环节的合理比例——
- 40% embedding(一次性 build 成本)
- 30% vector store 托管费
- 20% rerank(每次查询)
- 10% 其他(ETL / telemetry)
如果需要降 50% 成本——
- 先砍 rerank(影响精度最小)
- 再砍 embedding(换成 bge 本地)
- 最后换 vector store(最难、改动大)
如果需要提精度——
- 先加 rerank(最快)
- 再加 MultiQuery(中等)
- 最后换更大 embedding 模型(最慢)
**这张"钱花在哪里"分配表——是 RAG 成熟团队的"财务地图"。