基础 Demo

基础 Demo 会引导你使用 Spark 来构建 ML Pipeline,导出 Pipeline 为 MLeap Bundle,以及随后在 MLeap Runtime 中使用它来转换 Data Frame。

构建和导出 MLeap Bundle

本章节我们会通过编码来创建一个简单的 Spark ML Pipeline,然后将其导出成 MLeap Bundle。我们的 Pipeline 非常简单,它在一个离散特征上进行字符串索引,然后使用一个二分器将结果转为 0 或 1。这个 Pipeline 没有实际的用途,但能够展示出从 Spark ML Pipeline 构建得到 MLeap Bundle 是多么容易。

  1. import ml.combust.bundle.BundleFile
  2. import ml.combust.mleap.spark.SparkSupport._
  3. import org.apache.spark.ml.Pipeline
  4. import org.apache.spark.ml.bundle.SparkBundleContext
  5. import org.apache.spark.ml.feature.{Binarizer, StringIndexer}
  6. import org.apache.spark.sql._
  7. import org.apache.spark.sql.functions._
  8. import resource._
  9. val datasetName = "./mleap-docs/assets/spark-demo.csv"
  10. val dataframe: DataFrame = spark.sqlContext.read.format("csv")
  11. .option("header", true)
  12. .load(datasetName)
  13. .withColumn("test_double", col("test_double").cast("double"))
  14. // User out-of-the-box Spark transformers like you normally would
  15. val stringIndexer = new StringIndexer().
  16. setInputCol("test_string").
  17. setOutputCol("test_index")
  18. val binarizer = new Binarizer().
  19. setThreshold(0.5).
  20. setInputCol("test_double").
  21. setOutputCol("test_bin")
  22. val pipelineEstimator = new Pipeline()
  23. .setStages(Array(stringIndexer, binarizer))
  24. val pipeline = pipelineEstimator.fit(dataframe)
  25. // then serialize pipeline
  26. val sbc = SparkBundleContext().withDataset(pipeline.transform(dataframe))
  27. for(bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) {
  28. pipeline.writeBundle.save(bf)(sbc).get
  29. }

训练数据集可以从这里获取。

注意:由于 GitBook 不允许用户直接点击链接下载,请右键另存为。

导入 MLeap Bundle

本节中我们会加载上一节生成的 MLeap Bundle 到 MLeap Runtime 中。我们将会使用 MLeap Runtime 来转换一帧 Leap Frame。

  1. import ml.combust.bundle.BundleFile
  2. import ml.combust.mleap.runtime.MleapSupport._
  3. import resource._
  4. // load the Spark pipeline we saved in the previous section
  5. val bundle = (for(bundleFile <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) yield {
  6. bundleFile.loadMleapBundle().get
  7. }).opt.get
  8. // create a simple LeapFrame to transform
  9. import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
  10. import ml.combust.mleap.core.types._
  11. // MLeap makes extensive use of monadic types like Try
  12. val schema = StructType(StructField("test_string", ScalarType.String),
  13. StructField("test_double", ScalarType.Double)).get
  14. val data = Seq(Row("hello", 0.6), Row("MLeap", 0.2))
  15. val frame = DefaultLeapFrame(schema, data)
  16. // transform the dataframe using our pipeline
  17. val mleapPipeline = bundle.root
  18. val frame2 = mleapPipeline.transform(frame).get
  19. val data2 = frame2.dataset
  20. // get data from the transformed rows and make some assertions
  21. assert(data2(0).getDouble(2) == 1.0) // string indexer output
  22. assert(data2(0).getDouble(3) == 1.0) // binarizer output
  23. // the second row
  24. assert(data2(1).getDouble(2) == 2.0)
  25. assert(data2(1).getDouble(3) == 0.0)

搞定!这个例子非常简单。你很可能不会像我们那样手动去构建 Spark ML Pipeline,而是使用 Estimator 和 Pipeline 基于你的数据来训练得到有用的模型。更高级的例子,可以参见我们的 MNIST Demo 章节。