API Migration Guide
- Migrating from Flink 1.3+ to Flink 1.7
- Migrating from Flink 1.2 to Flink 1.3
- Migrating from Flink 1.1 to Flink 1.2
Migrating from Flink 1.3+ to Flink 1.7
API changes for serializer snapshots
This is mostly relevant for users implementing custom TypeSerializers for their state.
The old TypeSerializerConfigSnapshot abstraction is now deprecated in favor of the new TypeSerializerSnapshot, and will be fully removed in the future. For details and guides on how to migrate, please see Migrating from deprecated serializer snapshot APIs before Flink 1.7.
Migrating from Flink 1.2 to Flink 1.3
There are a few APIs that have been changed since Flink 1.2. Most of the changes are documented in their specific documentation. The following is a consolidated list of API changes and links to details for migration when upgrading to Flink 1.3.
TypeSerializer interface changes
This is mostly relevant for users implementing custom TypeSerializers for their state.
Since Flink 1.3, two additional methods have been added that are related to serializer compatibility across savepoint restores. Please see Handling serializer upgrades and compatibility for further details on how to implement these methods.
ProcessFunction is always a RichFunction
In Flink 1.2, ProcessFunction and its rich variant RichProcessFunction were introduced. Since Flink 1.3, RichProcessFunction has been removed and ProcessFunction is now always a RichFunction with access to the lifecycle methods and runtime context.
Flink CEP library API changes
The CEP library in Flink 1.3 ships with a number of new features which have led to some changes in the API. Please visit the CEP Migration docs for details.
Logger dependencies removed from Flink core artifacts
In Flink 1.3, to make sure that users can use their own custom logging framework, core Flink artifacts are now clean of specific logger dependencies.
Example and quickstart archetypes already have loggers specified and should not be affected. For other custom projects, make sure to add logger dependencies. For example, in Maven's pom.xml, you can add:
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
Migrating from Flink 1.1 to Flink 1.2
As mentioned in the State documentation, Flink has two types of state: keyed and non-keyed state (also called operator state). Both types are available to both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1 function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the deprecation of the aligned window operators from Flink 1.1 (see Aligned Processing Time Window Operators).
The migration process will serve two goals:
- allow your functions to take advantage of the new features introduced in Flink 1.2, such as rescaling,
- make sure that your new Flink 1.2 job will be able to resume execution from a savepoint generated by its Flink 1.1 predecessor.
After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2 simply by taking a savepoint with your Flink 1.1 job and giving it to your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its Flink 1.1 predecessor left off.
Example User Functions
As running examples for the remainder of this document we will use the CountMapper and the BufferingSink functions. The first is an example of a function with keyed state, while the second has non-keyed state. The code for these two functions in Flink 1.1 is presented below:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    // keyed state: one counter per key of the input stream
    private transient ValueState<Integer> counter;

    private final int numberElements;

    public CountMapper(int numberElements) {
        this.numberElements = numberElements;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        counter = getRuntimeContext().getState(
            new ValueStateDescriptor<>("counter", Integer.class, 0));
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        int count = counter.value() + 1;
        counter.update(count);

        if (count % numberElements == 0) {
            out.collect(Tuple2.of(value.f0, count));
            counter.update(0); // reset to 0
        }
    }
}
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
        Checkpointed<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    // non-keyed state: elements buffered since the last flush
    private ArrayList<Tuple2<String, Integer>> bufferedElements;

    BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public ArrayList<Tuple2<String, Integer>> snapshotState(
            long checkpointId, long checkpointTimestamp) throws Exception {
        return bufferedElements;
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        bufferedElements.addAll(state);
    }
}
The CountMapper is a RichFlatMapFunction which assumes a grouped-by-key input stream of the form (word, 1). The function keeps a counter for each incoming key (ValueState<Integer> counter) and, when the number of occurrences of a certain word reaches the user-provided threshold (numberElements), a tuple is emitted containing the word itself and the number of occurrences, and the counter for that word is reset to 0.
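The per-key counting behaviour can be modelled without Flink. The following is a minimal sketch, not the CountMapper itself: the class KeyedCountSketch and its process method are hypothetical names, and a plain HashMap stands in for the keyed ValueState that Flink scopes to the current key on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Dependency-free model of CountMapper's per-key counting (hypothetical helper). */
class KeyedCountSketch {

    // Stand-in for keyed ValueState<Integer>: one counter per word (assumption for illustration).
    private final Map<String, Integer> counters = new HashMap<>();

    private final int numberElements;

    KeyedCountSketch(int numberElements) {
        if (numberElements < 1) {
            throw new IllegalArgumentException("numberElements must be at least 1");
        }
        this.numberElements = numberElements;
    }

    /** Counts each word and returns "(word, count)" for every word whose count reaches numberElements. */
    List<String> process(List<String> words) {
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            int count = counters.getOrDefault(word, 0) + 1;
            if (count % numberElements == 0) {
                emitted.add("(" + word + ", " + count + ")");
                count = 0; // reset, mirroring counter.update(0) in CountMapper
            }
            counters.put(word, count);
        }
        return emitted;
    }
}
```

Each word has its own counter, so occurrences of one word never advance the count of another.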
The BufferingSink is a SinkFunction that receives elements (potentially the output of the CountMapper) and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink. This is a common way to avoid many expensive calls to a database or an external storage system. To do the buffering in a fault-tolerant manner, the buffered elements are kept in a list (bufferedElements) which is periodically checkpointed.
State API Migration
To leverage the new features of Flink 1.2, the code above should be modified to use the new state abstractions. After making these changes, you will be able to change the parallelism of your job (scale up or down) and you are guaranteed that the new version of your job will start from where its predecessor left off.
Keyed State: Something to note before delving into the details of the migration process is that if your function has only keyed state, then the exact same code from Flink 1.1 also works for Flink 1.2 with full support for the new features and full backwards compatibility. Changes could be made for better code organization, but this is just a matter of style.
With that said, the rest of this section focuses on the non-keyed state.
Rescaling and new state abstractions
The first modification is the transition from the old Checkpointed<T extends Serializable> state interface to the new ones. In Flink 1.2, a stateful function can implement either the more general CheckpointedFunction interface, or the ListCheckpointed<T extends Serializable> interface, which is semantically closer to the old Checkpointed one.
In both cases, the non-keyed state is expected to be a List of serializable objects, independent from each other, thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the BufferingSink contains elements (test1, 2) and (test2, 2), when increasing the parallelism to 2, (test1, 2) may end up in task 0, while (test2, 2) will go to task 1.
More details on the principles behind rescaling of both keyed state and non-keyed state can be found in the State documentation.
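The redistribution described above can be pictured with a small, dependency-free model. This is only a sketch: the class RescaleSketch and its redistribute method are hypothetical, and the round-robin assignment is an assumption chosen for illustration, not a statement of how Flink assigns list elements to subtasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Dependency-free model of splitting list-style non-keyed state across subtasks. */
class RescaleSketch {

    /**
     * Assigns each checkpointed element to one of newParallelism subtasks.
     * Round-robin order is an assumption made for this illustration only.
     */
    static <T> List<List<T>> redistribute(List<T> checkpointed, int newParallelism) {
        if (newParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        // Every element is an independent unit, so it can go to any subtask.
        for (int i = 0; i < checkpointed.size(); i++) {
            perSubtask.get(i % newParallelism).add(checkpointed.get(i));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        List<String> state = Arrays.asList("(test1, 2)", "(test2, 2)");
        List<List<String>> assignment = redistribute(state, 2);
        for (int task = 0; task < assignment.size(); task++) {
            System.out.println("task " + task + " restores " + assignment.get(task));
        }
    }
}
```

Because the elements are independent of each other, no subtask needs to see the whole list to resume work; each one simply continues with the elements it was handed.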
ListCheckpointed
The ListCheckpointed interface requires the implementation of two methods:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
Their semantics are the same as their counterparts in the old Checkpointed interface. The only difference is that now snapshotState() should return a list of objects to checkpoint, as stated earlier, and restoreState() has to handle this list upon recovery. If the state is not re-partitionable, you can always return a Collections.singletonList(MY_STATE) in snapshotState(). The updated code for BufferingSink is included below:
public class BufferingSinkListCheckpointed implements
        SinkFunction<Tuple2<String, Integer>>,
        ListCheckpointed<Tuple2<String, Integer>>,
        CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSinkListCheckpointed(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        this.bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public List<Tuple2<String, Integer>> snapshotState(
            long checkpointId, long timestamp) throws Exception {
        // each list element can be redistributed independently upon rescaling
        return this.bufferedElements;
    }

    @Override
    public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
        if (!state.isEmpty()) {
            this.bufferedElements.addAll(state);
        }
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        // this is from the CheckpointedRestoring interface.
        this.bufferedElements.addAll(state);
    }
}
As shown in the code, the updated function also implements the CheckpointedRestoring interface. This is for backwards compatibility reasons and more details will be explained at the end of this section.
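For the non-re-partitionable case mentioned earlier (Collections.singletonList(MY_STATE)), the sketch below shows one way the two methods might fit together. It is an illustration under stated assumptions: ListCheckpointedStandIn is a local stand-in that copies the two method signatures quoted above so that the code compiles without Flink on the classpath, TotalTracker is a hypothetical function, and summing the restored entries is this sketch's own choice of merge logic.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

/**
 * Local stand-in (not the Flink interface) that copies the two
 * ListCheckpointed method signatures quoted in the text.
 */
interface ListCheckpointedStandIn<T extends Serializable> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    void restoreState(List<T> state) throws Exception;
}

/** Hypothetical function whose whole non-keyed state is a single running total. */
class TotalTracker implements ListCheckpointedStandIn<Long> {

    private long total;

    void add(long amount) {
        total += amount;
    }

    long getTotal() {
        return total;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // One list entry: the state is handed over as a single, indivisible unit.
        return Collections.singletonList(total);
    }

    @Override
    public void restoreState(List<Long> state) {
        // After a parallelism change the list may be empty or hold several entries;
        // summing them is the merge chosen for this sketch.
        total = 0;
        for (Long part : state) {
            total += part;
        }
    }
}
```

Writing restoreState() to accept any number of entries keeps the function usable even when the single-entry lists of several former subtasks end up in the same new subtask.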
CheckpointedFunction
The CheckpointedFunction interface again requires the implementation of two methods:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
As in Flink 1.1, snapshotState() is called whenever a checkpoint is performed, but now initializeState() (which is the counterpart of restoreState()) is called every time the user-defined function is initialized, rather than only in the case that we are recovering from a failure. Given this, initializeState() is not only the place where different types of state are initialized, but also where state recovery logic is included. An implementation of the CheckpointedFunction interface for BufferingSink is presented below.
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
        CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {

    private final int threshold;

    // container for the non-keyed state, initialized in initializeState()
    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // drop the contents of the previous checkpoint, then add the current buffer
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore()
            .getSerializableListState("buffered-elements");

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }

    @Override
    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
        // this is from the CheckpointedRestoring interface.
        this.bufferedElements.addAll(state);
    }
}
The initializeState() method takes as argument a FunctionInitializationContext. This is used to initialize the non-keyed state “container”. This is a container of type ListState where the non-keyed state objects are going to be stored upon checkpointing:
this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");
After initializing the container, we use the isRestored() method of the context to check if we are recovering after a failure. If this is true, i.e. we are recovering, the restore logic is applied.
As shown in the code of the modified BufferingSink, this ListState recovered during state initialization is kept in a class variable for future use in snapshotState(). There the ListState is cleared of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint.
As a side note, the keyed state can also be initialized in the initializeState() method. This can be done using the FunctionInitializationContext given as argument, instead of the RuntimeContext, as was the case in Flink 1.1. If the CheckpointedFunction interface were to be used in the CountMapper example, the old open() method could be removed and the new snapshotState() and initializeState() methods would look like this:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
        implements CheckpointedFunction {

    private transient ValueState<Integer> counter;

    private final int numberElements;

    public CountMapper(int numberElements) {
        this.numberElements = numberElements;
    }

    @Override
    public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
        int count = counter.value() + 1;
        counter.update(count);

        if (count % numberElements == 0) {
            out.collect(Tuple2.of(value.f0, count));
            counter.update(0); // reset to 0
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // all managed, nothing to do.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        counter = context.getKeyedStateStore().getState(
            new ValueStateDescriptor<>("counter", Integer.class, 0));
    }
}
Notice that the snapshotState() method is empty, as Flink itself takes care of snapshotting managed keyed state upon checkpointing.
Backwards compatibility with Flink 1.1
So far we have seen how to modify our functions to take advantage of the new features introduced by Flink 1.2. The question that remains is “Can I make sure that my modified (Flink 1.2) job will start from where my already running job from Flink 1.1 stopped?”
The answer is yes, and the way to do it is pretty straightforward. For the keyed state, you have to do nothing. Flink will take care of restoring the state from Flink 1.1. For the non-keyed state, your new function has to implement the CheckpointedRestoring interface, as shown in the code above. This has a single method, the familiar restoreState() from the old Checkpointed interface from Flink 1.1. As shown in the modified code of the BufferingSink, the restoreState() method is identical to its predecessor.
Aligned Processing Time Window Operators
In Flink 1.1, and only when operating on processing time with no specified evictor or trigger, the command timeWindow() on a keyed stream would instantiate a special type of WindowOperator. This could be either an AggregatingProcessingTimeWindowOperator or an AccumulatingProcessingTimeWindowOperator. Both of these operators are referred to as aligned window operators, as they assume their input elements arrive in order. This is valid when operating in processing time, as each element is timestamped with the wall-clock time at the moment it arrives at the window operator. These operators were restricted to using the memory state backend, and had optimized data structures for storing the per-window elements which leveraged the in-order arrival of input elements.
In Flink 1.2, the aligned window operators are deprecated, and all windowing operations go through the generic WindowOperator. This migration requires no change in the code of your Flink 1.1 job, as Flink will transparently read the state stored by the aligned window operators in your Flink 1.1 savepoint, translate it into a format that is compatible with the generic WindowOperator, and resume execution using the generic WindowOperator.
Note: Although deprecated, you can still use the aligned window operators in Flink 1.2 through special WindowAssigners introduced for exactly this purpose. These assigners are the SlidingAlignedProcessingTimeWindows and the TumblingAlignedProcessingTimeWindows assigners, for sliding and tumbling windows respectively. A Flink 1.2 job that uses aligned windowing has to be a new job, as there is no way to resume execution from a Flink 1.1 savepoint while using these operators.
Attention: The aligned window operators provide no rescaling capabilities and no backwards compatibility with Flink 1.1.
The code to use the aligned window operators in Flink 1.2 is presented below:
// Java: for tumbling windows
DataStream<Tuple2<String, Integer>> window1 = source
    .keyBy(0)
    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
    .apply(your-function);

// Java: for sliding windows
DataStream<Tuple2<String, Integer>> window2 = source
    .keyBy(0)
    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
    .apply(your-function);

// Scala: for tumbling windows
val window1 = source
    .keyBy(0)
    .window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS)))
    .apply(your-function)

// Scala: for sliding windows
val window2 = source
    .keyBy(0)
    .window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
    .apply(your-function)