Pipeline Troubleshooting
Detect and Address Slow Performance and High Memory Usage of Pipelines
Concepts
This topic requires an understanding of pipeline batches, which are explained in The Lifecycle of a Pipeline.
Detect slow performance and high memory usage
Run PROFILE PIPELINE
Run PROFILE PIPELINE to gather resource consumption metrics, such as starting and ending times, for operations that a batch processes.
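As a minimal sketch, assuming a pipeline named my_pipeline (a hypothetical name) already exists in the current database, the statement takes only the pipeline name. To my understanding it processes a single batch in the foreground and returns the profile for that batch; check the PROFILE PIPELINE reference for the exact output and any preconditions.

```sql
-- my_pipeline is a hypothetical name; replace it with your own pipeline.
-- Profiles one batch and returns timing and resource metrics for it.
PROFILE PIPELINE my_pipeline;
```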
Query the information_schema.PIPELINES_BATCHES_SUMMARY table
The following query returns diagnostics per database, per pipeline and per batch.
SELECT DATABASE_NAME, PIPELINE_NAME, BATCH_ID, BATCH_STATE, BATCH_TIME
FROM information_schema.PIPELINES_BATCHES_SUMMARY
ORDER BY BATCH_ID;
- BATCH_STATE, if set to Queued, indicates that the batch cannot extract, shape, or load data until cluster resources are freed.
- A high BATCH_TIME value indicates that the batch took a long time to extract, shape, and load the data (combined).
For a description of all of the fields in the information_schema.PIPELINES_BATCHES_SUMMARY table, see the information_schema.PIPELINES_BATCHES_SUMMARY reference.
Query the information_schema.MV_ACTIVITIES and information_schema.MV_QUERIES tables
As an alternative to querying the information_schema.PIPELINES_BATCHES_SUMMARY table, as detailed in the previous section, you can run the following query, which provides diagnostic information per pipeline, such as avg_cpu_time_ms, avg_network_time_ms, avg_disk_time_ms, and avg_memory_mb.
SELECT
  substr(replace(replace(MVAC.activity_name,'\n',''), ' ',' '),1,30) AS ACTIVITY_n,
  substr(replace(replace(query_text,'\n',''), ' ',' '),1,30) AS query_text,
  database_name AS databaseName,
  last_finished_timestamp AS last_run,
  (cpu_time_ms/(run_count+success_count+failure_count)) AS avg_cpu_time_ms,
  (cpu_wait_time_ms/(run_count+success_count+failure_count)) AS avg_cpu_wait_time_ms,
  (elapsed_time_ms/(run_count+success_count+failure_count)) AS avg_elapsed_time_ms,
  (lock_time_ms/(run_count+success_count+failure_count)) AS avg_lock_time_ms,
  (network_time_ms/(run_count+success_count+failure_count)) AS avg_network_time_ms,
  (disk_time_ms/(run_count+success_count+failure_count)) AS avg_disk_time_ms,
  (round((disk_b/1024/1024),2)/(run_count+success_count+failure_count)) AS avg_io_mb,
  (round((network_b/1024/1024),2)/(run_count+success_count+failure_count)) AS avg_network_mb,
  round((1000*(memory_bs/1024/1024)/(elapsed_time_ms)),2) AS avg_memory_mb,
  (memory_major_faults/(run_count+success_count+failure_count)) AS avg_major_faults,
  (run_count+success_count+failure_count) AS total_executions
FROM information_schema.mv_activities_cumulative MVAC
JOIN information_schema.mv_queries MVQ ON MVQ.activity_name = MVAC.activity_name
WHERE MVAC.activity_name like '%RunPipeline%'
ORDER BY avg_elapsed_time_ms DESC
LIMIT 10;
Address Slow Performance and High Memory Usage
The following items may help to increase pipeline performance and reduce memory usage.
- You can decrease the maximum number of partitions that a batch can utilize using ALTER PIPELINE SET MAX_PARTITIONS_PER_BATCH. Decreasing the maximum number of partitions will limit parallelism for large data sources.
- You can increase the batch interval of an existing pipeline using ALTER PIPELINE SET BATCH_INTERVAL. If you increase the batch interval, data will be loaded from the data source less frequently.
- You can break large files into smaller chunks to increase the ingestion speed. Large files get parsed on a single leaf, which increases the ingestion time.
- You can add more leaves to your cluster, and then rebalance the cluster, which will result in fewer partitions per leaf.
- Are you using a stored procedure with your pipeline? If so, how the stored procedure is implemented can have significant consequences on the memory use and performance of your pipeline. For details, see Writing Efficient Stored Procedures for Pipelines.
- For low parallelism pipelines, such as single file S3 loads and single partition Kafka topics, you can use an aggregator pipeline by running the CREATE AGGREGATOR PIPELINE command (an aggregator pipeline is not required for single-partition Kafka topics). If you are using an existing non-aggregator pipeline, you cannot convert it directly to an aggregator pipeline. Instead, drop your existing pipeline and recreate it as an aggregator pipeline.
  Caution: If a pipeline is dropped and recreated, the pipeline will start reading from the earliest offset in the data source, once the pipeline is started. This may cause duplicate records to be inserted into the destination table(s).
- Have queries or stored procedures that you normally run changed recently? If so, check these queries or stored procedures for changes that could be contributing to slow performance and high memory usage.
- Do you have any uncommitted transactions? These are transactions for which you have run START TRANSACTION, but have not run COMMIT or ROLLBACK afterwards. For more information, see the Uncommitted Transactions section in ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction.
- If you are using the IGNORE or SKIP ALL ERRORS options in your CREATE PIPELINE ... LOAD DATA ... statement, that could be causing the pipeline to run slowly, as these options may write large numbers of warnings to the logs. For IGNORE and SKIP ALL ERRORS alternatives, see LOAD DATA.
- If possible, avoid using the ENCLOSED BY and OPTIONALLY ENCLOSED BY options in your CREATE PIPELINE ... LOAD DATA ... statement.
- If your pipeline uses a transform, and implements the transform using an interpreted language such as Python, consider using a compiled language to speed up performance.
- If your pipeline uses a transform, and it is slow, you may be able to use a CREATE PIPELINE ... SET ... WHERE statement instead of using a transform.
- Check if removing or moving files that were already processed by the pipeline could improve pipeline performance. To perform this check, follow these steps:
  - Run:
    SELECT database_name, pipeline_name, batch_start_unix_timestamp, batch_time - max(batch_partition_time) AS batch_overhead
    FROM information_schema.PIPELINES_BATCHES
    GROUP BY database_name, pipeline_name, batch_id
    ORDER BY database_name, pipeline_name, batch_id;
  - If batch_overhead is increasing as batch_start_unix_timestamp increases, try the following:
    - Run SELECT * FROM information_schema.PIPELINES_FILES;
    - For rows in the output that have the value LOADED in the FILE_STATE column, move or delete the corresponding files listed in the FILE_NAME column.
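Several of the suggestions above can be sketched in SQL. The pipeline names, table, columns, source path, and numeric values below are all hypothetical and chosen only for illustration. The CREATE PIPELINE statement assumes a filesystem source and a simple transform that can be expressed with SET and WHERE; check the LOAD DATA reference for the exact clauses your version supports.

```sql
-- Hypothetical names and values; tune them for your own workload.

-- Cap the number of partitions that a single batch may use.
ALTER PIPELINE my_pipeline SET MAX_PARTITIONS_PER_BATCH 4;

-- Wait 10 seconds between batches (the value is in milliseconds).
ALTER PIPELINE my_pipeline SET BATCH_INTERVAL 10000;

-- Express a simple reshaping and filtering step with SET and WHERE
-- instead of an external transform.
CREATE PIPELINE orders_pipeline AS
  LOAD DATA FS '/data/orders/*.csv'
  INTO TABLE orders
  FIELDS TERMINATED BY ','
  (order_id, @raw_status, amount)
  SET status = UPPER(@raw_status)
  WHERE amount > 0;
```

Lowering MAX_PARTITIONS_PER_BATCH trades throughput for lower resource use per batch, so it is worth re-running the diagnostic queries after each change to see whether the adjustment helped.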
Clearing the information_schema.PIPELINES_FILES Table
You can clear the data in the information_schema.PIPELINES_FILES table.
- To drop a single file and its file record from the information_schema.PIPELINES_FILES table, use the ALTER PIPELINE ... DROP FILE command. For example:
  ALTER PIPELINE <your_pipeline> DROP FILE '<file_name>';
- To drop all of the files associated with a pipeline, you can drop and recreate the pipeline. This will clear all the file records associated with the pipeline. However, the metadata related to the pipeline will also be deleted. If you need the metadata, copy it to a separate table before recreating the pipeline. To recreate the pipeline:
  a. Make a note of the pipeline settings to use when recreating the pipeline.
     SHOW CREATE PIPELINE <your_pipeline> EXTENDED;
  b. Stop the pipeline.
     STOP PIPELINE <your_pipeline>;
  c. Drop the pipeline.
     DROP PIPELINE <your_pipeline>;
  d. Recreate the pipeline using the settings obtained in step "a".
  e. Start the pipeline.
     START PIPELINE <your_pipeline>;
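Copying the metadata to a separate table before dropping the pipeline might look like the following sketch. The backup table name and pipeline name are hypothetical, and the statement assumes that CREATE TABLE ... AS SELECT can read from the information schema in your deployment; if it cannot, create the backup table explicitly and use INSERT ... SELECT instead.

```sql
-- Hypothetical table and pipeline names.
-- Copy the pipeline's file records before the pipeline is dropped.
CREATE TABLE pipelines_files_backup AS
  SELECT *
  FROM information_schema.PIPELINES_FILES
  WHERE PIPELINE_NAME = 'my_pipeline';
```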
High Memory Usage for S3 Pipeline
When using an S3 pipeline, the memory used by the md_extractors_offsets table may increase over time. To limit this growth, add ENABLE OFFSETS METADATA GC to the CREATE PIPELINE statement. For an existing pipeline, use the ALTER PIPELINE statement with the ENABLE OFFSETS METADATA GC clause.
See the S3 Pipeline Using Metadata Garbage Collection (GC) section in the CREATE PIPELINE or the ALTER PIPELINE topics.
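For an existing pipeline, the ALTER PIPELINE form might look like the sketch below. The pipeline name is hypothetical, and the clause placement follows my understanding of the ALTER PIPELINE syntax; confirm it against the reference topic named above.

```sql
-- my_s3_pipeline is a hypothetical name; replace it with your own pipeline.
ALTER PIPELINE my_s3_pipeline ENABLE OFFSETS METADATA GC;
```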
To check the memory usage, use the query below:
SELECT * FROM information_schema.INTERNAL_TABLE_STATISTICS WHERE table_name like "md_extractors_offsets" ORDER BY memory_use DESC;
+---------------+-----------------------+---------+-----------+------+------------+----------------+------+------------+-------------------+----------------+
| DATABASE_NAME | TABLE_NAME | ORDINAL | HOST | PORT | NODE_TYPE | PARTITION_TYPE | ROWS | MEMORY_USE | STORAGE_TYPE | ROWS_IN_MEMORY |
+---------------+-----------------------+---------+-----------+------+------------+----------------+------+------------+-------------------+----------------+
| ticket_test | md_extractors_offsets | NULL | 127.0.0.1 | 3306 | Aggregator | Reference | 2 | 524544 | INTERNAL_METADATA | 2 |
| ticket_test | md_extractors_offsets | NULL | 127.0.0.1 | 3307 | Leaf | Reference | 2 | 524544 | INTERNAL_METADATA | 2 |
| ticket_test | md_extractors_offsets | 1 | 127.0.0.1 | 3307 | Leaf | Master | 0 | 0 | INTERNAL_METADATA | 0 |
| ticket_test | md_extractors_offsets | 7 | 127.0.0.1 | 3307 | Leaf | Master | 0 | 0 | INTERNAL_METADATA | 0 |
| ticket_test | md_extractors_offsets | 6 | 127.0.0.1 | 3307 | Leaf | Master | 0 | 0 | INTERNAL_METADATA | 0 |
| ticket_test | md_extractors_offsets | 5 | 127.0.0.1 | 3307 | Leaf | Master | 0 | 0 | INTERNAL_METADATA | 0 |
| ticket_test | md_extractors_offsets | 4 | 127.0.0.1 | 3307 | Leaf | Master | 0 | 0 | INTERNAL_METADATA | 0 |
| ticket_test | md_extractors_offsets | 3 | 127.0.0.1 | 3307 | Leaf | Master | 0 | 0 | INTERNAL_METADATA | 0 |
| ticket_test | md_extractors_offsets | 2 | 127.0.0.1 | 3307 | Leaf | Master | 0 | 0 | INTERNAL_METADATA | 0 |
| ticket_test | md_extractors_offsets | 0 | 127.0.0.1 | 3307 | Leaf | Master | 0 | 0 | INTERNAL_METADATA | 0 |
+---------------+-----------------------+---------+-----------+------+------------+----------------+------+------------+-------------------+----------------+
Troubleshoot Pipelines
Concepts
This topic requires an understanding of pipeline batches, which are explained in The Lifecycle of a Pipeline.
View pipeline errors
Data Ingest provides information about pipeline errors that have occurred.
Query the information_schema.PIPELINES_ERRORS table
You can run the following query to show all errors that have occurred, per database, per pipeline, per batch, and per partition.
SELECT DATABASE_NAME, PIPELINE_NAME, BATCH_ID, PARTITION, BATCH_SOURCE_PARTITION_ID,
  ERROR_KIND, ERROR_CODE, ERROR_MESSAGE, LOAD_DATA_LINE_NUMBER, LOAD_DATA_LINE
FROM information_schema.PIPELINES_ERRORS;
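When the error table is large, a per-pipeline summary can make the most frequent errors easier to spot. The following is a sketch that uses only the columns shown in the query above; the error_count alias is introduced here for illustration.

```sql
-- Count errors per pipeline and error code, most frequent first.
SELECT DATABASE_NAME, PIPELINE_NAME, ERROR_KIND, ERROR_CODE,
  COUNT(*) AS error_count
FROM information_schema.PIPELINES_ERRORS
GROUP BY DATABASE_NAME, PIPELINE_NAME, ERROR_KIND, ERROR_CODE
ORDER BY error_count DESC;
```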
Query files that were skipped
The query in the previous section does not show files that were skipped because they had errors. To list the skipped files, run the following query.
SELECT * FROM information_schema.PIPELINES_FILES WHERE FILE_STATE = 'Skipped';
If you need additional information, such as the database, the partition, the error that was generated and the line of the error file or object that caused the issue, run the following query.
SELECT pe.DATABASE_NAME, pe.PIPELINE_NAME, pe.BATCH_ID, pe.PARTITION,
  pe.BATCH_SOURCE_PARTITION_ID, pe.ERROR_TYPE, pe.ERROR_KIND, pe.ERROR_CODE, pe.ERROR_MESSAGE,
  pe.LOAD_DATA_LINE_NUMBER, pe.LOAD_DATA_LINE
FROM information_schema.PIPELINES_FILES pf, information_schema.PIPELINES_ERRORS pe
WHERE pe.BATCH_SOURCE_PARTITION_ID = pf.FILE_NAME AND pf.FILE_STATE = 'Skipped';
Address specific errors
The following table lists errors that can occur when running a pipeline statement, such as CREATE PIPELINE, and errors that can occur while a pipeline is extracting, shaping, and loading data.
| Error | Resolution |
|---|---|
| You get a syntax error when running | Both |
| You receive error | The master aggregator can likely not connect to the pipeline's data source. |
| | The bucket name is case-sensitive. |
| Error | This error can occur when a pipeline attempts to run a transform. 1. 2. 3. |
| | An incorrect path to the transform was likely specified. |
| Error: | This error may occur when the default value (8MB) for the engine variable |
| A parsing error occurs in your transform. | To debug your transform, you can run |
Error: An error that isn't associated with any specific source partition of the pipeline occurred during the batch loading process. The whole batch will be failed.
Issue
The batch loading process was able to load the data from the source, but it failed to ingest the data into the SingleStore database.
Solution
Address the secondary error to solve the issue.
For example, consider the following error:
Error,2790,"An error that isn't associated with any specific source partition of the pipeline
occurred during the batch loading process. The whole batch will be failed.
Error 1205 : ""Leaf Error (svchost:3306): Lock wait timeout exceeded; try restarting transaction.
Unique key Row Value lock owned by connection id xxxx, query `open idle transaction`"""
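In this case, the pipeline failed because the query was unable to acquire the row locks needed to ingest the data. The secondary error (Error 1205) shows that the lock is held by an open idle transaction.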
Rename a table referenced by a pipeline
When you try to rename a table that is referenced by a pipeline, the following error results:
ERROR 1945 ER_CANNOT_DROP_REFERENCED_BY_PIPELINE: Cannot rename table because it is referenced by pipeline <pipeline_name>
The following sequence demonstrates how to rename a table that is referenced by a pipeline:
1. Save your pipeline settings:
   SHOW CREATE PIPELINE <pipeline_name> EXTENDED;
2. Stop the pipeline:
   STOP PIPELINE <pipeline_name>;
3. Drop the pipeline:
   DROP PIPELINE <pipeline_name>;
4. Change the name of the table:
   ALTER TABLE <old_table_name> RENAME <new_table_name>;
5. Recreate the pipeline with the settings obtained in step 1, changing the table name to the new table name.
6. Start the pipeline:
   START PIPELINE <pipeline_name>;
Pipeline errors that are handled automatically
Typical error handling scenario
In most situations, an error that occurs while a pipeline is running is handled in this way:
If an error occurs while a batch b is running, then b will fail and b's transaction rolls back. b is retried at most pipelines_max_retries_per_batch_partition times. If all of the retries are unsuccessful and pipelines_stop_on_error is set to ON, the pipeline stops. Otherwise, the pipeline continues and processes a new batch nb, which processes the same files or objects that b attempted to process, excluding any files or objects that may have caused the error.
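To see how this scenario will play out on a given cluster, you can inspect the pipeline-related engine variables. The sketch below assumes that the retry limit and the stop behavior are controlled by variables whose names begin with pipelines (to my understanding, pipelines_max_retries_per_batch_partition and pipelines_stop_on_error); verify the names against the engine variables reference for your version.

```sql
-- List the pipeline-related engine variables and their current values.
SHOW VARIABLES LIKE 'pipelines%';
```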
The following table lists events, which may or may not cause errors, and how the events are handled.
| Event | How the Event is Handled |
|---|---|
| The pipeline cannot access a file or object. | The typical error handling scenario (mentioned earlier in this topic) applies. |
| The pipeline cannot read a file or object because it is corrupted. | The typical error handling scenario (mentioned earlier in this topic) applies. After fixing the issue with the corrupted file/object, you can have the pipeline reprocess the file/object by running |
| A file or object is removed from the filesystem after the batch has started processing the file/object. | The batch does not fail; the file or object is processed. |
| A file is removed from the filesystem (or an object is removed from an object store) after the pipeline registers the file/object in | The typical error handling scenario (mentioned earlier in this topic) applies. |
| The cluster restarts while the batch is being processed. | The typical error handling scenario (mentioned earlier in this topic) applies. Once the cluster is online, |
| A leaf node is unavailable before the pipeline starts. | This does not cause the pipeline to fail. |
| A leaf node fails while the pipeline is running. | The batch fails. |
| An aggregator fails while the pipeline is running. | The batch fails. |
| The pipeline reaches the allocated storage space for errors. | The pipeline pauses. |
Additional Information
Refer to the Pipeline Summary and Pipeline Performance dashboards for more information regarding troubleshooting pipeline errors and performance issues.
Last modified: September 12, 2023