CREATE AGGREGATE

Creates a user-defined aggregate function (UDAF). A UDAF is a callable routine that accepts input parameters, executes programmatic logic in the function body, and returns a scalar-type value.

SingleStore also supports Wasm-based UDAFs.

Syntax

CREATE [OR REPLACE] AGGREGATE function_name ( [parameter_list] )
RETURNS { data_type [data_type_modifier] }
WITH STATE data_type
INITIALIZE WITH udf_function_name
ITERATE WITH udf_function_name
MERGE WITH udf_function_name
TERMINATE WITH udf_function_name
[DEFINER = 'user'@'host']
parameter_list:
data_type [data_type_modifier [, ...] ] [, ...]
data_type_modifier:
DEFAULT default_value | NOT NULL | NULL | COLLATE collation_name

Syntax for Wasm-based UDAFs

CREATE [OR REPLACE] AGGREGATE function_name ( [parameter_list] )
RETURNS { data_type [data_type_modifier] }
WITH STATE <data_type | HANDLE>
AS WASM FROM content_src
[WITH WIT FROM content_src]
INITIALIZE WITH udf_function_name
ITERATE WITH udf_function_name
MERGE WITH udf_function_name
TERMINATE WITH udf_function_name
[ SERIALIZE WITH udf_function_name
DESERIALIZE WITH udf_function_name
[ COPYMERGE WITH udf_function_name
CLONE WITH udf_function_name
DESTROY WITH udf_function_name ] ]
[DEFINER = 'user'@'host']
parameter_list:
data_type [data_type_modifier [, ...] ] [, ...]
data_type_modifier:
DEFAULT default_value | NOT NULL | NULL | COLLATE collation_name
content_src: BASE64 '"' <base64> '"' |
HTTP '"' <url> '"' <cred> <config> |
LINK <linkName> <connStr> |
S3 <s3_config> |
AZURE <azure_config> |
GCS <gcs_config>

Arguments

OR REPLACE

If specified, replaces a UDAF if one already exists with the same name.

function_name

The name of the function. By their nature, UDAF names may override existing builtin aggregate function names for scalar data types, such as SUM(). UDAF names cannot be the same as those of stored procedures, tables, views, user-defined scalar-value functions (UDFs), or user-defined table-valued functions (TVFs).

You can also specify database_name and function_name together by replacing function_name with database_name.function_name, instead of selecting the database with USE database_name. For example, you can write the following:

CREATE AGGREGATE db.some_func(int)
...

Function names are not case-sensitive. For details on case-sensitivity, refer to the Database Object Case-Sensitivity topic.

WITH STATE

The STATE type of a UDAF may be a scalar type, an ARRAY type, or a RECORD type.

The STATE type HANDLE is only supported with Wasm-based UDAFs. When using the HANDLE state, you must specify the SERIALIZE and DESERIALIZE functions.
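
For illustration, the following hypothetical state declarations show each option (all other clauses of the CREATE AGGREGATE statement are omitted):

WITH STATE BIGINT                        -- scalar state
WITH STATE ARRAY(BIGINT)                 -- array state
WITH STATE RECORD(s BIGINT, c BIGINT)    -- record state
WITH STATE HANDLE                        -- Wasm-based UDAFs only; requires SERIALIZE and DESERIALIZE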

parameter_list

Input parameters are optional. Any number of input parameters can be specified, and each must be delimited by a comma (,). Each input parameter is specified only by its data type and optional modifier.

UDAFs only allow scalar data types as input parameters. See the Data Types topic for more information.

The following example shows how to declare a single input parameter:

CREATE AGGREGATE my_sum(BIGINT)
...

The following example demonstrates how to declare multiple input parameters, including data type modifiers:

CREATE AGGREGATE multi_param_example(INT, VARCHAR(255) NOT NULL COLLATE utf8_bin, DECIMAL(19,4))
...

udf_function_name

The name of the UDF to execute for each of the INITIALIZE WITH, ITERATE WITH, MERGE WITH, and TERMINATE WITH clauses. The expected signatures are:

  • INITIALIZE: takes no arguments and returns the STATE data type.

  • ITERATE: takes the STATE data type and the input parameter data types, and returns the STATE data type. If the UDAF has n parameters, the ITERATE function takes n+1 arguments, with the first argument being the STATE type.

  • MERGE: takes two STATE data types and returns the STATE data type.

  • TERMINATE: takes the STATE data type and returns the type specified in the RETURNS clause.
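
As a sketch, for a hypothetical UDAF declared as my_sum(BIGINT) with WITH STATE BIGINT, the four UDFs could look like the following:

DELIMITER //
CREATE FUNCTION my_sum_init() RETURNS BIGINT AS
BEGIN
    RETURN 0;                    -- empty state
END //
CREATE FUNCTION my_sum_iter(state BIGINT, value BIGINT) RETURNS BIGINT AS
BEGIN
    RETURN state + value;        -- fold one input row into the state
END //
CREATE FUNCTION my_sum_merge(state1 BIGINT, state2 BIGINT) RETURNS BIGINT AS
BEGIN
    RETURN state1 + state2;      -- combine two partial states
END //
CREATE FUNCTION my_sum_terminate(state BIGINT) RETURNS BIGINT AS
BEGIN
    RETURN state;                -- produce the RETURNS value
END //
DELIMITER ;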

MySQL Client Delimiters

When creating a UDAF or its dependent UDFs using a MySQL-compatible client connected to SingleStore Helios, you must change the client delimiter to ensure that the function definition is correctly passed to the server, and then set it back to a semicolon once the alternate delimiter is no longer needed. See the MySQL Client Delimiters topic for details on MySQL client delimiters.
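
For example, a minimal sketch of the delimiter workflow (the function here is a placeholder):

DELIMITER //
CREATE FUNCTION noop(x BIGINT) RETURNS BIGINT AS
BEGIN
    RETURN x;
END //
DELIMITER ;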

Security and Permissions

The invoker of a UDAF must have EXECUTE permissions on the UDAF. Also, the UDAF’s definer must have EXECUTE permissions on each of the four UDFs that the UDAF uses.
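
As a sketch, granting that permission at the database level might look like the following (the database and user names are hypothetical):

GRANT EXECUTE ON db.* TO 'app_user'@'%';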

Using the Optional DEFINER Parameter

UDAFs can be created or run based on the DEFINER. By default, the DEFINER is the current user. When the optional DEFINER parameter is used, a special flag is stored in the metadata table that indicates whether a different user is required to run the function.

Remarks

  • This command causes implicit commits. Refer to COMMIT for more information.

Wasm-based UDAFs

A Wasm-based UDAF can optionally pass its state by reference instead of by value. For aggregation operations whose state holds a large amount of data, passing state by reference can improve performance. Consider, for example, a Wasm-based UDAF that maintains a hash table with thousands of entries: passing state by value necessitates serializing the hash table in and out of each Wasm function for every single row. Passing state by reference eliminates most of this overhead by allowing the Wasm function to maintain the data structure internally and pass a pointer (i.e., a HANDLE) between rows.

The life-cycle of the HANDLE state must be managed in the Wasm code. The implicit contract is that each call to the INITIALIZE function must create a unique handle. ITERATE calls can freely update the state associated with a handle or return a new handle. However, any state that is no longer accessible through a handle must be cleaned up: for any handle that a function does not return, that function must clean up the handle's state. This applies to the INITIALIZE, MERGE, TERMINATE, SERIALIZE, and DESERIALIZE functions.

For example, two states are passed to the MERGE function. The user can reuse either of the provided states or create a new one. The following pseudocode blocks demonstrate both cases:

-- Reuse one of the existing states
def merge(a, b):
    a = a + b
    delete b
    return a

-- Return a new state
def merge(a, b):
    c = a + b
    delete a
    delete b
    return c

Here, a, b, and c represent states. Any state that is no longer in use is deleted (cleaned up).

Because each Wasm UDF runs in its own in-process sandbox, the UDAF needs to import all the Wasm functions (INITIALIZE, ITERATE, MERGE, TERMINATE, SERIALIZE, and DESERIALIZE) from the same Wasm module in a single CREATE AGGREGATE statement. Use the AS WASM clause in the function definition to import the Wasm function.

The HANDLE STATE type also supports the CUBE and ROLLUP grouping operations. The COPYMERGE, CLONE, and DESTROY callback functions must be specified if the GROUP BY [CUBE|ROLLUP] grouping operation is used; a sketch of a definition that includes these clauses follows the list below.

  • COPYMERGE: This callback is similar to MERGE. The only difference is that it must not destroy either incoming HANDLE, and it must produce a new HANDLE as output.

  • CLONE: This callback accepts a single HANDLE argument and returns a HANDLE. It must copy the data of the input HANDLE and return a new HANDLE with the copied data. It must not make any changes to the input HANDLE.

  • DESTROY: This callback accepts a single HANDLE argument and returns an integer, which remains unused.
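
A minimal sketch of a HANDLE-state UDAF definition that includes these clauses; every function name and content source below is a hypothetical placeholder:

CREATE OR REPLACE AGGREGATE my_wasm_agg(BIGINT NOT NULL)
RETURNS BIGINT NOT NULL
WITH STATE HANDLE
AS WASM FROM S3 'my-bucket/extension.wasm' CREDENTIALS '{ ... }'
WITH WIT FROM S3 'my-bucket/extension.wit' CREDENTIALS '{ ... }'
INITIALIZE WITH my_init_handle
ITERATE WITH my_update_handle
MERGE WITH my_merge_handle
TERMINATE WITH my_terminate_handle
SERIALIZE WITH my_serialize_handle
DESERIALIZE WITH my_deserialize_handle
COPYMERGE WITH my_copymerge_handle
CLONE WITH my_clone_handle
DESTROY WITH my_destroy_handle;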

For information on how to import Wasm functions, refer to Create Wasm UDFs. Because Wasm functions are sandboxed, combining Wasm functions and PSQL functions in the same CREATE AGGREGATE statement is not supported.

Examples

Create UDAF

The following example creates a new UDAF named avg_udaf, which uses a RECORD state type and has the same behavior as the builtin AVG function. Note that before creating the UDAF, each of the prerequisite user-defined scalar functions (UDFs) must be created, as the UDAF definition depends on their existence.

Create UDF Dependencies

The avg_udaf example UDAF depends on the following UDFs:

DELIMITER //
CREATE FUNCTION avg_init() RETURNS RECORD(s BIGINT, c BIGINT) AS
BEGIN
    RETURN ROW(0, 0);
END //
DELIMITER ;

DELIMITER //
CREATE FUNCTION avg_iter(state RECORD(s BIGINT, c BIGINT), value BIGINT) RETURNS RECORD(s BIGINT, c BIGINT) AS
BEGIN
    RETURN ROW(state.s + value, state.c + 1);
END //
DELIMITER ;

DELIMITER //
CREATE FUNCTION avg_merge(state1 RECORD(s BIGINT, c BIGINT), state2 RECORD(s BIGINT, c BIGINT)) RETURNS RECORD(s BIGINT, c BIGINT) AS
BEGIN
    RETURN ROW(state1.s + state2.s, state1.c + state2.c);
END //
DELIMITER ;

DELIMITER //
CREATE FUNCTION avg_terminate(state RECORD(s BIGINT, c BIGINT)) RETURNS BIGINT AS
BEGIN
    RETURN state.s / state.c;
END //
DELIMITER ;

Create UDAF Example

Once the UDF dependencies have been created, you can create the UDAF. Execute the following statement:

CREATE AGGREGATE avg_udaf(BIGINT) RETURNS BIGINT
WITH STATE RECORD(s BIGINT, c BIGINT)
INITIALIZE WITH avg_init
ITERATE WITH avg_iter
MERGE WITH avg_merge
TERMINATE WITH avg_terminate;

After the UDAF has been successfully created, execute the following commands to try it:

CREATE TABLE t (i BIGINT);
INSERT INTO t VALUES (1), (2), (3), (4), (5);
SELECT avg_udaf(i) FROM t;
+-------------+
| avg_udaf(i) |
+-------------+
|           3 |
+-------------+
1 row in set
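
As a quick sanity check, you can compare the UDAF against the builtin AVG. Note that avg_udaf returns a BIGINT, so its result is integer-valued, while the builtin AVG returns a decimal:

SELECT AVG(i), avg_udaf(i) FROM t;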

Create UDAF Using DEFINER

CREATE AGGREGATE avg_udaf(BIGINT) RETURNS BIGINT
  WITH STATE RECORD(s BIGINT, c BIGINT)
  INITIALIZE WITH avg_init
  ITERATE WITH avg_iter
  MERGE WITH avg_merge
  TERMINATE WITH avg_terminate
  DEFINER = 'user'@'%';

To view a list of the functions and see the definer used:

SHOW FUNCTIONS;
+------------------------+--------------------------------+---------+-------------+---------------+-----+
| Functions_in_func_proc | Function Type                  | Definer | Data Format | Runtime Type  | Link|
+------------------------+--------------------------------+---------+-------------+---------------+-----+
| avg_init               | User Defined Function          | root@%  |             | PSQL          |     |
| avg_iter               | User Defined Function          | root@%  |             | PSQL          |     |
| avg_merge              | User Defined Function          | root@%  |             | PSQL          |     |
| avg_terminate          | User Defined Function          | root@%  |             | PSQL          |     | 
+------------------------+--------------------------------+---------+-------------+---------------+-----+

Create Wasm-based UDAF

The following example imports a Wasm-based UDAF and a UDF from the Bloom Filters in SingleStore GitHub repository into SingleStore Helios. A Bloom filter is a space-efficient probabilistic data structure that can be used to test whether an element is a member of a set. This example performs the following operations:

  • Import a Wasm-based UDAF and UDF.

  • Generate a Bloom Filter from a column with string values.

  • Run the Bloom Filter on another column.

This example uses the following functions from the repository:

  • bloom_filter: A UDAF that generates a Bloom Filter from a column of string values.

  • bloom_maybe_exists: A UDF that returns 0 if the argument string does not match the specified filter. If the argument string matches the filter, it may return 1.

  • bloom_init_handle: Initializes a Bloom Filter with a predefined set of parameters.

  • bloom_update_handle: Adds a value to the Bloom Filter.

  • bloom_merge_handle: Merges two Bloom Filters, and returns the handle to the merged filter.

  • bloom_serialize_handle: Serializes a Bloom Filter.

  • bloom_deserialize_handle: Deserializes a Bloom Filter.

Refer to the lib.rs file in the repository for the source code.

This example uses Visual Studio Code with the Dev Containers extension.

  1. Clone the GitHub repository:

    git clone https://github.com/singlestore-labs/singlestoredb-extension-bloom-filters.git
  2. In the Visual Studio Code Command Palette, select Reopen in Container, and use the Dockerfile to build the container. You can also open the repository in a Rust container, and then manually install the WASI Cargo extension.

  3. Run the following command in the terminal to verify that the Cargo CLI is installed:

    cargo -V
    cargo x.x.x
  4. Run the following command in the terminal to compile the Wasm modules:

    cargo wasi build --release

    This command generates the extension.wasm file in the target/wasm32-wasi/release directory.

  5. Import the UDAF and UDF modules into SingleStore Helios.

    1. Upload the extension.wasm and extension.wit files to your cloud storage account, for example, an S3 bucket named wasm-modules.

      Note

      While not officially supported by SingleStore, you may use the pushwasm tool to upload the .wit and .wasm files to your workspace and create Wasm UDFs, UDAFs, or TVFs. For more information, refer to pushwasm.

    2. Run the following SQL statement to import the bloom_filter() Wasm-based UDAF.

      CREATE OR REPLACE AGGREGATE bloom_filter(
      text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL)
      RETURNS LONGBLOB NOT NULL
      WITH STATE HANDLE
      AS WASM FROM S3 'wasm-modules/extension.wasm'
      CREDENTIALS '{
      "aws_access_key_id": "ASIAZPIKLSJ3HM7FKAUB",
      "aws_secret_access_key": FwoGZXIvYXdzEL3fv [...]" }'
      WITH WIT FROM S3 'wasm-modules/extension.wit'
      CREDENTIALS '{
      "aws_access_key_id": "ASIAZPIKLSJ3HM7FKAUB",
      "aws_secret_access_key": FwoGZXIvYXdzEL3fv [...]" }'
      INITIALIZE WITH bloom_init_handle
      ITERATE WITH bloom_update_handle
      MERGE WITH bloom_merge_handle
      TERMINATE WITH bloom_serialize_handle
      SERIALIZE WITH bloom_serialize_handle
      DESERIALIZE WITH bloom_deserialize_handle;
    3. Run the following SQL statement to import the bloom_maybe_exists() Wasm-based UDF.

      CREATE OR REPLACE FUNCTION bloom_maybe_exists
      AS WASM FROM S3 'wasm-modules/extension.wasm'
      CREDENTIALS '{
      "aws_access_key_id": "ASIAZPIKLSJ3HM7FKAUB",
      "aws_secret_access_key": FwoGZXIvYXdzEL3fv [...]" }'
      WITH WIT FROM S3 'wasm-modules/extension.wit'
      CREDENTIALS '{
      "aws_access_key_id": "ASIAZPIKLSJ3HM7FKAUB",
      "aws_secret_access_key": FwoGZXIvYXdzEL3fv [...]" };

      These statements import the extension.wasm and extension.wit files from the S3 bucket specified in the CREATE statements.

  6. Run the following SQL statements to create a data set:

    CREATE DATABASE dbTest;
    USE dbTest;
    CREATE TABLE tbSrc (tmp TEXT);
    INSERT INTO tbSrc VALUES ("Aron"), ("Jane"), ("Adam"), ("Millie"), ("Turner");
    CREATE TABLE tbTest (tmp TEXT);
    INSERT INTO tbTest VALUES ("Smith"), ("Millie"), ("Gray"), ("Jane"), ("Rose");
  7. Run the following SQL statement to generate the Bloom Filter from the tbSrc.tmp column and store it in a variable named @bloomf:

    SELECT bloom_filter(tbSrc.tmp) FROM tbSrc INTO @bloomf;
  8. Run the Bloom Filter created earlier on the tbTest.tmp column using the following SQL statement:

    SELECT tmp "Value", bloom_maybe_exists(@bloomf, tmp) "Exists in tbTest" FROM tbTest;
    +--------+------------------+
    | Value  | Exists in tbTest |
    +--------+------------------+
    | Rose   |                0 |
    | Millie |                1 |
    | Gray   |                0 |
    | Jane   |                1 |
    | Smith  |                0 |
    +--------+------------------+

Refer to Theta Sketch for more user-defined aggregate function examples.

Use CUBE Grouping Operation

The following example imports Wasm-based UDFs and UDAFs from the Theta Sketch in SingleStore GitHub repository to demonstrate the use of the CUBE grouping operation. Refer to the GitHub repository for the source code. This example uses the theta_sketch_get_estimate() UDF and the theta_sketch_build_agg() UDAF to generate Theta Sketches from a column of data and return the estimates.

This example uses the following data set:

CREATE TABLE t1 (ID1 CHAR(1), ID2 INT);
INSERT INTO t1 VALUES ('x',10), ('y',20), ('y',30), ('z',100), ('n',20), ('y',200);
SELECT * FROM t1;
+-----+-----+
| ID1 | ID2 |
+-----+-----+
| y   |  30 |
| y   | 200 |
| y   |  20 |
| x   |  10 |
| n   |  20 |
| z   | 100 |
+-----+-----+

The following statement returns an estimate of the number of distinct values in the ID2 column for each value in the ID1 column:

SELECT ID1, theta_sketch_get_estimate(theta_sketch_build_agg(ID2)) AS 'Estimate' FROM t1 GROUP BY ID1;
+-----+----------+
| ID1 | Estimate |
+-----+----------+
| y   |        3 |
| n   |        1 |
| x   |        1 |
| z   |        1 |
+-----+----------+

The following statement uses the GROUP BY CUBE clause to return all possible grouping combinations of the values in the ID1 and ID2 columns:

SELECT ID1, ID2 FROM t1 GROUP BY CUBE(ID1,ID2);
+------+------+
| ID1  | ID2  |
+------+------+
| x    |   10 |
| y    |  200 |
| NULL |   10 |
| NULL |  200 |
| y    | NULL |
| x    | NULL |
| NULL | NULL |
| z    |  100 |
| y    |   20 |
| n    |   20 |
| y    |   30 |
| NULL |  100 |
| NULL |   20 |
| NULL |   30 |
| n    | NULL |
| z    | NULL |
+------+------+

You can also use the GROUP BY CUBE clause to return the estimated number of distinct ID2 values for each distinct ID1 value, along with an estimate across all rows (the row where ID1 is NULL):

SELECT ID1, theta_sketch_get_estimate(theta_sketch_build_agg(ID2)) AS 'Estimate' FROM t1 GROUP BY CUBE(ID1);
+------+----------+
| ID1  | Estimate |
+------+----------+
| x    |        1 |
| n    |        1 |
| NULL |        5 |
| z    |        1 |
| y    |        3 |
+------+----------+

Last modified: January 10, 2024
