vLLM 推理内核深度解析

第6章 Worker 与 Executor:GPU 军团

作者 杨艺韬 · 8,520 字

第6章 Worker 与 Executor:GPU 军团

“An army of sheep led by a lion can defeat an army of lions led by a sheep.” — Alexander the Great

本章要点

  • 理解 Executor 在 EngineCore 和 Worker 之间作为”部署拓扑中间件”的定位,以及为什么不能把这层省掉
  • 掌握 collective_rpc 作为 Executor 唯一核心原语的优雅:所有操作都是”在全部 Worker 上执行同一个方法”
  • 区分三种实现的真实适用边界:UniProc(单卡/开发)、Multiproc(V1 单机 TP)、Ray(多机或 pipeline parallel)
  • 读懂 MultiprocExecutor 的共享内存 MessageQueue:本地小消息走共享内存 ring buffer,大消息或远端读者走 ZMQ pub/sub,序列化仍是 pickle
  • 掌握 V0 无状态 Worker → V1 有状态 Worker 的变革,差量调度输出如何降低每步通信负担
  • 走一遍 Worker 初始化的六阶段流程:init_device → load_model → profile → allocate_kv_cache → warmup → ready
  • 理解 CUDA Graph 的捕获与重放、torch.compile 与 CUDA Graph 的组合策略
  • 看懂 Sleep Level 的两档设计:level 1 只 offload allocator 中标记为 weights 的块,level 2 先保存 model buffers 再释放 allocator 管理的块
  • 了解 Worker 跨进程崩溃恢复的工程实践:fail-fast vs 自愈的取舍

6.1 为什么不能让 EngineCore 直接管 Worker

一个最朴素的设计是:EngineCore 直接 spawn Worker 进程、直接发调度结果给 Worker。省掉中间层,省掉抽象。V0 早期就是这样做的——结果呢?llm_engine.py 里出现了大量的 if tensor_parallel_size > 1 or pipeline_parallel_size > 1 or distributed_executor_backend == "ray": ... 分支。每新增一种部署形态(DP、MoE 专家并行、异构 GPU),这里就要加一行;重构起来就是噩梦。

V1 的解决:引入 Executor 抽象层。它在 EngineCore 和 Worker 之间。一切关于”部署拓扑”的差异都收进这一层里。EngineCore 只跟 Executor 对话,根本不关心背后有几个 Worker、住在哪台机器、用什么 IPC 协议。

# vllm/v1/executor/abstract.py(摘录语义)
class Executor(ExecutorBase):
    @staticmethod
    def get_class(vllm_config):
        backend = vllm_config.parallel_config.distributed_executor_backend
        if backend == "ray":
            return RayDistributedExecutor
        if backend == "mp":
            return MultiprocExecutor
        if backend == "uni":
            return UniProcExecutor
        ...

    def initialize_from_config(self, kv_cache_configs):
        self.collective_rpc("initialize_from_config",
                            args=(kv_cache_configs,))
        self.collective_rpc("compile_or_warm_up_model")

    def execute_model(self, scheduler_output) -> ModelRunnerOutput:
        """EngineCore 每步只调用这一个方法。"""
        results = self.collective_rpc("execute_model", args=(scheduler_output,))
        return results[0]  # TP 下所有 rank 结果对齐,rank 0 即可

源码里的 Executor 并不是一个纯 Python ABC,它继承 ExecutorBase,把 V1 需要的少量方法集中到 vllm/v1/executor/abstract.pyget_class() 负责按 distributed_executor_backend 选择 ray / mp / uni / external_launcherinitialize_from_config() 先广播 KV cache 初始化,再广播 compile_or_warm_up_model()execute_model() 默认广播 execute_model 并取 output[0]。这些调用点在本地源码的 26-88 行可以直接对应。

一个入口,三种实现,覆盖主要部署拓扑——这是抽象设计的关键。EngineCore 的 step() 里只写:

executor_output = self.executor.execute_model(scheduler_output)

不管底层是一个函数调用、一次共享内存广播、还是横跨 10 台机器的 Ray 集群调度,这一行都不会变。

graph TB
    EC["EngineCore"] --> |execute_model| EXE["Executor 接口"]

    EXE --> |"单卡/开发"| UNI["UniProcExecutor<br/>直接函数调用"]
    EXE --> |"多卡单机<br/>(生产主力)"| MP["MultiprocExecutor<br/>共享内存 MessageQueue"]
    EXE --> |"多卡多机 / PP"| RAY["RayDistributedExecutor<br/>Ray Compiled DAG"]

    UNI --> W1["Worker 0"]
    MP --> W2a["Worker 0 (GPU 0)"]
    MP --> W2b["Worker 1 (GPU 1)"]
    MP --> W2c["Worker 2 (GPU 2)"]
    MP --> W2d["Worker 3 (GPU 3)"]
    RAY --> W3a["Worker 0 (Node A)"]
    RAY --> W3b["Worker 1 (Node B)"]
    RAY --> W3c["..."]

    style EC fill:#ec4899,color:#fff,stroke:none
    style EXE fill:#f59e0b,color:#fff,stroke:none
    style UNI fill:#10b981,color:#fff,stroke:none
    style MP fill:#10b981,color:#fff,stroke:none
    style RAY fill:#10b981,color:#fff,stroke:none

6.1.1 collective_rpc 的语义细节

