Hyper 与 Tower:工业级 HTTP 栈

第12章 Connection Dispatcher 与 Role:请求分发状态机

作者 杨艺韬 · 12,556 字

第12章 Connection Dispatcher 与 Role:请求分发状态机

12.1 字节之上的编排

上一章我们读完了 decode.rs / encode.rs / role.rs——Hyper 如何把一个请求的字节变成 http::Request、又把一个 Response 变回字节。但这只是 “单条消息” 的处理。

真实的 HTTP/1.1 连接不只是一条消息。一个连接上:

  • 客户端可能发多个请求(keep-alive)。
  • 服务端正处理上一个请求时,客户端可能已经发来下一个请求(pipelining——虽然 RFC 允许但实际很少开启)。
  • 响应 body 可能在请求读完之前就开始写。
  • 发生 Upgrade 时,整条连接要切换到另一个协议(WebSocket、HTTP/2)。

把所有这些协议事件编排成一个可以被 tokio::spawn(connection.await) 驱动的 future——这就是 conn.rsdispatch.rs 的职责。整整 2339 行代码,是整个 HTTP/1 协议栈的指挥中心

这一章我们把这个指挥中心拆开——Conn<I, B, T> 的状态机Dispatcher<D, Bs, I, T> 的 poll loop、它们如何把 Decoder / Encoder / user Service / body channel 编排成一个可正确取消、可 keep-alive、可 upgrade 的完整连接。

源码锁定:hyper 1.9.0 / hyper/src/proto/h1/conn.rs (1531 行) / hyper/src/proto/h1/dispatch.rs (808 行)。

12.2 Conn<I, B, T>:一条连接的所有状态

// hyper/src/proto/h1/conn.rs:38-42
pub(crate) struct Conn<I, B, T> {
    io: Buffered<I, EncodedBuf<B>>,
    state: State,
    _marker: PhantomData<fn(T)>,
}

三个字段:

  • io:buffered IO,把原始 IO(TCP stream)包上一层读写缓冲。下一章专题讲。
  • state:连接的所有状态——下面详述。
  • _markerPhantomData<fn(T)>——TServerClient 的 tag,编译期分支用。

12.2.1 三套状态:Reading / Writing / KeepAlive

State struct 里有几十个字段(源码 930-961 行),但最核心的是三个 enum:

// hyper/src/proto/h1/conn.rs:963-977, 1022-1028
enum Reading {
    Init,
    Continue(Decoder),
    Body(Decoder),
    KeepAlive,
    Closed,
}

enum Writing {
    Init,
    Body(Encoder),
    KeepAlive,
    Closed,
}

enum KA {  // keep-alive status
    Idle,
    #[default]
    Busy,
    Disabled,
}

三个 enum 是正交的——连接同时有一个”当前在读什么”的状态 + “当前在写什么”的状态 + “还打算复用吗”的状态。三者组合起来决定连接下一步该做什么。

12.2.2 Reading 状态意义

  • Init:还没收到 request header——等待客户端发第一个字节。
  • Continue(Decoder):收到了带 Expect: 100-continue 的请求头——发 100 Continue 响应,等 body 开始。
  • Body(Decoder):正在流式读 body——Decoder 按之前决定的长度语义(Length / Chunked / Eof)切帧。
  • KeepAlive:body 读完了——如果连接允许复用,回到 Init 状态等下一个请求;否则转 Closed。
  • Closed:不能再读了——要么收到了 FIN、要么协议错误、要么用户说别读了。

12.2.3 Writing 状态意义

  • Init:没有响应正在写——等待 user 产出 response。
  • Body(Encoder):正在写 response body——Encoder 负责 chunk 格式化 / length enforcement。
  • KeepAlive:响应写完了——等对应的 request 也读完就进入 Idle 阶段。
  • Closed:不能再写了。

12.2.4 KeepAlive 状态意义

KA 是最微妙的一个。它记录”这条连接能不能在当前 message 完成之后被复用”:

  • Busy:当前正在处理请求。默认状态。
  • Idle:当前没有请求在跑,可以接收新的。
  • Disabled:永远不复用——下一条 message 结束后必须关。原因可能是 HTTP/1.0、Connection: close header、或者协议错误。

源码有个有意思的小妙招:

// conn.rs:1013-1019
impl std::ops::BitAndAssign<bool> for KA {
    fn bitand_assign(&mut self, enabled: bool) {
        if !enabled {
            *self = KA::Disabled;
        }
    }
}

KA 重载了 &= 运算符——state.keep_alive &= false; 等价于”如果 false 就设为 Disabled”。这允许代码里这样写:

state.keep_alive &= headers.allows_keep_alive();   // 可能 disable
state.keep_alive &= !headers.has_connection_close(); // 又可能 disable

任何一处 false 就永久 disable——这是一个”单向锁”的语义,用 &= 表达得比 if !x { disable() } 更紧凑、更有代数感。这种小运算符重载在很多工业级 Rust 代码里能见到。

12.3 Reading × Writing × KA 的合法转移

把三个 enum 的 product 算出来——3 × 4 × 4 = 48 种状态组合。但实际上大部分组合不合法。典型的生命周期是:

时刻 1:空连接
  Reading=Init, Writing=Init, KA=Busy

时刻 2:收到 request header
  Reading=Body(decoder), Writing=Init, KA=Busy

时刻 3:body 读完 + 开始写 response
  Reading=KeepAlive, Writing=Body(encoder), KA=Busy

时刻 4:response 写完
  Reading=KeepAlive, Writing=KeepAlive, KA=Busy

时刻 5:状态清理完,回到空 ←─ 这是 try_keep_alive 完成的
  Reading=Init, Writing=Init, KA=Idle

时刻 6:收到下一个 request header
  Reading=Body, Writing=Init, KA=Busy
  ...

这套循环通过 State::try_keep_alive() 推动——当 Reading 和 Writing 同时是 KeepAlive 时,检查 KA 状态决定回到 Init 还是 Closed。Source code 在 conn.rs:1108-1130,其实很短:

// 精简版
fn try_keep_alive(&mut self) {
    match (&self.reading, &self.writing) {
        (Reading::KeepAlive, Writing::KeepAlive) => {
            if let KA::Busy = self.keep_alive.status() {
                self.keep_alive.idle();
                self.reading = Reading::Init;
                self.writing = Writing::Init;
            } else {
                self.close();
            }
        }
        ...
    }
}

读懂这段就掌握了 “keep-alive 复用” 的本质——不是给 socket 起别的名字,而是把两个状态枚举重置回 Init。socket 还是那个 socket、缓冲区还是那个缓冲区——只是这条连接愿意为下一个请求再跑一遍状态机。

12.4 Dispatcher<D, Bs, I, T>:顶层驱动

Conn 只负责状态,driver 在 Dispatcher 里:

// hyper/src/proto/h1/dispatch.rs:21-27
pub(crate) struct Dispatcher<D, Bs: Body, I, T> {
    conn: Conn<I, Bs::Data, T>,
    dispatch: D,
    body_tx: Option<crate::body::Sender>,
    body_rx: Pin<Box<Option<Bs>>>,
    is_closing: bool,
}

五个字段:

  • conn:上面讲的连接状态。
  • dispatchD: Dispatch trait——Server / Client 的分支实现。这个后面细讲。
  • body_tx入站 body 的发送端——当连接读入 body 数据时,塞给这个 sender,让 user handler 能从对应的 Incoming 拿到数据。
  • body_rx出站 body 的接收端——user handler 产出的 Response body,由这里 poll frame 写回连接。
  • is_closing:是不是正在关闭。

12.4.1 Dispatch trait:Server/Client 的分界

// hyper/src/proto/h1/dispatch.rs:29-42
pub(crate) trait Dispatch {
    type PollItem;
    type PollBody;
    type PollError;
    type RecvItem;

    fn poll_msg(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;

    fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>)
        -> crate::Result<()>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>>;
    fn should_poll(&self) -> bool;
}

四个方法:

  • poll_msg:产出”下一个要发送”的消息(含 body)。
  • recv_msg:消费”刚收到”的消息(含 body)。
  • poll_ready:底层是否能接受新请求?(映射到 Service::poll_ready)
  • should_poll:有没有待做的工作?

