Hyper 与 Tower:工业级 HTTP 栈

第10章 http-body 与 Body trait:frame、trailers、size_hint

作者 杨艺韬 · 11,952 字

第10章 http-body 与 Body trait:frame、trailers、size_hint

10.1 “body 到底是什么”这个被忽视的问题

上一章我们读完了 http crate——整个 HTTP 数据模型的骨架。但骨架里有一个地方故意留空:body。

Request<T>Response<T> 里的 T 是什么?答案是”随便什么”——StringVec<u8>()BytesIncomingBoxBody……任何类型都可以放进去。这不是 API 设计偷懒——body 本身就是一个独立的抽象问题,它值得一个独立的 crate。

这就是 http-body 的角色。它定义了一个很短的 trait:

// http-body/src/lib.rs:38-88
pub trait Body {
    type Data: Buf;
    type Error;

    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>;

    fn is_end_stream(&self) -> bool { false }
    fn size_hint(&self) -> SizeHint { SizeHint::default() }
}

三个方法,十几行。但它是整个 Rust HTTP 生态”body 作为流”的公共语言。这一章我们把它、以及它的小伙伴 Frame / SizeHint / http-body-util / hyper::body::Incoming 全部读一遍。

源码锁定:http-body 1.0.1 / http-body-util 0.1.3(commit c8cb37f)。

10.2 为什么要把 body 做成”流”

Naive 的 HTTP 服务器会把整个 body 读进一个 Vec<u8> 再交给 handler 处理。这种做法在小请求(JSON API)上合理,但在三种场景下会灾难:

  1. 上传大文件:100MB 的文件请求,Vec 化之后 100MB 都在服务进程的堆上。1000 个并发上传 = 100GB 内存 = OOM。
  2. 流式响应:服务端一边生成数据一边响应(SSE / 流式 JSON / 视频)。等生成完才发——用户感觉卡死。
  3. HTTP/2 stream:HTTP/2 的流天生是分帧的,一个 frame 一个 frame 到,如果要求组装成 Vec 会阻塞流控。

所以 body 必须是一个惰性、增量、双向驱动的流——调用方 poll_frame 拉一帧、用完释放、再拉下一帧。http-body::Body 正是这个抽象。

10.2.1 type Data: Buf 而不是 Vec<u8>

Body 的数据类型是 type Data: Buf——关联类型,约束到 bytes::Buf trait。

为什么不是具体的 BytesVec<u8>?因为不同来源的 body 产出的类型不同:

  • HTTP/1 解析器从内部环形缓冲读一段出来——那是一个 Bytes(引用计数切片)。
  • 用户从 file 读一段——可能是 Vec<u8>
  • gRPC codec 产生的是预先对齐的 protobuf Bytes
  • 测试代码里可能是 &'static [u8]

要求所有 body 都先转成 Vec 或 Bytes 会强制一次 copy。解决办法是让 Data 类型保持泛型——只约束”能像一个 byte cursor 那样被读取”,这正是 bytes::Buf 的契约。

Buf trait 本身是 bytes crate 的核心抽象——提供 chunk() -> &[u8]advance(n)remaining() -> usize 等方法。一个类型能做到这些,就能被当作”可推进的字节序列”使用——不管它底下是单 slice、多 slice 的 chain、还是离散的链表。

这种”不绑定具体容器类型”的 trait 设计在 Rust 生态很普遍。卷四《Tokio 源码深度解析》第 8 章(I/O Driver)里 AsyncRead 里也有类似的 “read into anywhere that implements BufMut” 的思路——把数据移入/移出的”目标类型”延迟到具体场景,是 Rust 高性能库的共同气质。

10.2.2 Body trait 只有一个 required method 的克制

回看 Body trait 的签名——只有 poll_frame 是必须实现的is_end_streamsize_hint 都有默认 impl:

pub trait Body {
    type Data: Buf;
    type Error;

    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>;

    fn is_end_stream(&self) -> bool { false }
    fn size_hint(&self) -> SizeHint { SizeHint::default() }
}

三个方法里只有一个 required、两个 provided——这是 Rust trait 设计的一个黄金平衡点。想实现 Body 的用户只需要提供 poll_frame、其他两个有合理默认:

  • is_end_stream 默认 false——告诉上层 “我不知道什么时候结束、每次 poll_frame 才知道”。保守但永远安全。
  • size_hint 默认返回 SizeHint::default()(lower=0, upper=None)——表达 “我没法预知大小”。也是保守默认。

为什么 Body 不把这两个方法也变 required?——

  • 很多 body 实现确实不知道答案(比如从网络来的流、从异步迭代器来的流)——强制实现只会让代码抛不合适的默认值
  • 默认值足够准确——保守的 falseNone 不会误导消费者

为什么 Body 不把默认 impl 拆到 BodyExt?——

  • 这两个方法和 Body 的语义强耦合——不是”可选扩展”、是 “Body 的本质”
  • 消费 Body 的代码(hyper、axum)频繁调用这两个方法——trait method 的虚调用(对 trait object 而言)比 BodyExt 的 trait bound 查找更直接

这种”必需方法最小化、可选方法默认实现”的模式在 Iterator trait(next 是必需、100+ 其他方法都有默认)、Future trait(poll 必需)里反复出现——是 Rust 最成熟的 trait 设计模式之一。

10.3 poll_frame:一次拉一帧

fn poll_frame(
    self: Pin<&mut Self>,
    cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>;

返回类型层层嵌套,但语义清晰:

  • Poll::Pending:下一帧还没准备好(等网络、等上游、等磁盘)。
  • Poll::Ready(None):流结束,再也没东西了。
  • Poll::Ready(Some(Ok(frame))):有一帧可以消费。
  • Poll::Ready(Some(Err(e))):读取错误。

三种”有东西”的情况都塞进一个返回类型——Pending 等待、Ready+None 结束、Ready+Some 产出。这是 Rust Stream trait 的标准惯用形式。事实上如果你看 futures::Stream

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
    -> Poll<Option<Self::Item>>;

一模一样的签名——只是把 Item 具体化成了 Result<Frame<Data>, Error>Body 本质就是一个”会产出 Frame 的 Stream”,只不过带了一些 HTTP 特有的 helper 方法。

10.3.1 为什么不直接用 Stream trait

这是个好问题,有两个答案。

第一,Stream trait 曾经不稳定futures::Stream 在 futures 0.3 时代是第三方 trait,进入 core 的过程持续了很多年。http-body 不想依赖一个外部 crate 的 trait——于是自己定义。

第二,语义更精确Stream 是通用”连续产出 Item”的抽象;Body 明确表达”这是一个 HTTP body”。两个 helper 方法(is_end_streamsize_hint)是 HTTP 特有的——HTTP/2 需要 is_end_stream 提示来决定是否发 END_STREAM flag;size_hint 影响 Content-Length header。把这些放在独立 trait 上比做 Stream::size_hint(不存在)更合适。

这是另一个 “基础语义 + 领域 helper”的 trait 拆分例子,和第 2 章 Service trait 与 Serializer trait 一脉相承。

10.4 Frame<T>:两种可能

// http-body/src/frame.rs:3-17
#[derive(Debug)]
pub struct Frame<T> {
    kind: Kind<T>,
}

enum Kind<T> {
    // The first two variants are "inlined" since they are undoubtedly
    // the most common. This saves us from having to allocate a
    // boxed trait object for them.
    Data(T),
    Trailers(HeaderMap),
    //Unknown(Box<dyn Frameish>),
}

一个 frame 有两种可能的内容:

  • Data(T):字节数据,T: Buf
  • Trailers(HeaderMap):HTTP/2 的 trailing headers——body 结束后附带的 header。

10.4.0 Frame<T> 的 newtype pattern 而不是直接暴露 enum

打开 frame.rs:3-17——注意 Frame 不是直接的 enum

#[derive(Debug)]
pub struct Frame<T> {
    kind: Kind<T>,
}

#[derive(Debug)]
enum Kind<T> {
    Data(T),
    Trailers(HeaderMap),
    //Unknown(Box<dyn Frameish>),
}

Frame 是一个 struct、里面包着一个 private enum Kind——这是newtype pattern 的变体。为什么?

如果 Frame 直接写成 pub enum Frame<T> { Data(T), Trailers(HeaderMap) }——用户可以直接 match

match frame {
    Frame::Data(d) => ...,
    Frame::Trailers(t) => ...,
}

但这就把 variant 集合暴露成了公开 API——未来 http-body 想加第三种 frame(就像注释里提的 Unknown)、是 breaking change——所有 match 都要加新分支。

用 struct + private enum + 暴露 helper 方法is_datadata_refis_trailers 等)——未来加新 variant只需要加新 helper、老代码继续工作、旧 match 不存在(因为用户 match 不到 private enum)。

