CREATE PIPELINE ... INTO PROCEDURE
Creates a pipeline that uses a stored procedure to shape the data that is extracted from the pipeline’s data source.
Syntax
CREATE PROCEDURE <procedure_name> (<query_name> QUERY(<field_name> <data_type>, ...))
AS
BEGIN
  <procedure_body>
END

CREATE PIPELINE <pipeline_name>
AS LOAD DATA <load_data_options>
INTO PROCEDURE <procedure_name>
Note
For information on creating a pipeline that does not use the INTO PROCEDURE clause, see CREATE PIPELINE.
Remarks
Note
The implementation of your stored procedure can have significant consequences on the performance and memory use of your pipeline.
The following is a list of restrictions and recommendations to follow when using a stored procedure with your pipeline:
- The field list in your QUERY type variable must conform to the schema of the pipeline source (Kafka, S3, etc.).
- The QUERY type variable must be the only parameter in the stored procedure used with your pipeline.
- The value of <query_name> is the current batch of records that have been extracted from the pipeline's data source.
- DDL commands are not allowed in stored procedures that are called from pipelines.
- The ECHO SELECT command cannot be used in stored procedures that are called from pipelines. For more information, refer to ECHO SELECT.
- Use the pipeline_source_file() built-in function in the SET clause to set a table column to the pipeline data source file.
- The SET and WHERE clauses are executed before the stored procedure.
- CREATE PIPELINE ... AS LOAD DATA ... IGNORE and CREATE PIPELINE ... AS LOAD DATA ... SKIP ... ERRORS only recognize parsing errors. You must specify the desired behavior in the event of constraint errors in the body of the stored procedure.
- Transactions (e.g., BEGIN TRANSACTION, COMMIT) are not allowed because the pipeline manages the transaction state for you.
- Pipelines into stored procedures maintain the same exactly-once semantics as other pipelines; as a consequence, certain read-write patterns (reads after writes in reshuffles or reference tables) are not allowed.
- Upserts can be specified in the ON DUPLICATE KEY UPDATE clause of CREATE PIPELINE. Alternatively, duplicate key behavior can be specified inside the stored procedure.
- OUT_OF_ORDER OPTIMIZATION cannot be disabled when using pipelines into stored procedures. The order in which data is loaded into the destination table(s) must be enforced in the stored procedure itself.
- Using INTO PROCEDURE requires the EXECUTE permission.
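As a sketch of how the pipeline_source_file() and SET clause remarks fit together, the following assumes that, with INTO PROCEDURE, the column list and SET clause populate the fields of the procedure's QUERY parameter. The events table, events_proc, events_pipeline, and the /data/events/ path are hypothetical names used only for illustration.

```sql
-- Hypothetical target table; source_file records which file each row came from.
CREATE TABLE events(a INT, b INT, source_file TEXT);

DELIMITER //
CREATE OR REPLACE PROCEDURE events_proc(batch QUERY(a INT, b INT, source_file TEXT))
AS
BEGIN
  INSERT INTO events(a, b, source_file)
  SELECT a, b, source_file FROM batch;
END //
DELIMITER ;

-- (a, b) map the two CSV fields; the SET clause (assumed here to fill the
-- QUERY field source_file) runs before the procedure is called.
CREATE PIPELINE events_pipeline
AS LOAD DATA FS '/data/events/*.csv'
INTO PROCEDURE events_proc
FIELDS TERMINATED BY ','
(a, b)
SET source_file = pipeline_source_file();
```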
Note
SingleStore does not support the DISABLE OUT_OF_ORDER OPTIMIZATION clause in CREATE PIPELINE ... INTO PROCEDURE statements.
To preserve the order in which records are ingested from Kafka within each pipeline batch:
- Set MAX_PARTITIONS_PER_BATCH to 1. This ensures that each batch is read from a single Kafka partition and processed on a single leaf node. Refer to MAX_PARTITIONS_PER_BATCH for more information.
- Explicitly sort the records within the batch using an ORDER BY clause inside the stored procedure.
Kafka preserves message ordering only within individual partitions.
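The two steps above might look like the following sketch. The broker address kafka-host:9092, the topic events, the ordered_events table and procedure, and the event_seq field (assumed to be a sequence number carried in each CSV message) are all hypothetical.

```sql
-- Hypothetical destination table.
CREATE TABLE ordered_events(event_seq BIGINT, payload TEXT);

DELIMITER //
CREATE OR REPLACE PROCEDURE ordered_events_proc(batch QUERY(event_seq BIGINT, payload TEXT))
AS
BEGIN
  -- Sort explicitly inside the procedure; event_seq is assumed to
  -- reflect the order in which the messages were produced.
  INSERT INTO ordered_events(event_seq, payload)
  SELECT event_seq, payload FROM batch
  ORDER BY event_seq;
END //
DELIMITER ;

-- MAX_PARTITIONS_PER_BATCH 1 limits each batch to a single Kafka partition.
CREATE PIPELINE ordered_events_pipeline
AS LOAD DATA KAFKA 'kafka-host:9092/events'
MAX_PARTITIONS_PER_BATCH 1
INTO PROCEDURE ordered_events_proc
FIELDS TERMINATED BY ',';
```

Because Kafka only orders messages within a partition, this arrangement orders records within each batch; it does not impose an order across partitions.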
Equivalent CREATE PIPELINE … INTO TABLE and CREATE PIPELINE … INTO PROCEDURE Statements
For a table t containing one INT column a:
CREATE PIPELINE pipeline_without_stored_proc
AS LOAD DATA FS '/data.txt'
INTO TABLE t;
is logically equivalent to:
DELIMITER //
CREATE OR REPLACE PROCEDURE proc(batch QUERY(a INT))
AS
BEGIN
  INSERT INTO t(a) SELECT * FROM batch;
END //
DELIMITER ;

CREATE PIPELINE pipeline_using_stored_proc
AS LOAD DATA FS '/data.txt'
INTO PROCEDURE proc;
Examples
Note
For simplicity, these examples extract data from the file system. For examples of using CREATE PIPELINE to extract data from other types of data sources, see the examples in CREATE PIPELINE.
Example: Loading Data into Multiple Tables
See the second method in this example.
Example: Denormalizing Data
This example extracts, from the data source course_enrollment.txt, a list of students who are enrolled in courses. Each line contains a course ID and a student ID:
CS-101,1000047
CS-101,1010382
CS-201,1070044
CS-201,1008022
The pipeline inserts the data extracted from the data source into the course_enrollment table, which is defined as follows:
CREATE TABLE course_enrollment(course_id TEXT, first_name TEXT, last_name TEXT);
To find the student’s first name and last name, the stored procedure joins the extracted data with the student table, which is defined as follows, along with sample data.
CREATE TABLE student(id INT PRIMARY KEY, first_name TEXT, last_name TEXT);
INSERT INTO student(id, first_name, last_name) VALUES (1000047,"John","Smith");
INSERT INTO student(id, first_name, last_name) VALUES (1010382,"Mary","Smith");
INSERT INTO student(id, first_name, last_name) VALUES (1070044,"John","Doe");
INSERT INTO student(id, first_name, last_name) VALUES (1008022,"Mary","Doe");
Define the stored procedure:
DELIMITER //
CREATE OR REPLACE PROCEDURE course_enrollment_proc(batch QUERY(course_id TEXT, student_id INT))
AS
BEGIN
  INSERT INTO course_enrollment(course_id, first_name, last_name)
  SELECT b.course_id, s.first_name, s.last_name
  FROM batch b JOIN student s ON b.student_id = s.id;
END //
DELIMITER ;
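The inner JOIN in this procedure drops any batch record whose student_id has no matching row in student. If such enrollments should be kept with NULL names instead, a variant along the lines of the following sketch could be used. With the sample data shown here every student_id matches, so the result would be the same.

```sql
DELIMITER //
CREATE OR REPLACE PROCEDURE course_enrollment_proc(batch QUERY(course_id TEXT, student_id INT))
AS
BEGIN
  /* LEFT JOIN keeps enrollments whose student_id is not in student;
     first_name and last_name are then NULL for those rows. */
  INSERT INTO course_enrollment(course_id, first_name, last_name)
  SELECT b.course_id, s.first_name, s.last_name
  FROM batch b LEFT JOIN student s ON b.student_id = s.id;
END //
DELIMITER ;
```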
Create and start the pipeline:
CREATE PIPELINE course_enrollment_pipeline
AS LOAD DATA FS '/course_enrollment.txt'
INTO PROCEDURE course_enrollment_proc
FIELDS TERMINATED BY ',';

START PIPELINE course_enrollment_pipeline;
Retrieve the data in the course_enrollment table:
SELECT * FROM course_enrollment ORDER BY course_id, last_name, first_name;
+-----------+------------+-----------+
| course_id | first_name | last_name |
+-----------+------------+-----------+
| CS-101    | John       | Smith     |
| CS-101    | Mary       | Smith     |
| CS-201    | John       | Doe       |
| CS-201    | Mary       | Doe       |
+-----------+------------+-----------+
Example: Normalizing Data
This example extracts, from the data source passenger.txt, a list of passengers who are booked on flights. Each line contains a flight ID, first name, last name, and date of birth:
101,John,Smith,1990-02-08
101,Mary,Smith,1991-04-19
101,John,Doe,1986-09-09
101,Mary,Doe,1984-05-25
The pipeline splits the data extracted from the data source into two tables that are defined as follows:
CREATE TABLE passenger(id BIGINT AUTO_INCREMENT PRIMARY KEY, first_name TEXT, last_name TEXT, date_of_birth DATETIME);
CREATE TABLE passenger_list(flight_id INT, passenger_id INT, SHARD(flight_id));
Create the stored procedure:
DELIMITER //
CREATE OR REPLACE PROCEDURE passenger_list_proc(batch QUERY(flight_id INT, first_name TEXT, last_name TEXT, date_of_birth DATETIME))
AS
BEGIN
  /* Insert new passenger records from the batch into the passenger table */
  INSERT INTO passenger(first_name, last_name, date_of_birth)
  SELECT DISTINCT first_name, last_name, date_of_birth
  FROM batch b
  WHERE NOT EXISTS(SELECT * FROM passenger p WHERE p.first_name = b.first_name AND p.last_name = b.last_name AND p.date_of_birth = b.date_of_birth);

  /* Insert passenger ids and their flights from the batch into the passenger_list table. To get the passenger ids, join to the passenger table. */
  INSERT INTO passenger_list(flight_id, passenger_id)
  SELECT flight_id, p.id
  FROM batch b JOIN passenger p
  ON p.first_name = b.first_name AND p.last_name = b.last_name AND p.date_of_birth = b.date_of_birth;
END //
DELIMITER ;
Create and start the pipeline:
CREATE PIPELINE passenger_pipeline
AS LOAD DATA FS '/passenger.txt'
INTO PROCEDURE passenger_list_proc
FIELDS TERMINATED BY ',';

START PIPELINE passenger_pipeline;
Retrieve the data from the passenger and passenger_list tables:
SELECT * FROM passenger ORDER BY id;
+----+------------+-----------+---------------------+
| id | first_name | last_name | date_of_birth       |
+----+------------+-----------+---------------------+
|  1 | Mary       | Doe       | 1984-05-25 00:00:00 |
|  2 | John       | Smith     | 1990-02-08 00:00:00 |
|  3 | Mary       | Smith     | 1991-04-19 00:00:00 |
|  4 | John       | Doe       | 1986-09-09 00:00:00 |
+----+------------+-----------+---------------------+
SELECT * FROM passenger_list ORDER BY flight_id, passenger_id;
+-----------+--------------+
| flight_id | passenger_id |
+-----------+--------------+
|       101 |            1 |
|       101 |            2 |
|       101 |            3 |
|       101 |            4 |
+-----------+--------------+
Example: Loading JSON Data and Calculating an Aggregate
This example extracts a list of tweets from the data source tweets.txt, which contains one JSON object per line:
{"tweet_id":"100", "tweet_user_id":"200", "tweet_text":"Test tweet 1", "retweet_user_id":"502"}
{"tweet_id":"101", "tweet_user_id":"213", "tweet_text":"Test tweet 2", "retweet_user_id":"518"}
{"tweet_id":"102", "tweet_user_id":"239", "tweet_text":"Test tweet 3", "retweet_user_id":"511"}
{"tweet_id":"101", "tweet_user_id":"213", "tweet_text":"Test tweet 2", "retweet_user_id":"518"}
{"tweet_id":"102", "tweet_user_id":"239", "tweet_text":"Test tweet 3", "retweet_user_id":"511"}
{"tweet_id":"103", "tweet_user_id":"265", "tweet_text":"Test tweet 4"}
{"tweet_id":"102", "tweet_user_id":"239", "tweet_text":"Test tweet 3", "retweet_user_id":"511"}
tweet_user_id is the id of the user who created the tweet. retweet_user_id is optional, and is the id of a user who retweeted the tweet.
If more than one user has retweeted a tweet, the file contains duplicate lines with the same tweet_id, tweet_user_id, and tweet_text but with a different retweet_user_id. Tweets 101 and 102 appear on more than one line.
Note
The data in tweets.txt is for demonstration purposes only and is not intended to be real Twitter data.
The pipeline inserts the tweet_id, tweet_user_id, and tweet_text from the data source into the tweets table. Duplicate entries containing the same tweet_id are not inserted.
The retweets_counter table contains the number of tweets that have been retweeted by each user.
The tables are defined as follows:
CREATE TABLE tweets(tweet_id TEXT PRIMARY KEY, tweet_user_id TEXT, tweet_text TEXT);
CREATE TABLE retweets_counter(user_id TEXT PRIMARY KEY, num_retweets INT);
Create the stored procedure:
DELIMITER //
CREATE OR REPLACE PROCEDURE tweets_proc(batch QUERY(tweet JSON))
AS
BEGIN
  /* INSERT IGNORE does not attempt to insert records having a duplicate tweet_id into the tweets table. */
  INSERT IGNORE INTO tweets(tweet_id, tweet_user_id, tweet_text)
  SELECT tweet::tweet_id, tweet::tweet_user_id, tweet::tweet_text
  FROM batch;

  /* For each tweet in the batch, retrieve the id of the user who retweeted it. Then upsert each of these ids into the retweets_counter table, adding 1 to the num_retweets column for each duplicate id encountered. */
  INSERT INTO retweets_counter(user_id, num_retweets)
  SELECT tweet::retweet_user_id, 1
  FROM batch
  WHERE tweet::retweet_user_id IS NOT NULL
  ON DUPLICATE KEY UPDATE num_retweets = num_retweets + 1;
END //
DELIMITER ;
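In the output further down, the stored values keep their surrounding double quotes (for example "100") because the :: operator returns a JSON value. If plain strings are preferred, a variant of the procedure could use the ::$ operator, which extracts a value as a string. This is a sketch, not the procedure that produced the output shown; the WHERE filter assumes ::$ yields NULL when retweet_user_id is absent from a line.

```sql
DELIMITER //
CREATE OR REPLACE PROCEDURE tweets_proc(batch QUERY(tweet JSON))
AS
BEGIN
  /* ::$ extracts each value as a string, so the JSON quotes are not stored. */
  INSERT IGNORE INTO tweets(tweet_id, tweet_user_id, tweet_text)
  SELECT tweet::$tweet_id, tweet::$tweet_user_id, tweet::$tweet_text
  FROM batch;

  /* Same counting logic, keyed on the unquoted user id.
     Assumes ::$ returns NULL for a missing key. */
  INSERT INTO retweets_counter(user_id, num_retweets)
  SELECT tweet::$retweet_user_id, 1
  FROM batch
  WHERE tweet::$retweet_user_id IS NOT NULL
  ON DUPLICATE KEY UPDATE num_retweets = num_retweets + 1;
END //
DELIMITER ;
```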
Create and start the pipeline:
CREATE PIPELINE tweets_pipeline
AS LOAD DATA FS '/tweets.txt'
INTO PROCEDURE tweets_proc(tweet <- %)
FORMAT JSON;

START PIPELINE tweets_pipeline;
Retrieve the data from the tweets and retweets_counter tables:
SELECT * FROM tweets ORDER BY tweet_id;
+----------+---------------+----------------+
| tweet_id | tweet_user_id | tweet_text     |
+----------+---------------+----------------+
| "100"    | "200"         | "Test tweet 1" |
| "101"    | "213"         | "Test tweet 2" |
| "102"    | "239"         | "Test tweet 3" |
| "103"    | "265"         | "Test tweet 4" |
+----------+---------------+----------------+
SELECT * FROM retweets_counter ORDER BY user_id;
+---------+--------------+
| user_id | num_retweets |
+---------+--------------+
| "502"   |            1 |
| "511"   |            3 |
| "518"   |            2 |
+---------+--------------+
Example: Tracking a Slowly Changing Dimension
This example tracks a Type 2 Slowly Changing Dimension. The data source, product_history.txt, contains one line per product record, giving the product code, description, price, and start date:
10,notebook,3.95,2020-01-05
15,pens (10 pack),10.25,2020-01-05
15,pens w/med size point (10 pack),10.25,2020-01-13
10,notebook (200 pages),3.95,2020-01-10
15,pens with med size point (10 pack),10.25,2020-01-14
18,envelopes (50 pack),2.25,2020-01-18
15,pens with medium size point (10 pack),10.25,2020-01-19
The pipeline inserts each extracted row from product_history.txt into the table product_history, which is defined as follows:
CREATE TABLE product_history(code INT, description TEXT, price FLOAT, start_date DATETIME,current_record BOOLEAN);
start_date is the date on which the product record in product_history.txt was created. current_record is 1 if the record is the most recently created record for its product code, and 0 otherwise.
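Because each product keeps one row per version, the table can answer both "what is current" and "what was in effect on a given date". The queries below are a sketch against the product_history table defined above; the date 2020-01-15 is an arbitrary example.

```sql
/* Current version of each product. */
SELECT code, description, price
FROM product_history
WHERE current_record = 1
ORDER BY code;

/* Version of each product in effect on 2020-01-15: the row with the
   latest start_date on or before that date. */
SELECT ph.code, ph.description, ph.price, ph.start_date
FROM product_history ph
WHERE ph.start_date = (
  SELECT MAX(start_date)
  FROM product_history
  WHERE code = ph.code AND start_date <= '2020-01-15'
)
ORDER BY ph.code;
```

With the sample data loaded, the second query returns notebook (200 pages) for code 10 and pens with med size point (10 pack) for code 15; code 18 is absent because its only record starts on 2020-01-18.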
Create the stored procedure:
DELIMITER //
CREATE OR REPLACE PROCEDURE changing_dimension_proc(batch QUERY(code INT, new_description TEXT, new_price FLOAT, start_date DATETIME))
AS
BEGIN
  /* Insert all batch records into product_history. Insert 0 for current_record. */
  INSERT INTO product_history(code, description, price, start_date, current_record)
  SELECT code, new_description, new_price, start_date, 0 FROM batch;

  /* Update all product_history records having a code that is found in the batch. The update sets current_record to 0 on any record previously marked as current. This is done so current_record can be set correctly in the next step. */
  UPDATE product_history ph JOIN batch b ON ph.code = b.code
  SET ph.current_record = 0
  WHERE ph.current_record = 1;

  /* Update product_history records having a code that is in the batch, setting current_record = 1 if the record has the latest start_date of all of the records in product_history with the same code. */
  UPDATE product_history ph JOIN batch b ON ph.code = b.code
  SET ph.current_record = 1
  WHERE ph.start_date = (SELECT MAX(start_date) FROM product_history WHERE code = b.code);
END //
DELIMITER ;
Create and start the pipeline:
CREATE PIPELINE changing_dimension_pipeline
AS LOAD DATA FS '/product_history.txt'
INTO PROCEDURE changing_dimension_proc
FIELDS TERMINATED BY ',';

START PIPELINE changing_dimension_pipeline;
Retrieve the data from the product_history table:
SELECT * FROM product_history ORDER by code, start_date;
+------+---------------------------------------+-------+---------------------+----------------+
| code | description                           | price | start_date          | current_record |
+------+---------------------------------------+-------+---------------------+----------------+
|   10 | notebook                              |  3.95 | 2020-01-05 00:00:00 |              0 |
|   10 | notebook (200 pages)                  |  3.95 | 2020-01-10 00:00:00 |              1 |
|   15 | pens (10 pack)                        | 10.25 | 2020-01-05 00:00:00 |              0 |
|   15 | pens w/med size point (10 pack)       | 10.25 | 2020-01-13 00:00:00 |              0 |
|   15 | pens with med size point (10 pack)    | 10.25 | 2020-01-14 00:00:00 |              0 |
|   15 | pens with medium size point (10 pack) | 10.25 | 2020-01-19 00:00:00 |              1 |
|   18 | envelopes (50 pack)                   |  2.25 | 2020-01-18 00:00:00 |              1 |
+------+---------------------------------------+-------+---------------------+----------------+
Example: Handling Duplicate Keys Using Upserts
Upserts can be specified in the ON DUPLICATE KEY UPDATE clause of CREATE PIPELINE.
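Duplicate key behavior can alternatively be specified inside the stored procedure. A minimal sketch of an in-procedure upsert against the table t(a INT PRIMARY KEY, b INT) used in the next example follows; upsert_proc is a hypothetical name.

```sql
DELIMITER //
CREATE OR REPLACE PROCEDURE upsert_proc(batch QUERY(a INT, b INT))
AS
BEGIN
  /* A record whose key a already exists in t overwrites b
     instead of raising ER_DUP_ENTRY. */
  INSERT INTO t(a, b)
  SELECT a, b FROM batch
  ON DUPLICATE KEY UPDATE b = VALUES(b);
END //
DELIMITER ;
```

If one batch contains several records with the same key, as key 1 does in the next example's data, which value of b remains depends on the order in which the records are applied. Because out-of-order optimization cannot be disabled for pipelines into stored procedures, that order should not be relied on unless the procedure enforces it.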
Example: Handling Duplicate Keys by Inserting Duplicate Records into a Separate Table
This example extracts, from the data source duplicate_keys.txt, a list of records that each contain a key (column 1) and a value (column 2):
1,33
4,28
1,45
1,56
6,88
4,67
The pipeline inserts the extracted rows from the data source into the table t, except for the extracted rows that have duplicate keys. Those rows are inserted into the table t_duplicates instead. Both tables are defined as follows:
CREATE TABLE t(a INT PRIMARY KEY, b INT);
CREATE TABLE t_duplicates(a INT, b INT);
Define the stored procedure:
DELIMITER //
CREATE OR REPLACE PROCEDURE duplicate_keys_proc(batch QUERY(a INT, b INT))
AS
BEGIN
  FOR batch_record IN COLLECT(batch) LOOP
    BEGIN
      INSERT INTO t(a, b) VALUES (batch_record.a, batch_record.b);
    EXCEPTION
      WHEN ER_DUP_ENTRY THEN
        INSERT INTO t_duplicates(a, b) VALUES (batch_record.a, batch_record.b);
    END;
  END LOOP;
END //
DELIMITER ;
The stored procedure handles batch records with duplicate keys by handling the ER_DUP_ENTRY exception and inserting those records into t_duplicates.
Create and start the pipeline:
CREATE PIPELINE duplicate_keys_pipeline
AS LOAD DATA FS '/duplicate_keys.txt'
INTO PROCEDURE duplicate_keys_proc
FIELDS TERMINATED BY ',';

START PIPELINE duplicate_keys_pipeline;
Retrieve the data from t and t_duplicates:
SELECT * FROM t ORDER BY a;
+---+------+
| a | b    |
+---+------+
| 1 |   33 |
| 4 |   28 |
| 6 |   88 |
+---+------+
SELECT * FROM t_duplicates ORDER BY a;
+------+------+
| a    | b    |
+------+------+
|    1 |   56 |
|    1 |   45 |
|    4 |   67 |
+------+------+
Last modified: September 8, 2025