• 教程 >
  • (高级)使用Amazon AWS 的 PyTorch 1.0 分布式训练器
Shortcuts

(高级)使用亚马逊 AWS 的 PyTorch 1.0 分布式培训师|

作者内森·因卡

编辑:滕丽

在本教程中,我们将演示如何跨两个多 gpu Amazon AWS 节点设置、编码和运行 PyTorch 1.0 分布式培训器。 我们将首先描述 AWS 设置,然后是 PyTorch 环境配置,最后是分布式培训师的代码。 希望您会发现,将当前培训代码扩展到分布式应用程序实际上只需很少的代码更改,并且大部分工作是在一次性环境设置中。

亚马逊 AWS 设置|

在本教程中,我们将跨两个多 gpu 节点运行分布式训练。 在本节中,我们将首先介绍如何创建节点,然后如何设置安全组,以便节点可以相互通信。

创建节点|

在 Amazon AWS 中,创建实例有七个步骤。 要开始使用,请登录并选择"启动实例"。

步骤 1: 选择亚马逊机器映像 (AMI) - 在这里我们将选择深度学习 AMI (Ubuntu) 版本 14.0 如上所述,此实例附带了许多最流行的深度学习框架,并预配置了 CUDA、cuDNN 和 NCCL。 这是本教程的一个很好的起点。

步骤 2:选择实例类型- 现在,选择称为p2.8xlarge的 GPU 计算单元。 请注意,每个实例的成本都不同,但此实例为每个节点提供 8 个 NVIDIA Tesla K80 GPU,并为多 gpu 分布式训练提供了良好的体系结构。

步骤 3:配置实例详细信息- 此处要更改的唯一设置是将实例数增加到 2。 所有其他配置可能保留为默认值。

步骤 4:添加存储- 请注意,默认情况下,这些节点不附带大量存储(仅 75 GB)。 在本教程中,由于我们只使用 STL-10 数据集,因此存储量很大。 但是,如果要在更大的数据集(如 ImageNet)上进行训练,则必须添加更多存储,以便适合要保存的数据集和任何经过定型的模型。

第 5 步:添加标记- 此处不执行任何操作,只需继续操作即可。

步骤 6:配置安全组- 这是配置过程中的关键步骤。 默认情况下,同一安全组中的两个节点将无法在分布式培训设置中进行通信。 在这里,我们希望为两个节点创建新的安全组 但是,在此步骤中,我们无法完成配置。 现在,只需记住新的安全组名称(例如启动向导-12),然后转到步骤 7。

步骤 7:查看实例启动- 在此处,查看实例,然后启动实例。 默认情况下,这将自动开始初始化这两个实例。 可以从仪表板监视初始化进度。

配置安全组|

回想一下,在创建实例时,我们无法正确配置安全组。 启动实例后,在 EC2 仪表板中选择"网络和安全组"选项卡。 这将列出您有权访问的安全组的列表。 选择您在步骤 6 中创建的新安全组(即启动向导-12),该组将显示名为"说明"、"入站"、"出站"和"标记"的选项卡。 首先,选择"入站"选项卡和"编辑"以添加规则,以允许启动向导-12 安全组中的"源"中的"所有流量"。 然后选择"出站"选项卡,然后执行完全相同的操作。 现在,我们有效地允许启动向导-12 安全组中的节点之间的所有类型入站和出站流量。

必要信息|

在继续之前,我们必须查找并记住两个节点的 IP 地址。 在 EC2 仪表板中找到正在运行的实例。 对于这两种情况,请记下IPv4 公共 IP专用 IP 在本文的其余部分中,我们将这些称为节点0-公共IP、节点0-私有IP、节点1-公共IP节点1-私有IP。 公共 IP 是我们将用于 SSH 的地址,专用 IP 将用于节点间通信。

环境设置|

下一个关键步骤是设置每个节点。 遗憾的是,我们无法同时配置两个节点,因此此过程必须在每个节点上单独完成。 但是,这是一次性设置,因此一旦正确配置了节点,就不必为将来的分布式训练项目重新配置。

第一步,一旦登录到节点,是创建一个新的conda环境与python 3.6和numpy。 创建后激活环境。

$ conda create -n nightly_pt python=3.6 numpy
$ source activate nightly_pt

