去重

Batch Streaming

去重是去掉重复的行,只保留第一或者最后一行。有时,上游的 ETL 生成的数据不是端到端精确一次的。在发生故障恢复后,可能会导致结果下游中出现重复记录。这些重复记录会影响下游的分析工作,比如:SUMCOUNT的结果会偏大,所以需要先去重再进行下一步的分析。

Flink 使用 ROW_NUMBER() 去除重复数据,就像 Top-N 查询一样。其实,去重就是 Top-N 在 N 为 1 时的特例,并且去重必须要求按照处理或者事件时间排序。

下面的例子展示了去重语句的语法:

  1. SELECT [column_list]
  2. FROM (
  3. SELECT [column_list],
  4. ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
  5. ORDER BY time_attr [asc|desc]) AS rownum
  6. FROM table_name)
  7. WHERE rownum = 1

参数说明:

  • ROW_NUMBER():为每一行分配一个唯一且连续的数字,从 1 开始。
  • PARTITION BY col1[, col2...]:指定分区键,即需要去重的键。
  • ORDER BY time_attr [asc|desc]:指定排序列,必须是 时间属性。目前 Flink 支持 处理时间属性事件时间属性。Order by ASC 保留第一行,DESC 保留最后一行。
  • WHERE rownum = 1:Flink 需要这个条件来识别去重语句。

注意:上述格式必须严格遵守,否则优化器无法识别。

下面的例子展示了在流表上如何指定去重 SQL 查询:

  1. CREATE TABLE Orders (
  2. order_id STRING,
  3. user STRING,
  4. product STRING,
  5. num BIGINT,
  6. proctime AS PROCTIME()
  7. ) WITH (...);
  8. -- remove duplicate rows on order_id and keep the first occurrence row,
  9. -- because there shouldn't be two orders with the same order_id.
  10. SELECT order_id, user, product, num
  11. FROM (
  12. SELECT *,
  13. ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) AS row_num
  14. FROM Orders)
  15. WHERE row_num = 1