Iterative Graph Processing

Gelly exploits Flink’s efficient iteration operators to support large-scale iterative graph processing. Currently, we provide implementations of the vertex-centric, scatter-gather, and gather-sum-apply models. In the following sections, we describe these abstractions and show how you can use them in Gelly.

Vertex-Centric Iterations

The vertex-centric model, also known as “think like a vertex” or “Pregel”, expresses computation from the perspective of a vertex in the graph. The computation proceeds in synchronized iteration steps, called supersteps. In each superstep, each vertex executes one user-defined function. Vertices communicate with other vertices through messages. A vertex can send a message to any other vertex in the graph, as long as it knows that vertex's unique ID.

The computational model is shown in the figure below. The dotted boxes correspond to parallelization units. In each superstep, all active vertices execute the same user-defined computation in parallel. Supersteps are executed synchronously, so messages sent during one superstep are guaranteed to be delivered at the beginning of the next superstep.

[Figure: Vertex-Centric Computational Model]

To use vertex-centric iterations in Gelly, the user only needs to define the vertex compute function, ComputeFunction. This function and the maximum number of iterations to run are given as parameters to Gelly’s runVertexCentricIteration. This method will execute the vertex-centric iteration on the input Graph and return a new Graph, with updated vertex values. An optional message combiner, MessageCombiner, can be defined to reduce communication costs.

Let us consider computing Single-Source-Shortest-Paths with vertex-centric iterations. Initially, each vertex has a value of infinite distance. During the first superstep, the source vertex sets its own distance to zero and propagates distances to its neighbors. During the following supersteps, each vertex checks its received messages and chooses the minimum distance among them. If this distance is smaller than its current value, it updates its state and produces messages for its neighbors. If a vertex does not change its value during a superstep, then it does not produce any messages for its neighbors for the next superstep. The algorithm terminates when there are no value updates or when the maximum number of supersteps has been reached. In this algorithm, a message combiner can be used to reduce the number of messages sent to a target vertex: since only the smallest candidate distance matters, all messages for the same target can be replaced by their minimum.

Java

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.graph.Edge;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.graph.pregel.ComputeFunction;
    import org.apache.flink.graph.pregel.MessageCombiner;
    import org.apache.flink.graph.pregel.MessageIterator;

    // read the input graph
    Graph<Long, Double, Double> graph = ...;
    // ID of the source vertex (example value)
    long srcId = 1L;
    // define the maximum number of iterations
    int maxIterations = 10;
    // execute the vertex-centric iteration
    Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
        new SSSPComputeFunction(srcId), new SSSPCombiner(), maxIterations);
    // extract the vertices as the result
    DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();

    // - - - UDFs - - - //

    public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {

        private final long srcId;

        public SSSPComputeFunction(long srcId) {
            this.srcId = srcId;
        }

        @Override
        public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
            // the source vertex starts from 0, all other vertices from infinity
            double minDistance = (vertex.getId() == srcId) ? 0d : Double.POSITIVE_INFINITY;
            for (Double msg : messages) {
                minDistance = Math.min(minDistance, msg);
            }
            // only propagate if a shorter distance was found
            if (minDistance < vertex.getValue()) {
                setNewVertexValue(minDistance);
                for (Edge<Long, Double> e : getEdges()) {
                    sendMessageTo(e.getTarget(), minDistance + e.getValue());
                }
            }
        }
    }

    // message combiner: only the smallest candidate distance per target matters
    public static final class SSSPCombiner extends MessageCombiner<Long, Double> {

        @Override
        public void combineMessages(MessageIterator<Double> messages) {
            double minMessage = Double.POSITIVE_INFINITY;
            for (Double msg : messages) {
                minMessage = Math.min(minMessage, msg);
            }
            sendCombinedMessage(minMessage);
        }
    }

Scala

    import org.apache.flink.graph.{Edge, Vertex}
    import org.apache.flink.graph.pregel.{ComputeFunction, MessageCombiner, MessageIterator}
    import org.apache.flink.graph.scala.Graph
    import scala.collection.JavaConverters._

    // read the input graph
    val graph: Graph[Long, Double, Double] = ...
    // ID of the source vertex (example value)
    val srcId = 1L
    // define the maximum number of iterations
    val maxIterations = 10
    // execute the vertex-centric iteration
    val result = graph.runVertexCentricIteration(new SSSPComputeFunction(srcId), new SSSPCombiner, maxIterations)
    // extract the vertices as the result
    val singleSourceShortestPaths = result.getVertices

    // - - - UDFs - - - //

    final class SSSPComputeFunction(srcId: Long) extends ComputeFunction[Long, Double, Double, Double] {
      override def compute(vertex: Vertex[Long, Double], messages: MessageIterator[Double]): Unit = {
        // the source vertex starts from 0, all other vertices from infinity
        var minDistance = if (vertex.getId == srcId) 0.0 else Double.PositiveInfinity
        while (messages.hasNext) {
          val msg = messages.next
          if (msg < minDistance) {
            minDistance = msg
          }
        }
        // only propagate if a shorter distance was found
        if (vertex.getValue > minDistance) {
          setNewVertexValue(minDistance)
          for (edge: Edge[Long, Double] <- getEdges.asScala) {
            sendMessageTo(edge.getTarget, minDistance + edge.getValue)
          }
        }
      }
    }

    // message combiner: only the smallest candidate distance per target matters
    final class SSSPCombiner extends MessageCombiner[Long, Double] {
      override def combineMessages(messages: MessageIterator[Double]): Unit = {
        var minDistance = Double.PositiveInfinity
        while (messages.hasNext) {
          val msg = messages.next
          if (msg < minDistance) {
            minDistance = msg
          }
        }
        sendCombinedMessage(minDistance)
      }
    }
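
The superstep semantics described above can also be simulated on a single machine, which makes the role of each piece easier to see. The following is a minimal sketch under stated assumptions, not Gelly's implementation: the class VertexCentricSsspSketch, its run method, and the edge-array input format are hypothetical, and the sketch assumes that after the first superstep only vertices that received messages run the compute logic. Messages produced in one superstep are merged per target with Math::min (the role of SSSPCombiner) and are only delivered in the next superstep.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-machine simulation of vertex-centric (Pregel-style) SSSP.
// Not Gelly code: it only mimics supersteps, message delivery, and a min-combiner.
final class VertexCentricSsspSketch {

    /** Vertices are 1..numVertices; edges[i] = {source, target} has weight weights[i]. */
    static Map<Long, Double> run(long numVertices, long[][] edges, double[] weights,
                                 long srcId, int maxIterations) {
        Map<Long, Double> value = new HashMap<>();
        for (long v = 1; v <= numVertices; v++) {
            value.put(v, Double.POSITIVE_INFINITY); // every vertex starts at infinity
        }
        Map<Long, Double> inbox = new HashMap<>(); // messages delivered this superstep
        for (int step = 0; step < maxIterations; step++) {
            Map<Long, Double> outbox = new HashMap<>(); // one combined message per target
            for (long v = 1; v <= numVertices; v++) {
                // assumption of this sketch: all vertices run in superstep 1,
                // afterwards only vertices that received messages
                if (step > 0 && !inbox.containsKey(v)) {
                    continue;
                }
                double minDistance = (v == srcId) ? 0d : Double.POSITIVE_INFINITY;
                if (inbox.containsKey(v)) {
                    minDistance = Math.min(minDistance, inbox.get(v));
                }
                if (minDistance < value.get(v)) {
                    value.put(v, minDistance);
                    for (int i = 0; i < edges.length; i++) {
                        if (edges[i][0] == v) {
                            // combiner: keep only the smallest candidate per target vertex
                            outbox.merge(edges[i][1], minDistance + weights[i], Math::min);
                        }
                    }
                }
            }
            if (outbox.isEmpty()) {
                break; // no messages left: the iteration has converged
            }
            inbox = outbox; // delivered at the beginning of the next superstep
        }
        return value;
    }
}
```

For a graph with edges 1→2 (weight 1), 1→3 (4), 2→3 (2) and 3→4 (1), five vertices, and vertex 1 as the source, run should end with distances 0, 1, 3 and 4 for vertices 1 to 4, while the unreachable vertex 5 keeps an infinite distance. Lowering maxIterations to 2 stops the iteration before vertex 3 finds its shorter path through vertex 2.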

Configuring a Vertex-Centric Iteration

A vertex-centric iteration can be configured using a VertexCentricConfiguration object. Currently, the following parameters can be specified:

  • Name: The name for the vertex-centric iteration. The name is displayed in logs and messages and can be specified using the setName() method.

  • Parallelism: The parallelism for the iteration. It can be set using the setParallelism() method.

  • Solution set in unmanaged memory: Defines whether the solution set is kept in managed memory (Flink’s internal way of keeping objects in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the setSolutionSetUnmanagedMemory() method.

  • Aggregators: Iteration aggregators can be registered using the registerAggregator() method. An iteration aggregator combines all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined ComputeFunction.

  • Broadcast Variables: DataSets can be added as Broadcast Variables to the ComputeFunction, using the addBroadcastSet() method.

Java

    Graph<Long, Long, Double> graph = ...;
    // configure the iteration
    VertexCentricConfiguration parameters = new VertexCentricConfiguration();
    // set the iteration name
    parameters.setName("Gelly Iteration");
    // set the parallelism
    parameters.setParallelism(16);
    // register an aggregator
    parameters.registerAggregator("sumAggregator", new LongSumAggregator());
    // run the vertex-centric iteration without a message combiner (null),
    // also passing the configuration parameters
    Graph<Long, Long, Double> result =
        graph.runVertexCentricIteration(
            new Compute(), null, maxIterations, parameters);

    // user-defined function
    public static final class Compute extends ComputeFunction<Long, Long, Double, Long> {

        LongSumAggregator aggregator = new LongSumAggregator();

        @Override
        public void preSuperstep() {
            // retrieve the aggregator
            aggregator = getIterationAggregator("sumAggregator");
        }

        @Override
        public void compute(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
            // do some computation
            Long partialValue = ...
            // aggregate the partial value
            aggregator.aggregate(partialValue);
            // update the vertex value
            setNewVertexValue(...);
        }
    }

Scala

    val graph: Graph[Long, Long, Double] = ...
    // configure the iteration
    val parameters = new VertexCentricConfiguration
    // set the iteration name
    parameters.setName("Gelly Iteration")
    // set the parallelism
    parameters.setParallelism(16)
    // register an aggregator
    parameters.registerAggregator("sumAggregator", new LongSumAggregator)
    // run the vertex-centric iteration without a message combiner (null),
    // also passing the configuration parameters
    val result = graph.runVertexCentricIteration(new Compute, null, maxIterations, parameters)

    // user-defined function
    final class Compute extends ComputeFunction[Long, Long, Double, Long] {

      var aggregator = new LongSumAggregator

      override def preSuperstep(): Unit = {
        // retrieve the aggregator
        aggregator = getIterationAggregator("sumAggregator")
      }

      override def compute(vertex: Vertex[Long, Long], inMessages: MessageIterator[Long]): Unit = {
        // do some computation
        val partialValue = ...
        // aggregate the partial value
        aggregator.aggregate(partialValue)
        // update the vertex value
        setNewVertexValue(...)
      }
    }
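
An iteration aggregator is combined globally once per superstep, and its result becomes visible only in the following superstep. The sketch below models just those visibility rules for a sum aggregator. It is hypothetical and does not use Flink's LongSumAggregator: AggregatorSketch, its run method, and the toy vertex update (incrementing every value) are illustrative, and reporting 0 before the first superstep has finished is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a sum aggregator's visibility rules (not Flink code):
// each parallel task aggregates locally, the partial sums are combined globally
// at the superstep barrier, and the result is readable only in the next superstep.
final class AggregatorSketch {

    /**
     * valuesPerTask[t] holds the vertex values processed by parallel task t.
     * Returns, for each superstep, the aggregate a user function could read in it.
     */
    static List<Long> run(long[][] valuesPerTask, int supersteps) {
        long[][] values = new long[valuesPerTask.length][];
        for (int t = 0; t < values.length; t++) {
            values[t] = valuesPerTask[t].clone(); // do not mutate the caller's arrays
        }
        List<Long> visible = new ArrayList<>();
        long previousGlobal = 0; // assumption of this sketch: 0 before any superstep finished
        for (int s = 0; s < supersteps; s++) {
            visible.add(previousGlobal);          // aggregate readable during this superstep
            long global = 0;
            for (long[] task : values) {
                long local = 0;                   // task-local aggregator instance
                for (int i = 0; i < task.length; i++) {
                    local += task[i];             // like aggregator.aggregate(partialValue)
                    task[i] += 1;                 // toy vertex update: increment the value
                }
                global += local;                  // global combine at the barrier
            }
            previousGlobal = global;              // published for the next superstep
        }
        return visible;
    }
}
```

With two parallel tasks holding the values {1, 2} and {3}, three supersteps report 0, 6 and 9: each superstep sees the sum produced by the one before it.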

Scatter-Gather Iterations

The scatter-gather model, also known as the “signal/collect” model, expresses computation from the perspective of a vertex in the graph. The computation proceeds in synchronized iteration steps, called supersteps. In each superstep, a vertex produces messages for other vertices and updates its value based on the messages it receives. To use scatter-gather iterations in Gelly, the user only needs to define how a vertex behaves in each superstep:

  • Scatter: produces the messages that a vertex will send to other vertices.
  • Gather: updates the vertex value using received messages.

Gelly provides methods for scatter-gather iterations. The user only needs to implement two functions, corresponding to the scatter and gather phases. The first function is a ScatterFunction, which allows a vertex to send out messages to other vertices. Messages are received during the same superstep in which they are sent. The second function is a GatherFunction, which defines how a vertex updates its value based on the received messages. These functions and the maximum number of iterations to run are given as parameters to Gelly’s runScatterGatherIteration. This method executes the scatter-gather iteration on the input Graph and returns a new Graph with updated vertex values.

A scatter-gather iteration can be extended with information such as the total number of vertices and the in-degree and out-degree of each vertex. Additionally, the neighborhood type (in/out/all) over which to run the scatter-gather iteration can be specified. By default, the updates from the in-neighbors are used to modify the current vertex’s state and messages are sent to out-neighbors.

Let us consider computing Single-Source-Shortest-Paths with scatter-gather iterations on the following graph and let vertex 1 be the source. In each superstep, each vertex sends a candidate distance message to all its neighbors. The message value is the sum of the current value of the vertex and the edge weight connecting this vertex with its neighbor. Upon receiving candidate distance messages, each vertex calculates the minimum distance and, if a shorter path has been discovered, it updates its value. If a vertex does not change its value during a superstep, then it does not produce messages for its neighbors for the next superstep. The algorithm converges when there are no value updates.

[Figure: Scatter-gather SSSP superstep 1]

Java

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.graph.Edge;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.graph.spargel.GatherFunction;
    import org.apache.flink.graph.spargel.MessageIterator;
    import org.apache.flink.graph.spargel.ScatterFunction;

    // read the input graph
    Graph<Long, Double, Double> graph = ...;
    // define the maximum number of iterations
    int maxIterations = 10;
    // execute the scatter-gather iteration
    Graph<Long, Double, Double> result = graph.runScatterGatherIteration(
        new MinDistanceMessenger(), new VertexDistanceUpdater(), maxIterations);
    // extract the vertices as the result
    DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();

    // - - - UDFs - - - //

    // scatter: messaging
    public static final class MinDistanceMessenger extends ScatterFunction<Long, Double, Double, Double> {
        @Override
        public void sendMessages(Vertex<Long, Double> vertex) {
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
            }
        }
    }

    // gather: vertex update
    public static final class VertexDistanceUpdater extends GatherFunction<Long, Double, Double> {
        @Override
        public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
            double minDistance = Double.POSITIVE_INFINITY;
            for (double msg : inMessages) {
                if (msg < minDistance) {
                    minDistance = msg;
                }
            }
            if (vertex.getValue() > minDistance) {
                setNewVertexValue(minDistance);
            }
        }
    }

Scala

    import org.apache.flink.graph.{Edge, Vertex}
    import org.apache.flink.graph.scala.Graph
    import org.apache.flink.graph.spargel.{GatherFunction, MessageIterator, ScatterFunction}
    import scala.collection.JavaConverters._

    // read the input graph
    val graph: Graph[Long, Double, Double] = ...
    // define the maximum number of iterations
    val maxIterations = 10
    // execute the scatter-gather iteration
    val result = graph.runScatterGatherIteration(new MinDistanceMessenger, new VertexDistanceUpdater, maxIterations)
    // extract the vertices as the result
    val singleSourceShortestPaths = result.getVertices

    // - - - UDFs - - - //

    // scatter: messaging
    final class MinDistanceMessenger extends ScatterFunction[Long, Double, Double, Double] {
      override def sendMessages(vertex: Vertex[Long, Double]): Unit = {
        for (edge: Edge[Long, Double] <- getEdges.asScala) {
          sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
        }
      }
    }

    // gather: vertex update
    final class VertexDistanceUpdater extends GatherFunction[Long, Double, Double] {
      override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]): Unit = {
        var minDistance = Double.PositiveInfinity
        while (inMessages.hasNext) {
          val msg = inMessages.next
          if (msg < minDistance) {
            minDistance = msg
          }
        }
        if (vertex.getValue > minDistance) {
          setNewVertexValue(minDistance)
        }
      }
    }
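
The same SSSP computation can be traced in plain Java to show how scatter and gather split a superstep. This is a hypothetical single-machine sketch, not Gelly's implementation: ScatterGatherSsspSketch and its input format are illustrative, all vertices scatter in the first superstep, and afterwards only vertices whose value changed scatter again. Messages produced in the scatter phase are consumed by the gather phase of the same superstep.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical single-machine simulation of scatter-gather SSSP (not Gelly code).
final class ScatterGatherSsspSketch {

    /** Vertices are 1..numVertices; edges[i] = {source, target} has weight weights[i]. */
    static Map<Long, Double> run(long numVertices, long[][] edges, double[] weights,
                                 long srcId, int maxIterations) {
        Map<Long, Double> value = new HashMap<>();
        Set<Long> active = new HashSet<>();
        for (long v = 1; v <= numVertices; v++) {
            value.put(v, v == srcId ? 0d : Double.POSITIVE_INFINITY);
            active.add(v); // all vertices scatter in the first superstep
        }
        for (int step = 0; step < maxIterations && !active.isEmpty(); step++) {
            // Scatter: active vertices send candidate distances along their out-edges
            Map<Long, List<Double>> inbox = new HashMap<>();
            for (int i = 0; i < edges.length; i++) {
                long source = edges[i][0];
                if (active.contains(source)) {
                    inbox.computeIfAbsent(edges[i][1], k -> new ArrayList<>())
                         .add(value.get(source) + weights[i]);
                }
            }
            // Gather: in the same superstep, each vertex with messages keeps the minimum
            Set<Long> updated = new HashSet<>();
            for (Map.Entry<Long, List<Double>> e : inbox.entrySet()) {
                double minDistance = Double.POSITIVE_INFINITY;
                for (double msg : e.getValue()) {
                    minDistance = Math.min(minDistance, msg);
                }
                if (value.get(e.getKey()) > minDistance) {
                    value.put(e.getKey(), minDistance);
                    updated.add(e.getKey());
                }
            }
            active = updated; // unchanged vertices send no messages next superstep
        }
        return value;
    }
}
```

On a graph with edges 1→2 (weight 1), 1→3 (4), 2→3 (2) and 3→4 (1) and vertex 1 as the source, the sketch ends with distances 0, 1, 3 and 4. Starting the gather minimum at infinity rather than Double.MAX_VALUE keeps vertices that only receive infinite candidates from being "updated" to MAX_VALUE.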

Configuring a Scatter-Gather Iteration

A scatter-gather iteration can be configured using a ScatterGatherConfiguration object. Currently, the following parameters can be specified:

  • Name: The name for the scatter-gather iteration. The name is displayed in logs and messages and can be specified using the setName() method.

  • Parallelism: The parallelism for the iteration. It can be set using the setParallelism() method.

  • Solution set in unmanaged memory: Defines whether the solution set is kept in managed memory (Flink’s internal way of keeping objects in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the setSolutionSetUnmanagedMemory() method.

  • Aggregators: Iteration aggregators can be registered using the registerAggregator() method. An iteration aggregator combines all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined ScatterFunction and GatherFunction.

  • Broadcast Variables: DataSets can be added as Broadcast Variables to the ScatterFunction and GatherFunction, using the addBroadcastSetForMessagingFunction() and addBroadcastSetForUpdateFunction() methods, respectively.

  • Number of Vertices: Enables access to the total number of vertices within the iteration. This property can be set using the setOptNumVertices() method. The number of vertices can then be accessed in the vertex update function and in the messaging function using the getNumberOfVertices() method. If the option is not set in the configuration, this method returns -1.

  • Degrees: Enables access to the in-degree and out-degree of a vertex within an iteration. This property can be set using the setOptDegrees() method. The in-degree and out-degree can then be accessed per vertex in the vertex update function and in the messaging function, using the getInDegree() and getOutDegree() methods. If the degrees option is not set in the configuration, these methods return -1.

  • Messaging Direction: By default, a vertex sends messages to its out-neighbors and updates its value based on messages received from its in-neighbors. This configuration option allows users to change the messaging direction to EdgeDirection.IN, EdgeDirection.OUT, or EdgeDirection.ALL. The messaging direction also dictates the update direction, which is EdgeDirection.OUT, EdgeDirection.IN, or EdgeDirection.ALL, respectively. This property can be set using the setDirection() method.

Java

    Graph<Long, Long, Double> graph = ...;
    // configure the iteration
    ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
    // set the iteration name
    parameters.setName("Gelly Iteration");
    // set the parallelism
    parameters.setParallelism(16);
    // register an aggregator
    parameters.registerAggregator("sumAggregator", new LongSumAggregator());
    // run the scatter-gather iteration, also passing the configuration parameters
    Graph<Long, Long, Double> result =
        graph.runScatterGatherIteration(
            new Messenger(), new VertexUpdater(), maxIterations, parameters);

    // user-defined functions
    public static final class Messenger extends ScatterFunction<Long, Long, Long, Double> {...}

    public static final class VertexUpdater extends GatherFunction<Long, Long, Long> {

        LongSumAggregator aggregator = new LongSumAggregator();

        @Override
        public void preSuperstep() {
            // retrieve the aggregator
            aggregator = getIterationAggregator("sumAggregator");
        }

        @Override
        public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
            // do some computation
            Long partialValue = ...
            // aggregate the partial value
            aggregator.aggregate(partialValue);
            // update the vertex value
            setNewVertexValue(...);
        }
    }

Scala

    val graph: Graph[Long, Long, Double] = ...
    // configure the iteration
    val parameters = new ScatterGatherConfiguration
    // set the iteration name
    parameters.setName("Gelly Iteration")
    // set the parallelism
    parameters.setParallelism(16)
    // register an aggregator
    parameters.registerAggregator("sumAggregator", new LongSumAggregator)
    // run the scatter-gather iteration, also passing the configuration parameters
    val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, maxIterations, parameters)

    // user-defined functions
    final class Messenger extends ScatterFunction[Long, Long, Long, Double] {...}

    final class VertexUpdater extends GatherFunction[Long, Long, Long] {

      var aggregator = new LongSumAggregator

      override def preSuperstep(): Unit = {
        // retrieve the aggregator
        aggregator = getIterationAggregator("sumAggregator")
      }

      override def updateVertex(vertex: Vertex[Long, Long], inMessages: MessageIterator[Long]): Unit = {
        // do some computation
        val partialValue = ...
        // aggregate the partial value
        aggregator.aggregate(partialValue)
        // update the vertex value
        setNewVertexValue(...)
      }
    }

The following example illustrates the usage of the degree as well as the number of vertices options.

Java

    Graph<Long, Double, Double> graph = ...;
    // configure the iteration
    ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
    // set the number of vertices option to true
    parameters.setOptNumVertices(true);
    // set the degree option to true
    parameters.setOptDegrees(true);
    // run the scatter-gather iteration, also passing the configuration parameters
    Graph<Long, Double, Double> result =
        graph.runScatterGatherIteration(
            new Messenger(), new VertexUpdater(), maxIterations, parameters);

    // user-defined functions
    public static final class Messenger extends ScatterFunction<Long, Double, Double, Double> {
        ...
        // retrieve the vertex out-degree
        long outDegree = getOutDegree();
        ...
    }

    public static final class VertexUpdater extends GatherFunction<Long, Double, Double> {
        ...
        // get the number of vertices
        long numVertices = getNumberOfVertices();
        ...
    }

Scala

    val graph: Graph[Long, Double, Double] = ...
    // configure the iteration
    val parameters = new ScatterGatherConfiguration
    // set the number of vertices option to true
    parameters.setOptNumVertices(true)
    // set the degree option to true
    parameters.setOptDegrees(true)
    // run the scatter-gather iteration, also passing the configuration parameters
    val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, maxIterations, parameters)

    // user-defined functions
    final class Messenger extends ScatterFunction[Long, Double, Double, Double] {
      ...
      // retrieve the vertex out-degree
      val outDegree = getOutDegree
      ...
    }

    final class VertexUpdater extends GatherFunction[Long, Double, Double] {
      ...
      // get the number of vertices
      val numVertices = getNumberOfVertices
      ...
    }
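
The values behind setOptDegrees() and setOptNumVertices() are ordinary graph statistics. The sketch below computes them from an edge list and, like the getters described above, returns -1 when the corresponding option is disabled. IterationOptionsSketch and its methods are hypothetical stand-ins that only mirror the documented getter names; they are not Gelly classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the values exposed when the degree and number-of-vertices
// options are enabled; not Gelly API, only the getter names mirror the documentation.
final class IterationOptionsSketch {
    private final boolean optDegrees;
    private final boolean optNumVertices;
    private final long numVertices;
    private final Map<Long, Long> inDegree = new HashMap<>();
    private final Map<Long, Long> outDegree = new HashMap<>();

    /** edges[i] = {source, target}. */
    IterationOptionsSketch(long numVertices, long[][] edges,
                           boolean optDegrees, boolean optNumVertices) {
        this.numVertices = numVertices;
        this.optDegrees = optDegrees;
        this.optNumVertices = optNumVertices;
        for (long[] e : edges) {
            outDegree.merge(e[0], 1L, Long::sum); // the edge leaves e[0] ...
            inDegree.merge(e[1], 1L, Long::sum);  // ... and enters e[1]
        }
    }

    long getOutDegree(long vertexId) {
        return optDegrees ? outDegree.getOrDefault(vertexId, 0L) : -1;
    }

    long getInDegree(long vertexId) {
        return optDegrees ? inDegree.getOrDefault(vertexId, 0L) : -1;
    }

    long getNumberOfVertices() {
        return optNumVertices ? numVertices : -1;
    }
}
```

For the edges 1→2, 1→3 and 2→3, vertex 1 has out-degree 2 and in-degree 0, vertex 3 has in-degree 2, and all getters return -1 once the options are switched off.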

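The following example illustrates the usage of the edge direction option. Vertices update their values to contain the set of their out-neighbors: with EdgeDirection.IN, a vertex sends messages to its in-neighbors, so the messages it receives come from its out-neighbors.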

Java

    Graph<Long, HashSet<Long>, Double> graph = ...;
    // configure the iteration
    ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
    // set the messaging direction
    parameters.setDirection(EdgeDirection.IN);
    // run the scatter-gather iteration, also passing the configuration parameters
    DataSet<Vertex<Long, HashSet<Long>>> result =
        graph.runScatterGatherIteration(
            new Messenger(), new VertexUpdater(), maxIterations, parameters)
        .getVertices();
    // user-defined functions
    public static final class Messenger extends ScatterFunction {...}
    public static final class VertexUpdater extends GatherFunction {...}

Scala

    val graph: Graph[Long, HashSet[Long], Double] = ...
    // configure the iteration
    val parameters = new ScatterGatherConfiguration
    // set the messaging direction
    parameters.setDirection(EdgeDirection.IN)
    // run the scatter-gather iteration, also passing the configuration parameters
    val result = graph.runScatterGatherIteration(new Messenger, new VertexUpdater, maxIterations, parameters)
      .getVertices
    // user-defined functions
    final class Messenger extends ScatterFunction {...}
    final class VertexUpdater extends GatherFunction {...}
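
Which neighbors a vertex hears from follows directly from the messaging direction. The sketch below assumes every vertex sends its own ID along the chosen direction and records the senders that each vertex receives. Following the direction rules listed in the configuration options, with the default OUT a vertex collects its in-neighbors, with IN it collects its out-neighbors, and with ALL it collects both. MessagingDirectionSketch and its Direction enum are hypothetical stand-ins for EdgeDirection, not Gelly code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch (not Gelly code): the senders each vertex hears from when
// every vertex sends its own ID along the given messaging direction.
final class MessagingDirectionSketch {

    enum Direction { IN, OUT, ALL }

    /** edges[i] = {source, target}; returns receiver -> set of sender IDs. */
    static Map<Long, Set<Long>> collectSenders(long[][] edges, Direction direction) {
        Map<Long, Set<Long>> received = new HashMap<>();
        for (long[] e : edges) {
            long source = e[0];
            long target = e[1];
            if (direction == Direction.OUT || direction == Direction.ALL) {
                // the source messages its out-neighbor (the edge target)
                received.computeIfAbsent(target, k -> new HashSet<>()).add(source);
            }
            if (direction == Direction.IN || direction == Direction.ALL) {
                // the target messages its in-neighbor (the edge source)
                received.computeIfAbsent(source, k -> new HashSet<>()).add(target);
            }
        }
        return received;
    }
}
```

For the edges 1→2, 1→3 and 2→3, OUT gives vertex 3 the senders {1, 2} (its in-neighbors), while IN gives vertex 1 the senders {2, 3} (its out-neighbors) and vertex 3 receives nothing.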

Gather-Sum-Apply Iterations

As in the scatter-gather model, Gather-Sum-Apply (GSA) iterations proceed in synchronized iteration steps, called supersteps. Each superstep consists of the following three phases:

  • Gather: a user-defined function is invoked in parallel on the edges and neighbors of each vertex, producing a partial value.
  • Sum: the partial values produced in the Gather phase are aggregated to a single value, using a user-defined reducer.
  • Apply: each vertex value is updated by applying a function on the current value and the aggregated value produced by the Sum phase.

Let us consider computing Single-Source-Shortest-Paths with GSA on the following graph and let vertex 1 be the source. During the Gather phase, we calculate the new candidate distances by adding each neighbor's value to the weight of the connecting edge. In Sum, the candidate distances are grouped by vertex ID and the minimum distance is chosen. In Apply, the newly calculated distance is compared to the current vertex value and the minimum of the two is assigned as the new value of the vertex.

[Figure: GSA SSSP superstep 1]

Notice that if a vertex does not change its value during a superstep, it will not calculate candidate distances during the next superstep. The algorithm converges when no vertex changes value.

To implement this example in Gelly GSA, the user only needs to call the runGatherSumApplyIteration method on the input graph and provide the GatherFunction, SumFunction and ApplyFunction UDFs. Iteration synchronization, grouping, value updates and convergence are handled by the system:

Java

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.graph.gsa.ApplyFunction;
    import org.apache.flink.graph.gsa.GatherFunction;
    import org.apache.flink.graph.gsa.Neighbor;
    import org.apache.flink.graph.gsa.SumFunction;

    // read the input graph
    Graph<Long, Double, Double> graph = ...;
    // define the maximum number of iterations
    int maxIterations = 10;
    // execute the GSA iteration
    Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(
        new CalculateDistances(), new ChooseMinDistance(), new UpdateDistance(), maxIterations);
    // extract the vertices as the result
    DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();

    // - - - UDFs - - - //

    // Gather: candidate distance through this neighbor
    private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
        @Override
        public Double gather(Neighbor<Double, Double> neighbor) {
            return neighbor.getNeighborValue() + neighbor.getEdgeValue();
        }
    }

    // Sum: keep the smallest candidate distance
    private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
        @Override
        public Double sum(Double newValue, Double currentValue) {
            return Math.min(newValue, currentValue);
        }
    }

    // Apply: update the vertex only if a shorter distance was found
    private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
        @Override
        public void apply(Double newDistance, Double oldDistance) {
            if (newDistance < oldDistance) {
                setResult(newDistance);
            }
        }
    }

Scala

    import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, SumFunction}
    import org.apache.flink.graph.scala.Graph

    // read the input graph
    val graph: Graph[Long, Double, Double] = ...
    // define the maximum number of iterations
    val maxIterations = 10
    // execute the GSA iteration
    val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance, new UpdateDistance, maxIterations)
    // extract the vertices as the result
    val singleSourceShortestPaths = result.getVertices

    // - - - UDFs - - - //

    // Gather: candidate distance through this neighbor
    final class CalculateDistances extends GatherFunction[Double, Double, Double] {
      override def gather(neighbor: Neighbor[Double, Double]): Double = {
        neighbor.getNeighborValue + neighbor.getEdgeValue
      }
    }

    // Sum: keep the smallest candidate distance
    final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
      override def sum(newValue: Double, currentValue: Double): Double = {
        Math.min(newValue, currentValue)
      }
    }

    // Apply: update the vertex only if a shorter distance was found
    final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
      override def apply(newDistance: Double, oldDistance: Double): Unit = {
        if (newDistance < oldDistance) {
          setResult(newDistance)
        }
      }
    }
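
The three GSA phases can be traced in plain Java as well. This hypothetical single-machine sketch (GsaSsspSketch is not Gelly code) calls gather once per edge whose source changed in the previous superstep, reduces the candidates per target vertex with a min-based sum, and applies the result only if it is smaller. Gathering from edge sources, that is, updating each vertex from its in-neighbors, is a choice of this sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical single-machine simulation of Gather-Sum-Apply SSSP (not Gelly code).
final class GsaSsspSketch {

    // Gather: invoked once per edge, like CalculateDistances.gather(...)
    static double gather(double neighborValue, double edgeValue) {
        return neighborValue + edgeValue;
    }

    // Sum: associative and commutative, like ChooseMinDistance.sum(...)
    static double sum(double newValue, double currentValue) {
        return Math.min(newValue, currentValue);
    }

    /** Vertices are 1..numVertices; edges[i] = {source, target} has weight weights[i]. */
    static Map<Long, Double> run(long numVertices, long[][] edges, double[] weights,
                                 long srcId, int maxIterations) {
        Map<Long, Double> value = new HashMap<>();
        Set<Long> changed = new HashSet<>();
        for (long v = 1; v <= numVertices; v++) {
            value.put(v, v == srcId ? 0d : Double.POSITIVE_INFINITY);
            changed.add(v); // every vertex takes part in the first superstep
        }
        for (int step = 0; step < maxIterations && !changed.isEmpty(); step++) {
            // Gather + Sum: one candidate per edge leaving a changed vertex,
            // reduced to a single value per target vertex
            Map<Long, Double> summed = new HashMap<>();
            for (int i = 0; i < edges.length; i++) {
                long source = edges[i][0];
                if (changed.contains(source)) {
                    double candidate = gather(value.get(source), weights[i]);
                    summed.merge(edges[i][1], candidate, GsaSsspSketch::sum);
                }
            }
            // Apply: keep the new distance only if it is smaller, like UpdateDistance
            Set<Long> next = new HashSet<>();
            for (Map.Entry<Long, Double> e : summed.entrySet()) {
                if (e.getValue() < value.get(e.getKey())) {
                    value.put(e.getKey(), e.getValue());
                    next.add(e.getKey());
                }
            }
            changed = next; // unchanged vertices do not contribute next superstep
        }
        return value;
    }
}
```

On edges 1→2 (weight 1), 1→3 (4), 2→3 (2) and 3→4 (1) with vertex 1 as the source, the sketch converges to distances 0, 1, 3 and 4, leaving an isolated vertex at infinity.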

Note that gather takes a Neighbor type as an argument. This is a convenience type which simply wraps a vertex with its neighboring edge.

For more examples of how to implement algorithms with the Gather-Sum-Apply model, check the GSAPageRank and GSAConnectedComponents library methods of Gelly.

Configuring a Gather-Sum-Apply Iteration

A GSA iteration can be configured using a GSAConfiguration object. Currently, the following parameters can be specified:

  • Name: The name for the GSA iteration. The name is displayed in logs and messages and can be specified using the setName() method.

  • Parallelism: The parallelism for the iteration. It can be set using the setParallelism() method.

  • Solution set in unmanaged memory: Defines whether the solution set is kept in managed memory (Flink’s internal way of keeping objects in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the setSolutionSetUnmanagedMemory() method.

  • Aggregators: Iteration aggregators can be registered using the registerAggregator() method. An iteration aggregator combines all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined GatherFunction, SumFunction and ApplyFunction.

  • Broadcast Variables: DataSets can be added as Broadcast Variables to the GatherFunction, SumFunction and ApplyFunction, using the addBroadcastSetForGatherFunction(), addBroadcastSetForSumFunction() and addBroadcastSetForApplyFunction() methods, respectively.

  • Number of Vertices: Enables access to the total number of vertices within the iteration. This property can be set using the setOptNumVertices() method. The number of vertices can then be accessed in the gather, sum and/or apply functions using the getNumberOfVertices() method. If the option is not set in the configuration, this method returns -1.

  • Neighbor Direction: By default, values are gathered from the out-neighbors of the vertex. This can be modified using the setDirection() method.

The following example illustrates the usage of the number of vertices option.

Java

    Graph<Long, Double, Double> graph = ...;
    // configure the iteration
    GSAConfiguration parameters = new GSAConfiguration();
    // set the number of vertices option to true
    parameters.setOptNumVertices(true);
    // run the gather-sum-apply iteration, also passing the configuration parameters
    Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(
        new Gather(), new Sum(), new Apply(),
        maxIterations, parameters);

    // user-defined functions
    public static final class Gather extends GatherFunction<Double, Double, Double> {
        ...
        // get the number of vertices
        long numVertices = getNumberOfVertices();
        ...
    }

    public static final class Sum extends SumFunction<Double, Double, Double> {
        ...
        // get the number of vertices
        long numVertices = getNumberOfVertices();
        ...
    }

    public static final class Apply extends ApplyFunction<Long, Double, Double> {
        ...
        // get the number of vertices
        long numVertices = getNumberOfVertices();
        ...
    }

Scala

    val graph: Graph[Long, Double, Double] = ...
    // configure the iteration
    val parameters = new GSAConfiguration
    // set the number of vertices option to true
    parameters.setOptNumVertices(true)
    // run the gather-sum-apply iteration, also passing the configuration parameters
    val result = graph.runGatherSumApplyIteration(new Gather, new Sum, new Apply, maxIterations, parameters)

    // user-defined functions
    final class Gather extends GatherFunction[Double, Double, Double] {
      ...
      // get the number of vertices
      val numVertices = getNumberOfVertices
      ...
    }

    final class Sum extends SumFunction[Double, Double, Double] {
      ...
      // get the number of vertices
      val numVertices = getNumberOfVertices
      ...
    }

    final class Apply extends ApplyFunction[Long, Double, Double] {
      ...
      // get the number of vertices
      val numVertices = getNumberOfVertices
      ...
    }

The following example illustrates the usage of the edge direction option.

Java

    Graph<Long, HashSet<Long>, Double> graph = ...;
    // configure the iteration
    GSAConfiguration parameters = new GSAConfiguration();
    // set the neighbor direction
    parameters.setDirection(EdgeDirection.IN);
    // run the gather-sum-apply iteration, also passing the configuration parameters
    DataSet<Vertex<Long, HashSet<Long>>> result =
        graph.runGatherSumApplyIteration(
            new Gather(), new Sum(), new Apply(), maxIterations, parameters)
        .getVertices();

Scala

    val graph: Graph[Long, HashSet[Long], Double] = ...
    // configure the iteration
    val parameters = new GSAConfiguration
    // set the neighbor direction
    parameters.setDirection(EdgeDirection.IN)
    // run the gather-sum-apply iteration, also passing the configuration parameters
    val result = graph.runGatherSumApplyIteration(new Gather, new Sum, new Apply, maxIterations, parameters)
      .getVertices

Iteration Abstractions Comparison

Although the three iteration abstractions in Gelly seem quite similar, understanding their differences can lead to more performant and maintainable programs. Among the three, the vertex-centric model is the most general and supports arbitrary computation and messaging for each vertex. In the scatter-gather model, the logic of producing messages is decoupled from the logic of updating vertex values, so programs written using scatter-gather are sometimes easier to follow and maintain. This separation can also have a positive impact on performance: scatter-gather implementations typically have lower memory requirements, because concurrent access to the inbox (messages received) and outbox (messages to send) data structures is not required.

However, this characteristic also limits expressiveness and makes some computation patterns unintuitive. If an algorithm requires a vertex to access its inbox and outbox concurrently, expressing it in scatter-gather can be problematic; Strongly Connected Components and Approximate Maximum Weight Matching are examples of such graph algorithms. A direct consequence of this restriction is that vertices cannot generate messages and update their states in the same phase. Thus, deciding whether to propagate a message based on its content requires storing the message in the vertex value, so that it is available in the following iteration step. Similarly, if the vertex update logic includes computation over the values of the neighboring edges, these values have to be included in a special message passed from the scatter to the gather phase. Such workarounds often lead to higher memory requirements and inelegant, hard-to-understand algorithm implementations.

Gather-sum-apply iterations are also quite similar to scatter-gather iterations. In fact, any algorithm which can be expressed as a GSA iteration can also be written in the scatter-gather model. The messaging phase of the scatter-gather model is equivalent to the Gather and Sum steps of GSA: Gather can be seen as the phase where the messages are produced and Sum as the phase where they are routed to the target vertex. Similarly, the value update phase corresponds to the Apply step.
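The correspondence can be checked with a toy sketch of a single SSSP superstep written both ways. It uses plain Scala collections instead of Flink DataSets, and all names (`GsaVsScatterGather`, `scatterGatherStep`, `gsaStep`) are hypothetical rather than Gelly classes. In the GSA version, Gather produces one candidate distance per edge, Sum reduces the candidates per target vertex, and Apply keeps the smaller of the old and new values. For simplicity, every vertex sends messages in the scatter-gather version, without tracking which vertices changed.

```scala
// Toy model of one SSSP superstep, written in scatter-gather form and in GSA form.
// Plain Scala collections stand in for Flink DataSets; this is not Gelly's API.
object GsaVsScatterGather {
  final case class Edge(src: Int, trg: Int, weight: Double)

  // Scatter-gather: every vertex produces one message per out-edge;
  // the update phase then takes the minimum over the old value and the inbox.
  def scatterGatherStep(dist: Map[Int, Double], edges: Seq[Edge]): Map[Int, Double] = {
    val messages = for {
      (id, d) <- dist.toSeq
      e <- edges if e.src == id
    } yield (e.trg, d + e.weight)
    val inbox = messages.groupBy(_._1)
    dist.map { case (id, d) =>
      id -> inbox.getOrElse(id, Seq.empty[(Int, Double)]).map(_._2).foldLeft(d)(_ min _)
    }
  }

  // GSA: Gather turns every edge into a candidate for its target,
  // Sum reduces the candidates per target, Apply keeps the smaller value.
  def gsaStep(dist: Map[Int, Double], edges: Seq[Edge]): Map[Int, Double] = {
    val gathered = edges.map(e => (e.trg, dist(e.src) + e.weight)) // Gather
    val summed = gathered.groupBy(_._1).map { case (trg, cs) =>    // Sum
      trg -> cs.map(_._2).reduce(_ min _)
    }
    dist.map { case (id, d) => id -> summed.get(id).fold(d)(d min _) } // Apply
  }

  def main(args: Array[String]): Unit = {
    val inf = Double.PositiveInfinity
    val dist = Map(1 -> 0.0, 2 -> inf, 3 -> inf, 4 -> inf)
    val edges = Seq(Edge(1, 2, 1.0), Edge(1, 3, 4.0), Edge(2, 3, 2.0), Edge(3, 4, 1.0))
    println(scatterGatherStep(dist, edges) == gsaStep(dist, edges))
  }
}
```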

The main difference between the two implementations is that the Gather phase of GSA parallelizes the computation over the edges, while the messaging phase of scatter-gather distributes the computation over the vertices. Using the SSSP examples above, we see that in the first superstep of the scatter-gather case, vertices 1, 2 and 3 produce messages in parallel: vertex 1 produces three messages, while vertices 2 and 3 produce one message each. In the GSA case, on the other hand, the computation is parallelized over the edges: the three candidate distance values of vertex 1 are produced in parallel. Thus, if the Gather step contains "heavy" computation, GSA may be the better choice, since it spreads the computation across edges instead of burdening a single vertex. Parallelizing over the edges can also be more efficient when the input graph is skewed, i.e. when some vertices have many more neighbors than others.
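A rough way to see the effect on load balance is to compare the cost of the busiest parallel task under each scheme. The sketch below is a toy cost model, not a measurement: it assumes a hypothetical graph in which vertex 1 has three out-edges and vertices 2 and 3 have one each, and a made-up uniform cost per message that stands in for a heavy Gather computation. `WorkDistributionSketch` and its methods are illustrative names only.

```scala
// Toy cost model (not Gelly code): compares the most loaded parallel task
// when messages are produced per vertex versus per edge.
object WorkDistributionSketch {
  final case class Edge(src: Int, trg: Int)

  // Scatter-gather messaging: one parallel task per source vertex,
  // which must produce a message for every one of its out-edges.
  def busiestVertexTask(edges: Seq[Edge], costPerMessage: Edge => Int): Int =
    edges.groupBy(_.src).values.map(_.map(costPerMessage).sum).max

  // GSA Gather: one parallel task per edge, each producing a single candidate value.
  def busiestEdgeTask(edges: Seq[Edge], costPerMessage: Edge => Int): Int =
    edges.map(costPerMessage).max

  def main(args: Array[String]): Unit = {
    // Hypothetical graph: vertex 1 has three out-edges, vertices 2 and 3 one each.
    val edges = Seq(Edge(1, 2), Edge(1, 3), Edge(1, 4), Edge(2, 4), Edge(3, 4))
    val heavy: Edge => Int = _ => 10 // stand-in for an expensive per-message computation
    println(s"busiest vertex task: ${busiestVertexTask(edges, heavy)}") // 30
    println(s"busiest edge task:   ${busiestEdgeTask(edges, heavy)}")   // 10
  }
}
```

In this model, the busiest vertex-parallel task grows with the largest out-degree, while the busiest edge-parallel task is bounded by the cost of a single edge, which is why skewed degree distributions favor edge parallelism.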

Another difference between the two implementations is that the scatter-gather implementation uses a coGroup operator internally, while GSA uses a reduce. Therefore, if the function that combines neighbor values (messages) requires the whole group of values for its computation, scatter-gather should be used. If this update function is associative and commutative, GSA’s reducer is expected to give a more efficient implementation, because it can make use of a combiner.
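The distinction can be illustrated without Flink. The sketch below uses a list of lists as a stand-in for messages to one vertex split across parallel instances; this is an assumption for illustration, and Gelly's internal partitioning is not modeled. Combining per-partition minima yields the global minimum, so `min` can be pre-aggregated by a combiner, whereas merging per-partition medians does not yield the median of the whole group. `CombinerSketch` and its methods are hypothetical names.

```scala
// Toy illustration (not Gelly code): values can be pre-aggregated per partition
// only when the combining function is associative and commutative.
object CombinerSketch {
  // min is associative and commutative: combining per-partition minima
  // gives the same result as the minimum over all values.
  // Assumes every partition is non-empty (reduce throws on an empty Seq).
  def minWithCombiner(partitions: Seq[Seq[Double]]): Double =
    partitions.map(_.reduce(_ min _)).reduce(_ min _)

  // The median has to see the whole group at once,
  // which is what a coGroup-style operator provides.
  def median(values: Seq[Double]): Double = {
    val s = values.sortWith(_ < _)
    val n = s.size
    if (n % 2 == 1) s(n / 2) else (s(n / 2 - 1) + s(n / 2)) / 2
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical messages for one target vertex, split across two parallel instances.
    val partitions = Seq(Seq(1.0, 2.0, 3.0), Seq(10.0, 20.0))
    println(minWithCombiner(partitions))            // 1.0, same as the minimum over all values
    println(median(partitions.flatten))             // 3.0, median of the whole group
    println(median(partitions.map(p => median(p)))) // 8.5, merging partial medians is wrong
  }
}
```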

Note also that GSA works strictly on neighborhoods, whereas in the vertex-centric and scatter-gather models a vertex can send a message to any vertex whose ID it knows, whether or not that vertex is a neighbor. Finally, Gelly’s scatter-gather implementation lets you choose the messaging direction, i.e. the direction in which updates propagate. GSA does not support this yet, so each vertex is updated based on the values of its in-neighbors only.

The main differences among the Gelly iteration models are shown in the table below.

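| Iteration Model  | Update Function             | Update Logic               | Communication Scope | Communication Logic   |
|------------------|-----------------------------|----------------------------|---------------------|-----------------------|
| Vertex-Centric   | arbitrary                   | arbitrary                  | any vertex          | arbitrary             |
| Scatter-Gather   | arbitrary                   | based on received messages | any vertex          | based on vertex state |
| Gather-Sum-Apply | associative and commutative | based on neighbors’ values | neighborhood        | based on vertex state |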