Elastic Scaling

Apache Flink allows you to rescale your jobs. You can do this manually by stopping the job and restarting from the savepoint created during shutdown with a different parallelism.

This page describes options where Flink automatically adjusts the parallelism instead.

Reactive Mode

Reactive Mode is an MVP (“minimum viable product”) feature. The Flink community is actively looking for feedback from users through our mailing lists. Please check the limitations listed on this page.

Reactive Mode configures a job so that it always uses all resources available in the cluster. Adding a TaskManager will scale up your job, and removing resources will scale it down. Flink will manage the parallelism of the job, always setting it to the highest possible value.

Reactive Mode restarts a job on a rescaling event, restoring it from the latest completed checkpoint. This means that there is no overhead of creating a savepoint (which is needed for manually rescaling a job). Also, the amount of data that is reprocessed after rescaling depends on the checkpointing interval, and the restore time depends on the state size.

The Reactive Mode allows Flink users to implement a powerful autoscaling mechanism, by having an external service monitor certain metrics, such as consumer lag, aggregate CPU utilization, throughput or latency. As soon as these metrics are above or below a certain threshold, additional TaskManagers can be added or removed from the Flink cluster. This could be implemented through changing the replica factor of a Kubernetes deployment, or an autoscaling group on AWS. This external service only needs to handle the resource allocation and deallocation. Flink will take care of keeping the job running with the resources available.
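As a rough illustration of the external service described above, the following sketch maps a consumer-lag reading to a desired number of TaskManagers. The function name, the thresholds, and the deployment name flink-taskmanager are hypothetical and not part of Flink. The kubectl command is only printed, not executed.

```shell
#!/usr/bin/env bash
# Hypothetical scaling rule: add one TaskManager when the lag exceeds the
# upper threshold, remove one when it drops below the lower threshold, and
# clamp the result to the range [min, max].
desired_replicas() {
  local current=$1 lag=$2 low=$3 high=$4 min=$5 max=$6
  local target=$current
  if [ "$lag" -gt "$high" ]; then
    target=$((current + 1))
  elif [ "$lag" -lt "$low" ]; then
    target=$((current - 1))
  fi
  if [ "$target" -lt "$min" ]; then target=$min; fi
  if [ "$target" -gt "$max" ]; then target=$max; fi
  echo "$target"
}

# Example: 2 TaskManagers, a lag of 12000 records, thresholds 1000 and 10000.
replicas=$(desired_replicas 2 12000 1000 10000 1 8)
# Print the command instead of running it; the deployment name is an assumption.
echo "kubectl scale deployment/flink-taskmanager --replicas=${replicas}"
```

The service only decides how many TaskManagers should exist. Flink itself reacts to the changed resources and rescales the job.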

Getting started

If you just want to try out Reactive Mode, follow these instructions. They assume that you are deploying Flink on a single machine.

  # These instructions assume you are in the root directory of a Flink distribution.
  # Put Job into lib/ directory
  cp ./examples/streaming/TopSpeedWindowing.jar lib/
  # Submit Job in Reactive Mode
  ./bin/standalone-job.sh start -Dscheduler-mode=reactive -Dexecution.checkpointing.interval="10s" -j org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
  # Start first TaskManager
  ./bin/taskmanager.sh start

Let’s quickly examine the submission command used above:

  • ./bin/standalone-job.sh start deploys Flink in Application Mode.
  • -Dscheduler-mode=reactive enables Reactive Mode.
  • -Dexecution.checkpointing.interval="10s" configures checkpointing and the restart strategy.
  • The last argument passes the Job’s main class name.

You have now started a Flink job in Reactive Mode. The web interface shows that the job is running on one TaskManager. If you want to scale up the job, simply add another TaskManager to the cluster:

  # Start additional TaskManager
  ./bin/taskmanager.sh start

To scale down, remove a TaskManager instance.

  # Remove a TaskManager
  ./bin/taskmanager.sh stop

Usage

Configuration

To enable Reactive Mode, you need to configure scheduler-mode to reactive.

The parallelism of individual operators in a job will be determined by the scheduler. It is not configurable and will be ignored if explicitly set, either on individual operators or the entire job.

The only way of influencing the parallelism is by setting a max parallelism for an operator (which will be respected by the scheduler). The maxParallelism is bounded by 2^15 (32768). If you do not set a max parallelism for individual operators or the entire job, the default parallelism rules will be applied, potentially applying lower bounds than the max possible value. As with the default scheduling mode, please take the best practices for parallelism into consideration.

Note that such a high max parallelism might affect the performance of the job, since Flink has to maintain larger internal structures.

When enabling Reactive Mode, the jobmanager.adaptive-scheduler.resource-wait-timeout configuration key will default to -1. This means that the JobManager will run forever waiting for sufficient resources. If you want the JobManager to stop after a certain time without enough TaskManagers to run the job, configure jobmanager.adaptive-scheduler.resource-wait-timeout.

With Reactive Mode enabled, the jobmanager.adaptive-scheduler.resource-stabilization-timeout configuration key will default to 0: Flink will start running the job as soon as there are sufficient resources available. In scenarios where TaskManagers are not connecting at the same time, but slowly one after another, this behavior leads to a job restart whenever a TaskManager connects. Increase this configuration value if you want to wait for the resources to stabilize before scheduling the job.

Additionally, one can configure jobmanager.adaptive-scheduler.min-parallelism-increase: This configuration option specifies the minimum increase in aggregate parallelism required before a scale-up is triggered. For example, if you have a job with a source (parallelism=2) and a sink (parallelism=2), the aggregate parallelism is 4. By default, the configuration key is set to 1, so any increase in the aggregate parallelism will trigger a restart.
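The scale-up rule for min-parallelism-increase can be modelled with a small check. This is an illustrative sketch, not Flink's scheduler code: the function name is hypothetical, and it assumes that a scale-up is triggered when the aggregate parallelism grows by at least the configured minimum.

```shell
# should_scale_up CURRENT NEW MIN_INCREASE
# Succeeds (exit status 0) when the aggregate parallelism would grow by at
# least MIN_INCREASE. Illustrative model only, not Flink's implementation.
should_scale_up() {
  local current=$1 new=$2 min_increase=$3
  [ $((new - current)) -ge "$min_increase" ]
}

# Source (2) + sink (2) gives an aggregate parallelism of 4.
# One additional slot would allow 3 + 3 = 6.
if should_scale_up 4 6 1; then
  echo "restart with higher parallelism"
fi

# With a minimum increase of 4, the same change would be ignored.
if ! should_scale_up 4 6 4; then
  echo "keep current parallelism"
fi
```

A higher minimum trades slower reaction to new resources for fewer restarts when TaskManagers join one at a time.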

Recommendations

  • Configure periodic checkpointing for stateful jobs: Reactive Mode restores from the latest completed checkpoint on a rescale event. If no periodic checkpointing is enabled, your program will lose its state. Checkpointing also configures a restart strategy. Reactive Mode will respect the configured restart strategy: If no restart strategy is configured, Reactive Mode will fail your job instead of scaling it.

  • Downscaling in Reactive Mode might take longer if the TaskManager is not properly shut down (i.e., if a SIGKILL signal is used instead of a SIGTERM signal). In this case, Flink waits for the heartbeat between JobManager and the stopped TaskManager(s) to time out. You will see that your Flink job is stuck for roughly 50 seconds before it is redeployed with a lower parallelism.

    The default timeout is configured to 50 seconds. Adjust the heartbeat.timeout configuration to a lower value if your infrastructure permits this. Setting a low heartbeat timeout can lead to failures if a TaskManager fails to respond to a heartbeat, for example due to network congestion or a long garbage collection pause. Note that the heartbeat.interval always needs to be lower than the timeout.
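Before lowering the heartbeat timeout, it can help to check the constraint that the interval stays below the timeout. The helper below is a hypothetical sketch; the values are examples only, and it assumes both options are given in milliseconds.

```shell
# check_heartbeat INTERVAL_MS TIMEOUT_MS
# Succeeds only when the interval is strictly lower than the timeout and
# prints the options in the -D form used when starting Flink processes.
check_heartbeat() {
  local interval_ms=$1 timeout_ms=$2
  if [ "$interval_ms" -lt "$timeout_ms" ]; then
    echo "ok: -Dheartbeat.interval=${interval_ms} -Dheartbeat.timeout=${timeout_ms}"
  else
    echo "invalid: heartbeat.interval must be lower than heartbeat.timeout" >&2
    return 1
  fi
}

# Example values (assumptions): 5 second interval, 20 second timeout.
check_heartbeat 5000 20000
```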

Limitations

Since Reactive Mode is a new, experimental feature, not all features supported by the default scheduler are also available with Reactive Mode (and its adaptive scheduler). The Flink community is working on addressing these limitations.

The limitations of Adaptive Scheduler also apply to Reactive Mode.

Adaptive Scheduler

Using Adaptive Scheduler directly (not through Reactive Mode) is only advised for advanced users because slot allocation on a session cluster with multiple jobs is not defined.

The Adaptive Scheduler can adjust the parallelism of a job based on available slots. It will automatically reduce the parallelism if not enough slots are available to run the job with the originally configured parallelism; be it due to not enough resources being available at the time of submission, or TaskManager outages during the job execution. If new slots become available the job will be scaled up again, up to the configured parallelism. In Reactive Mode (see above) the configured parallelism is ignored and treated as if it was set to infinity, letting the job always use as many resources as possible. You can also use Adaptive Scheduler without Reactive Mode, but there are some practical limitations:

  • If you are using Adaptive Scheduler on a session cluster, there are no guarantees regarding the distribution of slots between multiple running jobs in the same session.

One benefit of the Adaptive Scheduler over the default scheduler is that it can handle TaskManager losses gracefully, since it would just scale down in these cases.
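The difference between the two modes can be sketched with a simplified model. It assumes a single slot sharing group, so that one slot runs one parallel instance of the whole pipeline. The function names are hypothetical and the logic is an approximation of the behavior described above, not Flink's implementation.

```shell
# Adaptive Scheduler: use the available slots, but never more than the
# configured parallelism.
adaptive_parallelism() {
  local configured=$1 slots=$2
  if [ "$slots" -lt "$configured" ]; then
    echo "$slots"
  else
    echo "$configured"
  fi
}

# Reactive Mode: the configured parallelism is ignored; only the max
# parallelism caps the result.
reactive_parallelism() {
  local slots=$1 max_parallelism=$2
  if [ "$slots" -lt "$max_parallelism" ]; then
    echo "$slots"
  else
    echo "$max_parallelism"
  fi
}

adaptive_parallelism 8 5      # a TaskManager is lost: prints 5
adaptive_parallelism 8 12     # capped at the configured value: prints 8
reactive_parallelism 12 128   # uses every slot: prints 12
```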

Usage

The following configuration parameters need to be set:

  • jobmanager.scheduler: adaptive: Change from the default scheduler to adaptive scheduler
  • cluster.declarative-resource-management.enabled: Declarative resource management must be enabled (enabled by default).

The behavior of Adaptive Scheduler is configured by all configuration options containing adaptive-scheduler in their name.

Limitations

  • Streaming jobs only: The first version of Adaptive Scheduler runs with streaming jobs only. When a batch job is submitted, Flink automatically falls back to the default scheduler.
  • No support for local recovery: Local recovery is a feature that schedules tasks to machines so that the state on that machine gets re-used if possible. The lack of this feature means that Adaptive Scheduler will always need to download the entire state from the checkpoint storage.
  • No support for partial failover: Partial failover means that the scheduler is able to restart parts (“regions” in Flink’s internals) of a failed job, instead of the entire job. This limitation impacts only recovery time of embarrassingly parallel jobs: Flink’s default scheduler can restart failed parts, while Adaptive Scheduler will restart the entire job.
  • Unused slots: If the max parallelism for slot sharing groups is not equal, slots offered to Adaptive Scheduler might be unused.
  • Scaling events trigger job and task restarts, which will increase the number of Task attempts.

Adaptive Batch Scheduler

The Adaptive Batch Scheduler can automatically decide the parallelism of operators for batch jobs. If no parallelism is set for an operator, the scheduler will decide its parallelism according to the size of the datasets it consumes. This can bring many benefits:

  • Batch job users can be relieved from parallelism tuning
  • Automatically tuned parallelisms can better fit consumed datasets which have a varying volume size every day
  • Operators from SQL batch jobs can be assigned with different parallelisms which are automatically tuned

Usage

To automatically decide parallelisms for operators with Adaptive Batch Scheduler, you need to:

  • Configure to use Adaptive Batch Scheduler.
  • Set the parallelism of operators to -1.

Configure to use Adaptive Batch Scheduler

To use Adaptive Batch Scheduler, you need to:

In addition, there are several related configuration options that may need adjustment when using Adaptive Batch Scheduler:

Set the parallelism of operators to -1

Adaptive Batch Scheduler will only decide parallelism for operators whose parallelism is not specified by users (parallelism is -1). So if you want the parallelism of operators to be decided automatically, you should configure as follows:

  • Set parallelism.default: -1
  • Set table.exec.resource.default-parallelism: -1 in SQL jobs.
  • Don’t call setParallelism() for operators in DataStream/DataSet jobs.
  • Don’t call setParallelism() on StreamExecutionEnvironment/ExecutionEnvironment in DataStream/DataSet jobs.

Performance tuning

  1. It’s recommended to use Sort Shuffle and set taskmanager.network.memory.buffers-per-channel to 0. This can decouple the required network memory from parallelism, so that for large scale jobs, the “Insufficient number of network buffers” errors are less likely to happen.
  2. It’s recommended to set jobmanager.adaptive-batch-scheduler.max-parallelism to the parallelism you expect to need in the worst case. Larger values are not recommended, because an excessive value may affect performance. This option can affect the number of subpartitions produced by upstream tasks, and a large number of subpartitions may degrade the performance of hash shuffle and, due to small packets, of network transmission.

Limitations

  • Batch jobs only: Adaptive Batch Scheduler only supports batch jobs. An exception will be thrown if a streaming job is submitted.
  • ALL-EXCHANGES-BLOCKING jobs only: At the moment, Adaptive Batch Scheduler only supports jobs whose shuffle mode is ALL-EXCHANGES-BLOCKING.
  • The decided parallelism will be a power of 2: To ensure that downstream tasks consume the same number of subpartitions, the configuration option jobmanager.adaptive-batch-scheduler.max-parallelism should be set to a power of 2 (2^N), and the decided parallelism will also be a power of 2 (2^M with M <= N).
  • FileInputFormat sources are not supported: This includes StreamExecutionEnvironment#readFile(...), StreamExecutionEnvironment#readTextFile(...) and StreamExecutionEnvironment#createInput(FileInputFormat, ...). Users should use the new sources (FileSystem DataStream Connector or FileSystem SQL Connector) to read files when using the Adaptive Batch Scheduler.
  • Inconsistent broadcast results metrics on WebUI: In Adaptive Batch Scheduler, for broadcast results, the number of bytes/records sent by the upstream task counted by metric is not equal to the number of bytes/records received by the downstream task, which may confuse users when displayed on the Web UI. See FLIP-187 for details.
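The power-of-2 requirement listed above can be checked before a value is used for jobmanager.adaptive-batch-scheduler.max-parallelism. The helper below is a hypothetical sketch and not part of Flink; the candidate values are examples only.

```shell
# is_power_of_two N
# Succeeds when N is a positive power of 2, using the bit trick that a power
# of 2 has exactly one bit set, so N & (N - 1) is 0.
is_power_of_two() {
  local n=$1
  [ "$n" -gt 0 ] && [ $((n & (n - 1))) -eq 0 ]
}

for candidate in 128 100; do
  if is_power_of_two "$candidate"; then
    echo "$candidate: usable as max-parallelism"
  else
    echo "$candidate: not a power of 2"
  fi
done
```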