异步数据读取

除同步Feed方式外,我们提供了DataLoader。DataLoader的性能比 同步数据读取 更好,因为DataLoader的数据读取和模型训练过程是异步进行的,且能与 double_buffer_reader 配合以进一步提高数据读取性能。此外, double_buffer_reader 负责异步完成CPU Tensor到GPU Tensor的转换,一定程度上提升了数据读取效率。

创建DataLoader对象

创建DataLoader对象的方式为:

  1. import paddle.fluid as fluid
  2.  
  3. image = fluid.data(name='image', dtype='float32', shape=[None, 784])
  4. label = fluid.data(name='label', dtype='int64', shape=[None, 1])
  5.  
  6. ITERABLE = True
  7.  
  8. data_loader = fluid.io.DataLoader.from_generator(
  9. feed_list=[image, label], capacity=64, use_double_buffer=True, iterable=ITERABLE)

其中,

  • feed_list为需要输入的数据层变量列表;
  • capacity为DataLoader对象的缓存区大小,单位为batch数量;
  • use_double_buffer默认为True,表示使用 double_buffer_reader 。建议开启,可提升数据读取速度;
  • iterable默认为True,表示该DataLoader对象是可For-Range迭代的。推荐设置iterable=True。当iterable=True时,DataLoader与Program解耦,定义DataLoader对象不会改变Program;当iterable=False时,DataLoader会在Program中插入数据读取相关的op。

需要注意的是:Program.clone() (参见 Program )不能实现DataLoader对象的复制。如果您要创建多个不同DataLoader对象(例如训练和预测阶段需创建两个不同的DataLoader),则需重定义两个DataLoader对象。 若需要共享训练阶段和测试阶段的模型参数,您可以通过 fluid.unique_name.guard() 的方式来实现。 注:Paddle采用变量名区分不同变量,且变量名是根据 unique_name 模块中的计数器自动生成的,每生成一个变量名计数值加1。 fluid.unique_name.guard() 的作用是重置 unique_name 模块中的计数器,保证多次调用 fluid.unique_name.guard() 配置网络时对应变量的变量名相同,从而实现参数共享。

下面是一个使用DataLoader配置训练阶段和测试阶段网络的例子:

  1. import paddle
  2. import paddle.fluid as fluid
  3. import paddle.dataset.mnist as mnist
  4.  
  5. def network():
  6. image = fluid.data(name='image', dtype='float32', shape=[None, 784])
  7. label = fluid.data(name='label', dtype='int64', shape=[None, 1])
  8. loader = fluid.io.DataLoader.from_generator(feed_list=[image, label], capacity=64)
  9.  
  10. # Definition of models
  11. fc = fluid.layers.fc(image, size=10)
  12. xe = fluid.layers.softmax_with_cross_entropy(fc, label)
  13. loss = fluid.layers.reduce_mean(xe)
  14. return loss , loader
  15.  
  16. # Create main program and startup program for training
  17. train_prog = fluid.Program()
  18. train_startup = fluid.Program()
  19.  
  20. with fluid.program_guard(train_prog, train_startup):
  21. # Use fluid.unique_name.guard() to share parameters with test network
  22. with fluid.unique_name.guard():
  23. train_loss, train_loader = network()
  24. adam = fluid.optimizer.Adam(learning_rate=0.01)
  25. adam.minimize(train_loss)
  26.  
  27. # Create main program and startup program for testing
  28. test_prog = fluid.Program()
  29. test_startup = fluid.Program()
  30. with fluid.program_guard(test_prog, test_startup):
  31. # Use fluid.unique_name.guard() to share parameters with train network
  32. with fluid.unique_name.guard():
  33. test_loss, test_loader = network()

设置DataLoader对象的数据源

DataLoader对象通过 set_sample_generator()set_sample_list_generatorset_batch_generator() 方法设置其数据源。 这三个方法均接收Python生成器 generator 作为参数,其区别在于:

  • set_sample_generator() 要求 generator 返回的数据格式为[img_1, label_1],其中img_1和label_1为单个样本的Numpy Array类型数据。
  • set_sample_list_generator() 要求 generator 返回的数据格式为[(img_1, label_1), (img_2, label_2), …, (img_n, label_n)],其中img_i和label_i均为每个样本的Numpy Array类型数据,n为batch size。
  • set_batch_generator() 要求 generator 返回的数据的数据格式为[batched_imgs, batched_labels],其中batched_imgs和batched_labels为batch级的Numpy Array或LoDTensor类型数据。

