常见问题

本页介绍了针对PyFlink用户的一些常见问题的解决方案。

准备Python虚拟环境

您可以下载便捷脚本,以准备可在Mac OS和大多数Linux发行版上使用的Python虚拟环境包(virtual env zip)。 您可以指定PyFlink的版本,来生成对应的PyFlink版本所需的Python虚拟环境,否则将安装最新版本的PyFlink所对应的Python虚拟环境。

  1. $ sh setup-pyflink-virtual-env.sh 1.12.0

使用Python虚拟环境执行PyFlink任务

在设置了python虚拟环境之后(如上一节所述),您应该在执行PyFlink作业之前激活虚拟环境。

本地(Local)

  1. # activate the conda python virtual environment
  2. $ source venv/bin/activate
  3. $ python xxx.py

集群(Cluster)

  1. $ # 指定Python虚拟环境
  2. $ table_env.add_python_archive("venv.zip")
  3. $ # 指定用于执行python UDF workers (用户自定义函数工作者) 的python解释器的路径
  4. $ table_env.get_config().set_python_executable("venv.zip/venv/bin/python")

如果需要了解add_python_archiveset_python_executable用法的详细信息,请参阅相关文档

添加Jar文件

PyFlink作业可能依赖jar文件,比如connector,Java UDF等。 您可以在提交作业时使用以下Python Table API或通过命令行参数来指定依赖项。

  1. # 注意:仅支持本地文件URL(以"file:"开头)。
  2. table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
  3. # 注意:路径必须指定协议(例如:文件——"file"),并且用户应确保在客户端和群集上都可以访问这些URL。
  4. table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")

有关添加Java依赖项的API的详细信息,请参阅相关文档

添加Python文件

您可以使用命令行参数pyfs或TableEnvironment的API add_python_file添加python文件依赖,这些依赖可以是python文件,python软件包或本地目录。 例如,如果您有一个名为myDir的目录,该目录具有以下层次结构:

  1. myDir
  2. ├──utils
  3. ├──__init__.py
  4. ├──my_util.py

您可以将添加目录myDir添加到Python依赖中,如下所示:

  1. table_env.add_python_file('myDir')
  2. def my_udf():
  3. from utils import my_util

当在 mini cluster 环境执行作业时,显式等待作业执行结束

当在 mini cluster 环境执行作业(比如,在IDE中执行作业)且在作业中使用了如下API(比如 Python Table API 的 TableEnvironment.execute_sql, StatementSet.execute 和 Python DataStream API 的 StreamExecutionEnvironment.execute_async) 的时候,因为这些API是异步的,请记得显式地等待作业执行结束。否则程序会在已提交的作业执行结束之前退出,以致无法观测到已提交作业的执行结果。 请参考如下示例代码,了解如何显式地等待作业执行结束:

  1. # 异步执行 SQL / Table API 作业
  2. t_result = table_env.execute_sql(...)
  3. t_result.wait()
  4. # 异步执行 DataStream 作业
  5. job_client = stream_execution_env.execute_async('My DataStream Job')
  6. job_client.get_job_execution_result().result()

注意: 当往远程集群提交作业时,无需显式地等待作业执行结束,所以当往远程集群提交作业之前,请记得移除这些等待作业执行结束的代码逻辑。