LangChain 设计与实现
第13章 记忆与会话管理
第13章 记忆与会话管理
引言
大语言模型本身是无状态的 -- 每次调用都是独立的,模型不会"记住"之前的对话内容。要构建一个能够进行连贯对话的 AI 应用,开发者必须在应用层实现记忆机制,将历史上下文注入到每次调用的输入中。
LangChain 的记忆系统经历了两代设计。第一代以 BaseMemory 为基础,通过 ConversationBufferMemory、ConversationSummaryMemory 等类提供了丰富的记忆策略,与 Chain 体系深度集成。第二代以 RunnableWithMessageHistory 为核心,建立在 LCEL 和 BaseChatMessageHistory 之上,提供了更灵活、更现代的会话管理方式。
在最新的源码中,第一代记忆系统的所有类已被标记为 @deprecated,但它们的设计思想和实现模式依然值得深入研究。理解这些设计模式不仅有助于维护使用旧 API 的项目,更重要的是,它们揭示了 AI 应用中记忆管理的核心挑战和解决思路。
本章要点
BaseMemory定义了load_memory_variables/save_context/clear三个核心抽象方法ConversationBufferMemory存储完整对话历史,ConversationSummaryMemory通过 LLM 压缩历史为摘要ConversationTokenBufferMemory按 token 上限裁剪历史,ConversationEntityMemory提取和维护实体知识VectorStoreRetrieverMemory通过向量检索实现语义相关的记忆检索ChatMessageHistory是消息持久化的抽象接口,支持多种存储后端RunnableWithMessageHistory是现代推荐方案,通过包装 Runnable 自动管理消息历史
13.1 BaseMemory:记忆的抽象基础
13.1.1 核心抽象
BaseMemory 定义在 langchain_classic/base_memory.py 中,是所有记忆类的抽象基类:
class BaseMemory(Serializable, ABC):
"""Abstract base class for memory in Chains."""
model_config = ConfigDict(arbitrary_types_allowed=True)
@property
@abstractmethod
def memory_variables(self) -> list[str]:
"""The string keys this memory class will add to chain inputs."""
@abstractmethod
def load_memory_variables(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Return key-value pairs given the text input to the chain."""
async def aload_memory_variables(self, inputs: dict[str, Any]) -> dict[str, Any]:
return await run_in_executor(None, self.load_memory_variables, inputs)
@abstractmethod
def save_context(self, inputs: dict[str, Any], outputs: dict[str, str]) -> None:
"""Save the context of this chain run to memory."""
async def asave_context(self, inputs, outputs) -> None:
await run_in_executor(None, self.save_context, inputs, outputs)
@abstractmethod
def clear(self) -> None:
"""Clear memory contents."""
async def aclear(self) -> None:
await run_in_executor(None, self.clear)
三个核心方法形成了完整的记忆生命周期:
flowchart LR
subgraph "Chain 执行流程"
A["prep_inputs"] --> B["_call"]
B --> C["prep_outputs"]
end
subgraph "Memory 生命周期"
LOAD["load_memory_variables(inputs)"] --> INJECT["注入到 Chain 输入"]
SAVE["save_context(inputs, outputs)"] --> STORE["保存到存储"]
end
A -->|调用| LOAD
INJECT -->|返回给| A
C -->|调用| SAVE
memory_variables:声明此记忆类将向 Chain 输入中添加哪些键。Chain 基类用它来区分哪些输入键来自用户,哪些来自记忆。load_memory_variables(inputs):根据当前输入加载相关的记忆内容。返回的字典将被合并到 Chain 的输入中。save_context(inputs, outputs):在 Chain 执行完成后保存当前轮次的上下文。
注意异步方法全部通过 run_in_executor 提供默认实现,这使得同步记忆类自动获得异步支持,尽管性能不如原生异步实现。
13.1.2 Memory 与 Chain 的集成机制
回顾 Chain 基类的 prep_inputs 和 prep_outputs 方法(第11章详述),Memory 的集成是完全透明的:
class Chain(RunnableSerializable):
memory: BaseMemory | None = None
def prep_inputs(self, inputs):
if not isinstance(inputs, dict):
_input_keys = set(self.input_keys)
if self.memory is not None:
_input_keys = _input_keys.difference(self.memory.memory_variables)
inputs = {next(iter(_input_keys)): inputs}
if self.memory is not None:
external_context = self.memory.load_memory_variables(inputs)
inputs = dict(inputs, **external_context)
return inputs
def prep_outputs(self, inputs, outputs, return_only_outputs=False):
self._validate_outputs(outputs)
if self.memory is not None:
self.memory.save_context(inputs, outputs)
if return_only_outputs:
return outputs
return {**inputs, **outputs}
当一个 Chain 被传入单个字符串而非字典时,prep_inputs 会智能地推断哪个输入键是用户提供的(排除 Memory 提供的变量后剩余的那个)。这是一种"最少惊讶原则"的体现。
13.1.3 Memory 为什么被 Runnable 吃掉——字典 IO → 消息 IO 的范式换轨
BaseMemory 的签名 save_context(inputs: dict, outputs: dict) 藏着它只能适配 Chain、不能适配 Runnable 的根本原因——字典假设。
第 11 章 Chain 明确定义了输入输出是 dict[str, Any]——Memory 顺着这个协议写就行;但第 3 章 Runnable 允许输入输出是任意类型——str / BaseMessage / list[BaseMessage] / dict 全都合法——尤其直接用 chat_model.invoke([HumanMessage(...), AIMessage(...)])——输入就是一个 list、不是 dict——传统 Memory 这里无键可取、只能抛错或退化。
两种 IO 语义的本质差异:
| 维度 | Chain dict IO | Runnable 多态 IO |
|---|---|---|
| 输入形态 | {"input": "hi"} 固定结构 |
str / msg / list / dict 任意 |
| 输出形态 | {"output": "hello"} 固定结构 |
AIMessage / str / dict 等 |
| Memory 抓哪个字段 | 显式 input_key / output_key |
不存在固定字段——必须运行时适配 |
| 与 Chat Model 亲和 | 差(需 AIMessage → str → dict 转换) | 好(原生消息对象) |
RunnableWithMessageHistory 不是 Memory 的"演进"——是 Memory 的"弃用"——它放弃了 dict 假设、改走消息中心主义——§13.10.4-§13.10.5 的 _get_input_messages / _get_output_messages 就是运行时适配层——覆盖 6 种输入 × 2 种输出的所有组合——代价是 15 行"形状归一化"代码。
这个范式换轨对应 LangChain 1.x 的主基调——从"Python 字典原语建模 AI 流程"转向"消息对象原语建模"——与 OpenAI/Anthropic API 的消息列表规范对齐、也与 MCP 的 Message 协议对齐——这不是 LangChain 单独的决定、是整个 LLM 生态的合流。
13.2 BaseChatMemory:对话记忆的中间层
BaseChatMemory 在 BaseMemory 之上增加了 ChatMessageHistory 的集成:
class BaseChatMemory(BaseMemory, ABC):
chat_memory: BaseChatMessageHistory = Field(
default_factory=InMemoryChatMessageHistory
)
output_key: str | None = None
input_key: str | None = None
return_messages: bool = False
def _get_input_output(self, inputs, outputs):
if self.input_key is None:
prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
else:
prompt_input_key = self.input_key
if self.output_key is None:
if len(outputs) == 1:
output_key = next(iter(outputs.keys()))
elif "output" in outputs:
output_key = "output"
warnings.warn(...)
else:
raise ValueError(f"Got multiple output keys: {outputs.keys()}")
else:
output_key = self.output_key
return inputs[prompt_input_key], outputs[output_key]
def save_context(self, inputs, outputs):
input_str, output_str = self._get_input_output(inputs, outputs)
self.chat_memory.add_messages([
HumanMessage(content=input_str),
AIMessage(content=output_str),
])
def clear(self):
self.chat_memory.clear()
classDiagram
class BaseMemory {
<<abstract>>
+memory_variables: list~str~
+load_memory_variables(inputs) dict
+save_context(inputs, outputs)
+clear()
}
class BaseChatMemory {
<<abstract>>
+chat_memory: BaseChatMessageHistory
+output_key: str | None
+input_key: str | None
+return_messages: bool
+save_context(inputs, outputs)
+clear()
}
class ConversationBufferMemory {
+memory_key: str
+buffer: Any
+load_memory_variables(inputs)
}
class ConversationSummaryMemory {
+buffer: str
+llm: BaseLanguageModel
+predict_new_summary(messages, existing_summary)
}
class ConversationTokenBufferMemory {
+llm: BaseLanguageModel
+max_token_limit: int
}
class ConversationEntityMemory {
+llm: BaseLanguageModel
+entity_store: BaseEntityStore
+entity_cache: list~str~
}
BaseMemory <|-- BaseChatMemory
BaseChatMemory <|-- ConversationBufferMemory
BaseChatMemory <|-- ConversationSummaryMemory
BaseChatMemory <|-- ConversationTokenBufferMemory
BaseChatMemory <|-- ConversationEntityMemory
BaseChatMemory 的关键设计包括:
- 自动推断输入输出键:当
input_key或output_key未设置时,会自动推断。如果有多个输出键但包含"output",会发出警告并使用"output"。 - return_messages 开关:控制
load_memory_variables返回的是消息对象列表还是格式化的字符串。Chat Model 通常需要消息列表,而 Text LLM 需要字符串。 - 委托给 ChatMessageHistory:实际的消息存储委托给
chat_memory属性,实现了记忆策略与存储后端的分离。
13.3 ConversationBufferMemory:完整历史保留
最简单直接的记忆策略 -- 保留完整的对话历史:
class ConversationBufferMemory(BaseChatMemory):
human_prefix: str = "Human"
ai_prefix: str = "AI"
memory_key: str = "history"
@property
def buffer(self) -> Any:
return self.buffer_as_messages if self.return_messages else self.buffer_as_str
@property
def buffer_as_str(self) -> str:
return get_buffer_string(
self.chat_memory.messages,
human_prefix=self.human_prefix,
ai_prefix=self.ai_prefix,
)
@property
def buffer_as_messages(self) -> list[BaseMessage]:
return self.chat_memory.messages
@property
def memory_variables(self) -> list[str]:
return [self.memory_key]
def load_memory_variables(self, inputs):
return {self.memory_key: self.buffer}
get_buffer_string 将消息列表格式化为字符串,格式如:
Human: 你好
AI: 你好!有什么可以帮助你的?
Human: 今天天气怎么样?
AI: 我无法直接获取天气信息...
human_prefix 和 ai_prefix 允许自定义角色标签,这在多语言场景中很有用。
适用场景:对话轮次较少(不超过模型上下文窗口的场景)。随着对话增长,完整历史会消耗越来越多的 token 预算。
13.4 ConversationSummaryMemory:摘要压缩
当对话变长时,ConversationSummaryMemory 通过 LLM 将历史压缩为摘要:
class SummarizerMixin(BaseModel):
human_prefix: str = "Human"
ai_prefix: str = "AI"
llm: BaseLanguageModel
prompt: BasePromptTemplate = SUMMARY_PROMPT
summary_message_cls: type[BaseMessage] = SystemMessage
def predict_new_summary(self, messages, existing_summary):
new_lines = get_buffer_string(
messages, human_prefix=self.human_prefix, ai_prefix=self.ai_prefix
)
chain = LLMChain(llm=self.llm, prompt=self.prompt)
return chain.predict(summary=existing_summary, new_lines=new_lines)
class ConversationSummaryMemory(BaseChatMemory, SummarizerMixin):
buffer: str = ""
memory_key: str = "history"
def save_context(self, inputs, outputs):
super().save_context(inputs, outputs)
self.buffer = self.predict_new_summary(
self.chat_memory.messages[-2:], self.buffer
)
def load_memory_variables(self, inputs):
if self.return_messages:
buffer = [self.summary_message_cls(content=self.buffer)]
else:
buffer = self.buffer
return {self.memory_key: buffer}
def clear(self):
super().clear()
self.buffer = ""
sequenceDiagram
participant User as 用户
participant Chain as Chain
participant Memory as ConversationSummaryMemory
participant LLM as LLM (摘要)
User->>Chain: 第 N 轮输入
Chain->>Memory: load_memory_variables
Memory-->>Chain: {history: "已有的对话摘要"}
Chain->>Chain: 执行业务逻辑
Chain->>Memory: save_context(inputs, outputs)
Memory->>Memory: 先保存消息到 chat_memory
Memory->>LLM: predict_new_summary(最后2条消息, 已有摘要)
LLM-->>Memory: 更新后的摘要
Memory->>Memory: self.buffer = 新摘要
关键设计点:
- 增量摘要:每次只将最新的两条消息(一轮对话)与现有摘要合并,而非重新摘要全部历史。这使得摘要成本为 O(1)(每轮固定成本)而非 O(n)。
- SummarizerMixin:摘要逻辑被提取为 Mixin,可以被其他记忆类复用(如
ConversationSummaryBufferMemory)。 - summary_message_cls:摘要以
SystemMessage的形式返回,这在 Chat Model 中会被放置在消息序列的开头,提供全局上下文。
from_messages 工厂方法支持从已有消息历史构建摘要记忆:
@classmethod
def from_messages(cls, llm, chat_memory, *, summarize_step=2, **kwargs):
obj = cls(llm=llm, chat_memory=chat_memory, **kwargs)
for i in range(0, len(obj.chat_memory.messages), summarize_step):
obj.buffer = obj.predict_new_summary(
obj.chat_memory.messages[i : i + summarize_step], obj.buffer
)
return obj
13.4.1 ConversationSummaryBufferMemory:摘要 + 滑动窗口的混合体
ConversationSummaryMemory 把全部历史都压成摘要——摘要可能会丢掉最近几轮的细节;ConversationBufferMemory 反过来保留最近全部细节——但超窗。ConversationSummaryBufferMemory 是两者的折中——最近 N token 原样保留、更早的被增量摘要(libs/langchain/langchain_classic/memory/summary_buffer.py、148 行、截至 main 分支 2026-04-22):
class ConversationSummaryBufferMemory(BaseChatMemory, SummarizerMixin):
max_token_limit: int = 2000
moving_summary_buffer: str = "" # 累积的"早期对话"摘要
memory_key: str = "history"
def save_context(self, inputs, outputs):
super().save_context(inputs, outputs)
self.prune()
def prune(self):
buffer = self.chat_memory.messages
curr_buffer_length = self.llm.get_num_tokens_from_messages(buffer)
if curr_buffer_length > self.max_token_limit:
pruned_memory = []
while curr_buffer_length > self.max_token_limit:
pruned_memory.append(buffer.pop(0))
curr_buffer_length = self.llm.get_num_tokens_from_messages(buffer)
self.moving_summary_buffer = self.predict_new_summary(
pruned_memory, self.moving_summary_buffer,
)
关键洞察:prune 把"要扔掉的最早消息"喂给 predict_new_summary——被移出窗口的消息不是丢弃、而是并入累积摘要。输出给 LLM 的 history 字段是"早期摘要 SystemMessage + 最近原始消息"两段拼接,相当于一个**"老的粗粒度 + 新的细粒度"**的两级缓存。
与 §13.5 ConversationTokenBufferMemory 的区别是——后者直接把老消息丢掉、无摘要——信息永久丢失;ConversationSummaryBufferMemory 有摘要兜底、代价是每次淘汰都要调一次 LLM。生产中通常:token 预算够 LLM 成本不敏感选摘要 buffer、token 紧 LLM 成本敏感选 token buffer。
13.4.2 ConversationBufferWindowMemory:最近 k 轮滑动窗口
再往轻量走一层——ConversationBufferWindowMemory(buffer_window.py、59 行、截至 main 分支 2026-04-22)——只保留最近 k 轮对话、不做摘要、不按 token 算、按对话轮数裁:
class ConversationBufferWindowMemory(BaseChatMemory):
human_prefix: str = "Human"
ai_prefix: str = "AI"
memory_key: str = "history"
k: int = 5 # 默认保留最近 5 轮
@property
def buffer(self):
messages = self.chat_memory.messages[-self.k * 2 :] if self.k > 0 else []
# ...格式化为 str 或原消息列表
self.k * 2 是因为一轮对话 = human + ai 两条消息——k=5 意味着保留 10 条消息。代码 59 行、比 ConversationTokenBufferMemory 的 60+ 行还短——是所有 Memory 类里最简单的一个。适用场景——"只要最近几轮上下文、不关心更早"的 FAQ 类 bot——比如"这个订单什么情况→上一条订单号还记得就行"。
13.4.3 CombinedMemory:多 Memory 的组合器
CombinedMemory(combined.py、85 行、截至 main 分支 2026-04-22)是个装饰器型类——把多个 Memory 对象合并成一个接口:
class CombinedMemory(BaseMemory):
memories: list[BaseMemory]
@property
def memory_variables(self) -> list[str]:
return [v for mem in self.memories for v in mem.memory_variables]
@model_validator(mode="after")
def _check_repeated_memory_variable(self):
# 多个子 Memory 不能注入同名变量、否则覆盖
all_vars = [v for m in self.memories for v in m.memory_variables]
if len(all_vars) != len(set(all_vars)):
raise ValueError(f"The same variable {all_vars} is used in multiple memories.")
典型用法——一个 agent 同时要"最近 5 轮对话 + 实体摘要"——用 CombinedMemory([ConversationBufferWindowMemory(memory_key="chat_history"), ConversationEntityMemory(chat_history_key="entities_history")])——两个子 Memory 各注入不同名的变量、prompt template 里用 {chat_history} 和 {entities_history} 两个槽位引用。
重名校验_check_repeated_memory_variable 是这个类唯一的"业务逻辑"——其余 load_memory_variables / save_context / clear 都是纯粹的 for-loop 转发。这种"接口聚合 + 转发"模式是 §13.11.1 策略模式的组合拳变体——让用户能"把策略串起来用"。
13.5 Memory 子类 API 矩阵与源码账本
前面 §13.3-§13.7 + §13.4.1-§13.4.3 逐个讲了 8 个 classic Memory 实现——这里给一张全景对比表——把它们的触发条件 / 成本模型 / 状态字段 / 源码位置 / 典型场景摆齐——帮读者快速选型:
| Memory 类 | 源码文件 | 行数 | 触发裁剪 | 裁剪动作 | LLM 依赖 | 典型场景 |
|---|---|---|---|---|---|---|
ConversationBufferMemory |
buffer.py |
173 | 无 | 不裁 | 否 | 短对话(<20 轮) |
ConversationBufferWindowMemory |
buffer_window.py |
59 | 消息数 > k*2 |
丢最早 | 否 | FAQ bot、只看近期 |
ConversationTokenBufferMemory |
token_buffer.py |
≤85 | token > max_token_limit |
丢最早 | 仅 tokenizer | 严格 token 预算 |
ConversationSummaryMemory |
summary.py |
168 | 每轮都摘 | 最新 2 条 → 增量摘要 | 是(每轮 1 次) | 长对话、token 成本低 |
ConversationSummaryBufferMemory |
summary_buffer.py |
148 | token > 上限 | 老消息 → 摘要 | 是(仅超限时) | 长对话、混合粒度 |
ConversationEntityMemory |
entity.py |
611 | 每轮都抽实体 | 实体 → 外部 store | 是(每轮 2 次) | 需要跟踪人物/产品 |
VectorStoreRetrieverMemory |
vectorstore.py |
122 | 按查询召回 | 不裁(向量库无限扩展) | 仅 embedding | 无限历史 |
CombinedMemory |
combined.py |
85 | 委托 | 委托 | 委托 | 多策略叠加 |
三个维度的权衡轴:
- 纵轴:上下文保真度——Buffer > Summary Buffer > Window > Summary > Entity(前者保留原句、后者保留"梗概/结构化")
- 横轴:成本——Buffer / Window 0 LLM 调用、Summary Buffer 仅超限时调、Summary 每轮 1 次、Entity 每轮 2 次(提取 + 摘要)
- 深度轴:存储耦合——Buffer/Window/Summary/TokenBuffer 只需
BaseChatMessageHistory、Entity 额外需BaseEntityStore、VectorStore 需VectorStoreRetriever——耦合越深、可替换性越弱
ConversationEntityMemory 611 行占总量 30%——这是过度工程的教训:用 LLM 维护结构化实体知识库听起来很美、但提取 prompt 对模型很挑、质量不稳定——被后来的 LangGraph + 外置 RAG 替代(§13.10.5 细谈)。现在它还在代码库里、纯属兼容老代码。
13.6 ConversationTokenBufferMemory:按 token 裁剪
当需要精确控制 token 用量时,ConversationTokenBufferMemory 按 token 上限从前向后裁剪历史消息:
class ConversationTokenBufferMemory(BaseChatMemory):
human_prefix: str = "Human"
ai_prefix: str = "AI"
llm: BaseLanguageModel
memory_key: str = "history"
max_token_limit: int = 2000
def save_context(self, inputs, outputs):
super().save_context(inputs, outputs)
buffer = self.chat_memory.messages
curr_buffer_length = self.llm.get_num_tokens_from_messages(buffer)
if curr_buffer_length > self.max_token_limit:
pruned_memory = []
while curr_buffer_length > self.max_token_limit:
pruned_memory.append(buffer.pop(0))
curr_buffer_length = self.llm.get_num_tokens_from_messages(buffer)
flowchart TD
SAVE["save_context 被调用"] --> ADD["添加新消息到 chat_memory"]
ADD --> CALC["计算当前总 token 数"]
CALC --> CHECK{超过 max_token_limit?}
CHECK -->|否| DONE[结束]
CHECK -->|是| POP["移除最早的消息"]
POP --> RECALC["重新计算 token 数"]
RECALC --> RECHECK{仍超过限制?}
RECHECK -->|是| POP
RECHECK -->|否| DONE
这种策略的核心优势是 精确的 token 预算控制。不同于按消息数量裁剪(可能导致 token 数量波动很大),它直接使用 LLM 的 tokenizer 计算真实 token 数,确保不会超出模型的上下文窗口。
需要注意的是,llm.get_num_tokens_from_messages 方法的准确性取决于具体的 LLM 实现。不同模型的 tokenizer 不同,因此这里传入的 llm 参数应该与实际使用的模型一致。
13.7 ConversationEntityMemory:实体级知识管理
ConversationEntityMemory 是最复杂的记忆实现,它使用 LLM 从对话中提取命名实体并维护每个实体的摘要:
class ConversationEntityMemory(BaseChatMemory):
llm: BaseLanguageModel
entity_extraction_prompt: BasePromptTemplate = ENTITY_EXTRACTION_PROMPT
entity_summarization_prompt: BasePromptTemplate = ENTITY_SUMMARIZATION_PROMPT
entity_cache: list[str] = []
k: int = 3 # 用于实体提取的最近消息对数
chat_history_key: str = "history"
entity_store: BaseEntityStore = Field(default_factory=InMemoryEntityStore)
@property
def memory_variables(self) -> list[str]:
return ["entities", self.chat_history_key]
load_memory_variables 做了两件事:提取实体名称,并查询每个实体的现有摘要:
def load_memory_variables(self, inputs):
chain = LLMChain(llm=self.llm, prompt=self.entity_extraction_prompt)
buffer_string = get_buffer_string(
self.buffer[-self.k * 2 :],
human_prefix=self.human_prefix, ai_prefix=self.ai_prefix,
)
output = chain.predict(history=buffer_string, input=inputs[prompt_input_key])
if output.strip() == "NONE":
entities = []
else:
entities = [w.strip() for w in output.split(",")]
entity_summaries = {}
for entity in entities:
entity_summaries[entity] = self.entity_store.get(entity, "")
self.entity_cache = entities
if self.return_messages:
buffer = self.buffer[-self.k * 2 :]
else:
buffer = buffer_string
return {self.chat_history_key: buffer, "entities": entity_summaries}
save_context 为每个缓存的实体生成更新后的摘要:
def save_context(self, inputs, outputs):
super().save_context(inputs, outputs)
buffer_string = get_buffer_string(self.buffer[-self.k * 2 :], ...)
chain = LLMChain(llm=self.llm, prompt=self.entity_summarization_prompt)
for entity in self.entity_cache:
existing_summary = self.entity_store.get(entity, "")
output = chain.predict(
summary=existing_summary, entity=entity,
history=buffer_string, input=inputs[prompt_input_key]
)
self.entity_store.set(entity, output.strip())
flowchart TD
subgraph "load_memory_variables"
L1[获取最近 k 轮对话] --> L2["LLM 提取实体名称"]
L2 --> L3{提取到实体?}
L3 -->|是| L4["从 entity_store 查询每个实体的摘要"]
L3 -->|否| L5["entities = 空字典"]
L4 --> L6["返回 {history, entities}"]
L5 --> L6
end
subgraph "save_context"
S1["保存消息到 chat_memory"] --> S2["遍历 entity_cache 中的每个实体"]
S2 --> S3["查询现有摘要"]
S3 --> S4["LLM 生成更新后的摘要"]
S4 --> S5["写入 entity_store"]
end
13.7.1 ConversationEntityMemory 为什么被弃用——三个失败模式
§13.5 的 API 矩阵指出 ConversationEntityMemory 每轮要调 2 次 LLM(提取 + 摘要)、代价极高——但真正让它被淘汰的不是成本、是可靠性:
- 实体提取 prompt 的脆弱性——
ENTITY_EXTRACTION_PROMPT(在prompt.py164 行里)依赖 LLM 按"逗号分隔的实体名"格式返回、否则output.split(",")会把非实体的句子碎片化成假实体。当 LLM 返回"Sure, the entities are: John, Paris"——split 出来的 "Sure"、"the entities are"、"John"、"Paris" 都进了 entity_cache——"Sure" 被当成实体、触发一次 LLM 生成关于"Sure"的摘要——荒谬的浪费。 - 摘要幻觉累积——
predict_new_summary把旧摘要 + 新对话喂回 LLM 生成更新后的摘要——LLM 每次都有概率添油加醋——比如对话提到 John 去了巴黎、LLM 摘要写"John 去了巴黎旅游"("旅游"是幻觉)——下一轮摘要基于这个、可能变成"John 去巴黎旅游了一周"("一周"又是幻觉)——错误信息像雪球一样滚大——几轮后 entity_store 里的摘要和真实对话完全脱节。 - Entity 粒度不可控——LLM 可能把"John"和"John Smith"识别为两个不同实体、分别维护两份摘要——也可能把"my dog"和"Rex"识别为同一实体——合并/分裂的决策权完全交给 LLM、人类无法介入。
对比 LangGraph 的 StateGraph——用户显式定义 TypedDict 声明哪些字段是状态(user_name: str、user_location: str)——LLM 的工作是填字段而非发明字段——失效域从"整个实体字典"收窄到"字段级 schema 违反"——可验证、可重放、可测试。这就是为什么 §13.11.3 说 ConversationEntityMemory 是"想法好但工程化失败"的代表——理念对、实现在 LLM 可靠性不足的时代不可持续。
13.7.2 BaseEntityStore 与存储后端
实体存储通过 BaseEntityStore 抽象,支持多种后端:
class BaseEntityStore(BaseModel, ABC):
@abstractmethod
def get(self, key: str, default: str | None = None) -> str | None: ...
@abstractmethod
def set(self, key: str, value: str | None) -> None: ...
@abstractmethod
def delete(self, key: str) -> None: ...
@abstractmethod
def exists(self, key: str) -> bool: ...
@abstractmethod
def clear(self) -> None: ...
LangChain 提供了四种内置实现:
| 实现类 | 存储位置 | 特点 |
|---|---|---|
InMemoryEntityStore |
内存字典 | 简单快速,进程重启后丢失 |
RedisEntityStore |
Redis | 支持 TTL,适合分布式 |
UpstashRedisEntityStore |
Upstash Redis | 无服务器 Redis |
SQLiteEntityStore |
SQLite 文件 | 单机持久化 |
Redis 实现支持自动过期(TTL),默认实体在创建后 24 小时过期,每次读取延长 3 天:
class RedisEntityStore(BaseEntityStore):
ttl: int | None = 60 * 60 * 24 # 1 天
recall_ttl: int | None = 60 * 60 * 24 * 3 # 3 天
def get(self, key, default=None):
return self.redis_client.getex(
f"{self.full_key_prefix}:{key}", ex=self.recall_ttl
) or default or ""
这种"读取即延期"的设计巧妙地实现了 LRU(最近最少使用)语义:经常被提及的实体持续存活,长期不被提及的实体自动过期清除。
13.8 VectorStoreRetrieverMemory:语义检索记忆
VectorStoreRetrieverMemory 与前面的记忆类有本质区别 -- 它不是按时间顺序保留对话,而是将每轮对话存入向量数据库,然后根据当前输入的语义相似度检索相关的历史片段:
class VectorStoreRetrieverMemory(BaseMemory):
retriever: VectorStoreRetriever = Field(exclude=True)
memory_key: str = "history"
input_key: str | None = None
return_docs: bool = False
exclude_input_keys: Sequence[str] = Field(default_factory=tuple)
def load_memory_variables(self, inputs):
input_key = self._get_prompt_input_key(inputs)
query = inputs[input_key]
docs = self.retriever.invoke(query)
return self._documents_to_memory_variables(docs)
def save_context(self, inputs, outputs):
documents = self._form_documents(inputs, outputs)
self.retriever.add_documents(documents)
def _form_documents(self, inputs, outputs):
exclude = set(self.exclude_input_keys)
exclude.add(self.memory_key)
filtered_inputs = {k: v for k, v in inputs.items() if k not in exclude}
texts = [f"{k}: {v}" for k, v in list(filtered_inputs.items()) + list(outputs.items())]
page_content = "\n".join(texts)
return [Document(page_content=page_content)]
flowchart TD
subgraph "save_context"
SC1["过滤掉排除的输入键"]
SC2["将输入和输出合并为文本"]
SC3["创建 Document 对象"]
SC4["添加到 VectorStore"]
SC1 --> SC2 --> SC3 --> SC4
end
subgraph "load_memory_variables"
LM1["获取当前输入文本"]
LM2["用输入文本作为查询"]
LM3["从 VectorStore 检索相似文档"]
LM4["返回相关历史片段"]
LM1 --> LM2 --> LM3 --> LM4
end
这种记忆策略的独特优势在于:
- 无上下文窗口限制:向量数据库可以存储无限量的对话历史
- 语义相关性:检索到的不是最近的对话,而是与当前话题最相关的对话
- 长期记忆:即使是很久之前的对话,只要语义相关就能被召回
注意 _form_documents 方法中的 exclude_input_keys 设计 -- 它允许排除某些输入键(如 Memory 自身注入的 history),避免在存储时产生自引用的循环。
13.8.1 向量检索记忆的三个失效模式
VectorStoreRetrieverMemory 号称"无限容量的长期记忆"——但实战中坑比预想的多——三个最常见的失效模式:
- "相似但无关"的召回——向量检索按 embedding 距离算"相似度"、语义相似 ≠ 对话相关。用户问"怎么重置密码"、检索可能召回一年前"我设置了新密码是 abc123"——embedding 很接近、但那是用户在跟 AI 闲聊时自曝的私密信息、不该再出现在当前上下文——泄露风险 + 上下文污染。
- 自举污染——
_form_documents把输入和输出拼成一条文档存入向量库——如果 AI 某次回答有幻觉(比如错说"巴黎是英国首都")、这条幻觉会永久留在记忆里——下次用户问相关问题、召回带幻觉的旧对话、AI 看到"自己说过巴黎是英国首都"——会继续坚持错误——幻觉被记忆固化。§13.10.5 讨论的消息保真度问题在这里被放大。 - 冷启动和尺度失衡——对话只有 5 条时、向量检索随便返回一条都叫相似——召回毫无意义;对话有 100 万条时、embedding 需要频繁重算(换 embedding 模型时所有历史要重索引)、迁移成本极高。VectorStoreRetrieverMemory 在中间规模(50-5000 条)区间才有甜点。
生产替代——把向量检索降级为RAG 管道的一个检索器(第 10 章讲)、而不是 Memory 的替代品——对话历史用 §13.4.1 ConversationSummaryBufferMemory 保留最近 N 轮原文、另起一个 vector store 存"有价值的长期事实"——两者职责分离、避免"召回自己说过的幻觉"这种自举污染。
13.9 ChatMessageHistory:消息持久化层
13.9.1 BaseChatMessageHistory
所有记忆策略的底层消息存储都通过 BaseChatMessageHistory 接口访问:
class BaseChatMessageHistory(ABC):
messages: list[BaseMessage]
async def aget_messages(self) -> list[BaseMessage]:
return await run_in_executor(None, lambda: self.messages)
def add_user_message(self, message: HumanMessage | str) -> None:
if isinstance(message, HumanMessage):
self.add_message(message)
else:
self.add_message(HumanMessage(content=message))
def add_ai_message(self, message: AIMessage | str) -> None:
if isinstance(message, AIMessage):
self.add_message(message)
else:
self.add_message(AIMessage(content=message))
def add_message(self, message: BaseMessage) -> None:
if type(self).add_messages != BaseChatMessageHistory.add_messages:
self.add_messages([message])
else:
raise NotImplementedError(
"add_message is not implemented. Implement add_message or add_messages."
)
def add_messages(self, messages: Sequence[BaseMessage]) -> None:
for message in messages:
self.add_message(message)
@abstractmethod
def clear(self) -> None:
"""Remove all messages from the store."""
add_message 和 add_messages 之间存在一种巧妙的互递归关系:
- 如果子类实现了
add_messages(批量添加),add_message会委托给它 - 如果子类没有实现
add_messages,默认实现会逐个调用add_message
这种设计鼓励子类优先实现批量接口 add_messages(减少 IO 往返),同时保证了向后兼容性。
为什么 add_messages 是"正路"——历史包袱——早期 LangChain 只有 add_user_message + add_ai_message 两个单条 API、Redis/PG 等后端每调一次就一次网络 RTT——在 LCEL 时代一次 invoke 要存 HumanMessage + AIMessage 至少两条——两次 RTT 是浪费。add_messages 引入后、所有新后端都应该原生实现批量——老后端通过 BaseChatMessageHistory 的默认实现继续工作、不破 API。type(self).add_messages != BaseChatMessageHistory.add_messages 这一行检查的含义是——子类有没有覆写 add_messages——如果没覆写、说明子类是老派单条实现、调用 add_messages 会退化成 for 循环调 add_message;如果覆写了、说明是新派、调 add_messages 走一次批量 RTT——双向兼容。
异步镜像 aadd_messages / aadd_message 在最近的 langchain_core 里也已落地——对 async 后端(async Redis、async PG)至关重要——避免把异步驱动卡在 run_in_executor 的线程池里。本章 §13.1.1 已指出 BaseMemory 所有 async 方法通过 run_in_executor 兜底——这是过渡方案;生产中应该用原生异步 ChatMessageHistory 实现、别让 async event loop 被线程池噎住。
13.9.2 InMemoryChatMessageHistory
内存实现是最简单的参考实现:
class InMemoryChatMessageHistory(BaseChatMessageHistory, BaseModel):
messages: list[BaseMessage] = Field(default_factory=list)
def add_message(self, message: BaseMessage) -> None:
self.messages.append(message)
def clear(self) -> None:
self.messages = []
13.9.3 存储后端生态
langchain_community 包中提供了大量的 ChatMessageHistory 实现,覆盖了主流的存储后端:
flowchart TD
BASE[BaseChatMessageHistory]
BASE --> INMEM[InMemoryChatMessageHistory]
BASE --> REDIS[RedisChatMessageHistory]
BASE --> PG[PostgresChatMessageHistory]
BASE --> MONGO[MongoDBChatMessageHistory]
BASE --> DDB[DynamoDBChatMessageHistory]
BASE --> FS[FileChatMessageHistory]
BASE --> SQL[SQLChatMessageHistory]
BASE --> COSMOS[CosmosDBChatMessageHistory]
BASE --> FIREBASE[FirestoreChatMessageHistory]
每种实现针对其存储后端的特性做了优化。例如 Redis 实现使用列表结构存储消息序列,支持自动过期;PostgreSQL 实现使用表结构,支持复杂查询和索引;DynamoDB 实现利用分区键实现按 session_id 高效查询。
13.9.4 session_id 的生命周期与多租户隔离
BaseChatMessageHistory 的协议不管 session 怎么分配——session_id 是调用方传进来的字符串——这个协议松散换来了最大的灵活性、但也把多租户安全的责任推给了上层:
三条生产中踩过的坑——
- session_id 猜测攻击——如果用递增整数(
user-1、user-2)当 session_id、攻击者可以枚举别人的会话——Redis 后端的GET chat:user-123直接读到别人历史。必须用不可猜测的随机值——uuid4()或 HMAC 签名——并在应用层校验"当前登录用户 == session_id 归属用户"。 - session_id 永不过期——
RedisChatMessageHistory默认无 TTL——用户 1 年前的会话消息永远躺在 Redis、积累到百万 key——既浪费内存又违反 GDPR 数据最小化。生产中应强制设置 TTL(比如 30 天)或定期 GC 脚本清理 30 天未访问的会话。 - 跨后端一致性——app 把消息写 Redis、同时把 session 元信息写 PG——
session_id在两边指代的是同一逻辑会话、但 Redis 过期了、PG 还在——查询会"消息无但 session 有"——两个后端需要同步生命周期、或引入定期核对任务。
RunnableWithMessageHistory 提供的多参数 session 键(§13.10.6)是应对多租户的标准姿势——把 session_id 升级为 {user_id, conversation_id} 两级键——后端以 user_id 为分区键(Redis 可用 user:{user_id}:conv:{conv_id} 前缀、PG 用复合主键)——天然隔离不同用户——即使某用户泄露自己的 conversation_id、别人也拿不到、因为需要对应的 user_id 才能组 key。
容量估算——一条 AIMessage 序列化后 JSON 大约 500 字节-2KB(含元信息、tool_call 结构)——BufferMemory 保留 50 轮对话、一会话 50-200KB——1 万活跃用户同时在线就是 500MB-2GB——Redis 内存会迅速吃满——生产中必须配合 ConversationTokenBufferMemory 或定期摘要归档、把旧消息压缩/冷存到 PG 或对象存储。
13.10 RunnableWithMessageHistory:现代方式
13.10.1 设计动机
传统 Memory 系统与 Chain 紧耦合,无法直接用于 LCEL 管道。RunnableWithMessageHistory 解决了这个问题 -- 它是一个 Runnable 包装器,可以为任何 Runnable 自动管理消息历史。
class RunnableWithMessageHistory(RunnableBindingBase):
get_session_history: GetSessionHistoryCallable
input_messages_key: str | None = None
output_messages_key: str | None = None
history_messages_key: str | None = None
history_factory_config: Sequence[ConfigurableFieldSpec]
13.10.2 初始化过程
构造函数构建了一个精巧的 Runnable 管道:
def __init__(self, runnable, get_session_history, *,
input_messages_key=None, output_messages_key=None,
history_messages_key=None, history_factory_config=None, **kwargs):
# 1. 构建历史加载链
history_chain = RunnableLambda(
self._enter_history, self._aenter_history
).with_config(run_name="load_history")
# 2. 如果需要,将历史注入到输入字典的指定键
messages_key = history_messages_key or input_messages_key
if messages_key:
history_chain = RunnablePassthrough.assign(
**{messages_key: history_chain}
).with_config(run_name="insert_history")
# 3. 为 runnable 附加退出监听器(保存历史)
runnable_sync = runnable.with_listeners(on_end=self._exit_history)
runnable_async = runnable.with_alisteners(on_end=self._aexit_history)
# 4. 组装完整管道
bound = (
history_chain
| RunnableLambda(_call_runnable_sync, _call_runnable_async)
.with_config(run_name="check_sync_or_async")
).with_config(run_name="RunnableWithMessageHistory")
flowchart LR
subgraph "RunnableWithMessageHistory 内部管道"
A["_enter_history: 加载历史消息"]
B["RunnablePassthrough.assign: 注入到 history_messages_key"]
C["被包装的 Runnable 执行"]
D["_exit_history: 保存输入和输出消息"]
end
INPUT[用户输入] --> A --> B --> C --> OUTPUT[输出]
C -.->|"on_end 监听器"| D
13.10.3 历史加载与保存
def _enter_history(self, value, config):
hist = config["configurable"]["message_history"]
messages = hist.messages.copy()
if not self.history_messages_key:
# 没有独立的历史键:将输入消息追加到历史消息后面
input_val = value if not self.input_messages_key else value[self.input_messages_key]
messages += self._get_input_messages(input_val)
return messages
def _exit_history(self, run, config):
hist = config["configurable"]["message_history"]
inputs = load(run.inputs, allowed_objects="all")
input_messages = self._get_input_messages(inputs)
if not self.history_messages_key:
# 去除已有的历史消息,避免重复
historic_messages = config["configurable"]["message_history"].messages
input_messages = input_messages[len(historic_messages):]
output_val = load(run.outputs, allowed_objects="all")
output_messages = self._get_output_messages(output_val)
hist.add_messages(input_messages + output_messages)
_exit_history 中的去重逻辑很重要:当 history_messages_key 未设置时,输入消息是历史消息 + 当前消息的组合。保存时需要去除历史部分,只保存当前轮次的新消息。
13.10.4 _get_input_messages 的 5 路输入规整
上面 _enter_history 和 _exit_history 都靠 _get_input_messages 把"任意形态的输入"归一成 list[BaseMessage]。这个函数是整个 RunnableWithMessageHistory 能兼容各种上游 Runnable 的基础——贴完整实现(langchain_core/runnables/history.py:443):
def _get_input_messages(self, input_val) -> list[BaseMessage]:
# ① dict:按约定抽 key
if isinstance(input_val, dict):
if self.input_messages_key:
key = self.input_messages_key
elif len(input_val) == 1:
key = next(iter(input_val.keys()))
else:
key = "input"
input_val = input_val[key]
# ② str → HumanMessage 自动包装
if isinstance(input_val, str):
return [HumanMessage(content=input_val)]
# ③ 单个 BaseMessage → 包成单元素 list
if isinstance(input_val, BaseMessage):
return [input_val]
# ④ Sequence 处理
if isinstance(input_val, (list, tuple)):
if len(input_val) == 0:
return list(input_val) # 空 list 短路
if isinstance(input_val[0], list): # batch unwrap
if len(input_val) != 1:
raise ValueError(f"Expected a single list of messages. Got {input_val}.")
return input_val[0]
return list(input_val)
# ⑤ 兜底错误
raise ValueError(f"Expected str, BaseMessage, list[BaseMessage], ... Got {input_val}.")
5 条路径各自对应一个真实用户场景:
① dict 路径的三级 key 回退:
self.input_messages_key显式指定——最严格- 单 key dict——约定优于配置:"如果你的 dict 只有一个 key、就取那个",省略配置
- 默认回退到
"input"——LCEL 中最常见的 key 名
这种"显式 > 约定 > 默认"的三级回退是整个 LangChain 的 API 设计模式——第 8 章 tool 注入、第 10 章 retriever 子类检测都是一样的思路。
② 字符串 → HumanMessage 自动包装:这让用户可以写 chain.invoke("hello") 而不是 chain.invoke({"input": HumanMessage(content="hello")})。是 第 3 章 LCEL 讲的 coerce_to_runnable 的对称操作——LCEL 尽可能不让用户关心"我的输入要包装成什么"。
③ 单 BaseMessage 包成单元素 list:同理,chain.invoke(HumanMessage(...)) 也 work。
④ Sequence 处理里的两个子 case:
- 空 list 短路(line 465-466):防止后面
input_val[0]访问空 list 越界。这一行极其微小但在"用户传空历史"场景下保命。 - "list of list" batch unwrap(line 469-473)——这是为了兼容 ChatModel 的 batch API:
chat_model.invoke([[msg1, msg2]])(batch size=1)的输入形态。内层才是真正的消息列表。如果 batch size > 1,整个RunnableWithMessageHistory的语义就不成立(每条消息对应不同会话?无法解释),抛错要求 batch=1。
⑤ 所有上述情况都没命中,兜底抛 ValueError 告诉用户接受的类型清单——作为使用文档的运行时版本。
13.10.5 _get_output_messages 里"直接 wrap ChatModel"的补丁
_get_output_messages 基本对称但有一个额外补丁(line 494-495):
# If you are wrapping a chat model directly
# The output is actually this weird generations object
if key not in output_val and "generations" in output_val:
output_val = output_val["generations"][0][0]["message"]
else:
output_val = output_val[key]
用户可以直接 RunnableWithMessageHistory(chat_model, get_session_history)——不接 prompt、不加 output parser。ChatModel 的返回是 {"generations": [[{"message": AIMessage(...)}]]} 这种 nested 结构(LCEL 内部表示)、不是普通 {"output": AIMessage(...)}。这个补丁检测到没 key 但有 generations 就按 ChatModel 原生结构抽消息。
双层索引 [0][0] 对应 ChatModel 的"每 batch 每 n(候选数)"输出维度——直接 wrap 时 batch=1、n=1、两层都是 0。注释原文 "this weird generations object" 是作者的自嘲——这个格式不优雅但是历史遗留、只能在这一处补丁处理。
这两个函数合起来是 "用 LCEL 的语法写兼容性极强的适配器" 的范本——15 行代码把 6 种输入形态、2 种输出形态规整为统一的 list[BaseMessage]。用户写 chain | model | output_parser 或 chain | model 或 model 或纯 lambda x: ...——RunnableWithMessageHistory 都能正确捕获消息。这是为什么它能作为 LangChain 推荐的"新 Memory"——不是功能更多、是适配面更广。
13.10.6 Session 管理
_merge_configs 方法处理 session 的创建和查找:
def _merge_configs(self, *configs):
config = super()._merge_configs(*configs)
expected_keys = [field_spec.id for field_spec in self.history_factory_config]
configurable = config.get("configurable", {})
if len(expected_keys) == 1:
message_history = self.get_session_history(configurable[expected_keys[0]])
else:
message_history = self.get_session_history(
**{key: configurable[key] for key in expected_keys}
)
config["configurable"]["message_history"] = message_history
return config
默认情况下,get_session_history 接受一个 session_id 参数。通过 history_factory_config 可以自定义参数(如 user_id + conversation_id):
# 默认:单一 session_id
chain_with_history = RunnableWithMessageHistory(
chain, get_session_history,
input_messages_key="question",
history_messages_key="history",
)
chain_with_history.invoke(
{"question": "..."},
config={"configurable": {"session_id": "abc123"}}
)
# 自定义:user_id + conversation_id
chain_with_history = RunnableWithMessageHistory(
chain, get_session_history,
input_messages_key="question",
history_messages_key="history",
history_factory_config=[
ConfigurableFieldSpec(id="user_id", annotation=str, ...),
ConfigurableFieldSpec(id="conversation_id", annotation=str, ...),
],
)
chain_with_history.invoke(
{"question": "..."},
config={"configurable": {"user_id": "user1", "conversation_id": "conv1"}}
)
13.10.7 get_session_history factory 的并发陷阱
get_session_history 是个工厂函数——返回某个 session_id 对应的 BaseChatMessageHistory 实例。官方示例经常写成:
store = {} # session_id → BaseChatMessageHistory
def get_session_history(session_id: str):
if session_id not in store:
store[session_id] = InMemoryChatMessageHistory()
return store[session_id]
这个"懒加载字典"模式在单进程单线程下完美——但生产 web server 经常是多线程 / 多进程 / 异步的——踩过三个坑:
- 字典竞态——两个线程同时收到
session_id="abc"的请求、都看到"abc" not in store、都 new 一个InMemoryChatMessageHistory——后写的赢、另一个实例的消息就此丢失。修复是加锁或者把store换成collections.defaultdict(InMemoryChatMessageHistory)——后者的__missing__由 Python 保证单次。 - 进程间不共享——
uvicorn --workers 4起 4 个 worker 进程、每个进程自己一份store——用户两次请求被 nginx 分到不同 worker、消息历史完全错乱。这是为什么生产必须用 Redis/PG 后端——store只能做本地开发/单元测试。 - Memory 泄漏——
store永不 evict——10 万 session × 50 条消息 × 2KB ≈ 10GB RSS——Python 进程 OOM 被 k8s 杀掉。修复是换成 LRU cache(functools.lru_cache+maxsize=10000)或外置存储。
源码佐证——RunnableWithMessageHistory._merge_configs 每次调用都会重新执行 get_session_history——这意味着工厂函数在调用热路径上——如果工厂内部逻辑重(比如每次查 PG 读 metadata)、每次 invoke 都要一次 PG RTT——需要工厂自己做缓存或用 connection pool。
13.11 从传统 Memory 到现代方案的迁移
13.11.1 迁移策略对照
| 传统 Memory | 现代替代方案 |
|---|---|
ConversationBufferMemory |
RunnableWithMessageHistory + InMemoryChatMessageHistory |
ConversationSummaryMemory |
RunnableWithMessageHistory + 自定义摘要逻辑 |
ConversationTokenBufferMemory |
RunnableWithMessageHistory + 消息裁剪 Runnable |
ConversationEntityMemory |
自定义 Runnable 实现实体提取 |
VectorStoreRetrieverMemory |
将向量检索作为 Runnable 步骤集成 |
13.11.2 设计决策:为何弃用传统 Memory
传统 Memory 系统存在几个根本性的限制:
- 与 Chain 耦合:
BaseMemory的save_context(inputs, outputs)签名假设了 Chain 的字典输入输出模式,无法适用于接受消息列表的 Chat Model。 - 不支持工具调用:源码注释中明确指出
BaseChatMemory"does NOT support native tool calling capabilities for chat models and will fail SILENTLY"。 - 隐式的状态管理:Memory 在
prep_inputs/prep_outputs中被隐式调用,调试困难。 - 缺乏流式支持:传统 Memory 无法与流式输出协同工作。
RunnableWithMessageHistory 通过以下方式解决了这些问题:
- 基于
BaseChatMessageHistory,直接操作消息对象 - 使用 Runnable 监听器(
on_end)保存上下文,与流式兼容 - 通过
RunnableConfig管理 session,显式而透明 - 支持任意输入输出格式的自动适配
13.11.3 LangGraph 的进一步演进
在 LangChain 的最新生态中,LangGraph 提供了更加强大的状态管理能力。LangGraph 的 StateGraph 可以管理任意复杂的对话状态,包括消息历史、中间变量、分支条件等。这代表了 AI 应用状态管理从"记忆注入"向"状态图"的范式转变。
13.11.4 langchain_classic/memory/ 真实源码布局:1.x → 0.x 的考古现场
当前版本装 langchain 后、旧 Memory 类都迁到了 libs/langchain/langchain_classic/memory/——"_classic" 是明示的 legacy 包名。这个目录的行数分布非常直观地说明了 LangChain 取舍的轨迹——
核心实现文件(libs/langchain/langchain_classic/memory/*.py、共 2157 行)——
| 文件 | 行 | 角色 |
|---|---|---|
entity.py |
611 | ConversationEntityMemory——最复杂的旧 Memory、近 30% |
vectorstore_token_buffer_memory.py |
183 | 向量 + token 预算组合 |
buffer.py |
173 | ConversationBufferMemory |
summary.py |
168 | ConversationSummaryMemory |
prompt.py |
164 | 5 个 default prompt template(entity extraction / summary / kg) |
summary_buffer.py |
148 | Summary + sliding window 组合 |
__init__.py |
126 | 公共导出表 |
vectorstore.py |
122 | VectorStoreRetrieverMemory |
chat_memory.py |
104 | BaseChatMemory 抽象(本章 §13.2 的主角) |
| 其余 | ≤ 85 | combined / token_buffer / buffer_window / simple / readonly / kg / zep / motorhead / utils |
后端壳子目录(chat_message_histories/、21 个文件)——除 in_memory.py(5 行)、sql.py(33 行)、singlestoredb.py(27 行)外、其余 18 个文件整齐地是 25 行——内容是纯重定向:
# redis.py(实测 25 行)
DEPRECATED_LOOKUP = {
"RedisChatMessageHistory": "langchain_community.chat_message_histories",
}
_import_attribute = create_importer(__package__, deprecated_lookups=DEPRECATED_LOOKUP)
def __getattr__(name): return _import_attribute(name)
__all__ = ["RedisChatMessageHistory"]
三条非显然的结论——
- 21 个后端里 20 个是 25 行模板壳子——真正实现全在
langchain_community包里——这是 LangChain 把**"核心抽象 + 多余依赖"**切开的关键一刀——装langchain核心只拉in_memory和sql、要 Redis/Mongo/DynamoDB 再装langchain_community - 核心实现里
entity.py611 行占 30%——ConversationEntityMemory 当年是想"让 LLM 维护一份结构化实体知识库"——但维护复杂度太高 + 效果不稳定、被 LangGraph 的 StateGraph 淘汰;现在仍在包里、主要是为了让老代码别崩 RunnableWithMessageHistory在 core 包、622 行——而所有 classic memory 在langchain_classic——反映了包边界策略:langchain_core保留通用机制(runnable + history 协议)、langchain_classic冻结不再演进的产物——这条切割让 langchain 的核心依赖保持极简
13.11.5 LangGraph Checkpoint:把会话状态抬升到"状态图"层级
§13.11.3 提到 LangGraph 的 StateGraph 是 classic Memory 的**"后代"——这里给出源码层面的证据**。LangGraph 不再用 BaseMemory 体系、而是 BaseCheckpointSaver 抽象(libs/checkpoint/langgraph/checkpoint/base/__init__.py、截至 main 分支 2026-04-22):
class BaseCheckpointSaver(ABC):
@abstractmethod
def put(self, config, checkpoint, metadata, new_versions) -> RunnableConfig: ...
@abstractmethod
def put_writes(self, config, writes, task_id) -> None: ...
@abstractmethod
def get_tuple(self, config) -> CheckpointTuple | None: ...
@abstractmethod
def list(self, config, *, filter=None, before=None, limit=None) -> Iterator[CheckpointTuple]: ...
# 生产扩展
def delete_thread(self, thread_id) -> None: ...
def copy_thread(self, src, dst) -> None: ...
def prune(self, retention_policy) -> None: ...
对比 BaseChatMessageHistory(§13.9.1)——同样是"持久化抽象"、语义差一个数量级:
| 维度 | BaseChatMessageHistory |
BaseCheckpointSaver |
|---|---|---|
| 持久化粒度 | 单条 BaseMessage |
整个 StateGraph 快照(Checkpoint TypedDict) |
| 主键 | session_id |
thread_id + checkpoint_id |
| 存的是什么 | 消息列表 | channel_values + channel_versions + ts + 父链 |
| 时光回溯 | 否 | 是(list + before 参数) |
| 中间写入 | 否 | put_writes——工具调用前写状态、避免失败重做 |
| 复制会话 | 需自行复制消息 | copy_thread |
| 过期策略 | 后端自己实现 | prune(retention_policy) 内建 |
put_writes 这个方法是分水岭——它让 LangGraph 支持中断恢复——工具调用到一半失败、下次从最后一次 put_writes 恢复、不用从 session 起点重跑。BaseChatMessageHistory 没这个概念——消息要么全写、要么不写、中间态不存在。
CheckpointTuple 的三元组——(checkpoint, config, metadata)——其中 metadata.parent_config 形成父子链表——类似 Git commit graph。同一个 thread_id 的多个 checkpoint 串成历史链、list(thread_id) 按时间倒序遍历——这就是时光回溯的底层结构:读取任意历史 checkpoint、从那点 invoke 就能"重开时间线"。
范式跃迁——
- Classic Memory 是线性的消息队列——"前 N 轮对话 + 现在输入"——是单维度、只向前、无分支的
- LangGraph Checkpoint 是图状快照流——"状态 = 多个 channel 的 值 + 版本号 + 父指针"——是多维度、可回溯、可分支的
第 15-18 章会详谈 LangGraph 的 StateGraph——理解本章后面那些 classic memory 的局限性后、再看 StateGraph 的状态机建模会豁然开朗。对于生产环境、新项目应直接用 LangGraph——Memory 体系仅作为老项目维护参考。
13.11.6 选型决策树:三类场景对号入座
综合本章所有讨论——给三类典型场景的技术选型建议:
场景 A:单轮 FAQ / 客服机器人(无跨会话状态)
- 首选:
RunnableWithMessageHistory+InMemoryChatMessageHistory(开发)/RedisChatMessageHistory(生产)+ConversationBufferWindowMemory(k=5)等价逻辑 - 理由:最轻、最便宜、0 LLM 开销;客服问题上下文依赖浅、5 轮足够
- 规避:不要用 Entity/VectorStore——过度工程
场景 B:多轮长对话 / 角色扮演 agent(需跨 session 连续性)
- 首选:LangGraph + PostgresCheckpointSaver
- 理由:需要时光回溯(用户说"回到上一轮")、需要中断恢复(工具失败重试)、需要多租户隔离(
thread_id级) - 次选:
RunnableWithMessageHistory+ConversationSummaryBufferMemory——如果坚持不上 LangGraph - 规避:不用
ConversationEntityMemory——幻觉累积风险
场景 C:长期知识库 + 对话(需"记住用户画像")
- 首选:LangGraph 状态机 + 独立 RAG 向量库——对话 state 和用户画像 state 用 typed channel 分离、画像修改通过显式 tool 调用
- 规避:不要用
VectorStoreRetrieverMemory把对话和画像混存——自举污染
三条铁律——
- dev 用 InMemory、prod 必须外置存储——
InMemoryChatMessageHistory永远不要上生产、进程重启就丢 - session_id 必须不可猜测——
uuid4()最低要求、多租户场景升级到 HMAC 签名 - 任何 LLM 调用都要超时兜底——
ConversationSummaryMemory/ConversationEntityMemory每轮都调 LLM、超时/429 要有降级路径(跳过摘要、直接存原文)——否则 Memory 变成对话链路的单点故障
13.12 设计模式总结
13.12.1 策略模式
不同的记忆策略(Buffer/Summary/TokenBuffer/Entity/VectorStore)是策略模式的典型应用。它们都实现了 BaseMemory 接口,可以互换使用而不影响 Chain 的逻辑。
13.12.2 存储分离
记忆策略与存储后端的分离是一个优秀的设计。BaseChatMemory 通过 chat_memory: BaseChatMessageHistory 属性将消息存储委托出去。ConversationEntityMemory 通过 entity_store: BaseEntityStore 将实体存储委托出去。这使得同一种策略可以搭配不同的存储后端。
flowchart TD
subgraph 记忆策略层
BM[ConversationBufferMemory]
SM[ConversationSummaryMemory]
TBM[ConversationTokenBufferMemory]
EM[ConversationEntityMemory]
end
subgraph 存储抽象层
CMH[BaseChatMessageHistory]
ES[BaseEntityStore]
end
subgraph 存储实现层
INMEM[InMemory]
REDIS[Redis]
PG[PostgreSQL]
SQLITE[SQLite]
end
BM --> CMH
SM --> CMH
TBM --> CMH
EM --> CMH
EM --> ES
CMH --> INMEM
CMH --> REDIS
CMH --> PG
ES --> INMEM
ES --> REDIS
ES --> SQLITE
13.12.3 渐进式上下文管理
五种记忆策略代表了上下文管理的五个层次:
- 完整保留(Buffer):最简单,适合短对话
- 按量裁剪(TokenBuffer):控制成本,保留最近的上下文
- 摘要压缩(Summary):用 LLM 压缩历史,保留全局理解
- 实体提取(Entity):结构化知识管理,适合需要跟踪多个话题的场景
- 语义检索(VectorStore):无限容量的长期记忆,按相关性召回
13.12.4 抽象—实现矩阵:本章所有协议全景
把本章涉及的所有抽象接口和它们的实现汇总到一张表——方便读者在后续章节反查:
| 抽象层 | 包位置 | 核心方法 | 本章小节 | 下游用法 |
|---|---|---|---|---|
BaseMemory |
langchain_classic/memory/ |
load_memory_variables / save_context / clear |
§13.1 | Chain.memory 字段(已弃用) |
BaseChatMemory |
同上 | + chat_memory + return_messages |
§13.2 | 绝大多数 classic Memory 的基类 |
BaseEntityStore |
memory/entity.py |
get / set / delete / exists / clear |
§13.7.2 | ConversationEntityMemory.entity_store |
BaseChatMessageHistory |
langchain_core/chat_history |
add_messages / messages / clear |
§13.9.1 | RunnableWithMessageHistory 依赖 |
BaseRetriever |
langchain_core/retrievers |
invoke / add_documents |
第 10 章 | VectorStoreRetrieverMemory 依赖 |
BaseCheckpointSaver |
langgraph/checkpoint |
put / put_writes / get_tuple / list |
§13.11.5 | LangGraph StateGraph 依赖 |
分层策略的精髓——同一层抽象保持窄接口、多实现覆盖存储后端——比如 BaseChatMessageHistory 只有 3 个核心方法、却有 21 个后端实现(Redis、PG、Mongo、DynamoDB、Cosmos、Firebase……)——用户按部署环境选后端、代码零修改。这是"依赖倒置"在 Python 动态语言下的典范落地。
13.13 小结
LangChain 的记忆系统展现了 AI 应用中会话状态管理的完整图景。BaseMemory 通过三个抽象方法(load_memory_variables、save_context、clear)定义了清晰的记忆契约,与 Chain 基类的 prep_inputs/prep_outputs 形成了无缝的集成。
五种具体的记忆实现覆盖了从简单到复杂的全部需求:ConversationBufferMemory 适合短对话,ConversationTokenBufferMemory 精确控制 token 预算,ConversationSummaryMemory 通过 LLM 实现了信息压缩,ConversationEntityMemory 维护了结构化的实体知识库,VectorStoreRetrieverMemory 实现了基于语义相似度的长期记忆检索。
存储层通过 BaseChatMessageHistory 和 BaseEntityStore 两个抽象实现了策略与存储的分离,支持从内存到 Redis、PostgreSQL、DynamoDB 等多种后端。
RunnableWithMessageHistory 代表了现代的会话管理方式,它基于 LCEL 的 Runnable 协议,通过配置化的 session 管理和自动的消息加载/保存,提供了比传统 Memory 更灵活、更透明的解决方案。尽管传统 Memory 类已被标记为弃用,但它们所体现的设计模式 -- 策略选择、存储分离、渐进式上下文管理 -- 在现代方案中依然适用。
更进一步、LangGraph 的 BaseCheckpointSaver 把整个对话状态(不只是消息列表)提升为持久化单元——支持时光回溯、中断恢复、多分支——这是 AI 应用状态管理的下一代范式。本章讨论的所有 classic Memory 类、从框架演进史的角度是宝贵的教训——它们用各自的方式回答了"如何向 LLM 注入历史上下文"这个问题、也用各自的方式暴露了局限——只有理解了这些局限(prompt 侵入性、幻觉累积、session 并发、存储耦合)、才能在 LangGraph 时代做出正确的架构选型。
后续阅读推荐——第 14 章 Agent / 第 15-18 章 LangGraph / 第 10 章 RAG / 第 19 章生产部署——这四章从不同侧面延伸了"记忆"这个主题:Agent 章节讲如何记住工具调用历史链、LangGraph 章节用 state 通道建模一切会话上下文、RAG 章节用检索扩展长期记忆、生产部署章节关心 session 级 SLA 与多租户隔离。记忆不是孤立的模块、是 AI 应用的系统级主题——贯穿整个框架。