DataLoader

class paddle.io.DataLoader ( dataset, feed_list=None, places=None, return_list=False, batch_sampler=None, batch_size=1, shuffle=False, drop_last=False, collate_fn=None, num_workers=0, use_buffer_reader=True, use_shared_memory=False, timeout=0, worker_init_fn=None ) [源代码]

DataLoader返回一个迭代器,该迭代器根据 batch_sampler 给定的顺序迭代一次给定的 dataset

DataLoader支持单进程和多进程的数据加载方式,当 num_workers 大于0时,将使用多进程方式异步加载数据。

DataLoader当前支持 map-styleiterable-style 的数据集, map-style 的数据集可通过下标索引样本,请参考 paddle.io.Datasetiterable-style 数据集只能迭代式地获取样本,类似Python迭代器,请参考 paddle.io.IterableDataset

batch_sampler 请参考 paddle.io.BatchSampler

禁用自动组batch

在如NLP等任务中,用户需求自定义组batch的方式,不希望 DataLoader 自动组batch, DataLoader 支持在 batch_sizebatch_sampler 均为None的时候禁用自动组batch功能,此时需求从 dataset 中获取的数据为已经组好batch的数据,该数据将不做任何处理直接传到 collate_fndefault_collate_fn 中。

注解

当禁用自动组batch时, default_collate_fn 将不对输入数据做任何处理。

参数:

  • dataset (Dataset) - DataLoader从此参数给定数据集中加载数据,此参数必须是 paddle.io.Datasetpaddle.io.IterableDataset 的一个子类实例。

  • feed_list (list(Tensor)|tuple(Tensor)) - feed变量列表,由 paddle.static.data() 创建。当 return_list 为False时,此参数必须设置。默认值为None。

  • places (list(Place)|tuple(Place)) - 数据需要放置到的Place列表。在静态图和动态图模式中,此参数均必须设置。在动态图模式中,此参数列表长度必须是1。默认值为None。

  • return_list (bool) - 每个设备上的数据是否以list形式返回。若return_list = False,每个设备上的返回数据均是str -> Tensor的映射表,其中映射表的key是每个输入变量的名称。若return_list = True,则每个设备上的返回数据均是list(Tensor)。在动态图模式下,此参数必须为True。默认值为False。

  • batch_sampler (BatchSampler) - paddle.io.BatchSampler 或其子类的实例,DataLoader通过 batch_sampler 产生的mini-batch索引列表来 dataset 中索引样本并组成mini-batch。默认值为None。

  • batch_size (int|None) - 每mini-batch中样本个数,为 batch_sampler 的替代参数,若 batch_sampler 未设置,会根据 batch_size shuffle drop_last 创建一个 paddle.io.BatchSampler 。默认值为1。

  • shuffle (bool) - 生成mini-batch索引列表时是否对索引打乱顺序,为 batch_sampler 的替代参数,若 batch_sampler 未设置,会根据 batch_size shuffle drop_last 创建一个 paddle.io.BatchSampler 。默认值为False。

  • drop_last (bool) - 是否丢弃因数据集样本数不能被 batch_size 整除而产生的最后一个不完整的mini-batch,为 batch_sampler 的替代参数,若 batch_sampler 未设置,会根据 batch_size shuffle drop_last 创建一个 paddle.io.BatchSampler 。默认值为False。

  • collate_fn (callable) - 通过此参数指定如果将样本列表组合为mini-batch数据,当 collate_fn 为None时,默认为将样本个字段在第0维上堆叠(同 np.stack(..., axis=0) )为mini-batch的数据。默认值为None。

  • num_workers (int) - 用于加载数据的子进程个数,若为0即为不开启子进程,在主进程中进行数据加载。默认值为0。

  • use_buffer_reader (bool) - 是否使用缓存读取器 。若 use_buffer_reader 为True,DataLoader会异步地预读取下一个mini-batch的数据,可加速数据读取过程,但同时会占用少量的CPU/GPU存储,即一个batch输入数据的存储空间。默认值为True。

  • use_shared_memory (bool) - 是否使用共享内存来提升子进程将数据放入进程间队列的速度,该参数尽在多进程模式下有效(即 num_workers > 0 ),请确认机器上有足够的共享内存空间(如Linux系统下 /dev/shm/ 目录空间大小)再设置此参数。默认为False。

  • timeout (int) - 从子进程输出队列获取mini-batch数据的超时时间。默认值为0。

  • worker_init_fn (callable) - 子进程初始化函数,此函数会被子进程初始化时被调用,并传递 worker id 作为参数。默认值为None。

