Skip to content

第12章 Connection Dispatcher 与 Role:请求分发状态机

12.1 字节之上的编排

上一章我们读完了 decode.rs / encode.rs / role.rs——Hyper 如何把一个请求的字节变成 http::Request、又把一个 Response 变回字节。但这只是 "单条消息" 的处理。

真实的 HTTP/1.1 连接不只是一条消息。一个连接上:

  • 客户端可能发多个请求(keep-alive)。
  • 服务端正处理上一个请求时,客户端可能已经发来下一个请求(pipelining——虽然 RFC 允许但实际很少开启)。
  • 响应 body 可能在请求读完之前就开始写。
  • 发生 Upgrade 时,整条连接要切换到另一个协议(WebSocket、HTTP/2)。

把所有这些协议事件编排成一个可以被 tokio::spawn(connection.await) 驱动的 future——这就是 conn.rsdispatch.rs 的职责。整整 2339 行代码,是整个 HTTP/1 协议栈的指挥中心

这一章我们把这个指挥中心拆开——Conn<I, B, T> 的状态机Dispatcher<D, Bs, I, T> 的 poll loop、它们如何把 Decoder / Encoder / user Service / body channel 编排成一个可正确取消、可 keep-alive、可 upgrade 的完整连接。

源码锁定:hyper 1.9.0 / hyper/src/proto/h1/conn.rs (1531 行) / hyper/src/proto/h1/dispatch.rs (808 行)。

12.2 Conn<I, B, T>:一条连接的所有状态

rust
// hyper/src/proto/h1/conn.rs:38-42
pub(crate) struct Conn<I, B, T> {
    io: Buffered<I, EncodedBuf<B>>,
    state: State,
    _marker: PhantomData<fn(T)>,
}

三个字段:

  • io:buffered IO,把原始 IO(TCP stream)包上一层读写缓冲。下一章专题讲。
  • state:连接的所有状态——下面详述。
  • _markerPhantomData<fn(T)>——TServerClient 的 tag,编译期分支用。

12.2.1 三套状态:Reading / Writing / KeepAlive

State struct 里有几十个字段(源码 930-961 行),但最核心的是三个 enum:

rust
// hyper/src/proto/h1/conn.rs:963-977, 1022-1028
enum Reading {
    Init,
    Continue(Decoder),
    Body(Decoder),
    KeepAlive,
    Closed,
}

enum Writing {
    Init,
    Body(Encoder),
    KeepAlive,
    Closed,
}

enum KA {  // keep-alive status
    Idle,
    #[default]
    Busy,
    Disabled,
}

三个 enum 是正交的——连接同时有一个"当前在读什么"的状态 + "当前在写什么"的状态 + "还打算复用吗"的状态。三者组合起来决定连接下一步该做什么。

12.2.2 Reading 状态意义

  • Init:还没收到 request header——等待客户端发第一个字节。
  • Continue(Decoder):收到了带 Expect: 100-continue 的请求头——发 100 Continue 响应,等 body 开始。
  • Body(Decoder):正在流式读 body——Decoder 按之前决定的长度语义(Length / Chunked / Eof)切帧。
  • KeepAlive:body 读完了——如果连接允许复用,回到 Init 状态等下一个请求;否则转 Closed。
  • Closed:不能再读了——要么收到了 FIN、要么协议错误、要么用户说别读了。

12.2.3 Writing 状态意义

  • Init:没有响应正在写——等待 user 产出 response。
  • Body(Encoder):正在写 response body——Encoder 负责 chunk 格式化 / length enforcement。
  • KeepAlive:响应写完了——等对应的 request 也读完就进入 Idle 阶段。
  • Closed:不能再写了。

12.2.4 KeepAlive 状态意义

KA 是最微妙的一个。它记录"这条连接能不能在当前 message 完成之后被复用":

  • Busy:当前正在处理请求。默认状态。
  • Idle:当前没有请求在跑,可以接收新的。
  • Disabled:永远不复用——下一条 message 结束后必须关。原因可能是 HTTP/1.0、Connection: close header、或者协议错误。