看名字就知道这是”集体 RPC”。几个语义约束:

  • 同一方法、相同参数——所有 Worker 运行同一个方法,不允许让 rank 0 调 A、rank 1 调 B。TP 场景下所有 rank 同步执行同一层的计算,这个语义天然契合。
  • 返回按 rank 顺序的 list——rank 0 的结果在 list[0],rank 1 在 list[1]。这让调用方可以明确引用某个 rank 的结果。普通 Executor.execute_model()output[0]MultiprocExecutor.execute_model() 则进一步把 rank0_reply_only=True 传给 collective_rpc(),只等 driver rank 的 ModelRunnerOutput
  • 同步语义——collective_rpc 返回前目标 Worker 必须完成。异步需求不放在普通 mp 路径里,而是在 Ray + pipeline parallel 时通过 FutureWrapper 暴露给调度器。
  • 超时可选——V1 mp 的 execute_model() 使用 EXECUTE_MODEL_TIMEOUT_S = 40;健康检查用 10 秒超时;其他初始化 RPC 多数不传 timeout。不要把这些值理解成性能指标,它们是卡死保护。
  • 方法可以是字符串或 callable——mp 路径里字符串方法直接广播;callable 先用 cloudpickle.dumps(..., protocol=pickle.HIGHEST_PROTOCOL) 序列化,再由 Worker 反序列化后绑定到自身执行。

这一套语义让调用方几乎感受不到 Executor 的存在——它就是个”比函数调用多跑几张 GPU”的函数。

6.2 UniProcExecutor:最简单的情况

单卡推理时只有一个 Worker,住在 EngineCore 主进程内。没有子进程、没有共享内存队列、没有跨进程序列化。V1 这里没有另写一份完整实现,而是复用 V0 的 UniProcExecutor

# vllm/v1/executor/abstract.py
from vllm.executor.uniproc_executor import UniProcExecutor as UniProcExecutorV0

class UniProcExecutor(UniProcExecutorV0, Executor):
    pass

把 V0 的本地方法调用实现接到 V1 Executor 契约上,含义很直接:collective_rpc() 在单 Worker 上退化成一次本地 run_method(),返回值仍包装成单元素 list,调用侧不用知道底层是不是进程内调用。这意味着:

  • 写单元测试时,UniProcExecutor 是最容易 mock / 断言的形态;
  • 离线批处理和本地单卡调试常常落到这条路径;
  • 调试器不用 attach 到子进程,异常栈也更接近普通 Python 调用。

“就是一个函数调用”的优雅是 V1 抽象层设计的一个侧证据——好的抽象不是往上加,而是让最简单的情况也能零成本地走这条抽象。

6.3 MultiprocExecutor:生产主力

MultiprocExecutor 对应的是 V1 的单机多卡路径。源码里第一条硬约束就写得很清楚:world_size 必须等于 tensor_parallel_size,否则报错说明 “Pipeline parallelism is not yet implemented in v1”。也就是说,V1 mp 路径负责的是单机 TP,不是多机,也不是 PP。

初始化时它做四件事:

步骤源码位置作用
设置通用多进程环境multiproc_executor.py:61-62复用 V0/V1 共同的 worker 环境设置
生成本机分布式初始化地址multiproc_executor.py:64-68mp 只支持单机,所以用 127.0.0.1 + 空闲端口
创建广播队列multiproc_executor.py:70-73MessageQueue(world_size, world_size) 给 SchedulerOutput/RPC 命令广播用
启动 WorkerProcmultiproc_executor.py:79-99每个 rank 一个进程,等 READY,再启动 worker monitor

Worker 侧也不是神秘的”另一个服务”:WorkerProc.__init__() 收到共享内存 handle 后,创建输入 MessageQueue、创建自己的响应 MessageQueue(1, 1),然后依次调用 init_device()load_model()。READY 信号发回主进程之后,才进入 worker_busy_loop() 接收后续 RPC。

6.3.1 为什么不用 multiprocessing.Queue

Python 标准的 multiprocessing.Queue 内部结构可以概括成:

Put side (主进程):
  object → pickle.dumps → bytes → write() 到 pipe

Get side (子进程):
  read() 从 pipe → bytes → pickle.loads → object

这类队列的问题不是”不能用”,而是它的广播语义不贴合 vLLM:一个 SchedulerOutput 要被多个 Worker 读取,普通 Queue 更像点对点消费;如果给每个 Worker 各放一份,主进程要重复写 N 次。V1 的 MessageQueue 明确为”一个 writer、多个 reader”优化,ShmRingBuffer 注释里写的就是 broadcast communication:一个 enqueue,多个 dequeue

更重要的是,MultiprocExecutor.collective_rpc() 的请求路径非常固定:

if isinstance(method, str):
    send_method = method
else:
    send_method = cloudpickle.dumps(method, protocol=pickle.HIGHEST_PROTOCOL)
self.rpc_broadcast_mq.enqueue((send_method, args, kwargs, rank0_reply_only))

workers = (self.workers[0],) if rank0_reply_only else self.workers
for w in workers:
    status, result = w.worker_response_mq.dequeue(...)

这段源码说明三件事:

  • 主进程只向广播队列写一次 (method, args, kwargs, rank0_reply_only)
  • execute_model() 设置 rank0_reply_only=True,所以响应只读 driver rank 的输出;
  • 若方法不是字符串,会用 cloudpickle 序列化 callable;但普通热路径是字符串方法名。

6.3.2 MessageQueue 的真实协议

