基础 Demo
基础 Demo 会引导你使用 Spark 来构建 ML Pipeline,导出 Pipeline 为 MLeap Bundle,以及随后在 MLeap Runtime 中使用它来转换 Data Frame。
构建和导出 MLeap Bundle
本章节我们会通过编码来创建一个简单的 Spark ML Pipeline,然后将其导出成 MLeap Bundle。我们的 Pipeline 非常简单,它在一个离散特征上进行字符串索引,然后使用一个二分器将结果转为 0 或 1。这个 Pipeline 没有实际的用途,但能够展示出从 Spark ML Pipeline 构建得到 MLeap Bundle 是多么容易。
import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.ml.feature.{Binarizer, StringIndexer}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import resource._
val datasetName = "./mleap-docs/assets/spark-demo.csv"
val dataframe: DataFrame = spark.sqlContext.read.format("csv")
.option("header", true)
.load(datasetName)
.withColumn("test_double", col("test_double").cast("double"))
// User out-of-the-box Spark transformers like you normally would
val stringIndexer = new StringIndexer().
setInputCol("test_string").
setOutputCol("test_index")
val binarizer = new Binarizer().
setThreshold(0.5).
setInputCol("test_double").
setOutputCol("test_bin")
val pipelineEstimator = new Pipeline()
.setStages(Array(stringIndexer, binarizer))
val pipeline = pipelineEstimator.fit(dataframe)
// then serialize pipeline
val sbc = SparkBundleContext().withDataset(pipeline.transform(dataframe))
for(bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) {
pipeline.writeBundle.save(bf)(sbc).get
}
训练数据集可以从这里获取。
注意:由于 GitBook 不允许用户直接点击链接下载,请右键另存为。
导入 MLeap Bundle
本节中我们会加载上一节生成的 MLeap Bundle 到 MLeap Runtime 中。我们将会使用 MLeap Runtime 来转换一帧 Leap Frame。
import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._
// load the Spark pipeline we saved in the previous section
val bundle = (for(bundleFile <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) yield {
bundleFile.loadMleapBundle().get
}).opt.get
// create a simple LeapFrame to transform
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
import ml.combust.mleap.core.types._
// MLeap makes extensive use of monadic types like Try
val schema = StructType(StructField("test_string", ScalarType.String),
StructField("test_double", ScalarType.Double)).get
val data = Seq(Row("hello", 0.6), Row("MLeap", 0.2))
val frame = DefaultLeapFrame(schema, data)
// transform the dataframe using our pipeline
val mleapPipeline = bundle.root
val frame2 = mleapPipeline.transform(frame).get
val data2 = frame2.dataset
// get data from the transformed rows and make some assertions
assert(data2(0).getDouble(2) == 1.0) // string indexer output
assert(data2(0).getDouble(3) == 1.0) // binarizer output
// the second row
assert(data2(1).getDouble(2) == 2.0)
assert(data2(1).getDouble(3) == 0.0)
搞定!这个例子非常简单。你很可能不会像我们那样手动去构建 Spark ML Pipeline,而是使用 Estimator 和 Pipeline 基于你的数据来训练得到有用的模型。更高级的例子,可以参见我们的 MNIST Demo 章节。