Hyper 与 Tower:工业级 HTTP 栈

第6章 Buffer / LoadShed / ConcurrencyLimit:容量工程

作者 杨艺韬 · 10,769 字

第6章 Buffer / LoadShed / ConcurrencyLimit:容量工程

6.1 当服务饱和时,三种活法

上一章我们把三个”按规则节制”的中间件读完了——Timeout、Retry、RateLimit。这一章要讨论的是另一组问题:

当服务真的满了,你选择怎么办?

这是分布式系统里最古老也最哲学的问题之一。直觉上大多数人会说”限流呗”,但真正坐下来审视细节,会发现”服务满了”可以用截然不同的方式应对:

  • 排队等(Buffer)——“我先把请求记下来,等服务有空时依次处理。”
  • 直接拒(LoadShed)——“现在忙不过来,请你 502 回去吧,等会再试。”
  • 挂起等(ConcurrencyLimit)——“别急,你的请求先挂着,轮到你的时候我再接。”

三种策略对应三种真实场景。不同场景下选错策略的代价可能非常昂贵:排队可能导致雪崩拒绝可能丢掉容易处理的请求挂起可能让上游大量 TCP 连接堆积

Tower 把这三种策略都实现成了中间件:BufferLoadShedConcurrencyLimit。这一章我们把前两个读透,并和上一章已经读过的 ConcurrencyLimit(第 4 章也讲过它的 permit 模型)合在一起对比。三个加起来不到 400 行代码,但它们是整个 Rust 生产环境容量工程的基石。

源码引用来自 tower 0.5.3 (tower/src/buffer/tower/src/load_shed/)。

6.2 Buffer:把 mpsc 和 oneshot 缝起来

Buffer 的核心观察是:tokio 的 mpsc::channel(bound) 本身就是一个背压感知的队列。消费端处理不过来时,生产端 send() 会返回 Pending——调用方可以在 task 层面自然挂起。Tower 的 Buffer 就是”把 mpsc 通道嫁接到 Service 接口上”。

6.2.1 两半结构

// tower/src/buffer/service.rs:18-22
pub struct Buffer<Req, F> {
    tx: PollSender<Message<Req, F>>,
    handle: Handle,
}

Buffer 本身不持有被包裹的 Service——它只持有一个 channel 的发送端和一个 error handle。真正的 Service 活在另一个地方:Worker<T, Request>,一个独立的 tokio task。

这是 Buffer 最核心的架构选择:把 Service 消费和请求分发拆成两个独立的 async 实体

         ┌──────────────┐            ┌──────────┐
         │ Buffer       │            │ Worker   │
caller ──▶│ (Clone)     │──mpsc──▶  │          │──call──▶ inner Service
         │ (Clone)     │            │          │
         └──────────────┘            └──────────┘
  • 调用方(左侧)拿一个 Buffer 实例,调用 poll_ready + call
  • poll_ready 会对 mpsc 的 sender 做 poll_reserve——预留一个槽位。
  • call 构造一个 Message { request, span, tx_oneshot },通过 mpsc 送给 worker。返回值是一个等 rx_oneshot 的 future。
  • Worker(右侧)在自己的 task 里 poll_recv 收请求,对 inner service 做 poll_ready + call,把 inner 的 response future 通过 oneshot 送回。

这是一个经典的 actor 模式——worker 是 actor,Buffer 是”client handle”。调用方只通过 handle 送消息,内部 actor 串行处理。

6.2.2 为什么 Buffer 能让任何 Service 变 Clone

// tower/src/buffer/service.rs:133-144
impl<Req, F> Clone for Buffer<Req, F>
where Req: Send + 'static, F: Send + 'static,
{
    fn clone(&self) -> Self {
        Self {
            handle: self.handle.clone(),
            tx: self.tx.clone(),
        }
    }
}

Buffer<Req, F> 可以 Clone——不管底下的 Service 能不能 Clone。这在 Retry(需要 S: Clone)、Balance(需要给每个 endpoint 一个 clone)等场景里至关重要。

工作原理:clone 的只是 PollSenderHandle——mpsc 的 sender 和错误句柄都是廉价可 clone 的引用。多个 Buffer clone 共享同一个底层 worker,请求通过 channel 汇聚到 worker 串行处理。

在第 5 章我们读 Retry 源码时已经提到过:如果 inner service 自身不 Clone(比如 AuthClient,内部有大型配置),可以套一层 Buffer:

let svc = ServiceBuilder::new()
    .layer(RetryLayer::new(policy))
    .buffer(100)              // ← 在这里 Clone 藏进 Arc+mpsc
    .service(heavy_auth_client);

Retry 只看到”一个可 Clone 的 Buffer”——实际上底下 heavy_auth_client 只有一份,所有 Retry 的 clone 都通过 channel 排队进同一个 worker。

6.2.3 poll_ready:向 channel 要槽位

// tower/src/buffer/service.rs:97-108
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    // First, check if the worker is still alive.
    if self.tx.is_closed() {
        return Poll::Ready(Err(self.get_worker_error()));
    }

    // Poll the sender to acquire a permit.
    self.tx
        .poll_reserve(cx)
        .map_err(|_| self.get_worker_error())
}

PollSender::poll_reserve(cx) 是 tokio-util 提供的一个 API,把 tokio 的 mpsc::Sender::reserve() 异步能力包装成 poll_fn 形式。语义是”异步地为这一 Buffer 客户端预留一个 mpsc 槽位”。

Buffer 的 poll_ready 做两件事:

  1. 检查 worker 是否还活着(channel 是否关闭)。如果 worker 已经挂掉(可能因为底层 Service 报错),直接返回”我坏了”的错误。
  2. poll_reserve 一个 channel 槽位。如果 mpsc 已经满(达到 bound),返回 Pending——调用方被挂起,等有槽位再被 waker 唤醒。

这一步完美接上了 Buffer 的背压语义:当 worker 来不及处理、channel 里积累到 bound 个未处理消息时,新的 poll_ready 会 Pending——上游的 Tower 栈可以根据这个信号决定是继续挂起还是做别的决定。

6.2.4 call:送消息 + 等响应

// tower/src/buffer/service.rs:110-131
fn call(&mut self, request: Req) -> Self::Future {
    let span = tracing::Span::current();
    let (tx, rx) = oneshot::channel();

    match self.tx.send_item(Message { request, span, tx }) {
        Ok(_) => ResponseFuture::new(rx),
        Err(_) => {
            tracing::trace!("buffer channel closed");
            ResponseFuture::failed(self.get_worker_error())
        }
    }
}

