Windowing table-valued functions (Windowing TVFs)

Batch Streaming

Windows are at the heart of processing infinite streams. Windows split the stream into “buckets” of finite size, over which we can apply computations. This document focuses on how windowing is performed in Flink SQL and how the programmer can benefit to the maximum from its offered functionality.

Apache Flink provides several window table-valued functions (TVF) to divide the elements of your table into windows, including:

Note that each element can logically belong to more than one window, depending on the windowing table-valued function you use. For example, HOP windowing creates overlapping windows wherein a single element can be assigned to multiple windows.

Windowing TVFs are Flink defined Polymorphic Table Functions (abbreviated PTF). PTF is part of the SQL 2016 standard, a special table-function, but can have a table as a parameter. PTF is a powerful feature to change the shape of a table. Because PTFs are used semantically like tables, their invocation occurs in a FROM clause of a SELECT statement.

Windowing TVFs is a replacement of legacy Grouped Window Functions. Windowing TVFs is more SQL standard compliant and more powerful to support complex window-based computations, e.g. Window TopN, Window Join. However, Grouped Window Functions can only support Window Aggregation.

See more how to apply further computations based on windowing TVF:

Window Functions

Apache Flink provides 3 built-in windowing TVFs: TUMBLE, HOP and CUMULATE. The return value of windowing TVF is a new relation that includes all columns of original relation as well as additional 3 columns named “window_start”, “window_end”, “window_time” to indicate the assigned window. In streaming mode, the “window_time” field is a time attributes of the window. In batch mode, the “window_time” field is an attribute of type TIMESTAMP or TIMESTAMP_LTZ based on input time field type. The “window_time” field can be used in subsequent time-based operations, e.g. another windowing TVF, or interval joins, over aggregations. The value of window_time always equal to window_end - 1ms.

TUMBLE

The TUMBLE function assigns each element to a window of specified window size. Tumbling windows have a fixed size and do not overlap. For example, suppose you specify a tumbling window with a size of 5 minutes. In that case, Flink will evaluate the current window, and a new window started every five minutes, as illustrated by the following figure.

Tumbling Windows

The TUMBLE function assigns a window for each row of a relation based on a time attribute field. In streaming mode, the time attribute field must be either event or processing time attributes. In batch mode, the time attribute field of window table function must be an attribute of type TIMESTAMP or TIMESTAMP_LTZ. The return value of TUMBLE is a new relation that includes all columns of original relation as well as additional 3 columns named “window_start”, “window_end”, “window_time” to indicate the assigned window. The original time attribute “timecol” will be a regular timestamp column after window TVF.

