Tokio 源码深度解析

第15章 JoinHandle / JoinSet / AbortHandle:Task 集合管理的三把利刃

作者 杨艺韬 · 10,291 字

第15章 JoinHandle / JoinSet / AbortHandle:Task 集合管理的三把利刃

本章要点

  • JoinHandle 是 Task 的”读端 + 控制端”:只持有一个 RawTask(裸指针 + vtable),poll 通过 vtable 调 try_read_output 把 output 从 Cell 里取出来——无泛型穿透是关键
  • JoinSet = IdleNotifiedSet<JoinHandle>:内部双链表(idle / notified)+ 每 entry 一个 my_list 标记——Task 被 wake 时移到 notified 链表、poll_join_next 只看 notified,把”N 个 Task 找哪个 Ready”从 O(N) 降到 O(活跃数)
  • AbortHandle 仅 16 字节:一个 RawTask + vtable,abort() 就是 raw.remote_abort()——非侵入、可 Clone、可跨线程
  • 关键不对称:JoinHandle 持有 output 读取权(drop 后 output 永远丢失),AbortHandle 只持有 abort 权(drop 不影响 Task 生命周期)——权责分离
  • 生产范式:循环里 join_next().await 作为 event loop 驱动、spawn 返回的 AbortHandle 存进 HashMap 做按 key 取消、select! 里 await JoinSet 做并发 + 超时

15.0 为什么需要这三个东西

在 Tokio 之前的多任务管理工具——Go goroutine 有 sync.WaitGroup、Java CompletableFuture 有 allOf、Python asyncio 有 gather——每种语言都在解决同样的问题:怎么同时管理一批异步任务? Tokio 的答案是三件套:JoinHandle(单任务句柄)、AbortHandle(只能取消的句柄)、JoinSet(任务集合)。每一种应对不同场景。本章把这三个的用法、实现、性能讲透——读完你可以自信地回答”在这个场景我该用哪个”。

为什么不设计一个”通用全能句柄”?——这是 Rust API 设计里常见的纠结点。Tokio 选择了”按职责拆分”的路线:需要读结果就给你 JoinHandle(带类型参数 T)、只需要 abort 就给你 AbortHandle(16 字节、无类型参数、更轻)。这种”按需提供能力”的 API 设计让用户付出的成本和拿到的能力严格匹配——不会为了一个你不需要的能力付出额外内存或认知负担。这种克制在 Rust 生态里反复出现、是”零成本抽象”哲学在 API 层面的体现。

假设你在写一个 Web 爬虫:主线程负责调度、worker 负责实际抓取。朴素实现是:

// ⚠️ 反面教材
let mut handles = Vec::new();
for url in urls {
    handles.push(tokio::spawn(fetch(url)));
}
for h in handles {
    let result = h.await?;
    process(result);
}

看起来没问题——但致命缺陷有三个

  1. 处理顺序 = spawn 顺序:第一个 url 慢、后面 99 个都被堵在那儿等——即便它们早就完成了;
  2. 不能提前退出:如果用户 Ctrl-C、你没办法在 Future::await 中途 abort 所有 handles——必须等当前 await 的那个先完成;
  3. 无法按需动态增删:爬到一半发现要加 10 个 url、Vec 已经在 for 循环里被消耗,新加的等不到统一 await。

JoinHandle 解决单个 Task 的 output + abort,但管不了”N 个 Task 哪个先好拿哪个”。JoinSet 就是为这个场景而生的——它内部用 IdleNotifiedSet 维护一个按完成顺序交付的 Task 池。AbortHandle 再进一步:它让你不持有 JoinHandle 也能取消 Task——对于不关心 output、只关心能否 kill 的场景(超时、任务取消、graceful shutdown),这是刚需。

这三个东西就是 Tokio 把”Task 生命周期管理”做成一等公民的工具链。本章一个一个拆。

15.1 JoinHandle 的解剖:16 字节如何承载一切

JoinHandle 是 Tokio 里每次 spawn 都会返回的小对象——看起来平平无奇、但内部紧凑到极致。16 字节里要装下:指向 Task 的 raw 指针、Task 状态位、PhantomData 类型标记。这种”小对象大功能”设计是 Rust 类型系统 + Tokio 优化的典型体现——你在自己项目里写类似”任务句柄”结构时可以直接参照。

翻开 tokio/src/runtime/task/join.rs

pub struct JoinHandle<T> {
    raw: RawTask,
    _p: PhantomData<T>,
}

一个 RawTask,一个幽灵类型。RawTask 内部就是 NonNull<Header>——8 字节裸指针。加上 PhantomData 不占空间——整个 JoinHandle 在 64 位机器上就是 8 字节(加上 Drop glue 的对齐可能填到 16)。

为什么能这么简洁?关键在于vtable——我们在第 6 章(Task)讲过:每个 Task 的 Header 里存了一个指向 Vtable 的静态引用,Vtable 里有 try_read_outputremote_abortschedule 等函数指针。JoinHandle 需要什么操作,都通过 vtable 去 dispatch——不需要知道泛型 T 的具体类型、也不需要额外字段

poll 的神奇之处

JoinHandle 实现 Future trait——这意味着你可以直接对它 .await。这个看似理所当然的设计背后有一个精妙细节:JoinHandle 的 poll 实际上是在访问 Task Trailer 里的通信槽、读 Task 执行结果。如果 Task 已经完成、poll 立刻返回 Ready;还没完成、就把当前 Waker 存进 Task、返回 Pending;Task 完成时会唤醒这个 Waker。整个机制和 oneshot channel 的 receiver 极其相似——事实上 Tokio 作者确认这就是一个专用的 oneshot channel

