Skip to content

第13章 Streamable HTTP:远程流式传输

在第12章中,我们分析了 stdio 传输——一种简单而高效的本地通信方式。但现实世界中的 AI 系统,从多人协作的开发环境到云端部署的 Agent 平台,都需要跨网络的远程通信能力。这就是 Streamable HTTP 传输层要解决的核心问题。

MCP 协议在早期版本中使用的是 SSE+HTTP POST 的组合方案:客户端通过一个长连接的 SSE 通道接收服务端消息,再通过独立的 POST 请求发送消息。这种设计虽然可行,但存在明显的架构缺陷——两个独立的 HTTP 连接需要通过额外的机制(如 URL 中嵌入的 endpoint 路径)进行关联,增加了实现复杂度,也限制了部署灵活性。Streamable HTTP 正是为了取代这一方案而设计的。

13.1 设计动机:为什么需要 Streamable HTTP

要理解 Streamable HTTP 的设计选择,我们需要先看清它面临的约束条件。

第一个约束是 HTTP 的半双工特性。 与 WebSocket 这样的全双工协议不同,HTTP 本质上是请求-响应模式——客户端发起请求,服务端返回响应。在一次 HTTP 事务中,通信方向是固定的。这意味着服务端无法在没有客户端请求的情况下主动推送消息。

第二个约束是基础设施兼容性。 现实中的网络基础设施——代理服务器、负载均衡器、CDN、WAF——都是为 HTTP 设计的。WebSocket 虽然是全双工的,但并非所有中间件都能正确处理它。Streamable HTTP 选择在标准 HTTP 语义之上构建,最大程度地利用现有基础设施。

第三个约束是流式传输的需求。 MCP 的许多操作天然是流式的——工具调用可能需要数分钟才能完成,期间需要向客户端推送进度更新。普通的 HTTP 请求-响应无法满足这一需求。

Streamable HTTP 的核心设计思路可以用一句话概括:用 POST 请求承载客户端到服务端的消息,用 SSE(Server-Sent Events)流承载服务端到客户端的流式响应。 这不是两个独立的连接,而是将 SSE 嵌入到 POST 请求的响应体中。

13.2 协议机制:三种 HTTP 方法的分工

Streamable HTTP 使用同一个 URL endpoint,通过三种 HTTP 方法实现不同的通信语义。

POST:客户端到服务端的消息通道

客户端通过 POST 请求发送 JSON-RPC 消息。请求体是标准的 JSON-RPC 消息(或批量消息数组),Content-Type 为 application/json。关键的设计点在于 Accept 头——客户端必须同时声明接受 application/jsontext/event-stream,因为服务端可能以任一格式响应。

服务端对 POST 请求有三种可能的响应方式:

  1. 202 Accepted:当客户端发送的是通知(notification)或响应(response)时,服务端无需返回内容,直接返回 202。
  2. 200 OK + JSON:当服务端选择非流式模式时,直接返回 JSON-RPC 响应。
  3. 200 OK + SSE:当服务端选择流式模式时,返回一个 SSE 流,在流中逐步推送消息。

来看 TypeScript 客户端中 _send 方法的核心逻辑(源码位于 packages/client/src/client/streamableHttp.ts):

typescript
// 设置请求头——同时声明接受 JSON 和 SSE
headers.set('content-type', 'application/json');
headers.set('accept', [...new Set(types)].join(', '));
// types 包含 'application/json' 和 'text/event-stream'

const response = await (this._fetch ?? fetch)(this._url, init);

// 从响应头中提取 session ID
const sessionId = response.headers.get('mcp-session-id');
if (sessionId) {
    this._sessionId = sessionId;
}

// 202 表示服务端已接收但无需返回内容
if (response.status === 202) {
    // 如果是 initialized 通知,开启 GET SSE 流
    if (isInitializedNotification(message)) {
        this._startOrAuthSse({ resumptionToken: undefined })
            .catch(error => this.onerror?.(error));
    }
    return;
}

