MCP 协议设计与实现

第11章 Python Client 实现剖析

作者 杨艺韬 · 12,150 字

第11章 Python Client 实现剖析

前面几章我们分析了 TypeScript SDK 的 Client 实现,对 MCP 客户端的职责有了清晰认知。本章我们将目光转向 Python SDK 的客户端实现。

Python SDK 在设计哲学上与 TypeScript SDK 保持一致——都围绕”会话”这一核心抽象展开,但在具体实现上却因语言特性而呈现出显著差异。Python SDK 选择了 anyio 作为异步运行时抽象层,用 Pydantic 取代 Zod 进行类型校验,用 async with 上下文管理器替代手动的生命周期管理。这些选择不是偶然的,它们深刻影响了客户端的 API 形态和内部架构。

本章要点

  • ClientSession / ClientSessionGroup 两级抽象——单服务器会话 + 多服务器聚合
  • anyio + async with——结构化并发与资源生命周期的最佳实践
  • 注册即声明——回调函数注册自动推断 Client capability
  • 工具结果校验——Python SDK 独有,基于 JSON Schema 的运行时验证
  • 三种传输层:stdio / SSE / Streamable HTTP 的统一流抽象
  • OAuth2 认证:PKCE + TokenStorage + httpx 中间件
  • 与 TS SDK 的 八个关键差异

11.1 客户端架构总览

Python MCP Client 的源码位于 src/mcp/client/ 目录下,核心文件包括:

  • session.pyClientSession 类,单服务器会话的核心抽象
  • session_group.pyClientSessionGroup 类,多服务器聚合管理
  • stdio.py — stdio 传输层实现
  • sse.py — SSE 传输层实现
  • streamable_http.py — Streamable HTTP 传输层实现
  • websocket.py — WebSocket 传输层实现(Python 独有)
  • auth/oauth2.py — OAuth2 客户端认证

下面这张架构图展示了各模块之间的关系:

graph TB
    subgraph 应用层
        APP[应用代码]
    end

    subgraph 会话层
        CSG[ClientSessionGroup<br/>多服务器聚合]
        CS[ClientSession<br/>单服务器会话]
    end

    subgraph 基础设施层
        BS[BaseSession<br/>请求/响应路由]
    end

    subgraph 传输层
        STDIO[stdio_client<br/>子进程通信]
        SSE[sse_client<br/>SSE 长连接]
        SH[streamable_http_client<br/>HTTP 流式]
        WS[websocket_client<br/>全双工]
    end

    subgraph 认证层
        OAUTH[OAuth2Auth<br/>PKCE + 自动刷新]
    end

    APP --> CSG
    APP --> CS
    CSG -->|管理多个| CS
    CS -->|继承| BS
    CS --> STDIO
    CS --> SSE
    CS --> SH
    CS --> WS
    SH -.->|可选| OAUTH
    SSE -.->|可选| OAUTH

分层设计的三个核心选择

Python SDK 的分层反映了三个关键的设计哲学:

  1. 传输层完全抽象化——上层无需感知底层用的是 stdio 还是 HTTP,只看到 (read_stream, write_stream)
  2. 会话分级——单会话 (ClientSession) 管协议,多会话聚合 (ClientSessionGroup) 管应用层编排
  3. 资源管理结构化——从 AsyncExitStackasync with,Python 把”不泄漏资源”变成语言特性

11.2 ClientSession:单服务器会话

ClientSession 是 Python 客户端的核心类,它继承自 BaseSession,负责与单个 MCP Server 的全部通信。我们先看其类型签名:

class ClientSession(
    BaseSession[
        types.ClientRequest,        # 发送的请求类型
        types.ClientNotification,   # 发送的通知类型
        types.ClientResult,         # 发送的结果类型
        types.ServerRequest,        # 接收的请求类型
        types.ServerNotification,   # 接收的通知类型
    ]
):

这个五元泛型参数看起来复杂,但逻辑清晰:前三个约束”我发出去的消息”,后两个约束”我收到的消息”BaseSession 利用这些泛型参数在编译期保证消息类型的正确性。

11.2.1 构造与初始化

ClientSession 的构造函数接受读写流和一系列回调函数:

def __init__(
    self,
    read_stream: ReadStream[SessionMessage | Exception],
    write_stream: WriteStream[SessionMessage],
    read_timeout_seconds: float | None = None,
    sampling_callback: SamplingFnT | None = None,
    elicitation_callback: ElicitationFnT | None = None,
    list_roots_callback: ListRootsFnT | None = None,
    logging_callback: LoggingFnT | None = None,
    message_handler: MessageHandlerFnT | None = None,
    client_info: types.Implementation | None = None,
):

Protocol 定义回调类型

这些回调函数的设计值得关注。Python SDK 使用 Protocol 类来定义回调类型,这是 Python 式的结构化子类型(structural subtyping),等价于 TypeScript 的接口但更灵活:

class SamplingFnT(Protocol):
    async def __call__(
        self,
        context: RequestContext[ClientSession],
        params: types.CreateMessageRequestParams,
    ) -> types.CreateMessageResult | types.ErrorData: ...

Protocol 的好处:

  • 无需继承,任何形状匹配的函数/类都符合协议
  • 便于用普通函数作为回调
  • 支持类型检查器做静态分析

默认回调的”优雅失败”

每个回调都有默认实现。例如,当服务器发起 sampling 请求但客户端未注册回调时,默认返回错误:

async def _default_sampling_callback(
    context: RequestContext[ClientSession],
    params: types.CreateMessageRequestParams,
) -> types.CreateMessageResult | types.ErrorData:
    return types.ErrorData(
        code=types.INVALID_REQUEST,
        message="Sampling not supported",
    )

这种设计避免了未注册回调时的崩溃——明确返回”不支持”比抛出 NotImplementedError 更符合协议规范。

注册即声明:能力推断

初始化握手通过 initialize() 方法完成。该方法会根据注册的回调函数自动推断客户端能力

async def initialize(self) -> types.InitializeResult:
    capabilities = types.ClientCapabilities(
        experimental={},
        roots=(
            types.RootsCapability(listChanged=True)
            if self._list_roots_callback is not _default_list_roots_callback
            else None
        ),
        sampling=(
            types.SamplingCapability()
            if self._sampling_callback is not _default_sampling_callback
            else None
        ),
        elicitation=(
            types.ElicitationCapability()
            if self._elicitation_callback is not _default_elicitation_callback
            else None
        ),
    )
    # ... 发送 initialize 请求
  • 如果注册了 sampling_callback,就声明支持 sampling 能力
  • 如果注册了 list_roots_callback,就声明支持 roots 能力

这种”注册即声明”的设计避免了能力声明与实际实现不一致的问题。开发者只需要关心”我要不要支持这个功能”,不用单独维护 capability 声明。

11.2.2 协议操作方法

ClientSession 为 MCP 协议的每种操作提供了对应的异步方法:

方法协议操作返回类型
list_tools()tools/listListToolsResult
call_tool()tools/callCallToolResult
list_resources()resources/listListResourcesResult
read_resource()resources/readReadResourceResult
list_prompts()prompts/listListPromptsResult
get_prompt()prompts/getGetPromptResult
complete()completion/completeCompleteResult
send_ping()pingEmptyResult
subscribe_resource()resources/subscribeEmptyResult
unsubscribe_resource()resources/unsubscribeEmptyResult

call_tool 为例,它展示了 Python 客户端的一个独特特性——结构化内容校验

async def call_tool(
    self,
    name: str,
    arguments: dict[str, Any] | None = None,
    read_timeout_seconds: float | None = None,
    progress_callback: ProgressFnT | None = None,
) -> types.CallToolResult:
    result = await self.send_request(
        types.CallToolRequest(
            params=types.CallToolRequestParams(name=name, arguments=arguments),
        ),
        types.CallToolResult,
        request_read_timeout_seconds=read_timeout_seconds,
        progress_callback=progress_callback,
    )
    if not result.is_error:
        await self._validate_tool_result(name, result)
    return result

