Skip to content

第14章 Pool 内部实现:空闲队列、公平性、驱逐策略

"The hardest part of writing a connection pool is knowing when to trust your state— and having a RAII guard ready for when you can't." —— 读懂 sqlx PoolInner 的人共同的领悟

本章要点

  • PoolInner<DB>sqlx-core/src/pool/inner.rs:26-3710 个字段:connect_options(RwLock)+ idle_conns(ArrayQueue)+ semaphore(AsyncSemaphore)+ size / num_idle(atomics)+ is_closed + on_closed(event_listener)+ options + 两个日志 Level。
  • idle_conns: ArrayQueue<Idle<DB>> 使用 crossbeam-queue 的无锁环形队列——固定容量(= max_connections)、多生产者多消费者、零分配。比 Mutex<VecDeque> 在高并发下避开 lock contention 长尾。
  • semaphore: AsyncSemaphoresqlx-core/src/sync.rs)是 sqlx 对 Tokio / async-std 的统一异步信号量包装fair: bool 决定排队是 FIFO(公平)还是 LIFO(抢占)。默认 true。
  • 三个状态类型Live<DB>(借出中)/ Idle<DB>(空闲队列里)/ Floating<DB, C>(过渡态,附带 DecrementSizeGuard RAII)。状态转换通过 float() / into_live() / into_idle() 方法——每次转换是类型级迁移。
  • DecrementSizeGuard<DB>inner.rs:573-585)是 Pool 最重要的 RAII guard——持有"已经增了 size 计数"的承诺,如果 Drop 前没有 cancel,自动 decrement——让"建新连接失败时 size 不会泄漏"成为不变量。
  • PoolConnection::Drop 自动 spawn 归还任务(connection.rs:199-213)—— drop 时 take 出 live、spawn 一个 return_to_pool future,异步归还到 idle queue。归还过程可能调用 after_release 回调做清理。
  • spawn_maintenance_tasksinner.rs:503-572)—— 启动时 spawn 一个后台任务,按 min(idle_timeout, max_lifetime) 为周期扫描 idle queue,关掉过期连接;同时维持 min_connections 下限。
  • 连接建立的退避 backoffinner.rs:321-398)——connect 失败用指数退避(10ms → 20ms → 40ms → ... 最多 acquire_timeout/5)重试,避免雪崩。

14.1 问题引入:Pool 内部的五件事

第 13 章讲完 Pool 的外部 API——用户能调的所有方法。但每次 pool.acquire().await? 底下究竟发生了什么?Pool 内部至少在做五件事:

  1. 管理一组连接:有空闲的(idle)、有借出去的(in-use),两类加起来不能超过 max_connections。
  2. 协调并发:100 个 async task 同时 acquire,谁先拿到、谁等、等多久——信号量(Semaphore)解决。
  3. 跟踪连接状态:一条连接从"空闲队列里"到"被借出用"到"归还回来"到"过期被关"——状态机。
  4. 后台维护:空闲太久的连接要关、太老的连接要退休、min_connections 要补齐——后台 task 做。
  5. 安全兜底:网络断、panic、cancel——每种异常路径都不能让 size 计数泄漏、不能让连接凭空消失。

sqlx Pool 内部用几个精心选择的并发原语把这五件事串起来:ArrayQueue(无锁队列)、AsyncSemaphore(公平异步信号量)、RAII guard(DecrementSizeGuard)、原子计数(AtomicU32 / AtomicUsize / AtomicBool)、event_listener(close 事件)。本章把它们逐一拆开。

14.2 PoolInner<DB> 的 10 个字段

sqlx-core/src/pool/inner.rs:26-37

rust
pub(crate) struct PoolInner<DB: Database> {
    pub(super) connect_options: RwLock<Arc<<DB::Connection as Connection>::Options>>,
    pub(super) idle_conns: ArrayQueue<Idle<DB>>,
    pub(super) semaphore: AsyncSemaphore,
    pub(super) size: AtomicU32,
    pub(super) num_idle: AtomicUsize,
    is_closed: AtomicBool,
    pub(super) on_closed: event_listener::Event,
    pub(super) options: PoolOptions<DB>,
    pub(crate) acquire_time_level: Option<Level>,
    pub(crate) acquire_slow_level: Option<Level>,
}

10 个字段(最后两个 Level 是同一类日志配置的两面):

状态存储

  • idle_conns:空闲连接的队列。
  • size:当前 pool 持有的连接总数(atomic)。
  • num_idle:空闲连接数(atomic)。

并发原语

  • semaphore:控制 max_connections 并发。
  • on_closed:close 事件的广播器。

配置

  • connect_options:可热更新的连接参数。
  • options:Pool 自身的配置(max_connections 等)。
  • 两个 Level:日志粒度。

标志

  • is_closed:atomic bool,标记 pool 已关闭。

这 10 个字段加起来才 200 字节左右(idle_conns 和 semaphore 分别各几十字节头 + 堆分配内容)。PoolInner 本身很小、扩展性来自它内部的 ArrayQueue 和 AsyncSemaphore——这两者可以扩展到 max_connections 条连接的容量。

14.3 idle_conns: ArrayQueue 的无锁队列

inner.rs:27idle_conns: ArrayQueue<Idle<DB>>——来自 crossbeam_queue crate的无锁环形队列。

为什么选 ArrayQueue 而不是 Mutex<VecDeque>

ArrayQueue 是 bounded MPMC queue(多生产者多消费者、有界):

  • 无锁:push 和 pop 都通过 atomic CAS 完成,不走 Mutex。
  • 固定容量ArrayQueue::new(capacity) 分配一次,之后不扩容。
  • 零分配:push / pop 在已分配 buffer 里操作。

Pool 的 idle_conns 容量设为 max_connectionsinner.rs:43-44)——最多同时有这么多空闲连接。

为什么这个选择对 Pool 重要——单线程无争抢时两种方案差异很小(都在几十 ns 量级);真正拉开差距的是高并发场景Mutex<VecDeque> 下多线程同时 push/pop 会串行化到 Mutex 临界区——N 个 waiter 就有 N 倍延迟放大;而 ArrayQueue 基于 CAS 原子操作、多个线程的操作可以真正并行。高吞吐 Web 服务每秒上万次 acquire/release、若用 Mutex 方案在某个 P99 峰值里会看到 lock contention 的长尾。sqlx 选 ArrayQueue 避开这条长尾——代价是"有界 + 满了直接 panic"这个约束(Pool 通过 size counter 在外部已保证不会满、约束天然满足)。

