CREATE PIPELINE ... INTO PROCEDURE

Creates a pipeline that uses a stored procedure to shape the data that is extracted from the pipeline’s data source. Using a stored procedure is one of three methods you can use to shape the data.

Syntax

CREATE PROCEDURE <procedure_name> (<query_name> QUERY(<field_name> <data_type>, ...))
AS
BEGIN
<procedure_body>
END
CREATE PIPELINE <pipeline_name>
AS LOAD DATA <load_data_options>
INTO PROCEDURE <procedure_name>

Note

For information on creating a pipeline other than using the INTO PROCEDURE clause, see CREATE PIPELINE.

Remarks

Note

The implementation of your stored procedure can have significant consequences on the performance and memory use of your pipeline. Pipelines by default shard incoming data across leaf partitions and perform the majority of their work in parallel across those partitions, but certain features available in stored procedures can force computation and memory usage to occur on an aggregator node. For more details, see Writing Efficient Stored Procedures for Pipelines.
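The difference can be sketched with two procedures that load the same batch (a minimal sketch, assuming a table t with one INT column a; procedure names are illustrative):

```sql
DELIMITER //
-- Set-based: the INSERT ... SELECT runs in parallel across the
-- partitions that hold the batch, so no extra data movement is needed.
CREATE OR REPLACE PROCEDURE insert_batch_proc(batch QUERY(a INT))
AS
BEGIN
INSERT INTO t(a) SELECT a FROM batch;
END //

-- Row-at-a-time: COLLECT materializes the entire batch in memory on a
-- single node and the loop runs serially, which can hurt performance
-- and increase memory usage for large batches.
CREATE OR REPLACE PROCEDURE collect_batch_proc(batch QUERY(a INT))
AS
BEGIN
FOR r IN COLLECT(batch) LOOP
INSERT INTO t(a) VALUES (r.a);
END LOOP;
END //
DELIMITER ;
```

Prefer the set-based form unless per-row logic (such as the exception handling shown later in this topic) is required.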

The following restrictions and recommendations apply when using a stored procedure with your pipeline:

  • The field list in your QUERY type variable must conform to the schema of the pipeline source (Kafka, S3, etc.).

  • The query type variable must be the only parameter in the stored procedure used with your pipeline.

  • The value of <query_name> is the current batch of records that have been extracted from the pipeline’s data source.

  • DDL commands are not allowed in stored procedures that are called from pipelines.

  • Use the pipeline_source_file() built-in function in the SET clause to set a table column to the pipeline data source file.

  • The SET and WHERE clauses are executed before the stored procedure.

  • CREATE PIPELINE ... AS LOAD DATA ... IGNORE and CREATE PIPELINE ... AS LOAD DATA ... SKIP ... ERRORS only recognize parsing errors. You must specify the desired behavior in the event of constraint errors in the body of the stored procedure.

  • Transactions (e.g., BEGIN TRANSACTION and COMMIT) are not allowed because the pipeline manages the transaction state for you.

  • Pipelines into stored procedures maintain the same exactly-once semantics as other pipelines; as a result, certain read-write patterns (reads after writes involving reshuffles or reference tables) are not allowed.

  • Upserts can be specified in the ON DUPLICATE KEY UPDATE clause of CREATE PIPELINE. Alternatively, duplicate key behavior can be specified inside the stored procedure.

  • OUT_OF_ORDER OPTIMIZATION cannot be disabled when using pipelines into stored procedures. The order in which data is loaded into the destination table(s) must be enforced in the stored procedure itself.

  • Using INTO PROCEDURE requires the EXECUTE permission on the stored procedure.
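As an illustration of the SET clause and the pipeline_source_file() built-in with INTO PROCEDURE, the following sketch adds the source file name to each batch row. The load_audit table and all other names here are hypothetical; verify the exact clause placement against the CREATE PIPELINE syntax for your version.

```sql
CREATE TABLE load_audit(a INT, source_file TEXT);

DELIMITER //
-- The batch exposes the extra source_file field populated by the SET clause,
-- which runs before the stored procedure is called.
CREATE OR REPLACE PROCEDURE file_tracking_proc(batch QUERY(a INT, source_file TEXT))
AS
BEGIN
INSERT INTO load_audit(a, source_file) SELECT a, source_file FROM batch;
END //
DELIMITER ;

CREATE PIPELINE file_tracking_pipeline
AS LOAD DATA FS '/data.txt'
INTO PROCEDURE file_tracking_proc
FIELDS TERMINATED BY ','
(a)
SET source_file = pipeline_source_file();
```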

Equivalent CREATE PIPELINE … INTO TABLE and CREATE PIPELINE … INTO PROCEDURE Statements

For a table t containing one INT column a:

CREATE PIPELINE pipeline_without_stored_proc
AS LOAD DATA FS '/data.txt'
INTO TABLE t;

is logically equivalent to:

DELIMITER //
CREATE OR REPLACE PROCEDURE proc(batch QUERY(a INT))
AS
BEGIN
INSERT INTO t(a) SELECT * FROM batch;
END //
DELIMITER ;
CREATE PIPELINE pipeline_using_stored_proc
AS LOAD DATA FS '/data.txt'
INTO PROCEDURE proc;

Examples

Note

For simplicity, these examples extract data from the file system. For examples of using CREATE PIPELINE to extract data from other types of data sources, see other examples in this topic.

Example: Loading Data into Multiple Tables

See the second method in this example.

Example: Denormalizing Data

This example extracts, from the data source course_enrollment.txt, a list of students who are enrolled in courses. The data source contains a line per enrollment with the columns course ID (column 1) and student ID (column 2). Following is sample data in the file.

CS-101,1000047
CS-101,1010382
CS-201,1070044
CS-201,1008022

The pipeline inserts the data extracted from the data source into the course_enrollment table that is defined as follows:

CREATE TABLE course_enrollment(course_id TEXT, first_name TEXT, last_name TEXT);

To find the student’s first name and last name, the stored procedure joins the extracted data with the student table, which is defined as follows, along with sample data.

CREATE TABLE student(id INT PRIMARY KEY, first_name TEXT, last_name TEXT);
INSERT INTO student(id, first_name, last_name) VALUES (1000047,"John","Smith");
INSERT INTO student(id, first_name, last_name) VALUES (1010382,"Mary","Smith");
INSERT INTO student(id, first_name, last_name) VALUES (1070044,"John","Doe");
INSERT INTO student(id, first_name, last_name) VALUES (1008022,"Mary","Doe");

Define the stored procedure:

DELIMITER //
CREATE OR REPLACE PROCEDURE course_enrollment_proc(batch QUERY(course_id TEXT, student_id INT))
AS
BEGIN
INSERT INTO course_enrollment(course_id, first_name, last_name) SELECT b.course_id, s.first_name, s.last_name FROM batch b JOIN student s ON b.student_id = s.id;
END //
DELIMITER ;

Create and start the pipeline:

CREATE PIPELINE course_enrollment_pipeline
AS LOAD DATA FS '/course_enrollment.txt'
INTO PROCEDURE course_enrollment_proc
FIELDS TERMINATED BY ',';
START PIPELINE course_enrollment_pipeline;

Retrieve the data in the course_enrollment table.

SELECT * FROM course_enrollment ORDER BY course_id, last_name, first_name;
+-----------+------------+-----------+
| course_id | first_name | last_name |
+-----------+------------+-----------+
| CS-101    | John       | Smith     |
| CS-101    | Mary       | Smith     |
| CS-201    | John       | Doe       |
| CS-201    | Mary       | Doe       |
+-----------+------------+-----------+

Example: Normalizing Data

This example extracts, from the data source passengers.txt, a list of passengers who are booked on flights. The data source contains a line per passenger with the columns flight number (column 1), first name (column 2), last name (column 3), and date of birth (column 4). Following is sample data in the file.

101,John,Smith,1990-02-08
101,Mary,Smith,1991-04-19
101,John,Doe,1986-09-09
101,Mary,Doe,1984-05-25

The pipeline splits the data extracted from the data source into two tables that are defined as follows:

CREATE TABLE passenger(id BIGINT AUTO_INCREMENT PRIMARY KEY, first_name TEXT, last_name TEXT, date_of_birth DATETIME);
CREATE TABLE passenger_list(flight_id INT, passenger_id INT, SHARD KEY(flight_id));

Create the stored procedure:

DELIMITER //
CREATE OR REPLACE PROCEDURE passenger_list_proc(batch QUERY(flight_id INT, first_name TEXT, last_name TEXT, date_of_birth DATETIME))
AS
BEGIN
/* Insert new passenger records from the batch into the passenger table */
INSERT INTO passenger(first_name, last_name, date_of_birth)
SELECT DISTINCT first_name, last_name, date_of_birth FROM batch b WHERE NOT EXISTS
(SELECT * FROM passenger p WHERE p.first_name = b.first_name AND p.last_name = b.last_name AND p.date_of_birth = b.date_of_birth);
/* Insert passenger ids and their flights from the batch into the passenger_list table. To get the passenger ids, join to the passenger table. */
INSERT INTO passenger_list(flight_id, passenger_id)
SELECT flight_id, p.id FROM batch b JOIN passenger p
on p.first_name = b.first_name AND p.last_name = b.last_name AND p.date_of_birth = b.date_of_birth;
END //
DELIMITER ;

Create and start the pipeline:

CREATE PIPELINE passenger_pipeline
AS LOAD DATA FS '/passengers.txt'
INTO PROCEDURE passenger_list_proc
FIELDS TERMINATED BY ',';
START PIPELINE passenger_pipeline;

Retrieve the data from the passenger and passenger_list tables:

SELECT * FROM passenger ORDER BY id;
+----+------------+-----------+---------------------+
| id | first_name | last_name | date_of_birth       |
+----+------------+-----------+---------------------+
|  1 | Mary       | Doe       | 1984-05-25 00:00:00 |
|  2 | John       | Smith     | 1990-02-08 00:00:00 |
|  3 | Mary       | Smith     | 1991-04-19 00:00:00 |
|  4 | John       | Doe       | 1986-09-09 00:00:00 |
+----+------------+-----------+---------------------+
SELECT * FROM passenger_list ORDER BY flight_id, passenger_id;
+-----------+--------------+
| flight_id | passenger_id |
+-----------+--------------+
|       101 |            1 |
|       101 |            2 |
|       101 |            3 |
|       101 |            4 |
+-----------+--------------+

Example: Loading JSON Data and Calculating an Aggregate

This example extracts a list of tweets from the data source tweets.txt. The data source contains one line per tweet, where each tweet is represented as a JSON object. Following is sample data in the file.

{"tweet_id":"100", "tweet_user_id":"200", "tweet_text":"Test tweet 1", "retweet_user_id":"502"}
{"tweet_id":"101", "tweet_user_id":"213", "tweet_text":"Test tweet 2", "retweet_user_id":"518"}
{"tweet_id":"102", "tweet_user_id":"239", "tweet_text":"Test tweet 3", "retweet_user_id":"511"}
{"tweet_id":"101", "tweet_user_id":"213", "tweet_text":"Test tweet 2", "retweet_user_id":"518"}
{"tweet_id":"102", "tweet_user_id":"239", "tweet_text":"Test tweet 3", "retweet_user_id":"511"}
{"tweet_id":"103", "tweet_user_id":"265", "tweet_text":"Test tweet 4"}
{"tweet_id":"102", "tweet_user_id":"239", "tweet_text":"Test tweet 3", "retweet_user_id":"511"}

tweet_user_id is the id of the user who created the tweet. retweet_user_id is optional; it is the id of a user who retweeted the tweet. A single line in the data source can contain at most one retweet_user_id.

If more than one user has retweeted a tweet, the file will contain duplicate lines with the same tweet_id, tweet_user_id, and tweet_text but with a different retweet_user_id. This is the case with tweet_ids 101 and 102.

Note

The data in tweets.txt is for demonstration purposes only and is not intended to be real Twitter data.

The pipeline inserts the tweet_id, tweet_user_id, and tweet_text from the data source into the tweets table. Duplicate entries containing the same tweet_id are not inserted.

The retweets_counter table contains the number of tweets that have been retweeted by each user.

The tables are defined as follows:

CREATE TABLE tweets(tweet_id TEXT PRIMARY KEY, tweet_user_id TEXT, tweet_text TEXT);
CREATE TABLE retweets_counter(user_id TEXT PRIMARY KEY, num_retweets INT);

Create the stored procedure:

DELIMITER //
CREATE OR REPLACE PROCEDURE tweets_proc(batch QUERY(tweet JSON))
AS
BEGIN
/* INSERT IGNORE does not attempt to insert records having a duplicate tweet_id into the tweets table. */
INSERT IGNORE INTO tweets(tweet_id, tweet_user_id, tweet_text)
SELECT tweet::tweet_id, tweet::tweet_user_id, tweet::tweet_text
FROM batch;
/* For each tweet in the batch, retrieve the id of the user who retweeted it. Then, for each of these retrieved ids, upsert the id into the retweets_counter table. For each duplicate id encountered, add 1 to the num_retweets column. */
INSERT INTO retweets_counter(user_id, num_retweets)
SELECT tweet::retweet_user_id, 1
FROM batch
WHERE tweet::retweet_user_id IS NOT NULL
ON DUPLICATE KEY UPDATE num_retweets = num_retweets + 1;
END //
DELIMITER ;

Create and start the pipeline:

CREATE PIPELINE tweets_pipeline
AS LOAD DATA FS '/tweets.txt'
INTO PROCEDURE tweets_proc
(tweet <- %)
FORMAT JSON;
START PIPELINE tweets_pipeline;

Retrieve the data from the tweets and retweets_counter tables.

SELECT * FROM tweets ORDER BY tweet_id;
+----------+---------------+----------------+
| tweet_id | tweet_user_id | tweet_text     |
+----------+---------------+----------------+
| "100"    | "200"         | "Test tweet 1" |
| "101"    | "213"         | "Test tweet 2" |
| "102"    | "239"         | "Test tweet 3" |
| "103"    | "265"         | "Test tweet 4" |
+----------+---------------+----------------+
SELECT * FROM retweets_counter ORDER BY user_id;
+---------+--------------+
| user_id | num_retweets |
+---------+--------------+
| "502"   |            1 |
| "511"   |            3 |
| "518"   |            2 |
+---------+--------------+

Example: Tracking a Slowly Changing Dimension

This example tracks a Type 2 Slowly Changing Dimension. The data source, product_history.txt, contains a line for each product. A product has a code (column 1), description (column 2), price (column 3), and create date of the product record (column 4). A change to a product record is entered as a new record in the file. The new record has the same code as the original product record. Following is sample data in the file.

10,notebook,3.95,2020-01-05
15,pens (10 pack),10.25,2020-01-05
15,pens w/med size point (10 pack),10.25,2020-01-13
10,notebook (200 pages),3.95,2020-01-10
15,pens with med size point (10 pack),10.25,2020-01-14
18,envelopes (50 pack),2.25,2020-01-18
15,pens with medium size point (10 pack),10.25,2020-01-19

The pipeline inserts each extracted row from product_history.txt into the table product_history. The table is defined as follows:

CREATE TABLE product_history(code INT, description TEXT, price FLOAT, start_date DATETIME, current_record BOOLEAN);

start_date is the create date of the product record in product_history.txt. current_record is 1 if the record is the latest record created for a given code.

Create the stored procedure:

DELIMITER //
CREATE OR REPLACE PROCEDURE changing_dimension_proc(batch QUERY(code INT, new_description TEXT, new_price FLOAT, start_date DATETIME))
AS
BEGIN
/* Insert all batch records into product_history. Insert 0 for current_record. */
INSERT INTO product_history(code, description, price, start_date, current_record) SELECT code, new_description, new_price, start_date, 0 FROM batch;
/* Clear current_record on the existing current records for every code found in the batch. This is done so current_record can be set correctly in the next step. */
UPDATE product_history ph JOIN batch b ON ph.code = b.code SET current_record = 0 WHERE ph.current_record = 1;
/* Update product_history records having a code that is in the batch, setting current_record = 1 if the record has the latest start_date of all of the records in product_history with the same code. */
UPDATE product_history ph JOIN batch b ON ph.code = b.code SET current_record = 1 WHERE ph.start_date = (SELECT MAX(start_date) FROM product_history WHERE code = b.code);
END //
DELIMITER ;

Create and start the pipeline:

CREATE PIPELINE changing_dimension_pipeline
AS LOAD DATA FS '/product_history.txt'
INTO PROCEDURE changing_dimension_proc
FIELDS TERMINATED BY ',';
START PIPELINE changing_dimension_pipeline;

Retrieve the data from the product_history table:

SELECT * FROM product_history ORDER BY code, start_date;
+------+---------------------------------------+-------+---------------------+----------------+
| code | description                           | price | start_date          | current_record |
+------+---------------------------------------+-------+---------------------+----------------+
|   10 | notebook                              |  3.95 | 2020-01-05 00:00:00 |              0 |
|   10 | notebook (200 pages)                  |  3.95 | 2020-01-10 00:00:00 |              1 |
|   15 | pens (10 pack)                        | 10.25 | 2020-01-05 00:00:00 |              0 |
|   15 | pens w/med size point (10 pack)       | 10.25 | 2020-01-13 00:00:00 |              0 |
|   15 | pens with med size point (10 pack)    | 10.25 | 2020-01-14 00:00:00 |              0 |
|   15 | pens with medium size point (10 pack) | 10.25 | 2020-01-19 00:00:00 |              1 |
|   18 | envelopes (50 pack)                   |  2.25 | 2020-01-18 00:00:00 |              1 |
+------+---------------------------------------+-------+---------------------+----------------+

Example: Handling Duplicate Keys Using Upserts

Upserts can be specified in the ON DUPLICATE KEY UPDATE clause of CREATE PIPELINE. Alternatively, duplicate key behavior can be specified inside the stored procedure itself, as shown in the next example.
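For reference, a pipeline-level upsert might look like the following minimal sketch, written against the table t defined in the next example. The @b variable binds the second input column; this is an illustrative sketch, and the exact clause syntax should be checked against the CREATE PIPELINE reference for your version.

```sql
CREATE PIPELINE upsert_pipeline
AS LOAD DATA FS '/duplicate_keys.txt'
INTO TABLE t
FIELDS TERMINATED BY ','
(a, @b)
SET b = @b
ON DUPLICATE KEY UPDATE b = @b;
```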

Example: Handling Duplicate Keys by Inserting Duplicate Records into a Separate Table

This example extracts, from the data source duplicate_keys.txt, a list of records that each contain a key (column 1) and a value (column 2). Following is sample data in the file.

1,33
4,28
1,45
1,56
6,88
4,67

The pipeline inserts the extracted rows from the data source into the table t, except for the extracted rows that have duplicate keys. Duplicate key records are inserted into t_duplicates. The tables are defined as follows.

CREATE TABLE t(a INT PRIMARY KEY, b INT);
CREATE TABLE t_duplicates(a INT, b INT);

Define the stored procedure:

DELIMITER //
CREATE OR REPLACE PROCEDURE duplicate_keys_proc(batch QUERY(a INT, b INT))
AS
BEGIN
FOR batch_record IN COLLECT(batch) LOOP
BEGIN
INSERT INTO t(a,b) VALUES (batch_record.a,batch_record.b);
EXCEPTION
WHEN ER_DUP_ENTRY THEN
INSERT INTO t_duplicates(a,b) VALUES (batch_record.a,batch_record.b);
END;
END LOOP;
END //
DELIMITER ;

The stored procedure handles batch records with duplicate keys by handling the ER_DUP_ENTRY exception.

Create and start the pipeline:

CREATE PIPELINE duplicate_keys_pipeline
AS LOAD DATA FS '/duplicate_keys.txt'
INTO PROCEDURE duplicate_keys_proc
FIELDS TERMINATED BY ',';
START PIPELINE duplicate_keys_pipeline;

Retrieve the data from t and t_duplicates:

SELECT * FROM t ORDER BY a;
+---+------+
| a | b    |
+---+------+
| 1 |   33 |
| 4 |   28 |
| 6 |   88 |
+---+------+
SELECT * FROM t_duplicates ORDER BY a;
+------+------+
| a    | b    |
+------+------+
|    1 |   56 |
|    1 |   45 |
|    4 |   67 |
+------+------+

Last modified: April 1, 2024