这个抽象是 Server 和 Client 的对称点:

Server 侧Client 侧
PollItem(要发的)StatusCode 响应行RequestLine 请求行
RecvItem(收到的)RequestLine 请求行StatusCode 响应行
poll_msg 的来源service.call(req).await 得到从用户 API 队列得到
recv_msg 的目的交给 service.call(req)交给等待响应的用户

两个 impl(Server 和 Client)在源码里分别占 dispatch.rs 500-600 行和 600-700 行。它们各自 handle user-facing 的 API 形状差异——Server 包一个 HttpService<B>、Client 包一个 mpsc receiver。

12.4.2 Server 侧 poll_msg

// hyper/src/proto/h1/dispatch.rs:532-555 精简
fn poll_msg(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
    -> Poll<Option<Result<(MessageHead<StatusCode>, S::ResBody), S::Error>>>
{
    let mut this = self.as_mut();
    let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
        let resp = ready!(fut.as_mut().poll(cx));
        // 拿到 user 的 Response<B>
        let (parts, body) = resp?.into_parts();
        let head = MessageHead {
            version: parts.version,
            subject: parts.status,
            headers: parts.headers,
            extensions: parts.extensions,
        };
        Ok((head, body))
    } else {
        unreachable!("poll_msg shouldn't be called if no inflight");
    };
    // 清空 in-flight slot
    this.in_flight.set(None);
    Poll::Ready(Some(ret))
}

逻辑:

  1. in_flightPin<Box<Option<S::Future>>>——这就是 user Service 产生的 future。
  2. poll 它,等 user handler 完成。
  3. 把 Response 拆成 head + body,head 交给 Conn 去写、body 留给 Dispatcher 的 body_rx 持续 poll。

12.4.3 Server 侧 recv_msg

// 简化
fn recv_msg(&mut self, msg: crate::Result<(RequestHead, IncomingBody)>) -> crate::Result<()> {
    let (head, body) = msg?;
    let req = http::Request::from_parts(head_to_parts(head), body);
    let fut = self.service.call(req);
    self.in_flight.set(Some(fut));
    Ok(())
}

收到 request + body,构造 http::Request<Incoming>调用 user Service 的 call——把返回的 future 存进 in_flight

self.service.call(req) 就是 user handler 被实际调用的地方。整个 hyper 的所有工作——parse、decode、encode、state machine 管理——都是为了让这一行代码被调用。

12.5 poll_loop:一次 poll 做多少事

Dispatcher 的核心循环:

// hyper/src/proto/h1/dispatch.rs:163-191
fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
    // Limit the looping on this connection, in case it is ready far too
    // often, so that other futures don't starve.
    //
    // 16 was chosen arbitrarily, as that is number of pipelined requests
    // benchmarks often use.
    for _ in 0..16 {
        let _ = self.poll_read(cx)?;
        let _ = self.poll_write(cx)?;
        let _ = self.poll_flush(cx)?;

        if !self.conn.wants_read_again() {
            return Poll::Ready(Ok(()));
        }
    }

    trace!("poll_loop yielding (self = {:p})", self);

    task::yield_now(cx).map(|never| match never {})
}

三件事,循环最多 16 次:

  1. poll_read:从 IO 读 header 或 body。如果读到了 message,传给 dispatch.recv_msg
  2. poll_write:如果 user 产出了消息(从 dispatch.poll_msg),把 header 写到 buffered IO;如果正在写 body,poll user body 的 frame,通过 encoder 写。
  3. poll_flush:把 buffered IO 的字节真正提交到底层 socket。

12.5.1 为什么是 16

// 16 was chosen arbitrarily, as that is number of pipelined requests
// benchmarks often use. Perhaps it should be a config option instead.

注释里自己说”arbitrary”——随便选的。但 16 有一定道理:

  • HTTP/1.1 pipelining 的 benchmarks 一般开 16 个并发——刚好一次 poll 处理完一个 batch。
  • 大多数小请求(header-only)一次 poll 就能完成。
  • 如果真有超 16 个 message 堆在同一个 socket 上——最后一行 task::yield_now(cx) 让出 task,让 runtime scheduler 有机会先跑别的 future。

这是一个典型的 cooperative yielding 模式——循环到某个上限后主动让贤,避免”饿死其他 task”。卷四《Tokio 源码深度解析》第 14 章(select! 与公平调度)里讨论过 Tokio runtime 的”Budget”机制——tokio 在 await 点会自动插入这类 yielding。Hyper 的 yield_now 是相同的思路——避免任何 future 独占 executor 太久

12.5.2 wants_read_again 的退出条件

这是 poll_loop 的另一个智慧:循环会提前退出,除非 “读出新东西后状态又变得可读”:

if !self.conn.wants_read_again() {
    return Poll::Ready(Ok(()));
}

大多数时候一次 poll 只做一件事——读一次、写一次、flush 一次,然后返回 Pending 等 IO。只有在”读完一条消息、紧接着下一条又在 buffer 里”(pipelining 场景)时才继续循环。

这样做的好处:非 pipelining 场景(占 99%)没有额外开销,pipelining 场景又能利用缓存中的字节提升吞吐。按 common case 优化是工业级代码的气质

12.6 poll_read:身兼数职

这是 Dispatcher 最复杂的一个方法(dispatch.rs:192-285,将近 100 行)。我们挑核心看:

fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
    loop {
        if self.is_closing { return Poll::Ready(Ok(())); }

        if self.conn.can_read_head() {
            // 情况 1:还没读 header——读 header
            ready!(self.poll_read_head(cx))?;
        } else if let Some(mut body) = self.body_tx.take() {
            if self.conn.can_read_body() {
                // 情况 2:header 读过了、正在流式读 body
                match body.poll_ready(cx) { ... }
                match self.conn.poll_read_body(cx) {
                    Poll::Ready(Some(Ok(frame))) => {
                        if frame.is_data() {
                            body.try_send_data(frame.into_data().unwrap())
                        } else if frame.is_trailers() {
                            body.try_send_trailers(frame.into_trailers().unwrap())
                        }
                        ...
                    }
                    ...
                }
            }
        } else {
            // 情况 3:没有正在流的 body,可能是 keep-alive 等待
            return Poll::Ready(Ok(()));
        }
    }
}

读的三种情况:

  1. 还没读 header:调 poll_read_head —— 从 IO 读字节,调 role::parse_headers 解析。解析成功后构造 IncomingBody 的 channel(body_tx 这边存进 Dispatcher,另一边通过 dispatch.recv_msg 包进 http::Request 交给 Service)。
  2. 正在读 bodybody.poll_ready(cx) 检查 body 接收者还想继续读(这就是第 10 章讨论的 want_tx 信号);检查通过后 poll decoder 拉一帧,送到 body_tx。
  3. 没活儿干:return Ready——让 poll_write 或者下一轮再试。

注意嵌套的三层 Poll 对话:

  • dispatcher poll_read 问 Conn:能读 body 吗
  • Conn 问 decoder:decode 一帧吧
  • Decoder 问 buffered IO:要读字节
  • Buffered IO 问 underlying socket:有字节来吗

任何一层 Pending,整条链挂起。任何一层有东西,信号向上反馈——一路到 dispatcher 把 frame 塞到 body_tx。

12.6.1 源码核对:poll_read 的”用户取消 body”主动检测

§12.6 给了 poll_read 的简化版。真实的 poll_read(dispatch.rs:195-269)有几条为”用户主动放弃 body”准备的容错路径

fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
    loop {
        if self.is_closing { return Poll::Ready(Ok(())); }
        else if self.conn.can_read_head() { ready!(self.poll_read_head(cx))?; }
        else if let Some(mut body) = self.body_tx.take() {
            if self.conn.can_read_body() {
                match body.poll_ready(cx) {
                    Poll::Ready(Ok(())) => (),
                    Poll::Pending => {
                        self.body_tx = Some(body);
                        return Poll::Pending;
                    }
                    Poll::Ready(Err(_canceled)) => {
                        // user doesn't care about the body
                        // so we should stop reading
                        trace!("body receiver dropped before eof, draining or closing");
                        self.conn.poll_drain_or_close_read(cx);
                        continue;
                    }
                }
                match self.conn.poll_read_body(cx) {
                    Poll::Ready(Some(Ok(frame))) => {
                        if frame.is_data() {
                            let chunk = frame.into_data()...;
                            match body.try_send_data(chunk) {
                                Ok(()) => { self.body_tx = Some(body); }
                                Err(_canceled) => {
                                    if self.conn.can_read_body() {
                                        trace!("body receiver dropped before eof, closing");
                                        self.conn.close_read();
                                    }
                                }
                            }
                        }
                        ...
                    }
                    ...
                }
            }
        }
        else { return self.conn.poll_read_keep_alive(cx); }
    }
}