这是 API 稳定性的经典模式——enum 是内部实现细节、对外用方法暴露。std 里的 CowDuration、很多类型都这么写——允许内部 representation 变化而不破坏用户代码。

代价是用户代码的 match 变成一串 if-else

if frame.is_data() {
    let data = frame.data_ref().unwrap();  // 稍微啰嗦
}

但这是可接受的代价——换来 http-body 可以安全地演进。

10.4.1 Trailers 是 HTTP/2 / gRPC 的必需品

初学 HTTP 的人可能没接触过 trailers——它是 HTTP/1.1 存在但很少用、HTTP/2 普遍用、gRPC 必须用的功能。

典型用途:gRPC 把状态码放在 trailer 里。因为 gRPC 的响应体是一个 protobuf 流——请求成功还是失败、错误细节是什么——都不能放在响应头里(响应头要在 body 之前发,那时还不知道 body 处理结果)。解决方案:响应 200 OK、body 里是 protobuf 流、流结束后再发一个 trailer grpc-status: 0。所以 gRPC 客户端必须等到 trailer 才知道请求是否真的成功。

http-body 把 data 和 trailer 统一成一个 Frame——调用方 poll_frame 读到的东西可能是这两种之一:

loop {
    match body.poll_frame(cx).await {
        Some(Ok(frame)) => {
            if let Some(data) = frame.data_ref() {
                // 处理字节数据
            } else if let Some(trailers) = frame.trailers_ref() {
                // 处理 trailing headers
            }
        }
        Some(Err(e)) => return Err(e),
        None => break, // end of stream
    }
}

10.4.2 那个被注释掉的 Unknown variant

//Unknown(Box<dyn Frameish>),

这一行是留给未来的。作者早期考虑过给 frame 加第三种 variant——“未知类型的帧”,让扩展协议可以携带非 Data/Trailers 的内容。但这会引入 trait object 和堆分配——破坏 http-body 的零成本气质。最终决定注释掉——但留着这个”曾经考虑过”的痕迹,作为未来讨论的起点。

这也是读工业级开源代码的一个乐趣——注释里的”—“比代码本身还透露信息。你看到这行注释,就知道 http-body 的作者是有意决定保持 Frame 闭合——任何需要更多 frame 种类的协议都要自己扩展,不能通过 trait object 硬塞进 Frame。

10.4.2.5 into_data 返回 Result<T, Self> 的 by-value-or-return 模式

into_data 的签名:

pub fn into_data(self) -> Result<T, Self> {
    match self.kind {
        Kind::Data(data) => Ok(data),
        _ => Err(self),
    }
}

失败时不返回 NoneError、而是返回 Err(self) 把原 Frame 放回——这样用户可以继续使用那个 Frame

典型用法:

let frame = match frame.into_data() {
    Ok(data) => {
        process(data);
        return;
    }
    Err(f) => f,  // 拿回原 Frame、继续尝试其他操作
};

match frame.into_trailers() {
    Ok(headers) => handle_trailers(headers),
    Err(_) => unreachable!(),
}

如果 into_data 返回 Option<T>——失败时 Frame 被消耗掉、用户没办法再提取 trailers。返回 Result<T, Self>失败场景下原对象被归还

这就是 Rust 所有权系统里处理 “消耗式转换、失败时还原” 的惯用模式——比 先 is_xxx 检查、再 into_xxx 消耗 更安全(避免 TOCTOU 风险、少一次 branch 检查)。

其他类似模式的例子:TryInto<T>::try_into(self) -> Result<T, Self::Error>——错误类型通常也携带原值、让用户能恢复。

这种 占有 self 但失败时归还 的 API 比 Python 的 duck typing 或 C++ 的引用语义健壮得多——所有权永远有明确的归属、不会有 “对象在失败后处于未定义状态” 的问题。

10.4.3 map_data:只变换数据

pub fn map_data<F, D>(self, f: F) -> Frame<D>
where F: FnOnce(T) -> D,
{
    match self.kind {
        Kind::Data(data) => Frame { kind: Kind::Data(f(data)) },
        Kind::Trailers(trailers) => Frame { kind: Kind::Trailers(trailers) },
    }
}

变换数据类型,trailers 原样透传。这让你能在中间件里”把所有 Data frame 的类型从 Bytes 变成 String”而不影响 trailers。是 Frame 上最常用的 combinator。

10.5 SizeHint:一个 lower/upper bound 对

// http-body/src/size_hint.rs:7-11
pub struct SizeHint {
    lower: u64,
    upper: Option<u64>,
}

表达”body 至少有多少字节、最多有多少字节(可能未知)”。

用途:

  • 服务端决定要不要发 Content-Length: N header。如果 size_hint.exact() 返回 Some(N)——设这个 header;否则用 Transfer-Encoding: chunked
  • 客户端决定 body buffer 要预分配多大。取 upper 作为预分配量,避免多次 realloc。
  • HTTP/2 的 flow control 决策:知道 body 有多大,可以提前预算流控窗口。

10.5.0 SizeHint::exact() 的相等判断实现

size_hint.rs:69-76exact 方法:

#[inline]
pub fn exact(&self) -> Option<u64> {
    if Some(self.lower) == self.upper {
        self.upper
    } else {
        None
    }
}

Some(self.lower) == self.upper 是一个巧妙的比较——Option<u64> == Option<u64> 处理 upper 为 None 的情况

三种可能情形:

  • lower=100, upper=Some(100)Some(100) == Some(100) = true → 返回 Some(100)
  • lower=100, upper=Some(200)Some(100) == Some(200) = false → 返回 None
  • lower=100, upper=NoneSome(100) == None = false → 返回 None

第三种情形是关键——upper 未知时、无论 lower 是多少都不能说 “exact”。Option 的相等语义自动处理了这个——写成 self.lower == self.upper.unwrap_or(0) 就错了(lower=0 时会返回 Some(0)、但实际是 “不知道”)。

这种用 Option::eq 自动处理 unknown 的模式是 Rust 的一个美学——不用写一堆 if let Some(u) = upper { if u == lower { ... } } 的嵌套、直接利用类型系统。

set_exact 的对称实现:

pub fn set_exact(&mut self, value: u64) {
    self.lower = value;
    self.upper = Some(value);
}

