我们已经对postgresql的执行计划比较熟了,那么在分布式数据库下,sql是如何执行的呢,我们通过分析greenplum来看一下mpp类型的数据库是如何进行处理的。
greenplum支持列存与orca优化器,这里我们先不去进行细致的分析,后续再进行分析。
greenplum工作机制简析
我们在执行计划分析前,先看一下greenplum的工作原理。在进行海量数据分析时,单机数据库能够承载的数据量是有限的,并且其计算能力也是受限与单机处理能力。为了进行海量的数据分析,其中一个思路就是一台机器不行,我就让更多的机器参与,将数据分布在多台机器上,多台机器同时进行运算,这样数据量和计算能力都得到了水平扩展。这就是mpp数据库的思想,分而治之。
gp是由一个master节点,多个segments节点构成,master节点负责执行计划解析以及分发、将segment节点的数据汇集等工作,segments承担数据存储与计算的工作。我们以最简单的表存储与查询为例:gp在建表时通过分布键将表切分分布在多个segment中,这个segment可以理解为一个postgresql数据库实例,这样表的大小不再首先与单机postgresql数据库能承载的大小,表数据非常大时,可通过增加节点数量解决。查询时,每个segment将各自节点数据进行查询,汇总在master节点,由master节点汇集返回给用户。
因为master节点对于我们理解gp的工作机制非常重要,对此我们先列出master节点的工作,方便我们接下来理解执行计划:
- 执行计划解析及分发
- 将子节点的数据汇集到一起
- 将所有segment的有序数据进行归并操作
- 聚合函数在master节点上进行最后的计算
- 需要有唯一的序列的功能(如开窗函数不带partiton by子句)
需要注意的是,greenplum优化器有2个,一个是pg的优化器,另一个是gporca,关于orca我们后续再进行分析,这里先分析pg优化器的情况。
-- optimizer = on, 则表示开启了orca优化器
postgres=# show optimizer;
optimizer
-----------
off
(1 row)
分布表、分布键
我们先建立一个分布表,
-- 建立分布表,分布键为a,分布方式默认为hash分布
postgres=# create table t1(a int primary key, b int) distributed by(a);
create table
-- 插入数据
postgres=# insert into t1 select s,s from generate_series(1,1000) s;
insert 0 1000
postgres=# create table t2(a int primary key, b int) distributed by(a);
create table
postgres=# insert into t2 select s,s from generate_series(1,1000) s;
insert 0 1000
全表扫描
我们先看最简单的全表扫描的是如何实现的,其逻辑应该是什么呢?先在各segment节点进行顺序扫描,再在master节点进行汇总。我们看一下其执行计划:
postgres=# explain select * from t1;
query plan
--------------------------------------------------------------------------------
gather motion 2:1 (slice1; segments: 2) (cost=0.00..21.02 rows=1001 width=8)
-> seq scan on t1 (cost=0.00..6.00 rows=500 width=8)
optimizer: postgres query optimizer -- pg优化器,非orca优化器
(3 rows)
这个是基础,相比pg的执行计划,多了一个gather motion
的步骤,即聚合操作,在master节点上将segment节点的数据聚合起来。我们可以通过gp_segment_id,查看数据是位于那个segment上的:
-- 查询,可以看到,分布在不同节点的数据
postgres=# select gp_segment_id,* from t1 where mod(a, 100) = 0;
gp_segment_id | a | b
--------------- ------ ------
1 | 0 | 0
1 | 200 | 200
1 | 300 | 300
1 | 400 | 400
1 | 900 | 900
0 | 100 | 100
0 | 500 | 500
0 | 600 | 600
0 | 700 | 700
0 | 800 | 800
0 | 1000 | 1000
(11 rows)
多次执行,其结果的顺序并与固定不变的,原因是,master节点在聚合segment节点的数据时,不同segment节点数据到master节点的先后顺序不确定,master采用的策略是谁先到谁的数据先放在master上。
postgres=# select gp_segment_id,* from t1 where mod(a, 100) = 0;
gp_segment_id | a | b
--------------- ------ ------
0 | 100 | 100
0 | 500 | 500
0 | 600 | 600
0 | 700 | 700
0 | 800 | 800
0 | 1000 | 1000
1 | 0 | 0
1 | 200 | 200
1 | 300 | 300
1 | 400 | 400
1 | 900 | 900
(11 rows)
分布式执行计划中limit怎么实现
在分布式执行计划中,limit怎么实现呢?先不看执行计划,自己先想一下,思路就有了,那就是,每个segment节点先执行一遍limit,再聚合到master节点,在maste节点中再次执行一次limit。我们看一下执行计划,确实如此。
postgres=# explain select gp_segment_id,* from t1 limit 5;
query plan
------------------------------------------------------------------------------------
limit (cost=0.60..0.71 rows=5 width=12)
-> gather motion 2:1 (slice1; segments: 2) (cost=0.60..0.81 rows=10 width=12)
-> limit (cost=0.60..0.66 rows=5 width=12)
-> seq scan on t1 (cost=0.00..6.00 rows=500 width=12)
optimizer: postgres query optimizer
(5 rows)
这次例子比较简单,却对我们理解分布式执行计划非常重要,因为其他的很多sql操作也是同样的思路。比如聚合函数
。我们以count函数为例,其思路就是先在segment节点计算各自的count,再聚合到master节点,由master节点完成最后的finalize aggregate
。
postgres=# explain select count(*) from t1;
query plan
----------------------------------------------------------------------------------
finalize aggregate (cost=7.30..7.31 rows=1 width=8)
-> gather motion 2:1 (slice1; segments: 2) (cost=7.26..7.30 rows=2 width=8)
-> partial aggregate (cost=7.26..7.27 rows=1 width=8)
-> seq scan on t1 (cost=0.00..6.00 rows=500 width=0)
optimizer: postgres query optimizer
(5 rows)
排序sort的执行计划
全表排序在分布式下如何试下呢?很容易让我们联想到“分而治之”的算法思想,当数据量非常大时,我们通常采用“分而治之”的思想,将数据分片,在各自数据分片中完成部分排序,再汇总进行归并排序。我们看一下gp中的实现,就是这个思路:
postgres=# explain (costs off) select gp_segment_id,* from t1 order by b;
query plan
------------------------------------------
gather motion 2:1 (slice1; segments: 2)
merge key: b -- 由maste节点进行归并
-> sort -- segment节点进行部分排序
sort key: b
-> seq scan on t1
optimizer: postgres query optimizer
(6 rows)
-- 先sort,再limit
postgres=# explain select gp_segment_id,* from t1 order by b limit 5;
query plan
--------------------------------------------------------------------------------------
limit (cost=14.44..14.52 rows=5 width=12)
-> gather motion 2:1 (slice1; segments: 2) (cost=14.44..14.61 rows=10 width=12)
merge key: b
-> limit (cost=14.44..14.46 rows=5 width=12)
-> sort (cost=14.32..15.57 rows=500 width=12)
sort key: b
-> seq scan on t1 (cost=0.00..6.00 rows=500 width=12)
optimizer: postgres query optimizer
(8 rows)
join的执行计划
join是非常重要的,分布式join如何实现呢?join有多种实现方式:
- hashjoin
- mergejoin
- nestloopjoin
hashjoin
我们先分析hashjoin,在进行join时,会有join的条件,比如
select * from t1,t2 where t1.a = t2.a;
这里判断条件为t1.a = t2.a
,因为表t1
,t2
的分布键都为a
,所以,等值的数据一定在同一个segment节点上,这种情况下就比较容易实现了,相当于在各自segment节点进行hashjoin,再在master节点上聚合就好了:
postgres=# explain (costs off) select * from t1,t2 where t1.a = t2.a;
query plan
------------------------------------------
gather motion 2:1 (slice1; segments: 2) -- master节点聚合数据
-> hash join -- segment节点上进行hashjoin
hash cond: (t1.a = t2.a)
-> seq scan on t1
-> hash
-> seq scan on t2
optimizer: postgres query optimizer
(7 rows)
但如果是下面这种情况呢?在数据分布的时候,表t2
是按a
进行分布的,但join的条件却是t1.a = t2.b
,这就存在问题了,满足join条件的t2
中的数据与t1
并不一定在同一个segment节点上,这样我们在各自segment上进行hashjion再汇集到master节点的算法实现就不满足条件,我们在实现算法的时候,需要牢记“分而治之”的思想。
select * from t1,t2 where t1.a = t2.b;
那么怎么办呢?我们可以进行重分布,以b为分布键重分布,这样就可以借鉴前面的实现思路了。执行计划如下:
postgres=# explain (costs off) select * from t1,t2 where t1.a = t2.b;
query plan
------------------------------------------------------------
gather motion 2:1 (slice1; segments: 2)
-> hash join
hash cond: (t2.b = t1.a)
-> redistribute motion 2:2 (slice2; segments: 2) -- 重分布
hash key: t2.b
-> seq scan on t2
-> hash
-> seq scan on t1
optimizer: postgres query optimizer
(9 rows)
与面前的执行计划相比,多了一个redistribute motion
的步骤,即重分布,数据按照新的分布键(关联键)重新打散到各segment上。
nestloopjoin
nestloop在gp中是如何实现的呢?这里就涉及到一个概念,广播,将每个segment上的某个表的数据全部发送给所有segment(需要注意,采用这种方式的表一般较小)。在实现nestloop时,将其中一个较小的表广播到所有节点,然后进行nestloop,再在master节点聚合所有数据。
citus中有reference table的概念,在每个节点上保存整张表完整的数据,与这里的广播起的作用类似。
postgres=# explain (costs off) select * from t1,t2;
query plan
---------------------------------------------------------
gather motion 2:1 (slice1; segments: 2)
-> nested loop
-> broadcast motion 2:2 (slice2; segments: 2) -- 广播
-> seq scan on t2
-> materialize
-> seq scan on t1
optimizer: postgres query optimizer
(7 rows)
mergejoin
采用mergejoin的话,需要先进行排序,再按照归并排序的方式进行join,执行计划如下:
postgres=# set enable_hashjoin = off;
set
-- 不涉及重分布
postgres=# explain (costs off) select * from t1,t2 where t1.a = t2.a;
query plan
--------------------------------------------
gather motion 2:1 (slice1; segments: 2)
-> merge join
merge cond: (t1.a = t2.a)
-> index scan using t1_pkey on t1 -- 索引扫描,相当于排序
-> index scan using t2_pkey on t2
optimizer: postgres query optimizer
(6 rows)
-- 需要进行重分布
postgres=# explain (costs off) select * from t1,t2 where t1.a = t2.b;
query plan
------------------------------------------------------------------
gather motion 2:1 (slice1; segments: 2)
-> merge join
merge cond: (t1.a = t2.b)
-> index scan using t1_pkey on t1
-> sort
sort key: t2.b
-> redistribute motion 2:2 (slice2; segments: 2)
hash key: t2.b
-> seq scan on t2
optimizer: postgres query optimizer
(10 rows)
group by执行计划
在ap场景中,group by是非常重要的,实现group by有两种方式,一种是hashaggregate,另一种是groupaggregate。
- hashaggregate: hash聚合,数据库会根据group by字段后面的值计算hash值,并根据前面使用的聚合函数在内存中维护相应的列表,然后数据库会通过这个列表来实现聚合操作。
- groupaggregate: group聚合,先将表中的数据按照group by的字段排序,这样同一个group by的值就在一起,再对排序好的数据进行一次全表扫描就可以得到聚合结果。
在gp中,因为有分布键的问题,在执行聚合时,需要考虑group by的字段是否为分布键两种情况,在group by的字段为分布键时,group by相同的字段一定在同一个segment中,所以,只需要在各segment节点各自进行hashaggregate,再在master节点聚合数据就可以了。为什么没有采用groupaggregate呢,因为它需要全表排序,在分布表时,生成分布执行计划,hashaggregate效率更高。
postgres=# explain select a,count(*) from t1 group by a;
query plan
---------------------------------------------------------------------------------
gather motion 2:1 (slice1; segments: 2) (cost=8.51..28.53 rows=1001 width=12)
-> hashaggregate (cost=8.51..13.51 rows=500 width=12)
group key: a
-> seq scan on t1 (cost=0.00..6.00 rows=500 width=4)
optimizer: postgres query optimizer
(5 rows)
另一种情况是当group by的字段不是分布键时,这时候就需要重分布,将数据按group by 的字段作为分布键进行重分布, 再进行hashaggregate:
postgres=# explain select b,count(*) from t1 group by b;
query plan
-----------------------------------------------------------------------------------
gather motion 2:1 (slice1; segments: 2) (cost=18.52..38.54 rows=1001 width=12)
-> hashaggregate (cost=18.52..23.52 rows=500 width=12)
group key: b
-> redistribute motion 2:2 (slice2; segments: 2) (cost=0.00..16.02 rows=500 width=4)
hash key: b
-> seq scan on t1 (cost=0.00..6.00 rows=500 width=4)
optimizer: postgres query optimizer
(7 rows)
总结
以上对greenplum中常见的执行计划做了分析,让我们理解了分布式执行计划是如何实现的,当然,sql语句有非常多种情况,这里也无法一一做出分析,但处理的思想是一致的,那就是分而治之,这也是mpp数据库的精髓。后续在学习greenplum时,可以去研究orca优化器以及列存,这是相对于oltp数据库较为不同的地方,ap数据库与tp数据库的用户需求不同,所以在实现的方式上,一定有所不同,后续可以进行深入学习研究。