两处 _canceled 错误的区别处理值得拆开:

1、body.poll_ready(cx) → Err(_canceled):用户在还没接受任何 body chunk 之前就 drop 了 IncomingBody。比如 user handler 是 async fn handler(_req: Request) -> Response { ... }——根本没读 body。

应对:self.conn.poll_drain_or_close_read(cx); continue;——**尝试 drain(继续从 socket 读但丢弃数据)**或者直接 close。drain 的好处是让 keep-alive 还能用——客户端发完 body 后这条连接还能复用;如果直接 close,必须断 connection。drain 是性能优化、close 是 fallback

2、body.try_send_data(chunk) → Err(_canceled):用户在接收到一些 chunk 后才 drop(比如读了一半 body 就 break)。这时 drain 的成本太高(可能还有几 MB body 没读)——直接 self.conn.close_read() 关掉 read 侧。

这两条分支体现了 “早 drop / 晚 drop” 的不同最优策略——HTTP server 处理一个 abnormal request lifecycle 时不能简单关连接、要看丢弃发生在哪一步。

注释里的 “trace!” 行让运维能追踪到这种异常——日志说 “body receiver dropped before eof”——根因就在用户的 handler 里。

12.6.2 源码核对:poll_read_head 的 dispatch.poll_ready 三态分支

§12.6 顺带提了 poll_read_head。真实代码(dispatch.rs:271-280)开头是这样的:

fn poll_read_head(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
    // can dispatch receive, or does it still care about other incoming message?
    match ready!(self.dispatch.poll_ready(cx)) {
        Ok(()) => (),
        Err(()) => {
            trace!("dispatch no longer receiving messages");
            self.close();
            return Poll::Ready(Ok(()));
        }
    }

    // dispatch is ready for a message, try to read one
    match ready!(self.conn.poll_read_head(cx)) {
        Some(Ok((mut head, body_len, wants))) => { ... }
        ...
    }
}

先问 dispatch 是否 ready 接受新 message——再才向 socket 读 head。这是标准的 backpressure pattern:

  • Server 侧 dispatch.poll_ready 内部映射到 service.poll_ready ——如果 user service 还没 ready 接受新请求(可能在做 rate limiting / connection draining)——dispatcher 就不读新 header
  • Client 侧 dispatch.poll_ready 内部检查”还有 user 在等响应吗”——如果队列空了、回 Err——dispatch 关闭连接

Err(()) 走 close 路径——很重要:避免 server 在 user service 已经”不接受请求”时还继续读字节、最终 OOM。Service trait 的 poll_ready 协议在这里被严格执行。

这条 backpressure 链贯穿 hyper 整个 IO 路径——从 socket → buffered IO → conn → dispatch → service → user handler——每一层都能拒绝下一层的输入。没有这条链、HTTP server 在过载时会无限缓冲、最终 OOM

12.7 poll_writebody_rx 的互动

与 poll_read 对称的 poll_write(略):

  1. 如果 Writing::Init 且 dispatch 有 message——poll_msg 拿 head + body,encode_headers 写 header,body_rx 记住;writing 切到 Body。
  2. 如果 Writing::Body——poll body_rx 下一帧,用 Encoder 格式化(chunked / length)写 buffered IO。
  3. body 写完——Writing::KeepAlive;try_keep_alive 检查是否可以复用。

核心循环模式相同:循环往 IO 推数据,直到 Pending

12.7.1 body_rx 的 Pin<Box<Option<Bs>>>

这个类型很有意思:

body_rx: Pin<Box<Option<Bs>>>,

三层包装——Box 堆分配、Pin 固定地址、Option 允许清空。为什么要这么复杂?

  • Pin:user 的 Body 可能是 !Unpin(典型例子是 async_stream! 宏生成的 Stream),需要 pin 才能 poll。
  • BoxPin<&mut Bs> 需要 Bs 在内存中地址稳定——堆分配保证这一点。
  • Option:body 写完之后要被 drop——set(None) 清空 Box 的内容。

Pin<Box<Option<T>>> 这种三层包装是 Rust 异步代码里处理”可替换、可 pin、可 unpin 的 future/stream”的标准模板。理解它比去死记它来得实用——下次你遇到类似需求,知道怎么拼就行了。

12.7.1 源码核对:poll_write 的 body chunk 处理三分支 + 空 chunk 优化

§12.7 给的 poll_write 是高层视角。真实代码(dispatch.rs:328-421)有几条专门优化 chunk 流的细节

let item = ready!(body.as_mut().poll_frame(cx));
if let Some(item) = item {
    let frame = item.map_err(|e| {
        *clear_body = true;
        crate::Error::new_user_body(e)
    })?;

    if frame.is_data() {
        let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
        let eos = body.is_end_stream();
        if eos {
            *clear_body = true;
            if chunk.remaining() == 0 {
                trace!("discarding empty chunk");
                self.conn.end_body()?;
            } else {
                self.conn.write_body_and_end(chunk);
            }
        } else {
            if chunk.remaining() == 0 {
                trace!("discarding empty chunk");
                continue;
            }
            self.conn.write_body(chunk);
        }
    } else if frame.is_trailers() {
        *clear_body = true;
        self.conn.write_trailers(...);
    } else {
        trace!("discarding unknown frame");
        continue;
    }
} else {
    *clear_body = true;
    self.conn.end_body()?;
}

四条工程精细:

1、空 chunk 的双路径丢弃:用户 body 实现可能 yield Ok(empty_chunk)——比如 stream 里中间状态。chunked encoding 下传一个空 chunk 字面意味”end-of-message”——错误地写出去会让客户端以为 body 结束、提前关流。所以 hyper 检测 chunk.remaining() == 0discard——非 EOS 时 continue 等下一帧;EOS 时直接 end_body 而不写 chunk。

2、body.is_end_stream() 优化合并最后一帧 + 终止:当 user body 的当前 frame 是最后一帧——write_body_and_end(chunk) 一次性写 chunk + 结束标记(chunked 的 0\r\n\r\n terminator),省一次 IO 系统调用。如果分两步(write_body + end_body)会多一次 buffer flush 决策。

3、*clear_body = true 配合 OptGuard 模式OptGuard::new(self.body_rx.as_mut())clear_body 是个 RAII guard——析构时如果 *clear_body == true 就把 body_rx 置 None。这是对panic-safe 资源清理的精确实现——即便 poll_frame 中间 panic、guard 也会正确清理 body_rx 状态。

4、unknown frame 丢弃http_body::Frame 是个枚举(Data / Trailers / 未来可能加新变体)。hyper 用 is_data() / is_trailers() 判明确支持的两种、其他静默丢弃(continue)。Forward compatibility——http_body crate 未来加新 frame type 不会让 hyper 编译失败、只是不处理新类型。

这些细节合起来让 hyper 的 body 写入路径对各种 abnormal user body 都鲁棒——空 chunk、未知 frame、中途 panic、提前 EOS——都有应对。生产级 HTTP server 必须 handle 这些 case、否则一个 buggy user service 就能 crash 整个进程

12.8 错误、Upgrade、Shutdown 的正交处理

Dispatcher 还要处理三类”不走正常流程”的事件。

12.8.1 错误:poll_catch

// dispatch.rs:122-142
fn poll_catch(&mut self, cx: &mut Context<'_>, should_shutdown: bool)
    -> Poll<crate::Result<Dispatched>>
{
    Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
        // Be sure to alert a streaming body of the failure.
        if let Some(mut body) = self.body_tx.take() {
            body.send_error(crate::Error::new_body("connection error"));
        }
        self.dispatch.recv_msg(Err(e))?;
        Ok(Dispatched::Shutdown)
    }))
}