返回:迭代 dataset 数据的迭代器,迭代器返回的数据中的每个元素都是一个Tensor。

返回类型: DataLoader

代码示例

  1. import numpy as np
  2. import paddle
  3. import paddle.nn as nn
  4. import paddle.nn.functional as F
  5. from paddle.io import Dataset, BatchSampler, DataLoader
  6. BATCH_NUM = 20
  7. BATCH_SIZE = 16
  8. EPOCH_NUM = 4
  9. IMAGE_SIZE = 784
  10. CLASS_NUM = 10
  11. USE_GPU = False # whether use GPU to run model
  12. # define a random dataset
  13. class RandomDataset(Dataset):
  14. def __init__(self, num_samples):
  15. self.num_samples = num_samples
  16. def __getitem__(self, idx):
  17. image = np.random.random([IMAGE_SIZE]).astype('float32')
  18. label = np.random.randint(0, CLASS_NUM - 1, (1, )).astype('int64')
  19. return image, label
  20. def __len__(self):
  21. return self.num_samples
  22. dataset = RandomDataset(BATCH_NUM * BATCH_SIZE)
  23. class SimpleNet(nn.Layer):
  24. def __init__(self):
  25. super(SimpleNet, self).__init__()
  26. self.fc = nn.Linear(IMAGE_SIZE, CLASS_NUM)
  27. def forward(self, image, label=None):
  28. return self.fc(image)
  29. simple_net = SimpleNet()
  30. opt = paddle.optimizer.SGD(learning_rate=1e-3,
  31. parameters=simple_net.parameters())
  32. loader = DataLoader(dataset,
  33. batch_size=BATCH_SIZE,
  34. shuffle=True,
  35. drop_last=True,
  36. num_workers=2)
  37. for e in range(EPOCH_NUM):
  38. for i, (image, label) in enumerate(loader()):
  39. out = simple_net(image)
  40. loss = F.cross_entropy(out, label)
  41. avg_loss = paddle.mean(loss)
  42. avg_loss.backward()
  43. opt.minimize(avg_loss)
  44. simple_net.clear_gradients()
  45. print("Epoch {} batch {}: loss = {}".format(e, i, np.mean(loss.numpy())))

from_generator ( feed_list=None, capacity=None, use_double_buffer=True, iterable=True, return_list=False, use_multiprocess=False, drop_last=True )

警告

这个API将在未来版本废弃,推荐使用支持多进程并发加速的 paddle.io.DataLoader

注解

框架保证DataLoader的数据加载顺序与用户提供的数据源读取顺序一致。

创建一个DataLoader对象用于加载Python生成器产生的数据。数据会由Python线程预先读取,并异步送入一个队列中。

本方法创建的DataLoader对象提供了3个方法设置数据源,分别是 set_sample_generator , set_sample_list_generatorset_batch_generator 。请查阅下述示例代码了解它们的使用方法。

如果iterable = True,本方法创建的DataLoader对象是一个Python生成器,可以for-range的方法循环迭代。

如果iterable = False,本方法创建的DataLoader对象提供 start()reset() 方法控制数据读取过程。

