Troubleshoot Pipeline Performance and Memory Usage
Concepts
This topic requires an understanding of pipeline batches, which are explained in The Lifecycle of a Pipeline.
Detect Slow Performance and High Memory Usage
Run PROFILE PIPELINE
Run PROFILE PIPELINE to gather resource consumption metrics, such as starting and ending times, for operations that a batch processes.
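For example, assuming a pipeline named mypipeline that has already been created:

```sql
-- Gather per-batch resource consumption metrics for one batch.
-- "mypipeline" is a placeholder pipeline name.
PROFILE PIPELINE mypipeline;
```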
Query the information_schema.PIPELINES_BATCHES_SUMMARY View
The following query returns diagnostics for each batch associated with a pipeline that writes to a database.
```sql
SELECT DATABASE_NAME, PIPELINE_NAME, BATCH_ID, BATCH_STATE, BATCH_TIME
FROM information_schema.PIPELINES_BATCHES_SUMMARY
ORDER BY BATCH_ID;
```
- BATCH_STATE: If the BATCH_STATE of a batch is Queued, it indicates that the batch cannot extract, shape, or load data until cluster resources are freed.

- A high BATCH_TIME value indicates that the batch took a long time to extract, shape, and load the data (combined).
For a complete reference and description of all the fields, refer to information_schema.PIPELINES_BATCHES_SUMMARY.
Query the information_schema.MV_ACTIVITIES and information_schema.MV_QUERIES Views
As an alternative to querying the information_schema.PIPELINES_BATCHES_SUMMARY view, as detailed in the previous section, you can run the following query, which provides diagnostic information per pipeline, such as avg_cpu_time_ms, avg_cpu_wait_time_ms, avg_elapsed_time_ms, and avg_memory_mb.
```sql
SELECT
  substr(replace(replace(MVAC.activity_name,'\n',''),'  ',' '),1,30) AS ACTIVITY_n,
  substr(replace(replace(query_text,'\n',''),'  ',' '),1,30) AS query_text,
  database_name AS databaseName,
  last_finished_timestamp AS last_run,
  (cpu_time_ms/(run_count+success_count+failure_count)) AS avg_cpu_time_ms,
  (cpu_wait_time_ms/(run_count+success_count+failure_count)) AS avg_cpu_wait_time_ms,
  (elapsed_time_ms/(run_count+success_count+failure_count)) AS avg_elapsed_time_ms,
  (lock_time_ms/(run_count+success_count+failure_count)) AS avg_lock_time_ms,
  (network_time_ms/(run_count+success_count+failure_count)) AS avg_network_time_ms,
  (disk_time_ms/(run_count+success_count+failure_count)) AS avg_disk_time_ms,
  (round((disk_b/1024/1024),2)/(run_count+success_count+failure_count)) AS avg_io_mb,
  (round((network_b/1024/1024),2)/(run_count+success_count+failure_count)) AS avg_network_mb,
  round((1000*(memory_bs/1024/1024)/(elapsed_time_ms)),2) AS avg_memory_mb,
  (memory_major_faults/(run_count+success_count+failure_count)) AS avg_major_faults,
  (run_count+success_count+failure_count) AS total_executions
FROM information_schema.mv_activities_cumulative MVAC
JOIN information_schema.mv_queries MVQ ON MVQ.activity_name = MVAC.activity_name
WHERE MVAC.activity_name LIKE '%RunPipeline%'
ORDER BY avg_elapsed_time_ms DESC LIMIT 10;
```
Address Slow Performance and High Memory Usage
Performing the following tasks may improve pipeline performance and reduce memory usage.
- Decrease the maximum number of partitions that a batch can use with the ALTER PIPELINE ... SET MAX_PARTITIONS_PER_BATCH command. Decreasing the maximum number of partitions limits parallelism for large data sources.
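For example, assuming a pipeline named mypipeline (the partition count is illustrative):

```sql
-- Cap the number of partitions a single batch can use.
ALTER PIPELINE mypipeline SET MAX_PARTITIONS_PER_BATCH 10;
```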
- Increase the batch interval of an existing pipeline using the ALTER PIPELINE ... SET BATCH_INTERVAL command. If you increase the batch interval, data is loaded from the data source less frequently.
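For example, assuming a pipeline named mypipeline (the interval value is illustrative):

```sql
-- Check the data source for new data every 5000 milliseconds.
ALTER PIPELINE mypipeline SET BATCH_INTERVAL 5000;
```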
- Break large files into smaller chunks to increase the ingestion speed. Large files are parsed on a single leaf, which increases the ingestion time.
- Add more leaves to your cluster, and then rebalance the cluster; this results in fewer partitions per leaf.
- If you use a stored procedure with your pipeline, be aware that the stored procedure implementation can significantly affect memory usage and pipeline performance. For details, refer to Writing Efficient Stored Procedures for Pipelines.
- For low-parallelism pipelines, such as single-file S3 loads and single-partition Kafka topics, you can use an aggregator pipeline by running the CREATE AGGREGATOR PIPELINE command (an aggregator pipeline is not required for single-partition Kafka topics). An existing non-aggregator pipeline cannot be converted directly to an aggregator pipeline; instead, drop the existing pipeline and recreate it as an aggregator pipeline.

  Caution

  If a pipeline is dropped and recreated, the pipeline will start reading from the earliest offset in the data source once the pipeline is started. This may cause duplicate records to be inserted into the destination table(s).
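  A minimal sketch of dropping a pipeline and recreating it as an aggregator pipeline; the pipeline name, bucket path, credentials, and table are placeholders:

  ```sql
  DROP PIPELINE mypipeline;

  -- Recreate the pipeline with the AGGREGATOR clause so ingestion is
  -- coordinated through the aggregator instead of per-partition on leaves.
  CREATE AGGREGATOR PIPELINE mypipeline AS
    LOAD DATA S3 'my-bucket/data/'
    CONFIG '{"region": "us-east-1"}'
    CREDENTIALS '{"aws_access_key_id": "...", "aws_secret_access_key": "..."}'
    INTO TABLE mytable
    FIELDS TERMINATED BY ',';

  START PIPELINE mypipeline;
  ```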
- Have the queries or stored procedures that you normally run changed recently? If so, check them for changes that could be contributing to slow performance and high memory usage.
- Do you have any uncommitted transactions? These are transactions where START TRANSACTION is run, but neither COMMIT nor ROLLBACK is run afterwards. For information on resolving issues with uncommitted transactions, refer to the Uncommitted Transactions section in ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction.
- Using the IGNORE or SKIP ALL ERRORS options in the CREATE PIPELINE ... LOAD DATA ... statement may cause the pipeline to run slowly, because these options write large numbers of warnings to the logs. For IGNORE and SKIP ALL ERRORS alternatives, see LOAD DATA.
- If possible, avoid using the ENCLOSED BY and OPTIONALLY ENCLOSED BY options in your CREATE PIPELINE ... LOAD DATA ... statement.
- If your pipeline uses a transform implemented in an interpreted language such as Python, consider using a compiled language to improve performance.
- If your pipeline uses a transform and the transform is slow, you may be able to use a CREATE PIPELINE ... SET ... WHERE statement instead of a transform.
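  As a sketch, the SET and WHERE clauses can perform simple per-row shaping and filtering in SQL during ingestion, avoiding an external transform process; all names below are placeholders:

  ```sql
  -- Compute a column and filter rows at load time instead of in a transform.
  CREATE PIPELINE mypipeline AS
    LOAD DATA FS '/data/events*.csv'
    INTO TABLE events
    FIELDS TERMINATED BY ','
    (event_type, @amount_cents)
    SET amount_dollars = @amount_cents / 100
    WHERE event_type <> 'heartbeat';
  ```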
- Check if removing or moving files that were already processed by the pipeline improves pipeline performance. To perform this check, follow these steps:

  - Run:

    ```sql
    SELECT database_name, pipeline_name, batch_start_unix_timestamp,
      batch_time - max(batch_partition_time) AS batch_overhead
    FROM information_schema.PIPELINES_BATCHES
    GROUP BY database_name, pipeline_name, batch_id
    ORDER BY database_name, pipeline_name, batch_id;
    ```
  - If batch_overhead is increasing as batch_start_unix_timestamp increases, try the following steps:

    - Run:

      ```sql
      SELECT * FROM information_schema.PIPELINES_FILES;
      ```

    - For rows in the output that have the value LOADED in the FILE_STATE column, move or delete the corresponding files listed in the FILE_NAME column.
Clear the information_schema.PIPELINES_FILES View

You can clear the data in the information_schema.PIPELINES_FILES view.
- To drop a single file and its file record from the information_schema.PIPELINES_FILES view, use the ALTER PIPELINE ... DROP FILE command. For example:

  ```sql
  ALTER PIPELINE <your_pipeline> DROP FILE 'pipeline_files';
  ```
- To drop all of the files associated with a pipeline, you can drop and recreate the pipeline. This clears all the file records associated with the pipeline. However, the metadata related to the pipeline will also be deleted. If you need the metadata, copy it to a separate table before recreating the pipeline. To recreate the pipeline:

  - Inspect and note the pipeline settings to use when recreating the pipeline.

    ```sql
    SHOW CREATE PIPELINE <your_pipeline> EXTENDED;
    ```

  - Stop the pipeline.

    ```sql
    STOP PIPELINE <your_pipeline>;
    ```

  - Drop the pipeline.

    ```sql
    DROP PIPELINE <your_pipeline>;
    ```

  - Recreate the pipeline using the required configuration options.

  - Start the pipeline.

    ```sql
    START PIPELINE <your_pipeline>;
    ```
Last modified: October 8, 2024