Skip to content

第15章 JoinHandle / JoinSet / AbortHandle:Task 集合管理的三把利刃

"Spawning is easy. Tracking, joining, aborting—that is where the work is." —— 笔者

本章要点

  • JoinHandle 是 Task 的"读端 + 控制端":只持有一个 RawTask(裸指针 + vtable),poll 通过 vtable 调 try_read_output 把 output 从 Cell 里取出来——无泛型穿透是关键
  • JoinSet = IdleNotifiedSet<JoinHandle>:内部双链表(idle / notified)+ 每 entry 一个 my_list 标记——Task 被 wake 时移到 notified 链表、poll_join_next 只看 notified,把"N 个 Task 找哪个 Ready"从 O(N) 降到 O(活跃数)
  • AbortHandle 仅 16 字节:一个 RawTask + vtable,abort() 就是 raw.remote_abort()——非侵入、可 Clone、可跨线程
  • 关键不对称:JoinHandle 持有 output 读取权(drop 后 output 永远丢失),AbortHandle 只持有 abort 权(drop 不影响 Task 生命周期)——权责分离
  • 生产范式:循环里 join_next().await 作为 event loop 驱动、spawn 返回的 AbortHandle 存进 HashMap 做按 key 取消、select! 里 await JoinSet 做并发 + 超时

15.0 为什么需要这三个东西

假设你在写一个 Web 爬虫:主线程负责调度、worker 负责实际抓取。朴素实现是:

rust
// ⚠️ 反面教材
let mut handles = Vec::new();
for url in urls {
    handles.push(tokio::spawn(fetch(url)));
}
for h in handles {
    let result = h.await?;
    process(result);
}

看起来没问题——但致命缺陷有三个

  1. 处理顺序 = spawn 顺序:第一个 url 慢、后面 99 个都被堵在那儿等——即便它们早就完成了;
  2. 不能提前退出:如果用户 Ctrl-C、你没办法在 Future::await 中途 abort 所有 handles——必须等当前 await 的那个先完成;
  3. 无法按需动态增删:爬到一半发现要加 10 个 url、Vec 已经在 for 循环里被消耗,新加的等不到统一 await。

JoinHandle 解决单个 Task 的 output + abort,但管不了"N 个 Task 哪个先好拿哪个"。JoinSet 就是为这个场景而生的——它内部用 IdleNotifiedSet 维护一个按完成顺序交付的 Task 池。AbortHandle 再进一步:它让你不持有 JoinHandle 也能取消 Task——对于不关心 output、只关心能否 kill 的场景(超时、任务取消、graceful shutdown),这是刚需。

这三个东西就是 Tokio 把"Task 生命周期管理"做成一等公民的工具链。本章一个一个拆。

15.1 JoinHandle 的解剖:16 字节如何承载一切

翻开 tokio/src/runtime/task/join.rs

rust
pub struct JoinHandle<T> {
    raw: RawTask,
    _p: PhantomData<T>,
}

一个 RawTask,一个幽灵类型。RawTask 内部就是 NonNull<Header>——8 字节裸指针。加上 PhantomData 不占空间——整个 JoinHandle 在 64 位机器上就是 8 字节(加上 Drop glue 的对齐可能填到 16)。

为什么能这么简洁?关键在于vtable——我们在第 6 章(Task)讲过:每个 Task 的 Header 里存了一个指向 Vtable 的静态引用,Vtable 里有 try_read_outputremote_abortschedule 等函数指针。JoinHandle 需要什么操作,都通过 vtable 去 dispatch——不需要知道泛型 T 的具体类型、也不需要额外字段

poll 的神奇之处

JoinHandle: Future<Output = Result<T, JoinError>>,它的 poll 实现是这样:

rust
let mut ret = Poll::Pending;
// Try to read the task output. If the task is not yet complete, the waker
// is stored and is notified once the task does complete.
unsafe {
    self.raw
        .try_read_output(&mut ret as *mut _ as *mut (), cx.waker());
}

&mut Poll<Result<T, JoinError>> 强转成 *mut ()(类型擦除)、传给 vtable 里的 try_read_output。vtable 那头的函数在编译期就是针对具体 T 实例化过的——它知道怎么把 Task 的 stage(Cell<Stage<T>>)里的值写回这个指针。这就是 Rust 的"monomorphization + vtable"组合拳:在 JoinHandle 这层做零泛型穿透、在 Task 创建时把类型信息固化到 vtable——编译期 + 运行期各自付出最小代价

