Asynchronous Data Reading

In addition to the synchronous feed approach, we provide PyReader. PyReader performs better than synchronous data reading because its data reading runs asynchronously with model training, and it can be combined with double_buffer_reader to further improve reading performance. In addition, double_buffer_reader asynchronously converts CPU Tensors to GPU Tensors, which improves data reading efficiency to some extent.

Creating a PyReader Object

A PyReader object is created as follows:

    import paddle.fluid as fluid

    py_reader = fluid.layers.py_reader(capacity=64,
                                       shapes=[(-1, 784), (-1, 1)],
                                       dtypes=['float32', 'int64'],
                                       name='py_reader',
                                       use_double_buffer=True)

Here, capacity is the size of the PyReader object's buffer; shapes gives the shape of each field in a batch (such as image and label in an image classification task); dtypes gives the data type of each field; name is the name of the PyReader object; use_double_buffer defaults to True, which enables double_buffer_reader and is recommended because it speeds up data reading.

Note that if you create multiple PyReader objects (for example, two different PyReaders for the training and test phases), you must give each one a distinct name. This is because PaddlePaddle distinguishes variables by their names, and Program.clone() (see cn_api_fluid_Program_clone) cannot copy a PyReader object.

    import paddle.fluid as fluid

    train_py_reader = fluid.layers.py_reader(capacity=64,
                                             shapes=[(-1, 784), (-1, 1)],
                                             dtypes=['float32', 'int64'],
                                             name='train',
                                             use_double_buffer=True)

    test_py_reader = fluid.layers.py_reader(capacity=64,
                                            shapes=[(-1, 3, 224, 224), (-1, 1)],
                                            dtypes=['float32', 'int64'],
                                            name='test',
                                            use_double_buffer=True)

When using PyReader, if you need to share model parameters between the training and test phases, you can do so with fluid.unique_name.guard(). Note: Paddle distinguishes variables by name, and variable names are generated automatically from counters in the unique_name module, with the counter incremented by one for each name generated. fluid.unique_name.guard() resets the counters in the unique_name module, so that networks built under successive fluid.unique_name.guard() calls receive identical variable names, which achieves parameter sharing.

Below is an example of using PyReader to build the training-phase and test-phase networks:

    import paddle
    import paddle.fluid as fluid
    import paddle.dataset.mnist as mnist

    def network(is_train):
        # Create py_reader object and give different names
        # when is_train = True and is_train = False
        reader = fluid.layers.py_reader(
            capacity=10,
            shapes=((-1, 784), (-1, 1)),
            dtypes=('float32', 'int64'),
            name="train_reader" if is_train else "test_reader",
            use_double_buffer=True)

        # Use read_file() method to read out the data from py_reader
        img, label = fluid.layers.read_file(reader)
        ...
        # Here, we omitted the definition of loss of the model
        return loss, reader

    # Create main program and startup program for training
    train_prog = fluid.Program()
    train_startup = fluid.Program()

    with fluid.program_guard(train_prog, train_startup):
        # Use fluid.unique_name.guard() to share parameters with test network
        with fluid.unique_name.guard():
            train_loss, train_reader = network(True)
            adam = fluid.optimizer.Adam(learning_rate=0.01)
            adam.minimize(train_loss)

    # Create main program and startup program for testing
    test_prog = fluid.Program()
    test_startup = fluid.Program()
    with fluid.program_guard(test_prog, test_startup):
        # Use fluid.unique_name.guard() to share parameters with train network
        with fluid.unique_name.guard():
            test_loss, test_reader = network(False)

Setting the Data Source of a PyReader Object

A PyReader object's data source is set with its decorate_paddle_reader() or decorate_tensor_provider() method. Both accept a Python generator as the argument; each step of the generator yields one batch of data.

The differences between decorate_paddle_reader() and decorate_tensor_provider() are:

  • decorate_paddle_reader() expects the generator to yield data in the format [(img_1, label_1), (img_2, label_2), ..., (img_n, label_n)], where each img_i and label_i is per-sample Numpy array data and n is the batch size. decorate_tensor_provider(), by contrast, expects the generator to yield data in the format [batched_imgs, batched_labels], where batched_imgs and batched_labels are batch-level Numpy array or LoDTensor data.
  • decorate_tensor_provider() requires the data types and shapes yielded by the generator to match exactly the dtypes and shapes specified when configuring the py_reader, while decorate_paddle_reader() does not require strict agreement and performs the type and shape conversion internally. Specifically:
    import paddle
    import paddle.fluid as fluid
    import numpy as np

    BATCH_SIZE = 32

    # Case 1: Use decorate_paddle_reader() method to set the data source of py_reader
    # The generator yields per-sample Numpy data, batched by paddle.batch()
    def fake_random_numpy_reader():
        image = np.random.random(size=(784, ))
        label = np.random.randint(low=0, high=10, size=(1, ))
        yield image, label

    py_reader1 = fluid.layers.py_reader(
        capacity=10,
        shapes=((-1, 784), (-1, 1)),
        dtypes=('float32', 'int64'),
        name='py_reader1',
        use_double_buffer=True)

    py_reader1.decorate_paddle_reader(paddle.batch(fake_random_numpy_reader, batch_size=BATCH_SIZE))


    # Case 2: Use decorate_tensor_provider() method to set the data source of py_reader
    # The generator yields batch-level Numpy data
    def fake_random_tensor_provider():
        image = np.random.random(size=(BATCH_SIZE, 784)).astype('float32')
        label = np.random.randint(low=0, high=10, size=(BATCH_SIZE, 1)).astype('int64')
        yield image, label

    py_reader2 = fluid.layers.py_reader(
        capacity=10,
        shapes=((-1, 784), (-1, 1)),
        dtypes=('float32', 'int64'),
        name='py_reader2',
        use_double_buffer=True)

    py_reader2.decorate_tensor_provider(fake_random_tensor_provider)
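The two generator formats can be checked with plain NumPy, independently of Paddle. The sketch below (the helper names are made up for illustration) shows that stacking the per-sample tuples expected by decorate_paddle_reader() reproduces the batch-level layout expected by decorate_tensor_provider():

```python
import numpy as np

# Sample-level format for decorate_paddle_reader() (after paddle.batch()):
# a list of (img, label) tuples, one tuple per sample.
def sample_level_batch(batch_size=4):
    return [(np.random.random((784,)).astype('float32'),
             np.random.randint(0, 10, (1,)).astype('int64'))
            for _ in range(batch_size)]

# Batch-level format for decorate_tensor_provider():
# one array per field, with the batch dimension already stacked in.
def batch_level_batch(batch_size=4):
    return [np.random.random((batch_size, 784)).astype('float32'),
            np.random.randint(0, 10, (batch_size, 1)).astype('int64')]

samples = sample_level_batch()
batched_imgs, batched_labels = batch_level_batch()

# Stacking the per-sample images yields the same batch-level shape
stacked_imgs = np.stack([img for img, _ in samples])
assert stacked_imgs.shape == batched_imgs.shape == (4, 784)
assert batched_labels.shape == (4, 1)
```

This is why decorate_paddle_reader() pairs naturally with paddle.batch(), which performs exactly this sample-to-batch stacking for you.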

Training and Testing with PyReader

An example of model training and testing with PyReader:

    import paddle
    import paddle.fluid as fluid
    import paddle.dataset.mnist as mnist
    import six

    def network(is_train):
        # Create py_reader object and give different names
        # when is_train = True and is_train = False
        reader = fluid.layers.py_reader(
            capacity=10,
            shapes=((-1, 784), (-1, 1)),
            dtypes=('float32', 'int64'),
            name="train_reader" if is_train else "test_reader",
            use_double_buffer=True)
        img, label = fluid.layers.read_file(reader)
        ...
        # Here, we omitted the definition of loss of the model
        return loss, reader

    # Create main program and startup program for training
    train_prog = fluid.Program()
    train_startup = fluid.Program()

    # Define train network
    with fluid.program_guard(train_prog, train_startup):
        # Use fluid.unique_name.guard() to share parameters with test network
        with fluid.unique_name.guard():
            train_loss, train_reader = network(True)
            adam = fluid.optimizer.Adam(learning_rate=0.01)
            adam.minimize(train_loss)

    # Create main program and startup program for testing
    test_prog = fluid.Program()
    test_startup = fluid.Program()

    # Define test network
    with fluid.program_guard(test_prog, test_startup):
        # Use fluid.unique_name.guard() to share parameters with train network
        with fluid.unique_name.guard():
            test_loss, test_reader = network(False)

    place = fluid.CUDAPlace(0)
    exe = fluid.Executor(place)

    # Run startup program
    exe.run(train_startup)
    exe.run(test_startup)

    # Compile programs
    train_prog = fluid.CompiledProgram(train_prog).with_data_parallel(loss_name=train_loss.name)
    test_prog = fluid.CompiledProgram(test_prog).with_data_parallel(share_vars_from=train_prog)

    # Set the data source of py_reader using decorate_paddle_reader() method
    train_reader.decorate_paddle_reader(
        paddle.reader.shuffle(paddle.batch(mnist.train(), 512), buf_size=8192))

    test_reader.decorate_paddle_reader(paddle.batch(mnist.test(), 512))

    for epoch_id in six.moves.range(10):
        train_reader.start()
        try:
            while True:
                loss = exe.run(program=train_prog, fetch_list=[train_loss])
                print('train_loss', loss)
        except fluid.core.EOFException:
            print('End of epoch', epoch_id)
            train_reader.reset()

        test_reader.start()
        try:
            while True:
                loss = exe.run(program=test_prog, fetch_list=[test_loss])
                print('test loss', loss)
        except fluid.core.EOFException:
            print('End of testing')
            test_reader.reset()

The steps are:

  • Before each epoch starts, call the start() method to start the PyReader object;
  • At the end of each epoch, read_file raises a fluid.core.EOFException; after catching the exception, call the reset() method to reset the PyReader object's state so that the next epoch can start.
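The start / consume-until-EOF / reset pattern can be mimicked with a plain-Python stand-in. FakePyReader and its nested EOFException below are hypothetical illustrations, not Paddle API; they only model the control flow described above:

```python
class FakePyReader(object):
    """Hypothetical stand-in for PyReader's per-epoch control flow.

    start() makes one epoch's batches available, read() consumes one
    batch (standing in for exe.run()), and an EOF exception is raised
    when the epoch is exhausted, after which reset() must be called.
    """

    class EOFException(Exception):
        pass

    def __init__(self, num_batches):
        self.num_batches = num_batches
        self._left = 0

    def start(self):
        # Must be called before each epoch
        self._left = self.num_batches

    def read(self):
        # Raises EOF once the epoch's data is exhausted
        if self._left == 0:
            raise FakePyReader.EOFException()
        self._left -= 1
        return 'batch'

    def reset(self):
        # Must be called after catching EOF, before the next start()
        self._left = 0

reader = FakePyReader(num_batches=3)
batches_per_epoch = []
for epoch_id in range(2):
    reader.start()
    count = 0
    try:
        while True:
            reader.read()
            count += 1
    except FakePyReader.EOFException:
        reader.reset()
    batches_per_epoch.append(count)
# Each epoch consumes exactly num_batches batches
```

Forgetting reset() after catching the exception (or start() before the next epoch) is the most common mistake with this pattern; in real Paddle it leaves the reader in an inconsistent state.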