Replicate Data from MySQL

Change Data Capture (CDC) pipelines enable you to ingest historical data and sync the continuous changes to data as they happen on the source MySQL database. You can replicate your MySQL databases to SingleStore using CDC pipelines by performing these tasks:

  1. Configure the source MySQL database. Refer to Configure MySQL for more information.

  2. Configure the path of the java binary. Refer to Configure java Binary Path for more information.

  3. (Optional) Create a link to the MySQL database. Refer to CREATE LINK for more information. You can also specify the link configuration and credentials in the CONFIG/CREDENTIALS clause of the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement instead of creating a link.

    Note

    Create a link to the primary MySQL instance. SingleStore does not support replicating MySQL databases using CDC from secondary (replica) MySQL instances.

  4. Create the required table(s), stored procedure(s), and pipeline(s) using the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement. Refer to Syntax for more information. You can either replicate the MySQL tables as is or apply custom transformations.

    Note

    Before restarting the INFER PIPELINE operation, delete all the related artifacts.

    DROP TABLE <target_table_name>;
    DROP PIPELINE <pipeline_name>;
    DROP PROCEDURE <procedure_name>;
  5. Once all the components are configured, start the pipelines.

    • To start all the pipelines, run the START ALL PIPELINES SQL statement.

    • To start a specific pipeline, run the START PIPELINE <pipeline_name> SQL statement. By default, the pipeline is named <source_db_name>.<table_name>.

    Note: Once the pipeline(s) (CDC operation) has started, do not run ALTER TABLE statements on the source MySQL database.

For more information, refer to the relevant section on this page.

Replicate MySQL Tables Example

The following example shows how to replicate MySQL tables without any custom transformations. This example uses the LINK clause to specify the MySQL endpoint connection configuration.

Perform the following tasks after configuring MySQL:

  1. Create a link to the MySQL endpoint using the CREATE LINK command.

    CREATE LINK pLink AS MYSQL
    CONFIG '{"database.hostname": "svchost", "database.port": 3306, "database.ssl.mode":"required"'
    CREDENTIALS '{"database.password": "pa55w0rd", "database.user": "repl_user"}';
  2. Create tables, pipelines, and stored procedures in SingleStore based on the inference from the source tables:

    CREATE TABLES AS INFER PIPELINE AS LOAD DATA
    LINK pLink "*" FORMAT AVRO;
  3. Start the pipelines, and begin the replication process.

    START ALL PIPELINES;

Prerequisites

Configure java Binary Path

Install JRE version 11+, and specify the full path of the java binary using the java_pipelines_java11_path engine variable. You must set this engine variable on each node. Update the java binary path in the following command and then run it to configure java_pipelines_java11_path:

sdb-admin update-config --all --set-global --key "java_pipelines_java11_path" --value "/path_to/bin/java"

Refer to sdb-admin update-config for more information.

Configure MySQL

  1. Enable binary logging for MySQL replication by setting the log-bin option to mysql-bin in your MySQL configuration. Refer to Enabling the binlog for more information.

  2. Configure the following additional options in MySQL, as specified:

    binlog_format=ROW
    binlog_row_image=FULL

    You may need to restart MySQL in order to incorporate the configuration changes. Consult the MySQL documentation for more information.

  3. Create a new database named singlestore. This database is required for internal usage.

    CREATE DATABASE IF NOT EXISTS singlestore;

    The singlestore database is meant to store the pipeline metadata information to confirm the status of the pipeline and other operations. SingleStore does not allow replicating this database and the tables that are included in it. The tables in the singlestore database can be empty by design.

  4. Create a database user for connecting to the MySQL instance:

    CREATE USER <user> IDENTIFIED BY '<password>';

    You can also use an existing user.

  5. Grant the following privileges to the <user>:

    GRANT CREATE, INSERT, DELETE, DROP, SELECT ON singlestore.* TO <user>;
    GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO <user>;
    GRANT SELECT ON *.* TO <user>;
  6. For authentication, the <user> must use the mysql_native_password plugin. Refer to Native Pluggable Authentication for more information.

  7. The MySQL instance must allow incoming traffic from the SingleStore cluster.

  8. All the source MySQL tables must have primary keys.

