背景

PostgreSQL 9.6开始就支持并行计算了,意味着聚合、扫描、排序、JOIN等都开始支持并行计算。对于聚合操作来说,并行计算与非并行计算是有差异的。

例如avg聚合,对一张表进行计算时,一个任务中操作和多个并行任务操作,算法是不一样的。

PostgreSQL提供了一套标准的接口,可以支持聚合函数的并行操作。

自定义并行聚合的原理和例子

创建聚合函数的语法如下:

  1. CREATE AGGREGATE name ( [ argmode ] [ argname ] arg_data_type [ , ... ] ) (
  2. SFUNC = sfunc,
  3. STYPE = state_data_type
  4. [ , SSPACE = state_data_size ]
  5. [ , FINALFUNC = ffunc ]
  6. [ , FINALFUNC_EXTRA ]
  7. [ , COMBINEFUNC = combinefunc ]
  8. [ , SERIALFUNC = serialfunc ]
  9. [ , DESERIALFUNC = deserialfunc ]
  10. [ , INITCOND = initial_condition ]
  11. [ , MSFUNC = msfunc ]
  12. [ , MINVFUNC = minvfunc ]
  13. [ , MSTYPE = mstate_data_type ]
  14. [ , MSSPACE = mstate_data_size ]
  15. [ , MFINALFUNC = mffunc ]
  16. [ , MFINALFUNC_EXTRA ]
  17. [ , MINITCOND = minitial_condition ]
  18. [ , SORTOP = sort_operator ]
  19. [ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ]
  20. )

相比非并行,多了一个过程,那就是combinefunc的过程(也叫partial agg)。

非并行模式的聚合流程大致如下:

  1. 循环
  2. sfunc( internal-state, next-data-values ) ---> next-internal-state
  3. 最后调用一次(可选)
  4. ffunc( internal-state ) ---> aggregate-value

pic

并行模式的聚合流程大致如下,如果没有写combinefunc,那么实际上聚合过程并没有实现并行而只是扫描并行:

pic

下面这个例子,我们可以观察到一个COUNT操作的并行聚合。

  1. postgres=# set max_parallel_workers=4;
  2. SET
  3. postgres=# set max_parallel_workers_per_gather =4;
  4. SET
  5. postgres=# set parallel_setup_cost =0;
  6. SET
  7. postgres=# set parallel_tuple_cost =0;
  8. SET
  9. postgres=# alter table test set (parallel_workers =4);
  10. ALTER TABLE
  11. postgres=# explain (analyze,verbose,timing,costs,buffers) select count(*) from test;
  12. QUERY PLAN
  13. -----------------------------------------------------------------------------------------------------------------------------------------------
  14. -- final并行,可有可无,看具体的聚合算法
  15. Finalize Aggregate (cost=15837.02..15837.03 rows=1 width=8) (actual time=57.296..57.296 rows=1 loops=1)
  16. Output: count(*)
  17. Buffers: shared hit=3060
  18. -> Gather (cost=15837.00..15837.01 rows=4 width=8) (actual time=57.287..57.292 rows=5 loops=1)
  19. Output: (PARTIAL count(*))
  20. Workers Planned: 4
  21. Workers Launched: 4
  22. Buffers: shared hit=3060
  23. -- 一下就是combinefunc完成的聚合并行(显示为PARTIAL agg)
  24. -> Partial Aggregate (cost=15837.00..15837.01 rows=1 width=8) (actual time=52.333..52.333 rows=1 loops=5)
  25. Output: PARTIAL count(*)
  26. Buffers: shared hit=12712
  27. Worker 0: actual time=50.917..50.918 rows=1 loops=1
  28. Buffers: shared hit=2397
  29. Worker 1: actual time=51.293..51.294 rows=1 loops=1
  30. Buffers: shared hit=2423
  31. Worker 2: actual time=51.062..51.063 rows=1 loops=1
  32. Buffers: shared hit=2400
  33. Worker 3: actual time=51.436..51.436 rows=1 loops=1
  34. Buffers: shared hit=2432
  35. -> Parallel Seq Scan on public.test (cost=0.00..15212.00 rows=250000 width=0) (actual time=0.010..30.499 rows=200000 loops=5)
  36. Buffers: shared hit=12712
  37. Worker 0: actual time=0.013..30.343 rows=190269 loops=1
  38. Buffers: shared hit=2397
  39. Worker 1: actual time=0.010..30.401 rows=192268 loops=1
  40. Buffers: shared hit=2423
  41. Worker 2: actual time=0.013..30.467 rows=190350 loops=1
  42. Buffers: shared hit=2400
  43. Worker 3: actual time=0.009..30.221 rows=192861 loops=1
  44. Buffers: shared hit=2432
  45. Planning time: 0.074 ms
  46. Execution time: 60.169 ms
  47. (31 rows)