源码有个有意思的小妙招:

rust
// conn.rs:1013-1019
impl std::ops::BitAndAssign<bool> for KA {
    fn bitand_assign(&mut self, enabled: bool) {
        if !enabled {
            *self = KA::Disabled;
        }
    }
}

KA 重载了 &= 运算符——state.keep_alive &= false; 等价于"如果 false 就设为 Disabled"。这允许代码里这样写:

rust
state.keep_alive &= headers.allows_keep_alive();   // 可能 disable
state.keep_alive &= !headers.has_connection_close(); // 又可能 disable

任何一处 false 就永久 disable——这是一个"单向锁"的语义,用 &= 表达得比 if !x { disable() } 更紧凑、更有代数感。这种小运算符重载在很多工业级 Rust 代码里能见到。

12.3 Reading × Writing × KA 的合法转移

把三个 enum 的 product 算出来——3 × 4 × 4 = 48 种状态组合。但实际上大部分组合不合法。典型的生命周期是:

时刻 1:空连接
  Reading=Init, Writing=Init, KA=Busy

时刻 2:收到 request header
  Reading=Body(decoder), Writing=Init, KA=Busy

时刻 3:body 读完 + 开始写 response
  Reading=KeepAlive, Writing=Body(encoder), KA=Busy

时刻 4:response 写完
  Reading=KeepAlive, Writing=KeepAlive, KA=Busy

时刻 5:状态清理完,回到空 ←─ 这是 try_keep_alive 完成的
  Reading=Init, Writing=Init, KA=Idle

时刻 6:收到下一个 request header
  Reading=Body, Writing=Init, KA=Busy
  ...

这套循环通过 State::try_keep_alive() 推动——当 Reading 和 Writing 同时是 KeepAlive 时,检查 KA 状态决定回到 Init 还是 Closed。Source code 在 conn.rs:1108-1130,其实很短:

rust
// 精简版
fn try_keep_alive(&mut self) {
    match (&self.reading, &self.writing) {
        (Reading::KeepAlive, Writing::KeepAlive) => {
            if let KA::Busy = self.keep_alive.status() {
                self.keep_alive.idle();
                self.reading = Reading::Init;
                self.writing = Writing::Init;
            } else {
                self.close();
            }
        }
        ...
    }
}

读懂这段就掌握了 "keep-alive 复用" 的本质——不是给 socket 起别的名字,而是把两个状态枚举重置回 Init。socket 还是那个 socket、缓冲区还是那个缓冲区——只是这条连接愿意为下一个请求再跑一遍状态机。

12.4 Dispatcher<D, Bs, I, T>:顶层驱动

Conn 只负责状态,driver 在 Dispatcher 里:

rust
// hyper/src/proto/h1/dispatch.rs:21-27
pub(crate) struct Dispatcher<D, Bs: Body, I, T> {
    conn: Conn<I, Bs::Data, T>,
    dispatch: D,
    body_tx: Option<crate::body::Sender>,
    body_rx: Pin<Box<Option<Bs>>>,
    is_closing: bool,
}

五个字段:

  • conn:上面讲的连接状态。
  • dispatchD: Dispatch trait——Server / Client 的分支实现。这个后面细讲。
  • body_tx入站 body 的发送端——当连接读入 body 数据时,塞给这个 sender,让 user handler 能从对应的 Incoming 拿到数据。
  • body_rx出站 body 的接收端——user handler 产出的 Response body,由这里 poll frame 写回连接。
  • is_closing:是不是正在关闭。

12.4.1 Dispatch trait:Server/Client 的分界

rust
// hyper/src/proto/h1/dispatch.rs:29-42
pub(crate) trait Dispatch {
    type PollItem;
    type PollBody;
    type PollError;
    type RecvItem;

    fn poll_msg(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;

    fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>)
        -> crate::Result<()>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>>;
    fn should_poll(&self) -> bool;
}

