Best Practices

This page contains a collection of best practices for Flink programmers on how to solve frequently encountered problems.

Almost all Flink applications, both batch and streaming, rely on external configuration parameters. They are used to specify input and output sources (like paths or addresses), system parameters (parallelism, runtime configuration), and application-specific parameters (typically used within user functions).

Flink provides a simple utility called ParameterTool that offers basic tooling for solving these problems. Please note that you don’t have to use the ParameterTool described here. Other frameworks such as Commons CLI and argparse4j also work well with Flink.

Getting your configuration values into the ParameterTool

The ParameterTool provides a set of predefined static methods for reading the configuration. Internally, the tool expects a Map<String, String>, so it is easy to integrate it with your own configuration style.
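
As an illustration of integrating a custom configuration style, the following minimal sketch collects prefixed environment variables into a Map<String, String> using only the JDK. The class name EnvConfigSketch, the helper fromPrefixedEnv, and the MYJOB_ prefix are assumptions made for this example and are not part of Flink. With Flink on the classpath, the resulting map could be passed to ParameterTool.fromMap.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

class EnvConfigSketch {
    /**
     * Copies every entry whose name starts with the given prefix into a new map,
     * dropping the prefix and lower-casing the rest (MYJOB_INPUT becomes "input").
     */
    static Map<String, String> fromPrefixedEnv(Map<String, String> env, String prefix) {
        Map<String, String> config = new HashMap<>();
        for (Map.Entry<String, String> entry : env.entrySet()) {
            String name = entry.getKey();
            if (name.startsWith(prefix) && name.length() > prefix.length()) {
                String key = name.substring(prefix.length()).toLowerCase(Locale.ROOT);
                config.put(key, entry.getValue());
            }
        }
        return config;
    }

    public static void main(String[] args) {
        // "MYJOB_" is a hypothetical prefix chosen for this sketch.
        Map<String, String> config = fromPrefixedEnv(System.getenv(), "MYJOB_");
        // With Flink on the classpath, this map could be handed to
        // ParameterTool.fromMap(config).
        System.out.println(config);
    }
}
```

Taking the environment as a method argument, rather than calling System.getenv() inside the helper, keeps the conversion easy to test with a hand-built map.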

From .properties files

The following methods read a properties file and provide the key/value pairs:

    // From a file path
    String propertiesFilePath = "/home/sam/flink/myjob.properties";
    ParameterTool parameterFromPath = ParameterTool.fromPropertiesFile(propertiesFilePath);

    // From a File object
    File propertiesFile = new File(propertiesFilePath);
    ParameterTool parameterFromFile = ParameterTool.fromPropertiesFile(propertiesFile);

    // From an InputStream
    InputStream propertiesFileInputStream = new FileInputStream(propertiesFile);
    ParameterTool parameterFromStream = ParameterTool.fromPropertiesFile(propertiesFileInputStream);

From the command line arguments

This allows getting arguments like --input hdfs:///mydata --elements 42 from the command line.

    public static void main(String[] args) {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        // .. regular code ..
    }
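
To make the mapping from arguments to key/value pairs concrete, here is a simplified JDK-only sketch of how a --key value argument list can be turned into a map. It is not Flink's implementation: the class ArgsSketch and its parse method are hypothetical, and the sketch only handles strict --key value pairs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ArgsSketch {
    /**
     * Simplified illustration only: pairs each "--key" with the argument
     * that follows it. Flags without a value are rejected.
     */
    static Map<String, String> parse(String[] args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("Expected --key value pairs");
        }
        Map<String, String> parsed = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            String rawKey = args[i];
            if (!rawKey.startsWith("--") || rawKey.length() == 2) {
                throw new IllegalArgumentException("Not a key: " + rawKey);
            }
            // Strip the two leading dashes and store the following argument as the value.
            parsed.put(rawKey.substring(2), args[i + 1]);
        }
        return parsed;
    }

    public static void main(String[] args) {
        Map<String, String> parsed =
            parse(new String[] {"--input", "hdfs:///mydata", "--elements", "42"});
        // All values are strings; numeric values are converted on access.
        String input = parsed.get("input");
        int elements = Integer.parseInt(parsed.get("elements"));
        System.out.println(input + " " + elements);
    }
}
```

Every value arrives as a string, which is why typed accessors that convert on read are convenient.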

From system properties

When starting a JVM, you can pass system properties to it: -Dinput=hdfs:///mydata. You can also initialize the ParameterTool from these system properties:

    ParameterTool parameter = ParameterTool.fromSystemProperties();

Now that we’ve got the parameters from somewhere (see above) we can use them in various ways.

Directly from the ParameterTool

The ParameterTool itself has methods for accessing the values.

    ParameterTool parameters = // ...
    parameters.getRequired("input");
    parameters.get("output", "myDefaultValue");
    parameters.getLong("expectedCount", -1L);
    parameters.getNumberOfParameters();
    // .. there are more methods available.

You can use the return values of these methods directly in the main() method of the client submitting the application. For example, you could set the parallelism of an operator like this:

    ParameterTool parameters = ParameterTool.fromArgs(args);
    // getInt returns an int; get(key, default) would return a String
    int parallelism = parameters.getInt("mapParallelism", 2);
    DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

Since the ParameterTool is serializable, you can pass it to the functions themselves:

    ParameterTool parameters = ParameterTool.fromArgs(args);
    DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

and then use it inside the function for getting values from the command line.

Register the parameters globally

Parameters registered as global job parameters in the ExecutionConfig can be accessed as configuration values from the JobManager web interface and in all functions defined by the user.

Register the parameters globally:

    ParameterTool parameters = ParameterTool.fromArgs(args);
    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(parameters);

Access them in any rich user function:

    public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            ParameterTool parameters = (ParameterTool)
                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            parameters.getRequired("input");
            // .. do more ..
        }
    }

Naming large TupleX types

It is recommended to use POJOs (Plain Old Java Objects) instead of TupleX for data types with many fields. POJOs can also be used to give large Tuple types a name.

Example

Instead of using:

    Tuple11<String, String, ..., String> var = new ...;

It is much easier to create a custom type extending from the large Tuple type.

    CustomType var = new ...;

    public static class CustomType extends Tuple11<String, String, ..., String> {
        // constructor matching super
    }
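
The POJO alternative recommended above can be sketched in plain Java. The example assumes the usual Flink POJO conventions (a public class with a public no-argument constructor and fields that are public or reachable through getters and setters). The names PojoSketch, PageVisit, and its fields are hypothetical and chosen only for illustration.

```java
class PojoSketch {
    /** Hypothetical three-field record type with named fields instead of tuple positions. */
    public static class PageVisit {
        public String userId;
        public String url;
        public long timestampMillis;

        /** Public no-argument constructor, expected by POJO-style serializers. */
        public PageVisit() {}

        public PageVisit(String userId, String url, long timestampMillis) {
            this.userId = userId;
            this.url = url;
            this.timestampMillis = timestampMillis;
        }

        @Override
        public String toString() {
            return "PageVisit{userId=" + userId + ", url=" + url
                + ", timestampMillis=" + timestampMillis + "}";
        }
    }

    public static void main(String[] args) {
        PageVisit visit = new PageVisit("sam", "/index.html", 1000L);
        // Fields are read by name rather than by position (f0, f1, f2).
        System.out.println(visit.userId + " visited " + visit.url);
    }
}
```

Named fields make code that reads the record self-describing, which matters more as the number of fields grows.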

Using Logback instead of Log4j

Note: This tutorial is applicable starting from Flink 0.10

Apache Flink uses slf4j as the logging abstraction in the code. Users are advised to use slf4j as well in their user functions.

Slf4j is a compile-time logging interface that can use different logging implementations at runtime, such as log4j or Logback.

Flink depends on Log4j by default. This page describes how to use Flink with Logback. Users reported that they were also able to set up centralized logging with Graylog using this tutorial.

To get a logger instance in the code, use the following code:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MyClass implements MapFunction {
        private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
        // ...
    }

In all cases where classes are executed with a classpath created by a dependency manager such as Maven, Flink will pull log4j into the classpath.

Therefore, you will need to exclude log4j from Flink’s dependencies. The following description will assume a Maven project created from a Flink quickstart.

Change your project’s pom.xml file like this:

    <dependencies>
        <!-- Add the two required logback dependencies -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.1.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.3</version>
        </dependency>
        <!-- Add the log4j -> slf4j (-> logback) bridge into the classpath
             Hadoop is logging to log4j! -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.9.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

The following changes were made in the <dependencies> section:

  • Exclude all log4j dependencies from all Flink dependencies: this causes Maven to ignore Flink’s transitive dependencies to log4j.
  • Exclude the slf4j-log4j12 artifact from Flink’s dependencies: since we are going to use the slf4j to logback binding, we have to remove the slf4j to log4j binding.
  • Add the Logback dependencies: logback-core and logback-classic
  • Add dependencies for log4j-over-slf4j. log4j-over-slf4j is a tool that allows legacy applications that use the Log4j APIs directly to use the Slf4j interface. Flink depends on Hadoop, which uses Log4j directly for logging. Therefore, we need to redirect all logger calls from Log4j to Slf4j, which in turn logs to Logback.

Please note that you need to manually add the exclusions to all new Flink dependencies you are adding to the pom file.

You may also need to check if other (non-Flink) dependencies are pulling in log4j bindings. You can analyze the dependencies of your project with mvn dependency:tree.

This tutorial is applicable when running Flink on YARN or as a standalone cluster.

In order to use Logback instead of Log4j with Flink, you need to remove log4j-1.2.xx.jar and slf4j-log4j12-xxx.jar from the lib/ directory.

Next, you need to put the following jar files into the lib/ folder:

  • logback-classic.jar
  • logback-core.jar
  • log4j-over-slf4j.jar: This bridge needs to be present in the classpath for redirecting logging calls from Hadoop (which is using Log4j) to Slf4j.

Note that you need to explicitly set the lib/ directory when using a per-job YARN cluster.

The command to submit Flink on YARN with a custom logger is: ./bin/flink run -yt $FLINK_HOME/lib <… remaining arguments …>