Tokio 源码深度解析

第13章 channels:mpsc / broadcast / watch / oneshot

作者 杨艺韬 · 12,720 字

第13章 channels:mpsc / broadcast / watch / oneshot

本章要点

  • Tokio 提供四种 channel:mpsc(多生产者单消费者,最常用)、broadcast(多生产者多消费者,每消息广播)、watch(最新值订阅)、oneshot(一次性发送)
  • mpsc 的 Chan structtx: list::Tx<T>(lock-free 链表)+ rx_waker: AtomicWaker + semaphore: S(背压)——上一章 Semaphore 在这里直接复用
  • oneshot 的惊人简洁:一个 state: AtomicUsize(5 个状态位)+ value: UnsafeCell<Option<T>> + 两个 waker 槽。8 行 struct 实现一次性发送语义
  • broadcast 的 ring bufferbuffer: Box<[RwLock<Slot<T>>]>——每 slot 一个 RwLock、所有 receiver 根据自己的 next 位置读
  • watch = oneshot 但可以反复发送:每次 send 替换当前值 + 递增 version、receiver 比对 version 判断是否有新值
  • 所有 channel 都是 cancellation-safe——select! 里取消一个 channel 的 recv 不会丢消息
  • mpsc 的 rx_waker 是 AtomicWaker(第 10 章讲过)——一个 atomic slot 装一个 Waker,register_by_ref 做 will_wake 优化
  • 核心哲学:channel = 数据结构 + 同步原语。不同 channel 只是”不同数据结构配不同同步策略”

13.0½ 读本章前的一个框架

channel 是异步编程里最实用的工具——真实项目里几乎每个 Tokio 服务都在用。但 channel 的种类之多(四种 Tokio 原生 + 多种第三方)、语义差异之细(bounded vs unbounded、fair vs unfair、cancel-safe vs not)、让很多人对 channel 的理解停留在”会用 send / recv”的表面。本章要带你做的是”从表面 API 深入到语义 + 实现 + 选型”——读完你选 channel 时不再是拍脑袋、而是基于具体语义做判断。

在深入四种 channel 的实现前,先给你一个统一框架

Channel = 数据容器 + 生产侧协议 + 消费侧协议

  • 数据容器:链表 / ring buffer / 单值槽
  • 生产侧协议:独占 vs 多生产者;阻塞 vs 非阻塞
  • 消费侧协议:单消费者 vs 多消费者;Polling vs 被 wake

四种 channel 的区别,本质是这三维度不同取值的组合:

channel数据容器生产侧消费侧
mpsclock-free 链表多生产者、bounded 时背压单消费者、被 AtomicWaker wake
broadcastring buffer多生产者、互斥尾推进多消费者、被 Notify wake
watch单个 RwLock<T>多生产者、替换多消费者、被 Notify wake
oneshot单个 UnsafeCell<Option<T>>单生产者、一次单消费者、状态位通知

带这个框架读本章——每节会明确展开这三个维度。等你读完,所有 Rust 生态里 channel 类的原语(crossbeam、flume、async-channel)你都能用这个框架秒懂


13.1 四种 channel 的定位

Tokio 提供了四种 channel(mpsc / broadcast / watch / oneshot),不是随意堆砌——它们各自对应一类特定的消息传递模式mpsc(多生产者单消费者)解决”任务间工作流”、broadcast(多生产者多消费者)解决”事件广播”、watch(单生产者多消费者 + 最新值)解决”配置分发”、oneshot(一次性单消息)解决”异步结果返回”。这四种模式覆盖了 95% 的真实业务场景——用户几乎不需要自己造 channel、Tokio 的选择足够用。

在深入实现前先厘清四种 channel 的语义差别——选错 channel 是 Tokio 使用里最常见的错误之一

channel生产者消费者消息消费次数容量典型用途
mpsc多个1 个每消息被消费 1 次bounded / unbounded任务分发、事件收集
broadcast多个多个每消息每 receiver 消费 1 次ring buffer(有限)事件广播、配置变更
watch多个多个读最新值(跳过中间)1配置订阅、状态快照
oneshot1 个1 个一次性1异步返回值、信号

选 channel 的灵魂拷问

  • 消息要所有 receiver 都收到吗?是 → broadcast / watch;否 → mpsc / oneshot
  • 消息重要吗(不能丢)?重要 → mpsc / oneshot;可以丢 → broadcast / watch
  • 只发送一次吗?是 → oneshot;多次 → 其他
  • 需要背压(发送端满了等)吗?是 → bounded mpsc;否 → unbounded / broadcast / watch

这 4 个问题 + 上表 ≈ 你所有 channel 选型需求


channel 的一个哲学问题:Go 还是 Rust 的方式更好

Go 的 chan 极简——一个关键字解决问题。Tokio 的 channel 分 4 类、每类一个模块。哪种哲学更好

Go 的观点:“不用分,一个就够”——简单、易学、一致。 Rust 的观点:“不同场景需要不同优化”——性能、类型安全、明确语义。

我的观点:都对,在各自上下文里。

  • Go 的目标用户是”大多数后端开发者”——优先易用
  • Rust 的目标用户包括”追求极致性能的系统工程师”——优先灵活和性能

你不能把 Go 的方式强加给 Rust——Rust 本身的定位要求这种细粒度。如果 Tokio 只提供一个 “Channel”,Rust 工程师会不满意(我要 broadcast、我要 watch)。反过来如果 Go runtime 提供 4 种 channel,Go 工程师会嫌复杂

语言生态的 API 设计反映语言本身的哲学——Tokio 的 4 channel 是 Rust 生态对”细粒度 + 零成本”信仰的诚实表达。读完本章后你不仅理解 Tokio channel,也理解了 Rust 生态的气质


13.2 mpsc:最常用 channel 的内部

mpsc(multi-producer single-consumer)是所有异步 channel 里最常见的那个——真实 Tokio 项目里 90% 的 channel 都是 mpsc。原因很简单:绝大多数”任务 A 处理完数据交给任务 B”的场景都是这个模式。worker 池从一个共享 queue 取任务、连接线程把请求发给业务线程、后台任务把结果写到日志线程——都是 mpsc。理解 mpsc 的内部实现几乎等于理解异步 channel 的核心——本节讲的是 Tokio 最精巧的代码之一。

mpsc 是 Tokio 用得最多的 channel——几乎所有”多 Task 向一个 Task 发消息”的场景都用它。打开 tokio/src/sync/mpsc/chan.rs原样

// 来源:tokio-rs/tokio · tokio/src/sync/mpsc/chan.rs (tokio-1.40.0)
pub(super) struct Chan<T, S> {
    tx: CachePadded<list::Tx<T>>,
    rx_waker: CachePadded<AtomicWaker>,
    notify_rx_closed: Notify,
    semaphore: S,
    tx_count: AtomicUsize,
    tx_weak_count: AtomicUsize,
    rx_fields: UnsafeCell<RxFields<T>>,
}

7 个字段,每个都有明确角色:

  • tx: list::Tx<T> —— 发送端持有的 “lock-free 链表”头。发送就是往这个链表尾部 push
  • rx_waker: AtomicWaker —— 接收端等待时存的 Waker。发送端 push 完后 wake 它
  • notify_rx_closed: Notify —— 发送端在接收端关闭时等这个
  • semaphore: S —— bounded 版本用 BoundedSemaphore(上一章的 Semaphore)做容量限制;unbounded 版本用 AtomicUsize 做简单计数
  • tx_count: AtomicUsize —— 当前活跃 sender 数。归零时 channel 自动关闭
  • tx_weak_count: AtomicUsize —— WeakSender 的数量(不阻止 channel 关闭)
  • rx_fields: UnsafeCell<RxFields<T>> —— 接收端私有数据(用 UnsafeCell 因为接收端独占、不需要锁)

注意两处 CachePadded——把 txrx_waker 分别填充到独立的 cache line、防止 false sharing(第 6 章讲过)。mpsc 的 hot path 是 sender 写 tx、receiver 读 rx_waker——不同 cache line 保证两者并发无冲突

TxRx:对 Chan 的包装

pub(crate) struct Tx<T, S> {
    inner: Arc<Chan<T, S>>,
}

pub(crate) struct Rx<T, S: Semaphore> {
    inner: Arc<Chan<T, S>>,
}

两个都是 Arc<Chan<T, S>> 的薄包装。Tx 可以 clone(多个 sender),Rx 不能 clone(单一 receiver——这就是”mpsc 的 SC”——Single Consumer)。

send 和 recv 的核心流程

send(bounded)

  1. semaphore.acquire(1).await —— 拿一个 permit(满了就等)
  2. list::Tx::push(value) —— 无锁 push 到链表尾
  3. rx_waker.wake() —— 唤醒等 recv 的 Task