了解了并行聚合的原理后,我们就可以写自定义聚合函数的并行计算了。

例子

例如我们要支持一个数组的聚合,并且在聚合过程中我们要实现对元素去重。

1、创建测试表

  1. create table test(id int, col int[]);

2、生成测试数据

  1. CREATE OR REPLACE FUNCTION public.gen_arr(integer, integer)
  2. RETURNS integer[]
  3. LANGUAGE sql
  4. STRICT
  5. AS $function$
  6. select array(select ($1*random())::int from generate_series(1,$2));
  7. $function$;
  8. insert into test select random()*1000, gen_arr(500,10) from generate_series(1,10000);

3、创建聚合函数

例子1,没有combinefunc,只支持扫描并行。

数组去重函数

  1. postgres=# create or replace function uniq(int[]) returns int[] as $$
  2. select array( select unnest($1) group by 1);
  3. $$ language sql strict parallel safe;
  4. CREATE FUNCTION

数组合并与去重函数

  1. postgres=# create or replace function array_uniq_cat(anyarray,anyarray) returns anyarray as $$
  2. select uniq(array_cat($1,$2));
  3. $$ language sql strict parallel safe;
  4. CREATE FUNCTION

聚合函数(不带COMBINEFUNC)

  1. create aggregate arragg (anyarray) (sfunc = array_uniq_cat, stype=anyarray, PARALLEL=safe);

并行查询例子:

  1. postgres=# set max_parallel_workers=4;
  2. SET
  3. postgres=# set max_parallel_workers_per_gather =4;
  4. SET
  5. postgres=# set parallel_setup_cost =0;
  6. SET
  7. postgres=# set parallel_tuple_cost =0;
  8. SET
  9. postgres=# alter table test set (parallel_workers =4);
  10. ALTER TABLE
  11. postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;

很明显没有设置COMBINEFUNC时,未使用并行聚合。

  1. postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;
  2. QUERY PLAN
  3. -----------------------------------------------------------------------------------------------------------------------------------
  4. HashAggregate (cost=4139.74..4141.74 rows=200 width=36) (actual time=602.957..603.195 rows=1001 loops=1)
  5. Output: id, arragg(col)
  6. Group Key: test.id
  7. Buffers: shared hit=6
  8. -> Gather (cost=0.00..163.37 rows=15748 width=36) (actual time=0.328..43.734 rows=10000 loops=1)
  9. Output: id, col
  10. Workers Planned: 4
  11. Workers Launched: 4
  12. Buffers: shared hit=6
  13. -- 只有并行扫描,没有并行聚合。
  14. -> Parallel Seq Scan on public.test (cost=0.00..163.37 rows=3937 width=36) (actual time=0.017..0.891 rows=2000 loops=5)
  15. Output: id, col
  16. Buffers: shared hit=124
  17. Worker 0: actual time=0.019..0.177 rows=648 loops=1
  18. Buffers: shared hit=8
  19. Worker 1: actual time=0.022..0.180 rows=648 loops=1
  20. Buffers: shared hit=8
  21. Worker 2: actual time=0.017..3.772 rows=7570 loops=1
  22. Buffers: shared hit=94
  23. Worker 3: actual time=0.015..0.189 rows=648 loops=1
  24. Buffers: shared hit=8
  25. Planning time: 0.084 ms
  26. Execution time: 603.450 ms
  27. (22 rows)

例子2,有combinefunc,支持并行聚合。

  1. drop aggregate arragg(anyarray);
  2. create aggregate arragg (anyarray) (sfunc = array_uniq_cat, stype=anyarray, COMBINEFUNC = array_uniq_cat, PARALLEL=safe);

