马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
在《OceanBase 数据库源码解析》这本书中,关于SQL执行器的深入分析相对较少,因此,希望增加一些实用且详尽的补充内容。
上一篇博客《 OceanBase技能解析: 执行器中的自顺应技能》中,已开端介绍了执行器中几项典范的自顺应技能,但其中对于hash group by 中的两阶段下压技能,我是基于大家已有一定相识的前提睁开了阐述。若你在执行器的多阶段下压技能方面尚存疑惑,欢迎阅读这篇博客,来一起学习一下 OceanBase 中比较常见的几种自顺应分布式下压技能。
什么是分布式下压
在分布式执行的过程中,为了更好地利用并行的能力,降低 CPU 和网络的开销,优化器天生筹划的过程中,通常会将部分算子下压到更下层的各个计算节点上。目的是为了充分利用集群的计算资源,提升执行服从。这次就来介绍下 OceanBase 里面最常见的几种分布式下压技能。
LIMIT 下压
我们先介绍一下 limit 的下压。举一个简单的例子,这两条 SQL 是创建一个 orders 表,并从 orders 表中读 100 行数据。
- CREATE TABLE `orders` (
- `o_orderkey` bigint(20) NOT NULL,
- `o_custkey` bigint(20) NOT NULL,
- `o_orderdate` date NOT NULL,
- PRIMARY KEY (`o_orderkey`, `o_orderdate`, `o_custkey`),
- KEY `o_orderkey` (`o_orderkey`) LOCAL BLOCK_SIZE 16384
- ) partition by range columns(o_orderdate)
- subpartition by hash(o_custkey) subpartitions 64
- (partition ord1 values less than ('1992-01-01'),
- partition ord2 values less than ('1992-02-01'),
- partition ord3 values less than ('1992-03-01'),
- partition ord77 values less than ('1998-05-01'),
- partition ord78 values less than ('1998-06-01'),
- partition ord79 values less than ('1998-07-01'),
- partition ord80 values less than ('1998-08-01'),
- partition ord81 values less than (MAXVALUE));
- select * from orders limit 100;
复制代码
图中的筹划是分布式下压的一个很常见的场景:
- explain select * from orders limit 100;
- +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
- | Query Plan |
- +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
- | ================================================================= |
- | |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
- | ----------------------------------------------------------------- |
- | |0 |LIMIT | |1 |2794 | |
- | |1 |└─PX COORDINATOR | |1 |2794 | |
- | |2 | └─EXCHANGE OUT DISTR |:EX10000|1 |2793 | |
- | |3 | └─LIMIT | |1 |2792 | |
- | |4 | └─PX PARTITION ITERATOR| |1 |2792 | |
- | |5 | └─TABLE FULL SCAN |orders |1 |2792 | |
- | ================================================================= |
- | Outputs & filters: |
- | ------------------------------------- |
- | 0 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
- | limit(100), offset(nil) |
- | 1 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
- | 2 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
- | dop=1 |
- | 3 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
- | limit(100), offset(nil) |
- | 4 - output([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), filter(nil) |
- | force partition granule |
- | 5 - output([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), filter(nil) |
- | access([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), partitions(p0sp[0-63], p1sp[0-63], p2sp[0-63], p3sp[0-63], p4sp[0-63], p5sp[0-63], |
- | p6sp[0-63], p7sp[0-63]) |
- | limit(100), offset(nil), is_index_back=false, is_global_index=false, |
- | range_key([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), range(MIN,MIN,MIN ; MAX,MAX,MAX)always true |
- +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
复制代码 可以看到筹划中有两个 limit 算子(1 号和 3 号)。通过下压天生 3 号 limit 算子,可以降低对 5 号 table scan 对 orders 每个分区的扫描行数,让每个 table scan 的线程最多只扫描 100 行数据,如许可以降低 table scan 扫描数据的开销以及发送数据到 1 号算子进行汇总的网络开销。现在 OB 的一个 exchange 算子是从下层收到 64K 数据以后发一个包,limit 假如不下压的话可能会多扫描很多的数据,并且带来很大的网络开销。
真实的场景中,limit 通常伴随着 order by。假如在前面的例子中加上 order by 关键字,order by 加 limit 会在筹划中天生一个 top-n sort 算子,它的性能是比 sort 要好很多的。
- explain select * from orders order by o_orderdate limit 100;
- +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
- | Query Plan |
- +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
- | ================================================================= |
- | |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
- | ----------------------------------------------------------------- |
- | |0 |LIMIT | |1 |2794 | |
- | |1 |└─PX COORDINATOR MERGE SORT | |1 |2794 | |
- | |2 | └─EXCHANGE OUT DISTR |:EX10000|1 |2793 | |
- | |3 | └─TOP-N SORT | |1 |2792 | |
- | |4 | └─PX PARTITION ITERATOR| |1 |2792 | |
- | |5 | └─TABLE FULL SCAN |orders |1 |2792 | |
- | ================================================================= |
- | Outputs & filters: |
- | ------------------------------------- |
- | 0 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
- | limit(100), offset(nil) |
- | 1 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
- | sort_keys([orders.o_orderdate, ASC]) |
- | 2 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
- | dop=1 |
- | 3 - output([orders.o_orderkey], [orders.o_custkey], [orders.o_orderdate]), filter(nil) |
- | sort_keys([orders.o_orderdate, ASC]), topn(100) |
- | 4 - output([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), filter(nil) |
- | force partition granule |
- | 5 - output([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), filter(nil) |
- | access([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), partitions(p0sp[0-63], p1sp[0-63], p2sp[0-63], p3sp[0-63], p4sp[0-63], p5sp[0-63], |
- | p6sp[0-63], p7sp[0-63]) |
- | is_index_back=false, is_global_index=false, |
- | range_key([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), range(MIN,MIN,MIN ; MAX,MAX,MAX)always true |
- +-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
复制代码 假如上面的 limit 不下压的话,3 号算子就会变成 sort 算子,每个线程需要将自己扫描的所有数据排序以后发送给上层的 DFO(DFO 就是一个子筹划,相邻的 DFO 之间以 exchange 算子作为分割,详见:OceanBase分布式数据库-海量数据 笔笔算数)。
limit 下压的作用,就是能够提前结束执行,减少计算和网络的开销。
AGGREGATION 下压
下面介绍一下聚合中的分布式下压,以这条 group by 语句为例:
- select count(o_totalprice), sum(o_totalprice) from orders group by o_orderdate;
复制代码
这条 SQL 查询了每一天的订单数和销售额,假如希望并行地执行这条 SQL 的话,最直接的想法肯定是让表中数据根据 group by 列(o_orderdate)的 hash 值进行数据的分发,因为如许可以确保 o_orderdate 值相同的行都被发送到了同一个线程,各个线程可以并行地对收到的数据去进行聚合。
但是这个筹划的一个弊端是要对表中所有的数据都要做一次 shuffle 网络的开销可能很大;还有一个题目是假如表中存在数据倾斜比如某一天的订单特殊多,那么处理负责处理这一天订单的线程的工作量就会比其他线程多很多,这个长尾的任务可能直接导致这个查询的执行时间特殊长。
为了办理上述这些题目,我们会对 group by 算子进行下压,天生如许一个筹划:
- explain select count(o_totalprice), sum(o_totalprice) from orders group by o_orderdate;
- +-----------------------------------------------------------------------------------------------------------------------------------------------------------+
- | Query Plan |
- +-----------------------------------------------------------------------------------------------------------------------------------------------------------+
- | ===================================================================== |
- | |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
- | --------------------------------------------------------------------- |
- | |0 |PX COORDINATOR | |1 |2796 | |
- | |1 |└─EXCHANGE OUT DISTR |:EX10001|1 |2795 | |
- | |2 | └─HASH GROUP BY | |1 |2795 | |
- | |3 | └─EXCHANGE IN DISTR | |1 |2794 | |
- | |4 | └─EXCHANGE OUT DISTR (HASH)|:EX10000|1 |2794 | |
- | |5 | └─HASH GROUP BY | |1 |2793 | |
- | |6 | └─PX PARTITION ITERATOR| |1 |2792 | |
- | |7 | └─TABLE FULL SCAN |orders |1 |2792 | |
- | ===================================================================== |
- | Outputs & filters: |
- | ------------------------------------- |
- | 0 - output([INTERNAL_FUNCTION(T_FUN_COUNT_SUM(T_FUN_COUNT(orders.o_totalprice)), T_FUN_SUM(T_FUN_SUM(orders.o_totalprice)))]), filter(nil) |
- | 1 - output([INTERNAL_FUNCTION(T_FUN_COUNT_SUM(T_FUN_COUNT(orders.o_totalprice)), T_FUN_SUM(T_FUN_SUM(orders.o_totalprice)))]), filter(nil) |
- | dop=1 |
- | 2 - output([T_FUN_COUNT_SUM(T_FUN_COUNT(orders.o_totalprice))], [T_FUN_SUM(T_FUN_SUM(orders.o_totalprice))]), filter(nil) |
- | group([orders.o_orderdate]), agg_func([T_FUN_COUNT_SUM(T_FUN_COUNT(orders.o_totalprice))], [T_FUN_SUM(T_FUN_SUM(orders.o_totalprice))]) |
- | 3 - output([orders.o_orderdate], [T_FUN_COUNT(orders.o_totalprice)], [T_FUN_SUM(orders.o_totalprice)]), filter(nil) |
- | 4 - output([orders.o_orderdate], [T_FUN_COUNT(orders.o_totalprice)], [T_FUN_SUM(orders.o_totalprice)]), filter(nil) |
- | (#keys=1, [orders.o_orderdate]), dop=1 |
- | 5 - output([orders.o_orderdate], [T_FUN_COUNT(orders.o_totalprice)], [T_FUN_SUM(orders.o_totalprice)]), filter(nil) |
- | group([orders.o_orderdate]), agg_func([T_FUN_COUNT(orders.o_totalprice)], [T_FUN_SUM(orders.o_totalprice)]) |
- | 6 - output([orders.o_orderdate], [orders.o_totalprice]), filter(nil) |
- | force partition granule |
- | 7 - output([orders.o_orderdate], [orders.o_totalprice]), filter(nil) |
- | access([orders.o_orderdate], [orders.o_totalprice]), partitions(p0sp[0-63], p1sp[0-63], p2sp[0-63], p3sp[0-63], p4sp[0-63], p5sp[0-63], p6sp[0-63], |
- | p7sp[0-63]) |
- | is_index_back=false, is_global_index=false, |
- | range_key([orders.o_orderkey], [orders.o_orderdate], [orders.o_custkey]), range(MIN,MIN,MIN ; MAX,MAX,MAX)always true |
- +-----------------------------------------------------------------------------------------------------------------------------------------------------------+
复制代码 在这个筹划里面,每个线程在进行数据的分发之前会先对自己读取的这部分数据进行预聚合,也就是筹划里面的 5 号 group by 算子做的工作。然后 5 号算子将聚合的效果发送给上层的算子,之后上层的 2号 group by 算子会对收到的数据再进一次聚合。因为经过这个 5 号 group by 算子的提前聚合之后,数据量一般都会大幅降低,如许即可以降低数据 shuffle 带来的网络开销,也能降低数据倾斜对执行时间的影响。
接下来展示一下具体的执行过程来进行说明,还是刚才那条熟悉的 SQL,求每一天的订单数和销售额。
- select count(o_totalprice), sum(o_totalprice) from orders group by o_orderdate;
复制代码 原始的数据共有 7 行,每笔订单的销售额都是 10 元,它分布在 1、2、3 号这三天里面。
下图中展示了执行的过程,这里我们把并行度设置为 2:
我们从左往右看,可以看到左上方的第一个线程扫描了 3 行的数据,左下方的第二个线程扫描了 4 行数据。日期相同的数据,也就是同一组的数据,都被标成了相同的颜色。
先看左侧,第一个线程对其扫描的 3 行数据去进行聚合,这 3 行数据分布在两个 group 里面,6 月 1 号有 2 行,6 月 3 号有 1 行。因为 6 月 1 号有 2 行,以是它的 count 是 2,销售额是 20。6 月 3 号有 1 行,它的 count 是 1,销售额是10。第二个线程扫描的 4 行数据也分布在两个 group 里面,聚合后也天生了两行数据,这里不再赘述。这部分工尴尬刁难应筹划里的 5 号算子。
然后这两个线程利用 o_orderdate 列的 hash 值进行数据的分发,让同一天的数据都发送到同一个线程。这部分工尴尬刁难应筹划里的 3 号和 4 号算子。
右侧的每个线程对收到的数据会再进行一次聚合。可以看到左边两个线程中 6 月 3 号的数据(红色)都被发送到了右下方这个线程里,这两行从左侧差别线程发过来的 6 月 3 号的数据被右侧的算子再次进行聚合,count 和 sum 都再次被相加,count 变成了2,sum 变成了 20,终极被聚合成了一行。这部分工尴尬刁难应筹划里的 2 号算子。
然后所有的数据都会发送给协调者,协调者对数据进行汇总之后,将终极的计算效果发送给客户端。
JOIN FILTER 下压
join 算子中也会把左表的过滤条件 join filter 下压到右表,对右表的数据进行提前过滤和分区裁剪。
提前过滤
hash join 在执行的时间,总是先读左表的数据,创建一个哈希表。然后用右侧的数据去探测这个哈希表,假如探测成功的话就会把这个数据发送给上层算子。这里存在的一个题目就是假如 hash join 的右侧存在一个数据 reshuffle(重分布)的话,网络的开销可能比较大,这个开销标取决于右表的数据量巨细。在这种情况下,我们可以通过 join filter 来降低数据 shuffle 的网络开销。
以这个筹划为例:
在上面这个筹划中,2 号的 hash join 算子从左侧读数据,读的时间会利用 t1.c1 这个连接键创建一个 join filter,就是筹划中的这个 3 号 join filter create 算子。join filter 最常见的一个形式是 bloom filter,join filter 创建完成以后会被发送到 hash join 右侧这个 DFO(6 号算子以及更下层的算子)。
可以看到,10 号的这个 table scan 上面有一个过滤条件 sys_op_bloom_filter(t2.c1),表现会用 bloom_filter 对 hash join 右表 t2.c1 的值去进行一个快速的探测。假如探测失败的话说明不存在 t2.c1 跟这个 t1.c1 相等,那么这行数据可以直接被提前过滤掉,不需要向上发送给 hash join。
分区裁剪
join filter 不仅可以对行进行过滤,还可以用于分区裁剪,即对分区进行过滤。假如 t1 是一个分区表,并且连接键是它的分区键的话,那么可以天生如许的筹划:
可以看到这个筹划里,3 号是一个 partition join filter create 算子,它会感知 hash join 右边的 t1 表的分区方式,它每从下层获取一行左表的数据,就会用 c1 的值去计算这行数据在右表 t1 表里的哪个分区里面,并将这个 partition id 记录到 join filter 里。终极这个 partition id 的 join filter 会在 8 号算子上用于 hash join 右表的分区裁剪。右表扫描每一个分区之前都会检查这个 partition id 是否存在于 join filter 中,假如不存在的话,可以直接跳过整分区的扫描。
join filter 可以提前对数据进行过滤、提前对分区进行裁剪,降低了扫描数据、网络传输和探测 hash 表的开销。现在 4.2 之前只支持 bloom filter 这一种类型的 join filter。4.2上新支持了 in filter 和 range filter 这两种类型的 join filter,这两种新的 join filter 在一些场景中对性能有很好的提升,特殊是在左表差别值的个数较少或者是左表值一连的场景。
其他的分布式下压
除了上述介绍的几个比较常见也比较易于理解的分布式下压技能,OceanBase 还支持更多的自顺应分布式下压,例如: window function 的自顺应两阶段下压、三阶段的聚合下压等等。
OceanBase 中这些更为复杂的分布式下压技能,由于精力所限,就不再一一详细介绍。下面会贴一下刚才提到的两种分布式下压的执行筹划,供有兴趣的同学去进行更深入的研究。
window function 的自顺应两阶段下压:
- select /*+parallel(3) */
- c1, sum(c2) over (partition by c1) from t1 order by c1;
- Query Plan
- ===================================================
- |ID|OPERATOR |NAME |
- ---------------------------------------------------
- |0 |PX COORDINATOR MERGE SORT | |
- |1 | EXCHANGE OUT DISTR |:EX10001|
- |2 | MATERIAL | |
- |3 | WINDOW FUNCTION CONSOLIDATOR | |
- |4 | EXCHANGE IN MERGE SORT DISTR | |
- |5 | EXCHANGE OUT DISTR (HASH HYBRID)|:EX10000|
- |6 | WINDOW FUNCTION | |
- |7 | SORT | |
- |8 | PX BLOCK ITERATOR | |
- |9 | TABLE SCAN |t1 |
- ===================================================
复制代码 三阶段的聚合下压:
- select /*+ parallel(2) */
- c1, sum(distinct c2),count(distinct c3), sum(c4) from t group by c1;
- Query Plan
- ===========================================================================
- |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)|
- ---------------------------------------------------------------------------
- |0 |PX COORDINATOR | |1 |8 |
- |1 |└─EXCHANGE OUT DISTR |:EX10002|1 |7 |
- |2 | └─HASH GROUP BY | |1 |6 |
- |3 | └─EXCHANGE IN DISTR | |2 |6 |
- |4 | └─EXCHANGE OUT DISTR (HASH) |:EX10001|2 |6 |
- |5 | └─HASH GROUP BY | |2 |4 |
- |6 | └─EXCHANGE IN DISTR | |2 |4 |
- |7 | └─EXCHANGE OUT DISTR (HASH)|:EX10000|2 |3 |
- |8 | └─HASH GROUP BY | |2 |2 |
- |9 | └─PX BLOCK ITERATOR | |1 |1 |
- |10| └─TABLE FULL SCAN |t |1 |1 |
- ===========================================================================
复制代码
下回预报
这篇博客给大家介绍了 OceanBase 执行器中几个比较具有代表性的分布式下压技能,但是已经假设大家对数据库的分布式执行技能有所相识。假如大家对执行器的并行执行技能还不是特殊相识,请等候下一篇博客《OceanBase 并行执行技能》。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |