• 教程 >
  • 使用 PyTorch 编写分布式应用程序
Shortcuts

使用 PyTorch 编写分布式应用程序|

作者塞布·阿诺德

在这个简短的教程中,我们将介绍 PyTorch 的分布式包。 我们将了解如何设置分布式设置、使用不同的通信策略,并介绍包的一些内部。

*

PyTorch(即torch.distributed中包含的分布式包使研究人员和实践者能够轻松地跨进程和计算机集群并行化计算。 为此,它利用消息传递语义,允许每个进程将数据传达到任何其他进程。 与多处理torch.multiprocessing) 包相反,进程可以使用不同的通信后端,并且不局限于在同一台计算机上执行。

为了开始,我们需要同时运行多个进程的能力。 如果您有权访问计算群集,则应与本地系统管理员联系或使用您喜爱的协调工具。 (例如,pdsh、群集外壳或其他)在本教程中,我们将使用一台计算机,并使用以下模板分叉多个进程。

"""run.py:"""
#!/usr/bin/env python
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process

def run(rank, size):
    """ Distributed function to be implemented later. """
    pass

def init_process(rank, size, fn, backend='gloo'):
    """ Initialize the distributed environment. """
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)


if __name__ == "__main__":
    size = 2
    processes = []
    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

上述脚本生成两个进程,每个进程将分别设置分布式环境,初始化进程组dist.init_process_group),最后执行给定的run函数。

让我们来看看init_process函数。 它确保每个进程都能够使用相同的 IP 地址和端口通过主进程进行协调。 请注意,我们使用gloo后端,但其他后端可用。 (c.f. 第 5.1 节)我们将在本教程末尾介绍dist.init_process_group中发生的魔力,但它实质上允许进程通过共享其位置来相互通信。

点对点通信|

Send and Recv

发送和重新病毒|

将数据从一个进程传输到另一个进程称为点对点通信。 这些是通过sendrecv函数或其直接的对部,isendirecv实现的。

