Appearance
第6章 Buffer / LoadShed / ConcurrencyLimit:容量工程
6.1 当服务饱和时,三种活法
上一章我们把三个"按规则节制"的中间件读完了——Timeout、Retry、RateLimit。这一章要讨论的是另一组问题:
当服务真的满了,你选择怎么办?
这是分布式系统里最古老也最哲学的问题之一。直觉上大多数人会说"限流呗",但真正坐下来审视细节,会发现"服务满了"可以用截然不同的方式应对:
- 排队等(Buffer)——"我先把请求记下来,等服务有空时依次处理。"
- 直接拒(LoadShed)——"现在忙不过来,请你 502 回去吧,等会再试。"
- 挂起等(ConcurrencyLimit)——"别急,你的请求先挂着,轮到你的时候我再接。"
三种策略对应三种真实场景。不同场景下选错策略的代价可能非常昂贵:排队可能导致雪崩、拒绝可能丢掉容易处理的请求、挂起可能让上游大量 TCP 连接堆积。
Tower 把这三种策略都实现成了中间件:Buffer、LoadShed、ConcurrencyLimit。这一章我们把前两个读透,并和上一章已经读过的 ConcurrencyLimit(第 4 章也讲过它的 permit 模型)合在一起对比。三个加起来不到 400 行代码,但它们是整个 Rust 生产环境容量工程的基石。
源码引用来自 tower 0.5.3 (tower/src/buffer/、tower/src/load_shed/)。
6.2 Buffer:把 mpsc 和 oneshot 缝起来
Buffer 的核心观察是:tokio 的 mpsc::channel(bound) 本身就是一个背压感知的队列。消费端处理不过来时,生产端 send() 会返回 Pending——调用方可以在 task 层面自然挂起。Tower 的 Buffer 就是"把 mpsc 通道嫁接到 Service 接口上"。
6.2.1 两半结构
rust
// tower/src/buffer/service.rs:18-22
pub struct Buffer<Req, F> {
tx: PollSender<Message<Req, F>>,
handle: Handle,
}Buffer 本身不持有被包裹的 Service——它只持有一个 channel 的发送端和一个 error handle。真正的 Service 活在另一个地方:Worker<T, Request>,一个独立的 tokio task。
这是 Buffer 最核心的架构选择:把 Service 消费和请求分发拆成两个独立的 async 实体。
┌──────────────┐ ┌──────────┐
│ Buffer │ │ Worker │
caller ──▶│ (Clone) │──mpsc──▶ │ │──call──▶ inner Service
│ (Clone) │ │ │
└──────────────┘ └──────────┘- 调用方(左侧)拿一个
Buffer实例,调用poll_ready+call。 poll_ready会对 mpsc 的 sender 做poll_reserve——预留一个槽位。call构造一个Message { request, span, tx_oneshot },通过 mpsc 送给 worker。返回值是一个等rx_oneshot的 future。- Worker(右侧)在自己的 task 里
poll_recv收请求,对 inner service 做poll_ready+call,把 inner 的 response future 通过 oneshot 送回。
这是一个经典的 actor 模式——worker 是 actor,Buffer 是"client handle"。调用方只通过 handle 送消息,内部 actor 串行处理。
6.2.2 为什么 Buffer 能让任何 Service 变 Clone
rust
// tower/src/buffer/service.rs:133-144
impl<Req, F> Clone for Buffer<Req, F>
where Req: Send + 'static, F: Send + 'static,
{
fn clone(&self) -> Self {
Self {
handle: self.handle.clone(),
tx: self.tx.clone(),
}
}
}Buffer<Req, F> 可以 Clone——不管底下的 Service 能不能 Clone。这在 Retry(需要 S: Clone)、Balance(需要给每个 endpoint 一个 clone)等场景里至关重要。
工作原理:clone 的只是 PollSender 和 Handle——mpsc 的 sender 和错误句柄都是廉价可 clone 的引用。多个 Buffer clone 共享同一个底层 worker,请求通过 channel 汇聚到 worker 串行处理。
在第 5 章我们读 Retry 源码时已经提到过:如果 inner service 自身不 Clone(比如 AuthClient,内部有大型配置),可以套一层 Buffer:
rust
let svc = ServiceBuilder::new()
.layer(RetryLayer::new(policy))
.buffer(100) // ← 在这里 Clone 藏进 Arc+mpsc
.service(heavy_auth_client);Retry 只看到"一个可 Clone 的 Buffer"——实际上底下 heavy_auth_client 只有一份,所有 Retry 的 clone 都通过 channel 排队进同一个 worker。
6.2.3 poll_ready:向 channel 要槽位
rust
// tower/src/buffer/service.rs:97-108
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// First, check if the worker is still alive.
if self.tx.is_closed() {
return Poll::Ready(Err(self.get_worker_error()));
}
// Poll the sender to acquire a permit.
self.tx
.poll_reserve(cx)
.map_err(|_| self.get_worker_error())
}PollSender::poll_reserve(cx) 是 tokio-util 提供的一个 API,把 tokio 的 mpsc::Sender::reserve() 异步能力包装成 poll_fn 形式。语义是"异步地为这一 Buffer 客户端预留一个 mpsc 槽位"。
Buffer 的 poll_ready 做两件事:
- 检查 worker 是否还活着(channel 是否关闭)。如果 worker 已经挂掉(可能因为底层 Service 报错),直接返回"我坏了"的错误。
- poll_reserve 一个 channel 槽位。如果 mpsc 已经满(达到 bound),返回 Pending——调用方被挂起,等有槽位再被 waker 唤醒。
这一步完美接上了 Buffer 的背压语义:当 worker 来不及处理、channel 里积累到 bound 个未处理消息时,新的 poll_ready 会 Pending——上游的 Tower 栈可以根据这个信号决定是继续挂起还是做别的决定。
6.2.4 call:送消息 + 等响应
rust
// tower/src/buffer/service.rs:110-131
fn call(&mut self, request: Req) -> Self::Future {
let span = tracing::Span::current();
let (tx, rx) = oneshot::channel();
match self.tx.send_item(Message { request, span, tx }) {
Ok(_) => ResponseFuture::new(rx),
Err(_) => {
tracing::trace!("buffer channel closed");
ResponseFuture::failed(self.get_worker_error())
}
}
}几个细节:
tracing::Span::current():把当前 tracing span 捎带到 worker。不这么做的话,worker task 里的 tracing 事件不会关联到发起请求的 span。这是一个跨 task tracing 的惯用技巧——span 是Clone的,传入 message 里,worker 收到后_guard = msg.span.enter()进入这个 span。(tx, rx) = oneshot::channel():每个请求对应一个一次性 channel。worker 把 inner service 返回的 response future 通过 tx 送回,调用方在 rx 上等。send_item:这里已经通过poll_reserve拿到了槽位,所以 send 是同步成功的——不可能阻塞,不可能 Pending。协议保证call前必走poll_ready,poll_ready拿到槽位后一直到call都是"已预留"状态。
6.2.5 Worker:真正的循环
rust
// tower/src/buffer/worker.rs:147-206 精简
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.finish { return Poll::Ready(()); }
loop {
match ready!(self.poll_next_msg(cx)) {
Some((msg, _first)) => {
let _guard = msg.span.enter();
if let Some(ref failed) = self.failed {
let _ = msg.tx.send(Err(failed.clone()));
continue;
}
match self.service.poll_ready(cx) {
Poll::Ready(Ok(())) => {
let response = self.service.call(msg.request);
let _ = msg.tx.send(Ok(response));
}
Poll::Pending => {
// 把消息放回去,挂起 worker
drop(_guard);
self.current_message = Some(msg);
return Poll::Pending;
}
Poll::Ready(Err(e)) => {
let error = e.into();
drop(_guard);
self.failed(error);
let _ = msg.tx.send(Err(self.failed.as_ref().unwrap().clone()));
}
}
}
None => {
self.finish = true;
return Poll::Ready(());
}
}
}
}Worker 的 Future::poll 本身是一个 loop——每次 poll 尽可能消费所有待处理消息,直到要么所有消息处理完、要么 service 不 ready。
核心三段式:
- 拿下一个消息(
poll_next_msg)。如果 channel 关了(所有 Buffer 实例都 drop 了),返回 None。 - 检查 service 状态:如果已经
failed,直接把错误送回 oneshot。 - poll_ready + call:
Ready(Ok):调用inner.call(req),把 response future 通过 oneshot 送回。注意!这里送回的是 future,不是 response——真正的 await 发生在调用方的ResponseFuture。Pending:把消息放回self.current_message,return Pending。下次 worker 被 wake 时再尝试。Ready(Err):service 永久坏了。走failed()流程——关闭 channel、把错误广播给所有还在等的 oneshot。
current_message 字段非常巧妙——它解决了"已经从 channel 取出但 service 还没就绪"的尴尬。不能把消息再送回 channel(顺序就乱了),必须缓存在 worker 里等 service 就绪再 call。
6.2.6 cancel 的三种路径
Buffer 支持三种请求取消:
- 调用方 drop 了 ResponseFuture(还没送到 worker 就不想要了)。worker 下次
poll_next_msg时会检查msg.tx.is_closed()——如果调用方已经 drop 了 rx,oneshot 的 tx 会变成 closed,worker 把这条消息直接跳过:
rust
// worker.rs:81-91
if !msg.tx.is_closed() {
return Poll::Ready(Some((msg, false)));
}
tracing::trace!("dropping cancelled buffered request");- Worker 报错:调用
self.failed(error),后续所有消息收到错误。 - 所有 Buffer clone drop:channel 关闭,worker 收到 None,正常结束。
这三种路径不需要额外的"cancel 信号"通道——Rust 的 drop 语义和 oneshot/mpsc 的 closed 检测完整覆盖。
6.2.7 Buffer 的 bound 该设多少
rust
/// A note on choosing a `bound`
///
/// When [`Buffer`]'s implementation of [`poll_ready`] returns [`Poll::Ready`], it reserves a
/// slot in the channel for the forthcoming [`call`]. However, if this call doesn't arrive,
/// this reserved slot may be held up for a long time.这段文档里的警告值得强调:poll_ready 返回 Ready 后,那个槽位就被"预留"了。如果调用方 poll_ready 后没有立刻 call(比如被 cancel 了),那个槽位还占着——直到 PollSender 的 drop 释放。
实战推论:bound 要 >= 预期的并发 poll_ready 调用方数量。如果你有 200 个并发 task 都在 buffer_svc.ready().await,bound 设成 100,会有 100 个 task 被堵在 poll_ready 的 Pending 上——正确,但不是你想要的"先全部 ready,然后慢慢处理"的效果。
一般经验:bound >= 2 × 预期峰值并发数。留出冗余给那些"ready 了但还没 call"的短暂态。
6.3 LoadShed:饱和时立即拒绝
Buffer 的哲学是"排队",LoadShed 的哲学刚好相反——"满了就拒"。整段源码加起来不到 150 行。
6.3.1 整体结构
rust
// tower/src/load_shed/mod.rs:17-20
#[derive(Debug)]
pub struct LoadShed<S> {
inner: S,
is_ready: bool,
}两个字段:inner + 一个 bool。就这么多。
is_ready 是一个缓存——它记录"上一次 inner.poll_ready 返回 Ready 了吗"。这个细节让 LoadShed 的整个哲学可行。
6.3.2 颠覆性的 poll_ready
rust
// tower/src/load_shed/mod.rs:43-54
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// We check for readiness here, so that we can know in `call` if
// the inner service is overloaded or not.
self.is_ready = match self.inner.poll_ready(cx) {
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
r => r.is_ready(),
};
// But we always report Ready, so that layers above don't wait until
// the inner service is ready (the entire point of this layer!)
Poll::Ready(Ok(()))
}看这一行代码:Poll::Ready(Ok(())) —— LoadShed 的 poll_ready 永远返回 Ready。
这是 LoadShed 的灵魂:它违反了 Tower 协议的精神(poll_ready Ready 应该代表"真的能接"),但这个违反是有意的——LoadShed 的存在就是为了绕过背压,让上层调用方永远不挂起。
具体流程:
- 内部先 poll 一次 inner,把结果存在
self.is_ready里。 - 无论 inner 是 Ready 还是 Pending,都向调用方报告 Ready。
- 如果 inner 报错,这种"不可恢复"的信号还是要传上去——否则调用方会以为还能用。
6.3.3 call:真相的时刻
rust
// tower/src/load_shed/mod.rs:56-64
fn call(&mut self, req: Req) -> Self::Future {
if self.is_ready {
// readiness only counts once, you need to check again!
self.is_ready = false;
ResponseFuture::called(self.inner.call(req))
} else {
ResponseFuture::overloaded()
}
}到了 call,LoadShed 才查看上一次 poll_ready 缓存的结果:
is_ready == true:inner 那时是 ready 的——转交请求。注意self.is_ready = false——一次许可只能用一次,下次 call 必须再 poll_ready。is_ready == false:inner 那时是 pending 的——直接构造一个"已失败"的 future,里面装着Overloaded错误。调用方 await 这个 future,立即拿到Err(Overloaded)。
"overloaded future"的实现:
rust
// tower/src/load_shed/future.rs:22-31
pin_project! {
#[project = ResponseStateProj]
enum ResponseState<F> {
Called { #[pin] fut: F },
Overloaded,
}
}
// poll 实现:
match self.project().state.project() {
ResponseStateProj::Called { fut } => fut.poll(cx).map_err(Into::into),
ResponseStateProj::Overloaded => Poll::Ready(Err(Overloaded::new().into())),
}两个变体的 future——Called 是真实工作的 future,Overloaded 是一个立即 Ready 的常量 future。整个"拒绝"路径是零分配的(只构造一个 unit variant),性能代价小到可以忽略。
6.3.4 LoadShed 的用场
读过 poll_ready 永远 Ready 这个设计之后,你可能会担心 LoadShed 是不是"把 Tower 协议破坏了"。确实——但只在 LoadShed 自己这一层。外层看到的是一个"总是就绪"的服务,内层的真实就绪信号被 LoadShed 吞掉并转化成 Overloaded 错误。
合适的使用场景:HTTP 服务端。HTTP 请求到达之后,你很少希望连接层 pending——更合理的是立即响应 503。用户自然会在客户端 retry 或者显示"服务繁忙"。
实战组合:
rust
let svc = ServiceBuilder::new()
.load_shed() // 上层始终 Ready → 请求永不等待
.concurrency_limit(100) // 内层有真实容量限制
.service(expensive_handler);concurrency_limit 会在 100 个 in-flight 时报告 Pending;这个 Pending 被 load_shed 吞掉——转换成 Err(Overloaded) 返回给调用方。调用方(比如 hyper 的 HTTP 层)立刻拿到错误,产生 HTTP 503 返回客户端。整个链条:"服务满 → 立即 503",不挂起、不排队。
不合适的使用场景:客户端代码。客户端调用远端服务,绝大多数情况下你希望"等一下,等远端有空"——这恰好是 ConcurrencyLimit 或 Buffer 的作用。LoadShed 会让客户端直接放弃,可能导致大量无意义的 retry。
6.4 三者对比:一个表讲清
| 策略 | 中间件 | poll_ready 的行为 | call 的行为 | 典型用场 |
|---|---|---|---|---|
| 挂起 | ConcurrencyLimit | 有许可时 Ready;无许可时 Pending | 消费许可 | 客户端,公平等待 |
| 排队 | Buffer | 有 channel 槽时 Ready;无槽时 Pending | 送消息,等 oneshot | 让 !Clone 变 Clone,或延迟处理 |
| 拒绝 | LoadShed | 总是 Ready | 按缓存决定转发或返回 Overloaded | HTTP 服务端,流量管控 |
这三种策略对应三种完全不同的系统观:
挂起是"平滑"的——上游 task 挂起在 waker 上,不浪费线程,不消耗内存,但可能让上游看起来"卡住了",如果上游是一个 HTTP 连接,连接可能被客户端判断为超时。
排队是"平滑但可能累积"的——请求不会丢,但队列会积累。适合批处理、异步工作流。但 bound 设得太大时,内存可能吃爆;bound 设得太小时,又退化成"背压传到上游"。
拒绝是"决断"的——系统任何时候都快速响应,不积累、不挂起。但失败率会上升,对客户端友好度取决于客户端有没有正确的 retry 逻辑。
实战里往往组合使用。经典 HTTP server 模式:
rust
// 顶层:对上游永远 Ready,防止 TCP 连接累积
.load_shed()
// 中层:设置一个排队缓冲,吸收 burst
.buffer(500)
// 内层:限制真实并发,保护 CPU 和下游
.concurrency_limit(50)
.service(router)请求流量的命运:
- 前 50 个 → 立刻进入 handler。
- 第 51-550 个 → 进队列(buffer 500 个槽)。
- 第 551 个 → 触发 LoadShed 拒绝(buffer.poll_ready 返回 Pending,被 load_shed 转成 Overloaded 错误)。
三层结合得到一个"平滑吸收 burst、硬拒绝雪崩"的栈。
6.5 深入:Worker 的 failed() 协议
Buffer 里有一段值得独立讨论的代码——Worker::failed()(worker.rs:106-137)。它处理的是"inner service 永久报错之后,怎么让所有使用这个 Buffer 的调用方都知道"。
rust
fn failed(&mut self, error: crate::BoxError) {
let error = ServiceError::new(error);
let mut inner = self.handle.inner.lock().unwrap();
if inner.is_some() { return; } // 已经报错过,避免重复处理
*inner = Some(error.clone());
drop(inner);
self.rx.close(); // 关闭 channel——后续 send 会失败
self.failed = Some(error);
}注释里有一段工程思考非常精彩(worker.rs:112-118,手动翻译):
必须处理 "handle 并发地尝试 send 请求" 的场景。我们希望 要么 请求 send 失败、要么 它能从自己的 oneshot 上收到错误。要避免的 race 是:我们先给所有在等的请求广播错误,然后 调用方才发来它的 send——这时它拿不到错误,永远挂起。
解决方法:先 暴露错误、然后 关闭 channel(所以后续 send 会 fail)、然后 给已经在等的 oneshot 发错误。
这段话读三遍值得——它在讲一个分布式系统里"状态广播 + 并发写入"的经典 race condition,在 Tower 这种纯 in-process actor 模式里依然存在。解决方式很细致:
- 先写 handle.inner = Some(error):任何新来的调用方
poll_ready检查tx.is_closed()... 不对,这时 channel 还没关呢。 - 再关 channel (
self.rx.close()):此后tx.send_item()都会立刻失败——新 call 走Err(_) => ResponseFuture::failed(...)分支,self.get_worker_error()从 handle 读到 error。 - 设
self.failed:worker 后续 pop 出的消息会被这个self.failed拦住,直接回错误。
这个顺序之所以正确,是因为"更新顺序和读顺序一致"——调用方的流程是 tx.send → 检查结果 → 读 handle 错误。worker 的广播顺序是 handle 写 → channel 关闭。调用方 send 失败之后再读 handle,一定能读到错误。
写 Rust 并发代码时这类"先写可见字段,再触发关闭信号"的模式在 crossbeam、tokio 源码里反复出现。对照卷四《Tokio 源码深度解析》第 13 章 channel 源码 讨论 broadcast/oneshot 关闭顺序——模式是相通的。
6.6 一些容量工程的"反常识"
读完三种策略和它们的源码,给你三条容易被忽视的经验。
6.6.1 Buffer 不是"无限容量"的替身
新手最容易的误解是把 Buffer 当"加大吞吐"的魔法——"我加一个 buffer(10000),吞吐就能上来"。实际上:
- Buffer 不能加速 inner service——worker 是串行 poll inner 的。
- Buffer 只能吸收 burst,不能提升稳态吞吐。稳态 throughput = min(producer_rate, consumer_rate)——buffer 大了只让"满起来慢一点"。
- Buffer 大 → 请求在队列里等的时间长 → 尾延迟炸掉。p99 latency 受 buffer 深度直接影响。
如果你真的需要"更高的稳态吞吐",要么加并行度(水平扩展)、要么优化 inner service。Buffer 只是"容量缓冲",不是"容量放大器"。
6.6.2 ConcurrencyLimit 对慢请求敏感
假设你设 .concurrency_limit(100) 期望"最多 100 个并发请求"。但 "100 个并发" 只是一个字面意思——它不限制每个请求的耗时。如果下游变慢(从平均 50ms 涨到 500ms),100 个槽会被"满上"——总吞吐从 2000 QPS 变成 200 QPS。
真实事故案例:数据库慢查询。平时 100 个 DB 并发完全够用,某天执行计划突然变差,单查从 10ms 变 1s——concurrency_limit(100) 以为还能接 100 个并发请求,但实际上吞吐崩掉了。上游看起来正常(每次 call 都能 ready),但客户端看到的每个请求都变慢了 100 倍。
防守方法:ConcurrencyLimit 一般要配 Timeout 使用。单个请求超时了就释放许可,防止慢请求独占槽位。Tower 官方 guides 里几乎所有 ConcurrencyLimit 示例都配 Timeout 就是这个原因。
6.6.3 LoadShed 在客户端通常错了
第三个"反常识":LoadShed 很少应该出现在客户端代码里。
客户端调用远端服务,偶尔的容量限制(连接池满、inflight 达上限)是正常的——等一下就好。LoadShed 把所有"暂时满"都翻译成错误——客户端代码需要在每一次调用处判断"如果是 Overloaded 要不要 retry 要不要冒泡",非常不友好。
LoadShed 的正确位置几乎总是最外层 HTTP 服务端——在这个位置拒绝比挂起 TCP 连接更健康。在客户端用 ConcurrencyLimit 或者 Buffer,让调用方安静地等待,直到真的满太久才通过 Timeout 放弃。
6.7 落到你键盘上
本章读完之后:
- 读
tower/src/buffer/future.rs——很短,是 Buffer 调用方等 oneshot 的 future 实现。里面有一个细节:如果 worker drop 了而 oneshot 还没发消息,调用方怎么识别?答案在那一行rx.is_closed()判断里。 - 写一段压测:用
.concurrency_limit(10)和.load_shed().concurrency_limit(10)各 wrap 一个延迟 100ms 的 handler,分别发 1000 并发请求。第一种所有请求都会慢慢处理完;第二种 90% 直接拿到 Overloaded。两者延迟分布、成功率曲线完全不同——这就是策略的力量。 - 在你生产项目里盘一遍:哪些地方用 concurrency_limit 但没配 timeout?哪些地方 buffer 开得过大?哪些地方该用 load_shed 却没用?这是一本书读下来最值钱的环节。
下一章我们继续往 Tower 的深水区走——Balance、Discover、ready_cache,这一组中间件把"多个后端 endpoints"织成一个"单一 Service",是微服务里 client-side load balancing 的基础。