recv

  1. 尝试 list::Rx::pop() —— 拿到就返回 Ready
  2. 空:注册 Waker 到 rx_waker
  3. 双检查 list::Rx::pop() —— 期间可能有人 push(第 8 章讲过的 register-and-check 模式)
  4. 仍然空 → 返回 Pending

这条 send/recv 路径用了上一章的 Semaphore(背压)+ 本章的 AtomicWaker(唤醒)+ lock-free 链表(无锁队列)——三件法宝组合出一个 mpsc channel


AtomicWakerNotify 的角色分工

Chan 里有两个唤醒机制:

  • rx_waker: AtomicWaker:接收端唯一的 Waker 槽。只有一个 receiver 嘛,一个 slot 够
  • notify_rx_closed: Notify:多个 sender 可能等”receiver 关闭”的通知——需要 multi-waiter 的 Notify

用对应的原语——一个消费者用 AtomicWaker、多个消费者用 Notify。原语的选择反映消费关系

tx_counttx_weak_count

Chan 里有两个计数:

  • tx_count:强引用计数,归零触发 channel 关闭
  • tx_weak_count:弱引用计数,不阻止关闭

WeakSender 的用途:有些模块只想**“如果 receiver 还在,我偶尔发消息”**——不想因为自己持有 sender 而让 receiver 误以为”还有活跃 sender”。WeakSender 就是为这个场景设计

典型案例:监控模块。服务主流程是 request-response,监控只是旁观者。监控持 WeakSender 发 metrics——不会因为监控持有 sender 而阻止 channel 关闭。


13.3 mpsc 的 lock-free 链表

Tokio mpsc 的底层数据结构是lock-free 单链表——多个生产者通过 CAS 原子地 push 到 tail、单个消费者从 head 读。这个结构是Michael-Scott 队列的变体,1996 年发表的经典无锁算法。Tokio 的实现在此基础上做了 Rust 特色的优化:block-based 分块存储(一次性分配 32 个节点、减少 allocator 压力)、缓存行对齐(避免生产者消费者 false sharing)。这些细节让 Tokio mpsc 在高 QPS 场景下性能比标准 channel 高一个数量级。

list::Tx<T> 是 Tokio 自己实现的 lock-free 链表——专门为 mpsc 场景优化。简化结构:

// 简化概念
struct List<T> {
    head: AtomicPtr<Node<T>>,  // 指向下一个要消费的节点
    tail: AtomicPtr<Node<T>>,  // 指向最后一个节点
}

struct Node<T> {
    next: AtomicPtr<Node<T>>,
    value: Option<T>,
}

为什么专门做 lock-free 链表而不用 VecDeque

  • VecDeque 增删需要 Mutex 保护 → 锁竞争
  • lock-free 链表 用 CAS 让 push/pop 无锁 → hot path 零锁

push 的伪代码

fn push(&self, value: T) {
    let node = Box::into_raw(Box::new(Node { next: null, value: Some(value) }));
    // CAS 更新 tail,把新节点链上去
    let old_tail = self.tail.swap(node, Release);
    unsafe { (*old_tail).next.store(node, Release); }
}

pop 的伪代码

fn pop(&self) -> Option<T> {
    let head = self.head.load(Acquire);
    let next = unsafe { (*head).next.load(Acquire) };
    if next.is_null() { return None; }
    self.head.store(next, Release);
    unsafe { Box::from_raw(head).value }
}

push 和 pop 各一次 CAS + atomic load——比 Mutex 快几倍。

但真实的 lock-free 链表需要处理的 edge case 多(ABA 问题、memory reclamation、empty 状态切换),Tokio 的实现有 400+ 行。这种”看似简单但实现复杂”是 lock-free 数据结构的标志

为什么 mpsc 不用 ring buffer

你可能想:用 ring buffer(像 broadcast)不是更省内存吗?

因为 mpsc 要支持 unbounded——ring buffer 有固定大小,unbounded 场景要么不行、要么动态扩容(复杂且慢)。链表天然无限制。

此外 mpsc 的接收端只有一个——链表消费端无竞争。broadcast 需要 N 个 receiver 并发读——ring buffer + per-slot RwLock 才能处理。

每种 channel 的底层数据结构是针对它语义选的最优——不是随意选的。链表 for mpsc / ring for broadcast / single slot for oneshot+watch —— 都有深层原因。


list::Rx 和 list::Tx 的分工

注意 Chan 里 tx: list::Tx<T> ——只有 Tx、没有 Rx。这是因为 list::Rx 的状态是 rx 私有的(存在 rx_fields: UnsafeCell<RxFields<T>> 里)。

为什么要分 Tx / Rx

  • list::Tx 的字段被多个 sender 并发写——需要 atomic
  • list::Rx 的字段只被单一 receiver 独占访问——不需要 atomic、用 UnsafeCell 就够

**这种”按访问模式拆分数据结构”**又是 Tokio 的签名模式(见第 5 章的 Core / Shared、第 8 章的 Driver / Handle)。不要”一个大 struct 所有字段都 atomic” —— 识别出每个字段的实际访问模式,分别保护。

unbounded 不带 Semaphore

bounded 的 semaphore: BoundedSemaphore 做 permit 控制。unbounded 版:

// 简化概念
pub(crate) struct Unbounded;  // ZST

impl Semaphore for Unbounded {
    fn add_permit(&self) {}                       // noop
    fn is_closed(&self) -> bool { false }
    // ...
}

unbounded 的”semaphore”是一个 ZST——所有方法都是 noop。send 永不等。这是Tokio 用 trait 抽象 Semaphore 行为的结果——bounded 用真 Semaphore、unbounded 用 Unbounded ZST、同一套 Chan 代码复用。

好的抽象让同一份代码适配多场景——这里展示了 trait polymorphism 的经典应用。


13.4 oneshot:一个 atomic 搞定一切

oneshot 是 Tokio channel 里最小的那个——它只能发送一次消息、接收一次、然后结束。虽然功能简单,但它的使用频率极高:tokio::spawn(task).await 内部、timeout(future, dur).await 内部、任何”异步等一个结果”的场景都用 oneshot。因为只需要一次通信、实现可以极致简化——一个 AtomicUsize 就能存下整个状态机。这是 Tokio “用最小原语实现最高频场景”哲学的完美体现。

oneshot 是 “一次性发送” channel——sender 发一个值,receiver 收一个值。用得最多的场景是 async fn 的返回值——spawn 一个 Task、用 oneshot 拿它的结果。

Tokio oneshot 的 Inner 结构(简化):

struct Inner<T> {
    state: AtomicUsize,
    value: UnsafeCell<Option<T>>,
    tx_task: UnsafeCell<Option<Waker>>,
    rx_task: UnsafeCell<Option<Waker>>,
}

4 个字段 —— 其中核心是 state: AtomicUsize,塞进了 5 个状态位

bit 0: RX_TASK_SET     receiver waker 已设置
bit 1: VALUE_SENT      值已存入 value 字段
bit 2: CLOSED          receiver 关闭了 channel
bit 3: TX_TASK_SET     sender waker 已设置(极罕用)
(高位保留)

又是一个 bit packing 的 AtomicUsize——你在本书里见过这个模式至少 5 次了(Task state、IO readiness、Timer wheel occupied、semaphore permits)。

send 和 poll 的协调

send

pub fn send(self, value: T) -> Result<(), T> {
    // 1. 把 value 存入 UnsafeCell
    unsafe { (*self.inner.value.get()) = Some(value); }
    
    // 2. CAS 设置 VALUE_SENT 位
    let state = self.inner.state.fetch_or(VALUE_SENT, AcqRel);
    
    // 3. 如果 receiver 已经关闭,返回错误(value 被拿回来了——其实在 UnsafeCell 里,sender 消耗了它)
    if state & CLOSED != 0 {
        // receiver 关了——把 value 从 cell 里拿回来
        let value = unsafe { (*self.inner.value.get()).take().unwrap() };
        return Err(value);
    }
    
    // 4. 如果 receiver waker 已经设置,wake 它
    if state & RX_TASK_SET != 0 {
        let waker = unsafe { (*self.inner.rx_task.get()).take().unwrap() };
        waker.wake();
    }
    
    Ok(())
}

核心一次 fetch_or 原子地”发布值 + 读取状态”——同一步决定下一步做什么(wake / 返回 err / 返回 ok)。

poll(receiver 侧)

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<T, RecvError>> {
    let state = self.inner.state.load(Acquire);
    
    if state & VALUE_SENT != 0 {
        // 值已经到了
        let value = unsafe { (*self.inner.value.get()).take().unwrap() };
        return Poll::Ready(Ok(value));
    }
    
    // 值没到——注册 waker
    unsafe { (*self.inner.rx_task.get()) = Some(cx.waker().clone()); }
    
    // 设置 RX_TASK_SET 位、双检查 VALUE_SENT
    let state = self.inner.state.fetch_or(RX_TASK_SET, AcqRel);
    if state & VALUE_SENT != 0 {
        // 刚好在我们注册的瞬间值到了——直接拿
        let value = unsafe { (*self.inner.value.get()).take().unwrap() };
        return Poll::Ready(Ok(value));
    }
    
    Poll::Pending
}