Syntax

CREATE TABLE [IF NOT EXISTS] <table_name>
AS INFER PIPELINE AS LOAD DATA
{ LINK <link_name> "<source_db>.<source_table>" |
MYSQL "<source_db>.<source_table>" CONFIG '<config_json>' CREDENTIALS '<credentials_json>' }
FORMAT AVRO;
CREATE TABLES [IF NOT EXISTS]
AS INFER PIPELINE AS LOAD DATA
{ LINK <link_name> "*" |
MYSQL "*" CONFIG '<config_json>' CREDENTIALS '<credentials_json>' }
FORMAT AVRO;

CREATE TABLE ... AS INFER PIPELINE Behavior

The CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE statement,

  1. Connects to the MySQL instance using the specified LINK <link_name> or CONFIG/CREDENTIALS clause.

  2. Infers the schema of the table and creates a table (named <table_name>) using the inferred schema. When the IF NOT EXISTS clause is specified and a table with the specified name already exists, a new table is not created and the existing table is used instead. Refer to Limitations for more information.

  3. Creates a pipeline (named <source_db_name>.<table_name>) and stored procedure (named <source_db_name>.<table_name>) that maps the AVRO data structure to the SingleStore data structure. The IF NOT EXISTS clause is ignored for pipelines and stored procedures. If a pipeline or stored procedure with the same name already exists, the CREATE TABLE ... AS INFER PIPELINE statement returns an error.

CREATE TABLES AS INFER PIPELINE Behavior

The CREATE TABLES [IF NOT EXISTS] AS INFER PIPELINE statement creates a table in SingleStore for each table in the source MySQL database using the same set of operations as the CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE statement (specified above). Additionally, it discovers the available databases and tables filtered by database.exclude.list, database.include.list, table.exclude.list, and table.include.list.

Arguments

  • <table_name>: Name of the table to create in the SingleStore database. You can also specify a table name that differs from the name of the source MySQL table.

  • <link_name>: Name of the link to the MySQL endpoint. Refer to CREATE LINK for more information.

  • <source_db>: Name of the source MySQL database.

  • <source_table>: Name of the source MySQL table.

  • <config_json>: Configuration parameters, including the source MySQL configuration, in the JSON format. Refer to Parameters for supported parameters.

  • <credentials_json>: Credentials to use to access the MySQL database in JSON format. For example:

    CREDENTIALS '{"database.password": "<password>", "database.user": "<user>"}'
    • database.user: Name of the MySQL database user.

    • database.password: Password of the MySQL database user.

Parameters

The CREATE {TABLE | TABLES}, CREATE LINK, and CREATE AGGREGATOR PIPELINE statements support the following parameters in the CONFIG clause:

Parameter

Description

database.hostname

IP address or the hostname of the source MySQL instance.

database.port

Port of the MySQL instance.

Default: 3306

database.ssl.mode

Specifies the SSL mode.

max.queue.size

Specifies the size of the queue inside the extractor process for records that are ready for ingestion. Increasing the queue size results in an increase in the memory consumption by the replication process and you may need to increase the pipelines_cdc_java_heap_size engine variable. Refer to Configure Ingestion Speed Limit using Engine Variables for more information.

Default: 1024

max.batch.size

Specifies the maximum number of rows of data fetched from the remote source in a single iteration (batch). max.batch.size must be lower than max.queue.size. Refer to Configure Ingestion Speed Limit using Engine Variables for more information.

Default: 512

poll.interval.ms

Specifies the interval for polling of remote sources (in milliseconds) if there were no new records in the previous iteration in the replication process.

Default: 500

table.include.list

A comma-separated list of regular expressions that match fully-qualified table identifiers (in databaseName.tableName format) for MySQL tables to monitor. By default, changes in all the non-system tables in the database are captured. This parameter is only supported in CREATE TABLES AS INFER PIPELINE statement. When this option is specified, tables excluded from the list are not monitored. The table.include.list and table.exclude.list parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLES AS INFER PIPELINE statement. This parameter is ignored in the CREATE TABLE ... AS INFER PIPELINE statement.