几个细节:

  • tracing::Span::current():把当前 tracing span 捎带到 worker。不这么做的话,worker task 里的 tracing 事件不会关联到发起请求的 span。这是一个跨 task tracing 的惯用技巧——span 是 Clone 的,传入 message 里,worker 收到后 _guard = msg.span.enter() 进入这个 span。
  • (tx, rx) = oneshot::channel():每个请求对应一个一次性 channel。worker 把 inner service 返回的 response future 通过 tx 送回,调用方在 rx 上等。
  • send_item:这里已经通过 poll_reserve 拿到了槽位,所以 send 是同步成功的——不可能阻塞,不可能 Pending。协议保证 call 前必走 poll_readypoll_ready 拿到槽位后一直到 call 都是”已预留”状态。

6.2.5 Worker:真正的循环

// tower/src/buffer/worker.rs:147-206 精简
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    if self.finish { return Poll::Ready(()); }
    loop {
        match ready!(self.poll_next_msg(cx)) {
            Some((msg, _first)) => {
                let _guard = msg.span.enter();
                if let Some(ref failed) = self.failed {
                    let _ = msg.tx.send(Err(failed.clone()));
                    continue;
                }
                match self.service.poll_ready(cx) {
                    Poll::Ready(Ok(())) => {
                        let response = self.service.call(msg.request);
                        let _ = msg.tx.send(Ok(response));
                    }
                    Poll::Pending => {
                        // 把消息放回去,挂起 worker
                        drop(_guard);
                        self.current_message = Some(msg);
                        return Poll::Pending;
                    }
                    Poll::Ready(Err(e)) => {
                        let error = e.into();
                        drop(_guard);
                        self.failed(error);
                        let _ = msg.tx.send(Err(self.failed.as_ref().unwrap().clone()));
                    }
                }
            }
            None => {
                self.finish = true;
                return Poll::Ready(());
            }
        }
    }
}

Worker 的 Future::poll 本身是一个 loop——每次 poll 尽可能消费所有待处理消息,直到要么所有消息处理完、要么 service 不 ready。

核心三段式:

  1. 拿下一个消息poll_next_msg)。如果 channel 关了(所有 Buffer 实例都 drop 了),返回 None。
  2. 检查 service 状态:如果已经 failed,直接把错误送回 oneshot。
  3. poll_ready + call
    • Ready(Ok):调用 inner.call(req),把 response future 通过 oneshot 送回。注意!这里送回的是 future,不是 response——真正的 await 发生在调用方的 ResponseFuture
    • Pending把消息放回 self.current_message,return Pending。下次 worker 被 wake 时再尝试。
    • Ready(Err):service 永久坏了。走 failed() 流程——关闭 channel、把错误广播给所有还在等的 oneshot。

current_message 字段非常巧妙——它解决了”已经从 channel 取出但 service 还没就绪”的尴尬。不能把消息再送回 channel(顺序就乱了),必须缓存在 worker 里等 service 就绪再 call。

6.2.6 cancel 的三种路径

Buffer 支持三种请求取消:

  1. 调用方 drop 了 ResponseFuture(还没送到 worker 就不想要了)。worker 下次 poll_next_msg 时会检查 msg.tx.is_closed()——如果调用方已经 drop 了 rx,oneshot 的 tx 会变成 closed,worker 把这条消息直接跳过:
// worker.rs:81-91
if !msg.tx.is_closed() {
    return Poll::Ready(Some((msg, false)));
}
tracing::trace!("dropping cancelled buffered request");
  1. Worker 报错:调用 self.failed(error),后续所有消息收到错误。
  2. 所有 Buffer clone drop:channel 关闭,worker 收到 None,正常结束。

这三种路径不需要额外的”cancel 信号”通道——Rust 的 drop 语义和 oneshot/mpsc 的 closed 检测完整覆盖。

6.2.7 Buffer 的 bound 该设多少

/// A note on choosing a `bound`
///
/// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a
/// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive,
/// this reserved slot may be held up for a long time.

这段文档里的警告值得强调:poll_ready 返回 Ready 后,那个槽位就被”预留”了。如果调用方 poll_ready 后没有立刻 call(比如被 cancel 了),那个槽位还占着——直到 PollSender 的 drop 释放。

实战推论:bound 要 >= 预期的并发 poll_ready 调用方数量。如果你有 200 个并发 task 都在 buffer_svc.ready().await,bound 设成 100,会有 100 个 task 被堵在 poll_ready 的 Pending 上——正确,但不是你想要的”先全部 ready,然后慢慢处理”的效果。

一般经验:bound >= 2 × 预期峰值并发数。留出冗余给那些”ready 了但还没 call”的短暂态。

6.3 LoadShed:饱和时立即拒绝

Buffer 的哲学是”排队”,LoadShed 的哲学刚好相反——“满了就拒”。整段源码加起来不到 150 行。

6.3.1 整体结构

// tower/src/load_shed/mod.rs:17-20
#[derive(Debug)]
pub struct LoadShed<S> {
    inner: S,
    is_ready: bool,
}

两个字段:inner + 一个 bool。就这么多。

is_ready 是一个缓存——它记录”上一次 inner.poll_ready 返回 Ready 了吗”。这个细节让 LoadShed 的整个哲学可行。

6.3.2 颠覆性的 poll_ready

// tower/src/load_shed/mod.rs:43-54
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    // We check for readiness here, so that we can know in `call` if
    // the inner service is overloaded or not.
    self.is_ready = match self.inner.poll_ready(cx) {
        Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
        r => r.is_ready(),
    };

    // But we always report Ready, so that layers above don't wait until
    // the inner service is ready (the entire point of this layer!)
    Poll::Ready(Ok(()))
}

看这一行代码:Poll::Ready(Ok(())) —— LoadShed 的 poll_ready 永远返回 Ready

这是 LoadShed 的灵魂:它违反了 Tower 协议的精神(poll_ready Ready 应该代表”真的能接”),但这个违反是有意的——LoadShed 的存在就是为了绕过背压,让上层调用方永远不挂起。

具体流程:

  1. 内部先 poll 一次 inner,把结果存在 self.is_ready 里。
  2. 无论 inner 是 Ready 还是 Pending,都向调用方报告 Ready。
  3. 如果 inner 报错,这种”不可恢复”的信号还是要传上去——否则调用方会以为还能用。

6.3.3 call:真相的时刻

// tower/src/load_shed/mod.rs:56-64
fn call(&mut self, req: Req) -> Self::Future {
    if self.is_ready {
        // readiness only counts once, you need to check again!
        self.is_ready = false;
        ResponseFuture::called(self.inner.call(req))
    } else {
        ResponseFuture::overloaded()
    }
}

到了 call,LoadShed 才查看上一次 poll_ready 缓存的结果:

  • is_ready == true:inner 那时是 ready 的——转交请求。注意 self.is_ready = false——一次许可只能用一次,下次 call 必须再 poll_ready。
  • is_ready == false:inner 那时是 pending 的——直接构造一个”已失败”的 future,里面装着 Overloaded 错误。调用方 await 这个 future,立即拿到 Err(Overloaded)