这个”JoinHandle 本质是 oneshot”的观察让你理解很多看似不连贯的行为:为什么 JoinHandle 被 drop 不影响 Task(oneshot sender 不关心 receiver 是否还在)?为什么 JoinHandle 不能 Clone(oneshot 一次消费)?为什么 JoinHandle 的 poll 是 cancel-safe 的(oneshot 的语义就是)?——理解了底层就是 oneshot、这些行为全都是自然推论,不需要记忆。**Tokio 源码大量这样的”表面不同、底层同源”**设计——认出同源关系是看懂源码的重要技能。

JoinHandle: Future<Output = Result<T, JoinError>>,它的 poll 实现是这样:

let mut ret = Poll::Pending;
// Try to read the task output. If the task is not yet complete, the waker
// is stored and is notified once the task does complete.
unsafe {
    self.raw
        .try_read_output(&mut ret as *mut _ as *mut (), cx.waker());
}

&mut Poll<Result<T, JoinError>> 强转成 *mut ()(类型擦除)、传给 vtable 里的 try_read_output。vtable 那头的函数在编译期就是针对具体 T 实例化过的——它知道怎么把 Task 的 stage(Cell<Stage<T>>)里的值写回这个指针。这就是 Rust 的”monomorphization + vtable”组合拳:在 JoinHandle 这层做零泛型穿透、在 Task 创建时把类型信息固化到 vtable——编译期 + 运行期各自付出最小代价

这个技巧在整个 Tokio 里反复出现:凡是需要”类型擦除的句柄”的地方(BoxFutureAbortHandleJoinHandle)——都用 vtable + *mut () 搞定。

abort 和 is_finished

abortis_finished 是 JoinHandle 除 poll 外最常用的两个方法。这两个方法让 JoinHandle 不仅是”等结果的通道”、还是”控制任务的遥控器”——你能在不 await 的前提下主动介入任务的生命周期abort 是”请求取消”——它不保证任务立刻停、只保证任务在下次 poll 时得到 Cancelled 状态、协作式地退出。is_finished非阻塞的快照查询——用于”任务是否已经完成”这种判断、不需要 await。这两个方法组合起来能写出很多精妙模式:**“查状态 + 按需取消的健康检查逻辑、超时兜底取消”**的 graceful timeout 等。

pub fn abort(&self) {
    self.raw.remote_abort();
}
pub fn is_finished(&self) -> bool {
    let state = self.raw.header().state.load();
    state.is_complete()
}

两个方法、三行代码。abort 通过 vtable 的 remote_abort——它做的事在第 6 章已经讲过:CAS 地给 state 置 CANCELLED 位、如果 Task 还在 run queue 就把它 pop 出来——不等 Task 当前 poll 结束、下一次 schedule 时直接按 cancelled 分支走。is_finished 只读 state 的 COMPLETE 位。

JoinHandle 的哲学不持有数据、只持有一个指针 + 一套规定的操作。所有重活在 Task Header / Vtable 那边。这也是为什么 Clone for JoinHandle 不存在——output 只能被读一次try_read_output 内部用 CAS 保证),如果允许 Clone 就得多维护一份读状态、得不偿失。真要共享,用 tokio::sync::oneshot 手写一个广播层。

15.2 AbortHandle:只要权利、不要责任

AbortHandle 是 Tokio 0.3 之后新增的 API——一个 Task 的”半截”句柄:能 abort 但不能 await 结果。为什么要这个东西?考虑这个场景:你 spawn 了 100 个后台任务、想在某个事件触发时一次性全部取消——但你并不关心每个任务的返回值。如果用 JoinHandle、你必须为每个任务都持有它、浪费内存;用 AbortHandle 只需要 16 字节(无 Output 类型参数)、内存占用减半。这种”给用户只需要的能力、不强加额外负担”是成熟 API 设计的典范。

和 JoinHandle 并肩的,是 tokio/src/runtime/task/abort.rs 里的 AbortHandle:

pub struct AbortHandle {
    raw: RawTask,
}

比 JoinHandle 还简洁——连 PhantomData 都没有,因为它不关心 output 类型

  • abort():同 JoinHandle,走 vtable remote_abort
  • is_finished():读 state;
  • id()(unstable):从 Header 读 Task ID;
  • impl Clone:可以!因为 abort 语义是幂等的——abort 一个已经 abort 或已经 complete 的 Task 是 no-op。

和 JoinHandle 的不对称

AbortHandle 和 JoinHandle 有刻意的不对称——前者没有 poll、不能 await、不能读结果。这种”减法设计”让 AbortHandle 的 API 表面极小、不会给用户错误的 mental model。如果 AbortHandle 能 poll、用户可能误以为它和 JoinHandle 等价、在不需要结果的场景上也用上。给用户最小必要的接口、不给多余的能力——这种克制是 API 设计成熟的标志。

这里有一个设计决策的精华

维度JoinHandleAbortHandle
能读 output
能 abort
能 Clone
Drop 影响 Task❌(detach)❌(什么都不做)
存在多个?不可以任意多个

为什么 JoinHandle 不能 Clone 但 AbortHandle 可以?根本原因在于”读 output”是一次性的——Task 的 stage 里的 T 一旦被读走、原始位置就是空的。如果两份 JoinHandle 同时 await,必须引入”哪个先到谁读”的协调机制、代码就复杂了;而 abort 是”设置一个 flag”——完全可以幂等 + 并发安全

Drop 语义

JoinHandle 被 drop 时会怎样?这是一个常常被忽视但极其重要的问题——也是很多团队在生产里踩过的坑。答案是:drop JoinHandle 并不会取消 Task——Task 继续在后台跑。这个行为和很多人的直觉相反、但它其实是 Tokio 最合理的选择——任务一旦独立地 spawn、应该有独立的生命周期、不被句柄的生命周期捆绑。否则你每次 spawn 都得小心翼翼地把 JoinHandle 保留到任务结束——违背”spawn 就忘了”这种习惯用法。这是 Tokio 和 Go goroutine 的共通哲学——任务一旦 spawn、就有自己的生命、不依赖句柄。这个语义对某些用户来说反直觉:他们以为 drop JoinHandle 就像 drop Future 一样会取消——但这是两个不同概念(Future 的 drop 确实取消、JoinHandle 是指向已 spawn Task 的独立句柄、drop 它只是放弃读 Task 结果的能力)。

