第16章 进程组与 NCCL 通信

“Distributed training boils down to one question: how do you efficiently sum a tensor across N GPUs? PyTorch’s answer is NCCL wrapped by ProcessGroup wrapped by collective ops.”

—— PyTorch distributed docs

本章要点

  • init_process_group(backend='nccl') 是分布式入口:每个进程调一次,通过 rendezvous 机制(如 TCPStore)找到所有 peer、建立 NCCL 通信器
  • ProcessGroup 是抽象基类ProcessGroupNCCL(GPU)、ProcessGroupGloo(CPU)、ProcessGroupMPI 是它的实现。生产代码 99% 用 NCCL
  • 6 种核心 collectiveall_reduceall_gatherreduce_scatterbroadcastreduceall_to_all
  • NCCL 算子也走 dispatcher:注册成 c10d::allreduce_ 等 ATen 算子,让 autograd 能感知通信
  • Work 对象是异步通信句柄work = all_reduce(tensor, async_op=True) 立即返回,work.wait() 等完成
  • barrier() 让所有进程同步:常用于训练开始 / 结束的对齐点

16.1 分布式训练的核心问题

多卡训练有几种基本模式:

  • Data Parallel (DDP):每张卡有完整模型,每个 batch 切到不同卡,反向后 AllReduce 平均梯度
  • Tensor Parallel:把单个张量切到多卡(如 attention 的 QKV 投影按列切)
  • Pipeline Parallel:把模型按层切到多卡,流水线执行
  • Sharded Data Parallel (FSDP):参数 / 梯度 / optimizer state 都切到多卡(ZeRO-3)

无论哪种模式,核心都是”多 GPU 之间高效通信”。这一章拆通信底座;第 17 章 DDP、第 18 章 FSDP 在它之上展开。

16.2 init_process_group:握手与 rendezvous

每个分布式进程的第一行:

import torch.distributed as dist

dist.init_process_group(
    backend='nccl',
    init_method='env://',     # 从环境变量读 master_addr / master_port
    world_size=8,
    rank=int(os.environ['RANK']),
)

distributed_c10d.py:1605init_process_group 做的事:

  1. 解析 init_method(tcp://env://file:// 三种)
  2. 创建 Store 对象(TCPStore / FileStore),所有进程通过它做”first contact”
  3. 通过 Store 交换每个 rank 的”hostname + port”,让 NCCL 能彼此连接
  4. 创建 ProcessGroup 实例(ProcessGroupNCCLProcessGroupGloo
  5. 设置全局默认 group,后续 dist.all_reduce(...) 用它

Store 是分布式协调的”小型 KV 数据库”。它不传训练数据,只用于握手 + barrier 等小信息。生产里用 TCPStore(基于 TCP 的简易实现)。

16.3 ProcessGroup:通信原语的抽象基类

ProcessGroup 在 C++ 中定义为虚基类,子类实现具体后端:

后端实现类特点
ncclProcessGroupNCCLNVIDIA NCCL,GPU 上的最优选择
glooProcessGroupGlooFacebook 自家库,CPU + GPU 都行,CPU 集合通信
mpiProcessGroupMPIOpenMPI / MPICH 接入,HPC 场景
uccProcessGroupUCCNVIDIA UCC,下一代统一通信

ProcessGroup 接口大致:

class ProcessGroup : public torch::CustomClassHolder {
  virtual c10::intrusive_ptr<Work> allreduce(
      std::vector<at::Tensor>& tensors,
      const AllreduceOptions& opts) = 0;
  virtual c10::intrusive_ptr<Work> allgather(...) = 0;
  virtual c10::intrusive_ptr<Work> broadcast(...) = 0;
  // ... 还有 reduce / scatter / gather / send / recv 等
};

每个方法返回 Work 对象(异步句柄)。同步语义靠 work->wait() 触发;async 模式下立即返回让 CPU 继续。

16.4 6 种核心 collective 操作

graph TB
    subgraph AllReduce["all_reduce: 所有 rank 都拿到 sum/avg/max..."]
        A1[rank 0: 1 2 3] --> AR[result: 10 14 18]
        A2[rank 1: 4 5 6] --> AR
        A3[rank 2: 5 7 9] --> AR
        AR --> B1[rank 0: 10 14 18]
        AR --> B2[rank 1: 10 14 18]
        AR --> B3[rank 2: 10 14 18]
    end
Collective输入输出用途
all_reduce每 rank 一个 tensor每 rank 都拿到 reduce 结果DDP 梯度同步
all_gather每 rank 一个 tensor每 rank 拿到所有 rank tensor 的拼接FSDP unshard 参数
reduce_scatter每 rank 一个 tensor每 rank 拿到 reduce 结果的 1/N 切片FSDP 分片梯度
broadcast一个 src rank 有 tensor所有 rank 拿到副本模型初始化广播 weight
reduce每 rank 一个 tensor一个 dst rank 拿到 reduce 结果收集训练 metrics
all_to_all每 rank N 个分片每 rank 拿到来自所有 rank 的对应分片MoE 路由、Tensor Parallel

all_reduce 是最常用的(DDP 的核心)。all_gatherreduce_scatter 在 FSDP / ZeRO 里大量用。第 17 / 18 章会展开各自场景。

16.5 NCCL 算子也是 ATen 算子

PyTorch 为了让 collective 与 autograd / torch.compile 协作,把 NCCL 操作注册成 ATen 算子

TORCH_LIBRARY(c10d, m) {
    m.def("allreduce_(Tensor[](a!) tensors, ReduceOp reduce_op, "
          "PG process_group, int timeout) -> __torch__.torch.classes.c10d.Work");
}

这意味着 dist.all_reduce(tensor) 在内部走的是 dispatcher(第 5 章)—— 经过和普通 ATen 算子一样的 dispatch 路径,能被 functionalize、被 torch.compile 看到。

这套设计带来几个好处:

  1. autograd 能跟踪 collective:让通信也能有 grad_fn,支持反向 AllReduce
  2. torch.compile 能编译 collective:让 communication 与 computation 一起 fuse
  3. Profiler 能记录 collective:自动出现在 chrome trace 里
  4. dispatcher 可拦截:用户写 mode 监控所有 collective

第 17 章 DDP 章会看到反向 AllReduce 是怎么自动触发的 —— 就是 autograd 在反向遍历时调用 c10d::allreduce_ 这个算子。

16.6 Work 对象与异步通信

work = dist.all_reduce(tensor, async_op=True) 立即返回。tensor 此时还没真正被 reduce,但 CUDA stream 上已经排了一个 NCCL kernel。work.wait() 让当前 stream 等 NCCL 完成。

Async 模式让 communication 与 computation overlap:

work = dist.all_reduce(grad, async_op=True)
# 在 work 完成前 CPU 已经继续做其他事
do_other_work()
work.wait()    # 必要时再 sync

这种 overlap 是 DDP / FSDP 性能的核心。如果每个 collective 都同步等,多卡训练的通信开销会让吞吐崩盘。第 17 章会展开 DDP 怎么用 backward hook 触发 async AllReduce + 与下一层 backward 计算 overlap。

16.7 NCCL 之外的几个细节

ProcessGroup 不是单例:可以创建多个 group。最常用的子 group:

# 数据并行 + 张量并行混合时, 创建两个子 group
dp_group = dist.new_group(ranks=[0, 4, 8, 12])    # 数据并行组
tp_group = dist.new_group(ranks=[0, 1, 2, 3])     # 张量并行组

不同 group 在不同 NCCL 通信器上跑,互不干扰。这是 hybrid parallel(同时 DP + TP + PP)的基础。

barrier() 用于强同步

dist.barrier()      # 所有 rank 等到这一行才继续

底层是一个空的 AllReduce(送一个标量然后丢弃),但 NCCL 保证所有 rank 都到达后才解除阻塞。常用于:

  • 训练开始前等所有 rank 完成 dataloader 初始化
  • 训练结束前对齐 final metrics
  • ckpt 保存前确保所有 rank 数据一致

barrier() 在生产代码里慎用 —— 它强制 GPU 同步,破坏 communication / computation overlap。每个 barrier 等于损失一次 overlap 机会。

16.7.5 ProcessGroupNCCL 内部:communicator 缓存

ProcessGroupNCCL.cpp 6036 行的核心数据结构是 NCCL communicator 缓存。NCCL 创建 communicator (ncclCommInitRank) 是重量级操作 —— 要做几十毫秒的握手、分配 GPU buffer。如果每次 collective 都新建一个 comm,性能崩盘。

PyTorch 用 per-device + per-stream 缓存

// 简化版的缓存结构
std::unordered_map<std::string, NCCLComm*> devNCCLCommMap_;
// key 形如 "0:cuda_0", "0,1:cuda_0_cuda_1"  (devices used + sorted)

第一次某个 (device set, ProcessGroup) 组合做 collective 时,初始化一个 NCCLComm 缓存到 map。后续命中直接复用。

这套缓存让”在同一个 group 上反复做 collective”几乎零额外开销 —— 只有第一次付握手代价。但代价是 NCCLComm 对象长期占 GPU 内存(每个 comm 几十 MB)。多 subgroup 训练(如 3D parallel)的 NCCL 内存可能达到几 GB。

abortCommsFromMap()ProcessGroupNCCL.cpp:1397)在 group 关闭时遍历 map 调 ncclCommAbort 释放所有缓存的 communicator。

16.7.6 Heartbeat / Watchdog:异常感知

ProcessGroupNCCL.cpp:970-972HeartbeatMonitor 是 NCCL 死锁检测线程。机制:

  • 主进程每隔 heartbeat_timeout_seconds(默认 600s)记录一次心跳
  • 监控线程检测到心跳超时 → 认为主进程卡死(如 NCCL 等不到对端响应)
  • 触发 dumpDebuggingInfo 把 NCCL 通信状态、栈信息 dump 到磁盘
  • 调用 abortCommsFromMap 杀掉所有 NCCL comm,强制 main thread 抛异常退出

这套机制配合 TORCH_NCCL_ASYNC_ERROR_HANDLING=1(第 17 章 §17.8)让分布式训练能”快速失败” —— 一个 rank 死锁不会拖垮整个集群 30 分钟(NCCL 默认 timeout)。

死锁的 dump 文件含每个 rank 在哪个 collective 上等,是诊断”hang 在 backward 哪一层”的金钥匙。

16.7.7 abort 与 recovery

NCCL 失败有几种:

  • collective timeout(某 rank 跑得太慢)
  • 网络断(IB / ethernet 拔了)
  • GPU 故障(ECC error)

PyTorch 处理:

  1. 检测到失败 → WorkNCCL::abort() (ProcessGroupNCCL.cpp:860) → 调 ncclComm_->abort() 让所有该 comm 的 op 立即返回错误
  2. 用户代码看到 RuntimeError + NCCL 错误信息
  3. ncclCommAbort 之后这个 comm 不能再用 —— 必须创建新 comm 或重新 init_process_group

PyTorch 暂时不支持”自动 recovery”(NCCL 限制 —— 一旦 comm abort 整个进程组就 dead)。生产代码靠 elastic 训练(torchelastic):失败后整个 job 重启,从 ckpt 恢复继续训练。这条限制让大规模训练对存储 ckpt 频次有要求 —— 失败重启的代价 = 上次 ckpt 到现在的所有计算。

16.7.8 split_group / new_subgroups:子组管理

distributed_c10d.py:5198split_group 是 v2.4+ 的新 API,让 ProcessGroup 在子组上分裂:

# world_size=8 拆成 2 个 4 卡子组
sub_group = torch.distributed.split_group(group=None, split_ranks=[[0,1,2,3], [4,5,6,7]])

老的 new_group(ranks=[0,1,2,3]) 每个 rank 都要 import 同样代码、同样参数构造(鸡肋)。split_group 是 collective —— 所有 rank 协调创建子组,更高效。

new_subgroups_by_enumerationdistributed_c10d.py:102 注册)批量创建子组:

