1. DDP 是什么
DDP,全称 DistributedDataParallel。
一句话理解:
多进程 + 多卡训练。每个进程负责一张 GPU,各自算自己的数据,反向传播时同步梯度。
2. DDP 和 DP 的区别
DP(DataParallel)
单进程控制多张卡
主卡压力大
通常更慢,现在不太推荐新项目继续使用
DDP(DistributedDataParallel)
一张卡一个进程
每个进程一份完整模型
每个进程处理不同数据
backward 时自动同步梯度
性能通常更好,是官方主推方案
DP:一个进程管多张卡
DDP:一张卡一个进程,大家各算各的,反向时多个进程同步梯度
3. DDP 的基本思想
假设有 4 张 GPU:
启动 4 个进程
每个进程绑定 1 张 GPU
每个进程创建 1 份模型
数据集被切成 4 份
每个进程只训练自己的那一份
loss.backward()时同步梯度optimizer.step()后,各进程模型仍保持一致
4. 需要记住的几个词
world
world 可以粗略理解成:整个分布式训练系统里的全体进程集合
所有参与训练的进程合在一起,就是一个 world。
world_size
world_size 表示:这个 world 里一共有多少个进程
通常在 DDP 里:
1 个进程对应 1 张 GPU
所以很多时候也可以先简单理解成:
总共用了多少张卡,就常常有多少个进程
例如:
单机 4 卡:
world_size = 4两台机器,每台 4 卡:
world_size = 8
rank
rank 表示:当前进程在整个 world 里的全局编号
编号从 0 开始。
如果 world_size = 8,那就有:rank0~rank7
它是全局唯一的编号。
通常:
rank=0常被当作主进程负责打印日志、保存模型之类的工作
local_rank
local_rank 表示:当前进程在本机上的编号
如果一台机器有 4 张卡,那么本机上的进程通常有:local_rank0~local_rank3
它一般直接对应本机 GPU 编号。
常见写法是:
torch.cuda.set_device(local_rank)多机情况下举个例子:
第二台机器的
rank编号是rank4~rank7,而local_rank编号仍然是local_rank0~local_rank3
node
node 表示:一台机器 / 一台服务器
所以:
1 node = 1 台电脑或服务器
多机训练 = 多个 node 一起训练
backend
分布式通信后端。
GPU 训练一般用 nccl。
5. DDP 最核心的 4 步
第一步:初始化分布式环境
dist.init_process_group(backend="nccl")第二步:绑定当前 GPU
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)第三步:包装模型
model = model.to(device)
model = DDP(model, device_ids=[local_rank])第四步:使用分布式采样器
sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, batch_size=32, sampler=sampler)这些就是从“普通单卡训练”切到“DDP 训练”最关键的改动。
6. 最小代码模板
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler, TensorDataset
def main():
dist.init_process_group(backend="nccl")
local_rank = int(os.environ["LOCAL_RANK"])
torch.cuda.set_device(local_rank)
device = torch.device("cuda", local_rank)
# 假数据
x = torch.randn(1000, 10)
y = torch.randint(0, 2, (1000,))
dataset = TensorDataset(x, y)
sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, batch_size=32, sampler=sampler)
model = nn.Sequential(
nn.Linear(10, 32),
nn.ReLU(),
nn.Linear(32, 2)
).to(device)
model = DDP(model, device_ids=[local_rank])
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.SGD(model.parameters(), lr=0.01)
for epoch in range(5):
# 只有这样,每个 epoch 的 shuffle 才会正确更新
# 如果不写,分布式训练下的数据打乱可能不符合预期
sampler.set_epoch(epoch)
for bx, by in loader:
bx, by = bx.to(device), by.to(device)
optimizer.zero_grad()
out = model(bx)
loss = criterion(out, by)
loss.backward()
optimizer.step()
if dist.get_rank() == 0:
print(f"epoch={epoch}, loss={loss.item():.4f}")
dist.destroy_process_group()
if __name__ == "__main__":
main()
这个写法和 PyTorch 官方教程、官方示例的整体流程是一致的:初始化进程组、为每个进程设置设备、将模型封装到 DDP 中,再用 DistributedSampler 切分数据。(PyTorch Docs)
7. 训练流程图
torchrun 启动多个进程
↓
每个进程绑定一张 GPU
↓
每个进程创建自己的模型副本
↓
DistributedSampler 切分数据
↓
forward
↓
backward(自动同步梯度)
↓
optimizer.step()为什么一定要用 DistributedSampler?
因为 DDP 里每个进程都在独立跑。如果不用它,可能会出现:
每个进程都读到同样的数据。
那就相当于重复训练,效率和效果都不对。PyTorch 官方教程也把
DistributedSampler作为标准用法。
8. 常见注意事项
1)只让 rank 0 打印日志 / 保存模型
不然每个进程都会打印一遍,屏幕会很热闹。
if dist.get_rank() == 0:
print(loss.item())2)保存模型注意事项
DDP 包装后,真正的原始模型在 model.module 里面。
所以保存一般写:
torch.save(model.module.state_dict(), "model.pth")
不是直接保存 model.state_dict()。这在官方示例和常见实践里都很常见。
3)batch_size 是“每个进程”的 batch size
比如:
4 张卡
batch_size=32
那么全局 batch size 其实是:32 * 4 = 128
4)模型先 .to(device),再 DDP(...)
顺序别反了。
5)GPU 训练一般优先 nccl
这是 PyTorch 官方文档对 CUDA GPU 环境的常规推荐。
6)有些动态分支模型可能需要:
DDP(model, device_ids=[local_rank], find_unused_parameters=True)因为某些参数可能这轮没有参与计算。PyTorch 官方也提到 DDP 的梯度同步依赖参数参与反向图。