Skip to content

第16章 Listener 与 Executor:可插拔的传输与调度

第 15 章讲完了 axum::serve 的运行时骨架:accept-loop、handle_connection、graceful shutdown。但 serve 的签名里有两个泛型参数值得单独展开:L: Listener 决定 accept 怎么拿连接、E: Executor 决定 spawn 任务怎么调度。这两个扩展点让 serve 能适配各种部署场景:纯 HTTP、HTTPS、Unix socket、带 tracing 的任务调度、带资源限制的运行时。

这一章拆开这两个 trait,看它们的 API 契约、axum 提供的默认实现、和如何自定义。

Listener 与 Executor 的角色

先用一张图厘清两者在 serve 流程里的位置:

两者是 serve 的两个独立接口

  • Listener 决定 连接从哪来——可以是 TCP、Unix socket、TLS-wrapped TCP、甚至内存中的 mock stream(测试用)
  • Executor 决定 任务往哪调度——默认是 tokio::spawn,可以替换成带 instrumentation 的、限并发的、或者自定义 runtime 的

两者正交——可以独立替换。比如 TLS + 默认 executor、TCP + 自定义 executor、Unix socket + tracing executor。这种正交设计让 axum 能适应各种部署需求而不需要改 serve 核心代码。

为什么要把 Listener 和 Executor 抽象出来

axum 完全可以写成硬编码:TcpListener::accept + tokio::spawn——两行代码覆盖 90% 场景。不抽象成 trait 的 framework 也很多(Express、Flask 都这样)。但 axum 选择 trait 抽象,有几个实际考虑:

一、HTTPS 部署普遍存在:生产 HTTP server 几乎都要 TLS。如果 axum 硬编码 TcpListener,TLS 部署就必须用 axum-server 这样的 alternative 库——生态分叉。抽象出 Listener 让 TLS 只是"另一种 Listener impl"——axum::serve 能直接用。

二、Unix socket 和特殊部署:一些部署方案(systemd socket activation、container-to-container IPC)用 Unix domain socket。TcpListener 硬编码下不能用。Listener 抽象让 UnixListener 也是 first-class 公民。

三、测试友好:mock listener 让单元测试不需要真实 TCP——更快、更可重复、能精确控制连接的时序和状态。

四、instrument 和资源限制:Executor 抽象让用户能统一给所有 spawn 的 task 加 tracing、限并发、切 runtime——没这个抽象、想加这些能力只能 monkey-patch 或 fork。

这四点都不是大问题(都有变通方案),但抽象后 解法从"分叉 library"变成"换一个 impl"——生态成本小了很多。这是 trait 抽象的真实价值:不是为了哲学性优雅、而是为了降低生态分裂的风险。

Listener trait 定义

axum/src/serve/listener.rs:18-33

rust
// axum/src/serve/listener.rs:18-33
pub trait Listener: Send + 'static {
    type Io: AsyncRead + AsyncWrite + Unpin + Send + 'static;
    type Addr: Send;

    fn accept(&mut self) -> impl Future<Output = (Self::Io, Self::Addr)> + Send;
    fn local_addr(&self) -> io::Result<Self::Addr>;
}

接口极简——两个关联类型、两个方法。几个设计细节:

type Io: AsyncRead + AsyncWrite + Unpin:连接返回的 IO 对象——必须是双向可读写、并且 Unpin(可以 move 进 Box::pin)。TCP stream、Unix stream、TLS stream 都满足。

type Addr: Send:连接的远端地址。TCP 是 SocketAddr、Unix 是 tokio::net::unix::SocketAddrSend 让 Addr 能跨任务——比如第 8 章讲的 ConnectInfo<SocketAddr> 把 addr 从 accept 任务传到 handler 任务。

accept(&mut self) -> impl Future<Output = (Io, Addr)>:返回 Future 产出 IO + addr 元组。注意没有 Result——accept 错误必须在 Listener 内部处理(通常 sleep 重试)。这是第 15 章讨论的"accept 错误对 serve 不可见"的设计根源——Listener 吞错而不是向上抛。

local_addr():查本地绑定的地址。用处:serve 启动后日志里 print 实际监听端口(特别当用 0.0.0.0:0 随机端口时)。

trait 的上界 Send + 'static 让 Listener 能被 move 进 async task——axum::serve 内部需要这种能力。

为什么 accept 不返回 Result

从 TcpListener 的 impl 能看到原因(listener.rs:39-46):

rust
async fn accept(&mut self) -> (Self::Io, Self::Addr) {
    loop {
        match Self::accept(self).await {
            Ok(tup) => return tup,
            Err(e) => handle_accept_error(e).await,
        }
    }
}

内部是一个 loop——成功返回、失败调 handle_accept_error(sleep 1 秒)继续 loop。trait 的 accept 签名不返回 Result 是因为这个 loop 必须在 Listener 内部——serve 不想也不能知道怎么处理不同 Listener 的 accept 错误(TCP 的 EMFILE、Unix 的 permission denied、TLS 的握手失败)。

每种 Listener 的 accept 错误处理逻辑不一样,封装到 impl 内部是正确的设计。axum::serve 只要求 Listener 最终"产出"一个连接——怎么产出、失败几次、重试多久,都是 Listener 自己的事。

TcpListener 与 UnixListener 的 impl

axum/src/serve/listener.rs:35-72 给出标准实现:

rust
// axum/src/serve/listener.rs:35-52
impl Listener for TcpListener {
    type Io = TcpStream;
    type Addr = std::net::SocketAddr;

    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        loop {
            match Self::accept(self).await {
                Ok(tup) => return tup,
                Err(e) => handle_accept_error(e).await,
            }
        }
    }

    fn local_addr(&self) -> io::Result<Self::Addr> {
        Self::local_addr(self)
    }
}

#[cfg(unix)]
impl Listener for tokio::net::UnixListener {
    type Io = tokio::net::UnixStream;
    type Addr = tokio::net::unix::SocketAddr;
    // ... 类似
}

TcpListener 和 UnixListener 是 axum 原生支持的两种传输。Windows 下 UnixListener 通过 #[cfg(unix)] 不编译——Windows 上 Unix socket 支持不完整(虽然 Windows 10 后开始有)。

handle_accept_error 的细节

listener.rs:245-272 的错误处理:

rust
async fn handle_accept_error(e: io::Error) {
    if is_connection_error(&e) {
        return;
    }
    error!("accept error: {e}");
    tokio::time::sleep(Duration::from_secs(1)).await;
}

fn is_connection_error(e: &io::Error) -> bool {
    matches!(
        e.kind(),
        io::ErrorKind::ConnectionRefused
            | io::ErrorKind::ConnectionAborted
            | io::ErrorKind::ConnectionReset
    )
}

两类错误分别处理:

连接级错误ConnectionRefused / Aborted / Reset):客户端连接到一半主动断了——accept 还来不及返就失败。这是正常现象(客户端关窗口、网络抖动),直接 return 继续下一次 accept,不 sleep 不 log——避免 log spam。