// 根据响应的 Content-Type 决定处理方式
const contentType = response.headers.get('content-type');
if (contentType?.includes('text/event-stream')) {
    // 流式响应——交给 SSE 处理器
    this._handleSseStream(response.body, { onresumptiontoken }, false);
} else if (contentType?.includes('application/json')) {
    // 非流式响应——直接解析 JSON
    const data = await response.json();
    const responseMessages = Array.isArray(data)
        ? data.map(msg => JSONRPCMessageSchema.parse(msg))
        : [JSONRPCMessageSchema.parse(data)];
    for (const msg of responseMessages) {
        this.onmessage?.(msg);
    }
}

这段代码揭示了 Streamable HTTP 的一个重要设计决策:客户端不预设服务端的响应格式。同一个请求,服务端可以根据自身能力和当前场景选择 JSON 或 SSE 响应。这种灵活性使得简单的服务端可以只实现 JSON 响应,而复杂的服务端可以利用 SSE 进行流式传输。

GET:服务端主动推送通道

GET 请求用于建立一个独立的 SSE 长连接,使服务端能够在没有客户端请求的情况下主动推送消息——例如服务端发起的请求(server-initiated requests)和通知。这个通道是可选的:如果服务端返回 405 Method Not Allowed,客户端会静默忽略,不报错。

服务端侧的实现限制了每个 session 只能有一个 GET 流。来看服务端的处理逻辑(源码位于 packages/server/src/server/streamableHttp.ts):

typescript
// 检查是否已有活跃的 GET 流
if (this._streamMapping.get(this._standaloneSseStreamId) !== undefined) {
    return this.createJsonErrorResponse(
        409, -32_000,
        'Conflict: Only one SSE stream is allowed per session'
    );
}

DELETE:会话终止

客户端通过 DELETE 请求显式终止会话。服务端可以返回 200 OK 确认终止,也可以返回 405 Method Not Allowed 表示不支持客户端主动终止会话。

13.3 会话管理:有状态与无状态的权衡

Streamable HTTP 支持两种运行模式:有状态(stateful)和无状态(stateless),通过服务端是否提供 sessionIdGenerator 来决定。

在有状态模式下,服务端在处理 initialize 请求时生成一个 session ID,通过 mcp-session-id 响应头返回给客户端。此后,客户端必须在每个请求中携带这个 session ID。来看服务端的会话验证逻辑:

typescript
private validateSession(req: Request): Response | undefined {
    if (this.sessionIdGenerator === undefined) {
        // 无状态模式,跳过验证
        return undefined;
    }
    if (!this._initialized) {
        return this.createJsonErrorResponse(
            400, -32_000, 'Bad Request: Server not initialized'
        );
    }
    const sessionId = req.headers.get('mcp-session-id');
    if (!sessionId) {
        // 缺少 session ID,返回 400
        return this.createJsonErrorResponse(
            400, -32_000, 'Bad Request: Mcp-Session-Id header is required'
        );
    }
    if (sessionId !== this.sessionId) {
        // session ID 不匹配,返回 404
        return this.createJsonErrorResponse(404, -32_001, 'Session not found');
    }
    return undefined;
}

这里有一个重要的设计细节:无效的 session ID 返回 404 Not Found,而非 403 Forbidden。这是因为对于客户端来说,无效的 session 等同于不存在的资源——客户端应该重新初始化,而不是尝试修复认证。

Python 实现中(源码位于 src/mcp/server/streamable_http.py),session ID 还有格式验证的约束:

python
# 只允许可见 ASCII 字符(0x21-0x7E)
SESSION_ID_PATTERN = re.compile(r"^[\x21-\x7E]+$")

def __init__(self, mcp_session_id: str | None, ...):
    if mcp_session_id is not None and not SESSION_ID_PATTERN.fullmatch(mcp_session_id):
        raise ValueError(
            "Session ID must only contain visible ASCII characters (0x21-0x7E)"
        )

无状态模式适用于简单场景或 serverless 部署——每个请求独立处理,不需要在服务端维护状态。有状态模式则适用于需要上下文延续的场景,例如多轮对话中的工具调用。

13.4 SSE 流处理:priming event 与断线重连

SSE(Server-Sent Events)是 Streamable HTTP 中实现流式传输的核心机制。与 WebSocket 不同,SSE 是单向的(服务端到客户端),但正好契合 MCP 的需求——在一个 POST 请求的响应中,服务端需要向客户端推送多条消息。

