CREATE PIPELINE

Create a new Pipeline to continuously extract, shape, and load data into a table or stored procedure.

Note

When you run a CREATE PIPELINE ... LOAD DATA command, it processes extracted and transformed records as if you ran LOAD DATA alone, using the same options. These options are explained in LOAD DATA.

CREATE PIPELINE ... LOAD DATA also has additional options that are not available in LOAD DATA. These additional options are explained in the LOAD DATA section at the end of this topic.

Creating a pipeline does not automatically cause data to be loaded. Once a pipeline is created, it must be running before it will load data. See START PIPELINE.

Syntax

CREATE [OR REPLACE] [AGGREGATOR] PIPELINE [IF NOT EXISTS] <pipeline_name> AS
LOAD DATA { kafka_configuration | s3_configuration | filesystem_configuration
| azure_blob_configuration | hdfs_configuration | gcs_configuration
| link_configuration | mongodb_configuration | mysql_configuration }
[BATCH_INTERVAL <milliseconds>]
[MAX_PARTITIONS_PER_BATCH <num_partitions>]
[MAX_RETRIES_PER_BATCH_PARTITION <max_retries>]
[MAX_OFFSETS_PER_BATCH_PARTITION <num_offsets>]
[RESOURCE POOL <resource_pool_name>]
[(ENABLE|DISABLE) OUT_OF_ORDER OPTIMIZATION]
[WITH TRANSFORM ('uri', 'executable', 'arguments [...]') ]
[REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY | PARSER } ERRORS]
[STOP_ON_ERROR { ON | OFF }]
{ INTO TABLE <table_name> | INTO PROCEDURE <procedure_name> }
{ <json_format_options> | <avro_format_options> | <parquet_format_options> | <csv_format_options> }
[ (<column_name>, ... ) ]
[SET <column_name> = <expression>,... | <pipeline_source_file>]
[WHERE <expression>,...]
[ON DUPLICATE KEY UPDATE <column_name> = <expression>, [...]]
[NULL DEFINED BY <string> [OPTIONALLY ENCLOSED]]
<kafka_configuration>:
KAFKA 'kafka_topic_endpoint'
<s3_configuration>:
S3 { '<bucket-name>' | '<bucket-name/path>' }
[CONFIG '<configuration_json>']
CREDENTIALS '<credentials_json>'
<filesystem_configuration>:
FS 'path'
[CONFIG '<configuration_json>']
<azure_blob_configuration>:
AZURE { '<container-name>' | '<container-name/object-name>' | '<container-name/prefix/object-name>' }
CREDENTIALS '<credentials_json>'
[CONFIG '<configuration_json>']
<hdfs_configuration>:
HDFS '<hdfs://<namenode DNS> | IP address>:<port>/<directory>'
[CONFIG '<configuration_json>']
<gcs_configuration>:
GCS { '<bucket-name>' | '<bucket-name/path>' }
CREDENTIALS '<credentials_json>'
[CONFIG '<configuration_json>']
<link_configuration>:
LINK <connection_name> '<path>'
<mongodb_configuration>:
MONGODB '<db_name.collection_name>'
CREDENTIALS '<credentials_json>'
CONFIG '<configuration_json>'
<avro_format_options>
<mysql_configuration>:
MYSQL '<db_name.table_name>'
CREDENTIALS '<credentials_json>'
CONFIG '<configuration_json>'
<avro_format_options>
<json_format_options>:
FORMAT JSON
( {<column_name> | @<variable_name>} <- <subvalue_path> [DEFAULT <literal_expr>], ...)
[KAFKA KEY ( {<column_name> | @<variable_name>} <- <subvalue_path> [DEFAULT <literal_expr>], ...)]
<avro_format_options>:
FORMAT AVRO SCHEMA REGISTRY {uri}
( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)
[SCHEMA '<avro_schema>']
[KAFKA KEY ( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)]
<subvalue_path>:
{% | [%::]ident [::ident ...]}
<parquet_format_options>:
FORMAT PARQUET <parquet_subvalue_mapping> [TIMEZONE '<time_zone_name>']
<parquet_subvalue_mapping>:
( {<column_name> | @<variable_name>} <- <subvalue_path>,, ...)
<parquet_subvalue_path>:
{ident [::ident ...]}
<iceberg_format_options>:
FORMAT ICEBERG <iceberg_subvalue_mapping>
<iceberg_subvalue_mapping>:(
{<singlestore_col_name> | @<variable_name>}<-<iceberg_subvalue_path> [, ... ])
<iceberg_subvalue_path>: {ident [::ident ...]}
<csv_format_options>:
[FORMAT CSV]
[{FIELDS | COLUMNS}
TERMINATED BY '<string>'
[[OPTIONALLY] ENCLOSED BY '<char>']
[ESCAPED BY '<char>']
]
[LINES
[STARTING BY '<string>']
[TERMINATED BY '<string>']
]
[IGNORE <number> LINES]
[ ({<column_name> | @<variable_name>}, ...) ]

Note

This topic uses the term "batch", which is a subset of data that a pipeline extracts from its data source. For more information on batches, see The Lifecycle of a Pipeline.

Note

The REPLACE capability of pipelines maintains the cursor positions when replacing existing pipelines.

