Load Data from MySQL

You can replicate your MySQL databases in SingleStore using Change Data Capture (CDC).

Create a link to the MySQL endpoint using the CREATE LINK statement, and use one of the following methods:

  • To replicate/migrate MySQL tables (as is), use the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement. Refer to Remarks for more information.

  • To apply transformations and manually ingest the tables:

    1. Create a table in SingleStore with a structure that can store the ingested MySQL tables.

    2. Create a stored procedure to map the MySQL tables to the SingleStore table and implement other transformations required.

    3. Create a pipeline to ingest the MySQL tables using the CREATE AGGREGATOR PIPELINE SQL statement. Refer to Parameters for a list of supported parameters.

      Note

      The CDC feature only supports AGGREGATOR pipelines.

  • (Optional) Run the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement to infer the schema of the MySQL table(s) and generate the relevant table(s), stored procedure, and aggregator pipeline, automatically. Subsequently, use the automatically generated stored procedure and pipeline as a base for custom transformations. To inspect the stored procedure and pipeline, use the SHOW CREATE PROCEDURE and SHOW CREATE PIPELINE commands, respectively.

Once all the components are configured, start the pipelines.

Note: Install JRE version 11+, and specify the full path of the java binary using the java_pipelines_java11_path engine variable. You must set this engine variable on each node. Update the java binary path in the following command and then run it to configure java_pipelines_java11_path:

sdb-admin update-config --all --set-global --key "java_pipelines_java11_path" --value "/path_to/bin/java"

Refer to sdb-admin update-config for more information.

Syntax

CREATE LINK <link_name> AS MYSQL
CONFIG '{"database.hostname": "<hostname>",
"database.port": 3306 [, "database.ssl.mode":"<ssl_mode>"]}'
CREDENTIALS '{"database.password": "<password>", "database.user": "<user>"}';

Refer to CREATE LINK for the complete syntax and related information.

CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE AS LOAD DATA
LINK <link_name> "<source_db>.<source_table>" FORMAT AVRO;
CREATE TABLES [IF NOT EXISTS] AS INFER PIPELINE AS LOAD DATA
LINK <link_name> "*" FORMAT AVRO;

Refer to CREATE TABLE for related information.

CREATE [OR REPLACE] AGGREGATOR PIPELINE [IF NOT EXISTS] <pipeline_name> AS
LOAD DATA MYSQL <link_configuration> ...
<avro_format_options>

Refer to CREATE PIPELINE for the complete syntax and related information. Refer to CREATE PROCEDURE for information on creating stored procedures.

Parameters

The CREATE AGGREGATOR PIPELINE ... AS LOAD DATA MYSQL statement supports the following parameters in the CONFIG/CREDENTIALS clause:

  • max.queue.size: Specifies the size of the queue inside the extractor process for records that are ready for ingestion. The default queue size is 1024. This variable also specifies the number of rows for each partition. Increasing the queue size results in an increase in the memory consumption by the replication process and you may need to increase the pipelines_cdc_java_heap_size.

  • max.batch.size: Specifies the maximum number of rows of data fetched from the remote source in a single iteration (batch). The default batch size is 512. max.batch.size must be lower than max.queue.size.

  • poll.interval.ms: Specifies the interval for polling of remote sources if there were no new records in the previous iteration in the replication process. The default interval is 500 milliseconds.

  • snapshot.mode: Specifies the snapshot mode for the pipeline. It can have one of the following values:

    • "initial" (Default): Perform a full snapshot first and replay CDC events created during the snapshot. Then, continue ingestion using CDC.

    • "incremental": Start the snapshot operation and CDC simultaneously.

    • "never": Skip the snapshot, and ingest changes using CDC.

    Refer to CDC Snapshot Strategies for more information.