这个技巧在整个 Tokio 里反复出现:凡是需要"类型擦除的句柄"的地方(BoxFutureAbortHandleJoinHandle)——都用 vtable + *mut () 搞定。

abort 和 is_finished

rust
pub fn abort(&self) {
    self.raw.remote_abort();
}
pub fn is_finished(&self) -> bool {
    let state = self.raw.header().state.load();
    state.is_complete()
}

两个方法、三行代码。abort 通过 vtable 的 remote_abort——它做的事在第 6 章已经讲过:CAS 地给 state 置 CANCELLED 位、如果 Task 还在 run queue 就把它 pop 出来——不等 Task 当前 poll 结束、下一次 schedule 时直接按 cancelled 分支走。is_finished 只读 state 的 COMPLETE 位。

JoinHandle 的哲学不持有数据、只持有一个指针 + 一套规定的操作。所有重活在 Task Header / Vtable 那边。这也是为什么 Clone for JoinHandle 不存在——output 只能被读一次try_read_output 内部用 CAS 保证),如果允许 Clone 就得多维护一份读状态、得不偿失。真要共享,用 tokio::sync::oneshot 手写一个广播层。

15.2 AbortHandle:只要权利、不要责任

和 JoinHandle 并肩的,是 tokio/src/runtime/task/abort.rs 里的 AbortHandle:

rust
pub struct AbortHandle {
    raw: RawTask,
}

比 JoinHandle 还简洁——连 PhantomData 都没有,因为它不关心 output 类型

  • abort():同 JoinHandle,走 vtable remote_abort
  • is_finished():读 state;
  • id()(unstable):从 Header 读 Task ID;
  • impl Clone:可以!因为 abort 语义是幂等的——abort 一个已经 abort 或已经 complete 的 Task 是 no-op。

和 JoinHandle 的不对称

这里有一个设计决策的精华

维度JoinHandleAbortHandle
能读 output
能 abort
能 Clone
Drop 影响 Task❌(detach)❌(什么都不做)
存在多个?不可以任意多个

为什么 JoinHandle 不能 Clone 但 AbortHandle 可以?根本原因在于"读 output"是一次性的——Task 的 stage 里的 T 一旦被读走、原始位置就是空的。如果两份 JoinHandle 同时 await,必须引入"哪个先到谁读"的协调机制、代码就复杂了;而 abort 是"设置一个 flag"——完全可以幂等 + 并发安全

Drop 语义

JoinHandle::drop——Task 被 detach:Task 继续在 runtime 里跑、output 被 drop 到"无人接收"的虚空中。这和 std::thread::JoinHandle 完全相反(后者 drop 会 join、阻塞当前线程)。Tokio 这么设计的理由:Task 是轻量的、detach 成本低、而"drop 时阻塞"的语义在 async 环境下会引发死锁(await 当前 Task 等另一个 Task……)。

AbortHandle::drop——什么都不做。丢掉 abort 权利而已、Task 生命周期不受影响。

15.3 JoinSet:一组 Task 的事件循环

现在真正的主角上场。打开 tokio/src/task/join_set.rs

rust
pub struct JoinSet<T> {
    inner: IdleNotifiedSet<JoinHandle<T>>,
}

一个字段。所有复杂度藏在 IdleNotifiedSet 里——这是本章最值得花时间的一块。

先看 JoinSet 的对外 API(节选):

rust
pub fn spawn<F>(&mut self, task: F) -> AbortHandle
where
    F: Future<Output = T> + Send + 'static,
    T: Send,
{ /* ... */ }

pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
    crate::future::poll_fn(|cx| self.poll_join_next(cx)).await
}

