Stratified Sampling

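  Stratified sampling is a statistical sampling method that first divides the units of a population into sub-populations (strata) according to some characteristic, and then draws a simple random sample from each stratum; together these draws form the sample. In spark.mllib, the key determines the stratum.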

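  Unlike the other statistics functions in spark.mllib, the stratified sampling methods sampleByKey and sampleByKeyExact operate on RDDs of key-value pairs. In stratified sampling, the key can be thought of as a label and the value as a specific attribute. For example, the key can be man or woman, or a document id, and the corresponding values can be a list of ages or the words in the document. sampleByKey flips a coin for each observation to decide whether to sample it, so it needs only one pass over the data and yields a sample of the expected size rather than an exact one. sampleByKeyExact requires significantly more resources than the per-stratum simple random sampling used by sampleByKey, but it delivers the exact sample size with 99.99% confidence.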

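  sampleByKeyExact() lets the user draw exactly ⌈f_k * n_k⌉ samples for each key k, where f_k is the desired fraction for key k and n_k is the number of key-value pairs with key k. Here is a usage example: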

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.PairRDDFunctions

    val sc: SparkContext = ...
    val data = ... // an RDD[(K, V)] of any key value pairs
    val fractions: Map[K, Double] = ... // specify the exact fraction desired from each key

    // Get an approximate sample from each stratum
    val approxSample = data.sampleByKey(withReplacement = false, fractions)
    // Get an exact sample from each stratum
    val exactSample = data.sampleByKeyExact(withReplacement = false, fractions)
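  To make the exact sample size concrete, here is a minimal sketch in plain Scala (no Spark required) that computes the per-key target ⌈f_k * n_k⌉. The object ExactSampleSizes, the helper targetSizes, and the example counts are hypothetical names and values chosen for illustration; they are not part of the Spark API.

```scala
object ExactSampleSizes {
  // Hypothetical helper: target sample size per stratum, ceil(f_k * n_k).
  // counts(k) is n_k, fractions(k) is f_k.
  def targetSizes[K](counts: Map[K, Long], fractions: Map[K, Double]): Map[K, Long] =
    counts.map { case (k, n) => k -> math.ceil(fractions(k) * n).toLong }

  def main(args: Array[String]): Unit = {
    // Illustrative strata sizes and fractions (made-up values).
    val counts = Map("male" -> 1000L, "female" -> 251L)
    val fractions = Map("male" -> 0.25, "female" -> 0.5)
    // 1000 * 0.25 = 250 and 251 * 0.5 = 125.5, which rounds up to 126.
    println(targetSizes(counts, fractions))
  }
}
```

  sampleByKey would return roughly these many records per key, varying from run to run, whereas sampleByKeyExact aims for exactly these sizes.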

  When withReplacement is true, the Poisson sampler is used; when withReplacement is false, the Bernoulli sampler is used.

    def sampleByKey(withReplacement: Boolean,
        fractions: Map[K, Double],
        seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {
      val samplingFunc = if (withReplacement) {
        StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, false, seed)
      } else {
        StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
      }
      self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
    }

    def sampleByKeyExact(
        withReplacement: Boolean,
        fractions: Map[K, Double],
        seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope {
      val samplingFunc = if (withReplacement) {
        StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, true, seed)
      } else {
        StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)
      }
      self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
    }

  Next we look at the implementations of sampleByKey and sampleByKeyExact in turn.

1 Implementation of sampleByKey

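  Sampling with replacement uses the Poisson sampler, and sampling without replacement uses the Bernoulli sampler. The implementation of sampleByKey is fairly simple: it is plain uniform random sampling within each stratum, with each record decided independently.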

1.1 Poisson sampler

  We first look at the implementation of the Poisson sampler.

    def getPoissonSamplingFunction[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)],
        fractions: Map[K, Double],
        exact: Boolean,
        seed: Long): (Int, Iterator[(K, V)]) => Iterator[(K, V)] = {
      (idx: Int, iter: Iterator[(K, V)]) => {
        // Initialize the random generator, seeded per partition
        val rng = new RandomDataGenerator()
        rng.reSeed(seed + idx)
        iter.flatMap { item =>
          // Draw the next Poisson value: the number of copies of this item
          val count = rng.nextPoisson(fractions(item._1))
          if (count == 0) {
            Iterator.empty
          } else {
            Iterator.fill(count)(item)
          }
        }
      }
    }

  getPoissonSamplingFunction returns a function, which is passed to mapPartitionsWithIndex to process the data of each partition. RandomDataGenerator is a random generator that produces both uniform values and Poisson values.
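  The per-partition Poisson logic can be sketched without Spark as follows. This is an illustration under stated assumptions, not Spark's code: scala.util.Random stands in for RandomDataGenerator, and the Poisson draw uses Knuth's multiplication method instead of the generator Spark uses internally. The names PoissonSketch, nextPoisson, and samplePartition are hypothetical.

```scala
import scala.util.Random

object PoissonSketch {
  // Knuth's multiplication method (an assumption for this sketch):
  // multiply uniforms until the product drops to exp(-mean) or below.
  def nextPoisson(rng: Random, mean: Double): Int = {
    val limit = math.exp(-mean)
    var k = 0
    var p = rng.nextDouble()
    while (p > limit) {
      k += 1
      p *= rng.nextDouble()
    }
    k
  }

  // Mirrors the shape of the function handed to mapPartitionsWithIndex:
  // each item is emitted `count` times, where count ~ Poisson(fraction of its key).
  def samplePartition[K, V](idx: Int, iter: Iterator[(K, V)],
      fractions: Map[K, Double], seed: Long): Iterator[(K, V)] = {
    val rng = new Random(seed + idx) // per-partition seed, as in the listing above
    iter.flatMap { item =>
      val count = nextPoisson(rng, fractions(item._1))
      Iterator.fill(count)(item) // count == 0 yields an empty iterator
    }
  }

  def main(args: Array[String]): Unit = {
    val data = (1 to 10).map(i => ("a", i))
    println(samplePartition(0, data.iterator, Map("a" -> 0.5), 42L).toList)
  }
}
```

  Because the count for an item can exceed 1, the same record may appear several times in the output, which is what sampling with replacement means here.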

1.2 Bernoulli sampler

    def getBernoulliSamplingFunction[K, V](rdd: RDD[(K, V)],
        fractions: Map[K, Double],
        exact: Boolean,
        seed: Long): (Int, Iterator[(K, V)]) => Iterator[(K, V)] = {
      var samplingRateByKey = fractions
      (idx: Int, iter: Iterator[(K, V)]) => {
        // Initialize the random generator, seeded per partition
        val rng = new RandomDataGenerator()
        rng.reSeed(seed + idx)
        // Must use the same invoke pattern on the rng as in getSeqOp for without replacement
        // in order to generate the same sequence of random numbers when creating the sample
        iter.filter(t => rng.nextUniform() < samplingRateByKey(t._1))
      }
    }
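  The Bernoulli case reduces to one coin flip per record. A minimal stand-alone sketch, assuming scala.util.Random in place of RandomDataGenerator (BernoulliSketch and samplePartition are hypothetical names for illustration):

```scala
import scala.util.Random

object BernoulliSketch {
  // Keep each record independently with probability fractions(key).
  // A record is kept at most once, so this is sampling without replacement.
  def samplePartition[K, V](idx: Int, iter: Iterator[(K, V)],
      fractions: Map[K, Double], seed: Long): Iterator[(K, V)] = {
    val rng = new Random(seed + idx) // per-partition seed, as in the listing above
    iter.filter(t => rng.nextDouble() < fractions(t._1))
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(("a", 1), ("a", 2), ("b", 3), ("b", 4))
    // The number of records kept per key varies with the seed;
    // only its expectation equals fraction * count.
    println(samplePartition(0, data.iterator, Map("a" -> 0.5, "b" -> 1.0), 7L).toList)
  }
}
```

  A fraction of 1.0 keeps every record of that key and 0.0 keeps none; anything in between gives a sample whose size is only approximately f_k * n_k.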

2 Implementation of sampleByKeyExact

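  sampleByKeyExact produces a more accurate sample size. Its implementation also covers two cases, sampling with replacement and sampling without replacement: the former uses the Poisson sampler and the latter uses the Bernoulli sampler.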

2.1 Poisson sampler

    // Excerpt: the part of getPoissonSamplingFunction that handles exact sampling
    val counts = Some(rdd.countByKey())
    // Compute the number of samples accepted immediately and build a waitlist for each stratum
    val finalResult = getAcceptanceResults(rdd, true, fractions, counts, seed)
    // Determine the acceptance threshold per key that yields the exact sample size
    val thresholdByKey = computeThresholdByKey(finalResult, fractions)
    (idx: Int, iter: Iterator[(K, V)]) => {
      val rng = new RandomDataGenerator()
      rng.reSeed(seed + idx)
      iter.flatMap { item =>
        val key = item._1
        val acceptBound = finalResult(key).acceptBound
        // Must use the same invoke pattern on the rng as in getSeqOp for with replacement
        // in order to generate the same sequence of random numbers when creating the sample
        val copiesAccepted = if (acceptBound == 0) 0L else rng.nextPoisson(acceptBound)
        // Copies placed on the waitlist
        val copiesWaitlisted = rng.nextPoisson(finalResult(key).waitListBound)
        val copiesInSample = copiesAccepted +
          (0 until copiesWaitlisted).count(i => rng.nextUniform() < thresholdByKey(key))
        if (copiesInSample > 0) {
          Iterator.fill(copiesInSample.toInt)(item)
        } else {
          Iterator.empty
        }
      }
    }

2.2 Bernoulli sampler

    def getBernoulliSamplingFunction[K, V](rdd: RDD[(K, V)],
        fractions: Map[K, Double],
        exact: Boolean,
        seed: Long): (Int, Iterator[(K, V)]) => Iterator[(K, V)] = {
      var samplingRateByKey = fractions
      // Compute the number of samples accepted immediately and build a waitlist for each stratum
      val finalResult = getAcceptanceResults(rdd, false, fractions, None, seed)
      // Determine the acceptance threshold per key that yields the exact sample size
      samplingRateByKey = computeThresholdByKey(finalResult, fractions)
      (idx: Int, iter: Iterator[(K, V)]) => {
        val rng = new RandomDataGenerator()
        rng.reSeed(seed + idx)
        // Must use the same invoke pattern on the rng as in getSeqOp for without replacement
        // in order to generate the same sequence of random numbers when creating the sample
        iter.filter(t => rng.nextUniform() < samplingRateByKey(t._1))
      }
    }
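  The idea behind replacing the raw fraction with a per-key threshold can be illustrated with a deliberately simplified, single-machine sketch. It is not Spark's algorithm: it assigns one uniform draw to every record, sorts all draws of a stratum, and picks the threshold so that exactly ⌈f_k * n_k⌉ draws fall below it. Spark's getAcceptanceResults and computeThresholdByKey appear to reach the same goal with accept and waitlist bounds so that only a small waitlist has to be examined. ExactThresholdSketch and sampleExact are hypothetical names, and the sketch assumes the uniform draws within a stratum are distinct.

```scala
import scala.util.Random

object ExactThresholdSketch {
  def sampleExact[K, V](data: Seq[(K, V)], fractions: Map[K, Double],
      seed: Long): Seq[(K, V)] = {
    val rng = new Random(seed)
    // One uniform draw per record, taken in input order.
    val tagged = data.map(item => (item, rng.nextDouble()))
    // Per key, choose the threshold as the (target + 1)-th smallest draw,
    // so that exactly `target` draws are strictly below it.
    val thresholdByKey: Map[K, Double] =
      tagged.groupBy(_._1._1).map { case (k, items) =>
        val target = math.ceil(fractions(k) * items.size).toInt
        val sorted = items.map(_._2).sorted
        k -> (if (target >= sorted.size) 1.0 else sorted(target))
      }
    // Same filter shape as the Bernoulli sampler, with the threshold as the rate.
    tagged.collect { case (item, u) if u < thresholdByKey(item._1) => item }
  }

  def main(args: Array[String]): Unit = {
    val data = (1 to 100).map(i => ("a", i)) ++ (1 to 51).map(i => ("b", i))
    val sample = sampleExact(data, Map("a" -> 0.25, "b" -> 0.5), 7L)
    // Print the number of sampled records per key.
    println(sample.groupBy(_._1).map { case (k, v) => k -> v.size })
  }
}
```

  The final filter is the same `u < rate` test as in the listing above; what changes is that the rate is derived from the observed draws instead of being the user-supplied fraction, which is why the resulting size is exact rather than expected.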