Concepts & Common API

The Table API and SQL are integrated in a joint API. The central concept of this API is a Table which serves as input and output of queries. This document shows the common structure of programs with Table API and SQL queries, how to register a Table, how to query a Table, and how to emit a Table.

Structure of Table API and SQL Programs

All Table API and SQL programs for batch and streaming follow the same pattern. The following code examples show the common structure of Table API and SQL programs.

Java:

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register a Table
tableEnv.registerTable("table1", ...);           // or
tableEnv.registerTableSource("table2", ...);     // or
tableEnv.registerExternalCatalog("extCat", ...);

// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...");

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...);

// execute
env.execute();
Scala:

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// create a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register a Table
tableEnv.registerTable("table1", ...)           // or
tableEnv.registerTableSource("table2", ...)     // or
tableEnv.registerExternalCatalog("extCat", ...)

// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.writeToSink(...)

// execute
env.execute()

Note: Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. Have a look at the Integration with DataStream and DataSet API section to learn how DataStreams and DataSets can be converted into Tables and vice versa.

Create a TableEnvironment

The TableEnvironment is a central concept of the Table API and SQL integration. It is responsible for:

  • registering a Table in the internal catalog,
  • registering an external catalog,
  • executing SQL queries,
  • registering a user-defined (scalar, table, or aggregation) function,
  • converting a DataStream or DataSet into a Table,
  • holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment.

A Table is always bound to a specific TableEnvironment. It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them.

A TableEnvironment is created by calling the static TableEnvironment.getTableEnvironment() method with a StreamExecutionEnvironment or an ExecutionEnvironment and an optional TableConfig. The TableConfig can be used to configure the TableEnvironment or to customize the query optimization and translation process (see Query Optimization).

Java:

// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for streaming queries
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);

// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for batch queries
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
Scala:

// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

Register Tables in the Catalog

A TableEnvironment maintains a catalog of tables which are registered by name. There are two types of tables, input tables and output tables. Input tables can be referenced in Table API and SQL queries and provide input data. Output tables can be used to emit the result of a Table API or SQL query to an external system.

An input table can be registered from various sources:

  • an existing Table object, usually the result of a Table API or SQL query,
  • a TableSource, which accesses external data such as a file, a database, or a messaging system,
  • a DataStream or DataSet from a DataStream or DataSet program. Registering a DataStream or DataSet is discussed in the Integration with DataStream and DataSet API section.

An output table can be registered using a TableSink.

Register a Table

A Table is registered in a TableEnvironment as follows:

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table is the result of a simple projection query
Table projTable = tableEnv.scan("X").select(...);

// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable);
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table is the result of a simple projection query
val projTable: Table = tableEnv.scan("X").select(...)

// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable)

Note: A registered Table is treated similarly to a VIEW as known from relational database systems, i.e., the query that defines the Table is not optimized but will be inlined when another query references the registered Table. If multiple queries reference the same registered Table, it will be inlined for each referencing query and executed multiple times, i.e., the result of the registered Table will not be shared.
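For instance, in the minimal sketch below both queries inline and independently evaluate the query that defines projectedTable (the field names a and b are hypothetical):

Scala:

// each reference to "projectedTable" inlines its defining query,
// so the projection on "X" is executed once per referencing query
val q1: Table = tableEnv.sqlQuery("SELECT a FROM projectedTable")
val q2: Table = tableEnv.sqlQuery("SELECT b FROM projectedTable")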

Register a TableSource

A TableSource provides access to external data which is stored in a storage system such as a database (MySQL, HBase, ...), a file with a specific encoding (CSV, Apache Parquet, Avro, ORC, ...), or a messaging system (Apache Kafka, RabbitMQ, ...).

Flink aims to provide TableSources for common data formats and storage systems. Please have a look at the Table Sources and Sinks page for a list of supported TableSources and instructions for how to build a custom TableSource.

A TableSource is registered in a TableEnvironment as follows:

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// create a TableSource
TableSource csvSource = new CsvTableSource("/path/to/file", ...);

// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource);
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)

// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)

Register a TableSink

A registered TableSink can be used to emit the result of a Table API or SQL query to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Apache Parquet, Avro, ORC, ...).

Flink aims to provide TableSinks for common data formats and storage systems. Please see the documentation about Table Sources and Sinks for details about available sinks and instructions for how to implement a custom TableSink.

A TableSink is registered in a TableEnvironment as follows:

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// create a TableSink
TableSink csvSink = new CsvTableSink("/path/to/file", ...);

// define the field names and types
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};

// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)

// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

Register an External Catalog

An external catalog can provide information about external databases and tables such as their name, schema, statistics, and information for how to access data stored in an external database, table, or file.

An external catalog can be created by implementing the ExternalCatalog interface and is registered in a TableEnvironment as follows:

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// create an external catalog
ExternalCatalog catalog = new InMemoryExternalCatalog();

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// create an external catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

Once registered in a TableEnvironment, all tables defined in an ExternalCatalog can be accessed from Table API or SQL queries by specifying their full path, such as catalog.database.table.
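For example, the tables of the catalog registered above can be referenced as follows (a minimal sketch; the database name db1 and the table name persons are hypothetical):

Scala:

// reference a table of the registered catalog by its full path in a SQL query
val fromSql: Table = tableEnv.sqlQuery("SELECT * FROM InMemCatalog.db1.persons")

// scan the same table in the Table API by its path elements
val fromTableApi: Table = tableEnv.scan("InMemCatalog", "db1", "persons")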

Currently, Flink provides the InMemoryExternalCatalog for demo and testing purposes. However, the ExternalCatalog interface can also be used to connect catalogs like HCatalog or Metastore to the Table API.

Query a Table

Table API

The Table API is a language-integrated query API for Scala and Java. In contrast to SQL, queries are not specified as strings but are composed step-by-step in the host language.

The API is based on the Table class, which represents a table (streaming or batch) and offers methods to apply relational operations. These methods return a new Table object, which represents the result of applying the relational operation on the input Table. Some relational operations are composed of multiple method calls such as table.groupBy(...).select(), where groupBy(...) specifies a grouping of table and select(...) the projection on the grouping of table.

The Table API document describes all Table API operations that are supported on streaming and batch tables.

The following example shows a simple Table API aggregation query:

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register Orders table

// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table
// execute query
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register Orders table

// scan registered Orders table
val orders = tableEnv.scan("Orders")
// compute revenue for all customers from France
val revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum as 'revSum)

// emit or convert Table
// execute query

Note: The Scala Table API uses Scala Symbols, which start with a single tick (') to reference the attributes of a Table. The Table API uses Scala implicits. Make sure to import org.apache.flink.api.scala._ and org.apache.flink.table.api.scala._ in order to use Scala implicit conversions.

SQL

Flink's SQL integration is based on Apache Calcite, which implements the SQL standard. SQL queries are specified as regular Strings.

The SQL document describes Flink's SQL support for streaming and batch tables.

The following example shows how to specify a query and return the result as a Table:

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
  "SELECT cID, cName, SUM(revenue) AS revSum " +
  "FROM Orders " +
  "WHERE cCountry = 'FRANCE' " +
  "GROUP BY cID, cName"
);

// emit or convert Table
// execute query
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register Orders table

// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// emit or convert Table
// execute query

The following example shows how to specify an update query that inserts its result into a registered table:

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
  "INSERT INTO RevenueFrance " +
  "SELECT cID, cName, SUM(revenue) AS revSum " +
  "FROM Orders " +
  "WHERE cCountry = 'FRANCE' " +
  "GROUP BY cID, cName"
);

// execute query
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate("""
  |INSERT INTO RevenueFrance
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// execute query

Mixing Table API and SQL

Table API and SQL queries can be easily mixed because both return Table objects:

  • A Table API query can be defined on the Table object returned by a SQL query.
  • A SQL query can be defined on the result of a Table API query by registering the resulting Table in the TableEnvironment and referencing it in the FROM clause of the SQL query. Both directions are shown in the sketch after this list.
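A minimal sketch of both directions, assuming a registered table "Orders" with the hypothetical fields cID and amount:

Scala:

// SQL -> Table API: apply a Table API operator on the Table returned by a SQL query
val bigOrders: Table = tableEnv
  .sqlQuery("SELECT cID, amount FROM Orders")
  .filter('amount > 100)

// Table API -> SQL: register the Table API result and reference it in the FROM clause
tableEnv.registerTable("BigOrders", bigOrders)
val counts: Table = tableEnv.sqlQuery("SELECT cID, COUNT(*) AS cnt FROM BigOrders GROUP BY cID")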

Emit a Table

A Table is emitted by writing it to a TableSink. A TableSink is a generic interface to support a wide variety of file formats (e.g., CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ).

A batch Table can only be written to a BatchTableSink, while a streaming Table requires an AppendStreamTableSink, a RetractStreamTableSink, or an UpsertStreamTableSink.

Please see the documentation about Table Sources and Sinks for details about available sinks and instructions for how to implement a custom TableSink.

There are two ways to emit a table:

  • The Table.writeToSink(TableSink sink) method emits the table using the provided TableSink and automatically configures the sink with the schema of the table to emit.
  • The Table.insertInto(String sinkTable) method looks up a TableSink that was registered with a specific schema under the provided name in the TableEnvironment's catalog. The schema of the table to emit is validated against the schema of the registered TableSink.

The following examples show how to emit a Table:

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// compute a result Table using Table API operators and/or SQL queries
Table result = ...

// create a TableSink with "|" as field delimiter
TableSink sink = new CsvTableSink("/path/to/file", "|");

// METHOD 1:
// Emit the result Table to the TableSink via the writeToSink() method
result.writeToSink(sink);

// METHOD 2:
// Register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);

// Emit the result Table to the registered TableSink via the insertInto() method
result.insertInto("CsvSinkTable");

// execute the program
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...

// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// METHOD 1:
// Emit the result Table to the TableSink via the writeToSink() method
result.writeToSink(sink)

// METHOD 2:
// Register the TableSink with a specific schema
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)

// Emit the result Table to the registered TableSink via the insertInto() method
result.insertInto("CsvSinkTable")

// execute the program

Translate and Execute a Query

Table API and SQL queries are translated into DataStream or DataSet programs, depending on whether their input is a streaming or batch input. Internally, a query is represented as a logical query plan and is translated in two phases:

  • optimization of the logical plan,
  • translation into a DataStream or DataSet program.

A Table API or SQL query is translated when:

  • a Table is emitted to a TableSink, i.e., when Table.writeToSink() or Table.insertInto() is called,
  • a SQL update query is specified, i.e., when TableEnvironment.sqlUpdate() is called, or
  • a Table is converted into a DataStream or DataSet (see Integration with DataStream and DataSet API).

Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when StreamExecutionEnvironment.execute() or ExecutionEnvironment.execute() is called.

Integration with DataStream and DataSet API

Table API and SQL queries can be easily integrated with and embedded into DataStream and DataSet programs. For instance, it is possible to query an external table (for example from an RDBMS), do some pre-processing, such as filtering, projecting, aggregating, or joining with metadata, and then further process the data with the DataStream or DataSet API (and any of the libraries built on top of these APIs, such as CEP or Gelly). Conversely, a Table API or SQL query can also be applied on the result of a DataStream or DataSet program.

This interaction can be achieved by converting a DataStream or DataSet into a Table and vice versa. In this section, we describe how these conversions are done.

Implicit Conversion for Scala

The Scala Table API features implicit conversions for the DataSet, DataStream, and Table classes. These conversions are enabled by importing the package org.apache.flink.table.api.scala._ in addition to org.apache.flink.api.scala._ for the Scala DataStream API.
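A minimal sketch of the required imports and of the implicit toTable conversion they enable (the stream, the tableEnv, and the field names are illustrative):

Scala:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

val stream: DataStream[(Long, String)] = ...
// the implicit conversions make toTable available on the DataStream
val table: Table = stream.toTable(tableEnv, 'myLong, 'myString)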

Register a DataStream or DataSet as Table

A DataStream or DataSet can be registered in a TableEnvironment as a table. The schema of the resulting table depends on the data type of the registered DataStream or DataSet. Please check the section about mapping of data types to table schema for details.

Java:

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
Scala:

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

Note: The name of a DataStream Table must not match the ^_DataStreamTable_[0-9]+ pattern and the name of a DataSet Table must not match the ^_DataSetTable_[0-9]+ pattern. These patterns are reserved for internal use only.

Convert a DataStream or DataSet into a Table

Instead of registering a DataStream or DataSet in a TableEnvironment, it can also be directly converted into a Table. This is convenient if you want to use the Table in a Table API query.

Java:

// get StreamTableEnvironment
// conversion of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
Scala:

// get TableEnvironment
// conversion of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

Convert a Table into a DataStream or DataSet

A Table can be converted into a DataStream or DataSet. In this way, custom DataStream or DataSet programs can be run on the result of a Table API or SQL query.

When converting a Table into a DataStream or DataSet, you need to specify the data type of the resulting DataStream or DataSet, i.e., the data type into which the rows of the Table are to be converted. Often the most convenient conversion type is Row. The following list gives an overview of the features of the different options:

  • Row: fields are mapped by position, arbitrary number of fields, support for null values, no type-safe access.
  • POJO: fields are mapped by name (POJO fields must be named as Table fields), arbitrary number of fields, support for null values, type-safe access.
  • Case Class: fields are mapped by position, no support for null values, type-safe access.
  • Tuple: fields are mapped by position, limitation to 22 (Scala) or 25 (Java) fields, no support for null values, type-safe access.
  • Atomic Type: the Table must have a single field, no support for null values, type-safe access.

Convert a Table into a DataStream

A Table that is the result of a streaming query will be updated dynamically, i.e., it is changing as new records arrive on the query's input streams. Hence, the DataStream into which such a dynamic query is converted needs to encode the updates of the table.

There are two modes to convert a Table into a DataStream:

  • Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e., it is append-only and previously emitted results are never updated.
  • Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.
Java:

// get StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// convert the Table into an append DataStream of Tuple2<String, Integer>
// via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
  tableEnv.toAppendStream(table, tupleType);

// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
  tableEnv.toRetractStream(table, Row.class);
Scala:

// get TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] =
  tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

Note: A detailed discussion about dynamic tables and their properties is given in the Streaming Queries document.

Convert a Table into a DataSet

A Table is converted into a DataSet as follows:

Java:

// get BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple =
  tableEnv.toDataSet(table, tupleType);
Scala:

// get a BatchTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

Mapping of Data Types to Table Schema

Flink's DataStream and DataSet APIs support very diverse types. Composite types such as Tuples (built-in Scala and Flink Java tuples), POJOs, Scala case classes, and Flink's Row type allow for nested data structures with multiple fields that can be accessed in table expressions. Other types are treated as atomic types. In the following, we describe how the Table API converts these types into an internal row representation and show examples of converting a DataStream into a Table.

The mapping of a data type to a table schema can happen in two ways: based on field positions or based on field names.

Position-based Mapping

Position-based mapping can be used to give fields more meaningful names while keeping the field order. This mapping is available for composite data types with a defined field order as well as for atomic types. Composite data types such as tuples, rows, and case classes have such a field order. However, fields of a POJO must be mapped based on the field names (see next section).

When defining a position-based mapping, the specified names must not exist in the input data type, otherwise the API will assume that the mapping should happen based on the field names. If no field names are specified, the default field names and field order of the composite type are used, or f0 for atomic types.

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, Integer>> stream = ...

// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field names "myLong" and "myInt"
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, Int)] = ...

// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names "myLong" and "myInt"
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myInt)

Name-based Mapping

Name-based mapping can be used for any data type, including POJOs. It is the most flexible way of defining a table schema mapping. All fields in the mapping are referenced by name and can be renamed using an alias (as). Fields can be reordered and projected out.

If no field names are specified, the default field names and field order of the composite type are used, or f0 for atomic types.

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, Integer>> stream = ...

// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field "f1" only
Table table = tableEnv.fromDataStream(stream, "f1");

// convert DataStream into Table with swapped fields
Table table = tableEnv.fromDataStream(stream, "f1, f0");

// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, Int)] = ...

// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field "_2" only
val table: Table = tableEnv.fromDataStream(stream, '_2)

// convert DataStream into Table with swapped fields
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)

// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myInt, '_1 as 'myLong)

Atomic Types

Flink treats primitives (Integer, Double, String) and generic types (types that cannot be analyzed and decomposed) as atomic types. A DataStream or DataSet of an atomic type is converted into a Table with a single attribute. The type of the attribute is inferred from the atomic type, and the name of the attribute can be specified.

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Long> stream = ...

// convert DataStream into Table with default field name "f0"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field name "myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[Long] = ...

// convert DataStream into Table with default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, 'myLong)

Tuples (Scala and Java) and Case Classes (Scala only)

Flink supports Scala's built-in tuples and provides its own tuple classes for Java. DataStreams and DataSets of both kinds of tuples can be converted into tables. Fields can be renamed by providing names for all fields (mapping based on position). If no field names are specified, the default field names are used. If the original field names (f0, f1, ... for Flink Tuples and _1, _2, ... for Scala Tuples) are referenced, the API assumes that the mapping is name-based instead of position-based. Name-based mapping allows for reordering fields and projection with alias (as).

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// convert DataStream into Table with default field names "f0", "f1"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed field names "myLong", "myString" (position-based)
Table table = tableEnv.fromDataStream(stream, "myLong, myString");

// convert DataStream into Table with reordered fields "f1", "f0" (name-based)
Table table = tableEnv.fromDataStream(stream, "f1, f0");

// convert DataStream into Table with projected field "f1" (name-based)
Table table = tableEnv.fromDataStream(stream, "f1");

// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
Table table = tableEnv.fromDataStream(stream, "f1 as myString, f0 as myLong");
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// convert DataStream into Table with default field names '_1, '_2
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)

// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2)

// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)

// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// convert DataStream into Table with default field names 'name, 'age
val table = tableEnv.fromDataStream(streamCC)

// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)

// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(streamCC, 'age as 'myAge, 'name as 'myName)

POJO (Java and Scala)

Flink supports POJOs as composite types. The rules for what determines a POJO are documented here.

When converting a POJO DataStream or DataSet into a Table without specifying field names, the names of the original POJO fields are used. The name mapping requires the original names and cannot be done by position. Fields can be renamed using an alias (with the as keyword), reordered, and projected.

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Person is a POJO with fields "name" and "age"
DataStream<Person> stream = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");

// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");

// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Row

The Row data type supports an arbitrary number of fields and fields with null values. Field names can be specified via a RowTypeInfo or when converting a Row DataStream or DataSet into a Table. The Row type supports mapping of fields by position and by name. Fields can be renamed by providing names for all fields (mapping based on position) or selected individually for projection/ordering/renaming (mapping based on name).

Java:

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
DataStream<Row> stream = ...

// convert DataStream into Table with default field names "name", "age"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
Table table = tableEnv.fromDataStream(stream, "myName, myAge");

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");

// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");

// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
Scala:

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...

// convert DataStream into Table with default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Query Optimization

Apache Flink leverages Apache Calcite to optimize and translate queries. The optimization currently performed includes projection and filter push-down, subquery decorrelation, and other kinds of query rewriting. Flink does not yet optimize the order of joins, but executes them in the same order as defined in the query (order of the tables in the FROM clause and/or order of join predicates in the WHERE clause).

It is possible to tweak the set of optimization rules which are applied in different phases by providing a CalciteConfig object. This object is created via a builder by calling CalciteConfig.createBuilder() and is provided to the TableEnvironment by calling tableEnv.getConfig.setCalciteConfig(calciteConfig).
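A minimal sketch of this configuration path, using only the calls named above (the builder also offers methods to replace or extend individual rule sets, which vary between Flink versions):

Scala:

// create a CalciteConfig via its builder; rule-set customizations would go here
val calciteConfig: CalciteConfig = CalciteConfig.createBuilder().build()

// provide the CalciteConfig to the TableEnvironment
tableEnv.getConfig.setCalciteConfig(calciteConfig)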

Explaining a Table

The Table API provides a mechanism to explain the logical and optimized query plans to compute a Table. This is done through the TableEnvironment.explain(table) method. It returns a String describing three plans:

  • the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
  • the optimized logical query plan, and
  • the physical execution plan.

The following code shows an example and the corresponding output:

Java:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");

Table table = table1
  .where("LIKE(word, 'F%')")
  .unionAll(table2);

String explanation = tEnv.explain(table);
System.out.println(explanation);
Scala:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)

val table = table1
  .where('word.like("F%"))
  .unionAll(table2)

val explanation: String = tEnv.explain(table)
println(explanation)
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, 'F%')])
    LogicalTableScan(table=[[_DataStreamTable_0]])
  LogicalTableScan(table=[[_DataStreamTable_1]])

== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
    DataStreamScan(table=[[_DataStreamTable_0]])
  DataStreamScan(table=[[_DataStreamTable_1]])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat

Stage 2 : Data Source
  content : collect elements with CollectionInputFormat

  Stage 3 : Operator
    content : from: (count, word)
    ship_strategy : REBALANCE

    Stage 4 : Operator
      content : where: (LIKE(word, 'F%')), select: (count, word)
      ship_strategy : FORWARD

      Stage 5 : Operator
        content : from: (count, word)
        ship_strategy : REBALANCE