Intro to the Python Table API

This document is a short introduction to the PyFlink Table API, intended to help new users quickly understand its basic usage. For advanced usage, please refer to the other documents in this user guide.

Common Structure of Python Table API Program

All Table API and SQL programs, both batch and streaming, follow the same pattern. The following code example shows the common structure of Table API and SQL programs.

    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col

    # 1. create a TableEnvironment
    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)

    # 2. create source Table
    table_env.execute_sql("""
        CREATE TABLE datagen (
            id INT,
            data STRING
        ) WITH (
            'connector' = 'datagen',
            'fields.id.kind' = 'sequence',
            'fields.id.start' = '1',
            'fields.id.end' = '10'
        )
    """)

    # 3. create sink Table
    table_env.execute_sql("""
        CREATE TABLE print (
            id INT,
            data STRING
        ) WITH (
            'connector' = 'print'
        )
    """)

    # 4. query from source table and perform calculations
    # create a Table from a Table API query:
    source_table = table_env.from_path("datagen")
    # or create a Table from a SQL query:
    # source_table = table_env.sql_query("SELECT * FROM datagen")

    result_table = source_table.select(col("id") + 1, col("data"))

    # 5. emit query result to sink table
    # emit a Table API result Table to a sink table:
    result_table.execute_insert("print").wait()
    # or emit results via SQL query:
    # table_env.execute_sql("INSERT INTO print SELECT * FROM datagen").wait()

Create a TableEnvironment

TableEnvironment is a central concept of the Table API and SQL integration. The following code example shows how to create a TableEnvironment:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    # create a streaming TableEnvironment
    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)

    # or create a batch TableEnvironment
    env_settings = EnvironmentSettings.in_batch_mode()
    table_env = TableEnvironment.create(env_settings)

For more details about the different ways to create a TableEnvironment, please refer to the TableEnvironment Documentation.

TableEnvironment is responsible for table management (creating and listing tables), user-defined function management, executing SQL queries, job configuration, managing Python dependencies, and submitting jobs for execution.
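As a rough illustration of a few of these responsibilities, the sketch below sets a configuration option, executes a SQL statement, and lists the tables known to the current catalog. It assumes a recent PyFlink version in which TableConfig.set is available:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # job configuration, e.g. the default parallelism
    table_env.get_config().set("parallelism.default", "1")

    # executing SQL statements and inspecting the catalog
    table_env.execute_sql("CREATE TEMPORARY VIEW tmp AS SELECT 1 AS id, 'Hi' AS data")
    print(table_env.list_tables())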

Create Tables

Table is a core component of the Python Table API. A Table object describes a pipeline of data transformations. It does not contain the data itself in any way. Instead, it describes how to read data from a table source, and how to eventually write data to a table sink. The declared pipeline can be printed, optimized, and eventually executed in a cluster. The pipeline can work with bounded or unbounded streams which enables both streaming and batch scenarios.

A Table is always bound to a specific TableEnvironment. It is not possible to combine tables from different TableEnvironments in the same query, e.g., to join or union them.

Create using a List Object

You can create a Table from a list object. This is usually done when writing examples or unit tests.

    from pyflink.table import EnvironmentSettings, TableEnvironment

    # create a batch TableEnvironment
    env_settings = EnvironmentSettings.in_batch_mode()
    table_env = TableEnvironment.create(env_settings)

    table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
    table.execute().print()

The results are as follows:

    +----------------------+--------------------------------+
    |                   _1 |                             _2 |
    +----------------------+--------------------------------+
    |                    1 |                             Hi |
    |                    2 |                          Hello |
    +----------------------+--------------------------------+

You can also create a Table with specified column names:

    table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
    table.execute().print()

The results are as follows:

    +----------------------+--------------------------------+
    |                   id |                           data |
    +----------------------+--------------------------------+
    |                    1 |                             Hi |
    |                    2 |                          Hello |
    +----------------------+--------------------------------+

By default, the table schema is extracted from the data automatically. If the automatically generated table schema isn’t as expected, you can also specify it manually:

    table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

    # by default, the type of the "id" column is BIGINT
    print('By default the type of the "id" column is %s.' % table.get_schema().get_field_data_type("id"))

    from pyflink.table import DataTypes

    table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
                                    DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()),
                                                   DataTypes.FIELD("data", DataTypes.STRING())]))

    # now the type of the "id" column is set as TINYINT
    print('Now the type of the "id" column is %s.' % table.get_schema().get_field_data_type("id"))

The results are as follows:

    By default the type of the "id" column is BIGINT.
    Now the type of the "id" column is TINYINT.

Create using DDL statements

You can also create a Table using SQL DDL statements. It represents a Table which reads data from the specified external storage.

    from pyflink.table import EnvironmentSettings, TableEnvironment

    # create a stream TableEnvironment
    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)

    table_env.execute_sql("""
        CREATE TABLE random_source (
            id BIGINT,
            data TINYINT
        ) WITH (
            'connector' = 'datagen',
            'fields.id.kind' = 'sequence',
            'fields.id.start' = '1',
            'fields.id.end' = '3',
            'fields.data.kind' = 'sequence',
            'fields.data.start' = '4',
            'fields.data.end' = '6'
        )
    """)

    table = table_env.from_path("random_source")
    table.execute().print()

The results are as follows:

    +----+----------------------+--------+
    | op |                   id |   data |
    +----+----------------------+--------+
    | +I |                    1 |      4 |
    | +I |                    2 |      5 |
    | +I |                    3 |      6 |
    +----+----------------------+--------+

Create using TableDescriptor

TableDescriptor is another way to define a Table. It’s equivalent to SQL DDL statements.

    from pyflink.table import EnvironmentSettings, TableEnvironment, TableDescriptor, Schema, DataTypes

    # create a stream TableEnvironment
    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)

    table_env.create_temporary_table(
        'random_source',
        TableDescriptor.for_connector('datagen')
            .schema(Schema.new_builder()
                    .column('id', DataTypes.BIGINT())
                    .column('data', DataTypes.TINYINT())
                    .build())
            .option('fields.id.kind', 'sequence')
            .option('fields.id.start', '1')
            .option('fields.id.end', '3')
            .option('fields.data.kind', 'sequence')
            .option('fields.data.start', '4')
            .option('fields.data.end', '6')
            .build())

    table = table_env.from_path("random_source")
    table.execute().print()

The results are as follows:

    +----+----------------------+--------+
    | op |                   id |   data |
    +----+----------------------+--------+
    | +I |                    1 |      4 |
    | +I |                    2 |      5 |
    | +I |                    3 |      6 |
    +----+----------------------+--------+

Create using a Catalog

TableEnvironment maintains a map of catalogs of tables which are created with an identifier.

The tables in a catalog may either be temporary, and tied to the lifecycle of a single Flink session, or permanent, and visible across multiple Flink sessions.

The tables and views created via SQL DDL, e.g. “create table …” and “create view …”, are also stored in a catalog.

You can directly access the tables in a catalog via SQL.

If you want to use tables from a catalog with the Table API, you can use the “from_path” method to create the Table API objects:

    # prepare the catalog
    # register Table API tables in the catalog
    table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
    table_env.create_temporary_view('source_table', table)

    # create Table API table from catalog
    new_table = table_env.from_path('source_table')
    new_table.execute().print()

The results are as follows:

    +----+----------------------+--------------------------------+
    | op |                   id |                           data |
    +----+----------------------+--------------------------------+
    | +I |                    1 |                             Hi |
    | +I |                    2 |                          Hello |
    +----+----------------------+--------------------------------+
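Since the view is registered in the catalog, it can also be queried directly with SQL, as mentioned above; a minimal sketch that reuses the source_table view from this example:

    # query the registered view with SQL and print the result
    table_env.sql_query("SELECT * FROM source_table").execute().print()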

Write Queries

Write Table API Queries

The Table object offers many methods for applying relational operations. These methods return new Table objects representing the result of applying the relational operations on the input Table. These relational operations may be composed of multiple method calls, such as table.group_by(...).select(...).

The Table API documentation describes all Table API operations that are supported on streaming and batch tables.

The following example shows a simple Table API aggregation query:

    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col

    # use a batch TableEnvironment to execute the queries
    env_settings = EnvironmentSettings.in_batch_mode()
    table_env = TableEnvironment.create(env_settings)

    orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],
                                     ['name', 'country', 'revenue'])

    # compute revenue for all customers from France
    revenue = orders \
        .select(col("name"), col("country"), col("revenue")) \
        .where(col("country") == 'FRANCE') \
        .group_by(col("name")) \
        .select(col("name"), col("revenue").sum.alias('rev_sum'))

    revenue.execute().print()

The results are as follows:

    +--------------------------------+----------------------+
    |                           name |              rev_sum |
    +--------------------------------+----------------------+
    |                           Jack |                   30 |
    +--------------------------------+----------------------+

Row-based operations are also supported in the Python Table API, including the Map, FlatMap, Aggregate and FlatAggregate operations.

The following example shows a simple row-based operation query:

    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf
    import pandas as pd

    # using batch table environment to execute the queries
    env_settings = EnvironmentSettings.in_batch_mode()
    table_env = TableEnvironment.create(env_settings)

    orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],
                                     ['name', 'country', 'revenue'])

    map_function = udf(lambda x: pd.concat([x.name, x.revenue * 10], axis=1),
                       result_type=DataTypes.ROW(
                           [DataTypes.FIELD("name", DataTypes.STRING()),
                            DataTypes.FIELD("revenue", DataTypes.BIGINT())]),
                       func_type="pandas")

    orders.map(map_function).execute().print()

The results are as follows:

    +--------------------------------+----------------------+
    |                           name |              revenue |
    +--------------------------------+----------------------+
    |                           Jack |                  100 |
    |                           Rose |                  300 |
    |                           Jack |                  200 |
    +--------------------------------+----------------------+
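A FlatMap operation works similarly but uses a user-defined table function and may emit zero or more rows per input row. The following is only a rough sketch on the same orders table; the explode_revenue function is made up purely for illustration:

    from pyflink.common import Row
    from pyflink.table import DataTypes
    from pyflink.table.udf import udtf

    # hypothetical table function: emit one row per 10 units of revenue
    @udtf(result_types=[DataTypes.STRING(), DataTypes.BIGINT()])
    def explode_revenue(order: Row):
        for _ in range(order.revenue // 10):
            yield order.name, 10

    orders.flat_map(explode_revenue).execute().print()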

Write SQL Queries

Flink’s SQL integration is based on Apache Calcite, which implements the SQL standard. SQL queries are specified as regular strings.

The SQL documentation describes Flink’s SQL support for streaming and batch tables.

The following example shows a simple SQL aggregation query:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    # use a stream TableEnvironment to execute the queries
    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)

    table_env.execute_sql("""
        CREATE TABLE random_source (
            id BIGINT,
            data TINYINT
        ) WITH (
            'connector' = 'datagen',
            'fields.id.kind' = 'sequence',
            'fields.id.start' = '1',
            'fields.id.end' = '8',
            'fields.data.kind' = 'sequence',
            'fields.data.start' = '4',
            'fields.data.end' = '11'
        )
    """)

    table_env.execute_sql("""
        CREATE TABLE print_sink (
            id BIGINT,
            data_sum TINYINT
        ) WITH (
            'connector' = 'print'
        )
    """)

    table_env.execute_sql("""
        INSERT INTO print_sink
            SELECT id, sum(data) as data_sum FROM
                (SELECT id / 2 as id, data FROM random_source)
            WHERE id > 1
            GROUP BY id
    """).wait()

The results are as follows:

    2> +I(4,11)
    6> +I(2,8)
    8> +I(3,10)
    6> -U(2,8)
    8> -U(3,10)
    6> +U(2,15)
    8> +U(3,19)

This output shows the change logs received by the print sink. The format of a change log entry is:

    {subtask id}> {message type}{string format of the value}

For example, “2> +I(4,11)” means this message comes from the 2nd subtask, and “+I” means it is an insert message. “(4, 11)” is the content of the message. In addition, “-U” means a retract record (i.e. update-before), which means this message should be deleted or retracted from the sink. “+U” means this is an update record (i.e. update-after), which means this message should be updated or inserted by the sink.

So, we get this result from the change logs above:

    (4, 11)
    (2, 15)
    (3, 19)

Mix the Table API and SQL

The Table objects used in Table API and the tables used in SQL can be freely converted to each other.

The following example shows how to use a Table object in SQL:

    # create a sink table to emit results
    table_env.execute_sql("""
        CREATE TABLE table_sink (
            id BIGINT,
            data VARCHAR
        ) WITH (
            'connector' = 'print'
        )
    """)

    # convert the Table API table to a SQL view
    table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
    table_env.create_temporary_view('table_api_table', table)

    # emit the Table API table
    table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()

The results are as follows:

    6> +I(1,Hi)
    6> +I(2,Hello)

And the following example shows how to use SQL tables in the Table API:

    # create a sql source table
    table_env.execute_sql("""
        CREATE TABLE sql_source (
            id BIGINT,
            data TINYINT
        ) WITH (
            'connector' = 'datagen',
            'fields.id.kind' = 'sequence',
            'fields.id.start' = '1',
            'fields.id.end' = '4',
            'fields.data.kind' = 'sequence',
            'fields.data.start' = '4',
            'fields.data.end' = '7'
        )
    """)

    # convert the sql table to Table API table
    table = table_env.from_path("sql_source")
    # or create the table from a sql query
    # table = table_env.sql_query("SELECT * FROM sql_source")

    # emit the table
    table.execute().print()

The results are as follows:

    +----+----------------------+--------+
    | op |                   id |   data |
    +----+----------------------+--------+
    | +I |                    1 |      4 |
    | +I |                    2 |      5 |
    | +I |                    3 |      6 |
    | +I |                    4 |      7 |
    +----+----------------------+--------+

Emit Results

Print the Table

You can call the TableResult.print method to print the content of the Table to the console. This is usually used when you want to preview the table.

    # prepare source tables
    source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])

    # Get TableResult
    table_result = table_env.execute_sql("select a + 1, b, c from %s" % source)

    # Print the table
    table_result.print()

The results are as follows:

    +----+----------------------+--------------------------------+--------------------------------+
    | op |               EXPR$0 |                              b |                              c |
    +----+----------------------+--------------------------------+--------------------------------+
    | +I |                    2 |                             Hi |                          Hello |
    | +I |                    3 |                          Hello |                          Hello |
    +----+----------------------+--------------------------------+--------------------------------+

Note: this triggers the materialization of the table and collects its content into the memory of the client. It is good practice to limit the number of rows collected via Table.limit.
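For example, a minimal sketch that previews only a single row of the source table defined above instead of materializing everything:

    # materialize at most one row on the client side
    source.limit(1).execute().print()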

Collect Results to Client

You can call the TableResult.collect method to collect the results of a table to the client. The result is an auto-closeable iterator.

The following code shows how to use the TableResult.collect() method:

    # prepare source tables
    source = table_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])

    # Get TableResult
    table_result = table_env.execute_sql("select a + 1, b, c from %s" % source)

    # Traverse the result
    with table_result.collect() as results:
        for result in results:
            print(result)

The results are as follows:

    <Row(2, 'Hi', 'Hello')>
    <Row(3, 'Hello', 'Hello')>

Note: this triggers the materialization of the table and collects its content into the memory of the client. It is good practice to limit the number of rows collected via Table.limit.

Collect Results to Client by Converting to a pandas DataFrame

You can call the “to_pandas” method to convert a Table object to a pandas DataFrame:

    table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
    print(table.to_pandas())

The results are as follows:

       id   data
    0   1     Hi
    1   2  Hello

Note: this triggers the materialization of the table and collects its content into the memory of the client. It is good practice to limit the number of rows collected via Table.limit.

Note: not all data types are supported.

Emit Results to One Sink Table

You can call the “execute_insert” method to emit the data in a Table object to a sink table:

    table_env.execute_sql("""
        CREATE TABLE sink_table (
            id BIGINT,
            data VARCHAR
        ) WITH (
            'connector' = 'print'
        )
    """)

    table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
    table.execute_insert("sink_table").wait()

The results are as follows:

    6> +I(1,Hi)
    6> +I(2,Hello)

This could also be done using SQL:

    table_env.create_temporary_view("table_source", table)
    table_env.execute_sql("INSERT INTO sink_table SELECT * FROM table_source").wait()

Emit Results to Multiple Sink Tables

You can use a StatementSet to emit the Tables to multiple sink tables in one job:

    # prepare source tables and sink tables
    table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
    table_env.create_temporary_view("simple_source", table)

    table_env.execute_sql("""
        CREATE TABLE first_sink_table (
            id BIGINT,
            data VARCHAR
        ) WITH (
            'connector' = 'print'
        )
    """)

    table_env.execute_sql("""
        CREATE TABLE second_sink_table (
            id BIGINT,
            data VARCHAR
        ) WITH (
            'connector' = 'print'
        )
    """)

    # create a statement set
    statement_set = table_env.create_statement_set()

    # emit the "table" object to the "first_sink_table"
    statement_set.add_insert("first_sink_table", table)

    # emit the "simple_source" to the "second_sink_table" via an INSERT SQL statement
    statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")

    # execute the statement set
    statement_set.execute().wait()

The results are as follows:

    7> +I(1,Hi)
    7> +I(1,Hi)
    7> +I(2,Hello)
    7> +I(2,Hello)

Explain Tables

The Table API provides a mechanism to explain the logical and optimized query plans used to compute a Table. This is done through the Table.explain() or StatementSet.explain() methods. Table.explain() returns the plan of a Table. StatementSet.explain() is used to get the plan for a job which contains multiple sinks. These methods return a string describing three things:

  1. the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
  2. the optimized logical query plan, and
  3. the physical execution plan.

TableEnvironment.explain_sql() and TableEnvironment.execute_sql() support executing an EXPLAIN statement to get the plans. Please refer to the EXPLAIN page for more details.
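For instance, a rough sketch using TableEnvironment.explain_sql (the table name my_table is only a placeholder for an existing catalog table):

    # print the plans of a SQL query without executing it
    print(table_env.explain_sql("SELECT id, data FROM my_table WHERE id > 1"))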

The following code shows how to use the Table.explain() method:

    # using a stream TableEnvironment
    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)

    table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
    table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
    table = table1 \
        .where(col("data").like('H%')) \
        .union_all(table2)
    print(table.explain())

The results are as follows:

    == Abstract Syntax Tree ==
    LogicalUnion(all=[true])
    :- LogicalFilter(condition=[LIKE($1, _UTF-16LE'H%')])
    :  +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_201907291, source: [PythonInputFormatTableSource(id, data)]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1709623525, source: [PythonInputFormatTableSource(id, data)]]])

    == Optimized Logical Plan ==
    Union(all=[true], union=[id, data])
    :- Calc(select=[id, data], where=[LIKE(data, _UTF-16LE'H%')])
    :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_201907291, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1709623525, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])

    == Physical Execution Plan ==
    Stage 133 : Data Source
        content : Source: PythonInputFormatTableSource(id, data)

    Stage 134 : Operator
        content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_201907291, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
        ship_strategy : FORWARD

    Stage 135 : Operator
        content : Calc(select=[id, data], where=[(data LIKE _UTF-16LE'H%')])
        ship_strategy : FORWARD

    Stage 136 : Data Source
        content : Source: PythonInputFormatTableSource(id, data)

    Stage 137 : Operator
        content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1709623525, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
        ship_strategy : FORWARD

The following code shows how to use the StatementSet.explain() method:

    # using a stream TableEnvironment
    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import col

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)

    table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
    table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

    table_env.execute_sql("""
        CREATE TABLE print_sink_table (
            id BIGINT,
            data VARCHAR
        ) WITH (
            'connector' = 'print'
        )
    """)

    table_env.execute_sql("""
        CREATE TABLE black_hole_sink_table (
            id BIGINT,
            data VARCHAR
        ) WITH (
            'connector' = 'blackhole'
        )
    """)

    statement_set = table_env.create_statement_set()
    statement_set.add_insert("print_sink_table", table1.where(col("data").like('H%')))
    statement_set.add_insert("black_hole_sink_table", table2)
    print(statement_set.explain())

The results are as follows:

    == Abstract Syntax Tree ==
    LogicalSink(table=[default_catalog.default_database.print_sink_table], fields=[id, data])
    +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'H%')])
       +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_541737614, source: [PythonInputFormatTableSource(id, data)]]])

    LogicalSink(table=[default_catalog.default_database.black_hole_sink_table], fields=[id, data])
    +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1437429083, source: [PythonInputFormatTableSource(id, data)]]])

    == Optimized Logical Plan ==
    Sink(table=[default_catalog.default_database.print_sink_table], fields=[id, data])
    +- Calc(select=[id, data], where=[LIKE(data, _UTF-16LE'H%')])
       +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_541737614, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])

    Sink(table=[default_catalog.default_database.black_hole_sink_table], fields=[id, data])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1437429083, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])

    == Physical Execution Plan ==
    Stage 139 : Data Source
        content : Source: PythonInputFormatTableSource(id, data)

    Stage 140 : Operator
        content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_541737614, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
        ship_strategy : FORWARD

    Stage 141 : Operator
        content : Calc(select=[id, data], where=[(data LIKE _UTF-16LE'H%')])
        ship_strategy : FORWARD

    Stage 143 : Data Source
        content : Source: PythonInputFormatTableSource(id, data)

    Stage 144 : Operator
        content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1437429083, source: [PythonInputFormatTableSource(id, data)]], fields=[id, data])
        ship_strategy : FORWARD

    Stage 142 : Data Sink
        content : Sink: Sink(table=[default_catalog.default_database.print_sink_table], fields=[id, data])
        ship_strategy : FORWARD

    Stage 145 : Data Sink
        content : Sink: Sink(table=[default_catalog.default_database.black_hole_sink_table], fields=[id, data])
        ship_strategy : FORWARD