Skip to content

第8章 高级提取器:WebSocket、Multipart、ConnectInfo

第 7 章讲的四个提取器——Path/Query/State/Json——覆盖了大多数 HTTP API 场景。但 HTTP 之上还有几类"协议上的协议"或"运行时附着信息"需要 Axum 接住:双向帧流的 WebSocket、流式多部分表单、以及底层 TCP 连接的远端地址等。这一章把三个典型例子拆开,看 Axum 的提取器框架怎么把它们塞进 FromRequestParts / FromRequest 的同一模子里。

它们各自的关键特点:

三者在"消费 body"这条轴上的位置各不相同:WebSocket 完全不消费 body(升级握手是 HTTP/1.1 方式或 HTTP/2 CONNECT,body 留给后续升级协议);Multipart 消费 body 但只以"流"的方式,不缓冲;ConnectInfo 和 body 完全无关——它的数据来自 tokio 的连接监听阶段。这三种场景都被 axum 塞到了同一个提取器框架里,trait 没破例。

WebSocketUpgrade:握手在 parts 层完成

WebSocket 协议(RFC 6455 / RFC 8441)规定:HTTP/1.1 客户端发 GET + Upgrade: websocket + Connection: Upgrade + Sec-WebSocket-Key: <base64> + Sec-WebSocket-Version: 13;HTTP/2 客户端发 CONNECT + :protocol: websocket(扩展 CONNECT)。服务端校验这些头,如果都合法,返回 101 Switching ProtocolsSec-WebSocket-Accept: <SHA1-base64(key+guid)>;从此 TCP 连接不再跑 HTTP,变成裸的 WebSocket 帧流。

Axum 的 WebSocketUpgrade 提取器承担的是头验证部分,响应体的 101 返回和升级后的帧流处理都往后推迟。它实现 FromRequestParts——因为只需要看头部——这让它可以和其他提取器共存在 handler 参数列表。

from_request_parts:六种头校验

axum/src/extract/ws.rs:442-517 是 WebSocket 升级验证的全部逻辑:

rust
// axum/src/extract/ws.rs:442-517(节选,HTTP/1.1 分支)
impl<S> FromRequestParts<S> for WebSocketUpgrade<DefaultOnFailedUpgrade>
where S: Send + Sync,
{
    type Rejection = WebSocketUpgradeRejection;

    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
        let sec_websocket_key = if parts.version <= Version::HTTP_11 {
            if parts.method != Method::GET {
                return Err(MethodNotGet.into());
            }
            if !header_contains(&parts.headers, &header::CONNECTION, "upgrade") {
                return Err(InvalidConnectionHeader.into());
            }
            if !header_eq(&parts.headers, &header::UPGRADE, "websocket") {
                return Err(InvalidUpgradeHeader.into());
            }
            Some(parts.headers.get(header::SEC_WEBSOCKET_KEY)
                .ok_or(WebSocketKeyHeaderMissing)?.clone())
        } else {
            // HTTP/2 分支:CONNECT + :protocol=websocket
            if parts.method != Method::CONNECT { return Err(MethodNotConnect.into()); }
            // ... 检查 hyper::ext::Protocol 扩展头
            None
        };

        if !header_eq(&parts.headers, &header::SEC_WEBSOCKET_VERSION, "13") {
            return Err(InvalidWebSocketVersionHeader.into());
        }

        let on_upgrade = parts.extensions.remove::<hyper::upgrade::OnUpgrade>()
            .ok_or(ConnectionNotUpgradable)?;
        // ... 处理 Sec-WebSocket-Protocol 子协议列表
        Ok(Self { /* ... */ })
    }
}

逐段拆解其中的工程细节:

1. HTTP/1.1 与 HTTP/2 分流parts.version <= Version::HTTP_11 是分支条件。HTTP/1.1 走传统升级路径——GET + Upgrade: websocket;HTTP/2 走 RFC 8441 扩展 CONNECT——CONNECT + :protocol: websocket(在 hyper 里通过 hyper::ext::Protocol extension 暴露)。Axum 同时支持两条路径,code path 清晰分开

2. 头比较用了两种 helperws.rs:519-537):header_eq 做大小写不敏感的完整等值比较(用于 Upgrade: websocketSec-WebSocket-Version: 13 这类值固定的头);header_contains 做大小写不敏感的子串包含比较(用于 Connection: Upgrade, keep-alive 这种可能有多个 token 的头)。两种方式分别对应 HTTP 规范里不同头的语法规则

3. parts.extensions.remove::<hyper::upgrade::OnUpgrade>():这是 Axum 和 hyper 协作的关键。hyper 在 HTTP 解析时,如果识别到连接可以升级(看到了 Upgrade: 头并允许升级),会往请求的 extensions 里塞一个 OnUpgrade future——这个 future 在未来 await 时会返回一个可以双向读写的 hyper::upgrade::UpgradedWebSocketUpgrade 从 extensions 里把它拿走.remove 不是 .get)——后续的 on_upgrade 调用需要它,只能被一个提取器拿走一次

4. Sec-WebSocket-Protocol 子协议收集ws.rs:497-506):允许客户端指定多个候选子协议(逗号分隔),服务端选一个。get_all 获取所有同名头、逐项 split 逗号、trim、收集。这种对"多值头"的处理在各种 HTTP 头里常见

校验失败后返回 WebSocketUpgradeRejection——它是一个 composite rejection,不同原因对应不同 HTTP 状态码(通常是 400 Bad Request 或 405 Method Not Allowed)。

on_upgrade:返回 101 + spawn 任务

from_request_parts 完成后,handler 拿到一个 WebSocketUpgrade。真正的协议升级动作是调 .on_upgrade(callback)——ws.rs:346-410