overloaded future”的实现:

// tower/src/load_shed/future.rs:22-31
pin_project! {
    #[project = ResponseStateProj]
    enum ResponseState<F> {
        Called { #[pin] fut: F },
        Overloaded,
    }
}
// poll 实现:
match self.project().state.project() {
    ResponseStateProj::Called { fut } => fut.poll(cx).map_err(Into::into),
    ResponseStateProj::Overloaded => Poll::Ready(Err(Overloaded::new().into())),
}

两个变体的 future——Called 是真实工作的 future,Overloaded 是一个立即 Ready 的常量 future。整个”拒绝”路径是零分配的(只构造一个 unit variant),性能代价小到可以忽略。

6.3.4 LoadShed 的用场

读过 poll_ready 永远 Ready 这个设计之后,你可能会担心 LoadShed 是不是”把 Tower 协议破坏了”。确实——但只在 LoadShed 自己这一层。外层看到的是一个”总是就绪”的服务,内层的真实就绪信号被 LoadShed 吞掉并转化成 Overloaded 错误。

合适的使用场景:HTTP 服务端。HTTP 请求到达之后,你很少希望连接层 pending——更合理的是立即响应 503。用户自然会在客户端 retry 或者显示”服务繁忙”。

实战组合:

let svc = ServiceBuilder::new()
    .load_shed()                     // 上层始终 Ready → 请求永不等待
    .concurrency_limit(100)          // 内层有真实容量限制
    .service(expensive_handler);

concurrency_limit 会在 100 个 in-flight 时报告 Pending;这个 Pending 被 load_shed 吞掉——转换成 Err(Overloaded) 返回给调用方。调用方(比如 hyper 的 HTTP 层)立刻拿到错误,产生 HTTP 503 返回客户端。整个链条:“服务满 → 立即 503”,不挂起、不排队。

不合适的使用场景:客户端代码。客户端调用远端服务,绝大多数情况下你希望”等一下,等远端有空”——这恰好是 ConcurrencyLimit 或 Buffer 的作用。LoadShed 会让客户端直接放弃,可能导致大量无意义的 retry。

6.4 三者对比:一个表讲清

策略中间件poll_ready 的行为call 的行为典型用场
挂起ConcurrencyLimit有许可时 Ready;无许可时 Pending消费许可客户端,公平等待
排队Buffer有 channel 槽时 Ready;无槽时 Pending送消息,等 oneshot!CloneClone,或延迟处理
拒绝LoadShed总是 Ready按缓存决定转发或返回 OverloadedHTTP 服务端,流量管控

这三种策略对应三种完全不同的系统观:

挂起是”平滑”的——上游 task 挂起在 waker 上,不浪费线程,不消耗内存,但可能让上游看起来”卡住了”,如果上游是一个 HTTP 连接,连接可能被客户端判断为超时

排队是”平滑但可能累积”的——请求不会丢,但队列会积累。适合批处理、异步工作流。但 bound 设得太大时,内存可能吃爆;bound 设得太小时,又退化成”背压传到上游”。

拒绝是”决断”的——系统任何时候都快速响应,不积累、不挂起。但失败率会上升,对客户端友好度取决于客户端有没有正确的 retry 逻辑。

实战里往往组合使用。经典 HTTP server 模式:

// 顶层:对上游永远 Ready,防止 TCP 连接累积
.load_shed()
// 中层:设置一个排队缓冲,吸收 burst
.buffer(500)
// 内层:限制真实并发,保护 CPU 和下游
.concurrency_limit(50)
.service(router)

请求流量的命运:

  1. 前 50 个 → 立刻进入 handler。
  2. 第 51-550 个 → 进队列(buffer 500 个槽)。
  3. 第 551 个 → 触发 LoadShed 拒绝(buffer.poll_ready 返回 Pending,被 load_shed 转成 Overloaded 错误)。

三层结合得到一个”平滑吸收 burst、硬拒绝雪崩”的栈。

6.5 深入:Worker 的 failed() 协议

Buffer 里有一段值得独立讨论的代码——Worker::failed()(worker.rs:106-137)。它处理的是”inner service 永久报错之后,怎么让所有使用这个 Buffer 的调用方都知道”。

fn failed(&mut self, error: crate::BoxError) {
    let error = ServiceError::new(error);
    let mut inner = self.handle.inner.lock().unwrap();

    if inner.is_some() { return; }   // 已经报错过,避免重复处理

    *inner = Some(error.clone());
    drop(inner);

    self.rx.close();   // 关闭 channel——后续 send 会失败

    self.failed = Some(error);
}

注释里有一段工程思考非常精彩(worker.rs:112-118,手动翻译):

必须处理 “handle 并发地尝试 send 请求” 的场景。我们希望 要么 请求 send 失败、要么 它能从自己的 oneshot 上收到错误。要避免的 race 是:我们先给所有在等的请求广播错误,然后 调用方才发来它的 send——这时它拿不到错误,永远挂起。

解决方法: 暴露错误、然后 关闭 channel(所以后续 send 会 fail)、然后 给已经在等的 oneshot 发错误。

这段话读三遍值得——它在讲一个分布式系统里”状态广播 + 并发写入”的经典 race condition,在 Tower 这种纯 in-process actor 模式里依然存在。解决方式很细致:

  1. 先写 handle.inner = Some(error):任何新来的调用方 poll_ready 检查 tx.is_closed()… 不对,这时 channel 还没关呢。
  2. 再关 channel (self.rx.close()):此后 tx.send_item() 都会立刻失败——新 call 走 Err(_) => ResponseFuture::failed(...) 分支,self.get_worker_error() 从 handle 读到 error。
  3. self.failed:worker 后续 pop 出的消息会被这个 self.failed 拦住,直接回错误。

这个顺序之所以正确,是因为”更新顺序和读顺序一致”——调用方的流程是 tx.send → 检查结果 → 读 handle 错误。worker 的广播顺序是 handle 写 → channel 关闭。调用方 send 失败之后再读 handle,一定能读到错误。

写 Rust 并发代码时这类”先写可见字段,再触发关闭信号”的模式在 crossbeam、tokio 源码里反复出现。对照卷四《Tokio 源码深度解析》第 13 章 channel 源码 讨论 broadcast/oneshot 关闭顺序——模式是相通的。

6.5.2 tower 0.5.3 里 buffer/ 和 load_shed/ 的真实行数对比

读到这里你可能好奇——两个中间件功能相反、代码量差多少?实测 ~/.cargo/registry/src/.../tower-0.5.3/src/——

模块文件小计
buffer/mod / service / worker / layer / future / error / message47 + 144 + 226 + 72 + 78 + 68 + 16651
load_shed/mod / layer / future / error76 + 33 + 70 + 34213

Buffer 是 LoadShed 的 3.06 倍——差距几乎全部集中在三个 buffer 独有文件——

  • worker.rs 226 行——LoadShed 没有 worker,它是零状态、零后台任务的中间件。Buffer 要维护一个后台 tokio::spawn 的 worker loop、处理 close/failed/cancel 三种结束路径、负责把 handle 的错误广播给所有 send 端
  • service.rs 144 行——Buffer 的 Service impl 要处理 poll_ready 向 channel 要槽位、call 送消息到 worker 并拿 oneshot 的两步协议;LoadShed 的 Service impl 整个就 76 行(含 Clone/Debug)、因为它不涉及跨任务状态同步
  • message.rs 16 行 + future.rs 78 行——Buffer 要定义 Message<Req, Fut>(带 oneshot sender + tracing span)作为 worker 通信载体;LoadShed 的 future.rs 70 行里一半是 ResponseState::Overloaded 这种零分配 unit variant——纯同步路径

一条值得记住的规律——“是否需要跨任务的消息传递”是中间件复杂度分水岭——同步包装(LoadShed、Timeout、Retry、ConcurrencyLimit)通常 100-300 行、异步 worker 包装(Buffer、Spawn-Ready、Balance)通常 500-1000+ 行。选中间件时这个差距也会映射到运行时成本——worker 意味着多一次 context switch、一次 allocation、一次 oneshot。

6.5.3 Buffer 通道的源码账本:7 个文件 + 关键类型定位

上一小节给出 651 行的总量,这一小节把它摊开到”每个文件具体承担什么、关键类型在哪一行”——读源码的时候你可以直接当成地图用。实测 tower-0.5.3/src/buffer/——

文件核心职责关键类型 / 函数 / 行号
mod.rs47公开导出 + 模块文档pub use service::Buffer; pub use layer::BufferLayer
message.rs16跨任务通信载体struct Message { request, tx: oneshot::Sender, span: tracing::Span }(line 5-10)
error.rs68两种错误枚举ServiceError(worker 报错)+ Closed(worker 已退出)
future.rs78调用方等 oneshot 的 futureResponseFuture 内置 State::Failed/Rx/Poll 三态机
layer.rs72BufferLayer::new(bound) + 长注释tokio::spawn(worker) 发生在 layer.rs 之外的 Buffer::newservice.rs:57
service.rs144Buffer<Req, F> 客户端句柄pub fn new (line 49)、pub fn pair (line 66)、Service::poll_ready (line 97-108)、Service::call (line 110-131)、Clone (line 133-144)
worker.rs226唯一后台 taskWorker<T, Req>(line 23-33,6 字段)、Handle(line 37-40)、Worker::new (line 47)、poll_next_msg (line 71)、failed (line 106)、Future::poll (line 147)、Handle::get_error_on_closed (line 210)、Handle::clone (line 221)
合计651

几条值得记的”冷知识”——

  • message.rs 只有 16 行——它是 Buffer 最小但最核心的类型。整个 Buffer 的价值就凝结在这个 struct:把”请求 + oneshot 返回端 + tracing span”打包送进 mpsc,让 actor 模式能跨 task 保留 trace 上下文。
  • Handle 是 6 行结构体(worker.rs:37-40)——内部只有一个 Arc<Mutex<Option<ServiceError>>>。整个”worker 挂掉后所有调用方都能读到错误”的协议,就靠这 1 个 Arc<Mutex<Option<E>>> 支撑。第 §6.5 节讲的 failed() race-free 顺序正是围绕这 6 行展开。
  • Worker 有 6 个字段(worker.rs:23-33)——current_message(回填已取出但 inner 未 ready 的消息)、rx(接收端)、service(被包裹的 inner)、finish(结束标志)、failed(永久错误缓存)、handle(与 Handle 共享 Arc)。这 6 个字段每一个都对应一种运行态的可能性,读的时候可以按字段对着 poll() 函数的 match 分支一一对应。
  • tokio::spawn(worker)service.rs:57Buffer::new——不是在 BufferLayer::layer() 里!这意味着 每次 Layer::layer() 都会立刻 spawn 一个新 worker 并漏出 handle。如果你把 BufferLayer 套在一个会被反复 layer() 的地方(比如 Balance 的 make-service pattern),会悄悄 spawn 大量 worker——这是一个生产里真实踩过的坑。规避方法:要么用 Buffer::pair 自己管理 worker 生命周期、要么把 Buffer 放在 make_service_fn 外层只 spawn 一次。

Buffer 的”7 个文件 651 行”和后面将读到的 Balance(第 7 章,大约 900+ 行)、SpawnReady(122 行)并置——你会看到一个规律——

graph TD
    A["同步中间件<br/>(Timeout / LoadShed / Concurrency / Rate)"] --> A1["70-130 行 / 文件"]
    A --> A2["无 tokio::spawn"]
    A --> A3["无跨 task 错误广播"]
    B["异步中间件<br/>(Buffer / SpawnReady / Balance)"] --> B1["200-900+ 行 / 文件"]
    B --> B2["有 tokio::spawn"]
    B --> B3["有 Arc&lt;Mutex&gt; + channel / handle 双向失败传播"]

是否跨 task”就是 Tower 中间件宇宙的元维度——一旦跨越,Handle + Message + Worker + oneshot + Closed + ServiceError 这一整套模式就必然出现。

6.5.4 LoadShed vs ConcurrencyLimit:同叫”限制”,源码机制截然相反

这一小节回答一个经常被问到的问题:.load_shed().concurrency_limit(100).concurrency_limit(100) 差在哪里? 不是表面上的”一个拒绝一个等待”——两者背后的源码机制完全不同。实测行号摆在一起对照看——

维度ConcurrencyLimit
(limit/concurrency/service.rs 117 行)
LoadShed
(load_shed/mod.rs 76 行)
容量来源外部 Arc<Semaphore>(line 2, 16)无显式容量——依赖 inner 的 poll_ready
状态字段permit: Option<OwnedSemaphorePermit>(line 22)is_ready: bool(mod.rs:17-20)
poll_ready 行为semaphore.poll_acquire (line 68),许可拿到才调 inner.poll_ready (line 79)总是返回 Poll::Ready(Ok(())) (line 63),但内部先调 inner.poll_ready 并缓存 (line 56-59)
达不到容量时poll_ready Pending——调用方挂起等信号量poll_ready Ready,但 call 里看到 is_ready==false,构造 ResponseFuture::overloaded() (line 60)
call 行为permit.take()(line 83-86)+ 转发 inner.call;permit 随 response future 生命周期持有if is_ready { is_ready=false; inner.call } else ResponseFuture::overloaded()
许可释放时机ResponseFuture drop 时 permit 自动 drop —— response 完成后才释放无许可概念;拒绝路径零分配,只有一个 unit variant
是否使用 tokio sync 原语tokio::sync::Semaphore + tokio_util::sync::PollSemaphore(line 2-3)零——纯同步布尔
外部能看到的错误永远不报”overloaded”,只报 inner 的错误Overloaded 单独错误类型(load_shed/error.rs 34 行)
能否和自身叠加.concurrency_limit(10).concurrency_limit(100) 语义合理(内层生效).load_shed().load_shed() 几乎无意义——第二层永远不会触发
背压语义真背压——Pending 会冒泡到上游切断背压——把 Pending 转成错误,不向上游 Pending

一条关键洞察——ConcurrencyLimit 的 poll_ready 是 “先拿票、再探病” 两段式(line 65-79)——

// 先尝试拿信号量许可
if self.permit.is_none() {
    self.permit = ready!(self.semaphore.poll_acquire(cx));
}
// 有许可后才问 inner 是否 ready
self.inner.poll_ready(cx)

这个顺序意味着 ConcurrencyLimit 会把 inner 的 Pending 透传——如果 inner(比如下游数据库连接池)本身还没就绪、就算 ConcurrencyLimit 有空槽也会 Pending。这是正确的”双重背压”

对比 LoadShed 的 poll_ready (mod.rs:53-64)——

self.is_ready = match self.inner.poll_ready(cx) {
    Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
    r => r.is_ready(),           // ← 把 Pending 塞进布尔缓存
};
Poll::Ready(Ok(()))              // ← 对上层撒谎说"我 ready 了"

LoadShed 故意撕掉了 Tower 协议的诚信合约——“poll_ready = Ready 代表 call 一定会进真正的工作路径”这条约定被 LoadShed 打破了。这不是 bug,是LoadShed 存在的全部理由——它要把”慢背压”强行转成”快失败”。

组合叠加的语义小心事项——

叠加方式含义何时用
.load_shed().concurrency_limit(100)外层撕背压、内层真限流→ 超过 100 并发立刻 Err(Overloaded)HTTP server 推荐——满了就 503
.concurrency_limit(100).load_shed()内层撕背压(无意义,inner 本来就没 backpressure)、外层真限流几乎永远错——LoadShed 被包在内,外层 ConcurrencyLimit 永远 Ready
.concurrency_limit(100).timeout(5s)硬并发 + 单请求超时必须这么叠(§6.6.2)——防慢请求占槽
.buffer(500).concurrency_limit(100)排队 500 + 并发 100适合”短 burst 可容忍额外 5× 延迟”的批处理
.load_shed().buffer(500).concurrency_limit(100)三层——最外撕背压、中层吸 burst、内层真限生产推荐(§6.4 结尾那段 stack)——“平滑 burst,硬拒雪崩”

这张表值得打印贴墙——Tower 中间件选错位置、语义马上反掉。

6.5.5 与 ch23 §23.8.6 “14400 行生产旋钮”账本的串联

第 23 章 §23.8.6 为本书第一次给出了”一份生产 hyper 服务能拨动的所有源码旋钮总账”——14400 行代码、跨 hyper/hyper-util/tower 三个 crate。本章讨论的三个中间件在那张账本里占了 263 行——也就是说整套”HTTP 服务负载控制底盘”不到 hyper-util auto Builder(1376 行)的 1/5

把本章读到的 651 行 Buffer + 213 行 LoadShed + 117 行 ConcurrencyLimit 横向摆回 ch23 账本——

中间件本章测得行数ch23 §23.8.6 口径差异说明
tower/src/timeout/ (mod + future + layer)147 (70+53+24)70(只算 mod.rs)ch23 只统计了”Service 定义本体”,本章把 future + layer 也列进去了
tower/src/load_shed/ 全子树21376(只算 mod.rs)同上——ch23 估的是”最小决策代码
tower/src/limit/concurrency/ 全子树227 (10+60+40+117)117(只算 service.rs)同上
tower/src/buffer/ 全子树651未列入(ch23 §23.5 没用 Buffer)ch23 的生产 stack 选择了无 Buffer 的 load_shed + concurrency_limit 直连——本章 §6.4 的叠加推荐只适用于”容忍短 burst 的批处理”,ch23 §23.5 的生产模板反而明确去掉了 Buffer

两章的差异背后有一个隐藏的工程观——

生产 HTTP 服务默认不开 Buffer。Buffer 会把”瞬时压力”翻译成”队列延迟”,但 HTTP 服务的核心诉求是”尾延迟可预测”——满了直接 503 比偷偷排队更诚实。ch23 §23.5 选择 load_shed + concurrency_limit不套 buffer,本质就是这个判断。

什么场景真需要 Buffer? 三类——

  1. 服务内部是 !Clone(比如 AuthClient 持有大型连接池)——第 5 章的 Retry 要求 S: Clone,不套 Buffer 没法接 Retry。
  2. 批处理 / 后台消费(比如消息队列 worker)——吞吐优先、延迟可容忍到秒级。
  3. 多消费者共享一个串行 resource(比如一个 FFI 调用只能单线程跑)——Buffer 是最小成本的 “actor adapter”。

横向串联本书已有账本——

flowchart LR
    subgraph 卷三["卷三 Hyper&Tower"]
        CH02["ch02 tower-service<br/>390 行 / 85% 文档"]
        CH05["ch05 Timeout/Retry/RateLimit"]
        CH06["ch06 Buffer/LoadShed/ConcurrencyLimit<br/>651+213+117=981 行"]
        CH19["ch19 hyper-util 全 crate<br/>12693 行"]
        CH21["ch21 client 子树<br/>6619 行"]
        CH22["ch22 axum 全家桶<br/>33693 行"]
        CH23["ch23 生产旋钮<br/>14400 行"]
    end
    CH02 --> CH05 --> CH06
    CH06 -.->|"263 行归入"| CH23
    CH19 -.->|hyper-util 1376 行| CH23
    CH21 -.->|客户端侧| CH23
    CH22 -.->|服务端侧| CH23

把这张图记在脑子里——“Tower 中间件薄、生态承载重”这条规律不是抒情,是可测量的:真正写在 tower crate 里的生产级中间件合起来不到 1000 行,但它和 hyper-util(12693)、axum(33693)组合起来就是 Rust HTTP 宇宙的主干。每多读一章、这个账本会多出一列——到 ch23 之前你心里应该有一张可反查的行数总表

对下一章的伏笔——ch07 将读 Balance / Discover / ready_cache。把本章 §6.5.3 的”跨 task 中间件复杂度规律”带过去,你会发现 Balance 又是一个 500+ 行的异步中间件——它和 Buffer 几乎是同一种模式(内部 spawn 监听后端变更)。读第 7 章时可以先猜:Balance 的 Message / Handle / Worker 三件套应该长什么样? 猜完再对源码——这种”先构造心智模型、再对账”的读源码法,会让你把 500 行读成 5 分钟。

6.5.6 三种策略的”默认值与关键 API 对照表

读完上面两张表,再加一张API 维度的对照——把本章三个中间件在 ServiceBuilder 上的调用签名、默认行为、覆盖方式全部摊开——

维度BufferLoadShedConcurrencyLimit
Builder API.buffer(bound: usize).load_shed()(无参).concurrency_limit(max: usize)
无默认值(必须显式传)bound——max
有可选配置Buffer::pair(svc, bound) 返回 (Buffer, Worker) 让你自己 spawnLoadShedLayer::new() —— 层化构造(layer.rs 33 行全部)ConcurrencyLimit::with_semaphore(svc, Arc<Semaphore>) 允许多 Service 共享一个信号量
底层依赖tokio::spawn + tokio::sync::mpsc + tokio-util::sync::PollSender零外部依赖tokio::sync::Semaphore + tokio-util::sync::PollSemaphore
实现 Clone总是可以service.rs:133-144——只克隆 tx+handle)仅当 S: Clone 时可以(mod.rs:67-75)——is_ready 会重置为 false仅当 S: Clone 时可以(service.rs:95-107)——新 clone 的 permit: None
drop 语义最后一个 Buffer drop → mpsc 关闭 → worker 收 None → worker task 退出不持有资源ResponseFuture drop → permit drop → 信号量自动归还
poll_ready 反向不变量遵守 Tower 协议(槽位预留机制),但槽可能被占很久(文档警告)故意违反——永远 Ready遵守 Tower 协议(无许可 Pending),且透传 inner 的 Pending
错误类型BoxError(由 ServiceError/Closed 混合)Overloadederror.rs:34 独立类型)继承 S::Error 不变
Service<Req> trait 关系请求类型 Req 必须 Send + 'static(跨 task 传)无额外约束无额外约束

