LangGraph 设计与实现

第15章 Store 与长期记忆

作者 杨艺韬 · 13,110 字

第15章 Store 与长期记忆

15.1 引言

Checkpoint 是 LangGraph 的”短期记忆”——它保存单个线程(thread)内的完整状态历史,支持时间旅行和中断恢复。但在许多 LLM 应用中,我们还需要一种”长期记忆”:跨线程、跨对话的持久化存储。例如,用户的偏好设置在所有对话中都应该可用;一个文档分析 Agent 应该记住之前处理过的文档;多个 Agent 之间需要共享知识库。

LangGraph 通过 BaseStore 接口和 InMemoryStore 实现提供了这种长期记忆能力。Store 是一个层级化的键值存储,支持命名空间(namespace)组织、精确查询和可选的向量语义搜索。它通过 Runtime.store 注入到节点函数中,也可以在工具函数中通过 ToolRuntime.store 访问。

本章将从 BaseStore 的抽象接口出发,分析 Item 数据模型、操作类型(Get/Put/Search/ListNamespaces)的设计,深入 InMemoryStore 的实现细节,并对比 Store 与 Checkpoint 在定位、生命周期和使用场景上的本质区别。

本章要点

  1. BaseStore 接口的操作原语——Get、Put、Search、ListNamespaces 四种操作
  2. Item 数据模型——key/value/namespace/created_at/updated_at
  3. InMemoryStore 的实现——字典存储 + 可选向量搜索
  4. Store 与 Checkpoint 的区别——跨线程 vs 线程内、键值 vs 快照
  5. runtime.store 的访问模式——从节点和工具中使用 Store

15.2 BaseStore 接口

15.2.1 接口定义

BaseStore 定义在 langgraph/store/base/__init__.py 中,是一个抽象基类,定义了 Store 的全部操作原语:

class BaseStore(ABC):
    """Abstract base class for persistent key-value stores."""

    @abstractmethod
    def batch(self, ops: Iterable[Op]) -> list[Result]:
        """Execute a batch of operations."""

    @abstractmethod
    async def abatch(self, ops: Iterable[Op]) -> list[Result]:
        """Async version of batch."""

    # 便捷方法(基于 batch 实现)
    def get(self, namespace: tuple[str, ...], key: str) -> Item | None: ...
    def put(self, namespace: tuple[str, ...], key: str, value: dict) -> None: ...
    def delete(self, namespace: tuple[str, ...], key: str) -> None: ...
    def search(self, namespace_prefix: tuple[str, ...], **kwargs) -> list[Item]: ...
    def list_namespaces(self, **kwargs) -> list[tuple[str, ...]]: ...

    # 对应的 async 版本
    async def aget(self, ...) -> Item | None: ...
    async def aput(self, ...) -> None: ...
    async def adelete(self, ...) -> None: ...
    async def asearch(self, ...) -> list[Item]: ...
    async def alist_namespaces(self, ...) -> list[tuple[str, ...]]: ...

所有便捷方法最终都委托给 batchabatch 方法。这种设计的好处是实现类只需要实现两个方法就能获得完整的接口,同时 batch 方法天然支持操作批量化,有利于网络 I/O 优化。

15.2.2 操作类型

Store 定义了四种操作原语,每种都是一个 NamedTuple:

graph TB
    subgraph 操作类型
        GetOp["GetOp<br/>namespace + key<br/>精确查找单个 Item"]
        PutOp["PutOp<br/>namespace + key + value<br/>创建或更新 Item"]
        SearchOp["SearchOp<br/>namespace_prefix + filter/query<br/>搜索多个 Item"]
        ListNS["ListNamespacesOp<br/>match_conditions<br/>列举命名空间"]
    end

    GetOp -->|返回| Item["Item | None"]
    PutOp -->|返回| None_["None"]
    SearchOp -->|返回| Items["list[SearchItem]"]
    ListNS -->|返回| NS["list[tuple[str, ...]]"]

15.2.3 GetOp

class GetOp(NamedTuple):
    namespace: tuple[str, ...]
    key: str
    refresh_ttl: bool = True

GetOp 通过 namespace + key 精确定位一个 Item。refresh_ttl 控制是否刷新该 Item 的 TTL(存活时间)。

15.2.4 PutOp

class PutOp(NamedTuple):
    namespace: tuple[str, ...]
    key: str
    value: dict[str, Any] | None  # None 表示删除
    index: list[str] | Literal[False] | None = None

PutOp 用于创建、更新或删除 Item。当 valueNone 时表示删除。index 字段控制哪些值路径需要被向量索引。

15.2.5 SearchOp

class SearchOp(NamedTuple):
    namespace_prefix: tuple[str, ...]
    filter: dict[str, Any] | None = None
    limit: int = 10
    offset: int = 0
    query: str | None = None
    refresh_ttl: bool = True

SearchOp 是最灵活的查询操作。它支持三种过滤方式:

  1. 命名空间前缀:只搜索指定前缀下的 Item
  2. 结构化过滤:通过 filter 字典做精确匹配或比较操作
  3. 语义搜索:通过 query 字符串做向量相似度搜索

过滤操作支持多种比较运算符:

# 精确匹配
{"status": "active"}

# 比较运算符
{"score": {"$gt": 4.99}}
{"score": {"$gte": 3.0}}
{"priority": {"$lt": 5}}
{"count": {"$lte": 100}}
{"status": {"$ne": "deleted"}}

15.3 Item 数据模型

15.2.6 BaseStore 的两个被忽视的类级属性:supports_ttl 与 weakref

打开 libs/checkpoint/langgraph/store/base/__init__.py:700,BaseStore 类头定义里藏着两个教学资料极少提到的类级变量:

class BaseStore(ABC):
    supports_ttl: bool = False
    ttl_config: TTLConfig | None = None

    __slots__ = ("__weakref__",)

1、supports_ttl: bool = False 是一个 feature capability flag。子类必须显式覆写为 True 才能”声称支持 TTL”。InMemoryStore 没覆写——所以它的 supports_ttl 读出来是 False——即使它的 put/get 接口接受 ttl 参数。这不是 API 不一致,而是一条明确契约:InMemoryStore 接受 TTL 参数但不会真的执行过期回收,用户代码如果依赖 TTL 语义必须用 Postgres/Sqlite 后端。Python 里没有 trait/interface 支持特性检查,用类属性模拟出来——使用方法:

store = InMemoryStore()
assert store.supports_ttl is False       # 静态查询能力

2、__slots__ = ("__weakref__",) 有两层用意:

  • 禁止动态添加实例属性。如果子类定义了 __slots__ 但基类没有,就会破坏 slots 隔离(实例会有 __dict__)——所以基类也要声明自己的 slots
  • 显式保留 __weakref__。slot 类默认不支持弱引用(weakref.ref(store) 会抛 TypeError),把 "__weakref__" 放进 slots 恢复这个能力。LangGraph 在 langgraph.pregel.main.Pregel 里对 store 会持弱引用(避免循环引用导致 GC 不回收)——因此 BaseStore 必须支持 weakref。

看似不起眼的一行代码背后是”内存布局约束 + 弱引用生态互通”的双重考虑。这条实践在 Python 库开发里值得记住:写基类用 __slots__,但永远别忘了加 "__weakref__"——否则库会被在复杂场景下使用时出现”some_type does not support weak references”的诡异错误。

15.3 Item 数据模型

15.3.0 _validate_namespace:被 Pregel 在编译期强加的四条铁律

Store 接口接受的 namespace 是 tuple[str, ...]——Python 里一个普通元组没有格式要求。但 BaseStore.put/BaseStore.search 第一行都会调 _validate_namespace(namespace)store/base/__init__.py:911, 1172)。这个校验函数(第 1255 行)把四条看似平常的规则写死在了框架里

def _validate_namespace(namespace: tuple[str, ...]) -> None:
    if not namespace:
        raise InvalidNamespaceError("Namespace cannot be empty.")
    for label in namespace:
        if not isinstance(label, str):
            raise InvalidNamespaceError(...)
        if "." in label:
            raise InvalidNamespaceError(
                f"Invalid namespace label '{label}' found in {namespace}. "
                f"Namespace labels cannot contain periods ('.')."
            )
        elif not label:
            raise InvalidNamespaceError(
                f"Namespace labels cannot be empty strings. Got {label} in {namespace}"
            )
    if namespace[0] == "langgraph":
        raise InvalidNamespaceError(
            f'Root label for namespace cannot be "langgraph". Got: {namespace}'
        )

四条规则各有源自真实后端的工程动机:

规则 1:namespace 不能为空。空 tuple () 作为 “根 namespace” 在语义上模糊——到底是”存到顶层”还是”查所有”?LangGraph 选择拒绝以消除歧义。你想存顶层文档应该写 ("documents",) 这种至少有一层的 namespace。

规则 2:label 必须是 str。限制这条是因为 PostgresStore/SqliteStore 会把 namespace 拼接成一条 ltree 路径或字符串主键——tuple 里出现 int/UUID 会让拼接不稳定。统一要求 str 让所有后端序列化方式一致。

规则 3:label 不能含 .点号是 Postgres ltree 类型的分隔符——如果允许 ("users.alice", "prefs"),Postgres 存成 users.alice.prefs、查询时会被误解析为三层命名空间。InMemory/Sqlite 自己不会有这个问题,但为了跨后端可移植性,校验器统一禁止。用户需要在 label 里表达多段信息时应该用下划线或连字符(("users", "alice_corp")),而不是点号。

规则 4:根 label 不能是 "langgraph"。这是 LangGraph 自己预留的保留字——框架内部可能会用 ("langgraph", ...) 前缀存储系统元数据(比如 checkpointer 的迁移状态、Pregel 的编译缓存)。用户代码如果写进 ("langgraph", ...) 会发生谁覆盖谁都说不清楚——所以预先禁止。

这四条校验看似冗余、实则决定了 Store 在多后端场景下的一致性。在接口层做防御校验、在实现层假设输入已经合法——这是分层架构里”迟一行校验、早一层信任”的经典操作。用户实际代码被拒时错误消息非常精确(点名哪个 label 有问题),回到第 13 章讨论 Checkpoint 的 CheckpointError 时我们说过这种”可定位的输入错误”是 LangGraph 设计哲学的一贯特征。

15.3.1 结构定义

class Item:
    """Represents a stored item with metadata."""

    __slots__ = ("value", "key", "namespace", "created_at", "updated_at")

    def __init__(
        self,
        *,
        value: dict[str, Any],
        key: str,
        namespace: tuple[str, ...],
        created_at: datetime,
        updated_at: datetime,
    ):
        self.value = value
        self.key = key
        self.namespace = tuple(namespace)
        self.created_at = (
            datetime.fromisoformat(cast(str, created_at))
            if isinstance(created_at, str)
            else created_at
        )
        self.updated_at = (
            datetime.fromisoformat(cast(str, updated_at))
            if isinstance(updated_at, str)
            else updated_at
        )

Item 使用 __slots__ 优化内存,值类型固定为 dict[str, Any]。时间戳字段支持从 ISO 8601 字符串反序列化——这是为了兼容 JSON 存储后端。

15.3.2 命名空间的层级设计

graph TB
    Root["()"]
    Root --> Users["('users',)"]
    Root --> Docs["('docs',)"]
    Users --> U1["('users', 'alice')"]
    Users --> U2["('users', 'bob')"]
    U1 --> Prefs["('users', 'alice', 'prefs')"]
    U1 --> History["('users', 'alice', 'history')"]
    Docs --> D1["('docs', 'project_a')"]

    style Root fill:#f9f9f9,stroke:#333
    style Users fill:#e6f3ff,stroke:#333
    style Docs fill:#fff3e6,stroke:#333

命名空间是一个字符串元组,形成类似文件系统的层级结构。这种设计的优势:

  • 组织清晰:自然地按用户、项目、类型分层
  • 搜索高效:通过 namespace_prefix 可以搜索某个子树下的所有 Item
  • 访问控制:可以基于命名空间前缀实现权限隔离

15.3.3 SearchItem

class SearchItem(Item):
    """Represents an item returned from a search operation."""

    __slots__ = ("score",)

    def __init__(self, ..., score: float | None = None):
        super().__init__(...)
        self.score = score

SearchItem 继承自 Item,增加了 score 字段,用于向量搜索时返回相似度分数。

15.4 InMemoryStore 实现

15.4.1 数据结构

class InMemoryStore(BaseStore):
    __slots__ = ("_data", "_vectors", "index_config", "embeddings")

    def __init__(self, *, index: IndexConfig | None = None) -> None:
        self._data: dict[tuple[str, ...], dict[str, Item]] = defaultdict(dict)
        self._vectors: dict[tuple[str, ...], dict[str, dict[str, list[float]]]] = (
            defaultdict(lambda: defaultdict(dict))
        )
        self.index_config = index
        if self.index_config:
            self.embeddings = ensure_embeddings(self.index_config.get("embed"))

InMemoryStore 使用两层嵌套字典:外层按命名空间分组,内层按 key 索引。向量数据单独存储在 _vectors 字典中。

graph LR
    subgraph "_data 存储结构"
        NS1["('users', 'alice')"] --> K1["'prefs' -> Item"]
        NS1 --> K2["'profile' -> Item"]
        NS2["('docs',)"] --> K3["'doc1' -> Item"]
    end

    subgraph "_vectors 向量索引"
        VNS1["('docs',)"] --> VK1["'doc1'"]
        VK1 --> VP1["'text' -> [0.1, 0.2, ...]"]
    end

15.4.1-bis batch 入口:三型拆分到三种处理路径

InMemoryStore.batch(memory/init.py:206)的真实实现只有 14 行,但把 Store 所有”批量做什么”的逻辑摊在一个函数里:

def batch(self, ops: Iterable[Op]) -> list[Result]:
    # The batch/abatch methods are treated as internal.
    # Users should access via put/search/get/list_namespaces/etc.
    results, put_ops, search_ops = self._prepare_ops(ops)
    if search_ops:
        queryinmem_store = self._embed_search_queries(search_ops)
        self._batch_search(search_ops, queryinmem_store, results)

    to_embed = self._extract_texts(put_ops)
    if to_embed and self.index_config and self.embeddings:
        embeddings = self.embeddings.embed_documents(list(to_embed))
        self._insertinmem_store(to_embed, embeddings)
    self._apply_put_ops(put_ops)
    return results

_prepare_ops 把传入的 ops 按类型分成三路(memory/init.py:375):

for i, op in enumerate(ops):
    if isinstance(op, GetOp):
        item = self._data[op.namespace].get(op.key)
        results.append(item)               # ← 立即返回
    elif isinstance(op, SearchOp):
        search_ops[i] = (op, self._filter_items(op))
        results.append(None)               # ← 占位,稍后填
    elif isinstance(op, ListNamespacesOp):
        results.append(self._handle_list_namespaces(op))  # ← 立即返回
    elif isinstance(op, PutOp):
        put_ops[(op.namespace, op.key)] = op
        results.append(None)               # ← 占位
    else:
        raise ValueError(f"Unknown operation type: {type(op)}")

这套分型有两条工程巧思值得细读:

1、put_ops(namespace, key) 作 dict 的 key——天然去重。如果同一个 batch 里出现了两条针对同一 (ns, key) 的 PutOp,后者覆盖前者,只有一条最终被执行。这避免了”同批次重复 put 的顺序敏感性”——用户不用担心批量插入时同 key 操作的先后。

2、SearchOp 和 PutOp 都先占 None 返回位,最后才填。这是为了让 embedding 调用成批进行。如果一个 batch 里同时有 5 条 SearchOp 需要 embed query 和 3 条 PutOp 需要 embed value——各走各的 embedding 调用是 8 次 RPC;_embed_search_queries + _extract_texts 统一 embed 是 2 次 RPC(search 的 5 条一起、put 的 3 条一起)——把 embedding 成本压到最低,这对 LLM embedding 收费 API 非常关键。

从这个设计能反推出 LangGraph Store 的一条核心哲学:API 层暴露细粒度操作、实现层成批优化。用户随便调 store.get()/store.put()——接口简单、不用管批量;但如果你写 store.batch([...]) 传入复杂操作集,实现层会自动找到 I/O 和计算的聚合点。

15.4.2 基本操作示例

from langgraph.store.memory import InMemoryStore

store = InMemoryStore()

# 存储用户偏好
store.put(("users", "alice"), "prefs", {"theme": "dark", "language": "zh"})

# 读取
item = store.get(("users", "alice"), "prefs")
print(item.value)  # {"theme": "dark", "language": "zh"}
print(item.key)    # "prefs"
print(item.namespace)  # ("users", "alice")

# 搜索某用户下的所有数据
results = store.search(("users", "alice"))

# 列举所有命名空间
namespaces = store.list_namespaces(prefix=("users",))
# [("users", "alice"), ("users", "bob"), ...]

# 删除
store.put(("users", "alice"), "prefs", None)  # value=None 表示删除

15.4.2-bis _apply_put_ops 删除时同时清理 _data_vectors

