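Distributed joins are an important query optimization and execution technique for workloads with large data volumes. When two or more tables are large, the join between them should be executed in parallel wherever possible, which improves execution efficiency and reduces query response time.
This section introduces several typical distributed join scenarios supported by OceanBase.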
Partition-Wise Join
create table t2 (c1 int, c2 int) partition by key(c1) partitions 4;
create table t3 (c1 int, c2 int) partition by key(c1) partitions 4;
explain select * from t2, t3 where t2.c1 = t3.c1 and t2.c2=t3.c2\G
*************************** 1. row ***************************
Query Plan:
============================================
|ID|OPERATOR           |NAME|EST. ROWS|COST|
--------------------------------------------
|0 |EXCHANGE IN DISTR  |    |63       |8374|
|1 | EXCHANGE OUT DISTR|    |63       |8362|
|2 |  HASH JOIN        |    |63       |8362|
|3 |   TABLE SCAN      |t2  |4000     |499 |
|4 |   TABLE SCAN      |t3  |4000     |499 |
============================================
Outputs & filters:
-------------------------------------
0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
1 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
2 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
equal_conds([t2.c1 = t3.c1], [t2.c2 = t3.c2]), other_conds(nil)
3 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
4 - output([t3.c1], [t3.c2]), filter(nil),
access([t3.c1], [t3.c2]), partitions(p[0-3])
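Example 1
A partition-wise join applies when the join condition between two tables includes the partition keys of both tables and the two tables are partitioned in the same way. In the query of Example 1, t2 and t3 are both key-partitioned on column c1 into 4 partitions, so their partitioning schemes are identical. Because the condition t2.c1 = t3.c1 and t2.c2 = t3.c2 fully covers the partition key, the join can be performed partition by partition. With a degree of parallelism of 4, the query can join all 4 partition pairs at the same time and then output the combined result.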
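The partition-by-partition execution described above can be illustrated with a small simulation. This is only a sketch, not OceanBase code: rows are (c1, c2) tuples, part_of is a hypothetical stand-in for the server's internal key-partitioning function, and a thread pool plays the role of the parallel workers.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

NUM_PARTITIONS = 4  # matches "partitions 4" in the DDL


def part_of(key, n=NUM_PARTITIONS):
    # Hypothetical stand-in for the real key-partitioning function.
    return key % n


def partition(rows):
    """Place each (c1, c2) row in the partition chosen by c1."""
    parts = [[] for _ in range(NUM_PARTITIONS)]
    for row in rows:
        parts[part_of(row[0])].append(row)
    return parts


def hash_join(left, right):
    """Join on t2.c1 = t3.c1 and t2.c2 = t3.c2 within one partition pair."""
    built = defaultdict(list)
    for row in left:
        built[row].append(row)  # the whole (c1, c2) tuple is the join key
    return [l + r for r in right for l in built.get(r, [])]


def partition_wise_join(t2_rows, t3_rows):
    """Join partition i of t2 only with partition i of t3, in parallel."""
    t2_parts = partition(t2_rows)
    t3_parts = partition(t3_rows)
    with ThreadPoolExecutor(max_workers=NUM_PARTITIONS) as pool:
        per_partition = pool.map(hash_join, t2_parts, t3_parts)
        return [row for part in per_partition for row in part]


t2 = [(1, 10), (2, 20), (5, 50), (8, 80)]
t3 = [(1, 10), (2, 99), (5, 50), (9, 90)]
result = partition_wise_join(t2, t3)
```

Because equal c1 values always land in the same partition number on both sides, no row has to move between partitions before the join.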
Redistributing One Table by the Partitioning Scheme of the Other
Using the same tables t2 and t3:
explain select * from t2, t3 where t2.c1 = t3.c2\G
*************************** 1. row ***************************
Query Plan:
=======================================================
|ID|OPERATOR                     |NAME|EST. ROWS|COST |
-------------------------------------------------------
|0 |EXCHANGE IN DISTR            |    |31680    |35454|
|1 | EXCHANGE OUT DISTR          |    |31680    |29456|
|2 |  HASH JOIN                  |    |31680    |29456|
|3 |   TABLE SCAN                |t2  |4000     |499  |
|4 |   EXCHANGE IN DISTR         |    |4000     |878  |
|5 |    EXCHANGE OUT DISTR (PKEY)|    |4000     |499  |
|6 |     TABLE SCAN              |t3  |4000     |499  |
=======================================================
Outputs & filters:
-------------------------------------
0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
1 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
2 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
equal_conds([t2.c1 = t3.c2]), other_conds(nil)
3 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
4 - output([t3.c1], [t3.c2]), filter(nil)
5 - (#keys=1, [t3.c2]), output([t3.c1], [t3.c2]), filter(nil)
6 - output([t3.c1], [t3.c2]), filter(nil),
access([t3.c1], [t3.c2]), partitions(p[0-3])
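Example 2
In the query of Example 2, the join condition covers the partition key of t2 but not that of t3. The execution plan therefore redistributes the rows of t3 according to the partitioning scheme of t2, routing each row to a target partition by partition key (the PKEY exchange in operator 5). Once the rows of t3 have been redistributed by their c2 values using the partitioning of t2 on c1, the join can run in parallel, one t2 partition at a time.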
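A minimal simulation of this redistribution follows. It is a sketch under stated assumptions rather than the actual exchange implementation: rows are (c1, c2) tuples, part_of is a hypothetical stand-in for the key-partitioning function, and exchange_pkey is an illustrative name for the step that routes t3 rows by c2.

```python
from collections import defaultdict

NUM_PARTITIONS = 4


def part_of(key, n=NUM_PARTITIONS):
    # Hypothetical stand-in for the real key-partitioning function.
    return key % n


def scan_partitions(rows):
    """Rows as stored: both tables are partitioned by c1 (index 0)."""
    parts = [[] for _ in range(NUM_PARTITIONS)]
    for row in rows:
        parts[part_of(row[0])].append(row)
    return parts


def exchange_pkey(t3_parts):
    """Route every t3 row to the t2 partition that owns the value t3.c2."""
    routed = [[] for _ in range(NUM_PARTITIONS)]
    for part in t3_parts:
        for row in part:
            routed[part_of(row[1])].append(row)
    return routed


def join_partition(t2_part, t3_part):
    """Hash join on t2.c1 = t3.c2 inside a single t2 partition."""
    built = defaultdict(list)
    for row in t2_part:
        built[row[0]].append(row)
    return [l + r for r in t3_part for l in built.get(r[1], [])]


t2 = [(1, 10), (2, 20), (6, 60)]
t3 = [(7, 1), (3, 2), (4, 2), (5, 9)]

t2_parts = scan_partitions(t2)                   # t2 stays where it is
t3_routed = exchange_pkey(scan_partitions(t3))   # only t3 moves
result = [row
          for p in range(NUM_PARTITIONS)
          for row in join_partition(t2_parts[p], t3_routed[p])]
```

Only t3 is moved. After routing, each t3 row sits in the same partition number as every t2 row it could match, so the per-partition joins are independent and could be handed to separate workers.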
Redistributing Both Tables
explain select * from t2, t3 where t2.c2 = t3.c2\G
*************************** 1. row ***************************
Query Plan:
==============================================
|ID|OPERATOR            |NAME|EST. ROWS|COST |
----------------------------------------------
|0 |HASH JOIN           |    |31680    |29835|
|1 | EXCHANGE IN DISTR  |    |4000     |878  |
|2 |  EXCHANGE OUT DISTR|    |4000     |499  |
|3 |   TABLE SCAN       |t2  |4000     |499  |
|4 | EXCHANGE IN DISTR  |    |4000     |878  |
|5 |  EXCHANGE OUT DISTR|    |4000     |499  |
|6 |   TABLE SCAN       |t3  |4000     |499  |
==============================================
Outputs & filters:
-------------------------------------
0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
equal_conds([t2.c2 = t3.c2]), other_conds(nil)
1 - output([t2.c1], [t2.c2]), filter(nil)
2 - output([t2.c1], [t2.c2]), filter(nil)
3 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
4 - output([t3.c1], [t3.c2]), filter(nil)
5 - output([t3.c1], [t3.c2]), filter(nil)
6 - output([t3.c1], [t3.c2]), filter(nil),
access([t3.c1], [t3.c2]), partitions(p[0-3])
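Example 3
For the same tables t2 and t3, if the join condition includes neither the partition key of t2 nor that of t3, the plan shown in Example 3 is generated. Note that the join in this plan is performed in the main thread. In later versions of OceanBase, the join in such plans will also be executed in parallel by worker threads.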
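The shape of the Example 3 plan can be sketched in the same style. This is an illustrative model only: the function names are hypothetical, the partition contents are made-up sample data, and a thread pool stands in for the workers that scan partitions. What it shows is that the scans run in parallel while the hash join itself runs once, in the calling thread.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

NUM_PARTITIONS = 4


def scan_partition(part):
    """Worker side: scan one partition and send its rows out."""
    return list(part)


def gather(parts):
    """Scan all partitions in workers, then collect the rows in one place."""
    with ThreadPoolExecutor(max_workers=NUM_PARTITIONS) as pool:
        return [row for rows in pool.map(scan_partition, parts) for row in rows]


def hash_join_on_c2(left, right):
    """Single-threaded hash join on t2.c2 = t3.c2."""
    built = defaultdict(list)
    for row in left:
        built[row[1]].append(row)
    return [l + r for r in right for l in built.get(r[1], [])]


# Sample (c1, c2) rows, already grouped by partition p0..p3 on c1.
t2_parts = [[(4, 7)], [(1, 5)], [(2, 5)], []]
t3_parts = [[(8, 5)], [], [], [(3, 7)]]

# Both inputs are gathered first; the join then runs in the main thread.
result = hash_join_on_c2(gather(t2_parts), gather(t3_parts))
```

Since c2 is not a partition key on either side, matching rows can sit in any pair of partitions, which is why neither table's existing layout can be reused for the join.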