# vllm/distributed/device_communicators/shm_broadcast.py(摘录语义)
class MessageQueue:
    def __init__(self, n_reader, n_local_reader, max_chunk_bytes=10 * MB, ...):
        if n_local_reader > 0:
            # local readers:
            # 1. shared-memory ring buffer for small data
            # 2. publish-subscribe socket for large data
            self.buffer = ShmRingBuffer(n_local_reader,
                                        max_chunk_bytes,
                                        max_chunks)
            self.local_socket = context.socket(XPUB)
            ...

    def enqueue(self, obj):
        serialized_obj = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
        if len(serialized_obj) < self.buffer.max_chunk_bytes:
            with self.acquire_write(timeout) as buf:
                buf[0] = 0
                buf[1:len(serialized_obj) + 1] = serialized_obj
        else:
            with self.acquire_write(timeout) as buf:
                buf[0] = 1
            self.local_socket.send(serialized_obj)

    def dequeue(self):
        with self.acquire_read(timeout, cancel) as buf:
            overflow = buf[0] == 1
            if not overflow:
                obj = pickle.loads(buf[1:])
        if overflow:
            obj = MessageQueue.recv(self.local_socket, timeout)
        return obj

这里有几个容易被误读的点,必须按源码来:

  • 它不是 msgpack:当前实现直接 pickle.dumps() / pickle.loads(),源码在 shm_broadcast.py:458-485。如果讲成 msgspec/msgpack,就是把别的路径误搬过来了。
  • 它不是 Python 对象零拷贝:Python 对象仍要序列化成 bytes,读侧仍要反序列化。共享内存优化的是广播介质:本地小消息放进共享内存 ring buffer,多个 reader 打开同一段内存读取;不是把对象指针跨进程传过去。
  • 大消息会走 socket:本地 reader 的注释写得很明确,小数据用 shared memory ring buffer,大数据用 publish-subscribe socket;远端 reader 也走 TCP pub/sub。这也是 MessageQueue 可以同时服务本地和远端读者的原因。
  • ring buffer 的同步靠元数据字节ShmRingBuffer 把每个 chunk 的状态拆成 written_flag 和每个 reader 的 flag。writer 写完后把 reader flags 置 0、再设置 written flag;reader 读完后标记自己的 flag。这个状态机保证一个 chunk 在所有 reader 读完前不会被覆盖。

这套设计的收益不需要用没有来源的”快几倍”数字来支撑。真正的工程价值是语义对齐:vLLM 的主进程经常需要”写一次,让所有 rank 看见同一条命令”;MessageQueue 直接把这种广播模式做成一等公民。

sequenceDiagram
    participant EC as EngineCore / Executor
    participant SHM as 共享内存段
    participant W0 as Worker 0 (GPU 0)
    participant W1 as Worker 1 (GPU 1)
    participant W2 as Worker 2 (GPU 2)
    participant W3 as Worker 3 (GPU 3)

    EC->>SHM: pickle 序列化 RPC tuple → 写入 ring buffer

    par 所有 worker 同时读
        SHM->>W0: 读取共享段 → pickle.loads
        SHM->>W1: 同上
        SHM->>W2: 同上
        SHM->>W3: 同上
    end

    par 4 卡同时前向
        W0->>W0: GPU 0 执行 TP-shard
        W1->>W1: GPU 1 执行 TP-shard
        W2->>W2: GPU 2 执行 TP-shard
        W3->>W3: GPU 3 执行 TP-shard
    end

    Note over W0,W3: all_reduce 聚合 logits
    W0->>SHM: 写入 driver rank 的 ModelRunnerOutput
    SHM->>EC: 读出

6.3.3 进程模型与 CUDA 初始化陷阱

一个容易踩坑的点:CUDA 上下文不能 fork。如果主进程已经调用过任何 CUDA API(比如 torch.cuda.is_available()),再 fork 出来的子进程会继承一个”坏掉”的 CUDA 上下文,后续任何 CUDA 调用都会挂。

但本地源码并不是简单地说”永远用 spawn”。envs.pyVLLM_WORKER_MULTIPROC_METHOD 默认值是 "fork"get_mp_context() 会先调用 _maybe_force_spawn(),如果检测到 CUDA 已初始化,或者当前在 Ray actor 内,就把环境变量覆盖成 "spawn",再返回对应的 multiprocessing context。换句话说:

  • 默认配置偏向 fork,因为启动成本低;
  • 如果当前进程状态会让 fork 变危险,vLLM 才强制切到 spawn
  • 用户也可以通过 VLLM_WORKER_MULTIPROC_METHOD=spawn 主动选择 spawn。

另一个陷阱是子进程的 GPU 绑定。本章不能写成”先设置 CUDA_VISIBLE_DEVICES=<rank> 再 import torch”,因为 V1 worker 源码里的关键动作是 Worker.init_device():设置 TORCH_NCCL_AVOID_RECORD_STREAMS=1,移除 Ray 可能留下的 NCCL_ASYNC_ERROR_HANDLING,构造 torch.device(f"cuda:{local_rank}"),然后 torch.cuda.set_device(self.device)。随后才初始化分布式环境、设置随机种子、构造 GPUModelRunner

这段流程和上一章模型加载也连在一起:WorkerProc.__init__()wrapper.init_worker(...),再建输入/输出 MessageQueue,接着 init_device()load_model()。也就是说,进程创建只是外壳,真正的 CUDA 归属在 Worker 初始化阶段落定。

6.4 RayExecutor:多机与流水线并行

当模型大到单机放不下,或需要 pipeline parallel 跨多组 worker 时,RayDistributedExecutor 登场。V1 这里没有重写完整 Ray executor:vllm/v1/executor/ray_distributed_executor.py 只有一个薄包装,继承 V0 的 RayDistributedExecutor,主要覆盖 max_concurrent_batchesexecute_model()

6.4.1 为什么必须用 Ray