"""Blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        dist.send(tensor=tensor, dst=1)
    else:
        # Receive tensor from process 0
        dist.recv(tensor=tensor, src=0)
    print('Rank ', rank, ' has data ', tensor[0])

在上面的示例中,两个进程都以零张条开始,然后处理 0 递增的张条并将其发送到进程 1,以便它们最终都以 1.0 结束。 请注意,进程 1 需要分配内存才能存储它将接收的数据。

另请注意,sendrecv 正在阻塞:两个进程都停止,直到通信完成。 另一方面,即时性是非阻塞的;脚本继续其执行,方法返回一个Work对象,我们可以选择在该对象wait()

"""Non-blocking point-to-point communication."""

def run(rank, size):
    tensor = torch.zeros(1)
    req = None
    if rank == 0:
        tensor += 1
        # Send the tensor to process 1
        req = dist.isend(tensor=tensor, dst=1)
        print('Rank 0 started sending')
    else:
        # Receive tensor from process 0
        req = dist.irecv(tensor=tensor, src=0)
        print('Rank 1 started receiving')
    req.wait()
    print('Rank ', rank, ' has data ', tensor[0])

使用即时时,我们必须小心使用发送和接收的张量。 由于我们不知道数据何时将传达给其他进程,因此在req.wait()完成之前,我们不应修改发送的张数,也不应访问接收的张数。 换句话说,

  • dist.isend()之后写入tensor将导致未定义的行为。

  • dist.irecv()之后的tensor中读取将导致未定义的行为。

但是,在执行req.wait()后,我们保证通信发生了,并且存储在tensor[0]中的值为 1.0。

当我们想要对流程的通信进行细粒度控制时,点对点通信非常有用。 它们可以用来实现花哨的算法,比如百度的DeepSpeechFacebook的大规模实验中使用的算法。(c.f. 第 4.1 节

集体沟通|

Scatter

散点|

Gather

收集|

Reduce

减少|

All-Reduce

全降|

Broadcast

广播|

All-Gather

全聚集|

与点对点交流不同,集体允许跨组中的所有进程进行通信模式。 组是我们所有流程的子集。 要创建组,我们可以将排名列表传递给dist.new_group(group) 默认情况下,集体在所有进程(也称为世界)上执行。 例如,为了获得所有进程的所有张量之和,我们可以使用dist.all_reduce(张量、op、组)集合。

""" All-Reduce example."""
def run(rank, size):
    """ Simple point-to-point communication. """
    group = dist.new_group([0, 1])
    tensor = torch.ones(1)
    dist.all_reduce(tensor, op=dist.reduce_op.SUM, group=group)
    print('Rank ', rank, ' has data ', tensor[0])

由于我们想要组中的所有张量的总和,所以我们使用dist.reduce_op.SUM作为减少运算符。 一般来说,任何交换数学运算都可以用作运算符。 开箱即用的 PyTorch 附带 4 个此类运算符,所有运算符均按元素级别工作:

  • dist.reduce_op.SUM

  • dist.reduce_op.PRODUCT

  • dist.reduce_op.MAX

  • dist.reduce_op.MIN.

除了dist.all_reduce(张、运、组)外,目前在PyTorch中共有6个集体。

  • dist.广播(张数、src、组):tensor src复制到所有其他进程。

  • dist.reduce(张数、dst、op、组):op应用于所有tensor并将结果存储在dst中。

  • all_reduce(张数、op、组):与减少相同,但结果存储在所有进程中。

  • dist.scatter(张数、src、scatter_list、组):[(i}text_th_i] scatter_list[i]的"我"张数复制到[(i_text_th_)进程。

  • dist.gather(张数、dst、gather_list、组):dst中的所有进程复制tensor

  • all_gather(tensor_list、张数、组):在所有进程上复制tensortensor_list

  • dist.barrier(group)阻止中的所有进程,直到每个进程都进入此函数。

分布式培训|

注意: 你可以在这个GitHub存储库中找到本节的示例脚本。

现在,我们已经了解了分布式模块的工作原理,让我们编写一些有用的内容。 我们的目标是复制分布式数据并行的功能。 当然,这将是一个教条的例子,在现实世界中,你应该使用上面链接的官方、经过良好测试和优化的版本。

很简单,我们希望实现随机梯度下降的分布式版本。 我们的脚本将允许所有进程在其数据批次上计算其模型的梯度,然后对其梯度进行平均计算。 为了确保在更改进程数时产生类似的收敛结果,我们首先必须对数据集进行分区。 (您也可以使用tnt.dataset.SplitDataset,而不是下面的代码段。

""" Dataset partitioning helper """
class Partition(object):

    def __init__(self, data, index):
        self.data = data
        self.index = index

    def __len__(self):
        return len(self.index)

    def __getitem__(self, index):
        data_idx = self.index[index]
        return self.data[data_idx]


class DataPartitioner(object):

    def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
        self.data = data
        self.partitions = []
        rng = Random()
        rng.seed(seed)
        data_len = len(data)
        indexes = [x for x in range(0, data_len)]
        rng.shuffle(indexes)

        for frac in sizes:
            part_len = int(frac * data_len)
            self.partitions.append(indexes[0:part_len])
            indexes = indexes[part_len:]

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])

使用上述代码段,我们现在可以使用以下几行对任何数据集进行分区:

""" Partitioning MNIST """
def partition_dataset():
    dataset = datasets.MNIST('./data', train=True, download=True,
                             transform=transforms.Compose([
                                 transforms.ToTensor(),
                                 transforms.Normalize((0.1307,), (0.3081,))
                             ]))
    size = dist.get_world_size()
    bsz = 128 / float(size)
    partition_sizes = [1.0 / size for _ in range(size)]
    partition = DataPartitioner(dataset, partition_sizes)
    partition = partition.use(dist.get_rank())
    train_set = torch.utils.data.DataLoader(partition,
                                         batch_size=bsz,
                                         shuffle=True)
    return train_set, bsz

假设我们有 2 个副本,则每个进程将有 60000 / 2 = 30000 个样本train_set 我们还将批处理大小除以副本数,以保持批处理大小 128。

现在,我们可以编写通常的向前向后优化训练代码,并添加函数调用以平均模型的梯度。 (以下主要灵感来自官方的PyTorch MNIST 示例

""" Distributed Synchronous SGD Example """
def run(rank, size):
    torch.manual_seed(1234)
    train_set, bsz = partition_dataset()
    model = Net()
    optimizer = optim.SGD(model.parameters(),
                          lr=0.01, momentum=0.5)

    num_batches = ceil(len(train_set.dataset) / float(bsz))
    for epoch in range(10):
        epoch_loss = 0.0
        for data, target in train_set:
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            epoch_loss += loss.item()
            loss.backward()
            average_gradients(model)
            optimizer.step()
        print('Rank ', dist.get_rank(), ', epoch ',
              epoch, ': ', epoch_loss / num_batches)

它仍然是实现average_gradients(model)函数,它简单地接受一个模型,并在全世界平均其梯度。

""" Gradient averaging. """
def average_gradients(model):
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.reduce_op.SUM)
        param.grad.data /= size

Et voil® 我们成功地实现了分布式同步SGD,可以在大型计算机集群上训练任何模型。

注意: 虽然最后一句在技术上是正确的,但实现同步 SGD 的生产级实现还需要很多技巧 同样,请使用已经测试和优化的内容。

我们自己的戒指全部减少|

作为额外的挑战,假设我们想要实现 DeepSpeech 的高效环全部减少。 这很容易使用点对点集体实现。

""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
    rank = dist.get_rank()
    size = dist.get_world_size()
    send_buff = th.zeros(send.size())
    recv_buff = th.zeros(send.size())
    accum = th.zeros(send.size())
    accum[:] = send[:]

    left = ((rank - 1) + size) % size
    right = (rank + 1) % size

    for i in range(size - 1):
        if i % 2 == 0:
            # Send send_buff
            send_req = dist.isend(send_buff, right)
            dist.recv(recv_buff, left)
            accum[:] += recv[:]
        else:
            # Send recv_buff
            send_req = dist.isend(recv_buff, right)
            dist.recv(send_buff, left)
            accum[:] += send[:]
        send_req.wait()
    recv[:] = accum[:]

在上面的脚本中,allreduce(发送、recv)函数的签名与 PyTorch 中的签名略有不同。 它需要一个recv张量,并将存储所有send张量的总和。 作为留给读者的练习,我们的版本和深度语音中的版本之间还有一个区别:它们的实现将梯度张表示分为,以便以最佳方式利用通信带宽。 (提示:火炬.chunk)

高级主题|

现在,我们准备发现一些更先进的torch.distributed 由于需要涵盖的很多部分,本节分为两个子节:

  1. 通信后端:我们学习如何使用 MPI 和 Gloo 进行 GPU-GPU 通信。

  2. 初始化方法:其中我们了解如何在dist.init_process_group()中最好地设置初始协调阶段。

通信后端|

torch.distributed最优雅的方面之一是它能够抽象和构建不同的后端。 如前所述,目前在 PyTorch 中实现了三个后端:Gloo、NCCL 和 MPI。 根据所需的用例,它们都有不同的规格和权衡。 支持函数的比较表可以在这里找到。

沟后端

到目前为止,我们已经广泛使用了Gloo后端 它作为一个开发平台非常方便,因为它包含在预编译的 PyTorch 二进制文件中,适用于 Linux(自 0.2 起)和 macOS(自 1.3 年起)。 它支持 CPU 上的所有点对点和集体操作,以及 GPU 上的所有集体操作。 CUDA 张量器的集体操作的实现不如 NCCL 后端提供的优化。

正如您已经注意到的,如果您将model放在 GPU 上,我们分布式 SGD 示例将不起作用。 为了使用多个 GPU,让我们也进行以下修改:

  1. 使用设备 + 割炬.设备("cuda:}".格式(排名))

  2. 模型= 净() [(右箭头]) 模型 = 净()到(设备)

  3. 使用数据、目标 = 数据.to(设备)、目标到(设备)

通过上述修改,我们的模型现在在两个GPU上训练,你可以用手表nvidia-smi来监控它们的利用率。

MPI 后端

消息传递接口 (MPI) 是高性能计算领域的标准化工具。 它允许进行点对点和集体的沟通,是torch.distributedAPI的主要灵感。 MPI的几种实现存在(例如开放MPI、MVAPICH2、英特尔MPI),每个实现都针对不同的目的进行了优化。 使用 MPI 后端的优点在于 MPI 在大型计算机群集上的广泛可用性和高级别优化。 一些最近的实现还能够利用 CUDA IPC 和 GPU 直接技术,以避免通过 CPU 进行内存复制。

遗憾的是,PyTorch 的二进制文件不能包括 MPI 实现,我们必须手动重新编译它。 幸运的是,这个过程相当简单,因为编译后,PyTorch 将自行查找可用的 MPI 实现。 以下步骤安装 MPI 后端,通过从源安装 PyTorch 。

  1. 创建并激活 Anaconda 环境,按照指南安装所有先决条件,但不要运行python setup.py安装

  2. 选择并安装您最喜爱的 MPI 实现。 请注意,启用 CUDA 感知 MPI 可能需要一些其他步骤。 在我们的例子中,我们将坚持开放MPI没有GPU支持:conda安装-c conda锻造openmpi

  3. 现在,转到克隆的 PyTorch repo 并执行python setup.py安装

为了测试我们新安装的后端,需要进行一些修改。

  1. 替换以下内容 如果 __name__ == "__main__": init_process(0,0,运行,后端='mpi')。

  2. 运行mpirun -n 4 python myscript.py.

这些更改的原因是 MPI 需要在生成进程之前创建自己的环境。 MPI 还将生成自己的进程并执行初始化方法中描述的握手,使init_process_groupranksize参数变得多余。 这实际上非常强大,因为您可以将其他参数传递给mpirun以便为每个进程定制计算资源。 (例如每个进程的核心数、将计算机分配给特定级别等, 等等)这样做,您应该获得与其他通信后端相同的熟悉输出。

NCCL 后端

NCCL 后端针对 CUDA 张量提供对集体操作的优化实现。 如果仅对集体操作使用 CUDA 张条,请考虑使用此后端以获得最佳的一流性能。 NCCL 后端包含在具有 CUDA 支持的预构建二进制文件中。

初始化方法|

为了完成本教程,让我们来谈谈我们调用的第一个函数:dist.init_process_group(后端,init_method)。 特别是,我们将介绍负责每个进程之间初始协调步骤的不同初始化方法。 这些方法允许您定义如何完成此协调。 根据您的硬件设置,这些方法之一自然应该比其他方法更合适。 除了以下各节之外,您还应查看官方文档

环境变量

在本教程中,我们一直在使用环境变量初始化方法。 通过在所有计算机上设置以下四个环境变量,所有进程将能够正确连接到主进程,获取有关其他进程的信息,并最终与他们握手。

  • MASTER_PORT计算机上将承载进程的自由端口,排名为 0。

  • MASTER_ADDR将承载进程的计算机的 IP 地址排名为 0。

  • WORLD_SIZE进程总数,使主知道等待多少个工时。

  • RANK每个流程的排名,这样他们就会知道它是否是工人的主人。

共享文件系统

共享文件系统要求所有进程都有权访问共享文件系统,并将通过共享文件进行协调。 这意味着每个进程将打开文件,写入其信息,并等待每个人这样做。 之后,所有必需的信息将随时提供给所有进程。 为了避免争用条件,文件系统必须支持通过fcntl锁定。

dist.init_process_group(
    init_method='file:///mnt/nfs/sharedfile',
    rank=args.rank,
    world_size=4)

Tcp

通过提供进程 IP 地址,具有等级为 0 和可到达的端口号,可以实现通过 TCP 进行初始化。 在这里,所有工作人员将能够连接到流程与排名0,并交换信息,如何达到对方。

dist.init_process_group(
    init_method='tcp://10.1.1.20:23456',
    rank=args.rank,
    world_size=4)

确认

我要感谢 PyTorch 开发人员在实现、文档和测试方面做得非常好。 当代码不明确时,我始终可以依靠文档测试来找到答案。 我特别要感谢苏米斯·钦塔拉、亚当·帕什克和纳塔利娅·吉梅尔申就早期草案发表了有见地的评论和回答问题。