Checkpointing
Every function and operator in Flink can be stateful (see working with state for details). Stateful functions store data across the processing of individual elements/events, making state a critical building block for any type of more elaborate operation.
In order to make state fault tolerant, Flink needs to checkpoint the state. Checkpoints allow Flink to recover state and positions in the streams to give the application the same semantics as a failure-free execution.
The documentation on streaming fault tolerance describes in detail the technique behind Flink’s streaming fault tolerance mechanism.
Prerequisites
Flink’s checkpointing mechanism interacts with durable storage for streams and state. In general, it requires:
- A persistent (or durable) data source that can replay records for a certain amount of time. Examples for such sources are persistent messages queues (e.g., Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or file systems (e.g., HDFS, S3, GFS, NFS, Ceph, …).
- A persistent storage for state, typically a distributed filesystem (e.g., HDFS, S3, GFS, NFS, Ceph, …)
Enabling and Configuring Checkpointing
By default, checkpointing is disabled. To enable checkpointing, call enableCheckpointing(n)
on the StreamExecutionEnvironment
, where n is the checkpoint interval in milliseconds.
Other parameters for checkpointing include:
checkpoint storage: You can set the location where checkpoint snapshots are made durable. By default Flink will use the JobManager’s heap. For production deployments it is recommended to instead use a durable filesystem. See checkpoint storage for more details on the available options for job-wide and cluster-wide configuration.
exactly-once vs. at-least-once: You can optionally pass a mode to the
enableCheckpointing(n)
method to choose between the two guarantee levels. Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications.checkpoint timeout: The time after which a checkpoint-in-progress is aborted, if it did not complete by then.
minimum time between checkpoints: To make sure that the streaming application makes a certain amount of progress between checkpoints, one can define how much time needs to pass between checkpoints. If this value is set for example to 5000, the next checkpoint will be started no sooner than 5 seconds after the previous checkpoint completed, regardless of the checkpoint duration and the checkpoint interval. Note that this implies that the checkpoint interval will never be smaller than this parameter.
It is often easier to configure applications by defining the “time between checkpoints” than the checkpoint interval, because the “time between checkpoints” is not susceptible to the fact that checkpoints may sometimes take longer than on average (for example if the target storage system is temporarily slow).
Note that this value also implies that the number of concurrent checkpoints is one.
tolerable checkpoint failure number: This defines how many consecutive checkpoint failures will be tolerated, before the whole job is failed over. The default value is
0
, which means no checkpoint failures will be tolerated, and the job will fail on first reported checkpoint failure. This only applies to the following failure reasons: IOException on the Job Manager, failures in the async phase on the Task Managers and checkpoint expiration due to a timeout. Failures originating from the sync phase on the Task Managers are always forcing failover of an affected task. Other types of checkpoint failures (such as checkpoint being subsumed) are being ignored.number of concurrent checkpoints: By default, the system will not trigger another checkpoint while one is still in progress. This ensures that the topology does not spend too much time on checkpoints and not make progress with processing the streams. It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay (for example because the functions call external services that need some time to respond) but that still want to do very frequent checkpoints (100s of milliseconds) to re-process very little upon failures.
This option cannot be used when a minimum time between checkpoints is defined.
externalized checkpoints: You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the job fails. This way, you will have a checkpoint around to resume from if your job fails. There are more details in the deployment notes on externalized checkpoints.
unaligned checkpoints: You can enable unaligned checkpoints to greatly reduce checkpointing times under backpressure. This only works for exactly-once checkpoints and with one concurrent checkpoint.
checkpoints with finished tasks: By default Flink will continue performing checkpoints even if parts of the DAG have finished processing all of their records. Please refer to important considerations for details.
Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// only two consecutive checkpoint failures are tolerated
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained
// after job cancellation
env.getCheckpointConfig().setExternalizedCheckpointRetention(
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
// enables the unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();
// sets the checkpoint storage where checkpoint snapshots will be written
Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///my/checkpoint/dir");
env.configure(config);
// enable checkpointing with finished tasks
Configuration config = new Configuration();
config.set(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env.configure(config);
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)
// only two consecutive checkpoint failures are tolerated
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2)
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// enable externalized checkpoints which are retained
// after job cancellation
env.getCheckpointConfig().setExternalizedCheckpointRetention(
ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
// enables the unaligned checkpoints
env.getCheckpointConfig.enableUnalignedCheckpoints()
// sets the checkpoint storage where checkpoint snapshots will be written
env.getCheckpointConfig.setCheckpointStorage("hdfs:///my/checkpoint/dir")
// enable checkpointing with finished tasks
val config = new Configuration()
config.set(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true)
env.configure(config)
Python
env = StreamExecutionEnvironment.get_execution_environment()
# start a checkpoint every 1000 ms
env.enable_checkpointing(1000)
# advanced options:
# set mode to exactly-once (this is the default)
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
# make sure 500 ms of progress happen between checkpoints
env.get_checkpoint_config().set_min_pause_between_checkpoints(500)
# checkpoints have to complete within one minute, or are discarded
env.get_checkpoint_config().set_checkpoint_timeout(60000)
# only two consecutive checkpoint failures are tolerated
env.get_checkpoint_config().set_tolerable_checkpoint_failure_number(2)
# allow only one checkpoint to be in progress at the same time
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
# enable externalized checkpoints which are retained after job cancellation
env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION)
# enables the unaligned checkpoints
env.get_checkpoint_config().enable_unaligned_checkpoints()
Related Config Options
Some more parameters and/or defaults may be set via Flink configuration file (see configuration)` for a full guide):
Key | Default | Type | Description |
---|---|---|---|
execution.checkpointing.aligned-checkpoint-timeout | 0 ms | Duration | Only relevant if execution.checkpointing.unaligned.enabled is enabled.If timeout is 0, checkpoints will always start unaligned. If timeout has a positive value, checkpoints will start aligned. If during checkpointing, checkpoint start delay exceeds this timeout, alignment will timeout and checkpoint barrier will start working as unaligned checkpoint. |
execution.checkpointing.checkpoints-after-tasks-finish | true | Boolean | Feature toggle for enabling checkpointing even if some of tasks have finished. Before you enable it, please take a look at the important considerations |
execution.checkpointing.cleaner.parallel-mode | true | Boolean | Option whether to discard a checkpoint’s states in parallel using the ExecutorService passed into the cleaner |
execution.checkpointing.create-subdir | true | Boolean | Whether to create sub-directories named by job id under the ‘execution.checkpointing.dir ‘ to store the data files and meta data of checkpoints. The default value is true to enable user could run several jobs with the same checkpoint directory at the same time. If this value is set to false, pay attention not to run several jobs with the same directory simultaneously.WARNING: This is an advanced configuration. If set to false, users must ensure that no multiple jobs are run with the same checkpoint directory, and that no files exist other than those necessary for the restoration of the current job when starting a new job. |
execution.checkpointing.data-inline-threshold | 20 kb | MemorySize | The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB. |
execution.checkpointing.dir | (none) | String | The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the ‘execution.checkpointing.storage’ is set to ‘jobmanager’, only the meta data of checkpoints will be stored in this directory. |
execution.checkpointing.externalized-checkpoint-retention | NO_EXTERNALIZED_CHECKPOINTS | Enum | Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status JobStatus#FAILED or JobStatus#SUSPENDED ). In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.The mode defines how an externalized checkpoint should be cleaned up on job cancellation. If you choose to retain externalized checkpoints on cancellation you have to handle checkpoint clean up manually when you cancel the job as well (terminating with job status JobStatus#CANCELED ).The target directory for externalized checkpoints is configured via execution.checkpointing.dir .Possible values:
|
execution.checkpointing.file-merging.across-checkpoint-boundary | false | Boolean | Only relevant if execution.checkpointing.file-merging.enabled is enabled.Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file. |
execution.checkpointing.file-merging.enabled | false | Boolean | Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files. This is an experimental feature under evaluation, make sure you’re aware of the possible effects of enabling it. |
execution.checkpointing.file-merging.max-file-size | 32 mb | MemorySize | Max size of a physical file for merged checkpoints. |
execution.checkpointing.file-merging.max-space-amplification | 2.0 | Float | Space amplification stands for the magnification of the occupied space compared to the amount of valid data. The more space amplification is, the more waste of space will be. This configs a space amplification above which a re-uploading for physical files will be triggered to reclaim space. Any value below 1f means disabling the space control. |
execution.checkpointing.file-merging.pool-blocking | false | Boolean | Whether to use Blocking or Non-Blocking pool for merging physical files. A Non-Blocking pool will always provide usable physical file without blocking. It may create many physical files if poll file frequently. When poll a small file from a Blocking pool, it may be blocked until the file is returned. |
execution.checkpointing.incremental | false | Boolean | Option whether to create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Once enabled, the state size shown in web UI or fetched from rest API only represents the delta checkpoint size instead of full checkpoint size. Some state backends may not support incremental checkpoints and ignore this option. |
execution.checkpointing.interval | (none) | Duration | Gets the interval in which checkpoints are periodically scheduled. This setting defines the base interval. Checkpoint triggering may be delayed by the settings execution.checkpointing.max-concurrent-checkpoints , execution.checkpointing.min-pause and execution.checkpointing.interval-during-backlog |
execution.checkpointing.interval-during-backlog | (none) | Duration | If it is not null and any source reports isProcessingBacklog=true, it is the interval in which checkpoints are periodically scheduled. Checkpoint triggering may be delayed by the settings execution.checkpointing.max-concurrent-checkpoints and execution.checkpointing.min-pause .Note: if it is not null, the value must either be 0, which means the checkpoint is disabled during backlog, or be larger than or equal to execution.checkpointing.interval. |
execution.checkpointing.local-backup.dirs | (none) | String | The config parameter defining the root directories for storing file-based state for local recovery. Local recovery currently only covers keyed state backends. If not configured it will default to <WORKING_DIR>/localState. The <WORKING_DIR> can be configured via process.taskmanager.working-dir |
execution.checkpointing.local-backup.enabled | false | Boolean | This option configures local backup for the state backend, which indicates whether to make backup checkpoint on local disk. If not configured, fallback to execution.state-recovery.from-local. By default, local backup is deactivated. Local backup currently only covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend). |
execution.checkpointing.max-concurrent-checkpoints | 1 | Integer | The maximum number of checkpoint attempts that may be in progress at the same time. If this value is n, then no checkpoints will be triggered while n checkpoint attempts are currently in flight. For the next checkpoint to be triggered, one checkpoint attempt would need to finish or expire. |
execution.checkpointing.min-pause | 0 ms | Duration | The minimal pause between checkpointing attempts. This setting defines how soon thecheckpoint coordinator may trigger another checkpoint after it becomes possible to triggeranother checkpoint with respect to the maximum number of concurrent checkpoints(see execution.checkpointing.max-concurrent-checkpoints ).If the maximum number of concurrent checkpoints is set to one, this setting makes effectively sure that a minimum amount of time passes where no checkpoint is in progress at all. |
execution.checkpointing.mode | EXACTLY_ONCE | Enum | The checkpointing mode (exactly-once vs. at-least-once). Possible values:
|
execution.checkpointing.num-retained | 1 | Integer | The maximum number of completed checkpoints to retain. |
execution.checkpointing.savepoint-dir | (none) | String | The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend). |
execution.checkpointing.storage | (none) | String | The checkpoint storage implementation to be used to checkpoint state. The implementation can be specified either via their shortcut name, or via the class name of a CheckpointStorageFactory . If a factory is specified it is instantiated via its zero argument constructor and its CheckpointStorageFactory#createFromConfig(ReadableConfig, ClassLoader) method is called.Recognized shortcut names are ‘jobmanager’ and ‘filesystem’. ‘execution.checkpointing.storage’ and ‘execution.checkpointing.dir’ are usually combined to configure the checkpoint location. By default, the checkpoint meta data and actual program state will be stored in the JobManager’s memory directly. When ‘execution.checkpointing.storage’ is set to ‘jobmanager’, if ‘execution.checkpointing.dir’ is configured, the meta data of checkpoints will be persisted to the path specified by ‘execution.checkpointing.dir’. Otherwise, the meta data will be stored in the JobManager’s memory. When ‘execution.checkpointing.storage’ is set to ‘filesystem’, a valid path must be configured to ‘execution.checkpointing.dir’, and the checkpoint meta data and actual program state will both be persisted to the path. |
execution.checkpointing.timeout | 10 min | Duration | The maximum time that a checkpoint may take before being discarded. |
execution.checkpointing.tolerable-failed-checkpoints | 0 | Integer | The tolerable checkpoint consecutive failure number. If set to 0, that means we do not tolerance any checkpoint failure. This only applies to the following failure reasons: IOException on the Job Manager, failures in the async phase on the Task Managers and checkpoint expiration due to a timeout. Failures originating from the sync phase on the Task Managers are always forcing failover of an affected task. Other types of checkpoint failures (such as checkpoint being subsumed) are being ignored. |
execution.checkpointing.unaligned.enabled | false | Boolean | Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure. Unaligned checkpoints contain data stored in buffers as part of the checkpoint state, which allows checkpoint barriers to overtake these buffers. Thus, the checkpoint duration becomes independent of the current throughput as checkpoint barriers are effectively not embedded into the stream of data anymore. Unaligned checkpoints can only be enabled if execution.checkpointing.mode is EXACTLY_ONCE and if execution.checkpointing.max-concurrent-checkpoints is 1 |
execution.checkpointing.unaligned.forced | false | Boolean | Forces unaligned checkpoints, particularly allowing them for iterative jobs. |
execution.checkpointing.unaligned.interruptible-timers.enabled | false | Boolean | Allows unaligned checkpoints to skip timers that are currently being fired. For this feature to be enabled, it must be also supported by the operator. Currently this is supported by all TableStreamOperators and CepOperator. |
execution.checkpointing.unaligned.max-subtasks-per-channel-state-file | 5 | Integer | Defines the maximum number of subtasks that share the same channel state file. It can reduce the number of small files when enable unaligned checkpoint. Each subtask will create a new channel state file when this is configured to 1. |
execution.checkpointing.write-buffer-size | 4096 | Integer | The default size of the write buffer for the checkpoint streams that write to file systems. The actual write buffer size is determined to be the maximum of the value of this option and option ‘execution.checkpointing.data-inline-threshold’. |
Selecting Checkpoint Storage
Flink’s checkpointing mechanism stores consistent snapshots of all the state in timers and stateful operators, including connectors, windows, and any user-defined state. Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured Checkpoint Storage.
By default, checkpoints are stored in memory in the JobManager. For proper persistence of large state, Flink supports various approaches for checkpointing state in other locations. The choice of checkpoint storage can be configured like the following code snippet.
Configuration config = new Configuration();
config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "...");
env.configure(config);
It is strongly encouraged that checkpoints be stored in a highly-available filesystem for production deployments.
See checkpoint storage for more details on the available options for job-wide and cluster-wide configuration.
State Checkpoints in Iterative Jobs
Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: env.enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE, force = true)
.
Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure.
Checkpointing with parts of the graph finished
Starting from Flink 1.14 it is possible to continue performing checkpoints even if parts of the job graph have finished processing all data, which might happen if it contains bounded sources. This feature is enabled by default since 1.15, and it could be disabled via a feature flag:
Configuration config = new Configuration();
config.set(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
Once the tasks/subtasks are finished, they do not contribute to the checkpoints any longer. This is an important consideration when implementing any custom operators or UDFs (User-Defined Functions).
In order to support checkpointing with tasks that finished, we adjusted the task lifecycle and introduced the StreamOperator#finish method. This method is expected to be a clear cutoff point for flushing any remaining buffered state. All checkpoints taken after the finish method has been called should be empty (in most cases) and should not contain any buffered data since there will be no way to emit this data. One notable exception is if your operator has some pointers to transactions in external systems (i.e. order to implement the exactly-once semantic). In such a case, checkpoints taken after invoking the finish()
method should keep a pointer to the last transaction(s) that will be committed in the final checkpoint before the operator is closed. A good built-in example of this are exactly-once sinks and the TwoPhaseCommitSinkFunction
.
How does this impact the operator state?
There is a special handling for UnionListState
, which has often been used to implement a global view over offsets in an external system (i.e. storing current offsets of Kafka partitions). If we had discarded a state for a single subtask that had its close
method called, we would have lost the offsets for partitions that it had been assigned. In order to work around this problem, we let checkpoints succeed only if none or all subtasks that use UnionListState
are finished.
We have not seen ListState
used in a similar way, but you should be aware that any state checkpointed after the close
method will be discarded and not be available after a restore.
Any operator that is prepared to be rescaled should work well with tasks that partially finish. Restoring from a checkpoint where only a subset of tasks finished is equivalent to restoring such a task with the number of new subtasks equal to the number of running tasks.
Waiting for the final checkpoint before task exit
To ensure all the records could be committed for operators using the two-phase commit, the tasks would wait for the final checkpoint completed successfully after all the operators finished. The final checkpoint would be triggered immediately after all operators have reached end of data, without waiting for periodic triggering, but the job will need to wait for this final checkpoint to be completed.
Unify file merging mechanism for checkpoints (Experimental)
The unified file merging mechanism for checkpointing is introduced to Flink 1.20 as an MVP (“minimum viable product”) feature, which allows scattered small checkpoint files to be written into larger files, reducing the number of file creations and file deletions, which alleviates the pressure of file system metadata management raised by the file flooding problem during checkpoints. The mechanism can be enabled by setting execution.checkpointing.file-merging.enabled
to true
. Note that as a trade-off, enabling this mechanism may lead to space amplification, that is, the actual occupation on the file system will be larger than actual state size. execution.checkpointing.file-merging.max-space-amplification
can be used to limit the upper bound of space amplification.
This mechanism is applicable to keyed state, operator state and channel state in Flink. Merging at subtask level is provided for shared scope state; Merging at TaskManager level is provided for private scope state. The maximum number of subtasks allowed to be written to a single file can be configured through the execution.checkpointing.file-merging.max-subtasks-per-file
option.
This feature also supports merging files across checkpoints. To enable this, set execution.checkpointing.file-merging.across-checkpoint-boundary
to true
.
This mechanism introduces a file pool to handle concurrent writing scenarios. There are two modes, the non-blocking mode will always provide usable physical file without blocking when receive a file request, it may create many physical files if poll file frequently; while the blocking mode will be blocked until there are returned files available in the file pool. This can be configured via setting execution.checkpointing.file-merging.pool-blocking
as true
for blocking or false
for non-blocking.