LangChain 设计与实现
第12章 回调与可观测性
第12章 回调与可观测性
引言
一个优秀的 AI 应用框架不仅要能正确执行任务,还必须让开发者清晰地了解执行过程中发生了什么。LangChain 的回调系统正是为此而生。它提供了一套精密的事件驱动架构,使得日志记录、性能监控、流式输出、调试追踪等横切关注点可以在不侵入业务逻辑的前提下实现。
回调系统是 LangChain 中最复杂的基础设施之一。它横跨 langchain_core 中的基础定义(callbacks/)和追踪器实现(tracers/),以及 langchain 包中面向具体场景的处理器实现。从基础的标准输出打印,到与 LangSmith 平台的深度集成,再到支撑 astream_events API 的事件流追踪器,回调系统的每一层都经过精心设计。
本章将从回调处理器的 Mixin 层次结构讲起,逐步深入到 CallbackManager 的事件分发机制、Tracer 的运行追踪模型,最终到达事件流的实现细节。
本章要点
- 回调处理器通过六个 Mixin 类组合而成,支持 LLM、Chain、Tool、Retriever 等全部组件类型
CallbackManager和AsyncCallbackManager负责事件分发,通过configure方法实现多源回调合并- 运行管理器(RunManager)是绑定到具体 run 的回调触发器,通过
get_child()实现父子关系传播 BaseTracer/_TracerCore构建了完整的运行追踪模型,维护run_map和order_map实现嵌套追踪LangChainTracer将追踪数据发送到 LangSmith 平台,LogStreamCallbackHandler和事件流追踪器支撑了astream_log和astream_eventsAPI
12.1 回调处理器的 Mixin 架构
12.1.1 六大 Mixin 类
LangChain 的回调处理器采用了精细的 Mixin 组合设计。每个 Mixin 对应一类组件的生命周期事件:
classDiagram
class LLMManagerMixin {
+on_llm_new_token(token, chunk, run_id)
+on_llm_end(response, run_id)
+on_llm_error(error, run_id)
}
class ChainManagerMixin {
+on_chain_end(outputs, run_id)
+on_chain_error(error, run_id)
+on_agent_action(action, run_id)
+on_agent_finish(finish, run_id)
}
class ToolManagerMixin {
+on_tool_end(output, run_id)
+on_tool_error(error, run_id)
}
class RetrieverManagerMixin {
+on_retriever_end(documents, run_id)
+on_retriever_error(error, run_id)
}
class CallbackManagerMixin {
+on_llm_start(serialized, prompts, run_id)
+on_chat_model_start(serialized, messages, run_id)
+on_chain_start(serialized, inputs, run_id)
+on_tool_start(serialized, input_str, run_id)
+on_retriever_start(serialized, query, run_id)
}
class RunManagerMixin {
+on_text(text, run_id)
+on_retry(retry_state, run_id)
+on_custom_event(name, data, run_id)
}
class BaseCallbackHandler {
+raise_error: bool
+run_inline: bool
+ignore_llm: bool
+ignore_chain: bool
+ignore_agent: bool
+ignore_retriever: bool
+ignore_chat_model: bool
+ignore_custom_event: bool
}
LLMManagerMixin <|-- BaseCallbackHandler
ChainManagerMixin <|-- BaseCallbackHandler
ToolManagerMixin <|-- BaseCallbackHandler
RetrieverManagerMixin <|-- BaseCallbackHandler
CallbackManagerMixin <|-- BaseCallbackHandler
RunManagerMixin <|-- BaseCallbackHandler
这个设计的精妙之处在于:
- CallbackManagerMixin 包含所有
on_*_start方法 -- 这些是"启动事件",只有 CallbackManager 层级才需要处理 - 组件特定 Mixin(LLM/Chain/Tool/Retriever)包含对应的
on_*_end和on_*_error方法 -- 这些是"完成事件" - RunManagerMixin 包含通用的
on_text、on_retry和on_custom_event-- 这些可以在任何运行阶段触发
值得注意的是 on_chat_model_start 和 on_llm_start 的关系。on_chat_model_start 默认抛出 NotImplementedError,这是一种故意的设计:
class CallbackManagerMixin:
def on_chat_model_start(self, serialized, messages, *, run_id, **kwargs):
# NotImplementedError is thrown intentionally
# Callback handler will fall back to on_llm_start
msg = f"{self.__class__.__name__} does not implement `on_chat_model_start`"
raise NotImplementedError(msg)
事件分发系统会捕获这个异常,并自动将消息转换为字符串后回退到 on_llm_start。这保证了只实现了 on_llm_start 的旧处理器也能处理 Chat Model 的事件。
12.1.2 BaseCallbackHandler 的控制属性
BaseCallbackHandler 通过一系列 ignore_* 属性提供了精细的事件过滤能力:
class BaseCallbackHandler(
LLMManagerMixin, ChainManagerMixin, ToolManagerMixin,
RetrieverManagerMixin, CallbackManagerMixin, RunManagerMixin
):
raise_error: bool = False # 回调异常时是否传播
run_inline: bool = False # 是否在主线程内联执行
@property
def ignore_llm(self) -> bool: return False
@property
def ignore_chain(self) -> bool: return False
@property
def ignore_agent(self) -> bool: return False
@property
def ignore_retriever(self) -> bool: return False
@property
def ignore_chat_model(self) -> bool: return False
@property
def ignore_custom_event(self) -> bool: return False
raise_error 和 run_inline 是两个关键的控制标志:
raise_error = False(默认):回调处理器中的异常被吞掉并记录日志,不影响主流程。这是生产环境中的安全默认值,确保监控代码的缺陷不会导致业务逻辑失败。run_inline = True:异步场景下强制在当前上下文中内联执行,而不是通过asyncio.gather并发执行。这对需要保证执行顺序的处理器很重要。
12.1.3 AsyncCallbackHandler
AsyncCallbackHandler 继承自 BaseCallbackHandler,将所有事件方法重新声明为 async:
class AsyncCallbackHandler(BaseCallbackHandler):
async def on_llm_start(self, serialized, prompts, *, run_id, **kwargs) -> None: ...
async def on_chat_model_start(self, serialized, messages, *, run_id, **kwargs) -> Any:
raise NotImplementedError(...)
async def on_llm_new_token(self, token, *, run_id, **kwargs) -> None: ...
async def on_chain_start(self, serialized, inputs, *, run_id, **kwargs) -> None: ...
# ... 其他事件方法
12.2 BaseCallbackManager:回调管理的基石
12.2.1 数据结构
BaseCallbackManager 管理两层处理器列表和元数据:
class BaseCallbackManager(CallbackManagerMixin):
def __init__(self, handlers, inheritable_handlers=None, parent_run_id=None,
*, tags=None, inheritable_tags=None,
metadata=None, inheritable_metadata=None):
self.handlers: list[BaseCallbackHandler] = handlers
self.inheritable_handlers: list[BaseCallbackHandler] = inheritable_handlers or []
self.parent_run_id: UUID | None = parent_run_id
self.tags = tags or []
self.inheritable_tags = inheritable_tags or []
self.metadata = metadata or {}
self.inheritable_metadata = inheritable_metadata or {}
"可继承"机制是回调系统的核心设计之一。当一个 Chain 调用子 Chain 时,父级的 inheritable_handlers、inheritable_tags 和 inheritable_metadata 会自动传递给子级。这确保了顶层设置的追踪器能够捕获整个调用树中的所有事件。
flowchart TD
subgraph "父 CallbackManager"
PH["handlers: H1, H2"]
PIH["inheritable_handlers: H1"]
PT["tags: tag_a, tag_b"]
PIT["inheritable_tags: tag_a"]
end
subgraph "子 CallbackManager -- 通过 get_child 创建"
CH["handlers: H1 -- 来自继承"]
CIH["inheritable_handlers: H1 -- 来自继承"]
CT["tags: tag_a -- 来自继承"]
CIT["inheritable_tags: tag_a -- 来自继承"]
end
PIH --> CH
PIH --> CIH
PIT --> CT
PIT --> CIT
12.2.2 处理器管理方法
def add_handler(self, handler, inherit=True):
if handler not in self.handlers:
self.handlers.append(handler)
if inherit and handler not in self.inheritable_handlers:
self.inheritable_handlers.append(handler)
def remove_handler(self, handler):
if handler in self.handlers:
self.handlers.remove(handler)
if handler in self.inheritable_handlers:
self.inheritable_handlers.remove(handler)
def merge(self, other):
combined_handlers = list(set(self.handlers) | set(other.handlers))
combined_inheritable = list(set(self.inheritable_handlers) | set(other.inheritable_handlers))
return self.__class__(
handlers=combined_handlers,
inheritable_handlers=combined_inheritable,
tags=list(set(self.tags + other.tags)),
inheritable_tags=list(set(self.inheritable_tags + other.inheritable_tags)),
metadata={**self.metadata, **other.metadata},
inheritable_metadata={**self.inheritable_metadata, **other.inheritable_metadata},
)
merge 方法使用集合运算去重,这是处理多个回调源合并时避免重复触发的关键。
12.3 CallbackManager:事件分发中枢
12.3.1 configure 方法
CallbackManager.configure 是整个回调系统的入口点,负责从多个来源合并回调配置:
flowchart TD
A["CallbackManager.configure(callbacks, inheritable_callbacks,
verbose, tags, inheritable_tags, metadata, inheritable_metadata)"]
A --> B{是否有 tracing 环境变量?}
B -->|是| C[添加 LangChainTracer]
B --> D[合并 callbacks 和 inheritable_callbacks]
C --> D
D --> E{verbose 为 True?}
E -->|是| F[添加 StdOutCallbackHandler]
E --> G[合并 tags 和 metadata]
F --> G
G --> H[返回配置好的 CallbackManager]
这个方法的设计体现了"约定优于配置"的原则:
- 设置
LANGCHAIN_TRACING_V2=true环境变量自动启用 LangSmith 追踪 - 设置
verbose=True自动添加标准输出处理器 - 开发者也可以显式传入自定义处理器
12.3.2 事件分发机制
事件分发通过 handle_event 和 ahandle_event 两个函数实现。同步版本的核心逻辑:
def handle_event(handlers, event_name, ignore_condition_name, *args, **kwargs):
coros = []
message_strings = None
for handler in handlers:
try:
if ignore_condition_name is None or not getattr(handler, ignore_condition_name):
event = getattr(handler, event_name)(*args, **kwargs)
if asyncio.iscoroutine(event):
coros.append(event)
except NotImplementedError as e:
if event_name == "on_chat_model_start":
if message_strings is None:
message_strings = [get_buffer_string(m) for m in args[1]]
handle_event([handler], "on_llm_start", "ignore_llm",
args[0], message_strings, *args[2:], **kwargs)
else:
logger.warning("NotImplementedError in %s.%s",
handler.__class__.__name__, event_name)
except Exception as e:
logger.warning("Error in %s.%s: %s",
handler.__class__.__name__, event_name, repr(e))
if handler.raise_error:
raise
if coros:
try:
asyncio.get_running_loop()
loop_running = True
except RuntimeError:
loop_running = False
if loop_running:
_executor().submit(copy_context().run, _run_coros, coros).result()
else:
_run_coros(coros)
这段代码有几个值得注意的设计决策:
-
NotImplementedError 的优雅回退:当
on_chat_model_start未实现时,自动将消息转换为字符串后调用on_llm_start。这保证了向后兼容性。 -
异步处理器的同步兼容:当在同步上下文中遇到异步处理器产生的协程时,会检测是否有运行中的事件循环。如果有,则将协程提交到线程池执行(避免死锁);如果没有,则直接运行协程。
-
错误隔离:默认情况下回调异常只记录日志不传播,除非
handler.raise_error = True。
异步版本 ahandle_event 的处理策略有所不同:
async def ahandle_event(handlers, event_name, ignore_condition_name, *args, **kwargs):
# 先内联执行标记为 run_inline 的处理器
for handler in [h for h in handlers if h.run_inline]:
await _ahandle_event_for_handler(
handler, event_name, ignore_condition_name, *args, **kwargs)
# 其余处理器并发执行
await asyncio.gather(*(
_ahandle_event_for_handler(
handler, event_name, ignore_condition_name, *args, **kwargs)
for handler in handlers if not handler.run_inline
))
run_inline 处理器优先执行并阻塞等待完成,然后其余处理器通过 asyncio.gather 并发执行。
12.3.3 RunManager:绑定运行上下文的触发器
当 CallbackManager.on_chain_start 被调用时,它会创建一个 CallbackManagerForChainRun 实例,该实例绑定了当前运行的 run_id:
class BaseRunManager(RunManagerMixin):
def __init__(self, *, run_id, handlers, inheritable_handlers,
parent_run_id=None, tags=None, inheritable_tags=None,
metadata=None, inheritable_metadata=None):
self.run_id = run_id
self.handlers = handlers
self.inheritable_handlers = inheritable_handlers
self.parent_run_id = parent_run_id
self.tags = tags or []
self.inheritable_tags = inheritable_tags or []
self.metadata = metadata or {}
self.inheritable_metadata = inheritable_metadata or {}
每个组件类型都有对应的 RunManager:
classDiagram
class BaseRunManager {
+run_id: UUID
+handlers: list
+get_child(tag) CallbackManager
}
class CallbackManagerForLLMRun {
+on_llm_new_token(token)
+on_llm_end(response)
+on_llm_error(error)
}
class CallbackManagerForChainRun {
+on_chain_end(outputs)
+on_chain_error(error)
+on_agent_action(action)
+on_agent_finish(finish)
+get_child(tag) CallbackManager
}
class CallbackManagerForToolRun {
+on_tool_end(output)
+on_tool_error(error)
}
class CallbackManagerForRetrieverRun {
+on_retriever_end(documents)
+on_retriever_error(error)
}
BaseRunManager <|-- CallbackManagerForLLMRun
BaseRunManager <|-- CallbackManagerForChainRun
BaseRunManager <|-- CallbackManagerForToolRun
BaseRunManager <|-- CallbackManagerForRetrieverRun
get_child 方法是嵌套调用链追踪的关键 -- 它创建一个新的 CallbackManager,将当前 run_id 设为新管理器的 parent_run_id:
def get_child(self, tag=None):
manager = CallbackManager(handlers=[], parent_run_id=self.run_id)
manager.set_handlers(self.inheritable_handlers)
manager.add_tags(self.inheritable_tags)
manager.add_metadata(self.inheritable_metadata)
if tag:
manager.add_tags([tag], inherit=False)
return manager
12.4 内置 Handler 实现
12.4.1 StdOutCallbackHandler
最简单的内置处理器,用于将执行过程输出到标准输出:
class StdOutCallbackHandler(BaseCallbackHandler):
def __init__(self, color=None):
self.color = color
def on_chain_start(self, serialized, inputs, **kwargs):
if "name" in kwargs:
name = kwargs["name"]
elif serialized:
name = serialized.get("name", serialized.get("id", ["<unknown>"])[-1])
else:
name = "<unknown>"
print(f"\n\n\033[1m> Entering new {name} chain...\033[0m")
def on_chain_end(self, outputs, **kwargs):
print("\n\033[1m> Finished chain.\033[0m")
def on_agent_action(self, action, color=None, **kwargs):
print_text(action.log, color=color or self.color)
def on_tool_end(self, output, color=None,
observation_prefix=None, llm_prefix=None, **kwargs):
output = str(output)
if observation_prefix is not None:
print_text(f"\n{observation_prefix}")
print_text(output, color=color or self.color)
它使用 ANSI 转义码实现了粗体和颜色输出,在终端中提供了直观的调用链可视化。当 Chain.verbose = True 时,StdOutCallbackHandler 会被 CallbackManager.configure 自动注入。
12.4.2 StreamingStdOutCallbackHandler
专为流式输出设计的处理器,核心是 on_llm_new_token:
class StreamingStdOutCallbackHandler(BaseCallbackHandler):
def on_llm_new_token(self, token, **kwargs):
sys.stdout.write(token)
sys.stdout.flush()
每当 LLM 产生一个新 token 时,立即写入标准输出并刷新缓冲区。sys.stdout.flush() 确保 token 即时显示而非等到缓冲区满。
12.4.3 FileCallbackHandler 与第三方集成
langchain_classic/callbacks/file.py 中的 FileCallbackHandler 将事件输出到文件,适合生产环境的日志记录。langchain_classic/callbacks/ 目录下还有大量第三方集成处理器,包括 wandb_callback.py(Weights & Biases)、mlflow_callback.py(MLflow)、arize_callback.py(Arize)等,覆盖了主流的 ML 可观测性平台。
12.5 Tracer 系统:结构化运行追踪
12.5.1 _TracerCore:追踪的内核
Tracer 是回调系统中最复杂的部分。_TracerCore 定义在 langchain_core/tracers/core.py 中,是所有追踪器的内部基础:
class _TracerCore(ABC):
def __init__(self, *, _schema_format="original", **kwargs):
self._schema_format = _schema_format
self.run_map: dict[str, Run] = {} # run_id -> Run 对象
self.order_map: dict[UUID, tuple[UUID, str]] = {} # run_id -> (trace_id, dotted_order)
_TracerCore 维护两个关键映射:
run_map:从 run_id 到 Run 对象的映射,在运行期间维护,运行结束后清理order_map:从 run_id 到(trace_id, dotted_order)的映射,用于构建运行的全局排序
_start_trace 方法展示了 dotted_order 的构建逻辑:
def _start_trace(self, run):
current_dotted_order = run.start_time.strftime("%Y%m%dT%H%M%S%fZ") + str(run.id)
if run.parent_run_id:
if parent := self.order_map.get(run.parent_run_id):
run.trace_id, run.dotted_order = parent
run.dotted_order += "." + current_dotted_order
if parent_run := self.run_map.get(str(run.parent_run_id)):
self._add_child_run(parent_run, run)
else:
# 父运行未找到,当作根运行处理
run.parent_run_id = None
run.trace_id = run.id
run.dotted_order = current_dotted_order
else:
run.trace_id = run.id
run.dotted_order = current_dotted_order
self.order_map[run.id] = (run.trace_id, run.dotted_order)
self.run_map[str(run.id)] = run
dotted_order 是一种类似文件路径的嵌套标识,格式如 20240101T120000000000Z<root_id>.20240101T120001000000Z<child_id>。它同时编码了时间顺序和嵌套结构,使得运行日志可以按自然顺序排列。
flowchart TD
subgraph "run_map 和 order_map 的维护"
START["on_xxx_start 被调用"] --> CREATE[创建 Run 对象]
CREATE --> CALC[计算 dotted_order]
CALC --> CHECK{有 parent_run_id?}
CHECK -->|是| LOOKUP[从 order_map 查找父运行]
LOOKUP --> FOUND{找到?}
FOUND -->|是| CHAIN_ORDER[在父 dotted_order 后追加]
FOUND -->|否| ROOT[当作根运行]
CHECK -->|否| ROOT
CHAIN_ORDER --> STORE[存入 run_map 和 order_map]
ROOT --> STORE
STORE --> END_LATER["on_xxx_end 被调用"]
END_LATER --> PERSIST{是根运行?}
PERSIST -->|是| SAVE["调用 _persist_run 持久化"]
PERSIST -->|否| SKIP[跳过持久化]
SAVE --> CLEANUP[从 run_map 中移除]
SKIP --> CLEANUP
end
12.5.2 BaseTracer:同步追踪器
BaseTracer 继承自 _TracerCore 和 BaseCallbackHandler,将追踪逻辑与回调接口对接:
class BaseTracer(_TracerCore, BaseCallbackHandler, ABC):
@abstractmethod
def _persist_run(self, run: Run) -> None:
"""Persist a run."""
def _start_trace(self, run):
super()._start_trace(run)
self._on_run_create(run)
def _end_trace(self, run):
if not run.parent_run_id:
self._persist_run(run)
self.run_map.pop(str(run.id))
self._on_run_update(run)
def on_chain_start(self, serialized, inputs, *, run_id, **kwargs):
chain_run = self._create_chain_run(
serialized=serialized, inputs=inputs, run_id=run_id, **kwargs
)
self._start_trace(chain_run)
self._on_chain_start(chain_run)
return chain_run
def on_chain_end(self, outputs, *, run_id, inputs=None, **kwargs):
chain_run = self._complete_chain_run(
outputs=outputs, run_id=run_id, inputs=inputs
)
self._end_trace(chain_run)
self._on_chain_end(chain_run)
return chain_run
注意 _end_trace 中的关键判断:只有根运行(not run.parent_run_id)才调用 _persist_run。这是因为根运行的 child_runs 列表中已经包含了所有子运行,持久化根运行就持久化了整棵调用树。
12.5.3 AsyncBaseTracer:异步追踪器
AsyncBaseTracer 的关键差异在于使用 asyncio.gather 并发执行追踪操作和通知:
class AsyncBaseTracer(_TracerCore, AsyncCallbackHandler, ABC):
async def on_chain_start(self, serialized, inputs, *, run_id, **kwargs):
chain_run = self._create_chain_run(...)
tasks = [self._start_trace(chain_run), self._on_chain_start(chain_run)]
await asyncio.gather(*tasks)
async def on_chain_end(self, outputs, *, run_id, **kwargs):
chain_run = self._complete_chain_run(...)
tasks = [self._end_trace(chain_run), self._on_chain_end(chain_run)]
await asyncio.gather(*tasks)
_start_trace 和 _on_chain_start 同时执行,以及 _end_trace 和 _on_chain_end 同时执行 -- 这种并发设计最大化了异步追踪的效率。但注意源码中的注释:"No _on_[run_type]_start callback should depend on operations in _start_trace" -- 两者之间不能有依赖关系。
12.5.4 Run 模型的数据结构
_TracerCore 中创建 Run 对象的方法揭示了运行记录的完整结构:
def _create_chain_run(self, serialized, inputs, run_id,
tags=None, parent_run_id=None, metadata=None,
run_type=None, name=None, **kwargs):
start_time = datetime.now(timezone.utc)
if metadata:
kwargs.update({"metadata": metadata})
return Run(
id=run_id,
parent_run_id=parent_run_id,
serialized=serialized,
inputs=self._get_chain_inputs(inputs),
extra=kwargs,
events=[{"name": "start", "time": start_time}],
start_time=start_time,
child_runs=[],
run_type=run_type or "chain",
name=name,
tags=tags or [],
)
一个 Run 对象包含:运行标识(id/parent_run_id/trace_id)、序列化的组件信息、输入输出、时间戳、事件列表、子运行列表、运行类型和标签。这些信息足以完整还原一次调用链的执行过程。
12.6 LangChainTracer:LangSmith 集成
LangChainTracer 定义在 langchain_core/tracers/langchain.py 中,是连接 LangChain 应用与 LangSmith 平台的桥梁。它继承自 BaseTracer,通过 langsmith.Client 将运行数据发送到 LangSmith 服务器。
flowchart LR
subgraph LangChain 应用
CHAIN[Chain 或 Runnable 执行]
CM[CallbackManager]
LCT[LangChainTracer]
end
subgraph LangSmith 平台
API[LangSmith API]
UI[LangSmith Dashboard]
end
CHAIN -->|事件| CM
CM -->|分发| LCT
LCT -->|"_persist_run"| API
API --> UI
当设置 LANGCHAIN_TRACING_V2=true 环境变量时,CallbackManager.configure 会自动创建 LangChainTracer 实例并添加到处理器列表中。追踪数据通过 langsmith.Client 异步发送到 LangSmith 服务器。
LangChainTracer 还处理了 token 使用量的聚合,从 LLM 响应的 generations 结构中提取 usage_metadata 并累加,为成本监控提供基础数据。
12.7 事件流:astream_events 与 astream_log
12.7.1 LogStreamCallbackHandler 与 RunLog
astream_log API 背后是 LogStreamCallbackHandler,它继承自 BaseTracer,将运行日志转化为 JSON Patch 格式的增量更新流:
class LogEntry(TypedDict):
id: str # 子运行 ID
name: str # 运行对象名称
type: str # 运行类型 (prompt/chain/llm 等)
tags: list[str] # 标签
metadata: dict[str, Any] # 元数据
start_time: str # ISO-8601 开始时间
streamed_output_str: list[str] # LLM token 流
streamed_output: list[Any] # 输出块流
final_output: Any | None # 最终输出
end_time: str | None # ISO-8601 结束时间
RunLog 是一个以 JSON Patch 操作驱动的不可变数据结构。客户端通过连续应用 patch 操作即可重建完整的运行日志,这种设计特别适合流式传输场景,因为只需要发送增量变化而非完整状态。
两层类型结构(tracers/log_stream.py:115 和 :168):
class RunLogPatch:
ops: list[dict[str, Any]] # RFC 6902 JSON Patch 操作列表
def __init__(self, *ops): self.ops = list(ops)
def __add__(self, other):
if type(other) is RunLogPatch:
ops = self.ops + other.ops
state = jsonpatch.apply_patch(None, copy.deepcopy(ops)) # 从空起算
return RunLog(*ops, state=state)
raise TypeError(...)
class RunLog(RunLogPatch):
state: RunState # 应用 ops 后的当前快照
def __add__(self, other):
if type(other) is RunLogPatch:
ops = self.ops + other.ops
state = jsonpatch.apply_patch(self.state, other.ops) # 增量应用
return RunLog(*ops, state=state)
RunLog = RunLogPatch + state 缓存。两个类的关系看似复杂、实际是 "未物化的增量" vs "已物化的快照" 分层:
RunLogPatch:仅携带 ops 列表、没有 state 字段。客户端 SSE stream 每次收到的就是这种最小增量。RunLog:继承自RunLogPatch,额外 cache 了一个state: RunState—— 是对所有 ops 做过一次jsonpatch.apply_patch之后的完整状态快照。
__add__ 的两种分派路径揭示性能 trick:
RunLogPatch + RunLogPatch(line 148-151):两边都是"未物化"的补丁,合并时要apply_patch(None, deepcopy(ops))从空对象起算。RunLog + RunLogPatch(line 196-199):左边已有 state 缓存,合并时apply_patch(self.state, other.ops)只应用新到的 ops,跳过旧 ops 的重放。
服务端在长 chain 里累积 patch 时走第二条路径——每次新 patch 的应用代价只和增量大小成正比、不与历史长度成正比。如果每次都从 ops 列表重放,一个跑 100 步的 chain 最后一个 patch 要重放前 99 步,复杂度从 O(n) 变 O(n²)。两层类就是为了让这 O(n²) 退化不发生。
两个隐蔽细节:
-
copy.deepcopy(ops)的深拷贝(line 150):jsonpatch库在应用 patch 时会把 ops 里的 value 对象引用进结果——不深拷贝的话,后续对 state 的变更会反射到原始 ops 列表。Streaming 场景里这会导致"已经发出去的 patch"被后续计算改写、客户端重建出错。 -
__hash__ = None(line 165):显式禁掉哈希能力,尽管结构看起来像值类型。这是刻意的——RunLog 在管线里频繁通过+=累加(实际产出新对象),用户如果误把它塞 set/dict 会拿到过时快照。Python 的默认做法是"有__eq__就可哈希",显式赋 None 是 opt-out 惯用法,防误用。
jsonpatch 是真实依赖,不是比喻——RFC 6902 的六种 op 原样走:
{"op": "add", "path": "/logs/prompt-0", "value": {...}}
{"op": "replace", "path": "/streamed_output/-", "value": "token"}
{"op": "remove", "path": "/logs/prompt-0/final_output"}
客户端用任何 RFC 6902 兼容库(JS 侧 fast-json-patch、Go 侧 evanphx/json-patch)就能重建状态——语言中立的流式协议。这也是为什么 LangSmith/LangGraph Studio 的前端能用 JS 重建整个 chain 状态:服务端只发 ops、前端本地累积。
12.7.2 事件流追踪器
astream_events API 由 langchain_core/tracers/event_stream.py 中的追踪器支撑。它将运行事件标准化为 StreamEvent 格式:
class RunInfo(TypedDict):
name: str # 运行名称
tags: list[str] # 标签
metadata: dict[str, Any] # 元数据
run_type: str # 运行类型
inputs: NotRequired[Any] # 输入
parent_run_id: UUID | None # 父运行 ID
astream_events 提供了比 astream_log 更高层次的抽象。事件类型包括 on_chain_start、on_chain_stream、on_chain_end 等标准事件,以及 on_custom_event 自定义事件。开发者可以用直观的过滤条件(如 include_names、include_types、include_tags)选择感兴趣的事件。
12.7.3 _schema_format 的角色
_TracerCore 构造函数中的 _schema_format 参数控制了输入输出的序列化格式:
def _get_chain_inputs(self, inputs):
if self._schema_format in {"original", "original+chat"}:
return inputs if isinstance(inputs, dict) else {"input": inputs}
if self._schema_format == "streaming_events":
return {"input": inputs}
"original"格式是所有传统追踪器使用的格式,直接透传输入输出"streaming_events"格式将所有输入统一包装在{"input": ...}中,为事件流提供一致的数据结构"original+chat"格式与"original"相同,但不会在on_chat_model_start时抛出NotImplementedError
这三种格式的存在反映了 LangChain 在保持向后兼容性的同时不断演进追踪格式的策略。
12.8 回调在组件中的集成模式
12.8.1 标准集成模式
回调在所有组件中遵循统一的集成模式:
# 1. 合并配置
callback_manager = CallbackManager.configure(
callbacks, self.callbacks, self.verbose,
tags, self.tags, metadata, self.metadata
)
# 2. 触发开始事件,获得 RunManager
run_manager = callback_manager.on_chain_start(None, inputs, name=run_name)
# 3. 执行业务逻辑,传递 RunManager
try:
outputs = self._call(inputs, run_manager=run_manager)
except BaseException as e:
run_manager.on_chain_error(e)
raise
# 4. 触发结束事件
run_manager.on_chain_end(outputs)
12.8.2 子调用的回调传播
当子组件需要调用时,通过 run_manager.get_child() 创建子级 CallbackManager:
# SequentialChain._call 中:
for chain in self.chains:
callbacks = _run_manager.get_child()
outputs = chain(known_values, return_only_outputs=True, callbacks=callbacks)
这确保了父子运行之间的追踪关系被正确建立。在 Runnable 体系中,同样的传播通过 RunnableConfig 中的 callbacks 字段实现:
docs = self.retriever.invoke(
question, config={"callbacks": run_manager.get_child()}
)
12.9 设计决策分析
12.9.1 为什么使用 Mixin 而非单一基类
Mixin 设计允许不同的 RunManager 类型只混入需要的事件方法。CallbackManagerForLLMRun 只需要 LLMManagerMixin 的方法,不需要 ChainManagerMixin。这避免了在不相关的上下文中暴露无意义的方法,提供了更清晰的类型安全。
12.9.2 为什么默认吞掉回调异常
回调系统是一种横切关注点,它的故障不应影响主业务流程。在生产环境中,一个监控系统的临时故障不应导致用户请求失败。raise_error = False 的默认值体现了"可观测性不应影响可用性"的设计理念。
12.9.3 为什么需要可继承与不可继承的区分
考虑这样的场景:用户为顶层 Chain 设置了一个全局追踪器(应继承给所有子调用),同时也设置了一个只在当前层生效的调试处理器(不应继承)。inheritable_handlers 与 handlers 的分离正是为了支持这种灵活性。标签和元数据也有同样的区分,使得追踪树中的每一层都可以有自己的局部标注。
12.9.4 为什么 Tracer 只在根运行持久化
根运行包含了完整的调用树(通过 child_runs 列表递归嵌套),因此只需持久化一次即可保存全部信息。这种设计避免了大量的重复写入操作,同时也保证了追踪数据的原子性 -- 要么完整保存一棵调用树,要么不保存。
12.9.5 langchain_core/callbacks/ + tracers/ 真实尺寸:4677 + 5057 行
langchain_core 里两个目录、19 个文件、合计 9734 行——
callbacks/(4677 行)——
| 文件 | 行 | 角色 |
|---|---|---|
manager.py |
2697 | CallbackManager / AsyncCallbackManager / RunManager 一家——本目录最大、占 58% |
base.py |
1157 | 6 个 Mixin + BaseCallbackHandler + AsyncCallbackHandler + BaseCallbackManager;40 个 on_* 方法(sync + async 双轨) |
file.py |
267 | FileCallbackHandler——日志写文件 |
streaming_stdout.py |
152 | 流式 stdout |
usage.py |
149 | UsageMetadataCallbackHandler——token 计费聚合 |
__init__.py |
132 | public 导出 |
stdout.py |
123 | StdOutCallbackHandler |
tracers/(5057 行)——
| 文件 | 行 | 角色 |
|---|---|---|
event_stream.py |
1100 | astream_events 的实现——本目录最大 |
base.py |
937 | BaseTracer + AsyncBaseTracer——同步/异步追踪器双层 |
log_stream.py |
769 | astream_log 的实现——RunLog patch 流 |
core.py |
705 | _TracerCore——run_map / order_map 双索引、运行树管理 |
langchain.py |
400 | LangChainTracer——把 Run 持久化到 LangSmith |
evaluation.py |
226 | EvaluationTracer——给评估框架用 |
stdout.py / context.py |
205 / 205 | 文本输出 / contextvars 注入 |
memory_stream.py |
148 | 跨 task 的 in-memory queue |
root_listeners.py / _compat.py / __init__.py / run_collector.py / _streaming.py / schemas.py |
≤ 130 | 薄壳 + 兼容 |
三条非显然的物理事实——
manager.py单文件 2697 行占 callbacks 目录 58%——把同步、异步、Run、AsyncRun 四套 Manager 都塞在一起——跨 Manager 共享的工具函数(_handle_event、_acopy_callbacks、parent_run_id 传播)也在这里——再次印证 LangChain "单文件包大方法集" 的偏好(对比 LangGraph 把 pregel 切成 22 个文件)tracers/event_stream.py(astream_events)和tracers/log_stream.py(astream_log)几乎同样大(1100 + 769 = 1869 行)——它们解决同一个问题的两代方案——astream_log是 v0 patch-based 早期方案、astream_events是 v0.2 之后推荐——LangChain 把两套并存就是为了向后兼容、不强制升级- 40 个
on_*回调方法(base.py 实测)——sync + async 双轨——对应 6 个 Mixin × 4 种事件(start / end / error / 特殊)——这就是为什么BaseCallbackHandler看似"接口爆炸":它必须覆盖 LLM / Chain / Tool / Retriever / Agent / Token 6 个组件类型 × 4 种事件 × 2 种执行模式——3 个维度的笛卡尔积
12.9.6 回调系统的真实控制面:跳过、吞错、继承、异步分流
回调系统不是一个简单的事件总线。langchain_core/callbacks/base.py:455-469 的 BaseCallbackHandler 同时混入 LLM、Chain、Tool、Retriever、Callback、Run 六组 Mixin,并提供两个关键开关:raise_error=False 和 run_inline=False。这两个默认值表达了 LangChain 的生产态度:回调失败默认不打断主链路,回调默认可以被调度到非内联路径,避免阻塞业务。
BaseCallbackManager 在 base.py:938-971 维护四组可传播状态:普通 handlers、inheritable_handlers、普通 tags / metadata、可继承 inheritable_tags / inheritable_metadata。这解释了为什么本章前面反复强调"可继承"和"不可继承"的区分。顶层用户希望所有子调用都带上项目标签,但某个局部调试 handler 不一定应该泄漏到每个下游模型调用。
同步事件的核心在 langchain_core/callbacks/manager.py:254-320 的 handle_event。它遍历 handlers,先检查 ignore_condition_name,再调用对应事件方法;如果 on_chat_model_start 没实现,会回退到 on_llm_start;普通异常会记录 warning,只有 handler.raise_error 为真时才重新抛出。异步版本在 manager.py:366-420,如果事件方法是 coroutine 就 await;如果 handler.run_inline 为真就直接调用;否则放到 executor 里执行。
flowchart TD
A[事件触发] --> B{handler 是否 ignore}
B -->|是| C[跳过]
B -->|否| D{同步还是异步}
D -->|同步| E[handle_event]
D -->|异步| F[ahandle_event]
E --> G{异常}
F --> G
G -->|raise_error=false| H[记录 warning 继续]
G -->|raise_error=true| I[向上抛出]
父子传播也不是概念图。ParentRunManager.get_child 在 manager.py:564-583 创建新的 CallbackManager,把当前 run id 设为 parent_run_id,只复制 inheritable handlers、tags、metadata,并可追加局部 tag。这样一个 Chain 调 Tool、Tool 内部调 Retriever 时,trace 树能保留父子关系,但局部 handler 不会默认污染子树。
CallbackManager.on_chain_start 在 manager.py:1410-1453 展示了完整事件生命周期:没有 run_id 时生成 uuid,调用 handle_event 发出 on_chain_start,然后返回 CallbackManagerForChainRun,把 run_id、handlers、inheritable_handlers、parent_run_id、tags、metadata 都带上。异步 AsyncCallbackManager.on_chain_start 在 manager.py:1942-1975 做同样的事,只是调用 ahandle_event。
| 控制面 | 源码位置 | 工程意义 |
|---|---|---|
ignore_* |
handle_event 检查 ignore 条件 |
一个 handler 可只关心 LLM 或 Tool |
raise_error |
manager.py:305-313、407-415 |
可观测性默认不影响可用性 |
run_inline |
manager.py:376-387 |
异步 handler 可选择内联或 executor |
get_child |
manager.py:564-583 |
只传播 inheritable 状态,建立父子 run |
configure |
manager.py:1608-1628、2128-2144 |
多来源回调、标签、元数据汇总入口 |
这套控制面解释了为什么 callbacks 目录会膨胀到几千行。它要同时解决四个问题:事件接口覆盖所有组件,handler 故障不破坏主流程,trace 树能表达父子关系,同步和异步路径都能工作。少任何一层,LangSmith、终端日志、token 统计、前端事件流都会在复杂链路里断裂。
回调系统的设计也提醒我们:可观测性不是"事后加一个日志函数"。在 LangChain 这种组合式框架里,每个组件都可能嵌套调用另一个组件,日志必须天然带有父子关系;每个组件都可能同步或异步执行,事件处理必须同时覆盖两条路径;每个组件都可能用于生产请求,观测系统失败不能默认拖垮业务。
这就是 handlers 和 inheritable_handlers 分离的意义。顶层追踪器需要继承,因为它负责整棵调用树;局部调试器不一定继承,因为它可能只服务当前组件。标签和元数据也一样:项目编号、用户编号、会话编号通常应该沿调用树传播;某个局部实验标签则不应污染下游所有节点。
异常策略同样体现生产权衡。默认吞掉回调异常不是忽视错误,而是把主业务可用性放在第一位;需要强一致追踪的场景可以打开 raise_error。内联策略也不是性能微调,而是决定事件处理是否参与当前执行时序。审计、计费、事务型回调可能需要内联;普通日志、前端事件流、调试输出通常可以异步处理。
从使用者角度看,调试回调问题时可以按四步走:
- 事件有没有触发,对应组件是否调用了
on_*_start。 - handler 有没有被
ignore_*跳过。 - handler 是否被放到异步执行路径,导致时序和预期不同。
- 子调用是否通过
get_child()继承了正确的 handler、tag 和 metadata。
这四步比盲目打印日志有效,因为它们对应源码里的真实控制点。
还有一个常见误区,是把回调只理解成调试输出。实际上,同一套机制还承担计费统计、前端进度、审计记录、评估采样、实验标签传播。不同用途对可靠性的要求不同:计费和审计更接近强一致,前端进度更接近弱一致,调试输出可以丢失。LangChain 用同一套接口承载这些用途,所以必须提供跳过、吞错、内联、继承这些控制点,让使用者按场景调节。
这也解释了为什么回调和追踪分成两层。回调负责"事件发生时通知谁",追踪负责"把事件组织成一棵可查询的运行树"。如果只要打印日志,回调足够;如果要复盘一次复杂 Agent 为什么走到某个答案,就必须有追踪器维护父子关系、顺序关系和输入输出快照。
所以可观测性代码看起来啰嗦,是因为它承担的是事后解释权。线上问题发生后,工程团队需要知道请求从哪里进入、经过哪些模型和工具、哪一步失败、失败是否被吞掉、最终输出是否可信。回调和追踪系统就是为这些问题预先铺路。
没有这条路,复杂链路只能靠猜,排障成本会迅速失控,团队协作也会变慢。
12.10 小结
LangChain 的回调系统是一个精心分层的事件驱动架构。从底层的 Mixin 组合到顶层的事件流 API,每一层都有清晰的职责边界。
BaseCallbackHandler 通过六个 Mixin 的组合定义了完整的事件接口,并通过 ignore_* 属性和 raise_error/run_inline 标志提供了精细的控制能力。BaseCallbackManager 通过可继承/不可继承的双层处理器列表实现了回调的树状传播。CallbackManager.configure 方法是多源回调配置的合并入口,自动处理环境变量驱动的追踪器注入。
RunManager 系统将回调触发器与具体的运行上下文绑定,get_child() 方法实现了父子运行关系的自动传播。Tracer 系统在回调基础上构建了完整的运行追踪模型,_TracerCore 的 run_map 和 order_map 维护了运行的嵌套结构和全局排序。BaseTracer 的 _persist_run 只在根运行结束时调用,一次性持久化整棵调用树。
事件流 API(astream_events 和 astream_log)通过专用的追踪器将运行事件转化为可消费的异步流,为前端实时展示和中间状态监控提供了强大的基础设施。
物理骨架:callbacks/ 4677 行 + tracers/ 5057 行 = 9734 行——其中 manager.py 单文件 2697 行(全 callbacks 58%)+ event_stream.py 1100 行(全 tracers 22%)合计占两目录 39%——是理解整套可观测性架构的最小必读子集。