TUMBLE function takes three required parameters, one optional parameter:

  1. TUMBLE(TABLE data, DESCRIPTOR(timecol), size [, offset ])
  • data: is a table parameter that can be any relation with a time attribute column.
  • timecol: is a column descriptor indicating which time attributes column of data should be mapped to tumbling windows.
  • size: is a duration specifying the width of the tumbling windows.
  • offset: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the Bid table:

  1. -- tables must have time attribute, e.g. `bidtime` in this table
  2. Flink SQL> desc Bid;
  3. +-------------+------------------------+------+-----+--------+---------------------------------+
  4. | name | type | null | key | extras | watermark |
  5. +-------------+------------------------+------+-----+--------+---------------------------------+
  6. | bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
  7. | price | DECIMAL(10, 2) | true | | | |
  8. | item | STRING | true | | | |
  9. +-------------+------------------------+------+-----+--------+---------------------------------+
  10. Flink SQL> SELECT * FROM Bid;
  11. +------------------+-------+------+
  12. | bidtime | price | item |
  13. +------------------+-------+------+
  14. | 2020-04-15 08:05 | 4.00 | C |
  15. | 2020-04-15 08:07 | 2.00 | A |
  16. | 2020-04-15 08:09 | 5.00 | D |
  17. | 2020-04-15 08:11 | 3.00 | B |
  18. | 2020-04-15 08:13 | 1.00 | E |
  19. | 2020-04-15 08:17 | 6.00 | F |
  20. +------------------+-------+------+
  21. Flink SQL> SELECT * FROM TABLE(
  22. TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES));
  23. -- or with the named params
  24. -- note: the DATA param must be the first
  25. Flink SQL> SELECT * FROM TABLE(
  26. TUMBLE(
  27. DATA => TABLE Bid,
  28. TIMECOL => DESCRIPTOR(bidtime),
  29. SIZE => INTERVAL '10' MINUTES));
  30. +------------------+-------+------+------------------+------------------+-------------------------+
  31. | bidtime | price | item | window_start | window_end | window_time |
  32. +------------------+-------+------+------------------+------------------+-------------------------+
  33. | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  34. | 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  35. | 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  36. | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  37. | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  38. | 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  39. +------------------+-------+------+------------------+------------------+-------------------------+
  40. -- apply aggregation on the tumbling windowed table
  41. Flink SQL> SELECT window_start, window_end, SUM(price)
  42. FROM TABLE(
  43. TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  44. GROUP BY window_start, window_end;
  45. +------------------+------------------+-------+
  46. | window_start | window_end | price |
  47. +------------------+------------------+-------+
  48. | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
  49. | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
  50. +------------------+------------------+-------+

Note: in order to better understand the behavior of windowing, we simplify the displaying of timestamp values to not show the trailing zeros, e.g. 2020-04-15 08:05 should be displayed as 2020-04-15 08:05:00.000 in Flink SQL Client if the type is TIMESTAMP(3).

HOP

The HOP function assigns elements to windows of fixed length. Like a TUMBLE windowing function, the size of the windows is configured by the window size parameter. An additional window slide parameter controls how frequently a hopping window is started. Hence, hopping windows can be overlapping if the slide is smaller than the window size. In this case, elements are assigned to multiple windows. Hopping windows are also known as “sliding windows”.

For example, you could have windows of size 10 minutes that slides by 5 minutes. With this, you get every 5 minutes a window that contains the events that arrived during the last 10 minutes, as depicted by the following figure.

Hopping windows

The HOP function assigns windows that cover rows within the interval of size and shifting every slide based on a time attribute field. In streaming mode, the time attribute field must be either event or processing time attributes. In batch mode, the time attribute field of window table function must be an attribute of type TIMESTAMP or TIMESTAMP_LTZ. The return value of HOP is a new relation that includes all columns of original relation as well as additional 3 columns named “window_start”, “window_end”, “window_time” to indicate the assigned window. The original time attribute “timecol” will be a regular timestamp column after windowing TVF.

HOP takes four required parameters, one optional parameter:

  1. HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
  • data: is a table parameter that can be any relation with an time attribute column.
  • timecol: is a column descriptor indicating which time attributes column of data should be mapped to hopping windows.
  • slide: is a duration specifying the duration between the start of sequential hopping windows
  • size: is a duration specifying the width of the hopping windows.
  • offset: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the Bid table:

  1. > SELECT * FROM TABLE(
  2. HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES));
  3. -- or with the named params
  4. -- note: the DATA param must be the first
  5. > SELECT * FROM TABLE(
  6. HOP(
  7. DATA => TABLE Bid,
  8. TIMECOL => DESCRIPTOR(bidtime),
  9. SLIDE => INTERVAL '5' MINUTES,
  10. SIZE => INTERVAL '10' MINUTES));
  11. +------------------+-------+------+------------------+------------------+-------------------------+
  12. | bidtime | price | item | window_start | window_end | window_time |
  13. +------------------+-------+------+------------------+------------------+-------------------------+
  14. | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  15. | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
  16. | 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  17. | 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
  18. | 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  19. | 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
  20. | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
  21. | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  22. | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
  23. | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  24. | 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  25. | 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:15 | 2020-04-15 08:25 | 2020-04-15 08:24:59.999 |
  26. +------------------+-------+------+------------------+------------------+-------------------------+
  27. -- apply aggregation on the hopping windowed table
  28. > SELECT window_start, window_end, SUM(price)
  29. FROM TABLE(
  30. HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  31. GROUP BY window_start, window_end;
  32. +------------------+------------------+-------+
  33. | window_start | window_end | price |
  34. +------------------+------------------+-------+
  35. | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
  36. | 2020-04-15 08:05 | 2020-04-15 08:15 | 15.00 |
  37. | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
  38. | 2020-04-15 08:15 | 2020-04-15 08:25 | 6.00 |
  39. +------------------+------------------+-------+

CUMULATE

Cumulating windows are very useful in some scenarios, such as tumbling windows with early firing in a fixed window interval. For example, a daily dashboard draws cumulative UVs from 00:00 to every minute, the UV at 10:00 represents the total number of UV from 00:00 to 10:00. This can be easily and efficiently implemented by CUMULATE windowing.

The CUMULATE function assigns elements to windows that cover rows within an initial interval of step size and expand to one more step size (keep window start fixed) every step until the max window size. You can think CUMULATE function as applying TUMBLE windowing with max window size first, and split each tumbling windows into several windows with same window start and window ends of step-size difference. So cumulating windows do overlap and don’t have a fixed size.

For example, you could have a cumulating window for 1 hour step and 1 day max size, and you will get windows: [00:00, 01:00), [00:00, 02:00), [00:00, 03:00), …, [00:00, 24:00) for every day.

Cumulating Windows

The CUMULATE functions assigns windows based on a time attribute column. In streaming mode, the time attribute field must be either event or processing time attributes. In batch mode, the time attribute field of window table function must be an attribute of type TIMESTAMP or TIMESTAMP_LTZ. The return value of CUMULATE is a new relation that includes all columns of original relation as well as additional 3 columns named “window_start”, “window_end”, “window_time” to indicate the assigned window. The original time attribute “timecol” will be a regular timestamp column after window TVF.

CUMULATE takes four required parameters, one optional parameter:

  1. CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
  • data: is a table parameter that can be any relation with an time attribute column.
  • timecol: is a column descriptor indicating which time attributes column of data should be mapped to cumulating windows.
  • step: is a duration specifying the increased window size between the end of sequential cumulating windows.
  • size: is a duration specifying the max width of the cumulating windows. size must be an integral multiple of step.
  • offset: is an optional parameter to specify the offset which window start would be shifted by.

Here is an example invocation on the Bid table:

  1. > SELECT * FROM TABLE(
  2. CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES));
  3. -- or with the named params
  4. -- note: the DATA param must be the first
  5. > SELECT * FROM TABLE(
  6. CUMULATE(
  7. DATA => TABLE Bid,
  8. TIMECOL => DESCRIPTOR(bidtime),
  9. STEP => INTERVAL '2' MINUTES,
  10. SIZE => INTERVAL '10' MINUTES));
  11. +------------------+-------+------+------------------+------------------+-------------------------+
  12. | bidtime | price | item | window_start | window_end | window_time |
  13. +------------------+-------+------+------------------+------------------+-------------------------+
  14. | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 |
  15. | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
  16. | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  17. | 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
  18. | 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  19. | 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
  20. | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 |
  21. | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
  22. | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
  23. | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
  24. | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  25. | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
  26. | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
  27. | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
  28. | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  29. | 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
  30. | 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
  31. +------------------+-------+------+------------------+------------------+-------------------------+
  32. -- apply aggregation on the cumulating windowed table
  33. > SELECT window_start, window_end, SUM(price)
  34. FROM TABLE(
  35. CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  36. GROUP BY window_start, window_end;
  37. +------------------+------------------+-------+
  38. | window_start | window_end | price |
  39. +------------------+------------------+-------+
  40. | 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
  41. | 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
  42. | 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
  43. | 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
  44. | 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
  45. | 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
  46. | 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
  47. | 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
  48. +------------------+------------------+-------+

Window Offset

Offset is an optional parameter which could be used to change the window assignment. It could be positive duration and negative duration. Default values for window offset is 0. The same record maybe assigned to the different window if set different offset value.
For example, which window would be assigned to for a record with timestamp 2021-06-30 00:00:04 for a Tumble window with 10 MINUTE as size?

  • If offset value is -16 MINUTE, the record assigns to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
  • If offset value is -6 MINUTE, the record assigns to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
  • If offset is -4 MINUTE, the record assigns to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).
  • If offset is 0, the record assigns to window [2021-06-30 00:00:00, 2021-06-30 00:10:00).
  • If offset is 4 MINUTE, the record assigns to window [2021-06-29 23:54:00, 2021-06-30 00:04:00).
  • If offset is 6 MINUTE, the record assigns to window [2021-06-29 23:56:00, 2021-06-30 00:06:00).
  • If offset is 16 MINUTE, the record assigns to window [2021-06-29 23:56:00, 2021-06-30 00:06:00). We could find that, some windows offset parameters may have same effect on the assignment of windows. In the above case, -16 MINUTE, -6 MINUTE and 4 MINUTE have same effect for a Tumble window with 10 MINUTE as size.

