Writing Efficient Stored Procedures for Pipelines

You can use a stored procedure with a pipeline to shape the data that the pipeline extracts from a data source.

Note

The implementation of your stored procedure can have significant consequences for the performance and memory use of your pipeline. By default, pipelines shard incoming data across leaf partitions and perform the majority of their work in parallel across those partitions, but certain features available in stored procedures can force computation and memory usage to occur on an aggregator node.

Performing the following operations in your stored procedure forces computation and memory usage to occur on an aggregator node. The impact of each operation is listed in the Effect column. These effects apply to all stored procedures, not only to pipeline stored procedures.

Operation | Effect

Using COLLECT() | Computation and memory usage on the aggregator are significant if the query type variable passed to COLLECT() contains a query that returns a large result set.

Loading data into a reference table | Reference tables are typically small. In this case, computation and memory usage on the aggregator are not significant.

Loading data into a table that contains an auto-increment column | Computation on the aggregator is significant if a lot of data is being loaded. Maximum memory usage on the aggregator is 20,000 rows multiplied by the number of partitions.

Because of these effects, use alternatives to these operations in your stored procedure where possible. The following section offers some alternatives to using COLLECT().

Using Alternatives to COLLECT() in Pipeline Stored Procedures

The following examples demonstrate how to implement pipeline stored procedures without using COLLECT().

Example: Splitting Data into Multiple Tables

This example extracts a list of orders from the data source. The data source, east_west_orders.txt, contains one line per order with the columns region (column 1), customer ID (column 2), and order amount (column 3). The following is sample data from the file.

E,10001,20.25
W,10002,30.58
W,10003,25.73
E,10004,32.71
W,10005,23.89

The pipeline inserts each row of the extracted data into the eastern_region_orders or western_region_orders table. These tables are defined as follows:

CREATE TABLE eastern_region_orders(customer_id INT, amount FLOAT, SHARD KEY(customer_id));
CREATE TABLE western_region_orders(customer_id INT, amount FLOAT, SHARD KEY(customer_id));

Without using COLLECT(), create the stored procedure:

DELIMITER //
CREATE OR REPLACE PROCEDURE orders_proc(batch QUERY(region CHAR, customer_id INT, amount FLOAT))
AS
BEGIN
  INSERT INTO eastern_region_orders(customer_id, amount) SELECT customer_id, amount FROM batch WHERE region = 'E';
  INSERT INTO western_region_orders(customer_id, amount) SELECT customer_id, amount FROM batch WHERE region = 'W';
END //
DELIMITER ;
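
For contrast, the following is a rough sketch of what a COLLECT()-based version of the same logic might look like. It is shown only to illustrate the pattern this section avoids; the procedure name orders_proc_collect and the variable names arr and r are hypothetical and are not used elsewhere in this example.

```sql
-- Sketch only: the pattern to avoid. orders_proc_collect, arr, and r are hypothetical names.
DELIMITER //
CREATE OR REPLACE PROCEDURE orders_proc_collect(batch QUERY(region CHAR, customer_id INT, amount FLOAT))
AS
DECLARE
  arr ARRAY(RECORD(region CHAR, customer_id INT, amount FLOAT));
BEGIN
  -- COLLECT() materializes the entire batch as an array on the aggregator.
  arr = COLLECT(batch);
  FOR r IN arr LOOP
    -- One single-row INSERT is issued per input row.
    IF r.region = 'E' THEN
      INSERT INTO eastern_region_orders(customer_id, amount) VALUES (r.customer_id, r.amount);
    ELSIF r.region = 'W' THEN
      INSERT INTO western_region_orders(customer_id, amount) VALUES (r.customer_id, r.amount);
    END IF;
  END LOOP;
END //
DELIMITER ;
```

The set-based orders_proc filters the batch with two INSERT ... SELECT statements instead, so the batch never has to be held in an array on the aggregator. The rest of this example continues to use orders_proc.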

Create and start the pipeline:

CREATE PIPELINE orders_pipeline
AS LOAD DATA FS '/east_west_orders.txt'
INTO PROCEDURE orders_proc
FIELDS TERMINATED BY ',';
START PIPELINE orders_pipeline;

Retrieve the contents of eastern_region_orders and western_region_orders:

SELECT * FROM eastern_region_orders ORDER BY customer_id, amount;
+-------------+--------+
| customer_id | amount |
+-------------+--------+
|       10001 |  20.25 |
|       10004 |  32.71 |
+-------------+--------+
SELECT * FROM western_region_orders ORDER BY customer_id, amount;
+-------------+--------+
| customer_id | amount |
+-------------+--------+
|       10002 |  30.58 |
|       10003 |  25.73 |
|       10005 |  23.89 |
+-------------+--------+
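
The two INSERT statements in orders_proc keep only rows whose region is 'E' or 'W'; a row with any other region matches neither filter and is not stored. If you want to keep such rows, one possible approach is a third set-based INSERT into a catch-all table. The sketch below assumes a hypothetical table named unrouted_orders, which is not part of the original example, and assumes the procedure is redefined before the pipeline is created.

```sql
-- Hypothetical catch-all table for rows whose region is not 'E' or 'W'.
CREATE TABLE unrouted_orders(region CHAR, customer_id INT, amount FLOAT, SHARD KEY(customer_id));

DELIMITER //
CREATE OR REPLACE PROCEDURE orders_proc(batch QUERY(region CHAR, customer_id INT, amount FLOAT))
AS
BEGIN
  INSERT INTO eastern_region_orders(customer_id, amount) SELECT customer_id, amount FROM batch WHERE region = 'E';
  INSERT INTO western_region_orders(customer_id, amount) SELECT customer_id, amount FROM batch WHERE region = 'W';
  -- Catch-all: NOT IN never matches a NULL region, so NULL is tested explicitly.
  INSERT INTO unrouted_orders(region, customer_id, amount)
    SELECT region, customer_id, amount FROM batch
    WHERE region IS NULL OR region NOT IN ('E', 'W');
END //
DELIMITER ;
```

With the sample data shown earlier, every row has region 'E' or 'W', so unrouted_orders would remain empty.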

Example: Inserting Multiple Records in the Destination Table, for Each Input Record

This example extracts a list of product SKUs and their quantities from the data source skus_and_quantities.txt. The data source contains one line per SKU/quantity pair with the columns SKU (column 1) and quantity (column 2). The following is sample data from the file.

AB27382,1
BD87218,5
EI30283,2
WT73629,3

The pipeline inserts the data extracted from the data source into the table serial_numbers_and_skus, which is defined as follows.

CREATE TABLE serial_numbers_and_skus(serial_number TEXT DEFAULT NULL, sku TEXT, SHARD KEY(serial_number));

Every SKU in the table has one or more associated serial numbers. In this example, the serial numbers are not populated by the pipeline stored procedure.

Instead of using COLLECT(), write a pipeline stored procedure that joins the batch to a table of sequential integers named numbers, using the quantity field of each batch record in the join condition. For each record in the batch, the join produces as many rows in the destination table as the record's quantity.

First, create the numbers table and populate it with sequential numbers up to max_value.

CREATE REFERENCE TABLE numbers(n INT);
DELIMITER //
CREATE OR REPLACE PROCEDURE populate_numbers_table(max_value INT)
AS
BEGIN
  FOR i IN 1 .. max_value LOOP
    INSERT INTO numbers(n) VALUES (i);
  END LOOP;
END //
DELIMITER ;
CALL populate_numbers_table(100);

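Here, the value 100 passed for max_value is arbitrary. Set max_value to at least the largest quantity you expect in any input record, because the join in the pipeline stored procedure creates at most max_value rows in the destination table for each SKU.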
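
Before creating the pipeline, you may want to confirm that the numbers table was populated as intended. The following query is a minimal check; the column aliases row_count, smallest, and largest are illustrative names only.

```sql
-- Sanity check on the numbers table after CALL populate_numbers_table(100).
SELECT COUNT(*) AS row_count, MIN(n) AS smallest, MAX(n) AS largest
FROM numbers;
```

If the table was empty before the call above, this query should report 100 rows with values from 1 through 100. Calling populate_numbers_table a second time inserts the same values again, which would multiply the rows the join produces, so populate the table only once.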

Create the stored procedure:

DELIMITER //
CREATE OR REPLACE PROCEDURE serial_and_skus_proc(batch QUERY(sku TEXT, quantity INT))
AS
BEGIN
  INSERT INTO serial_numbers_and_skus(sku)
    SELECT sku FROM batch
    JOIN numbers ON numbers.n <= batch.quantity;
END //
DELIMITER ;

Create and start the pipeline:

CREATE PIPELINE serial_numbers_and_skus_pipeline
AS LOAD DATA FS '/skus_and_quantities.txt'
INTO PROCEDURE serial_and_skus_proc
FIELDS TERMINATED BY ',';
START PIPELINE serial_numbers_and_skus_pipeline;

Retrieve the contents of serial_numbers_and_skus. The NULL serial numbers are expected; populating the serial numbers is outside the scope of this example.

SELECT * FROM serial_numbers_and_skus ORDER BY sku;
+---------------+---------+
| serial_number | sku     |
+---------------+---------+
| NULL          | AB27382 |
| NULL          | BD87218 |
| NULL          | BD87218 |
| NULL          | BD87218 |
| NULL          | BD87218 |
| NULL          | BD87218 |
| NULL          | EI30283 |
| NULL          | EI30283 |
| NULL          | WT73629 |
| NULL          | WT73629 |
| NULL          | WT73629 |
+---------------+---------+
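
A quick way to confirm that the join produced the intended number of rows is to count the rows per SKU and compare the counts with the quantities in the source file. The alias row_count below is an illustrative name.

```sql
-- Count destination rows per SKU; each count should equal that SKU's quantity in the source file.
SELECT sku, COUNT(*) AS row_count
FROM serial_numbers_and_skus
GROUP BY sku
ORDER BY sku;
```

For the sample data above, the counts should be 1 for AB27382, 5 for BD87218, 2 for EI30283, and 3 for WT73629. A count lower than the source quantity suggests that the quantity exceeds the largest value in the numbers table.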

Note

Loading data into a reference table will cause the pipeline’s source data to be ingested through the aggregator. In this example, the source data is not ingested through the aggregator, because although the numbers reference table is utilized, no data is ingested into the table.

Last modified: September 5, 2023