一次设置两者相等——比用户自己 set_lower(n); set_upper(n)原子化(中间不会处于 lower > old_upper 的非法态)。API 设计让常见操作一步完成、同时禁止中间态——好 API 的标志。

10.5.1 set_lower / set_upper 的 panic 保护

pub fn set_lower(&mut self, value: u64) {
    assert!(value <= self.upper.unwrap_or(u64::MAX));
    self.lower = value;
}

pub fn set_upper(&mut self, value: u64) {
    assert!(value >= self.lower, "`value` is less than than `lower`");
    self.upper = Some(value);
}

维持不变式 lower <= upper——违反就 panic。这是一个小而严肃的 defensive programming——与其容忍不合理状态传播,不如早死早超生

10.5.1.5 assert! 而非 Result—— 为什么选择 panic

set_lower / set_upperassert! 而不是返回 Result<(), Error>

pub fn set_lower(&mut self, value: u64) {
    assert!(value <= self.upper.unwrap_or(u64::MAX));
    self.lower = value;
}

为什么选 panic?——因为违反 lower <= upper 不变式是编程逻辑错误、不是 “业务可以处理的失败”。API 使用者的 bug、不应该被 silent 吞掉或者要求每次 check Result

对比 Result 版本会是这样:

pub fn set_lower(&mut self, value: u64) -> Result<(), InvariantError> {
    if value > self.upper.unwrap_or(u64::MAX) {
        return Err(InvariantError);
    }
    self.lower = value;
    Ok(())
}

调用方每次都要 ?、还可能选择”吞掉 error 继续”——那就是 silent bug。panic! 让错误立刻暴露、调用方必须修才能继续。

Rust 文化里 contract violation = panic;business error = Result 是一条清晰的分界线。这里 lower > upper 是 contract violation。

#[inline] 配合 assert——release build 里 assert! 不会被优化掉、但如果用户总是传合法值、JIT/branch predictor 会让这个 assert 接近免费(可预测的 branch 成本几乎为 0)。生产开销可忽略、换来的是 strict 的 invariant 维护。

10.5.2 一个被写进单元测试的数学证明

http-bodysize_hint.rs 里有一段相当罕见的东西——一个把数学证明写进单元测试的 block(文件 100-173 行):

#[test]
fn size_hint_addition_proof() {
    // assuming addition itself is perfect, there are 3 distinct states:
    // (_, Some(_)) + (_, Some(_)) => (_ + _, Some(_ + _))
    // (_, Some(_)) + (_, None) => (_ + _, None)
    // (_, None) + (_, None) => (_ + _, None)
    //
    // we can assert this in the typesystem! (and name them for our tests)
    match (to_parts(SizeHint::new()), to_parts(SizeHint::new())) {
        ((_, Some(_)), (_, Some(_))) => {} // 1
        ((_, None), (_, None)) => {}       // 2
        ((_, Some(_)), (_, None)) => {} // 3
        ((_, None), (_, Some(_))) => {}
    }
    ...
}

这段代码用 Rust 的 match exhaustiveness 来证明”SizeHint 加法的四种 upper 状态都被覆盖到了”。如果未来有人改 SizeHint 数据结构加了新 variant,这个 match 会编译失败——强制修改者更新证明。

这比把证明写在注释里强得多——证明变成了可执行的、会随代码变化自动 rechecking 的东西。这是 Rust 作者圈子里的一种美学——用类型系统辅助文档化不变式,既是注释也是检查。

10.5.3 #[inline] 的性能意义:为什么 SizeHint 全标上

size_hint.rs每一个公开方法都加了 #[inline]

#[inline]
pub fn new() -> SizeHint { ... }

#[inline]
pub fn with_exact(value: u64) -> SizeHint { ... }

#[inline]
pub fn lower(&self) -> u64 { self.lower }

#[inline]
pub fn upper(&self) -> Option<u64> { self.upper }
// ...

#[inline] 作用是什么?——告诉编译器 “这个函数尽量在调用点展开、不走函数调用”。对 SizeHint 这种小结构体的 getter、inline 后直接读字段——零函数调用开销。

为什么不依赖编译器自动 inline?——因为 SizeHint 定义在 http-body crate、跨 crate 调用时(比如 hyper 调 SizeHint 的 getter)、编译器默认不 inline(跨 crate 的函数没有 inlining hint 就只 inline 非常短的代码)。#[inline] 是一个显式请求、让跨 crate 调用也能 inline。

hyper 每个 response 都要调 size_hint().exact() 决定 Content-Length header——千万次 / 秒的频次。inline 之后这个”调用”变成 “直接读两个字段比较”——和直接访问 struct 字段没区别。

inline 的代价:binary size 略增(每个调用点展开一次函数体)。对这种几行代码的小方法、体积开销可忽略、换来每次调用省 ~5-10 cycles 的函数 call overhead。

这是 Rust 小型数据结构库的标准做法——所有 getter 和 trivial 构造器标 #[inline]BytesDurationMutex 源码里都是这样。

10.6 http-body-util:常用实现 + combinators

trait 定义完了,谁来实现?答:http-body-util 这个 sibling crate。它提供了四类实现:

  1. 静态 bodyEmpty<D>(没 body)、Full<D>(一整块 body)。
  2. 流式 bodyStreamBody<S>(包装一个 Stream)、BodyDataStream<B>(反向:Body 适配成 Stream)。
  3. 通道 bodychannel() 返回 (Sender, Body),生产者写、消费者读。
  4. 修饰 bodyLimited(限制总字节数)、BoxBody(类型擦除)、MapErrMapFrame

以及一个 BodyExt trait——给 Body 添加一堆 combinator 方法(map_errmap_framecollect()boxed())。

10.6.1 Full<D>:最简单的 Body 实现

// http-body-util/src/full.rs:10-74 精简
pub struct Full<D> {
    data: Option<D>,
}

impl<D: Buf> Body for Full<D> {
    type Data = D;
    type Error = Infallible;

    fn poll_frame(mut self: Pin<&mut Self>, _cx: &mut Context<'_>)
        -> Poll<Option<Result<Frame<D>, Self::Error>>>
    {
        Poll::Ready(self.data.take().map(|d| Ok(Frame::data(d))))
    }

    fn is_end_stream(&self) -> bool { self.data.is_none() }

    fn size_hint(&self) -> SizeHint {
        self.data.as_ref()
            .map(|data| SizeHint::with_exact(u64::try_from(data.remaining()).unwrap()))
            .unwrap_or_else(|| SizeHint::with_exact(0))
    }
}

20 行代码——这是一个完整的 Body 实现。分解:

  • Option<D> 存数据:Some(d) 表示”还没发”,None 表示”已经发完”。
  • 第一次 poll_frametake() 把数据拿出来,构造一个 Frame::data(d) 返回。
  • 第二次 poll_frameself.data 已经是 None,take() 返回 None,整个 Poll 变成 Ready(None)——结束。
  • size_hint() 返回精确的大小——因为我们事先就知道 body 有多少字节。
  • type Error = Infallible:这种 body 不可能产生错误。

Infallible 是 Rust 标准库里”表示永远不会发生的错误”的类型——它是一个空 enum,没有任何 variant,所以任何返回 Result<T, Infallible> 的函数实际只可能返回 Ok。Rust 编译器对这种类型有特殊优化,不会生成不可达的错误处理代码。

10.6.1.5 Empty<D> 的 zero-sized type 优化

除了 Full<D>、http-body-util 还提供 Empty<D>——表达 “这个 body 没有任何数据”。看它的定义:

// http-body-util/src/empty.rs 简化
pub struct Empty<D> {
    _marker: PhantomData<fn() -> D>,
}

impl<D: Buf> Body for Empty<D> {
    type Data = D;
    type Error = Infallible;

