Working with State

This document explains how to use Flink’s state abstractions when developing an application.

Keyed State and Operator State

There are two basic kinds of state in Flink: Keyed State and Operator State.

Keyed State

Keyed State is always relative to keys and can only be used in functions and operators on a KeyedStream.

You can think of Keyed State as Operator State that has been partitioned, or sharded, with exactly one state-partition per key. Each keyed state is logically bound to a unique composite of <parallel-operator-instance, key>, and since each key “belongs” to exactly one parallel instance of a keyed operator, we can think of this simply as <operator, key>.

Keyed State is further organized into so-called Key Groups. Key Groups are the atomic unit by which Flink can redistribute Keyed State; there are exactly as many Key Groups as the defined maximum parallelism. During execution each parallel instance of a keyed operator works with the keys for one or more Key Groups.

Operator State

With Operator State (or non-keyed state), each operator state is bound to one parallel operator instance. The Kafka Connector is a good motivating example for the use of Operator State in Flink. Each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State.

The Operator State interfaces support redistributing state among parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution.

Raw and Managed State

Keyed State and Operator State exist in two forms: managed and raw.

Managed State is represented in data structures controlled by the Flink runtime, such as internal hash tables or RocksDB. Examples are “ValueState”, “ListState”, etc. Flink’s runtime encodes the states and writes them into the checkpoints.

Raw State is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into the checkpoint. Flink knows nothing about the state’s data structures and sees only the raw bytes.

All datastream functions can use managed state, but the raw state interfaces can only be used when implementing operators. Using managed state (rather than raw state) is recommended, since with managed state Flink is able to automatically redistribute state when the parallelism is changed, and also do better memory management.

Attention If your managed state needs custom serialization logic, please see the corresponding guide in order to ensure future compatibility. Flink’s default serializers don’t need special treatment.

Using Managed Keyed State

The managed keyed state interface provides access to different types of state that are all scoped to the key of the current input element. This means that this type of state can only be used on a KeyedStream, which can be created via stream.keyBy(…).

Now, we will first look at the different types of state available and then we will see how they can be used in a program. The available state primitives are:

  • ValueState<T>: This keeps a value that can be updated and retrieved (scoped to the key of the input element as mentioned above, so there will possibly be one value for each key that the operation sees). The value can be set using update(T) and retrieved using T value().

  • ListState<T>: This keeps a list of elements. You can append elements and retrieve an Iterable over all currently stored elements. Elements are added using add(T) or addAll(List<T>); the Iterable can be retrieved using Iterable<T> get(). You can also override the existing list with update(List<T>).

  • ReducingState<T>: This keeps a single value that represents the aggregation of all values added to the state. The interface is similar to ListState but elements added using add(T) are reduced to an aggregate using a specified ReduceFunction.

  • AggregatingState<IN, OUT>: This keeps a single value that represents the aggregation of all values added to the state. Contrary to ReducingState, the aggregate type may be different from the type of elements that are added to the state. The interface is the same as for ListState but elements added using add(IN) are aggregated using a specified AggregateFunction.

  • FoldingState<T, ACC>: This keeps a single value that represents the aggregation of all values added to the state. Contrary to ReducingState, the aggregate type may be different from the type of elements that are added to the state. The interface is similar to ListState but elements added using add(T) are folded into an aggregate using a specified FoldFunction.

  • MapState<UK, UV>: This keeps a list of mappings. You can put key-value pairs into the state and retrieve an Iterable over all currently stored mappings. Mappings are added using put(UK, UV) or putAll(Map<UK, UV>). The value associated with a user key can be retrieved using get(UK). The iterable views for mappings, keys and values can be retrieved using entries(), keys() and values() respectively.

All types of state also have a method clear() that clears the state for the currently active key, i.e. the key of the input element.

Attention FoldingState and FoldingStateDescriptor have been deprecated in Flink 1.4 and will be completely removed in the future. Please use AggregatingState and AggregatingStateDescriptor instead.

It is important to keep in mind that these state objects are only used for interfacing with state. The state is not necessarily stored inside but might reside on disk or somewhere else. The second thing to keep in mind is that the value you get from the state depends on the key of the input element. So the value you get in one invocation of your user function can differ from the value in another invocation if the keys involved are different.

To get a state handle, you have to create a StateDescriptor. This holds the name of the state (as we will see later, you can create several states, and they have to have unique names so that you can reference them), the type of the values that the state holds, and possibly a user-specified function, such as a ReduceFunction. Depending on what type of state you want to retrieve, you create either a ValueStateDescriptor, a ListStateDescriptor, a ReducingStateDescriptor, a FoldingStateDescriptor or a MapStateDescriptor.

State is accessed using the RuntimeContext, so it is only possible in rich functions. Please see here for information about that, but we will also see an example shortly. The RuntimeContext that is available in a RichFunction has these methods for accessing state:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

This is an example FlatMapFunction that shows how all of the parts fit together:
Java:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)
Scala:

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}

object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleManagedState")
}

This example implements a poor man’s counting window. We key the tuples by the first field (in the example all have the same key 1). The function stores the count and a running sum in a ValueState. Once the count reaches 2 it will emit the average and clear the state so that we start over from 0. Note that this would keep a different state value for each different input key if we had tuples with different values in the first field.
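
The official example above uses a ValueState. As a brief illustration of one of the other primitives, here is a minimal sketch of a MapState; the class name, state name and record layout are made up for this illustration and are not part of the official examples. Applied on a stream keyed by user id, it counts visits per page within each key and emits the updated count:

// Hypothetical sketch: per user (the stream key), count visits per page in a MapState.
public class PerPageVisitCounter extends RichFlatMapFunction<Tuple2<String, String>, Tuple3<String, String, Long>> {

    private transient MapState<String, Long> visitsPerPage;

    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<>("visits-per-page", String.class, Long.class);
        visitsPerPage = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, String> input, Collector<Tuple3<String, String, Long>> out) throws Exception {
        String page = input.f1;
        Long current = visitsPerPage.get(page); // null if there is no mapping for this page yet
        long updated = (current == null ? 0L : current) + 1;
        visitsPerPage.put(page, updated);
        out.collect(Tuple3.of(input.f0, page, updated));
        // visitsPerPage.clear() would drop all mappings for the currently active key
    }
}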

State Time-To-Live (TTL)

A time-to-live (TTL) can be assigned to the keyed state of any type. If a TTL is configured and a state value has expired, the stored value will be cleaned up on a best-effort basis, which is discussed in more detail below.

All state collection types support per-entry TTLs. This means that list elements and map entries expire independently.

In order to use state TTL one must first build a StateTtlConfig configuration object. The TTL functionality can then be enabled in any state descriptor by passing the configuration:

Java:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
Scala:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.seconds(1))
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build

val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

The configuration has several options to consider:

The first parameter of the newBuilder method is mandatory; it is the time-to-live value.

The update type configures when the state TTL is refreshed (by default OnCreateAndWrite):

  • StateTtlConfig.UpdateType.OnCreateAndWrite - only on creation and write access
  • StateTtlConfig.UpdateType.OnReadAndWrite - also on read access

The state visibility configures whether the expired value is returned on read access if it is not cleaned up yet (by default NeverReturnExpired):

  • StateTtlConfig.StateVisibility.NeverReturnExpired - expired value is never returned

  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - returned if still available

In case of NeverReturnExpired, the expired state behaves as if it does not exist anymore, even if it still has to be removed. The option can be useful for use cases where data has to become unavailable for read access strictly after TTL, e.g. an application working with privacy-sensitive data.

The other option, ReturnExpiredIfNotCleanedUp, allows the expired state to be returned before its cleanup.

Notes:

  • The state backends store the timestamp of the last modification along with the user value, which means that enabling this feature increases consumption of state storage. Heap state backend stores an additional Java object with a reference to the user state object and a primitive long value in memory. The RocksDB state backend adds 8 bytes per stored value, list entry or map entry.

  • Only TTLs in reference to processing time are currently supported.

  • Trying to restore state that was previously configured without TTL using a TTL-enabled descriptor, or vice versa, will lead to a compatibility failure and a StateMigrationException.

  • The TTL configuration is not part of checkpoints or savepoints but rather a way of how Flink treats it in the currently running job.

  • The map state with TTL currently supports null user values only if the user value serializer can handle null values. If the serializer does not support null values, it can be wrapped with NullableSerializer at the cost of an extra byte in the serialized form.

Cleanup of Expired State

By default, expired values are only removed when they are read out explicitly, e.g. by calling ValueState.value().

Attention This means that by default if expired state is not read, it won’t be removed, possibly leading to ever growing state. This might change in future releases.

Cleanup in full snapshot

Additionally, you can activate the cleanup at the moment of taking the full state snapshot, which will reduce its size. The local state is not cleaned up under the current implementation, but it will not include the removed expired state in case of restoration from the previous snapshot. It can be configured in StateTtlConfig:

Java:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();
Scala:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.seconds(1))
  .cleanupFullSnapshot
  .build

This option is not applicable for the incremental checkpointing in the RocksDB state backend.

Notes:

  • For existing jobs, this cleanup strategy can be activated or deactivated anytime in StateTtlConfig, e.g. after restart from savepoint.

Cleanup in background

Besides cleanup in full snapshot, you can also activate the cleanup in background. The following option will activate a default background cleanup in StateTtlConfig if it is supported for the used backend:

Java:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInBackground()
    .build();
Scala:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.seconds(1))
  .cleanupInBackground
  .build

For more fine-grained control over some special cleanup in background, you can configure it separately as described below. Currently, the heap state backend relies on incremental cleanup and the RocksDB backend uses the compaction filter for background cleanup.

Incremental cleanup

Another option is to trigger cleanup of some state entries incrementally. The trigger can be a callback from each state access and/or each record processing. If this cleanup strategy is active for a certain state, the storage backend keeps a lazy global iterator for this state over all its entries. Every time incremental cleanup is triggered, the iterator is advanced. The traversed state entries are checked and expired ones are cleaned up.

This feature can be activated in StateTtlConfig:

Java:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    // check up to 10 entries per cleanup trigger, and also trigger cleanup for each processed record
    .cleanupIncrementally(10, true)
    .build();
Scala:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.seconds(1))
  // check up to 10 entries per cleanup trigger, and also trigger cleanup for each processed record
  .cleanupIncrementally(10, true)
  .build

This strategy has two parameters. The first one is the number of checked state entries per cleanup triggering. If the strategy is enabled, cleanup is always triggered on each state access. The second parameter defines whether to additionally trigger cleanup on each record processing. If you enable the default background cleanup, this strategy is activated for the heap backend with 5 checked entries and without cleanup per record processing.

Notes:

  • If no access happens to the state or no records are processed, expired state will persist.
  • Time spent for the incremental cleanup increases record processing latency.
  • At the moment incremental cleanup is implemented only for Heap state backend. Setting it for RocksDB will have no effect.
  • If the heap state backend is used with synchronous snapshotting, the global iterator keeps a copy of all keys while iterating because its specific implementation does not support concurrent modifications. Enabling this feature will then increase memory consumption. Asynchronous snapshotting does not have this problem.
  • For existing jobs, this cleanup strategy can be activated or deactivated anytime in StateTtlConfig, e.g. after restart from savepoint.

Cleanup during RocksDB compaction

If the RocksDB state backend is used, another cleanup strategy is to activate the Flink-specific compaction filter. RocksDB periodically runs asynchronous compactions to merge state updates and reduce storage. The Flink compaction filter checks the expiration timestamp of state entries with TTL and excludes expired values.

This feature is disabled by default. It first has to be activated for the RocksDB backend by setting the Flink configuration option state.backend.rocksdb.ttl.compaction.filter.enabled or by calling RocksDBStateBackend::enableTtlCompactionFilter if a custom RocksDB state backend is created for a job (a short activation sketch follows the snippets below). Then any state with TTL can be configured to use the filter:

Java:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();
Scala:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.seconds(1))
  .cleanupInRocksdbCompactFilter(1000)
  .build
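
As mentioned above, the filter itself also has to be activated on the RocksDB state backend. The following is a hedged sketch of the programmatic path, assuming a StreamExecutionEnvironment env and a custom RocksDB state backend; the checkpoint URI is a placeholder and exception handling is omitted. Alternatively, set state.backend.rocksdb.ttl.compaction.filter.enabled to true in the Flink configuration:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

// assumption: a custom RocksDB state backend is created for this job
RocksDBStateBackend backend = new RocksDBStateBackend("file:///checkpoint-dir");
backend.enableTtlCompactionFilter();
env.setStateBackend(backend);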

The RocksDB compaction filter queries the current timestamp, used to check expiration, from Flink every time after processing a certain number of state entries. You can change this number by passing a custom value to the StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) method. Updating the timestamp more often can improve cleanup speed, but it decreases compaction performance because it uses a JNI call from native code. If you enable the default background cleanup, this strategy is activated for the RocksDB backend and the current timestamp is queried each time 1000 entries have been processed.

You can activate debug logs from the native code of the RocksDB filter by activating the debug level for FlinkCompactionFilter:

log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

Notes:

  • Calling the TTL filter during compaction slows it down. The TTL filter has to parse the timestamp of the last access and check its expiration for every stored state entry of the key being compacted. In case of a collection state type (list or map), the check is also invoked per stored element.
  • If this feature is used with a list state whose elements have a non-fixed byte length, the native TTL filter additionally has to call the Flink Java type serializer of the element over JNI for each state entry where at least the first element has expired, in order to determine the offset of the next unexpired element.
  • For existing jobs, this cleanup strategy can be activated or deactivated anytime in StateTtlConfig, e.g. after restart from savepoint.

State in the Scala DataStream API

In addition to the interface described above, the Scala API has shortcuts for stateful map() or flatMap() functions with a single ValueState on KeyedStream. The user function gets the current value of the ValueState in an Option and must return an updated value that will be used to update the state.

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

Using Managed Operator State

To use managed operator state, a stateful function can implement either the more general CheckpointedFunction interface or the ListCheckpointed<T extends Serializable> interface.

CheckpointedFunction

The CheckpointedFunction interface provides access to non-keyed state with different redistribution schemes. It requires the implementation of two methods:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

Whenever a checkpoint has to be performed, snapshotState() is called. The counterpart, initializeState(), is called every time the user-defined function is initialized, be that when the function is first initialized or be that when the function is actually recovering from an earlier checkpoint. Given this, initializeState() is not only the place where different types of state are initialized, but also where state recovery logic is included.

Currently, list-style managed operator state is supported. The state is expected to be a List of serializable objects, independent from each other, thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which non-keyed state can be redistributed. Depending on the state accessing method, the following redistribution schemes are defined:

  • Even-split redistribution: Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators. Each operator gets a sublist, which can be empty, or contain one or more elements. As an example, if with parallelism 1 the checkpointed state of an operator contains elements element1 and element2, when increasing the parallelism to 2, element1 may end up in operator instance 0, while element2 will go to operator instance 1.

  • Union redistribution: Each operator returns a List of state elements. The whole state is logically a concatenation of all lists. On restore/redistribution, each operator gets the complete list of state elements.

Below is an example of a stateful SinkFunction that uses CheckpointedFunction to buffer elements before sending them to the outside world. It demonstrates the basic even-split redistribution list state:

Java:

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element : bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
Scala:

class BufferingSink(threshold: Int = 0)
  extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

  @transient
  private var checkpointedState: ListState[(String, Int)] = _

  private val bufferedElements = ListBuffer[(String, Int)]()

  override def invoke(value: (String, Int), context: Context): Unit = {
    bufferedElements += value
    if (bufferedElements.size == threshold) {
      for (element <- bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear()
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )

    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    if (context.isRestored) {
      for (element <- checkpointedState.get()) {
        bufferedElements += element
      }
    }
  }
}

The initializeState method takes as argument a FunctionInitializationContext. This is used to initialize the non-keyed state “containers”. These are containers of type ListState where the non-keyed state objects are going to be stored upon checkpointing.

Note how the state is initialized, similar to keyed state, with a StateDescriptor that contains the state name and information about the type of the value that the state holds:

Java:

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

Scala:

val descriptor = new ListStateDescriptor[(String, Int)](
  "buffered-elements",
  TypeInformation.of(new TypeHint[(String, Int)]() {})
)

checkpointedState = context.getOperatorStateStore.getListState(descriptor)

The naming convention of the state access methods contains the redistribution pattern followed by the state structure. For example, to use list state with the union redistribution scheme on restore, access the state by using getUnionListState(descriptor). If the method name does not contain the redistribution pattern, e.g. getListState(descriptor), it simply implies that the basic even-split redistribution scheme will be used.
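
As a minimal sketch (reusing the descriptor and context from the example above), the only difference for union redistribution is the access method:

// every parallel instance gets the complete list of state elements on restore
ListState<Tuple2<String, Integer>> unionState =
    context.getOperatorStateStore().getUnionListState(descriptor);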

After initializing the container, we use the isRestored() method of the context to check if we arerecovering after a failure. If this is true, i.e. we are recovering, the restore logic is applied.

As shown in the code of the modified BufferingSink, this ListState recovered during state initialization is kept in a class variable for future use in snapshotState(). There the ListState is cleared of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.

As a side note, the keyed state can also be initialized in the initializeState() method. This can be done using the provided FunctionInitializationContext.
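
A hedged sketch of that side note (the state name and type here are illustrative only): keyed state is registered through the keyed state store of the same context, which is only available if the function runs on a KeyedStream.

// only valid on a keyed stream; otherwise the keyed state store is not available
ValueState<Long> perKeyCount = context.getKeyedStateStore().getState(
    new ValueStateDescriptor<>("per-key-count", Long.class));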

ListCheckpointed

The ListCheckpointed interface is a more limited variant of CheckpointedFunction, which only supports list-style state with the even-split redistribution scheme on restore. It also requires the implementation of two methods:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

On snapshotState() the operator should return a list of objects to checkpoint and restoreState has to handle such a list upon recovery. If the state is not re-partitionable, you can always return a Collections.singletonList(MY_STATE) in snapshotState().

Stateful Source Functions

Stateful sources require a bit more care as opposed to other operators. In order to make the updates to the state and output collection atomic (required for exactly-once semantics on failure/recovery), the user is required to get a lock from the source’s context.

Java:

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /** current offset for exactly once semantics */
    private Long offset = 0L;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state) {
            offset = s;
        }
    }
}
Scala:

class CounterSource
    extends RichParallelSourceFunction[Long]
    with ListCheckpointed[Long] {

  @volatile
  private var isRunning = true

  private var offset = 0L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    val lock = ctx.getCheckpointLock

    while (isRunning) {
      // output and state update are atomic
      lock.synchronized({
        ctx.collect(offset)
        offset += 1
      })
    }
  }

  override def cancel(): Unit = isRunning = false

  override def restoreState(state: util.List[Long]): Unit =
    for (s <- state) {
      offset = s
    }

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
    Collections.singletonList(offset)
}

Some operators might need to know when a checkpoint is fully acknowledged by Flink in order to communicate that to the outside world. In this case see the org.apache.flink.runtime.state.CheckpointListener interface.
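
As an illustration, a hedged sketch of such an operator (the class name and method bodies are made up for illustration); it implements notifyCheckpointComplete in addition to its snapshot logic:

public class CommitOnCheckpointFunction
        implements CheckpointedFunction, CheckpointListener {

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // write pending data into managed operator state here
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // register state handles and restore any pending data here
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // the checkpoint with this id is fully acknowledged by Flink,
        // so it is safe to commit the corresponding side effects to the outside world
    }
}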