其他错误(EMFILE、I/O 错误、系统级问题):log 并 sleep 1 秒——防止错误循环刷 CPU 也给 OS 恢复时间。EMFILE(文件描述符耗尽)是最常见的严重情况——sleep 让其他 task 有机会 drop 连接释放 fd。

这套处理是 hyper 0.14 的遗产(源码里的注释)——多年生产经验得出的策略。axum 沿用。

ListenerExt:组合子

ListenerExt 是 blanket impl 给所有 Listener 的扩展方法:

rust
// axum/src/serve/listener.rs:75-128
pub trait ListenerExt: Listener + Sized {
    fn limit_connections(self, limit: usize) -> ConnLimiter<Self> { ... }
    fn tap_io<F>(self, tap_fn: F) -> TapIo<Self, F>
    where F: FnMut(&mut Self::Io) + Send + 'static,
    { ... }
}

impl<L: Listener> ListenerExt for L {}

任何实现了 Listener 的类型自动获得 limit_connectionstap_io 方法——熟悉的 Rust "extension trait" 模式。组合子不改变原 listener 行为、而是包装一层新 Listener。

ConnLimiter:基于 Semaphore 的连接数上限

listener.rs:129-151

rust
pub struct ConnLimiter<T> {
    listener: T,
    sem: Arc<Semaphore>,
}

impl<T: Listener> Listener for ConnLimiter<T> {
    type Io = ConnLimiterIo<T::Io>;
    type Addr = T::Addr;

    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        let permit = self.sem.clone().acquire_owned().await.unwrap();
        let (io, addr) = self.listener.accept().await;
        (ConnLimiterIo { io, permit }, addr)
    }

    fn local_addr(&self) -> tokio::io::Result<Self::Addr> {
        self.listener.local_addr()
    }
}

核心机制:

一、先拿 permit 再 acceptsem.acquire_owned().await 等一个 Semaphore permit——如果当前 permit 都被占了会阻塞。只有拿到 permit 才真正调 inner listener 的 accept。这让连接数上限 = permit 数

二、permit 绑定到 Io:返回的 Io 是 ConnLimiterIo<T::Io>,内部持有 OwnedSemaphorePermit。当这个 Io 被 drop 时(连接关闭),permit 被 drop 释放回 Semaphore——其他等待的 accept 可以继续。

三、ConnLimiterIo 是 transparent wrapper:AsyncRead / AsyncWrite 都直接 forward 给 inner。对用户完全透明。

这种"permit 绑定生命周期到连接对象"的模式很巧妙——不需要额外的"连接关闭"通知机制,Rust 的 Drop 自然触发 permit 释放。

用法:

rust
use axum::serve::ListenerExt;

let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
let listener = listener.limit_connections(1000);

axum::serve(listener, app).await.unwrap();

超过 1000 并发后新连接在 accept 阻塞——OS 的 listen backlog 开始排队(默认 128,Linux 下由 somaxconn 控制)。backlog 满后客户端会收到 ECONNREFUSED。

和 Tower 的 ConcurrencyLimit 的区别:

  • ConnLimiter:限连接数。空闲连接也占名额
  • ConcurrencyLimit:限 inflight 请求数。空闲连接不占(只是不被处理)

两者互补。想精细控制后端资源用 ConcurrencyLimit;想限 fd 占用用 ConnLimiter。

TapIo:连接级钩子

listener.rs:207-243

rust
pub struct TapIo<L, F> {
    listener: L,
    tap_fn: F,
}

impl<L, F> Listener for TapIo<L, F>
where L: Listener, F: FnMut(&mut L::Io) + Send + 'static,
{
    type Io = L::Io;
    type Addr = L::Addr;

    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        let (mut io, addr) = self.listener.accept().await;
        (self.tap_fn)(&mut io);
        (io, addr)
    }
    // ...
}

每次 accept 拿到 Io 后调用户闭包——最常用设 TCP_NODELAY 或其他 socket 选项:

rust
let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap()
    .tap_io(|stream| {
        if let Err(err) = stream.set_nodelay(true) {
            tracing::trace!("failed to set TCP_NODELAY: {err:#}");
        }
    });

TCP_NODELAY 禁用 Nagle 算法——让小包立即发送(不等批量)。对 HTTP/1.1 的交互式响应特别重要——省掉的几十 ms 延迟对用户感知明显。axum 不默认开(因为不是所有场景都适合),但是极少场景不该开——几乎应该在所有 HTTP server 上开。

其他常用 socket 选项:

  • SO_KEEPALIVE:TCP level 的 keepalive probe——探测死连接
  • SO_REUSEPORT:多进程共享同一端口(socket activation、zero-downtime restart)
  • 缓冲区大小 (SO_SNDBUF / SO_RCVBUF):高吞吐场景可能要调

tap_io 是 axum 给用户"在 accept 后、服务开始前"的唯一钩子——所有连接级配置都在这里做。

ConnLimiterIo 的 pin_project 技巧

ConnLimiterIo 不只是包 permit + io、还要实现 AsyncRead + AsyncWrite 让 hyper 能直接读写这个对象。源码用 pin_project! 宏:

rust
// axum/src/serve/listener.rs:153-163
pin_project! {
    pub struct ConnLimiterIo<T> {
        #[pin]
        io: T,
        permit: OwnedSemaphorePermit,
    }
}

impl<T: AsyncRead> AsyncRead for ConnLimiterIo<T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        self.project().io.poll_read(cx, buf)
    }
}

pin_project! 宏做了两件事:

一、宣告 #[pin]io 字段需要 pin projection(因为 T 可能是 !Unpin 的 future)、permit 不需要(OwnedSemaphorePermit 是 Unpin) 二、生成 project() 方法:把 Pin<&mut Self> 转成 ConnLimiterIoProj { io: Pin<&mut T>, permit: &mut OwnedSemaphorePermit }——结构化访问

ConnLimiterIo 的 poll_read 就是 self.project().io.poll_read(...)——把 poll 透明转发给 inner io。transparent forwarding 这种 "wrap without interference" 是 AsyncRead/AsyncWrite 实现的常见 pattern——axum 源码里还能看到 TapIo、TLS wrapper 也是类似结构。

如果自己写 Listener wrapper 需要修饰 io 类型,照着 ConnLimiterIo 的 pin_project 结构抄就行——三个字段(原 io #[pin] + 任何辅助数据)、forward 所有 AsyncRead/AsyncWrite 方法。

自定义 Listener:TLS 封装

生产里想让 axum 直接跑 HTTPS,最常见做法是用 axum-server crate 或自己写一个 TLS Listener。自己写的骨架:

rust
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::{server::TlsStream, TlsAcceptor};
use axum::serve::Listener;

pub struct RustlsListener {
    inner: TcpListener,
    acceptor: TlsAcceptor,
}

impl Listener for RustlsListener {
    type Io = TlsStream<TcpStream>;
    type Addr = std::net::SocketAddr;

    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        loop {
            match self.inner.accept().await {
                Ok((tcp, addr)) => {
                    match self.acceptor.accept(tcp).await {
                        Ok(tls) => return (tls, addr),
                        Err(e) => {
                            tracing::warn!("TLS handshake failed: {e}");
                            // continue; 下一次 accept
                        }
                    }
                }
                Err(e) => {
                    tracing::error!("accept error: {e}");
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                }
            }
        }
    }

    fn local_addr(&self) -> std::io::Result<Self::Addr> {
        self.inner.local_addr()
    }
}