ArrayQueue 的代价:有界(但 Pool 恰好天然有界、不是问题)、push 满了返回 Err(release 源码 inner.rs:230-232 对应处理,满了直接 panic——因为 size counter 理论上保证不会满)。

14.3.1 Idle<DB> 数据

pool/connection.rs:27-35

rust
pub(super) struct Live<DB: Database> {
    pub(super) raw: DB::Connection,
    pub(super) created_at: Instant,
}

pub(super) struct Idle<DB: Database> {
    pub(super) live: Live<DB>,
    pub(super) idle_since: Instant,
}

Live:借出中的连接 + 建立时间。 Idle:Live + 进入 idle queue 的时间戳。

created_at 用于 max_lifetime 检查(连接活太久就退休);idle_since 用于 idle_timeout 检查(空闲太久就关掉)。两个 Instant 一起让 maintenance task 能做精确的年龄判断。

14.4 AsyncSemaphore: 公平异步信号量

inner.rs:29semaphore: AsyncSemaphore——来自 sqlx-core/src/sync.rs。它是对 tokio::sync::Semaphoreasync_std::sync::Semaphore统一包装(runtime 多后端支持,第 2 章 §2.5.1 讨论过)。

Semaphore 的作用:限制 acquire 的并发——最多 max_connections 个 permit。超过的 acquire 排队等。

AsyncSemaphore::new(fair, capacity) 的两个参数:

  • capacity: usize——permit 总数 = max_connections。
  • fair: bool——排队语义:

14.4.1 fair 语义的 FIFO vs LIFO

fair = true(默认):FIFO 排队——先 acquire 的先拿到 permit。

fair = false非公平——新 acquire 如果有 permit 直接抢;排队中的等更久。

看上去 fair 是"显然更公平"——但 fair = false 其实有其性能优势:

  • fair:有 permit release 时必须唤醒队列头——可能是很久没跑的 task、cache cold、唤醒 + 调度开销大。
  • 非 fair:有 permit release 时如果刚好有新 acquire 进来、直接给新来的——new acquire 多半 cache warm、延迟低。

sqlx 选默认 fair——避免队列尾饿死(极端并发下非 fair 可能让某些 task 等很久)。这是一个"吞吐 vs 公平性"的经典 trade-off,sqlx 默认取公平。

用户可以通过 PoolOptions::__fair(false) 关闭——但方法名以 __ 开头表明是"内部 / 不建议调"。大多数场景 fair 就是对的。

14.4.2 AsyncSemaphore 对 Tokio Semaphore 的包装

sync.rs 里的核心实现(以 tokio feature 为例):

rust
// 简化
pub struct AsyncSemaphore {
    inner: Arc<tokio::sync::Semaphore>,
    fair: bool,
}

impl AsyncSemaphore {
    pub fn new(fair: bool, capacity: usize) -> Self {
        Self { inner: Arc::new(tokio::sync::Semaphore::new(capacity)), fair }
    }

    pub async fn acquire(&self, n: u32) -> AsyncSemaphoreReleaser<'_> {
        // 调 tokio 的 acquire、返回 releaser wrapper
        let permit = self.inner.acquire_many(n).await.unwrap();
        AsyncSemaphoreReleaser { ... }
    }
    // ...
}

