Appearance
第10章 TcpStream / UdpSocket 源码剖析
"The day you can read
tokio::net::TcpStreamsource end-to-end without looking anything up, is the day you graduate." —— 笔者
本章要点
TcpStream的 struct 定义极简:TcpStream { io: PollEvented<mio::net::TcpStream> }—— 一个字段、一层包装- PollEvented 是 Tokio 对任何 I/O 资源的通用包装层:
PollEvented<T>把任意 mio 事件源T和一个Registration绑定 - Registration 是运行时侧的对接——持有 scheduler Handle +
Arc<ScheduledIo>。poll_read_ready/poll_write_ready是它的核心方法 - 三层栈:TcpStream →
PollEvented<mio::net::TcpStream>→ Registration → ScheduledIo → mio → epoll TcpStream::connect是一个 async fn,内部 DNS 解析后用connect_addr建立非阻塞连接、等 writableAsyncRead::poll_read是 try + register 模式:先mio_io.read(),WouldBlock 就注册 Waker、返回 PendingTcpListener::accept用了漂亮的async_iohelper —— 写任何自定义 async I/O 都可以复用这个 helper- UdpSocket 的设计和 TcpStream 高度同构——学会一个就懂了所有 mio-based I/O 类型
10.0½ 读本章前的状态检查
本章是本书上半场的收束。在进入之前,做一次自我检查——如果你对下面任一条还不自信,建议先回翻那一章:
- 📖 Future trait 和 Poll 语义:不熟 → 读第 2 章
- 📖 Waker 的 vtable 机制 + wake 路径:不熟 → 读第 3 章
- 📖 Runtime 和 Handle 的关系:不熟 → 读第 4 章
- 📖 multi_thread scheduler 的三条腿:不熟 → 读第 5 章
- 📖 Task 的 Cell / Header / State bit layout:不熟 → 读第 6 章
- 📖 ScheduledIo 和 poll_readiness:不熟 → 读第 8 章
- 📖 mio 三层架构和 epoll:不熟 → 读第 9 章
这 7 章是本章的必备基础。本章把它们全部拼起来——每一段代码都依赖前面多个概念的理解。如果你带着某个薄弱环节读本章,会感觉"怎么这么多东西串起来"——那不是章节的问题,是基础没建好。花 1 小时回去补,比硬读本章快。
建立好基础后,本章会出奇地顺畅——因为你会一直看到"哦,这就是我之前学的那个机制在这里的用法"。这种"终于把碎片拼起来"的顿悟是深度学习最爽的时刻,也是本书想带给你的核心体验。
10.1 TcpStream 的三层包装
打开 tokio/src/net/tcp/stream.rs,Tokio 1.40 里 TcpStream 的定义极简:
rust
// 来源:tokio-rs/tokio · tokio/src/net/tcp/stream.rs (tokio-1.40.0)
pub struct TcpStream {
io: PollEvented<mio::net::TcpStream>,
}一个字段、一层包装。从用户视角看 TcpStream 是一个能 await 的 I/O 类型;从内部看,它只是把三层依赖栈组装起来:
TcpStream ← 用户看到的 API 层
│
└─ PollEvented<mio::net::TcpStream> ← Tokio 的 I/O 包装层
│
├─ inner: mio::net::TcpStream ← mio 的同步 TcpStream(非阻塞)
└─ registration: Registration ← 和 Tokio runtime 的连接
│
├─ handle: scheduler::Handle ← Runtime Handle
└─ shared: Arc<ScheduledIo> ← 第 8 章讲的 per-fd 状态
│
└─ ... 底层 mio 的 Token 指向这个 ScheduledIo 地址4 个分层,每一层都在做一件明确的事:
- TcpStream:用户 API(connect / AsyncRead / AsyncWrite / peer_addr 等),不含任何异步 / 调度逻辑
PollEvented<T>:通用 I/O 包装,接受任何 mio 事件源 T,管理它的 registration- Registration:
ScheduledIo的生命周期拥有者 - ScheduledIo(第 8 章已拆):readiness 状态机
这 4 层的分工是 Tokio 对 I/O 类型"模板化"的关键。
"为什么不合并层"的几个诱惑与回答
你可能想:既然每层都这么薄,为什么不合并?少一层不就更简单?
诱惑 1:把 PollEvented 去掉,TcpStream 直接持有 mio::TcpStream + Registration
- 回答:这样每个 I/O 类型都要写自己的"注册 + deregister + 生命周期管理"——重复几百行样板。PollEvented 存在就是为了消除这些重复
诱惑 2:把 Registration 和 ScheduledIo 合并
- 回答:Registration 是用户侧 + 可能被用户代码直接持有(通过 PollEvented),ScheduledIo 是运行时侧 + 被 I/O Driver 内部持有。两者生命周期不一致——ScheduledIo 可能被多个引用持有、Registration 是单一所有者
诱惑 3:直接让 TcpStream 继承 mio::TcpStream
- 回答:Rust 没有继承。即使用 Deref 也会让 mio 方法直接暴露给用户——破坏抽象(用户可能直接调 mio 的阻塞 read,绕过 Tokio)
每一层都有存在的必要性。看起来冗余其实是在用分层换扩展性、换正确性、换代码复用。这种"一眼觉得多余、仔细看发现恰到好处"的设计,是成熟软件的标志。 只要你实现一个类型 T 并让它实现 mio 的 event::Source trait,你就能把它包装成 PollEvented<T>——自动获得完整的 async 能力。后面 TcpListener、UdpSocket、UnixStream、ChildStdout 全部走这一条路。
10.2 PollEvented:Tokio 的 I/O 通用包装
PollEvented 的签名(简化):
rust
pub(crate) struct PollEvented<E: Source> {
io: Option<E>,
registration: Registration,
}两个字段:
io: Option<E>—— 包装的 mio 事件源。Option的作用是支持从 PollEvented 里把E取出来(比如TcpStream::into_std)。取出后io = None,这个 PollEvented 不再能用registration: Registration—— 运行时侧的注册句柄
PollEvented 的 new 方法做两件事:
- 调用 mio 的
register把io注册到 Tokio 的 I/O Driver(第 8 章讲的add_source流程) - 拿到返回的 Registration、和 io 一起打包成 PollEvented
典型用法:
rust
// 伪代码:构造一个自定义的 async I/O 类型
impl MyAsyncType {
fn from_mio(mio_io: MyMioType) -> io::Result<Self> {
let io = PollEvented::new(mio_io)?; // 一行完成注册
Ok(MyAsyncType { io })
}
fn poll_read(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
// 委托给内部 registration + mio_io 的 read
}
}一行 PollEvented::new 就做好了:
- 分配
ScheduledIo - 把
mio_io通过mio::Registry::register注册到 Tokio 的 Poll,token 设为*const ScheduledIo - 返回 PollEvented,其中 Registration 持有
Arc<ScheduledIo>
这层包装的设计价值:任何 I/O 类型"异步化"的成本被压到几行代码。没有这层,每个 I/O 类型都要重复写注册、生命周期管理、deregister 逻辑——几十行样板代码。PollEvented 把这些压缩到一个泛型结构里,所有 I/O 类型直接复用。
PollEvented 的 Drop 行为
当 PollEvented drop 时,它会:
- 从 Tokio I/O Driver deregister(调
mio::Registry::deregister) - Drop 里层的
Arc<ScheduledIo> - Drop
mio_io(关闭 fd)
整个生命周期管理在 Drop 里一气呵成——用户代码不需要手动清理。这是 RAII 最漂亮的应用:资源获取和释放完全对称、编译期保证。
"接受任何 mio::Source" 的扩展性
PollEvented<E: Source> 的 E 约束是 mio::event::Source——任何实现了这个 trait 的类型都可以被包装。这意味着:
- Tokio 自带的
mio::net::TcpStream等当然可以 - 第三方 crate 比如
mio-serial::Serial(串口)、mio-aio::AioContext(Linux AIO)可以 - 你自己写的实现了
Source的类型也可以
从来没有人规定 PollEvented 只能装"socket 相关的东西"——任何 fd-based 或 mio 支持的事件源都行。这种**"基于 trait 的泛型约束"**让 Tokio 的 I/O 生态极易扩展。
mio 换成 io-uring 会怎样
前面提过 tokio-uring 用 io-uring 代替 mio。它的 I/O 类型架构上很像:
tokio_uring::TcpStream包装一个 io-uring-based TcpStream- 但它的中间层不是 PollEvented——因为 io-uring 是 completion 模型,不能走 readiness-based 的 PollEvented
- 内部有一个
Op<T>类型 + submission queue 管理
所以 PollEvented 只适用于 readiness 模型。如果未来 Tokio 主干加入 io-uring 支持,会和 PollEvented 并存(两种 I/O 类型,两套后端)。兼容性和灵活性比统一模型更重要。
10.3 Registration:运行时侧的对接点
打开 tokio/src/runtime/io/registration.rs。Registration 的定义:
rust
// 来源:tokio/src/runtime/io/registration.rs
pub(crate) struct Registration {
handle: scheduler::Handle,
shared: Arc<ScheduledIo>,
}两个字段:
handle:runtime Handle(指向 scheduler 和 I/O Driver)。deregister 时需要shared:指向 ScheduledIo 的共享引用
注意是 Arc<ScheduledIo>——多方共享:
- Registration 持有一份(runtime 活着时)
- Tokio I/O Driver 的
RegistrationSet持有一份(作为链表节点) - 可能还有 ref(比如临时的
poll_ready操作借用)
new_with_interest_and_handle 的真实流程
rust
// 来源:tokio/src/runtime/io/registration.rs
pub(crate) fn new_with_interest_and_handle(
io: &mut impl Source,
interest: Interest,
handle: scheduler::Handle,
) -> io::Result<Registration> {
let shared = handle.driver().io().add_source(io, interest)?;
Ok(Registration { handle, shared })
}两行代码:
handle.driver().io().add_source(io, interest)—— 调用 I/O Driver 的 add_source 方法(第 8 章讲过,内部Box::new(ScheduledIo)+mio::Registry::register)- 打包返回
所有"复杂性"都下沉到 add_source——Registration 自己几乎什么都没做。这种**"薄对接 + 下层厚逻辑"**的分层让每一层单独看都简单。
poll_read_ready / poll_write_ready
Registration 对外提供的核心方法:
rust
// 来源:tokio/src/runtime/io/registration.rs
pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
self.poll_ready(cx, Direction::Read)
}
pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
self.poll_ready(cx, Direction::Write)
}两个方法都委托给 poll_ready,后者再委托给 ScheduledIo::poll_readiness(第 8 章讲过)。这条调用链很深但每一层都很薄。
deregister
rust
// 来源:tokio/src/runtime/io/registration.rs
pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
self.handle().deregister_source(&self.shared, io)
}Drop PollEvented 时调这个,把 fd 从 mio unregister。
所有复杂性都在更深的层——handle.deregister_source 会调 mio::Registry::deregister + 从 RegistrationSet 移除 + 决定何时真的 drop ScheduledIo(可能还有别人持有 Arc)。
Arc<ScheduledIo> 为什么用 Arc 而不是 Box
这是一个微妙的设计决策。ScheduledIo 是不是单一所有者?似乎是——每个 Registration 持有一个。那为什么用 Arc<ScheduledIo> 而不是 Box<ScheduledIo>?
因为至少有两方共同需要访问 ScheduledIo:
- Registration(用户侧):创建时持有 Arc,drop 时释放一份
- Tokio I/O Driver 的 RegistrationSet(内部):为了能遍历所有活跃资源、批量 shutdown,Driver 需要持有所有活跃 ScheduledIo 的引用
需要多份引用 → 必须用 Arc(或者引用计数的等价物)。如果用 Box,只有一方持有,另一方就拿不到。
有没有不用 Arc 的方案?理论上可以——比如用 &'static ScheduledIo(所有 ScheduledIo 永不 drop)。但这会导致内存泄漏(Tokio 常驻程序 spawn 百万次 fd 最终会 OOM)。或者用 Weak<ScheduledIo> 让 Driver 只持弱引用——但弱引用每次 upgrade 都是原子操作,没便宜多少。
Arc 是最简单可行的方案。Tokio 作者选择了简单,不追求"极致零成本"。这也是一个提醒:零成本抽象是个目标,不是铁律——有时候多一次原子操作换来代码清晰是值得的。
10.4 TcpStream::connect:一个 async fn 的完整拆解
原样代码:
rust
// 来源:tokio/src/net/tcp/stream.rs
pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
let addrs = to_socket_addrs(addr).await?;
let mut last_err = None;
for addr in addrs {
match TcpStream::connect_addr(addr).await {
Ok(stream) => return Ok(stream),
Err(e) => last_err = Some(e),
}
}
Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any address",
)
}))
}逻辑直白:
- DNS 解析:
to_socket_addrs(addr).await?——addr可能是"example.com:80"这种字符串,DNS 解析成Vec<SocketAddr> - 依次尝试每个地址:DNS 可能返回多个 IP(IPv4 + IPv6、或多个 A 记录),依次尝试
- 第一个成功就返回;全部失败 → 返回最后一个错误
为什么要 DNS async?因为 DNS 本身是网络 I/O,不能阻塞。Tokio 的 to_socket_addrs 里部分实现用 spawn_blocking 调 libc 的 getaddrinfo(同步接口),部分用 DNS crate 走异步。
connect_addr 的内部
connect_addr 是真正做 TCP 三次握手的地方。它大致长这样(简化):
rust
async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
let mio_sock = mio::net::TcpStream::connect(addr)?; // 非阻塞 connect
let stream = TcpStream { io: PollEvented::new(mio_sock)? };
// 等连接完成:fd 变 writable 表示 connect 成功
stream.io.registration().async_io(Interest::WRITABLE, || {
stream.io.get_ref().peer_addr() // 真连上了才能拿到 peer_addr
}).await?;
Ok(stream)
}注意这里的 .await:它不是一次 await,是两次:
- 第一次:DNS 解析(可能走 spawn_blocking 或 async DNS)
- 第二次:等 TCP 三次握手完成(fd 变 writable)
每次 await 都可能让出当前 Task、让出给 runtime 跑其他事。一个看似简单的 TcpStream::connect("example.com:80").await 内部是一个异步状态机,经过几次 yield。
async_io helper:Tokio 最重要的 I/O helper
这一行是整个 Tokio I/O 编程的灵魂:
rust
stream.io.registration().async_io(Interest::WRITABLE, || {
stream.io.get_ref().peer_addr()
}).await?;async_io(interest, op) 的语义:
尝试同步执行
op。如果op返回WouldBlock,注册 Waker 等 interest 就绪、就绪后再试。循环直到op返回非 WouldBlock 结果。
它是一个通用模式,封装了"try → poll_ready → retry" 这个三步循环。TcpListener::accept 也用它、UdpSocket::recv 也用它。下一节详细讲。
DNS 解析的边角:ToSocketAddrs 的 async 设计
to_socket_addrs 的signature 设计值得看一下:
rust
pub trait ToSocketAddrs: sealed::ToSocketAddrsPriv {}Tokio 用了一个 sealed trait pattern——ToSocketAddrs 是公开的空 trait,但实际解析逻辑在 ToSocketAddrsPriv 里,后者是 private 的。
为什么这么设计:
- 让
impl ToSocketAddrs for String / &str / SocketAddr / (IpAddr, u16)等公开的 - 但不允许用户自定义实现(因为 sealed)——所有解析逻辑都走 Tokio 控制的代码路径
- 未来 Tokio 可以改
ToSocketAddrsPriv的签名(比如加参数)——不破坏公开 API
sealed trait pattern 是 Rust 库 API 设计的常见技巧。它让你获得 trait 的好处(开放多态)同时保持内部改造自由。当你写 Rust 库时值得借鉴——尤其是"看起来像 enum 但需要扩展性"的场景。
connect_addr 内部的"两次 await"
重新看 connect_addr 的伪代码:
rust
async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
let mio_sock = mio::net::TcpStream::connect(addr)?; // 非阻塞 connect 系统调用
let stream = TcpStream { io: PollEvented::new(mio_sock)? };
stream.io.registration().async_io(Interest::WRITABLE, || {
stream.io.get_ref().peer_addr()
}).await?;
Ok(stream)
}注意这里:
mio::net::TcpStream::connect是同步调用——但是它发的是非阻塞 connect(底层是socket(SOCK_NONBLOCK) + connect())connect()系统调用立刻返回EINPROGRESS(连接进行中)——不等三次握手完成- 然后 Tokio 等 fd 变 writable(这是 Linux 习惯——non-blocking connect 完成的信号是 fd 可写)
- writable 后检查
peer_addr()——如果 connect 成功就能拿到;失败会返回错误
这是 Linux 系统编程的经典"异步 connect"模式。Tokio 忠实地遵循了它。
10.5 AsyncRead / AsyncWrite impl:标准的 try + delegate
原样:
rust
// 来源:tokio/src/net/tcp/stream.rs
impl AsyncRead for TcpStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
self.poll_read_priv(cx, buf)
}
}
impl AsyncWrite for TcpStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.poll_write_priv(cx, buf)
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.poll_write_vectored_priv(cx, bufs)
}
fn is_write_vectored(&self) -> bool { true }
#[inline]
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
// tcp flush is a no-op
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
self.shutdown_std(std::net::Shutdown::Write)?;
Poll::Ready(Ok(()))
}
}几个有趣细节:
- AsyncRead / AsyncWrite 的 trait 本身就是公开 API 的形状——用户看到的"TcpStream 可以 .await read/write"就是 impl 这两个 trait 的结果
poll_flush是 no-op——TCP 不需要用户层 flush(内核已经在 kernel buffer 里管理写缓冲),直接返回 Ready(Ok)poll_shutdown调shutdown_std(Shutdown::Write)——对应 POSIXshutdown(2)的写方向关闭。这个是同步操作(不需要 await),因为只是告诉内核"我不再写了"、不涉及真实 I/Opoll_write_vectored支持——TcpStream 支持用多个 IoSlice 一次写(对应 Linux 的writev(2)),这对 HTTP/2 的帧写入有显著性能提升
poll_read_priv 的真实实现
rust
// 来源:tokio/src/net/tcp/stream.rs
pub(crate) fn poll_read_priv(
&self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
unsafe { self.io.poll_read(cx, buf) }
}一行!委托给 PollEvented::poll_read。这个方法的实现(概念上):
rust
// 简化自 PollEvented 的 poll_read 逻辑
pub fn poll_read(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
loop {
// 1. 先 poll readiness(第 8 章讲过的 poll_read_ready)
let event = ready!(self.registration.poll_read_ready(cx))?;
// 2. 读出 ready 状态后尝试真正 read
match self.io.as_ref().unwrap().read(buf.initialize_unfilled()) {
Ok(n) => {
buf.advance(n);
return Poll::Ready(Ok(()));
}
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
// 3. ready 显示可读但真读 WouldBlock:这是 epoll ET 的常见情况
self.registration.clear_readiness(event);
continue; // 清除 readiness 标记,循环回去等下一次 wake
}
Err(e) => return Poll::Ready(Err(e)),
}
}
}核心逻辑(9 章讲过 ET 模式的必要处理):
- 先 poll_read_ready——如果 readiness 没到,注册 Waker、返回 Pending
- readiness 到了,真 read——可能成功、可能 WouldBlock、可能真错误
- WouldBlock → clear_readiness + loop——因为 epoll ET 模式下 readiness 位可能"过时"(内核已经知道 fd 不可读了,但 Tokio 的 ScheduledIo 还标着 READABLE)。clear 后再 loop 等下一次真正的 wake
这个循环是 ET 模式下的正确姿势——不循环会导致偶发的"误以为可读"bug。Tokio 的 poll_read 内置这个循环,用户代码不需要知道。
unsafe { self.io.poll_read(cx, buf) } 里的 unsafe
注意 poll_read_priv 里那个 unsafe —— 为什么需要?
rust
pub(crate) fn poll_read_priv(
&self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
unsafe { self.io.poll_read(cx, buf) }
}原因在 PollEvented::poll_read 的签名——它是 unsafe fn,要求调用者保证 read 实现正确处理未初始化内存(ReadBuf 里 uninit 的部分不能被读作 u8)。
TcpStream 的 read 保证满足这个条件——它调底层的 recv 系统调用,内核只写入实际收到的字节数。所以 unsafe block 是安全的。注释在源码里明确写着:
rust
// Safety: `TcpStream::read` correctly handles reads into uninitialized memory
unsafe { self.io.poll_read(cx, buf) }这是 Rust unsafe 的正确使用姿势——有文档 + 有不变式 + 有局部化。不是到处撒 unsafe 图省事,而是在必要的边界上带注释地使用。
10.6 async_io helper:自定义 I/O 的万能钥匙
async_io 是 Registration 上的一个公开方法(pub to crate,但通过 TcpStream / UdpSocket 等暴露)。它的签名:
rust
// 概念签名(实际是 unstable 但 tokio 内部都在用)
pub async fn async_io<R>(
&self,
interest: Interest,
mut f: impl FnMut() -> io::Result<R>,
) -> io::Result<R>语义:反复尝试 f(),WouldBlock 时等 interest 就绪,直到 f() 成功或返回非 WouldBlock 错误。
伪代码实现:
rust
async fn async_io(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
loop {
let ev = self.ready(interest).await?;
match f() {
Ok(r) => return Ok(r),
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(ev);
}
Err(e) => return Err(e),
}
}
}async_io 用 FnMut 而不是 Fn
细心的读者会注意到 async_io 接受的 f 是 FnMut——可变闭包。为什么?
因为 f 在循环里可能被多次调用(每次 ready 就尝试一次),调用者可能需要在 f 之间保留状态:
rust
let mut retry_count = 0;
self.io.registration().async_io(Interest::READABLE, || {
retry_count += 1; // 这里需要 FnMut 才能可变借用
self.io.get_ref().read(buf)
}).await?;用 Fn 的话这种 retry counter 就无法实现。Tokio API 设计非常精细——这类 FnMut vs Fn 的选择看似细节,但决定了用户代码的灵活性。
async_io 不是零成本:为什么?
async_io 是一个 async 函数,它会被 Rust 编译器展开成一个 Future state machine。这个状态机有大小——它至少要装:
- 自身的 f(闭包)
- 内部的
ready()Future - 当前循环的状态
对于 FnMut 闭包,它可能捕获很多变量——状态机也跟着大。tokio::spawn 的 BOX_FUTURE_THRESHOLD(第 4 章讲过)就是为这种情况准备的——大 Future 自动 Box::pin。
实践影响:如果你在 hot path 上用 async_io + 大闭包,可能偶尔看到"状态机 > 2 KB、debug 下 boxed"的现象。不常见但存在——第 19 章(性能调优)会更详细讨论。
async_io 的 Cancel Safety
一个少被讨论但重要的特性:async_io 是 cancel-safe 的。意思是:
rust
tokio::select! {
r = stream.readable() => { /* ... */ } // 使用 async_io
_ = timeout_future => { /* ... */ } // 超时分支
}如果 timeout 先 Ready,stream.readable() 会被 drop——但 drop 是安全的:
- async_io 内部等的 Waker 会被从 ScheduledIo 的 waiters 里自动移除
- 已经注册但没触发的 readiness 不会丢失
- 下次你再调 readable() 能看到该看到的 readiness
这是 select! 能安全工作的前提——所有 Tokio 自带的 I/O async 方法都 cancel-safe。第 14 章会详细讲 cancel safety——它是 Tokio 区别于 Go / Python 异步的关键特性之一。
自己写 async I/O 方法时:如果用 async_io 包装,自动继承 cancel safety;如果你自己写 state machine,得自己保证。
用 async_io 写 TcpListener::accept
TcpListener::accept 的真实代码(原样):
rust
// 来源:tokio/src/net/tcp/listener.rs
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
let (mio, addr) = self
.io
.registration()
.async_io(Interest::READABLE, || self.io.accept())
.await?;
let stream = TcpStream::new(mio)?;
Ok((stream, addr))
}漂亮——一行 async_io 封装了全部"等 readable + 非阻塞 accept + WouldBlock 重试"的循环。
为什么 async_io 是如此重要的 helper
在 async_io 出现之前(Tokio 0.2 及更早),每个 I/O 类型的 accept / recv / connect 都手写 try + register + loop 代码——十几行样板、容易出错(忘记 clear_readiness 就是 bug)。
async_io 把这个模式封装成一个 helper——现在所有 Tokio I/O 代码的 accept / recv / etc 都是一行调 async_io。源码简洁、bug 大减、性能一致。
**这种"提炼共同模式为 helper"**是库维护者的日常艺术。你自己写 Rust async 库时,提前想"这个模式会出现多少次"——出现 3 次以上就值得抽成 helper。
poll_flush 为什么是 no-op
这行值得单独讲:
rust
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
// tcp flush is a no-op
Poll::Ready(Ok(()))
}为什么 TCP 的 flush 是 no-op?
因为 TCP 不保证"立刻发送"。poll_write 成功意味着数据已经交给内核的 socket buffer——内核决定什么时候真的发出去(Nagle 算法、TSO、ACK 合并等)。用户层的 flush 对 TCP 毫无意义——你的数据已经被内核接管了,内核不会因为你 flush 就改变发送策略。
这和 BufWriter / File 不同——BufWriter 有用户态的缓冲需要 flush 到 File,File 也可能有内核缓冲需要 fsync。TCP 没有用户态缓冲、内核缓冲也不由你控制。
如果真想"立即发",用 TCP_NODELAY(禁用 Nagle)+ TcpStream::set_nodelay(true)。这改变内核发送策略,效果比 flush 直接多了。
poll_shutdown 关闭的是写方向
rust
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
self.shutdown_std(std::net::Shutdown::Write)?;
Poll::Ready(Ok(()))
}注意是 Shutdown::Write——只关闭写方向。为什么不关双向?
因为 AsyncWrite trait 的语义是"writer 关闭"——它不管 reader。如果双向关闭,正在 read 的代码会意外得到 EOF。Tokio 遵循了这个约定:半关闭写方向 = 告诉对端"我不再写了";但 read 方向仍然能继续。
对应 TCP 语义:半关闭会让对端 read 拿到 0 字节(EOF),但你仍可以 read 对端还没发完的数据。HTTP 的 "Connection: close" 底层就是这个半关闭。
为什么 poll_write_vectored 支持但 poll_read_vectored 没有
细节细到这里:AsyncWrite 的 impl 里有 poll_write_vectored——接受 &[IoSlice] 一次写多个 buffer。对应 Linux writev(2),一次系统调用写多段。
但 AsyncRead 的 impl 里没有 poll_read_vectored(Tokio 的 TcpStream 实现里)。为什么?
原因:Tokio 的 AsyncRead trait 接受的是 ReadBuf<'_>——一个连续 buffer,不是 IoSlice 数组。这是 Tokio 故意的 API 简化:
- 写入场景真实需要 vectored——HTTP 帧的 header + body 分开、WebSocket 掩码 + payload 分开
- 读取场景绝大多数时候是"读到一个 buffer"——应用层分析之后再切分。Vectored read 的实际需求不高
trade-off:API 更简单(一个 ReadBuf vs 多个 IoSlice),代价是少数场景需要多次 read。Tokio 作者算过账——对 99% 用户更友好。
如果你真需要 vectored read:可以直接调底层 mio::TcpStream::read_vectored,绕过 Tokio 的 AsyncRead 抽象。但你知道你在做什么才这么干。
TcpStream 的 set_nodelay 及其他 setter
TcpStream 还暴露了一些同步的配置方法:
set_nodelay(true)—— 禁用 Nagle 算法(低延迟优先)set_ttl(u32)—— 设置 IP TTLlinger()/set_linger(Option<Duration>)—— SO_LINGER 控制 close 行为peer_addr()/local_addr()—— 查询本端 / 对端地址
这些都是同步方法,不带 async——因为底层 setsockopt(2) 是非阻塞的、立即返回。不是每个方法都需要 async——只有涉及 I/O 等待的才需要。
Tokio 在标记 async 这件事上很有纪律——有 I/O 可能性的标 async、纯查询 / 配置 的不标。这让用户代码清晰知道"哪里可能让步、哪里不会"。对比一些库给所有方法都加 async(哪怕内部是同步操作)——那让用户无法判断"这一行是不是阻塞点"。
10.7 UdpSocket:和 TcpStream 的同构对比
打开 tokio/src/net/udp.rs,UdpSocket 的定义:
rust
pub struct UdpSocket {
io: PollEvented<mio::net::UdpSocket>,
}和 TcpStream 一模一样的结构——只是泛型参数从 mio::net::TcpStream 换成 mio::net::UdpSocket。
主要差异在 API 层:
- 不实现
AsyncRead/AsyncWrite——因为 UDP 是 datagram-based,AsyncRead的 stream 语义不适用 - 提供
send_to(buf, addr)/recv_from(buf)方法——each call 发 / 收一个完整数据包 - 内部仍然用 async_io 实现异步:
rust
// 简化自 tokio/src/net/udp.rs
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.io.registration().async_io(Interest::READABLE, || {
self.io.get_ref().recv_from(buf)
}).await
}又是一行 async_io。
TcpStream vs UdpSocket:架构层的相同,应用层的不同
| 维度 | TcpStream | UdpSocket |
|---|---|---|
| 底层 mio 类型 | mio::net::TcpStream | mio::net::UdpSocket |
| Tokio 包装 | PollEvented | PollEvented |
| Registration 层 | 完全相同 | 完全相同 |
| async_io helper | 相同 | 相同 |
| API 风格 | AsyncRead + AsyncWrite | send_to / recv_from |
| 读 vs 写的 Waker 注册 | 分开(readable / writable) | 分开 |
Tokio 所有基于 mio 的 I/O 类型(TcpStream / TcpListener / UdpSocket / UnixStream / UnixListener / ChildStdin/Out/Err 等)都是这个套路——三层包装 + async_io。学会其中一个,其他都秒懂。
send_to 一次成功 vs recv_from 需要 retry
UDP 和 TCP 的一个语义差异:
send_to:UDP datagram 是独立的包——要么整个成功要么整个失败。send_to要么 Ready 要么 WouldBlock,不存在"发了一半"recv_from:UDP 也是整个 packet 读。如果你传的 buffer 比 packet 小,多余字节会被丢弃(而不是等下次 read)
这些 UDP 特有语义 Tokio 忠实地暴露——send_to / recv_from 返回 Result<usize, Error> + SocketAddr,对应 packet size 和来源地址。不像 TCP 有 stream 语义需要你拼接 buffer、UDP 是离散 message。
一个常见的 UDP 陷阱:buffer 太小截断
rust
let mut buf = [0u8; 512]; // 只有 512 字节
let (n, addr) = socket.recv_from(&mut buf).await?;
// 如果对端发了 1500 字节,n = 512,**剩余 988 字节被悄悄丢弃**没有任何报错——你只是拿到一个"截断的"packet。对应 Linux recvfrom(2) 的默认行为。
修复:用足够大的 buffer(UDP 理论最大 65535,MTU 常见 1500——取 2048 或 65536 都行)或者用 MSG_PEEK 先看大小。
这类 UDP 陷阱在 Tokio 层面不特殊——UDP 本身就这样、Tokio 保持透明。你写 UDP 代码时得按 UDP 语义来,不能用 TCP 直觉套。
其他 I/O 类型快览
除了 TcpStream / TcpListener / UdpSocket,Tokio 还包装了:
tokio::net::UnixStream / UnixListener:Unix domain socket
- 相同架构:
UnixStream { io: PollEvented<mio::net::UnixStream> } - AsyncRead / AsyncWrite 实现和 TcpStream 几乎一样
- 差别只在 mio 底层(Unix socket 不需要 IP 地址)
tokio::net::UnixDatagram:UDP-like Unix socket
- 对应 UdpSocket,只是底层是 Unix socket
- 方法签名一样(send_to / recv_from)
tokio::process::ChildStdin / ChildStdout / ChildStderr:子进程的 stdio 管道异步化
- 底层是 pipe fd
- Tokio 给这些 fd 做了 PollEvented 包装
- AsyncRead / AsyncWrite 可以直接用
tokio::signal:异步信号处理
- 底层是 signalfd(Linux) 或 kqueue EVFILT_SIGNAL(macOS)
- 用 PollEvented 包装 + 特殊的 signal dispatch
每一个都是不超过 300 行代码的包装——全部复用 PollEvented + async_io。这种高度可复用的架构让 Tokio 的 net / process / signal 等模块总代码量极小。
PollEvented 的泛型约束
PollEvented<E: Source> 里的 Source trait 是 mio 提供的(就是第 9 章讲的 event::Source)。任何实现它的类型都能被 PollEvented 包装。
这个泛型约束让 Tokio 的抽象对外开放——你自己实现 Source 的类型也能享受 PollEvented 的全部功能。Tokio 没有把 I/O 类型硬编码成 "只有 TcpStream / UdpSocket 能异步"——任何符合接口的都行。
**这种"关键抽象对外开放"**是好库的标志。对比一些闭门造车的库——API 只支持库作者预设的几种类型,扩展要 fork 重写。Tokio 走的是开放路径。
10.8 一次 TcpStream::read.await 的全局回看
把第 1-10 章的所有组件串起来,看一次 let n = stream.read(&mut buf).await? 的完整路径:
Step 1(用户代码):read 返回 Read<'a> Future,用户 .await 它
Step 2(Future trait):Read 的 poll 调 stream.poll_read(cx, buf)
Step 3(TcpStream):poll_read → poll_read_priv → self.io.poll_read(...)
Step 4(PollEvented):
- 4.1
registration.poll_read_ready(cx)→ 读 ScheduledIo.readiness - 4.2 没就绪 → 注册 Waker 到 waiters.reader → 返回 Pending
Step 5(Task 挂起):
- 5.1 Read Future 返回 Pending 向上冒泡
- 5.2 外层 async fn 也返回 Pending
- 5.3 Task 的 state 从 RUNNING 变 Idle(第 6 章)
Step 6(Scheduler park):
- 6.1 worker 的本地队列没其他任务 → 准备 park
- 6.2 worker 调
Driver::park→ I/O Driver 的turn(None)
Step 7(mio → 内核):
- 7.1
turn调mio::Poll::poll(events, None) - 7.2 Linux 下就是
epoll_wait(epfd, events, 1024, -1)系统调用 - 7.3 线程阻塞在内核——CPU 让出给其他进程
Step 8(内核侧数据到达):
- 8.1 远端 TCP 包到达网卡 → 内核 TCP/IP 栈处理
- 8.2 数据放进 socket receive buffer
- 8.3 内核把这个 fd 标记为 READY 在 epoll 的就绪列表里
- 8.4
epoll_wait返回
Step 9(mio → Tokio I/O Driver):
- 9.1 epoll_wait 返回 N 个事件
- 9.2 Tokio turn 的 event loop:
for event in events.iter() - 9.3 对 matching event:
let ptr: *const ScheduledIo = token.0 as *const _ - 9.4
io.set_readiness(Tick::Set, |curr| curr | READABLE) - 9.5
io.wake(READABLE)→ 进入 ScheduledIo.wake
Step 10(Tokio 内部 wake):
- 10.1
io.wake拿 Mutex 锁 → 取waiters.reader的 Waker - 10.2 drop 锁 → 调
waker.wake() - 10.3 进入第 3 章讲的 6 层 waker 路径
- 10.4
raw.wake_by_val→Harness::wake_by_val→ state.transition → scheduler.schedule
Step 11(Scheduler 重调度):
- 11.1 Task 被推回本地队列或 LIFO slot
- 11.2 某个 worker pop 到 Task →
Harness::poll - 11.3
transition_to_running→ state 变 RUNNING
Step 12(Task 重新 poll):
- 12.1 Task 的 poll 重新进入 async fn 状态机
- 12.2 恢复到 Read Future 的
.await点 - 12.3 再次
stream.poll_read(cx, buf) - 12.4
registration.poll_read_ready→ readiness 有 READABLE → 返回 Ready - 12.5 执行
self.io.as_ref().read(buf)→ 真正的 recv 系统调用 - 12.6 返回读到的字节数
Step 13(用户代码继续):
- 13.1 Read Future 返回 Ready(Ok(n))
- 13.2 用户代码拿到 n、继续处理
13 步、每一步都有具体代码位置、每一步你在前 10 章都见过。
13 步里每一步的时间预算
把前面几章分别给过的时间数字汇总:
| Step | 操作 | 时间量级 |
|---|---|---|
| 1-3 | 用户代码 → TcpStream::poll_read | ~几纳秒(函数调用) |
| 4 | poll_read_ready → Atomic load + mask | ~5-10 纳秒(fast path) |
| 5 | Task 状态 RUNNING → Idle | ~50 纳秒(state CAS) |
| 6-7 | Driver park → epoll_wait | ~几纳秒(用户态转 kernel) |
| 8 | 内核等数据到来 → wake epoll_wait | 0 ~ 无限(外部延迟) |
| 9 | mio 事件分发 + Token 转 ScheduledIo | ~100 纳秒 |
| 10 | ScheduledIo::wake + Waker vtable | ~200-500 纳秒 |
| 11 | scheduler.schedule + worker pick | ~50-100 纳秒 |
| 12 | Task re-poll → recv syscall | ~500 纳秒-1 微秒 |
| 13 | 用户代码拿结果 | ~几纳秒 |
除了 Step 8(等数据的外部延迟),整条 Tokio 路径 <2 微秒。这就是"百万 QPS 服务"的物质基础——每个请求的 Tokio 开销是数百纳秒级别。
对比你写了 100 行业务逻辑的处理时间(字符串操作、JSON 解析、DB 查询),业务时间远大于 Tokio 开销。这就是为什么 Tokio 在实际服务里"从来不是瓶颈"——它足够快,让业务代码可以专心做业务。
这就是 Tokio——没有任何魔法,只有 13 步精心设计的交互。读完本章你应该真正理解"async I/O"——不是魔法、不是黑盒、不是"就这样子",而是 13 步确定的、可观察的、可测量的交互。
10.8½ 本书上半场的终结:一次心理复盘
到这里本书上半场全部结束。从第 1 章讲 Rust 为什么要把运行时踢出语言、到第 10 章讲 TcpStream 的完整实现路径,你走过了 Tokio 的主体架构:
- 第 1 章:Rust 异步的宇宙观
- 第 2-3 章:Future / Waker —— 语言层的基础
- 第 4 章:Runtime —— 运行时入口
- 第 5 章:multi_thread Scheduler —— 主调度器
- 第 6 章:Task —— 调度对象
- 第 7 章:current_thread + LocalSet —— 另一种调度器
- 第 8 章:I/O Driver —— 运行时和事件的桥梁
- 第 9 章:mio —— 事件和系统调用的桥梁
- 第 10 章:TcpStream / UdpSocket —— 前面所有在一个类型里爆发
心理复盘一下:
- 前 3 章让你有了"Rust async 是什么"的直觉——不是 Go 的 goroutine、不是 JS 的 Promise,是一种"状态机 + Waker 协议"的新模型
- 4-7 章让你有了"Tokio 如何执行 async"的机械图像——Runtime 装 Scheduler、Scheduler 分派 Task、Task 是状态机 + 引用计数
- 8-10 章让你有了"Tokio 如何接外部世界"的图像——epoll_wait 是 heartbeat、ScheduledIo 是 per-fd 状态、每种 I/O 类型都是一个薄包装
这 10 章的关系就像一本建筑图纸的大概念图 + 主体结构 + 关键节点——够你独立阅读任何 Tokio 源码文件、独立诊断生产 Tokio 问题、独立设计一个小规模的自定义 async runtime。
下半场(第 11-20 章)会带你进入时间管理、同步原语、channel、select、blocking、observability、工程实践——每一章都是某个具体能力的深入,但不会再引入根本性的新概念。你在前 10 章建立的那个 mental model 会一直够用。
10.9 和这个系列的其他书的关联
本章是前 9 章的大汇总——所有前面的概念在 TcpStream 里一次呈现。和其他书关联最紧的是 《Rust 编译器与运行时揭秘》第 13 章(FFI 与 ABI 调用约定) 和 第 10 章(Pin、Waker 与 Future 的运行时协作)——本章的 ScheduledIo 作为 mio Token 背后的指针、unsafe { self.io.poll_read(cx, buf) } 的安全性、Pin<&mut Self> 的所有使用——都是那两章理论在 Tokio 里的具体应用。
和 《vLLM 推理内核深度解析》第 17 章(API 服务器与生产部署) 对比也很有启发——vLLM 的 API server 基于 FastAPI + uvloop,但它的 I/O 模型(epoll + callback)和 Tokio 的 I/O 模型在架构上相似。Python / Rust 在这层做出了几乎相同的选择——说明这是正确的设计。
10.9½ 写自定义 async I/O 类型的完整步骤
现在把本章变成可操作的——假设你有一个 Rust 库提供 MyIoDevice(一个 fd-based 的设备,比如串口、PTY、FIFO),你想把它异步化。完整步骤:
Step 1:实现 mio 的 event::Source trait
rust
use mio::event::Source;
use mio::{Interest, Registry, Token};
pub struct MyMioDevice {
fd: RawFd,
}
impl Source for MyMioDevice {
fn register(&mut self, registry: &Registry, token: Token, interests: Interest)
-> io::Result<()>
{
SourceFd(&self.fd).register(registry, token, interests)
}
fn reregister(&mut self, registry: &Registry, token: Token, interests: Interest)
-> io::Result<()>
{
SourceFd(&self.fd).reregister(registry, token, interests)
}
fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
SourceFd(&self.fd).deregister(registry)
}
}SourceFd 是 mio 提供的"给裸 fd 做 Source"的 wrapper——大多数 Unix fd 直接用它就行。复杂设备(比如需要额外 ioctl 的)自己实现。
Step 2:用 PollEvented 包装
rust
use tokio::io::PollEvented;
pub struct MyAsyncDevice {
io: PollEvented<MyMioDevice>,
}
impl MyAsyncDevice {
pub fn new(device: MyMioDevice) -> io::Result<Self> {
Ok(MyAsyncDevice {
io: PollEvented::new(device)?,
})
}
}一行完成 Tokio 侧注册——不需要自己操心 Registration / ScheduledIo / mio::Registry。
Step 3:实现 AsyncRead / AsyncWrite(或自定义方法)
rust
impl AsyncRead for MyAsyncDevice {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
unsafe { self.io.poll_read(cx, buf) }
}
}或者使用 async_io 提供自定义方法:
rust
impl MyAsyncDevice {
pub async fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io.registration().async_io(Interest::READABLE, || {
self.io.get_ref().read_sync(buf)
}).await
}
}大约 30 行代码,你的自定义设备就拥有了完整 async 能力——无阻塞、集成 Tokio 调度、支持 .await。这就是 Tokio 抽象层次的威力。
真实案例:mio-serial / tokio-vsock / tokio-tun
Rust 生态里已经有很多这样的库用这个模式包装特殊 fd:
- mio-serial + tokio-serial:异步串口
- tokio-vsock:VSOCK 虚拟机间通信
- tokio-tun:TUN/TAP 网络设备
- tokio-serial-port:工业设备串口
- tokio-ppp / tokio-bluetooth:其他特殊协议
每个都是 ~100 行 Tokio 集成代码 + 具体设备的 mio::Source 实现。生态繁荣依赖这种低成本扩展的可能性。
10.9¾ 一个实战小项目:写一个最简 echo server
合起前面所有知识,看一个最小但完整的 echo server——50 行代码,跑在 Tokio 上:
rust
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> std::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on 127.0.0.1:8080");
loop {
let (mut socket, peer) = listener.accept().await?;
println!("New connection: {}", peer);
tokio::spawn(async move {
let mut buf = [0u8; 1024];
loop {
match socket.read(&mut buf).await {
Ok(0) => {
println!("Connection closed: {}", peer);
break;
}
Ok(n) => {
if let Err(e) = socket.write_all(&buf[..n]).await {
eprintln!("Write error: {}", e);
break;
}
}
Err(e) => {
eprintln!("Read error: {}", e);
break;
}
}
}
});
}
}50 行,可以跑 1 万个并发连接、每秒几十万 QPS。所有异步能力都来自本章讲的架构:
TcpListener::bind→ PollEvented + Registration + mio::Registrylistener.accept().await→ async_io + 等 READABLE + 非阻塞 accepttokio::spawn→ 第 6 章的 Task 结构 + 第 5 章的 worker 队列socket.read(&mut buf).await→ poll_read → poll_read_ready → epoll_waitsocket.write_all(...)→ poll_write → poll_write_ready
你现在读得懂这 50 行代码每一个 .await 背后做了什么——这就是前 10 章的收获。
如果你想验证自己的理解,对这个 echo server 做一次"心理执行":假设有一个客户端连接进来、发了 "hello"——从 listener.accept().await 返回到 socket.write_all 完成,你能一步一步讲清楚每一个内核系统调用、每一个 Tokio 函数、每一个 Future state transition 吗?如果能,你已经完全掌握了本书上半场的全部内容。
10.9½ 下半场预告:每一章的甜点
给你一个下半场(11-20 章)的预告——每一章会给你一个具体的"甜点"(一个让你觉得"原来是这样"的惊喜):
- 第 11 章(Time Driver):分层时间轮——Tokio 如何在 O(1) 时间处理任意时长的定时器,比 BTreeMap 或 heap 快 10-100 倍
- 第 12 章(同步原语):
tokio::sync::Mutex是公平锁——为什么 Tokio 的 Mutex 比 std::Mutex 慢一点但更可预测 - 第 13 章(channel):mpsc 的 lock-free ring buffer、broadcast 的 arc_swap 技巧
- 第 14 章(select!):宏展开—— select! 不是运行时魔法,是一段确定的 Future state machine
- 第 15 章(JoinHandle):JoinSet 和 FuturesUnordered 的区别——一个用链表、一个用 slab
- 第 16 章(blocking):
spawn_blocking的线程池启动策略、为什么block_in_place在 current_thread 不能用 - 第 17 章(metrics + tracing):tokio-console 的 protocol——它如何通过 trace event 重建 runtime 状态
- 第 18 章(多 runtime):一个 process 里多个 runtime 的合理场景、thread-per-core 架构在 Tokio 的实现
- 第 19 章(性能调优):8 大典型性能陷阱——每个都附真实案例和诊断步骤
- 第 20 章(设计模式):14 个架构模式——把全书的所有模式总结成可迁移的工程范式
每一章都是独立自洽的——你可以按需跳读,也可以完整读完。下半场比上半场更贴近生产实践——上半场是"理解原理",下半场是"成为能用好 Tokio 的工程师"。
10.10 本章小结
带走三件事:
- TcpStream 是极简的三层包装:TcpStream →
PollEvented<T>→ Registration → ScheduledIo → mio。每一层只有一两个字段、几个方法——简单的分层让整个系统可理解 async_io(interest, f)是 Tokio I/O 的万能 helper——把"try + register + retry + cancel-safe" 模式压成一行调用。TcpListener::accept、UdpSocket::recv_from、任何自定义 I/O 都用它- TcpStream / TcpListener / UdpSocket / UnixStream 架构完全同构——学会一个就学会了所有。差异只在 API 层(AsyncRead vs send_to)和 mio 底层类型
总结:上半场的一个核心洞察
上半场(第 1-10 章)最核心的一个洞察:Tokio 的所有复杂性都在"为并发和正确性服务"——而不是"为了玩花样"。
每一处看似复杂的设计都有明确目的:
- state bit packing:保证状态转移原子
- 侵入式链表:避免堆分配
- 全局 VTABLE:让 will_wake 高效
- ScheduledIo 指针做 Token:避开 HashMap 查找
- 分批 wake:避开锁内副作用
- async_io helper:消除重复 + cancel safety
- PollEvented 泛型:让扩展零成本
每一个都是问题 + 解法的精确对应。没有一段代码是"看起来酷"——每一段都在解决一个具体的工程问题。
读完上半场,你已经看见了工业级基础设施软件是怎么写出来的。下半场的工程实践会让你把这些知识用起来——但"怎么想"的功夫已经在这里建立了。
读完本章值得做的 3 件事:
- 打开你最近写的一个使用 TcpStream / TcpListener 的 Rust 项目、对着本章重新读一遍——你会看到新的层次
- 打开 GitHub 上
tokio/src/net/浏览 10 分钟,挑一个你没见过的模块(比如unix/datagram.rs或windows/named_pipe.rs)——你会发现它们和 TcpStream 惊人地相似 - 思考一个题目:如果让你给一个新协议(比如 SCTP 或 QUIC)做 Tokio 异步化,你的第一步会是什么? 答案:实现 mio::event::Source,然后一切自然展开。
下一章离开 I/O,进入时间——Tokio 的 Time Driver。你会看到一个分层时间轮(hierarchical timing wheel)如何用 4 层 64 slot 管理几百万个定时器、为什么 tokio::time::sleep 精度是亚毫秒级、超过 1 年的 sleep 和 100 毫秒的 sleep 为什么都可以在 O(1) 时间注册。
延伸阅读
- Tokio 源码:
tokio/src/net/tcp/stream.rs - Tokio 源码:
tokio/src/net/tcp/listener.rs - Tokio 源码:
tokio/src/net/udp.rs - Tokio 源码:
tokio/src/runtime/io/registration.rs - 《Rust 编译器与运行时揭秘》第 10 章:Pin / Waker / Future 的运行时协作
- 《vLLM 推理内核深度解析》第 17 章:API 服务器的 I/O 模型对比