Hive Functions

Hive User Defined Functions

Users can use their existing Hive User Defined Functions in Flink.

Supported UDF types include:

  • UDF
  • GenericUDF
  • GenericUDTF
  • UDAF
  • GenericUDAFResolver2

Upon query planning and execution, Hive's UDF and GenericUDF are automatically translated into Flink's ScalarFunction, Hive's GenericUDTF into Flink's TableFunction, and Hive's UDAF and GenericUDAFResolver2 into Flink's AggregateFunction.

To use a Hive User Defined Function, users have to:

  • set a HiveCatalog backed by Hive Metastore that contains that function as the current catalog of the session
  • include a jar that contains that function in Flink's classpath
  • use the Blink planner (a minimal setup sketch follows this list).
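
For example, a minimal setup in Java might look like the following sketch. The catalog name, default database, Hive conf directory, and Hive version are placeholder values, and the jar containing the functions is assumed to be on Flink's classpath already:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class HiveFunctionSetup {

        public static void main(String[] args) {
            // Hive functions require the Blink planner.
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
            TableEnvironment tableEnv = TableEnvironment.create(settings);

            // A HiveCatalog backed by the Hive Metastore that contains the functions.
            // "myhive", "default", the conf dir, and the Hive version are example values.
            HiveCatalog hiveCatalog = new HiveCatalog(
                "myhive", "default", "/opt/hive-conf", "2.3.4");
            tableEnv.registerCatalog("myhive", hiveCatalog);

            // Make it the current catalog of the session so the functions can be resolved.
            tableEnv.useCatalog("myhive");
        }
    }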

Using Hive User Defined Functions

Assuming we have the following Hive functions registered in Hive Metastore:

    // Imports shared by the examples below (each public class would normally
    // live in its own file). checkArgument is assumed to come from a utility
    // such as Guava's Preconditions.
    import java.util.Collections;

    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    import static com.google.common.base.Preconditions.checkArgument;

    /**
     * Test simple udf. Registered under name 'myudf'.
     */
    public class TestHiveSimpleUDF extends UDF {

        public IntWritable evaluate(IntWritable i) {
            return new IntWritable(i.get());
        }

        public Text evaluate(Text text) {
            return new Text(text.toString());
        }
    }

    /**
     * Test generic udf. Registered under name 'mygenericudf'.
     */
    public class TestHiveGenericUDF extends GenericUDF {

        @Override
        public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
            checkArgument(arguments.length == 2);

            // The second argument must be the integer constant 1.
            checkArgument(arguments[1] instanceof ConstantObjectInspector);
            Object constant = ((ConstantObjectInspector) arguments[1]).getWritableConstantValue();
            checkArgument(constant instanceof IntWritable);
            checkArgument(((IntWritable) constant).get() == 1);

            if (arguments[0] instanceof IntObjectInspector ||
                    arguments[0] instanceof StringObjectInspector) {
                return arguments[0];
            } else {
                throw new RuntimeException("Unsupported argument: " + arguments[0]);
            }
        }

        @Override
        public Object evaluate(DeferredObject[] arguments) throws HiveException {
            return arguments[0].get();
        }

        @Override
        public String getDisplayString(String[] children) {
            return "TestHiveGenericUDF";
        }
    }

    /**
     * Test split udtf. Registered under name 'myudtf'.
     */
    public class TestHiveUDTF extends GenericUDTF {

        @Override
        public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
            checkArgument(argOIs.length == 2);

            // Test for constant arguments: the second argument must be the integer constant 1.
            checkArgument(argOIs[1] instanceof ConstantObjectInspector);
            Object constant = ((ConstantObjectInspector) argOIs[1]).getWritableConstantValue();
            checkArgument(constant instanceof IntWritable);
            checkArgument(((IntWritable) constant).get() == 1);

            return ObjectInspectorFactory.getStandardStructObjectInspector(
                Collections.singletonList("col1"),
                Collections.singletonList(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
        }

        @Override
        public void process(Object[] args) throws HiveException {
            String str = (String) args[0];
            // Emit each comma-separated token twice.
            for (String s : str.split(",")) {
                forward(s);
                forward(s);
            }
        }

        @Override
        public void close() {
        }
    }
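
If the functions are not yet in the Metastore, they can be registered from the Hive CLI. A sketch, assuming the classes above are packaged in a jar at a hypothetical HDFS path:

    hive> CREATE FUNCTION myudf AS 'TestHiveSimpleUDF' USING JAR 'hdfs:///path/to/udf.jar';
    hive> CREATE FUNCTION mygenericudf AS 'TestHiveGenericUDF' USING JAR 'hdfs:///path/to/udf.jar';
    hive> CREATE FUNCTION myudtf AS 'TestHiveUDTF' USING JAR 'hdfs:///path/to/udf.jar';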

From the Hive CLI, we can see that they are registered:

    hive> show functions;
    OK
    ......
    mygenericudf
    myudf
    myudtf

Then, users can use them in SQL as:

    Flink SQL> select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b, s from mysourcetable, lateral table(myudtf(name, 1)) as T(s);
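
The same query can also be submitted programmatically through the Table API. A sketch, assuming a tableEnv configured as in the setup example above and a registered table mysourcetable:

    // Table is org.apache.flink.table.api.Table.
    Table result = tableEnv.sqlQuery(
        "select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b, s " +
        "from mysourcetable, lateral table(myudtf(name, 1)) as T(s)");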

Limitations

Hive built-in functions are currently not supported out of the box in Flink. To use a Hive built-in function, users must first register it manually in Hive Metastore.
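
For example, Hive's built-in lower is implemented by the class org.apache.hadoop.hive.ql.udf.UDFLower, so it could be registered under a name of your choosing (mylower here is an example) from the Hive CLI:

    hive> CREATE FUNCTION mylower AS 'org.apache.hadoop.hive.ql.udf.UDFLower';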

Support for Hive functions has only been tested with Flink batch jobs in the Blink planner.

Hive functions currently cannot be used across catalogs in Flink.

Please refer to the Hive documentation for data type limitations.