Skip to content

第15章 Transaction:Drop 保护、savepoint、嵌套

"A transaction is a promise—and Rust's Drop cannot keep promises that require await." —— 每个写过 sqlx Transaction 代码的人遇到的第一条领悟

本章要点

  • Transaction<'c, DB>sqlx-core/src/transaction.rs:86-94)是 sqlx 的事务 RAII guard——两个字段:connection: MaybePoolConnection<'c, DB> + open: bool。简洁到让人意外。
  • TransactionManager traittransaction.rs:15-53)定义了驱动层的事务操作:begin / commit / rollback(async)+ start_rollback(sync)+ get_transaction_depth。由每个驱动具体实现(PgTransactionManager 等)。
  • 三条 ANSI SQL helper 函数transaction.rs:277-305):begin_ansi_transaction_sql(depth)commit_ansi_transaction_sql(depth)rollback_ansi_transaction_sql(depth)——depth 0 用 BEGIN/COMMIT/ROLLBACK、depth > 0 用 SAVEPOINT _sqlx_savepoint_N / RELEASE / ROLLBACK TO
  • Drop 的"尽力 rollback"transaction.rs:260-275)—— Drop 调 start_rollback 同步方法、只排队一条 ROLLBACK SQL 到 write buffer、不等发送完成。真正发送要等下次这条连接 I/O。
  • 嵌套事务通过 SAVEPOINT——第一层 BEGIN;第二层开始用 SAVEPOINT _sqlx_savepoint_1;commit 嵌套用 RELEASE SAVEPOINT;rollback 用 ROLLBACK TO SAVEPOINT
  • Transaction 实现 DerefMut 到 DB::Connectiontransaction.rs:220-232)——让 &mut *tx 变成 &mut Connection、满足 Executor bound(第 4 章 §4.6 讨论过)。
  • Transaction 本身不实现 Executor——只有通过 DerefMut 的 &mut *tx 才是。原因是 lazy normalization 限制(第 4 章 §4.6)。
  • commit/rollback/drop 的工程优先级显式 commit 最优 / 显式 rollback 次之 / drop 兜底但不保证——生产代码永远显式 commit 或 rollback,不依赖 drop。

15.1 问题引入:Rust async 里的事务挑战

事务是 SQL 世界的原子性保证——一组操作要么全做要么全不做。经典用法:

rust
let mut tx = pool.begin().await?;
sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2")
    .bind(100).bind(from_id).execute(&mut *tx).await?;
sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
    .bind(100).bind(to_id).execute(&mut *tx).await?;
tx.commit().await?;

三行 SQL 在同一事务里——中间任何一步失败(第二条 UPDATE 抛错、? 返回 Err),整个事务要回滚,钱不会凭空消失。

问题是? 返回 Err 后 tx 变量离开作用域被 drop——Rust 期望 drop 里发 ROLLBACK 给数据库。但 Rust 的 Drop::drop 是同步函数——不能 .await。而"发 ROLLBACK 消息给数据库"是异步 I/O(写 socket + 等响应)——drop 里做不完

这个矛盾是 sqlx Transaction 设计的根本挑战。sqlx 的对策是两层 API

  1. 显式 commit/rollback 是 async——用户 .await 它们保证消息发出、响应接收。
  2. Drop 调同步 start_rollback——只把 ROLLBACK 查询排进 write buffer、不发送。真正发送要等下次这条连接被使用时。

这是一个"prefer explicit, fall back to best-effort"的设计——用户如果记得 commit/rollback 就一切正常;如果忘了、Drop 也尽可能把事务关掉(但不保证立即)。

本章拆开这条设计的每一处细节——TransactionManager trait、三条 ANSI SQL helper、Transaction struct、Drop 实现、嵌套 savepoint——看 sqlx 如何在 Rust async 限制下把事务 API 做得"尽可能安全"。

15.2 TransactionManager trait:驱动层的抽象

sqlx-core/src/transaction.rs:15-53 的 trait:

rust
#[doc(hidden)]
pub trait TransactionManager {
    type Database: Database;

    fn begin<'conn>(
        conn: &'conn mut <Self::Database as Database>::Connection,
        statement: Option<Cow<'static, str>>,
    ) -> BoxFuture<'conn, Result<(), Error>>;

    fn commit(conn: &mut <Self::Database as Database>::Connection) -> BoxFuture<'_, Result<(), Error>>;
    fn rollback(conn: &mut <Self::Database as Database>::Connection) -> BoxFuture<'_, Result<(), Error>>;

    fn start_rollback(conn: &mut <Self::Database as Database>::Connection);

    fn get_transaction_depth(conn: &<Self::Database as Database>::Connection) -> usize;
}

五个方法:

  • begin(conn, statement) async——开事务或进 savepoint。
  • commit(conn) async——提交事务或 release savepoint。
  • rollback(conn) async——回滚事务或 rollback to savepoint。
  • start_rollback(conn) sync——只排队 ROLLBACK 查询,不等执行。
  • get_transaction_depth(conn)——查当前 nesting 层级(0 = 无事务、1 = 事务中、>1 = savepoint 嵌套)。

#[doc(hidden)] 标注——普通用户不调这些方法,通过 Transaction 类型间接用。trait 是驱动层内部抽象。

没有方法接收 self——所有方法都是 type-associated,通过 &mut Connection 操作。TransactionManager 本身是零大小 marker 类型(PgTransactionManagerpub struct PgTransactionManager; 空 struct)——不持状态,只提供方法集。

15.2.1 begin 的 statement 参数

begin(conn, statement: Option<Cow<'static, str>>)statement 允许用户自定义 BEGIN 语句:

  • None:用默认 ANSI BEGIN
  • Some("BEGIN ISOLATION LEVEL SERIALIZABLE"):自定义(只对 depth = 0 有效)。

第 12 章 §12.5 讨论过的 Connection::begin_with 就走这条——传 Some(statement) 给 TransactionManager::begin。嵌套事务(depth > 0)不能自定义 statement——sqlx 强制用 SAVEPOINT,传 Some 时返回 Error::InvalidSavePointStatement

15.3 Transaction struct 的简洁性

transaction.rs:86-94

rust
pub struct Transaction<'c, DB>
where DB: Database,
{
    connection: MaybePoolConnection<'c, DB>,
    open: bool,
}

只有两个字段

  • connection: MaybePoolConnection<'c, DB>——要么是 &mut Connection(从 Connection::begin 来)、要么是 owned PoolConnection(从 Pool::begin 来)。统一抽象让 Transaction 对两种来源都工作。
  • open: bool——标记事务是否还活着。true 表示事务中、false 表示已 commit / rollback。

MaybePoolConnection<'c, DB>sqlx-core/src/pool/maybe.rs)简化:

rust
pub enum MaybePoolConnection<'c, DB: Database> {
    Connection(&'c mut DB::Connection),  // 来自 Connection::begin
    PoolConnection(PoolConnection<DB>),   // 来自 Pool::begin
}

两种来源通过枚举统一——Transaction 不关心具体来源、只管事务语义。这让 Transaction<'c, DB>'c 生命周期根据来源不同而不同:

  • Connection-originated:'c = 被借用的 Connection 的生命周期。
  • Pool-originated:'c = 'static——PoolConnection owned 不借用外部。

15.3.1 open: bool 的作用

open 字段跟踪事务状态。只在两处被修改

  • Self::commit 成功后:self.open = falsetransaction.rs:119)。
  • Self::rollback 成功后:self.open = falsetransaction.rs:125)。