四个方法:

  • poll_msg:产出"下一个要发送"的消息(含 body)。
  • recv_msg:消费"刚收到"的消息(含 body)。
  • poll_ready:底层是否能接受新请求?(映射到 Service::poll_ready)
  • should_poll:有没有待做的工作?

这个抽象是 Server 和 Client 的对称点:

Server 侧Client 侧
PollItem(要发的)StatusCode 响应行RequestLine 请求行
RecvItem(收到的)RequestLine 请求行StatusCode 响应行
poll_msg 的来源service.call(req).await 得到从用户 API 队列得到
recv_msg 的目的交给 service.call(req)交给等待响应的用户

两个 impl(Server 和 Client)在源码里分别占 dispatch.rs 500-600 行和 600-700 行。它们各自 handle user-facing 的 API 形状差异——Server 包一个 HttpService<B>、Client 包一个 mpsc receiver。

12.4.2 Server 侧 poll_msg

rust
// hyper/src/proto/h1/dispatch.rs:532-555 精简
fn poll_msg(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
    -> Poll<Option<Result<(MessageHead<StatusCode>, S::ResBody), S::Error>>>
{
    let mut this = self.as_mut();
    let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
        let resp = ready!(fut.as_mut().poll(cx));
        // 拿到 user 的 Response<B>
        let (parts, body) = resp?.into_parts();
        let head = MessageHead {
            version: parts.version,
            subject: parts.status,
            headers: parts.headers,
            extensions: parts.extensions,
        };
        Ok((head, body))
    } else {
        unreachable!("poll_msg shouldn't be called if no inflight");
    };
    // 清空 in-flight slot
    this.in_flight.set(None);
    Poll::Ready(Some(ret))
}

逻辑:

  1. in_flightPin<Box<Option<S::Future>>>——这就是 user Service 产生的 future。
  2. poll 它,等 user handler 完成。
  3. 把 Response 拆成 head + body,head 交给 Conn 去写、body 留给 Dispatcher 的 body_rx 持续 poll。

12.4.3 Server 侧 recv_msg

rust
// 简化
fn recv_msg(&mut self, msg: crate::Result<(RequestHead, IncomingBody)>) -> crate::Result<()> {
    let (head, body) = msg?;
    let req = http::Request::from_parts(head_to_parts(head), body);
    let fut = self.service.call(req);
    self.in_flight.set(Some(fut));
    Ok(())
}

收到 request + body,构造 http::Request<Incoming>调用 user Service 的 call——把返回的 future 存进 in_flight

self.service.call(req) 就是 user handler 被实际调用的地方。整个 hyper 的所有工作——parse、decode、encode、state machine 管理——都是为了让这一行代码被调用。

12.5 poll_loop:一次 poll 做多少事

Dispatcher 的核心循环:

rust
// hyper/src/proto/h1/dispatch.rs:163-191
fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
    // Limit the looping on this connection, in case it is ready far too
    // often, so that other futures don't starve.
    //
    // 16 was chosen arbitrarily, as that is number of pipelined requests
    // benchmarks often use.
    for _ in 0..16 {
        let _ = self.poll_read(cx)?;
        let _ = self.poll_write(cx)?;
        let _ = self.poll_flush(cx)?;

        if !self.conn.wants_read_again() {
            return Poll::Ready(Ok(()));
        }
    }

    trace!("poll_loop yielding (self = {:p})", self);

    task::yield_now(cx).map(|never| match never {})
}

三件事,循环最多 16 次:

  1. poll_read:从 IO 读 header 或 body。如果读到了 message,传给 dispatch.recv_msg
  2. poll_write:如果 user 产出了消息(从 dispatch.poll_msg),把 header 写到 buffered IO;如果正在写 body,poll user body 的 frame,通过 encoder 写。
  3. poll_flush:把 buffered IO 的字节真正提交到底层 socket。

12.5.1 为什么是 16

rust
// 16 was chosen arbitrarily, as that is number of pipelined requests
// benchmarks often use. Perhaps it should be a config option instead.