任何 poll_inner 的错误都被 poll_catch 吞下:

  1. 如果有正在接收的 body——把错误发到 body_tx,让 handler 侧的 body.poll_frame 看到 Err
  2. 告诉 dispatch(Server / Client)有错误发生——让它优雅退出,而不是 panic 或挂起。
  3. 返回 Dispatched::Shutdown——让上层知道连接该关了。

错误不会静默消失——它顺着 body channel 和 dispatch 接口一起传给 user。这是 Hyper 可靠性的一个基石。

12.8.2 Upgrade:特殊分支

当客户端发 Upgrade: websocket + 服务端响应 101 Switching Protocols,连接就脱离 HTTP/1 协议了——TCP stream 被交给 user 继续用。

// dispatch.rs:144-164 简化
if self.is_done() {
    if let Some(pending) = self.conn.pending_upgrade() {
        self.conn.take_error()?;
        return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
    } else if should_shutdown {
        ready!(self.conn.poll_shutdown(cx))...;
    }
    ...
}

pending_upgrade() 返回一个 Pending<Upgraded> ——这个 handle 会在未来的某一刻拿到 raw socket(通过 Upgraded::downcast_into())。Dispatcher 的工作到这里结束——它不会再读写 socket,让给 upgrade handler。

Upgrade 的完整机制是第 18 章的主题。这里只知道 Dispatcher 通过 Dispatched::Upgrade(pending) 这个 退出码 把”连接控制权”交回给上层。

12.8.3 Shutdown 的优雅顺序

else if should_shutdown {
    ready!(self.conn.poll_shutdown(cx))?;
}

poll_shutdown 调到 AsyncWrite::poll_shutdown——在 TCP 上发一个 FIN,告诉对端”我不再发了”。对端会看到 EOF,自然关闭它的 read half。

这个机制是 TCP 半关闭——hyper 不会粗暴地 close 整个 socket。对端还能继续读缓冲里没被消费的字节(比如你的响应最后几个字节),直到正常 EOF。这是和”快速关”的关键差别——快速关会丢数据,半关保证完整。

12.8.1 源码核对:try_keep_alive 的 4 路状态转移 + Client 特殊路径

§12.3 给了 try_keep_alive 的简化版。真实代码(conn.rs:1072-1091)覆盖4 种状态组合

fn try_keep_alive<T: Http1Transaction>(&mut self) {
    match (&self.reading, &self.writing) {
        // 情况 1:双方都 KeepAlive → 看 KA flag 决定 idle 或 close
        (&Reading::KeepAlive, &Writing::KeepAlive) => {
            if let KA::Busy = self.keep_alive.status() {
                self.idle::<T>();
            } else {
                trace!("try_keep_alive({}): could keep-alive, but status = {:?}",
                    T::LOG, self.keep_alive);
                self.close();
            }
        }
        // 情况 2/3:一方 Closed 一方 KeepAlive → 直接 close
        (&Reading::Closed, &Writing::KeepAlive)
        | (&Reading::KeepAlive, &Writing::Closed) => {
            self.close()
        }
        // 情况 4:其他组合(一方还在 Init/Body 等)→ 啥都不做、等下次 try
        _ => (),
    }
}

第 2/3 种情况的存在反映了对称性破缺的处理:read 关了但 write 还想 keep-alive——明显矛盾、必须关;反之亦然。如果不处理这种”半关闭一半 keep-alive”的状态,连接会卡在不正常状态——既不真关也不真复用。

fn idle<T: Http1Transaction> (conn.rs:1104-1134)里有一段Client 特有的逻辑

fn idle<T: Http1Transaction>(&mut self) {
    debug_assert!(!self.is_idle(), "State::idle() called while idle");

    self.method = None;
    self.keep_alive.idle();
    if !self.is_idle() {
        self.close();
        return;
    }

    self.reading = Reading::Init;
    self.writing = Writing::Init;

    // !T::should_read_first() means Client.
    //
    // If Client connection has just gone idle, the Dispatcher
    // should try the poll loop one more time, so as to poll the
    // pending requests stream.
    if !T::should_read_first() {
        self.notify_read = true;
    }

    #[cfg(feature = "server")]
    if self.h1_header_read_timeout.is_some() {
        // Next read will start and poll the header read timeout,
        // so we can close the connection if another header isn't
        // received in a timely manner.
        self.notify_read = true;
    }
}

!T::should_read_first() 是 client side 的标识——server 是”先读 request 再写 response”、client 是”先写 request 再读 response”。Client 在 idle 时设 notify_read = true 让 Dispatcher 立刻再 poll 一次——因为 client 可能有 user 在等着发新请求、需要立刻处理。Server 不需要——它只是被动等下一个请求来。

这个 should_read_first() 是 hyper Server/Client trait 区分的核心方法之一——通过它在编译期 dispatch 到不同的状态机行为。server feature 和 client feature 共享 conn.rs 的代码——dispatch 通过 trait method 实现 zero-cost abstraction。

#[cfg(feature = "server")] 后面的 h1_header_read_timeout 是 server 特有的配置——下一个请求 header 必须在 timeout 内到达、否则关连接(防 slowloris attack)。这条逻辑也通过 notify_read 立刻触发下一次 poll。

短短 30 行 idle 函数同时处理 Client 立刻 poll、Server timeout 启动、双方共用的状态重置——用 trait method + cfg 让两个 use case 共享代码。这种”trait + cfg 双 dispatch” 是 Rust feature gating 的常见 pattern。

12.9 Role 的 maybe_notify 小玩法

最后看一段鲜为人知但极其 clever 的代码。Conn state 里有一个字段:

notify_read: bool,

它是一个”我需要被再 poll 一次”的标记。当 Dispatcher 处于 “body 读完了但 KeepAlive 状态还没复用”的瞬间,如果已经有下一个 request 的字节在 buffer 里——不让 task 挂起、直接循环回 poll_read——这就是 wants_read_again 的意义。

// dispatch.rs:186-190
if !self.conn.wants_read_again() {
    return Poll::Ready(Ok(()));
}

简单两行,但它是 hyper pipelining benchmark 里”吞吐不崩”的秘诀。没有这个检查,每个 pipelined request 要多一次 runtime wake-up 循环——在高并发下开销显著。

有注释特别提到:

Using this instead of task::current() and notify() inside the Conn is noticeably faster in pipelined benchmarks.

翻译:不通过 waker re-notify,直接把下一轮 loop 继续跑——在 pipelined benchmark 里明显更快

这就是为什么读 hyper 源码你会一再看到”为 benchmark 调优”的痕迹——它是一个 benchmark-driven 的工程代码。每一处非朴素的代码背后几乎都有一个 benchmark 数字推动。

12.9.1 源码核对:poll_read_keep_alive 的三态——区分”安全 idle”和”异常字节”

§12.8.1 提了 idle 状态下还可能要 read。真实 poll_read_keep_alive(conn.rs:435-489)做的事是在 idle 时仍然要从 socket 读、判断什么时候是合法 EOF、什么时候是协议错误

pub(crate) fn poll_read_keep_alive(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
    debug_assert!(!self.can_read_head() && !self.can_read_body());

    if self.is_read_closed() {
        Poll::Pending
    } else if self.is_mid_message() {
        self.mid_message_detect_eof(cx)
    } else {
        self.require_empty_read(cx)
    }
}

三个分支:

1、is_read_closed() → Pending——已经关了 read 端、不应该再尝试任何 IO。返回 Pending 让 task 睡眠(直到外部事件唤醒)。

2、is_mid_message() → mid_message_detect_eof——还在处理一条消息中(reading 或 writing 不是 Init)。这时如果 socket 突然 EOF——是异常——客户端不该在 message 中途断连。报 Error::new_incomplete()

3、其他 → require_empty_read——纯 idle 状态。期望 socket 没有任何字节——如果有字节、说明客户端发了多余数据(违反协议);如果 EOF——可能是合法的”客户端关闭连接”(return Ok)或异常断开(return error,取决于 should_error_on_eof)。

require_empty_read(conn.rs:458-489)专门给 client side 用——Server 不会主动检测 idle 时是否有 unexpected bytes(因为 server 收到额外数据可能就是 pipelining 的下一个请求)。Client 在 idle 时收到额外字节是 protocol violation——server 不该在 client 没发请求时主动发数据。

