Hive HLL UDF

Hive HLL UDF 提供了在 hive 表中生成 HLL 运算等 UDF,Hive 中的 HLL 与 Doris HLL 完全一致,Hive 中的 HLL 可以通过 Spark HLL Load 导入 Doris。关于 HLL 更多介绍可以参考:使用 HLL 近似去重

函数简介:

  1. UDAF

    · to_hll:聚合函数,返回一个 Doris HLL 列,类似于 to_bitmap 函数

    · hll_union:聚合函数,功能同 Doris 的 BE 同名函数,计算分组的并集,返回一个 Doris HLL 列,类似于 bitmap_union 函数

  2. UDF

    · hll_cardinality:返回添加到 HLL 的不同元素的数量,类似于 bitmap_count 函数

    主要目的:

  3. 减少数据导入 Doris 时间 , 除去了构建字典、HLL 预聚合等流程;

  4. 节省 Hive 存储,使用 HLL 对数据压缩,极大减少了存储成本,相对于 Bitmap 的统计更加节省存储;

  5. 提供在 Hive 中 HLL 的灵活运算:并集、基数统计,计算后的 HLL 也可以直接导入 Doris;

    注意事项: HLL 统计为近似计算有一定误差,大概 1%~2% 左右。

使用方法

在 Hive 中创建 HLL 类型和普通表,往普通表插入测试数据

  1. -- 创建一个测试数据库,以 hive_test 为例:
  2. use hive_test;
  3. -- 例子:创建 Hive HLL
  4. CREATE TABLE IF NOT EXISTS `hive_hll_table`(
  5. `k1` int COMMENT '',
  6. `k2` String COMMENT '',
  7. `k3` String COMMENT '',
  8. `uuid` binary COMMENT 'hll'
  9. ) comment 'comment'
  10. -- 例子:创建普通 Hive 表,插入测试数据
  11. CREATE TABLE IF NOT EXISTS `hive_table`(
  12. `k1` int COMMENT '',
  13. `k2` String COMMENT '',
  14. `k3` String COMMENT '',
  15. `uuid` int COMMENT ''
  16. ) comment 'comment'
  17. insert into hive_table select 1, 'a', 'b', 12345;
  18. insert into hive_table select 1, 'a', 'c', 12345;
  19. insert into hive_table select 2, 'b', 'c', 23456;
  20. insert into hive_table select 3, 'c', 'd', 34567;

Hive HLL UDF 使用:

Hive HLL UDF 需要在 Hive/Spark 中使用,首先需要编译 fe 得到 hive-udf.jar。 编译准备工作:如果进行过 ldb 源码编译可直接编译 fe,如果没有进行过 ldb 源码编译,则需要手动安装 thrift,可参考:FE 开发环境搭建 中的编译与安装

  1. --clone doris 源码
  2. git clone https://github.com/apache/doris.git
  3. cd doris
  4. git submodule update --init --recursive
  5. --安装 thrift,已安装可略过
  6. --进入 fe 目录
  7. cd fe
  8. --执行 maven 打包命令(fe 的子 module 会全部打包)
  9. mvn package -Dmaven.test.skip=true
  10. --也可以只打 hive-udf module
  11. mvn package -pl hive-udf -am -Dmaven.test.skip=true
  12. -- 打包编译完成进入 hive-udf 目录会有 target 目录,里面就会有打包完成的 hive-udf.jar
  13. -- 需要将编译好的 hive-udf.jar 包上传至 HDFS,这里以传至 hdfs 的根目录为示例:
  14. hdfs dfs -put hive-udf/target/hive-udf.jar /

下面进入 Hive 中进行 SQL 语句操作:

  1. -- 加载 hive hll udf jar 包,根据实际情况更改 hostname port
  2. add jar hdfs://hostname:port/hive-udf.jar;
  3. -- 创建 UDAF 函数
  4. create temporary function to_hll as 'org.apache.doris.udf.ToHllUDAF' USING JAR 'hdfs://hostname:port/hive-udf.jar';
  5. create temporary function hll_union as 'org.apache.doris.udf.HllUnionUDAF' USING JAR 'hdfs://hostname:port/hive-udf.jar';
  6. -- 创建 UDF 函数
  7. create temporary function hll_cardinality as 'org.apache.doris.udf.HllCardinalityUDF' USING JAR 'hdfs://node:9000/hive-udf.jar';
  8. -- 例子:通过 to_hll 这个 UDAF 进行聚合生成 hll 写入 Hive HLL
  9. insert into hive_hll_table
  10. select
  11. k1,
  12. k2,
  13. k3,
  14. to_hll(uuid) as uuid
  15. from
  16. hive_table
  17. group by
  18. k1,
  19. k2,
  20. k3
  21. -- 例子:hll_cardinality 计算 hll 中元素个数
  22. select k1, k2, k3, hll_cardinality(uuid) from hive_hll_table;
  23. +-----+-----+-----+------+
  24. | k1 | k2 | k3 | _c3 |
  25. +-----+-----+-----+------+
  26. | 1 | a | b | 1 |
  27. | 1 | a | c | 1 |
  28. | 2 | b | c | 1 |
  29. | 3 | c | d | 1 |
  30. +-----+-----+-----+------+
  31. -- 例子:hll_union 用于计算分组后的 hll 并集,将返回 3
  32. select k1, hll_union(uuid) from hive_hll_table group by k1;
  33. -- 例子:也可以合并后继续统计
  34. select k3, hll_cardinality(hll_union(uuid)) from hive_hll_table group by k3;
  35. +-----+------+
  36. | k3 | _c1 |
  37. +-----+------+
  38. | b | 1 |
  39. | c | 2 |
  40. | d | 1 |
  41. +-----+------+