其中最容易被忽视的那一行ConcurrencyLimit::with_semaphore(svc, sem)——它允许你手动共享一个 Arc<Semaphore> 到多个 Service。典型用场:多个路由共享同一个”数据库连接池”容量——

let db_sem = Arc::new(Semaphore::new(50));   // 总池上限 50

let route_a = ServiceBuilder::new()
    .layer_fn(|s| ConcurrencyLimit::with_semaphore(s, db_sem.clone()))
    .service(handler_a);
let route_b = ServiceBuilder::new()
    .layer_fn(|s| ConcurrencyLimit::with_semaphore(s, db_sem.clone()))
    .service(handler_b);

这等价于跨路由的全局并发池——route_a 满了、route_b 也会占用同一个 permit。而如果你朴素地写 .concurrency_limit(50) 两次,就变成两个独立的 50 并发池——总容量 100,和预期的 50 差了一倍。读源码(service.rs:27-36)你会发现 ::new() 只是 ::with_semaphore(inner, Arc::new(Semaphore::new(max))) 的糖衣——这个细节看上去小,生产里被踩中过。

6.5.7 三种 drop 路径的代价账本:多少 ns 丢进去?

本章 §6.2.6 提过 Buffer 的三种 cancel 路径,但没量化。这一节把三种中间件”被取消时发生了什么”以及”取消一次要花多少功夫”放一起——不是为了 benchmark 数字,而是为了让你在 code review 时一眼看出哪里会慢