Tokio 的 Semaphore 本身就支持 fair(通过 acquire_many——sqlx 只是加一层包装、统一 runtime 差异。实际性能特征由 Tokio 本身决定——对 AsyncSemaphore 的分析可以转化成对 Tokio Semaphore 的分析。Tokio 0.3 之后的 Semaphore 是lock-free 的,基于 intrusive linked list——这也是为什么 fair 模式下的排队开销并不大。

14.5 连接状态机:Live / Idle / Floating

三个状态类型(pool/connection.rs:27-40):

rust
pub(super) struct Live<DB: Database> {
    pub(super) raw: DB::Connection,
    pub(super) created_at: Instant,
}

pub(super) struct Idle<DB: Database> {
    pub(super) live: Live<DB>,
    pub(super) idle_since: Instant,
}

pub(super) struct Floating<DB: Database, C> {
    pub(super) inner: C,
    pub(super) guard: DecrementSizeGuard<DB>,
}

三种状态的含义

  • Live<DB>:连接在用中——raw connection + 建立时间。
  • Idle<DB>:连接在 idle queue 里——Live + 入队时间。
  • Floating<DB, C>过渡态——连接被"捞出"idle queue 但还没成正式的 PoolConnection、或者 PoolConnection drop 后还没归还——内带 DecrementSizeGuard 做 RAII 兜底。

状态转换图

五个转换

  1. Connecting → Floating_Live:新建连接成功。
  2. Floating_Live → PoolConnection:包装成用户可见的 PoolConnection。
  3. PoolConnection → Floating_Live:drop 开始归还。
  4. Floating_Live → Idle:push 进 idle queue。
  5. Idle → Floating_Idle:acquire 弹出。

14.5.1 Floating 的 RAII 保证

Floating 的类型参数 C 是 Live 或 Idle 之一——它自己是"我正在处理这个连接,还没决定归宿"的过渡态。

关键在 guard: DecrementSizeGuard<DB>——如果 Floating 被 drop 但没显式 release 或 close,DecrementSizeGuard 的 Drop 会把 pool size 减 1、释放信号量 permit——保证连接计数不泄漏

这是 Rust RAII 的经典用法——状态变迁本来可能出错(网络断、panic、await 取消),用 RAII guard 兜底让计数器自动修正。

14.6 acquire 的完整实现

inner.rs:238-318 的 acquire 主体(简化后):

rust
pub(super) async fn acquire(self: &Arc<Self>) -> Result<Floating<DB, Live<DB>>, Error> {
    if self.is_closed() { return Err(Error::PoolClosed); }

    let acquire_started_at = Instant::now();
    let deadline = acquire_started_at + self.options.acquire_timeout;

    let acquired = crate::rt::timeout(self.options.acquire_timeout, async {
        loop {
            let permit = self.acquire_permit().await?;

            // 1. 先尝试 idle queue
            let guard = match self.pop_idle(permit) {
                Ok(conn) => match check_idle_conn(conn, &self.options).await {
                    Ok(live) => return Ok(live),      // ✓ 空闲连接可用
                    Err(guard) => guard,                // ✗ 连接有问题,guard 待重建
                },
                Err(permit) => {
                    // idle queue 空,尝试建新连接
                    if let Ok(guard) = self.try_increment_size(permit) {
                        guard
                    } else {
                        // 既没空闲也不能增 size——重试
                        crate::rt::yield_now().await;
                        continue;
                    }
                }
            };

            // 2. 建新连接
            return self.connect(deadline, guard).await;
        }
    }).await.map_err(|_| Error::PoolTimedOut)??;

    Ok(acquired)
}

三阶段

  1. 获取 permit(受 semaphore 限制)。
  2. 尝试 pop idle——有就用(先 check_idle_conn 验证)、没有就尝试 try_increment_size 决定是否能建新。
  3. 建连接——调 connect(deadline, guard),失败 backoff 重试。

整段包在 rt::timeout(acquire_timeout, ...) 里——超总时限返回 PoolTimedOut

14.6.1 check_idle_conn:acquire 时的健康检查

inner.rs:463-502(简化):

rust
async fn check_idle_conn<DB: Database>(
    mut conn: Floating<DB, Idle<DB>>,
    options: &PoolOptions<DB>,
) -> Result<Floating<DB, Live<DB>>, DecrementSizeGuard<DB>> {
    // 1. 检查 max_lifetime
    if is_beyond_max_lifetime(&conn, options) {
        return Err(conn.close().await.guard);
    }

    // 2. test_before_acquire
    if options.test_before_acquire {
        if let Err(_) = conn.inner.live.raw.ping().await {
            return Err(conn.close().await.guard);
        }
    }

    // 3. before_acquire 回调
    if let Some(callback) = &options.before_acquire {
        // 调用回调、按返回 decide keep / drop
    }

    Ok(conn.into_live())
}

三层检查依次过:

  1. max_lifetime:连接太老——关掉、让上层建新的。
  2. test_before_acquire:ping 验证(默认开)。
  3. before_acquire 回调:用户自定义校验(第 13 章 §13.11.2)。

任一失败返回 Err(DecrementSizeGuard)——让上层拿 guard 去建新连接(guard 已经持有"size 已算"的承诺,不用再 try_increment)。

14.7 release 归还

inner.rs:222-234

rust
pub(super) fn release(&self, floating: Floating<DB, Live<DB>>) {
    let Floating { inner: idle, guard } = floating.into_idle();

    if self.idle_conns.push(idle).is_err() {
        panic!("BUG: connection queue overflow in release()");
    }

    // 释放 permit(但不减 size)
    guard.release_permit();

    self.num_idle.fetch_add(1, Ordering::AcqRel);
}

三步:

  1. floating.into_idle()Floating<Live> 转成 Idle——记录 idle_since = Instant::now()
  2. push 进 idle queue——成功是不变量(size counter 保证不会满)。
  3. guard.release_permit()——释放信号量 permit(让下一个 acquire 能拿)但不减 size(连接还在 pool 里、只是状态变 idle)。
  4. num_idle += 1

注意 release_permit 和 decrement_size 的区别

  • release_permit:归还信号量名额——让并发 limit 恢复。
  • decrement_size:实际减少连接总数——只在 connection 真的 close 时发生。

这两者分离是 Pool 设计的精妙之处——信号量限制"并发"、size 限制"总数"。一条连接 idle 时占 size 不占信号量(等它被拿起时信号量才被占)。

14.8 try_increment_size + DecrementSizeGuard

inner.rs:194-217

rust
pub(super) fn try_increment_size<'a>(
    self: &'a Arc<Self>,
    permit: AsyncSemaphoreReleaser<'a>,
) -> Result<DecrementSizeGuard<DB>, AsyncSemaphoreReleaser<'a>> {
    let result = self.size.fetch_update(Ordering::AcqRel, Ordering::Acquire, |size| {
        if self.is_closed() { return None; }
        size.checked_add(1).filter(|size| size <= &self.options.max_connections)
    });

    match result {
        Ok(_) => Ok(DecrementSizeGuard::from_permit((*self).clone(), permit)),
        Err(_) => Err(permit),
    }
}

原子 CAS 循环fetch_update)—— size 已达 max 或 pool 已关返回 None(CAS 失败)、否则成功 +1、返回 DecrementSizeGuard。

DecrementSizeGuard 的 Drop 实现(inner.rs:601-617):

rust
impl<DB: Database> Drop for DecrementSizeGuard<DB> {
    fn drop(&mut self) {
        if self.cancelled { return; }

        // 没被 cancel 就自动减 size
        self.pool.size.fetch_sub(1, Ordering::AcqRel);
        self.pool.semaphore.release(1);
    }
}

如果 guard 没被 cancel()——drop 时自动减 size + 释放 permit。

谁会调 cancel?——成功把连接变成 PoolConnection 时(DecrementSizeGuard::cancel)——这时 guard 不需要兜底、pool 确实多了一条连接。

没调 cancel 的路径:connect 失败、after_connect 失败、任何"已增 size 但连接没成型"场景。自动减 size 让 size counter 不泄漏——Pool 的核心不变量。

这条 RAII 设计让建连接失败的各种错误路径都自动正确——不用程序员在每个 ? 后面手动 decrement。

14.9 PoolConnection::Drop 归还机制

用户代码里 let conn = pool.acquire().await?; 的 conn 最终被 drop 时会怎样?

pool/connection.rs:199-213

rust
impl<DB: Database> Drop for PoolConnection<DB> {
    fn drop(&mut self) {
        if self.close_on_drop {
            crate::rt::spawn(self.take_and_close());
            return;
        }

        // 还需要 spawn task 来维持 min_connections
        if self.live.is_some() || self.pool.options.min_connections > 0 {
            crate::rt::spawn(self.return_to_pool());
        }
    }
}

Drop 里 spawn 一个异步 task ——return_to_pool。这个 task 做的事(pool/connection.rs 的 return_to_pool 方法):

  1. 从 self.live 取出 Live<DB>
  2. 调用 after_release 回调(如果配置了)。
  3. 检查 max_lifetime——连接太老就 close(不归还)。
  4. push 到 idle queue + release permit(即调 PoolInner::release)。

