CREATE PIPELINE
Create a new Pipeline to continuously extract, shape, and load data into a table or stored procedure.
Note
When you run a CREATE PIPELINE ... LOAD DATA command, it processes extracted and transformed records as if you ran LOAD DATA alone, using the same options.
CREATE PIPELINE ... LOAD DATA also has additional options that are not available in LOAD DATA. These additional options are explained in the LOAD DATA section at the end of this topic.
Creating a pipeline does not automatically cause data to be loaded.
Syntax
CREATE [OR REPLACE] [AGGREGATOR] PIPELINE [IF NOT EXISTS] <pipeline_name> AS
  LOAD DATA { kafka_configuration | s3_configuration | filesystem_configuration
            | azure_blob_configuration | hdfs_configuration | gcs_configuration
            | link_configuration | mongodb_configuration | mysql_configuration }
    [BATCH_INTERVAL <milliseconds>]
    [MAX_PARTITIONS_PER_BATCH <num_partitions>]
    [MAX_RETRIES_PER_BATCH_PARTITION <max_retries>]
    [MAX_OFFSETS_PER_BATCH_PARTITION <num_offsets>]
    [RESOURCE POOL <resource_pool_name>]
    [(ENABLE|DISABLE) OUT_OF_ORDER OPTIMIZATION]
    [WITH TRANSFORM ('uri', 'executable', 'arguments [...]') ]
  [REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY | PARSER } ERRORS]
  [STOP_ON_ERROR { ON | OFF }]
  { INTO TABLE <table_name> | INTO PROCEDURE <procedure_name> }
  { <json_format_options> | <avro_format_options> | <parquet_format_options> | <csv_format_options> }
  [ (<column_name>, ... ) ]
  [SET <column_name> = <expression>,... | <pipeline_source_file>]
  [WHERE <expression>,...]
  [ON DUPLICATE KEY UPDATE <column_name> = <expression>, [...]]
  [NULL DEFINED BY <string> [OPTIONALLY ENCLOSED]]

<kafka_configuration>:
  KAFKA 'kafka_topic_endpoint'

<s3_configuration>:
  S3 { '<bucket-name>' | '<bucket-name/path>' }
    [CONFIG '<configuration_json>']
    CREDENTIALS '<credentials_json>'

<filesystem_configuration>:
  FS 'path'
    [CONFIG '<configuration_json>']

<azure_blob_configuration>:
  AZURE { '<container-name>' | '<container-name/object-name>' | '<container-name/prefix/object-name>' }
    CREDENTIALS '<credentials_json>'
    [CONFIG '<configuration_json>']

<hdfs_configuration>:
  HDFS '<hdfs://<namenode DNS> | IP address>:<port>/<directory>'
    [CONFIG '<configuration_json>']

<gcs_configuration>:
  GCS { '<bucket-name>' | '<bucket-name/path>' }
    CREDENTIALS '<credentials_json>'
    [CONFIG '<configuration_json>']

<link_configuration>:
  LINK <connection_name> '<path>'

<mongodb_configuration>:
  MONGODB '<db_name.collection_name>'
    CREDENTIALS '<credentials_json>'
    CONFIG '<configuration_json>'
    <avro_format_options>

<mysql_configuration>:
  MYSQL '<db_name.table_name>'
    CREDENTIALS '<credentials_json>'
    CONFIG '<configuration_json>'
    <avro_format_options>

<json_format_options>:
  FORMAT JSON
  ( {<column_name> | @<variable_name>} <- <subvalue_path> [DEFAULT <literal_expr>], ...)
  [KAFKA KEY ( {<column_name> | @<variable_name>} <- <subvalue_path> [DEFAULT <literal_expr>], ...)]

<avro_format_options>:
  FORMAT AVRO SCHEMA REGISTRY {uri}
  ( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)
  [SCHEMA '<avro_schema>']
  [KAFKA KEY ( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)]

<subvalue_path>:
  {% | [%::]ident [::ident ...]}

<parquet_format_options>:
  FORMAT PARQUET <parquet_subvalue_mapping> [TIMEZONE '<time_zone_name>']

<parquet_subvalue_mapping>:
  ( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)

<parquet_subvalue_path>:
  {ident [::ident ...]}

<iceberg_format_options>:
  FORMAT ICEBERG <iceberg_subvalue_mapping>

<iceberg_subvalue_mapping>:
  ({<singlestore_col_name> | @<variable_name>} <- <iceberg_subvalue_path> [, ... ])

<iceberg_subvalue_path>:
  {ident [::ident ...]}

<csv_format_options>:
  [FORMAT CSV]
  [{FIELDS | COLUMNS}
    TERMINATED BY '<string>'
    [[OPTIONALLY] ENCLOSED BY '<char>']
    [ESCAPED BY '<char>']
  ]
  [LINES
    [STARTING BY '<string>']
    [TERMINATED BY '<string>']
  ]
  [IGNORE <number> LINES]
  [ ({<column_name> | @<variable_name>}, ...) ]
Note
This topic uses the term "batch", which is a subset of data that a pipeline extracts from its data source.
Note
The REPLACE capability of pipelines maintains the cursor positions when replacing existing pipelines.
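As a minimal sketch of replacing a pipeline in place, the statements below create a pipeline and later redefine it with a different batch interval. The pipeline name p, the topic endpoint, and the table name are placeholders chosen for illustration, and the interval of 500 is an arbitrary value.

```sql
-- Initial definition (placeholder endpoint and table).
CREATE PIPELINE p AS
LOAD DATA KAFKA 'host.example.com/my-topic'
INTO TABLE table_name;

-- Redefine the same pipeline with a shorter batch interval.
-- Per the note above, the existing cursor positions are kept,
-- so loading resumes from where the old definition left off.
CREATE OR REPLACE PIPELINE p AS
LOAD DATA KAFKA 'host.example.com/my-topic'
BATCH_INTERVAL 500
INTO TABLE table_name;
```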
Remarks
- Pipeline names are always case-sensitive for operations that refer to pipelines.
- If the OR REPLACE clause is provided and a pipeline with pipeline_name already exists, then the CREATE query will alter that pipeline to match the new definition (including its credentials). Its state, including its cursor positions, will be preserved by the CREATE.
- The OR REPLACE and IF NOT EXISTS clauses are mutually exclusive, i.e., they cannot be used in the same CREATE PIPELINE statement.
- The MONGODB and MYSQL sources only support AGGREGATOR pipelines and the AVRO data type.
  Note: SingleStore recommends using the CREATE { TABLE | TABLES AS INFER PIPELINE } statements to generate pipeline templates for the MONGODB and MYSQL sources. Use the automatically generated templates to create the required pipelines. For related information, refer to Replicate Data from MongoDB® and Replicate Data from MySQL.
- CONFIG and CREDENTIALS can be specified in either order (CONFIG followed by CREDENTIALS or CREDENTIALS followed by CONFIG).
- SKIP CONSTRAINT ERRORS and SKIP DUPLICATE KEY ERRORS are unsupported with pipelines into stored procedures.
- IGNORE, SKIP PARSER ERRORS, and SKIP ALL ERRORS are supported with CSV and Kafka JSON pipelines only.
- REPLACE, SKIP CONSTRAINT ERRORS, and SKIP DUPLICATE KEY ERRORS are supported with non-CSV pipelines.
- This command causes implicit commits. Refer to COMMIT for more information.
- Pipelines can be used to ingest and process data with supported data sets. For more information, see Character Encoding.
- You can use the function pipeline_batch_id() in an expression in the SET clause. This function returns the id of the batch used to load the data. For example, given the table definition CREATE TABLE t(b_id INT, column_2 TEXT);, you could create this statement to load the batch id into the column b_id:
  CREATE PIPELINE p AS LOAD DATA ... INTO TABLE t(@b_id,column_2)... SET b_id = pipeline_batch_id();
- Pipeline names that begin with a number or are solely numeric require backticks (` `) around the pipeline name.
  CREATE PIPELINE `3e93587cb1` AS LOAD DATA KAFKA 'host.example.com/my-topic' INTO TABLE <table_name>;
- STOP_ON_ERROR, when enabled, causes a pipeline to stop when an error occurs. If no value is specified, the behavior is controlled by the engine variable pipelines_stop_on_error. The syntax for enabling or disabling is as follows:
  CREATE PIPELINE <pipeline_name> AS LOAD DATA FS '/path/to/file' STOP_ON_ERROR { ON | OFF } INTO TABLE <table_name>;
- Refer to the Permission Matrix for the required permission.
REPLACE, IGNORE or SKIP ERRORS
REPLACE, IGNORE or SKIP can be used when creating a pipeline.
The SKIP ... ERRORS behavior allows you to specify an error scenario that, when encountered, discards an offending row.
Unlike SKIP ... ERRORS, which discards offending rows, IGNORE may change the inserted row's data to ensure that it adheres to the table schema.
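The two behaviors can be contrasted with a small sketch. The table orders, the pipeline names, and the file path are hypothetical, and the clause placement follows the syntax block earlier in this topic (the error-handling clause precedes INTO TABLE). Only one of the two pipelines would be used in practice.

```sql
-- Hypothetical destination table with a primary key.
CREATE TABLE orders (id INT PRIMARY KEY, note TEXT);

-- Variant 1: rows whose key already exists are discarded.
CREATE PIPELINE orders_skip AS
LOAD DATA FS '/path/to/files/*'
SKIP DUPLICATE KEY ERRORS
INTO TABLE orders
FIELDS TERMINATED BY ',';

-- Variant 2: IGNORE may instead adjust a row's data so that
-- it adheres to the table schema.
CREATE PIPELINE orders_ignore AS
LOAD DATA FS '/path/to/files/*'
IGNORE
INTO TABLE orders
FIELDS TERMINATED BY ',';
```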
Creating a Pipeline Using a Connection Link
The CREATE PIPELINE ... LINK statement loads data from the data provider using a connection link. You need the SHOW LINK permission, provided by your administrator, to use a connection link.
The following example creates a pipeline using an HDFS connection:
USE db1;

CREATE PIPELINE mypipeline AS
LOAD DATA LINK HDFS_connection 'hdfs://hadoop-namenode:8020/path/to/files'
INTO TABLE my_table;

START PIPELINE mypipeline;
Kafka Pipeline Syntax
For a tutorial on getting started with Kafka pipelines, see Load Data from Kafka.
The following example statement demonstrates how to create a Kafka pipeline using the minimum required syntax:
Minimum Required Kafka Pipeline Syntax
CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
INTO TABLE table_name;

START PIPELINE pipeline_name;
This statement creates a new pipeline named pipeline_name, uses a Kafka cluster as the data source, points to the location of the my-topic topic at the Kafka cluster's endpoint, and begins to ingest data into table_name when the pipeline is started.
Remarks
Note
By default, records may be inserted out of order if the SingleStore cluster is sufficiently behind in loading records from the source. If you require the records to be inserted in order, pass the DISABLE OUT_OF_ORDER OPTIMIZATION clause to CREATE PIPELINE or ALTER PIPELINE.
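A minimal sketch of a pipeline with the out-of-order optimization turned off is shown below. The pipeline name ordered_pipeline, the endpoint, and the table name are placeholders, and the clause position follows the syntax block earlier in this topic.

```sql
-- Placeholder names; the optimization clause sits between the
-- source configuration and INTO TABLE, as in the syntax block.
CREATE PIPELINE ordered_pipeline AS
LOAD DATA KAFKA 'host.example.com/my-topic'
DISABLE OUT_OF_ORDER OPTIMIZATION
INTO TABLE table_name;
```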
The Kafka download process involves querying one broker for the URLs of other brokers.
Compressed topics are supported without requiring any additional configuration in SingleStore.
A CREATE PIPELINE ... KAFKA statement cannot specify more than one topic.
In a CREATE PIPELINE ... KAFKA statement, you can specify the CONFIG and CREDENTIALS clauses.
Kafka pipelines that use FORMAT JSON or FORMAT AVRO SCHEMA REGISTRY can use KAFKA KEY to extract keys from Kafka records. Keys must be in the format specified in the FORMAT clause.
Kafka Pipeline Syntax Using the CONFIG and CREDENTIALS Clauses
There are multiple configuration options available for Kafka.
Note
Some of the configuration options are not supported in SingleStore. You will receive a "Forbidden Key" error when accessing unsupported configuration options.
The following statement creates a Kafka pipeline to load data from Confluent Cloud, using the CONFIG and CREDENTIALS clauses.
CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA '<Confluent Cloud cluster endpoint>/test'
CONFIG '{"sasl.username": "<CLUSTER_API_KEY>",
         "sasl.mechanism": "PLAIN",
         "security.protocol": "SASL_SSL",
         "ssl.ca.location": "<CA certificate file path>"}'
CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"}'
INTO TABLE table_name;
For more information about SingleStore-specific configurations for a Kafka environment, refer to Kafka Configurations.
Example: Accessing Kafka Keys in a Pipeline
The following CREATE PIPELINE statement creates a new pipeline named pipeline_name, uses a Kafka cluster as the data source, points to the location of the my-topic topic at the Kafka cluster's endpoint, and begins to ingest data into table_name when the pipeline is started. The pipeline expects the payload of each Kafka record to be in JSON format and have a field named json_a, which the pipeline maps to a column named col_a in the destination table. The pipeline also expects the key of each Kafka record to be in JSON format and have a field named json_b, which the pipeline maps to a column named col_b in the destination table.
CREATE TABLE table_name (col_a INT, col_b TEXT);

CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
INTO TABLE table_name
FORMAT JSON (col_a <- json_a)
KAFKA KEY (col_b <- json_b);

START PIPELINE pipeline_name;
Inserting a record into the my-topic topic with key {"json_b": "hello"} and payload {"json_a": 1} results in the following row in the destination table:
col_a | col_b
---|---
1 | hello
Retrieve Kafka Properties
A Kafka message has multiple properties, such as offset, partition, timestamp, and topic name. Use the get_kafka_pipeline_prop function to retrieve these properties for pipelines that ingest data in CSV, JSON, and AVRO formats.
Retrieving Kafka Properties from a JSON Pipeline
The following example creates a new table t and a pipeline that stores four properties of each Kafka message alongside the message data. The properties are named prop_offset, prop_partition, prop_timestamp, and prop_topicname:
- Create a new table t.
  CREATE TABLE t(a INT, b JSON, c INT, offset TEXT, partition TEXT, timestamp TEXT, topicname TEXT);
- Create a pipeline to load data into this new table.
  CREATE OR REPLACE PIPELINE p AS
  LOAD DATA KAFKA 'localhost:9092/jsontopic'
  INTO TABLE t
  FORMAT JSON (a <- json_a, b <- json_b, c <- json_c)
  SET offset = get_kafka_pipeline_prop("prop_offset"),
      partition = get_kafka_pipeline_prop("prop_partition"),
      timestamp = get_kafka_pipeline_prop("prop_timestamp"),
      topicname = get_kafka_pipeline_prop("prop_topicname");
Note
SingleStore supports four predefined properties with names: prop_offset, prop_partition, prop_timestamp, and prop_topicname.
Kafka Headers
Kafka headers are metadata key-value pairs that can be added to Kafka messages.
The get_kafka_pipeline_prop function can be used to consume these headers. Consider a message with the following headers:
message_headers = [
    ("Country", "USA"),
    ("City", "Arizona"),
    ("Language", "English"),
]
To ingest the headers, pass the header key ("Country", "City", and "Language" in this example) in place of "propname" in the argument of the get_kafka_pipeline_prop function.
For example:
- Create a new table t.
  CREATE TABLE t (info TEXT, country TEXT, city TEXT, language TEXT);
- Create a pipeline to load data into this new table.
  CREATE PIPELINE p AS
  LOAD DATA KAFKA 'localhost:9092/topic_name'
  INTO TABLE t
  (info)
  SET country = get_kafka_pipeline_prop("Country"),
      city = get_kafka_pipeline_prop("City"),
      language = get_kafka_pipeline_prop("Language");
Note
- Currently, this function only ingests Kafka header values that are in string format.
- If a transform is used with the Kafka properties or headers request, the transform script must not remove the size from the message for the get_kafka_pipeline_prop("propname") function to work.
Kafka Pipeline Support for Transactions
A Kafka pipeline can be configured to use one of the following isolation levels, which determine whether the pipeline will support Kafka transactions.
- read_committed: The pipeline will read committed messages, which have been written transactionally, from Kafka partitions. The pipeline will also read messages, which have not been written transactionally, from Kafka partitions.
- read_uncommitted: The pipeline will read messages, which have not been written transactionally, from Kafka partitions. The pipeline will also read messages which have been written transactionally, regardless of whether they have been committed.
For more information, see the Reading Transactional Messages section of the KafkaConsumer topic in the Apache documentation.
The isolation level, isolation.level, can be set using the CONFIG option of a CREATE PIPELINE statement.
By default, the isolation level is read_committed.
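As a hedged sketch of setting the isolation level through CONFIG, the statement below assumes the key is spelled isolation.level, as in the standard Kafka consumer configuration. The pipeline name txn_pipeline, the endpoint, and the table name are placeholders.

```sql
-- Read transactional messages regardless of commit status.
-- "isolation.level" is assumed to match the Kafka consumer key.
CREATE PIPELINE txn_pipeline AS
LOAD DATA KAFKA 'host.example.com/my-topic'
CONFIG '{"isolation.level": "read_uncommitted"}'
INTO TABLE table_name;
```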
Specifying Multiple Kafka Brokers
The following example creates a pipeline where three Kafka brokers are specified. They are located on port 9092 on host1.example.com, host2.example.com, and host3.example.com.
CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA 'host1.example.com:9092,host2.example.com:9092,host3.example.com:9092/my-topic'
INTO TABLE table_name;
CREATE PIPELINE ... MAX_OFFSETS_PER_BATCH_PARTITION
Use this command to set the maximum number of data source partition offsets to extract in a single batch transaction. This option overrides the value of the engine variable pipelines_max_offsets_per_batch_partition, and can be set for a single pipeline.
In the following example, the maximum number of data source partition offsets per batch partition is set to 30 for mypipeline:
CREATE PIPELINE mypipeline AS
LOAD DATA KAFKA 'host.example.com/my-topic'
MAX_OFFSETS_PER_BATCH_PARTITION = 30
INTO TABLE <table_name>;
Kafka Pipeline Using JSON Format
Kafka pipelines can use JSON formatted data for ingest.
CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
FORMAT JSON;
To skip errors without skipping the entire batch, use the SKIP PARSER ERRORS clause.
CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
SKIP PARSER ERRORS
FORMAT JSON;
The SKIP ALL ERRORS clause can also be used to skip all erroneous messages when added to the CREATE PIPELINE statement.
CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
SKIP ALL ERRORS
FORMAT JSON;
Refer to SKIP ALL ERRORS for more information.
Important
The use of SKIP PARSER ERRORS and SKIP ALL ERRORS along with WITH TRANSFORM is not supported.
If the SKIP PARSER ERRORS clause is added during create pipeline, the pipelines_stop_on_error configuration will be ignored.
- The warning message will be logged in the pipelines_errors table, adding the Kafka message in the LOAD_DATA_LINE column.
- The entire Kafka message which has the parse error will be logged.
- The pipeline skips the message and moves to the next Kafka message.
If the SKIP PARSER ERRORS clause is not added during create pipeline, the default behavior will remain the same.
- Error messages will be logged in the pipelines_errors table, adding the Kafka message in the LOAD_DATA_LINE column.
- The pipeline will stop or skip the current batch and move to the next batch based on whether the pipelines_stop_on_error variable is set to true or false.
- The engine variable pipelines_parse_errors_threshold will stop or skip the current batch if the number of errors exceeds the threshold value. This depends on whether the pipelines_stop_on_error variable is set to true or false.
Note
See PIPELINES_BATCHES for more details about batches.
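To inspect the messages logged by a pipeline, a query along the following lines may help. It is a sketch that assumes the errors table is exposed as information_schema.PIPELINES_ERRORS and that it has PIPELINE_NAME and ERROR_MESSAGE columns in addition to the LOAD_DATA_LINE column mentioned above; the pipeline name is a placeholder.

```sql
-- Assumed table location and column names; adjust to your deployment.
SELECT PIPELINE_NAME, ERROR_MESSAGE, LOAD_DATA_LINE
FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'pipeline_name';
```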
Kafka Pipeline Using Avro Format
Kafka pipelines can use Avro-formatted data for ingestion with the following minimum required syntax:
CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
FORMAT AVRO;
To skip errors without skipping the entire batch, use the SKIP PARSER ERRORS clause.
CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
SKIP PARSER ERRORS
FORMAT AVRO;
The SKIP ALL ERRORS clause can also be used to skip all failed messages when added to the CREATE PIPELINE statement.
CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
SKIP ALL ERRORS
FORMAT AVRO;
Refer to SKIP ALL ERRORS for more information.
Important
The use of SKIP PARSER ERRORS and SKIP ALL ERRORS along with WITH TRANSFORM is not supported.
If the SKIP PARSER ERRORS clause is added during create pipeline, the pipelines_stop_on_error configuration will be ignored.
- The warning message will be logged in the pipelines_errors table, adding the Kafka message in the LOAD_DATA_LINE column.
- The entire Kafka message which has the parse error will be logged.
- The pipeline skips the message and moves to the next Kafka message.
If the SKIP PARSER ERRORS clause is not added during create pipeline, the default behavior will remain the same.
- Error messages will be logged in the pipelines_errors table, adding the Kafka message in the LOAD_DATA_LINE column.
- The pipeline will stop or skip the current batch and move to the next batch based on whether the pipelines_stop_on_error variable is set to true or false.
- The engine variable pipelines_parse_errors_threshold will stop or skip the current batch if the number of errors exceeds the threshold value. This depends on whether the pipelines_stop_on_error variable is set to true or false.
Note
See PIPELINES_BATCHES for more details about batches.
S3 Pipeline Syntax
The following example statement demonstrates how to create an S3 pipeline using the minimum required syntax:
Minimum Required S3 Pipeline Syntax:
CREATE PIPELINE pipeline_name AS
LOAD DATA S3 'bucket-name'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id",
              "aws_secret_access_key": "your_secret_access_key"}'
INTO TABLE `table_name`;

START PIPELINE pipeline_name;
This statement creates a new pipeline named pipeline_name, uses an S3 bucket named bucket-name as the data source, and will start ingesting the bucket's objects into table_name.
S3 pipelines support globbing (via the * wildcard) in the bucket path and name. For example, you could create this pipeline: CREATE PIPELINE p AS LOAD DATA S3 'my_...
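A sketch of a globbed S3 path follows. The path 'bucket-name/logs/2024-*' is hypothetical and chosen only to show where the * wildcard can appear; the credentials and table name are the same placeholders used in the minimum syntax example.

```sql
-- Hypothetical path: matches every object under logs/ whose
-- name starts with "2024-".
CREATE PIPELINE p AS
LOAD DATA S3 'bucket-name/logs/2024-*'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id",
              "aws_secret_access_key": "your_secret_access_key"}'
INTO TABLE `table_name`;
```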
To load files from a specific folder in your S3 bucket while ignoring the files in the subfolders, use the '**' regular expression pattern as 's3://<bucket_name>/<folder_name>/**'.
CREATE PIPELINE <your_pipeline> AS
LOAD DATA S3 's3://<bucket_name>/<folder_name>/**'
CONFIG '{"region":"<your_region>"}'
CREDENTIALS '{"aws_access_key_id": "<access_key_id>",
              "aws_secret_access_key": "<secret_access_key>"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE <your_table>;
Using two asterisks (**) after the folder instructs the pipeline to load all of the files in the main folder and ignore the files in the subfolders.
No CONFIG clause is required to create an S3 pipeline. When present, this clause specifies options such as the region of the source bucket, request_payer, endpoint_url, and more.
As an alternative to specifying the CONFIG and CREDENTIALS clauses in a CREATE PIPELINE ... S3 statement, you can reference a connection link.
AWS Pipeline Syntax for AWS_WEB_IDENTITY_TOKEN_FILE
AWS_ROLE_ARN and the contents of the AWS_WEB_IDENTITY_TOKEN_FILE may be used as credentials for creating a pipeline.
CREATE PIPELINE pipeline_web_token AS
LOAD DATA S3 'bucket-name'
CONFIG '{"region": "us-east-1"}'
CREDENTIALS '{"role_arn": "arn:aws:iam::...",
"aws_web_identity_token": "eyJhb...."}'
INTO TABLE `table_name`;
AWS Elastic Kubernetes Service (EKS) IAM Roles for Service Accounts (IRSA) Authentication
AWS EKS IRSA may be used when creating an S3 pipeline. To use it, set the enable_eks_irsa global variable to ON as it is disabled by default. Then add "creds_mode": "eks_irsa" to the CONFIG section of the pipeline and it will use the service role assigned to the EKS pods for S3 authentication.
For further details, refer to Enable EKS IRSA.
For a tutorial on getting started with S3 Pipelines, see Load Data from Amazon Web Services (AWS) S3.
S3 Pipeline Using Specified Region
CREATE PIPELINE pipeline_name AS
LOAD DATA S3 'bucket-name'
CONFIG '{"region": "us-west-1"}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id",
              "aws_secret_access_key": "your_secret_access_key"
              [, "aws_session_token": "your_temp_session_token"]
              [, "role_arn":"your_role_arn"]}'
INTO TABLE `table_name`;
S3 Pipeline Using Metadata Garbage Collection (GC)
By default, each pipeline maintains progress metadata for every known file name, for the lifetime of the pipeline. The optional ENABLE OFFSETS METADATA GC clause of CREATE PIPELINE causes the created pipeline to periodically discard metadata for already-loaded files, using a timestamp-based strategy that preserves exactly-once semantics.
The number of batch metadata entries retained before being overwritten by new incoming batches may be adjusted by changing the default value of the global engine variable pipelines_batches_metadata_to_keep.
CREATE PIPELINE <pipeline_name> AS
LOAD DATA S3 "<file_name>"
CONFIG '{"region": "us-west-1"}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id",
              "aws_secret_access_key": "your_secret_access_key"
              [, "aws_session_token": "your_temp_session_token"]
              [, "role_arn":"your_role_arn"]}'
{ENABLE | DISABLE} OFFSETS METADATA GC
INTO TABLE <table_name>;
Note
By default the METADATA GC clause is disabled. The setting can be changed on an existing pipeline by using the ALTER PIPELINE command.
Azure Blob Pipeline Syntax
The following example statement demonstrates how to create an Azure Blob pipeline using the minimum required syntax.
Minimum Required Azure Pipeline Syntax:
CREATE PIPELINE pipeline_name AS
LOAD DATA AZURE 'container-name'
CREDENTIALS '{"account_name": "my_account_name", "account_key": "my_account_key"}'
INTO TABLE `table_name`;

START PIPELINE pipeline_name;
This statement creates a new pipeline named pipeline_name, uses an Azure Blob container named container-name as the data source, and will start ingesting the container's blobs into table_name.
Note that no CONFIG clause is required to create an Azure pipeline unless you need to specify the suffixes for files to load or the disable_gunzip condition.
An Azure pipeline must authenticate with Azure before it can begin reading blobs from a container.
The CREDENTIALS clause should be a JSON object with two fields:
account_name: this is the account name under which your blob container resides.
account_key: usually an 88 character 512-bit string linked to a storage account.
As an alternative to specifying the CONFIG and CREDENTIALS clauses in a CREATE PIPELINE ... AZURE statement, you can reference a connection link.
See the Azure documentation about viewing and managing Azure Access Keys to learn more.
Permissions and Policies
An Azure pipeline can be created to read from either a container or a blob.
For example, if you provide credentials that implicitly grant access to all blobs in a container, you may not need to add or modify any access policies or permissions on the desired resource.
Consider the following scenarios:
- Read all objects in a container: An Azure pipeline configured for a container will automatically read all blobs it has access to. Changes to a container-level policy or permissions may be required.
- Read all blobs with a given prefix: An Azure pipeline configured to read all objects in a container with a given prefix may require changes to a container-level policy with a prefix in order to allow access to the desired blobs.
- Read a specific object in a container: An Azure pipeline configured for a specific blob may require changes to the policies and permissions for both the blob and container.
Filesystem Pipeline Syntax
The following example statement demonstrates how to create a Filesystem pipeline using the minimum required syntax:
CREATE PIPELINE pipeline_name AS
LOAD DATA FS '/path/to/files/*'
INTO TABLE `table_name`
FIELDS TERMINATED BY ',';

START PIPELINE pipeline_name;
This statement creates a new pipeline named pipeline_name, uses a directory as the data source, and will start ingesting data into table_name.
For more information on SingleStore-specific configurations for filesystems, refer to Filesystem Configurations.
For a tutorial on getting started with Filesystem Pipelines, see Load Data from the Filesystem Using a Pipeline.
Filesystem Paths and Permissions
The paths used by the Unix filesystem extractor address files which must be accessible from every node in the cluster.
Azure and S3 Folder Paths
If an Azure Blob container or S3 bucket contains multiple folders beginning with similar names, the pipeline will read the data from all the folders unless a forward slash (/) is included at the end of the path.
Below is an Azure example using three folders that begin with Payment in their names.
- Payment
- PaymentHistory
- PaymentPlan
CREATE OR REPLACE PIPELINE pipeline_csv_insert_payment AS
LOAD DATA AZURE 'uploads/csv/Payment/'
CREDENTIALS '{"account_name": "<account-name>",
              "account_key": "<account-key>"}'
INTO PROCEDURE spPipelineCsvInsertPayment
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\r\n'
IGNORE 1 LINES;
Below is an S3 bucket example using two folders: BooksNautical and BooksHorror.
CREATE PIPELINE books AS
LOAD DATA S3 's3://test-bucket5-ss/BooksNautical/'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXX",
              "aws_secret_access_key": "XXXXXXXXXX"}'
INTO TABLE all_books
FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY '\\'
LINES TERMINATED BY '\r\n' STARTING BY ''
IGNORE 1 LINES;
HDFS Pipeline Syntax
The following example statement demonstrates how to create an HDFS pipeline using the minimum required syntax:
CREATE PIPELINE pipeline_name AS
LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/files'
INTO TABLE `table_name`
FIELDS TERMINATED BY '\t';

START PIPELINE pipeline_name;
This example creates a new pipeline named pipeline_name and references the HDFS path /path/to/files as the data source. Files with names matching the regular expression part-(m|r)-(.*) are imported into table_name. The import depends on the directory also containing a _SUCCESS file.
You also specify attributes in CREATE PIPELINE's CONFIG clause when you use advanced HDFS pipelines mode.
For more information on SingleStore-specific configurations for HDFS, refer to HDFS Configurations.
As an alternative to specifying the CONFIG clause in a CREATE PIPELINE ... HDFS statement, you can reference a connection link.
For a tutorial on getting started with HDFS Pipelines, see Load Data from HDFS Using a Pipeline.
GCS Pipeline Syntax
The following example statement demonstrates how to create a GCS pipeline using the minimum required syntax:
CREATE PIPELINE pipeline_name AS
LOAD DATA GCS 'bucket-name'
CREDENTIALS '{"access_id": "your_google_access_key", "secret_key": "your_google_secret_key"}'
INTO TABLE `table_name`
FIELDS TERMINATED BY ',';

START PIPELINE pipeline_name;
This example creates a new pipeline named pipeline_name, uses a GCS bucket named bucket-name as the data source, and ingests the bucket's objects into table_name.
For more information on SingleStore-specific configurations for GCS, refer to GCS Configurations.
GCS pipelines support globbing (via the * wildcard) in the bucket path and name. For example, you could create this pipeline: CREATE PIPELINE p AS LOAD DATA GCS 'my_...
For a tutorial on getting started with GCS Pipelines, see Load Data from Google Cloud Storage (GCS) Using a Pipeline.
Authentication and Access Control in GCS
A GCS pipeline requires you to authenticate to the desired bucket.
Authentication
A GCS pipeline must authenticate to Google before it can begin reading objects from a bucket.
The CREDENTIALS clause should be a JSON object with two fields:
access_id: usually a 24 or 60 character alphanumeric string, which is linked to the Google account. It is typically all uppercase and starts with GOOG.
secret_key: usually a 40 character Base-64 encoded string that is linked to a specific access_id.
...
CREDENTIALS '{"access_id": "your_google_access_key", "secret_key": "your_google_secret_key"}'
...
See the GCS documentation about managing GCS HMAC keys to learn more.
As an alternative to specifying the CONFIG and CREDENTIALS clauses in a CREATE PIPELINE ... GCS statement, you can reference a connection link.
GCS Pipeline Limitations
A GCS pipeline has a few limitations and also inherits limitations from the Google Cloud Storage service itself.
- Versioning: If your GCS pipeline is configured to read from a version-enabled bucket, you will only receive the latest version of the object from the bucket. Currently, you cannot configure your GCS pipeline to read specific version IDs.
- 5 TB Max Object Size: Google Cloud Storage supports a maximum object size of 5 TB, and this limit also applies to a GCS pipeline.
- Rate Limiting: A large SingleStore cluster configured with a GCS pipeline might encounter throttling or rate limiting imposed by GCS. Currently, a GCS pipeline cannot be configured to bypass these limits by reducing the frequency of read requests. For more information on GCS's rate limiting and how to optimize your data, see this GCS documentation page.
Creating an Iceberg Pipeline
Refer to Iceberg Ingest for information on creating pipelines to directly ingest data from Apache Iceberg tables into SingleStore and for information on using the Iceberg format options.
Creating a Parquet Pipeline
The CREATE PIPELINE ... FORMAT PARQUET statement extracts specified fields from records in Parquet files.
LOAD DATA
Note
Pipelines natively load JSON, Avro, and CSV input data; see LOAD DATA for example syntax.
When you run a CREATE PIPELINE ... LOAD DATA command, it processes extracted and transformed records as if you ran LOAD DATA alone, using the same options.
CREATE PIPELINE ... LOAD DATA also has additional options that are not available in LOAD DATA.
The SCHEMA REGISTRY {"IP" | "Hostname"} option allows LOAD DATA to pull an Avro schema from a schema registry.
AS LOAD DATA: You can load data by specifying the data source as a Kafka cluster and topic, an S3 bucket or object (with optional prefix), or a file.
- AS LOAD DATA KAFKA 'topic_endpoint': To use Kafka as a data source, you must specify the endpoint for the Kafka cluster and the path to a specific topic within the cluster. For example:
  LOAD DATA KAFKA 'host.example.com/my-topic'
- AS LOAD DATA S3 'bucket-name': To use S3 as a data source, you must specify a bucket name or a bucket name and object name. For example:
  LOAD DATA S3 'bucket-name'
- AS LOAD DATA ... FORMAT AVRO SCHEMA REGISTRY {"IP" | "Hostname"}: This option allows the pipeline to pull an Avro schema from Confluent Schema Registry. You can optionally use the CONFIG and CREDENTIALS clauses to specify configuration settings to connect to the schema registry over SSL.
  Note
  The following syntax assumes you are creating a filesystem pipeline that connects to the schema registry over SSL. The ssl. settings shown only apply to SingleStore 7.3.5 and later.
  CREATE PIPELINE ... AS
  LOAD DATA FS "/path/to/files/data.avro"
  INTO TABLE ...
  FORMAT AVRO
  SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
  ...
  CONFIG '{"schema.registry.ssl.certificate.location": "path-to-your-ssl-certificate-on-the-singlestore-db-node",
           "schema.registry.ssl.key.location": "path-to-your-ssl-certificate-key-on-the-singlestore-db-node",
           "schema.registry.ssl.ca.location": "path-to-your-ca-certificate-on-the-singlestore-db-node"}'
  CREDENTIALS '{"schema.registry.ssl.key.password": "your-password-for-the-ssl-certificate-key"}'
  Note
  You can use a subset of the ssl. settings as follows:
  - schema.registry.ssl.key.location, schema.registry.ssl.ca.location, and schema.registry.ssl.key.password
  - schema.registry.ssl.certificate.location, schema.registry.ssl.key.location, and schema.registry.ssl.key.password
  schema.registry.ssl.key.password is only required if your SSL certificate key has a password.
  As an alternative to specifying the SSL configuration settings in the CONFIG and CREDENTIALS clauses, you can install the registry's certificate (ca-cert) on all nodes in your SingleStore cluster.
-
[BATCH_INTERVAL milliseconds]: You can specify a batch interval in milliseconds, which is the amount of time that the pipeline waits before checking the data source for new data, once all of the existing data has been loaded from the data source. If a batch interval is not specified, the default value is 2500. For example:
LOAD DATA KAFKA '127.0.0.1/my-topic'
BATCH_INTERVAL 500
-
[MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]: Specifies the maximum number of batch partitions that can be scheduled into a single batch. Useful for limiting the parallelism of a pipeline on large clusters to prevent the pipeline from throttling system resources.
-
[MAX_RETRIES_PER_BATCH_PARTITION max_retries]: Specifies the maximum number of retries for each batch partition to write batch partition data to the destination table. If a batch transaction fails at any point, for example during extraction from a data source, optional transformation, or loading of the data into the destination table, it will be retried up to the number of times given by max_retries in this clause. If no value is specified or MAX_RETRIES_PER_BATCH_PARTITION is set to 0, then max_retries is set to the value specified by the PIPELINES_MAX_RETRIES_PER_BATCH_PARTITION engine variable.
-
[RESOURCE POOL pool_name]: Specifies the resource pool that is used to load pipeline data.
-
If the resource pool is not specified at the time of pipeline creation, then the pipeline uses the user's default resource pool. If no default resource pool has been set for the user, then the pipeline uses the value of the resource_pool engine variable as its resource pool.
-
To create a new resource pool, the user needs SUPER or CREATE_RG privileges.
-
Use information_schema.USER_PRIVILEGES to see users and their related privileges. See USER_PRIVILEGES for more information.
-
To obtain details about a resource pool, use select * from information_schema.mv_resource_pool_status to review the current running threads on a cluster for all nodes.
-
The background tasks of a pipeline run in its resource pool. Therefore, a resource pool used by a pipeline cannot be dropped.
-
The user who creates the pipeline must have permissions to use the resource pool. The pipeline will continue to use the pool even if the right to use the pool for the pipeline is later revoked for the user who created the pipeline.
-
For more information on resource pools, see Set Resource Limits.
Note
A good starting point for troubleshooting memory issues in pipelines is to run SELECT * FROM information_schema.mv_processlist. See mv_processlist for more information.
-
[AGGREGATOR]: Specifying CREATE AGGREGATOR PIPELINE tells SingleStore to pull data through the aggregator, instead of directly to the leaves. This option can be more efficient for low parallelism pipelines, like single file S3 loads or single partition Kafka topics (an aggregator pipeline is not required for single-partition Kafka topics). An aggregator pipeline is required for pipelines into reference tables and tables with auto increment columns. A stored procedure cannot be used with an aggregator pipeline.
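As an illustration of how these clauses fit together, the sketch below combines several of them in one statement, following the clause order given in the Syntax section. The pipeline names, the events and events_ref tables, and the ingest_pool resource pool are hypothetical and assumed to exist already; the numeric values are arbitrary.

```sql
-- Hypothetical names throughout: events_load, events, ingest_pool.
-- Check for new data every 500 ms, cap each batch at 4 batch partitions,
-- retry a failed batch partition up to 3 times, and run in ingest_pool.
CREATE PIPELINE events_load AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
BATCH_INTERVAL 500
MAX_PARTITIONS_PER_BATCH 4
MAX_RETRIES_PER_BATCH_PARTITION 3
RESOURCE POOL ingest_pool
INTO TABLE events
FIELDS TERMINATED BY ',';

-- Aggregator variant, for example when the target is a reference table
-- (events_ref is a hypothetical reference table).
CREATE AGGREGATOR PIPELINE events_ref_load AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
INTO TABLE events_ref
FIELDS TERMINATED BY ',';
```

Leaving out any of the optional clauses falls back to the defaults described in the list above.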
Example
The following example shows how to use multiple SET clauses with CREATE PIPELINE ... LOAD DATA for ingesting values to columns using variables.
echo "1,NULL,2020-08-06 07:53:09" > /pipeline_test_infile/infile.csv
echo "2,2020-08-06 07:53:09,2020-09-06 07:53:09" >> /pipeline_test_infile/infile.csv
echo "3,2020-08-06 07:53:09,NULL" >> /pipeline_test_infile/infile.csv
Create a pipeline that ingests infile.csv into the following table orders.
CREATE TABLE orders (
ID INT,
del_t1 DATETIME,
del_t2 DATETIME);
CREATE PIPELINE order_load AS
LOAD DATA FS '/pipeline_test_infile/'
INTO TABLE orders
FIELDS TERMINATED BY ','
(ID, @del_t1, @del_t2)
SET del_t1 = IF(@del_t1='NULL',NULL,@del_t1),
del_t2 = IF(@del_t2='NULL',NULL,@del_t2);
Test the pipeline:
TEST PIPELINE order_load;
+------+---------------------+---------------------+
| ID   | del_t1              | del_t2              |
+------+---------------------+---------------------+
|    1 | NULL                | 2020-08-06 07:53:09 |
|    2 | 2020-08-06 07:53:09 | 2020-09-06 07:53:09 |
|    3 | 2020-08-06 07:53:09 | NULL                |
+------+---------------------+---------------------+
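As noted at the top of this topic, creating a pipeline does not by itself load data. A minimal follow-up sketch, assuming the order_load pipeline and orders table defined above, starts the pipeline and then checks that the literal text NULL in the file was stored as a SQL NULL. The column aliases are illustrative only.

```sql
-- Start the pipeline created above so that it begins loading infile.csv.
START PIPELINE order_load;

-- After a batch has been loaded, fields that held the text 'NULL' in the
-- file should report 1 in the matching *_is_null column.
SELECT ID,
       del_t1 IS NULL AS t1_is_null,
       del_t2 IS NULL AS t2_is_null
FROM orders
ORDER BY ID;
```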
Note
If you want to see more examples of loading data with vectors, refer to How to Bulk Load Vectors.
Loading Data into a Pipeline Using JSON
When using JSON formatted data, the mapping clause should be used.
Below are examples for loading data into a pipeline using JSON.
The following examples use a JSON key pair and an array from a local file source.
CREATE TABLE keypairs(`key` VARCHAR(10), `value` VARCHAR(10));
CREATE PIPELINE jtest AS
LOAD DATA FS '<file path>/keypairs.json'
INTO TABLE keypairs
FORMAT JSON
(`key` <- keypairs::`key`,
`value` <- keypairs::`value`);
START PIPELINE jtest;
SELECT * FROM keypairs;
+------+--------+
| key  | value  |
+------+--------+
| 1    | 1      |
+------+--------+
Warning
To use reserved words in a table, backticks are required. This example uses key and value as column names by enclosing them in backticks.
CREATE TABLE teams(basketball VARCHAR(50), baseball VARCHAR(50),
football VARCHAR(50), hockey VARCHAR(50));
CREATE PIPELINE teams_list AS
LOAD DATA FS '<file path>/jtinsert.json'
INTO TABLE teams
FORMAT JSON
(basketball <- teams::basketball,
baseball <- teams::baseball,
football <- teams::football,
hockey <- teams::hockey);
START PIPELINE teams_list;
SELECT * FROM teams;
+------------+-----------+----------+--------+
| basketball | baseball  | football | hockey |
+------------+-----------+----------+--------+
| Lakers     | Dodgers   | Raiders  | Kings  |
+------------+-----------+----------+--------+
The following examples use a JSON key pair and an array from an S3 bucket.
CREATE TABLE keypairs(`key` VARCHAR(10), `value` VARCHAR(10));
CREATE PIPELINE jtest AS
LOAD DATA S3 's3://<bucket_name>/<filename>.json'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXXXXX",
"aws_secret_access_key": "XXXXXXXXXXX"}'
INTO TABLE keypairs
FORMAT JSON
(`key` <- keypairs::`key`,
`value` <- keypairs::`value`);
START PIPELINE jtest;
SELECT * FROM keypairs;
+------+--------+
| key  | value  |
+------+--------+
| 1    | 1      |
+------+--------+
CREATE TABLE teams(basketball varchar(50), baseball varchar (50),football varchar(50), hockey varchar(50));
Below are the contents of the JSON file used in the pipeline example:
{"teams": [{"basketball": "Knicks"},{"baseball": "Yankees"},{"football": "Giants"},{"hockey": "Rangers"}]}
CREATE PIPELINE teams_list AS
LOAD DATA S3 's3://<bucket_name>/<filename>.json'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXXXXX",
"aws_secret_access_key": "XXXXXXXXXXX"}'
INTO TABLE teams
FORMAT JSON
(basketball <- teams::basketball,
baseball <- teams::baseball,
football <- teams::football,
hockey <- teams::hockey);
START PIPELINE teams_list;
SELECT * FROM teams;
+------------+----------+----------+---------+
| basketball | baseball | football | hockey  |
+------------+----------+----------+---------+
| Knicks     | Yankees  | Giants   | Rangers |
+------------+----------+----------+---------+
Loading JSON Data from a CSV File
To use an ENCLOSED BY <char> as a terminating field, a TERMINATED BY clause is needed. An ENCLOSED BY <char> appearing within a field value can be duplicated, and the duplicated pair is read as a single occurrence of the character.
If ENCLOSED BY '"' is used, quotes are treated as follows:
-
"The ""NEW"" employee" → The "NEW" employee
-
The "NEW" employee → The "NEW" employee
-
The ""NEW"" employee → The ""NEW"" employee
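The first rule can be tried with a small pipeline. This is a sketch only: the phrases table, the phrases_load pipeline, and the file path are hypothetical, and the file is assumed to contain the single line shown in the comment.

```sql
-- Hypothetical file /tmp/phrases.csv containing one line:
--   1,"The ""NEW"" employee"
CREATE TABLE phrases(id INT, phrase VARCHAR(50));

CREATE PIPELINE phrases_load AS
LOAD DATA FS '/tmp/phrases.csv'
INTO TABLE phrases
FIELDS TERMINATED BY ','
ENCLOSED BY '"';

START PIPELINE phrases_load;

-- Per the first rule above, phrase should be stored as: The "NEW" employee
SELECT * FROM phrases;
```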
Example 1
An ENCLOSED BY clause is required when a csv file has a JSON column enclosed with double quotes (" ").
CREATE TABLE employees(emp_id INT, data JSON);
csv file contents
emp_id,data
159,"{""name"": ""Damien Karras"", ""age"": 38, ""city"": ""New York""}"
CREATE OR REPLACE PIPELINE emps AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
IGNORE 1 LINES;
START PIPELINE emps;
SELECT * FROM employees;
+--------+-----------------------------------------------------+
| emp_id | data                                                |
+--------+-----------------------------------------------------+
|    159 | {"age":38,"city":"New York","name":"Damien Karras"} |
+--------+-----------------------------------------------------+
Example 2
An ESCAPED BY clause is required when a character is specified as an escape character for a string.
csv file contents
emp_id,data
298,"{\"name\": \"Bill Denbrough\", \"age\": 25, \"city\": \"Bangor\"}"
CREATE OR REPLACE PIPELINE emps2 AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
ESCAPED BY '\\'
IGNORE 1 LINES;
START PIPELINE emps2;
SELECT * FROM employees;
+--------+-----------------------------------------------------+
| emp_id | data                                                |
+--------+-----------------------------------------------------+
|    298 | {"age":25,"city":"Bangor","name":"Bill Denbrough"}  |
|    159 | {"age":38,"city":"New York","name":"Damien Karras"} |
+--------+-----------------------------------------------------+
Example 3
This example will fail as the JSON field in the csv file is not in the correct format.
csv file contents
emp_id,data
410,"{"name": "Annie Wilkes", "age": 45, "city":"Silver Creek"}"
CREATE OR REPLACE PIPELINE emps3 AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
IGNORE 1 LINES;
START PIPELINE emps3;
ERROR 1262 (01000): Leaf Error (127.0.0.1:3307): Row 1 was truncated; it contained more data than there were input columns
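One way to make this row loadable, following the pattern from Example 1, is to double every double quote inside the enclosed JSON field. The sketch below assumes the file has been rewritten as shown in the comment; the pipeline name emps3_fixed is hypothetical.

```sql
-- csv file contents after the fix (inner quotes doubled, as in Example 1):
--   emp_id,data
--   410,"{""name"": ""Annie Wilkes"", ""age"": 45, ""city"": ""Silver Creek""}"
CREATE OR REPLACE PIPELINE emps3_fixed AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
IGNORE 1 LINES;

START PIPELINE emps3_fixed;
```

Escaping the inner quotes with a backslash and adding an ESCAPED BY clause, as in Example 2, would be an alternative repair.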
Example 4
An ENCLOSED BY clause is required when a csv file has a JSON column enclosed with curly brackets ({ }).
csv file contents
emp_id,data
089,{"name": "Wilbur Whateley","age": 62,"city": "Dunwich"}
CREATE OR REPLACE PIPELINE emps4 AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '{'
IGNORE 1 LINES;
START PIPELINE emps4;
SELECT * FROM employees;
+--------+------------------------------------------------------+
| emp_id | data                                                 |
+--------+------------------------------------------------------+
|    298 | {"age":25,"city":"Bangor","name":"Bill Denbrough"}   |
|    159 | {"age":38,"city":"New York","name":"Damien Karras"}  |
|     89 | {"age":62,"city":"Dunwich","name":"Wilbur Whateley"} |
+--------+------------------------------------------------------+
WITH TRANSFORM
Creates a pipeline that uses a transform.
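A minimal sketch of the clause, following the WITH TRANSFORM ('uri', 'executable', 'arguments [...]') form from the Syntax section. The URI, the executable name, the pipeline name, and the events table are placeholders invented for illustration, not real resources.

```sql
-- Placeholders: the URI, my-transform.py, transformed_load, and events
-- are hypothetical. The third argument is left empty (no arguments).
CREATE PIPELINE transformed_load AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
WITH TRANSFORM ('http://www.example.com/my-transform.tar.gz', 'my-transform.py', '')
INTO TABLE events
FIELDS TERMINATED BY ',';
```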
INTO PROCEDURE
Creates a pipeline that uses a stored procedure to shape the data that is extracted from the pipeline's data source.
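A minimal sketch, reusing the orders table and input directory from the earlier example. The procedure and pipeline names are hypothetical, and the procedure signature (a single QUERY-typed parameter whose columns match the incoming fields) is an assumption about how pipeline procedures are declared; check it against the stored procedure documentation for your version.

```sql
DELIMITER //
-- Hypothetical procedure: receives each batch as a query-typed parameter
-- and inserts it into orders, mapping the text 'NULL' to SQL NULL.
CREATE OR REPLACE PROCEDURE orders_proc(
  batch QUERY(ID INT, t1 VARCHAR(30), t2 VARCHAR(30)))
AS
BEGIN
  INSERT INTO orders(ID, del_t1, del_t2)
  SELECT ID, IF(t1 = 'NULL', NULL, t1), IF(t2 = 'NULL', NULL, t2)
  FROM batch;
END //
DELIMITER ;

-- The pipeline hands each extracted batch to the procedure instead of a table.
CREATE PIPELINE orders_proc_load AS
LOAD DATA FS '/pipeline_test_infile/'
INTO PROCEDURE orders_proc
FIELDS TERMINATED BY ',';
```

Because a stored procedure cannot be used with an aggregator pipeline, this sketch uses a regular pipeline.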
Last modified: December 13, 2024