ALTER PIPELINE

The ALTER PIPELINE clause changes an existing pipeline’s configuration.

Syntax

ALTER PIPELINE pipeline_name
[SET
[OFFSETS
[EARLIEST | LATEST | json_source_partition_offset]
]
[BATCH_INTERVAL milliseconds]
[MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]
[RESOURCE POOL pool_name]
[TRANSFORM ('uri', ['executable', 'arguments [...]'])]
]
[(ENABLE|DISABLE) OUT_OF_ORDER OPTIMIZATION]
[RELOAD TRANSFORM]
[DROP {FILE 'filename' | PARTITION 'partition_id' | ORPHAN FILES}]
[FIELDS | COLUMNS]
[TERMINATED BY 'string'
[[OPTIONALLY] ENCLOSED BY 'char']
[ESCAPED BY 'char']
]
[LINES
[STARTING BY '<string>']
[TERMINATED BY '<string>']
]

This command causes implicit commits. Refer to COMMIT for more information.

Each of the clauses in a ALTER PIPELINE statement are described below.

ALTER PIPELINE SET

You can set a pipeline’s offsets, transform, batch interval, or max partitions per batch by using the SET clause.

ALTER PIPELINE SET OFFSETS

A pipeline’s current starting offset can be altered by using the SET OFFSETS clause. When a new offset is set, the pipeline will begin extracting data from the specified offset, regardless of any previous offsets that have or have not been extracted. There are three offset options:

SET OFFSETS EARLIEST: Configures the pipeline to start reading from the earliest (or oldest) available offset in the data source. For example,

ALTER PIPELINE mypipeline SET OFFSETS EARLIEST;

SET OFFSETS LATEST: Configures the pipeline to start reading from the latest (or newest) available offset in the data source. For example,

ALTER PIPELINE mypipeline SET OFFSETS LATEST;

SET OFFSETS {<file path>:1}: Configures the pipeline to treat an unloaded or erroring file as if it is already loaded, thereby skipping the file.

ALTER PIPELINE mypipeline SET OFFSETS '{"<path_to_file>":1}';

SET OFFSETS '{"<source-partition>": <partition-offset>}': Configures the pipeline to start reading from specific data source partitions and offsets. When you manually specify which source partition and offset to start extracting from, there are a few important things to consider:

  • ALTER PIPELINE SET OFFSETS only updates the metadata for the source partitions specified in the JSON string, and so it can be used to skip or reload files. The pipeline will start reading data from the specified data source partitions and offsets. Consequently, it will discover and extract data from partitions that are not specified in the ALTER PIPELINE statement.

  • If the data source has more partitions than are specified in the JSON string, only data from the specified offsets will be extracted. No new offsets from the other partitions will be extracted.

  • If the specified source partition doesn’t exist, no data will be extracted and no errors will appear. However, data in other partitions will be discovered and extracted. The non-existent partition will be present in a row of the information_schema.PIPELINES_CURSORS table with its EARLIEST_OFFSET and LATEST_OFFSETcolumns set to NULL.

In the following example, the data source has two partitions with IDs of 0 and 1, and the pipeline will start reading from offset 100 in both partitions.

ALTER PIPELINE mypipeline SET OFFSETS '{"0":100,"1":100}';

Note

When ingesting files into a pipeline, the entire source folder may not have been scanned. If the folder contains many files, it is possible that the information_schema.PIPELINES_FILES view might show only a partial list of files marked as unloaded. Once the ALTER PIPELINE <mypipeline> SET OFFSETS LATEST command runs, only the files marked as unloaded will load. Once the pipeline starts, it will scan the folder for additional files and start loading them.

To confirm all the files have been marked loaded before starting the pipeline, do the following:

  1. Start the pipeline and immediately stop it.

  2. Run ALTER PIPELINE <mypipeline> SET OFFSETS LATEST.

  3. Repeat this process until the information_schema.PIPELINES_BATCHES has a record which states No Data for that pipeline.

ALTER PIPELINE SET OFFSETS - KAFKA

Kafka pipelines are the exception, as they do not check for the latest offsets when a pipeline is stopped. Offsets will not appear for files added after the pipeline is stopped.