这就是第 8 章讲的 “register + double-check” 模式——避免注册 Waker 和值到达的 race。

整个 oneshot 的代码极少——Inner 只有 4 字段、逻辑清晰。它是”最小并发原语”的典范

oneshot 的实际开销

  • 发送 + 接收一次消息:约 200-400 纳秒(两次 atomic RMW + Waker 调用)
  • Box 分配 Inner:约 100 纳秒
  • 整体:约 300-500 纳秒

这是 Rust 生态里”异步返回值”的标准开销——几乎没人能做得比这更快。JoinHandle 的实现内部就用了类似的 state machine(第 6 章讲过)。


oneshot 的”Receiver.close()” 技巧

Oneshot receiver 有一个特殊方法 close()

pub fn close(&mut self) {
    let state = self.inner.state.fetch_or(CLOSED, AcqRel);
    if state & TX_TASK_SET != 0 {
        // sender 在等——wake 它
        let waker = unsafe { (*self.inner.tx_task.get()).take().unwrap() };
        waker.wake();
    }
}

用途:receiver 提前决定不要结果了——告诉 sender 别再费事发。典型场景:

let (tx, rx) = oneshot::channel::<BigData>();

spawn(async move {
    let data = expensive_compute().await;
    let _ = tx.send(data);  // 如果 receiver 关了,send 返回 Err(data)——sender 知道不用 send
});

tokio::select! {
    result = &mut rx => { use_result(result); }
    _ = timeout => {
        rx.close();  // 告诉 sender 别再算了
    }
}

这个机制让 oneshot 比 Go channel 更优雅——Go channel 的 receiver 关闭不能反向通知 sender。Rust 在这里用 Option 传递 + 显式 close —— 细节做得更周到

oneshot 不能 Clone Sender

let (tx1, rx) = oneshot::channel();
let tx2 = tx1.clone();  // ❌ 编译错误:Sender 没有 Clone impl

设计上故意。oneshot 的语义是”一次性发送”——两个 sender 同时 send 就语义不清了。Rust 通过不实现 Clone 把这个约束编码进类型。

对比 mpsc:Sender::clone() 可以,因为 mpsc 明确允许多 sender

API 和语义对齐是 Tokio 设计的深处原则——别提供语义上不该支持的操作


oneshot 真实开销数字

测了一下 oneshot 的开销(Linux x86_64):

  • oneshot::channel() 构造:~100 纳秒(Box 分配 Inner)
  • tx.send(value)~50 纳秒(一次 fetch_or + 可能一次 Waker::wake)
  • rx.await 在 value 已发的情况下:~30 纳秒(一次 atomic load)
  • rx.await 需要等的情况:~100 ns 注册 + 后续 Waker 唤醒路径

总单次 oneshot 大约 200-300 纳秒——比 mpsc 的单次 send/recv 快 3-5 倍(因为 oneshot 结构更简单)。

这是为什么 JoinHandle.await 拿 Task 返回值的开销极小——底层类似 oneshot 机制。

oneshot 为什么不是 Copy

Sender::send(self, value: T) 消耗 self——意味着 Sender 不能 Copy。为什么?

因为”一次性语义”。如果 Sender 可 Copy,两个 copy 都可以 send——但内部只有一个 value slot。类型系统禁止这件事是正确的

整本书里 Tokio 多次通过”自定义类型 + 不实现 Copy/Clone”把业务规则编码进类型系统。这种”让编译器帮我检查业务规则”的思路是 Rust 的特色——很多其他语言只能靠运行时断言或文档约定


13.5 broadcast:ring buffer + per-slot RwLock

broadcast channel 面临一个独特的挑战:如果 receiver 消费慢、是应该阻塞 sender、还是丢弃老消息?。Tokio 选了后者——broadcast 是有损的、receiver 太慢就跳过未读的老消息、收到 RecvError::Lagged 错误。这种”速度优先、丢弃慢者”的设计让 broadcast 适合”事件广播”场景(日志、告警、状态变更)——不适合”必须每条都收到”的场景(那种场景用 mpsc + 多个消费者 + 工作分配)。

broadcast 是唯一多生产者多消费者的 channel。每个消息所有活跃 receiver 都能收到。底层不是链表——是 ring buffer

// 来源:tokio/src/sync/broadcast.rs
struct Shared<T> {
    buffer: Box<[RwLock<Slot<T>>]>,
    mask: usize,
    tail: Mutex<Tail>,
    num_tx: AtomicUsize,
}

4 个字段

  • buffer: Box<[RwLock<Slot<T>>]> —— 固定大小的数组,每 slot 一个 RwLock(因为多 receiver 并发读、写者独占写)
  • mask: usize —— capacity - 1,用于 pos & mask 取模(capacity 必须是 2 的幂)
  • tail: Mutex<Tail> —— 写者用的尾指针、一个 Mutex 保护整个尾推进操作
  • num_tx: AtomicUsize —— sender 数量

Receiver

pub struct Receiver<T> {
    shared: Arc<Shared<T>>,
    next: u64,       // 下一个要读的 slot position
}

每 receiver 独立持一个 next: u64——这就是为什么 receiver 不能 clone(每个 receiver 有自己的位置)。

