自定义函数(UDF)
用户自定义函数是重要的功能,因为它们极大地扩展了Python Table API程序的表达能力。
注意: 要执行Python用户自定义函数,客户端和集群端都需要安装Python版本3.5、3.6或3.7,并安装PyFlink。
标量函数(ScalarFunction)
PyFlink支持在Python Table API程序中使用Python标量函数。 如果要定义Python标量函数, 可以继承pyflink.table.udf
中的基类ScalarFunction,并实现eval方法。 Python标量函数的行为由名为eval
的方法定义,eval方法支持可变长参数,例如eval(* args)
。
以下示例显示了如何定义自己的Python哈希函数、如何在TableEnvironment中注册它以及如何在作业中使用它。
class HashCode(ScalarFunction):
def __init__(self):
self.factor = 12
def eval(self, s):
return hash(s) * self.factor
table_env = BatchTableEnvironment.create(env)
# Python worker进程默认使用off-heap内存,配置当前taskmanager的off-heap内存大小
table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
# 注册Python自定义函数
table_env.register_function("hash_code", udf(HashCode(), DataTypes.BIGINT(), DataTypes.BIGINT()))
# 在Python Table API中使用Python自定义函数
my_table.select("string, bigint, bigint.hash_code(), hash_code(bigint)")
# 在SQL API中使用Python自定义函数
table_env.sql_query("SELECT string, bigint, hash_code(bigint) FROM MyTable")
注意当前不支持Python worker进程与RocksDB state backend同时使用managed memory。 如果作业中不使用RocksDB state backend的话, 您也可以将配置项python.fn-execution.memory.managed设置为true, 配置Python worker进程使用managed memory。这样的话,就不需要配置taskmanager.memory.task.off-heap.size了。
除此之外,还支持在Python Table API程序中使用Java / Scala标量函数。
'''
Java code:
// Java类必须具有公共的无参数构造函数,并且可以在当前的Java类加载器中可以加载到。
public class HashCode extends ScalarFunction {
private int factor = 12;
public int eval(String s) {
return s.hashCode() * factor;
}
}
'''
table_env = BatchTableEnvironment.create(env)
# Python worker进程默认使用off-heap内存,配置当前taskmanager的off-heap内存大小
table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
# 注册Java函数
table_env.register_java_function("hash_code", "my.java.function.HashCode")
# 在Python Table API中使用Java函数
my_table.select("string.hash_code(), hash_code(string)")
# 在SQL API中使用Java函数
table_env.sql_query("SELECT string, bigint, hash_code(string) FROM MyTable")
注意当前不支持Python worker进程与RocksDB state backend同时使用managed memory。 如果作业中不使用RocksDB state backend的话, 您也可以将配置项python.fn-execution.memory.managed设置为true, 配置Python worker进程使用managed memory。这样的话,就不需要配置taskmanager.memory.task.off-heap.size了。
除了扩展基类ScalarFunction
之外,还支持多种方式来定义Python标量函数。 以下示例显示了多种定义Python标量函数的方式。该函数需要两个类型为bigint的参数作为输入参数,并返回它们的总和作为结果。
# 方式一:扩展基类`ScalarFunction`
class Add(ScalarFunction):
def eval(self, i, j):
return i + j
add = udf(Add(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
# 方式二:普通Python函数
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
return i + j
# 方式三:lambda函数
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
# 方式四:callable函数
class CallableAdd(object):
def __call__(self, i, j):
return i + j
add = udf(CallableAdd(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
# 方式五:partial函数
def partial_add(i, j, k):
return i + j + k
add = udf(functools.partial(partial_add, k=1), [DataTypes.BIGINT(), DataTypes.BIGINT()],
DataTypes.BIGINT())
# 注册Python自定义函数
table_env.register_function("add", add)
# 在Python Table API中使用Python自定义函数
my_table.select("add(a, b)")
表值函数
与Python用户自定义标量函数类似,Python用户自定义表值函数以零个,一个或者多个列作为输入参数。但是,与标量函数不同的是,表值函数可以返回 任意数量的行作为输出而不是单个值。Python用户自定义表值函数的返回类型可以是Iterable,Iterator或generator类型。
以下示例说明了如何定义自己的Python自定义表值函数,将其注册到TableEnvironment中,并在作业中使用它。
class Split(TableFunction):
def eval(self, string):
for s in string.split(" "):
yield s, len(s)
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
my_table = ... # type: Table, table schema: [a: String]
# Python worker进程默认使用off-heap内存,配置当前taskmanager的off-heap内存大小
table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
# 注册Python表值函数
table_env.register_function("split", udtf(Split(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.INT()]))
# 在Python Table API中使用Python表值函数
my_table.join_lateral("split(a) as (word, length)")
my_table.left_outer_join_lateral("split(a) as (word, length)")
# 在SQL API中使用Python表值函数
table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
注意当前不支持Python worker进程与RocksDB state backend同时使用managed memory。 如果作业中不使用RocksDB state backend的话, 您也可以将配置项python.fn-execution.memory.managed设置为true, 配置Python worker进程使用managed memory。这样的话,就不需要配置taskmanager.memory.task.off-heap.size了。
除此之外,还支持在Python Table API程序中使用Java / Scala表值函数。
'''
Java code:
// 类型"Tuple2 <String,Integer>"代表,表值函数的输出类型为(String,Integer)。
// Java类必须具有公共的无参数构造函数,并且可以在当前的Java类加载器中加载到。
public class Split extends TableFunction<Tuple2<String, Integer>> {
private String separator = " ";
public void eval(String str) {
for (String s : str.split(separator)) {
// use collect(...) to emit a row
collect(new Tuple2<String, Integer>(s, s.length()));
}
}
}
'''
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
my_table = ... # type: Table, table schema: [a: String]
# Python worker进程默认使用off-heap内存,配置当前taskmanager的off-heap内存大小
table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
# 注册java自定义函数。
table_env.register_java_function("split", "my.java.function.Split")
# 在Python Table API中使用表值函数。 "as"指定表的字段名称。
my_table.join_lateral("split(a) as (word, length)").select("a, word, length")
my_table.left_outer_join_lateral("split(a) as (word, length)").select("a, word, length")
# 注册python函数。
# 在SQL中将table函数与LATERAL和TABLE关键字一起使用。
# CROSS JOIN表值函数(等效于Table API中的"join")。
table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
# LEFT JOIN一个表值函数(等同于Table API中的"left_outer_join")。
table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")
注意当前不支持Python worker进程与RocksDB state backend同时使用managed memory。 如果作业中不使用RocksDB state backend的话, 您也可以将配置项python.fn-execution.memory.managed设置为true, 配置Python worker进程使用managed memory。这样的话,就不需要配置taskmanager.memory.task.off-heap.size了。
像Python标量函数一样,您可以使用上述五种方式来定义Python表值函数。
注意 唯一的区别是,Python表值函数的返回类型必须是iterable(可迭代子类), iterator(迭代器) or generator(生成器)。
# 选项1:生成器函数
@udtf(input_types=DataTypes.BIGINT(), result_types=DataTypes.BIGINT())
def generator_func(x):
yield 1
yield 2
# 选项2:返回迭代器
@udtf(input_types=DataTypes.BIGINT(), result_types=DataTypes.BIGINT())
def iterator_func(x):
return range(5)
# 选项3:返回可迭代子类
@udtf(input_types=DataTypes.BIGINT(), result_types=DataTypes.BIGINT())
def iterable_func(x):
result = [1, 2, 3]
return result
table_env.register_function("iterable_func", iterable_func)
table_env.register_function("iterator_func", iterator_func)
table_env.register_function("generator_func", generator_func)