pub fn poll_join_next(&mut self, cx: &mut Context<'_>)
    -> Poll<Option<Result<T, JoinError>>>
{ /* 核心实现 */ }

pub fn abort_all(&mut self);
pub fn detach_all(&mut self);
pub async fn shutdown(&mut self);
pub async fn join_all(self) -> Vec<T>;

语义直白

  • spawn 把 Future 丢进 runtime、返回 AbortHandle;
  • join_next 等"任一" Task 完成;
  • abort_all 一键取消所有;
  • shutdown 取消 + 等待全部退出;
  • join_all任意顺序收集所有 output。

关键是 join_next 的效率——朴素做法是"轮询 N 个 Future,谁 ready 返回谁"(就像 select! 那样),O(N) 的每次 poll。JoinSet 做到了 O(活跃数)——具体如何?下一节揭秘。

15.4 IdleNotifiedSet:双链表 + Wake 回调的精妙

tokio/src/util/idle_notified_set.rs 是整个 JoinSet 的心脏:

rust
pub(crate) struct IdleNotifiedSet<T> {
    lists: Arc<Lists<T>>,
    length: usize,
}

struct ListsInner<T> {
    notified: LinkedList<T>,
    idle: LinkedList<T>,
    waker: Option<Waker>,
}

两条链表

  • idle:存"当前无事可做、在等 Waker"的 Task(包装 JoinHandle);
  • notified:存"被 wake 过、可能有进展"的 Task。

每个 entry 还带一个 my_list 字段(UnsafeCell<List>),取值 Notified / Idle / Neither(已移出)。

Wake 实现:原子搬家

每个 entry 会 impl Wake——当 Task 被 wake 时:

rust
fn wake_by_ref(me: &Arc<Self>) {
    let mut lock = me.parent.lock();
    let old_my_list = me.my_list.with_mut(|ptr| unsafe {
        let old_my_list = *ptr;
        if old_my_list == List::Idle {
            *ptr = List::Notified;
        }
        old_my_list
    });
    if old_my_list == List::Idle {
        let me = unsafe {
            lock.idle.remove(ListEntry::as_raw(me)).unwrap()
        };
        lock.notified.push_front(me);
    }
}

步骤

  1. 拿 parent 的 mutex;
  2. CAS 把 my_list 从 Idle 改成 Notified;
  3. 如果原来是 Idle:从 idle 链表 O(1) 摘除、push 到 notified 链表头

为什么要在 wake 里做这个搬家?因为 JoinSet 的 poll_join_next 一会儿要只看 notified 链表——不用遍历所有 Task、只看被 wake 过的那批。原本 N 个 Task 的 O(N) 扫描,降到 O(notified 数)——而大多数时候 notified 只有几个(比如 100 个网络请求、每次批量回来 2-3 个)。

poll_join_next 的核心

JoinSet::poll_join_next 大致是:

rust
fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<...>> {
    // 1. 存 waker,供 wake 时通知
    self.inner.set_waker_if_notified_empty(cx.waker());
    // 2. 从 notified 链表取一个
    loop {
        let entry = self.inner.pop_notified();
        match entry {
            None => return if self.inner.is_empty() {
                Poll::Ready(None)  // 所有 task 都完成
            } else {
                Poll::Pending  // 还有 idle task 等
            },
            Some(mut entry) => {
                // 3. poll 这一个 Task 的 JoinHandle
                let mut handle = entry.get_mut();
                match Pin::new(&mut handle).poll(cx) {
                    Poll::Ready(r) => {
                        entry.remove();  // 完成了,从 set 里去掉
                        return Poll::Ready(Some(r));
                    }
                    Poll::Pending => {
                        // 没好,放回 idle 等下次 wake
                        entry.move_to_idle();
                    }
                }
            }
        }
    }
}

三件事

  1. 存 waker(供 Wake 实现从 ListsInner 里唤醒);
  2. 从 notified 链表捞一个候选;
  3. 真的 poll 它的 JoinHandle——Ready 就返回,Pending 就送回 idle。

这是一个典型的"reactor pattern":不主动轮询、只处理通知过的事件源。和 epoll/kqueue 的哲学一脉相承——第 8 章 I/O Driver 讲过。

15.5 ListEntry:一个 entry 如何同时是 Task 的 Waker

这里有个极精妙的细节:每个 entry 既是 JoinHandle 的容器、也是那个 Task 的 Waker

为什么?考虑 Task 的生命周期:

  • Task 本身由 runtime 调度、会生成 output;
  • Task 完成时会通过 JoinHandle 内部存的 waker 通知等待方;
  • 在 JoinSet 里,等待方就是 poll_join_next

JoinSet 巧妙地:把每个 entry 自己作为 waker 传给 JoinHandle。Task 完成时调这个 waker → entry 的 wake_by_ref 触发 → entry 从 idle 链表搬到 notified 链表 → poll_join_next 下次被唤醒时直接看到它。

没有额外的 HashMap<TaskId, Waker>没有全局通知管道所有协调都靠 Arc<ListEntry> 本身的 Wake impl——这是 Rust 类型系统 + 侵入式链表的组合威力。

15.6 abort_all vs detach_all vs shutdown

三个方法看着像、语义却截然不同:

  • abort_all():遍历所有 entry、对每个 raw.remote_abort()——标记取消但不等。Task 下次 schedule 时沿 cancelled 分支退出。之后 join_next() 会收到 Err(JoinError::cancelled(...))
  • detach_all()清空 JoinSet 但不 abort——Task 继续在 runtime 里跑、output 被 drop。用于"我不管了、让它们自生自灭";
  • shutdown().await:= abort_all() + 循环 join_next().await 直到 None。这是唯一的"保证 Task 真的退出"的方法,其他两个都是"fire and forget"。

生产常见错误:graceful shutdown 里只调了 abort_all() 然后 drop JoinSet——有些 Task 可能还没真的执行到 cancelled 分支、但 JoinSet 已经没了、再也没人能观察它们——它们会占着 runtime 的 worker 直到 natural 退出。正确做法是用 shutdown().await

15.7 和 select! 的组合:最常见的 event loop

rust
let mut set = JoinSet::new();
for url in urls {
    set.spawn(fetch(url));
}

loop {
    tokio::select! {
        Some(result) = set.join_next() => {
            process(result);
        }
        _ = shutdown.changed() => {
            set.shutdown().await;
            break;
        }
    }
}

这是一段教科书级的"并发 + 可取消 + 优雅关停"代码

  • JoinSet 负责"先完成先处理"——避免 HOL blocking;
  • select! 把关停信号和 Task 事件并置——取消触发后 shutdown().await 保证所有 Task 真退出
  • Some(result) = set.join_next() 的模式匹配——当 set 为空join_next 返回 None)时分支 disabled、不会死循环 panic。