注意是 spawn 而不是 block——drop 是同步函数,不能 await。spawn 出一个 task 让异步清理异步进行。代价是drop 返回时清理还没完成——但用户一般不关心(pool 内部最终一致)。

14.9.1 close_on_drop 的特殊路径

PoolConnection 有一个 close_on_drop: bool 字段——用户可以设让连接drop 时直接关而不是归还:

rust
let mut conn = pool.acquire().await?;
conn.close_on_drop();  // 标记:用完关
// ... 用 conn ...
// drop → 连接被 close 而不是 release

典型用途:你做了一些可能破坏连接状态的操作(临时改了 session 参数、触发了某种错误)——不想让这条连接污染其他请求——直接关掉让 pool 建新的。

14.10 spawn_maintenance_tasks:后台驱逐循环

inner.rs:503-572 是 Pool 的"后台维护任务"入口:

rust
fn spawn_maintenance_tasks<DB: Database>(pool: &Arc<PoolInner<DB>>) {
    let pool_weak = Arc::downgrade(pool);

    let period = match (pool.options.max_lifetime, pool.options.idle_timeout) {
        (Some(it), None) | (None, Some(it)) => it,
        (Some(a), Some(b)) => cmp::min(a, b),
        (None, None) => {
            // 都没设——只维持 min_connections
            if pool.options.min_connections > 0 {
                crate::rt::spawn(async move { ... pool.min_connections_maintenance(None).await; ... });
            }
            return;
        }
    };

    let mut close_event = pool.close_event();

    crate::rt::spawn(async move {
        let _ = close_event.do_until(async {
            while let Some(pool) = pool_weak.upgrade() {
                if pool.is_closed() { return; }

                // 扫描 idle queue,关过期连接
                for _ in 0..pool.num_idle() {
                    if let Some(conn) = pool.try_acquire() {
                        if is_beyond_idle_timeout(&conn, &pool.options)
                            || is_beyond_max_lifetime(&conn, &pool.options)
                        {
                            let _ = conn.close().await;
                            pool.min_connections_maintenance(Some(next_run)).await;
                        } else {
                            pool.release(conn.into_live());
                        }
                    }
                }

                // 等下一轮
                crate::rt::sleep(duration).await;
            }
        }).await;
    });
}

四个关键点

  1. periodmin(idle_timeout, max_lifetime) 决定——默认 10 分钟。意味着每 10 分钟扫描一次 idle queue。
  2. Arc::downgrade 成 Weak——后台任务持有 Weak 引用、不阻止 PoolInner drop。Pool 最后一个 handle drop 时 Weak 升级失败、循环退出。
  3. close_event 可取消——Pool close 时 close_event 触发、do_until 退出、task 结束。
  4. 扫描过程自身用 try_acquire + release——连接先拿出来检查、不过期就放回去、过期就 close。

为什么用 try_acquire 而不是直接操作 idle_conns?——因为 try_acquire 走正常路径(抢 permit + 弹 idle)——和用户的 acquire 互斥。防止"维护任务刚检查完连接用户就拿去用"这种 race。

14.10.1 min_connections_maintenance

inner.rs:405 附近的 min_connections_maintenance(简化):

rust
pub async fn min_connections_maintenance(self: &Arc<Self>, deadline: Option<Instant>) {
    while self.size() < self.options.min_connections {
        // 尝试建新连接到 min
        let Ok(guard) = self.try_increment_size_unchecked() else { break; };
        let Ok(conn) = self.connect(deadline_or_now, guard).await else { break; };
        self.release(conn);
    }
}

保证 size >= min_connections。调用时机:

  1. 启动时(PoolOptions::build 内部)。
  2. 每次 maintenance scan 后——如果关掉过期连接导致 size 低于 min,立即补齐。
  3. PoolConnection Drop 里connection.rs:209-210)——drop 时也调一次、补齐。

min_connections 的最佳 effort 语义——try_increment_size 可能因为竞争失败、connect 可能网络抖动失败——但不阻止进程继续。下次 maintenance 再试。

14.11 连接建立的指数退避

inner.rs:321-398 的 connect 方法有一个精彩的 backoff 机制:

rust
let mut backoff = Duration::from_millis(10);
let max_backoff = deadline_as_timeout(deadline)? / 5;

loop {
    let timeout = deadline_as_timeout(deadline)?;

    match crate::rt::timeout(timeout, connect_options.connect()).await {
        Ok(Ok(mut raw)) => { /* 成功路径 */ }

        // IO 错误(连接被拒)——可能是 DB 刚启动
        Ok(Err(Error::Io(e))) if e.kind() == ConnectionRefused => (),

        // 瞬时 DB 错误
        Ok(Err(Error::Database(error))) if error.is_transient_in_connect_phase() => (),

        Ok(Err(e)) => return Err(e),
        Err(_) => return Err(Error::PoolTimedOut),
    }

    crate::rt::sleep(backoff).await;
    backoff = cmp::min(backoff * 2, max_backoff);
}

backoff 策略

  • 初始 10ms。
  • 每次失败 ×2:10 → 20 → 40 → 80 → ...
  • 上限 acquire_timeout / 5——不超过总时限的 1/5 让至少有 5 次尝试。

两类可重试的错误

  • ConnectionRefused——DB 可能在启动中。
  • is_transient_in_connect_phase()——DB 报的瞬时错误(比如太多连接但即将缓解)。

其他错误直接返回——认证失败、DNS 不可解等,重试没意义。

这条退避机制对容器化部署特别重要——app 和 DB 可能同时启动、app 先 up 时 DB 还没 ready——backoff 让 app 能平滑等 DB 就绪。

14.12 close 的关闭协议

inner.rs:97-120

rust
pub(super) fn close<'a>(self: &'a Arc<Self>) -> impl Future<Output = ()> + 'a {
    self.mark_closed();

    async move {
        for permits in 1..=self.options.max_connections {
            // 关所有 idle
            while let Some(idle) = self.idle_conns.pop() {
                let _ = idle.live.float((*self).clone()).close().await;
            }

            if self.size() == 0 { break; }

            // 等所有 permit 都 release
            let _permits = self.semaphore.acquire(permits).await;
        }
    }
}

