You are viewing an older version of this section. View current production version.
Join the SingleStore Community Today
Get expert advice, develop skills, and connect with others.

CREATE PIPELINE

The CREATE PIPELINE clause creates a new pipeline in a MemSQL database.

CREATE [AGGREGATOR] PIPELINE [IF NOT EXISTS] pipeline_name AS
  LOAD DATA { kafka_configuration | s3_configuration | filesystem_configuration | azure_blob_configuration }
    [BATCH_INTERVAL milliseconds]
    [MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]
    [WITH TRANSFORM ('uri', 'executable', 'arguments [...]') ]
  [REPLACE | IGNORE | SKIP { ALL | CONSTRAINT | DUPLICATE KEY } ERRORS]
  INTO TABLE table_name
  [FIELDS | COLUMNS]
  [TERMINATED BY 'string'
    [[OPTIONALLY] ENCLOSED BY 'char']
    [ESCAPED BY 'char']
  ]
  [LINES
    [STARTING BY 'string']
    [TERMINATED BY 'string']
  ]
  [IGNORE number LINES]
  [ (column_name, ... ) ]
  [SET  col_name = expr,...]
  [WHERE expr,...]
  [ON DUPLICATE KEY UPDATE column_name = expression, [...]]

  kafka_configuration:
    KAFKA 'kafka_topic_endpoint'

  s3_configuration:
    S3 { 'bucket-name' | 'bucket-name/object-name' | 'bucket-name/prefix/object-name' }
      [CONFIG 'configuration_json']
      CREDENTIALS 'credentials_json'


  filesystem_configuration:
  FS 'path'

  azure_blob_configuration:
    AZURE { 'container-name' | 'container-name/object-name' | 'container-name/prefix/object-name' }
      CREDENTIALS 'credentials_json'
      [CONFIG 'configuration_json']

Kafka Pipeline Syntax

The following example statement demonstrate how to create a Kafka pipeline using the minimum required syntax:

Minimum Required Kafka Pipeline Syntax:

CREATE PIPELINE mypipeline AS
  LOAD DATA KAFKA '127.0.0.1/my-topic'
  INTO TABLE `my_table`;

START PIPELINE mypipeline;

This statement creates a new pipeline named my_pipeline, uses a Kafka cluster as the data source, points to the location of the my-topic topic at the Kafka cluster’s endpoint, and will start ingesting data into my_table. For more information about Kafka Pipelines, see Kafka Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

S3 Pipeline Syntax

The following example statement demonstrate how to create an S3 pipeline using the minimum required syntax:

Minimum Required S3 Pipeline Syntax:

CREATE PIPELINE mypipeline AS
  LOAD DATA S3 'my-bucket-name'
    CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"}'
  INTO TABLE `my_table`

START PIPELINE mypipeline;

This statement creates a new pipeline named my_pipeline, uses an S3 bucket named my-bucket-name as the data source, and will start ingesting the bucket’s objects into my_table. For more information about S3 Pipelines, see S3 Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

Note that no CONFIG clause is required to create an S3 pipeline. This clause is used to specify the Amazon S3 region where the source bucket is located. If no CONFIG clause is specified, MemSQL will automatically use the us-east-1 region, also known as US Standard in the Amazon S3 console. To specify a different region, such as us-west-1, include a CONFIG clause as shown:

S3 Pipeline Using Specified Region:

CREATE PIPELINE mypipeline AS
  LOAD DATA S3 'my-bucket-name'
    CONFIG '{"region": "us-west-1"}'
    CREDENTIALS '{"aws_access_key_id": "your_access_key_id", "aws_secret_access_key": "your_secret_access_key"}'
 INTO TABLE `my_table`

Azure Blob Pipeline Syntax

The following example statement demonstrate how to create an Azure Blob pipeline using the minimum required syntax. Note that Azure Blob Pipelines are only available in MemSQL 5.8.5 and above.

Minimum Required Azure Pipeline Syntax:

CREATE PIPELINE mypipeline AS
LOAD DATA AZURE 'my-container-name'
CREDENTIALS '{"account_name": "my_account_name", "account_key":
"my_account_key"}'
INTO TABLE `my_table`

START PIPELINE mypipeline;

This statement creates a new pipeline named my_pipeline, uses an Azure Blob container named my-container-name as the data source, and will start ingesting the bucket’s objects into my_table. For more information about Azure Pipelines, see Azure Blob Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

Note that no CONFIG clause is required to create an Azure pipeline.

Each of the clauses in a CREATE PIPELINE statement are described below.

Filesystem Pipeline Syntax

The following example statement demonstrates how to create a Filesystem pipeline using the minimum required syntax:

CREATE PIPELINE my_pipeline
AS LOAD DATA FS '/path/to/files/*'
INTO TABLE `my_table`
FIELDS TERMINATED BY ',';

START PIPELINE my_pipeline;

This statement creates a new pipeline named my_pipeline, uses a directory as the data source, and will start ingesting data into my_table. For more information about Filesystem Pipelines, see Filesystem Pipelines Overview. For more information on how pipelines can be started and run, see START PIPELINE.

LOAD DATA

The CREATE PIPELINE clause uses standard LOAD DATA syntax options, including its error handling options. For more information, see the LOAD DATA page.

AS LOAD DATA: You can load data by specifying the data source as a Kafka cluster and topic, an S3 bucket or object (with optional prefix), or a file.

  • AS LOAD DATA KAFKA 'topic_endpoint': To use Kafka as a data source, you must specify the endpoint for the Kafka cluster and the path to a specific topic with the cluster. For example:

    LOAD DATA KAFKA '127.0.0.1/my-topic'
    
  • AS LOAD DATA S3 'bucket-name': To use S3 as a data source, you must specify a bucket name or a bucket name and object name. For example:

    LOAD DATA S3 'my-bucket-name'
    
  • [BATCH_INTERVAL milliseconds]: You can specify a batch interval in milliseconds, which is the time duration between the end of a batch operation and the start of the next one. If a batch interval is not specified, the default value is 2500. For example:

    LOAD DATA KAFKA '127.0.0.1/my-topic'
    BATCH_INTERVAL 500
    
  • [MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]: Specifies the maximum number of batch partitions that can be scheduled into a single batch. Useful for limiting the parallelism of a pipeline on large clusters to prevent the pipeline from throttling system resources.

  • [AGGREGATOR]: Specifying CREATE AGGREGATOR PIPELINE tells MemSQL to pull data through the aggregator, instead of directly to the leaves. This option can be more efficient for low parallelism pipelines, like single file S3 loads or single partition Kafka topics, and is required for pipelines into reference tables and tables with auto increment columns.

WITH TRANSFORM

Pipeline source data can be transformed by specifying an executable program. The data is transformed after the extraction process and before it is loaded into the database. For more information, see Transforms.

  • WITH TRANSFORM ('uri', 'executable', 'arguments [...]'): Each of the transform’s parameters are required, and they are described below:
  • uri: The transform’s URI is the location from where the executable program can be downloaded, which is specified as either an http:// or file:// endpoint. If the URI points to a tarball with a .tar.gz or .tgz extension, its contents will be automatically extracted. Additionally, the executable parameter must be specified if a the uri is a tarball. If the URI specifies an executable file itself, the executable and arguments parameters can be empty.
  • executable: The filename of the transform executable to run. This parameter is required if a tarball was specified as the endpoint for the transform’s url. If the url itself specifies an executable, this parameter can be empty.
  • arguments: A series of arguments that are passed to the transform executable at runtime. Each argument must be delimited by a space.
WITH TRANSFORM('http://memsql.com/my-transform.py','','')
WITH TRANSFORM('http://memsql.com/my-transform-tarball.tar.gz', 'my-executable.py','')
WITH TRANSFORM('http://memsql.com/my-transform-tarball.tar.gz', 'my-executable.py', '-arg1 -arg1')