LangChain 设计与实现

第12章 回调与可观测性

作者 杨艺韬 · 8,235 字

第12章 回调与可观测性

引言

一个优秀的 AI 应用框架不仅要能正确执行任务,还必须让开发者清晰地了解执行过程中发生了什么。LangChain 的回调系统正是为此而生。它提供了一套精密的事件驱动架构,使得日志记录、性能监控、流式输出、调试追踪等横切关注点可以在不侵入业务逻辑的前提下实现。

回调系统是 LangChain 中最复杂的基础设施之一。它横跨 langchain_core 中的基础定义(callbacks/)和追踪器实现(tracers/),以及 langchain 包中面向具体场景的处理器实现。从基础的标准输出打印,到与 LangSmith 平台的深度集成,再到支撑 astream_events API 的事件流追踪器,回调系统的每一层都经过精心设计。

本章将从回调处理器的 Mixin 层次结构讲起,逐步深入到 CallbackManager 的事件分发机制、Tracer 的运行追踪模型,最终到达事件流的实现细节。

本章要点

  • 回调处理器通过六个 Mixin 类组合而成,支持 LLM、Chain、Tool、Retriever 等全部组件类型
  • CallbackManagerAsyncCallbackManager 负责事件分发,通过 configure 方法实现多源回调合并
  • 运行管理器(RunManager)是绑定到具体 run 的回调触发器,通过 get_child() 实现父子关系传播
  • BaseTracer / _TracerCore 构建了完整的运行追踪模型,维护 run_maporder_map 实现嵌套追踪
  • LangChainTracer 将追踪数据发送到 LangSmith 平台,LogStreamCallbackHandler 和事件流追踪器支撑了 astream_logastream_events API

12.1 回调处理器的 Mixin 架构

12.1.1 六大 Mixin 类

LangChain 的回调处理器采用了精细的 Mixin 组合设计。每个 Mixin 对应一类组件的生命周期事件:

classDiagram
    class LLMManagerMixin {
        +on_llm_new_token(token, chunk, run_id)
        +on_llm_end(response, run_id)
        +on_llm_error(error, run_id)
    }
    class ChainManagerMixin {
        +on_chain_end(outputs, run_id)
        +on_chain_error(error, run_id)
        +on_agent_action(action, run_id)
        +on_agent_finish(finish, run_id)
    }
    class ToolManagerMixin {
        +on_tool_end(output, run_id)
        +on_tool_error(error, run_id)
    }
    class RetrieverManagerMixin {
        +on_retriever_end(documents, run_id)
        +on_retriever_error(error, run_id)
    }
    class CallbackManagerMixin {
        +on_llm_start(serialized, prompts, run_id)
        +on_chat_model_start(serialized, messages, run_id)
        +on_chain_start(serialized, inputs, run_id)
        +on_tool_start(serialized, input_str, run_id)
        +on_retriever_start(serialized, query, run_id)
    }
    class RunManagerMixin {
        +on_text(text, run_id)
        +on_retry(retry_state, run_id)
        +on_custom_event(name, data, run_id)
    }

    class BaseCallbackHandler {
        +raise_error: bool
        +run_inline: bool
        +ignore_llm: bool
        +ignore_chain: bool
        +ignore_agent: bool
        +ignore_retriever: bool
        +ignore_chat_model: bool
        +ignore_custom_event: bool
    }

    LLMManagerMixin <|-- BaseCallbackHandler
    ChainManagerMixin <|-- BaseCallbackHandler
    ToolManagerMixin <|-- BaseCallbackHandler
    RetrieverManagerMixin <|-- BaseCallbackHandler
    CallbackManagerMixin <|-- BaseCallbackHandler
    RunManagerMixin <|-- BaseCallbackHandler

这个设计的精妙之处在于:

  1. CallbackManagerMixin 包含所有 on_*_start 方法 -- 这些是"启动事件",只有 CallbackManager 层级才需要处理
  2. 组件特定 Mixin(LLM/Chain/Tool/Retriever)包含对应的 on_*_endon_*_error 方法 -- 这些是"完成事件"
  3. RunManagerMixin 包含通用的 on_texton_retryon_custom_event -- 这些可以在任何运行阶段触发

值得注意的是 on_chat_model_starton_llm_start 的关系。on_chat_model_start 默认抛出 NotImplementedError,这是一种故意的设计:

class CallbackManagerMixin:
    def on_chat_model_start(self, serialized, messages, *, run_id, **kwargs):
        # NotImplementedError is thrown intentionally
        # Callback handler will fall back to on_llm_start
        msg = f"{self.__class__.__name__} does not implement `on_chat_model_start`"
        raise NotImplementedError(msg)

事件分发系统会捕获这个异常,并自动将消息转换为字符串后回退到 on_llm_start。这保证了只实现了 on_llm_start 的旧处理器也能处理 Chat Model 的事件。

12.1.2 BaseCallbackHandler 的控制属性

BaseCallbackHandler 通过一系列 ignore_* 属性提供了精细的事件过滤能力:

