Skip to content

第8章 Checkpoint 持久化

8.1 引言

在前面的章节中,我们深入剖析了 Pregel 执行循环如何驱动图的运行、Channel 如何承载状态的流转、以及任务调度器如何协调节点的并发执行。然而,所有这些精密的运行时机制都面临一个根本性问题:当进程终止、网络中断或用户需要暂停时,图的执行状态将随之消失。

Checkpoint(检查点)机制正是 LangGraph 对这一问题的系统性回答。它不仅仅是简单的"保存与恢复",而是一套精心设计的持久化架构,涵盖了从数据结构定义、序列化协议、版本追踪到多后端存储适配的完整链路。Checkpoint 是 LangGraph 实现中断恢复、人机协作、时间旅行调试等高级特性的基石。

本章将从源码层面深入分析 LangGraph 1.1.6 的 Checkpoint 持久化体系。我们将逐一剖析 Checkpoint 数据结构的每个字段、BaseCheckpointSaver 基类的接口设计、JsonPlusSerializer 的序列化策略、以及 SQLite/Postgres 两种生产级实现的存储方案。理解 Checkpoint 体系不仅有助于正确使用 LangGraph 的持久化功能,更能帮助开发者在面对自定义存储后端、性能优化或调试状态恢复问题时做出正确的架构决策。

要真正理解 Checkpoint 为何如此设计,我们需要先思考状态持久化面临的核心挑战。首先,图的状态不是简单的键值对——它包含 Channel 的值、版本信息和节点的执行进度,这些信息必须作为一个原子单元被保存和恢复。其次,持久化的粒度需要精心选择——太粗会丢失中间状态,太细会拖慢执行速度。最后,序列化必须能够处理 Python 生态中丰富多样的类型系统,从简单的字典到复杂的 Pydantic 模型、从日期时间到 NumPy 数组,都需要可靠地序列化和反序列化。LangGraph 的 Checkpoint 体系对这些挑战给出了优雅而务实的回答。

本章要点

  1. Checkpoint 数据结构:理解 Checkpoint TypedDict 的六个核心字段及其在状态恢复中的作用
  2. CheckpointSaver 基类:掌握 get_tuple/put/list/put_writes 四大核心方法的契约
  3. CheckpointMetadata 与寻址:理解 thread_id/checkpoint_id/checkpoint_ns 三维寻址体系
  4. 序列化体系:深入 JsonPlusSerializer 基于 ormsgpack 的类型安全序列化协议
  5. 持久化模式:区分 sync/async/exit 三种持久化策略的适用场景
  6. 存储实现:对比 InMemory/SQLite/Postgres 三种实现的架构差异

8.2 Checkpoint 数据结构

8.2.1 Checkpoint TypedDict

Checkpoint 是 LangGraph 持久化体系的核心数据结构。它定义在 langgraph.checkpoint.base 模块中,使用 Python 的 TypedDict 来提供类型安全的字典访问:

python
# langgraph/checkpoint/base/__init__.py

class Checkpoint(TypedDict):
    """State snapshot at a given point in time."""

    v: int
    """The version of the checkpoint format. Currently 1."""
    id: str
    """The ID of the checkpoint. Both unique and monotonically increasing."""
    ts: str
    """The timestamp of the checkpoint in ISO 8601 format."""
    channel_values: dict[str, Any]
    """The values of the channels at the time of the checkpoint."""
    channel_versions: ChannelVersions
    """The versions of the channels at the time of the checkpoint."""
    versions_seen: dict[str, ChannelVersions]
    """Map from node ID to map from channel name to version seen."""
    updated_channels: list[str] | None
    """The channels that were updated in this checkpoint."""

每个字段都承担着精确的职责:

v(格式版本):当前 Pregel 层使用的版本号为 4(定义在 pregel/_checkpoint.pyLATEST_VERSION = 4),而 checkpoint 基类中的版本为 2。这个版本号用于在反序列化时进行格式迁移。

id(检查点标识):采用 UUID v6 生成,这是一个关键的设计选择。UUID v6 将时间戳编码到高位,使得 ID 既全局唯一又单调递增。这意味着可以直接通过字符串比较来确定检查点的先后顺序,无需额外的排序字段。

ts(时间戳):ISO 8601 格式的 UTC 时间戳,主要用于调试和审计,不参与排序逻辑。

channel_values(通道值):存储所有 Channel 在该时刻的快照值。这是状态恢复的关键数据。需要注意的是,并非所有 Channel 都会出现在这个字典中——只有在 channel_versions 中有记录(即至少被写入过一次)的 Channel 才会被保存。这种按需保存的策略避免了为未使用的 Channel 浪费存储空间。在 Postgres 实现中,channel_values 并不直接存储在 checkpoints 表中,而是通过 channel_versions 映射到独立的 checkpoint_blobs 表,实现了跨检查点的去重。

channel_versions(通道版本):记录每个 Channel 的当前版本号,用于增量更新判断。版本号的类型由 BaseCheckpointSaver 的泛型参数 V 决定,可以是 intfloatstr。每当一个 Channel 被写入时,其版本号通过 get_next_version 方法递增。这个版本追踪机制是 LangGraph 实现增量持久化的基础——通过比较前后两个检查点的 channel_versions,可以精确知道哪些 Channel 发生了变化,从而只持久化变化的部分。

versions_seen(已见版本):二维映射,记录每个节点上次执行时看到的各 Channel 版本。这是 Pregel 调度器判断"哪些节点需要重新执行"的核心依据。当 channel_versions[channel] > versions_seen[node][channel] 时,说明该 Channel 在节点上次执行之后被更新过,节点需要被重新调度。这个机制确保了图的执行遵循数据驱动的原则——只有当输入数据确实发生变化时,节点才会被触发。

updated_channels:记录在当前检查点中被更新的 Channel 列表。这个字段在 v4 版本中引入,用于优化 prepare_next_tasks 的性能——通过直接查阅被更新的 Channel 列表,避免了遍历所有 Channel 比较版本号的开销。当值为 None 时,回退到全量版本比较。

8.2.2 UUID v6:单调递增的标识符

LangGraph 没有使用标准库的 UUID v4,而是自行实现了 UUID v6。这个选择背后有深思熟虑的工程考量:

python
# langgraph/checkpoint/base/id.py

def uuid6(node=None, clock_seq=None) -> UUID:
    global _last_v6_timestamp
    nanoseconds = time.time_ns()
    timestamp = nanoseconds // 100 + 0x01B21DD213814000
    if _last_v6_timestamp is not None and timestamp <= _last_v6_timestamp:
        timestamp = _last_v6_timestamp + 1
    _last_v6_timestamp = timestamp
    if clock_seq is None:
        clock_seq = random.getrandbits(14)
    if node is None:
        node = random.getrandbits(48)
    time_high_and_time_mid = (timestamp >> 12) & 0xFFFFFFFFFFFF
    time_low_and_version = timestamp & 0x0FFF
    uuid_int = time_high_and_time_mid << 80
    uuid_int |= time_low_and_version << 64
    uuid_int |= (clock_seq & 0x3FFF) << 48
    uuid_int |= node & 0xFFFFFFFFFFFF
    return UUID(int=uuid_int, version=6)

基于 VitePress 构建