分布式训练快速开始

准备工作

在本篇文章中,我们将会在介绍如何快速在一个集群中启动一个 PaddlePaddle的分布式训练任务,在开始之前,请按如下步骤做些准备工作:

  • 准备一个网络连通的训练集群,在本文中我们使用4个训练节点使用 *.paddlepaddle.com来表示节点的主机名称,您可以根据实际情况修改它。
  • 在开始之前确保已经阅读过 install_steps并且可以在集群的所有节点上可以正常运行 PaddlePaddle。

样例代码

下面使用一个非常简单的线性回归模型作为样例来解释如何启动一个包含2个 PSERVER 节点以及2个 TRAINER 节点的分布式训练任务,您可以将本段代码保存为 dist_train.py 运行。

  1. import os
  2. import paddle
  3. import paddle.fluid as fluid
  4.  
  5. # train reader
  6. BATCH_SIZE = 20
  7. EPOCH_NUM = 30
  8. BATCH_SIZE = 8
  9.  
  10. train_reader = paddle.batch(
  11. paddle.reader.shuffle(
  12. paddle.dataset.uci_housing.train(), buf_size=500),
  13. batch_size=BATCH_SIZE)
  14.  
  15. def train():
  16. y = fluid.layers.data(name='y', shape=[1], dtype='float32')
  17. x = fluid.layers.data(name='x', shape=[13], dtype='float32')
  18. y_predict = fluid.layers.fc(input=x, size=1, act=None)
  19.  
  20. loss = fluid.layers.square_error_cost(input=y_predict, label=y)
  21. avg_loss = fluid.layers.mean(loss)
  22. opt = fluid.optimizer.SGD(learning_rate=0.001)
  23. opt.minimize(avg_loss)
  24.  
  25. place = fluid.CPUPlace()
  26. feeder = fluid.DataFeeder(place=place, feed_list=[x, y])
  27. exe = fluid.Executor(place)
  28.  
  29. # fetch distributed training environment setting
  30. training_role = os.getenv("PADDLE_TRAINING_ROLE", None)
  31. port = os.getenv("PADDLE_PSERVER_PORT", "6174")
  32. pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
  33. trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
  34. eplist = []
  35. for ip in pserver_ips.split(","):
  36. eplist.append(':'.join([ip, port]))
  37. pserver_endpoints = ",".join(eplist)
  38. trainers = int(os.getenv("PADDLE_TRAINERS"))
  39. current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
  40.  
  41. t = fluid.DistributeTranspiler()
  42. t.transpile(
  43. trainer_id = trainer_id,
  44. pservers = pserver_endpoints,
  45. trainers = trainers)
  46.  
  47. if training_role == "PSERVER":
  48. pserver_prog = t.get_pserver_program(current_endpoint)
  49. startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
  50. exe.run(startup_prog)
  51. exe.run(pserver_prog)
  52. elif training_role == "TRAINER":
  53. trainer_prog = t.get_trainer_program()
  54. exe.run(fluid.default_startup_program())
  55.  
  56. for epoch in range(EPOCH_NUM):
  57. for batch_id, batch_data in enumerate(train_reader()):
  58. avg_loss_value, = exe.run(trainer_prog,
  59. feed=feeder.feed(batch_data),
  60. fetch_list=[avg_loss])
  61. if (batch_id + 1) % 10 == 0:
  62. print("Epoch: {0}, Batch: {1}, loss: {2}".format(
  63. epoch, batch_id, avg_loss_value[0]))
  64. # destory the resource of current trainer node in pserver server node
  65. exe.close()
  66. else:
  67. raise AssertionError("PADDLE_TRAINING_ROLE should be one of [TRAINER, PSERVER]")
  68.  
  69. train()

环境变量说明

在启动分布式训练任务时,使用不同的环境变量来表示不同的节点角色,具体如下:

环境变量数据类型样例描述
PADDLE_TRAINING_ROLEstrPSERVER,TRAINER当前训练节点角色
PADDLE_PSERVER_IPSstrps0.paddlepaddle.com,ps1.paddlepaddle.com分布式训练任务中所有 PSERVER 节点的 IP 地址或 hostname, 使用”,”分隔
PADDLE_PSERVER_PORTint6174PSERVER 进程监听的端口
PADDLE_TRAINERSint2分布式训练任务中 trainer 节点的数量
PADDLE_CURRENT_IPstrps0.paddlepaddle.com当前 PSERVER 节点的 IP 地址或 hostname
PADDLE_TRAINER_IDstr0当前 TRAINER 节点的 ID (唯一), 取值范围为 [0, PADDLE_TRAINERS)

注: 环境变量只是获取运行时信息的一种方式,实际任务中可以采用命令行参数等方式获取运行时信息。

分布式训练相关 API

DistributeTranspiler

基于 pserver-trainer 架构的的分布式训练任务分为两种角色: Parameter Server(PSERVER) 以及 TRAINER,在 Fluid 中,用户只需配置单机训练所需要的网络配置, DistributeTranspiler 模块会自动地根据当前训练节点的角色将用户配置的单机网路配置改写成 PSERVER 和 TRAINER 需要运行的网络配置:

  1. t = fluid.DistributeTranspiler()
  2. t.transpile(
  3. trainer_id = trainer_id,
  4. pservers = pserver_endpoints,
  5. trainers = trainers)
  6. if PADDLE_TRAINING_ROLE == "TRAINER":
  7. # fetch the trainer program and execute it
  8. trainer_prog = t.get_trainer_program()
  9. ...
  10.  
  11. elif PADDLE_TRAINER_ROLE == "PSERVER":
  12. # fetch the pserver program and execute it
  13. pserver_prog = t.get_pserver_program(current_endpoint)
  14. ...

exe.close()

PSERVER 节点中会保存所有 TRAINER 节点的状态信息,在 TRAINER 结束训练时需要调用 exe.close()通知所有 PSERVER 节点释放当前 TRAINER 节点的资源:

  1. exe = fluid.Executor(fluid.CPUPlace())
  2. # training process ...
  3. exe.close() # notify PServer to destory the resource

启动分布式训练任务

启动节点启动命令说明
ps0.paddlepaddle.comPADDLE_TRAINING_ROLE=PSERVER PADDLE_CURRENT_IP=ps0.paddlepaddle.com PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_PSERVER_PORT=6174 python fluid_dist.py启动 PSERVER 节点
ps1.paddlepaddle.comPADDLE_TRAINING_ROLE=PSERVER PADDLE_CURRENT_IP=ps1.paddlepaddle.com PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_PSERVER_PORT=6174 python fluid_dist.py启动 PSERVER 节点
trainer0.paddlepaddle.comPADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=0 PADDLE_PSERVER_PORT=6174 python fluid_dist.py启动第0号 TRAINER 节点
trainer1.paddlepaddle.comPADDLE_TRAINING_ROLE=TRAINER PADDLE_PSERVER_IPS=ps0.paddlepaddle.com,ps1.paddlepaddle.com PADDLE_TRAINERS=2 PADDLE_TRAINER_ID=1 PADDLE_PSERVER_PORT=6174 python fluid_dist.py启动第1号 TRAINER 节点