调试
本页介绍如何在PyFlink进行调试
打印日志信息
客户端日志
你可以通过 print
或者标准的 Python logging 模块,在 PyFlink 作业中,Python UDF 之外的地方打印上下文和调试信息。 在提交作业时,日志信息会打印在客户端的日志文件中。
from pyflink.table import EnvironmentSettings, TableEnvironment
# 创建 TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
# 使用 logging 模块
import logging
logging.warning(table.get_schema())
# 使用 print 函数
print(table.get_schema())
注意: 客户端缺省的日志级别是 WARNING
,因此,只有日志级别在 WARNING
及以上的日志信息才会打印在客户端的日志文件中。
服务器端日志
你可以通过 print
或者标准的 Python logging 模块,在 Python UDF 中打印上下文和调试信息。 在作业运行的过程中,日志信息会打印在 TaskManager
的日志文件中。
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import logging
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
# 使用 logging 模块
logging.info("debug")
# 使用 print 函数
print('debug')
return i + j
注意: 服务器端缺省的日志级别是 INFO
,因此,只有日志级别在 INFO
及以上的日志信息才会打印在 TaskManager
的日志文件中。
查看日志
如果设置了环境变量FLINK_HOME
,日志将会放置在FLINK_HOME
指向目录的log目录之下。否则,日志将会放在安装的Pyflink模块的 log目录下。你可以通过执行下面的命令来查找PyFlink模块的log目录的路径:
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
调试Python UDFs
本地调试
你可以直接在 PyCharm 等 IDE 调试你的 Python 函数。
远程调试
你可以利用PyCharm提供的pydevd_pycharm工具进行Python UDF的调试
在PyCharm里创建一个Python Remote Debug
run -> Python Remote Debug -> + -> 选择一个port (e.g. 6789)
安装
pydevd-pycharm
工具$ pip install pydevd-pycharm
在你的Python UDF里面添加如下的代码
import pydevd_pycharm
pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, stderrToServer=True)
启动刚刚创建的Python Remote Dubug Server
运行你的Python代码
Profiling Python UDFs
你可以打开profile来分析性能瓶颈
t_env.get_config().set("python.profile.enabled", "true")
你可以在日志里面查看profile的结果