参数:

  • feed_list (list(Tensor)|tuple(Tensor)) - feed变量列表,由 paddle.static.data() 创建。

  • capacity (int) - DataLoader对象内部维护队列的容量大小。单位是batch数量。若reader读取速度较快,建议设置较大的capacity值。

  • use_double_buffer (bool) - 是否使用 double_buffer_reader 。若use_double_buffer=True,DataLoader会异步地预读取下一个batch的数据,可加速数据读取过程,但同时会占用少量的CPU/GPU存储,即一个batch输入数据的存储空间。

  • iterable (bool) - 所创建的DataLoader对象是否可迭代。

  • return_list (bool) - 每个设备上的数据是否以list形式返回。仅在iterable = True模式下有效。若return_list = False,每个设备上的返回数据均是str -> LoDTensor的映射表,其中映射表的key是每个输入变量的名称。若return_list = True,则每个设备上的返回数据均是list(LoDTensor)。推荐在静态图模式下使用return_list = False,在动态图模式下使用return_list = True。

  • use_multiprocess (bool) - 设置是否是用多进程加速动态图的数据载入过程。注意:该参数的设置仅在动态图模式下有效, 在静态图模式下,该参数设置与否均无任何影响。默认值为False。

  • drop_last (bool): 是否丢弃最后的不足CPU/GPU设备数的批次。默认值为True。在网络训练时,用户不能设置drop_last=False,此时所有CPU/GPU设备均应从DataLoader中读取到数据。在网络预测时,用户可以设置drop_last=False,此时最后不足CPU/GPU设备数的批次可以进行预测。

返回: 被创建的DataLoader对象

返回类型: loader (DataLoader)

