在管道的引擎盖下看
介绍
将不同的变换器和预测器链接在一起的能力是任何机器学习(ML)库的重要特征。在FlinkML中,我们希望提供直观的API,同时利用Scala语言的函数来提供管道的类型安全实现。我们希望实现的是一个易于使用的API,它可以保护用户免受飞行前(作业启动前)时间的类型错误的影响,从而消除了长时间运行的作业被提交到集群只是为了看到它们失败的情况通常在ML管道中发生的一系列数据转换中的一些错误。
在本指南中,我们将描述我们在FlinkML中实现可链接变换器和预测器时所做的选择,并提供有关开发人员如何创建自己的算法以利用这些函数的指南。
什么和为什么
那么“ML管道”是什么意思呢?ML上下文中的流水线可以被认为是具有一些数据作为输入的 算子操作链,对该数据执行许多转换,然后输出转换的数据,或者用作预测函数的输入(特征) ,例如学习模型,或者只是输出转换后的数据本身,用于其他一些任务。最终学习者当然也可以成为管道的一部分。ML管道通常可以是复杂的 算子操作集(深入解释),并且可能成为端到端学习系统的错误源。
然后,ML管道的目的是创建一个框架,可用于管理这些 算子操作链引入的复杂性。管道应该使开发人员能够轻松定义可应用于训练数据的链式转换,以便创建将用于训练学习模型的最终特征,然后执行相同的转换集,就像无标记一样容易(测试数据。管道还应简化这些 算子操作链上的交叉验证和模型选择。
最后,通过确保管道链中的连续链接“合在一起”,我们还避免了代价高昂的类型错误。由于管道中的每个步骤都可能是计算量很大的 算子操作,因此我们希望避免运行流水线作业,除非我们确定管道中的所有输入/输出对“都适合”。
FlinkML中的管道
可以在ml.pipeline
包中找到FlinkML中管道的构建块。FlinkML如下启发的API sklearn这意味着我们有Estimator
,Transformer
和Predictor
接口。对于深入看看sklearn API的设计,有兴趣的读者可以参考这个文件。简而言之,它Estimator
是继承Transformer
和Predictor
继承的基类。Estimator
定义fit
方法,并Transformer
定义transform
方法并Predictor
定义predict
方法。
执行模型的实际训练的fit
方法Estimator
,例如在线性回归任务中找到正确的权重,或者在特征缩放器中找到数据的均值和标准偏差。正如命名所显示的那样,实现的类Transformer
是变换 算子操作,如缩放输入和Predictor
实现是学习算法,如多元线性回归。可以通过将多个变换器链接在一起来创建管道,并且管道中的最终链接可以是预测变量或另一个变换器。以Predictor结尾的管道无法再进行链接。下面是如何形成管道的示例:
// Training data
val input: DataSet[LabeledVector] = ...
// Test data
val unlabeled: DataSet[Vector] = ...
val scaler = StandardScaler()
val polyFeatures = PolynomialFeatures()
val mlr = MultipleLinearRegression()
// Construct the pipeline
val pipeline = scaler
.chainTransformer(polyFeatures)
.chainPredictor(mlr)
// Train the pipeline (scaler and multiple linear regression)
pipeline.fit(input)
// Calculate predictions for the testing data
val predictions: DataSet[LabeledVector] = pipeline.predict(unlabeled)
正如我们所提到的,FlinkML管道是类型安全的。如果我们试图将类型输出的变压器链接A
到另一个具有类型输入的变压器,B
那么如果A
!= ,我们将在飞行前时间得到错误B
。FlinkML通过使用Scala的含义实现了这种类型安全性。
Scala隐式
如果您不熟悉Scala的含义,我们可以推荐Martin Odersky的“Scala编程” 摘录。简而言之,隐式转换通过提供从一种类型到另一种类型的转换来允许Scala中的ad-hoc多态,并且隐式值为编译器提供可以通过隐式参数提供给函数调用的默认值。隐式转换和隐式参数的组合使我们能够以类型安全的方式链接转换和预测 算子操作。
算子操作
正如我们所提到的,trait(抽象类)Estimator
定义了一个fit
方法。该方法有两个参数列表(即一个curried函数)。第一个参数列表采用输入(训练)DataSet
和估计器的参数。第二个参数列表采用一个implicit
类型的参数FitOperation
。FitOperation
是一个定义fit
方法的类,这是应该实现训练具体估计器的实际逻辑的地方。该fit
方法Estimator
基本上是围绕拟合方法的打包FitOperation
。所述predict
的方法Predictor
和所述transform
的方法Transform
被设计成在类似的方式,与相应的 算子操作类。
在这些方法中, 算子操作对象作为隐式参数提供。Scala将在类型的伴随对象中查找implicits,因此实现这些接口的类应该将这些对象作为隐藏对象提供在伴随对象中。
作为一个例子,我们可以看一下这个StandardScaler
课程。StandardScaler
延伸Transformer
,所以它可以访问它fit
和transform
函数。这两个函数分别为和伴随对象的方法和方法提供对象FitOperation
和TransformOperation
隐式参数,通过和:fit
transform
StandardScaler
transformVectors
fitVectorStandardScaler
class StandardScaler extends Transformer[StandardScaler] {
...
}
object StandardScaler {
...
implicit def fitVectorStandardScaler[T <: Vector] = new FitOperation[StandardScaler, T] {
override def fit(instance: StandardScaler, fitParameters: ParameterMap, input: DataSet[T])
: Unit = {
...
}
implicit def transformVectors[T <: Vector: VectorConverter: TypeInformation: ClassTag] = {
new TransformOperation[StandardScaler, T, T] {
override def transform(
instance: StandardScaler,
transformParameters: ParameterMap,
input: DataSet[T])
: DataSet[T] = {
...
}
}
请注意,StandardScaler
并不能覆盖fit
的方法Estimator
或transform
方法Transformer
。更确切地说,它的的实现FitOperation
和TransformOperation
覆盖其各自的fit
和transform
方法,其然后由被叫fit
和transform
方法Estimator
和Transformer
。类似地,实现的类Predictor
应该PredictOperation
在其伴随对象中定义隐式对象。
类型和类型安全
除了fit
与transform
运营,我们上面列出的StandardScaler
还提供fit
和transform
类型的输入 算子操作LabeledVector
。这允许我们将算法用于标记或未标记的输入,这将自动发生,具体取决于我们为拟合和变换 算子操作提供的输入类型。编译器选择正确的隐式 算子操作,具体取决于输入类型。
如果我们尝试使用不受支持的类型调用fit
或transform
方法,我们将在启动作业之前收到运行时错误。虽然在编译时也可以捕获这些类型的错误,但是我们能够为用户提供的错误消息的信息量会少得多,这就是我们选择抛出运行时异常的原因。
链接
链接是通过调用chainTransformer
或实现chainPredictor
的类的对象来实现的Transformer
。这些方法分别返回一个ChainedTransformer
或一个ChainedPredictor
对象。正如我们所提到的,ChainedTransformer
对象可以进一步链接,而ChainedPredictor
对象则不能。这些类负责为一对连续的变压器或变压器和预测器应用拟合,变换和预测 算子操作。他们还充当递归如果链的长度是大于二,因为每个ChainedTransformer
定义了一个transform
和fit
算子操作可以与多个变压器或一个预测器被进一步链接。
值得注意的是,开发人员和用户在实现算法时无需担心链接,所有这些都由FlinkML自动处理。
如何实现管道 算子
为了支持FlinkML的流水线 算子操作,算法必须遵循某种设计模式,我们将在本节中对其进行描述。假设我们想要实现一个改变数据均值的管道 算子。由于居中数据是许多分析管道中常见的预处理步骤,因此我们将其实现为Transformer
。因此,我们首先创建一个MeanTransformer
继承自的类Transformer
class MeanTransformer extends Transformer[MeanTransformer] {}
由于我们希望能够配置结果数据的均值,因此我们必须添加配置参数。
class MeanTransformer extends Transformer[MeanTransformer] {
def setMean(mean: Double): this.type = {
parameters.add(MeanTransformer.Mean, mean)
this
}
}
object MeanTransformer {
case object Mean extends Parameter[Double] {
override val defaultValue: Option[Double] = Some(0.0)
}
def apply(): MeanTransformer = new MeanTransformer
}
参数在变换器类的伴随对象中定义并扩展Parameter
该类。由于参数实例应该作为参数映射的不可变键,因此它们应该实现为case objects
。如果此组件的用户未设置其他值,则将使用默认值。如果没有指定默认值,意味着defaultValue = None
该算法必须相应地处理这种情况。
我们现在可以实例化一个MeanTransformer
对象并设置转换数据的平均值。但我们仍然必须实现转型的运作方式。工作流程可分为两个阶段。在第一阶段,变压器学习给定训练数据的平均值。然后可以在第二阶段中使用该知识来相对于配置的结果平均值变换所提供的数据。
平均值的学习可以在fit
我们Transformer
继承的 算子操作中实现Estimator
。在该fit
算子操作中,针对给定的训练数据训练管道组件。然而,该算法不是通过覆盖该fit
方法而是通过提供对应FitOperation
于正确类型的实现来实现的。看一下该fit
方法的定义Estimator
,即父类Transformer
,揭示了为什么会出现这种情况。
trait Estimator[Self] extends WithParameters with Serializable {
that: Self =>
def fit[Training](
training: DataSet[Training],
fitParameters: ParameterMap = ParameterMap.Empty)
(implicit fitOperation: FitOperation[Self, Training]): Unit = {
FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment)
fitOperation.fit(this, fitParameters, training)
}
}
我们看到该fit
方法是使用类型的输入数据集Training
,可选参数列表和第二个参数列表调用的,其中隐式参数类型FitOperation
。在函数体内,首先注册一些机器学习类型,然后调用参数的fit
方法FitOperation
。实例将自身,参数图和训练数据集作为方法的参数。因此,所有的程序逻辑都发生在FitOperation
。
将FitOperation
有两个类型参数。第一个定义管道 算子类型,它FitOperation
应该工作,第二个类型参数定义数据集数据元的类型。如果我们首先想要实现这个MeanTransformer
工作DenseVector
,那么我们就必须提供一个实现FitOperation[MeanTransformer, DenseVector]
。
val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] {
override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]) : Unit = {
import org.apache.flink.ml.math.Breeze._
val meanTrainingData: DataSet[DenseVector] = input
.map{ x => (x.asBreeze, 1) }
.reduce{
(left, right) =>
(left._1 + right._1, left._2 + right._2)
}
.map{ p => (p._1/p._2).fromBreeze }
}
}
A FitOperation[T, I]
具有使用fit
类型实例T
,参数映射和输入调用的方法DataSet[I]
。在我们的案例T=MeanTransformer
和I=DenseVector
。如果我们的拟合步骤取决于在创建时未直接给出的某些参数值,则参数映射是必需的Transformer
。在FitOperation
所述的MeanTransformer
和的DenseVector
给定的输入数据设置的实例,并除以矢量的总数的结果。这样,我们获得了一个DataSet[DenseVector]
具有平均值的单个数据元。
但是如果仔细观察实现,我们会发现平均计算的结果永远不会存储在任何地方。如果我们想在后面的步骤中使用这些知识来调整其他输入的平均值,我们必须保持它。这里是哪里类型的参数,MeanTransformer
这是考虑到该fit
方法的用武之地。我们可以使用此实例来存储状态,该状态由transform
对同一对象起作用的后续 算子操作使用。但首先我们必须MeanTransformer
通过成员字段进行扩展,然后调整FitOperation
实现。
class MeanTransformer extends Transformer[Centering] {
var meanOption: Option[DataSet[DenseVector]] = None
def setMean(mean: Double): Mean = {
parameters.add(MeanTransformer.Mean, mu)
}
}
val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] {
override def fit(instance: MeanTransformer, fitParameters: ParameterMap, input: DataSet[DenseVector]) : Unit = {
import org.apache.flink.ml.math.Breeze._
instance.meanOption = Some(input
.map{ x => (x.asBreeze, 1) }
.reduce{
(left, right) =>
(left._1 + right._1, left._2 + right._2)
}
.map{ p => (p._1/p._2).fromBreeze })
}
}
如果我们查看transform
方法Transformer
,我们将看到我们还需要一个实现TransformOperation
。可能的平均转换实现可能如下所示。
val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] {
override def transform(
instance: MeanTransformer,
transformParameters: ParameterMap,
input: DataSet[DenseVector])
: DataSet[DenseVector] = {
val resultingParameters = parameters ++ transformParameters
val resultingMean = resultingParameters(MeanTransformer.Mean)
instance.meanOption match {
case Some(trainingMean) => {
input.map{ new MeanTransformMapper(resultingMean) }.withBroadcastSet(trainingMean, "trainingMean")
}
case None => throw new RuntimeException("MeanTransformer has not been fitted to data.")
}
}
}
class MeanTransformMapper(resultingMean: Double) extends RichMapFunction[DenseVector, DenseVector] {
var trainingMean: DenseVector = null
override def open(parameters: Configuration): Unit = {
trainingMean = getRuntimeContext().getBroadcastVariable[DenseVector]("trainingMean").get(0)
}
override def map(vector: DenseVector): DenseVector = {
import org.apache.flink.ml.math.Breeze._
val result = vector.asBreeze - trainingMean.asBreeze + resultingMean
result.fromBreeze
}
}
现在我们已经实现了所有实现,以适应实例MeanTransformer
的训练数据集DenseVector
并对其进行转换。但是,当我们执行fit
算子操作时
val trainingData: DataSet[DenseVector] = ...
val meanTransformer = MeanTransformer()
meanTransformer.fit(trainingData)
我们在运行时收到以下错误:"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.math.DenseVector]"
。原因是Scala编译器无法FitOperation
为方法的隐式参数找到具有正确类型参数的拟合值fit
。因此,它选择了一个回退隐式值,它在运行时为您提供此错误消息。为了使编译器能够识别我们的实现,我们必须将其定义为隐式值并将其放在MeanTransformer's
伴随对象的范围内。
object MeanTransformer{
implicit val denseVectorMeanFitOperation = new FitOperation[MeanTransformer, DenseVector] ...
implicit val denseVectorMeanTransformOperation = new TransformOperation[MeanTransformer, DenseVector, DenseVector] ...
}
现在,我们可以调用fit
和transform
我们的MeanTransformer
有DataSet[DenseVector]
作为输入。此外,我们现在可以将此变换器用作分析管道的一部分,其中我们具有DenseVector
输入和预期输出。
val trainingData: DataSet[DenseVector] = ...
val mean = MeanTransformer.setMean(1.0)
val polyFeatures = PolynomialFeatures().setDegree(3)
val pipeline = mean.chainTransformer(polyFeatures)
pipeline.fit(trainingData)
值得注意的是,启用链接不需要额外的代码。系统使用各个组件的 算子操作自动构建管道逻辑。
到目前为止一切正常DenseVector
。但是,如果我们LabeledVector
改用变压器,会发生什么?
val trainingData: DataSet[LabeledVector] = ...
val mean = MeanTransformer()
mean.fit(trainingData)
和以前一样,我们在执行程序时会看到以下异常:"There is no FitOperation defined for class MeanTransformer which trains on a DataSet[org.apache.flink.ml.common.LabeledVector]"
。值得注意的是,这个异常是在飞行前阶段抛出的,这意味着作业尚未提交给运行时系统。这样做的好处是,您不会看到一个运行了几天但由于管道组件不兼容而失败的作业。因此,在一开始就检查类型兼容性是否完整。
为了使MeanTransformer
工作LabeledVector
也顺利进行,我们必须提供相应的 算子操作。因此,我们必须在伴随对象的范围内定义一个FitOperation[MeanTransformer, LabeledVector]
和TransformOperation[MeanTransformer, LabeledVector, LabeledVector]
作为隐式值MeanTransformer
。
object MeanTransformer {
implicit val labeledVectorFitOperation = new FitOperation[MeanTransformer, LabeledVector] ...
implicit val labeledVectorTransformOperation = new TransformOperation[MeanTransformer, LabeledVector, LabeledVector] ...
}
如果我们想要实现a Predictor
而不是a Transformer
,那么我们也必须提供a FitOperation
。此外,a Predictor
RequiredPredictOperation
实现如何根据测试数据计算预测。