_validate_tool_result 会根据工具的 output_schema 对返回的 structured_content 进行 JSON Schema 校验。

Schema 缓存机制

工具的 output schema 在 list_tools() 调用时被缓存到 _tool_output_schemas 字典中,后续的 call_tool 会自动利用这个缓存。

async def _validate_tool_result(self, name: str, result: types.CallToolResult):
    # 1. 查缓存
    schema = self._tool_output_schemas.get(name)

    # 2. 缓存缺失时刷新
    if schema is None and name not in self._tool_output_schemas:
        # 只重试一次,避免无限循环
        await self._refresh_tool_cache()
        schema = self._tool_output_schemas.get(name)

    # 3. 仍无 schema 说明工具不声明结构化输出,跳过
    if schema is None:
        return

    # 4. 校验
    try:
        jsonschema.validate(result.structured_content, schema)
    except jsonschema.ValidationError as e:
        raise ToolOutputValidationError(
            f"Tool {name} returned invalid output: {e.message}"
        )

如果缓存中没有该工具的 schema(比如工具是后来动态添加的),会自动触发一次 list_tools() 刷新缓存。这种”lazy + cache”的组合既保证了正确性又避免了每次都查询。

11.2.3 服务器请求处理

MCP 是双向协议,服务器也会向客户端发起请求。_received_request 方法通过 Python 的 match/case 模式匹配来分发这些请求:

async def _received_request(self, responder):
    ctx = RequestContext[ClientSession](
        request_id=responder.request_id,
        meta=responder.request_meta,
        session=self,
    )
    match responder.request:
        case types.CreateMessageRequest(params=params):
            with responder:
                response = await self._sampling_callback(ctx, params)
                await responder.respond(response)
        case types.ElicitRequest(params=params):
            with responder:
                response = await self._elicitation_callback(ctx, params)
                await responder.respond(response)
        case types.ListRootsRequest():
            with responder:
                response = await self._list_roots_callback(ctx)
                await responder.respond(response)
        case types.PingRequest():
            with responder:
                await responder.respond(types.EmptyResult())

注意 with responder 上下文管理器的使用。RequestResponder 实现了 __enter__/__exit__,在进入时设置取消作用域CancelScope),在退出时通知会话该请求已处理完毕。这确保了即使回调函数抛出异常,请求的生命周期也能被正确管理。

match/case vs isinstance 链

Python 3.10+ 的 match/case 对 Pydantic 类型有特别优化——它不只是 isinstance 检查,还支持模式解构:

# match/case 风格:类型匹配 + 字段解构一气呵成
case types.CreateMessageRequest(params=params):
    response = await self._sampling_callback(ctx, params)

# isinstance 风格:冗余
if isinstance(req, types.CreateMessageRequest):
    params = req.params
    response = await self._sampling_callback(ctx, params)

11.3 ClientSessionGroup:多服务器聚合

真实的 AI Agent 应用往往需要同时连接多个 MCP ServerClientSessionGroup 正是为此设计的——它管理多个 ClientSession,并将所有服务器的 tools、resources、prompts 聚合到统一的命名空间中。

graph LR
    subgraph ClientSessionGroup
        TOOLS[聚合 Tools 字典]
        RES[聚合 Resources 字典]
        PROMPTS[聚合 Prompts 字典]
    end

    subgraph ServerA[Server A - 文件系统]
        SA[ClientSession A]
        TA[read_file, write_file]
    end

    subgraph ServerB[Server B - 数据库]
        SB[ClientSession B]
        TB[query, execute]
    end

    subgraph ServerC[Server C - Web 搜索]
        SC[ClientSession C]
        TC[search, fetch]
    end

    SA --> TA
    SB --> TB
    SC --> TC

    TOOLS -->|路由| SA
    TOOLS -->|路由| SB
    TOOLS -->|路由| SC

11.3.1 连接管理与生命周期

ClientSessionGroup 实现了 async with 协议,用 AsyncExitStack 管理所有子会话的生命周期:

class ClientSessionGroup:
    def __init__(
        self,
        exit_stack: contextlib.AsyncExitStack | None = None,
        component_name_hook: _ComponentNameHook | None = None,
    ):
        self._tools: dict[str, types.Tool] = {}
        self._resources: dict[str, types.Resource] = {}
        self._prompts: dict[str, types.Prompt] = {}
        self._sessions: dict[ClientSession, _ComponentNames] = {}
        self._tool_to_session: dict[str, ClientSession] = {}

        if exit_stack is None:
            self._exit_stack = contextlib.AsyncExitStack()
            self._owns_exit_stack = True
        else:
            self._exit_stack = exit_stack
            self._owns_exit_stack = False

这里有一个精妙的设计:exit_stack 可以外部传入,也可以内部创建。

场景exit_stackowns生命周期
独立使用内部创建Trueasync with 结束时关闭
嵌入到更大的资源管理外部传入False由调用者控制

这使得 ClientSessionGroup 可以嵌入到更大的资源管理体系中——比如一个 Agent 框架可能有自己的全局 exit_stack,所有资源都统一管理。

两级 exit_stack 架构

仔细看 ClientSessionGroup 的字段声明(mcp/client/session_group.py:119-120),实际上维护着两级 exit_stack

_exit_stack: contextlib.AsyncExitStack                               # 主 stack
_session_exit_stacks: dict[mcp.ClientSession, contextlib.AsyncExitStack]  # 每会话 stack

两级设计的理由

  • _exit_stack——管理 Group 整体的 entering/exiting。如果 Group owns 它、__aexit__aclose();不 own 就让外部处理。
  • _session_exit_stacks[session]——每个 ClientSession 有自己独立的 exit stack,记录它的底层连接资源(stdio 进程、HTTP 连接、SSE 流)。

这让”断开一个 session 但保留其他 session”成为可能——disconnect_from_server(session) 只关这一个 session 的 exit_stack、其他 session 继续运行。如果只有一个共享 stack、就得全部关或都不关。

并发 shutdown 而非串行

__aexit__ 的真实实现(session_group.py:156-171):

async def __aexit__(self, _exc_type, _exc_val, _exc_tb) -> bool | None:
    # Only close the main exit stack if we created it
    if self._owns_exit_stack:
        await self._exit_stack.aclose()

    # Concurrently close session stacks.
    async with anyio.create_task_group() as tg:
        for exit_stack in self._session_exit_stacks.values():
            tg.start_soon(exit_stack.aclose)

anyio.create_task_group() + tg.start_soon(exit_stack.aclose) 让所有 session 的关闭并发发生——不是 for: await aclose() 这样串行。对 N 个 server(每个关闭可能有 1-2 秒 TCP 或进程终止等待):

  • 串行关闭:总耗时 ≈ N × 2 秒
  • 并发关闭:总耗时 ≈ max(各自关闭时间) ≈ 2 秒

用户 async with 退出 Group 时等的秒数从 O(N) 降到 O(1)。对连了十几个 MCP server 的 Agent 而言、shutdown 从几十秒压到几秒。

disconnect_from_server 的 4 态检查

session_group.py:213-223disconnect_from_server 实现:

async def disconnect_from_server(self, session: mcp.ClientSession) -> None:
    session_known_for_components = session in self._sessions
    session_known_for_stack = session in self._session_exit_stacks

    if not session_known_for_components and not session_known_for_stack:
        raise MCPError(
            code=types.INVALID_PARAMS,
            message="Provided session is not managed or already disconnected.",
        )
    # ... 分别处理两个维度

两个独立维度_sessions 记录”组件归属”、_session_exit_stacks 记录”资源所有权”。两者正常情况下同步更新、但边缘情况可能脱同步(比如 connect 过程中异常、一个 dict 已插入另一个还没)。

