Batch Examples

The following example programs showcase different applications of Flink, from simple word counting to graph algorithms. The code samples illustrate the use of Flink’s DataSet API.

The full source code of the following and more examples can be found in the flink-examples-batch module of the Flink source repository.

Running an example

In order to run a Flink example, we assume you have a running Flink instance available. The “Quickstart” and “Setup” tabs in the navigation describe various ways of starting Flink.

The easiest way is to run ./bin/start-cluster.sh, which by default starts a local cluster with one JobManager and one TaskManager.

Each binary release of Flink contains an examples directory with jar files for each of the examples on this page.

To run the WordCount example, issue the following command:

  ./bin/flink run ./examples/batch/WordCount.jar

The other examples can be started in a similar way.

Note that many examples run without passing any arguments, by using built-in data. To run WordCount with real data, you have to pass the path to the data:

  ./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result

Note that non-local file systems require a scheme prefix, such as hdfs://.
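
For example, a run against HDFS might look as follows; the hostname, port, and paths here are placeholders, not values from your setup:

  ./bin/flink run ./examples/batch/WordCount.jar --input hdfs://namenode:9000/path/to/text --output hdfs://namenode:9000/path/to/result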

Word Count

WordCount is the “Hello World” of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: First, the text is split into individual words. Second, the words are grouped and counted.

Java

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  DataSet<String> text = env.readTextFile("/path/to/file");

  DataSet<Tuple2<String, Integer>> counts =
      // split up the lines in pairs (2-tuples) containing: (word,1)
      text.flatMap(new Tokenizer())
      // group by the tuple field "0" and sum up tuple field "1"
      .groupBy(0)
      .sum(1);

  counts.writeAsCsv(outputPath, "\n", " ");

  // User-defined functions
  public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

      @Override
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
          // normalize and split the line
          String[] tokens = value.toLowerCase().split("\\W+");

          // emit the pairs
          for (String token : tokens) {
              if (token.length() > 0) {
                  out.collect(new Tuple2<String, Integer>(token, 1));
              }
          }
      }
  }

Scala

  val env = ExecutionEnvironment.getExecutionEnvironment

  // get input data
  val text = env.readTextFile("/path/to/file")

  val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
    .map { (_, 1) }
    .groupBy(0)
    .sum(1)

  counts.writeAsCsv(outputPath, "\n", " ")

The WordCount example implements the above-described algorithm with input parameters: --input <path> --output <path>. As test data, any text file will do.

Page Rank

The PageRank algorithm computes the “importance” of pages in a graph defined by links, which point from one page to another. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors and computes its new rank as a taxed sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine, which uses the importance of webpages to rank the results of search queries.
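
Written as a formula, with dampening factor d and N pages, one iteration computes the new rank of a page p from the ranks of all pages q that link to p (in the code below, d and N correspond to DAMPENING_FACTOR and numPages):

  rank(p) = (1 - d) / N + d * Σ ( rank(q) / outDegree(q) ),  summing over all pages q that link to p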

In this simple example, PageRank is implemented with a bulk iteration and a fixed number of iterations.

Java

  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  // read the pages and initial ranks by parsing a CSV file
  DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
      .types(Long.class, Double.class);

  // the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
  DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);

  // set iterative data set
  IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

  DataSet<Tuple2<Long, Double>> newRanks = iteration
      // join pages with outgoing edges and distribute rank
      .join(pageLinkLists).where(0).equalTo(0).with(new JoinVertexWithEdgesMatch())
      // collect and sum ranks
      .groupBy(0).sum(1)
      // apply dampening factor
      .map(new Dampener(DAMPENING_FACTOR, numPages));

  DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
      newRanks,
      newRanks.join(iteration).where(0).equalTo(0)
          // termination condition
          .filter(new EpsilonFilter()));

  finalPageRanks.writeAsCsv(outputPath, "\n", " ");

  // User-defined functions

  public static final class JoinVertexWithEdgesMatch
          implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>,
                                      Tuple2<Long, Double>> {

      @Override
      public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
                       Collector<Tuple2<Long, Double>> out) {
          Long[] neighbors = adj.f1;
          double rank = page.f1;
          double rankToDistribute = rank / ((double) neighbors.length);

          for (int i = 0; i < neighbors.length; i++) {
              out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
          }
      }
  }

  public static final class Dampener implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
      private final double dampening, randomJump;

      public Dampener(double dampening, double numVertices) {
          this.dampening = dampening;
          this.randomJump = (1 - dampening) / numVertices;
      }

      @Override
      public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
          value.f1 = (value.f1 * dampening) + randomJump;
          return value;
      }
  }

  public static final class EpsilonFilter
          implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

      @Override
      public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
          return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
      }
  }

Scala

  // User-defined types
  case class Link(sourceId: Long, targetId: Long)
  case class Page(pageId: Long, rank: Double)
  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

  // set up execution environment
  val env = ExecutionEnvironment.getExecutionEnvironment

  // read the pages and initial ranks by parsing a CSV file
  val pages = env.readCsvFile[Page](pagesInputPath)

  // the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
  val links = env.readCsvFile[Link](linksInputPath)

  // assign initial ranks to pages
  val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / numPages))

  // build adjacency list from link input
  val adjacencyLists = links
    // initialize lists
    .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
    // concatenate lists
    .groupBy("sourceId").reduce {
      (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
    }

  // start iteration
  val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
    currentRanks =>
      val newRanks = currentRanks
        // distribute ranks to target pages
        .join(adjacencyLists).where("pageId").equalTo("sourceId") {
          (page, adjacent, out: Collector[Page]) =>
            for (targetId <- adjacent.targetIds) {
              out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
            }
        }
        // collect ranks and sum them up
        .groupBy("pageId").aggregate(SUM, "rank")
        // apply dampening factor
        .map { p =>
          Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
        }

      // terminate if no rank update was significant
      val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
        (current, next, out: Collector[Int]) =>
          // check for significant update
          if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
      }

      (newRanks, termination)
  }

  val result = finalRanks

  // emit result
  result.writeAsCsv(outputPath, "\n", " ")

The PageRank program implements the above example. It requires the following parameters to run: --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>.

Input files are plain text files and must be formatted as follows:

  • Pages are represented by a (long) ID, separated by new-line characters.
    • For example "1\n2\n12\n42\n63\n" gives five pages with IDs 1, 2, 12, 42, and 63.
  • Links are represented as pairs of page IDs, separated by space characters. Links are separated by new-line characters:
    • For example "1 2\n2 12\n1 12\n42 63\n" gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).

For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).
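
As a quick smoke test, you can generate a minimal input that satisfies this constraint (three pages linked in a cycle) and run the example; the file locations and iteration count below are arbitrary:

  printf "1\n2\n3\n" > /tmp/pages
  printf "1 2\n2 3\n3 1\n" > /tmp/links
  ./bin/flink run ./examples/batch/PageRank.jar --pages /tmp/pages --links /tmp/links --output /tmp/pageranks --numPages 3 --iterations 10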

Connected Components

The Connected Components algorithm identifies the connected parts of a larger graph by assigning all vertices in the same connected part the same component ID. Like PageRank, Connected Components is an iterative algorithm. In each step, each vertex propagates its current component ID to all its neighbors. A vertex accepts the component ID of a neighbor if it is smaller than its own component ID.

This implementation uses a delta iteration: Vertices that have not changed their component ID do not participate in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.

Java

  // read vertex and edge data
  DataSet<Long> vertices = getVertexDataSet(env);
  DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());

  // assign the initial component IDs (equal to the vertex ID)
  DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

  // open a delta iteration
  DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
      verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

  // apply the step logic:
  DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
      // join with the edges
      .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
      // select the minimum neighbor component ID
      .groupBy(0).aggregate(Aggregations.MIN, 1)
      // update if the component ID of the candidate is smaller
      .join(iteration.getSolutionSet()).where(0).equalTo(0)
      .flatMap(new ComponentIdFilter());

  // close the delta iteration (delta and new workset are identical)
  DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

  // emit result
  result.writeAsCsv(outputPath, "\n", " ");

  // User-defined functions

  public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

      @Override
      public Tuple2<T, T> map(T vertex) {
          return new Tuple2<T, T>(vertex, vertex);
      }
  }

  public static final class UndirectEdge
          implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
      Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

      @Override
      public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
          invertedEdge.f0 = edge.f1;
          invertedEdge.f1 = edge.f0;
          out.collect(edge);
          out.collect(invertedEdge);
      }
  }

  public static final class NeighborWithComponentIDJoin
          implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

      @Override
      public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
          return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
      }
  }

  public static final class ComponentIdFilter
          implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
                                     Tuple2<Long, Long>> {

      @Override
      public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
                          Collector<Tuple2<Long, Long>> out) {
          if (value.f0.f1 < value.f1.f1) {
              out.collect(value.f0);
          }
      }
  }

Scala

  // set up execution environment
  val env = ExecutionEnvironment.getExecutionEnvironment

  // read vertex and edge data
  // assign the initial components (equal to the vertex id)
  val vertices = getVerticesDataSet(env).map { id => (id, id) }

  // undirected edges by emitting for each input edge the input edge itself and an inverted version
  val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

  // open a delta iteration
  val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
    (s, ws) =>
      // apply the step logic: join with the edges
      val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
        (edge._2, vertex._2)
      }

      // select the minimum neighbor
      val minNeighbors = allNeighbors.groupBy(0).min(1)

      // update if the component of the candidate is smaller
      val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
        (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
          if (newVertex._2 < oldVertex._2) out.collect(newVertex)
      }

      // delta and new workset are identical
      (updatedComponents, updatedComponents)
  }

  verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

The ConnectedComponents program implements the above example. It requires the following parameters to run: --vertices <path> --edges <path> --output <path> --iterations <n>.

Input files are plain text files and must be formatted as follows:

  • Vertices are represented by IDs, separated by new-line characters.
    • For example "1\n2\n12\n42\n63\n" gives five vertices (1), (2), (12), (42), and (63).
  • Edges are represented as pairs of vertex IDs, separated by space characters. Edges are separated by new-line characters:
    • For example "1 2\n2 12\n1 12\n42 63\n" gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).