多机场景下跨节点 IPC 面临的问题:

  • 共享内存不可用(不同节点的物理内存不互通)
  • TCP socket 可用但延迟高、带宽低
  • 节点调度(把 Worker 0 放哪台机器)需要资源管理

Ray 恰好提供了这些:分布式 Actor 模型(Worker = Actor)、节点调度(Placement Group)、高效的对象存储(Plasma,底层是内存映射)。

# vllm/v1/executor/ray_distributed_executor.py(概念性)
class RayDistributedExecutor(Executor):
    def __init__(self, vllm_config):
        # 1. 申请 placement group(告诉 Ray "我需要 16 张 GPU,TP=8 × 2 节点")
        self.pg = placement_group(bundles=[{"GPU": 1, "CPU": 2}] * 16)
        ray.get(self.pg.ready())

        # 2. 为每个 Worker 创建 Ray Actor
        self.workers = [
            ray.remote(Worker).options(
                num_gpus=1,
                scheduling_strategy=PlacementGroupSchedulingStrategy(
                    placement_group=self.pg,
                    placement_group_bundle_index=rank,
                ),
            ).remote(vllm_config, rank=rank, ...)
            for rank in range(16)
        ]

        # 3. 等所有 Worker 初始化完
        ray.get([w.init.remote() for w in self.workers])

    def collective_rpc(self, method, args=(), kwargs=None):
        futures = [getattr(w, method).remote(*args, **(kwargs or {})) for w in self.workers]
        return ray.get(futures)

6.4.2 Ray Compiled DAG:流水线并行的加速

V1 使用 Ray 的 Compiled DAG:第一次执行时如果 self.forward_dag is None,调用 _compiled_ray_dag(enable_asyncio=False) 构建固定执行图;之后每步直接 self.forward_dag.execute(scheduler_output)。本章不写固定微秒数字,因为这些开销取决于 Ray 版本、节点间网络、PP stage 数和对象大小;源码能确认的是:V1 把热路径收敛到 compiled DAG 的 execute(),而不是每步手写一组普通 Ray actor RPC。

# vllm/v1/executor/ray_distributed_executor.py
if self.forward_dag is None:
    self.forward_dag = self._compiled_ray_dag(enable_asyncio=False)

refs = self.forward_dag.execute(scheduler_output)
if self.max_concurrent_batches == 1:
    return refs[0].get()
return FutureWrapper(refs[0])

这个实现里有一个和调度器契约相关的细节:max_concurrent_batches 返回 pipeline_parallel_size。没有 PP 时,execute_model() 阻塞等待 refs[0].get();有 PP 时,它立即返回 FutureWrapper(refs[0]),让调度器可以把下一个 batch 放进流水线。也就是说,Compiled DAG 解决的是 Ray 执行拓扑的热路径,FutureWrapper 解决的是 V1 调度循环如何表达”这个 batch 还在 pipeline 里跑”。

6.4.3 什么时候该用 Ray

不该用

  • 单机多卡(优先用 MultiprocExecutor,拓扑更简单,直接使用本地进程和共享内存)
  • 开发调试(用 UniProcExecutor,最好调试)

该用

  • 模型大到单机放不下(TP × 节点数 > 机器 GPU 数)
  • 多副本 + 动态扩缩容(Ray 的 placement group 能随时加/减 Worker)
  • pipeline parallel(必须跨节点分层)

如果只是单机 TP,V1 mp 路径更直接;如果需要跨节点资源管理或 PP 并发,Ray 才是合理选项。这里的判断标准不是”哪个名字更高级”,而是部署拓扑是否真的超出了单机共享内存能表达的范围。

6.5 Worker 的”有状态”大变革

V0 和 V1 Worker 最大的架构差异:是否缓存请求状态

6.5.1 V0:无状态 Worker + 全量通信

EngineCore → Worker(每步):
  - 本批所有请求的 block_tables
  - 本批所有请求的 input_ids
  - 本批所有请求的 positions
  - 本批所有请求的 sampling params
  - 本批所有请求的 seq_lens / context_lens
  - ...

Worker → EngineCore(每步):
  - 本批所有请求的采样结果
  - (可能的)hidden states / logprobs

“本批”可能有 256 个请求,其中 95% 是 decode 请求(只多 1 个 token)。它们的 block_tables 和上一步几乎完全一样——但每步都要从头广播一遍。假设 decode 请求平均占 20 个 block(一个请求生成了 320 token),256 个 decode × 20 = 5120 个 int32 block id = 20 KB。每步广播一次。

6.5.2 V1:有状态 Worker + 差量同步

V1 的 Worker 在本地维护一个 InputBatch 对象(vllm/v1/worker/gpu_input_batch.py),里面是持久化的 GPU 张量

class InputBatch:
    # 下面这些都是预分配好的 GPU 张量,每步只更新变化部分
    input_ids: torch.Tensor           # [max_num_batched_tokens]
    positions: torch.Tensor           # [max_num_batched_tokens]
    block_tables: torch.Tensor        # [max_num_seqs, max_blocks_per_seq]
    slot_mapping: torch.Tensor        # [max_num_batched_tokens]
    sampling_params_per_req: dict     # CPU 侧字典
    num_computed_tokens_cpu: np.ndarray  # 每个 slot 已经计算到哪里
    req_id_to_index: dict             # 请求 id → slot 号
    # ...

EngineCore 每步只发”差量”:

@dataclass
class SchedulerOutput:
    scheduled_new_reqs: list[NewRequestData]    # 本步新进 RUNNING 的
    scheduled_cached_reqs: list[CachedRequestData]  # 本步继续 RUNNING 的(只发 block 的追加)
    finished_req_ids: set[str]                   # 要从本地状态清除的
    preempted_req_ids: set[str]                  # 要标记回 WAITING 的
    num_scheduled_tokens: dict[str, int]         # 每个请求本步计算多少 token

