简介
分布式连接是大数据量场景中很重要的一个查询优化执行手段,当两个或多个表的数据量比较大的时候,在执行两表或者多表的连接的时候,需要尽量的将 join 通过并行执行的方式以提高执行效率,减小查询的响应时间。
本节主要介绍 OceanBase 支持的分布式连接的几种典型场景。
Partition-Wise Join 场景
Partition-wise join 是指两表的连接条件包含了两表的分区键,并且两表的分区方式是一样的。
在如下例所示,t2 和t3 都是在 c1 这个列上进行了 key 分区,分区总数为4个分区,分区模式完全相同。当查询的条件为 t2.c1=t3.c1 and t2.c2=t3.c2时,查询条件完全包含了分区键,查询可以以分区为单位在每个分区内进行。如果并行度为4的话,该查询可以同时做4个分区的join并且将最后结果输出。
create table t2 (c1 int, c2 int) partition by key(c1) partitions 4;
create table t3 (c1 int, c2 int) partition by key(c1) partitions 4;
explain select * from t2, t3 where t2.c1 = t3.c1 and t2.c2=t3.c2\G
*************************** 1. row ***************************
Query Plan:
============================================
|ID|OPERATOR |NAME|EST. ROWS|COST|
--------------------------------------------
|0 |EXCHANGE IN DISTR | |63 |8374|
|1 | EXCHANGE OUT DISTR| |63 |8362|
|2 | HASH JOIN | |63 |8362|
|3 | TABLE SCAN |t2 |4000 |499 |
|4 | TABLE SCAN |t3 |4000 |499 |
============================================
Outputs & filters:
-------------------------------------
0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
1 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
2 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
equal_conds([t2.c1 = t3.c1], [t2.c2 = t3.c2]), other_conds(nil)
3 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
4 - output([t3.c1], [t3.c2]), filter(nil),
access([t3.c1], [t3.c2]), partitions(p[0-3])
以一边分区表的分区模式重新分布另一个表的场景
同样以 t2 和 t3 表为例,如下例所示,查询 join 条件覆盖了 t2 的分区键而没有覆盖 t3 的分区键,所以执行计划将会按照 t2 的分区模式,将 t3 的数据进行以 partition key 为目标的分组。在将 t3 的数据依照 c2 的值和 t2 的 c1 列的分区方式打散重分布后,可以以 t2 的分区为单位进行分区间的并行连接。
explain select * from t2, t3 where t2.c1 = t3.c2\G
*************************** 1. row ***************************
Query Plan:
=======================================================
|ID|OPERATOR |NAME|EST. ROWS|COST |
-------------------------------------------------------
|0 |EXCHANGE IN DISTR | |31680 |35454|
|1 | EXCHANGE OUT DISTR | |31680 |29456|
|2 | HASH JOIN | |31680 |29456|
|3 | TABLE SCAN |t2 |4000 |499 |
|4 | EXCHANGE IN DISTR | |4000 |878 |
|5 | EXCHANGE OUT DISTR (PKEY)| |4000 |499 |
|6 | TABLE SCAN |t3 |4000 |499 |
=======================================================
Outputs & filters:
-------------------------------------
0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
1 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
2 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
equal_conds([t2.c1 = t3.c2]), other_conds(nil)
3 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
4 - output([t3.c1], [t3.c2]), filter(nil)
5 - (#keys=1, [t3.c2]), output([t3.c1], [t3.c2]), filter(nil)
6 - output([t3.c1], [t3.c2]), filter(nil),
access([t3.c1], [t3.c2]), partitions(p[0-3])
两个表都重新分布的场景
对于同样的 t2 和 t3 表,如果连接条件既没有包含 t2 的分区键也没有包含 t3 的分区键,会生成如下例所示的计划。注意这个计划的 join 部分是在主线程完成。在后续的 OceanBase 版本中,这样的计划的 join 部分也会在工作线程以并行的方式执行。
explain select * from t2, t3 where t2.c2 = t3.c2\G
*************************** 1. row ***************************
Query Plan:
==============================================
|ID|OPERATOR |NAME|EST. ROWS|COST |
----------------------------------------------
|0 |HASH JOIN | |31680 |29835|
|1 | EXCHANGE IN DISTR | |4000 |878 |
|2 | EXCHANGE OUT DISTR| |4000 |499 |
|3 | TABLE SCAN |t2 |4000 |499 |
|4 | EXCHANGE IN DISTR | |4000 |878 |
|5 | EXCHANGE OUT DISTR| |4000 |499 |
|6 | TABLE SCAN |t3 |4000 |499 |
==============================================
Outputs & filters:
-------------------------------------
0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
equal_conds([t2.c2 = t3.c2]), other_conds(nil)
1 - output([t2.c1], [t2.c2]), filter(nil)
2 - output([t2.c1], [t2.c2]), filter(nil)
3 - output([t2.c1], [t2.c2]), filter(nil),
access([t2.c1], [t2.c2]), partitions(p[0-3])
4 - output([t3.c1], [t3.c2]), filter(nil)
5 - output([t3.c1], [t3.c2]), filter(nil)
6 - output([t3.c1], [t3.c2]), filter(nil),
access([t3.c1], [t3.c2]), partitions(p[0-3])