代码逻辑明确说”只要还在任一维度里就不是完全未知”——只有两者都没时才抛 INVALID_PARAMS。这让 disconnect 的幂等性更强——用户可以安全地重复调 disconnect_from_server(same_session),第二次会抛合适的错误而不是静默把已清理的字典再搅一遍导致 KeyError。

11.3.2 命名冲突:component_name_hook 的作用

同一个工具名(比如 search)被两个不同服务器暴露时会怎样?默认情况下后注册的会覆盖前者——这是 Python dict 的自然行为、但对 Agent 是灾难。

component_name_hook 参数(session_group.py:122-126)给用户一个主动命名空间化的钩子:

# Optional fn consuming (component_name, server_info) for custom names.
# This is to provide a means to mitigate naming conflicts across servers.
# Example: (tool_name, server_info) => "{result.server_info.name}.{tool_name}"
_ComponentNameHook: TypeAlias = Callable[[str, types.Implementation], str]

源码注释直接给了典型用法

name_fn = lambda name, server_info: f"{server_info.name}_{name}"
async with ClientSessionGroup(component_name_hook=name_fn) as group:
    for params in server_params:
        await group.connect_to_server(params)

这样 Server A 的 search 注册为 "ServerA_search"、Server B 的 search 注册为 "ServerB_search"——两者共存。

注意 call_tool 路由是按”外部名”走的(line 203):

async def call_tool(self, name, arguments=...):
    session = self._tool_to_session[name]          # 按外部名查 session
    session_tool_name = self.tools[name].name       # 取出 session 里的原名
    return await session.call_tool(session_tool_name, ...)

_tool_to_session[name] 用外部(可能已重命名的)名做 lookup、self.tools[name].name 取出 session 内部的原名——因为 session 只认自己当初注册的 search、不认 ServerA_search。这种”外部统一命名 + 内部原名转发”的设计让 rename hook 只影响 Group 层、不扰动底层 session 协议。调试时注意两层之间的映射关系。

使用示例

连接新服务器通过 connect_to_server 完成:

async with ClientSessionGroup() as group:
    # 连接第一个服务器
    session_a = await group.connect_to_server(
        StdioServerParameters(command="python", args=["-m", "my_server"])
    )

    # 连接第二个服务器
    session_b = await group.connect_to_server(
        StreamableHttpParameters(url="http://localhost:8080/mcp")
    )

    # 连接第三个服务器
    session_c = await group.connect_to_server(
        SseServerParameters(url="http://legacy.example.com/sse")
    )

    # 此时 group.tools 包含三个服务器的所有工具
    print(f"Available tools: {list(group.tools.keys())}")

    # 调用工具(自动路由到正确的服务器)
    result = await group.call_tool("read_file", {"path": "/tmp/test.txt"})

传输层自动选择

_establish_session 方法展示了传输层的选择逻辑:

async def _establish_session(
    self,
    params: ServerParameters,
) -> tuple[ReadStream, WriteStream, contextlib.AsyncExitStack]:
    session_stack = contextlib.AsyncExitStack()

    match params:
        case StdioServerParameters():
            read, write = await session_stack.enter_async_context(
                stdio_client(params)
            )
        case StreamableHttpParameters():
            read, write, _ = await session_stack.enter_async_context(
                streamablehttp_client(url=str(params.url), headers=params.headers)
            )
        case SseServerParameters():
            read, write = await session_stack.enter_async_context(
                sse_client(url=str(params.url), headers=params.headers)
            )

    return read, write, session_stack

每个会话都有独立的 AsyncExitStack,确保单个服务器断连不会影响其他会话。

11.3.2 命名冲突处理

当多个服务器提供同名工具时,ClientSessionGroup 默认会抛出错误。但它提供了 component_name_hook 机制来自定义命名策略:

# 给每个工具加上服务器名前缀
name_fn = lambda name, server_info: f"{server_info.name}_{name}"

async with ClientSessionGroup(component_name_hook=name_fn) as group:
    await group.connect_to_server(server_a_params)  # tools: "serverA_read"
    await group.connect_to_server(server_b_params)  # tools: "serverB_read"

常见的命名策略:

  • {server}_{tool} — 服务器名前缀,最直观
  • {tool}@{server} — 装饰符风格
  • {tool} + 版本号后缀 — 保持名称简洁
  • 自定义映射表 — 精细控制

原子性保证

聚合过程使用临时字典来保证原子性——如果聚合过程中任何一步失败,已有的聚合状态不会被污染:

async def _aggregate_components(
    self,
    server_info: types.Implementation,
    session: ClientSession,
):
    # 临时收集
    prompts_temp: dict[str, types.Prompt] = {}
    resources_temp: dict[str, types.Resource] = {}
    tools_temp: dict[str, types.Tool] = {}

    # 收集 tools
    for tool in (await session.list_tools()).tools:
        name = self._compute_name(tool.name, server_info)
        tools_temp[name] = tool

    # 收集 resources
    for resource in (await session.list_resources()).resources:
        ...

    # 收集 prompts
    for prompt in (await session.list_prompts()).prompts:
        ...

    # 检查冲突(在合并前)
    matching_tools = tools_temp.keys() & self._tools.keys()
    if matching_tools:
        raise MCPError(
            f"Tool name conflict: {matching_tools}. "
            f"Use component_name_hook to namespace."
        )

    # 原子性地合并(只有全部检查通过才提交)
    self._tools.update(tools_temp)
    self._resources.update(resources_temp)
    self._prompts.update(prompts_temp)

    # 维护反向索引
    for name in tools_temp:
        self._tool_to_session[name] = session

11.3.3 动态断连

disconnect_from_server 支持运行时动态移除某个服务器。它通过 _ComponentNames 反向索引快速定位该会话注册的所有组件,逐一清理:

async def disconnect_from_server(self, session: ClientSession) -> None:
    # 取出该会话注册的所有组件名
    component_names = self._sessions.pop(session)

    # 从聚合字典中删除
    for name in component_names.tools:
        del self._tools[name]
        del self._tool_to_session[name]
    for name in component_names.resources:
        del self._resources[name]
    for name in component_names.prompts:
        del self._prompts[name]

    # 关闭该会话的 exit_stack(不影响其他会话)
    session_stack = self._session_exit_stacks.pop(session)
    await session_stack.aclose()

这个设计支持热插拔——Agent 运行过程中可以动态加入或移除 MCP Server,无需重启。

11.4 传输层:四种连接方式

Python SDK 提供了四种传输层实现,它们都遵循相同的模式——作为异步上下文管理器,yield(read_stream, write_stream) 元组。

11.4.1 stdio 传输

stdio_client 通过 anyio.open_process 启动子进程,将子进程的 stdin/stdout 包装为 MCP 消息流:

@asynccontextmanager
async def stdio_client(server: StdioServerParameters):
    process = await anyio.open_process(
        [server.command, *server.args],
        env=server.env,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    # 创建流
    read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
    write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

    async def stdout_reader():
        buffer = ""
        async for chunk in TextReceiveStream(process.stdout):
            lines = (buffer + chunk).split("\n")
            buffer = lines[-1]
            for line in lines[:-1]:
                if line:
                    message = types.jsonrpc_message_adapter.validate_json(line)
                    await read_stream_writer.send(SessionMessage(message))

    async def stdin_writer():
        async for session_msg in write_stream_reader:
            json_str = session_msg.message.model_dump_json()
            await process.stdin.send((json_str + "\n").encode())

    async with anyio.create_task_group() as tg:
        tg.start_soon(stdout_reader)
        tg.start_soon(stdin_writer)

        try:
            yield read_stream, write_stream
        finally:
            await _graceful_shutdown(process)

Graceful Shutdown

关闭时遵循 MCP 规范的 graceful shutdown 序列:

async def _graceful_shutdown(process: Process) -> None:
    # 1. 关闭 stdin,让进程知道"没有更多输入"
    await process.stdin.aclose()

    # 2. 等待进程自行退出
    try:
        with anyio.fail_after(10):
            await process.wait()
        return
    except TimeoutError:
        pass

    # 3. 发送 SIGTERM
    process.terminate()
    try:
        with anyio.fail_after(5):
            await process.wait()
        return
    except TimeoutError:
        pass

    # 4. 最后手段:SIGKILL
    process.kill()
    await process.wait()

这种”三段式关闭”是 Unix 进程管理的标准实践——先温和,再严厉,最后暴力。

11.4.2 SSE 与 Streamable HTTP 传输

SSE 传输使用 httpx + httpx-sse 库建立长连接。Streamable HTTP 传输是 SSE 的演进版本,支持双向流式通信,并引入了 mcp-session-id 头来维护会话状态。两者都可以通过 OAuth2Auth 中间件进行认证。

11.4.3 WebSocket 传输

Python SDK 独有的 WebSocket 实现(详见第 14 章),提供真正的全双工通信。

11.4.4 传输参数模型

session_group.py 中定义了四种传输参数的 Pydantic 模型,统一为 ServerParameters 类型别名:

ServerParameters: TypeAlias = (
    StdioServerParameters | SseServerParameters | StreamableHttpParameters
)

这使得 connect_to_server 可以接受任意一种参数,内部通过 isinstance 分发到对应的传输层。

11.5 OAuth2 客户端认证

Python SDK 的 OAuth2 实现位于 client/auth/oauth2.py,支持 Authorization Code + PKCE 流程。

PKCE 参数生成

PKCEParameters 类封装了 code_verifier/code_challenge 的生成:

class PKCEParameters(BaseModel):
    code_verifier: str = Field(..., min_length=43, max_length=128)
    code_challenge: str = Field(..., min_length=43, max_length=128)

    @classmethod
    def generate(cls) -> "PKCEParameters":
        code_verifier = "".join(
            secrets.choice(string.ascii_letters + string.digits + "-._~")
            for _ in range(128)
        )
        digest = hashlib.sha256(code_verifier.encode()).digest()
        code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=")
        return cls(code_verifier=code_verifier, code_challenge=code_challenge)

TokenStorage 接口

认证流程通过 TokenStorage 协议实现 token 的持久化存储:

class TokenStorage(Protocol):
    async def get_tokens(self) -> OAuth2Token | None: ...
    async def set_tokens(self, tokens: OAuth2Token) -> None: ...
    async def get_client_info(self) -> OAuth2ClientInfo | None: ...
    async def set_client_info(self, info: OAuth2ClientInfo) -> None: ...

应用可以自定义存储后端:

  • 文件(开发环境)
  • 系统密钥链(Keychain/Credentials Manager)
  • 数据库(多用户场景)
  • 内存(测试)

httpx 中间件集成

OAuth2Auth 作为 httpx 的 Auth 中间件,自动在请求中注入 Bearer token 并处理 token 刷新:

class OAuth2Auth(httpx.Auth):
    def auth_flow(self, request: httpx.Request):
        # 1. 尝试用现有 token
        tokens = self._tokens
        if tokens and not tokens.is_expired:
            request.headers["Authorization"] = f"Bearer {tokens.access_token}"
            response = yield request

            # 2. 401 时尝试刷新
            if response.status_code == 401:
                yield from self._refresh_flow(request)
        else:
            # 3. 没有 token 或已过期
            yield from self._initial_auth_flow(request)

11.6 与 TypeScript Client 的核心差异

通过对比两个 SDK 的客户端实现,可以提炼出以下关键差异:

维度Python SDKTypeScript SDK
异步运行时anyio(兼容 asyncio/trio)原生 async/await
生命周期管理async with + AsyncExitStack手动 connect()/close()
类型校验Pydantic TypeAdapterZod schema
回调类型Protocol(结构化子类型)TypeScript 函数类型
消息分发match/case 模式匹配switch/case
多服务器内置 ClientSessionGroup需自行实现
工具结果校验内置 _validate_tool_result无内置校验
传输层切换isinstance + 上下文管理器Transport 接口
WebSocket✅ 支持❌ 不支持

最关键的三点差异

差异一:生命周期管理

Python 的 async with 将资源获取与释放绑定在语法结构上,不可能忘记关闭连接:

async with ClientSession(read, write) as session:
    await session.initialize()
    result = await session.call_tool(...)
# 退出 block 时自动清理,无论是否抛异常

TypeScript 需要开发者自觉调用 close(),或使用 try/finally

const client = new Client(...);
try {
    await client.connect(transport);
    const result = await client.callTool(...);
} finally {
    await client.close();  // 容易忘
}

AsyncExitStack 的嵌套使用更是 Python 独有的模式——ClientSessionGroup 用主 exit_stack 管理所有子会话的 exit_stack,形成资源管理的树状结构

差异二:多服务器聚合

ClientSessionGroup 是 Python SDK 独有的抽象。它不仅管理连接,还提供了:

  • 组件聚合(tools/resources/prompts 统一命名空间)
  • 命名冲突检测
  • 动态断连
  • 原子性保证

TypeScript SDK 没有对应的内置实现,开发者需要自行管理多个 Client 实例。这也是 MCP 框架层的机会——很多 Agent 框架(如 LangChain)在 TypeScript 侧重新实现了类似功能。

差异三:工具结果校验

Python 的 call_tool 在返回结果前会自动校验 structured_content 是否符合工具声明的 output_schema。这利用了 jsonschema 库进行运行时校验,为 Agent 应用提供了额外的安全保障。

# Python:自动校验
result = await session.call_tool("calc", {"a": 1, "b": 2})
# 如果 result.structured_content 不符合 schema,抛异常
// TypeScript:开发者自行校验
const result = await client.callTool("calc", { a: 1, b: 2 });
// 需要手动用 zod/ajv 校验 result.structuredContent

11.7 实战:构建一个多 MCP Server 的 Agent

下面展示一个真实场景——Agent 同时连接文件系统、数据库、Web 搜索三个 MCP Server:

import anyio
from mcp import ClientSessionGroup
from mcp.client.session_group import (
    StdioServerParameters,
    StreamableHttpParameters,
)

async def run_agent():
    # 命名策略:用服务器名前缀避免冲突
    name_fn = lambda name, info: f"{info.name}_{name}"

    async with ClientSessionGroup(component_name_hook=name_fn) as group:
        # 文件系统 MCP Server(本地 stdio)
        fs_session = await group.connect_to_server(
            StdioServerParameters(
                command="npx",
                args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
            )
        )

        # 数据库 MCP Server(本地 stdio)
        db_session = await group.connect_to_server(
            StdioServerParameters(
                command="npx",
                args=["-y", "@modelcontextprotocol/server-postgres", "postgres://..."],
            )
        )

        # Web 搜索 MCP Server(远程 HTTP)
        web_session = await group.connect_to_server(
            StreamableHttpParameters(
                url="https://search.example.com/mcp",
                headers={"Authorization": f"Bearer {API_KEY}"},
            )
        )

        print(f"Connected. Tools: {list(group.tools.keys())}")
        # 输出: ['filesystem_read_file', 'filesystem_write_file',
        #       'postgres_query', 'postgres_execute',
        #       'search_web', 'search_fetch']

        # 主 Agent 循环
        while True:
            user_input = input("You: ")
            if user_input == "exit":
                break

            # 让 LLM 决定调用哪个工具
            decision = await llm_decide_tool(user_input, group.tools)

            if decision.tool_name:
                result = await group.call_tool(
                    decision.tool_name,
                    decision.arguments,
                )
                print(f"Tool result: {result.content}")
            else:
                print(f"Bot: {decision.text}")

anyio.run(run_agent)

这个模式的威力:

  • 统一调用接口——不管工具在哪个 Server,都是 group.call_tool()
  • 动态扩展——可以运行时加入新 Server
  • 故障隔离——单个 Server 挂掉不影响其他

11.8 四个反模式

反模式一:忘记 async with

# ❌ 泄漏连接
session = ClientSession(read, write)
await session.initialize()
# ... 忘了关闭

# ✅ 使用 async with
async with ClientSession(read, write) as session:
    await session.initialize()
    # 自动关闭

反模式二:并发调用同一个 session 不加锁

MCP session 不是完全并发安全的——虽然发送是安全的,但响应需要配对。大量并发可能让请求 ID 混淆。

对策:用 anyio.Lock 保护关键操作,或限制每个 session 的并发度。

反模式三:硬编码 sampling 回调

# ❌ 回调里直接调 OpenAI,忽略用户的实际 LLM
async def sampling_cb(ctx, params):
    return await openai.chat.create(...)

# ✅ 根据配置路由到用户选择的 LLM
async def sampling_cb(ctx, params):
    provider = get_user_llm_provider()
    return await provider.complete(params)

反模式四:不处理 Server 重启

现象:MCP Server 重启后 session 失效,但 Agent 不知道,继续发请求全部超时。

对策

  • 实现心跳检测(send_ping()
  • 捕获 session 失效错误,自动重连
  • 使用 ClientSessionGroup 的断连通知机制

11.8.5 mcp/client/ 目录的真实结构

mcp-python-sdksrc/mcp/client/ 实测 2508 行——分两块:

文件 / 子目录角色
session.py480ClientSession 单服务器会话(章节主线)
session_group.py410ClientSessionGroup 多服务器聚合
client.py308Client 高层 wrapper(dataclass、章节没明确介绍)
streamable_http.py588Streamable HTTP transport(最复杂
stdio.py270stdio transport
sse.py160SSE transport
websocket.py85WebSocket transport
_memory.py78InMemoryTransport(仅供测试 / Client 直连 server 用)
_transport.py21Transport ABC base 接口
context.py16client-side request context
auth/oauth2.py646整个 client 最大文件——OAuth 2.1 完整实现
auth/utils.py339OAuth 工具(PKCE、metadata 发现等)
auth/extensions/client_credentials.py485客户端凭据 grant type 扩展

两个值得注意的尺寸事实——

1. auth/oauth2.py 646 行 + utils 339 + extensions 485 = 1470 行 OAuth 代码——比 session + session_group 加起来还多(480+410=890)。Python SDK 在 OAuth 上的投入和 TS SDK 一样夸张——和本书第 9 章 §9.11 提到的 TS SDK auth.ts 1745 行相互印证:OAuth 2.1 完整实现就是 1500-1700 行——任何 SDK 都跑不掉。

2. streamable_http.py 588 行 vs stdio.py 270 行 vs sse.py 160 行——Streamable HTTP 是最复杂的 transport。原因是它要支持:(a) JSON 和 SSE 两种响应模式自适应;(b) Last-Event-ID 重连重放;(c) DELETE 主动关 session;(d) 普通 POST + GET-for-stream 两种模式。三种功能叠加导致代码量是 stdio 的 2 倍多。

11.8.6 Client 高层 wrapper(client.py 308 行)

章节的 ClientSession 是底层 API、需要手动管理 transport——Client 是 dataclass-based 的高层 wrapper

@dataclass
class Client:
    server: Server | MCPServer | Transport | str  # ← 自动识别 4 种入参类型
    raise_exceptions: bool = False
    read_timeout_seconds: float | None = None
    sampling_callback: SamplingFnT | None = None
    list_roots_callback: ListRootsFnT | None = None
    elicitation_callback: ElicitationFnT | None = None

    async def __aenter__(self): ...
    async def __aexit__(self): ...

server 字段的多态识别——__post_init__ 里——

  • isinstance(server, Server | MCPServer) → 包成 InMemoryTransport测试 / 嵌入用
  • isinstance(server, str) → URL → streamable_http_client transport
  • isinstance(server, Transport) → 直接用

这种”四 in 一”构造——让最常见的两种用法(连 URL / 直接嵌 server 测试)一行代码搞定

# 测试嵌入
async with Client(my_mcp_server) as client:
    await client.call_tool("add", {"a": 1, "b": 2})

# 生产 HTTP
async with Client("https://api.example.com/mcp") as client: ...

# 自定义 transport
async with Client(my_stdio_transport) as client: ...

Client vs ClientSession 的关系——

  • ClientSession —— 协议层、必须自己拿到 read/write streams
  • Client —— 应用层、自动管理 transport + AsyncExitStack
  • 99% 用户应该用 Client、不用 ClientSession —— 章节正文偏向后者是因为讲底层架构、但生产代码默认 Client

对照 TS SDK——TS 没有等价的”四态构造”Client——TS 用户必须先 new Client()client.connect(transport)Python 的 dataclass + __post_init__ 让这种多态构造极简洁——和 §10.10.8 讨论过的 Tool.from_function 是同源思路(类型推断 + Pydantic/dataclass 让 Python SDK 比 TS SDK 更紧凑)。

11.8.7 核心类的源码账本(截至 python-sdk main 分支 2026-04-22)

前面把 ClientSessionClientSessionGroupClient、各 transport 和 OAuth 穿了一遍。这里把真实行数与函数签名账本形式再归拢一遍——便于读者对着源码调试时按图索骥,也便于和 TS SDK 跨章对照。所有数据取自 github.com/modelcontextprotocol/python-sdksrc/mcp/client/ 目录。

11.8.7.1 session.py(536 行)的 11 个公开方法签名

方法协议操作关键 kwargs返回
__init__7 个 callback + client_info + experimental_task_handlers新增
initialize()initializeInitializeResult
send_ping(*, meta=None)pingmetaEmptyResult
list_tools(*, params=None)tools/listPaginatedRequestParams分页游标ListToolsResult
call_tool(name, arguments, read_timeout_seconds, progress_callback, *, meta=None)tools/callprogress_callback 支持流式进度CallToolResult
list_resources(*, params=None)resources/list同上ListResourcesResult
read_resource(uri, *, meta=None)resources/readmetaReadResourceResult
subscribe_resource(uri, *, meta=None)resources/subscribemetaEmptyResult
unsubscribe_resource(uri, *, meta=None)resources/unsubscribemetaEmptyResult
list_prompts(*, params=None)prompts/list分页ListPromptsResult
get_prompt(name, arguments, *, meta=None)prompts/getarguments: dict[str, str]GetPromptResult
complete(ref, argument, context_arguments=None)completion/completecontext_arguments 是 2025 spec 才加的——用于多级补全场景CompleteResult
send_progress_notification(token, progress, total, message, *, meta=None)notifications/progressmessage 是 2025 新增None
_received_request(responder)入站分发match/case 派发 CreateMessageRequest / ElicitRequest / ListRootsRequest / PingRequest

三个容易被忽视的签名细节——

  1. call_toolprogress_callback 不是 kwargs-only——位置参数可以传,这是为了和旧版兼容。但新代码推荐用 kwargs 写法,避免把 float 误传到 progress_callback 位置。
  2. complete(...)context_arguments 是 2025 spec 增补——用来在 prompt argument 自动补全时传”已填字段”作为上下文,让服务器算出更精准的候选。老服务器收到会忽略。
  3. experimental_task_handlers__init__ 的 kwargs-only 参数)——2026 年初并入 main 分支,承载 streaming task 协议扩展,日后可能升级为稳定 API,现在先走 experimental 命名空间。

11.8.7.2 session_group.py(471 行)的函数与行号对照

成员行号职责
ServerParameters TypeAlias46Stdio | Sse | StreamableHttp 三选一
_ComponentNames dataclass~75反向索引:某 session 注册了哪些 tool/resource/prompt
_session_exit_stacks: dict[ClientSession, AsyncExitStack]103每会话一个 stack(第二级)
__init__107-125可选外部 exit_stack + component_name_hook
__aenter__127-131纯返回 self,真工作在 connect_to_server
__aexit__133-148并发关 stack(anyio task group)
disconnect_from_server169-196单 session 热拔
connect_to_server184-191新建会话并聚合
_establish_session193-243ServerParameters 类型分发到对应 transport
_aggregate_components245-305临时字典 + 冲突检测 + 原子合并

_aggregate_components 长达 60 行——它把”收 tools / 收 resources / 收 prompts / 检测冲突 / 合并 / 更新反向索引”六个阶段串在一起。一半的代码量是为了原子性——临时字典不是写着好看的,是因为任一阶段抛异常都要能让 Group 回到聚合前的状态。

11.8.7.3 client/ 目录总行数分布

文件说明
session.py536主文件(比本书前文给的 480 稍大,说明 2026 年又加了几十行 experimental 代码)
session_group.py471比前文给的 410 多 60 行
client.py308高层 Client wrapper
streamable_http.py671最大的 transport——SSE 模式自适应 + Last-Event-ID 重连
stdio.py356Unix/Windows 分支 + 三段 shutdown
auth/oauth2.py646单文件最大
auth/utils.py339PKCE / metadata 发现工具
auth/extensions/client_credentials.py485client_credentials grant type 扩展
sse.py160最老的 HTTP transport
websocket.py85Python 独有全双工
_memory.py78InMemoryTransport(测试和 Client(server) 直连用)
_transport.py21抽象基类
context.py16client-side RequestContext 薄封装

三条可以拿来和 TS SDK 互校的结论——

  • Python auth/ 总计 1470 行(646+339+485);TS SDK 的 auth.ts 单文件就 1745 行(本书第 9 章 §9.11 实测)。OAuth 2.1 完整实现 1500-1800 行是行业基线——Python 用 3 个文件拆分、TS 挤在一个文件里,总量基本一致。
  • streamable_http.py 671 行是最大 transport——大约是 sse.py(160)的 4 倍多stdio.py(356)的 1.9 倍。复杂度主要来自三件事:(a)响应体 JSON vs SSE 自适应;(b)Last-Event-ID 断线续传;(c)DELETE 主动销毁 session。本书第 13 章会把这三件事的工程细节展开讲。
  • _memory.py 才 78 行——它是 Client(server) 构造器里”直接传 Server 实例”那一路用的。对应 TS SDK 里的 InMemoryTransport——TS 版本用来写单元测试很方便,Python 版本则进一步被 Client 吸收成”零配置嵌入模式”。

11.8.8 Transport 默认值与超时表(实测源码)

transport 层散落着许多默认值,这些值一旦误解会直接导致生产故障。这里把最重要的 11 个默认值做成账本,全部基于 src/mcp/client/ 的现行代码。

参数出处触发条件
stdio_client 内存流 buffer size0stdio.py:109-111每次 send 都必须有 receiver 就绪,天然背压,慢消费者会拖慢生产者
PROCESS_TERMINATION_TIMEOUT2.0 秒stdio.py:9关 stdin 后等子进程自行退出的窗口;超时后升级到 terminate()
stdio terminate() 超时5 秒stdio.py shutdown 段terminate() 后再给子进程 5 秒反应;超时升级到 kill()
stdio_client(errlog=...) 默认sys.stderrstdio.py:105-106子进程 stderr 直接冒到父进程 stderr——调试方便,生产里可能要重定向到日志文件
stdio 启动目录 cwd 默认None(继承父进程)StdioServerParameters 定义生产里最好显式设——避免把 server 绑到 Agent 的工作目录
stdio 环境变量 env 默认get_default_environment() 白名单stdio.py不是继承全部 env——只转发 PATH / HOME / USER 等安全白名单,防止 ANTHROPIC_API_KEY 等泄漏给子进程
streamable_http_client.terminate_on_closeTruestreamable_http.py退出上下文时发 DELETE /mcp——生产里如果 server 无状态可设 False 避免 unnecessary 请求
MCP_SESSION_ID headermcp-session-idstreamable_http.py小写——注意反代不要把它改成 Mcp-Session-Id(HTTP header 大小写不敏感但代码里是字面量)
SSE 响应 content-type 判别text/event-streamstreamable_http.py _handle_sse_response非此 content-type 就走 JSON 分支——反代给 SSE 响应压了 gzip 导致 content-type 变化是常见踩坑
ClientSession.read_timeout_secondsNone(无超时)session.py:__init__不设超时的 send_request 会永久挂起——生产必须设,比如 60s
OAuth2Auth.auth_flow 401 → refresh1 次auth/oauth2.py401 自动刷 token 重试一次;再 401 就抛——避免无限循环

11.8.8.1 get_default_environment() 白名单为什么重要

stdio.py 不把父进程全部 os.environ 塞给子进程、而是默认白名单过滤——macOS 下白名单包含 HOMELOGNAMEPATHSHELLTERMUSER,其他一律丢弃。

这是一次有意识的安全设计——子进程是第三方 MCP server、可能是不信任代码。如果把 ANTHROPIC_API_KEY / OPENAI_API_KEY / AWS_ACCESS_KEY_ID全部透传过去、server 一行 os.environ.get("ANTHROPIC_API_KEY") 就拿走你的密钥。TS SDK 的 StdioClientTransport 也做了同样的过滤(本书第 9 章 §9.10 讨论过)——这是两个 SDK 难得完全一致的安全默认

需要给子 server 传敏感凭据时,显式写 env={"MY_TOKEN": "xxx"}——明确同意而不是默认透传。

11.8.8.2 read_timeout_seconds 不设值就是永久挂起

ClientSessionread_timeout_seconds 默认 None 这件事每个生产项目都会踩一次——

  • 本地开发测试:server 一切正常、call_tool 毫秒级返回、没人注意超时
  • 部署到生产:某个 server 某次调用慢了或死锁、await session.call_tool(...) 永远不返回
  • Agent 主循环卡死、整个 user request 挂在 HTTP gateway 上

对策——

  • 初始化时显式传 ClientSession(..., read_timeout_seconds=60)——全局兜底
  • 单次调用临时放宽/收紧:session.call_tool(name, args, read_timeout_seconds=5)——快工具严查、慢工具放宽
  • 不要写 read_timeout_seconds=None 这种”显式永久等”——要么是 bug、要么加注释说明为什么刻意这么做

和本书第 4 章 §4.6 的”客户端侧超时分层”相互印证——超时不是一个值、是一组值(transport 级 / session 级 / 单次调用级)逐层覆盖。

11.8.9 跨章校对:Python Client 在 MCP 整本书里的位置

本章内容不是孤立的。Python Client 的每一块实现都能在本书其他章找到对应的协议规范服务端镜像。下面这张表把五对互查点列出来——读者发现本章某段描述和其他章有出入时、应该以最新的章为准、并回头修旧章。

主题本章位置协议规范章Server 镜像 / 兄弟 SDK
initialize 能力协商§11.2.1第 4 章 §4.3 initialize 握手第 10 章 §10.4 Python Server 端能力声明
tools/call + output schema 校验§11.2.2 _validate_tool_result第 5 章 §5.5 outputSchema 字段含义第 10 章 §10.6 Python Server 声明 output schema
resources/list + resources/subscribe§11.2.2 签名表第 6 章 §6.4 resource 订阅/变更通知第 10 章 §10.7 Server 广播 resources/list_changed
CreateMessageRequest(sampling 反向调用)§11.2.3 _received_request 分发第 17 章 §17.2 sampling 协议第 9 章 §9.7 TS Client 对称实现
stdio transport graceful shutdown§11.4.1 三段式关闭第 12 章 §12.5 stdio 生命周期第 9 章 §9.9 TS stdio transport(用 signal + exit code)
Streamable HTTP mcp-session-id§11.4.2 + §11.8.8 默认值表第 13 章 §13.4 session 维护第 10 章 §10.8 Python Server 生成 session ID
OAuth2 PKCE + TokenStorage§11.5第 15 章 §15.3 MCP OAuth 2.1 profile第 10 章 §10.11 Python Server OAuth metadata 发布
ClientSessionGroup 多服务器聚合§11.3MCP 规范没有 group 概念,这是 SDK 层扩展TS SDK 不提供、需 Agent 框架自行实现

11.8.9.1 Client 与 Server 的”镜像关系”

每个 Client 方法都有一个 Server 端处理函数镜像——

  • session.call_tool("add", ...) ↔ Server 端 @server.tool() 装饰的 add() 函数(见第 10 章 §10.5.3 的 FastMCP 装饰器链)
  • session.subscribe_resource("file:///log") ↔ Server 端 resource_subscriptions: set[str] 记账 + send_resource_updated() 推送
  • 客户端 sampling_callback ↔ Server 端 ctx.session.create_message(...) 发起的反向请求

这种对称性意味着——调试 Client bug 时、经常要同时开两端的日志对着看。本书第 10 章 §10.12 给出了”client_debug=true + server_debug=true” 双端日志模板,建议收藏。

11.8.9.2 和 TS SDK 的互查

本章 §11.6 已经列过 9 项差异表。以下三条是最容易混淆的点、读者在 Python 和 TS 之间切换时要留意——

  1. async with vs 手动 connect/close——Python 用户忘记会触发语法错误、TS 用户忘记会静默泄漏 socket 或子进程。本书第 9 章 §9.6 给出了 TS 的 try/finally 最佳实践。
  2. list_tools() 的返回类型——Python 返回 ListToolsResult驼峰字段反序列化为 snake_casenext_cursor);TS 返回 { tools, nextCursor }camelCase)。混用两端代码时的 JSON schema 别名要留心。
  3. 分页参数位置——Python 放在 kwargs-only 的 params=PaginatedRequestParams(cursor=...);TS 直接第二参数传 { cursor: "..." }。TS 更紧凑、Python 更类型安全——没有哪个更好。