class BaseCallbackHandler(
    LLMManagerMixin, ChainManagerMixin, ToolManagerMixin,
    RetrieverManagerMixin, CallbackManagerMixin, RunManagerMixin
):
    raise_error: bool = False       # 回调异常时是否传播
    run_inline: bool = False        # 是否在主线程内联执行

    @property
    def ignore_llm(self) -> bool: return False
    @property
    def ignore_chain(self) -> bool: return False
    @property
    def ignore_agent(self) -> bool: return False
    @property
    def ignore_retriever(self) -> bool: return False
    @property
    def ignore_chat_model(self) -> bool: return False
    @property
    def ignore_custom_event(self) -> bool: return False

raise_errorrun_inline 是两个关键的控制标志:

12.1.3 AsyncCallbackHandler

AsyncCallbackHandler 继承自 BaseCallbackHandler,将所有事件方法重新声明为 async

class AsyncCallbackHandler(BaseCallbackHandler):
    async def on_llm_start(self, serialized, prompts, *, run_id, **kwargs) -> None: ...
    async def on_chat_model_start(self, serialized, messages, *, run_id, **kwargs) -> Any:
        raise NotImplementedError(...)
    async def on_llm_new_token(self, token, *, run_id, **kwargs) -> None: ...
    async def on_chain_start(self, serialized, inputs, *, run_id, **kwargs) -> None: ...
    # ... 其他事件方法

12.2 BaseCallbackManager:回调管理的基石

12.2.1 数据结构

BaseCallbackManager 管理两层处理器列表和元数据:

class BaseCallbackManager(CallbackManagerMixin):
    def __init__(self, handlers, inheritable_handlers=None, parent_run_id=None,
                 *, tags=None, inheritable_tags=None,
                 metadata=None, inheritable_metadata=None):
        self.handlers: list[BaseCallbackHandler] = handlers
        self.inheritable_handlers: list[BaseCallbackHandler] = inheritable_handlers or []
        self.parent_run_id: UUID | None = parent_run_id
        self.tags = tags or []
        self.inheritable_tags = inheritable_tags or []
        self.metadata = metadata or {}
        self.inheritable_metadata = inheritable_metadata or {}

"可继承"机制是回调系统的核心设计之一。当一个 Chain 调用子 Chain 时,父级的 inheritable_handlersinheritable_tagsinheritable_metadata 会自动传递给子级。这确保了顶层设置的追踪器能够捕获整个调用树中的所有事件。

flowchart TD
    subgraph "父 CallbackManager"
        PH["handlers: H1, H2"]
        PIH["inheritable_handlers: H1"]
        PT["tags: tag_a, tag_b"]
        PIT["inheritable_tags: tag_a"]
    end

    subgraph "子 CallbackManager -- 通过 get_child 创建"
        CH["handlers: H1 -- 来自继承"]
        CIH["inheritable_handlers: H1 -- 来自继承"]
        CT["tags: tag_a -- 来自继承"]
        CIT["inheritable_tags: tag_a -- 来自继承"]
    end

    PIH --> CH
    PIH --> CIH
    PIT --> CT
    PIT --> CIT

12.2.2 处理器管理方法

def add_handler(self, handler, inherit=True):
    if handler not in self.handlers:
        self.handlers.append(handler)
    if inherit and handler not in self.inheritable_handlers:
        self.inheritable_handlers.append(handler)

def remove_handler(self, handler):
    if handler in self.handlers:
        self.handlers.remove(handler)
    if handler in self.inheritable_handlers:
        self.inheritable_handlers.remove(handler)

def merge(self, other):
    combined_handlers = list(set(self.handlers) | set(other.handlers))
    combined_inheritable = list(set(self.inheritable_handlers) | set(other.inheritable_handlers))
    return self.__class__(
        handlers=combined_handlers,
        inheritable_handlers=combined_inheritable,
        tags=list(set(self.tags + other.tags)),
        inheritable_tags=list(set(self.inheritable_tags + other.inheritable_tags)),
        metadata={**self.metadata, **other.metadata},
        inheritable_metadata={**self.inheritable_metadata, **other.inheritable_metadata},
    )

merge 方法使用集合运算去重,这是处理多个回调源合并时避免重复触发的关键。

12.3 CallbackManager:事件分发中枢

12.3.1 configure 方法

CallbackManager.configure 是整个回调系统的入口点,负责从多个来源合并回调配置:

flowchart TD
    A["CallbackManager.configure(callbacks, inheritable_callbacks,
    verbose, tags, inheritable_tags, metadata, inheritable_metadata)"]
    A --> B{是否有 tracing 环境变量?}
    B -->|是| C[添加 LangChainTracer]
    B --> D[合并 callbacks 和 inheritable_callbacks]
    C --> D
    D --> E{verbose 为 True?}
    E -->|是| F[添加 StdOutCallbackHandler]
    E --> G[合并 tags 和 metadata]
    F --> G
    G --> H[返回配置好的 CallbackManager]

这个方法的设计体现了"约定优于配置"的原则:

12.3.2 事件分发机制

事件分发通过 handle_eventahandle_event 两个函数实现。同步版本的核心逻辑:

def handle_event(handlers, event_name, ignore_condition_name, *args, **kwargs):
    coros = []
    message_strings = None

    for handler in handlers:
        try:
            if ignore_condition_name is None or not getattr(handler, ignore_condition_name):
                event = getattr(handler, event_name)(*args, **kwargs)
                if asyncio.iscoroutine(event):
                    coros.append(event)
        except NotImplementedError as e:
            if event_name == "on_chat_model_start":
                if message_strings is None:
                    message_strings = [get_buffer_string(m) for m in args[1]]
                handle_event([handler], "on_llm_start", "ignore_llm",
                             args[0], message_strings, *args[2:], **kwargs)
            else:
                logger.warning("NotImplementedError in %s.%s",
                               handler.__class__.__name__, event_name)
        except Exception as e:
            logger.warning("Error in %s.%s: %s",
                           handler.__class__.__name__, event_name, repr(e))
            if handler.raise_error:
                raise

    if coros:
        try:
            asyncio.get_running_loop()
            loop_running = True
        except RuntimeError:
            loop_running = False

        if loop_running:
            _executor().submit(copy_context().run, _run_coros, coros).result()
        else:
            _run_coros(coros)

这段代码有几个值得注意的设计决策:

  1. NotImplementedError 的优雅回退:当 on_chat_model_start 未实现时,自动将消息转换为字符串后调用 on_llm_start。这保证了向后兼容性。

  2. 异步处理器的同步兼容:当在同步上下文中遇到异步处理器产生的协程时,会检测是否有运行中的事件循环。如果有,则将协程提交到线程池执行(避免死锁);如果没有,则直接运行协程。

  3. 错误隔离:默认情况下回调异常只记录日志不传播,除非 handler.raise_error = True

异步版本 ahandle_event 的处理策略有所不同:

async def ahandle_event(handlers, event_name, ignore_condition_name, *args, **kwargs):
    # 先内联执行标记为 run_inline 的处理器
    for handler in [h for h in handlers if h.run_inline]:
        await _ahandle_event_for_handler(
            handler, event_name, ignore_condition_name, *args, **kwargs)
    # 其余处理器并发执行
    await asyncio.gather(*(
        _ahandle_event_for_handler(
            handler, event_name, ignore_condition_name, *args, **kwargs)
        for handler in handlers if not handler.run_inline
    ))

run_inline 处理器优先执行并阻塞等待完成,然后其余处理器通过 asyncio.gather 并发执行。

12.3.3 RunManager:绑定运行上下文的触发器

CallbackManager.on_chain_start 被调用时,它会创建一个 CallbackManagerForChainRun 实例,该实例绑定了当前运行的 run_id

class BaseRunManager(RunManagerMixin):
    def __init__(self, *, run_id, handlers, inheritable_handlers,
                 parent_run_id=None, tags=None, inheritable_tags=None,
                 metadata=None, inheritable_metadata=None):
        self.run_id = run_id
        self.handlers = handlers
        self.inheritable_handlers = inheritable_handlers
        self.parent_run_id = parent_run_id
        self.tags = tags or []
        self.inheritable_tags = inheritable_tags or []
        self.metadata = metadata or {}
        self.inheritable_metadata = inheritable_metadata or {}

每个组件类型都有对应的 RunManager:

classDiagram
    class BaseRunManager {
        +run_id: UUID
        +handlers: list
        +get_child(tag) CallbackManager
    }
    class CallbackManagerForLLMRun {
        +on_llm_new_token(token)
        +on_llm_end(response)
        +on_llm_error(error)
    }
    class CallbackManagerForChainRun {
        +on_chain_end(outputs)
        +on_chain_error(error)
        +on_agent_action(action)
        +on_agent_finish(finish)
        +get_child(tag) CallbackManager
    }
    class CallbackManagerForToolRun {
        +on_tool_end(output)
        +on_tool_error(error)
    }
    class CallbackManagerForRetrieverRun {
        +on_retriever_end(documents)
        +on_retriever_error(error)
    }

    BaseRunManager <|-- CallbackManagerForLLMRun
    BaseRunManager <|-- CallbackManagerForChainRun
    BaseRunManager <|-- CallbackManagerForToolRun
    BaseRunManager <|-- CallbackManagerForRetrieverRun

get_child 方法是嵌套调用链追踪的关键 -- 它创建一个新的 CallbackManager,将当前 run_id 设为新管理器的 parent_run_id

def get_child(self, tag=None):
    manager = CallbackManager(handlers=[], parent_run_id=self.run_id)
    manager.set_handlers(self.inheritable_handlers)
    manager.add_tags(self.inheritable_tags)
    manager.add_metadata(self.inheritable_metadata)
    if tag:
        manager.add_tags([tag], inherit=False)
    return manager

12.4 内置 Handler 实现

12.4.1 StdOutCallbackHandler

最简单的内置处理器,用于将执行过程输出到标准输出:

class StdOutCallbackHandler(BaseCallbackHandler):
    def __init__(self, color=None):
        self.color = color

    def on_chain_start(self, serialized, inputs, **kwargs):
        if "name" in kwargs:
            name = kwargs["name"]
        elif serialized:
            name = serialized.get("name", serialized.get("id", ["<unknown>"])[-1])
        else:
            name = "<unknown>"
        print(f"\n\n\033[1m> Entering new {name} chain...\033[0m")

    def on_chain_end(self, outputs, **kwargs):
        print("\n\033[1m> Finished chain.\033[0m")

    def on_agent_action(self, action, color=None, **kwargs):
        print_text(action.log, color=color or self.color)

    def on_tool_end(self, output, color=None,
                    observation_prefix=None, llm_prefix=None, **kwargs):
        output = str(output)
        if observation_prefix is not None:
            print_text(f"\n{observation_prefix}")
        print_text(output, color=color or self.color)

