点分割存储

  在第一章分布式图系统中,我们介绍了图存储的两种方式:点分割存储和边分割存储。GraphX借鉴powerGraph,使用的是点分割方式存储图。这种存储方式特点是任何一条边只会出现在一台机器上,每个点有可能分布到不同的机器上。
当点被分割到不同机器上时,是相同的镜像,但是有一个点作为主点,其他的点作为虚点,当点的数据发生变化时,先更新主点的数据,然后将所有更新好的数据发送到虚点所在的所有机器,更新虚点。
这样做的好处是在边的存储上是没有冗余的,而且对于某个点与它的邻居的交互操作,只要满足交换律和结合律,就可以在不同的机器上面执行,网络开销较小。但是这种分割方式会存储多份点数据,更新点时,
会发生网络传输,并且有可能出现同步问题。

  GraphX在进行图分割时,有几种不同的分区(partition)策略,它通过PartitionStrategy专门定义这些策略。在PartitionStrategy中,总共定义了EdgePartition2DEdgePartition1DRandomVertexCut以及
CanonicalRandomVertexCut这四种不同的分区策略。下面分别介绍这几种策略。

1 RandomVertexCut

  1. case object RandomVertexCut extends PartitionStrategy {
  2. override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
  3. math.abs((src, dst).hashCode()) % numParts
  4. }
  5. }

  这个方法比较简单,通过取源顶点和目标顶点id的哈希值来将边分配到不同的分区。这个方法会产生一个随机的边分割,两个顶点之间相同方向的边会分配到同一个分区。

2 CanonicalRandomVertexCut

  1. case object CanonicalRandomVertexCut extends PartitionStrategy {
  2. override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
  3. if (src < dst) {
  4. math.abs((src, dst).hashCode()) % numParts
  5. } else {
  6. math.abs((dst, src).hashCode()) % numParts
  7. }
  8. }
  9. }

  这种分割方法和前一种方法没有本质的不同。不同的是,哈希值的产生带有确定的方向(即两个顶点中较小id的顶点在前)。两个顶点之间所有的边都会分配到同一个分区,而不管方向如何。

3 EdgePartition1D

  1. case object EdgePartition1D extends PartitionStrategy {
  2. override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
  3. val mixingPrime: VertexId = 1125899906842597L
  4. (math.abs(src * mixingPrime) % numParts).toInt
  5. }
  6. }

  这种方法仅仅根据源顶点id来将边分配到不同的分区。有相同源顶点的边会分配到同一分区。

4 EdgePartition2D

  1. case object EdgePartition2D extends PartitionStrategy {
  2. override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
  3. val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
  4. val mixingPrime: VertexId = 1125899906842597L
  5. if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
  6. // Use old method for perfect squared to ensure we get same results
  7. val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
  8. val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
  9. (col * ceilSqrtNumParts + row) % numParts
  10. } else {
  11. // Otherwise use new method
  12. val cols = ceilSqrtNumParts
  13. val rows = (numParts + cols - 1) / cols
  14. val lastColRows = numParts - rows * (cols - 1)
  15. val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
  16. val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
  17. col * rows + row
  18. }
  19. }
  20. }

  这种分割方法同时使用到了源顶点id和目的顶点id。它使用稀疏边连接矩阵的2维区分来将边分配到不同的分区,从而保证顶点的备份数不大于2 * sqrt(numParts)的限制。这里numParts表示分区数。
这个方法的实现分两种情况,即分区数能完全开方和不能完全开方两种情况。当分区数能完全开方时,采用下面的方法:

  1. val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
  2. val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
  3. (col * ceilSqrtNumParts + row) % numParts

  当分区数不能完全开方时,采用下面的方法。这个方法的最后一列允许拥有不同的行数。

  1. val cols = ceilSqrtNumParts
  2. val rows = (numParts + cols - 1) / cols
  3. //最后一列允许不同的行数
  4. val lastColRows = numParts - rows * (cols - 1)
  5. val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
  6. val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
  7. col * rows + row

  下面举个例子来说明该方法。假设我们有一个拥有12个顶点的图,要把它切分到9台机器。我们可以用下面的稀疏矩阵来表示:

  1. __________________________________
  2. v0 | P0 * | P1 | P2 * |
  3. v1 | **** | * | |
  4. v2 | ******* | ** | **** |
  5. v3 | ***** | * * | * |
  6. ----------------------------------
  7. v4 | P3 * | P4 *** | P5 ** * |
  8. v5 | * * | * | |
  9. v6 | * | ** | **** |
  10. v7 | * * * | * * | * |
  11. ----------------------------------
  12. v8 | P6 * | P7 * | P8 * *|
  13. v9 | * | * * | |
  14. v10 | * | ** | * * |
  15. v11 | * <-E | *** | ** |
  16. ----------------------------------

  上面的例子中*表示分配到处理器上的边。E表示连接顶点v11v1的边,它被分配到了处理器P6上。为了获得边所在的处理器,我们将矩阵切分为sqrt(numParts) * sqrt(numParts)块。
注意,上图中与顶点v11相连接的边只出现在第一列的块(P0,P3,P6)或者最后一行的块(P6,P7,P8)中,这保证了V11的副本数不会超过2 * sqrt(numParts)份,在上例中即副本不能超过6份。

  在上面的例子中,P0里面存在很多边,这会造成工作的不均衡。为了提高均衡,我们首先用顶点id乘以一个大的素数,然后再shuffle顶点的位置。乘以一个大的素数本质上不能解决不平衡的问题,只是减少了不平衡的情况发生。

5 参考文献

【1】spark源码