Execution Mode

The Python API supports different runtime execution modes from which you can choose depending on the requirements of your use case and the characteristics of your job. The Python runtime execution mode defines how the Python user-defined functions will be executed.

Prior to release 1.15, PROCESS was the only execution mode. In PROCESS mode, Python user-defined functions are executed in separate Python processes.

Release 1.15 introduced a new execution mode called THREAD. In THREAD mode, Python user-defined functions are executed in the JVM.

NOTE: Multiple Python user-defined functions running in the same JVM are still subject to the GIL (global interpreter lock).

When can/should I use THREAD execution mode?

THREAD mode was introduced to avoid the overhead of serialization/deserialization and network communication that inter-process communication incurs in PROCESS mode. If performance is not a concern, or if the computing logic of your Python user-defined functions is itself the performance bottleneck of the job, PROCESS mode is the better choice because it provides better isolation than THREAD mode. Conversely, THREAD mode is worth considering when the communication overhead accounts for a significant share of the job's cost.
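To get a feel for the kind of overhead this refers to, the following sketch compares calling a function directly with calling it after a serialize/deserialize round trip on both the input and the result. It is only an illustration: `pickle` stands in for PyFlink's actual coders, no second process or network hop is involved, and the names `add_one`, `call_with_round_trip`, and `compare_call_overhead` are hypothetical.

```python
import pickle
import time


def add_one(value):
    """A deliberately cheap function, so per-call overhead dominates."""
    return value + 1


def call_with_round_trip(func, value):
    """Call func as if input and result had to cross a process boundary."""
    received = pickle.loads(pickle.dumps(value))  # input "sent" to the worker
    result = func(received)
    return pickle.loads(pickle.dumps(result))  # result "sent" back


def compare_call_overhead(values):
    """Return (direct_results, round_trip_results, direct_s, round_trip_s)."""
    start = time.perf_counter()
    direct = [add_one(v) for v in values]
    direct_seconds = time.perf_counter() - start

    start = time.perf_counter()
    round_trip = [call_with_round_trip(add_one, v) for v in values]
    round_trip_seconds = time.perf_counter() - start

    return direct, round_trip, direct_seconds, round_trip_seconds


if __name__ == "__main__":
    direct, round_trip, t_direct, t_round_trip = compare_call_overhead(range(100_000))
    assert direct == round_trip
    print(f"direct: {t_direct:.4f}s, with round trip: {t_round_trip:.4f}s")
```

When the function itself is expensive, the round-trip cost becomes a small fraction of the total, which is why PROCESS mode remains a reasonable choice in that situation.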

Configuring Python execution mode

The execution mode can be configured via the python.execution-mode setting. There are two possible values:

  • PROCESS: Python user-defined functions are executed in a separate Python process. (default)
  • THREAD: Python user-defined functions are executed in the JVM.

You can specify the execution mode in Python Table API or Python DataStream API jobs as follows:

    from pyflink.common import Configuration
    from pyflink.datastream import StreamExecutionEnvironment

    # Choose one mode per job; a later call overrides an earlier one.

    ## Python Table API (assumes an existing TableEnvironment named table_env)
    # Specify `PROCESS` mode
    table_env.get_config().set("python.execution-mode", "process")
    # Specify `THREAD` mode
    table_env.get_config().set("python.execution-mode", "thread")

    ## Python DataStream API
    config = Configuration()
    # Specify `PROCESS` mode
    config.set_string("python.execution-mode", "process")
    # Specify `THREAD` mode
    config.set_string("python.execution-mode", "thread")
    # Create the corresponding StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment(config)
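Because the option accepts only two values, it can be useful to validate the value before applying it. The helper below is a hypothetical convenience, not part of PyFlink: it assumes only that the configuration object exposes `set(key, value)`, as `table_env.get_config()` does in the example above, and it lower-cases the value to match the spelling used there. `RecordingConfig` is a stand-in used so the sketch runs without a Flink installation.

```python
PYTHON_EXECUTION_MODE_KEY = "python.execution-mode"
VALID_EXECUTION_MODES = ("process", "thread")


def apply_execution_mode(config, mode):
    """Validate mode and set it on any object that offers set(key, value)."""
    normalized = mode.strip().lower()
    if normalized not in VALID_EXECUTION_MODES:
        raise ValueError(
            f"Unsupported execution mode {mode!r}; "
            f"expected one of {VALID_EXECUTION_MODES}"
        )
    config.set(PYTHON_EXECUTION_MODE_KEY, normalized)
    return normalized


class RecordingConfig:
    """Stand-in for a real configuration object, used only in this sketch."""

    def __init__(self):
        self.values = {}

    def set(self, key, value):
        self.values[key] = value
        return self


if __name__ == "__main__":
    config = RecordingConfig()
    apply_execution_mode(config, "THREAD")
    print(config.values)
```

In a Table API job the call would be `apply_execution_mode(table_env.get_config(), "thread")`. The `Configuration` object in the DataStream example is written to with `set_string`, so it would need a small adapter.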

Supported Cases

Python Table API

The following table shows where the THREAD execution mode is supported in Python Table API.

UDFs                      PROCESS  THREAD
------------------------  -------  ------
Python UDF                Yes      Yes
Python UDTF               Yes      Yes
Python UDAF               Yes      No
Pandas UDF & Pandas UDAF  Yes      No

Python DataStream API

The following table shows the supported cases in the Python DataStream API.

Operators         PROCESS  THREAD
----------------  -------  ------
Map               Yes      Yes
FlatMap           Yes      Yes
Filter            Yes      Yes
Reduce            Yes      Yes
Union             Yes      Yes
Connect           Yes      Yes
CoMap             Yes      Yes
CoFlatMap         Yes      Yes
Process Function  Yes      Yes
Window Apply      Yes      Yes
Window Aggregate  Yes      Yes
Window Reduce     Yes      Yes
Window Process    Yes      Yes
Side Output       Yes      Yes
State             Yes      Yes
Iterate           No       No
Window CoGroup    No       No
Window Join       No       No
Interval Join     No       No
Async I/O         No       No

Currently, executing Python UDFs in THREAD execution mode is not yet supported everywhere. In the unsupported cases, execution falls back to PROCESS execution mode, so a job configured for THREAD execution mode may actually run in PROCESS execution mode.
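The fallback can be pictured as a lookup against the Table API support table. The sketch below is a hypothetical model of that behavior, not PyFlink's internal logic: the function-kind labels and the helper name `effective_execution_mode` are made up for illustration, and the support set is copied from the Table API table above.

```python
# Function kinds that the Table API table lists as supported in THREAD mode.
THREAD_SUPPORTED_KINDS = frozenset({"python_udf", "python_udtf"})

# Every function kind that appears in the Table API table.
KNOWN_KINDS = THREAD_SUPPORTED_KINDS | {"python_udaf", "pandas_udf", "pandas_udaf"}


def effective_execution_mode(requested_mode, function_kind):
    """Return the mode a function kind would run in for a requested mode."""
    mode = requested_mode.lower()
    if mode not in ("process", "thread"):
        raise ValueError(f"Unknown execution mode: {requested_mode!r}")
    if function_kind not in KNOWN_KINDS:
        raise ValueError(f"Unknown function kind: {function_kind!r}")
    # THREAD is honored only where the table says "Yes"; otherwise fall back.
    if mode == "thread" and function_kind not in THREAD_SUPPORTED_KINDS:
        return "process"
    return mode


if __name__ == "__main__":
    for kind in sorted(KNOWN_KINDS):
        print(kind, "->", effective_execution_mode("thread", kind))
```

A check like this can be used in a job script to log a warning when THREAD mode is requested for a function kind that the table marks as unsupported.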

THREAD execution mode is only supported in Python 3.8+.
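A job script can check the interpreter version before asking for THREAD mode. This is a minimal sketch: the helper name `supports_thread_mode` is hypothetical, and the check covers only the interpreter that runs the script, which may differ from the interpreter that executes the user-defined functions.

```python
import sys

# Minimum Python version for THREAD mode, as stated above.
MIN_THREAD_MODE_PYTHON = (3, 8)


def supports_thread_mode(version_info=None):
    """Return True if the given (or current) interpreter is Python 3.8+."""
    version = sys.version_info if version_info is None else version_info
    return tuple(version[:2]) >= MIN_THREAD_MODE_PYTHON


# Pick the value to pass to the python.execution-mode setting.
mode = "thread" if supports_thread_mode() else "process"
```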

Execution Behavior

This section provides an overview of the execution behavior of THREAD execution mode and contrasts it with PROCESS execution mode. For more details, please refer to the FLIP that introduced this feature: FLIP-206.

PROCESS Execution Mode

In PROCESS execution mode, Python user-defined functions are executed in a separate Python worker process. The Java operator process communicates with the Python worker process through several gRPC services.

[Figure: Process Execution Mode]
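The division of labor in PROCESS mode can be imitated with the standard library alone. In the sketch below a parent process plays the role of the Java operator and a child Python process plays the role of the Python worker. JSON lines over pipes stand in for the gRPC services, so this shows only the shape of the interaction, not PyFlink's protocol. All names (`WORKER_SOURCE`, `run_in_worker`) are hypothetical.

```python
import json
import subprocess
import sys

# Source of the worker: reads one JSON value per line, applies the
# "user-defined function" (here: doubling), and writes the result back.
WORKER_SOURCE = r"""
import json
import sys

for line in sys.stdin:
    value = json.loads(line)
    sys.stdout.write(json.dumps(value * 2) + "\n")
    sys.stdout.flush()
"""


def run_in_worker(values):
    """Send each value to a separate Python process and collect the results."""
    results = []
    with subprocess.Popen(
        [sys.executable, "-c", WORKER_SOURCE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    ) as worker:
        for value in values:
            # Serialize the input and hand it to the worker process.
            worker.stdin.write(json.dumps(value) + "\n")
            worker.stdin.flush()
            # Block until the worker returns the serialized result.
            results.append(json.loads(worker.stdout.readline()))
    return results


if __name__ == "__main__":
    print(run_in_worker([1, 2, 3]))
```

Every element crosses the process boundary twice and is serialized each time, which is the cost that THREAD mode is meant to avoid. In exchange, a crash in the worker does not take down the parent process.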

THREAD Execution Mode

In THREAD execution mode, Python user-defined functions are executed in the same process as the Java operators. PyFlink uses the third-party library PEMJA to embed Python in the Java application.

[Figure: Embedded Execution Mode]