MLeap Scikit-Learn Integration

MLeap provides serialization of Scikit-Learn Pipelines, Feature Unions, and Transformers to Bundle.ML, in such a way that we maintain parity between Scikit-Learn and Spark transformers' functionality. There are two main use cases for MLeap-Scikit:

  1. Serialize Scikit Pipelines and execute using MLeap Runtime
  2. Serialize Scikit Pipelines and deserialize with Spark

As mentioned earlier, MLeap Runtime is a Scala-only library today, and we plan to add Python bindings in the future. Even so, the runtime is enough to execute pipelines and models without any dependency on Scikit-Learn or NumPy.

Extending Scikit with MLeap

There are a couple of important differences between how scikit-learn transformers and Spark transformers work:

  1. Spark transformers all come with name, op, inputCol, and outputCol attributes; scikit-learn transformers do not
  2. Spark transformers can operate on a vector, whereas scikit-learn transformers operate on n-dimensional arrays and matrices
  3. Spark, because it is written in Scala, makes it easy to add implicit functions and attributes; with scikit-learn it is a bit trickier and requires the use of setattr()

Because of these additional complexities, there are a few paradigms we have to follow when extending scikit-learn transformers with MLeap. First, we have to initialize each transformer to include the following (a short sketch follows the list):

  • Op: Unique op name - this is used as a link to Spark-based transformers (i.e. a Standard Scaler in scikit is the same as in Spark, so we have an op called standard_scaler to represent it)
  • Name: A unique name for each transformer. For example, if you have multiple Standard Scaler objects, each needs to be assigned a unique name
  • Input Column: Strictly for serialization, we set what the input column is
  • Output Column: Strictly for serialization, we set what the output column is
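
To make the setattr() point concrete, here is a minimal sketch of how a method can be grafted onto an existing scikit-learn transformer class to carry these four attributes. This is an illustration only, not MLeap's actual implementation, and the helper name mlinit_sketch is hypothetical (deliberately different from mlinit() so it cannot shadow MLeap's real method, which is shown in the next section):

  # Illustrative sketch only -- not MLeap's actual implementation
  import uuid
  from sklearn.preprocessing import StandardScaler

  def mlinit_sketch(self, input_features, output_features):
      self.op = 'standard_scaler'                        # link to the equivalent Spark op
      self.name = '{}_{}'.format(self.op, uuid.uuid4())  # unique per transformer instance
      self.input_features = input_features               # strictly for serialization
      self.output_features = output_features             # strictly for serialization
      return self

  # Attach the method to the class at runtime, as MLeap does for its transformers
  setattr(StandardScaler, 'mlinit_sketch', mlinit_sketch)

  scaler = StandardScaler().mlinit_sketch('unscaled_features', 'scaled_features')
  print(scaler.op, scaler.name)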

Scikit Transformer and Pipeline with MLeap

Let's first initialize all of the required libraries:

  # Initialize MLeap libraries before Scikit/Pandas
  import mleap.sklearn.preprocessing.data
  import mleap.sklearn.pipeline
  from mleap.sklearn.preprocessing.data import FeatureExtractor

  # Import Scikit Transformer(s)
  import pandas as pd
  import numpy as np
  from sklearn.preprocessing import StandardScaler
  from sklearn.pipeline import Pipeline

Then let's create a test DataFrame in Pandas:

  # Create a pandas DataFrame
  df = pd.DataFrame(np.random.randn(10, 5), columns=['a', 'b', 'c', 'd', 'e'])

Let's define two transformers: a FeatureExtractor, which will extract only the features we want to scale, and a StandardScaler, which will perform the standard normal scaling operation.

  # Initialize a FeatureExtractor, which subselects only the features we want
  # to run the Standard Scaler against
  input_features = ['a', 'c', 'd']
  output_vector_name = 'unscaled_continuous_features'  # Used only for serialization purposes
  output_features = ["{}_scaled".format(x) for x in input_features]

  feature_extractor_tf = FeatureExtractor(input_scalars=input_features,
                                          output_vector=output_vector_name,
                                          output_vector_items=output_features)

  # Define the Standard Scaler as we normally would
  standard_scaler_tf = StandardScaler(with_mean=True,
                                      with_std=True)

  # Execute mlinit() to add the required attributes to the transformer object;
  # Op and Name will be initialized automatically
  standard_scaler_tf.mlinit(prior_tf=feature_extractor_tf,
                            output_features='scaled_continuous_features')

Now that we have our transformers defined, let's assemble them into a pipeline and execute it on our DataFrame:

  # Now let's create a small pipeline using the Feature Extractor and the Standard Scaler
  standard_scaler_pipeline = Pipeline([(feature_extractor_tf.name, feature_extractor_tf),
                                       (standard_scaler_tf.name, standard_scaler_tf)])
  standard_scaler_pipeline.mlinit()

  # Now let's run it on our test DataFrame
  standard_scaler_pipeline.fit_transform(df)

  # Printed output
  array([[ 0.2070446 ,  0.30612846, -0.91620529],
         [ 0.81463009, -0.26668287,  1.95663995],
         [-0.94079041, -0.18882131, -0.0462197 ],
         [-0.43931405,  0.13214763, -0.10700743],
         [ 0.43992551, -0.2985418 , -0.89093752],
         [-0.15391539, -2.20828471,  0.5361159 ],
         [-1.07689244,  1.61019861,  1.42868885],
         [ 0.87874789,  1.43146482, -0.44362038],
         [-1.60105094, -0.40130005, -0.10754886],
         [ 1.87161513, -0.11630878, -1.40990552]])

Combining Transformers

We just demonstrated how to apply a transformer to a set of features, but the output of that operation is just an n-dimensional array that we would have to join back to our original data if we wanted to use it in, say, a regression model. Let's show how we can combine data from multiple transformers using Feature Unions.

First, go ahead and create another transformer, a MinMaxScaler, on the remaining two features of the DataFrame:

  from sklearn.preprocessing import MinMaxScaler

  input_features_min_max = ['b', 'e']
  output_vector_name_min_max = 'unscaled_continuous_features_min_max'  # Used only for serialization purposes
  output_features_min_max = ["{}_min_maxscaled".format(x) for x in input_features_min_max]

  feature_extractor_min_max_tf = FeatureExtractor(input_scalars=input_features_min_max,
                                                  output_vector=output_vector_name_min_max,
                                                  output_vector_items=output_features_min_max)

  # Define the MinMaxScaler as we normally would
  min_maxscaler_tf = MinMaxScaler()

  # Execute mlinit() to add the required attributes to the transformer object;
  # Op and Name will be initialized automatically
  min_maxscaler_tf.mlinit(prior_tf=feature_extractor_min_max_tf,
                          output_features='min_max_scaled_continuous_features')

  # Assemble our MinMaxScaler Pipeline
  min_max_scaler_pipeline = Pipeline([(feature_extractor_min_max_tf.name, feature_extractor_min_max_tf),
                                      (min_maxscaler_tf.name, min_maxscaler_tf)])
  min_max_scaler_pipeline.mlinit()

  # Now let's run it on our test DataFrame
  min_max_scaler_pipeline.fit_transform(df)

  # Printed output
  array([[ 0.58433367,  0.72234095],
         [ 0.21145259,  0.72993807],
         [ 0.52661493,  0.59771784],
         [ 0.29403088,  0.19431993],
         [ 0.48838789,  1.        ],
         [ 1.        ,  0.46456522],
         [ 0.36402459,  0.43669119],
         [ 0.        ,  0.74182958],
         [ 0.60312285,  0.        ],
         [ 0.33707035,  0.39792128]])

Finally, let's combine the two pipelines using a Feature Union. Note that you do not have to run the fit or fit_transform method on the pipelines before assembling the Feature Union.

  # Import MLeap extension to Feature Unions
  import mleap.sklearn.feature_union
  # Import Feature Union
  from sklearn.pipeline import FeatureUnion

  feature_union = FeatureUnion([
      (standard_scaler_pipeline.name, standard_scaler_pipeline),
      (min_max_scaler_pipeline.name, min_max_scaler_pipeline)
  ])
  feature_union.mlinit()

  # Create a pipeline out of the Feature Union
  feature_union_pipeline = Pipeline([(feature_union.name, feature_union)])
  feature_union_pipeline.mlinit()

  # Execute it on our DataFrame
  feature_union_pipeline.fit_transform(df)

  # Printed output
  array([[ 0.2070446 ,  0.30612846, -0.91620529,  0.58433367,  0.72234095],
         [ 0.81463009, -0.26668287,  1.95663995,  0.21145259,  0.72993807],
         [-0.94079041, -0.18882131, -0.0462197 ,  0.52661493,  0.59771784],
         [-0.43931405,  0.13214763, -0.10700743,  0.29403088,  0.19431993],
         [ 0.43992551, -0.2985418 , -0.89093752,  0.48838789,  1.        ],
         [-0.15391539, -2.20828471,  0.5361159 ,  1.        ,  0.46456522],
         [-1.07689244,  1.61019861,  1.42868885,  0.36402459,  0.43669119],
         [ 0.87874789,  1.43146482, -0.44362038,  0.        ,  0.74182958],
         [-1.60105094, -0.40130005, -0.10754886,  0.60312285,  0.        ],
         [ 1.87161513, -0.11630878, -1.40990552,  0.33707035,  0.39792128]])

Serialize to Zip File

In order to serialize to a zip file, make sure the URI begins with jar:file and ends with .zip. For example: jar:file:/tmp/mleap-bundle.zip.

Note that you do have to fit your pipeline before serializing.
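
For example, one way to produce a zip bundle from Python is to serialize to a directory (using serialize_to_bundle, shown in the next section) and then zip its contents yourself. This is a sketch, not a dedicated MLeap API: it assumes the call writes its output to a /tmp/mleap-bundle directory (check your MLeap version's output layout), and uses the standard-library shutil module as a workaround:

  import shutil

  # The pipeline must be fit before serializing
  # (fit_transform above has already done this)
  feature_union_pipeline.fit(df)

  # Serialize to a directory, then zip its contents so the bundle can be
  # opened from Scala/Spark via jar:file:/tmp/mleap-bundle.zip
  # (assumes output lands in /tmp/mleap-bundle)
  feature_union_pipeline.serialize_to_bundle('/tmp', 'mleap-bundle', init=True)
  shutil.make_archive('/tmp/mleap-bundle', 'zip', '/tmp/mleap-bundle')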

JSON Format

Setting init=True tells the serializer that we are creating a bundle instead of just serializing the transformer.

  feature_union_pipeline.serialize_to_bundle('/tmp', 'mleap-bundle', init=True)

Protobuf Format

Coming Soon

Deserializing

Coming Soon

Demos

Complete demos are available on GitHub that demonstrate full usage of Transformers, Pipelines, Feature Unions, and serialization.