自定义函数(UDF)

用户自定义函数是重要的功能,因为它们极大地扩展了Python Table API程序的表达能力。

注意: 要执行Python用户自定义函数,客户端和集群端都需要安装Python版本3.5、3.6或3.7,并安装PyFlink。

标量函数(ScalarFunction)

PyFlink支持在Python Table API程序中使用Python标量函数。 如果要定义Python标量函数, 可以继承pyflink.table.udf中的基类ScalarFunction,并实现eval方法。 Python标量函数的行为由名为eval的方法定义,eval方法支持可变长参数,例如eval(* args)

以下示例显示了如何定义自己的Python哈希函数、如何在TableEnvironment中注册它以及如何在作业中使用它。

  1. class HashCode(ScalarFunction):
  2. def __init__(self):
  3. self.factor = 12
  4. def eval(self, s):
  5. return hash(s) * self.factor
  6. table_env = BatchTableEnvironment.create(env)
  7. # Python worker进程默认使用off-heap内存,配置当前taskmanager的off-heap内存大小
  8. table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
  9. # 注册Python自定义函数
  10. table_env.register_function("hash_code", udf(HashCode(), DataTypes.BIGINT(), DataTypes.BIGINT()))
  11. # 在Python Table API中使用Python自定义函数
  12. my_table.select("string, bigint, bigint.hash_code(), hash_code(bigint)")
  13. # 在SQL API中使用Python自定义函数
  14. table_env.sql_query("SELECT string, bigint, hash_code(bigint) FROM MyTable")

注意当前不支持Python worker进程与RocksDB state backend同时使用managed memory。 如果作业中不使用RocksDB state backend的话, 您也可以将配置项python.fn-execution.memory.managed设置为true, 配置Python worker进程使用managed memory。这样的话,就不需要配置taskmanager.memory.task.off-heap.size了。

除此之外,还支持在Python Table API程序中使用Java / Scala标量函数。

  1. '''
  2. Java code:
  3. // Java类必须具有公共的无参数构造函数,并且可以在当前的Java类加载器中可以加载到。
  4. public class HashCode extends ScalarFunction {
  5. private int factor = 12;
  6. public int eval(String s) {
  7. return s.hashCode() * factor;
  8. }
  9. }
  10. '''
  11. table_env = BatchTableEnvironment.create(env)
  12. # Python worker进程默认使用off-heap内存,配置当前taskmanager的off-heap内存大小
  13. table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
  14. # 注册Java函数
  15. table_env.register_java_function("hash_code", "my.java.function.HashCode")
  16. # 在Python Table API中使用Java函数
  17. my_table.select("string.hash_code(), hash_code(string)")
  18. # 在SQL API中使用Java函数
  19. table_env.sql_query("SELECT string, bigint, hash_code(string) FROM MyTable")

注意当前不支持Python worker进程与RocksDB state backend同时使用managed memory。 如果作业中不使用RocksDB state backend的话, 您也可以将配置项python.fn-execution.memory.managed设置为true, 配置Python worker进程使用managed memory。这样的话,就不需要配置taskmanager.memory.task.off-heap.size了。

