• 教程 >
  • 分布式数据并行入门
Shortcuts

分布式数据并行入门|

作者沈丽

分布式数据并行(DDP) 在模块级别实现数据并行性。 它使用割炬中的通信集合。分布式包用于同步渐变、参数和缓冲区。 并行性在流程中和跨流程中都可用。 在流程中,DDP 将输入模块复制到device_ids中指定的设备,相应地沿批处理维度分散输入,并将输出收集到output_device这与DataParallel类似。 跨进程,DDP 在前进过程中插入必要的参数同步,在向后传递中插入梯度同步。 只要进程不共享 GPU 设备,用户就应将进程映射到可用资源。 建议(通常最快的)方法是为每个模块副本创建一个进程,即进程中没有模块复制。 本教程中的代码在 8-GPU 服务器上运行,但可以轻松将其推广到其他环境中。

DataParallelDistributedDataParallel的比较|

在我们深入探讨之前,让我们澄清一下为什么,尽管增加了复杂性,您还是会考虑在DataParallel上使用DistributedDataParallel

  • 首先,从上一教程中回顾,如果模型太大,无法适应单个 GPU,则必须使用模型并行将其拆分到多个 GPU 中。 DistributedDataParallel处理模型并行;DataParallel此时不会。

  • DataParallel是单进程、多线程的,仅在一台计算机上工作,而DistributedDataParallel是多进程,适用于单机和多机训练。 因此,即使对于单机训练,您的数据足够小,可以安装在一台机器上,DistributedDataParallel预计比DataParallel更快。 DistributedDataParallel还会提前复制模型,而不是在每次迭代时复制模型,并让全局解释器锁定。

  • 如果两个数据都太大,无法安装在一台计算机上,并且模型太大,无法容纳在单个 GPU 上,则可以将模型并行(跨多个 GPU 拆分单个模型)与DistributedDataParallel结合。 在此制度下,每个DistributedDataParallel进程都可以并行使用模型,并且所有进程将共同使用数据并行。

基本用例|

要创建 DDP 模块,首先正确设置流程组。 更多详细信息可在使用 PyTorch 编写分布式应用程序中找到。

import os
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP


def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    # Explicitly setting seed to make sure that models created in two processes
    # start from same random weights and biases.
    torch.manual_seed(42)


def cleanup():
    dist.destroy_process_group()

现在,让我们创建一个玩具模块,用DDP包装它,然后用一些虚拟输入数据来馈送它。 请注意,如果培训从随机参数开始,则可能需要确保所有 DDP 进程使用相同的初始值。 否则,全局渐变同步将没有意义。

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    setup(rank, world_size)

    # setup devices for this process, rank 1 uses GPUs [0, 1, 2, 3] and
    # rank 2 uses GPUs [4, 5, 6, 7].
    n = torch.cuda.device_count() // world_size
    device_ids = list(range(rank * n, (rank + 1) * n))

    # create model and move it to device_ids[0]
    model = ToyModel().to(device_ids[0])
    # output_device defaults to device_ids[0]
    ddp_model = DDP(model, device_ids=device_ids)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_ids[0])
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

如您所见,DDP 包装较低级别的分布式通信详细信息,并提供干净的 API,就好像它是本地模型一样。 对于基本用例,DDP 只需要几个更多的 LOC 来设置流程组。 将 DDP 应用于更高级的用例时,需要注意一些注意事项。

倾斜的处理速度|

在DDP中,构造函数、正向方法和输出的区分是分布式同步点。 不同的进程应以相同的顺序到达同步点,并在大致相同的时间输入每个同步点。 否则,快速进程可能会提前到达,并在等待流器时超时。 因此,用户负责平衡跨进程的工作负载分配。 有时,由于网络延迟、资源争用、不可预测的工作负载峰值等原因,处理速度的偏斜是不可避免的。 为了避免在这些情况下超时,请确保在调用init_process_group时传递足够大timeout值。

保存和加载检查点|

在训练期间,通常使用torch.savetorch.load关点模块,然后从检查点恢复。 有关详细信息,请参阅保存和加载模型 使用 DDP 时,一种优化是仅将模型保存在一个进程中,然后将其加载到所有进程,从而减少写入开销。 这是正确的,因为所有进程从相同的参数开始,梯度在向后传递中同步,因此优化器应继续将参数设置为相同的值。 如果使用此优化,请确保在保存完成之前不会开始加载所有进程。 此外,加载模块时,您需要提供适当的map_location参数,以防止进程单步执行他人设备。 如果map_locationtorch.load将首先将模块加载到 CPU,然后将每个参数复制到保存位置,这将导致同一台计算机上的所有进程使用相同的设备集。

def demo_checkpoint(rank, world_size):
    setup(rank, world_size)

    # setup devices for this process, rank 1 uses GPUs [0, 1, 2, 3] and
    # rank 2 uses GPUs [4, 5, 6, 7].
    n = torch.cuda.device_count() // world_size
    device_ids = list(range(rank * n, (rank + 1) * n))

    model = ToyModel().to(device_ids[0])
    # output_device defaults to device_ids[0]
    ddp_model = DDP(model, device_ids=device_ids)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # All processes should see same parameters as they all start from same
        # random parameters and gradients are synchronized in backward passes.
        # Therefore, saving it in one process is sufficient.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

    # Use a barrier() to make sure that process 1 loads the model after process
    # 0 saves it.
    dist.barrier()
    # configure map_location properly
    rank0_devices = [x - rank * len(device_ids) for x in device_ids]
    device_pairs = zip(rank0_devices, device_ids)
    map_location = {'cuda:%d' % x: 'cuda:%d' % y for x, y in device_pairs}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(device_ids[0])
    loss_fn = nn.MSELoss()
    loss_fn(outputs, labels).backward()
    optimizer.step()

    # Use a barrier() to make sure that all processes have finished reading the
    # checkpoint
    dist.barrier()

    if rank == 0:
        os.remove(CHECKPOINT_PATH)

    cleanup()

将 DDP 与模型并行性相结合|

DDP 还适用于多 GPU 模型,但不支持进程中的复制。 您需要创建每个模块副本一个进程,这通常比每个进程有多个副本具有更好的性能。 DDP 包装多 GPU 模型在训练具有大量数据的大型模型时特别有用。 使用此功能时,需要仔细实现多 GPU 模型以避免硬编码设备,因为不同的模型副本将放置在不同的设备。

class ToyMpModel(nn.Module):
    def __init__(self, dev0, dev1):
        super(ToyMpModel, self).__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)

    def forward(self, x):
        x = x.to(self.dev0)
        x = self.relu(self.net1(x))
        x = x.to(self.dev1)
        return self.net2(x)

将多 GPU 模型传递给 DDP 时,不得设置device_idsoutput_device 输入和输出数据将由应用程序或模型forward()方法放置在适当的设备中。

def demo_model_parallel(rank, world_size):
    setup(rank, world_size)

    # setup mp_model and devices for this process
    dev0 = rank * 2
    dev1 = rank * 2 + 1
    mp_model = ToyMpModel(dev0, dev1)
    ddp_mp_model = DDP(mp_model)

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    # outputs will be on dev1
    outputs = ddp_mp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(dev1)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


if __name__ == "__main__":
    run_demo(demo_basic, 2)
    run_demo(demo_checkpoint, 2)

    if torch.cuda.device_count() >= 8:
        run_demo(demo_model_parallel, 4)