ALTER PIPELINE
The ALTER PIPELINE
statement changes an existing pipeline's configuration.
Syntax
ALTER PIPELINE pipeline_name
    [SET
        [OFFSETS [EARLIEST | LATEST | json_source_partition_offset]]
        [BATCH_INTERVAL milliseconds]
        [MAX_PARTITIONS_PER_BATCH max_partitions_per_batch]
        [MAX_OFFSETS_PER_BATCH_PARTITION max_offsets_per_batch_partition]
        [RESOURCE POOL pool_name]
        [TRANSFORM ('uri', ['executable', 'arguments [...]'])]
    ]
    [(ENABLE | DISABLE) OUT_OF_ORDER OPTIMIZATION]
    [(ENABLE | DISABLE) OFFSETS METADATA GC]
    [STOP_ON_ERROR {ON | OFF | NONE}]
    [RELOAD TRANSFORM]
    [DROP {FILE 'filename' | PARTITION 'partition_id' | ORPHAN FILES}]
    [{FIELDS | COLUMNS}
        TERMINATED BY 'string'
            [[OPTIONALLY] ENCLOSED BY 'char']
            [ESCAPED BY 'char']
    ]
    [LINES
        [STARTING BY '<string>']
        [TERMINATED BY '<string>']
    ]
This command causes implicit commits.
Each of the clauses in an ALTER PIPELINE
statement is described below.
ALTER PIPELINE SET
You can set a pipeline’s offsets, transform, batch interval, or max partitions per batch by using the SET
clause.
ALTER PIPELINE SET OFFSETS
A pipeline’s current starting offset can be altered by using the SET OFFSETS
clause.
SET OFFSETS EARLIEST
: Configures the pipeline to start reading from the earliest (or oldest) available offset in the data source.
ALTER PIPELINE mypipeline SET OFFSETS EARLIEST;
SET OFFSETS LATEST
: Configures the pipeline to start reading from the latest (or newest) available offset in the data source.
ALTER PIPELINE mypipeline SET OFFSETS LATEST;
SET OFFSETS '{"<path_to_file>":1}'
: Configures the pipeline to treat an unloaded or erroring file as if it is already loaded, thereby skipping the file.
ALTER PIPELINE mypipeline SET OFFSETS '{"<path_to_file>":1}';
SET OFFSETS '{"<source-partition>": <partition-offset>}'
: Configures the pipeline to start reading from specific data source partitions and offsets.
-
ALTER PIPELINE SET OFFSETS
only updates the metadata for the source partitions specified in the JSON string, and so it can be used to skip or reload files. The pipeline will start reading data from the specified data source partitions and offsets. Consequently, it will discover and extract data from partitions that are not specified in the ALTER PIPELINE
statement. -
If the data source has more partitions than are specified in the JSON string, only data from the specified offsets will be extracted.
No new offsets from the other partitions will be extracted. -
If the specified source partition doesn't exist, no data will be extracted and no errors will appear.
However, data in other partitions will be discovered and extracted. The non-existent partition will be present in a row of the information_schema.PIPELINES_CURSORS
table with its EARLIEST_OFFSET
and LATEST_OFFSET
columns set to NULL.
In the following example, the data source has two partitions with IDs of 0
and 1
, and the pipeline will start reading from offset 100
in both partitions.
ALTER PIPELINE mypipeline SET OFFSETS '{"0":100,"1":100}';
Note
When ingesting files into a pipeline, the entire source folder may not have been scanned. The information_schema.PIPELINES_FILES
view might show only a partial list of files marked as unloaded. When the ALTER PIPELINE <mypipeline> SET OFFSETS LATEST
command runs, only the files marked as unloaded will load.
To confirm all the files have been marked loaded before starting the pipeline, do the following:
-
Start the pipeline and immediately stop it.
-
Run
ALTER PIPELINE <mypipeline> SET OFFSETS LATEST.
-
Repeat this process until the
information_schema.PIPELINES_BATCHES
table has a record which states No Data for that pipeline.
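As a sketch, the final check can be done with a query like the following. The column names (BATCH_ID, BATCH_STATE) are assumptions about information_schema.PIPELINES_BATCHES and should be verified against your SingleStore version:

```sql
-- Hypothetical inspection query: look at the most recent batches for
-- mypipeline. Column names are assumptions; confirm them in your
-- information_schema before relying on this.
SELECT BATCH_ID, BATCH_STATE
FROM information_schema.PIPELINES_BATCHES
WHERE PIPELINE_NAME = 'mypipeline'
ORDER BY BATCH_ID DESC
LIMIT 5;
```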
ALTER PIPELINE SET OFFSETS - KAFKA
Kafka pipelines are the exception, as they do not check for the latest offsets when a pipeline is stopped. To view a pipeline's current offsets, query the information_schema.pipelines_cursors view:
SELECT * FROM information_schema.pipelines_cursors WHERE database_name = 'news';
+---------------+---------------+-------------+---------------------+-----------------+---------------+---------------+--------------------------+------------------------+--------------+
| DATABASE_NAME | PIPELINE_NAME | SOURCE_TYPE | SOURCE_PARTITION_ID | EARLIEST_OFFSET | LATEST_OFFSET | CURSOR_OFFSET | SUCCESSFUL_CURSOR_OFFSET | UPDATED_UNIX_TIMESTAMP | EXTRA_FIELDS |
+---------------+---------------+-------------+---------------------+-----------------+---------------+---------------+--------------------------+------------------------+--------------+
| news | articles | KAFKA | 0 | 0 | 507147 | 507147 | 507147 | 1682365886.811381 | NULL |
+---------------+---------------+-------------+---------------------+-----------------+---------------+---------------+--------------------------+------------------------+--------------+
With the pipeline stopped, run ALTER PIPELINE <pipeline_name> SET OFFSETS LATEST.
ALTER PIPELINE articles SET OFFSETS LATEST;
+---------------+---------------+-------------+---------------------+-----------------+---------------+---------------+--------------------------+------------------------+--------------+
| DATABASE_NAME | PIPELINE_NAME | SOURCE_TYPE | SOURCE_PARTITION_ID | EARLIEST_OFFSET | LATEST_OFFSET | CURSOR_OFFSET | SUCCESSFUL_CURSOR_OFFSET | UPDATED_UNIX_TIMESTAMP | EXTRA_FIELDS |
+---------------+---------------+-------------+---------------------+-----------------+---------------+---------------+--------------------------+------------------------+--------------+
| news | articles | KAFKA | 0 | 0 | 507147 | 507147 | 507147 | 1682365886.811381 | NULL |
+---------------+---------------+-------------+---------------------+-----------------+---------------+---------------+--------------------------+------------------------+--------------+
ALTER PIPELINE SET BATCH_INTERVAL
You can alter the batch interval for an existing pipeline by using the SET BATCH_INTERVAL
clause. The syntax is identical to the BATCH_INTERVAL
syntax that is used when creating a new pipeline.
In the following example, the batch interval of mypipeline
is set to 0
:
ALTER PIPELINE mypipeline SET BATCH_INTERVAL 0;
ALTER PIPELINE SET MAX_PARTITIONS_PER_BATCH
You can alter the maximum number of partitions per batch for an existing pipeline by using the SET MAX_PARTITIONS_PER_BATCH
clause.
In the following example, the maximum number of partitions per batch for mypipeline
is set to 10
:
ALTER PIPELINE mypipeline SET MAX_PARTITIONS_PER_BATCH 10;
ALTER PIPELINE SET MAX_OFFSETS_PER_BATCH_PARTITION
Use this command to set the maximum number of data source partition offsets to extract in a single batch transaction. This overrides the pipelines_max_offsets_per_batch_partition
engine variable, and can be set for a single pipeline.
In the following example, the maximum number of data source partition offsets, per batch partition, and for mypipeline
, is set to 30
:
ALTER PIPELINE mypipeline SET MAX_OFFSETS_PER_BATCH_PARTITION 30;
ALTER PIPELINE SET MAX_RETRIES_PER_BATCH_PARTITION
You can alter the maximum number of retry attempts, per batch partition, for writing batch partition data to the destination table for an existing pipeline by using the SET MAX_RETRIES_PER_BATCH_PARTITION
clause.
In the following example, the maximum number of retry attempts, per batch partition, for mypipeline
is set to 3
:
ALTER PIPELINE mypipeline SET MAX_RETRIES_PER_BATCH_PARTITION 3;
ALTER PIPELINE SET RESOURCE POOL
You can alter the resource pool of a pipeline by using the SET RESOURCE POOL
clause.
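For example, the following sketch moves mypipeline into a resource pool. The pool name my_pool is illustrative, and the pool must already exist:

```sql
-- Assumes a resource pool named my_pool has already been created,
-- e.g. with a CREATE RESOURCE POOL statement.
ALTER PIPELINE mypipeline SET RESOURCE POOL my_pool;
```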
ALTER PIPELINE OUT_OF_ORDER OPTIMIZATION
By default, records may be inserted out of order if the SingleStore workspace is sufficiently behind in loading records from the source. To require that records are inserted in order, add the DISABLE OUT_OF_ORDER OPTIMIZATION
clause to CREATE PIPELINE
or ALTER PIPELINE.
An example of disabling OUT_OF_ORDER OPTIMIZATION
follows:
ALTER PIPELINE mypipeline DISABLE OUT_OF_ORDER OPTIMIZATION;
ALTER PIPELINE SET TRANSFORM
You can configure an existing pipeline to use a transform by using the SET TRANSFORM
clause. The syntax is identical to the WITH TRANSFORM
syntax that is used when creating a new pipeline.
The following example transforms the source data of mypipeline
using the URI http://memsql.com/my-transform-tarball.tar.gz
, along with the transform parameters my-executable.py
and -arg1 -arg1
:
ALTER PIPELINE mypipeline SET TRANSFORM ('http://memsql.com/my-transform-tarball.tar.gz', 'my-executable.py', '-arg1 -arg1');
-
SET TRANSFORM ('uri', ['executable', 'arguments [...]'])
: Each of the transform's parameters is described below: -
uri
: The transform's URI is the location from where the executable program can be downloaded, which is specified as either an http://
or file://
endpoint. If the URI points to a tarball with a .tar.gz
or .tgz
extension, its contents will be automatically extracted. Additionally, the executable
parameter must be specified if the uri
is a tarball. If the URI specifies an executable file itself, the executable
and arguments
parameters are optional. -
executable
: The filename of the transform executable to run. This parameter is required if a tarball was specified as the endpoint for the transform's uri
. If the uri
itself specifies an executable, this parameter is optional. -
arguments
: A series of arguments that are passed to the transform executable at runtime.
ALTER PIPELINE RELOAD TRANSFORM
This command will reload the transform from the uri specified when the pipeline was created.
ALTER PIPELINE mypipeline RELOAD TRANSFORM;
You can run ALTER PIPELINE RELOAD TRANSFORM
while the pipeline is running; the transform will reload without you having to stop and restart the pipeline.
ALTER PIPELINE DROP FILE
ALTER PIPELINE ... DROP FILE
will cause the pipeline to forget all metadata associated with a given file.
ALTER PIPELINE mypipeline DROP FILE 'my_file';
ALTER PIPELINE ... DROP PARTITION
causes the pipeline to forget all metadata associated with a given Kafka partition.
ALTER PIPELINE mypipeline DROP PARTITION '2';
ALTER PIPELINE ... DROP ORPHAN FILES
will cause the pipeline to forget all metadata associated with all Unloaded
files.
ALTER PIPELINE mypipeline DROP ORPHAN FILES;
The pipeline will not try to load these files again unless they reappear in the source. This does not apply to Loaded
or Skipped
files; SingleStore will not try to reload such files.
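As a sketch, you can inspect per-file state before and after dropping orphan files. The column names (FILE_NAME, FILE_STATE) are assumptions about information_schema.PIPELINES_FILES and should be confirmed against your version:

```sql
-- Hypothetical inspection query; confirm the column names in
-- information_schema.PIPELINES_FILES before relying on them.
SELECT FILE_NAME, FILE_STATE
FROM information_schema.PIPELINES_FILES
WHERE PIPELINE_NAME = 'mypipeline';
```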
Note
Monitor the information_schema.PIPELINES_FILES
table to track the state of the pipeline's files.
ALTER PIPELINE Format Options for Parsing the Input File
You can modify how a pipeline parses the input file:
FIELDS TERMINATED BY 'string'
: You can modify the field and column delimiters.
ALTER PIPELINE mypipeline FIELDS TERMINATED BY ',';
FIELDS ENCLOSED BY 'char'
: You can modify the character that encloses the field values. The OPTIONALLY
keyword does not affect the behavior of this option; it exists to maintain compatibility with MySQL.
ALTER PIPELINE mypipeline FIELDS ENCLOSED BY '"';
FIELDS ESCAPED BY 'char'
: You can modify the escape character.
ALTER PIPELINE mypipeline FIELDS ESCAPED BY '\\';
LINES TERMINATED BY 'string'
: You can modify the line delimiters.
ALTER PIPELINE mypipeline LINES TERMINATED BY '\r\n';
LINES STARTING BY 'string'
: You can modify the common prefix in the input lines of a pipeline that you want to ignore.
ALTER PIPELINE mypipeline LINES STARTING BY '###';
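Per the syntax above, the FIELDS and LINES clauses can also be combined in a single statement. This sketch reuses the same mypipeline and delimiter values as the examples above:

```sql
-- Illustrative combined example: set field and line parsing
-- options in one ALTER PIPELINE statement.
ALTER PIPELINE mypipeline
    FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY '\\'
    LINES TERMINATED BY '\r\n' STARTING BY '###';
```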
PIPELINE ENABLE or DISABLE OFFSETS METADATA GARBAGE COLLECTION (GC)
If the OFFSETS METADATA GC
clause was enabled during CREATE PIPELINE
, it may be disabled by using the ALTER PIPELINE
command. If the clause was disabled, it may be enabled with ALTER PIPELINE
as well.
To disable:
ALTER PIPELINE <pipeline_name> DISABLE OFFSETS METADATA GC;
To enable:
ALTER PIPELINE <pipeline_name> ENABLE OFFSETS METADATA GC;
Note
This option is only supported for S3 pipelines.
ALTER PIPELINE STOP ON ERROR
When STOP_ON_ERROR
is enabled, it will cause a pipeline to stop when an error occurs.
ALTER PIPELINE <pipeline_name> SET STOP_ON_ERROR ON/OFF;
To let the engine variable take control, set STOP_ON_ERROR
to NONE.
ALTER PIPELINE <pipeline_name> SET STOP_ON_ERROR NONE;
Remarks
-
Refer to the Permission Matrix for the required permission.
Last modified: November 12, 2024