MLeap Spark Integration

MLeap’s Spark integration comes with the following feature set:

  • Serialization/Deserialization of Transformers and Pipelines to and from Bundle.ML
  • Support of additional feature transformers and models (ex: SVM, OneVsRest, MapTransform)
  • Support for custom transformers

To use MLeap you do not have to change how you construct your existing pipelines, so the rest of this documentation focuses on how to serialize and deserialize your pipeline to and from Bundle.ML. To see how to execute your pipeline outside of Spark, refer to the MLeap Runtime section.

Serializing with Spark

Serializing and deserializing with Spark works almost exactly the same as with MLeap. The only difference is that we are serializing and deserializing Spark pipelines, so we need to import different implicit support classes.

Create a Simple Spark Pipeline

  import ml.combust.bundle.BundleFile
  import ml.combust.bundle.serializer.SerializationFormat
  import org.apache.spark.ml.feature.{StringIndexerModel, VectorAssembler}
  import org.apache.spark.ml.mleap.SparkUtil
  import org.apache.spark.ml.bundle.SparkBundleContext
  import ml.combust.mleap.spark.SparkSupport._
  import resource._

  // Create a sample pipeline that we will serialize
  // And then deserialize using various formats
  val stringIndexer = new StringIndexerModel(labels = Array("Hello, MLeap!", "Another row")).
    setInputCol("a_string").
    setOutputCol("a_string_index")
  val featureAssembler = new VectorAssembler().
    setInputCols(Array("a_double")).
    setOutputCol("features")

  // Because of Spark's privacy, our example pipeline is considerably
  // less interesting than the one we used to demonstrate MLeap serialization
  val pipeline = SparkUtil.createPipelineModel(Array(stringIndexer, featureAssembler))
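The serialization examples below reference a `sparkTransformed` dataset: the DataFrame produced by running the pipeline over your data, which `SparkBundleContext` uses to capture the schema. A minimal sketch of producing it, assuming a `SparkSession` named `spark` is in scope and using sample values that match the column names of the pipeline above:

```scala
// Build a tiny DataFrame with the columns the pipeline expects
// (illustrative values only; use your real data here)
val dataset = spark.createDataFrame(Seq(
  ("Hello, MLeap!", 0.2),
  ("Another row", 1.3)
)).toDF("a_string", "a_double")

// Transform it with the pipeline; the result is what we pass
// to SparkBundleContext().withDataset(...) when serializing
val sparkTransformed = pipeline.transform(dataset)
```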

Serialize to Zip File

In order to serialize to a zip file, make sure the URI begins with jar:file and ends with .zip.

For example: jar:file:/tmp/mleap-bundle.zip.

JSON Format

  implicit val context = SparkBundleContext().withDataset(sparkTransformed)
  for(bundle <- managed(BundleFile("jar:file:/tmp/mleap-examples/simple-json.zip"))) {
    pipeline.writeBundle.format(SerializationFormat.Json).save(bundle)(context)
  }

Protobuf Format

  implicit val context = SparkBundleContext().withDataset(sparkTransformed)
  for(bundle <- managed(BundleFile("jar:file:/tmp/mleap-examples/simple-protobuf.zip"))) {
    pipeline.writeBundle.format(SerializationFormat.Protobuf).save(bundle)(context)
  }

Serialize to Directory

In order to serialize to a directory, make sure the URI begins with file.

For example: file:/tmp/mleap-bundle-dir.

JSON Format

  implicit val context = SparkBundleContext().withDataset(sparkTransformed)
  for(bundle <- managed(BundleFile("file:/tmp/mleap-examples/simple-json-dir"))) {
    pipeline.writeBundle.format(SerializationFormat.Json).save(bundle)(context)
  }

Protobuf Format

  implicit val context = SparkBundleContext().withDataset(sparkTransformed)
  for(bundle <- managed(BundleFile("file:/tmp/mleap-examples/simple-protobuf-dir"))) {
    pipeline.writeBundle.format(SerializationFormat.Protobuf).save(bundle)(context)
  }

Deserializing

Deserializing is just as easy as serializing. You don’t need to know the format the MLeap Bundle was serialized in beforehand; you just need to know where the bundle is.

From Zip Bundle

  // Deserialize a zip bundle
  // Use Scala ARM to make sure resources are managed properly
  val zipBundle = (for(bundle <- managed(BundleFile("jar:file:/tmp/mleap-examples/simple-json.zip"))) yield {
    bundle.loadSparkBundle().get
  }).opt.get

Directory Bundle

  // Deserialize a directory bundle
  // Use Scala ARM to make sure resources are managed properly
  val dirBundle = (for(bundle <- managed(BundleFile("file:/tmp/mleap-examples/simple-json-dir"))) yield {
    bundle.loadSparkBundle().get
  }).opt.get
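Once deserialized, the bundle's root transformer behaves like any other Spark transformer. A short sketch of using it, assuming a `SparkSession` named `spark` is in scope and a DataFrame with the same column names as the example pipeline (the sample values are illustrative only):

```scala
// The loaded bundle exposes its top-level Spark transformer via `root`
val loadedPipeline = zipBundle.root

// Apply it to a DataFrame that has the expected input columns
val df = spark.createDataFrame(Seq(
  ("Hello, MLeap!", 0.2)
)).toDF("a_string", "a_double")

// Produces the added a_string_index and features columns
loadedPipeline.transform(df).show()
```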