Windowing table-valued functions (Windowing TVFs)

Batch Streaming

Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations. This document focuses on how windowing is performed in Flink SQL and how programmers can make the most of the functionality it offers.

Apache Flink provides several window table-valued functions (TVFs) to divide the elements of your table into windows, including:

  • TUMBLE windows
  • HOP windows
  • CUMULATE windows
  • SESSION windows

Note that each element can logically belong to more than one window, depending on the windowing table-valued function you use. For example, HOP windowing creates overlapping windows wherein a single element can be assigned to multiple windows.

Windowing TVFs are Flink-defined Polymorphic Table Functions (PTFs). PTFs are part of the SQL:2016 standard. They are special table functions that can take a table as a parameter, which makes them a powerful way to change the shape of a table. Because PTFs are used semantically like tables, they are invoked in the FROM clause of a SELECT statement.

Windowing TVFs replace the legacy Grouped Window Functions. Windowing TVFs are more compliant with the SQL standard and powerful enough to support complex window-based computations such as Window TopN and Window Join, whereas Grouped Window Functions support only Window Aggregation.

See the following for how to apply further computations based on windowing TVFs:

  • Window Aggregation
  • Window TopN
  • Window Join
  • Window Deduplication

Window Functions

Apache Flink provides four built-in windowing TVFs: TUMBLE, HOP, CUMULATE and SESSION. A windowing TVF returns a new relation that includes all columns of the original relation as well as three additional columns, “window_start”, “window_end” and “window_time”, which indicate the assigned window. In streaming mode, the “window_time” field is a time attribute of the window. In batch mode, the “window_time” field is of type TIMESTAMP or TIMESTAMP_LTZ, depending on the type of the input time field. The “window_time” field can be used in subsequent time-based operations, such as another windowing TVF, interval joins, or over aggregations. The value of window_time always equals window_end - 1ms.
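The relationship between the three appended columns can be illustrated with a small Python sketch. It is not Flink code. The helper name `window_columns` is hypothetical, and the helper only encodes the rule stated above: window_time is window_end minus 1 ms, which is the last millisecond still inside the half-open interval [window_start, window_end).

```python
from datetime import datetime, timedelta


def window_columns(window_start: datetime, window_end: datetime) -> dict:
    """Hypothetical helper: the three columns a windowing TVF appends to a row."""
    return {
        "window_start": window_start,
        "window_end": window_end,
        # The last millisecond that still lies inside [window_start, window_end).
        "window_time": window_end - timedelta(milliseconds=1),
    }


cols = window_columns(datetime(2020, 4, 15, 8, 0), datetime(2020, 4, 15, 8, 10))
print(cols["window_time"].isoformat(sep=" ", timespec="milliseconds"))
# prints 2020-04-15 08:09:59.999
```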

TUMBLE

The TUMBLE function assigns each element to a window of a specified size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, Flink evaluates the current window and starts a new window every five minutes, as illustrated by the following figure.

Tumbling Windows

The TUMBLE function assigns a window to each row of a relation based on a time attribute field. In streaming mode, the time attribute field must be either an event-time or a processing-time attribute. In batch mode, the time attribute field of the window table function must be of type TIMESTAMP or TIMESTAMP_LTZ. TUMBLE returns a new relation that includes all columns of the original relation as well as three additional columns, “window_start”, “window_end” and “window_time”, which indicate the assigned window. After the window TVF is applied, the original time attribute “timecol” becomes a regular timestamp column.

The TUMBLE function takes three required parameters and one optional parameter:

    TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
  • data: is a table parameter that can be any relation with a time attribute column.
  • timecol: is a column descriptor indicating which time attribute column of data should be mapped to tumbling windows.
  • size: is a duration specifying the width of the tumbling windows.
  • offset: is an optional parameter that specifies the offset by which the window start is shifted.
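The tumbling assignment rule can be modeled in a few lines of Python. This is an illustrative sketch, not Flink's implementation. It makes three assumptions: timestamps are naive `datetime` values, window starts are aligned to 1970-01-01 00:00 shifted by `offset`, and the helper name `tumble` is hypothetical. Running the sketch over the Bid rows from the example below reproduces that example's aggregated totals.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal

EPOCH = datetime(1970, 1, 1)  # assumed alignment point for window starts


def tumble(ts: datetime, size: timedelta, offset: timedelta = timedelta(0)):
    """Hypothetical helper: the single [start, end) tumbling window containing ts."""
    # timedelta % timedelta is never negative for a positive size,
    # so negative offsets are handled as well.
    start = ts - (ts - EPOCH - offset) % size
    return start, start + size


bids = [
    ("2020-04-15 08:05", Decimal("4.00"), "C"),
    ("2020-04-15 08:07", Decimal("2.00"), "A"),
    ("2020-04-15 08:09", Decimal("5.00"), "D"),
    ("2020-04-15 08:11", Decimal("3.00"), "B"),
    ("2020-04-15 08:13", Decimal("1.00"), "E"),
    ("2020-04-15 08:17", Decimal("6.00"), "F"),
]

# Rough equivalent of SUM(price) ... GROUP BY window_start, window_end.
totals = defaultdict(Decimal)
for bidtime, price, _item in bids:
    ts = datetime.strptime(bidtime, "%Y-%m-%d %H:%M")
    totals[tumble(ts, timedelta(minutes=10))] += price

for (start, end), total in sorted(totals.items()):
    print(start, end, total)
```

Each row lands in exactly one window, which is why the per-window totals (11.00 and 10.00) add up to the sum of all prices.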

Here is an example invocation on the Bid table:

    -- tables must have time attribute, e.g. `bidtime` in this table
    Flink SQL> desc Bid;
    +-------------+------------------------+------+-----+--------+---------------------------------+
    |        name |                   type | null | key | extras |                       watermark |
    +-------------+------------------------+------+-----+--------+---------------------------------+
    |     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
    |       price |         DECIMAL(10, 2) | true |     |        |                                 |
    |        item |                 STRING | true |     |        |                                 |
    +-------------+------------------------+------+-----+--------+---------------------------------+
    Flink SQL> SELECT * FROM Bid;
    +------------------+-------+------+
    |          bidtime | price | item |
    +------------------+-------+------+
    | 2020-04-15 08:05 |  4.00 |    C |
    | 2020-04-15 08:07 |  2.00 |    A |
    | 2020-04-15 08:09 |  5.00 |    D |
    | 2020-04-15 08:11 |  3.00 |    B |
    | 2020-04-15 08:13 |  1.00 |    E |
    | 2020-04-15 08:17 |  6.00 |    F |
    +------------------+-------+------+
    Flink SQL> SELECT * FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
    -- or with the named params
    -- note: the DATA param must be the first
    Flink SQL> SELECT * FROM TABLE(
      TUMBLE(
        DATA => TABLE Bid,
        TIMECOL => DESCRIPTOR(bidtime),
        SIZE => INTERVAL '10' MINUTES));
    +------------------+-------+------+------------------+------------------+-------------------------+
    |          bidtime | price | item |     window_start |       window_end |             window_time |
    +------------------+-------+------+------------------+------------------+-------------------------+
    | 2020-04-15 08:05 |  4.00 |    C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
    | 2020-04-15 08:07 |  2.00 |    A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
    | 2020-04-15 08:09 |  5.00 |    D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
    | 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
    | 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
    | 2020-04-15 08:17 |  6.00 |    F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
    +------------------+-------+------+------------------+------------------+-------------------------+
    -- apply aggregation on the tumbling windowed table
    Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
      FROM TABLE(
        TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end;
    +------------------+------------------+-------------+
    |     window_start |       window_end | total_price |
    +------------------+------------------+-------------+
    | 2020-04-15 08:00 | 2020-04-15 08:10 |       11.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:20 |       10.00 |
    +------------------+------------------+-------------+

Note: to make the windowing behavior easier to follow, timestamp values are displayed here without trailing zeros. For example, 2020-04-15 08:05 would be displayed as 2020-04-15 08:05:00.000 in the Flink SQL Client if the type is TIMESTAMP(3).

HOP

The HOP function assigns elements to windows of fixed length. As with the TUMBLE function, the window size is configured by the size parameter. An additional slide parameter controls how frequently a hopping window is started. Hopping windows therefore overlap when the slide is smaller than the window size, and in that case elements are assigned to multiple windows. Hopping windows are also known as “sliding windows”.

For example, you could have windows of size 10 minutes that slide by 5 minutes. Every 5 minutes, you then get a window that contains the events that arrived during the last 10 minutes, as depicted in the following figure.

Hopping windows

The HOP function assigns windows that cover rows within an interval of the given size, shifting by the given slide, based on a time attribute field. In streaming mode, the time attribute field must be either an event-time or a processing-time attribute. In batch mode, the time attribute field of the window table function must be of type TIMESTAMP or TIMESTAMP_LTZ. HOP returns a new relation that includes all columns of the original relation as well as three additional columns, “window_start”, “window_end” and “window_time”, which indicate the assigned window. After the windowing TVF is applied, the original time attribute “timecol” becomes a regular timestamp column.

HOP takes four required parameters and one optional parameter:

    HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
  • data: is a table parameter that can be any relation with a time attribute column.
  • timecol: is a column descriptor indicating which time attribute column of data should be mapped to hopping windows.
  • slide: is a duration specifying the interval between the starts of sequential hopping windows.
  • size: is a duration specifying the width of the hopping windows.
  • offset: is an optional parameter that specifies the offset by which the window start is shifted.
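Below is a minimal Python model of HOP assignment. It is a sketch under stated assumptions, not Flink's implementation: timestamps are naive `datetime` values, window starts are aligned to 1970-01-01 00:00 plus `offset`, and `hop` is a hypothetical helper name. For each row, it starts from the latest window start at or before the row's timestamp and steps back one slide at a time until a window no longer covers the row. Applied to the Bid rows used in the example below, it yields that example's four totals.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal

EPOCH = datetime(1970, 1, 1)  # assumed alignment point for window starts


def hop(ts, slide, size, offset=timedelta(0)):
    """Hypothetical helper: all [start, end) hopping windows containing ts."""
    # Latest window start at or before ts; starts lie on multiples of slide (+ offset).
    start = ts - (ts - EPOCH - offset) % slide
    windows = []
    # [start, start + size) covers ts exactly when start > ts - size.
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


bids = [
    ("2020-04-15 08:05", Decimal("4.00")),
    ("2020-04-15 08:07", Decimal("2.00")),
    ("2020-04-15 08:09", Decimal("5.00")),
    ("2020-04-15 08:11", Decimal("3.00")),
    ("2020-04-15 08:13", Decimal("1.00")),
    ("2020-04-15 08:17", Decimal("6.00")),
]

totals = defaultdict(Decimal)
for bidtime, price in bids:
    ts = datetime.strptime(bidtime, "%Y-%m-%d %H:%M")
    for window in hop(ts, slide=timedelta(minutes=5), size=timedelta(minutes=10)):
        totals[window] += price

for (start, end), total in sorted(totals.items()):
    print(start, end, total)
```

With a 10-minute size and a 5-minute slide, every row falls into size / slide = 2 windows. This is why the HOP output in the example has twice as many rows as the Bid table.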

Here is an example invocation on the Bid table:

    > SELECT * FROM TABLE(
      HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
    -- or with the named params
    -- note: the DATA param must be the first
    > SELECT * FROM TABLE(
      HOP(
        DATA => TABLE Bid,
        TIMECOL => DESCRIPTOR(bidtime),
        SLIDE => INTERVAL '5' MINUTES,
        SIZE => INTERVAL '10' MINUTES));
    +------------------+-------+------+------------------+------------------+-------------------------+
    |          bidtime | price | item |     window_start |       window_end |             window_time |
    +------------------+-------+------+------------------+------------------+-------------------------+
    | 2020-04-15 08:05 |  4.00 |    C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
    | 2020-04-15 08:05 |  4.00 |    C | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
    | 2020-04-15 08:07 |  2.00 |    A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
    | 2020-04-15 08:07 |  2.00 |    A | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
    | 2020-04-15 08:09 |  5.00 |    D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
    | 2020-04-15 08:09 |  5.00 |    D | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
    | 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
    | 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
    | 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
    | 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
    | 2020-04-15 08:17 |  6.00 |    F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
    | 2020-04-15 08:17 |  6.00 |    F | 2020-04-15 08:15 | 2020-04-15 08:25 | 2020-04-15 08:24:59.999 |
    +------------------+-------+------+------------------+------------------+-------------------------+
    -- apply aggregation on the hopping windowed table
    > SELECT window_start, window_end, SUM(price) AS total_price
      FROM TABLE(
        HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end;
    +------------------+------------------+-------------+
    |     window_start |       window_end | total_price |
    +------------------+------------------+-------------+
    | 2020-04-15 08:00 | 2020-04-15 08:10 |       11.00 |
    | 2020-04-15 08:05 | 2020-04-15 08:15 |       15.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:20 |       10.00 |
    | 2020-04-15 08:15 | 2020-04-15 08:25 |        6.00 |
    +------------------+------------------+-------------+

CUMULATE

Cumulating windows are useful in scenarios such as tumbling windows with early firing within a fixed window interval. For example, a daily dashboard may draw cumulative unique visitors (UVs) from 00:00 up to every minute, so that the UV value at 10:00 represents the total number of UVs from 00:00 to 10:00. This can be implemented easily and efficiently with CUMULATE windowing.

The CUMULATE function assigns elements to windows that initially cover one step and then grow by one step at each step, keeping the window start fixed, until they reach the maximum window size. You can think of CUMULATE as first applying TUMBLE windowing with the maximum window size. Each tumbling window is then split into several windows that share the same window start and whose window ends differ by one step. Cumulating windows therefore overlap and do not have a fixed size.

For example, a cumulating window with a 1-hour step and a 1-day maximum size produces the windows [00:00, 01:00), [00:00, 02:00), [00:00, 03:00), …, [00:00, 24:00) for every day.

Cumulating Windows

The CUMULATE function assigns windows based on a time attribute column. In streaming mode, the time attribute field must be either an event-time or a processing-time attribute. In batch mode, the time attribute field of the window table function must be of type TIMESTAMP or TIMESTAMP_LTZ. CUMULATE returns a new relation that includes all columns of the original relation as well as three additional columns, “window_start”, “window_end” and “window_time”, which indicate the assigned window. After the window TVF is applied, the original time attribute “timecol” becomes a regular timestamp column.

CUMULATE takes four required parameters and one optional parameter:

    CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size [, offset ])
  • data: is a table parameter that can be any relation with a time attribute column.
  • timecol: is a column descriptor indicating which time attribute column of data should be mapped to cumulating windows.
  • step: is a duration specifying how much the window size increases between the ends of sequential cumulating windows.
  • size: is a duration specifying the maximum width of the cumulating windows. size must be an integral multiple of step.
  • offset: is an optional parameter that specifies the offset by which the window start is shifted.
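The "tumble first, then split by step" view described above can be sketched in Python. This is an illustrative model, not Flink's implementation. It assumes naive `datetime` timestamps and window starts aligned to 1970-01-01 00:00 plus `offset`, and `cumulate` is a hypothetical helper name. The window start comes from the enclosing max-size tumbling window, and the ends run from the first step boundary after the row up to start + size. The sketch reproduces the aggregated totals of the Bid example below and the 1-hour/1-day case from the text.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal

EPOCH = datetime(1970, 1, 1)  # assumed alignment point for window starts


def cumulate(ts, step, size, offset=timedelta(0)):
    """Hypothetical helper: all [start, end) cumulating windows containing ts."""
    if size % step:
        raise ValueError("size must be an integral multiple of step")
    # Fixed start: the start of the enclosing tumbling window of the max size.
    start = ts - (ts - EPOCH - offset) % size
    # First end: the next step boundary strictly after ts.
    end = ts - (ts - EPOCH - offset) % step + step
    windows = []
    while end <= start + size:
        windows.append((start, end))
        end += step
    return windows


bids = [
    ("2020-04-15 08:05", Decimal("4.00")),
    ("2020-04-15 08:07", Decimal("2.00")),
    ("2020-04-15 08:09", Decimal("5.00")),
    ("2020-04-15 08:11", Decimal("3.00")),
    ("2020-04-15 08:13", Decimal("1.00")),
    ("2020-04-15 08:17", Decimal("6.00")),
]

totals = defaultdict(Decimal)
for bidtime, price in bids:
    ts = datetime.strptime(bidtime, "%Y-%m-%d %H:%M")
    for window in cumulate(ts, step=timedelta(minutes=2), size=timedelta(minutes=10)):
        totals[window] += price

for (start, end), total in sorted(totals.items()):
    print(start, end, total)

# 1-hour step, 1-day max size: a row at 10:30 falls into [00:00, 11:00) ... [00:00, 24:00).
day_windows = cumulate(datetime(2020, 4, 15, 10, 30), timedelta(hours=1), timedelta(days=1))
print(len(day_windows), day_windows[0], day_windows[-1])
```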

Here is an example invocation on the Bid table:

    > SELECT * FROM TABLE(
      CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
    -- or with the named params
    -- note: the DATA param must be the first
    > SELECT * FROM TABLE(
      CUMULATE(
        DATA => TABLE Bid,
        TIMECOL => DESCRIPTOR(bidtime),
        STEP => INTERVAL '2' MINUTES,
        SIZE => INTERVAL '10' MINUTES));
    +------------------+-------+------+------------------+------------------+-------------------------+
    |          bidtime | price | item |     window_start |       window_end |             window_time |
    +------------------+-------+------+------------------+------------------+-------------------------+
    | 2020-04-15 08:05 |  4.00 |    C | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 |
    | 2020-04-15 08:05 |  4.00 |    C | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
    | 2020-04-15 08:05 |  4.00 |    C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
    | 2020-04-15 08:07 |  2.00 |    A | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
    | 2020-04-15 08:07 |  2.00 |    A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
    | 2020-04-15 08:09 |  5.00 |    D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
    | 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 |
    | 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
    | 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
    | 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
    | 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
    | 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
    | 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
    | 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
    | 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
    | 2020-04-15 08:17 |  6.00 |    F | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
    | 2020-04-15 08:17 |  6.00 |    F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
    +------------------+-------+------+------------------+------------------+-------------------------+
    -- apply aggregation on the cumulating windowed table
    > SELECT window_start, window_end, SUM(price) AS total_price
      FROM TABLE(
        CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end;
    +------------------+------------------+-------------+
    |     window_start |       window_end | total_price |
    +------------------+------------------+-------------+
    | 2020-04-15 08:00 | 2020-04-15 08:06 |        4.00 |
    | 2020-04-15 08:00 | 2020-04-15 08:08 |        6.00 |
    | 2020-04-15 08:00 | 2020-04-15 08:10 |       11.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:12 |        3.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:14 |        4.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:16 |        4.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:18 |       10.00 |
    | 2020-04-15 08:10 | 2020-04-15 08:20 |       10.00 |
    +------------------+------------------+-------------+

SESSION

Note:

  1. The session window TVF is currently not supported in batch mode.
  2. Session window aggregation currently does not support any of the optimizations described in Performance Tuning.
  3. Session Window Join, Session Window TopN and Session Window Deduplication are supported conceptually but are still in beta. Issues can be reported in JIRA.

The SESSION function groups elements by sessions of activity. In contrast to TUMBLE and HOP windows, session windows do not overlap and do not have a fixed start and end time. Instead, a session window closes when it does not receive elements for a certain period of time, that is, when a gap of inactivity occurs. A session window is configured with a static session gap, which defines how long this period of inactivity is. When the period expires, the current session closes and subsequent elements are assigned to a new session window.

For example, with a session gap of 10 minutes, events of the same user that are less than 10 minutes apart are grouped into the same session window. If no data arrives within 10 minutes after the latest event, the session window closes and is sent downstream. Subsequent events are assigned to a new session window.

Session windows

The SESSION function assigns windows that cover rows based on a time attribute field. In streaming mode, the time attribute field must be either an event-time or a processing-time attribute. SESSION returns a new relation that includes all columns of the original relation as well as three additional columns, “window_start”, “window_end” and “window_time”, which indicate the assigned window. After the windowing TVF is applied, the original time attribute “timecol” becomes a regular timestamp column.

SESSION takes three required parameters and one optional parameter:

    SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)
  • data: is a table parameter that can be any relation with a time attribute column.
  • keycols: is a column descriptor indicating which columns should be used to partition the data before session windowing.
  • timecol: is a column descriptor indicating which time attribute column of data should be mapped to session windows.
  • gap: is the maximum interval between the timestamps of two events for them to be considered part of the same session window.
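Session merging can be modeled in Python as follows. This is a batch sketch over a finite list. It is not Flink's streaming implementation, which must decide when a session can be closed as time progresses. The helper `session_windows` is a hypothetical name. Each row opens an interval [ts, ts + gap), and overlapping intervals within a partition merge into one session [first ts, last ts + gap). Rows exactly `gap` apart are treated here as separate sessions. That is an assumption about a boundary case the text above leaves open. Running the sketch on the Bid rows of the example below reproduces both the partitioned and the unpartitioned totals.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal


def session_windows(rows, gap, partition_key=None):
    """Hypothetical helper: map each (ts, price, item) row to its session window."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[partition_key(row) if partition_key else None].append(row)

    assigned = []  # (row, window_start, window_end)
    for part in partitions.values():
        part.sort(key=lambda r: r[0])
        session, end = [part[0]], part[0][0] + gap
        for row in part[1:]:
            if row[0] < end:  # still inside the current session
                session.append(row)
                end = row[0] + gap  # rows are sorted, so the end only grows
            else:  # inactivity gap reached: close this session, open a new one
                assigned += [(r, session[0][0], end) for r in session]
                session, end = [row], row[0] + gap
        assigned += [(r, session[0][0], end) for r in session]
    return assigned


def t(hhmm):
    return datetime.strptime("2020-04-15 " + hhmm, "%Y-%m-%d %H:%M")


bids = [
    (t("08:07"), Decimal("4.00"), "A"),
    (t("08:06"), Decimal("2.00"), "A"),
    (t("08:09"), Decimal("5.00"), "B"),
    (t("08:08"), Decimal("3.00"), "A"),
    (t("08:17"), Decimal("1.00"), "B"),
]
gap = timedelta(minutes=5)

# SESSION(TABLE Bid PARTITION BY item, ...) followed by GROUP BY item, window_start, window_end
by_item = defaultdict(Decimal)
for (_ts, price, item), start, end in session_windows(bids, gap, partition_key=lambda r: r[2]):
    by_item[(item, start, end)] += price

# SESSION(TABLE Bid, ...) followed by GROUP BY window_start, window_end
overall = defaultdict(Decimal)
for (_ts, price, _item), start, end in session_windows(bids, gap):
    overall[(start, end)] += price

print(dict(by_item))
print(dict(overall))
```

Without partition keys, the 08:09 bid of item B joins item A's session. The A and B rows then form one session that ends at 08:14 instead of 08:13.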

Here is an example invocation on the Bid table:

    -- tables must have time attribute, e.g. `bidtime` in this table
    Flink SQL> desc Bid;
    +-------------+------------------------+------+-----+--------+---------------------------------+
    |        name |                   type | null | key | extras |                       watermark |
    +-------------+------------------------+------+-----+--------+---------------------------------+
    |     bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
    |       price |         DECIMAL(10, 2) | true |     |        |                                 |
    |        item |                 STRING | true |     |        |                                 |
    +-------------+------------------------+------+-----+--------+---------------------------------+
    Flink SQL> SELECT * FROM Bid;
    +------------------+-------+------+
    |          bidtime | price | item |
    +------------------+-------+------+
    | 2020-04-15 08:07 |  4.00 |    A |
    | 2020-04-15 08:06 |  2.00 |    A |
    | 2020-04-15 08:09 |  5.00 |    B |
    | 2020-04-15 08:08 |  3.00 |    A |
    | 2020-04-15 08:17 |  1.00 |    B |
    +------------------+-------+------+
    -- session window with partition keys
    > SELECT * FROM TABLE(
      SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));
    -- or with the named params
    -- note: the DATA param must be the first
    > SELECT * FROM TABLE(
      SESSION(
        DATA => TABLE Bid PARTITION BY item,
        TIMECOL => DESCRIPTOR(bidtime),
        GAP => INTERVAL '5' MINUTES));
    +------------------+-------+------+------------------+------------------+-------------------------+
    |          bidtime | price | item |     window_start |       window_end |             window_time |
    +------------------+-------+------+------------------+------------------+-------------------------+
    | 2020-04-15 08:07 |  4.00 |    A | 2020-04-15 08:06 | 2020-04-15 08:13 | 2020-04-15 08:12:59.999 |
    | 2020-04-15 08:06 |  2.00 |    A | 2020-04-15 08:06 | 2020-04-15 08:13 | 2020-04-15 08:12:59.999 |
    | 2020-04-15 08:08 |  3.00 |    A | 2020-04-15 08:06 | 2020-04-15 08:13 | 2020-04-15 08:12:59.999 |
    | 2020-04-15 08:09 |  5.00 |    B | 2020-04-15 08:09 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
    | 2020-04-15 08:17 |  1.00 |    B | 2020-04-15 08:17 | 2020-04-15 08:22 | 2020-04-15 08:21:59.999 |
    +------------------+-------+------+------------------+------------------+-------------------------+
    -- apply aggregation on the session windowed table with partition keys
    > SELECT window_start, window_end, item, SUM(price) AS total_price
      FROM TABLE(
        SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY item, window_start, window_end;
    +------------------+------------------+------+-------------+
    |     window_start |       window_end | item | total_price |
    +------------------+------------------+------+-------------+
    | 2020-04-15 08:06 | 2020-04-15 08:13 |    A |        9.00 |
    | 2020-04-15 08:09 | 2020-04-15 08:14 |    B |        5.00 |
    | 2020-04-15 08:17 | 2020-04-15 08:22 |    B |        1.00 |
    +------------------+------------------+------+-------------+
    -- session window without partition keys
    > SELECT * FROM TABLE(
      SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));
    -- or with the named params
    -- note: the DATA param must be the first
    > SELECT * FROM TABLE(
      SESSION(
        DATA => TABLE Bid,
        TIMECOL => DESCRIPTOR(bidtime),
        GAP => INTERVAL '5' MINUTES));
    +------------------+-------+------+------------------+------------------+-------------------------+
    |          bidtime | price | item |     window_start |       window_end |             window_time |
    +------------------+-------+------+------------------+------------------+-------------------------+
    | 2020-04-15 08:07 |  4.00 |    A | 2020-04-15 08:06 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
    | 2020-04-15 08:06 |  2.00 |    A | 2020-04-15 08:06 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
    | 2020-04-15 08:08 |  3.00 |    A | 2020-04-15 08:06 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
    | 2020-04-15 08:09 |  5.00 |    B | 2020-04-15 08:06 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
    | 2020-04-15 08:17 |  1.00 |    B | 2020-04-15 08:17 | 2020-04-15 08:22 | 2020-04-15 08:21:59.999 |
    +------------------+-------+------+------------------+------------------+-------------------------+
    -- apply aggregation on the session windowed table without partition keys
    > SELECT window_start, window_end, SUM(price) AS total_price
      FROM TABLE(
        SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
      GROUP BY window_start, window_end;
    +------------------+------------------+-------------+
    |     window_start |       window_end | total_price |
    +------------------+------------------+-------------+
    | 2020-04-15 08:06 | 2020-04-15 08:14 |       14.00 |
    | 2020-04-15 08:17 | 2020-04-15 08:22 |        1.00 |
    +------------------+------------------+-------------+

Window Offset

Offset is an optional parameter that can be used to change the window assignment. It can be a positive or a negative duration, and its default value is 0. The same record may be assigned to a different window depending on the offset value.
For example, to which window is a record with timestamp 2021-06-30 00:00:04 assigned by a TUMBLE window with a size of 10 MINUTE?

  • If the offset is -16 MINUTE, the record is assigned to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
  • If the offset is -6 MINUTE, the record is assigned to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
  • If the offset is -4 MINUTE, the record is assigned to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).
  • If the offset is 0, the record is assigned to window [2021-06-30 00:00:00, 2021-06-30 00:10:00).
  • If the offset is 4 MINUTE, the record is assigned to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
  • If the offset is 6 MINUTE, the record is assigned to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).
  • If the offset is 16 MINUTE, the record is assigned to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).

As these cases show, different offset values can have the same effect on window assignment. For a TUMBLE window with a size of 10 MINUTE, the offsets -16 MINUTE, -6 MINUTE and 4 MINUTE produce the same window, as do -4 MINUTE, 6 MINUTE and 16 MINUTE.
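The listed assignments can be checked with a short Python sketch. It assumes, as a modeling choice rather than Flink's documented code, that window starts are aligned to 1970-01-01 00:00 shifted by the offset. `tumble_window` is a hypothetical helper name. Under this assumption, the sketch reproduces every case in the list above.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumed alignment point for window starts


def tumble_window(ts, size, offset):
    """Hypothetical helper: [start, end) of the tumbling window containing ts."""
    # timedelta % timedelta is non-negative for a positive size,
    # so negative offsets work too.
    start = ts - (ts - EPOCH - offset) % size
    return start, start + size


ts = datetime(2021, 6, 30, 0, 0, 4)
size = timedelta(minutes=10)
for minutes in (-16, -6, -4, 0, 4, 6, 16):
    start, end = tumble_window(ts, size, timedelta(minutes=minutes))
    print(f"offset {minutes:>3} MINUTE -> [{start}, {end})")
```

In this model, only the offset modulo the window size matters. Offsets that differ by a multiple of 10 minutes, such as -16, -6 and 4 MINUTE, therefore select the same window.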

Note: The window offset only affects window assignment; it has no effect on the watermark.

The following SQL shows how to use an offset with a TUMBLE window.

    -- NOTE: Currently Flink doesn't support evaluating an individual window table-valued function;
    -- a window table-valued function should be used with an aggregate operation.
    -- This example only explains the syntax and the data produced by the table-valued function.
    Flink SQL> SELECT * FROM TABLE(
      TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
    -- or with the named params
    -- note: the DATA param must be the first and `OFFSET` should be wrapped in backticks
    Flink SQL> SELECT * FROM TABLE(
      TUMBLE(
        DATA => TABLE Bid,
        TIMECOL => DESCRIPTOR(bidtime),
        SIZE => INTERVAL '10' MINUTES,
        `OFFSET` => INTERVAL '1' MINUTES));
    +------------------+-------+------+------------------+------------------+-------------------------+
    |          bidtime | price | item |     window_start |       window_end |             window_time |
    +------------------+-------+------+------------------+------------------+-------------------------+
    | 2020-04-15 08:05 |  4.00 |    C | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
    | 2020-04-15 08:07 |  2.00 |    A | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
    | 2020-04-15 08:09 |  5.00 |    D | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
    | 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
    | 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
    | 2020-04-15 08:17 |  6.00 |    F | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
    +------------------+-------+------+------------------+------------------+-------------------------+
    -- apply aggregation on the tumbling windowed table
    Flink SQL> SELECT window_start, window_end, SUM(price) AS total_price
      FROM TABLE(
        TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
      GROUP BY window_start, window_end;
    +------------------+------------------+-------------+
    |     window_start |       window_end | total_price |
    +------------------+------------------+-------------+
    | 2020-04-15 08:01 | 2020-04-15 08:11 |       11.00 |
    | 2020-04-15 08:11 | 2020-04-15 08:21 |       10.00 |
    +------------------+------------------+-------------+

Note: to make the windowing behavior easier to follow, timestamp values are displayed here without trailing zeros. For example, 2020-04-15 08:05 would be displayed as 2020-04-15 08:05:00.000 in the Flink SQL Client if the type is TIMESTAMP(3).