Remarks

  • SingleStore Helios does not support Filesystem Pipelines.

  • Pipeline names are always case-sensitive for operations that refer to pipelines.

  • If the OR REPLACE clause is provided and a pipeline with pipeline_name already exists, then the CREATE query will alter that pipeline to match the new definition (including its credentials). Its state, including its cursor positions, will be preserved by the CREATE.

  • The OR REPLACE and IF NOT EXISTS clauses are mutually exclusive, i.e., they cannot be used in the same CREATE PIPELINE statement.

  • The MONGODB and MYSQL source only support the AGGREGATOR pipelines and AVRO data type.

    Note: SingleStore recommends using the CREATE { TABLE | TABLES AS INFER PIPELINE } statements to generate pipeline templates for the MONGODB and MYSQL sources. Use the automatically generated templates to create the required pipelines. For related information, refer to Replicate MongoDB® Collections using SQL and Replicate Data from MySQL.

  • CONFIG and CREDENTIALS can be specified in either order (CONFIG followed by CREDENTIALS or CREDENTIALS followed by CONFIG).

  • SKIP CONSTRAINT ERRORS and SKIP DUPLICATE KEY ERRORS are unsupported with pipelines into stored procedures.

  • IGNORE, SKIP PARSER ERRORS, and SKIP ALL ERRORS are supported with CSV and Kafka JSON pipelines only.

  • REPLACE, SKIP CONSTRAINT ERRORS, and SKIP DUPLICATE KEY ERRORS are supported with non-CSV pipelines.

  • This command causes implicit commits. Refer to COMMIT for more information.

  • Pipelines can be used to ingest and process data with supported data sets. For more information, see Character Encoding.

  • You can use the function pipeline_batch_id() in an expression in the SET clause. This function returns the id of the batch used to load the data. For example, given the table definition CREATE TABLE t(b_id INT, column_2 TEXT);, you could create this statement to load the batch id into the column b_id:

    CREATE PIPELINE p AS LOAD DATA ... INTO TABLE t(@b_id,column_2)
    ... SET b_id = pipeline_batch_id();
  • CREATE PIPELINE with a 3rd party file storage system (S3, HDFS, Azure Blob, GSC, or Kafka) requires CREATE PIPELINE and OUTBOUND privileges.

    Without the required privileges, an error will be generated:

    CREATE PIPELINE:
    CREATE PIPELINE pipeline1
    AS LOAD DATA S3 'my-bucket-name'
    CONFIG '{"region": "us-east-1"}'
    CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key", "aws_session_token": "your_session_token"}'
    INTO TABLE 't1'
    FIELDS TERMINATED BY ',';
    ERROR 1227 (42000): Access denied; you need (at least one of) the OUTBOUND privilege(s) for this operation

    Here is how to set the privileges for a user to be allowed to execute this command:

    CREATE PIPELINE:
    GRANT CREATE PIPELINE, OUTBOUND ON *.* to user1;

    The OUTBOUND privilege will be displayed in the SHOW GRANTS command:

    SHOW GRANTS FOR user1;
    +--------------------------------------------------------+
    | Grants for user1@%                                     |
    +--------------------------------------------------------+
    | GRANT CREATE PIPELINE, OUTBOUND ON *.* TO 'user1'@'%   |
    +--------------------------------------------------------+
  • Pipeline names that begin with a number or are solely numeric; require backticks (` `) around the pipeline name.

    CREATE PIPELINE `3e93587cb1` AS LOAD DATA KAFKA 'host.example.com/my-topic' INTO TABLE <table_name>;
  • STOP_ON_ERROR when enabled it will cause a pipeline to stop when an error occurs. If no value is specified, the behavior is controlled by the engine variable pipelines_stop_on_error. The syntax for enabling or disabling is as follows:

    CREATE PIPELINE <pipeline_name> AS LOAD DATA FS '</path/to/file' STOP_ON_ERROR ON/OFF INTO TABLE <table_name>;
  • Refer to the Permission Matrix for the required permission.

REPLACE, IGNORE or SKIP ERRORS

REPLACE, IGNORE or SKIP can be used when creating a pipeline. See Additional CREATE PIPELINE Examples for more information.

The SKIP ... ERRORS behavior allows you to specify an error scenario that, when encountered, discards an offending row. See LOAD DATA for more information.

Unlike SKIP ... ERRORS which discards offending rows, IGNORE may change the inserted row’s data to ensure that it adheres to the table schema. See LOAD DATA

The CREATE PIPELINE .. LINK statement loads data from the data provider using a connection link. To use this command, you only need to know the connection link name, not the connection details and configuration. However, you need the SHOW LINK permission, provided by your administrator, to use a connection link. This command supports connections to S3, Azure, GCS, HDFS, and Kafka only.

The following example creates a pipeline using an HDFS connection:

USE db1;
CREATE PIPELINE mypipeline AS LOAD DATA
LINK HDFS connection 'hdfs://hadoop-namenode:8020/path/to/files'
INTO TABLE my_table;
START PIPELINE mypipeline;

Kafka Pipeline Syntax

For a tutorial on getting started with Kafka pipelines, see Load Data from Kafka Using a Pipeline.

The following example statement demonstrates how to create a Kafka pipeline using the minimum required syntax:

Minimum Required Kafka Pipeline Syntax

CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA 'host.example.com/my-topic'
INTO TABLE table_name;
START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses a Kafka workspace as the data source, points to the location of the my-topic topic at the Kafka workspace’s endpoint, and begins to ingest data into table_name when the pipeline is started.

Remarks

Note

By default, records may be inserted out of order if the SingleStore workspace is sufficiently behind in loading records from the source. If you require the records to be inserted in order (e.g. in upsert scenarios), pass the DISABLE OUT_OF_ORDER OPTIMIZATION clause to CREATE PIPELINE or ALTER PIPELINE. When this optimization is disabled, records from the same source file or source partition will be inserted in the order in which they appear in that partition. However, records from different source files or source partitions may still be inserted in an arbitrary order with respect to each other.

The Kafka download process involves querying one broker for the URLs of other brokers. It can be difficult to configure Kafka brokers to advertise proxy URLs in that step of the process.

Compressed topics are supported without requiring any additional configuration in SingleStore.

A CREATE PIPELINE ... KAFKA statement cannot specify more than one topic.

In a CREATE PIPELINE ... KAFKA statement, you can specify the CONFIG and CREDENTIALS clauses. As an alternative, you can reference a connection link. For more information, see Configuring and Using Connection Links.