这个 pattern 在 hyper、axum、tower 的源码里随处可见。

15.8 AbortHandle 的惯用法:按 key 取消

JoinHandle 不能 Clone、JoinSet 只能按完成顺序处理——那"按业务 key 取消特定 Task"怎么做?AbortHandle 上场

rust
let mut abort_handles: HashMap<UserId, AbortHandle> = HashMap::new();

// 启动某用户的 Task
let h = tokio::spawn(long_running(user_id));
abort_handles.insert(user_id, h.abort_handle());

// 某时刻取消
if let Some(h) = abort_handles.remove(&user_id) {
    h.abort();
}

JoinHandle 可以通过 abort_handle() 生成一个 AbortHandle——两者不共享状态、各自独立。JoinHandle 留给原 spawn 点 await output,AbortHandle 扔到 HashMap 里做远程杀手。权责分离的完美体现

15.9 真实源码的几个"读代码陷阱"

读 join_set.rs 的源码,有几个容易绕晕的地方先打预防针:

陷阱 1:idle 和 notified 都是 unsafe 侵入式链表

LinkedList<T> 在 Tokio 里是侵入式链表(entry 自带 prev/next 指针)——不是 std 的那个。理由在第 6 章讲过:零堆分配、O(1) 移除指定节点(stdLinkedList 需要 *mut Node 才能 O(1) 删)。代价是所有链表操作都 unsafe——Tokio 在这块的 safety invariants 有 200 行 comment 在源码里。

陷阱 2:entry.move_to_idle 不是移动内存、是改链表指针

entry 本身是 Arc<ListEntry<T>>——它的堆上位置永远不变。move_to_idle 只是:从当前链表 unlink、重新 link 到 idle 链表。Arc 保证多方引用(wake 回调、JoinSet 本身、可能的外部 AbortHandle)共享同一块内存

陷阱 3:JoinSet 自己不是 Future