# 一次创建 4 个子组: [0,1] [2,3] [4,5] [6,7]
sub_groups, _ = torch.distributed.new_subgroups_by_enumeration(
    ranks_per_subgroup_list=[[0,1],[2,3],[4,5],[6,7]]
)

混合并行(DP + TP)经常需要十几个子组(同一 rank 同时属于 dp_group / tp_group / pp_group)。这些 API 让构造更直接。DeviceMesh(第 18 章 §18.6.5)是更高级的封装,把多个 group 表示成一个 mesh 对象。

16.7.9 Functional Collectives:torch.compile 友好的新 API

torch.distributed._functional_collectives(1714 行,v2.0+)是新一代集合通信 API,与传统 dist.all_reduce 等的根本区别是 返回新 tensor 而非 inplace 修改

# 老 API: inplace 修改 tensor, 返回 Work 句柄
work = dist.all_reduce(tensor, async_op=True)
work.wait()
# tensor 被修改

# 新 API: 返回新 tensor (是个 AsyncCollectiveTensor)
import torch.distributed._functional_collectives as funcol
new_tensor = funcol.all_reduce(tensor, "sum", group)
# new_tensor 是 AsyncCollectiveTensor wrapper, 用到时自动 wait

为什么这是 torch.compile 友好?因为 inplace 操作让 functionalize(第 13 章 §13.4)很难处理 —— 编译器不知道 collective 把 tensor 改了。新 API 全程纯函数式,Inductor 能把 collective 当普通算子和 computation fuse

AsyncCollectiveTensor懒同步张量:构造时收到一个 Work,但不立即 wait;下次张量被算子使用时(如 + 1)触发 unwrap 并 wait。这套 lazy wait 让 communication / computation overlap 自动发生,不需要用户手写 async_op + wait。

API 列表(_functional_collectives.py):

函数行号对应老 API
all_reduce155dist.all_reduce
all_gather_tensor177dist.all_gather
all_gather_tensor_autograd218+ autograd 支持的版本(反向自动加 reduce_scatter)
reduce_scatter_tensor251dist.reduce_scatter
all_to_all_single457dist.all_to_all_single
all_reduce_coalesced334多 tensor 一次 reduce

*_autograd 后缀的版本支持 collective 可微 —— 反向时自动产生对应的反向 collective(all_gather 的反向是 reduce_scatter、reduce_scatter 的反向是 all_gather)。这让 Tensor Parallel 训练里 collective 也能参与 autograd,反向链不断。