rust
// axum/src/extract/ws.rs:346-410(节选)
pub fn on_upgrade<C, Fut>(self, callback: C) -> Response
where
    C: FnOnce(WebSocket) -> Fut + Send + 'static,
    Fut: Future<Output = ()> + Send + 'static,
    F: OnFailedUpgrade,
{
    let on_upgrade = self.on_upgrade;
    let protocol = self.protocol.clone();

    tokio::spawn(async move {
        let upgraded = match on_upgrade.await { /* ... */ };
        let upgraded = TokioIo::new(upgraded);
        let socket = WebSocketStream::from_raw_socket(
            upgraded, protocol::Role::Server, Some(config)
        ).await;
        let socket = WebSocket { inner: socket, protocol };
        callback(socket).await;
    });

    // 构造 101 响应
    let mut response = if let Some(sec_websocket_key) = &self.sec_websocket_key {
        Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(header::CONNECTION, UPGRADE)
            .header(header::UPGRADE, WEBSOCKET)
            .header(
                header::SEC_WEBSOCKET_ACCEPT,
                sign(sec_websocket_key.as_bytes()),   // SHA1 + base64
            )
            .body(Body::empty()).unwrap()
    } else {
        // HTTP/2: 2XX + 空 body
        Response::new(Body::empty())
    };
    response
}

关键工程动作两件:

一、tokio::spawn 一个独立任务跑升级后的握手和用户回调hyper::upgrade::OnUpgrade 这个 future 必须在 Response 被发送之后才会 resolve——因为 hyper 需要先把 101 响应写出去,然后把底层 TCP 连接的控制权交给上层。如果在同一个 future 里先 await on_upgrade 再返回响应,两边互相等对方,死锁。spawn 一个后台任务是标准解法——handler 立即返回 101 响应,后台任务里 await OnUpgrade 拿到 Upgraded,用 tungstenite 封成 WebSocketStream,然后调用户回调

二、sign(key) 计算 Sec-WebSocket-Accept。RFC 6455 规定:Accept = base64(sha1(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))——这个固定 GUID 是协议规范定的。客户端收到响应后会自己算一遍,对比不上就认为是错误的服务端。签名不是为了安全(攻击者也能算),是为了防中间缓存误把 WebSocket 请求当普通 HTTP 请求缓存——GUID 让响应无法被假造

完整数据流:

WebSocket:Sink + Stream 的双向接口

ws.rs:543-546 定义的 WebSocket

rust
pub struct WebSocket {
    inner: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
    protocol: Option<HeaderValue>,
}

两个字段:内部的 WebSocketStream(tokio-tungstenite 提供)和选中的子协议。WebSocket 实现了 futures::Stream(接收帧)和 futures::Sink<Message>(发送帧)——ws.rs:577-619recv / send 方法是便利包装:

rust
// ws.rs:552-562
pub async fn recv(&mut self) -> Option<Result<Message, Error>> { self.next().await }
pub async fn send(&mut self, msg: Message) -> Result<(), Error> {
    self.inner.send(msg.into_tungstenite()).await.map_err(Error::new)
}

设计上不新增抽象,直接复用 futures 生态的 Stream / Sink——这让用户可以用 StreamExt / SinkExt 的所有组合子:socket.map(...)socket.filter(...)socket.forward(...)split 分成独立的 sink 和 stream 等。Axum 不自己定义 WebSocket 事件循环,把这件事交给标准 async 生态。

WebSocket 的安全边界:Origin 检查

WebSocket 协议本身不强制 Origin 校验——服务端接受来自任何源的升级请求是默认行为。这是一个常见的 WebSocket 安全坑:"Cross-Origin WebSocket Hijacking"——恶意网站诱导用户浏览器发起对你服务的 WebSocket 升级,利用浏览器自动带的 Cookie 完成登录态利用。

Axum 本身不强制 Origin 检查——你必须自己校验。标准做法:

rust
use axum::http::HeaderValue;

async fn secure_ws(ws: WebSocketUpgrade, headers: HeaderMap) -> Result<Response, StatusCode> {
    let origin = headers.get("origin")
        .and_then(|h| h.to_str().ok())
        .ok_or(StatusCode::FORBIDDEN)?;
    if !["https://app.example.com", "https://admin.example.com"].contains(&origin) {
        return Err(StatusCode::FORBIDDEN);
    }
    Ok(ws.on_upgrade(|socket| async move { /* ... */ }))
}

注意 WebSocketUpgradefrom_request_parts 不校验 Origin——它只校验 Upgrade/Connection/Sec-WebSocket-* 这些协议必需头。业务级的 Origin / 认证令牌 / Cookie 合法性全部需要 handler 自己写。这是框架"刻意不越权"的体现——安全策略因业务而异,框架默认放开、让应用侧决定。