mid_message_detect_eof(conn.rs:491-508)有个微妙的 allow_half_close 检查:

if self.state.allow_half_close || !self.io.read_buf().is_empty() {
    return Poll::Pending;
}

half-close 模式下——客户端关闭 write 端但 server 还能继续 write response(典型场景:长 polling、SSE、某些 RPC)——hyper 不报错。allow_half_close 是个 server config 选项、默认 false(严格模式)。

这两个函数加起来覆盖了 idle 状态下的4 种 socket 事件:合法 EOF(safe close)、异常 EOF(incomplete)、unexpected bytes(protocol error)、合法 half-close(allow_half_close 模式)。每种事件走不同路径——保证 hyper 在各种异常 client 行为下都有正确响应。

这种”边角情况的精确分类处理”是生产 HTTP server 的核心质量指标——一个 server 的”健壮性”不在 happy path 表现、在 90% 时间不会发生但发生了就崩的边角情况。hyper 的 conn.rs 用 1500 行代码精心覆盖这些 case——对比一些”hello world”级别的 HTTP server(200-300 行)能 handle 完所有 case 的不到 5%。

12.10 一张连接生命周期图

把整章讲的东西画成图:

TCP accept

Conn::new(io) — state.reading=Init, writing=Init, KA=Busy

Dispatcher::new(svc, conn)

tokio::spawn(dispatcher) ─── 进入 poll_loop

        ├─ poll_read   ───── role::parse ─── dispatch.recv_msg ─── svc.call(req)
        │                                                              │
        ├─ poll_write  ───── dispatch.poll_msg ─── role::encode ───── writes header
        │                    │
        │                    └── body.poll_frame ─── encoder 写 body

        ├─ poll_flush  ───── underlying socket

        └─ wants_read_again? ─── 继续循环 / yield

事件发生:
  - 读完 body       → Reading::KeepAlive
  - 写完 body       → Writing::KeepAlive
  - try_keep_alive  → 若 KA=Busy,reset 到 Init(开始下一轮)
  - 或 KA=Disabled  → close(); poll_shutdown
  - 或 Upgrade      → Dispatched::Upgrade(pending),Dispatcher 退出

这张图浓缩了 2339 行代码的指挥流。读懂它你就读懂了 hyper 1.x HTTP/1 的全部顶层逻辑。

12.10.1 源码核对:poll_read_head 的 h1_header_read_timeout——slowloris 防御

§12.8.1 提到 server 有 h1_header_read_timeout。打开 conn.rs:211-277 看完整实现:

pub(super) fn poll_read_head(...) -> Poll<...> {
    debug_assert!(self.can_read_head());

    #[cfg(feature = "server")]
    if !self.state.h1_header_read_timeout_running {
        if let Some(h1_header_read_timeout) = self.state.h1_header_read_timeout {
            let deadline = self.state.timer.now() + h1_header_read_timeout;
            self.state.h1_header_read_timeout_running = true;
            match self.state.h1_header_read_timeout_fut {
                Some(ref mut fut) => {
                    self.state.timer.reset(fut, deadline);
                }
                None => {
                    self.state.h1_header_read_timeout_fut =
                        Some(self.state.timer.sleep_until(deadline));
                }
            }
        }
    }

    let msg = match self.io.parse::<T>(cx, ParseContext { ... }) {
        Poll::Ready(Ok(msg)) => msg,
        Poll::Ready(Err(e)) => return self.on_read_head_error(e),
        Poll::Pending => {
            #[cfg(feature = "server")]
            if self.state.h1_header_read_timeout_running {
                if let Some(ref mut fut) = self.state.h1_header_read_timeout_fut {
                    if Pin::new(fut).poll(cx).is_ready() {
                        self.state.h1_header_read_timeout_running = false;
                        warn!("read header from client timeout");
                        return Poll::Ready(Some(Err(crate::Error::new_header_timeout())));
                    }
                }
            }
            return Poll::Pending;
        }
    };

    #[cfg(feature = "server")]
    {
        self.state.h1_header_read_timeout_running = false;
        self.state.h1_header_read_timeout_fut = None;
    }
    ...
}

这套机制专门 mitigate Slowloris 攻击——攻击者建立 TCP 连接、慢慢发送 header(每秒 1 字节)、占用 server connection slot 直到 server 资源耗尽。Slowloris 是 2009 年提出的著名 HTTP DoS 攻击——影响过 Apache、IIS、nginx(早期版本)。

hyper 的应对:

1、h1_header_read_timeout(server config,默认 None 即禁用)——admin 可以设置”header 必须在 N 秒内全部读完、否则关连接”。生产部署强烈建议设 30s 或更短。

2、Timer 复用reset(fut, deadline) 复用已有 timer future、不每次 new 一个。timer 是有 cost 的(注册到 tokio runtime 的 timer wheel)——复用避免重复成本。

3、读到 header 后清理 timerh1_header_read_timeout_running = false + h1_header_read_timeout_fut = None——避免 timer 在已经完成 read 后继续触发。

4、Pending 时 poll timer:在每次 socket Pending 时主动 poll timer——如果 timer 已经 ready(超时了)——立刻 raise header_timeout error 关连接。

这是个多层 future 协调的范例——socket future 和 timer future 同时被 poll、谁先 ready 谁触发对应行为。这种”race between IO and timer”在异步 server 里是常见 pattern——其他场景如 keep-alive timeout、idle timeout、request timeout 都用同样套路。

12.10.2 源码核对:state.busy() + keep_alive &= msg.keep_alive 的并发安全状态推进

poll_read_head 拿到 msg 后立刻执行(conn.rs:293-295):

self.state.busy();
self.state.keep_alive &= msg.keep_alive;
self.state.version = msg.head.version;

三行代码做了三件事:

1、state.busy() 标记连接正忙——之前可能是 Idle,现在收到 request 头要变 Busy。但有个微妙:busy() 内部检查 if let KA::Disabled = self.keep_alive.status() { return; }——如果之前已经被 disable(比如 connection: close header),不会回退到 Busy——保持 Disabled 状态。

2、keep_alive &= msg.keep_alive——§12.2.4 讲过这条 BitAndAssign。msg.keep_alive 是 parser 根据 HTTP 版本和 Connection header 计算出的”这条 message 是否允许 keep-alive”。&= 让任何一条 false 永久 disable——单向锁语义。

3、state.version = msg.head.version——记录这条 message 的 HTTP 版本。后续 response 必须用相同或更低版本(HTTP/1.1 server 不能给 HTTP/1.0 client 回 HTTP/1.1)。

三行配合体现了状态推进的精确性——每条新进 message 都按规则更新 connection state、不会因为某个状态被错误地”重置”而违反协议。这种精确性是 hyper 能在 production 跑亿万小时不出 bug 的基础。

12.11 和其他章节的呼应

  • 第 11 章讲的 Decoder / Encoder 被 Conn::state 的 Reading / Writing 持有。
  • 第 10 章讲的 Incoming::Chan { data_rx, want_tx } 的 sender 端就是 Dispatcher 的 body_tx——want_tx 的信号传到 Dispatcher 的 poll_read,决定是否 poll decoder。
  • 第 13 章(下一章)讲的 hyper::Service trait 是被 Server::recv_msg 这一行调用:self.service.call(req)
  • 第 18 章会接回 pending_upgrade() 的流程。

整本书到这里进入”hyper 核心最难的部分”——协议状态机。读完第 11、12 章你应该感到有些累——因为它们是全书最 intricate 的章节。但这也是收获最大的两章——这就是工业级 HTTP/1 实现,没有捷径。

12.10.2.5 源码核对:poll_drain_or_close_read 的优雅 drain 策略

§12.6.1 提到了 user 早 drop body 时 hyper 走”drain or close”。打开 conn.rs:848-865

/// If the read side can be cheaply drained, do so. Otherwise, close.
pub(super) fn poll_drain_or_close_read(&mut self, cx: &mut Context<'_>) {
    if let Reading::Continue(ref decoder) = self.state.reading {
        // skip sending the 100-continue
        // just move forward to a read, in case a tiny body was included
        self.state.reading = Reading::Body(decoder.clone());
    }

    let _ = self.poll_read_body(cx);

    // If still in Reading::Body, just give up
    match self.state.reading {
        Reading::Init | Reading::KeepAlive => {
            trace!("body drained")
        }
        _ => self.close_read(),
    }
}

