Writing Efficient Stored Procedures for Pipelines
A stored procedure attached to a pipeline lets you shape the data that the pipeline extracts from a data source.
Note
The implementation of your stored procedure can have significant consequences for the performance and memory use of your pipeline. By default, pipelines shard incoming data across leaf partitions and perform the majority of their work in parallel across those partitions, but certain features available in stored procedures can force computation and memory usage to occur on an aggregator node.
Performing the following operations in your stored procedure forces computation and memory usage to occur on an aggregator node. The impact of each operation is listed in the Effect column. These effects apply to all stored procedures, not just pipeline stored procedures.
| Operation | Effect |
|---|---|
| Using `COLLECT()` | Computation and memory usage on the aggregator are significant if the query type variable passed to `COLLECT()` contains a large amount of data. |
| Loading data into a reference table | Reference tables are typically small. In this case, computation and memory usage on the aggregator are not significant. |
| Loading data into a table that contains an auto-increment column | Computation on the aggregator is significant if a lot of data is being loaded. Maximum memory usage on the aggregator is bounded by the size of a single batch. |
Due to the effects of the operations listed above, you should implement alternative operations in your stored procedure when possible. The following section offers some alternatives to using `COLLECT()`.
Using Alternatives to `COLLECT()` in Pipeline Stored Procedures
The following examples demonstrate how to implement pipeline stored procedures without using `COLLECT()`.
Example: Splitting Data into Multiple Tables
This example extracts a list of orders from the data source. The data source, `east_west_orders.txt`, contains one line per order with the columns region (column 1), customer ID (column 2), and order amount (column 3). The following is sample data from the file.
```
E,10001,20.25
W,10002,30.58
W,10003,25.73
E,10004,32.71
W,10005,23.89
```
The pipeline inserts each row of the extracted data into the `eastern_region_orders` or `western_region_orders` table. These tables are defined as follows:
```sql
CREATE TABLE eastern_region_orders(customer_id INT, amount FLOAT, SHARD KEY(customer_id));
CREATE TABLE western_region_orders(customer_id INT, amount FLOAT, SHARD KEY(customer_id));
```
Without using `COLLECT()`, create the stored procedure:
```sql
DELIMITER //
CREATE OR REPLACE PROCEDURE orders_proc(batch QUERY(region CHAR, customer_id INT, amount FLOAT)) AS
BEGIN
  INSERT INTO eastern_region_orders(customer_id, amount)
    SELECT customer_id, amount FROM batch WHERE region = 'E';
  INSERT INTO western_region_orders(customer_id, amount)
    SELECT customer_id, amount FROM batch WHERE region = 'W';
END //
DELIMITER ;
```
Create and start the pipeline:
```sql
CREATE PIPELINE orders_pipeline AS
LOAD DATA FS '/east_west_orders.txt'
INTO PROCEDURE orders_proc
FIELDS TERMINATED BY ',';

START PIPELINE orders_pipeline;
```
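Optionally, you can verify that the pipeline is running and processing batches. `SHOW PIPELINES` reports pipeline state, and the `information_schema.PIPELINES_BATCHES_SUMMARY` view reports per-batch activity; since the exact columns of the view vary by version, the sketch below selects all of them rather than naming specific columns.

```sql
-- Check the state of all pipelines in the current database.
SHOW PIPELINES;

-- Inspect recent batch activity for this pipeline; columns vary by
-- version, so all of them are selected here.
SELECT * FROM information_schema.PIPELINES_BATCHES_SUMMARY
WHERE PIPELINE_NAME = 'orders_pipeline';
```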
Retrieve the contents of `eastern_region_orders` and `western_region_orders`:
```sql
SELECT * FROM eastern_region_orders ORDER BY customer_id, amount;
```

```
+-------------+--------+
| customer_id | amount |
+-------------+--------+
|       10001 |  20.25 |
|       10004 |  32.71 |
+-------------+--------+
```
```sql
SELECT * FROM western_region_orders ORDER BY customer_id, amount;
```

```
+-------------+--------+
| customer_id | amount |
+-------------+--------+
|       10002 |  30.58 |
|       10003 |  25.73 |
|       10005 |  23.89 |
+-------------+--------+
```
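For contrast, the following sketch shows what a `COLLECT()`-based version of this procedure might look like (hypothetical; `orders_proc_with_collect` is not part of the example). Because `COLLECT()` materializes the query type variable as an array on the aggregator, the batch is buffered in aggregator memory and the row-at-a-time inserts run there as well, rather than as parallel `INSERT ... SELECT` statements on the leaves:

```sql
DELIMITER //
CREATE OR REPLACE PROCEDURE orders_proc_with_collect(
    batch QUERY(region CHAR, customer_id INT, amount FLOAT)) AS
BEGIN
  -- COLLECT() pulls the whole batch into aggregator memory.
  FOR r IN COLLECT(batch) LOOP
    IF r.region = 'E' THEN
      INSERT INTO eastern_region_orders(customer_id, amount)
        VALUES (r.customer_id, r.amount);
    ELSE
      INSERT INTO western_region_orders(customer_id, amount)
        VALUES (r.customer_id, r.amount);
    END IF;
  END LOOP;
END //
DELIMITER ;
```

With large batches, this is exactly the pattern the table at the top of this section warns about.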
Example: Inserting Multiple Records into the Destination Table for Each Input Record
This example extracts a list of product SKUs and their quantities from the data source `skus_and_quantities.txt`. The data source contains one line per SKU/quantity pair, with the columns SKU (column 1) and quantity (column 2). The following is sample data from the file.
```
AB27382,1
BD87218,5
EI30283,2
WT73629,3
```
The pipeline inserts the data extracted from the data source into the table `serial_numbers_and_skus`, which is defined as follows.
```sql
CREATE TABLE serial_numbers_and_skus(serial_number TEXT DEFAULT NULL, sku TEXT, SHARD KEY(serial_number));
```
Every SKU in the table has one or more associated serial numbers. In this example, the serial numbers are not populated by the pipeline stored procedure.
Without using `COLLECT()`, you can write a pipeline stored procedure that joins the batch to a `numbers` table, based on the quantity field in each record of the batch. The join creates `quantity` records in the destination table for each record in the batch.
First, create the `numbers` table and populate it with sequential numbers up to `max_value`.
```sql
CREATE REFERENCE TABLE numbers(n INT);

DELIMITER //
CREATE OR REPLACE PROCEDURE populate_numbers_table(max_value INT) AS
BEGIN
  FOR i IN 1 .. max_value LOOP
    INSERT INTO numbers(n) VALUES (i);
  END LOOP;
END //
DELIMITER ;

CALL populate_numbers_table(100);
```
Here, the `100` input for `max_value` is arbitrary. Use a value that is at least the largest number of rows you want to create in the destination table for any single SKU, since the join can produce at most `max_value` rows per input record.
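As an aside, if `max_value` is large, the row-at-a-time loop above can be slow. A minimal set-based sketch, assuming a helper `digits` table (hypothetical, not part of the original example), generates 1 through 100 in a single statement:

```sql
-- Hypothetical helper table holding the digits 0-9.
CREATE TABLE digits(d INT);
INSERT INTO digits VALUES (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);

-- Cross-join the digits table with itself to produce 1 .. 100:
-- tens digit * 10 + ones digit + 1.
INSERT INTO numbers(n)
SELECT tens.d * 10 + ones.d + 1
FROM digits AS tens, digits AS ones;
```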
Create the stored procedure:
```sql
DELIMITER //
CREATE OR REPLACE PROCEDURE serial_and_skus_proc(batch QUERY(sku TEXT, quantity INT)) AS
BEGIN
  INSERT INTO serial_numbers_and_skus(sku)
    SELECT sku FROM batch JOIN numbers ON numbers.n <= batch.quantity;
END //
DELIMITER ;
```
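To see how this join fans out, consider the input record `BD87218,5`: it matches the `numbers` rows 1 through 5, so the `INSERT ... SELECT` produces five output rows for that one record. You can preview the effect outside the pipeline with an ad hoc query (the literal values below are illustrative):

```sql
-- Emulate one batch record (sku 'BD87218', quantity 5); the join
-- condition n <= quantity yields one row per counter value.
SELECT 'BD87218' AS sku, n
FROM numbers
WHERE n <= 5
ORDER BY n;
```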
Create and start the pipeline:
```sql
CREATE PIPELINE serial_numbers_and_skus_pipeline AS
LOAD DATA FS '/serial_numbers_and_skus.txt'
INTO PROCEDURE serial_and_skus_proc
FIELDS TERMINATED BY ',';

START PIPELINE serial_numbers_and_skus_pipeline;
```
Retrieve the contents of `serial_numbers_and_skus`. The `NULL` serial numbers are expected; populating the serial numbers is outside the scope of this example.
```sql
SELECT * FROM serial_numbers_and_skus ORDER BY sku;
```

```
+---------------+---------+
| serial_number | sku     |
+---------------+---------+
| NULL          | AB27382 |
| NULL          | BD87218 |
| NULL          | BD87218 |
| NULL          | BD87218 |
| NULL          | BD87218 |
| NULL          | BD87218 |
| NULL          | EI30283 |
| NULL          | EI30283 |
| NULL          | WT73629 |
| NULL          | WT73629 |
| NULL          | WT73629 |
+---------------+---------+
```
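As a quick sanity check, the number of rows created per SKU should match the quantity column of the source file (1, 5, 2, and 3, respectively):

```sql
-- Expected: AB27382 -> 1, BD87218 -> 5, EI30283 -> 2, WT73629 -> 3.
SELECT sku, COUNT(*) AS rows_created
FROM serial_numbers_and_skus
GROUP BY sku
ORDER BY sku;
```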
Note
Loading data into a reference table will cause the pipeline’s source data to be ingested through the aggregator. In this example, the source data is not ingested through the aggregator: although the `numbers` reference table is utilized, no data is ingested into it.