Dependency Management

Java Dependency in Python Program

If third-party Java dependencies are used, you can specify the dependencies with the following Python Table APIs or through command line arguments directly when submitting the job.

  # Specify a list of jar URLs via "pipeline.jars". The jars are separated by ";" and will be uploaded to the cluster.
  # NOTE: Only local file URLs (starting with "file://") are supported.
  table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")

  # Specify a list of URLs via "pipeline.classpaths". The URLs are separated by ";" and will be added to the classpath of the cluster.
  # NOTE: The paths must specify a protocol (e.g. file://), and users should ensure that the URLs are accessible on both the client and the cluster.
  table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
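Since both options expect a single ";"-separated string of URLs, it can be convenient to build that string from a list of local paths. The following is a minimal sketch using only the standard library; the helper name to_jar_urls is hypothetical and not part of the Flink API, and it assumes absolute POSIX-style paths.

```python
import posixpath
from urllib.parse import quote


def to_jar_urls(paths):
    """Join absolute POSIX paths into a ';'-separated list of file:// URLs.

    Hypothetical helper for illustration; not part of PyFlink.
    """
    urls = []
    for path in paths:
        if not posixpath.isabs(path):
            raise ValueError(f"jar path must be absolute: {path}")
        # quote() percent-encodes characters such as spaces but keeps "/".
        urls.append("file://" + quote(path))
    return ";".join(urls)


jars = to_jar_urls(["/my/jar/path/connector.jar", "/my/jar/path/udf.jar"])
print(jars)
```

The resulting string has the same shape as the literal passed to set_string("pipeline.jars", ...) above.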

Python Dependency in Python Program

If third-party Python dependencies are used, you can specify the dependencies with the following Python Table APIs or through command line arguments directly when submitting the job.

APIs and their descriptions:
add_python_file(file_path)

Adds Python file dependencies, which can be Python files, Python packages, or local directories. They will be added to the PYTHONPATH of the Python UDF worker.

  table_env.add_python_file(file_path)

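To illustrate what being on the PYTHONPATH means for the code in a UDF, here is a rough local analogue that runs without Flink. The module name my_helpers and its double function are made up for the example, and the sketch only mimics the effect on the module search path, not how the UDF worker actually ships files.

```python
import importlib
import sys
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as dep_dir:
    # Hypothetical helper module, standing in for a file passed to add_python_file.
    Path(dep_dir, "my_helpers.py").write_text("def double(x):\n    return 2 * x\n")

    # Putting the directory on the module search path is what makes the import work.
    sys.path.insert(0, dep_dir)
    importlib.invalidate_caches()
    try:
        my_helpers = importlib.import_module("my_helpers")
        result = my_helpers.double(21)
    finally:
        sys.path.remove(dep_dir)

print(result)
```
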
set_python_requirements(requirements_file_path, requirements_cache_dir=None)

Specifies a requirements.txt file which defines the third-party dependencies. These dependencies will be installed to a temporary directory and added to the PYTHONPATH of the Python UDF worker. For dependencies that cannot be accessed in the cluster, a directory containing their installation packages can be specified using the parameter “requirements_cache_dir”. It will be uploaded to the cluster to support offline installation.

  # commands executed in shell
  echo numpy==1.16.5 > requirements.txt
  pip download -d cached_dir -r requirements.txt --no-binary :all:

  # python code
  table_env.set_python_requirements("requirements.txt", "cached_dir")

Please make sure the installation packages match the platform of the cluster and the Python version used. These packages will be installed using pip, so also make sure that pip (version >= 7.1.0) and setuptools (version >= 37.0.0) meet the stated minimum versions.
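The version minimums above can be checked before submitting a job. The following is a minimal sketch, assuming it is run with the same interpreter that will perform the installation; the function names are hypothetical, and the comparison only looks at the leading numeric part of a version string rather than implementing full version-specifier semantics.

```python
import re
from importlib import metadata

# Minimum versions taken from the note above.
MINIMUM_VERSIONS = {"pip": "7.1.0", "setuptools": "37.0.0"}


def version_tuple(version):
    """Turn the leading numeric release of e.g. '40.8.0.post1' into (40, 8, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    return tuple(int(part) for part in match.group().split("."))


def meets_minimum(installed, minimum):
    """Compare two versions numerically, padding the shorter one with zeros."""
    a, b = version_tuple(installed), version_tuple(minimum)
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


def check_environment():
    """Return {package: True/False/None}; None means the package was not found."""
    report = {}
    for name, minimum in MINIMUM_VERSIONS.items():
        try:
            report[name] = meets_minimum(metadata.version(name), minimum)
        except (metadata.PackageNotFoundError, ValueError, TypeError):
            report[name] = None
    return report


print(check_environment())
```
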

add_python_archive(archive_path, target_dir=None)

Adds a Python archive file dependency. The file will be extracted to the working directory of the Python UDF worker. If the parameter “target_dir” is specified, the archive file will be extracted to a directory named “target_dir”. Otherwise, the archive file will be extracted to a directory with the same name as the archive file.

  # command executed in shell
  # assume the relative path of the python interpreter is py_env/bin/python
  zip -r py_env.zip py_env

  # python code
  table_env.add_python_archive("py_env.zip")
  # or
  table_env.add_python_archive("py_env.zip", "myenv")

  # the files contained in the archive file can be accessed in UDF
  def my_udf():
      with open("myenv/py_env/data/data.txt") as f:
          ...

Please make sure the uploaded Python environment matches the platform that the cluster is running on. Currently only zip-format archives are supported, e.g. zip, jar, whl, egg.
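The relationship between the archive layout and the path used inside the UDF can be reproduced locally. The sketch below builds a toy py_env directory, zips it the way `zip -r py_env.zip py_env` would, and extracts it into a directory named myenv. It only emulates the extraction step with the standard library; the file contents and the temporary directory layout are assumptions made for the example.

```python
import shutil
import tempfile
import zipfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    tmp = Path(tmp)

    # Build a toy py_env directory containing one data file.
    data_file = tmp / "src" / "py_env" / "data" / "data.txt"
    data_file.parent.mkdir(parents=True)
    data_file.write_text("hello from the archive\n")

    # Equivalent of `zip -r py_env.zip py_env`, run from the parent of py_env.
    archive = shutil.make_archive(
        str(tmp / "py_env"), "zip", root_dir=str(tmp / "src"), base_dir="py_env"
    )

    # Emulate extraction into a target directory named "myenv".
    work_dir = tmp / "work"
    with zipfile.ZipFile(archive) as zf:
        zf.extractall(work_dir / "myenv")

    # Relative to the working directory, this is myenv/py_env/data/data.txt.
    content = (work_dir / "myenv" / "py_env" / "data" / "data.txt").read_text()

print(content.strip())
```

Because the archive keeps the top-level py_env directory, that directory name remains part of every path after extraction.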

set_python_executable(python_exec)

Sets the path of the Python interpreter which is used to execute the Python UDF workers, e.g., “/usr/local/bin/python3”.

  table_env.add_python_archive("py_env.zip")
  table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python")

Please make sure that the specified environment matches the platform that the cluster is running on.
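The interpreter path in the example follows from the extraction rule described for add_python_archive: without a target directory, the archive is extracted to a directory named after the archive file. A small sketch of that rule, using a hypothetical helper that is not part of PyFlink:

```python
import posixpath


def extracted_path(archive_name, path_in_archive, target_dir=None):
    """Relative path of a file after extraction in the worker's working directory.

    Hypothetical helper for illustration only.
    """
    # With no target_dir, the directory is named after the archive file itself.
    directory = target_dir if target_dir is not None else archive_name
    return posixpath.join(directory, path_in_archive)


default_exec = extracted_path("py_env.zip", "py_env/bin/python")
custom_exec = extracted_path("py_env.zip", "py_env/bin/python", target_dir="myenv")
print(default_exec)
print(custom_exec)
```

If the archive were added with a target directory such as "myenv", the value passed to set_python_executable would need to start with that directory name instead.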

Python Dependency in Java/Scala Program

Python UDFs can also be used in Java Table API programs or pure SQL programs. The following example shows how to use a Python UDF in a Java Table API program:

  import org.apache.flink.configuration.CoreOptions;
  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.TableEnvironment;

  TableEnvironment tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
  tEnv.getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1);

  // register the Python UDF
  tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python");

  tEnv.createTemporaryView("source", tEnv.fromValues(1L, 2L, 3L).as("a"));

  // use Python UDF in the Java Table API program
  tEnv.executeSql("select add_one(a) as a from source").collect();

Refer to the documentation of the CREATE FUNCTION SQL statement for more details on how to create Python user-defined functions using SQL statements.

The Python dependencies can be specified via the Python config options, such as python.archives, python.files, python.requirements, python.client.executable, and python.executable, or through command line arguments when submitting the job.
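As a rough sketch of how these options relate to the Python APIs described earlier, the helper below assembles a mapping from config keys to values. The helper itself is hypothetical. The value formats it uses (a comma between multiple entries, and "#" between a requirements file and its cache directory or between an archive and its target directory) are assumptions based on the Flink configuration reference and should be checked against the documentation of the Flink version in use.

```python
def python_dependency_options(files=(), requirements=None, cache_dir=None,
                              archives=(), executable=None):
    """Build a {config key: value} mapping for the Python dependency options.

    Hypothetical helper. `archives` is a sequence of
    (archive_path, target_dir_or_None) pairs. Separator conventions
    ("," and "#") are assumptions to verify against the Flink docs.
    """
    options = {}
    if files:
        options["python.files"] = ",".join(files)
    if requirements:
        if cache_dir is None:
            options["python.requirements"] = requirements
        else:
            options["python.requirements"] = f"{requirements}#{cache_dir}"
    if archives:
        options["python.archives"] = ",".join(
            path if target is None else f"{path}#{target}"
            for path, target in archives
        )
    if executable:
        options["python.executable"] = executable
    return options


options = python_dependency_options(
    files=["add_one.py"],
    requirements="requirements.txt",
    cache_dir="cached_dir",
    archives=[("py_env.zip", None)],
    executable="py_env.zip/py_env/bin/python",
)
for key, value in options.items():
    print(f"{key}: {value}")
```

Each entry could then be set on the table environment's configuration in the same way as "pipeline.jars" above, or placed in the cluster configuration.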