Kafka pipelines that use FORMAT JSON or FORMAT AVRO SCHEMA REGISTRY can use KAFKA KEY to extract keys from Kafka records. Keys must be in the format specified in the FORMAT clause.

Kafka Pipeline Syntax Using the CONFIG and CREDENTIALS Clauses

There are multiple configuration options available for Kafka. Consult the CONFIGURATION.md file in the librdkafka project in GitHub to see the full list.

Note

Some of the configuration options are not supported in SingleStore. The client will receive a "Forbidden Key" error when accessing unsupported configuration options.

The following statement creates a Kafka pipeline to load data from Confluent Cloud, using the CONFIG and CREDENTIALS clauses.

CREATE PIPELINE pipeline_name AS LOAD DATA KAFKA '<Confluent Cloud workspace endpoint>/test'
CONFIG '{"sasl.username": "<CLUSTER_API_KEY>",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL",
"ssl.ca.location":"/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",}
CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"}'
INTO TABLE table_name;

For more information about SingleStore Helios-specific configurations for a Kafka environment, refer to Kafka Configurations.

Example: Accessing Kafka Keys in a Pipeline

The following CREATE PIPELINE statement creates a new pipeline named pipeline_name, uses a Kafka workspace as the data source, points to the location of the my-topic topic at the Kafka workspace’s endpoint, and begins to ingest data into table_name when the pipeline is started. The pipeline expects the payload of each Kafka record to be in JSON format and have a field named json_a, which the pipeline maps to a column named col_a in the destination table. In addition, the pipeline expects the key of each Kafka record to be in JSON format and have a field named json_b, which the pipeline maps to a column named col_b in the destination table.

CREATE TABLE table_name (col_a INT, col_b TEXT);
CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
INTO TABLE 'table_name'
FORMAT JSON (col_a <- json_a)
KAFKA KEY (col_b <- json_b);
START PIPELINE pipeline_name;

Inserting a record into the my-topic topic with key {"json_b": "hello"} and payload {"json_a": 1} results in the following row in the destination table:

col_a

col_b

1

hello

Retrieve Kafka Properties

A Kafka message has multiple properties, such as, offset, partition, timestamp, and topic_name. Use the get_kafka_pipeline_prop("propname") function to retrieve these properties for pipelines that ingest data in CSV, JSON, and AVRO formats.

Retrieving Kafka Properties from a JSON Pipeline

The following CREATE TABLE statement creates a new table t, and adds four properties to each Kafka message. If the property names are prop_offset, prop_partition, prop_timestamp, and prop_topicname, then:

  1. Create a new table t.

    CREATE TABLE t(a INT, b JSON, c INT, offset Text, partition Text, timestamp Text, topicname Text);
  2. Create a pipeline to load data into this new table.

    CREATE OR REPLACE PIPELINE p AS
    LOAD DATA kafka 'localhost:9092/jsontopic'
    INTO TABLE t FORMAT JSON (a <- json_a, b <- json_b, c <- json_c)
    SET offset = get_kafka_pipeline_prop("prop_offset"),
    partition = get_kafka_pipeline_prop("prop_partition"),
    timestamp = get_kafka_pipeline_prop("prop_timestamp"),
    topicname = get_kafka_pipeline_prop("prop_topicname");

Note

SingleStore supports four predefined properties with names: prop_offset, prop_partition, prop_timestamp, and prop_topicname.

Kafka Headers

Kafka headers are metadata key-value pairs that can be added to Kafka messages. They provide additional context or information about the message without affecting the message payload. Each header consists of a key and a value. 

The get_kafka_pipeline_prop("propname") function can be used to consume these headers. For example, if a Kafka message has the headers:

message_headers = [	
(“Country”,”USA”),	
(“City”, “Arizona”),	
(“Language”, “English”),
]

Enter the header_key (“Country”, “City”, and “Language” in this example) instead of the “propname” in the argument of the get_kafka_pipeline_prop("propname") function to ingest the headers. 

For example:

  1. Create a new table t.

    CREATE TABLE t (info text, country text, city text, language text);
  2. Create a pipeline to load data into this new table.

    CREATE PIPELINE p AS
    LOAD DATA KAFKA 'localhost:9092/topic_name'
    (info)
    SET country = get_kafka_pipeline_prop("Country"),
    city = get_kafka_pipeline_prop("City"),
    language = get_kafka_pipeline_prop("Language");

Note

  • Currently, this function only ingests Kafka header values that are in string format.

  • If a transform is used with the Kafka properties or headers request, the transform script must not remove the size from the message for the get_kafka_pipeline_prop("propname") function to work.

Kafka Pipeline Support for Transactions

A Kafka pipeline can be configured to use one the following isolation levels, which determines whether the pipeline will support Kafka transactions.

  • read_committed: The pipeline will read committed messages, which have been written transactionally, from Kafka partitions. The pipeline will also read messages, which have not been written transactionally, from Kafka partitions.

  • read_uncommitted: The pipeline will read messages, which have not been written transactionally, from Kafka partitions. The pipeline will also read messages which have been written transactionally, regardless if they have been committed.

For more information, see the Reading Transactional Messages section of the KafkaConsumer topic in the Apache documentation.

The isolation level, isolation.level , can be set using the CONFIG option of a CREATE PIPELINE statement. For example:

CREATE PIPELINE ... KAFKA ... CONFIG '{"isolation.level': "read_uncommitted"}' ...

By default, the isolation level is read_committed.

Specifying Multiple Kafka Brokers

The following example creates a pipeline where three Kafka brokers are specified. They are located on port 9092 on host1.example.com, host2.example.com, and host3.example.com.

CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA 'host1.example.com:9092,host2.example.com:9092,host3.example.com:9092/my-topic'
INTO TABLE 'table_name';