同理,WebSocket 的认证也不能走浏览器自动 Cookie——最佳实践是客户端在升级 URL 里带 token (wss://host/ws?token=xxx),或者走 Sec-WebSocket-Protocol 头塞 token。前者简单但 token 会出现在日志里;后者更规范但客户端代码需要支持设置 protocol。哪种都是业务选择,Axum 不替你决定。

Message 类型与帧分片

tokio-tungstenite 定义的 Message enum(Axum 重新导出)有几种 variant:

Variant用途帧类型
Text(Utf8Bytes)UTF-8 文本data frame, opcode=0x1
Binary(Bytes)任意字节data frame, opcode=0x2
Ping(Bytes)心跳 pingcontrol frame, opcode=0x9
Pong(Bytes)心跳响应control frame, opcode=0xA
Close(Option<CloseFrame>)关闭连接control frame, opcode=0x8

Text 的 payload 是 Utf8Bytes——ws.rs:621-640 定义的一个类型,它是 Bytes + UTF-8 保证。为什么不直接用 String?因为 String 必须是堆上 Vec<u8>——每次从网络收到文本帧都要拷贝到新的 String 分配。Utf8Bytes 内部是 Bytes,支持零拷贝共享(Bytes 是共享的引用计数字节缓冲)。在高频 WebSocket 消息流下这个差异会显著——几千消息每秒级别时,节省的分配能让 p99 从毫秒降到微秒。

WebSocket 帧在协议层有"分片"(fragmentation)——大消息可以拆成多个 Continuation 帧。tokio-tungstenite 默认把分片帧在内部合并成一个完整的 Message,用户只看到完整消息。你可以配置 WebSocketConfig 关闭自动合并、或者设置最大帧大小——ws.rs:156-300WebSocketUpgrade::max_frame_size / max_message_size / accept_unmasked_frames 等方法调整。默认配置已经合适大多数场景。

Ping/Pong 的用途不是应用层心跳——是 WebSocket 协议层的 keep-alive。RFC 6455 规定:对方的 Ping 必须被回一个 Pong(payload 相同)。tokio-tungstenite 自动处理 Ping→Pong 的回复,用户代码通常不需要关心——除非你想做应用层心跳逻辑(比如超时检测 "30 秒没收到任何消息就断"),那时可以主动调 socket.send(Message::Ping(...)) 并观察是否收到 Pong。

WebSocket 典型 handler 模式

把三件事拼起来——升级、发消息、收消息。一个最小 echo 服务:

rust
use axum::extract::WebSocketUpgrade;
use axum::extract::ws::Message;
use axum::response::Response;

async fn echo(ws: WebSocketUpgrade) -> Response {
    ws.on_upgrade(|mut socket| async move {
        while let Some(Ok(msg)) = socket.recv().await {
            match msg {
                Message::Text(t) => {
                    let _ = socket.send(Message::Text(t)).await;
                }
                Message::Binary(b) => {
                    let _ = socket.send(Message::Binary(b)).await;
                }
                Message::Close(_) => break,
                _ => {}  // 忽略 Ping/Pong(自动处理)
            }
        }
    })
}

广播场景:多客户端订阅、有人发消息时全员收到。用 tokio::sync::broadcast 配合:

rust
use tokio::sync::broadcast;
use futures::{SinkExt, StreamExt};

#[derive(Clone)]
struct AppState {
    tx: broadcast::Sender<String>,
}

async fn chat(ws: WebSocketUpgrade, State(state): State<AppState>) -> Response {
    ws.on_upgrade(|socket| async move {
        let (mut sink, mut stream) = socket.split();  // 分成 sink / stream
        let mut rx = state.tx.subscribe();

        // 下行:从广播 channel 往 client 发
        let send_task = tokio::spawn(async move {
            while let Ok(msg) = rx.recv().await {
                if sink.send(Message::Text(msg.into())).await.is_err() { break; }
            }
        });

        // 上行:从 client 收到消息广播出去
        let recv_task = tokio::spawn({
            let tx = state.tx.clone();
            async move {
                while let Some(Ok(Message::Text(t))) = stream.next().await {
                    let _ = tx.send(t.to_string());
                }
            }
        });

        tokio::select! {
            _ = send_task => (),
            _ = recv_task => (),
        }
    })
}

关键点:

  • socket.split() 分 sink / streamfutures::StreamExt::split 要求 Stream + Sink 双实现——Axum 的 WebSocket 正好满足。拆开后两个独立任务一个收一个发,互不阻塞
  • tokio::select!:任一边断开时整个连接终止——客户端关闭时 recv_task 先返回,select! 立即触发,send_task 被 drop(底层 future 取消)
  • broadcast::Sender 的多生产多消费:任何客户端收到的消息 tx.send 会进入所有 subscriber——包括自己(所以上面的代码自己会收到自己发的消息;生产中可能想过滤)

这种"spawn 两个方向任务 + select 合并"的模式是 axum WebSocket handler 的范本。第 15 章讲 Serve 的优雅关闭时还会看到它如何和 shutdown 信号配合。

消息格式选择:JSON、MessagePack 还是自定义二进制

WebSocket 只传递字节帧(TextBinary)——帧内内容的序列化格式是应用自己的决策。三种主流选择:

JSON:最简单、最兼容——浏览器里 JSON.stringify / JSON.parse 无依赖;调试方便,抓包就能看内容。但 UTF-8 字符串、字段名重复、数字转字符串等都增加开销。适合低频消息、高调试需求、异构客户端场景

MessagePack / CBOR:二进制 serde 格式——比 JSON 小 30-50%、快 2-3×。但需要客户端支持(浏览器里得用 msgpack-litecbor-x 这类库)。适合消息频繁、带宽敏感但仍需 schema 灵活的场景

自定义二进制bincode、手写 struct-pack):最紧凑、最快,但是 schema 硬编码——版本演进复杂。适合内部服务间 WebSocket、固定协议、需要极致性能的场景

