Table API
Table API是用于流和批处理的统一关系API。 Table API查询可以在批量或流式输入上运行而无需修改。 Table API是SQL语言的超级集合,专门用于与Apache Flink一起使用。 Table API是Scala和Java语言集成API。 Table API查询不是像SQL一样将字符串值指定为SQL,而是在Java或Scala中以嵌入语言的样式定义,并支持自动完成和语法验证等IDE支持。
Table API与Flink的SQL集成共享其API的许多概念和部分。查看Common Concepts&API以了解如何注册表或创建Table
对象。该流概念页讨论流如动态表和时间属性,具体的概念。
以下示例假定Orders
使用属性调用的已注册表(a, b, c, rowtime)
。该rowtime
字段是流式传输中的逻辑时间属性或批处理中的常规时间戳字段。
概述和示例
Table API可用于Scala和Java。Scala Table API利用Scala表达式,Java Table API基于字符串,这些字符串被解析并转换为等效表达式。
以下示例显示了Scala和Java Table API之间的差异。表程序在批处理环境中执行。它Orders
按字段Scan表,a
并按组计算结果行。表程序的结果转换为DataSet
类型Row
并打印。
通过导入启用Java Table API org.apache.flink.table.api.java.*
。以下示例显示如何构造Java Table API程序以及如何将表达式指定为字符串。
// environment configuration
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table in table environment
// ...
// specify table program
Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)
Table counts = orders
.groupBy("a")
.select("a, b.count as cnt");
// conversion to DataSet
DataSet<Row> result = tableEnv.toDataSet(counts, Row.class);
result.print();
The Scala Table API is enabled by importing org.apache.flink.api.scala.
and org.apache.flink.table.api.scala.
.
The following example shows how a Scala Table API program is constructed. Table attributes are referenced using Scala Symbols, which start with an apostrophe character ('
).
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
// environment configuration
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// register Orders table in table environment
// ...
// specify table program
val orders = tEnv.scan("Orders") // schema (a, b, c, rowtime)
val result = orders
.groupBy('a)
.select('a, 'b.count as 'cnt)
.toDataSet[Row] // conversion to DataSet
.print()
下一个示例显示了一个更复杂的 Table API程序。程序再次扫描Orders
表格。它过滤空值,规范化a
String类型的字段,并计算每小时和产品a
的平均计费金额b
。
// environment configuration
// ...
// specify table program
Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)
Table result = orders
.filter("a.isNotNull && b.isNotNull && c.isNotNull")
.select("a.lowerCase(), b, rowtime")
.window(Tumble.over("1.hour").on("rowtime").as("hourlyWindow"))
.groupBy("hourlyWindow, a")
.select("a, hourlyWindow.end as hour, b.avg as avgBillingAmount");
// environment configuration
// ...
// specify table program
val orders: Table = tEnv.scan("Orders") // schema (a, b, c, rowtime)
val result: Table = orders
.filter('a.isNotNull && 'b.isNotNull && 'c.isNotNull)
.select('a.lowerCase(), 'b, 'rowtime)
.window(Tumble over 1.hour on 'rowtime as 'hourlyWindow)
.groupBy('hourlyWindow, 'a)
.select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount)
由于 Table API是批处理和流数据的统一API,因此两个示例程序都可以在批处理和流输入上执行,而无需对表程序本身进行任何修改。在这两种情况下,程序产生相同的结果,因为流记录不会延迟(有关详细信息,请参阅流式概念)。
算子操作
Table API支持以下 算子操作。请注意,并非所有 算子操作都可用于批处理和流式传输; 他们被相应地标记。
Scan,Projection和过滤
算子 | 描述 |
---|---|
Scan批量流 | 与SQL查询中的FROM子句类似。执行已注册表的扫描。
|
Select批量流 | 与SQL SELECT语句类似。执行选择 算子操作。您可以使用star( )作为通配符,选择表中的所有列。
|
As批量流 | 重命名字段。
|
Where / Filter Batch Streaming | 与SQL WHERE子句类似。过滤掉未通过过滤谓词的行。要么
|
Operators | Description |
---|---|
ScanBatchStreaming | Similar to the FROM clause in a SQL query. Performs a scan of a registered table.
|
SelectBatchStreaming | Similar to a SQL SELECT statement. Performs a select operation.You can use star ( ) to act as a wild card, selecting all of the columns in the table.
|
AsBatchStreaming | Renames fields.
|
Where / FilterBatchStreaming | Similar to a SQL WHERE clause. Filters out rows that do not pass the filter predicate.or
|
聚合
算子 | 描述 |
---|---|
GroupBy聚合批处理流结果更新 | 与SQL GROUP BY子句类似。使用以下运行的聚合 算子对分组键上的行进行分组,以按组聚合行。注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于聚合类型和不同分组键的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅Streaming Concepts。 |
GroupBy窗口聚合批量流 | 组和聚合组窗口上的表以及可能的一个或多个分组键。
|
Over Windows聚合流 | 与SQL OVER子句类似。基于前一行和后一行的窗口(范围)计算每行的窗口聚合。有关更多详细信息,请参阅over windows部分。注意:必须在同一窗口中定义所有聚合,即相同的分区,排序和范围。目前,仅支持具有PRREDING(UNBOUNDED和有界)到CURRENT ROW范围的窗口。尚不支持使用FOLLOWING的范围。必须在单个时间属性上指定ORDER BY 。 |
Distinct批量流结果更新 | 与SQL DISTINCT子句类似。返回具有不同值组合的记录。注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同字段的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅Streaming Concepts。 |
Operators | Description |
---|---|
GroupBy AggregationBatchStreamingResult Updating | Similar to a SQL GROUP BY clause. Groups the rows on the grouping keys with a following running aggregation operator to aggregate rows group-wise.Note: For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. |
GroupBy Window AggregationBatchStreaming | Groups and aggregates a table on a group window and possibly one or more grouping keys.
|
Over Window AggregationStreaming | Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the over windows section for more details.Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute. |
DistinctBatch | Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. |
Join
算子 | 描述 |
---|---|
Inner Join批量流 | 与SQL JOIN子句类似。关联两张桌子。两个表必须具有不同的字段名称,并且必须通过连接 算子或使用where或filter 算子定义至少一个相等连接谓词。注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅Streaming Concepts。 |
Outer Join批处理流结果更新 | 与SQL LEFT / RIGHT / FULL OUTER JOIN子句类似。关联两张桌子。两个表必须具有不同的字段名称,并且必须至少定义一个等于连接谓词。注意:对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅Streaming Concepts。 |
Time-windowed Join批量流 | 注意:时间窗口连接是可以以流方式处理的常规连接的子集。时间窗口连接需要至少一个等连接谓词和一个限制双方时间的连接条件。这样的条件可以由两个适当的范围谓词(<, <=, >=, > )或单个等式谓词来定义,该单个等式谓词比较两个输入表的相同类型的时间属性(即,处理时间或事件时间)。例如,以下谓词是有效的窗口连接条件:- ltime === rtime - ltime >= rtime && ltime < rtime + 10.minutes
|
TableFunction Inner Join批量流 | 使用表函数的结果连接表。左(外)表的每一行与表函数的相应调用产生的所有行连接。如果其表函数调用返回空结果,则删除左(外)表的一行。
|
TableFunction Left Outer Join Batch Streaming | 使用表函数的结果连接表。左(外)表的每一行与表函数的相应调用产生的所有行连接。如果表函数调用返回空结果,则保存相应的外部行,并使用空值填充结果。注意:目前,左外连接的表函数的谓词只能是空或文字true 。
|
Operators | Description |
---|---|
Inner JoinBatchStreaming | Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined through join operator or using a where or filter operator.Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. |
Outer JoinBatchStreamingResult Updating | Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. |
Time-windowed JoinBatchStreaming | Note: Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, > ) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.For example, the following predicates are valid window join conditions:- 'ltime === 'rtime - 'ltime >= 'rtime && 'ltime < 'rtime + 10.minutes
|
TableFunction Inner JoinBatchStreaming | Joins a table with a the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. A row of the left (outer) table is dropped, if its table function call returns an empty result.
|
TableFunction Left Outer JoinBatchStreaming | Joins a table with a the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values.Note: Currently, the predicate of a table function left outer join can only be empty or literal true .
|
设置 算子操作
算子 | 描述 |
---|---|
Union批次 | 与SQL UNION子句类似。联合两个表删除了重复记录。两个表必须具有相同的字段类型。
|
UnionAll Batch Streaming | 与SQL UNION ALL子句类似。工会两张桌子。两个表必须具有相同的字段类型。
|
Intersect批次 | 类似于SQL INTERSECT子句。Intersect返回两个表中存在的记录。如果一个或两个表不止一次出现记录,则只返回一次,即结果表没有重复记录。两个表必须具有相同的字段类型。
|
IntersectAll Batch | 类似于SQL INTERSECT ALL子句。IntersectAll返回两个表中存在的记录。如果两个表中的记录多次出现,则返回的值与两个表中的记录一样多,即生成的表可能具有重复记录。两个表必须具有相同的字段类型。
|
Minus批 | 与SQL EXCEPT子句类似。减号返回左表中右表中不存在的记录。左表中的重复记录只返回一次,即删除重复项。两个表必须具有相同的字段类型。
|
MinusAll Batch | 类似于SQL EXCEPT ALL子句。MinusAll返回右表中不存在的记录。在左表中出现n次并在右表中出现m次的记录返回(n-m)次,即,删除右表中存在的重复次数。两个表必须具有相同的字段类型。
|
In批量流中 | 与SQL IN子句类似。如果表达式存在于给定的表子查询中,则返回true。子查询表必须包含一列。此列必须与表达式具有相同的数据类型。注意:对于流式查询, 算子操作将在连接和组 算子操作中重写。计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅Streaming Concepts。 |
Operators | Description |
---|---|
UnionBatch | Similar to a SQL UNION clause. Unions two tables with duplicate records removed, both tables must have identical field types.
|
UnionAllBatchStreaming | Similar to a SQL UNION ALL clause. Unions two tables, both tables must have identical field types.
|
IntersectBatch | Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.
|
IntersectAllBatch | Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.
|
MinusBatch | Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.
|
MinusAllBatch | Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.
|
InBatchStreaming | Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.Note: For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See Streaming Concepts for details. |
OrderBy,Offset&Fetch
算子 | 描述 |
---|---|
Order By批次 | 与SQL ORDER BY子句类似。返回跨所有并行分区全局排序的记录。
|
Offset & Fetch批次 | 类似于SQL OFFSET和FETCH子句。偏移和提取限制从排序结果返回的记录数。Offset和Fetch在技术上是Order By 算子的一部分,因此必须以它为前缀。
|
Operators | Description |
---|---|
Order ByBatch | Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.
|
Offset & FetchBatch | Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.
|
Insert
算子 | 描述 |
---|---|
Insert批量流处理 | 类似于SQL查询中的INSERT INTO子句。执行插入已注册的输出表。输出表必须在TableEnvironment中注册(请参阅注册TableSink)。此外,已注册表的模式必须与查询的模式匹配。
|
Operators | Description |
---|---|
Insert IntoBatchStreaming | Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.Output tables must be registered in the TableEnvironment (see Register a TableSink). Moreover, the schema of the registered table must match the schema of the query.
|
GroupWindows
组窗口根据时间或行计数间隔将行组聚合为有限组,并按组评估聚合函数。对于批处理表,窗口是按时间间隔对记录进行分组的便捷快捷方式。
Windows是使用该window(w: Window)
子句定义的,需要使用该as
子句指定的别名。为了按窗口对表进行分组,必须在groupBy(…)
子句中引用窗口别名,就像常规分组属性一样。以下示例显示如何在表上定义窗口聚合。
Table table = input
.window([Window w].as("w")) // define window with alias w
.groupBy("w") // group the table by window w
.select("b.sum"); // aggregate
val table = input
.window([w: Window] as 'w) // define window with alias w
.groupBy('w) // group the table by window w
.select('b.sum) // aggregate
在流式传输环境中,如果窗口聚合除了窗口之外还在一个或多个属性上进行分组,则它们只能并行计算,即groupBy(…)
子句引用窗口别名和至少一个附加属性。一个groupBy(…)
仅引用一个窗口别名(如在上面的例子)子句只能由一个单一的,非平行的任务进行评估。以下示例显示如何使用其他分组属性定义窗口聚合。
Table table = input
.window([Window w].as("w")) // define window with alias w
.groupBy("w, a") // group the table by attribute a and window w
.select("a, b.sum"); // aggregate
val table = input
.window([w: Window] as 'w) // define window with alias w
.groupBy('w, 'a) // group the table by attribute a and window w
.select('a, 'b.sum) // aggregate
窗口属性诸如开始,结束,或一个时间窗口的rowtime时间戳可以在选择语句被添加为窗口别名作为的一个属性w.start
,w.end
以及w.rowtime
分别。窗口开始和行时间戳是包含的低窗口和超窗口边界。相反,窗口结束时间戳是独占的上窗口边界。例如,从下午2点开始的30分钟的翻滚窗口将具有14:00:00.000
开始时间戳,14:29:59.999
行时间戳和14:30:00.000
结束时间戳。
Table table = input
.window([Window w].as("w")) // define window with alias w
.groupBy("w, a") // group the table by attribute a and window w
.select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
val table = input
.window([w: Window] as 'w) // define window with alias w
.groupBy('w, 'a) // group the table by attribute a and window w
.select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps
该Window
参数定义行如何映射到窗口。Window
不是用户可以实现的接口。相反, Table API提供了一组Window
具有特定语义的预定义类,这些类被转换为底层DataStream
或DataSet
算子操作。支持的窗口定义如下所示。
翻滚(翻滚的Windows)
翻滚窗口将行分配给固定长度的非重叠连续窗口。例如,5分钟的翻滚窗口以5分钟为间隔对行进行分组。可以在事件时间,处理时间或行数上定义翻滚窗口。
使用Tumble
类定义翻滚窗口如下:
方法 | 描述 |
---|---|
over | 定义窗口的长度,可以是时间或行计数间隔。 |
on | 组的时间属性(时间间隔)或排序(行数)。对于批处理查询,这可能是任何Long或Timestamp属性。对于流式查询,这必须是声明的事件时间或处理时间属性。 |
as | 为窗口指定别名。别名用于引用以下groupBy() 子句中的窗口,并可选择在子句中选择窗口属性,如窗口开始,结束或行时间戳select() 。 |
// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"));
// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.minutes").on("proctime").as("w"));
// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.rows").on("proctime").as("w"));
// Tumbling Event-time Window
.window(Tumble over 10.minutes on 'rowtime as 'w)
// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.minutes on 'proctime as 'w)
// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble over 10.rows on 'proctime as 'w)
滑动(滑动窗口)
滑动窗口具有固定大小,并按指定的滑动间隔滑动。如果滑动间隔小于窗口大小,则滑动窗口重叠。因此,可以将行分配给多个窗口。例如,15分钟大小和5分钟滑动间隔的滑动窗口将每行分配给3个不同的15分钟大小的窗口,这些窗口以5分钟的间隔进行评估。可以在事件时间,处理时间或行数上定义滑动窗口。
滑动窗口使用Slide
如下类定义:
方法 | 描述 |
---|---|
over | 定义窗口的长度,可以是时间或行计数间隔。 |
every | 定义滑动间隔,可以是时间间隔也可以是行计数间隔。滑动间隔必须与大小间隔的类型相同。 |
on | 组的时间属性(时间间隔)或排序(行数)。对于批处理查询,这可能是任何Long或Timestamp属性。对于流式查询,这必须是声明的事件时间或处理时间属性。 |
as | 为窗口指定别名。别名用于引用以下groupBy() 子句中的窗口,并可选择在子句中选择窗口属性,如窗口开始,结束或行时间戳select() 。 |
// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));
// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));
// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));
// Sliding Event-time Window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)
// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide over 10.rows every 5.rows on 'proctime as 'w)
会话(会话窗口)
会话窗口没有固定的大小,但它们的边界由不活动的间隔定义,即如果在定义的间隙期间没有出现事件,则会话窗口关闭。例如,如果在30分钟不活动后观察到一行,则会开始一个30分钟间隙的会话窗口(否则该行将被添加到现有窗口中),如果在30分钟内未添加任何行,则会关闭。会话窗口可以在事件时间或处理时间上工作。
使用Session
类定义会话窗口,如下所示:
方法 | 描述 |
---|---|
withGap | 将两个窗口之间的间隔定义为时间间隔。 |
on | 组的时间属性(时间间隔)或排序(行数)。对于批处理查询,这可能是任何Long或Timestamp属性。对于流式查询,这必须是声明的事件时间或处理时间属性。 |
as | 为窗口指定别名。别名用于引用以下groupBy() 子句中的窗口,并可选择在子句中选择窗口属性,如窗口开始,结束或行时间戳select() 。 |
// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"));
// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session.withGap("10.minutes").on("proctime").as("w"));
// Session Event-time Window
.window(Session withGap 10.minutes on 'rowtime as 'w)
// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session withGap 10.minutes on 'proctime as 'w)
OverWindows
从标准SQL(OVER
子句)已知过窗口聚合,并在SELECT
查询的子句中定义。与GROUP BY
子句中指定的组窗口不同,在窗口上不会折叠行。而是通过窗口聚合计算每个输入行在其相邻行的范围内的聚合。
OverWindows是使用window(w: OverWindow*)
子句定义的,并通过select()
方法中的别名引用。以下示例显示如何在表上定义过窗口聚合。
Table table = input
.window([OverWindow w].as("w")) // define over window with alias w
.select("a, b.sum over w, c.min over w"); // aggregate over the over window w
val table = input
.window([w: OverWindow] as 'w) // define over window with alias w
.select('a, 'b.sum over 'w, 'c.min over 'w) // aggregate over the over window w
的OverWindow
限定范围在其聚集计算的行。OverWindow
不是用户可以实现的接口。相反, Table API提供了Over
用于配置覆盖窗口属性的类。可以在事件时间或处理时间以及指定为时间间隔或行计数的范围上定义窗口。支持的窗口定义在Over
(和其他类)上公开为方法,如下所示:
方法 | Required | 描述 |
---|---|---|
partitionBy | Optional | 定义一个或多个属性上的输入分区。每个分区都单独排序,聚合函数分别应用于每个分区。注意:在流式环境中,如果窗口包含partition by子句,则只能并行计算窗口聚合。no / notpartitionBy(…) 流由单个非并行任务处理。 |
orderBy | Required | 定义每个分区中行的顺序,从而定义聚合函数应用于行的顺序。注意:对于流式查询,这必须是声明的事件时间或处理时间属性。目前,仅支持单个排序属性。 |
preceding | Required | 定义窗口中包含的行的间隔,并在当前行之前。间隔可以指定为时间或行计数间隔。在窗口上限定具有间隔的大小,例如,10.minutes 对于时间间隔或10.rows 行计数间隔。使用常量(即,UNBOUNDED_RANGE 时间间隔或UNBOUNDED_ROW 行计数间隔)指定在窗口上无界限。在Windows上无限制地从分区的第一行开始。 |
following | Optional | 定义窗口中包含的行的窗口间隔,并跟随当前行。必须在与前一个间隔(时间或行计数)相同的单位中指定间隔。目前,不支持在当前行之后包含行的窗口。相反,您可以指定两个常量之一:- CURRENT_ROW 将窗口的上限设置为当前行。- CURRENT_RANGE 设置窗口的上限以对当前行的键进行排序,即窗口中包含与当前行具有相同排序键的所有行。如果following 省略该子句,则将时间间隔窗口CURRENT_RANGE 的上限定义为,并将行计数间隔窗口的上限定义为CURRENT_ROW 。 |
as | Required | 为覆盖窗口指定别名。别名用于引用以下select() 子句中的over window 。 |
注意:目前,同一select()
调用中的所有聚合函数必须计算相同的窗口。
在Windows上无限制
// Unbounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"));
// Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_range").as("w"));
// Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"));
// Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_row").as("w"));
// Unbounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
// Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)
// Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
// Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
限制在Windows上
// Bounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))
// Bounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))
// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))
// Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))
// Bounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)
// Bounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)
// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)
// Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)
数据类型
Table API建立在Flink的DataSet和DataStream API之上。在内部,它还使用Flink TypeInformation
来定义数据类型。完全支持的类型列在org.apache.flink.table.api.Types
。下表总结了 Table API类型,SQL类型和生成的Java类之间的关系。
Table API | SQL | Java类型 |
---|---|---|
Types.STRING | VARCHAR | java.lang.String |
Types.BOOLEAN | BOOLEAN | java.lang.Boolean |
Types.BYTE | TINYINT | java.lang.Byte |
Types.SHORT | SMALLINT | java.lang.Short |
Types.INT | INTEGER, INT | java.lang.Integer |
Types.LONG | BIGINT | java.lang.Long |
Types.FLOAT | REAL, FLOAT | java.lang.Float |
Types.DOUBLE | DOUBLE | java.lang.Double |
Types.DECIMAL | DECIMAL | java.math.BigDecimal |
Types.SQL_DATE | DATE | java.sql.Date |
Types.SQL_TIME | TIME | java.sql.Time |
Types.SQL_TIMESTAMP | TIMESTAMP(3) | java.sql.Timestamp |
Types.INTERVAL_MONTHS | INTERVAL YEAR TO MONTH | java.lang.Integer |
Types.INTERVAL_MILLIS | INTERVAL DAY TO SECOND(3) | java.lang.Long |
Types.PRIMITIVE_ARRAY | ARRAY | 例如 int[] |
Types.OBJECT_ARRAY | ARRAY | 例如 java.lang.Byte[] |
Types.MAP | MAP | java.util.HashMap |
Types.MULTISET | MULTISET | 例如,java.util.HashMap<String, Integer> 对于多重集合String |
Types.ROW | ROW | org.apache.flink.types.Row |
通用类型和(嵌套)复合类型(例如,POJO,元组,Row,Scala案例类)也可以是行的字段。
可以使用值访问函数访问具有任意嵌套的复合类型的字段。
通用类型被视为黑盒子,可以由用户定义的函数传递或处理。
表达式语法
前面部分中的一些 算子需要一个或多个表达式。可以使用嵌入式Scala DSL或字符串指定表达式。请参阅上面的示例以了解如何指定表达式。
这是表达式的EBNF语法:
expressionList = expression , { "," , expression } ;
expression = timeIndicator | overConstant | alias ;
alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ;
logic = comparison , [ ( "&&" | "||" ) , comparison ] ;
comparison = term , [ ( "=" | "==" | "===" | "!=" | "!==" | ">" | ">=" | "<" | "<=" ) , term ] ;
term = product , [ ( "+" | "-" ) , product ] ;
product = unary , [ ( "*" | "/" | "%") , unary ] ;
unary = [ "!" | "-" ] , composite ;
composite = over | nullLiteral | suffixed | atom ;
suffixed = interval | cast | as | if | functionCall ;
interval = timeInterval | rowInterval ;
timeInterval = composite , "." , ("year" | "years" | "quarter" | "quarters" | "month" | "months" | "week" | "weeks" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;
rowInterval = composite , "." , "rows" ;
cast = composite , ".cast(" , dataType , ")" ;
dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "MAP" , "(" , dataType , "," , dataType , ")" ) | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;
as = composite , ".as(" , fieldReference , ")" ;
if = composite , ".?(" , expression , "," , expression , ")" ;
functionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;
atom = ( "(" , expression , ")" ) | literal | fieldReference ;
fieldReference = "*" | identifier ;
nullLiteral = "Null(" , dataType , ")" ;
timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "QUARTER" | "WEEK" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;
timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;
over = composite , "over" , fieldReference ;
overConstant = "current_row" | "current_range" | "unbounded_row" | "unbounded_row" ;
timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;
这里literal
是一个有效的Java文字,fieldReference
指定数据中的列(或使用的所有列*
),并functionIdentifier
指定支持的标量函数。列名和函数名遵循Java标识符语法。指定为字符串的表达式也可以使用前缀表示法而不是后缀表示法来调用 算子和函数。
如果需要使用精确数值或大小数, Table API也支持JavaBigDecimal类型。在Scala Table API中,小数可以BigDecimal("123456")
通过附加“p” 来定义,也可以在Java中定义,例如123456p
。
为了使用时态值, Table API支持Java SQL的Date,Time和Timestamp类型。在Scala的 Table API文本可以通过使用来定义java.sql.Date.valueOf("2016-06-27")
,java.sql.Time.valueOf("10:10:42")
或java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")
。在Java和Scala Table API还支持调用"2016-06-27".toDate()
,"10:10:42".toTime()
以及"2016-06-27 10:10:42.123".toTimestamp()
对String转化成时间类型。注意:由于Java临时SQL类型与时区有关,因此请确保Flink Client和所有TaskManagers使用相同的时区。
时间间隔可以表示为月数(Types.INTERVAL_MONTHS
)或毫秒数(Types.INTERVAL_MILLIS
)。可以添加或减去相同类型的间隔(例如1.hour + 10.minutes
)。可以将毫秒的间隔添加到时间点(例如"2016-08-10".toDate + 5.days
)。