Writing Efficient Stored Procedures for Pipelines
A pipeline stored procedure lets you shape the data that your pipeline extracts from a data source.
Note
The implementation of your stored procedure can have significant consequences on the performance and memory use of your pipeline.
Performing the following operations in your stored procedure will force computation and memory usage to occur on an aggregator node.
Operation | Effect |
---|---|
Using COLLECT | Computation and memory usage on the aggregator are significant if the query type variable passed to COLLECT() returns a large number of rows. |
Loading data into a reference table | Reference tables are typically small. |
Loading data into a table that contains an auto-increment column | Computation on the aggregator is significant if a lot of data is being loaded. |
Because of these effects, implement alternative operations in your stored procedure when possible. The following section describes alternatives to COLLECT().
Using Alternatives to COLLECT() in Pipeline Stored Procedures
The following examples demonstrate how to implement pipeline stored procedures without using COLLECT().
Example: Splitting Data into Multiple Tables
This example extracts a list of orders from the data source. The data source, east_west_orders.txt, contains one line per order with three columns: region (column 1), customer ID (column 2), and order amount (column 3).
E,10001,20.25
W,10002,30.58
W,10003,25.73
E,10004,32.71
W,10005,23.89
The pipeline inserts each row of the extracted data into either the eastern_region_orders or the western_region_orders table.
CREATE TABLE eastern_region_orders(customer_id INT, amount FLOAT, SHARD KEY(customer_id));
CREATE TABLE western_region_orders(customer_id INT, amount FLOAT, SHARD KEY(customer_id));
Without using COLLECT(), create the stored procedure:
DELIMITER //
CREATE OR REPLACE PROCEDURE orders_proc(batch QUERY(region CHAR, customer_id INT, amount FLOAT))
AS
BEGIN
  INSERT INTO eastern_region_orders(customer_id, amount)
    SELECT customer_id, amount FROM batch WHERE REGION = 'E';
  INSERT INTO western_region_orders(customer_id, amount)
    SELECT customer_id, amount FROM batch WHERE REGION = 'W';
END //
DELIMITER ;
Create and start the pipeline:
CREATE PIPELINE orders_pipeline
AS LOAD DATA FS '/east_west_orders.txt'
INTO PROCEDURE orders_proc
FIELDS TERMINATED BY ',';
START PIPELINE orders_pipeline;
Retrieve the contents of eastern_region_orders and western_region_orders:
SELECT * FROM eastern_region_orders ORDER BY customer_id, amount;
+-------------+--------+
| customer_id | amount |
+-------------+--------+
| 10001 | 20.25 |
| 10004 | 32.71 |
+-------------+--------+
SELECT * FROM western_region_orders ORDER BY customer_id, amount;
+-------------+--------+
| customer_id | amount |
+-------------+--------+
| 10002 | 30.58 |
| 10003 | 25.73 |
| 10005 | 23.89 |
+-------------+--------+
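The set-based routing used by orders_proc can be tried outside a cluster. The following is a minimal sketch in Python, assuming SQLite (through the standard sqlite3 module) as a stand-in for the database; it is not SingleStore code. The temporary table named batch and the function split_orders are hypothetical names that play the role of the procedure's batch parameter. SQLite has no pipelines, stored procedures, or shard keys, so only the INSERT ... SELECT routing is illustrated.

```python
import sqlite3

# Sample batch from the example: region, customer ID, order amount.
BATCH = [
    ("E", 10001, 20.25),
    ("W", 10002, 30.58),
    ("W", 10003, 25.73),
    ("E", 10004, 32.71),
    ("W", 10005, 23.89),
]


def split_orders(batch):
    """Route a batch into per-region tables with set-based INSERT ... SELECT."""
    conn = sqlite3.connect(":memory:")
    # SQLite stand-ins for the destination tables (SHARD KEY is omitted).
    conn.execute("CREATE TABLE eastern_region_orders(customer_id INT, amount FLOAT)")
    conn.execute("CREATE TABLE western_region_orders(customer_id INT, amount FLOAT)")
    # Hypothetical temporary table standing in for the `batch` QUERY parameter.
    conn.execute("CREATE TEMP TABLE batch(region TEXT, customer_id INT, amount FLOAT)")
    conn.executemany("INSERT INTO batch VALUES (?, ?, ?)", batch)
    # One statement per destination table; rows are filtered and copied
    # inside the database engine rather than looped over one at a time.
    conn.execute(
        "INSERT INTO eastern_region_orders(customer_id, amount) "
        "SELECT customer_id, amount FROM batch WHERE region = 'E'"
    )
    conn.execute(
        "INSERT INTO western_region_orders(customer_id, amount) "
        "SELECT customer_id, amount FROM batch WHERE region = 'W'"
    )
    east = conn.execute(
        "SELECT customer_id, amount FROM eastern_region_orders ORDER BY customer_id"
    ).fetchall()
    west = conn.execute(
        "SELECT customer_id, amount FROM western_region_orders ORDER BY customer_id"
    ).fetchall()
    conn.close()
    return east, west


if __name__ == "__main__":
    east, west = split_orders(BATCH)
    print("east:", east)
    print("west:", west)
```

The point of the pattern is that each destination table is filled by a single statement over the whole batch, so no per-row loop is needed to decide where a row belongs.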
Example: Inserting Multiple Records in the Destination Table, for Each Input Record
This example extracts a list of product SKUs and their quantities from the data source skus_
.
AB27382,1
BD87218,5
EI30283,2
WT73629,3
The pipeline inserts the data extracted from the data source into the table serial_numbers_and_skus, which is defined as follows.
CREATE TABLE serial_numbers_and_skus(serial_number TEXT DEFAULT NULL, sku TEXT, SHARD KEY(serial_number));
Every SKU in the table has one or more associated serial numbers.
Without using COLLECT(), you write a pipeline stored procedure that joins the batch to the numbers table, based on the quantity field in each record of the batch. The join produces quantity records in the destination table for each record in the batch.
First, create the numbers table and populate it with sequential numbers up to max_value.
CREATE REFERENCE TABLE numbers(n INT);
DELIMITER //
CREATE OR REPLACE PROCEDURE populate_numbers_table(max_value INT)
AS
BEGIN
  FOR i in 1 .. max_value LOOP
    INSERT INTO numbers(n) VALUES (i);
  END LOOP;
END //
DELIMITER ;
CALL populate_numbers_table(100);
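Here, the 100 input for max_value is arbitrary. Choose a value at least as large as the highest quantity you expect for a SKU, because the join to the numbers table can create at most max_value records for each SKU.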
Create the stored procedure:
DELIMITER //
CREATE OR REPLACE PROCEDURE serial_and_skus_proc(batch QUERY(sku TEXT, quantity INT))
AS
BEGIN
  INSERT INTO serial_numbers_and_skus(sku)
    SELECT sku FROM batch
    JOIN numbers ON numbers.n <= batch.quantity;
END //
DELIMITER ;
Create and start the pipeline:
CREATE PIPELINE serial_numbers_and_skus_pipeline
AS LOAD DATA FS '/serial_numbers_and_skus.txt'
INTO PROCEDURE serial_and_skus_proc
FIELDS TERMINATED BY ',';
START PIPELINE serial_numbers_and_skus_pipeline;
Retrieve the contents of serial_numbers_and_skus. NULL serial numbers are expected; populating the serial numbers is outside the scope of this example.
SELECT * FROM serial_numbers_and_skus ORDER BY sku;
+---------------+---------+
| serial_number | sku |
+---------------+---------+
| NULL | AB27382 |
| NULL | BD87218 |
| NULL | BD87218 |
| NULL | BD87218 |
| NULL | BD87218 |
| NULL | BD87218 |
| NULL | EI30283 |
| NULL | EI30283 |
| NULL | WT73629 |
| NULL | WT73629 |
| NULL | WT73629 |
+---------------+---------+
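The row counts in this result follow directly from the join condition numbers.n <= batch.quantity. The following is a minimal sketch in Python, again assuming SQLite (standard sqlite3 module) as a stand-in rather than SingleStore; the function expand_skus and the temporary table batch are hypothetical names introduced for illustration. It also shows how the size of the numbers table limits the number of rows produced for each SKU.

```python
import sqlite3
from collections import Counter

# Sample batch from the example: SKU and quantity.
SKU_BATCH = [
    ("AB27382", 1),
    ("BD87218", 5),
    ("EI30283", 2),
    ("WT73629", 3),
]


def expand_skus(batch, max_value):
    """Return one sku entry per unit of quantity via a numbers-table join."""
    conn = sqlite3.connect(":memory:")
    # Numbers table holding 1..max_value, as populate_numbers_table does.
    conn.execute("CREATE TABLE numbers(n INT)")
    conn.executemany(
        "INSERT INTO numbers(n) VALUES (?)",
        [(i,) for i in range(1, max_value + 1)],
    )
    # Hypothetical temporary table standing in for the `batch` QUERY parameter.
    conn.execute("CREATE TEMP TABLE batch(sku TEXT, quantity INT)")
    conn.executemany("INSERT INTO batch VALUES (?, ?)", batch)
    # Each batch row matches every n from 1 up to its quantity, so it is
    # repeated min(quantity, max_value) times in the result.
    rows = conn.execute(
        "SELECT sku FROM batch "
        "JOIN numbers ON numbers.n <= batch.quantity "
        "ORDER BY sku"
    ).fetchall()
    conn.close()
    return [sku for (sku,) in rows]


if __name__ == "__main__":
    print(Counter(expand_skus(SKU_BATCH, 100)))
    print(Counter(expand_skus(SKU_BATCH, 3)))
```

With max_value set to 100, each SKU appears once per unit of quantity, for 11 rows in total. With max_value set to 3, the SKU with quantity 5 yields only three rows, which is why the numbers table should cover the largest expected quantity.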
Note
Loading data into a reference table will cause the pipeline’s source data to be ingested through the aggregator. In this example, although the numbers reference table is used, the pipeline does not ingest any data into it.
Last modified: September 5, 2023