vLLM 推理内核深度解析
第6章 Worker 与 Executor:GPU 军团
第6章 Worker 与 Executor:GPU 军团
“An army of sheep led by a lion can defeat an army of lions led by a sheep.” — Alexander the Great
本章要点
- 理解 Executor 在 EngineCore 和 Worker 之间作为”部署拓扑中间件”的定位,以及为什么不能把这层省掉
- 掌握
collective_rpc作为 Executor 唯一核心原语的优雅:所有操作都是”在全部 Worker 上执行同一个方法” - 区分三种实现的真实适用边界:UniProc(单卡/开发)、Multiproc(V1 单机 TP)、Ray(多机或 pipeline parallel)
- 读懂 MultiprocExecutor 的共享内存 MessageQueue:本地小消息走共享内存 ring buffer,大消息或远端读者走 ZMQ pub/sub,序列化仍是 pickle
- 掌握 V0 无状态 Worker → V1 有状态 Worker 的变革,差量调度输出如何降低每步通信负担
- 走一遍 Worker 初始化的六阶段流程:init_device → load_model → profile → allocate_kv_cache → warmup → ready
- 理解 CUDA Graph 的捕获与重放、torch.compile 与 CUDA Graph 的组合策略
- 看懂
Sleep Level的两档设计:level 1 只 offload allocator 中标记为 weights 的块,level 2 先保存 model buffers 再释放 allocator 管理的块 - 了解 Worker 跨进程崩溃恢复的工程实践:fail-fast vs 自愈的取舍
6.1 为什么不能让 EngineCore 直接管 Worker
一个最朴素的设计是:EngineCore 直接 spawn Worker 进程、直接发调度结果给 Worker。省掉中间层,省掉抽象。V0 早期就是这样做的——结果呢?llm_engine.py 里出现了大量的 if tensor_parallel_size > 1 or pipeline_parallel_size > 1 or distributed_executor_backend == "ray": ... 分支。每新增一种部署形态(DP、MoE 专家并行、异构 GPU),这里就要加一行;重构起来就是噩梦。
V1 的解决:引入 Executor 抽象层。它在 EngineCore 和 Worker 之间。一切关于”部署拓扑”的差异都收进这一层里。EngineCore 只跟 Executor 对话,根本不关心背后有几个 Worker、住在哪台机器、用什么 IPC 协议。
# vllm/v1/executor/abstract.py(摘录语义)
class Executor(ExecutorBase):
@staticmethod
def get_class(vllm_config):
backend = vllm_config.parallel_config.distributed_executor_backend
if backend == "ray":
return RayDistributedExecutor
if backend == "mp":
return MultiprocExecutor
if backend == "uni":
return UniProcExecutor
...
def initialize_from_config(self, kv_cache_configs):
self.collective_rpc("initialize_from_config",
args=(kv_cache_configs,))
self.collective_rpc("compile_or_warm_up_model")
def execute_model(self, scheduler_output) -> ModelRunnerOutput:
"""EngineCore 每步只调用这一个方法。"""
results = self.collective_rpc("execute_model", args=(scheduler_output,))
return results[0] # TP 下所有 rank 结果对齐,rank 0 即可
源码里的 Executor 并不是一个纯 Python ABC,它继承 ExecutorBase,把 V1 需要的少量方法集中到 vllm/v1/executor/abstract.py:get_class() 负责按 distributed_executor_backend 选择 ray / mp / uni / external_launcher;initialize_from_config() 先广播 KV cache 初始化,再广播 compile_or_warm_up_model();execute_model() 默认广播 execute_model 并取 output[0]。这些调用点在本地源码的 26-88 行可以直接对应。
一个入口,三种实现,覆盖主要部署拓扑——这是抽象设计的关键。EngineCore 的 step() 里只写:
executor_output = self.executor.execute_model(scheduler_output)
不管底层是一个函数调用、一次共享内存广播、还是横跨 10 台机器的 Ray 集群调度,这一行都不会变。
graph TB
EC["EngineCore"] --> |execute_model| EXE["Executor 接口"]
EXE --> |"单卡/开发"| UNI["UniProcExecutor<br/>直接函数调用"]
EXE --> |"多卡单机<br/>(生产主力)"| MP["MultiprocExecutor<br/>共享内存 MessageQueue"]
EXE --> |"多卡多机 / PP"| RAY["RayDistributedExecutor<br/>Ray Compiled DAG"]
UNI --> W1["Worker 0"]
MP --> W2a["Worker 0 (GPU 0)"]
MP --> W2b["Worker 1 (GPU 1)"]
MP --> W2c["Worker 2 (GPU 2)"]
MP --> W2d["Worker 3 (GPU 3)"]
RAY --> W3a["Worker 0 (Node A)"]
RAY --> W3b["Worker 1 (Node B)"]
RAY --> W3c["..."]
style EC fill:#ec4899,color:#fff,stroke:none
style EXE fill:#f59e0b,color:#fff,stroke:none
style UNI fill:#10b981,color:#fff,stroke:none
style MP fill:#10b981,color:#fff,stroke:none
style RAY fill:#10b981,color:#fff,stroke:none
6.1.1 collective_rpc 的语义细节
看名字就知道这是”集体 RPC”。几个语义约束:
- 同一方法、相同参数——所有 Worker 运行同一个方法,不允许让 rank 0 调 A、rank 1 调 B。TP 场景下所有 rank 同步执行同一层的计算,这个语义天然契合。
- 返回按 rank 顺序的 list——rank 0 的结果在 list[0],rank 1 在 list[1]。这让调用方可以明确引用某个 rank 的结果。普通
Executor.execute_model()取output[0];MultiprocExecutor.execute_model()则进一步把rank0_reply_only=True传给collective_rpc(),只等 driver rank 的ModelRunnerOutput。 - 同步语义——
collective_rpc返回前目标 Worker 必须完成。异步需求不放在普通 mp 路径里,而是在 Ray + pipeline parallel 时通过FutureWrapper暴露给调度器。 - 超时可选——V1 mp 的
execute_model()使用EXECUTE_MODEL_TIMEOUT_S = 40;健康检查用 10 秒超时;其他初始化 RPC 多数不传 timeout。不要把这些值理解成性能指标,它们是卡死保护。 - 方法可以是字符串或 callable——mp 路径里字符串方法直接广播;callable 先用
cloudpickle.dumps(..., protocol=pickle.HIGHEST_PROTOCOL)序列化,再由 Worker 反序列化后绑定到自身执行。
这一套语义让调用方几乎感受不到 Executor 的存在——它就是个”比函数调用多跑几张 GPU”的函数。
6.2 UniProcExecutor:最简单的情况
单卡推理时只有一个 Worker,住在 EngineCore 主进程内。没有子进程、没有共享内存队列、没有跨进程序列化。V1 这里没有另写一份完整实现,而是复用 V0 的 UniProcExecutor:
# vllm/v1/executor/abstract.py
from vllm.executor.uniproc_executor import UniProcExecutor as UniProcExecutorV0
class UniProcExecutor(UniProcExecutorV0, Executor):
pass
把 V0 的本地方法调用实现接到 V1 Executor 契约上,含义很直接:collective_rpc() 在单 Worker 上退化成一次本地 run_method(),返回值仍包装成单元素 list,调用侧不用知道底层是不是进程内调用。这意味着:
- 写单元测试时,
UniProcExecutor是最容易 mock / 断言的形态; - 离线批处理和本地单卡调试常常落到这条路径;
- 调试器不用 attach 到子进程,异常栈也更接近普通 Python 调用。
“就是一个函数调用”的优雅是 V1 抽象层设计的一个侧证据——好的抽象不是往上加,而是让最简单的情况也能零成本地走这条抽象。
6.3 MultiprocExecutor:生产主力
MultiprocExecutor 对应的是 V1 的单机多卡路径。源码里第一条硬约束就写得很清楚:world_size 必须等于 tensor_parallel_size,否则报错说明 “Pipeline parallelism is not yet implemented in v1”。也就是说,V1 mp 路径负责的是单机 TP,不是多机,也不是 PP。
初始化时它做四件事:
| 步骤 | 源码位置 | 作用 |
|---|---|---|
| 设置通用多进程环境 | multiproc_executor.py:61-62 | 复用 V0/V1 共同的 worker 环境设置 |
| 生成本机分布式初始化地址 | multiproc_executor.py:64-68 | mp 只支持单机,所以用 127.0.0.1 + 空闲端口 |
| 创建广播队列 | multiproc_executor.py:70-73 | MessageQueue(world_size, world_size) 给 SchedulerOutput/RPC 命令广播用 |
| 启动 WorkerProc | multiproc_executor.py:79-99 | 每个 rank 一个进程,等 READY,再启动 worker monitor |
Worker 侧也不是神秘的”另一个服务”:WorkerProc.__init__() 收到共享内存 handle 后,创建输入 MessageQueue、创建自己的响应 MessageQueue(1, 1),然后依次调用 init_device() 和 load_model()。READY 信号发回主进程之后,才进入 worker_busy_loop() 接收后续 RPC。
6.3.1 为什么不用 multiprocessing.Queue
Python 标准的 multiprocessing.Queue 内部结构可以概括成:
Put side (主进程):
object → pickle.dumps → bytes → write() 到 pipe
Get side (子进程):
read() 从 pipe → bytes → pickle.loads → object
这类队列的问题不是”不能用”,而是它的广播语义不贴合 vLLM:一个 SchedulerOutput 要被多个 Worker 读取,普通 Queue 更像点对点消费;如果给每个 Worker 各放一份,主进程要重复写 N 次。V1 的 MessageQueue 明确为”一个 writer、多个 reader”优化,ShmRingBuffer 注释里写的就是 broadcast communication:一个 enqueue,多个 dequeue。
更重要的是,MultiprocExecutor.collective_rpc() 的请求路径非常固定:
if isinstance(method, str):
send_method = method
else:
send_method = cloudpickle.dumps(method, protocol=pickle.HIGHEST_PROTOCOL)
self.rpc_broadcast_mq.enqueue((send_method, args, kwargs, rank0_reply_only))
workers = (self.workers[0],) if rank0_reply_only else self.workers
for w in workers:
status, result = w.worker_response_mq.dequeue(...)
这段源码说明三件事:
- 主进程只向广播队列写一次
(method, args, kwargs, rank0_reply_only); execute_model()设置rank0_reply_only=True,所以响应只读 driver rank 的输出;- 若方法不是字符串,会用
cloudpickle序列化 callable;但普通热路径是字符串方法名。
6.3.2 MessageQueue 的真实协议
# vllm/distributed/device_communicators/shm_broadcast.py(摘录语义)
class MessageQueue:
def __init__(self, n_reader, n_local_reader, max_chunk_bytes=10 * MB, ...):
if n_local_reader > 0:
# local readers:
# 1. shared-memory ring buffer for small data
# 2. publish-subscribe socket for large data
self.buffer = ShmRingBuffer(n_local_reader,
max_chunk_bytes,
max_chunks)
self.local_socket = context.socket(XPUB)
...
def enqueue(self, obj):
serialized_obj = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
if len(serialized_obj) < self.buffer.max_chunk_bytes:
with self.acquire_write(timeout) as buf:
buf[0] = 0
buf[1:len(serialized_obj) + 1] = serialized_obj
else:
with self.acquire_write(timeout) as buf:
buf[0] = 1
self.local_socket.send(serialized_obj)
def dequeue(self):
with self.acquire_read(timeout, cancel) as buf:
overflow = buf[0] == 1
if not overflow:
obj = pickle.loads(buf[1:])
if overflow:
obj = MessageQueue.recv(self.local_socket, timeout)
return obj
这里有几个容易被误读的点,必须按源码来:
- 它不是 msgpack:当前实现直接
pickle.dumps()/pickle.loads(),源码在shm_broadcast.py:458-485。如果讲成 msgspec/msgpack,就是把别的路径误搬过来了。 - 它不是 Python 对象零拷贝:Python 对象仍要序列化成 bytes,读侧仍要反序列化。共享内存优化的是广播介质:本地小消息放进共享内存 ring buffer,多个 reader 打开同一段内存读取;不是把对象指针跨进程传过去。
- 大消息会走 socket:本地 reader 的注释写得很明确,小数据用 shared memory ring buffer,大数据用 publish-subscribe socket;远端 reader 也走 TCP pub/sub。这也是
MessageQueue可以同时服务本地和远端读者的原因。 - ring buffer 的同步靠元数据字节:
ShmRingBuffer把每个 chunk 的状态拆成written_flag和每个 reader 的 flag。writer 写完后把 reader flags 置 0、再设置 written flag;reader 读完后标记自己的 flag。这个状态机保证一个 chunk 在所有 reader 读完前不会被覆盖。
这套设计的收益不需要用没有来源的”快几倍”数字来支撑。真正的工程价值是语义对齐:vLLM 的主进程经常需要”写一次,让所有 rank 看见同一条命令”;MessageQueue 直接把这种广播模式做成一等公民。
sequenceDiagram
participant EC as EngineCore / Executor
participant SHM as 共享内存段
participant W0 as Worker 0 (GPU 0)
participant W1 as Worker 1 (GPU 1)
participant W2 as Worker 2 (GPU 2)
participant W3 as Worker 3 (GPU 3)
EC->>SHM: pickle 序列化 RPC tuple → 写入 ring buffer
par 所有 worker 同时读
SHM->>W0: 读取共享段 → pickle.loads
SHM->>W1: 同上
SHM->>W2: 同上
SHM->>W3: 同上
end
par 4 卡同时前向
W0->>W0: GPU 0 执行 TP-shard
W1->>W1: GPU 1 执行 TP-shard
W2->>W2: GPU 2 执行 TP-shard
W3->>W3: GPU 3 执行 TP-shard
end
Note over W0,W3: all_reduce 聚合 logits
W0->>SHM: 写入 driver rank 的 ModelRunnerOutput
SHM->>EC: 读出
6.3.3 进程模型与 CUDA 初始化陷阱
一个容易踩坑的点:CUDA 上下文不能 fork。如果主进程已经调用过任何 CUDA API(比如 torch.cuda.is_available()),再 fork 出来的子进程会继承一个”坏掉”的 CUDA 上下文,后续任何 CUDA 调用都会挂。
但本地源码并不是简单地说”永远用 spawn”。envs.py 里 VLLM_WORKER_MULTIPROC_METHOD 默认值是 "fork";get_mp_context() 会先调用 _maybe_force_spawn(),如果检测到 CUDA 已初始化,或者当前在 Ray actor 内,就把环境变量覆盖成 "spawn",再返回对应的 multiprocessing context。换句话说:
- 默认配置偏向
fork,因为启动成本低; - 如果当前进程状态会让 fork 变危险,vLLM 才强制切到
spawn; - 用户也可以通过
VLLM_WORKER_MULTIPROC_METHOD=spawn主动选择 spawn。
另一个陷阱是子进程的 GPU 绑定。本章不能写成”先设置 CUDA_VISIBLE_DEVICES=<rank> 再 import torch”,因为 V1 worker 源码里的关键动作是 Worker.init_device():设置 TORCH_NCCL_AVOID_RECORD_STREAMS=1,移除 Ray 可能留下的 NCCL_ASYNC_ERROR_HANDLING,构造 torch.device(f"cuda:{local_rank}"),然后 torch.cuda.set_device(self.device)。随后才初始化分布式环境、设置随机种子、构造 GPUModelRunner。
这段流程和上一章模型加载也连在一起:WorkerProc.__init__() 先 wrapper.init_worker(...),再建输入/输出 MessageQueue,接着 init_device()、load_model()。也就是说,进程创建只是外壳,真正的 CUDA 归属在 Worker 初始化阶段落定。
6.4 RayExecutor:多机与流水线并行
当模型大到单机放不下,或需要 pipeline parallel 跨多组 worker 时,RayDistributedExecutor 登场。V1 这里没有重写完整 Ray executor:vllm/v1/executor/ray_distributed_executor.py 只有一个薄包装,继承 V0 的 RayDistributedExecutor,主要覆盖 max_concurrent_batches 和 execute_model()。
6.4.1 为什么必须用 Ray
多机场景下跨节点 IPC 面临的问题:
- 共享内存不可用(不同节点的物理内存不互通)
- TCP socket 可用但延迟高、带宽低
- 节点调度(把 Worker 0 放哪台机器)需要资源管理
Ray 恰好提供了这些:分布式 Actor 模型(Worker = Actor)、节点调度(Placement Group)、高效的对象存储(Plasma,底层是内存映射)。
# vllm/v1/executor/ray_distributed_executor.py(概念性)
class RayDistributedExecutor(Executor):
def __init__(self, vllm_config):
# 1. 申请 placement group(告诉 Ray "我需要 16 张 GPU,TP=8 × 2 节点")
self.pg = placement_group(bundles=[{"GPU": 1, "CPU": 2}] * 16)
ray.get(self.pg.ready())
# 2. 为每个 Worker 创建 Ray Actor
self.workers = [
ray.remote(Worker).options(
num_gpus=1,
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=self.pg,
placement_group_bundle_index=rank,
),
).remote(vllm_config, rank=rank, ...)
for rank in range(16)
]
# 3. 等所有 Worker 初始化完
ray.get([w.init.remote() for w in self.workers])
def collective_rpc(self, method, args=(), kwargs=None):
futures = [getattr(w, method).remote(*args, **(kwargs or {})) for w in self.workers]
return ray.get(futures)
6.4.2 Ray Compiled DAG:流水线并行的加速
V1 使用 Ray 的 Compiled DAG:第一次执行时如果 self.forward_dag is None,调用 _compiled_ray_dag(enable_asyncio=False) 构建固定执行图;之后每步直接 self.forward_dag.execute(scheduler_output)。本章不写固定微秒数字,因为这些开销取决于 Ray 版本、节点间网络、PP stage 数和对象大小;源码能确认的是:V1 把热路径收敛到 compiled DAG 的 execute(),而不是每步手写一组普通 Ray actor RPC。
# vllm/v1/executor/ray_distributed_executor.py
if self.forward_dag is None:
self.forward_dag = self._compiled_ray_dag(enable_asyncio=False)
refs = self.forward_dag.execute(scheduler_output)
if self.max_concurrent_batches == 1:
return refs[0].get()
return FutureWrapper(refs[0])
这个实现里有一个和调度器契约相关的细节:max_concurrent_batches 返回 pipeline_parallel_size。没有 PP 时,execute_model() 阻塞等待 refs[0].get();有 PP 时,它立即返回 FutureWrapper(refs[0]),让调度器可以把下一个 batch 放进流水线。也就是说,Compiled DAG 解决的是 Ray 执行拓扑的热路径,FutureWrapper 解决的是 V1 调度循环如何表达”这个 batch 还在 pipeline 里跑”。
6.4.3 什么时候该用 Ray
不该用:
- 单机多卡(优先用 MultiprocExecutor,拓扑更简单,直接使用本地进程和共享内存)
- 开发调试(用 UniProcExecutor,最好调试)
该用:
- 模型大到单机放不下(TP × 节点数 > 机器 GPU 数)
- 多副本 + 动态扩缩容(Ray 的 placement group 能随时加/减 Worker)
- pipeline parallel(必须跨节点分层)
如果只是单机 TP,V1 mp 路径更直接;如果需要跨节点资源管理或 PP 并发,Ray 才是合理选项。这里的判断标准不是”哪个名字更高级”,而是部署拓扑是否真的超出了单机共享内存能表达的范围。
6.5 Worker 的”有状态”大变革
V0 和 V1 Worker 最大的架构差异:是否缓存请求状态。
6.5.1 V0:无状态 Worker + 全量通信
EngineCore → Worker(每步):
- 本批所有请求的 block_tables
- 本批所有请求的 input_ids
- 本批所有请求的 positions
- 本批所有请求的 sampling params
- 本批所有请求的 seq_lens / context_lens
- ...
Worker → EngineCore(每步):
- 本批所有请求的采样结果
- (可能的)hidden states / logprobs
“本批”可能有 256 个请求,其中 95% 是 decode 请求(只多 1 个 token)。它们的 block_tables 和上一步几乎完全一样——但每步都要从头广播一遍。假设 decode 请求平均占 20 个 block(一个请求生成了 320 token),256 个 decode × 20 = 5120 个 int32 block id = 20 KB。每步广播一次。
6.5.2 V1:有状态 Worker + 差量同步
V1 的 Worker 在本地维护一个 InputBatch 对象(vllm/v1/worker/gpu_input_batch.py),里面是持久化的 GPU 张量:
class InputBatch:
# 下面这些都是预分配好的 GPU 张量,每步只更新变化部分
input_ids: torch.Tensor # [max_num_batched_tokens]
positions: torch.Tensor # [max_num_batched_tokens]
block_tables: torch.Tensor # [max_num_seqs, max_blocks_per_seq]
slot_mapping: torch.Tensor # [max_num_batched_tokens]
sampling_params_per_req: dict # CPU 侧字典
num_computed_tokens_cpu: np.ndarray # 每个 slot 已经计算到哪里
req_id_to_index: dict # 请求 id → slot 号
# ...
EngineCore 每步只发”差量”:
@dataclass
class SchedulerOutput:
scheduled_new_reqs: list[NewRequestData] # 本步新进 RUNNING 的
scheduled_cached_reqs: list[CachedRequestData] # 本步继续 RUNNING 的(只发 block 的追加)
finished_req_ids: set[str] # 要从本地状态清除的
preempted_req_ids: set[str] # 要标记回 WAITING 的
num_scheduled_tokens: dict[str, int] # 每个请求本步计算多少 token
decode 请求的”继续运行”通常只需要一个 int32(新增 token 数),block_tables 完全不变(除非在本步末尾跨块了)。通信量从 20 KB/step 降到 几百字节/step——数量级的改进。
6.5.3 一致性代价与保证
有状态的代价是 Worker 和 EngineCore 必须严格同步。如果一个差量消息丢了,后续所有差量都会错。
V1 的保证机制:
- 同机 IPC 的顺序约束:共享内存 ring buffer 的 written/reader flags 保证 chunk 在所有 reader 读完前不被覆盖;RPC 命令按队列顺序被 worker busy loop 消费
- 一个引擎内单向命令流:EngineCore 是广播队列的 writer,Worker 是 reader;响应队列则由对应 Worker 写回、Executor 读取,竞争面很小
- 初始化阶段先建立完整运行状态:Executor 先集体调用
initialize_from_config(kv_cache_configs),再调用compile_or_warm_up_model();进入 step 循环后,Worker 才持续接收SchedulerOutput差量 - 崩溃即 fail-fast:Ray 场景下如果 Worker 崩溃,整个 EngineCore 退出,由 K8s 重建——不尝试”复活”一个差量状态损坏的 Worker
这和”TCP 可靠字节流,不保证消息边界”的精神是一致的:尽量假设基础层可靠,只在真出问题时走最保守的恢复路径。
6.6 Worker 初始化:六阶段流程
一个 Worker 从创建到开始接 step 请求要走完 6 步:
graph TB
subgraph "阶段 1: 设备与通信"
A1["torch.cuda.set_device(rank)"]
A2["init_distributed_env<br/>(NCCL / Gloo process group)"]
A1 --> A2
end
subgraph "阶段 2: 模型加载"
B1["load_model<br/>(权重从磁盘 / HF / safetensors)"]
B2["量化处理 (FP8/GPTQ/AWQ)"]
B3["LoRA baseline 如启用"]
B1 --> B2 --> B3
end
subgraph "阶段 3: 显存探测"
C1["determine_available_memory<br/>profile_run + peak_memory"]
end
subgraph "阶段 4: KV Cache 分配"
D1["initialize_from_config<br/>torch.empty 一次性出整个 KV 池"]
end
subgraph "阶段 5: 预热"
E1["capture_cuda_graphs"]
E2["warmup_batches (各种 batch size)"]
E1 --> E2
end
subgraph "阶段 6: 就绪"
F1["run_busy_loop / wait_for_tasks"]
end
A2 --> B1
B3 --> C1
C1 --> D1
D1 --> E1
E2 --> F1
style A1 fill:#3b82f6,color:#fff,stroke:none
style B1 fill:#8b5cf6,color:#fff,stroke:none
style C1 fill:#ec4899,color:#fff,stroke:none
style D1 fill:#f59e0b,color:#fff,stroke:none
style E1 fill:#10b981,color:#fff,stroke:none
style F1 fill:#6366f1,color:#fff,stroke:none
阶段 1:设备与通信。gpu_worker.py:112-147 负责绑定 CUDA 设备、检查 dtype 支持、清理缓存、记录初始空闲显存、初始化分布式环境、设置随机种子、构造 GPUModelRunner。TORCH_NCCL_AVOID_RECORD_STREAMS=1 的设置也在这里,目的是避免反复 all_reduce 时输入 tensor 生命周期拖长导致显存增长。
阶段 2:模型加载。load_model() 如果开启 sleep mode,会用 CuMemAllocator 的 "weights" 内存池包住 model_runner.load_model();否则就是普通上下文。这个设计解释了后面的 sleep level:只有 allocator 知道哪些分配属于 weights,后续才能按 tag offload。
阶段 3:显存探测。determine_available_memory() 会 empty_cache()、重置峰值统计、调用 model_runner.profile_run(),再用 PyTorch 峰值分配加上可能的 non-torch allocations 估算 KV cache 可用显存。这里有一个重要断言:profile 前的空闲显存必须大于 profile 后的空闲显存,否则认为 profiling 期间 GPU 被其他进程干扰或初始化没有清理干净。
阶段 4:KV Cache 分配。initialize_from_config() 如果开启 sleep mode,会用 "kv_cache" 内存池包住 model_runner.initialize_kv_cache(kv_cache_config);否则直接分配。KV cache 的形状来自前面 get_kv_cache_spec() 和内存规划结果,不是 Worker 自己拍脑袋决定。
阶段 5:编译、预热与 CUDA Graph 捕获。compile_or_warm_up_model() 先挑出需要 compile 但不在 cudagraph capture sizes 里的尺寸做 _dummy_run(size);如果不是 eager 模式,再调用 capture_model();最后在 PP last rank 上跑 sampler dummy run,预分配 logits 和采样相关 buffer,减少后续内存碎片。
阶段 6:就绪。对 mp worker 来说,READY 信号是在 WorkerProc.__init__() 完成 init_device() 和 load_model() 后发出的;KV cache 初始化和 compile/warmup 则通过 Executor.initialize_from_config() 的 collective RPC 继续推进。Worker 进入 worker_busy_loop() 后,每次从广播队列取 (method, args, kwargs, rank0_only),执行对应方法,再按需把结果写回响应队列。
这六阶段有两个工程含义。第一,“加载权重”和”KV cache/graph 初始化”被拆开了:WorkerProc 启动阶段先保证模型可用,Executor 后续再集体下发 cache 与 warmup。第二,初始化耗时不能用一个固定秒数描述;它随模型大小、权重格式、磁盘/网络、量化方式、compile 配置、cudagraph capture sizes 变化。书里更应该讲清楚源码顺序和状态边界,而不是给一个看似精确但不可迁移的启动时间。
6.7 CUDA Graph 与 torch.compile:预热的秘密
6.7.1 为什么需要 CUDA Graph
每次前向传播,PyTorch 要做的事:
- Python 代码执行,构造一系列
nn.Module调用 - 每个 module 触发一系列 ATen 算子(
matmul,softmax,add…) - 每个算子启动一个 CUDA kernel
- 每次 kernel 启动都要走 CUDA Runtime API
对小 batch、短 decode step 来说,Python 调度和 kernel launch 的固定成本会变得显眼。CUDA Graph 的思路是把这些 kernel 调用序列”录制”下来(capture),后续执行时一次性”回放”(replay)。本章不把 launch cost 写成固定微秒值,因为它依赖 GPU、CUDA、PyTorch、算子融合和 batch 形状;源码层面能确认的是:V1 在 GPUModelRunner 中维护 use_cuda_graph、cudagraph_batch_sizes,运行时按 token 数决定是否走 graph。
6.7.2 V1 的 CUDA Graph 捕获策略
问题:CUDA Graph 要求所有张量形状固定。但 vLLM 的 batch 大小和序列长度都是动态的。怎么办?
V1 的方案是捕获多个固定形状的 graph。GPUModelRunner.__init__() 里把配置中的 cudagraph_capture_sizes 反转成升序的 self.cudagraph_batch_sizes;执行时如果 num_scheduled_tokens <= self.cudagraph_batch_sizes[-1],就调用 pad_for_cudagraph() 把 token 数 pad 到可捕获形状,否则走 eager 路径。
cudagraph_capture_sizes: config 中给定
self.cudagraph_batch_sizes = reversed(cudagraph_capture_sizes)
if num_scheduled_tokens <= max(self.cudagraph_batch_sizes):
num_input_tokens = pad_for_cudagraph(num_scheduled_tokens)
else:
eager mode
capture_model() 里还有一个细节:它按 reversed(self.cudagraph_batch_sizes) 捕获,也就是先捕获大形状,再捕获小形状,源码注释说这样小形状可以复用大形状分配出来的 memory pool。捕获结束时会记录 elapsed time 和占用显存,日志注释写着通常需要 5 到 20 秒;这个数字来自源码注释,仍应理解为经验范围,而不是跨模型的承诺。
6.7.3 torch.compile 的角色
PyTorch 2.0+ 的 torch.compile 能把 Python nn.Module 编译成 graph 形式。V1 的 model runner 默认启用 torch.compile——它和 CUDA Graph 是互补而非替代的:
torch.compile:Python 层面的 graph 捕获和优化(算子融合、内存规划)- CUDA Graph:CUDA 层面的 kernel 调用序列重放
两者叠加时,torch.compile 负责减少或重排算子,CUDA Graph 负责让固定形状的调用序列变成可重放路径。实际收益必须用目标模型和目标 batch 形态测出来;源码能给我们的判断是:V1 把 compile sizes、cudagraph capture sizes、sampler buffer 预热放在同一个 compile_or_warm_up_model() 阶段处理,避免运行时第一次请求承担这些成本。
6.8 Sleep Level:多租户场景的显存腾让
V1 引入了 Worker 的睡眠模式,允许空闲实例释放 GPU 显存:
# vllm/v1/worker/gpu_worker.py
class Worker:
def sleep(self, level: int):
free_before = torch.cuda.mem_get_info()[0]
if level == 2:
self._sleep_saved_buffers = {
name: buffer.cpu().clone()
for name, buffer in self.model_runner.model.named_buffers()
}
allocator = CuMemAllocator.get_instance()
allocator.sleep(offload_tags=("weights",) if level == 1 else tuple())
...
def wake_up(self, tags=None):
allocator = CuMemAllocator.get_instance()
allocator.wake_up(tags)
if self._sleep_saved_buffers:
# restore buffers saved before level 2 sleep
...
这段实现比”level 1 权重、level 2 权重+KV 全部搬到 CPU”更细。它依赖 CuMemAllocator 的 tag:
| 模式 | 源码动作 | 正确理解 |
|---|---|---|
sleep(1) | allocator.sleep(offload_tags=("weights",)) | 只对 allocator 中标记为 weights 的分配做 offload;其他 allocator 管理的块按 allocator 语义处理,不应写成”KV 一定保留” |
sleep(2) | 先把 model.named_buffers() clone 到 CPU,再 allocator.sleep(offload_tags=tuple()) | level 2 特别保护 model buffers;释放范围由 allocator 当前管理的内存池决定 |
wake_up(tags) | allocator.wake_up(tags),再恢复 _sleep_saved_buffers | 可以按 tag 唤醒;如果之前保存过 buffers,会复制回模型 buffer |
前面 load_model() 和 initialize_from_config() 的内存池 tag 正好解释了这里的行为:权重加载时使用 "weights" tag,KV cache 初始化时使用 "kv_cache" tag。sleep level 并不是 Worker 自己遍历所有 tensor 粗暴搬家,而是把释放/恢复交给 allocator 的 tagged memory pool。
stateDiagram-v2
[*] --> ACTIVE
ACTIVE --> LEVEL1: sleep(1)
LEVEL1 --> ACTIVE: wake_up()<br/>新请求到达
ACTIVE --> LEVEL2: sleep(2)
LEVEL2 --> ACTIVE: wake_up()
LEVEL1 --> LEVEL2: sleep(2)<br/>进一步腾让
note right of LEVEL1
allocator 按 weights tag offload
常用于释放权重占用
end note
note right of LEVEL2
保存 model buffers
allocator 释放更大范围
end note
典型使用场景:
- 多模型实例混部——8 张卡跑 10 个不同模型实例,靠 sleep-wake 切换活跃的
- 冷热分层——热模型常驻 GPU,冷模型进 level 2,有请求时现唤醒
- 开发环境复用——多个开发者共享一组 GPU,自己的实例用完自动 sleep
这是 V1 相对 V0 更细的资源复用能力。真正上线时需要结合模型大小、KV cache 配置、allocator tag 覆盖范围和冷启动 SLA 做测试,不能只根据 level 名字推断唤醒时间。
6.9 故障处理:fail-fast 的工程哲学
6.9.1 MultiprocExecutor 的故障检测
主进程通过 multiprocessing.Process spawn Worker 子进程。当 Worker 崩溃(OOM、CUDA error、Python 异常)时,主进程如何察觉?真实实现(vllm/v1/executor/multiproc_executor.py:107)用的不是定时轮询、而是事件驱动:
# multiproc_executor.py:107
def start_worker_monitor(self):
workers = self.workers
self_ref = weakref.ref(self)
def monitor_workers():
sentinels = [h.proc.sentinel for h in workers]
died = multiprocessing.connection.wait(sentinels) # ← 阻塞等待
_self = self_ref()
if not _self or getattr(_self, 'shutting_down', False):
return
_self.is_failed = True
proc_name = next(h.proc.name for h in workers
if h.proc.sentinel == died[0])
logger.error(
"Worker proc %s died unexpectedly, "
"shutting down executor.", proc_name)
_self.shutdown()
callback = _self.failure_callback
if callback is not None:
_self.failure_callback = None
callback()
Thread(target=monitor_workers, daemon=True,
name="MultiprocWorkerMonitor").start()
几个关键差别和常规”轮询 + is_alive()”模式形成对照:
1. multiprocessing.connection.wait(sentinels) 是阻塞式事件等待。每个 Process.sentinel 是一个 OS 级句柄;当对应进程终止,这个句柄会变成 ready,wait(sentinels) 在任意一个 ready 时返回。它的价值不是某个固定毫秒数字,而是避免了”定期 sleep 再扫一遍 is_alive()”的轮询结构,故障路径更及时也更少后台噪音。
2. weakref.ref(self) 避免监控线程强引用持有 Executor。如果监控线程直接持有 self,正常退出路径下 Executor 就 drop 不掉——daemon 线程会吊住整个进程。weakref 让 self 可以被 GC、监控线程在 self_ref() 返回 None 时自己静默退出。这是 Python daemon thread + 循环引用的标准规避。
3. shutting_down 标志区分”意外崩溃 vs 正常关停”。主线程主动 shutdown() 时会设这个 flag、之后再发 terminate 信号给 workers——workers 退出导致 wait() 返回时监控线程看到 shutting_down=True 就什么都不做。没这个 flag 的话,正常关停流程也会触发”Worker died unexpectedly”的 error log 和 callback,制造 false alarm。
4. failure_callback 是单次触发(line 129-130):_self.failure_callback = None; callback()——先置空再调用,保证一次 Worker 死亡事件只触发一次 callback。防 race condition:如果两个 worker 同时死(CUDA 级别的整 node 崩溃),本函数被并发触发两次、只有第一次走完整 callback 路径。
检测到 Worker 死亡 → 整个 EngineCore 退出 → 外部编排(K8s / systemd)重启服务。这是”fail-fast”哲学的典型体现——但具体实现里每行代码都在和”轮询延迟 / GC 环依赖 / 并发 race / false alarm”这些具体工程毒素作战。
6.9.2 为什么不尝试”复活”死 Worker
直觉上我们可能想:“Worker 死了重启一个不就行了?“但实际上:
- 权重要重新加载:耗时随模型大小、权重格式、磁盘/网络和量化方式变化,这段时间 GPU 不可稳定服务
- KV Cache 状态丢了:正在跑的请求全部作废,用户侧还得重试
- TP 场景下必须同时重启所有 rank:任何一个 rank 权重不一致就 all_reduce 崩溃
- LoRA / 量化状态也要重建:更长时间
与其让用户面对一个状态不清的半恢复实例,不如让整个实例立刻退出、由外部编排拉起新实例、健康检查期间把流量路由到其他副本。用户体验一致性更好,SRE 也更清楚状态。
6.9.3 Ray 场景的特殊处理
Ray Actor 有自己的生命周期和失败处理机制,但在 LLM 推理里,“某个 rank 单独恢复”并不等于服务恢复。TP/PP 组内的权重、KV cache、请求状态、LoRA 状态都要一致;任何一部分只恢复了一半,后续 collective 通信和调度状态都可能变成不可推理。V1 的 Ray executor 继承了 V0 的 actor/placement group 复杂逻辑,V1 自己只在 execute_model() 出口适配 compiled DAG 与 PP future;这也说明失败处理应该按整组 worker 的一致性来理解,而不是按单 actor 的重启能力来想象。
这种对”fail-fast 胜于半恢复”的坚持在 V1 其他很多地方也能看到。是一种选择,而不是懒惰——它让整个系统的状态空间变得可推理。
6.9.4 V1 三种 Executor 实现策略:从 V0 继承的不对称真相
把 V0 / V1 两套 executor 目录全部按行数实测——
| Executor | V1 实现路径 | V1 行 | V0 父类路径 | V0 行 | 实际承担 |
|---|---|---|---|---|---|
UniProcExecutor | (不存在 V1 版本) | — | vllm/executor/uniproc_executor.py | 141 | V1 直接复用 V0 实现——vllm/v1/executor/abstract.py 的 Executor 类把 V0 的 UniProcExecutor 直接拉进来当默认 |
MultiprocExecutor | vllm/v1/executor/multiproc_executor.py | 480 | (V0 不复用) | — | 完全重写——480 行全新实现、不继承任何 V0 代码 |
RayDistributedExecutor | vllm/v1/executor/ray_distributed_executor.py | 61 | vllm/executor/ray_distributed_executor.py | 700 | 极薄 V1 包装——class RayDistributedExecutor(RayDistributedExecutorV0, Executor) 只覆盖 execute_model() 加 PipelineParallel 异步 future 支持、其他全继承 |
vllm/v1/executor/ 目录总共只有 653 行——其中 multiproc_executor.py 480 占 73%——印证 §1.6.1 实测的”executor/ 仅 653 行”——是 v1/ 22 子目录里最薄的一个。
三条值得记住的工程含义——
- MultiprocExecutor 完全重写、Ray 只做薄包装——根因是两者改造代价不对称:MultiprocExecutor 的 V0 实现耦合在 worker 生命周期 + KV broadcast + 旧调度协议里、改不动只能重来;Ray 的核心逻辑(actor 创建、跨机 RPC、placement group)是稳定的、V1 只需要在
execute_model()这个出口接上新的SchedulerOutput → ModelRunnerOutput类型契约 - V1
RayDistributedExecutor61 行的FutureWrapper类(line 12-25)专门为 Pipeline Parallel 服务——max_concurrent_batches > 1时返回Future让调度器立即调度下一个 batch、不阻塞等待——这是 §6.4.2 “Compiled DAG 承接 Ray 热路径”在出口侧的对应:底层执行图固定,上层调度器拿到 future 后可以继续推进 pipeline v1/worker/4851 行 vsv1/executor/653 行 = 7.4 倍——印证 §1.6.1 测得的”worker 23% 是最重子系统”——单卡内的 GPU 流水线编排(gpu_model_runner.py 1776 + tpu_model_runner.py 1419 + gpu_input_batch.py 677)远比跨进程编排重——这是 GPU 推理”重活在 worker、薄壳在 executor”的设计底色
gpu_model_runner.py 1776 行是 v1/worker/ 最重单文件——比本章 §6.6 讨论的”worker 6 阶段初始化”(gpu_worker.py 349)还重 5 倍——是下一章 ch07 ModelRunner 的核心读物。
6.10 本章小结
Worker 与 Executor 是 vLLM 从”调度决策”到”GPU 计算”的执行层:
- Executor 抽象层封装部署拓扑,让 EngineCore 的 step() 代码永远只有一行调用;
collective_rpc是唯一核心原语 - UniProcExecutor——单卡/开发用,函数调用即 RPC,测试友好
- MultiprocExecutor——V1 单机 TP 路径;共享内存 MessageQueue 用 pickle 序列化、本地小消息走 shared-memory ring buffer,大消息或远端读者走 ZMQ pub/sub;进程启动方式由
VLLM_WORKER_MULTIPROC_METHOD和_maybe_force_spawn()决定 - RayDistributedExecutor——多机或 pipeline parallel 路径;V1 是 V0 Ray executor 的薄包装,热路径通过 Compiled DAG
execute(),PP 时返回FutureWrapper - 有状态 Worker——本地维护 InputBatch 张量,每步同步
SchedulerOutput差量;decode-heavy 场景下少发重复 block table 与 sampling 状态,一致性靠”可靠命令流 + fail-fast”保证 - 六阶段初始化——设备、模型、探测、KV 分配、预热、就绪;源码顺序比固定启动耗时更重要
- CUDA Graph + torch.compile——互补方案:Python 侧算子融合 + CUDA 侧调用序列重放;capture sizes 来自配置,运行时通过
pad_for_cudagraph()对齐 - Sleep Level——level 1 按 weights tag offload,level 2 先保存 model buffers 再释放 allocator 管理内存;具体释放量要看 allocator tag 覆盖
- fail-fast 哲学——Worker 崩溃 → 整个 EngineCore 退出 → 外部编排重建;不尝试半恢复,因为权重重载太慢、状态太复杂
物理事实:v1/executor/ 仅 653 行(v1/ 22 子目录最薄);MultiprocExecutor 完全重写 480 行 vs RayDistributedExecutor 仅 61 行薄包装继承 V0 700 行——印证’重活在 worker、薄壳在 executor’设计底色;gpu_model_runner.py 1776 行是 v1/worker/ 最重单文件、是下一章 ModelRunner 核心读物。
源码导航
- Executor 抽象:
vllm/v1/executor/abstract.py- UniProcExecutor:
vllm/executor/uniproc_executor.py- MultiprocExecutor:
vllm/v1/executor/multiproc_executor.py- RayDistributedExecutor:
vllm/v1/executor/ray_distributed_executor.py- 共享内存 MessageQueue:
vllm/distributed/device_communicators/shm_broadcast.py- GPU Worker:
vllm/v1/worker/gpu_worker.py- InputBatch(有状态 Worker 的核心):
vllm/v1/worker/gpu_input_batch.py- CUDA Graph 捕获:
vllm/v1/worker/gpu_model_runner.py(capture_model)