窗口 Top-N

Batch Streaming

窗口 Top-N 是特殊的 Top-N,它返回每个分区键的每个窗口的N个最小或最大值。

与普通Top-N不同,窗口Top-N只在窗口最后返回汇总的Top-N数据,不会产生中间结果。窗口 Top-N 会在窗口结束后清除不需要的中间状态。 因此,窗口 Top-N 适用于用户不需要每条数据都更新Top-N结果的场景,相对普通Top-N来说性能更好。通常,窗口 Top-N 直接用于 窗口表值函数上。 另外,窗口 Top-N 可以用于基于 窗口表值函数 的操作之上,比如 窗口聚合窗口 Top-N窗口关联

注意:SESSION 窗口 Top-N 目前不支持批模式。

窗口 Top-N 的语法和普通的 Top-N 相同,更多信息参见:Top-N 文档。 除此之外,窗口 Top-N 需要 PARTITION BY 子句包含 窗口表值函数窗口聚合 产生的 window_startwindow_end。 否则优化器无法翻译。

下面展示了窗口 Top-N 的语法:

  1. SELECT [column_list]
  2. FROM (
  3. SELECT [column_list],
  4. ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
  5. ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  6. FROM table_name) -- relation applied windowing TVF
  7. WHERE rownum <= N [AND conditions]

示例

在窗口聚合后进行窗口 Top-N

下面的示例展示了在10分钟的滚动窗口上计算销售额位列前三的供应商。

  1. -- tables must have time attribute, e.g. `bidtime` in this table
  2. Flink SQL> desc Bid;
  3. +-------------+------------------------+------+-----+--------+---------------------------------+
  4. | name | type | null | key | extras | watermark |
  5. +-------------+------------------------+------+-----+--------+---------------------------------+
  6. | bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
  7. | price | DECIMAL(10, 2) | true | | | |
  8. | item | STRING | true | | | |
  9. | supplier_id | STRING | true | | | |
  10. +-------------+------------------------+------+-----+--------+---------------------------------+
  11. Flink SQL> SELECT * FROM Bid;
  12. +------------------+-------+------+-------------+
  13. | bidtime | price | item | supplier_id |
  14. +------------------+-------+------+-------------+
  15. | 2020-04-15 08:05 | 4.00 | A | supplier1 |
  16. | 2020-04-15 08:06 | 4.00 | C | supplier2 |
  17. | 2020-04-15 08:07 | 2.00 | G | supplier1 |
  18. | 2020-04-15 08:08 | 2.00 | B | supplier3 |
  19. | 2020-04-15 08:09 | 5.00 | D | supplier4 |
  20. | 2020-04-15 08:11 | 2.00 | B | supplier3 |
  21. | 2020-04-15 08:13 | 1.00 | E | supplier1 |
  22. | 2020-04-15 08:15 | 3.00 | H | supplier2 |
  23. | 2020-04-15 08:17 | 6.00 | F | supplier5 |
  24. +------------------+-------+------+-------------+
  25. Flink SQL> SELECT *
  26. FROM (
  27. SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
  28. FROM (
  29. SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
  30. FROM TABLE(
  31. TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  32. GROUP BY window_start, window_end, supplier_id
  33. )
  34. ) WHERE rownum <= 3;
  35. +------------------+------------------+-------------+-------+-----+--------+
  36. | window_start | window_end | supplier_id | price | cnt | rownum |
  37. +------------------+------------------+-------------+-------+-----+--------+
  38. | 2020-04-15 08:00 | 2020-04-15 08:10 | supplier1 | 6.00 | 2 | 1 |
  39. | 2020-04-15 08:00 | 2020-04-15 08:10 | supplier4 | 5.00 | 1 | 2 |
  40. | 2020-04-15 08:00 | 2020-04-15 08:10 | supplier2 | 4.00 | 1 | 3 |
  41. | 2020-04-15 08:10 | 2020-04-15 08:20 | supplier5 | 6.00 | 1 | 1 |
  42. | 2020-04-15 08:10 | 2020-04-15 08:20 | supplier2 | 3.00 | 1 | 2 |
  43. | 2020-04-15 08:10 | 2020-04-15 08:20 | supplier3 | 2.00 | 1 | 3 |
  44. +------------------+------------------+-------------+-------+-----+--------+

注意: 为了更好地理解窗口行为,这里把 timestamp 值后面的0去掉了。例如:在 Flink SQL Client 中,如果类型是 TIMESTAMP(3)2020-04-15 08:05 应该显示成 2020-04-15 08:05:00.000

在窗口表值函数后进行窗口 Top-N

下面的示例展示了在10分钟的滚动窗口上计算价格位列前三的数据。

  1. Flink SQL> SELECT *
  2. FROM (
  3. SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
  4. FROM TABLE(
  5. TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
  6. ) WHERE rownum <= 3;
  7. +------------------+-------+------+-------------+------------------+------------------+--------+
  8. | bidtime | price | item | supplier_id | window_start | window_end | rownum |
  9. +------------------+-------+------+-------------+------------------+------------------+--------+
  10. | 2020-04-15 08:05 | 4.00 | A | supplier1 | 2020-04-15 08:00 | 2020-04-15 08:10 | 2 |
  11. | 2020-04-15 08:06 | 4.00 | C | supplier2 | 2020-04-15 08:00 | 2020-04-15 08:10 | 3 |
  12. | 2020-04-15 08:09 | 5.00 | D | supplier4 | 2020-04-15 08:00 | 2020-04-15 08:10 | 1 |
  13. | 2020-04-15 08:11 | 2.00 | B | supplier3 | 2020-04-15 08:10 | 2020-04-15 08:20 | 3 |
  14. | 2020-04-15 08:15 | 3.00 | H | supplier2 | 2020-04-15 08:10 | 2020-04-15 08:20 | 2 |
  15. | 2020-04-15 08:17 | 6.00 | F | supplier5 | 2020-04-15 08:10 | 2020-04-15 08:20 | 1 |
  16. +------------------+-------+------+-------------+------------------+------------------+--------+

注意: 为了更好地理解窗口行为,这里把 timestamp 值后面的0去掉了。例如:在 Flink SQL Client 中,如果类型是 TIMESTAMP(3)2020-04-15 08:05 应该显示成 2020-04-15 08:05:00.000

限制

目前,Flink只支持在滚动,滑动和累计 窗口表值函数后进行窗口 Top-N。基于会话窗口的Top-N将在将来版本中支持。