Skip to content

第7章 current_thread runtime 与 LocalSet

"When you only have one thread, all problems look like scheduling—and fewer of them are problems." —— 笔者

本章要点

  • current_thread runtime 只跑在一个线程上——调用 block_on 的那个线程同时扮演 worker 和主驱动者。没有 worker thread spawn、没有 work-stealing、没有 LIFO slot
  • Core 结构简化成一个 VecDeque<Notified> 队列 + 一个 tick 计数 + 一个 Driver——总共 6 个字段,比 multi_thread 的 Core 少 5 个
  • block_on 用 AtomicCell<Core> 实现所有权传递:多个线程可以同时调 block_on,但只有一个能拿到 Core;其他线程 park 等待
  • 主循环结构和 multi_thread 骨架一致——tick → poll 顶层 future → next_task → 运行 → park,但去掉了 steal_work 和 idle protocol
  • LocalSet 不是一个独立 runtime——它是一个可以放进任何 runtime 的 "!Send Future 容器"。你可以在 #[tokio::main] 多线程 runtime 里用 LocalSet::new().run_until(...) 跑 !Send 代码
  • spawn_local 依赖thread-local 里保存的当前 LocalSet 引用——调用时不在 LocalSet 的 run_until / block_on 里会 panic
  • LocalSet 实现了 Future——它的 poll 推进内部所有 !Send Task 一次、然后根据是否有剩余 Task 返回 Pending 或 Ready

7.1 为什么要有 current_thread runtime

到这里你可能会问:既然 multi_thread 这么厉害——work-stealing、LIFO slot、百万 QPS——为什么还要搞一个单线程 runtime?

答案是不是每个场景都需要、也不是每个场景都允许多线程。至少有四类真实场景单线程 runtime 是最优解、甚至是唯一解

场景一:GUI 主线程

几乎所有 GUI 框架(iced、egui、GTK、Windows WPF、Cocoa)都要求所有 UI 操作在 UI 线程上执行——这是操作系统层面的硬约束。如果你在 worker thread 上更新 UI,轻则奇怪的 bug,重则崩溃。

如果 GUI 应用还想跑异步(比如后台 HTTP 请求),最自然的做法是:UI 线程运行 current_thread runtime,async fn 里的 .await 让出给 UI 事件循环。icedegui-async 等框架内部都这么干。

场景二:CLI 工具和简单脚本

cargo xx 类工具、一次性数据处理脚本、小型 CLI——它们跑一个任务就退出。开 4-8 个 worker 线程完全是浪费,光 spawn 线程的开销就几百微秒,而整个脚本可能只跑 50 毫秒。

current_thread 启动比 multi_thread 快 10-100 倍——没有线程 spawn、没有 worker metrics、没有 inject queue。对于"主函数跑几下就 exit"的场景明显更合适

场景三:嵌入已有事件循环

Electron、Flutter、游戏引擎、数据库服务器——这些系统自己有复杂的主事件循环,不允许 Tokio 另外起 worker thread(会和主循环抢 GIL / mutex / 资源)。

这种情况下把 Tokio 作为"可驱动的子引擎":用 current_thread runtime 的 block_on 把 async 代码从主循环里跑起来、跑完返回主循环deno 早期版本、wry(一个跨平台 WebView 库)都走这条路。

场景四:需要访问 !Send 类型

有些类型不能跨线程——RcRefCell、一些 GUI 句柄、未同步的 FFI 对象。multi_thread runtime 要求 spawn 的 Future 是 Send + 'static,上面这些类型直接卡住。

current_thread 不要求 Send——因为根本不会跨线程移动 Future。LocalSet(本章后半讲)把这个能力进一步带到多线程 runtime。

四种场景的统一底色

四种看似不同的场景,底层都是一个理由:"多线程在这里不是优势,是负担"

  • GUI 场景:多线程不被允许
  • CLI 场景:多线程没有收益
  • 嵌入式场景:多线程破坏架构
  • !Send 场景:多线程不可行

Tokio 认识到这一点并专门提供单线程运行时——这是对工程现实的尊重。对比一些运行时"只支持多线程、嵌入式 /!Send 不管"的做法,Tokio 的多样性承诺了它"几乎在任何场景都可用"。


7.1½ #[tokio::main(flavor = "current_thread")] 展开出了什么

把第 4 章讲的 #[tokio::main] 宏展开放到当前上下文里再看一次。加 flavor = "current_thread" 参数后,展开结果是:

rust
// #[tokio::main(flavor = "current_thread")] 展开
fn main() {
    let body = async { /* 你的 async 代码 */ };

    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("Failed building the Runtime")
        .block_on(body)
}

和多线程版的唯一区别是 new_current_thread() 代替了 new_multi_thread()。这一个改动会让第 4 章图里所有的 scheduler 分叉走向 current_thread 一支——用第 7.2 节的结构代替第 5 章的结构。

实际工程经验:在一个典型的 Rust 项目里,测试代码用 #[tokio::test](默认 current_thread)、服务入口用 #[tokio::main](默认 multi_thread)。这个约定让单元测试快速、让生产服务利用多核——一个项目里两种 runtime 并存是常态。

如果你的单元测试代码里有 spawn_blockingblock_in_place 或者手动起 worker 线程的逻辑,记得给测试加 #[tokio::test(flavor = "multi_thread")]——否则在 current_thread 下会 panic。这是 4.10½ 节埋下的伏笔、这里正式点破。


7.2 CurrentThread / Core / Handle / Shared 的四件套

打开 tokio/src/runtime/scheduler/current_thread/mod.rs,四个核心 struct。原样贴出:

rust
// 来源:tokio-rs/tokio · tokio/src/runtime/scheduler/current_thread/mod.rs (tokio-1.40.0)

pub(crate) struct CurrentThread {
    /// Core scheduler data is acquired by a thread entering `block_on`.
    core: AtomicCell<Core>,

    /// Notifier for waking up other threads to steal the driver.
    notify: Notify,
}

struct Core {
    tasks: VecDeque<Notified>,
    tick: u32,
    driver: Option<Driver>,
    metrics: MetricsBatch,
    global_queue_interval: u32,
    unhandled_panic: bool,
}

pub(crate) struct Handle {
    shared: Shared,
    pub(crate) driver: driver::Handle,
    pub(crate) blocking_spawner: blocking::Spawner,
    pub(crate) seed_generator: RngSeedGenerator,
    pub(crate) task_hooks: TaskHooks,
}

struct Shared {
    inject: Inject<Arc<Handle>>,
    owned: OwnedTasks<Arc<Handle>>,
    woken: AtomicBool,
    config: Config,
    scheduler_metrics: SchedulerMetrics,
    worker_metrics: WorkerMetrics,
}

对比 multi_thread 的相同结构(第 5 章):

字段multi_threadcurrent_thread
任务队列queue::Local<...> 256 容量环形 + LIFO slotVecDeque<Notified> 动态增长
Worker 线程数N = CPU 核数1(就是 block_on 调用者)
工作窃取✅ 有 remotes + Steal❌ 无
LIFO slot✅ 有❌ 无
Idle 协议✅ 复杂的 searching counter❌ 简单的 notify: Notify
全局队列✅ inject✅ inject(但作用窄)
OwnedTasks✅ 有✅ 有

current_thread 版的 Core 少了三样:没有 run_queue 的环形缓冲结构(用普通 VecDeque 就够)、没有 LIFO slot(单线程不需要)、没有 is_searching(单线程永远不需要从别处偷)。

AtomicCell<Core>:核心的所有权传递

注意 CurrentThread 里 core 是 AtomicCell<Core>——这和 multi_thread 的 Worker 里的 AtomicCell<Core> 一样,但用途微妙不同。

multi_thread 里 AtomicCell 的用途是让 shutdown 能"原子地夺走 Core"。current_thread 里的用途更有意思——它允许多个线程同时调 block_on

rust
// 某个线程 A
let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
rt.block_on(async { /* ... */ });

// 另一个线程 B 也可能拿到 rt 的引用(通过 Handle clone)调 block_on
// 虽然罕见,但合法

AtomicCell 保证只有一个线程拿到 Core——其他调用 block_on 的线程会被 park,等 Core 被释放后再去抢。只是一次只有一个能工作,但 API 允许多线程同时调用、不会 panic。


7.3 block_on:等待 Core 可用

看 current_thread 的 block_on 真实代码。原样

rust
// 来源:tokio/src/runtime/scheduler/current_thread/mod.rs
#[track_caller]
pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
    pin!(future);

    crate::runtime::context::enter_runtime(handle, false, |blocking| {
        let handle = handle.as_current_thread();

        loop {
            if let Some(core) = self.take_core(handle) {
                handle.shared.worker_metrics
                    .set_thread_id(thread::current().id());
                return core.block_on(future);
            } else {
                let notified = self.notify.notified();
                pin!(notified);

                if let Some(out) = blocking
                    .block_on(poll_fn(|cx| {
                        if notified.as_mut().poll(cx).is_ready() {
                            return Ready(None);
                        }
                        if let Ready(out) = future.as_mut().poll(cx) {
                            return Ready(Some(out));
                        }
                        Pending
                    }))
                    .expect("Failed to `Enter::block_on`")
                {
                    return out;
                }
            }
        }
    })
}

一眼看不透?拆开来看。这段代码做两件事:

分支 A:拿到 Coreself.take_core(handle) 尝试从 AtomicCell 取 Core。如果成功(当前没有其他线程占着),就调 core.block_on(future)——真正的主循环在这里。

分支 B:没拿到 Core 别的线程正在用 Core。此时当前线程做什么?不闲等——进入一个 fallback 路径:

  • self.notify.notified() 信号(其他线程释放 Core 后会通过 Notify 唤醒)
  • 同时也 poll 当前的 future——如果 future 自己能 Ready 就直接返回(不需要 Core)
  • 两个条件任一满足就退出

为什么要这么复杂? 因为 current_thread runtime 可能被多个线程同时 block_on(一种罕见但合法的用法)——线程 A 正在用 Core、线程 B 又调了 block_on。B 不能直接 panic,也不该死等——所以 B 进入 "并行 poll 自己的 future" 模式,如果自己的 future 恰好不需要 Core 就直接完成。

这是 Tokio 对"多线程调 block_on"的优雅解。当然,大多数用户的代码只在主线程调 block_on 一次,走分支 A。但这个分支 B 的存在让边缘场景也安全,不会产生锁死或 panic。

track_caller:一次良心的 API 设计

注意 #[track_caller]——第 4 章讲过,它让 panic 的行号指向调用方。这个属性在 block_on 的链式调用里救命:如果 block_on 在 runtime 内被嵌套调用会 panic,panic 消息指向你代码的那一行而不是 Tokio 内部。


为什么要 AtomicCell 而不是 Mutex

一个合理的问题:既然 Core 是"谁拿到谁用"的独占资源,为什么不用 Mutex<Core>?这样 lock().unwrap() 就拿到了。

答案是性能和语义的双重考虑

性能上Mutex::lock 涉及阻塞——如果 Mutex 被占用,调用者会进入 OS 级的 wait 队列、触发调度切换。这比 AtomicCell::take(纯原子操作)慢 1-2 个数量级。对于 block_on 这种 hot entry 点,AtomicCell 更合适。

语义上Mutex::lock 的语义是"我会一直等到拿到锁"。但 block_on 的语义是"如果拿不到锁,我可以改做别的事(比如 poll 我自己的 future)"——7.3 节分支 B 那段双轨 poll 就是这个语义。AtomicCell::take 返回 Option<T>——None 时调用者自己决定等待策略,完美符合 block_on 的需求。

第三层AtomicCell 本身比 Mutex 轻量。Mutex 是带 poisoning、带 inner data、几百字节的大家伙;AtomicCell 就一个 atomic pointer,8 字节。对于 current_thread runtime 这种"希望尽量轻量"的定位,选择是明确的。

这个选择反映了 Rust 生态对原子原语的细分Mutex / RwLock / Atomic* / AtomicCell / Cell / RefCell 每种都有明确的定位和适用场景。Tokio 源码是这类分类学的教科书——每个同步原语的使用都恰到好处。

take_coreNotify 的协作

上面的分支 B 里最有意思的是:没拿到 Core 的线程不是死等,而是进入一个"既等 Notify、又 poll future"的双轨 poll。这需要 Notify::notified()——Tokio 提供的一个"一次性通知"原语。

Notify::notified() 返回一个 Notified Future:

  • poll 这个 Future 会把 Waker 注册到 Notify
  • 当别人调 notify.notify_one() 时,Notify 会 wake 这个 Waker
  • 这次 poll 返回 Ready

CurrentThread.notify 在 Core 被释放时触发——具体是 core.block_on 的结尾、drop(core) 时:

rust
// 简化自 core 释放的 drop 逻辑
impl Drop for CoreGuard {
    fn drop(&mut self) {
        let core = self.core.take().unwrap();
        self.scheduler.core.set(core);  // 把 Core 放回 AtomicCell
        self.scheduler.notify.notify_one();  // 通知一个等待的线程
    }
}

双轨 poll 的语义

  • 如果 notified 先 Ready(Core 空闲了),当前线程下一轮循环尝试 take_core
  • 如果 future 先 Ready,直接返回结果——说明 future 不需要 Tokio 运行时也能完成

这个设计的好处block_on 永远不会卡死。即使某个线程拿着 Core 跑了一个慢任务,其他线程的 block_on 也有机会推进自己的 future(如果不需要 runtime 功能)。

但代价:一段时间内可能有多个线程都在 poll——每个线程 poll 自己的 future。这不是理想状态(效率降低),但比死锁好。


7.4 主循环:和 multi_thread 的差异

拿到 Core 后,进入真正的主循环。原样(删除 metrics 样板后):

rust
// 来源:tokio/src/runtime/scheduler/current_thread/mod.rs
fn block_on<F: Future>(self, future: F) -> F::Output {
    let ret = self.enter(|mut core, context| {
        let waker = Handle::waker_ref(&context.handle);
        let mut cx = std::task::Context::from_waker(&waker);

        pin!(future);

        'outer: loop {
            let handle = &context.handle;

            // 1. 检查是否需要 poll 顶层 future
            if handle.reset_woken() {
                let (c, res) = context.enter(core, || {
                    crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
                });
                core = c;
                if let Ready(v) = res {
                    return (core, Some(v));
                }
            }

            // 2. 跑 event_interval 个 Task
            for _ in 0..handle.shared.config.event_interval {
                if core.unhandled_panic { return (core, None); }
                core.tick();
                let entry = core.next_task(handle);
                let task = match entry {
                    Some(entry) => entry,
                    None => {
                        // 没任务了,park
                        core = if !context.defer.is_empty() {
                            context.park_yield(core, handle)
                        } else {
                            context.park(core, handle)
                        };
                        continue 'outer;
                    }
                };
                let task = context.handle.shared.owned.assert_owner(task);
                let (c, ()) = context.run_task(core, || { task.run(); });
                core = c;
            }

            // 3. event_interval 次后强制 yield + 处理 driver 事件
            core = context.park_yield(core, handle);
        }
    });
    ret.unwrap_or_else(|| panic!("..."))
}

三个阶段循环

  1. poll 顶层 future:如果它被 woken 过,poll 一次;Ready 就退出整个 block_on
  2. 跑 Task:最多连跑 event_interval(默认 61)个 Task;队列空了就 park
  3. 强制 yield:每 61 个 Task 后去一次 Driver 查 I/O 事件

和 multi_thread 主循环的对比

multi_thread 版本(第 5 章回顾)的主循环:

  • 三条腿:next_task / steal_work / park
  • 复杂的 transition_to_searching 协议

current_thread 版本:

  • 两条腿:next_task / park(没有 steal)
  • 简单的 reset_woken 检查(单线程不需要 searching 状态机)
  • 顶层 future 是一等公民——循环开头就 poll 它,保证 block_on 的 future 不会饿死在其他 Task 队列后面

这种"顶层 future 优先"的调度是 current_thread 的语义保证:你调 block_on(async { ... }) 时,那段 async 代码永远会被优先推进,哪怕其他 Task 塞满了队列。multi_thread 没这个保证——顶层 future 只是一个普通 Task,按 scheduler 决定。

reset_woken 的意义

这一行:

rust
if handle.reset_woken() {
    // poll 顶层 future
}

Handle.shared 里有一个 woken: AtomicBool。每次有 Task 被 wake(Waker 触发)或顶层 future 被 wake,这个 bool 置 true。reset_woken 原子地读并清零。

意义:只有在"有东西 wake 我"之后才 poll 顶层 future——避免无谓的轮询。每次循环都 poll 顶层 future 会浪费 CPU;只 poll 被 wake 的才高效。


context.defer 的角色

主循环里有一行你可能没注意:

rust
core = if !context.defer.is_empty() {
    context.park_yield(core, handle)
} else {
    context.park(core, handle)
};

context.defer 是什么?它是 current_thread runtime 的**"延迟唤醒队列"——当一个 Task 在 poll 过程中 wake 了自己**(用 cx.waker().wake_by_ref()),这个自 wake 不会立刻把 Task 重新入队,而是放进 defer。然后主循环处理完当前批次后统一处理 defer。

为什么要分开? 因为 Task 自 wake 的常见原因是"让出时间片一次"—— 如果立即入队,这个 Task 会无限重复 poll,永远没机会让其他 Task 跑。分到 defer 保证每轮 event_interval 内每个自 wake 的 Task 只被 poll 一次

multi_thread runtime 也有类似机制——它在 LIFO slot 前后做类似调度平衡。两种 runtime 对"自 wake"都有特殊处理,保证公平性。


7.5 为什么 spawn 到 current_thread 不需要 Send

tokio::runtime::Builder::new_current_thread().build() 得到的 runtime,spawn 的签名和 multi_thread 版完全一样:

rust
// 来源:tokio/src/runtime/handle.rs(同一个 spawn 函数在两种 runtime 下)
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where F: Future + Send + 'static, F::Output: Send + 'static
{ /* ... */ }

注意仍然要 Send + 'static!即使是 current_thread runtime。

为什么? 因为 Handle 可以 clone 到任意线程——一个线程拿到 current_thread runtime 的 Handle,可以从非 runtime 线程调 spawn。这个跨线程传递要求 Future Send。即使实际执行在单线程,"从外部 spawn 进来"这个动作跨了线程