Priming Event:建立可恢复性的基石

当服务端配置了 EventStore(支持可恢复性)时,SSE 流的第一个事件是一个特殊的"priming event"——它只包含一个事件 ID 和空数据,不携带任何 JSON-RPC 消息。其目的是让客户端尽早获得一个可用于断线恢复的 event ID。

typescript
private async writePrimingEvent(
    controller: ReadableStreamDefaultController<Uint8Array>,
    encoder: InstanceType<typeof TextEncoder>,
    streamId: string,
    protocolVersion: string
): Promise<void> {
    if (!this._eventStore) return;

    // 旧版客户端不支持空数据的 SSE 事件,会尝试将空字符串解析为 JSON 而崩溃
    if (protocolVersion < '2025-11-25') return;

    const primingEventId = await this._eventStore.storeEvent(
        streamId, {} as JSONRPCMessage
    );
    let primingEvent = `id: ${primingEventId}\ndata: \n\n`;
    if (this._retryInterval !== undefined) {
        primingEvent = `id: ${primingEventId}\nretry: ${this._retryInterval}\ndata: \n\n`;
    }
    controller.enqueue(encoder.encode(primingEvent));
}

注意版本检查 protocolVersion < '2025-11-25' 这行代码——这是一个向后兼容的防御措施。早期版本的客户端会将每个 SSE 事件的 data 字段当作 JSON 解析,空数据会导致解析失败。只有 2025-11-25 及之后版本的协议才正确处理了这一点。

断线重连机制

网络连接不可能永远稳定。Streamable HTTP 提供了一套完整的断线重连机制,包括指数退避和服务端控制的重试间隔。

客户端的重连核心逻辑体现在 _handleSseStream_scheduleReconnection 方法中:

typescript
private _handleSseStream(
    stream: ReadableStream<Uint8Array> | null,
    options: StartSSEOptions,
    isReconnectable: boolean
): void {
    let lastEventId: string | undefined;
    let hasPrimingEvent = false;  // 是否收到过带 ID 的事件
    let receivedResponse = false; // 是否已收到最终响应

    const processStream = async () => {
        try {
            const reader = stream
                .pipeThrough(new TextDecoderStream())
                .pipeThrough(new EventSourceParserStream({
                    onRetry: (retryMs: number) => {
                        // 捕获服务端指定的重试间隔
                        this._serverRetryMs = retryMs;
                    }
                }))
                .getReader();

            while (true) {
                const { value: event, done } = await reader.read();
                if (done) break;

                if (event.id) {
                    lastEventId = event.id;
                    hasPrimingEvent = true;
                    onresumptiontoken?.(event.id);
                }

                // 跳过空数据事件(priming event、keep-alive)
                if (!event.data) continue;

                // 解析并分发 JSON-RPC 消息
                const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
                if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
                    receivedResponse = true;
                }
                this.onmessage?.(message);
            }

            // 流正常关闭后,判断是否需要重连
            const canResume = isReconnectable || hasPrimingEvent;
            const needsReconnect = canResume && !receivedResponse;
            if (needsReconnect && !this._abortController?.signal.aborted) {
                this._scheduleReconnection({ resumptionToken: lastEventId, ... }, 0);
            }
        } catch (error) {
            // 网络断开等异常,同样尝试重连
            // ...
        }
    };
    processStream();
}

这段代码中有两个关键的布尔变量:

  • hasPrimingEvent:标记是否收到过带 event ID 的事件。只有收到过 priming event,POST 请求的 SSE 流才被视为可恢复的。
  • receivedResponse:标记是否已收到最终的 JSON-RPC 响应。如果已经收到了完整响应,即使流断开也不需要重连——请求已经完成了。

重连的退避策略由 _getNextReconnectionDelay 计算:

typescript
private _getNextReconnectionDelay(attempt: number): number {
    // 优先使用服务端指定的重试间隔
    if (this._serverRetryMs !== undefined) {
        return this._serverRetryMs;
    }
    // 否则使用指数退避
    const initialDelay = this._reconnectionOptions.initialReconnectionDelay; // 默认 1s
    const growFactor = this._reconnectionOptions.reconnectionDelayGrowFactor; // 默认 1.5
    const maxDelay = this._reconnectionOptions.maxReconnectionDelay; // 默认 30s
    return Math.min(initialDelay * Math.pow(growFactor, attempt), maxDelay);
}