11.8.9.3 ClientSessionGroup 是”SDK 层创新”而非协议规范

特别要指出的是——ClientSessionGroup 不是 MCP 规范的一部分。规范只讲单 client 单 server 的协议;多服务器聚合是 Python SDK 提供的上层工具。这带来两个工程后果——

  • :换了 SDK 版本、ClientSessionGroup API 可能变、协议本身不会变——底层 ClientSession 始终稳定。
  • 需要注意:其他语言的 SDK(TS / Go / Rust)不一定有对应抽象——跨语言 Agent 框架得自己实现聚合层。LangChain MCP adapter、LangGraph、crewai 各家都有自己的多 server 管理代码——它们解决的是同一个问题、但没有统一接口

这也揭示了 MCP 生态下一步的可能走向——“多服务器聚合层”是否要进规范?目前看 2026 年上半年的 spec 讨论还没涵盖、仍留在 SDK 侧。如果未来真的进规范、本书这一节会第一时间更新。

11.8.10 _received_request 的 match/case 分发路径全图

session.py_received_request 是 Client 端最密集的状态机——它接收服务器反向发来的所有请求,按类型路由到对应 callback。下面把所有可能的入站请求列齐,并标明每个分支的默认行为。

入站请求类型协议名默认回调名未注册时行为典型用途
CreateMessageRequestsampling/createMessage_default_sampling_callback返回 ErrorData(INVALID_REQUEST, "Sampling not supported")第 17 章的 sampling 协议——server 借 client 的 LLM
ElicitRequestelicitation/create_default_elicitation_callback返回 ErrorData(..., "Elicitation not supported")第 18 章——server 向用户发起结构化提问
ListRootsRequestroots/list_default_list_roots_callback返回 ErrorData(..., "List roots not supported")第 18 章——server 请求当前工作空间列表
PingRequestping内置 EmptyResult()直接应答、不经 user 代码第 4 章的心跳

