Pipeline Troubleshooting

Detect and Address Slow Performance and High Memory Usage of Pipelines

Concepts

This topic requires an understanding of pipeline batches, which are explained in The Lifecycle of a Pipeline.

Detect slow performance and high memory usage

Run PROFILE PIPELINE

Run PROFILE PIPELINE to gather resource consumption metrics, such as starting and ending times, for the operations that a batch processes. Partition-specific metrics are not included; to retrieve those, see the next section.

Query the information_schema.PIPELINES_BATCHES_SUMMARY table

The following query returns diagnostics per database, per pipeline, and per batch.

SELECT DATABASE_NAME, PIPELINE_NAME, BATCH_ID, BATCH_STATE, BATCH_TIME
FROM information_schema.PIPELINES_BATCHES_SUMMARY
ORDER BY BATCH_ID;
  • BATCH_STATE, if set to Queued, indicates that the batch cannot extract, shape, or load data until cluster resources are freed.

  • A high BATCH_TIME value indicates that the batch took a long time to extract, shape, and load its data (combined).

For a description of all of the fields in the information_schema.PIPELINES_BATCHES_SUMMARY table, see the information_schema.PIPELINES_BATCHES_SUMMARY reference.
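If you export the rows returned by the query above (for example, into a script or notebook), a few lines of Python can flag the two conditions described in the list. This is a minimal sketch: the BatchSummary class and flag_batches function are hypothetical names, and it assumes BATCH_TIME is reported in seconds and may be NULL (None) for batches that have not finished.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class BatchSummary:
    # Hypothetical row type mirroring the columns selected from
    # information_schema.PIPELINES_BATCHES_SUMMARY.
    database_name: str
    pipeline_name: str
    batch_id: int
    batch_state: str
    batch_time: Optional[float]  # assumed to be seconds; None when NULL


def flag_batches(
    rows: List[BatchSummary], slow_threshold_s: float
) -> Tuple[List[int], List[int]]:
    """Return (queued_batch_ids, slow_batch_ids)."""
    # Queued batches are waiting for cluster resources to be freed.
    queued = [r.batch_id for r in rows if r.batch_state == "Queued"]
    # Slow batches spent longer than the threshold extracting, shaping, and loading.
    slow = [
        r.batch_id
        for r in rows
        if r.batch_time is not None and r.batch_time > slow_threshold_s
    ]
    return queued, slow
```

The threshold is yours to choose; a value a few times larger than a pipeline's typical BATCH_TIME is one reasonable starting point.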

Query the information_schema.MV_ACTIVITIES and information_schema.MV_QUERIES tables

As an alternative to querying the information_schema.PIPELINES_BATCHES_SUMMARY table, as detailed in the previous section, you can run the following query, which provides diagnostic information per pipeline, such as avg_cpu_time_ms, avg_network_time_ms, avg_disk_time_ms, and avg_memory_mb.

SELECT substr(replace(replace(MVAC.activity_name,'\n',''), '  ',' '),1,30) AS ACTIVITY_n,
substr(replace(replace(query_text,'\n',''), '  ',' '),1,30) AS query_text,
database_name AS databaseName,last_finished_timestamp AS last_run,
(cpu_time_ms/(run_count+success_count+failure_count)) AS avg_cpu_time_ms,
(cpu_wait_time_ms/(run_count+success_count+failure_count)) AS avg_cpu_wait_time_ms,
(elapsed_time_ms/(run_count+success_count+failure_count)) AS avg_elapsed_time_ms,
(lock_time_ms/(run_count+success_count+failure_count)) AS avg_lock_time_ms,
(network_time_ms/(run_count+success_count+failure_count)) AS avg_network_time_ms,
(disk_time_ms/(run_count+success_count+failure_count)) AS avg_disk_time_ms,
(round((disk_b/1024/1024),2)/(run_count+success_count+failure_count)) AS avg_io_mb,
(round((network_b/1024/1024),2)/(run_count+success_count+failure_count)) AS avg_network_mb,
round((1000*(memory_bs/1024/1024)/(elapsed_time_ms)),2) AS avg_memory_mb,
(memory_major_faults/(run_count+success_count+failure_count)) AS avg_major_faults,
(run_count+success_count+failure_count) AS total_executions
FROM information_schema.mv_activities_cumulative MVAC
JOIN information_schema.mv_queries MVQ ON MVQ.activity_name = MVAC.activity_name
WHERE MVAC.activity_name like '%RunPipeline%' ORDER BY avg_elapsed_time_ms DESC LIMIT 10;
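The averages in this query all divide a cumulative counter by the total number of executions (run_count + success_count + failure_count). The Python sketch below mirrors that arithmetic for one exported row, which can help when checking a surprising value by hand. It assumes the row is a dict keyed by lowercase column names, and it assumes memory_bs is accumulated in byte-seconds, which is why avg_memory_mb divides by elapsed seconds rather than by the execution count. The function name is hypothetical.

```python
from typing import Dict, Optional


def per_execution_averages(row: Dict[str, float]) -> Optional[Dict[str, float]]:
    """Reproduce the per-pipeline averages computed by the query above."""
    executions = row["run_count"] + row["success_count"] + row["failure_count"]
    if executions == 0:
        # Avoid dividing by zero for activities that never ran.
        return None
    mb = 1024 * 1024
    averages = {
        "avg_cpu_time_ms": row["cpu_time_ms"] / executions,
        "avg_elapsed_time_ms": row["elapsed_time_ms"] / executions,
        "avg_network_time_ms": row["network_time_ms"] / executions,
        "avg_disk_time_ms": row["disk_time_ms"] / executions,
        # Same order of operations as the SQL: round the MB total, then divide.
        "avg_io_mb": round(row["disk_b"] / mb, 2) / executions,
        "avg_network_mb": round(row["network_b"] / mb, 2) / executions,
        "total_executions": executions,
    }
    # memory_bs / (elapsed_time_ms / 1000) gives average memory over the
    # elapsed time (not per execution), converted to MB.
    if row["elapsed_time_ms"]:
        averages["avg_memory_mb"] = round(
            1000 * (row["memory_bs"] / mb) / row["elapsed_time_ms"], 2
        )
    else:
        averages["avg_memory_mb"] = None
    return averages
```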

Address Slow Performance and High Memory Usage

The following items may help to increase pipeline performance and reduce memory usage.

  • You can decrease the maximum number of partitions that a batch can use with ALTER PIPELINE SET MAX_PARTITIONS_PER_BATCH. Decreasing this maximum limits parallelism for large data sources.

  • You can increase the batch interval of an existing pipeline using ALTER PIPELINE SET BATCH_INTERVAL. If you increase the batch interval, data will be loaded from the data source less frequently.

  • You can add more leaves to your cluster and then rebalance the cluster, which results in fewer partitions per leaf.

  • Are you using a stored procedure with your pipeline? If so, how the stored procedure is implemented can significantly affect the memory use and performance of your pipeline. For details, see Writing Efficient Stored Procedures for Pipelines.

  • For low-parallelism pipelines, such as single-file S3 loads and single-partition Kafka topics, you can use an aggregator pipeline by running the CREATE AGGREGATOR PIPELINE command (an aggregator pipeline is not required for single-partition Kafka topics). You cannot convert an existing non-aggregator pipeline directly to an aggregator pipeline; instead, drop the existing pipeline and recreate it as an aggregator pipeline.

    Caution

    If a pipeline is dropped and recreated, it starts reading from the earliest offset in the data source once it is started. This may cause duplicate records to be inserted into the destination table(s).

  • Have queries or stored procedures that you normally run changed recently? If so, check these queries or stored procedures for changes that could be contributing to slow performance and high memory usage.

  • Do you have any uncommitted transactions? These are transactions for which you have run START TRANSACTION, but have not run COMMIT or ROLLBACK afterwards. For more information, see the Uncommitted Transactions section in ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction.

  • If you are using the IGNORE or SKIP ALL ERRORS options in your CREATE PIPELINE ... LOAD DATA ... statement, these options could be slowing the pipeline down, because they may write large numbers of warnings to the logs. For alternatives to IGNORE and SKIP ALL ERRORS, see LOAD DATA.

  • If possible, avoid using the ENCLOSED BY and OPTIONALLY ENCLOSED BY options in your CREATE PIPELINE ... LOAD DATA ... statement.

  • If your pipeline uses a transform, and implements the transform using an interpreted language such as Python, consider using a compiled language to speed up performance.

  • If your pipeline uses a transform, and it is slow, you may be able to use a CREATE PIPELINE ... SET ... WHERE statement instead of using a transform.

  • Check if removing or moving files that were already processed by the pipeline could improve pipeline performance. To perform this check, follow these steps:

    1. Run:

      SELECT database_name, pipeline_name, batch_start_unix_timestamp, batch_time - max(batch_partition_time) AS batch_overhead
      FROM information_schema.PIPELINES_BATCHES
      GROUP BY database_name, pipeline_name, batch_id, batch_start_unix_timestamp, batch_time
      ORDER BY database_name, pipeline_name, batch_id;
    2. If batch_overhead is increasing as batch_start_unix_timestamp increases, try the following:

      1. Run SELECT * FROM information_schema.PIPELINES_FILES;.

      2. For rows in the output that have the value LOADED in the FILE_STATE column, move or delete the corresponding files listed in the FILE_NAME column.
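Deciding whether batch_overhead "is increasing" by eye can be hard when batches are noisy. The Python sketch below recomputes batch_overhead (batch_time minus the slowest partition's time) from exported per-partition rows of information_schema.PIPELINES_BATCHES and fits a least-squares slope against batch_start_unix_timestamp; a clearly positive slope suggests the upward trend described in step 2. The row layout and the function names are assumptions for illustration, and the slope is only one simple heuristic.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# (database_name, pipeline_name, batch_id, batch_start_unix_timestamp,
#  batch_time, batch_partition_time): one row per batch partition.
Row = Tuple[str, str, int, int, float, float]


def batch_overheads(rows: Iterable[Row]) -> Dict[Tuple[str, str], List[Tuple[int, float]]]:
    """Return {(database, pipeline): [(batch_start_ts, batch_overhead), ...]}."""
    per_batch = {}
    for db, pipeline, batch_id, start_ts, batch_time, part_time in rows:
        key = (db, pipeline, batch_id)
        if key in per_batch:
            # Keep the slowest partition of the batch.
            start, total, slowest = per_batch[key]
            per_batch[key] = (start, total, max(slowest, part_time))
        else:
            per_batch[key] = (start_ts, batch_time, part_time)

    series = defaultdict(list)
    for (db, pipeline, _), (start, total, slowest) in per_batch.items():
        # Time the batch spent beyond its slowest partition.
        series[(db, pipeline)].append((start, total - slowest))
    return {k: sorted(v) for k, v in series.items()}


def overhead_slope(points: List[Tuple[int, float]]) -> float:
    """Least-squares slope of batch_overhead against batch start time."""
    n = len(points)
    if n < 2:
        return 0.0
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    denom = sum((x - mean_x) ** 2 for x, _ in points)
    if denom == 0:
        return 0.0
    return sum((x - mean_x) * (y - mean_y) for x, y in points) / denom
```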

High Memory Usage for S3 Pipeline

When using an S3 pipeline, the memory used by the md_extractors_offsets internal table may increase over time. This continued growth can eventually lead to out-of-memory conditions and can impact performance. By default, the pipeline garbage collector (GC) for S3 is not enabled. To enable it when creating a pipeline, add the optional ENABLE OFFSETS METADATA GC clause to the CREATE PIPELINE statement. To enable pipeline garbage collection on an existing pipeline, use the ALTER PIPELINE statement with the ENABLE OFFSETS METADATA GC clause.

See the S3 Pipeline Using Metadata Garbage Collection (GC) section in the CREATE PIPELINE or the ALTER PIPELINE topics.

To check the memory usage of md_extractors_offsets, run the following query:

SELECT * FROM information_schema.INTERNAL_TABLE_STATISTICS WHERE table_name = 'md_extractors_offsets' ORDER BY memory_use DESC;
+---------------+-----------------------+---------+-----------+------+------------+----------------+------+------------+-------------------+----------------+
| DATABASE_NAME | TABLE_NAME            | ORDINAL | HOST      | PORT | NODE_TYPE  | PARTITION_TYPE | ROWS | MEMORY_USE | STORAGE_TYPE      | ROWS_IN_MEMORY |
+---------------+-----------------------+---------+-----------+------+------------+----------------+------+------------+-------------------+----------------+
| ticket_test   | md_extractors_offsets |    NULL | 127.0.0.1 | 3306 | Aggregator | Reference      |    2 |     524544 | INTERNAL_METADATA |              2 |
| ticket_test   | md_extractors_offsets |    NULL | 127.0.0.1 | 3307 | Leaf       | Reference      |    2 |     524544 | INTERNAL_METADATA |              2 |
| ticket_test   | md_extractors_offsets |       1 | 127.0.0.1 | 3307 | Leaf       | Master         |    0 |          0 | INTERNAL_METADATA |              0 |
| ticket_test   | md_extractors_offsets |       7 | 127.0.0.1 | 3307 | Leaf       | Master         |    0 |          0 | INTERNAL_METADATA |              0 |
| ticket_test   | md_extractors_offsets |       6 | 127.0.0.1 | 3307 | Leaf       | Master         |    0 |          0 | INTERNAL_METADATA |              0 |
| ticket_test   | md_extractors_offsets |       5 | 127.0.0.1 | 3307 | Leaf       | Master         |    0 |          0 | INTERNAL_METADATA |              0 |
| ticket_test   | md_extractors_offsets |       4 | 127.0.0.1 | 3307 | Leaf       | Master         |    0 |          0 | INTERNAL_METADATA |              0 |
| ticket_test   | md_extractors_offsets |       3 | 127.0.0.1 | 3307 | Leaf       | Master         |    0 |          0 | INTERNAL_METADATA |              0 |
| ticket_test   | md_extractors_offsets |       2 | 127.0.0.1 | 3307 | Leaf       | Master         |    0 |          0 | INTERNAL_METADATA |              0 |
| ticket_test   | md_extractors_offsets |       0 | 127.0.0.1 | 3307 | Leaf       | Master         |    0 |          0 | INTERNAL_METADATA |              0 |
+---------------+-----------------------+---------+-----------+------+------------+----------------+------+------------+-------------------+----------------+
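When this output comes from a command-line client, it can be convenient to total MEMORY_USE per node type and track that number over time. The Python sketch below parses a pipe-delimited result table like the one above; it assumes the first pipe-delimited line holds the column names, and the function name is hypothetical. It reports whatever units the MEMORY_USE column uses.

```python
from collections import defaultdict
from typing import Dict


def memory_use_by_node_type(table_text: str) -> Dict[str, int]:
    """Sum MEMORY_USE per NODE_TYPE from a pipe-delimited result table."""
    header = None
    totals = defaultdict(int)
    for line in table_text.splitlines():
        line = line.strip()
        if not line.startswith("|"):
            continue  # skip +----+ borders and blank lines
        cells = [cell.strip() for cell in line.strip("|").split("|")]
        if header is None:
            header = cells  # first pipe-delimited line holds the column names
            continue
        row = dict(zip(header, cells))
        totals[row["NODE_TYPE"]] += int(row["MEMORY_USE"])
    return dict(totals)
```

Applied to the sample output above, both the Aggregator rows and the Leaf rows sum to 524544.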

Troubleshoot Pipelines


View pipeline errors

The pipelines information_schema tables provide information about pipeline errors that have occurred. This section provides some useful queries against these tables.

Query the information_schema.PIPELINES_ERRORS table

You can run the following query to show all errors that have occurred, per database, per pipeline, per batch, and per partition.

SELECT DATABASE_NAME, PIPELINE_NAME, BATCH_ID, PARTITION, BATCH_SOURCE_PARTITION_ID,
ERROR_KIND, ERROR_CODE, ERROR_MESSAGE, LOAD_DATA_LINE_NUMBER, LOAD_DATA_LINE
FROM information_schema.PIPELINES_ERRORS;
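When this query returns many rows, grouping them shows which errors dominate. The Python sketch below counts exported rows per database, pipeline, and ERROR_CODE; it assumes each row is a dict keyed by the column names in the query, and the function name is hypothetical.

```python
from collections import Counter
from typing import Dict, List, Tuple


def most_common_errors(rows: List[Dict], n: int = 5) -> List[Tuple[Tuple, int]]:
    """Count PIPELINES_ERRORS rows per (database, pipeline, error code)."""
    counts = Counter(
        (r["DATABASE_NAME"], r["PIPELINE_NAME"], r["ERROR_CODE"]) for r in rows
    )
    # Most frequent first; ties keep the order in which they were first seen.
    return counts.most_common(n)
```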

Query files that were skipped

The query in the previous section does not show files that were skipped because they had errors. To return the skipped files per database and per pipeline (but not per batch or per partition), run the following query.

SELECT * FROM information_schema.PIPELINES_FILES WHERE FILE_STATE = 'Skipped';

If you need additional information, such as the database, the partition, the error that was generated, and the line of the file or object that caused the error, run the following query.

SELECT pe.DATABASE_NAME, pe.PIPELINE_NAME, pe.BATCH_ID, pe.PARTITION,
pe.BATCH_SOURCE_PARTITION_ID, pe.ERROR_TYPE, pe.ERROR_KIND, pe.ERROR_CODE, pe.ERROR_MESSAGE,
pe.LOAD_DATA_LINE_NUMBER, pe.LOAD_DATA_LINE
FROM information_schema.PIPELINES_FILES pf
JOIN information_schema.PIPELINES_ERRORS pe
  ON pe.DATABASE_NAME = pf.DATABASE_NAME
 AND pe.PIPELINE_NAME = pf.PIPELINE_NAME
 AND pe.BATCH_SOURCE_PARTITION_ID = pf.FILE_NAME
WHERE pf.FILE_STATE = 'Skipped';
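Errors are matched to skipped files by comparing BATCH_SOURCE_PARTITION_ID with FILE_NAME. If two pipelines read files with the same name, matching on the file name alone can attribute one pipeline's error to another pipeline's skipped file. The Python sketch below performs the same match on exported rows while also keying on DATABASE_NAME and PIPELINE_NAME; the function name and the dict-per-row layout are assumptions.

```python
from typing import Dict, List


def skipped_file_errors(files: List[Dict], errors: List[Dict]) -> List[Dict]:
    """Return the PIPELINES_ERRORS rows whose source file was skipped."""
    # Index skipped files by (database, pipeline, file name).
    skipped = {
        (f["DATABASE_NAME"], f["PIPELINE_NAME"], f["FILE_NAME"])
        for f in files
        if f["FILE_STATE"] == "Skipped"
    }
    return [
        e
        for e in errors
        if (e["DATABASE_NAME"], e["PIPELINE_NAME"], e["BATCH_SOURCE_PARTITION_ID"])
        in skipped
    ]
```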

Address specific errors

The following list describes errors that can occur when running a pipeline statement, such as CREATE PIPELINE, and errors that can occur while a pipeline is extracting, shaping, and loading data, along with how to resolve each one.

  • You get a syntax error when running CREATE PIPELINE.

    Resolution: Both CREATE PIPELINE and LOAD DATA (which is part of the CREATE PIPELINE syntax) have many options. Verify that the options you include are specified in the correct order.

  • You receive error 1970: Subprocess timed out

    Resolution: The master aggregator likely cannot connect to the pipeline's data source. Check the connection parameters, such as CONFIG and CREDENTIALS, that specify how to connect to the data source. Also verify that the data source is reachable from the master aggregator.

  • CREATE PIPELINE ... S3 returns an error that the bucket cannot be located.

    Resolution: The bucket name is case-sensitive. Verify that the case of the bucket name specified in your CREATE PIPELINE ... S3 statement matches the case of the bucket name in S3.

  • Error 1953: exited with failure result (8 : Exec format error) or No such file or directory

    Resolution: This error can occur when a pipeline attempts to run a transform. Check the following:

      1. Verify that the first line of your transform contains a shebang, which specifies the interpreter (such as Python) used to execute the script.

      2. Verify that the interpreter (such as Python) is installed on all leaves.

      3. If the transform was written on a Windows machine, check whether its newlines use \r\n. If they do, convert them to \n.

  • CREATE PIPELINE ... WITH TRANSFORM fails with a libcurl error.

    Resolution: An incorrect path to the transform was likely specified. To verify the path, run curl with it; if the path is correct, curl succeeds.

  • Error: java.lang.OutOfMemoryError: Java heap space

    Resolution: This error may occur when the default value (8MB) of the engine variable java_pipeline_heap_size is exceeded. Increase the value of java_pipeline_heap_size to potentially correct this error.

  • A parsing error occurs in your transform.

    Resolution: To debug your transform, you can run EXTRACT PIPELINE ... INTO OUTFILE. This command saves a sample of the data extracted from the data source to a file. For debugging purposes, you can make changes to the file as needed and then send the file to the transform. For more information, see EXTRACT PIPELINE … INTO OUTFILE.
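The transform checks for error 1953 can be scripted. The Python sketch below, run on a leaf, looks for a shebang on the first line, checks that the named interpreter exists on that host (resolving "#!/usr/bin/env name" through PATH), and detects Windows (\r\n) line endings, which typically make the system look for an interpreter name ending in \r. The function names are hypothetical, and the checks cover only the items listed above.

```python
import os
import shutil
from typing import List


def check_transform(path: str) -> List[str]:
    """Return problems that commonly cause error 1953 when running a transform."""
    with open(path, "rb") as f:
        data = f.read()
    problems = []
    first_line = data.split(b"\n", 1)[0]
    if not first_line.startswith(b"#!"):
        problems.append("missing shebang on first line")
    else:
        # Strip a trailing \r so the interpreter lookup is meaningful;
        # CRLF endings are reported separately below.
        parts = first_line[2:].rstrip(b"\r").decode("utf-8", "replace").split()
        if not parts:
            problems.append("empty shebang")
        else:
            if os.path.basename(parts[0]) == "env" and len(parts) > 1:
                # "#!/usr/bin/env python3" resolves the interpreter via PATH.
                found = shutil.which(parts[1]) is not None
                name = parts[1]
            else:
                found = os.path.exists(parts[0])
                name = parts[0]
            if not found:
                problems.append(f"interpreter not found on this host: {name}")
    if b"\r\n" in data:
        problems.append("Windows (CRLF) line endings")
    return problems


def convert_to_lf(path: str) -> None:
    """Rewrite the file in place with Unix (LF) line endings."""
    with open(path, "rb") as f:
        data = f.read()
    with open(path, "wb") as f:
        f.write(data.replace(b"\r\n", b"\n"))
```

Because the interpreter check only inspects the host it runs on, run it on every leaf.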

Rename a table referenced by a pipeline

When you try to rename a table that is referenced by a pipeline, the following error results:

ERROR 1945 ER_CANNOT_DROP_REFERENCED_BY_PIPELINE: Cannot rename table because it is referenced by pipeline <pipeline_name>

The following sequence demonstrates how to rename a table that is referenced by a pipeline:

  1. Save your pipeline settings:

    SHOW CREATE PIPELINE <pipeline_name> EXTENDED;
  2. Stop the pipeline:

    STOP PIPELINE <pipeline_name>;
  3. Drop the pipeline:

    DROP PIPELINE <pipeline_name>;
  4. Change the name of the table:

    ALTER TABLE <old_table_name> RENAME <new_table_name>;
  5. Recreate the pipeline using the settings you saved in step 1, changing the table name to the new table name.

  6. Start the pipeline:

    START PIPELINE <pipeline_name>;
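Step 5 amounts to editing the statement saved in step 1. The Python sketch below rewrites the target table in that saved text. It assumes the saved CREATE PIPELINE statement names its target as INTO TABLE `old_name` (with or without backticks) and leaves every other occurrence of the name untouched; the function name is hypothetical, so review the result before running it.

```python
import re


def retarget_pipeline_sql(create_sql: str, old_table: str, new_table: str) -> str:
    """Replace the INTO TABLE target in a saved CREATE PIPELINE statement."""
    # Match INTO TABLE old_table or INTO TABLE `old_table`, but not a longer
    # name such as old_table_archive.
    pattern = re.compile(
        r"(INTO\s+TABLE\s+)`?" + re.escape(old_table) + r"`?(?=[\s(;]|$)",
        re.IGNORECASE,
    )
    new_sql, count = pattern.subn(lambda m: m.group(1) + f"`{new_table}`", create_sql)
    if count == 0:
        raise ValueError(f"no INTO TABLE {old_table} clause found")
    return new_sql
```

Raising an error when nothing matches avoids silently recreating the pipeline against the old table name.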

Pipeline errors that are handled automatically

Typical error handling scenario

In most situations, an error that occurs while a pipeline is running is handled in this way:

If an error occurs while a batch b is running, b fails and b's transaction is rolled back. Then, b is retried at most pipelines_max_retries_per_batch_partition times. If all of the retries are unsuccessful and pipelines_stop_on_error is set to ON, the pipeline stops. Otherwise, the pipeline continues and processes a new batch nb, which processes the same files or objects that b attempted to process, excluding any files or objects that may have caused the error.
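The control flow of this scenario can be modeled in a few lines. The Python sketch below is a simplified model, not the engine's implementation: handle_batch and load_file are hypothetical, max_retries stands in for pipelines_max_retries_per_batch_partition, and stop_on_error stands in for pipelines_stop_on_error. To know which files to exclude from nb, the model tries every file in each attempt, whereas a real batch fails and rolls back as a unit.

```python
from typing import Callable, List, Tuple


def handle_batch(
    files: List[str],
    load_file: Callable[[str], None],
    max_retries: int,
    stop_on_error: bool,
) -> Tuple[str, List[str], List[str]]:
    """Return (outcome, files, failed_files), where outcome is one of:
    "committed": b eventually succeeded; files are the committed files.
    "stopped":   retries exhausted and stop_on_error is set.
    "new_batch": retries exhausted; files are what nb would process.
    """

    def failures(batch: List[str]) -> List[str]:
        failed = []
        for f in batch:
            try:
                load_file(f)
            except Exception:
                failed.append(f)
        return failed

    failed = failures(files)  # first attempt of batch b
    retries = 0
    while failed and retries < max_retries:
        retries += 1  # b was rolled back; retry the same files
        failed = failures(files)
    if not failed:
        return "committed", list(files), []
    if stop_on_error:
        return "stopped", [], failed
    # nb processes the same files, excluding those that caused the error.
    remaining = [f for f in files if f not in failed]
    return "new_batch", remaining, failed
```

In this model a file that fails on every attempt is tried 1 + max_retries times before being excluded, while a transient failure that clears on a retry lets b commit.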

The following list describes events, which may or may not cause errors, and how each event is handled.

  • The pipeline cannot access a file or object.

    Handling: The typical error handling scenario (described earlier in this topic) applies. nb skips the file or object.

  • The pipeline cannot read a file or object because it is corrupted.

    Handling: The typical error handling scenario applies. nb skips the file or object. After fixing the issue with the corrupted file or object, you can have the pipeline reprocess it by running ALTER PIPELINE ... DROP FILE <filename>;. The pipeline processes the file or object during the next batch.

  • A file or object is removed from the filesystem after the batch has started processing it.

    Handling: The batch does not fail; the file or object is processed.

  • A file is removed from the filesystem (or an object is removed from an object store) after the pipeline registers the file or object in information_schema.PIPELINES_FILES, but before the file or object is processed.

    Handling: The typical error handling scenario applies. nb skips the file or object.

  • The cluster restarts while the batch is being processed.

    Handling: The typical error handling scenario applies. Once the cluster is online, b is retried.

  • A leaf node is unavailable before the pipeline starts.

    Handling: This does not cause the pipeline to fail. The pipeline does not ingest any data into the unavailable leaf node.

  • A leaf node fails while the pipeline is running.

    Handling: The batch fails. The batch is retried as described in the typical error handling scenario; that batch and all future batches no longer attempt to load data into the unavailable leaf node.

  • An aggregator fails while the pipeline is running.

    Handling: The batch fails. When the aggregator is available, the batch is retried as described in the typical error handling scenario.

  • The pipeline reaches the allocated storage space for errors.

    Handling: The pipeline pauses. To address the issue:

      1. Increase the value of the ingest_errors_max_disk_space_mb engine variable.

      2. Run CLEAR PIPELINE ERRORS; to free up storage space for errors. Running this command removes all existing pipeline errors that are shown when running SHOW ERRORS;.


Last modified: September 12, 2023
