分布式并行训练

概述

MindSpore支持数据并行及自动并行。自动并行是MindSpore融合了数据并行、模型并行及混合并行的一种分布式并行模式,可以自动建立代价模型,为用户选择一种并行模式。

其中:

  • 数据并行(Data Parallel):对数据batch维度切分的一种并行模式。

  • 模型并行(Layerwise Parallel):对参数channel维度切分的一种并行模式。

  • 混合并行(Hybrid Parallel):涵盖数据并行和模型并行的一种并行模式。

  • 代价模型(Cost Model):同时考虑内存的计算代价和通信代价对训练时间建模,并设计了高效的算法来找到训练时间较短的并行策略。

本篇教程我们主要了解如何在MindSpore上通过数据并行及自动并行模式训练ResNet-50网络。样例代码请参考 https://gitee.com/mindspore/docs/blob/master/tutorials/tutorial_code/distributed_training/resnet50_distributed_training.py

当前样例面向Ascend AI处理器。

准备环节

配置分布式环境变量

在实验室环境进行分布式训练时,需要配置当前多卡环境的组网信息文件。如果使用华为云环境,可以跳过本小节。

以Ascend 910 AI处理器、1980 AIServer为例,一个两卡环境的json配置文件示例如下,本样例将该配置文件命名为rank_table.json。

  1. Copy{
  2. "board_id": "0x0000",
  3. "chip_info": "910",
  4. "deploy_mode": "lab",
  5. "group_count": "1",
  6. "group_list": [
  7. {
  8. "device_num": "2",
  9. "server_num": "1",
  10. "group_name": "",
  11. "instance_count": "2",
  12. "instance_list": [
  13. {"devices":[{"device_id":"0","device_ip":"192.1.27.6"}],"rank_id":"0","server_id":"10.155.111.140"},
  14. {"devices":[{"device_id":"1","device_ip":"192.2.27.6"}],"rank_id":"1","server_id":"10.155.111.140"}
  15. ]
  16. }
  17. ],
  18. "para_plane_nic_location": "device",
  19. "para_plane_nic_name": [
  20. "eth0", "eth1"
  21. ],
  22. "para_plane_nic_num": "2",
  23. "status": "completed"
  24. }

其中需要根据实际训练环境修改的参数项有:

  • server_num表示机器数量, server_id表示本机IP地址。

  • device_numpara_plane_nic_numinstance_count表示卡的数量。

  • rank_id表示卡逻辑序号,固定从0开始编号,device_id表示卡物理序号,即卡所在机器中的实际序号。

  • device_ip表示网卡IP地址,可以在当前机器执行指令cat /etc/hccn.conf获取网卡IP地址。

  • para_plane_nic_name对应网卡名称。

组网信息文件准备好后,将文件路径加入环境变量MINDSPORE_HCCL_CONFIG_PATH中。此外需要将device_id信息传入脚本中,本样例通过配置环境变量DEVICE_ID的方式传入。

  1. Copyexport MINDSPORE_HCCL_CONFIG_PATH="./rank_table.json"
  2. export DEVICE_ID=0

调用集合通信库

我们需要在context.set_context()接口中使能分布式接口enable_hccl,设置device_id参数,并通过调用init()完成初始化操作。

在样例中,我们指定运行时使用图模式,在Ascend AI处理器上,使用华为集合通信库Huawei Collective Communication Library(以下简称HCCL)。

  1. Copyimport os
  2. from mindspore import context
  3. from mindspore.communication.management import init
  4.  
  5. if __name__ == "__main__":
  6. context.set_context(mode=context.GRAPH_MODE, device_target="Ascend", enable_hccl=True, device_id=int(os.environ["DEVICE_ID"]))
  7. init()
  8. ...

mindspore.communication.management中封装了HCCL提供的集合通信接口,方便用户获取分布式信息。常用的包括get_rankget_group_size,分别对应当前设备在集群中的ID和集群数量。