几个要点:

一、TLS 握手可能失败:客户端证书错、TLS 版本不兼容、SNI mismatch——都会让 acceptor.accept(tcp) 返 Err。失败后不 sleep,继续下一次 accept——TLS 握手失败和 TCP accept 失败性质不同:前者是常态(偶尔会有扫描器连过来)、后者是异常(fd 耗尽)。

二、log level:TLS 握手失败用 warn!(偶发正常),TCP accept 失败用 error!(严重)。分级让告警能精确触发。

三、返回的 Io 是 TlsStream<TcpStream>:AsyncRead/AsyncWrite 都透明转发给底层 TLS stream——hyper 从上层读写字节时自动走 TLS 解密/加密。

四、type Addr = SocketAddr:TLS 不改变 TCP 层的 addr——连接的远端地址还是 TCP socket 的 peer_addr。

这个骨架是 axum-server 内部实现的简化版——真实代码还处理 ALPN(HTTP/2 协商)、session 复用等细节。用 axum-server 不用自己写这些——但理解它的 Listener impl 能让你在需要时自己实现。

更多 Listener 案例:PROXY protocol 与双栈

生产环境里还有几种常见的 Listener 需求。

PROXY protocol:真实客户端 IP 传播

当 axum 跑在 AWS NLB / HAProxy 等 L4 负载均衡之后,需要 PROXY protocol 传递真实客户端 IP。PROXY protocol 在 TCP 连接的前几个字节塞一个头:

text
PROXY TCP4 192.168.1.1 10.0.0.1 56324 443\r\n

axum 不支持 PROXY protocol——需要自己写 Listener wrap:

rust
use proxy_protocol::{parse, ProxyHeader};

pub struct ProxyProtocolListener<L> {
    inner: L,
}

impl<L> Listener for ProxyProtocolListener<L>
where L: Listener, L::Io: tokio::io::AsyncReadExt + Unpin,
{
    type Io = L::Io;
    type Addr = std::net::SocketAddr;  // 真实客户端地址

    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        loop {
            let (mut io, _inner_addr) = self.inner.accept().await;
            // 读 PROXY header
            match parse_proxy_header(&mut io).await {
                Ok(real_addr) => return (io, real_addr),
                Err(_) => continue,  // 畸形 header, 丢弃连接
            }
        }
    }

    fn local_addr(&self) -> std::io::Result<Self::Addr> { /* ... */ }
}

这种 "pre-TCP-data 解析" 的 wrapper 让 axum 完全兼容 L4 负载均衡——不用改 handler、直接在 Listener 层解决。

双栈 IPv4 + IPv6

想同时监听 IPv4 和 IPv6、或者多地址——写一个 Merged Listener:

rust
pub struct MergedListener<A, B> {
    a: A,
    b: B,
}

impl<A, B> Listener for MergedListener<A, B>
where
    A: Listener, B: Listener<Io = A::Io, Addr = A::Addr>,
{
    type Io = A::Io;
    type Addr = A::Addr;

    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        tokio::select! {
            conn = self.a.accept() => conn,
            conn = self.b.accept() => conn,
        }
    }

    fn local_addr(&self) -> std::io::Result<Self::Addr> {
        self.a.local_addr()  // 返回第一个 listener 的地址
    }
}

tokio::select! 轮流 poll 两个 inner listener——谁先有连接就返回谁的。这让一个 axum::serve 能处理多个地址的请求——适合"同时绑 127.0.0.1 和外部 IP"的混合场景。

Executor trait:任务调度的扩展点

第 15 章已经介绍过 Executor trait,这里深入——

rust
// axum/src/serve/mod.rs:159-165
pub trait Executor: Clone + Send + Sync + 'static {
    fn execute<Fut>(&self, fut: Fut) -> JoinHandle<Fut::Output>
    where
        Fut: Future + Send + 'static,
        Fut::Output: Send + 'static;
}

只一个方法——接收 future、spawn 它、返回 JoinHandle。

默认实现 TokioExecutor

rust
impl Executor for TokioExecutor {
    fn execute<Fut>(&self, fut: Fut) -> JoinHandle<Fut::Output>
    where Fut: Future + Send + 'static, Fut::Output: Send + 'static,
    {
        tokio::spawn(fut)
    }
}

纯 tokio::spawn 转发。

什么 future 会走 execute

axum::serve 在几个地方调 executor.execute(fut)

  1. 每个新连接handle_connection 里 spawn hyper 的 connection serve loop
  2. graceful shutdown signal 监听:WithGracefulShutdown 里 spawn 一个任务 await 用户的 signal future
  3. hyper 内部Builder::new(HyperExecutor(executor.clone()))——HTTP/2 的 stream 并发也靠 executor spawn subtask

所有 spawn 都经过 Executor——这是扩展点的威力。一个自定义 executor 可以统一影响这些 spawn 的行为。

实战一:tracing instrument

最常用的 executor 定制——把每个任务自动包进 tracing span:

rust
#[derive(Clone)]
struct InstrumentedExecutor;

impl Executor for InstrumentedExecutor {
    fn execute<Fut>(&self, fut: Fut) -> JoinHandle<Fut::Output>
    where Fut: Future + Send + 'static, Fut::Output: Send + 'static,
    {
        let span = tracing::info_span!("axum.serve.task");
        tokio::spawn(fut.instrument(span))
    }
}

axum::serve(listener, app).with_executor(InstrumentedExecutor).await;

每个连接任务自带一个 "axum.serve.task" span——handler 里的 tracing log 都会有这个 span 作父——整个请求的 tracing 链条自然形成。

实战二:任务级 metrics

Prometheus 指标统计"当前 spawn 的任务数":

rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Clone)]
struct MetricExecutor {
    active: Arc<AtomicUsize>,
}

impl Executor for MetricExecutor {
    fn execute<Fut>(&self, fut: Fut) -> JoinHandle<Fut::Output>
    where Fut: Future + Send + 'static, Fut::Output: Send + 'static,
    {
        let active = self.active.clone();
        active.fetch_add(1, Ordering::Relaxed);
        tokio::spawn(async move {
            let result = fut.await;
            active.fetch_sub(1, Ordering::Relaxed);
            result
        })
    }
}

active 是"当前活跃任务数"的计数——spawn 时加一、完成时减一。配合 Prometheus gauge 定时读取——生产监控能看到实时连接和内部 subtask 的数量。

实战三:任务优先级调度

如果用带优先级的 runtime(比如 tokio 的 custom scheduler),可以让 executor 把所有 spawn 走 low-priority pool、给 CPU-critical 任务留出资源:

rust
#[derive(Clone)]
struct PriorityExecutor {
    runtime: Arc<tokio::runtime::Handle>,
}

