Detect and Address Slow Performance and High Memory Usage of Pipelines
Concepts
This topic requires an understanding of pipeline batches, which are explained in The Lifecycle of a Pipeline.
Detect slow performance and high memory usage
Run PROFILE PIPELINE
Run PROFILE PIPELINE to gather resource consumption metrics, such as starting and ending times, for operations that a batch processes. Partition-specific metrics are not included; to retrieve them, see the next section.
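For example, to profile a single batch of a pipeline in the foreground (the pipeline name below is a placeholder):

```sql
-- "mypipeline" is a hypothetical pipeline name.
PROFILE PIPELINE mypipeline;
```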
Query the information_schema.PIPELINES_BATCHES_SUMMARY table
The following query returns diagnostics per database, per pipeline and per batch.
SELECT DATABASE_NAME, PIPELINE_NAME, BATCH_ID, BATCH_STATE, BATCH_TIME
FROM information_schema.PIPELINES_BATCHES_SUMMARY
ORDER BY BATCH_ID;
A BATCH_STATE value of Queued indicates that the batch cannot extract, shape, or load data until cluster resources are freed. A high BATCH_TIME value indicates that the batch took a long time to extract, shape, and load the data (combined).
For a description of all of the fields in the information_schema.PIPELINES_BATCHES_SUMMARY table, see the information_schema.PIPELINES_BATCHES_SUMMARY reference.
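Building on the BATCH_STATE and BATCH_TIME columns described above, a query such as the following sketch lists batches that are currently waiting on cluster resources:

```sql
-- A sketch: list queued batches, slowest first, using the columns documented above.
SELECT DATABASE_NAME, PIPELINE_NAME, BATCH_ID, BATCH_TIME
FROM information_schema.PIPELINES_BATCHES_SUMMARY
WHERE BATCH_STATE = 'Queued'
ORDER BY BATCH_TIME DESC;
```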
Query the information_schema.MV_ACTIVITIES and information_schema.MV_QUERIES tables
As an alternative to querying the information_schema.PIPELINES_BATCHES_SUMMARY table, as detailed in the previous section, you can run the following query, which provides diagnostic information per pipeline, such as avg_cpu_time_ms, avg_network_time_ms, avg_disk_time_ms, and avg_memory_mb.
SELECT
  substr(replace(replace(MVAC.activity_name, '\n', ''), '  ', ' '), 1, 30) AS ACTIVITY_n,
  substr(replace(replace(query_text, '\n', ''), '  ', ' '), 1, 30) AS query_text,
  database_name AS databaseName,
  last_finished_timestamp AS last_run,
  (cpu_time_ms/(run_count+success_count+failure_count)) AS avg_cpu_time_ms,
  (cpu_wait_time_ms/(run_count+success_count+failure_count)) AS avg_cpu_wait_time_ms,
  (elapsed_time_ms/(run_count+success_count+failure_count)) AS avg_elapsed_time_ms,
  (lock_time_ms/(run_count+success_count+failure_count)) AS avg_lock_time_ms,
  (network_time_ms/(run_count+success_count+failure_count)) AS avg_network_time_ms,
  (disk_time_ms/(run_count+success_count+failure_count)) AS avg_disk_time_ms,
  (round((disk_b/1024/1024),2)/(run_count+success_count+failure_count)) AS avg_io_mb,
  (round((network_b/1024/1024),2)/(run_count+success_count+failure_count)) AS avg_network_mb,
  round((1000*(memory_bs/1024/1024)/(elapsed_time_ms)),2) AS avg_memory_mb,
  (memory_major_faults/(run_count+success_count+failure_count)) AS avg_major_faults,
  (run_count+success_count+failure_count) AS total_executions
FROM information_schema.mv_activities_cumulative MVAC
JOIN information_schema.mv_queries MVQ ON MVQ.activity_name = MVAC.activity_name
WHERE MVAC.activity_name LIKE '%RunPipeline%'
ORDER BY avg_elapsed_time_ms DESC
LIMIT 10;
Address Slow Performance and High Memory Usage
The following items may help to increase pipeline performance and reduce memory usage.
You can decrease the maximum number of partitions that a batch can utilize using ALTER PIPELINE SET MAX_PARTITIONS_PER_BATCH. Decreasing the maximum number of partitions will limit parallelism for large data sources.
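For example (the pipeline name and the limit are placeholders):

```sql
-- Cap each batch of the hypothetical pipeline "mypipeline" at 4 partitions.
ALTER PIPELINE mypipeline SET MAX_PARTITIONS_PER_BATCH 4;
```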
You can increase the batch interval of an existing pipeline using ALTER PIPELINE SET BATCH_INTERVAL. If you increase the batch interval, data will be loaded from the data source less frequently.
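For example (the pipeline name and interval are placeholders; BATCH_INTERVAL is specified in milliseconds):

```sql
-- Poll the data source for the hypothetical pipeline "mypipeline" every 10 seconds.
ALTER PIPELINE mypipeline SET BATCH_INTERVAL 10000;
```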
You can add more leaves to your cluster, and then rebalance the cluster, which will result in fewer partitions per leaf.
Are you using a stored procedure with your pipeline? If so, how the stored procedure is implemented can have significant consequences on the memory use and performance of your pipeline. For details, see Writing Efficient Stored Procedures for Pipelines.
For low-parallelism pipelines, such as single-file S3 loads and single-partition Kafka topics, you can use an aggregator pipeline by running the CREATE AGGREGATOR PIPELINE command (an aggregator pipeline is not required for single-partition Kafka topics). An existing non-aggregator pipeline cannot be converted directly to an aggregator pipeline; instead, drop the existing pipeline and recreate it as an aggregator pipeline.

Caution

If a pipeline is dropped and recreated, the pipeline will start reading from the earliest offset in the data source once the pipeline is started. This may cause duplicate records to be inserted into the destination table(s).
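A minimal sketch of dropping a pipeline and recreating it as an aggregator pipeline, assuming a hypothetical S3 source and destination table (the pipeline name, bucket, table, and credential values are all placeholders):

```sql
DROP PIPELINE mypipeline;

-- Recreate the hypothetical pipeline as an aggregator pipeline.
CREATE AGGREGATOR PIPELINE mypipeline AS
  LOAD DATA S3 'my-bucket/data.csv'
  CONFIG '{"region": "us-east-1"}'
  CREDENTIALS '{"aws_access_key_id": "...", "aws_secret_access_key": "..."}'
  INTO TABLE my_table
  FIELDS TERMINATED BY ',';
```

Note the caution above: after the recreated pipeline is started, it reads from the earliest offset in the data source, which may produce duplicate records.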
Have queries or stored procedures that you normally run changed recently? If so, check these queries or stored procedures for changes that could be contributing to slow performance and high memory usage.
Do you have any uncommitted transactions? These are transactions for which you have run START TRANSACTION, but have not run COMMIT or ROLLBACK afterwards. For more information, see the Uncommitted Transactions section in ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction.

If you are using the IGNORE or SKIP ALL ERRORS options in your CREATE PIPELINE ... LOAD DATA ... statement, they could be causing the pipeline to run slowly, as these options may write large numbers of warnings to the logs. For IGNORE and SKIP ALL ERRORS alternatives, see LOAD DATA.

If possible, avoid using the ENCLOSED BY and OPTIONALLY ENCLOSED BY options in your CREATE PIPELINE ... LOAD DATA ... statement.

If your pipeline uses a transform, and implements the transform using an interpreted language such as Python, consider using a compiled language to speed up performance.
If your pipeline uses a transform, and it is slow, you may be able to use a CREATE PIPELINE ... SET ... WHERE statement instead of using a transform.

Check if removing or moving files that were already processed by the pipeline could improve pipeline performance. To perform this check, follow these steps:

Run:
SELECT database_name, pipeline_name, batch_start_unix_timestamp,
  batch_time - max(batch_partition_time) AS batch_overhead
FROM information_schema.PIPELINES_BATCHES
GROUP BY database_name, pipeline_name, batch_id
ORDER BY database_name, pipeline_name, batch_id;
If batch_overhead is increasing as batch_start_unix_timestamp increases, try the following:

Run SELECT * FROM information_schema.PIPELINES_FILES;

For rows in the output that have the value LOADED in the FILE_STATE column, move or delete the corresponding files in the FILE_NAME column.
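The already-loaded files from the last step can also be listed directly, for example:

```sql
-- A sketch: show files the pipeline has finished loading, whose copies in the
-- source location may be candidates for removal, using the columns named above.
SELECT FILE_NAME
FROM information_schema.PIPELINES_FILES
WHERE FILE_STATE = 'LOADED';
```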