value=None 表示删除这个约定比看起来要多一层。真实的 _apply_put_ops(memory/init.py:404):

def _apply_put_ops(self, put_ops):
    for (namespace, key), op in put_ops.items():
        if op.value is None:
            self._data[namespace].pop(key, None)
            self._vectors[namespace].pop(key, None)   # ← 同时清向量索引
        else:
            self._data[namespace][key] = Item(
                value=op.value, ...
            )

注意删除时必须同时 pop _data_vectors。如果只清 _data,后续向量搜索仍会匹配到”已经被删掉但还有 embedding 的 key”——返回一个 namespace+key 指向空数据的”僵尸 SearchItem”。

相反的,向更新 put 的路径没有清 _vectors——这是有意的。更新 value 后,同 key 的 embedding 会由 _extract_texts + _insertinmem_store 重新计算并覆盖掉旧向量——所以此时不必先清。但如果你把 _data 存的 value 从 “有 text 字段” 改成 “没 text 字段”,旧的 embedding 会继续存活(因为新 value 里没 text,不触发 re-embed)——这是一个 InMemoryStore 的已知边界行为,生产 Postgres Store 通过显式的 DELETE FROM ... WHERE 语句避免这个问题。

实际使用这条知识的价值:如果你在 InMemoryStore 上做了”从 {text: "foo"} 改成 {}”的 put,语义上那个 Item 应该不可搜索了,但向量搜索仍能用旧 embedding 找到它。要避免,应该先 put(ns, key, None) 删除、再重新 put 新 value。

15.4.3 向量搜索配置

from langgraph.store.memory import InMemoryStore

# 使用 LangChain embeddings
from langchain.embeddings import init_embeddings

store = InMemoryStore(
    index={
        "dims": 1536,
        "embed": init_embeddings("openai:text-embedding-3-small"),
        "fields": ["text"],  # 指定要索引的字段
    }
)

# 存储文档
store.put(("docs",), "doc1", {"text": "Python 编程入门教程", "author": "Alice"})
store.put(("docs",), "doc2", {"text": "TypeScript 类型系统详解", "author": "Bob"})

# 语义搜索
results = store.search(("docs",), query="编程语言教程")
for item in results:
    print(f"{item.key}: {item.value['text']} (score={item.score})")

向量搜索也支持使用原生的 OpenAI SDK 或任意嵌入函数:

from openai import OpenAI

client = OpenAI()

def embed_texts(texts: list[str]) -> list[list[float]]:
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [e.embedding for e in response.data]

store = InMemoryStore(
    index={"dims": 1536, "embed": embed_texts}
)

15.4.4 _batch_search 的四段语义:flatten → score → max-pool → 分页

InMemoryStore 的向量搜索核心逻辑在 _batch_search(memory/init.py:302)。逐段翻译它做什么:

# 阶段 1:展平 (item, vectors) 为 (item, single_vector) 列表
flat_items, flat_vectors = [], []
scoreless = []
for item, vectors in candidates:
    for vector in vectors:
        flat_items.append(item)
        flat_vectors.append(vector)
    if not vectors:
        scoreless.append(item)              # ← 没有 embedding 的 item 单独记下

# 阶段 2:一次性计算所有 (query, vector) 的余弦相似度
scores = _cosine_similarity(query_embedding, flat_vectors)

# 阶段 3:按分数排序
sorted_results = sorted(
    zip(scores, flat_items, strict=False),
    key=lambda x: x[0],
    reverse=True,
)

# 阶段 4:max pooling——同一个 (namespace, key) 只保留分最高的那份
seen = set()
kept = []
for score, item in sorted_results:
    key = (item.namespace, item.key)
    if key in seen:
        continue
    ix = len(seen)
    seen.add(key)
    if ix >= op.offset + op.limit:
        break
    if ix < op.offset:
        continue
    kept.append((score, item))

if scoreless and len(kept) < op.limit:
    # Corner case: fill tail with non-scored items
    kept.extend((None, item) for item in scoreless[: op.limit - len(kept)])

四个设计点值得细读:

1、多字段索引的”每字段一条向量”。如果 index.fields = ["text", "summary"],一个 Item 会产生 两条向量(text 一条、summary 一条)。flat_items 在 flatten 阶段会把同一 Item 塞进去两遍(每个 vector 对应一行)。

2、max pooling 消重。多字段产生的多个向量在分数上互相独立——同一 Item 可能同时以 text_score=0.85 和 summary_score=0.90 两条身份出现。seen set 按 (namespace, key) 去重——只保留得分最高的那一条sorted_results 是降序排的,所以遍历时第一次遇到一个 key 就是最高分,后续全 skip。这就是 “max pooling” 的字面含义——把多向量 collapse 到单 Item 时取最大相似度。

3、offset/limit 在去重之后。注意 ix = len(seen) 统计的是去重后的序号——offset=5 表示”跳过前 5 个不同 Item”,而不是”跳过前 5 个向量”。这是正确的语义(用户写 limit=10 期望拿到 10 个不同 Item),但实现成本比单纯向量排序高——必须遍历完所有 sorted_results 再根据序号决定要不要 keep。

4、scoreless 兜底。没有任何向量的 Item(比如 index.fields=["text"] 但某 Item 没有 text 字段)在 filter 后仍是候选,但没得分。如果用户 limit=10 但有得分的 Item 只有 3 个,就从 scoreless 里补 7 个(score=None)填满。设计理由是”用户显式指定了 limit 就要尽量返回那么多”——哪怕部分 item 没相似度分。这对”先粗筛再人工 rerank”的流程很友好。

15.4.5 _cosine_similarity:numpy 可选 + 纯 Python 回退

向量相似度计算函数(memory/init.py:493)同时提供 numpy 优化路径和纯 Python 兜底:

def _cosine_similarity(X, Y):
    if not Y:
        return []
    if _check_numpy():
        import numpy as np
        X_arr = np.array(X) if not isinstance(X, np.ndarray) else X
        Y_arr = np.array(Y) if not isinstance(Y, np.ndarray) else Y
        X_norm = np.linalg.norm(X_arr)
        Y_norm = np.linalg.norm(Y_arr, axis=1)
        # Avoid division by zero
        mask = Y_norm != 0
        similarities = np.zeros_like(Y_norm)
        similarities[mask] = np.dot(Y_arr[mask], X_arr) / (Y_norm[mask] * X_norm)
        return similarities.tolist()
    # 纯 Python fallback
    similarities = []
    for y in Y:
        dot_product = sum(a * b for a, b in zip(X, y, strict=False))
        norm1 = sum(a * a for a in X) ** 0.5
        norm2 = sum(a * a for a in y) ** 0.5
        similarity = dot_product / (norm1 * norm2) if norm1 > 0 and norm2 > 0 else 0.0
        similarities.append(similarity)
    return similarities

四条教学点:

1、numpy 是可选依赖。LangGraph 的 checkpoint 子包不强依赖 numpy,Store 用懒检测 _check_numpy()(延迟 import 并捕获 ImportError)。这让最小化部署(比如 serverless 场景)可以不装 numpy——代价是向量搜索慢一个数量级,但对开发/测试 InMemoryStore 可忽略。

2、numpy 路径里的 maskmask = Y_norm != 0——把”所有维度为 0 的向量”排除。这些向量不能 normalize(除 0),让它们的相似度直接是 0。纯 Python 路径的 if norm1 > 0 and norm2 > 0 else 0.0 是同样的保护,只是语法看起来不像矩阵运算。

3、zip(X, y, strict=False)。Python 3.10+ 新增 strict 参数——True 时要求两侧长度一致、不等就 raise。这里传 False 是允许长度不同(虽然 embedding 场景下应该永远相等),要在 Python 3.9 兼容性和严格校验之间做权衡。如果你的生产代码要求严格校验,建议改 strict=True 以便在 embedding 维度不匹配时立刻炸出来(而不是默默计算出错误相似度)。

4、双实现路径的维护成本。一份 numpy、一份纯 Python——两条路径必须语义一致。这份代码很短所以好维护,但如果你给自己的系统写类似的双路径,记得写交叉测试:同一组输入、两条路径结果应该在浮点容差内相等。否则会出现”开发环境没装 numpy 结果 A、生产环境装了 numpy 结果 B”的诡异分叉。

15.5 Store 与 Checkpoint 的区别

15.5.1 核心对比