impl Executor for PriorityExecutor {
    fn execute<Fut>(&self, fut: Fut) -> JoinHandle<Fut::Output>
    where Fut: Future + Send + 'static, Fut::Output: Send + 'static,
    {
        self.runtime.spawn(fut)  // 在特定 runtime 里 spawn
    }
}

适合 "多 runtime" 架构——比如主 runtime 跑关键任务、axum::serve 用 off-thread runtime 跑网络 I/O。

实战四:限制 spawn 并发数

默认 tokio::spawn 不限制并发——可能启动无限 task。某些 workload 里想"最多 spawn N 个任务、再多就排队":

rust
use tokio::sync::Semaphore;
use std::sync::Arc;

#[derive(Clone)]
struct BoundedExecutor {
    sem: Arc<Semaphore>,
}

impl Executor for BoundedExecutor {
    fn execute<Fut>(&self, fut: Fut) -> JoinHandle<Fut::Output>
    where Fut: Future + Send + 'static, Fut::Output: Send + 'static,
    {
        let sem = self.sem.clone();
        tokio::spawn(async move {
            let _permit = sem.acquire_owned().await.unwrap();
            fut.await
        })
    }
}

和 ConnLimiter 类似的 pattern:spawn 内部先 acquire permit、超过上限的 task await 在 permit 上。好处是保护后端资源(数据库连接、内存)——虽然连接被 accept 了、handler 执行被排队。

注意:这和 ConnLimiter 的差别——ConnLimiter 是不 accept、BoundedExecutor 是accept 了但 spawn 前等 permit。前者让客户端感到连接被拒(可选择其他实例),后者让客户端等(可能超时)——根据 workload 选。

实战五:切换 runtime

如果主应用用了 current-thread runtime(为了特殊场景的确定性调度)、而 axum::serve 希望跑在 multi-thread runtime 上——可以通过 executor 实现:

rust
#[derive(Clone)]
struct MultiThreadExecutor {
    handle: tokio::runtime::Handle,
}

impl Executor for MultiThreadExecutor {
    fn execute<Fut>(&self, fut: Fut) -> JoinHandle<Fut::Output>
    where Fut: Future + Send + 'static, Fut::Output: Send + 'static,
    {
        self.handle.spawn(fut)  // 特定 runtime handle 的 spawn
    }
}

用法:

rust
let mt_runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
let mt_handle = mt_runtime.handle().clone();
let executor = MultiThreadExecutor { handle: mt_handle };

// 主程序用 current thread runtime、但 serve 的 task spawn 去 multi thread
axum::serve(listener, app).with_executor(executor).await;

少见但实用的模式——适合混合 workload(比如游戏服务端主逻辑单线程、HTTP API 多线程并发)。

Arc<T: Executor> 的 blanket impl

mod.rs:184-194Arc<T> 实现了 Executor:

rust
impl<T> Executor for Arc<T>
where T: Executor,
{
    fn execute<Fut>(&self, fut: Fut) -> JoinHandle<Fut::Output>
    where Fut: Future + Send + 'static, Fut::Output: Send + 'static,
    {
        self.as_ref().execute(fut)
    }
}

让用户能用 Arc<MyExecutor> 传给 with_executor。好处:MyExecutor 里有昂贵字段时,Clone::clone 变成 Arc::clone 原子加一、不真正深拷贝。对携带连接池、metrics registry 等的 executor 重要。

HyperExecutor wrapper

axum::serve 内部用 HyperExecutor 适配——把自定义 Executor 包装成 hyper 认识的 hyper::rt::Executor

rust
// 第 15 章 mod.rs 里看到的 wrapper
struct HyperExecutor<E>(E);

impl<E: Executor, Fut> hyper::rt::Executor<Fut> for HyperExecutor<E>
where Fut: Future + Send + 'static, Fut::Output: Send + 'static,
{
    fn execute(&self, fut: Fut) {
        drop(self.0.execute(fut));
    }
}

关键点:

  • drop(self.0.execute(fut)):axum 的 executor 返回 JoinHandle,但 hyper 的 executor 返回 ()——丢掉 handle(fire-and-forget)
  • 这让 hyper 内部 spawn 的 task(HTTP/2 的 stream 并发、keepalive 相关)也走用户的 executor——tracing span 能覆盖到 hyper 内部 task

这是 executor 抽象真正的威力——不只 axum 自己的任务、连底层 hyper 也能受控于同一个 executor。如果你的 executor 加了 tracing、metrics、priority——所有相关的 spawn 都会受益。

组合使用:生产级栈

把 Listener 和 Executor 的能力组合成一个生产配置:

rust
use axum::serve::ListenerExt;
use std::time::Duration;
use std::sync::Arc;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    let app = build_router();

    // 1. 基础 TCP listener
    let base = TcpListener::bind("0.0.0.0:3000").await.unwrap();

    // 2. 连接级配置 + 连接数限制
    let listener = base
        .tap_io(|tcp| {
            let _ = tcp.set_nodelay(true);
        })
        .limit_connections(10_000);

    // 3. 自定义 executor 加 tracing
    let executor = InstrumentedExecutor;

    // 4. serve 三件事合一
    axum::serve(listener, app)
        .with_executor(executor)
        .with_graceful_shutdown(shutdown_signal())
        .await
        .unwrap();
}

这个栈做了四件事:

  1. TCP_NODELAY:每连接启用 Nagle 算法禁用——HTTP 响应立即发
  2. 连接数上限 10000:防止 fd 耗尽
  3. tracing 自动 span:所有 serve 任务带上 tracing 链
  4. graceful shutdown:收到 signal 优雅关闭

每一层都是可选的——默认 axum::serve 只做最基础的 TCP accept。生产想要什么能力就加哪一层——API 组合性很好。

想要全部能力时的终极栈

rust
let listener = TcpListener::bind(addr).await?
    .tap_io(|tcp| { let _ = tcp.set_nodelay(true); })
    .limit_connections(10_000);

// 或者带 TLS:
// let listener = RustlsListener::new(base_listener, tls_config)
//     .tap_io(...)           // tap_io 能套在 RustlsListener 外
//     .limit_connections(10_000);

axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>())
    .with_executor(InstrumentedExecutor)
    .with_graceful_shutdown(shutdown_signal())
    .await?;

这基本上是一个成熟生产服务该有的配置——TCP 选项、连接上限、ConnectInfo、tracing、graceful shutdown。

Listener / Executor 的实现经验

给自己写 Listener 或 Executor 的几个要点:

Listener

  1. accept 内部 loop 处理错误:不要让 trait 方法返回 Err——loop + sleep 是标准做法
  2. IO 类型要 AsyncRead + AsyncWrite + Unpin + Send + 'static:hyper 需要这些。大部分 tokio / rustls / native-tls 的类型都满足
  3. local_addr 该实现:让用户能 print 本地绑定地址——特别当用随机端口时
  4. 考虑 wrap 现有 Listener:新功能优先做成 listener wrapper(像 TapIo),比从头实现简单

