torchrun
导言
vllm 的ray后端属实奇诡,ray stop有残留,flush打印被吞(虽然输出能标记ip,折叠重复,在master输出),ray集群的环境变量固定不变导致DP无法实现多机。
为此考虑使用torchrun实现多机并行。
写得太好了,由浅入深。
2. 多机启动命令¶
假设有2台机器(IP分别为192.168.1.100和192.168.1.101),每台机器使用2块GPU:
机器1(主节点):¶
torchrun \
--nnodes=2 \ # 总节点数
--nproc_per_node=2 \ # 每节点GPU数
--node_rank=0 \ # 当前节点编号(主节点为0)
--rdzv_id=12345 \ # 集群唯一ID
--rdzv_backend=c10d \ # 通信后端
--rdzv_endpoint=192.168.1.100:29500 \ # 主节点IP和端口
multinode_train.py
rdzv是Rendezvous的缩写,意思是“集合”或“会合”
--rdzv_backend
- c10d:PyTorch 自带的基于 TCP 的实现(默认推荐,适合大多数场景)。
- etcd:依赖外部 etcd 服务器(适合需要高可靠性的生产环境)。
- static:静态配置(需手动指定所有节点地址,灵活性差)。
机器2(从节点):¶
torchrun \
--nnodes=2 \
--nproc_per_node=2 \
--node_rank=1 \ # 从节点编号为1
--rdzv_id=12345 \ # 必须与主节点一致
--rdzv_backend=c10d \
--rdzv_endpoint=192.168.1.100:29500 \ # 指向主节点IP
multinode_train.py
4. 单机多卡 vs 多机多卡¶
特性 | 单机多卡命令示例 | 多机差异点 |
---|---|---|
启动命令 | torchrun --nproc_per_node=4 train.py |
需指定--nnodes 和--rdzv_endpoint |
通信延迟 | 低(NVLink/NVSwitch) | 高(依赖网络带宽) |
数据分片 | 自动分配 | 跨节点自动分配 |
典型应用场景 | 单台服务器多GPU | 大规模模型训练(如LLM) |
单机通信域变多机¶
要实现 8机8卡 的全局通信,使用 torchrun
可以避免手动管理 hostip
、port
、rank
等参数,同时确保分布式环境自动配置。以下是迁移现有代码到 torchrun
的完整方案:
一、代码改造:移除手动初始化,适配 torchrun
¶
原代码(手动初始化)可能类似:¶
import torch.distributed as dist
# 手动传入参数初始化通信域
dist.init_process_group(
backend="nccl",
init_method=f"tcp://{hostip}:{port}",
rank=rank,
world_size=world_size
)
改造后的代码(适配 torchrun
):¶
import os
import torch
import torch.distributed as dist
def setup():
# 自动从环境变量读取参数,建立通讯域,并且这里有隐藏同步,必须world_size所有的rank都执行了才会继续。
dist.init_process_group(backend="nccl", init_method="env://")
class MyModel(torch.nn.Module):
def __init__(self):
super().__init__()
self.fc = torch.nn.Linear(10, 10)
def forward(self, x):
return self.fc(x)
def main():
setup()
# 自动获取全局参数,不需要init_process_group也有下面环境变量
global_rank = int(os.environ["RANK"]) # 全局进程编号 (0~63)
local_rank = int(os.environ["LOCAL_RANK"]) # 节点内GPU编号 (0~7)
world_size = int(os.environ["WORLD_SIZE"]) # 总进程数 (8机×8卡=64)
# f"Master Addr: {os.environ['MASTER_ADDR']}, "
# f"Master Port: {os.environ['MASTER_PORT']}"
# 模型部署到当前GPU
device = torch.device(f"cuda:{local_rank}")
model = MyModel().to(device)
ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])
# 后续训练代码(数据加载、优化器等)...
if __name__ == "__main__":
main()
二、torchrun
启动命令:8机8卡配置¶
1. 主节点(第0台机器)命令:¶
torchrun \
--nnodes=8 \ # 总节点数
--nproc_per_node=8 \ # 每节点GPU数
--node_rank=0 \ # 主节点编号为0
--rdzv_id=your_job_id \ # 任务唯一标识(任意字符串)
--rdzv_backend=c10d \ # 通信后端
--rdzv_endpoint=主节点IP:29500 \ # 主节点IP和端口
train.py
2. 从节点(第1~7台机器)命令:¶
torchrun \
--nnodes=8 \
--nproc_per_node=8 \
--node_rank=1 \ # 第1台机器为1,第2台为2,...,第7台为7
--rdzv_id=your_job_id \ # 必须与主节点一致
--rdzv_backend=c10d \
--rdzv_endpoint=主节点IP:29500 \ # 指向主节点
train.py
三、关键配置说明¶
参数 | 作用 | 示例值 |
---|---|---|
--nnodes |
总机器数 | 8 |
--nproc_per_node |
每台机器的GPU数量 | 8 |
--node_rank |
当前机器的编号(0~7) | 主节点为0 ,其他递增 |
--rdzv_endpoint |
主节点的IP和端口 | 192.168.1.100:29500 |
--rdzv_id |
任务唯一标识符,用于防止集群冲突 | 任意字符串如my_job_123 |
四、多机通信原理与验证¶
1. 通信流程¶
- 节点发现:所有从节点通过
--rdzv_endpoint
连接到主节点,注册自身信息。 - 全局编组:主节点收集所有节点的
IP:Port
信息,构建全局通信组。 - 数据同步:使用 NCCL 后端进行跨机 GPU 通信(如梯度同步)。
2. 环境变量验证¶
在代码中添加以下调试信息,确认参数正确:
print(
f"Global Rank: {global_rank}, "
f"Local Rank: {local_rank}, "
f"World Size: {world_size}, "
f"Master Addr: {os.environ['MASTER_ADDR']}, "
f"Master Port: {os.environ['MASTER_PORT']}"
)
3. NCCL 网络调优(可选)¶
在启动命令前设置环境变量,优化跨机通信性能:
export NCCL_IB_DISABLE=1 # 禁用InfiniBand(若无IB网卡)
export NCCL_SOCKET_IFNAME=eth0 # 指定网卡名称
export NCCL_DEBUG=INFO # 打印NCCL日志
五、常见问题与解决¶
- 节点无法连接主节点
- 检查主节点防火墙是否开放
--rdzv_endpoint
指定的端口(如29500
)。 -
确保所有机器使用相同版本的 PyTorch 和 NCCL。
-
GPU 未正确绑定
- 使用
nvidia-smi
确认每台机器的 GPU 状态。 - 在代码中强制指定 GPU 设备:
torch.cuda.set_device(local_rank)
。
torch.distributed 常见接口¶
- new_group()创建的子进程组是基于已初始化的全局进程组(通过dist.init_process_group)的通信通道的,因此复用全局组的网络配置(如IP、端口、后端等)
torchrun
和init_process_group
¶
在PyTorch的分布式训练中,torchrun
和init_process_group
的关系需要明确区分,以下是关键点解析:
1. torchrun
的自动初始化¶
• 作用:
torchrun
是PyTorch提供的分布式训练启动工具,自动完成以下任务:
1. 进程管理:根据参数(如--nproc_per_node
)生成子进程。
2. 环境变量注入:自动设置RANK
、WORLD_SIZE
、MASTER_ADDR
、MASTER_PORT
等关键变量。
3. Rendezvous调用:通过指定的后端(如c10d
)执行Rendezvous机制,完成节点发现和同步。
• 结果:
所有进程的默认通信组(default_pg
)已初始化完成,无需手动调用init_process_group
。
5. 调试技巧¶
• 检查进程组状态:
在代码中验证默认进程组是否已初始化:
• 捕获重复初始化错误:
使用异常处理避免程序崩溃:
try:
dist.init_process_group(...)
except RuntimeError as e:
if "already initialized" in str(e):
print("忽略重复初始化")
else:
raise e
torchrun程序入口打印torch.distributed.is_initialize() 结果是false
rendezvous 与 HCCL¶
rendezvous
与通信后端(如 TCP、HCCL、Gloo、MPI)在 PyTorch 分布式训练中扮演不同角色,核心区别如下:
一、功能定位¶
组件 | 核心职责 | 阶段 |
---|---|---|
Rendezvous | 负责节点发现、角色分配(rank /world_size )、共享存储(如 TCPStore )的创建 |
初始化阶段 |
通信后端 | 负责节点间数据传输(如梯度同步、参数更新) | 训练阶段 |
• 关键区别:
Rendezvous 是协调逻辑,确保所有节点达成一致并建立通信基础;而 TCP/HCCL/Gloo/MPI 是通信实现,负责具体数据传输。
二、技术实现对比¶
1. Rendezvous¶
• 实现机制:
通过分布式同步原语(如 barrier)实现节点间的强一致性,生成共享的键值存储(Store
)用于后续通信。
• 依赖关系:
可能基于 TCP 协议(如 TCPStore
)实现初始节点发现,但仅用于初始化阶段,不涉及训练数据传输。
• 容错性:
支持弹性训练(节点动态加入/退出),通过重新执行 Rendezvous 恢复训练(如 torch.distributed.elastic
)。
2. 通信后端¶
后端 | 适用场景 | 技术特性 |
---|---|---|
TCP | 通用场景,依赖 TCP 协议的点对点通信 | 稳定性高,但延迟较大;适用于 CPU 训练或网络条件较差的异构环境 |
HCCL | 华为昇腾(Ascend)芯片的专用后端 | 针对昇腾芯片优化,支持 RDMA 和集合通信操作(类似 NCCL) |
Gloo | 多机 CPU 训练或少量 GPU 场景 | 基于 libuv 实现异步通信,支持多线程;适合小规模 AllReduce 操作 |
MPI | 依赖已有 MPI 环境(如 HPC 集群) | 需预装 OpenMPI/MVAPICH,支持复杂拓扑;适合大规模科学计算 |
• 性能差异:
• HCCL/Gloo/NCCL 针对集合通信优化,适合 GPU/昇腾芯片的高吞吐场景。
• TCP/MPI 更通用,但性能受网络协议栈限制。
三、协作关系¶
- 初始化流程:
• Rendezvous 使用 TCP 等协议完成节点同步,创建Store
(如TCPStore
)。
• 通信后端(如 HCCL/Gloo)基于Store
初始化数据传输通道。 - 训练流程:
• 使用通信后端执行 AllReduce、Broadcast 等操作,与 Rendezvous 无关。
• 若节点故障,Rendezvous 重新触发以重建通信组,通信后端需支持动态重建(如 Gloo 需重启进程)。
四、选择建议¶
- Rendezvous 选择:
• 单机训练:默认c10d
后端(自动管理)。
• 多机弹性训练:需配合etcd
或c10d
实现动态节点管理。 - 通信后端选择:
• GPU 集群:优先 NCCL(NVIDIA)或 HCCL(昇腾)。
• CPU 集群/小规模训练:选择 Gloo 或 TCP。
• HPC 环境:MPI(需系统支持)。
五、总结¶
• Rendezvous 是分布式训练的“协调者”,解决“谁参与训练”和“如何初始化”的问题。
• 通信后端 是“执行者”,解决“数据如何高效传输”的问题。
• 两者协同工作:Rendezvous 为通信后端提供基础环境,通信后端为训练提供性能保障。
唯一的rendezvous与group¶
在 PyTorch 的分布式训练框架中,rendezvous
和 torch.distributed.new_group()
扮演着完全不同的角色。它们的差异源自设计目的、作用域和底层实现机制。以下是关键原因分析:
一、设计目的不同¶
组件 | 用途 | 生命周期 |
---|---|---|
rendezvous |
负责全局进程的发现和初始化(建立通信基础架构) | 仅在训练初始化阶段执行一次 |
new_group() |
在已初始化的分布式环境中,创建子进程组(用于更细粒度的通信) | 训练中可多次动态创建 |
• 类比:
rendezvous
类似于搭建整个城市的电力系统(全局基础设施),而 new_group()
类似于为某个社区安装独立电路(局部优化)。
二、rendezvous
只能有一个的原因¶
1. 全局唯一的基础设施¶
• 核心功能:
rendezvous
创建了分布式训练的全局通信后端(如 gloo
、nccl
)和共享存储(如 TCPStore
)。这些是跨所有进程的单例资源。
• 技术实现:
PyTorch 通过 torch.distributed.init_process_group()
初始化主进程组,其内部调用 rendezvous
完成全局同步。该过程会绑定全局唯一的 rank
和 world_size
。
2. 同步机制的不可重复性¶
• 单次握手:
所有进程必须通过一次 rendezvous
完成初始同步(确定彼此的存在和角色)。若重复调用,会导致进程角色混乱(如 rank
冲突)。
• 资源竞争:
多组 rendezvous
会创建多个通信后端,占用独立端口或存储,引发资源竞争和死锁。
3. 代码示例验证¶
# 初始化全局进程组(隐含调用 rendezvous)
torch.distributed.init_process_group(backend="nccl", init_method="env://")
# 尝试再次初始化(会抛出错误)
try:
torch.distributed.init_process_group(backend="gloo") # 报错:已有默认进程组
except RuntimeError as e:
print(e) # "Default process group has already been initialized"
三、new_group()
可创建多个的原因¶
1. 基于现有基础设施的局部优化¶
• 作用域限制:
new_group()
在已初始化的全局进程组基础上,通过逻辑分组实现局部通信。它不创建新的通信后端,而是复用现有基础设施。
• 轻量级设计:
子进程组仅维护进程列表(如 ranks=[0,1]
),不涉及底层网络资源(端口、存储)的重复分配。
2. 灵活的子集通信需求¶
• 典型场景:
• 在数据并行中,将部分 GPU 分组进行模型同步(如 ranks=[0,1]
和 ranks=[2,3]
)。
• 在模型并行中,为不同模型分片分配独立的通信组。
• 代码示例:
# 全局进程组已初始化(backend=nccl, world_size=4)
group1 = torch.distributed.new_group(ranks=[0, 1]) # GPU 0-1 的通信组
group2 = torch.distributed.new_group(ranks=[2, 3]) # GPU 2-3 的通信组
# 在 group1 内执行 AllReduce 操作(仅 GPU 0-1 参与)
torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.SUM, group=group1)
3. 技术实现对比¶
操作 | 底层动作 |
---|---|
rendezvous |
创建全局通信后端(如 TCP 连接)、分配 rank/world_size、建立共享键值存储 |
new_group() |
在已有后端中注册子进程列表,生成逻辑分组句柄(不涉及硬件资源分配) |
四、总结:为何设计如此?¶
1. 性能与资源效率¶
• rendezvous
单例化:
避免重复初始化通信后端(如多次绑定端口、建立连接),减少资源浪费和潜在冲突。
• new_group()
动态化:
允许按需创建轻量级逻辑分组,满足复杂训练场景的需求(如混合并行)。
2. 分布式系统的约束¶
• 全局一致性要求:
所有进程必须在同一通信框架下工作,否则无法保证同步性和数据一致性。
• 局部灵活性的妥协:
在全局一致的基础上,通过子进程组实现局部优化,平衡灵活性与系统复杂度。
3. 用户接口的友好性¶
• 简化初始化流程:
用户只需调用一次 init_process_group()
,无需手动管理多个 rendezvous
。
• 按需创建子组:
通过 new_group()
满足特定通信需求,而无需重新设计整个分布式架构。
五、最佳实践¶
- 严格单次调用
rendezvous
:
使用torchrun
或init_process_group()
自动管理全局初始化,避免手动干预。 - 合理使用子进程组:
在以下场景使用new_group()
: • 需要部分进程间的独立通信(如模型并行)。 • 希望减少 AllReduce 的通信量(如分组聚合梯度)。 - 避免子进程组的滥用:
过多的子组会增加逻辑复杂度,可能影响性能。建议通过torch.distributed.GroupMember
管理分组生命周期。