HCCL实现了基于Davinci架构芯片的多机多卡通信。当前使用分布式服务存在如下约束:

  1. 单机场景下支持1、2、4、8卡设备集群,多机场景下支持8*n卡设备集群。

  2. 每台机器的0-3卡和4-7卡各为一个组网,2卡和4卡训练时网卡必须相连且不支持跨组网创建集群。

  3. 操作系统需使用SMP (symmetric multiprocessing)处理模式。

加载数据集

分布式训练时,数据是以数据并行的方式导入的。下面我们以Cifar10Dataset为例,介绍以数据并行方式导入CIFAR-10数据集的方法,data_path是指数据集的路径。与单机不同的是,在数据集接口需要传入num_shardsshard_id参数,分别对应网卡数量和逻辑序号,建议通过HCCL接口获取。

  1. Copyimport mindspore.common.dtype as mstype
  2. import mindspore.dataset as ds
  3. import mindspore.dataset.transforms.c_transforms as C
  4. import mindspore.dataset.transforms.vision.c_transforms as vision
  5. from mindspore.communication.management import get_rank, get_group_size
  6.  
  7. def create_dataset(repeat_num=1, batch_size=32, rank_id=0, rank_size=1):
  8. resize_height = 224
  9. resize_width = 224
  10. rescale = 1.0 / 255.0
  11. shift = 0.0
  12.  
  13. # get rank_id and rank_size
  14. rank_id = get_rank()
  15. rank_size = get_group_size()
  16. data_set = ds.Cifar10Dataset(data_path, num_shards=rank_size, shard_id=rank_id)
  17.  
  18. # define map operations
  19. random_crop_op = vision.RandomCrop((32, 32), (4, 4, 4, 4))
  20. random_horizontal_op = vision.RandomHorizontalFlip()
  21. resize_op = vision.Resize((resize_height, resize_width))
  22. rescale_op = vision.Rescale(rescale, shift)
  23. normalize_op = vision.Normalize((0.4465, 0.4822, 0.4914), (0.2010, 0.1994, 0.2023))
  24. changeswap_op = vision.HWC2CHW()
  25. type_cast_op = C.TypeCast(mstype.int32)
  26.  
  27. c_trans = [random_crop_op, random_horizontal_op]
  28. c_trans += [resize_op, rescale_op, normalize_op, changeswap_op]
  29.  
  30. # apply map operations on images
  31. data_set = data_set.map(input_columns="label", operations=type_cast_op)
  32. data_set = data_set.map(input_columns="image", operations=c_trans)
  33.  
  34. # apply repeat operations
  35. data_set = data_set.repeat(repeat_num)
  36.  
  37. # apply shuffle operations
  38. data_set = data_set.shuffle(buffer_size=10)
  39.  
  40. # apply batch operations
  41. data_set = data_set.batch(batch_size=batch_size, drop_remainder=True)
  42.  
  43. return data_set

定义网络

数据并行及自动并行模式下,网络定义方式与单机一致。代码请参考: https://gitee.com/mindspore/docs/blob/master/tutorials/tutorial_code/resnet/resnet.py

定义损失函数及优化器

定义损失函数