table.exclude.list

A comma-separated list of regular expressions that match fully-qualified table identifiers (in databaseName.tableName format) for MySQL tables to exclude from the monitoring list. By default, this list is empty. This parameter is only supported in CREATE TABLES AS INFER PIPELINE statement. The table.include.list and table.exclude.list parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLES AS INFER PIPELINE statement.

database.include.list

A comma-separated list of regular expressions that match the names of databases to monitor. By default, all the databases are monitored. When this option is specified, databases excluded from the list are not monitored. This parameter is only supported in CREATE TABLES AS INFER PIPELINE statements. The database.include.list and database.exclude.list parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLE AS INFER PIPELINE statement. If this option is used with the table.include.list or table.exclude.list option, it returns the intersection of the matches

database.exclude.list

A comma-separated list of regular expressions that match the names of databases to exclude from monitoring. By default, this list is empty. This parameter is only supported in CREATE TABLES AS INFER PIPELINE statement. The database.include.list and database.exclude.list parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLES AS INFER PIPELINE statement. If this option is used with the table.include.list or table.exclude.list option, it returns the intersection of the matches.

signal.data.collection

A table in the remote MySQL source that is used by SingleStore to generate special markings for snapshotting and synchronization. By default, this parameter is set to singlestore.signals_xxxxxx, where xxxxxx is an automatically generated character sequence. The default signal table is in the singlestore database. Once the pipelines are started, any change to the value of this parameter is ignored, and the pipelines use the latest value specified before the pipelines started.

snapshot.mode

Specifies the snapshot mode for the pipeline. It can have one of the following values:

  • "initial" (Default): Perform a full snapshot first and replay CDC events created during the snapshot. Then, continue ingestion using CDC.

  • "incremental": Start the snapshot operation and CDC simultaneously.

  • "schema_only": Skip the snapshot, and ingest changes using CDC.

Refer to CDC Snapshot Strategies for more information.

Default: "initial"

Replication Strategies

Use one of the following methods to create the required components for data ingestion.

Replicate MySQL Tables As Is

To replicate or migrate MySQL tables as is, use the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement. This method automatically creates the required tables, pipelines, and stored procedures. Refer to Syntax for more information.

Apply Transformations or Ingest a Subset of Columns

To apply transformations or ingest only a subset of columns, manually create the required tables, stored procedure, and pipelines:

  1. Run the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement to infer the schema of the MySQL table(s) and automatically generate templates for the relevant table(s), stored procedure(s), and aggregator pipeline(s).

  2. Use the automatically-generated templates as a base to create a new table(s), stored procedure(s), and pipeline(s) for custom transformations. To inspect the generated table(s), stored procedure(s), and pipeline(s), use the SHOW CREATE TABLE , SHOW CREATE PROCEDURE, and SHOW CREATE PIPELINE commands, respectively. After running the SHOW commands, you can drop the templates and then recreate the same components with custom transformations.

    Using the automatically-generated templates:

    1. Create table(s) in SingleStore with a structure that can store the ingested MySQL table. By default, the primary key is used as the shard key for a table. You can specify a different shard key and sort key in the table definition. For an example, refer to Example 3. Refer to CREATE TABLE for more information.

    2. Create stored(s) procedure to map the MySQL table columns to the SingleStore table and implement other transformations required. Refer to CREATE PROCEDURE for information on creating stored procedures.

    3. Create pipeline(s) to ingest the MySQL tables using the CREATE AGGREGATOR PIPELINE SQL statement.

      Note that the pipeline definition may contain encoded names in the field mapping clause for unsupported symbols in the _uxxxx format because of Avro naming limitations, for example, `i_d` <- `payload`::`i_u005fd`. The table and stored procedure declarations use human readable Unicode symbols instead of the encoding. Hence, they do not require additional handling. Refer to Parameters for a list of supported parameters. Refer to CREATE PIPELINE for the complete syntax and related information.

      Note: The CDC feature only supports AGGREGATOR pipelines.

Refer to Syntax for information on CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement.

CDC Snapshot Strategies

