# Writing Efficient Stored Procedures for Pipelines

Using a [stored procedure](https://docs.singlestore.com/cloud/reference/sql-reference/procedural-sql-reference/create-procedure.md) with a pipeline is [a method](https://docs.singlestore.com/cloud/load-data/about-singlestore-pipelines/pipeline-concepts/data-shaping-with-pipelines/#methods-for-data-shaping-with-pipelines.md) you can use to shape data that your pipeline extracts from a data source.

> **📝 Note**: The implementation of your stored procedure can have significant consequences on the performance and memory use of your pipeline. Pipelines by default shard incoming data across leaf partitions and perform the majority of their work in parallel across those partitions, but certain features available in stored procedures can force computation and memory usage to occur on an aggregator node.

Performing the following operations in your stored procedure forces computation and memory usage to occur on an aggregator node. The impact of each operation is listed in the Effect column. These effects apply to all stored procedures, not only to pipeline stored procedures.

| Operation                                                                                                                                                                | Effect                                                                                                                                                          |
| ------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Using [COLLECT](https://docs.singlestore.com/cloud/reference/sql-reference/procedural-sql-reference/collect.md) | Computation and memory usage on the aggregator are significant if the query type variable passed to `COLLECT()` contains a query that returns a large result set. |
| Loading data into a [reference table](https://docs.singlestore.com/cloud/create-a-database/other-schema-concepts.md) | Reference tables are typically small. In this case, computation and memory usage on the aggregator are not significant. |
| Loading data into a table that contains an [auto-increment column](https://docs.singlestore.com/cloud/reference/sql-reference/data-definition-language-ddl/create-table.md) | Computation on the aggregator is significant if a lot of data is being loaded. Maximum memory usage on the aggregator is `20000` rows `*` the number of partitions. |
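
As a rough illustration of the auto-increment bound in the table above, the following sketch estimates the maximum number of rows the aggregator may buffer. The database name `orders_db` and the count of 16 partitions are hypothetical; substitute the values for your own deployment.

```sql
-- List the partitions of a (hypothetical) database named orders_db.
-- The output has one row per partition instance, so count the distinct ordinals.
SHOW PARTITIONS ON orders_db;

-- Assuming the database has 16 partitions, apply the bound from the table:
-- 20000 rows * number of partitions.
SELECT 20000 * 16 AS max_buffered_rows;  -- 320000
```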

Because of these effects, use alternative operations in your stored procedure when possible. The following section offers some alternatives to using `COLLECT()`.

## Using Alternatives to `COLLECT()` in Pipeline Stored Procedures

The following examples demonstrate how to implement pipeline stored procedures without using `COLLECT()`.

## Example: Splitting Data into Multiple Tables

This example extracts a list of orders from the data source. The data source, `east_west_orders.txt`, contains one line per order with three columns: region (column 1), customer ID (column 2), and order amount (column 3). The following is sample data from the file.

```
E,10001,20.25
W,10002,30.58
W,10003,25.73
E,10004,32.71
W,10005,23.89

```

The pipeline inserts each row of the extracted data into the `eastern_region_orders` or `western_region_orders` table. These tables are defined as follows:

```sql
CREATE TABLE eastern_region_orders(customer_id INT, amount FLOAT, SHARD KEY(customer_id));
CREATE TABLE western_region_orders(customer_id INT, amount FLOAT, SHARD KEY(customer_id));

```

Without using `COLLECT()`, create the stored procedure:

```sql
DELIMITER //

CREATE OR REPLACE PROCEDURE orders_proc(batch QUERY(region CHAR, customer_id INT, amount FLOAT))
AS
BEGIN
  INSERT INTO eastern_region_orders(customer_id, amount) SELECT customer_id, amount FROM batch WHERE region = 'E';
  INSERT INTO western_region_orders(customer_id, amount) SELECT customer_id, amount FROM batch WHERE region = 'W';
END //

DELIMITER ;

```
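
For contrast, a version of this procedure that relies on `COLLECT()` might look like the following sketch. The procedure name `orders_proc_with_collect` and the variable names `batch_array` and `batch_record` are hypothetical and are used only for illustration. Because `COLLECT()` materializes the whole batch as an array, the batch is pulled onto the aggregator and rows are inserted one at a time, which is the pattern the set-based `INSERT ... SELECT` statements avoid.

```sql
DELIMITER //

-- Illustrative anti-pattern: not recommended for large batches.
CREATE OR REPLACE PROCEDURE orders_proc_with_collect(batch QUERY(region CHAR, customer_id INT, amount FLOAT))
AS
DECLARE
  batch_array ARRAY(RECORD(region CHAR, customer_id INT, amount FLOAT));
BEGIN
  -- Materialize the entire batch in aggregator memory.
  batch_array = COLLECT(batch);

  -- Route each record to its table with a single-row insert.
  FOR batch_record IN batch_array LOOP
    IF batch_record.region = 'E' THEN
      INSERT INTO eastern_region_orders(customer_id, amount)
        VALUES (batch_record.customer_id, batch_record.amount);
    ELSIF batch_record.region = 'W' THEN
      INSERT INTO western_region_orders(customer_id, amount)
        VALUES (batch_record.customer_id, batch_record.amount);
    END IF;
  END LOOP;
END //

DELIMITER ;
```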

Create and start the pipeline:

```sql
CREATE PIPELINE orders_pipeline
AS LOAD DATA FS '/east_west_orders.txt'
INTO PROCEDURE orders_proc
FIELDS TERMINATED BY ',';

START PIPELINE orders_pipeline;

```
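
After starting the pipeline, you may want to confirm that batches are being processed. The following is a minimal sketch that uses the `information_schema` pipeline views. The columns returned can vary by version, so the queries select all columns.

```sql
-- Show each pipeline in the current database and its state.
SHOW PIPELINES;

-- Summarize the batches the pipeline has processed so far.
SELECT * FROM information_schema.PIPELINES_BATCHES_SUMMARY
WHERE PIPELINE_NAME = 'orders_pipeline';

-- List any errors raised while extracting data or running orders_proc.
SELECT * FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'orders_pipeline';
```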

Retrieve the contents of `eastern_region_orders` and `western_region_orders`:

```sql
SELECT * FROM eastern_region_orders ORDER BY customer_id, amount;

```

```output

+-------------+--------+
| customer_id | amount |
+-------------+--------+
|       10001 |  20.25 |
|       10004 |  32.71 |
+-------------+--------+

```

```sql
SELECT * FROM western_region_orders ORDER BY customer_id, amount;

```

```output

+-------------+--------+
| customer_id | amount |
+-------------+--------+
|       10002 |  30.58 |
|       10003 |  25.73 |
|       10005 |  23.89 |
+-------------+--------+

```

## Example: Inserting Multiple Records in the Destination Table, for Each Input Record

This example extracts a list of product SKUs and their quantities from the data source `skus_and_quantities.txt`. The data source contains one line per SKU with two columns: SKU (column 1) and quantity (column 2). The following is sample data from the file.

```
AB27382,1
BD87218,5
EI30283,2
WT73629,3

```

The pipeline inserts the data extracted from the data source into the table `serial_numbers_and_skus`, which is defined as follows.

```sql
CREATE TABLE serial_numbers_and_skus(serial_number TEXT DEFAULT NULL, sku TEXT, SHARD KEY(serial_number));

```

Every SKU in the table has one or more associated serial numbers. In this example, the serial numbers are not populated by the pipeline stored procedure.

Without using `COLLECT()`, you can write a pipeline stored procedure that joins the batch to the `numbers` table on the `quantity` field of each batch record. The join produces `quantity` rows in the destination table for each record in the batch.

First, create the `numbers` table and populate it with sequential numbers up to `max_value`.

```sql
CREATE REFERENCE TABLE numbers(n INT);

DELIMITER //

CREATE OR REPLACE PROCEDURE populate_numbers_table(max_value INT)
AS
BEGIN
    FOR i in 1 .. max_value LOOP
        INSERT INTO numbers(n) VALUES (i);
    END LOOP;
END //

DELIMITER ;

CALL populate_numbers_table(100);

```

Here, the `100` input for `max_value` is arbitrary. Choose a value at least as large as the largest quantity you expect for any SKU, because the join to the `numbers` table can produce at most `max_value` rows per SKU.
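
As a quick sanity check, the query below is one way to confirm what the `numbers` table contains. It assumes `populate_numbers_table(100)` was called exactly once on an empty table; the column aliases are illustrative.

```sql
-- After a single call to populate_numbers_table(100), this should report
-- 100 rows with values ranging from 1 to 100.
SELECT COUNT(*) AS row_count, MIN(n) AS smallest, MAX(n) AS largest
FROM numbers;
```

If `row_count` is larger than `largest`, the table contains duplicate values (for example, because the procedure was called more than once), and a join against it would produce extra rows.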

Create the stored procedure:

```sql
DELIMITER //

CREATE OR REPLACE PROCEDURE serial_and_skus_proc(batch QUERY(sku TEXT, quantity INT))
AS
BEGIN
    INSERT INTO serial_numbers_and_skus(sku) SELECT sku FROM batch
    JOIN numbers ON numbers.n <= batch.quantity;
END //

DELIMITER ;

```
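
To see how the join fans out rows, you can run the same join outside of a pipeline. The sketch below assumes the `numbers` table has been populated as shown earlier; the table `sample_batch` is hypothetical and only stands in for a pipeline batch.

```sql
-- Hypothetical table standing in for a pipeline batch.
CREATE TABLE sample_batch(sku TEXT, quantity INT);
INSERT INTO sample_batch(sku, quantity) VALUES ('AB27382', 1), ('EI30283', 2);

-- Each row of sample_batch matches every n from 1 through its quantity,
-- so the result has one row for AB27382 and two rows for EI30283.
SELECT sample_batch.sku, numbers.n
FROM sample_batch
JOIN numbers ON numbers.n <= sample_batch.quantity
ORDER BY sample_batch.sku, numbers.n;

-- Clean up the illustration table.
DROP TABLE sample_batch;
```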

Create and start the pipeline:

```sql
CREATE PIPELINE serial_numbers_and_skus_pipeline
AS LOAD DATA FS '/skus_and_quantities.txt'
INTO PROCEDURE serial_and_skus_proc
FIELDS TERMINATED BY ',';

START PIPELINE serial_numbers_and_skus_pipeline;

```

Retrieve the contents of `serial_numbers_and_skus`. The `NULL` serial numbers are expected; populating the serial numbers is outside the scope of this example.

```sql
SELECT * FROM serial_numbers_and_skus ORDER BY sku;

```

```output

+---------------+---------+
| serial_number | sku     |
+---------------+---------+
| NULL          | AB27382 |
| NULL          | BD87218 |
| NULL          | BD87218 |
| NULL          | BD87218 |
| NULL          | BD87218 |
| NULL          | BD87218 |
| NULL          | EI30283 |
| NULL          | EI30283 |
| NULL          | WT73629 |
| NULL          | WT73629 |
| NULL          | WT73629 |
+---------------+---------+

```
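
One way to check that the fan-out matches the source file is to count the rows per SKU. This is a minimal sketch; the alias `row_count` is illustrative.

```sql
-- Count how many rows the pipeline created for each SKU.
SELECT sku, COUNT(*) AS row_count
FROM serial_numbers_and_skus
GROUP BY sku
ORDER BY sku;
```

With the sample data, each count should equal the quantity in the source file: 1 for `AB27382`, 5 for `BD87218`, 2 for `EI30283`, and 3 for `WT73629`.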

> **📝 Note**: Loading data into a reference table causes the pipeline’s source data to be ingested through the aggregator. In this example, the source data is not ingested through the aggregator: the procedure reads from the `numbers` reference table but does not load any data into it.
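
To make the distinction concrete, the following sketch shows a procedure that would route source data through the aggregator, because it writes batch rows into a reference table. The table `sku_catalog` and the procedure `sku_catalog_proc` are hypothetical and are not part of the example above.

```sql
-- Hypothetical reference table, used only to illustrate the note.
CREATE REFERENCE TABLE sku_catalog(sku TEXT);

DELIMITER //

CREATE OR REPLACE PROCEDURE sku_catalog_proc(batch QUERY(sku TEXT, quantity INT))
AS
BEGIN
    -- Writing batch rows into a reference table causes the pipeline's
    -- source data to be ingested through the aggregator.
    INSERT INTO sku_catalog(sku) SELECT DISTINCT sku FROM batch;
END //

DELIMITER ;
```

By comparison, `serial_and_skus_proc` only reads from `numbers` and writes to a sharded table, so its work stays on the leaf partitions.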

***

Modified at: September 5, 2023

Source: [/cloud/load-data/about-singlestore-pipelines/pipeline-concepts/writing-efficient-stored-procedures-for-pipelines/](https://docs.singlestore.com/cloud/load-data/about-singlestore-pipelines/pipeline-concepts/writing-efficient-stored-procedures-for-pipelines/)

