模型并行化最佳实践

作者Shen Li

译者Hamish

校验Hamish

模型并行化在分布式训练技术中被广泛使用。先前的文章已经解释了如何使用DataParallel在多个GPU上训练神经网络。此功能将相同的模型复制到所有GPU,其中每个GPU负责消化输入数据的不同部分。尽管它可以极大地加快训练过程,但不适用于某些模型太大而无法被单个GPU容纳的用例。这篇文章展示了如何通过使用模型化来解决该问题,与DataParallel相比,模型并行化将单个模型拆分到不同的GPU上,而不是在每个GPU上复制整个模型(具体来说,模型m包含10层:使用DataParallel时,每个GPU都具有这10层中每个层的副本,而当在两个GPU上使用模型并行化时,每个GPU可以承载5层。

模型并行化的高级思想是将模型的不同子网络放置在不同的设备上,并相应地实现forward方法,以在设备之间传递中间输出。由于只需模型的一部分就能能在任何独立设备上运行,因此一组设备可以共同为更大的模型服务。在本文中,我们不会尝试构建庞大的模型并将其压缩到有限数量的GPU中。相反,本文着重展示模型并行化的思想。读者可以将这些想法应用到实际应用中。

基本用法

让我们从包含两个线性层的玩具模型开始。要在两个GPU上运行此模型,只需将每个线性层放在不同的GPU上,然后将输入和中间输出传递到匹配的层设备。

  1. import torch
  2. import torch.nn as nn
  3. import torch.optim as optim
  4. class ToyModel(nn.Module):
  5. def __init__(self):
  6. super(ToyModel, self).__init__()
  7. self.net1 = torch.nn.Linear(10, 10).to('cuda:0')
  8. self.relu = torch.nn.ReLU()
  9. self.net2 = torch.nn.Linear(10, 5).to('cuda:1')
  10. def forward(self, x):
  11. x = self.relu(self.net1(x.to('cuda:0')))
  12. return self.net2(x.to('cuda:1'))

请注意,除去五个to(device)调用将线性层和张量放置在适当的设备上之外,上面的ToyModel看起来非常类似于在单个GPU上实现它的方式。那是模型中唯一需要更改的地方。backward()torch.optim将自动处理梯度,就像模型在一个GPU上一样。你只需确保调用损失函数时标签与输出在同一设备上。

  1. model = ToyModel()
  2. loss_fn = nn.MSELoss()
  3. optimizer = optim.SGD(model.parameters(), lr=0.001)
  4. optimizer.zero_grad()
  5. outputs = model(torch.randn(20, 10))
  6. labels = torch.randn(20, 5).to('cuda:1')
  7. loss_fn(outputs, labels).backward()
  8. optimizer.step()

在现有模块上应用模型并行化

只需更改几行,就可以在多个GPU上运行现有的单GPU模块。以下代码显示了如何将torchvision.models.reset50()分解到两个GPU上。基本想法是从现有的ResNet模块继承,并在构建过程中将层划分到两个GPU。然后,通过传递对应的中间输出,覆写forward方法以将两个子网络拼合。

  1. from torchvision.models.resnet import ResNet, Bottleneck
  2. num_classes = 1000
  3. class ModelParallelResNet50(ResNet):
  4. def __init__(self, *args, **kwargs):
  5. super(ModelParallelResNet50, self).__init__(
  6. Bottleneck, [3, 4, 6, 3], num_classes=num_classes, *args, **kwargs)
  7. self.seq1 = nn.Sequential(
  8. self.conv1,
  9. self.bn1,
  10. self.relu,
  11. self.maxpool,
  12. self.layer1,
  13. self.layer2
  14. ).to('cuda:0')
  15. self.seq2 = nn.Sequential(
  16. self.layer3,
  17. self.layer4,
  18. self.avgpool,
  19. ).to('cuda:1')
  20. self.fc.to('cuda:1')
  21. def forward(self, x):
  22. x = self.seq2(self.seq1(x).to('cuda:1'))
  23. return self.fc(x.view(x.size(0), -1))

对于模型太大而无法放入单个GPU的情况,上述实现解决了该问题。但是,你可能已经注意到,if your model fits,这一实现将比在单个GPU上运行还要慢。这是因为在任何时间点,两个GPU中只有一个在工作,而另一个在那儿什么也没做。而由于中间输出需要在layer2layer3之间从cuda:0复制到cuda:1,这导致性能进一步下降。

让我们进行实验以更定量地了解执行时间。在本实验中,我们通过运行随机输入和标签来训练ModelParallelResNet50和现有的torchvision.models.reset50()。训练后,模型不会产生任何有用的预测,但是我们可以对执行时间有一个合理的了解。

  1. import torchvision.models as models
  2. num_batches = 3
  3. batch_size = 120
  4. image_w = 128
  5. image_h = 128
  6. def train(model):
  7. model.train(True)
  8. loss_fn = nn.MSELoss()
  9. optimizer = optim.SGD(model.parameters(), lr=0.001)
  10. one_hot_indices = torch.LongTensor(batch_size) \
  11. .random_(0, num_classes) \
  12. .view(batch_size, 1)
  13. for _ in range(num_batches):
  14. # generate random inputs and labels
  15. inputs = torch.randn(batch_size, 3, image_w, image_h)
  16. labels = torch.zeros(batch_size, num_classes) \
  17. .scatter_(1, one_hot_indices, 1)
  18. # run forward pass
  19. optimizer.zero_grad()
  20. outputs = model(inputs.to('cuda:0'))
  21. # run backward pass
  22. labels = labels.to(outputs.device)
  23. loss_fn(outputs, labels).backward()
  24. optimizer.step()

上述train(model)方法使用nn.MSELoss作为损失函数,并使用optim.SGD作为优化器。它模拟了对128 X 128图像的训练,这些图像分为3个batch,每个batch包含120幅图像。然后,我们使用timeit来运行train(model)方法10次,并绘制带有标准差的执行时间。

  1. import matplotlib.pyplot as plt
  2. plt.switch_backend('Agg')
  3. import numpy as np
  4. import timeit
  5. num_repeat = 10
  6. stmt = "train(model)"
  7. setup = "model = ModelParallelResNet50()"
  8. # globals arg is only available in Python 3. In Python 2, use the following
  9. # import __builtin__
  10. # __builtin__.__dict__.update(locals())
  11. mp_run_times = timeit.repeat(
  12. stmt, setup, number=1, repeat=num_repeat, globals=globals())
  13. mp_mean, mp_std = np.mean(mp_run_times), np.std(mp_run_times)
  14. setup = "import torchvision.models as models;" + \
  15. "model = models.resnet50(num_classes=num_classes).to('cuda:0')"
  16. rn_run_times = timeit.repeat(
  17. stmt, setup, number=1, repeat=num_repeat, globals=globals())
  18. rn_mean, rn_std = np.mean(rn_run_times), np.std(rn_run_times)
  19. def plot(means, stds, labels, fig_name):
  20. fig, ax = plt.subplots()
  21. ax.bar(np.arange(len(means)), means, yerr=stds,
  22. align='center', alpha=0.5, ecolor='red', capsize=10, width=0.6)
  23. ax.set_ylabel('ResNet50 Execution Time (Second)')
  24. ax.set_xticks(np.arange(len(means)))
  25. ax.set_xticklabels(labels)
  26. ax.yaxis.grid(True)
  27. plt.tight_layout()
  28. plt.savefig(fig_name)
  29. plt.close(fig)
  30. plot([mp_mean, rn_mean],
  31. [mp_std, rn_std],
  32. ['Model Parallel', 'Single GPU'],
  33. 'mp_vs_rn.png')

模型并行化最佳实践 - 图1

结果表明,模型并行实现的执行时间比现有的单GPU实现长4.02 / 3.75-1 = 7%。因此,我们可以得出结论,在GPU之间来回复制张量大约有7%的开销。这里有改进的空间,因为我们知道两个GPU之一在整个执行过程中处于空闲状态。一种选择是将每个批次进一步划分为拆分流水线,以便当一个拆分到达第二子网络时,可以将下一个拆分投入第一子网络。这样,两个连续的拆分可以在两个GPU上同时运行。

用流水线输入加速

在以下实验中,我们将每个120图像batch进一步划分为20图像分割。当PyTorch异步启动CUDA操作时,该实现无需生成多个线程即可实现并发。

  1. class PipelineParallelResNet50(ModelParallelResNet50):
  2. def __init__(self, split_size=20, *args, **kwargs):
  3. super(PipelineParallelResNet50, self).__init__(*args, **kwargs)
  4. self.split_size = split_size
  5. def forward(self, x):
  6. splits = iter(x.split(self.split_size, dim=0))
  7. s_next = next(splits)
  8. s_prev = self.seq1(s_next).to('cuda:1')
  9. ret = []
  10. for s_next in splits:
  11. # A. s_prev runs on cuda:1
  12. s_prev = self.seq2(s_prev)
  13. ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))
  14. # B. s_next runs on cuda:0, which can run concurrently with A
  15. s_prev = self.seq1(s_next).to('cuda:1')
  16. s_prev = self.seq2(s_prev)
  17. ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))
  18. return torch.cat(ret)
  19. setup = "model = PipelineParallelResNet50()"
  20. pp_run_times = timeit.repeat(
  21. stmt, setup, number=1, repeat=num_repeat, globals=globals())
  22. pp_mean, pp_std = np.mean(pp_run_times), np.std(pp_run_times)
  23. plot([mp_mean, rn_mean, pp_mean],
  24. [mp_std, rn_std, pp_std],
  25. ['Model Parallel', 'Single GPU', 'Pipelining Model Parallel'],
  26. 'mp_vs_rn_vs_pp.png')