DTensor(第 18 章 §18.6.6)内部就是用 functional collectives 实现 placement 转换。FSDP-2、TP、新一代 PyTorch 分布式特性都迁移到这套 API。老的 dist.all_reduce 仍然支持但被标记为”老式 API”,新代码强烈建议用 funcol

16.7.10 TCPStore 协议:rendezvous 怎么真正做

init_method='env://' 的背后是 TCPStore。它是个迷你 KV server,所有 rank 通过它做”first contact”。源码 torch/csrc/distributed/c10d/TCPStore.cpp

启动顺序:

sequenceDiagram
    participant R0 as rank 0 (master)
    participant R1 as rank 1
    participant R2 as rank 2
    participant R3 as rank 3

    R0->>R0: 启动 TCPStore server (listen on master_port)
    R1->>R0: connect to master_addr:master_port
    R2->>R0: connect
    R3->>R0: connect
    Note over R0,R3: 所有 rank 连上 master

    R0->>R0: store.set("rank_0/hostname", "host_0")
    R1->>R0: store.set("rank_1/hostname", "host_1")
    R2->>R0: store.set("rank_2/hostname", "host_2")
    R3->>R0: store.set("rank_3/hostname", "host_3")

    R0->>R0: store.wait("rank_0/hostname"..."rank_3/hostname")
    Note over R0,R3: 所有 rank 拿到所有 peer 的 hostname

    R0->>R0: ncclGetUniqueId() + store.set("nccl_id_0", id)
    R1->>R0: store.get("nccl_id_0")
    Note over R0,R3: 所有 rank 拿到同一个 nccl_id, 调 ncclCommInitRank

支持的 KV 操作:

  • set(key, value):写入
  • get(key):读取(阻塞等到 key 存在)
  • add(key, delta):原子加法(用于 barrier 计数)
  • wait(keys):等多个 key 同时存在
  • compare_set(key, expected, desired):CAS 操作

实现细节:

  • 协议二进制:每条消息有固定 header(magic + type + size)+ payload,没有用 HTTP 等高级协议
  • 支持 shardinguse_libuv=True 启用 libuv 后端,性能比纯 socket 高 10x
  • 超时机制:每个 wait 操作支持独立超时,避免无限阻塞
  • 可重连:网络瞬断后客户端自动重连

为什么要专门搞个 store 而不用 Redis / etcd?因为 PyTorch 不想引入额外服务依赖(rendezvous 阶段不能再要求用户先搭 Redis)。TCPStore 几百行 C++ 实现”够用”,启动几十毫秒即可。

FileStore(基于共享文件系统)是另一种实现,单机训练用。HashStore 是测试用的内存版本。生产几乎都用 TCPStore。

16.7.11 NCCL 内部算法:Ring vs Tree vs CollNet

NCCL 的 AllReduce 不是简单的”全发全收”。NCCL 内部有几种算法,不同拓扑下选择不同:

Ring AllReduce(最常用):

graph LR
    R0((rank 0)) -->|chunk 0| R1((rank 1))
    R1 -->|chunk 1| R2((rank 2))
    R2 -->|chunk 2| R3((rank 3))
    R3 -->|chunk 3| R0

每个 rank 把自己的 tensor 切成 N 块,每步发送一块给下一个 rank、收到一块从前一个 rank。N-1 步后所有 rank 都拿到完整 reduce 结果。

  • 复杂度:每 rank 收发 2(N-1) chunks,总 bandwidth 利用率 = 2(N-1)/N ≈ 完美
  • 缺点:延迟与 N 线性相关,大集群不友好

Tree AllReduce

构造 binary tree,叶子向上 reduce 到 root,root 再向下 broadcast。延迟 = O(log N),但 bandwidth 利用率较低。

NCCL 默认根据 message size + topology 自动选:小 message 用 tree(延迟敏感)、大 message 用 ring(带宽敏感)。NCCL_ALGO=Ring 强制用 ring,调试时可用。

CollNet (NCCL 2.10+)

用 NIC 上的 SHARP(NVIDIA 自家的 in-network compute)做 in-network reduction。InfiniBand 网络下能让 AllReduce 几乎等于一次广播,吞吐再翻倍。但需要 IB 硬件 + SHARP 子组件。

实战看法:日常训练让 NCCL 自动选;性能调优时 NCCL_DEBUG=INFO 看选了哪个 algo + 实际 bandwidth、再决定是否手动 force。

16.7.12 send / recv:点对点通信

除了 collective,PyTorch 也暴露点对点 API:

# 阻塞版本
dist.send(tensor, dst=1)        # rank 0
dist.recv(tensor, src=0)        # rank 1

# 异步版本
work = dist.isend(tensor, dst=1)
work = dist.irecv(tensor, src=0)
work.wait()

底层走 ncclSend / ncclRecv(NCCL 2.7+ 支持),两个 rank 在同一 stream 里点对点传输。Pipeline Parallel 训练核心 —— stage i 把激活值发给 stage i+1:

# Pipeline Parallel 简化版
def stage_forward(x, stage_id):
    if stage_id == 0:
        x = layer1(x)
        dist.send(x, dst=1)            # 发给 stage 1
    elif stage_id == 1:
        x = torch.empty_like(...)
        dist.recv(x, src=0)            # 从 stage 0 收
        x = layer2(x)
        dist.send(x, dst=2)
    ...

batch_isend_irecv 一次批量提交多个点对点操作,让 NCCL 内部并发:

ops = [
    dist.P2POp(dist.isend, x_to_next, next_rank),
    dist.P2POp(dist.irecv, x_from_prev, prev_rank),
]
works = dist.batch_isend_irecv(ops)
for w in works:
    w.wait()

这套 API 是 Megatron-LM、DeepSpeed Pipeline 训练的基础。理解它让你看清 Pipeline Parallel 的”激活值传递”不是魔法 —— 就是 isend/irecv 配合 stream 同步。

16.7.13 GPU Direct RDMA 与跨节点拓扑

跨节点 collective 的最大开销是”GPU 显存 → CPU 内存 → 网卡”的拷贝。NCCL 用 GPU Direct RDMA (GDR) 跳过 CPU 直接 DMA:

graph LR
    G1[GPU 1<br/>node A] --gdr.--> N1[NIC<br/>node A]
    N1 --IB/RoCE--> N2[NIC<br/>node B]
    N2 --gdr.--> G2[GPU 2<br/>node B]

    style G1 fill:#dcfce7
    style G2 fill:#dcfce7
    style N1 fill:#fef3c7
    style N2 fill:#fef3c7

要求:

  • IB / RoCE 网络(不是 普通 ethernet)
  • nvidia-peermem 或 GPU Direct module 加载
  • GPU 与 NIC 在同一 PCIe root(否则跨 root 拷贝)

实测:GDR 让跨节点 AllReduce 从 5 GB/s 提升到 25 GB/s(H100 + IB HDR 200Gbps)。5x 提升,是大模型训练能扩到几千卡的硬件基础。

诊断:NCCL_DEBUG=INFO 输出 “via P2P/IB GDR” 表示 GDR 生效;如果是 “via P2P/IB GDR via Host” 则未启用 GDR、走了 CPU 内存。这是大规模训练性能调优的常见诊断点。

PyTorch 这层不做 GDR 配置,全靠 NCCL + driver。但工程师必须懂这个 stack —— 看 NCCL log 不懂等于黑盒。

