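In a database, aggregation operators such as SUM, AVG, GROUP BY, and SORT consume considerable CPU and I/O resources for computation and for reading and writing data. Parallel execution pushes these operators down to the individual compute nodes. This makes full use of the cluster's computing resources and improves execution efficiency.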

In general, aggregation push-down takes one of two forms: a Partition-Wise Aggregation plan, or a push-down plus two-phase aggregation plan.

Partition-Wise Aggregation

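If an aggregation query groups by the partitioning key and aggregates within each group, the operation is called Partition-Wise Aggregation. Every group then lies entirely within a single partition, so all operations can be pushed down into the partitions and executed there in parallel. Note that Partition-Wise Aggregation is not necessarily the optimal plan, because its degree of parallelism is limited by the number of partitions. In the example below, the query groups by c1, the partitioning key of t2. The only HASH GROUP BY (operator 2) sits below the EXCHANGE operators, so each partition produces final results and the upper operators only gather them.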

  explain select sum(c2) from t2 group by c1\G
  *************************** 1. row ***************************
  Query Plan:
  ============================================
  |ID|OPERATOR           |NAME|EST. ROWS|COST|
  --------------------------------------------
  |0 |EXCHANGE IN DISTR  |    |1000     |2834|
  |1 | EXCHANGE OUT DISTR|    |1000     |2740|
  |2 |  HASH GROUP BY    |    |1000     |2740|
  |3 |   TABLE SCAN      |t2  |4000     |499 |
  ============================================
  Outputs & filters:
  -------------------------------------
    0 - output([T_FUN_SUM(t2.c2)]), filter(nil)
    1 - output([T_FUN_SUM(t2.c2)]), filter(nil)
    2 - output([T_FUN_SUM(t2.c2)]), filter(nil),
        group([t2.c1]), agg_func([T_FUN_SUM(t2.c2)])
    3 - output([t2.c1], [t2.c2]), filter(nil),
        access([t2.c1], [t2.c2]), partitions(p[0-3])
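To make the data flow of this plan concrete, the following Python sketch simulates Partition-Wise Aggregation for a variant of the query, `select c1, sum(c2) from t2 group by c1`, returning results keyed by c1. It is a toy model under stated assumptions, not OceanBase's implementation: `NUM_PARTITIONS`, the modulo rule in the hypothetical `partition_of`, and the `(c1, c2)` tuple layout are all assumptions. The thread pool only stands in for parallel workers on separate partitions; Python threads do not speed up CPU-bound work.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

NUM_PARTITIONS = 4  # hypothetical: mirrors partitions p0 to p3 in the plan above


def partition_of(c1):
    """Hypothetical partitioning rule on the partitioning key c1."""
    return c1 % NUM_PARTITIONS


def local_hash_group_by(partition):
    """HASH GROUP BY c1 computing SUM(c2), entirely inside one partition."""
    sums = defaultdict(int)
    for c1, c2 in partition:
        sums[c1] += c2
    return dict(sums)


def partition_wise_sum(rows):
    """Simulate select c1, sum(c2) from t2 group by c1 with one task per partition."""
    partitions = [[] for _ in range(NUM_PARTITIONS)]
    for c1, c2 in rows:
        partitions[partition_of(c1)].append((c1, c2))
    # Each partition aggregates independently; threads stand in for parallel workers.
    with ThreadPoolExecutor(max_workers=NUM_PARTITIONS) as pool:
        per_partition = list(pool.map(local_hash_group_by, partitions))
    # A c1 value maps to exactly one partition, so the key sets are disjoint and
    # gathering the results (like EXCHANGE IN DISTR) needs no second aggregation.
    result = {}
    for groups in per_partition:
        result.update(groups)
    return result
```

For example, `partition_wise_sum([(1, 10), (5, 20), (1, 30), (2, 7)])` returns `{1: 40, 5: 20, 2: 7}`. Because each group is finalized inside one partition, the merge step is a plain union. The number of concurrent tasks is capped at `NUM_PARTITIONS`, which is the parallelism limit described above.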

Push-Down Plus Two-Phase Aggregation

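In the more general case, the aggregation does not group by the partitioning key, so rows that belong to the same group can reside in different partitions. In this case, OceanBase Database uses two-phase aggregation, as shown in the example below, which groups by c2:

  1. Push the aggregation down to each partition to compute partial aggregation results (operator 3, HASH GROUP BY, in the plan).

  2. Merge the partial results to obtain the final result (operator 0, which computes T_FUN_SUM(T_FUN_SUM(t2.c1))).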

  explain select sum(c1) from t2 group by c2\G
  *************************** 1. row ***************************
  Query Plan:
  =============================================
  |ID|OPERATOR            |NAME|EST. ROWS|COST|
  ---------------------------------------------
  |0 |HASH GROUP BY       |    |1000     |3395|
  |1 | EXCHANGE IN DISTR  |    |1000     |2834|
  |2 |  EXCHANGE OUT DISTR|    |1000     |2740|
  |3 |   HASH GROUP BY    |    |1000     |2740|
  |4 |    TABLE SCAN      |t2  |4000     |499 |
  =============================================
  Outputs & filters:
  -------------------------------------
    0 - output([T_FUN_SUM(T_FUN_SUM(t2.c1))]), filter(nil),
        group([t2.c2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.c1))])
    1 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
    2 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
    3 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil),
        group([t2.c2]), agg_func([T_FUN_SUM(t2.c1)])
    4 - output([t2.c1], [t2.c2]), filter(nil),
        access([t2.c1], [t2.c2]), partitions(p[0-3])
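The two phases can be sketched in the same way. This Python model of `select c2, sum(c1), avg(c1) from t2 group by c2` assumes, hypothetically, that rows are placed by `c1 % 4`, so one c2 value can occur in several partitions. Phase 1 keeps a partial `[sum, count]` state per c2 in each partition. Phase 2 adds the partial states together, which for SUM corresponds to `T_FUN_SUM(T_FUN_SUM(t2.c1))` in operator 0. AVG is included only to illustrate why a partial state is needed; the plan above covers SUM alone, so this sketch does not show how OceanBase itself decomposes AVG. `NUM_PARTITIONS`, `partition_of`, and the other function names are hypothetical.

```python
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical: mirrors partitions p0 to p3 in the plan above


def partition_of(c1):
    """Hypothetical placement rule: rows are partitioned by c1, not by c2."""
    return c1 % NUM_PARTITIONS


def phase1_partial_agg(partition):
    """Phase 1, pushed down to one partition: partial [sum(c1), count] per c2."""
    partial = defaultdict(lambda: [0, 0])
    for c1, c2 in partition:
        partial[c2][0] += c1
        partial[c2][1] += 1
    return dict(partial)


def phase2_final_agg(partials):
    """Phase 2: merge partial states, since one c2 may span several partitions."""
    merged = defaultdict(lambda: [0, 0])
    for partial in partials:
        for c2, (part_sum, part_count) in partial.items():
            # Summing partial sums mirrors T_FUN_SUM(T_FUN_SUM(t2.c1)) in operator 0.
            merged[c2][0] += part_sum
            merged[c2][1] += part_count
    # Final SUM and AVG per group; AVG is derived only after all counts are merged.
    return {c2: (total, total / count) for c2, (total, count) in merged.items()}


def two_phase_group_by_c2(rows):
    """Simulate select c2, sum(c1), avg(c1) from t2 group by c2."""
    partitions = [[] for _ in range(NUM_PARTITIONS)]
    for c1, c2 in rows:
        partitions[partition_of(c1)].append((c1, c2))
    # In a real cluster phase 1 runs in parallel; here it runs in a simple loop.
    partials = [phase1_partial_agg(p) for p in partitions]
    return phase2_final_agg(partials)
```

With rows `(1, "a")`, `(2, "a")`, `(6, "a")`, and `(3, "b")`, group `"a"` is split across partitions 1 and 2. The merged result for `"a"` is a sum of 9 and an average of 3.0, whereas averaging the two per-partition averages (1 and 4) would incorrectly give 2.5.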