三阶段

  1. mark_closed——atomic bool 置 true,on_closed 事件广播。
  2. 关所有 idle——循环 pop idle queue、close。
  3. 等所有借出的连接归还——通过 semaphore.acquire(permits) 逐步增加需要的 permit 数——这个 acquire 在所有借出连接归还(release permit)后才能完成——等价于"所有 in-use 连接都归还"。

这是一个精妙的用法——用信号量的 acquire 作为"等所有 release"的同步——不需要显式 channel 或 condvar。

14.12.1 Pool 内部关键流程的整体图

把本章讨论的几条主路径合一张总图:

这张图包含四条主路径:

  1. 用户 acquire + 已有 idleacquire → semaphore → pop idle → check → 返回
  2. 用户 acquire + 无 idleacquire → semaphore → try_increment → connect → 返回
  3. 用户 drop conndrop → spawn return_to_pool → release → 入 idle queue
  4. 后台 maintenance扫描 → 关过期 → 触发 min_connections 补齐

理解这四条路径你就理解了 Pool 整体的动态行为。

14.13 本章小结

本章把 PoolInner 的每一个机制拆开:

  1. PoolInner 10 字段(§14.2)—— idle_conns / semaphore / size / num_idle / is_closed / on_closed + 配置和日志。
  2. ArrayQueue 无锁队列(§14.3)—— crossbeam-queue 的 MPMC 有界队列,基于 CAS 原子操作、高并发下不被 Mutex contention 拖长尾。
  3. AsyncSemaphore fair 语义(§14.4)—— 默认 FIFO 公平。抽象 Tokio/async-std 差异。
  4. 三状态 Live/Idle/Floating(§14.5)—— Floating 是过渡态、带 DecrementSizeGuard RAII。
  5. acquire 的 loop(§14.6)—— 获取 permit → pop idle 或 try_increment_size → 建连接。check_idle_conn 三层验证。
  6. release 的双计数分离(§14.7)—— permit 管并发、size 管总数——idle 时占 size 不占 permit。
  7. DecrementSizeGuard RAII(§14.8)—— fetch_update 原子 +size、Drop 自动 -size 除非 cancel——错误路径的自动修正。
  8. PoolConnection::Drop spawn 归还(§14.9)—— 异步清理 + after_release 回调 + close_on_drop 特殊路径。
  9. maintenance task(§14.10)—— 后台扫描 idle queue、关过期连接、维持 min_connections。用 Weak 引用不阻止 PoolInner drop。
  10. 指数退避 backoff(§14.11)—— 建连接失败 10ms→20ms→40ms...,上限 acquire_timeout/5。
  11. close 的三阶段(§14.12)—— mark_closed → 关 idle → 等 in-use 归还。用 semaphore.acquire 作为"等归还"的同步。

下一章进入 Transaction——事务 RAII guard 的实现、savepoint 嵌套、Transaction::drop 的"尽力 rollback"、commit/rollback 的明确义务。

14.14 Pool 内部的并发正确性

Pool 内部是高并发代码——多个 async task 同时 acquire、maintenance task 后台扫描、close 可能并发——如何保证正确性?

sqlx 用了几条并发技巧:

1. 原子计数 + CAS 循环size.fetch_update 用乐观 CAS——读当前值、算新值、原子替换失败就重试。这比 Mutex<u32> 快:

rust
let result = self.size.fetch_update(..., |size| {
    if self.is_closed() { return None; }
    size.checked_add(1).filter(|size| size <= &max)
});

checked_add + filter 一起表达"没满才能加"。Ok(new) 表示成功、Err(old) 表示失败(None 分支或 CAS 竞争失败)。

2. 分离 permit 和 size。permit 管"允许多少并发 acquire"、size 管"实际多少连接"。两者独立——idle 连接占 size 不占 permit。通过 DecrementSizeGuard::release_permit(不减 size)vs Drop(减 size)实现精细控制。

3. Weak 引用防止循环。maintenance task 持 Weak<PoolInner>——不阻止 PoolInner drop。Arc 计数为 0 时 Weak upgrade 返回 None、task 退出。否则 Arc cycle 会让 Pool 永远不被回收。

4. event_listener 用于 close 信号on_closed: event_listener::Event——多个 waiter 同时 listen、一次 notify(usize::MAX) 唤醒所有。比 Arc<Notify> 轻量。

5. crate::rt::yield_now() 在冲突时让步。acquire 循环里如果 pop idle + try_increment 都失败,yield 给其他 task 机会——避免 busy wait。

6. do_until 用于可取消操作close_event().do_until(future) 让长任务能在 Pool close 时取消——优雅 shutdown 的基础。

这六条手法共同构成 Pool 的并发正确性保证——读这段源码学到的并发技巧可以迁移到你写任何高并发 Rust 代码。

14.15 性能特征和瓶颈分析

Pool 在不同负载下的定性性能特征(具体数值需自行 benchmark):

场景 A:低并发(< 10 acquire/s)

  • 几乎没 semaphore 争抢——permit 永远有。
  • 大多数 acquire 走 idle queue pop 快路径(纳秒级原子操作)。
  • 主要开销是 Tokio 的 spawn(PoolConnection::Drop 里异步归还)。

场景 B:中等并发(100-1000 acquire/s)

  • idle queue pop / push 频繁——ArrayQueue 的无锁优势显著。
  • semaphore 可能有短暂等待。
  • acquire 通常亚毫秒级、不会成为瓶颈。

场景 C:高并发 + 连接饱和(acquire > max_connections)

  • 排队成为主要开销——fair 模式 FIFO。
  • P99 可能触及 acquire_timeout(默认 30s)。
  • 症状:P99 延迟剧烈震荡,偶发 PoolTimedOut

场景 D:连接抖动

  • 网络不稳、连接频繁断开重建。
  • maintenance task 忙着驱逐;connect backoff 延长。
  • 症状:num_idle 长期 0,size 剧烈波动

瓶颈定位

  • 低并发慢:检查业务 query 本身(不是 Pool 问题)。
  • 中等并发偶发慢:acquire_slow_level 打开日志——看哪些 acquire 变慢。
  • 高并发排队:增大 max_connections 或加实例水平扩展。
  • 抖动:排查网络 / DB 健康 / PgBouncer 状态。