Remarks

  • The CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE statement,

    • Connects to the MySQL instance using the specified LINK <link_name> clause.

    • Infers the schema of the table and creates a table (named <table_name>) using the inferred schema. When the IF NOT EXISTS clause is specified and a table with the specified name already exists, a new table is not created and the existing table is used instead.

    • Creates a pipeline (named <source_db_name>.<table_name>) and stored procedure (named <source_db_name>.<table_name>) that maps the AVRO data structure to the SingleStore data structure. The IF NOT EXISTS clause is ignored for pipelines and stored procedures. If a pipeline or stored procedure with the same name already exists, the CREATE TABLE ... AS INFER PIPELINE statement returns an error.

  • The CREATE TABLES [IF NOT EXISTS] AS INFER PIPELINE statement creates a table in SingleStore for each table in the source MySQL database using the same set of operations as the CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE statement (specified above). Additionally, it discovers the available databases and tables filtered by database.exclude.list, database.include.list, table.exclude.list, and table.include.list.

  • All the source MySQL tables must have primary keys.

  • You can also specify a table name that differs from the name of the source MySQL table.

  • The MYSQL source only supports AVRO data type.

  • Use the SYNC PIPELINE <pipeline_name> statement to replicate (sync with) the data source.

  • If the replication process fails with an out of memory error in Java, increase the heap size using the pipeline_cdc_java_heap_size engine variable.

  • The MySQL instance must allow incoming traffic from the SingleStore cluster.

  • To view pipeline errors, run the following SQL statement:

    SELECT * FROM information_schema.pipelines_errors;
  • Before restarting the INFER PIPELINE operation, delete all the related artifacts.

    DROP TABLE <target_table_name>;
    DROP PIPELINE <pipeline_name>;
    DROP PROCEDURE <procedure_name>;
  • SingleStore intentionally does not support all of the features of MySQL. Refer to Unsupported MySQL Features for more information.

CDC Snapshot Strategies

SingleStore supports the following strategies for creating snapshots:

  • Perform a full snapshot before CDC: The pipeline captures the position in the binary log and then performs a full snapshot of the data. Once the snapshot is complete, the pipeline continues ingestion using CDC. This strategy is enabled by default.

    To use this strategy, set "snapshot.mode":"initial" in the CONFIG JSON.

    Requirement: The binary log retention period must be long enough to maintain the records while the snapshot is in progress. Otherwise, the pipeline will fail and the process will have to be started over.

  • CDC only: The pipeline will not ingest existing data, and only the changes are captured using CDC.

    To use this strategy, set "snapshot.mode":"never" in the CONFIG JSON.

  • Perform a snapshot in parallel to CDC: The pipeline captures the position in the binary log and starts capturing the changes using CDC. In parallel, the pipeline performs incremental snapshots of the existing data and merges it with the CDC records.

    To use this strategy, set "snapshot.mode":"incremental" in the CONFIG JSON.

    Requirement: The binary log retention period must be long enough to compensate for unexpected pipeline downtime.

  • Manually perform the snapshot and capture changes using the CDC pipeline:

    1. Create a pipeline and then wait for at least one batch of ingestion to capture the binary log position.

    2. Stop the pipeline.

    3. Snapshot the data using any of the suitable methods.

    4. Start the pipeline.

    To use this strategy, set "snapshot.mode":"never" in the CONFIG JSON.

    Requirement: The binary log retention period must be long enough to maintain the records while the snapshot is in progress. Otherwise, the pipeline will fail and the process will have to be started over.

Configure Ingestion Speed Limit

Use the following engine variables to configure ingestion speed:

Variable Name

Description

Default Value

pipelines_cdc_row_emit_delay_us

Specifies a forced delay in row emission while migrating/replicating your tables (or collections) to your SingleStore databases. It can have a maximum value of 1000000.

1

pipelines_cdc_java_heap_size

Specifies the JVM heap size limit (in MBs) for CDC-in pipelines.

128

pipelines_cdc_max_extractors

Specifies the maximum number of CDC-in extractor instances that can run concurrently.

16

pipelines_cdc_min_extractor_lifetime_s

Specifies the minimum duration (in seconds) that the extractor allocates to a single pipeline for ingesting data and listening to CDC events.

60

In-Depth Variable Definitions

Use the pipelines_cdc_row_emit_delay_us engine variable to limit the impact of CDC pipelines on the master aggregator node. It specifies a forced delay in row emission during ingest. This variable can be set to a maximum value of 1000000. To disable the forced delay while ingestion, set this variable to 0.