CREATE PIPELINE ... MAX_OFFSETS_PER_BATCH_PARTITION

Use this command to set the maximum number of data source partition offsets to extract in a single batch transaction. This command overrides the global variable pipelines_max_offsets_per_batch_partition, and can be set for a single pipeline. It applies only to Kafka pipelines.

In the following example, the maximum number of data source partition offsets, per batch partition, and for mypipeline, is set to 30:

CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA 'host.example.com/my-topic'
MAX_OFFSETS_PER_BATCH_PARTITION = 30
INTO TABLE <table_name>;

Kafka Pipeline Using JSON Format

Kafka pipelines can use JSON formatted data for ingest. The minimum required syntax is below:

CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
FORMAT JSON;

To skip errors without skipping the entire batch, use the SKIP PARSER ERRORS clause.

CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
SKIP PARSER ERRORS 
FORMAT JSON;

The SKIP ALL ERRORS clause can also be used to skip all erroneous messages. It is added in the CREATE PIPELINE statement. For example:

CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
SKIP ALL ERRORS 
FORMAT JSON;

Refer to SKIP ALL ERRORS for more information.

Important

The use of SKIP PARSER ERRORS and SKIP ALL ERRORS along with WITH TRANSFORM is not supported. Using both clauses will cause an error when creating or altering a pipeline.

If the SKIP PARSER ERRORS clause is added during create pipeline, the pipelines_stop_on_error configuration will be ignored.

  • The warning message will be logged in the pipelines_errors table, adding the Kafka message in the LOAD_DATA_LINE column.

  • The entire Kafka message which has the parse error will be logged.

  • Skip the message and move to the next Kafka message.

If  the SKIP PARSER ERRORS clause is not added during create pipeline, the default behavior will remain the same.

  • Error messages will be logged in the pipelines_errors table, adding the Kafka message in the LOAD_DATA_LINE column.

  • The pipeline will stop or skip the current batch and move to the next batch based on whether the pipelines_stop_on_error variable is set to true or false.

  • The engine variable pipelines_parse_errors_threshold will stop or skip the current batch if the number of errors exceeds the threshold value. This depends on whether the pipelines_stop_on_error variable is set to true or false.

    Note

    See PIPELINES_BATCHES for more details about batches.

Kafka Pipeline Using Avro Format

Kafka pipelines can use Avro-formatted data for ingestion with the following minimum required syntax:

CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
FORMAT AVRO;

To skip errors without skipping the entire batch, use the SKIP PARSER ERRORS clause.

CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
SKIP PARSER ERRORS 
FORMAT AVRO;

The SKIP ALL ERRORS clause can also be used to skip all failed messages when added to the CREATE PIPELINE statement. For example:

CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
SKIP ALL ERRORS 
FORMAT AVRO;

Refer to SKIP ALL ERRORS for more information.

Important

The use of SKIP PARSER ERRORS and SKIP ALL ERRORS along with WITH TRANSFORM is not supported. Using both clauses will cause an error when creating or altering a pipeline.

If the SKIP PARSER ERRORS clause is added during create pipeline, the pipelines_stop_on_error configuration will be ignored.

  • The warning message will be logged in the pipelines_errors table, adding the Kafka message in the LOAD_DATA_LINE column.

  • The entire Kafka message which has the parse error will be logged.

  • Skip the message and move to the next Kafka message.

If  the SKIP PARSER ERRORS clause is not added during create pipeline, the default behavior will remain the same.

  • Error messages will be logged in the pipelines_errors table, adding the Kafka message in the LOAD_DATA_LINE column.

  • The pipeline will stop or skip the current batch and move to the next batch based on whether the pipelines_stop_on_error variable is set to true or false.

  • The engine variable pipelines_parse_errors_threshold will stop or skip the current batch if the number of errors exceeds the threshold value. This depends on whether the pipelines_stop_on_error variable is set to true or false.

    Note

    See PIPELINES_BATCHES for more details about batches.

S3 Pipeline Syntax

The following example statement demonstrates how to create an S3 pipeline using the minimum required syntax:

Minimum Required S3 Pipeline Syntax:

CREATE PIPELINE <pipeline_name> AS
LOAD DATA S3 'bucket-name'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id",
"aws_secret_access_key": "your_secret_access_key"}'
INTO TABLE `table_name`;
START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses an S3 bucket named bucket-name as the data source, and will start ingesting the bucket’s objects into table_name. Credentials for your S3 bucket can either be in the form of an AWS access key or an Amazon Resource Name (ARN).

S3 pipelines support globbing (via the * wildcard) in the bucket path and name. For example, you could create this pipeline: CREATE PIPELINE p AS LOAD DATA S3 'my_bucket_path/file1*' ....

To load files from a specific folder in your S3 bucket while ignoring the files in the subfolders, use the '**' regular expression pattern as 's3://<bucket_name>/<folder_name>/**'. For example:

CREATE PIPELINE <your_pipeline> AS
LOAD DATA S3 's3://<bucket_name>/<folder_name>/**'
CONFIG '{"region":"<your_region>"}'
CREDENTIALS '{"aws_access_key_id": "<access_key_id>",  
              "aws_secret_access_key": "<secret_access_key>"}'
SKIP DUPLICATE KEY ERRORS
INTO TABLE <your_table>;

Using two asterisks (**) after the folder instructs the pipeline to load all of the files in the main folder and ignore the files in the subfolders. However, the files in the subfolders will get scanned when listing the contents of the bucket.

No CONFIG clause is required to create an S3 pipeline. However, this clause can be used to specify things, such as, the request_payer, endpoint_urlx, and more. For more information about SingleStore Helios-specific configurations for S3, refer to S3 Configurations.

As an alternative to specifying the CONFIG and CREDENTIALS clauses in a CREATE PIPELINE ... S3 statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.