Drop 里检查 if self.open——只有还在开着的事务才调 start_rollback

rust
impl<'c, DB> Drop for Transaction<'c, DB> {
    fn drop(&mut self) {
        if self.open {
            DB::TransactionManager::start_rollback(&mut self.connection);
        }
    }
}

这条简单的标志让 "已 commit/rollback 的事务 drop 不做任何事" 成立——避免重复回滚。

15.4 begin 的流程:ANSI BEGIN 或 SAVEPOINT

Transaction::begintransaction.rs:97-112):

rust
pub fn begin(
    conn: impl Into<MaybePoolConnection<'c, DB>>,
    statement: Option<Cow<'static, str>>,
) -> BoxFuture<'c, Result<Self, Error>> {
    let mut conn = conn.into();
    Box::pin(async move {
        DB::TransactionManager::begin(&mut conn, statement).await?;
        Ok(Self { connection: conn, open: true })
    })
}

就是调 TransactionManager::begin + 构造 Self。具体的 BEGIN 语句在 PgTransactionManager::beginsqlx-postgres/src/transaction.rs:17-46)里决定:

rust
fn begin<'conn>(conn: &'conn mut PgConnection, statement: Option<Cow<'static, str>>) -> BoxFuture<...> {
    Box::pin(async move {
        let depth = conn.inner.transaction_depth;
        let statement = match statement {
            Some(_) if depth > 0 => return Err(Error::InvalidSavePointStatement),
            Some(statement) => statement,
            None => begin_ansi_transaction_sql(depth),
        };

        let rollback = Rollback::new(conn);
        rollback.conn.queue_simple_query(&statement)?;
        rollback.conn.wait_until_ready().await?;
        if !rollback.conn.in_transaction() {
            return Err(Error::BeginFailed);
        }
        rollback.conn.inner.transaction_depth += 1;
        rollback.defuse();
        Ok(())
    })
}

核心流程:

  1. 读当前 depth——决定是 BEGIN 还是 SAVEPOINT。
  2. 按 depth 选 SQL
    • depth = 0 + 无自定义 statement:BEGIN
    • depth = 0 + 有自定义:用户 statement(BEGIN ISOLATION LEVEL SERIALIZABLE 等)。
    • depth > 0 + 自定义:拒绝(InvalidSavePointStatement)。
    • depth > 0:SAVEPOINT _sqlx_savepoint_N
  3. 新建 Rollback guard——一个内部 RAII,如果 begin 过程中异常 drop、自动 start_rollback。
  4. 发 simple query——把 BEGIN/SAVEPOINT 发给服务端。
  5. 等 ReadyForQuery——确认服务端进入事务状态。
  6. transaction_depth += 1
  7. defuse Rollback guard——成功了不需要 guard 的兜底。

15.4.1 begin_ansi_transaction_sql helper

transaction.rs:277-282

rust
pub fn begin_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
    if depth == 0 {
        Cow::Borrowed("BEGIN")
    } else {
        Cow::Owned(format!("SAVEPOINT _sqlx_savepoint_{depth}"))
    }
}

depth = 0BEGIN(顶层事务)。 depth = 1SAVEPOINT _sqlx_savepoint_1(第一个嵌套 savepoint)。 depth = 2SAVEPOINT _sqlx_savepoint_2

savepoint 命名用连续数字——第 N 层 savepoint 叫 _sqlx_savepoint_N。这样 commit / rollback 对应的 RELEASE / ROLLBACK TO 也能按 depth 反算名字。

返回 Cow<'static, str> 的巧妙:depth = 0 时是 "BEGIN" 字符串字面量(借用)、depth > 0 时是动态格式化 String(owned)。Cow 让两种情况共用签名——常见字面量零分配、少见情况按需分配。

15.4.2 Rollback guard

PgTransactionManager::begin 里的 Rollback 是个内部 RAII

rust
struct Rollback<'c> {
    conn: &'c mut PgConnection,
    defuse: bool,
}

impl Drop for Rollback<'_> {
    fn drop(&mut self) {
        if !self.defuse {
            PgTransactionManager::start_rollback(self.conn)
        }
    }
}

作用:如果 begin 过程中任何 .await? 失败(panic / 网络错误)、Rollback 的 drop 会自动 start_rollback——避免服务端已经开始了事务但 Rust 这边错误返回的状态不一致。

rollback.defuse() 只在 begin 完全成功后调——cancel 掉自动 rollback、因为 Transaction 本身接管了事务管理责任。

这是一个经典 RAII 模式——"部分成功的错误路径"用 guard 兜底。第 14 章的 DecrementSizeGuard 也是同一种设计。

15.5 commit / rollback:async 完整路径

transaction.rs:114-127

rust
pub async fn commit(mut self) -> Result<(), Error> {
    DB::TransactionManager::commit(&mut self.connection).await?;
    self.open = false;
    Ok(())
}

pub async fn rollback(mut self) -> Result<(), Error> {
    DB::TransactionManager::rollback(&mut self.connection).await?;
    self.open = false;
    Ok(())
}

两个方法的结构一样——调 TransactionManager 的 commit/rollback + 设置 open = false

关键是吃 self——commit/rollback 消费整个 Transaction 对象。.await? 之后 self 要么被消费(Ok)要么错误冒泡(Err)——两条路径之后 Transaction 都不可再用。

注意 如果 commit 的 .await? 失败——open 保持 true、Drop 会触发 start_rollback。commit 失败不意味着"事务还活着"——它意味着"客户端不确定事务状态"(网络可能在 COMMIT 消息发出后断了)——rollback 是保守兜底。

15.5.1 PgTransactionManager::commit 的实现

sqlx-postgres/src/transaction.rs:48-57

rust
fn commit(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> {
    Box::pin(async move {
        if conn.inner.transaction_depth > 0 {
            conn.execute(&*commit_ansi_transaction_sql(conn.inner.transaction_depth))
                .await?;
            conn.inner.transaction_depth -= 1;
        }
        Ok(())
    })
}

三步:

  1. 检查 depth:只在 depth > 0 时执行(否则无事务可 commit)。
  2. 发 commit SQLCOMMITRELEASE SAVEPOINT _sqlx_savepoint_N
  3. depth -= 1

commit_ansi_transaction_sqltransaction.rs:285-291):