    fn poll_frame(self: Pin<&mut Self>, _cx: &mut Context<'_>)
        -> Poll<Option<Result<Frame<D>, Self::Error>>>
    {
        Poll::Ready(None)
    }

    fn is_end_stream(&self) -> bool { true }
    fn size_hint(&self) -> SizeHint { SizeHint::with_exact(0) }
}

Empty<D> 内部只有 PhantomData——零大小。Rust 保证 size_of::<Empty<D>>() == 0——无论 D 是什么。

为什么要 generic D——因为 Body::type Data = D——即使 empty、也需要声明 Data 的类型才能满足 trait。PhantomData 让编译器只在类型层面记住 D、运行时不占空间。

is_end_stream 返回 true——上层立刻知道 “这个 body 是空的、不用等后续 poll_frame”。HTTP/2 编码时就能立即发 END_STREAM flag。

size_hint 返回 with_exact(0)——精确声明 “0 字节”。上层可以精确设 Content-Length: 0

Empty vs Full 的内存差异

  • Full<Bytes>:内部 Option<Bytes>——一个 Bytes 是 16 字节,Option 的 Some/None 判断多 1 字节 + padding、总 16-24 字节
  • Empty<Bytes>:零字节

在大量 GET / HEAD / 204 场景下、每个 Response 都配一个 body——Empty 能让每个 Response 省 16 字节。Response 本身可能只有几百字节、16 字节的节省是 3-5% 的压缩——对每秒百万 QPS 的服务累积起来是可观的 RSS 减少。

这是 Rust 能做到的”运行时零开销的类型级抽象”——Python/Go 里表达 “empty body” 可能用 None / "" / nil——都是运行时有实体的值。PhantomData 让 “类型信息不占空间” 成为可能。

10.6.2 BoxBody<D, E>:类型擦除的逃生舱

// http-body-util/src/combinators/box_body.rs 精简
pub struct BoxBody<D, E> {
    inner: Pin<Box<dyn Body<Data = D, Error = E> + Send>>,
}

impl<D: Buf, E> Body for BoxBody<D, E> {
    type Data = D;
    type Error = E;
    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Result<Frame<D>, E>>>
    {
        self.project().inner.as_mut().poll_frame(cx)
    }
}

与第 3 章讨论的 BoxService 思路一致——用一次堆分配 + 一次虚方法调用换类型擦除。当你的 body 链路复杂到类型名变成一堆 <<<>>> 时,装进 BoxBody 就把类型统一——route 表就能有同样的 Body 类型。

Axum 的 Router::service 返回的 body 就是 BoxBody<Bytes, Error>——这就是为什么你在 Axum handler 的返回类型里经常看到 BoxBodyAxum 内部 route 分发需要类型统一,而用户的各种 handler(返回 String、Vec、stream、None)可能产出形形色色的 body——全部统一装 box。

10.6.2.5 BoxBodyPin<Box<dyn Body + Send>> 层叠

BoxBody 的内部类型是一个三层 wrap:

pub struct BoxBody<D, E> {
    inner: Pin<Box<dyn Body<Data = D, Error = E> + Send>>,
}

从内到外

  1. dyn Body<Data = D, Error = E> + Send——trait object、擦除具体类型、要求跨线程
  2. Box<...>——堆分配、让 DST(dynamically sized type)有固定大小
  3. Pin<Box<...>>——pin 语义、让 Body 的 self-referential future 不被 move

三层各有必要

  • 没有 dyn——类型不擦除、BoxBody<D, E> 会跟具体 Body 类型耦合、失去统一作用
  • 没有 Box——dyn Body 的大小运行时才知道、没法作为字段存储
  • 没有 Pin——某些 Body 实现(比如自己持有 Future 的)不 move-safe、直接存会 UB

代价

  1. 一次堆分配(构造时)
  2. 每次 poll_frame 一次 vtable 查找(虚函数调用)

需要跨多种 body 类型统一的 route/middleware 场景、这个成本可以接受。不需要统一的场景(单一 body 类型贯穿)、直接用具体类型 + 泛型、省掉 BoxBody。

+ Send bound 保证 BoxBody 可以跨线程——对 axum / actix 的 multi-threaded runtime 必需。有些 Body 实现(比如 Rc<Cell<Bytes>> 这种)不 Send——编译期就拒绝装进 BoxBody。

10.6.2.6 BoxCloneBody——Clone 能力的特殊 BoxBody 变体

除了普通的 BoxBody<D, E>、http-body-util 还有一个 BoxCloneBody<D, E>——要求 inner body + Clone

pub struct BoxCloneBody<D, E> {
    inner: Pin<Box<dyn CloneBody<Data = D, Error = E> + Send>>,
}

trait CloneBody: Body {
    fn clone_box(&self) -> Pin<Box<dyn CloneBody<...> + Send>>;
}

为什么要这个变体?——因为 普通 dyn Body 不是 Clone(trait object 不能自动 impl Clone)、而某些场景需要 clone body(重试、fork)。

如何给 trait object 加 Clone?——经典的 Rust idiom:在 trait 里定义 clone_box(&self) -> Box<dyn Trait>、然后手动 impl Clone for BoxCloneBody 调用 clone_box

用户视角:

let body: BoxCloneBody<Bytes, Error> = ...;
let retry_body = body.clone();  // 现在可以 clone!

内部每次 clone 都会堆分配一个新 Box——不便宜、但在重试场景下可接受(重试本身已经是慢路径)。

对比:

  • BoxBody——最基础、不 Clone
  • BoxCloneBody——可 Clone、代价是要求 inner body 也 Clone
  • UnsyncBoxBody——不 Send、可跨类型但只在单线程用

三种变体提供 “不同能力需求 × 不同类型约束” 的组合——用户根据场景选。这种**“一个核心 + 多个变体”** 的设计在 Rust 标准库大量出现——Arc / Rc / WeakMutex / RwLock / Cell / RefCell

10.6.3 Limited:强制限制 body 大小

这是一个非常重要的安全中间件。

// http-body-util/src/limited.rs 精简
pub struct Limited<B> {
    inner: B,
    remaining: usize,
}

impl<B: Body> Body for Limited<B> {
    type Data = B::Data;
    type Error = Box<dyn Error + Send + Sync + 'static>;

    fn poll_frame(...) -> Poll<Option<Result<Frame<B::Data>, Self::Error>>> {
        // 从 inner 拉一帧
        let frame = std::task::ready!(inner.poll_frame(cx));
        match frame {
            Some(Ok(f)) => {
                if let Some(d) = f.data_ref() {
                    let len = d.remaining();
                    if len > self.remaining {
                        // 超限——产出错误
                        return Poll::Ready(Some(Err(LengthLimitError.into())));
                    }
                    self.remaining -= len;
                }
                Poll::Ready(Some(Ok(f)))
            }
            // ...
        }
    }
}

逻辑:读一帧 → 检查 remaining → 扣除 → 返回帧(或超限错误)

这是 HTTP 服务端的必备防护——没有 Limited 的 body 就是 DoS 攻击向量。攻击者可以发一个 Content-Length: 99999999999 的请求(或者 chunked encoding 不断发数据),你的服务端如果 naively 全读进内存就挂了。Axum、Actix-web、warp 等 HTTP 框架默认会把 body 套 Limited——通常 2MB 或 4MB 的上限,可配置。

10.6.4 Limited 的 error 类型选择:Box<dyn Error + Send + Sync>

Limitedtype ErrorBox<dyn Error + Send + Sync + 'static>——和 inner body 的 Error 不同:

impl<B: Body> Body for Limited<B> {
    type Data = B::Data;
    type Error = Box<dyn Error + Send + Sync + 'static>;
    // ...
}

为什么要 type-erase error?——因为 Limited 有两种错误来源