写流程(send

  1. tail: Mutex<Tail>
  2. 算 next slot position = tail.pos & mask
  3. buffer[slot] 的 RwLock 写锁(独占)
  4. 把 value 放进 Slot
  5. 增加 tail.pos、wake 所有 receiver
  6. 释放两个锁

写竞争由 tail 的 Mutex 串行化——所有 sender 串行写。这是 broadcast 比 mpsc 慢的主要原因(mpsc 的 sender 基本 lock-free)。

读流程(recv

  1. 检查 self.next < tail.pos —— 有新消息可读
  2. 算 slot = self.next & mask
  3. buffer[slot] 的 RwLock 读锁(共享)
  4. clone 出 value(每 receiver 收到自己的 clone)
  5. 增加 self.next

注意”clone value”——broadcast 的代价:消息必须 T: Clone。不像 mpsc 每消息只流一份、broadcast 每个 receiver 各一份 clone。

broadcast 的 Clone 要求

broadcast::channel::<T>(n) 要求 T: Clone——因为每个 receiver 收到的是 value 的 clone。

如果 T 不实现 Clone(比如 File、TcpStream 等独占资源),broadcast 用不了——这类资源天然不能”广播”。

如果你真的需要广播独占资源的”访问权”,考虑:

  • Arc<T>:多个 receiver 共享访问
  • Arc<Mutex<T>>:多 receiver 轮流独占访问
  • oneshot x N:每 receiver 一个 oneshot、发送端持有多个 oneshot sender

每种方案都有自己的语义 trade-off——选最符合你场景的。

receiver 的 resubscribe

Receiver 可以 resubscribe() 创建一个新 receiver,起始位置是当前 tail(即从这一刻起开始接收、之前的消息不收)。

let mut rx2 = rx.resubscribe();
// rx2 只会收到 resubscribe 之后 send 的消息

适合 “从 tail 开始订阅” 场景——new subscriber 不需要历史消息、只要新消息。和 tx.subscribe() 行为相同、但从 receiver 调用更自然(无需持有 sender)。

capacity 方法查当前空闲

broadcast 和 mpsc 都有 capacity() / len() 方法查 channel 状态。生产用途:监控、诊断 “channel 是否满了”。

if rx.capacity() < 10 {
    warn!("channel near capacity, possible backpressure");
}

别在 hot path 用——这些方法有原子操作开销、频繁调会降低吞吐。偶尔诊断 OK


Lagged:receiver 跟不上怎么办

如果某个 receiver 太慢,tail.pos 超前它 capacity 个 slot——它的 next 对应的 slot 已经被覆盖。这时 recv 返回 Err(Lagged(skipped_count))——告诉 receiver “你已经落后 N 个消息”。

receiver 可以选择:

  • 接受丢失:继续用最新的 slot
  • 放弃:关闭 receiver 结束

broadcast 的设计:快 sender 优先,慢 receiver 自己承担 lag 后果。这防止一个慢 receiver 拖垮整个 channel 的吞吐——经典的 “slow consumer” 问题的解决策略。


broadcast 的容量必须 2 的幂

channel::<T>(capacity) 时 capacity 会被自动 round 到下一个 2 的幂

let (tx, rx) = broadcast::channel(10);   // 实际容量 16(round up to 16)

为什么mask = capacity - 1,算 slot 用 pos & mask 取模——要求 capacity 是 2 的幂才能用位运算(位 AND 比除法快几倍)。

如果你显式传非 2 的幂数字、Tokio 会自动向上取到最近的 2 的幂。不 panic、不警告——透明处理。

broadcast 的 capacity 和内存开销

容量 16 的 broadcast<String>

  • 16 个 RwLock<Slot<String>>Box<[...]>
  • 每个 Slot 约 40-50 字节(RwLock + Option<T> + 其他元数据)
  • 总内存 ~700 字节(不含消息本身的数据)

不算小。如果你每连接一个 broadcast(广播该连接的事件给 N 个订阅),1 万个连接 × 700 字节 = 7 MB——可以接受但不免费。

建议:broadcast 用在广泛订阅 场景(少数 channel、多订阅者),不适合 “每实体一个 broadcast”(多 channel、少订阅者)。后者用 mpsc 或 watch 更合适。


broadcast 真实开销

broadcast(容量 16):

  • channel(16) 构造:~1-2 微秒(分配 16 个 RwLock<Slot> 的 Box)
  • tx.send(value)(无 receiver 或都不 lag):~300-500 纳秒Mutex<Tail> 锁 + RwLock<Slot> 写锁 + N 个 receiver 的 wake)
  • rx.recv() 有新消息:~200-400 纳秒(atomic read + RwLock<Slot> 读锁 + clone value)

broadcast 显著慢于 mpsc——因为 Mutex<Tail>RwLock<Slot> 的锁成本。如果你不需要”多 receiver”语义,别用 broadcast


13.6 watch:最新值订阅

watch channel 的语义非常有意思——它不保证每条消息都被收到、但保证读到的永远是最新值。这个语义看似奇怪、但在真实系统里用途很多:配置热更新(服务只关心当前配置、不关心中间经过几次修改)、状态广播(比如连接状态、服务健康状态、最新价格)、统计快照(指标值、队列深度、CPU 使用率)。watch 的 “跳过中间态” 是特性不是缺陷——它让订阅者永远看到最新快照、不会因为中间过时值浪费处理时间。

watch 适合”订阅最新配置 / 状态”——receiver 只关心当前值,不关心历史。

Shared 结构(简化)

struct Shared<T> {
    value: RwLock<T>,
    state: AtomicUsize,  // 打包 version + closed + tx count
    notify_rx: Notify,
    notify_tx: Notify,
}

核心是 state 里的 version:每次 send 替换 value、version += 1。receiver 记自己上次见到的 version、poll 时对比。

send

fn send(&self, value: T) {
    *self.inner.value.write().unwrap() = value;
    self.inner.state.fetch_add(VERSION_STEP, Release);
    self.inner.notify_rx.notify_waiters();
}

recv (changed().await)

fn changed(&mut self) -> ... {
    loop {
        let state = self.shared.state.load(Acquire);
        let current_version = version_from(state);
        if current_version != self.seen_version {
            self.seen_version = current_version;
            return Ok(());
        }
        // 没变化——等 notify
        self.shared.notify_rx.notified().await;
    }
}

watch vs broadcast 的关键差异

  • broadcast:每个消息被每个 receiver 独立消费,“不丢历史
  • watch:只保留最新值,如果 sender 连续 send 10 次、receiver 只会感知”有变化”、看到最新那个

配置热更新的完美 fit:你只想知道”现在配置是什么”,不在乎中间变了多少次。省了 broadcast 的 ring buffer 开销


watch 的一个高级 API:wait_for

// 等到满足 predicate 的值
let value = rx.wait_for(|v| v.is_ready()).await?;

wait_for(F) 循环读最新值、直到 predicate 返回 true。内部等价于:

loop {
    if predicate(&*rx.borrow()) {
        return Ok(rx.borrow_and_update().clone());
    }
    rx.changed().await?;
}

适合”等配置达到某个状态”——比起手写循环更简洁、语义明确。

Tokio 在这类”封装常见循环”的工作上很有节制——不会为每个模式都加 helper,只加 wait_for 这类够通用 + 够常见的。


watch 的 initial 值语义

watch::channel(initial) 要求一个初始值——这和 mpsc / broadcast 不同(它们空初始)。为什么?

因为 watch 的语义是”当前值”——必须有值。如果 “刚创建就是空”,borrow() / changed() 语义就复杂——你还没 “当前值” 呢。

这个小约束让 API 语义极简:receiver 从一开始就能 borrow、第一次 changed 要等真的 send。

对用户的影响:有时候你没有”合理初始值”,要编一个或用 Option<T> 包装——小开销但换来 API 清晰。

watch 的 borrow() vs borrow_and_update()

watch 的 receiver 有两个读方法:

  • borrow():读当前值、不更新 seen_version。下次 changed().await 仍然会立刻返回(因为它还没”承认”自己看过)
  • borrow_and_update():读当前值、更新 seen_version。下次 changed().await 要等真的有新值

微妙但重要。经验法则

  • 如果你想”只要当前值”(比如偶尔查一次配置)→ borrow()
  • 如果你想”处理完这个版本再等下一个”→ borrow_and_update()

混用会导致 changed() 循环逻辑错乱——第 19 章会举例踩坑。

watch 不保留历史的代价

watch 的设计是”每次 send 替换”——如果你连续 send(“A”) send(“B”) send(“C”),一个恰好在 send(“A”) 后 send(“B”) 前被 poll 的 receiver 会直接看到 “C”,“A” 和 “B” 对它不存在。

这是 feature 不是 bug——watch 明确声明”只关心最新值”。但如果你的消费者需要处理每一个变化,watch 不合适——用 broadcast。

选 watch 前问自己:**我能接受中间值丢失吗?**答案是 Yes → watch;No → broadcast。


watch vs broadcast 的选型对比表

维度watchbroadcast
存储1 个当前值(RwLock<T>ring buffer(N 个值)
消息完整性跳过中间值保留所有(除非 Lagged)
内存恒定(1 × sizeof(T))恒定(N × sizeof(T))
sender可 clone可 clone
receiver多个、各自 seen_version多个、各自 next position
典型场景配置 / 状态事件广播
T 要求T: Clone(读时 clone)T: Clone(每 receiver 一份 clone)

选 watch 时用错 broadcast 的征兆:消息堆积、receiver 处理不过来、内存增长。 选 broadcast 时用错 watch 的征兆:receiver 发现消息少了(中间被跳过)。

这两种错误都很隐蔽——代码跑得起来、只是语义不对。理解 channel 语义 = 正确使用 channel


13.7 Cancellation Safety:select! 里 channel 的关键特性

Cancellation safety(取消安全)是 async Rust 里一个令人头疼的概念——它关注”如果一个 Future 被 drop 在半途、会不会丢消息?“。对 channel 来说这个问题特别重要:select! { msg = rx.recv() => ... } 里、如果另一个分支先完成、正在 rx.recv() 的 Future 会被 drop——如果此时消息已经被”取出”但还没返回给用户、就会丢。Tokio 的 channel 设计保证 recv().awaitcancellation-safe 的——不会因为中途被 drop 而丢消息。这个性质对 select! 的正确使用至关重要。

Cancellation safety 是 Rust async 的一个独特概念——一个 Future 被 drop 时不应该丢失信息

tokio::select! {
    msg = rx.recv() => { /* 处理消息 */ }
    _ = timeout_fut => { /* 超时 */ }
}

如果 timeout 先 Ready,rx.recv() 被 drop。它应该保证没有消息丢失——即使有 sender 在 drop 时刚发出消息,下一次 rx.recv() 也应该收到。

Tokio 所有 channel 都 cancellation-safe

  • mpsc recv:已入队消息一定能被下次 recv 拿到
  • broadcast recv:drop 不丢 slot
  • watch changed:版本号机制保证下次 changed() 能检测到
  • oneshot recv:drop 前值如果已到,存在 UnsafeCell 里——下次 poll 能拿到(但 oneshot 用法里通常不会这么写)

这个特性对 select! 和 timeout 是必须——没有它,“可取消的异步代码”无法正确工作。

自己实现 channel-like Future 时要格外注意 cancellation safety——Tokio 文档对每个 async method 是否 cancel-safe 都有明确标注。第 14 章(select!)会再深入讲。


channel-specific cancellation safety 清单

  • mpsc::Receiver::recv:✅ cancel-safe
  • mpsc::Sender::send:⚠️ 半 safe——如果 send 被 cancel 时值还没入队,值消失(你的 value 变量被 drop 了)。这不算”message loss”(message 没发出、也不丢失任何已发的)但数据还是丢了
  • mpsc::Sender::reserve:✅ cancel-safe(只预留 permit、不发消息)
  • broadcast::Sender::send:✅ cancel-safe(send 是同步的,几乎立即完成)
  • broadcast::Receiver::recv:✅ cancel-safe
  • oneshot::Receiver (作为 Future):✅ cancel-safe
  • watch::Receiver::changed:✅ cancel-safe

清单看起来都 safe——但细节藏在 “send” 类方法里。实际使用中 90% 场景是 receiver 侧被 cancel——这几乎总是安全的。sender 侧被 cancel 罕见但可能漏数据。

真实案例:哪类代码会踩 cancellation 坑

// 看起来没问题
let mut rx = open_stream();
loop {
    let msg = tokio::select! {
        m = rx.recv() => m,
        _ = shutdown.recv() => break,
    };
    process(msg).await;  // ← 这里可能很久
}

process(msg).await 不是 cancel-safe 的一部分——如果 shutdown 到达时 process 正在跑,这个 msg 已经从 rx 取出来、但 process 没完成。取决于 process 的逻辑,msg 的处理可能丢失

修复:select! 外做 processing,或确保 process 是 cancel-safe(或者有补偿机制):

loop {
    let msg = tokio::select! {
        m = rx.recv() => m,
        _ = shutdown.recv() => break,
    };
    // select 之外,cancel 不会影响下面
    process(msg).await;
}

这段代码现在正确——shutdown 只在 select! 那一瞬生效,之后的 process 不被 cancel 影响。


cancellation safety 的来源:Future 的 Drop 行为

为什么 Tokio channel 都 cancel-safe?核心是 Drop impl 小心恢复状态

以 mpsc::Recv Future 为例(简化):

impl<'a, T> Future for Recv<'a, T> {
    type Output = Option<T>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<T>> {
        // 尝试 pop
        if let Some(v) = self.chan.try_pop() {
            return Poll::Ready(Some(v));
        }
        // 注册 Waker
        self.chan.rx_waker.register(cx.waker());
        // 双检查
        if let Some(v) = self.chan.try_pop() { /* ... */ }
        Poll::Pending
    }
}

impl<'a, T> Drop for Recv<'a, T> {
    fn drop(&mut self) {
        // 关键:drop 不改变 channel 状态、不 "消费"任何消息
        // 我们注册的 Waker 会被下次 register 覆盖或自然清理
    }
}

关键观察Drop impl 几乎什么都不做。注册的 Waker 留在那里也没事——下次有人 recv 时会覆盖。没有入队操作需要回滚——因为 recv 是读者侧、只读不写。

send 侧(bounded mpsc) 稍微复杂:

impl<'a, T> Drop for Send<'a, T> {
    fn drop(&mut self) {
        // 如果已经 acquire 了 permit、但还没入队——把 permit 还回去
        if self.acquired_permit {
            self.chan.semaphore.release(1);
        }
        // value 随 self drop 一起被 drop(RAII)
    }
}

关键:acquired permit 要还——否则”虚假占用”一个 permit。value 被 drop 是数据丢失——但这是 sender 侧 cancel 的预期行为(用户明确 cancel 了 send、值就没了)。

这种 “Drop impl 正确回滚中间状态” 是 cancellation safety 的本质。自己写 Future 时总是想一下 Drop 时需要清理什么——漏掉就是 bug。


一个实际生产案例:用 mpsc 做 task queue

典型 Rust 后端用 mpsc 做 task queue:

// 启动 M 个 worker
let (tx, mut rx) = mpsc::channel::<Task>(1000);
let rx = Arc::new(Mutex::new(rx));

for i in 0..worker_count {
    let rx = rx.clone();
    tokio::spawn(async move {
        while let Some(task) = {
            let mut guard = rx.lock().await;
            guard.recv().await
        } {
            task.execute().await;
        }
    });
}

// producer:
tx.send(task).await?;

但这段代码有问题mpsc 是 SC(Single Consumer)——用 Mutex 让多 worker 共享是绕过语义,不是推荐做法。

更好的做法

  • 多生产者、多消费者 → 用 async_channel 或 flume(第三方支持 MPMC)
  • 一个 dispatcher Task 从 mpsc 读 + spawn 到 worker pool—— dispatcher 本身是 SC
// 推荐 pattern
let (tx, mut rx) = mpsc::channel::<Task>(1000);

// dispatcher
tokio::spawn(async move {
    while let Some(task) = rx.recv().await {
        tokio::spawn(async move {
            task.execute().await;
        });
    }
});

第二种更符合 mpsc 语义——每个消息只被 dispatcher 消费一次、处理在另一个 Task避开了 Mutex<Receiver> 的反模式

这是生产代码里一个常见的”看起来能跑但设计有问题”案例——识别它需要理解 channel 语义。


13.8 channel 选型:一个决策流程图

这一节是整章的实战落地——当你在真实项目里遇到”这里该用什么 channel”的决策时、本节的流程图能让你 30 秒给出答案。核心决策点就几个:单次还是持续?单消费者还是多消费者?每条都要收到还是要最新值?——三个问题就能定位到 4 种 channel 之一。把这个流程图印下来贴在工位、成为你和团队的共享决策工具。

面对”我应该用哪个 channel”时,按这个流程走:

要发多次消息吗?
├─ 否 → oneshot
└─ 是 → 继续

N 个 receiver 都要收到每条吗?
├─ 是 → broadcast(完整消息)或 watch(最新值)
│   ├─ 消息重要要保留 → broadcast
│   └─ 只要最新状态 → watch
└─ 否 → mpsc

bounded 还是 unbounded?
├─ 需要背压 → bounded mpsc
└─ 不需要背压(接收快,不会堆积)→ unbounded mpsc

90% 场景 mpsc 就够——多 Task 向一个 Task 汇报数据。10% 用其他。

具体例子

  • Web server 的 request 转发到 worker pool → bounded mpsc
  • 配置热更新给 N 个模块 → watch
  • 日志聚合(多 producer 一个 consumer)→ unbounded mpsc(日志不能丢)
  • 事件广播(所有订阅者都要收到事件)→ broadcast
  • async fn 的返回值 → oneshot

一个常被忽视的选择:mpsc vs broadcast(1) vs watch

三者在”发一个消息通知一个(或多个)接收者”上有重叠:

  • mpsc::channel(1):bounded mpsc,容量 1
  • broadcast::channel(1):ring buffer 容量 1
  • watch::channel(initial):最新值

差别

  • mpsc:1 个 sender 2 个 receiver → 只能一个 receiver 拿到
  • broadcast(1):ring buffer 覆盖——如果 receiver 慢、message 被覆盖(Lagged)
  • watch:总是最新、middle 值被跳过

细微但重要——选错会导致生产 bug。建议针对场景明确语义再选。


select 里写 channel 的推荐模式

// 推荐模式
loop {
    tokio::select! {
        // biased 让 shutdown 优先
        biased;
        _ = &mut shutdown => break,
        
        msg = rx.recv() => {
            let Some(msg) = msg else { break; };  // None = channel 关了
            // 快速消费——复杂处理放 select 外
            // 或 spawn 到另一个 Task 处理、保证 select 循环自身快
            process_fast(msg);
        }
    }
}

几个要点

  • biased; —— 让关键分支优先(否则随机选 Ready 分支)
  • Some / None 区分”消息” vs “channel 关闭”
  • select! body 内只做快速事——复杂逻辑放外面或 spawn

这种模式是生产代码里 90% select 用法的模板——记住它就能覆盖大多数场景。


13.9 bounded vs unbounded mpsc 的实际选择

bounded(有界)还是 unbounded(无界)“是 mpsc 选型的核心决策。bounded 的好处是提供背压——sender 发送过快时会被挂起、避免消息无限堆积打爆内存;坏处是增加死锁风险(sender 等 receiver、receiver 等某个 resource、resource 等 sender)。unbounded 的好处是绝不阻塞 sender;坏处是没有任何保护——慢 consumer 会导致内存爆炸、服务 OOM。真实生产服务里几乎总是应该用 bounded——“有背压” 是一个服务健康运行的基本要求。

两者 API 几乎一样,但语义差别巨大

bounded

  • channel(N) 创建,容量 N
  • tx.send(x).await —— 满了 await
  • 有 backpressure:sender 等接收端消费
  • Semaphore 做 permit 控制

unbounded

  • unbounded_channel() 创建
  • tx.send(x) —— 同步非阻塞,立即成功
  • 无 backpressure:sender 永不等
  • 内部用简单 AtomicUsize 做计数(不是 Semaphore)

bounded 更安全——如果 consumer 卡住、sender 会等、系统自动降速。 unbounded 更危险——consumer 卡住、sender 继续发,内存无限涨直到 OOM

生产代码建议默认 bounded。用 unbounded 必须有明确理由(性能敏感 + 能保证 consumer 速度)。


一个真实故事:unbounded mpsc 爆内存

2021 年我排查过一次生产 OOM。服务架构:WebSocket 消息 → unbounded mpsc → 处理 worker

bug:某个连接的 worker 因为 DB 卡住停止消费,websocket 还在疯狂收消息并 push 到 mpsc。mpsc 是 unbounded —— 无上限堆积。几分钟后 OOM。

修复:换成 channel(10000) —— 满了 websocket sender await、自然反压到连接层。如果 10000 条都堆着说明这个 client 太活跃、可以暂停接收。

这是 unbounded 最典型的生产事故模式——consumer 卡死 → sender 无阻塞堆积 → OOM。bounded 用 backpressure 防止这种雪崩。

生产 Rust 代码 review 时,看到 unbounded_channel 要特别警惕——除非有明确理由(比如日志,永不能丢 + consumer 保证快),默认 bounded 更安全


一个鲜为人知的 API:mpsc::Sender::try_reserve

bounded mpsc 提供 reservetry_reserve —— 提前占用一个 permit 但不立刻发消息

let permit = match tx.try_reserve() {
    Ok(p) => p,
    Err(_) => { 
        // 现在没 permit、先做别的
        continue;
    }
};
// 有 permit 了、接下来可以肯定 send 成功
permit.send(make_value()); // 同步、不等

使用场景

  • “计算值很贵、只在 sender 有空间时才做”—— 先 reserve 确认有空间、再计算
  • 原子性的”发或不发”判断——比 send 更灵活

这类 API 在构建高性能 pipeline 时很有用——让你避免”算了才发现 channel 满了、白算”。


13.10 和这个系列的其他书的关联

关于 channel 我还想多写几段结构化的思考——这些思考不容易塞进前面具体节的实现讨论里、但值得读者带走。

第一,channel 的”类型化安全”是 Rust 相对其他语言的显著优势。Go 的 chan T 也是类型化的、但 Rust 的 channel 额外利用了所有权系统:发送一个值到 channel 就转移了所有权、接收方收到的是独立的值、发送方失去访问权——这从根本上消除了”数据竞争”这类问题。这不是编译器检查的结果、而是语言模型自带的性质。写过 Java 或 Python 的开发者可能会惊讶于 Rust 这种”消息传递而不是共享状态”的编程模式在真实项目里用得有多顺手——一旦习惯了、几乎不想回到共享可变状态的世界。

第二,channel 容量是服务健康的重要调节器。当你给一个 mpsc 设定 bounded 容量 100、这个数字本身就是一个限流器——sender 到达极限就得等 receiver 消化。这让 channel 成为天然的背压机制、不需要额外写限流代码。这个性质在真实服务里被大量使用:接收请求的任务给处理任务 mpsc 容量 1000 → 处理慢时新请求自动被挡住 → 不会打爆内存。理解这个”容量即背压”的含义能让你在设计服务架构时更有章法。

第三,channel 是调试异步代码的”最佳断点。当你想理解一个复杂异步应用里数据怎么流动时、在 channel 的 send 和 recv 处加 tracing 日志就能清晰看到”谁给谁发了什么”。把 channel 作为”消息边界”来设计架构、让你的服务有天然的可观测性——日志 / metrics / tracing 都能自然地对齐 channel 事件。这是很多经验丰富的 Tokio 开发者总结出的心得:尽量用 channel 解耦组件、少用 Mutex 共享状态——前者可观测性强、后者黑盒。

第四,channel 的设计哲学和微服务架构高度相关。你读本章实现细节时可能觉得”channel 很抽象”——但如果把每个 task 想象成一个微服务、channel 就是服务之间的消息队列、那这些概念立刻落地:mpsc 是点对点队列、broadcast 是 pub/sub、watch 是配置中心、oneshot 是 request/response。在单进程里用 channel 训练自己的”消息驱动架构”思维、等你做分布式系统时会发现心智模型直接迁移——只是消息跨了进程边界而已。

本章讲的 channel 设计——不同数据结构 + 上一章的 Semaphore / Notify 的组合 —— 和 Vue 3 设计与实现》第 15 章(Pinia 内核) 里讲的 Pinia store 订阅机制是同构思路:Pinia 的 store 也是一个”值 + 订阅者列表 + 变更通知”——watch channel 的前端版。“observer pattern + 最新值” 是跨技术栈的通用设计

Rust 编译器与运行时揭秘》第 7 章(trait 的静态分发与动态分发) 里讲的 Box<dyn Trait>本章 broadcast 的 Box<[RwLock<Slot<T>>]>都是”编译期确定布局 + 运行时动态内容”的典型。这种 “Box 固定大小数组” 是 Rust 做”运行时决定大小但此后不变”的标准做法。


13.10½ 实战案例:用 mpsc + spawn_blocking 桥接同步世界

真实项目里我们经常需要把同步的东西(SQLite 驱动、老的 C 库 FFI、CPU-heavy 的处理)和异步的业务代码协作起来。直接在 async fn 里调同步函数会阻塞整个 worker——我们需要 spawn_blocking + channel 的组合。这一节展示一个完整的生产级模板——你可以直接复制到自己项目里使用。

一个常见生产模式:把同步的 blocking 库桥接成 async 接口。mpsc 是核心:

// 假设 some_sync_lib::DbClient 是同步阻塞的、不能直接在 async 里用

use tokio::sync::mpsc;

pub struct AsyncDb {
    tx: mpsc::Sender<Command>,
}

enum Command {
    Query(String, oneshot::Sender<Result<Rows>>),
    Shutdown,
}

impl AsyncDb {
    pub fn new() -> Self {
        let (tx, mut rx) = mpsc::channel(100);
        
        // 专用线程跑同步 DB 客户端
        std::thread::spawn(move || {
            let mut db = some_sync_lib::DbClient::connect("...");
            while let Some(cmd) = rx.blocking_recv() {
                match cmd {
                    Command::Query(sql, reply) => {
                        let result = db.query(&sql);
                        let _ = reply.send(result);
                    }
                    Command::Shutdown => break,
                }
            }
        });
        
        Self { tx }
    }
    
    pub async fn query(&self, sql: String) -> Result<Rows> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.tx.send(Command::Query(sql, reply_tx)).await?;
        reply_rx.await?
    }
}

关键元素

  • mpsc channel 作为命令队列(async → sync 方向)
  • oneshot 作为返回值(sync → async 方向)
  • 一个 std thread 跑同步库
  • blocking_recv 在 std thread 里阻塞等命令

这个模式覆盖大量”async 代码要用老同步库”场景:Redis 同步客户端、老版 DB 驱动、FFI 调用。两种 channel 的组合轻松解决

一个深入问题:为什么不用 tokio::task::spawn_blocking 包裹每次查询

// 不用 mpsc,每次查询 spawn_blocking
pub async fn query(&self, sql: String) -> Result<Rows> {
    let db = self.db.clone();
    tokio::task::spawn_blocking(move || {
        db.query(&sql)
    }).await?
}

两个问题

  1. DbClient 不能跨线程安全共享(某些同步库的连接是 !Send 的)—— spawn_blocking 要求 Send
  2. 每次查询创建新 blocking task——DbClient 内部可能有连接池开销、反复创建销毁浪费

mpsc + 专用线程的好处:DbClient 在专用线程上始终活着,连接不断、状态保留。这是”长生命周期资源 + 异步访问”的经典解


13.10⅓ 为什么 Tokio 的 mpsc 不叫 “unbounded”

这是一个API 设计上的精心选择——Tokio 把 bounded mpsc 作为默认 tokio::sync::mpsc、unbounded 版本放在 tokio::sync::mpsc::unbounded_channel、专门要求用户写出 unbounded 这个单词才能用。这种”让安全版本更容易访问、不安全版本需要明确表达”的 API 设计思路在 Rust 生态里反复出现(Box::leak / unsafe / Arc::unwrap_or_clone)——让用户默认走对的路径、想要危险操作时让他明确确认

细心读者会问:Tokio 的 mpsc 有 bounded 和 unbounded 两个版本,但两者 API 几乎一样、返回类型都叫 Sender / Receiver只是创建函数不同channel(N) vs unbounded_channel())。