Executor

  1. Clone + Send + Sync + 'static:多个地方需要克隆 executor
  2. execute 必须真的 spawn:不能同步执行 future——否则 accept-loop 被阻塞
  3. JoinHandle 该返回:axum 可能会 drop 它(fire-and-forget),但 trait 签名要求返回
  4. 考虑错误处理:executor 本身不处理任务 panic——tokio::spawn 的默认行为(panic 被 JoinError 捕获、task 结束)合适大多数场景

如果你写的 Listener 或 Executor 有通用性,贡献回 axum 或发布成独立 crate——两个 trait 的设计就是为了生态扩展。

部署模式全景

把 Listener 和 Executor 的组合能力放到真实部署场景里看:

五种常见部署模式对应不同 Listener / Executor 组合:

  • 裸 HTTP:开发或内网。TcpListener + default executor
  • LB 后:生产标配。TcpListener + ProxyProtocol 读真实 IP + tap_io 设 TCP 选项
  • 自跑 TLS:不想依赖外部 TLS。RustlsListener 套 TcpListener
  • Kubernetes:容器部署。连接数限制 + tracing executor + graceful shutdown
  • Unix socket:进程间通信。UnixListener

每种都用一样的 axum::serve 入口——只换 Listener/Executor 类型。这种"一套 API、多种部署"的灵活性是 trait 抽象的直接收益。

Listener 的单元测试

自定义 Listener 的测试非常直接——源码里 listener.rs:274-322 给了一个 limit_connections 的完整测试:

rust
#[tokio::test(start_paused = true)]
async fn limit_connections() {
    static COUNT: AtomicUsize = AtomicUsize::new(0);

    struct MyListener;
    impl Listener for MyListener {
        type Io = io::DuplexStream;
        type Addr = ();

        async fn accept(&mut self) -> (Self::Io, Self::Addr) {
            COUNT.fetch_add(1, Ordering::SeqCst);
            (io::duplex(0).0, ())
        }

        fn local_addr(&self) -> io::Result<Self::Addr> { Ok(()) }
    }

    let mut listener = MyListener.limit_connections(1);

    let conn1 = listener.accept().await;
    assert_eq!(COUNT.load(Ordering::SeqCst), 1);

    // 第二次 accept 应该被 block——超过 limit 了
    time::timeout(time::Duration::from_secs(1), listener.accept())
        .await
        .expect_err("Second 'accept' should time out.");

    drop(conn1);  // 释放 permit

    // 第三次 accept 能继续
    let _conn2 = listener.accept().await;
    assert_eq!(COUNT.load(Ordering::SeqCst), 2);
}

几个测试技巧:

#[tokio::test(start_paused = true)]:tokio 的"虚拟时间"模式——tokio::time::sleep 不真等、timeout 能精确控制虚拟流逝。测试不会因为真实时钟慢而变慢。

mock Listener 用 DuplexStreamtokio::io::duplex(0) 创建一对内存 stream——不需要真实 TCP。

原子计数器验证:用 AtomicUsize::fetch_add 计 accept 被调次数——断言 ConnLimiter 确实阻止了额外的 accept。

这种测试能在毫秒内跑完——比起真实 TCP 测试快几个数量级。自己写 Listener 实现时建议照这个 pattern 写测试。

Executor 的演进

axum 0.8 加入 Executor trait——之前的版本是什么样?

axum 0.5-0.7:直接 tokio::spawn(fut)。不能自定义。想 tracing instrumentation 要用 monkey patch 或者 fork axum。

axum 0.8:引入 Executor trait、默认 TokioExecutorwith_executor 方法添加。

可能的未来:社区在讨论要不要让 executor 也处理 panic——目前 panic 在 tokio::spawn 的 JoinHandle 里用 JoinError 暴露、axum 自己丢掉 handle 所以看不到。如果 executor 能参与 panic 处理、就能做到"executor 级别的 panic hook"。但这个功能还在讨论。

Executor 引入的根本动机来自社区反馈:"怎么给 axum 的所有 spawn 加 tracing?"——人人都有这个需求、没统一解。Executor trait 是对这个需求的 canonical answer——所有 spawn 经过一个用户可控的 hook。

这个演进路径反映了 framework 的成熟过程:先做简单的(直接 tokio::spawn)、遇到用户需求(instrumentation)、提炼出扩展点(Executor trait)、社区贡献生态(各种 Executor 实现)。

tokio runtime 与 Executor 的关系

一个 axum::serve 跑在什么 runtime 上? 取决于 executor 和你的 main 函数。

#[tokio::main] 默认创建 multi-thread runtime(按 CPU 核数创建 worker)。TokioExecutortokio::spawn 把 task 投到这个 runtime 的 work queue。

rust
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
    // 4 个 worker 线程的 runtime
    axum::serve(listener, app).await;
    // serve 的每个 connection task 在这 4 个 worker 上调度
}

想单线程:

rust
#[tokio::main(flavor = "current_thread")]
async fn main() {
    axum::serve(listener, app).await;
    // 所有 task 共享 main 线程、并发靠 async 切换
}

current_thread 适合 CPU 轻量的 I/O-bound 服务——没跨线程开销、锁少。multi_thread 适合 CPU 密集或需要并行加速——但每个 task 可能跨线程(Send bound 要满足)。

自定义 Executor 可以跳出这个 default——比如前面讲的 MultiThreadExecutor 用独立的 multi-thread runtime 给 axum::serve、其他代码用 main runtime。这种细粒度控制是生产 tuning 的空间。

ListenerExt 组合的正确顺序

limit_connectionstap_io 能链式调用——但顺序决定行为:

rust
// 顺序 A:先 tap, 后 limit
let listener = base.tap_io(|tcp| tcp.set_nodelay(true).ok()).limit_connections(1000);

// 顺序 B:先 limit, 后 tap
let listener = base.limit_connections(1000).tap_io(|tcp| tcp.set_nodelay(true).ok());

顺序 A 和 B 的行为都合理——但语义不同:

  • A:所有连接都先被 tap(设 NODELAY)、然后进 limiter——即便连接数满了、NODELAY 也设过了
  • B:只有进了 limiter 的连接才被 tap——limit 满时连接根本不被 accept

实用上两者几乎等价——NODELAY 设了没设对被 limiter 拒绝的连接无所谓。但有些 tap 操作更重(比如 TLS 握手、PROXY protocol parse)——放 limiter 外会被拒绝的连接也走一遍、浪费资源。

经验:"轻量操作"放外(tap_io 设 socket 选项)"重操作"放内(实际握手、解析)。这让 limiter 起到"保护作用"——快速拒绝超出容量的连接、不浪费资源。

Listener 和 Tower MakeConnection 的对比

Tower 生态里还有一个类似抽象:tower::make::MakeConnection——是给 HTTP client 用的"连接工厂"。两者的职责截然相反:

Aspectaxum::Listenertower::MakeConnection
场景Server 端Client 端
作用接受连接发起连接
主方法accept() -> (Io, Addr)make_connection(req) -> Connection
应用axum::serve、axum-serverhyper client、tonic client

两者的结构很像——都是"连接工厂"——但 server 端和 client 端的 workflow 正好对称。Tower 的完整生态包括双向——server 端接受、client 端发起——axum 只关心 server 端、使用 tower 的 Service 抽象(client 可以用 tower + hyper-util)。