注释里自己说"arbitrary"——随便选的。但 16 有一定道理:

  • HTTP/1.1 pipelining 的 benchmarks 一般开 16 个并发——刚好一次 poll 处理完一个 batch。
  • 大多数小请求(header-only)一次 poll 就能完成。
  • 如果真有超 16 个 message 堆在同一个 socket 上——最后一行 task::yield_now(cx) 让出 task,让 runtime scheduler 有机会先跑别的 future。

这是一个典型的 cooperative yielding 模式——循环到某个上限后主动让贤,避免"饿死其他 task"。卷四《Tokio 源码深度解析》第 14 章(select! 与公平调度)里讨论过 Tokio runtime 的"Budget"机制——tokio 在 await 点会自动插入这类 yielding。Hyper 的 yield_now 是相同的思路——避免任何 future 独占 executor 太久

12.5.2 wants_read_again 的退出条件

这是 poll_loop 的另一个智慧:循环会提前退出,除非 "读出新东西后状态又变得可读":

rust
if !self.conn.wants_read_again() {
    return Poll::Ready(Ok(()));
}

大多数时候一次 poll 只做一件事——读一次、写一次、flush 一次,然后返回 Pending 等 IO。只有在"读完一条消息、紧接着下一条又在 buffer 里"(pipelining 场景)时才继续循环。

这样做的好处:非 pipelining 场景(占 99%)没有额外开销,pipelining 场景又能利用缓存中的字节提升吞吐。按 common case 优化是工业级代码的气质

12.6 poll_read:身兼数职

这是 Dispatcher 最复杂的一个方法(dispatch.rs:192-285,将近 100 行)。我们挑核心看:

rust
fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
    loop {
        if self.is_closing { return Poll::Ready(Ok(())); }

        if self.conn.can_read_head() {
            // 情况 1:还没读 header——读 header
            ready!(self.poll_read_head(cx))?;
        } else if let Some(mut body) = self.body_tx.take() {
            if self.conn.can_read_body() {
                // 情况 2:header 读过了、正在流式读 body
                match body.poll_ready(cx) { ... }
                match self.conn.poll_read_body(cx) {
                    Poll::Ready(Some(Ok(frame))) => {
                        if frame.is_data() {
                            body.try_send_data(frame.into_data().unwrap())
                        } else if frame.is_trailers() {
                            body.try_send_trailers(frame.into_trailers().unwrap())
                        }
                        ...
                    }
                    ...
                }
            }
        } else {
            // 情况 3:没有正在流的 body,可能是 keep-alive 等待
            return Poll::Ready(Ok(()));
        }
    }
}

读的三种情况:

  1. 还没读 header:调 poll_read_head —— 从 IO 读字节,调 role::parse_headers 解析。解析成功后构造 IncomingBody 的 channel(body_tx 这边存进 Dispatcher,另一边通过 dispatch.recv_msg 包进 http::Request 交给 Service)。
  2. 正在读 bodybody.poll_ready(cx) 检查 body 接收者还想继续读(这就是第 10 章讨论的 want_tx 信号);检查通过后 poll decoder 拉一帧,送到 body_tx。
  3. 没活儿干:return Ready——让 poll_write 或者下一轮再试。

注意嵌套的三层 Poll 对话:

  • dispatcher poll_read 问 Conn:能读 body 吗
  • Conn 问 decoder:decode 一帧吧
  • Decoder 问 buffered IO:要读字节
  • Buffered IO 问 underlying socket:有字节来吗

任何一层 Pending,整条链挂起。任何一层有东西,信号向上反馈——一路到 dispatcher 把 frame 塞到 body_tx。

12.7 poll_writebody_rx 的互动

与 poll_read 对称的 poll_write(略):

  1. 如果 Writing::Init 且 dispatch 有 message——poll_msg 拿 head + body,encode_headers 写 header,body_rx 记住;writing 切到 Body。
  2. 如果 Writing::Body——poll body_rx 下一帧,用 Encoder 格式化(chunked / length)写 buffered IO。
  3. body 写完——Writing::KeepAlive;try_keep_alive 检查是否可以复用。

