Load Data from MySQL
You can replicate your MySQL databases in SingleStore using Change Data Capture (CDC).
Create a link to the MySQL endpoint using the CREATE LINK
statement, and use one of the following methods:
- To replicate or migrate MySQL tables as-is, use the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement. Refer to Remarks for more information.
- To apply transformations and manually ingest the tables:
  - Create a table in SingleStore with a structure that can store the ingested MySQL tables.
  - Create a stored procedure that maps the MySQL tables to the SingleStore table and implements any other required transformations.
  - Create a pipeline to ingest the MySQL tables using the CREATE AGGREGATOR PIPELINE SQL statement. Refer to Parameters for a list of supported parameters.
    Note: The CDC feature only supports AGGREGATOR pipelines.
  - (Optional) Run the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement to infer the schema of the MySQL table(s) and automatically generate the relevant table(s), stored procedure, and aggregator pipeline. Then, use the automatically generated stored procedure and pipeline as a base for custom transformations. To inspect the stored procedure and pipeline, use the SHOW CREATE PROCEDURE and SHOW CREATE PIPELINE commands, respectively.
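As a sketch of the optional inference path, assuming a link named pLink and a source table mydb.orders (both hypothetical names):

```sql
-- Infer the schema of one MySQL table and automatically generate the
-- table, stored procedure, and aggregator pipeline.
CREATE TABLE orders AS INFER PIPELINE AS LOAD DATA
LINK pLink "mydb.orders" FORMAT AVRO;

-- Inspect the generated artifacts before customizing them.
SHOW CREATE PROCEDURE `mydb.orders`;
SHOW CREATE PIPELINE `mydb.orders`;
```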
Once all the components are configured, start the pipelines.
Note: Install JRE version 11 or later, and specify the full path of the java binary using the java_pipelines_java11_path engine variable. Update the java binary path in the following command, and then run it:
sdb-admin update-config --all --set-global --key "java_pipelines_java11_path" --value "/path_to/bin/java"
Refer to sdb-admin update-config for more information.
Syntax
CREATE LINK <link_name> AS MYSQL
CONFIG '{"database.hostname": "<hostname>", "database.port": 3306 [, "database.ssl.mode": "<ssl_mode>"]}'
CREDENTIALS '{"database.password": "<password>", "database.user": "<user>"}';
Refer to CREATE LINK for the complete syntax and related information.
CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE AS LOAD DATA
LINK <link_name> "<source_db>.<source_table>" FORMAT AVRO;

CREATE TABLES [IF NOT EXISTS] AS INFER PIPELINE AS LOAD DATA
LINK <link_name> "*" FORMAT AVRO;
Refer to CREATE TABLE for related information.
CREATE [OR REPLACE] AGGREGATOR PIPELINE [IF NOT EXISTS] <pipeline_name> AS
LOAD DATA MYSQL <link_configuration> ...
<avro_format_options>
Refer to CREATE PIPELINE for the complete syntax and related information.
Parameters
The CREATE AGGREGATOR PIPELINE ... statement supports the following parameters in the CONFIG/CREDENTIALS clause:

- max.queue.size: Specifies the size of the queue inside the extractor process for records that are ready for ingestion. The default queue size is 1024. This parameter also specifies the number of rows for each partition. Increasing the queue size increases the memory consumption of the replication process, and you may need to increase pipelines_cdc_java_heap_size.
- max.batch.size: Specifies the maximum number of rows of data fetched from the remote source in a single iteration (batch). The default batch size is 512. max.batch.size must be lower than max.queue.size.
- poll.interval.ms: Specifies the interval at which the remote source is polled when the previous iteration of the replication process returned no new records. The default interval is 500 milliseconds.
- snapshot.mode: Specifies the snapshot mode for the pipeline. It can have one of the following values:
  - "initial" (Default): Perform a full snapshot first and replay the CDC events created during the snapshot. Then, continue ingestion using CDC.
  - "incremental": Start the snapshot operation and CDC simultaneously.
  - "never": Skip the snapshot, and ingest changes using CDC only.

Refer to CDC Snapshot Strategies for more information.
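For example, a CONFIG JSON combining these parameters might look like the following fragment (the values are illustrative assumptions, not recommendations; note that max.batch.size stays below max.queue.size):

```json
{
  "max.queue.size": 2048,
  "max.batch.size": 1024,
  "poll.interval.ms": 500,
  "snapshot.mode": "initial"
}
```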
Remarks
- The CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE statement:
  - Connects to the MySQL instance using the specified LINK <link_name> clause.
  - Infers the schema of the table and creates a table (named <table_name>) using the inferred schema. When the IF NOT EXISTS clause is specified and a table with the specified name already exists, a new table is not created and the existing table is used instead.
  - Creates a pipeline (named <source_db_name>.<table_name>) and a stored procedure (named <source_db_name>.<table_name>) that maps the AVRO data structure to the SingleStore data structure. The IF NOT EXISTS clause is ignored for pipelines and stored procedures. If a pipeline or stored procedure with the same name already exists, the CREATE TABLE ... AS INFER PIPELINE statement returns an error.
- The CREATE TABLES [IF NOT EXISTS] AS INFER PIPELINE statement creates a table in SingleStore for each table in the source MySQL database using the same set of operations as the CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE statement (specified above). Additionally, it discovers the available databases and tables, filtered by database.exclude.list, database.include.list, table.exclude.list, and table.include.list.
- All the source MySQL tables must have primary keys.
- You can also specify a table name that differs from the name of the source MySQL table.
- The MYSQL source only supports the AVRO data format.
- Use the SYNC PIPELINE <pipeline_name> statement to replicate (sync with) the data source.
- If the replication process fails with an out of memory error in Java, increase the heap size using the pipelines_cdc_java_heap_size engine variable.
- The MySQL instance must allow incoming traffic from the SingleStore cluster.
- To view pipeline errors, run the following SQL statement:
  SELECT * FROM information_schema.pipelines_errors;
- Before restarting the INFER PIPELINE operation, delete all the related artifacts:
  DROP TABLE <target_table_name>;
  DROP PIPELINE <pipeline_name>;
  DROP PROCEDURE <procedure_name>;
- SingleStore intentionally does not support all of the features of MySQL.
Refer to Unsupported MySQL Features for more information.
CDC Snapshot Strategies
SingleStore supports the following strategies for creating snapshots:
- Perform a full snapshot before CDC: The pipeline captures the position in the binary log and then performs a full snapshot of the data. Once the snapshot is complete, the pipeline continues ingestion using CDC. This strategy is enabled by default. To use this strategy, set "snapshot.mode": "initial" in the CONFIG JSON.
  Requirement: The binary log retention period must be long enough to retain the records while the snapshot is in progress. Otherwise, the pipeline fails and the process must be started over.
- CDC only: The pipeline does not ingest existing data; only the changes are captured using CDC. To use this strategy, set "snapshot.mode": "never" in the CONFIG JSON.
- Perform a snapshot in parallel to CDC: The pipeline captures the position in the binary log and starts capturing the changes using CDC. In parallel, the pipeline performs incremental snapshots of the existing data and merges them with the CDC records. To use this strategy, set "snapshot.mode": "incremental" in the CONFIG JSON.
  Requirement: The binary log retention period must be long enough to compensate for unexpected pipeline downtime.
- Manually perform the snapshot and capture changes using the CDC pipeline:
  - Create a pipeline, and then wait for at least one batch of ingestion to capture the binary log position.
  - Stop the pipeline.
  - Snapshot the data using any suitable method.
  - Start the pipeline.
  To use this strategy, set "snapshot.mode": "never" in the CONFIG JSON.
  Requirement: The binary log retention period must be long enough to retain the records while the snapshot is in progress. Otherwise, the pipeline fails and the process must be started over.
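The manual-snapshot steps can be sketched in SQL as follows. The pipeline name is hypothetical, and the snapshot step itself (for example, loading a mysqldump export) depends on your environment and is elided:

```sql
-- 1. Create the pipeline with "snapshot.mode": "never", then run it for
--    at least one batch so the binary log position is captured.
START PIPELINE `mydb.orders`;

-- 2. Stop the pipeline after at least one batch has been ingested.
STOP PIPELINE `mydb.orders`;

-- 3. Snapshot the existing data using any suitable method (elided).

-- 4. Resume capturing changes using CDC.
START PIPELINE `mydb.orders`;
```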
Configure Ingestion Speed Limit
Use the following engine variables to configure ingestion speed:
Variable Name | Description | Default Value
---|---|---
pipelines_cdc_row_emit_delay_us | Specifies a forced delay in row emission while migrating/replicating your tables (or collections) to your SingleStore databases. |
pipelines_cdc_java_heap_size | Specifies the JVM heap size limit (in MBs) for CDC-in pipelines. |
pipelines_cdc_max_extractors | Specifies the maximum number of CDC-in extractor instances that can run concurrently. | 1024
pipelines_cdc_min_extractor_lifetime_s | Specifies the minimum duration (in seconds) that the extractor allocates to a single pipeline for ingesting data and listening to CDC events. | 3600
In-Depth Variable Definitions
Use the pipelines_cdc_row_emit_delay_us engine variable to limit the impact of CDC pipelines on the master aggregator node. It can be set to a maximum of 1000000 microseconds. Setting it to 0 disables the delay.
Warning
Disabling the emit delay may result in excessive CPU usage on master aggregator nodes.
Use the max.batch.size parameter in the CONFIG JSON to control the ingestion speed. As a starting point, set max.batch.size to half of max.queue.size, and adjust the pipelines_cdc_java_heap_size engine variable accordingly. Refer to the INFORMATION_SCHEMA PIPELINES_BATCHES_SUMMARY table for information on pipeline batch performance.
Use the pipelines_cdc_max_extractors engine variable to limit the number of CDC-in extractor instances that can run concurrently. If the number of pipelines exceeds pipelines_cdc_max_extractors, some pipelines will have to wait in the queue until an extractor can be acquired to fetch data. The default value is 1024.
Use the pipelines_cdc_min_extractor_lifetime_s variable to specify the minimum duration (in seconds) that the extractor allocates to a single pipeline for ingesting data and listening to CDC events. The default value is 3600.
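As a sketch, these engine variables can be updated with the same sdb-admin command shown earlier for the Java path. The variable and value below are illustrative assumptions, not recommendations:

```shell
# Illustrative only: raise the CDC-in JVM heap limit to 2048 MB
sdb-admin update-config --all --set-global --key "pipelines_cdc_java_heap_size" --value "2048"
```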
Required Configurations to Replicate MySQL Tables
Configure MySQL
- Enable binary logging for MySQL replication by setting the log-bin option to mysql-bin in your MySQL configuration. Refer to Enabling the binlog for more information.
- Configure the following additional options in MySQL, as specified:
  binlog_format=ROW
  binlog_row_image=FULL
  You may need to restart MySQL to incorporate the configuration changes. Consult the MySQL documentation for more information.
- Create a new database named singlestore. This database is required for internal usage.
  CREATE DATABASE IF NOT EXISTS singlestore;
  The singlestore database stores the pipeline metadata used to confirm the status of the pipeline and other operations. SingleStore does not allow replicating this database or the tables in it.
- Create a database user for connecting to the MySQL instance:
  CREATE USER <user> IDENTIFIED BY '<password>';
  You can also use an existing user.
- Grant the following privileges to the <user>:
  GRANT CREATE, INSERT, DELETE, DROP, SELECT ON singlestore.* TO <user>;
  GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO <user>;
  GRANT SELECT ON *.* TO <user>;
- For authentication, the <user> must use the mysql_native_password plugin.
Refer to Native Pluggable Authentication for more information.
Replicate MySQL Tables
- Create a link to the MySQL endpoint using the CREATE LINK command.
  CREATE LINK <link_name> AS MYSQL
  CONFIG '{"database.hostname": "<hostname>", "database.port": 3306, "database.ssl.mode": "<ssl_mode>"}'
  CREDENTIALS '{"database.password": "<password>", "database.user": "<user>"}';
- Run the CREATE {TABLE | TABLES} AS INFER PIPELINE SQL statement to replicate the tables.
- Start the pipeline(s):
  - To start all the pipelines, run the START ALL PIPELINES SQL statement.
  - To start a specific pipeline, run the START PIPELINE <pipeline_name> SQL statement. By default, the pipeline has the same name as the target table.
  Note
  Once the pipeline(s) (the CDC operation) has started, do not run ALTER TABLE statements.
Example
Perform the following tasks after configuring MySQL:
-
Create a link to the MySQL endpoint.
CREATE LINK pLink AS MYSQL
CONFIG '{"database.hostname": "svchost", "database.port": 3306, "database.ssl.mode": "required"}'
CREDENTIALS '{"database.password": "s2user", "database.user": "admin"}';
Create tables, pipelines, and stored procedures in SingleStore based on the inference from the source tables.
CREATE TABLES AS INFER PIPELINE AS LOAD DATA
LINK pLink "*" FORMAT AVRO;
Start the pipelines and begin the replication process.
START ALL PIPELINES;
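After the pipelines start, you can monitor replication with queries like the following. The pipelines_errors view is referenced above; PIPELINES_BATCHES_SUMMARY is the standard information_schema view for batch performance (treat its exact columns as an assumption):

```sql
-- Check batch-level performance of the CDC pipelines
SELECT * FROM information_schema.PIPELINES_BATCHES_SUMMARY;

-- Check for pipeline errors
SELECT * FROM information_schema.pipelines_errors;
```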
Last modified: July 4, 2024