decode 请求的”继续运行”通常只需要一个 int32(新增 token 数),block_tables 完全不变(除非在本步末尾跨块了)。通信量从 20 KB/step 降到 几百字节/step——数量级的改进。

6.5.3 一致性代价与保证

有状态的代价是 Worker 和 EngineCore 必须严格同步。如果一个差量消息丢了,后续所有差量都会错。

V1 的保证机制:

  • 同机 IPC 的顺序约束:共享内存 ring buffer 的 written/reader flags 保证 chunk 在所有 reader 读完前不被覆盖;RPC 命令按队列顺序被 worker busy loop 消费
  • 一个引擎内单向命令流:EngineCore 是广播队列的 writer,Worker 是 reader;响应队列则由对应 Worker 写回、Executor 读取,竞争面很小
  • 初始化阶段先建立完整运行状态:Executor 先集体调用 initialize_from_config(kv_cache_configs),再调用 compile_or_warm_up_model();进入 step 循环后,Worker 才持续接收 SchedulerOutput 差量
  • 崩溃即 fail-fast:Ray 场景下如果 Worker 崩溃,整个 EngineCore 退出,由 K8s 重建——不尝试”复活”一个差量状态损坏的 Worker

这和”TCP 可靠字节流,不保证消息边界”的精神是一致的:尽量假设基础层可靠,只在真出问题时走最保守的恢复路径。

6.6 Worker 初始化:六阶段流程

一个 Worker 从创建到开始接 step 请求要走完 6 步:

graph TB
    subgraph "阶段 1: 设备与通信"
        A1["torch.cuda.set_device(rank)"]
        A2["init_distributed_env<br/>(NCCL / Gloo process group)"]
        A1 --> A2
    end

    subgraph "阶段 2: 模型加载"
        B1["load_model<br/>(权重从磁盘 / HF / safetensors)"]
        B2["量化处理 (FP8/GPTQ/AWQ)"]
        B3["LoRA baseline 如启用"]
        B1 --> B2 --> B3
    end

    subgraph "阶段 3: 显存探测"
        C1["determine_available_memory<br/>profile_run + peak_memory"]
    end

    subgraph "阶段 4: KV Cache 分配"
        D1["initialize_from_config<br/>torch.empty 一次性出整个 KV 池"]
    end

    subgraph "阶段 5: 预热"
        E1["capture_cuda_graphs"]
        E2["warmup_batches (各种 batch size)"]
        E1 --> E2
    end

    subgraph "阶段 6: 就绪"
        F1["run_busy_loop / wait_for_tasks"]
    end

    A2 --> B1
    B3 --> C1
    C1 --> D1
    D1 --> E1
    E2 --> F1

    style A1 fill:#3b82f6,color:#fff,stroke:none
    style B1 fill:#8b5cf6,color:#fff,stroke:none
    style C1 fill:#ec4899,color:#fff,stroke:none
    style D1 fill:#f59e0b,color:#fff,stroke:none
    style E1 fill:#10b981,color:#fff,stroke:none
    style F1 fill:#6366f1,color:#fff,stroke:none

阶段 1:设备与通信gpu_worker.py:112-147 负责绑定 CUDA 设备、检查 dtype 支持、清理缓存、记录初始空闲显存、初始化分布式环境、设置随机种子、构造 GPUModelRunnerTORCH_NCCL_AVOID_RECORD_STREAMS=1 的设置也在这里,目的是避免反复 all_reduce 时输入 tensor 生命周期拖长导致显存增长。

阶段 2:模型加载load_model() 如果开启 sleep mode,会用 CuMemAllocator"weights" 内存池包住 model_runner.load_model();否则就是普通上下文。这个设计解释了后面的 sleep level:只有 allocator 知道哪些分配属于 weights,后续才能按 tag offload。

阶段 3:显存探测determine_available_memory()empty_cache()、重置峰值统计、调用 model_runner.profile_run(),再用 PyTorch 峰值分配加上可能的 non-torch allocations 估算 KV cache 可用显存。这里有一个重要断言:profile 前的空闲显存必须大于 profile 后的空闲显存,否则认为 profiling 期间 GPU 被其他进程干扰或初始化没有清理干净。

阶段 4:KV Cache 分配initialize_from_config() 如果开启 sleep mode,会用 "kv_cache" 内存池包住 model_runner.initialize_kv_cache(kv_cache_config);否则直接分配。KV cache 的形状来自前面 get_kv_cache_spec() 和内存规划结果,不是 Worker 自己拍脑袋决定。

阶段 5:编译、预热与 CUDA Graph 捕获compile_or_warm_up_model() 先挑出需要 compile 但不在 cudagraph capture sizes 里的尺寸做 _dummy_run(size);如果不是 eager 模式,再调用 capture_model();最后在 PP last rank 上跑 sampler dummy run,预分配 logits 和采样相关 buffer,减少后续内存碎片。

阶段 6:就绪。对 mp worker 来说,READY 信号是在 WorkerProc.__init__() 完成 init_device()load_model() 后发出的;KV cache 初始化和 compile/warmup 则通过 Executor.initialize_from_config() 的 collective RPC 继续推进。Worker 进入 worker_busy_loop() 后,每次从广播队列取 (method, args, kwargs, rank0_only),执行对应方法,再按需把结果写回响应队列。