你不能 set.await——JoinSet 没有 impl Future。要用 set.join_next().await——每次 await 只拿一个。如果要一次性等所有,用 set.join_all().await(Tokio 1.38+ 才有)或者循环 while let Some(r) = set.join_next().await {}

15.10 性能特征:IdleNotifiedSet vs 朴素 FuturesUnordered

futures::stream::FuturesUnordered 做同样的事——管理 N 个 Future、按完成顺序交付。对比一下:

维度JoinSet (Tokio)FuturesUnordered (futures)
底层机制IdleNotifiedSet (Tokio 定制)自己的 ready queue + linked list
Waker 开销每 entry 1 个 Arc<ListEntry>每 entry 1 个 Arc<Node> + Waker vtable
并发安全Mutex 保护 listsLockfree ready queue
是否限 Tokio是(spawn 依赖 runtime)否(纯 futures)
Task 取消abort() 原生支持得 drop 整个 stream

选型:如果你在 Tokio 里、Task 需要独立 spawn 到 runtime(比如不同 worker thread)、需要 abort——用 JoinSet。如果你只是有一堆 Future 要并发跑、不需要 spawn——FuturesUnordered 更轻(不占 runtime task 配额)。

15.10½ 一个血泪生产事故:JoinSet 里的 Task 被"看不见地"泄漏

讲个真实故事。2023 年某次上线后,线上某个服务的 RSS 开始稳步增长——每天涨 200 MB、两周后 OOM。profiling 发现堆上积累了几十万个 Box<dyn Future>。代码审查一遍又一遍都没找到问题。

最后发现:有段代码大致是这样——

rust
let mut set = JoinSet::new();
for batch in incoming_requests() {
    set.spawn(process(batch));
    if let Some(result) = set.try_join_next() {
        handle(result);
    }
}

看出问题了吗try_join_next()非阻塞的——只有当有 Task 已完成时才返回 Some。如果请求来得比 Task 完成得快,JoinSet 里的 Task 会越积越多。loop 退出后(比如 incoming 关闭),JoinSet 里还有一堆未处理的 Task、它们的 output 再也没人读——detach 了就丢失

修复:循环退出前一定要 while let Some(r) = set.join_next().await { handle(r); } 把剩下的清空、或者至少 set.shutdown().await

这个坑的教训:JoinSet 不是"无限容器"——它的 size 和 incoming 速率应该有明确的 backpressure 关系。否则一旦 consumer 慢于 producer,JoinSet 就成了一个无限增长的隐式队列

Tokio 1.37+ 的补救:spawn 自动施压

Tokio 官方注意到这个问题,在 1.37+ 为 JoinSet::spawn 默认启用了 tracing——如果你的 tracing-subscriber 装上了 tokio-console一眼就能看到某个 JoinSet 的 task 数量不正常。这不是根治、但大幅缩短了发现时间。根治还是得在业务层加 set.len() 上限检查。

15.10¾ IdleNotifiedSet 为什么不用 lockfree 队列

细心的读者会问:ListsInner 外面套了个 Mutex——为什么不用 Tokio 其他地方常用的 lockfree 队列(比如 concurrent_queue)?

答案是复杂度不值得。每个 Task wake 时需要原子完成"判断 + 从链表 A 移除 + 加入链表 B + 更新 my_list"——这是多步操作,lockfree 实现要么用 hazard pointer、要么用 RCU、要么双倍数据结构——代码量翻 5 倍、bug 面翻 10 倍。而 wake 本身频率有限(每 Task 每次从 Pending → Ready 才触发一次)、Mutex 又只保护链表指针(不跨 poll),竞争窗口极短(几十纳秒)——实测 Mutex 开销远小于 lockfree 带来的复杂度。

Tokio 代码里有个经验法则热路径(每次 poll 都走)必须 lockfree(如 mpsc 的发送链),温路径(wake + Task 切换)可以 Mutex(如这里),冷路径(Task 创建、shutdown)用 Mutex 甚至 RwLock 都可以。这种"按访问频率分配同步原语"的工程判断,是区分"能用"和"好用"代码的关键。

另一个有趣对比:Java 的 CompletableFuture.allOf

