Pre-defined Timestamp Extractors / Watermark Emitters
As described in timestamps and watermark handling,Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. More specifically,one can do so by implementing one of the AssignerWithPeriodicWatermarks
and AssignerWithPunctuatedWatermarks
interfaces, dependingon the use case. In a nutshell, the first will emit watermarks periodically, while the second does so based on some property ofthe incoming records, e.g. whenever a special element is encountered in the stream.
In order to further ease the programming effort for such tasks, Flink comes with some pre-implemented timestamp assigners.This section provides a list of them. Apart from their out-of-the-box functionality, their implementation can serve as an examplefor custom implementations.
Assigners with ascending timestamps
The simplest special case for periodic watermark generation is the case where timestamps seen by a given source taskoccur in ascending order. In that case, the current timestamp can always act as a watermark, because no earlier timestamps willarrive.
Note that it is only necessary that timestamps are ascending per parallel data source task. For example, ifin a specific setup one Kafka partition is read by one parallel data source instance, then it is only necessary thattimestamps are ascending within each Kafka partition. Flink’s watermark merging mechanism will generate correctwatermarks whenever parallel streams are shuffled, unioned, connected, or merged.
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
Assigners allowing a fixed amount of lateness
Another example of periodic watermark generation is when the watermark lags behind the maximum (event-time) timestampseen in the stream by a fixed amount of time. This case covers scenarios where the maximum lateness that can be encountered in astream is known in advance, e.g. when creating a custom source containing elements with timestamps spread within a fixed period oftime for testing. For these cases, Flink provides the BoundedOutOfOrdernessTimestampExtractor
which takes as an argumentthe maxOutOfOrderness
, i.e. the maximum amount of time an element is allowed to be late before being ignored when computing thefinal result for the given window. Lateness corresponds to the result of t - t_w
, where t
is the (event-time) timestamp of anelement, and t_w
that of the previous watermark. If lateness > 0
then the element is considered late and is, by default, ignored when computingthe result of the job for its corresponding window. See the documentation about allowed latenessfor more information about working with late elements.
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
@Override
public long extractTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))