这就是为什么你会看到一个"反直觉"现象:current_thread runtime 也要求 spawn 的 Future 是 Send

那么怎么 spawn 真正 !Send 的 Future? 答案是 LocalSet——下半章的主角。


Send 约束的类型系统层级

再深入一层:要理解为什么 current_thread runtime 也要 Send,你需要明白 Rust 类型系统里 "数据结构所有权" 和 "数据结构使用" 的分离

Handle::spawn(future) 的签名要求 future: F where F: Send——这是调用点的约束。这个约束存在不是因为 future 会真的跨线程移动,而是因为:

  • spawn 会把 future 装进 Task<F, S>
  • Task 被塞进 scheduler 的某个队列
  • scheduler 的队列可能被另一个线程访问(即使 current_thread runtime,Handle 可以跨线程)
  • 因此 F 必须 Send——否则类型系统不允许这种"可能跨线程"的操作

实际执行阶段:current_thread runtime 只会在主线程 poll future——没有真的跨线程。但类型系统不区分"可能" 和 "实际",只按 最严约束

这是 Rust 类型系统的保守性——它宁愿让合法的使用多一些约束,也不放松约束留 UB 空间。对比 C++ 的 "程序员负责"、Go 的"运行时检查"——Rust 把安全推到编译期,代价是类型约束看起来多余。理解这个哲学,你就理解了 Rust 很多看似"啰嗦"的约束为什么存在。

跨线程 spawn 的 Trampoline

理解了上面"spawn 仍要 Send"这件事后,你可以想象一个巧妙的 pattern:从线程 A 调 Handle::spawn(future),future 在 current_thread runtime 的主线程 B 上执行

这是完全合法的 Tokio 用法:

rust
let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
let handle = rt.handle().clone();

// 线程 A
std::thread::spawn(move || {
    let fut = async { /* Send + 'static */ };
    handle.spawn(fut);   // ← future 被送到 current_thread 主线程 B 执行
});

rt.block_on(async { /* ... */ });  // 主线程 B 阻塞等 runtime

这种 pattern 让你可以从外部线程"投送"任务到单线程 runtime。用场:

  • 一个 GUI 主线程上的 runtime,接收来自 worker 线程的异步请求
  • 嵌入式场景,中断处理 handler 往 main loop runtime 投送任务
  • 测试场景,主 runtime 跑主流程,后台线程 spawn 测试任务

这就是 Send 约束的用处——它让这种"跨线程投送"类型安全。如果 current_thread 放弃 Send 约束,上面这个模式就不可能。


7.6 LocalSet:任何 runtime 都能跑 !Send Future

LocalSet 解决一个核心问题:!Send 的 Future 可以在 Tokio 的任何 runtime 上跑,包括多线程 runtime。

看它的定义:

rust
// 来源:tokio/src/task/local.rs
pub struct LocalSet {
    tick: Cell<u8>,
    context: Rc<Context>,
    _not_send: PhantomData<*const ()>,
}

struct Context {
    shared: Arc<Shared>,
    unhandled_panic: Cell<bool>,
}

两个关键点

  1. _not_send: PhantomData<*const ()>:这个 ZST 让 LocalSet 自身不是 Send——编译器不让你把 LocalSet 跨线程传递。这保证 "LocalSet 绑定到一个线程" 的语义
  2. context: Rc<Context>:用 Rc 而不是 Arc——单线程引用计数,比 Arc 快(不需要原子操作)

spawn_local 的 thread-local 魔法

spawn_local 的工作逻辑:

rust
// 来源:tokio/src/task/local.rs
#[track_caller]
pub fn spawn_local<F>(future: F) -> JoinHandle<F::Output>
where F: Future + 'static, F::Output: 'static
{
    if cfg!(debug_assertions) && std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
        spawn_local_inner(Box::pin(future), None)
    } else {
        spawn_local_inner(future, None)
    }
}

注意 bound:F: Future + 'static——没有 Send。只要 'static。

spawn_local_inner 通过一个 thread-local 变量找到"当前线程激活的 LocalSet",把 Future 推给它。如果调用 spawn_local 时没有活跃的 LocalSet(即你不在 LocalSet::run_untilLocalSet::block_on 里)——panic

spawn_local called from outside of a `task::LocalSet`.

LocalSet 内部的队列设计

LocalSet 内部维护一个独立的 Task 队列——不是用 Tokio 主 runtime 的 inject queue。它的 Shared 结构(简化):

rust
// 来源:tokio/src/task/local.rs
struct Shared {
    queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,
    local_state: LocalState,
    waker: AtomicWaker,
    // ...
}

注意 queueMutex<Option<VecDeque>>——为什么需要锁? 因为 spawn_local 可能从任意 async context 调用,不一定在 LocalSet 自己 poll 的时刻调。具体来说:

  • LocalSet 线程在 poll 某个 Task 时
  • 这个 Task 通过 spawn_local 往 LocalSet 的 queue 插入新 Task
  • 同时 LocalSet 的外层(比如多线程 runtime 的 worker)可能从另一个上下文操作 queue

所以需要 Mutex 保护Option<...> 是为了支持"LocalSet drop 时清空队列"——drop 时把 Option 置 None,后续 spawn_local 看到就知道 LocalSet 已经死了。

为什么 spawn_local 不 panic 如果 LocalSet 正在 park?

一个微妙的点:LocalSet 的 poll 结束返回 Pending 后,LocalSet 本身不在"active"状态。此时如果另一个 Task(比如同一 runtime 的其他 Task)调 spawn_local——会发生什么?

Tokio 的 local.rs 里 spawn_local 实际上走了一个双路径

  • 快速路径:thread-local 里有 LocalSet 引用 → 直接 push 进 LocalSet 的 queue
  • 慢路径:thread-local 没有,但你在一个 LocalSet::block_on 的执行期里 → 通过 ContextGuard 找到 LocalSet

如果两路径都没找到活跃的 LocalSet → panic。

实际使用中,你几乎不会触发 panic——因为 spawn_local 的代码通常在 LocalSet 的 run_until 的 future 里执行,那个上下文永远有活跃的 LocalSet。