Hive HLL UDF 说明

Hive HLL 导入 doris

方法一:Catalog(推荐)

创建 Hive 表指定为 TEXT 格式,对于 Binary 类型,Hive 会以 base64 编码的字符串形式保存,此时可以通过 Hive Catalog 的形式,直接将 HLL 数据通过 hll_from_base64 函数插入到 Doris 内部。

以下是一个完整的例子:

  1. 在 Hive 中创建 Hive 表
  1. CREATE TABLE IF NOT EXISTS `hive_hll_table`(
  2. `k1` int COMMENT '',
  3. `k2` String COMMENT '',
  4. `k3` String COMMENT '',
  5. `uuid` binary COMMENT 'hll'
  6. ) stored as textfile
  7. -- 然后可以沿用前面的步骤基于普通表使用 to_hll 函数往 hive_hll_table 插入数据,这里不再赘述
  1. 在 Doris 中创建 Catalog
  1. CREATE CATALOG hive PROPERTIES (
  2. 'type'='hms',
  3. 'hive.metastore.uris' = 'thrift://127.0.0.1:9083'
  4. );
  1. 创建 Doris 内表
  1. CREATE TABLE IF NOT EXISTS `doris_test`.`doris_hll_table`(
  2. `k1` int COMMENT '',
  3. `k2` varchar(10) COMMENT '',
  4. `k3` varchar(10) COMMENT '',
  5. `uuid` HLL HLL_UNION COMMENT 'hll'
  6. )
  7. AGGREGATE KEY(k1, k2, k3)
  8. DISTRIBUTED BY HASH(`k1`) BUCKETS 1
  9. PROPERTIES (
  10. "replication_allocation" = "tag.location.default: 1"
  11. );
  1. 从 Hive 插入数据到 Doris 中
  1. insert into doris_hll_table select k1, k2, k3, hll_from_base64(uuid) from hive.hive_test.hive_hll_table;
  2. -- 可以查看导入后的数据,结合 hll_to_base64 进行解码
  3. select *, hll_to_base64(uuid) from doris_hll_table;
  4. +------+------+------+------+---------------------+
  5. | k1 | k2 | k3 | uuid | hll_to_base64(uuid) |
  6. +------+------+------+------+---------------------+
  7. | 1 | a | b | NULL | AQFw+a9MhpKhoQ== |
  8. | 1 | a | c | NULL | AQFw+a9MhpKhoQ== |
  9. | 2 | b | c | NULL | AQGyB7kbWBxh+A== |
  10. | 3 | c | d | NULL | AQFYbJB5VpNBhg== |
  11. +------+------+------+------+---------------------+
  12. -- 也可以进一步使用 Doris 原生的 HLL 函数进行统计,可以看到和前面在 Hive 中统计的结果一致
  13. select k3, hll_cardinality(hll_union(uuid)) from doris_hll_table group by k3;
  14. +------+----------------------------------+
  15. | k3 | hll_cardinality(hll_union(uuid)) |
  16. +------+----------------------------------+
  17. | b | 1 |
  18. | d | 1 |
  19. | c | 2 |
  20. +------+----------------------------------+
  21. -- 此时,查外表的数据,也就是查导入前的数据进行统计、对比也能校验数据正确性
  22. select k3, hll_cardinality(hll_union(hll_from_base64(uuid))) from hive.hive_test.hive_hll_table group by k3;
  23. +------+---------------------------------------------------+
  24. | k3 | hll_cardinality(hll_union(hll_from_base64(uuid))) |
  25. +------+---------------------------------------------------+
  26. | d | 1 |
  27. | b | 1 |
  28. | c | 2 |
  29. +------+---------------------------------------------------+

方法二:Spark Load

详见:Spark Load -> 基本操作 -> 创建导入 (示例 3:上游数据源是 hive binary 类型情况)