multiprocess_reader

  • paddle.fluid.io.multiprocess_reader(readers, use_pipe=True, queue_size=1000)[源代码]

使用python多进程从 readers 中读取数据,然后使用 multiprocessing.Pipemultiprocessing.Queue 合并所有数据。 readers 列表中的每个reader会被创建一个独立的进程来调用,reader之间应该相互独立,互不影响,避免出现多进程读取的冲突问题.

multiprocess.queue需要/dev/shm的rw访问权限,某些平台不支持。

  • 参数:
    • readers (list(generator)|tuple(generator)) - python生成器list, 用来读取数据
    • use_pipe (bool,可选) - use_pipe控制multiprocess_reader内部用 pipe 还是 queue 来实现进程间通信,默认为 True 使用 pipe 进行通信
    • queue_size (int,可选) - 如果使用queue来进行进程间通信 (use_pipe=False), 则该参数用来设定队列大小

返回:使用多进程封装readers之后的reader

返回类型:python生成器

代码示例

  1. import paddle.fluid as fluid
  2. from paddle.fluid.io import multiprocess_reader
  3. import numpy as np
  4.  
  5.  
  6. def fake_reader(start, end):
  7. def __impl__():
  8. for i in range(start, end):
  9. yield [np.array([1, 2, 3]) * i],
  10. return __impl__
  11.  
  12.  
  13. with fluid.program_guard(fluid.Program(), fluid.Program()):
  14. place = fluid.CPUPlace()
  15. image = fluid.layers.data(
  16. name='image', dtype='int64', shape=[3])
  17. fluid.layers.Print(image)
  18. reader = fluid.io.PyReader(
  19. feed_list=[image], capacity=2)
  20. image_p_1 = image + 1
  21. decorated_reader = multiprocess_reader(
  22. [fake_reader(1, 5), fake_reader(6, 10)], False)
  23.  
  24. reader.decorate_sample_generator(decorated_reader, batch_size=2, places=[place])
  25.  
  26. exe = fluid.Executor(place)
  27. exe.run(fluid.default_startup_program())
  28.  
  29. for data in reader():
  30. exe.run(feed=data, fetch_list=[image_p_1])