CREATE AGGREGATE
On this page
Creates a user-defined aggregate function (UDAF).
SingleStore also supports Wasm-based UDAFs.
Syntax
CREATE [OR REPLACE] AGGREGATE function_name ( [parameter_list] )RETURNS { data_type [data_type_modifier] }WITH STATE data_typeINITIALIZE WITH udf_function_nameITERATE WITH udf_function_nameMERGE WITH udf_function_nameTERMINATE WITH udf_function_nameDEFINER = 'user'@parameter_list:data_type [data_type_modifier [, ...] ] [, ...]data_type_modifier:DEFAULT default_value | NOT NULL | NULL | COLLATE collation_name
Syntax for Wasm-based UDAFs
CREATE [OR REPLACE] AGGREGATE function_name ( [parameter_list] )RETURNS { data_type [data_type_modifier] }WITH STATE <data_type | HANDLE>AS WASM FROM content_src[WITH WIT FROM content_src]INITIALIZE WITH udf_function_nameITERATE WITH udf_function_nameMERGE WITH udf_function_nameTERMINATE WITH udf_function_name[ SERIALIZE WITH udf_function_nameDESERIALIZE WITH udf_function_name[ COPYMERGE WITH udf_function_nameCLONE WITH udf_function_nameDESTROY WITH udf_function_name ] ]DEFINER = 'user'@parameter_list:data_type [data_type_modifier [, ...] ] [, ...]data_type_modifier:DEFAULT default_value | NOT NULL | NULL | COLLATE collation_namecontentSrc: BASE64 '"' <base64> '" |HTTP '"' <url> '"' <cred> <config> |LINK <linkName> <connStr> |S3 <s3_config> |AZURE <azure_config> |GCS <gcs_config>
Arguments
OR REPLACE
If specified, replaces a UDAF if one already exists with the same name.
function_
The name of the function.SUM()
.
You can also specify database_
and function_
together by replacing function_
with database_
instead of specifying the database in USING database_
.
CREATE AGGREGATE db.some_func(int)...
Function names are not case-sensitive.
WITH STATE
The STATE
type of a UDAF may be a scalar type, ARRAY
type, or a RECORD
type.
The STATE
type HANDLE
is only supported with Wasm-based UDAFs.HANDLE
state, you must specify the SERIALIZE
and DESERIALIZE
functions.
parameter_
Input parameters are optional.,
).
UDAFs only allow scalar data types as input parameters.
The following example shows how to declare a single input parameter:
CREATE AGGREGATE my_sum(BIGINT)...
The following example demonstrates how to declare more than one input parameters that also specify a data type modifier:
CREATE AGGREGATE multi_param_example(INT, VARCHAR(255) NOT NULL COLLATE utf8_bin, DECIMAL(19,4))...
udf_
The name of each UDF function to execute for the INITIALIZE WITH
, ITERATE WITH
, MERGE WITH
, and TERMINATE WITH
clauses.INITIALIZE
function takes in no arguments, and it returns a STATE
data type.ITERATE
function takes in a STATE
data type and the input parameter data type, and it returns a STATE
data type.n
parameters, the ITERATE
function will take in n+1
arguments, with the first argument being the STATE
type.MERGE
function takes in two STATE
data types, and it returns a STATE
data type.TERMINATE
function takes in a STATE
data type, and it returns the type specified in the RETURNS
clause.
MySQL Client Delimiters
When creating a UDF using a MySQL-compatible client connected to SingleStore Helios, you must change the client delimiter to ensure that the function definition is correctly passed to the server and then set it back to a semicolon after the alternate delimiter is no longer needed.
Security and Permissions
The invoker of a UDAF must have EXECUTE
permissions on the UDAF.EXECUTE
permissions on each of the four UDFs that the UDAF uses.
Using optional parameter DEFINER
UDFs can be created or run based on the DEFINER
.DEFINER
is equivalent to the current user.DEFINER
optional parameter is used a special flag is stored in the metadata table which indicates if a different user is required to run the procedure.
Remarks
-
This command causes implicit commits.
Refer to COMMIT for more information.
Wasm-based UDAFs
A Wasm-based UDAF can optionally pass its state by reference instead of value.HANDLE
) between rows.
The life-cycle of the HANDLE
state must be managed in the Wasm code.INITIALIZE
function.ITERATE
calls can freely update the state associated with a handle or return a new handle.INITIALIZE
, MERGE
, TERMINATE
, SERIALIZE
, and DESERIALIZE
functions
Here's an example.MERGE
function.
-- Reuse one of the existing states
def merge(a, b):
a = a + b
delete b
return a
-- Return a new state
def merge(a, b):
c = a + b;
delete a
delete b
return c
Where a
, b
, and c
are synonymous to states.
Because each Wasm UDF runs in its own in-process sandbox, the UDAF needs to import all the Wasm functions (INITIALIZE
, ITERATE
, MERGE
, TERMINATE
, SERIALIZE
, and DESERIALIZE
) from the same Wasm module in a single CREATE AGGREGATE
statement.AS WASM
clause in the function definition to import the Wasm function.
The HANDLE
STATE
type also supports the CUBE
and ROLLUP
grouping operations.COPYMERGE
, CLONE
, and DESTROY
callback functions must be specified if the GROUP BY [CUBE|ROLLUP]
grouping operation is used.
-
COPYMERGE
: This callback is similar toMERGE
.The only difference is that it must not destroy either incoming HANDLE
, and it must produce a newHANDLE
as output. -
CLONE
: This callback accepts a singleHANDLE
argument and returns aHANDLE
.It must copy the data of the input HANDLE
and return a newHANDLE
with the copied data.It must not make any changes to the input HANDLE
. -
DESTROY
: This callback accepts a singleHANDLE
argument and returns an integer, which remains unused.
For information on how to import Wasm functions, refer to Create Wasm UDFs.CREATE AGGREGATE
statement is not supported.
Examples
Create UDAF
The following example creates a new UDAF named avg_
, which uses a RECORD
state type and has the same behavior as the builtin AVG
function.
Create UDF Dependencies
The avg_
example UDAF depends on the following UDFs:
DELIMITER //CREATE FUNCTION avg_init() RETURNS RECORD(s BIGINT, c BIGINT) ASBEGINRETURN ROW(0, 0);END //DELIMITER ;
DELIMITER //CREATE FUNCTION avg_iter(state RECORD(s BIGINT, c BIGINT), value BIGINT) RETURNS RECORD(s BIGINT, c BIGINT) ASBEGINRETURN ROW(state.s + value, state.c + 1);END //DELIMITER ;
DELIMITER //CREATE FUNCTION avg_merge(state1 RECORD(s BIGINT, c BIGINT), state2 RECORD(s BIGINT, c BIGINT)) RETURNS RECORD(s BIGINT, c BIGINT) ASBEGINRETURN row(state1.s + state2.s, state1.c + state2.c);END //DELIMITER ;
DELIMITER //CREATE FUNCTION avg_terminate(state RECORD(s BIGINT, c BIGINT)) RETURNS BIGINT ASBEGINRETURN state.s / state.c;END //DELIMITER ;
Create UDAF Example
Once the UDF dependencies have been created, you can create the UDAF.
CREATE AGGREGATE avg_udaf(BIGINT) RETURNS BIGINTWITH STATE RECORD(s BIGINT, c BIGINT)INITIALIZE WITH avg_initITERATE WITH avg_iterMERGE WITH avg_mergeTERMINATE WITH avg_terminate;
After the UDAF has been successfully created, execute the following commands to try it:
CREATE TABLE t (i BIGINT);INSERT INTO t VALUES (1), (2), (3), (4), (5);SELECT avg_udaf(i) FROM t;
+-------------+
| avg_udaf(i) |
+-------------+
| 3 |
+-------------+
1 row in set
Create aggregate using Definer
CREATE AGGREGATE avg_udaf(BIGINT) RETURNS BIGINTWITH STATE RECORD(s BIGINT, c BIGINT)INITIALIZE WITH avg_initITERATE WITH avg_iterMERGE WITH avg_mergeTERMINATE WITH avg_terminateDEFINER = 'user'@'%';
To view a list of the functions and see the definer used:
SHOW FUNCTIONS;
+------------------------+--------------------------------+---------+-------------+---------------+-----+
| Functions_in_func_proc | Function Type | Definer | Data Format | Runtime Type | Link|
+------------------------+--------------------------------+---------+-------------+---------------+-----+
| avg_init | User Defined Function | root@% | | PSQL | |
| avg_iter | User Defined Function | root@% | | PSQL | |
| avg_merge | User Defined Function | root@% | | PSQL | |
| avg_terminate | User Defined Function | root@% | | PSQL | |
+------------------------+--------------------------------+---------+-------------+---------------+-----+
Create Wasm-based UDAF
The following example imports Wasm-based UDAFs from the Bloom Filters in SingleStore GitHub repository into SingleStore Helios.
-
Import a Wasm-based UDAF and UDF.
-
Generate a Bloom Filter from a column with string values.
-
Run the Bloom Filter on another column.
This example imports the following modules from the repository:
-
bloom_
: A UDAF that generates a Bloom Filter from a column of string values.filter -
bloom_
: A UDF that returnsmaybe_ exists 0
if the argument string does not match the specified filter.If the argument string matches the filter, it may return 1
. -
bloom_
: Initializes a Bloom Filter with a predefined set of parameters.init_ handle -
bloom_
: Adds a value to the Bloom Filter.update_ handle -
bloom_
: Merges two Bloom Filters, and returns the handle to the merged filter.merge_ handle -
bloom_
: Serializes a Bloom Filter.serialize_ handle -
bloom_
: Deserializes a Bloom Filter.deserialize_ handle
Refer to the lib.
This example uses Visual Studio Code with the Dev Containers extension.
-
Clone the Github repository:
git clone https://github.com/singlestore-labs/singlestoredb-extension-bloom-filters.git -
In the Visual Studio Code Command Palette, select Reopen in Container, and use the Dockerfile to build the container.
You can also open the repository in a Rust container, and then manually install the WASI Cargo extension. -
Run the following command in the terminal to verify that the Cargo CLI is installed:
cargo -Vcargo x.x.x
-
Run the following command in the terminal to compile the Wasm modules:
cargo wasi build --releaseThis command generates the extension.
wasm file in the target/wasm32-wasi/release directory. -
Import the UDAF and UDF modules into SingleStore Helios.
-
Upload the extension.
wasm and extension. wit files to your cloud storage account, say an S3 bucket named wasm-modules. Note
While not officially supported by SingleStore, you may use the
pushwasm
tool to upload the.
andwit .
files to your workspace and create Wasm UDFs, UDAFs, or TVFs.wasm For more information, refer to pushwasm. -
Run the following SQL statement to import the
bloom_
Wasm-based UDAF.filter() CREATE OR REPLACE AGGREGATE bloom_filter(text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL)RETURNS LONGBLOB NOT NULLWITH STATE HANDLEAS WASM FROM S3 'wasm-modules/extension.wasm'CREDENTIALS '{"aws_access_key_id": "ASIAZPIKLSJ3HM7FKAUB","aws_secret_access_key": FwoGZXIvYXdzEL3fv [...]" }'WITH WIT FROM S3 'wasm-modules/extension.wit'CREDENTIALS '{"aws_access_key_id": "ASIAZPIKLSJ3HM7FKAUB","aws_secret_access_key": FwoGZXIvYXdzEL3fv [...]" }'INITIALIZE WITH bloom_init_handleITERATE WITH bloom_update_handleMERGE WITH bloom_merge_handleTERMINATE WITH bloom_serialize_handleSERIALIZE WITH bloom_serialize_handleDESERIALIZE WITH bloom_deserialize_handle; -
Run the following SQL statement to import the
bloom_
Wasm-based UDF.maybe_ exists() CREATE OR REPLACE FUNCTION bloom_maybe_existsAS WASM FROM S3 'wasm-modules/extension.wasm'CREDENTIALS '{"aws_access_key_id": "ASIAZPIKLSJ3HM7FKAUB","aws_secret_access_key": FwoGZXIvYXdzEL3fv [...]" }'WITH WIT FROM S3 'wasm-modules/extension.wit'CREDENTIALS '{"aws_access_key_id": "ASIAZPIKLSJ3HM7FKAUB","aws_secret_access_key": FwoGZXIvYXdzEL3fv [...]" };These statements import the extension.
wasm and extension. wit files from the S3 bucket specified in the CREATE AGGREGATE
function.
-
-
Run the following SQL statements to create a data set:
CREATE DATABASE dbTest;USE dbTest;CREATE TABLE tbSrc (tmp TEXT);INSERT INTO tbSrc VALUES ("Aron"), ("Jane"), ("Adam"), ("Millie"), ("Turner");CREATE TABLE tbTest (tmp TEXT);INSERT INTO tbTest VALUES ("Smith"), ("Millie"), ("Gray"), ("Jane"), ("Rose"); -
Run the following SQL statement to generate the Bloom Filter from the
tbSrc.
column and store them in a variable namedtmp @bloomf
:SELECT bloom_filter(tbSrc.tmp) FROM tbSrc INTO @bloomf; -
Run the Bloom Filter created earlier on the
tbTest.
column using the following SQL statement:tmp SELECT tmp "Value", bloom_maybe_exists(@bloomf, tmp) "Exists in tbTest" FROM tbTest;+--------+------------------+ | Value | Exists in tbTest | +--------+------------------+ | Rose | 0 | | Millie | 1 | | Gray | 0 | | Jane | 1 | | Smith | 0 | +--------+------------------+
Refer to Theta Sketch for more user-defined aggregate function examples.
Use CUBE
Grouping Operation
The following example imports Wasm-based UDFs and UDAFs from the Theta Sketch in SingleStore GitHub repository to demonstrate the use of CUBE
grouping operation.theta_
UDF and the theta_
UDAF to generate Theta Sketches from a column of data and return the estimates.
This example uses the following data set:
CREATE TABLE t1 (ID1 CHAR(1), ID2 INT);INSERT INTO t1 VALUES ('x',10), ('y',20), ('y',30), ('z',100), ('n',20), ('y',200);SELECT * FROM t1;
+-----+-----+
| ID1 | ID2 |
+-----+-----+
| y | 30 |
| y | 200 |
| y | 20 |
| x | 10 |
| n | 20 |
| z | 100 |
+-----+-----+
The following statement returns an estimate of the number of distinct values in ID2 column for each value in the ID1 column:
SELECT ID1, theta_sketch_get_estimate(theta_sketch_build_agg(ID2)) AS 'Estimate' FROM t1 GROUP BY ID1;
+-----+----------+
| ID1 | Estimate |
+-----+----------+
| y | 3 |
| n | 1 |
| x | 1 |
| z | 1 |
+-----+----------+
The following statement returns a set of all possible combinations (permutations) for values in ID1 and ID2 columns using the GROUP BY CUBE
clause:
SELECT ID1, ID2 FROM t1 GROUP BY CUBE(ID1,ID2);
+------+------+
| ID1 | ID2 |
+------+------+
| x | 10 |
| y | 200 |
| NULL | 10 |
| NULL | 200 |
| y | NULL |
| x | NULL |
| NULL | NULL |
| z | 100 |
| y | 20 |
| n | 20 |
| y | 30 |
| NULL | 100 |
| NULL | 20 |
| NULL | 30 |
| n | NULL |
| z | NULL |
+------+------+
You can also use the GROUP BY CUBE
clause to return the estimate of the number of occurrences of ID2 for each distinct ID1:
SELECT ID1,theta_sketch_get_estimate(theta_sketch_build_agg(ID2)) AS 'Estimate' FROM t1 GROUP BY CUBE(ID1);
+------+----------+
| ID1 | Estimate |
+------+----------+
| x | 1 |
| n | 1 |
| NULL | 5 |
| z | 1 |
| y | 3 |
+------+----------+
Last modified: January 10, 2024