流水并行训练

常见的分布式并行策略 一文中介绍了流水并行的特点。

在 OneFlow 的 一致性视角 下,通过简单的设置 Tensor 的 placement 属性,就可以实现流水并行。

以下代码是简单的示范,它将 快速上手 中的网络,以流水并行的方式运行。前几层的 Module nn.Flattennn.Linear(28*28, 512)nn.ReLU() 在 GPU0 上运行;剩余的网络部分在 GPU1 上运行。

Code

  1. import oneflow as flow
  2. BATCH_SIZE = 16
  3. BROADCAST = [flow.sbp.broadcast]
  4. P0 = flow.placement("cuda", {0: [0]})
  5. P1 = flow.placement("cuda", {0: [1]})
  6. class Stage0Module(flow.nn.Module):
  7. def __init__(self):
  8. super().__init__()
  9. self.flatten = flow.nn.Flatten()
  10. self.linear0 = flow.nn.Linear(28*28, 512)
  11. self.relu0 = flow.nn.ReLU()
  12. def forward(self, x):
  13. out = self.flatten(x)
  14. out = self.linear0(out)
  15. out = self.relu0(out)
  16. return out
  17. class Stage1Module(flow.nn.Module):
  18. def __init__(self):
  19. super().__init__()
  20. self.linear1 = flow.nn.Linear(512, 512)
  21. self.relu1 = flow.nn.ReLU()
  22. self.linear2 = flow.nn.Linear(512, 10)
  23. self.relu2 = flow.nn.ReLU()
  24. def forward(self, x):
  25. out = self.linear1(x)
  26. out = self.relu1(out)
  27. out = self.linear2(out)
  28. out = self.relu2(out)
  29. return out
  30. class PipelineModule(flow.nn.Module):
  31. def __init__(self):
  32. super().__init__()
  33. self.m_stage0 = Stage0Module()
  34. self.m_stage1 = Stage1Module()
  35. self.m_stage0.to_consistent(placement=P0, sbp=BROADCAST)
  36. self.m_stage1.to_consistent(placement=P1, sbp=BROADCAST)
  37. def forward(self, x):
  38. out_stage0 = self.m_stage0(x)
  39. in_stage1 = out_stage0.to_consistent(placement=P1, sbp=BROADCAST)
  40. out_stage1 = self.m_stage1(in_stage1)
  41. return out_stage1
  42. module_pipeline = PipelineModule()
  43. sgd = flow.optim.SGD(module_pipeline.parameters(), lr=0.001)
  44. class PipelineGraph(flow.nn.Graph):
  45. def __init__(self):
  46. super().__init__()
  47. self.module_pipeline = module_pipeline
  48. self.module_pipeline.m_stage0.config.stage_id = 0
  49. self.module_pipeline.m_stage1.config.stage_id = 1
  50. self.loss_fn = flow.nn.CrossEntropyLoss()
  51. self.config.set_gradient_accumulation_steps(2)
  52. self.add_optimizer(sgd)
  53. def build(self, x, y):
  54. out = self.module_pipeline(x)
  55. loss = self.loss_fn(out, y)
  56. loss.backward()
  57. return loss
  58. graph_pipeline = PipelineGraph()
  59. x = flow.randn(BATCH_SIZE, 1, 28, 28)
  60. x = x.to_consistent(P0, BROADCAST)
  61. y = flow.randint(0, 10, (BATCH_SIZE,))
  62. y = y.to_consistent(P1, BROADCAST)
  63. for i in range(20):
  64. loss = graph_pipeline(x, y)
  65. print(loss.to_local())

以上代码,保存为脚本(如 pipeline.py)后,使用 launch 模块启动分布式训练

  1. python3 -m oneflow.distributed.launch --nproc_per_node 2 ./pipeline.py

代码解读

设置 placement 与 sbp

将需要使用的 placement 与 sbp 设置提前准备好:

  1. BROADCAST = [flow.sbp.broadcast]
  2. P0 = flow.placement("cuda", {0: [0]})
  3. P1 = flow.placement("cuda", {0: [1]})

P0P1 分别代表第0号机器上的第0个 GPU 和第1个 GPU。

通过调用 nn.Module.to_consistentTensor.to_consistent 就可以将模型或张量分配到指定的计算设备上运行,将一个网络拆分为多个流水阶段(stage)。

在此我们定义了一个 PipelineModule 专门设置各阶段的流水。

  1. class PipelineModule(flow.nn.Module):
  2. def __init__(self):
  3. #...
  4. self.m_stage0.to_consistent(placement=P0, sbp=BROADCAST)
  5. self.m_stage1.to_consistent(placement=P1, sbp=BROADCAST)
  6. def forward(self, x):
  7. out_stage0 = self.m_stage0(x)
  8. in_stage1 = out_stage0.to_consistent(placement=P1, sbp=BROADCAST)
  9. out_stage1 = self.m_stage1(in_stage1)
  10. return out_stage1

Local Tensor 与 Consistent Tensor 的转换

示例中使用了随机生成的数据作为输入。

  1. x = flow.randn(BATCH_SIZE, 1, 28, 28)
  2. x = x.to_consistent(P0, BROADCAST)

当使用 launch 模块启动训练时,因为命令行参数为 --nproc_per_node 2launch 会启动 2 个进程。两个进程均为执行脚本中的代码。

其中 x = flow.randn(BATCH_SIZE, 1, 28, 28) 返回的是 Local Tensor(只在本进程中有效的本地数据),当运行 x = x.to_consistent(P0, BROADCAST) 时,OneFlow 会自动将所有进程中的 Local Tensor 整合为 Consistent Tensor。

在实际训练中,各个计算设备也可以加载属于各自的本地数据,然后通过 to_consistent 实现 Local Tensor 到 Consistent Tensor 的转化。

Stage ID 及 梯度累积设置

通过设置 Module 的 config.stage_id 属性,设置 Stage ID,Stage ID 从0开始编号,依次加1。 调用 self.config.set_gradient_accumulation_steps 方法,设置梯度累积的步长。 OneFlow 通过这两项配置,获取实现流水并行中的 micro batch 技术所需的信息。

  1. self.module_pipeline.m_stage0.config.stage_id = 0
  2. self.module_pipeline.m_stage1.config.stage_id = 1
  3. self.config.set_gradient_accumulation_steps(2)

为正常使用来必力评论功能请激活JavaScript