SingleStore supports the following strategies for creating snapshots:

  • Perform a full snapshot before CDC:

    The pipeline captures the position in the binary log and then performs a full snapshot of the data. Once the snapshot is complete, the pipeline continues ingestion using CDC. This strategy is enabled by default. If the pipeline is restarted while the snapshot is in progress, the snapshot is restarted from the beginning.

    To use this strategy, set "snapshot.mode":"initial" in the CONFIG JSON.

    Requirement: The binary log retention period must be long enough to maintain the records while the snapshot is in progress. Otherwise, the pipeline will fail and the process will have to be started over.

  • CDC only:

    The pipeline will not ingest existing data, and only the changes are captured using CDC.

    To use this strategy, set "snapshot.mode":"schema_only" in the CONFIG JSON.

  • Perform a snapshot in parallel to CDC:

    The pipeline captures the position in the binary log and starts capturing the changes using CDC. In parallel, the pipeline performs incremental snapshots of the existing data and merges it with the CDC records. Although this strategy is slower than performing a full snapshot and then ingesting changes using CDC, it is more resilient to pipeline restarts.

    To use this strategy, set "snapshot.mode":"incremental" in the CONFIG JSON.

    Requirement: The binary log retention period must be long enough to compensate for unexpected pipeline downtime.

  • Manually perform the snapshot and capture changes using the CDC pipeline:

    1. Create a pipeline and then wait for at least one batch of ingestion to capture the binary log position.

    2. Stop the pipeline.

    3. Snapshot the data using any of the suitable methods, for example, mysqldump.

    4. Restore the snapshot in SingleStore with any of the supported methods, for example, load data from an S3 bucket.

    5. Start the CDC pipeline.

    To use this strategy, set "snapshot.mode":"schema_only" in the CONFIG JSON. This strategy provides faster data ingestion when the initial historical data is very large in size.

    Requirement: The binary log retention period must be long enough to maintain the records while the snapshot is in progress. Otherwise, the pipeline will fail and the process will have to be started over.

Configure Ingestion Speed Limit using Engine Variables

Use the following engine variables to configure ingestion speed:

Variable Name

Description

Default Value

pipelines_cdc_row_emit_delay_us

Specifies a forced delay in row emission while migrating/replicating your tables (or collections) to your SingleStore databases. It can have a maximum value of 1000000.

1

pipelines_cdc_java_heap_size

Specifies the JVM heap size limit (in MBs) for CDC-in pipelines.

128

pipelines_cdc_max_extractors

Specifies the maximum number of CDC-in extractor instances that can run concurrently.

16

pipelines_cdc_min_extractor_lifetime_s

Specifies the minimum duration (in seconds) that the extractor allocates to a single pipeline for ingesting data and listening to CDC events.

60

Refer to Engine Variables for information on how to set engine variables. You can also use the supported parameters to configure the ingestion speed.

In-Depth Variable Definitions

Use the pipelines_cdc_row_emit_delay_us engine variable to limit the impact of CDC pipelines on the master aggregator node. It specifies a forced delay in row emission during ingest. This variable can be set to a maximum value of 1000000. To disable the forced delay while ingestion, set this variable to 0.

Warning

Disabling the emit delay may result in excessive CPU usage on master aggregator nodes.

Use the max.queue.size parameter in the CONFIG JSON to control the ingestion speed. SingleStore recommends setting max.batch.size to half of max.queue.size. Increasing the queue size requires a larger Java heap limit, adjust the pipelines_cdc_java_heap_size engine variable accordingly. Query the INFORMATION_SCHEMA.PIPELINES_BATCHES_SUMMARY table for information on pipeline batch performance.

Use the pipelines_cdc_max_extractors engine variable to limit the number of CDC-in extractor instances that can run concurrently. The extractors provide data to the CDC-in pipelines and consume a persistent amount of resources in the process. Therefore, the Master Aggregator node can only run a limited number of extractor instances. If the number of CDC-in pipelines is greater than pipelines_cdc_max_extractors, some pipelines will have to wait in the queue until an extractor can be acquired to fetch data. This variable can be set to a maximum value of 1024.