这种对称性在 Rust HTTP 生态里很有特色——库之间接口对齐、组合性强。

一个常被忽视的细节:Listener drop 的行为

axum::serve 的 run 函数里有一行:

rust
drop(listener);

在 graceful shutdown 的 accept-loop 退出后、close_tx.closed().await 之前——主动 drop listener。为什么重要?

Listener drop 会关闭底层 socket——操作系统层面 "stop listening on this port"。这让重启时新的 axum 实例能立刻 bind 同一端口——不会因为 "address already in use" 失败。

如果不 drop、让 listener 等到整个 serve 函数结束才释放——在某些实现下(ConnLimiter 持有 Arc 等)可能延迟释放、让新实例的 bind 需要 SO_REUSEADDR 兜底。

这个细节对自己写 Listener 的人有启发:Drop 语义要和"停止接受连接"对齐——用户 drop listener 意思是"不要再接了",不是"保留资源"。

关于自定义 Listener 的发布建议

如果你的自定义 Listener 有通用性(TLS、PROXY protocol、特殊协议),考虑发布成 crate 而不是 "放在项目本地"。几个好处:

  1. 社区复用:其他 axum 用户可能需要同样功能
  2. 更多测试:依赖它的项目会帮你发现 corner case
  3. API 设计打磨:被多个项目用后会催生更 ergonomic 的 API
  4. 贡献回 axum 或 axum-extra:axum-extra 愿意接收通用的 middleware / listener——减少每个项目的自定义负担

Rust 生态的 crate 门槛很低(cargo publish 几分钟)——勇于把内部工具提炼成公共 crate 的项目让整个生态都受益。Listener 这个抽象本身就是社区贡献的——axum 从 0.7 加入、结合了若干社区项目的经验。

其他 axum 生态 crate 里的 Listener 实现可以参考:axum-server(TLS)、tokio-listener(UDS + 各种协议)、listenfd(从 fd 继承)。各有特色——读它们的源码是最好的学习资料。

跨书关联:Tower Service、tokio IO

Listener 和 Executor 这两个抽象在 Rust 异步生态里比较独特——大多数库不单独定义这些 trait。但抽象背后的思想是通用的:

Listener = "产出连接的 source" 抽象——Tower 生态里类似的有 make_service_fn(但 Tower 不强求 Listener 提供 IO 类型约束)。hyper 的 hyper::server::accept::Accept 是另一个类似抽象——但签名不同。axum 的 Listener 基本上是 hyper 0.14 时代 Accept 的继承者——调整了 API 更贴合 axum 的需求。

Executor = "spawn 抽象"——类似的有 hyper 的 hyper::rt::Executor、async-task 的 async_task::Runnable。不同库用不同的抽象名字,都干同样的事。axum 的 Executor trait 是专门为 axum::serve 设计的——签名针对 spawn + return JoinHandle 优化。

两个抽象都遵守 Rust "用 trait 解耦"的工程习惯——不强绑定到具体实现(tokio::net::TcpListener、tokio::spawn),而是让实现可替换。同样的思想在《Hyper 与 Tower:工业级 HTTP 栈》第 2 章讨论的 Service/Layer 设计里也有——trait 是 Rust 生态解耦的核心工具。

实战:mock Listener 用于测试

一个有意思的 Listener 用法:测试。不用真开 TCP,可以写内存 Listener:

rust
use tokio::io::{DuplexStream, duplex};
use axum::serve::Listener;

struct MockListener {
    // 存一些预置连接
    connections: std::sync::Mutex<Vec<DuplexStream>>,
}

impl Listener for MockListener {
    type Io = DuplexStream;
    type Addr = ();

    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        loop {
            if let Some(stream) = self.connections.lock().unwrap().pop() {
                return (stream, ());
            }
            tokio::task::yield_now().await;  // 没连接就让出
        }
    }

    fn local_addr(&self) -> std::io::Result<Self::Addr> {
        Ok(())
    }
}

// 测试
#[tokio::test]
async fn mock_server() {
    let (client, server) = duplex(1024);
    let mock = MockListener {
        connections: std::sync::Mutex::new(vec![server]),
    };
    tokio::spawn(axum::serve(mock, app()));

    // 用 client 端发 HTTP 请求测 server
    // ...
}

这让你不起真实 TCP 就能端到端测 axum 应用——比 Router::oneshot 更真实(有 hyper 的 HTTP parse),比起真 TCP 更快、更可控。

实际生产测试通常还是起 TCP(localhost)——因为和 HTTP client 的整合更方便。mock Listener 适合需要精确控制连接顺序、模拟异常的单元测试。

常见问题

Q:ConnLimiter 的 limit 设多少合适?

看每连接的资源消耗。每连接 hyper 内部约 10-50 KB 内存(HTTP/1.1)+ handler 自己的消耗(数据库连接、session 等)。保守值:每 1GB 内存对应 1000 连接;精确值需要实测。

Q:TCP_NODELAY 有坏处吗?

极少。唯一负面是高频小包场景稍微多占 TCP 带宽(因为每个小 send 各自一个 packet)。对 HTTP 这种通常交互式的场景完全是正面——响应延迟立即降低。现代 OS 的 Nagle 算法和 TCP_NODELAY 之间的 tradeoff 已经倾向 NODELAY(除非你是做 telnet 这类纯字符回显)。

Q:ConnLimiter 和 tower::load_shed 哪个先?

两者语义不同:

  • ConnLimiter 限 连接数(accept 层)
  • load_shed 限 in-flight 请求数(Service::call 层)

生产通常两者都要——limit_connections 防 fd 耗尽、load_shed 防单连接内的并发过载。顺序上 limit_connections 必须在 Listener 层(外部)、load_shed 在 Service 层(内部)——自然嵌套。

Q:Listener accept 失败后 sleep 1 秒太长怎么办?

改不了。axum 硬编码。如果不能接受这个行为,绕过 axum::serve 自己 accept-loop——自定义错误处理策略。但 1 秒对绝大多数场景合适——accept 错误率很低、错了也不着急 retry。

Q:自定义 Listener 需要重新实现 graceful shutdown 吗?

不需要。graceful shutdown 是 WithGracefulShutdown 在 accept-loop 外层处理的——只要 Listener 的 accept 能被 tokio::select! 配合 signal 放弃等待,就自动工作。绝大多数 Listener 实现都满足这点(accept future 是 cancel-safe 的)。

Q:自定义 Executor 的 future 不 spawn 只同步 await 会怎样?

灾难——accept-loop 会阻塞在第一个连接的处理上、不能接其他连接。Executor 必须真的 spawn。同步运行的 executor 不是 executor 是 error。

Q:Listener 能换 IPv6 和 IPv4 吗?

tokio 的 TcpListener 绑 [::]:3000 时默认允许 IPv4-mapped IPv6 地址(取决于 OS 设置的 ipv6.bindv6only)。大多 Linux 系统默认关——一个 IPv6 listener 能接收 IPv4 连接。不行就写 MergedListener 两个 listener 合并。

Q:ConnLimiter 和 OS listen backlog 的关系?