Axum 不为任何一种格式做特殊支持——Message::Text(...) / Message::Binary(...) 里装什么字节是应用自己决定。但生产实践里建议:定义一个顶层 ClientMessage / ServerMessage enum,带 tag 字段(serde#[serde(tag = "type")]),所有通信都走这个 enum 的序列化结果。这让协议演进可审计、类型化错误可检查,远优于散着 match json["type"] 的 handler。

WebSocket 错误处理的几条工程直觉

三个常见错误场景和处理建议:

客户端异常断开:网络掉线、客户端崩溃、socket.recv().await 返回 Some(Err(_))None。代码应该优雅退出——不要 unwrap、不要 panic、不要 log "unexpected close"(这是常态不是异常)。在广播场景下,广播 sender 的 drop 会自动触发所有 subscriber 收到 RecvError::Closed,清理流自然发生

慢消费者:某个客户端的上行 sink 在某次 .send() 时超时或失败——可能是 TCP buffer 满、客户端处理慢、网络拥塞。默认行为是卡住——send 会一直 await。生产里建议用 tokio::time::timeout 包裹 send:timeout(Duration::from_secs(30), sink.send(msg)),超时就断开这个连接,否则慢消费者会积压内存

协议违规:客户端发来非法 WebSocket 帧(不合法的 opcode、超大 frame、错误的 mask)——tokio-tungstenite 会返回 Error::Protocol。通常的响应是以合适的 close code 关闭(1002 Protocol error),然后 drop socket

这三类错误都不是 axum 层面的——axum 只在 WebSocketUpgrade 阶段做头校验,升级后的帧层错误全部通过 Message / Error 暴露给 handler。这保持了 axum 自己的代码简单——升级后的问题域移交给 tokio-tungstenite。

Multipart:流式解析的设计考虑

multipart/form-data 是 HTML form 上传文件时用的格式——RFC 7578 定义。请求体被 --<boundary> 分隔成多个字段,每个字段有自己的 Content-Disposition 和可选的 Content-Type

http
POST /upload HTTP/1.1
Content-Type: multipart/form-data; boundary=---XYZ

-----XYZ
Content-Disposition: form-data; name="title"

My Photo
-----XYZ
Content-Disposition: form-data; name="photo"; filename="cat.jpg"
Content-Type: image/jpeg

<二进制字节>
-----XYZ--

文件字段可能很大——几十 MB、几百 MB 都常见。把整个 body 缓冲到 Bytes 再解析是灾难,必须流式处理——边读网络字节边切字段、字段本身可以继续流式消费。这决定了 Multipart 的 API 形态和 Json 完全不同。

from_request:把 body 转 stream 包给 multer

axum/src/extract/multipart.rs:68-82

rust
// axum/src/extract/multipart.rs:68-82
impl<S> FromRequest<S> for Multipart
where S: Send + Sync,
{
    type Rejection = MultipartRejection;

    async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
        let boundary = content_type_str(req.headers())
            .and_then(|content_type| multer::parse_boundary(content_type).ok())
            .ok_or(InvalidBoundary)?;
        let stream = req.with_limited_body().into_body();
        let multipart = multer::Multipart::new(stream.into_data_stream(), boundary);
        Ok(Self { inner: multipart })
    }
}

三步:

1. 从 Content-Type 头解析 boundarymultipart/form-data; boundary=---XYZ 这样的头,multer::parse_boundary 从里面抠出 ---XYZ 字符串——用 multer crate 的解析器,axum 不重写

2. req.with_limited_body().into_body():把 body 套一层 body limit(默认 2MB,第 6 章讨论过的 DefaultBodyLimit)再拆出来。注意这里把 body 缓冲到内存——into_body() 返回的是仍在流式状态的 body

3. multer::Multipart::new(stream.into_data_stream(), boundary)multer 拿到 body stream 和 boundary,构造一个 Multipart 对象。内部不 eagerly 读任何字节——解析是按需进行的,当用户调 next_field() 时才往前推进

Field 的生命周期强制约束

multipart.rs:106-124next_field

rust
// axum/src/extract/multipart.rs:106-124
impl Multipart {
    pub async fn next_field(&mut self) -> Result<Option<Field<'_>>, MultipartError> {
        let field = self.inner.next_field().await.map_err(MultipartError::from_multer)?;
        if let Some(field) = field {
            Ok(Some(Field {
                inner: field,
                _multipart: self,
            }))
        } else {
            Ok(None)
        }
    }
}

pub struct Field<'a> {
    inner: multer::Field<'static>,
    // multer 运行时要求同时最多只有一个活的 Field, Axum 把这个要求编译期化
    _multipart: &'a mut Multipart,
}

Field<'a> 有一个 _multipart: &'a mut Multipart 字段——它不被读取,只用来借用外层 Multipart。这让类型系统强制:"同一时刻最多只有一个活的 Field"。想拿下一个 field,上一个必须先 drop——next_field 的签名 &mut self 要求独占访问,而第一个 Field 已经持有了 &mut self 借用,直到它 drop。

这个设计来自 multer crate 的底层约束——multer::Multipart 内部维护一个共享的字节流位置指针,两个 Field 同时活着会让指针解释冲突、产生错乱的字节切片。multer crate 本身用运行时检查(Err(Error::FieldBorrowed))来防止;Axum 用生命周期把这个检查提前到编译期,从此不可能出 runtime 错误。这是一种典型的"把运行时约束编译化"的工程模式——和第 6 章讲 body 消费单一性的路子一样。

Field:自身也是 Stream

multipart.rs:135-143

rust
impl Stream for Field<'_> {
    type Item = Result<Bytes, MultipartError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.inner).poll_next(cx).map_err(MultipartError::from_multer)
    }
}

Field 实现了 Stream<Item = Result<Bytes, _>>——可以流式消费单个字段的内容。一个 1 GB 的上传文件不需要缓冲,而是用 while let Some(chunk) = field.next().await { write_to_disk(chunk).await; } 按块处理。

对于小字段(文本),Field::bytes() 方法(对应 TryStreamExt::try_concat)会把所有块累积成一个 Bytes

rust
async fn upload(mut mp: Multipart) {
    while let Some(mut field) = mp.next_field().await.unwrap() {
        let name = field.name().unwrap_or("(未命名)").to_string();
        if let Some(_filename) = field.file_name() {
            // 有 filename → 是文件字段,流式写盘
            while let Some(chunk) = field.next().await {
                let chunk = chunk.unwrap();
                /* 写入存储 */
            }
        } else {
            // 没 filename → 是文本字段,累积读
            let data = field.bytes().await.unwrap();
            println!("{} = {:?}", name, data);
        }
    }
}

两种消费模式对应两种字段:文件字段流式、文本字段累积。选错了会出问题——把 1 GB 文件 .bytes() 掉会 OOM 或者被 body limit 拦截;把短文本按 stream 轮询则额外开销大。

multer 的工作方式:状态机驱动的流式解析

multer crate 是 multipart 解析的核心——Axum 完全委托它做真正的字节流处理。了解一下它的工作机制有助于把握 Multipart 的真实开销。

