数据库中的聚合算子,例如sum, avg, group by, sort等等,会耗费大量的CPU和IO资源进行计算和读写,通过并行执行的方式,将对应的算子下压到各个计算节点上,充分利用集群的计算资源,提升执行效率。

    总的来看,聚合算子的下压可以分为两种计划

    Partition-Wise Aggregation

    示例1:

    1. explain select sum(c2) from t2 group by c1\G
    2. *************************** 1. row ***************************
    3. Query Plan:
    4. ============================================
    5. |ID|OPERATOR |NAME|EST. ROWS|COST|
    6. --------------------------------------------
    7. |0 |EXCHANGE IN DISTR | |1000 |2834|
    8. |1 | EXCHANGE OUT DISTR| |1000 |2740|
    9. |2 | HASH GROUP BY | |1000 |2740|
    10. |3 | TABLE SCAN |t2 |4000 |499 |
    11. ============================================
    12. Outputs & filters:
    13. -------------------------------------
    14. 0 - output([T_FUN_SUM(t2.c2)]), filter(nil)
    15. 1 - output([T_FUN_SUM(t2.c2)]), filter(nil)
    16. 2 - output([T_FUN_SUM(t2.c2)]), filter(nil),
    17. group([t2.c1]), agg_func([T_FUN_SUM(t2.c2)])
    18. 3 - output([t2.c1], [t2.c2]), filter(nil),
    19. access([t2.c1], [t2.c2]), partitions(p[0-3])

    如果聚合查询的语意是按照分区键去做分组且在分组内做聚合,那么这样的聚合操作称为partition-wise aggregation。所有的操作都被下压到分区内并行的执行。需要注意的是,partition-wise aggregation不一定是最优的执行,因为并行度会受限于分区的数量。在OceanBase 1.x版本中,对于这样的情况,暂时没有做别的替代计划的实现。

    下压加两阶段聚合

    示例2:

    1. explain select sum(c1) from t2 group by c2\G
    2. *************************** 1. row ***************************
    3. Query Plan:
    4. =============================================
    5. |ID|OPERATOR |NAME|EST. ROWS|COST|
    6. ---------------------------------------------
    7. |0 |HASH GROUP BY | |1000 |3395|
    8. |1 | EXCHANGE IN DISTR | |1000 |2834|
    9. |2 | EXCHANGE OUT DISTR| |1000 |2740|
    10. |3 | HASH GROUP BY | |1000 |2740|
    11. |4 | TABLE SCAN |t2 |4000 |499 |
    12. =============================================
    13. Outputs & filters:
    14. -------------------------------------
    15. 0 - output([T_FUN_SUM(T_FUN_SUM(t2.c1))]), filter(nil),
    16. group([t2.c2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.c1))])
    17. 1 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
    18. 2 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
    19. 3 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil),
    20. group([t2.c2]), agg_func([T_FUN_SUM(t2.c1)])
    21. 4 - output([t2.c1], [t2.c2]), filter(nil),
    22. access([t2.c1], [t2.c2]), partitions(p[0-3])

    在更一般的情况下,aggregation操作可能不是按照分区键作为分组来做的,在这样的情况下,OceanBase会采用如例2所示的两阶段聚合的方式,将aggregation操作下压,获得部分的聚合结果,再进行汇总的方式获得最终结果。需要指出的是,在目前,两阶段聚合的第二阶段不是并行执行的,在OceanBase以后的版本中,会将两阶段聚合的第二阶段也并行执行。