常见问题

本页介绍了针对PyFlink用户的一些常见问题的解决方案。

准备Python虚拟环境

您可以下载便捷脚本,以准备可在Mac OS和大多数Linux发行版上使用的Python虚拟环境包(virtual env zip)。 您可以指定PyFlink的版本,来生成对应的PyFlink版本所需的Python虚拟环境,否则将安装最新版本的PyFlink所对应的Python虚拟环境。

  1. $ sh setup-pyflink-virtual-env.sh 1.11.0

使用Python虚拟环境执行PyFlink任务

在设置了python虚拟环境之后(如上一节所述),您应该在执行PyFlink作业之前激活虚拟环境。

本地(Local)

  1. # activate the conda python virtual environment
  2. $ source venv/bin/activate
  3. $ python xxx.py

集群(Cluster)

  1. $ # 指定Python虚拟环境
  2. $ table_env.add_python_archive("venv.zip")
  3. $ # 指定用于执行python UDF workers (用户自定义函数工作者) 的python解释器的路径
  4. $ table_env.get_config().set_python_executable("venv.zip/venv/bin/python")

如果需要了解add_python_archiveset_python_executable用法的详细信息,请参阅相关文档

添加Jar文件

PyFlink作业可能依赖jar文件,比如connector,Java UDF等。 您可以在提交作业时使用以下Python Table API或通过命令行参数来指定依赖项。

  1. # 注意:仅支持本地文件URL(以"file:"开头)。
  2. table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
  3. # 注意:路径必须指定协议(例如:文件——"file"),并且用户应确保在客户端和群集上都可以访问这些URL。
  4. table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")

有关添加Java依赖项的API的详细信息,请参阅相关文档

添加Python文件

您可以使用命令行参数pyfs或TableEnvironment的API add_python_file添加python文件依赖,这些依赖可以是python文件,python软件包或本地目录。 例如,如果您有一个名为myDir的目录,该目录具有以下层次结构:

  1. myDir
  2. ├──utils
  3. ├──__init__.py
  4. ├──my_util.py

您可以将添加目录myDir添加到Python依赖中,如下所示:

  1. table_env.add_python_file('myDir')
  2. def my_udf():
  3. from utils import my_util