ConnLimiter 限 axum 接受的连接数、不影响 OS 排队的连接数。两个独立机制:

  • OS backlog(somaxconn,Linux 默认 128):内核层排队、满了的新连接会 RST
  • ConnLimiter:应用层限制、超过 limit 就不 accept——这些连接留在内核 backlog 里

所以 ConnLimiter(1000) 配合 OS backlog(128) 的话:最多 1128 个连接在某种状态(1000 被 accept、128 在 OS 队列)。设置时要综合考虑两者——generally OS backlog 该调大(tokio::net::TcpListener::listen_backlogsysctl -w net.core.somaxconn=4096)。

更多 FAQ

Q:能把 axum 跑在 smol / async-std 上吗?

原则上可以——需要:

  1. 一个 Listener impl 用 smol 的网络原语(smol 提供 async_net)
  2. 一个 Executor impl 用 smol::spawn
  3. hyper 需要 tokio——所以实际无法摆脱 tokio

hyper 的存在让 axum 实质上绑定 tokio。真想切换 runtime 要找其他 HTTP 实现。一些边缘场景(Bevy 引擎的 HTTP server)用 async-tungstenite 等独立 HTTP/WebSocket 实现——但那不是 axum。

Q:Listener trait 为什么不要求 Debug?

Debug 不是必需——axum::serve 的 Debug 实现用 type_name 格式化 make_service 和 listener,不依赖 Debug impl。让 trait 尽量窄、bound 尽量少是 axum 的 style。自定义 Listener 如果想 Debug 自己实现 Debug 就行。

Q:Executor 能做 rate limiting 吗(按 spawn 速率)?

不直接。Executor 的 execute 立即返回 JoinHandle——阻塞 execute 意味着阻塞 accept-loop。想限 spawn 速率用 ConnLimiter(限 accept)或 Tower 的 RateLimitLayer(限请求)——Executor 不合适。

Q:ConnLimiter 超了之后客户端能感知吗?

看情况。limit 满时 axum 不 accept——OS 的 listen backlog 开始排队。backlog 满(Linux 默认 128)时,新连接客户端会收到 ECONNREFUSED。之前客户端只是 SYN 后等,可能超时或连接成功后等待 handler。

生产里 ConnLimiter 不是第一道防线——前置 LB 做更精细的流量控制(queue + 429 响应)。ConnLimiter 是"最后防护"——防止单实例 fd 耗尽。

Q:Listener 能在运行时动态加端口吗?

axum::serve 接一个 Listener——一个 serve 只能一个。想动态加端口:要么 spawn 多个 serve 实例、要么写一个 MergedListener 在其内部动态管理一组子 Listener(select 轮询)。

Listener 与 Executor 的组合全景

用户侧配置 Listener 和 Executor,axum::serve 的内部 accept + spawn 依赖这两个抽象。整张图把本章讨论的所有组件整合在一起。

Listener 的性能考虑

自定义 Listener 有几个常见性能坑。

一、accept 内部昂贵的初始化:如果 Listener 的 accept 里做 I/O(TLS 握手、PROXY protocol parse),这些操作会串行化——一次 accept 等完才能下一次。高并发下瓶颈明显(比如每个 TLS 握手耗时 10ms、accept-loop 只能跑 100 次/秒)。解决:

rust
async fn accept(&mut self) -> (Self::Io, Self::Addr) {
    let (tcp, addr) = self.inner.accept().await;
    // ❌ 慢:串行等 TLS 握手
    let tls = self.acceptor.accept(tcp).await.unwrap();
    (tls, addr)
}

改成 spawn 握手到独立 task、accept 立即继续:

rust
// 但这样改变了 Listener 的 API 语义——accept 返回"握手完成的 IO"
// 实际更常见做法是在 handle_connection 里做 TLS 握手(不是 Listener 里)
// 这需要自己写 serve loop 而不是用 axum::serve

实际生产都接受 serial TLS 握手——因为 hyper_util / axum::serve 的模型假设 accept 返回 ready IO。想并行化 TLS 握手要用 axum-server 或自己写 accept-loop。

二、Io wrapper 的 poll 开销:每个 wrapper 的 AsyncRead/AsyncWrite 多一次虚函数调用。多层嵌套(ConnLimiterIo<TlsStream<ProxyStream<TcpStream>>>)的开销线性增长——但每层纳秒级,实际不显著。

三、Addr 类型的 clone:有些 Listener 的 accept 返回 Addr 时 clone 一次(比如从 IO 里 extract)——对频繁 accept 的场景可能累积。简单类型(SocketAddr)无所谓、复杂类型(带 metadata 的 CustomAddr)要注意。

大多数场景这些考虑都可以忽略——Listener 自定义的开销对业务 handler 来说完全不可见。

多 Listener 聚合:一个 serve 多个监听点

前面 MergedListener 的例子扩展——想同时监听 IPv4/IPv6、或者多个 port、或者 TCP + Unix socket:

rust
pub struct MultiListener<L1, L2> {
    a: L1,
    b: L2,
}

// 两个 listener 的 Io 不同? 需要枚举
pub enum EitherIo<A, B> {
    A(A),
    B(B),
}

// EitherIo 需要 impl AsyncRead / AsyncWrite——转发到 inner
impl<A: AsyncRead + Unpin, B: AsyncRead + Unpin> AsyncRead for EitherIo<A, B> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        match self.get_mut() {
            EitherIo::A(a) => Pin::new(a).poll_read(cx, buf),
            EitherIo::B(b) => Pin::new(b).poll_read(cx, buf),
        }
    }
}

然后让 MultiListener 的 Io 是 EitherIo

rust
impl<L1, L2> Listener for MultiListener<L1, L2>
where L1: Listener, L2: Listener<Addr = L1::Addr>,
{
    type Io = EitherIo<L1::Io, L2::Io>;
    type Addr = L1::Addr;

    async fn accept(&mut self) -> (Self::Io, Self::Addr) {
        tokio::select! {
            (io, addr) = self.a.accept() => (EitherIo::A(io), addr),
            (io, addr) = self.b.accept() => (EitherIo::B(io), addr),
        }
    }

    fn local_addr(&self) -> io::Result<Self::Addr> { self.a.local_addr() }
}

实际用:

rust
let tcp = TcpListener::bind("0.0.0.0:3000").await.unwrap();
let tcp6 = TcpListener::bind("[::]:3000").await.unwrap();
let multi = MultiListener { a: tcp, b: tcp6 };
axum::serve(multi, app).await;

一个 serve 处理双栈 IPv4 + IPv6——避免两个 serve 的复杂性。限制是 Addr 类型必须一致(SocketAddr 对 v4/v6 都行,但 TCP + Unix 的 Addr 不同——要么自己 enum 封装)。

这种 "聚合 listener" pattern 是 axum 扩展的强大之处——类型系统表达清晰、组合灵活。

axum-server 的内部结构

前面提过 axum-server 是生产 TLS 的事实标准——它的设计值得看看做对比。

axum-server 的主要结构:

rust
// 伪代码
pub struct Server<L = tokio::net::TcpListener> {
    listener: L,
    // ...
}

