multiprocess_reader

paddle.fluid.io.multiprocess_reader(readers, use_pipe=True, queue_size=1000)[源代码]

使用python多进程从 readers 中读取数据,然后使用 multiprocessing.Pipemultiprocessing.Queue 合并所有数据。 readers 列表中的每个reader会被创建一个独立的进程来调用,reader之间应该相互独立,互不影响,避免出现多进程读取的冲突问题.

multiprocess.queue需要/dev/shm的rw访问权限,某些平台不支持。

参数

  • readers (list(generator)|tuple(generator)) - python生成器list, 用来读取数据
  • use_pipe (bool,可选) - use_pipe控制multiprocess_reader内部用 pipe 还是 queue 来实现进程间通信,默认为 True 使用 pipe 进行通信
  • queue_size (int,可选) - 如果使用queue来进行进程间通信 (use_pipe=False), 则该参数用来设定队列大小

返回

使用多进程封装readers之后的reader

返回类型

python生成器

代码示例

  1. import paddle.fluid as fluid
  2. from paddle.fluid.io import multiprocess_reader
  3. import numpy as np
  4. def fake_reader(start, end):
  5. def __impl__():
  6. for i in range(start, end):
  7. yield [np.array([1, 2, 3]) * i],
  8. return __impl__
  9. with fluid.program_guard(fluid.Program(), fluid.Program()):
  10. place = fluid.CPUPlace()
  11. image = fluid.layers.data(
  12. name='image', dtype='int64', shape=[3])
  13. fluid.layers.Print(image)
  14. reader = fluid.io.PyReader(
  15. feed_list=[image], capacity=2)
  16. image_p_1 = image + 1
  17. decorated_reader = multiprocess_reader(
  18. [fake_reader(1, 5), fake_reader(6, 10)], False)
  19. reader.decorate_sample_generator(decorated_reader, batch_size=2, places=[place])
  20. exe = fluid.Executor(place)
  21. exe.run(fluid.default_startup_program())
  22. for data in reader():
  23. exe.run(feed=data, fetch_list=[image_p_1])