The implementation of your stored procedure can have significant consequences on the performance and memory use of your pipeline. Pipelines by default shard incoming data across leaf partitions and perform the majority of their work in parallel across those partitions, but certain features available in stored procedures can force computation and memory usage to occur on an aggregator node.
Performing the following operations in your stored procedure will force computation and memory usage to occur on an aggregator node. The impact of these operations are listed in the Effect column. These effects apply to all stored procedures, not pipeline stored procedures specifically.
|Using COLLECT()||Computation and memory usage on the aggregator are significant if the query type variable passed to
|Loading data into a reference table||Reference tables are typically small. In this case, computation and memory usage on the aggregator are not significant.|
|Loading data into table that contains an auto-increment column||Computation on the aggregator is significant if a lot of data is being loaded. Maximum memory usage on the aggregator is
Due to the effects of the operations listed, you should implement alternative operations in your stored procedure, when possible. The following section offers some alternatives to using
Using Alternatives to
COLLECT() in Pipeline Stored Procedures
The following examples demonstrate how to implement pipeline stored procedures without using
Example: Splitting Data into Multiple Tables
This example extracts a list of orders from the data source. The data source,
east_west_orders.txt, contains a line per order with the columns region (column 1), customer ID (column 2), order amount (column 3). Following is sample data in the file.
E,10001,20.25 W,10002,30.58 W,10003,25.73 E,10004,32.71 W,10005,23.89
The pipeline inserts each row of the extracted data into the
western_region_orders table. These tables are defined as follows:
CREATE TABLE eastern_region_orders(customer_id INT, amount FLOAT, SHARD KEY(customer_id)); CREATE TABLE western_region_orders(customer_id INT, amount FLOAT, SHARD KEY(customer_id));
COLLECT(), create the stored procedure:
DELIMITER // CREATE OR REPLACE PROCEDURE orders_proc(batch QUERY(region CHAR, customer_id INT, amount FLOAT)) AS BEGIN INSERT INTO eastern_region_orders(customer_id, amount) SELECT customer_id, amount FROM batch WHERE REGION = 'E'; INSERT INTO western_region_orders(customer_id, amount) SELECT customer_id, amount FROM batch WHERE REGION = 'W'; END // DELIMITER ;
Create and start the pipeline:
CREATE PIPELINE orders_pipeline AS LOAD DATA FS '/east_west_orders.txt' INTO PROCEDURE orders_proc FIELDS TERMINATED BY ','; START PIPELINE orders_pipeline;
Retrieve the contents of
SELECT * FROM eastern_region_orders ORDER BY customer_id, amount; **** +-------------+--------+ | customer_id | amount | +-------------+--------+ | 10001 | 20.25 | | 10004 | 32.71 | +-------------+--------+
SELECT * FROM western_region_orders ORDER BY customer_id, amount; **** +-------------+--------+ | customer_id | amount | +-------------+--------+ | 10002 | 30.58 | | 10003 | 25.73 | | 10005 | 23.89 | +-------------+--------+
Example: Inserting Multiple Records in the Destination Table, for Each Input Record
This example extracts a list of product SKUs and their quantities from the data source
skus_and_quantities.txt. The data source, contains a line per SKU/quantity with the columns SKU (column 1) and quantity (column 2). Following is sample data in the file.
AB27382,1 BD87218,5 EI30283,2 WT73629,3
The pipeline inserts the data extracted from the data source into the table
serial_numbers_and_skus, which is defined as follows.
CREATE TABLE serial_numbers_and_skus(serial_number TEXT DEFAULT NULL, sku TEXT, SHARD KEY(serial_number));
Every SKU in the table has one or more associated serial numbers. In this example, the serial numbers are not populated by the pipeline stored procedure.
COLLECT(), you write a pipeline stored procedure that performs a join of the batch to the
numbers table, based on the quantity field in each record of the batch. This will allow you to create
quantity number of records in the destination table for each record in the batch.
First, create the
numbers table and populate it with sequential numbers up to
CREATE REFERENCE TABLE numbers(n INT); DELIMITER // CREATE OR REPLACE PROCEDURE populate_numbers_table(max_value INT) AS BEGIN FOR i in 1 .. max_value LOOP INSERT INTO numbers(n) VALUES (i); END LOOP; END // DELIMITER ; CALL populate_numbers_table(100);
100 input for
max_value is arbitrary. Use an input that is the maximum number of rows in the
numbers table you want to create for each SKU.
Create the stored procedure:
DELIMITER // CREATE OR REPLACE PROCEDURE serial_and_skus_proc(batch QUERY(sku TEXT, quantity INT)) AS BEGIN INSERT INTO serial_numbers_and_skus(sku) SELECT sku FROM batch JOIN numbers ON numbers.n <= batch.quantity; END // DELIMITER ;
Create and start the pipeline:
CREATE PIPELINE serial_numbers_and_skus_pipeline AS LOAD DATA FS '/serial_numbers_and_skus.txt' INTO PROCEDURE serial_and_skus_proc FIELDS TERMINATED BY ','; START PIPELINE serial_numbers_and_skus_pipeline;
Retrieve the contents of
NULL serial numbers are expected; populating the serial numbers are outside the scope of this example.
SELECT * FROM serial_numbers_and_skus ORDER BY sku; **** +---------------+---------+ | serial_number | sku | +---------------+---------+ | NULL | AB27382 | | NULL | BD87218 | | NULL | BD87218 | | NULL | BD87218 | | NULL | BD87218 | | NULL | BD87218 | | NULL | EI30283 | | NULL | EI30283 | | NULL | WT73629 | | NULL | WT73629 | | NULL | WT73629 | +---------------+---------+
Loading data into a reference table will cause the pipeline’s source data to be ingested through the aggregator. In this example, the source data is not ingested through the aggregator, because although the
numbers reference table is utilized, no data is ingested into the table.