SingleStore DB

CREATE PIPELINE

Create a new Pipeline to continuously extract, shape, and load data into a table or stored procedure.

Notice

SingleStore Managed Service does not support Filesystem Pipelines.

Syntax
CREATE [OR REPLACE] [AGGREGATOR] PIPELINE [IF NOT EXISTS] <pipeline_name> AS
  LOAD DATA { kafka_configuration | s3_configuration | filesystem_configuration
            | azure_blob_configuration | hdfs_configuration | gcs_configuration | link_configuration}
    [BATCH_INTERVAL <milliseconds>]
    [MAX_PARTITIONS_PER_BATCH <max_partitions_per_batch>]
    [MAX_RETRIES_PER_BATCH_PARTITION max_retries]
    [RESOURCE POOL <resource_pool_name>]
    [(ENABLE|DISABLE) OUT_OF_ORDER OPTIMIZATION]
    [WITH TRANSFORM ('uri', 'executable', 'arguments [...]') ]
  [REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY | PARSER } ERRORS]
  { INTO TABLE <table_name> | INTO PROCEDURE <procedure_name> }
  { <json_format_options> | <avro_format_options> | <parquet_format_options> | <csv_format_options> }
  [ (<column_name>, ... ) ]
  [SET  <column_name> = <expression>,... | <pipeline_source_file>]
  [WHERE <expression>,...]
  [ON DUPLICATE KEY UPDATE <column_name> = <expression>, [...]]

  <kafka_configuration>:
    KAFKA 'kafka_topic_endpoint'

  <s3_configuration>:
    S3 { '<bucket-name>' | '<bucket-name/path>' }
      [CONFIG '<configuration_json>']
      CREDENTIALS '<credentials_json>'

  <filesystem_configuration>:
    FS 'path'
      [CONFIG '<configuration_json>']

  <azure_blob_configuration>:
    AZURE { '<container-name>' | '<container-name/object-name>' | '<container-name/prefix/object-name>' }
      CREDENTIALS '<credentials_json>'
      [CONFIG '<configuration_json>']

  <hdfs_configuration>:
    HDFS '<hdfs://<namenode DNS> | IP address>:<port>/<directory>'
      [CONFIG '<configuration_json>']

  <gcs_configuration>:
   GCS { '<bucket-name>' | '<bucket-name/path>' }
     CREDENTIALS '<credentials_json>'
     [CONFIG '<configuration_json>']

  <link_configuration>:
   LINK [<database_name>.]<connection_name> '<path>'

  <json_format_options>:
    FORMAT JSON
    ( {<column_name> | @<variable_name>} <- <subvalue_path> [DEFAULT <literal_expr>], ...)    
    [KAFKA KEY ( {<column_name> | @<variable_name>} <- <subvalue_path> [DEFAULT <literal_expr>], ...)]
  
  <avro_format_options>:
    FORMAT AVRO SCHEMA REGISTRY {<IP address> | <hostname>}:<port>
    ( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)
    [SCHEMA '<avro_schema>']
    [KAFKA KEY ( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)]
    
  <subvalue_path>:
    {% | [%::]ident [::ident ...]}

  <parquet_format_options>:
    FORMAT PARQUET <parquet_subvalue_mapping> [TIMEZONE '<time_zone_name>']

  <parquet_subvalue_mapping>:
    ( {<column_name> | @<variable_name>} <- <parquet_subvalue_path>, ...)

  <parquet_subvalue_path>:
    {ident [::ident ...]}

  <csv_format_options>:
    [FORMAT CSV]
    [{FIELDS | COLUMNS}
     TERMINATED BY '<string>'
       [[OPTIONALLY] ENCLOSED BY '<char>']
       [ESCAPED BY '<char>']
    ]
    [LINES
      [STARTING BY '<string>']
      [TERMINATED BY '<string>']
    ]
    [IGNORE <number> LINES]
    [ ({<column_name> | @<variable_name>}, ...) ]

Note

This topic uses the term "batch", which is a subset of data that a pipeline extracts from its data source. For more information on batches, see The Lifecycle of a Pipeline.

Notice

The REPLACE capability of pipelines maintains the cursor positions when replacing existing pipelines.

Remarks
  • If the OR REPLACE clause is provided and a pipeline with pipeline_name already exists, then the CREATE query will alter that pipeline to match the new definition. Its state, including its cursor positions, will be preserved by the CREATE.

  • CONFIG and CREDENTIALS can be specified in either order (CONFIG followed by CREDENTIALS or CREDENTIALS followed by CONFIG).

  • SKIP CONSTRAINT ERRORS and SKIP DUPLICATE KEY ERRORS are unsupported with pipelines into stored procedures.

  • IGNORE, SKIP PARSER ERRORS, and SKIP ALL ERRORS are unsupported with non-CSV pipelines.

  • REPLACE, SKIP CONSTRAINT ERRORS, and SKIP DUPLICATE KEY ERRORS are supported with non-CSV pipelines.

  • Pipelines can be used to ingest and process data with supported character sets. For more information, see Working with Character Sets and Collations.

Kafka Pipeline Syntax

The following example statement demonstrates how to create a Kafka pipeline using the minimum required syntax:

Minimum Required Kafka Pipeline Syntax:

CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
INTO TABLE `table_name`;

START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses a Kafka cluster as the data source, points to the location of the my-topic topic at the Kafka cluster’s endpoint, and will start ingesting data into table_name. For a tutorial on getting started with Kafka Pipelines, see Load Data from Kafka Using a Pipeline.

Notice

By default, records may be inserted out of order if the SingleStore cluster is sufficiently behind in loading records from the source. If you require the records to be inserted in order, e.g. in upsert scenarios, pass the DISABLE OUT_OF_ORDER OPTIMIZATION clause to CREATE PIPELINE or ALTER PIPELINE. When this optimization is disabled, records from the same source file or partition will be inserted in the order in which they appear in that partition. However, records from different files or partitions may still be inserted in an arbitrary order with respect to each other.
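
For example, a minimal sketch of a pipeline with this optimization disabled (the endpoint, topic, and table names are placeholders) might look like:

CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
-- Insert records from each source partition in the order they appear in that partition
DISABLE OUT_OF_ORDER OPTIMIZATION
INTO TABLE `table_name`;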

Compressed topics are supported without requiring any additional configuration in SingleStore.

A CREATE PIPELINE ... KAFKA statement cannot specify more than one topic.

In a CREATE PIPELINE ... KAFKA statement, you can specify the CONFIG and CREDENTIALS clauses. As an alternative, you can reference a connection link. For more information, see Configuring and Using Connection Links.
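
For example, a Kafka pipeline that connects to a broker over SASL_SSL can pass librdkafka-style settings through these clauses. The following is a sketch only; the property values, port, and file paths are assumptions for illustration:

CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com:9093/my-topic'
-- CONFIG and CREDENTIALS each take a JSON object; these SASL/SSL settings are illustrative
CONFIG '{"security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN", "ssl.ca.location": "/path/to/ca-cert.pem"}'
CREDENTIALS '{"sasl.username": "your_username", "sasl.password": "your_password"}'
INTO TABLE `table_name`;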

Kafka pipelines that use FORMAT JSON or FORMAT AVRO SCHEMA REGISTRY can use KAFKA KEY to extract keys from Kafka records. Keys must be in the format specified in the FORMAT clause.

Example: Accessing Kafka Keys in a Pipeline

The following CREATE PIPELINE statement creates a new pipeline named pipeline_name, uses a Kafka cluster as the data source, points to the location of the my-topic topic at the Kafka cluster’s endpoint, and will start ingesting data into table_name when the pipeline is started. The pipeline expects the payload of each Kafka record to be in JSON format and have a field named a, which the pipeline maps to a column named a in the destination table. In addition, the pipeline expects the key of each Kafka record to be in JSON format and have a field named b, which the pipeline maps to a column named b in the destination table.

CREATE TABLE table_name (a INT, b TEXT);

CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
INTO TABLE `table_name`
FORMAT JSON (a <- a)
KAFKA KEY (b <- b);

START PIPELINE pipeline_name;

Inserting a record into the my-topic topic with key {"b": "hello"} and payload {"a": 1} would result in the following row in the destination table:

+------+-------+
| a    | b     |
+------+-------+
|    1 | hello |
+------+-------+

Kafka Pipeline Support for Transactions

A Kafka pipeline can be configured to use one of the following isolation levels, which determine whether the pipeline supports Kafka transactions.

  • read_committed: The pipeline reads transactionally written messages from Kafka partitions only after they have been committed. It also reads messages that were not written transactionally.

  • read_uncommitted: The pipeline reads messages that were not written transactionally, as well as transactionally written messages, regardless of whether they have been committed.

For more information, see the Reading Transactional Messages section of the KafkaConsumer topic in the Apache documentation.

The isolation level, isolation.level, can be set using the CONFIG option of a CREATE PIPELINE statement. For example:

CREATE PIPELINE ... KAFKA ... CONFIG '{"isolation.level": "read_uncommitted"}' ...

By default, the isolation level is read_committed.
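
A complete statement setting the isolation level might look like the following sketch (the endpoint, topic, and table names are placeholders):

CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
-- Read messages regardless of whether their transactions have been committed
CONFIG '{"isolation.level": "read_uncommitted"}'
INTO TABLE `table_name`;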

Specifying Multiple Kafka Brokers

The following example creates a pipeline where three Kafka brokers are specified. They are located at host1.example.com, host2.example.com, and host3.example.com, all on port 9092.

CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host1.example.com:9092,host2.example.com:9092,host3.example.com:9092/my-topic'
INTO TABLE `table_name`;

S3 Pipeline Syntax

The following example statement demonstrates how to create an S3 pipeline using the minimum required syntax:

Minimum Required S3 Pipeline Syntax:

CREATE PIPELINE pipeline_name AS
LOAD DATA S3 'bucket-name'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"[, "aws_session_token": "your_temp_session_token"][, "role_arn":"your_role_arn"]}'
INTO TABLE `table_name`;

START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses an S3 bucket named bucket-name as the data source, and will start ingesting the bucket’s objects into table_name. Credentials for your S3 bucket can either be in the form of an AWS access key or an Amazon Resource Name (ARN).

No CONFIG clause is required to create an S3 pipeline. This clause is used to specify the Amazon S3 region where the source bucket is located. If no CONFIG clause is specified, SingleStore DB will automatically use the us-east-1 region, also known as US Standard in the Amazon S3 console. To specify a different region, such as us-west-1, include a CONFIG clause as shown in the example below.

The CONFIG clause can also be used to specify the suffixes for files to load or to specify the disable_gunzip condition. Suffixes are specified as a JSON array of strings; when present, CREATE PIPELINE only loads files that have one of the specified suffixes. Suffixes in the CONFIG clause can be specified without a leading ., for example, CONFIG '{"suffixes": ["csv"]}'. When enabled, the disable_gunzip option disables the decompression of files with the .gz extension. If this option is disabled or missing, files with the .gz extension will be decompressed.
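
For example, a CONFIG clause that combines these options (region, suffixes, and disable_gunzip) might look like the following sketch:

CREATE PIPELINE pipeline_name AS
LOAD DATA S3 'bucket-name'
-- Load only .csv objects from us-west-1 and keep .gz objects compressed
CONFIG '{"region": "us-west-1", "suffixes": ["csv"], "disable_gunzip": true}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"}'
INTO TABLE `table_name`;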

Note

S3 pipelines support the * wildcard in the bucket path, but not the bucket name. For example, you could create this pipeline: CREATE PIPELINE p AS LOAD DATA S3 'my_bucket/my_path*' ....

As an alternative to specifying the CONFIG and CREDENTIALS clauses in a CREATE PIPELINE ... S3 statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.

For a tutorial on getting started with S3 Pipelines, see Load Data from Amazon S3 using a Pipeline.

S3 Pipeline Using Specified Region

CREATE PIPELINE pipeline_name AS
LOAD DATA S3 'bucket-name'
CONFIG '{"region": "us-west-1"}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"[, "aws_session_token": "your_temp_session_token"][, "role_arn":"your_role_arn"]}'
INTO TABLE `table_name`;

Azure Blob Pipeline Syntax

The following example statement demonstrates how to create an Azure Blob pipeline using the minimum required syntax. Note that Azure Blob Pipelines are only available in MemSQL 5.8.5 and above.

Minimum Required Azure Pipeline Syntax:

CREATE PIPELINE pipeline_name AS
LOAD DATA AZURE 'container-name'
CREDENTIALS '{"account_name": "my_account_name", "account_key": "my_account_key"}'
INTO TABLE `table_name`;

START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses an Azure Blob container named container-name as the data source, and will start ingesting the container’s blobs into table_name. For a tutorial on getting started with Azure Pipelines, see Load Data from Azure Blobs using a Pipeline.

Note that no CONFIG clause is required to create an Azure pipeline unless you need to specify the suffixes for files to load or the disable_gunzip condition. Suffixes are specified as a JSON array of strings; when present, CREATE PIPELINE only loads files that have one of the specified suffixes. Suffixes in the CONFIG clause can be specified without a leading ., for example, CONFIG '{"suffixes": ["csv"]}'. When enabled, the disable_gunzip option disables the decompression of files with the .gz extension. If this option is disabled or missing, files with the .gz extension will be decompressed.
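
For example, a sketch of a CONFIG clause restricting the load to .csv blobs and disabling gunzip decompression might look like:

CREATE PIPELINE pipeline_name AS
LOAD DATA AZURE 'container-name'
CREDENTIALS '{"account_name": "my_account_name", "account_key": "my_account_key"}'
-- Only load blobs ending in .csv; leave .gz blobs compressed
CONFIG '{"suffixes": ["csv"], "disable_gunzip": true}'
INTO TABLE `table_name`;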

An Azure pipeline must authenticate with Azure before it can begin reading blobs from a container. The pipeline requires you to provide an account name and account access key.

The CREDENTIALS clause should be a JSON object with two fields:

account_name: this is the account name under which your blob container resides. This is usually a human-readable name, given by the person who created the account.

account_key: usually an 88 character 512-bit string linked to a storage account.

As an alternative to specifying the CONFIG and CREDENTIALS clauses in a CREATE PIPELINE ... AZURE statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.

See the Azure documentation about viewing and managing Azure Access Keys to learn more.

Permissions and Policies

An Azure pipeline can be created to read from either a container or a blob. Both of these resource types may be configured with access policies or permissions. Before creating an Azure pipeline, it’s important to consider both the provided user credentials and any existing policies or permissions on the desired resource.

For example, if you provide credentials that implicitly grant access to all blobs in a container, you may not need to add or modify any access policies or permissions on the desired resource. However, if you provide credentials that do not have permission to access the resource, an administrator will need to allow access for your credentials.

Consider the following scenarios:

  • Read all objects in a container: An Azure pipeline configured for a container will automatically read all blobs it has access to. Changes to a container-level policy or permissions may be required.

  • Read all blobs with a given prefix: An Azure pipeline configured to read all objects in a container with a given prefix may require changes to a container-level policy with a prefix in order to allow access to the desired blobs.

  • Read a specific object in a container: An Azure pipeline configured for a specific blob may require changes to the policies and permissions for both the blob and container.

Filesystem Pipeline Syntax

The following example statement demonstrates how to create a Filesystem pipeline using the minimum required syntax:

CREATE PIPELINE pipeline_name
AS LOAD DATA FS '/path/to/files/*'
INTO TABLE `table_name`
FIELDS TERMINATED BY ',';

START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses a directory as the data source, and will start ingesting data into table_name.

To disable the decompression of files with the .gz extension, enable the disable_gunzip option in the CONFIG clause. If this option is disabled or missing, files with the .gz extension will be decompressed.

CONFIG '{"disable_gunzip" : true}'

In order to ensure a pipeline processes zero byte files, enable the process_zero_byte_files option in the CONFIG clause. If this option is disabled or missing, zero byte files will be skipped by default.

CONFIG '{"process_zero_byte_files" : true}'

For a tutorial on getting started with Filesystem Pipelines, see Load Data from the Filesystem using a Pipeline.

Filesystem Paths and Permissions

The paths used by the Unix filesystem extractor address files which must be accessible from every node in the cluster. The most common way to use this extractor is with a distributed filesystem, such as NFS. The path provided to a Unix filesystem extractor must be absolute, and will usually include a Unix glob pattern which matches many files, as seen in the example above. In order for the Unix filesystem extractor to load data, the files must grant read permissions to the SingleStore DB Linux user.

HDFS Pipeline Syntax

The following example statement demonstrates how to create an HDFS pipeline using the minimum required syntax:

CREATE PIPELINE pipeline_name
AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/files'
INTO TABLE `table_name`
FIELDS TERMINATED BY '\t';

START PIPELINE pipeline_name;

This example creates a new pipeline named pipeline_name and references the HDFS path /path/to/files as the data source. Once the pipeline is started, the HDFS extractor recursively walks the directory tree in the HDFS path, looking for MapReduce output files. Any files with names matching the regular expression part-(m|r)-(.*) are imported into table_name. The import occurs only if there are additional files in the directory that don’t match the regular expression; this check confirms that MapReduce created a _SUCCESS file.

If you would like to create a pipeline that imports Hive output files, set the disable_partial_check attribute in your CREATE PIPELINE’s CONFIG JSON to true. When the pipeline runs, the extractor will import files as described in the previous paragraph, but will not perform the check for additional files in the directory.
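
For example, a sketch of a pipeline that imports Hive output files (the HDFS path and identifiers are placeholders) might look like:

CREATE PIPELINE pipeline_name
AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/hive/output'
-- Skip the check for additional, non-matching files in the directory
CONFIG '{"disable_partial_check": true}'
INTO TABLE `table_name`
FIELDS TERMINATED BY '\t';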

You also specify attributes in CREATE PIPELINE’s CONFIG clause when you use advanced HDFS pipelines mode. In this mode, you can encrypt your pipeline’s connection to HDFS and you can authenticate your pipeline using Kerberos.

To disable the decompression of files with the .gz extension, enable the disable_gunzip option in the CONFIG clause. If this option is disabled or missing, files with the .gz extension will be decompressed.

CONFIG '{"disable_gunzip" : true}'

As an alternative to specifying the CONFIG clause in a CREATE PIPELINE ... HDFS statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.

For a tutorial on getting started with HDFS Pipelines, see Load Data from HDFS using a Pipeline.

GCS Pipeline Syntax

The following example statement demonstrates how to create a GCS pipeline using the minimum required syntax:

CREATE PIPELINE pipeline_name
AS LOAD DATA GCS 'bucket-name'
CREDENTIALS '{"access_id": "your_google_access_key", "secret_key": "your_google_secret_key"}'
INTO TABLE `table_name`
FIELDS TERMINATED BY ',';

START PIPELINE pipeline_name;

This example creates a new pipeline named pipeline_name, uses a GCS bucket named bucket-name as the data source, and ingests the bucket’s objects into table_name. A GCS pipeline requires authentication to Google before it can start reading objects from a bucket. It supports only HMAC keys and requires a GCS access key and secret key.

To disable the decompression of files with the .gz extension, enable the disable_gunzip option in the CONFIG clause. If this option is disabled or missing, files with the .gz extension will be decompressed.

CONFIG '{"disable_gunzip" : true}'

Note

GCS pipelines support the * wildcard in the bucket path, but not the bucket name. For example, you could create this pipeline: CREATE PIPELINE p AS LOAD DATA GCS 'my_bucket/my_path*' ....

For a tutorial on getting started with GCS Pipelines, see Load Data from Google Cloud Storage (GCS) using a Pipeline.

Authentication and Access Control in GCS

A GCS pipeline requires you to authenticate to the desired bucket. Depending on the provided credentials, a bucket’s access permissions or policies may require changes to allow access. To familiarize yourself with access management for buckets and objects, see the GCS documentation.

Authentication

A GCS pipeline must authenticate to Google before it can begin reading objects from a bucket. While GCS allows anonymous authentication to buckets and their objects, SingleStore DB GCS pipelines support only HMAC keys. The pipeline requires you to provide a GCS access key and secret key.

The CREDENTIALS clause should be a JSON object with two fields:

access_id: usually a 24 or 60 character alphanumeric string linked to the Google account. It is typically all uppercase and starts with GOOG.

secret_key: usually a 40 character Base-64 encoded string that is linked to a specific access_id.

...
CREDENTIALS '{"access_id": "your_google_access_key", "secret_key": "your_google_secret_key"}'
...

See the GCS documentation about managing GCS HMAC keys to learn more.

As an alternative to specifying the CONFIG and CREDENTIALS clauses in a CREATE PIPELINE ... GCS statement, you can reference a connection link. For more information, see Configuring and Using Connection Links.

GCS Pipeline Limitations

A GCS pipeline has a few limitations and also inherits limitations from the Google Cloud Storage service itself. See the GCS Documentation for more detailed information.

  • Versioning: If your GCS pipeline is configured to read from a version-enabled bucket, you will only receive the latest version of the object from the bucket. Currently, you cannot configure your GCS pipeline to read specific version IDs.

  • 5 TB Max Object Size: Google Cloud Storage supports a maximum object size of 5 TB, and this limit also applies to a GCS pipeline.

  • Rate Limiting: A large SingleStore DB cluster configured with a GCS pipeline might encounter throttling or rate limiting imposed by GCS. Currently, a GCS pipeline cannot be configured to bypass these limits by reducing the frequency of read requests. For more information on GCS’s rate limiting and how to optimize your data, see this GCS documentation page.

Creating a Parquet Pipeline

The CREATE PIPELINE ... FORMAT PARQUET statement extracts specified fields from records in Parquet files. These files may be located in an Azure, HDFS, S3, or filesystem data source.

The statement assigns the extracted fields to the columns of a new row to be inserted into table_name or passed to proc_name. The fields can also be assigned to temporary values in the SET clause, for use in SQL transformations.

Rows that don’t match the WHERE clause are not included.

Parquet 2.0 encodings are supported.

Parquet Pipelines do not support Kafka.

When you write a CREATE PIPELINE ... FORMAT PARQUET statement, you include the LOAD DATA clause. This clause supports a subset of the error recovery options that are supported by CSV LOAD DATA.

Extracting Parquet Values

parquet_subvalue_mapping specifies the mapping between fields in an input Parquet file and columns in the destination table or temporary variables that are specified in the SET clause.

CREATE PIPELINE ... FORMAT PARQUET uses the ::-separated list of field names in a parquet_subvalue_path to perform successive field name lookups in nested Parquet group schemas. The last field in parquet_subvalue_path must not itself have group type. Additionally, there must be no fields with repeated type along parquet_subvalue_path; unlike with JSON and Avro, extracting sub-records or sub-lists of Parquet records into SQL JSON columns is not supported with Parquet.

If an optional field along a parquet_subvalue_path is omitted in a given record, the extracted value for that column will be NULL.

parquet_subvalue_path components containing whitespace or punctuation must be surrounded by backticks.
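
A mapping whose Parquet field names contain spaces could be written as follows; the file path, table, and field names here are hypothetical:

CREATE PIPELINE pipeline_name
  AS LOAD DATA FS "/path/to/example.parquet"
  INTO TABLE `table_name`
  -- Backticks are required because these Parquet field names contain spaces
  (c1 <- `outer group`::`field with spaces`)
  FORMAT PARQUET;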

For example, consider a Parquet record with the following schema:

message m {
  required int64 f1;
  optional group g1 {
    optional int64 f2;
    optional int64 f3;
  }
}

In an instance of this schema whose JSON equivalent is {"f1":1, "g1":{"f2":2, "f3":null}}, the parquet_subvalue_paths f1, g1::f2, and g1::f3 will extract the values 1, 2, and NULL, respectively. If the JSON equivalent is instead {"f1":1, "g1":null}, the parquet_subvalue_paths will extract 1, NULL, and NULL, respectively.

If you are loading .parquet data from an S3 bucket, ensure that non-empty _SUCCESS, _committed, and _started files in the S3 folder are not deleted before the data in the files is loaded into the destination table.

Converting Parquet Values

After a parquet_subvalue_path is evaluated, its value is converted to an unspecified SQL type which may be further explicitly or implicitly converted as if from a SQL string, as shown in the following table. The converted value is written to a column in the destination table or is written to a temporary variable specified in the SET clause.

Parquet Type

Converted Value

boolean

"1"/"0"

int32

The string representation of the integer.

int64

The string representation of the integer.

int96

See Converting Parquet Time Values.

float

SQL NULL if not finite. Otherwise, a string convertible without loss of precision to FLOAT

double

SQL NULL if not finite. Otherwise, a string convertible without loss of precision to DOUBLE

binary

Verbatim, from input bytes

fixed_len_byte_array

Verbatim, from input bytes

Converting Parquet Logical Types

Unsigned integer logical type annotations on int32 and int64 types are respected.

The decimal annotation on binary and fixed_len_byte_array types will trigger conversion as if to a numeric literal compatible with a SQL DECIMAL type of the same scale and precision.

All other logical type annotations are ignored and have no effect on conversion.

Converting Parquet Time Values

Values of int96 type will be converted as if to datetime literals truncated to microsecond precision. The underlying value will be converted to the SingleStore DB cluster’s time zone according to the TIMEZONE clause, with a default heuristic in place when the clause is omitted.

Time zone conversions may be necessary because some Parquet writers, including Hive, convert time values to UTC before encoding them as int96. Others, including Impala, perform no conversion and write values as they appear in the writer’s local time zone. Parquet files provide no definitive information about the time zone of int96 data.

When the TIMEZONE clause is omitted, SingleStore DB will attempt to automatically perform time zone conversions based on imperfect information in file metadata. This default heuristic may produce incorrect results. Providing the value of @@time_zone to the TIMEZONE clause will disable this behavior and guarantee no conversion.

When the TIMEZONE clause is provided, SingleStore DB will assume that encoded data is in the specified time zone, converting it to the SingleStore DB time zone.

Times outside years 0001 through 9999 will be converted to NULL and a warning will be emitted. In addition, if the pipeline is performing time zone conversions, then the valid range is further restricted to times between 1970-01-01 and 2038-01-19 03:14:07. The validity check is performed after converting to the SingleStore DB time zone.

No automatic integer-to-datetime conversion occurs for time values encoded as int64 or int32, even if the logical type is a time type like timestamp. Use the SET clause to explicitly convert such values to a DATETIME compatible form.
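
For instance, if a field event_time is encoded as int64 microseconds since the Unix epoch (an assumption for this sketch, along with the file path and table), the SET clause could perform the conversion explicitly:

CREATE PIPELINE pipeline_name
  AS LOAD DATA FS "/path/to/events.parquet"
  INTO TABLE `events`
  (id <- id,
  @ts_micros <- event_time)
  FORMAT PARQUET
  -- FROM_UNIXTIME takes seconds; the int64 field is assumed to hold microseconds
  SET event_time = FROM_UNIXTIME(@ts_micros / 1000000);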

Example

Consider a Parquet file with the following schema:

message m1 {
  required boolean f1;
  required fixed_len_byte_array(4) f2;
  optional group g1 {
    optional binary f3 (STRING);
    required int64 f4;
  }
}

example.parquet contains four records whose JSON representation is:

{"f1": false, "f2": "rec1", "g1": null}
{"f1": true, "f2": "rec2", "g1": null}
{"f1": true, "f2": "rec3", "g1": {"f3": null, "f4": 3}}
{"f1": true, "f2": "rec4", "g1": {"f3": "four", "f4": 4}}

Create a pipeline that ingests example.parquet into table t:

CREATE TABLE t(c2 BLOB, c3 BLOB, c4 BIGINT UNSIGNED);

CREATE PIPELINE p
  AS LOAD DATA FS "example.parquet"
  INTO TABLE t
  (@v1 <- f1,
  @v2 <- f2,
  c3 <- g1::f3,
  c4 <- g1::f4)
  FORMAT PARQUET
  SET c2 = UPPER(CONVERT(@v2, CHAR))
  WHERE @v1 = TRUE;

Test the pipeline:

TEST PIPELINE p;
****
+------+------+------+
| c3   | c4   | c2   |
+------+------+------+
| NULL | NULL | REC2 |
| NULL |    3 | REC3 |
| four |    4 | REC4 |
+------+------+------+

Note the following about the output of TEST PIPELINE p;:

  • The first record in example.parquet's JSON representation is not included in the output because the WHERE clause, using the temporary variable @v1, filters out rows where f1 is false.

  • The second record of the output contains NULL for columns c3 and c4 because the optional group g1 is null in the corresponding input record.

  • The third record of the output contains NULL for column c3 because the optional field g1::f3 is null in the corresponding input record.

  • The column c2 in the output contains uppercase values because c2 is set to the uppercase value of the temporary variable @v2, which holds the value of field f2 in each input record.

LOAD DATA

CREATE PIPELINE shares many options with LOAD DATA, including its error handling options. It processes extracted and transformed records as if it were a LOAD DATA with the same options.

Like LOAD DATA, pipelines natively load JSON, Avro, and CSV input data; see LOAD DATA for example syntax.

The SCHEMA REGISTRY {"IP" | "Hostname"} option allows LOAD DATA to pull an Avro schema from a schema registry. For more information, see the Avro Schema Evolution with Pipelines topic.

AS LOAD DATA: You can load data by specifying the data source as a Kafka cluster and topic, an S3 bucket or object (with optional prefix), or a file.

  • AS LOAD DATA KAFKA 'topic_endpoint': To use Kafka as a data source, you must specify the endpoint for the Kafka cluster and the path to a specific topic with the cluster. For example:

    LOAD DATA KAFKA 'host.example.com/my-topic'
  • AS LOAD DATA S3 'bucket-name': To use S3 as a data source, you must specify a bucket name or a bucket name and object name. For example:

    LOAD DATA S3 'bucket-name'
  • AS LOAD DATA ... FORMAT AVRO SCHEMA REGISTRY {"IP" | "Hostname"}: This option allows the pipeline to pull an Avro schema from Confluent Schema Registry. You can optionally use the CONFIG and CREDENTIALS clauses to specify configuration settings to connect to the schema registry over SSL.

    Notice

    The following syntax assumes you are creating a filesystem pipeline that connects to the schema registry over SSL. The ssl. settings shown only apply to SingleStore DB 7.3.5 and later.

    CREATE PIPELINE ... AS LOAD DATA FS "/path/to/files/data.avro"
      INTO TABLE ...
      FORMAT AVRO
      SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
      ...
      CONFIG '{"schema.registry.ssl.certificate.location": "path-to-your-ssl-certificate-on-the-singlestore-db-node",
      "schema.registry.ssl.key.location": "path-to-your-ssl-certificate-key-on-the-singlestore-db-node",
      "schema.registry.ssl.ca.location": "path-to-your-ca-certificate-on-the-singlestore-db-node"}'
      CREDENTIALS '{"schema.registry.ssl.key.password": "your-password-for-the-ssl-certificate-key"}'

    Notice

    You can use a subset of the ssl. settings as follows:

    • schema.registry.ssl.key.location, schema.registry.ssl.ca.location, and schema.registry.ssl.key.password

    • schema.registry.ssl.certificate.location, schema.registry.ssl.key.location, and schema.registry.ssl.key.password

    schema.registry.ssl.key.password is only required if your SSL certificate key has a password.

    As an alternative to specifying the SSL configuration settings in the CONFIG and CREDENTIALS clauses, you can install the registry's certificate (ca-cert) on all nodes in your SingleStore DB cluster.

  • [BATCH_INTERVAL milliseconds]: You can specify a batch interval in milliseconds, which is the amount of time that the pipeline waits before checking the data source for new data, once all of the existing data has been loaded from the data source. If a batch interval is not specified, the default value is 2500. For example:

    LOAD DATA KAFKA '127.0.0.1/my-topic'
    BATCH_INTERVAL 500
  • [MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]: Specifies the maximum number of batch partitions that can be scheduled into a single batch. Useful for limiting the parallelism of a pipeline on large clusters to prevent the pipeline from throttling system resources.

  • [MAX_RETRIES_PER_BATCH_PARTITION max_retries]: Specifies the maximum number of retries for each batch partition to write batch partition data to the destination table. If a batch transaction fails at any point, for example during extraction from a data source, optional transformation, or loading of the data into the destination table, it will be retried up to max_retries specified in this clause. If no value is specified or MAX_RETRIES_PER_BATCH_PARTITION is set to 0, then max_retries is set to the value specified by the PIPELINES_MAX_RETRIES_PER_BATCH_PARTITION engine variable.

  • [RESOURCE POOL pool_name]: Specifies the resource pool that is used to load pipeline data.

    • If the resource pool is not specified at the time of pipeline creation, then the pipeline uses the user’s default resource pool. If no default resource pool has been set for the user, then the pipeline uses the value of the resource_pool engine variable as its resource pool.

    • The background tasks of a pipeline run in its resource pool. Therefore, a resource pool used by a pipeline cannot be dropped.

    • The user who creates the pipeline must have permissions to use the resource pool. The pipeline will continue to use the pool even if the right to use the pool is later revoked from the user who created the pipeline.

    • For more information on resource pools, see Setting Resource Limits.

  • [AGGREGATOR]: Specifying CREATE AGGREGATOR PIPELINE tells SingleStore DB to pull data through the aggregator, instead of directly to the leaves. This option can be more efficient for low parallelism pipelines, like single file S3 loads or single partition Kafka topics, and is required for pipelines into reference tables and tables with auto increment columns. A stored procedure cannot be used with an aggregator pipeline.
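
The clauses described in the list above can be combined in a single statement. The following minimal sketch (the resource pool name and other identifiers are placeholders) shows where each clause appears:

CREATE AGGREGATOR PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
-- Check the source for new data every 500 ms once it is caught up
BATCH_INTERVAL 500
-- Schedule at most 4 batch partitions per batch and retry each up to 3 times
MAX_PARTITIONS_PER_BATCH 4
MAX_RETRIES_PER_BATCH_PARTITION 3
-- Run the pipeline's background tasks in this resource pool
RESOURCE POOL my_pool
INTO TABLE `table_name`;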

Notice

SingleStore DB does not support the LOAD DATA ... [REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY } ERRORS] semantics for ingesting data into columnstore tables with unique keys. As an alternative, you can use the ON DUPLICATE KEY UPDATE clause of CREATE PIPELINE, or write a stored procedure that handles duplicate key errors and have the pipeline call this procedure.
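
As a sketch of the ON DUPLICATE KEY UPDATE alternative, assuming a destination table with a unique key on id and a dup_count column (both hypothetical), the clause can be used as follows:

CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
INTO TABLE `table_name`
FIELDS TERMINATED BY ','
(id, val)
-- When a row with an existing key arrives, update the existing row instead of raising an error
ON DUPLICATE KEY UPDATE dup_count = dup_count + 1;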

Example

The following example shows how to use a SET clause with multiple assignments in CREATE PIPELINE ... LOAD DATA to ingest values into columns using variables. Create the input file infile.csv as follows:

echo "1,NULL,2020-08-06 07:53:09" >> /pipeline_test_infile/infile.csv
echo "2,2020-08-06 07:53:09,2020-09-06 07:53:09" > /pipeline_test_infile/infile.csv
echo "3,2020-08-06 07:53:09,NULL" >> /pipeline_test_infile/infile.csv

Create a pipeline that ingests infile.csv into the following table, orders:

CREATE TABLE orders (
  ID INT,
  del_t1 DATETIME,
  del_t2 DATETIME);
CREATE PIPELINE order_load AS
    LOAD DATA FS '/pipeline_test_infile/'
    INTO TABLE orders
    FIELDS TERMINATED BY ','
    (ID, @del_t1, @del_t2)
    SET del_t1 = IF(@del_t1='NULL',NULL,@del_t1),
          del_t2 = IF(@del_t2='NULL',NULL,@del_t2);

Test the pipeline:

TEST PIPELINE order_load;
****
+------+---------------------+---------------------+
| ID   | del_t1              | del_t2              |
+------+---------------------+---------------------+
|    1 | NULL                | 2020-08-06 07:53:09 |
|    2 | 2020-08-06 07:53:09 | 2020-09-06 07:53:09 |
|    3 | 2020-08-06 07:53:09 | NULL                |
+------+---------------------+---------------------+

WITH TRANSFORM

Creates a pipeline that uses a transform. A transform is an optional, user-provided program, such as a Python script. For more information, see CREATE PIPELINE ... WITH TRANSFORM.
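
A minimal sketch of the clause, assuming a hypothetical transform archive hosted at the URI shown, might look like:

CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
-- 'uri', 'executable', 'arguments': the archive to download, the program inside it to run, and its arguments
WITH TRANSFORM ('http://www.example.com/transform.tar.gz', 'transform.py', '')
INTO TABLE `table_name`
FIELDS TERMINATED BY ',';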

INTO PROCEDURE

Creates a pipeline that uses a stored procedure to shape the data that is extracted from the pipeline's data source. For more information, see CREATE PIPELINE ... INTO PROCEDURE.
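
As a brief sketch (the procedure, table, and column names are hypothetical), the destination procedure receives each batch as a QUERY-typed parameter and decides how to write it:

CREATE PROCEDURE proc_name(batch QUERY(id INT, val TEXT)) AS
BEGIN
  -- Shape the batch before writing it to the destination table
  INSERT INTO table_name(id, val) SELECT id, UPPER(val) FROM batch;
END;

CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA 'host.example.com/my-topic'
INTO PROCEDURE proc_name
FIELDS TERMINATED BY ',';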