impl Server<TcpListener> {
    pub async fn serve<M, S>(self, make_service: M) -> io::Result<()>
    where M: MakeService, S: Service<Request>,
    {
        // 自己实现的 accept-loop, 不用 axum::serve
    }
}

pub fn bind_rustls(addr: SocketAddr, config: RustlsConfig) -> Server<RustlsListener> { ... }

和 axum::serve 的主要差异:

  1. API surface 更大:bind / bind_rustls / bind_openssl 等多种入口
  2. 内置 TLS:不需要自己写 Listener——直接 bind_rustls 就行
  3. graceful shutdown handle:通过 Handle 对象控制、不是 future
  4. 生态定位:axum-server 是 "超集"——覆盖 axum::serve 的所有功能 + TLS

选择标准:

  • 只要纯 HTTP(前置 LB 做 TLS)→ axum::serve,简单
  • 需要 axum 进程自己跑 TLSaxum-server,开箱即用
  • 特殊需求(PROXY protocol、自定义协议)→ 实现自定义 Listener 配合 axum::serve

axum-server 代码量约 axum::serve 的 3-4 倍——多出的都在 TLS + handle + 双协议支持上。axum 官方的 serve 保持简洁是有意——复杂场景外包给 axum-server。

小结

这一章讲了 axum::serve 的两个扩展点:Listener 负责连接来源抽象,Executor 负责任务调度抽象。两者独立可插拔——TCP + 默认 executor、TLS + tracing executor、Unix socket + limited executor,各种组合都合法。

Listener 的设计核心是 accept -> (Io, Addr) 的 async 方法——不返回 Result,错误在 impl 内部处理。ListenerExt 提供 limit_connections(Semaphore permit 绑定 Io 生命周期)和 tap_io(连接级钩子)两个组合子——覆盖大多数常见需求。

Executor 的设计核心是 execute<Fut>(fut) -> JoinHandle<Fut::Output>——一个方法换 spawn。自定义 executor 能统一给所有 serve 任务加 tracing、metrics、scheduling policy。

两个 trait 都遵守 axum 的"薄层胶水"设计哲学:接口极简、实现可组合、默认合理、替换容易。理解了这两个扩展点,你能面对几乎所有"axum 不直接支持 X 协议 / X 调度策略"的问题——自己实现 Listener 或 Executor 就能补上。

设计原则小结

  • Listener:把"连接的来源"抽象——TCP/Unix/TLS/PROXY/mock 都能是一个 impl
  • ListenerExt 组合子:tap_io / limit_connections 让常见 wrap 不用重新实现 trait
  • Executor:把"任务的去向"抽象——spawn 可以加 instrumentation / 限流 / 换 runtime
  • Arc<T> 的 blanket impl:让昂贵 executor 能 Arc 共享、clone 零成本
  • HyperExecutor 桥接:让 hyper 内部 spawn 也走用户的 executor

这些一起构成一个 "小 API + 大生态" 的设计样板——几百行代码定义的 trait 支持 axum-server、axum-valid-listener 等生态 crate。生态繁荣的前提是核心设计极简——axum 这两个 trait 是样板式的。

实际生产里 Listener 的配置影响运维/部署(部署方式是 TCP 还是 Unix socket?前置 LB 是否带 TLS?连接数上限多少?)、Executor 的配置影响可观测性(每个 task 怎么 instrument?怎么统计活跃 task 数?)。这两类配置在不同阶段讨论——但都集中在一个 axum::serve 调用里——这让部署和调试都有明确的"配置入口"。

trait 抽象还有一个次级好处:文档化。想了解 axum 支持什么部署方式?看 Listener 的实现列表(生态里有哪些 Listener crate)。想了解 executor 能做什么?看 Executor trait 的文档 + 几个示例 impl。相比"所有东西硬编码到 serve 里",trait 让 axum 的能力边界清晰可见。

实战:socket activation 模式

systemd / launchd 等系统服务管理器支持 "socket activation"——父进程(systemd)提前打开 socket、fork 子进程(axum)时把 fd 传过来。子进程不用自己 bind——直接从 fd 构造 Listener:

rust
use std::os::unix::io::FromRawFd;
use tokio::net::TcpListener;

fn listener_from_systemd() -> Option<TcpListener> {
    let fds = listenfd::ListenFd::from_env();
    // systemd 把 LISTEN_FDS 和 LISTEN_PID 设好
    let std_listener = fds.take_tcp_listener(0).ok()??;
    std_listener.set_nonblocking(true).ok()?;
    TcpListener::from_std(std_listener).ok()
}

async fn main() {
    let listener = match listener_from_systemd() {
        Some(l) => l,
        None => TcpListener::bind("0.0.0.0:3000").await.unwrap(),
    };
    axum::serve(listener, app).await;
}

socket activation 的好处:

  • 零停机重启:新版本启动时 systemd 交出 socket、老版本退出——客户端连接不断
  • 按需启动:socket 开着但服务进程可能休眠,有请求才唤醒
  • privileged port 绑定:systemd 用 root 绑 :80 / :443、service 进程用普通用户

Listener trait 让这个 pattern 无缝——systemd 给的是 TCP fd、axum 的 TcpListener 能接收。不需要特殊 API 支持。

实战:ACME + TLS 自动轮换

生产 TLS 需要定期轮换证书(Let's Encrypt 90 天)。rustls-acme 这类 crate 提供自动轮换——它给一个动态的 RustlsConfig,证书过期前自动申请新的。配合自定义 Listener 用:

rust
use rustls_acme::AcmeConfig;

let tls_acceptor = AcmeConfig::new(vec!["example.com"])
    .cache_dir(Some("./acme"))
    .acceptor();  // 返回动态 acceptor, 证书自动更新

let listener = AutoTlsListener::new(base_tcp, tls_acceptor);
axum::serve(listener, app).await;

AutoTlsListener 的 accept 每次调用时用当前最新的证书做 TLS 握手——证书更新对上层完全透明。这让生产 axum 跑完全自动化的 HTTPS——不用 cert-manager、不用外部脚本、证书 90 天自动轮。

这个 pattern 也是 Listener trait 的威力——加一层 wrapper 就能给 axum 加本来没有的能力。

与其他小 trait 的呼应

Listener 做"连接来源抽象"、Executor 做"任务调度抽象"。两个 trait 都很窄——只有 2-3 个方法。trait 窄的好处是实现简单——生态里多个库都提供 Listener / Executor impl、用户按需选择组合。这和第 11 章讨论的 IntoResponseParts、第 6 章讨论的 FromRequestParts 一样——axum 用大量小 trait 解耦、每个负责一个维度——组合起来覆盖各种场景。

下一章讲 Body 处理与流式响应——hyper::body 和 axum::body 的类型关系、Body::from_stream 如何把 Stream 转成 body、超大 body 的流式处理、以及和第 10 章讨论的 SSE 在底层 body 机制上的共性。那一章会回到 axum 的数据流层——本章讨论的是"连接级"抽象、下一章讨论的是"数据级"抽象,两者互补。

基于 VitePress 构建