数据库中的聚合算子,例如 SUM,AVG,GROUP BY,SORT 等,会耗费大量的 CPU 和 IO 资源进行计算和读写,通过并行执行的方式,将对应的算子下压到各个计算节点上,充分利用集群的计算资源,提升执行效率。
总的来看,聚合算子的下压可以分为 Partition-Wise Aggregation 计划和下压加两阶段聚合计划。
Partition-Wise Aggregation
如果聚合查询的语意是按照分区键去做分组且在分组内做聚合,那么这样的聚合操作称为 Partition-Wise Aggregation。所有的操作都被下压到分区内并行的执行。需要注意的是,Partition-Wise Aggregation 不一定是最优的执行,因为并行度会受限于分区的数量。
explain select sum(c2) from t2 group by c1\G;
*************************** 1. row ***************************
Query Plan:
============================================
|ID|OPERATOR |NAME|EST. ROWS|COST|
--------------------------------------------
|0 |EXCHANGE IN DISTR | |1000 |2834|
|1 | EXCHANGE OUT DISTR| |1000 |2740|
|2 | HASH GROUP BY | |1000 |2740|
|3 | TABLE SCAN |t2 |4000 |499 |
============================================
Outputs & filters:
-------------------------------------
0 - output([T_FUN_SUM(t2.c2)]), filter(nil)
1 - output([T_FUN_SUM(t2.c2)]), filter(nil)
2 - output([T_FUN_SUM(t2.c2)]), filter(nil),
group([t2.c1]), agg_func([T_FUN_SUM(t2.c2)])
3 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
下压加两阶段聚合
在更一般的情况下,aggregation 操作可能不是按照分区键作为分组来做的,在这样的情况下,OceanBase 数据库会采用两阶段聚合的方式,如下例所示。
将 aggregation 操作下压,获得部分的聚合结果。
进行汇总的方式获得最终结果。
explain select sum(c1) from t2 group by c2\G;
*************************** 1. row ***************************
Query Plan:
=============================================
|ID|OPERATOR |NAME|EST. ROWS|COST|
---------------------------------------------
|0 |HASH GROUP BY | |1000 |3395|
|1 | EXCHANGE IN DISTR | |1000 |2834|
|2 | EXCHANGE OUT DISTR| |1000 |2740|
|3 | HASH GROUP BY | |1000 |2740|
|4 | TABLE SCAN |t2 |4000 |499 |
=============================================
Outputs & filters:
-------------------------------------
0 - output([T_FUN_SUM(T_FUN_SUM(t2.c1))]), filter(nil),
group([t2.c2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.c1))])
1 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
2 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
3 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil),
group([t2.c2]), agg_func([T_FUN_SUM(t2.c1)])
4 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
explain select sum(c1) from t2 group by c2\G;
*************************** 1. row ***************************
Query Plan:
=============================================
|ID|OPERATOR |NAME|EST. ROWS|COST|
---------------------------------------------
|0 |HASH GROUP BY | |1000 |3395|
|1 | EXCHANGE IN DISTR | |1000 |2834|
|2 | EXCHANGE OUT DISTR| |1000 |2740|
|3 | HASH GROUP BY | |1000 |2740|
|4 | TABLE SCAN |t2 |4000 |499 |
=============================================
Outputs & filters:
-------------------------------------
0 - output([T_FUN_SUM(T_FUN_SUM(t2.c1))]), filter(nil),
group([t2.c2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.c1))])
1 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
2 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
3 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil),
group([t2.c2]), agg_func([T_FUN_SUM(t2.c1)])
4 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])