CREATE PIPELINE

Create a new Pipeline to continuously extract, shape, and load data into a table or stored procedure.

Note

When you run a CREATE PIPELINE ... LOAD DATA command, it processes extracted and transformed records as if you ran LOAD DATA alone, using the same options. These options are explained in LOAD DATA.

CREATE PIPELINE ... LOAD DATA also has additional options that are not available in LOAD DATA. These additional options are explained in the LOAD DATA section at the end of this topic.

Syntax

CREATE [OR REPLACE] [AGGREGATOR] PIPELINE [IF NOT EXISTS] <pipeline_name> AS
LOAD DATA { kafka_configuration | s3_configuration | filesystem_configuration
| azure_blob_configuration | hdfs_configuration | gcs_configuration | link_configuration}
[BATCH_INTERVAL <milliseconds>]
[MAX_PARTITIONS_PER_BATCH <max_partitions_per_batch>]
[MAX_RETRIES_PER_BATCH_PARTITION <max_retries>]
[RESOURCE POOL <resource_pool_name>]
[(ENABLE|DISABLE) OUT_OF_ORDER OPTIMIZATION]
[WITH TRANSFORM ('uri', 'executable', 'arguments [...]') ]
[REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY | PARSER } ERRORS]
{ INTO TABLE <table_name> | INTO PROCEDURE <procedure_name> }
{ <json_format_options> | <avro_format_options> | <parquet_format_options> | <csv_format_options> }
[ (<column_name>, ... ) ]
[SET <column_name> = <expression>,... | <pipeline_source_file>]
[WHERE <expression>,...]
[ON DUPLICATE KEY UPDATE <column_name> = <expression>, [...]]
[NULL DEFINED BY <string> [OPTIONALLY ENCLOSED]]
<kafka_configuration>:
KAFKA 'kafka_topic_endpoint'
<s3_configuration>:
S3 { '<bucket-name>' | '<bucket-name/path>' }
[CONFIG '<configuration_json>']
CREDENTIALS '<credentials_json>'
<filesystem_configuration>:
FS 'path'
[CONFIG '<configuration_json>']
<azure_blob_configuration>:
AZURE { '<container-name>' | '<container-name/object-name>' | '<container-name/prefix/object-name>' }
CREDENTIALS '<credentials_json>'
[CONFIG '<configuration_json>']
<hdfs_configuration>:
HDFS '<hdfs://<namenode DNS> | IP address>:<port>/<directory>'
[CONFIG '<configuration_json>']
<gcs_configuration>:
GCS { '<bucket-name>' | '<bucket-name/path>' }
CREDENTIALS '<credentials_json>'
[CONFIG '<configuration_json>']
<link_configuration>:
LINK <connection_name> '<path>'
<json_format_options>:
FORMAT JSON
( {<column_name> | @<variable_name>} <- <subvalue_path> [DEFAULT <literal_expr>], ...)
<avro_format_options>:
FORMAT AVRO SCHEMA REGISTRY {uri}
( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)
[SCHEMA '<avro_schema>']
<subvalue_path>:
{% | [%::]ident [::ident ...]}
<parquet_format_options>:
FORMAT PARQUET <parquet_subvalue_mapping> [TIMEZONE '<time_zone_name>']
<parquet_subvalue_mapping>:
( {<column_name> | @<variable_name>} <- <parquet_subvalue_path>, ...)
<parquet_subvalue_path>:
{ident [::ident ...]}
<csv_format_options>:
[FORMAT CSV]
[{FIELDS | COLUMNS}
TERMINATED BY '<string>'
[[OPTIONALLY] ENCLOSED BY '<char>']
[ESCAPED BY '<char>']
]
[LINES
[STARTING BY '<string>']
[TERMINATED BY '<string>']
]
[IGNORE <number> LINES]
[ ({<column_name> | @<variable_name>}, ...) ]

Note

This topic uses the term "batch", which is a subset of data that a pipeline extracts from its data source. For more information on batches, see The Lifecycle of a Pipeline.

Note

The REPLACE capability of pipelines maintains the cursor positions when replacing existing pipelines.

Remarks

  • Pipeline names are always case-sensitive for operations that refer to pipelines.

  • If the OR REPLACE clause is provided and a pipeline with pipeline_name already exists, then the CREATE query will alter that pipeline to match the new definition. Its state, including its cursor positions, will be preserved by the CREATE.

  • SKIP CONSTRAINT ERRORS and SKIP DUPLICATE KEY ERRORS are unsupported with pipelines into stored procedures.

  • IGNORE, SKIP PARSER ERRORS, and SKIP ALL ERRORS are supported with CSV and Kafka JSON pipelines only.

  • REPLACE, SKIP CONSTRAINT ERRORS, and SKIP DUPLICATE KEY ERRORS are supported with non-CSV pipelines.

  • This command causes implicit commits. Refer to COMMIT for more information.

  • You can use the function pipeline_batch_id() in an expression in the SET clause. This function returns the id of the batch used to load the data. For example, given the table definition CREATE TABLE t(b_id INT, column_2 TEXT);, you could create this statement to load the batch id into the column b_id:

    CREATE PIPELINE p AS LOAD DATA ... INTO TABLE t(@b_id,column_2)
    ... SET b_id = pipeline_batch_id();
  • Pipeline names that begin with a number or are solely numeric require backticks (` `) around the pipeline name.

    CREATE PIPELINE `3e93587cb1` AS LOAD DATA KAFKA 'host.example.com/my-topic' INTO TABLE <table_name>;
  • Refer to the Permission Matrix for the required permission.
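As a sketch of the OR REPLACE remark above, the following statement redefines an existing pipeline with a different BATCH_INTERVAL. The pipeline name, endpoint, table name, and the 500 millisecond interval are placeholders chosen for illustration, and a pipeline with this name is assumed to exist already.

```sql
-- Assumes a pipeline named pipeline_name already exists.
-- Re-running CREATE with OR REPLACE alters it in place; per the remark
-- above, its state and cursor positions are preserved.
CREATE OR REPLACE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
BATCH_INTERVAL 500
INTO TABLE table_name;
```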

REPLACE, IGNORE or SKIP ERRORS

REPLACE, IGNORE or SKIP can be used when creating a pipeline. See Additional CREATE PIPELINE Examples for more information.

The SKIP ... ERRORS behavior allows you to specify an error scenario that, when encountered, discards an offending row. See LOAD DATA for more information.

Unlike SKIP ... ERRORS, which discards offending rows, IGNORE may change the inserted row’s data to ensure that it adheres to the table schema. See LOAD DATA for more information.
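The following is a minimal sketch of where the error-handling clause sits in a statement, following the syntax block at the top of this topic. It assumes a CSV filesystem pipeline, because SKIP PARSER ERRORS is listed above as supported for CSV pipelines. The path, pipeline name, and table name are placeholders.

```sql
-- The error-handling clause comes after the source configuration
-- and before INTO TABLE.
CREATE PIPELINE pipeline_name AS
LOAD DATA FS '/path/to/files/*'
SKIP PARSER ERRORS
INTO TABLE table_name
FIELDS TERMINATED BY ',';
```

Replacing SKIP PARSER ERRORS with IGNORE in the same position would, per the description above, adjust offending rows rather than discard them.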

The CREATE PIPELINE .. LINK statement loads data from the data provider using a connection link. To use this command, you only need to know the connection link name, not the connection details and configuration. However, you need the SHOW LINK permission, provided by your administrator, to use a connection link. This command supports connections to S3, Azure, GCS, HDFS, and Kafka only.

The following example creates a pipeline using an HDFS connection:

USE db1;
CREATE PIPELINE mypipeline AS LOAD DATA
LINK HDFS connection 'hdfs://hadoop-namenode:8020/path/to/files'
INTO TABLE my_table;
START PIPELINE mypipeline;

Kafka Pipeline Syntax

The following example statement demonstrates how to create a Kafka pipeline using the minimum required syntax:

Minimum Required Kafka Pipeline Syntax:

CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
INTO TABLE table_name;
START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses a Kafka cluster as the data source, points to the location of the my-topic topic at the Kafka cluster’s endpoint, and will start ingesting data into table_name. For a tutorial on getting started with Kafka Pipelines, see Load Data from Kafka Using a Pipeline.

Kafka Pipeline Syntax Using the CONFIG and CREDENTIALS Clauses

The following statement creates a Kafka pipeline to load data from Confluent Cloud, using the CONFIG and CREDENTIALS clauses.

CREATE PIPELINE pipeline_name AS LOAD DATA KAFKA '<Confluent Cloud cluster endpoint>/test'
CONFIG '{"sasl.username": "<CLUSTER_API_KEY>",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL",
"ssl.ca.location": "<CA certificate file path>"}'
CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"}'
INTO TABLE table_name;

For a tutorial on getting started with Kafka Pipelines, see Load Data from Kafka Using a Pipeline.

Note

By default, records may be inserted out of order if the SingleStore cluster is sufficiently behind in loading records from the source. If you require the records to be inserted in order, e.g. in upsert scenarios, pass the DISABLE OUT_OF_ORDER OPTIMIZATION clause to CREATE PIPELINE or ALTER PIPELINE. When this optimization is disabled, records from the same source file or source partition will be inserted in the order in which they appear in that partition. However, records from different source files or source partitions may still be inserted in an arbitrary order with respect to each other.
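As an illustration of the clause described in this note, a Kafka pipeline that must preserve per-partition ordering could be sketched as follows. The endpoint, pipeline name, and table name are placeholders; the clause position follows the syntax block at the top of this topic.

```sql
-- Records from the same Kafka partition are inserted in source order.
-- Ordering across different partitions is still not guaranteed.
CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
DISABLE OUT_OF_ORDER OPTIMIZATION
INTO TABLE table_name;
```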

Kafka Pipelines do not support Kafka transactions.

Compressed topics are supported without requiring any additional configuration in SingleStore.

A CREATE PIPELINE ... KAFKA statement cannot specify more than one topic.

In a CREATE PIPELINE ... KAFKA statement, you can specify the CONFIG and CREDENTIALS clauses. As an alternative, you can reference a connection link. For more information, see Configuring and Using Connection Links.

Specifying Multiple Kafka Brokers

The following example creates a pipeline where three Kafka brokers are specified. They are located on port 9092, at host1.example.com, host2.example.com and host3.example.com.

CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host1.example.com:9092,host2.example.com:9092,host3.example.com:9092/my-topic'
INTO TABLE table_name;

S3 Pipeline Syntax

The following example statement demonstrates how to create an S3 pipeline using the minimum required syntax:

Minimum Required S3 Pipeline Syntax:

CREATE PIPELINE pipeline_name AS
LOAD DATA S3 'bucket-name'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id",
"aws_secret_access_key": "your_secret_access_key",
"aws_session_token": "your_temp_session_token",
"aws_web_identity_token": "your_aws_web_identity_token",
"role_arn":"your_role_arn"}'
INTO TABLE `table_name`;
START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses an S3 bucket named bucket-name as the data source, and will start ingesting the bucket’s objects into table_name. Credentials for your S3 bucket can either be in the form of an AWS access key or an Amazon Resource Name (ARN).

No CONFIG clause is required to create an S3 pipeline. This clause is used to specify the Amazon S3 region where the source bucket is located. If no CONFIG clause is specified, SingleStore will automatically use the us-east-1 region, also known as US Standard in the Amazon S3 console. To specify a different region, such as us-west-1, include a CONFIG clause as shown in the example below.

The CONFIG clause can also be used to specify the suffixes for files to load or to specify the disable_gunzip condition. The suffixes are a JSON array of strings. When specified, CREATE PIPELINE only loads files that have the specified suffix. Suffixes in the CONFIG clause can be specified without a . before them, for example, CONFIG '{"suffixes": ["csv"]}'.

When enabled, the disable_gunzip option disables the decompression of files with the .gz extension. If this option is disabled or missing, files with the .gz extension will be decompressed.
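The CONFIG options described above can be combined in one JSON object. The following sketch assumes a bucket in us-west-1 that contains CSV files, some of them with a .gz extension that should be loaded without decompression. The bucket name, credentials, and table name are placeholders, and combining all three keys in a single CONFIG is an assumption made for illustration.

```sql
-- Load only files ending in "csv" and leave .gz files compressed.
CREATE PIPELINE pipeline_name AS
LOAD DATA S3 'bucket-name'
CONFIG '{"region": "us-west-1",
"suffixes": ["csv"],
"disable_gunzip": true}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id",
"aws_secret_access_key": "your_secret_access_key"}'
INTO TABLE `table_name`;
```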

The request_payer field name may be added to the CONFIG clause.

CREATE OR REPLACE PIPELINE <pipeline_name>
AS LOAD DATA S3 'data-test-bucket'
CONFIG '{"region": "us-east-1","request_payer": "requester"}'
CREDENTIALS '{"aws_access_key_id": "ANIAVX7U2LM9QVJMK2ZT",
"aws_secret_access_key": "xxxxxxxxxxxxxxxxxxxxxxx"}'
INTO TABLE `market_data`
(ts, timestamp, event_type, ticker, price, quantity, exchange, conditions);

Note

S3 pipelines support the * wildcard in the bucket path, but not the bucket name. For example, you could create this pipeline: CREATE PIPELINE p AS LOAD DATA S3 'my_bucket/my_path*' ....

As an alternative to specifying the CONFIG and CREDENTIALS clauses in a CREATE PIPELINE ... S3 statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.

S3 Pipeline Using Specified Region

CREATE PIPELINE pipeline_name AS
LOAD DATA S3 'bucket-name'
CONFIG '{"region": "us-west-1"}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id",
"aws_secret_access_key": "your_secret_access_key"
[, "aws_session_token": "your_temp_session_token"][, "role_arn":"your_role_arn"]}'
INTO TABLE `table_name`;

S3 Pipeline Using Metadata Garbage Collection (GC)

By default, each pipeline maintains progress metadata for every known file name, for the lifetime of the pipeline. The optional ENABLE OFFSETS METADATA GC clause of CREATE PIPELINE causes the created pipeline to periodically discard metadata for already-loaded files, using a timestamp-based strategy that preserves exactly-once semantics. This option is only supported for S3 pipelines.

The number of batch metadata entries retained before being overwritten by new incoming batches may be adjusted by changing the default value of the global engine variable pipelines_batches_metadata_to_keep. See Sync Variables Lists for more information.

CREATE PIPELINE <pipeline_name> AS LOAD DATA S3 "<file_name>"
CONFIG '{"region": "us-west-1"}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id",
"aws_secret_access_key": "your_secret_access_key"
[, "aws_session_token": "your_temp_session_token"][, "role_arn":"your_role_arn"]}'
(ENABLE|DISABLE) OFFSETS METADATA GC
INTO TABLE <table_name>;

Note

By default, the METADATA GC clause is disabled. Once it has been enabled, it may be disabled by using the ALTER PIPELINE command.

Azure Blob Pipeline Syntax

The following example statement demonstrates how to create an Azure Blob pipeline using the minimum required syntax. Note that Azure Blob Pipelines are only available in MemSQL 5.8.5 and above.

Minimum Required Azure Pipeline Syntax:

CREATE PIPELINE pipeline_name AS
LOAD DATA AZURE 'container-name'
CREDENTIALS '{"account_name": "my_account_name", "account_key": "my_account_key"}'
INTO TABLE `table_name`;
START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses an Azure Blob container named container-name as the data source, and will start ingesting the container’s blobs into table_name. For a tutorial on getting started with Azure Pipelines, see Load Data from Azure Blob Storage Using a Pipeline.

Note that no CONFIG clause is required to create an Azure pipeline unless you need to specify the suffixes for files to load or the disable_gunzip condition. These suffixes are a JSON array of strings. When specified, CREATE PIPELINE only loads files that have the specified suffix. Suffixes in the CONFIG clause can be specified without a . before them, for example, CONFIG '{"suffixes": ["csv"]}'. When enabled, the disable_gunzip option disables the decompression of files with the .gz extension. If this option is disabled or missing, files with the .gz extension will be decompressed.
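As a sketch of the optional CONFIG clause described above, the following Azure pipeline loads only files with the csv suffix and leaves .gz files compressed. The container name, credentials, and table name are placeholders, and the clause order follows the azure_blob_configuration syntax at the top of this topic.

```sql
-- CREDENTIALS precedes CONFIG in the Azure configuration syntax.
CREATE PIPELINE pipeline_name AS
LOAD DATA AZURE 'container-name'
CREDENTIALS '{"account_name": "my_account_name", "account_key": "my_account_key"}'
CONFIG '{"suffixes": ["csv"], "disable_gunzip": true}'
INTO TABLE `table_name`;
```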

An Azure pipeline must authenticate with Azure before it can begin reading blobs from a container. The pipeline requires you to provide an account name and account access key.

The CREDENTIALS clause should be a JSON object with two fields:

account_name: this is the account name under which your blob container resides. This is usually a human-readable name, given by the person who created the account.

account_key: usually an 88 character 512-bit string linked to a storage account.

As an alternative to specifying the CONFIG and CREDENTIALS clauses in a CREATE PIPELINE ... AZURE statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.

See the Azure documentation about viewing and managing Azure Access Keys to learn more.

Permissions and Policies

An Azure pipeline can be created to read from either a container or a blob. Both of these resource types may be configured with access policies or permissions. Before creating an Azure pipeline, it’s important to consider both the provided user credentials and any existing policies or permissions on the desired resource.

For example, if you provide credentials that implicitly grant access to all blobs in a container, you may not need to add or modify any access policies or permissions on the desired resource. However, if you provide credentials that do not have permission to access the resource, an administrator will need to allow access for your credentials.

Consider the following scenarios:

  • Read all objects in a container: An Azure pipeline configured for a container will automatically read all blobs it has access to. Changes to a container-level policy or permissions may be required.

  • Read all blobs with a given prefix: An Azure pipeline configured to read all objects in a container with a given prefix may require changes to a container-level policy with a prefix in order to allow access to the desired blobs.

  • Read a specific object in a container: An Azure pipeline configured for a specific blob may require changes to the policies and permissions for both the blob and container.

Filesystem Pipeline Syntax

The following example statement demonstrates how to create a Filesystem pipeline using the minimum required syntax:

CREATE PIPELINE pipeline_name
AS LOAD DATA FS '/path/to/files/*'
INTO TABLE `table_name`
FIELDS TERMINATED BY ',';
START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses a directory as the data source, and will start ingesting data into table_name.

To disable the decompression of files with the .gz extension, enable the disable_gunzip option in the CONFIG clause. If this option is disabled or missing, files with the .gz extension will be decompressed.

CONFIG '{"disable_gunzip" : true}'

In order to ensure a pipeline processes zero byte files, enable the process_zero_byte_files option in the CONFIG clause. If this option is disabled or missing, zero byte files will be skipped by default.

CONFIG '{"process_zero_byte_files" : true}'
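For context, the CONFIG fragments above belong between the FS path and the rest of the statement, as in the filesystem_configuration syntax at the top of this topic. The sketch below sets both options in one CONFIG object, which is an assumption made for illustration; the path and names are placeholders.

```sql
-- Keep .gz files compressed and also process zero byte files.
CREATE PIPELINE pipeline_name
AS LOAD DATA FS '/path/to/files/*'
CONFIG '{"disable_gunzip": true, "process_zero_byte_files": true}'
INTO TABLE `table_name`
FIELDS TERMINATED BY ',';
```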

For a tutorial on getting started with Filesystem Pipelines, see Load Data from the Filesystem Using a Pipeline.

Filesystem Paths and Permissions

The paths used by the Unix filesystem extractor address files which must be accessible from every node in the cluster. The most common way to use this extractor is with a distributed filesystem, such as NFS. The path provided to a Unix filesystem extractor must be absolute, and will usually include a Unix glob pattern which matches many files, as seen in the example above. In order for the Unix filesystem extractor to load data, the files must grant read permissions to the SingleStore Linux user.

Azure and S3 Folder Paths

If an Azure Blob container or S3 bucket contains multiple folders beginning with similar names, the pipeline will read the data from all of those folders unless a forward slash (/) is included at the end of the path.

Below is an Azure example using three folders that begin with payment in their names. The forward slash ensures only the data in the Payment folder is loaded.

  • Payment

  • PaymentHistory

  • PaymentPlan

CREATE OR REPLACE PIPELINE pipeline_csv_insert_payment AS
LOAD DATA AZURE 'uploads/csv/Payment/'
CREDENTIALS
'{"account_name": "<account-name>",
"account_key": "<account-key>"
}'
INTO PROCEDURE spPipelineCsvInsertPayment
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\r\n'
IGNORE 1 LINES;

Below is an S3 bucket example using two folders: BooksNautical and BooksHorror. The forward slash ensures only the data in the BooksNautical folder is loaded.

CREATE PIPELINE books AS
LOAD DATA S3 's3://test-bucket5-ss/BooksNautical/'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXX",
"aws_secret_access_key": "XXXXXXXXXX"}'
INTO TABLE all_books
FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY '\\'
LINES TERMINATED BY '\r\n' STARTING BY ''
IGNORE 1 LINES;

HDFS Pipeline Syntax

The following example statement demonstrates how to create an HDFS pipeline using the minimum required syntax:

CREATE PIPELINE pipeline_name
AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/files'
INTO TABLE `table_name`
FIELDS TERMINATED BY '\t';
START PIPELINE pipeline_name;

This example creates a new pipeline named pipeline_name and references the HDFS path /path/to/files as the data source. Once the pipeline is started, the HDFS extractor recursively walks the directory tree in the HDFS path, looking for MapReduce output files. Any files with names matching the regular expression part-(m|r)-(.*) are imported into table_name. The import occurs only if there are additional files in the directory that don’t match the regular expression; this check confirms that MapReduce created a SUCCESS file.

If you would like to create a pipeline that imports Hive output files, set the disable_partial_check attribute in your CREATE PIPELINE’s CONFIG JSON to true. When the pipeline runs, the extractor will import files as described in the previous paragraph, but will not perform the check for additional files in the directory.
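A minimal sketch of such a pipeline, reusing the placeholder path and names from the example above, might look like this:

```sql
-- disable_partial_check skips the check for additional files
-- (such as a SUCCESS file) in the directory.
CREATE PIPELINE pipeline_name
AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/files'
CONFIG '{"disable_partial_check": true}'
INTO TABLE `table_name`
FIELDS TERMINATED BY '\t';
```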

You also specify attributes in CREATE PIPELINE’s CONFIG clause when you use advanced HDFS pipelines mode. In this mode, you can encrypt your pipeline’s connection to HDFS and you can authenticate your pipeline using Kerberos.

To disable the decompression of files with the .gz extension, enable the disable_gunzip option in the CONFIG clause. If this option is disabled or missing, files with the .gz extension will be decompressed.

CONFIG '{"disable_gunzip" : true}'

As an alternative to specifying the CONFIG clause in a CREATE PIPELINE ... HDFS statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.

For a tutorial on getting started with HDFS Pipelines, see Load Data from HDFS Using a Pipeline.

GCS Pipeline Syntax

The following example statement demonstrates how to create a GCS pipeline using the minimum required syntax:

CREATE PIPELINE pipeline_name
AS LOAD DATA GCS 'bucket-name'
CREDENTIALS '{"access_id": "your_google_access_key", "secret_key": "your_google_secret_key"}'
INTO TABLE `table_name`
FIELDS TERMINATED BY ',';
START PIPELINE pipeline_name;

This example creates a new pipeline named pipeline_name, uses a GCS bucket named bucket-name as the data source, and ingests the bucket’s objects into table_name. A GCS pipeline requires authentication to Google before it can start reading objects from a bucket. It supports only HMAC keys and requires a GCS access key and secret key.

To disable the decompression of files with the .gz extension, enable the disable_gunzip option in the CONFIG clause. If this option is disabled or missing, files with the .gz extension will be decompressed.

CONFIG '{"disable_gunzip" : true}'
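In a complete statement, the CONFIG fragment above follows the CREDENTIALS clause, as in the gcs_configuration syntax at the top of this topic. The bucket name, keys, and table name below are placeholders.

```sql
-- CREDENTIALS precedes CONFIG in the GCS configuration syntax.
CREATE PIPELINE pipeline_name
AS LOAD DATA GCS 'bucket-name'
CREDENTIALS '{"access_id": "your_google_access_key", "secret_key": "your_google_secret_key"}'
CONFIG '{"disable_gunzip": true}'
INTO TABLE `table_name`
FIELDS TERMINATED BY ',';
```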

Note

GCS pipelines support the * wildcard in the bucket path, but not the bucket name. For example, you could create this pipeline: CREATE PIPELINE p AS LOAD DATA GCS 'my_bucket/my_path*' ....

For a tutorial on getting started with GCS Pipelines, see Load Data from Google Cloud Storage (GCS) Using a Pipeline.

Authentication and Access Control in GCS

A GCS pipeline requires you to authenticate to the desired bucket. Depending on the provided credentials, a bucket’s access permissions or policies may require changes to allow access. To familiarize yourself with access management for buckets and objects, see the GCS documentation.

Authentication

A GCS pipeline must authenticate to Google before it can begin reading objects from a bucket. While GCS allows anonymous authentication to buckets and their objects, SingleStore GCS pipelines support only HMAC keys. The pipeline requires you to provide a GCS access key and secret key.

The CREDENTIALS clause should be a JSON object with two fields:

access_id: usually a 24 or 60 character alphanumeric string that is linked to the Google account. It is typically all uppercase and starts with GOOG.

secret_key: usually a 40 character Base-64 encoded string that is linked to a specific access_id.

...
CREDENTIALS '{"access_id": "your_google_access_key", "secret_key": "your_google_secret_key"}'
...

See the GCS documentation about managing GCS HMAC keys to learn more.

As an alternative to specifying the CONFIG and CREDENTIALS clauses in a CREATE PIPELINE ... GCS statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.

GCS Pipeline Limitations

A GCS pipeline has a few limitations and also inherits limitations from the Google Cloud Storage service itself. See the GCS Documentation for more detailed information.

  • Versioning: If your GCS pipeline is configured to read from a version-enabled bucket, you will only receive the latest version of the object from the bucket. Currently, you cannot configure your GCS pipeline to read specific version IDs.

  • 5 TB Max Object Size: Google Cloud Storage supports a maximum object size of 5 TB, and this limit also applies to a GCS pipeline.

  • Rate Limiting: A large SingleStore cluster configured with a GCS pipeline might encounter throttling or rate limiting imposed by GCS. Currently, a GCS pipeline cannot be configured to bypass these limits by reducing the frequency of read requests. For more information on GCS’s rate limiting and how to optimize your data, see this GCS documentation page.

Creating a Parquet Pipeline

The CREATE PIPELINE .. FORMAT PARQUET statement extracts specified fields from records in Parquet files. These files may be located in an Azure, HDFS, S3, or filesystem data source.

The statement assigns the extracted fields to the columns of a new row to be inserted into table_name or passed to proc_name. The fields can also be assigned to temporary values in the SET clause, for use in SQL transformations.

Rows that don’t match the WHERE clause are not included.

Parquet 2.0 encodings are supported.

Parquet Pipelines do not support Kafka.

When you write a CREATE PIPELINE .. FORMAT PARQUET statement, you include the LOAD DATA clause. This clause supports a subset of the error recovery options that are supported by CSV LOAD DATA.

Caution

Parquet files that are compressed using the internal Parquet writer should not have a .gz extension in the file name.

Extracting Parquet Values

parquet_subvalue_mapping specifies the mapping between fields in an input Parquet file and columns in the destination table or temporary variables that are specified in the SET clause.

CREATE PIPELINE .. FORMAT PARQUET uses the ::-separated list of field names in a parquet_subvalue_path to perform successive field name lookups in nested Parquet group schemas. The last field in parquet_subvalue_path must not itself have group type. Additionally, there must be no fields with repeated type along parquet_subvalue_path, and the final field in the path must have a primitive type. Extracting whole records or lists of Parquet as JSON is unsupported, as is extracting individual list elements.

If an optional field along a parquet_subvalue_path is omitted in a given record, the extracted value for that column will be NULL.

parquet_subvalue_path components containing whitespace or punctuation must be surrounded by backticks.

For example, consider a Parquet record with the following schema:

message m {
required int64 f1;
optional group g1 {
optional int64 f2;
optional int64 f3;
}
}

In an instance of this schema whose JSON equivalent is {"f1":1, "g1":{"f2":2, "f3":null}}, the parquet_subvalue_path values f1, g1::f2, and g1::f3 will extract the values 1, 2, and NULL, respectively. If the JSON equivalent is instead {"f1":1, "g1":null}, the same paths will extract 1, NULL, and NULL, respectively.
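A pipeline that maps these three paths to columns could be sketched as follows. The destination table t, its column names, the pipeline name, and the file path are hypothetical; the clause order mirrors the complete example later in this topic.

```sql
-- Hypothetical destination table for the schema m shown above.
CREATE TABLE t(c1 BIGINT, c2 BIGINT, c3 BIGINT);

-- c2 and c3 receive NULL when g1, or the field inside it, is absent.
CREATE PIPELINE p
AS LOAD DATA FS '/path/to/files/*.parquet'
INTO TABLE t
(c1 <- f1,
c2 <- g1::f2,
c3 <- g1::f3)
FORMAT PARQUET;
```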

If you are loading .parquet data from an S3 bucket, ensure that non-empty _SUCCESS, _committed, and _started files in the S3 folder are not deleted before the data in the files is loaded into the destination table.

Converting Parquet Values

After a parquet_subvalue_path is evaluated, its value is converted to an unspecified SQL type which may be further explicitly or implicitly converted as if from a SQL string, as shown in the following table. The converted value is written to a column in the destination table or is written to a temporary variable specified in the SET clause.

Parquet Type            Converted Value
boolean                 "1"/"0"
int32                   The string representation of the integer.
int64                   The string representation of the integer.
int96                   See Converting Parquet Time Values.
float                   SQL NULL if not finite. Otherwise, a string convertible without loss of precision to FLOAT.
double                  SQL NULL if not finite. Otherwise, a string convertible without loss of precision to DOUBLE.
binary                  Verbatim, from input bytes.
fixed_len_byte_array    Verbatim, from input bytes.

Converting Parquet Logical Types

Unsigned integer logical type annotations on int32 and int64 types are respected.

The decimal annotation on binary and fixed_len_byte_array types will trigger conversion as if to a numeric literal compatible with a SQL DECIMAL type of the same scale and precision.

All other logical type annotations are ignored and have no effect on conversion.

Converting Parquet Time Values

Values of int96 type will be converted as if to datetime literals truncated to microsecond precision. The underlying value will be converted to the SingleStore cluster’s time zone according to the TIMEZONE clause, with a default heuristic in place when the clause is omitted.

Time zone conversions may be necessary because some Parquet writers, including Hive, convert time values to UTC before encoding them as int96. Others, including Impala, perform no conversion and write values as they appear in the writer’s local time zone. Parquet files provide no definitive information about the time zone of int96 data.

When the TIMEZONE clause is omitted, SingleStore will attempt to automatically perform time zone conversions based on imperfect information in file metadata. This default heuristic may produce incorrect results. Providing the value of @@time_zone to the TIMEZONE clause will disable this behavior and guarantee no conversion.

When the TIMEZONE clause is provided, SingleStore will assume that encoded data is in the specified time zone, converting it to the SingleStore time zone.

Times outside years 0001 through 9999 will be converted to NULL and a warning will be emitted. In addition, if the pipeline is performing time zone conversions, then the valid range is further restricted to times between 1970-01-01 and 2038-01-19 03:14:07. The validity check is performed after converting to the SingleStore time zone.

No automatic integer-to-datetime conversion occurs for time values encoded as int64 or int32, even if the logical type is a time type like timestamp. Use the SET clause to explicitly convert such values to a DATETIME compatible form.
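One way to perform such a conversion is sketched below. It assumes a hypothetical int64 field named event_ts that holds microseconds since the Unix epoch. The table, pipeline, and field names are illustrative only. FROM_UNIXTIME is a standard SQL function that takes seconds, so the value is scaled down first and sub-second precision is dropped in this sketch. The result is interpreted in the session time zone.

```sql
-- Hypothetical table; event_ts is assumed to be microseconds since epoch.
CREATE TABLE events(id BIGINT, ts DATETIME);

CREATE PIPELINE events_pipe
AS LOAD DATA FS '/path/to/files/*.parquet'
INTO TABLE events
(id <- id,
@ts_micros <- event_ts)
FORMAT PARQUET
-- Convert microseconds to whole seconds, then to a DATETIME value.
SET ts = FROM_UNIXTIME(FLOOR(@ts_micros / 1000000));
```

If the source field uses a different unit, such as milliseconds, the divisor would need to change accordingly.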

Example

Consider a Parquet file with the following schema:

message m1 {
required boolean f1;
required fixed_len_byte_array(4) f2;
optional group g1 {
optional binary f3 (STRING);
required int64 f4;
}
}

example.parquet contains four records whose JSON representation is:

{"f1": false, "f2": "rec1", "g1": null}
{"f1": true, "f2": "rec2", "g1": null}
{"f1": true, "f2": "rec3", "g1": {"f3": null, "f4": 3}}
{"f1": true, "f2": "rec4", "g1": {"f3": "four", "f4": 4}}

Create a pipeline that ingests example.parquet into table parquet_tbl:

CREATE TABLE parquet_tbl(c2 BLOB, c3 BLOB, c4 BIGINT UNSIGNED);
CREATE PIPELINE parquet_pipe
AS LOAD DATA FS "example.parquet"
INTO TABLE parquet_tbl
(@v1 <- f1,
@v2 <- f2,
c3 <- g1::f3,
c4 <- g1::f4)
FORMAT PARQUET
SET c2 = UPPER(CONVERT(@v2, CHAR))
WHERE @v1 = TRUE;

Test the pipeline:

TEST PIPELINE parquet_pipe;
+------+------+------+
| c3   | c4   | c2   |
+------+------+------+
| NULL | NULL | REC2 |
| NULL |    3 | REC3 |
| four |    4 | REC4 |
+------+------+------+

Note the following about the output of TEST PIPELINE parquet_pipe;:

  • The first record in the JSON representation of example.parquet was not included in the output because the WHERE clause, using the temporary variable @v1, filters out rows where f1 is false.

  • The first row of the output contains NULL for columns c3 and c4 because the optional group g1 is null in the corresponding (second) record of the JSON representation.

  • The second row of the output contains NULL for column c3 because the optional field g1::f3 is null in the corresponding (third) record of the JSON representation.

  • The column c2 in the output contains uppercase values. This is because the column is set to the uppercase value of the temporary variable @v2, which is set to the value of f2 in each record in the JSON representation.

LOAD DATA

Note

Pipelines natively load JSON, Avro, and CSV input data; see LOAD DATA for example syntax.

When you run a CREATE PIPELINE ... LOAD DATA command, it processes extracted and transformed records as if you ran LOAD DATA alone, using the same options. These options are explained in LOAD DATA.

CREATE PIPELINE ... LOAD DATA also has additional options that are not available in LOAD DATA. These additional options are explained below.

The SCHEMA REGISTRY {"IP" | "Hostname"} option allows LOAD DATA to pull an Avro schema from a schema registry. For more information, see the Avro Schema Evolution with Pipelines topic.

AS LOAD DATA: You can load data by specifying the data source as a Kafka cluster and topic, an S3 bucket or object (with optional prefix), or a file.

  • AS LOAD DATA KAFKA 'topic_endpoint': To use Kafka as a data source, you must specify the endpoint for the Kafka cluster and the path to a specific topic within the cluster. For example:

    LOAD DATA KAFKA 'host.example.com/my-topic'
  • AS LOAD DATA S3 'bucket-name': To use S3 as a data source, you must specify a bucket name or a bucket name and object name. For example:

    LOAD DATA S3 'bucket-name'
  • AS LOAD DATA ... FORMAT AVRO SCHEMA REGISTRY {"IP" | "Hostname"}: This option allows the pipeline to pull an Avro schema from Confluent Schema Registry. You can optionally use the CONFIG and CREDENTIALS clauses to specify configuration settings to connect to the schema registry over SSL.

    Note

    The following syntax assumes you are creating a filesystem pipeline that connects to the schema registry over SSL. The ssl. settings shown only apply to SingleStore 7.3.5 and later.

    CREATE PIPELINE ... AS LOAD DATA FS "/path/to/files/data.avro"
    INTO TABLE ...
    FORMAT AVRO
    SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
    ...
    CONFIG '{"schema.registry.ssl.certificate.location": "path-to-your-ssl-certificate-on-the-singlestore-db-node",
    "schema.registry.ssl.key.location": "path-to-your-ssl-certificate-key-on-the-singlestore-db-node",
    "schema.registry.ssl.ca.location": "path-to-your-ca-certificate-on-the-singlestore-db-node"}'
    CREDENTIALS '{"schema.registry.ssl.key.password": "your-password-for-the-ssl-certificate-key"}'

    Note

    You can use a subset of the ssl. settings as follows:

    • schema.registry.ssl.key.location, schema.registry.ssl.ca.location, and schema.registry.ssl.key.password

    • schema.registry.ssl.certificate.location, schema.registry.ssl.key.location, and schema.registry.ssl.key.password

    schema.registry.ssl.key.password is only required if your SSL certificate key has a password.

    As an alternative to specifying the SSL configuration settings in the CONFIG and CREDENTIALS clauses, you can install the registry's certificate (ca-cert) on all nodes in your SingleStore cluster.

  • [BATCH_INTERVAL milliseconds]: You can specify a batch interval in milliseconds, which is the amount of time that the pipeline waits before checking the data source for new data, once all of the existing data has been loaded from the data source. If a batch interval is not specified, the default value is 2500. For example:

    LOAD DATA KAFKA '127.0.0.1/my-topic'
    BATCH_INTERVAL 500
  • [MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]: Specifies the maximum number of batch partitions that can be scheduled into a single batch. Useful for limiting the parallelism of a pipeline on large clusters to prevent the pipeline from throttling system resources.

  • [MAX_RETRIES_PER_BATCH_PARTITION max_retries]: Specifies the maximum number of retries for each batch partition to write batch partition data to the destination table. If a batch transaction fails at any point, for example during extraction from a data source, optional transformation, or loading of the data into the destination table, it is retried up to the max_retries value specified in this clause. If no value is specified or MAX_RETRIES_PER_BATCH_PARTITION is set to 0, then max_retries is set to the value specified by the PIPELINES_MAX_RETRIES_PER_BATCH_PARTITION engine variable.

  • [RESOURCE POOL pool_name]: Specifies the resource pool that is used to load pipeline data.

    • If the resource pool is not specified at the time of pipeline creation, then the pipeline uses the user’s default resource pool. If no default resource pool has been set for the user, then the pipeline uses the value of the resource_pool engine variable as its resource pool.

      • In order to create a new resource pool, the user will need to have SUPER or CREATE_RG privileges.

      • Use information_schema.USER_PRIVILEGES to see users and their related privileges. See USER_PRIVILEGES for more information.

    • To obtain details about a resource pool, use SELECT * FROM information_schema.mv_resource_pool_status to review the currently running threads on a cluster for all nodes.

    • The background tasks of a pipeline run in its resource pool. Therefore, a resource pool used by a pipeline cannot be dropped.

    • The user who creates the pipeline must have permissions to use the resource pool. The pipeline will continue to use the pool even if the right to use the pool for the pipeline is revoked for the user who created the pipeline.

    • For more information on resource pools, see Set Resource Limits.

    Note

    A good starting point for troubleshooting memory issues in pipelines is to run SELECT * FROM information_schema.mv_processlist. See mv_processlist for more information.

  • [AGGREGATOR]: Specifying CREATE AGGREGATOR PIPELINE tells SingleStore to pull data through the aggregator, instead of directly to the leaves. This option can be more efficient for low parallelism pipelines, like single file S3 loads or single partition Kafka topics (an aggregator pipeline is not required for single-partition Kafka topics). An aggregator pipeline is required for pipelines into reference tables and tables with auto increment columns. A stored procedure cannot be used with an aggregator pipeline.
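
The batch and resource options described above can be combined in a single statement. The following is a minimal sketch that follows the clause order given in the Syntax section. The names events_pipe, events, and events_pool are placeholders, and the resource pool is assumed to exist already.

```sql
-- Hypothetical names: events_pipe, events, and events_pool are placeholders.
CREATE PIPELINE events_pipe AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
BATCH_INTERVAL 500                  -- wait 500 ms before checking for new data
MAX_PARTITIONS_PER_BATCH 4          -- schedule at most 4 batch partitions per batch
MAX_RETRIES_PER_BATCH_PARTITION 3   -- retry each batch partition up to 3 times
RESOURCE POOL events_pool           -- load pipeline data using this pool
INTO TABLE events
FIELDS TERMINATED BY ',';
```

Suitable values depend on the cluster size and the data source. The numbers shown are illustrative only.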

Note

SingleStore does not support the LOAD DATA ... [REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY } ERRORS] semantics for ingesting data into columnstore tables with unique keys. As an alternative, you can use the ON DUPLICATE KEY UPDATE clause of CREATE PIPELINE or write a stored procedure that handles duplicate key errors, and have the Pipeline call this procedure.
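
As an illustration of the ON DUPLICATE KEY UPDATE alternative, the following sketch counts how often each value appears in the input. The table sku_seen, the pipeline sku_seen_load, and the file path are hypothetical, and the table definition assumes a SingleStore version that supports unique hash keys on columnstore tables.

```sql
-- Hypothetical columnstore table with a unique key on sku.
CREATE TABLE sku_seen (
  sku INT,
  times_seen INT DEFAULT 1,
  SORT KEY (sku),
  SHARD KEY (sku),
  UNIQUE KEY (sku) USING HASH
);

-- Each input line holds one sku value. A repeated sku increments the
-- counter on the existing row instead of raising a duplicate key error.
CREATE PIPELINE sku_seen_load AS
LOAD DATA FS '/pipeline_test_infile/skus/'
INTO TABLE sku_seen
FIELDS TERMINATED BY ','
(sku)
ON DUPLICATE KEY UPDATE times_seen = times_seen + 1;
```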

Example

The following example shows how to use multiple SET clauses with CREATE PIPELINE ... LOAD DATA for ingesting values to columns using variables. Consider the following file:

echo "1,NULL,2020-08-06 07:53:09" >> /pipeline_test_infile/infile.csv
echo "2,2020-08-06 07:53:09,2020-09-06 07:53:09" >> /pipeline_test_infile/infile.csv
echo "3,2020-08-06 07:53:09,NULL" >> /pipeline_test_infile/infile.csv

Create a pipeline that ingests infile.csv into the following table orders.

CREATE TABLE orders (
ID INT,
del_t1 DATETIME,
del_t2 DATETIME);
CREATE PIPELINE order_load AS
LOAD DATA FS '/pipeline_test_infile/'
INTO TABLE orders
FIELDS TERMINATED BY ','
(ID, @del_t1, @del_t2)
SET del_t1 = IF(@del_t1='NULL',NULL,@del_t1),
del_t2 = IF(@del_t2='NULL',NULL,@del_t2);

Test the pipeline:

TEST PIPELINE order_load;
+------+---------------------+---------------------+
| ID   | del_t1              | del_t2              |
+------+---------------------+---------------------+
|    1 | NULL                | 2020-08-06 07:53:09 |
|    2 | 2020-08-06 07:53:09 | 2020-09-06 07:53:09 |
|    3 | 2020-08-06 07:53:09 | NULL                |
+------+---------------------+---------------------+

Loading Data into a Pipeline Using JSON

When loading JSON-formatted data, use the mapping clause to specify how fields map to columns. Without this clause, the data mapping is inferred based on its schema. If a field is nullable and has not been specified in the mapping clause, a NULL value is returned instead of an error.

Below are examples for loading data into a pipeline using JSON.

The following examples use a JSON key pair and an array from a local file source.

CREATE TABLE keypairs(`key` VARCHAR(10), `value` VARCHAR(10)); 
CREATE PIPELINE jtest AS
LOAD DATA FS '<file path>/keypairs.json'
INTO TABLE keypairs
FORMAT JSON(`key` <- keypairs::`key`,`value` <- keypairs::`value`); 
START PIPELINE jtest;
SELECT * FROM keypairs;
+------+--------+
| key  | value  | 
+------+--------+
|    1 | 1      |
+------+--------+

Warning

To use reserved words as column names, enclose them in backticks. The example above uses key and value as column names by enclosing them in backticks.

CREATE TABLE teams(basketball VARCHAR(50), baseball VARCHAR (50),
football VARCHAR(50), hockey VARCHAR(50));
CREATE PIPELINE teams_list AS
LOAD DATA FS '<file path>/jtinsert.json'
INTO TABLE teams
FORMAT JSON
(basketball <- teams::basketball,
baseball <- teams::baseball,
football <- teams::football,
hockey <- teams::hockey);
START PIPELINE teams_list;
SELECT * FROM teams;
+------------+-----------+----------+--------+
| basketball | baseball  | football | hockey |
+------------+-----------+----------+--------+
|  Lakers    | Dodgers   | Raiders  | Kings  |
+------------+-----------+----------+--------+

The following examples use a JSON key pair and an array from an S3 bucket.

CREATE TABLE keypairs(`key` VARCHAR(10), `value` VARCHAR (10));
CREATE PIPELINE jtest AS
LOAD DATA S3 's3://<bucket_name>/<filename>.json'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXXXXX",
"aws_secret_access_key": "XXXXXXXXXXX"}'
INTO TABLE keypairs
FORMAT JSON
(`key` <- keypairs::`key`,
`value` <- keypairs::`value`);
START PIPELINE jtest;
SELECT * FROM keypairs;
+------+--------+
| key  | value  | 
+------+--------+
|    1 | 1      |
+------+--------+

CREATE TABLE teams(basketball VARCHAR(50), baseball VARCHAR (50),
football VARCHAR(50), hockey VARCHAR(50));

Below are the contents of the JSON file used in the pipeline example:

{
"teams": [{
"basketball": "Knicks"
},
{
"baseball": "Yankees"
},
{
"football": "Giants"
},
{
"hockey": "Rangers"
}
]
}

CREATE PIPELINE teams_list AS
LOAD DATA S3 's3://<bucket_name>/<filename>.json'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id": "XXXXXXXXXXXXXXXXX",
"aws_secret_access_key": "XXXXXXXXXXX"}'
INTO TABLE teams
FORMAT JSON
(basketball <- teams::basketball,
baseball <- teams::baseball,
football <- teams::football,
hockey <- teams::hockey);
START PIPELINE teams_list;
SELECT * FROM teams;
+------------+-----------+----------+--------+
| basketball | baseball  | football | hockey |
+------------+-----------+----------+--------+
|  Lakers    | Dodgers   | Raiders  | Kings  |
+------------+-----------+----------+--------+

Loading JSON Data from a CSV File

To use an ENCLOSED BY <char> clause, a TERMINATED BY clause is also needed. An instance of the ENCLOSED BY <char> that appears within a field value can be doubled, and the doubled characters are read as a single occurrence of the character.

If ENCLOSED BY '"' is used, quotes are treated as follows:

  • "The ""NEW"" employee"  → The "NEW" employee

  • The "NEW" employee → The "NEW" employee

  • The ""NEW"" employee → The ""NEW"" employee
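
The three cases above can be tried out with a small test file. The following is a sketch only: the file /tmp/quotes.csv, the table quote_demo, and the pipeline quote_demo_load are hypothetical names introduced for illustration.

```sql
-- Hypothetical contents of /tmp/quotes.csv:
-- 1,"The ""NEW"" employee"
-- 2,The "NEW" employee
-- 3,The ""NEW"" employee
CREATE TABLE quote_demo (id INT, note VARCHAR(50));

CREATE PIPELINE quote_demo_load AS
LOAD DATA FS '/tmp/quotes.csv'
INTO TABLE quote_demo
FIELDS TERMINATED BY ','
ENCLOSED BY '"';

-- Preview the parsed rows without writing them to quote_demo.
TEST PIPELINE quote_demo_load;
```

Based on the rules listed above, the note column for rows 1 and 2 would be expected to read The "NEW" employee, while row 3 would keep its doubled quotes because the field is not enclosed.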

Example 1

An ENCLOSED BY clause is required when a CSV file has a JSON column enclosed with double quotes (" ").

CREATE TABLE employees(emp_id INT, data JSON);
csv file contents
emp_id,data
159,"{""name"": ""Damien Karras"", ""age"": 38, ""city"": ""New York""}"
CREATE OR REPLACE PIPELINE emps AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
IGNORE 1 LINES;
START PIPELINE emps;
SELECT * FROM employees;
+--------+-----------------------------------------------------+
| emp_id | data                                                |
+--------+-----------------------------------------------------+
|    159 | {"age":38,"city":"New York","name":"Damien Karras"} |
+--------+-----------------------------------------------------+

Example 2

An ESCAPED BY clause is required when a character is specified as an escape character for a string. The example below uses a backslash (\) as the escape character.

csv file contents
emp_id,data
298,"{\"name\": \"Bill Denbrough\", \"age\": 25, \"city\": \"Bangor\"}"
CREATE OR REPLACE PIPELINE emps2 AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
ESCAPED BY '\\'
IGNORE 1 LINES;
START PIPELINE emps2;
SELECT * FROM employees;
+--------+-----------------------------------------------------+
| emp_id | data                                                |
+--------+-----------------------------------------------------+
|    298 | {"age":25,"city":"Bangor","name":"Bill Denbrough"}  |
|    159 | {"age":38,"city":"New York","name":"Damien Karras"} |
+--------+-----------------------------------------------------+

Example 3

This example fails because the JSON field in the CSV file is not in the correct format: the double quotes inside the enclosed field are neither doubled nor escaped.

csv file contents
emp_id,data
410,"{"name": "Annie Wilkes", "age": 45, "city":"Silver Creek"}"
CREATE OR REPLACE PIPELINE emps3 AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
IGNORE 1 LINES;
START PIPELINE emps3;
ERROR 1262 (01000): Leaf Error (127.0.0.1:3307): Row 1 was truncated; it contained more data than there were input columns
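
One way to make this file loadable, following the approach in Example 1, is to double every double quote inside the enclosed JSON field. The sketch below shows the corrected file contents as comments and reuses the same pipeline definition. No output is shown because it has not been run here.

```sql
-- Corrected csv file contents (inner quotes doubled, as in Example 1):
-- emp_id,data
-- 410,"{""name"": ""Annie Wilkes"", ""age"": 45, ""city"":""Silver Creek""}"
CREATE OR REPLACE PIPELINE emps3 AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
IGNORE 1 LINES;
START PIPELINE emps3;
```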

Example 4

An ENCLOSED BY clause is required when a CSV file has a JSON column enclosed with curly brackets ({ }).

csv file contents
emp_id,data
089,{"name": "Wilbur Whateley","age": 62,"city": "Dunwich"}
CREATE OR REPLACE PIPELINE emps4 AS
LOAD DATA FS '/tmp/<file_name>.csv'
INTO TABLE employees
FIELDS TERMINATED BY ','
ENCLOSED BY '{'
IGNORE 1 LINES;
START PIPELINE emps4;
SELECT * FROM employees;
+--------+------------------------------------------------------+
| emp_id | data                                                 |
+--------+------------------------------------------------------+
|    298 | {"age":25,"city":"Bangor","name":"Bill Denbrough"}   |
|    159 | {"age":38,"city":"New York","name":"Damien Karras"}  |
|     89 | {"age":62,"city":"Dunwich","name":"Wilbur Whateley"} |
+--------+------------------------------------------------------+

WITH TRANSFORM

Creates a pipeline that uses a transform. A transform is an optional, user-provided program, such as a Python script. For more information, see CREATE PIPELINE ... WITH TRANSFORM.
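
As a rough sketch of where the clause goes, the three values below correspond to the uri, executable, and arguments placeholders in the Syntax section. The URI, the executable name, the file path, and the table name are all hypothetical, and the example assumes the URI points to an archive that contains the executable.

```sql
-- The URI, executable name, path, and table are placeholders for illustration.
CREATE PIPELINE transform_demo AS
LOAD DATA FS '/tmp/transform_demo/'
WITH TRANSFORM ('http://example.com/my-transform.tar.gz', 'my-transform.py', '')
INTO TABLE transform_demo_tbl
FIELDS TERMINATED BY ',';
```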

INTO PROCEDURE

Creates a pipeline that uses a stored procedure to shape the data that is extracted from the pipeline's data source. For more information, see CREATE PIPELINE ... INTO PROCEDURE.
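
The following is a minimal sketch of the pattern, assuming a hypothetical table readings and a hypothetical procedure readings_proc. The procedure receives each batch as a query-type variable and decides which rows to insert.

```sql
-- Hypothetical destination table.
CREATE TABLE readings (sensor_id INT, reading DOUBLE);

-- Hypothetical procedure: receives each batch as a query-type variable
-- and inserts only the rows that have a non-NULL sensor_id.
DELIMITER //
CREATE OR REPLACE PROCEDURE readings_proc(batch QUERY(sensor_id INT, reading DOUBLE)) AS
BEGIN
  INSERT INTO readings(sensor_id, reading)
  SELECT sensor_id, reading FROM batch WHERE sensor_id IS NOT NULL;
END //
DELIMITER ;

-- The pipeline passes each batch to the procedure instead of a table.
CREATE PIPELINE readings_load AS
LOAD DATA FS '/tmp/readings/'
INTO PROCEDURE readings_proc
FIELDS TERMINATED BY ',';
```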

Last modified: March 6, 2024
