User-defined Functions

User-defined functions (UDFs) are extension points to call frequently used logic or custom logic that cannot be expressed otherwise in queries.

User-defined functions can be implemented in a JVM language (such as Java or Scala) or Python. An implementer can use arbitrary third party libraries within a UDF. This page focuses on JVM-based languages; please refer to the PyFlink documentation for details on writing general and vectorized UDFs in Python.

Overview

Currently, Flink distinguishes between the following kinds of functions:

  • Scalar functions map scalar values to a new scalar value.
  • Table functions map scalar values to new rows.
  • Aggregate functions map scalar values of multiple rows to a new scalar value.
  • Table aggregate functions map scalar values of multiple rows to new rows.
  • Async table functions are special functions for table sources that perform a lookup.

The following example shows how to create a simple scalar function and how to call the function in both Table API and SQL.

For SQL queries, a function must always be registered under a name. For Table API, a function can be registered or directly used inline.

Java

  1. import org.apache.flink.table.api.*;
  2. import org.apache.flink.table.functions.ScalarFunction;
  3. import static org.apache.flink.table.api.Expressions.*;
  4. // define function logic
  5. public static class SubstringFunction extends ScalarFunction {
  6. public String eval(String s, Integer begin, Integer end) {
  7. return s.substring(begin, end);
  8. }
  9. }
  10. TableEnvironment env = TableEnvironment.create(...);
  11. // call function "inline" without registration in Table API
  12. env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));
  13. // register function
  14. env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
  15. // call registered function in Table API
  16. env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));
  17. // call registered function in SQL
  18. env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");
  19. // call registered function in SQL using named parameters
  20. env.sqlQuery("SELECT SubstringFunction(param1 => myField, param2 => 5, param3 => 12) FROM MyTable");

Scala

  1. import org.apache.flink.table.api._
  2. import org.apache.flink.table.functions.ScalarFunction
  3. // define function logic
  4. class SubstringFunction extends ScalarFunction {
  5. def eval(s: String, begin: Integer, end: Integer): String = {
  6. s.substring(begin, end)
  7. }
  8. }
  9. val env = TableEnvironment.create(...)
  10. // call function "inline" without registration in Table API
  11. env.from("MyTable").select(call(classOf[SubstringFunction], $"myField", 5, 12))
  12. // register function
  13. env.createTemporarySystemFunction("SubstringFunction", classOf[SubstringFunction])
  14. // call registered function in Table API
  15. env.from("MyTable").select(call("SubstringFunction", $"myField", 5, 12))
  16. // call registered function in SQL
  17. env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable")

For interactive sessions, it is also possible to parameterize functions before using or registering them. In this case, function instances instead of function classes can be used as temporary functions.

Make sure that the parameters are serializable so that the function instances can be shipped to the cluster.

Java

  1. import org.apache.flink.table.api.*;
  2. import org.apache.flink.table.functions.ScalarFunction;
  3. import static org.apache.flink.table.api.Expressions.*;
  4. // define parameterizable function logic
  5. public static class SubstringFunction extends ScalarFunction {
  6. private boolean endInclusive;
  7. public SubstringFunction(boolean endInclusive) {
  8. this.endInclusive = endInclusive;
  9. }
  10. public String eval(String s, Integer begin, Integer end) {
  11. return s.substring(begin, endInclusive ? end + 1 : end);
  12. }
  13. }
  14. TableEnvironment env = TableEnvironment.create(...);
  15. // call function "inline" without registration in Table API
  16. env.from("MyTable").select(call(new SubstringFunction(true), $("myField"), 5, 12));
  17. // register function
  18. env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true));

Scala

  1. import org.apache.flink.table.api._
  2. import org.apache.flink.table.functions.ScalarFunction
  3. // define parameterizable function logic
  4. class SubstringFunction(val endInclusive: Boolean) extends ScalarFunction {
  5. def eval(s: String, begin: Integer, end: Integer): String = {
  6. s.substring(begin, if (endInclusive) end + 1 else end)
  7. }
  8. }
  9. val env = TableEnvironment.create(...)
  10. // call function "inline" without registration in Table API
  11. env.from("MyTable").select(call(new SubstringFunction(true), $"myField", 5, 12))
  12. // register function
  13. env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true))

In the Table API, you can use a star expression (*) as one argument of a function call to act as a wildcard: all columns of the table are passed to the function at the corresponding position.

Java

  1. import org.apache.flink.table.annotation.DataTypeHint;
  2. import org.apache.flink.table.annotation.InputGroup;
  3. import org.apache.flink.table.api.*;
  4. import org.apache.flink.table.functions.ScalarFunction;
  5. import java.util.Arrays;
  6. import java.util.stream.Collectors;
  7. import static org.apache.flink.table.api.Expressions.*;
  8. public static class MyConcatFunction extends ScalarFunction {
  9. public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... fields) {
  10. return Arrays.stream(fields)
  11. .map(Object::toString)
  12. .collect(Collectors.joining(","));
  13. }
  14. }
  15. TableEnvironment env = TableEnvironment.create(...);
  16. // call function with $("*"), if MyTable has 3 fields (a, b, c),
  17. // all of them will be passed to MyConcatFunction.
  18. env.from("MyTable").select(call(MyConcatFunction.class, $("*")));
  19. // this is equivalent to calling the function with all columns selected explicitly.
  20. env.from("MyTable").select(call(MyConcatFunction.class, $("a"), $("b"), $("c")));

Scala

  1. import org.apache.flink.table.annotation.DataTypeHint
  2. import org.apache.flink.table.annotation.InputGroup
  3. import org.apache.flink.table.api._
  4. import org.apache.flink.table.functions.ScalarFunction
  5. import scala.annotation.varargs
  6. class MyConcatFunction extends ScalarFunction {
  7. @varargs
  8. def eval(@DataTypeHint(inputGroup = InputGroup.ANY) row: AnyRef*): String = {
  9. row.map(f => f.toString).mkString(",")
  10. }
  11. }
  12. val env = TableEnvironment.create(...)
  13. // call function with $"*", if MyTable has 3 fields (a, b, c),
  14. // all of them will be passed to MyConcatFunction.
  15. env.from("MyTable").select(call(classOf[MyConcatFunction], $"*"))
  16. // this is equivalent to calling the function with all columns selected explicitly.
  17. env.from("MyTable").select(call(classOf[MyConcatFunction], $"a", $"b", $"c"))

TableEnvironment provides two overloaded methods to create a temporary system function from a UserDefinedFunction:

  • createTemporarySystemFunction(String name, Class<? extends UserDefinedFunction> functionClass)
  • createTemporarySystemFunction(String name, UserDefinedFunction functionInstance)

It is recommended to prefer functionClass over functionInstance as long as the user-defined function provides a no-args constructor, because Flink, as the underlying framework, can then add more logic to control how new instances are created. The current built-in logic in TableEnvironmentImpl validates the class and its methods based on the concrete subclass of UserDefinedFunction, e.g. ScalarFunction or TableFunction. More logic or optimizations could be added to the framework in the future without requiring changes to existing user code.

Implementation Guide

Independent of the kind of function, all user-defined functions follow some basic implementation principles.

Function Class

An implementation class must extend from one of the available base classes (e.g. org.apache.flink.table.functions.ScalarFunction).

The class must be declared public, not abstract, and should be globally accessible. Thus, non-static inner or anonymous classes are not allowed.

For storing a user-defined function in a persistent catalog, the class must have a default constructor and must be instantiable during runtime. Anonymous functions in Table API can only be persisted if the function is not stateful (i.e. containing only transient and static fields).
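For example, a minimal function class that satisfies these requirements (the name and logic are purely illustrative) could look like this:

  // public, non-abstract, globally accessible class with a default constructor,
  // so it can also be stored in a persistent catalog
  public static class MyUpperFunction extends ScalarFunction {
    public String eval(String s) {
      return s == null ? null : s.toUpperCase();
    }
  }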

Evaluation Methods

The base class provides a set of methods that can be overridden such as open(), close(), or isDeterministic().

However, in addition to those declared methods, the main runtime logic that is applied to every incoming record must be implemented through specialized evaluation methods.

Depending on the function kind, evaluation methods such as eval(), accumulate(), or retract() are called by code-generated operators during runtime.

The methods must be declared public and take a well-defined set of arguments.

Regular JVM method calling semantics apply. Therefore, it is possible to:

  • implement overloaded methods such as eval(Integer) and eval(LocalDateTime),
  • use var-args such as eval(Integer...),
  • use object inheritance such as eval(Object) that takes both LocalDateTime and Integer,
  • and combinations of the above such as eval(Object...) that takes all kinds of arguments.

If you intend to implement functions in Scala, please add the scala.annotation.varargs annotation in case of variable arguments. Furthermore, it is recommended to use boxed primitives (e.g. java.lang.Integer instead of Int) to support NULL.

The following snippet shows an example of an overloaded function:

Java

  1. import org.apache.flink.table.functions.ScalarFunction;
  2. // function with overloaded evaluation methods
  3. public static class SumFunction extends ScalarFunction {
  4. public Integer eval(Integer a, Integer b) {
  5. return a + b;
  6. }
  7. public Integer eval(String a, String b) {
  8. return Integer.valueOf(a) + Integer.valueOf(b);
  9. }
  10. public Integer eval(Double... d) {
  11. double result = 0;
  12. for (double value : d)
  13. result += value;
  14. return (int) result;
  15. }
  16. }

Scala

  1. import org.apache.flink.table.functions.ScalarFunction
  2. import java.lang.Integer
  3. import java.lang.Double
  4. import scala.annotation.varargs
  5. // function with overloaded evaluation methods
  6. class SumFunction extends ScalarFunction {
  7. def eval(a: Integer, b: Integer): Integer = {
  8. a + b
  9. }
  10. def eval(a: String, b: String): Integer = {
  11. Integer.valueOf(a) + Integer.valueOf(b)
  12. }
  13. @varargs // generate var-args like Java
  14. def eval(d: Double*): Integer = {
  15. d.sum.toInt
  16. }
  17. }

Type Inference

The table ecosystem (similar to the SQL standard) is a strongly typed API. Therefore, both function parameters and return types must be mapped to a data type.

From a logical perspective, the planner needs information about expected types, precision, and scale. From a JVM perspective, the planner needs information about how internal data structures are represented as JVM objects when calling a user-defined function.

The logic for validating input arguments and deriving data types for both the parameters and the result of a function is summarized under the term type inference.

Flink’s user-defined functions implement an automatic type inference extraction that derives data types from the function’s class and its evaluation methods via reflection. If this implicit reflective extraction approach is not successful, the extraction process can be supported by annotating affected parameters, classes, or methods with @DataTypeHint and @FunctionHint. More examples on how to annotate functions are shown below.

If more advanced type inference logic is required, an implementer can explicitly override the getTypeInference() method in every user-defined function. However, the annotation approach is recommended because it keeps custom type inference logic close to the affected locations and falls back to the default behavior for the remaining implementation.

Automatic Type Inference

The automatic type inference inspects the function’s class and evaluation methods to derive data types for the arguments and result of a function. @DataTypeHint and @FunctionHint annotations support the automatic extraction.

For a full list of classes that can be implicitly mapped to a data type, see the data type extraction section.

@DataTypeHint

In many scenarios, it is required to support the automatic extraction inline for parameters and return types of a function.

The following example shows how to use data type hints. More information can be found in the documentation of the annotation class.

Java

  1. import org.apache.flink.table.annotation.DataTypeHint;
  2. import org.apache.flink.table.annotation.InputGroup;
  3. import org.apache.flink.table.functions.ScalarFunction;
  4. import org.apache.flink.types.Row;
  5. // function with overloaded evaluation methods
  6. public static class OverloadedFunction extends ScalarFunction {
  7. // no hint required
  8. public Long eval(long a, long b) {
  9. return a + b;
  10. }
  11. // define the precision and scale of a decimal
  12. public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
  13. return BigDecimal.valueOf(a + b);
  14. }
  15. // define a nested data type
  16. @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
  17. public Row eval(int i) {
  18. return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
  19. }
  21. // allow wildcard input and custom-serialized output
  21. @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
  22. public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
  23. return MyUtils.serializeToByteBuffer(o);
  24. }
  25. }

Scala

  1. import org.apache.flink.table.annotation.DataTypeHint
  2. import org.apache.flink.table.annotation.InputGroup
  3. import org.apache.flink.table.functions.ScalarFunction
  4. import org.apache.flink.types.Row
  5. import scala.annotation.varargs
  6. // function with overloaded evaluation methods
  7. class OverloadedFunction extends ScalarFunction {
  8. // no hint required
  9. def eval(a: Long, b: Long): Long = {
  10. a + b
  11. }
  12. // define the precision and scale of a decimal
  13. @DataTypeHint("DECIMAL(12, 3)")
  14. def eval(a: Double, b: Double): java.math.BigDecimal = {
  15. java.math.BigDecimal.valueOf(a + b)
  16. }
  17. // define a nested data type
  18. @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
  19. def eval(i: Int): Row = {
  20. Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i))
  21. }
  22. // allow wildcard input and custom-serialized output
  23. @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer])
  24. def eval(@DataTypeHint(inputGroup = InputGroup.ANY) o: AnyRef): java.nio.ByteBuffer = {
  25. MyUtils.serializeToByteBuffer(o)
  26. }
  27. }

@FunctionHint

In some scenarios, it is desirable that one evaluation method handles multiple different data types at the same time. Furthermore, in some scenarios, overloaded evaluation methods have a common result type that should be declared only once.

The @FunctionHint annotation can provide a mapping from argument data types to a result data type. It enables annotating entire function classes or evaluation methods for input, accumulator, and result data types. One or more annotations can be declared on top of a class or individually for each evaluation method for overloading function signatures. All hint parameters are optional. If a parameter is not defined, the default reflection-based extraction is used. Hint parameters defined on top of a function class are inherited by all evaluation methods.

The following example shows how to use function hints. More information can be found in the documentation of the annotation class.

Java

  1. import org.apache.flink.table.annotation.DataTypeHint;
  2. import org.apache.flink.table.annotation.FunctionHint;
  3. import org.apache.flink.table.functions.TableFunction;
  4. import org.apache.flink.types.Row;
  5. // function with overloaded evaluation methods
  6. // but globally defined output type
  7. @FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
  8. public static class OverloadedFunction extends TableFunction<Row> {
  9. public void eval(int a, int b) {
  10. collect(Row.of("Sum", a + b));
  11. }
  12. // overloading of arguments is still possible
  13. public void eval() {
  14. collect(Row.of("Empty args", -1));
  15. }
  16. }
  17. // decouples the type inference from evaluation methods,
  18. // the type inference is entirely determined by the function hints
  19. @FunctionHint(
  20. input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
  21. output = @DataTypeHint("INT")
  22. )
  23. @FunctionHint(
  24. input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
  25. output = @DataTypeHint("BIGINT")
  26. )
  27. @FunctionHint(
  28. input = {},
  29. output = @DataTypeHint("BOOLEAN")
  30. )
  31. public static class OverloadedFunction extends TableFunction<Object> {
  32. // an implementer just needs to make sure that a method exists
  33. // that can be called by the JVM
  34. public void eval(Object... o) {
  35. if (o.length == 0) {
  36. collect(false);
  37. }
  38. collect(o[0]);
  39. }
  40. }

Scala

  1. import org.apache.flink.table.annotation.DataTypeHint
  2. import org.apache.flink.table.annotation.FunctionHint
  3. import org.apache.flink.table.functions.TableFunction
  4. import org.apache.flink.types.Row
  5. // function with overloaded evaluation methods
  6. // but globally defined output type
  7. @FunctionHint(output = new DataTypeHint("ROW<s STRING, i INT>"))
  8. class OverloadedFunction extends TableFunction[Row] {
  9. def eval(a: Int, b: Int): Unit = {
  10. collect(Row.of("Sum", Int.box(a + b)))
  11. }
  12. // overloading of arguments is still possible
  13. def eval(): Unit = {
  14. collect(Row.of("Empty args", Int.box(-1)))
  15. }
  16. }
  17. // decouples the type inference from evaluation methods,
  18. // the type inference is entirely determined by the function hints
  19. @FunctionHint(
  20. input = Array(new DataTypeHint("INT"), new DataTypeHint("INT")),
  21. output = new DataTypeHint("INT")
  22. )
  23. @FunctionHint(
  24. input = Array(new DataTypeHint("BIGINT"), new DataTypeHint("BIGINT")),
  25. output = new DataTypeHint("BIGINT")
  26. )
  27. @FunctionHint(
  28. input = Array(),
  29. output = new DataTypeHint("BOOLEAN")
  30. )
  31. class OverloadedFunction extends TableFunction[AnyRef] {
  32. // an implementer just needs to make sure that a method exists
  33. // that can be called by the JVM
  34. @varargs
  35. def eval(o: AnyRef*) = {
  36. if (o.length == 0) {
  37. collect(Boolean.box(false))
  38. }
  39. collect(o(0))
  40. }
  41. }

Custom Type Inference

For most scenarios, @DataTypeHint and @FunctionHint should be sufficient to model user-defined functions. However, by overriding the automatic type inference defined in getTypeInference(), implementers can create arbitrary functions that behave like built-in system functions.

The following example implemented in Java illustrates the potential of a custom type inference logic. It uses a string literal argument to determine the result type of a function. The function takes two string arguments: the first argument represents the string to be parsed, the second argument represents the target type.

Java

  1. import org.apache.flink.table.api.DataTypes;
  2. import org.apache.flink.table.catalog.DataTypeFactory;
  3. import org.apache.flink.table.functions.ScalarFunction;
  4. import org.apache.flink.table.types.inference.TypeInference;
  5. import org.apache.flink.types.Row;
  6. public static class LiteralFunction extends ScalarFunction {
  7. public Object eval(String s, String type) {
  8. switch (type) {
  9. case "INT":
  10. return Integer.valueOf(s);
  11. case "DOUBLE":
  12. return Double.valueOf(s);
  13. case "STRING":
  14. default:
  15. return s;
  16. }
  17. }
  18. // the automatic, reflection-based type inference is disabled and
  19. // replaced by the following logic
  20. @Override
  21. public TypeInference getTypeInference(DataTypeFactory typeFactory) {
  22. return TypeInference.newBuilder()
  23. // specify typed arguments
  24. // parameters will be casted implicitly to those types if necessary
  25. .typedArguments(DataTypes.STRING(), DataTypes.STRING())
  26. // specify a strategy for the result data type of the function
  27. .outputTypeStrategy(callContext -> {
  28. if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
  29. throw callContext.newValidationError("Literal expected for second argument.");
  30. }
  31. // return a data type based on a literal
  32. final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");
  33. switch (literal) {
  34. case "INT":
  35. return Optional.of(DataTypes.INT().notNull());
  36. case "DOUBLE":
  37. return Optional.of(DataTypes.DOUBLE().notNull());
  38. case "STRING":
  39. default:
  40. return Optional.of(DataTypes.STRING());
  41. }
  42. })
  43. .build();
  44. }
  45. }

For more examples of custom type inference, see also the flink-examples-table module, which contains advanced function implementations.

Named Parameters

When calling a function, you can use parameter names to specify the values of the parameters. Named parameters allow passing both the parameter name and value to a function, avoiding confusion caused by incorrect parameter order and improving code readability and maintainability. In addition, named parameters allow optional parameters to be omitted; omitted parameters are filled with null by default. The @ArgumentHint annotation can be used to specify the name, type, and whether a parameter is required or not.

@ArgumentHint

The following 3 examples demonstrate how to use @ArgumentHint in different scopes. More information can be found in the documentation of the annotation class.

  1. Using @ArgumentHint annotation on the parameters of the eval method of the function.

    Java

  1. import org.apache.flink.table.annotation.DataTypeHint;
  2. import org.apache.flink.table.annotation.ArgumentHint;
  3. import org.apache.flink.table.functions.ScalarFunction;
  4. public static class NamedParameterClass extends ScalarFunction {
  5. // Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
  6. public String eval(@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")) String s1,
  7. @ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INT")) Integer s2) {
  8. return s1 + ", " + s2;
  9. }
  10. }

Scala

  1. import org.apache.flink.table.annotation.ArgumentHint
  2. import org.apache.flink.table.annotation.DataTypeHint
  3. import org.apache.flink.table.functions.ScalarFunction
  4. class NamedParameterClass extends ScalarFunction {
  5. // Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
  6. def eval(@ArgumentHint(name = "param1", isOptional = false, `type` = new DataTypeHint("STRING")) s1: String,
  7. @ArgumentHint(name = "param2", isOptional = true, `type` = new DataTypeHint("INTEGER")) s2: Integer) = {
  8. s1 + ", " + s2
  9. }
  10. }

  2. Using @ArgumentHint annotation on the eval method of the function.

    Java

  1. import org.apache.flink.table.annotation.ArgumentHint;
  2. import org.apache.flink.table.annotation.DataTypeHint;
  3. import org.apache.flink.table.annotation.FunctionHint;
  4. import org.apache.flink.table.functions.ScalarFunction;
  5. public static class NamedParameterClass extends ScalarFunction {
  6. // Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
  7. @FunctionHint(
  8. argument = {@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
  9. @ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))}
  10. )
  11. public String eval(String s1, Integer s2) {
  12. return s1 + ", " + s2;
  13. }
  14. }

Scala

  1. import org.apache.flink.table.annotation.ArgumentHint
  2. import org.apache.flink.table.annotation.DataTypeHint
  3. import org.apache.flink.table.annotation.FunctionHint
  4. import org.apache.flink.table.functions.ScalarFunction
  5. class NamedParameterClass extends ScalarFunction {
  6. // Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
  7. @FunctionHint(
  8. argument = Array(new ArgumentHint(name = "param1", isOptional = false, `type` = new DataTypeHint("STRING")),
  9. new ArgumentHint(name = "param2", isOptional = true, `type` = new DataTypeHint("INTEGER")))
  10. )
  11. def eval(s1: String, s2: Integer): String = {
  12. s1 + ", " + s2
  13. }
  14. }

  3. Using @ArgumentHint annotation on the class of the function.

    Java

  1. import org.apache.flink.table.annotation.ArgumentHint;
  2. import org.apache.flink.table.annotation.DataTypeHint;
  3. import org.apache.flink.table.annotation.FunctionHint;
  4. import org.apache.flink.table.functions.ScalarFunction;
  5. // Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
  6. @FunctionHint(
  7. argument = {@ArgumentHint(name = "param1", isOptional = false, type = @DataTypeHint("STRING")),
  8. @ArgumentHint(name = "param2", isOptional = true, type = @DataTypeHint("INTEGER"))}
  9. )
  10. public static class NamedParameterClass extends ScalarFunction {
  11. public String eval(String s1, Integer s2) {
  12. return s1 + ", " + s2;
  13. }
  14. }

Scala

  1. import org.apache.flink.table.annotation.ArgumentHint
  2. import org.apache.flink.table.annotation.DataTypeHint
  3. import org.apache.flink.table.annotation.FunctionHint
  4. import org.apache.flink.table.functions.ScalarFunction
  5. // Use the @ArgumentHint annotation to specify the name, type, and whether a parameter is required.
  6. @FunctionHint(
  7. argument = Array(new ArgumentHint(name = "param1", isOptional = false, `type` = new DataTypeHint("STRING")),
  8. new ArgumentHint(name = "param2", isOptional = true, `type` = new DataTypeHint("INTEGER")))
  9. )
  10. class NamedParameterClass extends ScalarFunction {
  11. def eval(s1: String, s2: Integer): String = {
  12. s1 + ", " + s2
  13. }
  14. }

  • The @ArgumentHint annotation already contains a @DataTypeHint annotation, so it cannot be combined with @DataTypeHint in a @FunctionHint. When applied to function parameters, @ArgumentHint cannot be used together with @DataTypeHint either; using @ArgumentHint is recommended.
  • Named parameters only take effect if the function class contains neither overloaded evaluation methods nor variable-argument methods; otherwise, using named parameters will cause an error.
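For illustration, assuming the NamedParameterClass above is registered under the hypothetical name "NamedParameterFunction", a call with named parameters can omit the optional argument:

  // register the function (the registration name is illustrative)
  env.createTemporarySystemFunction("NamedParameterFunction", NamedParameterClass.class);
  // call with named parameters; the optional param2 is omitted and filled with null
  env.sqlQuery("SELECT NamedParameterFunction(param1 => myField) FROM MyTable");
  // named parameters can also be passed in arbitrary order
  env.sqlQuery("SELECT NamedParameterFunction(param2 => 42, param1 => myField) FROM MyTable");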

Determinism

Every user-defined function class can declare whether it produces deterministic results or not by overriding the isDeterministic() method. If the function is not purely functional (like random(), date(), or now()), the method must return false. By default, isDeterministic() returns true.

Furthermore, the isDeterministic() method might also influence the runtime behavior. A runtime implementation might be called at two different stages:

1. During planning (i.e. pre-flight phase): If a function is called with constant expressions or constant expressions can be derived from the given statement, a function is pre-evaluated for constant expression reduction and might not be executed on the cluster anymore, unless isDeterministic() is used to disable constant expression reduction in this case. For example, the following calls to ABS are executed during planning: SELECT ABS(-1) FROM t and SELECT ABS(field) FROM t WHERE field = -1; whereas SELECT ABS(field) FROM t is not.

2. During runtime (i.e. cluster execution): If a function is called with non-constant expressions or isDeterministic() returns false.
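As a minimal sketch (the function itself is illustrative), a non-deterministic function declares this by overriding isDeterministic():

  import java.util.concurrent.ThreadLocalRandom;
  import org.apache.flink.table.functions.ScalarFunction;

  public static class RandomIntBelow extends ScalarFunction {

    public int eval(int bound) {
      // returns a different result for the same input
      return ThreadLocalRandom.current().nextInt(bound);
    }

    @Override
    public boolean isDeterministic() {
      // tells the planner not to pre-evaluate calls with constant arguments
      return false;
    }
  }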

System (Built-in) Function Determinism

The determinism of system (built-in) functions is immutable. There exist two kinds of functions which are not deterministic: dynamic functions and non-deterministic functions, according to Apache Calcite’s SqlOperator definition:

  1. /**
  2. * Returns whether a call to this operator is guaranteed to always return
  3. * the same result given the same operands; true is assumed by default.
  4. */
  5. public boolean isDeterministic() {
  6. return true;
  7. }
  8. /**
  9. * Returns whether it is unsafe to cache query plans referencing this
  10. * operator; false is assumed by default.
  11. */
  12. public boolean isDynamicFunction() {
  13. return false;
  14. }

isDeterministic indicates the determinism of a function: if it returns false, the function is evaluated per record during runtime. isDynamicFunction, if it returns true, implies that the function can only be evaluated at query start; such a function is pre-evaluated only during planning for batch mode. For streaming mode, a dynamic function is equivalent to a non-deterministic function because the query is logically executed continuously (the abstraction of a continuous query over dynamic tables), so dynamic functions are also re-evaluated for each query execution (equivalent to per record in the current implementation).

The following system functions are always non-deterministic (evaluated per record during runtime in both batch and streaming mode):

  • UUID
  • RAND
  • RAND_INTEGER
  • CURRENT_DATABASE
  • UNIX_TIMESTAMP
  • CURRENT_ROW_TIMESTAMP

The following system temporal functions are dynamic; they are pre-evaluated during planning (query start) for batch mode and evaluated per record for streaming mode:

  • CURRENT_DATE
  • CURRENT_TIME
  • CURRENT_TIMESTAMP
  • NOW
  • LOCALTIME
  • LOCALTIMESTAMP

Note: isDynamicFunction is only applicable for system functions.

Runtime Integration

Sometimes it might be necessary for a user-defined function to get global runtime information or do some setup/clean-up work before the actual work. User-defined functions provide open() and close() methods that can be overridden and provide similar functionality as the methods in RichFunction of DataStream API.

The open() method is called once before the evaluation method. The close() method is called after the last call to the evaluation method.

The open() method provides a FunctionContext that contains information about the context in which user-defined functions are executed, such as the metric group, the distributed cache files, or the global job parameters.

The following information can be obtained by calling the corresponding methods of FunctionContext:

  • getMetricGroup(): Metric group for this parallel subtask.
  • getCachedFile(name): Local temporary file copy of a distributed cache file.
  • getJobParameter(name, defaultValue): Global job parameter value associated with the given key.
  • getExternalResourceInfos(resourceName): Set of external resource infos associated with the given key.

Note: Depending on the context in which the function is executed, not all methods from above might be available. For example, during constant expression reduction adding a metric is a no-op operation.

The following example snippet shows how to use FunctionContext in a scalar function for accessing a global job parameter:

Java

  1. import org.apache.flink.table.api.*;
  2. import org.apache.flink.table.functions.FunctionContext;
  3. import org.apache.flink.table.functions.ScalarFunction;
  4. public static class HashCodeFunction extends ScalarFunction {
  5. private int factor = 0;
  6. @Override
  7. public void open(FunctionContext context) throws Exception {
  8. // access the global "hashcode_factor" parameter
  9. // "12" would be the default value if the parameter does not exist
  10. factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
  11. }
  12. public int eval(String s) {
  13. return s.hashCode() * factor;
  14. }
  15. }
  16. TableEnvironment env = TableEnvironment.create(...);
  17. // add job parameter
  18. env.getConfig().addJobParameter("hashcode_factor", "31");
  19. // register the function
  20. env.createTemporarySystemFunction("hashCode", HashCodeFunction.class);
  21. // use the function
  22. env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable");

Scala

  1. import org.apache.flink.table.api._
  2. import org.apache.flink.table.functions.FunctionContext
  3. import org.apache.flink.table.functions.ScalarFunction
  4. class HashCodeFunction extends ScalarFunction {
  5. private var factor: Int = 0
  6. override def open(context: FunctionContext): Unit = {
  7. // access the global "hashcode_factor" parameter
  8. // "12" would be the default value if the parameter does not exist
  9. factor = context.getJobParameter("hashcode_factor", "12").toInt
  10. }
  11. def eval(s: String): Int = {
  12. s.hashCode * factor
  13. }
  14. }
  15. val env = TableEnvironment.create(...)
  16. // add job parameter
  17. env.getConfig.addJobParameter("hashcode_factor", "31")
  18. // register the function
  19. env.createTemporarySystemFunction("hashCode", classOf[HashCodeFunction])
  20. // use the function
  21. env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable")

Scalar Functions

A user-defined scalar function maps zero, one, or multiple scalar values to a new scalar value. Any data type listed in the data types section can be used as a parameter or return type of an evaluation method.

In order to define a scalar function, one has to extend the base class ScalarFunction in org.apache.flink.table.functions and implement one or more evaluation methods named eval(...).

The following example shows how to define your own hash code function and call it in a query. See the Implementation Guide for more details.

Java

  1. import org.apache.flink.table.annotation.DataTypeHint;
  2. import org.apache.flink.table.annotation.InputGroup;
  3. import org.apache.flink.table.api.*;
  4. import org.apache.flink.table.functions.ScalarFunction;
  5. import static org.apache.flink.table.api.Expressions.*;
  6. public static class HashFunction extends ScalarFunction {
  7. // take any data type and return INT
  8. public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
  9. return o.hashCode();
  10. }
  11. }
  12. TableEnvironment env = TableEnvironment.create(...);
  13. // call function "inline" without registration in Table API
  14. env.from("MyTable").select(call(HashFunction.class, $("myField")));
  15. // register function
  16. env.createTemporarySystemFunction("HashFunction", HashFunction.class);
  17. // call registered function in Table API
  18. env.from("MyTable").select(call("HashFunction", $("myField")));
  19. // call registered function in SQL
  20. env.sqlQuery("SELECT HashFunction(myField) FROM MyTable");

Scala

  1. import org.apache.flink.table.annotation.DataTypeHint
  2. import org.apache.flink.table.annotation.InputGroup
  3. import org.apache.flink.table.api._
  4. import org.apache.flink.table.functions.ScalarFunction
  5. class HashFunction extends ScalarFunction {
  6. // take any data type and return INT
  7. def eval(@DataTypeHint(inputGroup = InputGroup.ANY) o: AnyRef): Int = {
  8. o.hashCode()
  9. }
  10. }
  11. val env = TableEnvironment.create(...)
  12. // call function "inline" without registration in Table API
  13. env.from("MyTable").select(call(classOf[HashFunction], $"myField"))
  14. // register function
  15. env.createTemporarySystemFunction("HashFunction", classOf[HashFunction])
  16. // call registered function in Table API
  17. env.from("MyTable").select(call("HashFunction", $"myField"))
  18. // call registered function in SQL
  19. env.sqlQuery("SELECT HashFunction(myField) FROM MyTable")

If you intend to implement or call functions in Python, please refer to the Python Scalar Functions documentation for more details.

Table Functions

Similar to a user-defined scalar function, a user-defined table function (UDTF) takes zero, one, or multiple scalar values as input arguments. However, it can return an arbitrary number of rows (or structured types) as output instead of a single value. The returned record may consist of one or more fields. If an output record consists of only a single field, the structured record can be omitted, and a scalar value can be emitted that will be implicitly wrapped into a row by the runtime.

In order to define a table function, one has to extend the base class TableFunction in org.apache.flink.table.functions and implement one or more evaluation methods named eval(...). Similar to other functions, input and output data types are automatically extracted using reflection. This includes the generic argument T of the class for determining an output data type. In contrast to scalar functions, the evaluation method itself must not have a return type, instead, table functions provide a collect(T) method that can be called within every evaluation method for emitting zero, one, or more records.

In the Table API, a table function is used with .joinLateral(...) or .leftOuterJoinLateral(...). The joinLateral operator (cross) joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator). The leftOuterJoinLateral operator joins each row from the outer table (table on the left of the operator) with all rows produced by the table-valued function (which is on the right side of the operator) and preserves outer rows for which the table function returns an empty table.

In SQL, use LATERAL TABLE(<TableFunction>) with JOIN or LEFT JOIN with an ON TRUE join condition.

The following example shows how to define your own split function and call it in a query. See the Implementation Guide for more details.

Java

  1. import org.apache.flink.table.annotation.DataTypeHint;
  2. import org.apache.flink.table.annotation.FunctionHint;
  3. import org.apache.flink.table.api.*;
  4. import org.apache.flink.table.functions.TableFunction;
  5. import org.apache.flink.types.Row;
  6. import static org.apache.flink.table.api.Expressions.*;
  7. @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
  8. public static class SplitFunction extends TableFunction<Row> {
  9. public void eval(String str) {
  10. for (String s : str.split(" ")) {
  11. // use collect(...) to emit a row
  12. collect(Row.of(s, s.length()));
  13. }
  14. }
  15. }
  16. TableEnvironment env = TableEnvironment.create(...);
  17. // call function "inline" without registration in Table API
  18. env
  19. .from("MyTable")
  20. .joinLateral(call(SplitFunction.class, $("myField")))
  21. .select($("myField"), $("word"), $("length"));
  22. env
  23. .from("MyTable")
  24. .leftOuterJoinLateral(call(SplitFunction.class, $("myField")))
  25. .select($("myField"), $("word"), $("length"));
  26. // rename fields of the function in Table API
  27. env
  28. .from("MyTable")
  29. .leftOuterJoinLateral(call(SplitFunction.class, $("myField")).as("newWord", "newLength"))
  30. .select($("myField"), $("newWord"), $("newLength"));
  31. // register function
  32. env.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
  33. // call registered function in Table API
  34. env
  35. .from("MyTable")
  36. .joinLateral(call("SplitFunction", $("myField")))
  37. .select($("myField"), $("word"), $("length"));
  38. env
  39. .from("MyTable")
  40. .leftOuterJoinLateral(call("SplitFunction", $("myField")))
  41. .select($("myField"), $("word"), $("length"));
  42. // call registered function in SQL
  43. env.sqlQuery(
  44. "SELECT myField, word, length " +
  45. "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
  46. env.sqlQuery(
  47. "SELECT myField, word, length " +
  48. "FROM MyTable " +
  49. "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE");
  50. // rename fields of the function in SQL
  51. env.sqlQuery(
  52. "SELECT myField, newWord, newLength " +
  53. "FROM MyTable " +
  54. "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE");

Scala

  1. import org.apache.flink.table.annotation.DataTypeHint
  2. import org.apache.flink.table.annotation.FunctionHint
  3. import org.apache.flink.table.api._
  4. import org.apache.flink.table.functions.TableFunction
  5. import org.apache.flink.types.Row
  6. @FunctionHint(output = new DataTypeHint("ROW<word STRING, length INT>"))
  7. class SplitFunction extends TableFunction[Row] {
  8. def eval(str: String): Unit = {
  9. // use collect(...) to emit a row
  10. str.split(" ").foreach(s => collect(Row.of(s, Int.box(s.length))))
  11. }
  12. }
  13. val env = TableEnvironment.create(...)
  14. // call function "inline" without registration in Table API
  15. env
  16. .from("MyTable")
  17. .joinLateral(call(classOf[SplitFunction], $"myField"))
  18. .select($"myField", $"word", $"length")
  19. env
  20. .from("MyTable")
  21. .leftOuterJoinLateral(call(classOf[SplitFunction], $"myField"))
  22. .select($"myField", $"word", $"length")
  23. // rename fields of the function in Table API
  24. env
  25. .from("MyTable")
  26. .leftOuterJoinLateral(call(classOf[SplitFunction], $"myField").as("newWord", "newLength"))
  27. .select($"myField", $"newWord", $"newLength")
  28. // register function
  29. env.createTemporarySystemFunction("SplitFunction", classOf[SplitFunction])
  30. // call registered function in Table API
  31. env
  32. .from("MyTable")
  33. .joinLateral(call("SplitFunction", $"myField"))
  34. .select($"myField", $"word", $"length")
  35. env
  36. .from("MyTable")
  37. .leftOuterJoinLateral(call("SplitFunction", $"myField"))
  38. .select($"myField", $"word", $"length")
  39. // call registered function in SQL
  40. env.sqlQuery(
  41. "SELECT myField, word, length " +
  42. "FROM MyTable, LATERAL TABLE(SplitFunction(myField))")
  43. env.sqlQuery(
  44. "SELECT myField, word, length " +
  45. "FROM MyTable " +
  46. "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE")
  47. // rename fields of the function in SQL
  48. env.sqlQuery(
  49. "SELECT myField, newWord, newLength " +
  50. "FROM MyTable " +
  51. "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE")

If you intend to implement functions in Scala, do not implement a table function as a Scala object. Scala objects are singletons and will cause concurrency issues.

If you intend to implement or call functions in Python, please refer to the Python Table Functions documentation for more details.

Aggregate Functions

A user-defined aggregate function (UDAGG) maps scalar values of multiple rows to a new scalar value.

The behavior of an aggregate function is centered around the concept of an accumulator. The accumulator is an intermediate data structure that stores the aggregated values until a final aggregation result is computed.

For each set of rows that needs to be aggregated, the runtime will create an empty accumulator by calling createAccumulator(). Subsequently, the accumulate(...) method of the function is called for each input row to update the accumulator. Once all rows have been processed, the getValue(...) method of the function is called to compute and return the final result.

The following example illustrates the aggregation process:

UDAGG mechanism

In the example, we assume a table that contains data about beverages. The table consists of three columns (id, name, and price) and 5 rows. We would like to find the highest price of all beverages in the table, i.e., perform a max() aggregation. We need to consider each of the 5 rows. The result is a single numeric value.

In order to define an aggregate function, one has to extend the base class AggregateFunction in org.apache.flink.table.functions and implement one or more evaluation methods named accumulate(...). An accumulate method must be declared publicly and not static. Accumulate methods can also be overloaded by implementing multiple methods named accumulate.

By default, input, accumulator, and output data types are automatically extracted using reflection. This includes the generic argument ACC of the class for determining an accumulator data type and the generic argument T for determining the result data type. Input arguments are derived from one or more accumulate(...) methods. See the Implementation Guide for more details.

If you intend to implement or call functions in Python, please refer to the Python Functions documentation for more details.

The following example shows how to define your own aggregate function and call it in a query.

Java

  1. import org.apache.flink.table.api.*;
  2. import org.apache.flink.table.functions.AggregateFunction;
  3. import static org.apache.flink.table.api.Expressions.*;
  4. // mutable accumulator of structured type for the aggregate function
  5. public static class WeightedAvgAccumulator {
  6. public long sum = 0;
  7. public int count = 0;
  8. }
  9. // function that takes (value BIGINT, weight INT), stores intermediate results in a structured
  10. // type of WeightedAvgAccumulator, and returns the weighted average as BIGINT
  11. public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccumulator> {
  12. @Override
  13. public WeightedAvgAccumulator createAccumulator() {
  14. return new WeightedAvgAccumulator();
  15. }
  16. @Override
  17. public Long getValue(WeightedAvgAccumulator acc) {
  18. if (acc.count == 0) {
  19. return null;
  20. } else {
  21. return acc.sum / acc.count;
  22. }
  23. }
  24. public void accumulate(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {
  25. acc.sum += iValue * iWeight;
  26. acc.count += iWeight;
  27. }
  28. public void retract(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {
  29. acc.sum -= iValue * iWeight;
  30. acc.count -= iWeight;
  31. }
  32. public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {
  33. for (WeightedAvgAccumulator a : it) {
  34. acc.count += a.count;
  35. acc.sum += a.sum;
  36. }
  37. }
  38. public void resetAccumulator(WeightedAvgAccumulator acc) {
  39. acc.count = 0;
  40. acc.sum = 0L;
  41. }
  42. }
  43. TableEnvironment env = TableEnvironment.create(...);
  44. // call function "inline" without registration in Table API
  45. env
  46. .from("MyTable")
  47. .groupBy($("myField"))
  48. .select($("myField"), call(WeightedAvg.class, $("value"), $("weight")));
  49. // register function
  50. env.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);
  51. // call registered function in Table API
  52. env
  53. .from("MyTable")
  54. .groupBy($("myField"))
  55. .select($("myField"), call("WeightedAvg", $("value"), $("weight")));
  56. // call registered function in SQL
  57. env.sqlQuery(
  58. "SELECT myField, WeightedAvg(`value`, weight) FROM MyTable GROUP BY myField"
  59. );

Scala

  1. import org.apache.flink.table.api._
  2. import org.apache.flink.table.functions.AggregateFunction
  3. // mutable accumulator of structured type for the aggregate function
  4. case class WeightedAvgAccumulator(
  5. var sum: Long = 0,
  6. var count: Int = 0
  7. )
  8. // function that takes (value BIGINT, weight INT), stores intermediate results in a structured
  9. // type of WeightedAvgAccumulator, and returns the weighted average as BIGINT
  10. class WeightedAvg extends AggregateFunction[java.lang.Long, WeightedAvgAccumulator] {
  11. override def createAccumulator(): WeightedAvgAccumulator = {
  12. WeightedAvgAccumulator()
  13. }
  14. override def getValue(acc: WeightedAvgAccumulator): java.lang.Long = {
  15. if (acc.count == 0) {
  16. null
  17. } else {
  18. acc.sum / acc.count
  19. }
  20. }
  21. def accumulate(acc: WeightedAvgAccumulator, iValue: java.lang.Long, iWeight: java.lang.Integer): Unit = {
  22. acc.sum += iValue * iWeight
  23. acc.count += iWeight
  24. }
  25. def retract(acc: WeightedAvgAccumulator, iValue: java.lang.Long, iWeight: java.lang.Integer): Unit = {
  26. acc.sum -= iValue * iWeight
  27. acc.count -= iWeight
  28. }
  29. def merge(acc: WeightedAvgAccumulator, it: java.lang.Iterable[WeightedAvgAccumulator]): Unit = {
  30. val iter = it.iterator()
  31. while (iter.hasNext) {
  32. val a = iter.next()
  33. acc.count += a.count
  34. acc.sum += a.sum
  35. }
  36. }
  37. def resetAccumulator(acc: WeightedAvgAccumulator): Unit = {
  38. acc.count = 0
  39. acc.sum = 0L
  40. }
  41. }
  42. val env = TableEnvironment.create(...)
  43. // call function "inline" without registration in Table API
  44. env
  45. .from("MyTable")
  46. .groupBy($"myField")
  47. .select($"myField", call(classOf[WeightedAvg], $"value", $"weight"))
  48. // register function
  49. env.createTemporarySystemFunction("WeightedAvg", classOf[WeightedAvg])
  50. // call registered function in Table API
  51. env
  52. .from("MyTable")
  53. .groupBy($"myField")
  54. .select($"myField", call("WeightedAvg", $"value", $"weight"))
  55. // call registered function in SQL
  56. env.sqlQuery(
  57. "SELECT myField, WeightedAvg(`value`, weight) FROM MyTable GROUP BY myField"
  58. )

The accumulate(...) method of our WeightedAvg class takes three inputs. The first one is the accumulator and the other two are user-defined inputs. In order to calculate a weighted average value, the accumulator needs to store the weighted sum and count of all the data that has been accumulated. In our example, we define a class WeightedAvgAccumulator to be the accumulator. Accumulators are automatically managed by Flink’s checkpointing mechanism and are restored in case of a failure to ensure exactly-once semantics.

Mandatory and Optional Methods

The following methods are mandatory for each AggregateFunction:

  • createAccumulator()
  • accumulate(...)
  • getValue(...)

Additionally, there are a few methods that can be optionally implemented. While some of these methods allow the system to execute queries more efficiently, others are mandatory for certain use cases. For instance, the merge(...) method is mandatory if the aggregation function should be applied in the context of a session group window (the accumulators of two session windows need to be joined when a row is observed that “connects” them).

The following methods of AggregateFunction are required depending on the use case:

  • retract(...) is required for aggregations on OVER windows.
  • merge(...) is required for many bounded aggregations as well as session window and hop window aggregations. Besides, this method is also helpful for optimizations. For example, the two-phase aggregation optimization requires all aggregate functions to support the merge method.

If the aggregate function can only be applied in an OVER window, this can be declared by returning the requirement FunctionRequirement.OVER_WINDOW_ONLY in getRequirements().
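For example, such a requirement could be declared inside the aggregate function class as follows (only the overridden method is shown; the surrounding class is omitted):

  import java.util.EnumSet;
  import java.util.Set;
  import org.apache.flink.table.functions.FunctionRequirement;

  // placed inside the AggregateFunction subclass
  @Override
  public Set<FunctionRequirement> getRequirements() {
    // restrict the function to OVER window aggregations
    return EnumSet.of(FunctionRequirement.OVER_WINDOW_ONLY);
  }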

If an accumulator needs to store large amounts of data, org.apache.flink.table.api.dataview.ListView and org.apache.flink.table.api.dataview.MapView provide advanced features for leveraging Flink’s state backends in unbounded data scenarios. Please see the docs of the corresponding classes for more information about this advanced feature.
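The following is a minimal sketch (names and logic are illustrative) of an accumulator that uses a ListView; note that the view's operations may throw checked exceptions, so the accumulate method declares them:

  import org.apache.flink.table.api.dataview.ListView;
  import org.apache.flink.table.functions.AggregateFunction;

  // accumulator whose list is backed by a state backend in unbounded scenarios
  public static class CollectStringsAccumulator {
    public ListView<String> values = new ListView<>();
  }

  public static class CollectStrings extends AggregateFunction<String, CollectStringsAccumulator> {

    @Override
    public CollectStringsAccumulator createAccumulator() {
      return new CollectStringsAccumulator();
    }

    public void accumulate(CollectStringsAccumulator acc, String value) throws Exception {
      if (value != null) {
        acc.values.add(value);
      }
    }

    @Override
    public String getValue(CollectStringsAccumulator acc) {
      try {
        // concatenate all collected values
        return String.join(",", acc.values.get());
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }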

Since some of the methods are optional, or can be overloaded, the runtime invokes aggregate function methods via generated code. This means the base class does not always provide a signature to be overridden by the concrete implementation. Nevertheless, all mentioned methods must be declared publicly, not static, and named exactly as the names mentioned above to be called.

Detailed documentation for all methods that are not declared in AggregateFunction and called by generated code is given below.

accumulate(...)

Java

  1. /*
  2. * Processes the input values and updates the provided accumulator instance. The method
  3. * accumulate can be overloaded with different custom types and arguments. An aggregate function
  4. * requires at least one accumulate() method.
  5. *
  6. * param: accumulator the accumulator which contains the current aggregated results
  7. * param: [user defined inputs] the input value (usually obtained from new arrived data).
  8. */
  9. public void accumulate(ACC accumulator, [user defined inputs])

Scala

  1. /*
  2. * Processes the input values and updates the provided accumulator instance. The method
  3. * accumulate can be overloaded with different custom types and arguments. An aggregate function
  4. * requires at least one accumulate() method.
  5. *
  6. * param: accumulator the accumulator which contains the current aggregated results
  7. * param: [user defined inputs] the input value (usually obtained from new arrived data).
  8. */
  9. def accumulate(accumulator: ACC, [user defined inputs]): Unit

retract(...)

Java

  1. /*
  2. * Retracts the input values from the accumulator instance. The current design assumes the
  3. * inputs are the values that have been previously accumulated. The method retract can be
  4. * overloaded with different custom types and arguments. This method must be implemented for
  5. * bounded OVER aggregates over unbounded tables.
  6. *
  7. * param: accumulator the accumulator which contains the current aggregated results
  8. * param: [user defined inputs] the input value (usually obtained from new arrived data).
  9. */
  10. public void retract(ACC accumulator, [user defined inputs])

Scala

  1. /*
  2. * Retracts the input values from the accumulator instance. The current design assumes the
  3. * inputs are the values that have been previously accumulated. The method retract can be
  4. * overloaded with different custom types and arguments. This method must be implemented for
  5. * bounded OVER aggregates over unbounded tables.
  6. *
  7. * param: accumulator the accumulator which contains the current aggregated results
  8. * param: [user defined inputs] the input value (usually obtained from new arrived data).
  9. */
  10. def retract(accumulator: ACC, [user defined inputs]): Unit

merge(...)

Java

  1. /*
  2. * Merges a group of accumulator instances into one accumulator instance. This method must be
  3. * implemented for unbounded session window grouping aggregates and bounded grouping aggregates.
  4. *
  5. * param: accumulator the accumulator which will keep the merged aggregate results. It should
  6. * be noted that the accumulator may contain the previous aggregated
  7. * results. Therefore user should not replace or clean this instance in the
  8. * custom merge method.
  9. * param: iterable an java.lang.Iterable pointed to a group of accumulators that will be
  10. * merged.
  11. */
  12. public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)

Scala

  1. /*
  2. * Merges a group of accumulator instances into one accumulator instance. This method must be
  3. * implemented for unbounded session window grouping aggregates and bounded grouping aggregates.
  4. *
  5. * param: accumulator the accumulator which will keep the merged aggregate results. It should
  6. * be noted that the accumulator may contain the previous aggregated
  7. * results. Therefore user should not replace or clean this instance in the
  8. * custom merge method.
  9. * param: iterable an java.lang.Iterable pointed to a group of accumulators that will be
  10. * merged.
  11. */
  12. def merge(accumulator: ACC, iterable: java.lang.Iterable[ACC]): Unit

If you intend to implement or call functions in Python, please refer to the Python Aggregate Functions documentation for more details.

Table Aggregate Functions

A user-defined table aggregate function (UDTAGG) maps scalar values of multiple rows to zero, one, or multiple rows (or structured types). The returned record may consist of one or more fields. If an output record consists of only a single field, the structured record can be omitted, and a scalar value can be emitted that will be implicitly wrapped into a row by the runtime.

Similar to an aggregate function, the behavior of a table aggregate is centered around the concept of an accumulator. The accumulator is an intermediate data structure that stores the aggregated values until a final aggregation result is computed.

For each set of rows that needs to be aggregated, the runtime will create an empty accumulator by calling createAccumulator(). Subsequently, the accumulate(...) method of the function is called for each input row to update the accumulator. Once all rows have been processed, the emitValue(...) or emitUpdateWithRetract(...) method of the function is called to compute and return the final result.

The following example illustrates the aggregation process:

UDTAGG mechanism

In the example, we assume a table that contains data about beverages. The table consists of three columns (id, name, and price) and 5 rows. We would like to find the 2 highest prices of all beverages in the table, i.e., perform a TOP2() table aggregation. We need to consider each of the 5 rows. The result is a table with the top 2 values.

In order to define a table aggregate function, one has to extend the base class TableAggregateFunction in org.apache.flink.table.functions and implement one or more evaluation methods named accumulate(...). An accumulate method must be declared publicly and not static. Accumulate methods can also be overloaded by implementing multiple methods named accumulate.

By default, input, accumulator, and output data types are automatically extracted using reflection. This includes the generic argument ACC of the class for determining the accumulator data type and the generic argument T for determining the result data type. Input arguments are derived from one or more accumulate(...) methods. See the Implementation Guide for more details.

If you intend to implement or call functions in Python, please refer to the Python Functions documentation for more details.

The following example shows how to define your own table aggregate function and call it in a query.

Java

  1. import org.apache.flink.api.java.tuple.Tuple2;
  2. import org.apache.flink.table.api.*;
  3. import org.apache.flink.table.functions.TableAggregateFunction;
  4. import org.apache.flink.util.Collector;
  5. import static org.apache.flink.table.api.Expressions.*;
  6. // mutable accumulator of structured type for the aggregate function
  7. public static class Top2Accumulator {
  8. public Integer first;
  9. public Integer second;
  10. }
  11. // function that takes (value INT), stores intermediate results in a structured
  12. // type of Top2Accumulator, and returns the result as a structured type of Tuple2<Integer, Integer>
  13. // for value and rank
  14. public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accumulator> {
  15. @Override
  16. public Top2Accumulator createAccumulator() {
  17. Top2Accumulator acc = new Top2Accumulator();
  18. acc.first = Integer.MIN_VALUE;
  19. acc.second = Integer.MIN_VALUE;
  20. return acc;
  21. }
  22. public void accumulate(Top2Accumulator acc, Integer value) {
  23. if (value > acc.first) {
  24. acc.second = acc.first;
  25. acc.first = value;
  26. } else if (value > acc.second) {
  27. acc.second = value;
  28. }
  29. }
  30. public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
  31. for (Top2Accumulator otherAcc : it) {
  32. accumulate(acc, otherAcc.first);
  33. accumulate(acc, otherAcc.second);
  34. }
  35. }
  36. public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) {
  37. // emit the value and rank
  38. if (acc.first != Integer.MIN_VALUE) {
  39. out.collect(Tuple2.of(acc.first, 1));
  40. }
  41. if (acc.second != Integer.MIN_VALUE) {
  42. out.collect(Tuple2.of(acc.second, 2));
  43. }
  44. }
  45. }
  46. TableEnvironment env = TableEnvironment.create(...);
  47. // call function "inline" without registration in Table API
  48. env
  49. .from("MyTable")
  50. .groupBy($("myField"))
  51. .flatAggregate(call(Top2.class, $("value")))
  52. .select($("myField"), $("f0"), $("f1"));
  53. // call function "inline" without registration in Table API
  54. // but use an alias for a better naming of Tuple2's fields
  55. env
  56. .from("MyTable")
  57. .groupBy($("myField"))
  58. .flatAggregate(call(Top2.class, $("value")).as("value", "rank"))
  59. .select($("myField"), $("value"), $("rank"));
  60. // register function
  61. env.createTemporarySystemFunction("Top2", Top2.class);
  62. // call registered function in Table API
  63. env
  64. .from("MyTable")
  65. .groupBy($("myField"))
  66. .flatAggregate(call("Top2", $("value")).as("value", "rank"))
  67. .select($("myField"), $("value"), $("rank"));

Scala

  1. import java.lang.Integer
  2. import org.apache.flink.api.java.tuple.Tuple2
  3. import org.apache.flink.table.api._
  4. import org.apache.flink.table.functions.TableAggregateFunction
  5. import org.apache.flink.util.Collector
  6. // mutable accumulator of structured type for the aggregate function
  7. case class Top2Accumulator(
  8. var first: Integer,
  9. var second: Integer
  10. )
  11. // function that takes (value INT), stores intermediate results in a structured
  12. // type of Top2Accumulator, and returns the result as a structured type of Tuple2[Integer, Integer]
  13. // for value and rank
  14. class Top2 extends TableAggregateFunction[Tuple2[Integer, Integer], Top2Accumulator] {
  15. override def createAccumulator(): Top2Accumulator = {
  16. Top2Accumulator(
  17. Integer.MIN_VALUE,
  18. Integer.MIN_VALUE
  19. )
  20. }
  21. def accumulate(acc: Top2Accumulator, value: Integer): Unit = {
  22. if (value > acc.first) {
  23. acc.second = acc.first
  24. acc.first = value
  25. } else if (value > acc.second) {
  26. acc.second = value
  27. }
  28. }
  29. def merge(acc: Top2Accumulator, it: java.lang.Iterable[Top2Accumulator]): Unit = {
  30. val iter = it.iterator()
  31. while (iter.hasNext) {
  32. val otherAcc = iter.next()
  33. accumulate(acc, otherAcc.first)
  34. accumulate(acc, otherAcc.second)
  35. }
  36. }
  37. def emitValue(acc: Top2Accumulator, out: Collector[Tuple2[Integer, Integer]]): Unit = {
  38. // emit the value and rank
  39. if (acc.first != Integer.MIN_VALUE) {
  40. out.collect(Tuple2.of(acc.first, 1))
  41. }
  42. if (acc.second != Integer.MIN_VALUE) {
  43. out.collect(Tuple2.of(acc.second, 2))
  44. }
  45. }
  46. }
  47. val env = TableEnvironment.create(...)
  48. // call function "inline" without registration in Table API
  49. env
  50. .from("MyTable")
  51. .groupBy($"myField")
  52. .flatAggregate(call(classOf[Top2], $"value"))
  53. .select($"myField", $"f0", $"f1")
  54. // call function "inline" without registration in Table API
  55. // but use an alias for a better naming of Tuple2's fields
  56. env
  57. .from("MyTable")
  58. .groupBy($"myField")
  59. .flatAggregate(call(classOf[Top2], $"value").as("value", "rank"))
  60. .select($"myField", $"value", $"rank")
  61. // register function
  62. env.createTemporarySystemFunction("Top2", classOf[Top2])
  63. // call registered function in Table API
  64. env
  65. .from("MyTable")
  66. .groupBy($"myField")
  67. .flatAggregate(call("Top2", $"value").as("value", "rank"))
  68. .select($"myField", $"value", $"rank")

The accumulate(...) method of our Top2 class takes two inputs. The first one is the accumulator and the second one is the user-defined input. In order to calculate a result, the accumulator needs to store the 2 highest values of all the data that has been accumulated. Accumulators are automatically managed by Flink’s checkpointing mechanism and are restored in case of a failure to ensure exactly-once semantics. The result values are emitted together with a ranking index.
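
To make this call sequence concrete, the following sketch drives the Top2 function from the Java example above by hand. It is only an illustration of the protocol, not how Flink invokes the function internally, and the input values are invented for this sketch.

  1. import java.util.Arrays;
  2. import java.util.List;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.util.Collector;
  5. // hypothetical input values of a single group
  6. List<Integer> rowsOfGroup = Arrays.asList(3, 7, 5);
  7. Top2 top2 = new Top2();
  8. // one empty accumulator is created per group
  9. Top2Accumulator acc = top2.createAccumulator();
  10. // accumulate(...) is called once per input row
  11. for (Integer value : rowsOfGroup) {
  12. top2.accumulate(acc, value);
  13. }
  14. // emitValue(...) materializes the result rows; here they are simply printed
  15. top2.emitValue(acc, new Collector<Tuple2<Integer, Integer>>() {
  16. @Override
  17. public void collect(Tuple2<Integer, Integer> record) {
  18. System.out.println(record); // prints (7,1) and then (5,2)
  19. }
  20. @Override
  21. public void close() {}
  22. });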

Mandatory and Optional Methods

The following methods are mandatory for each TableAggregateFunction:

  • createAccumulator()
  • accumulate(...)
  • emitValue(...) or emitUpdateWithRetract(...)

Additionally, there are a few methods that can be optionally implemented. While some of these methods allow the system to execute queries more efficiently, others are mandatory for certain use cases. For instance, the merge(...) method is mandatory if the aggregation function should be applied in the context of a session group window (the accumulators of two session windows need to be joined when a row is observed that “connects” them).

The following methods of TableAggregateFunction are required depending on the use case:

  • retract(...) is required for aggregations on OVER windows.
  • merge(...) is required for many bounded aggregations and unbounded session and hop window aggregations.
  • emitValue(...) is required for bounded and window aggregations.

The following methods of TableAggregateFunction are used to improve the performance of streaming jobs:

  • emitUpdateWithRetract(...) is used to emit values that have been updated under retract mode.

The emitValue(...) method always emits the full data according to the accumulator. In unbounded scenarios, this may cause performance problems. Take a Top N function as an example: emitValue(...) would emit all N values each time. To improve performance, one can implement emitUpdateWithRetract(...), which outputs data incrementally in retract mode. In other words, once there is an update, the method can retract old records before sending new, updated ones. If both methods are defined, emitUpdateWithRetract(...) is used in preference to emitValue(...).

If the table aggregate function can only be applied in an OVER window, this can be declared by returning the requirement FunctionRequirement.OVER_WINDOW_ONLY in getRequirements().
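
For illustration, a hypothetical variant of the Top2 function from above that may only be used in OVER windows could declare this requirement as follows (a sketch):

  1. import java.util.Collections;
  2. import java.util.Set;
  3. import org.apache.flink.table.functions.FunctionRequirement;
  4. // hypothetical subclass of the Top2 function defined above
  5. public static class Top2OverWindowOnly extends Top2 {
  6. @Override
  7. public Set<FunctionRequirement> getRequirements() {
  8. return Collections.singleton(FunctionRequirement.OVER_WINDOW_ONLY);
  9. }
  10. }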

If an accumulator needs to store large amounts of data, org.apache.flink.table.api.dataview.ListView and org.apache.flink.table.api.dataview.MapView provide advanced features for leveraging Flink’s state backends in unbounded data scenarios. Please see the docs of the corresponding classes for more information about this advanced feature.
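
As a sketch, an accumulator that keeps its large collections in data views could look as follows; the field names and element types are illustrative assumptions, not part of the examples above.

  1. import org.apache.flink.table.api.dataview.ListView;
  2. import org.apache.flink.table.api.dataview.MapView;
  3. // hypothetical accumulator whose collections are backed by Flink state
  4. // instead of being fully materialized on the heap
  5. public static class LargeStateAccumulator {
  6. // all values observed so far
  7. public ListView<Integer> values = new ListView<>();
  8. // a counter per key
  9. public MapView<String, Long> countsPerKey = new MapView<>();
  10. }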

Since some of the methods are optional or can be overloaded, the methods are called by generated code. The base class does not always provide a signature to be overridden by the concrete implementation class. Nevertheless, all mentioned methods must be declared publicly, not static, and named exactly as mentioned above in order to be called.

Detailed documentation for all methods that are not declared in TableAggregateFunction and called by generated code is given below.

accumulate(...)

Java

  1. /*
  2. * Processes the input values and updates the provided accumulator instance. The method
  3. * accumulate can be overloaded with different custom types and arguments. An aggregate function
  4. * requires at least one accumulate() method.
  5. *
  6. * param: accumulator the accumulator which contains the current aggregated results
  7. * param: [user defined inputs] the input value (usually obtained from newly arrived data).
  8. */
  9. public void accumulate(ACC accumulator, [user defined inputs])

Scala

  1. /*
  2. * Processes the input values and updates the provided accumulator instance. The method
  3. * accumulate can be overloaded with different custom types and arguments. An aggregate function
  4. * requires at least one accumulate() method.
  5. *
  6. * param: accumulator the accumulator which contains the current aggregated results
  7. * param: [user defined inputs] the input value (usually obtained from newly arrived data).
  8. */
  9. def accumulate(accumulator: ACC, [user defined inputs]): Unit

retract(...)

Java

  1. /*
  2. * Retracts the input values from the accumulator instance. The current design assumes the
  3. * inputs are the values that have been previously accumulated. The method retract can be
  4. * overloaded with different custom types and arguments. This method must be implemented for
  5. * bounded OVER aggregates over unbounded tables.
  6. *
  7. * param: accumulator the accumulator which contains the current aggregated results
  8. * param: [user defined inputs] the input value (usually obtained from newly arrived data).
  9. */
  10. public void retract(ACC accumulator, [user defined inputs])

Scala

  1. /*
  2. * Retracts the input values from the accumulator instance. The current design assumes the
  3. * inputs are the values that have been previously accumulated. The method retract can be
  4. * overloaded with different custom types and arguments. This method must be implemented for
  5. * bounded OVER aggregates over unbounded tables.
  6. *
  7. * param: accumulator the accumulator which contains the current aggregated results
  8. * param: [user defined inputs] the input value (usually obtained from newly arrived data).
  9. */
  10. def retract(accumulator: ACC, [user defined inputs]): Unit
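
The examples on this page do not implement retract(...), since retracting from Top2 would require remembering every observed value. As an illustration of the contract only, the following hypothetical function maintains a running sum per group and simply undoes a previously accumulated value:

  1. import org.apache.flink.table.functions.TableAggregateFunction;
  2. import org.apache.flink.util.Collector;
  3. // hypothetical accumulator holding a running sum
  4. public static class SumAccumulator {
  5. public long sum = 0L;
  6. }
  7. // hypothetical table aggregate emitting a single running sum per group
  8. public static class RunningSum extends TableAggregateFunction<Long, SumAccumulator> {
  9. @Override
  10. public SumAccumulator createAccumulator() {
  11. return new SumAccumulator();
  12. }
  13. public void accumulate(SumAccumulator acc, Long value) {
  14. acc.sum += value;
  15. }
  16. public void retract(SumAccumulator acc, Long value) {
  17. // undo a value that has been accumulated before
  18. acc.sum -= value;
  19. }
  20. public void emitValue(SumAccumulator acc, Collector<Long> out) {
  21. out.collect(acc.sum);
  22. }
  23. }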

merge(...)

Java

  1. /*
  2. * Merges a group of accumulator instances into one accumulator instance. This method must be
  3. * implemented for unbounded session window grouping aggregates and bounded grouping aggregates.
  4. *
  5. * param: accumulator the accumulator which will keep the merged aggregate results. It should
  6. * be noted that the accumulator may contain the previous aggregated
  7. * results. Therefore, the user should not replace or clean this instance in the
  8. * custom merge method.
  9. * param: iterable a java.lang.Iterable pointing to a group of accumulators that will be
  10. * merged.
  11. */
  12. public void merge(ACC accumulator, java.lang.Iterable<ACC> iterable)

Scala

  1. /*
  2. * Merges a group of accumulator instances into one accumulator instance. This method must be
  3. * implemented for unbounded session window grouping aggregates and bounded grouping aggregates.
  4. *
  5. * param: accumulator the accumulator which will keep the merged aggregate results. It should
  6. * be noted that the accumulator may contain the previous aggregated
  7. * results. Therefore, the user should not replace or clean this instance in the
  8. * custom merge method.
  9. * param: iterable a java.lang.Iterable pointing to a group of accumulators that will be
  10. * merged.
  11. */
  12. def merge(accumulator: ACC, iterable: java.lang.Iterable[ACC]): Unit

emitValue(...)

Java

  1. /*
  2. * Called every time an aggregation result should be materialized. The returned value could
  3. * be either an early and incomplete result (periodically emitted as data arrives) or the final
  4. * result of the aggregation.
  5. *
  6. * param: accumulator the accumulator which contains the current aggregated results
  7. * param: out the collector used to output data.
  8. */
  9. public void emitValue(ACC accumulator, org.apache.flink.util.Collector<T> out)

Scala

  1. /*
  2. * Called every time an aggregation result should be materialized. The returned value could
  3. * be either an early and incomplete result (periodically emitted as data arrives) or the final
  4. * result of the aggregation.
  5. *
  6. * param: accumulator the accumulator which contains the current aggregated results
  7. * param: out the collector used to output data.
  8. */
  9. def emitValue(accumulator: ACC, out: org.apache.flink.util.Collector[T]): Unit

emitUpdateWithRetract(...)

Java

  1. /*
  2. * Called every time an aggregation result should be materialized. The returned value could
  3. * be either an early and incomplete result (periodically emitted as data arrives) or the final
  4. * result of the aggregation.
  5. *
  6. * Compared to emitValue(), emitUpdateWithRetract() is used to emit values that have been updated. This method
  7. * outputs data incrementally in retraction mode (also known as "update before" and "update after"). Once
  8. * there is an update, old records have to be retracted before new, updated ones are sent. The emitUpdateWithRetract()
  9. * method will be used in preference to the emitValue() method if both methods are defined in the table aggregate
  10. * function, because it is considered more efficient than emitValue() as it can output
  11. * values incrementally.
  12. *
  13. * param: accumulator the accumulator which contains the current aggregated results
  14. * param: out the retractable collector used to output data. Use the collect() method
  15. * to output (add) records and the retract() method to retract (delete)
  16. * records.
  17. */
  18. public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out)

Scala

  1. /*
  2. * Called every time an aggregation result should be materialized. The returned value could
  3. * be either an early and incomplete result (periodically emitted as data arrives) or the final
  4. * result of the aggregation.
  5. *
  6. * Compared to emitValue(), emitUpdateWithRetract() is used to emit values that have been updated. This method
  7. * outputs data incrementally in retraction mode (also known as "update before" and "update after"). Once
  8. * there is an update, old records have to be retracted before new, updated ones are sent. The emitUpdateWithRetract()
  9. * method will be used in preference to the emitValue() method if both methods are defined in the table aggregate
  10. * function, because it is considered more efficient than emitValue() as it can output
  11. * values incrementally.
  12. *
  13. * param: accumulator the accumulator which contains the current aggregated results
  14. * param: out the retractable collector used to output data. Use the collect() method
  15. * to output (add) records and the retract() method to retract (delete)
  16. * records.
  17. */
  18. def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): Unit

Retraction Example

The following example shows how to use the emitUpdateWithRetract(...) method to emit only incremental updates. In order to do so, the accumulator keeps both the old and new top 2 values.

Note: Do not update the accumulator within emitUpdateWithRetract, because after function#emitUpdateWithRetract has been invoked, GroupTableAggFunction will not re-invoke function#getAccumulators to write the latest accumulator back to state.

Java

  1. import org.apache.flink.api.java.tuple.Tuple2;
  2. import org.apache.flink.table.functions.TableAggregateFunction;
  3. import org.apache.flink.table.functions.TableAggregateFunction.RetractableCollector;
  4. public static class Top2WithRetractAccumulator {
  5. public Integer first;
  6. public Integer second;
  7. public Integer oldFirst;
  8. public Integer oldSecond;
  9. }
  10. public static class Top2WithRetract
  11. extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2WithRetractAccumulator> {
  12. @Override
  13. public Top2WithRetractAccumulator createAccumulator() {
  14. Top2WithRetractAccumulator acc = new Top2WithRetractAccumulator();
  15. acc.first = Integer.MIN_VALUE;
  16. acc.second = Integer.MIN_VALUE;
  17. acc.oldFirst = Integer.MIN_VALUE;
  18. acc.oldSecond = Integer.MIN_VALUE;
  19. return acc;
  20. }
  21. public void accumulate(Top2WithRetractAccumulator acc, Integer v) {
  22. acc.oldFirst = acc.first;
  23. acc.oldSecond = acc.second;
  24. if (v > acc.first) {
  25. acc.second = acc.first;
  26. acc.first = v;
  27. } else if (v > acc.second) {
  28. acc.second = v;
  29. }
  30. }
  31. public void emitUpdateWithRetract(
  32. Top2WithRetractAccumulator acc,
  33. RetractableCollector<Tuple2<Integer, Integer>> out) {
  34. if (!acc.first.equals(acc.oldFirst)) {
  35. // if there is an update, retract the old value then emit a new value
  36. if (acc.oldFirst != Integer.MIN_VALUE) {
  37. out.retract(Tuple2.of(acc.oldFirst, 1));
  38. }
  39. out.collect(Tuple2.of(acc.first, 1));
  40. }
  41. if (!acc.second.equals(acc.oldSecond)) {
  42. // if there is an update, retract the old value then emit a new value
  43. if (acc.oldSecond != Integer.MIN_VALUE) {
  44. out.retract(Tuple2.of(acc.oldSecond, 2));
  45. }
  46. out.collect(Tuple2.of(acc.second, 2));
  47. }
  48. }
  49. }

Scala

  1. import org.apache.flink.api.java.tuple.Tuple2
  2. import org.apache.flink.table.functions.TableAggregateFunction
  3. import org.apache.flink.table.functions.TableAggregateFunction.RetractableCollector
  4. case class Top2WithRetractAccumulator(
  5. var first: Integer,
  6. var second: Integer,
  7. var oldFirst: Integer,
  8. var oldSecond: Integer
  9. )
  10. class Top2WithRetract
  11. extends TableAggregateFunction[Tuple2[Integer, Integer], Top2WithRetractAccumulator] {
  12. override def createAccumulator(): Top2WithRetractAccumulator = {
  13. Top2WithRetractAccumulator(
  14. Integer.MIN_VALUE,
  15. Integer.MIN_VALUE,
  16. Integer.MIN_VALUE,
  17. Integer.MIN_VALUE
  18. )
  19. }
  20. def accumulate(acc: Top2WithRetractAccumulator, value: Integer): Unit = {
  21. acc.oldFirst = acc.first
  22. acc.oldSecond = acc.second
  23. if (value > acc.first) {
  24. acc.second = acc.first
  25. acc.first = value
  26. } else if (value > acc.second) {
  27. acc.second = value
  28. }
  29. }
  30. def emitUpdateWithRetract(
  31. acc: Top2WithRetractAccumulator,
  32. out: RetractableCollector[Tuple2[Integer, Integer]])
  33. : Unit = {
  34. if (!acc.first.equals(acc.oldFirst)) {
  35. // if there is an update, retract the old value then emit a new value
  36. if (acc.oldFirst != Integer.MIN_VALUE) {
  37. out.retract(Tuple2.of(acc.oldFirst, 1))
  38. }
  39. out.collect(Tuple2.of(acc.first, 1))
  40. }
  41. if (!acc.second.equals(acc.oldSecond)) {
  42. // if there is an update, retract the old value then emit a new value
  43. if (acc.oldSecond != Integer.MIN_VALUE) {
  44. out.retract(Tuple2.of(acc.oldSecond, 2))
  45. }
  46. out.collect(Tuple2.of(acc.second, 2))
  47. }
  48. }
  49. }
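
The retractable variant can then be registered and called like the Top2 function earlier on this page. A sketch in the Table API, assuming a table MyTable with the columns myField and value:

  1. import org.apache.flink.table.api.*;
  2. import static org.apache.flink.table.api.Expressions.*;
  3. TableEnvironment env = TableEnvironment.create(...);
  4. // register function
  5. env.createTemporarySystemFunction("Top2WithRetract", Top2WithRetract.class);
  6. // call registered function in Table API and alias the Tuple2 fields
  7. env
  8. .from("MyTable")
  9. .groupBy($("myField"))
  10. .flatAggregate(call("Top2WithRetract", $("value")).as("value", "rank"))
  11. .select($("myField"), $("value"), $("rank"));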