AWS Pipeline Syntax for AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE

AWS_ROLE_ARN and the contents of the AWS_WEB_IDENTITY_TOKEN_FILE may be used as credentials for creating a pipeline. A syntax example is below:

CREATE PIPELINE pipeline_web_token AS
LOAD DATA S3 'bucket-name'
CONFIG '{"region": "us-east-1"}'
CREDENTIALS '{"role_arn": "arn:aws:iam::...", 
              "aws_web_identity_token": "eyJhb...."}'
INTO TABLE `table_name`;

AWS Elastic Kubernetes Service  (EKS)  IAM Roles for Service Accounts (IRSA) Authentication

AWS EKS IRSA may be used when creating an S3 pipeline. To use AWS EKS IRSA credentials, contact SingleStore support. To enable this authentication add "creds_mode": "eks_irsa" to the CONFIG section of the pipeline and it will use the service role assigned to the EKS pods for S3 Authentication.

For a tutorial on getting started with S3 Pipelines, see Load Data from Amazon Web Services (AWS) S3.

S3 Pipeline Using Specified Region

CREATE PIPELINE pipeline_name AS
LOAD DATA S3 'bucket-name'
CONFIG '{"region": "us-west-1"}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id",
"aws_secret_access_key": "your_secret_access_key"
[, "aws_session_token": "your_temp_session_token"][, "role_arn":"your_role_arn"]}'
INTO TABLE `table_name`;

S3 Pipeline Using Metadata Garbage Collection (GC)

By default, each pipeline maintains progress metadata for every known file name, for the lifetime of the pipeline. The optional ENABLE OFFSETS METADATA GC clause of CREATE PIPELINE causes the created pipeline to periodically discard metadata for already-loaded files, using a timestamp-based strategy that preserves exactly-once semantics. This option is only supported for S3 pipelines.

The number of batch metadata entries retained before being overwritten by new incoming batches may be adjusted by changing the default value of the global engine variable. pipelines_batches_metadata_to_keep. See Sync Variables Lists for more information.

CREATE PIPELINE <pipeline_name> AS LOAD DATA S3 "<file_name>"
CONFIG '{"region": "us-west-1"}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id",
"aws_secret_access_key": "your_secret_access_key"
[, "aws_session_token": "your_temp_session_token"][, "role_arn":"your_role_arn"]}'
ENABLE/DISABLE OFFSETS METADATA GC
INTO TABLE <table_name>;

Note

By default the METADATA GC clause is disabled. Once it has been enabled it may be disabled by using the ALTER PIPELINE command.

Azure Blob Pipeline Syntax

The following example statement demonstrate how to create an Azure Blob pipeline using the minimum required syntax. Note that Azure Blob Pipelines are only available in MemSQL 5.8.5 and above.

Minimum Required Azure Pipeline Syntax:

CREATE PIPELINE pipeline_name AS
LOAD DATA AZURE 'container-name'
CREDENTIALS '{"account_name": "my_account_name", "account_key": "my_account_key"}'
INTO TABLE `table_name`;
START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses an Azure Blob container named container-name as the data source, and will start ingesting the bucket’s objects into table_name. For a tutorial on getting started with Azure Pipelines, see Load Data from Azure Blob Storage Using a Pipeline.

Note that no CONFIG clause is required to create an Azure pipeline unless you need to specify the suffixes for files to load or the disable_gunzip condition. For more information about SingleStore Helios-specific configurations for Azure Blobs, refer to Azure Blob Configurations.

An Azure pipeline must authenticate with Azure before it can begin reading blobs from a container. The pipeline requires you to provide an account name and account access key.

The CREDENTIALS clause should be a JSON object with two fields:

account_name: this is the account name under which your blob container resides. This is usually a human-readable name, given by the person who created the account.

account_key: usually an 88 character 512-bit string linked to a storage account.

As an alternative to specifying the CONFIG and CREDENTIALS clauses in a CREATE PIPELINE ... AZURE statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.

See the Azure documentation about viewing and managing Azure Access Keys to learn more.

Permissions and Policies

An Azure pipeline can be created to read from either a container or a blob. Both of these resource types may be configured with access policies or permissions. Before creating an Azure pipeline, it’s important to consider both the provided user credentials and any existing policies or permissions on the desired resource.

For example, if you provide credentials that implicitly grant access to all blobs in a container, you may not need to add or modify any access policies or permissions on the desired resource. However, if you provide credentials that do not have permission to access the resource, an administrator will need to allow access for your credentials.

Consider the following scenarios:

  • Read all objects in a container: An Azure pipeline configured for a container will automatically read all blobs it has access to. Changes to a container-level policy or permissions may be required.

  • Read all blobs with a given prefix: An Azure pipeline configured to read all objects in a container with a given prefix may require changes to a container-level policy with a prefix in order to allow access the desired blobs.

  • Read a specific object in a container: An Azure pipeline configured for a specific blob may require changes to the policies and permissions for both the blob and container.

HDFS Pipeline Syntax

The following example statement demonstrates how to create an HDFS pipeline using the minimum required syntax:

CREATE PIPELINE pipeline_name
AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/files'
INTO TABLE `table_name`
FIELDS TERMINATED BY '\t';
START PIPELINE pipeline_name;

This example creates a new pipeline named pipeline_name and references the HDFS path /path/to/files as the data source. Once the pipeline is started, the HDFS extractor recursively walks the directory tree in the HDFS path, looking for MapReduce output files. Any files with names matching the regular expression part-(m|r)-(.*) are imported into my_table. The import occurs only if there are additional files in the directory that don’t match the regular expression; this check confirms that MapReduce created a SUCCESS file.

For more information on SingleStore Helios-specific configurations for HDFS, refer to HDFS Configurations.

As an alternative to specifying the CONFIG clause in a CREATE PIPELINE ... HDFS statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.

