转换操作

  GraphX中的转换操作主要有mapVertices,mapEdgesmapTriplets三个,它们在Graph文件中定义,在GraphImpl文件中实现。下面分别介绍这三个方法。

1 mapVertices

  mapVertices用来更新顶点属性。从图的构建那章我们知道,顶点属性保存在边分区中,所以我们需要改变的是边分区中的属性。

  1. override def mapVertices[VD2: ClassTag]
  2. (f: (VertexId, VD) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
  3. if (eq != null) {
  4. vertices.cache()
  5. // 使用方法f处理vertices
  6. val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
  7. //获得两个不同vertexRDD的不同
  8. val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
  9. //更新ReplicatedVertexView
  10. val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
  11. .updateVertices(changedVerts)
  12. new GraphImpl(newVerts, newReplicatedVertexView)
  13. } else {
  14. GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges)
  15. }
  16. }

  上面的代码中,当VDVD2类型相同时,我们可以重用没有发生变化的点,否则需要重新创建所有的点。我们分析VDVD2相同的情况,分四步处理。

  • 1 使用方法f处理vertices,获得新的VertexRDD

  • 2 使用在VertexRDD中定义的diff方法求出新VertexRDD和源VertexRDD的不同

  1. override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
  2. val otherPartition = other match {
  3. case other: VertexRDD[_] if this.partitioner == other.partitioner =>
  4. other.partitionsRDD
  5. case _ =>
  6. VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
  7. }
  8. val newPartitionsRDD = partitionsRDD.zipPartitions(
  9. otherPartition, preservesPartitioning = true
  10. ) { (thisIter, otherIter) =>
  11. val thisPart = thisIter.next()
  12. val otherPart = otherIter.next()
  13. Iterator(thisPart.diff(otherPart))
  14. }
  15. this.withPartitionsRDD(newPartitionsRDD)
  16. }

  这个方法首先处理新生成的VertexRDD的分区,如果它的分区和源VertexRDD的分区一致,那么直接取出它的partitionsRDD,否则重新分区后取出它的partitionsRDD
针对新旧两个VertexRDD的所有分区,调用VertexPartitionBaseOps中的diff方法求得分区的不同。

  1. def diff(other: Self[VD]): Self[VD] = {
  2. //首先判断
  3. if (self.index != other.index) {
  4. diff(createUsingIndex(other.iterator))
  5. } else {
  6. val newMask = self.mask & other.mask
  7. var i = newMask.nextSetBit(0)
  8. while (i >= 0) {
  9. if (self.values(i) == other.values(i)) {
  10. newMask.unset(i)
  11. }
  12. i = newMask.nextSetBit(i + 1)
  13. }
  14. this.withValues(other.values).withMask(newMask)
  15. }
  16. }

  该方法隐藏两个VertexRDD中相同的顶点信息,得到一个新的VertexRDD

  • 3 更新ReplicatedVertexView
  1. def updateVertices(updates: VertexRDD[VD]): ReplicatedVertexView[VD, ED] = {
  2. //生成一个VertexAttributeBlock
  3. val shippedVerts = updates.shipVertexAttributes(hasSrcId, hasDstId)
  4. .setName("ReplicatedVertexView.updateVertices - shippedVerts %s %s (broadcast)".format(
  5. hasSrcId, hasDstId))
  6. .partitionBy(edges.partitioner.get)
  7. //生成新的边RDD
  8. val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
  9. (ePartIter, shippedVertsIter) => ePartIter.map {
  10. case (pid, edgePartition) =>
  11. (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
  12. }
  13. })
  14. new ReplicatedVertexView(newEdges, hasSrcId, hasDstId)
  15. }

  updateVertices方法返回一个新的ReplicatedVertexView,它更新了边分区中包含的顶点属性。我们看看它的实现过程。首先看shipVertexAttributes方法的调用。
