高斯混合模型

  现有的高斯模型有单高斯模型(SGM)和高斯混合模型(GMM)两种。从几何上讲,单高斯分布模型在二维空间上近似于椭圆,在三维空间上近似于椭球。
在很多情况下,属于同一类别的样本点并不满足“椭圆”分布的特性,所以我们需要引入混合高斯模型来解决这种情况。

1 单高斯模型

  多维变量X服从高斯分布时,它的概率密度函数PDF定义如下:

1.1

  在上述定义中,x是维数为D的样本向量,mu是模型期望,sigma是模型协方差。对于单高斯模型,可以明确训练样本是否属于该高斯模型,所以我们经常将mu用训练样本的均值代替,将sigma用训练样本的协方差代替。
假设训练样本属于类别C,那么上面的定义可以修改为下面的形式:

1.2

  这个公式表示样本属于类别C的概率。我们可以根据定义的概率阈值来判断样本是否属于某个类别。

2 高斯混合模型

  高斯混合模型,顾名思义,就是数据可以看作是从多个高斯分布中生成出来的。从中心极限定理可以看出,高斯分布这个假设其实是比较合理的。
为什么我们要假设数据是由若干个高斯分布组合而成的,而不假设是其他分布呢?实际上不管是什么分布,只K取得足够大,这个XX Mixture Model就会变得足够复杂,就可以用来逼近任意连续的概率密度分布。只是因为高斯函数具有良好的计算性能,所GMM被广泛地应用。

  每个GMMK个高斯分布组成,每个高斯分布称为一个组件(Component),这些组件线性加成在一起就组成了GMM的概率密度函数 (1):

1.3

  根据上面的式子,如果我们要从GMM分布中随机地取一个点,需要两步:

  • 随机地在这K个组件之中选一个,每个组件被选中的概率实际上就是它的系数pi_k

  • 选中了组件之后,再单独地考虑从这个组件的分布中选取一个点。

  怎样用GMM来做聚类呢?其实很简单,现在我们有了数据,假定它们是由GMM生成出来的,那么我们只要根据数据推出GMM的概率分布来就可以了,然后GMMK个组件实际上就对应了K个聚类了。
在已知概率密度函数的情况下,要估计其中的参数的过程被称作“参数估计”。

  我们可以利用最大似然估计来确定这些参数,GMM的似然函数 (2) 如下(此处公式有误,括号中的x应该为x_i):

1.4

  可以用EM算法来求解这些参数。EM算法求解的过程如下:

  • 1 E-步。求数据点由各个组件生成的概率(并不是每个组件被选中的概率)。对于每个数据$x_{i}$来说,它由第k个组件生成的概率为公式 (3)

1.5

  在上面的概率公式中,我们假定musigma均是已知的,它们的值来自于初始化值或者上一次迭代。

  • 2 M-步。估计每个组件的参数。由于每个组件都是一个标准的高斯分布,可以很容易分布求出最大似然所对应的参数值,分别如下公式 (4), (5), (6), (7)

1.6

1.7

1.8

1.9

3 源码分析