JoinHandle::drop——Task 被 detach:Task 继续在 runtime 里跑、output 被 drop 到”无人接收”的虚空中。这和 std::thread::JoinHandle 完全相反(后者 drop 会 join、阻塞当前线程)。Tokio 这么设计的理由:Task 是轻量的、detach 成本低、而”drop 时阻塞”的语义在 async 环境下会引发死锁(await 当前 Task 等另一个 Task……)。

AbortHandle::drop——什么都不做。丢掉 abort 权利而已、Task 生命周期不受影响。

15.3 JoinSet:一组 Task 的事件循环

JoinSet 是 Tokio 0.3 之后推出的爆款 API——几乎所有 “并发执行 N 个独立任务、逐个处理结果”的场景都用它。在 JoinSet 出现前、这类场景要用 FuturesUnordered + 手工 spawn + 手工 manage——代码量大、还容易 leak task。JoinSet 把这些全部封装成一个简单的 API:spawn + join_next + `drop 自动清理”——用户写两行代码就能并发跑 N 个任务且安全清理。这是”Tokio 把高频模式工具化”的优秀范例。

现在真正的主角上场。打开 tokio/src/task/join_set.rs

pub struct JoinSet<T> {
    inner: IdleNotifiedSet<JoinHandle<T>>,
}

一个字段。所有复杂度藏在 IdleNotifiedSet 里——这是本章最值得花时间的一块。

先看 JoinSet 的对外 API(节选):

pub fn spawn<F>(&mut self, task: F) -> AbortHandle
where
    F: Future<Output = T> + Send + 'static,
    T: Send,
{ /* ... */ }

pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
    crate::future::poll_fn(|cx| self.poll_join_next(cx)).await
}

pub fn poll_join_next(&mut self, cx: &mut Context<'_>)
    -> Poll<Option<Result<T, JoinError>>>
{ /* 核心实现 */ }

pub fn abort_all(&mut self);
pub fn detach_all(&mut self);
pub async fn shutdown(&mut self);
pub async fn join_all(self) -> Vec<T>;

语义直白

  • spawn 把 Future 丢进 runtime、返回 AbortHandle;
  • join_next 等”任一” Task 完成;
  • abort_all 一键取消所有;
  • shutdown 取消 + 等待全部退出;
  • join_all任意顺序收集所有 output。

关键是 join_next 的效率——朴素做法是”轮询 N 个 Future,谁 ready 返回谁”(就像 select! 那样),O(N) 的每次 poll。JoinSet 做到了 O(活跃数)——具体如何?下一节揭秘。

15.4 IdleNotifiedSet:双链表 + Wake 回调的精妙

IdleNotifiedSet 是 JoinSet 的核心数据结构——它解决的核心问题是:“JoinSet 里有 1000 个 Task、如何高效找出”谁 Ready 了”?“。朴素做法是轮询所有 Task 的 poll——复杂度 O(N)、在大 N 时很慢。IdleNotifiedSet 用两个双链表 idle / notified:默认在 idle 链表、被 wake 时 O(1) 移到 notified 链表、join_next 时 O(1) 从 notified 链表取。这种”用链表挪动替代重复 poll”是并发集合优化的经典技巧。

类似的设计在 Linux 内核里叫 “ready list——epoll 内部也是用双链表管理:所有注册 fd 在 idle list、OS 事件到达时 fd 移到 ready list、epoll_wait 从 ready list 取就绪 fd。Tokio JoinSet 本质上是把 epoll 的这套机制搬到了 Task 层面——让”等一组 task 谁完成”变成”等一组 fd 谁可读”一样高效。这种跨层次的模式复用反映了系统编程里的一个深层道理:ready-queue 模式对”大量等待者 + 事件驱动唤醒”场景是最优的

tokio/src/util/idle_notified_set.rs 是整个 JoinSet 的心脏:

pub(crate) struct IdleNotifiedSet<T> {
    lists: Arc<Lists<T>>,
    length: usize,
}

struct ListsInner<T> {
    notified: LinkedList<T>,
    idle: LinkedList<T>,
    waker: Option<Waker>,
}

两条链表

  • idle:存”当前无事可做、在等 Waker”的 Task(包装 JoinHandle);
  • notified:存”被 wake 过、可能有进展”的 Task。

每个 entry 还带一个 my_list 字段(UnsafeCell<List>),取值 Notified / Idle / Neither(已移出)。

Wake 实现:原子搬家

原子搬家”是 IdleNotifiedSet 最精巧的机制——Task 被 wake 时、对应的 ListEntry 要从 idle 链表原子地搬到 notified 链表。这个操作必须原子——否则会出现”entry 在半路、两个链表都找不到它”的中间态。Tokio 用的技巧是:每个 entry 有一个 atomic 状态位、标记自己在哪个链表、wake 时先 CAS 状态、成功再改链表指针——这样即使并发多个 wake 到达、只会有一个成功搬家。

每个 entry 会 impl Wake——当 Task 被 wake 时:

fn wake_by_ref(me: &Arc<Self>) {
    let mut lock = me.parent.lock();
    let old_my_list = me.my_list.with_mut(|ptr| unsafe {
        let old_my_list = *ptr;
        if old_my_list == List::Idle {
            *ptr = List::Notified;
        }
        old_my_list
    });
    if old_my_list == List::Idle {
        let me = unsafe {
            lock.idle.remove(ListEntry::as_raw(me)).unwrap()
        };
        lock.notified.push_front(me);
    }
}

步骤

  1. 拿 parent 的 mutex;
  2. CAS 把 my_list 从 Idle 改成 Notified;
  3. 如果原来是 Idle:从 idle 链表 O(1) 摘除、push 到 notified 链表头

为什么要在 wake 里做这个搬家?因为 JoinSet 的 poll_join_next 一会儿要只看 notified 链表——不用遍历所有 Task、只看被 wake 过的那批。原本 N 个 Task 的 O(N) 扫描,降到 O(notified 数)——而大多数时候 notified 只有几个(比如 100 个网络请求、每次批量回来 2-3 个)。

poll_join_next 的核心

poll_join_next 是 JoinSet 的核心 poll 实现——它把”这个 JoinSet 里有没有完成的 task”这个问题转换成简单的操作:查 notified 链表头、有就取出、没就登记自己的 Waker 返回 Pending。O(1) 复杂度——不管 JoinSet 里有 10 个还是 10000 个 task。这种”复杂场景被数据结构抽象到 O(1)“是好工程的标志。

JoinSet::poll_join_next 大致是:

fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<...>> {
    // 1. 存 waker,供 wake 时通知
    self.inner.set_waker_if_notified_empty(cx.waker());
    // 2. 从 notified 链表取一个
    loop {
        let entry = self.inner.pop_notified();
        match entry {
            None => return if self.inner.is_empty() {
                Poll::Ready(None)  // 所有 task 都完成
            } else {
                Poll::Pending  // 还有 idle task 等
            },
            Some(mut entry) => {
                // 3. poll 这一个 Task 的 JoinHandle
                let mut handle = entry.get_mut();
                match Pin::new(&mut handle).poll(cx) {
                    Poll::Ready(r) => {
                        entry.remove();  // 完成了,从 set 里去掉
                        return Poll::Ready(Some(r));
                    }
                    Poll::Pending => {
                        // 没好,放回 idle 等下次 wake
                        entry.move_to_idle();
                    }
                }
            }
        }
    }
}

三件事

  1. 存 waker(供 Wake 实现从 ListsInner 里唤醒);
  2. 从 notified 链表捞一个候选;
  3. 真的 poll 它的 JoinHandle——Ready 就返回,Pending 就送回 idle。

这是一个典型的”reactor pattern”:不主动轮询、只处理通知过的事件源。和 epoll/kqueue 的哲学一脉相承——第 8 章 I/O Driver 讲过。

15.5 ListEntry:一个 entry 如何同时是 Task 的 Waker

ListEntry 是 IdleNotifiedSet 的节点——它有一个令人意外的双重身份既是链表节点,又是 Task 的 Waker。这种”一个对象承担两种职责”的设计让 wake 操作变得极其高效:当一个 Task 被触发 wake、Waker 的 wake 实现直接把 ListEntry 从 idle 链表挪到 notified 链表、而不是通知一个单独的”事件队列。这种”数据结构节点直接内嵌唤醒能力”是并发编程的高阶模式——值得在自己写高性能并发库时参照。

再往深里想一步——ListEntry 的双重身份让 JoinSet 的 wake 路径比朴素实现少一次 allocation:朴素做法要创建一个”通知”对象、投递到 JoinSet 的事件队列Tokio 做法直接复用 ListEntry 本身。每次 wake 省一次 allocation、乘以大 N task 数量、累积起来的 GC 压力差距明显。这种”不多不少、恰好够用”的设计哲学在 Tokio 源码里随处可见。

ListEntry 是 IdleNotifiedSet 的节点——它有一个令人意外的双重身份既是链表节点,又是 Task 的 Waker。这种”一个对象承担两种职责”的设计让 wake 操作变得极其高效:当一个 Task 被触发 wake、Waker 的 wake 实现直接把 ListEntry 从 idle 链表挪到 notified 链表、而不是通知一个单独的”事件队列。这种”数据结构节点直接内嵌唤醒能力”是并发编程的高阶模式——值得在自己写高性能并发库时参照。

这里有个极精妙的细节:每个 entry 既是 JoinHandle 的容器、也是那个 Task 的 Waker

为什么?考虑 Task 的生命周期:

  • Task 本身由 runtime 调度、会生成 output;
  • Task 完成时会通过 JoinHandle 内部存的 waker 通知等待方;
  • 在 JoinSet 里,等待方就是 poll_join_next

JoinSet 巧妙地:把每个 entry 自己作为 waker 传给 JoinHandle。Task 完成时调这个 waker → entry 的 wake_by_ref 触发 → entry 从 idle 链表搬到 notified 链表 → poll_join_next 下次被唤醒时直接看到它。

没有额外的 HashMap<TaskId, Waker>没有全局通知管道所有协调都靠 Arc<ListEntry> 本身的 Wake impl——这是 Rust 类型系统 + 侵入式链表的组合威力。

15.6 abort_all vs detach_all vs shutdown

JoinSet 提供三种”一次性处理所有任务”的 API——每一种对应不同的业务意图:abort_all(取消所有任务、等它们完成 drop)、detach_all(放弃所有任务、让它们自己跑)、shutdown(取消所有 + 等完成 + 消费返回值)。理解三者差异对写正确的”优雅关闭”逻辑至关重要——真实项目里这三个选错一个、轻则资源泄漏、重则数据损坏。

三个方法看着像、语义却截然不同:

  • abort_all():遍历所有 entry、对每个 raw.remote_abort()——标记取消但不等。Task 下次 schedule 时沿 cancelled 分支退出。之后 join_next() 会收到 Err(JoinError::cancelled(...))
  • detach_all()清空 JoinSet 但不 abort——Task 继续在 runtime 里跑、output 被 drop。用于”我不管了、让它们自生自灭”;
  • shutdown().await:= abort_all() + 循环 join_next().await 直到 None。这是唯一的”保证 Task 真的退出”的方法,其他两个都是”fire and forget”。

生产常见错误:graceful shutdown 里只调了 abort_all() 然后 drop JoinSet——有些 Task 可能还没真的执行到 cancelled 分支、但 JoinSet 已经没了、再也没人能观察它们——它们会占着 runtime 的 worker 直到 natural 退出。正确做法是用 shutdown().await

15.7 和 select! 的组合:最常见的 event loop

JoinSet + select! 是 Tokio 实战里最高频出现的组合——几乎每个长期运行的服务都有这个模式。模板如下:主循环 select! { 来源事件、JoinSet.join_next 处理完成任务、shutdown signal }——新任务从来源进、完成任务从 JoinSet 出、shutdown 时 abort all。读懂这个模板你就能写出生产级的 Tokio 服务骨架。

let mut set = JoinSet::new();
for url in urls {
    set.spawn(fetch(url));
}

loop {
    tokio::select! {
        Some(result) = set.join_next() => {
            process(result);
        }
        _ = shutdown.changed() => {
            set.shutdown().await;
            break;
        }
    }
}

这是一段教科书级的”并发 + 可取消 + 优雅关停”代码

  • JoinSet 负责”先完成先处理”——避免 HOL blocking;
  • select! 把关停信号和 Task 事件并置——取消触发后 shutdown().await 保证所有 Task 真退出
  • Some(result) = set.join_next() 的模式匹配——当 set 为空join_next 返回 None)时分支 disabled、不会死循环 panic。

这个 pattern 在 hyper、axum、tower 的源码里随处可见。

15.8 AbortHandle 的惯用法:按 key 取消

AbortHandle 的最强用法是组成 HashMap<Key, AbortHandle>——这样你就能”按 key 查找任务并取消”。真实场景:按 user_id 管理用户的 WebSocket 心跳任务、用户断开时按 user_id 取消对应任务;按 job_id 管理后台作业、用户请求时按 job_id 取消正在跑的作业。没有 AbortHandle、你要么无法取消、要么得自己造复杂的”任务 id 映射”基础设施。这种 “给你一个小句柄、你能拼出很多花样” 是优秀 API 设计的标志。

再说一个进阶用法AbortHandle + CancellationToken 组合——CancellationToken 来自 tokio-util、提供树形取消语义(父取消触发所有子)。把 AbortHandle 作为 CancellationToken 的一个”叶子”、你就能构建出层级化的任务取消树——大 service 取消时所有子任务、子任务的子任务 … 全部级联取消。这种模式在大型长连接服务里非常有用——一次根取消、清理整棵子树的所有资源

AbortHandle 的最强用法是组成 HashMap<Key, AbortHandle>——这样你就能”按 key 查找任务并取消”。真实场景:按 user_id 管理用户的 WebSocket 心跳任务、用户断开时按 user_id 取消对应任务;按 job_id 管理后台作业、用户请求时按 job_id 取消正在跑的作业。没有 AbortHandle、你要么无法取消、要么得自己造复杂的”任务 id 映射”基础设施。这种 “给你一个小句柄、你能拼出很多花样” 是优秀 API 设计的标志。

JoinHandle 不能 Clone、JoinSet 只能按完成顺序处理——那”按业务 key 取消特定 Task”怎么做?AbortHandle 上场

let mut abort_handles: HashMap<UserId, AbortHandle> = HashMap::new();

// 启动某用户的 Task
let h = tokio::spawn(long_running(user_id));
abort_handles.insert(user_id, h.abort_handle());

// 某时刻取消
if let Some(h) = abort_handles.remove(&user_id) {
    h.abort();
}

JoinHandle 可以通过 abort_handle() 生成一个 AbortHandle——两者不共享状态、各自独立。JoinHandle 留给原 spawn 点 await output,AbortHandle 扔到 HashMap 里做远程杀手。权责分离的完美体现

15.9 真实源码的几个”读代码陷阱”

读 JoinSet 和 IdleNotifiedSet 源码时你会遇到几个容易让人困惑的地方——不是代码错了、是 Rust 的某些高级特性让代码看起来比实际做的更复杂。本节把这些陷阱列出来——读完你看这块代码时不会在某个 unsafe 块前停下来怀疑自己理解能力。

陷阱 1:PhantomData 的多种用途——JoinSet 里的 _marker: PhantomData<T> 不是真的 “幽灵数据”、它是告诉编译器 “JoinSet 逻辑上持有 T 类型的值”(即使物理上只有指针)。这个标记影响 Drop 检查和 Send/Sync 自动派生。不懂 PhantomData 的人看到它会困惑——它是 Rust 类型系统里一个”小而深”的机制。

陷阱 2:unsafe 块里的 invariant 文档——JoinSet 有大量 unsafe 代码、每个 unsafe 块前都有注释说明 “为什么这里 unsafe 是正确的”。读这些注释比读代码本身更有价值——它们把隐性的内存安全 invariant 说清楚了。没有这些注释、代码看起来就像”凭什么你敢 unsafe?“——有了它、你能跟着作者的思路走一遍 reasoning。

陷阱 3:链表操作的 Pin 约束——IdleNotifiedSet 的双链表节点需要 Pin(因为链表指针不能随节点 move)——但 Pin 的操作 boilerplate 让代码看起来比实际做的事多得多。读的时候忽略 Pin 的语法噪音、关注底下的链表操作语义——实际逻辑通常是朴素的”remove / insert head / move to tail”。

读 join_set.rs 的源码,有几个容易绕晕的地方先打预防针:

陷阱 1:idle 和 notified 都是 unsafe 侵入式链表

侵入式链表(intrusive linked list)是系统编程里的一个经典模式——链表节点不是独立 struct、而是嵌入在被链接的对象里。Linux 内核的 struct list_head、FreeBSD 的 queue.h 都是这个模式。优势是零额外分配——节点和对象共用内存;代价是必须用 unsafe 维护指针一致性。Tokio 的 IdleNotifiedSet 用的就是侵入式链表——读源码时看到 unsafe { ... } 不要慌、这是侵入式链表的标配。

LinkedList<T> 在 Tokio 里是侵入式链表(entry 自带 prev/next 指针)——不是 std 的那个。理由在第 6 章讲过:零堆分配、O(1) 移除指定节点(stdLinkedList 需要 *mut Node 才能 O(1) 删)。代价是所有链表操作都 unsafe——Tokio 在这块的 safety invariants 有 200 行 comment 在源码里。

陷阱 2:entry.move_to_idle 不是移动内存、是改链表指针

move”在这里是逻辑上的搬家——不是物理上把 entry 字节从一处复制到另一处。entry 在内存里位置不变、只是修改它的 prev/next 指针让它属于另一个链表。很多读者第一次看到 move_to_idle 会以为它涉及内存移动、然后被 Pin 约束搞糊涂——实际上 Pin 已经保证了 entry 的物理内存地址稳定、所有”move”都是指针重链。

entry 本身是 Arc<ListEntry<T>>——它的堆上位置永远不变。move_to_idle 只是:从当前链表 unlink、重新 link 到 idle 链表。Arc 保证多方引用(wake 回调、JoinSet 本身、可能的外部 AbortHandle)共享同一块内存

陷阱 3:JoinSet 自己不是 Future

JoinSet 本身不是 Future——它是一个容器、你对容器调用 join_next() 才得到一个 Future。这看起来微妙、但对正确使用非常重要:你不能直接 joinset.await、也不能把 JoinSet 放进 select! ——你要 joinset.join_next().await 或放 joinset.join_next() 进 select!。这个区别本章的代码里反复强调——不要搞混。

你不能 set.await——JoinSet 没有 impl Future。要用 set.join_next().await——每次 await 只拿一个。如果要一次性等所有,用 set.join_all().await(Tokio 1.38+ 才有)或者循环 while let Some(r) = set.join_next().await {}

15.10 性能特征:IdleNotifiedSet vs 朴素 FuturesUnordered

把 JoinSet(IdleNotifiedSet)和 futures-util 的 FuturesUnordered 放一起对比是很有启发的——两者解决相同问题(等一组 Future 任一完成)、但实现和性能差异不小。FuturesUnordered 是 futures-util 的通用实现、适合任何 FutureJoinSet 是 Tokio 专用、只接受通过 tokio::spawn 产生的 Task——但它利用 Task 的特殊性(Task 自带 Waker、Scheduler、state)做了针对性优化、在大 N 场景下比 FuturesUnordered 快 2-5 倍。

futures::stream::FuturesUnordered 做同样的事——管理 N 个 Future、按完成顺序交付。对比一下:

维度JoinSet (Tokio)FuturesUnordered (futures)
底层机制IdleNotifiedSet (Tokio 定制)自己的 ready queue + linked list
Waker 开销每 entry 1 个 Arc<ListEntry>每 entry 1 个 Arc<Node> + Waker vtable
并发安全Mutex 保护 listsLockfree ready queue
是否限 Tokio是(spawn 依赖 runtime)否(纯 futures)
Task 取消abort() 原生支持得 drop 整个 stream

选型:如果你在 Tokio 里、Task 需要独立 spawn 到 runtime(比如不同 worker thread)、需要 abort——用 JoinSet。如果你只是有一堆 Future 要并发跑、不需要 spawn——FuturesUnordered 更轻(不占 runtime task 配额)。

15.10½ 一个血泪生产事故:JoinSet 里的 Task 被”看不见地”泄漏

本节讲一个真实生产事故——某团队使用 JoinSet 做后台任务调度、慢慢发现内存一直增长、两周后 OOM。排查发现原因:有个代码路径错误地 spawn 了任务但没把 JoinHandle 放进 JoinSet、Task 成为”匿名孤儿”、永远不会被 JoinSet 统计和 abort、只会在 Task 自己完成时才 drop。从此这个团队的 Tokio 代码 review 里多了一条 checklist:“所有 spawn 必须 track”——要么放 JoinSet 要么放 HashMap、永远不要 “忽略 JoinHandle”。

讲个真实故事。2023 年某次上线后,线上某个服务的 RSS 开始稳步增长——每天涨 200 MB、两周后 OOM。profiling 发现堆上积累了几十万个 Box<dyn Future>。代码审查一遍又一遍都没找到问题。

最后发现:有段代码大致是这样——

let mut set = JoinSet::new();
for batch in incoming_requests() {
    set.spawn(process(batch));
    if let Some(result) = set.try_join_next() {
        handle(result);
    }
}

看出问题了吗try_join_next()非阻塞的——只有当有 Task 已完成时才返回 Some。如果请求来得比 Task 完成得快,JoinSet 里的 Task 会越积越多。loop 退出后(比如 incoming 关闭),JoinSet 里还有一堆未处理的 Task、它们的 output 再也没人读——detach 了就丢失

修复:循环退出前一定要 while let Some(r) = set.join_next().await { handle(r); } 把剩下的清空、或者至少 set.shutdown().await

这个坑的教训:JoinSet 不是”无限容器”——它的 size 和 incoming 速率应该有明确的 backpressure 关系。否则一旦 consumer 慢于 producer,JoinSet 就成了一个无限增长的隐式队列

Tokio 1.37+ 的补救:spawn 自动施压

Tokio 1.37 里给 JoinSet 的 spawn 加了一个小改进——JoinSet 满时新 spawn 的 task 会自然被调度到本地队列、而不会立刻获得 worker。这不是解决 spawn 泄漏问题、但提供了一个软施压机制。真实生产里还是要靠 bounded channel + 明确的限流逻辑——JoinSet 不会替你做”拒绝新任务”。

Tokio 官方注意到这个问题,在 1.37+ 为 JoinSet::spawn 默认启用了 tracing——如果你的 tracing-subscriber 装上了 tokio-console一眼就能看到某个 JoinSet 的 task 数量不正常。这不是根治、但大幅缩短了发现时间。根治还是得在业务层加 set.len() 上限检查。

15.10¾ IdleNotifiedSet 为什么不用 lockfree 队列

这是一个容易被误解的设计选择——很多人第一反应是”为什么不用 lock-free queue 让 wake 完全无锁?“。但 JoinSet 场景有一个特殊约束:只有一个 consumer(调用 join_next 的那个 task)——lock-free 在多 consumer 场景下才显示出巨大优势、在单 consumer 场景下加的复杂度并不值得。JoinSet 内部用了一把轻量 Mutex + 双链表、在预期使用模式下性能已经最优。这是”根据场景选择简单 vs 极致”的工程智慧——不要追求”理论最优”、要追求”场景最合适”。

细心的读者会问:ListsInner 外面套了个 Mutex——为什么不用 Tokio 其他地方常用的 lockfree 队列(比如 concurrent_queue)?

答案是复杂度不值得。每个 Task wake 时需要原子完成”判断 + 从链表 A 移除 + 加入链表 B + 更新 my_list”——这是多步操作,lockfree 实现要么用 hazard pointer、要么用 RCU、要么双倍数据结构——代码量翻 5 倍、bug 面翻 10 倍。而 wake 本身频率有限(每 Task 每次从 Pending → Ready 才触发一次)、Mutex 又只保护链表指针(不跨 poll),竞争窗口极短(几十纳秒)——实测 Mutex 开销远小于 lockfree 带来的复杂度。

Tokio 代码里有个经验法则热路径(每次 poll 都走)必须 lockfree(如 mpsc 的发送链),温路径(wake + Task 切换)可以 Mutex(如这里),冷路径(Task 创建、shutdown)用 Mutex 甚至 RwLock 都可以。这种”按访问频率分配同步原语”的工程判断,是区分”能用”和”好用”代码的关键。

另一个有趣对比:Java 的 CompletableFuture.allOf

Java 的 CompletableFuture.allOf 和 Tokio JoinSet 解决同一个问题——“等一组异步任务完成”。但实现截然不同:Java 版本把所有 CF 的完成回调链到一个 counter 上、counter 归零触发 allOfTokio JoinSet 用 IdleNotifiedSet 的双链表机制。两者都是 O(1) 完成通知、但 Java 版本 allocate 多、Tokio 版本零 allocate(用 entry 嵌入 Task 内)。这种差别让 Tokio 在极高并发场景下有明显优势——不过对大多数应用两者都够用。

Java 的 CompletableFuture.allOf(futs) 返回一个等所有 future 完成的 future,功能和 JoinSet::join_all 类似。但 JVM 的实现走完全不同的路径——每个 CompletableFuture 内部维护一个回调链表stack 字段,CAS push),任一 future 完成时沿链表反向触发 allOf 的 counter 递减。没有双链表、没有 idle/notified 之分——因为 JVM 有 GC、CompletableFuture 不关心”被 detach 后的资源何时释放”。Tokio 做不到 GC,所以必须显式地把未完成的 Task 串在链表上、显式地 detach、显式地释放——双链表本质上是”手动 GC 的分代”。

看清这一点之后你就会发现:Rust async 运行时的所有”复杂数据结构”,绝大多数都是在补 GC 的位。理解了这个大背景,再回头看 IdleNotifiedSet、OwnedTasks、RunQueue——它们的复杂度就不再是”故意炫技”,而是在无 GC 环境下保持正确性 + 高性能的必要代价

15.10⅞ JoinSet 的一个边界:spawn 过程中 JoinSet 自己 drop 会怎样

这是 JoinSet 设计里微妙的边界条件——多线程场景下可能出现”主线程 A spawn 任务到 JoinSet 的一半、另一个线程 B 提前 drop 掉 JoinSet”。Tokio 用 weak Arc 的机制保证这种 race 不会导致 UB——但用户代码如果没理解这个机制、可能写出意外依赖”JoinSet 总活着”假设的代码。本节讲这种边界情况的正确处理。

还有个少人关注的边界:如果 set.spawn(fut) 已经把 Task 注册到 runtime、但 set.drop 在 Task 真正开始 poll 之前发生——会泄漏吗?

答案是不会,但 Task 会被 abort。drop 的路径里 JoinSet 会对每个 entry 调 remote_abort()——相当于对所有 Task 做隐式取消。这和 FuturesUnordered::drop 的语义一致,但和原生 tokio::spawn 不同(后者 drop JoinHandle 只是 detach、不 abort)。

这个差异常常让人踩坑:有人以为”set.spawn 之后立刻扔掉 set”和”tokio::spawn 之后扔掉 JoinHandle”是等价的——其实前者会把所有 Task 干掉、后者让 Task 继续跑。生产里要持续 spawn 又不想被 abort 的场景、只能用裸 tokio::spawn + 另存 JoinHandle、或者让 JoinSet 存活到 Task 自然退出。

15.11 和其他书的呼应

关于 JoinHandle 我再插一段写代码时的实用心法——这是反复实践后才能形成的直觉。

心法 1:spawn 后能否正常结束是你的责任。spawn 本身不保证任务会结束——如果任务是个死循环、它就跑到进程结束。所以每次 spawn 都要在心里问一遍:这个 task 什么时候退出?是等某个 channel 关闭?是 shutdown signal?还是业务完成? 如果你答不上来、这个 spawn 很可能就是未来的 bug 来源。

心法 2:spawn 的任务能 panic 吗?如果 panic 怎么办?——默认情况下 task panic 会让对应 JoinHandle.await 返回 Err(JoinError::panicked) 但不会 panic 主进程。如果你不 await 那个 JoinHandle、panic 就被默默吞掉了——这在生产环境里是常见的”任务悄悄死了但系统看起来在跑”现象。对策:要么 await JoinHandle 显式处理 panic、要么用 JoinSet 统一处理

心法 3:超时必须搭配取消——只用 timeout 不用 abort 是常见错误。timeout(dur, task.await) 超时后 task 还在跑、资源继续占用、可能污染 next request 的状态。正确模式:timeout 触发后必须 task.abort() 并等它真的退出——否则相当于”告诉用户超时、但后台资源还在耗”。


本章涉及的技术和本书其他章节呼应密切——这里做一次串联、让你看到它们如何组成完整的 Tokio 心智模型。JoinHandle 的 16 字节布局延续了第 6 章 Task讲的”冷热数据分离”哲学;IdleNotifiedSet 的双链表第 11 章 Time Driver 的分层时间轮共享”链表节点直接承载 wake 能力”的设计模式;JoinSet 的 abort_all + shutdown第 4 章 Runtime::Drop 的优雅关闭逻辑是同一套思路的不同层次应用。这些共通设计在 Tokio 源码里反复出现——辨识它们能让你把”一堆孤立技巧”升级成”贯穿的设计哲学”。

Vue 3 设计与实现》第 15 章讲过 Vue 的调度器用 Promise.resolve().then() 作为 microtask queue——把多个待更新的 component effect 收集到一个队列、一次 flush。JoinSet 的 idle/notified 双链表思路异曲同工——都是”收集 → 批量处理”。区别在于 Vue 的更新是同步的(浏览器单线程),JoinSet 是并发的(多 worker)。

Rust 编译器与运行时揭秘》第 9 章讲过 Arc 的 weak count 机制——JoinSet 里 Arc<ListEntry> 的多方持有(JoinSet、runtime 内部的 Waker、用户持有的 AbortHandle),如果设计不当就会循环引用泄漏。Tokio 的做法是:entry 只持有 RawTask 的”非所有权指针”——Task 的生命周期由 runtime 的 OwnedTasks 管理,entry 只是观察者。这和那本书里讲的”Arc + RawWaker 避免循环”是同一个套路。

vLLM 源码剖析里的Scheduler 把 Request 分 waiting / running / swapped 三个队列——和 IdleNotifiedSet 的 idle / notified 异曲同工。**“按状态分桶、批量推进”**是所有高性能调度器的共同语言——无论它调度的是 Future、GPU 推理 Batch、还是 OS 进程。

15.12 本章小结

三把利刃(JoinHandle / AbortHandle / JoinSet)各自解决一类特定场景:

1. JoinHandle 是”单个 Task 的完整访问——需要读 Task 的 output、查 Task 是否结束、取消 Task、监听 panic——都通过 JoinHandle。

2. AbortHandle 是”单个 Task 的精简访问——只需要 abort、不需要 output、不需要大小开销。场景:批量任务管理、按 key 管理。

3. JoinSet 是”一组 Task 的集合访问——并发跑 N 个、逐个收集结果、一次性全部取消。场景:扇出扇入、并发 I/O、后台 worker 池。

真实项目里JoinHandle 用于需要结果的独立任务;AbortHandle 用于临时任务(心跳 / 探测);JoinSet 用于扇出扇入工作流。三者的组合能覆盖几乎所有”多任务管理”场景——不需要引入第三方 executor / orchestrator。

下一章(第 16 章)讲 blocking——spawn_blocking、block_in_place、rayon 集成。真实服务里总会碰到”必须 blocking 的操作”(数据库驱动、老 C 库)、本章讲怎么把这些优雅地融入到 Tokio 异步代码里。

章末再强调三个容易忽略的细节

1. JoinHandle drop 不等于取消 task——要取消、必须显式调 handle.abort()。这个差别在”超时兜底”场景尤其重要:如果你用 select! { _ = timeout => ... _ = task.await => ... } 的模式、timeout 到达时 task 的 future 被 drop、但 task 本身继续跑。真要停 task、得先 abort() 再 drop。

2. JoinSet drop 会自动 abort 里面所有 task——这是 Tokio 对 “优雅清理” 的保证。如果你的 function 在栈上建了一个 JoinSet、function 正常返回或 panic、JoinSet drop 时会 abort 所有任务——不会发生孤儿 task 泄漏。这个保证让 JoinSet 比 “手动维护 Vec<JoinHandle>” 安全得多。

3. AbortHandle 是 “弱引用” 语义——它不阻止 Task 被 drop、只是一个 “你想 abort 就 abort” 的通道。如果底层 Task 已经完成或被其他地方 abort 了、AbortHandle.abort() 是 no-op——不会 panic、不会报错。这个健壮性让 AbortHandle 特别适合”最佳努力取消”的场景。

带走三件事:

  1. JoinHandle / AbortHandle 都是 “一个 RawTask + vtable” 的薄壳——类型擦除 + 单态化 vtable 让 16 字节承载一切,Rust”零成本抽象”的教科书级演示
  2. JoinSet 的性能关键在 IdleNotifiedSet——双链表 + Wake 回调做”只扫描被唤醒的 Task”——把 O(N) 降到 O(活跃数)。这是 reactor pattern 在 Task 层的体现
  3. 三者的权责分离——JoinHandle 读 output(不可 Clone)、AbortHandle 远程取消(可 Clone)、JoinSet 批量管理——各司其职、组合起来覆盖 99% 的 Task 管理场景

下一章进入 spawn_blockingblock_in_place——Tokio 如何优雅地把”阻塞代码”塞进 async 世界。你会看到 runtime 如何切换线程池、block_in_place 的”偷走当前 worker”骚操作、以及为什么一个 10 毫秒的 CPU 密集任务能把整个 runtime 卡死。


延伸阅读