UDF功能

详细介绍一下如何使用UDF功能

整体步骤说明

  • 在本地按UDF函数格式 编写udf 函数 ,并打包称jar包文件
  • 在【Scriptis >> 工作空间】上传至工作空间对应的目录
  • 在 【管理台>>UDF函数】 创建udf (默认加载)
  • 在任务代码中使用(对于新起的引擎才生效)

Step1 本地编写jar包

Hive UDF示例:

  1. 引入 hive 依赖
  1. <dependency>
  2. <groupId>org.apache.hive</groupId>
  3. <artifactId>hive-exec</artifactId>
  4. <version>3.1.3</version>
  5. </dependency>
  1. 编写UDF 类
  1. import org.apache.hadoop.hive.ql.exec.UDF;
  2. public class UDFExample extends UDF {
  3. public Integer evaluate(Integer value) {
  4. return value == null ? null : value + 1;
  5. }
  6. }
  7. 3. 编译打包
  8. ```shell
  9. mvn package

Step2【Scriptis >> 工作空间】上传jar包 选择对应的文件夹 鼠标右键 选择上传

UDF功能 - 图1

Step3【管理台>>UDF函数】 创建UDF

  • 函数名称:符合规则即可,如test_udf_jar 在sql等脚本中使用
  • 函数类型:通用
  • 脚本路径:选择jar包存放的共享目录路径 如 ../../../wds_functions_1_0_0.jar
  • 注册格式:包名+类名,如 com.webank.wedatasphere.willink.bdp.udf.ToUpperCase
  • 使用格式:输入类型与返回类型,需与jar包里定义一致
  • 分类:下拉选择;或者输入自定义目录(会在个人函数下新建目标一级目录)

UDF功能 - 图2

注意 新建的udf 函数 是默认加载的 可以在 【Scriptis >> UDF函数】 页面查看到,方便大家在Scriptis 任务编辑时 方便查看,勾选中的UDF函数 表明是会被加载使用的

UDF功能 - 图3

Step4 使用该udf函数

在任务中 使用上述步骤创新的udf 函数 函数名为 【创建UDF】 函数名称 在pyspark中: print (sqlContext.sql(“select test_udf_jar(name1) from stacyyan_ind.result_sort_1_20200226”).collect())

整体步骤说明

  • 在【Scriptis >> 工作空间】在需要的目录下新建Spark脚本文件
  • 在 【管理台>>UDF函数】 创建udf (默认加载)
  • 在任务代码中使用(对于新起的引擎才生效)

Step1 dss-scriptis-新建scala脚本

UDF功能 - 图4

def helloWorld(str: String): String = “hello, “ + str

Step2 创建UDF

  • 函数名称:符合规则即可,如test_udf_scala
  • 函数类型:spark
  • 脚本路径:../../../b
  • 注册格式:输入类型与返回类型,需与定义一致;注册格式需定义的函数名严格保持一致,如helloWorld
  • 分类:下拉选择dss-scriptis-UDF函数-个人函数下存在的一级目录;或者输入自定义目录(会在个人函数下新建目标一级目录)

UDF功能 - 图5

Step3 使用该udf函数

在任务中 使用上述步骤创建新的udf 函数 函数名为 【创建UDF】 函数名称

  • 在scala中 val s=sqlContext.sql(“select test_udf_scala(name1) from stacyyan_ind.result_sort_1_20200226”) show(s)
  • 在pyspark中 print(sqlContext.sql(“select test_udf_scala(name1) from stacyyan_ind.result_sort_1_20200226”).collect());
  • 在sql中 select test_udf_scala(name1) from stacyyan_ind.result_sort_1_20200226;

整体步骤说明

  • 在【Scriptis >> 工作空间】在需要的目录下新建Python脚本文件
  • 在 【管理台>>UDF函数】 创建udf (默认加载)
  • 在任务代码中使用(对于新起的引擎才生效)

Step1 dss-scriptis-新建pyspark脚本

UDF功能 - 图6

def addation(a, b): return a + b Step2 创建UDF

  • 函数名称:符合规则即可,如test_udf_py
  • 函数类型:spark
  • 脚本路径:../../../a
  • 注册格式:需定义的函数名严格保持一致,如addation
  • 使用格式:输入类型与返回类型,需与定义一致
  • 分类:下拉选择dss-scriptis-UDF函数-个人函数下存在的一级目录;或者输入自定义目录(会在个人函数下新建目标一级目录)

UDF功能 - 图7

Step3 使用该udf函数 在任务中 使用上述步骤创建新的udf 函数 函数名为 【创建UDF】 函数名称

  • 在pyspark中 print(sqlContext.sql(“select test_udf_py(pv,impression) from neiljianliu_ind.alias where entityid=504059 limit 50”).collect());
  • 在sql中 select test_udf_py(pv,impression) from neiljianliu_ind.alias where entityid=504059 limit 50

整体步骤说明

  • 在【Scriptis >> 工作空间】在需要的目录下新建Spark Scala脚本文件
  • 在 【管理台>>UDF函数】 创建udf (默认加载)
  • 在任务代码中使用(对于新起的引擎才生效)

Step1 dss-scriptis-新建scala脚本 def hellozdy(str:String):String = “hellozdy,haha “ + str

Step2 创建函数

  • 函数名称:需与定义的函数名严格保持一致,如hellozdy
  • 函数类型:自定义函数
  • 脚本路径:../../../d
  • 使用格式:输入类型与返回类型,需与定义一致
  • 分类:下拉选择dss-scriptis-方法函数-个人函数下存在的一级目录;或者输入自定义目录(会在个人函数下新建目标一级目录)

Step3 使用该函数 在任务中 使用上述步骤创建新的udf 函数 函数名为 【创建UDF】 函数名称 val a = hellozdy(“abcd”); print(a)

“FAILED: SemanticException [Error 10011]: Invalid function xxxx”

UDF功能 - 图8

  • 首先检查UDF函数配置是否正确:

    UDF功能 - 图9

  • 注册格式即为函数路径名称:

    UDF功能 - 图10

  • 检查scriptis-udf函数-查看加载的函数是否勾选,当函数未勾选时,引擎启动时将不会加载udf

    UDF功能 - 图11

  • 检查引擎是否已加载UDF,如果未加载,请重新另起一个引擎或者重启当前引擎 备注:只有当引擎初始化时,才会加载UDF,中途新增UDF,当前引擎将无法感知并且无法进行加载