MCP 协议设计与实现
第11章 Python Client 实现剖析
第11章 Python Client 实现剖析
前面几章我们分析了 TypeScript SDK 的 Client 实现,对 MCP 客户端的职责有了清晰认知。本章我们将目光转向 Python SDK 的客户端实现。
Python SDK 在设计哲学上与 TypeScript SDK 保持一致——都围绕”会话”这一核心抽象展开,但在具体实现上却因语言特性而呈现出显著差异。Python SDK 选择了 anyio 作为异步运行时抽象层,用 Pydantic 取代 Zod 进行类型校验,用 async with 上下文管理器替代手动的生命周期管理。这些选择不是偶然的,它们深刻影响了客户端的 API 形态和内部架构。
本章要点
ClientSession/ClientSessionGroup两级抽象——单服务器会话 + 多服务器聚合- anyio + async with——结构化并发与资源生命周期的最佳实践
- 注册即声明——回调函数注册自动推断 Client capability
- 工具结果校验——Python SDK 独有,基于 JSON Schema 的运行时验证
- 三种传输层:stdio / SSE / Streamable HTTP 的统一流抽象
- OAuth2 认证:PKCE + TokenStorage + httpx 中间件
- 与 TS SDK 的 八个关键差异
11.1 客户端架构总览
Python MCP Client 的源码位于 src/mcp/client/ 目录下,核心文件包括:
session.py—ClientSession类,单服务器会话的核心抽象session_group.py—ClientSessionGroup类,多服务器聚合管理stdio.py— stdio 传输层实现sse.py— SSE 传输层实现streamable_http.py— Streamable HTTP 传输层实现websocket.py— WebSocket 传输层实现(Python 独有)auth/oauth2.py— OAuth2 客户端认证
下面这张架构图展示了各模块之间的关系:
graph TB
subgraph 应用层
APP[应用代码]
end
subgraph 会话层
CSG[ClientSessionGroup<br/>多服务器聚合]
CS[ClientSession<br/>单服务器会话]
end
subgraph 基础设施层
BS[BaseSession<br/>请求/响应路由]
end
subgraph 传输层
STDIO[stdio_client<br/>子进程通信]
SSE[sse_client<br/>SSE 长连接]
SH[streamable_http_client<br/>HTTP 流式]
WS[websocket_client<br/>全双工]
end
subgraph 认证层
OAUTH[OAuth2Auth<br/>PKCE + 自动刷新]
end
APP --> CSG
APP --> CS
CSG -->|管理多个| CS
CS -->|继承| BS
CS --> STDIO
CS --> SSE
CS --> SH
CS --> WS
SH -.->|可选| OAUTH
SSE -.->|可选| OAUTH
分层设计的三个核心选择
Python SDK 的分层反映了三个关键的设计哲学:
- 传输层完全抽象化——上层无需感知底层用的是 stdio 还是 HTTP,只看到
(read_stream, write_stream)对 - 会话分级——单会话 (
ClientSession) 管协议,多会话聚合 (ClientSessionGroup) 管应用层编排 - 资源管理结构化——从
AsyncExitStack到async with,Python 把”不泄漏资源”变成语言特性
11.2 ClientSession:单服务器会话
ClientSession 是 Python 客户端的核心类,它继承自 BaseSession,负责与单个 MCP Server 的全部通信。我们先看其类型签名:
class ClientSession(
BaseSession[
types.ClientRequest, # 发送的请求类型
types.ClientNotification, # 发送的通知类型
types.ClientResult, # 发送的结果类型
types.ServerRequest, # 接收的请求类型
types.ServerNotification, # 接收的通知类型
]
):
这个五元泛型参数看起来复杂,但逻辑清晰:前三个约束”我发出去的消息”,后两个约束”我收到的消息”。BaseSession 利用这些泛型参数在编译期保证消息类型的正确性。
11.2.1 构造与初始化
ClientSession 的构造函数接受读写流和一系列回调函数:
def __init__(
self,
read_stream: ReadStream[SessionMessage | Exception],
write_stream: WriteStream[SessionMessage],
read_timeout_seconds: float | None = None,
sampling_callback: SamplingFnT | None = None,
elicitation_callback: ElicitationFnT | None = None,
list_roots_callback: ListRootsFnT | None = None,
logging_callback: LoggingFnT | None = None,
message_handler: MessageHandlerFnT | None = None,
client_info: types.Implementation | None = None,
):
Protocol 定义回调类型
这些回调函数的设计值得关注。Python SDK 使用 Protocol 类来定义回调类型,这是 Python 式的结构化子类型(structural subtyping),等价于 TypeScript 的接口但更灵活:
class SamplingFnT(Protocol):
async def __call__(
self,
context: RequestContext[ClientSession],
params: types.CreateMessageRequestParams,
) -> types.CreateMessageResult | types.ErrorData: ...
Protocol 的好处:
- 无需继承,任何形状匹配的函数/类都符合协议
- 便于用普通函数作为回调
- 支持类型检查器做静态分析
默认回调的”优雅失败”
每个回调都有默认实现。例如,当服务器发起 sampling 请求但客户端未注册回调时,默认返回错误:
async def _default_sampling_callback(
context: RequestContext[ClientSession],
params: types.CreateMessageRequestParams,
) -> types.CreateMessageResult | types.ErrorData:
return types.ErrorData(
code=types.INVALID_REQUEST,
message="Sampling not supported",
)
这种设计避免了未注册回调时的崩溃——明确返回”不支持”比抛出 NotImplementedError 更符合协议规范。
注册即声明:能力推断
初始化握手通过 initialize() 方法完成。该方法会根据注册的回调函数自动推断客户端能力:
async def initialize(self) -> types.InitializeResult:
capabilities = types.ClientCapabilities(
experimental={},
roots=(
types.RootsCapability(listChanged=True)
if self._list_roots_callback is not _default_list_roots_callback
else None
),
sampling=(
types.SamplingCapability()
if self._sampling_callback is not _default_sampling_callback
else None
),
elicitation=(
types.ElicitationCapability()
if self._elicitation_callback is not _default_elicitation_callback
else None
),
)
# ... 发送 initialize 请求
- 如果注册了
sampling_callback,就声明支持 sampling 能力 - 如果注册了
list_roots_callback,就声明支持 roots 能力
这种”注册即声明”的设计避免了能力声明与实际实现不一致的问题。开发者只需要关心”我要不要支持这个功能”,不用单独维护 capability 声明。
11.2.2 协议操作方法
ClientSession 为 MCP 协议的每种操作提供了对应的异步方法:
| 方法 | 协议操作 | 返回类型 |
|---|---|---|
list_tools() | tools/list | ListToolsResult |
call_tool() | tools/call | CallToolResult |
list_resources() | resources/list | ListResourcesResult |
read_resource() | resources/read | ReadResourceResult |
list_prompts() | prompts/list | ListPromptsResult |
get_prompt() | prompts/get | GetPromptResult |
complete() | completion/complete | CompleteResult |
send_ping() | ping | EmptyResult |
subscribe_resource() | resources/subscribe | EmptyResult |
unsubscribe_resource() | resources/unsubscribe | EmptyResult |
以 call_tool 为例,它展示了 Python 客户端的一个独特特性——结构化内容校验:
async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
read_timeout_seconds: float | None = None,
progress_callback: ProgressFnT | None = None,
) -> types.CallToolResult:
result = await self.send_request(
types.CallToolRequest(
params=types.CallToolRequestParams(name=name, arguments=arguments),
),
types.CallToolResult,
request_read_timeout_seconds=read_timeout_seconds,
progress_callback=progress_callback,
)
if not result.is_error:
await self._validate_tool_result(name, result)
return result
_validate_tool_result 会根据工具的 output_schema 对返回的 structured_content 进行 JSON Schema 校验。
Schema 缓存机制
工具的 output schema 在 list_tools() 调用时被缓存到 _tool_output_schemas 字典中,后续的 call_tool 会自动利用这个缓存。
async def _validate_tool_result(self, name: str, result: types.CallToolResult):
# 1. 查缓存
schema = self._tool_output_schemas.get(name)
# 2. 缓存缺失时刷新
if schema is None and name not in self._tool_output_schemas:
# 只重试一次,避免无限循环
await self._refresh_tool_cache()
schema = self._tool_output_schemas.get(name)
# 3. 仍无 schema 说明工具不声明结构化输出,跳过
if schema is None:
return
# 4. 校验
try:
jsonschema.validate(result.structured_content, schema)
except jsonschema.ValidationError as e:
raise ToolOutputValidationError(
f"Tool {name} returned invalid output: {e.message}"
)
如果缓存中没有该工具的 schema(比如工具是后来动态添加的),会自动触发一次 list_tools() 刷新缓存。这种”lazy + cache”的组合既保证了正确性又避免了每次都查询。
11.2.3 服务器请求处理
MCP 是双向协议,服务器也会向客户端发起请求。_received_request 方法通过 Python 的 match/case 模式匹配来分发这些请求:
async def _received_request(self, responder):
ctx = RequestContext[ClientSession](
request_id=responder.request_id,
meta=responder.request_meta,
session=self,
)
match responder.request:
case types.CreateMessageRequest(params=params):
with responder:
response = await self._sampling_callback(ctx, params)
await responder.respond(response)
case types.ElicitRequest(params=params):
with responder:
response = await self._elicitation_callback(ctx, params)
await responder.respond(response)
case types.ListRootsRequest():
with responder:
response = await self._list_roots_callback(ctx)
await responder.respond(response)
case types.PingRequest():
with responder:
await responder.respond(types.EmptyResult())
注意 with responder 上下文管理器的使用。RequestResponder 实现了 __enter__/__exit__,在进入时设置取消作用域(CancelScope),在退出时通知会话该请求已处理完毕。这确保了即使回调函数抛出异常,请求的生命周期也能被正确管理。
match/case vs isinstance 链
Python 3.10+ 的 match/case 对 Pydantic 类型有特别优化——它不只是 isinstance 检查,还支持模式解构:
# match/case 风格:类型匹配 + 字段解构一气呵成
case types.CreateMessageRequest(params=params):
response = await self._sampling_callback(ctx, params)
# isinstance 风格:冗余
if isinstance(req, types.CreateMessageRequest):
params = req.params
response = await self._sampling_callback(ctx, params)
11.3 ClientSessionGroup:多服务器聚合
真实的 AI Agent 应用往往需要同时连接多个 MCP Server。ClientSessionGroup 正是为此设计的——它管理多个 ClientSession,并将所有服务器的 tools、resources、prompts 聚合到统一的命名空间中。
graph LR
subgraph ClientSessionGroup
TOOLS[聚合 Tools 字典]
RES[聚合 Resources 字典]
PROMPTS[聚合 Prompts 字典]
end
subgraph ServerA[Server A - 文件系统]
SA[ClientSession A]
TA[read_file, write_file]
end
subgraph ServerB[Server B - 数据库]
SB[ClientSession B]
TB[query, execute]
end
subgraph ServerC[Server C - Web 搜索]
SC[ClientSession C]
TC[search, fetch]
end
SA --> TA
SB --> TB
SC --> TC
TOOLS -->|路由| SA
TOOLS -->|路由| SB
TOOLS -->|路由| SC
11.3.1 连接管理与生命周期
ClientSessionGroup 实现了 async with 协议,用 AsyncExitStack 管理所有子会话的生命周期:
class ClientSessionGroup:
def __init__(
self,
exit_stack: contextlib.AsyncExitStack | None = None,
component_name_hook: _ComponentNameHook | None = None,
):
self._tools: dict[str, types.Tool] = {}
self._resources: dict[str, types.Resource] = {}
self._prompts: dict[str, types.Prompt] = {}
self._sessions: dict[ClientSession, _ComponentNames] = {}
self._tool_to_session: dict[str, ClientSession] = {}
if exit_stack is None:
self._exit_stack = contextlib.AsyncExitStack()
self._owns_exit_stack = True
else:
self._exit_stack = exit_stack
self._owns_exit_stack = False
这里有一个精妙的设计:exit_stack 可以外部传入,也可以内部创建。
| 场景 | exit_stack | owns | 生命周期 |
|---|---|---|---|
| 独立使用 | 内部创建 | True | async with 结束时关闭 |
| 嵌入到更大的资源管理 | 外部传入 | False | 由调用者控制 |
这使得 ClientSessionGroup 可以嵌入到更大的资源管理体系中——比如一个 Agent 框架可能有自己的全局 exit_stack,所有资源都统一管理。
两级 exit_stack 架构
仔细看 ClientSessionGroup 的字段声明(mcp/client/session_group.py:119-120),实际上维护着两级 exit_stack:
_exit_stack: contextlib.AsyncExitStack # 主 stack
_session_exit_stacks: dict[mcp.ClientSession, contextlib.AsyncExitStack] # 每会话 stack
两级设计的理由:
- 主
_exit_stack——管理 Group 整体的 entering/exiting。如果 Group owns 它、__aexit__时aclose();不 own 就让外部处理。 _session_exit_stacks[session]——每个ClientSession有自己独立的 exit stack,记录它的底层连接资源(stdio 进程、HTTP 连接、SSE 流)。
这让”断开一个 session 但保留其他 session”成为可能——disconnect_from_server(session) 只关这一个 session 的 exit_stack、其他 session 继续运行。如果只有一个共享 stack、就得全部关或都不关。
并发 shutdown 而非串行
看 __aexit__ 的真实实现(session_group.py:156-171):
async def __aexit__(self, _exc_type, _exc_val, _exc_tb) -> bool | None:
# Only close the main exit stack if we created it
if self._owns_exit_stack:
await self._exit_stack.aclose()
# Concurrently close session stacks.
async with anyio.create_task_group() as tg:
for exit_stack in self._session_exit_stacks.values():
tg.start_soon(exit_stack.aclose)
anyio.create_task_group() + tg.start_soon(exit_stack.aclose) 让所有 session 的关闭并发发生——不是 for: await aclose() 这样串行。对 N 个 server(每个关闭可能有 1-2 秒 TCP 或进程终止等待):
- 串行关闭:总耗时 ≈ N × 2 秒
- 并发关闭:总耗时 ≈ max(各自关闭时间) ≈ 2 秒
用户 async with 退出 Group 时等的秒数从 O(N) 降到 O(1)。对连了十几个 MCP server 的 Agent 而言、shutdown 从几十秒压到几秒。
disconnect_from_server 的 4 态检查
session_group.py:213-223 的 disconnect_from_server 实现:
async def disconnect_from_server(self, session: mcp.ClientSession) -> None:
session_known_for_components = session in self._sessions
session_known_for_stack = session in self._session_exit_stacks
if not session_known_for_components and not session_known_for_stack:
raise MCPError(
code=types.INVALID_PARAMS,
message="Provided session is not managed or already disconnected.",
)
# ... 分别处理两个维度
两个独立维度:_sessions 记录”组件归属”、_session_exit_stacks 记录”资源所有权”。两者正常情况下同步更新、但边缘情况可能脱同步(比如 connect 过程中异常、一个 dict 已插入另一个还没)。
代码逻辑明确说”只要还在任一维度里就不是完全未知”——只有两者都没时才抛 INVALID_PARAMS。这让 disconnect 的幂等性更强——用户可以安全地重复调 disconnect_from_server(same_session),第二次会抛合适的错误而不是静默把已清理的字典再搅一遍导致 KeyError。
11.3.2 命名冲突:component_name_hook 的作用
同一个工具名(比如 search)被两个不同服务器暴露时会怎样?默认情况下后注册的会覆盖前者——这是 Python dict 的自然行为、但对 Agent 是灾难。
component_name_hook 参数(session_group.py:122-126)给用户一个主动命名空间化的钩子:
# Optional fn consuming (component_name, server_info) for custom names.
# This is to provide a means to mitigate naming conflicts across servers.
# Example: (tool_name, server_info) => "{result.server_info.name}.{tool_name}"
_ComponentNameHook: TypeAlias = Callable[[str, types.Implementation], str]
源码注释直接给了典型用法:
name_fn = lambda name, server_info: f"{server_info.name}_{name}"
async with ClientSessionGroup(component_name_hook=name_fn) as group:
for params in server_params:
await group.connect_to_server(params)
这样 Server A 的 search 注册为 "ServerA_search"、Server B 的 search 注册为 "ServerB_search"——两者共存。
注意 call_tool 路由是按”外部名”走的(line 203):
async def call_tool(self, name, arguments=...):
session = self._tool_to_session[name] # 按外部名查 session
session_tool_name = self.tools[name].name # 取出 session 里的原名
return await session.call_tool(session_tool_name, ...)
_tool_to_session[name] 用外部(可能已重命名的)名做 lookup、self.tools[name].name 取出 session 内部的原名——因为 session 只认自己当初注册的 search、不认 ServerA_search。这种”外部统一命名 + 内部原名转发”的设计让 rename hook 只影响 Group 层、不扰动底层 session 协议。调试时注意两层之间的映射关系。
使用示例
连接新服务器通过 connect_to_server 完成:
async with ClientSessionGroup() as group:
# 连接第一个服务器
session_a = await group.connect_to_server(
StdioServerParameters(command="python", args=["-m", "my_server"])
)
# 连接第二个服务器
session_b = await group.connect_to_server(
StreamableHttpParameters(url="http://localhost:8080/mcp")
)
# 连接第三个服务器
session_c = await group.connect_to_server(
SseServerParameters(url="http://legacy.example.com/sse")
)
# 此时 group.tools 包含三个服务器的所有工具
print(f"Available tools: {list(group.tools.keys())}")
# 调用工具(自动路由到正确的服务器)
result = await group.call_tool("read_file", {"path": "/tmp/test.txt"})
传输层自动选择
_establish_session 方法展示了传输层的选择逻辑:
async def _establish_session(
self,
params: ServerParameters,
) -> tuple[ReadStream, WriteStream, contextlib.AsyncExitStack]:
session_stack = contextlib.AsyncExitStack()
match params:
case StdioServerParameters():
read, write = await session_stack.enter_async_context(
stdio_client(params)
)
case StreamableHttpParameters():
read, write, _ = await session_stack.enter_async_context(
streamablehttp_client(url=str(params.url), headers=params.headers)
)
case SseServerParameters():
read, write = await session_stack.enter_async_context(
sse_client(url=str(params.url), headers=params.headers)
)
return read, write, session_stack
每个会话都有独立的 AsyncExitStack,确保单个服务器断连不会影响其他会话。
11.3.2 命名冲突处理
当多个服务器提供同名工具时,ClientSessionGroup 默认会抛出错误。但它提供了 component_name_hook 机制来自定义命名策略:
# 给每个工具加上服务器名前缀
name_fn = lambda name, server_info: f"{server_info.name}_{name}"
async with ClientSessionGroup(component_name_hook=name_fn) as group:
await group.connect_to_server(server_a_params) # tools: "serverA_read"
await group.connect_to_server(server_b_params) # tools: "serverB_read"
常见的命名策略:
{server}_{tool}— 服务器名前缀,最直观{tool}@{server}— 装饰符风格{tool}+ 版本号后缀 — 保持名称简洁- 自定义映射表 — 精细控制
原子性保证
聚合过程使用临时字典来保证原子性——如果聚合过程中任何一步失败,已有的聚合状态不会被污染:
async def _aggregate_components(
self,
server_info: types.Implementation,
session: ClientSession,
):
# 临时收集
prompts_temp: dict[str, types.Prompt] = {}
resources_temp: dict[str, types.Resource] = {}
tools_temp: dict[str, types.Tool] = {}
# 收集 tools
for tool in (await session.list_tools()).tools:
name = self._compute_name(tool.name, server_info)
tools_temp[name] = tool
# 收集 resources
for resource in (await session.list_resources()).resources:
...
# 收集 prompts
for prompt in (await session.list_prompts()).prompts:
...
# 检查冲突(在合并前)
matching_tools = tools_temp.keys() & self._tools.keys()
if matching_tools:
raise MCPError(
f"Tool name conflict: {matching_tools}. "
f"Use component_name_hook to namespace."
)
# 原子性地合并(只有全部检查通过才提交)
self._tools.update(tools_temp)
self._resources.update(resources_temp)
self._prompts.update(prompts_temp)
# 维护反向索引
for name in tools_temp:
self._tool_to_session[name] = session
11.3.3 动态断连
disconnect_from_server 支持运行时动态移除某个服务器。它通过 _ComponentNames 反向索引快速定位该会话注册的所有组件,逐一清理:
async def disconnect_from_server(self, session: ClientSession) -> None:
# 取出该会话注册的所有组件名
component_names = self._sessions.pop(session)
# 从聚合字典中删除
for name in component_names.tools:
del self._tools[name]
del self._tool_to_session[name]
for name in component_names.resources:
del self._resources[name]
for name in component_names.prompts:
del self._prompts[name]
# 关闭该会话的 exit_stack(不影响其他会话)
session_stack = self._session_exit_stacks.pop(session)
await session_stack.aclose()
这个设计支持热插拔——Agent 运行过程中可以动态加入或移除 MCP Server,无需重启。
11.4 传输层:四种连接方式
Python SDK 提供了四种传输层实现,它们都遵循相同的模式——作为异步上下文管理器,yield 出 (read_stream, write_stream) 元组。
11.4.1 stdio 传输
stdio_client 通过 anyio.open_process 启动子进程,将子进程的 stdin/stdout 包装为 MCP 消息流:
@asynccontextmanager
async def stdio_client(server: StdioServerParameters):
process = await anyio.open_process(
[server.command, *server.args],
env=server.env,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# 创建流
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
async def stdout_reader():
buffer = ""
async for chunk in TextReceiveStream(process.stdout):
lines = (buffer + chunk).split("\n")
buffer = lines[-1]
for line in lines[:-1]:
if line:
message = types.jsonrpc_message_adapter.validate_json(line)
await read_stream_writer.send(SessionMessage(message))
async def stdin_writer():
async for session_msg in write_stream_reader:
json_str = session_msg.message.model_dump_json()
await process.stdin.send((json_str + "\n").encode())
async with anyio.create_task_group() as tg:
tg.start_soon(stdout_reader)
tg.start_soon(stdin_writer)
try:
yield read_stream, write_stream
finally:
await _graceful_shutdown(process)
Graceful Shutdown
关闭时遵循 MCP 规范的 graceful shutdown 序列:
async def _graceful_shutdown(process: Process) -> None:
# 1. 关闭 stdin,让进程知道"没有更多输入"
await process.stdin.aclose()
# 2. 等待进程自行退出
try:
with anyio.fail_after(10):
await process.wait()
return
except TimeoutError:
pass
# 3. 发送 SIGTERM
process.terminate()
try:
with anyio.fail_after(5):
await process.wait()
return
except TimeoutError:
pass
# 4. 最后手段:SIGKILL
process.kill()
await process.wait()
这种”三段式关闭”是 Unix 进程管理的标准实践——先温和,再严厉,最后暴力。
11.4.2 SSE 与 Streamable HTTP 传输
SSE 传输使用 httpx + httpx-sse 库建立长连接。Streamable HTTP 传输是 SSE 的演进版本,支持双向流式通信,并引入了 mcp-session-id 头来维护会话状态。两者都可以通过 OAuth2Auth 中间件进行认证。
11.4.3 WebSocket 传输
Python SDK 独有的 WebSocket 实现(详见第 14 章),提供真正的全双工通信。
11.4.4 传输参数模型
session_group.py 中定义了四种传输参数的 Pydantic 模型,统一为 ServerParameters 类型别名:
ServerParameters: TypeAlias = (
StdioServerParameters | SseServerParameters | StreamableHttpParameters
)
这使得 connect_to_server 可以接受任意一种参数,内部通过 isinstance 分发到对应的传输层。
11.5 OAuth2 客户端认证
Python SDK 的 OAuth2 实现位于 client/auth/oauth2.py,支持 Authorization Code + PKCE 流程。
PKCE 参数生成
PKCEParameters 类封装了 code_verifier/code_challenge 的生成:
class PKCEParameters(BaseModel):
code_verifier: str = Field(..., min_length=43, max_length=128)
code_challenge: str = Field(..., min_length=43, max_length=128)
@classmethod
def generate(cls) -> "PKCEParameters":
code_verifier = "".join(
secrets.choice(string.ascii_letters + string.digits + "-._~")
for _ in range(128)
)
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=")
return cls(code_verifier=code_verifier, code_challenge=code_challenge)
TokenStorage 接口
认证流程通过 TokenStorage 协议实现 token 的持久化存储:
class TokenStorage(Protocol):
async def get_tokens(self) -> OAuth2Token | None: ...
async def set_tokens(self, tokens: OAuth2Token) -> None: ...
async def get_client_info(self) -> OAuth2ClientInfo | None: ...
async def set_client_info(self, info: OAuth2ClientInfo) -> None: ...
应用可以自定义存储后端:
- 文件(开发环境)
- 系统密钥链(Keychain/Credentials Manager)
- 数据库(多用户场景)
- 内存(测试)
httpx 中间件集成
OAuth2Auth 作为 httpx 的 Auth 中间件,自动在请求中注入 Bearer token 并处理 token 刷新:
class OAuth2Auth(httpx.Auth):
def auth_flow(self, request: httpx.Request):
# 1. 尝试用现有 token
tokens = self._tokens
if tokens and not tokens.is_expired:
request.headers["Authorization"] = f"Bearer {tokens.access_token}"
response = yield request
# 2. 401 时尝试刷新
if response.status_code == 401:
yield from self._refresh_flow(request)
else:
# 3. 没有 token 或已过期
yield from self._initial_auth_flow(request)
11.6 与 TypeScript Client 的核心差异
通过对比两个 SDK 的客户端实现,可以提炼出以下关键差异:
| 维度 | Python SDK | TypeScript SDK |
|---|---|---|
| 异步运行时 | anyio(兼容 asyncio/trio) | 原生 async/await |
| 生命周期管理 | async with + AsyncExitStack | 手动 connect()/close() |
| 类型校验 | Pydantic TypeAdapter | Zod schema |
| 回调类型 | Protocol(结构化子类型) | TypeScript 函数类型 |
| 消息分发 | match/case 模式匹配 | switch/case |
| 多服务器 | 内置 ClientSessionGroup | 需自行实现 |
| 工具结果校验 | 内置 _validate_tool_result | 无内置校验 |
| 传输层切换 | isinstance + 上下文管理器 | Transport 接口 |
| WebSocket | ✅ 支持 | ❌ 不支持 |
最关键的三点差异
差异一:生命周期管理
Python 的 async with 将资源获取与释放绑定在语法结构上,不可能忘记关闭连接:
async with ClientSession(read, write) as session:
await session.initialize()
result = await session.call_tool(...)
# 退出 block 时自动清理,无论是否抛异常
TypeScript 需要开发者自觉调用 close(),或使用 try/finally:
const client = new Client(...);
try {
await client.connect(transport);
const result = await client.callTool(...);
} finally {
await client.close(); // 容易忘
}
AsyncExitStack 的嵌套使用更是 Python 独有的模式——ClientSessionGroup 用主 exit_stack 管理所有子会话的 exit_stack,形成资源管理的树状结构。
差异二:多服务器聚合
ClientSessionGroup 是 Python SDK 独有的抽象。它不仅管理连接,还提供了:
- 组件聚合(tools/resources/prompts 统一命名空间)
- 命名冲突检测
- 动态断连
- 原子性保证
TypeScript SDK 没有对应的内置实现,开发者需要自行管理多个 Client 实例。这也是 MCP 框架层的机会——很多 Agent 框架(如 LangChain)在 TypeScript 侧重新实现了类似功能。
差异三:工具结果校验
Python 的 call_tool 在返回结果前会自动校验 structured_content 是否符合工具声明的 output_schema。这利用了 jsonschema 库进行运行时校验,为 Agent 应用提供了额外的安全保障。
# Python:自动校验
result = await session.call_tool("calc", {"a": 1, "b": 2})
# 如果 result.structured_content 不符合 schema,抛异常
// TypeScript:开发者自行校验
const result = await client.callTool("calc", { a: 1, b: 2 });
// 需要手动用 zod/ajv 校验 result.structuredContent
11.7 实战:构建一个多 MCP Server 的 Agent
下面展示一个真实场景——Agent 同时连接文件系统、数据库、Web 搜索三个 MCP Server:
import anyio
from mcp import ClientSessionGroup
from mcp.client.session_group import (
StdioServerParameters,
StreamableHttpParameters,
)
async def run_agent():
# 命名策略:用服务器名前缀避免冲突
name_fn = lambda name, info: f"{info.name}_{name}"
async with ClientSessionGroup(component_name_hook=name_fn) as group:
# 文件系统 MCP Server(本地 stdio)
fs_session = await group.connect_to_server(
StdioServerParameters(
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
)
)
# 数据库 MCP Server(本地 stdio)
db_session = await group.connect_to_server(
StdioServerParameters(
command="npx",
args=["-y", "@modelcontextprotocol/server-postgres", "postgres://..."],
)
)
# Web 搜索 MCP Server(远程 HTTP)
web_session = await group.connect_to_server(
StreamableHttpParameters(
url="https://search.example.com/mcp",
headers={"Authorization": f"Bearer {API_KEY}"},
)
)
print(f"Connected. Tools: {list(group.tools.keys())}")
# 输出: ['filesystem_read_file', 'filesystem_write_file',
# 'postgres_query', 'postgres_execute',
# 'search_web', 'search_fetch']
# 主 Agent 循环
while True:
user_input = input("You: ")
if user_input == "exit":
break
# 让 LLM 决定调用哪个工具
decision = await llm_decide_tool(user_input, group.tools)
if decision.tool_name:
result = await group.call_tool(
decision.tool_name,
decision.arguments,
)
print(f"Tool result: {result.content}")
else:
print(f"Bot: {decision.text}")
anyio.run(run_agent)
这个模式的威力:
- 统一调用接口——不管工具在哪个 Server,都是
group.call_tool() - 动态扩展——可以运行时加入新 Server
- 故障隔离——单个 Server 挂掉不影响其他
11.8 四个反模式
反模式一:忘记 async with
# ❌ 泄漏连接
session = ClientSession(read, write)
await session.initialize()
# ... 忘了关闭
# ✅ 使用 async with
async with ClientSession(read, write) as session:
await session.initialize()
# 自动关闭
反模式二:并发调用同一个 session 不加锁
MCP session 不是完全并发安全的——虽然发送是安全的,但响应需要配对。大量并发可能让请求 ID 混淆。
对策:用 anyio.Lock 保护关键操作,或限制每个 session 的并发度。
反模式三:硬编码 sampling 回调
# ❌ 回调里直接调 OpenAI,忽略用户的实际 LLM
async def sampling_cb(ctx, params):
return await openai.chat.create(...)
# ✅ 根据配置路由到用户选择的 LLM
async def sampling_cb(ctx, params):
provider = get_user_llm_provider()
return await provider.complete(params)
反模式四:不处理 Server 重启
现象:MCP Server 重启后 session 失效,但 Agent 不知道,继续发请求全部超时。
对策:
- 实现心跳检测(
send_ping()) - 捕获 session 失效错误,自动重连
- 使用
ClientSessionGroup的断连通知机制
11.8.5 mcp/client/ 目录的真实结构
mcp-python-sdk 的 src/mcp/client/ 实测 2508 行——分两块:
| 文件 / 子目录 | 行 | 角色 |
|---|---|---|
session.py | 480 | ClientSession 单服务器会话(章节主线) |
session_group.py | 410 | ClientSessionGroup 多服务器聚合 |
client.py | 308 | Client 高层 wrapper(dataclass、章节没明确介绍) |
streamable_http.py | 588 | Streamable HTTP transport(最复杂) |
stdio.py | 270 | stdio transport |
sse.py | 160 | SSE transport |
websocket.py | 85 | WebSocket transport |
_memory.py | 78 | InMemoryTransport(仅供测试 / Client 直连 server 用) |
_transport.py | 21 | Transport ABC base 接口 |
context.py | 16 | client-side request context |
auth/oauth2.py | 646 | 整个 client 最大文件——OAuth 2.1 完整实现 |
auth/utils.py | 339 | OAuth 工具(PKCE、metadata 发现等) |
auth/extensions/client_credentials.py | 485 | 客户端凭据 grant type 扩展 |
两个值得注意的尺寸事实——
1. auth/oauth2.py 646 行 + utils 339 + extensions 485 = 1470 行 OAuth 代码——比 session + session_group 加起来还多(480+410=890)。Python SDK 在 OAuth 上的投入和 TS SDK 一样夸张——和本书第 9 章 §9.11 提到的 TS SDK auth.ts 1745 行相互印证:OAuth 2.1 完整实现就是 1500-1700 行——任何 SDK 都跑不掉。
2. streamable_http.py 588 行 vs stdio.py 270 行 vs sse.py 160 行——Streamable HTTP 是最复杂的 transport。原因是它要支持:(a) JSON 和 SSE 两种响应模式自适应;(b) Last-Event-ID 重连重放;(c) DELETE 主动关 session;(d) 普通 POST + GET-for-stream 两种模式。三种功能叠加导致代码量是 stdio 的 2 倍多。
11.8.6 Client 高层 wrapper(client.py 308 行)
章节的 ClientSession 是底层 API、需要手动管理 transport——Client 是 dataclass-based 的高层 wrapper:
@dataclass
class Client:
server: Server | MCPServer | Transport | str # ← 自动识别 4 种入参类型
raise_exceptions: bool = False
read_timeout_seconds: float | None = None
sampling_callback: SamplingFnT | None = None
list_roots_callback: ListRootsFnT | None = None
elicitation_callback: ElicitationFnT | None = None
async def __aenter__(self): ...
async def __aexit__(self): ...
server 字段的多态识别——__post_init__ 里——
isinstance(server, Server | MCPServer)→ 包成InMemoryTransport(测试 / 嵌入用)isinstance(server, str)→ URL →streamable_http_clienttransportisinstance(server, Transport)→ 直接用
这种”四 in 一”构造——让最常见的两种用法(连 URL / 直接嵌 server 测试)一行代码搞定:
# 测试嵌入
async with Client(my_mcp_server) as client:
await client.call_tool("add", {"a": 1, "b": 2})
# 生产 HTTP
async with Client("https://api.example.com/mcp") as client: ...
# 自定义 transport
async with Client(my_stdio_transport) as client: ...
Client vs ClientSession 的关系——
ClientSession—— 协议层、必须自己拿到 read/write streamsClient—— 应用层、自动管理 transport + AsyncExitStack- 99% 用户应该用
Client、不用ClientSession—— 章节正文偏向后者是因为讲底层架构、但生产代码默认Client
对照 TS SDK——TS 没有等价的”四态构造”Client——TS 用户必须先 new Client() 再 client.connect(transport)。Python 的 dataclass + __post_init__ 让这种多态构造极简洁——和 §10.10.8 讨论过的 Tool.from_function 是同源思路(类型推断 + Pydantic/dataclass 让 Python SDK 比 TS SDK 更紧凑)。
11.8.7 核心类的源码账本(截至 python-sdk main 分支 2026-04-22)
前面把 ClientSession、ClientSessionGroup、Client、各 transport 和 OAuth 穿了一遍。这里把真实行数与函数签名以账本形式再归拢一遍——便于读者对着源码调试时按图索骥,也便于和 TS SDK 跨章对照。所有数据取自 github.com/modelcontextprotocol/python-sdk 的 src/mcp/client/ 目录。
11.8.7.1 session.py(536 行)的 11 个公开方法签名
| 方法 | 协议操作 | 关键 kwargs | 返回 |
|---|---|---|---|
__init__ | — | 7 个 callback + client_info + experimental_task_handlers(新增) | — |
initialize() | initialize | 无 | InitializeResult |
send_ping(*, meta=None) | ping | meta | EmptyResult |
list_tools(*, params=None) | tools/list | PaginatedRequestParams(分页游标) | ListToolsResult |
call_tool(name, arguments, read_timeout_seconds, progress_callback, *, meta=None) | tools/call | progress_callback 支持流式进度 | CallToolResult |
list_resources(*, params=None) | resources/list | 同上 | ListResourcesResult |
read_resource(uri, *, meta=None) | resources/read | meta | ReadResourceResult |
subscribe_resource(uri, *, meta=None) | resources/subscribe | meta | EmptyResult |
unsubscribe_resource(uri, *, meta=None) | resources/unsubscribe | meta | EmptyResult |
list_prompts(*, params=None) | prompts/list | 分页 | ListPromptsResult |
get_prompt(name, arguments, *, meta=None) | prompts/get | arguments: dict[str, str] | GetPromptResult |
complete(ref, argument, context_arguments=None) | completion/complete | context_arguments 是 2025 spec 才加的——用于多级补全场景 | CompleteResult |
send_progress_notification(token, progress, total, message, *, meta=None) | notifications/progress | message 是 2025 新增 | None |
_received_request(responder) | 入站分发 | match/case 派发 CreateMessageRequest / ElicitRequest / ListRootsRequest / PingRequest | — |
三个容易被忽视的签名细节——
call_tool的progress_callback不是 kwargs-only——位置参数可以传,这是为了和旧版兼容。但新代码推荐用 kwargs 写法,避免把float误传到progress_callback位置。complete(...)的context_arguments是 2025 spec 增补——用来在 prompt argument 自动补全时传”已填字段”作为上下文,让服务器算出更精准的候选。老服务器收到会忽略。experimental_task_handlers(__init__的 kwargs-only 参数)——2026 年初并入 main 分支,承载 streaming task 协议扩展,日后可能升级为稳定 API,现在先走experimental命名空间。
11.8.7.2 session_group.py(471 行)的函数与行号对照
| 成员 | 行号 | 职责 |
|---|---|---|
ServerParameters TypeAlias | 46 | Stdio | Sse | StreamableHttp 三选一 |
_ComponentNames dataclass | ~75 | 反向索引:某 session 注册了哪些 tool/resource/prompt |
_session_exit_stacks: dict[ClientSession, AsyncExitStack] | 103 | 每会话一个 stack(第二级) |
__init__ | 107-125 | 可选外部 exit_stack + component_name_hook |
__aenter__ | 127-131 | 纯返回 self,真工作在 connect_to_server |
__aexit__ | 133-148 | 并发关 stack(anyio task group) |
disconnect_from_server | 169-196 | 单 session 热拔 |
connect_to_server | 184-191 | 新建会话并聚合 |
_establish_session | 193-243 | 按 ServerParameters 类型分发到对应 transport |
_aggregate_components | 245-305 | 临时字典 + 冲突检测 + 原子合并 |
_aggregate_components 长达 60 行——它把”收 tools / 收 resources / 收 prompts / 检测冲突 / 合并 / 更新反向索引”六个阶段串在一起。一半的代码量是为了原子性——临时字典不是写着好看的,是因为任一阶段抛异常都要能让 Group 回到聚合前的状态。
11.8.7.3 client/ 目录总行数分布
| 文件 | 行 | 说明 |
|---|---|---|
session.py | 536 | 主文件(比本书前文给的 480 稍大,说明 2026 年又加了几十行 experimental 代码) |
session_group.py | 471 | 比前文给的 410 多 60 行 |
client.py | 308 | 高层 Client wrapper |
streamable_http.py | 671 | 最大的 transport——SSE 模式自适应 + Last-Event-ID 重连 |
stdio.py | 356 | Unix/Windows 分支 + 三段 shutdown |
auth/oauth2.py | 646 | 单文件最大 |
auth/utils.py | 339 | PKCE / metadata 发现工具 |
auth/extensions/client_credentials.py | 485 | client_credentials grant type 扩展 |
sse.py | 160 | 最老的 HTTP transport |
websocket.py | 85 | Python 独有全双工 |
_memory.py | 78 | InMemoryTransport(测试和 Client(server) 直连用) |
_transport.py | 21 | 抽象基类 |
context.py | 16 | client-side RequestContext 薄封装 |
三条可以拿来和 TS SDK 互校的结论——
- Python
auth/总计 1470 行(646+339+485);TS SDK 的auth.ts单文件就 1745 行(本书第 9 章 §9.11 实测)。OAuth 2.1 完整实现 1500-1800 行是行业基线——Python 用 3 个文件拆分、TS 挤在一个文件里,总量基本一致。 streamable_http.py671 行是最大 transport——大约是sse.py(160)的 4 倍多、stdio.py(356)的 1.9 倍。复杂度主要来自三件事:(a)响应体 JSON vs SSE 自适应;(b)Last-Event-ID断线续传;(c)DELETE主动销毁 session。本书第 13 章会把这三件事的工程细节展开讲。_memory.py才 78 行——它是Client(server)构造器里”直接传 Server 实例”那一路用的。对应 TS SDK 里的InMemoryTransport——TS 版本用来写单元测试很方便,Python 版本则进一步被Client吸收成”零配置嵌入模式”。
11.8.8 Transport 默认值与超时表(实测源码)
transport 层散落着许多默认值,这些值一旦误解会直接导致生产故障。这里把最重要的 11 个默认值做成账本,全部基于 src/mcp/client/ 的现行代码。
| 参数 | 值 | 出处 | 触发条件 |
|---|---|---|---|
stdio_client 内存流 buffer size | 0 | stdio.py:109-111 | 每次 send 都必须有 receiver 就绪,天然背压,慢消费者会拖慢生产者 |
PROCESS_TERMINATION_TIMEOUT | 2.0 秒 | stdio.py:9 | 关 stdin 后等子进程自行退出的窗口;超时后升级到 terminate() |
stdio terminate() 超时 | 5 秒 | stdio.py shutdown 段 | terminate() 后再给子进程 5 秒反应;超时升级到 kill() |
stdio_client(errlog=...) 默认 | sys.stderr | stdio.py:105-106 | 子进程 stderr 直接冒到父进程 stderr——调试方便,生产里可能要重定向到日志文件 |
stdio 启动目录 cwd 默认 | None(继承父进程) | StdioServerParameters 定义 | 生产里最好显式设——避免把 server 绑到 Agent 的工作目录 |
stdio 环境变量 env 默认 | get_default_environment() 白名单 | stdio.py | 不是继承全部 env——只转发 PATH / HOME / USER 等安全白名单,防止 ANTHROPIC_API_KEY 等泄漏给子进程 |
streamable_http_client.terminate_on_close | True | streamable_http.py | 退出上下文时发 DELETE /mcp——生产里如果 server 无状态可设 False 避免 unnecessary 请求 |
MCP_SESSION_ID header | mcp-session-id | streamable_http.py | 小写——注意反代不要把它改成 Mcp-Session-Id(HTTP header 大小写不敏感但代码里是字面量) |
| SSE 响应 content-type 判别 | text/event-stream | streamable_http.py _handle_sse_response | 非此 content-type 就走 JSON 分支——反代给 SSE 响应压了 gzip 导致 content-type 变化是常见踩坑 |
ClientSession.read_timeout_seconds | None(无超时) | session.py:__init__ | 不设超时的 send_request 会永久挂起——生产必须设,比如 60s |
OAuth2Auth.auth_flow 401 → refresh | 1 次 | auth/oauth2.py | 401 自动刷 token 重试一次;再 401 就抛——避免无限循环 |
11.8.8.1 get_default_environment() 白名单为什么重要
stdio.py 不把父进程全部 os.environ 塞给子进程、而是默认白名单过滤——macOS 下白名单包含 HOME、LOGNAME、PATH、SHELL、TERM、USER,其他一律丢弃。
这是一次有意识的安全设计——子进程是第三方 MCP server、可能是不信任代码。如果把 ANTHROPIC_API_KEY / OPENAI_API_KEY / AWS_ACCESS_KEY_ID 等全部透传过去、server 一行 os.environ.get("ANTHROPIC_API_KEY") 就拿走你的密钥。TS SDK 的 StdioClientTransport 也做了同样的过滤(本书第 9 章 §9.10 讨论过)——这是两个 SDK 难得完全一致的安全默认。
需要给子 server 传敏感凭据时,显式写 env={"MY_TOKEN": "xxx"}——明确同意而不是默认透传。
11.8.8.2 read_timeout_seconds 不设值就是永久挂起
ClientSession 的 read_timeout_seconds 默认 None 这件事每个生产项目都会踩一次——
- 本地开发测试:server 一切正常、call_tool 毫秒级返回、没人注意超时
- 部署到生产:某个 server 某次调用慢了或死锁、
await session.call_tool(...)永远不返回 - Agent 主循环卡死、整个 user request 挂在 HTTP gateway 上
对策——
- 初始化时显式传
ClientSession(..., read_timeout_seconds=60)——全局兜底 - 单次调用临时放宽/收紧:
session.call_tool(name, args, read_timeout_seconds=5)——快工具严查、慢工具放宽 - 不要写
read_timeout_seconds=None这种”显式永久等”——要么是 bug、要么加注释说明为什么刻意这么做
和本书第 4 章 §4.6 的”客户端侧超时分层”相互印证——超时不是一个值、是一组值(transport 级 / session 级 / 单次调用级)逐层覆盖。
11.8.9 跨章校对:Python Client 在 MCP 整本书里的位置
本章内容不是孤立的。Python Client 的每一块实现都能在本书其他章找到对应的协议规范或服务端镜像。下面这张表把五对互查点列出来——读者发现本章某段描述和其他章有出入时、应该以最新的章为准、并回头修旧章。
| 主题 | 本章位置 | 协议规范章 | Server 镜像 / 兄弟 SDK |
|---|---|---|---|
initialize 能力协商 | §11.2.1 | 第 4 章 §4.3 initialize 握手 | 第 10 章 §10.4 Python Server 端能力声明 |
tools/call + output schema 校验 | §11.2.2 _validate_tool_result | 第 5 章 §5.5 outputSchema 字段含义 | 第 10 章 §10.6 Python Server 声明 output schema |
resources/list + resources/subscribe | §11.2.2 签名表 | 第 6 章 §6.4 resource 订阅/变更通知 | 第 10 章 §10.7 Server 广播 resources/list_changed |
CreateMessageRequest(sampling 反向调用) | §11.2.3 _received_request 分发 | 第 17 章 §17.2 sampling 协议 | 第 9 章 §9.7 TS Client 对称实现 |
| stdio transport graceful shutdown | §11.4.1 三段式关闭 | 第 12 章 §12.5 stdio 生命周期 | 第 9 章 §9.9 TS stdio transport(用 signal + exit code) |
Streamable HTTP mcp-session-id | §11.4.2 + §11.8.8 默认值表 | 第 13 章 §13.4 session 维护 | 第 10 章 §10.8 Python Server 生成 session ID |
| OAuth2 PKCE + TokenStorage | §11.5 | 第 15 章 §15.3 MCP OAuth 2.1 profile | 第 10 章 §10.11 Python Server OAuth metadata 发布 |
ClientSessionGroup 多服务器聚合 | §11.3 | — MCP 规范没有 group 概念,这是 SDK 层扩展 | TS SDK 不提供、需 Agent 框架自行实现 |
11.8.9.1 Client 与 Server 的”镜像关系”
每个 Client 方法都有一个 Server 端处理函数镜像——
session.call_tool("add", ...)↔ Server 端@server.tool()装饰的add()函数(见第 10 章 §10.5.3 的 FastMCP 装饰器链)session.subscribe_resource("file:///log")↔ Server 端resource_subscriptions: set[str]记账 +send_resource_updated()推送- 客户端
sampling_callback↔ Server 端ctx.session.create_message(...)发起的反向请求
这种对称性意味着——调试 Client bug 时、经常要同时开两端的日志对着看。本书第 10 章 §10.12 给出了”client_debug=true + server_debug=true” 双端日志模板,建议收藏。
11.8.9.2 和 TS SDK 的互查
本章 §11.6 已经列过 9 项差异表。以下三条是最容易混淆的点、读者在 Python 和 TS 之间切换时要留意——
async withvs 手动connect/close——Python 用户忘记会触发语法错误、TS 用户忘记会静默泄漏 socket 或子进程。本书第 9 章 §9.6 给出了 TS 的try/finally最佳实践。list_tools()的返回类型——Python 返回ListToolsResult(驼峰字段反序列化为 snake_case:next_cursor);TS 返回{ tools, nextCursor }(camelCase)。混用两端代码时的 JSON schema 别名要留心。- 分页参数位置——Python 放在 kwargs-only 的
params=PaginatedRequestParams(cursor=...);TS 直接第二参数传{ cursor: "..." }。TS 更紧凑、Python 更类型安全——没有哪个更好。
11.8.9.3 ClientSessionGroup 是”SDK 层创新”而非协议规范
特别要指出的是——ClientSessionGroup 不是 MCP 规范的一部分。规范只讲单 client 单 server 的协议;多服务器聚合是 Python SDK 提供的上层工具。这带来两个工程后果——
- 好:换了 SDK 版本、
ClientSessionGroupAPI 可能变、协议本身不会变——底层ClientSession始终稳定。 - 需要注意:其他语言的 SDK(TS / Go / Rust)不一定有对应抽象——跨语言 Agent 框架得自己实现聚合层。LangChain MCP adapter、LangGraph、crewai 各家都有自己的多 server 管理代码——它们解决的是同一个问题、但没有统一接口。
这也揭示了 MCP 生态下一步的可能走向——“多服务器聚合层”是否要进规范?目前看 2026 年上半年的 spec 讨论还没涵盖、仍留在 SDK 侧。如果未来真的进规范、本书这一节会第一时间更新。
11.8.10 _received_request 的 match/case 分发路径全图
session.py 的 _received_request 是 Client 端最密集的状态机——它接收服务器反向发来的所有请求,按类型路由到对应 callback。下面把所有可能的入站请求列齐,并标明每个分支的默认行为。
| 入站请求类型 | 协议名 | 默认回调名 | 未注册时行为 | 典型用途 |
|---|---|---|---|---|
CreateMessageRequest | sampling/createMessage | _default_sampling_callback | 返回 ErrorData(INVALID_REQUEST, "Sampling not supported") | 第 17 章的 sampling 协议——server 借 client 的 LLM |
ElicitRequest | elicitation/create | _default_elicitation_callback | 返回 ErrorData(..., "Elicitation not supported") | 第 18 章——server 向用户发起结构化提问 |
ListRootsRequest | roots/list | _default_list_roots_callback | 返回 ErrorData(..., "List roots not supported") | 第 18 章——server 请求当前工作空间列表 |
PingRequest | ping | 内置 EmptyResult() | 直接应答、不经 user 代码 | 第 4 章的心跳 |
11.8.10.1 四个”协议规定的入站请求”
MCP 2025 spec 里 server → client 只有上面四种 request——其他都是 notification 或 response。这意味着 _received_request 的 match/case 只需要覆盖 4 个 case,理论上永远不会走到 case _: 兜底——如果走到了要么是 client SDK 版本滞后于 server、要么是 server 发了协议外消息。
实际代码里有 case _:——返回 INVALID_REQUEST 错误。这是正确的降级策略——未知请求不是让 Client 崩溃、而是礼貌回”不认识”、让 server 自己决定怎么处理。
11.8.10.2 默认回调的”三字错误”设计
仔细看四个 _default_*_callback 的错误消息——
"Sampling not supported"
"Elicitation not supported"
"List roots not supported"
都是 <功能名> not supported 三词格式——这不是偶然。这是 SDK 给 server 开发者的机器可读暗示:
- 错误码固定
INVALID_REQUEST(-32600) - 错误消息可以用字符串匹配判断是”能力未声明”还是”能力声明了但执行失败”
- server 侧可以写
if "not supported" in err.message: fallback()
更严格的做法是 spec 用专门 error code(比如 -32004 “Capability Not Declared”)——但目前 spec 没规定、SDK 层约定俗成。本书第 4 章 §4.8 讨论过 JSON-RPC 错误码在 MCP 里的扩展,这里是一个”约定先于规范”的典型。
11.8.10.3 with responder: 为什么是同步上下文管理器
读源码会注意到——match 里每个 case 都写 with responder:(同步 with、不是 async with)。这是因为 RequestResponder.__enter__ 只做同步的 CancelScope 设置、不需要异步资源:
__enter__— 创建anyio.CancelScope并 enter;如果后续 await 超时、这个 scope 可以整体取消正在执行的 callback__exit__— exit scope、把这个 responder 从 session 的”待处理请求”集合里移除、允许 GC
为什么这个设计很重要——假设 client 正在处理 server 发来的 sampling/createMessage(调 LLM 可能要 30 秒)、中途 server 想取消(发 notifications/cancelled)——session 可以 responder._cancel_scope.cancel()、让正在 await 的 LLM 调用立刻抛 CancelledError、回收资源。
如果不用 with responder: 包住、cancel scope 就建不起来、server 发的 cancel 通知 client 就吃不到——只能等 callback 自然完成。这是 MCP 双向取消协议在 client 侧的实现基础(第 4 章 §4.9 讨论过 cancellation 的 wire 格式、此处是 callback 侧如何响应)。
11.8.11 ClientSession 的消息处理循环(_receive_loop 与 _received_notification)
前面讲了”发请求” call_tool 和”收请求” _received_request——但 MCP 还有第三种消息:server 发来的 notification(通知、无需应答)。处理入口是 _received_notification。
11.8.11.1 客户端关心的 notification 类型
| Server → Client notification | 含义 | 默认 callback |
|---|---|---|
notifications/message | 结构化日志 | _default_logging_callback(no-op) |
notifications/resources/updated | 某 resource 内容变了 | 传给 user message_handler |
notifications/resources/list_changed | 资源列表变更 | 同上 |
notifications/tools/list_changed | 工具列表变更 | 同上 |
notifications/prompts/list_changed | 提示词列表变更 | 同上 |
notifications/progress | 长耗时操作进度 | 路由到对应 progress_callback |
notifications/cancelled | server 主动取消 | 取消对应请求的 CancelScope |
11.8.11.2 message_handler 的兜底地位
注意——四个 *_list_changed 通知没有专门 callback。它们默认都路由到 message_handler(ClientSession.__init__ 的参数)——一个”什么都收”的兜底函数。
这个设计反映了 SDK 的取舍——high-churn 的通知类型用一个通用 handler 处理、避免给每种 list_changed 都单独配 callback 造成 API 表面爆炸。缺点是—— user code 要写 isinstance(msg, ResourceListChangedNotification) 之类的 dispatch。
推荐实践——
async def handler(msg):
match msg:
case types.ResourceListChangedNotification():
await refresh_resource_cache()
case types.ToolListChangedNotification():
await refresh_tool_cache() # 会顺带重算 _tool_output_schemas
case types.LoggingMessageNotification(params=p):
logger.log(p.level, p.data)
11.8.11.3 _tool_output_schemas 的失效时机
工具列表变更时、本章 §11.2.2 讲的 schema 缓存必须失效——否则 call_tool 会拿旧 schema 校验新 tool 的返回、误报 validation 错误。
查 session.py 源码、SDK 在 list_tools() 返回时无条件覆盖整个 _tool_output_schemas 字典——这是隐式失效机制。但如果 user 收到 tools/list_changed 不主动调 list_tools()、旧 schema 会一直缓存。
对策——在 message_handler 里看到 ToolListChangedNotification 立刻调 session.list_tools()——既更新了 UI / Agent 端能看到的工具列表、又刷新了 schema 缓存。源码里这是个”二合一”操作、不要分开想。
11.8.12 OAuth2 流程的五个关键决策点
§11.5 给出了 PKCEParameters / TokenStorage / OAuth2Auth 三个构件——这里把它们串起来、看一次完整授权的五个决策点。这些决策点决定了生产环境下的使用体验、也是和 TS SDK 对照时最容易混淆的地方。
11.8.12.1 决策一:token 从哪里来(TokenStorage 的读取顺序)
OAuth2Auth.auth_flow 每次进入先问 self._tokens——内存缓存。内存缓存从哪来?__aenter__ 时从 TokenStorage.get_tokens() 异步读一次、整个 session 生命周期复用。
这意味着——
- 如果 token 在外部被刷新(比如另一个 client 实例更新了 Keychain)、当前 session 不会感知——它只读一次。
- session 内部自己刷的 token 会调
TokenStorage.set_tokens(...)回写——读一次、写多次的模型。 - 对多 client 竞争的场景、
TokenStorage实现应该做带版本号的乐观锁或每次都get_tokens()重读——SDK 默认不这么做、生产需要自己扩展。
11.8.12.2 决策二:401 自动刷新只重试一次
auth_flow 收到 401 会走 _refresh_flow 刷新一次、再拿新 token 重试原请求。第二次还 401 就抛异常——不是无限循环。
这是故意限制——如果 refresh_token 也失效(比如过期或被 revoke)、再刷也是 401、死循环只会堆请求。SDK 选择”快速失败”、让 user code 决定是重新走完整授权还是放弃。
11.8.12.3 决策三:PKCE 的 code_verifier 长度用上限
PKCEParameters.generate() 用 range(128) 生成 code_verifier——直接取 RFC 7636 允许的最大长度。
最短只要 43 字符就合规——为什么用 128?安全性和兼容性的保守取值——128 字符的 verifier 约等于 760 bit 熵(字符集 [A-Za-z0-9-._~] 共 66 个字符、log2(66^128) ≈ 774 bit)。远超 AES-256 的 256 bit——对 authorization code 截获后爆破 verifier 的攻击者来说、几乎是不可能穷举。
代价是 URL 里 code_challenge 字段多 85 字符——不影响任何实际使用。
11.8.12.4 决策四:authorization code 交换用 POST form-urlencoded
实际查 OAuth 2.1 spec 和 auth/oauth2.py 实现——token endpoint 请求 body 是 application/x-www-form-urlencoded、不是 JSON。这是 OAuth 社区的历史遗留——2012 年 RFC 6749 定的、现在改不动了。
新手常犯的错——把 grant_type / code / code_verifier 塞进 JSON body 发给 token endpoint——所有规范 OAuth server 都返回 400 invalid_request。httpx 的 data=... 参数默认就是 form-urlencoded、SDK 用的也是这个——把几乎所有 server 的脾气都照顾到。
11.8.12.5 决策五:Dynamic Client Registration 的 fallback
MCP OAuth profile(第 15 章 §15.3)要求 server 必须支持 RFC 7591 Dynamic Client Registration——client 不用人工去 server 后台申请 client_id、调用 /register 就能拿到。
auth/utils.py 里实现了这一步——但带兜底:如果 server 的 .well-known/oauth-authorization-server 没声明 registration_endpoint、SDK 会回退到”假设 client_id 已预先配置”、从 TokenStorage.get_client_info() 读。
这是为了兼容老 OAuth server——不是所有 server 都实现 RFC 7591。SDK 的策略是——能自动就自动、不能自动就走静态配置——不把 user 逼到死角。
11.8.13 一个容易被忽略的边界:_receive_loop 的异常传播
最后补一个源码级细节——BaseSession._receive_loop(父类、不在 client.py 里但客户端用的就是它)接到来自 read_stream 的 Exception 对象时——不会抛出,而是转发给 message_handler。
回看 ClientSession.__init__ 的签名——read_stream: ReadStream[SessionMessage | Exception]——流里可以流 Exception。这是 anyio 的惯用法——transport 层(stdio 子进程死了 / HTTP 连接断了 / JSON 解析失败)把 exception 包成 stream 元素、而不是抛 exception 弄死 receive loop。
这对 user 的影响——
- 写
message_handler时记得判断if isinstance(msg, Exception): handle_err(msg); return - 不这么判断的话、transport 出错是静默的——stream 可能已空、但 user 不知道为什么 session 突然不工作了
- 推荐每个 handler 最顶上 4 行模板:
async def handler(msg):
if isinstance(msg, Exception):
logger.error("Transport error: %r", msg)
await trigger_reconnect()
return
# ... 正常处理
这是 Python SDK 和 TS SDK 的第 10 个隐藏差异——TS SDK 的 transport 用 event emitter、错误走 transport.onerror(err) callback;Python 用 stream 元素、错误和正常消息走同一条通道。哪种风格更好没有定论——Python 的好处是 error 处理和消息处理都走 match/case、代码路径唯一;TS 的好处是 user 不写 error handling 时错误起码会冒出来、不会静默。
11.9 本章小结
本章深入分析了 MCP Python SDK 的客户端实现。
核心要点
ClientSession作为单服务器会话的核心抽象,继承自BaseSession并提供了完整的 MCP 协议操作方法ClientSessionGroup在此基础上实现了多服务器管理与组件聚合- Python SDK 充分利用了
anyio的结构化并发、async with上下文管理器、Pydantic类型校验等语言特性 - 传输层的四种实现(stdio、SSE、Streamable HTTP、WebSocket)通过统一的读写流抽象与上下文管理器模式,实现了对应用层的完全透明
- OAuth2 认证作为可插拔的 httpx 中间件,为 HTTP 类传输提供了标准化的安全方案
记住这三个设计范式
- async with 贯穿一切——资源获取与释放绑定在语法结构上
- Protocol + TypeAdapter——结构化子类型 + 运行时校验的组合拳
- 聚合 + 命名空间 + 反向索引——
ClientSessionGroup的设计范式可以迁移到其他需要”聚合多个同类服务”的场景
下一章我们将进入传输层的深度实现——第 12 章详解 stdio 传输层,看子进程通信的精细工程。
进入第 12 章之前:带着这三个问题去读
读完本章应该能在脑子里复现 ClientSession 从构造到 async with 退出的完整路径。第 12 章会把视角从会话层进一步下沉到字节层——以下三个问题可以作为阅读指针:
anyio.create_memory_object_stream(0)的 0 容量意味着读写必须同步握手——stdio transport 是怎么把这种严格背压和子进程的 stdout 缓冲区调和的?- Windows 下
start_new_session=True不可用——SDK 用 Job Object 替代实现进程树整体终止、这个技术细节本章只提了名字、第 12 章会展开源码。 - 三段式 shutdown 的 2 秒 / 5 秒两个超时是怎么定出来的?比 TS SDK 激进还是保守?第 12 章给出对照。