Appearance
第10章 响应类型实战:JSON、HTML、Redirect、SSE
第 9 章把 IntoResponse 的抽象讲透了。这一章落到四个最常用的响应类型——Json<T> / Html<T> / Redirect / Sse<S>——看它们如何把 trait 的通用契约映射到具体 HTTP 场景。其中 SSE 尤其值得看:它的 body 不是一次性的 Vec<u8>,而是一条 Stream<Item = Event>,这让"响应是无限流"在 axum 里有了类型化的表达。
四个类型按"body 形态"可以分两组:
缓冲型响应把 body 一次性生成完再返回;流式响应里 body 是一个 Stream,帧按需发送。这两种形态对 hyper/tokio 的底层 poll_frame 有不同触发方式——但 handler 侧都只是"返回一个 IntoResponse 类型",看不到底层差异。
这一章的重点不是列 API——每个类型的方法清单去文档查更好——而是读它们的源码。每一个响应类型的 impl 都在 100 行以内,读完会让你清楚知道"为什么 axum 能在 2ms 内构造出一个 JSON 响应"、"为什么 Html 只设 Content-Type 就够了"、"SSE 的心跳是哪个组件在什么时候发出的"。这些"为什么"在用的时候不需要知道,但在 debug、性能调优、写自己的响应类型时是基础知识。
Json<T>:serde_json 直写 BytesMut
axum/src/json.rs:197-232 是 Json<T> 的 IntoResponse impl:
rust
// axum/src/json.rs:197-232
impl<T> IntoResponse for Json<T>
where T: Serialize,
{
fn into_response(self) -> Response {
fn make_response(buf: BytesMut, ser_result: serde_json::Result<()>) -> Response {
match ser_result {
Ok(()) => (
[(header::CONTENT_TYPE, HeaderValue::from_static(mime::APPLICATION_JSON.as_ref()))],
buf.freeze(),
)
.into_response(),
Err(err) => (
StatusCode::INTERNAL_SERVER_ERROR,
[(header::CONTENT_TYPE, HeaderValue::from_static(mime::TEXT_PLAIN_UTF_8.as_ref()))],
IntoResponseFailed,
err.to_string(),
)
.into_response(),
}
}
let mut buf = BytesMut::with_capacity(128).writer();
let res = serde_json::to_writer(&mut buf, &self.0);
make_response(buf.into_inner(), res)
}
}几个工程细节值得逐一拆解。
serde_json::to_writer 而非 to_vec
serde_json 有 to_vec、to_string、to_writer 三种序列化入口。Axum 选 to_writer(&mut buf, ...) 写进一个 BytesMut::writer()——等价"序列化到 BytesMut"。为什么不用更直接的 to_vec?
to_vec 返回 Vec<u8>,之后要 Body::from(Vec)。这个转换在 Rust 里是 move,没有额外开销——但 Vec<u8> 和 Bytes 的内部表示略有不同,Body 最终希望拿到的是 Bytes。从 Vec<u8> 转 Bytes 有一次(通常零拷贝的)移交,多绕一步
to_writer 直接写进 BytesMut,然后 buf.freeze() 把 BytesMut 原地转成 Bytes——完全零拷贝。对大响应(几 MB 的 JSON),这条路径省一次潜在的内存拷贝
初始容量 128:注释明确写"像 serde_json::to_vec 一样用 128 字节起步"。这是个合理默认——小响应不需要多次 realloc,大响应会按指数级增长到合适大小
成功与失败的两个分支
成功路径输出:状态 200(默认)、Content-Type: application/json、body 是序列化字节。失败路径输出:500 + text/plain + IntoResponseFailed 标记 + 错误字符串作为 body。
失败路径里 IntoResponseFailed 的作用我们在第 9 章讲过:如果 handler 写了 (StatusCode::CREATED, Json(data)),Json 序列化失败后 IntoResponseFailed 让外层不再覆盖成 201,保留 500。失败响应的 body 是错误信息字符串——生产部署要小心:如果输入结构里含敏感字段名、错误信息里可能暴露它。
失败的实际触发条件非常窄:T: Serialize 编译器已经检查了类型可序列化,运行时失败主要是:
HashMap<NonStringKey, V>:JSON object 的 key 必须是字符串,HashMap<u32, V>序列化会在 runtime 报错- 自定义
Serializeimpl 手动返回Err:极少见 - 递归/无限大结构体(栈溢出,通常崩)
日常业务数据用 #[derive(Serialize)] 几乎不会失败——但框架必须在 API 层承担这个可能性。
Json 响应的 HTTP 状态码与 Content-Type
有趣的是 Json<T> 的成功响应不设任何状态码——状态码保持 Response 默认的 200。用户要 201 时,(StatusCode::CREATED, Json(data)) 在元组 impl 里覆盖。这让 Json<T> 单独使用等价"200 + JSON",搭配元组时又能精细控制——简单场景简单,复杂场景灵活。
Content-Type 固定 application/json——不是 application/json; charset=utf-8。JSON 规范(RFC 8259)规定 JSON 必须是 UTF-8,charset 参数是冗余的(且有历史上因为 charset 解析混乱导致的兼容问题)。Axum 选"不带 charset"是遵循规范的做法。
流式 NDJSON:大规模导出的高效格式
单次响应传几十万条记录时,普通 Json 响应会把整个数组 serialize 到内存——几百 MB 的 peak 内存对服务器是灾难。NDJSON(Newline-Delimited JSON)把每条记录各自一行、按行流式返回:
json
{"id":1,"name":"Alice"}
{"id":2,"name":"Bob"}
...Content-Type 通常是 application/x-ndjson 或 application/jsonl。axum 实现方式是自定义 body:
rust
use axum::body::Body;
use futures::stream::StreamExt;
async fn export_ndjson(State(db): State<Pool>) -> impl IntoResponse {
let rows = db.stream_users(); // 数据库游标的 Stream<Item = User>
let stream = rows.map(|user| {
let mut line = serde_json::to_vec(&user).unwrap();
line.push(b'\n');
Ok::<_, std::io::Error>(Bytes::from(line))
});
(
[("content-type", "application/x-ndjson")],
Body::from_stream(stream),
)
}每条记录序列化后立即推给 body——数据库游标按需读、HTTP body 按需发、中间内存只有一条记录的量级。这种模式适合大量数据导出(导出用户列表、审计日志、分析结果)。
客户端侧,NDJSON 按行读就能处理——不需要等整个响应完成。几 GB 数据导出在 NDJSON 下比普通 Json 数组快几倍且节省内存。
Json 响应的性能调优
对大多数 API 来说 Json 的序列化开销完全可接受,但有些场景需要更激进的优化。
一、提前填充 BytesMut 容量。默认 128 字节起步对小响应(几 K)合适,但对大响应(MB 级)每次倍增 realloc 会多几次内存分配。如果你知道响应大概多大,可以自定义 IntoResponse 控制容量:
rust
impl IntoResponse for BigResponse {
fn into_response(self) -> Response {
let mut buf = BytesMut::with_capacity(self.estimated_size()).writer();
serde_json::to_writer(&mut buf, &self).unwrap();
([("content-type", "application/json")], buf.into_inner().freeze()).into_response()
}
}二、用 simd-json 替代。simd-json 是一个 SIMD 加速的 JSON 库,序列化/反序列化速度比 serde_json 快 2-3 倍。但:
- simd-json 的 Serialize 兼容性不如
serde_json完整——某些边缘类型会失败 - 需要 AVX2 或类似的 SIMD 指令支持——ARM(苹果 M 系列、树莓派)上可能无效
- axum 默认用
serde_json,换成 simd-json 需要自己写 IntoResponse
一般业务场景不值得换。只有在 profiling 明确 JSON 序列化是瓶颈(CPU 火焰图里 serde_json::to_writer 超过 10%)时才有意义。
三、避免 Serialize T 里的大数据 clone。Json(T) 按值接收——如果 T 里包含 Vec<User> 等大数据、handler 先 clone 再 Json(data),clone 本身可能是瓶颈。改成借用或用 Arc:
rust
// ❌ 触发 clone
let users = cache.get_users();
Json(users.clone()).into_response()
// ✅ 零拷贝
let users = cache.get_users_arc(); // Arc<Vec<User>>
Json(&*users).into_response()Json<&T> 也能 IntoResponse——T: Serialize 对 &T 自动生效。这种细节能让热点 API 少一次大数据 clone。
Html<T>:最薄的包装
axum/src/response/mod.rs:37-53 的 Html<T> 出奇简单:
rust
// axum/src/response/mod.rs:37-53
pub struct Html<T>(pub T);
impl<T> IntoResponse for Html<T>
where T: IntoResponse,
{
fn into_response(self) -> Response {
(
[(header::CONTENT_TYPE, HeaderValue::from_static(mime::TEXT_HTML_UTF_8.as_ref()))],
self.0,
).into_response()
}
}就这几行:
T: IntoResponse是内部类型约束——不管 T 是&str、String、Bytes还是别的,只要能 IntoResponse 就行- impl 里组合成
([content_type_header], T)元组,走元组 IntoResponse - Content-Type 固定为
text/html; charset=utf-8
和 Json 的核心差异:Html 不做序列化——它假设你的 T 已经是 HTML 字节了(要么模板引擎生成的 String,要么静态字符串)。Html 只负责贴上正确的 Content-Type 标签。
使用场景:
rust
// 简单场景:静态 HTML
async fn index() -> Html<&'static str> {
Html("<h1>Hello</h1>")
}
// 和模板引擎配合
use askama::Template;
#[derive(Template)]
#[template(path = "page.html")]
struct PageTemplate { title: String, items: Vec<Item> }
async fn page() -> Html<String> {
let tmpl = PageTemplate { title: "Home".into(), items: fetch() };
Html(tmpl.render().unwrap())
}模板引擎(askama / tera / maud / minijinja)产出 String 或 &str,直接包 Html(...) 返回。模板渲染的失败处理在模板引擎内——Html 本身不处理。
为什么不让 Html 自己做模板渲染?
一个自然的问题:既然 Html 和某个模板引擎强关联,为什么 axum 不定义 Html<impl Template> 之类的 trait?
答案是"不绑定特定模板引擎"。Rust 生态有 askama(编译期)、tera(运行时)、maud(宏 DSL)、minijinja、handlebars 等,各有适用场景。Axum 不替用户选——让 Html<T> 只关心"T 能变成响应",用户自己决定用哪个模板引擎。这和第 8 章讨论的 Multipart"不选特定结构化解析器"是同一种不绑定策略。
RFC 7807: 结构化错误响应的标准
REST API 的错误响应常用自定义 JSON 形态,但有个标准规范——RFC 7807 Problem Details for HTTP APIs:
json
{
"type": "https://example.com/problem/out-of-credit",
"title": "You do not have enough credit.",
"status": 403,
"detail": "Your current balance is 30, but costs 50.",
"instance": "/account/12345/msgs/abc"
}Content-Type 是 application/problem+json。客户端(尤其是网关、监控系统)能识别这个格式、做统一处理——比自定义的 {"error": "..."} 更有生态兼容性。
axum 实现:
rust
#[derive(Serialize)]
struct Problem {
#[serde(rename = "type")]
type_: String,
title: String,
status: u16,
detail: String,
}
impl IntoResponse for Problem {
fn into_response(self) -> Response {
let status = StatusCode::from_u16(self.status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
(
status,
[("content-type", "application/problem+json")],
serde_json::to_vec(&self).unwrap(),
).into_response()
}
}对外公开的 API 建议遵守 RFC 7807——让错误响应可机读、可聚合、可分类。内部 API 可以简化为自定义格式,但保留"status + 错误类型 + 简短说明"这三要素是最低要求。
Html 的 XSS 防线
Html 只设 Content-Type,不对内容做任何转义——这意味着 XSS 防御完全是模板引擎或 handler 的责任。下面两种写法其中一种是 XSS 漏洞:
rust
// ❌ XSS 漏洞:username 可能是 "<script>alert('xss')</script>"
async fn greet(Path(username): Path<String>) -> Html<String> {
Html(format!("<h1>Hello, {}</h1>", username)) // 直接嵌入,危险
}
// ✅ 用模板引擎自动 escape
#[derive(Template)]
#[template(path = "greet.html")] // askama 模板默认 HTML escape
struct GreetTemplate { username: String }
async fn greet(Path(username): Path<String>) -> Html<String> {
Html(GreetTemplate { username }.render().unwrap())
}不同模板引擎的默认 escape 策略:
| 引擎 | 默认 escape | 强度 |
|---|---|---|
| askama | 是(HTML entity encoding) | 按默认安全 |
| tera | 是 | 按默认安全 |
| handlebars | {{ }} escape、{{{ }}} 不 escape | 取决于用法 |
| maud | 是(编译期分析) | 按默认安全 |
| minijinja | 是 | 按默认安全 |
选模板引擎时 escape 是第一优先级,其次才是性能和 DSL 体验。生产项目里绝对不要手写 format! 拼 HTML——一次疏忽就是 XSS。
Redirect:3xx 三兄弟
HTTP 的重定向状态码有六种(301/302/303/307/308、304),Axum 的 Redirect 暴露三种(303/307/308)——redirect.rs:27-56:
rust
// axum/src/response/redirect.rs:37-56(简化)
impl Redirect {
pub fn to(uri: impl Into<String>) -> Self { // 303 See Other
Self::with_status_code(StatusCode::SEE_OTHER, uri.into())
}
pub fn temporary(uri: impl Into<String>) -> Self { // 307 Temporary Redirect
Self::with_status_code(StatusCode::TEMPORARY_REDIRECT, uri.into())
}
pub fn permanent(uri: impl Into<String>) -> Self { // 308 Permanent Redirect
Self::with_status_code(StatusCode::PERMANENT_REDIRECT, uri.into())
}
}每个构造器对应一个精确的语义。
| 构造器 | 状态码 | 客户端行为 | 典型用途 |
|---|---|---|---|
to | 303 See Other | 把后续请求的 method 改成 GET | POST 表单提交后跳转(POST-Redirect-GET 模式) |
temporary | 307 Temporary Redirect | 保留原 method 和 body | 临时维护、A/B 测试 |
permanent | 308 Permanent Redirect | 保留原 method 和 body | 资源永久移到新 URL |
缺的 301 / 302 哪去了?历史上 301 和 302 被某些客户端 errone 地改 method(按 303 行为处理),导致"POST 被当成 GET 重发"的行为不确定。RFC 7231 最终把 301 和 302 标为"客户端行为不确定",新代码建议用 308(永久,保留 method)或 307(临时,保留 method)或 303(永久改 GET)。Axum 只提供这三个符合规范的——强制用户做语义明确的选择。
axum 0.6 及之前有一个 Redirect::to(305/302 之类),后来改成了现在的 303。如果你从老项目迁移,要注意语义变化。
HeaderValue::try_from:url 合法性兜底
redirect.rs:87-94 的 IntoResponse impl:
rust
impl IntoResponse for Redirect {
fn into_response(self) -> Response {
match HeaderValue::try_from(self.location) {
Ok(location) => (self.status_code, [(LOCATION, location)]).into_response(),
Err(error) => (StatusCode::INTERNAL_SERVER_ERROR, error.to_string()).into_response(),
}
}
}HeaderValue::try_from(String) 会检查字符串里是不是有非法的 header value 字符(控制字符、换行、非 ASCII)。测试 redirect.rs:131-137 验证:"Axum is awesome, \n but newlines aren't allowed 😦"——这个字符串包含 \n,构造 HeaderValue 失败,结果是 500。
这是一个常被忽略的安全考虑:如果 URL 来自用户输入、没有校验就塞 Redirect,攻击者可以注入 \n 字符做 CRLF injection——在响应头里注入额外的行,影响 cookie、CSP 等关键头。Axum 在这一步做了被动兜底:不合法字符直接 500,不让它出到响应里。但这不是万能——合法字符也可能是恶意的(开放重定向漏洞:Redirect::to(user_input) 让攻击者引用户去钓鱼站)。开放重定向的防御要在 handler 里做白名单检查,Redirect 本身不管。
HSTS 与 HTTPS 强制
一个相关的安全头:Strict-Transport-Security(HSTS)。它告诉浏览器"未来对这个域名只走 HTTPS"——即使用户输入 http://,浏览器会自动改成 https。这防止了"第一次请求"被中间人降级成 HTTP。
和 Redirect 配合:处理 http 请求时做 308 跳转到 https 的同时设 HSTS:
rust
async fn redirect_to_https(uri: Uri) -> impl IntoResponse {
let https_uri = format!("https://example.com{}", uri.path());
(
[
("strict-transport-security", "max-age=31536000; includeSubDomains"),
],
Redirect::permanent(&https_uri),
)
}但通常 HTTPS 强制在反向代理层(Nginx / Cloudflare / ALB)做——axum 服务器自己不直接处理 HTTP→HTTPS 跳转。生产部署下这个 handler 很少需要自己写。
开放重定向的防御
开放重定向(Open Redirect)是常见漏洞:handler 接受 URL 参数做跳转,攻击者构造 /redirect?to=https://evil.com,钓鱼站诱导用户点击 https://trusted.com/redirect?to=https://evil.com——浏览器地址栏一开始是 trusted.com 让用户放松警惕,跳转后到 evil.com。
防御两种思路:
一、URL 白名单:
rust
fn is_safe_redirect(url: &str) -> bool {
// 相对 URL 安全
if url.starts_with('/') && !url.starts_with("//") { return true; }
// 白名单域名
if url.starts_with("https://app.example.com") { return true; }
false
}
async fn redirect_handler(Query(q): Query<RedirectQuery>) -> impl IntoResponse {
if !is_safe_redirect(&q.to) {
return (StatusCode::BAD_REQUEST, "unsafe redirect target").into_response();
}
Redirect::to(&q.to).into_response()
}注意 // 的特殊性——//evil.com/path 是协议相对 URL,被浏览器当成 https://evil.com/path。很多开放重定向漏洞由忽略 // 前缀造成。
二、只接受 ID,服务端查表:
rust
async fn redirect(Path(id): Path<u64>) -> impl IntoResponse {
let target = db.get_redirect(id).await?; // 服务端保管所有合法 URL
Redirect::to(&target.url).into_response()
}URL 不来自用户输入——从不可能构造恶意目标。这是更稳妥的做法,适合"短链服务"场景。
Axum 的 Redirect 本身不做白名单——它是 low-level 构造器,白名单策略应该在应用层做。
Sse<S>:Stream 作为响应 body
Server-Sent Events 是 HTML5 规范的一部分——浏览器通过 EventSource API 与服务端建立长连接单向推送。协议非常简单:Content-Type: text/event-stream,body 是一连串 UTF-8 文本,每个事件以空行分隔:
text
event: update
id: 42
data: {"price": 100}
event: update
id: 43
data: {"price": 101}三行一组:event: / id: / data:。客户端 EventSource 按事件触发 JavaScript 回调。适合实时行情、通知推送、AI 流式输出等场景。
IntoResponse 实现:包装 Stream 成 body
axum/src/response/sse.rs:90-107:
rust
// axum/src/response/sse.rs:90-107
impl<S, E> IntoResponse for Sse<S>
where
S: Stream<Item = Result<Event, E>> + Send + 'static,
E: Into<BoxError>,
{
fn into_response(self) -> Response {
(
[
(http::header::CONTENT_TYPE, mime::TEXT_EVENT_STREAM.as_ref()),
(http::header::CACHE_CONTROL, "no-cache"),
],
Body::new(SseBody {
event_stream: SyncWrapper::new(self.stream),
}),
).into_response()
}
}SseBody<S> 是一个 newtype 把 Stream<Item = Result<Event, E>> 包装成 http_body::Body。核心是 poll_frame(sse.rs:116-135):
rust
// axum/src/response/sse.rs:123-134
fn poll_frame(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Result<Frame<Self::Data>, Self::Error>>>
{
let this = self.project();
match ready!(this.event_stream.get_pin_mut().poll_next(cx)) {
Some(Ok(event)) => Poll::Ready(Some(Ok(Frame::data(event.finalize())))),
Some(Err(error)) => Poll::Ready(Some(Err(error))),
None => Poll::Ready(None),
}
}逻辑极简:每次 hyper poll 这个 body,就 poll 底层 Stream 取下一个 Event,把 Event 编码成字节(event.finalize())作为 Frame::data。Stream 结束(None)就是响应 body 结束。
SyncWrapper:应对 !Sync stream
注意 SyncWrapper::new(self.stream)——用一个 SyncWrapper 包装 stream。这是因为某些合法的 Stream 实现(比如捕获了 Cell 或 RefCell 的 stream)不是 Sync,但 hyper 要求 body 是 Send + Sync(在某些 runtime 配置下)。SyncWrapper<T> 通过不安全但合理的声明把 !Sync 类型变成 Sync——因为只要 T 是 Send,在 body 的单线程 poll 上下文里用 &mut T 即可,不需要真正的 Sync。
这种"用 newtype 绕过 marker trait 要求"的手法在 Rust 异步生态比较常见——futures 和 tokio 都在若干地方用它。SyncWrapper 不让 Stream 变得真正 Sync,只是满足类型签名。
Event 的编码:SSE 帧格式
Event 用一个小状态机构造,sse.rs:174-300 有完整 API:
rust
use axum::response::sse::Event;
let event = Event::default()
.id("42")
.event("update")
.data(r#"{"price": 100}"#);finalize() 生成 SSE 帧的字节:
text
event: update
id: 42
data: {"price": 100}注意 Event 的数据写入有两个关键点:
多行数据自动拆分:data: "line1\nline2" 会被 Event 自动拆成两个 data: 行:
text
data: line1
data: line2这是 SSE 协议规范要求——单个 data 字段的多行数据必须用多个 data: 前缀。直接写 data: line1\nline2 是违反协议的。Event API 封装了这个细节。
不能同时用 data() 和 json_data():写一个 event 只能 data 一次——因为 data 字段累积会破坏结构。API 通过 EventDataWriter 的生命周期约束(sse.rs:191-198,data_written: bool 标记)防止重复调用。
KeepAlive:防中间代理断连
生产 SSE 有一个隐形陷阱:长时间没数据的 SSE 连接会被中间代理(Nginx、Cloudflare、ALB)默默断掉。标准做法是服务端定期发心跳事件,让代理认为连接还活。
sse.rs:518-573 定义的 KeepAlive:
rust
// axum/src/response/sse.rs:523-530
impl KeepAlive {
pub fn new() -> Self {
Self {
event: Event::DEFAULT_KEEP_ALIVE,
max_interval: Duration::from_secs(15),
}
}
pub fn interval(mut self, time: Duration) -> Self { self.max_interval = time; self }
pub fn text<I: AsRef<str>>(self, text: I) -> Self { /* ... */ }
}默认 15 秒一次,心跳内容是 :\n\n(sse.rs:202 定义的 DEFAULT_KEEP_ALIVE——一个空 SSE 注释,客户端收到会无视)。用法:
rust
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use std::time::Duration;
async fn stream_prices(State(rx): State<broadcast::Sender<Price>>) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let s = BroadcastStream::new(rx.subscribe())
.map(|price| Ok(Event::default().event("price").json_data(price.unwrap()).unwrap()));
Sse::new(s).keep_alive(KeepAlive::default().interval(Duration::from_secs(10)))
}keep_alive(...) 返回新类型 Sse<KeepAliveStream<S>>——底层 stream 被包装成 KeepAliveStream<S>,内部同时持有一个 tokio::time::Sleep。每次 poll 时:先 poll inner stream,有事件就返回;没事件且定时器到期就返回一个心跳 Event;否则继续 pending。sse.rs:606-650 的 Stream impl 有完整逻辑。
没有 KeepAlive 的 SSE 在 Nginx 默认配置下 60 秒无事件就断,在 Cloudflare 下 100 秒。15 秒默认间隔对绝大多数代理都安全。
Last-Event-ID:断线重连的续传
SSE 规范有一个巧妙设计:客户端 EventSource 在连接断开后会自动重连,重连时把上次收到的最后一个事件 id 作为请求头 Last-Event-ID 发给服务端。服务端可以读这个头判断"客户端丢了哪些事件",从那之后开始续传。
axum 实现:
rust
use axum::http::HeaderMap;
async fn resume_stream(
headers: HeaderMap,
State(store): State<EventStore>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let last_id = headers
.get("last-event-id")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
// 从 last_id+1 开始推送
let stream = store.events_since(last_id + 1).map(|e| {
Ok(Event::default()
.id(e.id.to_string())
.event("update")
.json_data(&e.data)
.unwrap())
});
Sse::new(stream).keep_alive(KeepAlive::default())
}两件事必须做对:
- 每个事件都设
id:客户端根据 id 判断"这个我见过没" - 服务端保留历史事件:内存中的 ring buffer、或者 redis/数据库——重连时能返回旧 id 之后的事件
如果不设 id 或丢历史,客户端重连后只能接收新事件——连接掉线期间的事件客户端永远收不到。对通知、行情等"偶尔丢几条无所谓"的场景可以忽略,对"每一条都重要"的场景必须实现续传。
Backpressure:SSE 的慢消费者问题
poll_frame 是 hyper 拉取 body 帧的调用——如果客户端网络慢,hyper 的底层 TCP write buffer 会满,poll_frame 暂停调用。与此同时上游 Stream(比如 broadcast subscribe)继续产生事件——如果没设 back-pressure,broadcast buffer 爆掉、subscriber 开始 lag 甚至被踢。
处理方法:
- broadcast 的容量:
broadcast::channel(1000)让慢消费者能积压到 1000 条才开始丢数据。对低频事件够用 - 丢弃策略:
BroadcastStream的Lagged(n)错误告诉你丢了 n 条事件——handler 可以检测到并生成"你掉线了"的提示 event - 按需订阅:高频推送(游戏帧率)配
watch::channel而非broadcast——watch 语义是"只给最新值",天然丢历史、没积压问题 - 关断慢连接:发现 subscriber 持续 lag 就主动断开——让客户端重连(Last-Event-ID 会恢复)
SSE 和 WebSocket 的 backpressure 都是生产部署的难点——关键是监测,不是盲目扩 buffer。对 subscriber 数量、每连接的 lag 做指标。
NoContent:清晰意图的小工具
response/mod.rs:77-83 定义了 NoContent:
rust
pub struct NoContent;
impl IntoResponse for NoContent {
fn into_response(self) -> Response {
StatusCode::NO_CONTENT.into_response()
}
}为什么不直接让 handler 返 StatusCode::NO_CONTENT?因为类型比常量自述性更强。async fn delete() -> NoContent 比 async fn delete() -> StatusCode 多传递一层含义:"这个 handler 语义上就是返回 204",不是随便哪个状态码。签名对读者更友好。
还有一个历史原因在文档里(mod.rs:63-66):"() 的 IntoResponse 返 200 而不是 204"——这是 axum 历史兼容决定。如果想要 204,用 NoContent,不要指望 () 变化。这是"类型用作标签"的一个典型 Rust 风格——用零大小类型表示语义,编译期不带开销,运行时行为明确。
文件下载:Content-Disposition 与流式响应
文件下载和 HTML / JSON 这类响应逻辑略有不同——客户端应该保存而不是内联显示。关键是 Content-Disposition: attachment; filename="..." 头。
小文件直接读进内存返回:
rust
async fn download_small(Path(name): Path<String>) -> Result<impl IntoResponse, AppError> {
let bytes = tokio::fs::read(format!("/data/{name}")).await?;
Ok((
[
("content-type", "application/octet-stream"),
("content-disposition", &format!("attachment; filename=\"{name}\"")),
],
bytes,
))
}几 MB 以下文件用这种方式够快。但大文件(几十 MB 起)应该流式,不要全部缓冲——否则一个并发下载请求就吃几十 MB 内存。流式下载用 StreamBody 或自定义 Body:
rust
use axum::body::Body;
use tokio_util::io::ReaderStream;
async fn download_large(Path(name): Path<String>) -> Result<impl IntoResponse, AppError> {
let file = tokio::fs::File::open(format!("/data/{name}")).await?;
let stream = ReaderStream::new(file);
let body = Body::from_stream(stream);
Ok((
[
("content-type", "application/octet-stream"),
("content-disposition", &format!("attachment; filename=\"{name}\"")),
],
body,
))
}tokio_util::io::ReaderStream 把 AsyncRead 包成 Stream<Item = Result<Bytes, io::Error>>,Body::from_stream 把它变成 body。hyper 按需 poll,每次从磁盘读一块发一块——内存占用恒定,无关文件大小。
filename 的 UTF-8 处理:attachment; filename="foo.txt" 里的 filename 按 HTTP 规范只能是 ASCII。中文 filename 要用 filename*=UTF-8''... 的 RFC 5987 扩展:
http
content-disposition: attachment; filename="fallback.txt"; filename*=UTF-8''%E6%96%87%E4%BB%B6.txt实际写 filename* 的值要 percent-encode。axum 不自动做——需要自己用 percent-encoding crate。
Form 作为响应:较少但存在
axum::Form<T> 和 Json<T> 对称——既是提取器也是响应:
rust
async fn get_config() -> Form<AppConfig> {
Form(AppConfig { /* ... */ })
}响应 Content-Type 是 application/x-www-form-urlencoded,body 是序列化后的 k=v&k=v 字符串。
实际场景少——客户端很少期望"urlencoded 格式的响应"——但这个对称性让 form 作为传输格式的双向场景(某些老系统集成)可用。常见用途:旧式 OAuth token exchange 的响应(虽然现代 OAuth 已全面走 JSON)。
Sse 在 LLM 流式场景的典型用法
现代 AI 应用(OpenAI API / Anthropic / 各家 Chat UI)几乎都用 SSE 做"模型 token 流式输出"——用户等字逐渐出现而不是等 10 秒看整段。axum 实现很自然:
rust
async fn chat_stream(
State(llm): State<Arc<LlmClient>>,
Json(req): Json<ChatRequest>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let token_stream = llm.stream_completion(req.messages);
let events = token_stream.map(|token_result| {
let event = match token_result {
Ok(chunk) => Event::default()
.event("token")
.json_data(&chunk).unwrap(),
Err(e) => Event::default()
.event("error")
.data(e.to_string()),
};
Ok(event)
});
Sse::new(events).keep_alive(KeepAlive::default().interval(Duration::from_secs(5)))
}几个 AI 场景专有的设计:
event: token/event: error双类型:前端EventSource.addEventListener('token', ...)和addEventListener('error', ...)分别处理——token 事件是业务数据,error 事件是流出错- 5s 心跳而非 15s:LLM 有时有长 stall(在 thinking 或调工具),短心跳让前端 loading 动画判断"是在等还是挂了"更灵敏
- 最后发一个
event: done标记流结束:客户端明确知道"正常结束"。如果依赖 Stream end 而不发 done 事件,客户端可能分不清"正常结束"和"连接断开" - 不设
Last-Event-ID:LLM 的 token 流是一次性的——重连从头开始不合理。这里应该让客户端检测断开后重新发请求而不是断线重连
这类 Sse handler 在 LLM 应用里每天跑亿万次——是 axum 生产运行最多的场景之一。从 axum 源码看 SSE 的机制能让你理解"为什么 OpenAI 的 stream 偶尔卡一下"——大概率是上游某一步 stall 了,心跳机制是感知它的工具。
3xx 重定向的历史注释
RFC 7231 对 301/302/303/307/308 的规定如下:
| 码 | 语义 | 客户端行为 |
|---|---|---|
| 301 Moved Permanently | 永久搬迁 | 历史上客户端可能把 POST 改成 GET——行为不一致 |
| 302 Found | 临时搬迁 | 同上,行为不一致 |
| 303 See Other | "去另一个地方看" | 明确把请求改成 GET |
| 307 Temporary Redirect | 临时搬迁 | 明确保留原 method |
| 308 Permanent Redirect | 永久搬迁 | 明确保留原 method |
为什么 301/302 行为不一致?早期浏览器(Netscape、IE 4)把 301 看作"永久所以下次直接用新 URL;且重发时改 GET 更安全"——无论原来是什么 method。这让 301/302 的实际行为等价于"301/302 ≈ 303"。RFC 2616 承认了这个既成事实,但 RFC 7231 引入了 307/308 作为"保留 method 的新版本"——语义明确。
axum 的选择:不提供 301/302 构造器——强迫用户选 303/307/308 之一。这是 axum "API 层面表达标准化语义"的又一例子:框架能约束的事情,就不留给用户踩坑。如果你真的需要返回 301 或 302(比如兼容某些老旧客户端),可以直接构造 Response:
rust
async fn legacy_301() -> Response {
Response::builder()
.status(StatusCode::MOVED_PERMANENTLY)
.header("location", "/new")
.body(Body::empty())
.unwrap()
}但绝大多数场景应该用 axum 提供的三个 Redirect 变体。
为什么 Redirect 接收 String 而非 Url
Redirect::to(uri: impl Into<String>)——接收的是 String,不是 url::Url 或 http::Uri。这看起来像松懈,实际有原因:
一、相对 URL 合法。HTTP 重定向的 Location 头可以是相对路径(/login)、相对 URL(./sub)、或绝对 URL(https://host/path)——客户端按当前 URL 解析。url::Url 要求绝对 URL,会拒绝 /login;http::Uri 能表达但构造笨重
二、URL 构造在业务层更合适。format!("https://host/users/{}", id) 在 handler 里读起来直接;让 Redirect 接收 url::Url 反而强迫 handler 先构造 Url::parse(...)——多一层且 parse 可能失败
三、Redirect 自己做合法性检查。在 into_response 里 HeaderValue::try_from 兜住非 ASCII 和控制字符——足以防护常见注入。
String 接收让 API 保持朴素。这体现了 axum 对"宽进严出"的权衡:API 入口接受宽泛类型、内部做必要的兜底检查,既不让类型系统过度约束用户、也不让安全兜底被用户绕过。
四种响应的对比
| 维度 | Json<T> | Html<T> | Redirect | Sse<S> |
|---|---|---|---|---|
| 状态码默认 | 200 | 200 | 303/307/308 | 200 |
| Content-Type | application/json | text/html; charset=utf-8 | 不设 | text/event-stream |
| Body 形态 | Bytes(serde_json 缓冲) | T: IntoResponse(透传) | 空 | 流式 Stream |
| 失败路径 | 500 + IntoResponseFailed | 无(T 的失败) | 500(URL 非法) | Stream 的 Err 变 body 错 |
| 生成成本 | O(T 大小),serde_json 开销 | 模板引擎渲染 + Content-Type | O(1) | Stream 按需 poll |
| 典型用途 | REST API | 传统多页 Web | POST-Redirect-GET、URL 搬迁 | 实时推送 |
注意 Sse 的一个独特性质:它是四种里唯一的流式响应。body 长度不确定、生存期可能几分钟到几小时。这让 Sse 的 handler 经常是 async fn ... -> Sse<impl Stream>,返回一个动态生成的 stream。
响应监控:哪些指标值得看
生产环境用 axum 做 API 服务,响应层面的指标比 handler 层面更能反映用户体验:
- 按 status code 分桶的 QPS:2xx/3xx/4xx/5xx 各自速率,异常抖动时第一眼看的图
- 按 content-type 分桶的 body 大小:
application/json的平均/p99 大小——监控内存膨胀 - SSE 活跃连接数:
ACTIVE_SUBSCRIPTIONSgauge;连接数爆涨往往是代理层配置或客户端 bug 的信号 IntoResponseFailed事件计数:序列化失败频率——持续非零说明 API 里有不能 serialize 的数据结构,需要调查
tower-http::trace::TraceLayer 配合 tracing 和 metrics 收集这些数据的基础设施——第 13 章讲中间件时会详细讨论怎么组装 observability 栈。
响应类型选型指南
回到 handler 层面——给定一个场景,选哪种响应类型?几条经验:
返回结构化数据 → Json。REST API、AJAX 响应、前后端分离的 SPA 场景默认 Json。如果前端是 React/Vue/Svelte 等现代框架,Json 是唯一合理选择。
服务端渲染整张页面 → Html。传统多页 Web、Server-Side Rendered(SSR)架构、或者结合 htmx / hotwire 的渐进式增强——Html 响应配合模板引擎产出整块 HTML。注意模板引擎的 auto-escape 是安全基线。
跳转 → Redirect。POST 表单提交后的 PRG 模式用 Redirect::to(303);url 搬家用 Redirect::permanent(308);临时维护用 Redirect::temporary(307)。避免用已弃用的 301/302。
实时推送(单向)→ Sse。通知、行情、AI 流式文本输出、日志 tail 等"服务端主动向客户端发送"的场景。浏览器 EventSource 原生支持,有 Last-Event-ID 自动重连机制——比 WebSocket 简单、比 long polling 高效。
实时双向通信 → 第 8 章的 WebSocket。聊天、协作编辑、游戏——客户端也要发数据的场景。
文件下载 → 手动 Body::from_stream + Content-Disposition。axum 没为"文件下载"做专门的响应类型——因为场景组合太多(是否流式、是否分块、是否支持 Range 等)。下一章 IntoResponseParts 会讨论 TypedHeader 让头设置更优雅。
空响应 → NoContent(204)或 StatusCode::NO_CONTENT。DELETE 成功、PUT 无 body 返回等。
自定义复杂响应 → 自定义 IntoResponse。把分页、HATEOAS、版本化等跨 handler 的模式封装成类型,handler 签名保持清晰。
实战:SSE 推送通知系统
把所有机制串起来,做一个典型场景——用户订阅实时通知,服务端有事件时推送。
rust
use axum::{
extract::{Path, State},
response::sse::{Event, KeepAlive, Sse},
};
use futures::stream::{Stream, StreamExt};
use tokio::sync::broadcast;
#[derive(Clone)]
struct AppState {
notify_tx: broadcast::Sender<Notification>,
}
async fn subscribe(
State(state): State<AppState>,
Path(user_id): Path<u64>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let rx = state.notify_tx.subscribe();
let stream = BroadcastStream::new(rx)
.filter_map(move |result| async move {
let notif = result.ok()?;
// 只推送给该用户的通知
if notif.user_id != user_id { return None; }
Some(Ok(Event::default()
.event("notification")
.id(notif.id.to_string())
.json_data(¬if).unwrap()))
});
Sse::new(stream).keep_alive(
KeepAlive::default()
.interval(Duration::from_secs(15))
.text("ping"),
)
}
// handler 挂路由
Router::new()
.route("/users/{user_id}/notifications", get(subscribe))
.with_state(state);客户端的用法:
javascript
const src = new EventSource('/users/123/notifications');
src.addEventListener('notification', (e) => {
const n = JSON.parse(e.data);
showNotification(n);
});几件工程考虑:
filter_map过滤:广播 channel 广播给所有订阅者,但每个连接只过滤属于自己用户的。filter 走 stream combinator 就够,不需要额外基础设施- 错误类型
Infallible:上面 handler 里 stream 的 Err 类型是Infallible——broadcast 的 RecvError 在filter_map里被吞了。这让 Sse 不会向客户端 propagate 任何错误帧——但这也意味着你得确信"broadcast 错不是业务问题"。如果 lagging(subscriber 消费慢)是要报警的业务问题,不能这样吞 - 心跳配置:15 秒间隔 + 文本
"ping"——方便服务端日志判断"心跳发了但是客户端有没有收到"。默认空心跳虽省流量,但日志里看不到 broadcast::Sender容量:默认 16,对高并发场景要调大——否则 subscriber 跟不上会丢数据。但调太大占内存。这是广播设计的基本 tradeoff
连接清理与订阅生命周期
SSE 连接关闭(客户端主动关、网络断、浏览器 tab 关)时,服务端的 Stream future 需要立即释放——否则 broadcast subscription、数据库连接、内存中的状态会累积。幸运的是,hyper 自动处理大部分情况:
- 客户端 TCP 关闭 → hyper 的 Stream::poll_next 返回
None或 hyper body 被 drop - Body 被 drop → SseBody 被 drop → inner Stream 被 drop → subscribe 对应的 Receiver 被 drop → broadcast 自动取消订阅
关键是 Rust 的所有权链:整个 Stream/Future 图由 handler 返回后 hyper 持有,handler 本地 move 进去的资源(broadcast Receiver、DB cursor 等)随 Future 释放自动清理。
但有些资源不在 Stream 里持有——比如"外部的通知计数器"、"Prometheus counter"——handler 想在连接关闭时做收尾,要显式写:
rust
async fn stream_with_cleanup(...) -> impl IntoResponse {
let gauge = ACTIVE_SUBSCRIPTIONS.clone();
gauge.inc();
let cleanup = scopeguard::guard(gauge, |g| g.dec());
let stream = subscribe_stream().map(move |e| {
let _hold = &cleanup; // 保持 guard 存活
Ok(Event::default().json_data(&e).unwrap())
});
Sse::new(stream).keep_alive(KeepAlive::default())
}scopeguard::guard 在 drop 时跑回调——Stream 被 drop 时 gauge 减一。这样 Prometheus 永远能准确反映活跃连接数,即使客户端异常断开。
与 Tokio Stream 生态的协作
SSE 的 handler 几乎必然跟 tokio-stream 打交道。几个常用 stream 工具:
BroadcastStream(tokio-stream):把broadcast::Receiver包成 StreamWatchStream(tokio-stream):把watch::Receiver包成 Stream——适合"最新值一直推"IntervalStream(tokio-stream):定时产生事件——适合周期性心跳UnboundedReceiverStream/ReceiverStream:把 mpsc 包成 Stream——单生产者场景
《Tokio 源码深度解析》第 11 章详细讨论了这些 Stream 适配器的实现——特别是 BroadcastStream 如何处理 Lagged 错误、watch 如何合并中间值。读那一章后再看 axum 的 SSE handler,就会理解"每种 channel 类型对应不同的 SSE 推送语义"。
响应头的工程细节
除了前面讨论的 Content-Type,有几组响应头在生产 API 里极其重要——它们都可以通过元组 IntoResponse 或 IntoResponseParts 设置。
Cache-Control:客户端和代理缓存
Cache-Control 决定浏览器、CDN、反向代理是否缓存响应:
rust
async fn static_asset() -> impl IntoResponse {
(
[
("cache-control", "public, max-age=31536000, immutable"),
("content-type", "image/png"),
],
include_bytes!("../static/logo.png").as_slice(),
)
}
async fn dynamic_page() -> impl IntoResponse {
(
[("cache-control", "no-store")],
Html(render_page()),
)
}几种典型值:
| 值 | 语义 | 场景 |
|---|---|---|
no-store | 任何缓存都别存 | 含敏感信息的动态响应 |
no-cache | 可缓存但每次要向服务端验证(If-None-Match) | 可能变化的页面 |
public, max-age=3600 | 任何缓存可存 3600 秒 | 公开的 API 响应 |
public, max-age=31536000, immutable | 一年缓存+不重验证 | 带 hash 后缀的静态资源 |
private, max-age=60 | 只允许私有缓存(浏览器) | 用户专属数据 |
全局统一设置通常用 tower-http::set_header::SetResponseHeaderLayer,不需要在每个 handler 里写。ResponseHeaders 中间件按路由区分策略比 handler 内写更集中。
ETag + If-None-Match:节省带宽
对大响应,如果客户端已经缓存过一份(带 ETag),再次请求时会发 If-None-Match: "abc123"——服务端看到这个头,如果资源没变,返回 304 Not Modified 空 body,节省整个响应体的带宽:
rust
async fn article(Path(id): Path<u64>, headers: HeaderMap) -> Result<Response, AppError> {
let article = db.get(id).await?;
let etag = format!("\"{}\"", article.version);
// 检查客户端缓存是否仍有效
if let Some(inm) = headers.get("if-none-match") {
if inm.to_str().ok() == Some(&etag) {
return Ok((StatusCode::NOT_MODIFIED, [("etag", etag.as_str())]).into_response());
}
}
Ok((
[("etag", etag.as_str()), ("cache-control", "public, max-age=60")],
Json(article),
).into_response())
}ETag 的值约定是"任何能代表资源版本的字符串"——数据库版本号、内容 sha256、修改时间 timestamp 都行。关键是同一资源版本的 ETag 必须一致。
内容协商:Accept 头与多格式响应
有些 API 要同时支持 JSON 和 HTML 两种响应格式——客户端通过 Accept 头表达偏好:
rust
use axum::http::{HeaderMap, header::ACCEPT};
enum UserResponse {
Json(User),
Html(String),
}
impl IntoResponse for UserResponse {
fn into_response(self) -> Response {
match self {
Self::Json(u) => Json(u).into_response(),
Self::Html(h) => Html(h).into_response(),
}
}
}
async fn show_user(Path(id): Path<u64>, headers: HeaderMap) -> UserResponse {
let user = db.get_user(id).await.unwrap();
let accept = headers.get(ACCEPT).and_then(|v| v.to_str().ok()).unwrap_or("");
if accept.contains("text/html") {
UserResponse::Html(render_user_page(&user))
} else {
UserResponse::Json(user)
}
}实际生产会更复杂——Accept 头可能是 text/html, application/xhtml+xml, application/xml;q=0.9, */*;q=0.8,要解析 quality 值(q=0.9)做优先级判断。mime-guess 或 accept-header crate 可以帮忙。
自定义的 UserResponse enum + 内部分发是处理"多格式响应"的标准模式——比在 handler 里写 if accept.contains(...) 散在各处更干净。
常见错误:impl IntoResponse 的类型推断陷阱
一个常见编译错误:
rust
async fn h(cond: bool) -> impl IntoResponse {
if cond {
Json(Data { /* ... */ })
} else {
StatusCode::NOT_FOUND // ❌ 类型不匹配
}
}if 两个分支类型必须一致——Json<Data> 和 StatusCode 是不同类型。impl IntoResponse 是占位,但占位也得指向一个具体类型——不是 "任何 IntoResponse"。错误消息会指出 "expected Json<Data>, found StatusCode" 之类。
修法一:把两个分支都走 Response(所有 IntoResponse 类型都能转 Response):
rust
async fn h(cond: bool) -> Response {
if cond {
Json(Data { /* ... */ }).into_response()
} else {
StatusCode::NOT_FOUND.into_response()
}
}修法二:用 enum 包装两种形态(类似上面的 UserResponse)。Enum 更显式、更类型化——适合"业务语义上的分类响应";直接 Response 更简单——适合"随手的分支"。
修法三:用 Result<T, E>:
rust
async fn h(cond: bool) -> Result<Json<Data>, StatusCode> {
if cond { Ok(Json(Data { /* ... */ })) }
else { Err(StatusCode::NOT_FOUND) }
}这是最常用做法——把"成功 vs 失败"编码在类型上,? 和 Result IntoResponse impl 自动处理。
跨书关联:流式响应的设计共性
SSE 在 axum 里的设计——把 Stream<Item = Event> 当作 http_body::Body——和 tonic (gRPC for Rust) 的 tonic::codec::Streaming<T> 非常相似。两者都把 Rust 的 Stream 抽象映射到 HTTP body 的 poll_frame 接口。差异在于编码:
- SSE:每个 Event 编码成
"event: X\nid: Y\ndata: Z\n\n"文本帧 - gRPC:每个 message 编码成 Length-prefixed protobuf 二进制帧
编码不同但架构一样——这是 http_body::Body trait 设计能容纳各种流式协议的证明。《Hyper 与 Tower:工业级 HTTP 栈》第 10 章讲 http_body crate 时详细讨论了这种"poll-based body 作为流式协议的通用底座"的设计权衡。
SSE 的时序分解
把 SSE 响应的完整生命周期画出来——从连接建立到事件推送到心跳维持:
这张图同时展示了三条并发通道:事件通道、心跳通道、断开检测通道。axum 的 KeepAliveStream 通过 tokio::time::Sleep 在内部做定时器,和 inner stream 的 poll 一起复用在一个 future 里。第 15 章讲 Serve 时会看到 axum 如何在连接层面做"shutdown 信号 + 其他通道"的 select——相同模式的另一层应用。
下一章进入 IntoResponseParts 与元组响应——第 9 章里已经讲过核心机制,第 11 章会挖更深的类型级组合、自定义 IntoResponseParts 的用法(比如 TypedHeader)、以及 axum 0.9 里新增的 IntoResponseFailed 在这套组合机制里的传播路径。