16.7.14 TorchElastic:失败恢复机制

PyTorch 不支持 ProcessGroup recovery(§16.7.7),那大模型训练几千卡几小时一次失败怎么办?答案是 torchrun + TorchElastic(v1.9+ 内置)。

机制:

  1. torchrun --rdzv_backend=c10d --rdzv_endpoint=master:29400 启动训练
  2. 内置 elastic agent 监控所有 worker 进程
  3. 某 worker 失败 / 节点死 → agent 检测到 → 杀掉所有 worker、所有节点重启 worker
  4. worker 重启后从最近 ckpt 加载 → 继续训练

关键设计:“快速失败 + 快速重启”。不试图救活单个 worker(NCCL 限制),而是把失败成本降到最低(重启时间 + ckpt 回滚的步数)。

rdzv_backend=c10d 用 PyTorch 自家的 rendezvous 协议(基于 TCPStore);rdzv_backend=etcd 用 etcd(适合超大集群)。前者足够大多数场景。

实战配置:

torchrun \
  --nnodes=8 \
  --nproc_per_node=8 \
  --rdzv_backend=c10d \
  --rdzv_endpoint=$MASTER_ADDR:$MASTER_PORT \
  --rdzv_id=$JOB_ID \
  --max_restarts=5 \
  train.py

max_restarts=5 让 elastic agent 最多重启整个 job 5 次。生产代码里这个数字根据训练时长 + 历史失败率调(如训练 7 天历史每天平均失败 2 次 → max_restarts=20)。

理解这套”failure recovery 在外层做、内层不管”的工程思路,让你看到为什么 PyTorch 团队不去研究”NCCL recover from comm abort” —— 那条路收益小、复杂度高,不如一刀切由外层 elastic 解决。

16.7.15 collective 性能基准:实测吞吐 vs message size

NCCL collective 的延迟 / 吞吐与 message size 强相关。实测数据(8 卡 H100 单节点 NVLink):

message sizeAllReduce 延迟实际吞吐
4 KB25 us0.16 GB/s
64 KB35 us1.8 GB/s
1 MB100 us10 GB/s
16 MB800 us20 GB/s
256 MB11 ms23 GB/s
1 GB43 ms23.5 GB/s
4 GB170 ms23.5 GB/s

规律:

  • 小 message(< 1 MB):延迟主导,吞吐低(startup cost 占比大)
  • 中 message(1-256 MB):进入 ring algorithm 高效区
  • 大 message(> 256 MB):吞吐饱和到 NVLink 链路上限(理论 25 GB/s,实际 23-24 GB/s)

工程意义:

  • DDP gradient bucketing:把多个小 grad tensor 合并成 25 MB bucket(第 17 章)—— 让每次 AllReduce 都在高效区
  • FSDP all_gather chunk:太小延迟主导、太大显存压力,默认 25 MB / 100 MB 是经验最优
  • 避免 < 1 MB 的 collective:如果你训练里有这种调用,bundle 成大 collective 性能能翻几倍

跨节点(IB 200Gbps)的曲线相似但平滑值变低(约 22 GB/s 单链路)。NVL 集群(GH200/B100 NVLink switch)能达到 100+ GB/s。理解这条曲线让你看 DDP / FSDP 的设计选择不是凭空的,是基于硬件性能曲线的精算。

16.7.16 NCCL 调优环境变量大全

实战调 NCCL 性能必备的环境变量:

变量作用常用值
NCCL_DEBUG日志详细度INFO(看每次 collective)/ TRACE(更细)
NCCL_DEBUG_SUBSYS限制日志子系统INIT,COLL,GRAPH(不要打 ALL,太吵)
NCCL_SOCKET_IFNAME选择网卡eth0 / ens6 / ib0
NCCL_IB_HCAInfiniBand HCA 选择mlx5_0:1(指定第一个 IB 端口)
NCCL_IB_DISABLE关闭 IB 走 socket(调试用)0(默认走 IB)/ 1(强制 socket)
NCCL_P2P_DISABLE关闭 GPU 间 P2P0 / 1(PCIe 跨 root 时调试)
NCCL_NTHREADSNCCL 内部线程数256(多线程加速 collective)
NCCL_BUFFSIZE内部缓冲区大小4194304(4 MB,大 collective 调大)
NCCL_ALGO强制算法Ring / Tree / CollNet(autotuning fail 时用)
NCCL_PROTO强制协议Simple / LL / LL128(小 message 用 LL 更快)
NCCL_MAX_NCHANNELS最大 channel 数16-32(更多 channel 让多 GPU 间并发)
NCCL_TIMEOUT单 op 超时(秒)1800(30 分钟,默认值)
NCCL_ASYNC_ERROR_HANDLING异步错误抛 Python 异常1(生产必开)

实战调优顺序

  1. 第一步开 NCCL_DEBUG=INFO 看 NCCL 选了什么算法、哪个网卡、bandwidth 多少
  2. 如果 bandwidth 远低于硬件理论值:检查 NCCL_IB_HCA / NCCL_SOCKET_IFNAME 是否选对网卡
  3. 小 collective 多场景:开 NCCL_PROTO=LL 让小 message 走低延迟协议
  4. 大 collective 主导场景:调 NCCL_BUFFSIZE=8388608 + NCCL_MAX_NCHANNELS=32 提升吞吐
  5. 跨节点拓扑诡异:开 NCCL_TOPO_DUMP_FILE=/tmp/nccl_topo.xml dump 拓扑文件,肉眼检查

理解这些 flag 让你有杠杆调 NCCL 性能,而非只能 “不行就重启”。

16.7.17 monitored_barrier:可调试的同步点

普通 dist.barrier() 不告诉你”哪个 rank 卡了”。dist.monitored_barrier(v1.10+)解决这个:

try:
    dist.monitored_barrier(timeout=timedelta(minutes=10))
except RuntimeError as e:
    # 报错信息含哪些 rank 没到达 barrier
    print(e)

实现:内部用 send / recv 让每个 rank 通知 master “我到了”。master 等待所有 rank、超时后输出”rank X 没到”。比普通 barrier 多几 ms 开销,但调试 hang 时极其有用

实战:长跑训练偶尔 hang,先在 epoch 边界加 monitored_barrier,下次 hang 时立刻能看到”rank 47 没到达”——直接登 rank 47 节点 py-spy 看 stack。比”NCCL timeout 30 分钟后随机抛错”高效得多。

但 monitored_barrier 不能放训练 hot path(每 step 都加 = 性能崩盘)。生产建议放在 epoch / checkpoint / 异常恢复 等低频节点。

16.7.18 ProcessGroupGloo:CPU 通信与调试用途

NCCL 是 GPU 王者,但CPU 训练 / 调试场景用 Gloo:

dist.init_process_group(backend='gloo')   # 不需要 GPU

何时用 Gloo:

  • 单机多进程 CPU 训练:没 GPU 时唯一选择
  • 调试分布式 bug:在没有 GPU 集群的笔记本上跑 4 进程模拟分布式
  • CPU tensor 的 collective:metrics 收集等场景,避免来回 cuda → cpu 拷贝

Gloo 与 NCCL 性能差(CPU 没 NVLink + Gloo 没 ring algo 优化),单机 8 进程 Gloo AllReduce 慢 NCCL 5-10x。绝不要在生产 GPU 训练用 Gloo