三条工程妙处:

1、Reading::Continue 的特殊路径——客户端发了 Expect: 100-continue 期望 server 同意才发 body。如果 user 已经 drop 了 body 接收者——根本不该回 100-continue(client 收到后会发 body、徒增网络流量)。代码直接把状态转到 Reading::Body、不发 100-continue——这是 server 的”礼貌拒绝”

2、试一次 poll_read_body 看能不能立刻 drain——如果 socket 缓冲里已经有 body 字节、立刻读完进入 Init / KeepAlive 状态、连接复用。如果 body 还没到、状态保持 Body——_ => close_read() 直接关。

这个权衡背后的考虑:drain 一个未读的小 body(几 KB)几乎免费、能保住 keep-alive;drain 一个大 body(几 MB)耗 CPU + 网络、不如直接关。hyper 用”试一次能不能立刻 drain、不能就 give up”的策略——比 “总是 drain” 或 “总是 close” 更精细。

3、注释 “If still in Reading::Body, just give up” 表达了 hyper 的核心 opinion——不为了 keep-alive 强行付出昂贵代价。Connection 复用是优化、不是宗教——能复用就复用、不能就关。这条 pragmatic 的态度让 hyper 在”客户端各种异常行为”下都不会陷入 hang/wait 状态

这条 17 行函数浓缩了 HTTP server 写作里最微妙的取舍之一——keep-alive 的成本/收益、用户行为的异常处理、网络 IO 的按需触发。读懂它能让你写自己的 server 时不再纠结”用户中途 drop 我该咋办”——答案是 try-then-give-up。

12.10.3 hyper Connection 编排 vs 其他 HTTP server 实现

横向对比让你看到 hyper 在 Connection 这一层的设计取舍:

Apache httpd(C,prefork/worker MPM):传统 multi-process / multi-thread 模型——每条连接一个 worker。Connection 状态全在线程局部变量 + apr_pool 里。简单但内存重——10K 并发要 10K 进程或线程、几 GB 内存。

nginx(C,单线程事件驱动):所有 connection 共享一个 event loop——每条 connection 是一个 ngx_connection_t struct,状态用 callback chain 表达。极致性能但代码复杂——一个事件回调能跨越几百行代码追踪。

Node.js http module(JavaScript on libuv):Connection 是个 EventEmitter——状态通过 callback + event 表达。比 nginx 易写但性能差一档(V8 + GC 的成本)。Connection 状态散落在多个 callback closure 里。

Go net/http(goroutine per connection):每个 connection 起一个 goroutine——状态在 goroutine 局部变量里。GC + go scheduler 让代码写起来像同步、跑起来异步。简单但单进程几十万并发会有 GC 压力

hyper(async/await + tokio):Connection 是一个 Future——状态在 Conn struct 里、状态机由 poll() 驱动。和 Go 类似的”代码像同步”体验、但没有 goroutine + GC 的开销——Future 直接编译成状态机代码、零运行时调度成本。这是 Rust 的核心承诺在 HTTP server 层的体现

5 种实现的权衡:

  • 易写:Go > Node > hyper > nginx > Apache
  • 性能:nginx > hyper > Go > Node > Apache
  • 内存效率:hyper > nginx > Go > Node > Apache
  • 调试难度:Apache > Go > hyper > Node > nginx

hyper 的”性能 + 易写 + 内存”三角接近最优——这是它在 cloud-native 时代越来越受欢迎的根本原因。Connection 编排的代码越精炼、性能-易用-资源效率三角就越好——本章讲的 1500 行 conn.rs + 800 行 dispatch.rs 就是这条逻辑的产物。

12.10.2.5.2 hyper Connection 的设计哲学三连问

读完本章后你应该能用三句话回答 hyper 的 Connection 设计哲学:

Q1:为什么要把 Conn 和 Dispatcher 拆开? A:分离 state 和 driver——state 容易测、driver 容易换。测 conn 状态机不需要 mock 整个 IO + Service——只需 mock 一个 trait。这种”分层 testing”是 hyper 能在每个 PR 加 5-10 个 case 还保持稳定的原因。

Q2:为什么状态机不用 actor model(每个 connection 一个 thread)? A:actor model 简单但内存重。hyper 用 Future 状态机让 10K connection = 10K Future = 几 MB 内存。性能/资源效率上 future 状态机吊打 thread-per-connection——这是 Rust async 的核心承诺。

Q3:为什么 Reading/Writing/KA 三 enum 分开、不合一个? A:正交性。Reading 决定”现在能不能读”、Writing 决定”现在能不能写”、KA 决定”完成后要不要复用”。三件事独立变化、合并到一个 enum 会有 N×M×K 种状态、组合爆炸。三 enum 让每个维度独立 reason about、bug 更少。

这三连问的答案是 hyper 设计的精髓——职责分离 + 性能优先 + 维度正交。任何写 connection-style 协议库的人都该思考这三个问题、给出自己的答案。如果你的设计违反这三条、大概率会有维护问题。

12.10.2.5.3 写一个最小 hyper-style Connection driver 的 50 行尝试

理解本章后给读者一个练手项目:用 100 行内代码实现一个 hyper-style 的 Connection driver——只服务一个 echo 协议(client 发什么、server 回什么)、但要包含本章讲过的所有核心 pattern:

struct Conn { state: State, buf_read: BytesMut, buf_write: BytesMut }
struct State { reading: Reading, writing: Writing, ka: KA }
enum Reading { Init, Body(usize), KeepAlive }
enum Writing { Init, Body(Bytes), KeepAlive }
enum KA { Busy, Idle, Disabled }

impl Conn {
    fn poll_loop<S: AsyncReadWrite>(&mut self, io: &mut S, cx: &mut Context) -> Poll<()> {
        for _ in 0..16 {
            let _ = self.poll_read(io, cx)?;
            let _ = self.poll_write(io, cx)?;
            if !self.wants_more() { return Poll::Ready(Ok(())) }
        }
        task::yield_now(cx).map(|_| Ok(()))
    }
    fn try_keep_alive(&mut self) {
        if matches!((&self.state.reading, &self.state.writing),
                    (Reading::KeepAlive, Writing::KeepAlive)) {
            if matches!(self.state.ka, KA::Busy) {
                self.state.reading = Reading::Init;
                self.state.writing = Writing::Init;
                self.state.ka = KA::Idle;
            } else {
                self.close();
            }
        }
    }
    // ... poll_read / poll_write / close / wants_more 略
}

100 行内不可能 cover 所有 case——但足够让你深刻理解为什么 hyper 要 1500 行。当你写到第 30 行就发现:“我的 buf_read 怎么处理 partial frame?” ——hyper 的 Buffered 给了答案。当你写到第 60 行发现:“取消怎么传播?” ——hyper 的 dispatch.rs 给了答案。

自己实现一遍是验证理解最快的方式——比读 10 遍源码有用。挑一个周末做这个练习、你对 connection-style protocol 的所有疑问都会清晰。

12.10.2.5.4 conn.rs 1500 行的代码结构地图

读完本章后再翻 hyper 的 conn.rs(1500 行)应该能直接 navigate——这里给一份代码区域 map 帮你快速定位:

行号区间内容何时去查
1-50use 声明 + Conn struct 定义看依赖结构
50-100Conn::new 构造 + State 初始化看默认配置
100-180builder-style 配置 setter.http1_xxx_timeout()
180-300poll_read_head 入口 + parse + timeout解码请求 header 的全部逻辑
300-430处理 1xx informational + 100-continue + h09edge case 处理
430-520poll_read_keep_alive + 三态 + force_io_readidle 状态处理
520-700poll_read_body + decoder 调用 + close_readbody 流式读取
700-850write_head + write_body + encoderresponse 写入
850-950poll_drain_or_close_read + close 系列异常关闭路径
950-1010State struct + 几十个字段找具体配置字段
1010-1140KA enum + State::busy/idle/close + try_keep_alivekeep-alive 状态机
1140-1530Cargo feature gated server/client 特殊代码 + tests找 server/client 差异