Java 的 CompletableFuture.allOf(futs) 返回一个等所有 future 完成的 future,功能和 JoinSet::join_all 类似。但 JVM 的实现走完全不同的路径——每个 CompletableFuture 内部维护一个回调链表stack 字段,CAS push),任一 future 完成时沿链表反向触发 allOf 的 counter 递减。没有双链表、没有 idle/notified 之分——因为 JVM 有 GC、CompletableFuture 不关心"被 detach 后的资源何时释放"。Tokio 做不到 GC,所以必须显式地把未完成的 Task 串在链表上、显式地 detach、显式地释放——双链表本质上是"手动 GC 的分代"。

看清这一点之后你就会发现:Rust async 运行时的所有"复杂数据结构",绝大多数都是在补 GC 的位。理解了这个大背景,再回头看 IdleNotifiedSet、OwnedTasks、RunQueue——它们的复杂度就不再是"故意炫技",而是在无 GC 环境下保持正确性 + 高性能的必要代价

15.10⅞ JoinSet 的一个边界:spawn 过程中 JoinSet 自己 drop 会怎样

还有个少人关注的边界:如果 set.spawn(fut) 已经把 Task 注册到 runtime、但 set.drop 在 Task 真正开始 poll 之前发生——会泄漏吗?

答案是不会,但 Task 会被 abort。drop 的路径里 JoinSet 会对每个 entry 调 remote_abort()——相当于对所有 Task 做隐式取消。这和 FuturesUnordered::drop 的语义一致,但和原生 tokio::spawn 不同(后者 drop JoinHandle 只是 detach、不 abort)。

这个差异常常让人踩坑:有人以为"set.spawn 之后立刻扔掉 set"和"tokio::spawn 之后扔掉 JoinHandle"是等价的——其实前者会把所有 Task 干掉、后者让 Task 继续跑。生产里要持续 spawn 又不想被 abort 的场景、只能用裸 tokio::spawn + 另存 JoinHandle、或者让 JoinSet 存活到 Task 自然退出。

15.11 和其他书的呼应

Vue 3 设计与实现》第 15 章讲过 Vue 的调度器用 Promise.resolve().then() 作为 microtask queue——把多个待更新的 component effect 收集到一个队列、一次 flush。JoinSet 的 idle/notified 双链表思路异曲同工——都是"收集 → 批量处理"。区别在于 Vue 的更新是同步的(浏览器单线程),JoinSet 是并发的(多 worker)。

Rust 编译器与运行时揭秘》第 9 章讲过 Arc 的 weak count 机制——JoinSet 里 Arc<ListEntry> 的多方持有(JoinSet、runtime 内部的 Waker、用户持有的 AbortHandle),如果设计不当就会循环引用泄漏。Tokio 的做法是:entry 只持有 RawTask 的"非所有权指针"——Task 的生命周期由 runtime 的 OwnedTasks 管理,entry 只是观察者。这和那本书里讲的"Arc + RawWaker 避免循环"是同一个套路。

vLLM 源码剖析里的Scheduler 把 Request 分 waiting / running / swapped 三个队列——和 IdleNotifiedSet 的 idle / notified 异曲同工。**"按状态分桶、批量推进"**是所有高性能调度器的共同语言——无论它调度的是 Future、GPU 推理 Batch、还是 OS 进程。

15.12 本章小结

带走三件事:

  1. JoinHandle / AbortHandle 都是 "一个 RawTask + vtable" 的薄壳——类型擦除 + 单态化 vtable 让 16 字节承载一切,Rust"零成本抽象"的教科书级演示
  2. JoinSet 的性能关键在 IdleNotifiedSet——双链表 + Wake 回调做"只扫描被唤醒的 Task"——把 O(N) 降到 O(活跃数)。这是 reactor pattern 在 Task 层的体现
  3. 三者的权责分离——JoinHandle 读 output(不可 Clone)、AbortHandle 远程取消(可 Clone)、JoinSet 批量管理——各司其职、组合起来覆盖 99% 的 Task 管理场景

下一章进入 spawn_blockingblock_in_place——Tokio 如何优雅地把"阻塞代码"塞进 async 世界。你会看到 runtime 如何切换线程池、block_in_place 的"偷走当前 worker"骚操作、以及为什么一个 10 毫秒的 CPU 密集任务能把整个 runtime 卡死。


延伸阅读

基于 VitePress 构建