Data Preparation
DataFrame Metadata
Column metadata is one of the most useful and least known features of the Spark Dataset. It is worth noting that all features described below, although not private, are part of the developer API and as such can be unstable or even removed in minor versions.
Metadata in ML pipelines
Although it is widely used by ML Pipelines to indicate variable types and levels, the whole process is usually completely transparent and at least partially hidden from the end user, so let’s look at a simple pipeline and see what happens behind the scenes.
We’ll start with a simple dataset:
val df = Seq(
(0.0, "x", 2.0),
(1.0, "y", 3.0),
(2.0, "x", -1.0)
).toDF("label", "x1", "x2")
and the following pipeline:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
val stages = Array(
new StringIndexer().setInputCol("x1").setOutputCol("x1_"),
new VectorAssembler().setInputCols(Array("x1_", "x2")).setOutputCol("features")
)
val model = new Pipeline().setStages(stages).fit(df)
Now we can extract the stages and transform the data step by step:
val dfs = model.stages.scanLeft(df)((df, stage) => stage.transform(df))
and see what is going on at each stage:
Our initial dataset has no metadata:
dfs(0).schema.map(_.metadata)
// Seq[org.apache.spark.sql.types.Metadata] = List({}, {}, {})
After transforming with StringIndexerModel we can see indexer-specific metadata:
dfs(1).schema.last.metadata
// org.apache.spark.sql.types.Metadata =
// {"ml_attr":{"vals":["x","y"],"type":"nominal","name":"x1_"}}
It is important to note that this information is stored locally as part of the column metadata, so it is worth keeping that in mind if the number of unique values is large.
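If you need to read the labels back, the same developer API can decode the metadata for you. A minimal sketch using Attribute.fromStructField:
import org.apache.spark.ml.attribute.{Attribute, NominalAttribute}
// Decode the ML attribute stored in the column metadata of the indexed column
Attribute.fromStructField(dfs(1).schema.last) match {
  case nominal: NominalAttribute => nominal.values.map(_.toList)
  case _ => None
}
// Option[List[String]] = Some(List(x, y))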
Finally, the metadata for the assembled feature vector:
dfs(2).schema.last.metadata
// org.apache.spark.sql.types.Metadata = {"ml_attr":{"attrs":{
// "numeric":[{"idx":1,"name":"x2"}],
// "nominal":[{"vals":["x","y"],"idx":0,"name":"x1_"}]
// },"num_attrs":2}}
Metadata from the upstream stages is picked up by the assembler and used to describe the vector indices.
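The per-index attributes can be inspected with AttributeGroup.fromStructField; a minimal sketch:
import org.apache.spark.ml.attribute.AttributeGroup
// Reconstruct the attribute group from the column metadata and list its attributes
val group = AttributeGroup.fromStructField(dfs(2).schema.last)
group.attributes.foreach(_.foreach(attr =>
  println(s"${attr.index.getOrElse(-1)}: ${attr.name.getOrElse("?")} (${attr.attrType.name})")))
// 0: x1_ (nominal)
// 1: x2 (numeric)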
Let’s check if metadata is actually used in practice:
import org.apache.spark.ml.classification.DecisionTreeClassifier
new DecisionTreeClassifier().setLabelCol("label").fit(dfs.last).toDebugString
// String =
// "DecisionTreeClassificationModel (uid=dtc_72c1c370aa00) of depth 2 with 5 nodes
// If (feature 0 in {0.0})
// If (feature 1 <= -1.0)
// Predict: 2.0
// Else (feature 1 > -1.0)
// Predict: 1.0
// Else (feature 0 not in {0.0})
// Predict: 0.0
// "
Note: Prior to Spark 2.0.0 the label column would require indexing.
As you can see, nominal and numeric features are recognized and used in different ways, which is exactly what we would expect.
Setting ML attributes manually
Scala
So far so good, but what if you work with data which has already been preprocessed? In cases like this Spark provides a set of utilities designed to create ML-compliant metadata. Let’s get familiar with the whole process by building metadata equivalent to the one generated by the ML pipeline we used before.
We’ll need a NominalAttribute:
import org.apache.spark.ml.attribute.NominalAttribute
val firstAttr = NominalAttribute.defaultAttr.withValues("x", "y").withName("x1_")
and a NumericAttribute:
import org.apache.spark.ml.attribute.NumericAttribute
val secondAttr = NumericAttribute.defaultAttr.withName("x2")
Numeric attributes also provide a number of methods which can be used to store basic descriptive statistics like mean, standard deviation, minimum or maximum, as in the sketch below.
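For example (a sketch; the rounded statistics of x2 are filled in by hand here):
// Hypothetical attribute carrying hand-computed statistics of the x2 column
val describedAttr = NumericAttribute.defaultAttr
  .withName("x2")
  .withMean(1.33)
  .withStd(2.08)
  .withMin(-1.0)
  .withMax(3.0)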
Finally we combine the attributes using an AttributeGroup and convert it to a Metadata object:
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup}
val featuresMetadata = new AttributeGroup("features", Array[Attribute](firstAttr, secondAttr)).toMetadata
All that is left is a quick sanity check:
featuresMetadata == dfs.last.schema.last.metadata
// Boolean = true
Generated metadata can be applied using the Column.as method:
/* Note: We use the old MLlib API only to show VectorUDT usage. */
import org.apache.spark.mllib.linalg.Vectors
val records = Seq(
(0.0, Vectors.dense(Array(0.0, 2.0))),
(1.0, Vectors.dense(Array(1.0, 3.0))),
(2.0, Vectors.dense(Array(0.0, -1.0)))
)
records.toDF("label", "features")
.withColumn("features", $"features".as("features", featuresMetadata))
or added to the schema when creating a DataFrame:
import org.apache.spark.sql.types._
import org.apache.spark.mllib.linalg.VectorUDT
val schema = StructType(Seq(
StructField("label", DoubleType, false),
StructField("features", new VectorUDT(), false, featuresMetadata)
))
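A minimal sketch of putting this schema to use, assuming the records sequence and the spark session defined above:
import org.apache.spark.sql.Row
// Build Rows from the records defined earlier and apply the schema carrying the metadata
val rows = records.map { case (label, features) => Row(label, features) }
spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
  .schema.last.metadata
// org.apache.spark.sql.types.Metadata = {"ml_attr":{"attrs":{...},"num_attrs":2}}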
Python
Unlike Scala, Python doesn’t provide any helpers and metadata is simply represented as a standard Python dict.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
df = sc.parallelize((
(0.0, "x", 2.0),
(1.0, "y", 3.0),
(2.0, "x", -1.0)
)).toDF(["label", "x1", "x2"])
model = Pipeline(stages=[
StringIndexer(inputCol="x1", outputCol="x1_"),
VectorAssembler(inputCols=["x1_", "x2"], outputCol="features"),
]).fit(df)
model.transform(df).schema[-1].metadata
## {'ml_attr': {'attrs': {'nominal': [{'idx': 0,
## 'name': 'x1_',
## 'vals': ['x', 'y']}],
## 'numeric': [{'idx': 1, 'name': 'x2'}]},
## 'num_attrs': 2}}
Before Spark 2.2, PySpark didn’t support attaching metadata to a single column. It is possible, though, to use a method similar to this one:
import json
from pyspark import SparkContext
from pyspark.sql import Column
from pyspark.sql.functions import col
def withMeta(self, alias, meta):
    sc = SparkContext._active_spark_context
    jmeta = sc._gateway.jvm.org.apache.spark.sql.types.Metadata
    return Column(getattr(self._jc, "as")(alias, jmeta.fromJson(json.dumps(meta))))
Column.withMeta = withMeta
meta = {"ml_attr": {"name": "label_with_meta",
"type": "nominal",
"vals": ["0.0", "1.0", "2.0"]}}
df_with_meta = df.withColumn("label_with_meta", col("label").withMeta("", meta))
df_with_meta.schema[-1].metadata == meta
## True
Spark 2.2 added support for setting metadata with Column.alias:
df_with_meta = df.withColumn("label_with_meta", col("label").alias("label", metadata=meta))
df_with_meta.schema[-1].metadata == meta
## True
Setting custom column metadata
Arguably the true power of metadata shows itself when it is used outside the restricted ML environment. It is possible to attach an arbitrary JSON document to each column, using it for provenance tracking, storing diagnostic information or performing different data enrichment tasks.
A Metadata object can be created from a JSON string:
import org.apache.spark.sql.types.Metadata
Metadata.fromJson("""{"foo": "bar"}""")
// org.apache.spark.sql.types.Metadata = {"foo":"bar"}
or constructed using MetadataBuilder:
import org.apache.spark.sql.types.MetadataBuilder
new MetadataBuilder().putString("foo", "bar").build
// org.apache.spark.sql.types.Metadata = {"foo":"bar"}
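MetadataBuilder can also be used to extend existing metadata instead of replacing it; a minimal sketch building on the pipeline output from before:
// Copy the existing ML attributes and add a custom entry next to them
val extended = new MetadataBuilder()
  .withMetadata(dfs(2).schema.last.metadata)
  .putString("comment", "features assembled by VectorAssembler")
  .build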
Moreover, it can be attached to Parquet files and loaded back later:
Seq((1L, "foo"), (2L, "bar"))
.toDF("id", "txt")
.withColumn("id", $"id".as("", Metadata.fromJson("""{"foo": "bar"}""")))
.write.parquet("/tmp/foo")
spark.read.parquet("/tmp/foo").schema.headOption.map(_.metadata)
// Option[org.apache.spark.sql.types.Metadata] = Some({"foo":"bar"})
Accessing Metadata Directly
Metadata can also be accessed directly using Parquet tools:
import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
val conf = spark.sparkContext.hadoopConfiguration
def getFooters(conf: Configuration, path: String) = {
val fs = FileSystem.get(conf)
val footers = ParquetFileReader.readAllFootersInParallel(conf, fs.getFileStatus(new Path(path)))
footers
}
def getFileMetadata(conf: Configuration, path: String) = {
getFooters(conf, path)
.asScala.map(_.getParquetMetadata.getFileMetaData.getKeyValueMetaData.asScala)
}
getFileMetadata(conf, "/tmp/foo").headOption
// Option[scala.collection.mutable.Map[String,String]] =
// Some(Map(org.apache.spark.sql.parquet.row.metadata ->
// {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{"foo":"bar"}}
// {"name":"txt","type":"string","nullable":true,"metadata":{}}]}))
We can also use the extracted footers to write a standalone metadata file when needed:
import org.apache.parquet.hadoop.ParquetFileWriter
def createMetadata(conf: Configuration, path: String) = {
val footers = getFooters(conf, path)
ParquetFileWriter.writeMetadataFile(conf, new Path(path), footers)
}
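For example, a quick check against the directory written above (the summary file is named _metadata):
createMetadata(conf, "/tmp/foo")
// Verify that the summary file was written next to the data
FileSystem.get(conf).exists(new Path("/tmp/foo/_metadata"))
// Boolean = true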