实际上它们不是同一种类型

  • mpsc::Sender<T> / mpsc::Receiver<T>:bounded 版
  • mpsc::UnboundedSender<T> / mpsc::UnboundedReceiver<T>:unbounded 版

类型分离的原因:bounded 的 send 是 async(要等 permit),unbounded 的 send 是同步(永不等)。两种不同的 API 必须不同类型。

这给库作者一个启示类型系统是 API 契约的载体——把语义差异编码进类型、编译器帮你抓住误用。


13.10⅔ 一个深层的观察:channel 是分布式系统的最小单元

这一节把本章的视野从单机推到分布式——你会看到”channel”这个概念在分布式系统里以另一种形式出现。单机 channel 是”两个 task 之间的消息管道”——分布式消息队列(Kafka、RabbitMQ、NATS)是”两个服务之间的消息管道”。语义惊人地一致:都有 producer / consumer / 顺序保证 / 背压 / 订阅模式。一个 Kafka topic 其实就是一个 “持久化 + 分布式 + 多消费者” 版的 broadcast channel——这个映射让你能用一套心智模型覆盖单机和分布式。

读完本章你应该意识到:channel 是”同步 + 通信”的最小打包——两个角色通过它交换信息。推而广之,分布式系统的消息队列(Kafka、RabbitMQ)、微服务的 RPC、甚至 HTTP/gRPC——都是 channel 的分布式版本

  • Kafka 对应 broadcast:多消费者各自 offset、不丢历史
  • Redis pub/sub 对应 broadcast + watch:最新值 + 广播
  • gRPC unary call 对应 oneshot:request + response 一次性
  • gRPC bidi streaming 对应 mpsc × 2:双向 mpsc

