Appearance
第5章 Timeout / Retry / RateLimit:基础中间件的源码剖析
5.1 三个最小单元
读过前四章的铺垫之后,我们终于可以坐下来读一段真实、完整、跑在 Linkerd 代理和 Tonic gRPC 流量上的工业级中间件源码了。
本章挑三个最基础也是最常用的 Tower 中间件:
- Timeout — "给这次调用 30 秒时间,到点没回就放弃。"
- Retry — "失败了再试一次,最多三次,每次间隔不一样。"
- RateLimit — "每秒最多 100 个请求,超了就排队。"
每一个单独看都简单,但它们在 Tower 源码里的实现各自走了不同的路:Timeout 用的是纯"两 future 赛跑"、Retry 用的是"状态机 + Service clone"、RateLimit 用的是"token bucket + 预分配 Sleep"。读完这三段代码,你会掌握 Tower 中间件的全部三种主要实现范式——后面章节里的 Buffer、LoadShed、Balance 都是这三种范式的变奏。
所有的源码引用来自 tower 0.5.3(commit 251296d),读者可以在本地 git checkout 251296d 之后对照验证。
5.2 Timeout:两 future 赛跑
5.2.1 整体结构
Timeout 的全部源码只有 timeout/mod.rs(70 行)+ future.rs(53 行)+ layer.rs(24 行)+ error.rs(22 行)——加起来不到 200 行。拆下来就三件事:
struct Timeout<T>存 inner + duration。ResponseFuture<T>把两个 future 捆在一起。TimeoutLayer是工厂。
rust
// tower/src/timeout/mod.rs:18-22
#[derive(Debug, Clone)]
pub struct Timeout<T> {
inner: T,
timeout: Duration,
}注意 #[derive(Clone)]——这意味着"Timeout 本身不阻止 Clone"。内层 T 能 clone,Timeout 就能 clone。这点重要,它让 Timeout 能被塞到需要 Service: Clone 的场景里(比如 Retry 或 Buffer 的 worker)。
5.2.2 call 只做一件事:构造 future
rust
// tower/src/timeout/mod.rs:64-69
fn call(&mut self, request: Request) -> Self::Future {
let response = self.inner.call(request);
let sleep = tokio::time::sleep(self.timeout);
ResponseFuture::new(response, sleep)
}call 里没有 await。它构造两个独立的 future:一个是业务的(self.inner.call(request)——这也是 future,不 await),一个是 tokio 的 sleep。然后把它们装进 ResponseFuture——这就是返回值。
这是 Tower 中间件最典型的惯用法——call 是"future 工厂",不做实际工作。所有真正的 await 都发生在返回的 future 里。原因是 call 的签名 &mut self 会对"在 call 里 await"造成严重束缚(第 2 章讨论过)。把 await 延迟到返回 future 里,&mut self 的借用在 call 返回的瞬间结束,后续调度完全自由。
5.2.3 ResponseFuture::poll:顺序决定语义
rust
// tower/src/timeout/future.rs:38-52
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
// First, try polling the future
match this.response.poll(cx) {
Poll::Ready(v) => return Poll::Ready(v.map_err(Into::into)),
Poll::Pending => {}
}
// Now check the sleep
match this.sleep.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(Err(Elapsed(()).into())),
}
}十五行代码,全部加起来。但每一行都值得琢磨。
先 poll response,再 poll sleep——顺序反过来就错了。为什么?如果先 poll sleep 返回 Ready(时间到了),那不管 response 是不是也恰好在这一纳秒完成,都直接报超时——业务成功的结果会被白白丢掉。先 poll response,给成功的路径最后一次机会。响应已经到了的话,直接 Ready 出去,sleep 变成"永远不会完成但也不会再被 poll"的 future 挂在那里,随 ResponseFuture 一起 drop。
map_err(Into::into):内层 error 被转换成 BoxError(Box<dyn Error + Send + Sync + 'static>)。这是因为 Timeout 要和自己产生的 Elapsed 错误合成一个统一的错误类型——整个中间件对外承诺 type Error = BoxError。上一章讲 Layer 组合时提过这个权衡:错误类型擦除是中间件组合性的必要代价。
pin_project! 的作用:#[pin] response、#[pin] sleep 两个字段在 self.project() 里被安全地拿出 Pin<&mut T>。这是因为 tokio::time::Sleep 是 !Unpin 的(它内部持有一个 timer registration 的指针,地址必须稳定),不能被安全地移动。pin_project_lite 宏帮你生成正确的投影代码,让你以声明式的方式描述"哪些字段要 pin"。
这个机制的细节在卷三《Rust 编译器与运行时揭秘》第 10 章(Pin / Waker / Future)里讲透过。如果你没读过,现在的直觉是:pin_project! 是 Pin 系统的"声明式模板",你告诉它"这些字段需要被 pin",它帮你生成 unsafe 代码保证安全。
5.2.4 为什么没有"Timer cancel"?
老手可能会问:tokio Sleep 是不是一个会注册到 Time Driver 全局时间轮的定时器?Timeout 如果业务先完成了,sleep 有没有被 cancel?
答案:sleep 的 cancel 是隐式的——ResponseFuture 被 drop 时,sleep 字段被 drop,tokio 的 Sleep::drop 会把自己从时间轮上摘掉。你不需要手动调用 sleep.cancel()、sleep.stop() 之类的 API。
这又是 Rust 的 RAII 在起作用——资源清理是"通过析构自然发生的"。Tower 的代码里看不到任何显式的 cancel/release——但每一次 ResponseFuture 的 drop(包括因为超时丢弃、因为调用方放弃、因为父任务被取消),都会把 sleep 从 tokio Time Driver 上摘掉。卷四《Tokio 源码深度解析》第 11 章(Time Driver 与分层定时器轮)里讲过这个机制——定时器轮里每个 entry 是一个侵入式链表节点,drop 触发 unlink。
5.2.5 TimeoutLayer:把上面这堆包成一个 Layer
rust
// tower/src/timeout/layer.rs 精简版
#[derive(Debug, Clone, Copy)]
pub struct TimeoutLayer {
timeout: Duration,
}
impl TimeoutLayer {
pub const fn new(timeout: Duration) -> Self { TimeoutLayer { timeout } }
}
impl<S> Layer<S> for TimeoutLayer {
type Service = Timeout<S>;
fn layer(&self, service: S) -> Self::Service {
Timeout::new(service, self.timeout)
}
}TimeoutLayer 是 Copy——Duration 和 TimeoutLayer 都是 POD-like 结构,没有任何堆分配。这让它能在 ServiceBuilder 里自由移动、多次 clone,不产生任何运行时开销。
Timeout 的故事到这里完结。你如果把 timeout/ 目录下的四个文件连起来读一遍,不到 200 行代码构成一个生产级、零分配、正确处理取消语义的工业级超时中间件。这就是 Tower 的品味——每一行都精简到不能再少。
5.3 Retry:状态机 + 请求克隆
Retry 比 Timeout 复杂得多。因为"重试"这件事本质上包含三种状态:
- Called:正在跑本次请求。
- Waiting:请求失败了,按 policy 等 N 毫秒再重试。
- Retrying:等够时间了,等 inner service 再次就绪,然后重新 call。
这是一个清晰的状态机,Tower 把它编码在一个 enum State:
rust
// tower/src/retry/future.rs:27-43
pin_project! {
#[project = StateProj]
#[derive(Debug)]
enum State<F, P> {
Called { #[pin] future: F },
Waiting { #[pin] waiting: P },
Retrying,
}
}三个变体分别对应三种状态:Called 持有本次 call 的 future、Waiting 持有 policy 返回的等待 future、Retrying 没字段(是瞬时状态,只负责做一次 poll_ready 然后再切回 Called)。
5.3.1 Policy trait:策略即接口
rust
// tower/src/retry/policy.rs:46-90
pub trait Policy<Req, Res, E> {
type Future: Future<Output = ()>;
fn retry(&mut self, req: &mut Req, result: &mut Result<Res, E>)
-> Option<Self::Future>;
fn clone_request(&mut self, req: &Req) -> Option<Req>;
}两个方法定义了 Retry 的所有策略空间:
retry:拿到这次的 req 和结果,决定"要不要再试"。返回None表示"到此为止"、返回Some(fut)表示"等这个 future 完成之后重试"。clone_request:给 Retry 一个克隆请求的方式。有些请求能克隆(比如只读 GET),有些不能(比如流式上传),返回 None 表示"没法克隆"——Retry 就退化成"跑一次就返"。
第一个方法拿的是 &mut Req 和 &mut Result——意味着 Policy 可以修改请求或结果。文档里举了例子:每次重试时给请求加一个 X-Retry-Count: 1 头(修改 req),或者在最后一次失败时把错误类型换成 RetriesExhausted(修改 result)。这种"可变访问"比只读更强大,是一个有意的设计。
5.3.2 为什么 Retry 需要 S: Clone
rust
// tower/src/retry/mod.rs:72-76
impl<P, S, Request> Service<Request> for Retry<P, S>
where
P: Policy<Request, S::Response, S::Error> + Clone,
S: Service<Request> + Clone,
{S: Clone——inner service 必须能克隆。为什么?看 ResponseFuture 的字段:
rust
// tower/src/retry/future.rs:13-24
pub struct ResponseFuture<P, S, Request>
where P: Policy<Request, S::Response, S::Error>,
S: Service<Request>,
{
request: Option<Request>,
#[pin] retry: Retry<P, S>, // ← 整个 Retry 被存进 future
#[pin] state: State<S::Future, P::Future>,
}ResponseFuture 里存了一份完整的 Retry<P, S> 本身——也就是一份 S。为什么?因为重试的时候需要对 service poll_ready 再 call 一次——调用方原来持有的 &mut Retry 在 call 返回后已经放开了,future 内部必须自己持有 service 才能后续再调用。
这就是 Retry 需要 S: Clone 的原因:每次 call 都要克隆一份 service 存进 future。如果 S 本身太大(比如持有大量配置),clone 成本会累计。Tower 对此的建议是在 Retry 外面再套一层 Buffer——Buffer 本身是 Clone(内部是 Arc+mpsc),把重的 Service 藏在 Arc 后面共享。
Retry::call 的实现完美对应这个思路:
rust
// tower/src/retry/mod.rs:88-93
fn call(&mut self, request: Request) -> Self::Future {
let cloned = self.policy.clone_request(&request);
let future = self.service.call(request);
ResponseFuture::new(cloned, self.clone(), future)
}先 policy.clone_request(&request) 尝试克隆请求(可能失败)、self.service.call(request) 启动第一次调用(使用原始请求)、然后构造 ResponseFuture(cloned_request, self.clone(), first_future)——整个 self(包括 policy 和 service)被 clone 进 future。
5.3.3 状态机:ResponseFuture::poll
这是 Retry 最关键的 30 行:
rust
// tower/src/retry/future.rs:70-118 精简版
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
match this.state.as_mut().project() {
StateProj::Called { future } => {
let mut result = ready!(future.poll(cx));
if let Some(req) = &mut this.request {
match this.retry.policy.retry(req, &mut result) {
Some(waiting) => this.state.set(State::Waiting { waiting }),
None => return Poll::Ready(result),
}
} else {
return Poll::Ready(result); // 请求没法 clone,不重试
}
}
StateProj::Waiting { waiting } => {
ready!(waiting.poll(cx));
this.state.set(State::Retrying);
}
StateProj::Retrying => {
ready!(this.retry.as_mut().project().service.poll_ready(cx))?;
let req = this.request.take()
.expect("retrying requires cloned request");
*this.request = this.retry.policy.clone_request(&req);
this.state.set(State::Called {
future: this.retry.as_mut().project().service.call(req),
});
}
}
}
}一个 loop 包着一个 match——这是 Rust 状态机 future 的标准结构。每次外层 poll 被调用时:
- Called:poll 当前 future。如果还没完成(
ready!返回Pending),直接返回 Pending;完成了,拿到 result,问 policy 要不要重试。 - Waiting:poll 等待 future。完成之后切到 Retrying。
- Retrying:先
poll_ready(拿许可),然后拿出保存的request,再克隆一份(因为重试后可能还需要再次重试,所以留一份在 request 字段),call(req)启动新一次请求,切回 Called。
这个状态机最精巧的地方是:整个重试链在同一个 ResponseFuture 的生命周期里被驱动。调用方只看到一个 async fn retry_service.call(req).await——背后可能发生了十次网络失败、等待、重试,全部封闭在 future 内部。
5.3.4 poll_ready 的尴尬假设
Retry::poll_ready 的实现是:
rust
// tower/src/retry/mod.rs:81-86
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// NOTE: the Future::poll impl for ResponseFuture assumes that
// Retry::poll_ready is equivalent to Ready.service.poll_ready. ...
self.service.poll_ready(cx)
}直接透传到 inner。看起来合理,但注释里藏了一个微妙问题:
the Future::poll impl for ResponseFuture assumes that Retry::poll_ready is equivalent to Ready.service.poll_ready.
翻译:ResponseFuture::poll 在 Retrying 状态里,直接 poll 了 service 的 poll_ready(第 106 行 this.retry.as_mut().project().service.poll_ready(cx))——而不是 poll 了整个 Retry::poll_ready。之所以这样写,是因为 Retry::poll_ready(&mut self) 要求 self: &mut Retry,而在 Future::poll 里你只有 Pin<&mut ResponseFuture>,想拿到 &mut Retry 需要 Retry: Unpin——但 Retry 被 #[pin] 标注了。
解决办法是"绕过 Retry 的 poll_ready,直接 poll 底层 service 的 poll_ready"——这在语义上正确当且仅当 Retry::poll_ready 是 self.service.poll_ready 的透传。注释里这个 NOTE 是在告诉未来的维护者:"如果你改 Retry::poll_ready 的行为,要记得同步改 ResponseFuture::poll"。
这是一个工程细节——当你跨越 Pin 做状态机时,常常需要在某些方法上做"绕过"。Tower 的代码里坦白记录了假设前提,让 future 的维护者不会踩坑。写自己的 !Unpin future 时,这是值得借鉴的文档习惯。
5.4 RateLimit:token bucket + 预分配 Sleep
RateLimit 又是另一种实现范式。它要做的事情听起来很简单:"每秒最多 100 次"——但真要写对、写快、不抖动,需要一些技巧。
5.4.1 Rate 的定义
rust
// tower/src/limit/rate/rate.rs:1-29
#[derive(Debug, Copy, Clone)]
pub struct Rate {
num: u64,
per: Duration,
}
impl Rate {
pub const fn new(num: u64, per: Duration) -> Self {
assert!(num > 0);
assert!(per.as_nanos() > 0);
Rate { num, per }
}
pub fn num(&self) -> u64 { self.num }
pub fn per(&self) -> Duration { self.per }
}Rate 就是"N 个请求 / duration"。没有 "per second" 这种硬编码——你可以 Rate::new(100, Duration::from_secs(1))(100/s)也可以 Rate::new(5, Duration::from_millis(100))(50/s 但 burst 更紧)。这个分离是有意识的——有些场景需要平滑节流(小周期),有些场景接受突发(大周期)。
5.4.2 状态机:Ready vs Limited
rust
// tower/src/limit/rate/service.rs:13-25
pub struct RateLimit<T> {
inner: T,
rate: Rate,
state: State,
sleep: Pin<Box<Sleep>>,
}
enum State {
Limited,
Ready { until: Instant, rem: u64 },
}两个状态:
Ready { until, rem }:当前周期到until,还剩rem个许可。Limited:当前周期许可用完,正在等下一个周期开始。
注意那个 sleep: Pin<Box<Sleep>> 字段——它在构造时就被预分配好:
rust
// tower/src/limit/rate/service.rs:29-45
pub fn new(inner: T, rate: Rate) -> Self {
let until = Instant::now();
let state = State::Ready { until, rem: rate.num() };
RateLimit {
inner, rate, state,
// The sleep won't actually be used with this duration, but
// we create it eagerly so that we can reset it in place rather than
// `Box::pin`ning a new `Sleep` every time we need one.
sleep: Box::pin(tokio::time::sleep_until(until)),
}
}注释解释了一个性能优化:Sleep 是 !Unpin 的,每次用都得 Box::pin 一个新的——那会频繁分配堆内存。作者选择在构造时预分配一个,后面需要重置时调用 Sleep::reset(new_deadline) 原地改截止时间,avoiding allocation hot path。
tokio Sleep::reset 的实现是 O(1) 的——它会把当前的 timer entry 从时间轮里摘下来、改 deadline、重新插入。这比 Box::pin(new_sleep) 快得多(不经过堆分配器,也不经过时间轮的创建开销)。这种"预分配 + 原地重置"是 hot-path 中间件的通用技巧。
5.4.3 poll_ready 的逻辑
rust
// tower/src/limit/rate/service.rs:71-88
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.state {
State::Ready { .. } => return self.inner.poll_ready(cx),
State::Limited => {
if Pin::new(&mut self.sleep).poll(cx).is_pending() {
tracing::trace!("rate limit exceeded; sleeping.");
return Poll::Pending;
}
}
}
// 走到这里意味着: 要么本来就 Ready,要么 Limited 状态里 sleep 到期了
self.state = State::Ready {
until: Instant::now() + self.rate.per(),
rem: self.rate.num(),
};
self.inner.poll_ready(cx)
}逻辑:
- Ready 状态:直接透传到 inner。不减 remainder——减 remainder 发生在
call里。 - Limited 状态:poll 那个预分配的 sleep;没到时间返回 Pending。到时间了,周期开始——重置 state 为 Ready,rem 重新变为 rate.num(),然后继续 poll inner。
注意这里有一个小缺陷:如果 Limited 状态下 sleep 刚好到期,此时会立刻进入新周期——但新周期的"起点"被设成 Instant::now(),不是原来的 until。这意味着如果你从 Limited 走出来的时刻是上一周期结束后 500ms(tokio scheduling 有延迟),新周期的 500ms 损失不会补回——rate 会稍稍"漏"一点。这在实践中基本不可感知,但理论上 rate limiter 不是数学严格的 N/duration。
5.4.4 call 的逻辑
rust
// tower/src/limit/rate/service.rs:90-117
fn call(&mut self, request: Request) -> Self::Future {
match self.state {
State::Ready { mut until, mut rem } => {
let now = Instant::now();
// 周期过期了?开新周期
if now >= until {
until = now + self.rate.per();
rem = self.rate.num();
}
if rem > 1 {
rem -= 1;
self.state = State::Ready { until, rem };
} else {
// 用完这个许可,下一次 call 前必须等 sleep
self.sleep.as_mut().reset(until);
self.state = State::Limited;
}
self.inner.call(request)
}
State::Limited => panic!("service not ready; poll_ready must be called first"),
}
}几个小细节:
if now >= until:这一段是"周期自动续"——如果poll_ready后很久才调用call,原周期已经过去,直接开一个新周期。这对"偶发流量"场景友好——偶尔来一个请求永远不会被 rate limit 误伤。if rem > 1:注意是>1,不是>0。这一次 call 消费掉的就是最后一个,状态切到Limited。如果写成>0,就会先减到 0 再在下一次 call 时发现没许可 panic——逻辑上等价但边界比较诡异,作者选择了更清晰的"发现快用完就提前切态"。panic!("service not ready; poll_ready must be called first"):协议违反的标准处理——call在Limited状态下永远不应该被触达(poll_ready会挂起调用方),如果真的发生,说明上游代码没按协议走。
5.4.5 RateLimit vs ConcurrencyLimit:概念边界
两个名字长得像,实际上是完全不同的约束。
| ConcurrencyLimit | RateLimit | |
|---|---|---|
| 关心的维度 | 同时 in-flight 请求数 | 单位时间请求数 |
| 典型用途 | 保护 CPU/内存资源 | 保护依赖的下游(API quota、第三方限制) |
| 背压机制 | Semaphore 许可 | 周期计数 + Sleep |
| 慢请求影响 | 占住许可直到完成 | 不影响(只看到达速率) |
| 快请求影响 | 可以并发 N 个 | 可以突发一整个 rate.num |
实操建议:两个一起用。.concurrency_limit(10).rate_limit(100, Duration::from_secs(1)) 意味着"每秒最多 100 个新请求、同时 in-flight 不超过 10 个"。前者保护下游不被速率打爆,后者保护自己不被慢请求堆爆。
5.5 三种范式的总结
我们刚刚读的三个中间件其实对应三种不同的实现范式:
| 中间件 | 范式 | 关键技术 |
|---|---|---|
| Timeout | 双 future 赛跑 | pin_project + 顺序 poll |
| Retry | 状态机 + service clone | enum State + Loop+Match + S: Clone |
| RateLimit | 状态机 + 预分配 timer | Box::pin once + Sleep::reset |
Tower 里后面出现的中间件几乎都能归到这三种:
- Buffer:状态机 + mpsc(类似 Retry 范式)
- LoadShed:立即决定(Timeout 范式的退化版,只 poll 一次)
- ConcurrencyLimit:双阶段 Permit(是 RateLimit 的简化——没有时间维度)
- Balance:多 service + ready cache(Retry 范式的扩展)
当你给自己的代码写 Tower 中间件时,第一步不是"抄一个 Timeout 的模板"——而是先想**"我要做的事属于哪一种范式"**。范式决定了你的 type Future 要不要做状态机、要不要 clone inner、要不要持有 timer。
5.6 和卷四《Tokio》的连接
这三个中间件所有"异步等待"的底层能力都来自 tokio。Timeout 用 tokio::time::sleep、RateLimit 用 tokio::time::sleep_until——两者都建立在卷四《Tokio 源码深度解析》第 11 章讲的分层定时器轮上。
rust
tokio::time::sleep(Duration::from_secs(30))
↓
// tokio::time::Sleep::new_timeout
↓
// 向 Time Driver 注册一个 deadline
↓
// Time Driver 以 "hierarchical timing wheel" 组织所有定时器
↓
// Waker 到期触发,挂起的 task 被唤醒理解了卷四讲的时间轮,你就知道为什么 tokio::time::sleep(Duration::from_millis(1)) 和 tokio::time::sleep(Duration::from_secs(3600)) 的开销几乎一样——注册代价是 O(1)、到期检查是 O(wheel_slots)。这就是为什么 RateLimit 即使设的是 Rate::new(1000, Duration::from_millis(1))(1M QPS)性能也不会崩——每个 Sleep 的注册只是把一个 entry 挂到时间轮的某个 bucket 上。
Retry 的 policy 返回的 Future 也可以利用 tokio time——一个典型的 "exponential backoff" policy 就是每次失败返回一个越来越长的 tokio::time::sleep。Tower 提供了一个辅助 retry::backoff 模块,里面就是这套模式的标准实现。
5.7 落到你键盘上:写一个自己的组合
给一个实战场景:你在写一个对第三方 OCR API 的调用层。约束是:
- 这个 API 最多 5 QPS(第三方限制);
- 失败(包括 429/500/504)要重试最多 3 次,每次间隔翻倍;
- 每次调用最多等 10 秒,超了就放弃;
- 全链路最多同时跑 20 个任务(保护自己的内存)。
正确的 Tower 栈:
rust
use std::time::Duration;
use tower::ServiceBuilder;
let ocr_service = ServiceBuilder::new()
.concurrency_limit(20) // 最外层:保护自己
.timeout(Duration::from_secs(10)) // 每次调用超时
.layer(RetryLayer::new(ExponentialPolicy::new( // 重试 + backoff
3,
Duration::from_millis(200),
)))
.rate_limit(5, Duration::from_secs(1)) // 最内层:对齐第三方限流
.service(ocr_client);这一串是有顺序的——外层先处理:先过 concurrency_limit 限总并发、再应用 timeout(整条链的 deadline)、再进 retry(可能重试多次)、最后 rate_limit(保证每次去打第三方都不超 5/s)。
几个细节值得品:
- Rate_limit 放在最内层。放外层的话,rate_limit 把请求"吃进"后 timeout 就开始计时——但请求还得等 rate_limit 的 sleep,可能等完 sleep 就超时了。放内层让 timeout 包住"重试 + 限流"整个 window。
- Retry 在 timeout 里面。第一次失败后等 200ms、第二次失败后等 400ms——这些等待时间都算在外层 timeout 里。超过 10 秒直接整条放弃,不会卡在第 3 次重试的长 backoff 里。
- Concurrency_limit 放最外层。它是"系统容量"保护——不管业务做得多复杂,总并发超过 20 就挂起后续调用方。
这种"正确的组合顺序"是 Tower 工程的精髓——每一层放在哪里,决定了"超时从什么时候开始算"、"重试是否穿越其他中间件"、"背压信号是在哪一层被吸收"。没有绝对的最优解,但有跟工程目的匹配的合理组合。
5.8 小结
这一章把三个最常用的 Tower 中间件读到底:
- Timeout——双 future 赛跑,全部源码 < 200 行。
- Retry——状态机 + Service clone,Policy trait 抽象了策略空间。
- RateLimit——token bucket + 预分配 Sleep,减少 hot-path 分配。
每一个都是一种可借鉴的中间件实现范式。
落到你键盘上:
- 打开
tower/src/retry/backoff.rs读 Budget 策略。这是 Linkerd 生产环境用的 retry budget 算法,比朴素 ExponentialBackoff 复杂得多,但能防止重试风暴失控——真实世界的 retry 几乎都要和 budget 配合。 - 实验 rate_limit vs concurrency_limit 的组合。在本地起一个 mock 下游服务(固定延迟 200ms),front 挂上
ServiceBuilder::new().rate_limit(5,1s).concurrency_limit(10).service(...)vs 反过来的.concurrency_limit(10).rate_limit(5,1s).service(...)——观察它们在高并发 / 低并发 / burst 情况下的差异。 - 读 tokio 的 Sleep::reset 实现(
tokio-1.46/tokio/src/time/driver/sleep.rs)。只有几十行,能看到它怎么把 entry 从时间轮摘下来、改 deadline、插回去。
下一章我们读 Buffer 和 LoadShed——它们代表 Tower 处理"过载"的两种截然不同的哲学:排队 vs 立即拒绝。