OceanBase的优化器会分两阶段来生成分布式的执行计划。在第一阶段,不考虑数据的物理分布,生成所有基于本地关系优化的最优执行计划。在本地计划生成后,优化器会检查数据是否访问了多个分区,或者是否是本地单分区表但是用户用hint强制指定了采用并行查询执行。此时,将会根据执行计划树,在需要进行数据重分布的地方,插入Exchange节点,从而将原先的本地计划树变成分布式计划。
相关名词
job分布式计划以数据重分布点为边界,切分为可以并行执行的逻辑子计划,每个子计划由一个job进行封装。
root jobjob树最顶端的job。
普通jobjob树的其它job。
中间结果管理器(Intermediate Result Manager)用于缓存需要在不同job间传递的数据。
exchange算子用于进行跨job数据传递的算子,具体分为如下两种:
exchange out / transmit算子负责生产数据,并写入当前server的中间结果管理器中。
exchange in / receive算子负责消费数据,按需从目标server的中间结果管理器中读取。
task每个逻辑子计划(job)可以并行执行,每个并行执行的任务称为一个task。
主线程负责接收客户端请求、返回操作结果的线程,负责执行root job。
工作线程负责执行普通job的task。
调度线程负责调度job。
生成分布式计划
生成分布式计划的过程就是在原始计划树上寻找恰当位置插入exchange算子的过程,在自顶向下遍历计划树的时候,需要根据相应算子的数据处理的情况以及输入算子的数据分区情况,决定是否需要插入exchange算子。
以最简单的单表扫描为例:
explain select * from t2\G
*************************** 1. row ***************************
Query Plan:
============================================
|ID|OPERATOR |NAME|EST. ROWS|COST|
--------------------------------------------
|0 |EXCHANGE IN DISTR | |4000 |878 |
|1 | EXCHANGE OUT DISTR| |4000 |499 |
|2 | TABLE SCAN |t2 |4000 |499 |
============================================
Outputs & filters:
-------------------------------------
0 - output([t2.c1], [t2.c2]), filter(nil)
1 - output([t2.c1], [t2.c2]), filter(nil)
2 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
当t2是一个分区表,可以在table scan上插入配对的exchange算子,从而将table scan和exchange out封装成一个job,可以用于并行的执行。
单输入可下压算子
主要包括 aggregation,sort,group by和limit算子等,除了limit算子以外,其余所列举的算子都会有一个操作的键,如果操作的键和输入数据的数据分布是一致的,则可以做一阶段聚合操作,也即Partition Wise Aggregation。如果操作的键和输入数据的数据分布是不一致的,则需要做两阶段聚合操作,聚合算子需要做下压操作。如下面的例子所示:
- 一阶段聚合
explain select sum(c2) from t2 group by c1\G
*************************** 1. row ***************************
Query Plan:
============================================
|ID|OPERATOR |NAME|EST. ROWS|COST|
--------------------------------------------
|0 |EXCHANGE IN DISTR | |1000 |2834|
|1 | EXCHANGE OUT DISTR| |1000 |2740|
|2 | HASH GROUP BY | |1000 |2740|
|3 | TABLE SCAN |t2 |4000 |499 |
============================================
Outputs & filters:
-------------------------------------
0 - output([T_FUN_SUM(t2.c2)]), filter(nil)
1 - output([T_FUN_SUM(t2.c2)]), filter(nil)
2 - output([T_FUN_SUM(t2.c2)]), filter(nil),
group([t2.c1]), agg_func([T_FUN_SUM(t2.c2)])
3 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
- 二阶段聚合
explain select sum(c1) from t2 group by c2\G
*************************** 1. row ***************************
Query Plan:
=============================================
|ID|OPERATOR |NAME|EST. ROWS|COST|
---------------------------------------------
|0 |HASH GROUP BY | |1000 |3395|
|1 | EXCHANGE IN DISTR | |1000 |2834|
|2 | EXCHANGE OUT DISTR| |1000 |2740|
|3 | HASH GROUP BY | |1000 |2740|
|4 | TABLE SCAN |t2 |4000 |499 |
=============================================
Outputs & filters:
-------------------------------------
0 - output([T_FUN_SUM(T_FUN_SUM(t2.c1))]), filter(nil),
group([t2.c2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.c1))])
1 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
2 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil)
3 - output([T_FUN_SUM(t2.c1)], [t2.c2]), filter(nil),
group([t2.c2]), agg_func([T_FUN_SUM(t2.c1)])
4 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
二元输入算子
在OceanBase当前版本中,主要考虑join算子的情况。在OceanBase 1.x版本中,还没有引入代价模型进行分布式计划的优化,主要还是基于规则来生成分布式的计划和数据重分布方法。对于join来说,主要有三种方式。
- partition-wise join 当左右表都是分区表且分区方式相同,物理分布一样,且join的连接条件为分区键时,可以使用以partition为单位的连接方法。计划如下所示:
explain select * from t2, t3 where t2.c1 = t3.c1\G
*************************** 1. row ***************************
Query Plan:
=============================================
|ID|OPERATOR |NAME|EST. ROWS|COST |
---------------------------------------------
|0 |EXCHANGE IN DISTR | |31680 |35075|
|1 | EXCHANGE OUT DISTR| |31680 |29077|
|2 | HASH JOIN | |31680 |29077|
|3 | TABLE SCAN |t2 |4000 |499 |
|4 | TABLE SCAN |t3 |4000 |499 |
=============================================
Outputs & filters:
-------------------------------------
0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
1 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
2 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
equal_conds([t2.c1 = t3.c1]), other_conds(nil)
3 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
4 - output([t3.c1], [t3.c2]), filter(nil),
access([t3.c1], [t3.c2]), partitions(p[0-3])
- partial partition-wise join 当左右表中一个表为分区表,另一个表为非分区表,或者两者皆为分区表但是连接键仅和其中一个分区表的分区键相同的情况下,会以该分区表的分区分布为基准,重新分布另一个表的数据。计划如下例所示:
explain select * from t1, t2 where t1.c1 = t2.c1\G
*************************** 1. row ***************************
Query Plan:
======================================================
|ID|OPERATOR |NAME|EST. ROWS|COST|
------------------------------------------------------
|0 |EXCHANGE IN DISTR | |24 |1977|
|1 | EXCHANGE OUT DISTR | |24 |1973|
|2 | HASH JOIN | |24 |1973|
|3 | EXCHANGE IN DISTR | |3 |37 |
|4 | EXCHANGE OUT DISTR (PKEY)| |3 |37 |
|5 | TABLE SCAN |t1 |3 |37 |
|6 | TABLE SCAN |t2 |4000 |499 |
======================================================
Outputs & filters:
-------------------------------------
0 - output([t1.c1], [t1.c2], [t2.c1], [t2.c2]), filter(nil)
1 - output([t1.c1], [t1.c2], [t2.c1], [t2.c2]), filter(nil)
2 - output([t1.c1], [t1.c2], [t2.c1], [t2.c2]), filter(nil),
equal_conds([t1.c1 = t2.c1]), other_conds(nil)
3 - output([t1.c1], [t1.c2]), filter(nil)
4 - (#keys=1, [t1.c1]), output([t1.c1], [t1.c2]), filter(nil)
5 - output([t1.c1], [t1.c2]), filter(nil),
access([t1.c1], [t1.c2]), partitions(p0)
6 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
- 左右表都需要进行数据重分布当连接键和左右表的分区键都没有关系的情况下,由于一些实现的限制,当前会生成将左右表的数据都重新分布到一台计算节点上再执行连接的计划。未来会将此种计划的join算子也能完全并行执行。生成的计划如下例所示:
explain select * from t2, t3 where t2.c2 = t3.c2\G
*************************** 1. row ***************************
Query Plan:
==============================================
|ID|OPERATOR |NAME|EST. ROWS|COST |
----------------------------------------------
|0 |HASH JOIN | |31680 |29835|
|1 | EXCHANGE IN DISTR | |4000 |878 |
|2 | EXCHANGE OUT DISTR| |4000 |499 |
|3 | TABLE SCAN |t2 |4000 |499 |
|4 | EXCHANGE IN DISTR | |4000 |878 |
|5 | EXCHANGE OUT DISTR| |4000 |499 |
|6 | TABLE SCAN |t3 |4000 |499 |
==============================================
Outputs & filters:
-------------------------------------
0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
equal_conds([t2.c2 = t3.c2]), other_conds(nil)
1 - output([t2.c1], [t2.c2]), filter(nil)
2 - output([t2.c1], [t2.c2]), filter(nil)
3 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
4 - output([t3.c1], [t3.c2]), filter(nil)
5 - output([t3.c1], [t3.c2]), filter(nil)
6 - output([t3.c1], [t3.c2]), filter(nil),
access([t3.c1], [t3.c2]), partitions(p[0-3])
多元输入算子
OceanBase 1.0暂时没有支持多元输入算子的并行执行。