  1. inner body 的错误B::Error
  2. Limited 自己产生的 LengthLimitError

如果 type Error = B::Error——Limited 自己的错误类型必须塞进 B::Error 里(要求 B::Error impl From<LengthLimitError>)——不可能对所有 B 都成立

如果 type Error = LengthLimitError——inner body 的错误不能传递——丢信息。

唯一通用的解决办法就是 Box<dyn Error>——把两种错误都装进去、上层用 downcast 分辨是哪一种。

代价:每次 poll_frame 中出错时做一次堆分配(把错误装 Box)。只在错误路径、正常路径完全没有——可接受。

这就是 http-body-util 作为中间件层的 trade-off——功能通用性(支持任意 Body)胜过类型精确性(保留 B::Error)。用户需要精确类型的场景、可以自己写不带类型擦除的 limited body。

对比 BoxBody 的 type-erase 发生在 type Datatype Error——完全擦除用户类型;Limited 只 erase Error——Data 保留 B::Data。两种 erase 的粒度不同、各服务不同场景。

10.7 hyper::body::Incoming:把三种来源统一成 Body

现在我们来看 Hyper 这一端——当一个 HTTP 请求的 body 真实从网络上到达 Hyper 时,它是什么类型?答:hyper::body::Incoming

// hyper/src/body/incoming.rs:52-75
pub struct Incoming {
    kind: Kind,
}

enum Kind {
    Empty,
    Chan {
        content_length: DecodedLength,
        want_tx: watch::Sender,
        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
        trailers_rx: oneshot::Receiver<HeaderMap>,
    },
    H2 {
        content_length: DecodedLength,
        data_done: bool,
        ping: ping::Recorder,
        recv: h2::RecvStream,
    },
    #[cfg(feature = "ffi")]
    Ffi(crate::ffi::UserBody),
}

Incoming 是一个多态结构——它可能是三种 body 来源之一:

  • Empty:没有 body(GET 请求、204 响应、HEAD)。
  • Chan:HTTP/1 路径——hyper 的 HTTP/1 解析器把 body chunks 通过 mpsc 送来,want_tx 是一个反向信号(告诉解析器调用方希望继续读)。
  • H2:HTTP/2 路径——直接持有 h2::RecvStream,带 flow control 和 ping 逻辑。
  • Ffi:FFI 用户 body(C API 写的)。

所有这些”来源”对外统一暴露成一个 Body impl——用户只看到 Incoming.poll_frame() 之后返回 frame。内部是哪条路径调用方根本不知道、也不需要知道。

这是 Hyper 1.x 对多协议的优雅处理:协议实现的差别藏在 enum variant 里,对外的 trait impl 把它们统一。看 Incoming 的 Body impl(文件后半段):

impl Body for Incoming {
    type Data = Bytes;
    type Error = crate::Error;

    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Result<Frame<Bytes>, crate::Error>>>
    {
        match self.kind {
            Kind::Empty => Poll::Ready(None),
            Kind::Chan { ref mut data_rx, ref mut trailers_rx, .. } => {
                // 从 mpsc 拉数据;结束后看 oneshot 拿 trailers
                ...
            }
            Kind::H2 { ref mut recv, .. } => {
                // 调 h2 的 RecvStream
                ...
            }
            ...
        }
    }
}

三种实现、同一个 trait、同一个对外 API。这是协议多态最好的例证。

10.6.5 channel() 生产的 (Sender, Body) 对的生命周期

http-body-util::channel() 返回一对 (Sender<D, E>, Body<D, E>)——生产者端 Sender 可以 send_datasend_trailersclose;消费者端 Body 实现了 Body trait、poll_frame 从 channel 拉数据。

这对值得看生命周期设计

Sender::send_data(&mut self, data: D) 是 async——会 channel buffer 空出位置才 return。这是生产者侧的背压:消费者读得慢、生产者也慢下来。

Body::poll_frame 在 channel 空时返回 Pending——消费者侧自然等待、不 busy wait。

Sender drop 时 channel 关闭——Body 下次 poll_frame 读到 None、优雅结束。所有权语义自动管理生命周期——不需要显式 close。

Body drop 时 Sender 得知——下次 send_data 返回 SenderClosed 错误——生产者知道”没人听了”、可以主动停止生成数据、避免浪费 CPU。

这种 两端 drop 感知对方 的设计在 Rust 里通过 Arc + 原子计数 + Waker 实现——任何一端 drop 会把 weak count 减 1、剩下一端通过这个变化感知

典型用法是 SSE(Server-Sent Events)

let (sender, body) = channel::<Bytes, Error>();

tokio::spawn(async move {
    loop {
        let event = generate_event().await;
        if sender.send_data(event).await.is_err() {
            break;  // 客户端断开了、停止生成
        }
    }
});

Response::builder().body(BoxBody::new(body)).unwrap()

客户端断连、sender send_data 报错、spawn 的任务自动结束——没有任务泄漏、没有需要显式 cleanup。

这是 Rust 所有权系统最优雅的应用之一——资源生命周期和所有权绑定、drop 语义触发清理、让并发编程里最难的 “如何知道对方已经走了” 变成免费获得的信息。

10.6.5.5 Sender::try_send_data vs send_data 的 sync 变体

channel::Sender 实际提供了两个 send 变体

impl<D, E> Sender<D, E> {
    // sync 版——不等待
    pub fn try_send_data(&mut self, data: D) -> Result<(), TrySendError<D>> { ... }

    // async 版——可能等
    pub async fn send_data(&mut self, data: D) -> Result<(), SendError> { ... }
}

try_send_data

  • 不阻塞——立即返回
  • 如果 channel 满、返回 TrySendError::Full(data)(把数据还给你)
  • 如果 channel 关闭、返回 TrySendError::Closed(data)

send_data

  • async——channel 满时等待消费者读
  • 只在 channel 关闭时返回 error

为什么提供两个版本?——不同场景有不同需求:

