PageRank

  1. import scala.language.postfixOps
  2. import scala.reflect.ClassTag
  3. import org.apache.spark.graphx._
  4. import org.apache.spark.internal.Logging
  5. /**
  6. * PageRank algorithm implementation. There are two implementations of PageRank implemented.
  7. *
  8. * The first implementation uses the standalone [[Graph]] interface and runs PageRank
  9. * for a fixed number of iterations:
  10. * {{{
  11. * var PR = Array.fill(n)( 1.0 )
  12. * val oldPR = Array.fill(n)( 1.0 )
  13. * for( iter <- 0 until numIter ) {
  14. * swap(oldPR, PR)
  15. * for( i <- 0 until n ) {
  16. * PR[i] = alpha + (1 - alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
  17. * }
  18. * }
  19. * }}}
  20. *
  21. * The second implementation uses the [[Pregel]] interface and runs PageRank until
  22. * convergence:
  23. *
  24. * {{{
  25. * var PR = Array.fill(n)( 1.0 )
  26. * val oldPR = Array.fill(n)( 0.0 )
  27. * while( max(abs(PR - oldPr)) > tol ) {
  28. * swap(oldPR, PR)
  29. * for( i <- 0 until n if abs(PR[i] - oldPR[i]) > tol ) {
  30. * PR[i] = alpha + (1 - \alpha) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
  31. * }
  32. * }
  33. * }}}
  34. *
  35. * `alpha` is the random reset probability (typically 0.15), `inNbrs[i]` is the set of
  36. * neighbors which link to `i` and `outDeg[j]` is the out degree of vertex `j`.
  37. *
  38. * Note that this is not the "normalized" PageRank and as a consequence pages that have no
  39. * inlinks will have a PageRank of alpha.
  40. */
  41. object PageRank extends Logging {
  42. /**
  43. * Run PageRank for a fixed number of iterations returning a graph
  44. * with vertex attributes containing the PageRank and edge
  45. * attributes the normalized edge weight.
  46. *
  47. * @tparam VD the original vertex attribute (not used)
  48. * @tparam ED the original edge attribute (not used)
  49. *
  50. * @param graph the graph on which to compute PageRank
  51. * @param numIter the number of iterations of PageRank to run
  52. * @param resetProb the random reset probability (alpha)
  53. *
  54. * @return the graph containing with each vertex containing the PageRank and each edge
  55. * containing the normalized weight.
  56. */
  57. def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int,
  58. resetProb: Double = 0.15): Graph[Double, Double] =
  59. {
  60. runWithOptions(graph, numIter, resetProb)
  61. }
  62. /**
  63. * Run PageRank for a fixed number of iterations returning a graph
  64. * with vertex attributes containing the PageRank and edge
  65. * attributes the normalized edge weight.
  66. *
  67. * @tparam VD the original vertex attribute (not used)
  68. * @tparam ED the original edge attribute (not used)
  69. *
  70. * @param graph the graph on which to compute PageRank
  71. * @param numIter the number of iterations of PageRank to run
  72. * @param resetProb the random reset probability (alpha)
  73. * @param srcId the source vertex for a Personalized Page Rank (optional)
  74. *
  75. * @return the graph containing with each vertex containing the PageRank and each edge
  76. * containing the normalized weight.
  77. *
  78. */
  79. def runWithOptions[VD: ClassTag, ED: ClassTag](
  80. graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
  81. srcId: Option[VertexId] = None): Graph[Double, Double] =
  82. {
  83. require(numIter > 0, s"Number of iterations must be greater than 0," +
  84. s" but got ${numIter}")
  85. require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" +
  86. s" to [0, 1], but got ${resetProb}")
  87. val personalized = srcId isDefined
  88. val src: VertexId = srcId.getOrElse(-1L)
  89. // Initialize the PageRank graph with each edge attribute having
  90. // weight 1/outDegree and each vertex with attribute resetProb.
  91. // When running personalized pagerank, only the source vertex
  92. // has an attribute resetProb. All others are set to 0.
  93. var rankGraph: Graph[Double, Double] = graph
  94. // Associate the degree with each vertex
  95. .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
  96. // Set the weight on the edges based on the degree
  97. .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
  98. // Set the vertex attributes to the initial pagerank values
  99. .mapVertices { (id, attr) =>
  100. if (!(id != src && personalized)) resetProb else 0.0
  101. }
  102. def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
  103. var iteration = 0
  104. var prevRankGraph: Graph[Double, Double] = null
  105. while (iteration < numIter) {
  106. rankGraph.cache()
  107. // Compute the outgoing rank contributions of each vertex, perform local preaggregation, and
  108. // do the final aggregation at the receiving vertices. Requires a shuffle for aggregation.
  109. val rankUpdates = rankGraph.aggregateMessages[Double](
  110. ctx => ctx.sendToDst(ctx.srcAttr * ctx.attr), _ + _, TripletFields.Src)
  111. // Apply the final rank updates to get the new ranks, using join to preserve ranks of vertices
  112. // that didn't receive a message. Requires a shuffle for broadcasting updated ranks to the
  113. // edge partitions.
  114. prevRankGraph = rankGraph
  115. val rPrb = if (personalized) {
  116. (src: VertexId, id: VertexId) => resetProb * delta(src, id)
  117. } else {
  118. (src: VertexId, id: VertexId) => resetProb
  119. }
  120. rankGraph = rankGraph.joinVertices(rankUpdates) {
  121. (id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum
  122. }.cache()
  123. rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
  124. logInfo(s"PageRank finished iteration $iteration.")
  125. prevRankGraph.vertices.unpersist(false)
  126. prevRankGraph.edges.unpersist(false)
  127. iteration += 1
  128. }
  129. rankGraph
  130. }
  131. /**
  132. * Run a dynamic version of PageRank returning a graph with vertex attributes containing the
  133. * PageRank and edge attributes containing the normalized edge weight.
  134. *
  135. * @tparam VD the original vertex attribute (not used)
  136. * @tparam ED the original edge attribute (not used)
  137. *
  138. * @param graph the graph on which to compute PageRank
  139. * @param tol the tolerance allowed at convergence (smaller => more accurate).
  140. * @param resetProb the random reset probability (alpha)
  141. *
  142. * @return the graph containing with each vertex containing the PageRank and each edge
  143. * containing the normalized weight.
  144. */
  145. def runUntilConvergence[VD: ClassTag, ED: ClassTag](
  146. graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double] =
  147. {
  148. runUntilConvergenceWithOptions(graph, tol, resetProb)
  149. }
  150. /**
  151. * Run a dynamic version of PageRank returning a graph with vertex attributes containing the
  152. * PageRank and edge attributes containing the normalized edge weight.
  153. *
  154. * @tparam VD the original vertex attribute (not used)
  155. * @tparam ED the original edge attribute (not used)
  156. *
  157. * @param graph the graph on which to compute PageRank
  158. * @param tol the tolerance allowed at convergence (smaller => more accurate).
  159. * @param resetProb the random reset probability (alpha)
  160. * @param srcId the source vertex for a Personalized Page Rank (optional)
  161. *
  162. * @return the graph containing with each vertex containing the PageRank and each edge
  163. * containing the normalized weight.
  164. */
  165. def runUntilConvergenceWithOptions[VD: ClassTag, ED: ClassTag](
  166. graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15,
  167. srcId: Option[VertexId] = None): Graph[Double, Double] =
  168. {
  169. require(tol >= 0, s"Tolerance must be no less than 0, but got ${tol}")
  170. require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" +
  171. s" to [0, 1], but got ${resetProb}")
  172. val personalized = srcId.isDefined
  173. val src: VertexId = srcId.getOrElse(-1L)
  174. // Initialize the pagerankGraph with each edge attribute
  175. // having weight 1/outDegree and each vertex with attribute 1.0.
  176. val pagerankGraph: Graph[(Double, Double), Double] = graph
  177. // Associate the degree with each vertex
  178. .outerJoinVertices(graph.outDegrees) {
  179. (vid, vdata, deg) => deg.getOrElse(0)
  180. }
  181. // Set the weight on the edges based on the degree
  182. .mapTriplets( e => 1.0 / e.srcAttr )
  183. // Set the vertex attributes to (initialPR, delta = 0)
  184. .mapVertices { (id, attr) =>
  185. if (id == src) (resetProb, Double.NegativeInfinity) else (0.0, 0.0)
  186. }
  187. .cache()
  188. // Define the three functions needed to implement PageRank in the GraphX
  189. // version of Pregel
  190. def vertexProgram(id: VertexId, attr: (Double, Double), msgSum: Double): (Double, Double) = {
  191. val (oldPR, lastDelta) = attr
  192. val newPR = oldPR + (1.0 - resetProb) * msgSum
  193. (newPR, newPR - oldPR)
  194. }
  195. def personalizedVertexProgram(id: VertexId, attr: (Double, Double),
  196. msgSum: Double): (Double, Double) = {
  197. val (oldPR, lastDelta) = attr
  198. var teleport = oldPR
  199. val delta = if (src==id) 1.0 else 0.0
  200. teleport = oldPR*delta
  201. val newPR = teleport + (1.0 - resetProb) * msgSum
  202. val newDelta = if (lastDelta == Double.NegativeInfinity) newPR else newPR - oldPR
  203. (newPR, newDelta)
  204. }
  205. def sendMessage(edge: EdgeTriplet[(Double, Double), Double]) = {
  206. if (edge.srcAttr._2 > tol) {
  207. Iterator((edge.dstId, edge.srcAttr._2 * edge.attr))
  208. } else {
  209. Iterator.empty
  210. }
  211. }
  212. def messageCombiner(a: Double, b: Double): Double = a + b
  213. // The initial message received by all vertices in PageRank
  214. val initialMessage = if (personalized) 0.0 else resetProb / (1.0 - resetProb)
  215. // Execute a dynamic version of Pregel.
  216. val vp = if (personalized) {
  217. (id: VertexId, attr: (Double, Double), msgSum: Double) =>
  218. personalizedVertexProgram(id, attr, msgSum)
  219. } else {
  220. (id: VertexId, attr: (Double, Double), msgSum: Double) =>
  221. vertexProgram(id, attr, msgSum)
  222. }
  223. Pregel(pagerankGraph, initialMessage, activeDirection = EdgeDirection.Out)(
  224. vp, sendMessage, messageCombiner)
  225. .mapVertices((vid, attr) => attr._1)
  226. } // end of deltaPageRank
  227. }