Skip to content

第6章 Buffer / LoadShed / ConcurrencyLimit:容量工程

6.1 当服务饱和时,三种活法

上一章我们把三个"按规则节制"的中间件读完了——Timeout、Retry、RateLimit。这一章要讨论的是另一组问题:

当服务真的满了,你选择怎么办?

这是分布式系统里最古老也最哲学的问题之一。直觉上大多数人会说"限流呗",但真正坐下来审视细节,会发现"服务满了"可以用截然不同的方式应对:

  • 排队等(Buffer)——"我先把请求记下来,等服务有空时依次处理。"
  • 直接拒(LoadShed)——"现在忙不过来,请你 502 回去吧,等会再试。"
  • 挂起等(ConcurrencyLimit)——"别急,你的请求先挂着,轮到你的时候我再接。"

三种策略对应三种真实场景。不同场景下选错策略的代价可能非常昂贵:排队可能导致雪崩拒绝可能丢掉容易处理的请求挂起可能让上游大量 TCP 连接堆积

Tower 把这三种策略都实现成了中间件:BufferLoadShedConcurrencyLimit。这一章我们把前两个读透,并和上一章已经读过的 ConcurrencyLimit(第 4 章也讲过它的 permit 模型)合在一起对比。三个加起来不到 400 行代码,但它们是整个 Rust 生产环境容量工程的基石。

源码引用来自 tower 0.5.3 (tower/src/buffer/tower/src/load_shed/)。

6.2 Buffer:把 mpsc 和 oneshot 缝起来

Buffer 的核心观察是:tokio 的 mpsc::channel(bound) 本身就是一个背压感知的队列。消费端处理不过来时,生产端 send() 会返回 Pending——调用方可以在 task 层面自然挂起。Tower 的 Buffer 就是"把 mpsc 通道嫁接到 Service 接口上"。

6.2.1 两半结构

rust
// tower/src/buffer/service.rs:18-22
pub struct Buffer<Req, F> {
    tx: PollSender<Message<Req, F>>,
    handle: Handle,
}

Buffer 本身不持有被包裹的 Service——它只持有一个 channel 的发送端和一个 error handle。真正的 Service 活在另一个地方:Worker<T, Request>,一个独立的 tokio task。

这是 Buffer 最核心的架构选择:把 Service 消费和请求分发拆成两个独立的 async 实体

         ┌──────────────┐            ┌──────────┐
         │ Buffer       │            │ Worker   │
caller ──▶│ (Clone)     │──mpsc──▶  │          │──call──▶ inner Service
         │ (Clone)     │            │          │
         └──────────────┘            └──────────┘
  • 调用方(左侧)拿一个 Buffer 实例,调用 poll_ready + call
  • poll_ready 会对 mpsc 的 sender 做 poll_reserve——预留一个槽位。
  • call 构造一个 Message { request, span, tx_oneshot },通过 mpsc 送给 worker。返回值是一个等 rx_oneshot 的 future。
  • Worker(右侧)在自己的 task 里 poll_recv 收请求,对 inner service 做 poll_ready + call,把 inner 的 response future 通过 oneshot 送回。

这是一个经典的 actor 模式——worker 是 actor,Buffer 是"client handle"。调用方只通过 handle 送消息,内部 actor 串行处理。

6.2.2 为什么 Buffer 能让任何 Service 变 Clone

rust
// tower/src/buffer/service.rs:133-144
impl<Req, F> Clone for Buffer<Req, F>
where Req: Send + 'static, F: Send + 'static,
{
    fn clone(&self) -> Self {
        Self {
            handle: self.handle.clone(),
            tx: self.tx.clone(),
        }
    }
}

Buffer<Req, F> 可以 Clone——不管底下的 Service 能不能 Clone。这在 Retry(需要 S: Clone)、Balance(需要给每个 endpoint 一个 clone)等场景里至关重要。