  • 正常流式上传——async 版、自然背压
  • 实时数据(log、metric)——sync 版、宁可丢数据也不阻塞生产者(丢 metric 比让业务卡住好)
  • 测试里的 setup——sync 版、直接确认是否 enqueue

try_send_data 返回 data 让 caller 决定怎么处理——可以丢、可以放 retry queue、可以 log 告警。这种”失败也不吞数据”的 API 对可观测性友好。

对比 Node.js 的 stream.write(chunk)——返回 bool 表示是否还能继续写、但数据是否已入队你不知道。Rust 这套 API 精确区分 “已入队” 和 “想入队但没位置——让错误处理每一步都精确。

10.9.7 http-body 和 bytes 的协同——零拷贝传递

http-body 的 type Data: Buf 约束让每一帧的数据可以是任意 Buf 实现——bytes::Bytesbytes::BytesMut&[u8]Vec<u8>Chain<A, B> 等。

典型流转

TCP → BytesMut (read 填充)
     ↓ freeze
   Bytes (read-only、引用计数)
     ↓ hyper::Incoming 包装
   Frame<Bytes>
     ↓ BodyExt::frame().await
   用户代码看到 Frame<Bytes>
     ↓ frame.into_data().unwrap()
   Bytes
     ↓ .chunk() 得 &[u8]
   zero-copy 传给 serde / HTTP body parser / ...

整条链路零拷贝——TCP read 时一次 memcpy、之后 Bytes 在各层间只是 Arc refcount 的增减、从不复制字节。

对比不用 Buf trait 的设计——比如每帧都是 Vec<u8>——每次接口转换都要 Vec::from_slice 重新分配——一个 100KB 的响应经过 5 层 middleware 就是 500KB 堆分配 + 500KB memcpy。

Buf trait 把 “字节来自哪里、怎么存” 的问题和 “怎么消费字节” 的问题解耦——消费者只看 Buf 接口、不关心底层是 slice、Bytes、Chain 还是 mmap 映射。

这个解耦在 HTTP/2 和 gRPC 场景最明显——h2 crate 的 RecvStream 产出的是 Bytes、gRPC codec 用 protobuf 直接 parse Bytes、不需要中间 Vec 转换。整条 pipeline 从 kernel 到业务代码、数据不被复制一次。

这就是 零拷贝生态 的真实含义——不是一两个接口零拷贝、而是生态里所有库共享 Buf trait 协议、数据在库之间传递时永远是 refcount 操作。

落到你键盘上:

10.7.0 Incoming 不暴露构造函数的封装严谨性

hyper::body::Incoming 没有公开的 new 函数——只能通过 Request::into_body() / Response::into_body() 从 hyper 内部获取。

为什么这么严?——因为 Incoming 的几种 variant(Chan / H2 / Ffi)都需要和 hyper 内部状态机紧密配合

  • Chan variant 的 want_txdata_rx 必须对应一个活跃的 HTTP/1 连接——不配对的话上层 poll_frame 永远 Pending
  • H2 variant 的 recv: h2::RecvStream 必须来自真实的 HTTP/2 连接——否则 flow control 乱套
  • Ffi variant 涉及 C API 内存模型——需要特殊初始化

用户构造不出”合法”的 Incoming——就避免了意外用错。需要自己的 Body 类型、用 http-body-util::FullEmptyStreamBody 等。

这种 pub struct 但 constructor 不 pub 的模式是 capability-based design——类型的存在意味着持有 capability(“你有 Incoming = 你有一个活跃的 HTTP 连接”)。用户无法假造一个 Incoming 绕过协议。

如果未来 hyper 想加 Kind::Quic 支持——公开 API 不会变。加 variant + 新的内部构造路径就完事。

10.7.1 Chan 的反向信号:want_tx

Chan variant 里有一个字段你可能没见过:want_tx: watch::Sender

作用:告诉 HTTP/1 parser “调用方现在还想继续读 body”。Hyper HTTP/1 parser 不会无脑从 TCP 读数据填 mpsc——它会等调用方 poll_frame 时发出”想要”信号。这是一种 body-level 的背压:如果 handler 处理 body 慢,parser 不会把内存填爆。

HTTP/2 不需要这个 mechanism——h2 crate 自己的 flow control window 处理了背压。HTTP/1 因为没有线路级 flow control,需要应用层配合。

这是hyper 工程上最被忽视但至关重要的一块——它让 HTTP/1 上的大文件流式上传在 Rust 里也能正确应用背压。第 11 章读 HTTP/1 wire 时会把 want_tx / want_rx 的协议完整讲清楚。

10.7.2 DecodedLength 类型的三态语义

Incoming::ChanIncoming::H2 都有一个 content_length: DecodedLength 字段。DecodedLength 是 hyper 内部的一个小结构、封装 u64 + 特殊值

// 概念性
pub(crate) struct DecodedLength(u64);

impl DecodedLength {
    pub(crate) const CLOSE_DELIMITED: DecodedLength = DecodedLength(u64::MAX);
    pub(crate) const CHUNKED: DecodedLength = DecodedLength(u64::MAX - 1);
}

u64::MAXu64::MAX-1 作为特殊哨兵值——表达 “未知长度(close-delimited)” 和 “chunked encoding”。其他值就是真实的 Content-Length。

为什么不用 enum?——因为 Incoming 存放于每个请求对象里、可能有几千到几百万个对象同时存在。enum 会因 discriminant + padding 变成 16 字节(u64 + 1 字节 tag + 7 字节 padding)——而压缩成 u64 就是 8 字节每个对象省 8 字节、100 万对象省 8MB

代价——if length == DecodedLength::CLOSE_DELIMITEDmatch length { DecodedLength::CloseDelimited => ... } 略难读。但是封装到 DecodedLength 类型内部、外部用户看到的是 .is_close_delimited() 这样的方法——可读性不受影响。

这种 用魔数挤进 bit 省内存 的技巧在低层库里常见——Rust 的 Option<NonZeroU64> 就是编译器自动做的同样事。但编译器的优化有限——DecodedLength 这种自己 handcraft 的魔数才有更精细的控制权。

类似设计在 kernel 数据结构里大量出现——u64 承载多种 semantic 值、用魔数区分、省掉 tag 字节。

10.8 一段完整链路

把所有这些串起来——从 TCP 字节到 handler 消费 body——是这样:

TCP stream

hyper proto::h1::Dispatcher 解析 header

构造 Request<Incoming>

Incoming::Chan { data_rx, want_tx, ... }
    ↓                       ↑
    └── mpsc 发数据 ←── h1 parser 读 TCP
    
handler 拿到 Request<Incoming>

while let Some(frame) = body.frame().await?

    frame.data()  /  frame.trailers()

    handler 处理

上链路里每一个箭头都是一次 .await——整条链是惰性驱动的。handler 不消费,parser 不发;parser 不发,TCP 不读。这就是”端到端的流式处理”在 Rust 里的落地形态。

10.8.5 BodyExt trait 的 “extension method” 模式

http-body-util 有一个 BodyExt trait——给所有 Body 自动加上 combinator 方法

pub trait BodyExt: Body {
    fn frame(&mut self) -> Frame<'_, Self>
    where Self: Unpin,
    { ... }

    fn collect(self) -> Collect<Self>
    where Self: Sized,
    { ... }

    fn boxed(self) -> BoxBody<Self::Data, Self::Error>
    where Self: Sized + Send + 'static,
    { ... }

    fn map_err<F, E>(self, f: F) -> MapErr<Self, F>
    where Self: Sized, F: FnMut(Self::Error) -> E,
    { ... }

    fn map_frame<F, B>(self, f: F) -> MapFrame<Self, F>
    where Self: Sized, F: FnMut(Frame<Self::Data>) -> Frame<B>,
    { ... }
}

impl<T: Body> BodyExt for T {}

blanket impl impl<T: Body> BodyExt for T {}所有 Body 自动获得这些方法——用户只要 use http_body_util::BodyExt 就能调 body.frame().awaitbody.collect() 等。

为什么不把这些方法直接放 Body trait?——因为:

① Body trait 要保持极简——trait 方法多了、实现负担大。把可选方法放 BodyExt、核心 trait 只有 3 个方法。

② 冒烟避免 trait object 大小膨胀——trait object dyn Body 的 vtable 只包含必需方法。可选方法在 vtable 外、dyn Body 更紧凑。

③ 扩展性——http-body 的 semver stability 要高(被所有 HTTP 库依赖)、频繁加方法破坏稳定性。BodyExt 在 http-body-util 里、可以更自由地演进。

这就是 Rust 社区的 core trait + ext trait 模式——参考 Iterator / IteratorExtRead / ReadExtFuture / FutureExt——core trait 描述最小能力、ext trait 提供便利组合子

这种分层让两个目标同时达成:core trait 稳定、可选方法演进

10.9 实战:一个最小 JSON 流解析器

我们用本章和前几章的知识写一个”流式 JSON 数组解析器”——不把整个 body 读进内存,一边读一边解析。

use http_body_util::BodyExt;
use hyper::body::Incoming;
use serde_json::Deserializer;

async fn stream_parse(body: Incoming) -> Result<Vec<Item>, BoxError> {
    let mut items = Vec::new();
    let mut buffer = bytes::BytesMut::new();

    let mut body = std::pin::pin!(body);
    while let Some(frame) = body.as_mut().frame().await {
        let frame = frame?;
        if let Some(data) = frame.data_ref() {
            buffer.extend_from_slice(data.chunk());

            // 尝试增量解析
            let de = Deserializer::from_slice(&buffer).into_iter::<Item>();
            let mut consumed = 0;
            for result in de {
                match result {
                    Ok(item) => {
                        items.push(item);
                        consumed = de.byte_offset();
                    }
                    Err(e) if e.is_eof() => break,  // 数据不足,等下一帧
                    Err(e) => return Err(e.into()),
                }
            }
            buffer.advance(consumed);
        }
    }
    Ok(items)
}

这段代码真实可用,关键点:

  • body.frame().await 来自 BodyExt——把 poll_frame 包成 async fn
  • BytesMut 做增量缓冲——每收到一帧追加,serde 消费多少就 advance 多少。
  • 解析出 EOF 错误不当失败——是”数据不足”的信号,等下一帧。
  • 永远不把整个 body 读进内存——memory footprint ≈ 一帧 + 未消费字节 + 一个 JSON 对象。

这是 Rust 流式处理的”标准配方”。结合卷四《Serde 元编程》第 4 章(Deserializer 与 Visitor)——你会发现 serde 的 Visitor 设计天然适合这种”增量输入、增量产出”场景。http-body 的流和 serde 的流可以在同一条 async 链上共存。

10.9.5 跨章节呼应——Body 在 HTTP 栈里的位置

与第 11 章(HTTP/1 wire)的呼应——第 11 章的 Decoder::Kind::{Length, Chunked, Eof} 对应 Incoming 的 Chan variant 里的 data_rx。Decoder 是低层字节→Frame 的产生者、Incoming 是高层 Body trait 的包装——数据流就是 Decoder → mpsc → Incoming → BodyExt → handler。

与第 16 章(h2 flow control)的呼应——HTTP/2 Body 通过 h2::RecvStream 消费数据、每次 poll_frame 自动发 WINDOW_UPDATE。http-body 的 poll_frame 节奏 ≈ HTTP/2 的 flow control 节奏——读得慢、对端发得慢。

与 React ch6 的呼应——React 的 Frame(mutation effect)和 http-body 的 Frame(data/trailers)都是 “一帧处理一次 的粒度——前者是 DOM 操作、后者是字节。帧化是避免 “全量同步处理”** 的通用设计。

与 langgraph ch17 的呼应——langgraph 的 Send/Command 是图上的消息帧、http-body 的 Frame 是流上的数据帧——两者都是在执行流上携带结构化信息的原子单元。不同抽象层上的同一种思维。

10.9.6 本章收束的八条工程原则

① 极简 required、丰富 provided(§10.2.2)——Body trait 只有 poll_frame 必需、其他有合理默认。

② struct 包装 enum 保 API 稳定(§10.4.0)——Frame 的 newtype 让未来加 variant 不破坏用户代码。

Result<T, Self> 失败归还(§10.4.2.5)——into_data 失败把原对象还给调用方。

Option 相等判断处理 unknown(§10.5.0)——Some(lower) == upper 自然处理 upper=None。

assert! 维护 contract(§10.5.1.5)——违反 invariant 就 panic、不给 silent bug 机会。

#[inline] 跨 crate 小方法(§10.5.3)——getter 跨 crate 零成本需要显式标记。

⑦ PhantomData 零大小带类型(§10.6.1.5)——Empty 既有 generic 又零字节。

⑧ core trait + ext trait 分层(§10.8.5)——稳定性和易用性兼得。

这八条每一条都能在其他成熟 Rust 库里找到 echo——Iterator / Read / Write / Stream 的核心扩展都遵循。把 http-body 放在这个谱系里看、就能理解为什么它的设计几乎没有争议——它只是把已经被验证的 Rust 生态模式应用到 HTTP body 上

10.10 小结与落到你键盘上

本章要点:

  1. Body traitpoll_frame 把”HTTP body”抽象成”Frame 流”——每帧可能是 Data 或 Trailers。
  2. Frame<T> + SizeHint 是小而精确的数据原语,甚至在单元测试里把加法的数学证明写进 match。
  3. http-body-util 提供常用 impl:EmptyFullStreamBodyBoxBodyLimitedLimited 是安全防护,生产 HTTP 服务端必配。
  4. hyper::body::Incoming 用 enum 把 HTTP/1 mpsc、HTTP/2 recv stream、FFI body 统一成一个 Body impl。
  5. Chan variant 的 want_tx 在 HTTP/1 上提供 body-level 背压——hyper 工程细节里很少被讲的关键一环。

10.10.5 Body 设计对比:Rust vs Node vs Python

把 http-body 的设计和其他语言的同类库对比——能看清 Rust 哲学的独特性:

Node.js (Readable Stream)

stream.on('data', chunk => { /* ... */ })
stream.on('end', () => { /* ... */ })
stream.on('error', err => { /* ... */ })
  • 事件驱动、多个 listener 同时能订阅
  • 错误通过独立 event 传、容易漏绑 error handler 导致 uncaught
  • 背压通过 pause()/resume() 手动控制——容易忘

Python (aiohttp StreamReader)

async for chunk in response.content:
    process(chunk)
  • async iterator、语法糖漂亮
  • 错误通过 exception 传、和 Python async 一致
  • 没有 trailer 支持——aiohttp 必须特殊 API 查

Rust (http-body)

while let Some(frame) = body.frame().await? {
    // frame 可能是 Data 或 Trailers
}
  • Future poll 驱动、背压自动传导(poll 的节奏就是数据节奏)
  • 错误通过 Result 传、编译器强制处理
  • Trailer 和 Data 统一为 Frame——一个循环处理两者

三种设计的核心差别

NodePythonRust
错误处理eventexceptionResult
背压手动自动(GIL)自动(Future)
Trailer 支持特殊 API特殊 API统一 Frame
类型安全动态部分编译期
零拷贝有(Buffer)有限原生

Rust 不一定所有维度都最优——但组合起来是 “最可预测、最难出 bug” 的设计async/await + Result + Frame enum + Buf trait 四个正交抽象组合、表达出别的语言要三倍代码才能表达的语义。

这不是 “Rust 好别人差”——每种语言的选择都契合自己的哲学。但对想在 Rust 里写 HTTP 服务的人、理解 http-body 的设计就是理解 Rust 怎么看待 I/O

10.10.6 小结与落到你键盘上

落到你键盘上:

  • http-body-util/src/channel.rs——它实现了 (Sender, Body) 对,是做 SSE / 流式响应时最实用的工具。
  • 给 Axum 代码加 Limited——如果你还没做,立即加上 tower_http::limit::RequestBodyLimitLayer::new(2 * 1024 * 1024)
  • 实验 BodyExt::collect()——把一个 Incoming 一次性 collect 成 Vec<u8>。跑一次之后读它的源码(大约 50 行),你会明白所有 body 的 .await 链本质上是在做什么。

10.12 回答一个常见问题:为什么不能直接在 Request 里存 Bytes

新手常问:“我就是想处理 JSON、body 几百字节、直接存 Request<Bytes> 不就完了吗?为什么要搞流式?”

答案分三层:

① 能不能——可以Request<Bytes> 完全合法——BytesBuf、满足 Body 的某些实现(Full<Bytes> 或自定义)。

② 会不会导致问题——看上下文