multer 内部维护一个简单的状态机:

关键挑战是"如何检测 boundary":body 是字节流,boundary 可能跨 chunk 边界出现。比如一个 chunk 结尾是 ...data\r\n--boun,下一个 chunk 开头是 dary——单独看任一 chunk 都认不出 boundary,必须前后拼起来。multer 的做法是缓冲"最后 (boundary.len()+2) 字节"作为 lookbehind,下次 poll 时加上新 chunk 再 scan。这种 lookbehind 是所有流式 multipart 解析器的标配技术。

boundary 冲突处理:如果上传文件的字节流里恰好包含和 boundary 相同的字节序列,解析会错乱。为了避免这个问题,HTTP/multipart 协议要求 boundary 足够随机(通常是 30+ 字节的随机字符串,客户端生成)。客户端保证了 boundary 不会出现在任何字段内容里——multer 只需按 boundary 字面扫,不用做复杂的冲突检测。生成不够随机的 boundary(比如 ----WebKitFormBoundary)是客户端 bug,multer 不负责兜底。

空文件/空字段:有些客户端上传"空文件"(用户没选文件就点提交),生成的 multipart 里会有一个 Content-Disposition 包含 filename 但 data 部分是空字节流。multer 正确处理——field.next().await 立即返回 None,不会卡住。handler 代码要判断 field.file_name().is_some() && field.has_data() 决定业务行为(通常空文件字段应当被忽略或报错)。

为什么不用 serde

Json/Query/Form 都走 serde,Multipart 不走。原因有三:

一、字段顺序不保证:multipart 的 field 可以任意顺序发送;结构体的字段有固定顺序。serde 的 Deserializer::deserialize_struct 假定字段按某个可预测的方式呈现(或者都在一个 map 里),multipart 的"流式出现"语义很难塞进这个模型——要么强制把全部 fields 缓冲到 map 再 serde,失去流式优势;要么 serde 支持"按名字乱序 visit",但这是大改标准协议

二、字段内容可能是大二进制:serde 的 visitor 要求类型可反序列化,但文件字段的"正确类型"是什么?Vec<u8>&[u8]io::Read?都不理想——前两个必须整体缓冲,后者不是 serde 支持的类型。Multipart 直接暴露 Field: Stream<Item = Bytes>,用户按自己需要决定怎么消费

三、typing 复杂度不匹配:文本字段和文件字段有质性差异,塞进同一个结构体类型容易设计错。直接暴露 while let Some(field) 循环让用户看到"流式顺序",显式区分文件和文本,反而更清晰

这是"不是所有提取都适合 serde"的典型场景。Axum 的提取器框架不强制 serde——trait 只关心"能从 Request 产出 Self",怎么解析由实现者自定。Multipart 是这种灵活性的充分利用。

Multipart 实战:文件上传 + 安全考虑

生产里写文件上传 handler 要兼顾几件事:

rust
use axum::extract::{Multipart, DefaultBodyLimit};
use tokio::fs::File;
use tokio::io::AsyncWriteExt;

async fn upload(mut mp: Multipart) -> Result<Json<UploadResult>, AppError> {
    let mut title = None;
    let mut file_path = None;

    while let Some(mut field) = mp.next_field().await? {
        match field.name() {
            Some("title") => {
                title = Some(field.text().await?);  // 短文本, 累积
            }
            Some("photo") => {
                let fname = sanitize_filename(field.file_name().ok_or(AppError::NoFilename)?);
                let path = format!("/var/uploads/{fname}");
                let mut f = File::create(&path).await?;
                let mut total = 0usize;
                while let Some(chunk) = field.next().await {
                    let chunk = chunk?;
                    total += chunk.len();
                    if total > 50 * 1024 * 1024 {  // 单字段 50MB 上限
                        return Err(AppError::TooLarge);
                    }
                    f.write_all(&chunk).await?;
                }
                file_path = Some(path);
            }
            _ => {}  // 忽略未知字段
        }
    }
    Ok(Json(UploadResult { title, file_path }))
}

// 路由
Router::new()
    .route("/upload", post(upload))
    .layer(DefaultBodyLimit::max(100 * 1024 * 1024));  // 总 body 100MB

几点:

  1. sanitize_filename:客户端给的 filename 可能是 ../../../etc/passwdfoo\0.jpg——必须过滤。推荐用 sanitize-filename crate,或者直接 reject 含 /\\0.. 的 filename
  2. DefaultBodyLimit::max:总 body 限制。Axum 默认 2MB 对文件上传远远不够,必须显式放大。但不要盲目设超大——先算清楚"最差情况每个用户能上传多大"
  3. 单字段大小校验:循环里累积统计 total,超限主动返回错误。DefaultBodyLimit 是 body 整体限制,不限单字段——文件上传场景通常还想限制"单个文件最大 50MB"
  4. content-type 白名单field.content_type() 返回客户端声明的 MIME type。如果应用只接受图片,应该校验 image/jpeg / image/png 等。注意这只是客户端声明——真实类型还要二次检查(比如读 magic bytes 确认是 JPEG)
  5. 空间管理File::create 后如果后续出错,文件留在磁盘上垃圾。drop guard 模式或 ? 后 cleanup——这是写流式上传的老坑

Multipart 不自动防范任何一项——它只暴露原始字段流。安全责任完全在用户。和 Json 反序列化自动校验类型相比,Multipart handler 要写更多防御代码。

ConnectInfo:来自底层连接的信息

客户端的 IP 地址不在 HTTP 请求里——它来自 TCP 层(TcpStream::peer_addr)。但 handler 可能想知道,比如限流、地理位置识别、访问日志。Axum 的 ConnectInfo<T> 解决这个问题,代价是改变 Router 的暴露方式。

数据流:into_make_service_with_connect_info

普通 Axum 应用是 axum::serve(listener, app)axum::serve(listener, app.into_make_service())。想要 ConnectInfo 时,换成 into_make_service_with_connect_info::<SocketAddr>()