使用了并行聚合。

  1. postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;
  2. QUERY PLAN
  3. -----------------------------------------------------------------------------------------------------------------------------------------
  4. Finalize HashAggregate (cost=1361.46..1363.46 rows=200 width=36) (actual time=285.489..285.732 rows=1001 loops=1)
  5. Output: id, arragg(col)
  6. Group Key: test.id
  7. Buffers: shared hit=36
  8. -> Gather (cost=1157.46..1159.46 rows=800 width=36) (actual time=63.654..74.163 rows=4297 loops=1)
  9. Output: id, (PARTIAL arragg(col))
  10. Workers Planned: 4
  11. Workers Launched: 4
  12. Buffers: shared hit=36
  13. -- 并行聚合
  14. -> Partial HashAggregate (cost=1157.46..1159.46 rows=200 width=36) (actual time=57.367..57.727 rows=859 loops=5)
  15. Output: id, PARTIAL arragg(col)
  16. Group Key: test.id
  17. Buffers: shared hit=886
  18. Worker 0: actual time=54.788..54.997 rows=857 loops=1
  19. Buffers: shared hit=213
  20. Worker 1: actual time=56.881..57.255 rows=861 loops=1
  21. Buffers: shared hit=213
  22. Worker 2: actual time=55.415..55.813 rows=856 loops=1
  23. Buffers: shared hit=212
  24. Worker 3: actual time=56.453..56.854 rows=838 loops=1
  25. Buffers: shared hit=212
  26. -> Parallel Seq Scan on public.test (cost=0.00..163.37 rows=3937 width=36) (actual time=0.011..0.736 rows=2000 loops=5)
  27. Output: id, col
  28. Buffers: shared hit=124
  29. Worker 0: actual time=0.009..0.730 rows=1981 loops=1
  30. Buffers: shared hit=25
  31. Worker 1: actual time=0.012..0.773 rows=2025 loops=1
  32. Buffers: shared hit=25
  33. Worker 2: actual time=0.015..0.741 rows=1944 loops=1
  34. Buffers: shared hit=24
  35. Worker 3: actual time=0.012..0.751 rows=1944 loops=1
  36. Buffers: shared hit=24
  37. Planning time: 0.073 ms
  38. Execution time: 285.949 ms
  39. (34 rows)

实际上并行聚合与分布式数据库聚合阶段原理是一样的,分布式数据库自定义聚合可以参考末尾的文章。

例子3,将多个一元数组聚合为一个一元数组

PostgreSQL内置的array_agg会将数组聚合为多元数组,有些场景无法满足需求。

  1. List of functions
  2. Schema | Name | Result data type | Argument data types | Type
  3. ------------+-------------------------+------------------+-----------------------+--------
  4. pg_catalog | array_agg | anyarray | anyarray | agg
  5. pg_catalog | array_agg | anyarray | anynonarray | agg
  1. postgres=# \set VERBOSITY verbose
  2. postgres=# select array_agg(info) from (values(array[1,2,3]),(array[2,3,4,5])) t(info);
  3. ERROR: 2202E: cannot accumulate arrays of different dimensionality
  4. LOCATION: accumArrayResultArr, arrayfuncs.c:5270
  5. postgres=# select array_agg(info) from (values(array[1,2,3]),(array[3,4,5])) t(info);
  6. array_agg
  7. -------------------
  8. {\{1,2,3\},\{3,4,5\}}
  9. (1 row)

如果要将数组合并为一元数组,可以自定义一个聚合函数如下:

  1. postgres=# create aggregate arragg (anyarray) (sfunc = array_cat, stype=anyarray, PARALLEL=safe);
  2. CREATE AGGREGATE
  3. postgres=# select arragg(info) from (values(array[1,2,3]),(array[3,4,5])) t(info);
  4. arragg
  5. ---------------
  6. {1,2,3,3,4,5}
  7. (1 row)
  8. postgres=# select arragg(info) from (values(array[1,2,3]),(array[2,3,4,5])) t(info);
  9. arragg
  10. -----------------
  11. {1,2,3,2,3,4,5}
  12. (1 row)

参考

https://www.postgresql.org/docs/10/static/sql-createaggregate.html

https://www.postgresql.org/docs/10/static/xaggr.html#XAGGR-PARTIAL-AGGREGATES

《PostgreSQL aggregate function customize》

《Greenplum 最佳实践 - 估值插件hll的使用(以及hll分式聚合函数优化)》

《Postgres-XC customized aggregate introduction》