For a tutorial on getting started with HDFS Pipelines, Load Data from HDFS Using a Pipeline.

GCS Pipeline Syntax

The following example statement demonstrates how to create a GCS pipeline using the minimum required syntax:

CREATE PIPELINE pipeline_name
AS LOAD DATA GCS 'bucket-name'
CREDENTIALS '{"access_id": "your_google_access_key", "secret_key": "your_google_secret_key"}'
INTO TABLE `table_name`
FIELDS TERMINATED BY ',';
START PIPELINE pipeline_name;

This example creates a new pipeline named pipeline_name, uses a GCS bucket named bucket-name as the data source, and ingests the bucket’s objects into my_table. A GCS pipeline requires authentication to Google before it can start reading objects from a bucket. It supports only HMAC keys and requires a GCS access key and secret key.

For more information on SingleStore Helios-specific configurations for GCS, refer to GCS Configurations.

GCS pipelines support globbing (via the * wildcard) in the bucket path and name. For example, you could create this pipeline: CREATE PIPELINE p AS LOAD DATA GCS 'my_bucket_path/file1*' ....

For a tutorial on getting started with GCS Pipelines, see Load Data from Google Cloud Storage (GCS) Using a Pipeline.

Authentication and Access Control in GCS

A GCS pipeline requires you to authenticate to the desired bucket. Depending on the provided credentials, a bucket’s access permissions or policies may require changes to allow access. To familiarize yourself with access management for buckets and objects, see the GCS documentation here.

Authentication

A GCS pipeline must authenticate to Google before it can begin reading objects from a bucket. While GCS allows anonymous authentication to buckets and their objects, SingleStore Helios GCS pipelines support only HMAC keys. The pipeline requires you to provide a GCS access key and secret key.

The CREDENTIALS clause should be a JSON object with two fields:

access_id: usually a 24 or 60 character alphanumeric string, which is linked to the Google account, typically is all uppercase and starts with GOOG.

secret_key: usually a 40 character Base-64 encoded string that is linked to a specific access_id.

...
CREDENTIALS '{"access_id": "your_google_access_key", "secret_key": "your_google_secret_key"}'
...

See the GCS documentation about managing GCS HMAC keys to learn more.

As an alternative to specifying the CONFIG and CREDENTIALS clauses in a CREATE PIPELINE ... GCS statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.

GCS Pipeline Limitations

A GCS pipeline has a few limitations and also inherits limitations from the Google Cloud Storage service itself. See the GCS Documentation for more detailed information.

  • Versioning: If your GCS pipeline is configured to read from a version-enabled bucket, you will only receive the latest version of the object from the bucket. Currently, you cannot configure your GCS pipeline to read specific version IDs.

  • 5 TB Max Object Size: Google Cloud Storage supports a maximum object size of 5 TB, and this limit also applies to an GCS pipeline.

  • Rate Limiting: A large SingleStore Heliosworkspace configured with a GCS pipeline might encounter throttling or rate limiting imposed by GCS. Currently, a GCS pipeline cannot be configured to bypass these limits by reducing the frequency of read requests. For more information on GCS’s rate limiting and how to optimize your data, see this GCS documentation page.

Creating an Iceberg Pipeline

Refer to Iceberg Ingest for information on creating pipelines to directly ingest data from Apache Iceberg tables into SingleStore and for information on using <iceberg_format_options>.

Creating a Parquet Pipeline

The CREATE PIPELINE .. FORMAT PARQUET statement extracts specified fields from records in Parquet files. These files may be located in an Azure, HDFS, S3, or filesystem data source. Refer to Create a Parquet Pipeline for information on creating Parquet pipelines.

LOAD DATA

Note

Pipelines natively load JSON, Avro, and CSV input data; see LOAD DATA for example syntax.

When you run a CREATE PIPELINE ... LOAD DATA command, it processes extracted and transformed records as if you ran LOAD DATA alone, using the same options. These options are explained in LOAD DATA.

CREATE PIPELINE ... LOAD DATA also has additional options that are not available in LOAD DATA. These additional options are explained below.

The SCHEMA REGISTRY {"IP" | "Hostname"} option allows LOAD DATA to pull an Avro schema from a schema registry. For more information, see the Avro Schema Evolution with Pipelines topic.