请注意,设备到设备的张量复制操作在源设备和目标设备上的当前线程上同步。如果创建多个线程,则必须确保复制操作正确同步。在完成复制操作之前写入源张量或读取/写入目标张量可能导致不确定的行为。上面的实现仅在源设备和目标设备上都使用默认线程,因此没有必要另外强制执行同步。

模型并行化最佳实践 - 图2

实验结果表明,对并行ResNet50进行建模的流水线输入可将训练过程大致加快3.75 / 2.51-1 = 49%。距离理想的100%加速还有很长的路要走。由于我们在管道并行实现中引入了新参数split_sizes,因此尚不清楚新参数如何影响整体训练时间。直观地讲,使用较小的split_size会导致许多小的CUDA内核启动,而使用较大的split_size结果会导致在第一次和最后一次拆分期间有较长的空闲时间。都不是最佳选择。对于此特定实验,可能会有最佳的split_size配置。让我们尝试通过使用几个不同的split_size值进行实验来找到它。

  1. means = []
  2. stds = []
  3. split_sizes = [1, 3, 5, 8, 10, 12, 20, 40, 60]
  4. for split_size in split_sizes:
  5. setup = "model = PipelineParallelResNet50(split_size=%d)" % split_size
  6. pp_run_times = timeit.repeat(
  7. stmt, setup, number=1, repeat=num_repeat, globals=globals())
  8. means.append(np.mean(pp_run_times))
  9. stds.append(np.std(pp_run_times))
  10. fig, ax = plt.subplots()
  11. ax.plot(split_sizes, means)
  12. ax.errorbar(split_sizes, means, yerr=stds, ecolor='red', fmt='ro')
  13. ax.set_ylabel('ResNet50 Execution Time (Second)')
  14. ax.set_xlabel('Pipeline Split Size')
  15. ax.set_xticks(split_sizes)
  16. ax.yaxis.grid(True)
  17. plt.tight_layout()
  18. plt.savefig("split_size_tradeoff.png")
  19. plt.close(fig)

模型并行化最佳实践 - 图3

结果表明,将split_size设置为12可获得最快的训练速度,从而导致3.75 / 2.43-1 = 54%的加速比。仍有机会进一步加快训练过程。例如,对cuda:0的所有操作都放在其默认线程上。这意味着下一个拆分的计算不能与上一个拆分的复制操作重叠。但是,由于上一个和下一个拆分是不同的张量,因此将一个的计算与另一个的赋值操作重叠是没有问题的。实现需要在两个GPU上使用多个线程,并且不同的子网络结构需要不同的线程管理策略。由于没有通用的多线程解决方案适用于所有模型并行化用例,因此在本教程中将不再讨论。

注:

这篇文章显示了几个性能指标。在您自己的计算机上运行相同的代码时,您可能会看到不同的数字,因为结果取决于底层的硬件和软件。为了使您的环境获得最佳性能,一种正确的方法是首先生成曲线以找出最佳split_size,然后将该split_size用于流水线输入。

脚本的总运行时间: (5分钟51.519秒)