取消场景BufferLoadShedConcurrencyLimit
poll_ready 但未 callmpsc 槽位被 PollSender 占着,直到 sender drop(可能很久)—— §6.2.7 文档警告的精髓无状态,零代价permit 被持有直到 service drop 或再次 call——可能长时间占槽
call 但调用方 drop 了返回的 futureworker 下次 poll_next_msgmsg.tx.is_closed() → 跳过(worker.rs:81-91)——一次 mutex / atomic 检查ResponseFuture::Overloaded drop 即释放——零代价ResponseFuture drop → permit 被 drop → Semaphore 内部 atomic 归还
inner service 永久报错Worker::failed() 写 Handle + 关 channel + 广播 → O(N) 错误 cloneworker.rs:106-137错误直接回传,LoadShed 本身不做额外工作错误直接回传,permit 仍归还
所有 handle drop(服务关停)mpsc 关闭 → worker 收 None → task 退出(worker.rs:174-177无状态Semaphore 随 Arc 归零 drop

一条实战规则——“长期持有的 future 不要跨越 Buffer/ConcurrencyLimit 一起挂起”。比如你写一个长 SSE stream(能流 30 分钟)、外面套 ConcurrencyLimit(50)——那个 permit 会被占 30 分钟。50 路并发就把整个池打满。正确做法是把 ConcurrencyLimit 放在”短请求”路径上,SSE 这类长连接走独立 stack(或套 load_shed 让满了立刻拒)。

再引一次已有账本的观察——卷四《Tokio 源码深度解析》第 13 章 channel 源码 里对 mpsc::Sender::reserve() 的分析提到:PollSender 预留的槽位释放成本是一次 AtomicUsize::fetch_sub——纳秒级。所以 Buffer 的”槽位预留”本身不慢,慢的是槽位被用户代码逻辑占久了(忘记及时 call 或 drop)。

6.5.8 Buffer ResponseFuture 状态机:78 行读出三态

buffer/future.rs 只有 78 行,但它定义了一个三态机——是调用方那端”等 oneshot 返回 future、再等那个 future 出 response”的完整编舞。读这 78 行的回报比你想的大:它是 Tower 里少见的”双层 await”实现范本

三个状态——

stateDiagram-v2
    [*] --> Failed: Buffer 一 call 就炸(channel 关了)
    [*] --> Rx: call 成功 → 拿到 oneshot Receiver
    Rx --> Failed: worker 在真 call 前挂了(收到 ServiceError)
    Rx --> Poll: 收到 inner 的 response future
    Poll --> [*]: 最终 response 就绪
    Failed --> [*]: 立刻返回错误

对应到源码——ResponseState::Failedcall 同步检测到 channel 关闭时直接构造(service.rs:127-128)、ResponseState::Rxcall 成功送出消息后包住 oneshot receiver、ResponseState::Poll 是 oneshot 收到 inner 返回的 “工作 future” 后转入的态——最后那层 fut.poll(cx) 才是 inner service 真正的响应 future。

这个三态机等价于一个跨 task 的 Promise chain——oneshot 把 “inner service future 的所有权”从 worker 搬回了调用方的 task。搬回来之后,真正的计算(比如 TCP 往返、磁盘 IO)发生在调用方所在的 runtime上——而不是 worker 里。这是 Buffer 的非对称设计——

阶段执行者
请求排队 → inner.poll_readyworker task
inner.call(构造 future)worker task
future.poll(真正跑 IO)调用方 task

换句话说——worker 只是一个”派工台”,真正干活的线程是调用方自己的。这个设计的好处是 worker 不会被任何一个慢请求阻塞——它只负责”从 queue 取、派单、立刻回到 loop”。如果 inner 是 hyper::Client,那实际的 HTTP 往返跑在发起请求的那个 tokio task 里,worker 只参与了一个 poll_ready + call 的瞬间。

这也解释了为什么 Buffer 的 worker 行号里看不到”等待 inner 完成”的代码——它根本不需要等service.call(msg.request) 返回 future 后,worker 立刻 msg.tx.send(Ok(response)) 把 future 塞进 oneshot(worker.rs:156-157),然后 continue loop。

6.5.9 tracing 的 span 穿越:一个容易忽略的性能点

§6.2.4 提过 Buffer 会把 tracing::Span::current() 塞进 Message 传给 worker(service.rs:121)。这个设计的性能含义值得单独说——

  • tracing::Span 是 Clone——但不是零成本。Span 内部持有 DispatchArc<dyn Subscriber>)和一个 ID。Span::current() 要走一次 thread-local 读取。
  • 每个 buffer 请求要 clone 一次 Span——如果你启用了 tracing,每次 Buffer::call 都比不用 Buffer 多一次 Arc::clone + thread-local 访问。纳秒级,但在 100k QPS 场景会出现在火焰图上。
  • worker 进入 spanlet _guard = msg.span.enter()worker.rs:149)——每次处理消息 enter+drop 一次。这是让 inner service 内部的 tracing::info! 能挂回调用方 span的唯一手段。没有这一步,worker 的 task 和调用方的 task 是两个独立的 span 上下文,日志就断了。