它使用 ANSI 转义码实现了粗体和颜色输出,在终端中提供了直观的调用链可视化。当 Chain.verbose = True 时,StdOutCallbackHandler 会被 CallbackManager.configure 自动注入。

12.4.2 StreamingStdOutCallbackHandler

专为流式输出设计的处理器,核心是 on_llm_new_token

class StreamingStdOutCallbackHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token, **kwargs):
        sys.stdout.write(token)
        sys.stdout.flush()

每当 LLM 产生一个新 token 时,立即写入标准输出并刷新缓冲区。sys.stdout.flush() 确保 token 即时显示而非等到缓冲区满。

12.4.3 FileCallbackHandler 与第三方集成

langchain_classic/callbacks/file.py 中的 FileCallbackHandler 将事件输出到文件,适合生产环境的日志记录。langchain_classic/callbacks/ 目录下还有大量第三方集成处理器,包括 wandb_callback.py(Weights & Biases)、mlflow_callback.py(MLflow)、arize_callback.py(Arize)等,覆盖了主流的 ML 可观测性平台。

12.5 Tracer 系统:结构化运行追踪

12.5.1 _TracerCore:追踪的内核

Tracer 是回调系统中最复杂的部分。_TracerCore 定义在 langchain_core/tracers/core.py 中,是所有追踪器的内部基础:

class _TracerCore(ABC):
    def __init__(self, *, _schema_format="original", **kwargs):
        self._schema_format = _schema_format
        self.run_map: dict[str, Run] = {}     # run_id -> Run 对象
        self.order_map: dict[UUID, tuple[UUID, str]] = {}  # run_id -> (trace_id, dotted_order)

_TracerCore 维护两个关键映射:

_start_trace 方法展示了 dotted_order 的构建逻辑:

def _start_trace(self, run):
    current_dotted_order = run.start_time.strftime("%Y%m%dT%H%M%S%fZ") + str(run.id)
    if run.parent_run_id:
        if parent := self.order_map.get(run.parent_run_id):
            run.trace_id, run.dotted_order = parent
            run.dotted_order += "." + current_dotted_order
            if parent_run := self.run_map.get(str(run.parent_run_id)):
                self._add_child_run(parent_run, run)
        else:
            # 父运行未找到,当作根运行处理
            run.parent_run_id = None
            run.trace_id = run.id
            run.dotted_order = current_dotted_order
    else:
        run.trace_id = run.id
        run.dotted_order = current_dotted_order
    self.order_map[run.id] = (run.trace_id, run.dotted_order)
    self.run_map[str(run.id)] = run

dotted_order 是一种类似文件路径的嵌套标识,格式如 20240101T120000000000Z<root_id>.20240101T120001000000Z<child_id>。它同时编码了时间顺序和嵌套结构,使得运行日志可以按自然顺序排列。

flowchart TD
    subgraph "run_map 和 order_map 的维护"
        START["on_xxx_start 被调用"] --> CREATE[创建 Run 对象]
        CREATE --> CALC[计算 dotted_order]
        CALC --> CHECK{有 parent_run_id?}
        CHECK -->|是| LOOKUP[从 order_map 查找父运行]
        LOOKUP --> FOUND{找到?}
        FOUND -->|是| CHAIN_ORDER[在父 dotted_order 后追加]
        FOUND -->|否| ROOT[当作根运行]
        CHECK -->|否| ROOT
        CHAIN_ORDER --> STORE[存入 run_map 和 order_map]
        ROOT --> STORE
        STORE --> END_LATER["on_xxx_end 被调用"]
        END_LATER --> PERSIST{是根运行?}
        PERSIST -->|是| SAVE["调用 _persist_run 持久化"]
        PERSIST -->|否| SKIP[跳过持久化]
        SAVE --> CLEANUP[从 run_map 中移除]
        SKIP --> CLEANUP
    end

12.5.2 BaseTracer:同步追踪器

BaseTracer 继承自 _TracerCoreBaseCallbackHandler,将追踪逻辑与回调接口对接:

class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
    @abstractmethod
    def _persist_run(self, run: Run) -> None:
        """Persist a run."""

    def _start_trace(self, run):
        super()._start_trace(run)
        self._on_run_create(run)

    def _end_trace(self, run):
        if not run.parent_run_id:
            self._persist_run(run)
        self.run_map.pop(str(run.id))
        self._on_run_update(run)

    def on_chain_start(self, serialized, inputs, *, run_id, **kwargs):
        chain_run = self._create_chain_run(
            serialized=serialized, inputs=inputs, run_id=run_id, **kwargs
        )
        self._start_trace(chain_run)
        self._on_chain_start(chain_run)
        return chain_run

    def on_chain_end(self, outputs, *, run_id, inputs=None, **kwargs):
        chain_run = self._complete_chain_run(
            outputs=outputs, run_id=run_id, inputs=inputs
        )
        self._end_trace(chain_run)
        self._on_chain_end(chain_run)
        return chain_run