这条性能特征分析让你在生产排查时对症下药——不会"P99 高了就盲目调 max_connections"。

14.16 Pool 作为 Rust 并发编程的教材

读完本章你会发现 sqlx 的 pool/ 模块是一份Rust 高并发代码的教材——浓缩了许多值得学习的模式:

技术在 Pool 里的用法通用用途
无锁 MPMC 队列idle_conns: ArrayQueue任何"多生产者多消费者 + 有界"场景
异步信号量AsyncSemaphore 控制 max_connections任何"并发数限制"场景
原子 CAS 循环size.fetch_update无锁计数器
RAII 兜底 guardDecrementSizeGuard错误路径自动修正计数
Weak 引用避免 cyclemaintenance task 持 Weak<PoolInner>后台任务不阻止主对象回收
event_listeneron_closed 事件广播多 waiter 的单次通知
异步超时 + 退避connect backoff 10→max网络操作的重试
Drop 里 spawnPoolConnection::Drop → spawn return_to_pool异步清理资源

每一条都是 Rust 生态里反复出现的技术——掌握它们的具体用法(sqlx 展示的)可以直接迁移到你自己的项目。如果你在写 HTTP client pool、gRPC channel pool、任何"管理一组异步资源"的代码——sqlx 的 Pool 就是你的参考实现

这也回应本书开头提过的一个观察:sqlx 不只是"写 SQL 用的库"——它是 Rust 异步后端生态里的基础设施范本。每一章学的不只是 sqlx 自己、也是 Rust 做资源管理的最佳实践。

14.17 三点必记的 Pool 内部常识

本章最后,精炼成三点生产工程师必记的 Pool 内部常识:

1. acquire 成功的条件是 "拿到 permit AND (pop idle OR try_increment 成功)"——两个 AND 和一个 OR。理解这个组合就理解了"为什么我的 acquire 超时"——看是哪一侧(permit 排队还是 size 撞上限)。

2. size 泄漏永远由 DecrementSizeGuard 兜底——哪怕 connect 过程 panic、task 被取消,只要 guard 没 cancel,drop 时自动减 size。这是 Pool 永远不会"size 单调上升超过实际连接数"的保证。

3. maintenance task 是 Weak 引用——Pool 的最后一个 handle drop 后、maintenance 会退出。你不需要手动关 pool——drop 所有 handle 就行。但生产建议显式 close().await 让 idle 连接优雅关——不然 maintenance 退出后 idle 连接靠 TCP FIN 通知 server,延迟较大。

这三点是生产级 sqlx 工程师必须理解的——碰到任何 Pool 相关诡异问题,从这三点出发找线索几乎都能定位到根因。

14.18 源码阅读推荐路径

如果读完本章你想自己直接读 sqlx 的 Pool 源码——推荐按以下顺序:

  1. sqlx-core/src/pool/mod.rs——从 Pool struct 和公共方法开始(200 行对外 API)。
  2. sqlx-core/src/pool/options.rs——PoolOptions 的 13 个配置项(700 行,大部分是 builder 方法)。
  3. sqlx-core/src/pool/inner.rs——核心实现(670 行,最硬的一部分)。
  4. sqlx-core/src/pool/connection.rs——PoolConnection + Live/Idle/Floating 状态类型(420 行)。
  5. sqlx-core/src/sync.rs——AsyncSemaphore 的 runtime 抽象(200 行)。

总共约 2200 行——一个熟练 Rust 工程师 2-3 小时能通读。读这 5 个文件比任何书都能快速把 Rust 并发资源管理的最佳实践打进脑子——哪些该 Mutex、哪些该 atomic、哪些该 Weak、哪些该 spawn、哪些该 RAII guard——sqlx 给了详细示范。

重点关注源码中的注释——大部分关键决策有一段英文解释。比如 FuseYieldCAS loop 这些地方的注释密度都很高——作者在和你对话"为什么这么做"。读注释比读代码本身学到的更多。

14.19 Pool 内部设计的通用启示

最后升华到 API 设计层面,sqlx Pool 内部代码给我们的三条启示

1. 分离语义层次的计数器。permit(并发控制)和 size(资源总数)分开——让"一条 idle 连接占 size 但不占 permit" 这种细粒度控制可能。很多库一开始只有一个 "connection count" 计数——之后发现不够用、必须拆成两个。sqlx 从一开始就拆——省了后续的重构。

2. 用 RAII guard 处理错误路径。DecrementSizeGuard 的 Drop 让错误路径"默认正确"——程序员不用手写 "if err { decrement }"。这条设计在 Rust 里极其常见——Mutex 的 MutexGuard、BorrowMut 的 RefMut、tokio 的 tokio::sync::SemaphorePermit——都是同一套思路。遇到"需要成对做的操作"永远考虑 RAII。

3. 后台任务用 Weak 引用。maintenance task 持 Weak<PoolInner> 不阻止 PoolInner drop——这让 "Pool 最后一个 handle drop 后整个资源链自动回收" 成立。如果后台任务持 Arc,Pool 就永远不会被回收(cycle)。这条"引用类型的方向性"是 Rust 异步代码里最容易踩坑的地方——但学会后能避免很多生产事故。

读完本章,希望你对这三条启示有具体的代码位置作证——下次在自己代码里应用时不是抄模式、而是有理解的迁移

14.20 parent_pool 的特殊机制

inner.rs:127-170acquire_permit 里有一段关于 parent_pool 的处理——这是 sqlx 一个少用但精彩的机制。

PoolOptions::parent(pool) 让一个子 Pool 从父 Pool 偷 permit

rust
let main_pool = PgPoolOptions::new().max_connections(100).connect(url).await?;

// 子 Pool 共享父的连接预算
let subtask_pool = PgPoolOptions::new()
    .max_connections(20)
    .parent(main_pool.clone())
    .connect_lazy_with(opts);

语义

  • 子 Pool 的 max_connections = 20 —— 逻辑上限。
  • 父 Pool 的 max_connections = 100 —— 两者共享一个 permit 空间。
  • 子 Pool 的 acquire 会同时尝试从自己的信号量和父的信号量抢 permit——父先满足就走父、否则等子。
  • 效果:子 Pool 最多用 20 条 "属于它自己的" 名额,但额外能借父的名额。