Note: The effect of window offset is just for updating window assignment, it has no effect on Watermark.

We show an example to describe how to use offset in Tumble window in the following SQL.

  1. -- NOTE: Currently Flink doesn't support evaluating individual window table-valued function,
  2. -- window table-valued function should be used with aggregate operation,
  3. -- this example is just used for explaining the syntax and the data produced by table-valued function.
  4. Flink SQL> SELECT * FROM TABLE(
  5. TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES));
  6. -- or with the named params
  7. -- note: the DATA param must be the first
  8. Flink SQL> SELECT * FROM TABLE(
  9. TUMBLE(
  10. DATA => TABLE Bid,
  11. TIMECOL => DESCRIPTOR(bidtime),
  12. SIZE => INTERVAL '10' MINUTES,
  13. OFFSET => INTERVAL '1' MINUTES));
  14. +------------------+-------+------+------------------+------------------+-------------------------+
  15. | bidtime | price | item | window_start | window_end | window_time |
  16. +------------------+-------+------+------------------+------------------+-------------------------+
  17. | 2020-04-15 08:05 | 4.00 | C | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
  18. | 2020-04-15 08:07 | 2.00 | A | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
  19. | 2020-04-15 08:09 | 5.00 | D | 2020-04-15 08:01 | 2020-04-15 08:11 | 2020-04-15 08:10:59.999 |
  20. | 2020-04-15 08:11 | 3.00 | B | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
  21. | 2020-04-15 08:13 | 1.00 | E | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
  22. | 2020-04-15 08:17 | 6.00 | F | 2020-04-15 08:11 | 2020-04-15 08:21 | 2020-04-15 08:20:59.999 |
  23. +------------------+-------+------+------------------+------------------+-------------------------+
  24. -- apply aggregation on the tumbling windowed table
  25. Flink SQL> SELECT window_start, window_end, SUM(price)
  26. FROM TABLE(
  27. TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES, INTERVAL '1' MINUTES))
  28. GROUP BY window_start, window_end;
  29. +------------------+------------------+-------+
  30. | window_start | window_end | price |
  31. +------------------+------------------+-------+
  32. | 2020-04-15 08:01 | 2020-04-15 08:11 | 11.00 |
  33. | 2020-04-15 08:11 | 2020-04-15 08:21 | 10.00 |
  34. +------------------+------------------+-------+

Note: in order to better understand the behavior of windowing, we simplify the displaying of timestamp values to not show the trailing zeros, e.g. 2020-04-15 08:05 should be displayed as 2020-04-15 08:05:00.000 in Flink SQL Client if the type is TIMESTAMP(3).