7.7 LocalSet 作为一个 Future

LocalSet 的精妙设计是:它自己就是一个 Future

rust
// 来源:tokio/src/task/local.rs
impl Future for LocalSet {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        self.context.shared.waker.register_by_ref(cx.waker());

        if self.with(|| self.tick()) {
            cx.waker().wake_by_ref();
            Poll::Pending
        } else if unsafe { self.context.shared.local_state.owned_is_empty() } {
            Poll::Ready(())
        } else {
            Poll::Pending
        }
    }
}

poll 做三件事

  1. 注册 cx 的 Waker:当内部 Task 需要唤醒 LocalSet 时用
  2. tick 一轮:运行内部所有活跃 Task 一次(self.with(|| self.tick()) 会在 thread-local 里临时把 LocalSet 设为"当前",然后 tick 一次)
  3. 判断返回
    • tick 返回 true(还有更多工作)→ wake 自己 + Pending(保证下一轮 poll)
    • owned tasks 空了 → Ready(())(LocalSet 完成)
    • 否则 Pending(有 Task 但都在等待)

用法:run_until

最常见的用法:

rust
let local = tokio::task::LocalSet::new();

local.run_until(async move {
    let rc = Rc::new(42);   // ← !Send 类型

    tokio::task::spawn_local(async move {
        println!("{}", rc);
    });

    // 等 spawn_local 的 Task 完成
    tokio::task::spawn_local(async { /* ... */ }).await.unwrap();
}).await;

工作原理

  • run_until 内部返回一个 RunUntil<F> Future
  • 每次 poll 这个 Future 时:先 poll LocalSet(推进所有 local task),再 poll 你的参数 Future
  • 参数 Future Ready 时返回 Output

