py_reader

  • paddle.fluid.layers.py_reader(capacity, shapes, dtypes, lod_levels=None, name=None, use_double_buffer=True)[源代码]

创建一个在Python端提供数据的reader

该OP返回一个Reader Variable。该Reader提供了 decorate_paddle_reader()decorate_tensor_provider() 来设置Python generator作为数据源,将数据源中的数据feed到Reader Variable。在c++端调用 Executor::Run() 时,来自generator的数据将被自动读取。与 DataFeeder.feed() 不同,数据读取进程和 Executor::Run() 进程可以使用 py_reader 并行运行。在每次数据传递开始时调用reader的 start() ,在传递结束和抛出 fluid.core.EOFException 异常后执行 reset()

注意: Program.clone() (含义详见 Program )不能克隆 py_reader ,且 read_fileread_file 含义详见 read_file )调用需在声明 py_reader 的program block内。

  • 参数:
    • capacity (int) – py_reader 维护的缓冲区的容量数据个数。
    • shapes (list|tuple) – 一个列表或元组,shapes[i]是代表第i个数据shape,因此shape[i]也是元组或列表。
    • dtypes (list|tuple) – 一个string的列表或元组。为 shapes 对应元素的数据类型,支持bool,float16,float32,float64,int8,int16,int32,int64,uint8。
    • lod_levels (list|tuple) – lod_level的整型列表或元组
    • name (str,可选) – 具体用法请参见 Name ,一般无需设置,默认值为None。
    • use_double_buffer (bool) – 是否使用双缓冲区,双缓冲区是为了预读下一个batch的数据、异步CPU -> GPU拷贝。默认值为True。

返回:reader,从reader中可以获取feed的数据,其dtype和feed的数据dtype相同。

返回类型:Variable

代码示例

1.py_reader 基本用法如下

  1. import paddle
  2. import paddle.fluid as fluid
  3. import paddle.dataset.mnist as mnist
  4.  
  5. def network(image, label):
  6. # 用户自定义网络,此处以softmax回归为例
  7. predict = fluid.layers.fc(input=image, size=10, act='softmax')
  8. return fluid.layers.cross_entropy(input=predict, label=label)
  9.  
  10. reader = fluid.layers.py_reader(capacity=64,
  11. shapes=[(-1,1, 28, 28), (-1,1)],
  12. dtypes=['float32', 'int64'])
  13. reader.decorate_paddle_reader(
  14. paddle.reader.shuffle(paddle.batch(mnist.train(), batch_size=5),
  15. buf_size=1000))
  16.  
  17. img, label = fluid.layers.read_file(reader)
  18. loss = network(img, label) # 一些网络定义
  19.  
  20. fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())
  21. exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
  22. for epoch_id in range(10):
  23. reader.start()
  24. try:
  25. while True:
  26. exe.run(fetch_list=[loss.name])
  27. except fluid.core.EOFException:
  28. reader.reset()
  29.  
  30. fluid.io.save_inference_model(dirname='./model',
  31. feeded_var_names=[img.name, label.name],
  32. target_vars=[loss],
  33. executor=fluid.Executor(fluid.CUDAPlace(0)))

2.训练和测试应使用不同的名称创建两个不同的py_reader,例如:

  1. import paddle
  2. import paddle.fluid as fluid
  3. import paddle.dataset.mnist as mnist
  4.  
  5. def network(reader):
  6. img, label = fluid.layers.read_file(reader)
  7. # 用户自定义网络,此处以softmax回归为例
  8. predict = fluid.layers.fc(input=img, size=10, act='softmax')
  9. loss = fluid.layers.cross_entropy(input=predict, label=label)
  10. return fluid.layers.mean(loss)
  11.  
  12. # 新建 train_main_prog 和 train_startup_prog
  13. train_main_prog = fluid.Program()
  14. train_startup_prog = fluid.Program()
  15. with fluid.program_guard(train_main_prog, train_startup_prog):
  16. # 使用 fluid.unique_name.guard() 实现与test program的参数共享
  17. with fluid.unique_name.guard():
  18. train_reader = fluid.layers.py_reader(capacity=64,
  19. shapes=[(-1, 1, 28, 28), (-1, 1)],
  20. dtypes=['float32', 'int64'],
  21. name='train_reader')
  22. train_reader.decorate_paddle_reader(
  23. paddle.reader.shuffle(paddle.batch(mnist.train(),
  24. batch_size=5),
  25. buf_size=500))
  26. train_loss = network(train_reader) # 一些网络定义
  27. adam = fluid.optimizer.Adam(learning_rate=0.01)
  28. adam.minimize(train_loss)
  29.  
  30. # Create test_main_prog and test_startup_prog
  31. test_main_prog = fluid.Program()
  32. test_startup_prog = fluid.Program()
  33. with fluid.program_guard(test_main_prog, test_startup_prog):
  34. # 使用 fluid.unique_name.guard() 实现与train program的参数共享
  35. with fluid.unique_name.guard():
  36. test_reader = fluid.layers.py_reader(capacity=32,
  37. shapes=[(-1, 1, 28, 28), (-1, 1)],
  38. dtypes=['float32', 'int64'],
  39. name='test_reader')
  40. test_reader.decorate_paddle_reader(paddle.batch(mnist.test(), 512))
  41.  
  42. test_loss = network(test_reader)
  43.  
  44. fluid.Executor(fluid.CUDAPlace(0)).run(train_startup_prog)
  45. fluid.Executor(fluid.CUDAPlace(0)).run(test_startup_prog)
  46.  
  47. train_exe = fluid.ParallelExecutor(use_cuda=True,
  48. loss_name=train_loss.name, main_program=train_main_prog)
  49. test_exe = fluid.ParallelExecutor(use_cuda=True,
  50. loss_name=test_loss.name, main_program=test_main_prog)
  51. for epoch_id in range(10):
  52. train_reader.start()
  53. try:
  54. while True:
  55. train_exe.run(fetch_list=[train_loss.name])
  56. except fluid.core.EOFException:
  57. train_reader.reset()
  58.  
  59. test_reader.start()
  60. try:
  61. while True:
  62. test_exe.run(fetch_list=[test_loss.name])
  63. except fluid.core.EOFException:
  64. test_reader.reset()