值得注意的是,使用DataLoader做多GPU卡(或多CPU核)训练时,实际的总batch size为用户传入的 generator 的batch size乘以设备数量。

当DataLoader的iterable=True(默认)时,必须给这三个方法传 places 参数, 指定将读取的数据转换为CPU Tensor还是GPU Tensor。当DataLoader的iterable=False时,不需传places参数。

例如,假设我们有两个reader,其中fake_sample_reader每次返回一个sample的数据,fake_batch_reader每次返回一个batch的数据。

  1. import paddle.fluid as fluid
  2. import numpy as np
  3.  
  4. # sample级reader
  5. def fake_sample_reader():
  6. for _ in range(100):
  7. sample_image = np.random.random(size=(784, )).astype('float32')
  8. sample_label = np.random.random_integers(size=(1, ), low=0, high=9).astype('int64')
  9. yield sample_image, sample_label
  10.  
  11. # batch级reader
  12. def fake_batch_reader():
  13. batch_size = 32
  14. for _ in range(100):
  15. batch_image = np.random.random(size=(batch_size, 784)).astype('float32')
  16. batch_label = np.random.random_integers(size=(batch_size, 1), low=0, high=9).astype('int64')
  17. yield batch_image, batch_label
  18.  
  19. image1 = fluid.data(name='image1', dtype='float32', shape=[None, 784])
  20. label1 = fluid.data(name='label1', dtype='int64', shape=[None, 1])
  21.  
  22. image2 = fluid.data(name='image2', dtype='float32', shape=[None, 784])
  23. label2 = fluid.data(name='label2', dtype='int64', shape=[None, 1])
  24.  
  25. image3 = fluid.data(name='image3', dtype='float32', shape=[None, 784])
  26. label3 = fluid.data(name='label3', dtype='int64', shape=[None, 1])

对应的DataLoader设置如下:

  1. import paddle
  2. import paddle.fluid as fluid
  3.  
  4. ITERABLE = True
  5. USE_CUDA = True
  6. USE_DATA_PARALLEL = True
  7.  
  8. if ITERABLE:
  9. # 若DataLoader可迭代,则必须设置places参数
  10. if USE_DATA_PARALLEL:
  11. # 若进行多GPU卡训练,则取所有的CUDAPlace
  12. # 若进行多CPU核训练,则取多个CPUPlace,本例中取了8个CPUPlace
  13. places = fluid.cuda_places() if USE_CUDA else fluid.cpu_places(8)
  14. else:
  15. # 若进行单GPU卡训练,则取单个CUDAPlace,本例中0代表0号GPU卡
  16. # 若进行单CPU核训练,则取单个CPUPlace,本例中1代表1个CPUPlace
  17. places = fluid.cuda_places(0) if USE_CUDA else fluid.cpu_places(1)
  18. else:
  19. # 若DataLoader不可迭代,则不需要设置places参数
  20. places = None
  21.  
  22. # 使用sample级的reader作为DataLoader的数据源
  23. data_loader1 = fluid.io.DataLoader.from_generator(feed_list=[image1, label1], capacity=10, iterable=ITERABLE)
  24. data_loader1.set_sample_generator(fake_sample_reader, batch_size=32, places=places)
  25.  
  26. # 使用sample级的reader + fluid.io.batch设置DataLoader的数据源
  27. data_loader2 = fluid.io.DataLoader.from_generator(feed_list=[image2, label2], capacity=10, iterable=ITERABLE)
  28. sample_list_reader = fluid.io.batch(fake_sample_reader, batch_size=32)
  29. sample_list_reader = fluid.io.shuffle(sample_list_reader, buf_size=64) # 还可以进行适当的shuffle
  30. data_loader2.set_sample_list_generator(sample_list_reader, places=places)
  31.  
  32. # 使用batch级的reader作为DataLoader的数据源
  33. data_loader3 = fluid.io.DataLoader.from_generator(feed_list=[image3, label3], capacity=10, iterable=ITERABLE)
  34. data_loader3.set_batch_generator(fake_batch_reader, places=places)

使用DataLoader进行模型训练和测试