结果:在多线程 runtime 的上下文里(比如 #[tokio::main]),你用 LocalSet::new().run_until(...) 就能跑 !Send 代码。LocalSet 本身是一个 Future,它不改变底层 runtime、只是给当前 Task 开一个"!Send 子世界"


register_by_ref 的精妙

LocalSet 的 poll 第一行是 self.context.shared.waker.register_by_ref(cx.waker())。这用的是 AtomicWaker——Tokio 提供的一个可原子替换的 Waker 槽

AtomicWaker::register_by_ref 的语义:

  • 如果槽里没有 Waker,存入当前 Waker
  • 如果槽里有 Waker 且 will_wake(cx.waker()) 为 true,不改(节省 clone)
  • 否则 clone cx.waker() 存入

这个设计和第 3 章讲的 "Tokio 全局 WAKER_VTABLE + will_wake" 完全配合——存在 LocalSet 里的 Waker 在大多数情况下是同一个 Task 的 Waker(因为 LocalSet 被同一个 Task 反复 poll),所以 will_wake 永远返回 true、不会触发额外 clone。hot path 零成本

这种设计在 Tokio 源码里反复出现——基本上所有"注册 Waker 给外部事件"的场景都用 AtomicWaker。学会它你就能看懂 Tokio 几十处类似代码。


7.8 LocalSet 的 3 种常见用法

用法 A:和 multi_thread 配合跑 !Send 子任务

rust
#[tokio::main]
async fn main() {
    let local = tokio::task::LocalSet::new();
    local.run_until(async {
        let conn = UnsyncDatabase::connect().await;   // 假设 UnsyncDatabase 是 !Send
        
        tokio::task::spawn_local(async move {
            conn.query("SELECT ...").await
        }).await.unwrap();
    }).await;
}

用法 B:作为一个独立的单线程 runtime

rust
// Builder::new_current_thread() + LocalSet 提供真正的单线程环境
let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let local = tokio::task::LocalSet::new();

local.block_on(&rt, async {
    // 这里 spawn_local 和 spawn 都可用
});

用法 C:spawn_local 但保持和 spawn 混用

rust
#[tokio::main]
async fn main() {
    let local = tokio::task::LocalSet::new();
    
    local.run_until(async {
        // spawn_local 跑 !Send 代码
        let local_task = tokio::task::spawn_local(async {
            let rc = Rc::new(vec![1, 2, 3]);
            // 操作 !Send 类型
        });
        
        // spawn 同时跑 Send 代码(跑在 multi_thread worker 上)
        let send_task = tokio::spawn(async {
            // 这段跑在其他 worker,并行执行
            heavy_computation().await
        });
        
        // 两者都 await
        let _ = local_task.await;
        let _ = send_task.await;
    }).await;
}

这个模式非常强大——你可以并行混合 Send 和 !Send 任务spawn 的任务用 multi_thread runtime 的 worker 跑,spawn_local 的任务用 LocalSet 在当前线程跑。两个世界和谐共存。

用法 D:集成进 GUI 事件循环

rust
// iced 或 egui 的 update 函数里
let local = tokio::task::LocalSet::new();

// 每次 update 时推进 LocalSet 一点
// local 里 spawn 的任务可以直接操作 UI 状态(!Send)

这三种用法覆盖几乎所有实际的 !Send 需求。


7.8½ LocalSet 和 Arc / Rc 的微妙关系

LocalSet 的 SharedArc<Shared>,而不是 Rc<Shared>为什么在一个单线程的结构里要用 Arc

答案是:Task 内部持有 scheduler 的反向引用(第 6 章讲过 Core<T, S> 里的 scheduler: S)。对于 LocalSet,这个 S 就是 Arc<Shared>——Task 需要跨 Send 边界(它会被放进 Tokio 标准 Task 结构,要求 S: 'static + ...),所以用 Arc。

但外层的 context: Rc<Context> 是 Rc——因为 Context 只在 LocalSet 自己的线程内用,不被 Task 结构持有。

这种"里层用 Rc 外层也用 Rc、但被 Task 持有的用 Arc"的混合策略是 LocalSet 实现里最精细的设计之一。读 local.rs 源码时注意每个类型是 Arc 还是 Rc——每个选择都对应一个具体约束


7.8¾ LocalSet 的 tick 方法里做什么

LocalSet Future impl 里 self.with(|| self.tick()) 是核心。tick 方法做一轮工作:

简化逻辑(对应 tokio/src/task/local.rs):

rust
// 简化示意
fn tick(&self) -> bool {
    for _ in 0..MAX_TASKS_PER_TICK {   // 通常是一个小常数,比如 61
        let task = match self.take_next_task() {
            Some(task) => task,
            None => return false,   // 队列空了
        };
        task.run();   // 运行 Task 一次(这会调 vtable.poll → 最终 poll Future::poll)
    }
    // 还有任务没处理完,返回 true 让外层继续 poll
    true
}

关键点

  • 每次 tick 限制处理任务数(避免一个 LocalSet 的 tick 占用 CPU 太久)
  • 处理完限额或队列空 → 返回
  • 返回 true 表示"我还有活"——LocalSet 的 poll 会 wake 自己触发下一轮 poll
  • 返回 false 表示"我没活了"——LocalSet 的 poll 根据 owned_is_empty 决定是 Ready 还是 Pending(Pending 等后续 spawn_local)

这种"有限步长的推进 + 完成度反馈"是 Rust async 世界的标准 Future 组合原语模式。你自己写类似的"驱动内部多个 future"的 Future 时可以复用这个模式。


7.9 和这个系列的其他书的关联

本章讲的 Rc<Context> + PhantomData<*const ()> 让类型 !Send 的技巧,在 Rust 编译器与运行时揭秘》第 8 章(trait object 与 vtable) 里有讨论——!Send / !Sync 是通过负 trait 实现impl !Send for ...)或通过持有 !Send 字段(比如 *const () 裸指针)传染出来的。LocalSet 用的是"持有 *const () 字段" 这一招。

Vue 3 设计与实现》第 10 章(组件系统) 里讲的"组件绑定到渲染器"模式,和 LocalSet 的"Future 绑定到 LocalSet 线程" 模式同构——都是用thread-local + PhantomData 保证某个对象不跨越某个边界。


7.9½ current_thread vs multi_thread 的性能对照

同样一个 HTTP 服务,在两种 runtime 下的真实对比(数字来自 Tokio 官方 benchmark 和社区公开压测):

启动成本

  • multi_thread runtime + 8 worker:~2-3 毫秒(spawn 线程 + epoll 初始化 + 时间轮初始化)
  • current_thread runtime:~200-300 微秒

差 10 倍。对于长期运行的服务无所谓,对于启动密集型场景(CLI 每次调用都要新建 runtime)是巨大差距。

吞吐(4 核机器上跑一个 echo 服务)

  • multi_thread:~200K QPS
  • current_thread:~80K QPS

多线程约 2.5 倍吞吐——这是多核 CPU 的自然加速比。但注意不是 4 倍——因为 worker 之间的协调(偷任务、idle 协议)吃掉一些开销。

单次请求延迟(p50)

  • multi_thread:~50-100 微秒
  • current_thread:~30-50 微秒

current_thread 反而更低延迟!因为单线程没有跨核缓存失效、没有 work-stealing 开销、没有 LIFO slot 的 atomic 操作。对于"低 QPS 但延迟敏感"的服务(比如一个内部 RPC 到关键组件),current_thread 反而更优

内存占用

  • multi_thread:每 worker 本地队列 256 × 指针 = 2 KB × 8 = 16 KB + 各种 worker 元数据
  • current_thread:一个 VecDeque,初始 0 字节,按需增长

对内存敏感场景,current_thread 可以节省几 MB。

经验法则

简单的选型清单:

  • 长期服务、CPU 核心多、高吞吐目标:multi_thread(默认选择)
  • CLI / 短寿命进程:current_thread(启动快)
  • 延迟敏感、吞吐不高current_thread 可能更优(别凭直觉用 multi_thread)
  • 需要 !Send 类型:current_thread 或 LocalSet
  • 嵌入 GUI / 游戏引擎 / 其他事件循环:current_thread
  • WASM 环境:current_thread(WASM 原本就单线程)
  • 内存受限:current_thread

7.9¾ 两种 runtime 的共享之处

虽然调度器不同,两种 runtime 共享大量代码

完全共享的部分

  • Task 结构(第 6 章讲的 Cell + Header + State):两种 runtime 的 Task 几乎一模一样,只是 scheduler 泛型参数 S 不同
  • Waker 实现(第 3 章的 vtable):同一套 WAKER_VTABLE
  • I/O Driver / Time Driver(第 8-11 章):两种 runtime 都用 mio 和分层时间轮
  • Blocking Pool(第 16 章):两种都有 spawn_blocking 能力
  • OwnedTasks 名册:两种都维护全局 Task 列表

差异部分

  • Scheduler:multi_thread 有 8 个 worker struct、current_thread 只有 1 个 Core
  • Queue:环形缓冲 vs VecDeque
  • Wake 路径的最后一步:multi_thread 把 Task 推给某个 worker,current_thread 推进共用队列

这种"绝大部分共享、只在调度层分叉"的架构让 Tokio 的代码总量可控——两种 runtime 不是两套独立实现,90% 代码共用。你学会 multi_thread 基本就会 current_thread。

这是抽象边界划分得好的标志:trait Schedule 定义了调度行为的契约,上层(Task、Waker)都通过这个 trait 工作,两种 scheduler 是 trait 的两种实现。


7.9½ 小而重要的决策:LocalSet 的 tick: Cell<u8>

LocalSet 结构里有个不起眼的 tick: Cell<u8> 字段。用 u8 而不是 u32——因为 tick 只用来做"每 N 次强制让步"的计数,8 位够了(最多 255 轮)。

这是 Tokio 源码里反复出现的经验——能用最小的类型就用最小的。LocalSet 的 tick 是 u8、CurrentThread 的 tick 是 u32、multi_thread Core 的 tick 是 u32(第 5 章讲过理由)。每处都按实际需求选择位宽——不盲目用 usize、也不盲目用 u64。

这种对每个字段的精打细算在整个 Tokio 源码里贯穿。读过之后你写自己的 Rust 代码时,会开始下意识地问"这里真的需要 usize 吗?u8 够不够?"——这是工程美学的潜移默化


7.10 本章小结

带走三件事:

  1. current_thread runtime 是 multi_thread 的简化版——只有一个线程、一个 VecDeque 队列、没有 work-stealing。适合 GUI、CLI、嵌入场景,启动快 10-100 倍。延迟敏感且吞吐不高的服务 current_thread 反而更优
  2. spawn 到 current_thread runtime 依然要 Send——因为 Handle 可以跨线程 clone。想真正在单线程 spawn !Send Future,用 LocalSet + spawn_local
  3. LocalSet 是一个 Future,自己可以 .await。它给调用线程开一个 "!Send 子世界"——本线程内的 Future 可以用 Rc / RefCell / 其他 !Send 类型。可以和 multi_thread runtime 组合使用——spawn_local 跑 !Send、spawn 跑 Send,共存无碍

一个总结性的认知升级

读完本章你应该对 Tokio 有一个升级的认知:Tokio 不是"一个" runtime,它是"一组"可组合的 runtime 组件——Builder 是组装入口、Scheduler 有两种、LocalSet 是横切的 !Send 扩展、Handle 是跨线程的门面。你可以按需挑选组合:

  • new_multi_thread() + enable_all() → 典型后端服务
  • new_current_thread() + enable_all() → CLI / GUI 主线程
  • new_multi_thread() + LocalSet.run_until() → !Send 子世界包裹 + 多核利用
  • new_current_thread() + 纯 LocalSet → 极简异步工具

这种可组合性是 Tokio 适用场景广泛的根源。把这层组合关系理解清楚,以后你面对任何异步需求都能从 Builder 开始自上而下设计,而不是凭直觉复制粘贴代码。

7.10⅓ 单线程 runtime 里的 .await 其实更"顺畅"

一个读者不常注意的现象:同样的 async 代码,在 current_thread runtime 里可能比 multi_thread runtime 跑得更"流畅"

原因在于调度行为的差异

  • multi_thread 下,你的 async fn 的不同 .await 点之间,Task 可能被迁移到不同 worker。这带来:
    • CPU cache miss(每次换 worker 都要重新 warm up L1/L2 cache)
    • 可能的内存访问跨 NUMA node(如果服务器有多个 CPU socket)
    • 原子操作在 worker 之间的传播开销
  • current_thread 下,所有 .await 都在同一个线程——cache 永远是热的、内存访问都在同一个 NUMA 域

对于"单个请求延迟敏感"的服务,这个差异可能显著:

  • multi_thread 一个请求的 p50 可能是 80 微秒
  • current_thread 相同逻辑可能 50 微秒

代价是 QPS 降低(单线程不能并行)。但如果你的 QPS 本来不高(比如内部管理服务),current_thread 可能更合适——低延迟 + 简单架构 + 更可预测的调度行为

生产环境实测中有见过把某个尾延迟敏感服务从 multi_thread 改成 current_thread 后,p99 延迟从 1.2 毫秒降到 600 微秒。代价是单个实例吞吐下降一半,但他们是通过**多实例(每核一个进程)**来扩展吞吐的——这种 thread-per-core 架构的典范案例


7.10½ LocalSet 的边界陷阱

LocalSet 虽然强大,但有几个容易踩的陷阱

陷阱 1:LocalSet 不能跨越 .await 边界外

rust
// 错误示范
let local = LocalSet::new();
some_future.await;      // await 会让 LocalSet 暂停
local.run_until(/* ... */).await;   // 这时候 LocalSet 才真正 poll

问题some_future.await 期间 LocalSet 里的任务不会被推进(因为 run_until 还没开始)。如果你在 some_future 里 spawn_local 了任务,它们得等 run_until 开始才会跑

对策:把需要 LocalSet 的代码整个包进 run_until 内部。

陷阱 2:drop LocalSet 前有未完成的 local task

rust
let local = LocalSet::new();
local.run_until(async {
    tokio::task::spawn_local(async { 
        long_running().await 
    });
    // 这里直接 return,但 spawn_local 的 task 还没跑完
}).await;
// LocalSet drop,spawn_local 的 task 被强制 cancel

问题:LocalSet drop 时所有未完成的 local task 都被 cancel 掉。如果你希望它们跑完,需要显式 .await JoinHandle 或用 local.await(等整个 LocalSet 所有 task 完成)。

陷阱 3:嵌套 LocalSet 会"重叠"

rust
let outer = LocalSet::new();
outer.run_until(async {
    let inner = LocalSet::new();
    inner.run_until(async {
        // 这里 spawn_local 进哪个 LocalSet?
        tokio::task::spawn_local(async {});
    }).await;
}).await;

答案:进最内层的 LocalSet——因为 thread-local 被 inner 覆盖了。这通常是你想要的行为,但如果你以为 spawn 到 outer、实际上进了 inner,可能出现 "外层 LocalSet 以为自己做完了、其实还有任务"的错觉。

这类陷阱不常见但存在。生产代码避免嵌套 LocalSet 是最稳的做法


7.10⅔ spawn_local 之外的 !Send 工具

除了 spawn_local,Tokio 还有几个专门为 !Send 场景准备的工具。你可能需要:

LocalSet::spawn_local 的独立版本

LocalSet 上还有一个 spawn_local 方法(不是全局的 tokio::task::spawn_local):

rust
let local = LocalSet::new();
let handle = local.spawn_local(async { /* !Send */ });
local.run_until(handle).await.unwrap();

这个版本不依赖 thread-local——直接绑定到某个具体的 LocalSet。适合在 LocalSet 创建但还没 run 的时候就 spawn 任务。

tokio-util::task::LocalPoolHandle

在一个多线程 runtime 里,如果你需要多个 "!Send 子世界"(每个子世界独立),可以用 LocalPoolHandle(在 tokio-util crate 里):

rust
use tokio_util::task::LocalPoolHandle;

let pool = LocalPoolHandle::new(4);   // 4 个 worker,每个都是 LocalSet 驱动
let handle = pool.spawn_pinned(|| async { /* !Send */ });

原理:内部 pool 跑 N 个专用线程,每个线程上跑一个 LocalSet——任务被分派到其中一个。这是 "thread-per-core + LocalSet" 的 Tokio 官方实现,适合 "!Send 资源 + 需要一定并行度" 的场景。

这些工具你不一定常用,但知道它们存在——遇到 !Send 需求时多一条路径可选


7.11 延伸:tokio-uring 的 current_thread 模型

题外话但值得一提:tokio-uring(io_uring 的 Tokio 子项目)只支持 current_thread runtime

原因:io_uring 的语义要求每个提交队列(SQ)被同一个线程访问——跨线程访问会引入大量同步开销,抵消了 io_uring 的性能优势。

所以 tokio-uring 的代码模式是:

rust
// tokio-uring 风格
tokio_uring::start(async {
    // 这里是 current_thread runtime + io_uring driver
    let file = tokio_uring::fs::File::open("...").await?;
    let (res, buf) = file.read_at(vec![0; 4096], 0).await;
});

每个要做 I/O 的线程自己起一个 tokio-uring 实例。对于"每核一个 worker"架构(类似 monoio)这是自然的分布。

这个细节说明:current_thread runtime 不只是"简化版 multi_thread",它在某些极端场景下反而是唯一选项


7.11¾ LocalSet 的历史和演化

LocalSet 不是从 Tokio 1.0 就存在的——它是 Tokio 0.2 时代(2019 年底)加入的特性,解决了当时一个具体的痛点。

历史背景:Tokio 0.1 时代(基于 futures 0.1),spawn 的 Future 不强制要求 Send——因为那时候 Tokio 0.1 的调度器比较简陋、没有 work-stealing。Tokio 0.2 引入了完整的多线程 scheduler 后,Send 约束变成硬性要求——大量已有代码和 !Send 类型(Rc、RefCell、当时的 hyper 的一些 Body 类型)无法直接 spawn。

社区讨论后,选择不放松 multi_thread 的 Send 约束(放松会失去 work-stealing 的价值),而是引入 LocalSet 作为一个 opt-in 的 escape hatch。这个选择保留了 multi_thread 的性能优势,同时不让 !Send 代码无路可走

LocalSet 的 API 从 0.2 到 1.40 几乎没变——它的设计从第一天就是对的。这是 Tokio 社区在 API 稳定性上惊人的严谨程度的又一体现——一个 API 一旦发布几乎不会变,除非用 tokio_unstable feature 标记为实验性。


一个收尾的视角

到这里本书上半场(第 1-7 章:Runtime 和 Scheduler 子系统)完成。你已经理解了 Tokio 的大脑——决定 "Task 何时、在哪执行"的那整套机制。下半场(第 8-20 章)将深入 Tokio 的感官——I/O Driver、Time Driver、同步原语、channel——那是 Tokio 和真实世界(操作系统、时间、并发通信)打交道的地方。换一个视角,你会发现后半场的每一个组件,其实都是在第 1-7 章建立的那个舞台上唱戏。


下一章我们离开调度器话题、进入 I/O Driver——也就是把 Tokio 和操作系统连起来的那一层。你会看到 mio 库在 Tokio 内部的位置、epoll_wait 调用的封装、一个 TcpStream::read 从 user space 到 kernel space 再回来的完整路径。


7.11½ 一个真实案例:eframe 如何集成 Tokio

为了让这章落地,给一个真实项目的集成例子eframeegui(Rust 生态里最流行的 GUI 库之一)的应用框架,它需要在 GUI 主线程上跑 async 代码——典型的 current_thread + LocalSet 场景。

eframe 的做法:

rust
// 简化自 eframe 风格代码
fn main() {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    let _guard = runtime.enter();
    let local_set = tokio::task::LocalSet::new();

    eframe::run_native(
        "My App",
        Default::default(),
        Box::new(|cc| {
            Box::new(MyApp::new(cc, local_set, runtime))
        }),
    ).unwrap();
}

impl eframe::App for MyApp {
    fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
        // 每帧推进 LocalSet 一点
        self.local_set.block_on(&self.runtime, async {
            tokio::time::timeout(
                Duration::from_millis(1),   // 最多占用 1ms
                futures::future::pending::<()>(),
            ).await.ok();
        });

        // 正常 egui UI 渲染
        egui::CentralPanel::default().show(ctx, |ui| {
            // ...
        });
    }
}