AS LOAD DATA: You can load data by specifying the data source as a Kafka workspace and topic, an S3 bucket or object (with optional prefix), or a file.

  • AS LOAD DATA KAFKA 'topic_endpoint': To use Kafka as a data source, you must specify the endpoint for the Kafka workspace and the path to a specific topic with the workspace. For example:

    LOAD DATA KAFKA 'host.example.com/my-topic'
  • AS LOAD DATA S3 'bucket-name': To use S3 as a data source, you must specify a bucket name or a bucket name and object name. For example:

    LOAD DATA S3 'bucket-name'
  • AS LOAD DATA ... FORMAT AVRO SCHEMA REGISTRY {"IP" | "Hostname"}: This option allows the pipeline to pull an Avro schema from Confluent Schema Registry. You can optionally use the CONFIG and CREDENTIALS clauses to specify configuration settings to connect to the schema registry over SSL.

    Note

    The following syntax assumes you are creating a filesystem pipeline that connects to the schema registry over SSL. The ssl. settings shown only apply to SingleStore Helios 7.3.5 and later.

    CREATE PIPELINE ... AS LOAD DATA FS "/path/to/files/data.avro"
    INTO TABLE ...
    FORMAT AVRO
    SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
    ...
    CONFIG '{"schema.registry.ssl.certificate.location": "path-to-your-ssl-certificate-on-the-singlestore-db-node",
    "schema.registry.ssl.key.location": "path-to-your-ssl-certificate-key-on-the-singlestore-db-node",
    "schema.registry.ssl.ca.location": "path-to-your-ca-certificate-on-the-singlestore-db-node"}'
    CREDENTIALS '{"schema.registry.ssl.key.password": "your-password-for-the-ssl-certificate-key"}'

    Note

    You can use a subset of the ssl. settings as follows:

    • schema.registry.ssl.key.location, schema.registry.ssl.ca.location, and schema.registry.ssl.key.password

    • schema.registry.ssl.certificate.location, schema.registry.ssl.key.location, and schema.registry.ssl.key.password

    schema.registry.ssl.key.password is only required if your SSL certificate key has a password.

    As an alternative to specifying the SSL configuration settings in the CONFIG and CREDENTIALS clauses, you can install the registry's certificate (ca-cert) on all nodes in your SingleStore Heliosworkspace.

  • [BATCH_INTERVAL milliseconds]: You can specify a batch interval in milliseconds, which is the amount of time that the pipeline waits before checking the data source for new data, once all of the existing data has been loaded from the data source. If a batch interval is not specified, the default value is 2500. For example:

    LOAD DATA KAFKA '127.0.0.1/my-topic'
    BATCH_INTERVAL 500
  • [MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]: Specifies the maximum number of batch partitions that can be scheduled into a single batch. Useful for limiting the parallelism of a pipeline on large workspaces to prevent the pipeline from throttling system resources.

  • [MAX_RETRIES_PER_BATCH_PARTITION max_retries]: Specifies the maximum number of retries for each batch partition to write batch partition data to the destination table. If a batch transaction fails at any point, for example during extraction from a data source, optional transformation, or loading of the data into the destination table, it will be retried up to max_retries specified in this clause. If no value is specified or MAX_RETRIES_PER_BATCH_PARTITION is set to 0, then max_retries is set to the value specified by the PIPELINES_MAX_RETRIES_PER_BATCH_PARTITION engine variable.

  • [RESOURCE POOL pool_name]: Specifies the resource pool that is used to load pipeline data.

    • If the resource pool is not specified at the time of pipeline creation, then the pipeline uses the user’s default resource pool. If no default resource pool has been set for the user, then the pipeline uses the value of the resource_pool engine variable as its resource pool.

      • In order to create a new resource pool, the user will need to have SUPER or CREATE_RG privileges.

      • Use information_schema.USER_PRIVILEGES to see users and their related privileges. See USER_PRIVILEGES for more information.

    • To obtain details about a resource pool, use select * from information_schema.mv_resource_pool_status to review the current running threads on a cluster for all nodes.

    • The background tasks of a pipeline runs in its resource pool. Therefore, a resource pool used by a pipeline cannot be dropped.

    • The user who creates the pipeline must have permissions to use the resource pool. The pipeline will continue to use the pool even if the right to use the pool for the pipeline is revoked for the user who created the pool.

    Note

    A good starting point for troubleshooting memory issues in pipelines is to run SELECT * FROM information_schema.mv_processlist. See mv_processlist for more information.

  • [AGGREGATOR]: Specifying CREATE AGGREGATOR PIPELINE tells SingleStore Helios to pull data through the aggregator, instead of directly to the leaves. This option can be more efficient for low parallelism pipelines, like single file S3 loads or single partition Kafka topics (an aggregator pipeline is not required for single-partition Kafka topics). An aggregator pipeline is required for pipelines into reference tables and tables with auto increment columns. A stored procedure cannot be used with an aggregator pipeline.

Example

The following example shows how to use multiple SET clauses with CREATE PIPELINE ... LOAD DATA for ingesting values to columns using variables. Consider the following file:

echo "1,NULL,2020-08-06 07:53:09" >> /pipeline_test_infile/infile.csv
echo "2,2020-08-06 07:53:09,2020-09-06 07:53:09" > /pipeline_test_infile/infile.csv
echo "3,2020-08-06 07:53:09,NULL" >> /pipeline_test_infile/infile.csv

Create a pipeline that ingests infile.csv into the following table orders.

CREATE TABLE orders (
ID INT,
del_t1 DATETIME,
del_t2 DATETIME);
CREATE PIPELINE order_load AS
LOAD DATA FS '/pipeline_test_infile/'
INTO TABLE orders
FIELDS TERMINATED BY ','
(ID, @del_t1, @del_t2)
SET del_t1 = IF(@del_t1='NULL',NULL,@del_t1),
del_t2 = IF(@del_t2='NULL',NULL,@del_t2);

Test the pipeline:

TEST PIPELINE order_load;
+------+---------------------+---------------------+
| ID   | del_t1              | del_t2              |
+------+---------------------+---------------------+
|    1 | NULL                | 2020-08-06 07:53:09 |
|    2 | 2020-08-07 07:53:09 | 2020-09-08 07:53:09 |
|    3 | 2020-08-05 07:53:09 | NULL                |
+------+---------------------+---------------------+

Note

If you want to see more examples of loading data with vectors, refer to How to Bulk Load Vectors.

Loading Data into a Pipeline Using JSON

When using JSON formatted data, the mapping clause should be used. Without this clause, the data mapping will be inferred based on its schema. If a field name is nullable and has not been specified using the mapping clause, a NULL value will be returned instead of an error.

Below are examples for loading data into a pipeline using JSON.

The following examples use a JSON key pair and an array from a local file source.

CREATE TABLE keypairs(`key` VARCHAR(10), `value` VARCHAR(10)); 
CREATE PIPELINE jtest AS
LOAD DATA FS '<file path>/keypairs.json'
INTO TABLE keypairs
FORMAT JSON(`key` <- keypairs::`key`,`value` <- keypairs::`value`); 
START PIPELINE jtest;
SELECT * FROM keypairs;
+------+--------+
| key  | value  | 
+------+--------+
|    1 | 1      |
+------+--------+

Warning

To use reserved words in a table, backticks are required. The example above uses key and value as column names by using backticks.

