Overview

Flink provides rich data types for date and time, including DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, INTERVAL YEAR TO MONTH, and INTERVAL DAY TO SECOND (see Date and Time for details). Flink also supports setting the time zone at the session level (see table.local-time-zone for details). Together, these timestamp types and the session time zone make it easy to process business data across time zones.

TIMESTAMP vs TIMESTAMP_LTZ

TIMESTAMP type

  • TIMESTAMP(p) is an abbreviation for TIMESTAMP(p) WITHOUT TIME ZONE. The precision p ranges from 0 to 9 and defaults to 6.
  • TIMESTAMP describes a timestamp that represents year, month, day, hour, minute, second, and fractional seconds.
  • TIMESTAMP can be specified from a string literal, e.g.
    Flink SQL> SELECT TIMESTAMP '1970-01-01 00:00:04.001';
    +-------------------------+
    | 1970-01-01 00:00:04.001 |
    +-------------------------+

TIMESTAMP_LTZ type

  • TIMESTAMP_LTZ(p) is an abbreviation for TIMESTAMP(p) WITH LOCAL TIME ZONE. The precision p ranges from 0 to 9 and defaults to 6.
  • TIMESTAMP_LTZ describes an absolute point in time on the timeline. It stores a long value representing epoch milliseconds and an int representing nanosecond-of-millisecond. The epoch time is measured from the standard Java epoch of 1970-01-01T00:00:00Z. Every datum of TIMESTAMP_LTZ type is interpreted in the local time zone configured in the current session for computation and visualization.
  • TIMESTAMP_LTZ has no literal representation and therefore cannot be specified from a literal. It can be derived from a long epoch time (e.g. the long value produced by Java's System.currentTimeMillis()):
    Flink SQL> CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(4001, 3);
    Flink SQL> SET table.local-time-zone=UTC;
    Flink SQL> SELECT * FROM T1;
    +---------------------------+
    | TO_TIMESTAMP_LTZ(4001, 3) |
    +---------------------------+
    |   1970-01-01 00:00:04.001 |
    +---------------------------+
    Flink SQL> SET table.local-time-zone=Asia/Shanghai;
    Flink SQL> SELECT * FROM T1;
    +---------------------------+
    | TO_TIMESTAMP_LTZ(4001, 3) |
    +---------------------------+
    |   1970-01-01 08:00:04.001 |
    +---------------------------+
  • TIMESTAMP_LTZ can be used for business logic that spans time zones, because an absolute point in time (e.g. the 4001 milliseconds above) describes the same instant in every time zone. As background: at any given moment, System.currentTimeMillis() returns the same value on every machine in the world (e.g. the 4001 milliseconds in the example above). This is what "absolute point in time" means.
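
The behavior described above can be reproduced outside Flink with plain java.time, which may help build intuition. The following is a minimal sketch, not Flink code: the class name LtzRendering and the method render are hypothetical, and the only assumption is that a TIMESTAMP_LTZ value behaves like an absolute instant that is rendered in whichever zone the session selects.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: one absolute instant, two renderings.
class LtzRendering {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Renders an epoch-millisecond instant as wall-clock text in the given zone.
    static String render(long epochMillis, String zoneId) {
        return Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneId.of(zoneId))
                .format(FORMAT);
    }

    public static void main(String[] args) {
        long epochMillis = 4001L; // same absolute point as TO_TIMESTAMP_LTZ(4001, 3)
        System.out.println(render(epochMillis, "UTC"));           // 1970-01-01 00:00:04.001
        System.out.println(render(epochMillis, "Asia/Shanghai")); // 1970-01-01 08:00:04.001
    }
}
```

The stored value never changes between the two calls; only the zone used for display differs, which mirrors switching table.local-time-zone between the two SELECT statements.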

Time Zone Usage

The local time zone defines the time zone ID of the current session. You can configure the time zone in the SQL Client or in applications.

SQL Client

    -- set to UTC time zone
    Flink SQL> SET table.local-time-zone=UTC;
    -- set to Shanghai time zone
    Flink SQL> SET table.local-time-zone=Asia/Shanghai;
    -- set to Los_Angeles time zone
    Flink SQL> SET table.local-time-zone=America/Los_Angeles;

Java

    import java.time.ZoneId;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    EnvironmentSettings envSetting = EnvironmentSettings.newInstance().build();
    TableEnvironment tEnv = TableEnvironment.create(envSetting);
    // set to UTC time zone
    tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
    // set to Shanghai time zone
    tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
    // set to Los_Angeles time zone
    tEnv.getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles"));

Scala

    import java.time.ZoneId
    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    val envSetting = EnvironmentSettings.newInstance().build()
    val tEnv = TableEnvironment.create(envSetting)
    // set to UTC time zone
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("UTC"))
    // set to Shanghai time zone
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
    // set to Los_Angeles time zone
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("America/Los_Angeles"))

The session time zone affects Flink SQL in the following main ways:

Deciding the return values of time functions

The following time functions are influenced by the configured time zone.

  • LOCALTIME
  • LOCALTIMESTAMP
  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • CURRENT_ROW_TIMESTAMP()
  • NOW()
  • PROCTIME()
    Flink SQL> SET sql-client.execution.result-mode=tableau;
    Flink SQL> CREATE VIEW MyView1 AS SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_ROW_TIMESTAMP(), NOW(), PROCTIME();
    Flink SQL> DESC MyView1;

    +------------------------+-----------------------------+-------+-----+--------+-----------+
    |                   name |                        type |  null | key | extras | watermark |
    +------------------------+-----------------------------+-------+-----+--------+-----------+
    |              LOCALTIME |                     TIME(0) | false |     |        |           |
    |         LOCALTIMESTAMP |                TIMESTAMP(3) | false |     |        |           |
    |           CURRENT_DATE |                        DATE | false |     |        |           |
    |           CURRENT_TIME |                     TIME(0) | false |     |        |           |
    |      CURRENT_TIMESTAMP |            TIMESTAMP_LTZ(3) | false |     |        |           |
    |CURRENT_ROW_TIMESTAMP() |            TIMESTAMP_LTZ(3) | false |     |        |           |
    |                  NOW() |            TIMESTAMP_LTZ(3) | false |     |        |           |
    |             PROCTIME() | TIMESTAMP_LTZ(3) *PROCTIME* | false |     |        |           |
    +------------------------+-----------------------------+-------+-----+--------+-----------+

    Flink SQL> SET table.local-time-zone=UTC;
    Flink SQL> SELECT * FROM MyView1;

    +-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
    | LOCALTIME |          LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME |       CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() |                   NOW() |              PROCTIME() |
    +-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
    |  15:18:36 | 2021-04-15 15:18:36.384 |   2021-04-15 |     15:18:36 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 | 2021-04-15 15:18:36.384 |
    +-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+

    Flink SQL> SET table.local-time-zone=Asia/Shanghai;
    Flink SQL> SELECT * FROM MyView1;

    +-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
    | LOCALTIME |          LOCALTIMESTAMP | CURRENT_DATE | CURRENT_TIME |       CURRENT_TIMESTAMP | CURRENT_ROW_TIMESTAMP() |                   NOW() |              PROCTIME() |
    +-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
    |  23:18:36 | 2021-04-15 23:18:36.384 |   2021-04-15 |     23:18:36 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 | 2021-04-15 23:18:36.384 |
    +-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+

TIMESTAMP_LTZ string representation

The session time zone is used when a TIMESTAMP_LTZ value is represented as a string, i.e. when printing the value, casting the value to STRING, casting the value to TIMESTAMP, or casting a TIMESTAMP value to TIMESTAMP_LTZ:

    Flink SQL> CREATE VIEW MyView2 AS SELECT TO_TIMESTAMP_LTZ(4001, 3) AS ltz, TIMESTAMP '1970-01-01 00:00:01.001' AS ntz;
    Flink SQL> DESC MyView2;

    +------+------------------+-------+-----+--------+-----------+
    | name |             type |  null | key | extras | watermark |
    +------+------------------+-------+-----+--------+-----------+
    |  ltz | TIMESTAMP_LTZ(3) |  true |     |        |           |
    |  ntz |     TIMESTAMP(3) | false |     |        |           |
    +------+------------------+-------+-----+--------+-----------+

    Flink SQL> SET table.local-time-zone=UTC;
    Flink SQL> SELECT * FROM MyView2;

    +-------------------------+-------------------------+
    |                     ltz |                     ntz |
    +-------------------------+-------------------------+
    | 1970-01-01 00:00:04.001 | 1970-01-01 00:00:01.001 |
    +-------------------------+-------------------------+

    Flink SQL> SET table.local-time-zone=Asia/Shanghai;
    Flink SQL> SELECT * FROM MyView2;

    +-------------------------+-------------------------+
    |                     ltz |                     ntz |
    +-------------------------+-------------------------+
    | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 |
    +-------------------------+-------------------------+

    Flink SQL> CREATE VIEW MyView3 AS SELECT ltz, CAST(ltz AS TIMESTAMP(3)), CAST(ltz AS STRING), ntz, CAST(ntz AS TIMESTAMP_LTZ(3)) FROM MyView2;

    Flink SQL> DESC MyView3;
    +-------------------------------+------------------+-------+-----+--------+-----------+
    |                          name |             type |  null | key | extras | watermark |
    +-------------------------------+------------------+-------+-----+--------+-----------+
    |                           ltz | TIMESTAMP_LTZ(3) |  true |     |        |           |
    |     CAST(ltz AS TIMESTAMP(3)) |     TIMESTAMP(3) |  true |     |        |           |
    |           CAST(ltz AS STRING) |           STRING |  true |     |        |           |
    |                           ntz |     TIMESTAMP(3) | false |     |        |           |
    | CAST(ntz AS TIMESTAMP_LTZ(3)) | TIMESTAMP_LTZ(3) | false |     |        |           |
    +-------------------------------+------------------+-------+-----+--------+-----------+

    Flink SQL> SELECT * FROM MyView3;

    +-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
    |                     ltz | CAST(ltz AS TIMESTAMP(3)) |     CAST(ltz AS STRING) |                     ntz | CAST(ntz AS TIMESTAMP_LTZ(3)) |
    +-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
    | 1970-01-01 08:00:04.001 |   1970-01-01 08:00:04.001 | 1970-01-01 08:00:04.001 | 1970-01-01 00:00:01.001 |       1970-01-01 00:00:01.001 |
    +-------------------------+---------------------------+-------------------------+-------------------------+-------------------------------+
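
The two casts can be pictured with java.time types: a TIMESTAMP behaves like a LocalDateTime and a TIMESTAMP_LTZ like an Instant, with the session zone bridging them. The sketch below is only an illustration under that assumption; the class SessionZoneCasts and its methods are hypothetical names, not Flink APIs.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical illustration of how the session zone bridges the two types.
class SessionZoneCasts {
    // Like CAST(ltz AS TIMESTAMP): read the instant as wall-clock time in the session zone.
    static LocalDateTime ltzToTimestamp(Instant ltz, ZoneId sessionZone) {
        return LocalDateTime.ofInstant(ltz, sessionZone);
    }

    // Like CAST(ntz AS TIMESTAMP_LTZ): treat the wall-clock time as local to the session zone.
    static Instant timestampToLtz(LocalDateTime ntz, ZoneId sessionZone) {
        return ntz.atZone(sessionZone).toInstant();
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        Instant ltz = Instant.ofEpochMilli(4001L);
        LocalDateTime ntz = LocalDateTime.parse("1970-01-01T00:00:01.001");

        System.out.println(ltzToTimestamp(ltz, shanghai));                // 1970-01-01T08:00:04.001
        System.out.println(timestampToLtz(ntz, shanghai).toEpochMilli()); // -28798999
    }
}
```

The negative epoch value in the second line shows why the last column of MyView3 still prints 00:00:01.001 in Shanghai: the wall-clock value was anchored to the session zone on the way in and is rendered in the same zone on the way out.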

Time Attribute and Time Zone

Please see Time Attribute for more information about time attribute.

Processing Time and Time Zone

Flink SQL defines the processing time attribute with the function PROCTIME(), whose return type is TIMESTAMP_LTZ.

Before Flink 1.13, the return type of PROCTIME() was TIMESTAMP, and the return value was the timestamp in the UTC time zone. For example, when the wall clock showed 2021-03-01 12:00:00 in Shanghai, PROCTIME() displayed 2021-03-01 04:00:00, which is wrong. Flink 1.13 fixes this issue by using TIMESTAMP_LTZ as the return type of PROCTIME(), so users no longer need to deal with time zone problems.

PROCTIME() always represents your local timestamp value, and using the TIMESTAMP_LTZ type also supports Daylight Saving Time well.

    Flink SQL> SET table.local-time-zone=UTC;
    Flink SQL> SELECT PROCTIME();

    +-------------------------+
    |              PROCTIME() |
    +-------------------------+
    | 2021-04-15 14:48:31.387 |
    +-------------------------+

    Flink SQL> SET table.local-time-zone=Asia/Shanghai;
    Flink SQL> SELECT PROCTIME();

    +-------------------------+
    |              PROCTIME() |
    +-------------------------+
    | 2021-04-15 22:48:31.387 |
    +-------------------------+

    Flink SQL> CREATE TABLE MyTable1 (
                 item STRING,
                 price DOUBLE,
                 proctime as PROCTIME()
               ) WITH (
                 'connector' = 'socket',
                 'hostname' = '127.0.0.1',
                 'port' = '9999',
                 'format' = 'csv'
               );
    Flink SQL> CREATE VIEW MyView3 AS
               SELECT
                 TUMBLE_START(proctime, INTERVAL '10' MINUTES) AS window_start,
                 TUMBLE_END(proctime, INTERVAL '10' MINUTES) AS window_end,
                 TUMBLE_PROCTIME(proctime, INTERVAL '10' MINUTES) as window_proctime,
                 item,
                 MAX(price) as max_price
               FROM MyTable1
               GROUP BY TUMBLE(proctime, INTERVAL '10' MINUTES), item;
    Flink SQL> DESC MyView3;

    +-----------------+-----------------------------+-------+-----+--------+-----------+
    |            name |                        type |  null | key | extras | watermark |
    +-----------------+-----------------------------+-------+-----+--------+-----------+
    |    window_start |                TIMESTAMP(3) | false |     |        |           |
    |      window_end |                TIMESTAMP(3) | false |     |        |           |
    | window_proctime | TIMESTAMP_LTZ(3) *PROCTIME* | false |     |        |           |
    |            item |                      STRING |  true |     |        |           |
    |       max_price |                      DOUBLE |  true |     |        |           |
    +-----------------+-----------------------------+-------+-----+--------+-----------+

Use the following command to ingest data for MyTable1 in a terminal:

    > nc -lk 9999
    A,1.1
    B,1.2
    A,1.8
    B,2.5
    C,3.8

    Flink SQL> SET table.local-time-zone=UTC;
    Flink SQL> SELECT * FROM MyView3;

    +-------------------------+-------------------------+-------------------------+------+-----------+
    |            window_start |              window_end |         window_proctime | item | max_price |
    +-------------------------+-------------------------+-------------------------+------+-----------+
    | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.005 |    A |       1.8 |
    | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 |    B |       2.5 |
    | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:10:00.007 |    C |       3.8 |
    +-------------------------+-------------------------+-------------------------+------+-----------+

    Flink SQL> SET table.local-time-zone=Asia/Shanghai;
    Flink SQL> SELECT * FROM MyView3;

This returns a different window start, window end, and window proctime than the calculation in the UTC time zone.

    +-------------------------+-------------------------+-------------------------+------+-----------+
    |            window_start |              window_end |         window_proctime | item | max_price |
    +-------------------------+-------------------------+-------------------------+------+-----------+
    | 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.005 |    A |       1.8 |
    | 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 |    B |       2.5 |
    | 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:10:00.007 |    C |       3.8 |
    +-------------------------+-------------------------+-------------------------+------+-----------+

Processing time windows are non-deterministic, so each run produces different windows and different aggregations. The example above only illustrates how the time zone affects a processing time window.

Event Time and Time Zone

Flink supports defining an event time attribute on either a TIMESTAMP column or a TIMESTAMP_LTZ column.

Event Time Attribute on TIMESTAMP

If the timestamp data in the source is represented as year-month-day-hour-minute-second, usually a string value without time-zone information, e.g. 2020-04-15 20:13:40.564, it’s recommended to define the event time attribute as a TIMESTAMP column:

    Flink SQL> CREATE TABLE MyTable2 (
                 item STRING,
                 price DOUBLE,
                 ts TIMESTAMP(3), -- TIMESTAMP data type
                 WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
               ) WITH (
                 'connector' = 'socket',
                 'hostname' = '127.0.0.1',
                 'port' = '9999',
                 'format' = 'csv'
               );
    Flink SQL> CREATE VIEW MyView4 AS
               SELECT
                 TUMBLE_START(ts, INTERVAL '10' MINUTES) AS window_start,
                 TUMBLE_END(ts, INTERVAL '10' MINUTES) AS window_end,
                 TUMBLE_ROWTIME(ts, INTERVAL '10' MINUTES) as window_rowtime,
                 item,
                 MAX(price) as max_price
               FROM MyTable2
               GROUP BY TUMBLE(ts, INTERVAL '10' MINUTES), item;
    Flink SQL> DESC MyView4;

    +----------------+------------------------+------+-----+--------+-----------+
    |           name |                   type | null | key | extras | watermark |
    +----------------+------------------------+------+-----+--------+-----------+
    |   window_start |           TIMESTAMP(3) | true |     |        |           |
    |     window_end |           TIMESTAMP(3) | true |     |        |           |
    | window_rowtime | TIMESTAMP(3) *ROWTIME* | true |     |        |           |
    |           item |                 STRING | true |     |        |           |
    |      max_price |                 DOUBLE | true |     |        |           |
    +----------------+------------------------+------+-----+--------+-----------+

Use the following command to ingest data for MyTable2 in a terminal:

    > nc -lk 9999
    A,1.1,2021-04-15 14:01:00
    B,1.2,2021-04-15 14:02:00
    A,1.8,2021-04-15 14:03:00
    B,2.5,2021-04-15 14:04:00
    C,3.8,2021-04-15 14:05:00
    C,3.8,2021-04-15 14:11:00

    Flink SQL> SET table.local-time-zone=UTC;
    Flink SQL> SELECT * FROM MyView4;

    +-------------------------+-------------------------+-------------------------+------+-----------+
    |            window_start |              window_end |          window_rowtime | item | max_price |
    +-------------------------+-------------------------+-------------------------+------+-----------+
    | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    A |       1.8 |
    | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    B |       2.5 |
    | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    C |       3.8 |
    +-------------------------+-------------------------+-------------------------+------+-----------+

    Flink SQL> SET table.local-time-zone=Asia/Shanghai;
    Flink SQL> SELECT * FROM MyView4;

This returns the same window start, window end, and window rowtime as the calculation in the UTC time zone.

    +-------------------------+-------------------------+-------------------------+------+-----------+
    |            window_start |              window_end |          window_rowtime | item | max_price |
    +-------------------------+-------------------------+-------------------------+------+-----------+
    | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    A |       1.8 |
    | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    B |       2.5 |
    | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    C |       3.8 |
    +-------------------------+-------------------------+-------------------------+------+-----------+

Event Time Attribute on TIMESTAMP_LTZ

If the timestamp data in the source is represented as an epoch time, usually a long value, e.g. 1618989564564, it’s recommended to define the event time attribute as a TIMESTAMP_LTZ column.

    Flink SQL> CREATE TABLE MyTable3 (
                 item STRING,
                 price DOUBLE,
                 ts BIGINT, -- long time value in epoch milliseconds
                 ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
                 WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
               ) WITH (
                 'connector' = 'socket',
                 'hostname' = '127.0.0.1',
                 'port' = '9999',
                 'format' = 'csv'
               );
    Flink SQL> CREATE VIEW MyView5 AS
               SELECT
                 TUMBLE_START(ts_ltz, INTERVAL '10' MINUTES) AS window_start,
                 TUMBLE_END(ts_ltz, INTERVAL '10' MINUTES) AS window_end,
                 TUMBLE_ROWTIME(ts_ltz, INTERVAL '10' MINUTES) as window_rowtime,
                 item,
                 MAX(price) as max_price
               FROM MyTable3
               GROUP BY TUMBLE(ts_ltz, INTERVAL '10' MINUTES), item;
    Flink SQL> DESC MyView5;

    +----------------+----------------------------+-------+-----+--------+-----------+
    |           name |                       type |  null | key | extras | watermark |
    +----------------+----------------------------+-------+-----+--------+-----------+
    |   window_start |               TIMESTAMP(3) | false |     |        |           |
    |     window_end |               TIMESTAMP(3) | false |     |        |           |
    | window_rowtime | TIMESTAMP_LTZ(3) *ROWTIME* |  true |     |        |           |
    |           item |                     STRING |  true |     |        |           |
    |      max_price |                     DOUBLE |  true |     |        |           |
    +----------------+----------------------------+-------+-----+--------+-----------+

The input data of MyTable3 is:

    A,1.1,1618495260000 # The corresponding UTC timestamp is 2021-04-15 14:01:00
    B,1.2,1618495320000 # The corresponding UTC timestamp is 2021-04-15 14:02:00
    A,1.8,1618495380000 # The corresponding UTC timestamp is 2021-04-15 14:03:00
    B,2.5,1618495440000 # The corresponding UTC timestamp is 2021-04-15 14:04:00
    C,3.8,1618495500000 # The corresponding UTC timestamp is 2021-04-15 14:05:00
    C,3.8,1618495860000 # The corresponding UTC timestamp is 2021-04-15 14:11:00

    Flink SQL> SET table.local-time-zone=UTC;
    Flink SQL> SELECT * FROM MyView5;

    +-------------------------+-------------------------+-------------------------+------+-----------+
    |            window_start |              window_end |          window_rowtime | item | max_price |
    +-------------------------+-------------------------+-------------------------+------+-----------+
    | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    A |       1.8 |
    | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    B |       2.5 |
    | 2021-04-15 14:00:00.000 | 2021-04-15 14:10:00.000 | 2021-04-15 14:09:59.999 |    C |       3.8 |
    +-------------------------+-------------------------+-------------------------+------+-----------+

    Flink SQL> SET table.local-time-zone=Asia/Shanghai;
    Flink SQL> SELECT * FROM MyView5;

This returns a different window start, window end, and window rowtime than the calculation in the UTC time zone.

    +-------------------------+-------------------------+-------------------------+------+-----------+
    |            window_start |              window_end |          window_rowtime | item | max_price |
    +-------------------------+-------------------------+-------------------------+------+-----------+
    | 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 |    A |       1.8 |
    | 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 |    B |       2.5 |
    | 2021-04-15 22:00:00.000 | 2021-04-15 22:10:00.000 | 2021-04-15 22:09:59.999 |    C |       3.8 |
    +-------------------------+-------------------------+-------------------------+------+-----------+
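
The shift from 14:00 to 22:00 in the window bounds can be reasoned about with a small calculation. The sketch below is an assumption-laden illustration of the observable behavior, not Flink's actual window assigner: the class TumbleWindowStart and its method are hypothetical, and it simply floors the row's wall-clock time in the session zone to the window size.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical sketch: derive a tumble window start from an epoch value and a session zone.
class TumbleWindowStart {
    static LocalDateTime windowStart(long epochMillis, Duration size, ZoneId sessionZone) {
        // Step 1: view the absolute instant as a wall-clock timestamp in the session zone.
        LocalDateTime local = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), sessionZone);
        // Step 2: express the wall-clock value as plain milliseconds so it can be floored.
        long localMillis = local.toInstant(ZoneOffset.UTC).toEpochMilli();
        long startMillis = localMillis - Math.floorMod(localMillis, size.toMillis());
        // Step 3: convert back to a zone-less timestamp, like the TIMESTAMP type of TUMBLE_START.
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(startMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        long ts = 1618495260000L; // 2021-04-15 14:01:00 UTC, first row of MyTable3
        Duration tenMinutes = Duration.ofMinutes(10);
        System.out.println(windowStart(ts, tenMinutes, ZoneId.of("UTC")));           // 2021-04-15T14:00
        System.out.println(windowStart(ts, tenMinutes, ZoneId.of("Asia/Shanghai"))); // 2021-04-15T22:00
    }
}
```

The input epoch value is identical in both calls; only the zone-dependent wall-clock view changes, which is why a TIMESTAMP_LTZ time attribute yields zone-dependent window bounds while a TIMESTAMP time attribute does not.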

Daylight Saving Time Support

Flink SQL supports defining time attributes on a TIMESTAMP_LTZ column. Based on this, Flink SQL uses the TIMESTAMP and TIMESTAMP_LTZ types together in window processing to support Daylight Saving Time.

Flink uses timestamp literals to split windows and assigns each row to a window according to the row's epoch time. This means Flink uses the TIMESTAMP type for window start and window end (e.g. TUMBLE_START and TUMBLE_END) and the TIMESTAMP_LTZ type for window time attributes (e.g. TUMBLE_PROCTIME, TUMBLE_ROWTIME). Take a tumble window as an example. Daylight Saving Time in Los_Angeles starts at 2021-03-14 02:00:00:

    long epoch1 = 1615708800000L; // 2021-03-14 00:00:00
    long epoch2 = 1615712400000L; // 2021-03-14 01:00:00
    long epoch3 = 1615716000000L; // 2021-03-14 03:00:00, skip one hour (2021-03-14 02:00:00)
    long epoch4 = 1615719600000L; // 2021-03-14 04:00:00

The tumble window [2021-03-14 00:00:00, 2021-03-14 04:00:00] collects 3 hours of data in the Los_Angeles time zone, but 4 hours of data in time zones without DST. All the user needs to do is define the time attribute on a TIMESTAMP_LTZ column.
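
The 3-hour versus 4-hour difference can be checked with java.time alone. This is a minimal sketch, assuming only that the window bounds are wall-clock timestamps interpreted in the session zone; the class DstWindowSpan and its method are hypothetical names, not Flink APIs.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical check of how much real time a wall-clock window covers across a DST change.
class DstWindowSpan {
    // Real elapsed time between two wall-clock window bounds in the given zone.
    static Duration elapsed(LocalDateTime start, LocalDateTime end, ZoneId zone) {
        return Duration.between(start.atZone(zone), end.atZone(zone));
    }

    public static void main(String[] args) {
        LocalDateTime windowStart = LocalDateTime.of(2021, 3, 14, 0, 0);
        LocalDateTime windowEnd = LocalDateTime.of(2021, 3, 14, 4, 0);
        ZoneId losAngeles = ZoneId.of("America/Los_Angeles");

        System.out.println(elapsed(windowStart, windowEnd, losAngeles).toHours());       // 3
        System.out.println(elapsed(windowStart, windowEnd, ZoneId.of("UTC")).toHours()); // 4

        // epoch3 from the listing above lands on 03:00 local time; 02:00 never occurs that day.
        System.out.println(
                Instant.ofEpochMilli(1615716000000L).atZone(losAngeles).toLocalDateTime()); // 2021-03-14T03:00
    }
}
```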

All windows in Flink, such as Hop, Session, and Cumulative windows, work this way, and all operations in Flink SQL support TIMESTAMP_LTZ, so Flink handles Daylight Saving Time gracefully.

Difference between Batch and Streaming Mode

Flink evaluates the following time functions according to the execution mode:

  • LOCALTIME
  • LOCALTIMESTAMP
  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • NOW()

In streaming mode, they are evaluated for each record. In batch mode, they are evaluated once when the query starts, and the same result is used for every row.

The following time functions are evaluated for each record in both batch and streaming mode:

  • CURRENT_ROW_TIMESTAMP()
  • PROCTIME()
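
The difference between the two evaluation strategies can be simulated in a few lines. The sketch below is purely illustrative and does not use Flink: the class EvaluationModes, its methods, and the counter-based fake clock are all hypothetical, chosen so the output is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical simulation of per-record versus once-per-query evaluation.
class EvaluationModes {
    // Streaming-style: the clock is read again for every record.
    static List<Long> perRecord(int records, LongSupplier clock) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            out.add(clock.getAsLong());
        }
        return out;
    }

    // Batch-style: the clock is read once at query start and reused for every row.
    static List<Long> oncePerQuery(int records, LongSupplier clock) {
        long queryStart = clock.getAsLong();
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            out.add(queryStart);
        }
        return out;
    }

    public static void main(String[] args) {
        // Fake clock that advances by one tick on every read.
        System.out.println(perRecord(3, new AtomicLong()::getAndIncrement));    // [0, 1, 2]
        System.out.println(oncePerQuery(3, new AtomicLong()::getAndIncrement)); // [0, 0, 0]
    }
}
```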