3.1 实例

  在分析源码前,我们还是先看看高斯混合模型如何使用。

  1. import org.apache.spark.mllib.clustering.GaussianMixture
  2. import org.apache.spark.mllib.clustering.GaussianMixtureModel
  3. import org.apache.spark.mllib.linalg.Vectors
  4. // 加载数据
  5. val data = sc.textFile("data/mllib/gmm_data.txt")
  6. val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
  7. // 使用高斯混合模型聚类
  8. val gmm = new GaussianMixture().setK(2).run(parsedData)
  9. // 保存和加载模型
  10. gmm.save(sc, "myGMMModel")
  11. val sameModel = GaussianMixtureModel.load(sc, "myGMMModel")
  12. // 打印参数
  13. for (i <- 0 until gmm.k) {
  14. println("weight=%f\nmu=%s\nsigma=\n%s\n" format
  15. (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
  16. }

  由上面的代码我们可以知道,使用高斯混合模型聚类使用到了GaussianMixture类中的run方法。下面我们直接进入run方法,分析它的实现。

3.2 高斯混合模型的实现

3.2.1 初始化

  在run方法中,程序所做的第一步就是初始化权重(上文中介绍的pi)及其相对应的高斯分布。

  1. val (weights, gaussians) = initialModel match {
  2. case Some(gmm) => (gmm.weights, gmm.gaussians)
  3. case None => {
  4. val samples = breezeData.takeSample(withReplacement = true, k * nSamples, seed)
  5. (Array.fill(k)(1.0 / k), Array.tabulate(k) { i =>
  6. val slice = samples.view(i * nSamples, (i + 1) * nSamples)
  7. new MultivariateGaussian(vectorMean(slice), initCovariance(slice))
  8. })
  9. }
  10. }

  在上面的代码中,当initialModel为空时,用所有值均为1.0/k的数组初始化权重,用值为MultivariateGaussian对象的数组初始化所有的高斯分布(即上文中提到的组件)。
每一个MultivariateGaussian对象都由从数据集中抽样的子集计算而来。这里用样本数据的均值和方差初始化MultivariateGaussianmusigma

  1. //计算均值
  2. private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = {
  3. val v = BDV.zeros[Double](x(0).length)
  4. x.foreach(xi => v += xi)
  5. v / x.length.toDouble
  6. }
  7. //计算方差
  8. private def initCovariance(x: IndexedSeq[BV[Double]]): BreezeMatrix[Double] = {
  9. val mu = vectorMean(x)
  10. val ss = BDV.zeros[Double](x(0).length)
  11. x.foreach(xi => ss += (xi - mu) :^ 2.0)
  12. diag(ss / x.length.toDouble)
  13. }

3.2.2 EM算法求参数

  初始化后,就可以使用EM算法迭代求似然函数中的参数。迭代结束的条件是迭代次数达到了我们设置的次数或者两次迭代计算的对数似然值之差小于阈值。

  1. while (iter < maxIterations && math.abs(llh-llhp) > convergenceTol)

  在迭代内部,就可以按照E-步M-步来更新参数了。

  • E-步:更新参数gamma
  1. val compute = sc.broadcast(ExpectationSum.add(weights, gaussians)_)
  2. val sums = breezeData.aggregate(ExpectationSum.zero(k, d))(compute.value, _ += _)

  我们先要了解ExpectationSum以及add方法的实现。

  1. private class ExpectationSum(
  2. var logLikelihood: Double,
  3. val weights: Array[Double],
  4. val means: Array[BDV[Double]],
  5. val sigmas: Array[BreezeMatrix[Double]]) extends Serializable

  ExpectationSum是一个聚合类,它表示部分期望结果:主要包含对数似然值,权重值(第二章中介绍的pi),均值,方差。add方法的实现如下:

  1. def add( weights: Array[Double],dists: Array[MultivariateGaussian])
  2. (sums: ExpectationSum, x: BV[Double]): ExpectationSum = {
  3. val p = weights.zip(dists).map {
  4. //计算pi_i * N(x)
  5. case (weight, dist) => MLUtils.EPSILON + weight * dist.pdf(x)
  6. }
  7. val pSum = p.sum
  8. sums.logLikelihood += math.log(pSum)
  9. var i = 0
  10. while (i < sums.k) {
  11. p(i) /= pSum
  12. sums.weights(i) += p(i)
  13. sums.means(i) += x * p(i)
  14. //A := alpha * x * x^T^ + A
  15. BLAS.syr(p(i), Vectors.fromBreeze(x),
  16. Matrices.fromBreeze(sums.sigmas(i)).asInstanceOf[DenseMatrix])
  17. i = i + 1
  18. }
  19. sums
  20. }

  从上面的实现我们可以看出,最终,logLikelihood表示公式 (2) 中的对数似然。pweights分别表示公式 (3) 中的gammapimeans表示公式 (6) 中的求和部分,sigmas表示公式 (7) 中的求和部分。

  调用RDDaggregate方法,我们可以基于所有给定数据计算上面的值。利用计算的这些新值,我们可以在M-步中更新musigma

  • M-步:更新参数musigma
  1. var i = 0
  2. while (i < k) {
  3. val (weight, gaussian) =
  4. updateWeightsAndGaussians(sums.means(i), sums.sigmas(i), sums.weights(i), sumWeights)
  5. weights(i) = weight
  6. gaussians(i) = gaussian
  7. i = i + 1
  8. }
  9. private def updateWeightsAndGaussians(
  10. mean: BDV[Double],
  11. sigma: BreezeMatrix[Double],
  12. weight: Double,
  13. sumWeights: Double): (Double, MultivariateGaussian) = {
  14. // mean/weight
  15. val mu = (mean /= weight)
  16. // -weight * mu * mut +sigma
  17. BLAS.syr(-weight, Vectors.fromBreeze(mu),
  18. Matrices.fromBreeze(sigma).asInstanceOf[DenseMatrix])
  19. val newWeight = weight / sumWeights
  20. val newGaussian = new MultivariateGaussian(mu, sigma / weight)
  21. (newWeight, newGaussian)
  22. }

  基于 E-步 计算出来的值,根据公式 (6) ,我们可以通过(mean /= weight)来更新mu;根据公式 (7) ,我们可以通过BLAS.syr()来更新sigma;同时,根据公式 (5)
我们可以通过weight / sumWeights来计算pi

  迭代执行以上的 E-步M-步,到达一定的迭代数或者对数似然值变化较小后,我们停止迭代。这时就可以获得聚类后的参数了。

3.3 多元高斯模型中相关方法介绍

  在上面的求参代码中,我们用到了MultivariateGaussian以及MultivariateGaussian中的部分方法,如pdfMultivariateGaussian定义如下:

  1. class MultivariateGaussian @Since("1.3.0") (
  2. @Since("1.3.0") val mu: Vector,
  3. @Since("1.3.0") val sigma: Matrix) extends Serializable

  MultivariateGaussian包含一个向量mu和一个矩阵sigma,分别表示期望和协方差。MultivariateGaussian最重要的方法是pdf,顾名思义就是计算给定数据的概率密度函数。它的实现如下:

  1. private[mllib] def pdf(x: BV[Double]): Double = {
  2. math.exp(logpdf(x))
  3. }
  4. private[mllib] def logpdf(x: BV[Double]): Double = {
  5. val delta = x - breezeMu
  6. val v = rootSigmaInv * delta
  7. u + v.t * v * -0.5
  8. }

  上面的rootSigmaInvu通过方法calculateCovarianceConstants计算。根据公式 (1) ,这个概率密度函数的计算需要计算sigma的行列式以及逆。

  1. sigma = U * D * U.t
  2. inv(Sigma) = U * inv(D) * U.t = (D^{-1/2}^ * U.t).t * (D^{-1/2}^ * U.t)
  3. -0.5 * (x-mu).t * inv(Sigma) * (x-mu) = -0.5 * norm(D^{-1/2}^ * U.t * (x-mu))^2^

  这里,UD是奇异值分解得到的子矩阵。calculateCovarianceConstants具体的实现代码如下:

  1. private def calculateCovarianceConstants: (DBM[Double], Double) = {
  2. val eigSym.EigSym(d, u) = eigSym(sigma.toBreeze.toDenseMatrix) // sigma = u * diag(d) * u.t
  3. val tol = MLUtils.EPSILON * max(d) * d.length
  4. try {
  5. //所有非0奇异值的对数和
  6. val logPseudoDetSigma = d.activeValuesIterator.filter(_ > tol).map(math.log).sum
  7. //通过求非负值的倒数平方根,计算奇异值对角矩阵的根伪逆矩阵
  8. val pinvS = diag(new DBV(d.map(v => if (v > tol) math.sqrt(1.0 / v) else 0.0).toArray))
  9. (pinvS * u.t, -0.5 * (mu.size * math.log(2.0 * math.Pi) + logPseudoDetSigma))
  10. } catch {
  11. case uex: UnsupportedOperationException =>
  12. throw new IllegalArgumentException("Covariance matrix has no non-zero singular values")
  13. }
  14. }

  上面的代码中,eigSym用于分解sigma矩阵。

4 参考文献

【1】漫谈 Clustering (3): Gaussian Mixture Model