5-1 Dataset

If the volume of training data is small (e.g. < 1 GB), all of it can be read into memory at once, which maximizes training efficiency.

However, if the volume is huge (e.g. > 10 GB) and cannot be loaded into memory all at once, the data should be divided into batches and read batch by batch.

The tf.data API constructs input data pipelines that help manage huge volumes of data in various formats and with various conversions.

1. Constructing Data Pipeline

A data pipeline can be constructed from the following sources: numpy array, pandas DataFrame, Python generator, csv file, text file, file path, tfrecords file.

Among these, the most popular are numpy array, pandas DataFrame and file path.

The drawback of using tfrecords files to construct data pipelines is the complexity, since it requires: (a) constructing a tf.Example from each sample; (b) serializing the tf.Example into a string and writing it to the tfrecords file; (c) when using the data, reading the tfrecords file and parsing each record back into a tf.Example.

On the other hand, the advantages of tfrecords files are their small size after compression, the convenience of sharing them over the Internet, and their fast loading speed.
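Steps (a) to (c) can be tried without touching the file system. The following is a minimal sketch, not the full workflow: the feature names `label` and `payload` are hypothetical, and the serialized string is parsed directly instead of being written to and read from a tfrecords file.

```python
import tensorflow as tf

# (a) Build a tf.train.Example from one sample (feature names are hypothetical).
example = tf.train.Example(features=tf.train.Features(feature={
    "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])),
    "payload": tf.train.Feature(bytes_list=tf.train.BytesList(value=[b"raw bytes"])),
}))

# (b) Serialize it into the byte string that would be written to the file.
serialized = example.SerializeToString()

# (c) Parse the byte string back into tensors using a feature description.
description = {
    "label": tf.io.FixedLenFeature([], tf.int64),
    "payload": tf.io.FixedLenFeature([], tf.string),
}
parsed = tf.io.parse_single_example(serialized, description)
print(parsed["label"].numpy(), parsed["payload"].numpy())
```

The feature description in step (c) has to match the names and types used in step (a), which is the main source of the extra bookkeeping.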

1.1 Constructing Data Pipeline through Numpy Array

    # Constructing Data Pipeline through Numpy Array
    import tensorflow as tf
    import numpy as np
    from sklearn import datasets

    iris = datasets.load_iris()

    ds1 = tf.data.Dataset.from_tensor_slices((iris["data"], iris["target"]))
    for features, label in ds1.take(5):
        print(features, label)

    tf.Tensor([5.1 3.5 1.4 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)
    tf.Tensor([4.9 3. 1.4 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)
    tf.Tensor([4.7 3.2 1.3 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)
    tf.Tensor([4.6 3.1 1.5 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)
    tf.Tensor([5. 3.6 1.4 0.2], shape=(4,), dtype=float64) tf.Tensor(0, shape=(), dtype=int64)

1.2 Constructing Data Pipeline through Pandas DataFrame

    # Constructing Data Pipeline through Pandas DataFrame
    import tensorflow as tf
    from sklearn import datasets
    import pandas as pd

    iris = datasets.load_iris()
    dfiris = pd.DataFrame(iris["data"], columns=iris.feature_names)
    ds2 = tf.data.Dataset.from_tensor_slices((dfiris.to_dict("list"), iris["target"]))

    for features, label in ds2.take(3):
        print(features, label)

    {'sepal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=5.1>, 'sepal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=3.5>, 'petal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=1.4>, 'petal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=0.2>} tf.Tensor(0, shape=(), dtype=int64)
    {'sepal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=4.9>, 'sepal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=3.0>, 'petal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=1.4>, 'petal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=0.2>} tf.Tensor(0, shape=(), dtype=int64)
    {'sepal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=4.7>, 'sepal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=3.2>, 'petal length (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=1.3>, 'petal width (cm)': <tf.Tensor: shape=(), dtype=float32, numpy=0.2>} tf.Tensor(0, shape=(), dtype=int64)

1.3 Constructing Data Pipeline through Python generator

    # Constructing Data Pipeline through Python generator
    import tensorflow as tf
    from matplotlib import pyplot as plt
    from tensorflow.keras.preprocessing.image import ImageDataGenerator

    # Defining a generator to read images from a folder
    image_generator = ImageDataGenerator(rescale=1.0/255).flow_from_directory(
        "../data/cifar2/test/",
        target_size=(32, 32),
        batch_size=20,
        class_mode='binary')

    classdict = image_generator.class_indices
    print(classdict)

    def generator():
        for features, label in image_generator:
            yield (features, label)

    ds3 = tf.data.Dataset.from_generator(generator, output_types=(tf.float32, tf.int32))

    %matplotlib inline
    %config InlineBackend.figure_format = 'svg'
    plt.figure(figsize=(6, 6))
    for i, (img, label) in enumerate(ds3.unbatch().take(9)):
        ax = plt.subplot(3, 3, i+1)
        ax.imshow(img.numpy())
        ax.set_title("label = %d" % label)
        ax.set_xticks([])
        ax.set_yticks([])
    plt.show()
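The listing above declares only the element dtypes through output_types. As a hedged alternative, assuming TensorFlow 2.4 or later, from_generator also accepts an output_signature that fixes shapes as well. The sketch below replaces the image folder with a hypothetical toy_generator that yields random arrays, so it runs without the cifar2 data.

```python
import numpy as np
import tensorflow as tf

def toy_generator():
    # Hypothetical stand-in for image_generator: 3 batches of 4 random "images".
    rng = np.random.default_rng(0)
    for _ in range(3):
        images = rng.random((4, 32, 32, 3), dtype=np.float32)
        labels = rng.integers(0, 2, size=(4,)).astype(np.int32)
        yield images, labels

ds = tf.data.Dataset.from_generator(
    toy_generator,
    output_signature=(
        tf.TensorSpec(shape=(None, 32, 32, 3), dtype=tf.float32),  # batch of images
        tf.TensorSpec(shape=(None,), dtype=tf.int32),              # batch of labels
    ),
)

print(ds.element_spec)
# unbatch() splits every batch back into single (image, label) pairs.
n_images = sum(1 for _ in ds.unbatch())
print(n_images)
```

Declaring the shapes up front lets downstream steps such as unbatch or batch know the rank of each element without running the generator.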

1.4 Constructing Data Pipeline through csv file

    # Constructing Data Pipeline through csv file
    ds4 = tf.data.experimental.make_csv_dataset(
        file_pattern=["../data/titanic/train.csv", "../data/titanic/test.csv"],
        batch_size=3,
        label_name="Survived",
        na_value="",
        num_epochs=1,
        ignore_errors=True)

    for data, label in ds4.take(2):
        print(data, label)

    OrderedDict([('PassengerId', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([540, 58, 764], dtype=int32)>), ('Pclass', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([1, 3, 1], dtype=int32)>), ('Name', <tf.Tensor: shape=(3,), dtype=string, numpy=
    array([b'Frolicher, Miss. Hedwig Margaritha', b'Novel, Mr. Mansouer',
    b'Carter, Mrs. William Ernest (Lucile Polk)'], dtype=object)>), ('Sex', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'female', b'male', b'female'], dtype=object)>), ('Age', <tf.Tensor: shape=(3,), dtype=float32, numpy=array([22. , 28.5, 36. ], dtype=float32)>), ('SibSp', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([0, 0, 1], dtype=int32)>), ('Parch', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([2, 0, 2], dtype=int32)>), ('Ticket', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'13568', b'2697', b'113760'], dtype=object)>), ('Fare', <tf.Tensor: shape=(3,), dtype=float32, numpy=array([ 49.5 , 7.2292, 120. ], dtype=float32)>), ('Cabin', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'B39', b'', b'B96 B98'], dtype=object)>), ('Embarked', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'C', b'C', b'S'], dtype=object)>)]) tf.Tensor([1 0 1], shape=(3,), dtype=int32)
    OrderedDict([('PassengerId', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([845, 66, 390], dtype=int32)>), ('Pclass', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([3, 3, 2], dtype=int32)>), ('Name', <tf.Tensor: shape=(3,), dtype=string, numpy=
    array([b'Culumovic, Mr. Jeso', b'Moubarek, Master. Gerios',
    b'Lehmann, Miss. Bertha'], dtype=object)>), ('Sex', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'male', b'male', b'female'], dtype=object)>), ('Age', <tf.Tensor: shape=(3,), dtype=float32, numpy=array([17., 0., 17.], dtype=float32)>), ('SibSp', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([0, 1, 0], dtype=int32)>), ('Parch', <tf.Tensor: shape=(3,), dtype=int32, numpy=array([0, 1, 0], dtype=int32)>), ('Ticket', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'315090', b'2661', b'SC 1748'], dtype=object)>), ('Fare', <tf.Tensor: shape=(3,), dtype=float32, numpy=array([ 8.6625, 15.2458, 12. ], dtype=float32)>), ('Cabin', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'', b'', b''], dtype=object)>), ('Embarked', <tf.Tensor: shape=(3,), dtype=string, numpy=array([b'S', b'C', b'C'], dtype=object)>)]) tf.Tensor([0 1 1], shape=(3,), dtype=int32)

1.5 Constructing Data Pipeline through text file

    # Constructing Data Pipeline through text file
    ds5 = tf.data.TextLineDataset(
        filenames=["../data/titanic/train.csv", "../data/titanic/test.csv"]
    ).skip(1)  # Omitting the header on the first line (only the first file's header is skipped)

    for line in ds5.take(5):
        print(line)

    tf.Tensor(b'493,0,1,"Molson, Mr. Harry Markland",male,55.0,0,0,113787,30.5,C30,S', shape=(), dtype=string)
    tf.Tensor(b'53,1,1,"Harper, Mrs. Henry Sleeper (Myna Haxtun)",female,49.0,1,0,PC 17572,76.7292,D33,C', shape=(), dtype=string)
    tf.Tensor(b'388,1,2,"Buss, Miss. Kate",female,36.0,0,0,27849,13.0,,S', shape=(), dtype=string)
    tf.Tensor(b'192,0,2,"Carbines, Mr. William",male,19.0,0,0,28424,13.0,,S', shape=(), dtype=string)
    tf.Tensor(b'687,0,3,"Panula, Mr. Jaako Arnold",male,14.0,4,1,3101295,39.6875,,S', shape=(), dtype=string)
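Each element of a TextLineDataset is still a raw string, so a parsing step usually follows. The sketch below is one possible way to do it with tf.io.decode_csv; the two input lines are copied from the output above, and the column order (PassengerId, Survived, Pclass, Name, Sex, Age, SibSp, Parch, Ticket, Fare, Cabin, Embarked) is an assumption inferred from those lines. The helper name parse_line is hypothetical.

```python
import tensorflow as tf

lines = tf.data.Dataset.from_tensor_slices([
    b'493,0,1,"Molson, Mr. Harry Markland",male,55.0,0,0,113787,30.5,C30,S',
    b'388,1,2,"Buss, Miss. Kate",female,36.0,0,0,27849,13.0,,S',
])

# One default per column; the type of each default sets the column dtype
# and is used when a field is empty (assumed column order, see above).
record_defaults = [[0], [0], [0], [""], [""], [0.0],
                   [0], [0], [""], [0.0], [""], [""]]

def parse_line(line):
    fields = tf.io.decode_csv(line, record_defaults)
    features = {"Name": fields[3], "Age": fields[5], "Fare": fields[9]}
    label = fields[1]  # the Survived column
    return features, label

parsed = list(lines.map(parse_line).as_numpy_iterator())
for features, label in parsed:
    print(features["Name"], features["Age"], label)
```

Quoted fields that contain commas, such as the passenger names, are kept as a single field by decode_csv.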

1.6 Constructing Data Pipeline through file path

    ds6 = tf.data.Dataset.list_files("../data/cifar2/train/*/*.jpg")
    for file in ds6.take(5):
        print(file)

    tf.Tensor(b'../data/cifar2/train/automobile/1263.jpg', shape=(), dtype=string)
    tf.Tensor(b'../data/cifar2/train/airplane/2837.jpg', shape=(), dtype=string)
    tf.Tensor(b'../data/cifar2/train/airplane/4264.jpg', shape=(), dtype=string)
    tf.Tensor(b'../data/cifar2/train/automobile/4241.jpg', shape=(), dtype=string)
    tf.Tensor(b'../data/cifar2/train/automobile/192.jpg', shape=(), dtype=string)

    from matplotlib import pyplot as plt

    def load_image(img_path, size=(32, 32)):
        # The label is derived from the folder name: automobile -> 1, otherwise 0
        label = 1 if tf.strings.regex_full_match(img_path, ".*/automobile/.*") else 0
        img = tf.io.read_file(img_path)
        img = tf.image.decode_jpeg(img)  # Note that we are using jpeg format
        img = tf.image.resize(img, size)
        return (img, label)

    %matplotlib inline
    %config InlineBackend.figure_format = 'svg'
    for i, (img, label) in enumerate(ds6.map(load_image).take(2)):
        plt.figure(i)
        plt.imshow((img/255.0).numpy())
        plt.title("label = %d" % label)
        plt.xticks([])
        plt.yticks([])

1.7 Constructing Data Pipeline through tfrecords file

    import os
    import numpy as np

    # inpath: path of the original data; outpath: output path of the TFRecord file
    def create_tfrecords(inpath, outpath):
        writer = tf.io.TFRecordWriter(outpath)
        dirs = os.listdir(inpath)
        for index, name in enumerate(dirs):
            class_path = inpath + "/" + name + "/"
            for img_name in os.listdir(class_path):
                img_path = class_path + img_name
                img = tf.io.read_file(img_path)
                #img = tf.image.decode_image(img)
                #img = tf.image.encode_jpeg(img) # Use jpeg format for all the compressions
                example = tf.train.Example(
                    features=tf.train.Features(feature={
                        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[index])),
                        'img_raw': tf.train.Feature(bytes_list=tf.train.BytesList(value=[img.numpy()]))
                    }))
                writer.write(example.SerializeToString())
        writer.close()

    # The output path is a file, so it has no trailing slash
    create_tfrecords("../data/cifar2/test/", "../data/cifar2_test.tfrecords")

    from matplotlib import pyplot as plt

    def parse_example(proto):
        description = {'img_raw': tf.io.FixedLenFeature([], tf.string),
                       'label': tf.io.FixedLenFeature([], tf.int64)}
        example = tf.io.parse_single_example(proto, description)
        img = tf.image.decode_jpeg(example["img_raw"])  # Note that we are using jpeg format
        img = tf.image.resize(img, (32, 32))
        label = example["label"]
        return (img, label)

    ds7 = tf.data.TFRecordDataset("../data/cifar2_test.tfrecords").map(parse_example).shuffle(3000)

    %matplotlib inline
    %config InlineBackend.figure_format = 'svg'
    plt.figure(figsize=(6, 6))
    for i, (img, label) in enumerate(ds7.take(9)):
        ax = plt.subplot(3, 3, i+1)
        ax.imshow((img/255.0).numpy())
        ax.set_title("label = %d" % label)
        ax.set_xticks([])
        ax.set_yticks([])
    plt.show()

2. Applying Data Conversion

A Dataset is very flexible in the data structures it supports. Essentially it is a sequence whose elements can be of various types, such as tensors, lists, dictionaries, or even Datasets.

Dataset provides many data conversion functions.

  • map: applies the conversion function to every element of the dataset.

  • flat_map: applies the conversion function to every element of the dataset and flattens the resulting nested Datasets.

  • interleave: similar to flat_map, but interleaves the data from the different sources.

  • filter: keeps only the elements that satisfy a condition.

  • zip: zips two or more Datasets element by element; the result stops at the end of the shortest one.

  • concatenate: concatenates two Datasets.

  • reduce: reduces the whole dataset to a single result.

  • batch: groups consecutive elements into batches and yields one batch at a time; each batch has one more dimension than the original elements; the inverse operation is unbatch.

  • padded_batch: builds batches like batch, but pads the elements to a common shape.

  • window: builds sliding windows and returns a Dataset of Datasets.

  • shuffle: shuffles the order of the data.

  • repeat: repeats the data a given number of times; if no argument is specified, the data is repeated indefinitely.

  • shard: keeps every n-th element, starting from a given index.

  • take: takes the first few elements.

    #map: applies the conversion function to every element of the dataset.
    ds = tf.data.Dataset.from_tensor_slices(["hello world", "hello China", "hello Beijing"])
    ds_map = ds.map(lambda x: tf.strings.split(x, " "))
    for x in ds_map:
        print(x)

    tf.Tensor([b'hello' b'world'], shape=(2,), dtype=string)
    tf.Tensor([b'hello' b'China'], shape=(2,), dtype=string)
    tf.Tensor([b'hello' b'Beijing'], shape=(2,), dtype=string)

    #flat_map: applies the conversion function to every element and flattens the nested Datasets.
    ds = tf.data.Dataset.from_tensor_slices(["hello world", "hello China", "hello Beijing"])
    ds_flatmap = ds.flat_map(lambda x: tf.data.Dataset.from_tensor_slices(tf.strings.split(x, " ")))
    for x in ds_flatmap:
        print(x)

    tf.Tensor(b'hello', shape=(), dtype=string)
    tf.Tensor(b'world', shape=(), dtype=string)
    tf.Tensor(b'hello', shape=(), dtype=string)
    tf.Tensor(b'China', shape=(), dtype=string)
    tf.Tensor(b'hello', shape=(), dtype=string)
    tf.Tensor(b'Beijing', shape=(), dtype=string)

    # interleave: similar to `flat_map`, but interleaves the data from the different sources.
    ds = tf.data.Dataset.from_tensor_slices(["hello world", "hello China", "hello Beijing"])
    ds_interleave = ds.interleave(lambda x: tf.data.Dataset.from_tensor_slices(tf.strings.split(x, " ")))
    for x in ds_interleave:
        print(x)

    tf.Tensor(b'hello', shape=(), dtype=string)
    tf.Tensor(b'hello', shape=(), dtype=string)
    tf.Tensor(b'hello', shape=(), dtype=string)
    tf.Tensor(b'world', shape=(), dtype=string)
    tf.Tensor(b'China', shape=(), dtype=string)
    tf.Tensor(b'Beijing', shape=(), dtype=string)
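The interleaving pattern is controlled by two arguments of interleave, cycle_length and block_length. When cycle_length is omitted, recent TensorFlow versions let the runtime choose it, so the exact order may depend on the machine. The sketch below sets both explicitly; the three number ranges are hypothetical stand-ins for three data sources.

```python
import tensorflow as tf

# Three hypothetical sources: 0..3, 10..13 and 20..23.
sources = tf.data.Dataset.range(0, 30, 10)  # start values 0, 10, 20

ds = sources.interleave(
    lambda start: tf.data.Dataset.range(start, start + 4),
    cycle_length=2,   # read from 2 sources at a time
    block_length=2,   # take 2 consecutive elements from a source before switching
)

result = [int(x) for x in ds.as_numpy_iterator()]
print(result)  # [0, 1, 10, 11, 2, 3, 12, 13, 20, 21, 22, 23]
```

The third source is only opened after one of the first two is exhausted, because at most cycle_length sources are open at once.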
    #filter: keeps only the elements that satisfy a condition.
    ds = tf.data.Dataset.from_tensor_slices(["hello world", "hello China", "hello Beijing"])
    # Find the elements containing the letter 'a' or 'B'
    ds_filter = ds.filter(lambda x: tf.strings.regex_full_match(x, ".*[aB].*"))
    for x in ds_filter:
        print(x)

    tf.Tensor(b'hello China', shape=(), dtype=string)
    tf.Tensor(b'hello Beijing', shape=(), dtype=string)

    #zip: zips two or more Datasets element by element.
    ds1 = tf.data.Dataset.range(0, 3)
    ds2 = tf.data.Dataset.range(3, 6)
    ds3 = tf.data.Dataset.range(6, 9)
    ds_zip = tf.data.Dataset.zip((ds1, ds2, ds3))
    for x, y, z in ds_zip:
        print(x.numpy(), y.numpy(), z.numpy())

    0 3 6
    1 4 7
    2 5 8
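The Datasets passed to zip do not need to have the same length. As a small sketch with two hypothetical ranges of different sizes, the zipped Dataset ends as soon as the shorter input is exhausted:

```python
import tensorflow as tf

shorter = tf.data.Dataset.range(3)        # 0, 1, 2
longer = tf.data.Dataset.range(10, 20)    # 10 elements

zipped = tf.data.Dataset.zip((shorter, longer))
pairs = [(int(a), int(b)) for a, b in zipped.as_numpy_iterator()]
print(pairs)  # [(0, 10), (1, 11), (2, 12)]
```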
    #concatenate: concatenates two Datasets.
    ds1 = tf.data.Dataset.range(0, 3)
    ds2 = tf.data.Dataset.range(3, 6)
    ds_concat = ds1.concatenate(ds2)
    for x in ds_concat:
        print(x)

    tf.Tensor(0, shape=(), dtype=int64)
    tf.Tensor(1, shape=(), dtype=int64)
    tf.Tensor(2, shape=(), dtype=int64)
    tf.Tensor(3, shape=(), dtype=int64)
    tf.Tensor(4, shape=(), dtype=int64)
    tf.Tensor(5, shape=(), dtype=int64)

    #reduce: reduces the whole dataset to a single result.
    ds = tf.data.Dataset.from_tensor_slices([1, 2, 3, 4, 5.0])
    result = ds.reduce(0.0, lambda x, y: tf.add(x, y))
    result

    <tf.Tensor: shape=(), dtype=float32, numpy=15.0>

    #batch: groups elements into batches and yields one batch at a time; each batch has one more dimension than the original elements; the inverse operation is `unbatch`.
    ds = tf.data.Dataset.range(12)
    ds_batch = ds.batch(4)
    for x in ds_batch:
        print(x)

    tf.Tensor([0 1 2 3], shape=(4,), dtype=int64)
    tf.Tensor([4 5 6 7], shape=(4,), dtype=int64)
    tf.Tensor([ 8 9 10 11], shape=(4,), dtype=int64)

    #padded_batch: builds batches like `batch`, but pads the elements to a common shape.
    elements = [[1, 2], [3, 4, 5], [6, 7], [8]]
    ds = tf.data.Dataset.from_generator(lambda: iter(elements), tf.int32)
    ds_padded_batch = ds.padded_batch(2, padded_shapes=[4,])
    for x in ds_padded_batch:
        print(x)

    tf.Tensor(
    [[1 2 0 0]
    [3 4 5 0]], shape=(2, 4), dtype=int32)
    tf.Tensor(
    [[6 7 0 0]
    [8 0 0 0]], shape=(2, 4), dtype=int32)
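In the listing above every batch is padded to a fixed length of 4 with zeros. As a hedged variation, a padded shape of None pads each batch only up to its own longest element, and padding_values selects the fill value. The variable-length elements below are generated with tf.range purely for illustration.

```python
import tensorflow as tf

# Variable-length elements: [0], [0 1], [0 1 2], [0 1 2 3]
ds = tf.data.Dataset.range(1, 5).map(lambda n: tf.range(n))

ds_padded = ds.padded_batch(
    2,
    padded_shapes=(None,),                          # pad to the longest element of each batch
    padding_values=tf.constant(-1, dtype=tf.int64)  # fill value, same dtype as the elements
)

batches = [b.tolist() for b in ds_padded.as_numpy_iterator()]
print(batches)  # [[[0, -1], [0, 1]], [[0, 1, 2, -1], [0, 1, 2, 3]]]
```

A non-zero fill value such as -1 can make padded positions easy to tell apart from real data.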
    #window: builds sliding windows and returns a Dataset of Datasets.
    ds = tf.data.Dataset.range(12)
    # window returns a Dataset of Datasets, which can be flattened with flat_map
    ds_window = ds.window(3, shift=1).flat_map(lambda x: x.batch(3, drop_remainder=True))
    for x in ds_window:
        print(x)

    tf.Tensor([0 1 2], shape=(3,), dtype=int64)
    tf.Tensor([1 2 3], shape=(3,), dtype=int64)
    tf.Tensor([2 3 4], shape=(3,), dtype=int64)
    tf.Tensor([3 4 5], shape=(3,), dtype=int64)
    tf.Tensor([4 5 6], shape=(3,), dtype=int64)
    tf.Tensor([5 6 7], shape=(3,), dtype=int64)
    tf.Tensor([6 7 8], shape=(3,), dtype=int64)
    tf.Tensor([7 8 9], shape=(3,), dtype=int64)
    tf.Tensor([ 8 9 10], shape=(3,), dtype=int64)
    tf.Tensor([ 9 10 11], shape=(3,), dtype=int64)

    #shuffle: shuffles the order of the data.
    ds = tf.data.Dataset.range(12)
    ds_shuffle = ds.shuffle(buffer_size=5)
    for x in ds_shuffle:
        print(x)

    tf.Tensor(1, shape=(), dtype=int64)
    tf.Tensor(4, shape=(), dtype=int64)
    tf.Tensor(0, shape=(), dtype=int64)
    tf.Tensor(6, shape=(), dtype=int64)
    tf.Tensor(5, shape=(), dtype=int64)
    tf.Tensor(2, shape=(), dtype=int64)
    tf.Tensor(7, shape=(), dtype=int64)
    tf.Tensor(11, shape=(), dtype=int64)
    tf.Tensor(3, shape=(), dtype=int64)
    tf.Tensor(9, shape=(), dtype=int64)
    tf.Tensor(10, shape=(), dtype=int64)
    tf.Tensor(8, shape=(), dtype=int64)
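The buffer_size argument decides how thoroughly shuffle mixes the data: elements are drawn at random from a buffer of that size, so a buffer of 1 leaves the order unchanged, while a buffer at least as large as the dataset shuffles it completely. The following sketch illustrates this, together with the seed and reshuffle_each_iteration arguments; the seed value 42 is arbitrary.

```python
import tensorflow as tf

ds = tf.data.Dataset.range(12)

# A buffer covering the whole dataset; with reshuffle_each_iteration=False
# every pass over the dataset yields the same shuffled order.
full = ds.shuffle(buffer_size=12, seed=42, reshuffle_each_iteration=False)
first_pass = [int(x) for x in full.as_numpy_iterator()]
second_pass = [int(x) for x in full.as_numpy_iterator()]

# A buffer of size 1 cannot reorder anything.
unshuffled = [int(x) for x in ds.shuffle(buffer_size=1).as_numpy_iterator()]

print(first_pass)
print(unshuffled)
```

By default reshuffle_each_iteration is enabled, so each epoch sees a different order, which is usually what training needs.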
    #repeat: repeats the data a given number of times; if no argument is specified, the data is repeated indefinitely.
    ds = tf.data.Dataset.range(3)
    ds_repeat = ds.repeat(3)
    for x in ds_repeat:
        print(x)

    tf.Tensor(0, shape=(), dtype=int64)
    tf.Tensor(1, shape=(), dtype=int64)
    tf.Tensor(2, shape=(), dtype=int64)
    tf.Tensor(0, shape=(), dtype=int64)
    tf.Tensor(1, shape=(), dtype=int64)
    tf.Tensor(2, shape=(), dtype=int64)
    tf.Tensor(0, shape=(), dtype=int64)
    tf.Tensor(1, shape=(), dtype=int64)
    tf.Tensor(2, shape=(), dtype=int64)

    #shard: keeps every n-th element, starting from a given index.
    ds = tf.data.Dataset.range(12)
    ds_shard = ds.shard(3, index=1)
    for x in ds_shard:
        print(x)

    tf.Tensor(1, shape=(), dtype=int64)
    tf.Tensor(4, shape=(), dtype=int64)
    tf.Tensor(7, shape=(), dtype=int64)
    tf.Tensor(10, shape=(), dtype=int64)
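A typical use of shard is to give each worker of a distributed job its own disjoint slice of the data. The sketch below assumes a hypothetical setup with three workers and shows that the three shards together cover the dataset exactly once.

```python
import tensorflow as tf

ds = tf.data.Dataset.range(12)
num_workers = 3  # hypothetical number of workers

# Worker i keeps the elements whose position modulo num_workers equals i.
shards = [
    [int(x) for x in ds.shard(num_shards=num_workers, index=i).as_numpy_iterator()]
    for i in range(num_workers)
]
print(shards)  # [[0, 3, 6, 9], [1, 4, 7, 10], [2, 5, 8, 11]]
```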
    #take: takes the first few elements.
    ds = tf.data.Dataset.range(12)
    ds_take = ds.take(3)

    list(ds_take.as_numpy_iterator())

    [0, 1, 2]

3. Enhancing the Efficiency of the Pipeline

Training a deep learning model can take a long time.

The time is mainly spent on two parts: data preparation and parameter iteration.

The efficiency of parameter iteration is usually improved with GPUs.

The efficiency of data preparation can be improved by constructing a high-efficiency data pipeline.

Below are several suggestions for constructing a high-efficiency data pipeline:

  • 1, Use prefetch to run data preparation and parameter iteration in parallel.

  • 2, Use interleave to read data from several sources in parallel and interleave their records.

  • 3, Set num_parallel_calls when using map, so that elements are converted in parallel.

  • 4, Use cache to keep the data in memory after the first epoch when the data volume is small.

  • 5, When converting with map, batch the data first, and then convert each batch with vectorized operations.
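Several of these suggestions can be combined in one pipeline. The following is only a sketch on a toy range dataset, assuming TensorFlow 2.4 or later for tf.data.AUTOTUNE; the preprocess function is hypothetical, and interleave is left out because it needs several input sources.

```python
import tensorflow as tf

AUTOTUNE = tf.data.AUTOTUNE  # let the runtime choose the degree of parallelism

def preprocess(batch):
    # Hypothetical vectorized conversion applied to a whole batch at once.
    return tf.cast(batch, tf.float32) / 10.0

ds = (
    tf.data.Dataset.range(100)
    .batch(20)                                      # suggestion 5: batch first ...
    .map(preprocess, num_parallel_calls=AUTOTUNE)   # suggestion 3: ... then map in parallel
    .cache()                                        # suggestion 4: keep results in memory
    .prefetch(AUTOTUNE)                             # suggestion 1: overlap with the consumer
)

batches = list(ds.as_numpy_iterator())
print(len(batches), batches[0].shape)
```

The order of the steps matters: placing cache after map stores the converted batches, so the conversion is not repeated in later epochs.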

3.1 Use prefetch to run data preparation and parameter iteration in parallel.

    import tensorflow as tf

    # Time stamp
    @tf.function
    def printbar():
        ts = tf.timestamp()
        today_ts = ts % (24*60*60)

        # +8 shifts the hour from UTC to UTC+8
        hour = tf.cast(today_ts//3600 + 8, tf.int32) % tf.constant(24)
        minute = tf.cast((today_ts % 3600)//60, tf.int32)
        second = tf.cast(tf.floor(today_ts % 60), tf.int32)

        def timeformat(m):
            # Left-pad single-digit values with a zero
            if tf.strings.length(tf.strings.format("{}", m)) == 1:
                return (tf.strings.format("0{}", m))
            else:
                return (tf.strings.format("{}", m))

        timestring = tf.strings.join([timeformat(hour), timeformat(minute),
                                      timeformat(second)], separator=":")
        tf.print("=========="*8, end="")
        tf.print(timestring)

    import time

    # Data preparation and parameter iteration run serially by default.

    # Simulation of data preparation
    def generator():
        for i in range(10):
            # Suppose each preparation takes 2 seconds
            time.sleep(2)
            yield i

    ds = tf.data.Dataset.from_generator(generator, output_types=(tf.int32))

    # Simulation of parameter iteration
    def train_step():
        # Suppose each training step takes 1 second
        time.sleep(1)

    # Estimated training time: 10*2 + 10*1 = 30s
    printbar()
    tf.print(tf.constant("start training..."))
    for x in ds:
        train_step()
    printbar()
    tf.print(tf.constant("end training..."))

    # Use prefetch to run data preparation and parameter iteration in parallel.
    # Estimated training time: max(10*2, 10*1) = 20s
    printbar()
    tf.print(tf.constant("start training with prefetch..."))

    # tf.data.experimental.AUTOTUNE lets the runtime choose the buffer size automatically
    for x in ds.prefetch(buffer_size=tf.data.experimental.AUTOTUNE):
        train_step()

    printbar()
    tf.print(tf.constant("end training..."))
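The effect can also be measured directly instead of reading the printed time stamps. The sketch below is a scaled-down variant with 0.1 second delays; the helper names slow_generator and consume are hypothetical, output_signature and tf.data.AUTOTUNE assume TensorFlow 2.4 or later, and the measured times will vary from machine to machine.

```python
import time
import tensorflow as tf

def slow_generator():
    for i in range(5):
        time.sleep(0.1)  # simulated data preparation
        yield i

def consume(dataset):
    """Iterate over the dataset with a simulated training step; return values and elapsed time."""
    start = time.perf_counter()
    seen = []
    for x in dataset:
        time.sleep(0.1)  # simulated training step
        seen.append(int(x))
    return seen, time.perf_counter() - start

ds = tf.data.Dataset.from_generator(
    slow_generator, output_signature=tf.TensorSpec(shape=(), dtype=tf.int32))

seen_serial, t_serial = consume(ds)
seen_prefetch, t_prefetch = consume(ds.prefetch(tf.data.AUTOTUNE))
print(f"serial: {t_serial:.2f}s, with prefetch: {t_prefetch:.2f}s")
```

With prefetch the next element is prepared in the background while the current training step runs, so the elements arrive in the same order but the total time should be shorter.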

3.2 Use interleave to read data from several sources in parallel and interleave their records.

    ds_files = tf.data.Dataset.list_files("../data/titanic/*.csv")
    ds = ds_files.flat_map(lambda x: tf.data.TextLineDataset(x).skip(1))
    for line in ds.take(4):
        print(line)

    tf.Tensor(b'493,0,1,"Molson, Mr. Harry Markland",male,55.0,0,0,113787,30.5,C30,S', shape=(), dtype=string)
    tf.Tensor(b'53,1,1,"Harper, Mrs. Henry Sleeper (Myna Haxtun)",female,49.0,1,0,PC 17572,76.7292,D33,C', shape=(), dtype=string)
    tf.Tensor(b'388,1,2,"Buss, Miss. Kate",female,36.0,0,0,27849,13.0,,S', shape=(), dtype=string)
    tf.Tensor(b'192,0,2,"Carbines, Mr. William",male,19.0,0,0,28424,13.0,,S', shape=(), dtype=string)

    ds_files = tf.data.Dataset.list_files("../data/titanic/*.csv")
    ds = ds_files.interleave(lambda x: tf.data.TextLineDataset(x).skip(1))
    for line in ds.take(8):
        print(line)

    tf.Tensor(b'181,0,3,"Sage, Miss. Constance Gladys",female,,8,2,CA. 2343,69.55,,S', shape=(), dtype=string)
    tf.Tensor(b'493,0,1,"Molson, Mr. Harry Markland",male,55.0,0,0,113787,30.5,C30,S', shape=(), dtype=string)
    tf.Tensor(b'405,0,3,"Oreskovic, Miss. Marija",female,20.0,0,0,315096,8.6625,,S', shape=(), dtype=string)
    tf.Tensor(b'53,1,1,"Harper, Mrs. Henry Sleeper (Myna Haxtun)",female,49.0,1,0,PC 17572,76.7292,D33,C', shape=(), dtype=string)
    tf.Tensor(b'635,0,3,"Skoog, Miss. Mabel",female,9.0,3,2,347088,27.9,,S', shape=(), dtype=string)
    tf.Tensor(b'388,1,2,"Buss, Miss. Kate",female,36.0,0,0,27849,13.0,,S', shape=(), dtype=string)
    tf.Tensor(b'701,1,1,"Astor, Mrs. John Jacob (Madeleine Talmadge Force)",female,18.0,1,0,PC 17757,227.525,C62 C64,C', shape=(), dtype=string)
    tf.Tensor(b'192,0,2,"Carbines, Mr. William",male,19.0,0,0,28424,13.0,,S', shape=(), dtype=string)
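The calls above interleave the records but do not ask for parallel reading explicitly. One way to request it, sketched here under the assumption of TensorFlow 2.4 or later, is the num_parallel_calls argument of interleave. To stay self-contained the example writes two small hypothetical csv files into a temporary directory instead of using the titanic data.

```python
import os
import tempfile
import tensorflow as tf

# Two small hypothetical csv files, each with a header line.
tmpdir = tempfile.mkdtemp()
for name, rows in [("a.csv", ["a1", "a2", "a3"]), ("b.csv", ["b1", "b2", "b3"])]:
    with open(os.path.join(tmpdir, name), "w") as f:
        f.write("header\n" + "\n".join(rows) + "\n")

files = tf.data.Dataset.list_files(os.path.join(tmpdir, "*.csv"), shuffle=False)
ds = files.interleave(
    lambda path: tf.data.TextLineDataset(path).skip(1),  # skip the header of every file
    cycle_length=2,                        # keep both files open at once
    num_parallel_calls=tf.data.AUTOTUNE,   # read the open files in parallel
)

lines = [line.decode() for line in ds.as_numpy_iterator()]
print(lines)
```

Because skip(1) is applied inside the function passed to interleave, the header of each file is dropped, not just the header of the first one.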

3.3 Set num_parallel_calls when using map, so that elements are converted in parallel.

    ds = tf.data.Dataset.list_files("../data/cifar2/train/*/*.jpg")

    def load_image(img_path, size=(32, 32)):
        label = 1 if tf.strings.regex_full_match(img_path, ".*/automobile/.*") else 0
        img = tf.io.read_file(img_path)
        img = tf.image.decode_jpeg(img)  # Note: jpeg format here
        img = tf.image.resize(img, size)
        return (img, label)

    # Sequential conversion (one element at a time)
    printbar()
    tf.print(tf.constant("start transformation..."))

    ds_map = ds.map(load_image)
    for _ in ds_map:
        pass

    printbar()
    tf.print(tf.constant("end transformation..."))

    # Parallel conversion (several elements at a time)
    printbar()
    tf.print(tf.constant("start parallel transformation..."))

    ds_map_parallel = ds.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    for _ in ds_map_parallel:
        pass

    printbar()
    tf.print(tf.constant("end parallel transformation..."))

3.4 Use cache to keep the data in memory after the first epoch when the data volume is small.

    import time

    # Simulation of data preparation
    def generator():
        for i in range(5):
            # Suppose each preparation takes 2 seconds
            time.sleep(2)
            yield i

    ds = tf.data.Dataset.from_generator(generator, output_types=(tf.int32))

    # Simulation of parameter iteration
    def train_step():
        # Suppose each training step takes 0 seconds
        pass

    # Estimated training time: (5*2 + 5*0)*3 = 30s
    printbar()
    tf.print(tf.constant("start training..."))
    for epoch in tf.range(3):
        for x in ds:
            train_step()
        printbar()
        tf.print("epoch =", epoch, " ended")
    printbar()
    tf.print(tf.constant("end training..."))

    import time

    # Simulation of data preparation
    def generator():
        for i in range(5):
            # Suppose each preparation takes 2 seconds
            time.sleep(2)
            yield i

    # Use cache to keep the data in memory; only suitable for datasets with a small volume.
    ds = tf.data.Dataset.from_generator(generator, output_types=(tf.int32)).cache()

    # Simulation of parameter iteration
    def train_step():
        # Suppose each training step takes 0 seconds
        time.sleep(0)

    # Estimated training time: (5*2 + 5*0) + (5*0 + 5*0)*2 = 10s
    printbar()
    tf.print(tf.constant("start training..."))
    for epoch in tf.range(3):
        for x in ds:
            train_step()
        printbar()
        tf.print("epoch =", epoch, " ended")
    printbar()
    tf.print(tf.constant("end training..."))
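Besides timing, the effect of cache can be checked by counting how often the source is actually read. In the sketch below the names counting_generator and run_epochs are hypothetical, and output_signature assumes TensorFlow 2.4 or later. Without cache the generator is started once per epoch; with cache only the first epoch reads from it.

```python
import tensorflow as tf

calls = {"count": 0}

def counting_generator():
    calls["count"] += 1  # incremented every time the source is read from the start
    for i in range(5):
        yield i

def run_epochs(dataset, epochs=3):
    """Iterate over the dataset for several epochs; return how often the generator was started."""
    calls["count"] = 0
    for _ in range(epochs):
        for _ in dataset:
            pass
    return calls["count"]

spec = tf.TensorSpec(shape=(), dtype=tf.int32)
plain = tf.data.Dataset.from_generator(counting_generator, output_signature=spec)
cached = plain.cache()

calls_plain = run_epochs(plain)
calls_cached = run_epochs(cached)
print(calls_plain, calls_cached)  # 3 1
```

The cache is only complete after the first epoch has been iterated to the end, so a partial first pass would not give this saving.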

3.5 When converting with map, batch the data first, and then convert each batch with vectorized operations.

    # Map first, then batch
    ds = tf.data.Dataset.range(100000)
    ds_map_batch = ds.map(lambda x: x**2).batch(20)

    printbar()
    tf.print(tf.constant("start scalar transformation..."))
    for x in ds_map_batch:
        pass
    printbar()
    tf.print(tf.constant("end scalar transformation..."))

    # Batch first, then map
    ds = tf.data.Dataset.range(100000)
    ds_batch_map = ds.batch(20).map(lambda x: x**2)

    printbar()
    tf.print(tf.constant("start vector transformation..."))
    for x in ds_batch_map:
        pass
    printbar()
    tf.print(tf.constant("end vector transformation..."))
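Both orders produce the same values; only the number of calls to the mapped function differs (once per element versus once per batch). The sketch below checks this on a smaller range and measures the elapsed time with a hypothetical helper named timed. The timings depend on the machine and are printed only for comparison.

```python
import time
import tensorflow as tf

def timed(dataset):
    """Collect all values of a batched dataset and measure how long it takes."""
    start = time.perf_counter()
    values = [v for batch in dataset.as_numpy_iterator() for v in batch.tolist()]
    return values, time.perf_counter() - start

n = 20000
ds = tf.data.Dataset.range(n)

# Map first, then batch: the lambda runs once per element.
vals_map_batch, t_map_batch = timed(ds.map(lambda x: x ** 2).batch(20))

# Batch first, then map: the lambda runs once per batch of 20 elements.
vals_batch_map, t_batch_map = timed(ds.batch(20).map(lambda x: x ** 2))

print(f"map then batch: {t_map_batch:.2f}s, batch then map: {t_batch_map:.2f}s")
```

This works because squaring is an element-wise operation, so applying it to a whole batch gives the same result as applying it to each element.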

