Hyper 与 Tower:工业级 HTTP 栈
第8章 Filter / MapRequest / Steer:请求路由与变换
第8章 Filter / MapRequest / Steer:请求路由与变换
8.1 另一种中间件
前面几章我们讲的中间件——Timeout、Retry、Buffer、LoadShed、Balance——都在”按规则控制请求的时序”:什么时候放过、什么时候挂起、什么时候分发到哪个端点。本章我们看另一组中间件:它们不关心时序,关心请求本身。
| 中间件 | 干什么 |
|---|---|
Filter / AsyncFilter | 根据 predicate 决定”过不过”,通过的才发给内层 |
MapRequest | 把 Req1 变成 Req2,然后发给内层 |
MapResponse / MapResult / MapErr | 对内层响应做变换 |
Steer | 多路分流——根据 predicate 选择发给哪个内层 |
这些中间件是”路由层”的构建块。Axum 的路由、Tonic 的方法分发、代理的 upstream 选择,本质上都是这些原语的组合。它们共享一个特征:只做纯粹的类型/值变换,不涉及异步等待、不涉及容量控制。
这一章我们重点讲 Filter、MapRequest、Steer——三个最有代表性的。源码来自 tower 0.5.3 的 tower/src/filter/、tower/src/util/map_request.rs、tower/src/steer/。
8.2 MapRequest:最纯粹的变换
从最简单的开始。MapRequest 就是一个 fn(Req1) -> Req2:
// tower/src/util/map_request.rs:6-10
pub struct MapRequest<S, F> {
inner: S,
f: F,
}
// 41-62
impl<S, F, R1, R2> Service<R1> for MapRequest<S, F>
where
S: Service<R2>,
F: FnMut(R1) -> R2,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
self.inner.poll_ready(cx)
}
#[inline]
fn call(&mut self, request: R1) -> S::Future {
self.inner.call((self.f)(request))
}
}
整整 20 行代码就是全部。call 一句 self.inner.call((self.f)(request))——把请求通过函数 f 变一下,转给内层。
这段代码里有一个非常容易错过的魔法:Service<R1> 和 Service<R2> 是不同的 trait impl。MapRequest 在泛型上声明它 impl 的是 Service<R1>(外部看到的请求类型),但内层 S impl 的是 Service<R2>(变换后的请求类型)。MapRequest 本身改变了 trait impl 面的 “Request 参数”。
换句话说:你有一个 Service<GenericRequest> 想暴露成一个 Service<HttpRequest>,只需要把从 HttpRequest 到 GenericRequest 的映射函数套上 MapRequest。从外面看,这个 service 变成了 Service<HttpRequest> 能处理的东西。这是类型级别的协议转换器。
8.2.0 #[inline] 在 MapRequest 上的份量
打开 tower/src/util/map_request.rs:52-60、两个关键方法都被标了 #[inline]:
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
self.inner.poll_ready(cx)
}
#[inline]
fn call(&mut self, request: R1) -> S::Future {
self.inner.call((self.f)(request))
}
为什么要加 #[inline]?——因为 MapRequest 本身没做任何工作——它的 call 就是把请求过一个闭包再交给内层。如果 #[inline] 不开、编译器可能把 MapRequest 的 call 单独编成一个函数、每次请求都是一个多余的函数调用。一个 tower service 链里可能串 5-10 个 MapRequest(改 header、加 tag、注入 trace ID……)、每个请求就是 5-10 次无谓的函数跳转。
开了 #[inline]、编译器就会把 MapRequest::call 的代码展开到调用处——外层看到的直接就是 inner.call((f)(request))、相当于 MapRequest 这层在机器码里不存在。这是零成本抽象的关键——代码写起来分层、编译出来扁平。
Tower 在所有薄封装的 Service(MapRequest、MapResponse、Filter 的 poll_ready)上都加了 #[inline]——因为它们要串很深的链、任何一层不被 inline 都会变成性能税。这是一个看起来平淡、但在真实 benchmark 里能测到 10-20% 差异的细节。
8.2.1 一个真实场景
在 Axum 里你会看到这样的代码:
let app = Router::new()
.route("/foo", get(handler))
.layer(MapRequestLayer::new(|req: Request| {
// 附加一个 X-Request-Id 头
let mut req = req;
req.headers_mut().insert(
"x-request-id",
HeaderValue::from_str(&Uuid::new_v4().to_string()).unwrap(),
);
req
}));
这类”请求预处理”——打 tag、改 header、插 extension——用 MapRequest 就够了。不需要自己写一个新的 struct + Service impl,一个闭包解决。
8.2.2 MapResponse / MapErr / MapResult
Tower 对称地提供了三个响应侧的变换:
MapResponse<S, F>:对成功响应做F: FnMut(Resp1) -> Resp2。MapErr<S, F>:对错误做F: FnMut(Err1) -> Err2。MapResult<S, F>:对Result<Resp, Err>做统一变换——可以把错误转成成功,反之亦然。
三者的实现结构和 MapRequest 几乎一致——“call 返回一个包着原 future 的 adapter future”。我们不重复代码了——它们就是沿着同一个模板铺出去的。
8.2.3 为什么这些都只是”工具”,不配有章节
读者可能好奇:这么基础的变换为什么 Tower 不把它们做成”built-in 方法”——就像 Iterator::map?答案是:已经是了。
ServiceExt trait(tower/src/util/mod.rs)定义了一组便利方法:
pub trait ServiceExt<Request>: tower_service::Service<Request> {
fn map_request<F, R>(self, f: F) -> MapRequest<Self, F>
where Self: Sized, F: FnMut(R) -> Request { ... }
fn map_response<F, Res>(self, f: F) -> MapResponse<Self, F> { ... }
fn map_err<F, Err>(self, f: F) -> MapErr<Self, F> { ... }
// ...
}
有了 ServiceExt,你在业务代码里可以直接:
let svc = my_service.map_request(|req| modify(req))
.map_response(|resp| enrich(resp))
.map_err(|e| MyError::from(e));
这就像 Iterator 上的 .map().filter().collect()——一个链式的数据流改写。Tower 的意图很明显:让 Service 链式编程感觉像 Iterator 一样自然。读过卷四《Serde 元编程》第 3 章里关于 Serializer 链式方法的讨论——这种组合式 API 在 Rust 生态是一个通用模式:trait 定义核心、trait 扩展方法定义惯用法。
8.3 Filter:带 Predicate 的守门人
Filter 比 MapRequest 进了一步:它不止变换,还能拒绝。
// tower/src/filter/mod.rs:45-48
pub struct Filter<T, U> {
inner: T,
predicate: U,
}
两个类型参数:T 是内层 service、U 是 predicate。
8.3.0 const fn new 与 Filter::check 独立方法
Filter 的构造函数写成 const fn(tower/src/filter/mod.rs:64):
impl<T, U> Filter<T, U> {
pub const fn new(inner: T, predicate: U) -> Self {
Self { inner, predicate }
}
为什么是 const?——因为 Filter 的构造逻辑就是两个字段赋值、完全可以在编译期完成。这允许用户写 static FILTER: Filter<_, _> = Filter::new(...) 这样的静态初始化——在嵌入式和极限性能场景里有用。相比之下 Filter::layer(predicate) -> FilterLayer<U> 因为返回不同类型就没做成 const、但 FilterLayer::new 也是 const(layer.rs:38)——同样允许静态构造。AsyncFilter::new 也是 const(mod.rs:126)、和 Filter 保持对称。
const fn 这个装饰还有一个非功能性作用——它是一个不变性承诺:未来的 Tower 作者如果在 Filter::new 里加了一行 println!("created") 之类的副作用、编译会失败。const 在这里不只是”能不能在编译期调用”、更是一道”这个构造永远不做副作用”的契约。
Filter 还有一个独立的 check 方法(mod.rs:77-82)、允许外部代码不经过 call 就手动 check 一个请求:
pub fn check<R>(&mut self, request: R) -> Result<U::Request, BoxError>
where U: Predicate<R>,
{
self.predicate.check(request)
}
这对测试或条件调用很有用——你可以先 check 一下、如果通过了再决定怎么处理请求。注意这里的 R 是方法级的泛型、不是 Filter 结构体的——意味着同一个 Filter 实例可以用不同的请求类型来 check(只要 U 对每个类型都 impl 了 Predicate)。这种”一个 Filter 服务多种请求”的场景虽然罕见、但 API 没有把这种可能性关死。
8.3.1 Predicate trait
// tower/src/filter/predicate.rs:24-38
pub trait Predicate<Request> {
type Request;
fn check(&mut self, request: Request)
-> Result<Self::Request, BoxError>;
}
这个 trait 有两个关联类型——不,只有一个(Request)。但它接收一个泛型 Request——这个泛型是 predicate 的输入类型;关联类型 Request 是 predicate 的输出类型。
读这两个名字很容易混:
- trait 的泛型参数
Request(比如impl Predicate<RawHttpRequest>)——外部传进来的原始请求类型。 - 关联类型
type Request——predicate 接受之后、传给内层 Service 的类型。
所以 Predicate 本质是:(Request) → Result<Self::Request, Error>——它既可能拒绝(Err),也可能变换(映射到不同类型)。
实际上,MapRequest 是 Filter 的特例:如果 predicate 永远不返回 Err,那 Filter 退化成一个 MapRequest。Tower 没把它们强行统一,因为在语义和 API 清晰度上区分它们更直观——MapRequest 是”我一定要改”、Filter 是”我可能拒绝”。
8.3.2 Predicate 是 blanket impl 覆盖的
// tower/src/filter/predicate.rs:55-65
impl<F, T, R, E> Predicate<T> for F
where F: FnMut(T) -> Result<R, E>, E: Into<BoxError>,
{
type Request = R;
fn check(&mut self, request: T) -> Result<Self::Request, BoxError> {
self(request).map_err(Into::into)
}
}
任何形如 FnMut(T) -> Result<R, E> 的闭包或函数,都自动满足 Predicate trait。你写 filter 时不需要造一个 struct——直接闭包:
let auth_filter = Filter::new(inner, |req: Request| {
if req.headers().get("authorization").is_some() {
Ok(req)
} else {
Err(BoxError::from("unauthorized"))
}
});
看到了吗?auth 中间件就这么几行。“有 authorization header 就通过,没有就拒绝”——就是一个 Predicate。
8.3.2.5 AsyncPredicate 的 ErrInto 自动转换
AsyncPredicate 的 blanket impl(predicate.rs:40-53)里藏着一个细节:
impl<F, T, U, R, E> AsyncPredicate<T> for F
where
F: FnMut(T) -> U,
U: Future<Output = Result<R, E>>,
E: Into<BoxError>,
{
type Future = futures_util::future::ErrInto<U, BoxError>;
type Request = R;
fn check(&mut self, request: T) -> Self::Future {
use futures_util::TryFutureExt;
self(request).err_into()
}
}
关键在 type Future = futures_util::future::ErrInto<U, BoxError>——用户写的异步 predicate 返回 impl Future<Output = Result<R, MyError>>、但 AsyncPredicate 对外暴露的 Future 类型是 ErrInto<U, BoxError>。
ErrInto 是 futures-util 的一个 future 适配器——它把内层 future 的 Err 分支通过 Into::into 自动转成 BoxError。用户不需要在 predicate 里手动写 .map_err(BoxError::from)——只要错误类型实现了 Into<BoxError>(E: Into<BoxError> 在 where 子句里)、类型转换自动发生。
这个设计把业务 predicate 的错误类型自由度和Filter 对外的统一错误类型调和起来——你可以在 predicate 里用任何自定义错误、Filter 自动帮你擦除。代价是性能上的一层 vtable 调用(BoxError 是 Box<dyn Error>)——在 predicate 这层可以接受、因为 predicate 本来就是”请求生命周期里的一次决策”、不是热路径。
注意这个模式和同步 Predicate impl(predicate.rs:55-65)完全对称——那边是 self(request).map_err(Into::into)、同步版本不需要包装 future、直接 map_err。两个实现是”同一个想法的两种姿态”——异步用 ErrInto、同步用 map_err、都是自动把错误搬到 BoxError。
8.3.3 Filter::call:把两种路径合成一个 future
// tower/src/filter/mod.rs:114-119
fn call(&mut self, request: Request) -> Self::Future {
ResponseFuture::new(match self.predicate.check(request) {
Ok(request) => Either::Right(self.inner.call(request).err_into()),
Err(e) => Either::Left(std::future::ready(Err(e))),
})
}
Either 是 futures-util 的一个双路径 future——它要么 poll 左边要么 poll 右边:
- Ok:predicate 通过,返回内层 service 产生的 future(通过
err_into()把错误类型转成 BoxError)。 - Err:predicate 拒绝,返回一个立即 Ready 的错误 future(
std::future::ready(Err(e)))。
对外统一成一个 ResponseFuture 类型——调用方不需要关心内部走的是哪条路径。这是典型的”用 Either 统一两个异构 future”的模式,在 Tower 里反复出现。
8.3.3.5 opaque_future! 宏与 ResponseFuture 的命名技巧
ResponseFuture 定义在 tower/src/filter/future.rs:31-40:
opaque_future! {
/// Filtered response future from [`Filter`] services.
pub type ResponseFuture<R, F> =
futures_util::future::Either<
std::future::Ready<Result<R, crate::BoxError>>,
futures_util::future::ErrInto<F, crate::BoxError>
>;
}
注意这不是普通的 type 别名、而是 Tower 自定义的 opaque_future! 宏(在 macros.rs 定义)。这个宏生成的不只是类型别名——它会把 Either 包成一个新的 struct、对外完全隐藏内部类型。
为什么要做这一层遮蔽?——因为如果直接把 Either<Ready<...>, ErrInto<...>> 暴露给用户、那这就成了 Filter 的公开 API 的一部分。未来 Tower 想把错误分支换成别的 future 实现、任何依赖这个具体类型的用户代码就会编译失败——这是 API 稳定性的陷阱。
opaque_future! 宏把具体类型封装成一个 newtype——用户只能通过 Future<Output = ...> 这个 trait 约束使用它、不能依赖内部结构。这样未来 Tower 想从 Either<Ready, ErrInto> 换成别的 sum-type future、用户代码完全不受影响。
这是rust 库作者的长期主义——今天多写一点宏、未来十年的 API 演进都留了余地。对比 Axum、Tonic 这些上层框架、也用类似技巧包装自己的返回 future——“不暴露具体 future 类型”已经是 Rust 异步生态的默认最佳实践。
8.3.4 AsyncFilter:异步 predicate 和 Clone 的微妙处理
有时候 predicate 本身需要异步查询——比如”查 Redis 判断这个 token 有没有被吊销”。Tower 为此提供了 AsyncFilter 和 AsyncPredicate:
// tower/src/filter/predicate.rs:5-23
pub trait AsyncPredicate<Request> {
type Future: Future<Output = Result<Self::Request, BoxError>>;
type Request;
fn check(&mut self, request: Request) -> Self::Future;
}
返回 Future 而不是 Result——检查本身是异步的。
实现里有一个值得细品的Clone 处理:
// tower/src/filter/mod.rs:176-189
fn call(&mut self, request: Request) -> Self::Future {
use std::mem;
let inner = self.inner.clone();
// In case the inner service has state that's driven to readiness and
// not tracked by clones (such as `Buffer`), pass the version we have
// already called `poll_ready` on into the future, and leave its clone
// behind.
let inner = mem::replace(&mut self.inner, inner);
// Check the request
let check = self.predicate.check(request);
AsyncResponseFuture::new(check, inner)
}
这就是第 4 章警告过的”poll_ready 后 clone 再 call 的陷阱”的正确解法。AsyncFilter 之所以需要这么做,是因为 predicate 是异步的——check 过程中调用方可能并发地 call 另一个请求,这时候 self.inner 不能被独占。方法是:先 clone 一个备用 inner 放回 self.inner,然后把 “已经 poll_ready 过的原 inner” 搬到 future 里。
这段代码的注释写得极好,直接告诉你”为什么”(in case the inner service has state that's driven to readiness and not tracked by clones (such as Buffer))——是整本 tower 源码里对这个经典模式最清晰的解释。
8.3.5 AsyncResponseFuture 状态机:两阶段 poll 的教科书写法
AsyncFilter 真正的核心是 AsyncResponseFuture(future.rs:42-97)——这是 Tower 里最典型的”手写 future 状态机”之一、值得逐行拆:
pin_project! {
#[project = StateProj]
enum State<F, G> {
Check { #[pin] check: F }, // 阶段 ①:等 predicate
WaitResponse { #[pin] response: G }, // 阶段 ②:等内层响应
}
}
impl<P, S, Request> Future for AsyncResponseFuture<P, S, Request> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
match this.state.as_mut().project() {
StateProj::Check { mut check } => {
let request = ready!(check.as_mut().poll(cx))?;
let response = this.service.call(request);
this.state.set(State::WaitResponse { response });
}
StateProj::WaitResponse { response } => {
return response.poll(cx).map_err(Into::into);
}
}
}
}
}
三个关键技术点:
① pin_project! 的 #[pin] 标记——Check 和 WaitResponse 两个 variant 里的 future 字段都需要 #[pin]、因为 future 的 poll 要求 Pin<&mut Self>、而字段的 pin projection 必须显式声明。pin_project_lite 这个 macro 生成的 StateProj 投影类型自动处理了 Pin 的投射规则——用户代码里就只看到普通的 match this.state.as_mut().project()、Pin 语义被隐藏。
② loop + match 的状态迁移——Check 完成时不返回、而是 this.state.set(State::WaitResponse { response }) 切状态、然后 loop 顶上再 match 一次进入 WaitResponse 分支。这是手动 async/await 状态机的经典写法:一个状态完成就立刻进下一个、不让出 future(因为没必要 yield)。如果用 async fn 写同样逻辑、编译器生成的状态机几乎一模一样——只是你看不见。
③ ready! 宏的 ?-style 处理**——ready!(check.as_mut().poll(cx))? 是两个运算符的组合:ready! 对 Poll::Pending 直接 return Pending、对 Poll::Ready(value) 解包出 value;?` 对 Err 直接返回、对 Ok 解包。两个运算符叠起来、三行逻辑(Pending → 让出;Err → 冒泡;Ok → 继续)写成一行。这是 Rust async 状态机里最紧凑的 polling 范式**。
注意 this.service.call(request) 是同步调用——call 本身只是返回 future、不 await。这就是 Service trait 的核心约定:call 瞬间返回、工作在 future 的 poll 里发生。AsyncResponseFuture 做的事就是**“等 predicate、切到等 response”**的状态桥接——两阶段 future 合成一个、完美匹配”predicate check + service call”的语义。
8.4 Steer:多个 Service 的”分流器”
Steer 让你把多个 Service 组合成一个,按某种规则把请求分流到其中一个。
// tower/src/steer/mod.rs:106-111
pub struct Steer<S, F, Req> {
router: F,
services: Vec<S>,
not_ready: VecDeque<usize>,
_phantom: PhantomData<Req>,
}
字段:
services: Vec<S>——所有内层 service。router: F——picker 函数,fn(&Req, &[S]) -> usize返回选中的 index。not_ready: VecDeque<usize>——还没就绪的 service index。
8.4.0 Steer::new 的初始化:为什么 not_ready 从全集开始
Steer::new(steer/mod.rs:117-126)的实现值得仔细看:
pub fn new(services: impl IntoIterator<Item = S>, router: F) -> Self {
let services: Vec<_> = services.into_iter().collect();
let not_ready: VecDeque<_> = services.iter().enumerate().map(|(i, _)| i).collect();
Self { router, services, not_ready, _phantom: PhantomData }
}
not_ready 初始值是”所有 service 的 index”——而不是空 VecDeque。换句话说、一个新建的 Steer 默认认为”所有 service 都还没 ready”。
这是有深意的:Tower 协议要求”call 前必须 poll_ready 且返回 Ready”。Steer::poll_ready 的逻辑是”把 not_ready 里的每个 service 都 poll 一遍、都 Ready 了才返回 Ready”。如果 not_ready 初始是空、那第一次 poll_ready 直接返回 Ready(因为 “is_empty() → Ready”)——但此时 services 里的每个 service 都还没被 poll 过、真实 readiness 未知。用户一 call 就可能撞上 “inner poll_ready 返回 Pending 但 Steer 已经说 Ready 了” 的协议违反。
初始化时把所有 index 塞进 not_ready、保证了第一次 poll_ready 一定会把每个 service 都 poll 一遍——不管 service 本身”默认是什么状态”、都走一遍正规化检查、然后再说 Steer 是 Ready。这是防御性初始化——把正确性建立在”第一次 poll 必走完全部检查”的不变式上。
VecDeque 而不是 Vec——因为 poll_ready 循环里要 pop_front(从队首取、已 ready 的剔除)、call 里要 push_back(被 call 过的扔队尾、下次重新检查)。这是典型的 FIFO 队列用法、VecDeque 的 O(1) 两端操作正合适。用 Vec 就得从队首 remove、每次 O(n) 移动数据——乘以高频 poll_ready 就是性能灾难。
PhantomData<Req>——Steer 结构体里实际不存储任何 Req 类型的值(Req 只出现在 Picker::pick 的参数里、是外部传入的)。但为了让 Steer<S, F, Req> 的泛型参数 Req 在类型系统里有”立足点”、必须用 PhantomData 标记。否则 Rust 编译器会报”unused type parameter”——因为 Rust 要求所有泛型参数都必须”出现在字段或 impl 边界里”、PhantomData 就是专为这种”逻辑上相关但无实际存储”的类型参数设计的标记。
8.4.1 Picker trait
// tower/src/steer/mod.rs:74-87
pub trait Picker<S, Req> {
fn pick(&mut self, r: &Req, services: &[S]) -> usize;
}
impl<S, F, Req> Picker<S, Req> for F
where F: Fn(&Req, &[S]) -> usize,
{
fn pick(&mut self, r: &Req, services: &[S]) -> usize {
self(r, services)
}
}
任何闭包都是 Picker。典型用法来自 Tower 官方文档里的例子:
let mut svc = Steer::new(
vec![root_service, not_found_service],
|req: &Request<String>, _services: &[_]| {
if req.method() == Method::GET && req.uri().path() == "/" {
0 // root
} else {
1 // not_found
}
},
);
这就是一个最原始的 HTTP router——按 method + path 选 service。
8.4.2 poll_ready:head-of-line blocking
这里是 Steer 最有争议也最微妙的一段:
// tower/src/steer/mod.rs:138-155
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
if self.not_ready.is_empty() {
return Poll::Ready(Ok(()));
} else {
if self.services[self.not_ready[0]]
.poll_ready(cx)?
.is_pending()
{
return Poll::Pending;
}
self.not_ready.pop_front();
}
}
}
关键行为:Steer 在 poll_ready 时要求”所有 service 都 Ready”才报告 Ready。
Why?因为 Steer 在收到请求之前不知道 picker 会选哪个 service——所以必须保证每一个候选都能接。不然就会出现 “picker 选了 2 号但 2 号 not ready 要挂起” 的尴尬——而 Tower 协议里 poll_ready 一旦 Ready 就必须能立刻 call。
这个设计带来了一个明显问题:head-of-line blocking。如果你有 10 个 service,一个慢,一个快 9 个快;哪怕所有请求都是要发给 9 个快的,Steer 也必须等那个慢的 ready 才行。文档里明确警告:
This will cause head-of-line blocking unless paired with a [
Service] that does buffer items indefinitely, and thus always returns [Poll::Ready]. For example, wrapping each component service with a [Buffer] with a high enough limit … will prevent head-of-line blocking.
翻译:要么每个 service 是”永远 Ready”的,要么用 Buffer 包住每个 service——让 Buffer 把背压吞掉,Steer 看到的就是”永远 Ready”的 service 列表。
这是一个对 Tower 协议有意的牺牲:Steer 本可以在内部维护一个”队列到哪个还在忙着的 service”的机制——但那就变成了 Balance。Steer 选择了简洁——保证它永远不偷偷替你排队,所有决策都由你在 picker 里显式做出。
8.4.2.5 poll_ready 循环的终止性证明
第一次看 Steer 的 poll_ready 可能会怀疑:这个 loop 会不会死循环?
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
if self.not_ready.is_empty() {
return Poll::Ready(Ok(()));
} else {
if self.services[self.not_ready[0]].poll_ready(cx)?.is_pending() {
return Poll::Pending;
}
self.not_ready.pop_front();
}
}
}
终止性论证——loop 里三种结果:
① not_ready.is_empty() → return Ready:循环出口 ①、成功返回。
② inner poll_ready 返回 Pending → return Pending:循环出口 ②、把 Pending 冒泡。
③ inner poll_ready 返回 Ready → pop_front + continue:循环体每走一轮、not_ready.len() 单调递减 1(pop_front 走了一个)。
三个分支里、只有 ③ 继续循环、且每次都让队列长度严格减少。队列初始长度是 services.len()(有限)、最多走 services.len() 轮就一定进入 ①(队列空)或 ②(某个 service 还没 ready)。循环必然终止。
这是循环变量法的经典结构:找一个非负整数(not_ready.len)、证明它在每次循环中严格减少、就保证终止。Rust 编译器不做这种证明(不是形式化语言)、但人工读代码可以——这是并发代码里”状态机必然收敛”的标准推理。
有一个隐含前提:loop 里没有增长 not_ready 的分支。验证一下——loop body 里只有 pop_front 和读取、没有任何 push_back。所以队列只减不增、循环必定收敛。
唯一可能引起无限 poll_ready 被调用的外部原因是inner service 每次都返回 Ready 但立刻又被 call 到 Pending——这不在 Steer 的控制范围内、是调用方用错了 Tower 协议(call 前应该先 poll_ready 确认 Ready)。Steer 通过 call 里的 assert! 检测这种协议违反、panic 出问题现场。
8.4.3 call:按 picker 分发
// tower/src/steer/mod.rs:157-167
fn call(&mut self, req: Req) -> Self::Future {
assert!(
self.not_ready.is_empty(),
"Steer must wait for all services to be ready. Did you forget to call poll_ready()?"
);
let idx = self.router.pick(&req, &self.services[..]);
let cl = &mut self.services[idx];
self.not_ready.push_back(idx);
cl.call(req)
}
三件事:
- 断言所有 service 都是 ready(否则 panic)。
- 调 picker 选 index。
- 把这个 index 加入
not_ready(call 之后这个 service 可能又变回 pending),调 call。
注意:self.not_ready.push_back(idx)——这个 service 被 call 之后就再次进入 not_ready 队列。下一次 poll_ready 又要 poll 它一次。这保证了 steer 的状态一致性——每 call 一次、标脏一次、下次 poll_ready 再确认 ready。
8.4.3.5 Clone 的条件 impl 与 _phantom 的处理
Steer 的 Clone(steer/mod.rs:170-183)有一个细节:
impl<S, F, Req> Clone for Steer<S, F, Req>
where
S: Clone,
F: Clone,
{
fn clone(&self) -> Self {
Self {
router: self.router.clone(),
services: self.services.clone(),
not_ready: self.not_ready.clone(),
_phantom: PhantomData,
}
}
}
where S: Clone, F: Clone 是条件 impl——Steer 本身不无条件地 #[derive(Clone)]、而是只在内部 Service 和 Picker 都 Clone 时才 Clone。这避免了一个陷阱:如果无条件 derive、那 Rust 编译器会默认要求 S: Clone,这个约束会从类型定义处就固化、即使用户从不 clone Steer 也必须满足 S: Clone。
手写 impl<..> where ..> Clone 把 Clone 约束推迟到真正使用 clone 时才强制——用户用 Box<dyn Service>(不 Clone)构造 Steer 完全没问题、只要他不 clone Steer 就行。这是 Rust 里精确条件 impl 胜于 derive的典型场景。
注意 _phantom: PhantomData 在 clone 里没有调用 self._phantom.clone()——因为 PhantomData 的 clone 就是”造一个新的 PhantomData”、没有任何运行时工作。直接写 PhantomData 等价于 clone。这个小细节反映了 Rust 编译器对 PhantomData 的特殊处理——它是零大小类型(ZST)、编译后完全消失、clone 就是原地返回。
还有一点:not_ready: self.not_ready.clone()——克隆出来的 Steer 继承原 Steer 的 not_ready 状态。如果原 Steer 已经 poll_ready 过几个 service、克隆出来的 Steer 也记得这些信息。这意味着 clone 不是 reset——两个 Steer 实例共享”初始时的就绪判断结果”。但因为它们有各自独立的 services 列表(Vec clone 出两份)、后续的 poll_ready / call 都各自独立演化。
8.4.4 Steer::Future = S::Future 的类型传递
最容易忽略的一行是(mod.rs:136):
type Future = S::Future;
Steer 不包装 future、直接透传内层 service 的 future 类型。这意味着:
- 如果所有 inner service 的 Future 类型相同(比如都是
Pin<Box<dyn Future>>)、Steer 能正常编译——因为S::Future对所有 inner 都一致。 - 如果 inner service 的 Future 类型不同(比如一个是
MyFuture1、一个是MyFuture2)、Steer 根本编译不过——因为S: Service<Req>意味着 services Vec 里每个元素必须是同一个 S 类型、它们的 Future 自然也都是同一个S::Future。
这就是 §8.7.1 提到的”Steer 里不能混 Service 类型”的编译期根源——不是运行时限制、是 Rust 类型系统的天然约束。要绕开、必须先用 BoxService::new(svc) 把每个 service 擦除到 BoxService<Req, Resp, Err>、这时所有 service 的类型都是同一个 BoxService、Future 也都是同一个 BoxFuture。类型擦除不是可选的便利、是 Steer 能接纳异构 service 的唯一办法。
这种”直接透传 associated type”比 Filter 的 ResponseFuture(opaque_future! 包装的 Either)更激进——Filter 至少包装了一层、内部可以换;Steer 完全透传、内部没有自己的 future。这是有代价的——Steer 永远不能在 future 里插入自己的逻辑(比如 metrics、tracing)。如果 Steer 未来想加”每个分发记一个计数器”、它就必须改 type Future、这是破坏性 API 变更。
Tower 在 Steer 这里选择了极简——透传到底、一切延迟都来自内层 service、Steer 自身不增加任何 future 层。这和 Filter 的”包一层 Either 统一错误”形成对比——两种设计取舍、都合理、但体现了封装 vs 透传的哲学分歧。
8.5 三者的组合实例
我们来看一个真实应用,把这些原语串起来:构造一个 authN + routing 的中间件链。
场景:HTTP 服务端,需求是:
- 所有请求必须带 Authorization header,否则 401(auth filter)。
- URL path 包含
/admin的请求打上X-Admin: true头(map_request)。 - Path 以
/api开头的走 API service,以/web开头的走 Web service,其他走 fallback(steer)。
use tower::{filter::FilterLayer, util::MapRequestLayer, steer::Steer, ServiceBuilder, ServiceExt};
use http::{Request, Response};
use http_body_util::Full;
// 1. API / Web / Fallback 三个内层 service
let api_svc = /* ... */;
let web_svc = /* ... */;
let fallback_svc = /* ... */;
// 2. 用 Steer 组成一个"按 path 选 service"的组合
let routed = Steer::new(
vec![api_svc, web_svc, fallback_svc],
|req: &Request<_>, _svcs: &[_]| {
if req.uri().path().starts_with("/api") { 0 }
else if req.uri().path().starts_with("/web") { 1 }
else { 2 }
},
);
// 3. 在外面套上 map_request + filter
let app = ServiceBuilder::new()
.layer(FilterLayer::new(|req: Request<_>| {
if req.headers().get("authorization").is_some() {
Ok(req)
} else {
Err(BoxError::from("401 unauthorized"))
}
}))
.layer(MapRequestLayer::new(|mut req: Request<_>| {
if req.uri().path().contains("/admin") {
req.headers_mut().insert("X-Admin", HeaderValue::from_static("true"));
}
req
}))
.service(routed);
请求流:
req → Filter(auth) → MapRequest(admin tag) → Steer(path router) → api|web|fallback → resp
│ │ │
↓ 401 err ↓ 变形 ↓ 分发
整条链是静态类型、零运行时选择、零分配。编译器把它全部单态化成一个具体的 struct。Axum 的 handler 分发、Tonic 的 method 分发,最底层用的就是类似的组合思路(虽然 Axum 自己实现了更复杂的 Router——因为要支持路径参数、动态方法、嵌套路由等,不是朴素的 Steer 能覆盖的)。
8.5.5 FilterLayer::layer:Clone 约束的前置
FilterLayer 实现 Tower 的 Layer trait(filter/layer.rs:43-50):
impl<U: Clone, S> Layer<S> for FilterLayer<U> {
type Service = Filter<S, U>;
fn layer(&self, service: S) -> Self::Service {
let predicate = self.predicate.clone();
Filter::new(service, predicate)
}
}
U: Clone 是 impl 级别的约束、不是 FilterLayer struct 本身的约束。打开 filter/layer.rs:12-15 看 FilterLayer 的定义:
#[derive(Debug, Clone)]
pub struct FilterLayer<U> {
predicate: U,
}
FilterLayer struct 本身不要求 U: Clone——你可以构造一个 FilterLayer<NonCloneablePredicate>。但一旦你想把它当 Layer 用(调用 .layer(svc))、predicate 就必须 Clone——因为 layer 方法需要 self.predicate.clone() 把 predicate 搬进新 Filter。
为什么 predicate 要 clone?——因为 Layer 的语义是”给我一个 Service、我返回一个新 Service”——外层需要拥有自己的 predicate 副本、不能借用 FilterLayer 内部的 predicate(ownership 冲突)。Clone 是最直接的所有权转移办法。
这个”struct 宽松、impl 严格”的约束模式是 Rust 库设计的一个重要惯用法——数据结构允许多种可能、只在需要某种能力的操作处强制。用户即使用不 Clone 的 predicate 构造了 FilterLayer、只要不走 Layer trait 的路径、就不会受 Clone 约束影响。
对比 Tower 里一些更严格的 layer(比如 RetryLayer 要求 policy 全程 Clone)、FilterLayer 的设计更灵活——因为 predicate 可能有自己的状态(比如一个计数器)、允许用户用 Arc<Mutex<Predicate>> 这样的共享状态也能正常工作(Arc 是 Clone 的、clone 只是增加引用计数)。
8.5.6 和第 4 章 Buffer/poll_ready 协议的回应
§8.3.4 里 AsyncFilter 的 mem::replace 那段代码是第 4 章讨论的陷阱的唯一正确解法。回想第 4 章讲的 Buffer:
Buffer 不是一个 Service——它是一个由 Clone 共享的 channel 端点。每次 clone Buffer 都不会复制内部状态、只是共享一个 Arc<Mutex<ReadyCache>>。当 A.poll_ready() 返回 Ready、B = A.clone() 拿到的是同一个 Arc——但 A 自己已经”消费”了 readiness slot、B 调用 poll_ready 又得重新排队。
这就是 AsyncFilter 必须做 mem::replace(&mut self.inner, inner) 的根本原因:
let inner = self.inner.clone(); // ① 克隆、拿"新引用"
let inner = mem::replace(&mut self.inner, inner); // ② 把"新引用"放回 self、取出"旧引用"
let check = self.predicate.check(request);
AsyncResponseFuture::new(check, inner) // ③ 用"旧引用"——它持有 readiness slot
第 4 章已经铺垫过为什么 A.clone().call() 会丢 readiness——因为 clone 的 B 是另一个 channel 端点、channel 的 readiness 跟着特定的 Permit 走、不是跟着引用走。AsyncFilter 解法的精髓是:让”持有 readiness slot”的那个 inner 一路送到 future 的 call 处——通过 mem::replace 的左右交换把 readiness 保留在 future 里、不被丢在 self 上。
这个技术和 §6 章讲的 Ready future(tower 的 ready_cache)是一脉相承的——都是围绕 “poll_ready 产生的 readiness 是一次性资源、不能被 clone” 这个核心事实做文章。整本 Tower 源码里、这个规则像一条暗线串起了 Buffer、Filter、Balance 这些中间件——读懂了它、这些中间件的 call 实现就都通了。
8.6 与卷五 Vue 3 的 h() 函数对照
有一个类比也许会让你会心一笑。我们读过卷五《Vue 3 设计与实现》关于响应式组件的章节——Vue 的 h(Component, props, children) 也是一种”用变换构造”的模式:
h('div',
{ class: 'wrapper' },
[ h(MyComponent, { prop: value }),
h('span', {}, ['text']) ])
Tower 的 Steer::new(vec![svc_a, svc_b], picker) 从接口角度看几乎是一样的——“给我一堆组件(services)+ 一个选择函数,我给你一个统一的组合”。
更深一层:两者都解决”如何在不破坏类型的前提下把多路选择表达出来”。Vue 用 VNode 做运行时多态(每个 h() 产生一个 VNode 对象,在 diff 时决定行为),Tower 用 Picker + index 做编译期多态(每个 service 单态化保留其原始类型,picker 决定走哪条)。
这是一个有意思的”语言哲学影响抽象实现方式”的案例。JS 动态类型让 VNode 可以是任何东西、h() 返回值是同一个类型——这是动态语言的方便。Rust 静态类型要求你在 Steer 外面用 BoxService 擦除每一个 service 的类型(或者保证它们类型一致)——这是静态语言的约束。但两套系统表达的核心思想是一致的:组合 + 选择。
8.7 一些容易踩的坑
8.7.1 Steer 里不能混 Service 类型
你不能直接 Steer::new(vec![http_svc, grpc_svc], ...)——因为 Vec 要求所有 service 类型一致。Tower 官方例子里总是先 BoxService::new(svc) 把每个 service 擦除类型,再塞进 Vec。这意味着每次请求分发都付一次 vtable 查找——但这是必须的代价。
8.7.2 AsyncFilter 的延迟累积
AsyncFilter 每次 call 都会异步查一次 predicate。如果你 10 个中间件都是 AsyncFilter,每个都查 Redis——单次请求就有 10 次 Redis round trip。AsyncFilter 只用在真的需要异步查询的场景,能用同步 Predicate 就用同步。
8.7.3 Filter 错误被 BoxError 包裹
Filter 的 type Error = BoxError——即使你内层 service 用了具体的错误类型 MyError,经过 Filter 之后就变成 BoxError。Axum 这类框架的顶层要 downcast_ref::<MyError> 才能识别业务错误。详见第 3 章关于 Layer 错误擦除的讨论。
8.7.4 Filter 的 Error = BoxError 擦除的类型学解释
Filter 的 Service impl(filter/mod.rs:100-108)有一行:
impl<T, U, Request> Service<Request> for Filter<T, U>
where
U: Predicate<Request>,
T: Service<U::Request>,
T::Error: Into<BoxError>,
{
type Response = T::Response;
type Error = BoxError;
type Future = ResponseFuture<T::Response, T::Future>;
type Error = BoxError——不管内层 service 的错误类型是什么、Filter 吐出来的错误一律是 BoxError(Box<dyn std::error::Error + Send + Sync>)。
为什么要统一到 BoxError?——因为 Filter 有两条错误路径:
- predicate 拒绝:predicate 返回
Err(predicate_err)、这个错误类型是BoxError(Predicate trait 的 check 方法签名强制)。 - 内层 service 失败:内层返回
T::Error、这个类型由 inner service 决定、和 predicate 的错误类型不一定相同。
这两条路径的错误要在 Filter 的 Error 关联类型里统一表达——只能用最”兜底”的 BoxError。ResponseFuture 里 futures_util::future::ErrInto<F, BoxError> 正是把 T::Error 通过 Into::into 转成 BoxError 的适配器。
这是有代价的——用户原本用具体的 MyError 类型、经过 Filter 后类型被擦掉、顶层要 downcast_ref::<MyError>() 才能拿回具体信息。但没有办法——两条异构错误路径汇合到一个类型、BoxError 是唯一选项(除非用户自己写 enum FilterError { Pred(PE), Inner(IE) }——但那就让 Filter 的类型签名变复杂、失去了”中间件就该简单”的初衷)。
tower-http 的选择——tower-http 的 AuthorizeLayer、CorsLayer 都不用 BoxError、而是返回具体 error(通常是 StatusCode 或 Response<Body>)。因为 HTTP 场景里错误总能转成 HTTP 响应、不需要 Box 动态类型。这和 tower 的 BoxError 形成对照——tower 协议无关、必须动态;tower-http HTTP 专用、可以静态。这个层次划分体现了抽象层要付什么代价——越通用、类型信息损失越多。
8.7.5 AsyncFilter 的 T: Service<U::Request> + Clone 约束
AsyncFilter 的 Service impl(filter/mod.rs:162-167)比 Filter 多一个约束:
impl<T, U, Request> Service<Request> for AsyncFilter<T, U>
where
U: AsyncPredicate<Request>,
T: Service<U::Request> + Clone, // ← 比 Filter 多 + Clone
T::Error: Into<BoxError>,
T: Clone 是 AsyncFilter 的额外要求——因为 AsyncFilter 的 call 里要做 let inner = self.inner.clone() 的 mem::replace 技巧(§8.3.4)。Filter 不需要这个——因为 Filter 的 predicate 是同步的、call 瞬间完成、不存在”call 进行中有新 call 要用 self.inner”的情况、不需要 clone。
这就出现了一个有意思的不对称:AsyncFilter 只能包 Clone 的 service、而 Filter 可以包非 Clone 的 service。如果你的 inner service 不 Clone(比如是独占资源的数据库连接池)、你只能用同步 Predicate + Filter、不能用 AsyncPredicate + AsyncFilter。
这个约束在实践中不是大问题——因为任何真实生产环境的 service 都会被 Arc<..> 包一下、Arc 是 Clone 的。但它是一个类型系统里刻下的设计决策——“异步处理必须 clone”这个事实被编译期强制、避免用户在运行时撞上”async 场景下 service 无法复用”的问题。
8.8 和 tower-http 的关系
有一个关键的区分:tower 的 Filter / MapRequest 是完全协议无关的——它们只看 Service<Req>,不知道 Req 具体是什么。如果你要做HTTP 专用的变换(比如加 Authorization header、判断方法、解析路径参数),应该用 tower-http crate 里的专用中间件:
| tower-http 中间件 | 作用 |
|---|---|
SetRequestHeaderLayer | 给每个请求加一个固定 header |
SetResponseHeaderLayer | 给每个响应加一个固定 header |
ValidateRequestHeaderLayer | 校验某个 header 必须符合 predicate |
CorsLayer | CORS 中间件 |
TraceLayer | 自动把请求方法、路径、状态码记进 tracing span |
AuthorizeLayer | Bearer/Basic token 鉴权 |
这些中间件底层都是 Tower 的 Filter / MapRequest 组合——只是它们带了 HTTP 语义,实现里能直接操作 http::Request 的 headers / method / uri。在生产代码里优先用 tower-http 的版本,不要手搓——除非你真的需要一个它没有提供的 predicate。
tower-http 和 tower 的分层在这里很清楚:
- tower:协议无关的中间件”原子”。
- tower-http:把原子组合成 HTTP 语义常用套件。
读一遍 tower-http/src/trace/make_span.rs 和 tower-http/src/auth/require_authorization.rs 的源码,你会发现它们基本都是 Filter 或 MapRequest 的 http 特化——语法糖,核心在 tower 里。
8.8.5 Predicate 的关联类型 vs 泛型参数:为什么分成两个名字
回到 §8.3.1 里那个容易混淆的设计——Predicate<Request> 既有泛型参数 Request、又有关联类型 Request。这不是作者懒、而是刻意的二元设计。看一个实例:
impl<F, T, R, E> Predicate<T> for F
where F: FnMut(T) -> Result<R, E>, E: Into<BoxError>,
{
type Request = R;
对同一个函数 F: FnMut(String) -> Result<Vec<u8>, MyErr>:
- trait 泛型参数
T = String——调用者传进来的原始类型。 - 关联类型
Self::Request = R = Vec<u8>——predicate 输出给内层 service 的类型。
为什么不让 predicate 的输入输出都是同一个类型?因为那就退化成 MapRequest 了——不能做类型级别的过滤映射。典型场景:
// 原始请求是 HttpRequest<RawBody>、predicate 校验后转成 HttpRequest<ValidatedJsonBody>
// 传给内层、内层拿到的就是已校验的、不需要再做一次 json 解析
fn auth(req: HttpRequest<RawBody>) -> Result<AuthRequest, BoxError> {
let body = req.into_parts().1;
let json: AuthRequest = serde_json::from_slice(&body).map_err(|e| BoxError::from(e))?;
Ok(json)
}
trait 让 predicate 有变形能力——输入 String、输出 Vec<u8>;输入 HttpRequest、输出 AuthRequest。内层 service 写成 Service<AuthRequest>、它看到的 Request 类型是 predicate 吐出来的那个、不需要和外部调用者传入的一致。
为什么输入要做泛型参数、输出要做关联类型?——因为一个 Predicate 类型可以 impl 多个不同的 input 类型(不同的 Predicate<T1>、Predicate<T2>)、但每个 impl 只能有一个确定的输出。多 impl 需要多个签名、关联类型不能多 impl(每个 impl 只有一个 Self::Request)——所以输入必须作为 trait 的泛型参数(可重复 impl)、输出作为关联类型(impl 一次定死)。
这就是为什么 Predicate<Request> 要拆成两个同名概念——它们是 trait 设计里两种不同语义实体的自然体现。trait 作者明知”Request”和”Request”重名会让读者困惑、但语义上两者都叫 Request 最准确——硬改名(比如叫 OutRequest)反而扭曲了概念。
8.8.6 本章与第 3/4 章、langgraph 第 17 章的呼应
与第 3 章(Layer 与 ServiceBuilder)的呼应——§8.3 讲的 FilterLayer、§8.2.3 讲的 MapRequestLayer 都是 Layer trait 的实现。第 3 章建立了”Layer 是 Service 的构造模板”这个概念、本章给出了最薄的两个 Layer的完整实现——几乎就是 Layer trait 的教学范本。读完本章再回第 3 章、Layer 的抽象就落实到具体代码上。
与第 4 章(Buffer 与 poll_ready 协议)的呼应——§8.5.6 已经明示:AsyncFilter 的 mem::replace 技巧是第 4 章讨论的 readiness 一次性资源问题的唯一正解。这个技术不止 AsyncFilter 用、Buffer、Balance、SpawnReady 内部都用同样模式。第 4 章铺垫了原则、本章示范了应用。
与 LangGraph 第 17 章的呼应——LangGraph 的 Send/Command 机制是”在图上动态选择下一条边”、Tower 的 Steer 是”根据请求动态选择哪个 service”。两者都属于”运行时多态 dispatch”——一个是在图抽象里、一个是在 Service 抽象里。区别是 LangGraph 的 dispatch 产生新的 pending task(异步、有 checkpoint)、Tower 的 dispatch 是同步选择并立即 call(单一 future)。动态 dispatch 的底层思想跨语言跨框架——本章把 Rust 实现摊开、就能更好理解 LangGraph 在 Python 里同样做的事。
8.8.7 blanket impl 的魔法:闭包如何”免费”变成 Predicate / Picker
§8.3.2 和 §8.4.1 两次都用到了一个模式——任何合适签名的闭包自动满足 Predicate / Picker trait。源码里是 blanket impl:
// filter/predicate.rs:55-65
impl<F, T, R, E> Predicate<T> for F
where F: FnMut(T) -> Result<R, E>, E: Into<BoxError>,
{ ... }
// steer/mod.rs:80-87
impl<S, F, Req> Picker<S, Req> for F
where F: Fn(&Req, &[S]) -> usize,
{ ... }
blanket impl 是 Rust 的一个强大特性:对所有满足某些约束的 F、自动实现某个 trait。效果是——用户根本不需要知道 Predicate 或 Picker trait 存在、只要闭包签名对、就能直接塞进 Filter / Steer。
这种 API 设计的深层意图是”zero-ceremony”——用户不用 impl Predicate for MyStruct、不用 derive(Picker)、不用 wrap(closure)、闭包就是 predicate、闭包就是 picker。对比一些早期框架要求”每个 predicate 必须是一个具名 struct”、Tower 的这个设计让prototype 到生产代码的 friction 几乎为零。
但 blanket impl 有个代价:不能再给 Predicate 加冲突的 blanket impl。比如如果 Tower 想再加一个 impl<T> Predicate<T> for Option<T>(“Some 通过、None 拒绝”)——编译会报 coherence error、因为 Option<T> 可能满足 F: FnMut(T) -> Result<R, E>(理论上)、两个 impl 冲突。这叫 孤儿规则和一致性——Rust 对 blanket impl 的一个硬约束。
Tower 把闭包 blanket impl 这个”名额”留给了最常用的形态(fn / closure)、其他形态用户需要自己写 impl Predicate for MyType。这是一个经过权衡的资源分配——把最便利的 impl 给最常用的场景、其他场景付一点样板代码的代价。
8.8.8 fn layer 静态方法 vs FilterLayer 独立类型
Tower 给 Filter 提供了两种”构造 Layer”的方式:
// 方式 ①(filter/mod.rs:72-74)
impl<T, U> Filter<T, U> {
pub fn layer(predicate: U) -> FilterLayer<U> {
FilterLayer::new(predicate)
}
}
// 方式 ②(filter/layer.rs:38-40)
impl<U> FilterLayer<U> {
pub const fn new(predicate: U) -> Self {
Self { predicate }
}
}
两个都返回 FilterLayer<U>、效果一样。为什么要提供两种?
Filter::layer(pred)——读起来像”给我一个 Filter 层”、从”我知道 Filter 这个类型”出发。适合已经在用 Filter API 的用户。FilterLayer::new(pred)——读起来像”构造一个 FilterLayer”、从”我需要一个 Layer 能插到 ServiceBuilder 里”出发。适合在写 ServiceBuilder 链的用户。
两个入口服务两种阅读习惯——API 设计不是非得极简、适度的”多条路通罗马”能让不同心智模型的用户都感觉顺手。Tower 整体上有这种倾向:ServiceExt::filter、Filter::layer、FilterLayer::new 都能到达同一个状态、用户选自己舒服的。
一个微妙点:Filter::layer 不是 const(没法、因为它调 FilterLayer::new 时传参数)、但 FilterLayer::new 是 const。想在 const 上下文里构造、就必须用 FilterLayer::new。这是API 分叉在 const 支持上的表现——看似相同的两个入口、在编译期能力上有细微差异。
8.8.9 pin_project_lite vs pin_project:为什么 Tower 两个都用
读 Tower 的 future 实现会发现代码里有时用 pin_project_lite::pin_project!、有时用 pin_project::pin_project——比如 AsyncResponseFuture 用的是 pin_project_lite。这两个 crate 看着是一回事、实际上有关键差异:
pin_project(full 版)——依赖 proc-macro、支持完整的语法(包括implblock、attribute macros 等)。编译慢一点、但功能全。pin_project_lite(轻量版)——纯 declarative macro(macro_rules!)、不依赖 proc-macro。编译快、但只支持有限的 struct/enum 形态。
Tower 的工程决策是:能用 lite 就用 lite、不行才用 full。原因是 Tower 是一个基础库、被几乎所有 Rust 异步服务依赖——如果 Tower 强制用 proc-macro、所有上游都得在 build 时加载 proc-macro 编译器、整条依赖链的编译时间翻倍。pin_project_lite 的存在就是专为”库作者”设计的、让 pin_project 的语义渗透到不愿付 proc-macro 税的库里。
AsyncResponseFuture 只涉及简单的 enum 投影(State::Check / State::WaitResponse)、pin_project_lite 完全够用。Tower 其他更复杂的 future(比如 BoxFuture 里的某些 adapter)可能要用 full 版。这个选择每次都是基于实际需求做的——不是盲目追求轻量、也不是无脑用 full。
这个细节展示了 Tower 在编译时开销上的精打细算——不只是运行时性能、构建时体验也算工程预算的一部分。
8.10 本章小结
Filter、MapRequest、Steer 是 Tower 中间件里最纯粹的三个——它们不涉及并发、不涉及时序、不涉及背压管理、只做类型和值的变换。但正是这种”极简纯粹”让它们成为 Axum、Tonic 等上层框架路由层的基石。
本章从源码层面揭示了几个容易忽略的设计细节:
#[inline]在薄封装 Service 上的必要性——不开就是性能税。const fn构造函数的不变性承诺——既允许静态初始化、也传递”无副作用”的契约。opaque_future!宏的 API 稳定性——用 newtype 遮蔽具体 future 类型、留出未来演进的余地。blanket impl对闭包的自动适配——让 predicate 和 picker 零样板、但付出”impl 名额被占”的代价。mem::replace+ clone 保 readiness 的经典模式——AsyncFilter 是这个模式在 Tower 里最清晰的教学实例。where约束的条件 impl——Steer 的 Clone、FilterLayer 的 Layer 都只在需要能力时强制对应 trait。PhantomData<Req>的类型幽灵参数——Steer 需要 Req 出现在类型参数里才能 impl Service、但不需要实际存储值。VecDeque而非Vec——Steer 的 not_ready 需要两端 O(1) 操作、直接对应数据结构选型。
这些细节单独看都是小优化、合起来就是 Tower 作为基础库的密度。在第三部分我们离开 Tower、进入 HTTP 数据模型层——http / http-body 两个 crate 定义了 Hyper、Tonic、Axum 等整个 Rust HTTP 生态的”数据语言”。
8.11 落到你键盘上
本章读完:
- 浏览
tower/src/util/下所有 map_ 文件*——map_err.rs、map_response.rs、map_result.rs、then.rs、and_then.rs。一个接一个读,整套”Service 的 combinators”就清楚了。 - 给自己的项目写一个 Filter——一个简单的 Content-Type 检查,或者一个 User-Agent 解析。用
Filter::new+ 闭包,5 行代码解决,你会体会到”纯函数路由层”的干净。 - 读 tower-http 的
TraceLayer源码——它用 MapRequest + MapResponse 的思路构造了一个自动记录 tracing 的中间件,是 tower-http 里最优雅的中间件之一,不到 300 行。
第二部分(Tower 中间件源码实录)到这里结束。下一章开始我们离开 Tower,进入 HTTP 数据模型层——http crate 和 http-body。它们是 Hyper 和整个 Rust HTTP 生态的”数据语言”。