TableEnvironment
This document is an introduction of PyFlink TableEnvironment
. It includes detailed descriptions of every public interface of the TableEnvironment
class.
Create a TableEnvironment
The recommended way to create a TableEnvironment
is to create from an EnvironmentSettings
object:
from pyflink.table import EnvironmentSettings, TableEnvironment
# create a streaming TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
# or a batch TableEnvironment
# env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = TableEnvironment.create(env_settings)
Alternatively, users can create a StreamTableEnvironment
from an existing StreamExecutionEnvironment
to interoperate with the DataStream API.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, BatchTableEnvironment, TableConfig
# create a blink streaming TableEnvironment from a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
TableEnvironment API
Table/SQL Operations
These APIs are used to create/remove Table API/SQL Tables and write queries:
APIs | Description | Docs |
---|---|---|
from_elements(elements, schema=None, verify_schema=True) | Creates a table from a collection of elements. | link |
from_pandas(pdf, schema=None, split_num=1) | Creates a table from a pandas DataFrame. | link |
from_path(path) | Creates a table from a registered table under the specified path, e.g. tables registered via create_temporary_view. | link |
sql_query(query) | Evaluates a SQL query and retrieves the result as a Table object. | link |
create_temporary_view(view_path, table) | Registers a Table object as a temporary view similar to SQL temporary views. | link |
drop_temporary_view(view_path) | Drops a temporary view registered under the given path. | link |
drop_temporary_table(table_path) | Drops a temporary table registered under the given path. You can use this interface to drop the temporary source table and temporary sink table. | link |
execute_sql(stmt) | Executes the given single statement and returns the execution result. The statement can be DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE. Note that for “INSERT INTO” statement this is an asynchronous operation, which is usually expected when submitting a job to a remote cluster. However, when executing a job in a mini cluster or IDE, you need to wait until the job execution finished, then you can refer to here for more details. Please refer the SQL documentation for more details about SQL statement. | link |
Deprecated APIs
APIs | Description | Docs |
---|---|---|
from_table_source(table_source) | Creates a table from a table source. | link |
scan(*table_path) | Scans a registered table from catalog and returns the resulting Table. It can be replaced by from_path. | link |
register_table(name, table) | Registers a Table object under a unique name in the TableEnvironment’s catalog. Registered tables can be referenced in SQL queries. It can be replaced by create_temporary_view. | link |
register_table_source(name, table_source) | Registers an external TableSource in the TableEnvironment’s catalog. | link |
register_table_sink(name, table_sink) | Registers an external TableSink in the TableEnvironment’s catalog. | link |
insert_into(target_path, table) | Instructs to write the content of a Table object into a sink table. Note that this interface would not trigger the execution of jobs. You need to call the “execute” method to execute your job. | link |
sql_update(stmt) | Evaluates a SQL statement such as INSERT, UPDATE or DELETE or a DDL statement. It can be replaced by execute_sql. | link |
connect(connector_descriptor) | Creates a temporary table from a descriptor. Currently the recommended way is using execute_sql to register temporary tables. | link |
Execute/Explain Jobs
These APIs are used to explain/execute jobs. Note that the API execute_sql
can also be used to execute jobs.
APIs | Description | Docs |
---|---|---|
explain_sql(stmt, *extra_details) | Returns the AST and the execution plan of the specified statement. | link |
create_statement_set() | Creates a StatementSet instance which accepts DML statements or Tables. It can be used to execute a multi-sink job. | link |
Deprecated APIs
APIs | Description | Docs |
---|---|---|
explain(table=None, extended=False) | Returns the AST of the specified Table API and SQL queries and the execution plan to compute the result of the given Table object or multi-sinks plan. If you use the insert_into or sql_update method to emit data to multiple sinks, you can use this method to get the plan. It can be replaced by TableEnvironment.explain_sql, Table.explain or StatementSet.explain. | link |
execute(job_name) | Triggers the program execution. The environment will execute all parts of the program. If you use the insert_into or sql_update method to emit data to sinks, you can use this method trigger the program execution. This method will block the client program until the job is finished/canceled/failed. | link |
Create/Drop User Defined Functions
These APIs are used to register UDFs or remove the registered UDFs. Note that the API execute_sql
can also be used to register/remove UDFs. For more details about the different kinds of UDFs, please refer to User Defined Functions.
APIs | Description | Docs |
---|---|---|
create_temporary_function(path, function) | Registers a Python user defined function class as a temporary catalog function. | link |
create_temporary_system_function(name, function) | Registers a Python user defined function class as a temporary system function. If the name of a temporary system function is the same as a temporary catalog function, the temporary system function takes precedence. | link |
create_java_function(path, function_class_name, ignore_if_exists=None) | Registers a Java user defined function class as a catalog function under the given path. If the catalog is persistent, the registered catalog function can be used across multiple Flink sessions and clusters. | link |
create_java_temporary_function(path, function_class_name) | Registers a Java user defined function class as a temporary catalog function. | link |
create_java_temporary_system_function(name, function_class_name) | Registers a Java user defined function class as a temporary system function. | link |
drop_function(path) | Drops a catalog function registered under the given path. | link |
drop_temporary_function(path) | Drops a temporary system function registered under the given name. | link |
drop_temporary_system_function(name) | Drops a temporary system function registered under the given name. | link |
Deprecated APIs
APIs | Description | Docs |
---|---|---|
register_function(name, function) | Registers a Python user-defined function under a unique name. Replaces already existing user-defined function under this name. It can be replaced by create_temporary_system_function. | link |
register_java_function(name, function_class_name) | Registers a Java user defined function under a unique name. Replaces already existing user-defined functions under this name. It can be replaced by create_java_temporary_system_function. | link |
Dependency Management
These APIs are used to manage the Python dependencies which are required by the Python UDFs. Please refer to the Dependency Management documentation for more details.
APIs | Description | Docs |
---|---|---|
add_python_file(file_path) | Adds a Python dependency which could be Python files, Python packages or local directories. They will be added to the PYTHONPATH of the Python UDF worker. | link |
set_python_requirements(requirements_file_path, requirements_cache_dir=None) | Specifies a requirements.txt file which defines the third-party dependencies. These dependencies will be installed to a temporary directory and added to the PYTHONPATH of the Python UDF worker. | link |
add_python_archive(archive_path, target_dir=None) | Adds a Python archive file. The file will be extracted to the working directory of Python UDF worker. | link |
Configuration
APIs | Description | Docs |
---|---|---|
get_config() | Returns the table config to define the runtime behavior of the Table API. You can find all the available configuration options in Configuration and Python Configuation. The following code is an example showing how to set the configuration options through this API: python # set the parallelism to 8 table_env.get_config().get_configuration().set_string( "parallelism.default", "8") | link |
Catalog APIs
These APIs are used to access catalogs and modules. You can find more detailed introduction in Modules and Catalogs documentation.
APIs | Description | Docs |
---|---|---|
register_catalog(catalog_name, catalog) | Registers a Catalog under a unique name. | link |
get_catalog(catalog_name) | Gets a registered Catalog by name. | link |
use_catalog(catalog_name) | Sets the current catalog to the given value. It also sets the default database to the catalog’s default one. | link |
get_current_catalog() | Gets the current default catalog name of the current session. | link |
get_current_database() | Gets the current default database name of the running session. | link |
use_database(database_name) | Sets the current default database. It has to exist in the current catalog. That path will be used as the default one when looking for unqualified object names. | link |
load_module(module_name, module) | Loads a Module under a unique name. Modules will be kept in the loaded order. | link |
unload_module(module_name) | Unloads a Module with given name. | link |
use_modules(*module_names) | Enables and changes the resolution order of loaded modules. | link |
list_catalogs() | Gets the names of all catalogs registered in this environment. | link |
list_modules() | Gets the names of all enabled modules registered in this environment. | link |
list_full_modules() | Gets the names of all loaded modules (including disabled modules) registered in this environment. | link |
list_databases() | Gets the names of all databases in the current catalog. | link |
list_tables() | Gets the names of all tables and views in the current database of the current catalog. It returns both temporary and permanent tables and views. | link |
list_views() | Gets the names of all views in the current database of the current catalog. It returns both temporary and permanent views. | link |
list_user_defined_functions() | Gets the names of all user defined functions registered in this environment. | link |
list_functions() | Gets the names of all functions in this environment. | link |
list_temporary_tables() | Gets the names of all temporary tables and views available in the current namespace (the current database of the current catalog). | link |
list_temporary_views() | Gets the names of all temporary views available in the current namespace (the current database of the current catalog). | link |
Statebackend, Checkpoint and Restart Strategy
Before Flink 1.10 you can configure the statebackend, checkpointing and restart strategy via the StreamExecutionEnvironment
. And now you can configure them by setting key-value options in TableConfig
, see Fault Tolerance, State Backends and Checkpointing for more details.
The following code is an example showing how to configure the statebackend, checkpoint and restart strategy through the Table API:
# set the restart strategy to "fixed-delay"
table_env.get_config().get_configuration().set_string("restart-strategy", "fixed-delay")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.attempts", "3")
table_env.get_config().get_configuration().set_string("restart-strategy.fixed-delay.delay", "30s")
# set the checkpoint mode to EXACTLY_ONCE
table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "3min")
# set the statebackend type to "rocksdb", other available options are "filesystem" and "jobmanager"
# you can also set the full qualified Java class name of the StateBackendFactory to this option
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
table_env.get_config().get_configuration().set_string("state.backend", "rocksdb")
# set the checkpoint directory, which is required by the RocksDB statebackend
table_env.get_config().get_configuration().set_string("state.checkpoints.dir", "file:///tmp/checkpoints/")