默认配置下,重试间隔为 1s、1.5s、2.25s...,上限 30s,最多重试 2 次。服务端可以通过 SSE 的 retry 字段覆盖这个间隔,从而实现服务端对客户端轮询频率的控制。

13.5 EventStore:事件持久化与重放

断线重连的另一半在服务端——当客户端带着 Last-Event-ID 头重新连接时,服务端需要能够重放在断线期间产生的事件。这就是 EventStore 的作用。

EventStore 是一个接口,定义了事件存储和重放的契约:

typescript
// TypeScript 接口(packages/server/src/server/streamableHttp.ts)
export interface EventStore {
    storeEvent(streamId: StreamId, message: JSONRPCMessage): Promise<EventId>;
    getStreamIdForEventId?(eventId: EventId): Promise<StreamId | undefined>;
    replayEventsAfter(
        lastEventId: EventId,
        { send }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise<void> }
    ): Promise<StreamId>;
}
python
# Python 抽象类(src/mcp/server/streamable_http.py)
class EventStore(ABC):
    @abstractmethod
    async def store_event(
        self, stream_id: StreamId, message: JSONRPCMessage | None
    ) -> EventId: ...

    @abstractmethod
    async def replay_events_after(
        self, last_event_id: EventId, send_callback: EventCallback
    ) -> StreamId | None: ...

设计上有几个值得注意的点:

第一,EventStore 是可选的。 不配置 EventStore 的服务端不支持断线恢复,客户端断线后只能重新初始化。这降低了简单部署场景的实现复杂度。

第二,getStreamIdForEventId 方法是可选的。 TypeScript 中用 ? 标记,Python 中则不要求实现。如果提供了这个方法,服务端可以在重放前检查 event ID 的合法性并进行冲突检测;如果没有提供,服务端依赖 replayEventsAfter 返回的 stream ID 进行映射。

第三,事件存储与流传输解耦。 在 Python 实现的 message_router 中,无论客户端是否在线,消息都会被存入 EventStore:

python
# 无论客户端是否连接,都存储事件
event_id = None
if self._event_store:
    event_id = await self._event_store.store_event(request_stream_id, message)

if request_stream_id in self._request_streams:
    await self._request_streams[request_stream_id][0].send(
        EventMessage(message, event_id)
    )
else:
    # 客户端未连接,但事件已存储,重连时可重放
    logger.debug("Request stream not found, client might reconnect and replay.")

这种设计意味着即使客户端在服务端处理请求的过程中完全断开,所有中间事件都不会丢失。客户端重连后,通过 Last-Event-ID 头可以获取断线期间的所有事件。

服务端处理重放请求的流程如下(TypeScript 实现):

typescript
private async replayEvents(lastEventId: string): Promise<Response> {
    // 1. 如果提供了 getStreamIdForEventId,先验证 event ID 合法性
    if (this._eventStore.getStreamIdForEventId) {
        const streamId = await this._eventStore.getStreamIdForEventId(lastEventId);
        if (!streamId) {
            return this.createJsonErrorResponse(400, -32_000, 'Invalid event ID format');
        }
        // 检查该 stream 是否已有活跃连接
        if (this._streamMapping.get(streamId) !== undefined) {
            return this.createJsonErrorResponse(409, -32_000,
                'Conflict: Stream already has an active connection');
        }
    }

    // 2. 创建新的 SSE 流
    const readable = new ReadableStream<Uint8Array>({ ... });

    // 3. 调用 EventStore 重放历史事件
    const replayedStreamId = await this._eventStore.replayEventsAfter(lastEventId, {
        send: async (eventId, message) => {
            this.writeSSEEvent(streamController, encoder, message, eventId);
        }
    });

    // 4. 注册新的 stream mapping,继续接收后续事件
    this._streamMapping.set(replayedStreamId, { controller, encoder, cleanup: ... });

    return new Response(readable, { headers });
}

13.6 认证与 401 重试