使用DataLoader进行模型训练和测试的例程如下。

  • 第一步,我们需组建训练网络和预测网络,并定义相应的DataLoader对象,设置好DataLoader对象的数据源。
  1. import paddle
  2. import paddle.fluid as fluid
  3. import paddle.dataset.mnist as mnist
  4. import six
  5.  
  6. ITERABLE = True
  7.  
  8. def network():
  9. # 创建数据层对象
  10. image = fluid.data(name='image', dtype='float32', shape=[None, 784])
  11. label = fluid.data(name='label', dtype='int64', shape=[None, 1])
  12.  
  13. # 创建DataLoader对象
  14. reader = fluid.io.DataLoader.from_generator(feed_list=[image, label], capacity=64, iterable=ITERABLE)
  15.  
  16. # Definition of models
  17. fc = fluid.layers.fc(image, size=10)
  18. xe = fluid.layers.softmax_with_cross_entropy(fc, label)
  19. loss = fluid.layers.reduce_mean(xe)
  20. return loss , reader
  21.  
  22. # 创建训练的main_program和startup_program
  23. train_prog = fluid.Program()
  24. train_startup = fluid.Program()
  25.  
  26. # 定义训练网络
  27. with fluid.program_guard(train_prog, train_startup):
  28. # fluid.unique_name.guard() to share parameters with test network
  29. with fluid.unique_name.guard():
  30. train_loss, train_loader = network()
  31. adam = fluid.optimizer.Adam(learning_rate=0.01)
  32. adam.minimize(train_loss)
  33.  
  34. # 创建预测的main_program和startup_program
  35. test_prog = fluid.Program()
  36. test_startup = fluid.Program()
  37.  
  38. # 定义预测网络
  39. with fluid.program_guard(test_prog, test_startup):
  40. # Use fluid.unique_name.guard() to share parameters with train network
  41. with fluid.unique_name.guard():
  42. test_loss, test_loader = network()
  43.  
  44. place = fluid.CUDAPlace(0)
  45. exe = fluid.Executor(place)
  46.  
  47. # 运行startup_program进行初始化
  48. exe.run(train_startup)
  49. exe.run(test_startup)
  50.  
  51. # Compile programs
  52. train_prog = fluid.CompiledProgram(train_prog).with_data_parallel(loss_name=train_loss.name)
  53. test_prog = fluid.CompiledProgram(test_prog).with_data_parallel(share_vars_from=train_prog)
  54.  
  55. # 设置DataLoader的数据源
  56. places = fluid.cuda_places() if ITERABLE else None
  57.  
  58. train_loader.set_sample_list_generator(
  59. fluid.io.shuffle(fluid.io.batch(mnist.train(), 512), buf_size=1024), places=places)
  60.  
  61. test_loader.set_sample_list_generator(fluid.io.batch(mnist.test(), 512), places=places)
  • 第二步:根据DataLoader对象是否iterable,选用不同的方式运行网络。

若iterable=True,则DataLoader对象是一个Python的生成器,可直接for-range迭代。for-range返回的结果通过exe.run的feed参数传入执行器。

  1. def run_iterable(program, exe, loss, data_loader):
  2. for data in data_loader():
  3. loss_value = exe.run(program=program, feed=data, fetch_list=[loss])
  4. print('loss is {}'.format(loss_value))
  5.  
  6. for epoch_id in six.moves.range(10):
  7. run_iterable(train_prog, exe, train_loss, train_loader)
  8. run_iterable(test_prog, exe, test_loss, test_loader)

若iterable=False,则需在每个epoch开始前,调用 start() 方法启动DataLoader对象;并在每个epoch结束时,exe.run会抛出 fluid.core.EOFException 异常,在捕获异常后调用 reset() 方法重置DataLoader对象的状态, 以便启动下一轮的epoch。iterable=False时无需给exe.run传入feed参数。具体方式为:

  1. def run_non_iterable(program, exe, loss, data_loader):
  2. data_loader.start()
  3. try:
  4. while True:
  5. loss_value = exe.run(program=program, fetch_list=[loss])
  6. print('loss is {}'.format(loss_value))
  7. except fluid.core.EOFException:
  8. print('End of epoch')
  9. data_loader.reset()
  10.  
  11. for epoch_id in six.moves.range(10):
  12. run_non_iterable(train_prog, exe, train_loss, train_loader)
  13. run_non_iterable(test_prog, exe, test_loss, test_loader)