注意 _end_trace 中的关键判断:只有根运行(not run.parent_run_id)才调用 _persist_run。这是因为根运行的 child_runs 列表中已经包含了所有子运行,持久化根运行就持久化了整棵调用树。

12.5.3 AsyncBaseTracer:异步追踪器

AsyncBaseTracer 的关键差异在于使用 asyncio.gather 并发执行追踪操作和通知:

class AsyncBaseTracer(_TracerCore, AsyncCallbackHandler, ABC):
    async def on_chain_start(self, serialized, inputs, *, run_id, **kwargs):
        chain_run = self._create_chain_run(...)
        tasks = [self._start_trace(chain_run), self._on_chain_start(chain_run)]
        await asyncio.gather(*tasks)

    async def on_chain_end(self, outputs, *, run_id, **kwargs):
        chain_run = self._complete_chain_run(...)
        tasks = [self._end_trace(chain_run), self._on_chain_end(chain_run)]
        await asyncio.gather(*tasks)

_start_trace_on_chain_start 同时执行,以及 _end_trace_on_chain_end 同时执行 -- 这种并发设计最大化了异步追踪的效率。但注意源码中的注释:"No _on_[run_type]_start callback should depend on operations in _start_trace" -- 两者之间不能有依赖关系。

12.5.4 Run 模型的数据结构

_TracerCore 中创建 Run 对象的方法揭示了运行记录的完整结构:

def _create_chain_run(self, serialized, inputs, run_id,
                       tags=None, parent_run_id=None, metadata=None,
                       run_type=None, name=None, **kwargs):
    start_time = datetime.now(timezone.utc)
    if metadata:
        kwargs.update({"metadata": metadata})
    return Run(
        id=run_id,
        parent_run_id=parent_run_id,
        serialized=serialized,
        inputs=self._get_chain_inputs(inputs),
        extra=kwargs,
        events=[{"name": "start", "time": start_time}],
        start_time=start_time,
        child_runs=[],
        run_type=run_type or "chain",
        name=name,
        tags=tags or [],
    )

一个 Run 对象包含:运行标识(id/parent_run_id/trace_id)、序列化的组件信息、输入输出、时间戳、事件列表、子运行列表、运行类型和标签。这些信息足以完整还原一次调用链的执行过程。

12.6 LangChainTracer:LangSmith 集成

LangChainTracer 定义在 langchain_core/tracers/langchain.py 中,是连接 LangChain 应用与 LangSmith 平台的桥梁。它继承自 BaseTracer,通过 langsmith.Client 将运行数据发送到 LangSmith 服务器。

flowchart LR
    subgraph LangChain 应用
        CHAIN[Chain 或 Runnable 执行]
        CM[CallbackManager]
        LCT[LangChainTracer]
    end

    subgraph LangSmith 平台
        API[LangSmith API]
        UI[LangSmith Dashboard]
    end

    CHAIN -->|事件| CM
    CM -->|分发| LCT
    LCT -->|"_persist_run"| API
    API --> UI

当设置 LANGCHAIN_TRACING_V2=true 环境变量时,CallbackManager.configure 会自动创建 LangChainTracer 实例并添加到处理器列表中。追踪数据通过 langsmith.Client 异步发送到 LangSmith 服务器。

LangChainTracer 还处理了 token 使用量的聚合,从 LLM 响应的 generations 结构中提取 usage_metadata 并累加,为成本监控提供基础数据。

12.7 事件流:astream_events 与 astream_log

12.7.1 LogStreamCallbackHandler 与 RunLog

astream_log API 背后是 LogStreamCallbackHandler,它继承自 BaseTracer,将运行日志转化为 JSON Patch 格式的增量更新流:

class LogEntry(TypedDict):
    id: str                          # 子运行 ID
    name: str                        # 运行对象名称
    type: str                        # 运行类型 (prompt/chain/llm 等)
    tags: list[str]                  # 标签
    metadata: dict[str, Any]         # 元数据
    start_time: str                  # ISO-8601 开始时间
    streamed_output_str: list[str]   # LLM token 流
    streamed_output: list[Any]       # 输出块流
    final_output: Any | None         # 最终输出
    end_time: str | None             # ISO-8601 结束时间

RunLog 是一个以 JSON Patch 操作驱动的不可变数据结构。客户端通过连续应用 patch 操作即可重建完整的运行日志,这种设计特别适合流式传输场景,因为只需要发送增量变化而非完整状态。

两层类型结构tracers/log_stream.py:115 和 :168):

class RunLogPatch:
    ops: list[dict[str, Any]]  # RFC 6902 JSON Patch 操作列表

    def __init__(self, *ops): self.ops = list(ops)

    def __add__(self, other):
        if type(other) is RunLogPatch:
            ops = self.ops + other.ops
            state = jsonpatch.apply_patch(None, copy.deepcopy(ops))  # 从空起算
            return RunLog(*ops, state=state)
        raise TypeError(...)

class RunLog(RunLogPatch):
    state: RunState  # 应用 ops 后的当前快照

    def __add__(self, other):
        if type(other) is RunLogPatch:
            ops = self.ops + other.ops
            state = jsonpatch.apply_patch(self.state, other.ops)     # 增量应用
            return RunLog(*ops, state=state)

RunLog = RunLogPatch + state 缓存。两个类的关系看似复杂、实际是 "未物化的增量" vs "已物化的快照" 分层:

__add__ 的两种分派路径揭示性能 trick

服务端在长 chain 里累积 patch 时走第二条路径——每次新 patch 的应用代价只和增量大小成正比、不与历史长度成正比。如果每次都从 ops 列表重放,一个跑 100 步的 chain 最后一个 patch 要重放前 99 步,复杂度从 O(n) 变 O(n²)。两层类就是为了让这 O(n²) 退化不发生。

两个隐蔽细节

  1. copy.deepcopy(ops) 的深拷贝(line 150):jsonpatch 库在应用 patch 时会把 ops 里的 value 对象引用进结果——不深拷贝的话,后续对 state 的变更会反射到原始 ops 列表。Streaming 场景里这会导致"已经发出去的 patch"被后续计算改写、客户端重建出错。

  2. __hash__ = None(line 165):显式禁掉哈希能力,尽管结构看起来像值类型。这是刻意的——RunLog 在管线里频繁通过 += 累加(实际产出新对象),用户如果误把它塞 set/dict 会拿到过时快照。Python 的默认做法是"有 __eq__ 就可哈希",显式赋 None 是 opt-out 惯用法,防误用。

jsonpatch 是真实依赖,不是比喻——RFC 6902 的六种 op 原样走:

{"op": "add",     "path": "/logs/prompt-0", "value": {...}}
{"op": "replace", "path": "/streamed_output/-", "value": "token"}
{"op": "remove",  "path": "/logs/prompt-0/final_output"}

客户端用任何 RFC 6902 兼容库(JS 侧 fast-json-patch、Go 侧 evanphx/json-patch)就能重建状态——语言中立的流式协议。这也是为什么 LangSmith/LangGraph Studio 的前端能用 JS 重建整个 chain 状态:服务端只发 ops、前端本地累积。

12.7.2 事件流追踪器

astream_events API 由 langchain_core/tracers/event_stream.py 中的追踪器支撑。它将运行事件标准化为 StreamEvent 格式:

class RunInfo(TypedDict):
    name: str                    # 运行名称
    tags: list[str]              # 标签
    metadata: dict[str, Any]     # 元数据
    run_type: str                # 运行类型
    inputs: NotRequired[Any]     # 输入
    parent_run_id: UUID | None   # 父运行 ID

astream_events 提供了比 astream_log 更高层次的抽象。事件类型包括 on_chain_starton_chain_streamon_chain_end 等标准事件,以及 on_custom_event 自定义事件。开发者可以用直观的过滤条件(如 include_namesinclude_typesinclude_tags)选择感兴趣的事件。

12.7.3 _schema_format 的角色

_TracerCore 构造函数中的 _schema_format 参数控制了输入输出的序列化格式:

def _get_chain_inputs(self, inputs):
    if self._schema_format in {"original", "original+chat"}:
        return inputs if isinstance(inputs, dict) else {"input": inputs}
    if self._schema_format == "streaming_events":
        return {"input": inputs}

这三种格式的存在反映了 LangChain 在保持向后兼容性的同时不断演进追踪格式的策略。

12.8 回调在组件中的集成模式

12.8.1 标准集成模式

回调在所有组件中遵循统一的集成模式:

# 1. 合并配置
callback_manager = CallbackManager.configure(
    callbacks, self.callbacks, self.verbose,
    tags, self.tags, metadata, self.metadata
)

# 2. 触发开始事件,获得 RunManager
run_manager = callback_manager.on_chain_start(None, inputs, name=run_name)

# 3. 执行业务逻辑,传递 RunManager
try:
    outputs = self._call(inputs, run_manager=run_manager)
except BaseException as e:
    run_manager.on_chain_error(e)
    raise

# 4. 触发结束事件
run_manager.on_chain_end(outputs)

12.8.2 子调用的回调传播

当子组件需要调用时,通过 run_manager.get_child() 创建子级 CallbackManager:

# SequentialChain._call 中:
for chain in self.chains:
    callbacks = _run_manager.get_child()
    outputs = chain(known_values, return_only_outputs=True, callbacks=callbacks)

这确保了父子运行之间的追踪关系被正确建立。在 Runnable 体系中,同样的传播通过 RunnableConfig 中的 callbacks 字段实现:

docs = self.retriever.invoke(
    question, config={"callbacks": run_manager.get_child()}
)

12.9 设计决策分析

12.9.1 为什么使用 Mixin 而非单一基类

Mixin 设计允许不同的 RunManager 类型只混入需要的事件方法。CallbackManagerForLLMRun 只需要 LLMManagerMixin 的方法,不需要 ChainManagerMixin。这避免了在不相关的上下文中暴露无意义的方法,提供了更清晰的类型安全。

12.9.2 为什么默认吞掉回调异常

回调系统是一种横切关注点,它的故障不应影响主业务流程。在生产环境中,一个监控系统的临时故障不应导致用户请求失败。raise_error = False 的默认值体现了"可观测性不应影响可用性"的设计理念。

12.9.3 为什么需要可继承与不可继承的区分

考虑这样的场景:用户为顶层 Chain 设置了一个全局追踪器(应继承给所有子调用),同时也设置了一个只在当前层生效的调试处理器(不应继承)。inheritable_handlershandlers 的分离正是为了支持这种灵活性。标签和元数据也有同样的区分,使得追踪树中的每一层都可以有自己的局部标注。

