LangGraph 设计与实现
第15章 Store 与长期记忆
第15章 Store 与长期记忆
15.1 引言
Checkpoint 是 LangGraph 的”短期记忆”——它保存单个线程(thread)内的完整状态历史,支持时间旅行和中断恢复。但在许多 LLM 应用中,我们还需要一种”长期记忆”:跨线程、跨对话的持久化存储。例如,用户的偏好设置在所有对话中都应该可用;一个文档分析 Agent 应该记住之前处理过的文档;多个 Agent 之间需要共享知识库。
LangGraph 通过 BaseStore 接口和 InMemoryStore 实现提供了这种长期记忆能力。Store 是一个层级化的键值存储,支持命名空间(namespace)组织、精确查询和可选的向量语义搜索。它通过 Runtime.store 注入到节点函数中,也可以在工具函数中通过 ToolRuntime.store 访问。
本章将从 BaseStore 的抽象接口出发,分析 Item 数据模型、操作类型(Get/Put/Search/ListNamespaces)的设计,深入 InMemoryStore 的实现细节,并对比 Store 与 Checkpoint 在定位、生命周期和使用场景上的本质区别。
本章要点
BaseStore接口的操作原语——Get、Put、Search、ListNamespaces 四种操作Item数据模型——key/value/namespace/created_at/updated_atInMemoryStore的实现——字典存储 + 可选向量搜索- Store 与 Checkpoint 的区别——跨线程 vs 线程内、键值 vs 快照
runtime.store的访问模式——从节点和工具中使用 Store
15.2 BaseStore 接口
15.2.1 接口定义
BaseStore 定义在 langgraph/store/base/__init__.py 中,是一个抽象基类,定义了 Store 的全部操作原语:
class BaseStore(ABC):
"""Abstract base class for persistent key-value stores."""
@abstractmethod
def batch(self, ops: Iterable[Op]) -> list[Result]:
"""Execute a batch of operations."""
@abstractmethod
async def abatch(self, ops: Iterable[Op]) -> list[Result]:
"""Async version of batch."""
# 便捷方法(基于 batch 实现)
def get(self, namespace: tuple[str, ...], key: str) -> Item | None: ...
def put(self, namespace: tuple[str, ...], key: str, value: dict) -> None: ...
def delete(self, namespace: tuple[str, ...], key: str) -> None: ...
def search(self, namespace_prefix: tuple[str, ...], **kwargs) -> list[Item]: ...
def list_namespaces(self, **kwargs) -> list[tuple[str, ...]]: ...
# 对应的 async 版本
async def aget(self, ...) -> Item | None: ...
async def aput(self, ...) -> None: ...
async def adelete(self, ...) -> None: ...
async def asearch(self, ...) -> list[Item]: ...
async def alist_namespaces(self, ...) -> list[tuple[str, ...]]: ...
所有便捷方法最终都委托给 batch 或 abatch 方法。这种设计的好处是实现类只需要实现两个方法就能获得完整的接口,同时 batch 方法天然支持操作批量化,有利于网络 I/O 优化。
15.2.2 操作类型
Store 定义了四种操作原语,每种都是一个 NamedTuple:
graph TB
subgraph 操作类型
GetOp["GetOp<br/>namespace + key<br/>精确查找单个 Item"]
PutOp["PutOp<br/>namespace + key + value<br/>创建或更新 Item"]
SearchOp["SearchOp<br/>namespace_prefix + filter/query<br/>搜索多个 Item"]
ListNS["ListNamespacesOp<br/>match_conditions<br/>列举命名空间"]
end
GetOp -->|返回| Item["Item | None"]
PutOp -->|返回| None_["None"]
SearchOp -->|返回| Items["list[SearchItem]"]
ListNS -->|返回| NS["list[tuple[str, ...]]"]
15.2.3 GetOp
class GetOp(NamedTuple):
namespace: tuple[str, ...]
key: str
refresh_ttl: bool = True
GetOp 通过 namespace + key 精确定位一个 Item。refresh_ttl 控制是否刷新该 Item 的 TTL(存活时间)。
15.2.4 PutOp
class PutOp(NamedTuple):
namespace: tuple[str, ...]
key: str
value: dict[str, Any] | None # None 表示删除
index: list[str] | Literal[False] | None = None
PutOp 用于创建、更新或删除 Item。当 value 为 None 时表示删除。index 字段控制哪些值路径需要被向量索引。
15.2.5 SearchOp
class SearchOp(NamedTuple):
namespace_prefix: tuple[str, ...]
filter: dict[str, Any] | None = None
limit: int = 10
offset: int = 0
query: str | None = None
refresh_ttl: bool = True
SearchOp 是最灵活的查询操作。它支持三种过滤方式:
- 命名空间前缀:只搜索指定前缀下的 Item
- 结构化过滤:通过
filter字典做精确匹配或比较操作 - 语义搜索:通过
query字符串做向量相似度搜索
过滤操作支持多种比较运算符:
# 精确匹配
{"status": "active"}
# 比较运算符
{"score": {"$gt": 4.99}}
{"score": {"$gte": 3.0}}
{"priority": {"$lt": 5}}
{"count": {"$lte": 100}}
{"status": {"$ne": "deleted"}}
15.3 Item 数据模型
15.2.6 BaseStore 的两个被忽视的类级属性:supports_ttl 与 weakref
打开 libs/checkpoint/langgraph/store/base/__init__.py:700,BaseStore 类头定义里藏着两个教学资料极少提到的类级变量:
class BaseStore(ABC):
supports_ttl: bool = False
ttl_config: TTLConfig | None = None
__slots__ = ("__weakref__",)
1、supports_ttl: bool = False 是一个 feature capability flag。子类必须显式覆写为 True 才能”声称支持 TTL”。InMemoryStore 没覆写——所以它的 supports_ttl 读出来是 False——即使它的 put/get 接口接受 ttl 参数。这不是 API 不一致,而是一条明确契约:InMemoryStore 接受 TTL 参数但不会真的执行过期回收,用户代码如果依赖 TTL 语义必须用 Postgres/Sqlite 后端。Python 里没有 trait/interface 支持特性检查,用类属性模拟出来——使用方法:
store = InMemoryStore()
assert store.supports_ttl is False # 静态查询能力
2、__slots__ = ("__weakref__",) 有两层用意:
- 禁止动态添加实例属性。如果子类定义了
__slots__但基类没有,就会破坏 slots 隔离(实例会有__dict__)——所以基类也要声明自己的 slots。 - 显式保留
__weakref__。slot 类默认不支持弱引用(weakref.ref(store)会抛 TypeError),把"__weakref__"放进 slots 恢复这个能力。LangGraph 在langgraph.pregel.main.Pregel里对 store 会持弱引用(避免循环引用导致 GC 不回收)——因此 BaseStore 必须支持 weakref。
看似不起眼的一行代码背后是”内存布局约束 + 弱引用生态互通”的双重考虑。这条实践在 Python 库开发里值得记住:写基类用 __slots__,但永远别忘了加 "__weakref__"——否则库会被在复杂场景下使用时出现”some_type does not support weak references”的诡异错误。
15.3 Item 数据模型
15.3.0 _validate_namespace:被 Pregel 在编译期强加的四条铁律
Store 接口接受的 namespace 是 tuple[str, ...]——Python 里一个普通元组没有格式要求。但 BaseStore.put/BaseStore.search 第一行都会调 _validate_namespace(namespace)(store/base/__init__.py:911, 1172)。这个校验函数(第 1255 行)把四条看似平常的规则写死在了框架里:
def _validate_namespace(namespace: tuple[str, ...]) -> None:
if not namespace:
raise InvalidNamespaceError("Namespace cannot be empty.")
for label in namespace:
if not isinstance(label, str):
raise InvalidNamespaceError(...)
if "." in label:
raise InvalidNamespaceError(
f"Invalid namespace label '{label}' found in {namespace}. "
f"Namespace labels cannot contain periods ('.')."
)
elif not label:
raise InvalidNamespaceError(
f"Namespace labels cannot be empty strings. Got {label} in {namespace}"
)
if namespace[0] == "langgraph":
raise InvalidNamespaceError(
f'Root label for namespace cannot be "langgraph". Got: {namespace}'
)
四条规则各有源自真实后端的工程动机:
规则 1:namespace 不能为空。空 tuple () 作为 “根 namespace” 在语义上模糊——到底是”存到顶层”还是”查所有”?LangGraph 选择拒绝以消除歧义。你想存顶层文档应该写 ("documents",) 这种至少有一层的 namespace。
规则 2:label 必须是 str。限制这条是因为 PostgresStore/SqliteStore 会把 namespace 拼接成一条 ltree 路径或字符串主键——tuple 里出现 int/UUID 会让拼接不稳定。统一要求 str 让所有后端序列化方式一致。
规则 3:label 不能含 .。点号是 Postgres ltree 类型的分隔符——如果允许 ("users.alice", "prefs"),Postgres 存成 users.alice.prefs、查询时会被误解析为三层命名空间。InMemory/Sqlite 自己不会有这个问题,但为了跨后端可移植性,校验器统一禁止。用户需要在 label 里表达多段信息时应该用下划线或连字符(("users", "alice_corp")),而不是点号。
规则 4:根 label 不能是 "langgraph"。这是 LangGraph 自己预留的保留字——框架内部可能会用 ("langgraph", ...) 前缀存储系统元数据(比如 checkpointer 的迁移状态、Pregel 的编译缓存)。用户代码如果写进 ("langgraph", ...) 会发生谁覆盖谁都说不清楚——所以预先禁止。
这四条校验看似冗余、实则决定了 Store 在多后端场景下的一致性。在接口层做防御校验、在实现层假设输入已经合法——这是分层架构里”迟一行校验、早一层信任”的经典操作。用户实际代码被拒时错误消息非常精确(点名哪个 label 有问题),回到第 13 章讨论 Checkpoint 的 CheckpointError 时我们说过这种”可定位的输入错误”是 LangGraph 设计哲学的一贯特征。
15.3.1 结构定义
class Item:
"""Represents a stored item with metadata."""
__slots__ = ("value", "key", "namespace", "created_at", "updated_at")
def __init__(
self,
*,
value: dict[str, Any],
key: str,
namespace: tuple[str, ...],
created_at: datetime,
updated_at: datetime,
):
self.value = value
self.key = key
self.namespace = tuple(namespace)
self.created_at = (
datetime.fromisoformat(cast(str, created_at))
if isinstance(created_at, str)
else created_at
)
self.updated_at = (
datetime.fromisoformat(cast(str, updated_at))
if isinstance(updated_at, str)
else updated_at
)
Item 使用 __slots__ 优化内存,值类型固定为 dict[str, Any]。时间戳字段支持从 ISO 8601 字符串反序列化——这是为了兼容 JSON 存储后端。
15.3.2 命名空间的层级设计
graph TB
Root["()"]
Root --> Users["('users',)"]
Root --> Docs["('docs',)"]
Users --> U1["('users', 'alice')"]
Users --> U2["('users', 'bob')"]
U1 --> Prefs["('users', 'alice', 'prefs')"]
U1 --> History["('users', 'alice', 'history')"]
Docs --> D1["('docs', 'project_a')"]
style Root fill:#f9f9f9,stroke:#333
style Users fill:#e6f3ff,stroke:#333
style Docs fill:#fff3e6,stroke:#333
命名空间是一个字符串元组,形成类似文件系统的层级结构。这种设计的优势:
- 组织清晰:自然地按用户、项目、类型分层
- 搜索高效:通过
namespace_prefix可以搜索某个子树下的所有 Item - 访问控制:可以基于命名空间前缀实现权限隔离
15.3.3 SearchItem
class SearchItem(Item):
"""Represents an item returned from a search operation."""
__slots__ = ("score",)
def __init__(self, ..., score: float | None = None):
super().__init__(...)
self.score = score
SearchItem 继承自 Item,增加了 score 字段,用于向量搜索时返回相似度分数。
15.4 InMemoryStore 实现
15.4.1 数据结构
class InMemoryStore(BaseStore):
__slots__ = ("_data", "_vectors", "index_config", "embeddings")
def __init__(self, *, index: IndexConfig | None = None) -> None:
self._data: dict[tuple[str, ...], dict[str, Item]] = defaultdict(dict)
self._vectors: dict[tuple[str, ...], dict[str, dict[str, list[float]]]] = (
defaultdict(lambda: defaultdict(dict))
)
self.index_config = index
if self.index_config:
self.embeddings = ensure_embeddings(self.index_config.get("embed"))
InMemoryStore 使用两层嵌套字典:外层按命名空间分组,内层按 key 索引。向量数据单独存储在 _vectors 字典中。
graph LR
subgraph "_data 存储结构"
NS1["('users', 'alice')"] --> K1["'prefs' -> Item"]
NS1 --> K2["'profile' -> Item"]
NS2["('docs',)"] --> K3["'doc1' -> Item"]
end
subgraph "_vectors 向量索引"
VNS1["('docs',)"] --> VK1["'doc1'"]
VK1 --> VP1["'text' -> [0.1, 0.2, ...]"]
end
15.4.1-bis batch 入口:三型拆分到三种处理路径
InMemoryStore.batch(memory/init.py:206)的真实实现只有 14 行,但把 Store 所有”批量做什么”的逻辑摊在一个函数里:
def batch(self, ops: Iterable[Op]) -> list[Result]:
# The batch/abatch methods are treated as internal.
# Users should access via put/search/get/list_namespaces/etc.
results, put_ops, search_ops = self._prepare_ops(ops)
if search_ops:
queryinmem_store = self._embed_search_queries(search_ops)
self._batch_search(search_ops, queryinmem_store, results)
to_embed = self._extract_texts(put_ops)
if to_embed and self.index_config and self.embeddings:
embeddings = self.embeddings.embed_documents(list(to_embed))
self._insertinmem_store(to_embed, embeddings)
self._apply_put_ops(put_ops)
return results
_prepare_ops 把传入的 ops 按类型分成三路(memory/init.py:375):
for i, op in enumerate(ops):
if isinstance(op, GetOp):
item = self._data[op.namespace].get(op.key)
results.append(item) # ← 立即返回
elif isinstance(op, SearchOp):
search_ops[i] = (op, self._filter_items(op))
results.append(None) # ← 占位,稍后填
elif isinstance(op, ListNamespacesOp):
results.append(self._handle_list_namespaces(op)) # ← 立即返回
elif isinstance(op, PutOp):
put_ops[(op.namespace, op.key)] = op
results.append(None) # ← 占位
else:
raise ValueError(f"Unknown operation type: {type(op)}")
这套分型有两条工程巧思值得细读:
1、put_ops 用 (namespace, key) 作 dict 的 key——天然去重。如果同一个 batch 里出现了两条针对同一 (ns, key) 的 PutOp,后者覆盖前者,只有一条最终被执行。这避免了”同批次重复 put 的顺序敏感性”——用户不用担心批量插入时同 key 操作的先后。
2、SearchOp 和 PutOp 都先占 None 返回位,最后才填。这是为了让 embedding 调用成批进行。如果一个 batch 里同时有 5 条 SearchOp 需要 embed query 和 3 条 PutOp 需要 embed value——各走各的 embedding 调用是 8 次 RPC;_embed_search_queries + _extract_texts 统一 embed 是 2 次 RPC(search 的 5 条一起、put 的 3 条一起)——把 embedding 成本压到最低,这对 LLM embedding 收费 API 非常关键。
从这个设计能反推出 LangGraph Store 的一条核心哲学:API 层暴露细粒度操作、实现层成批优化。用户随便调 store.get()/store.put()——接口简单、不用管批量;但如果你写 store.batch([...]) 传入复杂操作集,实现层会自动找到 I/O 和计算的聚合点。
15.4.2 基本操作示例
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
# 存储用户偏好
store.put(("users", "alice"), "prefs", {"theme": "dark", "language": "zh"})
# 读取
item = store.get(("users", "alice"), "prefs")
print(item.value) # {"theme": "dark", "language": "zh"}
print(item.key) # "prefs"
print(item.namespace) # ("users", "alice")
# 搜索某用户下的所有数据
results = store.search(("users", "alice"))
# 列举所有命名空间
namespaces = store.list_namespaces(prefix=("users",))
# [("users", "alice"), ("users", "bob"), ...]
# 删除
store.put(("users", "alice"), "prefs", None) # value=None 表示删除
15.4.2-bis _apply_put_ops 删除时同时清理 _data 和 _vectors
value=None 表示删除这个约定比看起来要多一层。真实的 _apply_put_ops(memory/init.py:404):
def _apply_put_ops(self, put_ops):
for (namespace, key), op in put_ops.items():
if op.value is None:
self._data[namespace].pop(key, None)
self._vectors[namespace].pop(key, None) # ← 同时清向量索引
else:
self._data[namespace][key] = Item(
value=op.value, ...
)
注意删除时必须同时 pop _data 和 _vectors。如果只清 _data,后续向量搜索仍会匹配到”已经被删掉但还有 embedding 的 key”——返回一个 namespace+key 指向空数据的”僵尸 SearchItem”。
相反的,向更新 put 的路径没有清 _vectors——这是有意的。更新 value 后,同 key 的 embedding 会由 _extract_texts + _insertinmem_store 重新计算并覆盖掉旧向量——所以此时不必先清。但如果你把 _data 存的 value 从 “有 text 字段” 改成 “没 text 字段”,旧的 embedding 会继续存活(因为新 value 里没 text,不触发 re-embed)——这是一个 InMemoryStore 的已知边界行为,生产 Postgres Store 通过显式的 DELETE FROM ... WHERE 语句避免这个问题。
实际使用这条知识的价值:如果你在 InMemoryStore 上做了”从 {text: "foo"} 改成 {}”的 put,语义上那个 Item 应该不可搜索了,但向量搜索仍能用旧 embedding 找到它。要避免,应该先 put(ns, key, None) 删除、再重新 put 新 value。
15.4.3 向量搜索配置
from langgraph.store.memory import InMemoryStore
# 使用 LangChain embeddings
from langchain.embeddings import init_embeddings
store = InMemoryStore(
index={
"dims": 1536,
"embed": init_embeddings("openai:text-embedding-3-small"),
"fields": ["text"], # 指定要索引的字段
}
)
# 存储文档
store.put(("docs",), "doc1", {"text": "Python 编程入门教程", "author": "Alice"})
store.put(("docs",), "doc2", {"text": "TypeScript 类型系统详解", "author": "Bob"})
# 语义搜索
results = store.search(("docs",), query="编程语言教程")
for item in results:
print(f"{item.key}: {item.value['text']} (score={item.score})")
向量搜索也支持使用原生的 OpenAI SDK 或任意嵌入函数:
from openai import OpenAI
client = OpenAI()
def embed_texts(texts: list[str]) -> list[list[float]]:
response = client.embeddings.create(
model="text-embedding-3-small",
input=texts
)
return [e.embedding for e in response.data]
store = InMemoryStore(
index={"dims": 1536, "embed": embed_texts}
)
15.4.4 _batch_search 的四段语义:flatten → score → max-pool → 分页
InMemoryStore 的向量搜索核心逻辑在 _batch_search(memory/init.py:302)。逐段翻译它做什么:
# 阶段 1:展平 (item, vectors) 为 (item, single_vector) 列表
flat_items, flat_vectors = [], []
scoreless = []
for item, vectors in candidates:
for vector in vectors:
flat_items.append(item)
flat_vectors.append(vector)
if not vectors:
scoreless.append(item) # ← 没有 embedding 的 item 单独记下
# 阶段 2:一次性计算所有 (query, vector) 的余弦相似度
scores = _cosine_similarity(query_embedding, flat_vectors)
# 阶段 3:按分数排序
sorted_results = sorted(
zip(scores, flat_items, strict=False),
key=lambda x: x[0],
reverse=True,
)
# 阶段 4:max pooling——同一个 (namespace, key) 只保留分最高的那份
seen = set()
kept = []
for score, item in sorted_results:
key = (item.namespace, item.key)
if key in seen:
continue
ix = len(seen)
seen.add(key)
if ix >= op.offset + op.limit:
break
if ix < op.offset:
continue
kept.append((score, item))
if scoreless and len(kept) < op.limit:
# Corner case: fill tail with non-scored items
kept.extend((None, item) for item in scoreless[: op.limit - len(kept)])
四个设计点值得细读:
1、多字段索引的”每字段一条向量”。如果 index.fields = ["text", "summary"],一个 Item 会产生 两条向量(text 一条、summary 一条)。flat_items 在 flatten 阶段会把同一 Item 塞进去两遍(每个 vector 对应一行)。
2、max pooling 消重。多字段产生的多个向量在分数上互相独立——同一 Item 可能同时以 text_score=0.85 和 summary_score=0.90 两条身份出现。seen set 按 (namespace, key) 去重——只保留得分最高的那一条。sorted_results 是降序排的,所以遍历时第一次遇到一个 key 就是最高分,后续全 skip。这就是 “max pooling” 的字面含义——把多向量 collapse 到单 Item 时取最大相似度。
3、offset/limit 在去重之后。注意 ix = len(seen) 统计的是去重后的序号——offset=5 表示”跳过前 5 个不同 Item”,而不是”跳过前 5 个向量”。这是正确的语义(用户写 limit=10 期望拿到 10 个不同 Item),但实现成本比单纯向量排序高——必须遍历完所有 sorted_results 再根据序号决定要不要 keep。
4、scoreless 兜底。没有任何向量的 Item(比如 index.fields=["text"] 但某 Item 没有 text 字段)在 filter 后仍是候选,但没得分。如果用户 limit=10 但有得分的 Item 只有 3 个,就从 scoreless 里补 7 个(score=None)填满。设计理由是”用户显式指定了 limit 就要尽量返回那么多”——哪怕部分 item 没相似度分。这对”先粗筛再人工 rerank”的流程很友好。
15.4.5 _cosine_similarity:numpy 可选 + 纯 Python 回退
向量相似度计算函数(memory/init.py:493)同时提供 numpy 优化路径和纯 Python 兜底:
def _cosine_similarity(X, Y):
if not Y:
return []
if _check_numpy():
import numpy as np
X_arr = np.array(X) if not isinstance(X, np.ndarray) else X
Y_arr = np.array(Y) if not isinstance(Y, np.ndarray) else Y
X_norm = np.linalg.norm(X_arr)
Y_norm = np.linalg.norm(Y_arr, axis=1)
# Avoid division by zero
mask = Y_norm != 0
similarities = np.zeros_like(Y_norm)
similarities[mask] = np.dot(Y_arr[mask], X_arr) / (Y_norm[mask] * X_norm)
return similarities.tolist()
# 纯 Python fallback
similarities = []
for y in Y:
dot_product = sum(a * b for a, b in zip(X, y, strict=False))
norm1 = sum(a * a for a in X) ** 0.5
norm2 = sum(a * a for a in y) ** 0.5
similarity = dot_product / (norm1 * norm2) if norm1 > 0 and norm2 > 0 else 0.0
similarities.append(similarity)
return similarities
四条教学点:
1、numpy 是可选依赖。LangGraph 的 checkpoint 子包不强依赖 numpy,Store 用懒检测 _check_numpy()(延迟 import 并捕获 ImportError)。这让最小化部署(比如 serverless 场景)可以不装 numpy——代价是向量搜索慢一个数量级,但对开发/测试 InMemoryStore 可忽略。
2、numpy 路径里的 mask。mask = Y_norm != 0——把”所有维度为 0 的向量”排除。这些向量不能 normalize(除 0),让它们的相似度直接是 0。纯 Python 路径的 if norm1 > 0 and norm2 > 0 else 0.0 是同样的保护,只是语法看起来不像矩阵运算。
3、zip(X, y, strict=False)。Python 3.10+ 新增 strict 参数——True 时要求两侧长度一致、不等就 raise。这里传 False 是允许长度不同(虽然 embedding 场景下应该永远相等),要在 Python 3.9 兼容性和严格校验之间做权衡。如果你的生产代码要求严格校验,建议改 strict=True 以便在 embedding 维度不匹配时立刻炸出来(而不是默默计算出错误相似度)。
4、双实现路径的维护成本。一份 numpy、一份纯 Python——两条路径必须语义一致。这份代码很短所以好维护,但如果你给自己的系统写类似的双路径,记得写交叉测试:同一组输入、两条路径结果应该在浮点容差内相等。否则会出现”开发环境没装 numpy 结果 A、生产环境装了 numpy 结果 B”的诡异分叉。
15.5 Store 与 Checkpoint 的区别
15.5.1 核心对比
graph TB
subgraph "Checkpoint(检查点)"
direction TB
CP1["线程级别<br/>每个 thread_id 独立"]
CP2["快照语义<br/>保存完整状态"]
CP3["自动管理<br/>框架在每步保存"]
CP4["时间旅行<br/>可回溯到任意步骤"]
CP5["序列化格式<br/>Channel 值的快照"]
end
subgraph "Store(存储)"
direction TB
ST1["全局级别<br/>跨所有 thread 共享"]
ST2["键值语义<br/>namespace + key 定位"]
ST3["手动管理<br/>节点显式读写"]
ST4["持久化存储<br/>独立于图执行"]
ST5["结构化数据<br/>dict[str, Any] 值"]
end
| 维度 | Checkpoint | Store |
|---|---|---|
| 作用域 | 单个线程(thread_id) | 全局跨线程 |
| 数据模型 | Channel 值的完整快照 | namespace/key/value 键值对 |
| 管理方式 | 框架自动保存 | 开发者显式操作 |
| 时间语义 | 有序历史(可时间旅行) | 最新值(无历史) |
| 访问方式 | get_state() / get_state_history() | runtime.store.get/put/search |
| 典型用途 | 对话上下文、中断恢复 | 用户偏好、知识库、跨对话记忆 |
15.5.2 互补关系
Checkpoint 和 Store 不是替代关系,而是互补的:
def personalized_chat(state: State, runtime: Runtime) -> State:
# 从 Checkpoint 恢复的状态中获取对话历史(线程内)
messages = state["messages"]
# 从 Store 中获取用户偏好(跨线程)
prefs = runtime.store.get(
("users", runtime.context.user_id), "preferences"
)
# 结合两者生成响应
prompt = f"User prefers {prefs.value['style']}" if prefs else ""
response = llm.invoke(messages + [SystemMessage(content=prompt)])
# 更新 Store 中的记忆(跨线程持久化)
runtime.store.put(
("users", runtime.context.user_id, "memory"),
f"conv_{runtime.execution_info.thread_id}",
{"summary": summarize(messages)}
)
return {"messages": [response]}
15.5.3 数据流对比
sequenceDiagram
participant User as 用户
participant Graph as 图执行
participant CP as Checkpoint
participant Store as Store
Note over CP: 线程 A 的状态
User->>Graph: 开始对话(线程 A)
Graph->>CP: 保存每步状态
Graph->>Store: 保存用户偏好
Graph-->>User: 响应
Note over CP: 线程 B 的状态
User->>Graph: 新对话(线程 B)
Graph->>CP: 读取线程 B 的状态(空)
Graph->>Store: 读取用户偏好(跨线程!)
Note over Graph: 用户偏好从线程 A 延续到线程 B
Graph-->>User: 个性化响应
15.6 runtime.store 访问模式
15.6.1 在节点中使用
def my_node(state: State, runtime: Runtime) -> State:
if runtime.store is None:
# 没有配置 Store,降级处理
return {"result": "default"}
# 读取
item = runtime.store.get(("config",), "settings")
# 写入
runtime.store.put(
("results", state["session_id"]),
"analysis",
{"score": 0.95, "category": "positive"}
)
# 搜索
similar = runtime.store.search(
("knowledge",),
query=state["question"],
limit=5
)
return {"result": [s.value for s in similar]}
15.6.2 在工具中使用
通过 ToolRuntime 或 InjectedStore 注解:
from langchain_core.tools import tool
from langgraph.prebuilt import ToolRuntime
@tool
def save_memory(content: str, runtime: ToolRuntime) -> str:
"""保存一段记忆到长期存储"""
runtime.store.put(
("memories",),
f"mem_{hash(content)}",
{"content": content, "timestamp": datetime.now().isoformat()}
)
return f"Memory saved: {content[:50]}..."
@tool
def recall_memories(query: str, runtime: ToolRuntime) -> str:
"""回忆相关记忆"""
results = runtime.store.search(("memories",), query=query, limit=3)
return "\n".join(r.value["content"] for r in results)
15.6.3 Store 配置
Store 在图编译时注入:
from langgraph.store.memory import InMemoryStore
store = InMemoryStore(index={"dims": 1536, "embed": my_embed_fn})
graph = (
StateGraph(State)
.add_node("process", process_node)
.set_entry_point("process")
.set_finish_point("process")
.compile(store=store) # 注入 Store
)
编译后,store 通过 Pregel 的初始化流程被放入 Runtime,最终注入到每个任务的配置中。
15.7 跨线程记忆模式
15.7.1 用户画像累积
def update_user_profile(state: State, runtime: Runtime) -> State:
user_ns = ("users", runtime.context.user_id)
# 获取现有画像
profile = runtime.store.get(user_ns, "profile")
existing = profile.value if profile else {}
# 从对话中提取新信息
new_info = extract_user_info(state["messages"])
# 合并更新
merged = {**existing, **new_info}
runtime.store.put(user_ns, "profile", merged)
return state
15.7.2 共享知识库
def research_agent(state: State, runtime: Runtime) -> State:
# 搜索已有研究成果
existing = runtime.store.search(
("research", state["topic"]),
query=state["question"],
limit=10
)
if relevant := [r for r in existing if r.score and r.score > 0.8]:
# 复用已有成果
return {"answer": synthesize(relevant)}
# 生成新的研究成果
answer = do_research(state["question"])
# 保存到共享知识库
runtime.store.put(
("research", state["topic"]),
f"q_{hash(state['question'])}",
{"question": state["question"], "answer": answer}
)
return {"answer": answer}
15.7.3 命名空间设计模式
graph TB
subgraph 常见命名空间模式
U["('users', user_id)"] --> UP["用户画像"]
U --> UM["('users', user_id, 'memories')"] --> UMI["对话记忆"]
U --> UH["('users', user_id, 'history')"] --> UHI["历史记录"]
D["('docs', project_id)"] --> DI["文档索引"]
K["('knowledge', domain)"] --> KI["知识条目"]
C["('cache',)"] --> CI["缓存数��"]
end
15.8 MatchCondition 与 ListNamespacesOp
15.8.1 命名空间查询
ListNamespacesOp 支持复杂的命名空间匹配:
class MatchCondition(NamedTuple):
match_type: NamespaceMatchType # "prefix" | "suffix"
path: NamespacePath # 可包含 "*" 通配符
class ListNamespacesOp(NamedTuple):
match_conditions: tuple[MatchCondition, ...] | None = None
max_depth: int | None = None
limit: int = 100
offset: int = 0
示例:
# 列举所有用户命名空间
store.list_namespaces(
match_conditions=(MatchCondition("prefix", ("users",)),)
)
# 列举所有以 "v1" 结尾的命名空间
store.list_namespaces(
match_conditions=(MatchCondition("suffix", ("v1",)),)
)
# 组合条件:以 "docs" 开头且以 "draft" 结尾
store.list_namespaces(
match_conditions=(
MatchCondition("prefix", ("docs",)),
MatchCondition("suffix", ("draft",)),
)
)
15.8.2 SearchOp:自然语言检索 + MongoDB 风格过滤
ListNamespacesOp 之外、Store 还有另一个查询原语 SearchOp,章节完全没展开。打开 checkpoint/langgraph/store/base/__init__.py:203:
class SearchOp(NamedTuple):
namespace_prefix: tuple[str, ...]
filter: dict[str, Any] | None = None
limit: int = 10
offset: int = 0
query: str | None = None # ← 自然语言查询
...
两种独立的检索维度:
1. filter 字段——MongoDB 风格过滤算子(源码 docstring 明确列出 6 种):
$eq Equal to(同直接值比较)
$ne Not equal to
$gt Greater than
$gte Greater than or equal to
$lt Less than
$lte Less than or equal to
用法:
store.search(
namespace_prefix=("documents",),
filter={
"status": "active", # ← 直接值 = $eq
"priority": {"$gte": 5}, # ← 算子比较
"type": {"$ne": "draft"},
}
)
LangGraph 选择 MongoDB 的 $-prefixed 算子语法不是偶然——这是 JSON 数据库(MongoDB、CouchDB、Elasticsearch 的 query DSL)里事实标准。用户对这种语法熟悉度最高、跨存储后端也能复用同一套 filter 语法。PostgresStore / SqliteStore / InMemoryStore 三种 Store 实现都要能接同样的 filter——这个算子抽象就是那层共用 API。
2. query: str 字段——自然语言语义检索:
store.search(
namespace_prefix=("users", "content"),
query="technical documentation about APIs", # ← LLM 语义搜索
limit=20,
)
但 docstring(line 211)写了一个重要警示:
“Natural language search support depends on your store implementation.”
不是所有 Store 都支持——InMemoryStore 必须有配套 embedder 才能做 semantic search;PostgresStore 需要 pgvector 扩展;SqliteStore 需要 sqlite-vec 扩展。用户代码如果依赖 query=... 但用的后端不支持——运行时报错而不是编译期——这是 Store 接口设计的一个”软约束”。
filter 和 query 可以组合——先语义筛选、再精确过滤:
store.search(
namespace_prefix=("docs",),
query="machine learning", # 语义相关
filter={"year": {"$gte": 2024}}, # 年份 ≥ 2024
limit=10,
)
这是典型的 “hybrid search”——在向量检索和关键字/元数据过滤之间做交叉。2024-2025 年生产 RAG 系统的主流范式。LangGraph Store 在 API 层就把这个能力内置了。
15.8.3 MatchCondition.path 的通配符 *
章节例子里 MatchCondition("suffix", ("v1",)) 是纯字符串路径。真实 path 类型支持通配符(store/base/__init__.py:350 docstring 示例):
MatchCondition(match_type="suffix", path=("cache", "*"))
* 匹配任意单段路径。所以 ("cache", "*") 能匹配 ("cache", "v1")、("cache", "v2") 等末尾两段。组合多个 * 和固定段可以表达 docs/*/draft/* 这种结构化匹配。
这让命名空间组织可以采用**“段化 pattern”** 而不是字符串拼接——比如 ("users", user_id, "preferences") 比 f"users/{user_id}/preferences" 更容易过滤:
# 列所有用户的 preferences namespace
store.list_namespaces(
match_conditions=(
MatchCondition("prefix", ("users",)),
MatchCondition("suffix", ("preferences",)),
)
)
# 列某一用户的所有 namespace
store.list_namespaces(
match_conditions=(MatchCondition("prefix", ("users", user_id)),)
)
这种 “结构化 path + tuple 段 + 通配符” 的设计让命名空间既有层次性、又便于程序化查询——比纯字符串 key 表达力强得多。
15.8.3-bis _handle_list_namespaces:max_depth 截断 + sorted + offset/limit 的四段管线
list_namespaces 看起来平平无奇,真实实现(memory/init.py:460)是把 4 个相关操作串在一起:
def _handle_list_namespaces(self, op: ListNamespacesOp) -> list[tuple[str, ...]]:
# 阶段 1:快照所有 namespace key,避免迭代时被并发修改
all_namespaces = list(self._data.keys())
namespaces = all_namespaces
# 阶段 2:按 match_conditions 过滤
if op.match_conditions:
namespaces = [
ns for ns in namespaces
if all(_does_match(condition, ns) for condition in op.match_conditions)
]
# 阶段 3:max_depth 截断 + 去重
if op.max_depth is not None:
namespaces = sorted({ns[: op.max_depth] for ns in namespaces})
else:
namespaces = sorted(namespaces)
# 阶段 4:offset/limit
return namespaces[op.offset : op.offset + op.limit]
两个值得深看的点:
1、“Avoid collection size changing while iterating” 的注释。list(self._data.keys()) 显式把 keys 拷贝成 list——这是为了防止在异步场景下、迭代过程中有其他协程 put/delete 导致 dict changed size during iteration 错误。InMemoryStore 没加锁——它依赖 Python 的 GIL + 这种”快照拷贝”来做并发安全。对于真正高并发场景,应该用 PostgresStore(有数据库级事务隔离)。
2、max_depth 的”截断 + set 去重”。如果你有 100 个命名空间 ("users", "alice"), ("users", "bob"), ..., ("users", "zyy")——全部以 ("users",) 开头。查 max_depth=1 会把所有 ("users", X) 截成 ("users",)——set 自然去重——最终只返回 [("users",)] 一条。这对”只看顶层结构”的场景(比如管理后台列所有用户分类)很好用——不用 load 全部 namespace 就能看到”有哪些一级分类”。
两个阶段的组合拳让 list_namespaces 既是一个”穷举 namespace”的 API,也是一个”做 namespace 结构分析”的 API——看你传不传 max_depth。这种用一个 API 的不同参数表达不同语义的设计,在 SQL 里对应 GROUP BY depth 1 vs SELECT DISTINCT——LangGraph 用 max_depth 一个字段就把两种语义合并了。
15.8.4 _apply_operator 的真实实现:6 个算子、float 强转、JSONB 语义对齐
前面 §15.2.5 和 §15.8.2 列了 6 种 filter 算子($eq/$ne/$gt/$gte/$lt/$lte)。真实实现(memory/init.py:577)是个朴素的 if-elif:
def _apply_operator(value: Any, operator: str, op_value: Any) -> bool:
"""Apply a comparison operator, matching PostgreSQL's JSONB behavior."""
if operator == "$eq":
return value == op_value
elif operator == "$gt":
return float(value) > float(op_value)
elif operator == "$gte":
return float(value) >= float(op_value)
elif operator == "$lt":
return float(value) < float(op_value)
elif operator == "$lte":
return float(value) <= float(op_value)
elif operator == "$ne":
return value != op_value
else:
raise ValueError(f"Unknown operator: {operator}")
三条容易被忽略的性质:
1、大小比较强制转 float。$gt/$gte/$lt/$lte 四个比较操作先把两边都 float() 再比——意味着只能比数值(int/float)。如果你对字符串或 datetime 写 {"created_at": {"$gt": "2024-01-01"}},会抛 ValueError: could not convert string to float。这是 InMemoryStore 的限制——PostgresStore 的 JSONB 操作符支持字符串排序,跨后端这个差异要注意。
生产代码里的应对:日期比较要么 put 时存成 Unix timestamp、要么只用 PostgresStore。docstring 第 578 行写了 “matching PostgreSQL’s JSONB behavior”——但这句话对 InMemory 来说只对了一半(只对了数值比较这部分)。
2、$eq 和 $ne 用 Python 原生 ==/!=。对嵌套 dict、list 也能比较——但这依赖 Python 的 __eq__ 语义。比如 {"tags": ["a", "b"]} 和 {"tags": ["b", "a"]} 是不相等的(list 顺序敏感);但 {"a": 1, "b": 2} 和 {"b": 2, "a": 1} 是相等的(dict 不关心 key 顺序)。这和 JSON 序列化后的等价性一致但不等于”业务等价”——如果你的业务里 ["a", "b"] 和 ["b", "a"] 应该视为相等,filter 的 $eq 帮不上忙,得用 $in 或者业务层预先排序。
3、未知算子直接 raise,不是 warn。拼错 $gT(大写 T)会立刻 ValueError——这比静默忽略好。回到 §16 章讲过的”LangGraph opinion:用户错误要被立刻报出、不要静默继续”——这条规则在 Store 的 filter 层同样成立。
_compare_values(memory/init.py:551)还有一层更精彩的设计:它检查 filter_value 是不是 “算子字典”(所有 key 以 $ 开头)——
def _compare_values(item_value: Any, filter_value: Any) -> bool:
if isinstance(filter_value, dict):
if any(k.startswith("$") for k in filter_value):
return all(
_apply_operator(item_value, op_key, op_value)
for op_key, op_value in filter_value.items()
)
if not isinstance(item_value, dict):
return False
return all(
_compare_values(item_value.get(k), v) for k, v in filter_value.items()
)
...
如果 filter_value 的任一 key 以 $ 开头,整个字典当算子字典处理(所有算子必须都满足);否则当作嵌套字段匹配。这允许你写:
filter={"score": {"$gt": 5, "$lte": 10}} # 5 < score <= 10
6 < score <= 10 的复合约束在单字段上被 AND 了——靠的是 all(_apply_operator(...) for ... in filter_value.items())。这是一种非常简洁的 DSL 设计——一个 dict 同时承担”值匹配/算子组”两种语义,通过 key 前缀区分。MongoDB 的 filter DSL 也是这个设计,LangGraph 在 InMemoryStore 精确复现了 MongoDB 的这套语义。
15.9 设计决策
15.8.4-bis _check_numpy:懒加载 + 一次性警告 + lru_cache
为什么要专门提这个 6 行小函数(memory/init.py:479)?
@functools.lru_cache(maxsize=1)
def _check_numpy() -> bool:
if bool(util.find_spec("numpy")):
return True
logger.warning(
"NumPy not found in the current Python environment. "
"The InMemoryStore will use a pure Python implementation for vector operations, "
"which may significantly impact performance, especially for large datasets or frequent searches. "
"For optimal speed and efficiency, consider installing NumPy: "
"pip install numpy"
)
return False
一个极短的函数同时示范了 Python 库开发的三条常见实践:
1、@functools.lru_cache(maxsize=1) 防止重复警告。_check_numpy 在每次向量相似度计算时都被调用——没装 numpy 的环境每次都走 else 分支发 warning,用户 log 会被刷爆。lru_cache 把函数的返回值缓存——第一次调用时 log 一次、后续所有调用直接拿缓存返回值不重复执行函数体。这是比 “module-level boolean flag” 更干净的 memoization。
2、util.find_spec("numpy") 做存在性检测不 import。不是 try: import numpy except ImportError。find_spec 在不执行 numpy init 代码的情况下就能告诉你 numpy 是否可用——快(init numpy 可能要几十 ms)且不污染 sys.modules(真正需要 numpy 时才 import)。生产 Python 库做可选依赖检测应该优先用这个模式。
3、warning 消息给出了行动建议。不是 “NumPy not found”——是 “pip install numpy”。§15.8.4 提过 LangGraph 的错误消息风格——错误/警告都是”下一步指导”,不是事实陈述。读库时看到 logger.warning 里带具体命令,就是 LangGraph 的签名风格之一。
这个小函数和前面 §15.9.4 的 AsyncBatchedBaseStore._run 里 weakref.ref + if s := store() 的 GC 感知模式一起,构成了 LangGraph Store 模块里一系列**“看起来无关紧要但让库在长寿命进程里保持干净”**的小设计。这些设计不会出现在 README、用户也永远不会直接接触——但它们是一个库从”能用”到”可信赖地长期用”的分水岭。
15.8.5 tokenize_path 与 get_text_at_path:index.fields 背后的 mini-DSL
Store 的 index.fields 允许用户写路径表达式指定哪些字段要被向量索引:
store = InMemoryStore(index={
"dims": 1536,
"embed": my_embed_fn,
"fields": [
"text", # 简单字段
"metadata.title", # 嵌套字段
"tags[*]", # 数组全部元素
"authors[0]", # 指定索引
"authors[-1]", # 倒数第一
"{title,description}", # 多字段一起索引
]
})
这套表达式的解析器 tokenize_path(base/embed.py:333)是个手写的字符扫描器——while i < len(path) 循环、分支处理 [、{、.、普通字符。90 行代码实现了一个最小 JSONPath 子集:
field1.field2→ tokenize 成["field1", "field2"]tags[*]→["tags", "[*]"]([*]作为一个 token 传下去){title,description}→["{title,description}"](花括号内不拆)
取值函数 get_text_at_path(base/embed.py:233)拿这些 tokens 按顺序下潜——遇到 [*] 展开 list 所有元素、遇到 {a,b} 取多个字段都返回。最终产出 list[str],每个字符串作为一条单独的待 embed 文本。
这层抽象解决了**“多字段 + 数组 + 嵌套”混合场景**——("docs",) 下一条 Item 可能是:
{
"title": "Python for LLM",
"sections": [
{"heading": "Intro", "body": "..."},
{"heading": "Chat Models", "body": "..."},
],
"authors": ["Alice", "Bob"]
}
配置 fields=["title", "sections[*].body"] 就能同时索引 title 和每个 section 的 body——一个 Item 产出 3 条 embedding(1 个 title + 2 个 section body)。_batch_search 里的 max pooling 会在搜索时自动把这 3 条折回单 Item。
这套小 DSL 的价值:用户不用把文档拆成多个 Item 就能分字段索引。PostgresStore 和 SqliteStore 也用同样的 tokenize_path——DSL 在 base/embed.py 一处实现,三种后端都能共用。这是 LangGraph 的典型风格:核心工具函数放 base 子包、后端只专注 I/O 差异。
15.9 设计决策
15.9.1 为什么 Store 接口基于 batch?
BaseStore 的核心是 batch 方法,而非独立的 get/put 方法。这个设计的动机是网络效率:在生产环境中,Store 通常是远程服务(如 PostgresStore),每次 RPC 调用都有网络延迟。batch 操作允许将多个读写请求合并为一次网络往返:
# 效率更高的批量操作
results = store.batch([
GetOp(("users", "alice"), "prefs"),
GetOp(("users", "alice"), "profile"),
PutOp(("logs",), "entry_1", {"action": "login"}),
])
便捷方法(get、put 等)内部将单个操作包装为长度为 1 的 batch 调用,保持 API 简洁的同时不损失性能优化的可能。
15.9.2 为什么 Item.value 是 dict 而非 Any?
Item.value 固定为 dict[str, Any] 类型,不像 Send.arg 那样接受任意类型。原因有三:
- 可过滤性:
SearchOp.filter需要按 key 做精确匹配,dict 结构天然支持 - 可索引性:向量搜索需要按字段路径提取文本,dict 结构便于字段定位
- 序列化安全:dict 可以直接 JSON 序列化,而任意 Python 对象不行
15.9.3 InMemoryStore 的定位
InMemoryStore 是开发和测试的首选,但不适合生产环境——数据在进程退出时丢失。生产环境应使用 PostgresStore 或 SqliteStore:
# 开发
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()
# 生产
from langgraph.store.postgres import PostgresStore
store = PostgresStore(connection_string="postgresql://...")
所有实现都遵循 BaseStore 接口,节点代码无需修改即可切换后端。
15.9.3-bis PostgresStore 的 schema:主表 + 向量表 + ON DELETE CASCADE
为了让”内存 vs 持久化后端”的差别不止停留在口头上,打开 PostgresStore 源码 libs/checkpoint-postgres/langgraph/store/postgres/base.py:62——MIGRATIONS 里能看到它真实的表结构:
-- MIGRATIONS[0]:主表
CREATE TABLE IF NOT EXISTS store (
prefix text NOT NULL, -- namespace 拼接
key text NOT NULL, -- Item key
value jsonb NOT NULL, -- value 存成 jsonb
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (prefix, key)
);
-- MIGRATIONS[1]:前缀索引(text_pattern_ops 让 LIKE 'prefix%' 能走 index)
CREATE INDEX CONCURRENTLY IF NOT EXISTS store_prefix_idx
ON store USING btree (prefix text_pattern_ops);
-- MIGRATIONS[2]:TTL 支持(后加的)
ALTER TABLE store
ADD COLUMN IF NOT EXISTS expires_at TIMESTAMP WITH TIME ZONE,
ADD COLUMN IF NOT EXISTS ttl_minutes INT;
-- MIGRATIONS[3]:只对 expires_at IS NOT NULL 建 partial index
CREATE INDEX IF NOT EXISTS idx_store_expires_at ON store (expires_at)
WHERE expires_at IS NOT NULL;
-- VECTOR_MIGRATIONS:启用 pgvector 扩展
CREATE EXTENSION IF NOT EXISTS vector;
-- 向量子表(每个字段一行)
CREATE TABLE IF NOT EXISTS store_vectors (
prefix text NOT NULL,
key text NOT NULL,
field_name text NOT NULL,
embedding vector(1536), -- 维度由 index_config 决定
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (prefix, key, field_name),
FOREIGN KEY (prefix, key) REFERENCES store(prefix, key) ON DELETE CASCADE
);
五条设计值得拆开细看:
1、namespace tuple 被拼成 prefix 一列。内存版用的 dict[tuple, ...] 在 Postgres 里要扁平化成一个字符串——tuple ("users", "alice") 变成 "users.alice"(这也就是 §15.3.0 禁止 label 里含 . 的根本原因)。PRIMARY KEY (prefix, key) 保证 namespace+key 的唯一性,和内存版的 _data[namespace][key] 语义等价。
2、prefix text_pattern_ops 索引。Postgres 默认 btree 索引用 locale-aware 比较——对 LIKE 'users.alice%' 这种前缀匹配不能走索引。text_pattern_ops 操作符类告诉 Postgres “这列按字节比较、用于 LIKE 前缀匹配时能走 index”。§15.8.1 的 MatchCondition("prefix", (...)) 在 Postgres 后端就对应一条 WHERE prefix LIKE 'users.%'——没有这个操作符类就得全表扫描。
3、TTL 是后加特性(MIGRATIONS[2]/[3])。原始 schema 没有 expires_at,后来才 ALTER 加列——这解释了 §15.2.6 为什么 supports_ttl 默认 False:TTL 是 opt-in 特性、后引入、不是所有实现都支持。同一个 Store 可以在升级到新 LangGraph 时自动跑迁移开启 TTL(靠 MIGRATIONS 自增)。
4、partial index 只索引”有 TTL”的行。CREATE INDEX ... WHERE expires_at IS NOT NULL 是 Postgres partial index——不索引 NULL 行。大部分 Store 条目没 TTL(永不过期)——索引它们纯粹是浪费空间和维护成本。只给需要被后台扫描器 sweep 的那些建索引——这类 partial index 是 Postgres 优化器的一条典型实践。
5、ON DELETE CASCADE 解决了 §15.4.2-bis 的”僵尸向量”问题。Postgres schema 的 store_vectors 表引用 store 表的 PK——删除主表一行时,向量子表对应行会自动级联删。这比 InMemoryStore 手动在 _apply_put_ops 里 self._vectors[namespace].pop(key, None) 更可靠——由数据库内核保证原子一致性,不会因为中间出错留下半清半不清的状态。
这些 schema 细节解释了”为什么 PostgresStore 在生产更稳”的每一条底层理由。如果未来你要自己写一个 Store 后端(比如 RedisStore),这份 MIGRATIONS 是最好的起点清单——9 条 SQL 换来一个生产级 Store。
15.9.4 TTL 与数据清理
Store 支持可选的 TTL(Time-To-Live)机制。GetOp 和 SearchOp 的 refresh_ttl 参数控制读取操作是否刷新 Item 的存活时间。这对于缓存场景特别有用——频繁访问的数据自动延长存活,冷数据自然过期。
stateDiagram-v2
[*] --> Active: store.put(ttl=3600)
Active --> Accessed: store.get(refresh_ttl=True)
Accessed --> Active: TTL 重置
Active --> Expired: TTL 到期
Expired --> [*]: 自动清理
15.9.4-bis TTLConfig:3 个字段各控一条语义
TTL 的真实配置结构 TTLConfig(base/init.py:545)是个 TypedDict,只有三个字段:
class TTLConfig(TypedDict, total=False):
refresh_on_read: bool
"""若 True,读操作(get/search)默认也刷新 TTL。
可在单次操作上用 refresh_ttl 覆盖。默认 True。"""
default_ttl: float | None
"""单位是分钟。新 Item 若没显式指定 ttl,用这个默认。
None 表示不过期。"""
sweep_interval_minutes: int | None
"""后台扫描清理过期条目的间隔(分钟)。
None 表示不清理(过期条目仍在表里,但查询时被过滤)。"""
三个字段的设计表达了三条独立但相关的语义:
1、refresh_on_read 控制读是否 sliding TTL。如果一个缓存类场景希望”热数据自动延长 TTL、冷数据自然过期”——就用默认的 refresh_on_read=True。如果是审计日志类场景”创建时间固定、到期必须消失”——设 refresh_on_read=False,读操作不重置计时器。这是 LRU 和 TTL 两种缓存语义的融合点。
2、default_ttl 是”没指定 ttl 的新 Item 用什么默认”。类似 Redis 的 maxttl 配置——防止忘记指定 TTL 导致数据堆积。生产环境建议永远设一个兜底值,比如 default_ttl=60*24*7(7 天)——意味着最宽松情况下数据也不会超过 7 天还存活。这也是 Store 接近”缓存语义”而非”数据库语义”的体现。
3、sweep_interval_minutes 控制”到期后多久被物理删除”。过期 Item 不会在 get() 返回,但行还在表里——直到 sweeper 定期扫到为止。这是 Postgres Store 和 SqliteStore 的实现策略;InMemoryStore 目前不支持真实过期扫描,supports_ttl=False(§15.2.6)就是这个原因。
三个字段 + supports_ttl capability flag 合起来构成了一套极简的 TTL 系统——对比 Redis 的几十个 expiration 相关命令和配置,LangGraph Store 选了最小可行的特性集:不支持 sub-second ttl、不支持 conditional expire、不支持 expire on write vs read 的双时钟。这种克制对一个面向 LLM 长期记忆场景的系统是合理的——memory 不该有毫秒级过期需求,让配置表面简洁能降低用户写错概率。
这条”最小可行”和 §15.2.2 Store 只暴露 4 种操作原语是同一套克制哲学:比起给 10 种 API 让用户自己 pick,LangGraph 选择 4 种覆盖 90% 场景的原语 + 一套可组合的 filter DSL。读到这一层,你就能理解整个 Store 模块的审美 ——简单 > 强大,因为强大的东西更难用对。
15.9.4 AsyncBatchedBaseStore:跨 tick 自动聚合的后台批处理
前面讲 Store 的 batch 是”显式批量”——用户写 store.batch([op1, op2, op3]) 才会聚合。LangGraph 还有一个隐式批量类型 AsyncBatchedBaseStore(base/batch.py:58),用于把异步环境下同一个事件循环 tick 内发起的独立 op 自动聚合为一次 abatch。实现值得读:
class AsyncBatchedBaseStore(BaseStore):
__slots__ = ("_loop", "_aqueue", "_task")
def __init__(self) -> None:
super().__init__()
self._loop = asyncio.get_running_loop()
self._aqueue: asyncio.Queue[tuple[asyncio.Future, Op]] = asyncio.Queue()
self._task: asyncio.Task | None = None
self._ensure_task()
def _ensure_task(self) -> None:
if self._task is None or self._task.done():
self._task = self._loop.create_task(_run(self._aqueue, weakref.ref(self)))
每个 aget/aput/asearch 都把 (future, op) 塞进 _aqueue,后台协程 _run(base/batch.py:326)负责消费:
async def _run(aqueue, store):
while item := await aqueue.get():
if item[0].done(): continue
if s := store(): # ← 弱引用取回 store
items = [item]
try:
while item := aqueue.get_nowait(): # ← 抽空本 tick 所有 op
if item[0].done(): continue
items.append(item)
except asyncio.QueueEmpty:
pass
futs = [item[0] for item in items]
values = [item[1] for item in items]
listen, dedupped = _dedupe_ops(values) # ← 跨 future 去重
results = await s.abatch(dedupped)
if listen is not None:
results = [results[ix] for ix in listen]
for fut, result in zip(futs, results, strict=False):
if not fut.done():
fut.set_result(result)
else:
break # ← store 被 GC 了,后台 task 自行退出
四条值得看懂的设计:
1、while await aqueue.get(): ... try: while aqueue.get_nowait() 两级抓取。第一次 await get() 阻塞等第一条 op 到达;拿到后立刻用 get_nowait() 在不 await 的情况下把队列里所有剩余的 op 也抓过来。这种 “等 1 个 + 搜刮剩下所有” 是 asyncio 社区里最经典的微批(micro-batching)套路——既不因为太激进批量化而增加延迟,也不因为太保守每次只处理 1 条而失去批量优势。
2、weakref.ref(self) + _ensure_task 的生命周期策略。后台 _run 对 store 用弱引用。如果用户的 store 对象被 GC(比如测试代码局部变量超出作用域),后台协程会 if s := store(): ... else: break 自行退出——不会因为 task 持有强引用导致 store 永远不被释放。__del__ 里额外 self._task.cancel() 兜底。这套组合让用户不用操心”调用完 Store 要不要 close”——Python GC 一来就全自动清理。
3、_dedupe_ops 跨 future 去重。同一 tick 内如果有两个业务代码调用 aget(("users","alice"), "prefs")——两条 GetOp 参数完全一样。_dedupe_ops(base/batch.py:300)的做法:
if isinstance(op, (GetOp, SearchOp, ListNamespacesOp)):
try:
listen.append(dedupped.index(op)) # ← 已有相同 op,复用
except ValueError:
listen.append(len(dedupped))
dedupped.append(op)
elif isinstance(op, PutOp):
putkey = (op.namespace, op.key)
if putkey in puts:
# Overwrite previous put
ix = puts[putkey]
dedupped[ix] = op # ← 同 key 多次 put,取最后一个
listen.append(ix)
读操作(Get/Search/List)看到重复直接复用已有 op 的结果;写操作(Put)看到同 (ns, key) 的两次 Put 把后者覆盖前者。listen 数组维护”每个 future 要读 results 的哪个索引”——查询完再按 listen 把结果映射回各自的 future。
这层去重对 agent 场景极其有用——100 个并发 agent 读同一个用户偏好只会产生 1 次后端调用,但每个 agent 都拿到正确的结果。
4、_check_loop 装饰器防死锁。同步接口(如 store.get())如果在自己的事件循环里被调用会阻塞——_check_loop(base/batch.py:33)检测到 asyncio.get_running_loop() is store._loop 就立刻 raise InvalidStateError,错误消息直接告诉用户改成 await store.aget(...)——这种”错误消息内置迁移建议”的做法和第 9 章 serde 的 TOOL_CALL_ERROR_TEMPLATE 是一个思路:错误消息不是日志、是下一步指导。
这套 AsyncBatchedBaseStore 在 LangGraph Platform 的 PostgresStore 子类里默认启用——用户代码写 await runtime.store.aget(...)、底层自动变成跨并发请求的批量。不改一行业务代码就获得 10-100x 的吞吐提升——这才是 LangGraph Store 最有价值的性能特性。
15.9.4-ter IndexConfig.embed:4 种类型、一套 ensure_embeddings 归一
Store 的 embed 字段定义(base/init.py:590)接受 4 种完全不同形态的输入:
embed: Embeddings | EmbeddingsFunc | AEmbeddingsFunc | str
- LangChain 的
Embeddings类实例(init_embeddings("openai:text-embedding-3-small")返回的) - 同步函数
(list[str]) -> list[list[float]](直接用 OpenAI SDK 自己写) - 异步函数
(list[str]) -> Awaitable[list[list[float]]](用 AsyncOpenAI 写) - 字符串 provider 标识符(
"openai:text-embedding-3-small"——LangGraph 自己帮你调init_embeddings)
这四种怎么归一的?ensure_embeddings(embed.py:34)统一把它们包装成 Embeddings 接口:
- 已经是 Embeddings 实例——原样返回
- 是同步/异步函数——包成
EmbeddingsLambda(embed.py:109) - 是字符串——call
langchain.embeddings.init_embeddings("...")——但这会可选依赖 langchain
第 4 种 “provider 字符串” 的好处和陷阱:
好处:用户代码里写 index={"embed": "openai:text-embedding-3-small"}——比自己实例化 embedder 简洁得多。不用 import OpenAI、不用写 API key 管理代码。对”快速原型”场景非常友好。
陷阱:langgraph 自己不依赖 langchain-openai。你只装 pip install langgraph 再用 embed="openai:..." 会在运行时报 ImportError: Could not import langchain_openai。LangGraph 刻意不预装所有 provider 客户端——这是它保持核心包小的代价。生产代码推荐用 1 或 2 的显式实例化,一是避开这个陷阱,二是让依赖关系在 pip install 阶段暴露而不是运行时。
同步 vs 异步函数的选择也有策略:InMemoryStore.batch() 走同步路径——只能用同步 embed;InMemoryStore.abatch() 优先用异步 embed、如果传的是同步函数会用 run_in_executor 包装。如果你的 LLM 调用全在 async loop 里(LangGraph v1.0 的主流方式),请传 async 函数——否则 embed 调用会占线程池 slot、拖慢其他异步任务。
这层”4 种形态统一到一个接口”的设计是 LangGraph 的典型作风——API 表面给用户最大自由,内部实现统一归一。从 Runnable 到 Checkpointer 到 Embeddings,都是这个模式。一旦你掌握这个套路,读 LangGraph 源码就会越来越快——看到一个接口接受多种输入类型,就找”ensure_XXX”函数看怎么归一,90% 的情况都能秒懂。
15.9.5 本章在全书体系中的坐标
回到本书整体脉络——Store 是 LangGraph 四大系统组件(StateGraph、Pregel、Checkpoint、Store)里位置最边缘、但对”应用质量”贡献最大的一个。几个承接关系值得明示:
- 承接第 7 章(Pregel 调度):Store 通过
Runtime.store和ToolRuntime.store两条通道注入到节点/工具里。注入时机是 Pregel 的prepare_task阶段——和 config、context 同批。理解了第 7 章的 task-scoped injection,就能理解”为什么节点看到的 store 是同一个实例”——它不是每次新建的,是 compile 时注入的单例引用传到每个任务。 - 承接第 13 章(Checkpoint):本章 §15.5 画清了两者分工。Store 的数据生命周期不受 checkpoint 版本影响——你 rollback 到 checkpoint_id=X 不会把 Store 里写过的东西”撤销”。这个设计是刻意的——长期记忆不应该被会话级的时间旅行抹掉。
- 支撑第 16 章(预构建 agent):
InjectedStore注解和ToolRuntime.store字段(第 16.6.2/16.6.3 节)都依赖本章 BaseStore 的 API 契约。_inject_tool_args里对 store 参数的”剥离 + 补回”安全层(第 16.3.6 节)防的就是”LLM 伪造 store”——这条防线的存在预设了 store 是框架受控资源、不是业务数据。 - 铺垫第 17 章(multi-agent):多 agent 架构里最常见的”跨 agent 共享知识”就是 Store 的用武之地。主 agent 把 research 结果
store.put(("knowledge", topic), ...),子 agentstore.search(("knowledge",), query=...)拿结果——Store 是 agent 之间的”共享白板”。这个用法的实现细节在下一章展开,本章打好的 BaseStore 四操作原语(Get/Put/Search/ListNamespaces)就是白板上的”写/读/搜/列”。
这层承接关系不是后话、而是 LangGraph 的核心设计意图。Store、Checkpoint、Pregel 三者在 compile() 时同时进入 Pregel 实例——它们天生就是配套使用的。本章把 Store 单独拎出来讲,是方便知识分层;实际写应用时你几乎总会同时用到三者。
从更宏观的软件架构视角看,Store 是 LangGraph 整个技术栈里层次最高、对业务代码最近的一层——它直接被节点/工具代码调用。Checkpoint 和 Pregel 更多是框架内部机制,用户感知较弱;Store 则是你的业务代码每天都要写的东西。理解本章揭示的 Store 内部的 30 多处工程决策——_validate_namespace 的四条铁律、AsyncBatchedBaseStore 的跨 future 去重、_cosine_similarity 的 numpy 可选路径、PostgresStore 的 ON DELETE CASCADE——能让你在写应用时做出更匹配 Store 本意的 API 调用选择。这种”框架意图驱动应用选型”的能力,是从”会用 LangGraph”到”精通 LangGraph”的关键一跃。
15.10 小结
本章详细分析了 LangGraph 的 Store 长期记忆系统。BaseStore 通过四种操作原语(Get、Put、Search、ListNamespaces)和层级化的命名空间,为 LLM 应用提供了灵活的跨线程持久化能力。InMemoryStore 以简洁的字典实现覆盖了开发测试需求,同时支持可选的向量语义搜索。
Store 与 Checkpoint 的关系是互补的:Checkpoint 管理线程内的状态历史(短期记忆),Store 管理跨线程的持久化数据(长期记忆)。通过 runtime.store 的统一访问接口,节点和工具可以无缝地读写长期记忆,而底层存储后端(内存、SQLite、PostgreSQL)可以透明切换。
batch 接口的设计体现了对生产环境网络效率的重视;dict[str, Any] 的值类型约束确保了可过滤性、可索引性和序列化安全;层级命名空间为多租户和多项目场景提供了自然的组织结构。
下一章我们将进入 LangGraph 的预构建组件层——create_react_agent 工厂函数、ToolNode 实现和 tools_condition 路由——了解框架如何将底层的图构建原语封装为开箱即用的 Agent 架构。
本章源码定位:主要引用来自 langgraph-latest/libs/checkpoint/langgraph/store/ 目录的三个 Python 文件——base/__init__.py(1314 行,定义 BaseStore/Item/Op/TTLConfig/IndexConfig 和校验函数)、base/batch.py(371 行,AsyncBatchedBaseStore 和 _dedupe_ops)、base/embed.py(433 行,ensure_embeddings/tokenize_path/get_text_at_path)、memory/__init__.py(592 行,InMemoryStore 的全部实现)。Postgres 后端细节来自 libs/checkpoint-postgres/langgraph/store/postgres/base.py(MIGRATIONS 和 VECTOR_MIGRATIONS)。所有引用都给了准确的行号锚点——存疑可直接对照核验。
本章和第 8 章(Checkpoint)在系统组件图里是”双持久化”:短期在 Checkpoint、长期在 Store;本章和第 16 章(预构建 agent)、第 17 章(multi-agent)是”对接用户代码”的前置知识——理解了 Store 的四原语 + TTLConfig/IndexConfig 两配置 + AsyncBatchedBaseStore 自动聚合,后面读 create_react_agent 里 store= 参数传进去做什么才不会停留在”被神秘注入”的模糊状态。生产环境向量搜索通常要关注:pgvector 扩展启用、HNSW/IVF 参数调优、dims 匹配 embedding 模型输出、fields 路径覆盖实际索引的字段——这几个旋钮正是本章反复出现的配置项。