聚合数据模型,也称为 Aggregate 数据模型。

下面以实际的例子来说明什么是聚合模型,以及如何正确的使用聚合模型。

导入数据聚合

假设业务有如下数据表模式:

ColumnNameTypeAggregationTypeComment
user_idLARGEINT用户 id
dateDATE数据灌入日期
cityVARCHAR(20)用户所在城市
ageSMALLINT用户年龄
sexTINYINT用户性别
last_visit_dateDATETIMEREPLACE用户最后一次访问时间
costBIGINTSUM用户总消费
max_dwell_timeINTMAX用户最大停留时间
min_dwell_timeINTMIN用户最小停留时间

如果转换成建表语句则如下(省略建表语句中的 Partition 和 Distribution 信息)

  1. CREATE TABLE IF NOT EXISTS example_tbl_agg1
  2. (
  3. `user_id` LARGEINT NOT NULL COMMENT "用户id",
  4. `date` DATE NOT NULL COMMENT "数据灌入日期时间",
  5. `city` VARCHAR(20) COMMENT "用户所在城市",
  6. `age` SMALLINT COMMENT "用户年龄",
  7. `sex` TINYINT COMMENT "用户性别",
  8. `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
  9. `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
  10. `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
  11. `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
  12. )
  13. AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
  14. DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
  15. PROPERTIES (
  16. "replication_allocation" = "tag.location.default: 1"
  17. );

这是一个典型的用户信息和访问行为的事实表。在一般星型模型中,用户信息和访问行为一般分别存放在维度表和事实表中。这里为了更加方便的解释 Doris 的数据模型,将两部分信息统一存放在一张表中。

表中的列按照是否设置了 AggregationType,分为 Key (维度列) 和 Value(指标列)。没有设置 AggregationType 的 user_id、date、age、sex 称为 Key,而设置了 AggregationType 的称为 Value。

当导入数据时,对于 Key 列相同的行会聚合成一行,而 Value 列会按照设置的 AggregationType 进行聚合。AggregationType 目前有以下几种聚合方式和 agg_state:

  • SUM:求和,多行的 Value 进行累加。

  • REPLACE:替代,下一批数据中的 Value 会替换之前导入过的行中的 Value。

  • MAX:保留最大值。

  • MIN:保留最小值。

  • REPLACE_IF_NOT_NULL:非空值替换。和 REPLACE 的区别在于对于 null 值,不做替换。

  • HLL_UNION:HLL 类型的列的聚合方式,通过 HyperLogLog 算法聚合。

  • BITMAP_UNION:BIMTAP 类型的列的聚合方式,进行位图的并集聚合。

聚合模型 - 图1警告

如果这几种聚合方式无法满足需求,则可以选择使用 agg_state 类型。

假设有以下导入数据(原始数据):

user_iddatecityagesexlast_visit_datecostmax_dwell_timemin_dwell_time
100002017/10/1北京2002017/10/1 6:00201010
100002017/10/1北京2002017/10/1 7:001522
100012017/10/1北京3012017/10/1 17:0522222
100022017/10/2上海2012017/10/2 12:5920055
100032017/10/2广州3202017/10/2 11:20301111
100042017/10/1深圳3502017/10/1 10:0010033
100042017/10/3深圳3502017/10/3 10:201166

通过 SQL 导入数据:

  1. insert into example_tbl_agg1 values
  2. (10000,"2017-10-01","北京",20,0,"2017-10-01 06:00:00",20,10,10),
  3. (10000,"2017-10-01","北京",20,0,"2017-10-01 07:00:00",15,2,2),
  4. (10001,"2017-10-01","北京",30,1,"2017-10-01 17:05:45",2,22,22),
  5. (10002,"2017-10-02","上海",20,1,"2017-10-02 12:59:12",200,5,5),
  6. (10003,"2017-10-02","广州",32,0,"2017-10-02 11:20:00",30,11,11),
  7. (10004,"2017-10-01","深圳",35,0,"2017-10-01 10:00:15",100,3,3),
  8. (10004,"2017-10-03","深圳",35,0,"2017-10-03 10:20:22",11,6,6);

这是一张记录用户访问某商品页面行为的表。以第一行数据为例,解释如下:

数据说明
10000用户 id,每个用户唯一识别 id
2017/10/1数据入库时间,精确到日期
北京用户所在城市
20用户年龄
0性别男(1 代表女性)
2017/10/1 6:00用户本次访问该页面的时间,精确到秒
20用户本次访问产生的消费
10用户本次访问,驻留该页面的时间
10用户本次访问,驻留该页面的时间(冗余)

那么当这批数据正确导入到 Doris 中后,Doris 中最终存储如下:

user_iddatecityagesexlast_visit_datecostmax_dwell_timemin_dwell_time
100002017/10/1北京2002017/10/1 7:0035102
100012017/10/1北京3012017/10/1 17:0522222
100022017/10/2上海2012017/10/2 12:5920055
100032017/10/2广州3202017/10/2 11:20301111
100042017/10/1深圳3502017/10/1 10:0010033
100042017/10/3深圳3502017/10/3 10:201166

可以看到,用户 10000 只剩下了一行聚合后的数据。而其余用户的数据和原始数据保持一致。对于用户 10000 聚合后的数据,前 5 列没有变化:

  • 第 6 列值为 2017-10-01 07:00:00。因为 last_visit_date 列的聚合方式为 REPLACE,所以 2017-10-01 07:00:00 替换了 2017-10-01 06:00:00 保存了下来。注意:在同一个导入批次中的数据,对于 REPLACE 这种聚合方式,替换顺序不做保证,如在这个例子中,最终保存下来的,也有可能是 2017-10-01 06:00:00;而对于不同导入批次中的数据,可以保证,后一批次的数据会替换前一批次。

  • 第 7 列值为 35:因为 cost 列的聚合类型为 SUM,所以由 20 + 15 累加获得 35。

  • 第 8 列值为 10:因为 max_dwell_time 列的聚合类型为 MAX,所以 10 和 2 取最大值,获得 10。

  • 第 9 列值为 2:因为 min_dwell_time 列的聚合类型为 MIN,所以 10 和 2 取最小值,获得 2。

经过聚合,Doris 中最终只会存储聚合后的数据。换句话说,即明细数据会丢失,用户不能够再查询到聚合前的明细数据了。

导入数据与已有数据聚合

假设现在表中已经拥有了前面导入的数据:

user_iddatecityagesexlast_visit_datecostmax_dwell_timemin_dwell_time
100002017/10/1北京2002017/10/1 7:0035102
100012017/10/1北京3012017/10/1 17:0522222
100022017/10/2上海2012017/10/2 12:5920055
100032017/10/2广州3202017/10/2 11:20301111
100042017/10/1深圳3502017/10/1 10:0010033
100042017/10/3深圳3502017/10/3 10:201166

再导入一批新的数据:

user_iddatecityagesexlast_visit_datecostmax_dwell_timemin_dwell_time
100042017/10/3深圳3502017/10/3 11:22441919
100052017/10/3长沙2912017/10/3 18:11311

通过 SQL 导入数据:

  1. insert into example_tbl_agg1 values
  2. (10004,"2017-10-03","深圳",35,0,"2017-10-03 11:22:00",44,19,19),
  3. (10005,"2017-10-03","长沙",29,1,"2017-10-03 18:11:02",3,1,1);

那么当这批数据正确导入到 Doris 中后,Doris 中最终存储如下:

user_iddatecityagesexlast_visit_datecostmax_dwell_timemin_dwell_time
100002017/10/1北京2002017/10/1 7:0035102
100012017/10/1北京3012017/10/1 17:0522222
100022017/10/2上海2012017/10/2 12:5920055
100032017/10/2广州3202017/10/2 11:20301111
100042017/10/1深圳3502017/10/1 10:0010033
100042017/10/3深圳3502017/10/3 11:2255196
100052017/10/3长沙2912017/10/3 18:11311

可以看到,用户 10004 的已有数据和新导入的数据发生了聚合。同时新增了 10005 用户的数据。

数据的聚合,在 Doris 中有如下三个阶段发生:

  1. 每一批次数据导入的 ETL 阶段。该阶段会在每一批次导入的数据内部进行聚合。

  2. 底层 BE 进行数据 Compaction 的阶段。该阶段,BE 会对已导入的不同批次的数据进行进一步的聚合。

  3. 数据查询阶段。在数据查询时,对于查询涉及到的数据,会进行对应的聚合。