12.9.4 为什么 Tracer 只在根运行持久化

根运行包含了完整的调用树(通过 child_runs 列表递归嵌套),因此只需持久化一次即可保存全部信息。这种设计避免了大量的重复写入操作,同时也保证了追踪数据的原子性 -- 要么完整保存一棵调用树,要么不保存。

12.9.5 langchain_core/callbacks/ + tracers/ 真实尺寸:4677 + 5057 行

langchain_core两个目录、19 个文件、合计 9734 行——

callbacks/(4677 行)——

文件 角色
manager.py 2697 CallbackManager / AsyncCallbackManager / RunManager 一家——本目录最大、占 58%
base.py 1157 6 个 Mixin + BaseCallbackHandler + AsyncCallbackHandler + BaseCallbackManager40 个 on_* 方法(sync + async 双轨)
file.py 267 FileCallbackHandler——日志写文件
streaming_stdout.py 152 流式 stdout
usage.py 149 UsageMetadataCallbackHandler——token 计费聚合
__init__.py 132 public 导出
stdout.py 123 StdOutCallbackHandler

tracers/(5057 行)——

文件 角色
event_stream.py 1100 astream_events 的实现——本目录最大
base.py 937 BaseTracer + AsyncBaseTracer——同步/异步追踪器双层
log_stream.py 769 astream_log 的实现——RunLog patch 流
core.py 705 _TracerCore——run_map / order_map 双索引、运行树管理
langchain.py 400 LangChainTracer——把 Run 持久化到 LangSmith
evaluation.py 226 EvaluationTracer——给评估框架用
stdout.py / context.py 205 / 205 文本输出 / contextvars 注入
memory_stream.py 148 跨 task 的 in-memory queue
root_listeners.py / _compat.py / __init__.py / run_collector.py / _streaming.py / schemas.py ≤ 130 薄壳 + 兼容

三条非显然的物理事实——

  1. manager.py 单文件 2697 行占 callbacks 目录 58%——把同步、异步、Run、AsyncRun 四套 Manager 都塞在一起——跨 Manager 共享的工具函数(_handle_event、_acopy_callbacks、parent_run_id 传播)也在这里——再次印证 LangChain "单文件包大方法集" 的偏好(对比 LangGraph 把 pregel 切成 22 个文件)
  2. tracers/event_stream.py(astream_events)和 tracers/log_stream.py(astream_log)几乎同样大(1100 + 769 = 1869 行)——它们解决同一个问题的两代方案——astream_log 是 v0 patch-based 早期方案、astream_events 是 v0.2 之后推荐——LangChain 把两套并存就是为了向后兼容、不强制升级
  3. 40 个 on_* 回调方法(base.py 实测)——sync + async 双轨——对应 6 个 Mixin × 4 种事件(start / end / error / 特殊)——这就是为什么 BaseCallbackHandler 看似"接口爆炸":它必须覆盖 LLM / Chain / Tool / Retriever / Agent / Token 6 个组件类型 × 4 种事件 × 2 种执行模式——3 个维度的笛卡尔积

12.9.6 回调系统的真实控制面:跳过、吞错、继承、异步分流

回调系统不是一个简单的事件总线。langchain_core/callbacks/base.py:455-469BaseCallbackHandler 同时混入 LLM、Chain、Tool、Retriever、Callback、Run 六组 Mixin,并提供两个关键开关:raise_error=Falserun_inline=False。这两个默认值表达了 LangChain 的生产态度:回调失败默认不打断主链路,回调默认可以被调度到非内联路径,避免阻塞业务。

BaseCallbackManagerbase.py:938-971 维护四组可传播状态:普通 handlersinheritable_handlers、普通 tags / metadata、可继承 inheritable_tags / inheritable_metadata。这解释了为什么本章前面反复强调"可继承"和"不可继承"的区分。顶层用户希望所有子调用都带上项目标签,但某个局部调试 handler 不一定应该泄漏到每个下游模型调用。

同步事件的核心在 langchain_core/callbacks/manager.py:254-320handle_event。它遍历 handlers,先检查 ignore_condition_name,再调用对应事件方法;如果 on_chat_model_start 没实现,会回退到 on_llm_start;普通异常会记录 warning,只有 handler.raise_error 为真时才重新抛出。异步版本在 manager.py:366-420,如果事件方法是 coroutine 就 await;如果 handler.run_inline 为真就直接调用;否则放到 executor 里执行。

flowchart TD
  A[事件触发] --> B{handler 是否 ignore}
  B -->|是| C[跳过]
  B -->|否| D{同步还是异步}
  D -->|同步| E[handle_event]
  D -->|异步| F[ahandle_event]
  E --> G{异常}
  F --> G
  G -->|raise_error=false| H[记录 warning 继续]
  G -->|raise_error=true| I[向上抛出]

父子传播也不是概念图。ParentRunManager.get_childmanager.py:564-583 创建新的 CallbackManager,把当前 run id 设为 parent_run_id,只复制 inheritable handlers、tags、metadata,并可追加局部 tag。这样一个 Chain 调 Tool、Tool 内部调 Retriever 时,trace 树能保留父子关系,但局部 handler 不会默认污染子树。

