PIPELINES

This view stores high-level information about any pipelines that have been created in the cluster. Each row represents a single pipeline.

information_schema.PIPELINES Schema

Column Name

Description

DATABASE_NAME

The name of the database associated with the pipeline.

PIPELINE_NAME

The name of the pipeline.

PIPELINE_ID

The unique ID of the pipeline.

CONFIG_JSON

The pipeline’s configuration in JSON format. This JSON is read-only and is automatically regenerated whenever the pipeline configuration changes. The JSON schema for this column is described in CONFIG_JSON Schema, and is intended for use in a Web-based application (such as MemSQL Ops with SingleStore versions earlier than 7.5).

STATE

The current state of the pipeline. Possible values are Running, Error, and Stopped.
Running: The pipeline is currently running. If the pipelines_stop_on_error variable is set to ON, the pipeline has not encountered any errors during extraction, transformation, or loading.
Error: The pipeline encountered an error and is currently stopped. A pipeline in the Error state must be started manually. If the pipelines_stop_on_error variable is set to OFF, a pipeline cannot enter the Error state; it remains in the Running state until it is manually stopped, and any errors that occur are written to the information_schema.PIPELINES_ERRORS table.
Stopped: The pipeline is currently stopped. A pipeline can enter the Stopped state only through manual intervention.

SKIPPED_BATCH_PARTITIONS

The total number of batches that have been skipped in the pipeline. A batch may be skipped if it reaches the maximum number of retries, which is set using the pipelines_max_retries_per_batch_partition variable.

CREATE_TIME

The date/time when the pipeline was created or recreated by a CREATE PIPELINE or CREATE OR REPLACE PIPELINE statement.

ALTER_TIME

The date/time when the pipeline was altered by an ALTER PIPELINE statement.
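As a rough illustration of how the STATE and SKIPPED_BATCH_PARTITIONS columns might be used together, the sketch below flags pipelines that probably need attention. The rows are hypothetical sample data shaped like a result set from this view; the pipeline names and the needs_attention helper are assumptions made for the example, not part of the product.

```python
from collections import Counter

# Hypothetical rows, shaped like the result of selecting PIPELINE_NAME, STATE,
# and SKIPPED_BATCH_PARTITIONS from information_schema.PIPELINES.
rows = [
    {"PIPELINE_NAME": "orders", "STATE": "Running", "SKIPPED_BATCH_PARTITIONS": 0},
    {"PIPELINE_NAME": "clicks", "STATE": "Error", "SKIPPED_BATCH_PARTITIONS": 3},
    {"PIPELINE_NAME": "archive", "STATE": "Stopped", "SKIPPED_BATCH_PARTITIONS": 0},
]


def needs_attention(row):
    """Return True if the pipeline is in the Error state or has skipped batches."""
    return row["STATE"] == "Error" or row["SKIPPED_BATCH_PARTITIONS"] > 0


# Count pipelines per state and collect the names of the flagged ones.
state_counts = Counter(row["STATE"] for row in rows)
flagged = [row["PIPELINE_NAME"] for row in rows if needs_attention(row)]

print(dict(state_counts))
print(flagged)
```

Note that when pipelines_stop_on_error is OFF a pipeline never reaches the Error state, so a check like this would likely also need to consult information_schema.PIPELINES_ERRORS.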

CONFIG_JSON Schema

The CONFIG_JSON column in the information_schema.PIPELINES table contains a fixed set of read-only JSON key/value pairs. Some of these JSON values can also be seen by executing the SHOW CREATE PIPELINE <pipeline-name> statement. Each key/value pair is described below.

Example CONFIG_JSON for Kafka Pipelines

{
"name":"mypipeline",
"source_type":"KAFKA",
"credentials":"172.17.0.2\/test-topic",
"batch_interval":0,
"transform":["http:\/\/127.0.0.1:8000\/transform.py","",""],
"dup_key_policy":null,
"table":"messages",
"fields_terminated_by":"\t",
"fields_enclosed_by":"",
"fields_escaped_by":"\\",
"lines_terminated_by":"\n",
"lines_starting_by":"",
"extended_null":false,
"column_list":null,
"on_duplicate_key_update":null,
"running":false
}

Example CONFIG_JSON for S3 Pipelines

{
"name": "my-s3-pipeline",
"source_type": "S3",
"connection_string": "my-s3-bucket-name",
"config": "{\"region\": \"us-west-1\"}",
"credentials": "<CREDENTIALS REDACTED>",
"batch_interval": 2500,
"max_partitions_per_batch": -1,
"transform": null,
"load_error_policy": null,
"dup_key_policy": null,
"table": "my_table_name",
"fields_terminated_by": ",",
"fields_enclosed_by": "",
"fields_escaped_by": "\\",
"lines_terminated_by": "\n",
"lines_starting_by": "",
"extended_null": false,
"column_list": null,
"on_duplicate_key_update": null
}
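One detail worth noticing in the S3 example is that the config value is itself a JSON document stored as a string, so it needs a second decoding step. The following is a minimal sketch of reading it in Python, using an abbreviated copy of the example above; the variable names are illustrative only.

```python
import json

# Abbreviated copy of the S3 CONFIG_JSON example. A raw string keeps the
# backslash-escaped quotes exactly as they appear in the column value.
config_json = r'''
{
  "name": "my-s3-pipeline",
  "source_type": "S3",
  "connection_string": "my-s3-bucket-name",
  "config": "{\"region\": \"us-west-1\"}",
  "credentials": "<CREDENTIALS REDACTED>",
  "batch_interval": 2500,
  "transform": null
}
'''

# First pass: decode the outer CONFIG_JSON document.
pipeline = json.loads(config_json)

# Second pass: "config" is a JSON string nested inside the outer document.
# Fall back to an empty dict when the key is absent or null.
raw_config = pipeline.get("config")
source_config = json.loads(raw_config) if raw_config else {}

print(pipeline["source_type"], source_config.get("region"))
print(pipeline["batch_interval"], "ms between batch extractions")
```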

CONFIG_JSON Schema Definition

Key Name

Value Description

name

The name of the pipeline.

source_type

The data source type for the pipeline.

connection_string

The name of the S3 bucket, or of an object in the bucket, with an optional prefix.

config

The configuration information provided when creating an S3 pipeline, namely the region where the source bucket is hosted.

credentials

Either the Kafka topic URL for the pipeline or <CREDENTIALS REDACTED> for an S3 pipeline.

batch_interval

The time duration in milliseconds between batch extraction operations.

transform

The transform’s URI, executable entry point, and arguments.

load_error_policy

The load error policy for the pipeline. For example, if IGNORE or SKIP ... ERRORS was specified during pipeline creation, it will appear as a JSON key/value pair like so: {"load_error_policy": "skip_all_errors"}

dup_key_policy

The duplicate key policy that indicates how a row should be inserted if it contains a duplicate key value.

table

The name of the table in which to insert data.

fields_terminated_by

The character that terminates a field.

fields_enclosed_by

The character that encloses a field.

fields_escaped_by

The character that escapes a field.

lines_terminated_by

The character that terminates a line.

lines_starting_by

The string prefix for a line.

extended_null

Specifies whether the non-quoted and case-insensitive string null will be loaded as a null type.

column_list

The column list to load data into.

on_duplicate_key_update

Specifies whether duplicate keys will be updated or ignored.

running

Specifies whether the pipeline is currently running. The current state of the pipeline is one of running, testing, profiling, running...foreground, error, or stopped.
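To make the roles of the delimiter keys more concrete, here is a deliberately simplified sketch of how fields_terminated_by, lines_terminated_by, lines_starting_by, and extended_null could be applied to raw text. It is not the loader the database uses: it ignores fields_enclosed_by and fields_escaped_by, and the split_records function, the sample input, and the prefix handling are all assumptions made for illustration.

```python
def split_records(raw, config):
    """Split raw text into records using delimiter keys from a CONFIG_JSON dict.

    Simplified illustration only: enclosing and escape characters are ignored.
    """
    line_end = config["lines_terminated_by"]
    field_sep = config["fields_terminated_by"]
    prefix = config["lines_starting_by"]

    records = []
    for line in raw.split(line_end):
        if not line:
            continue  # skip the empty piece left after the final terminator
        if prefix and line.startswith(prefix):
            line = line[len(prefix):]  # drop the line prefix (assumed handling)
        fields = line.split(field_sep)
        if config["extended_null"]:
            # Treat the unquoted, case-insensitive string "null" as a null value.
            fields = [None if f.lower() == "null" else f for f in fields]
        records.append(fields)
    return records


# Delimiters as in the S3 example, but with extended_null switched on
# so that its effect is visible.
config = {
    "fields_terminated_by": ",",
    "lines_terminated_by": "\n",
    "lines_starting_by": "",
    "extended_null": True,
}

records = split_records("1,alice\n2,NULL\n", config)
print(records)
```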

Last modified: June 2, 2023