在Loss部分,我们采用SoftmaxCrossEntropyWithLogits的展开形式,即按照数学公式,将其展开为多个小算子进行实现。相较于融合loss,自动并行以展开loss中的算子为粒度,通过算法搜索得到最优并行策略。

  1. Copyfrom mindspore.ops import operations as P
  2. from mindspore import Tensor
  3. import mindspore.ops.functional as F
  4. import mindspore.common.dtype as mstype
  5. import mindspore.nn as nn
  6.  
  7. class SoftmaxCrossEntropyExpand(nn.Cell):
  8. def __init__(self, sparse=False):
  9. super(SoftmaxCrossEntropyExpand, self).__init__()
  10. self.exp = P.Exp()
  11. self.sum = P.ReduceSum(keep_dims=True)
  12. self.onehot = P.OneHot()
  13. self.on_value = Tensor(1.0, mstype.float32)
  14. self.off_value = Tensor(0.0, mstype.float32)
  15. self.div = P.Div()
  16. self.log = P.Log()
  17. self.sum_cross_entropy = P.ReduceSum(keep_dims=False)
  18. self.mul = P.Mul()
  19. self.mul2 = P.Mul()
  20. self.mean = P.ReduceMean(keep_dims=False)
  21. self.sparse = sparse
  22. self.max = P.ReduceMax(keep_dims=True)
  23. self.sub = P.Sub()
  24.  
  25. def construct(self, logit, label):
  26. logit_max = self.max(logit, -1)
  27. exp = self.exp(self.sub(logit, logit_max))
  28. exp_sum = self.sum(exp, -1)
  29. softmax_result = self.div(exp, exp_sum)
  30. if self.sparse:
  31. label = self.onehot(label, F.shape(logit)[1], self.on_value, self.off_value)
  32. softmax_result_log = self.log(softmax_result)
  33. loss = self.sum_cross_entropy((self.mul(softmax_result_log, label)), -1)
  34. loss = self.mul2(F.scalar_to_array(-1.0), loss)
  35. loss = self.mean(loss, -1)
  36.  
  37. return loss

定义优化器

采用Momentum优化器作为参数更新工具,这里定义与单机一致。

  1. Copyfrom mindspore.nn.optim.momentum import Momentum
  2. lr = 0.01
  3. momentum = 0.9
  4. opt = Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), lr, momentum)

训练网络

context.set_auto_parallel_context()是提供给用户设置并行参数的接口。主要参数包括:

  • parallel_mode:分布式并行模式。可选数据并行ParallelMode.DATA_PARALLEL及自动并行ParallelMode.AUTO_PARALLEL

  • mirror_mean: 反向计算时,框架内部会将数据并行参数分散在多台机器的梯度进行收集,得到全局梯度值后再传入优化器中更新。设置为True对应allreduce_mean操作,False对应allreduce_sum操作。

在下面的样例中我们指定并行模式为自动并行,其中dataset_sink_mode=False表示采用数据非下沉模式,LossMonitor能够通过回调函数返回loss值。

  1. Copyfrom mindspore.nn.optim.momentum import Momentum
  2. from mindspore.train.callback import LossMonitor
  3. from mindspore.train.model import Model, ParallelMode
  4. from resnet import resnet50
  5.  
  6. def test_train_cifar(num_classes=10, epoch_size=10):
  7. context.set_auto_parallel_context(parallel_mode=ParallelMode.AUTO_PARALLEL, mirror_mean=True)
  8. loss_cb = LossMonitor()
  9. dataset = create_dataset(epoch_size)
  10. net = resnet50(32, num_classes)
  11. loss = SoftmaxCrossEntropyExpand(sparse=True)
  12. opt = Momentum(filter(lambda x: x.requires_grad, net.get_parameters()), 0.01, 0.9)
  13. model = Model(net, loss_fn=loss, optimizer=opt)
  14. model.train(epoch_size, dataset, callbacks=[loss_cb], dataset_sink_mode=False)

运行测试用例

目前MindSpore分布式执行采用单卡单进程运行方式,进程数量应当与卡的使用数量保持一致。每个进程创建一个目录,用来保存日志信息以及算子编译信息。下面以一个2卡分布式训练的运行脚本为例:

  1. Copy #!/bin/bash
  2.  
  3. export MINDSPORE_HCCL_CONFIG_PATH=./rank_table.json
  4. export RANK_SIZE=2
  5. for((i=0;i<$RANK_SIZE;i++))
  6. do
  7. mkdir device$i
  8. cp ./resnet50_distributed_training.py ./device$i
  9. cd ./device$i
  10. export RANK_ID=$i
  11. export DEVICE_ID=$i
  12. echo "start training for device $i"
  13. env > env$i.log
  14. pytest -s -v ./resnet50_distributed_training.py > log$i 2>&1 &
  15. cd ../
  16. done