Appearance
第14章 Pool 内部实现:空闲队列、公平性、驱逐策略
"The hardest part of writing a connection pool is knowing when to trust your state— and having a RAII guard ready for when you can't." —— 读懂 sqlx PoolInner 的人共同的领悟
本章要点
PoolInner<DB>(sqlx-core/src/pool/inner.rs:26-37)10 个字段:connect_options(RwLock)+ idle_conns(ArrayQueue)+ semaphore(AsyncSemaphore)+ size / num_idle(atomics)+ is_closed + on_closed(event_listener)+ options + 两个日志 Level。idle_conns: ArrayQueue<Idle<DB>>使用 crossbeam-queue 的无锁环形队列——固定容量(= max_connections)、多生产者多消费者、零分配。比Mutex<VecDeque>在高并发下避开 lock contention 长尾。semaphore: AsyncSemaphore(sqlx-core/src/sync.rs)是 sqlx 对 Tokio / async-std 的统一异步信号量包装。fair: bool决定排队是 FIFO(公平)还是 LIFO(抢占)。默认 true。- 三个状态类型:
Live<DB>(借出中)/Idle<DB>(空闲队列里)/Floating<DB, C>(过渡态,附带 DecrementSizeGuard RAII)。状态转换通过float()/into_live()/into_idle()方法——每次转换是类型级迁移。 DecrementSizeGuard<DB>(inner.rs:573-585)是 Pool 最重要的 RAII guard——持有"已经增了 size 计数"的承诺,如果 Drop 前没有 cancel,自动 decrement——让"建新连接失败时 size 不会泄漏"成为不变量。PoolConnection::Drop自动 spawn 归还任务(connection.rs:199-213)—— drop 时 take 出live、spawn 一个return_to_poolfuture,异步归还到 idle queue。归还过程可能调用after_release回调做清理。spawn_maintenance_tasks(inner.rs:503-572)—— 启动时 spawn 一个后台任务,按min(idle_timeout, max_lifetime)为周期扫描 idle queue,关掉过期连接;同时维持min_connections下限。- 连接建立的退避 backoff(
inner.rs:321-398)——connect 失败用指数退避(10ms → 20ms → 40ms → ... 最多 acquire_timeout/5)重试,避免雪崩。
14.1 问题引入:Pool 内部的五件事
第 13 章讲完 Pool 的外部 API——用户能调的所有方法。但每次 pool.acquire().await? 底下究竟发生了什么?Pool 内部至少在做五件事:
- 管理一组连接:有空闲的(idle)、有借出去的(in-use),两类加起来不能超过 max_connections。
- 协调并发:100 个 async task 同时 acquire,谁先拿到、谁等、等多久——信号量(Semaphore)解决。
- 跟踪连接状态:一条连接从"空闲队列里"到"被借出用"到"归还回来"到"过期被关"——状态机。
- 后台维护:空闲太久的连接要关、太老的连接要退休、min_connections 要补齐——后台 task 做。
- 安全兜底:网络断、panic、cancel——每种异常路径都不能让 size 计数泄漏、不能让连接凭空消失。
sqlx Pool 内部用几个精心选择的并发原语把这五件事串起来:ArrayQueue(无锁队列)、AsyncSemaphore(公平异步信号量)、RAII guard(DecrementSizeGuard)、原子计数(AtomicU32 / AtomicUsize / AtomicBool)、event_listener(close 事件)。本章把它们逐一拆开。
14.2 PoolInner<DB> 的 10 个字段
sqlx-core/src/pool/inner.rs:26-37:
rust
pub(crate) struct PoolInner<DB: Database> {
pub(super) connect_options: RwLock<Arc<<DB::Connection as Connection>::Options>>,
pub(super) idle_conns: ArrayQueue<Idle<DB>>,
pub(super) semaphore: AsyncSemaphore,
pub(super) size: AtomicU32,
pub(super) num_idle: AtomicUsize,
is_closed: AtomicBool,
pub(super) on_closed: event_listener::Event,
pub(super) options: PoolOptions<DB>,
pub(crate) acquire_time_level: Option<Level>,
pub(crate) acquire_slow_level: Option<Level>,
}10 个字段(最后两个 Level 是同一类日志配置的两面):
状态存储:
idle_conns:空闲连接的队列。size:当前 pool 持有的连接总数(atomic)。num_idle:空闲连接数(atomic)。
并发原语:
semaphore:控制 max_connections 并发。on_closed:close 事件的广播器。
配置:
connect_options:可热更新的连接参数。options:Pool 自身的配置(max_connections 等)。- 两个 Level:日志粒度。
标志:
is_closed:atomic bool,标记 pool 已关闭。
这 10 个字段加起来才 200 字节左右(idle_conns 和 semaphore 分别各几十字节头 + 堆分配内容)。PoolInner 本身很小、扩展性来自它内部的 ArrayQueue 和 AsyncSemaphore——这两者可以扩展到 max_connections 条连接的容量。
14.3 idle_conns: ArrayQueue 的无锁队列
inner.rs:27 的 idle_conns: ArrayQueue<Idle<DB>>——来自 crossbeam_queue crate的无锁环形队列。
为什么选 ArrayQueue 而不是 Mutex<VecDeque>?
ArrayQueue 是 bounded MPMC queue(多生产者多消费者、有界):
- 无锁:push 和 pop 都通过 atomic CAS 完成,不走 Mutex。
- 固定容量:
ArrayQueue::new(capacity)分配一次,之后不扩容。 - 零分配:push / pop 在已分配 buffer 里操作。
Pool 的 idle_conns 容量设为 max_connections(inner.rs:43-44)——最多同时有这么多空闲连接。
为什么这个选择对 Pool 重要——单线程无争抢时两种方案差异很小(都在几十 ns 量级);真正拉开差距的是高并发场景。Mutex<VecDeque> 下多线程同时 push/pop 会串行化到 Mutex 临界区——N 个 waiter 就有 N 倍延迟放大;而 ArrayQueue 基于 CAS 原子操作、多个线程的操作可以真正并行。高吞吐 Web 服务每秒上万次 acquire/release、若用 Mutex 方案在某个 P99 峰值里会看到 lock contention 的长尾。sqlx 选 ArrayQueue 避开这条长尾——代价是"有界 + 满了直接 panic"这个约束(Pool 通过 size counter 在外部已保证不会满、约束天然满足)。
ArrayQueue 的代价:有界(但 Pool 恰好天然有界、不是问题)、push 满了返回 Err(release 源码 inner.rs:230-232 对应处理,满了直接 panic——因为 size counter 理论上保证不会满)。
14.3.1 Idle<DB> 数据
pool/connection.rs:27-35:
rust
pub(super) struct Live<DB: Database> {
pub(super) raw: DB::Connection,
pub(super) created_at: Instant,
}
pub(super) struct Idle<DB: Database> {
pub(super) live: Live<DB>,
pub(super) idle_since: Instant,
}Live:借出中的连接 + 建立时间。 Idle:Live + 进入 idle queue 的时间戳。
created_at 用于 max_lifetime 检查(连接活太久就退休);idle_since 用于 idle_timeout 检查(空闲太久就关掉)。两个 Instant 一起让 maintenance task 能做精确的年龄判断。
14.4 AsyncSemaphore: 公平异步信号量
inner.rs:29 的 semaphore: AsyncSemaphore——来自 sqlx-core/src/sync.rs。它是对 tokio::sync::Semaphore 或 async_std::sync::Semaphore 的统一包装(runtime 多后端支持,第 2 章 §2.5.1 讨论过)。
Semaphore 的作用:限制 acquire 的并发——最多 max_connections 个 permit。超过的 acquire 排队等。
AsyncSemaphore::new(fair, capacity) 的两个参数:
capacity: usize——permit 总数 = max_connections。fair: bool——排队语义:
14.4.1 fair 语义的 FIFO vs LIFO
fair = true(默认):FIFO 排队——先 acquire 的先拿到 permit。
fair = false:非公平——新 acquire 如果有 permit 直接抢;排队中的等更久。
看上去 fair 是"显然更公平"——但 fair = false 其实有其性能优势:
- fair:有 permit release 时必须唤醒队列头——可能是很久没跑的 task、cache cold、唤醒 + 调度开销大。
- 非 fair:有 permit release 时如果刚好有新 acquire 进来、直接给新来的——new acquire 多半 cache warm、延迟低。
sqlx 选默认 fair——避免队列尾饿死(极端并发下非 fair 可能让某些 task 等很久)。这是一个"吞吐 vs 公平性"的经典 trade-off,sqlx 默认取公平。
用户可以通过 PoolOptions::__fair(false) 关闭——但方法名以 __ 开头表明是"内部 / 不建议调"。大多数场景 fair 就是对的。
14.4.2 AsyncSemaphore 对 Tokio Semaphore 的包装
sync.rs 里的核心实现(以 tokio feature 为例):
rust
// 简化
pub struct AsyncSemaphore {
inner: Arc<tokio::sync::Semaphore>,
fair: bool,
}
impl AsyncSemaphore {
pub fn new(fair: bool, capacity: usize) -> Self {
Self { inner: Arc::new(tokio::sync::Semaphore::new(capacity)), fair }
}
pub async fn acquire(&self, n: u32) -> AsyncSemaphoreReleaser<'_> {
// 调 tokio 的 acquire、返回 releaser wrapper
let permit = self.inner.acquire_many(n).await.unwrap();
AsyncSemaphoreReleaser { ... }
}
// ...
}Tokio 的 Semaphore 本身就支持 fair(通过 acquire_many)——sqlx 只是加一层包装、统一 runtime 差异。实际性能特征由 Tokio 本身决定——对 AsyncSemaphore 的分析可以转化成对 Tokio Semaphore 的分析。Tokio 0.3 之后的 Semaphore 是lock-free 的,基于 intrusive linked list——这也是为什么 fair 模式下的排队开销并不大。
14.5 连接状态机:Live / Idle / Floating
三个状态类型(pool/connection.rs:27-40):
rust
pub(super) struct Live<DB: Database> {
pub(super) raw: DB::Connection,
pub(super) created_at: Instant,
}
pub(super) struct Idle<DB: Database> {
pub(super) live: Live<DB>,
pub(super) idle_since: Instant,
}
pub(super) struct Floating<DB: Database, C> {
pub(super) inner: C,
pub(super) guard: DecrementSizeGuard<DB>,
}三种状态的含义:
Live<DB>:连接在用中——raw connection + 建立时间。Idle<DB>:连接在 idle queue 里——Live + 入队时间。Floating<DB, C>:过渡态——连接被"捞出"idle queue 但还没成正式的 PoolConnection、或者 PoolConnection drop 后还没归还——内带 DecrementSizeGuard 做 RAII 兜底。
状态转换图:
五个转换:
- Connecting → Floating_Live:新建连接成功。
- Floating_Live → PoolConnection:包装成用户可见的 PoolConnection。
- PoolConnection → Floating_Live:drop 开始归还。
- Floating_Live → Idle:push 进 idle queue。
- Idle → Floating_Idle:acquire 弹出。
14.5.1 Floating 的 RAII 保证
Floating 的类型参数 C 是 Live 或 Idle 之一——它自己是"我正在处理这个连接,还没决定归宿"的过渡态。
关键在 guard: DecrementSizeGuard<DB>——如果 Floating 被 drop 但没显式 release 或 close,DecrementSizeGuard 的 Drop 会把 pool size 减 1、释放信号量 permit——保证连接计数不泄漏。
这是 Rust RAII 的经典用法——状态变迁本来可能出错(网络断、panic、await 取消),用 RAII guard 兜底让计数器自动修正。
14.6 acquire 的完整实现
inner.rs:238-318 的 acquire 主体(简化后):
rust
pub(super) async fn acquire(self: &Arc<Self>) -> Result<Floating<DB, Live<DB>>, Error> {
if self.is_closed() { return Err(Error::PoolClosed); }
let acquire_started_at = Instant::now();
let deadline = acquire_started_at + self.options.acquire_timeout;
let acquired = crate::rt::timeout(self.options.acquire_timeout, async {
loop {
let permit = self.acquire_permit().await?;
// 1. 先尝试 idle queue
let guard = match self.pop_idle(permit) {
Ok(conn) => match check_idle_conn(conn, &self.options).await {
Ok(live) => return Ok(live), // ✓ 空闲连接可用
Err(guard) => guard, // ✗ 连接有问题,guard 待重建
},
Err(permit) => {
// idle queue 空,尝试建新连接
if let Ok(guard) = self.try_increment_size(permit) {
guard
} else {
// 既没空闲也不能增 size——重试
crate::rt::yield_now().await;
continue;
}
}
};
// 2. 建新连接
return self.connect(deadline, guard).await;
}
}).await.map_err(|_| Error::PoolTimedOut)??;
Ok(acquired)
}三阶段:
- 获取 permit(受 semaphore 限制)。
- 尝试 pop idle——有就用(先
check_idle_conn验证)、没有就尝试try_increment_size决定是否能建新。 - 建连接——调
connect(deadline, guard),失败 backoff 重试。
整段包在 rt::timeout(acquire_timeout, ...) 里——超总时限返回 PoolTimedOut。
14.6.1 check_idle_conn:acquire 时的健康检查
inner.rs:463-502(简化):
rust
async fn check_idle_conn<DB: Database>(
mut conn: Floating<DB, Idle<DB>>,
options: &PoolOptions<DB>,
) -> Result<Floating<DB, Live<DB>>, DecrementSizeGuard<DB>> {
// 1. 检查 max_lifetime
if is_beyond_max_lifetime(&conn, options) {
return Err(conn.close().await.guard);
}
// 2. test_before_acquire
if options.test_before_acquire {
if let Err(_) = conn.inner.live.raw.ping().await {
return Err(conn.close().await.guard);
}
}
// 3. before_acquire 回调
if let Some(callback) = &options.before_acquire {
// 调用回调、按返回 decide keep / drop
}
Ok(conn.into_live())
}三层检查依次过:
- max_lifetime:连接太老——关掉、让上层建新的。
- test_before_acquire:ping 验证(默认开)。
- before_acquire 回调:用户自定义校验(第 13 章 §13.11.2)。
任一失败返回 Err(DecrementSizeGuard)——让上层拿 guard 去建新连接(guard 已经持有"size 已算"的承诺,不用再 try_increment)。
14.7 release 归还
inner.rs:222-234:
rust
pub(super) fn release(&self, floating: Floating<DB, Live<DB>>) {
let Floating { inner: idle, guard } = floating.into_idle();
if self.idle_conns.push(idle).is_err() {
panic!("BUG: connection queue overflow in release()");
}
// 释放 permit(但不减 size)
guard.release_permit();
self.num_idle.fetch_add(1, Ordering::AcqRel);
}三步:
floating.into_idle()把Floating<Live>转成Idle——记录idle_since = Instant::now()。- push 进 idle queue——成功是不变量(size counter 保证不会满)。
guard.release_permit()——释放信号量 permit(让下一个 acquire 能拿)但不减 size(连接还在 pool 里、只是状态变 idle)。num_idle += 1。
注意 release_permit 和 decrement_size 的区别:
- release_permit:归还信号量名额——让并发 limit 恢复。
- decrement_size:实际减少连接总数——只在 connection 真的 close 时发生。
这两者分离是 Pool 设计的精妙之处——信号量限制"并发"、size 限制"总数"。一条连接 idle 时占 size 不占信号量(等它被拿起时信号量才被占)。
14.8 try_increment_size + DecrementSizeGuard
inner.rs:194-217:
rust
pub(super) fn try_increment_size<'a>(
self: &'a Arc<Self>,
permit: AsyncSemaphoreReleaser<'a>,
) -> Result<DecrementSizeGuard<DB>, AsyncSemaphoreReleaser<'a>> {
let result = self.size.fetch_update(Ordering::AcqRel, Ordering::Acquire, |size| {
if self.is_closed() { return None; }
size.checked_add(1).filter(|size| size <= &self.options.max_connections)
});
match result {
Ok(_) => Ok(DecrementSizeGuard::from_permit((*self).clone(), permit)),
Err(_) => Err(permit),
}
}原子 CAS 循环(fetch_update)—— size 已达 max 或 pool 已关返回 None(CAS 失败)、否则成功 +1、返回 DecrementSizeGuard。
DecrementSizeGuard 的 Drop 实现(inner.rs:601-617):
rust
impl<DB: Database> Drop for DecrementSizeGuard<DB> {
fn drop(&mut self) {
if self.cancelled { return; }
// 没被 cancel 就自动减 size
self.pool.size.fetch_sub(1, Ordering::AcqRel);
self.pool.semaphore.release(1);
}
}如果 guard 没被 cancel()——drop 时自动减 size + 释放 permit。
谁会调 cancel?——成功把连接变成 PoolConnection 时(DecrementSizeGuard::cancel)——这时 guard 不需要兜底、pool 确实多了一条连接。
没调 cancel 的路径:connect 失败、after_connect 失败、任何"已增 size 但连接没成型"场景。自动减 size 让 size counter 不泄漏——Pool 的核心不变量。
这条 RAII 设计让建连接失败的各种错误路径都自动正确——不用程序员在每个 ? 后面手动 decrement。
14.9 PoolConnection::Drop 归还机制
用户代码里 let conn = pool.acquire().await?; 的 conn 最终被 drop 时会怎样?
pool/connection.rs:199-213:
rust
impl<DB: Database> Drop for PoolConnection<DB> {
fn drop(&mut self) {
if self.close_on_drop {
crate::rt::spawn(self.take_and_close());
return;
}
// 还需要 spawn task 来维持 min_connections
if self.live.is_some() || self.pool.options.min_connections > 0 {
crate::rt::spawn(self.return_to_pool());
}
}
}Drop 里 spawn 一个异步 task ——return_to_pool。这个 task 做的事(pool/connection.rs 的 return_to_pool 方法):
- 从 self.live 取出 Live
<DB>。 - 调用
after_release回调(如果配置了)。 - 检查 max_lifetime——连接太老就 close(不归还)。
- push 到 idle queue + release permit(即调
PoolInner::release)。
注意是 spawn 而不是 block——drop 是同步函数,不能 await。spawn 出一个 task 让异步清理异步进行。代价是drop 返回时清理还没完成——但用户一般不关心(pool 内部最终一致)。
14.9.1 close_on_drop 的特殊路径
PoolConnection 有一个 close_on_drop: bool 字段——用户可以设让连接drop 时直接关而不是归还:
rust
let mut conn = pool.acquire().await?;
conn.close_on_drop(); // 标记:用完关
// ... 用 conn ...
// drop → 连接被 close 而不是 release典型用途:你做了一些可能破坏连接状态的操作(临时改了 session 参数、触发了某种错误)——不想让这条连接污染其他请求——直接关掉让 pool 建新的。
14.10 spawn_maintenance_tasks:后台驱逐循环
inner.rs:503-572 是 Pool 的"后台维护任务"入口:
rust
fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
let pool_weak = Arc::downgrade(pool);
let period = match (pool.options.max_lifetime, pool.options.idle_timeout) {
(Some(it), None) | (None, Some(it)) => it,
(Some(a), Some(b)) => cmp::min(a, b),
(None, None) => {
// 都没设——只维持 min_connections
if pool.options.min_connections > 0 {
crate::rt::spawn(async move { ... pool.min_connections_maintenance(None).await; ... });
}
return;
}
};
let mut close_event = pool.close_event();
crate::rt::spawn(async move {
let _ = close_event.do_until(async {
while let Some(pool) = pool_weak.upgrade() {
if pool.is_closed() { return; }
// 扫描 idle queue,关过期连接
for _ in 0..pool.num_idle() {
if let Some(conn) = pool.try_acquire() {
if is_beyond_idle_timeout(&conn, &pool.options)
|| is_beyond_max_lifetime(&conn, &pool.options)
{
let _ = conn.close().await;
pool.min_connections_maintenance(Some(next_run)).await;
} else {
pool.release(conn.into_live());
}
}
}
// 等下一轮
crate::rt::sleep(duration).await;
}
}).await;
});
}四个关键点:
- period 由
min(idle_timeout, max_lifetime)决定——默认 10 分钟。意味着每 10 分钟扫描一次 idle queue。 Arc::downgrade成 Weak——后台任务持有 Weak 引用、不阻止 PoolInner drop。Pool 最后一个 handle drop 时 Weak 升级失败、循环退出。- close_event 可取消——Pool close 时 close_event 触发、
do_until退出、task 结束。 - 扫描过程自身用
try_acquire+release——连接先拿出来检查、不过期就放回去、过期就 close。
为什么用 try_acquire 而不是直接操作 idle_conns?——因为 try_acquire 走正常路径(抢 permit + 弹 idle)——和用户的 acquire 互斥。防止"维护任务刚检查完连接用户就拿去用"这种 race。
14.10.1 min_connections_maintenance
inner.rs:405 附近的 min_connections_maintenance(简化):
rust
pub async fn min_connections_maintenance(self: &Arc<Self>, deadline: Option<Instant>) {
while self.size() < self.options.min_connections {
// 尝试建新连接到 min
let Ok(guard) = self.try_increment_size_unchecked() else { break; };
let Ok(conn) = self.connect(deadline_or_now, guard).await else { break; };
self.release(conn);
}
}保证 size >= min_connections。调用时机:
- 启动时(PoolOptions::build 内部)。
- 每次 maintenance scan 后——如果关掉过期连接导致 size 低于 min,立即补齐。
- PoolConnection Drop 里(
connection.rs:209-210)——drop 时也调一次、补齐。
min_connections 的最佳 effort 语义——try_increment_size 可能因为竞争失败、connect 可能网络抖动失败——但不阻止进程继续。下次 maintenance 再试。
14.11 连接建立的指数退避
inner.rs:321-398 的 connect 方法有一个精彩的 backoff 机制:
rust
let mut backoff = Duration::from_millis(10);
let max_backoff = deadline_as_timeout(deadline)? / 5;
loop {
let timeout = deadline_as_timeout(deadline)?;
match crate::rt::timeout(timeout, connect_options.connect()).await {
Ok(Ok(mut raw)) => { /* 成功路径 */ }
// IO 错误(连接被拒)——可能是 DB 刚启动
Ok(Err(Error::Io(e))) if e.kind() == ConnectionRefused => (),
// 瞬时 DB 错误
Ok(Err(Error::Database(error))) if error.is_transient_in_connect_phase() => (),
Ok(Err(e)) => return Err(e),
Err(_) => return Err(Error::PoolTimedOut),
}
crate::rt::sleep(backoff).await;
backoff = cmp::min(backoff * 2, max_backoff);
}backoff 策略:
- 初始 10ms。
- 每次失败 ×2:10 → 20 → 40 → 80 → ...
- 上限 acquire_timeout / 5——不超过总时限的 1/5 让至少有 5 次尝试。
两类可重试的错误:
ConnectionRefused——DB 可能在启动中。is_transient_in_connect_phase()——DB 报的瞬时错误(比如太多连接但即将缓解)。
其他错误直接返回——认证失败、DNS 不可解等,重试没意义。
这条退避机制对容器化部署特别重要——app 和 DB 可能同时启动、app 先 up 时 DB 还没 ready——backoff 让 app 能平滑等 DB 就绪。
14.12 close 的关闭协议
inner.rs:97-120:
rust
pub(super) fn close<'a>(self: &'a Arc<Self>) -> impl Future<Output = ()> + 'a {
self.mark_closed();
async move {
for permits in 1..=self.options.max_connections {
// 关所有 idle
while let Some(idle) = self.idle_conns.pop() {
let _ = idle.live.float((*self).clone()).close().await;
}
if self.size() == 0 { break; }
// 等所有 permit 都 release
let _permits = self.semaphore.acquire(permits).await;
}
}
}三阶段:
- mark_closed——atomic bool 置 true,
on_closed事件广播。 - 关所有 idle——循环 pop idle queue、close。
- 等所有借出的连接归还——通过
semaphore.acquire(permits)逐步增加需要的 permit 数——这个 acquire 在所有借出连接归还(release permit)后才能完成——等价于"所有 in-use 连接都归还"。
这是一个精妙的用法——用信号量的 acquire 作为"等所有 release"的同步——不需要显式 channel 或 condvar。
14.12.1 Pool 内部关键流程的整体图
把本章讨论的几条主路径合一张总图:
这张图包含四条主路径:
- 用户 acquire + 已有 idle:
acquire → semaphore → pop idle → check → 返回。 - 用户 acquire + 无 idle:
acquire → semaphore → try_increment → connect → 返回。 - 用户 drop conn:
drop → spawn return_to_pool → release → 入 idle queue。 - 后台 maintenance:
扫描 → 关过期 → 触发 min_connections 补齐。
理解这四条路径你就理解了 Pool 整体的动态行为。
14.13 本章小结
本章把 PoolInner 的每一个机制拆开:
- PoolInner 10 字段(§14.2)—— idle_conns / semaphore / size / num_idle / is_closed / on_closed + 配置和日志。
- ArrayQueue 无锁队列(§14.3)—— crossbeam-queue 的 MPMC 有界队列,基于 CAS 原子操作、高并发下不被 Mutex contention 拖长尾。
- AsyncSemaphore fair 语义(§14.4)—— 默认 FIFO 公平。抽象 Tokio/async-std 差异。
- 三状态 Live/Idle/Floating(§14.5)—— Floating 是过渡态、带 DecrementSizeGuard RAII。
- acquire 的 loop(§14.6)—— 获取 permit → pop idle 或 try_increment_size → 建连接。check_idle_conn 三层验证。
- release 的双计数分离(§14.7)—— permit 管并发、size 管总数——idle 时占 size 不占 permit。
- DecrementSizeGuard RAII(§14.8)—— fetch_update 原子 +size、Drop 自动 -size 除非 cancel——错误路径的自动修正。
- PoolConnection::Drop spawn 归还(§14.9)—— 异步清理 +
after_release回调 +close_on_drop特殊路径。 - maintenance task(§14.10)—— 后台扫描 idle queue、关过期连接、维持 min_connections。用 Weak 引用不阻止 PoolInner drop。
- 指数退避 backoff(§14.11)—— 建连接失败 10ms→20ms→40ms...,上限 acquire_timeout/5。
- close 的三阶段(§14.12)—— mark_closed → 关 idle → 等 in-use 归还。用 semaphore.acquire 作为"等归还"的同步。
下一章进入 Transaction——事务 RAII guard 的实现、savepoint 嵌套、Transaction::drop 的"尽力 rollback"、commit/rollback 的明确义务。
14.14 Pool 内部的并发正确性
Pool 内部是高并发代码——多个 async task 同时 acquire、maintenance task 后台扫描、close 可能并发——如何保证正确性?
sqlx 用了几条并发技巧:
1. 原子计数 + CAS 循环。size.fetch_update 用乐观 CAS——读当前值、算新值、原子替换失败就重试。这比 Mutex<u32> 快:
rust
let result = self.size.fetch_update(..., |size| {
if self.is_closed() { return None; }
size.checked_add(1).filter(|size| size <= &max)
});checked_add + filter 一起表达"没满才能加"。Ok(new) 表示成功、Err(old) 表示失败(None 分支或 CAS 竞争失败)。
2. 分离 permit 和 size。permit 管"允许多少并发 acquire"、size 管"实际多少连接"。两者独立——idle 连接占 size 不占 permit。通过 DecrementSizeGuard::release_permit(不减 size)vs Drop(减 size)实现精细控制。
3. Weak 引用防止循环。maintenance task 持 Weak<PoolInner>——不阻止 PoolInner drop。Arc 计数为 0 时 Weak upgrade 返回 None、task 退出。否则 Arc cycle 会让 Pool 永远不被回收。
4. event_listener 用于 close 信号。on_closed: event_listener::Event——多个 waiter 同时 listen、一次 notify(usize::MAX) 唤醒所有。比 Arc<Notify> 轻量。
5. crate::rt::yield_now() 在冲突时让步。acquire 循环里如果 pop idle + try_increment 都失败,yield 给其他 task 机会——避免 busy wait。
6. do_until 用于可取消操作。close_event().do_until(future) 让长任务能在 Pool close 时取消——优雅 shutdown 的基础。
这六条手法共同构成 Pool 的并发正确性保证——读这段源码学到的并发技巧可以迁移到你写任何高并发 Rust 代码。
14.15 性能特征和瓶颈分析
Pool 在不同负载下的定性性能特征(具体数值需自行 benchmark):
场景 A:低并发(< 10 acquire/s)
- 几乎没 semaphore 争抢——permit 永远有。
- 大多数 acquire 走 idle queue pop 快路径(纳秒级原子操作)。
- 主要开销是 Tokio 的 spawn(PoolConnection::Drop 里异步归还)。
场景 B:中等并发(100-1000 acquire/s)
- idle queue pop / push 频繁——ArrayQueue 的无锁优势显著。
- semaphore 可能有短暂等待。
- acquire 通常亚毫秒级、不会成为瓶颈。
场景 C:高并发 + 连接饱和(acquire > max_connections)
- 排队成为主要开销——fair 模式 FIFO。
- P99 可能触及
acquire_timeout(默认 30s)。 - 症状:P99 延迟剧烈震荡,偶发 PoolTimedOut。
场景 D:连接抖动
- 网络不稳、连接频繁断开重建。
- maintenance task 忙着驱逐;connect backoff 延长。
- 症状:num_idle 长期 0,size 剧烈波动。
瓶颈定位:
- 低并发慢:检查业务 query 本身(不是 Pool 问题)。
- 中等并发偶发慢:acquire_slow_level 打开日志——看哪些 acquire 变慢。
- 高并发排队:增大 max_connections 或加实例水平扩展。
- 抖动:排查网络 / DB 健康 / PgBouncer 状态。
这条性能特征分析让你在生产排查时对症下药——不会"P99 高了就盲目调 max_connections"。
14.16 Pool 作为 Rust 并发编程的教材
读完本章你会发现 sqlx 的 pool/ 模块是一份Rust 高并发代码的教材——浓缩了许多值得学习的模式:
| 技术 | 在 Pool 里的用法 | 通用用途 |
|---|---|---|
| 无锁 MPMC 队列 | idle_conns: ArrayQueue | 任何"多生产者多消费者 + 有界"场景 |
| 异步信号量 | AsyncSemaphore 控制 max_connections | 任何"并发数限制"场景 |
| 原子 CAS 循环 | size.fetch_update | 无锁计数器 |
| RAII 兜底 guard | DecrementSizeGuard | 错误路径自动修正计数 |
| Weak 引用避免 cycle | maintenance task 持 Weak<PoolInner> | 后台任务不阻止主对象回收 |
| event_listener | on_closed 事件广播 | 多 waiter 的单次通知 |
| 异步超时 + 退避 | connect backoff 10→max | 网络操作的重试 |
| Drop 里 spawn | PoolConnection::Drop → spawn return_to_pool | 异步清理资源 |
每一条都是 Rust 生态里反复出现的技术——掌握它们的具体用法(sqlx 展示的)可以直接迁移到你自己的项目。如果你在写 HTTP client pool、gRPC channel pool、任何"管理一组异步资源"的代码——sqlx 的 Pool 就是你的参考实现。
这也回应本书开头提过的一个观察:sqlx 不只是"写 SQL 用的库"——它是 Rust 异步后端生态里的基础设施范本。每一章学的不只是 sqlx 自己、也是 Rust 做资源管理的最佳实践。
14.17 三点必记的 Pool 内部常识
本章最后,精炼成三点生产工程师必记的 Pool 内部常识:
1. acquire 成功的条件是 "拿到 permit AND (pop idle OR try_increment 成功)"——两个 AND 和一个 OR。理解这个组合就理解了"为什么我的 acquire 超时"——看是哪一侧(permit 排队还是 size 撞上限)。
2. size 泄漏永远由 DecrementSizeGuard 兜底——哪怕 connect 过程 panic、task 被取消,只要 guard 没 cancel,drop 时自动减 size。这是 Pool 永远不会"size 单调上升超过实际连接数"的保证。
3. maintenance task 是 Weak 引用——Pool 的最后一个 handle drop 后、maintenance 会退出。你不需要手动关 pool——drop 所有 handle 就行。但生产建议显式 close().await 让 idle 连接优雅关——不然 maintenance 退出后 idle 连接靠 TCP FIN 通知 server,延迟较大。
这三点是生产级 sqlx 工程师必须理解的——碰到任何 Pool 相关诡异问题,从这三点出发找线索几乎都能定位到根因。
14.18 源码阅读推荐路径
如果读完本章你想自己直接读 sqlx 的 Pool 源码——推荐按以下顺序:
sqlx-core/src/pool/mod.rs——从 Pool struct 和公共方法开始(200 行对外 API)。sqlx-core/src/pool/options.rs——PoolOptions 的 13 个配置项(700 行,大部分是 builder 方法)。sqlx-core/src/pool/inner.rs——核心实现(670 行,最硬的一部分)。sqlx-core/src/pool/connection.rs——PoolConnection + Live/Idle/Floating 状态类型(420 行)。sqlx-core/src/sync.rs——AsyncSemaphore 的 runtime 抽象(200 行)。
总共约 2200 行——一个熟练 Rust 工程师 2-3 小时能通读。读这 5 个文件比任何书都能快速把 Rust 并发资源管理的最佳实践打进脑子——哪些该 Mutex、哪些该 atomic、哪些该 Weak、哪些该 spawn、哪些该 RAII guard——sqlx 给了详细示范。
重点关注源码中的注释——大部分关键决策有一段英文解释。比如 Fuse、Yield、CAS loop 这些地方的注释密度都很高——作者在和你对话"为什么这么做"。读注释比读代码本身学到的更多。
14.19 Pool 内部设计的通用启示
最后升华到 API 设计层面,sqlx Pool 内部代码给我们的三条启示:
1. 分离语义层次的计数器。permit(并发控制)和 size(资源总数)分开——让"一条 idle 连接占 size 但不占 permit" 这种细粒度控制可能。很多库一开始只有一个 "connection count" 计数——之后发现不够用、必须拆成两个。sqlx 从一开始就拆——省了后续的重构。
2. 用 RAII guard 处理错误路径。DecrementSizeGuard 的 Drop 让错误路径"默认正确"——程序员不用手写 "if err { decrement }"。这条设计在 Rust 里极其常见——Mutex 的 MutexGuard、BorrowMut 的 RefMut、tokio 的 tokio::sync::SemaphorePermit——都是同一套思路。遇到"需要成对做的操作"永远考虑 RAII。
3. 后台任务用 Weak 引用。maintenance task 持 Weak<PoolInner> 不阻止 PoolInner drop——这让 "Pool 最后一个 handle drop 后整个资源链自动回收" 成立。如果后台任务持 Arc,Pool 就永远不会被回收(cycle)。这条"引用类型的方向性"是 Rust 异步代码里最容易踩坑的地方——但学会后能避免很多生产事故。
读完本章,希望你对这三条启示有具体的代码位置作证——下次在自己代码里应用时不是抄模式、而是有理解的迁移。
14.20 parent_pool 的特殊机制
inner.rs:127-170 的 acquire_permit 里有一段关于 parent_pool 的处理——这是 sqlx 一个少用但精彩的机制。
PoolOptions::parent(pool) 让一个子 Pool 从父 Pool 偷 permit:
rust
let main_pool = PgPoolOptions::new().max_connections(100).connect(url).await?;
// 子 Pool 共享父的连接预算
let subtask_pool = PgPoolOptions::new()
.max_connections(20)
.parent(main_pool.clone())
.connect_lazy_with(opts);语义:
- 子 Pool 的 max_connections = 20 —— 逻辑上限。
- 父 Pool 的 max_connections = 100 —— 两者共享一个 permit 空间。
- 子 Pool 的 acquire 会同时尝试从自己的信号量和父的信号量抢 permit——父先满足就走父、否则等子。
- 效果:子 Pool 最多用 20 条 "属于它自己的" 名额,但额外能借父的名额。
典型用例:一个服务里有主路径 pool(业务查询)和后台任务 pool(ETL / 统计)。后台 pool 不能占满主路径的连接——设成子 Pool、max_connections 给个适量(20 条),让后台跑慢但不撞主路径。
实现上 acquire_permit(inner.rs:127-170)的 future::poll_fn 里同时 poll 子和父的 semaphore——先 poll 子(优先用自己的),如果子满了就 poll 父(借)。逻辑上 permit 实际上被父的 semaphore 管理——子只是把自己的限额加到父上(子的 semaphore capacity 是 0,所有 permit 都从父借)。
这条机制在日常业务代码里少用——但一旦遇到"主路径 / 后台路径 分担连接预算"的场景就非常有用。生产 PgBouncer 也有类似的 reserve_pool_size 机制——sqlx 在 client 侧提供了等价功能。
14.21 Pool 并发测试的挑战
Pool 的内部代码要做并发正确性测试——sqlx 实际仓库里有一套集成测试:
tests/pool/目录的 spawned concurrent tests——N 个 tokio task 同时 acquire + release,验证 size counter 不泄漏、idle queue 不乱序。loom测试(如果启用)——用 loom crate 做穷尽并发状态检查,catch 常规测试跑不出的 race。
但 Pool 的绝大部分"正确性保证"不是测试出来的——是代码设计时就避免竞争:
- atomic CAS 让 size 更新无 race。
- RAII guard 让错误路径自动修正。
- Weak 引用让后台任务不阻碍 drop。
"设计出正确性"比"测试出正确性"更重要——这是 Rust 并发编程的核心理念。sqlx Pool 的代码结构让"测试不出不代表有 bug"的担忧降到最低——关键不变量由类型系统和 RAII 强制。
这也是为什么 sqlx 的 Pool 实现能在数年生产里保持稳定——不是靠天选 / 测试覆盖率、而是靠设计层面的并发正确性。这条设计原则值得你在自己项目里反复实践。
14.22 acquire_permit 函数的 poll_fn 精读
前面 §14.6 一笔带过了 acquire_permit——但它的 poll_fn 实现值得精读。inner.rs:141-170:
rust
future::poll_fn(|cx| {
if close_event.as_mut().poll(cx).is_ready() {
return Poll::Ready(Err(Error::PoolClosed));
}
if parent_close_event.as_mut().poll(cx).is_ready() {
self.mark_closed();
return Poll::Ready(Err(Error::PoolClosed));
}
if let Poll::Ready(permit) = acquire_self.as_mut().poll(cx) {
return Poll::Ready(Ok(permit));
}
// 父 Pool 的 poll 有延迟
if poll_parent {
acquire_parent.as_mut().poll(cx).map(Ok)
} else {
poll_parent = true;
cx.waker().wake_by_ref();
Poll::Pending
}
}).await四个 future 同时 poll——close_event / parent_close_event / acquire_self / acquire_parent。按优先级:
- close_event:Pool 已关,立即返回 PoolClosed。
- parent_close_event:父 Pool 关,propagate 到子、同样 PoolClosed。
- acquire_self:自己的 semaphore 有 permit,立即拿。
- acquire_parent:从父借 permit——但第一次 poll 故意返回 Pending + wake,给子一次"不借父"的机会。
第四步的延迟 poll 很精彩——poll_parent = false 时调用者第一次 poll 返回 Pending + 立刻 wake——让 executor 再 poll 一次(这次 poll_parent = true)。中间可能有子的 semaphore 释放(其他 task 归还)——让子 Pool 优先用自己的配额、只有真找不到时才借父。
这是一个**"优先本地、不得已才借外部"** 的语义——用 poll_fn 里的状态机精确表达。写这段代码的作者思考到了"如果同时 poll 父和子、父可能抢先返回"的 race、用延迟一个 poll cycle 规避。
读这段代码你能感受到 sqlx 作者的并发代码功力——对 Rust future polling 的 semantics 理解极深。日常业务代码可能永远不会写出这种 poll_fn、但理解它能让你看懂任何复杂 Future 实现。
14.23 Pool 性能统计:一些真实数据
基于源码分析 + 常规 benchmark 可以给一些量级估计(不是精确数字):
| 操作 | 耗时量级 | 主要开销来源 |
|---|---|---|
ArrayQueue::pop / push | 30-100ns | 单次 atomic CAS |
AsyncSemaphore::acquire(立刻) | 100-500ns | Tokio Semaphore 的 fast path |
AsyncSemaphore::acquire(排队) | μs 到 ms | 取决于排队长度 |
size.fetch_update CAS | 50-200ns | atomic RMW |
| DecrementSizeGuard RAII 全路径 | 200-500ns | 两个 atomic(size + permit) |
| PoolConnection::Drop + spawn | 1-5μs | tokio::spawn 自身开销 |
idle_conns.push + semaphore release | 200-500ns | 组合 |
| 后台 maintenance 一轮扫描(1000 idle) | ms 级 | try_acquire 1000 次 |
生产意义:sqlx Pool 在空闲热路径(acquire 直接拿到 idle)大概 200-500ns 完成——低于 μs 级。任何 acquire 超过 1μs 几乎一定是走了冷路径(建新连接、排队等、ping 失败)。用 tracing 追踪慢 acquire 会让问题暴露得明显。
对比 tokio-postgres 的手动 pool(deadpool-postgres)——性能大致同级,因为 deadpool 也用 crossbeam + Tokio Semaphore。Go 的 database/sql 的 pool——对标 sqlx 大约慢 2-3 倍(互斥锁 + GC 影响)。Java 的 HikariCP——也差不多。
sqlx 在连接池性能层面和生态里其他 state-of-the-art 的工具打平——不是落后也不是领先。优势在其他维度(类型安全 / query! 宏 / Rust 整合)。
14.24 一条实战排查案例
用一条真实生产场景串起本章内容——假设你的服务突然报大量 PoolTimedOut:
Step 1:看监控仪表盘——pool.size() 恒定在 max_connections、pool.num_idle() 恒 0——Pool 饱和。
Step 2:打开 acquire_slow_level——日志开始出现 slow acquire warning——每次 acquire 等 2s+。
Step 3:查 pg_stat_activity——发现大量连接在同一条长查询上。
Step 4:定位那条查询——SELECT ... FROM huge_table WHERE ... ORDER BY col——没索引、全表扫。
Step 5:根因——这条慢查询把 pool 的所有连接占满、新请求全排队超时。
Step 6:修复——加索引 / 加 statement_timeout(从 after_connect 里设)——让慢查询自动被 kill。
这条排查路径利用了本章讨论的几个机制:
pool.size() / num_idle()做饱和判断(§13.8)。acquire_slow_level定位慢 acquire(§13.10)。after_connect设 statement_timeout(§13.11)。- Pool 的 size 一直等于 max 而不增不减——backdrop 是 §14.8 的 size counter 稳定语义。
- 理解 idle 队列为空意味着所有连接在 in-use——这来自本章的 idle vs in-use 分离概念(§14.7)。
不理解 Pool 内部的工程师可能会乱改配置(增大 max_connections、改 acquire_timeout)——治标不治本。理解本章内容后你知道根因不在 Pool——而在业务查询——从这个方向调才是对的。
这条真实案例说明:读懂 Pool 内部不是学术锻炼、是生产武器。下次你排查任何 Pool 相关问题,本章的机制就是你的工具箱。
14.25 Pool 源码的代码品质观察
作为本章结尾,回看 sqlx pool 源码的代码品质几点观察:
1. 注释密度恰当。关键决策点(RAII guard 的 cancel 语义、poll_fn 的延迟 poll、close 的三阶段)都有 2-5 行英文注释——解释"为什么这么做"。其他显然的代码不加废话注释。这是良好开源代码的标志——新手能读懂、老手不被冗余。
2. 函数长度控制。大多数函数 20-50 行——能一屏看完。超过 100 行的只有 acquire(因为它是核心的循环 + 超时 + 错误处理)——这种"必要的长"可以接受。
3. panic! 使用谨慎。整个 pool 模块只有 1-2 处 panic:release 里的 "BUG: connection queue overflow"——因为逻辑上 size counter 保证 idle_conns 不会满、如果满了一定是 bug。这种不变量 violation 用 panic比用 Result 合理——把"不可能发生"的代码路径标出来、避免用户处理不存在的错误。
4. 测试友好的设计。parent_pool 机制(§14.20)让集成测试里可以构造"父子 pool"跑并发测——模拟生产的复杂拓扑。这种测试钩子不侵入生产代码(parent_pool 是正常 feature 而不是 test-only attr)——值得学习。
5. #[allow(dead_code)] 使用克制。源码里几乎没有 allow(dead_code)——意味着每一段代码都在被用。"死代码标记滥用"是很多开源项目的病——sqlx 避免了这个。
这些品质观察不是"锦上添花"——它们是 sqlx 能持续维护 7 年(从 2019 到 2026)的工程支撑。学 sqlx 不只是学库怎么用、也是学怎么写一份能长期维护的开源 Rust 库。
到此第 14 章结束——Pool 的外部 API(第 13 章)和内部实现(本章)都拆开了。下一章进入第四部分的最后一块拼图——Transaction。
14.26 四条代码阅读心得
读完本章你至少掌握了下面四条从 Pool 源码里直接学到的东西——它们放在任何 Rust 项目里都值得参考:
1. 多状态对象用类型系统表达状态。Live / Idle / Floating 三个类型就是状态机的三个状态——转换通过 into_xxx 方法、编译器保证状态迁移合法(你不能把 Live 直接推回 Idle queue、必须先 into_idle)。比用 bool flag 区分状态类型安全得多。
2. 原子操作 + RAII 组合处理"必须成对的资源操作"。增 size 必须配对减 size;抢 permit 必须配对 release——DecrementSizeGuard 让这两对"自动成对"。比 try { incr; ... } finally { decr } 的 Java 风格优雅很多。
3. 有界并发原语比无界好。ArrayQueue 是有界的(不会无限增长)、Pool 的 semaphore 是有界的(max_connections)——资源上限由类型本身保证。无界队列 / 无界信号量听起来灵活、实际生产是 OOM 温床。
4. 后台任务的生命周期管理用 Weak。任何"长期运行的后台 task"都用 Weak 引用它需要的资源——让资源的主 Arc drop 能触发 task 退出。业务代码里经常会写 tokio::spawn 一个 task 持 Arc 忘了 drop——导致资源永久活着。Weak 是防御。
掌握这四条、你读任何 Rust 高并发代码都会顺手——sqlx pool 是它们的集大成样本。
第 15 章 Transaction 会回到"业务语义"层面——讨论 RAII guard 对事务 commit/rollback 的强制约束、savepoint 嵌套、以及 Drop 里为什么"只能尽力 rollback"。
14.27 和《Tokio 源码深度解析》第12章的关联
sqlx Pool 的核心门禁是 AsyncSemaphore——本章看到 acquire_permit、DecrementSizeGuard、fair 队列这些概念——它们背后就是《Tokio 源码深度解析》第 12 章 §12.3 Semaphore:基础 building block 和 §12.4 公平锁 vs 非公平锁的 trade-off 里讨论的数据结构。
具体对应:
- sqlx
PoolOptions::fair = true默认(sqlx-core/src/pool/options.rs:163)↔ Tokio §12.4 讲tokio::sync::Semaphore公平队列——底层用 intrusive 链表 + FIFO Waker——每个 waiter 在Vec<Waker>里按到达顺序排。 - sqlx Pool 的
acquire_permit().await↔ Tokio §12.3 讲Semaphore::acquire(1)返回SemaphorePermit的 RAII guard——drop 时自动 release。 - sqlx
DecrementSizeGuard的"资源获取和释放绑定同一 struct" ↔ Tokio §12.3 讲的OwnedSemaphorePermit。
为什么要看 Tokio 那章——sqlx 池是在 Semaphore 之上封装业务语义(最大连接数、idle queue、min_connections 维护)——底层 Semaphore 的 waker 管理、公平性实现、poll 协议都在 Tokio crate 里。不看 Tokio 那一层、sqlx 的 acquire 为什么能被唤醒就只能靠直觉理解——看了就知道 Tokio Semaphore 内部 poll_acquire 是怎么挂 waker、怎么在 add_permits 时按序唤醒。
一条建议——如果本章读得有点吃力、先看《Tokio》第 12.3-12.4 半小时——再回来读本章会顺得多。