工作原理:clone 的只是 PollSenderHandle——mpsc 的 sender 和错误句柄都是廉价可 clone 的引用。多个 Buffer clone 共享同一个底层 worker,请求通过 channel 汇聚到 worker 串行处理。

在第 5 章我们读 Retry 源码时已经提到过:如果 inner service 自身不 Clone(比如 AuthClient,内部有大型配置),可以套一层 Buffer:

rust
let svc = ServiceBuilder::new()
    .layer(RetryLayer::new(policy))
    .buffer(100)              // ← 在这里 Clone 藏进 Arc+mpsc
    .service(heavy_auth_client);

Retry 只看到"一个可 Clone 的 Buffer"——实际上底下 heavy_auth_client 只有一份,所有 Retry 的 clone 都通过 channel 排队进同一个 worker。

6.2.3 poll_ready:向 channel 要槽位

rust
// tower/src/buffer/service.rs:97-108
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    // First, check if the worker is still alive.
    if self.tx.is_closed() {
        return Poll::Ready(Err(self.get_worker_error()));
    }

    // Poll the sender to acquire a permit.
    self.tx
        .poll_reserve(cx)
        .map_err(|_| self.get_worker_error())
}

PollSender::poll_reserve(cx) 是 tokio-util 提供的一个 API,把 tokio 的 mpsc::Sender::reserve() 异步能力包装成 poll_fn 形式。语义是"异步地为这一 Buffer 客户端预留一个 mpsc 槽位"。

Buffer 的 poll_ready 做两件事:

  1. 检查 worker 是否还活着(channel 是否关闭)。如果 worker 已经挂掉(可能因为底层 Service 报错),直接返回"我坏了"的错误。
  2. poll_reserve 一个 channel 槽位。如果 mpsc 已经满(达到 bound),返回 Pending——调用方被挂起,等有槽位再被 waker 唤醒。

这一步完美接上了 Buffer 的背压语义:当 worker 来不及处理、channel 里积累到 bound 个未处理消息时,新的 poll_ready 会 Pending——上游的 Tower 栈可以根据这个信号决定是继续挂起还是做别的决定。

6.2.4 call:送消息 + 等响应

rust
// tower/src/buffer/service.rs:110-131
fn call(&mut self, request: Req) -> Self::Future {
    let span = tracing::Span::current();
    let (tx, rx) = oneshot::channel();

    match self.tx.send_item(Message { request, span, tx }) {
        Ok(_) => ResponseFuture::new(rx),
        Err(_) => {
            tracing::trace!("buffer channel closed");
            ResponseFuture::failed(self.get_worker_error())
        }
    }
}

几个细节:

  • tracing::Span::current():把当前 tracing span 捎带到 worker。不这么做的话,worker task 里的 tracing 事件不会关联到发起请求的 span。这是一个跨 task tracing 的惯用技巧——span 是 Clone 的,传入 message 里,worker 收到后 _guard = msg.span.enter() 进入这个 span。
  • (tx, rx) = oneshot::channel():每个请求对应一个一次性 channel。worker 把 inner service 返回的 response future 通过 tx 送回,调用方在 rx 上等。
  • send_item:这里已经通过 poll_reserve 拿到了槽位,所以 send 是同步成功的——不可能阻塞,不可能 Pending。协议保证 call 前必走 poll_readypoll_ready 拿到槽位后一直到 call 都是"已预留"状态。

6.2.5 Worker:真正的循环

rust
// tower/src/buffer/worker.rs:147-206 精简
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    if self.finish { return Poll::Ready(()); }
    loop {
        match ready!(self.poll_next_msg(cx)) {
            Some((msg, _first)) => {
                let _guard = msg.span.enter();
                if let Some(ref failed) = self.failed {
                    let _ = msg.tx.send(Err(failed.clone()));
                    continue;
                }
                match self.service.poll_ready(cx) {
                    Poll::Ready(Ok(())) => {
                        let response = self.service.call(msg.request);
                        let _ = msg.tx.send(Ok(response));
                    }
                    Poll::Pending => {
                        // 把消息放回去,挂起 worker
                        drop(_guard);
                        self.current_message = Some(msg);
                        return Poll::Pending;
                    }
                    Poll::Ready(Err(e)) => {
                        let error = e.into();
                        drop(_guard);
                        self.failed(error);
                        let _ = msg.tx.send(Err(self.failed.as_ref().unwrap().clone()));
                    }
                }
            }
            None => {
                self.finish = true;
                return Poll::Ready(());
            }
        }
    }
}