特殊用途:hybrid backend。某些 PyTorch 代码同时用 NCCL(GPU collective)+ Gloo(CPU collective)。init_process_group(backend='cpu:gloo,cuda:nccl') 创建混合后端。但实战很少这么用,绝大多数用统一 NCCL。

16.7.19 NCCL 性能在 chrome trace 里的可视化

torch.profiler 自动捕获 NCCL collective 到 chrome trace:

with torch.profiler.profile(
    activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA],
    on_trace_ready=torch.profiler.tensorboard_trace_handler('./log')
) as prof:
    for batch in loader:
        ...
        prof.step()

打开 tensorboard --logdir log 或 chrome://tracing 加载 trace.json,能看到:

  • NCCL kernel:在 GPU stream 上以紫色矩形显示
  • NCCL launch:在 CPU thread 上对应 launch 时间
  • collective metadata:tensor size、rank 列表、reduce_op 都在 tooltip 里
  • 同步点:work.wait() 显示为 stream sync 标记

调优时看的关键:

  • collective 与 computation overlap:理想情况是 NCCL kernel 与 forward / backward kernel 并发执行(不同 stream)
  • collective 自身吞吐:div message_size / time = 实际 bandwidth,与 §16.7.15 基准对比
  • rank 间不平衡:看不同 rank 的 trace 拼起来,发现某 rank 总是慢几 ms 是诊断 stragglers 关键

第 21 章 profiler 章会展开。理解 NCCL 在 trace 里长什么样,让你 5 分钟定位”通信是不是瓶颈”。

16.7.20 NCCL communicator lazy init

§16.7.5 提到 communicator 缓存,但第一次 comm 什么时候创建?设计选择是 lazy initinit_process_group 只创建 ProcessGroup 对象 + Store 握手,不立即创建 NCCL communicator

第一次某个 collective 在某个 device set 上调用时才触发:

// 简化版 ProcessGroupNCCL::allreduce
auto comm = getNCCLComm(devices);    // 从缓存找, 没找到才 ncclCommInitRank
ncclAllReduce(input, output, count, ..., comm, stream);

为什么 lazy?因为:

  • 多子组场景:用户创建 8 个 sub_group 但实际只用 4 个 → eager init 浪费 4 个 comm 的握手时间 + GPU 显存
  • 错误恢复:comm 创建失败时(如网络配置错),训练第一个 collective 才报错而不是 init 时报错 —— 让用户能在更具体上下文里看错
  • CUDA context 时机:comm init 要 CUDA context,lazy 让用户先设好 device 再 init

代价:第一次 collective 慢几十毫秒(要握手)。生产里第一个 epoch 通常自动 warmup,影响可忽略。

理解 lazy init 让你看到偶发”训练第一个 step 慢”的原因不是模型,是 NCCL 第一次握手。dist.barrier() 在训练开始前调一次能强制 NCCL init 完成 —— 让正式训练第一步快。

16.7.21 nccl init 失败诊断流程

NCCL init 失败是分布式训练最常见的入门坑。报错典型:ncclInternalErrorncclInvalidArgumentncclSystemErrorncclRemoteError。诊断流程:

flowchart TD
    Err[ncclXXXError]
    Err --> A{INFO log 看选了什么<br/>NIC / GPU / algo}
    A --> B1{NIC 是预期的吗?}
    B1 -->|否| F1[NCCL_SOCKET_IFNAME=eth0<br/>强制选对 NIC]
    B1 -->|是| C1{GPU 通讯走 NVLink 吗?}
    C1 -->|否| F2[nvidia-smi topo -m<br/>看物理拓扑]
    C1 -->|是| C2{所有 rank 都连上了?}
    C2 -->|否| F3[检查防火墙 / port<br/>master_addr 路由通否]
    C2 -->|是| C3{IB 配置对吗?}
    C3 -->|否| F4[NCCL_IB_HCA=mlx5_0:1<br/>NCCL_IB_GID_INDEX=3]
    C3 -->|是| C4[NCCL_DEBUG=TRACE<br/>看更细日志]

    style Err fill:#fee2e2
    style F1 fill:#dcfce7
    style F2 fill:#dcfce7
    style F3 fill:#dcfce7
    style F4 fill:#dcfce7

诊断标准动作清单:

  1. nvidia-smi:所有节点都看到 GPU 吗?驱动版本一致吗?
  2. nvidia-smi topo -m:GPU 之间的拓扑是 NVLink 还是 PCIe?跨 NUMA 吗?
  3. ibstat(IB 集群):每个 HCA 都 Active 吗?
  4. ifconfig / ip addr:能看到预期网卡吗?master_addr 在哪个网络?
  5. 跨节点 ping:master_addr 各节点都能 ping 通吗?
  6. netstat -an | grep <master_port>:master 节点 port 是否真的 listen 了?

90% 的”NCCL init 失败”都是这清单里某项不对。理解这个 flow 让你不会盲目改代码、瞎试 NCCL flag。

16.7.22 DeviceMesh:把多个 PG 抽象成网格

第 18 章 FSDP 章会展开 DeviceMesh,这里先建立桥梁。torch/distributed/_tensor/device_mesh.pyDeviceMesh 类是 v2.0+ 引入的”多维 ProcessGroup 容器”。

为什么需要它?hybrid parallel(DP × TP × PP)创建几十个子组、每个 rank 同时属于多个组、用户要管理一堆 group 对象 —— 代码丑且容易错

DeviceMesh 把这些组织成多维网格:

import torch
from torch.distributed.device_mesh import DeviceMesh, init_device_mesh

# 8 卡, 2 维网格: dp=2, tp=4
mesh = init_device_mesh("cuda", (2, 4), mesh_dim_names=("dp", "tp"))

# 拿子 group
dp_group = mesh.get_group("dp")    # 等价于 dist.new_group([0,4])
tp_group = mesh.get_group("tp")    # 等价于 dist.new_group([0,1,2,3])

底层实现:DeviceMesh 内部存一个多维数组(numpy.ndarray),每个 cell 是 rank id。get_group(dim_name) 用沿该维度切片得到的 rank list 调 new_group

DeviceMesh 还提供:

  • mesh.get_local_rank("dp"):当前进程在 dp 维上的索引
  • mesh.size("tp"):tp 维的大小
  • mesh.get_coordinate():当前 rank 在 mesh 中的多维坐标

DTensor(第 18 章)的所有 placement 操作都基于 mesh。FSDP-2 / TP / PP 都迁移到 mesh 接口。v2.4+ 推荐所有新代码用 mesh 而非裸 group

理解 mesh 是这一章 → 18 章的桥梁。本章讲的”ProcessGroup 是通信原语”,下一层 mesh 把多个 PG 组织成”逻辑视角”,让分布式代码可读性大幅提升。

16.7.23 ProcessGroup 在 PT2 时代的演进

PT2(v2.0+)时代 ProcessGroup 的几次大改造:

版本改进意义
v2.0functional collectives 引入(§16.7.9)编译友好,能 fuse
v2.1TCPStore 改用 libuv,性能提升 10x大集群启动加速
v2.2DeviceMesh 稳定化hybrid parallel 标准 API
v2.3Heartbeat / Watchdog 完整化生产可观测
v2.4split_group 引入子组创建更清晰
v2.4ncclCommSplit 利用(NCCL 2.18+)子组 init 不需重新握手
v2.6flight recorderNCCL 状态实时 dump
v2.8NCCL 2.21+ user buffer registration减少内部 copy
v2.10symmetric memory 支持NVLink switch 集群优化
v2.11整体 API 稳定工程级别成熟