rust
pub fn commit_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
    if depth == 1 {
        Cow::Borrowed("COMMIT")
    } else {
        Cow::Owned(format!("RELEASE SAVEPOINT _sqlx_savepoint_{}", depth - 1))
    }
}
  • depth = 1COMMIT(关闭顶层事务)。
  • depth > 1RELEASE SAVEPOINT _sqlx_savepoint_{depth-1}(关闭嵌套的 savepoint)。

注意 depth - 1——命名用 savepoint 建立时的 depth。举例:

  • depth = 0 → begin BEGIN(depth 变 1)。
  • depth = 1 → begin SAVEPOINT _sqlx_savepoint_1(depth 变 2)。
  • commit at depth = 2 → RELEASE SAVEPOINT _sqlx_savepoint_1(depth 变 1)。

这条命名和 release 的反向对称是 savepoint 语义的一部分——让 commit/rollback 能按 depth 反推到正确的 savepoint 名字。

15.6 start_rollback:同步路径

PgTransactionManager::start_rollbacksqlx-postgres/src/transaction.rs:71-80):

rust
fn start_rollback(conn: &mut PgConnection) {
    if conn.inner.transaction_depth > 0 {
        conn.queue_simple_query(&rollback_ansi_transaction_sql(conn.inner.transaction_depth))
            .expect("BUG: Rollback query somehow too large for protocol");
        conn.inner.transaction_depth -= 1;
    }
}

同步函数——不 .await。做的事:

  1. 检查 depth
  2. queue_simple_query——把 ROLLBACK SQL 写进连接的 write buffer(不 flush、不发送)。
  3. depth -= 1——逻辑上事务已经"准备" rollback。

expect("BUG: ...")——queue_simple_query 只在"SQL 长度超过 u32 最大值(4GB)"时失败——不可能发生、panic 是合理的。

15.6.1 为什么"只排队不发送"?

queue_simple_query 往 write buffer 写字节——不调 flush。什么时候真正发送?

  1. 下次用这条连接执行 query 时——新 query 写进 buffer 后、flush 会把之前的 ROLLBACK 一起发。
  2. 连接归还 Pool、Pool 在下次 acquire 时 flush(如果有 test_before_acquire)。
  3. 连接被 drop——drop 实现 best effort 地 flush(不保证)。

延迟发送意味着:ROLLBACK 消息可能几百毫秒后才真到服务端。在这段间隔里,服务端视角下事务还开着——持有锁、持有行版本、占 statement 资源。

实际影响

  • 短期(几秒):没明显影响,服务端最终会收到 ROLLBACK。
  • 极端情况(连接卡住、进程挂):ROLLBACK 永远不发、服务端等到 session timeout(默认几分钟)才清理。

这条"尽力"语义是 Rust async Drop 限制的直接结果——没办法更好。sqlx 文档里反复强调"永远显式 commit/rollback"——就是因为 Drop 路径不够可靠。

15.7 Drop 的"尽力 rollback"

transaction.rs:260-275 的 Drop 实现:

rust
impl<'c, DB> Drop for Transaction<'c, DB>
where DB: Database,
{
    fn drop(&mut self) {
        if self.open {
            // starts a rollback operation
            // what this does depends on the database but generally this means we queue a rollback
            // operation that will happen on the next asynchronous invocation of the underlying
            // connection (including if the connection is returned to a pool)
            DB::TransactionManager::start_rollback(&mut self.connection);
        }
    }
}

注释精炼地说清楚了语义——"queue a rollback operation that will happen on the next asynchronous invocation"。用户看到这段注释就明白"drop 不立即 rollback"。

这条设计的工程权衡:

如果 Drop 试图同步发 ROLLBACK(就像 tokio-postgres 的做法之一):

  • 需要 block on the runtime——可能死锁(Tokio drop 时 runtime 可能已经 shutdown)。
  • 或者 spawn task——task 可能跑不完 Drop 就返回、Transaction 对象先被析构。

如果 Drop 什么都不做

  • ROLLBACK 永远不发——服务端要等 session timeout——更糟。

start_rollback 是中间路径排队不发送——正确性要靠下次 connection I/O 触发——大部分场景有效、极端情况依赖 server-side timeout。

sqlx 的选择是 "最佳 effort + 明确文档"——不假装能做到同步 rollback、也不放弃尝试。

15.8 嵌套事务:savepoint 的工作机制

sqlx 支持任意深度的嵌套事务——通过 SAVEPOINT 实现:

rust
let mut tx = pool.begin().await?;                   // depth 1: BEGIN

sqlx::query("INSERT ...").execute(&mut *tx).await?;

let mut inner_tx = (&mut *tx).begin().await?;       // depth 2: SAVEPOINT _sqlx_savepoint_1
sqlx::query("INSERT ...").execute(&mut *inner_tx).await?;

// 嵌套 rollback 只回滚到 savepoint
inner_tx.rollback().await?;                          // ROLLBACK TO SAVEPOINT _sqlx_savepoint_1

// 外层继续
sqlx::query("INSERT ...").execute(&mut *tx).await?;
tx.commit().await?;                                  // COMMIT

语义:内层 rollback 不影响外层——外层继续跑到 commit。内层如果 commit 了、外层再 rollback,内层的改动也会被外层 rollback(因为 COMMIT 嵌套 savepoint 实际是 RELEASE——在外层 COMMIT 前还没真正提交到 DB)。

这条语义对应 Postgres / MySQL 的 SAVEPOINT 标准——sqlx 直接用协议的 SAVEPOINT 命令、不做客户端模拟。

15.8.1 嵌套的 SQL 对应

完整映射表:

depth 变化begin SQLcommit SQLrollback SQL
0 → 1BEGINCOMMITROLLBACK
1 → 2SAVEPOINT _sqlx_savepoint_1RELEASE SAVEPOINT _sqlx_savepoint_1ROLLBACK TO SAVEPOINT _sqlx_savepoint_1
2 → 3SAVEPOINT _sqlx_savepoint_2RELEASE SAVEPOINT _sqlx_savepoint_2ROLLBACK TO SAVEPOINT _sqlx_savepoint_2
3 → 4SAVEPOINT _sqlx_savepoint_3......

depth 在 begin 时递增、commit/rollback 时递减——跟踪由 TransactionManager::get_transaction_depth 维护。

15.9 Transaction 的 DerefMut

transaction.rs:212-232

rust
impl<'c, DB> Deref for Transaction<'c, DB>
where DB: Database,
{
    type Target = DB::Connection;
    fn deref(&self) -> &Self::Target { &self.connection }
}

impl<'c, DB> DerefMut for Transaction<'c, DB>
where DB: Database,
{
    fn deref_mut(&mut self) -> &mut Self::Target { &mut self.connection }
}

Transaction 可以被 deref 成 &mut DB::Connection——这是 sqlx 让 &mut *tx 成为 Executor 的基础。

第 4 章 §4.6 详细讨论过**&mut Transaction 不直接实现 Executor**——因为 lazy normalization 限制。用户写 &mut *tx 显式 deref——绕过这条限制。

这条设计让事务内的 query 写法统一:

rust
sqlx::query(...).execute(&mut *tx).await?;           // &mut Connection via DerefMut
sqlx::query(...).execute(&mut conn).await?;          // 直接 &mut Connection
sqlx::query(...).execute(&pool).await?;              // &Pool

三种 Executor 混用——Transaction 通过 DerefMut 融入统一体系。

15.10 Connection::transaction 便捷方法

第 12 章 §12.5 讨论过 Connection::transaction——一个便捷闭包包装:

rust
fn transaction<F, R, E>(&mut self, callback: F) -> BoxFuture<'_, Result<R, E>>
where F: FnOnce(&mut Transaction<'_, Self::Database>) -> BoxFuture<'_, Result<R, E>>,
      R: Send, E: From<Error> + Send,
{
    Box::pin(async move {
        let mut transaction = self.begin().await?;
        let ret = callback(&mut transaction).await;
        match ret {
            Ok(ret) => { transaction.commit().await?; Ok(ret) }
            Err(err) => { transaction.rollback().await?; Err(err) }
        }
    })
}

显式 commit 或 rollback——不依赖 Drop。这让 transaction 包装器比手写 begin/commit/rollback 更安全——错误路径显式 rollback、不用担心 Drop 的"尽力"语义。

生产代码推荐优先用 transaction 闭包:

rust
conn.transaction(|tx| Box::pin(async move {
    sqlx::query("UPDATE ...").execute(&mut **tx).await?;
    sqlx::query("INSERT ...").execute(&mut **tx).await?;
    Ok::<_, sqlx::Error>(())
})).await?;

相比手写 begin/commit,少两行代码、错误路径自动显式 rollback。唯一限制是闭包内无法跨 await 借用外部变量太复杂——这时候 fallback 到手写 begin。

15.11 跨 DB 的事务差异

三家 DB 的事务行为共性 + 差异

共性

  • 都用 BEGIN / COMMIT / ROLLBACK 顶层事务。
  • 都用 SAVEPOINT / RELEASE / ROLLBACK TO SAVEPOINT 嵌套。
  • depth 跟踪机制类似。

Postgres 的特殊

  • 自动 rollback on error:Postgres 事务内一条 query 失败后、整个事务进入 "aborted" 状态、后续 query 全 current transaction is aborted——只能 rollback。sqlx 不处理这个、让错误自然冒泡。
  • ISOLATION LEVELBEGIN ISOLATION LEVEL SERIALIZABLE 等通过 begin_with 传入。
  • READ ONLY / DEFERRABLE:同样通过 begin_with。

MySQL 的特殊

  • DDL 不事务性:MySQL 的 DDL(CREATE TABLE 等)会自动提交当前事务——不像 Postgres 里 DDL 也事务性。sqlx 不能防止这个(协议层无法拒绝)。
  • Lock 行为:MySQL 的行锁在事务中持有直到 COMMIT / ROLLBACK、和 Postgres 略不同。

SQLite 的特殊

  • 三种 BEGINBEGIN DEFERRED(SQLite 默认)、BEGIN IMMEDIATEBEGIN EXCLUSIVE——获取写锁的时机不同。sqlx 默认发裸 BEGIN(等同 DEFERRED、见 sqlx-core/src/transaction.rs:278)——第一条 SELECT 拿 shared lock、第一条 UPDATE/INSERT/DELETE 才升级为 reserved lock。如果业务在事务里先读后写且多个连接并发、可能遇到"lock upgrade 死锁"——这时应该用 conn.begin_with("BEGIN IMMEDIATE") 显式指定。
  • 单写者:SQLite 一个数据库文件只能有一个 writing transaction——其他 writer 要等。Pool 的 max_connections 不能解决这个并发限制。

这些差异在业务代码几乎看不见——sqlx 的 Transaction API 屏蔽了大部分方言。但生产事故时需要知道这些——比如"为什么 MySQL 里 DDL 跑完我的事务没了"就是差异 1 导致。

15.12 实战模式

几个常见的事务实战模式:

15.12.1 金钱转账

rust
pub async fn transfer(pool: &PgPool, from: i32, to: i32, amount: i64) -> Result<(), Error> {
    let mut tx = pool.begin().await?;

    // 扣款(带余额校验)
    let affected = sqlx::query(
        "UPDATE accounts SET balance = balance - $1 WHERE id = $2 AND balance >= $1"
    ).bind(amount).bind(from).execute(&mut *tx).await?.rows_affected();

    if affected != 1 {
        return Err(Error::InsufficientFunds);
        // drop tx → start_rollback 排队
    }

    // 入账
    sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
        .bind(amount).bind(to).execute(&mut *tx).await?;

    // 审计记录
    sqlx::query("INSERT INTO audit_log (from_id, to_id, amount) VALUES ($1, $2, $3)")
        .bind(from).bind(to).bind(amount).execute(&mut *tx).await?;

    tx.commit().await?;  // 显式 commit
    Ok(())
}

三条 SQL 原子性——任何一条失败(扣款校验不过、网络断),整个回滚。

15.12.2 带 SAVEPOINT 的错误恢复

rust
pub async fn bulk_import(pool: &PgPool, rows: Vec<Row>) -> Result<u64, Error> {
    let mut tx = pool.begin().await?;
    let mut inserted = 0u64;

    for row in rows {
        // 每行一个 savepoint
        let mut sp = (&mut *tx).begin().await?;

        let result = sqlx::query("INSERT INTO target ...").bind(...).execute(&mut *sp).await;

        match result {
            Ok(_) => {
                sp.commit().await?;  // RELEASE SAVEPOINT
                inserted += 1;
            }
            Err(_) => {
                sp.rollback().await?;  // ROLLBACK TO SAVEPOINT
                // 继续下一行
            }
        }
    }

    tx.commit().await?;
    Ok(inserted)
}

逐行 savepoint让 "一行失败不影响其他行" 成为可能——批量导入的常见模式。

15.12.3 重试(乐观并发)

rust
pub async fn update_with_retry(pool: &PgPool, id: i32, f: impl Fn(&Entity) -> Entity) -> Result<(), Error> {
    for attempt in 0..3 {
        let mut tx = pool.begin().await?;
        let entity: Entity = sqlx::query_as("SELECT ... WHERE id = $1 FOR UPDATE")
            .bind(id).fetch_one(&mut *tx).await?;

        let updated = f(&entity);

        let res = sqlx::query("UPDATE ... WHERE id = $1 AND version = $2")
            .bind(id).bind(entity.version)
            .execute(&mut *tx).await?;

        if res.rows_affected() == 1 {
            tx.commit().await?;
            return Ok(());
        } else {
            tx.rollback().await?;
            // 版本冲突,重试
        }
    }
    Err(Error::ConflictRetryExceeded)
}

乐观并发 + 版本冲突重试——显式 rollback + loop retry。用 Transaction 保证每次尝试隔离。

15.12.1 事务状态机可视化

把 Transaction 对象的完整生命周期画出来:

四条主要路径:

  1. Running → Committed:正常 commit 关闭。
  2. Running → RolledBack:显式 rollback 关闭。
  3. Running → DropRollback → Closed:没显式关就 drop——start_rollback 排队、等下次 I/O 发送。
  4. Closed 不是完整状态——严格说是"已排队 rollback 但 server 可能还没收到"的临界态。

三条正常路径(1/2/3)都最终到 [*]——但第三条的"到 [*]"延迟不定、server 侧最终一致。生产代码应当只走 1 或 2 路径

15.12.2 Transaction 的三种典型错误场景

场景 1:commit 失败

rust
let mut tx = pool.begin().await?;
sqlx::query(...).execute(&mut *tx).await?;
tx.commit().await?;  // ← 这里可能失败

commit 失败的原因:

  • 网络断(ACK 没回来)——客户端不知道事务是否提交成功
  • 服务端 COMMIT 被 revoke(极罕见)——事务没提交。
  • 并发冲突(SERIALIZABLE 隔离级别下)——事务没提交。

错误处理tx.commit().await? 返回 Err——tx 已被 commit 消费、但 open 还是 true(commit 没成功设 false)——Drop 会触发 start_rollback——如果事务实际已经在 server 端 commit,这次 rollback 是 no-op(server 拒绝)

业务含义:commit 返回 Err 时你不能假设事务已生效——可能成功也可能失败。幂等设计是唯一安全做法——重试时先 SELECT 验证状态。

场景 2:begin 失败

rust
let mut tx = pool.begin().await?;  // ← 这里失败

begin 失败的原因:

  • pool 满(PoolTimedOut)。
  • 连接建立失败。
  • BEGIN SQL 被 server 拒绝(罕见)。

错误处理:tx 根本没构造出——无事务状态泄漏。

场景 3:query 失败

rust
let mut tx = pool.begin().await?;
sqlx::query(...).execute(&mut *tx).await?;  // ← 这里失败
sqlx::query(...).execute(&mut *tx).await?;  // 不到这
tx.commit().await?;                          // 不到这

? 冒泡错误——tx 离开作用域 drop——start_rollback 排队 ROLLBACK。

Postgres 特殊:第一条 query 失败后事务进入 "aborted" 状态、后续 query 全拒绝 current transaction is abortedrollback 是唯一出路——drop 的 start_rollback 正好是期望行为。

三种错误场景的共性始终显式 ? 传播错误、让 Rust 的 Drop 机制处理未完成的事务。不要自己 try/catch + 手动 rollback——Rust 的 RAII 已经帮你做了。

15.13 本章小结

本章拆开 sqlx Transaction 的每一处设计:

  1. Rust async Drop 的根本限制(§15.1)—— Drop 同步、事务回滚需要 async I/O、两者冲突。sqlx 的对策是显式 commit/rollback + start_rollback 同步排队。
  2. TransactionManager trait(§15.2)—— 五个方法 begin/commit/rollback(async)+ start_rollback(sync)+ get_transaction_depth。驱动层抽象。
  3. Transaction 两字段 struct(§15.3)—— connection + open。MaybePoolConnection 统一 Connection-originated 和 Pool-originated。
  4. begin 的 depth-based SQL 选择(§15.4)—— depth = 0 → BEGIN;depth > 0 → SAVEPOINT _sqlx_savepoint_N。Rollback guard 兜底 begin 过程的错误。
  5. commit / rollback async(§15.5)—— 执行 SQL + 更新 depth + 设置 open = false。失败时 open 保持 true 让 Drop 兜底。
  6. start_rollback 同步排队(§15.6)—— queue_simple_query 写 buffer 不发送。真正发送要等下次 I/O。
  7. Drop 的"尽力 rollback"(§15.7)—— 中间路径:不做太 eager(避免 deadlock)、也不完全放弃(保证最终发送)。
  8. SAVEPOINT 嵌套(§15.8)—— 连续数字命名、commit/rollback 按 depth 反推 savepoint 名。
  9. DerefMut 到 Connection(§15.9)—— &mut *tx 变成 &mut Connection、满足 Executor bound。
  10. Connection::transaction 闭包(§15.10)—— 显式 commit/rollback 包装、错误路径显式 rollback、比手写 begin 更安全。
  11. 跨 DB 差异(§15.11)—— Postgres 的 aborted 状态、MySQL 的 DDL 隐式提交、SQLite 的单写者。
  12. 三个实战模式(§15.12)—— 转账、带 savepoint 的批量导入、乐观并发重试。

第四部分"连接与事务"到此结束。下一章进入第五部分"驱动实现"——Postgres 驱动的协议栈、从 Parse/Bind/Execute 的 Extended Query 到 pipelining 的完整实现。

15.14 Transaction 的三种"不要做"

生产里关于 Transaction 使用的三条反模式,值得单独列出:

不要做 1:在事务里调 tokio::spawn

rust
let mut tx = pool.begin().await?;
tokio::spawn(async move {
    sqlx::query("...").execute(&mut *tx).await;  // 编译错:tx 不 Send?
});

编译层面可能因为 Connection-originated Transaction 持有 &mut conn——不能跨 spawn。即便是 Pool-originated('static)也不推荐 spawn 里用——事务 commit/rollback 必须和 begin 在同一任务、否则语义混乱。

修复:所有事务操作在一个 task 内完成。

不要做 2:让事务持续数秒或更久

rust
let mut tx = pool.begin().await?;
long_external_api_call().await?;  // 几秒 HTTP 请求
sqlx::query(...).execute(&mut *tx).await?;
tx.commit().await?;

事务持有时间长 = 持锁时间长 = 其他事务被阻塞更久 = scalability 瓶颈。Postgres 的 row lock 在事务中一直持有。

修复:事务只包围必要的原子操作;外部 API 调用在事务外做。

不要做 3:假设 Drop 会立即 rollback

rust
if condition {
    // 想"丢弃这个事务,不 rollback"——drop 让它 rollback 就够了
} else {
    tx.commit().await?;
}

错——drop 会 start_rollback、排队 ROLLBACK SQL——不是"什么都不做"。如果你真的想"不操作、让 server timeout 清理"——也做不到(start_rollback 已经执行)。

修复:想明确 rollback 就调 tx.rollback().await?;不想 rollback 就 commit 或保持路径一致。Drop 是兜底、不是你该依赖的"行为"。

15.15 Transaction API 的设计哲学

读完本章、回看整个 Transaction 设计——能看出 sqlx 团队的几条核心取舍:

1. 类型系统优于运行时检查。Transaction 吃 self commit/rollback——一次用完就消费掉、不能重复 commit。open 字段只在内部维护——用户看不到。用 Rust 类型一次性表达"事务是线性资源"。

2. 让异常路径尽可能正确。Rollback guard(begin 过程)、start_rollback(Drop)、open 字段(防重复)——全部都是"异常时也能兜底"的设计。代码里看不到一个"try/catch"——完全靠 RAII 和 flags。

3. 诚实暴露 Rust 限制。sqlx 没假装能同步 rollback、也没试图"偷偷跨线程 spawn rollback task"——直接文档写"drop 的 rollback 是尽力的"。这种诚实比"假装完美"换来更多用户信任。

4. 跨 DB 差异压到最低。同一套 Transaction 对外 API——底下 Postgres / MySQL / SQLite 的具体 SQL 和行为不同——但用户代码几乎不需要改。差异只在业务要用 DB 方言特性(如 ISOLATION LEVEL)时才显露。

5. RAII 自动化 + 显式调用鼓励。Drop 保底 + transaction() 闭包促显式。用户按场景选——简单场景让 RAII 管、需要精细控制时手写 begin + explicit commit。

这五条哲学共同构成 sqlx Transaction 的精神——"类型驱动安全 + RAII 保底 + 诚实文档 + 一套 API 跨 DB + 灵活性保留"。这是一份好的Rust 库设计模板——你设计自己的资源管理 API 时都可以套用。

15.16 第四部分回顾

本书第四部分"连接与事务"到此结束——四章 250+ 页内容讲完了 sqlx 最核心的资源管理抽象:

  • 第 12 章 Connection——单条连接的生命周期、协议、礼仪。
  • 第 13 章 Pool 外部 API——用户如何配置和使用 Pool。
  • 第 14 章 Pool 内部实现——idle queue、semaphore、maintenance task。
  • 第 15 章 Transaction(本章)——事务的 RAII guard、SAVEPOINT、尽力 rollback。

四章合起来构成了 sqlx 的"资源管理栈"——从最底层的 Connection 到最上层的 Transaction、每一层都有清晰职责、层层叠加。

读懂这部分的工程师有能力运维任何 sqlx 生产部署——Pool 配置、连接泄漏排查、事务边界设计、故障诊断都在掌握范围内。这也是本书最"实用派"的部分——第 1-11 章讲"sqlx 如何工作"、第 12-15 章讲"sqlx 在生产怎么用"。

下一部分进入驱动实现——Postgres / MySQL / SQLite / Any 四章。这部分偏"协议级深入"——读者可以按兴趣选。日常业务用 sqlx 不需要读;但想贡献 sqlx 代码、写自定义驱动、深度优化性能——这部分必读。

15.17 Transaction 常见问题 FAQ

读完本章,读者可能常有的几个问题:

Q1:嵌套事务(SAVEPOINT)在生产里真的用得着吗?

用得着,但场景较窄。典型用例:

  • 批量导入——每行一个 savepoint 隔离失败(§15.12.2)。
  • 子流程——某个独立操作可能失败但不影响主事务。
  • ORM 框架内部——sea-orm 等 ORM 用嵌套 savepoint 实现 per-method 的事务边界。

大多数业务代码不用嵌套——一个顶层事务包围所有操作就够。

Q2:事务里可以 await 其他非数据库操作吗?

可以但不建议

rust
let mut tx = pool.begin().await?;
sqlx::query(...).execute(&mut *tx).await?;
some_http_call().await?;  // 危险
sqlx::query(...).execute(&mut *tx).await?;
tx.commit().await?;

http_call 可能慢几秒——在这段时间里事务持有锁占 Pool 连接占 server 资源。高并发场景会让 Pool 饱和、其他事务等死。

规范:事务里只做数据库操作;外部调用在事务外。

Q3:query_as::<User>(...).fetch_all(&mut *tx).fetch_all(&pool) 行为有什么不同?

  • &mut *tx:走事务连接、看到事务内未提交的改动、读一致性受隔离级别控制。
  • &pool:借一个独立连接、看不到你自己事务里未提交的改动、和事务隔离。

在事务里想查"自己刚改的数据"——必须用 &mut *tx

Q4:为什么 Transaction 不直接用 Drop::drop 等待 rollback 完成?

Rust Drop::drop 是同步、不能 .await。解决方案只有两个:

  • spawn task——但 task 可能跑不完 Drop 就返回(主 runtime 可能 shutdown)。
  • block_on——可能死锁(当前 runtime 就 drop 时的那个)。

两者都不完美。sqlx 选了"start_rollback 同步排队 + 等下次 I/O 触发发送"——现实可行的最佳策略。

Q5:我的代码里 commit 返回错误是不是应该重试?

看情况。

  • 网络错:可以重试、但先查 DB 状态确认事务到底提交了没——commit 错误不代表事务失败、可能 ACK 丢了。
  • SERIALIZABLE 冲突:重试是正常流程(§15.12.3 的乐观并发)。
  • 逻辑错:不要重试、修代码。

规则:重试必须配合幂等设计——单次操作的重复执行不改变结果。

这五个 FAQ 覆盖生产里最常碰到的 Transaction 疑问——收藏本章作为事务使用的参考。

15.18 Transaction 源码阅读推荐

如果你想精读 sqlx Transaction 的源码,推荐顺序:

  1. sqlx-core/src/transaction.rs——core trait + Transaction struct + 3 条 ANSI SQL helper(300 行)。本章主要源码。
  2. sqlx-postgres/src/transaction.rs——PgTransactionManager 具体实现(110 行)。
  3. sqlx-mysql/src/transaction.rs——MySqlTransactionManager(类似 PgTransactionManager)。
  4. sqlx-sqlite/src/transaction.rs——SQLite 版本、通过 worker command 发 BEGIN(同样默认 DEFERRED 语义)。
  5. sqlx-core/src/pool/maybe.rs——MaybePoolConnection 枚举(20 行)——Transaction 用来统一 Connection/PoolConnection 来源。

总共 500-700 行——一个熟悉 Rust async 的工程师 1-2 小时能通读。读完后你对"Rust 里实现 RAII 风格事务"这件事有完整理解——在你自己的项目里需要实现类似"作用域式资源释放"时就知道怎么做。

特别留意

  • transaction.rs:260-275 的 Drop 实现和注释——解释了"尽力 rollback"的哲学。
  • PgTransactionManager::begin 里的 Rollback guard——部分成功的错误路径兜底。
  • 三条 ANSI SQL helper 的 Cow<'static, str> 返回类型——零拷贝常见情况 + 按需分配动态情况。

这些细节不读代码看不出来、读一遍让你对 Rust 工艺水平有更深的欣赏。

15.19 Transaction 给 Rust 设计者的通用启示

跳出 sqlx 看 Transaction 的设计,有四条可迁移的启示:

1. 线性资源用 self 消费确保 "用一次"。commit / rollback 吃 self——事务对象被消费后类型系统不允许再用。这在 Rust 里比手动标志 + 运行时检查优雅太多。任何"只能做一次的操作"都值得用 self 表达。

2. Drop 用于保底、不用于正常路径。Transaction::Drop 是"万一用户忘了调 commit/rollback"的兜底——不是正常关闭流程。正常流程显式 commit/rollback。这条"Drop 仅兜底"的哲学让 API 边界清晰——用户知道什么该自己做、什么让 drop 管。

3. 跨 trait 的能力借用用 DerefMut。Transaction DerefMut 到 Connection——让 &mut *tx 变成 &mut Connection——不需要手写 impl Executor for Transaction。这让 "这个类型虽然不是 Connection 但在某些语境下可以当 Connection 用" 的需求零成本满足。

4. 诚实承认语言限制、写清楚文档。"Drop can't be async" 的限制不试图隐藏——注释、文档、tooling 都明确告诉用户。这比"假装没问题"换来更少的 bug 和生产事故——用户知道限制就不会踩坑。

这四条原则放在任何涉及资源管理 + 异步 + 错误处理的 Rust 项目里都适用——sqlx Transaction 是它们的一份精确展示。

15.20 从 Transaction 到驱动实现:中间观察

第 15 章是第四部分的收尾、也是第五部分(驱动实现)的桥梁

本章讨论的 TransactionManager trait 和 ANSI SQL helper——是 driver-independent 的接口抽象;真正的 driver-specific 实现在每个驱动 crate 里(PgTransactionManager / MySqlTransactionManager / SqliteTransactionManager)。

从这一步开始往下,我们要讨论的是 "具体 DB 协议"——怎么发 Parse + Bind + Execute 消息(Postgres)、怎么处理 COM_STMT_EXECUTE 的 null_bitmap(MySQL)、怎么把同步 C API 包装成 async(SQLite)。

这条"从抽象到具体"的路径让 sqlx 的学习曲线分阶段——初学者读第 1-11 章就能用好 sqlx;读到第 12-15 章能在生产环境稳定部署;读到第 16-19 章(驱动实现)能贡献代码或写自定义驱动。你不需要一次读完所有章节——按自己阶段取用。

15.21 第四部分的一个核心教训

第 12-15 章贯穿一条教训:Rust async 在"资源管理"层面和同步 Rust 有本质差异——Drop 不能 async、生命周期更长、spawn task 有 capture 限制。sqlx 的解法可以总结成三件事

  1. 显式 commit/rollback/close 是 async——保证协议级正确性。
  2. Drop 走 start_rollback / close_hard 同步排队——最佳 effort 兜底。
  3. 文档和 API 设计持续提醒用户"显式"——避免用户依赖不完善的自动路径。

这三件事在 Pool(第 12-14 章)和 Transaction(本章)里都有体现——是同一套设计哲学的不同展开。读懂这条哲学、你在自己 Rust 项目里处理类似问题时有明确的参照。

第 16 章开始,我们走出资源管理的讨论、进入协议栈的深水区——Postgres 驱动的 Extended Query 协议是 Rust 里最硬核的 async 状态机之一。

15.22 Transaction 代码品质观察

最后一点对 sqlx transaction 源码的品质观察:

1. 整个 transaction.rs 只有 302 行——包括文档注释和一大段被注释掉的 Executor impl(§第 4 章 §4.6)。去掉注释后业务代码只有 100-150 行。如此少的代码承担了事务这么核心的功能——体现了抽象的力量。

2. 核心方法(commit/rollback)只有 5-6 行——调驱动 + 设 open = false。所有复杂度都推到 TransactionManager trait 的实现方——core crate 保持薄。

3. 三个 SQL helper 是 pub fn——begin_ansi_transaction_sql 等可以被驱动以外的代码调。这让未来可能的第三方驱动(ClickHouse 等)可以复用 ANSI SQL 生成,不用自己重写。

4. 注释里承认限制——start_rollback 的 "depends on the database" / Drop 注释的 "next asynchronous invocation" ——不美化、不含糊。这种文档品质让用户建立准确预期、不踩坑。

5. #[doc(hidden)] 用得克制——只 TransactionManager trait 和 Transaction::begin 标 hidden。其他公共 API 都给用户看。这让 sqlx 的文档站既不暴露内部细节、又不藏掉有用信息

这五点观察是代码审美层面的——读 sqlx 源码你能感受到作者对"简洁"的追求。每次有"这段能不能更短"的疑问,sqlx 通常已经想过了——减无可减的状态下才是现在的形态。

15.23 本章的最终消化

读完第 15 章,你应该能答下面这些问题:

  1. 为什么 Transaction::commit 吃 self?——线性资源、一次性消费、类型系统保证不重复 commit。
  2. Drop 里做什么?——start_rollback 排队 ROLLBACK SQL 到 buffer。
  3. "尽力 rollback" 意味着什么?——Drop 不保证消息立即发送、只保证最终发送(下次连接 I/O)。
  4. SAVEPOINT 嵌套的 SQL 是什么?——SAVEPOINT _sqlx_savepoint_NRELEASE SAVEPOINT ...ROLLBACK TO SAVEPOINT ...
  5. 为什么 &mut Transaction 不直接是 Executor?——lazy normalization 限制(§第 4 章 §4.6)——通过 DerefMut 让 &mut *tx 成为 &mut Connection
  6. 生产代码应该依赖 Drop 自动 rollback 吗?——不应该、永远显式 commit/rollback。
  7. commit 返回 Err 应该假设事务没提交吗?——不能假设、可能提交了只是 ACK 丢失——需要幂等设计。

能答出这七题说明你对 Transaction 理解到位了——第四部分的学习成果到此兑现。

15.24 事务性能数据

用粗略数据给事务相关操作一个量级估计:

操作本地 Postgres跨机房 Postgres (10ms RTT)
pool.begin().await1-3ms15-25ms
&mut *tx 一次 query.execute()1-3ms10-15ms
tx.commit().await1-3ms10-15ms
Drop 的 start_rollback< 1μs (同步排队)同左
嵌套 savepoint begin1-3ms10-15ms

观察:事务的每个操作都是一次 DB 往返——跨机房累加很快。一个"begin + 3 条 query + commit"事务在跨机房可能50-70ms——相当明显。

优化方向

  1. 减少事务内 query 数量——把多次查合并成一条 SQL(JOIN / CTE / RETURNING)。
  2. batch 操作——一条 INSERT 带 1000 行胜过 1000 次 INSERT。
  3. 保持事务短——事务外的只读查询分离出去。
  4. sqlx::raw_sql 批量执行 migration 脚本——一条 simple query 一次性发。

这些优化都是"减少 round-trip"——对事务性能影响最大的因素。

15.24a 三条 ANSI SQL 生成函数

sqlx 的 SAVEPOINT 嵌套靠三个小函数生成 SQL(sqlx-core/src/transaction.rs:277-302):

rust
pub fn begin_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
    if depth == 0 {
        Cow::Borrowed("BEGIN")
    } else {
        Cow::Owned(format!("SAVEPOINT _sqlx_savepoint_{depth}"))
    }
}

pub fn commit_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
    if depth == 1 {
        Cow::Borrowed("COMMIT")
    } else {
        Cow::Owned(format!("RELEASE SAVEPOINT _sqlx_savepoint_{}", depth - 1))
    }
}

