Table API

The Table API is a unified, relational API for stream and batch processing. Table API queries can be run on batch or streaming input without modifications. The Table API is a superset of the SQL language and is specially designed for working with Apache Flink. The Table API is a language-integrated API for Scala and Java. Instead of specifying queries as String values as is common with SQL, Table API queries are defined in a language-embedded style in Java or Scala with IDE support like autocompletion and syntax validation.

The Table API shares many concepts and parts of its API with Flink's SQL integration. Check out Common Concepts & API to learn how to register tables or to create a Table object. The Streaming Concepts page discusses streaming-specific concepts such as dynamic tables and time attributes.

The following examples assume a registered table called Orders with attributes (a, b, c, rowtime). The rowtime field is either a logical time attribute in streaming or a regular timestamp field in batch.
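For example, such a table could be registered from a DataStream roughly as follows. This is a minimal sketch using the pre-1.9 StreamTableEnvironment API; createOrderStream is a hypothetical source helper and not part of the original examples.

  import org.apache.flink.api.java.tuple.Tuple3;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.TableEnvironment;
  import org.apache.flink.table.api.java.StreamTableEnvironment;

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

  // hypothetical source of (a, b, c) records with timestamps and watermarks assigned
  DataStream<Tuple3<String, Long, String>> orderStream = createOrderStream(env);

  // "rowtime.rowtime" appends an event-time attribute to the schema
  tEnv.registerDataStream("Orders", orderStream, "a, b, c, rowtime.rowtime");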

Overview & Examples

The Table API is available for Scala and Java. The Scala Table API leverages Scala expressions; the Java Table API is based on strings which are parsed and converted into equivalent expressions.

The following example shows the differences between the Scala and Java Table API. The table program is executed in a batch environment. It scans the Orders table, groups by field a, and counts the resulting rows per group. The result of the table program is converted into a DataSet of type Row and printed.

The Java Table API is enabled by importing org.apache.flink.table.api.java.*. The following example shows how a Java Table API program is constructed and how expressions are specified as strings.

  // environment configuration
  ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

  // register Orders table in table environment
  // ...

  // specify table program
  Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)

  Table counts = orders
      .groupBy("a")
      .select("a, b.count as cnt");

  // conversion to DataSet
  DataSet<Row> result = tEnv.toDataSet(counts, Row.class);
  result.print();

The Scala Table API is enabled by importing org.apache.flink.api.scala._ and org.apache.flink.table.api.scala._.

The following example shows how a Scala Table API program is constructed. Table attributes are referenced using Scala Symbols, which start with an apostrophe character (').

  import org.apache.flink.api.scala._
  import org.apache.flink.table.api.scala._

  // environment configuration
  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)

  // register Orders table in table environment
  // ...

  // specify table program
  val orders = tEnv.scan("Orders") // schema (a, b, c, rowtime)

  val result = orders
      .groupBy('a)
      .select('a, 'b.count as 'cnt)
      .toDataSet[Row] // conversion to DataSet
      .print()

The next example shows a more complex Table API program. The program scans the Orders table again. It filters null values, normalizes the field a of type String, and calculates for each hour and product a the average billing amount b.

Java:

  // environment configuration
  // ...

  // specify table program
  Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)

  Table result = orders
      .filter("a.isNotNull && b.isNotNull && c.isNotNull")
      .select("a.lowerCase(), b, rowtime")
      .window(Tumble.over("1.hour").on("rowtime").as("hourlyWindow"))
      .groupBy("hourlyWindow, a")
      .select("a, hourlyWindow.end as hour, b.avg as avgBillingAmount");

Scala:

  // environment configuration
  // ...

  // specify table program
  val orders: Table = tEnv.scan("Orders") // schema (a, b, c, rowtime)

  val result: Table = orders
      .filter('a.isNotNull && 'b.isNotNull && 'c.isNotNull)
      .select('a.lowerCase(), 'b, 'rowtime)
      .window(Tumble over 1.hour on 'rowtime as 'hourlyWindow)
      .groupBy('hourlyWindow, 'a)
      .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount)

Since the Table API is a unified API for batch and streaming data, both example programs can be executed on batch and streaming inputs without any modification of the table program itself. In both cases, the program produces the same results, given that streaming records are not late (see Streaming Concepts for details).
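To make the unification concrete, here is a hedged sketch of the first example executed on a streaming input (again using the pre-1.9 API). Because the grouped aggregation continuously updates its result, it is converted to a retract stream rather than a DataSet:

  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.table.api.Table;
  import org.apache.flink.table.api.TableEnvironment;
  import org.apache.flink.table.api.java.StreamTableEnvironment;
  import org.apache.flink.types.Row;

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

  // register streaming Orders table
  // ...

  // the table program itself is unchanged
  Table counts = tEnv.scan("Orders")
      .groupBy("a")
      .select("a, b.count as cnt");

  // each emitted Tuple2 carries an add/retract flag and the changed row
  DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(counts, Row.class);
  result.print();
  env.execute();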

Operations

The Table API supports the following operations. Please note that not all operations are available in both batch and streaming; they are tagged accordingly.

Scan, Projection, and Filter

Operators (Java):

Scan (Batch, Streaming)
Similar to the FROM clause in a SQL query. Performs a scan of a registered table.

  Table orders = tableEnv.scan("Orders");

Select (Batch, Streaming)
Similar to a SQL SELECT statement. Performs a select operation.

  Table orders = tableEnv.scan("Orders");
  Table result = orders.select("a, c as d");

You can use star (*) to act as a wild card, selecting all of the columns in the table.

  Table result = orders.select("*");

As (Batch, Streaming)
Renames fields.

  Table orders = tableEnv.scan("Orders");
  Table result = orders.as("x, y, z, t");

Where / Filter (Batch, Streaming)
Similar to a SQL WHERE clause. Filters out rows that do not pass the filter predicate.

  Table orders = tableEnv.scan("Orders");
  Table result = orders.where("b === 'red'");

or

  Table orders = tableEnv.scan("Orders");
  Table result = orders.filter("a % 2 === 0");
Operators (Scala):

Scan (Batch, Streaming)
Similar to the FROM clause in a SQL query. Performs a scan of a registered table.

  val orders: Table = tableEnv.scan("Orders")

Select (Batch, Streaming)
Similar to a SQL SELECT statement. Performs a select operation.

  val orders: Table = tableEnv.scan("Orders")
  val result = orders.select('a, 'c as 'd)

You can use star ('*) to act as a wild card, selecting all of the columns in the table.

  val result = orders.select('*)

As (Batch, Streaming)
Renames fields.

  val orders: Table = tableEnv.scan("Orders").as('x, 'y, 'z, 't)

Where / Filter (Batch, Streaming)
Similar to a SQL WHERE clause. Filters out rows that do not pass the filter predicate.

  val orders: Table = tableEnv.scan("Orders")
  val result = orders.filter('a % 2 === 0)

or

  val orders: Table = tableEnv.scan("Orders")
  val result = orders.where('b === "red")

Aggregations

Operators (Java):

GroupBy Aggregation (Batch, Streaming, Result Updating)
Similar to a SQL GROUP BY clause. Groups the rows on the grouping keys with a following running aggregation operator to aggregate rows group-wise.

  Table orders = tableEnv.scan("Orders");
  Table result = orders.groupBy("a").select("a, b.sum as d");

Note: For streaming queries, the state required to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with a valid retention interval to prevent excessive state size (a sketch follows these tables). See Streaming Concepts for details.

GroupBy Window Aggregation (Batch, Streaming)
Groups and aggregates a table on a group window and possibly one or more grouping keys.

  Table orders = tableEnv.scan("Orders");
  Table result = orders
      .window(Tumble.over("5.minutes").on("rowtime").as("w")) // define window
      .groupBy("a, w")                                        // group by key and window
      .select("a, w.start, w.end, w.rowtime, b.sum as d");    // access window properties and aggregate

Over Window Aggregation (Streaming)
Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the over windows section for more details.

  Table orders = tableEnv.scan("Orders");
  Table result = orders
      // define window
      .window(Over
          .partitionBy("a")
          .orderBy("rowtime")
          .preceding("UNBOUNDED_RANGE")
          .following("CURRENT_RANGE")
          .as("w"))
      .select("a, b.avg over w, b.max over w, b.min over w"); // sliding aggregate

Note: All aggregates must be defined over the same window, i.e., the same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute.

Distinct (Batch, Streaming, Result Updating)
Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.

  Table orders = tableEnv.scan("Orders");
  Table result = orders.distinct();

Note: For streaming queries, the state required to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details.
Operators (Scala):

GroupBy Aggregation (Batch, Streaming, Result Updating)
Similar to a SQL GROUP BY clause. Groups the rows on the grouping keys with a following running aggregation operator to aggregate rows group-wise.

  val orders: Table = tableEnv.scan("Orders")
  val result = orders.groupBy('a).select('a, 'b.sum as 'd)

Note: For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details.

GroupBy Window Aggregation (Batch, Streaming)
Groups and aggregates a table on a group window and possibly one or more grouping keys.

  val orders: Table = tableEnv.scan("Orders")
  val result: Table = orders
      .window(Tumble over 5.minutes on 'rowtime as 'w)           // define window
      .groupBy('a, 'w)                                           // group by key and window
      .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.sum as 'd)    // access window properties and aggregate

Over Window Aggregation (Streaming)
Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the over windows section for more details.

  val orders: Table = tableEnv.scan("Orders")
  val result: Table = orders
      // define window
      .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
      .select('a, 'b.avg over 'w, 'b.max over 'w, 'b.min over 'w) // sliding aggregate

Note: All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute.

Distinct (Batch, Streaming, Result Updating)
Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.

  val orders: Table = tableEnv.scan("Orders")
  val result = orders.distinct()

Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details.
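The retention interval mentioned in the notes above is set through a query configuration when the result is emitted or converted. A minimal Java sketch, assuming the pre-1.9 StreamQueryConfig API and a Table named result from one of the queries above:

  import org.apache.flink.api.common.time.Time;
  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.table.api.StreamQueryConfig;
  import org.apache.flink.types.Row;

  StreamQueryConfig qConfig = tEnv.queryConfig();
  // state of a grouping key that has been idle for 12 hours may be cleaned up;
  // it is kept for at most 24 hours
  qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

  // apply the configuration when the updating result is emitted
  DataStream<Tuple2<Boolean, Row>> stream = tEnv.toRetractStream(result, Row.class, qConfig);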

Join

Operators (Java):

Inner Join (Batch, Streaming)
Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names, and at least one equality join predicate must be defined through the join operator or using a where or filter operator.

  Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  Table right = tableEnv.fromDataSet(ds2, "d, e, f");
  Table result = left.join(right).where("a = d").select("a, b, e");

Note: For streaming queries, the state required to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details.

Outer Join (Batch, Streaming, Result Updating)
Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names, and at least one equality join predicate must be defined.

  Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  Table right = tableEnv.fromDataSet(ds2, "d, e, f");
  Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
  Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
  Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");

Note: For streaming queries, the state required to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details.

Time-windowed Join (Batch, Streaming)
Note: Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion. A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables. For example, the following predicates are valid window join conditions:
- ltime === rtime
- ltime >= rtime && ltime < rtime + 10.minutes

  Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
  Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");
  Table result = left.join(right)
      .where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
      .select("a, b, e, ltime");

TableFunction Inner Join (Batch, Streaming)
Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. A row of the left (outer) table is dropped if its table function call returns an empty result. (A sketch of such a function follows these tables.)

  // register function
  TableFunction<String> split = new MySplitUDTF();
  tEnv.registerFunction("split", split);

  // join
  Table orders = tableEnv.scan("Orders");
  Table result = orders
      .join(new Table(tEnv, "split(c)").as("s", "t", "v"))
      .select("a, b, s, t, v");

TableFunction Left Outer Join (Batch, Streaming)
Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result is padded with null values. Note: Currently, the predicate of a table function left outer join can only be empty or literal true.

  // register function
  TableFunction<String> split = new MySplitUDTF();
  tEnv.registerFunction("split", split);

  // join
  Table orders = tableEnv.scan("Orders");
  Table result = orders
      .leftOuterJoin(new Table(tEnv, "split(c)").as("s", "t", "v"))
      .select("a, b, s, t, v");
Operators (Scala):

Inner Join (Batch, Streaming)
Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names, and at least one equality join predicate must be defined through the join operator or using a where or filter operator.

  val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  val right = ds2.toTable(tableEnv, 'd, 'e, 'f)
  val result = left.join(right).where('a === 'd).select('a, 'b, 'e)

Note: For streaming queries, the state required to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details.

Outer Join (Batch, Streaming, Result Updating)
Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names, and at least one equality join predicate must be defined.

  val left = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
  val right = tableEnv.fromDataSet(ds2, 'd, 'e, 'f)
  val leftOuterResult = left.leftOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
  val rightOuterResult = left.rightOuterJoin(right, 'a === 'd).select('a, 'b, 'e)
  val fullOuterResult = left.fullOuterJoin(right, 'a === 'd).select('a, 'b, 'e)

Note: For streaming queries, the state required to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details.

Time-windowed Join (Batch, Streaming)
Note: Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion. A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >) or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables. For example, the following predicates are valid window join conditions:
- 'ltime === 'rtime
- 'ltime >= 'rtime && 'ltime < 'rtime + 10.minutes

  val left = ds1.toTable(tableEnv, 'a, 'b, 'c, 'ltime.rowtime)
  val right = ds2.toTable(tableEnv, 'd, 'e, 'f, 'rtime.rowtime)
  val result = left.join(right)
      .where('a === 'd && 'ltime >= 'rtime - 5.minutes && 'ltime < 'rtime + 10.minutes)
      .select('a, 'b, 'e, 'ltime)

TableFunction Inner Join (Batch, Streaming)
Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. A row of the left (outer) table is dropped if its table function call returns an empty result.

  // instantiate function
  val split: TableFunction[_] = new MySplitUDTF()

  // join
  val result: Table = table
      .join(split('c) as ('s, 't, 'v))
      .select('a, 'b, 's, 't, 'v)

TableFunction Left Outer Join (Batch, Streaming)
Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result is padded with null values. Note: Currently, the predicate of a table function left outer join can only be empty or literal true.

  // instantiate function
  val split: TableFunction[_] = new MySplitUDTF()

  // join
  val result: Table = table
      .leftOuterJoin(split('c) as ('s, 't, 'v))
      .select('a, 'b, 's, 't, 'v)
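The MySplitUDTF referenced above is not shown in the original examples. As a hedged illustration (the Java snippets declare TableFunction<String>, i.e. a single output column, while the three aliases s, t, v suggest three columns), a hypothetical three-column variant could look like this:

  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.table.functions.TableFunction;
  import org.apache.flink.types.Row;

  // Hypothetical table function that splits a string like "x#y#z"
  // into a row with three string fields.
  public class MySplitUDTF extends TableFunction<Row> {

      public void eval(String str) {
          String[] parts = str.split("#");
          if (parts.length == 3) {
              collect(Row.of(parts[0], parts[1], parts[2]));
          }
          // emitting nothing here drops the row in an inner join
          // and pads it with nulls in a left outer join
      }

      @Override
      public TypeInformation<Row> getResultType() {
          return Types.ROW(Types.STRING, Types.STRING, Types.STRING);
      }
  }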

Set Operations

Operators (Java):

Union (Batch)
Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical field types.

  Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  Table right = tableEnv.fromDataSet(ds2, "a, b, c");
  Table result = left.union(right);

UnionAll (Batch, Streaming)
Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical field types.

  Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  Table right = tableEnv.fromDataSet(ds2, "a, b, c");
  Table result = left.unionAll(right);

Intersect (Batch)
Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.

  Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  Table right = tableEnv.fromDataSet(ds2, "d, e, f");
  Table result = left.intersect(right);

IntersectAll (Batch)
Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.

  Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  Table right = tableEnv.fromDataSet(ds2, "d, e, f");
  Table result = left.intersectAll(right);

Minus (Batch)
Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.

  Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  Table right = tableEnv.fromDataSet(ds2, "a, b, c");
  Table result = left.minus(right);

MinusAll (Batch)
Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.

  Table left = tableEnv.fromDataSet(ds1, "a, b, c");
  Table right = tableEnv.fromDataSet(ds2, "a, b, c");
  Table result = left.minusAll(right);

In (Batch, Streaming)
Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.

  Table left = ds1.toTable(tableEnv, "a, b, c");
  Table right = ds2.toTable(tableEnv, "a");

  // using implicit registration
  Table result = left.select("a, b, c").where("a.in(" + right + ")");

  // using explicit registration
  tableEnv.registerTable("RightTable", right);
  Table result = left.select("a, b, c").where("a.in(RightTable)");

Note: For streaming queries, the operation is rewritten into a join and group operation. The state required to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details.
Operators (Scala):

Union (Batch)
Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical field types.

  val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
  val result = left.union(right)

UnionAll (Batch, Streaming)
Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical field types.

  val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
  val result = left.unionAll(right)

Intersect (Batch)
Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.

  val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
  val result = left.intersect(right)

IntersectAll (Batch)
Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.

  val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  val right = ds2.toTable(tableEnv, 'e, 'f, 'g)
  val result = left.intersectAll(right)

Minus (Batch)
Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.

  val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
  val result = left.minus(right)

MinusAll (Batch)
Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.

  val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  val right = ds2.toTable(tableEnv, 'a, 'b, 'c)
  val result = left.minusAll(right)

In (Batch, Streaming)
Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.

  val left = ds1.toTable(tableEnv, 'a, 'b, 'c)
  val right = ds2.toTable(tableEnv, 'a)
  val result = left.select('a, 'b, 'c).where('a.in(right))

Note: For streaming queries, the operation is rewritten into a join and group operation. The state required to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details.

OrderBy, Offset & Fetch

Operators (Java):

Order By (Batch)
Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.

  Table in = tableEnv.fromDataSet(ds, "a, b, c");
  Table result = in.orderBy("a.asc");

Offset & Fetch (Batch)
Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.

  Table in = tableEnv.fromDataSet(ds, "a, b, c");

  // returns the first 5 records from the sorted result
  Table result1 = in.orderBy("a.asc").fetch(5);

  // skips the first 3 records and returns all following records from the sorted result
  Table result2 = in.orderBy("a.asc").offset(3);

  // skips the first 10 records and returns the next 5 records from the sorted result
  Table result3 = in.orderBy("a.asc").offset(10).fetch(5);
Operators (Scala):

Order By (Batch)
Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.

  val in = ds.toTable(tableEnv, 'a, 'b, 'c)
  val result = in.orderBy('a.asc)

Offset & Fetch (Batch)
Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.

  val in = ds.toTable(tableEnv, 'a, 'b, 'c)

  // returns the first 5 records from the sorted result
  val result1: Table = in.orderBy('a.asc).fetch(5)

  // skips the first 3 records and returns all following records from the sorted result
  val result2: Table = in.orderBy('a.asc).offset(3)

  // skips the first 10 records and returns the next 5 records from the sorted result
  val result3: Table = in.orderBy('a.asc).offset(10).fetch(5)

Insert

Operators (Java):

Insert Into (Batch, Streaming)
Similar to the INSERT INTO clause in a SQL query. Performs an insertion into a registered output table. Output tables must be registered in the TableEnvironment (see Register a TableSink). Moreover, the schema of the registered table must match the schema of the query.

  Table orders = tableEnv.scan("Orders");
  orders.insertInto("OutOrders");
Operators (Scala):

Insert Into (Batch, Streaming)
Similar to the INSERT INTO clause in a SQL query. Performs an insertion into a registered output table (a registration sketch follows below). Output tables must be registered in the TableEnvironment (see Register a TableSink). Moreover, the schema of the registered table must match the schema of the query.

  val orders: Table = tableEnv.scan("Orders")
  orders.insertInto("OutOrders")
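How the output table gets registered is described in Register a TableSink; as a minimal hedged sketch, using the built-in CsvTableSink with hypothetical path, delimiter, and field types:

  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.table.sinks.CsvTableSink;

  String[] fieldNames = {"a", "b", "c", "rowtime"};
  TypeInformation<?>[] fieldTypes = {Types.STRING, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP};

  // the registered schema must match the schema of the inserted query
  tEnv.registerTableSink(
      "OutOrders",
      fieldNames,
      fieldTypes,
      new CsvTableSink("/path/to/out_orders.csv", "|"));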

Group Windows

Group windows aggregate groups of rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals.

Windows are defined using the window(w: Window) clause and require an alias, which is specified using the as clause. In order to group a table by a window, the window alias must be referenced in the groupBy(...) clause like a regular grouping attribute. The following example shows how to define a window aggregation on a table.

Java:

  Table table = input
      .window([Window w].as("w")) // define window with alias w
      .groupBy("w")               // group the table by window w
      .select("b.sum");           // aggregate

Scala:

  val table = input
      .window([w: Window] as 'w) // define window with alias w
      .groupBy('w)               // group the table by window w
      .select('b.sum)            // aggregate

In streaming environments, window aggregates can only be computed in parallel if they group on one or more attributes in addition to the window, i.e., the groupBy(...) clause references a window alias and at least one additional attribute. A groupBy(...) clause that only references a window alias (such as in the example above) can only be evaluated by a single, non-parallel task. The following example shows how to define a window aggregation with additional grouping attributes.

Java:

  Table table = input
      .window([Window w].as("w")) // define window with alias w
      .groupBy("w, a")            // group the table by attribute a and window w
      .select("a, b.sum");        // aggregate

Scala:

  val table = input
      .window([w: Window] as 'w) // define window with alias w
      .groupBy('w, 'a)           // group the table by attribute a and window w
      .select('a, 'b.sum)        // aggregate

Window properties such as the start, end, or rowtime timestamp of a time window can be added in the select statement as properties of the window alias: w.start, w.end, and w.rowtime, respectively. The window start and rowtime timestamps are the inclusive lower and upper window boundaries. In contrast, the window end timestamp is the exclusive upper window boundary. For example, a tumbling window of 30 minutes that starts at 2pm would have 14:00:00.000 as start timestamp, 14:29:59.999 as rowtime timestamp, and 14:30:00.000 as end timestamp.

Java:

  Table table = input
      .window([Window w].as("w")) // define window with alias w
      .groupBy("w, a")            // group the table by attribute a and window w
      .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps

Scala:

  val table = input
      .window([w: Window] as 'w) // define window with alias w
      .groupBy('w, 'a)           // group the table by attribute a and window w
      .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps

The Window parameter defines how rows are mapped to windows. Window is not an interface that users can implement. Instead, the Table API provides a set of predefined Window classes with specific semantics, which are translated into underlying DataStream or DataSet operations. The supported window definitions are listed below.

Tumble (Tumbling Windows)

A tumbling window assigns rows to non-overlapping, continuous windows of fixed length. For example, a tumbling window of 5 minutes groups rows in 5-minute intervals. Tumbling windows can be defined on event-time, processing-time, or on a row-count.

Tumbling windows are defined by using the Tumble class as follows:

over: Defines the length of the window, either as a time or a row-count interval.
on: The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a declared event-time or processing-time attribute.
as: Assigns an alias to the window. The alias is used to reference the window in the following groupBy() clause and optionally to select window properties such as window start, end, or rowtime timestamps in the select() clause.
Java:

  // Tumbling Event-time Window
  .window(Tumble.over("10.minutes").on("rowtime").as("w"));

  // Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
  .window(Tumble.over("10.minutes").on("proctime").as("w"));

  // Tumbling Row-count Window (assuming a processing-time attribute "proctime")
  .window(Tumble.over("10.rows").on("proctime").as("w"));

Scala:

  // Tumbling Event-time Window
  .window(Tumble over 10.minutes on 'rowtime as 'w)

  // Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
  .window(Tumble over 10.minutes on 'proctime as 'w)

  // Tumbling Row-count Window (assuming a processing-time attribute "proctime")
  .window(Tumble over 10.rows on 'proctime as 'w)

Slide (Sliding Windows)

A sliding window has a fixed size and slides by a specified slide interval. If the slide interval is smaller than the window size, sliding windows overlap, so rows can be assigned to multiple windows. For example, a sliding window of 15 minutes size and 5 minute slide interval assigns each row to 3 different windows of 15 minute size, which are evaluated in an interval of 5 minutes. Sliding windows can be defined on event-time, processing-time, or on a row-count.

Sliding windows are defined by using the Slide class as follows:

over: Defines the length of the window, either as a time or a row-count interval.
every: Defines the slide interval, either as a time or a row-count interval. The slide interval must be of the same type as the size interval.
on: The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a declared event-time or processing-time attribute.
as: Assigns an alias to the window. The alias is used to reference the window in the following groupBy() clause and optionally to select window properties such as window start, end, or rowtime timestamps in the select() clause.
Java:

  // Sliding Event-time Window
  .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));

  // Sliding Processing-time window (assuming a processing-time attribute "proctime")
  .window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));

  // Sliding Row-count window (assuming a processing-time attribute "proctime")
  .window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));

Scala:

  // Sliding Event-time Window
  .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)

  // Sliding Processing-time window (assuming a processing-time attribute "proctime")
  .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)

  // Sliding Row-count window (assuming a processing-time attribute "proctime")
  .window(Slide over 10.rows every 5.rows on 'proctime as 'w)

Session (Session Windows)

Session windows do not have a fixed size; instead, their bounds are defined by an interval of inactivity, i.e., a session window closes if no event appears for a defined gap period. For example, a session window with a 30 minute gap starts when a row is observed after 30 minutes of inactivity (otherwise the row would be added to an existing window) and is closed if no row is added within 30 minutes. Session windows can work on event-time or processing-time.

A session window is defined by using the Session class as follows:

withGap: Defines the gap between two windows as a time interval.
on: The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a declared event-time or processing-time attribute.
as: Assigns an alias to the window. The alias is used to reference the window in the following groupBy() clause and optionally to select window properties such as window start, end, or rowtime timestamps in the select() clause.
Java:

  // Session Event-time Window
  .window(Session.withGap("10.minutes").on("rowtime").as("w"));

  // Session Processing-time Window (assuming a processing-time attribute "proctime")
  .window(Session.withGap("10.minutes").on("proctime").as("w"));

Scala:

  // Session Event-time Window
  .window(Session withGap 10.minutes on 'rowtime as 'w)

  // Session Processing-time Window (assuming a processing-time attribute "proctime")
  .window(Session withGap 10.minutes on 'proctime as 'w)

Over Windows

Over window aggregates are known from standard SQL (the OVER clause) and are defined in the SELECT clause of a query. Unlike group windows, which are specified in the GROUP BY clause, over windows do not collapse rows. Instead, over window aggregates compute an aggregate for each input row over a range of its neighboring rows.

Over windows are defined using the window(w: OverWindow*) clause and referenced via an alias in the select() method. The following example shows how to define an over window aggregation on a table.

Java:

  Table table = input
      .window([OverWindow w].as("w"))           // define over window with alias w
      .select("a, b.sum over w, c.min over w"); // aggregate over the over window w

Scala:

  val table = input
      .window([w: OverWindow] as 'w)             // define over window with alias w
      .select('a, 'b.sum over 'w, 'c.min over 'w) // aggregate over the over window w

The OverWindow defines a range of rows over which aggregates are computed. OverWindow is not an interface that users can implement. Instead, the Table API provides the Over class to configure the properties of the over window. Over windows can be defined on event-time or processing-time and on ranges specified as time interval or row-count. The supported over window definitions are exposed as methods on Over (and other classes) and are listed below:

partitionBy (optional): Defines a partitioning of the input on one or more attributes. Each partition is individually sorted and aggregate functions are applied to each partition separately. Note: In streaming environments, over window aggregates can only be computed in parallel if the window includes a partitionBy clause. Without partitionBy(...), the stream is processed by a single, non-parallel task.

orderBy (required): Defines the order of rows within each partition and thereby the order in which the aggregate functions are applied to rows. Note: For streaming queries this must be a declared event-time or processing-time attribute. Currently, only a single sort attribute is supported.

preceding (required): Defines the interval of rows that are included in the window and precede the current row. The interval can be specified either as a time or a row-count interval. Bounded over windows are specified with the size of the interval, e.g., 10.minutes for a time interval or 10.rows for a row-count interval. Unbounded over windows are specified with a constant, i.e., UNBOUNDED_RANGE for a time interval or UNBOUNDED_ROW for a row-count interval. Unbounded over windows start with the first row of a partition.

following (optional): Defines the window interval of rows that are included in the window and follow the current row. The interval must be specified in the same unit as the preceding interval (time or row-count). At the moment, over windows with rows following the current row are not supported. Instead you can specify one of two constants: CURRENT_ROW sets the upper bound of the window to the current row; CURRENT_RANGE sets the upper bound of the window to the sort key of the current row, i.e., all rows with the same sort key as the current row are included in the window. If the following clause is omitted, the upper bound of a time interval window is defined as CURRENT_RANGE and the upper bound of a row-count interval window is defined as CURRENT_ROW (an explicit example follows the bounded window examples below).

as (required): Assigns an alias to the over window. The alias is used to reference the over window in the following select() clause.

Note: Currently, all aggregation functions in the same select() call must be computed over the same over window.

Unbounded Over Windows

Java:

  // Unbounded Event-time over window (assuming an event-time attribute "rowtime")
  .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"));

  // Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
  .window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_range").as("w"));

  // Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
  .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"));

  // Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
  .window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_row").as("w"));

Scala:

  // Unbounded Event-time over window (assuming an event-time attribute "rowtime")
  .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)

  // Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
  .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)

  // Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
  .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)

  // Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
  .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)

Bounded Over Windows

Java:

  // Bounded Event-time over window (assuming an event-time attribute "rowtime")
  .window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))

  // Bounded Processing-time over window (assuming a processing-time attribute "proctime")
  .window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))

  // Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
  .window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))

  // Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
  .window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))

Scala:

  // Bounded Event-time over window (assuming an event-time attribute "rowtime")
  .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)

  // Bounded Processing-time over window (assuming a processing-time attribute "proctime")
  .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)

  // Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
  .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)

  // Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
  .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)
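The upper bound can also be written out explicitly. The following hedged Java snippets should be equivalent to the bounded windows above, since CURRENT_RANGE and CURRENT_ROW are the documented defaults for time and row-count intervals, respectively:

  // Bounded Event-time over window with an explicit upper bound
  .window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").following("CURRENT_RANGE").as("w"))

  // Bounded Row-count over window with an explicit upper bound
  .window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").following("CURRENT_ROW").as("w"))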

Data Types

The Table API is built on top of Flink's DataSet and DataStream APIs. Internally, it also uses Flink's TypeInformation to define data types. Fully supported types are listed in org.apache.flink.table.api.Types. The following table summarizes the relation between Table API types, SQL types, and the resulting Java class.

Table API              | SQL                       | Java type
Types.STRING           | VARCHAR                   | java.lang.String
Types.BOOLEAN          | BOOLEAN                   | java.lang.Boolean
Types.BYTE             | TINYINT                   | java.lang.Byte
Types.SHORT            | SMALLINT                  | java.lang.Short
Types.INT              | INTEGER, INT              | java.lang.Integer
Types.LONG             | BIGINT                    | java.lang.Long
Types.FLOAT            | REAL, FLOAT               | java.lang.Float
Types.DOUBLE           | DOUBLE                    | java.lang.Double
Types.DECIMAL          | DECIMAL                   | java.math.BigDecimal
Types.SQL_DATE         | DATE                      | java.sql.Date
Types.SQL_TIME         | TIME                      | java.sql.Time
Types.SQL_TIMESTAMP    | TIMESTAMP(3)              | java.sql.Timestamp
Types.INTERVAL_MONTHS  | INTERVAL YEAR TO MONTH    | java.lang.Integer
Types.INTERVAL_MILLIS  | INTERVAL DAY TO SECOND(3) | java.lang.Long
Types.PRIMITIVE_ARRAY  | ARRAY                     | e.g. int[]
Types.OBJECT_ARRAY     | ARRAY                     | e.g. java.lang.Byte[]
Types.MAP              | MAP                       | java.util.HashMap
Types.MULTISET         | MULTISET                  | e.g. java.util.HashMap<String, Integer> for a multiset of String
Types.ROW              | ROW                       | org.apache.flink.types.Row

Generic types and (nested) composite types (e.g., POJOs, tuples, rows, Scala case classes) can be fields of a row as well.

Fields of composite types with arbitrary nesting can be accessed with value access functions (see the example below).

Generic types are treated as a black box and can be passed on or processed by user-defined functions.
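As an illustration of the value access functions, the following hedged Java snippet assumes a table with a POJO column user (with a nested field name) and a Tuple2 column pair; these field names are hypothetical:

  // access nested fields with .get() and expand a composite field into
  // one column per sub-field with .flatten()
  Table result = table.select("user.get('name'), pair.get(0), user.flatten()");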

Expression Syntax

Some of the operators in the previous sections expect one or more expressions. Expressions can be specified using an embedded Scala DSL or as strings. Please refer to the examples above to learn how expressions can be specified.

This is the EBNF grammar for expressions:

  expressionList = expression , { "," , expression } ;
  expression = timeIndicator | overConstant | alias ;
  alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ;
  logic = comparison , [ ( "&&" | "||" ) , comparison ] ;
  comparison = term , [ ( "=" | "==" | "===" | "!=" | "!==" | ">" | ">=" | "<" | "<=" ) , term ] ;
  term = product , [ ( "+" | "-" ) , product ] ;
  product = unary , [ ( "*" | "/" | "%") , unary ] ;
  unary = [ "!" | "-" ] , composite ;
  composite = over | nullLiteral | suffixed | atom ;
  suffixed = interval | cast | as | if | functionCall ;
  interval = timeInterval | rowInterval ;
  timeInterval = composite , "." , ("year" | "years" | "quarter" | "quarters" | "month" | "months" | "week" | "weeks" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;
  rowInterval = composite , "." , "rows" ;
  cast = composite , ".cast(" , dataType , ")" ;
  dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "MAP" , "(" , dataType , "," , dataType , ")" ) | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;
  as = composite , ".as(" , fieldReference , ")" ;
  if = composite , ".?(" , expression , "," , expression , ")" ;
  functionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;
  atom = ( "(" , expression , ")" ) | literal | fieldReference ;
  fieldReference = "*" | identifier ;
  nullLiteral = "Null(" , dataType , ")" ;
  timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "QUARTER" | "WEEK" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;
  timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;
  over = composite , "over" , fieldReference ;
  overConstant = "current_row" | "current_range" | "unbounded_row" | "unbounded_range" ;
  timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;

Here, literal is a valid Java literal, fieldReference specifies a column in the data (or all columns if * is used), and functionIdentifier specifies a supported scalar function. The column names and function names follow Java identifier syntax. Expressions specified as strings can also use prefix notation instead of suffix notation to call operators and functions.
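For example, the following two Java string expressions should be equivalent; the prefix form is a hedged illustration of that notation:

  // suffix notation
  Table r1 = orders.select("a.lowerCase() as x");

  // the same call in prefix notation
  Table r2 = orders.select("lowerCase(a) as x");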

If working with exact numeric values or large decimals is required, the Table API also supports Java's BigDecimal type. In the Scala Table API decimals can be defined by BigDecimal("123456"); in Java they can be defined by appending a "p" suffix, e.g., 123456p.

In order to work with temporal values, the Table API supports Java SQL's Date, Time, and Timestamp types. In the Scala Table API, literals can be defined by using java.sql.Date.valueOf("2016-06-27"), java.sql.Time.valueOf("10:10:42"), or java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123"). The Java and Scala Table API also support calling "2016-06-27".toDate(), "10:10:42".toTime(), and "2016-06-27 10:10:42.123".toTimestamp() to convert Strings into temporal types. Note: Since Java's temporal SQL types are time zone dependent, please make sure that the Flink Client and all TaskManagers use the same time zone.

Temporal intervals can be represented as a number of months (Types.INTERVAL_MONTHS) or a number of milliseconds (Types.INTERVAL_MILLIS). Intervals of the same type can be added or subtracted (e.g., 1.hour + 10.minutes). Intervals of milliseconds can be added to time points (e.g., "2016-08-10".toDate + 5.days).