远程传输必然涉及认证。Streamable HTTP 在传输层集成了 OAuth 认证流程,其核心设计是透明的 401 重试机制——当请求收到 401 Unauthorized 响应时,传输层会自动尝试重新认证并重发请求,对上层协议完全透明。

客户端的 _send 方法通过 isAuthRetry 参数实现了"最多重试一次"的语义:

typescript
private async _send(
    message: JSONRPCMessage | JSONRPCMessage[],
    options: ... | undefined,
    isAuthRetry: boolean  // 是否为认证重试
): Promise<void> {
    // ...
    if (response.status === 401 && this._authProvider) {
        if (response.headers.has('www-authenticate')) {
            const { resourceMetadataUrl, scope } = extractWWWAuthenticateParams(response);
            this._resourceMetadataUrl = resourceMetadataUrl;
            this._scope = scope;
        }
        if (this._authProvider.onUnauthorized && !isAuthRetry) {
            // 第一次 401:调用 onUnauthorized 刷新凭证,然后重试
            await this._authProvider.onUnauthorized({ response, serverUrl: this._url, ... });
            return this._send(message, options, true); // 标记为重试
        }
        if (isAuthRetry) {
            // 重试后仍然 401,抛出错误
            throw new SdkError(SdkErrorCode.ClientHttpAuthentication,
                'Server returned 401 after re-authentication', { status: 401 });
        }
        throw new UnauthorizedError();
    }
    // ...
}

同样的机制也应用在 GET SSE 流的建立过程中(_startOrAuthSse 方法),确保了 GET 流也能在认证过期时自动恢复。

此外还有一个 403 Forbidden 的处理——当服务端返回 insufficient_scope 错误时,客户端会提取新的 scope 信息,尝试用更高权限重新认证。为防止无限循环,代码通过 _lastUpscopingHeader 追踪上一次尝试的 WWW-Authenticate 头,如果两次相同就终止重试:

typescript
if (this._lastUpscopingHeader === wwwAuthHeader) {
    throw new SdkError(SdkErrorCode.ClientHttpForbidden,
        'Server returned 403 after trying upscoping', { status: 403 });
}

13.7 服务端架构:流映射与消息路由

服务端的实现需要解决一个核心的架构问题:如何将多个并发请求的响应路由到正确的 SSE 流?

TypeScript 服务端使用两层映射来解决这个问题:

  1. _requestToStreamMapping: Map<RequestId, string>——将每个请求 ID 映射到一个 stream ID。
  2. _streamMapping: Map<string, StreamMapping>——将 stream ID 映射到实际的 SSE 流控制器。

当服务端需要发送消息时,send 方法的逻辑如下:

typescript
async send(message: JSONRPCMessage, options?: { relatedRequestId?: RequestId }): Promise<void> {
    let requestId = options?.relatedRequestId;
    if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
        requestId = message.id;
    }

    // 没有关联的请求 ID——发送到 GET 独立流
    if (requestId === undefined) {
        const standaloneSse = this._streamMapping.get(this._standaloneSseStreamId);
        if (standaloneSse?.controller && standaloneSse?.encoder) {
            this.writeSSEEvent(standaloneSse.controller, standaloneSse.encoder, message, eventId);
        }
        return;
    }

    // 有关联的请求 ID——发送到对应的 POST 响应流
    const streamId = this._requestToStreamMapping.get(requestId);
    const stream = this._streamMapping.get(streamId);

    if (stream?.controller && stream?.encoder) {
        this.writeSSEEvent(stream.controller, stream.encoder, message, eventId);
    }

    // 如果是最终响应(result 或 error),检查是否所有请求都已完成
    if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
        this._requestResponseMap.set(requestId, message);
        const relatedIds = [...this._requestToStreamMapping.entries()]
            .filter(([_, sid]) => sid === streamId)
            .map(([id]) => id);
        const allResponsesReady = relatedIds.every(id => this._requestResponseMap.has(id));
        if (allResponsesReady) {
            stream.cleanup(); // 关闭 SSE 流
        }
    }
}

Python 实现采用了不同的架构——使用 anyio 的 MemoryObjectStream 作为内部消息通道,通过一个专门的 message_router 协程负责消息分发:

这里的 message_router 是一个长期运行的协程,它从 write_stream 读取上层协议产生的消息,根据消息类型和关联的请求 ID 分发到对应的请求流。这种设计将"消息从哪里来"和"消息到哪里去"彻底解耦。