核心技巧

  • 每帧 1ms 让 LocalSet 推进——给异步任务一点 CPU,但不阻塞 UI 渲染的 60 FPS 节奏
  • UI 逻辑直接用 egui API(非 async)
  • 异步任务(HTTP 请求、文件 I/O)通过 spawn_local 放到 LocalSet

这个模式在 wryiced 等 Rust GUI 框架里都有类似实现。current_thread + LocalSet 是 Rust GUI 异步化的标准答案——值得熟悉。

7.11⅞ 一个小练习:从头写一个极简 current_thread runtime

如果你想在离开本章前加深理解,试着在自己的电脑上写一个极简 current_thread runtime(~80 行),它能跑一个简单的 async fn。

核心骨架(留给你自己补完):

rust
// 骨架:请你自己补完缺失的部分
struct MyRuntime {
    queue: Rc<RefCell<VecDeque<Rc<MyTask>>>>,
}

struct MyTask {
    future: RefCell<Pin<Box<dyn Future<Output = ()>>>>,
    queue: Rc<RefCell<VecDeque<Rc<MyTask>>>>,
}

impl MyRuntime {
    fn new() -> Self {
        MyRuntime { queue: Rc::new(RefCell::new(VecDeque::new())) }
    }
    
    fn spawn<F: Future<Output = ()> + 'static>(&self, future: F) {
        let task = Rc::new(MyTask {
            future: RefCell::new(Box::pin(future)),
            queue: self.queue.clone(),
        });
        self.queue.borrow_mut().push_back(task);
    }
    
    fn block_on<F: Future>(&self, mut future: F) -> F::Output {
        // 主循环:
        // 1. poll 顶层 future
        // 2. 如果 Ready,返回
        // 3. 否则 pop 一个任务 poll 一次
        // 4. 如果队列空且顶层未 Ready,怎么办?
        todo!("自己补全")
    }
}

提示:你需要一个 Waker 实现——最简的是让 Waker 把 Task push 回队列。Rust 标准库的 task::Waker::from_raw 是你的朋友。

写完后你会对为什么 Tokio 的 current_thread 是那个结构骨感的理解。花 30 分钟做这个练习,比读一小时书更有价值。


延伸阅读

基于 VitePress 构建