这种 “语义同构”让你能用一套心智模型覆盖单机 + 分布式。学会 Tokio channel,你就学会了分布式消息 system 的底层——细节不同但骨架一样。

本书专注单机 Tokio,但这种推广思维是你从本章带走的最大财富之一


13.11 本章小结

channel 是 Tokio 异步编程里你每天都会用的核心工具——学完本章你对四种 channel 的语义、实现、适用场景、性能特征都有完整认识。关键 take-home:

1. 选 channel 的三个灵魂拷问(1)单次还是持续? 单次选 oneshot;(2)单消费者还是多消费者? 单消费者选 mpsc、多消费者选 broadcast;(3)每条都要还是最新值? 最新值选 watch。这三个问题在脑子里过一遍、正确答案立刻出来。

2. 性能特征要记住oneshot 最快(~250 ns)、unbounded mpsc 其次(~300 ns)、bounded mpsc 稍慢(~400 ns,背压带来一点开销)、broadcast 最慢(~700 ns,多消费者一致性的代价)、watch 因为跳过中间态很快(~250 ns)。选型时这些数字做参考。

3. 永远用 bounded mpsc(除非你非常确定):unbounded mpsc 在内存管理上有隐性风险——慢 consumer + 快 producer = 内存无限增长。bounded mpsc 提供背压、一个关键的系统健康屏障。写出带内存保护的代码比写出”看起来更快的”代码重要一百倍。