Worker 的 Future::poll 本身是一个 loop——每次 poll 尽可能消费所有待处理消息,直到要么所有消息处理完、要么 service 不 ready。

核心三段式:

  1. 拿下一个消息poll_next_msg)。如果 channel 关了(所有 Buffer 实例都 drop 了),返回 None。
  2. 检查 service 状态:如果已经 failed,直接把错误送回 oneshot。
  3. poll_ready + call
    • Ready(Ok):调用 inner.call(req),把 response future 通过 oneshot 送回。注意!这里送回的是 future,不是 response——真正的 await 发生在调用方的 ResponseFuture
    • Pending把消息放回 self.current_message,return Pending。下次 worker 被 wake 时再尝试。
    • Ready(Err):service 永久坏了。走 failed() 流程——关闭 channel、把错误广播给所有还在等的 oneshot。

current_message 字段非常巧妙——它解决了"已经从 channel 取出但 service 还没就绪"的尴尬。不能把消息再送回 channel(顺序就乱了),必须缓存在 worker 里等 service 就绪再 call。

6.2.6 cancel 的三种路径

Buffer 支持三种请求取消:

  1. 调用方 drop 了 ResponseFuture(还没送到 worker 就不想要了)。worker 下次 poll_next_msg 时会检查 msg.tx.is_closed()——如果调用方已经 drop 了 rx,oneshot 的 tx 会变成 closed,worker 把这条消息直接跳过:
rust
// worker.rs:81-91
if !msg.tx.is_closed() {
    return Poll::Ready(Some((msg, false)));
}
tracing::trace!("dropping cancelled buffered request");
  1. Worker 报错:调用 self.failed(error),后续所有消息收到错误。
  2. 所有 Buffer clone drop:channel 关闭,worker 收到 None,正常结束。

这三种路径不需要额外的"cancel 信号"通道——Rust 的 drop 语义和 oneshot/mpsc 的 closed 检测完整覆盖。

6.2.7 Buffer 的 bound 该设多少

rust
/// A note on choosing a `bound`
///
/// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a
/// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive,
/// this reserved slot may be held up for a long time.

这段文档里的警告值得强调:poll_ready 返回 Ready 后,那个槽位就被"预留"了。如果调用方 poll_ready 后没有立刻 call(比如被 cancel 了),那个槽位还占着——直到 PollSender 的 drop 释放。

实战推论:bound 要 >= 预期的并发 poll_ready 调用方数量。如果你有 200 个并发 task 都在 buffer_svc.ready().await,bound 设成 100,会有 100 个 task 被堵在 poll_ready 的 Pending 上——正确,但不是你想要的"先全部 ready,然后慢慢处理"的效果。

一般经验:bound >= 2 × 预期峰值并发数。留出冗余给那些"ready 了但还没 call"的短暂态。

6.3 LoadShed:饱和时立即拒绝

Buffer 的哲学是"排队",LoadShed 的哲学刚好相反——"满了就拒"。整段源码加起来不到 150 行。

6.3.1 整体结构

rust
// tower/src/load_shed/mod.rs:17-20
#[derive(Debug)]
pub struct LoadShed<S> {
    inner: S,
    is_ready: bool,
}

两个字段:inner + 一个 bool。就这么多。

is_ready 是一个缓存——它记录"上一次 inner.poll_ready 返回 Ready 了吗"。这个细节让 LoadShed 的整个哲学可行。

6.3.2 颠覆性的 poll_ready