数据在不同时间,可能聚合的程度不一致。比如一批数据刚导入时,可能还未与之前已存在的数据进行聚合。但是对于用户而言,用户只能查询到聚合后的数据。即不同的聚合程度对于用户查询而言是透明的。用户需始终认为数据以最终的完成的聚合程度存在,而不应假设某些聚合还未发生。(可参阅聚合模型的局限性一节获得更多详情。)

agg_state

  1. AGG_STATE不能作为key列使用,建表时需要同时声明聚合函数的签名。
  2. 用户不需要指定长度和默认值。实际存储的数据大小与函数实现有关。

建表

  1. set enable_agg_state=true;
  2. create table aggstate(
  3. k1 int null,
  4. k2 agg_state<sum(int)> generic,
  5. k3 agg_state<group_concat(string)> generic
  6. )
  7. aggregate key (k1)
  8. distributed BY hash(k1) buckets 3
  9. properties("replication_num" = "1");

其中 agg_state 用于声明数据类型为 agg_state,sum/group_concat 为聚合函数的签名。注意 agg_state 是一种数据类型,同 int/array/string

agg_state 只能配合state /merge/union函数组合器使用。

agg_state 是聚合函数的中间结果,例如,聚合函数 sum,则 agg_state 可以表示 sum(1,2,3,4,5) 的这个中间状态,而不是最终的结果。

agg_state 类型需要使用 state 函数来生成,对于当前的这个表,则为sum_state,group_concat_state

  1. insert into aggstate values(1,sum_state(1),group_concat_state('a'));
  2. insert into aggstate values(1,sum_state(2),group_concat_state('b'));
  3. insert into aggstate values(1,sum_state(3),group_concat_state('c'));

此时表只有一行 ( 注意,下面的表只是示意图,不是真的可以 select 显示出来)

k1k2k3
1sum(1,2,3)group_concat_state(a,b,c)

再插入一条数据

  1. insert into aggstate values(2,sum_state(4),group_concat_state('d'));

此时表的结构为

k1k2k3
1sum(1,2,3)group_concat_state(a,b,c)
2sum(4)group_concat_state(d)

我们可以通过 merge 操作来合并多个 state,并且返回最终聚合函数计算的结果

  1. mysql> select sum_merge(k2) from aggstate;
  2. +---------------+
  3. | sum_merge(k2) |
  4. +---------------+
  5. | 10 |
  6. +---------------+

sum_merge 会先把 sum(1,2,3) 和 sum(4) 合并成 sum(1,2,3,4) ,并返回计算的结果。因为 group_concat 对于顺序有要求,所以结果是不稳定的。

  1. mysql> select group_concat_merge(k3) from aggstate;
  2. +------------------------+
  3. | group_concat_merge(k3) |
  4. +------------------------+
  5. | c,b,a,d |
  6. +------------------------+

如果不想要聚合的最终结果,可以使用 union 来合并多个聚合的中间结果,生成一个新的中间结果。

  1. insert into aggstate select 3,sum_union(k2),group_concat_union(k3) from aggstate ;

此时的表结构为

k1k2k3
1sum(1,2,3)group_concat_state(a,b,c)
2sum(4)group_concat_state(d)
3sum(1,2,3,4)group_concat_state(a,b,c,d)

可以通过查询

  1. mysql> select sum_merge(k2) , group_concat_merge(k3)from aggstate;
  2. +---------------+------------------------+
  3. | sum_merge(k2) | group_concat_merge(k3) |
  4. +---------------+------------------------+
  5. | 20 | c,b,a,d,c,b,a,d |
  6. +---------------+------------------------+
  7. mysql> select sum_merge(k2) , group_concat_merge(k3)from aggstate where k1 != 2;
  8. +---------------+------------------------+
  9. | sum_merge(k2) | group_concat_merge(k3) |
  10. +---------------+------------------------+
  11. | 16 | c,b,a,d,c,b,a |
  12. +---------------+------------------------+

用户可以通过 agg_state 做出更细致的聚合函数操作。

注意 agg_state 存在一定的性能开销。