rust
let app = Router::new().route("/", get(handler));
axum::serve(
    listener,
    app.into_make_service_with_connect_info::<SocketAddr>(),
).await;

async fn handler(ConnectInfo(addr): ConnectInfo<SocketAddr>) {
    println!("connected from {addr}");
}

底层工作流在 axum/src/extract/connect_info.rs:99-118Service impl:

rust
// axum/src/extract/connect_info.rs:99-118
impl<S, C, T> Service<T> for IntoMakeServiceWithConnectInfo<S, C>
where
    S: Clone,
    C: Connected<T>,
{
    type Response = AddExtension<S, ConnectInfo<C>>;
    type Error = Infallible;
    // ...
    fn call(&mut self, target: T) -> Self::Future {
        let connect_info = ConnectInfo(C::connect_info(target));
        let svc = Extension(connect_info).layer(self.svc.clone());
        ResponseFuture::new(ready(Ok(svc)))
    }
}

IntoMakeServiceWithConnectInfo 是一个 MakeService——它的 call 接收一个连接 target(比如 IncomingStream),返回一个针对该连接的 Service。在 call 里:

  1. C::connect_info(target):从 target 里提取连接信息(通过 Connected<T> trait),包装成 ConnectInfo<C>
  2. Extension(connect_info).layer(self.svc.clone()):给底层 service 加一个 AddExtension layer,这个 layer 在每次请求处理前把 ConnectInfo<C> 塞进 req.extensions
  3. 返回这个包装后的 service:对该连接所有请求统一用它

关键:每个连接一个 Service 实例——MakeService::call 在每次新连接建立时被调用。所以 ConnectInfo<SocketAddr> 是"按连接注入"的,不是按请求。同一个连接的后续请求(HTTP/1.1 keep-alive 或 HTTP/2 多路复用)共享同一个 ConnectInfo

Connected<T> trait:适配不同的监听器

connect_info.rs:74-77Connected trait:

rust
pub trait Connected<T>: Clone + Send + Sync + 'static {
    fn connect_info(stream: T) -> Self;
}

