Appearance
第10章 http-body 与 Body trait:frame、trailers、size_hint
10.1 "body 到底是什么"这个被忽视的问题
上一章我们读完了 http crate——整个 HTTP 数据模型的骨架。但骨架里有一个地方故意留空:body。
Request<T> 和 Response<T> 里的 T 是什么?答案是"随便什么"——String、Vec<u8>、()、Bytes、Incoming、BoxBody……任何类型都可以放进去。这不是 API 设计偷懒——body 本身就是一个独立的抽象问题,它值得一个独立的 crate。
这就是 http-body 的角色。它定义了一个很短的 trait:
rust
// http-body/src/lib.rs:38-88
pub trait Body {
type Data: Buf;
type Error;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>;
fn is_end_stream(&self) -> bool { false }
fn size_hint(&self) -> SizeHint { SizeHint::default() }
}三个方法,十几行。但它是整个 Rust HTTP 生态"body 作为流"的公共语言。这一章我们把它、以及它的小伙伴 Frame / SizeHint / http-body-util / hyper::body::Incoming 全部读一遍。
源码锁定:http-body 1.0.1 / http-body-util 0.1.3(commit c8cb37f)。
10.2 为什么要把 body 做成"流"
Naive 的 HTTP 服务器会把整个 body 读进一个 Vec<u8> 再交给 handler 处理。这种做法在小请求(JSON API)上合理,但在三种场景下会灾难:
- 上传大文件:100MB 的文件请求,Vec 化之后 100MB 都在服务进程的堆上。1000 个并发上传 = 100GB 内存 = OOM。
- 流式响应:服务端一边生成数据一边响应(SSE / 流式 JSON / 视频)。等生成完才发——用户感觉卡死。
- HTTP/2 stream:HTTP/2 的流天生是分帧的,一个 frame 一个 frame 到,如果要求组装成 Vec 会阻塞流控。
所以 body 必须是一个惰性、增量、双向驱动的流——调用方 poll_frame 拉一帧、用完释放、再拉下一帧。http-body::Body 正是这个抽象。
10.2.1 type Data: Buf 而不是 Vec<u8>
Body 的数据类型是 type Data: Buf——关联类型,约束到 bytes::Buf trait。
为什么不是具体的 Bytes 或 Vec<u8>?因为不同来源的 body 产出的类型不同:
- HTTP/1 解析器从内部环形缓冲读一段出来——那是一个
Bytes(引用计数切片)。 - 用户从 file 读一段——可能是
Vec<u8>。 - gRPC codec 产生的是预先对齐的 protobuf
Bytes。 - 测试代码里可能是
&'static [u8]。
要求所有 body 都先转成 Vec 或 Bytes 会强制一次 copy。解决办法是让 Data 类型保持泛型——只约束"能像一个 byte cursor 那样被读取",这正是 bytes::Buf 的契约。
Buf trait 本身是 bytes crate 的核心抽象——提供 chunk() -> &[u8]、advance(n)、remaining() -> usize 等方法。一个类型能做到这些,就能被当作"可推进的字节序列"使用——不管它底下是单 slice、多 slice 的 chain、还是离散的链表。
这种"不绑定具体容器类型"的 trait 设计在 Rust 生态很普遍。卷四《Tokio 源码深度解析》第 8 章(I/O Driver)里 AsyncRead 里也有类似的 "read into anywhere that implements BufMut" 的思路——把数据移入/移出的"目标类型"延迟到具体场景,是 Rust 高性能库的共同气质。
10.3 poll_frame:一次拉一帧
rust
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>;返回类型层层嵌套,但语义清晰:
Poll::Pending:下一帧还没准备好(等网络、等上游、等磁盘)。Poll::Ready(None):流结束,再也没东西了。Poll::Ready(Some(Ok(frame))):有一帧可以消费。Poll::Ready(Some(Err(e))):读取错误。
三种"有东西"的情况都塞进一个返回类型——Pending 等待、Ready+None 结束、Ready+Some 产出。这是 Rust Stream trait 的标准惯用形式。事实上如果你看 futures::Stream:
rust
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;一模一样的签名——只是把 Item 具体化成了 Result<Frame<Data>, Error>。Body 本质就是一个"会产出 Frame 的 Stream",只不过带了一些 HTTP 特有的 helper 方法。
10.3.1 为什么不直接用 Stream trait
这是个好问题,有两个答案。
第一,Stream trait 曾经不稳定。futures::Stream 在 futures 0.3 时代是第三方 trait,进入 core 的过程持续了很多年。http-body 不想依赖一个外部 crate 的 trait——于是自己定义。
第二,语义更精确。Stream 是通用"连续产出 Item"的抽象;Body 明确表达"这是一个 HTTP body"。两个 helper 方法(is_end_stream、size_hint)是 HTTP 特有的——HTTP/2 需要 is_end_stream 提示来决定是否发 END_STREAM flag;size_hint 影响 Content-Length header。把这些放在独立 trait 上比做 Stream::size_hint(不存在)更合适。
这是另一个 "基础语义 + 领域 helper"的 trait 拆分例子,和第 2 章 Service trait 与 Serializer trait 一脉相承。
10.4 Frame<T>:两种可能
rust
// http-body/src/frame.rs:3-17
#[derive(Debug)]
pub struct Frame<T> {
kind: Kind<T>,
}
enum Kind<T> {
// The first two variants are "inlined" since they are undoubtedly
// the most common. This saves us from having to allocate a
// boxed trait object for them.
Data(T),
Trailers(HeaderMap),
//Unknown(Box<dyn Frameish>),
}一个 frame 有两种可能的内容:
Data(T):字节数据,T: Buf。Trailers(HeaderMap):HTTP/2 的 trailing headers——body 结束后附带的 header。
10.4.1 Trailers 是 HTTP/2 / gRPC 的必需品
初学 HTTP 的人可能没接触过 trailers——它是 HTTP/1.1 存在但很少用、HTTP/2 普遍用、gRPC 必须用的功能。
典型用途:gRPC 把状态码放在 trailer 里。因为 gRPC 的响应体是一个 protobuf 流——请求成功还是失败、错误细节是什么——都不能放在响应头里(响应头要在 body 之前发,那时还不知道 body 处理结果)。解决方案:响应 200 OK、body 里是 protobuf 流、流结束后再发一个 trailer grpc-status: 0。所以 gRPC 客户端必须等到 trailer 才知道请求是否真的成功。
http-body 把 data 和 trailer 统一成一个 Frame——调用方 poll_frame 读到的东西可能是这两种之一:
rust
loop {
match body.poll_frame(cx).await {
Some(Ok(frame)) => {
if let Some(data) = frame.data_ref() {
// 处理字节数据
} else if let Some(trailers) = frame.trailers_ref() {
// 处理 trailing headers
}
}
Some(Err(e)) => return Err(e),
None => break, // end of stream
}
}10.4.2 那个被注释掉的 Unknown variant
rust
//Unknown(Box<dyn Frameish>),这一行是留给未来的。作者早期考虑过给 frame 加第三种 variant——"未知类型的帧",让扩展协议可以携带非 Data/Trailers 的内容。但这会引入 trait object 和堆分配——破坏 http-body 的零成本气质。最终决定注释掉——但留着这个"曾经考虑过"的痕迹,作为未来讨论的起点。
这也是读工业级开源代码的一个乐趣——注释里的"—"比代码本身还透露信息。你看到这行注释,就知道 http-body 的作者是有意决定保持 Frame 闭合——任何需要更多 frame 种类的协议都要自己扩展,不能通过 trait object 硬塞进 Frame。
10.4.3 map_data:只变换数据
rust
pub fn map_data<F, D>(self, f: F) -> Frame<D>
where F: FnOnce(T) -> D,
{
match self.kind {
Kind::Data(data) => Frame { kind: Kind::Data(f(data)) },
Kind::Trailers(trailers) => Frame { kind: Kind::Trailers(trailers) },
}
}变换数据类型,trailers 原样透传。这让你能在中间件里"把所有 Data frame 的类型从 Bytes 变成 String"而不影响 trailers。是 Frame 上最常用的 combinator。
10.5 SizeHint:一个 lower/upper bound 对
rust
// http-body/src/size_hint.rs:7-11
pub struct SizeHint {
lower: u64,
upper: Option<u64>,
}表达"body 至少有多少字节、最多有多少字节(可能未知)"。
用途:
- 服务端决定要不要发
Content-Length: Nheader。如果size_hint.exact()返回 Some(N)——设这个 header;否则用Transfer-Encoding: chunked。 - 客户端决定 body buffer 要预分配多大。取 upper 作为预分配量,避免多次 realloc。
- HTTP/2 的 flow control 决策:知道 body 有多大,可以提前预算流控窗口。
10.5.1 set_lower / set_upper 的 panic 保护
rust
pub fn set_lower(&mut self, value: u64) {
assert!(value <= self.upper.unwrap_or(u64::MAX));
self.lower = value;
}
pub fn set_upper(&mut self, value: u64) {
assert!(value >= self.lower, "`value` is less than than `lower`");
self.upper = Some(value);
}维持不变式 lower <= upper——违反就 panic。这是一个小而严肃的 defensive programming——与其容忍不合理状态传播,不如早死早超生。
10.5.2 一个被写进单元测试的数学证明
http-body 的 size_hint.rs 里有一段相当罕见的东西——一个把数学证明写进单元测试的 block(文件 100-173 行):
rust
#[test]
fn size_hint_addition_proof() {
// assuming addition itself is perfect, there are 3 distinct states:
// (_, Some(_)) + (_, Some(_)) => (_ + _, Some(_ + _))
// (_, Some(_)) + (_, None) => (_ + _, None)
// (_, None) + (_, None) => (_ + _, None)
//
// we can assert this in the typesystem! (and name them for our tests)
match (to_parts(SizeHint::new()), to_parts(SizeHint::new())) {
((_, Some(_)), (_, Some(_))) => {} // 1
((_, None), (_, None)) => {} // 2
((_, Some(_)), (_, None)) => {} // 3
((_, None), (_, Some(_))) => {}
}
...
}这段代码用 Rust 的 match exhaustiveness 来证明"SizeHint 加法的四种 upper 状态都被覆盖到了"。如果未来有人改 SizeHint 数据结构加了新 variant,这个 match 会编译失败——强制修改者更新证明。
这比把证明写在注释里强得多——证明变成了可执行的、会随代码变化自动 rechecking 的东西。这是 Rust 作者圈子里的一种美学——用类型系统辅助文档化不变式,既是注释也是检查。
10.6 http-body-util:常用实现 + combinators
trait 定义完了,谁来实现?答:http-body-util 这个 sibling crate。它提供了四类实现:
- 静态 body:
Empty<D>(没 body)、Full<D>(一整块 body)。 - 流式 body:
StreamBody<S>(包装一个Stream)、BodyDataStream<B>(反向:Body 适配成 Stream)。 - 通道 body:
channel()返回(Sender, Body),生产者写、消费者读。 - 修饰 body:
Limited(限制总字节数)、BoxBody(类型擦除)、MapErr、MapFrame。
以及一个 BodyExt trait——给 Body 添加一堆 combinator 方法(map_err、map_frame、collect()、boxed())。
10.6.1 Full<D>:最简单的 Body 实现
rust
// http-body-util/src/full.rs:10-74 精简
pub struct Full<D> {
data: Option<D>,
}
impl<D: Buf> Body for Full<D> {
type Data = D;
type Error = Infallible;
fn poll_frame(mut self: Pin<&mut Self>, _cx: &mut Context<'_>)
-> Poll<Option<Result<Frame<D>, Self::Error>>>
{
Poll::Ready(self.data.take().map(|d| Ok(Frame::data(d))))
}
fn is_end_stream(&self) -> bool { self.data.is_none() }
fn size_hint(&self) -> SizeHint {
self.data.as_ref()
.map(|data| SizeHint::with_exact(u64::try_from(data.remaining()).unwrap()))
.unwrap_or_else(|| SizeHint::with_exact(0))
}
}20 行代码——这是一个完整的 Body 实现。分解:
Option<D>存数据:Some(d)表示"还没发",None表示"已经发完"。- 第一次
poll_frame:take()把数据拿出来,构造一个Frame::data(d)返回。 - 第二次
poll_frame:self.data已经是 None,take()返回 None,整个 Poll 变成Ready(None)——结束。 size_hint()返回精确的大小——因为我们事先就知道 body 有多少字节。type Error = Infallible:这种 body 不可能产生错误。
Infallible 是 Rust 标准库里"表示永远不会发生的错误"的类型——它是一个空 enum,没有任何 variant,所以任何返回 Result<T, Infallible> 的函数实际只可能返回 Ok。Rust 编译器对这种类型有特殊优化,不会生成不可达的错误处理代码。
10.6.2 BoxBody<D, E>:类型擦除的逃生舱
rust
// http-body-util/src/combinators/box_body.rs 精简
pub struct BoxBody<D, E> {
inner: Pin<Box<dyn Body<Data = D, Error = E> + Send>>,
}
impl<D: Buf, E> Body for BoxBody<D, E> {
type Data = D;
type Error = E;
fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Result<Frame<D>, E>>>
{
self.project().inner.as_mut().poll_frame(cx)
}
}与第 3 章讨论的 BoxService 思路一致——用一次堆分配 + 一次虚方法调用换类型擦除。当你的 body 链路复杂到类型名变成一堆 <<<>>> 时,装进 BoxBody 就把类型统一——route 表就能有同样的 Body 类型。
Axum 的 Router::service 返回的 body 就是 BoxBody<Bytes, Error>——这就是为什么你在 Axum handler 的返回类型里经常看到 BoxBody:Axum 内部 route 分发需要类型统一,而用户的各种 handler(返回 String、Vec、stream、None)可能产出形形色色的 body——全部统一装 box。
10.6.3 Limited:强制限制 body 大小
这是一个非常重要的安全中间件。
rust
// http-body-util/src/limited.rs 精简
pub struct Limited<B> {
inner: B,
remaining: usize,
}
impl<B: Body> Body for Limited<B> {
type Data = B::Data;
type Error = Box<dyn Error + Send + Sync + 'static>;
fn poll_frame(...) -> Poll<Option<Result<Frame<B::Data>, Self::Error>>> {
// 从 inner 拉一帧
let frame = std::task::ready!(inner.poll_frame(cx));
match frame {
Some(Ok(f)) => {
if let Some(d) = f.data_ref() {
let len = d.remaining();
if len > self.remaining {
// 超限——产出错误
return Poll::Ready(Some(Err(LengthLimitError.into())));
}
self.remaining -= len;
}
Poll::Ready(Some(Ok(f)))
}
// ...
}
}
}逻辑:读一帧 → 检查 remaining → 扣除 → 返回帧(或超限错误)。
这是 HTTP 服务端的必备防护——没有 Limited 的 body 就是 DoS 攻击向量。攻击者可以发一个 Content-Length: 99999999999 的请求(或者 chunked encoding 不断发数据),你的服务端如果 naively 全读进内存就挂了。Axum、Actix-web、warp 等 HTTP 框架默认会把 body 套 Limited——通常 2MB 或 4MB 的上限,可配置。
10.7 hyper::body::Incoming:把三种来源统一成 Body
现在我们来看 Hyper 这一端——当一个 HTTP 请求的 body 真实从网络上到达 Hyper 时,它是什么类型?答:hyper::body::Incoming。
rust
// hyper/src/body/incoming.rs:52-75
pub struct Incoming {
kind: Kind,
}
enum Kind {
Empty,
Chan {
content_length: DecodedLength,
want_tx: watch::Sender,
data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
trailers_rx: oneshot::Receiver<HeaderMap>,
},
H2 {
content_length: DecodedLength,
data_done: bool,
ping: ping::Recorder,
recv: h2::RecvStream,
},
#[cfg(feature = "ffi")]
Ffi(crate::ffi::UserBody),
}Incoming 是一个多态结构——它可能是三种 body 来源之一:
Empty:没有 body(GET 请求、204 响应、HEAD)。Chan:HTTP/1 路径——hyper 的 HTTP/1 解析器把 body chunks 通过 mpsc 送来,want_tx是一个反向信号(告诉解析器调用方希望继续读)。H2:HTTP/2 路径——直接持有h2::RecvStream,带 flow control 和 ping 逻辑。Ffi:FFI 用户 body(C API 写的)。
所有这些"来源"对外统一暴露成一个 Body impl——用户只看到 Incoming,.poll_frame() 之后返回 frame。内部是哪条路径调用方根本不知道、也不需要知道。
这是 Hyper 1.x 对多协议的优雅处理:协议实现的差别藏在 enum variant 里,对外的 trait impl 把它们统一。看 Incoming 的 Body impl(文件后半段):
rust
impl Body for Incoming {
type Data = Bytes;
type Error = crate::Error;
fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Result<Frame<Bytes>, crate::Error>>>
{
match self.kind {
Kind::Empty => Poll::Ready(None),
Kind::Chan { ref mut data_rx, ref mut trailers_rx, .. } => {
// 从 mpsc 拉数据;结束后看 oneshot 拿 trailers
...
}
Kind::H2 { ref mut recv, .. } => {
// 调 h2 的 RecvStream
...
}
...
}
}
}三种实现、同一个 trait、同一个对外 API。这是协议多态最好的例证。
10.7.1 Chan 的反向信号:want_tx
Chan variant 里有一个字段你可能没见过:want_tx: watch::Sender。
作用:告诉 HTTP/1 parser "调用方现在还想继续读 body"。Hyper HTTP/1 parser 不会无脑从 TCP 读数据填 mpsc——它会等调用方 poll_frame 时发出"想要"信号。这是一种 body-level 的背压:如果 handler 处理 body 慢,parser 不会把内存填爆。
HTTP/2 不需要这个 mechanism——h2 crate 自己的 flow control window 处理了背压。HTTP/1 因为没有线路级 flow control,需要应用层配合。
这是hyper 工程上最被忽视但至关重要的一块——它让 HTTP/1 上的大文件流式上传在 Rust 里也能正确应用背压。第 11 章读 HTTP/1 wire 时会把 want_tx / want_rx 的协议完整讲清楚。
10.8 一段完整链路
把所有这些串起来——从 TCP 字节到 handler 消费 body——是这样:
TCP stream
↓
hyper proto::h1::Dispatcher 解析 header
↓
构造 Request<Incoming>
↓
Incoming::Chan { data_rx, want_tx, ... }
↓ ↑
└── mpsc 发数据 ←── h1 parser 读 TCP
handler 拿到 Request<Incoming>
↓
while let Some(frame) = body.frame().await?
↓
frame.data() / frame.trailers()
↓
handler 处理上链路里每一个箭头都是一次 .await——整条链是惰性驱动的。handler 不消费,parser 不发;parser 不发,TCP 不读。这就是"端到端的流式处理"在 Rust 里的落地形态。
10.9 实战:一个最小 JSON 流解析器
我们用本章和前几章的知识写一个"流式 JSON 数组解析器"——不把整个 body 读进内存,一边读一边解析。
rust
use http_body_util::BodyExt;
use hyper::body::Incoming;
use serde_json::Deserializer;
async fn stream_parse(body: Incoming) -> Result<Vec<Item>, BoxError> {
let mut items = Vec::new();
let mut buffer = bytes::BytesMut::new();
let mut body = std::pin::pin!(body);
while let Some(frame) = body.as_mut().frame().await {
let frame = frame?;
if let Some(data) = frame.data_ref() {
buffer.extend_from_slice(data.chunk());
// 尝试增量解析
let de = Deserializer::from_slice(&buffer).into_iter::<Item>();
let mut consumed = 0;
for result in de {
match result {
Ok(item) => {
items.push(item);
consumed = de.byte_offset();
}
Err(e) if e.is_eof() => break, // 数据不足,等下一帧
Err(e) => return Err(e.into()),
}
}
buffer.advance(consumed);
}
}
Ok(items)
}这段代码真实可用,关键点:
body.frame().await来自BodyExt——把poll_frame包成async fn。- 用
BytesMut做增量缓冲——每收到一帧追加,serde 消费多少就 advance 多少。 - 解析出 EOF 错误不当失败——是"数据不足"的信号,等下一帧。
- 永远不把整个 body 读进内存——memory footprint ≈ 一帧 + 未消费字节 + 一个 JSON 对象。
这是 Rust 流式处理的"标准配方"。结合卷四《Serde 元编程》第 4 章(Deserializer 与 Visitor)——你会发现 serde 的 Visitor 设计天然适合这种"增量输入、增量产出"场景。http-body 的流和 serde 的流可以在同一条 async 链上共存。
10.10 小结与落到你键盘上
本章要点:
Bodytrait 用poll_frame把"HTTP body"抽象成"Frame 流"——每帧可能是 Data 或 Trailers。Frame<T>+SizeHint是小而精确的数据原语,甚至在单元测试里把加法的数学证明写进 match。http-body-util提供常用 impl:Empty、Full、StreamBody、BoxBody、Limited。Limited是安全防护,生产 HTTP 服务端必配。hyper::body::Incoming用 enum 把 HTTP/1 mpsc、HTTP/2 recv stream、FFI body 统一成一个 Body impl。- Chan variant 的 want_tx 在 HTTP/1 上提供 body-level 背压——hyper 工程细节里很少被讲的关键一环。
落到你键盘上:
- 读
http-body-util/src/channel.rs——它实现了(Sender, Body)对,是做 SSE / 流式响应时最实用的工具。 - 给 Axum 代码加
Limited——如果你还没做,立即加上tower_http::limit::RequestBodyLimitLayer::new(2 * 1024 * 1024)。 - 实验 BodyExt::collect()——把一个 Incoming 一次性 collect 成
Vec<u8>。跑一次之后读它的源码(大约 50 行),你会明白所有 body 的.await链本质上是在做什么。
下一章开始 Hyper HTTP/1 的硬核部分——我们读 httparse、proto::h1::Encoder、proto::h1::Decoder,看字节流如何被解析成你熟悉的那些 http:: 类型。