典型用例:一个服务里有主路径 pool(业务查询)和后台任务 pool(ETL / 统计)。后台 pool 不能占满主路径的连接——设成子 Pool、max_connections 给个适量(20 条),让后台跑慢但不撞主路径。

实现上 acquire_permitinner.rs:127-170)的 future::poll_fn 里同时 poll 子和父的 semaphore——先 poll 子(优先用自己的),如果子满了就 poll 父(借)。逻辑上 permit 实际上被父的 semaphore 管理——子只是把自己的限额加到父上(子的 semaphore capacity 是 0,所有 permit 都从父借)。

这条机制在日常业务代码里少用——但一旦遇到"主路径 / 后台路径 分担连接预算"的场景就非常有用。生产 PgBouncer 也有类似的 reserve_pool_size 机制——sqlx 在 client 侧提供了等价功能。

14.21 Pool 并发测试的挑战

Pool 的内部代码要做并发正确性测试——sqlx 实际仓库里有一套集成测试:

  • tests/pool/ 目录的 spawned concurrent tests——N 个 tokio task 同时 acquire + release,验证 size counter 不泄漏、idle queue 不乱序。
  • loom 测试(如果启用)——用 loom crate 做穷尽并发状态检查,catch 常规测试跑不出的 race。

但 Pool 的绝大部分"正确性保证"不是测试出来的——是代码设计时就避免竞争

  • atomic CAS 让 size 更新无 race
  • RAII guard 让错误路径自动修正
  • Weak 引用让后台任务不阻碍 drop

"设计出正确性"比"测试出正确性"更重要——这是 Rust 并发编程的核心理念。sqlx Pool 的代码结构让"测试不出不代表有 bug"的担忧降到最低——关键不变量由类型系统和 RAII 强制。

这也是为什么 sqlx 的 Pool 实现能在数年生产里保持稳定——不是靠天选 / 测试覆盖率、而是靠设计层面的并发正确性。这条设计原则值得你在自己项目里反复实践。

14.22 acquire_permit 函数的 poll_fn 精读

前面 §14.6 一笔带过了 acquire_permit——但它的 poll_fn 实现值得精读。inner.rs:141-170

rust
future::poll_fn(|cx| {
    if close_event.as_mut().poll(cx).is_ready() {
        return Poll::Ready(Err(Error::PoolClosed));
    }

    if parent_close_event.as_mut().poll(cx).is_ready() {
        self.mark_closed();
        return Poll::Ready(Err(Error::PoolClosed));
    }

    if let Poll::Ready(permit) = acquire_self.as_mut().poll(cx) {
        return Poll::Ready(Ok(permit));
    }

    // 父 Pool 的 poll 有延迟
    if poll_parent {
        acquire_parent.as_mut().poll(cx).map(Ok)
    } else {
        poll_parent = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}).await

四个 future 同时 poll——close_event / parent_close_event / acquire_self / acquire_parent。按优先级:

  1. close_event:Pool 已关,立即返回 PoolClosed。
  2. parent_close_event:父 Pool 关,propagate 到子、同样 PoolClosed。
  3. acquire_self:自己的 semaphore 有 permit,立即拿。
  4. acquire_parent:从父借 permit——但第一次 poll 故意返回 Pending + wake,给子一次"不借父"的机会。

第四步的延迟 poll 很精彩——poll_parent = false 时调用者第一次 poll 返回 Pending + 立刻 wake——让 executor 再 poll 一次(这次 poll_parent = true)。中间可能有子的 semaphore 释放(其他 task 归还)——让子 Pool 优先用自己的配额、只有真找不到时才借父。

这是一个**"优先本地、不得已才借外部"** 的语义——用 poll_fn 里的状态机精确表达。写这段代码的作者思考到了"如果同时 poll 父和子、父可能抢先返回"的 race、用延迟一个 poll cycle 规避。

读这段代码你能感受到 sqlx 作者的并发代码功力——对 Rust future polling 的 semantics 理解极深。日常业务代码可能永远不会写出这种 poll_fn、但理解它能让你看懂任何复杂 Future 实现。

14.23 Pool 性能统计:一些真实数据

基于源码分析 + 常规 benchmark 可以给一些量级估计(不是精确数字):

操作耗时量级主要开销来源
ArrayQueue::pop / push30-100ns单次 atomic CAS
AsyncSemaphore::acquire(立刻)100-500nsTokio Semaphore 的 fast path
AsyncSemaphore::acquire(排队)μs 到 ms取决于排队长度
size.fetch_update CAS50-200nsatomic RMW
DecrementSizeGuard RAII 全路径200-500ns两个 atomic(size + permit)
PoolConnection::Drop + spawn1-5μstokio::spawn 自身开销
idle_conns.push + semaphore release200-500ns组合
后台 maintenance 一轮扫描(1000 idle)ms 级try_acquire 1000 次

生产意义:sqlx Pool 在空闲热路径(acquire 直接拿到 idle)大概 200-500ns 完成——低于 μs 级。任何 acquire 超过 1μs 几乎一定是走了冷路径(建新连接、排队等、ping 失败)。用 tracing 追踪慢 acquire 会让问题暴露得明显。

对比 tokio-postgres 的手动 pooldeadpool-postgres)——性能大致同级,因为 deadpool 也用 crossbeam + Tokio Semaphore。Go 的 database/sql 的 pool——对标 sqlx 大约慢 2-3 倍(互斥锁 + GC 影响)。Java 的 HikariCP——也差不多。

sqlx 在连接池性能层面和生态里其他 state-of-the-art 的工具打平——不是落后也不是领先。优势在其他维度(类型安全 / query! 宏 / Rust 整合)。

14.24 一条实战排查案例

用一条真实生产场景串起本章内容——假设你的服务突然报大量 PoolTimedOut

Step 1:看监控仪表盘——pool.size() 恒定在 max_connections、pool.num_idle() 恒 0——Pool 饱和

Step 2:打开 acquire_slow_level——日志开始出现 slow acquire warning——每次 acquire 等 2s+。

Step 3:查 pg_stat_activity——发现大量连接在同一条长查询上。

Step 4:定位那条查询——SELECT ... FROM huge_table WHERE ... ORDER BY col——没索引、全表扫。

