Custom Transformers
Every transformer in MLeap can be considered a custom transformer. The only difference between the transformer and bundle integration code you write and what we write is that ours gets included in the release jars. We welcome transformer additions to the MLeap project, so please make a PR.

There are plenty of examples in the MLeap source code of how to write your own transformers and make them serializable to/from Spark and MLeap.
Let’s go through a simple example of writing a custom transformer that maps an input string to a double, using a `Map[String, Double]` to store the data needed for the transformation. We will call our custom transformer `StringMap`. This transformer is included in the MLeap source code, and you can view it here: StringMapModel.scala.
Overview
A brief overview of the steps involved:
- Build our core model logic that can be shared between Spark and MLeap
- Build the MLeap transformer
- Build the Spark transformer
- Build bundle serialization for MLeap
- Build bundle serialization for Spark
- Configure the MLeap Bundle registries with the MLeap and Spark custom transformers
Core Model
The core model is the logic needed to transform the input data. It has no dependencies on Spark or MLeap. In the case of our `StringMapModel`, it is a class that knows how to map one string to a double. Let’s see what this looks like in Scala.
```scala
import ml.combust.mleap.core.Model
import ml.combust.mleap.core.types.{ScalarType, StructType}

case class StringMapModel(labels: Map[String, Double]) extends Model {
  def apply(label: String): Double = labels(label)

  override def inputSchema: StructType = StructType("input" -> ScalarType.String).get
  override def outputSchema: StructType = StructType("output" -> ScalarType.Double).get
}
```
The case class has a set of labels that it knows how to map to doubles. This is very similar to a `StringIndexerModel`, except that the values of the strings are arbitrary and not in sequence.
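To make the core model concrete, here is a minimal sketch of how it behaves on its own (the label map is made up for illustration):

```scala
// hypothetical label mapping, purely for illustration
val model = StringMapModel(Map("cat" -> 1.0, "dog" -> 2.0))

model("cat") // returns 1.0
model("dog") // returns 2.0
```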
MLeap Transformer
The MLeap transformer is the piece of code that knows how to execute your core model against a leap frame. All MLeap transformers inherit from a base class: `ml.combust.mleap.runtime.transformer.Transformer`. For our example `StringMap` transformer, we can use a utility base class for simple input/output transformers called `ml.combust.mleap.runtime.transformer.SimpleTransformer`. This base class takes care of a small amount of boilerplate for any transformer that has exactly one input and one output column.
Here is the Scala code for the MLeap transformer.
```scala
import ml.combust.mleap.core.feature.StringMapModel
import ml.combust.mleap.core.types.NodeShape
import ml.combust.mleap.runtime.function.UserDefinedFunction
import ml.combust.mleap.runtime.transformer.{SimpleTransformer, Transformer}

case class StringMap(override val uid: String = Transformer.uniqueName("string_map"),
                     override val shape: NodeShape,
                     override val model: StringMapModel) extends SimpleTransformer {
  override val exec: UserDefinedFunction = (label: String) => model(label)
}
```
Note the `UserDefinedFunction` exec. This is an MLeap `UserDefinedFunction` that gets created from a Scala function using reflection. User-defined functions are the primary way that MLeap allows us to transform LeapFrames. The `NodeShape` defines the inputCol and outputCol for this transformer.
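Here is a minimal sketch of constructing the MLeap transformer and running it against a leap frame. The column names and label map are made up for illustration, and we assume `NodeShape`'s `withStandardInput`/`withStandardOutput` builders and the `DefaultLeapFrame` API from the MLeap runtime:

```scala
import ml.combust.mleap.core.feature.StringMapModel
import ml.combust.mleap.core.types.{NodeShape, ScalarType, StructField, StructType}
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}

// hypothetical shape and label map, just for illustration
val stringMap = StringMap(
  shape = NodeShape().withStandardInput("animal").withStandardOutput("animal_value"),
  model = StringMapModel(Map("cat" -> 1.0, "dog" -> 2.0)))

// a one-row leap frame with a single string column
val schema = StructType(StructField("animal", ScalarType.String)).get
val frame = DefaultLeapFrame(schema, Seq(Row("dog")))

// transform returns a Try of a new leap frame with "animal_value" appended
val transformed = stringMap.transform(frame).get
```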
Spark Transformer
The Spark transformer knows how to execute the core model against a Spark DataFrame. All Spark transformers inherit from `org.apache.spark.ml.Transformer`. If you have ever written a custom Spark transformer before, this process will be very familiar.
Here is what a custom Spark transformer looks like in Scala.
```scala
import ml.combust.mleap.core.feature.StringMapModel
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types._

class StringMap(override val uid: String,
                val model: StringMapModel) extends Transformer
  with HasInputCol
  with HasOutputCol {
  def this(model: StringMapModel) = this(uid = Identifiable.randomUID("string_map"), model = model)

  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)

  @org.apache.spark.annotation.Since("2.0.0")
  override def transform(dataset: Dataset[_]): DataFrame = {
    val stringMapUdf = udf {
      (label: String) => model(label)
    }

    dataset.withColumn($(outputCol), stringMapUdf(dataset($(inputCol))))
  }

  override def copy(extra: ParamMap): Transformer = copyValues(new StringMap(uid, model), extra)

  @DeveloperApi
  override def transformSchema(schema: StructType): StructType = {
    require(schema($(inputCol)).dataType.isInstanceOf[StringType],
      s"Input column must be of type StringType but got ${schema($(inputCol)).dataType}")
    val inputFields = schema.fields
    require(!inputFields.exists(_.name == $(outputCol)),
      s"Output column ${$(outputCol)} already exists.")

    StructType(schema.fields :+ StructField($(outputCol), DoubleType))
  }
}
```
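As a quick usage sketch (the SparkSession, DataFrame, and column names are assumptions for illustration):

```scala
// assumes an active SparkSession `spark`
import spark.implicits._

val df = Seq("cat", "dog").toDF("animal")

val stringMap = new StringMap(StringMapModel(Map("cat" -> 1.0, "dog" -> 2.0))).
  setInputCol("animal").
  setOutputCol("animal_value")

// appends an "animal_value" double column computed by the core model
stringMap.transform(df).show()
```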
MLeap Serialization
We need to define how to serialize/deserialize our model to/from an MLeap Bundle. In order to do this, we implement `ml.combust.mleap.bundle.ops.MleapOp` and `ml.combust.bundle.op.OpModel` for our MLeap transformer and core model, respectively. These type classes are all we need to define bundle serialization.
Here is what the serialization code looks like for our MLeap transformer in Scala.
NOTE: The code below looks long, but most of it is auto-generated by the IDE.
```scala
import ml.combust.bundle.BundleContext
import ml.combust.bundle.dsl._
import ml.combust.bundle.op.OpModel
import ml.combust.mleap.bundle.ops.MleapOp
import ml.combust.mleap.core.feature.StringMapModel
import ml.combust.mleap.runtime.MleapContext
import ml.combust.mleap.runtime.transformer.feature.StringMap

class StringMapOp extends MleapOp[StringMap, StringMapModel] {
  override val Model: OpModel[MleapContext, StringMapModel] = new OpModel[MleapContext, StringMapModel] {
    // the class of the model is needed for when we go to serialize JVM objects
    override val klazz: Class[StringMapModel] = classOf[StringMapModel]

    // a unique name for our op: "string_map"
    override def opName: String = Bundle.BuiltinOps.feature.string_map

    override def store(model: Model, obj: StringMapModel)
                      (implicit context: BundleContext[MleapContext]): Model = {
      // unzip our label map so we can store the labels and the values
      // as two parallel arrays; we do this because MLeap Bundles do
      // not support storing data as a map
      val (labels, values) = obj.labels.toSeq.unzip

      // add the labels and values to the Bundle model that
      // will be serialized to our MLeap bundle
      model.withValue("labels", Value.stringList(labels)).
        withValue("values", Value.doubleList(values))
    }

    override def load(model: Model)
                     (implicit context: BundleContext[MleapContext]): StringMapModel = {
      // retrieve our list of labels
      val labels = model.value("labels").getStringList

      // retrieve our list of values
      val values = model.value("values").getDoubleList

      // reconstruct the model using the parallel labels and values
      StringMapModel(labels.zip(values).toMap)
    }
  }

  // the core model that is used by the transformer
  override def model(node: StringMap): StringMapModel = node.model
}
```
We will need to register `StringMapOp` with the MLeap bundle registry at runtime to let MLeap know about it. We go over the registries later in this article.
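Once the op is registered, the standard bundle APIs can serialize our MLeap transformer. Here is a hedged sketch, assuming the `stringMap` transformer from earlier, a made-up file path, and the scala-arm `managed` wrapper:

```scala
import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._

// write the MLeap transformer out as an MLeap Bundle zip
for(bundleFile <- managed(BundleFile("jar:file:/tmp/string-map.zip"))) {
  stringMap.writeBundle.save(bundleFile).get
}
```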
Spark Serialization
We also need to define how to serialize/deserialize the custom Spark transformer to/from MLeap. This is very similar to the process we took for the MLeap transformer above. We will again be implementing `ml.combust.bundle.op.OpNode` and `ml.combust.bundle.op.OpModel`.
Here is what the serialization code looks like for StringMap in Scala.
```scala
import ml.combust.bundle.BundleContext
import ml.combust.bundle.dsl._
import ml.combust.bundle.op.{OpModel, OpNode}
import ml.combust.mleap.core.feature.StringMapModel
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.ml.mleap.feature.StringMap

class StringMapOp extends OpNode[SparkBundleContext, StringMap, StringMapModel] {
  override val Model: OpModel[SparkBundleContext, StringMapModel] = new OpModel[SparkBundleContext, StringMapModel] {
    // the class of the model is needed for when we go to serialize JVM objects
    override val klazz: Class[StringMapModel] = classOf[StringMapModel]

    // a unique name for our op: "string_map"
    // this should be the same as for the MLeap transformer serialization
    override def opName: String = Bundle.BuiltinOps.feature.string_map

    override def store(model: Model, obj: StringMapModel)
                      (implicit context: BundleContext[SparkBundleContext]): Model = {
      // unzip our label map so we can store the labels and the values
      // as two parallel arrays; we do this because MLeap Bundles do
      // not support storing data as a map
      val (labels, values) = obj.labels.toSeq.unzip

      // add the labels and values to the Bundle model that
      // will be serialized to our MLeap bundle
      model.withValue("labels", Value.stringList(labels)).
        withValue("values", Value.doubleList(values))
    }

    override def load(model: Model)
                     (implicit context: BundleContext[SparkBundleContext]): StringMapModel = {
      // retrieve our list of labels
      val labels = model.value("labels").getStringList

      // retrieve our list of values
      val values = model.value("values").getDoubleList

      // reconstruct the model using the parallel labels and values
      StringMapModel(labels.zip(values).toMap)
    }
  }

  override val klazz: Class[StringMap] = classOf[StringMap]

  override def name(node: StringMap): String = node.uid

  override def model(node: StringMap): StringMapModel = node.model

  override def load(node: Node, model: StringMapModel)
                   (implicit context: BundleContext[SparkBundleContext]): StringMap = {
    new StringMap(uid = node.name, model = model).
      setInputCol(node.shape.standardInput.name).
      setOutputCol(node.shape.standardOutput.name)
  }

  override def shape(node: StringMap)(implicit context: BundleContext[SparkBundleContext]): NodeShape =
    NodeShape().withStandardIO(node.getInputCol, node.getOutputCol)
}
```
We will need to register this op with the MLeap registry as well, so that MLeap knows how to serialize this Spark transformer.
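For context, here is a hedged sketch of how the registered Spark op gets exercised when exporting a fitted pipeline; `pipelineModel`, `df`, and the path are assumptions:

```scala
import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.bundle.SparkBundleContext
import resource._

// passing a transformed dataset lets MLeap capture the output schema
val sbc = SparkBundleContext().withDataset(pipelineModel.transform(df))

for(bundleFile <- managed(BundleFile("jar:file:/tmp/spark-string-map.zip"))) {
  pipelineModel.writeBundle.save(bundleFile)(sbc).get
}
```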
MLeap Bundle Registries
A registry contains all of the custom transformers and types for a given execution engine. In this case, we support the MLeap and Spark execution engines for the `StringMap` transformer, so we will have to configure both the Spark and MLeap registries to know how to serialize/deserialize their respective transformers.

MLeap uses Typesafe Config to configure registries. By default, MLeap ships with registries configured for the Spark runtime and the MLeap runtime: the MLeap runtime uses the configuration at `ml.combust.mleap.registry.default`, and Spark uses the configuration at `ml.combust.mleap.spark.registry.default`.
MLeap Registry
In order to add the custom transformer to the default MLeap registry, we will add a `reference.conf` file to our own project that looks like this:
```
// make a list of all your custom transformers;
// the list contains the fully-qualified class names of the
// op implementations for your transformers
my.domain.mleap.ops = ["my.domain.mleap.ops.StringMapOp"]

// add the custom transformers we have defined to the default MLeap registry
ml.combust.mleap.registry.default.ops += "my.domain.mleap.ops"
```
Spark Registry
In order to add the custom transformer to the default Spark registry, we will add a `reference.conf` file to our own project that looks like this:
```
// make a list of all your custom transformers;
// the list contains the fully-qualified class names of the
// OpNode implementations for your transformers
my.domain.mleap.spark.ops = ["my.domain.spark.ops.StringMapOp"]

// add the custom transformer ops we have defined to the default Spark registries
ml.combust.mleap.spark.registry.v20.ops += "my.domain.mleap.spark.ops"
ml.combust.mleap.spark.registry.v21.ops += "my.domain.mleap.spark.ops"
ml.combust.mleap.spark.registry.v22.ops += "my.domain.mleap.spark.ops"
```
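With both registries configured, a bundle exported from Spark can be loaded back into the MLeap runtime, which resolves the "string_map" op name to our `StringMapOp` and reconstructs the transformer. A hedged sketch, reusing the made-up path from above:

```scala
import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._

// the MLeap registry resolves "string_map" to our custom op during load
val mleapPipeline = (for(bundleFile <- managed(BundleFile("jar:file:/tmp/spark-string-map.zip"))) yield {
  bundleFile.loadMleapBundle().get.root
}).tried.get
```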