实测结论——如果你用 Buffer 包裹一个会记录大量 trace 的 service,关掉 tracing::subscriber 的对应 target,比优化 inner service 的 IO 更能提升吞吐。这不是 Buffer 的锅,是 tracing 的固有成本;但 Buffer 把 “跨 task 的 span 透传”这一成本显式暴露出来——读这一段代码能训练你对”哪些 Arc::clone 在热路径上”的敏感度。

对比一下——LoadShed 和 ConcurrencyLimit 都不跨 task——它们的 call 就在调用方自己的 task 里,tracing span 天然延续,不需要任何额外操作。这又是一个”跨 task 中间件 vs 同步中间件”的隐性成本差。

6.5.10 一条可以直接落地的”阅读顺序”清单

读完本章前面所有表格和账本,给一个具体的”按这个顺序读完这 981 行”建议——

  1. 先读 message.rs 16 行——理解整条 actor 链的最小承载单元
  2. 再读 load_shed/mod.rs 76 行——感受”同步中间件”的纯净样子(没有任何跨 task 协议)
  3. 然后读 limit/concurrency/service.rs 117 行——同步中间件但引入外部资源(Semaphore),理解”资源约束是如何通过 Future 生命周期自动归还
  4. 接着读 buffer/service.rs 144 行——Buffer 的客户端半侧,看”poll_reserve + send_item + oneshot::channel”三件套怎么拼
  5. 然后读 buffer/worker.rs 226 行——异步中间件的主体,重点读 Worker::failed (line 106) 和 Future::poll (line 147)
  6. 最后读 buffer/future.rs 78 行——把调用方侧的三态机和 worker 的 oneshot.send 对齐起来