这六阶段有两个工程含义。第一,“加载权重”和”KV cache/graph 初始化”被拆开了:WorkerProc 启动阶段先保证模型可用,Executor 后续再集体下发 cache 与 warmup。第二,初始化耗时不能用一个固定秒数描述;它随模型大小、权重格式、磁盘/网络、量化方式、compile 配置、cudagraph capture sizes 变化。书里更应该讲清楚源码顺序和状态边界,而不是给一个看似精确但不可迁移的启动时间。

6.7 CUDA Graph 与 torch.compile:预热的秘密

6.7.1 为什么需要 CUDA Graph

每次前向传播,PyTorch 要做的事:

  1. Python 代码执行,构造一系列 nn.Module 调用
  2. 每个 module 触发一系列 ATen 算子(matmul, softmax, add …)
  3. 每个算子启动一个 CUDA kernel
  4. 每次 kernel 启动都要走 CUDA Runtime API

对小 batch、短 decode step 来说,Python 调度和 kernel launch 的固定成本会变得显眼。CUDA Graph 的思路是把这些 kernel 调用序列”录制”下来(capture),后续执行时一次性”回放”(replay)。本章不把 launch cost 写成固定微秒值,因为它依赖 GPU、CUDA、PyTorch、算子融合和 batch 形状;源码层面能确认的是:V1 在 GPUModelRunner 中维护 use_cuda_graphcudagraph_batch_sizes,运行时按 token 数决定是否走 graph。

6.7.2 V1 的 CUDA Graph 捕获策略

问题:CUDA Graph 要求所有张量形状固定。但 vLLM 的 batch 大小和序列长度都是动态的。怎么办?

V1 的方案是捕获多个固定形状的 graphGPUModelRunner.__init__() 里把配置中的 cudagraph_capture_sizes 反转成升序的 self.cudagraph_batch_sizes;执行时如果 num_scheduled_tokens <= self.cudagraph_batch_sizes[-1],就调用 pad_for_cudagraph() 把 token 数 pad 到可捕获形状,否则走 eager 路径。

cudagraph_capture_sizes: config 中给定
self.cudagraph_batch_sizes = reversed(cudagraph_capture_sizes)
if num_scheduled_tokens <= max(self.cudagraph_batch_sizes):
    num_input_tokens = pad_for_cudagraph(num_scheduled_tokens)
else:
    eager mode

capture_model() 里还有一个细节:它按 reversed(self.cudagraph_batch_sizes) 捕获,也就是先捕获大形状,再捕获小形状,源码注释说这样小形状可以复用大形状分配出来的 memory pool。捕获结束时会记录 elapsed time 和占用显存,日志注释写着通常需要 5 到 20 秒;这个数字来自源码注释,仍应理解为经验范围,而不是跨模型的承诺。

6.7.3 torch.compile 的角色

PyTorch 2.0+ 的 torch.compile 能把 Python nn.Module 编译成 graph 形式。V1 的 model runner 默认启用 torch.compile——它和 CUDA Graph 是互补而非替代的:

  • torch.compile:Python 层面的 graph 捕获和优化(算子融合、内存规划)
  • CUDA Graph:CUDA 层面的 kernel 调用序列重放

两者叠加时,torch.compile 负责减少或重排算子,CUDA Graph 负责让固定形状的调用序列变成可重放路径。实际收益必须用目标模型和目标 batch 形态测出来;源码能给我们的判断是:V1 把 compile sizes、cudagraph capture sizes、sampler buffer 预热放在同一个 compile_or_warm_up_model() 阶段处理,避免运行时第一次请求承担这些成本。

6.8 Sleep Level:多租户场景的显存腾让

V1 引入了 Worker 的睡眠模式,允许空闲实例释放 GPU 显存:

# vllm/v1/worker/gpu_worker.py
class Worker:
    def sleep(self, level: int):
        free_before = torch.cuda.mem_get_info()[0]
        if level == 2:
            self._sleep_saved_buffers = {
                name: buffer.cpu().clone()
                for name, buffer in self.model_runner.model.named_buffers()
            }
        allocator = CuMemAllocator.get_instance()
        allocator.sleep(offload_tags=("weights",) if level == 1 else tuple())
        ...

    def wake_up(self, tags=None):
        allocator = CuMemAllocator.get_instance()
        allocator.wake_up(tags)
        if self._sleep_saved_buffers:
            # restore buffers saved before level 2 sleep
            ...

这段实现比”level 1 权重、level 2 权重+KV 全部搬到 CPU”更细。它依赖 CuMemAllocator 的 tag:

模式源码动作正确理解
sleep(1)allocator.sleep(offload_tags=("weights",))只对 allocator 中标记为 weights 的分配做 offload;其他 allocator 管理的块按 allocator 语义处理,不应写成”KV 一定保留”
sleep(2)先把 model.named_buffers() clone 到 CPU,再 allocator.sleep(offload_tags=tuple())level 2 特别保护 model buffers;释放范围由 allocator 当前管理的内存池决定
wake_up(tags)allocator.wake_up(tags),再恢复 _sleep_saved_buffers可以按 tag 唤醒;如果之前保存过 buffers,会复制回模型 buffer

前面 load_model()initialize_from_config() 的内存池 tag 正好解释了这里的行为:权重加载时使用 "weights" tag,KV cache 初始化时使用 "kv_cache" tag。sleep level 并不是 Worker 自己遍历所有 tensor 粗暴搬家,而是把释放/恢复交给 allocator 的 tagged memory pool。

