Concepts & Common API
The Table API and SQL are integrated in a joint API. The central concept of this API is a Table, which serves as input and output of queries. This document shows the common structure of programs with Table API and SQL queries, how to register a Table, how to query a Table, and how to emit a Table.
Structure of Table API and SQL Programs
The following code example shows the common structure of Table API and SQL programs.
Java
import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenOptions;
// Create a TableEnvironment for batch or streaming execution.
// See the "Create a TableEnvironment" section for details.
TableEnvironment tableEnv = TableEnvironment.create(/*…*/);
// Create a source table
tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenOptions.ROWS_PER_SECOND, 100L)
.build());
// Create a sink table (using SQL DDL)
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable");
// Create a Table object from a Table API query
Table table2 = tableEnv.from("SourceTable");
// Create a Table object from a SQL query
Table table3 = tableEnv.sqlQuery("SELECT * FROM SourceTable");
// Emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table2.insertInto("SinkTable").execute();
Scala
import org.apache.flink.table.api._
import org.apache.flink.connector.datagen.table.DataGenOptions
// Create a TableEnvironment for batch or streaming execution.
// See the "Create a TableEnvironment" section for details.
val tableEnv = TableEnvironment.create(/*…*/)
// Create a source table
tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenOptions.ROWS_PER_SECOND, 100L)
.build())
// Create a sink table (using SQL DDL)
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable")
// Create a Table object from a Table API query
val table1 = tableEnv.from("SourceTable")
// Create a Table object from a SQL query
val table2 = tableEnv.sqlQuery("SELECT * FROM SourceTable")
// Emit a Table API result Table to a TableSink, same for SQL result
val tableResult = table1.insertInto("SinkTable").execute()
Python
from pyflink.table import *
# Create a TableEnvironment for batch or streaming execution
table_env = ... # see "Create a TableEnvironment" section
# Create a source table
table_env.executeSql("""CREATE TEMPORARY TABLE SourceTable (
f0 STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100'
)
""")
# Create a sink table
table_env.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable")
# Create a Table from a Table API query
table1 = table_env.from_path("SourceTable").select(...)
# Create a Table from a SQL query
table2 = table_env.sql_query("SELECT ... FROM SourceTable ...")
# Emit a Table API result Table to a TableSink, same for SQL result
table_result = table1.execute_insert("SinkTable")
Table API and SQL queries can be easily integrated with and embedded into DataStream programs. Have a look at the DataStream API Integration page to learn how DataStreams can be converted into Tables and vice versa.
Create a TableEnvironment
The TableEnvironment is the entry point for Table API and SQL integration and is responsible for:
- Registering a Table in the internal catalog
- Registering catalogs
- Loading pluggable modules
- Executing SQL queries
- Registering a user-defined (scalar, table, or aggregation) function
- Converting between DataStream and Table (in the case of a StreamTableEnvironment)
A Table is always bound to a specific TableEnvironment. It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them. A TableEnvironment is created by calling the static TableEnvironment.create() method.
Java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
//.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
Scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
//.inBatchMode()
.build()
val tEnv = TableEnvironment.create(settings)
Python
from pyflink.table import EnvironmentSettings, TableEnvironment
# create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# create a batch TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
Alternatively, users can create a StreamTableEnvironment from an existing StreamExecutionEnvironment to interoperate with the DataStream API.
Java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
Python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
s_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(s_env)
Create Tables in the Catalog
A TableEnvironment maintains a map of catalogs of tables which are created with an identifier. Each identifier consists of three parts: catalog name, database name, and object name. If a catalog or database is not specified, the current default value is used (see the examples in the Expanding Table identifiers section).
Tables can be either virtual (VIEWS) or regular (TABLES). VIEWS can be created from an existing Table object, usually the result of a Table API or SQL query. TABLES describe external data, such as a file, database table, or message queue.
Temporary vs Permanent tables.
Tables may either be temporary, and tied to the lifecycle of a single Flink session, or permanent, and visible across multiple Flink sessions and clusters.
Permanent tables require a catalog (such as Hive Metastore) to maintain metadata about the table. Once a permanent table is created, it is visible to any Flink session that is connected to the catalog and will continue to exist until the table is explicitly dropped.
On the other hand, temporary tables are always stored in memory and only exist for the duration of the Flink session they are created within. These tables are not visible to other sessions. They are not bound to any catalog or database but can be created in the namespace of one. Temporary tables are not dropped if their corresponding database is removed.
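In terms of API, the difference is only which registration method is used. A minimal Java sketch with purely illustrative table names; note that a permanent table only outlives the session if the current catalog persists metadata (e.g., a Hive Metastore), which the default in-memory catalog does not:
import org.apache.flink.table.api.*;
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
TableDescriptor descriptor = TableDescriptor.forConnector("datagen")
    .schema(Schema.newBuilder()
        .column("f0", DataTypes.STRING())
        .build())
    .build();
// Permanent: registered in the current catalog and database,
// visible to any session connected to that catalog until it is dropped.
tableEnv.createTable("PermanentTable", descriptor);
// Temporary: kept in memory and visible only within this session.
tableEnv.createTemporaryTable("TemporaryTable", descriptor);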
Shadowing
It is possible to register a temporary table with the same identifier as an existing permanent table. The temporary table shadows the permanent one and makes the permanent table inaccessible as long as the temporary one exists. All queries with that identifier will be executed against the temporary table.
This might be useful for experimentation. It allows running exactly the same query first against a temporary table that, for example, contains only a subset of the data or obfuscated data. Once the query is verified to be correct, it can be run against the real production table.
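A minimal Java sketch of shadowing; the identifier Orders, the connectors, and the option values are illustrative:
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
Schema schema = Schema.newBuilder().column("amount", DataTypes.INT()).build();
// permanent table in the current catalog (the "production" table)
tableEnv.createTable("Orders", TableDescriptor.forConnector("datagen")
    .schema(schema)
    .build());
// temporary table with the same identifier; it shadows the permanent table
tableEnv.createTemporaryTable("Orders", TableDescriptor.forConnector("datagen")
    .schema(schema)
    .option("number-of-rows", "10")  // e.g. a small test sample
    .build());
// all queries against "Orders" are now executed against the temporary table
Table sampleResult = tableEnv.sqlQuery("SELECT * FROM Orders");
// dropping the temporary table makes the permanent table accessible again
tableEnv.dropTemporaryTable("Orders");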
Create a Table
Virtual Tables
A Table API object corresponds to a VIEW (virtual table) in SQL terms. It encapsulates a logical query plan. It can be created in a catalog as follows:
Java
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
Scala
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// table is the result of a simple projection query
val projTable: Table = tableEnv.from("X").select(...)
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable)
Python
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# table is the result of a simple projection query
proj_table = table_env.from_path("X").select(...)
# register the Table proj_table as view "projectedTable"
table_env.create_temporary_view("projectedTable", proj_table)
Note: Table objects are similar to VIEWs from relational database systems, i.e., the query that defines the Table is not optimized but will be inlined when another query references the registered Table. If multiple queries reference the same registered Table, it will be inlined for each referencing query and executed multiple times, i.e., the result of the registered Table will not be shared.
Connector Tables
It is also possible to create a TABLE as known from relational databases from a connector declaration. The connector describes the external system that stores the data of a table. Storage systems such as Apache Kafka or a regular file system can be declared here.
Such tables can either be created using the Table API directly, or by switching to SQL DDL.
// Using table descriptors
final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("f0", DataTypes.STRING())
.build())
.option(DataGenOptions.ROWS_PER_SECOND, 100L)
.build();
tableEnv.createTable("SourceTableA", sourceDescriptor);
tableEnv.createTemporaryTable("SourceTableB", sourceDescriptor);
// Using SQL DDL
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)");
Expanding Table identifiers
Tables are always registered with a 3-part identifier consisting of catalog, database, and table name.
Users can set one catalog and one database inside it to be the “current catalog” and “current database”. With them, the first two parts of the 3-part identifier introduced above become optional: if they are not provided, the current catalog and current database are used. The current catalog and current database can be switched via the Table API or SQL.
Identifiers follow SQL requirements, which means they can be escaped with a backtick character (`).
Java
TableEnvironment tableEnv = ...;
tableEnv.useCatalog("custom_catalog");
tableEnv.useDatabase("custom_database");
Table table = ...;
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table);
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table);
// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table);
// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
Scala
// get a TableEnvironment
val tableEnv: TableEnvironment = ...
tableEnv.useCatalog("custom_catalog")
tableEnv.useDatabase("custom_database")
val table: Table = ...
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table)
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table)
// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table)
// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table)
Query a Table
Table API
The Table API is a language-integrated query API for Scala and Java. In contrast to SQL, queries are not specified as Strings but are composed step-by-step in the host language.
The API is based on the Table class, which represents a table (streaming or batch) and offers methods to apply relational operations. These methods return a new Table object, which represents the result of applying the relational operation on the input Table. Some relational operations are composed of multiple method calls, such as table.groupBy(...).select(), where groupBy(...) specifies a grouping of table, and select(...) the projection on the grouping of table.
The Table API document describes all Table API operations that are supported on streaming and batch tables.
The following example shows a simple Table API aggregation query:
Java
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
Table revenue = orders
.filter($("cCountry").isEqual("FRANCE"))
.groupBy($("cID"), $("cName"))
.select($("cID"), $("cName"), $("revenue").sum().as("revSum"));
// emit or convert Table
// execute query
Scala
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
val orders = tableEnv.from("Orders")
// compute revenue for all customers from France
val revenue = orders
.filter($"cCountry" === "FRANCE")
.groupBy($"cID", $"cName")
.select($"cID", $"cName", $"revenue".sum AS "revSum")
// emit or convert Table
// execute query
Note: The Scala Table API uses Scala string interpolation starting with a dollar sign ($) to reference the attributes of a Table. The Table API uses Scala implicits. Make sure to import
- org.apache.flink.table.api._ for implicit expression conversions
- org.apache.flink.api.scala._ and org.apache.flink.table.api.bridge.scala._ if you want to convert from/to DataStream.
Python
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# register Orders table
# scan registered Orders table
orders = table_env.from_path("Orders")
# compute revenue for all customers from France
revenue = orders \
.filter(col('cCountry') == 'FRANCE') \
.group_by(col('cID'), col('cName')) \
.select(col('cID'), col('cName'), col('revenue').sum.alias('revSum'))
# emit or convert Table
# execute query
SQL
Flink’s SQL integration is based on Apache Calcite, which implements the SQL standard. SQL queries are specified as regular Strings.
The SQL document describes Flink’s SQL support for streaming and batch tables.
The following example shows how to specify a query and return the result as a Table.
Java
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query
Scala
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// emit or convert Table
// execute query
Python
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# register Orders table
# compute revenue for all customers from France
revenue = table_env.sql_query(
"SELECT cID, cName, SUM(revenue) AS revSum "
"FROM Orders "
"WHERE cCountry = 'FRANCE' "
"GROUP BY cID, cName"
)
# emit or convert Table
# execute query
The following example shows how to specify an update query that inserts its result into a registered table.
Java
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
Scala
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
Python
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# register "Orders" table
# register "RevenueFrance" output table
# compute revenue for all customers from France and emit to "RevenueFrance"
table_env.execute_sql(
"INSERT INTO RevenueFrance "
"SELECT cID, cName, SUM(revenue) AS revSum "
"FROM Orders "
"WHERE cCountry = 'FRANCE' "
"GROUP BY cID, cName"
)
Mixing Table API and SQL
Table API and SQL queries can be easily mixed because both return Table objects:
- A Table API query can be defined on the Table object returned by a SQL query.
- A SQL query can be defined on the result of a Table API query by registering the resulting Table in the TableEnvironment and referencing it in the FROM clause of the SQL query (see the sketch below).
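A short Java sketch of both directions; the table, view, and column names are illustrative, and the $ expression requires the static import org.apache.flink.table.api.Expressions.$:
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// 1) Apply Table API operations to the Table returned by a SQL query
Table fromSql = tableEnv.sqlQuery("SELECT cID, revenue FROM Orders");
Table filtered = fromSql.filter($("revenue").isGreater(100));
// 2) Run SQL on the result of a Table API query:
//    register the Table as a temporary view and reference it in the FROM clause
tableEnv.createTemporaryView("FilteredOrders", filtered);
Table aggregated = tableEnv.sqlQuery(
    "SELECT cID, SUM(revenue) AS revSum FROM FilteredOrders GROUP BY cID");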
Emit a Table
A Table is emitted by writing it to a TableSink. A TableSink is a generic interface to support a wide variety of file formats (e.g., CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ).
A batch Table can only be written to a BatchTableSink, while a streaming Table requires either an AppendStreamTableSink, a RetractStreamTableSink, or an UpsertStreamTableSink.
Please see the documentation about Table Sources & Sinks for details about available sinks and instructions for how to implement a custom DynamicTableSink.
The Table.insertInto(String tableName) method defines a complete end-to-end pipeline that emits the source table to a registered sink table. The method looks up the table sink in the catalog by name and validates that the schema of the Table is identical to the schema of the sink. A pipeline can be explained with TablePipeline.explain() and executed by invoking TablePipeline.execute().
The following example shows how to emit a Table:
Java
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an output Table
final Schema schema = Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.column("c", DataTypes.BIGINT())
.build();
tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/path/to/file")
.format(FormatDescriptor.forFormat("csv")
.option("field-delimiter", "|")
.build())
.build());
// compute a result Table using Table API operators and/or SQL queries
Table result = ...;
// Prepare the insert into pipeline
TablePipeline pipeline = result.insertInto("CsvSinkTable");
// Print explain details
pipeline.printExplain();
// emit the result Table to the registered TableSink
pipeline.execute();
Scala
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// create an output Table
val schema = Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.column("c", DataTypes.BIGINT())
.build()
tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/path/to/file")
.format(FormatDescriptor.forFormat("csv")
.option("field-delimiter", "|")
.build())
.build())
// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...
// Prepare the insert into pipeline
val pipeline = result.insertInto("CsvSinkTable")
// Print explain details
pipeline.printExplain()
// emit the result Table to the registered TableSink
pipeline.execute()
Python
# get a TableEnvironment
table_env = ... # see "Create a TableEnvironment" section
# create an output Table
schema = Schema.new_builder() \
    .column("a", DataTypes.INT()) \
    .column("b", DataTypes.STRING()) \
    .column("c", DataTypes.BIGINT()) \
    .build()
table_env.create_temporary_table("CsvSinkTable", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/path/to/file")
.format(FormatDescriptor.for_format("csv")
.option("field-delimiter", "|")
.build())
.build())
# compute a result Table using Table API operators and/or SQL queries
result = ...
# emit the result Table to the registered TableSink
result.execute_insert("CsvSinkTable")
Translate and Execute a Query
Table API and SQL queries are translated into DataStream programs whether their input is streaming or batch. A query is internally represented as a logical query plan and is translated in two phases:
- Optimization of the logical plan,
- Translation into a DataStream program.
A Table API or SQL query is translated when:
- TableEnvironment.executeSql() is called. This method is used for executing a given statement, and the SQL query is translated immediately once this method is called.
- TablePipeline.execute() is called. This method is used for executing a source-to-sink pipeline, and the Table API program is translated immediately once this method is called.
- Table.execute() is called. This method is used for collecting the table content to the local client, and the Table API program is translated immediately once this method is called.
- StatementSet.execute() is called. A TablePipeline (emitted to a sink through StatementSet.add()) or an INSERT statement (specified through StatementSet.addInsertSql()) is buffered in the StatementSet first. They are translated once StatementSet.execute() is called, and all sinks are optimized into one DAG (see the sketch after this list).
- A Table is translated when it is converted into a DataStream (see Integration with DataStream). Once translated, it is a regular DataStream program and is executed when StreamExecutionEnvironment.execute() is called.
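For the StatementSet case above, here is a minimal Java sketch; the source and sink table names are illustrative and assumed to be registered already:
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
StatementSet stmtSet = tableEnv.createStatementSet();
// buffer a Table API pipeline; nothing is translated yet
stmtSet.add(tableEnv.from("Orders").insertInto("OrdersSink"));
// buffer an INSERT statement; still nothing is translated
stmtSet.addInsertSql("INSERT INTO AuditSink SELECT * FROM Orders");
// now both pipelines are translated, optimized into a single DAG, and submitted as one job
TableResult result = stmtSet.execute();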
Query Optimization
Apache Flink leverages and extends Apache Calcite to perform sophisticated query optimization. This includes a series of rule and cost-based optimizations such as:
- Subquery decorrelation based on Apache Calcite
- Project pruning
- Partition pruning
- Filter push-down
- Sub-plan deduplication to avoid duplicate computation
- Special subquery rewriting, including two parts:
  - Converts IN and EXISTS into left semi-joins
  - Converts NOT IN and NOT EXISTS into left anti-joins
- Optional join reordering
  - Enabled via table.optimizer.join-reorder-enabled
Note: IN/EXISTS/NOT IN/NOT EXISTS are currently only supported in conjunctive conditions in subquery rewriting.
The optimizer makes intelligent decisions based not only on the plan but also on rich statistics available from the data sources and fine-grained costs for each operator, such as IO, CPU, network, and memory.
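For example, the optional join reordering listed above can be switched on through the table configuration. A small Java sketch, assuming a TableConfig version that accepts string keys via set(String, String):
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
// enable the cost-based join reordering (disabled by default)
tableEnv.getConfig().set("table.optimizer.join-reorder-enabled", "true");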
Advanced users may provide custom optimizations via a CalciteConfig object, which can be passed to the table environment by calling TableEnvironment#getConfig#setPlannerConfig.
Explaining a Table
The Table API provides a mechanism to explain the logical and optimized query plans used to compute a Table. This is done through the Table.explain() method or the StatementSet.explain() method. Table.explain() returns the plan of a single Table, while StatementSet.explain() returns the plan of multiple sinks. Both return a String describing three plans:
- the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
- the optimized logical query plan, and
- the physical execution plan.
TableEnvironment.explainSql() and TableEnvironment.executeSql() support executing an EXPLAIN statement to get the plans. Please refer to the EXPLAIN page.
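A minimal Java sketch of TableEnvironment.explainSql(); the query and table are illustrative:
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// returns the AST, the optimized logical plan, and the physical execution plan as a String
String plan = tableEnv.explainSql(
    "SELECT cID, cName, SUM(revenue) AS revSum FROM Orders GROUP BY cID, cName");
System.out.println(plan);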
The following code shows an example and the corresponding output for a given Table using the Table.explain() method:
Java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
// explain Table API
Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
Table table = table1
.where($("word").like("F%"))
.unionAll(table2);
System.out.println(table.explain());
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val table1 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
val table2 = env.fromElements((1, "hello")).toTable(tEnv, $"count", $"word")
val table = table1
.where($"word".like("F%"))
.unionAll(table2)
println(table.explain())
Python
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
table1 = t_env.from_elements([(1, "hello")], ["count", "word"])
table2 = t_env.from_elements([(1, "hello")], ["count", "word"])
table = table1 \
.where(col('word').like('F%')) \
.union_all(table2)
print(table.explain())
The result of the above example is
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[Unregistered_DataStream_1]])
+- LogicalTableScan(table=[[Unregistered_DataStream_2]])
== Optimized Physical Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])
== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])
The following code shows an example and the corresponding output for a multi-sink plan using the StatementSet.explain() method:
Java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
final Schema schema = Schema.newBuilder()
.column("count", DataTypes.INT())
.column("word", DataTypes.STRING())
.build();
tEnv.createTemporaryTable("MySource1", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/source/path1")
.format("csv")
.build());
tEnv.createTemporaryTable("MySource2", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/source/path2")
.format("csv")
.build());
tEnv.createTemporaryTable("MySink1", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/sink/path1")
.format("csv")
.build());
tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/sink/path2")
.format("csv")
.build());
StatementSet stmtSet = tEnv.createStatementSet();
Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
stmtSet.add(table1.insertInto("MySink1"));
Table table2 = table1.unionAll(tEnv.from("MySource2"));
stmtSet.add(table2.insertInto("MySink2"));
String explanation = stmtSet.explain();
System.out.println(explanation);
Scala
val settings = EnvironmentSettings.inStreamingMode()
val tEnv = TableEnvironment.create(settings)
val schema = Schema.newBuilder()
.column("count", DataTypes.INT())
.column("word", DataTypes.STRING())
.build()
tEnv.createTemporaryTable("MySource1", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/source/path1")
.format("csv")
.build())
tEnv.createTemporaryTable("MySource2", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/source/path2")
.format("csv")
.build())
tEnv.createTemporaryTable("MySink1", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/sink/path1")
.format("csv")
.build())
tEnv.createTemporaryTable("MySink2", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/sink/path2")
.format("csv")
.build())
val stmtSet = tEnv.createStatementSet()
val table1 = tEnv.from("MySource1").where($"word".like("F%"))
stmtSet.add(table1.insertInto("MySink1"))
val table2 = table1.unionAll(tEnv.from("MySource2"))
stmtSet.add(table2.insertInto("MySink2"))
val explanation = stmtSet.explain()
println(explanation)
Python
settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(environment_settings=settings)
schema = Schema.new_builder() \
    .column("count", DataTypes.INT()) \
    .column("word", DataTypes.STRING()) \
    .build()
t_env.create_temporary_table("MySource1", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/source/path1")
.format("csv")
.build())
t_env.create_temporary_table("MySource2", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/source/path2")
.format("csv")
.build())
t_env.create_temporary_table("MySink1", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/sink/path1")
.format("csv")
.build())
t_env.create_temporary_table("MySink2", TableDescriptor.for_connector("filesystem")
.schema(schema)
.option("path", "/sink/path2")
.format("csv")
.build())
stmt_set = t_env.create_statement_set()
table1 = t_env.from_path("MySource1").where(col('word').like('F%'))
stmt_set.add_insert("MySink1", table1)
table2 = table1.union_all(t_env.from_path("MySource2"))
stmt_set.add_insert("MySink2", table2)
explanation = stmt_set.explain()
print(explanation)
The result of the multi-sink plan is:
== Abstract Syntax Tree ==
LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[count, word])
+- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
LogicalLegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[count, word])
+- LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])
== Optimized Physical Plan ==
LegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[count, word])
+- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
LegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[count, word])
+- Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
== Optimized Execution Plan ==
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])(reuse_id=[1])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
LegacySink(name=[`default_catalog`.`default_database`.`MySink1`], fields=[count, word])
+- Reused(reference_id=[1])
LegacySink(name=[`default_catalog`.`default_database`.`MySink2`], fields=[count, word])
+- Union(all=[true], union=[count, word])
:- Reused(reference_id=[1])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])