1500 行能 navigate 到具体功能区——这就是 “懂源码”的实操体现。任何遇到的 hyper connection 行为问题、你都知道去哪 100 行内找答案。

记 navigation 的核心方法:记关键函数名 + 类型名、不记具体行号。行号会随版本变、函数名 / 类型名相对稳定。grep -n "fn try_keep_alive" 比记 conn.rs:1072 实用 100 倍。

12.10.2.5.5 测试 hyper Connection 的几种实战手段

读懂源码后下一步是验证理解——下面 5 种测试 hyper connection 行为的实战手段:

1、hurl 工具发奇怪请求hurl 能精确控制 HTTP/1 请求的细节——header 顺序、Connection 字段值、Expect: 100-continue 等。比 curl 更适合测协议边界。

2、tcpdump/wireshark 抓包看真实字节:如果 hyper 的行为不符合预期、抓包能 100% 告诉你 wire 上发了什么。tcpdump -i lo -A 'port 8080' 在本地测试一行命令解决。

3、用 tokio-test 写 unit test:tokio_test::io::Builder 能 mock 一个 AsyncRead/AsyncWrite——用它 inject 各种异常字节序列、看 hyper Conn 的状态如何变。hyper 自己的 conn.rs:1156+ 的 test 模块就是这套用法。

4、tracing 启用 trace 级别日志:hyper 各处都有 trace!() 调用——RUST_LOG=hyper=trace cargo run 能看到每个状态转移、每次 IO。这是定位”hyper 内部到底走到哪了”的最直接手段。

5、miri 跑 connection 单元测试:miri 是 Rust 的 UB checker——能发现某些 unsafe 边界行为。hyper 的某些 unsafe(pin projection 等)需要 miri 验证。cargo +nightly miri test 跑一次。

这 5 种手段配合本章源码理解、能让你成为团队里的”HTTP/1 wire-level 专家”——遇到任何 hyper connection 异常都能用源码 + 工具组合定位。这种能力在 cloud-native 时代越来越值钱——k8s ingress、API gateway、service mesh proxy 都是 hyper 的下游用户。

12.10.2.6 hyper 1.9 与 0.14 的 try_keep_alive 对比演化

如果你之前读过 hyper 0.14 的 try_keep_alive、再看 1.9 的版本——能看到一个很微妙的演化:0.14 版本的相同函数有 5-6 个 match arm(因为更多的特殊路径),1.9 简化到 3 个。这不是因为功能减少、而是因为状态语义被精炼——以前需要专门处理的情况现在被融入到更基础的不变量里。

具体例子:0.14 的 try_keep_alive 有专门的 (Reading::KeepAlive, Writing::Init) 分支处理”读完了但还在等 user 产出 response”。1.9 取消这个分支——不是因为这个 case 不存在、而是因为 1.9 的 KA flag 状态机让这个分支可以走”啥都不做、等下次 try”通用路径。

“代码减少但能力不变”——不是删功能、是把多个特殊 case 合并到一个通用规则里。读 git log 看每次 try_keep_alive 的修改 commit 能看到设计思路的演进。

类似的演进在源码里随处可见:state.busy() 在 0.14 是直接覆盖,1.9 加了 KA::Disabled 不回退 的保护——避免一个 protocol error 让连接被错误地标 Busy 后又进入 keep-alive 循环。这条修改对应一个真实的 GitHub issue:某些 server 在 protocol error 后没正确关连接、导致下个 request 被错误处理。

12.10.2.7 几个真实生产 bug 的 root cause

几个真实出现过的生产 bug能用本章知识精确定位:

Bug 1:keep-alive 不工作、每个请求新 TCP

  • 症状:CPU 大部分耗在 TCP 三次握手、QPS 卡在几百
  • 排查:抓包看 connection 是否被 reset
  • 根因:response handler 漏调 Connection: close、或 client/server 一方 HTTP/1.0 / HTTP/1.1 协商错乱
  • 本章定位:§12.2.4 KA 的 BitAndAssign——一定是某行代码错误地 &= false

Bug 2:上传大文件 server hang

  • 症状:客户端 upload 30GB 文件、server 不返回 response
  • 根因:user handler 没有 read_to_end() body、hyper 的 body_tx channel 满了
  • 本章定位:§12.6.1 的 body.poll_ready 返回 Pending——poll_loop 卡在 read 上

Bug 3:slowloris 单 IP 100 连接打死 server

  • 症状:单 IP 攻击让 server 拒绝服务
  • 根因:没设 h1_header_read_timeout
  • 本章定位:§12.10.1 的 timer race——加 .http1_header_read_timeout(Duration::from_secs(10)) 即可

Bug 4:response 写到一半客户端断开、server CPU 100%

  • 症状:grafana 上 50x errors 不增加但 CPU 飙
  • 根因:socket 已断、hyper 不停尝试 write/flush 失败、busy loop
  • 本章定位:§12.7 的 poll_write/poll_flush——error mapping 必须正确将 IO 错误传上去关连接

Bug 5:HTTP/1 server 偶发”connection reset by peer”

  • 症状:客户端报错、server 日志没异常
  • 根因:keep-alive 时 client 已经关 socket、server 还想发新 response
  • 本章定位:§12.9.1 的 mid_message_detect_eof ——server 应该提前检测对端 EOF 关连接

这些 bug 的共同特点——没有读 hyper 源码很难定位——hung/reset/high CPU 这些症状指向的不是某行用户代码、是 hyper 内部状态机的某个分支。读懂本章后再遇到这种问题、能直接打开 conn.rs:XX 看哪条状态转移触发了

12.10.3.1 hyper 1.x vs 0.14 在 connection 设计上的演进

如果你之前用过 hyper 0.14 再来读 1.x 源码——能看到几个架构级的演进

1、Service trait 的精简:0.14 的 Service trait 有 Future 关联类型 + Response + Error + call 方法 + poll_ready。1.x 把 Future 改成 GAT(lifetime-aware)、签名更接近 std。接口表达力更强、但需要更新版的 rustc

2、IncomingBody 取代 Body:0.14 的 Body 是 universal type、用户可以传任意 body。1.x 改名 IncomingBody、明确”这是从 connection 进来的 body”——用户的 response 用自己的 B: Body 实现。类型系统更精确、避免”我以为 Body 是 streaming 但实际是 buffer”的混淆

3、显式 cooperative yield:0.14 不显式调 task::yield_now、靠 tokio runtime 的 budget 机制。1.x 显式用 yield_now——给非 tokio runtime 也能用 hyper 留口子。这是 hyper 走向”runtime-agnostic”的一步。

4、http-body 1.0 接口:0.14 用 http-body 0.4(每个 frame 是 Bytes)。1.x 用 http-body 1.0(frame 是 enum:Data/Trailers/…)——支持 trailers + 未来扩展

5、Conn 的字段精简:0.14 的 Conn 有 30+ 字段、1.x 通过 cfg-gating 把 server-only / client-only 字段分开——编译出来的 server-only binary 不带 client 字段、内存占用小

这些演进反映了 hyper 团队在 0.14 → 1.0 的 4 年里积累的工程教训——什么该精简、什么该精确化、什么该 runtime-agnostic。任何长期维护的库都会经历这种”先 push 大胆新功能、后回头精炼”的循环。读 hyper 不同版本的 changelog 是看这种演进的最佳方式。

12.10.4 几条 HTTP/1 connection 的”反直觉”事实

读完本章你应该获得几条很反直觉但真实的事实——这些在普通 HTTP server 教程里不会讲:

1、HTTP/1.1 keep-alive 默认 ON、HTTP/1.0 默认 OFF——但很多生产 server 错误配置成 HTTP/1.0 风格(每请求一连接)、损失 5-10x 性能。判断标准是 response header 是否带 Connection: keep-alive——HTTP/1.1 不需要带、HTTP/1.0 必须带才生效。

2、Pipelining 在浏览器里几乎无用——RFC 7230 允许、但实际 Chrome/Firefox 默认禁用 pipelining(因为 head-of-line blocking 风险)。hyper 支持 pipelining 但你的客户端 99% 用不上——看本章 §12.5.1 的 16 轮 poll loop 主要服务 server-to-server 场景。