代码示例 1

  1. '''
  2. Example in static graph mode
  3. '''
  4. import numpy as np
  5. import paddle
  6. import paddle.static as static
  7. import paddle.nn.functional as F
  8. BATCH_NUM = 10
  9. BATCH_SIZE = 16
  10. EPOCH_NUM = 4
  11. CLASS_NUM = 10
  12. ITERABLE = True # whether the created DataLoader object is iterable
  13. USE_GPU = False # whether to use GPU
  14. DATA_FORMAT = 'batch_generator' # data format of data source user provides
  15. paddle.enable_static()
  16. def simple_net(image, label):
  17. fc_tmp = static.nn.fc(image, size=CLASS_NUM)
  18. cross_entropy = F.softmax_with_cross_entropy(image, label)
  19. loss = paddle.mean(cross_entropy)
  20. sgd = paddle.optimizer.SGD(learning_rate=1e-3)
  21. sgd.minimize(loss)
  22. return loss
  23. def get_random_images_and_labels(image_shape, label_shape):
  24. image = np.random.random(size=image_shape).astype('float32')
  25. label = np.random.random(size=label_shape).astype('int64')
  26. return image, label
  27. # If the data generator yields one sample each time,
  28. # use DataLoader.set_sample_generator to set the data source.
  29. def sample_generator_creator():
  30. def __reader__():
  31. for _ in range(BATCH_NUM * BATCH_SIZE):
  32. image, label = get_random_images_and_labels([784], [1])
  33. yield image, label
  34. return __reader__
  35. # If the data generator yield list of samples each time,
  36. # use DataLoader.set_sample_list_generator to set the data source.
  37. def sample_list_generator_creator():
  38. def __reader__():
  39. for _ in range(BATCH_NUM):
  40. sample_list = []
  41. for _ in range(BATCH_SIZE):
  42. image, label = get_random_images_and_labels([784], [1])
  43. sample_list.append([image, label])
  44. yield sample_list
  45. return __reader__
  46. # If the data generator yields a batch each time,
  47. # use DataLoader.set_batch_generator to set the data source.
  48. def batch_generator_creator():
  49. def __reader__():
  50. for _ in range(BATCH_NUM):
  51. batch_image, batch_label = get_random_images_and_labels([BATCH_SIZE, 784], [BATCH_SIZE, 1])
  52. yield batch_image, batch_label
  53. return __reader__
  54. # If DataLoader is iterable, use for loop to train the network
  55. def train_iterable(exe, prog, loss, loader):
  56. for _ in range(EPOCH_NUM):
  57. for data in loader():
  58. exe.run(prog, feed=data, fetch_list=[loss])
  59. # If DataLoader is not iterable, use start() and reset() method to control the process
  60. def train_non_iterable(exe, prog, loss, loader):
  61. for _ in range(EPOCH_NUM):
  62. loader.start() # call DataLoader.start() before each epoch starts
  63. try:
  64. while True:
  65. exe.run(prog, fetch_list=[loss])
  66. except paddle.core.EOFException:
  67. loader.reset() # call DataLoader.reset() after catching EOFException
  68. def set_data_source(loader, places):
  69. if DATA_FORMAT == 'sample_generator':
  70. loader.set_sample_generator(sample_generator_creator(), batch_size=BATCH_SIZE, drop_last=True, places=places)
  71. elif DATA_FORMAT == 'sample_list_generator':
  72. loader.set_sample_list_generator(sample_list_generator_creator(), places=places)
  73. elif DATA_FORMAT == 'batch_generator':
  74. loader.set_batch_generator(batch_generator_creator(), places=places)
  75. else:
  76. raise ValueError('Unsupported data format')
  77. image = static.data(name='image', shape=[None, 784], dtype='float32')
  78. label = static.data(name='label', shape=[None, 1], dtype='int64')
  79. # Define DataLoader
  80. loader = paddle.io.DataLoader.from_generator(feed_list=[image, label], capacity=16, iterable=ITERABLE)
  81. # Define network
  82. loss = simple_net(image, label)
  83. # Set data source of DataLoader
  84. #
  85. # If DataLoader is iterable, places must be given and the number of places must be the same with device number.
  86. # - If you are using GPU, call `paddle.static.cuda_places()` to get all GPU places.
  87. # - If you are using CPU, call `paddle.static.cpu_places()` to get all CPU places.
  88. #
  89. # If DataLoader is not iterable, places can be None.
  90. places = static.cuda_places() if USE_GPU else static.cpu_places()
  91. set_data_source(loader, places)
  92. exe = static.Executor(places[0])
  93. exe.run(static.default_startup_program())
  94. prog = static.CompiledProgram(static.default_main_program()).with_data_parallel(loss_name=loss.name)
  95. if loader.iterable:
  96. train_iterable(exe, prog, loss, loader)
  97. else:
  98. train_non_iterable(exe, prog, loss, loader)

代码示例 2

  1. '''
  2. Example in dynamic graph mode.
  3. '''
  4. import numpy as np
  5. import paddle
  6. import paddle.nn as nn
  7. import paddle.optimizer as opt
  8. import paddle.distributed as dist
  9. BATCH_SIZE = 16
  10. BATCH_NUM = 4
  11. EPOCH_NUM = 4
  12. IMAGE_SIZE = 784
  13. CLASS_NUM = 1
  14. USE_GPU = False # whether to use GPU
  15. def _get_random_images_and_labels(image_shape, label_shape):
  16. image = np.random.random(size=image_shape).astype('float32')
  17. label = np.random.random(size=label_shape).astype('int64')
  18. return image, label
  19. def __reader__():
  20. for _ in range(BATCH_NUM):
  21. batch_image, batch_label = _get_random_images_and_labels(
  22. [BATCH_SIZE, IMAGE_SIZE], [BATCH_SIZE, CLASS_NUM])
  23. yield batch_image, batch_label
  24. def random_batch_reader():
  25. return __reader__
  26. class LinearNet(nn.Layer):
  27. def __init__(self):
  28. super(LinearNet, self).__init__()
  29. self._linear = nn.Linear(IMAGE_SIZE, CLASS_NUM)
  30. @paddle.jit.to_static
  31. def forward(self, x):
  32. return self._linear(x)
  33. # set device
  34. paddle.set_device('gpu' if USE_GPU else 'cpu')
  35. # create network
  36. layer = LinearNet()
  37. dp_layer = paddle.DataParallel(layer)
  38. loss_fn = nn.CrossEntropyLoss()
  39. adam = opt.Adam(learning_rate=0.001, parameters=dp_layer.parameters())
  40. # create data loader
  41. loader = paddle.io.DataLoader.from_generator(capacity=5)
  42. loader.set_batch_generator(random_batch_reader())
  43. for epoch_id in range(EPOCH_NUM):
  44. for batch_id, (image, label) in enumerate(loader()):
  45. out = layer(image)
  46. loss = loss_fn(out, label)
  47. loss.backward()
  48. adam.step()
  49. adam.clear_grad()
  50. print("Epoch {} batch {}: loss = {}".format(
  51. epoch_id, batch_id, np.mean(loss.numpy())))

