- Concepts & Common API
- Main Differences Between the Two Planners
- Structure of Table API and SQL Programs
- Create a TableEnvironment
- Create Tables in the Catalog
- Query a Table
- Emit a Table
- Translate and Execute a Query
- Integration with DataStream and DataSet API
- Query Optimization
Concepts & Common API
The Table API and SQL are integrated in a joint API. The central concept of this API is a Table
which serves as input and output of queries. This document shows the common structure of programs with Table API and SQL queries, how to register a Table
, how to query a Table
, and how to emit a Table
.
Main Differences Between the Two Planners
- Blink treats batch jobs as a special case of streaming. As such, the conversion between Table and DataSet is also not supported, and batch jobs will not be translated into
DateSet
programs but translated intoDataStream
programs, the same as the streaming jobs. - The Blink planner does not support
BatchTableSource
, use boundedStreamTableSource
instead of it. - The Blink planner only support the brand new
Catalog
and does not supportExternalCatalog
which is deprecated. - The implementations of
FilterableTableSource
for the old planner and the Blink planner are incompatible. The old planner will push downPlannerExpression
s intoFilterableTableSource
, while the Blink planner will push downExpression
s. - String based key-value config options (Please see the documentation about Configuration for details) are only used for the Blink planner.
- The implementation(
CalciteConfig
) ofPlannerConfig
in two planners is different. - The Blink planner will optimize multiple-sinks into one DAG (supported only on
TableEnvironment
, not onStreamTableEnvironment
). The old planner will always optimize each sink into a new DAG, where all DAGs are independent of each other. - The old planner does not support catalog statistics now, while the Blink planner does.
Structure of Table API and SQL Programs
All Table API and SQL programs for batch and streaming follow the same pattern. The following code example shows the common structure of Table API and SQL programs.
// create a TableEnvironment for specific planner batch or streaming
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create a Table
tableEnv.connect(...).createTemporaryTable("table1");
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable");
// create a Table object from a Table API query
Table tapiResult = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");
// execute
tableEnv.execute("java_job");
// create a TableEnvironment for specific planner batch or streaming
val tableEnv = ... // see "Create a TableEnvironment" section
// create a Table
tableEnv.connect(...).createTemporaryTable("table1")
// register an output Table
tableEnv.connect(...).createTemporaryTable("outputTable")
// create a Table from a Table API query
val tapiResult = tableEnv.from("table1").select(...)
// create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ...")
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable")
// execute
tableEnv.execute("scala_job")
# create a TableEnvironment for specific planner batch or streaming
table_env = ... # see "Create a TableEnvironment" section
# register a Table
table_env.connect(...).create_temporary_table("table1")
# register an output Table
table_env.connect(...).create_temporary_table("outputTable")
# create a Table from a Table API query
tapi_result = table_env.from_path("table1").select(...)
# create a Table from a SQL query
sql_result = table_env.sql_query("SELECT ... FROM table1 ...")
# emit a Table API result Table to a TableSink, same for SQL result
tapi_result.insert_into("outputTable")
# execute
table_env.execute("python_job")
Note: Table API and SQL queries can be easily integrated with and embedded into DataStream or DataSet programs. Have a look at the Integration with DataStream and DataSet API section to learn how DataStreams and DataSets can be converted into Tables and vice versa.
Create a TableEnvironment
The TableEnvironment
is a central concept of the Table API and SQL integration. It is responsible for:
- Registering a
Table
in the internal catalog - Registering catalogs
- Loading pluggable modules
- Executing SQL queries
- Registering a user-defined (scalar, table, or aggregation) function
- Converting a
DataStream
orDataSet
into aTable
- Holding a reference to an
ExecutionEnvironment
orStreamExecutionEnvironment
A Table
is always bound to a specific TableEnvironment
. It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them.
A TableEnvironment
is created by calling the static BatchTableEnvironment.create()
or StreamTableEnvironment.create()
method with a StreamExecutionEnvironment
or an ExecutionEnvironment
and an optional TableConfig
. The TableConfig
can be used to configure the TableEnvironment
or to customize the query optimization and translation process (see Query Optimization).
Make sure to choose the specific planner BatchTableEnvironment
/StreamTableEnvironment
that matches your programming language.
If both planner jars are on the classpath (the default behavior), you should explicitly set which planner to use in the current program.
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
val fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)
# **********************
# FLINK STREAMING QUERY
# **********************
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
f_s_env = StreamExecutionEnvironment.get_execution_environment()
f_s_settings = EnvironmentSettings.new_instance().use_old_planner().in_streaming_mode().build()
f_s_t_env = StreamTableEnvironment.create(f_s_env, environment_settings=f_s_settings)
# ******************
# FLINK BATCH QUERY
# ******************
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment
f_b_env = ExecutionEnvironment.get_execution_environment()
f_b_t_env = BatchTableEnvironment.create(f_b_env, table_config)
# **********************
# BLINK STREAMING QUERY
# **********************
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
b_s_env = StreamExecutionEnvironment.get_execution_environment()
b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
b_s_t_env = StreamTableEnvironment.create(b_s_env, environment_settings=b_s_settings)
# ******************
# BLINK BATCH QUERY
# ******************
from pyflink.table import EnvironmentSettings, BatchTableEnvironment
b_b_settings = EnvironmentSettings.new_instance().use_blink_planner().in_batch_mode().build()
b_b_t_env = BatchTableEnvironment.create(environment_settings=b_b_settings)
Note: If there is only one planner jar in /lib
directory, you can use useAnyPlanner
(use_any_planner
for python) to create specific EnvironmentSettings
.
Create Tables in the Catalog
A TableEnvironment
maintains a map of catalogs of tables which are created with an identifier. Eachidentifier consists of 3 parts: catalog name, database name and object name. If a catalog or database is notspecified, the current default value will be used (see examples in the Table identifier expanding section).
Tables can be either virtual (VIEWS
) or regular (TABLES
). VIEWS
can be created from anexisting Table
object, usually the result of a Table API or SQL query. TABLES
describeexternal data, such as a file, database table, or message queue.
Temporary vs Permanent tables.
Tables may either be temporary, and tied to the lifecycle of a single Flink session, or permanent,and visible across multiple Flink sessions and clusters.
Permanent tables require a catalog (such as Hive Metastore)to maintain metadata about the table. Once a permanent table is created, it is visible to any Flinksession that is connected to the catalog and will continue to exist until the table is explicitlydropped.
On the other hand, temporary tables are always stored in memory and only exist for the duration ofthe Flink session they are created within. These tables are not visible to other sessions. They arenot bound to any catalog or database but can be created in the namespace of one. Temporary tablesare not dropped if their corresponding database is removed.
Shadowing
It is possible to register a temporary table with the same identifier as an existing permanenttable. The temporary table shadows the permanent one and makes the permanent table inaccessible aslong as the temporary one exists. All queries with that identifier will be executed against thetemporary table.
This might be useful for experimentation. It allows running exactly the same query first against atemporary table that e.g. has just a subset of data, or the data is obfuscated. Once verified thatthe query is correct it can be run against the real production table.
Create a Table
Virtual Tables
A Table
API object corresponds to a VIEW
(virtual table) in a SQL terms. It encapsulates a logicalquery plan. It can be created in a catalog as follows:
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// table is the result of a simple projection query
val projTable: Table = tableEnv.from("X").select(...)
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable)
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# table is the result of a simple projection query
proj_table = table_env.from_path("X").select(...)
# register the Table projTable as table "projectedTable"
table_env.register_table("projectedTable", proj_table)
Note: Table
objects are similar to VIEW
’s from relational databasesystems, i.e., the query that defines the Table
is not optimized but will be inlined when anotherquery references the registered Table
. If multiple queries reference the same registered Table
,it will be inlined for each referencing query and executed multiple times, i.e., the result of theregistered Table
will not be shared.
Connector Tables
It is also possible to create a TABLE
as known from relational databases from a connector declaration.The connector describes the external system that stores the data of a table. Storage systems such as Apacha Kafka or a regular file system can be declared here.
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
table_environment \
.connect(...) \
.with_format(...) \
.with_schema(...) \
.in_append_mode() \
.create_temporary_table("MyTable")
tableEnvironment.sqlUpdate("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
Expanding Table identifiers
Tables are always registered with a 3-part identifier consisting of catalog, database, and table name.
Users can set one catalog and one database inside it to be the “current catalog” and “current database”.With them, the first two parts in the 3-parts identifier mentioned above can be optional - if they are not provided,the current catalog and current database will be referred. Users can switch the current catalog and current database viatable API or SQL.
Identifiers follow SQL requirements which means that they can be escaped with a backtick character (`
).Additionally all SQL reserved keywords must be escaped.
TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
Table table = ...;
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table);
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table);
// register the view named 'View' in the catalog named 'custom_catalog' in the
// database named 'custom_database'. 'View' is a reserved keyword and must be escaped.
tableEnv.createTemporaryView("`View`", table);
// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table);
// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
// get a TableEnvironment
val tEnv: TableEnvironment = ...;
tEnv.useCatalog("custom_catalog")
tEnv.useDatabase("custom_database")
val table: Table = ...;
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table)
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table)
// register the view named 'View' in the catalog named 'custom_catalog' in the
// database named 'custom_database'. 'View' is a reserved keyword and must be escaped.
tableEnv.createTemporaryView("`View`", table)
// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table)
// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)
Query a Table
Table API
The Table API is a language-integrated query API for Scala and Java. In contrast to SQL, queries are not specified as Strings but are composed step-by-step in the host language.
The API is based on the Table
class which represents a table (streaming or batch) and offers methods to apply relational operations. These methods return a new Table
object, which represents the result of applying the relational operation on the input Table
. Some relational operations are composed of multiple method calls such as table.groupBy(…).select()
, where groupBy(…)
specifies a grouping of table
, and select(…)
the projection on the grouping of table
.
The Table API document describes all Table API operations that are supported on streaming and batch tables.
The following example shows a simple Table API aggregation query:
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
Table revenue = orders
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
// emit or convert Table
// execute query
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
val orders = tableEnv.from("Orders")
// compute revenue for all customers from France
val revenue = orders
.filter('cCountry === "FRANCE")
.groupBy('cID, 'cName)
.select('cID, 'cName, 'revenue.sum AS 'revSum)
// emit or convert Table
// execute query
Note: The Scala Table API uses Scala Symbols, which start with a single tick ('
) to reference the attributes of a Table
. The Table API uses Scala implicits. Make sure to import org.apache.flink.api.scala.
and org.apache.flink.table.api.scala.
in order to use Scala implicit conversions.
# get a TableEnvironment
table_env = # see "Create a TableEnvironment" section
# register Orders table
# scan registered Orders table
orders = table_env.from_path("Orders")
# compute revenue for all customers from France
revenue = orders \
.filter("cCountry === 'FRANCE'") \
.group_by("cID, cName") \
.select("cID, cName, revenue.sum AS revSum")
# emit or convert Table
# execute query
SQL
Flink’s SQL integration is based on Apache Calcite, which implements the SQL standard. SQL queries are specified as regular Strings.
The SQL document describes Flink’s SQL support for streaming and batch tables.
The following example shows how to specify a query and return the result as a Table
.
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// emit or convert Table
// execute query
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# register Orders table
# compute revenue for all customers from France
revenue = table_env.sql_query(
"SELECT cID, cName, SUM(revenue) AS revSum "
"FROM Orders "
"WHERE cCountry = 'FRANCE' "
"GROUP BY cID, cName"
)
# emit or convert Table
# execute query
The following example shows how to specify an update query that inserts its result into a registered table.
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// execute query
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// execute query
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# register "Orders" table
# register "RevenueFrance" output table
# compute revenue for all customers from France and emit to "RevenueFrance"
table_env.sql_update(
"INSERT INTO RevenueFrance "
"SELECT cID, cName, SUM(revenue) AS revSum "
"FROM Orders "
"WHERE cCountry = 'FRANCE' "
"GROUP BY cID, cName"
)
# execute query
Mixing Table API and SQL
Table API and SQL queries can be easily mixed because both return Table
objects:
- A Table API query can be defined on the
Table
object returned by a SQL query. - A SQL query can be defined on the result of a Table API query by registering the resulting Table in the
TableEnvironment
and referencing it in theFROM
clause of the SQL query.
Emit a Table
A Table
is emitted by writing it to a TableSink
. A TableSink
is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ).
A batch Table
can only be written to a BatchTableSink
, while a streaming Table
requires either an AppendStreamTableSink
, a RetractStreamTableSink
, or an UpsertStreamTableSink
.
Please see the documentation about Table Sources & Sinks for details about available sinks and instructions for how to implement a custom TableSink
.
The Table.insertInto(String tableName)
method emits the Table
to a registered TableSink
. The method looks up the TableSink
from the catalog by the name and validates that the schema of the Table
is identical to the schema of the TableSink
.
The following examples shows how to emit a Table
:
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an output Table
final Schema schema = new Schema()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.LONG());
tableEnv.connect(new FileSystem("/path/to/file"))
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable");
// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");
// execute the program
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// create an output Table
val schema = new Schema()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.LONG())
tableEnv.connect(new FileSystem("/path/to/file"))
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable")
// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...
// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable")
// execute the program
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# create a TableSink
t_env.connect(FileSystem().path("/path/to/file")))
.with_format(Csv()
.field_delimiter(',')
.deriveSchema())
.with_schema(Schema()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.BIGINT()))
.create_temporary_table("CsvSinkTable")
# compute a result Table using Table API operators and/or SQL queries
result = ...
# emit the result Table to the registered TableSink
result.insert_into("CsvSinkTable")
# execute the program
Translate and Execute a Query
The behavior of translating and executing a query is different for the two planners.
Table API and SQL queries are translated into DataStream or DataSet programs depending on whether their input is a streaming or batch input. A query is internally represented as a logical query plan and is translated in two phases:
- Optimization of the logical plan
- Translation into a DataStream or DataSet programA Table API or SQL query is translated when:
- a
Table
is emitted to aTableSink
, i.e., whenTable.insertInto()
is called. - a SQL update query is specified, i.e., when
TableEnvironment.sqlUpdate()
is called. - a
Table
is converted into aDataStream
orDataSet
(see Integration with DataStream and DataSet API).
Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when StreamExecutionEnvironment.execute()
or ExecutionEnvironment.execute()
is called.
Table API and SQL queries are translated into DataStream programs whether their input is streaming or batch. A query is internally represented as a logical query plan and is translated in two phases:
- Optimization of the logical plan,
- Translation into a DataStream program.The behavior of translating a query is different for
TableEnvironment
andStreamTableEnvironment
.
For TableEnvironment
, a Table API or SQL query is translated when TableEnvironment.execute()
is called, because TableEnvironment
will optimize multiple-sinks into one DAG.
While for StreamTableEnvironment
, a Table API or SQL query is translated when:
- a
Table
is emitted to aTableSink
, i.e., whenTable.insertInto()
is called. - a SQL update query is specified, i.e., when
TableEnvironment.sqlUpdate()
is called. - a
Table
is converted into aDataStream
.
Once translated, a Table API or SQL query is handled like a regular DataStream program and is executed when TableEnvironment.execute()
or StreamExecutionEnvironment.execute()
is called.
Integration with DataStream and DataSet API
Both planners on stream can integrate with the DataStream
API. Only old planner can integrate with the DataSet API
, Blink planner on batch could not be combined with both.Note: The DataSet
API discussed below is only relevant for the old planner on batch.
Table API and SQL queries can be easily integrated with and embedded into DataStream and DataSet programs. For instance, it is possible to query an external table (for example from a RDBMS), do some pre-processing, such as filtering, projecting, aggregating, or joining with meta data, and then further process the data with either the DataStream or DataSet API (and any of the libraries built on top of these APIs, such as CEP or Gelly). Inversely, a Table API or SQL query can also be applied on the result of a DataStream or DataSet program.
This interaction can be achieved by converting a DataStream
or DataSet
into a Table
and vice versa. In this section, we describe how these conversions are done.
Implicit Conversion for Scala
The Scala Table API features implicit conversions for the DataSet
, DataStream
, and Table
classes. These conversions are enabled by importing the package org.apache.flink.table.api.scala.
in addition to org.apache.flink.api.scala.
for the Scala DataStream API.
Create a View from a DataStream or DataSet
A DataStream
or DataSet
can be registered in a TableEnvironment
as a View. The schema of the resulting view depends on the data type of the registered DataStream
or DataSet
. Please check the section about mapping of data types to table schema for details.
Note: Views created from a DataStream
or DataSet
can be registered as temporary views only.
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, String>> stream = ...
// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream);
// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, "myLong, myString");
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, String)] = ...
// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream)
// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, 'myLong, 'myString)
Convert a DataStream or DataSet into a Table
Instead of registering a DataStream
or DataSet
in a TableEnvironment
, it can also be directly converted into a Table
. This is convenient if you want to use the Table in a Table API query.
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, String>> stream = ...
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, String)] = ...
// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)
// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
Convert a Table into a DataStream or DataSet
A Table
can be converted into a DataStream
or DataSet
. In this way, custom DataStream or DataSet programs can be run on the result of a Table API or SQL query.
When converting a Table
into a DataStream
or DataSet
, you need to specify the data type of the resulting DataStream
or DataSet
, i.e., the data type into which the rows of the Table
are to be converted. Often the most convenient conversion type is Row
. The following list gives an overview of the features of the different options:
- Row: fields are mapped by position, arbitrary number of fields, support for
null
values, no type-safe access. - POJO: fields are mapped by name (POJO fields must be named as
Table
fields), arbitrary number of fields, support fornull
values, type-safe access. - Case Class: fields are mapped by position, no support for
null
values, type-safe access. - Tuple: fields are mapped by position, limitation to 22 (Scala) or 25 (Java) fields, no support for
null
values, type-safe access. - Atomic Type:
Table
must have a single field, no support fornull
values, type-safe access.
Convert a Table into a DataStream
A Table
that is the result of a streaming query will be updated dynamically, i.e., it is changing as new records arrive on the query’s input streams. Hence, the DataStream
into which such a dynamic query is converted needs to encode the updates of the table.
There are two modes to convert a Table
into a DataStream
:
- Append Mode: This mode can only be used if the dynamic
Table
is only modified byINSERT
changes, i.e, it is append-only and previously emitted results are never updated. - Retract Mode: This mode can always be used. It encodes
INSERT
andDELETE
changes with aboolean
flag.
// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// Table with two fields (String name, Integer age)
Table table = ...
// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// convert the Table into an append DataStream of Tuple2<String, Integer>
// via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
// get TableEnvironment.
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
Note: A detailed discussion about dynamic tables and their properties is given in the Dynamic Tables document.
Convert a Table into a DataSet
A Table
is converted into a DataSet
as follows:
// get BatchTableEnvironment
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// Table with two fields (String name, Integer age)
Table table = ...
// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple =
tableEnv.toDataSet(table, tupleType);
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = BatchTableEnvironment.create(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
Mapping of Data Types to Table Schema
Flink’s DataStream and DataSet APIs support very diverse types. Composite types such as Tuples (built-in Scala and Flink Java tuples), POJOs, Scala case classes, and Flink’s Row type allow for nested data structures with multiple fields that can be accessed in table expressions. Other types are treated as atomic types. In the following, we describe how the Table API converts these types into an internal row representation and show examples of converting a DataStream
into a Table
.
The mapping of a data type to a table schema can happen in two ways: based on the field positions or based on the field names.
Position-based Mapping
Position-based mapping can be used to give fields a more meaningful name while keeping the field order. This mapping is available for composite data types with a defined field order as well as atomic types. Composite data types such as tuples, rows, and case classes have such a field order. However, fields of a POJO must be mapped based on the field names (see next section). Fields can be projected out but can’t be renamed using an alias as
.
When defining a position-based mapping, the specified names must not exist in the input data type, otherwise the API will assume that the mapping should happen based on the field names. If no field names are specified, the default field names and field order of the composite type are used or f0
for atomic types.
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section;
DataStream<Tuple2<Long, Integer>> stream = ...
// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field "myLong" only
Table table = tableEnv.fromDataStream(stream, "myLong");
// convert DataStream into Table with field names "myLong" and "myInt"
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, Int)] = ...
// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field "myLong" only
val table: Table = tableEnv.fromDataStream(stream, 'myLong)
// convert DataStream into Table with field names "myLong" and "myInt"
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myInt)
Name-based Mapping
Name-based mapping can be used for any data type including POJOs. It is the most flexible way of defining a table schema mapping. All fields in the mapping are referenced by name and can be possibly renamed using an alias as
. Fields can be reordered and projected out.
If no field names are specified, the default field names and field order of the composite type are used or f0
for atomic types.
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, Integer>> stream = ...
// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field "f1" only
Table table = tableEnv.fromDataStream(stream, "f1");
// convert DataStream into Table with swapped fields
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, Int)] = ...
// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field "_2" only
val table: Table = tableEnv.fromDataStream(stream, '_2)
// convert DataStream into Table with swapped fields
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)
// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myInt, '_1 as 'myLong)
Atomic Types
Flink treats primitives (Integer
, Double
, String
) or generic types (types that cannot be analyzed and decomposed) as atomic types. A DataStream
or DataSet
of an atomic type is converted into a Table
with a single attribute. The type of the attribute is inferred from the atomic type and the name of the attribute can be specified.
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Long> stream = ...
// convert DataStream into Table with default field name "f0"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field name "myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[Long] = ...
// convert DataStream into Table with default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, 'myLong)
Tuples (Scala and Java) and Case Classes (Scala only)
Flink supports Scala’s built-in tuples and provides its own tuple classes for Java. DataStreams and DataSets of both kinds of tuples can be converted into tables. Fields can be renamed by providing names for all fields (mapping based on position). If no field names are specified, the default field names are used. If the original field names (f0
, f1
, … for Flink Tuples and _1
, _2
, … for Scala Tuples) are referenced, the API assumes that the mapping is name-based instead of position-based. Name-based mapping allows for reordering fields and projection with alias (as
).
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, String>> stream = ...
// convert DataStream into Table with default field names "f0", "f1"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with renamed field names "myLong", "myString" (position-based)
Table table = tableEnv.fromDataStream(stream, "myLong, myString");
// convert DataStream into Table with reordered fields "f1", "f0" (name-based)
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// convert DataStream into Table with projected field "f1" (name-based)
Table table = tableEnv.fromDataStream(stream, "f1");
// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'");
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, String)] = ...
// convert DataStream into Table with renamed default field names '_1, '_2
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)
// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2)
// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)
// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...
// convert DataStream into Table with default field names 'name, 'age
val table = tableEnv.fromDataStream(streamCC)
// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)
POJO (Java and Scala)
Flink supports POJOs as composite types. The rules for what determines a POJO are documented here.
When converting a POJO DataStream
or DataSet
into a Table
without specifying field names, the names of the original POJO fields are used. The name mapping requires the original names and cannot be done by positions. Fields can be renamed using an alias (with the as
keyword), reordered, and projected.
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// Person is a POJO with fields "name" and "age"
DataStream<Person> stream = ...
// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");
// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");
// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...
// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)
// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)
// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)
Row
The Row
data type supports an arbitrary number of fields and fields with null
values. Field names can be specified via a RowTypeInfo
or when converting a Row
DataStream
or DataSet
into a Table
. The row type supports mapping of fields by position and by name. Fields can be renamed by providing names for all fields (mapping based on position) or selected individually for projection/ordering/renaming (mapping based on name).
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
DataStream<Row> stream = ...
// convert DataStream into Table with default field names "name", "age"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
Table table = tableEnv.fromDataStream(stream, "myName, myAge");
// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");
// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...
// convert DataStream into Table with default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)
// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)
// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)
// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)
Query Optimization
Apache Flink leverages Apache Calcite to optimize and translate queries. The optimization currently performed include projection and filter push-down, subquery decorrelation, and other kinds of query rewriting. Old planner does not yet optimize the order of joins, but executes them in the same order as defined in the query (order of Tables in the FROM
clause and/or order of join predicates in the WHERE
clause).
It is possible to tweak the set of optimization rules which are applied in different phases by providing a CalciteConfig
object. This can be created via a builder by calling CalciteConfig.createBuilder())
and is provided to the TableEnvironment by calling tableEnv.getConfig.setPlannerConfig(calciteConfig)
.
Apache Flink leverages and extends Apache Calcite to perform sophisticated query optimization.This includes a series of rule and cost-based optimizations such as:
- Subquery decorrelation based on Apache Calcite
- Project pruning
- Partition pruning
- Filter push-down
- Sub-plan deduplication to avoid duplicate computation
- Special subquery rewriting, including two parts:
- Converts IN and EXISTS into left semi-joins
- Converts NOT IN and NOT EXISTS into left anti-join
- Optional join reordering
- Enabled via
table.optimizer.join-reorder-enabled
- Enabled via
Note: IN/EXISTS/NOT IN/NOT EXISTS are currently only supported in conjunctive conditions in subquery rewriting.
The optimizer makes intelligent decisions, based not only on the plan but also rich statistics available from the data sources and fine-grain costs for each operator such as io, cpu, network, and memory.
Advanced users may provide custom optimizations via a CalciteConfig
object that can be provided to the table environment by calling TableEnvironment#getConfig#setPlannerConfig
.
Explaining a Table
The Table API provides a mechanism to explain the logical and optimized query plans to compute a Table
. This is done through the TableEnvironment.explain(table)
method or TableEnvironment.explain()
method. explain(table)
returns the plan of a given Table
. explain()
returns the result of a multiple sinks plan and is mainly used for the Blink planner. It returns a String describing three plans:
- the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
- the optimized logical query plan, and
- the physical execution plan.The following code shows an example and the corresponding output for given
Table
usingexplain(table)
:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
.where("LIKE(word, 'F%')")
.unionAll(table2);
String explanation = tEnv.explain(table);
System.out.println(explanation);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
.where('word.like("F%"))
.unionAll(table2)
val explanation: String = tEnv.explain(table)
println(explanation)
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
table = table1 \
.where("LIKE(word, 'F%')") \
.union_all(table2)
explanation = t_env.explain(table)
print(explanation)
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
DataStreamScan(id=[1], fields=[count, word])
DataStreamScan(id=[2], fields=[count, word])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
DataStreamScan(id=[1], fields=[count, word])
DataStreamScan(id=[2], fields=[count, word])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
FlinkLogicalDataStreamScan(id=[3], fields=[count, word])
FlinkLogicalDataStreamScan(id=[6], fields=[count, word])
== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
DataStreamScan(id=[3], fields=[count, word])
DataStreamScan(id=[6], fields=[count, word])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Operator
content : Flat Map
ship_strategy : FORWARD
Stage 3 : Operator
content : Map
ship_strategy : FORWARD
Stage 4 : Data Source
content : collect elements with CollectionInputFormat
Stage 5 : Operator
content : Flat Map
ship_strategy : FORWARD
Stage 6 : Operator
content : Map
ship_strategy : FORWARD
Stage 7 : Operator
content : Map
ship_strategy : FORWARD
Stage 8 : Operator
content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 9 : Operator
content : Map
ship_strategy : FORWARD
The following code shows an example and the corresponding output for multiple-sinks plan using explain()
:
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
final Schema schema = new Schema()
.field("count", DataTypes.INT())
.field("word", DataTypes.STRING());
tEnv.connect(new FileSystem("/source/path1"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySource1");
tEnv.connect(new FileSystem("/source/path2"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySource2");
tEnv.connect(new FileSystem("/sink/path1"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySink1");
tEnv.connect(new FileSystem("/sink/path2"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySink2");
Table table1 = tEnv.from("MySource1").where("LIKE(word, 'F%')");
table1.insertInto("MySink1");
Table table2 = table1.unionAll(tEnv.from("MySource2"));
table2.insertInto("MySink2");
String explanation = tEnv.explain(false);
System.out.println(explanation);
val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
val tEnv = TableEnvironment.create(settings)
val schema = new Schema()
.field("count", DataTypes.INT())
.field("word", DataTypes.STRING())
tEnv.connect(new FileSystem("/source/path1"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySource1")
tEnv.connect(new FileSystem("/source/path2"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySource2")
tEnv.connect(new FileSystem("/sink/path1"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySink1")
tEnv.connect(new FileSystem("/sink/path2"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySink2")
val table1 = tEnv.from("MySource1").where("LIKE(word, 'F%')")
table1.insertInto("MySink1")
val table2 = table1.unionAll(tEnv.from("MySource2"))
table2.insertInto("MySink2")
val explanation = tEnv.explain(false)
println(explanation)
settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = TableEnvironment.create(environment_settings=settings)
schema = Schema()
.field("count", DataTypes.INT())
.field("word", DataTypes.STRING())
t_env.connect(FileSystem().path("/source/path1")))
.with_format(Csv().deriveSchema())
.with_schema(schema)
.create_temporary_table("MySource1")
t_env.connect(FileSystem().path("/source/path2")))
.with_format(Csv().deriveSchema())
.with_schema(schema)
.create_temporary_table("MySource2")
t_env.connect(FileSystem().path("/sink/path1")))
.with_format(Csv().deriveSchema())
.with_schema(schema)
.create_temporary_table("MySink1")
t_env.connect(FileSystem().path("/sink/path2")))
.with_format(Csv().deriveSchema())
.with_schema(schema)
.create_temporary_table("MySink2")
table1 = t_env.from_path("MySource1").where("LIKE(word, 'F%')")
table1.insert_into("MySink1")
table2 = table1.union_all(t_env.from_path("MySource2"))
table2.insert_into("MySink2")
explanation = t_env.explain()
print(explanation)
the result of multiple-sinks plan is
== Abstract Syntax Tree ==
LogicalSink(name=[MySink1], fields=[count, word])
+- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
LogicalSink(name=[MySink2], fields=[count, word])
+- LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])
== Optimized Logical Plan ==
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
Sink(name=[MySink1], fields=[count, word])
+- Reused(reference_id=[1])
Sink(name=[MySink2], fields=[count, word])
+- Union(all=[true], union=[count, word])
:- Reused(reference_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Operator
content : CsvTableSource(read fields: count, word)
ship_strategy : REBALANCE
Stage 3 : Operator
content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
ship_strategy : FORWARD
Stage 4 : Operator
content : Calc(where: (word LIKE _UTF-16LE'F%'), select: (count, word))
ship_strategy : FORWARD
Stage 5 : Operator
content : SinkConversionToRow
ship_strategy : FORWARD
Stage 6 : Operator
content : Map
ship_strategy : FORWARD
Stage 8 : Data Source
content : collect elements with CollectionInputFormat
Stage 9 : Operator
content : CsvTableSource(read fields: count, word)
ship_strategy : REBALANCE
Stage 10 : Operator
content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
ship_strategy : FORWARD
Stage 12 : Operator
content : SinkConversionToRow
ship_strategy : FORWARD
Stage 13 : Operator
content : Map
ship_strategy : FORWARD
Stage 7 : Data Sink
content : Sink: CsvTableSink(count, word)
ship_strategy : FORWARD
Stage 14 : Data Sink
content : Sink: CsvTableSink(count, word)
ship_strategy : FORWARD