核心循环模式相同:循环往 IO 推数据,直到 Pending

12.7.1 body_rx 的 Pin<Box<Option<Bs>>>

这个类型很有意思:

rust
body_rx: Pin<Box<Option<Bs>>>,

三层包装——Box 堆分配、Pin 固定地址、Option 允许清空。为什么要这么复杂?

  • Pin:user 的 Body 可能是 !Unpin(典型例子是 async_stream! 宏生成的 Stream),需要 pin 才能 poll。
  • BoxPin<&mut Bs> 需要 Bs 在内存中地址稳定——堆分配保证这一点。
  • Option:body 写完之后要被 drop——set(None) 清空 Box 的内容。

Pin<Box<Option<T>>> 这种三层包装是 Rust 异步代码里处理"可替换、可 pin、可 unpin 的 future/stream"的标准模板。理解它比去死记它来得实用——下次你遇到类似需求,知道怎么拼就行了。

12.8 错误、Upgrade、Shutdown 的正交处理

Dispatcher 还要处理三类"不走正常流程"的事件。

12.8.1 错误:poll_catch

rust
// dispatch.rs:122-142
fn poll_catch(&mut self, cx: &mut Context<'_>, should_shutdown: bool)
    -> Poll<crate::Result<Dispatched>>
{
    Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
        // Be sure to alert a streaming body of the failure.
        if let Some(mut body) = self.body_tx.take() {
            body.send_error(crate::Error::new_body("connection error"));
        }
        self.dispatch.recv_msg(Err(e))?;
        Ok(Dispatched::Shutdown)
    }))
}

任何 poll_inner 的错误都被 poll_catch 吞下:

  1. 如果有正在接收的 body——把错误发到 body_tx,让 handler 侧的 body.poll_frame 看到 Err
  2. 告诉 dispatch(Server / Client)有错误发生——让它优雅退出,而不是 panic 或挂起。
  3. 返回 Dispatched::Shutdown——让上层知道连接该关了。

错误不会静默消失——它顺着 body channel 和 dispatch 接口一起传给 user。这是 Hyper 可靠性的一个基石。

12.8.2 Upgrade:特殊分支

当客户端发 Upgrade: websocket + 服务端响应 101 Switching Protocols,连接就脱离 HTTP/1 协议了——TCP stream 被交给 user 继续用。

rust
// dispatch.rs:144-164 简化
if self.is_done() {
    if let Some(pending) = self.conn.pending_upgrade() {
        self.conn.take_error()?;
        return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
    } else if should_shutdown {
        ready!(self.conn.poll_shutdown(cx))...;
    }
    ...
}

pending_upgrade() 返回一个 Pending<Upgraded> ——这个 handle 会在未来的某一刻拿到 raw socket(通过 Upgraded::downcast_into())。Dispatcher 的工作到这里结束——它不会再读写 socket,让给 upgrade handler。

Upgrade 的完整机制是第 18 章的主题。这里只知道 Dispatcher 通过 Dispatched::Upgrade(pending) 这个 退出码 把"连接控制权"交回给上层。

12.8.3 Shutdown 的优雅顺序

rust
else if should_shutdown {
    ready!(self.conn.poll_shutdown(cx))?;
}

poll_shutdown 调到 AsyncWrite::poll_shutdown——在 TCP 上发一个 FIN,告诉对端"我不再发了"。对端会看到 EOF,自然关闭它的 read half。

这个机制是 TCP 半关闭——hyper 不会粗暴地 close 整个 socket。对端还能继续读缓冲里没被消费的字节(比如你的响应最后几个字节),直到正常 EOF。这是和"快速关"的关键差别——快速关会丢数据,半关保证完整。

12.9 Role 的 maybe_notify 小玩法

最后看一段鲜为人知但极其 clever 的代码。Conn state 里有一个字段:

rust
notify_read: bool,

它是一个"我需要被再 poll 一次"的标记。当 Dispatcher 处于 "body 读完了但 KeepAlive 状态还没复用"的瞬间,如果已经有下一个 request 的字节在 buffer 里——不让 task 挂起、直接循环回 poll_read——这就是 wants_read_again 的意义。