接下来,我们将在 conda 环境中安装启用 Cuda 9.0 的夜间构建的 PyTorch。"

$ pip install torch_nightly -f https://download.pytorch.org/whl/nightly/cu90/torch_nightly.html

我们还必须安装火炬视觉,以便我们可以使用火炬视觉模型和数据集。 此时,我们必须从源构建割炬,因为 pip 安装默认将在我们刚刚安装的夜间构建顶部安装旧版本的 PyTorch。

$ cd
$ git clone https://github.com/pytorch/vision.git
$ cd vision
$ python setup.py install

最后,非常重要的步骤是设置 NCCL 套接字的网络接口名称。 这是使用环境变量NCCL_SOCKET_IFNAME设置的。 要获取正确的名称,请运行节点上的ifconfig命令,并查看对应于节点的私有 IP的接口名称(例如 ens3)。 然后将环境变量设置为

$ export NCCL_SOCKET_IFNAME=ens3

请记住,在两个节点上执行此操作。 您也可以考虑将NCCL_SOCKET_IFNAME设置添加到.bashrc 一个重要的观察是,我们没有在节点之间设置共享文件系统。 因此,每个节点必须具有代码副本和数据集副本。 有关在节点之间设置共享网络文件系统的详细信息,请参阅此处

分布式培训代码|

随着实例的运行和环境设置,我们现在可以进入培训代码。 此处的大多数代码都取自PyTorch ImageNet 示例,该示例还支持分布式训练。 此代码为自定义培训师提供了一个良好的起点,因为它具有许多样板训练循环、验证循环和准确性跟踪功能。 但是,您会注意到,为了简单起见,已剥离参数解析和其他非基本函数。

在此示例中,我们将使用torchvision.model.resnet18模型,并在火炬视觉.datasets.STL10数据集上训练它。 为了适应 STL-10 与 Resnet18 的维数不匹配,我们将使用转换将每个图像调整为 224x224。 请注意,模型和数据集的选择与分布式训练代码正交,您可以使用任何您希望的数据集和模型,并且过程相同。 让我们首先处理导入并讨论一些帮助器函数。 然后,我们将定义训练和测试函数,这些函数主要取自 ImageNet 示例。 最后,我们将构建处理分布式培训设置的代码的主要部分。 最后,我们将讨论如何实际运行代码。

进口|

这里重要的分布式训练具体进口是火炬.nn.parallel、火炬分发火炬.utils.data.和火炬.Multi处理 将多处理启动方法设置为生成分叉服务器(仅在 Python 3 中仅受支持)也很重要,因为默认值为分叉,当使用多个工作进程进行数据加载时,可能会导致死锁。

import time
import sys
import torch

if __name__ == '__main__':
    torch.multiprocessing.set_start_method('spawn')

import torch.nn as nn
import torch.nn.parallel
import torch.distributed as dist
import torch.optim
import torch.utils.data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models

from torch.multiprocessing import Pool, Process

帮助器功能|

我们还必须定义一些帮助器函数和类,这将使培训更容易。 AverageMeter跟踪训练统计信息,如准确性和迭代计数。 accuracy函数计算并返回模型的前 k 精度,以便我们可以跟踪学习进度。 两者都是为了培训的便利而提供的,但两者都没有专门进行分布式培训。

class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