3、Content-Length: 0 比省略 Content-Length 不一样——POST 一个 body=0 的请求必须Content-Length: 0(否则 server 不知道 body 是空还是 chunked)。这是 HTTP/1 spec 的微妙规定。hyper 的 Encoder 自动处理、用户不用操心、但理解协议帮你 debug 第三方 server 的 weird 行为。

4、Expect: 100-continue 在 HTTP/2 里被 deprecated、HTTP/3 完全没有——这条 HTTP/1 特性服务上传大文件时”先 OK 再发 body”——HTTP/2 的流控机制取代了它的功能。但很多老客户端还在发——hyper 的 Reading::Continue 状态专门处理。

5、Connection 数和并发数不一定相等——HTTP/1.1 即使 keep-alive、一条 connection 同时只处理一个请求;要并发就开多条连接。HTTP/2 一条 connection 多个 stream。所以”server 支持 10K 并发”对 HTTP/1 和 HTTP/2 是不同含义——HTTP/1 的 10K 并发 = 10K connection、HTTP/2 可能是 100 connection × 100 stream。

6、TCP keep-alive 不等于 HTTP keep-alive——TCP keep-alive 是 OS 级别的”心跳检测连接活着”、HTTP keep-alive 是 application 级别的”复用 connection 多个 request”。两者完全无关——很多新手混淆。本章讲的全部是 HTTP keep-alive。

这 6 条事实在生产 server 排错时反复出现——理解它们能让你少走很多弯路。

12.11.1 本章源码定位索引

为便于按图索骥(基于 hyper 1.9.0):

主题源文件关键行号
Conn structhyper-1.9.0/src/proto/h1/conn.rs38-42
State + Reading/Writing/KA enum同上930-1028
BitAndAssign for KA同上1013-1019
try_keep_alive 4 路同上1072-1091
idle + Client/Server 特殊路径同上1104-1134
poll_read_head 含 timeout同上211-310
poll_read_keep_alive 三态同上435-489
Dispatcher structhyper-1.9.0/src/proto/h1/dispatch.rs21-27
Dispatch trait 4 方法同上29-42
poll_loop 16 轮 cooperative yield同上165-193
poll_read 含取消处理同上195-269
poll_read_head + poll_ready 反压同上271-310
poll_write + body chunk 处理同上328-421

12.11.2 读完本章能回答的具体问题清单

作为本章自测:

  1. Reading × Writing × KA 共有多少种合法状态组合?(§12.2-3——理论 3×4×4=48、实际典型只有 5-6 种循环)
  2. KA 重载 &= 运算符是为了什么?(§12.2.4——单向锁语义、任何一处 false 就永久 disable)
  3. try_keep_alive 处理哪 4 种状态组合?(§12.8.1——双 KA、Read closed/Write KA、Read KA/Write closed、其他)
  4. idle 函数里 !T::should_read_first() 是什么意思?为什么要 set notify_read?(§12.8.1——Client 标识、立刻 poll 让 user 等待的请求能发出去)
  5. poll_loop 为什么循环 16 次?(§12.5——arbitrary、对应典型 pipelining benchmark + cooperative yielding)
  6. poll_read 区分两种 _canceled 错误为什么?(§12.6.1——早 drop drain、晚 drop close,性能 vs keep-alive 权衡)
  7. poll_read_head 为什么要先 poll dispatch.poll_ready?(§12.6.2——backpressure、避免 user service 不接受时还读字节 OOM)
  8. poll_write 为什么 discard 空 chunk?(§12.7.1——chunked encoding 空 chunk 字面是 EOM、错误写出会让 client 提前关流)
  9. write_body_and_end vs write_body + end_body 差在哪?(§12.7.1——前者一次 IO 写 chunk + terminator、省一次系统调用)
  10. slowloris 攻击是什么?hyper 怎么防?(§12.10.1——慢速 header 占用 connection slot、h1_header_read_timeout config + race timer)
  11. poll_read_keep_alive 的三个分支分别处理什么?(§12.9.1——已 closed Pending、mid-message EOF 报 incomplete、idle unexpected bytes 报 protocol error)
  12. state.keep_alive &= msg.keep_alive 的语义?(§12.10.2——任何一条 message 不允许 keep-alive 就永久 disable)

能答 8 条以上——你对 hyper HTTP/1 connection 编排的理解已经超越大多数 Rust 后端开发者。这种深度让你能 debug 生产 HTTP server 的所有 connection 异常——keep-alive 不工作、connection 早关、慢请求 hung 住——能直接定位到本章讲过的具体源码片段。

12.11.3 给写自己的 connection-style codec 的 8 条工程启示

如果你将来写自己的协议库(不一定是 HTTP——可能是自定义二进制协议、WebSocket-like、proprietary RPC)——hyper 这套 Conn + Dispatcher 模式给你 8 条可移植的启示:

1、状态用 enum 而不是 boolean——boolean 让 N 种状态变成 2^N 个 bool 组合、容易出现”非法但没人检查”的状态。enum 强制所有状态被穷尽 match。本章 §12.2 的 Reading/Writing/KA 三 enum 就是这个原则。

2、运算符重载表达不变量——§12.2.4 的 BitAndAssign 让”单向锁”语义有代数感。如果你的状态有”一旦 disable 就永久 disable”性质——重载 &= 让代码读起来像”我在 AND 一个布尔”——意图清晰。

3、driver 和 state 分离——本章 §12.4 的 Conn (state) 和 Dispatcher (driver) 拆成两个 struct。Conn 不知道 driver 怎么 poll 它、Dispatcher 不直接读写 socket。职责分离让单元测试容易——Conn 可以脱离 driver 测、Dispatcher 可以用 mock conn 测。

4、for _ in 0..N 的 cooperative yielding——§12.5 的 16 轮限制 + yield_now。任何 future 都不应该 starve 整个 executor——不管你的协议多简单、加这条防止饿死。

5、双层缓存(buffered IO + 应用层 buffer)——hyper 的 Buffered<I, EncodedBuf> 套两层。第一层把零碎 IO 系统调用合并、第二层让 application 层能预 build packet 不立刻 send。这种分层在所有性能敏感的协议库里都有。

6、用 state.method / state.version 这种 per-message 字段——HTTP/1 的某些行为依赖前一条 message 的字段(比如 HEAD 请求的 response 不该带 body)。connection state 不仅记当前状态、还记跨 message 的契约信息——你写的协议如果有类似跨消息约束、用同样的字段。

7、错误分级:本章看到的几种错误——new_incomplete(mid-message EOF)、new_unexpected_message(idle 收到字节)、new_header_timeout(slowloris)、new_user_body(user 业务错误)、new_io(底层 IO 错)。每种错误有专属构造函数 + 不同诊断——比一刀切 Box<dyn Error> 强 100 倍。

8、#[cfg(feature = "server")] / #[cfg(feature = "client")] 区分——避免 server 部署带 client 代码、反之亦然。用 cargo feature gate 精确控制 binary size——一些 hyper 用户只用 server、不会被 client 代码膨胀。

这 8 条不是 hyper 独家——任何成熟的协议库(h2、tokio-tungstenite、quinn)都用类似套路。协议库写作的”工程哲学”是相对稳定的——学会一种就能迁移到所有同类型项目。

12.12 落到你键盘上

  • RUSTFLAGS="--cfg hyper_unstable_tracing" 编译 hyper 并开 tracing——你会看到 Reading::Init → Reading::Body → Reading::KeepAlive → Reading::Init 这样的状态转移日志。感受状态机实时运转。
  • 尝试手动触发 pipelined requests:用 nc 手写两个连在一起的 HTTP/1.1 请求(GET /a HTTP/1.1\r\nHost: x\r\n\r\nGET /b HTTP/1.1\r\nHost: x\r\n\r\n),观察 hyper 怎么处理。你会看到 wants_read_again 的 loop 在工作。
  • 读 dispatch.rs 的 Client impl(600-700 行)——它和 Server impl 对称,但承载的是客户端侧的异步队列。读完一遍两者,你会对”同一个 Dispatcher 如何泛化到 Server/Client”有感性认识。

下一章,我们回到 trait 层面——看 hyper 1.x 为什么定义了自己的 Service trait,以及那个看似 “一个字符” 的差别(&self vs &mut self)背后是怎样一段三年讨论。