CallbackManager.on_chain_startmanager.py:1410-1453 展示了完整事件生命周期:没有 run_id 时生成 uuid,调用 handle_event 发出 on_chain_start,然后返回 CallbackManagerForChainRun,把 run_id、handlers、inheritable_handlers、parent_run_id、tags、metadata 都带上。异步 AsyncCallbackManager.on_chain_startmanager.py:1942-1975 做同样的事,只是调用 ahandle_event

控制面 源码位置 工程意义
ignore_* handle_event 检查 ignore 条件 一个 handler 可只关心 LLM 或 Tool
raise_error manager.py:305-313407-415 可观测性默认不影响可用性
run_inline manager.py:376-387 异步 handler 可选择内联或 executor
get_child manager.py:564-583 只传播 inheritable 状态,建立父子 run
configure manager.py:1608-16282128-2144 多来源回调、标签、元数据汇总入口

这套控制面解释了为什么 callbacks 目录会膨胀到几千行。它要同时解决四个问题:事件接口覆盖所有组件,handler 故障不破坏主流程,trace 树能表达父子关系,同步和异步路径都能工作。少任何一层,LangSmith、终端日志、token 统计、前端事件流都会在复杂链路里断裂。

回调系统的设计也提醒我们:可观测性不是"事后加一个日志函数"。在 LangChain 这种组合式框架里,每个组件都可能嵌套调用另一个组件,日志必须天然带有父子关系;每个组件都可能同步或异步执行,事件处理必须同时覆盖两条路径;每个组件都可能用于生产请求,观测系统失败不能默认拖垮业务。

这就是 handlersinheritable_handlers 分离的意义。顶层追踪器需要继承,因为它负责整棵调用树;局部调试器不一定继承,因为它可能只服务当前组件。标签和元数据也一样:项目编号、用户编号、会话编号通常应该沿调用树传播;某个局部实验标签则不应污染下游所有节点。

异常策略同样体现生产权衡。默认吞掉回调异常不是忽视错误,而是把主业务可用性放在第一位;需要强一致追踪的场景可以打开 raise_error。内联策略也不是性能微调,而是决定事件处理是否参与当前执行时序。审计、计费、事务型回调可能需要内联;普通日志、前端事件流、调试输出通常可以异步处理。

从使用者角度看,调试回调问题时可以按四步走:

  1. 事件有没有触发,对应组件是否调用了 on_*_start
  2. handler 有没有被 ignore_* 跳过。
  3. handler 是否被放到异步执行路径,导致时序和预期不同。
  4. 子调用是否通过 get_child() 继承了正确的 handler、tag 和 metadata。

这四步比盲目打印日志有效,因为它们对应源码里的真实控制点。

还有一个常见误区,是把回调只理解成调试输出。实际上,同一套机制还承担计费统计、前端进度、审计记录、评估采样、实验标签传播。不同用途对可靠性的要求不同:计费和审计更接近强一致,前端进度更接近弱一致,调试输出可以丢失。LangChain 用同一套接口承载这些用途,所以必须提供跳过、吞错、内联、继承这些控制点,让使用者按场景调节。

这也解释了为什么回调和追踪分成两层。回调负责"事件发生时通知谁",追踪负责"把事件组织成一棵可查询的运行树"。如果只要打印日志,回调足够;如果要复盘一次复杂 Agent 为什么走到某个答案,就必须有追踪器维护父子关系、顺序关系和输入输出快照。

所以可观测性代码看起来啰嗦,是因为它承担的是事后解释权。线上问题发生后,工程团队需要知道请求从哪里进入、经过哪些模型和工具、哪一步失败、失败是否被吞掉、最终输出是否可信。回调和追踪系统就是为这些问题预先铺路。

没有这条路,复杂链路只能靠猜,排障成本会迅速失控,团队协作也会变慢。

12.10 小结

LangChain 的回调系统是一个精心分层的事件驱动架构。从底层的 Mixin 组合到顶层的事件流 API,每一层都有清晰的职责边界。

BaseCallbackHandler 通过六个 Mixin 的组合定义了完整的事件接口,并通过 ignore_* 属性和 raise_error/run_inline 标志提供了精细的控制能力。BaseCallbackManager 通过可继承/不可继承的双层处理器列表实现了回调的树状传播。CallbackManager.configure 方法是多源回调配置的合并入口,自动处理环境变量驱动的追踪器注入。

RunManager 系统将回调触发器与具体的运行上下文绑定,get_child() 方法实现了父子运行关系的自动传播。Tracer 系统在回调基础上构建了完整的运行追踪模型,_TracerCorerun_maporder_map 维护了运行的嵌套结构和全局排序。BaseTracer_persist_run 只在根运行结束时调用,一次性持久化整棵调用树。

事件流 API(astream_eventsastream_log)通过专用的追踪器将运行事件转化为可消费的异步流,为前端实时展示和中间状态监控提供了强大的基础设施。

物理骨架:callbacks/ 4677 行 + tracers/ 5057 行 = 9734 行——其中 manager.py 单文件 2697 行(全 callbacks 58%)+ event_stream.py 1100 行(全 tracers 22%)合计占两目录 39%——是理解整套可观测性架构的最小必读子集。