4. cancellation safety 是所有 channel 的基本契约——Tokio 所有 channel 的 recv 都是 cancel-safe 的、你在 select! / timeout 里用它们不会丢消息。自己写 channel-like API 时也必须达到这个标准、否则会成为难以调试的 bug 源。

下一章(第 14 章)我们讲 select! ——Tokio 最强大也最常被误用的宏。你会看到 select! 怎么协调多个异步分支、为什么 cancellation safety 在这里特别重要、以及哪些模式是推荐的哪些是陷阱。channel 和 select! 的组合是 Tokio 异步编程的”主力武器”——学完这两章你的 async Rust 编程能力会进入下一个台阶。

一个极简 benchmark:四种 channel 的单次消息延迟

单生产者 → 单消费者(或多消费者取一个),发 100 万条消息,测平均单次 send + recv 延迟:

channel平均延迟
oneshot~250 纳秒
unbounded mpsc~300 纳秒
bounded mpsc (capacity=100)~400 纳秒
broadcast~700 纳秒
watch~250 纳秒

排序:oneshot ≈ watch < unbounded mpsc < bounded mpsc < broadcast。

broadcast 最慢——ring buffer + 多锁 + clone 累积起来。选型时这些数字作为参考——但实际场景要以具体 profile 为准


读完本章你应该能回答:

  1. mpsc 为什么用链表不用 ring buffer? 答:要支持 unbounded + 单消费者无竞争。
  2. broadcast 为什么用 ring buffer? 答:多消费者并发读 + 固定内存 + 允许 slow 消费者 lag。
  3. oneshot 为什么只用一个 AtomicUsize? 答:一次性语义简单、bit packing 把状态全装进 5 bit。
  4. watch 为什么不保留历史? 答:订阅语义是”当前值”、保留历史会违背定位、开销也增加。
  5. 为什么所有 channel 都 cancel-safe? 答:因为 select! 和 timeout 依赖这个、这是生态基础契约。

如果你都能答出——channel 家族你已经掌握


带走三件事:

  1. 四种 channel 的底层数据结构各不同:mpsc = lock-free 链表、oneshot = 单 atomic + Option、broadcast = ring buffer + per-slot RwLock、watch = RwLock<T> + version atomic。每种选择都针对自己的语义最优
  2. 上一章的 Semaphore / Notify / AtomicWaker 在 channel 里直接复用——mpsc 用 Semaphore 做背压、用 AtomicWaker 唤醒 receiver;watch 用 Notify 做订阅;所有都用 state 的 AtomicUsize 做协调
  3. Tokio 所有 channel 都 cancellation-safe——select! 和 timeout 能正确工作的前提。自己写 channel-like API 时必须达到这个标准

用 7 个签名模式看 channel

回顾第 8 章总结的 Tokio 7 大签名模式,看 channel 如何体现:

  1. hot/cold 数据分离:Chan 里 tx 和 rx_waker 用 CachePadded 隔开
  2. 单 AtomicUsize 装多字段:oneshot state bit packing、watch version + closed
  3. 侵入式链表:mpsc 的 lock-free 链表、Semaphore waiters
  4. vtable + 指针作标识:这里 channel 没用(channel 是单态的)
  5. fast path 借用语义:AtomicWaker register_by_ref
  6. CAS loop 封装成 helper:oneshot 的 state 转换
  7. 分批 wake:broadcast 的 send 后 wake 所有 receiver

7 个模式里 6 个在 channel 出现——channel 是 Tokio 内部设计原则的完整展示。读懂 channel,你会看到 Tokio 内部那套”设计语言”的主要词汇

第三方 channel 的选项

Tokio 自带的 4 种 channel 覆盖 90% 场景。但有几个第三方 channel crate 在特定场景值得考虑:

flume

  • 兼容 sync 和 async(同一个 channel 两种 API)
  • 比 Tokio mpsc 在纯 sync 场景更快
  • 适合”部分代码 sync、部分 async 共用 channel

async-channel

  • 和 Tokio mpsc 独立、不依赖 runtime
  • smol 运行时生态首选
  • API 轻微差异但语义相近

crossbeam-channel

  • 完全 sync,不是 async
  • 高性能多生产者多消费者
  • 可以和 Tokio 混用(通过 spawn_blocking 桥接),但不 async-native

tokio-util::sync::PollSender

  • tokio::mpsc::Sender 包装成 Sink(futures crate 的 trait)
  • 用于需要 Sink 抽象的库

选择标准优先 Tokio 原生、除非有具体理由换。Tokio mpsc 对绝大多数场景已经足够快 + 和生态兼容


一个总结:Tokio 的 channel 如何对齐 Go

维度GoTokio
mpscchan Ttokio::sync::mpsc
多消费者-broadcast(Go 没有直接对应,要手动)
最新值-watch(Go 里通常用 atomic + channel 组合)
一次性-oneshot(Go 里用 chan struct{} 加 select)
selectselect { ... }tokio::select! { ... }(下一章)

Tokio 的 channel 家族比 Go 更丰富——Go 是”一个 chan 解决所有”、Tokio 是”针对场景提供专门优化”。各有优劣:Go 心智负担低但可能写不出最优代码、Tokio 需要选对但选对后代码更清晰高效。


一个深度思考:为什么 Rust 生态没有标准 “channel trait”

Go 的 chan T 是语言关键字——所有人用同一套语义。Rust 生态里 Tokio mpsc、flume、crossbeam、async-channel 的 API 各不相同——没有统一 trait

为什么

  1. 各 channel 有独特 API——mpsc 有 reserve、broadcast 有 resubscribe、watch 有 borrow——很难抽象成统一 trait 不丢语义
  2. async 和 sync 的 channel 签名天然不同——sync recv 返回 Option<T>、async recv 返回 impl Future<Output=Option<T>>
  3. 有过尝试但没成功futures crate 里的 Stream trait 算最接近的、但没覆盖 send 侧
  4. 统一 trait 会限制优化——每 channel 根据语义特化实现、trait 抽象会丢失

Rust 生态选择了”具体类型优于 trait 抽象。代价是学习曲线(要学每个 channel 的具体 API)、好处是每个都能被深度优化。

这是 Rust 哲学的一个反复主题——“多个具体实现” + “零成本抽象” vs “统一 trait + 运行时分派”。前者需要更多工作但性能更高。Tokio channel 是前者的代表。


下一章我们进入 select! 宏——看它如何展开成一段确定的 state machine、如何选择 Ready 分支、biased; 关键字改变了什么。select! 是 channel + timer + cancellation 的集大成


延伸阅读


延伸阅读:channel 与 Actor 模型的血脉

