Joins

Batch Streaming

Flink SQL支持对动态表进行复杂而灵活的连接操作。 为了处理不同的场景,需要多种查询语义,因此有几种不同类型的 Join。

默认情况下,joins 的顺序是没有优化的。表的 join 顺序是在 FROM 从句指定的。可以通过把更新频率最低的表放在第一个、频率最高的放在最后这种方式来微调 join 查询的性能。需要确保表的顺序不会产生笛卡尔积,因为不支持这样的操作并且会导致查询失败。

Regular Joins

Regular join 是最通用的 join 类型。在这种 join 下,join 两侧表的任何新记录或变更都是可见的,并会影响整个 join 的结果。 例如:如果左边有一条新纪录,在 Product.id 相等的情况下,它将和右边表的之前和之后的所有记录进行 join。

  1. SELECT * FROM Orders
  2. INNER JOIN Product
  3. ON Orders.productId = Product.id

对于流式查询,regular join 的语法是最灵活的,允许任何类型的更新(插入、更新、删除)输入表。 然而,这种操作具有重要的操作意义:Flink 需要将 Join 输入的两边数据永远保持在状态中。 因此,计算查询结果所需的状态可能会无限增长,这取决于所有输入表的输入数据量。你可以提供一个合适的状态 time-to-live (TTL) 配置来防止状态过大。注意:这样做可能会影响查询的正确性。查看 查询配置 了解详情。

INNER Equi-JOIN