stateDiagram-v2
    [*] --> ACTIVE
    ACTIVE --> LEVEL1: sleep(1)
    LEVEL1 --> ACTIVE: wake_up()<br/>新请求到达

    ACTIVE --> LEVEL2: sleep(2)
    LEVEL2 --> ACTIVE: wake_up()
    LEVEL1 --> LEVEL2: sleep(2)<br/>进一步腾让

    note right of LEVEL1
        allocator 按 weights tag offload
        常用于释放权重占用
    end note

    note right of LEVEL2
        保存 model buffers
        allocator 释放更大范围
    end note

典型使用场景:

  • 多模型实例混部——8 张卡跑 10 个不同模型实例,靠 sleep-wake 切换活跃的
  • 冷热分层——热模型常驻 GPU,冷模型进 level 2,有请求时现唤醒
  • 开发环境复用——多个开发者共享一组 GPU,自己的实例用完自动 sleep

这是 V1 相对 V0 更细的资源复用能力。真正上线时需要结合模型大小、KV cache 配置、allocator tag 覆盖范围和冷启动 SLA 做测试,不能只根据 level 名字推断唤醒时间。

6.9 故障处理:fail-fast 的工程哲学

6.9.1 MultiprocExecutor 的故障检测

主进程通过 multiprocessing.Process spawn Worker 子进程。当 Worker 崩溃(OOM、CUDA error、Python 异常)时,主进程如何察觉?真实实现(vllm/v1/executor/multiproc_executor.py:107)用的不是定时轮询、而是事件驱动

# multiproc_executor.py:107
def start_worker_monitor(self):
    workers = self.workers
    self_ref = weakref.ref(self)

    def monitor_workers():
        sentinels = [h.proc.sentinel for h in workers]
        died = multiprocessing.connection.wait(sentinels)   # ← 阻塞等待
        _self = self_ref()
        if not _self or getattr(_self, 'shutting_down', False):
            return
        _self.is_failed = True
        proc_name = next(h.proc.name for h in workers
                         if h.proc.sentinel == died[0])
        logger.error(
            "Worker proc %s died unexpectedly, "
            "shutting down executor.", proc_name)
        _self.shutdown()
        callback = _self.failure_callback
        if callback is not None:
            _self.failure_callback = None
            callback()

    Thread(target=monitor_workers, daemon=True,
           name="MultiprocWorkerMonitor").start()

几个关键差别和常规”轮询 + is_alive()”模式形成对照:

1. multiprocessing.connection.wait(sentinels) 是阻塞式事件等待。每个 Process.sentinel 是一个 OS 级句柄;当对应进程终止,这个句柄会变成 ready,wait(sentinels) 在任意一个 ready 时返回。它的价值不是某个固定毫秒数字,而是避免了”定期 sleep 再扫一遍 is_alive()”的轮询结构,故障路径更及时也更少后台噪音。

2. weakref.ref(self) 避免监控线程强引用持有 Executor。如果监控线程直接持有 self,正常退出路径下 Executor 就 drop 不掉——daemon 线程会吊住整个进程。weakrefself 可以被 GC、监控线程在 self_ref() 返回 None 时自己静默退出。这是 Python daemon thread + 循环引用的标准规避。

3. shutting_down 标志区分”意外崩溃 vs 正常关停”。主线程主动 shutdown() 时会设这个 flag、之后再发 terminate 信号给 workers——workers 退出导致 wait() 返回时监控线程看到 shutting_down=True 就什么都不做。没这个 flag 的话,正常关停流程也会触发”Worker died unexpectedly”的 error log 和 callback,制造 false alarm。

4. failure_callback 是单次触发(line 129-130):_self.failure_callback = None; callback()——先置空再调用,保证一次 Worker 死亡事件只触发一次 callback。防 race condition:如果两个 worker 同时死(CUDA 级别的整 node 崩溃),本函数被并发触发两次、只有第一次走完整 callback 路径。

检测到 Worker 死亡 → 整个 EngineCore 退出 → 外部编排(K8s / systemd)重启服务。这是”fail-fast”哲学的典型体现——但具体实现里每行代码都在和”轮询延迟 / GC 环依赖 / 并发 race / false alarm”这些具体工程毒素作战。

6.9.2 为什么不尝试”复活”死 Worker

直觉上我们可能想:“Worker 死了重启一个不就行了?“但实际上:

  • 权重要重新加载:耗时随模型大小、权重格式、磁盘/网络和量化方式变化,这段时间 GPU 不可稳定服务
  • KV Cache 状态丢了:正在跑的请求全部作废,用户侧还得重试
  • TP 场景下必须同时重启所有 rank:任何一个 rank 权重不一致就 all_reduce 崩溃
  • LoRA / 量化状态也要重建:更长时间

与其让用户面对一个状态不清的半恢复实例,不如让整个实例立刻退出、由外部编排拉起新实例、健康检查期间把流量路由到其他副本。用户体验一致性更好,SRE 也更清楚状态。

6.9.3 Ray 场景的特殊处理

Ray Actor 有自己的生命周期和失败处理机制,但在 LLM 推理里,“某个 rank 单独恢复”并不等于服务恢复。TP/PP 组内的权重、KV cache、请求状态、LoRA 状态都要一致;任何一部分只恢复了一半,后续 collective 通信和调度状态都可能变成不可推理。V1 的 Ray executor 继承了 V0 的 actor/placement group 复杂逻辑,V1 自己只在 execute_model() 出口适配 compiled DAG 与 PP future;这也说明失败处理应该按整组 worker 的一致性来理解,而不是按单 actor 的重启能力来想象。

这种对”fail-fast 胜于半恢复”的坚持在 V1 其他很多地方也能看到。是一种选择,而不是懒惰——它让整个系统的状态空间变得可推理。