graph TB
    subgraph "Checkpoint(检查点)"
        direction TB
        CP1["线程级别<br/>每个 thread_id 独立"]
        CP2["快照语义<br/>保存完整状态"]
        CP3["自动管理<br/>框架在每步保存"]
        CP4["时间旅行<br/>可回溯到任意步骤"]
        CP5["序列化格式<br/>Channel 值的快照"]
    end

    subgraph "Store(存储)"
        direction TB
        ST1["全局级别<br/>跨所有 thread 共享"]
        ST2["键值语义<br/>namespace + key 定位"]
        ST3["手动管理<br/>节点显式读写"]
        ST4["持久化存储<br/>独立于图执行"]
        ST5["结构化数据<br/>dict[str, Any] 值"]
    end
维度CheckpointStore
作用域单个线程(thread_id)全局跨线程
数据模型Channel 值的完整快照namespace/key/value 键值对
管理方式框架自动保存开发者显式操作
时间语义有序历史(可时间旅行)最新值(无历史)
访问方式get_state() / get_state_history()runtime.store.get/put/search
典型用途对话上下文、中断恢复用户偏好、知识库、跨对话记忆

15.5.2 互补关系

Checkpoint 和 Store 不是替代关系,而是互补的:

def personalized_chat(state: State, runtime: Runtime) -> State:
    # 从 Checkpoint 恢复的状态中获取对话历史(线程内)
    messages = state["messages"]

    # 从 Store 中获取用户偏好(跨线程)
    prefs = runtime.store.get(
        ("users", runtime.context.user_id), "preferences"
    )

    # 结合两者生成响应
    prompt = f"User prefers {prefs.value['style']}" if prefs else ""
    response = llm.invoke(messages + [SystemMessage(content=prompt)])

    # 更新 Store 中的记忆(跨线程持久化)
    runtime.store.put(
        ("users", runtime.context.user_id, "memory"),
        f"conv_{runtime.execution_info.thread_id}",
        {"summary": summarize(messages)}
    )

    return {"messages": [response]}

15.5.3 数据流对比

sequenceDiagram
    participant User as 用户
    participant Graph as 图执行
    participant CP as Checkpoint
    participant Store as Store

    Note over CP: 线程 A 的状态
    User->>Graph: 开始对话(线程 A)
    Graph->>CP: 保存每步状态
    Graph->>Store: 保存用户偏好
    Graph-->>User: 响应

    Note over CP: 线程 B 的状态
    User->>Graph: 新对话(线程 B)
    Graph->>CP: 读取线程 B 的状态(空)
    Graph->>Store: 读取用户偏好(跨线程!)
    Note over Graph: 用户偏好从线程 A 延续到线程 B
    Graph-->>User: 个性化响应

15.6 runtime.store 访问模式

15.6.1 在节点中使用

def my_node(state: State, runtime: Runtime) -> State:
    if runtime.store is None:
        # 没有配置 Store,降级处理
        return {"result": "default"}

    # 读取
    item = runtime.store.get(("config",), "settings")

    # 写入
    runtime.store.put(
        ("results", state["session_id"]),
        "analysis",
        {"score": 0.95, "category": "positive"}
    )

    # 搜索
    similar = runtime.store.search(
        ("knowledge",),
        query=state["question"],
        limit=5
    )

    return {"result": [s.value for s in similar]}

15.6.2 在工具中使用

通过 ToolRuntimeInjectedStore 注解:

from langchain_core.tools import tool
from langgraph.prebuilt import ToolRuntime

@tool
def save_memory(content: str, runtime: ToolRuntime) -> str:
    """保存一段记忆到长期存储"""
    runtime.store.put(
        ("memories",),
        f"mem_{hash(content)}",
        {"content": content, "timestamp": datetime.now().isoformat()}
    )
    return f"Memory saved: {content[:50]}..."

@tool
def recall_memories(query: str, runtime: ToolRuntime) -> str:
    """回忆相关记忆"""
    results = runtime.store.search(("memories",), query=query, limit=3)
    return "\n".join(r.value["content"] for r in results)

15.6.3 Store 配置

Store 在图编译时注入:

from langgraph.store.memory import InMemoryStore

store = InMemoryStore(index={"dims": 1536, "embed": my_embed_fn})

graph = (
    StateGraph(State)
    .add_node("process", process_node)
    .set_entry_point("process")
    .set_finish_point("process")
    .compile(store=store)  # 注入 Store
)

编译后,store 通过 Pregel 的初始化流程被放入 Runtime,最终注入到每个任务的配置中。

15.7 跨线程记忆模式

15.7.1 用户画像累积

def update_user_profile(state: State, runtime: Runtime) -> State:
    user_ns = ("users", runtime.context.user_id)

    # 获取现有画像
    profile = runtime.store.get(user_ns, "profile")
    existing = profile.value if profile else {}

    # 从对话中提取新信息
    new_info = extract_user_info(state["messages"])

    # 合并更新
    merged = {**existing, **new_info}
    runtime.store.put(user_ns, "profile", merged)

    return state

15.7.2 共享知识库

def research_agent(state: State, runtime: Runtime) -> State:
    # 搜索已有研究成果
    existing = runtime.store.search(
        ("research", state["topic"]),
        query=state["question"],
        limit=10
    )

    if relevant := [r for r in existing if r.score and r.score > 0.8]:
        # 复用已有成果
        return {"answer": synthesize(relevant)}

    # 生成新的研究成果
    answer = do_research(state["question"])

    # 保存到共享知识库
    runtime.store.put(
        ("research", state["topic"]),
        f"q_{hash(state['question'])}",
        {"question": state["question"], "answer": answer}
    )

    return {"answer": answer}

15.7.3 命名空间设计模式

graph TB
    subgraph 常见命名空间模式
        U["('users', user_id)"] --> UP["用户画像"]
        U --> UM["('users', user_id, 'memories')"] --> UMI["对话记忆"]
        U --> UH["('users', user_id, 'history')"] --> UHI["历史记录"]

        D["('docs', project_id)"] --> DI["文档索引"]

        K["('knowledge', domain)"] --> KI["知识条目"]

        C["('cache',)"] --> CI["缓存数��"]
    end

15.8 MatchCondition 与 ListNamespacesOp

15.8.1 命名空间查询

ListNamespacesOp 支持复杂的命名空间匹配:

class MatchCondition(NamedTuple):
    match_type: NamespaceMatchType  # "prefix" | "suffix"
    path: NamespacePath             # 可包含 "*" 通配符

class ListNamespacesOp(NamedTuple):
    match_conditions: tuple[MatchCondition, ...] | None = None
    max_depth: int | None = None
    limit: int = 100
    offset: int = 0

示例:

# 列举所有用户命名空间
store.list_namespaces(
    match_conditions=(MatchCondition("prefix", ("users",)),)
)

# 列举所有以 "v1" 结尾的命名空间
store.list_namespaces(
    match_conditions=(MatchCondition("suffix", ("v1",)),)
)

# 组合条件:以 "docs" 开头且以 "draft" 结尾
store.list_namespaces(
    match_conditions=(
        MatchCondition("prefix", ("docs",)),
        MatchCondition("suffix", ("draft",)),
    )
)

15.8.2 SearchOp:自然语言检索 + MongoDB 风格过滤

ListNamespacesOp 之外、Store 还有另一个查询原语 SearchOp章节完全没展开。打开 checkpoint/langgraph/store/base/__init__.py:203

class SearchOp(NamedTuple):
    namespace_prefix: tuple[str, ...]
    filter: dict[str, Any] | None = None
    limit: int = 10
    offset: int = 0
    query: str | None = None                # ← 自然语言查询
    ...

两种独立的检索维度

1. filter 字段——MongoDB 风格过滤算子(源码 docstring 明确列出 6 种):

$eq   Equal to(同直接值比较)
$ne   Not equal to
$gt   Greater than
$gte  Greater than or equal to
$lt   Less than
$lte  Less than or equal to

用法:

store.search(
    namespace_prefix=("documents",),
    filter={
        "status": "active",                # ← 直接值 = $eq
        "priority": {"$gte": 5},           # ← 算子比较
        "type": {"$ne": "draft"},
    }
)

LangGraph 选择 MongoDB 的 $-prefixed 算子语法不是偶然——这是 JSON 数据库(MongoDB、CouchDB、Elasticsearch 的 query DSL)里事实标准。用户对这种语法熟悉度最高、跨存储后端也能复用同一套 filter 语法。PostgresStore / SqliteStore / InMemoryStore 三种 Store 实现都要能接同样的 filter——这个算子抽象就是那层共用 API。

2. query: str 字段——自然语言语义检索

store.search(
    namespace_prefix=("users", "content"),
    query="technical documentation about APIs",   # ← LLM 语义搜索
    limit=20,
)

但 docstring(line 211)写了一个重要警示

Natural language search support depends on your store implementation.

