CREATE PIPELINE
Create a new Pipeline to continuously extract, shape, and load data into a table or stored procedure.
Note
When you run a CREATE PIPELINE ... LOAD DATA
command, it processes extracted and transformed records as if you ran LOAD DATA
alone, using the same options. These options are explained in LOAD DATA.
CREATE PIPELINE ... LOAD DATA
also has additional options that are not available in LOAD DATA
. These additional options are explained in the LOAD DATA
section at the end of this topic.
Syntax
CREATE [OR REPLACE] [AGGREGATOR] PIPELINE [IF NOT EXISTS] <pipeline_name> AS
  LOAD DATA { kafka_configuration | s3_configuration | filesystem_configuration
            | azure_blob_configuration | hdfs_configuration | gcs_configuration
            | link_configuration }
  [BATCH_INTERVAL <milliseconds>]
  [MAX_PARTITIONS_PER_BATCH <max_partitions_per_batch>]
  [MAX_RETRIES_PER_BATCH_PARTITION max_retries]
  [RESOURCE POOL <resource_pool_name>]
  [(ENABLE|DISABLE) OUT_OF_ORDER OPTIMIZATION]
  [WITH TRANSFORM ('uri', 'executable', 'arguments [...]')]
  [REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY | PARSER } ERRORS]
  { INTO TABLE <table_name> | INTO PROCEDURE <procedure_name> }
  { <json_format_options> | <avro_format_options> | <parquet_format_options> | <csv_format_options> }
  [ (<column_name>, ... ) ]
  [SET <column_name> = <expression>,... | <pipeline_source_file>]
  [WHERE <expression>,...]
  [ON DUPLICATE KEY UPDATE <column_name> = <expression>, [...]]
  [NULL DEFINED BY <string> [OPTIONALLY ENCLOSED]]

<kafka_configuration>:
  KAFKA 'kafka_topic_endpoint'

<s3_configuration>:
  S3 { '<bucket-name>' | '<bucket-name/path>' }
  [CONFIG '<configuration_json>']
  CREDENTIALS '<credentials_json>'

<filesystem_configuration>:
  FS 'path'
  [CONFIG '<configuration_json>']

<azure_blob_configuration>:
  AZURE { '<container-name>' | '<container-name/object-name>' | '<container-name/prefix/object-name>' }
  CREDENTIALS '<credentials_json>'
  [CONFIG '<configuration_json>']

<hdfs_configuration>:
  HDFS '<hdfs://<namenode DNS> | IP address>:<port>/<directory>'
  [CONFIG '<configuration_json>']

<gcs_configuration>:
  GCS { '<bucket-name>' | '<bucket-name/path>' }
  CREDENTIALS '<credentials_json>'
  [CONFIG '<configuration_json>']

<link_configuration>:
  LINK [<database_name>.]<connection_name> '<path>'

<json_format_options>:
  FORMAT JSON
  ( {<column_name> | @<variable_name>} <- <subvalue_path> [DEFAULT <literal_expr>], ...)
  [KAFKA KEY ( {<column_name> | @<variable_name>} <- <subvalue_path> [DEFAULT <literal_expr>], ...)]

<avro_format_options>:
  FORMAT AVRO SCHEMA REGISTRY {uri}
  ( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)
  [SCHEMA '<avro_schema>']
  [KAFKA KEY ( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)]

<subvalue_path>:
  {% | [%::]ident [::ident ...]}

<parquet_format_options>:
  FORMAT PARQUET <parquet_subvalue_mapping> [TIMEZONE '<time_zone_name>']

<parquet_subvalue_mapping>:
  ( {<column_name> | @<variable_name>} <- <parquet_subvalue_path>, ...)

<parquet_subvalue_path>:
  {ident [::ident ...]}

<csv_format_options>:
  [FORMAT CSV]
  [{FIELDS | COLUMNS} TERMINATED BY '<string>'
    [[OPTIONALLY] ENCLOSED BY '<char>']
    [ESCAPED BY '<char>']
  ]
  [LINES
    [STARTING BY '<string>']
    [TERMINATED BY '<string>']
  ]
  [IGNORE <number> LINES]
  [ ({<column_name> | @<variable_name>}, ...) ]
Note
This topic uses the term "batch", which is a subset of data that a pipeline extracts from its data source. For more information on batches, see The Lifecycle of a Pipeline.
Note
The REPLACE
capability of pipelines maintains the cursor positions when replacing existing pipelines.
Remarks
Pipeline names are always case-sensitive for operations that refer to pipelines.
If the OR REPLACE clause is provided and a pipeline with pipeline_name already exists, then the CREATE query will alter that pipeline to match the new definition. Its state, including its cursor positions, will be preserved by the CREATE.
CONFIG and CREDENTIALS can be specified in either order (CONFIG followed by CREDENTIALS or CREDENTIALS followed by CONFIG).
SKIP CONSTRAINT ERRORS and SKIP DUPLICATE KEY ERRORS are unsupported with pipelines into stored procedures.
IGNORE, SKIP PARSER ERRORS, and SKIP ALL ERRORS are unsupported with non-CSV pipelines.
REPLACE, SKIP CONSTRAINT ERRORS, and SKIP DUPLICATE KEY ERRORS are supported with non-CSV pipelines.
This command causes implicit commits. See COMMIT for more information.
Pipelines can be used to ingest and process data with supported character sets. For more information, see Working with Character Sets and Collations.
You can use the function pipeline_batch_id() in an expression in the SET clause. This function returns the id of the batch used to load the data. For example, given the table definition CREATE TABLE t(b_id INT, column_2 TEXT);, you could create this statement to load the batch id into the column b_id:
CREATE PIPELINE p AS LOAD DATA ... INTO TABLE t(@b_id,column_2) ... SET b_id = pipeline_batch_id();
Pipeline names that begin with a number or are solely numeric require backticks (` `) around the pipeline name:
CREATE PIPELINE `3e93587cb1` AS LOAD DATA KAFKA 'host.example.com/my-topic' INTO TABLE <table_name>;
See the Permission Matrix for the required permission.
REPLACE, IGNORE or SKIP ERRORS
REPLACE, IGNORE or SKIP can be used when creating a pipeline. See Additional CREATE PIPELINE Examples for more information.
The SKIP ... ERRORS
behavior allows you to specify an error scenario that, when encountered, discards an offending row. See SKIP ALL ERRORS Example for more information.
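For illustration, here is a minimal sketch of a filesystem pipeline that discards any offending row; the source path and destination table t are hypothetical:
CREATE PIPELINE p_skip AS
LOAD DATA FS '/path/to/files/*'
SKIP ALL ERRORS
INTO TABLE t
FIELDS TERMINATED BY ',';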
Unlike SKIP ... ERRORS, which discards offending rows, IGNORE may change the inserted row’s data to ensure that it adheres to the table schema. See IGNORE Error Handling for more information.
Creating a Pipeline Using a Connection Link
The CREATE PIPELINE ... LINK
statement loads data from the data provider using a connection link. To use this command, you only need to know the connection link name, not the connection details and configuration. However, you need the SHOW LINK
permission, provided by your administrator, to use a connection link. This command supports connections to S3, Azure, GCS, HDFS, and Kafka only.
The following example creates a pipeline using an HDFS connection:
USE db1; CREATE PIPELINE mypipeline AS LOAD DATA LINK HDFSconnection 'hdfs://hadoop-namenode:8020/path/to/files' INTO TABLE my_table; START PIPELINE mypipeline;
Kafka Pipeline Syntax
The following example statement demonstrates how to create a Kafka pipeline using the minimum required syntax:
Minimum Required Kafka Pipeline Syntax:
CREATE PIPELINE pipeline_name AS LOAD DATA KAFKA 'host.example.com/my-topic' INTO TABLE table_name; START PIPELINE pipeline_name;
This statement creates a new pipeline named pipeline_name
, uses a Kafka cluster as the data source, points to the location of the my-topic
topic at the Kafka cluster’s endpoint, and will start ingesting data into table_name
. For a tutorial on getting started with Kafka Pipelines, see Load Data from Kafka Using a Pipeline .
Kafka Pipeline Syntax Using the CONFIG and CREDENTIALS Clauses
The following statement creates a Kafka pipeline to load data from Confluent Cloud, using the CONFIG
and CREDENTIALS
clauses.
CREATE PIPELINE pipeline_name AS LOAD DATA KAFKA '<Confluent Cloud cluster endpoint>/test' CONFIG '{"sasl.username": "<CLUSTER_API_KEY>", "sasl.mechanism": "PLAIN", "security.protocol": "SASL_SSL", "ssl.ca.location": "<CA certificate file path>"}' CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"}' INTO TABLE table_name;
The Kafka download process involves querying one broker for the URLs of other brokers. It can be difficult to configure Kafka brokers to advertise proxy URLs in that step of the process.
The CONFIG
clause of a Kafka pipeline can accept a spoof.dns
element as an alternative to configuring Kafka brokers. The spoof.dns
element must be a JSON object consisting of an arbitrary number of key-value pairs with URL string values. When the pipeline attempts to connect to a Kafka broker whose URL matches one of the keys, the pipeline will connect to the corresponding URL value, effectively remapping the broker URLs inside the pipeline’s Kafka client.
Using the spoof.dns element is useful when connecting to Kafka via a proxy, for example, when connecting across multiple cloud services. Instead of modifying the Kafka broker configuration, you can use spoof.dns to re-route connections to the proxy.
CREATE PIPELINE `p` AS LOAD DATA KAFKA 'bootstrap.broker.url.com:6000/topic' CONFIG '{ "spoof.dns": { "broker1.url.com:6000":"proxy.url.com:6000", "broker2.url.com:9092":"proxy.url.com:6001", "broker3.url.com:9092":"proxy2.url.com:6000" } }' INTO TABLE t;
This CREATE PIPELINE
command will re-route all connections from broker1.url.com:6000
to proxy.url.com:6000
; from broker2.url.com:9092
to proxy.url.com:6001
; and from broker3.url.com:9092
to proxy2.url.com:6000
respectively.
For a tutorial on getting started with Kafka Pipelines, see Load Data from Kafka Using a Pipeline.
Note
By default, records may be inserted out of order if the SingleStore cluster is sufficiently behind in loading records from the source. If you require the records to be inserted in order, e.g. in upsert scenarios, pass the DISABLE OUT_OF_ORDER OPTIMIZATION
clause to CREATE PIPELINE
or ALTER PIPELINE
. When this optimization is disabled, records from the same source file or source partition will be inserted in the order in which they appear in that partition. However, records from different source files or source partitions may still be inserted in an arbitrary order with respect to each other.
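For illustration, a minimal sketch with the optimization disabled; the topic and table names are hypothetical:
CREATE PIPELINE p_ordered AS
LOAD DATA KAFKA 'host.example.com/my-topic'
DISABLE OUT_OF_ORDER OPTIMIZATION
INTO TABLE t;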
Compressed topics are supported without requiring any additional configuration in SingleStore.
A CREATE PIPELINE ... KAFKA
statement cannot specify more than one topic.
In a CREATE PIPELINE ... KAFKA
statement, you can specify the CONFIG
and CREDENTIALS
clauses. As an alternative, you can reference a connection link. For more information, see Configuring and Using Connection Links.
Kafka pipelines that use FORMAT JSON
or FORMAT AVRO SCHEMA REGISTRY
can use KAFKA KEY
to extract keys from Kafka records. Keys must be in the format specified in the FORMAT
clause.
Example: Accessing Kafka Keys in a Pipeline
The following CREATE PIPELINE
statement creates a new pipeline named pipeline_name
, uses a Kafka cluster as the data source, points to the location of the my-topic
topic at the Kafka cluster’s endpoint, and will start ingesting data into table_name
when the pipeline is started. The pipeline expects the payload of each Kafka record to be in JSON format and have a field named json_a
, which the pipeline maps to a column named col_a
in the destination table. In addition, the pipeline expects the key of each Kafka record to be in JSON format and have a field named json_b
, which the pipeline maps to a column named col_b
in the destination table.
CREATE TABLE table_name (col_a INT, col_b TEXT); CREATE PIPELINE pipeline_name AS LOAD DATA KAFKA 'host.example.com/my-topic' INTO TABLE table_name FORMAT JSON (col_a <- json_a) KAFKA KEY (col_b <- json_b); START PIPELINE pipeline_name;
Inserting a record into the my-topic
topic with key {"json_b": "hello"}
and payload {"json_a": 1}
would result in the following row in the destination table:
col_a | col_b |
---|---|
1 | hello |
Kafka Pipeline Support for Transactions
A Kafka pipeline can be configured to use one of the following isolation levels, which determine whether the pipeline supports Kafka transactions.
read_committed: The pipeline will read committed messages, which have been written transactionally, from Kafka partitions. The pipeline will also read messages that have not been written transactionally.
read_uncommitted: The pipeline will read messages that have not been written transactionally from Kafka partitions. The pipeline will also read messages that have been written transactionally, regardless of whether they have been committed.
For more information, see the Reading Transactional Messages section of the KafkaConsumer topic in the Apache documentation.
The isolation level, isolation.level
, can be set using the CONFIG
option of a CREATE PIPELINE
statement. For example:
CREATE PIPELINE ... KAFKA ... CONFIG '{"isolation.level": "read_uncommitted"}' ...
By default, the isolation level is read_committed
.
Specifying Multiple Kafka Brokers
The following example creates a pipeline where three Kafka brokers are specified. They are located on port 9092
, at host1.example.com
, host2.example.com
and host3.example.com
.
CREATE PIPELINE pipeline_name AS LOAD DATA KAFKA 'host1.example.com:9092,host2.example.com:9092,host3.example.com:9092/my-topic' INTO TABLE `table_name`;
S3 Pipeline Syntax
The following example statement demonstrates how to create an S3 pipeline using the minimum required syntax:
Minimum Required S3 Pipeline Syntax:
CREATE PIPELINE pipeline_name AS LOAD DATA S3 'bucket-name' CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"[, "aws_session_token": "your_temp_session_token"][, "role_arn":"your_role_arn"]}' INTO TABLE `table_name`; START PIPELINE pipeline_name;
This statement creates a new pipeline named pipeline_name
, uses an S3 bucket named bucket-name
as the data source, and will start ingesting the bucket’s objects into table_name
. Credentials for your S3 bucket can either be in the form of an AWS access key or an Amazon Resource Name (ARN).
No CONFIG
clause is required to create an S3 pipeline. This clause is used to specify the Amazon S3 region where the source bucket is located. If no CONFIG
clause is specified, SingleStoreDB will automatically use the us-east-1
region, also known as US Standard
in the Amazon S3 console. To specify a different region, such as us-west-1
, include a CONFIG
clause as shown in the example below. The CONFIG
clause can also be used to specify the suffixes
for files to load or to specify the disable_gunzip
condition. These suffixes are a JSON array of strings. When specified, CREATE PIPELINE
only loads files that have the specified suffix. Suffixes in the CONFIG
clause can be specified without a .
before them, for example, CONFIG '{"suffixes": ["csv"]}'
. When enabled, the disable_gunzip
option disables the decompression of files with the .gz
extension. If this option is disabled or missing, files with the .gz
extension will be decompressed.
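For illustration, a sketch combining these options; the bucket and table names are hypothetical and the credential values are placeholders:
CREATE PIPELINE p_csv_only AS
LOAD DATA S3 'my-bucket'
CONFIG '{"region": "us-east-1", "suffixes": ["csv"], "disable_gunzip": true}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"}'
INTO TABLE t
FIELDS TERMINATED BY ',';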
The request_payer
field name may be added to the CONFIG
clause.
CREATE OR REPLACE PIPELINE <pipeline_name> AS LOAD DATA S3 'data-test-bucket' CONFIG '{"region": "us-east-1","request_payer": "requester"}' CREDENTIALS '{"aws_access_key_id": "ANIAVX7U2LM9QVJMK2ZT", "aws_secret_access_key": "xxxxxxxxxxxxxxxxxxxxxxx"}' INTO TABLE `market_data` (ts, timestamp, event_type, ticker, price, quantity, exchange, conditions);
Note
S3 pipelines support the *
wildcard in the bucket path, but not the bucket name. For example, you could create this pipeline: CREATE PIPELINE p AS LOAD DATA S3 'my_bucket/my_path*' ...
.
As an alternative to specifying the CONFIG
and CREDENTIALS
clauses in a CREATE PIPELINE ... S3
statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.
AWS Pipeline Syntax for AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE
AWS_ROLE_ARN
and the contents of the AWS_WEB_IDENTITY_TOKEN_FILE
may be used as credentials for creating a pipeline. A syntax example is below:
CREATE PIPELINE pipeline_web_token AS LOAD DATA S3 'bucket-name' CONFIG '{"region": "us-east-1"}' CREDENTIALS '{"role_arn": "arn:aws:iam::...", "aws_web_identity_token": "eyJhb...."}' INTO TABLE `table_name`;
AWS Elastic Kubernetes Service (EKS) IAM Roles for Service Accounts (IRSA) Authentication
AWS EKS IRSA may be used when creating an S3 pipeline. To use AWS EKS IRSA credentials, set the enable_eks_irsa global variable to ON, as it is disabled by default. To enable this authentication, add "creds_mode": "eks_irsa" to the CONFIG section of the pipeline; the pipeline will then use the service role assigned to the EKS pods for S3 authentication.
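A hedged sketch of this configuration; the bucket and table names are hypothetical, and whether the CREDENTIALS clause may be omitted entirely in this mode is not confirmed here, so an empty object is shown:
SET GLOBAL enable_eks_irsa = ON;
CREATE PIPELINE p_irsa AS
LOAD DATA S3 'my-bucket'
CONFIG '{"region": "us-east-1", "creds_mode": "eks_irsa"}'
CREDENTIALS '{}'
INTO TABLE t
FIELDS TERMINATED BY ',';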
For a tutorial on getting started with S3 Pipelines, see Load Data from Amazon S3 using a Pipeline.
S3 Pipeline Using Specified Region
CREATE PIPELINE pipeline_name AS LOAD DATA S3 'bucket-name' CONFIG '{"region": "us-west-1"}' CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key" [, "aws_session_token": "your_temp_session_token"][, "role_arn":"your_role_arn"]}' INTO TABLE `table_name`;
S3 Pipeline Using Metadata Garbage Collection (GC)
By default, each pipeline maintains progress metadata for every known file name, for the lifetime of the pipeline. The optional ENABLE OFFSETS METADATA GC
clause of CREATE PIPELINE
causes the created pipeline to periodically discard metadata for already-loaded files, using a timestamp-based strategy that preserves exactly-once semantics. This option is only supported for S3 pipelines.
The number of batch metadata entries retained before being overwritten by new incoming batches may be adjusted by changing the value of the global engine variable pipelines_batches_metadata_to_keep. See Pipelines Sync Variables for more information.
CREATE PIPELINE <pipeline_name> AS LOAD DATA S3 '<file_name>' CONFIG '{"region": "us-west-1"}' CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key" [, "aws_session_token": "your_temp_session_token"][, "role_arn":"your_role_arn"]}' ENABLE/DISABLE OFFSETS METADATA GC INTO TABLE <table_name>;
Note
By default, the METADATA GC clause is disabled. Once it has been enabled, it may be disabled by using the ALTER PIPELINE command.
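For example, assuming ALTER PIPELINE accepts the same clause, disabling it might look like:
ALTER PIPELINE pipeline_name DISABLE OFFSETS METADATA GC;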
Azure Blob Pipeline Syntax
The following example statement demonstrates how to create an Azure Blob pipeline using the minimum required syntax. Note that Azure Blob pipelines are only available in MemSQL 5.8.5 and above.
Minimum Required Azure Pipeline Syntax:
CREATE PIPELINE pipeline_name AS LOAD DATA AZURE 'container-name' CREDENTIALS '{"account_name": "my_account_name", "account_key": "my_account_key"}' INTO TABLE `table_name`; START PIPELINE pipeline_name;
This statement creates a new pipeline named pipeline_name
, uses an Azure Blob container named container-name
as the data source, and will start ingesting the container’s blobs into table_name
. For a tutorial on getting started with Azure Pipelines, see Load Data from Azure Blobs using a Pipeline.
Note that no CONFIG
clause is required to create an Azure pipeline unless you need to specify the suffixes
for files to load or the disable_gunzip
condition. These suffixes are a JSON array of strings. When specified, CREATE PIPELINE
only loads files that have the specified suffix. Suffixes in the CONFIG
clause can be specified without a .
before them, for example, CONFIG '{"suffixes": ["csv"]}'
. When enabled, the disable_gunzip
option disables the decompression of files with the .gz
extension. If this option is disabled or missing, files with the .gz
extension will be decompressed.
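For illustration, a sketch of such a CONFIG clause with both options set:
CONFIG '{"suffixes": ["csv"], "disable_gunzip": true}'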
An Azure pipeline must authenticate with Azure before it can begin reading blobs from a container. The pipeline requires you to provide an account name and account access key.
The CREDENTIALS
clause should be a JSON object with two fields:
account_name
: this is the account name under which your blob container resides. This is usually a human-readable name, given by the person who created the account.
account_key
: usually an 88-character, 512-bit string linked to a storage account.
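Mirroring the GCS example later in this topic, the clause might look like the following, with placeholder values:
... CREDENTIALS '{"account_name": "your_account_name", "account_key": "your_account_key"}' ...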
As an alternative to specifying the CONFIG
and CREDENTIALS
clauses in a CREATE PIPELINE ... AZURE
statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.
See the Azure documentation about viewing and managing Azure Access Keys to learn more.
Permissions and Policies
An Azure pipeline can be created to read from either a container or a blob. Both of these resource types may be configured with access policies or permissions. Before creating an Azure pipeline, it’s important to consider both the provided user credentials and any existing policies or permissions on the desired resource.
For example, if you provide credentials that implicitly grant access to all blobs in a container, you may not need to add or modify any access policies or permissions on the desired resource. However, if you provide credentials that do not have permission to access the resource, an administrator will need to allow access for your credentials.
Consider the following scenarios:
Read all objects in a container: An Azure pipeline configured for a container will automatically read all blobs it has access to. Changes to a container-level policy or permissions may be required.
Read all blobs with a given prefix: An Azure pipeline configured to read all objects in a container with a given prefix may require changes to a container-level policy with a prefix in order to allow access to the desired blobs.
Read a specific object in a container: An Azure pipeline configured for a specific blob may require changes to the policies and permissions for both the blob and container.
Filesystem Pipeline Syntax
The following example statement demonstrates how to create a Filesystem pipeline using the minimum required syntax:
CREATE PIPELINE pipeline_name AS LOAD DATA FS '/path/to/files/*' INTO TABLE `table_name` FIELDS TERMINATED BY ','; START PIPELINE pipeline_name;
This statement creates a new pipeline named pipeline_name
, uses a directory as the data source, and will start ingesting data into table_name
.
To disable the decompression of files with the .gz
extension, enable the disable_gunzip
option in the CONFIG
clause. If this option is disabled or missing, files with the .gz
extension will be decompressed.
CONFIG '{"disable_gunzip" : true}'
In order to ensure a pipeline processes zero byte files, enable the process_zero_byte_files
option in the CONFIG
clause. If this option is disabled or missing, zero byte files will be skipped by default.
CONFIG '{"process_zero_byte_files" : true}'
For a tutorial on getting started with Filesystem Pipelines, see Load Data from the Filesystem using a Pipeline.
Filesystem Paths and Permissions
The paths used by the Unix filesystem extractor address files which must be accessible from every node in the cluster. The most common way to use this extractor is with a distributed filesystem, such as NFS. The path provided to a Unix filesystem extractor must be absolute, and will usually include a Unix glob pattern which matches many files, as seen in the example above. In order for the Unix filesystem extractor to load data, the files must grant read permissions to the SingleStoreDB Linux user.
Azure and S3 Folder Paths
If an Azure Blob container or S3 bucket contains multiple folders beginning with similar names, the pipeline will read the data from all of those folders unless a forward slash (/) is included at the end of the path.
Below is an Azure example using three folders whose names begin with Payment. The forward slash ensures only the data in the Payment folder is loaded.
Payment
PaymentHistory
PaymentPlan
CREATE OR REPLACE PIPELINE pipeline_csv_insert_payment AS
LOAD DATA AZURE 'uploads/csv/Payment/'
CREDENTIALS
'{"account_name": "<account-name>",
"account_key": "<account-key>"
}'
INTO PROCEDURE spPipelineCsvInsertPayment
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\r\n'
IGNORE 1 LINES;
Below is an S3 bucket example using two folders: BooksNautical and BooksHorror. The forward slash ensures only the data in the BooksNautical folder is loaded.
CREATE PIPELINE books AS
LOAD DATA S3 's3://test-bucket5-ss/BooksNautical/'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXX",
"aws_secret_access_key": "XXXXXXXXXX"}'
INTO TABLE all_books
FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY '\\'
LINES TERMINATED BY '\r\n' STARTING BY ''
IGNORE 1 LINES;
HDFS Pipeline Syntax
The following example statement demonstrates how to create an HDFS pipeline using the minimum required syntax:
CREATE PIPELINE pipeline_name AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/files' INTO TABLE `table_name` FIELDS TERMINATED BY '\t'; START PIPELINE pipeline_name;
This example creates a new pipeline named pipeline_name
and references the HDFS path /path/to/files
as the data source. Once the pipeline is started, the HDFS extractor recursively walks the directory tree in the HDFS path, looking for MapReduce output files. Any files with names matching the regular expression part-(m|r)-(.*)
are imported into table_name. The import occurs only if there are additional files in the directory that don’t match the regular expression; this check confirms that MapReduce created a _SUCCESS file.
If you would like to create a pipeline that imports Hive output files, set the disable_partial_check
attribute in your CREATE PIPELINE
’s CONFIG
JSON to true
. When the pipeline runs, the extractor will import files as described in the previous paragraph, but will not perform the check for additional files in the directory.
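For illustration, a sketch with the check disabled; the path and table names are hypothetical:
CREATE PIPELINE hive_pipe AS
LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/hive/output'
CONFIG '{"disable_partial_check": true}'
INTO TABLE t
FIELDS TERMINATED BY '\t';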
You can also specify attributes in CREATE PIPELINE
’s CONFIG
clause when you use advanced HDFS pipelines mode. In this mode, you can encrypt your pipeline’s connection to HDFS and you can authenticate your pipeline using Kerberos.
To disable the decompression of files with the .gz
extension, enable the disable_gunzip
option in the CONFIG
clause. If this option is disabled or missing, files with the .gz
extension will be decompressed.
CONFIG '{"disable_gunzip" : true}'
As an alternative to specifying the CONFIG
clause in a CREATE PIPELINE ... HDFS
statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.
For a tutorial on getting started with HDFS Pipelines, see Load Data from HDFS using a Pipeline.
GCS Pipeline Syntax
The following example statement demonstrates how to create a GCS pipeline using the minimum required syntax:
CREATE PIPELINE pipeline_name AS LOAD DATA GCS 'bucket-name' CREDENTIALS '{"access_id": "your_google_access_key", "secret_key": "your_google_secret_key"}' INTO TABLE `table_name` FIELDS TERMINATED BY ','; START PIPELINE pipeline_name;
This example creates a new pipeline named pipeline_name
, uses a GCS bucket named bucket-name
as the data source, and ingests the bucket’s objects into table_name
. A GCS pipeline requires authentication to Google before it can start reading objects from a bucket. It supports only HMAC keys and requires a GCS access key and secret key.
To disable the decompression of files with the .gz
extension, enable the disable_gunzip
option in the CONFIG
clause. If this option is disabled or missing, files with the .gz
extension will be decompressed.
CONFIG '{"disable_gunzip" : true}'
Note
GCS pipelines support the *
wildcard in the bucket path, but not the bucket name. For example, you could create this pipeline: CREATE PIPELINE p AS LOAD DATA GCS 'my_bucket/my_path*' ...
.
For a tutorial on getting started with GCS Pipelines, see Load Data from Google Cloud Storage (GCS) using a Pipeline.
Authentication and Access Control in GCS
A GCS pipeline requires you to authenticate to the desired bucket. Depending on the provided credentials, a bucket’s access permissions or policies may require changes to allow access. To familiarize yourself with access management for buckets and objects, see the GCS documentation here.
Authentication
A GCS pipeline must authenticate to Google before it can begin reading objects from a bucket. While GCS allows anonymous authentication to buckets and their objects, SingleStoreDB GCS pipelines support only HMAC keys. The pipeline requires you to provide a GCS access key and secret key.
The CREDENTIALS
clause should be a JSON object with two fields:
access_id
: usually a 24 or 60 character alphanumeric string, which is linked to the Google account, typically is all uppercase and starts with GOOG
.
secret_key
: usually a 40 character Base-64 encoded string that is linked to a specific access_id
.
... CREDENTIALS '{"access_id": "your_google_access_key", "secret_key": "your_google_secret_key"}' ...
See the GCS documentation about managing GCS HMAC keys to learn more.
As an alternative to specifying the CONFIG
and CREDENTIALS
clauses in a CREATE PIPELINE ... GCS
statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.
GCS Pipeline Limitations
A GCS pipeline has a few limitations and also inherits limitations from the Google Cloud Storage service itself. See the GCS Documentation for more detailed information.
Versioning: If your GCS pipeline is configured to read from a version-enabled bucket, you will only receive the latest version of the object from the bucket. Currently, you cannot configure your GCS pipeline to read specific version IDs.
5 TB Max Object Size: Google Cloud Storage supports a maximum object size of 5 TB, and this limit also applies to a GCS pipeline.
Rate Limiting: A large SingleStoreDB cluster configured with a GCS pipeline might encounter throttling or rate limiting imposed by GCS. Currently, a GCS pipeline cannot be configured to bypass these limits by reducing the frequency of read requests. For more information on GCS’s rate limiting and how to optimize your data, see this GCS documentation page.
Creating a Parquet Pipeline
The CREATE PIPELINE ... FORMAT PARQUET
statement extracts specified fields from records in Parquet files. These files may be located in an Azure, HDFS, S3, or filesystem data source.
The statement assigns the extracted fields to the columns of a new row to be inserted into table_name
or passed to proc_name
. The fields can also be assigned to temporary values in the SET
clause, for use in SQL transformations.
Rows that don’t match the WHERE
clause are not included.
Parquet 2.0 encodings are supported.
Parquet Pipelines do not support Kafka.
When you write a CREATE PIPELINE ... FORMAT PARQUET
statement, you include the LOAD DATA
clause. This clause supports a subset of the error recovery options that are supported by CSV LOAD DATA.
Caution
Parquet files that are compressed using the internal Parquet writer should not have a .gz
extension in the file name.
Extracting Parquet Values
parquet_subvalue_mapping
specifies the mapping between fields in an input Parquet file and columns in the destination table or temporary variables that are specified in the SET
clause.
CREATE PIPELINE ... FORMAT PARQUET
uses the ::
-separated list of field names in a parquet_subvalue_path
to perform successive field name lookups in nested Parquet group
schemas. The last field in parquet_subvalue_path
must not itself have group
type. Additionally, there must be no fields with repeated type along parquet_subvalue_path
, and the final field in the path must have a primitive type. Extracting whole records or lists of Parquet as JSON is unsupported, as is extracting individual list elements.
If an optional field along a parquet_subvalue_path
is omitted in a given record, the extracted value for that column will be NULL
.
parquet_subvalue_path
components containing whitespace or punctuation must be surrounded by backticks.
For example, consider a Parquet record with the following schema:
message m { required int64 f1; optional group g1 { optional int64 f2; optional int64 f3; } }
In an instance of this schema whose JSON equivalent is {"f1":1, "g1":{"f2":2, "f3":null}}
, the parquet_subvalue_path
f1
, g1::f2
, g1::f3
will extract the values 1
, 2
, and NULL
, respectively. If the JSON equivalent is instead {"f1":1, "g1":null}
, the parquet_subvalue_path
will extract 1
, NULL
, and NULL
, respectively.
If you are loading .parquet
data from an S3 bucket, ensure that non-empty _SUCCESS
, _committed
, and _started
files in the S3 folder are not deleted before the data in the files is loaded into the destination table.
Converting Parquet Values
After a parquet_subvalue_path
is evaluated, its value is converted to an unspecified SQL type which may be further explicitly or implicitly converted as if from a SQL string, as shown in the following table. The converted value is written to a column in the destination table or is written to a temporary variable specified in the SET
clause.
Parquet Type | Converted Value |
---|---|
boolean | "1" if true, "0" otherwise |
int32 | The string representation of the integer. |
int64 | The string representation of the integer. |
int96 | See Converting Parquet Time Values below. |
float | SQL NULL if not finite; otherwise, a string convertible without loss of precision to FLOAT |
double | SQL NULL if not finite; otherwise, a string convertible without loss of precision to DOUBLE |
binary | Verbatim, from input bytes |
fixed_len_byte_array | Verbatim, from input bytes |
Converting Parquet Logical Types
Unsigned integer logical type annotations on int32
and int64
types are respected.
The decimal
annotation on binary
and fixed_len_byte_array
types will trigger conversion as if to a numeric literal compatible with a SQL DECIMAL
type of the same scale and precision.
All other logical type annotations are ignored and have no effect on conversion.
Converting Parquet Time Values
Values of int96
type will be converted as if to datetime literals truncated to microsecond precision. The underlying value will be converted to the SingleStoreDB cluster’s time zone according to the TIMEZONE
clause, with a default heuristic in place when the clause is omitted.
Time zone conversions may be necessary because some Parquet writers, including Hive, convert time values to UTC before encoding them as int96
. Others, including Impala, perform no conversion and write values as they appear in the writer’s local time zone. Parquet files provide no definitive information about the time zone of int96
data.
When the TIMEZONE
clause is omitted, SingleStoreDB will attempt to automatically perform time zone conversions based on imperfect information in file metadata. This default heuristic may produce incorrect results. Providing the value of @@time_zone
to the TIMEZONE
clause will disable this behavior and guarantee no conversion.
When the TIMEZONE
clause is provided, SingleStoreDB will assume that encoded data is in the specified time zone, converting it to the SingleStoreDB time zone.
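For illustration, a sketch that asserts the encoded data is UTC; the file path, field, and table names are hypothetical:
CREATE PIPELINE p_tz AS
LOAD DATA FS '/path/to/files/*.parquet'
INTO TABLE t
(ts_col <- f_ts)
FORMAT PARQUET
TIMEZONE 'UTC';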
Times outside years 0001 through 9999 will be converted to NULL
and a warning will be emitted. In addition, if the pipeline is performing time zone conversions, then the valid range is further restricted to times between 1970-01-01 and 2038-01-19 03:14:07. The validity check is performed after converting to the SingleStoreDB time zone.
No automatic integer-to-datetime conversion occurs for time values encoded as int64
or int32
, even if the logical type is a time type like timestamp
. Use the SET
clause to explicitly convert such values to a DATETIME
compatible form.
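For illustration, a sketch assuming a hypothetical field f_time that encodes milliseconds since the Unix epoch:
CREATE PIPELINE p_epoch AS
LOAD DATA FS '/path/to/files/*.parquet'
INTO TABLE t
(@epoch_ms <- f_time)
FORMAT PARQUET
SET event_time = FROM_UNIXTIME(@epoch_ms / 1000);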
Example
Consider a Parquet file with the following schema:
message m1 { required boolean f1; required fixed_len_byte_array(4) f2; optional group g1 { optional binary f3 (STRING); required int64 f4; } }
example.parquet
contains four records whose JSON representation is:
{"f1": false, "f2": "rec1", "g1": null} {"f1": true, "f2": "rec2", "g1": null} {"f1": true, "f2": "rec3", "g1": {"f3": null, "f4": 3}} {"f1": true, "f2": "rec4", "g1": {"f3": "four", "f4": 4}}
Create a pipeline that ingests example.parquet
into table parquet_tbl
:
CREATE TABLE parquet_tbl(c2 BLOB, c3 BLOB, c4 BIGINT UNSIGNED); CREATE PIPELINE parquet_pipe AS LOAD DATA FS "example.parquet" INTO TABLE parquet_tbl (@v1 <- f1, @v2 <- f2, c3 <- g1::f3, c4 <- g1::f4) FORMAT PARQUET SET c2 = UPPER(CONVERT(@v2, CHAR)) WHERE @v1 = TRUE;
Test the pipeline:
TEST PIPELINE parquet_pipe; **** +------+------+------+ | c3 | c4 | c2 | +------+------+------+ | NULL | NULL | REC2 | | NULL | 3 | REC3 | | four | 4 | REC4 | +------+------+------+
Note the following about the output of TEST PIPELINE parquet_pipe:
The first record in the JSON representation of example.parquet was not included in the output because the WHERE clause, using the temporary variable @v1, filters out rows where f1 is false.
The second record of the output contains NULL for columns c3 and c4 because the optional group g1 is null in that record in the JSON representation.
The third record of the output contains NULL for column c3 because the optional field g1::f3 is null in that record in the JSON representation.
The column c2 in the output contains uppercase values. This is because the column is set to the uppercase value of the temporary variable @v2, which holds the value of f2 in each record in the JSON representation.
LOAD DATA
Note
Pipelines natively load JSON, Avro, and CSV input data; see LOAD DATA for example syntax.
When you run a CREATE PIPELINE ... LOAD DATA
command, it processes extracted and transformed records as if you ran LOAD DATA
alone, using the same options. These options are explained in LOAD DATA.
CREATE PIPELINE ... LOAD DATA
also has additional options that are not available in LOAD DATA
. These additional options are explained below.
The SCHEMA REGISTRY {"IP" | "Hostname"}
option allows LOAD DATA
to pull an Avro schema from a schema registry. For more information, see the Avro Schema Evolution with Pipelines topic.
AS LOAD DATA: You can load data by specifying the data source as a Kafka cluster and topic, an S3 bucket or object (with optional prefix), or a file.
AS LOAD DATA KAFKA 'topic_endpoint': To use Kafka as a data source, you must specify the endpoint for the Kafka cluster and the path to a specific topic within the cluster. For example: LOAD DATA KAFKA 'host.example.com/my-topic'
AS LOAD DATA S3 'bucket-name': To use S3 as a data source, you must specify a bucket name or a bucket name and object name. For example: LOAD DATA S3 'bucket-name'
AS LOAD DATA ... FORMAT AVRO SCHEMA REGISTRY {"IP" | "Hostname"}: This option allows the pipeline to pull an Avro schema from Confluent Schema Registry. You can optionally use the CONFIG and CREDENTIALS clauses to specify configuration settings to connect to the schema registry over SSL.
Note
The following syntax assumes you are creating a filesystem pipeline that connects to the schema registry over SSL. The ssl. settings shown only apply to SingleStoreDB 7.3.5 and later.
CREATE PIPELINE ... AS LOAD DATA FS "/path/to/files/data.avro" INTO TABLE ... FORMAT AVRO SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port" ... CONFIG '{"schema.registry.ssl.certificate.location": "path-to-your-ssl-certificate-on-the-singlestore-db-node", "schema.registry.ssl.key.location": "path-to-your-ssl-certificate-key-on-the-singlestore-db-node", "schema.registry.ssl.ca.location": "path-to-your-ca-certificate-on-the-singlestore-db-node"}' CREDENTIALS '{"schema.registry.ssl.key.password": "your-password-for-the-ssl-certificate-key"}'
Note
You can use a subset of the ssl. settings as follows:
schema.registry.ssl.key.location, schema.registry.ssl.ca.location, and schema.registry.ssl.key.password
schema.registry.ssl.certificate.location, schema.registry.ssl.key.location, and schema.registry.ssl.key.password
schema.registry.ssl.key.password is only required if your SSL certificate key has a password.
As an alternative to specifying the SSL configuration settings in the CONFIG and CREDENTIALS clauses, you can install the registry's certificate (ca-cert) on all nodes in your SingleStoreDB cluster.
[BATCH_INTERVAL milliseconds]: You can specify a batch interval in milliseconds, which is the amount of time that the pipeline waits before checking the data source for new data, once all of the existing data has been loaded from the data source. If a batch interval is not specified, the default value is 2500. For example: LOAD DATA KAFKA '127.0.0.1/my-topic' BATCH_INTERVAL 500
[MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]: Specifies the maximum number of batch partitions that can be scheduled into a single batch. Useful for limiting the parallelism of a pipeline on large clusters to prevent the pipeline from throttling system resources.
[MAX_RETRIES_PER_BATCH_PARTITION max_retries]: Specifies the maximum number of retries for each batch partition to write batch partition data to the destination table. If a batch transaction fails at any point, for example during extraction from a data source, optional transformation, or loading of the data into the destination table, it will be retried up to the max_retries specified in this clause. If no value is specified or MAX_RETRIES_PER_BATCH_PARTITION is set to 0, then max_retries is set to the value specified by the PIPELINES_MAX_RETRIES_PER_BATCH_PARTITION engine variable.
[RESOURCE POOL pool_name]: Specifies the resource pool that is used to load pipeline data.
If the resource pool is not specified at the time of pipeline creation, then the pipeline uses the user’s default resource pool. If no default resource pool has been set for the user, then the pipeline uses the value of the resource_pool engine variable as its resource pool.
In order to create a new resource pool, the user will need to have SUPER or CREATE_RG privileges.
Use information_schema.USER_PRIVILEGES to see users and their related privileges. See USER_PRIVILEGES for more information.
To obtain details about a resource pool, use select * from information_schema.mv_resource_pool_status to review the current running threads on a cluster for all nodes.
The background tasks of a pipeline run in its resource pool. Therefore, a resource pool used by a pipeline cannot be dropped.
The user who creates the pipeline must have permissions to use the resource pool. The pipeline will continue to use the pool even if the right to use the pool is revoked for the user who created the pool.
For more information on resource pools, see Set Resource Limits.
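For illustration, a sketch that pins a pipeline to a pool; the pool, topic, and table names are hypothetical, and the pool must already exist and be usable by the creating user:
CREATE PIPELINE p_pooled AS
LOAD DATA KAFKA 'host.example.com/my-topic'
RESOURCE POOL my_pool
INTO TABLE t;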
Note
A good starting point for troubleshooting memory issues in pipelines is to run SELECT * FROM information_schema.mv_processlist. See mv_processlist for more information.
[AGGREGATOR]: Specifying CREATE AGGREGATOR PIPELINE tells SingleStoreDB to pull data through the aggregator, instead of directly to the leaves. This option can be more efficient for low-parallelism pipelines, like single-file S3 loads or single-partition Kafka topics (an aggregator pipeline is not required for single-partition Kafka topics). An aggregator pipeline is required for pipelines into reference tables and tables with auto increment columns. A stored procedure cannot be used with an aggregator pipeline.
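For illustration, a sketch of an aggregator pipeline into a reference table; the bucket and table names are hypothetical and the credential values are placeholders:
CREATE AGGREGATOR PIPELINE p_ref AS
LOAD DATA S3 'my-bucket/single-file.csv'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"}'
INTO TABLE ref_table
FIELDS TERMINATED BY ',';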
Note
SingleStoreDB does not support the LOAD DATA ... [REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY } ERRORS]
semantics for ingesting data into columnstore tables with unique keys. As an alternative, you can use the ON DUPLICATE KEY UPDATE clause of CREATE PIPELINE, or write a stored procedure that handles duplicate key errors and have the pipeline call this procedure.
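For illustration, a hedged sketch of the ON DUPLICATE KEY UPDATE alternative; the topic, table, and column names are hypothetical:
CREATE PIPELINE p_upsert AS
LOAD DATA KAFKA 'host.example.com/my-topic'
INTO TABLE t
FIELDS TERMINATED BY ','
(id, @val)
SET val = @val
ON DUPLICATE KEY UPDATE val = @val;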
Example
The following example shows how to use multiple SET
clauses with CREATE PIPELINE ... LOAD DATA
for ingesting values to columns using variables. Consider the following file:
echo "1,NULL,2020-08-06 07:53:09" >> /pipeline_test_infile/infile.csv echo "2,2020-08-06 07:53:09,2020-09-06 07:53:09" > /pipeline_test_infile/infile.csv echo "3,2020-08-06 07:53:09,NULL" >> /pipeline_test_infile/infile.csv
Create a pipeline that ingests infile.csv
into the following table, orders:
CREATE TABLE orders ( ID INT, del_t1 DATETIME, del_t2 DATETIME);
CREATE PIPELINE order_load AS LOAD DATA FS '/pipeline_test_infile/' INTO TABLE orders FIELDS TERMINATED BY ',' (ID, @del_t1, @del_t2) SET del_t1 = IF(@del_t1='NULL',NULL,@del_t1), del_t2 = IF(@del_t2='NULL',NULL,@del_t2);
Test the pipeline:
TEST PIPELINE order_load; **** +------+---------------------+---------------------+ | ID | del_t1 | del_t2 | +------+---------------------+---------------------+ | 1 | NULL | 2020-08-06 07:53:09 | | 2 | 2020-08-06 07:53:09 | 2020-09-06 07:53:09 | | 3 | 2020-08-06 07:53:09 | NULL | +------+---------------------+---------------------+
Loading Data into a Pipeline Using JSON
When using JSON formatted data, the mapping clause should be used. Without this clause, the data mapping will be inferred based on its schema. If a field is nullable and has not been specified using the mapping clause, a NULL value will be returned instead of an error.
Below are examples of loading data into a pipeline using JSON.
The following examples use a JSON key pair and an array from a local file source.
CREATE TABLE keypairs(`key` varchar(10), `value` varchar (10)); CREATE PIPELINE jtest AS LOAD DATA FS '<file path>/keypairs.json' INTO TABLE keypairs FORMAT JSON(`key` <- keypairs::`key`,`value` <- keypairs::`value`); START PIPELINE jtest; SELECT * FROM keypairs; ******* +------+--------+ | key | value | +------+--------+ | 1 | 1 | +------+--------+
Warning
To use reserved words in a table, backticks are required. The example above uses key
and value
as column names by using backticks.
CREATE TABLE teams(basketball varchar(50), baseball varchar (50), football varchar(50), hockey varchar(50)); CREATE PIPELINE teams_list AS LOAD DATA FS '<file path>/jtinsert.json' INTO TABLE teams FORMAT JSON (basketball <- teams::basketball, baseball <- teams::baseball, football <- teams::football, hockey <- teams::hockey); START PIPELINE teams_list; SELECT * FROM teams; ******* +------------+-----------+----------+--------+ | basketball | baseball | football | hockey | +------------+-----------+----------+--------+ | Lakers | Dodgers | Raiders | Kings | +------------+-----------+----------+--------+
The following examples use a JSON key pair and an array from an S3 bucket.
CREATE TABLE keypairs(`key` varchar(10), `value` varchar (10)); CREATE PIPELINE jtest AS LOAD DATA S3 's3://<bucket_name>/<filename>.json' CONFIG '{"region":"us-west-2"}' CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXXXXX", "aws_secret_access_key": "XXXXXXXXXXX"}' INTO TABLE keypairs FORMAT JSON (`key` <- keypairs::`key`, `value` <- keypairs::`value`); START PIPELINE jtest; SELECT * FROM keypairs; ******* +------+--------+ | key | value | +------+--------+ | 1 | 1 | +------+--------+
CREATE TABLE teams(basketball varchar(50), baseball varchar (50), football varchar(50), hockey varchar(50)); CREATE PIPELINE teams_list AS LOAD DATA S3 's3://<bucket_name>/<filename>.json' CONFIG '{"region":"us-west-2"}' CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXXXXX", "aws_secret_access_key": "XXXXXXXXXXX"}' INTO TABLE teams FORMAT JSON (basketball <- teams::basketball, baseball <- teams::baseball, football <- teams::football, hockey <- teams::hockey); START PIPELINE teams_list; SELECT * FROM teams; ******* +------------+-----------+----------+--------+ | basketball | baseball | football | hockey | +------------+-----------+----------+--------+ | Lakers | Dodgers | Raiders | Kings | +------------+-----------+----------+--------+
Loading JSON Data from a CSV File
To use an ENCLOSED BY <char> as a terminating field, a TERMINATED BY clause is needed. Instances of an ENCLOSED BY <char> appearing within a field value can be duplicated, and they will be understood as a single occurrence of the character.
If ENCLOSED BY '"' is used, quotes are treated as follows:
"The ""NEW"" employee" → The "NEW" employee
The "NEW" employee → The "NEW" employee
The ""NEW"" employee → The ""NEW"" employee
Example 1
An ENCLOSED BY clause is required when a CSV file has a JSON column enclosed with double quotes (" ").
CREATE TABLE employees(emp_id int, data JSON);
csv file contents **** emp_id,data 159,"{""name"": ""Damien Karras"", ""age"": 38, ""city"": ""New York""}"
CREATE OR REPLACE PIPELINE emps AS LOAD DATA FS '/tmp/<file_name>.csv' INTO TABLE employees FIELDS TERMINATED BY ',' ENCLOSED BY '"' IGNORE 1 LINES; START PIPELINE emps; SELECT * FROM employees; **** +--------+-----------------------------------------------------+ | emp_id | data | +--------+-----------------------------------------------------+ | 159 | {"age":38,"city":"New York","name":"Damien Karras"} | +--------+-----------------------------------------------------+
Example 2
An ESCAPED BY
clause is required when a character is specified as an escape character for a string. The example below uses a backslash (\) as the escape character.
csv file contents **** emp_id,data 298,"{\"name\": \"Bill Denbrough\", \"age\": 25, \"city\": \"Bangor\"}"
CREATE OR REPLACE PIPELINE emps2 AS LOAD DATA FS '/tmp/<file_name>.csv' INTO TABLE employees FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY '\\' IGNORE 1 LINES; START PIPELINE emps2; SELECT * FROM employees; **** +--------+-----------------------------------------------------+ | emp_id | data | +--------+-----------------------------------------------------+ | 298 | {"age":25,"city":"Bangor","name":"Bill Denbrough"} | | 159 | {"age":38,"city":"New York","name":"Damien Karras"} | +--------+-----------------------------------------------------+
Example 3
This example will fail as the JSON field in the CSV file is not in the correct format.
csv file contents **** emp_id,data 410,"{"name": "Annie Wilkes", "age": 45, "city":"Silver Creek"}"
CREATE OR REPLACE PIPELINE emps3 AS LOAD DATA FS '/tmp/<file_name>.csv' INTO TABLE employees FIELDS TERMINATED BY ',' ENCLOSED BY '"' IGNORE 1 LINES; START PIPELINE emps3; **** ERROR 1262 (01000): Leaf Error (127.0.0.1:3307): Row 1 was truncated; it contained more data than there were input columns
Example 4
An ENCLOSED BY
clause is required when a CSV file has a JSON column enclosed with curly brackets ({ }).
csv file contents **** emp_id,data 089,{"name": "Wilbur Whateley","age": 62,"city": "Dunwich"}
CREATE OR REPLACE PIPELINE emps4 AS LOAD DATA FS '/tmp/<file_name>.csv' INTO TABLE employees FIELDS TERMINATED BY ',' ENCLOSED BY '{' IGNORE 1 LINES; START PIPELINE emps4; SELECT * FROM employees; **** +--------+------------------------------------------------------+ | emp_id | data | +--------+------------------------------------------------------+ | 298 | {"age":25,"city":"Bangor","name":"Bill Denbrough"} | | 159 | {"age":38,"city":"New York","name":"Damien Karras"} | | 89 | {"age":62,"city":"Dunwich","name":"Wilbur Whateley"} | +--------+------------------------------------------------------+
WITH TRANSFORM
Creates a pipeline that uses a transform. A transform is an optional, user-provided program, such as a Python script. For more information, see CREATE PIPELINE ... WITH TRANSFORM.
INTO PROCEDURE
Creates a pipeline that uses a stored procedure to shape the data that is extracted from the pipeline's data source. For more information, see CREATE PIPELINE ... INTO PROCEDURE.