def accuracy(output, target, topk=(1,)):
    """Computes the precision@k for the specified values of k"""
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))

        res = []
        for k in topk:
            correct_k = correct[:k].view(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res

列车功能|

为了简化主循环,最好将训练纪元步骤分离为称为train"的功能。 该函数为train_loader的一个纪元训练输入模型。 此函数中唯一的分布式训练工件是在转发传递之前将数据non_blocking属性和将张量设置为True 这允许数据的异步 GPU 副本,这意味着传输可以与计算重叠。 此函数还沿途输出训练统计信息,以便我们可以跟踪整个纪念的进度。

这里要定义的另一个函数是adjust_learning_rate它按固定的时间表衰减初始学习速率。 这是另一个样板训练器功能,可用于训练准确的模型。

def train(train_loader, model, criterion, optimizer, epoch):

    batch_time = AverageMeter()
    data_time = AverageMeter()
    losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()

    # switch to train mode
    model.train()

    end = time.time()
    for i, (input, target) in enumerate(train_loader):

        # measure data loading time
        data_time.update(time.time() - end)

        # Create non_blocking tensors for distributed training
        input = input.cuda(non_blocking=True)
        target = target.cuda(non_blocking=True)

        # compute output
        output = model(input)
        loss = criterion(output, target)

        # measure accuracy and record loss
        prec1, prec5 = accuracy(output, target, topk=(1, 5))
        losses.update(loss.item(), input.size(0))
        top1.update(prec1[0], input.size(0))
        top5.update(prec5[0], input.size(0))

        # compute gradients in a backward pass
        optimizer.zero_grad()
        loss.backward()

        # Call step of optimizer to update model params
        optimizer.step()

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        if i % 10 == 0:
            print('Epoch: [{0}][{1}/{2}]\t'
                  'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
                  'Data {data_time.val:.3f} ({data_time.avg:.3f})\t'
                  'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
                  'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
                  'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
                   epoch, i, len(train_loader), batch_time=batch_time,
                   data_time=data_time, loss=losses, top1=top1, top5=top5))