Step 5:根因——这条慢查询把 pool 的所有连接占满、新请求全排队超时。

Step 6:修复——加索引 / 加 statement_timeout(从 after_connect 里设)——让慢查询自动被 kill。

这条排查路径利用了本章讨论的几个机制:

  • pool.size() / num_idle() 做饱和判断(§13.8)。
  • acquire_slow_level 定位慢 acquire(§13.10)。
  • after_connect 设 statement_timeout(§13.11)。
  • Pool 的 size 一直等于 max 而不增不减——backdrop 是 §14.8 的 size counter 稳定语义。
  • 理解 idle 队列为空意味着所有连接在 in-use——这来自本章的 idle vs in-use 分离概念(§14.7)。

不理解 Pool 内部的工程师可能会乱改配置(增大 max_connections、改 acquire_timeout)——治标不治本。理解本章内容后你知道根因不在 Pool——而在业务查询——从这个方向调才是对的。

这条真实案例说明:读懂 Pool 内部不是学术锻炼、是生产武器。下次你排查任何 Pool 相关问题,本章的机制就是你的工具箱。

14.25 Pool 源码的代码品质观察

作为本章结尾,回看 sqlx pool 源码的代码品质几点观察:

1. 注释密度恰当。关键决策点(RAII guard 的 cancel 语义、poll_fn 的延迟 poll、close 的三阶段)都有 2-5 行英文注释——解释"为什么这么做"。其他显然的代码不加废话注释。这是良好开源代码的标志——新手能读懂、老手不被冗余。

2. 函数长度控制。大多数函数 20-50 行——能一屏看完。超过 100 行的只有 acquire(因为它是核心的循环 + 超时 + 错误处理)——这种"必要的长"可以接受。

3. panic! 使用谨慎。整个 pool 模块只有 1-2 处 panic:release 里的 "BUG: connection queue overflow"——因为逻辑上 size counter 保证 idle_conns 不会满、如果满了一定是 bug。这种不变量 violation 用 panic比用 Result 合理——把"不可能发生"的代码路径标出来、避免用户处理不存在的错误。

4. 测试友好的设计parent_pool 机制(§14.20)让集成测试里可以构造"父子 pool"跑并发测——模拟生产的复杂拓扑。这种测试钩子不侵入生产代码(parent_pool 是正常 feature 而不是 test-only attr)——值得学习。

5. #[allow(dead_code)] 使用克制。源码里几乎没有 allow(dead_code)——意味着每一段代码都在被用。"死代码标记滥用"是很多开源项目的病——sqlx 避免了这个。

这些品质观察不是"锦上添花"——它们是 sqlx 能持续维护 7 年(从 2019 到 2026)的工程支撑。学 sqlx 不只是学库怎么用、也是学怎么写一份能长期维护的开源 Rust 库

到此第 14 章结束——Pool 的外部 API(第 13 章)和内部实现(本章)都拆开了。下一章进入第四部分的最后一块拼图——Transaction。

14.26 四条代码阅读心得

读完本章你至少掌握了下面四条从 Pool 源码里直接学到的东西——它们放在任何 Rust 项目里都值得参考:

1. 多状态对象用类型系统表达状态。Live / Idle / Floating 三个类型就是状态机的三个状态——转换通过 into_xxx 方法、编译器保证状态迁移合法(你不能把 Live 直接推回 Idle queue、必须先 into_idle)。比用 bool flag 区分状态类型安全得多

2. 原子操作 + RAII 组合处理"必须成对的资源操作"。增 size 必须配对减 size;抢 permit 必须配对 release——DecrementSizeGuard 让这两对"自动成对"。比 try { incr; ... } finally { decr } 的 Java 风格优雅很多。

3. 有界并发原语比无界好。ArrayQueue 是有界的(不会无限增长)、Pool 的 semaphore 是有界的(max_connections)——资源上限由类型本身保证。无界队列 / 无界信号量听起来灵活、实际生产是 OOM 温床。

4. 后台任务的生命周期管理用 Weak。任何"长期运行的后台 task"都用 Weak 引用它需要的资源——让资源的主 Arc drop 能触发 task 退出。业务代码里经常会写 tokio::spawn 一个 task 持 Arc 忘了 drop——导致资源永久活着。Weak 是防御。

掌握这四条、你读任何 Rust 高并发代码都会顺手——sqlx pool 是它们的集大成样本

第 15 章 Transaction 会回到"业务语义"层面——讨论 RAII guard 对事务 commit/rollback 的强制约束、savepoint 嵌套、以及 Drop 里为什么"只能尽力 rollback"。

14.27 和《Tokio 源码深度解析》第12章的关联

sqlx Pool 的核心门禁是 AsyncSemaphore——本章看到 acquire_permitDecrementSizeGuardfair 队列这些概念——它们背后就是《Tokio 源码深度解析》第 12 章 §12.3 Semaphore:基础 building block§12.4 公平锁 vs 非公平锁的 trade-off 里讨论的数据结构。

具体对应

  • sqlx PoolOptions::fair = true 默认(sqlx-core/src/pool/options.rs:163)↔ Tokio §12.4 讲 tokio::sync::Semaphore 公平队列——底层用 intrusive 链表 + FIFO Waker——每个 waiter 在 Vec<Waker> 里按到达顺序排。
  • sqlx Pool 的 acquire_permit().await ↔ Tokio §12.3 讲 Semaphore::acquire(1) 返回 SemaphorePermit 的 RAII guard——drop 时自动 release。
  • sqlx DecrementSizeGuard 的"资源获取和释放绑定同一 struct" ↔ Tokio §12.3 讲的 OwnedSemaphorePermit

为什么要看 Tokio 那章——sqlx 池是在 Semaphore 之上封装业务语义(最大连接数、idle queue、min_connections 维护)——底层 Semaphore 的 waker 管理、公平性实现、poll 协议都在 Tokio crate 里。不看 Tokio 那一层、sqlx 的 acquire 为什么能被唤醒就只能靠直觉理解——看了就知道 Tokio Semaphore 内部 poll_acquire 是怎么挂 waker、怎么在 add_permits 时按序唤醒。

一条建议——如果本章读得有点吃力、先看《Tokio》第 12.3-12.4 半小时——再回来读本章会顺得多。

基于 VitePress 构建