SELECT * FROM information_schema.pipelines_cursors WHERE database_name = 'news';
+---------------+---------------+-------------+---------------------+-----------------+---------------+---------------+--------------------------+------------------------+--------------+
| DATABASE_NAME | PIPELINE_NAME | SOURCE_TYPE | SOURCE_PARTITION_ID | EARLIEST_OFFSET | LATEST_OFFSET | CURSOR_OFFSET | SUCCESSFUL_CURSOR_OFFSET | UPDATED_UNIX_TIMESTAMP | EXTRA_FIELDS |
+---------------+---------------+-------------+---------------------+-----------------+---------------+---------------+--------------------------+------------------------+--------------+
| news          | articles      | KAFKA       | 0                   |               0 |        507147 |        507147 |                   507147 |      1682365886.811381 | NULL         |
+---------------+---------------+-------------+---------------------+-----------------+---------------+---------------+--------------------------+------------------------+--------------+

With the pipeline stopped, run the query ALTER PIPELINE <pipeline_name> SET OFFSETS LATEST. The offset count will not change.

ALTER PIPELINE articles SET OFFSETS LATEST;
+---------------+---------------+-------------+---------------------+-----------------+---------------+---------------+--------------------------+------------------------+--------------+
| DATABASE_NAME | PIPELINE_NAME | SOURCE_TYPE | SOURCE_PARTITION_ID | EARLIEST_OFFSET | LATEST_OFFSET | CURSOR_OFFSET | SUCCESSFUL_CURSOR_OFFSET | UPDATED_UNIX_TIMESTAMP | EXTRA_FIELDS |
+---------------+---------------+-------------+---------------------+-----------------+---------------+---------------+--------------------------+------------------------+--------------+
| news          | articles      | KAFKA       | 0                   |               0 |        507147 |        507147 |                   507147 |      1682365886.811381 | NULL         |
+---------------+---------------+-------------+---------------------+-----------------+---------------+---------------+--------------------------+------------------------+--------------+

ALTER PIPELINE SET BATCH_INTERVAL

You can alter the batch interval for an existing pipeline by using the SET BATCH_INTERVAL clause. A batch interval is the amount of time (in milliseconds) that the pipeline waits before checking the data source for new data, once all of the existing data has been loaded from the data source. The syntax for setting a batch interval is identical to the BATCH_INTERVAL syntax that is used when creating a new pipeline.

In the following example, the batch interval of mypipeline is set to 0:

ALTER PIPELINE mypipeline SET BATCH_INTERVAL 0;

ALTER PIPELINE SET MAX_PARTITIONS_PER_BATCH

You can alter the maximum number of partitions per batch for an existing pipeline by using the SET MAX_PARTITIONS_PER_BATCH clause. This can be useful for limiting parallelism on large clusters, or to reduce throttling by data sources.

In the following example, the maximum number of partitions per batch for mypipeline is set to 10:

ALTER PIPELINE mypipeline SET MAX_PARTITIONS_PER_BATCH 10;

ALTER PIPELINE SET MAX_RETRIES_PER_BATCH_PARTITION

You can alter the maximum number of retry attempts, per batch partition, for writing batch partition data to the destination table for an existing pipeline by using the SET MAX_RETRIES_PER_BATCH_PARTITION clause.

In the following example, the maximum number of retry attempts, per batch partition, for mypipeline is set to 3:

ALTER PIPELINE mypipeline SET MAX_RETRIES_PER_BATCH_PARTITION 3;

ALTER PIPELINE SET RESOURCE POOL

You can alter the resource pool of a pipeline by using the SET RESOURCE POOL clause. The user who alters the pipeline must have permissions to use the resource pool.

For more information on resource pools, see Set Resource Limits.

ALTER PIPELINE OUT_OF_ORDER OPTIMIZATION

By default, records may be inserted out of order if the SingleStore cluster is sufficiently behind in loading records from the source. If you require the records to be inserted in order, e.g. in upsert scenarios, pass the DISABLE OUT_OF_ORDER OPTIMIZATION clause to CREATE PIPELINE or ALTER PIPELINE. When this optimization is disabled, records from the same source file or partition will be inserted in the order in which they appear in that partition. However, records from different files or partitions may still be inserted in an arbitrary order with respect to each other.

An example of disabling OUT_OF_ORDER OPTIMIZATION follows:

ALTER PIPELINE mypipeline DISABLE OUT_OF_ORDER OPTIMIZATION;

ALTER PIPELINE SET TRANSFORM

You can configure an existing pipeline to use a transform by using the SET TRANSFORM clause. The syntax for applying a transform to a pipeline is identical to the WITH TRANSFORM syntax that is used when creating a new pipeline.

The following example transforms the source data of mypipeline using the URI http://memsql.com/my-transform-tarball.tar.gz, along with the transform parameters my-executable.py and -arg1 -arg1:

ALTER PIPELINE mypipeline SET TRANSFORM ('http://memsql.com/my-transform-tarball.tar.gz', 'my-executable.py', '-arg1 -arg1');
  • SET TRANSFORM ('uri', ['executable', 'arguments [...]']): Each of the transform’s parameters are described below:

  • uri: The transform’s URI is the location from where the executable program can be downloaded, which is specified as either an http:// or file:// endpoint. If the URI points to a tarball with a .tar.gz or .tgz extension, its contents will be automatically extracted. Additionally, the executable parameter must be specified if a the uri is a tarball. If the URI specifies an executable file itself, the executable and arguments parameters are optional.

  • executable: The filename of the transform executable to run. This parameter is required if a tarball was specified as the endpoint for the transform’s url. If the url itself specifies an executable, this parameter is optional.

  • arguments: A series of arguments that are passed to the transform executable at runtime.

ALTER PIPELINE RELOAD TRANSFORM

This command will reload the transform from the uri specified when the pipeline was created. This will redeploy the latest transform code hosted at that uri. For example,

ALTER PIPELINE mypipeline RELOAD TRANSFORM;

You can run ALTER PIPELINE RELOAD TRANSFORM while the pipeline is running; the transform will reload without you having to stop and restart the pipeline.

ALTER PIPELINE DROP FILE

ALTER PIPELINE ... DROP FILE will cause the pipeline to forget all metadata associated with a given file.

ALTER PIPELINE mypipeline DROP FILE 'my_file';

ALTER PIPELINE ... DROP PARTITION causes the pipeline to forget all metadata associated with a given kafka partition.

ALTER PIPELINE mypipeline DROP PARTITION '2';

ALTER PIPELINE ... DROP ORPHAN FILES will cause the pipeline to forget all metadata associated with all Unloaded files.

ALTER PIPELINE mypipeline DROP ORPHAN FILES;

The pipeline will not try to load these files again unless they reappear in the source. Use this command to instruct a pipeline that some files in the source have been removed, and to not try to load them. This command will not forget metadata associated with already Loaded or Skipped files; SingleStore will not try to reload such files.

Note

Monitor the information_schema.pipelines_files table. It stores data for the duration of the pipeline’s lifetime, resulting in a large number of files referenced in the table. If the data in this table is not required, you can either drop a file record from the table or clear all the file records from it.

ALTER PIPELINE Format Options for Parsing the Input File

You can modify how a pipeline parses the input file:

FIELDS TERMINATED BY 'string': You can modify the field and column delimiters. The following example loads the data from the input where fields are separated by commas.

ALTER PIPELINE mypipeline FIELDS TERMINATED BY ',';

FIELDS ENCLOSED BY 'char': You can modify the string that encloses the field values. The OPTIONALLY keyword does not affect the behavior of this option; it exists to maintain compatibility with MySQL. The following example loads the data from the input where fields are enclosed in double quotes.

ALTER PIPELINE mypipeline FIELDS ENCLOSED BY '"';

FIELDS ESCAPED BY 'char': You can modify the escape character. For example, \ (backslash) is the default escape character in a SQL query. Hence, the \\ (double backslash) is used to escape the backslash itself.

ALTER PIPELINE mypipeline FIELDS ESCAPED BY '\\';

LINES TERMINATED BY 'string': You can modify the line delimiters. In the following example, the lines in the input file are terminated by carriage return/newline pairs.

ALTER PIPELINE mypipeline LINES TERMINATED BY '\r\n';

LINES STARTING BY 'string': You can modify the common prefix in the input lines of a pipeline that you want to ignore. In the following example, the prefix ### is skipped:

ALTER PIPELINE mypipeline LINES STARTING BY '###';

Remarks

Last modified: November 12, 2024

Was this article helpful?