Flink on Mesos

Getting Started

This Getting Started section guides you through setting up a fully functional Flink Cluster on Mesos.

Introduction

Apache Mesos is another resource provider supported by Apache Flink. Flink utilizes the workers provided by Mesos to run its TaskManagers. Apache Flink provides the script bin/mesos-appmaster.sh to initiate a Flink on Mesos cluster.

Preparation

Flink on Mesos expects an existing Mesos cluster. It also requires the Flink binaries to be deployed on the machine from which you start the cluster. Additionally, Hadoop needs to be installed on that same machine.

Flink provides bin/mesos-appmaster.sh to initiate a Flink on Mesos cluster. A Mesos application master will be created (i.e. a JobManager process with Mesos support) which will utilize the Mesos workers to run Flink’s TaskManager processes.

For bin/mesos-appmaster.sh to work, you have to set the two variables HADOOP_CLASSPATH and MESOS_NATIVE_JAVA_LIBRARY:

  $ export HADOOP_CLASSPATH=$(hadoop classpath)
  $ export MESOS_NATIVE_JAVA_LIBRARY=/path/to/lib/libmesos.so

MESOS_NATIVE_JAVA_LIBRARY needs to point to Mesos’ native Java library. The library name libmesos.so used above refers to Mesos’ Linux library. Running Mesos on macOS requires you to use libmesos.dylib instead.
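Since the library name differs per platform, a small helper along the following lines can select it automatically. This is a sketch; the install paths below are assumptions and need to be adjusted to where libmesos actually lives on your machines:

```shell
# Select the platform-specific Mesos native library.
# The paths are illustrative defaults, not guaranteed install locations.
case "$(uname -s)" in
  Linux)  export MESOS_NATIVE_JAVA_LIBRARY="/usr/lib/libmesos.so" ;;
  Darwin) export MESOS_NATIVE_JAVA_LIBRARY="/usr/local/lib/libmesos.dylib" ;;
esac
echo "MESOS_NATIVE_JAVA_LIBRARY=$MESOS_NATIVE_JAVA_LIBRARY"
```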

Starting a Flink Session on Mesos

Connect to the machine which matches all the requirements listed in the Preparation section. Change into Flink’s home directory and call bin/mesos-appmaster.sh:

  # (0) set required environment variables
  $ export HADOOP_CLASSPATH=$(hadoop classpath)
  $ export MESOS_NATIVE_JAVA_LIBRARY=/path/to/lib/libmesos.so

  # (1) create Flink on Mesos cluster
  $ ./bin/mesos-appmaster.sh \
      -Dmesos.master=<mesos-master>:5050 \
      -Djobmanager.rpc.address=<jobmanager-host> \
      -Dmesos.resourcemanager.framework.user=<flink-user> \
      -Dmesos.resourcemanager.tasks.cpus=6

  # (2) execute Flink job passing the relevant configuration parameters
  $ ./bin/flink run \
      --detached \
      --target remote \
      -Djobmanager.rpc.address=<jobmanager-host> \
      -Dmesos.resourcemanager.framework.user=<flink-user> \
      -Dmesos.master=<mesos-master>:5050 \
      examples/streaming/WindowJoin.jar

The commands above use a few placeholders that need to be substituted by settings of the actual underlying cluster:

  • <mesos-master> refers to the Mesos master’s IP address or hostname.
  • <jobmanager-host> refers to the host that executes bin/mesos-appmaster.sh and thereby starts Flink’s JobManager process. Do not use localhost or 127.0.0.1 here, because this parameter is shared with the Mesos cluster and the TaskManagers.
  • <flink-user> refers to the user that owns the Mesos master’s Flink installation directory (see Mesos’ documentation on specifying a user for further details).

The run action requires --target to be set to remote. Refer to the CLI documentation for further details on that parameter.

The Flink on Mesos cluster is now deployed in Session Mode. Note that you can run multiple Flink jobs on a Session cluster; each job needs to be submitted to the cluster separately. TaskManagers are deployed on the Mesos workers as needed. Keep in mind that you can only run as many jobs as the resources provided by the Mesos workers allow. Experiment with Flink’s parameters to find the right resource utilization for your needs.
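Submitting a further job to the same Session cluster reuses the running JobManager; for example (the placeholders are the same as above, and TopSpeedWindowing.jar is one of the example jobs shipped in the Flink distribution’s examples/streaming/ folder):

  $ ./bin/flink run \
      --detached \
      --target remote \
      -Djobmanager.rpc.address=<jobmanager-host> \
      -Dmesos.resourcemanager.framework.user=<flink-user> \
      -Dmesos.master=<mesos-master>:5050 \
      examples/streaming/TopSpeedWindowing.jar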

Check out Flink’s Mesos configuration to further influence the resources Flink on Mesos is going to allocate.

Deployment Modes

For production use, we recommend deploying Flink Applications in the Per-Job Mode, as it provides a better isolation for each job.

Application Mode

Flink on Mesos does not support Application Mode.

Per-Job Cluster Mode

A job which is executed in Per-Job Cluster Mode spins up a dedicated Flink cluster that is only used for that specific job. No extra job submission is needed. bin/mesos-appmaster-job.sh is used as the startup script. It will start a Flink cluster for a dedicated job which is passed as a JobGraph file. This file can be created by applying the following code to your Job source code:

  import java.io.File;
  import java.io.FileOutputStream;
  import java.io.ObjectOutputStream;

  final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
  final String jobGraphFilename = "job.graph";
  File jobGraphFile = new File(jobGraphFilename);
  try (FileOutputStream output = new FileOutputStream(jobGraphFile);
       ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
      obOutput.writeObject(jobGraph);
  }
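The serialization pattern above can be tried in isolation. The sketch below round-trips a plain Serializable object through a file using the same java.io calls; the String payload is a hypothetical stand-in for a real JobGraph, which serializes the same way:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class JobGraphFileDemo {

    // Serializes the payload to the given file (mirroring the snippet above),
    // then deserializes it again, as the Per-Job cluster would read the JobGraph file.
    static Object roundTrip(File file, Object payload) {
        try {
            try (FileOutputStream output = new FileOutputStream(file);
                 ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
                obOutput.writeObject(payload);
            }
            try (FileInputStream input = new FileInputStream(file);
                 ObjectInputStream obInput = new ObjectInputStream(input)) {
                return obInput.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        File jobGraphFile = new File("job.graph");
        // A String stands in for a real JobGraph; any Serializable works the same way.
        System.out.println(roundTrip(jobGraphFile, "example-job-graph"));
        jobGraphFile.delete();
    }
}
```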

A Flink on Mesos Per-Job cluster can be started in the following way:

  # (0) set required environment variables
  $ export HADOOP_CLASSPATH=$(hadoop classpath)
  $ export MESOS_NATIVE_JAVA_LIBRARY=/path/to/lib/libmesos.so

  # (1) create Per-Job Flink on Mesos cluster
  $ ./bin/mesos-appmaster-job.sh \
      -Dmesos.master=<mesos-master>:5050 \
      -Djobmanager.rpc.address=<jobmanager-host> \
      -Dmesos.resourcemanager.framework.user=<flink-user> \
      -Dinternal.jobgraph-path=<job-graph-file>

In the command above, <job-graph-file> refers to the path of the uploaded JobGraph file defining the job that shall be executed on the Per-Job Flink cluster. The meanings of <mesos-master>, <jobmanager-host>, and <flink-user> are described in the Getting Started guide at the top of this page.

Session Mode

The Getting Started guide at the top of this page describes deploying Flink in Session Mode.

Flink on Mesos Reference

Deploying User Libraries

User libraries can be passed to the Mesos workers by placing them in Flink’s lib/ folder. This way, they will be picked up by Mesos’ Fetcher and copied into the workers’ sandbox folders. Alternatively, Docker containerization can be used as described in Installing Flink on the Workers.

Installing Flink on the Workers

Flink on Mesos offers two ways to distribute the Flink and user binaries within the Mesos cluster:

  1. Using Mesos’ Artifact Server: The Artifact Server provides the resources which are moved by Mesos’ Fetcher into the Mesos worker’s sandbox folders. It can be explicitly specified by setting mesos.resourcemanager.tasks.container.type to mesos. This is the default option and is used in the example commands of this page.
  2. Using Docker containerization: This enables the user to provide user libraries and other customizations as part of a Docker image. Docker utilization can be enabled by setting mesos.resourcemanager.tasks.container.type to docker and by providing the image name through mesos.resourcemanager.tasks.container.image.name.
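For the Docker option, the two settings are passed like any other configuration parameter. A sketch (the image name is a placeholder for an image you have published that contains Flink and your user libraries):

  $ ./bin/mesos-appmaster.sh \
      -Dmesos.master=<mesos-master>:5050 \
      -Dmesos.resourcemanager.tasks.container.type=docker \
      -Dmesos.resourcemanager.tasks.container.image.name=<image-name>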

High Availability on Mesos

You will need to run a service like Marathon or Apache Aurora which takes care of restarting the JobManager process in case of node or process failures. In addition, ZooKeeper needs to be configured as described in the High Availability section of the Flink docs.
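A minimal sketch of the corresponding flink-conf.yaml entries, assuming a ZooKeeper quorum reachable at zookeeper:2181 and an HDFS path for recovery data (both placeholders for your actual setup):

  high-availability: zookeeper
  high-availability.zookeeper.quorum: zookeeper:2181
  high-availability.storageDir: hdfs:///flink/recovery
  # optional: separates this cluster's ZooKeeper nodes from those of other Flink clusters
  high-availability.cluster-id: /flink-on-mesos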

Marathon

Marathon needs to be set up to launch the bin/mesos-appmaster.sh script. In particular, the command should also pass any configuration parameters required by the Flink cluster.

Here is an example configuration for Marathon:

  {
    "id": "flink",
    "cmd": "/opt/flink-1.12.0/bin/mesos-appmaster.sh -Djobmanager.rpc.address=$HOST -Dmesos.resourcemanager.framework.user=<flink-user> -Dmesos.master=<mesos-master>:5050 -Dparallelism.default=2",
    "user": "<flink-user>",
    "cpus": 2,
    "mem": 2048,
    "instances": 1,
    "env": {
      "MESOS_NATIVE_JAVA_LIBRARY": "/usr/lib/libmesos.so"
    },
    "healthChecks": [
      {
        "protocol": "HTTP",
        "path": "/",
        "port": 8081,
        "gracePeriodSeconds": 300,
        "intervalSeconds": 60,
        "timeoutSeconds": 20,
        "maxConsecutiveFailures": 3
      }
    ]
  }

For the example configuration above to work, Flink has to be installed into /opt/flink-1.12.0 with <flink-user> as the owner of the Flink directory (notice that the user is specified twice: once as a Marathon parameter and once as a Mesos parameter). Additionally, we have saved the bundled Hadoop jar in Flink’s lib/ folder for the sake of simplicity. This way, we don’t have to set HADOOP_CLASSPATH as an environment variable next to MESOS_NATIVE_JAVA_LIBRARY.

<mesos-master> needs to be set to the hostname or IP of Mesos’ master node. $HOST is a Marathon environment variable referring to the hostname of the machine the script is executed on. $HOST should not be replaced in the config above!

When deploying Flink using Marathon, the whole Flink cluster, including the JobManager, runs as Mesos tasks in the Mesos cluster. For the above Marathon config to work, Flink’s binaries have to be installed on all Mesos workers.

Supported Hadoop versions

Flink on Mesos is compiled against Hadoop 2.4.1, and all Hadoop versions >= 2.4.1 are supported, including Hadoop 3.x.

For providing Flink with the required Hadoop dependencies, we recommend setting the HADOOP_CLASSPATH environment variable already introduced in the Getting Started / Preparation section.

If that is not possible, the dependencies can also be put into the lib/ folder of Flink.

Flink also offers pre-bundled Hadoop fat jars in the Downloads / Additional Components section of the website; they can be placed in the lib/ folder. These pre-bundled fat jars are shaded to avoid dependency conflicts with common libraries. Note that the Flink community does not test the Mesos integration against these pre-bundled jars.

Flink on Mesos Architecture

The Flink on Mesos implementation consists of two components: The application master and the workers. The workers are simple TaskManagers parameterized by the environment which is set up through the application master. The most sophisticated component of the Flink on Mesos implementation is the application master. The application master currently hosts the following components:

  • Mesos Scheduler: The Scheduler is responsible for registering a framework with Mesos, requesting resources, and launching worker nodes. The Scheduler continuously needs to report back to Mesos to ensure the framework is in a healthy state. To verify the health of the cluster, the Scheduler monitors the spawned workers, marks them as failed and restarts them if necessary.

    Flink’s Mesos Scheduler itself is currently not highly available. However, it persists all necessary information about its state (e.g. configuration, list of workers) in ZooKeeper. In the presence of a failure, it relies on an external system to bring up a new Scheduler (see the Marathon subsection for further details). The Scheduler will then register with Mesos again and go through the reconciliation phase. In the reconciliation phase, the Scheduler receives a list of running worker nodes. It matches these against the recovered information from ZooKeeper and brings the cluster back to the state before the failure.

  • Artifact Server: The Artifact Server is responsible for providing resources to the worker nodes. The resources can be anything from the Flink binaries to shared secrets or configuration files. For instance, in non-containerized environments, the Artifact Server will provide the Flink binaries. What files will be served depends on the configuration overlay used.

Flink’s Mesos startup scripts bin/mesos-appmaster.sh and bin/mesos-appmaster-job.sh provide a way to configure and start the application master. The worker nodes inherit all further configuration. They are deployed through bin/mesos-taskmanager.sh. The configuration inheritance is achieved using configuration overlays. Configuration overlays provide a way to infer a configuration from environment variables and config files which are shipped to the worker nodes.

See Mesos Architecture for more details on how frameworks are handled by Mesos.

Appendix

The following resource files can be used to set up a local Mesos cluster running the Marathon framework and having Flink 1.11.2 installed.

Dockerfile

  FROM mesosphere/mesos:1.7.1

  # install Java 11 and wget
  RUN apt update && \
      apt -y install wget && \
      wget -nv https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz && \
      tar xzf openjdk-11.0.2_linux-x64_bin.tar.gz && \
      mv jdk-11* /usr/local/jdk-11.0.2 && \
      update-alternatives --install /usr/bin/java java /usr/local/jdk-11.0.2/bin/java 2048 && \
      update-alternatives --auto java
  ENV JAVA_HOME=/usr/local/jdk-11.0.2

  WORKDIR /opt

  # install Hadoop
  RUN wget -nv https://apache.mirror.digionline.de/hadoop/common/hadoop-2.10.1/hadoop-2.10.1.tar.gz && \
      tar -xf hadoop-2.10.1.tar.gz
  ENV HADOOP_CLASSPATH=/opt/hadoop-2.10.1/etc/hadoop:/opt/hadoop-2.10.1/share/hadoop/common/lib/*:/opt/hadoop-2.10.1/share/hadoop/common/*:/opt/hadoop-2.10.1/share/hadoop/hdfs:/opt/hadoop-2.10.1/share/hadoop/hdfs/lib/*:/opt/hadoop-2.10.1/share/hadoop/hdfs/*:/opt/hadoop-2.10.1/share/hadoop/yarn:/opt/hadoop-2.10.1/share/hadoop/yarn/lib/*:/opt/hadoop-2.10.1/share/hadoop/yarn/*:/opt/hadoop-2.10.1/share/hadoop/mapreduce/lib/*:/opt/hadoop-2.10.1/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar

  # install Flink on Mesos
  RUN wget -nv https://apache.mirror.digionline.de/flink/flink-1.11.2/flink-1.11.2-bin-scala_2.11.tgz && \
      tar -xf flink-1.11.2-bin-scala_2.11.tgz
  ENV MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so

Docker Compose

The docker-compose.yml provided below is based on the work done by Sean Bennet.

Keep in mind that it requires the Dockerfile from the previous section to be located in the same directory and to be named Dockerfile. It might make sense to scale up the worker nodes so there are enough workers to run Flink on Mesos next to the Marathon framework:

  docker-compose up -d --scale worker=2

  version: "3.8"
  services:
    zookeeper:
      build:
        context: .
        dockerfile: Dockerfile
      command: /usr/share/zookeeper/bin/zkServer.sh start-foreground
      container_name: zookeeper
    master:
      build:
        context: .
        dockerfile: Dockerfile
      command: mesos-master --registry=in_memory
      container_name: master
      environment:
        - MESOS_ZK=zk://zookeeper:2181/mesos
        - MESOS_LOG_DIR=/var/log/mesos
        - MESOS_QUORUM=1
        - MESOS_WORK_DIR=/var/lib/mesos
      depends_on:
        - zookeeper
      ports:
        - "5050:5050"
        - "8081:8081"
    worker:
      build:
        context: .
        dockerfile: Dockerfile
      command: mesos-slave --launcher=posix
      environment:
        - MESOS_MASTER=zk://zookeeper:2181/mesos
        - MESOS_WORK_DIR=/var/lib/mesos
        - MESOS_LOG_DIR=/var/log/mesos
        - MESOS_LOGGING_LEVEL=INFO
        - MESOS_SYSTEMD_ENABLE_SUPPORT=false
      depends_on:
        - zookeeper
        - master
      ports:
        - "8081"
    marathon:
      image: mesosphere/marathon:v1.11.24
      container_name: marathon
      environment:
        - MARATHON_MASTER=zk://zookeeper:2181/mesos
        - MARATHON_ZK=zk://zookeeper:2181/marathon
        - MARATHON_ZK_CONNECTION_TIMEOUT=60000
      ports:
        - "8080:8080"
      depends_on:
        - zookeeper
        - master

Marathon configuration

The following Marathon configuration can be applied through the Marathon UI at http://localhost:8080/. It will start a Flink on Mesos cluster on any of the worker machines. Flink’s default port 8081 is forwarded to random ports due to the scaling of the worker nodes. Use docker ps to figure out the host system’s ports in order to access Flink’s web interface.

  {
    "id": "flink",
    "cmd": "/opt/flink-1.11.2/bin/mesos-appmaster.sh -Dmesos.resourcemanager.framework.user=root -Dmesos.master=master:5050 -Djobmanager.rpc.address=$HOST -Dparallelism.default=2",
    "cpus": 2,
    "mem": 4096,
    "disk": 0,
    "instances": 1,
    "env": {
      "HADOOP_CLASSPATH": "/opt/hadoop-2.10.1/etc/hadoop:/opt/hadoop-2.10.1/share/hadoop/common/lib/*:/opt/hadoop-2.10.1/share/hadoop/common/*:/opt/hadoop-2.10.1/share/hadoop/hdfs:/opt/hadoop-2.10.1/share/hadoop/hdfs/lib/*:/opt/hadoop-2.10.1/share/hadoop/hdfs/*:/opt/hadoop-2.10.1/share/hadoop/yarn:/opt/hadoop-2.10.1/share/hadoop/yarn/lib/*:/opt/hadoop-2.10.1/share/hadoop/yarn/*:/opt/hadoop-2.10.1/share/hadoop/mapreduce/lib/*:/opt/hadoop-2.10.1/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar"
    },
    "healthChecks": [
      {
        "protocol": "HTTP",
        "path": "/",
        "port": 8081,
        "gracePeriodSeconds": 300,
        "intervalSeconds": 60,
        "timeoutSeconds": 20,
        "maxConsecutiveFailures": 3
      }
    ],
    "user": "root"
  }