MLeap PySpark Integration

MLeap’s PySpark integration comes with the following feature set:

  • Serialization/Deserialization of Transformers and Pipelines to and from Bundle.ML
  • Support of additional feature transformers and models (ex: SVM, OneVsRest, MapTransform)
  • Support for custom transformers

To use MLeap you do not have to change how you construct your existing pipelines, so the rest of this documentation focuses on how to serialize and deserialize your pipeline to and from Bundle.ML. To see how to execute your pipeline outside of Spark, refer to the MLeap Runtime section.

Serializing with PySpark

Serializing and deserializing with PySpark works almost exactly the same as with MLeap. The only difference is that we are serializing and deserializing Spark pipelines, so we need to import different support classes.

Create a Simple Spark Pipeline

  # Import MLeap serialization functionality for PySpark
  import mleap.pyspark
  from mleap.pyspark.spark_support import SimpleSparkSerializer

  # Import standard PySpark Transformers and packages
  from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
  from pyspark.ml import Pipeline, PipelineModel
  from pyspark.sql import Row

  # Create a test data frame (assumes `sc` and `spark` are available, as in the PySpark shell)
  l = [('Alice', 1), ('Bob', 2)]
  rdd = sc.parallelize(l)
  Person = Row('name', 'age')
  person = rdd.map(lambda r: Person(*r))
  df2 = spark.createDataFrame(person)
  df2.collect()

  # Build a very simple pipeline using two transformers
  string_indexer = StringIndexer(inputCol='name', outputCol='name_string_index')
  feature_assembler = VectorAssembler(inputCols=[string_indexer.getOutputCol()], outputCol="features")
  feature_pipeline = [string_indexer, feature_assembler]
  featurePipeline = Pipeline(stages=feature_pipeline)
  fittedPipeline = featurePipeline.fit(df2)

Serialize to Zip File

In order to serialize to a zip file, make sure the URI begins with jar:file and ends with .zip.

For example: jar:file:/tmp/mleap-bundle.zip.
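As a sketch of this naming convention, a small helper can check a bundle URI before serializing. The `is_valid_bundle_uri` function below is not part of MLeap; it is only a hypothetical illustration of the expected URI shape:

```python
def is_valid_bundle_uri(uri):
    """Illustrative check (not part of MLeap): a zip bundle URI
    should start with the jar:file scheme and end with .zip."""
    return uri.startswith("jar:file:") and uri.endswith(".zip")

print(is_valid_bundle_uri("jar:file:/tmp/mleap-bundle.zip"))  # True
print(is_valid_bundle_uri("/tmp/mleap-bundle.zip"))           # False: missing jar:file scheme
print(is_valid_bundle_uri("jar:file:/tmp/mleap-bundle.tar"))  # False: not a .zip
```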

JSON Format

  fittedPipeline.serializeToBundle("jar:file:/tmp/pyspark.example.zip", fittedPipeline.transform(df2))

Protobuf Format

Support coming soon

Deserializing

Deserializing is just as easy as serializing. You don’t need to know the format the MLeap Bundle was serialized in beforehand; you just need to know where the bundle is.

From Zip Bundle

  deserializedPipeline = PipelineModel.deserializeFromBundle("jar:file:/tmp/pyspark.example.zip")