Python API Tutorial
In this guide we will start from scratch and go from setting up a Flink Python project to running a Python Table API program.
Setting up a Python Project
Firstly, fire up your favorite IDE and create a Python project, and then install the PyFlink package. Please see Build PyFlink for more details about this.
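Depending on your Flink version, PyFlink may also be installable from PyPI rather than built from source (an assumption; follow the linked build guide if the package is not published for your version):

```shell
# May not apply to all Flink versions; see "Build PyFlink" if this fails.
python -m pip install apache-flink
```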
Writing a Flink Python Table API Program
The first step in a Flink Python Table API program is to create a BatchTableEnvironment (or a StreamTableEnvironment if you are writing a streaming job). It is the main entry point for Python Table API jobs.
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)
The ExecutionEnvironment (or StreamExecutionEnvironment if you are writing a streaming job) can be used to set execution parameters, such as the restart strategy, default parallelism, etc.
The TableConfig can be used to set configuration parameters, such as the built-in catalog name, the threshold at which code generation is triggered, etc.
Next we will create a source table and a sink table.
t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .line_delimiter(' ')
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .register_table_source('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .register_table_sink('mySink')
This registers a table named mySource and a table named mySink in the TableEnvironment. The table mySource has only one column, word, and represents the words read from the file /tmp/input. The table mySink has two columns, word and count, and writes data to the file /tmp/output, with \t as the field delimiter.
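To try the job end to end, you can seed the input file before running it. A short sketch using only the Python standard library (the sample words are made up, and a temp path is used here for illustration; the job above reads from /tmp/input):

```python
import os
import tempfile

# Hypothetical sample content: the source format splits words on ' '.
sample = "flink pyflink flink"

# Written to a temp directory here for illustration; the job reads /tmp/input.
input_path = os.path.join(tempfile.mkdtemp(), "input")
with open(input_path, "w") as f:
    f.write(sample)

# With this input, the sink would receive tab-delimited rows such as
#   flink\t2
#   pyflink\t1
with open(input_path) as f:
    print(f.read())  # flink pyflink flink
```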
Then we need to create a job which reads input from table mySource, performs some operations, and writes the results to table mySink.
t_env.scan('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')
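Conceptually, the group_by/select step computes a per-word count. A pure-Python sketch of the same aggregation, using only the standard library (not PyFlink) and made-up sample rows:

```python
from collections import Counter

# Hypothetical input rows; in the job above these come from the mySource table.
words = ["flink", "flink", "python", "flink"]

# Equivalent of .group_by('word').select('word, count(1)')
counts = Counter(words)

print(sorted(counts.items()))  # [('flink', 3), ('python', 1)]
```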
The last thing is to start the actual Flink Python Table API job. All operations, such as creating sources, transformations and sinks, only build up a graph of internal operations. Only when t_env.execute(job_name) is called will this graph of operations be submitted to a cluster or executed on your local machine.
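This deferred-execution model can be illustrated with a small, PyFlink-free analogy (the LazyPipeline name and its methods are made up for illustration): operations are only recorded until an explicit execute call.

```python
# Toy analogy of lazy graph building -- not PyFlink API.
class LazyPipeline:
    def __init__(self):
        self._ops = []          # operations are recorded, not executed

    def add(self, fn):
        self._ops.append(fn)    # builds the "graph" only
        return self

    def execute(self, data):
        # Only now do the recorded operations actually run.
        for fn in self._ops:
            data = fn(data)
        return data

p = LazyPipeline().add(lambda xs: [x.lower() for x in xs]).add(sorted)
print(p.execute(["B", "a", "C"]))  # ['a', 'b', 'c']
```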
t_env.execute("tutorial_job")
The complete code so far is as follows:
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

# Set up the batch execution environment and the TableEnvironment.
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

# Register the source table: one column 'word', read from /tmp/input.
t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .line_delimiter(' ')
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .register_table_source('mySource')

# Register the sink table: columns 'word' and 'count', written to /tmp/output.
t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .register_table_sink('mySink')

# Count the occurrences of each word and write the result to the sink.
t_env.scan('mySource') \
    .group_by('word') \
    .select('word, count(1)') \
    .insert_into('mySink')

# Submit the job for execution.
t_env.execute("tutorial_job")
Executing a Flink Python Table API Program
You can run this example in your IDE or on the command line (assuming the job script file is WordCount.py):
$ python WordCount.py
The command builds and runs the Python Table API program in a local mini-cluster. You can also submit the Python Table API program to a remote cluster; see the Job Submission Examples for more details.
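As a rough sketch, recent Flink distributions let you submit a Python Table API job through the flink CLI (this assumes a running cluster and a Flink distribution on the PATH; the exact flags depend on your version, so check the linked examples):

```shell
# Submit the script to a cluster via the Flink CLI (version-dependent).
flink run -py WordCount.py
```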
This should get you started with writing your own Flink Python Table API programs. To learn more about the Python Table API, refer to the Flink Python Table API Docs for more details.