整体趋势:

  • 可编译性:从 inplace API 转向 functional API,让 collective 进 graph
  • 可观测性:从”NCCL 黑盒 timeout”转向”详细日志 + 状态 dump”
  • 可恢复性:从”挂了重启”转向”快速失败 + elastic 恢复”
  • 可扩展性:从”手写一堆 group”转向”DeviceMesh 多维抽象”

理解这条演进让你看 PyTorch 团队对分布式的投入。v2.x 起 PyTorch 在 LLM 训练取代 Megatron / DeepSpeed 内置实现的根本原因,是这套 PG 抽象做得越来越好用。

16.7.24 一段总结:把 PG 看成”分布式 dispatcher”

把全章串起来:ProcessGroup 在分布式训练里的地位 = dispatcher 在单机训练里的地位。

单机分布式
dispatcher 把 ATen op 路由到 backendProcessGroup 把 collective 路由到通信库
多个 backend(CPU、CUDA、XPU、MPS)多个后端(NCCL、Gloo、MPI、UCC)
dispatcher 是抽象基类ProcessGroup 是抽象基类
op 注册让算子统一接口collective 注册成 ATen op 让接口统一
autograd 集成functional collectives 让 autograd 能跟踪
compile 友好functional collectives 让 compile 能 fuse

这种”dispatcher 思路在分布式延伸”的设计是 PyTorch 工程哲学的体现 —— 抽象层一致,让上层代码无差异写。理解了这个类比,你看 c10d 源码不会再觉得”通信库怎么这么复杂” —— 它就是另一套 dispatcher。

16.7.25 NCCL Flight Recorder:生产诊断的瑞士军刀

v2.6+ 引入的 NCCL Flight Recorder 是分布式训练 hang / crash 的核心诊断工具。实现拆在 torch/csrc/distributed/c10d/FlightRecorder.hpp.cppProcessGroupNCCL.cpp 通过 FlightRecorderCUDA::get() 在每次 collective 调用前后写入 entry(v2.11 实测在 ProcessGroupNCCL.cpp 的 line 398/411/710 等处密集调用)。机制:每个 rank 维护一个循环 buffer记录最近 N 次 collective 的状态:

record entry:
  - collective_id (递增序号)
  - collective_type (allreduce / allgather / ...)
  - input_size, output_size
  - device, stream
  - state (scheduled / started / completed / aborted)
  - start_time, end_time
  - traceback (Python stack 截图)

启用:

export TORCH_NCCL_ENABLE_MONITORING=1
export TORCH_NCCL_TRACE_BUFFER_SIZE=2000   # 保留最近 2000 条
export TORCH_NCCL_DUMP_ON_TIMEOUT=1

任何 rank hang / abort 时,Watchdog 自动把所有 rank 的 buffer dump 到磁盘:

nccl_trace_rank_0.json
nccl_trace_rank_1.json
...

每个 json 含每个 collective 的完整 state。组合所有 rank 的 trace(用 torch.distributed.checkpoint.utils._dedup_save_plans 类似工具)能看出:

  • 哪个 collective rank A 等到了 rank B 没到
  • rank B 的 trace 显示卡在哪个 collective
  • collective ID 不一致暴露 “rank 间 collective 顺序不同” bug

实战:1024 卡训练 hang 30 分钟无响应 → flight recorder dump → 5 分钟定位是 rank 47 在 epoch 87 的 reduce_scatter 提前进入下一个 collective(其他 rank 还在等)→ 找出该 rank 的代码分支 bug。

这是 v2.x 分布式调试最大的工程进步。没有 flight recorder 时同样的 bug 要靠 py-spy 逐 rank 看 stack,几小时排查。理解它让你在生产 hang 时第一时间想到查 flight recorder log。

16.7.26 NCCL kernel 的 stream 模型

NCCL 不是”同步 API”,而是把 kernel launch 到 stream 上:

ncclAllReduce(input, output, count, dtype, op, comm, stream)

最后一个参数 stream 是 cudaStream_t。NCCL 内部 launch 一个 GPU kernel 到这个 stream。返回时不等 kernel 完成,仅完成 launch(几 us)。

这种设计让 NCCL 与 PyTorch 的 stream 模型无缝集成:

  • 默认 ProcessGroupNCCL 用 PyTorch 的 default stream
  • 可以指定其他 stream 让 collective 与 computation 并发
  • Work::wait() 内部就是 cudaStreamSynchronize(stream)

并发例子:

# stream A: 计算
with torch.cuda.stream(stream_A):
    out1 = model.forward(input1)

# stream B: 通信(与 stream A 并发)
with torch.cuda.stream(stream_B):
    work = dist.all_reduce(grad_buffer, async_op=True)

# 同步点
torch.cuda.synchronize()    # 等所有 stream 完成

DDP 内部就用这套:reverse hook 触发的 AllReduce 走”comm stream”(不是 default stream),让它与下一层 backward 计算(在 default stream)真正并发。第 17 章会展开。

set_default_pg_nccl_streams API 让用户控制 PG 用哪个 stream。生产代码很少手动控制 —— 默认行为就够用。

16.7.27 collective 与 sparse tensor 的协作

PyTorch 支持 sparse tensor(COO / CSR),但 NCCL 不支持。怎么做 sparse collective?

