vLLM 推理内核深度解析
第2章 EngineCore:引擎的心脏
第2章 EngineCore:引擎的心脏
"A conductor does not make a sound. He depends on his ability to make other people powerful." — Benjamin Zander
本章要点
- 理解 EngineCore 作为"指挥者"的职责边界:它协调所有人,却不替任何人干活
- 跟随
EngineCoreProc.run_busy_loop逐行走完主循环的 6 个阶段 - 掌握
add_request/abort_request的数据契约:跨进程传的是什么,为什么这么传 - 读懂 ZMQ 通信协议:输入 ROUTER/DEALER、输出 PUSH/PULL,以及同步客户端关闭用的 PAIR
- 看懂
EngineCoreClient的三种形态(Sync / Async / MultiprocClient)以及调用方如何无感切换 - 理解请求生命周期状态机的每一个迁移背后的触发条件
- 分析 V1 如何用"异步流水线 + 后台线程"把 CPU 调度与 GPU 计算叠到一起
- 掌握多数据并行(DP)下 EngineCore 的跨 rank 协调机制
- 学会读 EngineCore 的信号处理与优雅退出路径,避免生产环境僵尸进程
- 知道在哪些观测点挂钩子来排查"吞吐上不去"类问题
2.1 指挥者模式:不演奏,却不可或缺
在一个交响乐团中,指挥不演奏任何乐器。他不吹长笛,不拉小提琴,不敲定音鼓。但如果指挥离开,乐团会在几小节内陷入混乱——节奏失序,声部冲突,音乐瓦解。
EngineCore 就是 vLLM 的指挥。
它不做分词——那是 API Server 的工作。它不做 GPU 计算——那是 Worker 的工作。它甚至不做具体的调度决策——那是 Scheduler 的工作。但它协调所有这些组件的节奏,确保它们在正确的时刻做正确的事。
这种"不做具体事,只做协调"的设计,是典型的中介者模式(Mediator Pattern)的工程落地。放到 V1 重构的语境下看,它解决了 V0 时代一个非常痛苦的问题:Scheduler 和 Executor 之间的直接耦合让两者无法独立演化。把它们都挂到 EngineCore 下,用明确的 SchedulerOutput / ExecutorOutput 数据契约串起来,调度算法改了不影响执行路径,执行后端(GPU / TPU / CPU)换了不影响调度逻辑。
我们先从最宏观的视角看 EngineCore 在一次推理步骤中指挥的"声部":
sequenceDiagram
participant API as API Server
participant EC as EngineCore
participant SCH as Scheduler
participant KV as KV Cache Manager
participant EXE as Executor
participant W as Worker(s)
API->>EC: 新请求(Token IDs + 采样参数)
loop 每一步推理 step()
EC->>SCH: schedule()
SCH->>KV: 分配/释放块
SCH-->>EC: SchedulerOutput
EC->>EXE: execute_model(SchedulerOutput)
EXE->>W: 分发到 GPU(TP / DP / PP)
W-->>EXE: 采样后的新 Token
EXE-->>EC: ExecutorOutput
EC->>EC: update_from_output()
EC-->>API: EngineCoreOutputs(流式)
end
API->>EC: abort_request(req_id)
EC->>SCH: finish_requests({req_id}, ABORTED)
SCH->>KV: free blocks
注意这张图里 EngineCore 出现了 6 次,但没有任何一条箭头是"EngineCore 做 XXX"——它永远是"让别人做 XXX"的那一方。这就是指挥者模式的精髓:最重要的组件恰恰是最"无所事事"的那一个。
让我们打开 vllm/v1/engine/core.py,看看这位指挥是怎么指挥的。
2.1.1 EngineCore 初始化时真正握住了什么
EngineCore.__init__() 的顺序很能说明它的职责边界:
| 顺序 | 源码动作 | 含义 |
|---|---|---|
| 1 | self.model_executor = executor_class(vllm_config) |
EngineCore 不知道底层是 uni、mp 还是 ray,只拿 executor 接口 |
| 2 | _initialize_kv_caches(vllm_config) |
让 executor 汇报 KV spec 和可用显存,统一各 worker 的 KV cache config,并广播初始化 |
| 3 | StructuredOutputManager(vllm_config) |
结构化输出的 grammar 编译入口挂在 EngineCore 上,但实际请求状态仍进 scheduler |
| 4 | 构造 Scheduler(...) |
把 KV cache config、structured output manager、DP finished-set 开关交给 scheduler |
| 5 | MirroredProcessingCache(model_config) |
多模态预处理缓存和 client 侧镜像,EngineCore 收到 mm hash 后能取回对应输入 |
| 6 | batch_queue_size = model_executor.max_concurrent_batches |
只有 executor 声明可并发多个 batch 时,EngineCore 才启用 batch queue |
这张表也解释了为什么 EngineCore 不该直接写模型逻辑。它手里只有三个重对象:executor、scheduler、structured/multimodal 辅助管理器。模型怎么加载、KV cache 张量怎么分配、token 怎么采样,都不在 EngineCore 里实现;EngineCore 只负责把这些对象按生命周期接起来。
2.2 主循环:逐行拆解 run_busy_loop
EngineCore 有两个核心类:EngineCore(真正的调度/执行协调者)与 EngineCoreProc(多进程形态下的 ZMQ 包装层)。EngineCore.__init__() 会创建 executor、profile 并初始化 KV cache、构造 scheduler、建立多模态输入缓存;EngineCoreProc.__init__() 再额外启动输入/输出两个 IO 线程,把 ZMQ socket 和 Python queue.Queue 接起来。
先看真实的 EngineCoreProc.run_busy_loop:
class EngineCoreProc(EngineCore):
def run_busy_loop(self) -> None:
# Loop until process is sent a SIGINT or SIGTERM
while True:
# 1) Poll the input queue until there is work to do.
self._process_input_queue()
# 2) Step the engine core and return the outputs.
self._process_engine_step()
它比想象中的"大循环"更薄,因为 socket IO 被拆到后台线程里了:process_input_socket() 负责从 ZMQ DEALER 收消息、反序列化后放入 input_queue;process_output_socket() 负责从 output_queue 取 EngineCoreOutputs、用 MsgpackEncoder 编码后通过 PUSH socket 发回前端。主循环只碰 Python queue 和核心状态,这样可以把 ZMQ IO、序列化/反序列化和 GPU forward 尽量重叠。
(1) 初始化在 busy loop 之前完成
本章不能写成 run_busy_loop() 里有一个 _ensure_workers_ready() 屏障;本地源码没有这个函数。真正的就绪顺序发生在构造阶段:
EngineCore.__init__()创建model_executor,然后_initialize_kv_caches()调get_kv_cache_specs()、determine_available_memory()、get_kv_cache_config()、unify_kv_cache_configs(),最后model_executor.initialize_from_config(kv_cache_configs)。EngineCoreProc.__init__()创建input_queue/output_queue,启动process_input_socket()和process_output_socket()两个 daemon 线程。process_input_socket()用 DEALER identity 连接到前端 ROUTER 后,先socket.send(b"READY")。MPClient._wait_for_engine_startup()在前端用 poller 等每个 engine id 发回READY,全部 ready 后才允许客户端继续。
所以就绪信号不是 Worker 逐个 set multiprocessing.Event,而是 EngineCore 子进程完成自身初始化并连上 input socket 后,通过 ZMQ 给前端发 READY。Worker 是否完成权重加载、KV cache 初始化、warmup,已经被 EngineCore.__init__() 和 executor 初始化流程包在这个 ready 之前。
(2) 批量拉取输入:减少系统调用
_process_input_queue 是一个贪心消费,但它消费的是 Python input_queue,不是直接读 ZMQ socket:
def _process_input_queue(self) -> None:
while not self.engines_running and not self.scheduler.has_requests():
req = self.input_queue.get() # 阻塞等第一条工作
self._handle_client_request(*req)
while not self.input_queue.empty(): # 非阻塞清空积压
req = self.input_queue.get_nowait()
self._handle_client_request(*req)
为什么不是每次主循环只处理一条消息?因为在高 QPS 场景下,前端可能短时间内塞进来几十条 add_request。如果每次主循环只处理一条,那处理完一条就要跑一整轮调度+GPU 计算,剩下的消息要积压若干个 step 才能入队——这会显著拖慢请求的首 token 延迟。批量拉取把"接收"和"调度"解耦,一次把所有等待中的消息吸进来,然后在后续的 step 中一起调度。
(3) 无请求时的等待策略
self.scheduler.has_requests() 返回 False 意味着目前没有任何 WAITING 或 RUNNING 的请求。普通 EngineCoreProc 这时不会跑 step() 空转,而是在 _process_input_queue() 的第一段 self.input_queue.get() 上阻塞等待。真正的 ZMQ poller 在 client 侧和输出线程里使用:启动时 MPClient._wait_for_engine_startup() 用 poller 等 READY;SyncMPClient 的输出线程用 poller 同时监听 output socket 和 shutdown PAIR socket。
这个拆分很重要:EngineCore 主循环不需要懂 ZMQ poller,它只要处理 queue;IO 线程和 client 负责 socket 细节。这样主循环的状态机更小,也避免了 socket 读写逻辑和 scheduler 状态混在一起。
(3.5) _handle_client_request:四种输入类型
主循环从 input_queue 里拿到的元素形如 (EngineCoreRequestType, payload)。_handle_client_request() 只分四类:
| request type | 处理动作 |
|---|---|
ADD |
调 add_request(request),把 EngineCoreRequest 转成内部 Request,必要时启动 grammar 编译,然后交给 scheduler |
ABORT |
调 abort_requests(request_ids),最终进入 scheduler 的 finish_requests(..., FINISHED_ABORTED) |
UTILITY |
解出 (call_id, method_name, args),反射调用 EngineCore 上的方法,把结果或错误包成 UtilityOutput 发回 |
EXECUTOR_FAILED |
直接抛 RuntimeError("Executor failed."),触发 EngineCore dead 信号 |
UTILITY 是很多"控制面"能力的统一通道:profile、reset prefix cache、sleep/wake、add/remove/list/pin LoRA、save sharded state、collective_rpc 都通过它从 client 侧调用到 EngineCore 子进程。这里还有一个小细节:如果 utility 参数应该是某个 msgspec.Struct,_convert_msgspec_args() 会用方法签名里的类型注解尝试 msgspec.convert(),避免跨进程序列化后类型变成普通 dict/list。
(4) step():引擎的一拍
def step(self) -> list[EngineCoreOutput]:
# 4.1 调度:决定本步执行哪些请求、每个请求执行多少 Token
scheduler_output = self.scheduler.schedule()
# 4.2 执行:把调度结果交给 Executor 跑前向传播
executor_output = self.model_executor.execute_model(scheduler_output)
# 4.3 处理输出:更新请求状态、检查完成条件、构造输出包
engine_core_outputs = self.scheduler.update_from_output(
scheduler_output, executor_output
)
return engine_core_outputs
这三行是整个 V1 推理引擎的灵魂。注意它们的数据流:
graph LR
A["Scheduler.schedule()"] -->|"SchedulerOutput<br/>· 本步执行的 req + token 数<br/>· KV 块分配<br/>· 抢占/恢复指令"| B["Executor.execute_model()"]
B -->|"ExecutorOutput<br/>· 每个 req 的新采样 token<br/>· logprobs / prompt_logprobs<br/>· 投机命中情况"| C["Scheduler.update_from_output()"]
C -->|"list[EngineCoreOutput]<br/>· 新 token<br/>· finish_reason<br/>· 最终 logprobs"| D["API Server"]
style A fill:#f59e0b,color:#fff,stroke:none
style B fill:#10b981,color:#fff,stroke:none
style C fill:#f59e0b,color:#fff,stroke:none
style D fill:#3b82f6,color:#fff,stroke:none
注意 update_from_output 虽然方法名在 Scheduler 上,但它修改的是 Scheduler 内部状态(RUNNING / FINISHED 队列、KV 块分配表)然后把需要发给前端的数据打包成 list[EngineCoreOutput] 返回。这个方法是 V0 → V1 重构的受益者之一:V0 里这块逻辑散落在 LLMEngine、BlockManager、OutputProcessor 三个类里,难以追踪状态变化;V1 把所有"一步结束后的状态更新"集中到 Scheduler,让一致性检查变得可行。
(5) 发回 API Server
_process_engine_step() 不直接写 ZMQ,它只把输出放进 output_queue:
def _process_engine_step(self):
outputs = self.step_fn()
if outputs is not None:
self.output_queue.put_nowait(outputs)
输出线程 process_output_socket() 再从 output_queue.get() 阻塞取值,使用 MsgpackEncoder.encode_into() 编码,socket.send_multipart(buffers, copy=False, track=True) 发出。它还维护 pending 和 reuse_buffers:如果 ZMQ 还没有发送完,就暂时保留 outputs 和 backing buffer 的引用;发送完成后回收 bytearray,最多缓存两个复用。这里的关键不是"丢弃最老消息",本地源码没有这么做;关键是用独立线程和 zero-copy multipart 发送,把输出序列化/发送从核心循环中移走。
2.3 请求的注入:add_request 全链路
从 API Server 发起 add_request 到请求真正被 Scheduler 看到,中间要穿过进程边界、经过数据结构转换、更新内部索引。我们跟着一个请求走完这段路:
sequenceDiagram
autonumber
participant U as 用户(async HTTP)
participant AS as API Server
participant CL as MultiprocClient
participant Z as ZMQ input_socket
participant EC as EngineCoreProc
participant SCH as Scheduler
U->>AS: POST /v1/chat/completions
AS->>AS: tokenize prompt
AS->>CL: client.add_request_async(req)
CL->>CL: 构造 EngineCoreRequest
CL->>Z: send_multipart(msgpack(req))
Z->>EC: _process_input_queue 读出来
EC->>EC: Request.from_engine_core_request()
EC->>SCH: scheduler.add_request(request)
SCH-->>EC: 入 WAITING 队列
关键点有三个。
第一,API Server / processor 侧完成输入处理。V1 的 EngineCoreRequest 已经携带 prompt_token_ids,EngineCore 不再负责把原始文本分词成 token。这样引擎核心只看 token、采样参数、多模态占位、LoRA 请求等结构化输入;tokenizer 和 OpenAI 协议细节留在前端处理。
第二,跨进程传递的是 EngineCoreRequest 不是 Request。这是一个精心设计的分离:
# vllm/v1/engine/__init__.py
class EngineCoreRequest:
request_id: str
prompt_token_ids: list[int]
mm_inputs: Optional[Sequence[Optional[MultiModalKwargs]]]
mm_hashes: Optional[list[str]]
mm_placeholders: Optional[list[PlaceholderRange]]
sampling_params: SamplingParams
eos_token_id: Optional[int]
arrival_time: float
lora_request: Optional[LoRARequest]
current_wave: int = 0 # DP wave 协调用
# vllm/v1/request.py
class Request:
"""EngineCore 内部使用的带状态对象。包含 KV 块引用、当前 token 数、
状态标志(WAITING / RUNNING / FINISHED / ...)等 runtime 信息。"""
def __init__(self, core_req: EngineCoreRequest, ...):
self.request_id = core_req.request_id
self.prompt_token_ids = core_req.prompt_token_ids
self.num_computed_tokens = 0
self.output_token_ids: list[int] = []
self.status = RequestStatus.WAITING
self.num_computed_tokens = 0
self.mm_positions = multi_modal_placeholders or []
self.structured_output_request = StructuredOutputRequest(...)
EngineCoreRequest 是 msgspec.Struct,带 array_like=True、omit_defaults=True、gc=False,不是普通 dataclass。这个选择服务于跨进程序列化:字段顺序固定、默认值省略、对象更轻。Runtime 状态(例如 _output_token_ids、_all_token_ids、num_computed_tokens、结构化输出状态、多模态位置)在 Request.from_engine_core_request() 之后才在 EngineCore 进程内生成,前端不需要知道。
第三,scheduler.add_request 在主循环外部被调用。这点在并发安全上很微妙——严格说来,add_request 是在 _process_input_queue 里调用的,而 _process_input_queue 在主循环的顶部运行,和 step() 不重叠。换句话说,Scheduler 的状态修改要么发生在处理输入阶段,要么发生在 step 阶段,两者是串行的。整个 EngineCore 进程其实是单线程的(只有一个主循环线程),所以不存在锁问题。这也是 V1 能把实现写得如此紧凑的底层原因——复杂性被并发安全约束限制得很死。
2.4 取消请求:abort_request 的三种时机
abort_request 是另一个必须实现正确才能在生产环境活下去的机制。用户关浏览器、HTTP 连接超时、应用层主动取消——这些都要能干净地把请求从引擎里撤走、释放资源。
取消可能发生在三个时刻:
stateDiagram-v2
direction LR
WAITING: WAITING
RUNNING: RUNNING
FINISHED: FINISHED / ABORTED
[*] --> WAITING
WAITING --> RUNNING
RUNNING --> FINISHED
WAITING --> A1: abort@WAITING\n直接从等待队列移除
RUNNING --> A2: abort@RUNNING\n标 ABORTED,下一步释放 KV
FINISHED --> A3: abort@FINISHED\n无操作(已经完成)
对应的源码:
def abort_requests(self, request_ids: list[str]) -> None:
"""从引擎中移除给定的请求。支持同时取消多个。"""
# 注意:V1 里 scheduler 维护了 requests 字典,用 request_id 直接查
self.scheduler.finish_requests(
request_ids=request_ids,
finished_status=RequestStatus.ABORTED,
)
# vllm/v1/core/sched/scheduler.py
def finish_requests(self, request_ids, finished_status):
for rid in request_ids:
req = self.requests.get(rid)
if req is None:
continue # 已经不存在(可能已完成或从未注册)
if req.status == RequestStatus.WAITING:
self.waiting.remove(req)
elif req.status == RequestStatus.RUNNING:
self.running.remove(req)
else:
self.waiting.remove(req)
req.status = finished_status
self._free_request(req)
注意 get 配合 if req is None: continue 的防御式写法。这是因为 abort_requests 可能收到一个"在飞行中"的请求 id——前端刚发 abort 时,引擎可能刚好把这个请求标记为 FINISHED 并清理掉了。这种 TOCTOU(Time Of Check Time Of Use)竞态在分布式系统里很常见,V1 的解决方案是让 abort 做到幂等:取消一个不存在的请求不应该报错。
还有一个不明显的细节:abort 通过普通的 input 消息传递,而不是走"紧急通道"。这意味着它会和 ADD、UTILITY 一起进入 input_queue,由 _process_input_queue() 顺序处理。本章不写固定延迟数字;真正要关注的是幂等和资源释放是否正确。
2.5 ZMQ 通信协议:三条 Socket 各司其职
MultiprocClient 和 EngineCoreProc 之间用 ZMQ 通信。具体架构如下:
graph LR
subgraph "API Server 进程"
AS["API Server"]
CL["MultiprocClient"]
end
subgraph "EngineCore 进程"
EC["EngineCoreProc"]
end
AS --> CL
CL -->|"inputs<br/>(ROUTER → DEALER)"| EC
EC -->|"outputs<br/>(PUSH → PULL)"| CL
EC -.->|"READY<br/>DEALER identity"| CL
style AS fill:#8b5cf6,color:#fff,stroke:none
style CL fill:#f59e0b,color:#fff,stroke:none
style EC fill:#10b981,color:#fff,stroke:none
三条 Socket 各自承担不同语义:
inputs (ROUTER → DEALER):前端绑定 ROUTER socket,每个 EngineCoreProc 用带 engine index 的 DEALER identity 连接。MPClient._send_input() 发 (identity, request_type, serialized_request...),ZMQ 根据 identity 把消息送到对应 engine。这个设计能自然支持 DP 多个 EngineCore:同一个前端 input socket 可以按 engine id 精确投递,而不是 PUSH/PULL 的轮询分发。
outputs (PUSH → PULL):引擎 → 前端的单向队列。EngineCore 产生的 EngineCoreOutputs 都走这条。这里特别强调"单向"——API Server 不会往这条上发任何东西,引擎也不会从上面读。这种方向性约束让协议实现起来极其简单:每条消息读完就处理,不需要维护请求-响应关联。
ready(DEALER 发给 ROUTER):子进程 input socket 连接成功后先 socket.send(b"READY")。前端 _wait_for_engine_startup() 用 poller 等每个 engine id 都发回 READY。PAIR 在本地源码里另有用途:SyncMPClient 的输出线程用一个 inproc PAIR socket 接收 shutdown signal,不是 EngineCore ready 通道。
为什么输入不用 PUSH/PULL?因为 DP 场景要把请求送到选定 rank。DPAsyncMPClient.get_core_engine_for_request() 当前按 num_reqs_in_flight 选最少的 engine,然后 _send_input(..., engine=chosen_engine) 用 identity 精确投递。PUSH/PULL 无法表达"发给这个 rank"。
消息编码:V1 用 vllm/v1/serial_utils.py 里的 MsgpackEncoder / MsgpackDecoder。它不是简单的 msgpack.packb(dict),而是带自定义 hook:小 tensor / ndarray 可以 inline,大 tensor / ndarray 可以作为额外 multipart buffer 发送,函数对象走 cloudpickle,普通不认识的对象才 fallback 到 pickle。
# vllm/v1/engine/core_client.py
encoder = MsgpackEncoder()
decoder = MsgpackDecoder(EngineCoreOutputs)
# 发送时是一组 multipart frames:
message = (engine.identity, request_type.value, *encoder.encode(request))
input_socket.send_multipart(message, copy=False)
因此本章不写"比 pickle 快几倍"这类泛化数字。真正的实现重点是:EngineCoreRequest / EngineCoreOutputs 本身是 msgspec.Struct,而 MsgpackEncoder 又能把 tensor/ndarray 的 backing buffer 拆成额外 frame,配合 ZMQ copy=False 减少不必要的数据复制。
2.6 EngineCoreClient:同一套接口,三张面孔
API Server 不直接和 EngineCore 打交道——它拿到一个 EngineCoreClient,按这个接口调用:
class EngineCoreClient(ABC):
@abstractmethod
def add_request(self, request: EngineCoreRequest) -> None: ...
@abstractmethod
def abort_requests(self, request_ids: list[str]) -> None: ...
@abstractmethod
def get_output(self) -> EngineCoreOutputs: ...
@abstractmethod
def shutdown(self) -> None: ...
# ... 还有十几个方法
三种具体实现:
graph LR
API["API Server"] --> CL["EngineCoreClient<br/>(抽象)"]
CL --> Sync["InprocClient<br/>同进程同步调用"]
CL --> Async["AsyncMPClient<br/>同进程 async 调用"]
CL --> MP["SyncMPClient<br/>子进程 + ZMQ"]
CL --> DPMP["DPAsyncMPClient<br/>子进程组 + DP 协调"]
style API fill:#8b5cf6,color:#fff,stroke:none
style CL fill:#f59e0b,color:#fff,stroke:none
style Sync fill:#10b981,color:#fff,stroke:none
style Async fill:#10b981,color:#fff,stroke:none
style MP fill:#10b981,color:#fff,stroke:none
style DPMP fill:#10b981,color:#fff,stroke:none
InprocClient——最简单的实现。EngineCore 在同一进程里,add_request 就是一个普通的 Python 方法调用,get_output 触发一次 step()。适合离线批处理场景(llm = LLM(model="...") 这种用法)。
AsyncMPClient——子进程 + ZMQ 的 asyncio 形态。add_request_async() 通过 async ZMQ socket 发送,get_output_async() 从 asyncio queue 取输出;输出 socket 由 EngineCoreOutputQueueTask 后台任务持续读取。源码明确不支持"asyncio 但不 multiprocess":make_client() 在 asyncio_mode and not multiprocess_mode 时直接 NotImplementedError。
SyncMPClient——子进程 + ZMQ 的同步形态。add_request 是发 ZMQ 消息然后立即返回,get_output 是阻塞地从 ZMQ 读消息。主要给某些需要"同步感觉但要跨进程"的测试工具用。
DPAsyncMPClient——多 data-parallel rank 的 asyncio 形态。它为每个 DP rank 启动一个 CoreEngine,共用一个 input ROUTER socket 和一个 output PULL socket;请求按 num_reqs_in_flight 选当前最少的 engine。当前源码注释还写着 multi-node 暂不支持,所以 local_dp_rank == dp_rank。
关键是这四种 Client 的外部 API 完全一致。上层 API Server 代码写的时候只 client.add_request(...),切换部署模式只是 engine_core_client_from_engine_args(...) 里换一个分支:
def make_engine_core_client(engine_args):
if multiprocess_mode and asyncio_mode:
if data_parallel_size > 1:
return DPAsyncMPClient(...)
return AsyncMPClient(...)
if multiprocess_mode and not asyncio_mode:
return SyncMPClient(...)
return InprocClient(...)
一个接口、四种实现,这正是"依赖倒置"的实战价值——上层代码面向抽象,切底层不需要动。
2.6.1 Client 侧也在管理生命周期
MPClient 不是一个薄 socket wrapper,它还负责一组容易被忽略的资源生命周期问题。
第一,BackgroundResources 被挂到 weakref.finalize() 上。客户端对象即使因为异常半初始化,也会尝试关闭 core engine 子进程、取消 output queue task、关闭 input/output socket。ZMQ context 如果 socket 没先关,term() 可能挂住,所以源码里先显式 close(linger=0)。
第二,发送 input 时要保留 pending message。MsgpackEncoder 可能把 tensor/ndarray 的 backing buffer 抽成额外 frame,send_multipart(copy=False, track=True) 返回的 tracker 没完成前,原始 request 对象不能被 GC。pending_messages 保存 (tracker, request),free_pending_messages() 只在 tracker.done 后释放引用。这一段是性能优化和内存安全的交界:想 zero-copy,就必须自己管理对象寿命。
第三,utility call 通过 call_id 对应 future。client 发送 (call_id, method, args),EngineCore 执行后返回 UtilityOutput(call_id, result/failure);输出处理线程调用 _process_utility_output() 找回 future 并设置结果或异常。这样控制面调用和普通 token 输出可以共用 output socket,而不会混淆响应归属。
第四,DP client 还维护 reqs_in_flight。请求发给哪个 CoreEngine,就把 request id 映射到那个 engine;收到 finished request 后再把对应 engine 的 num_reqs_in_flight 减一。这个计数是后续路由选择的依据,因此如果 output 侧不处理 finished_requests,DP 负载均衡会越来越偏。
这些细节共同说明一件事:EngineCore 的可靠性不只在子进程里,也在 client 侧的引用、future、socket 和进程句柄管理里。读这部分源码时不要只看 step(),还要看对象什么时候被保留、什么时候释放、异常怎样跨线程/跨进程冒泡。
这也是排查线上卡死时的阅读顺序:先确认 core engine 是否还活着,再看 output task 是否还在消费,最后看 pending message 是否因为 zero-copy buffer 没释放而堆积。 这条链路比单个函数更能解释真实故障,也更适合作为压测时的检查清单。
2.7 请求的一生:状态机与时序
把请求在 EngineCore 中经历的所有状态画成状态机:
stateDiagram-v2
[*] --> WAITING: add_request()
WAITING --> WAITING_FOR_FSM: 有结构化输出约束<br/>等 FSM 编译
WAITING_FOR_FSM --> WAITING: FSM 就绪
WAITING --> RUNNING: Scheduler 选中<br/>分配 KV 块
RUNNING --> RUNNING: 继续生成
RUNNING --> PREEMPTED: KV 显存紧张<br/>被抢占
PREEMPTED --> WAITING: 释放 KV<br/>回到等待
RUNNING --> FINISHED_STOPPED: 遇到 stop token
RUNNING --> FINISHED_LENGTH_CAPPED: 达到 max_tokens
RUNNING --> FINISHED_ABORTED: abort_request()
WAITING --> FINISHED_ABORTED: abort_request()
FINISHED_STOPPED --> [*]: 释放 KV
FINISHED_LENGTH_CAPPED --> [*]: 释放 KV
FINISHED_ABORTED --> [*]: 释放 KV
每一个迁移都有明确的触发条件,我们逐个过一遍。
WAITING → RUNNING(被调度):这是最常见的迁移,发生在 Scheduler.schedule() 选中这个请求时。进入 RUNNING 前必须完成两件事:给请求分配 KV 块(prefill 阶段一次性分配、decode 阶段按需增量分配)、把请求加入 running 队列。V1 里这一步的代码在 Scheduler._schedule_running_and_waiting() 中,后续章节会详细讲。
RUNNING → PREEMPTED(抢占):当 KV 池用满、新请求又到了、Scheduler 的抢占策略(FCFS / Priority)决定牺牲某个 RUNNING 请求时触发。V1 的抢占非常轻量:直接释放它的 KV 块,把状态改回 WAITING。重要的是 num_computed_tokens 不清零——下次被调度时可以增量恢复。
RUNNING → FINISHED_STOPPED:遇到 EOS token 或用户指定的 stop 字符串时触发。Scheduler.update_from_output() 每一步都会检查新生成的 token 是否触发了停止条件:
# 简化自 vllm/v1/core/sched/scheduler.py
def _check_stop(self, req, new_token_id):
if new_token_id == req.eos_token_id and not req.ignore_eos:
return FinishReason.STOP
if len(req.output_token_ids) >= req.sampling_params.max_tokens:
return FinishReason.LENGTH
if req.sampling_params.stop_token_ids and new_token_id in req.sampling_params.stop_token_ids:
return FinishReason.STOP
return None
任何状态 → FINISHED_ABORTED:上一节已经讲过。
为什么要把 FINISHED 细分成三种?因为前端需要知道"为什么停止"来决定下一步行为——OpenAI 兼容 API 的 finish_reason 字段直接映射到这个枚举:stop / length / abort。把这个信息从 EngineCore 带到 API Server 非常重要,否则前端无法正确填充响应。
2.8 异步流水线:让 CPU 和 GPU 并行跑
EngineCore 的异步不是"普通 decode 每步都不等 GPU 结果"。本地源码里普通 step() 是同步的:
scheduler_output = self.scheduler.schedule()
output = self.model_executor.execute_model(scheduler_output)
engine_core_outputs = self.scheduler.update_from_output(
scheduler_output, output)
真正的异步入口来自 pipeline parallel 的 batch queue。EngineCore.__init__() 读取 model_executor.max_concurrent_batches;如果这个值大于 1,就创建 queue.Queue(batch_queue_size),并把 self.step_fn 指到 step_with_batch_queue()。V1 Ray executor 的 max_concurrent_batches 返回 pipeline_parallel_size,PP 时 execute_model() 返回 FutureWrapper,这正好能放进 batch queue。
step_with_batch_queue() 的策略很明确:
- 如果 batch queue 没满,先
scheduler.schedule(),有 token 就调用execute_model(),把(future, scheduler_output)放入队列,并优先返回空输出。 - 如果本轮没有新 batch 可排,并且队列不空,就取出队首 future,
future.result()阻塞等它完成。 - 拿到
model_output后调用scheduler.update_from_output()。
这不是"延迟一拍检查 stop token",也不是无条件让 CPU 准备下一步;它是为了 PP 消除 pipeline bubbles:先尽量把多个 batch 填进 pipeline,等没有新 batch 可塞时再回收最早的结果。对于非 PP、max_concurrent_batches == 1 的路径,还是同步 schedule -> execute_model -> update_from_output。
2.9 数据并行(DP)协调:多个 EngineCore 的共舞
当部署模式是 data_parallel_size > 1 时,会有多个独立的 EngineCore 子进程,每个绑一组 GPU。这时候 EngineCore 之间需要协调吗?
一般情况下不需要共享请求状态。DP 的核心理念就是"各干各的",每个 rank 独立地处理分配给它的请求。DPAsyncMPClient 在前端用 get_core_engine_for_request() 选择 num_reqs_in_flight 最少的 rank,然后这个 rank 的 EngineCore 独立走完 add → schedule → execute → output。
但需要共享"这一波是否还有活"。DP worker 之间仍可能存在分布式通信要求;如果某个 rank 已经空了而其他 rank 还在跑,空 rank 不能永远停住。V1 用 wave 机制和 dummy batch 解决这个问题:
class DPEngineCoreProc(EngineCoreProc):
def run_busy_loop(self):
while True:
self._process_input_queue()
local_unfinished = self.scheduler.has_unfinished_requests()
if local_unfinished:
self._process_engine_step()
local_unfinished = self.scheduler.has_unfinished_requests()
else:
if self.scheduler.has_finished_requests():
self._process_engine_step() # flush finished state
if not self.engines_running:
continue
self.execute_dummy_batch()
self.engines_running = self._has_global_unfinished_reqs(
local_unfinished)
_has_global_unfinished_reqs() 也不是每一步都 all-reduce;源码里有一个计数器,每 24 次 forward pass 才调用 ParallelConfig.has_unfinished_dp(self.dp_group, local_unfinished) 做一次全局判断。当前 wave 全部结束后,local rank 0 会向前端发 EngineCoreOutputs(wave_complete=...),client 收到后推进 current_wave 并把 engines_running 置回 False。若某个 engine 收到非当前 wave 的请求,还会发 start_wave 让其他 rank 进入同一波。
这套机制说明 DP 协调的本质不是"共享调度器",而是"每个 rank 独立调度,但用 wave/dummy batch 保证整组能一起前进和一起停下"。
2.10 容错与优雅退出
生产环境中,引擎不能随便崩溃,也不能死得难看。EngineCore 实现了几层保障:
信号处理——子进程启动时注册 SIGTERM / SIGINT handler:
shutdown_requested = False
def signal_handler(signum, frame):
nonlocal shutdown_requested
if not shutdown_requested:
shutdown_requested = True
raise SystemExit()
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
注意这里不是"设置标志然后等主循环安全点退出",而是直接抛 SystemExit,并用 shutdown_requested 避免重复抛。run_engine_core() 外层捕获 SystemExit 后重新抛出,但 finally 会调用 engine_core.shutdown(),而 shutdown() 清理 structured output backend,并调用 model_executor.shutdown()。
优雅退出流程:
graph TD
A["SIGTERM 到达"] --> B["signal handler<br/>置 _shutdown_requested"]
B --> C["抛出 SystemExit"]
C --> D["run_engine_core finally"]
D --> E["engine_core.shutdown()"]
E --> F["structured output backend clear"]
F --> G["model_executor.shutdown()"]
G --> H["进程退出"]
H --> I["退出进程"]
style A fill:#ef4444,color:#fff,stroke:none
style I fill:#10b981,color:#fff,stroke:none
这里不应凭空写 torch.cuda.synchronize();本地 EngineCoreProc 退出路径没有直接调用它。GPU/worker 清理由 executor/worker 的 shutdown 负责,EngineCore 层只做结构化输出 backend 和 executor 的收口。
Worker 崩溃恢复——MultiprocExecutor 会注册 failure callback。EngineCoreProc 构造时创建 input_queue,并把 executor_fail_callback 传给 EngineCore;callback 的内容是往 input_queue 放一条 (EngineCoreRequestType.EXECUTOR_FAILED, b"")。主循环处理到这条消息时抛 RuntimeError("Executor failed."),随后 run_engine_core() 捕获异常、调用 _send_engine_dead() 通知前端。
executor_fail_callback = lambda: input_queue.put_nowait(
(EngineCoreRequestType.EXECUTOR_FAILED, b""))
elif request_type == EngineCoreRequestType.EXECUTOR_FAILED:
raise RuntimeError("Executor failed.")
_send_engine_dead() 会把特殊帧 ENGINE_CORE_DEAD 放入输出队列,并等待输出线程最多 5 秒把它发出。客户端 BackgroundResources.validate_alive() 看到这个帧后设置 engine_dead=True 并抛 EngineDeadError。这里选择的是 fail-fast:EngineCore 不在这一层尝试重建部分 worker,而是把明确的 dead 信号交给前端和外部编排处理。
2.11 可观测性:EngineCore 在默默发出的信号
运维 vLLM 生产环境时,你会想知道这些事:现在有多少请求在 RUNNING?KV 块使用率多少?平均每步生成多少 token?EngineCore 的每一步都会产出一批统计数据:
# vllm/v1/engine/core.py 中 step 返回时附带 stats
class SchedulerStats:
num_running_reqs: int
num_waiting_reqs: int
gpu_cache_usage: float
prefix_cache_stats: PrefixCacheStats
spec_decoding_stats: Optional[SpecDecodingStats]
SchedulerStats 是 scheduler 每步产生的系统状态;prompt/generation token、TTFT、TPOT、请求级延迟、preemption 等则在前端 output processor 中累积到 IterationStats。AsyncLLM._record_stats() 会把 scheduler_stats 和 iteration_stats 一起交给 stat logger,Prometheus metric 名称来自 vllm/engine/metrics.py:
vllm:num_requests_running
vllm:num_requests_waiting
vllm:gpu_cache_usage_perc
vllm:lora_requests_info
vllm:num_preemptions_total
vllm:prompt_tokens_total
vllm:generation_tokens_total
vllm:iteration_tokens_total
vllm:time_to_first_token_seconds
vllm:time_per_output_token_seconds
vllm:e2e_request_latency_seconds
vllm:request_queue_time_seconds
生产调优最常看的三个:
kv_cache_usage_ratio——持续接近 1.0 意味着 KV 池快满,将触发抢占。要么加 GPU,要么降低max_num_seqsnum_requests_waiting——持续 > 0 意味着 Scheduler 吃不下了,请求开始排队。通常伴随首 token 延迟上升preempted_requests_total——持续增长意味着显存频繁紧张,被抢占的请求要重新 prefill,吞吐和延迟都会受损
此外 EngineCore 里有大量 tracing span(通过 vllm/tracing.py 的 OpenTelemetry 封装),可以追踪单个请求从 add 到 finish 的全链路耗时。生产环境建议接 Jaeger 或 Tempo 保留几天的 trace,发生"某个请求莫名其妙特别慢"的问题时能直接定位。
2.12 本章小结
EngineCore 是 vLLM 的心脏,但它的力量不在于"做什么",而在于"让正确的组件在正确的时刻做正确的事":
- 指挥者模式——EngineCore 本身几乎没有业务逻辑,所有重活都交给 Scheduler、KV Cache Manager、Executor,自己只负责按节拍协调
- 主循环 6 阶段——worker 就绪 → 批量拉输入 → 空闲等待 → 调度+执行+处理 → 发回输出,周而复始
- 数据契约最小化——跨进程只传
EngineCoreRequest(不可变),进程内用Request(带状态),界限清晰 - ZMQ 通信——inputs 是 ROUTER/DEALER 精确投递,outputs 是 PUSH/PULL 单向输出,READY 通过 DEALER 发回;PAIR 只用于同步客户端输出线程的本地 shutdown
- 四种 Client 面孔——Inproc / AsyncMP / SyncMP / DPAsyncMP,同一接口适配 4 种部署拓扑
- 请求状态机——WAITING / RUNNING / PREEMPTED / FINISHED_{STOPPED,LENGTH,ABORTED},每个迁移都有明确触发条件
- 异步流水线——普通路径同步 step;pipeline parallel 才通过 batch queue 和
FutureWrapper并行推进多个 batch - DP 协调——多 rank 通过 dummy step 保持集合通信同步
- 容错——信号处理走"设置标志 + 主循环安全退出",Worker 崩溃走 fail-fast + 外部重启
- 可观测——SchedulerStats 通过 scheduler_stats 携带到 API Server,暴露为 Prometheus 指标
下一章,我们将深入调度器 Scheduler——那个决定"谁先谁后、每人发多少 token"的裁判。它的每一个决策都直接影响吞吐量和延迟,是 vLLM 性能优化的核心战场。
源码导航
- EngineCore 主类:
vllm/v1/engine/core.py(EngineCore/EngineCoreProc)- Client 实现:
vllm/v1/engine/core_client.py(InprocClient/AsyncMPClient/SyncMPClient/DPAsyncMPClient)- Request 数据结构:
vllm/v1/request.py/vllm/v1/engine/__init__.py- MultiprocExecutor:
vllm/v1/executor/multiproc_executor.py- Scheduler 入口:
vllm/v1/core/sched/scheduler.py- Metrics:
vllm/v1/metrics/