State Bootstrapping
Applications often require some initial state provided by historical data in a file, database, or other system. Because state is managed by Apache Flink’s snapshotting mechanism, for Stateful Functions applications this means writing the initial state into a savepoint that can be used to start the job. Users can bootstrap initial state for Stateful Functions applications using Flink’s State Processor API and a StatefulFunctionsSavepointCreator.
To get started, include the following libraries in your application:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-flink-state-processor</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
Attention: The savepoint creator currently only supports initializing the state for Java modules.
State Bootstrap Function
A StateBootstrapFunction defines how to bootstrap state for a StatefulFunction instance with a given input.
Each bootstrap function instance corresponds directly to a StatefulFunction type. Likewise, each instance is uniquely identified by an address, represented by the type and id of the function being bootstrapped. Any state persisted by a bootstrap function instance will be available to the corresponding live StatefulFunction instance that has the same address.
For example, consider the following state bootstrap function:
public class MyStateBootstrapFunction implements StateBootstrapFunction {

    @Persisted
    private PersistedValue<MyState> state = PersistedValue.of("my-state", MyState.class);

    @Override
    public void bootstrap(Context context, Object input) {
        state.set(extractStateFromInput(input));
    }
}
Assume that this bootstrap function was provided for function type MyFunctionType, and the id of the bootstrap function instance was id-13. The function writes persisted state of name my-state using the given bootstrap data. After restoring a Stateful Functions application from the savepoint generated using this bootstrap function, the stateful function instance with address (MyFunctionType, id-13) will already have state values available under state name my-state.
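The address scoping described above can be pictured with a small plain-Java model. This is only a sketch under stated assumptions: SketchAddress, SketchSavepoint, and AddressScopedStateSketch are hypothetical stand-ins invented for illustration, not part of the Stateful Functions API, and a real savepoint is of course not an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a function address: (function type, id).
final class SketchAddress {
    final String functionType;
    final String id;

    SketchAddress(String functionType, String id) {
        this.functionType = functionType;
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SketchAddress)) {
            return false;
        }
        SketchAddress other = (SketchAddress) o;
        return functionType.equals(other.functionType) && id.equals(other.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(functionType, id);
    }
}

// Hypothetical stand-in for a savepoint: values keyed by address, then by state name.
final class SketchSavepoint {
    private final Map<SketchAddress, Map<String, Object>> state = new HashMap<>();

    // Models a bootstrap function instance at `address` setting a persisted value.
    void write(SketchAddress address, String stateName, Object value) {
        state.computeIfAbsent(address, a -> new HashMap<>()).put(stateName, value);
    }

    // Models what the live function instance at `address` finds after restore.
    // Returns null when nothing was bootstrapped for that address and state name.
    Object read(SketchAddress address, String stateName) {
        Map<String, Object> perAddress = state.get(address);
        return perAddress == null ? null : perAddress.get(stateName);
    }
}

final class AddressScopedStateSketch {
    public static void main(String[] args) {
        SketchSavepoint savepoint = new SketchSavepoint();

        // Bootstrap phase: the instance for (MyFunctionType, id-13) writes "my-state".
        savepoint.write(new SketchAddress("MyFunctionType", "id-13"), "my-state", 4);

        // Restore phase: only the instance with the same address sees the value.
        System.out.println(savepoint.read(new SketchAddress("MyFunctionType", "id-13"), "my-state"));
        System.out.println(savepoint.read(new SketchAddress("MyFunctionType", "id-14"), "my-state"));
    }
}
```

In this model, the lookup for id-14 finds nothing because no bootstrap function instance with that address ever wrote state, which mirrors why the bootstrap data must be routed to the same address the live function will use.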
Creating A Savepoint
Savepoints are created by defining certain metadata, such as max parallelism and state backend. The default state backend is RocksDB.
int maxParallelism = 128;
StatefulFunctionsSavepointCreator newSavepoint = new StatefulFunctionsSavepointCreator(maxParallelism);
Each input data set is registered in the savepoint creator with a router that routes each record to zero or more function instances. You may then register any number of function types to the savepoint creator, similar to how functions are registered within a stateful functions module. Finally, specify an output location for the resulting savepoint.
// Read data from a file, database, or other location
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

final DataSet<Tuple2<String, Integer>> userSeenCounts = env.fromElements(
    Tuple2.of("foo", 4), Tuple2.of("bar", 3), Tuple2.of("joe", 2));

// Register the dataset with a router
newSavepoint.withBootstrapData(userSeenCounts, MyStateBootstrapFunctionRouter::new);

// Register a bootstrap function to process the records
newSavepoint.withStateBootstrapFunctionProvider(
    new FunctionType("apache", "my-function"),
    ignored -> new MyStateBootstrapFunction());

newSavepoint.write("file:///savepoint/path/");

env.execute();
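The snippet above references MyStateBootstrapFunctionRouter without showing it. As a rough illustration of the routing decision such a router makes, here is a plain-Java model with no Flink dependency. BootstrapRoutingSketch and Routed are hypothetical names, and using the user name as the function id and skipping non-positive counts are assumptions made for this example. It is not the Stateful Functions router interface itself.

```java
import java.util.ArrayList;
import java.util.List;

final class BootstrapRoutingSketch {

    // Hypothetical record of one routing decision: which instance receives which input.
    static final class Routed {
        final String functionType;
        final String id;
        final int seenCount;

        Routed(String functionType, String id, int seenCount) {
            this.functionType = functionType;
            this.id = id;
            this.seenCount = seenCount;
        }
    }

    // Models a router: each (user, seenCount) record goes to zero or more instances.
    static List<Routed> route(String user, int seenCount) {
        List<Routed> targets = new ArrayList<>();
        if (seenCount <= 0) {
            // Assumption: records without a positive count bootstrap nothing.
            return targets;
        }
        // Assumption: the user name doubles as the id of the target function instance.
        targets.add(new Routed("apache/my-function", user, seenCount));
        return targets;
    }

    public static void main(String[] args) {
        String[] users = {"foo", "bar", "joe"};
        int[] counts = {4, 3, 2};
        for (int i = 0; i < users.length; i++) {
            for (Routed r : route(users[i], counts[i])) {
                System.out.println(r.functionType + " / " + r.id + " <- " + r.seenCount);
            }
        }
    }
}
```

Whatever id the router chooses becomes the id part of the address whose state is bootstrapped, so it should match the id that the running application will use when messaging that function.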
For full details on how to use Flink’s DataSet API, please check the official documentation.
Deployment
Once created, the new savepoint can be used to provide the initial state for a Stateful Functions application.
When deploying based on an image, pass the -s argument to the Flink JobMaster image.
version: "2.1"
services:
  master:
    image: my-statefun-application-image
    command: -s file:///savepoint/path
When deploying to a Flink session cluster, specify the savepoint argument in the Flink CLI.
$ ./bin/flink run -s file:///savepoint/path stateful-functions-job.jar