T 是 stream 类型(比如 IncomingStream<'_, L>SocketAddr)。connect_info 返回 Self——要放进 extensions 的连接信息。connect_info.rs:83-97SocketAddr 实现了两种 Connected

rust
// 从 IncomingStream 拿远端地址
impl<L> Connected<serve::IncomingStream<'_, L>> for SocketAddr
where L: serve::Listener<Addr = Self>,
{
    fn connect_info(stream: serve::IncomingStream<'_, L>) -> Self {
        *stream.remote_addr()
    }
}

// 恒等变换,方便手动注入
impl Connected<Self> for SocketAddr {
    fn connect_info(remote_addr: Self) -> Self { remote_addr }
}

这个设计允许用户用自定义的连接类型——如果你有一个自定义 Listener 暴露出非 SocketAddr 的 peer info(Unix domain socket 的 PID / UID、TLS 握手的 SNI / cert 等),只要给自己的类型 impl Connected,就能走同一套 ConnectInfo 抽象。

FromRequestParts:从 extensions 取回

connect_info.rs:138-154

rust
impl<S, T> FromRequestParts<S> for ConnectInfo<T>
where S: Send + Sync, T: Clone + Send + Sync + 'static,
{
    type Rejection = <Extension<Self> as FromRequestParts<S>>::Rejection;

    async fn from_request_parts(parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
        match Extension::<Self>::from_request_parts(parts, state).await {
            Ok(Extension(connect_info)) => Ok(connect_info),
            Err(err) => match parts.extensions.get::<MockConnectInfo<T>>() {
                Some(MockConnectInfo(connect_info)) => Ok(Self(connect_info.clone())),
                None => Err(err),
            },
        }
    }
}

两步回退:

  • 首选Extension<Self>::from_request_parts 从 extensions 里取出之前 AddExtension 放进去的 ConnectInfo
  • 备用:如果没找到,检查是否有 MockConnectInfo<T>(测试用 Layer),有就用它

MockConnectInfo 是为测试设计的——单元测试里没有真实 TCP 连接,但 handler 可能依赖 ConnectInfoapp.layer(MockConnectInfo(addr)) 在 Router 级别注入一个假的——connect_info.rs:158-196 文档里有完整示例。这种"生产路径 + 测试路径"双支持是 axum 对可测试性的有意投入。

自定义 Connected<T>:Unix socket 认证 / TLS SNI

Connected<T> 的真正价值在扩展。示例:Unix domain socket 下想获得对端进程的 pid / uid(UCred):

rust
use tokio::net::UnixStream;
use std::os::unix::net::UCred;

#[derive(Clone)]
struct UnixConnectInfo {
    pid: Option<i32>,
    uid: u32,
}

// 自定义 Listener 实现:略,要求 IncomingStream::remote_addr 返回 UnixStream
impl<L> Connected<serve::IncomingStream<'_, L>> for UnixConnectInfo
where
    L: serve::Listener<Addr = UnixStream>,  // 简化声明
{
    fn connect_info(stream: serve::IncomingStream<'_, L>) -> Self {
        let peer_cred = stream.remote_addr().peer_cred().ok();
        Self {
            pid: peer_cred.and_then(|c| c.pid()),
            uid: peer_cred.map(|c| c.uid()).unwrap_or(0),
        }
    }
}

// handler 端
async fn who(ConnectInfo(info): ConnectInfo<UnixConnectInfo>) -> String {
    format!("uid={}, pid={:?}", info.uid, info.pid)
}

类似地,TLS server 可以把握手阶段拿到的 client certificate 或 SNI 塞进自定义 ConnectInfo<TlsInfo>。Axum 自身不内置 TLS——需要 axum-serverrustls 配合——但 Connected<T> 的抽象让 TLS 扩展crate能以一致的方式暴露 TLS 元信息给 handler,用户不需要学第二套 API。

这是"依赖倒置"的漂亮例子:Connected<T> 不关心 T 是什么,只要求 T 能从 listener 提供的 stream 产出自己。具体的 T 类型由用户或扩展 crate 定义——axum 的 SocketAddr impl 只是最常用的默认。

反向代理场景:X-Forwarded-For 与真实 IP

生产部署里客户端和 axum server 之间通常隔着反向代理(Nginx、ALB、CloudFlare)。这时 ConnectInfo<SocketAddr> 拿到的是代理的 IP,不是真实客户端 IP。客户端真实 IP 在代理转发请求时加进 HTTP 头里——常见的是 X-Forwarded-For(一串 IP,逗号分隔)或 Forwarded(RFC 7239 标准头)。

axum 本身不内置解析——你需要自己从 headers 读:

rust
use axum::http::HeaderMap;
use std::net::IpAddr;

fn extract_client_ip(headers: &HeaderMap, connect_info: SocketAddr) -> IpAddr {
    // 优先用 X-Forwarded-For 的第一个(最靠近客户端的)
    if let Some(xff) = headers.get("x-forwarded-for") {
        if let Ok(xff) = xff.to_str() {
            if let Some(first) = xff.split(',').next() {
                if let Ok(ip) = first.trim().parse::<IpAddr>() {
                    return ip;
                }
            }
        }
    }
    connect_info.ip()  // fallback 到 TCP 层
}

安全警告在可信代理后面用 X-Forwarded-For——任何人都可以伪造这个头。如果你的 server 直接暴露在 Internet 上,读 X-Forwarded-For 等于让客户端自称 IP,任何限流和日志都失效。正确做法:

  • 在可信代理后面:解析 X-Forwarded-For,信任代理加的那部分
  • 直接暴露:只用 ConnectInfo<SocketAddr>,无视 X-Forwarded-For
  • 混合环境(部分流量有代理部分没有):代码上显式配置"这个 Router 的请求全部来自代理" / "那个 Router 不经过代理"

把这个逻辑做成一个自定义提取器 ClientIp 能让 handler 代码更干净——axum-client-ip crate 就是这样的第三方实现。

性能开销:三种提取器的量级对比

提取器提取阶段开销运行期开销主要成本来源
WebSocketUpgrade~1 µs几乎零头比较 + SHA1 签名;真正通信成本在后续帧流处理
Multipart~1 µs取决于字段数 + 字节量boundary 解析、stream 迭代开销
ConnectInfo<T>< 100 ns一次 extensions HashMap 查找 + T::clone()

三个开销几乎都微不足道——WebSocket 的 SHA1 签名是对 24 字节的计算,现代 CPU 纳秒级;Multipart 的 boundary 扫描用 memmem-style 搜索也很快。真实瓶颈都在协议层面而不是提取器本身:

  • WebSocket:单连接的每帧 RTT、消息的 serde 成本(如果你在 handler 里 serde_json 每个消息)、tokio 任务调度
  • Multipart:网络上行带宽、磁盘写入速度——如果 handler 要把文件落盘,这才是瓶颈
  • ConnectInfo:完全可以忽略——它就是一次 HashMap 读 + 一次 Arc clone 级别

这三个提取器的框架开销远低于它们对应业务的复杂度。优化方向永远是"对应业务怎么优化"——WebSocket 是消息协议设计(比如二进制 vs JSON),Multipart 是存储写入策略(流式写文件而非先缓冲),ConnectInfo 一般不需要优化。

与 Tokio / Hyper 的连接生命周期协作

三个提取器都对 HTTP/Tokio/hyper 的底层有不同程度的依赖,梳理一下这些依赖关系能帮助理解 axum 架构的分层。

WebSocket:依赖 hyper 的 hyper::upgrade::OnUpgrade 机制——第 5 章讲过 hyper 在识别出 Upgrade 头时会往 extensions 放 OnUpgrade future。Axum 从 extensions 取出它,在响应发出后 await 得到 Upgraded TCP stream,交给 tokio-tungstenite 封装成 WebSocket。整条链上 axum 只是"粘合剂"——hyper 提供协议级升级、tokio-tungstenite 提供 WebSocket 帧编解码、tokio 提供异步运行时。这是《Hyper 与 Tower:工业级 HTTP 栈》第 12 章讨论的"hyper 如何处理协议升级"的具体应用场景。

Multipart:依赖 hyper 的 Body trait 作为数据源——多亏 Body::poll_frame 是流式的,multer 可以按需拉取字节,不必等全部 body 到齐。这是《Hyper 与 Tower》第 10 章讨论 http-body crate 的 poll-based body 模型的直接收益——如果 body 是"整体返回"的模型,multipart 的流式优势无从谈起。

ConnectInfo:依赖 tokio 的 TCP listener 能 peer_addr()——这是 tokio::net::TcpStream 暴露的基本能力。IntoMakeServiceWithConnectInfo 在每次 accept 时调 peer_addr,塞进每条连接的 extension 层。这是《Tokio 源码深度解析》第 8 章讨论 TcpListener 的一个上层应用。

三个提取器各自踩着不同的底层抽象:协议层(hyper upgrade)、数据流层(http-body)、连接层(tokio TcpListener)。axum 的提取器框架能把这三种差别极大的机制都装进同一个 trait,是因为 trait 本身是最一般化的契约——只要求"能从 Parts/Request 产出 Self",至于数据从哪儿来、是不是流、是不是独立任务,全交给实现自行决定。

协议性质对比:WebSocket vs SSE vs long polling

实时双向通信有三种主流模式,选型会影响 handler 结构:

模式协议axum 支持特点
WebSocketRFC 6455 / RFC 8441WebSocketUpgrade全双工、二进制或文本、服务端可主动推送、升级后脱离 HTTP
SSE (Server-Sent Events)EventStreamaxum::response::sse::Sse服务端→客户端单向推送、纯文本、走 HTTP 长连接、浏览器原生支持
Long polling普通 HTTP普通 handler客户端轮询、服务端挂起直到有数据或超时、简单但开销大

选型原则:

  • 需要全双工 + 高频消息(游戏、实时协作编辑)→ WebSocket
  • 只有服务端→客户端推送(通知、实时日志、股票行情)→ SSE(第 10 章会讲)
  • 极简场景或无法 WebSocket 的环境(某些企业代理过滤 WebSocket)→ long polling

WebSocket 和 SSE 在 axum 里实现形态截然不同:WebSocket 是提取器WebSocketUpgrade 接到 handler 后接管连接),SSE 是响应(handler 返回一个 Sse 对象,axum 把它编码成 text/event-stream)。这反映了两者的协议本质差异——WebSocket 是"HTTP 握手 + 独立协议",SSE 是"HTTP 响应 + 长流式 body"。axum 对两种形态的 API 设计精确反映了底层协议差异,没有硬塞进同一个抽象里。

小结:提取器框架的包容性

三个提取器呈现的机制看起来差异巨大:

提取器消费 body数据来源关键机制
WebSocketUpgradeheaders + extensions::<OnUpgrade>协议升级握手 + tokio::spawn
Multipart是(流式)body 被转为 byte stream流式解析 + Field 生命周期约束
ConnectInfo<T>extensions(由 MakeService 注入)连接级注入 + Connected<T> trait

但它们都没有突破 FromRequest / FromRequestParts 的接口。WebSocket 走 FromRequestParts(只读 parts),Multipart 走 FromRequest(消费 body),ConnectInfo 走 FromRequestParts(只读 extensions)。Rejection 都实现 IntoResponse,都能和其他提取器共存。这证明了提取器框架的一条重要性质:trait 本身不做假设,它只定义"从 Request / &mut Parts 能产出 Self"的最一般契约。具体实现可以走 serde、可以走流式解析、可以依赖外部注入的 extensions——框架无所谓。

从设计角度看,这三个提取器展示了提取器框架三种不同的扩展方式:

一、WebSocket 是"HTTP 协议级扩展"的提取。它依赖 hyper 提前往 extensions 塞了 OnUpgrade,而这个 OnUpgrade 是 HTTP 协议层的概念(HTTP/1.1 升级、HTTP/2 扩展 CONNECT)。Axum 把这层协议能力封装成"头校验提取器 + 延后的 on_upgrade 方法",保持 handler 签名干净。类似的扩展还有 Server-Sent Events——本章没细讲,但它的机制也是类似的:"handler 返回一个特殊类型的 Response,Axum 在响应层把 EventStream 变成流式 body"

二、Multipart 是"body 协议级扩展"。它不按常规反序列化模式解析 body,而是暴露底层 stream。这种"让用户拿到 stream 自己处理"的设计不是偷懒——是对文件上传这种场景的诚实回应。axum 完全可以做一个 TypedMultipart<T> 把字段按 serde 反序列化到结构体(axum-extra 里就有这样的第三方提取器),但基础 Multipart 保持 stream 接口,把选择权给用户

三、ConnectInfo 是"运行时上下文级扩展"。它的数据根本不在 HTTP 请求里,而是来自运行时(tokio 的 TCP accept)。Axum 通过"MakeService 在连接建立时注入 extensions"的机制,把非请求信息塞进提取器框架。这种模式可以推广到任何"每个连接有独特上下文"的场景——TLS 证书、Unix socket credentials、自定义 metadata

这三种扩展方式加上第 7 章讨论的标准 serde 提取器,几乎覆盖了所有"我想在 handler 里拿到 X"的合理需求。

三个提取器的工程反直觉点

本章讨论的三个提取器各自有一个"初看反直觉、想明白后觉得必然"的设计点,放在一起对比能看到 Axum 的一致思路。

WebSocket 的反直觉点from_request_parts 并不真正升级连接,它只做头校验和 OnUpgrade future 的转移——真正的协议升级和 WebSocket 帧处理都延迟到 on_upgrade 方法里 tokio::spawn 的独立任务。这违反了"提取器提取完所有参数、handler 就能直接使用"的朴素直觉。但这是唯一合理的设计——响应必须在 handler 返回 Response 时同步发出,升级后的流式交互必须异步独立任务,两者不能塞进同一个 future

Multipart 的反直觉点Field<'a> 有生命周期参数。朴素想法是"为什么 Field 不是 owned 类型?我想把它存到 Vec 里顺序处理"——你不能。一次只能有一个活的 Field,这是类型系统强制的。但这是唯一合理的设计——multipart 的字节流是一个共享游标,两个 Field 同时活着必然读取冲突

ConnectInfo 的反直觉点ConnectInfo<T> 作为 handler 参数使用时,必须换 serve 入口到 into_make_service_with_connect_info::<T>()。普通 app.into_make_service()ConnectInfo<SocketAddr> 在 handler 里会提取失败返回 500。这违反了"提取器是插拔的"的朴素期待。但这是唯一合理的设计——ConnectInfo 的数据来自连接建立阶段,必须在 MakeService 层注入,Router 内部看不到那一层

三个反直觉点共享同一个底层原因:Rust 类型系统把"物理约束"和"运行时不变量"强制表达出来。WebSocket 升级必须分两段(响应 + 流处理),Multipart 字节流必须独占使用,ConnectInfo 必须在 MakeService 层注入——这些都不是 Axum 设计者的偏好,而是底层协议 / 数据结构 / 运行时架构的真实约束。Axum 不隐藏这些约束,而是让用户在 API 层面看到它们。代价是初学时要多想一步,收益是一旦理解就不会再踩坑。

下一章开始我们从提取器(输入侧)转向响应(输出侧)——IntoResponse trait 是 handler 返回类型的统一抽象,它的设计同样经过深思。

基于 VitePress 构建