调用shipVertexAttributes方法会生成一个VertexAttributeBlockVertexAttributeBlock包含当前分区的顶点属性,这些属性可以在特定的边分区使用。

  1. def shipVertexAttributes(
  2. shipSrc: Boolean, shipDst: Boolean): Iterator[(PartitionID, VertexAttributeBlock[VD])] = {
  3. Iterator.tabulate(routingTable.numEdgePartitions) { pid =>
  4. val initialSize = if (shipSrc && shipDst) routingTable.partitionSize(pid) else 64
  5. val vids = new PrimitiveVector[VertexId](initialSize)
  6. val attrs = new PrimitiveVector[VD](initialSize)
  7. var i = 0
  8. routingTable.foreachWithinEdgePartition(pid, shipSrc, shipDst) { vid =>
  9. if (isDefined(vid)) {
  10. vids += vid
  11. attrs += this(vid)
  12. }
  13. i += 1
  14. }
  15. //(边分区id,VertexAttributeBlock(顶点id,属性))
  16. (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
  17. }
  18. }

  获得新的顶点属性之后,我们就可以调用updateVertices更新边中顶点的属性了,如下面代码所示:

  1. edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator))
  2. //更新EdgePartition的属性
  3. def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
  4. val newVertexAttrs = new Array[VD](vertexAttrs.length)
  5. System.arraycopy(vertexAttrs, 0, newVertexAttrs, 0, vertexAttrs.length)
  6. while (iter.hasNext) {
  7. val kv = iter.next()
  8. //global2local获得顶点的本地index
  9. newVertexAttrs(global2local(kv._1)) = kv._2
  10. }
  11. new EdgePartition(
  12. localSrcIds, localDstIds, data, index, global2local, local2global, newVertexAttrs,
  13. activeSet)
  14. }

2 mapEdges

  mapEdges用来更新边属性。

  1. override def mapEdges[ED2: ClassTag](
  2. f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
  3. val newEdges = replicatedVertexView.edges
  4. .mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
  5. new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
  6. }

  相比于mapVerticesmapEdges显然要简单得多,它只需要根据方法f生成新的EdgeRDD,然后再初始化即可。

3 mapTriplets:用来更新边属性

  mapTriplets用来更新边属性。

  1. override def mapTriplets[ED2: ClassTag](
  2. f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2],
  3. tripletFields: TripletFields): Graph[VD, ED2] = {
  4. vertices.cache()
  5. replicatedVertexView.upgrade(vertices, tripletFields.useSrc, tripletFields.useDst)
  6. val newEdges = replicatedVertexView.edges.mapEdgePartitions { (pid, part) =>
  7. part.map(f(pid, part.tripletIterator(tripletFields.useSrc, tripletFields.useDst)))
  8. }
  9. new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
  10. }

  这段代码中,replicatedVertexView调用upgrade方法修改当前的ReplicatedVertexView,使调用者可以访问到指定级别的边信息(如仅仅可以读源顶点的属性)。

  1. def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean) {
  2. //判断传递级别
  3. val shipSrc = includeSrc && !hasSrcId
  4. val shipDst = includeDst && !hasDstId
  5. if (shipSrc || shipDst) {
  6. val shippedVerts: RDD[(Int, VertexAttributeBlock[VD])] =
  7. vertices.shipVertexAttributes(shipSrc, shipDst)
  8. .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format(
  9. includeSrc, includeDst, shipSrc, shipDst))
  10. .partitionBy(edges.partitioner.get)
  11. val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
  12. (ePartIter, shippedVertsIter) => ePartIter.map {
  13. case (pid, edgePartition) =>
  14. (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
  15. }
  16. })
  17. edges = newEdges
  18. hasSrcId = includeSrc
  19. hasDstId = includeDst
  20. }
  21. }

  最后,用f处理边,生成新的RDD,最后用新的数据初始化图。

4 总结

  调用mapVertices,mapEdgesmapTriplets时,其内部的结构化索引(Structural indices)并不会发生变化,它们都重用路由表中的数据。