CREATE PIPELINE

Create a new Pipeline to continuously extract, shape, and load data into a table or stored procedure.

Syntax

CREATE [OR REPLACE] [AGGREGATOR] PIPELINE [IF NOT EXISTS] <pipeline_name> AS
  LOAD DATA { kafka_configuration | s3_configuration | filesystem_configuration
            | azure_blob_configuration | hdfs_configuration | gcs_configuration | link_configuration}
    [BATCH_INTERVAL <milliseconds>]
    [MAX_PARTITIONS_PER_BATCH <max_partitions_per_batch>]
    [MAX_RETRIES_PER_BATCH_PARTITION <max_retries>]
    [RESOURCE POOL <resource_pool_name>]
    [(ENABLE|DISABLE) OUT_OF_ORDER OPTIMIZATION]
    [WITH TRANSFORM ('uri', 'executable', 'arguments [...]') ]
  [REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY | PARSER } ERRORS]
  { INTO TABLE <table_name> | INTO PROCEDURE <procedure_name> }
  { <json_format_options> | <avro_format_options> | <parquet_format_options> | <csv_format_options> }
  [ (<column_name>, ... ) ]
  [SET <column_name> = <expression>,... | <pipeline_source_file>]
  [WHERE <expression>,...]
  [ON DUPLICATE KEY UPDATE <column_name> = <expression>, [...]]

  <kafka_configuration>:
    KAFKA 'kafka_topic_endpoint'

  <s3_configuration>:
    S3 { '<bucket-name>' | '<bucket-name/object-name>' | '<bucket-name/prefix/object-name>' }
      [CONFIG '<configuration_json>']
      CREDENTIALS '<credentials_json>'

  <filesystem_configuration>:
    FS 'path'
      [CONFIG '<configuration_json>']

  <azure_blob_configuration>:
    AZURE { '<container-name>' | '<container-name/object-name>' | '<container-name/prefix/object-name>' }
      CREDENTIALS '<credentials_json>'
      [CONFIG '<configuration_json>']

  <hdfs_configuration>:
    HDFS 'hdfs://{<namenode DNS name> | <IP address>}:<port>/<directory>'
      [CONFIG '<configuration_json>']

  <gcs_configuration>:
   GCS { '<bucket-name>' | '<bucket-name/object-name>' | '<bucket-name/prefix/object-name>' }
     CREDENTIALS '<credentials_json>'
     [CONFIG '<configuration_json>']

  <link_configuration>:
   LINK [<database_name>.]<connection_name> '<path>'

  <json_format_options>:
    FORMAT JSON
    ( {<column_name> | @<variable_name>} <- <subvalue_path> [DEFAULT <literal_expr>], ...)

  <avro_format_options>:
    FORMAT AVRO SCHEMA REGISTRY {<IP address> | <hostname>}:<port>
    ( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)
    [SCHEMA '<avro_schema>']

  <subvalue_path>:
    {% | [%::]ident [::ident ...]}

  <parquet_format_options>:
    FORMAT PARQUET <parquet_subvalue_mapping> [TIMEZONE '<time_zone_name>']

  <parquet_subvalue_mapping>:
    ( {<column_name> | @<variable_name>} <- <parquet_subvalue_path>, ...)

  <parquet_subvalue_path>:
    {ident [::ident ...]}

  <csv_format_options>:
    [FORMAT CSV]
    [{FIELDS | COLUMNS}
     TERMINATED BY '<string>'
       [[OPTIONALLY] ENCLOSED BY '<char>']
       [ESCAPED BY '<char>']
    ]
    [LINES
      [STARTING BY '<string>']
      [TERMINATED BY '<string>']
    ]
    [IGNORE <number> LINES]
    [ ({<column_name> | @<variable_name>}, ...) ]

Info

The REPLACE capability of pipelines maintains the cursor positions when replacing existing pipelines.
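
For example, re-issuing a modified definition with OR REPLACE swaps in the new definition while keeping the existing pipeline’s cursor positions (a minimal sketch; the topic and table names are placeholders):

CREATE OR REPLACE PIPELINE pipeline_name AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
INTO TABLE `table_name`;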

Kafka Pipeline Syntax

The following example statement demonstrates how to create a Kafka pipeline using the minimum required syntax:

Minimum Required Kafka Pipeline Syntax:

CREATE PIPELINE pipeline_name AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
INTO TABLE `table_name`;

START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses a Kafka cluster as the data source, points to the location of the my-topic topic at the Kafka cluster’s endpoint, and will start ingesting data into table_name. For more information about Kafka Pipelines, see Kafka Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

Info

Unless otherwise specified, Kafka records may be processed out of order if the SingleStore DB cluster is 1.5 * pipelines_max_offsets_per_batch_partition behind in a single Kafka partition; however, records will be committed in order in all cases. This is an optimization specific to Kafka pipelines and is enabled by default. If you require the records to be processed in order (e.g. in upsert scenarios), create your pipeline with DISABLE OUT_OF_ORDER OPTIMIZATION specified.

Compressed topics are supported without requiring any additional configuration in SingleStore DB.

A CREATE PIPELINE statement cannot specify more than one topic.
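
If in-order processing is required, as described in the note above, the pipeline can be created with the optimization disabled. A minimal sketch (the endpoint and table names are placeholders):

CREATE PIPELINE ordered_pipeline AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
DISABLE OUT_OF_ORDER OPTIMIZATION
INTO TABLE `table_name`;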

S3 Pipeline Syntax

The following example statement demonstrates how to create an S3 pipeline using the minimum required syntax:

Minimum Required S3 Pipeline Syntax:

CREATE PIPELINE pipeline_name AS
LOAD DATA S3 'bucket-name'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"[, "aws_session_token": "your_temp_session_token"][, "role_arn":"your_role_arn"]}'
INTO TABLE `table_name`;

START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses an S3 bucket named bucket-name as the data source, and will start ingesting the bucket’s objects into table_name. Credentials for your S3 bucket can either be in the form of an AWS access key or an Amazon Resource Name (ARN).

No CONFIG clause is required to create an S3 pipeline. This clause is used to specify the Amazon S3 region where the source bucket is located. If no CONFIG clause is specified, SingleStore DB will automatically use the us-east-1 region, also known as US Standard in the Amazon S3 console. To specify a different region, such as us-west-1, include a CONFIG clause as shown in the example below. The CONFIG clause can also be used to specify the suffixes for files to load or to specify the disable_gunzip condition. These suffixes are a JSON array of strings. When specified, CREATE PIPELINE only loads files that have the specified suffix. Suffixes in the CONFIG clause can be specified without a . before them, for example, CONFIG '{"suffixes": ["csv"]}'. When enabled, the disable_gunzip option disables the decompression of files with the .gz extension. If this option is disabled or missing, files with the .gz extension will be decompressed.
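
For example, a CONFIG clause that restricts loading to files with a csv suffix and disables gunzip decompression might look like the following (a sketch; the bucket, credentials, and table names are placeholders):

CREATE PIPELINE pipeline_name AS
LOAD DATA S3 'bucket-name'
CONFIG '{"suffixes": ["csv"], "disable_gunzip": true}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"}'
INTO TABLE `table_name`;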

For more information about S3 Pipelines, see S3 Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

S3 Pipeline Using Specified Region

CREATE PIPELINE pipeline_name AS
LOAD DATA S3 'bucket-name'
CONFIG '{"region": "us-west-1"}'
CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"[, "aws_session_token": "your_temp_session_token"][, "role_arn":"your_role_arn"]}'
INTO TABLE `table_name`;

Azure Blob Pipeline Syntax

The following example statement demonstrates how to create an Azure Blob pipeline using the minimum required syntax. Note that Azure Blob Pipelines are only available in MemSQL 5.8.5 and above.

Minimum Required Azure Pipeline Syntax:

CREATE PIPELINE pipeline_name AS
LOAD DATA AZURE 'container-name'
CREDENTIALS '{"account_name": "my_account_name", "account_key": "my_account_key"}'
INTO TABLE `table_name`;

START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses an Azure Blob container named container-name as the data source, and will start ingesting the container’s objects into table_name. For more information about Azure Pipelines, see Azure Blob Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

Note that no CONFIG clause is required to create an Azure pipeline unless you need to specify the suffixes for files to load or the disable_gunzip condition. These suffixes are a JSON array of strings. When specified, CREATE PIPELINE only loads files that have the specified suffix. Suffixes in the CONFIG clause can be specified without a . before them, for example, CONFIG '{"suffixes": ["csv"]}'. When enabled, the disable_gunzip option disables the decompression of files with the .gz extension. If this option is disabled or missing, files with the .gz extension will be decompressed.

Each of the clauses in a CREATE PIPELINE statement is described below.

Filesystem Pipeline Syntax

Info

SingleStore Managed Service does not support Filesystem Pipelines.

The following example statement demonstrates how to create a Filesystem pipeline using the minimum required syntax:

CREATE PIPELINE pipeline_name
AS LOAD DATA FS '/path/to/files/*'
INTO TABLE `table_name`
FIELDS TERMINATED BY ',';

START PIPELINE pipeline_name;

This statement creates a new pipeline named pipeline_name, uses a directory as the data source, and will start ingesting data into table_name.

To disable the decompression of files with the .gz extension, enable the disable_gunzip option in the CONFIG clause. If this option is disabled or missing, files with the .gz extension will be decompressed.

CONFIG '{"disable_gunzip" : true}'

In order to ensure a pipeline processes zero byte files, enable the process_zero_byte_files option in the CONFIG clause. If this option is disabled or missing, zero byte files will be skipped by default.

CONFIG '{"process_zero_byte_files" : true}'
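
Both options can be combined in a single CONFIG clause. A sketch using the same placeholder path and table:

CREATE PIPELINE pipeline_name
AS LOAD DATA FS '/path/to/files/*'
CONFIG '{"disable_gunzip": true, "process_zero_byte_files": true}'
INTO TABLE `table_name`
FIELDS TERMINATED BY ',';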

For more information about Filesystem Pipelines, see Filesystem Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

HDFS Pipeline Syntax

The following example statement demonstrates how to create an HDFS pipeline using the minimum required syntax:

CREATE PIPELINE pipeline_name
AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/files'
INTO TABLE `table_name`
FIELDS TERMINATED BY '\t';

START PIPELINE pipeline_name;

This example creates a new pipeline named pipeline_name and references the HDFS path /path/to/files as the data source. Once the pipeline is started, the HDFS extractor recursively walks the directory tree in the HDFS path, looking for MapReduce output files. Any files with names matching the regular expression part-(m|r)-(.*) are imported into table_name. The import occurs only if the directory also contains files that don’t match the regular expression; this check confirms that MapReduce completed and created a _SUCCESS file.

If you would like to create a pipeline that imports Hive output files, set the disable_partial_check attribute in your CREATE PIPELINE's CONFIG JSON to true. When the pipeline runs, the extractor will import files as described in the previous paragraph, but will not perform the check for additional files in the directory.
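
For example, a pipeline that loads Hive output might be created as follows (a sketch; the HDFS path and table name are placeholders):

CREATE PIPELINE hive_pipeline
AS LOAD DATA HDFS 'hdfs://hadoop-namenode:8020/path/to/hive/output'
CONFIG '{"disable_partial_check": true}'
INTO TABLE `table_name`
FIELDS TERMINATED BY '\t';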

You also specify attributes in CREATE PIPELINE's CONFIG clause when you use advanced HDFS pipelines mode. In this mode, you can encrypt your pipeline’s connection to HDFS and you can authenticate your pipeline using Kerberos.

To disable the decompression of files with the .gz extension, enable the disable_gunzip option in the CONFIG clause. If this option is disabled or missing, files with the .gz extension will be decompressed.

CONFIG '{"disable_gunzip" : true}'

For more information about HDFS Pipelines, see HDFS Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

GCS Pipeline Syntax

The following example statement demonstrates how to create a GCS pipeline using the minimum required syntax:

CREATE PIPELINE pipeline_name
AS LOAD DATA GCS 'bucket-name'
CREDENTIALS '{"access_id": "your_google_access_key", "secret_key": "your_google_secret_key"}'
INTO TABLE `table_name`
FIELDS TERMINATED BY ',';

START PIPELINE pipeline_name;

This example creates a new pipeline named pipeline_name, uses a GCS bucket named bucket-name as the data source, and ingests the bucket’s objects into table_name. A GCS pipeline requires authentication to Google before it can start reading objects from a bucket. It supports only HMAC keys and requires a GCS access key and secret key.

To disable the decompression of files with the .gz extension, enable the disable_gunzip option in the CONFIG clause. If this option is disabled or missing, files with the .gz extension will be decompressed.

CONFIG '{"disable_gunzip" : true}'

For more information about GCS pipelines, see GCS Pipelines Overview.

CREATE PIPELINE ... LINK

The CREATE PIPELINE ... LINK statement loads data from the data provider using a connection link. To use this command, you only need to know the connection link name, not the connection details and configuration. However, you need the SHOW LINK permission, provided by your administrator, to use a connection link. This command supports connections to S3, Azure, GCS, HDFS, and Kafka only.

The following example creates a pipeline using an HDFS connection:

USE db1;
CREATE PIPELINE mypipeline AS LOAD DATA
LINK HDFSconnection 'hdfs://hadoop-namenode:8020/path/to/files'
INTO TABLE my_table;

START PIPELINE mypipeline;

Creating a Parquet Pipeline

The CREATE PIPELINE .. FORMAT PARQUET statement extracts specified fields from records in Parquet files. These files may be located in an Azure, HDFS, S3, or filesystem data source.

The statement assigns the extracted fields to the columns of a new row to be inserted into table_name or passed to proc_name. The fields can also be assigned to temporary values in the SET clause, for use in SQL transformations.

Rows that don’t match the WHERE clause are not included.

Parquet 2.0 encodings are supported.

Parquet Pipelines do not support Kafka.

When you write a CREATE PIPELINE .. FORMAT PARQUET statement, you include the LOAD DATA clause. This clause supports a subset of the error recovery options that are supported by CSV LOAD DATA.

Extracting Parquet Values

parquet_subvalue_mapping specifies the mapping between fields in an input Parquet file and columns in the destination table or temporary variables that are specified in the SET clause.

CREATE PIPELINE .. FORMAT PARQUET uses the ::-separated list of field names in a parquet_subvalue_path to perform successive field name lookups in nested Parquet group schemas. The last field in parquet_subvalue_path must not itself have group type. Additionally, there must be no fields with repeated type along parquet_subvalue_path; unlike with JSON and Avro, extracting sub-records or sub-lists of Parquet records into SQL JSON columns is not supported with Parquet.

If an optional field along a parquet_subvalue_path is omitted in a given record, the extracted value for that column will be NULL.

parquet_subvalue_path components containing whitespace or punctuation must be surrounded by backticks.
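
A path component that contains a space might be mapped like this (a sketch with hypothetical file, field, and table names):

CREATE PIPELINE quoted_fields_pipeline
AS LOAD DATA FS '/path/to/example.parquet'
INTO TABLE t
(c1 <- `outer group`::`field one`)
FORMAT PARQUET;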

For example, consider a Parquet record with the following schema:

message m {
  required int64 f1;
  optional group g1 {
    optional int64 f2;
    optional int64 f3;
  }
}

In an instance of this schema whose JSON equivalent is {"f1":1, "g1":{"f2":2, "f3":null}}, the parquet_subvalue_paths f1, g1::f2, and g1::f3 will extract the values 1, 2, and NULL, respectively. If the JSON equivalent is instead {"f1":1, "g1":null}, the same paths will extract 1, NULL, and NULL, respectively.

If you are loading .parquet data from an S3 bucket, ensure that non-empty _SUCCESS, _committed, and _started files in the S3 folder are not deleted before the data in the files is loaded into the destination table.

Info

SingleStore supports shell globs in bucket names. So you can use wildcards while specifying the S3 path, for example, s3://my_bucket/test_data.parquet/*.parquet.

Converting Parquet Values

After a parquet_subvalue_path is evaluated, its value is converted to an unspecified SQL type which may be further explicitly or implicitly converted as if from a SQL string, as shown in the following table. The converted value is written to a column in the destination table or is written to a temporary variable specified in the SET clause.

Parquet Type            Converted Value
boolean                 "1"/"0"
int32                   The string representation of the integer.
int64                   The string representation of the integer.
int96                   See Converting Parquet Time Values.
float                   SQL NULL if not finite. Otherwise, a string convertible without loss of precision to FLOAT.
double                  SQL NULL if not finite. Otherwise, a string convertible without loss of precision to DOUBLE.
binary                  Verbatim, from input bytes.
fixed_len_byte_array    Verbatim, from input bytes.

Converting Parquet Logical Types

Unsigned integer logical type annotations on int32 and int64 types are respected.

The decimal annotation on binary and fixed_len_byte_array types will trigger conversion as if to a numeric literal compatible with a SQL DECIMAL type of the same scale and precision.

All other logical type annotations are ignored and have no effect on conversion.

Converting Parquet Time Values

Values of int96 type will be converted as if to datetime literals truncated to microsecond precision. The underlying value will be converted to the SingleStore DB cluster’s time zone according to the TIMEZONE clause, with a default heuristic in place when the clause is omitted.

Time zone conversions may be necessary because some Parquet writers, including Hive, convert time values to UTC before encoding them as int96. Others, including Impala, perform no conversion and write values as they appear in the writer’s local time zone. Parquet files provide no definitive information about the time zone of int96 data.

When the TIMEZONE clause is omitted, SingleStore DB will attempt to automatically perform time zone conversions based on imperfect information in file metadata. This default heuristic may produce incorrect results. Providing the value of @@time_zone to the TIMEZONE clause will disable this behavior and guarantee no conversion.

When the TIMEZONE clause is provided, SingleStore DB will assume that encoded data is in the specified time zone, converting it to the SingleStore DB time zone.

Times outside years 0001 through 9999 will be converted to NULL and a warning will be emitted. In addition, if the pipeline is performing time zone conversions, then the valid range is further restricted to times between 1970-01-01 and 2038-01-19 03:14:07. The validity check is performed after converting to the SingleStore DB time zone.

No automatic integer-to-datetime conversion occurs for time values encoded as int64 or int32, even if the logical type is a time type like timestamp. Use the SET clause to explicitly convert such values to a DATETIME compatible form.
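
For example, if a hypothetical int64 field event_time stores epoch microseconds, a SET clause can perform the conversion explicitly (a sketch; the file, table, and field names are placeholders):

CREATE PIPELINE epoch_pipeline
AS LOAD DATA FS '/path/to/events.parquet'
INTO TABLE events
(id <- id,
@ts_micros <- event_time)
FORMAT PARQUET
SET event_time = FROM_UNIXTIME(@ts_micros / 1000000);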

Example

Consider a Parquet file with the following schema:

message m1 {
  required boolean f1;
  required fixed_len_byte_array(4) f2;
  optional group g1 {
    optional binary f3 (STRING);
    required int64 f4;
  }
}

example.parquet contains four records whose JSON representation is:

{"f1": false, "f2": "rec1", "g1": null}
{"f1": true, "f2": "rec2", "g1": null}
{"f1": true, "f2": "rec3", "g1": {"f3": null, "f4": 3}}
{"f1": true, "f2": "rec4", "g1": {"f3": "four", "f4": 4}}

Create a pipeline that ingests example.parquet into table t:

CREATE TABLE t(c2 BLOB, c3 BLOB, c4 BIGINT UNSIGNED);

CREATE PIPELINE p
  AS LOAD DATA FS "example.parquet"
  INTO TABLE t
  (@v1 <- f1,
  @v2 <- f2,
  c3 <- g1::f3,
  c4 <- g1::f4)
  FORMAT PARQUET
  SET c2 = UPPER(CONVERT(@v2, CHAR))
  WHERE @v1 = TRUE;

Test the pipeline:

TEST PIPELINE p;
****
+------+------+------+
| c3   | c4   | c2   |
+------+------+------+
| NULL | NULL | REC2 |
| NULL |    3 | REC3 |
| four |    4 | REC4 |
+------+------+------+

Note the following about the output of TEST PIPELINE p;:

  • The first record in the example.parquet .json representation was not included in the output because the WHERE clause, using the temporary variable @v1, filters out rows where f1 is false.
  • The second record of the output contains NULL for columns c3 and c4 because the optional group g1 is null in that record in the .json representation.
  • The third record of the output contains NULL for column c3 because the optional field g1::f3 is null in that record in the .json representation.
  • The column c2 in the output contains uppercase values. This is because the SET clause sets c2 to the uppercase of the temporary variable @v2, which holds the value of the f2 field for each record in the .json representation.

LOAD DATA

CREATE PIPELINE shares many options with LOAD DATA, including its error handling options. It processes extracted and transformed records as if it were a LOAD DATA with the same options.

Like LOAD DATA, pipelines natively load JSON, Avro, and CSV input data.

AS LOAD DATA: You can load data by specifying the data source as a Kafka cluster and topic, an S3 bucket or object (with optional prefix), or a file.

  • AS LOAD DATA KAFKA 'topic_endpoint': To use Kafka as a data source, you must specify the endpoint for the Kafka cluster and the path to a specific topic with the cluster. For example:

    LOAD DATA KAFKA '127.0.0.1/my-topic'
    
  • AS LOAD DATA S3 'bucket-name': To use S3 as a data source, you must specify a bucket name or a bucket name and object name. For example:

    LOAD DATA S3 'bucket-name'
    
  • AS LOAD DATA ... FORMAT AVRO SCHEMA REGISTRY {"IP" | "Hostname"}: This option allows the pipeline to pull an Avro schema from Confluent Schema Registry. You can optionally use the CONFIG and CREDENTIALS clauses to specify configuration settings to connect to the schema registry over SSL.

    Info

    The following syntax assumes you are creating a filesystem pipeline that connects to the schema registry over SSL. The ssl. settings shown only apply to SingleStore DB 7.3.5 and later.

    CREATE PIPELINE ... AS LOAD DATA FS "/path/to/files/data.avro"
      INTO TABLE ...
      FORMAT AVRO
      SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
      ...
      CONFIG '{"ssl.certificate.location": "path-to-your-ssl-certificate-on-the-singlestore-db-node",
      "ssl.key.location": "path-to-your-ssl-certificate-key-on-the-singlestore-db-node",
      "ssl.ca.location": "path-to-your-ca-certificate-on-the-singlestore-db-node"}'
      CREDENTIALS '{"ssl.key.password": "your-password-for-the-ssl-certificate-key"}'
    

    Note: You can use a subset of the ssl. settings as follows:

    • ssl.key.location, ssl.ca.location, and ssl.key.password
    • ssl.certificate.location, ssl.key.location, and ssl.key.password

    ssl.key.password is only required if your SSL certificate key has a password.

    As an alternative to specifying the SSL configuration settings in the CONFIG and CREDENTIALS clauses, you can install the registry’s certificate (ca-cert) on all nodes in your SingleStore DB cluster.

    For more information, including examples, see the Avro Schema Evolution With Pipelines topic.

  • [BATCH_INTERVAL milliseconds]: You can specify a batch interval in milliseconds, which is the amount of time that the pipeline waits before checking the data source for new data, once all of the existing data has been loaded from the data source. If a batch interval is not specified, the default value is 2500. For example:

    LOAD DATA KAFKA '127.0.0.1/my-topic'
    BATCH_INTERVAL 500
    
  • [MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]: Specifies the maximum number of batch partitions that can be scheduled into a single batch. Useful for limiting the parallelism of a pipeline on large clusters to prevent the pipeline from throttling system resources.

  • [MAX_RETRIES_PER_BATCH_PARTITION max_retries]: Specifies the maximum number of retries for each batch partition to write batch partition data to the destination table. If a batch transaction fails at any point, for example during extraction from a data source, optional transformation, or loading of the data into the destination table, it will be retried up to max_retries specified in this clause. If no value is specified or MAX_RETRIES_PER_BATCH_PARTITION is set to 0, then max_retries is set to the value specified by the pipelines_max_retries_per_batch_partition engine variable.

  • [RESOURCE POOL pool_name]: Specifies the resource pool that is used to load pipeline data.

    • If the resource pool is not specified at the time of pipeline creation, then the pipeline uses the user’s default resource pool. If no default resource pool has been set for the user, then the pipeline uses the value of the resource_pool engine variable as its resource pool.
    • The background tasks of a pipeline run in its resource pool. Therefore, a resource pool used by a pipeline cannot be dropped.
    • The user who creates the pipeline must have permissions to use the resource pool. The pipeline will continue to use the pool even if the right to use the pool is later revoked from the user who created the pipeline.
    • For more information on resource pools, see Setting Resource Limits.
  • [AGGREGATOR]: Specifying CREATE AGGREGATOR PIPELINE tells SingleStore DB to pull data through the aggregator, instead of directly to the leaves. This option can be more efficient for low-parallelism pipelines, like single-file S3 loads or single-partition Kafka topics, and is required for pipelines into reference tables and tables with auto-increment columns. A stored procedure cannot be used with an aggregator pipeline.
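
The following sketch combines several of the clauses described above (the topic, resource pool, and table names are placeholders):

CREATE AGGREGATOR PIPELINE tuned_pipeline AS
LOAD DATA KAFKA '127.0.0.1/my-topic'
BATCH_INTERVAL 500
MAX_PARTITIONS_PER_BATCH 8
MAX_RETRIES_PER_BATCH_PARTITION 3
RESOURCE POOL my_pool
INTO TABLE `table_name`;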

Info

SingleStore DB does not support the LOAD DATA ... [REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY } ERRORS] semantics for ingesting data into columnstore tables with unique keys. As an alternative, you can use the ON DUPLICATE KEY UPDATE clause of CREATE PIPELINE or write a stored procedure that handles duplicate key errors, and have the pipeline call this procedure.

Example

The following example shows how to use the SET clause with multiple assignments in CREATE PIPELINE ... LOAD DATA to ingest values into columns using variables. Consider the following file:

echo "1,NULL,2020-08-06 07:53:09" >> /pipeline_test_infile/infile.csv
echo "2,2020-08-06 07:53:09,2020-09-06 07:53:09" >> /pipeline_test_infile/infile.csv
echo "3,2020-08-06 07:53:09,NULL" >> /pipeline_test_infile/infile.csv

Create a pipeline that ingests infile.csv into the orders table, defined as follows:

CREATE TABLE orders (
  ID INT,
  del_t1 DATETIME,
  del_t2 DATETIME);
CREATE PIPELINE order_load AS
	LOAD DATA FS '/pipeline_test_infile/'
	INTO TABLE orders
	FIELDS TERMINATED BY ','
	(ID, @del_t1, @del_t2)
	SET del_t1 = IF(@del_t1='NULL',NULL,@del_t1),
		  del_t2 = IF(@del_t2='NULL',NULL,@del_t2);

Test the pipeline:

TEST PIPELINE order_load;
****
+------+---------------------+---------------------+
| ID   | del_t1              | del_t2              |
+------+---------------------+---------------------+
|    1 | NULL                | 2020-08-06 07:53:09 |
|    2 | 2020-08-07 07:53:09 | 2020-09-08 07:53:09 |
|    3 | 2020-08-05 07:53:09 | NULL                |
+------+---------------------+---------------------+

WITH TRANSFORM

A transform is an optional user-provided program, such as a Python script. A transform receives data from a pipeline’s extractor, shapes the data, and returns the results to the pipeline. The pipeline then loads the result data into the database.

A transform is one of three methods you can use to shape data ingested from a pipeline.

Info

SingleStore Managed Service does not support transforms.

Syntax

WITH TRANSFORM ('uri', 'program', 'arguments [...]'): Each of the transform’s parameters are described below:

  • uri: The transform’s URI is the location from which the user-provided program can be downloaded, specified as either an http:// or file:// endpoint. If the URI points to a tarball with a .tar.gz or .tgz extension, its contents will be automatically extracted, and the program parameter must also be specified. Alternatively, if the URI specifies the user-provided program filename itself (such as file://localhost/root/path/to/my-transform.py), the program and arguments parameters can be empty.
  • program: The filename of the user-provided program to run. This parameter is required if a tarball was specified as the endpoint for the transform’s uri. If the uri specifies the user-provided program file itself, this parameter can be empty.
  • arguments: A series of arguments that are passed to the transform at runtime. Each argument must be delimited by a space.

WITH TRANSFORM('http://memsql.com/my-transform.py','','')
WITH TRANSFORM('file://localhost/root/path/to/my-transform.py','','')
WITH TRANSFORM('http://memsql.com/my-transform-tarball.tar.gz', 'my-transform.py','')
WITH TRANSFORM('http://memsql.com/my-transform-tarball.tar.gz', 'my-transform.py', '-arg1 -arg1')

Remarks

  • During pipeline creation, a cluster’s master aggregator distributes the transform to each leaf node in the cluster. Each leaf node then executes the transform every time a batch partition is processed.
  • When the CREATE PIPELINE statement is executed, the transform must be accessible at the specified file system or network endpoint. If the transform is unavailable, pipeline creation will fail.
  • Depending on the language used to write the transform and the platform used to deploy it, any virtual machine overhead may greatly reduce a pipeline’s performance. Transforms are executed every time a batch partition is processed, which can be many times per second; virtual machine overhead will reduce the execution speed of a transform and thus degrade the performance of the entire pipeline.
  • You must install any required dependencies for your transform (such as Python) on each leaf node in your cluster. Test out your pipeline by running TEST PIPELINE before running START PIPELINE to make sure your nodes are set up properly.
  • Transforms can be written in any language, but the SingleStore DB node’s host Linux distribution must have the required dependencies to execute the transform. For example, if you write a transform in Python, the node’s Linux distribution must have Python installed and configured before it can be executed.
  • At the top of your transform file, use a shebang to specify the interpreter to use to execute the script (e.g. #!/usr/bin/env python3 for Python 3 or #!/usr/bin/env ruby for Ruby).
  • Use Unix line endings in your transform file.
  • A transform reads from stdin to receive data from a pipeline’s extractor. After shaping the input data, the transform writes to stdout, which returns the results to the pipeline.
  • Transactional guarantees apply only to data written to stdout. There are no transactional guarantees for any side effects that are coded in the transform logic.

Info

For an example implementation of a transform, see Writing a Transform.

INTO PROCEDURE

Creates a pipeline that uses a stored procedure to shape the data that is extracted from the pipeline’s data source. Using a stored procedure is one of three methods you can use to shape the data.

Syntax

CREATE PROCEDURE <procedure_name> (<query_name> QUERY(<field_name> <data_type>, ...))
AS
BEGIN
	<procedure_body>
END

CREATE PIPELINE <pipeline_name>
AS LOAD DATA <load_data_options>
INTO PROCEDURE <procedure_name>

Remarks

Info

The implementation of your stored procedure can have significant consequences on the performance and memory use of your pipeline. Pipelines by default shard incoming data across leaf partitions and perform the majority of their work in parallel across those partitions, but certain features available in stored procedures can force computation and memory usage to occur on an aggregator node. For more details, see Writing Efficient Stored Procedures for Pipelines.

The following are a list of restrictions and recommendations that you should follow when using a stored procedure with your pipeline:

  • The field list in your QUERY type variable must conform to the schema of the pipeline source (Kafka, S3, etc.).
  • The query type variable must be the only parameter in the stored procedure used with your pipeline.
  • The value of <query_name> is the current batch of records that have been extracted from the pipeline’s data source.
  • DDL commands are not allowed in stored procedures that are called from pipelines.
  • Use the pipeline_source_file() built-in function in the SET clause to set a table column to the pipeline data source file.
  • The SET and WHERE clauses are executed before the stored procedure.
  • CREATE PIPELINE ... AS LOAD DATA ... IGNORE and CREATE PIPELINE ... AS LOAD DATA ... SKIP ... ERRORS only recognize parsing errors. You must specify the desired behavior in the event of constraint errors in the body of the stored procedure.
  • Transactions (e.g., BEGIN TRANSACTION, COMMIT, etc.) are not allowed because the pipeline manages the transaction state for you.
  • Pipelines into stored procedures maintain the same exactly-once semantics as other pipelines, but this means certain read-write patterns (reads after writes in reshuffles or reference tables) are not allowed.
  • Upserts can be specified in the ON DUPLICATE KEY UPDATE clause of CREATE PIPELINE. Alternatively, duplicate key behavior can be specified inside of a stored procedure.
  • OUT_OF_ORDER OPTIMIZATION cannot be disabled when using pipelines into stored procedures. If needed, you should enforce ordering in the stored procedure.
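
For illustration, the pipeline_source_file() built-in mentioned above can populate a column in an INTO TABLE pipeline (a sketch; the path, table, and column names are placeholders, and source_file is an extra column in t):

CREATE PIPELINE source_file_pipeline AS
LOAD DATA FS '/path/to/files/*.csv'
INTO TABLE t
FIELDS TERMINATED BY ','
(a, b)
SET source_file = pipeline_source_file();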

Equivalent CREATE PIPELINE … INTO TABLE and CREATE PIPELINE … INTO PROCEDURE Statements

For a table t containing one INT column a:

CREATE PIPELINE pipeline_without_stored_proc
AS LOAD DATA FS '/data.txt'
INTO TABLE t;

is equivalent to:

DELIMITER //

CREATE OR REPLACE PROCEDURE proc(batch QUERY(a INT))
AS
BEGIN
  INSERT INTO t(a) SELECT * FROM batch;
END //

DELIMITER ;

CREATE PIPELINE pipeline_using_stored_proc
AS LOAD DATA FS '/data.txt'
INTO PROCEDURE proc;

Examples

Info

For simplicity, these examples extract data from the file system. For examples of using CREATE PIPELINE to extract data from other types of data sources, see other examples in this topic.

Example: Loading Data into Multiple Tables

See the second method in this example.

Example: Denormalizing Data

This example extracts, from the data source course_enrollment.txt, a list of students who are enrolled in courses. The data source contains a line per enrollment with the columns course ID (column 1) and the student ID (column 2). Following is sample data in the file.

CS-101,1000047
CS-101,1010382
CS-201,1070044
CS-201,1008022

The pipeline inserts the data extracted from the data source into the course_enrollment table that is defined as follows:

CREATE TABLE course_enrollment(course_id TEXT, first_name TEXT, last_name TEXT);

To find the student’s first name and last name, the stored procedure joins the extracted data with the student table, which is defined as follows, along with sample data.

CREATE TABLE student(id INT PRIMARY KEY, first_name TEXT, last_name TEXT);

INSERT INTO student(id, first_name, last_name) VALUES (1000047,"John","Smith");
INSERT INTO student(id, first_name, last_name) VALUES (1010382,"Mary","Smith");
INSERT INTO student(id, first_name, last_name) VALUES (1070044,"John","Doe");
INSERT INTO student(id, first_name, last_name) VALUES (1008022,"Mary","Doe");

Define the stored procedure:

DELIMITER //

CREATE OR REPLACE PROCEDURE course_enrollment_proc(batch QUERY(course_id TEXT, student_id INT))
AS
BEGIN
  INSERT INTO course_enrollment(course_id, first_name, last_name) SELECT b.course_id, s.first_name, s.last_name FROM batch b JOIN student s ON b.student_id = s.id;
END //

DELIMITER ;

Create and start the pipeline:

CREATE PIPELINE course_enrollment_pipeline
AS LOAD DATA FS '/course_enrollment.txt'
INTO PROCEDURE course_enrollment_proc
FIELDS TERMINATED BY ',';

START PIPELINE course_enrollment_pipeline;

Retrieve the data in the course_enrollment table.

SELECT * FROM course_enrollment ORDER BY course_id, last_name, first_name;
****
+-----------+------------+-----------+
| course_id | first_name | last_name |
+-----------+------------+-----------+
| CS-101    | John       | Smith     |
| CS-101    | Mary       | Smith     |
| CS-201    | John       | Doe       |
| CS-201    | Mary       | Doe       |
+-----------+------------+-----------+

Example: Normalizing Data

This example extracts, from the data source passenger.txt, a list of passengers who are booked on flights. The data source contains a line per passenger with the columns flight number (column 1), first name (column 2), last name (column 3), and date of birth (column 4). Following is sample data in the file.

101,John,Smith,1990-02-08
101,Mary,Smith,1991-04-19
101,John,Doe,1986-09-09
101,Mary,Doe,1984-05-25

The pipeline splits the data extracted from the data source into two tables that are defined as follows:

CREATE TABLE passenger(id BIGINT AUTO_INCREMENT PRIMARY KEY, first_name TEXT, last_name TEXT, date_of_birth DATETIME);
CREATE TABLE passenger_list(flight_id INT, passenger_id INT, SHARD(flight_id));

Create the stored procedure:

DELIMITER //

CREATE OR REPLACE PROCEDURE passenger_list_proc(batch QUERY(flight_id INT, first_name TEXT, last_name TEXT, date_of_birth DATETIME))
AS
BEGIN

  /* Insert new passenger records from the batch into the passenger table */
  INSERT INTO passenger(first_name, last_name, date_of_birth)
    SELECT DISTINCT first_name, last_name, date_of_birth FROM batch b WHERE NOT EXISTS
      (SELECT * FROM passenger p WHERE p.first_name = b.first_name AND p.last_name = b.last_name AND p.date_of_birth = b.date_of_birth);

  /* Insert passenger ids and their flights from the batch into the passenger_list table. To get the passenger ids, join to the passenger table. */
  INSERT INTO passenger_list(flight_id, passenger_id)
    SELECT flight_id, p.id FROM batch b JOIN passenger p
    on p.first_name = b.first_name AND p.last_name = b.last_name AND p.date_of_birth = b.date_of_birth;

END //

DELIMITER ;

Create and start the pipeline:

CREATE PIPELINE passenger_pipeline
AS LOAD DATA FS '/passenger.txt'
INTO PROCEDURE passenger_list_proc
FIELDS TERMINATED BY ',';

START PIPELINE passenger_pipeline;

Retrieve the data from the passenger and passenger_list tables:

SELECT * FROM passenger ORDER BY id;
****
+----+------------+-----------+---------------------+
| id | first_name | last_name | date_of_birth       |
+----+------------+-----------+---------------------+
|  1 | Mary       | Doe       | 1984-05-25 00:00:00 |
|  2 | John       | Smith     | 1990-02-08 00:00:00 |
|  3 | Mary       | Smith     | 1991-04-19 00:00:00 |
|  4 | John       | Doe       | 1986-09-09 00:00:00 |
+----+------------+-----------+---------------------+
SELECT * FROM passenger_list ORDER by flight_id, passenger_id;
****
+-----------+--------------+
| flight_id | passenger_id |
+-----------+--------------+
|       101 |            1 |
|       101 |            2 |
|       101 |            3 |
|       101 |            4 |
+-----------+--------------+

Example: Loading JSON Data and Calculating an Aggregate

This example extracts a list of tweets from the data source tweets.txt. The data source contains one line per tweet, where each tweet is represented as a JSON object. Following is sample data in the file.

{"tweet_id":"100", "tweet_user_id":"200", "tweet_text":"Test tweet 1", "retweet_user_id":"502"}
{"tweet_id":"101", "tweet_user_id":"213", "tweet_text":"Test tweet 2", "retweet_user_id":"518"}
{"tweet_id":"102", "tweet_user_id":"239", "tweet_text":"Test tweet 3", "retweet_user_id":"511"}
{"tweet_id":"101", "tweet_user_id":"213", "tweet_text":"Test tweet 2", "retweet_user_id":"518"}
{"tweet_id":"102", "tweet_user_id":"239", "tweet_text":"Test tweet 3", "retweet_user_id":"511"}
{"tweet_id":"103", "tweet_user_id":"265", "tweet_text":"Test tweet 4"}
{"tweet_id":"102", "tweet_user_id":"239", "tweet_text":"Test tweet 3", "retweet_user_id":"511"}

tweet_user_id is the id of the user who created the tweet. retweet_user_id is optional, and is the id of a user who retweeted the tweet. A single line in the data source can contain at most one retweet_user_id.

If more than one user has retweeted a tweet, the file will contain duplicate lines with the same tweet_id, tweet_user_id, and tweet_text but with a different retweet_user_id. This is the case with tweet_ids 101 and 102.

Info

The data in tweets.txt is for demonstration purposes only and is not intended to be real Twitter data.

The pipeline inserts the tweet_id, tweet_user_id, and tweet_text from the data source into the tweets table. Duplicate entries containing the same tweet_id are not inserted.

The retweets_counter table contains the number of tweets that have been retweeted by each user.

The tables are defined as follows:

CREATE TABLE tweets(tweet_id TEXT PRIMARY KEY, tweet_user_id TEXT, tweet_text TEXT);
CREATE TABLE retweets_counter(user_id TEXT PRIMARY KEY, num_retweets INT);

Create the stored procedure:

DELIMITER //

CREATE OR REPLACE PROCEDURE tweets_proc(batch QUERY(tweet JSON))
AS
BEGIN

  /* INSERT IGNORE does not attempt to insert records having a duplicate tweet_id into the tweets table. */
  INSERT IGNORE INTO tweets(tweet_id, tweet_user_id, tweet_text)
    SELECT tweet::tweet_id, tweet::tweet_user_id, tweet::tweet_text
    FROM batch;

  /* For each tweet in the batch, retrieve the id of the user who retweeted it. Then, for each of these retrieved ids, upsert the id into the retweets_counter table. For each duplicate id encountered, add 1 to the num_retweets column. */
  INSERT INTO retweets_counter(user_id, num_retweets)
    SELECT tweet::retweet_user_id, 1
    FROM batch
    WHERE tweet::retweet_user_id IS NOT NULL
  ON DUPLICATE KEY UPDATE num_retweets = num_retweets + 1;
END //

DELIMITER ;

Create and start the pipeline:

CREATE PIPELINE tweets_pipeline
AS LOAD DATA FS '/tweets.txt'
INTO PROCEDURE tweets_proc;

START PIPELINE tweets_pipeline;

Retrieve the data from the tweets and retweets_counter tables.

SELECT * FROM tweets ORDER BY tweet_id;
****
+----------+---------------+----------------+
| tweet_id | tweet_user_id | tweet_text     |
+----------+---------------+----------------+
| "100"    | "200"         | "Test tweet 1" |
| "101"    | "213"         | "Test tweet 2" |
| "102"    | "239"         | "Test tweet 3" |
| "103"    | "265"         | "Test tweet 4" |
+----------+---------------+----------------+
SELECT * FROM retweets_counter ORDER BY user_id;
****
+---------+--------------+
| user_id | num_retweets |
+---------+--------------+
| "502"   |            1 |
| "511"   |            3 |
| "518"   |            2 |
+---------+--------------+

Example: Tracking a Slowly Changing Dimension

This example tracks a Type 2 Slowly Changing Dimension. The data source, product_history.txt, contains a line for each product. A product has a code (column 1), description (column 2), price (column 3), and create date of the product record (column 4). A change to a product record is entered as a new record in the file. The new record has the same code as the original product record. Following is sample data in the file.

10,notebook,3.95,2020-01-05
15,pens (10 pack),10.25,2020-01-05
15,pens w/med size point (10 pack),10.25,2020-01-13
10,notebook (200 pages),3.95,2020-01-10
15,pens with med size point (10 pack),10.25,2020-01-14
18,envelopes (50 pack),2.25,2020-01-18
15,pens with medium size point (10 pack),10.25,2020-01-19

The pipeline inserts each extracted row from product_history.txt into the table product_history. The table is defined as follows:

CREATE TABLE product_history(code INT, description TEXT, price FLOAT, start_date DATETIME,current_record BOOLEAN);

start_date is the create date of the product record in product_history.txt. current_record is 1 if the record is the latest created record for a code.

Create the stored procedure:


DELIMITER //

CREATE OR REPLACE PROCEDURE changing_dimension_proc(batch QUERY(code INT, new_description TEXT, new_price FLOAT,start_date DATETIME))
AS
BEGIN
  /* Insert all batch records into product_history. Insert 0 for current_record. */
  INSERT INTO product_history(code, description, price, start_date, current_record) SELECT code, new_description, new_price, start_date,0 FROM batch;

  /* Update all product_history records having a code that is found in the batch. The update sets current_record to 0. This is done so current_record can be set correctly in the next step. */
  UPDATE product_history ph JOIN batch b ON ph.code = b.code SET current_record = 0 WHERE current_record = 1;

  /* Update product_history records having a code that is in the batch, setting current_record = 1 if the record has the latest start_date of all of the records in product_history with the same code. */
  UPDATE product_history ph JOIN batch b ON ph.code = b.code SET current_record = 1 WHERE ph.start_date = (SELECT max(start_date) FROM product_history WHERE code = b.code);
END //

DELIMITER ;

Create and start the pipeline:

CREATE PIPELINE changing_dimension_pipeline
AS LOAD DATA FS '/product_history.txt'
INTO PROCEDURE changing_dimension_proc
FIELDS TERMINATED BY ',';

START PIPELINE changing_dimension_pipeline;

Retrieve the data from the product_history table:

SELECT * FROM product_history ORDER by code, start_date;
****
+------+---------------------------------------+-------+---------------------+----------------+
| code | description                           | price | start_date          | current_record |
+------+---------------------------------------+-------+---------------------+----------------+
|   10 | notebook                              |  3.95 | 2020-01-05 00:00:00 |              0 |
|   10 | notebook (200 pages)                  |  3.95 | 2020-01-10 00:00:00 |              1 |
|   15 | pens (10 pack)                        | 10.25 | 2020-01-05 00:00:00 |              0 |
|   15 | pens w/med size point (10 pack)       | 10.25 | 2020-01-13 00:00:00 |              0 |
|   15 | pens with med size point (10 pack)    | 10.25 | 2020-01-14 00:00:00 |              0 |
|   15 | pens with medium size point (10 pack) | 10.25 | 2020-01-19 00:00:00 |              1 |
|   18 | envelopes (50 pack)                   |  2.25 | 2020-01-18 00:00:00 |              1 |
+------+---------------------------------------+-------+---------------------+----------------+

Example: Handling Duplicate Keys Using Upserts

Upserts can be specified in the ON DUPLICATE KEY UPDATE clause of CREATE PIPELINE. Alternatively, duplicate key behavior can be specified inside the stored procedure itself, as shown in the next example.

Example: Handling Duplicate Keys by Inserting Duplicate Records into a Separate Table

This example extracts, from the data source duplicate_keys.txt, a list of records that each contain a key (column 1) and a value (column 2). Following is sample data in the file.

1,33
4,28
1,45
1,56
6,88
4,67

The pipeline inserts the extracted rows from the data source into the table t, except for the extracted rows that have duplicate keys. Duplicate key records are inserted into t_duplicates. The tables are defined as follows.

CREATE TABLE t(a INT PRIMARY KEY, b INT);
CREATE TABLE t_duplicates(a INT, b INT);

Define the stored procedure:

DELIMITER //

CREATE OR REPLACE PROCEDURE duplicate_keys_proc(batch QUERY(a INT, b INT))
AS
BEGIN  
  FOR batch_record IN COLLECT(batch) LOOP
    BEGIN
      INSERT INTO t(a,b) VALUES (batch_record.a,batch_record.b);
      EXCEPTION
        WHEN ER_DUP_ENTRY THEN
          INSERT INTO t_duplicates(a,b) VALUES (batch_record.a,batch_record.b);
    END;
  END LOOP;
END //

DELIMITER ;

The stored procedure handles batch records with duplicate keys by handling the ER_DUP_ENTRY exception.

Create and start the pipeline:

CREATE PIPELINE duplicate_keys_pipeline
AS LOAD DATA FS '/duplicate_keys.txt'
INTO PROCEDURE duplicate_keys_proc
FIELDS TERMINATED BY ',';

START PIPELINE duplicate_keys_pipeline;

Retrieve the data from t and t_duplicates:

SELECT * FROM t ORDER BY a;
****
+---+------+
| a | b    |
+---+------+
| 1 |   33 |
| 4 |   28 |
| 6 |   88 |
+---+------+
SELECT * FROM t_duplicates ORDER BY a;
****
+------+------+
| a    | b    |
+------+------+
|    1 |   56 |
|    1 |   45 |
|    4 |   67 |
+------+------+