Writing custom Aggregates for statistical or analytical purposes.
Reaching for something like SUM(vals)
or AVG(vals)
is a common habit when using PostgreSQL. These aggregate functions offer users an easy, efficient way to compute results from a set of inputs.
How do they work? What makes them different than a function? How do we make one? What kinds of other uses exist?
We’ll explore creating some basic ones using SQL, then create an extension that defines aggregates in Rust using pgx
0.3.0’s new aggregate support.
Aggregates in the world
Aggregates have a number of uses, beyond the ones already mentioned (sum and average) we can find slightly less conceptually straightforward aggregates like:
- JSON/JSONB/XML ‘collectors’:
json_agg
,json_object_agg
,xmlagg
. - Bitwise operators:
bit_and
,bit_xor
, etc. - Some
std::iter::Iterator
equivalents: - PostGIS‘s geospacial aggregates such as
ST_3DExtent
which produces bounding boxes over geometry sets. - PG-Strom‘s HyperLogLog aggregates such as
hll_count
.
So, aggregates can collect items, work like a fold
, or do complex analytical math… how do we make one?
What makes an Aggregate?
Defining an aggregate is via CREATE AGGREGATE
and CREATE FUNCTION
, here’s a reimplementation of sum
only for integer
:
CREATE FUNCTION example_sum_state(
state integer,
next integer
) RETURNS integer
LANGUAGE SQL
STRICT
AS $$
SELECT $1 + $2;
$$;
CREATE AGGREGATE example_sum(integer)
(
SFUNC = example_sum_state, -- State function
STYPE = integer, -- State type
INITCOND = '0' -- Must be a string or null
);
SELECT example_sum(value) FROM UNNEST(ARRAY [1, 2, 3]) as value;
-- example_sum
-- -------------
-- 6
-- (1 row)
Conceptually, the aggregate loops over each item in the input and runs the SFUNC
function on the current state as well as each value. That code is analogous to:
fn example_sum(values: Vec<isize>) -> isize {
let mut sum = 0;
for value in values {
sum += value;
}
sum
}
Aggregates are more than just a loop, though. If the aggregate specifies a combinefunc
PostgreSQL can run different instances of the aggregate over subsets of the data, then combine them later. This is called partial aggregation and enables different worker processes to handle data in parallel. Let’s make our example_sum
aggregate above have a combinefunc
:
(Readers may note we could use example_sum_state
in this particular case, but not in general, so we’re gonna make a new function for demonstration.)
CREATE FUNCTION example_sum_combine(
first integer,
second integer
) RETURNS integer
LANGUAGE SQL
STRICT
AS $$
SELECT $1 + $2;
$$;
DROP AGGREGATE example_sum(integer);
CREATE AGGREGATE example_sum(integer)
(
SFUNC = example_sum_state,
STYPE = integer,
INITCOND = '0',
combinefunc = example_sum_combine
);
SELECT example_sum(value) FROM generate_series(0, 4000) as value;
-- example_sum
-- -------------
-- 8002000
-- (1 row)
Here’s one using FINALFUNC
, which offers a way to compute some final value from the state:
CREATE FUNCTION example_uniq_state(
state text[],
next text
) RETURNS text[]
LANGUAGE SQL
STRICT
AS $$
SELECT array_append($1, $2);
$$;
CREATE FUNCTION example_uniq_final(
state text[]
) RETURNS integer
LANGUAGE SQL
STRICT
AS $$
SELECT count(DISTINCT value) FROM UNNEST(state) as value
$$;
CREATE AGGREGATE example_uniq(text)
(
SFUNC = example_uniq_state, -- State function
STYPE = text[], -- State type
INITCOND = '{}', -- Must be a string or null
FINALFUNC = example_uniq_final -- Final function
);
SELECT example_uniq(value) FROM UNNEST(ARRAY ['a', 'a', 'b']) as value;
-- example_uniq
-- --------------
-- 2
-- (1 row)
This is particularly handy as your STYPE
doesn’t need to be the type you return!
Aggregates can take multiple arguments, too:
CREATE FUNCTION example_concat_state(
state text[],
first text,
second text,
third text
) RETURNS text[]
LANGUAGE SQL
STRICT
AS $$
SELECT array_append($1, concat($2, $3, $4));
$$;
CREATE AGGREGATE example_concat(text, text, text)
(
SFUNC = example_concat_state,
STYPE = text[],
INITCOND = '{}'
);
SELECT example_concat(first, second, third) FROM
UNNEST(ARRAY ['a', 'b', 'c']) as first,
UNNEST(ARRAY ['1', '2', '3']) as second,
UNNEST(ARRAY ['!', '@', '#']) as third;
-- example_concat
-- ---------------------------------------------------------------------------------------------------------------
-- {a1!,a2!,a3!,b1!,b2!,b3!,c1!,c2!,c3!,a1@,a2@,a3@,b1@,b2@,b3@,c1@,c2@,c3@,a1#,a2#,a3#,b1#,b2#,b3#,c1#,c2#,c3#}
-- (1 row)
See how we see a1
, b1
, and c1
? Multiple arguments might not work as you expect! As you can see, each argument is passed with each other argument.
SELECT UNNEST(ARRAY ['a', 'b', 'c']) as first,
UNNEST(ARRAY ['1', '2', '3']) as second,
UNNEST(ARRAY ['!', '@', '#']) as third;
-- first | second | third
-- -------+--------+-------
-- a | 1 | !
-- b | 2 | @
-- c | 3 | #
-- (3 rows)
Aggregates have several more optional fields, such as a PARALLEL
. Their signatures are documented in the CREATE AGGREGATE
documentation and this article isn’t meant to be comprehensive.
Reminder: You can also create functions with
pl/pgsql
,c
, pl/Python, or even in the experimentalpl/Rust
.
Extensions can, of course, create aggregates too. Next, let’s explore how to do that with Rust using pgx
0.3.0’s Aggregate support.
Familiarizing with pgx
pgx
is a suite of crates that provide everything required to build, test, and package extensions for PostgreSQL versions 10 through 14 using pure Rust.
It includes:
cargo-pgx
: Acargo
plugin that provides commands likecargo pgx package
andcargo pgx test
,pgx
: A crate providing macros, high level abstractions (such as SPI), and low level generated bindings for PostgreSQL.pgx-tests
: A crate providing a test framework for running tests inside PostgreSQL.
Note: pgx
does not currently offer Windows support, but works great in WSL2.
If a Rust toolchain is not already installed, please follow the instructions on rustup.rs.
You’ll also need to make sure you have some development libraries like zlib
and libclang
, as cargo pgx init
will, by default, build it’s own development PostgreSQL installs. Usually it’s possible to figure out if something is missing from error messages and then discover the required package for the system.
Install cargo-pgx
then initialize its development PostgreSQL installations (used for cargo pgx test
and cargo pgx run
):
$ cargo install cargo-pgx
$ cargo pgx init
# ...
We can create a new extension with:
$ cargo pgx new exploring_aggregates
$ cd exploring_aggregates
Then run it:
$ cargo pgx run
# ...
building extension with features ``
"cargo" "build"
Finished dev [unoptimized + debuginfo] target(s) in 0.06s
installing extension
Copying control file to `/home/ana/.pgx/13.5/pgx-install/share/postgresql/extension/exploring_aggregates.control`
Copying shared library to `/home/ana/.pgx/13.5/pgx-install/lib/postgresql/exploring_aggregates.so`
Discovering SQL entities
Discovered 1 SQL entities: 0 schemas (0 unique), 1 functions, 0 types, 0 enums, 0 sqls, 0 ords, 0 hashes
running SQL generator
"/home/ana/git/samples/exploring_aggregates/target/debug/sql-generator" "--sql" "/home/ana/.pgx/13.5/pgx-install/share/postgresql/extension/exploring_aggregates--0.0.0.sql"
Copying extension schema file to `/home/ana/.pgx/13.5/pgx-install/share/postgresql/extension/exploring_aggregates--0.0.0.sql`
Finished installing exploring_aggregates
Starting Postgres v13 on port 28813
Creating database exploring_aggregates
psql (13.5)
Type "help" for help.
exploring_aggregates=#
Observing the start of the src/lib.rs
file, we can see the pg_module_magic!()
and a function hello_exploring_aggregates
:
use pgx::*;
pg_module_magic!();
#[pg_extern]
fn hello_exploring_aggregates() -> &'static str {
"Hello, exploring_aggregates"
}
Back on our psql
prompt, we can load the extension and run the function:
CREATE EXTENSION exploring_aggregates;
-- CREATE EXTENSION
\dx+ exploring_aggregates
-- Objects in extension "exploring_aggregates"
-- Object description
-- ---------------------------------------
-- function hello_exploring_aggregates()
-- (1 row)
SELECT hello_exploring_aggregates();
-- hello_exploring_aggregates
-- -----------------------------
-- Hello, exploring_aggregates
-- (1 row)
Next, let’s run the tests:
$ cargo pgx test
"cargo" "test" "--features" " pg_test"
Finished test [unoptimized + debuginfo] target(s) in 0.08s
Running unittests (target/debug/deps/exploring_aggregates-4783beb51375d29c)
running 1 test
building extension with features ` pg_test`
"cargo" "build" "--features" " pg_test"
Finished dev [unoptimized + debuginfo] target(s) in 0.41s
installing extension
Copying control file to `/home/ana/.pgx/13.5/pgx-install/share/postgresql/extension/exploring_aggregates.control`
Copying shared library to `/home/ana/.pgx/13.5/pgx-install/lib/postgresql/exploring_aggregates.so`
Discovering SQL entities
Discovered 3 SQL entities: 1 schemas (1 unique), 2 functions, 0 types, 0 enums, 0 sqls, 0 ords, 0 hashes, 0 aggregates
running SQL generator
"/home/ana/git/samples/exploring_aggregates/target/debug/sql-generator" "--sql" "/home/ana/.pgx/13.5/pgx-install/share/postgresql/extension/exploring_aggregates--0.0.0.sql"
Copying extension schema file to `/home/ana/.pgx/13.5/pgx-install/share/postgresql/extension/exploring_aggregates--0.0.0.sql`
Finished installing exploring_aggregates
test tests::pg_test_hello_exploring_aggregates ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 2.44s
Stopping Postgres
Running unittests (target/debug/deps/sql_generator-1bb38131b30894e5)
running 0 tests
test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s
Doc-tests exploring_aggregates
running 0 tests
test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s
We can also inspect the SQL the extension generates:
$ cargo pgx schema
Building SQL generator with features ``
"cargo" "build" "--bin" "sql-generator"
Finished dev [unoptimized + debuginfo] target(s) in 0.06s
Discovering SQL entities
Discovered 1 SQL entities: 0 schemas (0 unique), 1 functions, 0 types, 0 enums, 0 sqls, 0 ords, 0 hashes, 0 aggregates
running SQL generator
"/home/ana/git/samples/exploring_aggregates/target/debug/sql-generator" "--sql" "sql/exploring_aggregates-0.0.0.sql"
This creates sql/exploring_aggregates-0.0.0.sql
:
/*
This file is auto generated by pgx.
The ordering of items is not stable, it is driven by a dependency graph.
*/
-- src/lib.rs:5
-- exploring_aggregates::hello_exploring_aggregates
CREATE OR REPLACE FUNCTION "hello_exploring_aggregates"() RETURNS text /* &str */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'hello_exploring_aggregates_wrapper';
Finally we can create a package for the pg_config
version installed on the system, this is done in release mode, so it takes a few minutes:
$ cargo pgx package
building extension with features ``
"cargo" "build" "--release"
Finished release [optimized] target(s) in 0.07s
installing extension
Discovering SQL entities
Discovered 1 SQL entities: 0 schemas (0 unique), 1 functions, 0 types, 0 enums, 0 sqls, 0 ords, 0 hashes, 0 aggregates
running SQL generator
"/home/ana/git/samples/exploring_aggregates/target/release/sql-generator" "--sql" "/home/ana/git/samples/exploring_aggregates/target/release/exploring_aggregates-pg13/usr/share/postgresql/13/extension/exploring_aggregates--0.0.0.sql"
Copying extension schema file to `target/release/exploring_aggregates-pg13/usr/share/postgresql/13/extension/exploring_aggregates--0.0.0.sql`
Finished installing exploring_aggregates
Let’s make some aggregates with pgx
now!
Aggregates with pgx
While designing the aggregate support for pgx
0.3.0 we wanted to try to make things feel idiomatic and natural from the Rust side, but it should be flexible enough for any use.
Aggregates in pgx
are defined by creating a type (this doesn’t necessarily need to be the state type), then using the #[pg_aggregate]
procedural macro on an pgx::Aggregate
implementation for that type.
The pgx::Aggregate
trait has quite a few items (fn
s, const
s, type
s) that you can implement, but the procedural macro can fill in stubs for all non-essential items. The state type (the implementation target by default) must have a #[derive(PostgresType)]
declaration, or be a type PostgreSQL already knows about.
Here’s the simplest aggregate you can make with pgx
:
use pgx::*;
use serde::{Serialize, Deserialize};
pg_module_magic!();
#[derive(Copy, Clone, Default, Debug, PostgresType, Serialize, Deserialize)]
pub struct DemoSum {
count: i32,
}
#[pg_aggregate]
impl Aggregate for DemoSum {
const INITIAL_CONDITION: Option<&'static str> = Some(r#"{ "count": 0 }"#);
type Args = i32;
fn state(
mut current: Self::State,
arg: Self::Args,
_fcinfo: pg_sys::FunctionCallInfo
) -> Self::State {
current.count += arg;
current
}
}
We can review the generated SQL (generated via cargo pgx schema
):
/*
This file is auto generated by pgx.
The ordering of items is not stable, it is driven by a dependency graph.
*/
-- src/lib.rs:6
-- exploring_aggregates::DemoSum
CREATE TYPE DemoSum;
-- src/lib.rs:6
-- exploring_aggregates::demosum_in
CREATE OR REPLACE FUNCTION "demosum_in"(
"input" cstring /* &cstr_core::CStr */
) RETURNS DemoSum /* exploring_aggregates::DemoSum */
IMMUTABLE PARALLEL SAFE STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'demosum_in_wrapper';
-- src/lib.rs:6
-- exploring_aggregates::demosum_out
CREATE OR REPLACE FUNCTION "demosum_out"(
"input" DemoSum /* exploring_aggregates::DemoSum */
) RETURNS cstring /* &cstr_core::CStr */
IMMUTABLE PARALLEL SAFE STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'demosum_out_wrapper';
-- src/lib.rs:6
-- exploring_aggregates::DemoSum
CREATE TYPE DemoSum (
INTERNALLENGTH = variable,
INPUT = demosum_in, /* exploring_aggregates::demosum_in */
OUTPUT = demosum_out, /* exploring_aggregates::demosum_out */
STORAGE = extended
);
-- src/lib.rs:11
-- exploring_aggregates::demo_sum_state
CREATE OR REPLACE FUNCTION "demo_sum_state"(
"this" DemoSum, /* exploring_aggregates::DemoSum */
"arg_one" integer /* i32 */
) RETURNS DemoSum /* exploring_aggregates::DemoSum */
STRICT
LANGUAGE c /* Rust */
AS 'MODULE_PATHNAME', 'demo_sum_state_wrapper';
-- src/lib.rs:11
-- exploring_aggregates::DemoSum
CREATE AGGREGATE DemoSum (
integer /* i32 */
)
(
SFUNC = "demo_sum_state", /* exploring_aggregates::DemoSum::state */
STYPE = DemoSum, /* exploring_aggregates::DemoSum */
INITCOND = '{ "count": 0 }' /* exploring_aggregates::DemoSum::INITIAL_CONDITION */
);
We can test it out with cargo pgx run
:
$ cargo pgx run
Stopping Postgres v13
building extension with features ``
"cargo" "build"
Finished dev [unoptimized + debuginfo] target(s) in 0.06s
installing extension
Copying control file to `/home/ana/.pgx/13.5/pgx-install/share/postgresql/extension/exploring_aggregates.control`
Copying shared library to `/home/ana/.pgx/13.5/pgx-install/lib/postgresql/exploring_aggregates.so`
Discovering SQL entities
Discovered 5 SQL entities: 0 schemas (0 unique), 3 functions, 1 types, 0 enums, 0 sqls, 0 ords, 0 hashes, 1 aggregates
running SQL generator
"/home/ana/git/samples/exploring_aggregates/target/debug/sql-generator" "--sql" "/home/ana/.pgx/13.5/pgx-install/share/postgresql/extension/exploring_aggregates--0.0.0.sql"
Copying extension schema file to `/home/ana/.pgx/13.5/pgx-install/share/postgresql/extension/exploring_aggregates--0.0.0.sql`
Finished installing exploring_aggregates
Starting Postgres v13 on port 28813
Re-using existing database exploring_aggregates
psql (13.5)
Type "help" for help.
exploring_aggregates=#
Now we’re connected via psql
:
CREATE EXTENSION exploring_aggregates;
-- CREATE EXTENSION
SELECT DemoSum(value) FROM generate_series(0, 4000) as value;
-- demosum
-- -------------------
-- {"count":8002000}
-- (1 row)
Pretty cool!
…But we don’t want that silly {"count": ... }
stuff, just the number! We can resolve this by changing the State
type, or by adding a finalize
(which maps to ffunc
) as we saw in the previous section.
Let’s change the State
this time:
#[derive(Copy, Clone, Default, Debug)]
pub struct DemoSum;
#[pg_aggregate]
impl Aggregate for DemoSum {
const INITIAL_CONDITION: Option<&'static str> = Some(r#"0"#);
type Args = i32;
type State = i32;
fn state(
mut current: Self::State,
arg: Self::Args,
_fcinfo: pg_sys::FunctionCallInfo,
) -> Self::State {
current += arg;
current
}
}
Now when we run it:
SELECT DemoSum(value) FROM generate_series(0, 4000) as value;
-- demosum
-- ---------
-- 8002000
-- (1 row)
This is a fine reimplementation of SUM
so far, but as we saw previously we need a combine
(mapping to combinefunc
) to support partial aggregation:
#[pg_aggregate]
impl Aggregate for DemoSum {
// ...
fn combine(
mut first: Self::State,
second: Self::State,
_fcinfo: pg_sys::FunctionCallInfo,
) -> Self::State {
first += second;
first
}
}
We can also change the name of the generated aggregate, or set the PARALLEL
settings, for example:
#[pg_aggregate]
impl Aggregate for DemoSum {
// ...
const NAME: &'static str = "demo_sum";
const PARALLEL: Option<ParallelOption> = Some(pgx::aggregate::ParallelOption::Unsafe);
// ...
}
This generates:
-- src/lib.rs:9
-- exploring_aggregates::DemoSum
CREATE AGGREGATE demo_sum (
integer /* i32 */
)
(
SFUNC = "demo_sum_state", /* exploring_aggregates::DemoSum::state */
STYPE = integer, /* i32 */
COMBINEFUNC = "demo_sum_combine", /* exploring_aggregates::DemoSum::combine */
INITCOND = '0', /* exploring_aggregates::DemoSum::INITIAL_CONDITION */
PARALLEL = UNSAFE /* exploring_aggregates::DemoSum::PARALLEL */
);
Rust state types
It’s possible to use a non-SQL (say, HashSet<String>
) type as a state by using Internal
.
When using this strategy, a
finalize
function must be provided.
Here’s a unique string counter aggregate that uses a HashSet
:
use pgx::*;
use std::collections::HashSet;
pg_module_magic!();
#[derive(Copy, Clone, Default, Debug)]
pub struct DemoUnique;
#[pg_aggregate]
impl Aggregate for DemoUnique {
type Args = &'static str;
type State = Internal;
type Finalize = i32;
fn state(
mut current: Self::State,
arg: Self::Args,
_fcinfo: pg_sys::FunctionCallInfo,
) -> Self::State {
let inner = unsafe { current.get_or_insert_default::<HashSet<String>>() };
inner.insert(arg.to_string());
current
}
fn combine(
mut first: Self::State,
mut second: Self::State,
_fcinfo: pg_sys::FunctionCallInfo
) -> Self::State {
let first_inner = unsafe { first.get_or_insert_default::<HashSet<String>>() };
let second_inner = unsafe { second.get_or_insert_default::<HashSet<String>>() };
let unioned: HashSet<_> = first_inner.union(second_inner).collect();
Internal::new(unioned)
}
fn finalize(
mut current: Self::State,
_direct_arg: Self::OrderedSetArgs,
_fcinfo: pg_sys::FunctionCallInfo,
) -> Self::Finalize {
let inner = unsafe { current.get_or_insert_default::<HashSet<String>>() };
inner.len() as i32
}
}
We can test it:
SELECT DemoUnique(value) FROM UNNEST(ARRAY ['a', 'a', 'b']) as value;
-- demounique
-- ------------
-- 2
-- (1 row)
Using Internal
here means that the values it holds get dropped at the end of PgMemoryContexts::CurrentMemoryContext
, the aggregate context in this case.
Ordered-Set Aggregates
PostgreSQL also supports what are called Ordered-Set Aggregates. Ordered-Set Aggregates can take a direct argument, and specify a sort ordering for the inputs.
PostgreSQL does not order inputs behind the scenes!
Let’s create a simple percentile_disc
reimplementation to get an idea of how to make one with pgx
. You’ll notice we add ORDERED_SET = true
and set an (optional) OrderedSetArgs
, which determines the direct arguments.
#[derive(Copy, Clone, Default, Debug)]
pub struct DemoPercentileDisc;
#[pg_aggregate]
impl Aggregate for DemoPercentileDisc {
type Args = name!(input, i32);
type State = Internal;
type Finalize = i32;
const ORDERED_SET: bool = true;
type OrderedSetArgs = name!(percentile, f64);
fn state(
mut current: Self::State,
arg: Self::Args,
_fcinfo: pg_sys::FunctionCallInfo,
) -> Self::State {
let inner = unsafe { current.get_or_insert_default::<Vec<i32>>() };
inner.push(arg);
current
}
fn finalize(
mut current: Self::State,
direct_arg: Self::OrderedSetArgs,
_fcinfo: pg_sys::FunctionCallInfo,
) -> Self::Finalize {
let inner = unsafe { current.get_or_insert_default::<Vec<i32>>() };
// This isn't done for us!
inner.sort();
let target_index = (inner.len() as f64 * direct_arg).round() as usize;
inner[target_index.saturating_sub(1)]
}
}
This creates SQL like:
-- src/lib.rs:9
-- exploring_aggregates::DemoPercentileDisc
CREATE AGGREGATE DemoPercentileDisc (
"percentile" double precision /* f64 */
ORDER BY
"input" integer /* i32 */
)
(
SFUNC = "demo_percentile_disc_state", /* exploring_aggregates::DemoPercentileDisc::state */
STYPE = internal, /* pgx::datum::internal::Internal */
FINALFUNC = "demo_percentile_disc_finalize" /* exploring_aggregates::DemoPercentileDisc::final */
);
We can test it like so:
SELECT DemoPercentileDisc(0.5) WITHIN GROUP (ORDER BY income) FROM UNNEST(ARRAY [6000, 70000, 500]) as income;
-- demopercentiledisc
-- --------------------
-- 6000
-- (1 row)
SELECT DemoPercentileDisc(0.05) WITHIN GROUP (ORDER BY income) FROM UNNEST(ARRAY [5, 100000000, 6000, 70000, 500]) as income;
-- demopercentiledisc
-- --------------------
-- 5
-- (1 row)
Moving-Aggregate mode
Aggregates can also support moving-aggregate mode, which can remove inputs from the aggregate as well.
This allows for some optimization if you are using aggregates as window functions. The documentation explains that this is because PostgreSQL doesn’t need to recalculate the aggregate each time the frame starting point moves.
Moving-aggregate mode has it’s own moving_state
function as well as an moving_state_inverse
function for removing inputs. Because moving-aggregate mode may require some additional tracking on the part of the aggregate, there is also a MovingState
associated type as well as a moving_state_finalize
function for any specialized final computation.
Let’s take our sum example above and add moving-aggregate mode support to it:
#[derive(Copy, Clone, Default, Debug)]
pub struct DemoSum;
#[pg_aggregate]
impl Aggregate for DemoSum {
const NAME: &'static str = "demo_sum";
const PARALLEL: Option<ParallelOption> = Some(pgx::aggregate::ParallelOption::Unsafe);
const INITIAL_CONDITION: Option<&'static str> = Some(r#"0"#);
const MOVING_INITIAL_CONDITION: Option<&'static str> = Some(r#"0"#);
type Args = i32;
type State = i32;
type MovingState = i32;
fn state(
mut current: Self::State,
arg: Self::Args,
_fcinfo: pg_sys::FunctionCallInfo,
) -> Self::State {
pgx::log!("state({}, {})", current, arg);
current += arg;
current
}
fn moving_state(
mut current: Self::State,
arg: Self::Args,
_fcinfo: pg_sys::FunctionCallInfo,
) -> Self::MovingState {
pgx::log!("moving_state({}, {})", current, arg);
current += arg;
current
}
fn moving_state_inverse(
mut current: Self::State,
arg: Self::Args,
_fcinfo: pg_sys::FunctionCallInfo,
) -> Self::MovingState {
pgx::log!("moving_state_inverse({}, {})", current, arg);
current -= arg;
current
}
fn combine(
mut first: Self::State,
second: Self::State,
_fcinfo: pg_sys::FunctionCallInfo,
) -> Self::State {
pgx::log!("combine({}, {})", first, second);
first += second;
first
}
}
This generates:
-- src/lib.rs:8
-- exploring_aggregates::DemoSum
CREATE AGGREGATE demo_sum (
integer /* i32 */
)
(
SFUNC = "demo_sum_state", /* exploring_aggregates::DemoSum::state */
STYPE = integer, /* i32 */
COMBINEFUNC = "demo_sum_combine", /* exploring_aggregates::DemoSum::combine */
INITCOND = '0', /* exploring_aggregates::DemoSum::INITIAL_CONDITION */
MSFUNC = "demo_sum_moving_state", /* exploring_aggregates::DemoSum::moving_state */
MINVFUNC = "demo_sum_moving_state_inverse", /* exploring_aggregates::DemoSum::moving_state_inverse */
MINITCOND = '0', /* exploring_aggregates::DemoSum::MOVING_INITIAL_CONDITION */
PARALLEL = UNSAFE, /* exploring_aggregates::DemoSum::PARALLEL */
MSTYPE = integer /* exploring_aggregates::DemoSum::MovingState = i32 */
);
Using it (we’ll also turn on logging to see what happens with SET client_min_messages TO debug;
):
SET client_min_messages TO debug;
-- SET
SELECT demo_sum(value) OVER (
ROWS CURRENT ROW
) FROM UNNEST(ARRAY [1, 20, 300, 4000]) as value;
-- LOG: moving_state(0, 1)
-- LOG: moving_state(0, 20)
-- LOG: moving_state(0, 300)
-- LOG: moving_state(0, 4000)
-- demo_sum
-- ----------
-- 1
-- 20
-- 300
-- 4000
-- (4 rows)
Inside the OVER ()
we can use syntax for window function calls.
SELECT demo_sum(value) OVER (
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) FROM UNNEST(ARRAY [1, 20, 300, 4000]) as value;
-- LOG: moving_state(0, 1)
-- LOG: moving_state(1, 20)
-- LOG: moving_state(21, 300)
-- LOG: moving_state(321, 4000)
-- demo_sum
-- ----------
-- 4321
-- 4321
-- 4321
-- 4321
-- (4 rows)
SELECT demo_sum(value) OVER (
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
) FROM UNNEST(ARRAY [1, 20, 300, 4000]) as value;
-- LOG: moving_state(0, 1)
-- LOG: moving_state(1, 20)
-- LOG: moving_state_inverse(21, 1)
-- LOG: moving_state(20, 300)
-- LOG: moving_state_inverse(320, 20)
-- LOG: moving_state(300, 4000)
-- demo_sum
-- ----------
-- 1
-- 21
-- 320
-- 4300
-- (4 rows)
SELECT demo_sum(value) OVER (
ORDER BY sorter
ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING
) FROM (
VALUES (1, 10000),
(2, 1)
) AS v (sorter, value);
-- LOG: moving_state(0, 10000)
-- LOG: moving_state(10000, 1)
-- LOG: moving_state_inverse(10001, 10000)
-- demo_sum
-- ----------
-- 10001
-- 1
-- (2 rows)
Wrapping up
I had a lot of fun implementing the aggregate support for pgx
, and hope you have just as much fun using it! If you have questions, open up an issue.
Moving-aggregate mode is pretty new to me, and I’m still learning about it! If you have any good resources I’d love to recieve them from you!
If you’re looking for more materials about aggregates, the TimescaleDB folks wrote about aggregates and how they impacted their hyperfunctions in this article. Also, My pal Tim McNamara wrote about how to implement harmonic and geometric means as aggregates in this article.