Warning

Disabling the emit delay may result in excessive CPU usage on master aggregator nodes.

Use the max.queue.size parameter in the CONFIG JSON to control the ingestion speed. SingleStore recommends setting max.batch.size to half of max.queue.size. Increasing the queue size requires a larger Java heap limit, adjust the pipelines_cdc_java_heap_size engine variable accordingly. Query the INFORMATION_SCHEMA.PIPELINES_BATCHES_SUMMARY table for information on pipeline batch performance.

Use the pipelines_cdc_max_extractors engine variable to limit the number of CDC-in extractor instances that can run concurrently. The extractors provide data to the CDC-in pipelines and consume a persistent amount of resources in the process. Therefore, the Master Aggregator node can only run a limited number of extractor instances. If the number of CDC-in pipelines is greater than pipelines_cdc_max_extractors, some pipelines will have to wait in the queue until an extractor can be acquired to fetch data. This variable can be set to a maximum value of 1024.

Use the pipelines_cdc_min_extractor_lifetime_s variable to specify the minimum duration (in seconds) that the extractor allocates to a single pipeline for ingesting data and listening to CDC events. This variable can be set to a maximum value of 3600.

Required Configurations to Replicate MySQL Tables

Configure MySQL

  1. Enable binary logging for MySQL replication by setting the log-bin option to mysql-bin in your MySQL configuration. Refer to Enabling the binlog for more information.

  2. Configure the following additional options in MySQL, as specified:

    binlog_format=ROW
    binlog_row_image=FULL

    You may need to restart MySQL in order to incorporate the configuration changes. Consult the MySQL documentation for more information.

  3. Create a new database named singlestore. This database is required for internal usage.

    CREATE DATABASE IF NOT EXISTS singlestore;

    The singlestore database is meant to store the pipeline metadata information to confirm the status of the pipeline and other operations. SingleStore does not allow replicating this database and the tables that are included in it.

  4. Create a database user for connecting to the MySQL instance:

    CREATE USER <user> IDENTIFIED BY '<password>';

    You can also use an existing user.

  5. Grant the following privileges to the <user>:

    GRANT CREATE, INSERT, DELETE, DROP, SELECT ON singlestore.* TO <user>;
    GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO <user>;
    GRANT SELECT ON *.* TO <user>;
  6. For authentication, the <user> must use the mysql_native_password plugin. Refer to Native Pluggable Authentication for more information.

Replicate MySQL Tables

  1. Create a link to the MySQL endpoint using the CREATE LINK command.

    CREATE LINK <link_name> AS MYSQL
    CONFIG '{"database.hostname": "<hostname>", "database.port": 3306, "database.ssl.mode":"<ssl_mode>"}'
    CREDENTIALS '{"database.password": "<password>", "database.user": "<user>"}';
  2. Run the CREATE {TABLE | TABLES} AS INFER PIPELINE SQL statement to replicate the tables.

  3. Start the pipeline(s):

    1. To start all the pipelines, run the START ALL PIPELINES SQL statement.

    2. To start a specific pipeline, run the START PIPELINE <pipeline_name> SQL statement. By default, the pipeline has the same name as the target table.

    Note

    Once the pipeline(s) (CDC operation) has started, do not run ALTER TABLE statements.

Example

Perform the following tasks after configuring MySQL:

  1. Create a link to the MySQL endpoint.

    CREATE LINK pLink AS MYSQL
    CONFIG '{"database.hostname": "svchost", "database.port": 3306, "database.ssl.mode":"required"}'
    CREDENTIALS '{"database.password": "s2user", "database.user": "admin"}';
  2. Create tables, pipelines, and stored procedures in SingleStore based on the inference from the source tables.

    CREATE TABLES AS INFER PIPELINE AS LOAD DATA
    LINK pLink "*" FORMAT AVRO;
  3. Start the pipelines and begin the replication process.

    START ALL PIPELINES;

Last modified: July 4, 2024

Was this article helpful?