除了扩展基类ScalarFunction之外,还支持多种方式来定义Python标量函数。 以下示例显示了多种定义Python标量函数的方式。该函数需要两个类型为bigint的参数作为输入参数,并返回它们的总和作为结果。

  1. # 方式一:扩展基类`ScalarFunction`
  2. class Add(ScalarFunction):
  3. def eval(self, i, j):
  4. return i + j
  5. add = udf(Add(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
  6. # 方式二:普通Python函数
  7. @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
  8. def add(i, j):
  9. return i + j
  10. # 方式三:lambda函数
  11. add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
  12. # 方式四:callable函数
  13. class CallableAdd(object):
  14. def __call__(self, i, j):
  15. return i + j
  16. add = udf(CallableAdd(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
  17. # 方式五:partial函数
  18. def partial_add(i, j, k):
  19. return i + j + k
  20. add = udf(functools.partial(partial_add, k=1), [DataTypes.BIGINT(), DataTypes.BIGINT()],
  21. DataTypes.BIGINT())
  22. # 注册Python自定义函数
  23. table_env.register_function("add", add)
  24. # 在Python Table API中使用Python自定义函数
  25. my_table.select("add(a, b)")

表值函数

与Python用户自定义标量函数类似,Python用户自定义表值函数以零个,一个或者多个列作为输入参数。但是,与标量函数不同的是,表值函数可以返回 任意数量的行作为输出而不是单个值。Python用户自定义表值函数的返回类型可以是Iterable,Iterator或generator类型。

以下示例说明了如何定义自己的Python自定义表值函数,将其注册到TableEnvironment中,并在作业中使用它。

  1. class Split(TableFunction):
  2. def eval(self, string):
  3. for s in string.split(" "):
  4. yield s, len(s)
  5. env = StreamExecutionEnvironment.get_execution_environment()
  6. table_env = StreamTableEnvironment.create(env)
  7. my_table = ... # type: Table, table schema: [a: String]
  8. # Python worker进程默认使用off-heap内存,配置当前taskmanager的off-heap内存大小
  9. table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
  10. # 注册Python表值函数
  11. table_env.register_function("split", udtf(Split(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.INT()]))
  12. # 在Python Table API中使用Python表值函数
  13. my_table.join_lateral("split(a) as (word, length)")
  14. my_table.left_outer_join_lateral("split(a) as (word, length)")
  15. # 在SQL API中使用Python表值函数
  16. table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
  17. table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")

注意当前不支持Python worker进程与RocksDB state backend同时使用managed memory。 如果作业中不使用RocksDB state backend的话, 您也可以将配置项python.fn-execution.memory.managed设置为true, 配置Python worker进程使用managed memory。这样的话,就不需要配置taskmanager.memory.task.off-heap.size了。

除此之外,还支持在Python Table API程序中使用Java / Scala表值函数。

  1. '''
  2. Java code:
  3. // 类型"Tuple2 <String,Integer>"代表,表值函数的输出类型为(String,Integer)。
  4. // Java类必须具有公共的无参数构造函数,并且可以在当前的Java类加载器中加载到。
  5. public class Split extends TableFunction<Tuple2<String, Integer>> {
  6. private String separator = " ";
  7. public void eval(String str) {
  8. for (String s : str.split(separator)) {
  9. // use collect(...) to emit a row
  10. collect(new Tuple2<String, Integer>(s, s.length()));
  11. }
  12. }
  13. }
  14. '''
  15. env = StreamExecutionEnvironment.get_execution_environment()
  16. table_env = StreamTableEnvironment.create(env)
  17. my_table = ... # type: Table, table schema: [a: String]
  18. # Python worker进程默认使用off-heap内存,配置当前taskmanager的off-heap内存大小
  19. table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')
  20. # 注册java自定义函数。
  21. table_env.register_java_function("split", "my.java.function.Split")
  22. # 在Python Table API中使用表值函数。 "as"指定表的字段名称。
  23. my_table.join_lateral("split(a) as (word, length)").select("a, word, length")
  24. my_table.left_outer_join_lateral("split(a) as (word, length)").select("a, word, length")
  25. # 注册python函数。
  26. # 在SQL中将table函数与LATERAL和TABLE关键字一起使用。
  27. # CROSS JOIN表值函数(等效于Table API中的"join")。
  28. table_env.sql_query("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)")
  29. # LEFT JOIN一个表值函数(等同于Table API中的"left_outer_join")。
  30. table_env.sql_query("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE")

注意当前不支持Python worker进程与RocksDB state backend同时使用managed memory。 如果作业中不使用RocksDB state backend的话, 您也可以将配置项python.fn-execution.memory.managed设置为true, 配置Python worker进程使用managed memory。这样的话,就不需要配置taskmanager.memory.task.off-heap.size了。

像Python标量函数一样,您可以使用上述五种方式来定义Python表值函数。

注意 唯一的区别是,Python表值函数的返回类型必须是iterable(可迭代子类), iterator(迭代器) or generator(生成器)。

  1. # 选项1:生成器函数
  2. @udtf(input_types=DataTypes.BIGINT(), result_types=DataTypes.BIGINT())
  3. def generator_func(x):
  4. yield 1
  5. yield 2
  6. # 选项2:返回迭代器
  7. @udtf(input_types=DataTypes.BIGINT(), result_types=DataTypes.BIGINT())
  8. def iterator_func(x):
  9. return range(5)
  10. # 选项3:返回可迭代子类
  11. @udtf(input_types=DataTypes.BIGINT(), result_types=DataTypes.BIGINT())
  12. def iterable_func(x):
  13. result = [1, 2, 3]
  14. return result
  15. table_env.register_function("iterable_func", iterable_func)
  16. table_env.register_function("iterator_func", iterator_func)
  17. table_env.register_function("generator_func", generator_func)