按这个顺序读下来,不到 700 行代码,2-3 小时完成——但收获的是”同步 vs 异步中间件的普适心智模型”。这个模型会在你读第 7 章的 Balance、第 10 章的 reconnect、卷四的 tokio sync 原语时反复复用。

6.5.11 修两条容易误导的说法:Buffer 的”spawn 次数”与”无 bound 默认值”

最后一小节,修正两条在 Rust 社区(Reddit、Stack Overflow、博客转述)里反复出现但和 0.5.3 源码不一致的说法——

说法一:“Buffer 会在每个 call 里 spawn 一个新 task” —— 。实测 service.rs:49-60——tokio::spawn(worker) 只在 Buffer::new 里被调用一次,对应”每把 Buffer 包上一个 Service 就 spawn 一个 worker”。call 路径只经过 send_item + oneshot::channel——零 spawn

但有一个隐式 spawn——每次调用 BufferLayer::layer(svc) 也会走 Buffer::newlayer.rs 读源码可见)。这意味着如果你在 make_service_fn 里重复 BufferLayer::new(100)每个新连接都会 spawn 一个 worker task——这是大多数人没意识到的”per-connection worker”。生产正确做法:把 Buffer 放在 make_service 之外(整个 server 一个 worker)、或者根本不要用 Layer 形式,直接在外层 Buffer::new 一次。

说法二:“.buffer() 默认 bound 是 0” —— 错,根本没有默认值ServiceBuilder::buffer(bound: usize) 必须显式传。但 tokio 的 mpsc::channel(bound) 如果传 0 会 panic(tokio 0.5+ 已检查),所以即使你真的传 0 也会立刻挂。正确的 bound 选择见 §6.2.7——但”永远不传 0”是不写进文档的硬约束。

对比 LoadShed 无参、ConcurrencyLimit 必填 max——三个中间件的”强制配置 surface”如下——

中间件零参数版本必填参数是否有”沉默变坏”的边界值
Buffer(必填 bound)bound: usizebound=0 会立刻 panic;bound 过大会吃内存
LoadShed——.load_shed()——
ConcurrencyLimit(必填 max)max: usizemax=0 等价于”永远 Pending”——不会 panic,但服务永远不响应

max=0 的沉默陷阱值得特别注意——tokio::sync::Semaphore::new(0) 完全合法、只是 poll_acquire 永远返回 Pending。ConcurrencyLimit 套一个 max=0 的信号量,对上层的表现是”永久 poll_ready Pending”——服务没报错,只是所有请求都挂着。如果你的配置文件从环境变量读 max、环境变量没设默认成 0——直接中奖。

6.5.12 本章账本归并回卷三主表

把 §6.5.2 的 “651 + 213 = 864 行” 加上 §6.5.4 讨论的 ConcurrencyLimit 全子树 227 行,本章完整覆盖的 tower 源码是——

ch06 覆盖行数=651buffer/+213load_shed/+227limit/concurrency/=1091 行\text{ch06 覆盖行数} = \underbrace{651}_{\text{buffer/}} + \underbrace{213}_{\text{load\_shed/}} + \underbrace{227}_{\text{limit/concurrency/}} = \mathbf{1091 \text{ 行}}