Use the pipelines_cdc_min_extractor_lifetime_s variable to specify the minimum duration (in seconds) that the extractor allocates to a single pipeline for ingesting data and listening to CDC events. This variable can be set to a maximum value of 3600.

Limitations

  • SingleStore intentionally does not support all of the features of MySQL. Refer to Unsupported MySQL Features for more information.

  • Autoincrement properties, default values, indexes, and additional keys (excluding primary keys) are not inferred or replicated.

  • If any column in the source MySQL table has a data type that is not supported by SingleStore, all the columns in the table are ingested as TEXT type in SingleStore. Apply custom transformations to your stored procedures and pipelines to map the unsupported MySQL type to SingleStore type. Refer to Data Types for information on supported data types.

  • The MySQL database, table, or column names must not contain the following characters: /, \, `, ', ", '\b', '\t', '\n', '\f', '\r', or non-printable characters. Additionally, the database name must not contain ..

Troubleshooting

  • If the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement returns an error, run the SHOW WARNINGS command to view the reason behind the failure.

  • To view the status of the pipelines, query the information_schema.pipelines_cursors table. Run the following SQL statement to display the status of the replication task:

    SELECT SOURCE_PARTITION_ID,
    EARLIEST_OFFSET,
    LATEST_OFFSET,
    LATEST_EXPECTED_OFFSET-LATEST_OFFSET as DIFF,
    UPDATED_UNIX_TIMESTAMP
    FROM information_schema.pipelines_cursors;
  • To view pipeline errors, run the following SQL statement:

    SELECT * FROM information_schema.pipelines_errors;
  • If the replication process fails with an out of memory error in Java, increase the heap size using the pipeline_cdc_java_heap_size engine variable. Refer to Configure Ingestion Speed Limit using Engine Variables for more information.

Examples

Refer to Replicate MySQL Tables Example for a sample replication example.

Example 1

The following example shows how to specify a table name that differs from the source MySQL table name:

CREATE TABLE s2table AS INFER PIPELINE AS LOAD DATA
LINK pLink "db1.mtest" FORMAT AVRO;

This command replicates the mtest table from the source MySQL database to the s2table in SingleStore.

Example 2

The following example shows how to specify additional parameters in a CREATE LINK statement:

CREATE LINK pLink AS MYSQL
CONFIG '{
"database.hostname": "svchost",
"database.port": 3306,
"database.ssl.mode":"required",
"database.exclude.list": "mysql,performance_schema",
"table.include.list": "db1.example",
"max.queue.size": 1024,
"max.batch.size": 512,
"poll.interval.ms": 500}'
CREDENTIALS '{
"database.password": "pa55w0rd",
"database.user": "repl_user"}';

Example 3

The following example shows how to specify a different sort key and shard key for the table.

  1. Infer the schema of the source MySQL table and generate a template for the table:

    CREATE TABLE IF NOT EXISTS mtest AS INFER PIPELINE
    AS LOAD DATA LINK pLink "db1.mtest" FORMAT AVRO;
  2. Run the SHOW CREATE TABLE command to get the table definition:

    SHOW CREATE TABLE mtest;
    +-----------+-----------------------------------------------------------------------------------------------------------------------------------+
    | Table     | Create Table |
    +-----------+----------------------------------------------------------------------------------------------------------------------------------+
    | mtest     | CREATE TABLE `mtest` (
    `id` int(11) NOT NULL,
    `text` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci,
    PRIMARY KEY (`id`),
    SHARD KEY `__SHARDKEY` (`id`),
    SORT KEY `__UNORDERED` ()
    ) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL AUTOSTATS_HISTOGRAM_MODE=CREATE AUTOSTATS_SAMPLING=ON SQL_MODE='STRICT_ALL_TABLES,NO_AUTO_CREATE_USER' |
    +-----------+-----------------------------------------------------------------------------------------------------------------------------------+
  3. Drop the existing table, and use the definition of the template as base to specify a different sort key or shard key:

    DROP TABLE mtest;
    CREATE TABLE mtest (
    id INT(11) NOT NULL PRIMARY KEY,
    text TEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci,
    SHARD KEY (id), SORT KEY(id, text));

Last modified: September 10, 2024

Was this article helpful?