Distributed Training with the Fleet API

Fleet API Design Notes

Fleet is PaddlePaddle's high-level API for distributed training. The name comes from the image of a fleet in which many small rowing boats work together. Fleet's design strikes a balance between ease of use and algorithmic extensibility: a single-machine training program can be switched to distributed training by adding only a few lines of code, while distributed training algorithms can still be defined flexibly through the Fleet API interfaces. For the underlying design principles, please refer to the Fleet API design document. The Fleet API currently lives under paddle.fluid.incubate; once its functionality is complete it will be moved into paddle.fluid, so stay tuned.

Fleet API Quick Start Examples

The following uses a single model to illustrate the two most common Fleet API usage scenarios, giving users a template they can start from quickly. The source code of the quick-start examples can be found in Fleet Quick Start.

  • Suppose we define an MLP network as follows:

    import paddle.fluid as fluid

    def mlp(input_x, input_y, hid_dim=128, label_dim=2):
        # two hidden fully connected layers with tanh activation
        fc_1 = fluid.layers.fc(input=input_x, size=hid_dim, act='tanh')
        fc_2 = fluid.layers.fc(input=fc_1, size=hid_dim, act='tanh')
        # softmax output layer, cross-entropy loss averaged over the batch
        prediction = fluid.layers.fc(input=[fc_2], size=label_dim, act='softmax')
        cost = fluid.layers.cross_entropy(input=prediction, label=input_y)
        avg_cost = fluid.layers.mean(x=cost)
        return avg_cost
  • Define a reader that generates data in memory as follows:

    import numpy as np

    def gen_data():
        # one random batch: 128 samples of 32-dim float features with binary labels
        return {"x": np.random.random(size=(128, 32)).astype('float32'),
                "y": np.random.randint(2, size=(128, 1)).astype('int64')}
  • Single-machine trainer definition

    import paddle.fluid as fluid
    from nets import mlp
    from utils import gen_data

    input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
    input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')

    cost = mlp(input_x, input_y)
    optimizer = fluid.optimizer.SGD(learning_rate=0.01)
    optimizer.minimize(cost)
    place = fluid.CUDAPlace(0)

    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())
    step = 1001
    for i in range(step):
        cost_val = exe.run(feed=gen_data(), fetch_list=[cost.name])
        print("step%d cost=%f" % (i, cost_val[0]))
  • Parameter Server training

The parameter server approach is well suited to parallel training of simple models on large-scale data. Building on the single-machine model definition above, an example of training with a Parameter Server is given below:

    import paddle.fluid as fluid
    from nets import mlp
    from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
    from paddle.fluid.incubate.fleet.base import role_maker
    from utils import gen_data

    input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
    input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')

    cost = mlp(input_x, input_y)
    optimizer = fluid.optimizer.SGD(learning_rate=0.01)

    # obtain the role (worker or server) of the current node from the environment
    role = role_maker.PaddleCloudRoleMaker()
    fleet.init(role)
    # wrap the single-machine optimizer into a distributed one
    optimizer = fleet.distributed_optimizer(optimizer)
    optimizer.minimize(cost)

    if fleet.is_server():
        fleet.init_server()
        fleet.run_server()
    elif fleet.is_worker():
        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(fluid.default_startup_program())
        step = 1001
        for i in range(step):
            cost_val = exe.run(
                program=fluid.default_main_program(),
                feed=gen_data(),
                fetch_list=[cost.name])
            print("worker_index: %d, step%d cost = %f" %
                  (fleet.worker_index(), i, cost_val[0]))
  • Collective training

Collective training is typically used for multi-node multi-GPU training and is most common for complex models. Building on the single-machine model definition above, an example of distributed training with the Collective method is given below:

    import paddle.fluid as fluid
    from nets import mlp
    from paddle.fluid.incubate.fleet.collective import fleet
    from paddle.fluid.incubate.fleet.base import role_maker
    from utils import gen_data

    input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
    input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')

    cost = mlp(input_x, input_y)
    optimizer = fluid.optimizer.SGD(learning_rate=0.01)
    # is_collective=True marks this run as collective (GPU) training
    role = role_maker.PaddleCloudRoleMaker(is_collective=True)
    fleet.init(role)

    optimizer = fleet.distributed_optimizer(optimizer)
    optimizer.minimize(cost)
    place = fluid.CUDAPlace(0)

    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())
    step = 1001
    for i in range(step):
        cost_val = exe.run(
            program=fluid.default_main_program(),
            feed=gen_data(),
            fetch_list=[cost.name])
        print("worker_index: %d, step%d cost = %f" %
              (fleet.worker_index(), i, cost_val[0]))

More usage examples

Click-through rate (CTR) prediction

Semantic matching

Vector (embedding) learning

Image classification with ResNet50

Machine translation with Transformer

Semantic representation learning with BERT

Fleet API interface reference

Fleet API interfaces

  • init(role_maker=None)
    • Initializes fleet. It must be called before any other fleet interface and defines the multi-node environment configuration.
  • is_worker()
    • Used in Parameter Server training. Returns True if the current node is a worker node, otherwise False.
  • is_server()
    • Used in Parameter Server training. Returns True if the current node is a server node, otherwise False.
  • init_server(model_dir=None)
    • Used in Parameter Server training. Initializes the parameter server; if model_dir is given, fleet loads the model parameters saved there before serving.
  • run_server()
    • Used in Parameter Server training to start the server-side service.
  • init_worker()
    • Used in Parameter Server training to start the worker-side service.
  • stop_worker()
    • Stops the worker after training has finished.
  • distributed_optimizer(optimizer, strategy=None)
    • Decorator for distributed optimization algorithms: it takes a single-machine optimizer and an optional distributed training strategy, and returns a distributed optimizer (a call-sequence sketch follows this list).
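
Putting these interfaces together, a Parameter Server program typically calls them in the order sketched below. This is only a minimal illustration based on the interface list above: the tiny network and the random feed data are placeholders, and the exact position of init_worker() relative to running the startup program may vary between Paddle versions.

    import numpy as np
    import paddle.fluid as fluid
    from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
    from paddle.fluid.incubate.fleet.base import role_maker

    # placeholder network: a single softmax layer with cross-entropy loss
    x = fluid.layers.data(name="x", shape=[32], dtype='float32')
    y = fluid.layers.data(name="y", shape=[1], dtype='int64')
    prediction = fluid.layers.fc(input=x, size=2, act='softmax')
    cost = fluid.layers.mean(fluid.layers.cross_entropy(input=prediction, label=y))

    fleet.init(role_maker.PaddleCloudRoleMaker())      # 1. set up the distributed environment
    optimizer = fleet.distributed_optimizer(           # 2. wrap the single-machine optimizer
        fluid.optimizer.SGD(learning_rate=0.01))
    optimizer.minimize(cost)

    if fleet.is_server():                              # 3a. server branch
        fleet.init_server()                            #     or init_server(model_dir) to warm-start
        fleet.run_server()
    elif fleet.is_worker():                            # 3b. worker branch
        fleet.init_worker()                            #     start the worker-side service
        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        for i in range(10):
            feed = {"x": np.random.random((16, 32)).astype('float32'),
                    "y": np.random.randint(2, size=(16, 1)).astype('int64')}
            exe.run(fluid.default_main_program(), feed=feed, fetch_list=[cost.name])
        fleet.stop_worker()                            # 4. shut down the worker after training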

RoleMaker

  • MPISymetricRoleMaker

    • Description: MPISymetricRoleMaker assumes that every node launches two processes, one worker plus one pserver. This RoleMaker requires an MPI environment on the cluster.

    • Example:

      from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
      from paddle.fluid.incubate.fleet.base import role_maker

      role = role_maker.MPISymetricRoleMaker()
      fleet.init(role)

    • Launch method:

      mpirun -np 2 python trainer.py
  • PaddleCloudRoleMaker

    • Description: PaddleCloudRoleMaker is a high-level wrapper that supports launching with the paddle.distributed.launch and paddle.distributed.launch_ps scripts.

    • Parameter Server training example:

      from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
      from paddle.fluid.incubate.fleet.base import role_maker

      role = role_maker.PaddleCloudRoleMaker()
      fleet.init(role)

    • Launch method:

      python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 trainer.py

    • Collective training example:

      from paddle.fluid.incubate.fleet.collective import fleet
      from paddle.fluid.incubate.fleet.base import role_maker

      role = role_maker.PaddleCloudRoleMaker(is_collective=True)
      fleet.init(role)

    • Launch method:

      python -m paddle.distributed.launch trainer.py
  • UserDefinedRoleMaker

    • Description: the user specifies each node's role, IP, and port information explicitly.

    • Example:

      import os
      from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
      from paddle.fluid.incubate.fleet.base import role_maker

      # pserver_endpoints: a list of "ip:port" strings, one per server node
      role = role_maker.UserDefinedRoleMaker(
          current_id=int(os.getenv("CURRENT_ID")),
          role=role_maker.Role.WORKER if bool(int(os.getenv("IS_WORKER")))
               else role_maker.Role.SERVER,
          worker_num=int(os.getenv("WORKER_NUM")),
          server_endpoints=pserver_endpoints)
      fleet.init(role)
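
UserDefinedRoleMaker has no dedicated launch tool: each process is started by hand with its role information passed in, for example through the environment variables used above. The launcher below is only a hypothetical local illustration; the ports, the PSERVER_ENDPOINTS variable, and the assumption that trainer.py builds its pserver_endpoints list from that variable are not part of the original example.

    import os
    import subprocess

    # illustrative endpoints for two local parameter servers
    pserver_endpoints = "127.0.0.1:36001,127.0.0.1:36002"
    server_num, worker_num = 2, 2

    def launch(current_id, is_worker):
        # start one trainer.py process with its role described via environment variables
        env = dict(os.environ,
                   CURRENT_ID=str(current_id),
                   IS_WORKER="1" if is_worker else "0",
                   WORKER_NUM=str(worker_num),
                   PSERVER_ENDPOINTS=pserver_endpoints)
        return subprocess.Popen(["python", "trainer.py"], env=env)

    # start the server processes first, then the workers
    procs = [launch(i, is_worker=False) for i in range(server_num)]
    procs += [launch(i, is_worker=True) for i in range(worker_num)]

    for p in procs:
        p.wait()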

Strategy

  • Parameter Server Training
    • Sync_mode
  • Collective Training
    • LocalSGD
    • ReduceGrad
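
As a sketch of how a strategy is passed to distributed_optimizer, the snippet below turns on synchronous updates (Sync_mode) for Parameter Server training through a DistributeTranspilerConfig object. The strategy class accepted here, and its field names, can differ between Paddle versions, so treat this as an illustration under that assumption rather than as a reference.

    import paddle.fluid as fluid
    from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
    from paddle.fluid.incubate.fleet.base import role_maker
    from paddle.fluid.transpiler import DistributeTranspilerConfig

    # configuration object controlling Parameter Server training behaviour
    strategy = DistributeTranspilerConfig()
    strategy.sync_mode = True        # synchronous parameter updates (Sync_mode)

    fleet.init(role_maker.PaddleCloudRoleMaker())
    optimizer = fluid.optimizer.SGD(learning_rate=0.01)
    optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)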

Fleet Mode

  • Parameter Server Training

    from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet

  • Collective Training

    from paddle.fluid.incubate.fleet.collective import fleet