pub fn rollback_ansi_transaction_sql(depth: usize) -> Cow<'static, str> {
    if depth == 1 {
        Cow::Borrowed("ROLLBACK")
    } else {
        Cow::Owned(format!(
            "ROLLBACK TO SAVEPOINT _sqlx_savepoint_{}",
            depth - 1
        ))
    }
}

三个函数一眼看完嵌套语义

  • depth=0 → BEGIN(真事务启动)。
  • depth≥1 → SAVEPOINT _sqlx_savepoint_<depth>(嵌套点)。
  • commit depth=1 → COMMIT;commit depth>1 → RELEASE SAVEPOINT ...
  • rollback depth=1 → ROLLBACK;rollback depth>1 → ROLLBACK TO SAVEPOINT ...

SAVEPOINT 命名前缀 _sqlx_savepoint_—— 和用户 SAVEPOINT 永远不冲突(用户一般不用下划线开头)—— 和 _sqlx_migrations 表的命名策略一致——下划线前缀 = sqlx 内部空间

depth 计数由 TransactionManager 维护——每家驱动实现 begin/commit/rollback 时维护自己的 depth:Postgres 用 PgConnection::transaction_depth、MySQL 用 MySqlConnection::transaction_depth、SQLite 用 WorkerSharedState::transaction_depth同一套 ANSI SQL 生成函数 + 三种 depth 存储—— 共享代码最大化。