不是所有 Store 都支持——InMemoryStore 必须有配套 embedder 才能做 semantic search;PostgresStore 需要 pgvector 扩展;SqliteStore 需要 sqlite-vec 扩展。用户代码如果依赖 query=... 但用的后端不支持——运行时报错而不是编译期——这是 Store 接口设计的一个”软约束”

filterquery 可以组合——先语义筛选、再精确过滤:

store.search(
    namespace_prefix=("docs",),
    query="machine learning",              # 语义相关
    filter={"year": {"$gte": 2024}},        # 年份 ≥ 2024
    limit=10,
)

这是典型的 “hybrid search”——在向量检索和关键字/元数据过滤之间做交叉。2024-2025 年生产 RAG 系统的主流范式。LangGraph Store 在 API 层就把这个能力内置了。

15.8.3 MatchCondition.path 的通配符 *

章节例子里 MatchCondition("suffix", ("v1",)) 是纯字符串路径。真实 path 类型支持通配符store/base/__init__.py:350 docstring 示例):

MatchCondition(match_type="suffix", path=("cache", "*"))

* 匹配任意单段路径。所以 ("cache", "*") 能匹配 ("cache", "v1")("cache", "v2") 等末尾两段。组合多个 * 和固定段可以表达 docs/*/draft/* 这种结构化匹配。

这让命名空间组织可以采用**“段化 pattern”** 而不是字符串拼接——比如 ("users", user_id, "preferences")f"users/{user_id}/preferences" 更容易过滤:

# 列所有用户的 preferences namespace
store.list_namespaces(
    match_conditions=(
        MatchCondition("prefix", ("users",)),
        MatchCondition("suffix", ("preferences",)),
    )
)

# 列某一用户的所有 namespace
store.list_namespaces(
    match_conditions=(MatchCondition("prefix", ("users", user_id)),)
)

这种 “结构化 path + tuple 段 + 通配符” 的设计让命名空间既有层次性、又便于程序化查询——比纯字符串 key 表达力强得多。

15.8.3-bis _handle_list_namespaces:max_depth 截断 + sorted + offset/limit 的四段管线

list_namespaces 看起来平平无奇,真实实现(memory/init.py:460)是把 4 个相关操作串在一起:

def _handle_list_namespaces(self, op: ListNamespacesOp) -> list[tuple[str, ...]]:
    # 阶段 1:快照所有 namespace key,避免迭代时被并发修改
    all_namespaces = list(self._data.keys())
    namespaces = all_namespaces

    # 阶段 2:按 match_conditions 过滤
    if op.match_conditions:
        namespaces = [
            ns for ns in namespaces
            if all(_does_match(condition, ns) for condition in op.match_conditions)
        ]

    # 阶段 3:max_depth 截断 + 去重
    if op.max_depth is not None:
        namespaces = sorted({ns[: op.max_depth] for ns in namespaces})
    else:
        namespaces = sorted(namespaces)

    # 阶段 4:offset/limit
    return namespaces[op.offset : op.offset + op.limit]

两个值得深看的点:

1、“Avoid collection size changing while iterating” 的注释list(self._data.keys()) 显式把 keys 拷贝成 list——这是为了防止在异步场景下、迭代过程中有其他协程 put/delete 导致 dict changed size during iteration 错误。InMemoryStore 没加锁——它依赖 Python 的 GIL + 这种”快照拷贝”来做并发安全。对于真正高并发场景,应该用 PostgresStore(有数据库级事务隔离)。

2、max_depth 的”截断 + set 去重”。如果你有 100 个命名空间 ("users", "alice"), ("users", "bob"), ..., ("users", "zyy")——全部以 ("users",) 开头。查 max_depth=1 会把所有 ("users", X) 截成 ("users",)——set 自然去重——最终只返回 [("users",)] 一条。这对”只看顶层结构”的场景(比如管理后台列所有用户分类)很好用——不用 load 全部 namespace 就能看到”有哪些一级分类”。

两个阶段的组合拳让 list_namespaces 既是一个”穷举 namespace”的 API,也是一个”做 namespace 结构分析”的 API——看你传不传 max_depth。这种用一个 API 的不同参数表达不同语义的设计,在 SQL 里对应 GROUP BY depth 1 vs SELECT DISTINCT——LangGraph 用 max_depth 一个字段就把两种语义合并了。

15.8.4 _apply_operator 的真实实现:6 个算子、float 强转、JSONB 语义对齐

前面 §15.2.5 和 §15.8.2 列了 6 种 filter 算子($eq/$ne/$gt/$gte/$lt/$lte)。真实实现(memory/init.py:577)是个朴素的 if-elif:

def _apply_operator(value: Any, operator: str, op_value: Any) -> bool:
    """Apply a comparison operator, matching PostgreSQL's JSONB behavior."""
    if operator == "$eq":
        return value == op_value
    elif operator == "$gt":
        return float(value) > float(op_value)
    elif operator == "$gte":
        return float(value) >= float(op_value)
    elif operator == "$lt":
        return float(value) < float(op_value)
    elif operator == "$lte":
        return float(value) <= float(op_value)
    elif operator == "$ne":
        return value != op_value
    else:
        raise ValueError(f"Unknown operator: {operator}")

三条容易被忽略的性质:

1、大小比较强制转 float$gt/$gte/$lt/$lte 四个比较操作先把两边都 float() 再比——意味着只能比数值(int/float)。如果你对字符串或 datetime 写 {"created_at": {"$gt": "2024-01-01"}},会抛 ValueError: could not convert string to float。这是 InMemoryStore 的限制——PostgresStore 的 JSONB 操作符支持字符串排序,跨后端这个差异要注意。

生产代码里的应对:日期比较要么 put 时存成 Unix timestamp、要么只用 PostgresStore。docstring 第 578 行写了 “matching PostgreSQL’s JSONB behavior”——但这句话对 InMemory 来说只对了一半(只对了数值比较这部分)。

2、$eq$ne 用 Python 原生 ==/!=。对嵌套 dict、list 也能比较——但这依赖 Python 的 __eq__ 语义。比如 {"tags": ["a", "b"]}{"tags": ["b", "a"]}不相等的(list 顺序敏感);但 {"a": 1, "b": 2}{"b": 2, "a": 1}相等的(dict 不关心 key 顺序)。这和 JSON 序列化后的等价性一致但不等于”业务等价”——如果你的业务里 ["a", "b"]["b", "a"] 应该视为相等,filter 的 $eq 帮不上忙,得用 $in 或者业务层预先排序。

3、未知算子直接 raise,不是 warn。拼错 $gT(大写 T)会立刻 ValueError——这比静默忽略好。回到 §16 章讲过的”LangGraph opinion:用户错误要被立刻报出、不要静默继续”——这条规则在 Store 的 filter 层同样成立。

_compare_values(memory/init.py:551)还有一层更精彩的设计:它检查 filter_value 是不是 “算子字典”(所有 key 以 $ 开头)——

def _compare_values(item_value: Any, filter_value: Any) -> bool:
    if isinstance(filter_value, dict):
        if any(k.startswith("$") for k in filter_value):
            return all(
                _apply_operator(item_value, op_key, op_value)
                for op_key, op_value in filter_value.items()
            )
        if not isinstance(item_value, dict):
            return False
        return all(
            _compare_values(item_value.get(k), v) for k, v in filter_value.items()
        )
    ...

如果 filter_value 的任一 key 以 $ 开头,整个字典当算子字典处理(所有算子必须都满足);否则当作嵌套字段匹配。这允许你写:

filter={"score": {"$gt": 5, "$lte": 10}}   # 5 < score <= 10

6 < score <= 10 的复合约束在单字段上被 AND 了——靠的是 all(_apply_operator(...) for ... in filter_value.items())。这是一种非常简洁的 DSL 设计——一个 dict 同时承担”值匹配/算子组”两种语义,通过 key 前缀区分。MongoDB 的 filter DSL 也是这个设计,LangGraph 在 InMemoryStore 精确复现了 MongoDB 的这套语义。

15.9 设计决策

15.8.4-bis _check_numpy:懒加载 + 一次性警告 + lru_cache

为什么要专门提这个 6 行小函数(memory/init.py:479)?

@functools.lru_cache(maxsize=1)
def _check_numpy() -> bool:
    if bool(util.find_spec("numpy")):
        return True
    logger.warning(
        "NumPy not found in the current Python environment. "
        "The InMemoryStore will use a pure Python implementation for vector operations, "
        "which may significantly impact performance, especially for large datasets or frequent searches. "
        "For optimal speed and efficiency, consider installing NumPy: "
        "pip install numpy"
    )
    return False

一个极短的函数同时示范了 Python 库开发的三条常见实践:

1、@functools.lru_cache(maxsize=1) 防止重复警告_check_numpy 在每次向量相似度计算时都被调用——没装 numpy 的环境每次都走 else 分支发 warning,用户 log 会被刷爆。lru_cache 把函数的返回值缓存——第一次调用时 log 一次、后续所有调用直接拿缓存返回值不重复执行函数体。这是比 “module-level boolean flag” 更干净的 memoization。

2、util.find_spec("numpy") 做存在性检测不 import不是 try: import numpy except ImportError。find_spec 在不执行 numpy init 代码的情况下就能告诉你 numpy 是否可用——(init numpy 可能要几十 ms)且不污染 sys.modules(真正需要 numpy 时才 import)。生产 Python 库做可选依赖检测应该优先用这个模式。

3、warning 消息给出了行动建议。不是 “NumPy not found”——是 “pip install numpy”。§15.8.4 提过 LangGraph 的错误消息风格——错误/警告都是”下一步指导”,不是事实陈述。读库时看到 logger.warning 里带具体命令,就是 LangGraph 的签名风格之一。

这个小函数和前面 §15.9.4 的 AsyncBatchedBaseStore._runweakref.ref + if s := store() 的 GC 感知模式一起,构成了 LangGraph Store 模块里一系列**“看起来无关紧要但让库在长寿命进程里保持干净”**的小设计。这些设计不会出现在 README、用户也永远不会直接接触——但它们是一个库从”能用”到”可信赖地长期用”的分水岭。

15.8.5 tokenize_path 与 get_text_at_path:index.fields 背后的 mini-DSL

Store 的 index.fields 允许用户写路径表达式指定哪些字段要被向量索引:

store = InMemoryStore(index={
    "dims": 1536,
    "embed": my_embed_fn,
    "fields": [
        "text",                      # 简单字段
        "metadata.title",            # 嵌套字段
        "tags[*]",                   # 数组全部元素
        "authors[0]",                # 指定索引
        "authors[-1]",               # 倒数第一
        "{title,description}",       # 多字段一起索引
    ]
})

这套表达式的解析器 tokenize_path(base/embed.py:333)是个手写的字符扫描器——while i < len(path) 循环、分支处理 [{.、普通字符。90 行代码实现了一个最小 JSONPath 子集

  • field1.field2 → tokenize 成 ["field1", "field2"]
  • tags[*]["tags", "[*]"][*] 作为一个 token 传下去)
  • {title,description}["{title,description}"](花括号内不拆)

取值函数 get_text_at_path(base/embed.py:233)拿这些 tokens 按顺序下潜——遇到 [*] 展开 list 所有元素、遇到 {a,b} 取多个字段都返回。最终产出 list[str],每个字符串作为一条单独的待 embed 文本。

这层抽象解决了**“多字段 + 数组 + 嵌套”混合场景**——("docs",) 下一条 Item 可能是:

{
    "title": "Python for LLM",
    "sections": [
        {"heading": "Intro", "body": "..."},
        {"heading": "Chat Models", "body": "..."},
    ],
    "authors": ["Alice", "Bob"]
}

配置 fields=["title", "sections[*].body"] 就能同时索引 title 和每个 section 的 body——一个 Item 产出 3 条 embedding(1 个 title + 2 个 section body)。_batch_search 里的 max pooling 会在搜索时自动把这 3 条折回单 Item。

这套小 DSL 的价值:用户不用把文档拆成多个 Item 就能分字段索引。PostgresStore 和 SqliteStore 也用同样的 tokenize_path——DSL 在 base/embed.py 一处实现,三种后端都能共用。这是 LangGraph 的典型风格:核心工具函数放 base 子包、后端只专注 I/O 差异

15.9 设计决策

15.9.1 为什么 Store 接口基于 batch?

BaseStore 的核心是 batch 方法,而非独立的 get/put 方法。这个设计的动机是网络效率:在生产环境中,Store 通常是远程服务(如 PostgresStore),每次 RPC 调用都有网络延迟。batch 操作允许将多个读写请求合并为一次网络往返:

# 效率更高的批量操作
results = store.batch([
    GetOp(("users", "alice"), "prefs"),
    GetOp(("users", "alice"), "profile"),
    PutOp(("logs",), "entry_1", {"action": "login"}),
])

便捷方法(getput 等)内部将单个操作包装为长度为 1 的 batch 调用,保持 API 简洁的同时不损失性能优化的可能。

15.9.2 为什么 Item.value 是 dict 而非 Any?

Item.value 固定为 dict[str, Any] 类型,不像 Send.arg 那样接受任意类型。原因有三:

  1. 可过滤性SearchOp.filter 需要按 key 做精确匹配,dict 结构天然支持
  2. 可索引性:向量搜索需要按字段路径提取文本,dict 结构便于字段定位
  3. 序列化安全:dict 可以直接 JSON 序列化,而任意 Python 对象不行

15.9.3 InMemoryStore 的定位

InMemoryStore 是开发和测试的首选,但不适合生产环境——数据在进程退出时丢失。生产环境应使用 PostgresStoreSqliteStore

# 开发
from langgraph.store.memory import InMemoryStore
store = InMemoryStore()

# 生产
from langgraph.store.postgres import PostgresStore
store = PostgresStore(connection_string="postgresql://...")

所有实现都遵循 BaseStore 接口,节点代码无需修改即可切换后端。

15.9.3-bis PostgresStore 的 schema:主表 + 向量表 + ON DELETE CASCADE

为了让”内存 vs 持久化后端”的差别不止停留在口头上,打开 PostgresStore 源码 libs/checkpoint-postgres/langgraph/store/postgres/base.py:62——MIGRATIONS 里能看到它真实的表结构:

-- MIGRATIONS[0]:主表
CREATE TABLE IF NOT EXISTS store (
    prefix text NOT NULL,                                   -- namespace 拼接
    key text NOT NULL,                                      -- Item key
    value jsonb NOT NULL,                                   -- value 存成 jsonb
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (prefix, key)
);

-- MIGRATIONS[1]:前缀索引(text_pattern_ops 让 LIKE 'prefix%' 能走 index)
CREATE INDEX CONCURRENTLY IF NOT EXISTS store_prefix_idx
  ON store USING btree (prefix text_pattern_ops);

-- MIGRATIONS[2]:TTL 支持(后加的)
ALTER TABLE store
ADD COLUMN IF NOT EXISTS expires_at TIMESTAMP WITH TIME ZONE,
ADD COLUMN IF NOT EXISTS ttl_minutes INT;

-- MIGRATIONS[3]:只对 expires_at IS NOT NULL 建 partial index
CREATE INDEX IF NOT EXISTS idx_store_expires_at ON store (expires_at)
WHERE expires_at IS NOT NULL;

-- VECTOR_MIGRATIONS:启用 pgvector 扩展
CREATE EXTENSION IF NOT EXISTS vector;

-- 向量子表(每个字段一行)
CREATE TABLE IF NOT EXISTS store_vectors (
    prefix text NOT NULL,
    key text NOT NULL,
    field_name text NOT NULL,
    embedding vector(1536),    -- 维度由 index_config 决定
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (prefix, key, field_name),
    FOREIGN KEY (prefix, key) REFERENCES store(prefix, key) ON DELETE CASCADE
);

五条设计值得拆开细看:

1、namespace tuple 被拼成 prefix 一列。内存版用的 dict[tuple, ...] 在 Postgres 里要扁平化成一个字符串——tuple ("users", "alice") 变成 "users.alice"(这也就是 §15.3.0 禁止 label 里含 . 的根本原因)。PRIMARY KEY (prefix, key) 保证 namespace+key 的唯一性,和内存版的 _data[namespace][key] 语义等价。

2、prefix text_pattern_ops 索引。Postgres 默认 btree 索引用 locale-aware 比较——对 LIKE 'users.alice%' 这种前缀匹配不能走索引text_pattern_ops 操作符类告诉 Postgres “这列按字节比较、用于 LIKE 前缀匹配时能走 index”。§15.8.1 的 MatchCondition("prefix", (...)) 在 Postgres 后端就对应一条 WHERE prefix LIKE 'users.%'——没有这个操作符类就得全表扫描。

3、TTL 是后加特性(MIGRATIONS[2]/[3])。原始 schema 没有 expires_at,后来才 ALTER 加列——这解释了 §15.2.6 为什么 supports_ttl 默认 False:TTL 是 opt-in 特性、后引入、不是所有实现都支持。同一个 Store 可以在升级到新 LangGraph 时自动跑迁移开启 TTL(靠 MIGRATIONS 自增)。

4、partial index 只索引”有 TTL”的行CREATE INDEX ... WHERE expires_at IS NOT NULL 是 Postgres partial index——不索引 NULL 行。大部分 Store 条目没 TTL(永不过期)——索引它们纯粹是浪费空间和维护成本。只给需要被后台扫描器 sweep 的那些建索引——这类 partial index 是 Postgres 优化器的一条典型实践

5、ON DELETE CASCADE 解决了 §15.4.2-bis 的”僵尸向量”问题。Postgres schema 的 store_vectors 表引用 store 表的 PK——删除主表一行时,向量子表对应行会自动级联删。这比 InMemoryStore 手动在 _apply_put_opsself._vectors[namespace].pop(key, None) 更可靠——由数据库内核保证原子一致性,不会因为中间出错留下半清半不清的状态。

这些 schema 细节解释了”为什么 PostgresStore 在生产更稳”的每一条底层理由。如果未来你要自己写一个 Store 后端(比如 RedisStore),这份 MIGRATIONS 是最好的起点清单——9 条 SQL 换来一个生产级 Store

15.9.4 TTL 与数据清理

Store 支持可选的 TTL(Time-To-Live)机制。GetOpSearchOprefresh_ttl 参数控制读取操作是否刷新 Item 的存活时间。这对于缓存场景特别有用——频繁访问的数据自动延长存活,冷数据自然过期。

stateDiagram-v2
    [*] --> Active: store.put(ttl=3600)
    Active --> Accessed: store.get(refresh_ttl=True)
    Accessed --> Active: TTL 重置
    Active --> Expired: TTL 到期
    Expired --> [*]: 自动清理

15.9.4-bis TTLConfig:3 个字段各控一条语义

TTL 的真实配置结构 TTLConfig(base/init.py:545)是个 TypedDict,只有三个字段:

class TTLConfig(TypedDict, total=False):
    refresh_on_read: bool
    """若 True,读操作(get/search)默认也刷新 TTL。
    可在单次操作上用 refresh_ttl 覆盖。默认 True。"""

    default_ttl: float | None
    """单位是分钟。新 Item 若没显式指定 ttl,用这个默认。
    None 表示不过期。"""

    sweep_interval_minutes: int | None
    """后台扫描清理过期条目的间隔(分钟)。
    None 表示不清理(过期条目仍在表里,但查询时被过滤)。"""

三个字段的设计表达了三条独立但相关的语义:

1、refresh_on_read 控制读是否 sliding TTL。如果一个缓存类场景希望”热数据自动延长 TTL、冷数据自然过期”——就用默认的 refresh_on_read=True。如果是审计日志类场景”创建时间固定、到期必须消失”——设 refresh_on_read=False,读操作不重置计时器。这是 LRU 和 TTL 两种缓存语义的融合点。

2、default_ttl 是”没指定 ttl 的新 Item 用什么默认”。类似 Redis 的 maxttl 配置——防止忘记指定 TTL 导致数据堆积。生产环境建议永远设一个兜底值,比如 default_ttl=60*24*7(7 天)——意味着最宽松情况下数据也不会超过 7 天还存活。这也是 Store 接近”缓存语义”而非”数据库语义”的体现。

3、sweep_interval_minutes 控制”到期后多久被物理删除”。过期 Item 不会在 get() 返回,但行还在表里——直到 sweeper 定期扫到为止。这是 Postgres Store 和 SqliteStore 的实现策略;InMemoryStore 目前不支持真实过期扫描supports_ttl=False(§15.2.6)就是这个原因。

三个字段 + supports_ttl capability flag 合起来构成了一套极简的 TTL 系统——对比 Redis 的几十个 expiration 相关命令和配置,LangGraph Store 选了最小可行的特性集:不支持 sub-second ttl、不支持 conditional expire、不支持 expire on write vs read 的双时钟。这种克制对一个面向 LLM 长期记忆场景的系统是合理的——memory 不该有毫秒级过期需求,让配置表面简洁能降低用户写错概率。

这条”最小可行”和 §15.2.2 Store 只暴露 4 种操作原语是同一套克制哲学:比起给 10 种 API 让用户自己 pick,LangGraph 选择 4 种覆盖 90% 场景的原语 + 一套可组合的 filter DSL。读到这一层,你就能理解整个 Store 模块的审美 ——简单 > 强大,因为强大的东西更难用对。

15.9.4 AsyncBatchedBaseStore:跨 tick 自动聚合的后台批处理

前面讲 Store 的 batch 是”显式批量”——用户写 store.batch([op1, op2, op3]) 才会聚合。LangGraph 还有一个隐式批量类型 AsyncBatchedBaseStore(base/batch.py:58),用于把异步环境下同一个事件循环 tick 内发起的独立 op 自动聚合为一次 abatch。实现值得读:

class AsyncBatchedBaseStore(BaseStore):
    __slots__ = ("_loop", "_aqueue", "_task")

    def __init__(self) -> None:
        super().__init__()
        self._loop = asyncio.get_running_loop()
        self._aqueue: asyncio.Queue[tuple[asyncio.Future, Op]] = asyncio.Queue()
        self._task: asyncio.Task | None = None
        self._ensure_task()

    def _ensure_task(self) -> None:
        if self._task is None or self._task.done():
            self._task = self._loop.create_task(_run(self._aqueue, weakref.ref(self)))

每个 aget/aput/asearch 都把 (future, op) 塞进 _aqueue,后台协程 _run(base/batch.py:326)负责消费:

async def _run(aqueue, store):
    while item := await aqueue.get():
        if item[0].done(): continue
        if s := store():                        # ← 弱引用取回 store
            items = [item]
            try:
                while item := aqueue.get_nowait():  # ← 抽空本 tick 所有 op
                    if item[0].done(): continue
                    items.append(item)
            except asyncio.QueueEmpty:
                pass
            futs = [item[0] for item in items]
            values = [item[1] for item in items]
            listen, dedupped = _dedupe_ops(values)   # ← 跨 future 去重
            results = await s.abatch(dedupped)
            if listen is not None:
                results = [results[ix] for ix in listen]
            for fut, result in zip(futs, results, strict=False):
                if not fut.done():
                    fut.set_result(result)
        else:
            break                                # ← store 被 GC 了,后台 task 自行退出

四条值得看懂的设计:

1、while await aqueue.get(): ... try: while aqueue.get_nowait() 两级抓取。第一次 await get() 阻塞等第一条 op 到达;拿到后立刻用 get_nowait() 在不 await 的情况下把队列里所有剩余的 op 也抓过来。这种 “等 1 个 + 搜刮剩下所有” 是 asyncio 社区里最经典的微批(micro-batching)套路——既不因为太激进批量化而增加延迟,也不因为太保守每次只处理 1 条而失去批量优势。

2、weakref.ref(self) + _ensure_task 的生命周期策略。后台 _run 对 store 用弱引用。如果用户的 store 对象被 GC(比如测试代码局部变量超出作用域),后台协程会 if s := store(): ... else: break 自行退出——不会因为 task 持有强引用导致 store 永远不被释放__del__ 里额外 self._task.cancel() 兜底。这套组合让用户不用操心”调用完 Store 要不要 close”——Python GC 一来就全自动清理。

3、_dedupe_ops 跨 future 去重。同一 tick 内如果有两个业务代码调用 aget(("users","alice"), "prefs")——两条 GetOp 参数完全一样。_dedupe_ops(base/batch.py:300)的做法:

if isinstance(op, (GetOp, SearchOp, ListNamespacesOp)):
    try:
        listen.append(dedupped.index(op))     # ← 已有相同 op,复用
    except ValueError:
        listen.append(len(dedupped))
        dedupped.append(op)
elif isinstance(op, PutOp):
    putkey = (op.namespace, op.key)
    if putkey in puts:
        # Overwrite previous put
        ix = puts[putkey]
        dedupped[ix] = op                     # ← 同 key 多次 put,取最后一个
        listen.append(ix)

读操作(Get/Search/List)看到重复直接复用已有 op 的结果;写操作(Put)看到同 (ns, key) 的两次 Put 把后者覆盖前者listen 数组维护”每个 future 要读 results 的哪个索引”——查询完再按 listen 把结果映射回各自的 future。

这层去重对 agent 场景极其有用——100 个并发 agent 读同一个用户偏好只会产生 1 次后端调用,但每个 agent 都拿到正确的结果。

4、_check_loop 装饰器防死锁。同步接口(如 store.get())如果在自己的事件循环里被调用会阻塞——_check_loop(base/batch.py:33)检测到 asyncio.get_running_loop() is store._loop 就立刻 raise InvalidStateError错误消息直接告诉用户改成 await store.aget(...)——这种”错误消息内置迁移建议”的做法和第 9 章 serde 的 TOOL_CALL_ERROR_TEMPLATE 是一个思路:错误消息不是日志、是下一步指导

这套 AsyncBatchedBaseStore 在 LangGraph Platform 的 PostgresStore 子类里默认启用——用户代码写 await runtime.store.aget(...)、底层自动变成跨并发请求的批量。不改一行业务代码就获得 10-100x 的吞吐提升——这才是 LangGraph Store 最有价值的性能特性。

15.9.4-ter IndexConfig.embed:4 种类型、一套 ensure_embeddings 归一

Store 的 embed 字段定义(base/init.py:590)接受 4 种完全不同形态的输入:

embed: Embeddings | EmbeddingsFunc | AEmbeddingsFunc | str
  1. LangChain 的 Embeddings 类实例(init_embeddings("openai:text-embedding-3-small") 返回的)
  2. 同步函数 (list[str]) -> list[list[float]](直接用 OpenAI SDK 自己写)
  3. 异步函数 (list[str]) -> Awaitable[list[list[float]]](用 AsyncOpenAI 写)
  4. 字符串 provider 标识符("openai:text-embedding-3-small"——LangGraph 自己帮你调 init_embeddings

这四种怎么归一的?ensure_embeddings(embed.py:34)统一把它们包装成 Embeddings 接口:

  • 已经是 Embeddings 实例——原样返回
  • 是同步/异步函数——包成 EmbeddingsLambda(embed.py:109)
  • 是字符串——call langchain.embeddings.init_embeddings("...")——但这会可选依赖 langchain

第 4 种 “provider 字符串” 的好处和陷阱:

好处:用户代码里写 index={"embed": "openai:text-embedding-3-small"}——比自己实例化 embedder 简洁得多。不用 import OpenAI、不用写 API key 管理代码。对”快速原型”场景非常友好。

陷阱langgraph 自己不依赖 langchain-openai。你只装 pip install langgraph 再用 embed="openai:..." 会在运行时ImportError: Could not import langchain_openai。LangGraph 刻意不预装所有 provider 客户端——这是它保持核心包小的代价。生产代码推荐用 1 或 2 的显式实例化,一是避开这个陷阱,二是让依赖关系在 pip install 阶段暴露而不是运行时。

同步 vs 异步函数的选择也有策略:InMemoryStore.batch() 走同步路径——只能用同步 embed;InMemoryStore.abatch() 优先用异步 embed、如果传的是同步函数会用 run_in_executor 包装。如果你的 LLM 调用全在 async loop 里(LangGraph v1.0 的主流方式),请传 async 函数——否则 embed 调用会占线程池 slot、拖慢其他异步任务。

这层”4 种形态统一到一个接口”的设计是 LangGraph 的典型作风——API 表面给用户最大自由,内部实现统一归一。从 Runnable 到 Checkpointer 到 Embeddings,都是这个模式。一旦你掌握这个套路,读 LangGraph 源码就会越来越快——看到一个接口接受多种输入类型,就找”ensure_XXX”函数看怎么归一,90% 的情况都能秒懂。

15.9.5 本章在全书体系中的坐标

回到本书整体脉络——Store 是 LangGraph 四大系统组件(StateGraph、Pregel、Checkpoint、Store)里位置最边缘、但对”应用质量”贡献最大的一个。几个承接关系值得明示:

  • 承接第 7 章(Pregel 调度):Store 通过 Runtime.storeToolRuntime.store 两条通道注入到节点/工具里。注入时机是 Pregel 的 prepare_task 阶段——和 config、context 同批。理解了第 7 章的 task-scoped injection,就能理解”为什么节点看到的 store 是同一个实例”——它不是每次新建的,是 compile 时注入的单例引用传到每个任务。
  • 承接第 13 章(Checkpoint):本章 §15.5 画清了两者分工。Store 的数据生命周期不受 checkpoint 版本影响——你 rollback 到 checkpoint_id=X 不会把 Store 里写过的东西”撤销”。这个设计是刻意的——长期记忆不应该被会话级的时间旅行抹掉
  • 支撑第 16 章(预构建 agent)InjectedStore 注解和 ToolRuntime.store 字段(第 16.6.2/16.6.3 节)都依赖本章 BaseStore 的 API 契约。_inject_tool_args 里对 store 参数的”剥离 + 补回”安全层(第 16.3.6 节)防的就是”LLM 伪造 store”——这条防线的存在预设了 store 是框架受控资源、不是业务数据
  • 铺垫第 17 章(multi-agent):多 agent 架构里最常见的”跨 agent 共享知识”就是 Store 的用武之地。主 agent 把 research 结果 store.put(("knowledge", topic), ...),子 agent store.search(("knowledge",), query=...) 拿结果——Store 是 agent 之间的”共享白板”。这个用法的实现细节在下一章展开,本章打好的 BaseStore 四操作原语(Get/Put/Search/ListNamespaces)就是白板上的”写/读/搜/列”。

这层承接关系不是后话、而是 LangGraph 的核心设计意图。Store、Checkpoint、Pregel 三者在 compile() 时同时进入 Pregel 实例——它们天生就是配套使用的。本章把 Store 单独拎出来讲,是方便知识分层;实际写应用时你几乎总会同时用到三者。

从更宏观的软件架构视角看,Store 是 LangGraph 整个技术栈里层次最高、对业务代码最近的一层——它直接被节点/工具代码调用。Checkpoint 和 Pregel 更多是框架内部机制,用户感知较弱;Store 则是你的业务代码每天都要写的东西。理解本章揭示的 Store 内部的 30 多处工程决策——_validate_namespace 的四条铁律、AsyncBatchedBaseStore 的跨 future 去重、_cosine_similarity 的 numpy 可选路径、PostgresStore 的 ON DELETE CASCADE——能让你在写应用时做出更匹配 Store 本意的 API 调用选择。这种”框架意图驱动应用选型”的能力,是从”会用 LangGraph”到”精通 LangGraph”的关键一跃。

15.10 小结

本章详细分析了 LangGraph 的 Store 长期记忆系统。BaseStore 通过四种操作原语(Get、Put、Search、ListNamespaces)和层级化的命名空间,为 LLM 应用提供了灵活的跨线程持久化能力。InMemoryStore 以简洁的字典实现覆盖了开发测试需求,同时支持可选的向量语义搜索。

Store 与 Checkpoint 的关系是互补的:Checkpoint 管理线程内的状态历史(短期记忆),Store 管理跨线程的持久化数据(长期记忆)。通过 runtime.store 的统一访问接口,节点和工具可以无缝地读写长期记忆,而底层存储后端(内存、SQLite、PostgreSQL)可以透明切换。

batch 接口的设计体现了对生产环境网络效率的重视;dict[str, Any] 的值类型约束确保了可过滤性、可索引性和序列化安全;层级命名空间为多租户和多项目场景提供了自然的组织结构。

下一章我们将进入 LangGraph 的预构建组件层——create_react_agent 工厂函数、ToolNode 实现和 tools_condition 路由——了解框架如何将底层的图构建原语封装为开箱即用的 Agent 架构。


本章源码定位:主要引用来自 langgraph-latest/libs/checkpoint/langgraph/store/ 目录的三个 Python 文件——base/__init__.py(1314 行,定义 BaseStore/Item/Op/TTLConfig/IndexConfig 和校验函数)、base/batch.py(371 行,AsyncBatchedBaseStore 和 _dedupe_ops)、base/embed.py(433 行,ensure_embeddings/tokenize_path/get_text_at_path)、memory/__init__.py(592 行,InMemoryStore 的全部实现)。Postgres 后端细节来自 libs/checkpoint-postgres/langgraph/store/postgres/base.py(MIGRATIONS 和 VECTOR_MIGRATIONS)。所有引用都给了准确的行号锚点——存疑可直接对照核验。

本章和第 8 章(Checkpoint)在系统组件图里是”双持久化”:短期在 Checkpoint、长期在 Store;本章和第 16 章(预构建 agent)、第 17 章(multi-agent)是”对接用户代码”的前置知识——理解了 Store 的四原语 + TTLConfig/IndexConfig 两配置 + AsyncBatchedBaseStore 自动聚合,后面读 create_react_agentstore= 参数传进去做什么才不会停留在”被神秘注入”的模糊状态。生产环境向量搜索通常要关注:pgvector 扩展启用、HNSW/IVF 参数调优、dims 匹配 embedding 模型输出、fields 路径覆盖实际索引的字段——这几个旋钮正是本章反复出现的配置项。