代码示例 3

  1. '''
  2. Example of `drop_last` using in static graph multi-cards mode
  3. '''
  4. import paddle
  5. import paddle.static as static
  6. import numpy as np
  7. import os
  8. # We use 2 CPU cores to run inference network
  9. os.environ['CPU_NUM'] = '2'
  10. paddle.enable_static()
  11. # The data source has only 3 batches, which can not be
  12. # divided evenly to each CPU core
  13. def batch_generator():
  14. for i in range(3):
  15. yield np.array([i+1]).astype('float32'),
  16. x = static.data(name='x', shape=[None], dtype='float32')
  17. y = x * x
  18. def run_inference(drop_last):
  19. loader = paddle.io.DataLoader.from_generator(feed_list=[x],
  20. capacity=8, drop_last=drop_last)
  21. loader.set_batch_generator(batch_generator, static.cpu_places())
  22. exe = static.Executor(paddle.CPUPlace())
  23. prog = static.CompiledProgram(static.default_main_program())
  24. prog = prog.with_data_parallel()
  25. result = []
  26. for data in loader():
  27. each_ret, = exe.run(prog, feed=data, fetch_list=[y])
  28. result.extend(each_ret)
  29. return result
  30. # Set drop_last to True, so that the last batch whose
  31. # number is less than CPU core number would be discarded.
  32. print(run_inference(drop_last=True)) # [1.0, 4.0]
  33. # Set drop_last to False, so that the last batch whose
  34. # number is less than CPU core number can be tested.
  35. print(run_inference(drop_last=False)) # [1.0, 4.0, 9.0]

from_dataset ( dataset, places, drop_last=True )

警告

这个API将在未来版本废弃,推荐使用支持多进程并发加速的 paddle.io.DataLoader

创建一个DataLoader对象用于加载Dataset产生的数据。目前,Dataset仅支持Linux系统下使用。

参数:

  • dataset (InMemoryDataset|QueueDataset) - Dataset对象。

  • places (list(CUDAPlace)|list(CPUPlace)) - DataLoader对象返回数据所在的place。

  • drop_last (bool) - 是否丢弃最后样本数量不足batch size的batch。若drop_last = True则丢弃,若drop_last = False则不丢弃。

返回: 被创建的DataLoader对象,可以for-range的方式循环迭代

返回类型: loader (DataLoader)

代码示例

  1. import paddle
  2. import paddle.static as static
  3. paddle.enable_static()
  4. image = static.data(name='image', shape=[None, 784], dtype='float32')
  5. label = static.data(name='label', shape=[None, 1], dtype='int64')
  6. dataset = paddle.distributed.QueueDataset()
  7. dataset.init(
  8. batch_size=32,
  9. pipe_command='cat',
  10. use_var=[image, label])
  11. dataset.set_filelist(['a.txt', 'b.txt', 'c.txt'])
  12. loader = paddle.io.DataLoader.from_dataset(dataset, static.cpu_places())