CREATE TABLE teams(basketball VARCHAR(50), baseball VARCHAR (50),
football VARCHAR(50), hockey VARCHAR(50));
CREATE PIPELINE teams_list AS
LOAD DATA FS '<file path>/jtinsert.json'
INTO TABLE teams
FORMAT JSON
(basketball <- teams::basketball,
baseball <- teams::baseball,
football <- teams::football,
hockey <- teams::hockey);
START PIPELINE teams_list;
SELECT * FROM teams;
+------------+-----------+----------+--------+
| basketball | baseball  | football | hockey |
+------------+-----------+----------+--------+
|  Lakers    | Dodgers   | Raiders  | Kings  |
+------------+-----------+----------+--------+

The following examples use a JSON key pair and an array from an S3 bucket.

CREATE TABLE keypairs(`key` VARCHAR(10), `value` VARCHAR (10));
CREATE PIPELINE jtest AS
LOAD DATA S3 's3://<bucket_name>/<filename>.json'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXXXXX",
"aws_secret_access_key": "XXXXXXXXXXX"}'
INTO TABLE keypairs
FORMAT JSON
(`key` <- keypairs::`key`,
`value` <- keypairs::`value`);
START PIPELINE jtest;
SELECT * FROM keypairs;
+------+--------+
| key  | value  | 
+------+--------+
|    1 | 1      |
+------+--------+
CREATE TABLE teams(basketball varchar(50), baseball varchar (50),
football varchar(50), hockey varchar(50));

Below is the contents of the JSON file used in the pipeline example:

{
"teams": [{
"basketball": "Knicks"
},
{
"baseball": "Yankees"
},
{
"football": "Giants"
},
{
"hockey": "Rangers"
}
]
}
CREATE PIPELINE teams_list AS
LOAD DATA S3 's3://<bucket_name>/<filename>.json'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXXXXX",
"aws_secret_access_key": "XXXXXXXXXXX"}'
INTO TABLE teams
FORMAT JSON
(basketball <- teams::basketball,
baseball <- teams::baseball,
football <- teams::football,
hockey <- teams::hockey);
START PIPELINE teams_list;
SELECT * FROM teams;
+------------+-----------+----------+--------+
| basketball | baseball  | football | hockey |
+------------+-----------+----------+--------+
|  Lakers    | Dodgers   | Raiders  | Kings  |
+------------+-----------+----------+--------+

Loading JSON Data from a CSV File

To use an ENCLOSED BY <char> as a terminating field, a TERMINATED BY clause is needed. For clarity, instances of an ENCLOSED BY <char> appearing within a field value can be duplicated, and they will be understood as a singular occurrence of the character.

If an ENCLOSED BY "" is used, quotes are treated as follows:

  • "The ""NEW"" employee"  → The "NEW" employee

  • The "NEW" employee → The "NEW" employee

  • The ""NEW"" employee → The ""NEW"" employee

Example 1

An  ENCLOSED BY clause is required when a csv file has a JSON column enclosed with double quotes (" ").

CREATE TABLE employees(emp_id INT, data JSON);
csv file contents
emp_id,data
159,"{""name"": ""Damien Karras"", ""age"": 38, ""city"": ""New York""}"
CREATE OR REPLACE PIPELINE emps AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
IGNORE 1 LINES;
START PIPELINE emps;
SELECT * FROM employees;
+--------+-----------------------------------------------------+
| emp_id | data                                                |
+--------+-----------------------------------------------------+
|    159 | {"age":38,"city":"New York","name":"Damien Karras"} |
+--------+-----------------------------------------------------+

Example 2

An  ESCAPED BY clause is required when a character is specified as an escape character for a string. The example below uses a backslash (\) as the escape character.

csv file contents
emp_id,data
298,"{\"name\": \"Bill Denbrough\", \"age\": 25, \"city\": \"Bangor\"}"
CREATE OR REPlACE PIPELINE emps2 AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
ESCAPED BY '\\'
IGNORE 1 LINES;
START PIPELINE emps2;
SELECT * FROM employees;
+--------+-----------------------------------------------------+
| emp_id | data                                                |
+--------+-----------------------------------------------------+
|    298 | {"age":25,"city":"Bangor","name":"Bill Denbrough"}  |
|    159 | {"age":38,"city":"New York","name":"Damien Karras"} |
+--------+-----------------------------------------------------+

Example 3

This example will fail as the JSON field in the csv file is not in the correct format.

csv file contents
emp_id,data
410,"{"name": "Annie Wilkes", "age": 45, "city":"Silver Creek"}"
CREATE OR REPLACE PIPELINE emps3 AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
IGNORE 1 LINES;
START PIPELINE emps3;
ERROR 1262 (01000): Leaf Error (127.0.0.1:3307): Row 1 was truncated; it contained more data than there were input columns

Example 4

An  ENCLOSED BY clause is required when a csv file has a JSON column enclosed with curly brackets ({ }).

csv file contents
emp_id,data
089,{"name": "Wilbur Whateley","age": 62,"city": "Dunwich"}
CREATE OR REPLACE PIPELINE emps4 AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '{'
IGNORE 1 LINES;
START PIPELINE emps4;
SELECT * FROM employees;
+--------+------------------------------------------------------+
| emp_id | data                                                 |
+--------+------------------------------------------------------+
|    298 | {"age":25,"city":"Bangor","name":"Bill Denbrough"}   |
|    159 | {"age":38,"city":"New York","name":"Damien Karras"}  |
|     89 | {"age":62,"city":"Dunwich","name":"Wilbur Whateley"} |
+--------+------------------------------------------------------+

INTO PROCEDURE

Creates a pipeline that uses a stored procedure to shape the data that is extracted from the pipeline's data source. For more information, see CREATE PIPELINE ... INTO PROCEDURE.

Last modified: December 13, 2024

Was this article helpful?