  • 如果你的 handler总是一次性 collect body——用 Bytes 等价、没什么坏处
  • 如果你的 handler 将来可能接 file upload、multipart、SSE、gRPC——必须保留 Body trait、现在改动手术成本低

③ 为什么 Rust 生态默认选择流式而不是 Bytes——因为:

  • 框架必须对所有用户通用——有人做 JSON API、有人做视频流——流式是超集
  • 流式几乎没有额外开销Full<Bytes> 只是一个 Option + PhantomData)——不 stream 省不了几个字节
  • 类型系统让转换免费:已有 Bytes 随时可以 Full::new(bytes) 变成 Body、collect().await 反向转换——两种表示可以互转

正确的 mental model 是Body 是超集、Bytes 是特例。超集统一生态、特例保持简单。如果你写业务代码、大部分时候用 body.collect().await?.to_bytes() 把 Body 一次性变 Bytes 就好——工具链提供这个便利。

这和 §7.12 讲的 “应用开发者也要懂基础库” 是同一个主题——懂了底层设计、你才能自信地用一行 collect 解决 99% 的场景、同时知道 1% 场景来了怎么切换

不懂底层的开发者会在 “应该 stream 还是应该 collect” 上反复纠结、或者直接用错。懂了底层、选择变成二秒决定。这就是基础知识的杠杆效应——一次学习、长期受益。

10.13 Either<L, R> body——条件性响应的优雅表达

http-body-util 还提供一个常见工具:Either<L, R>——两种 Body 类型的联合

// either.rs 简化
pin_project! {
    #[project = EitherProj]
    pub enum Either<L, R> {
        Left { #[pin] inner: L },
        Right { #[pin] inner: R },
    }
}

impl<L, R> Body for Either<L, R>
where L: Body, R: Body<Data = L::Data, Error = L::Error>,
{
    type Data = L::Data;
    type Error = L::Error;

    fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<...> {
        match self.project() {
            EitherProj::Left { inner } => inner.poll_frame(cx),
            EitherProj::Right { inner } => inner.poll_frame(cx),
        }
    }
}

Either 让 handler 可以根据条件返回不同类型的 Body

async fn handler(req: Request<Incoming>) -> Response<Either<Full<Bytes>, BoxBody<Bytes, Error>>> {
    if is_cached(&req) {
        Response::new(Either::Left { inner: Full::new(cache_data) })
    } else {
        let stream = dynamic_body_stream(&req);
        Response::new(Either::Right { inner: BoxBody::new(stream) })
    }
}

代替 BoxBody 的场景

  • 如果你只有两种已知类型——Either<L, R> 完全静态、零堆分配
  • 如果你有多种未知类型——必须用 BoxBody

Either 是 Rust 生态共享的惯用抽象——http-body-util 有、axum 用、tower-http 也用、futures 有类似的 Either 组合未来——同一个名字跨多个库、语义一致。

如果要 3 种类型、可以嵌套 Either<A, Either<B, C>>、继续下去到任意深度——完全类型化、不是运行时多态。代价是嵌套超过 3 层可读性变差——这时就换 BoxBody。

这种 2 特殊情形专用工具、N 情形回到动态分发 的模式又一次体现了 Rust 生态的 pragma——不求一个工具解决所有问题、提供一组工具让用户组合选择

10.14 本章总结——http-body 是 Rust HTTP 生态的”lingua franca

读到本章结尾、你应该意识到一件事——http-body 不只是一个小 crate、是整个 Rust HTTP 生态的公共语言

从最底层的 hyper、h2、到中间层的 tower-http、http-body-util、到上层的 axum、actix-web、tonic、reqwest——所有库都共享 Body trait 作为 body 的抽象。这个共享让:

  • hyper 生产的 Incoming 能被任何 HTTP 框架消费
  • axum 产出的 Body 能被任何 HTTP 客户端发送
  • tower middleware 能插入到任何 Body 链上

这种生态级别的 interface 标准化是 Rust 社区早期就达成的共识——不搞”一个大框架吞所有”、而是”小 trait 做 lingua franca、允许无数实现百花齐放”。

对比 Go 的 net/http.Body(一个 io.ReadCloser)——太通用、没有 Frame、Trailer、SizeHint 的结构化信息——导致 Go 里的 HTTP/2 trailer 支持一直是二等公民。

对比 Node.js 的 Readable Stream——事件驱动、没有类型约束——不同库的 stream 实现各说各话、互操作要写适配层。

Rust 的 http-body 只定义了 3 个方法 + 2 个关联类型——最小可行的共同基础。小到所有库愿意实现、大到足够表达 HTTP 的全部复杂性(Data、Trailer、大小提示、流结束标记)。

这就是 Rust 生态的组织美学——共享最小基础、垂直堆叠扩展。理解了这一点、你就理解了为什么 Rust 生态看起来 crate 多、但每个 crate 都恰好做一件事、合起来覆盖完整功能。对初学者陡峭、但对老用户极度自由——每一层都能替换、每一层都能审视。

下一章开始 Hyper HTTP/1 的硬核部分——我们读 httparseproto::h1::Encoderproto::h1::Decoder,看字节流如何被解析成你熟悉的那些 http:: 类型。

10.11 BodyExt::collect() 的实现推导

BodyExt::collect() 看起来是一个简单的 async fn collect() -> Collected<D>——但它的实现有意思。思路:

// 概念性
pub struct Collect<B> {
    inner: B,
    data: Vec<B::Data>,
    trailers: Option<HeaderMap>,
}

impl<B: Body> Future for Collect<B> {
    type Output = Result<Collected<B::Data>, B::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            match ready!(self.inner.as_mut().poll_frame(cx)) {
                Some(Ok(frame)) => {
                    if let Some(d) = frame.data_ref() { self.data.push(d.clone()); }
                    else if let Some(t) = frame.trailers_ref() { self.trailers = Some(t.clone()); }
                }
                Some(Err(e)) => return Poll::Ready(Err(e)),
                None => return Poll::Ready(Ok(Collected {
                    data: self.data.drain(..).collect(),
                    trailers: self.trailers.take(),
                })),
            }
        }
    }
}

核心逻辑:loop 读所有 frame、Data 存到 Vec、Trailers 存到 Option、流结束时打包成 Collected。

两个值得注意的点

data: Vec<B::Data> 而不是 Vec<u8>——保留每一帧的 Buf 类型、不合并。消费者可以拿到 Vec<Bytes>、用 Collected::aggregate() 零拷贝连成一个 Buf——或者直接 iter 处理每一帧。

loop { ready!(...) } 贪心吸收——一旦 poll_frame Ready、立刻进下一轮、直到 Pending 再挂起。这样一次 Future poll 能消费任意多的 frame——避免了 “读一帧 yield、读一帧 yield” 的低效。

collect 是 把流式 Body 整体化 的最简单工具——但要注意它无限吸收所有数据——对不可信输入必须先 Limited

let bytes = body
    .into_limited(2 * 1024 * 1024)  // 2MB 限制
    .collect()
    .await?
    .to_bytes();

这是 axum 里收 JSON body 的典型组合——先 limit 再 collect。如果跳过 limit——攻击者发大 body 能 OOM。collect 本身没防御能力、防御在上层。