Appearance
第10章 Pin、Waker 与 Future:异步运行时的三大支柱
"Pin 不是一个限制自由的枷锁,而是一份让编译器能够证明你的异步代码内存安全的契约。理解了这三者的协作机制,你就掌握了 Rust 异步运行时的全部秘密。"
本章要点
- 自引用问题:async 状态机跨越 await 点后,某些状态包含指向自身其他字段的指针,移动会制造悬垂指针
- Pin<P> 是编译期加运行时的协同保证——被 Pin 的值在其生命周期内不会被移动到另一个内存地址
- Unpin 是 auto trait:绝大多数类型自动实现 Unpin,表示"即使被 Pin 也可以安全移动"
- PhantomPinned 是显式 opt-in
!Unpin的标记类型 - Pin 投影(projection):结构化 pinning 与非结构化 pinning 的选择决定了字段访问的安全性边界
- Waker 是 executor 与 Future 之间的唯一通信通道——通过 vtable 实现零分配的类型擦除回调
- RawWaker + RawWakerVTable 构成了 Waker 的底层接口,任何 executor 都可以提供自己的实现
- Future::poll 的签名
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>精确编码了上述所有约束 - 三者协作构成了一个惰性、需求驱动的异步执行模型
10.1 为什么需要 Pin:自引用问题的本质
在深入 Pin 的实现之前,我们必须首先理解它要解决的核心问题。这个问题不是凭空出现的——它是 Rust 选择"零成本异步"这条道路的必然产物。
10.1.1 async 状态机中的自引用
考虑一个简单的 async 函数:
rust
async fn process_data() -> String {
let data = vec![1, 2, 3, 4, 5];
let slice = &data[..]; // slice 引用 data 的堆内存
tokio::time::sleep(Duration::from_secs(1)).await; // 跨越 await 点
format!("processed {} items", slice.len()) // 使用 slice
}编译器会将这个 async 函数转换为一个状态机枚举(我们在第9章已经详细分析过这个过程)。关键在于:data 和 slice 都需要在 await 点之后继续使用,因此它们必须作为状态机的字段被保存下来。展开后的状态机大致如下:
rust
enum ProcessDataFuture {
// 初始状态:刚进入函数
Start,
// 等待 sleep 完成:需要保存跨 await 点的变量
WaitingSleep {
data: Vec<i32>,
slice: &[i32], // 这个引用指向 data 的堆内存
sleep_future: Sleep,
},
// 已完成
Done,
}问题来了:slice 是一个引用,它指向同一个结构体中 data 字段所拥有的堆内存。当编译器将 data 的地址编码为 slice 中的指针时,这个地址是固定的。但如果整个状态机被移动了呢?
10.1.2 移动导致悬垂指针
Rust 中"移动"的本质是字节级的内存拷贝。标准库的 pin 模块文档对此有精确的描述:
When we say a value is moved, we mean that the compiler copies, byte-for-byte, the value from one location to another.
所有 Rust 值默认都是可移动的。对于普通类型这完全没有问题,但对于包含自引用的类型,移动是灾难性的。当状态机内部的引用指向的是结构体自身的数据时,移动结构体就会产生悬垂指针:
10.1.3 为什么 Rust 不选择"移动时更新指针"
标准库的 pin 模块文档指出了两种可能的解决方案:一是让值检测到移动并更新内部指针(C++ 的移动构造函数方案),二是保证值的地址不变。Rust 的移动语义是纯粹的字节拷贝——编译器在语义移动发生时执行 memcpy,不会调用任何用户代码。这是零成本抽象的基础,不可能放弃。因此 Pin 选择了第二条路径:保证值不会被移动。
10.1.4 地址敏感类型的生命周期
Pin 模块文档描述了地址敏感类型的典型生命周期:
- 创建阶段:值可以自由移动(调用 async 函数返回 Future)
- 固定阶段:某操作使值开始依赖自己的地址不变(第一次 poll)
- 使用阶段:后续操作假设地址稳定(后续 poll 调用)
- 销毁阶段:值被 drop
关键事实:async 函数返回的 Future 在被 poll 之前可以移动。只有在第一次 poll 之后(状态机可能进入包含自引用的状态),它才需要被固定。
10.2 Pin<Ptr> 的实现机理
10.2.1 Pin 的定义:令人意外的简洁
让我们直接看 library/core/src/pin.rs 中 Pin 的定义:
rust
// library/core/src/pin.rs
#[stable(feature = "pin", since = "1.33.0")]
#[lang = "pin"]
#[fundamental]
#[repr(transparent)]
#[rustc_pub_transparent]
#[derive(Copy, Clone)]
pub struct Pin<Ptr> {
pointer: Ptr,
}Pin 的结构极其简单——它就是一个包裹了指针的新类型(newtype)。#[repr(transparent)] 保证了 Pin<Ptr> 和 Ptr 在内存中有完全相同的布局。没有额外的运行时开销,没有额外的内存占用。
Pin 的力量不在于它的数据结构,而在于它的 API 设计——通过精心控制哪些方法是 safe 的、哪些方法是 unsafe 的,Pin 构建了一个编译期的安全屏障。
10.2.2 安全构造 vs 不安全构造
Pin 提供了两种构造方式,它们的安全性边界截然不同:
rust
// library/core/src/pin.rs
// 安全构造:仅当 T: Unpin 时可用
impl<Ptr: Deref<Target: Unpin>> Pin<Ptr> {
pub const fn new(pointer: Ptr) -> Pin<Ptr> {
// SAFETY: the value pointed to is `Unpin`, and so has no requirements
// around pinning.
unsafe { Pin::new_unchecked(pointer) }
}
}
// 不安全构造:适用于所有类型,但需要调用者保证不移动
impl<Ptr: Deref> Pin<Ptr> {
pub const unsafe fn new_unchecked(pointer: Ptr) -> Pin<Ptr> {
Pin { pointer }
}
}这两个构造函数的分界线就是 Unpin trait。Pin::new 只接受指向 Unpin 类型的指针——因为 Unpin 类型本来就不在乎是否被移动,所以 Pin 对它们来说只是一个无操作的包装。
Pin::new_unchecked 则是给那些真正需要被固定的 !Unpin 类型使用的。调用者必须用 unsafe 来承诺:从这一刻起,直到值被 drop,它不会被移动到另一个内存地址。
10.2.3 Pin 如何阻止移动
Pin 阻止移动的核心策略是:不提供获取 &mut T 的安全途径(当 T: !Unpin 时)。
rust
// library/core/src/pin.rs
impl<'a, T: ?Sized> Pin<&'a mut T> {
// 安全方法:仅当 T: Unpin 时可用
pub const fn get_mut(self) -> &'a mut T
where
T: Unpin,
{
self.pointer
}
// 不安全方法:适用于所有类型
pub const unsafe fn get_unchecked_mut(self) -> &'a mut T {
self.pointer
}
}为什么获取 &mut T 是危险的?因为有了 &mut T,你就可以调用 std::mem::swap 或 std::mem::replace 将值移走:
rust
// 如果 Pin 允许安全地获取 &mut T,那么:
let mut pinned_value: Pin<&mut MyFuture> = ...;
let inner: &mut MyFuture = pinned_value.get_mut(); // 假设这是安全的
let mut other = MyFuture::new();
std::mem::swap(inner, &mut other); // 灾难!值被移动了!Pin 通过将 get_mut 限制为 T: Unpin 来阻止这种操作。对于 !Unpin 类型,你只能通过 unsafe 的 get_unchecked_mut 来获取可变引用,此时你必须自己保证不会移动值。
Pin 提供的安全方法只允许获取 Pin<&mut T>(通过 as_mut)或 Pin<&T>(通过 as_ref),这些包装过的引用同样不暴露裸的 &mut T。
10.2.4 Pin 的安全保证链
10.3 Unpin 与 PhantomPinned:opt-out 与 opt-in
10.3.1 Unpin:auto trait 的优雅设计
rust
// library/core/src/marker.rs
#[lang = "unpin"]
pub auto trait Unpin {}Unpin 是一个 auto trait——编译器会自动为所有字段都是 Unpin 的类型实现它。这意味着绝大多数类型(i32、String、Vec<T>、HashMap<K,V> 等)都自动实现了 Unpin。
这个设计决策的含义深远:只有少数特殊类型才需要关心 Pin 的语义。对于普通的 Rust 开发者来说,Pin 几乎是透明的——你日常使用的类型都是 Unpin 的,Pin 对它们没有任何实质约束。
那么哪些类型是 !Unpin 的呢?最重要的就是编译器为 async fn 生成的状态机类型。编译器知道这些类型可能包含自引用,因此它不会为它们实现 Unpin。
10.3.2 PhantomPinned:显式声明"我不可移动"
如果你在编写自己的地址敏感类型(比如侵入式链表),你需要一种方式告诉编译器:"我的类型不应该实现 Unpin。"这就是 PhantomPinned 的用途:
rust
// library/core/src/marker.rs
/// A marker type which does not implement `Unpin`.
///
/// If a type contains a `PhantomPinned`, it will not implement `Unpin` by default.
#[stable(feature = "pin", since = "1.33.0")]
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct PhantomPinned;
#[stable(feature = "pin", since = "1.33.0")]
impl !Unpin for PhantomPinned {}PhantomPinned 通过负向 impl !Unpin 来"感染"包含它的类型。由于 Unpin 是 auto trait,一个结构体只有当所有字段都是 Unpin 时才自动实现 Unpin。加入一个 PhantomPinned 字段就能打破这个条件:
rust
use std::marker::PhantomPinned;
struct SelfReferential {
data: String,
ptr_to_data: *const String,
_pin: PhantomPinned, // 使 SelfReferential: !Unpin
}
// 现在 Pin::new(&mut self_ref) 不再编译
// 必须使用 unsafe 的 Pin::new_unchecked10.3.3 Unpin 的语义总结
| 类型 | Unpin? | Pin 的效果 |
|---|---|---|
i32, String, Vec<T> | 自动 Unpin | Pin 是空操作 |
async fn 生成的 Future | !Unpin | Pin 阻止移动 |
包含 PhantomPinned 的类型 | !Unpin | Pin 阻止移动 |
Box<T> | 自动 Unpin | Pin<Box<T>> 固定的是堆上的 T |
注意 Box<T> 的微妙之处:Box 本身总是 Unpin(移动 Box 只是移动指针),但 Pin<Box<T>> 保证堆上的 T 不会被移动。
10.4 Pin 投影:访问被固定结构体的字段
当你有一个 Pin<&mut Struct>,你如何访问 Struct 的各个字段?这就是 Pin 投影(Pin projection)问题。标准库的 pin 模块文档对此有详细的讨论。
10.4.1 结构化 pinning vs 非结构化 pinning
对于 Pin<&mut Struct> 中的每个字段,你有两种选择:
选择一:结构化 pinning(pinning 向下传播)
如果字段本身也是地址敏感的,你应该让 pinning 传播到该字段:
rust
// library/core/src/pin.rs 中的文档示例
impl Struct {
fn field(self: Pin<&mut Self>) -> Pin<&mut Field> {
// 使用 map_unchecked_mut 进行结构化投影
unsafe { self.map_unchecked_mut(|s| &mut s.field) }
}
}map_unchecked_mut 的源码揭示了投影的机制:
rust
// library/core/src/pin.rs
pub unsafe fn map_unchecked_mut<U, F>(self, func: F) -> Pin<&'a mut U>
where
U: ?Sized,
F: FnOnce(&mut T) -> &mut U,
{
// SAFETY: the caller is responsible for not moving the
// value out of this reference.
let pointer = unsafe { Pin::get_unchecked_mut(self) };
let new_pointer = func(pointer);
// SAFETY: as the value of `this` is guaranteed to not have
// been moved out, this call to `new_unchecked` is safe.
unsafe { Pin::new_unchecked(new_pointer) }
}选择二:非结构化 pinning(pinning 不传播)
如果字段不是地址敏感的,你可以返回普通的 &mut Field:
rust
// library/core/src/pin.rs 中的文档示例
impl Struct {
fn field(self: Pin<&mut Self>) -> &mut Field {
// 不传播 pinning,直接获取可变引用
unsafe { &mut self.get_unchecked_mut().field }
}
}10.4.2 结构化 pinning 的约束与实践
选择结构化 pinning 带来三个约束:(1) 结构体只有在所有结构化 pin 字段都 Unpin 时才能实现 Unpin;(2) drop 时结构化 pin 字段的内存不能在调用其析构函数前被重用;(3) 不能使用 #[repr(packed)]。
在实际项目中,手写 pin 投影容易出错。社区广泛使用 pin-project-lite crate 来自动生成安全的投影代码:
rust
use pin_project_lite::pin_project;
pin_project! {
struct MyFuture {
#[pin] // 结构化 pinning:投影为 Pin<&mut InnerFuture>
inner: InnerFuture,
counter: u32, // 非结构化:投影为 &mut u32
}
}
impl Future for MyFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let this = self.project();
match this.inner.poll(cx) {
Poll::Pending => { *this.counter += 1; Poll::Pending }
Poll::Ready(()) => Poll::Ready(()),
}
}
}10.5 Waker:executor 与 Future 之间的通信通道
Pin 解决了"值不能被移动"的问题。现在我们转向另一个核心问题:当一个 Future 还没准备好时,executor 如何知道什么时候该再次 poll 它?
10.5.1 惰性执行模型的需求
Rust 的 Future 是惰性的——创建一个 Future 不会执行任何操作,只有 poll 它才会推进。这引出了一个关键的调度问题:
- 如果 executor 不停地 poll 所有 Future(忙等待),那 CPU 利用率会极低
- 如果 executor 等到有事件发生再 poll,它怎么知道该 poll 哪个 Future?
答案就是 Waker:Future 在返回 Poll::Pending 时保存一个 Waker,当它准备好被再次 poll 时(比如 IO 就绪、定时器到期),通过 Waker 通知 executor。
10.5.2 Waker 的结构:类型擦除的回调
直接看 library/core/src/task/wake.rs:
rust
// library/core/src/task/wake.rs
#[repr(transparent)]
pub struct Waker { waker: RawWaker }
impl Unpin for Waker {}
unsafe impl Send for Waker {} // 可跨线程传递
unsafe impl Sync for Waker {} // 可跨线程共享
pub struct RawWaker {
data: *const (), // 类型擦除的数据指针
vtable: &'static RawWakerVTable, // 行为定制表
}data 是类型擦除的指针(*const ()),vtable 包含四个函数指针。这与 trait object 的 vtable 不同——它是手工构造的独立 vtable,可以实现零分配。Waker 是 Send + Sync 的,IO 线程可以唤醒工作线程上的 task。
10.5.3 RawWakerVTable:四个函数指针
rust
// library/core/src/task/wake.rs
pub struct RawWakerVTable {
clone: unsafe fn(*const ()) -> RawWaker, // clone 时增加引用计数
wake: unsafe fn(*const ()), // 唤醒 task,消费资源
wake_by_ref: unsafe fn(*const ()), // 唤醒 task,不消费
drop: unsafe fn(*const ()), // 释放资源
}10.5.4 Waker 的方法实现
rust
// library/core/src/task/wake.rs
impl Waker {
pub fn wake(self) {
// 用 ManuallyDrop 避免 wake 后再调 drop(wake 已包含资源释放)
let this = ManuallyDrop::new(self);
unsafe { (this.waker.vtable.wake)(this.waker.data) };
}
pub fn wake_by_ref(&self) {
// 不消费 Waker,正常生命周期结束时通过 vtable.drop 释放
unsafe { (self.waker.vtable.wake_by_ref)(self.waker.data) }
}
}wake(self) 消费 Waker 并使用 ManuallyDrop 避免双重释放——wake 函数的语义已包含资源释放(如减少 Arc 引用计数)。wake_by_ref(&self) 不消费,Waker 在 drop 时正常释放。
10.5.5 Waker 的 vtable 机制图解
10.5.6 为什么不用 trait object?
为什么不直接用 Box<dyn Wake> 而要手工构造 vtable?三个原因:(1) 零分配——RawWaker 的 data 可以是任意指针(Arc、栈指针、甚至 null);(2) 灵活性——标准库的 NOOP waker 展示了这一点:
rust
// library/core/src/task/wake.rs
impl RawWaker {
const NOOP: RawWaker = {
const VTABLE: RawWakerVTable = RawWakerVTable::new(
|_| RawWaker::NOOP, |_| {}, |_| {}, |_| {},
);
RawWaker::new(ptr::null(), &VTABLE)
};
}NOOP waker 的 data 是 null,所有函数都是空操作,编译期完全确定,零运行时分配。(3) no_std 兼容——Box 需要 alloc,但 Waker 定义在 core 中。
10.5.7 Wake trait:安全的高层抽象
对于不需要极致性能的场景,alloc crate 提供了 Wake trait 作为安全替代:
rust
// 概念代码
pub trait Wake {
fn wake(self: Arc<Self>);
fn wake_by_ref(self: &Arc<Self>) { self.clone().wake(); }
}
// 使用示例——不需要手工构造 vtable
struct MyWaker { task_id: usize, ready_queue: Sender<usize> }
impl Wake for MyWaker {
fn wake(self: Arc<Self>) {
let _ = self.ready_queue.send(self.task_id);
}
}
let waker = Waker::from(Arc::new(MyWaker { task_id: 42, ready_queue: tx }));Wake trait 要求 Arc(堆分配),但完全安全——无需 unsafe 代码。
10.6 Future trait:poll 协议
10.6.1 Future 的定义
rust
// library/core/src/future/future.rs
#[doc(notable_trait)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[stable(feature = "futures_api", since = "1.36.0")]
#[lang = "future_trait"]
pub trait Future {
/// The type of value produced on completion.
type Output;
/// Attempts to resolve the future to a final value, registering
/// the current task for wakeup if the value is not yet available.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}这个签名中的每个元素都经过了精心设计。让我们逐一分析。
10.6.2 为什么是 Pin<&mut Self> 而不是 &mut Self
如果 poll 的签名是 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Self::Output>,那么 executor 就可以在两次 poll 之间移动 Future。对于普通类型这没有问题,但对于 async 状态机,第一次 poll 可能使状态机进入包含自引用的状态,此后移动就会导致悬垂指针。
Pin<&mut Self> 要求 executor 在 poll Future 之前先将它固定(pin)。这意味着:
- Future 在创建后可以自由移动(比如从 async 函数返回、在集合间传递)
- 一旦被 pin 并开始 poll,Future 就不能再被移动
- 后续的每次 poll 都通过
Pin<&mut Self>进行,保证了地址稳定性
10.6.3 Context:Waker 的载体
rust
// library/core/src/task/wake.rs
pub struct Context<'a> {
waker: &'a Waker,
local_waker: &'a LocalWaker, // 不稳定特性
ext: AssertUnwindSafe<ExtData<'a>>, // 不稳定特性
_marker: PhantomData<fn(&'a ()) -> &'a ()>, // 保证生命周期不变性
_marker2: PhantomData<*mut ()>, // !Send + !Sync
}Context 的核心职责是携带 Waker 引用。为什么不直接传 &Waker?可扩展性——Context 是结构体,未来可以添加字段而不破坏 API(local_waker 和 ext 就是后来添加的)。Context 是 !Send + !Sync,它是某次 poll 调用的局部上下文。
10.6.4 Poll 枚举
rust
// library/core/src/task/poll.rs
#[must_use = "this `Poll` may be a `Pending` variant, which should be handled"]
pub enum Poll<T> {
Ready(T), // 值已就绪
Pending, // 未就绪——必须确保 task 会被唤醒
}关键约束:返回 Pending 时,Future 必须确保 task 会在将来某个时刻被唤醒(通过之前保存的 Waker)。否则 task 永远不会被再次 poll——"遗忘唤醒"(lost wakeup)。
10.6.5 Future 的 blanket implementations
标准库提供了两个重要的 blanket impl:(1) &mut F 对 F: Future + Unpin 实现了 Future,让你可以通过可变引用 poll;(2) Pin<P> 对 P: DerefMut<Target: Future> 实现了 Future,使 Pin<Box<dyn Future>> 本身也是 Future——这是异步运行时中最常用的模式。
10.7 三者协作:完整的异步执行追踪
现在让我们将 Pin、Waker 和 Future 组合在一起,追踪一个完整的异步操作从创建到完成的全过程。
10.7.1 一个具体的例子
rust
async fn fetch_and_process(url: &str) -> Result<String, Error> {
let response = http_get(url).await; // 第一个 await 点
let body = response.text().await; // 第二个 await 点
Ok(body.to_uppercase())
}10.7.2 执行追踪时序图
10.7.3 三者各自的角色
Pin:(1) Box::pin(future) 在创建时固定 Future;(2) 每次 poll 通过 as_mut() 获取 Pin<&mut dyn Future>;(3) 保证自引用状态机在 poll 间不被移动。
Waker:(1) Executor 为每个 Task 创建关联的 Waker;(2) 叶子 Future 在返回 Pending 前保存 Waker;(3) IO 就绪时 Reactor 调用 waker.wake() 入队;(4) 每次 poll 应更新保存的 Waker。
10.8 从零构建一个 Mini Executor
理论分析之后,让我们动手实现一个教学用的 mini executor。这个 executor 虽然简单,但完整地展示了 Pin、Waker 和 Future 三者的协作机制。
10.8.1 整体架构
10.8.2 完整实现
rust
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
struct Task {
future: Pin<Box<dyn Future<Output = ()> + Send>>,
task_id: usize,
}
struct MiniExecutor {
tasks: HashMap<usize, Task>,
ready_queue: Arc<Mutex<VecDeque<usize>>>,
next_id: usize,
}
impl MiniExecutor {
fn new() -> Self {
MiniExecutor {
tasks: HashMap::new(),
ready_queue: Arc::new(Mutex::new(VecDeque::new())),
next_id: 0,
}
}
fn spawn(&mut self, future: impl Future<Output = ()> + Send + 'static) {
let task_id = self.next_id;
self.next_id += 1;
// 关键:Box::pin 将 Future 固定在堆上,地址从此不变
let task = Task { future: Box::pin(future), task_id };
self.tasks.insert(task_id, task);
self.ready_queue.lock().unwrap().push_back(task_id);
}
fn run(&mut self) {
loop {
if self.tasks.is_empty() { break; }
let task_id = match self.ready_queue.lock().unwrap().pop_front() {
Some(id) => id,
None => continue, // 真实 executor 会阻塞等待而非自旋
};
if let Some(task) = self.tasks.get_mut(&task_id) {
let waker = create_waker(task_id, self.ready_queue.clone());
let mut cx = Context::from_waker(&waker);
// 关键:as_mut() 获取 Pin<&mut dyn Future>,保证地址稳定
match task.future.as_mut().poll(&mut cx) {
Poll::Ready(()) => { self.tasks.remove(&task_id); }
Poll::Pending => { /* 等待 Waker 唤醒 */ }
}
}
}
}
}
// --- Waker 实现:手工构造 vtable ---
struct WakerData {
task_id: usize,
ready_queue: Arc<Mutex<VecDeque<usize>>>,
}
fn create_waker(task_id: usize, ready_queue: Arc<Mutex<VecDeque<usize>>>) -> Waker {
let data = Arc::new(WakerData { task_id, ready_queue });
let raw_waker = RawWaker::new(Arc::into_raw(data) as *const (), &VTABLE);
unsafe { Waker::from_raw(raw_waker) }
}
static VTABLE: RawWakerVTable = RawWakerVTable::new(
|data| unsafe { // clone: 增加引用计数
let arc = Arc::from_raw(data as *const WakerData);
let cloned = arc.clone();
std::mem::forget(arc);
RawWaker::new(Arc::into_raw(cloned) as *const (), &VTABLE)
},
|data| unsafe { // wake: 入队 + 释放 Arc
let arc = Arc::from_raw(data as *const WakerData);
arc.ready_queue.lock().unwrap().push_back(arc.task_id);
},
|data| unsafe { // wake_by_ref: 入队,不释放
let arc = Arc::from_raw(data as *const WakerData);
arc.ready_queue.lock().unwrap().push_back(arc.task_id);
std::mem::forget(arc);
},
|data| unsafe { // drop: 释放 Arc
let _ = Arc::from_raw(data as *const WakerData);
},
);10.8.3 使用 mini executor
rust
// 异步计时器:在另一个线程等待后通过 Waker 唤醒
struct TimerFuture {
waker_holder: Arc<Mutex<Option<Waker>>>,
}
impl TimerFuture {
fn new(duration: std::time::Duration) -> Self {
let waker_holder = Arc::new(Mutex::new(None));
let holder = waker_holder.clone();
std::thread::spawn(move || {
std::thread::sleep(duration);
if let Some(waker) = holder.lock().unwrap().take() {
waker.wake(); // 唤醒 task
}
});
TimerFuture { waker_holder }
}
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// 每次 poll 都更新 Waker(关键!)
*self.waker_holder.lock().unwrap() = Some(cx.waker().clone());
Poll::Pending // 简化:总是返回 Pending,由线程唤醒后再次 poll
}
}
fn main() {
let mut executor = MiniExecutor::new();
executor.spawn(async {
println!("task 1: start");
TimerFuture::new(Duration::from_millis(100)).await;
println!("task 1: done");
});
executor.run();
}10.8.4 mini executor 中三者的协作要点
让我们回顾这个实现中 Pin、Waker、Future 各自的角色:
Pin 的角色:
Box::pin(future)在spawn时将 Future 固定在堆上task.future.as_mut()在每次poll前获取Pin<&mut dyn Future>- 保证了 async 块生成的状态机在多次 poll 之间不会被移动
Waker 的角色:
create_waker手工构造 vtable,将 task_id 和 ready_queue 编码为data指针TimerFuture::poll中通过cx.waker().clone()保存 Waker- 定时器线程通过
waker.wake()将 task 放回就绪队列
Future 的角色:
poll方法是推进异步操作的唯一入口- 返回
Pending时必须确保 Waker 已被保存(否则 task 永远不会被唤醒) - 返回
Ready时携带最终结果,task 生命周期结束
10.9 与 Tokio Executor 的对比
我们的 mini executor 与 Tokio 这样的生产级 executor 有几个关键差异:
| 特性 | Mini Executor | Tokio |
|---|---|---|
| 任务调度 | 单线程自旋 | 多线程 work-stealing |
| IO 事件 | 用户线程 | epoll/kqueue/io_uring Reactor |
| Waker | Arc<WakerData> | 嵌入 Task header 的原子引用计数 |
| 就绪队列 | 全局 Mutex | 线程本地队列 + 全局注入队列 |
Tokio 的 Waker data 指针直接指向 task header,vtable 函数操作原子状态位——唤醒是一个 CAS + 入队操作,clone 是原子引用计数增加,整个 Waker 生命周期与 Task 绑定,无需额外堆分配。这使 Tokio 能高效管理数十万并发 task。
10.10 常见陷阱与最佳实践
10.10.1 陷阱一:忘记调用 wake
这是异步编程中最常见也最难调试的 bug:
rust
impl Future for BrokenFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.is_ready() {
Poll::Ready(())
} else {
// 错误!返回 Pending 但没有安排 Waker 调用
// 这个 Future 永远不会被再次 poll
Poll::Pending
}
}
}修复方法:确保在返回 Pending 之前,Waker 已经被保存到某个会在未来调用 wake() 的地方。
10.10.2 陷阱二:忙轮询(busy-polling)
rust
// 反模式:立即唤醒自己,导致 executor 不停 poll
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if !self.is_ready() {
cx.waker().wake_by_ref(); // 立即唤醒 → CPU 空转
return Poll::Pending;
}
Poll::Ready(())
}正确做法:只在真正有新事件到来时调用 wake()。需要让出 CPU 时用 tokio::task::yield_now().await。
10.10.3 陷阱三:在 poll 中阻塞
rust
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<u8>> {
let data = std::fs::read("large_file.dat").unwrap(); // 阻塞整个 executor 线程!
Poll::Ready(data)
}修复:使用 tokio::task::spawn_blocking 将阻塞操作移到专用线程池。
10.10.4 陷阱四:不更新 Waker
每次 poll 传入的 Waker 可能不同(executor 重新分配了调度资源),因此必须每次 poll 都更新保存的 Waker,或用 Waker::will_wake() 检查是否需要更新。
10.10.5 陷阱五:poll after Ready
rust
let result = future.as_mut().poll(&mut cx); // Poll::Ready(42)
let result2 = future.as_mut().poll(&mut cx); // 可能 panic、死循环或其他问题Future 完成后不应再次 poll。
10.10.6 陷阱六:Pin 的不安全使用
rust
struct SelfRef {
data: String,
ptr: *const String,
_pin: PhantomPinned,
}
// 正确:使用 Box::pin,堆上的值在 Box 移动时地址不变
let mut data = Box::pin(SelfRef::new("hello".into()));
data.as_mut().init();
// 错误:在栈上 unsafe pin 后移动值
// let mut data = SelfRef::new("hello".into());
// let pinned = unsafe { Pin::new_unchecked(&mut data) };
// pinned.init();
// let moved = data; // 灾难!ptr 成为悬垂指针关键原则:如果你 unsafe 创建 Pin,必须保证值在整个生命周期内不被移动。最安全的做法是使用 Box::pin。
10.11 三大支柱的设计哲学
10.11.1 零成本验证
Pin:#[repr(transparent)] 保证 Pin<&mut T> 和 &mut T 在运行时表示完全相同,所有检查在编译期完成。Waker:RawWaker 只有两个字(data + vtable 指针),等价于 C 的函数指针回调。Future:poll 是普通方法调用,编译器将 async/await 展开为状态机后,每次 poll 就是一个 match 跳转——与手写状态机等价。
10.11.2 类型系统编码安全约束
三个核心约束全部编码在 Future::poll 的签名中:
rust
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;Pin<&mut Self>→ "不能移动自引用类型"Context携带Waker→ "必须通知 executor 何时重新 poll"#[must_use]+ 需要.await或poll→ "Future 是惰性的"
Rust 是唯一一个需要在类型系统层面解决自引用问题的主流语言,因为它同时追求"无 GC"和"零成本异步"。其他语言(Go、JS、C#)通过 GC 处理引用更新,无需面对这个问题。
10.12 本章总结
本章深入剖析了 Rust 异步运行时的三大支柱。让我们用一张图总结它们的关系:
Pin 解决了"async 状态机不能被移动"的内存安全问题,通过类型系统在编译期阻止了不安全的移动操作。
Waker 解决了"executor 如何知道何时重新 poll"的调度问题,通过 vtable 实现了零分配的、executor 无关的类型擦除回调。
Future 定义了异步操作的执行协议,其 poll 签名精确地编码了上述两个保证,将三者融为一个统一的系统。
这三个抽象共同构成了 Rust 异步生态的基础。无论是 Tokio、async-std 还是 smol,所有异步运行时都建立在这三个标准库类型之上。理解了它们的实现机理和协作方式,你就掌握了阅读任何 Rust 异步运行时源码的钥匙。
下一章我们将回到编译器的另一面——闭包。闭包在编译器中被展开为匿名结构体,这个过程和 async 的状态机展开有异曲同工之妙。我们将看到 Fn、FnMut、FnOnce 三个 trait 如何在编译器中被具象化为结构体和方法调用。