6-7 Call TensorFlow Model Using spark-scala

This section introduces how to use a trained TensorFlow model for prediction in Spark.

The prerequisite of this section is fundamental knowledge of Spark and Scala.

It is easier to use PySpark, since it only requires loading the model with Python on each executor and predicting separately.

For performance reasons, however, the Scala version of Spark is the most popular.

This section shows how to use a trained TensorFlow model in Spark through TensorFlow for Java.

Using Spark's parallel computing capability, it is possible to predict with the trained TensorFlow model on hundreds or thousands of machines.

0. Using the TensorFlow model in spark-scala

The necessary steps for predicting with a trained TensorFlow model in Spark (Scala) are:

(1) Prepare the protobuf model file.

(2) Create a Spark (Scala) project and add the jar package dependencies for TensorFlow for Java.

(3) Load the TensorFlow model on the driver end of the Spark (Scala) project and debug it successfully.

(4) Load the TensorFlow model on the executors of the Spark (Scala) project through RDD and debug it successfully.

(5) Load the TensorFlow model on the executors of the Spark (Scala) project through DataFrame and debug it successfully.

1. Preparing the protobuf Model File

Here we train a simple linear regression model with tf.keras and save it as a protobuf file.

```python
import tensorflow as tf
from tensorflow.keras import models, layers, optimizers

## Number of samples
n = 800

## Generate the testing dataset
X = tf.random.uniform([n, 2], minval=-10, maxval=10)
w0 = tf.constant([[2.0], [-1.0]])
b0 = tf.constant(3.0)
Y = X@w0 + b0 + tf.random.normal([n, 1], mean=0.0, stddev=2.0)  # @ is matrix multiplication; adding Gaussian noise

## Modeling
tf.keras.backend.clear_session()
inputs = layers.Input(shape=(2,), name="inputs")  # Set the input name as "inputs"
outputs = layers.Dense(1, name="outputs")(inputs)  # Set the output name as "outputs"
linear = models.Model(inputs=inputs, outputs=outputs)
linear.summary()

## Train with the fit method
linear.compile(optimizer="rmsprop", loss="mse", metrics=["mae"])
linear.fit(X, Y, batch_size=8, epochs=100)
tf.print("w = ", linear.layers[1].kernel)
tf.print("b = ", linear.layers[1].bias)

## Save the model in pb format
export_path = "../data/linear_model/"
version = "1"  # The version can be used for management of further updates
linear.save(export_path + version, save_format="tf")
```

```python
!ls {export_path+version}
```

```python
# Check the info of the model file
!saved_model_cli show --dir {export_path+str(version)} --all
```
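Since the training data is generated from y = 2·x1 − x2 + 3, the kernel and bias printed above should come out close to w0 = [[2.0], [-1.0]] and b0 = 3.0. As a side check of the underlying math (a pure-Python sketch, independent of TensorFlow and of the actual training run), three noise-free points are enough to determine such a plane exactly:

```python
# Sanity check: with noise-free data, three points determine the plane
# y = w1*x1 + w2*x2 + b exactly. Solve the 3x3 system with Cramer's rule.

def det3(m):
    """Determinant of a 3x3 matrix given as nested lists."""
    a, b, c = m[0]
    d, e, f = m[1]
    g, h, i = m[2]
    return a * (e * i - f * h) - b * (d * i - f * g) + c * (d * h - e * g)

def fit_plane(points):
    """points: list of ((x1, x2), y). Returns (w1, w2, b)."""
    A = [[x1, x2, 1.0] for (x1, x2), _ in points]
    ys = [y for _, y in points]
    D = det3(A)
    sols = []
    for col in range(3):
        M = [row[:] for row in A]       # replace one column with ys
        for r in range(3):
            M[r][col] = ys[r]
        sols.append(det3(M) / D)
    return tuple(sols)

# Noise-free samples from the generating model y = 2*x1 - 1*x2 + 3
pts = [((0.0, 0.0), 3.0), ((1.0, 0.0), 5.0), ((0.0, 1.0), 2.0)]
w1, w2, b = fit_plane(pts)
print(w1, w2, b)  # → 2.0 -1.0 3.0
```

With the Gaussian noise added in the training script, the fitted values only approximate these; that is why the later predictions are close to, but not exactly, the true values.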

The tensor names shown in the output of `saved_model_cli` (the input name `serving_default_inputs` and the output name `StatefulPartitionedCall`) will be used later.


2. Create a spark (scala) project and add the jar package dependencies for TensorFlow for Java

Add the following jar package dependency if you use Maven to manage the project.

```xml
<!-- https://mvnrepository.com/artifact/org.tensorflow/tensorflow -->
<dependency>
    <groupId>org.tensorflow</groupId>
    <artifactId>tensorflow</artifactId>
    <version>1.15.0</version>
</dependency>
```

You may also download the jar package org.tensorflow.tensorflow, together with its dependencies org.tensorflow.libtensorflow and org.tensorflow.libtensorflow_jni, from the following link, then add all of them to the project.

https://mvnrepository.com/artifact/org.tensorflow/tensorflow/1.15.0

3. Load the TensorFlow model on the driver end of the spark (scala) project and debug it successfully

The following demonstration runs in a jupyter notebook. We need to install Apache Toree so the notebook supports Spark (Scala).

```scala
import scala.collection.mutable.WrappedArray
import org.{tensorflow => tf}

// Note: the second argument of the load function should be "serve"; the related info can be found in the model file.
val bundle = tf.SavedModelBundle
   .load("/Users/liangyun/CodeFiles/eat_tensorflow2_in_30_days-zh/data/linear_model/1", "serve")

// Note: the Java version of TensorFlow uses a static graph as in TensorFlow 1.X, i.e. create a `Session`,
// explicitly feed the data and fetch the results, then run.
// Note: multiple feed methods can be chained when multiple inputs need to be fed.
// Note: the input must be of type float
val sess = bundle.session()
val x = tf.Tensor.create(Array(Array(1.0f, 2.0f), Array(2.0f, 3.0f)))
val y = sess.runner().feed("serving_default_inputs:0", x)
   .fetch("StatefulPartitionedCall:0").run().get(0)

// Copy the prediction into a Float Array with the same shape
val result = Array.ofDim[Float](y.shape()(0).toInt, y.shape()(1).toInt)
y.copyTo(result)

if (x != null) x.close()
if (y != null) y.close()
if (sess != null) sess.close()
if (bundle != null) bundle.close()

result
```

The output is:

```
Array(Array(3.019596), Array(3.9878292))
```


4. Load the TensorFlow model on the executors of the spark (scala) project through RDD and debug it successfully

Here we transfer the TensorFlow model loaded on the Driver end to each executor through broadcasting, and predict with distributed computing on all the executors.
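The mechanics can be sketched in plain Python (a toy stand-in, not Spark's actual API; the class and function names here are illustrative only): one shared model object plays the role of the broadcast variable, and each partition is collected into a single batch before the model is called once, which is why mapPartitions is preferred over a per-record map.

```python
# Toy sketch of the broadcast + mapPartitions pattern (plain Python, not Spark).
# One model object is shared, as a Spark broadcast variable would be, and each
# partition is turned into a single batch before calling the model once.

class LinearModel:
    """Stand-in for the loaded SavedModelBundle: y = 2*x1 - x2 + 3."""
    def predict_batch(self, rows):
        return [2.0 * x1 - x2 + 3.0 for x1, x2 in rows]

def map_partitions(partitions, fn):
    """Apply fn to each partition (a list of rows), like RDD.mapPartitions."""
    return [fn(p) for p in partitions]

model = LinearModel()  # "broadcast" once, reused by every partition
data = [[(1.0, 2.0), (3.0, 5.0)],
        [(6.0, 7.0), (8.0, 3.0)]]  # the dataset split into 2 partitions

results = map_partitions(data, model.predict_batch)
print(results)  # → [[3.0, 4.0], [8.0, 16.0]]
```

Batching per partition means the TensorFlow session is invoked once per partition rather than once per record, which matters when each call has fixed overhead.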

```scala
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.WrappedArray
import org.{tensorflow => tf}

val spark = SparkSession
   .builder()
   .appName("TfRDD")
   .enableHiveSupport()
   .getOrCreate()

val sc = spark.sparkContext

// Load the model on the driver end
val bundle = tf.SavedModelBundle
   .load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1", "serve")

// Broadcast the model to all the executors
val broads = sc.broadcast(bundle)

// Create the dataset
val rdd_data = sc.makeRDD(List(Array(1.0f, 2.0f), Array(3.0f, 5.0f), Array(6.0f, 7.0f), Array(8.0f, 3.0f)))

// Predict in batches by using the model through mapPartitions
val rdd_result = rdd_data.mapPartitions(iter => {
    val arr = iter.toArray
    val model = broads.value
    val sess = model.session()
    val x = tf.Tensor.create(arr)
    val y = sess.runner().feed("serving_default_inputs:0", x)
       .fetch("StatefulPartitionedCall:0").run().get(0)

    // Copy the prediction into a Float Array with the same shape
    val result = Array.ofDim[Float](y.shape()(0).toInt, y.shape()(1).toInt)
    y.copyTo(result)
    result.iterator
})

rdd_result.take(5)
bundle.close
```

The output is:

```
Array(Array(3.019596), Array(3.9264367), Array(7.8607616), Array(15.974984))
```
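As a sanity check (pure Python, outside of Spark), these predictions can be compared against the generating model y = 2·x1 − x2 + 3 from section 1; the model was trained on noisy data, so the agreement is close but not exact:

```python
# The noisy training data came from y = 2*x1 - x2 + 3, so the fitted model's
# predictions should be close to, but not exactly, the true values.
preds = [3.019596, 3.9264367, 7.8607616, 15.974984]
inputs = [(1.0, 2.0), (3.0, 5.0), (6.0, 7.0), (8.0, 3.0)]
truth = [2.0 * x1 - x2 + 3.0 for x1, x2 in inputs]  # [3.0, 4.0, 8.0, 16.0]
errors = [abs(p - t) for p, t in zip(preds, truth)]
print(max(errors))  # well under the stddev=2.0 noise level
```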


5. Load the TensorFlow model on the executors of the spark (scala) project through DataFrame and debug it successfully

Besides the RDD data in Spark, distributed prediction with the TensorFlow model can also be implemented on DataFrame data.

This can be done by registering the prediction method as a sparkSQL user-defined function (UDF).
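The register-then-apply pattern can be sketched in plain Python (a toy analogue, not Spark's API; the helper names here are hypothetical): a named scalar function is registered, then applied row by row to the features column to produce a new column.

```python
# Toy sketch of the SQL-UDF pattern (plain Python, not Spark): a named scalar
# function is registered, then applied row by row to a "features" column.

udf_registry = {}

def register_udf(name, fn):
    """Analogue of spark.udf.register(name, fn)."""
    udf_registry[name] = fn

def select_expr(rows, udf_name):
    """Analogue of selectExpr("features", "udf(features) as y_preds"):
    returns (features, y_pred) pairs."""
    fn = udf_registry[udf_name]
    return [(features, fn(features)) for features in rows]

# Scalar predictor standing in for the broadcast TensorFlow model
register_udf("tfpredict", lambda feats: 2.0 * feats[0] - feats[1] + 3.0)

df = [[1.0, 2.0], [3.0, 5.0], [7.0, 8.0]]
for features, y_pred in select_expr(df, "tfpredict"):
    print(features, y_pred)  # true model gives 3.0, 4.0, 9.0
```

Note the trade-off: a scalar UDF invokes the model once per record, whereas the mapPartitions approach of the previous section batches a whole partition into a single call.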

```scala
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.WrappedArray
import org.{tensorflow => tf}

object TfDataFrame extends Serializable {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession
           .builder()
           .appName("TfDataFrame")
           .enableHiveSupport()
           .getOrCreate()
        val sc = spark.sparkContext

        import spark.implicits._

        val bundle = tf.SavedModelBundle
           .load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1", "serve")
        val broads = sc.broadcast(bundle)

        // Construct the prediction function and register it as a udf of sparkSQL
        val tfpredict = (features: WrappedArray[Float]) => {
            val bund = broads.value
            val sess = bund.session()
            val x = tf.Tensor.create(Array(features.toArray))
            val y = sess.runner().feed("serving_default_inputs:0", x)
               .fetch("StatefulPartitionedCall:0").run().get(0)
            val result = Array.ofDim[Float](y.shape()(0).toInt, y.shape()(1).toInt)
            y.copyTo(result)
            val y_pred = result(0)(0)
            y_pred
        }
        spark.udf.register("tfpredict", tfpredict)

        // Create the DataFrame dataset, and put the features into one of the columns
        val dfdata = sc.parallelize(List(Array(1.0f, 2.0f), Array(3.0f, 5.0f), Array(7.0f, 8.0f))).toDF("features")
        dfdata.show

        // Call the sparkSQL predicting function, adding a new column y_preds
        val dfresult = dfdata.selectExpr("features", "tfpredict(features) as y_preds")
        dfresult.show
        bundle.close
    }
}
```
```scala
TfDataFrame.main(Array())
```

```
+----------+
| features|
+----------+
|[1.0, 2.0]|
|[3.0, 5.0]|
|[7.0, 8.0]|
+----------+

+----------+---------+
| features| y_preds|
+----------+---------+
|[1.0, 2.0]| 3.019596|
|[3.0, 5.0]|3.9264367|
|[7.0, 8.0]| 8.828995|
+----------+---------+
```

We implemented distributed prediction with a linear regression model (implemented in tf.keras) using both the RDD and the DataFrame data structures in Spark.

It is also possible to use trained neural networks for distributed prediction through Spark, with only slight modifications to this demonstration.

Actually, the capability of TensorFlow goes beyond implementing neural networks: its low-level graph language can express all kinds of numerical computations.

We are able to implement any kind of machine learning model in TensorFlow 2.0 with these various low-level APIs.

It is also possible to export the trained models as files and use them in distributed systems such as Spark, which opens up a huge space of imagination for future applications.

Please leave comments in the WeChat official account "Python与算法之美" (Elegance of Python and Algorithms) if you want to discuss the content with the author. The author will try his best to reply given the limited time available.

You are also welcome to join the group chat with the other readers by replying 加群 (join group) in the WeChat official account.