rust
// tower/src/load_shed/mod.rs:43-54
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    // We check for readiness here, so that we can know in `call` if
    // the inner service is overloaded or not.
    self.is_ready = match self.inner.poll_ready(cx) {
        Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
        r => r.is_ready(),
    };

    // But we always report Ready, so that layers above don't wait until
    // the inner service is ready (the entire point of this layer!)
    Poll::Ready(Ok(()))
}

看这一行代码:Poll::Ready(Ok(())) —— LoadShed 的 poll_ready 永远返回 Ready

这是 LoadShed 的灵魂:它违反了 Tower 协议的精神(poll_ready Ready 应该代表"真的能接"),但这个违反是有意的——LoadShed 的存在就是为了绕过背压,让上层调用方永远不挂起。

具体流程:

  1. 内部先 poll 一次 inner,把结果存在 self.is_ready 里。
  2. 无论 inner 是 Ready 还是 Pending,都向调用方报告 Ready。
  3. 如果 inner 报错,这种"不可恢复"的信号还是要传上去——否则调用方会以为还能用。

6.3.3 call:真相的时刻

rust
// tower/src/load_shed/mod.rs:56-64
fn call(&mut self, req: Req) -> Self::Future {
    if self.is_ready {
        // readiness only counts once, you need to check again!
        self.is_ready = false;
        ResponseFuture::called(self.inner.call(req))
    } else {
        ResponseFuture::overloaded()
    }
}

到了 call,LoadShed 才查看上一次 poll_ready 缓存的结果:

  • is_ready == true:inner 那时是 ready 的——转交请求。注意 self.is_ready = false——一次许可只能用一次,下次 call 必须再 poll_ready。
  • is_ready == false:inner 那时是 pending 的——直接构造一个"已失败"的 future,里面装着 Overloaded 错误。调用方 await 这个 future,立即拿到 Err(Overloaded)

"overloaded future"的实现:

rust
// tower/src/load_shed/future.rs:22-31
pin_project! {
    #[project = ResponseStateProj]
    enum ResponseState<F> {
        Called { #[pin] fut: F },
        Overloaded,
    }
}
// poll 实现:
match self.project().state.project() {
    ResponseStateProj::Called { fut } => fut.poll(cx).map_err(Into::into),
    ResponseStateProj::Overloaded => Poll::Ready(Err(Overloaded::new().into())),
}

两个变体的 future——Called 是真实工作的 future,Overloaded 是一个立即 Ready 的常量 future。整个"拒绝"路径是零分配的(只构造一个 unit variant),性能代价小到可以忽略。

6.3.4 LoadShed 的用场

读过 poll_ready 永远 Ready 这个设计之后,你可能会担心 LoadShed 是不是"把 Tower 协议破坏了"。确实——但只在 LoadShed 自己这一层。外层看到的是一个"总是就绪"的服务,内层的真实就绪信号被 LoadShed 吞掉并转化成 Overloaded 错误。

合适的使用场景:HTTP 服务端。HTTP 请求到达之后,你很少希望连接层 pending——更合理的是立即响应 503。用户自然会在客户端 retry 或者显示"服务繁忙"。

实战组合:

rust
let svc = ServiceBuilder::new()
    .load_shed()                     // 上层始终 Ready → 请求永不等待
    .concurrency_limit(100)          // 内层有真实容量限制
    .service(expensive_handler);

concurrency_limit 会在 100 个 in-flight 时报告 Pending;这个 Pending 被 load_shed 吞掉——转换成 Err(Overloaded) 返回给调用方。调用方(比如 hyper 的 HTTP 层)立刻拿到错误,产生 HTTP 503 返回客户端。整个链条:"服务满 → 立即 503",不挂起、不排队。

不合适的使用场景:客户端代码。客户端调用远端服务,绝大多数情况下你希望"等一下,等远端有空"——这恰好是 ConcurrencyLimit 或 Buffer 的作用。LoadShed 会让客户端直接放弃,可能导致大量无意义的 retry。

6.4 三者对比:一个表讲清

策略中间件poll_ready 的行为call 的行为典型用场
挂起ConcurrencyLimit有许可时 Ready;无许可时 Pending消费许可客户端,公平等待
排队Buffer有 channel 槽时 Ready;无槽时 Pending送消息,等 oneshot!CloneClone,或延迟处理
拒绝LoadShed总是 Ready按缓存决定转发或返回 OverloadedHTTP 服务端,流量管控

这三种策略对应三种完全不同的系统观:

挂起是"平滑"的——上游 task 挂起在 waker 上,不浪费线程,不消耗内存,但可能让上游看起来"卡住了",如果上游是一个 HTTP 连接,连接可能被客户端判断为超时

排队是"平滑但可能累积"的——请求不会丢,但队列会积累。适合批处理、异步工作流。但 bound 设得太大时,内存可能吃爆;bound 设得太小时,又退化成"背压传到上游"。

拒绝是"决断"的——系统任何时候都快速响应,不积累、不挂起。但失败率会上升,对客户端友好度取决于客户端有没有正确的 retry 逻辑。

实战里往往组合使用。经典 HTTP server 模式:

rust
// 顶层:对上游永远 Ready,防止 TCP 连接累积
.load_shed()
// 中层:设置一个排队缓冲,吸收 burst
.buffer(500)
// 内层:限制真实并发,保护 CPU 和下游
.concurrency_limit(50)
.service(router)

请求流量的命运:

  1. 前 50 个 → 立刻进入 handler。
  2. 第 51-550 个 → 进队列(buffer 500 个槽)。
  3. 第 551 个 → 触发 LoadShed 拒绝(buffer.poll_ready 返回 Pending,被 load_shed 转成 Overloaded 错误)。

三层结合得到一个"平滑吸收 burst、硬拒绝雪崩"的栈。

6.5 深入:Worker 的 failed() 协议

Buffer 里有一段值得独立讨论的代码——Worker::failed()(worker.rs:106-137)。它处理的是"inner service 永久报错之后,怎么让所有使用这个 Buffer 的调用方都知道"。

rust
fn failed(&mut self, error: crate::BoxError) {
    let error = ServiceError::new(error);
    let mut inner = self.handle.inner.lock().unwrap();

    if inner.is_some() { return; }   // 已经报错过,避免重复处理

    *inner = Some(error.clone());
    drop(inner);

    self.rx.close();   // 关闭 channel——后续 send 会失败

    self.failed = Some(error);
}

注释里有一段工程思考非常精彩(worker.rs:112-118,手动翻译):

必须处理 "handle 并发地尝试 send 请求" 的场景。我们希望 要么 请求 send 失败、要么 它能从自己的 oneshot 上收到错误。要避免的 race 是:我们先给所有在等的请求广播错误,然后 调用方才发来它的 send——这时它拿不到错误,永远挂起。

解决方法: 暴露错误、然后 关闭 channel(所以后续 send 会 fail)、然后 给已经在等的 oneshot 发错误。

这段话读三遍值得——它在讲一个分布式系统里"状态广播 + 并发写入"的经典 race condition,在 Tower 这种纯 in-process actor 模式里依然存在。解决方式很细致:

  1. 先写 handle.inner = Some(error):任何新来的调用方 poll_ready 检查 tx.is_closed()... 不对,这时 channel 还没关呢。
  2. 再关 channel (self.rx.close()):此后 tx.send_item() 都会立刻失败——新 call 走 Err(_) => ResponseFuture::failed(...) 分支,self.get_worker_error() 从 handle 读到 error。
  3. self.failed:worker 后续 pop 出的消息会被这个 self.failed 拦住,直接回错误。

这个顺序之所以正确,是因为"更新顺序和读顺序一致"——调用方的流程是 tx.send → 检查结果 → 读 handle 错误。worker 的广播顺序是 handle 写 → channel 关闭。调用方 send 失败之后再读 handle,一定能读到错误。

写 Rust 并发代码时这类"先写可见字段,再触发关闭信号"的模式在 crossbeam、tokio 源码里反复出现。对照卷四《Tokio 源码深度解析》第 13 章 channel 源码 讨论 broadcast/oneshot 关闭顺序——模式是相通的。

6.6 一些容量工程的"反常识"

读完三种策略和它们的源码,给你三条容易被忽视的经验。

6.6.1 Buffer 不是"无限容量"的替身

新手最容易的误解是把 Buffer 当"加大吞吐"的魔法——"我加一个 buffer(10000),吞吐就能上来"。实际上:

  • Buffer 不能加速 inner service——worker 是串行 poll inner 的。
  • Buffer 只能吸收 burst,不能提升稳态吞吐。稳态 throughput = min(producer_rate, consumer_rate)——buffer 大了只让"满起来慢一点"。
  • Buffer 大 → 请求在队列里等的时间长 → 尾延迟炸掉。p99 latency 受 buffer 深度直接影响。

如果你真的需要"更高的稳态吞吐",要么加并行度(水平扩展)、要么优化 inner service。Buffer 只是"容量缓冲",不是"容量放大器"。

6.6.2 ConcurrencyLimit 对慢请求敏感

假设你设 .concurrency_limit(100) 期望"最多 100 个并发请求"。但 "100 个并发" 只是一个字面意思——它不限制每个请求的耗时。如果下游变慢(从平均 50ms 涨到 500ms),100 个槽会被"满上"——总吞吐从 2000 QPS 变成 200 QPS。

真实事故案例:数据库慢查询。平时 100 个 DB 并发完全够用,某天执行计划突然变差,单查从 10ms 变 1s——concurrency_limit(100) 以为还能接 100 个并发请求,但实际上吞吐崩掉了。上游看起来正常(每次 call 都能 ready),但客户端看到的每个请求都变慢了 100 倍。

防守方法:ConcurrencyLimit 一般要配 Timeout 使用。单个请求超时了就释放许可,防止慢请求独占槽位。Tower 官方 guides 里几乎所有 ConcurrencyLimit 示例都配 Timeout 就是这个原因。

6.6.3 LoadShed 在客户端通常错了

第三个"反常识":LoadShed 很少应该出现在客户端代码里。

客户端调用远端服务,偶尔的容量限制(连接池满、inflight 达上限)是正常的——等一下就好。LoadShed 把所有"暂时满"都翻译成错误——客户端代码需要在每一次调用处判断"如果是 Overloaded 要不要 retry 要不要冒泡",非常不友好。

LoadShed 的正确位置几乎总是最外层 HTTP 服务端——在这个位置拒绝比挂起 TCP 连接更健康。在客户端用 ConcurrencyLimit 或者 Buffer,让调用方安静地等待,直到真的满太久才通过 Timeout 放弃。

6.7 落到你键盘上

本章读完之后:

  • tower/src/buffer/future.rs——很短,是 Buffer 调用方等 oneshot 的 future 实现。里面有一个细节:如果 worker drop 了而 oneshot 还没发消息,调用方怎么识别?答案在那一行 rx.is_closed() 判断里。
  • 写一段压测:用 .concurrency_limit(10).load_shed().concurrency_limit(10) 各 wrap 一个延迟 100ms 的 handler,分别发 1000 并发请求。第一种所有请求都会慢慢处理完;第二种 90% 直接拿到 Overloaded。两者延迟分布、成功率曲线完全不同——这就是策略的力量。
  • 在你生产项目里盘一遍:哪些地方用 concurrency_limit 但没配 timeout?哪些地方 buffer 开得过大?哪些地方该用 load_shed 却没用?这是一本书读下来最值钱的环节。

下一章我们继续往 Tower 的深水区走——BalanceDiscoverready_cache,这一组中间件把"多个后端 endpoints"织成一个"单一 Service",是微服务里 client-side load balancing 的基础。

基于 VitePress 构建