def adjust_learning_rate(initial_lr, optimizer, epoch):
    """Sets the learning rate to the initial LR decayed by 10 every 30 epochs"""
    lr = initial_lr * (0.1 ** (epoch // 30))
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

验证功能|

为了跟踪概括性能并进一步简化主循环,我们还可以将验证步骤提取到称为validate的函数中。 此函数在输入验证数据加载器上运行输入模型的完整验证步骤,并在验证集中返回模型的前 1 精度。 同样,您会注意到此处唯一的分布式训练功能是在将训练数据和标签传递到模型之前为训练数据和标签设置non_blocking=True

def validate(val_loader, model, criterion):

    batch_time = AverageMeter()
    losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()

    # switch to evaluate mode
    model.eval()

    with torch.no_grad():
        end = time.time()
        for i, (input, target) in enumerate(val_loader):

            input = input.cuda(non_blocking=True)
            target = target.cuda(non_blocking=True)

            # compute output
            output = model(input)
            loss = criterion(output, target)

            # measure accuracy and record loss
            prec1, prec5 = accuracy(output, target, topk=(1, 5))
            losses.update(loss.item(), input.size(0))
            top1.update(prec1[0], input.size(0))
            top5.update(prec5[0], input.size(0))

            # measure elapsed time
            batch_time.update(time.time() - end)
            end = time.time()

            if i % 100 == 0:
                print('Test: [{0}/{1}]\t'
                      'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
                      'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
                      'Prec@1 {top1.val:.3f} ({top1.avg:.3f})\t'
                      'Prec@5 {top5.val:.3f} ({top5.avg:.3f})'.format(
                       i, len(val_loader), batch_time=batch_time, loss=losses,
                       top1=top1, top5=top5))

        print(' * Prec@1 {top1.avg:.3f} Prec@5 {top5.avg:.3f}'
              .format(top1=top1, top5=top5))

    return top1.avg

输入

随着帮助器功能的方式,现在我们已经到达了有趣的部分。 下面是我们将定义运行输入的位置。 有些输入是标准模型训练输入,如批处理大小和训练纪元数,有些特定于我们的分布式训练任务。 所需的输入包括:

  • batch_size - 分布式培训组中每个进程的批处理大小。 跨分布式模型的总批处理大小batch_size=world_size

  • 工作人员- 每个进程中与数据加载器一起使用的工作进程数

  • num_epochs - 要训练纪元的总次数

  • starting_lr - 开始培训学习率

  • world_size - 分布式培训环境中的流程数

  • dist_backend - 用于分布式培训通信的后端(即 NCCL、Gloo、MPI 等)。 在本教程中,由于我们使用的是多个多 gpu 节点,因此建议使用 NCCL。

  • dist_url - URL 以指定流程组的初始化方法。 这可能包含 rank0 进程的 IP 地址和端口,或者是共享文件系统上的不存在的文件。 在这里,因为我们没有一个共享文件系统,这将包含节点0-私有IP和节点0上的端口使用。

print("Collect Inputs...")

# Batch Size for training and testing
batch_size = 32

# Number of additional worker processes for dataloading
workers = 2

# Number of epochs to train for
num_epochs = 2

# Starting Learning Rate
starting_lr = 0.1

# Number of distributed processes
world_size = 4

# Distributed backend type
dist_backend = 'nccl'

# Url used to setup distributed training
dist_url = "tcp://172.31.22.234:23456"

输出:

Collect Inputs...

初始化流程组|

PyTorch 分布式培训最重要的部分之一是正确设置流程组,这是初始化torch.distributed的第步。 为此,我们将使用torch.distributed.init_process_group函数,该函数需要多个输入。 首先,一个后端输入,指定要使用的后端(即 NCCL、Gloo、MPI 等)。 init_method输入,该输入是包含 rank0 计算机的地址和端口的 URL,或是共享文件系统上不存在文件的路径。 请注意,要使用文件init_method,所有计算机都必须有权访问该文件,与 url 方法类似,所有计算机都必须能够在网络上进行通信,因此请确保配置任何防火墙和网络设置以适应。 init_process_group函数还采用排名world_size参数,分别指定运行时此进程的排名和集合中的进程数。 init_method输入也可以"env://"。 在这种情况下,将分别从以下两个环境变量读取 rank0 计算机的地址和端口:MASTER_ADDR、MASTER_PORT。 如果init_process_group函数中未指定排名world_size参数,则还可以分别从以下两个环境变量中读取它们:RANK、WORLD_SIZE。

另一个重要步骤,特别是当每个节点有多个 gpu 时,设置此过程local_rank。 例如,如果您有两个节点,每个节点具有 8 个 GPU,并且希望对所有节点进行训练,然后进行训练 ,然后 [(world__size_16])和每个节点将有一个本地排名为 0-7 的进程。 此local_rank用于为进程设置设备(即使用哪个 GPU),并在创建分布式数据并行模型时用于设置设备。 还建议在此假设环境中使用 NCCL 后端,因为 NCCL 是多 gpu 节点的首选。

print("Initialize Process Group...")
# Initialize Process Group
# v1 - init with url
dist.init_process_group(backend=dist_backend, init_method=dist_url, rank=int(sys.argv[1]), world_size=world_size)
# v2 - init with file
# dist.init_process_group(backend="nccl", init_method="file:///home/ubuntu/pt-distributed-tutorial/trainfile", rank=int(sys.argv[1]), world_size=world_size)
# v3 - init with environment variables
# dist.init_process_group(backend="nccl", init_method="env://", rank=int(sys.argv[1]), world_size=world_size)


# Establish Local Rank and set device on this node
local_rank = int(sys.argv[2])
dp_device_ids = [local_rank]
torch.cuda.set_device(local_rank)

初始化模型|

下一个主要步骤是初始化要训练的模型。 在这里,我们将使用来自torchvision.modelsresnet18 模型。 首先,我们初始化模型并将其放置在 GPU 内存中。 接下来,我们制作模型DistributedDataParallel它处理数据在模型的分布,并且对于分布式训练至关重要。 DistributedDataParallel模块还处理全世界渐变的平均值,因此我们不必在训练步骤中显式平均渐变。

请务必注意,这是一个阻塞函数,这意味着程序执行将在此函数等待,直到world_size进程加入进程组。 此外,请注意,我们将设备 ID 列表作为包含我们正在使用的本地排名(即 GPU)的参数传递。 最后,指定了模型的损耗函数和优化器。

print("Initialize Model...")
# Construct Model
model = models.resnet18(pretrained=False).cuda()
# Make model DistributedDataParallel
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=dp_device_ids, output_device=local_rank)

# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.SGD(model.parameters(), starting_lr, momentum=0.9, weight_decay=1e-4)

初始化数据加载器|

准备培训的最后一步是指定要使用的数据集。 在这里,我们使用来自火炬视觉的STL-10数据集 STL10 数据集是包含 96x96px 彩色图像的 10 类数据集。 为了与模型一起使用,我们会在转换中将图像大小调整为 224x224px。 本节中的一个分布式训练特定项目是使用DistributedSampler进行训练集,该训练集旨在与DistributedDataParallel模型结合使用。 此对象处理跨分布式环境的数据集分区,以便并非所有模型都在同一数据子集上定型,这会产生反作用。 最后,我们创建负责将数据馈送到进程的DataLoader的。

如果节点不存在,STL-10 数据集将自动下载。 如果要使用自己的数据集,则应下载数据,编写自己的数据集处理程序,并在此处为数据集构造数据加载程序。

print("Initialize Dataloaders...")
# Define the transform for the data. Notice, we must resize to 224x224 with this dataset and model.
transform = transforms.Compose(
    [transforms.Resize(224),
     transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

# Initialize Datasets. STL10 will automatically download if not present
trainset = datasets.STL10(root='./data', split='train', download=True, transform=transform)
valset = datasets.STL10(root='./data', split='test', download=True, transform=transform)

# Create DistributedSampler to handle distributing the dataset across nodes when training
# This can only be called after torch.distributed.init_process_group is called
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)

# Create the Dataloaders to feed data to the training and validation steps
train_loader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=(train_sampler is None), num_workers=workers, pin_memory=False, sampler=train_sampler)
val_loader = torch.utils.data.DataLoader(valset, batch_size=batch_size, shuffle=False, num_workers=workers, pin_memory=False)

培训循环|

最后一步是定义训练循环。 我们已经完成了设置分布式培训的大部分工作,因此这不是特定于分布式培训的。 唯一的细节是在DistributedSampler中设置当前纪元计数,因为采样器根据纪元决定地将数据随机排列到每个进程。 更新采样器后,循环将运行一个完整的训练纪元,运行一个完整的验证步骤,然后根据目前性能最好的模型打印当前模型的性能。 在num_epochs培训后,循环退出,本教程完成。 请注意,由于这是一个练习,我们不是保存模型,但您可能希望跟踪性能最佳的模型,然后在培训结束时保存它(请参阅此处)。

best_prec1 = 0

for epoch in range(num_epochs):
    # Set epoch count for DistributedSampler
    train_sampler.set_epoch(epoch)

    # Adjust learning rate according to schedule
    adjust_learning_rate(starting_lr, optimizer, epoch)

    # train for one epoch
    print("\nBegin Training Epoch {}".format(epoch+1))
    train(train_loader, model, criterion, optimizer, epoch)

    # evaluate on validation set
    print("Begin Validation @ Epoch {}".format(epoch+1))
    prec1 = validate(val_loader, model, criterion)

    # remember best prec@1 and save checkpoint if desired
    # is_best = prec1 > best_prec1
    best_prec1 = max(prec1, best_prec1)

    print("Epoch Summary: ")
    print("\tEpoch Accuracy: {}".format(prec1))
    print("\tBest Accuracy: {}".format(best_prec1))

运行代码|

与大多数其他 PyTorch 教程不同,此代码可能不会直接运行在此笔记本中。 要运行,请下载此文件的 .py 版本(或使用此版本进行转换),并将副本上载到两个节点。 精明的读者会注意到,我们硬编码了节点0-私有IP+(世界_size[4]),但分别输入排名local_rank输入作为arg_1]和arg_2_命令行参数。 上传后,将两个 ssh 终端打开到每个节点。

  • 在节点0的第一个终端上,运行$ python main.py 0

  • 在节点0的第二个终端上运行$ python main.py 1 1

  • 在节点1的第一个终端上,运行$ python main.py 2 0

  • 在节点1的第二个终端上运行$ python main.py 3 1

打印"初始化模型..."后,程序将启动并等待。所有四个进程加入流程组。 请注意,第一个参数不会重复,因为这是进程的唯一全局排名。 重复第二个参数,因为这是节点上运行的进程的本地排名。 如果在每个节点上运行nvidia-smi您将在每个节点上看到两个进程,一个在 GPU0 上运行,一个在 GPU1 上运行。

现在,我们已经完成了分布式培训示例! 希望您能看到如何使用本教程来帮助在您自己的数据集上训练您自己的模型,即使您没有使用完全相同的分布式环境。 如果您使用的是 AWS,如果您不使用 AWS,则不要忘记关闭您的节点,或者您可能会在月底发现一张令人不快的大账单。

下一步将转到何处

脚本总运行时间: ( 0 分 0.043 秒)

由狮身人面像库生成的画廊