分布式训练快速开始

使用Fleet API进行分布式训练

从PaddlePaddle Release 1.5.1 开始,官方推荐使用Fleet API进行分布式训练。

准备条件

  • [x] 成功安装PaddlePaddle,如果尚未安装,请参考 快速开始

  • [x] 学会最基本的单机训练方法,请参考 单机训练 中描述的单机训练,进行学习

点击率预估任务

本文使用一个简单的示例,点击率预估任务,来说明如何使用Fleet API进行分布式训练的配置方法,并利用单机环境模拟分布式环境给出运行示例。

为了方便学习,这里给出的示例是单机与多机混合的代码,用户可以通过不同的启动命令进行单机或多机任务的启动。

  1. from __future__ import print_function
  2. from args import parse_args
  3. import os
  4. import sys
  5. import paddle
  6. import paddle.distributed.fleet.base.role_maker as role_maker
  7. import paddle.distributed.fleet as fleet
  8. from network_conf import ctr_dnn_model_dataset
  9. dense_feature_dim = 13
  10. sparse_feature_dim = 10000001
  11. batch_size = 100
  12. thread_num = 10
  13. embedding_size = 10
  14. args = parse_args()
  15. def main_function(is_local):
  16. # common code for local training and distributed training
  17. dense_input = paddle.static.data(
  18. name="dense_input", shape=[dense_feature_dim], dtype='float32')
  19. sparse_input_ids = [
  20. paddle.static.data(name="C" + str(i), shape=[1], lod_level=1,
  21. dtype="int64") for i in range(1, 27)]
  22. label = paddle.static.data(name="label", shape=[1], dtype="int64")
  23. dataset = paddle.distributed.QueueDataset()
  24. dataset.init(
  25. batch_size=batch_size,
  26. thread_num=thread_num,
  27. input_type=0,
  28. pipe_command=python criteo_reader.py %d" % sparse_feature_dim,
  29. use_var=[dense_input] + sparse_input_ids + [label])
  30. whole_filelist = ["raw_data/part-%d" % x
  31. for x in range(len(os.listdir("raw_data")))]
  32. dataset.set_filelist(whole_filelist)
  33. loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(
  34. dense_input, sparse_input_ids, label, embedding_size,
  35. sparse_feature_dim)
  36. exe = paddle.static.Executor(paddle.CPUPlace())
  37. def train_loop(epoch=20):
  38. for i in range(epoch):
  39. exe.train_from_dataset(program=paddle.static.default_main_program(),
  40. dataset=dataset,
  41. fetch_list=[auc_var],
  42. fetch_info=["auc"],
  43. debug=False)
  44. # local training
  45. def local_train():
  46. optimizer = paddle.optimizer.SGD(learning_rate=1e-4)
  47. optimizer.minimize(loss)
  48. exe.run(paddle.static.default_startup_program())
  49. train_loop()
  50. # distributed training
  51. def dist_train():
  52. role = role_maker.PaddleCloudRoleMaker()
  53. fleet.init(role)
  54. strategy = paddle.distributed.fleet.DistributedStrategy()
  55. strategy.a_sync = True
  56. optimizer = paddle.optimizer.SGD(learning_rate=1e-4)
  57. optimizer = fleet.distributed_optimizer(optimizer, strategy)
  58. optimizer.minimize(loss)
  59. if fleet.is_server():
  60. fleet.init_server()
  61. fleet.run_server()
  62. elif fleet.is_worker():
  63. fleet.init_worker()
  64. exe.run(paddle.static.default_startup_program())
  65. train_loop()
  66. if is_local:
  67. local_train()
  68. else:
  69. dist_train()
  70. if __name__ == '__main__':
  71. main_function(args.is_local)
  • 说明:示例中使用的IO方法是dataset,想了解具体的文档和用法请参考 Dataset API

单机训练启动命令

  1. python train.py --is_local 1

单机模拟分布式训练的启动命令

在单机模拟多机训练的启动命令,这里我们用到了paddle内置的一个启动器launch_ps,用户可以指定worker和server的数量进行参数服务器任务的启动

  1. python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 train.py

任务运行的日志在工作目录的logs目录下可以查看,当您能够使用单机模拟分布式训练。