rust
// dispatch.rs:186-190
if !self.conn.wants_read_again() {
    return Poll::Ready(Ok(()));
}

简单两行,但它是 hyper pipelining benchmark 里"吞吐不崩"的秘诀。没有这个检查,每个 pipelined request 要多一次 runtime wake-up 循环——在高并发下开销显著。

有注释特别提到:

Using this instead of task::current() and notify() inside the Conn is noticeably faster in pipelined benchmarks.

翻译:不通过 waker re-notify,直接把下一轮 loop 继续跑——在 pipelined benchmark 里明显更快

这就是为什么读 hyper 源码你会一再看到"为 benchmark 调优"的痕迹——它是一个 benchmark-driven 的工程代码。每一处非朴素的代码背后几乎都有一个 benchmark 数字推动。

12.10 一张连接生命周期图

把整章讲的东西画成图:

TCP accept

Conn::new(io) — state.reading=Init, writing=Init, KA=Busy

Dispatcher::new(svc, conn)

tokio::spawn(dispatcher) ─── 进入 poll_loop

        ├─ poll_read   ───── role::parse ─── dispatch.recv_msg ─── svc.call(req)
        │                                                              │
        ├─ poll_write  ───── dispatch.poll_msg ─── role::encode ───── writes header
        │                    │
        │                    └── body.poll_frame ─── encoder 写 body

        ├─ poll_flush  ───── underlying socket

        └─ wants_read_again? ─── 继续循环 / yield

事件发生:
  - 读完 body       → Reading::KeepAlive
  - 写完 body       → Writing::KeepAlive
  - try_keep_alive  → 若 KA=Busy,reset 到 Init(开始下一轮)
  - 或 KA=Disabled  → close(); poll_shutdown
  - 或 Upgrade      → Dispatched::Upgrade(pending),Dispatcher 退出

这张图浓缩了 2339 行代码的指挥流。读懂它你就读懂了 hyper 1.x HTTP/1 的全部顶层逻辑。

12.11 和其他章节的呼应

  • 第 11 章讲的 Decoder / Encoder 被 Conn::state 的 Reading / Writing 持有。
  • 第 10 章讲的 Incoming::Chan { data_rx, want_tx } 的 sender 端就是 Dispatcher 的 body_tx——want_tx 的信号传到 Dispatcher 的 poll_read,决定是否 poll decoder。
  • 第 13 章(下一章)讲的 hyper::Service trait 是被 Server::recv_msg 这一行调用:self.service.call(req)
  • 第 18 章会接回 pending_upgrade() 的流程。

整本书到这里进入"hyper 核心最难的部分"——协议状态机。读完第 11、12 章你应该感到有些累——因为它们是全书最 intricate 的章节。但这也是收获最大的两章——这就是工业级 HTTP/1 实现,没有捷径。

12.12 落到你键盘上

  • RUSTFLAGS="--cfg hyper_unstable_tracing" 编译 hyper 并开 tracing——你会看到 Reading::Init → Reading::Body → Reading::KeepAlive → Reading::Init 这样的状态转移日志。感受状态机实时运转。
  • 尝试手动触发 pipelined requests:用 nc 手写两个连在一起的 HTTP/1.1 请求(GET /a HTTP/1.1\r\nHost: x\r\n\r\nGET /b HTTP/1.1\r\nHost: x\r\n\r\n),观察 hyper 怎么处理。你会看到 wants_read_again 的 loop 在工作。
  • 读 dispatch.rs 的 Client impl(600-700 行)——它和 Server impl 对称,但承载的是客户端侧的异步队列。读完一遍两者,你会对"同一个 Dispatcher 如何泛化到 Server/Client"有感性认识。

下一章,我们回到 trait 层面——看 hyper 1.x 为什么定义了自己的 Service trait,以及那个看似 "一个字符" 的差别(&self vs &mut self)背后是怎样一段三年讨论。

基于 VitePress 构建