这 1091 行对应的功能密度——三种完全不同的容量策略、三种 drop 语义、两种(同步 / 异步)中间件范式、两种 tokio sync 原语的用法(Semaphore + mpsc+oneshot)——平均每 363 行一个独立概念。和 ch22 的 axum(33693 行 / 约 10 个核心概念,平均 3000+ 行一个概念)对比,你会发现 tower 生态的”信息密度是 axum 的 8-10 倍——这是为什么作者在本卷一再强调”先读 tower 再读 axum”——前者 1000 行每一行都在讲抽象,后者 30000 行大部分是 HTTP 协议适配器的胶水。

把本章 1091 行归入卷三账本(累计)——

章节覆盖 crate 子树实测行数累计
ch02tower-service/390390
ch04tower/src/util/ 重点约 500~890
ch05tower/src/{timeout,retry,limit/rate}/约 350~1240
ch06tower/src/{buffer,load_shed,limit/concurrency}/1091~2330
ch19hyper-util/ 全 crate12693~15000
ch21hyper-util/src/client/ 子树6619(ch19 子集)
ch22axum/ 全家桶33693~48700
ch23生产旋钮汇总14400(横跨多 crate)

到 ch06 末尾,你已经跟读了约 2330 行 tower 源码——这是整个 tower 0.5.3(约 12000 行不到)的近 1/5。本章是整本卷三里**“信息密度 / 行数比”最高的一章**——后面 7-18 章合起来也不会超过 3000 行 tower 源码的覆盖量。这个分布本身就是一种”读源码性价比地图”。

把这张地图映射到时间预算——如果你每天能投入 1 小时深度读源码、用上面 §6.5.10 的 6 步顺序推进,4-5 天可以把本章 1091 行完整吃透;同样的投入读 ch22 axum 33693 行,至少 4-5 周。这不是说 axum 不重要,而是”先用 1 周拿下 tower 的抽象层、再用 1 个月读 axum 的适配层”远比”同时读两者”效率高——因为 axum 的每一个 Layer 都在用 tower 的抽象。

本章三个账本(§6.5.2 / §6.5.3 / §6.5.5 / §6.5.12)可以直接当 check-list 回查——每一张表的行号、类型、函数位置都来自 0.5.3 源码的 grep -n 实测。如果未来 tower 升级到 0.6.0,这几张表里对应文件哪些变了、哪些没变,是你判断”是否要重读本章”的客观指标。源码账本不是炫技,是给你未来自己的可重复校对工具

最后一条收束——本章从”三种活法”的哲学开篇、经过 poll_reserve / oneshot / Semaphore / ResponseState 四段源码、落到跨 crate 的生产账本——每一步都可以用 grep 复现、每一个数字都能用 wc 校对。这是”读源码写作”和”感悟式写作”的核心分水岭——前者给你的每一句话都有可验证的落点,后者只能赢得当下的掌声。把本章的阅读习惯带到后面所有章节去——遇到任何断言、先问”行号在哪”

6.6 一些容量工程的”反常识”

读完三种策略和它们的源码,给你三条容易被忽视的经验。

6.6.1 Buffer 不是”无限容量”的替身

新手最容易的误解是把 Buffer 当”加大吞吐”的魔法——“我加一个 buffer(10000),吞吐就能上来”。实际上:

  • Buffer 不能加速 inner service——worker 是串行 poll inner 的。
  • Buffer 只能吸收 burst,不能提升稳态吞吐。稳态 throughput = min(producer_rate, consumer_rate)——buffer 大了只让”满起来慢一点”。
  • Buffer 大 → 请求在队列里等的时间长 → 尾延迟炸掉。p99 latency 受 buffer 深度直接影响。

如果你真的需要”更高的稳态吞吐”,要么加并行度(水平扩展)、要么优化 inner service。Buffer 只是”容量缓冲”,不是”容量放大器”。

6.6.2 ConcurrencyLimit 对慢请求敏感

假设你设 .concurrency_limit(100) 期望”最多 100 个并发请求”。但 “100 个并发” 只是一个字面意思——它不限制每个请求的耗时。如果下游变慢(从平均 50ms 涨到 500ms),100 个槽会被”满上”——总吞吐从 2000 QPS 变成 200 QPS。

真实事故案例:数据库慢查询。平时 100 个 DB 并发完全够用,某天执行计划突然变差,单查从 10ms 变 1s——concurrency_limit(100) 以为还能接 100 个并发请求,但实际上吞吐崩掉了。上游看起来正常(每次 call 都能 ready),但客户端看到的每个请求都变慢了 100 倍。

防守方法:ConcurrencyLimit 一般要配 Timeout 使用。单个请求超时了就释放许可,防止慢请求独占槽位。Tower 官方 guides 里几乎所有 ConcurrencyLimit 示例都配 Timeout 就是这个原因。

6.6.3 LoadShed 在客户端通常错了

第三个”反常识”:LoadShed 很少应该出现在客户端代码里。

客户端调用远端服务,偶尔的容量限制(连接池满、inflight 达上限)是正常的——等一下就好。LoadShed 把所有”暂时满”都翻译成错误——客户端代码需要在每一次调用处判断”如果是 Overloaded 要不要 retry 要不要冒泡”,非常不友好。

LoadShed 的正确位置几乎总是最外层 HTTP 服务端——在这个位置拒绝比挂起 TCP 连接更健康。在客户端用 ConcurrencyLimit 或者 Buffer,让调用方安静地等待,直到真的满太久才通过 Timeout 放弃。

6.7 落到你键盘上

本章读完之后:

  • tower/src/buffer/future.rs——很短,是 Buffer 调用方等 oneshot 的 future 实现。里面有一个细节:如果 worker drop 了而 oneshot 还没发消息,调用方怎么识别?答案在那一行 rx.is_closed() 判断里。
  • 写一段压测:用 .concurrency_limit(10).load_shed().concurrency_limit(10) 各 wrap 一个延迟 100ms 的 handler,分别发 1000 并发请求。第一种所有请求都会慢慢处理完;第二种 90% 直接拿到 Overloaded。两者延迟分布、成功率曲线完全不同——这就是策略的力量。
  • 在你生产项目里盘一遍:哪些地方用 concurrency_limit 但没配 timeout?哪些地方 buffer 开得过大?哪些地方该用 load_shed 却没用?对着本章三种策略的适用场景过一遍能发现一批隐患。

下一章我们继续往 Tower 的深水区走——BalanceDiscoverready_cache,这一组中间件把”多个后端 endpoints”织成一个”单一 Service”,是微服务里 client-side load balancing 的基础。