连通图

  1. import scala.reflect.ClassTag
  2. import org.apache.spark.graphx._
  /** Connected components algorithm. */
  object ConnectedComponents {

    /**
     * Labels every vertex with the lowest vertex id found in its connected component.
     *
     * Each vertex starts out labelled with its own id. Smaller labels are then pushed across
     * edges through Pregel, for at most `maxIterations` iterations.
     *
     * @tparam VD the vertex attribute type (discarded in the computation)
     * @tparam ED the edge attribute type (preserved in the computation)
     * @param graph the graph for which to compute the connected components
     * @param maxIterations the maximum number of iterations to run for; must be greater than 0
     * @return a graph whose vertex attribute is the smallest vertex id in that vertex's
     *         connected component
     */
    def run[VD: ClassTag, ED: ClassTag](
        graph: Graph[VD, ED],
        maxIterations: Int): Graph[VertexId, ED] = {
      require(
        maxIterations > 0,
        s"Maximum of iterations must be greater than 0, but got $maxIterations")

      // Seed every vertex with its own id as the initial component label.
      val labelled = graph.mapVertices((vertexId, _) => vertexId)

      // Along an edge whose endpoints disagree, tell the endpoint holding the larger label
      // about the smaller one; edges whose endpoints already agree send nothing.
      def propagateSmallerLabel(
          triplet: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
        val srcLabel = triplet.srcAttr
        val dstLabel = triplet.dstAttr
        if (srcLabel == dstLabel) {
          Iterator.empty
        } else if (srcLabel < dstLabel) {
          Iterator((triplet.dstId, srcLabel))
        } else {
          Iterator((triplet.srcId, dstLabel))
        }
      }

      // Long.MaxValue is the initial message: taking the minimum with it leaves the
      // starting label of every vertex unchanged.
      val componentGraph = Pregel(
        labelled,
        Long.MaxValue,
        maxIterations,
        EdgeDirection.Either)(
        vprog = (_, current, incoming) => math.min(current, incoming),
        sendMsg = propagateSmallerLabel,
        mergeMsg = (left, right) => math.min(left, right))

      // The intermediate labelled graph is not used again after Pregel has returned.
      labelled.unpersist()
      componentGraph
    } // end of connectedComponents

    /**
     * Labels every vertex with the lowest vertex id found in its connected component,
     * using `Int.MaxValue` as the iteration limit.
     *
     * @tparam VD the vertex attribute type (discarded in the computation)
     * @tparam ED the edge attribute type (preserved in the computation)
     * @param graph the graph for which to compute the connected components
     * @return a graph whose vertex attribute is the smallest vertex id in that vertex's
     *         connected component
     */
    def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] =
      run(graph, Int.MaxValue)
  }