channel”这个抽象、追根溯源、可以追溯到 1978 年 Tony Hoare 发表的经典论文 Communicating Sequential Processes(CSP)——这篇论文提出了”进程通过 channel 通信而不共享内存”的编程范式、影响了之后几十年所有的并发语言设计Go 的 chan、Erlang 的 mailbox、Clojure 的 core.async、Rust 的 mpsc——都是 CSP 思想的不同变体Tokio channel 的每一行代码、每一个数据结构、每一种性能优化——都是站在 CSP 这位”半个世纪前的巨人”肩膀上的工作理解了这个历史脉络、你在读 Tokio channel 源码时、会有一种”与历史对话”的感觉——Hoare 1978 年的抽象思考、在 2024 年仍然在指挥你的 CPU 如何工作

与 CSP 并列的另一条血脉是 1973 年 Carl Hewitt 提出的 Actor 模型——Actor 之间通过”消息传递”沟通、每个 Actor 有自己的 mailboxErlang 是 Actor 模型的工业级代表、它驱动了电信行业数十年的可靠性神话Actor 和 CSP 的区别在于——Actor 是”每个 Actor 有自己独立的 mailbox”、CSP 是”channel 作为第一等公民独立存在Tokio 选择了 CSP 派——channel 是独立对象、可以被任何 task 使用、不和特定 task 绑定这种选择让 Tokio 的并发模型更灵活——但代价是需要程序员自己管理 channel 的生命周期、而不是由语言 runtime 自动打理两条血脉各有千秋——读完本章、你对”为什么 Tokio 选 CSP”、“Actor 模型在 Rust 里是什么形态”应该都有了初步认识、这些问题在《Rust 编译器与运行时揭秘》第 12 章、《OpenClaw 源码》第 6 章里都有延伸讨论

延伸阅读:channel 性能优化的工业级细节

Tokio channel 的性能数字(oneshot ~250ns、mpsc ~300ns)、不是”简单实现就能达到”——它是几十个工业级优化叠加的结果第一个优化是”cache line padding”——用 CachePadded 把频繁写入的字段和频繁读取的字段分开、避免 false sharing;第二个优化是”bit packing”——把多个状态位塞进一个 AtomicUsize、减少原子操作次数;第三个优化是”lock-free 链表”——用 CAS 操作代替锁、在高并发下性能更稳;第四个优化是”per-slot RwLock”——broadcast 里每个 slot 独立锁、读写分开、多消费者并发每一个优化都是一篇论文级别的工作、合起来才是今天你看到的 Tokio channel

这些优化的共同特点是——它们”反直觉直觉上”加一个 padding 字节”应该让性能更差(数据结构变大)、实际上却让性能更好(减少 false sharing);直觉上”用锁”应该让并发更差(序列化访问)、实际上在低并发下却更好(CAS 重试开销高);直觉上”链表”应该比”数组”慢(指针追踪)、实际上在 lock-free 场景下更快(无需预分配连续内存)这些”反直觉”的真相、只能通过长期实战经验 + 大量 benchmark 来建立——不是课本能教会的Tokio 的源码、是学习这种”系统级直觉”的绝佳素材——它每一处优化都有详细注释、讲清楚”为什么这么做建议每个想深入系统编程的读者、都应该花时间认真读一遍——不只是知道”是什么”、更要理解”为什么

延伸阅读:channel 在业务代码里的典型陷阱

channel 理论上很美好、但在实际业务代码里、有几个典型陷阱值得警惕第一个陷阱是”channel 积压”——producer 比 consumer 快、unbounded channel 会让内存无限增长——这个陷阱 Tokio 文档反复警告、但仍然是线上事故的高频起因预防方法是”永远用 bounded channel、除非你能证明内存不会问题第二个陷阱是”channel 死锁”——两个 task 互相等对方的 channel、双方都阻塞——这个在 Tokio 里不像 OS 线程死锁那么明显、但同样致命预防方法是”用 timeout 保护 recv”——即使死锁、也能在超时后自救

第三个陷阱是”sender 泄漏”——sender 被意外 clone 到 long-lived 的数据结构里、导致 channel 永远不关闭、receiver 永远收不到 None——业务逻辑无法正常结束预防方法是”明确 sender 的生命周期、避免隐式 clone第四个陷阱是”broadcast lag”——慢消费者跟不上 ring buffer 的滚动速度、开始丢消息——如果业务要求”不丢消息”、broadcast 就不合适、应该用 mpsc + 逐个 consumer这些陷阱的共同特点是”写代码时不容易想到、线上出问题时排查困难”——它们是 Tokio channel 的”高级菜”、只有真正在生产环境用过的人才懂本书希望这几条”血泪教训”能帮你提前避开——少走一些弯路

延伸阅读:channel 与分布式系统

本章小结里有一句话”channel 是分布式系统的最小单元”——这个观察值得展开把视野从”单进程内的 channel”放大到”跨进程的 message queue”(Kafka、RabbitMQ、NATS)——会发现它们本质上是同一个概念在不同尺度上的体现Tokio mpsc 是”进程内的 mpsc”、Kafka 是”集群级的 mpsc”、SQS 是”云级的 mpsc”——三者的语义高度一致、只是规模和可靠性保证不同理解了 Tokio channel、你对 Kafka、RabbitMQ 的理解也会加深——因为它们都在回答”如何可靠地把消息从一个地方传到另一个地方”这个根本问题

从这个视角看、学 Tokio channel 不只是”学一个 Rust 库”、而是”建立对消息传递模型的直觉这种直觉会迁移到你未来接触的所有异步系统——不管是单机的 Tokio、还是分布式的 Kafka、还是云原生的 SQS——底层都是”队列 + 生产者 + 消费者 + 背压”的四件套《MCP 协议源码》第 16 章、《OpenClaw 源码》第 7 章都讨论了这种”跨尺度的消息传递抽象”——读完这几章、你对”消息”这个概念会有非常立体的认识这是工程师从”会用工具”到”理解工具”的重要一跃——祝你早日跨过这道坎

延伸阅读:异步编程的下一个边界

在 channel 这个话题的尾声、让我们把视野再放远一些——展望异步编程的未来边界当前 Tokio 的异步模型建立在”Future + poll”之上、是一种”协作式”的并发——task 主动让出执行权、runtime 再调度下一个 task这种模型的优点是开销小、可预测;缺点是需要程序员主动设计”在哪里 await”——一旦忘了 await、就会阻塞整个 runtime未来几年、异步编程可能会出现几个重要变化——第一、“async trait 稳定化”(Rust 1.75+ 已经部分支持、但还在演化)、让 trait 能原生表达异步行为;第二、“抢占式调度”的探索、让 runtime 能在长任务中主动切换、减少阻塞风险;第三、“结构化并发”(structured concurrency)的普及、让 task 的生命周期和作用域绑定、减少 task 泄漏

这些变化会对 channel 的使用方式产生影响——比如”结构化并发”下、channel 可能需要支持”作用域绑定”、在作用域结束时自动关闭Tokio 作为一个成熟的 runtime、会逐步吸收这些新特性——但核心 channel 的 API 大概率保持稳定(向后兼容)这意味着你现在学的 channel 知识、即使在 Rust 异步生态快速演化的背景下、仍然是长期有效的工程基础这也是为什么本书花了整整一章来讲 channel——它不是”临时的实现细节”、而是”长期的工程资产

延伸阅读:channel 性能的边际收益

最后我们聊一个务实的话题——channel 的性能优化到底值不值得前面提到的所有优化(CachePadded、bit packing、lock-free 链表)、合起来把 mpsc 的延迟从”朴素实现的 1-2 微秒”降到了”Tokio 的 300 纳秒”——大约 5 倍的提升这个提升在绝大多数业务代码里、其实感知不到——因为业务代码的每一次 channel 操作、背后都有业务逻辑的几十到几百微秒执行时间、channel 本身的 300 纳秒或 1.5 微秒、在总时间里都是噪音只有在”高频 channel 通信 + 极低业务开销”的特定场景下(比如网络数据包分发、实时音视频处理、高频交易)、这 5 倍才变得至关重要

这就引出一个更深的工程哲学——“为什么一个通用库要做这么极致的优化?答案是——“通用库不知道使用者是谁Tokio 可能被用在 Web 服务里(对性能不敏感)、也可能被用在 HFT 系统里(对性能极度敏感)、还可能被用在嵌入式设备里(对内存极度敏感)通用库必须按”最苛刻的场景”来优化、才能满足所有使用者——哪怕对大多数使用者而言、这些优化是”用不上的奢侈这是”库设计”和”应用设计”的根本差别——应用可以”够用就好”、库必须”精益求精这个认识、对想写开源库的读者尤其重要——你的用户可能有你完全想不到的严苛场景、你必须为他们做好准备