Time Attributes

Flink can process data based on different notions of time.

  • Processing time refers to the system time of the machine that is executing the respective operation (also known as wall-clock time, e.g. Java’s System.currentTimeMillis()).
  • Event time refers to the processing of streaming data based on timestamps that are attached to each row. The timestamps can encode when an event happened.

For more information about time handling in Flink, see the introduction about event time and watermarks.

Introduction to Time Attributes

Time attributes can be part of every table schema. They are defined when creating a table from a CREATE TABLE DDL or a DataStream. Once a time attribute is defined, it can be referenced as a field and used in time-based operations. As long as a time attribute is not modified, and is simply forwarded from one part of a query to another, it remains a valid time attribute. Time attributes behave like regular timestamps, and are accessible for calculations. When used in calculations, time attributes are materialized and act as standard timestamps. However, ordinary timestamps cannot be used in place of, or be converted to, time attributes.
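
As an illustration, the following Java sketch shows the difference between forwarding a time attribute and materializing it. This is a minimal sketch, not code from this page: it assumes the user_actions table from the DDL examples below, whose user_action_time column is declared as an event time attribute, and static imports of $ and lit from org.apache.flink.table.api.Expressions.

    // a minimal sketch; `user_actions` and its event time attribute
    // `user_action_time` are assumed to be defined as in the DDL examples below
    Table actions = tEnv.from("user_actions");

    // forwarding the attribute unchanged keeps it valid for time-based
    // operations such as windows
    Table forwarded = actions.select($("user_name"), $("user_action_time"));

    // using the attribute in a calculation materializes it: `shifted_time`
    // is an ordinary timestamp and no longer a time attribute
    Table materialized = actions.select(
        $("user_action_time").plus(lit(1).hours()).as("shifted_time"));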

Event Time

Event time allows a table program to produce results based on timestamps in every record, allowing for consistent results despite out-of-order or late events. It also ensures the replayability of the results of the table program when reading records from persistent storage.

Additionally, event time allows for unified syntax for table programs in both batch and streaming environments. A time attribute in a streaming environment can be a regular column of a row in a batch environment.

To handle out-of-order events and to distinguish between on-time and late events in streaming, Flink needs to know the timestamp for each row, and it also needs regular indications of how far the processing has progressed in event time (via so-called watermarks).

Event time attributes can be defined in CREATE TABLE DDL or during DataStream-to-Table conversion.

Defining in DDL

The event time attribute is defined using a WATERMARK statement in CREATE TABLE DDL. The WATERMARK statement defines a watermark generation expression on an existing event time field, which marks that field as the event time attribute. Please see CREATE TABLE DDL for more information about the WATERMARK statement and watermark strategies.

Flink supports defining the event time attribute on TIMESTAMP and TIMESTAMP_LTZ columns. If the timestamp data in the source is represented as year-month-day-hour-minute-second, usually a string value without time-zone information, e.g. 2020-04-15 20:13:40.564, it’s recommended to define the event time attribute as a TIMESTAMP column:

    CREATE TABLE user_actions (
      user_name STRING,
      data STRING,
      user_action_time TIMESTAMP(3),
      -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
      WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
    ) WITH (
      ...
    );

    SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
    FROM user_actions
    GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

If the timestamp data in the source is represented as epoch time, usually a long value, e.g. 1618989564564, it’s recommended to define the event time attribute as a TIMESTAMP_LTZ column:

    CREATE TABLE user_actions (
      user_name STRING,
      data STRING,
      ts BIGINT,
      time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
      -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
      WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
    ) WITH (
      ...
    );

    SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
    FROM user_actions
    GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

During DataStream-to-Table Conversion

When converting a DataStream to a table, an event time attribute can be defined with the .rowtime property during schema definition. Timestamps and watermarks must have already been assigned in the DataStream being converted. During the conversion, Flink always derives the rowtime attribute as TIMESTAMP WITHOUT TIME ZONE, because a DataStream has no notion of time zone, and treats all event time values as UTC.
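
Such an upstream assignment might look like the following Java sketch. The tuple type, field layout, and the 5-second out-of-orderness bound are illustrative assumptions, not requirements:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    // a minimal sketch of assigning timestamps and watermarks before conversion,
    // assuming tuples whose first field `f0` holds an epoch-millisecond timestamp
    DataStream<Tuple3<Long, String, String>> withTimestampsAndWatermarks = inputStream
        .assignTimestampsAndWatermarks(
            WatermarkStrategy
                // tolerate events arriving up to 5 seconds out of order
                .<Tuple3<Long, String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // extract the event timestamp from the first tuple field
                .withTimestampAssigner((event, previousTimestamp) -> event.f0));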

There are two ways of defining the time attribute when converting a DataStream into a Table. Depending on whether the specified .rowtime field name exists in the schema of the DataStream, the timestamp is either (1) appended as a new column, or it (2) replaces an existing column.

In either case, the event time timestamp field will hold the value of the DataStream event time timestamp.

Java

    // Option 1:

    // extract timestamp and assign watermarks based on knowledge of the stream
    DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

    // declare an additional logical field as an event time attribute
    Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());

    // Option 2:

    // extract timestamp from first field, and assign watermarks based on knowledge of the stream
    DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

    // the first field has been used for timestamp extraction, and is no longer necessary
    // replace first field with a logical event time attribute
    Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));

    // Usage:

    WindowedTable windowedTable = table.window(Tumble
        .over(lit(10).minutes())
        .on($("user_action_time"))
        .as("userActionWindow"));

Scala

    // Option 1:

    // extract timestamp and assign watermarks based on knowledge of the stream
    val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)

    // declare an additional logical field as an event time attribute
    val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime)

    // Option 2:

    // extract timestamp from first field, and assign watermarks based on knowledge of the stream
    val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)

    // the first field has been used for timestamp extraction, and is no longer necessary
    // replace first field with a logical event time attribute
    val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")

    // Usage:

    val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

Python

    # Option 1:

    # extract timestamp and assign watermarks based on knowledge of the stream
    stream = input_stream.assign_timestamps_and_watermarks(...)

    # declare an additional logical field as an event time attribute
    table = t_env.from_data_stream(stream, col("user_name"), col("data"), col("user_action_time").rowtime)

    # Option 2:

    # extract timestamp from first field, and assign watermarks based on knowledge of the stream
    stream = input_stream.assign_timestamps_and_watermarks(...)

    # the first field has been used for timestamp extraction, and is no longer necessary
    # replace first field with a logical event time attribute
    table = t_env.from_data_stream(stream, col("user_action_time").rowtime, col("user_name"), col("data"))

    # Usage:

    table.window(Tumble.over(lit(10).minutes).on(col("user_action_time")).alias("userActionWindow"))

Processing Time

Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time, but it will generate non-deterministic results. Processing time does not require timestamp extraction or watermark generation.

There are two ways to define a processing time attribute.

Defining in DDL

The processing time attribute is defined as a computed column in CREATE TABLE DDL using the system PROCTIME() function; the function’s return type is TIMESTAMP_LTZ. Please see CREATE TABLE DDL for more information about computed columns.

    CREATE TABLE user_actions (
      user_name STRING,
      data STRING,
      user_action_time AS PROCTIME() -- declare an additional field as a processing time attribute
    ) WITH (
      ...
    );

    SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
    FROM user_actions
    GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

During DataStream-to-Table Conversion

The processing time attribute is defined with the .proctime property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it is only definable at the end of the schema definition.

Java

    DataStream<Tuple2<String, String>> stream = ...;

    // declare an additional logical field as a processing time attribute
    Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());

    WindowedTable windowedTable = table.window(
        Tumble.over(lit(10).minutes())
            .on($("user_action_time"))
            .as("userActionWindow"));

Scala

    val stream: DataStream[(String, String)] = ...

    // declare an additional logical field as a processing time attribute
    val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".proctime)

    val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time" as "userActionWindow")

Python

    stream = ...

    # declare an additional logical field as a processing time attribute
    table = t_env.from_data_stream(stream, col("user_name"), col("data"), col("user_action_time").proctime)

    windowed_table = table.window(Tumble.over(lit(10).minutes).on(col("user_action_time")).alias("userActionWindow"))