distributed_c10d.py 内部有 sparse 路径:

  1. 把 sparse tensor 转成 dense(tensor.to_dense()
  2. 调 NCCL AllReduce(dense 路径)
  3. dense 结果再转回 sparse(dense.to_sparse()

代价:转 dense 占内存 = sparse_size / sparsity(如 95% sparse 的 1B tensor 转 dense 后 20B)。只在 sparse degree 很低时合理

更优做法:用 dist.all_reduce_coalesced 把 sparse 的 indices + values 分别 AllReduce,避免转 dense。但需要用户自己拼回 sparse。生产场景里 sparse collective 不常用(推荐模型避免 sparse 输出),但理解这条机制让你看到”NCCL 不支持的类型”是怎么 bridging 进 PyTorch 的。

16.7.28 monitor heartbeat 配置实战

§16.7.6 提了 HeartbeatMonitor,这里给具体配置 cheat sheet:

# 必须开
export TORCH_NCCL_ASYNC_ERROR_HANDLING=1   # 异常抛 Python 而非 silent hang

# Watchdog
export TORCH_NCCL_BLOCKING_WAIT=1            # 让 wait() 阻塞 + 检查 timeout
export TORCH_NCCL_TIMEOUT_DUMP_ON_TIMEOUT=1  # timeout 时 dump trace

# Heartbeat
export TORCH_NCCL_ENABLE_MONITORING=1
export TORCH_NCCL_HEARTBEAT_TIMEOUT_SEC=600  # 10 分钟无心跳触发 abort
export TORCH_NCCL_DESYNC_DEBUG=1             # 检测 collective 顺序不一致

# Trace buffer
export TORCH_NCCL_TRACE_BUFFER_SIZE=2000
export TORCH_NCCL_DEBUG_INFO_TEMP_FILE=/tmp/nccl_dump_rank_
export TORCH_NCCL_DEBUG_INFO_PIPE_FILE=/tmp/nccl_pipe_

# 错误时栈追踪
export TORCH_NCCL_AVOID_RECORD_STREAMS=1     # 减少 stream 同步开销

这一组环境变量让分布式训练生产可观测。任何 rank 异常时:

  1. heartbeat 监控线程几分钟内发现
  2. 触发 dump 到 /tmp/nccl_dump_rank_X
  3. 杀掉所有 NCCL comm
  4. 主进程抛 RuntimeError 退出
  5. TorchElastic 检测到 worker 死亡 → 重启整个 job
  6. 工程师看 dump 文件诊断根因

整套机制让 1000 卡训练能在出错后 10 分钟内重启,而非 30 分钟 NCCL timeout 后随机抛错。生产工程师必须懂这套配置。

16.7.29 千卡级训练的工程问题

8-128 卡集群与 1024+ 卡集群有本质差别。后者的特殊问题:

1. rendezvous 不能简单 N 个进程都连一个 master

1024 个 rank 连同一个 TCPStore master → master 处理几千 set/get 请求 → 启动几分钟。解法:分层 rendezvous(先 per-node aggregate、再跨 node)。torchrun --rdzv_backend=c10d 在 v2.4+ 内置分层。

2. 跨节点 collective 拓扑感知

千卡集群通常是”node 内 NVLink + 跨 node IB”两级网络。Naive ring algorithm 让一个 chunk 跨多次 node 边界,浪费 IB 带宽。hierarchical AllReduce

  • 第 1 步:node 内 reduce_scatter(NVLink 高带宽)
  • 第 2 步:跨 node AllReduce(每 node 只发 1/8 数据,节省带宽)
  • 第 3 步:node 内 all_gather(NVLink)

NCCL 自动用这套(NCCL_ALGO=NVLSTree)。但要确保拓扑文件正确(NCCL_TOPO_FILE=/path/topo.xml)。错配会让吞吐降一半。

3. failure rate 累积

1024 卡每张 GPU 每月故障率 0.5% → 整个集群每天约有 0.5% × 30 × 1024 = 156 次 GPU 异常。即每小时几次小事件。生产必须靠 elastic + 频繁 ckpt(每 10-30 分钟一次)。

4. straggler effect

某 rank 慢 10ms(disk I/O 抖动)→ 整个 collective 等它 → 1024 卡 + 100 step 后累计延迟达几分钟。解法:straggler detection,identify 最慢 rank、隔离它(重启该 worker)让 elastic 接管。

5. NCCL communicator 累积

1024 卡 + 几个子组(DP、TP、PP)→ 每 rank 几十个 NCCL comm × 几十 MB 显存 = 几 GB。要监控 + 必要时主动 abort 不用的 group。

理解千卡训练的特殊问题让你看 Megatron / DeepSpeed 这类框架为什么要做大量”看似冗余”的优化 —— 在 8 卡环境下确实多余,但 1024 卡环境下每个都是必须。

16.7.30 NCCL 算法选择的内部决策

NCCL 在每次 collective 调用前自动选 algo + protocol。决策依据:

flowchart TD
    Call[collective 调用]
    Call --> Size{message size}
    Size -->|< 32 KB| Small[small msg path]
    Size -->|32 KB - 4 MB| Med[medium]
    Size -->|> 4 MB| Large[large]

    Small --> SP{topology}
    SP -->|单 node NVLink| L1[Tree + LL128]
    SP -->|跨 node IB| L2[Tree + LL]

    Med --> MP{topology}
    MP -->|单 node NVLink| L3[Ring + LL128]
    MP -->|跨 node IB| L4[Ring + Simple]

    Large --> LP{topology}
    LP -->|集群有 SHARP| L5[CollNet + Simple]
    LP -->|普通 IB| L6[Ring + Simple]
    LP -->|单 node| L7[Ring + LL128]

各 protocol 的取舍:

  • LL (Low Latency):把 8 字节切成 2 字节 valid + 6 字节 marker,让接收方能立刻知道是不是收完。适合小 message
  • LL128:进化版,把 128 字节切成 120 valid + 8 marker,平衡延迟与吞吐
  • Simple:标准 send/recv,适合大 message(吞吐主导)

如果发现 NCCL 选错 algo,可以强制:

NCCL_ALGO=Ring NCCL_PROTO=LL128 python train.py

慎用:NCCL 自动选的通常是最优的,强制可能反而慢。生产里仅在”NCCL_DEBUG=INFO 显示选了反直觉 algo + 实测性能差”时强制。

理解这套决策让你看 NCCL 内部不是黑盒,是 size × topology × protocol 的三维查找表。/usr/share/doc/nccl/ 通常有详细 tuning guide。

16.7.31 fp16 / bf16 collective 的注意事项

混合精度训练里 gradient 是 fp16 / bf16,AllReduce 也走低精度。但有两个坑:

1. fp16 求和溢出

1024 个 rank 各 reduce 一个 fp16 grad。fp16 max 约 65504,1024 个 ~30 的 grad 求和后超出 → 出 inf。

解法:pre_div_factor——每 rank 先除 N(grad / 1024)再 AllReduce + sum。等价数学但中间值不溢出。

# 错误(fp16 易溢)
grad_global = all_reduce(grad, ReduceOp.SUM) / 1024

# 正确(pre_div)
grad_local = grad / 1024
grad_global = all_reduce(grad_local, ReduceOp.SUM)

DDP 内部默认用 ReduceOp.AVG(NCCL 2.10+),等价于 pre_div + SUM,避免溢出。

2. bf16 精度损失累积

bf16 mantissa 只有 7 位,多次累积有显著误差。1024 卡 AllReduce 在 bf16 下结果 vs fp32 reference 误差可达 1-3%。

解法:用 fp32 中间——all_reduce 内部把 bf16 转 fp32 reduce、最后转回 bf16。NCCL 2.10+ 支持 cast_to_fp32_pre_reduce 选项。TORCH_NCCL_FP32_REDUCE=1 强制开。

实战影响:bf16 训练 + 大集群(>256 卡)时强烈建议开 fp32 reduce。否则 loss curve 与 fp32 reference 可能 0.5%-2% 偏差,影响最终模型质量。

理解这两个坑让你看 DDP / FSDP 源码里的”莫名 cast”不奇怪 —— 都是为了避开数值问题。

16.7.32 跨数据中心训练:地理分布式新场景

2024-2026 大模型训练规模已超出单一 DC 容量。Meta、Google、xAI 等都在搞跨 DC 训练。这层网络的特殊性:

维度单 DC(IB)跨 DC(WAN)
带宽200-400 Gbps1-100 Gbps
延迟< 1 us(IB)10-100 ms
抖动微秒级毫秒级
丢包率几乎 0偶发

NCCL 默认假设 IB / NVLink,跨 DC 性能崩盘。新方案:

1. Hierarchical 训练

DC 内做 DP / TP(NCCL 高带宽),DC 间做 PP(低频通信,激活值传递可承受高延迟)。让大延迟通信只发生在低频路径上。

2. Pipeline Parallel + WAN

PP 第 i stage 把 activation 发给 stage i+1。如果两个 stage 在不同 DC,跨 DC 一次发 activation。一个 forward / backward 跨 DC 4 次(forward 1 次 + backward 1 次 + 2 次同步)。PP 是跨 DC 训练唯一可行的路径

3. 异步训练

不再要求所有 rank 同步。某个 DC 训练慢 → 不等它 → 用 stale 梯度。代价:收敛变慢。OpenAI 的 O 系列模型据传用这种 async DC 训练。

4. 通信压缩

跨 DC 带宽贵 → 压缩 grad 让传输量降 4-10x。Bf16 → int8 → bitwise sparsity 等。PyTorch 这层目前在 torch.distributed.algorithms.ddp_comm_hooks 提供(第 17 章 §17.6)。

PyTorch v2.10+ 增加了 init_process_groupinit_method='tcp://...' + 高 timeout 支持,让跨 DC 启动可行。但 NCCL 这层仍假设低延迟。真正的跨 DC NCCL 替代方案在路上(如 NCCL-WAN、Gloo 演进)。

理解这层让你看到分布式训练的下一个 frontier 不在单卡 / 单集群,而在跨 DC。这也是 PyTorch v2.10-v3.0 团队的重点方向之一。

16.7.33 Work、cudaEvent 与 stream 同步的精细协作

Work 对象的 wait() 内部不是简单 cudaStreamSynchronize,而是配合 cudaEvent 做精确的”何时完成”判定:

// 简化版 WorkNCCL 实现
class WorkNCCL : public Work {
    cudaEvent_t ncclEndEvent_;     // 标记 NCCL kernel 在这个 event 后完成

    void synchronize() {
        // 不是 sync 整个 stream, 只 wait 一个 event
        cudaEventSynchronize(ncclEndEvent_);
    }

    void synchronizeStream(cudaStream_t s) {
        // 让 stream s 在 NCCL kernel 之后开始执行后续 op
        cudaStreamWaitEvent(s, ncclEndEvent_, 0);
    }
};

这种 event-based 设计让一个 stream 的 NCCL collective 可以与另一个 stream 的 computation 精确同步:

# stream_compute 上跑计算
with torch.cuda.stream(stream_compute):
    out = forward(x)

# stream_comm 上跑 collective, 但要等 forward 完成
with torch.cuda.stream(stream_comm):
    work = dist.all_reduce(grad, async_op=True)
    # 内部已经隐式 cudaStreamWaitEvent 等 stream_compute 完成

理解 events / streams 的协作让你看 DDP / FSDP 源码里大量”record_stream / wait_event / current_stream” 调用不困惑 —— 都是为了让 collective 能与 computation 真正并发,又不丢数据依赖关系。

PyTorch v2.0 起引入的 torch.cuda.current_stream(device).wait_stream(other_stream) 是用户层 API,封装了 event-based 同步。生产代码里几乎都用这层、不直接 cudaEvent。

16.7.34 自定义 backend 注册:第三方通信库的接入方式

PyTorch 不只支持 NCCL / Gloo / MPI,第三方可以注册自家通信库。torch/distributed/distributed_c10d.pyBackend 类支持注册:

import torch.distributed as dist

# 第三方库实现 ProcessGroup C++ 子类后
dist.Backend.register_backend(
    "my_backend",
    create_my_backend,                # 工厂函数, 返回 ProcessGroup 实例
    extended_api=True,
    devices=["cuda", "cpu"],
)

# 用户用法
dist.init_process_group(backend="my_backend", ...)

实战例子:

  • AWS NCCL plugin:让 PyTorch 在 EFA 网络上跑(替代标准 NCCL)
  • Habana HCCL:Intel Gaudi 加速器的通信库
  • Ascend HCCL:华为昇腾的通信库
  • Microsoft MSCCL:MS 自家的优化版本

各家硬件厂商都通过这个机制接入 PyTorch,让用户写代码”完全无差异”——只需改 backend="..." 一个字符串。这种”接口稳、后端可换”的设计是 §16.7.24 dispatcher 类比的具体体现。

理解这套注册机制让你看 PyTorch 不是绑死 NVIDIA、而是工程上对多硬件友好的开放架构。这也是 PyTorch 在 LLM 时代能让 AMD MI300、Habana、华为昇腾等众多硬件平台落地的根本原因。

16.8 几条工程经验

1. NCCL 环境变量NCCL_DEBUG=INFO 看 NCCL 通信细节;NCCL_SOCKET_IFNAME=eth0 指定网卡;NCCL_IB_DISABLE=1 关闭 InfiniBand 走 socket(调试用)

2. init_method='env://' + torchrun:标准的多进程启动方式。不要手写 mp.spawn

3. timeout 设大点:默认 30 分钟,但大模型训练 ckpt 加载可能更久。init_process_group(timeout=timedelta(hours=2))

4. 跨节点通信看 NCCL_IB_HCA / NCCL_IB_GID_INDEX:物理网络配置不对会让 collective 退到 TCP,吞吐降 10x

5. NCCL 死锁的常见原因

  • 不同 rank 调 collective 顺序不一致(如 rank 0 先 AllReduce 再 Broadcast,rank 1 反过来)
  • 某个 rank 异常退出,其他 rank 卡在 AllReduce 等

6. TORCH_NCCL_ASYNC_ERROR_HANDLING=1:让 NCCL 异常 raise Python 异常而非 silent hang,是诊断分布式训练的金钥匙

7. 避免 CPU 上的 collective:CPU AllReduce 走 Gloo,比 NCCL 慢得多。把 metrics 张量先 .cuda() 再 collective

8. 子 group 创建有顺序约束:所有 rank 必须以同样顺序、同样参数创建子 group。否则各 rank 的 group ID 对不上

16.9 跨书关联

  • 《vLLM 内核探秘》第 14 章 张量并行:vLLM 的 TP 分片用 ProcessGroup AllReduce 同步 attention 输出
  • 第 5 章 dispatcher:理解 collective 也走 dispatcher 让 autograd / compile 集成成为可能
  • 《Tokio 异步运行时》第 X 章 异步 futureWork 对象与 Tokio Future 思想一致 —— 表示一个尚未完成的异步操作

16.10 设计启示

ProcessGroup 设计的核心思想:

第一collective 即算子:把通信作为 ATen 算子注册让它享受整个 PyTorch 生态(autograd、compile、profiler)。这是与早期 MPI 风格”通信和计算分离”完全不同的思路

第二Work 对象表示异步操作:让 communication 与 computation overlap 成为标准模式,而非 advanced 优化

第三多后端共享接口:让 NCCL / Gloo / MPI 等通信库可互换。研究新算法时切到 Gloo 调试,生产切回 NCCL

第四子 group 让 hybrid parallel 简单:不需要新概念,复用 group 抽象就能表达 DP/TP/PP 混合

第五生产可观测性是分布式系统的基本要求:Heartbeat、Watchdog、Flight Recorder 这些机制看似与”通信原语”无关,但缺了它们千卡训练根本不能稳定跑。这是 PyTorch 团队从研究框架走向工业框架的一个标志性转变 —— 把”出问题怎么诊断”做成一等公民

下一章拆 DDP —— 看 PyTorch 怎么用 backward hook + 梯度桶 + AllReduce 实现”几乎线性扩展”的多卡训练,看本章这套底座是怎么被上层数据并行训练真正用起来的,从抽象通信原语到具体训练加速的最后一公里怎么落地。

评论 0