常见问题
本页介绍了针对PyFlink用户的一些常见问题的解决方案。
准备Python虚拟环境
您可以下载便捷脚本,以准备可在Mac OS和大多数Linux发行版上使用的Python虚拟环境包(virtual env zip)。 您可以指定PyFlink的版本,来生成对应的PyFlink版本所需的Python虚拟环境,否则将安装最新版本的PyFlink所对应的Python虚拟环境。
$ sh setup-pyflink-virtual-env.sh 1.12.0
使用Python虚拟环境执行PyFlink任务
在设置了python虚拟环境之后(如上一节所述),您应该在执行PyFlink作业之前激活虚拟环境。
本地(Local)
# activate the conda python virtual environment
$ source venv/bin/activate
$ python xxx.py
集群(Cluster)
$ # 指定Python虚拟环境
$ table_env.add_python_archive("venv.zip")
$ # 指定用于执行python UDF workers (用户自定义函数工作者) 的python解释器的路径
$ table_env.get_config().set_python_executable("venv.zip/venv/bin/python")
如果需要了解add_python_archive
和set_python_executable
用法的详细信息,请参阅相关文档。
添加Jar文件
PyFlink作业可能依赖jar文件,比如connector,Java UDF等。 您可以在提交作业时使用以下Python Table API或通过命令行参数来指定依赖项。
# 注意:仅支持本地文件URL(以"file:"开头)。
table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
# 注意:路径必须指定协议(例如:文件——"file"),并且用户应确保在客户端和群集上都可以访问这些URL。
table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
有关添加Java依赖项的API的详细信息,请参阅相关文档。
添加Python文件
您可以使用命令行参数pyfs
或TableEnvironment的API add_python_file
添加python文件依赖,这些依赖可以是python文件,python软件包或本地目录。 例如,如果您有一个名为myDir
的目录,该目录具有以下层次结构:
myDir
├──utils
├──__init__.py
├──my_util.py
您可以将添加目录myDir
添加到Python依赖中,如下所示:
table_env.add_python_file('myDir')
def my_udf():
from utils import my_util
当在 mini cluster 环境执行作业时,显式等待作业执行结束
当在 mini cluster 环境执行作业(比如,在IDE中执行作业)且在作业中使用了如下API(比如 Python Table API 的 TableEnvironment.execute_sql, StatementSet.execute 和 Python DataStream API 的 StreamExecutionEnvironment.execute_async) 的时候,因为这些API是异步的,请记得显式地等待作业执行结束。否则程序会在已提交的作业执行结束之前退出,以致无法观测到已提交作业的执行结果。 请参考如下示例代码,了解如何显式地等待作业执行结束:
# 异步执行 SQL / Table API 作业
t_result = table_env.execute_sql(...)
t_result.wait()
# 异步执行 DataStream 作业
job_client = stream_execution_env.execute_async('My DataStream Job')
job_client.get_job_execution_result().result()
注意: 当往远程集群提交作业时,无需显式地等待作业执行结束,所以当往远程集群提交作业之前,请记得移除这些等待作业执行结束的代码逻辑。