Cow<'static, str> 的选择—— depth=0/1 的常见路径返回 Borrowed("BEGIN") / Borrowed("COMMIT")—— 零分配;嵌套路径才分配 owned String—— 常见路径 fast、边界路径慢一点但仍正确。

这种"常见路径零开销、少见路径接受开销"是 sqlx 全书反复出现的设计风格。

15.24b Transaction::drop 的 start_rollback 含义

源码sqlx-core/src/transaction.rs:260-275):

rust
impl<'c, DB> Drop for Transaction<'c, DB>
where DB: Database,
{
    fn drop(&mut self) {
        if self.open {
            // what this does depends on the database but generally this means we queue a rollback
            // operation that will happen on the next asynchronous invocation of the underlying
            // connection (including if the connection is returned to a pool)
            DB::TransactionManager::start_rollback(&mut self.connection);
        }
    }
}

关键词 start_rollback(注意是 start)——不是 "执行 rollback"—— 是 "排队一条 rollback"。因为 Drop 不能 async——不能直接发 ROLLBACK 并等响应——只能标记这个连接"下次用的时候先跑 ROLLBACK"。

每家驱动实现 start_rollback 不同:

  • Postgres—— 设 PgConnection::pending_rollback = true—— 下次 run 前补一个 ROLLBACK。
  • MySQL—— 类似机制、设 flag。
  • SQLite—— worker 线程收到下一个 command 前先 rollback。