11.8.10.1 四个”协议规定的入站请求”

MCP 2025 spec 里 server → client 只有上面四种 request——其他都是 notification 或 response。这意味着 _received_requestmatch/case 只需要覆盖 4 个 case,理论上永远不会走到 case _: 兜底——如果走到了要么是 client SDK 版本滞后于 server、要么是 server 发了协议外消息。

实际代码里有 case _:——返回 INVALID_REQUEST 错误。这是正确的降级策略——未知请求不是让 Client 崩溃、而是礼貌回”不认识”、让 server 自己决定怎么处理。

11.8.10.2 默认回调的”三字错误”设计

仔细看四个 _default_*_callback 的错误消息——

"Sampling not supported"
"Elicitation not supported"
"List roots not supported"

都是 <功能名> not supported 三词格式——这不是偶然。这是 SDK 给 server 开发者的机器可读暗示

  • 错误码固定 INVALID_REQUEST(-32600)
  • 错误消息可以用字符串匹配判断是”能力未声明”还是”能力声明了但执行失败”
  • server 侧可以写 if "not supported" in err.message: fallback()

更严格的做法是 spec 用专门 error code(比如 -32004 “Capability Not Declared”)——但目前 spec 没规定、SDK 层约定俗成。本书第 4 章 §4.8 讨论过 JSON-RPC 错误码在 MCP 里的扩展,这里是一个”约定先于规范”的典型。