13.8 协议版本协商

Streamable HTTP 引入了 mcp-protocol-version 头用于版本协商。在初始化请求中,版本信息通过 JSON-RPC 消息的 params.protocolVersion 字段传递。在后续请求中,版本通过 HTTP 头传递。

服务端的版本验证逻辑如下:

typescript
private validateProtocolVersion(req: Request): Response | undefined {
    const protocolVersion = req.headers.get('mcp-protocol-version');
    if (protocolVersion !== null
        && !this._supportedProtocolVersions.includes(protocolVersion)) {
        return this.createJsonErrorResponse(
            400, -32_000,
            `Bad Request: Unsupported protocol version: ${protocolVersion}`
        );
    }
    return undefined;
}

注意这里的逻辑:如果没有提供版本头,默认接受;只有提供了但不在支持列表中时才拒绝。这是一种渐进增强的设计——老客户端不发送版本头也能工作。

13.9 与其他传输方式的对比

理解 Streamable HTTP 的定位,需要将它与其他传输方式进行对比。

特性stdio旧版 SSE+POSTStreamable HTTPWebSocket
通信方向双向双向(两个连接)双向(POST+GET)全双工
连接数进程管道2 个 HTTP 连接1-2 个 HTTP 连接1 个连接
流式传输天然支持支持支持支持
断线恢复不支持不支持支持(EventStore)需自行实现
基础设施兼容不适用良好优秀中等
无状态部署不适用困难支持困难
实现复杂度

Streamable HTTP vs 旧版 SSE+POST: 旧方案需要客户端先通过 GET 建立 SSE 连接,服务端返回一个 endpoint URL,客户端再用这个 URL 发送 POST 请求。两个连接通过 URL 关联。Streamable HTTP 将 SSE 嵌入 POST 响应,消除了这种耦合。更重要的是,Streamable HTTP 支持服务端选择 JSON 或 SSE 响应,使得简单场景无需 SSE 即可工作。

Streamable HTTP vs WebSocket: WebSocket 的优势在于全双工——任何一方可以随时发送消息。但 WebSocket 需要专门的基础设施支持,且不能利用 HTTP 的缓存、认证、负载均衡等成熟机制。Streamable HTTP 虽然不是真正的全双工,但通过 GET 流和 POST 响应流的组合,在 HTTP 语义之上实现了等效的双向通信能力。

何时选择 Streamable HTTP: 当 MCP 服务需要跨网络访问时,Streamable HTTP 是首选。它兼容几乎所有的 HTTP 基础设施,支持无状态部署(适合 serverless),并且通过 EventStore 提供了可选的断线恢复能力。只有在局域网内对延迟极其敏感、且基础设施完全可控的场景下,WebSocket 才值得考虑。

13.10 本章小结

Streamable HTTP 是 MCP 协议中最复杂的传输层实现,也是远程部署场景的核心基础设施。它的设计体现了几个重要的工程哲学:

  1. 在约束中寻找最优解。 HTTP 的半双工特性是硬约束,但通过将 SSE 嵌入响应体、通过独立的 GET 流接收服务端推送,Streamable HTTP 在不破坏 HTTP 语义的前提下实现了等效的双向通信。

  2. 渐进增强而非全有全无。 EventStore 是可选的,GET 流是可选的,会话管理是可选的,JSON 与 SSE 响应可以共存。一个最简单的 Streamable HTTP 服务端只需要处理 POST 请求并返回 JSON 响应,就能通过协议兼容性测试。

  3. 面向故障设计。 Priming event、resumption token、指数退避重连、事件持久化与重放——这些机制共同构成了一套完整的容错体系。在不可靠的网络环境中,它们确保了长时间运行的操作不会因为瞬时断线而丢失进度。

  4. 传输层的认证透明性。 401 自动重试机制使得上层协议完全不需要关心认证状态的变化。这种关注点分离让协议层可以专注于业务语义,而传输层负责处理所有的网络和认证复杂性。

在下一章中,我们将分析 SSE 和 WebSocket 这两种传输方式——它们在某些场景下仍然有独特的价值。

基于 VitePress 构建