根据 join 限制条件返回一个简单的笛卡尔积。目前只支持 equi-joins,即:至少有一个等值条件。不支持任意的 cross join 和 theta join。(cross join 指的是类似 SELECT * FROM table_a CROSS JOIN table_b,theta join 指的是类似 SELECT * FROM table_a, table_b

  1. SELECT *
  2. FROM Orders
  3. INNER JOIN Product
  4. ON Orders.product_id = Product.id

OUTER Equi-JOIN

返回所有符合条件的笛卡尔积(即:所有通过 join 条件连接的行),加上所有外表没有匹配到的行。Flink 支持 LEFT、RIGHT 和 FULL outer joins。目前只支持 equi-joins,即:至少有一个等值条件。不支持任意的 cross join 和 theta join。

  1. SELECT *
  2. FROM Orders
  3. LEFT JOIN Product
  4. ON Orders.product_id = Product.id
  5. SELECT *
  6. FROM Orders
  7. RIGHT JOIN Product
  8. ON Orders.product_id = Product.id
  9. SELECT *
  10. FROM Orders
  11. FULL OUTER JOIN Product
  12. ON Orders.product_id = Product.id

Interval Joins

返回一个符合 join 条件和时间限制的简单笛卡尔积。Interval join 需要至少一个 equi-join 条件和一个 join 两边都包含的时间限定 join 条件。范围判断可以定义成就像一个条件(<, <=, >=, >),也可以是一个 BETWEEN 条件,或者两边表的一个相同类型(即:处理时间 或 事件时间)的时间属性 的等式判断。

例如:如果订单是在被接收到4小时后发货,这个查询会把所有订单和它们相应的 shipments join 起来。

  1. SELECT *
  2. FROM Orders o, Shipments s
  3. WHERE o.id = s.order_id
  4. AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

下面列举了一些有效的 interval join 时间条件:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

对于流式查询,对比 regular join,interval join 只支持有时间属性的非更新表。 由于时间属性是递增的,Flink 从状态中移除旧值也不会影响结果的正确性。

Temporal Joins

时态表(Temporal table)是一个随时间变化的表:在 Flink 中被称为动态表。时态表中的行与一个或多个时间段相关联,所有 Flink 中的表都是时态的(Temporal)。 时态表包含一个或多个版本的表快照,它可以是一个变化的历史表,跟踪变化(例如,数据库变化日志,包含所有快照)或一个变化的维度表,也可以是一个将变更物化的维表(例如,存放最终快照的数据表)。

事件时间 Temporal Join

基于事件时间的 Temporal join 允许对版本表进行 join。 这意味着一个表可以使用变化的元数据来丰富,并在某个时间点检索其具体值。

Temporal Joins 使用任意表(左侧输入/探测端)的每一行与版本表中对应的行进行关联(右侧输入/构建端)。 Flink 使用 SQL:2011标准 中的 FOR SYSTEM_TIME AS OF 语法去执行操作。 Temporal join 的语法如下:

  1. SELECT [column_list]
  2. FROM table1 [AS <alias1>]
  3. [LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
  4. ON table1.column-name1 = table2.column-name1

有了事件时间属性(即:rowtime 属性),就能检索到过去某个时间点的值。 这允许在一个共同的时间点上连接这两个表。 版本表将存储自最后一个 watermark 以来的所有版本(按时间标识)。

例如,假设我们有一个订单表,每个订单都有不同货币的价格。 为了正确地将该表统一为单一货币(如美元),每个订单都需要与下单时相应的汇率相关联。

  1. -- Create a table of orders. This is a standard
  2. -- append-only dynamic table.
  3. CREATE TABLE orders (
  4. order_id STRING,
  5. price DECIMAL(32,2),
  6. currency STRING,
  7. order_time TIMESTAMP(3),
  8. WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
  9. ) WITH (/* ... */);
  10. -- Define a versioned table of currency rates.
  11. -- This could be from a change-data-capture
  12. -- such as Debezium, a compacted Kafka topic, or any other
  13. -- way of defining a versioned table.
  14. CREATE TABLE currency_rates (
  15. currency STRING,
  16. conversion_rate DECIMAL(32, 2),
  17. update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
  18. WATERMARK FOR update_time AS update_time - INTERVAL '15' SECOND,
  19. PRIMARY KEY(currency) NOT ENFORCED
  20. ) WITH (
  21. 'connector' = 'kafka',
  22. 'value.format' = 'debezium-json',
  23. /* ... */
  24. );
  25. SELECT
  26. order_id,
  27. price,
  28. orders.currency,
  29. conversion_rate,
  30. order_time
  31. FROM orders
  32. LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
  33. ON orders.currency = currency_rates.currency;
  34. order_id price currency conversion_rate order_time
  35. ======== ===== ======== =============== =========
  36. o_001 11.11 EUR 1.14 12:00:00
  37. o_002 12.51 EUR 1.10 12:06:00

注意: 事件时间 temporal join 是通过左和右两侧的 watermark 触发的; 这里的 INTERVAL 时间减法用于等待后续事件,以确保 join 满足预期。 请确保 join 两边设置了正确的 watermark 。

注意: 事件时间 temporal join 需要包含主键相等的条件,即:currency_rates 表的主键 currency_rates.currency 包含在条件 orders.currency = currency_rates.currency 中。

regular joins 相比,就算 build side(例子中的 currency_rates 表)发生变更了,之前的 temporal table 的结果也不会被影响。 与 interval joins 对比,temporal join没有定义join的时间窗口。 Probe side (例子中的 orders 表)的记录总是在 time 属性指定的时间与 build side 的版本行进行连接。因此,build side 表的行可能已经过时了。 随着时间的推移,不再被需要的记录版本(对于给定的主键)将从状态中删除。

处理时间 Temporal Join

基于处理时间的 temporal join 使用处理时间属性将数据与外部版本表(例如 mysql、hbase)的最新版本相关联。

通过定义一个处理时间属性,这个 join 总是返回最新的值。可以将 build side 中被查找的表想象成一个存储所有记录简单的 HashMap<K,V>。 这种 join 的强大之处在于,当无法在 Flink 中将表具体化为动态表时,它允许 Flink 直接针对外部系统工作。

下面这个处理时间 temporal join 示例展示了一个追加表 ordersLatestRates 表进行 join。 LatestRates 是一个最新汇率的维表,比如 HBase 表,在 10:1510:3010:52这些时间,LatestRates 表的数据看起来是这样的:

  1. 10:15> SELECT * FROM LatestRates;
  2. currency rate
  3. ======== ======
  4. US Dollar 102
  5. Euro 114
  6. Yen 1
  7. 10:30> SELECT * FROM LatestRates;
  8. currency rate
  9. ======== ======
  10. US Dollar 102
  11. Euro 114
  12. Yen 1
  13. 10:52> SELECT * FROM LatestRates;
  14. currency rate
  15. ======== ======
  16. US Dollar 102
  17. Euro 116 <==== changed from 114 to 116
  18. Yen 1

LastestRates 表的数据在 10:1510:30 是相同的。 欧元(Euro)的汇率(rate)在 10:52 从 114 变更为 116。

Orders 表示支付金额的 amountcurrency的追加表。 例如:在 10:15 ,有一个金额为 2 Euro 的 order。

  1. SELECT * FROM Orders;
  2. amount currency
  3. ====== =========
  4. 2 Euro <== arrived at time 10:15
  5. 1 US Dollar <== arrived at time 10:30
  6. 2 Euro <== arrived at time 10:52

给出下面这些表,我们希望所有 Orders 表的记录转换为一个统一的货币。

  1. amount currency rate amount*rate
  2. ====== ========= ======= ============
  3. 2 Euro 114 228 <== arrived at time 10:15
  4. 1 US Dollar 102 102 <== arrived at time 10:30
  5. 2 Euro 116 232 <== arrived at time 10:52

目前,temporal join 还不支持与任意 view/table 的最新版本 join 时使用 FOR SYSTEM_TIME AS OF 语法。可以像下面这样使用 temporal table function 语法来实现(时态表函数):

  1. SELECT
  2. o_amount, r_rate
  3. FROM
  4. Orders,
  5. LATERAL TABLE (Rates(o_proctime))
  6. WHERE
  7. r_currency = o_currency

注意 Temporal join 不支持与 table/view 的最新版本进行 join 时使用 FOR SYSTEM_TIME AS OF 语法是出于语义考虑,因为左流的连接处理不会等待 temporal table 的完整快照,这可能会误导生产环境中的用户。处理时间 temporal join 使用 temporal table function 也存在相同的语义问题,但它已经存在了很长时间,因此我们从兼容性的角度支持它。

processing-time 的结果是不确定的。 processing-time temporal join 常常用在使用外部系统来丰富流的数据。(例如维表)

regular joins 的差异,就算 build side(例子中的 currency_rates 表)发生变更了,之前的 temporal table 结果也不会被影响。 与 interval joins 的差异,temporal join 没有定义数据连接的时间窗口。即:旧数据没存储在状态中。

Temporal Table Function Join

使用 temporal table function 去 join 表的语法和 Table Function 相同。

注意:目前只支持 inner join 和 left outer join。

假设Rates是一个 temporal table function,这个 join 在 SQL 中可以被表达为:

  1. SELECT
  2. o_amount, r_rate
  3. FROM
  4. Orders,
  5. LATERAL TABLE (Rates(o_proctime))
  6. WHERE
  7. r_currency = o_currency

上述 temporal table DDL 和 temporal table function 的主要区别在于:

  • SQL 中可以定义 temporal table DDL,但不能定义 temporal table 函数;
  • temporal table DDL 和 temporal table function 都支持 temporal join 版本表,但只有 temporal table function 可以 temporal join 任何表/视图的最新版本(即”处理时间 Temporal Join”)。

Lookup Join

lookup join 通常用于使用从外部系统查询的数据来丰富表。join 要求一个表具有处理时间属性,另一个表由查找源连接器(lookup source connnector)支持。

lookup join 和上面的 处理时间 Temporal Join 语法相同,右表使用查找源连接器支持。

下面的例子展示了 lookup join 的语法。

  1. -- Customers is backed by the JDBC connector and can be used for lookup joins
  2. CREATE TEMPORARY TABLE Customers (
  3. id INT,
  4. name STRING,
  5. country STRING,
  6. zip STRING
  7. ) WITH (
  8. 'connector' = 'jdbc',
  9. 'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  10. 'table-name' = 'customers'
  11. );
  12. -- enrich each order with customer information
  13. SELECT o.order_id, o.total, c.country, c.zip
  14. FROM Orders AS o
  15. JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  16. ON o.customer_id = c.id;

在上面的示例中,Orders 表由保存在 MySQL 数据库中的 Customers 表数据来丰富。带有后续 process time 属性的 FOR SYSTEM_TIME AS OF 子句确保在联接运算符处理 Orders 行时,Orders 的每一行都与 join 条件匹配的 Customer 行连接。它还防止连接的 Customer 表在未来发生更新时变更连接结果。lookup join 还需要一个强制的相等连接条件,在上面的示例中是 o.customer_id = c.id

数组展开

返回给定数组中每个元素的新行。尚不支持 WITH ORDINALITY(会额外生成一个标识顺序的整数列)展开。

  1. SELECT order_id, tag
  2. FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Table Function

将表与表函数的结果联接。左侧(外部)表的每一行都与表函数的相应调用产生的所有行相连接。用户自定义表函数 必须在使用前注册。

INNER JOIN

如果表函数调用返回一个空结果,那么左表的这行数据将不会输出。

  1. SELECT order_id, res
  2. FROM Orders,
  3. LATERAL TABLE(table_func(order_id)) t(res)

LEFT OUTER JOIN

如果表函数调用返回了一个空结果,则保留相应的行,并用空值填充未关联到的结果。当前,针对 lateral table 的 left outer join 需要 ON 子句中有一个固定的 TRUE 连接条件。

  1. SELECT order_id, res
  2. FROM Orders
  3. LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
  4. ON TRUE