self.open: bool—— 标志位—— commit/rollback 显式调用时置 false—— Drop 看到 false 就什么都不做。

结果—— 用户忘记显式 commit/rollback、业务代码早退、Drop 自动触发"下次 rollback"—— 不丢数据、不弄脏连接—— Rust 的 RAII 把 "C++ 程序员容易忘的事" 变成 "编译器强制的事"。

这段注释是全 sqlx 最重要的注释之一——明确告诉读者 Drop 的语义和限制—— "rollback 会在下次用这条连接时发生"—— 用户不需要 assume Drop 立刻 rollback。

补一句对比—— Java JDBC 的 Connection.close() 默认 commit 未提交事务——很多历史 bug 源于此(崩溃时意外 commit 了半截改动)。sqlx 的 "Drop 触发下次 rollback" 反其道而行—— 默认 rollback 比默认 commit 安全得多——这是 Rust async 库学到 Java 教训后做出的正确默认。

15.25 实战示例:带 retry 的幂等转账

把本章所有内容用一个最完整的例子收尾——一个生产级的带重试的转账函数:

rust
use sqlx::{PgPool, Error};
use std::time::Duration;

pub async fn transfer_with_retry(
    pool: &PgPool,
    from: i32,
    to: i32,
    amount: i64,
    idempotency_key: &str,
) -> Result<(), TransferError> {
    const MAX_ATTEMPTS: u32 = 3;

    for attempt in 0..MAX_ATTEMPTS {
        match transfer_once(pool, from, to, amount, idempotency_key).await {
            Ok(()) => return Ok(()),
            Err(TransferError::Conflict) if attempt < MAX_ATTEMPTS - 1 => {
                // 乐观并发冲突,重试
                let backoff = Duration::from_millis(10 * 2u64.pow(attempt));
                tokio::time::sleep(backoff).await;
            }
            Err(TransferError::MaybeCommitted) if attempt < MAX_ATTEMPTS - 1 => {
                // commit 失败但可能已生效——幂等 key 让重试安全
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
            Err(e) => return Err(e),
        }
    }
    Err(TransferError::RetriesExceeded)
}

async fn transfer_once(
    pool: &PgPool,
    from: i32, to: i32, amount: i64,
    idempotency_key: &str,
) -> Result<(), TransferError> {
    let mut tx = pool.begin().await?;

    // 幂等 key 防重复
    let existing: Option<i64> = sqlx::query_scalar(
        "SELECT amount FROM transfers WHERE idempotency_key = $1"
    ).bind(idempotency_key).fetch_optional(&mut *tx).await?;

    if existing.is_some() {
        return Ok(());  // 已经转过了
    }

    // 扣款 + 乐观锁(version 列)
    let affected = sqlx::query(
        "UPDATE accounts SET balance = balance - $1, version = version + 1
         WHERE id = $2 AND balance >= $1"
    ).bind(amount).bind(from).execute(&mut *tx).await?.rows_affected();

    if affected != 1 { return Err(TransferError::InsufficientFunds); }

    // 入账
    sqlx::query("UPDATE accounts SET balance = balance + $1, version = version + 1 WHERE id = $2")
        .bind(amount).bind(to).execute(&mut *tx).await?;

    // 记录
    sqlx::query("INSERT INTO transfers (idempotency_key, from_id, to_id, amount) VALUES ($1, $2, $3, $4)")
        .bind(idempotency_key).bind(from).bind(to).bind(amount)
        .execute(&mut *tx).await?;

    // commit
    match tx.commit().await {
        Ok(()) => Ok(()),
        Err(e) if e.is_connection_error() => Err(TransferError::MaybeCommitted),
        Err(e) => Err(TransferError::Other(e)),
    }
}

这个例子整合了本章所有要点

  • 显式 begin + commit——不依赖 Drop。
  • 幂等 key——commit 错误后重试不会重复扣款。
  • 乐观锁(version 列)——并发冲突返回 affected != 1。
  • 重试逻辑——指数退避 + 有限次数。
  • commit 失败分类——连接错误 vs 逻辑错误——区分重试与否。

这段 60 行代码整合了本章所有要点——显式 begin+commit、幂等 key、乐观锁、指数退避重试、commit 失败按连接错/逻辑错分类——是生产级事务代码的最小完整样本

第 16 章开始进入驱动层——Postgres Extended Query 协议的 Rust async 实现。

基于 VitePress 构建