11.8.10.3 with responder: 为什么是同步上下文管理器

读源码会注意到——match 里每个 case 都写 with responder:(同步 with、不是 async with)。这是因为 RequestResponder.__enter__ 只做同步的 CancelScope 设置、不需要异步资源:

  • __enter__ — 创建 anyio.CancelScope 并 enter;如果后续 await 超时、这个 scope 可以整体取消正在执行的 callback
  • __exit__ — exit scope、把这个 responder 从 session 的”待处理请求”集合里移除、允许 GC

为什么这个设计很重要——假设 client 正在处理 server 发来的 sampling/createMessage(调 LLM 可能要 30 秒)、中途 server 想取消(发 notifications/cancelled)——session 可以 responder._cancel_scope.cancel()、让正在 await 的 LLM 调用立刻抛 CancelledError、回收资源。

如果不用 with responder: 包住、cancel scope 就建不起来、server 发的 cancel 通知 client 就吃不到——只能等 callback 自然完成。这是 MCP 双向取消协议在 client 侧的实现基础(第 4 章 §4.9 讨论过 cancellation 的 wire 格式、此处是 callback 侧如何响应)。

11.8.11 ClientSession 的消息处理循环(_receive_loop_received_notification

前面讲了”发请求” call_tool 和”收请求” _received_request——但 MCP 还有第三种消息:server 发来的 notification(通知、无需应答)。处理入口是 _received_notification

11.8.11.1 客户端关心的 notification 类型

Server → Client notification含义默认 callback
notifications/message结构化日志_default_logging_callback(no-op)
notifications/resources/updated某 resource 内容变了传给 user message_handler
notifications/resources/list_changed资源列表变更同上
notifications/tools/list_changed工具列表变更同上
notifications/prompts/list_changed提示词列表变更同上
notifications/progress长耗时操作进度路由到对应 progress_callback
notifications/cancelledserver 主动取消取消对应请求的 CancelScope

11.8.11.2 message_handler 的兜底地位

注意——四个 *_list_changed 通知没有专门 callback。它们默认都路由到 message_handlerClientSession.__init__ 的参数)——一个”什么都收”的兜底函数。

这个设计反映了 SDK 的取舍——high-churn 的通知类型用一个通用 handler 处理、避免给每种 list_changed 都单独配 callback 造成 API 表面爆炸。缺点是—— user code 要写 isinstance(msg, ResourceListChangedNotification) 之类的 dispatch。

推荐实践——