6.9.4 V1 三种 Executor 实现策略:从 V0 继承的不对称真相

把 V0 / V1 两套 executor 目录全部按行数实测——

ExecutorV1 实现路径V1 行V0 父类路径V0 行实际承担
UniProcExecutor(不存在 V1 版本)vllm/executor/uniproc_executor.py141V1 直接复用 V0 实现——vllm/v1/executor/abstract.pyExecutor 类把 V0 的 UniProcExecutor 直接拉进来当默认
MultiprocExecutorvllm/v1/executor/multiproc_executor.py480(V0 不复用)完全重写——480 行全新实现、不继承任何 V0 代码
RayDistributedExecutorvllm/v1/executor/ray_distributed_executor.py61vllm/executor/ray_distributed_executor.py700极薄 V1 包装——class RayDistributedExecutor(RayDistributedExecutorV0, Executor) 只覆盖 execute_model() 加 PipelineParallel 异步 future 支持、其他全继承

vllm/v1/executor/ 目录总共只有 653 行——其中 multiproc_executor.py 480 占 73%——印证 §1.6.1 实测的”executor/ 仅 653 行”——是 v1/ 22 子目录里最薄的一个

三条值得记住的工程含义——

  1. MultiprocExecutor 完全重写、Ray 只做薄包装——根因是两者改造代价不对称:MultiprocExecutor 的 V0 实现耦合在 worker 生命周期 + KV broadcast + 旧调度协议里、改不动只能重来;Ray 的核心逻辑(actor 创建、跨机 RPC、placement group)是稳定的、V1 只需要在 execute_model() 这个出口接上新的 SchedulerOutput → ModelRunnerOutput 类型契约
  2. V1 RayDistributedExecutor 61 行的 FutureWrapper(line 12-25)专门为 Pipeline Parallel 服务——max_concurrent_batches > 1 时返回 Future 让调度器立即调度下一个 batch、不阻塞等待——这是 §6.4.2 “Compiled DAG 承接 Ray 热路径”在出口侧的对应:底层执行图固定,上层调度器拿到 future 后可以继续推进 pipeline
  3. v1/worker/ 4851 行 vs v1/executor/ 653 行 = 7.4 倍——印证 §1.6.1 测得的”worker 23% 是最重子系统”——单卡内的 GPU 流水线编排(gpu_model_runner.py 1776 + tpu_model_runner.py 1419 + gpu_input_batch.py 677)远比跨进程编排重——这是 GPU 推理”重活在 worker、薄壳在 executor”的设计底色

gpu_model_runner.py 1776 行是 v1/worker/ 最重单文件——比本章 §6.6 讨论的”worker 6 阶段初始化”(gpu_worker.py 349)还重 5 倍——是下一章 ch07 ModelRunner 的核心读物。

6.10 本章小结

Worker 与 Executor 是 vLLM 从”调度决策”到”GPU 计算”的执行层:

  • Executor 抽象层封装部署拓扑,让 EngineCore 的 step() 代码永远只有一行调用;collective_rpc 是唯一核心原语
  • UniProcExecutor——单卡/开发用,函数调用即 RPC,测试友好
  • MultiprocExecutor——V1 单机 TP 路径;共享内存 MessageQueue 用 pickle 序列化、本地小消息走 shared-memory ring buffer,大消息或远端读者走 ZMQ pub/sub;进程启动方式由 VLLM_WORKER_MULTIPROC_METHOD_maybe_force_spawn() 决定
  • RayDistributedExecutor——多机或 pipeline parallel 路径;V1 是 V0 Ray executor 的薄包装,热路径通过 Compiled DAG execute(),PP 时返回 FutureWrapper
  • 有状态 Worker——本地维护 InputBatch 张量,每步同步 SchedulerOutput 差量;decode-heavy 场景下少发重复 block table 与 sampling 状态,一致性靠”可靠命令流 + fail-fast”保证
  • 六阶段初始化——设备、模型、探测、KV 分配、预热、就绪;源码顺序比固定启动耗时更重要
  • CUDA Graph + torch.compile——互补方案:Python 侧算子融合 + CUDA 侧调用序列重放;capture sizes 来自配置,运行时通过 pad_for_cudagraph() 对齐
  • Sleep Level——level 1 按 weights tag offload,level 2 先保存 model buffers 再释放 allocator 管理内存;具体释放量要看 allocator tag 覆盖
  • fail-fast 哲学——Worker 崩溃 → 整个 EngineCore 退出 → 外部编排重建;不尝试半恢复,因为权重重载太慢、状态太复杂

物理事实:v1/executor/ 仅 653 行(v1/ 22 子目录最薄);MultiprocExecutor 完全重写 480 行 vs RayDistributedExecutor 仅 61 行薄包装继承 V0 700 行——印证’重活在 worker、薄壳在 executor’设计底色;gpu_model_runner.py 1776 行是 v1/worker/ 最重单文件、是下一章 ModelRunner 核心读物。


源码导航

  • Executor 抽象:vllm/v1/executor/abstract.py
  • UniProcExecutor:vllm/executor/uniproc_executor.py
  • MultiprocExecutor:vllm/v1/executor/multiproc_executor.py
  • RayDistributedExecutor:vllm/v1/executor/ray_distributed_executor.py
  • 共享内存 MessageQueue:vllm/distributed/device_communicators/shm_broadcast.py
  • GPU Worker:vllm/v1/worker/gpu_worker.py
  • InputBatch(有状态 Worker 的核心):vllm/v1/worker/gpu_input_batch.py
  • CUDA Graph 捕获:vllm/v1/worker/gpu_model_runner.pycapture_model