async def handler(msg):
    match msg:
        case types.ResourceListChangedNotification():
            await refresh_resource_cache()
        case types.ToolListChangedNotification():
            await refresh_tool_cache()  # 会顺带重算 _tool_output_schemas
        case types.LoggingMessageNotification(params=p):
            logger.log(p.level, p.data)

11.8.11.3 _tool_output_schemas 的失效时机

工具列表变更时、本章 §11.2.2 讲的 schema 缓存必须失效——否则 call_tool 会拿旧 schema 校验新 tool 的返回、误报 validation 错误。

session.py 源码、SDK 在 list_tools() 返回时无条件覆盖整个 _tool_output_schemas 字典——这是隐式失效机制。但如果 user 收到 tools/list_changed 不主动调 list_tools()、旧 schema 会一直缓存。

对策——在 message_handler 里看到 ToolListChangedNotification 立刻调 session.list_tools()——既更新了 UI / Agent 端能看到的工具列表、又刷新了 schema 缓存。源码里这是个”二合一”操作、不要分开想。

11.8.12 OAuth2 流程的五个关键决策点

§11.5 给出了 PKCEParameters / TokenStorage / OAuth2Auth 三个构件——这里把它们串起来、看一次完整授权的五个决策点。这些决策点决定了生产环境下的使用体验、也是和 TS SDK 对照时最容易混淆的地方。

11.8.12.1 决策一:token 从哪里来(TokenStorage 的读取顺序)

OAuth2Auth.auth_flow 每次进入先问 self._tokens——内存缓存。内存缓存从哪来__aenter__ 时从 TokenStorage.get_tokens() 异步读一次、整个 session 生命周期复用。

这意味着——

  • 如果 token 在外部被刷新(比如另一个 client 实例更新了 Keychain)、当前 session 不会感知——它只读一次。
  • session 内部自己刷的 token 会调 TokenStorage.set_tokens(...) 回写——读一次、写多次的模型。
  • 对多 client 竞争的场景、TokenStorage 实现应该做带版本号的乐观锁或每次都 get_tokens() 重读——SDK 默认不这么做、生产需要自己扩展。

11.8.12.2 决策二:401 自动刷新只重试一次

auth_flow 收到 401 会走 _refresh_flow 刷新一次、再拿新 token 重试原请求。第二次还 401 就抛异常——不是无限循环。

这是故意限制——如果 refresh_token 也失效(比如过期或被 revoke)、再刷也是 401、死循环只会堆请求。SDK 选择”快速失败”、让 user code 决定是重新走完整授权还是放弃。

11.8.12.3 决策三:PKCE 的 code_verifier 长度用上限

PKCEParameters.generate()range(128) 生成 code_verifier——直接取 RFC 7636 允许的最大长度

最短只要 43 字符就合规——为什么用 128?安全性和兼容性的保守取值——128 字符的 verifier 约等于 760 bit 熵(字符集 [A-Za-z0-9-._~] 共 66 个字符、log2(66^128) ≈ 774 bit)。远超 AES-256 的 256 bit——对 authorization code 截获后爆破 verifier 的攻击者来说、几乎是不可能穷举

代价是 URL 里 code_challenge 字段多 85 字符——不影响任何实际使用。

11.8.12.4 决策四:authorization code 交换用 POST form-urlencoded

实际查 OAuth 2.1 spec 和 auth/oauth2.py 实现——token endpoint 请求 body 是 application/x-www-form-urlencoded、不是 JSON。这是 OAuth 社区的历史遗留——2012 年 RFC 6749 定的、现在改不动了。

新手常犯的错——把 grant_type / code / code_verifier 塞进 JSON body 发给 token endpoint——所有规范 OAuth server 都返回 400 invalid_requesthttpxdata=... 参数默认就是 form-urlencoded、SDK 用的也是这个——把几乎所有 server 的脾气都照顾到。

11.8.12.5 决策五:Dynamic Client Registration 的 fallback

MCP OAuth profile(第 15 章 §15.3)要求 server 必须支持 RFC 7591 Dynamic Client Registration——client 不用人工去 server 后台申请 client_id、调用 /register 就能拿到。

auth/utils.py 里实现了这一步——但带兜底:如果 server 的 .well-known/oauth-authorization-server 没声明 registration_endpoint、SDK 会回退到”假设 client_id 已预先配置”、从 TokenStorage.get_client_info() 读。

这是为了兼容老 OAuth server——不是所有 server 都实现 RFC 7591。SDK 的策略是——能自动就自动、不能自动就走静态配置——不把 user 逼到死角。

11.8.13 一个容易被忽略的边界:_receive_loop 的异常传播

最后补一个源码级细节——BaseSession._receive_loop(父类、不在 client.py 里但客户端用的就是它)接到来自 read_streamException 对象时——不会抛出,而是转发给 message_handler

回看 ClientSession.__init__ 的签名——read_stream: ReadStream[SessionMessage | Exception]——流里可以流 Exception。这是 anyio 的惯用法——transport 层(stdio 子进程死了 / HTTP 连接断了 / JSON 解析失败)把 exception 包成 stream 元素、而不是抛 exception 弄死 receive loop。

这对 user 的影响——

  • message_handler 时记得判断 if isinstance(msg, Exception): handle_err(msg); return
  • 不这么判断的话、transport 出错是静默的——stream 可能已空、但 user 不知道为什么 session 突然不工作了
  • 推荐每个 handler 最顶上 4 行模板:
async def handler(msg):
    if isinstance(msg, Exception):
        logger.error("Transport error: %r", msg)
        await trigger_reconnect()
        return
    # ... 正常处理

这是 Python SDK 和 TS SDK 的第 10 个隐藏差异——TS SDK 的 transport 用 event emitter、错误走 transport.onerror(err) callback;Python 用 stream 元素、错误和正常消息走同一条通道。哪种风格更好没有定论——Python 的好处是 error 处理和消息处理都走 match/case、代码路径唯一;TS 的好处是 user 不写 error handling 时错误起码会冒出来、不会静默。

11.9 本章小结

本章深入分析了 MCP Python SDK 的客户端实现。

核心要点

  1. ClientSession 作为单服务器会话的核心抽象,继承自 BaseSession 并提供了完整的 MCP 协议操作方法
  2. ClientSessionGroup 在此基础上实现了多服务器管理与组件聚合
  3. Python SDK 充分利用了 anyio 的结构化并发、async with 上下文管理器、Pydantic 类型校验等语言特性
  4. 传输层的四种实现(stdio、SSE、Streamable HTTP、WebSocket)通过统一的读写流抽象与上下文管理器模式,实现了对应用层的完全透明
  5. OAuth2 认证作为可插拔的 httpx 中间件,为 HTTP 类传输提供了标准化的安全方案

记住这三个设计范式

  • async with 贯穿一切——资源获取与释放绑定在语法结构上
  • Protocol + TypeAdapter——结构化子类型 + 运行时校验的组合拳
  • 聚合 + 命名空间 + 反向索引——ClientSessionGroup 的设计范式可以迁移到其他需要”聚合多个同类服务”的场景

下一章我们将进入传输层的深度实现——第 12 章详解 stdio 传输层,看子进程通信的精细工程。

进入第 12 章之前:带着这三个问题去读

读完本章应该能在脑子里复现 ClientSession 从构造到 async with 退出的完整路径。第 12 章会把视角从会话层进一步下沉到字节层——以下三个问题可以作为阅读指针:

  1. anyio.create_memory_object_stream(0)0 容量意味着读写必须同步握手——stdio transport 是怎么把这种严格背压和子进程的 stdout 缓冲区调和的?
  2. Windows 下 start_new_session=True 不可用——SDK 用 Job Object 替代实现进程树整体终止、这个技术细节本章只提了名字、第 12 章会展开源码。
  3. 三段式 shutdown 的 2 秒 / 5 秒两个超时是怎么定出来的?比 TS SDK 激进还是保守?第 12 章给出对照。