Replicate Data from MongoDB®
Change Data Capture (CDC) pipelines enable you to ingest historical data and sync the continuous changes to data as they happen on the source MongoDB® database.
You can perform the replication using SQL statements.
Data Storage Model
The _id field of the document is added to the _id column, and the rest of the fields in the document are added to the _more column of the table in SingleStore.
Column Name | Data Type | Description |
---|---|---|
_id | BSON NOT NULL | The _id field of the document. |
_more | BSON NOT NULL | Contains all the fields in the document, except the _id field. |
 | PERSISTED LONGBLOB | Used to implement SingleStore unique indexes on MongoDB® collections, similar to BSON unique indexes. |
View the BSON Data
To view the BSON data, SingleStore recommends the following:
-
Cast the columns to JSON using the following SQL command:
SELECT _id :> JSON , _more :> JSON FROM <table_name>;
Prerequisites
Configure java Binary Path
Install JRE version 11+, and specify the full path of the java binary using the java_pipelines_java11_path engine variable. Set the java binary path in the following command and then run it to configure java_pipelines_java11_path:
sdb-admin update-config --all --set-global --key "java_pipelines_java11_path" --value "/path_to/bin/java"
Refer to sdb-admin update-config for more information.
Source Database Requirements
The replication feature uses MongoDB® change streams, and it works only with replica sets. The MongoDB® user used for replication requires the following:
- Read access to the admin database for the operation log (oplog)
- Read access to the config database in the configuration server
- listDatabases privilege
- Cluster-wide find and changeStream privileges
- Write permission to the singlestore database. The singlestore database is automatically created during the replication process.
You can provide the privileges or assign roles to the MongoDB® user using the MongoDB® Atlas UI or MongoDB® commands.
- Using the UI for MongoDB® Atlas:
  - On the MongoDB® Cloud dashboard, select Projects > <your_project>.
  - In the left navigation pane, under Security, select Database Access.
  - On the Database Users page, select Edit for the MongoDB® user used to connect to the MongoDB® instance.
  - On the Edit User dialog, under Database User Privileges, select Specific Privileges > Add Specific Privilege.
  - Add the following privileges:
    - readAnyDatabase
    - read for the config database
    - readWrite for the singlestore database
  - Select Update User.
- Using MongoDB® commands for self-managed deployments: The following commands create a role and then assign the role to a user.
  db.adminCommand({
    createRole: 'cdcRole',
    privileges: [{'resource': {'cluster': true}, 'actions': ['find', 'changeStream']}],
    roles: [
      {'role': 'read', 'db': 'admin'},
      {'role': 'read', 'db': 'local'},
      {'role': 'read', 'db': 'config'},
      {'role': 'readWrite', 'db': 'singlestore'}
    ]
  })
  db.adminCommand({
    createUser: 'cdcUser',
    pwd: 's2mongoCDC12',
    roles: ['cdcRole']
  })
  Refer to Database Commands for more information.
Replicate MongoDB® Collections using SQL
To replicate your existing MongoDB® collections to your SingleStore database using SQL commands via Change Data Capture (CDC) pipelines, perform the following tasks:
- Ensure that the Prerequisites are met.
- (Optional) Create a link to the MongoDB® instance. Refer to CREATE LINK for more information. You can also specify the link configuration and credentials in the CONFIG/CREDENTIALS clause of the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement instead of creating a link.
- Create the required table(s), stored procedure(s), and pipeline(s) using the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement. Refer to Syntax for more information. You can either replicate the MongoDB® collections as is or apply custom transformations.
  Note: Before restarting the INFER PIPELINE operation, delete all the related artifacts.
  DROP TABLE <target_table_name>;
  DROP PIPELINE <pipeline_name>;
  DROP PROCEDURE <procedure_name>;
- Once all the components are configured, start the pipelines.
  - To start all the pipelines, run the START ALL PIPELINES SQL statement.
  - To start a specific pipeline, run the START PIPELINE <pipeline_name> SQL statement. By default, the pipeline is named <source_db_name>.<table_name>.
To ingest data in the JSON format instead of BSON, you need to manually create the required table structures, pipelines, and stored procedures for mapping the BSON data type to JSON.
For more information, refer to the relevant sections on this page.
Replicate MongoDB® Collections Example
The following example shows how to replicate MongoDB® collections without any custom transformations. The example uses the LINK clause to specify the MongoDB® Atlas endpoint connection configuration.
- Create a link to the MongoDB® endpoint, for example, the primary node of a MongoDB® Atlas cluster.
  CREATE LINK <linkname> AS MONGODB
  CONFIG '{"mongodb.hosts":"<Hostname>","collection.include.list": "<Collection list>","mongodb.ssl.enabled":"true","mongodb.authsource":"admin"}'
  CREDENTIALS '{"mongodb.user":"<username>","mongodb.password":"<password>"}';
- Create tables, pipelines, and stored procedures in SingleStore based on the inference from the source collections.
  CREATE TABLES AS INFER PIPELINE AS LOAD DATA
  LINK <linkname> '*' FORMAT AVRO;
- Once the link and the tables are created, run the following command to start all the pipelines and begin the data replication process:
  -- Start pipelines
  START ALL PIPELINES;
- To view the ingested data, run the following SQL statement:
  SELECT _id :> JSON , _more :> JSON FROM <table_name>;
Syntax
CREATE TABLE [IF NOT EXISTS] <table_name>
AS INFER PIPELINE AS LOAD DATA
{ LINK <link_name> "<source_db>.<source_collection>" |
  MONGODB "<source_db>.<source_collection>" CONFIG <config_json> CREDENTIALS <credentials_json> }
FORMAT AVRO;

CREATE TABLES [IF NOT EXISTS]
AS INFER PIPELINE AS LOAD DATA
{ LINK <link_name> "*" |
  MONGODB '*' CONFIG <config_json> CREDENTIALS <credentials_json> }
FORMAT AVRO;
CREATE TABLE ... AS INFER PIPELINE Behavior
The CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE statement:
- Connects to the MongoDB® servers using the specified LINK <link_name> or MONGODB <collection> CONFIG <conf_json> CREDENTIALS <cred_json> clause.
- Discovers the available databases and collections filtered by collection.include.list.
- Infers the schema of the collection and then creates a table (named <table_name>) in SingleStore using the inferred schema. You can also specify a table name that differs from the name of the source MongoDB® collection. If the specified table already exists, a new table is not created and the existing table is used instead.
- Creates a pipeline (named <source_db_name>.<table_name>) and stored procedure (named <source_db_name>.<table_name>) that maps the AVRO data structure to the SingleStore data structure. The [IF NOT EXISTS] clause is ignored for pipelines and stored procedures. If a pipeline or stored procedure with the same name already exists, the CREATE TABLE ... AS INFER PIPELINE statement returns an error.
CREATE TABLES AS INFER PIPELINE Behavior
The CREATE TABLES [IF NOT EXISTS] AS INFER PIPELINE statement creates a table for each collection in the source database using the same set of operations as the CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE statement (specified above).
Arguments
- <table_name>: Name of the table to create in the SingleStore database. You can also specify a table name that differs from the name of the source MongoDB® collection.
- <link_name>: Name of the link to the MongoDB® endpoint. Refer to CREATE LINK for more information.
- <collection>: Name of the source MongoDB® collection.
- <config_json>: Configuration parameters, including the source MongoDB® configuration, in JSON format. Refer to Parameters for supported parameters.
- <credentials_json>: Credentials to use to access the MongoDB® database, in JSON format. For example: CREDENTIALS '{"mongodb.password": "<password>", "mongodb.user": "<user>"}'
  - mongodb.user: The name of the database user to use when connecting to MongoDB® servers.
  - mongodb.password: The password to use when connecting to MongoDB® servers.
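As an illustration of the MONGODB form of the syntax above, the following sketch creates a single table without a link. The namespace sales.orders and the table name orders are hypothetical, and the CONFIG keys are the ones used in the CREATE LINK example on this page; adjust them for your deployment.

```sql
-- Sketch only: "sales.orders" and the table name "orders" are hypothetical.
-- Replace the <...> placeholders with values for your MongoDB deployment.
CREATE TABLE IF NOT EXISTS orders
AS INFER PIPELINE AS LOAD DATA
MONGODB "sales.orders"
CONFIG '{"mongodb.hosts":"<Hostname>","mongodb.ssl.enabled":"true","mongodb.authsource":"admin"}'
CREDENTIALS '{"mongodb.user":"<username>","mongodb.password":"<password>"}'
FORMAT AVRO;
```

Going by the behavior described above, this statement would create a table named orders along with a pipeline and a stored procedure named sales.orders.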
Parameters
The CREATE {TABLE | TABLES}, CREATE LINK, and CREATE AGGREGATOR PIPELINE statements support the following parameters in the CONFIG clause:
- mongodb.hosts: A comma-separated list of MongoDB® servers (nodes) in the replica set, in 'hostname:[port]' format.
  - If mongodb.members.auto.discover is set to FALSE, you must prefix the 'hostname:[port]' with the name of the replica set in mongodb.hosts, e.g., rset0/svchost-xxx:27017. The first node specified in mongodb.hosts is always selected as the primary node.
  - If mongodb.members.auto.discover is set to TRUE, you must specify both the primary and secondary nodes in the replica set in mongodb.hosts.
- mongodb.members.auto.discover: Specifies whether the MongoDB® servers defined in mongodb.hosts should be used to discover all the members of the replica set. If disabled, the servers are used as is.
- mongodb.ssl.enabled: Enables the connector to use SSL when connecting to MongoDB® servers.
- mongodb.authsource: Specifies the database containing MongoDB® credentials to use as an authentication source. This parameter is only required when the MongoDB® instance is configured to use authentication with an authentication database other than admin.
- collection.include.list: A comma-separated list of regular expressions that match fully-qualified namespaces (in databaseName.collectionName format) for MongoDB® collections to monitor. By default, all the collections are monitored, except for those in the local and admin databases. When this option is specified, collections excluded from the list are not monitored. The collection.include.list and collection.exclude.list parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLE ... AS INFER PIPELINE statement. This parameter is only supported in CREATE TABLE ... AS INFER PIPELINE statements.
- collection.exclude.list: A comma-separated list of regular expressions that match fully-qualified namespaces (in databaseName.collectionName format) for MongoDB® collections to exclude from the monitoring list. By default, this list is empty. The collection.include.list and collection.exclude.list parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLE ... AS INFER PIPELINE statement. This parameter is only supported in CREATE TABLE ... AS INFER PIPELINE statements.
- database.include.list (Optional): A comma-separated list of regular expressions that match the names of databases to monitor. By default, all the databases are monitored. When this option is specified, databases excluded from the list are not monitored. The database.include.list and database.exclude.list parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLE ... AS INFER PIPELINE statement. If this option is used with the collection.include.list or collection.exclude.list option, it returns the intersection of the matches. This parameter is only supported in CREATE TABLE ... AS INFER PIPELINE statements.
- database.exclude.list (Optional): A comma-separated list of regular expressions that match the names of databases to exclude from monitoring. By default, this list is empty. The database.include.list and database.exclude.list parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLE ... AS INFER PIPELINE statement. If this option is used with the collection.include.list or collection.exclude.list option, it returns the intersection of the matches. This parameter is only supported in CREATE TABLE ... AS INFER PIPELINE statements.
- signal.data.collection (Optional): A collection in the remote source that is used by SingleStore to generate special markings for snapshotting and synchronization. By default, this parameter is set to singlestore.signals_xxxxxx, where xxxxxx is an automatically generated character sequence. The default signal collection is in the database named singlestore. The MongoDB® user must have write permissions to this collection. Once the pipelines are started, any change to the value of this parameter is ignored, and the pipelines use the latest value specified before the pipelines started.
- max.queue.size: Specifies the size of the queue inside the extractor process for records that are ready for ingestion. The default queue size is 1024. This variable also specifies the number of rows for each partition. Increasing the queue size results in an increase in the memory consumption by the replication process and you may need to increase the pipelines_cdc_java_heap_size.
- max.batch.size: Specifies the maximum number of rows of data fetched from the remote source in a single iteration (batch). The default batch size is 512. max.batch.size must be lower than max.queue.size.
- poll.interval.ms: Specifies the interval for polling of remote sources if there were no new records in the previous iteration in the replication process. The default interval is 500 milliseconds.
- snapshot.mode: Specifies the snapshot mode for the pipeline. It can have one of the following values:
  - "initial" (Default): Perform a full snapshot first and replay CDC events created during the snapshot. Then, continue ingestion using CDC.
  - "incremental": Start the snapshot operation and CDC simultaneously.
  - "never": Skip the snapshot, and ingest changes using CDC.
  Refer to CDC Snapshot Strategies for more information.
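To show how several of these parameters combine in one CONFIG clause, here is a minimal sketch. The namespaces and numeric values are illustrative only. Writing the numeric parameters as JSON strings is an assumption, modeled on how "mongodb.ssl.enabled" is written in the example on this page. The values respect the rule that max.batch.size must be lower than max.queue.size.

```sql
-- Sketch only: namespaces and tuning values are hypothetical.
-- Assumption: numeric parameters are passed as JSON strings,
-- like "mongodb.ssl.enabled" in the CREATE LINK example.
CREATE TABLES IF NOT EXISTS
AS INFER PIPELINE AS LOAD DATA
MONGODB '*'
CONFIG '{"mongodb.hosts":"<Hostname>",
         "collection.include.list":"sales.orders,sales.customers",
         "max.queue.size":"2048",
         "max.batch.size":"1024",
         "poll.interval.ms":"1000",
         "snapshot.mode":"initial"}'
CREDENTIALS '{"mongodb.user":"<username>","mongodb.password":"<password>"}'
FORMAT AVRO;
```

Because collection.include.list entries are regular expressions, the unescaped dot in sales.orders also matches other characters; escape it if you need an exact match.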
Replication Strategies
Use one of the following methods to create the required components for data ingestion.
Replicate MongoDB® Collections As Is
To replicate or migrate MongoDB® collections as is, use the CREATE {TABLES | TABLE} AS INFER PIPELINE
SQL statement.
Apply Transformations or Ingest a Subset of Columns
To apply transformations or ingest only a subset of collections, manually create the required tables, stored procedure, and pipelines:
- Run the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement to infer the schema of the MongoDB® collection(s) and automatically generate templates for the relevant table(s), stored procedure(s), and aggregator pipeline(s).
- Use the automatically-generated templates as a base to create new table(s), stored procedure(s), and pipeline(s) for custom transformations. To inspect the generated table(s), stored procedure(s), and pipeline(s), use the SHOW CREATE TABLE, SHOW CREATE PROCEDURE, and SHOW CREATE PIPELINE commands, respectively. After running the SHOW commands, you can drop the templates and then recreate the same components with custom transformations. Using the automatically-generated templates:
  - Create table(s) in SingleStore with a structure that can store the ingested MongoDB® collection. Refer to CREATE TABLE for more information.
  - Create stored procedure(s) to map the MongoDB® collection to the SingleStore table and implement any other required transformations. Refer to CREATE PROCEDURE for information on creating stored procedures.
  - Create pipeline(s) to ingest the MongoDB® collection(s) using the CREATE AGGREGATOR PIPELINE SQL statement. Refer to Parameters for a list of supported parameters. Refer to CREATE PIPELINE for the complete syntax and related information. Note: The CDC feature only supports AGGREGATOR pipelines.
Refer to Syntax for information on the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement.
CDC Snapshot Strategies
SingleStore supports the following strategies for creating snapshots:
- Perform a full snapshot before CDC: The pipeline captures the position in the oplog and then performs a full snapshot of the data. Once the snapshot is complete, the pipeline continues ingestion using CDC. This strategy is enabled by default. If the pipeline is restarted while the snapshot is in progress, the snapshot is restarted from the beginning. To use this strategy, set "snapshot.mode":"initial" in the CONFIG JSON.
  Requirement: The oplog retention period must be long enough to maintain the records while the snapshot is in progress. Otherwise, the pipeline will fail and the process will have to be started over.
- CDC only: The pipeline will not ingest existing data, and only the changes are captured using CDC. To use this strategy, set "snapshot.mode":"never" in the CONFIG JSON.
- Perform a snapshot in parallel to CDC: The pipeline captures the position in the oplog and starts capturing the changes using CDC. In parallel, the pipeline performs incremental snapshots of the existing data and merges them with the CDC records. Although this strategy is slower than performing a full snapshot and then ingesting changes using CDC, it is more resilient to pipeline restarts. To use this strategy, set "snapshot.mode":"incremental" in the CONFIG JSON.
  Requirement: The oplog retention period must be long enough to compensate for unexpected pipeline downtime.
- Manually perform the snapshot and capture changes using the CDC pipeline:
  - Create a pipeline and then wait for at least one batch of ingestion to capture the oplog position.
  - Stop the pipeline.
  - Snapshot the data using any of the suitable methods, for example, mongodump.
  - Restore the snapshot in SingleStore using any of the supported tools, for example, mongorestore.
  - Start the CDC pipeline.
  To use this strategy, set "snapshot.mode":"never" in the CONFIG JSON. This strategy provides faster data ingestion when the initial historical data is very large.
  Requirement: The oplog retention period must be long enough to maintain the records while the snapshot is in progress. Otherwise, the pipeline will fail and the process will have to be started over.
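The strategies above differ only in the value of snapshot.mode in the CONFIG JSON. As a minimal sketch, the following statement selects the parallel (incremental) strategy; the namespace sales.orders and the table name orders are hypothetical. Swap in "initial" or "never" to select one of the other strategies.

```sql
-- Sketch only: "sales.orders" and "orders" are hypothetical names.
-- "snapshot.mode" may be "initial" (default), "incremental", or "never".
CREATE TABLE IF NOT EXISTS orders
AS INFER PIPELINE AS LOAD DATA
MONGODB "sales.orders"
CONFIG '{"mongodb.hosts":"<Hostname>","snapshot.mode":"incremental"}'
CREDENTIALS '{"mongodb.user":"<username>","mongodb.password":"<password>"}'
FORMAT AVRO;

-- Begin the snapshot and CDC ingestion.
START ALL PIPELINES;
```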
Configure Ingestion Speed Limit using Engine Variables
Use the following engine variables to configure ingestion speed:
Variable Name | Description | Default Value |
---|---|---|
 | Specifies a forced delay in row emission while migrating/replicating your tables (or collections) to your SingleStore databases. | |
pipelines_cdc_java_heap_size | Specifies the JVM heap size limit (in MBs) for CDC-in pipelines. | |
In-Depth Variable Definitions
Use the pipelines_
engine variable to limit the impact of CDC pipelines on the master aggregator node.1000000
.0
.
Warning
Disabling the emit delay may result in excessive CPU usage on master aggregator nodes.
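Use the max.queue.size parameter in the CONFIG JSON to control the ingestion speed. SingleStore recommends setting max.batch.size to half of max.queue.size. When you increase the queue size, adjust the pipelines_cdc_java_heap_size engine variable accordingly. Query INFORMATION_SCHEMA for information on pipeline batch performance.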
Limitations
- SingleStore does not support data replication using CDC pipelines from MongoDB® Atlas standalone or serverless instances. Replication is only supported for replica set deployments.
- Because MongoDB® time series collections do not support change streams, they cannot be replicated in SingleStore using CDC. Refer to Time Series Collection Limitations for more information.
Troubleshooting
- If the CREATE {TABLES | TABLE} AS INFER PIPELINE SQL statement returns an error, run the SHOW WARNINGS command to view the reason behind the failure.
- To view the status of all the pipelines, query the information_schema.pipelines_cursors table. Run the following SQL statement to display the status of the replication task:
  SELECT SOURCE_PARTITION_ID,
         EARLIEST_OFFSET,
         LATEST_OFFSET,
         LATEST_EXPECTED_OFFSET-LATEST_OFFSET as DIFF,
         UPDATED_UNIX_TIMESTAMP
  FROM information_schema.pipelines_cursors;
- To view pipeline errors, run the following SQL statement:
  SELECT * FROM information_schema.pipelines_errors;
- If a pipeline fails with an out of memory error in Java, either increase the heap size using the pipelines_cdc_java_heap_size engine variable or reduce the max.queue.size and max.batch.size parameters in the CONFIG clause. The heap size is limited by the memory available on the master aggregator (MA). SingleStore recommends setting the queue size to double the batch size.
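For the out of memory case, a minimal sketch of inspecting and raising the heap size limit follows. It assumes that pipelines_cdc_java_heap_size can be changed at runtime with SET GLOBAL on the master aggregator; the value 256 is illustrative only and should stay within the memory available on that node.

```sql
-- Inspect the current JVM heap size limit (in MBs) for CDC-in pipelines.
SELECT @@pipelines_cdc_java_heap_size;

-- Assumption: the variable is settable at runtime; 256 is an illustrative value.
SET GLOBAL pipelines_cdc_java_heap_size = 256;
```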
Example
Example 1 - Create Tables with Different Names from the Source Collection
To create tables in SingleStore with names that differ from the name of the source MongoDB® collection, use the following syntax:
CREATE TABLE <new_table_name> AS INFER PIPELINE AS LOAD DATA
LINK <link_name> '<source_db.source_collection>' FORMAT AVRO;
You can also use this command to import collections if a table with the same name already exists in SingleStore.
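As a concrete sketch of Example 1, the following statements replicate a collection into a table with a different name and then read it back. The link name mongo_link, the namespace sales.orders, and the table name orders_archive are all hypothetical.

```sql
-- Sketch only: "mongo_link", "sales.orders", and "orders_archive" are hypothetical.
CREATE TABLE IF NOT EXISTS orders_archive
AS INFER PIPELINE AS LOAD DATA
LINK mongo_link 'sales.orders'
FORMAT AVRO;

-- Start replication for the generated pipeline(s).
START ALL PIPELINES;

-- View the ingested documents as JSON.
SELECT _id :> JSON, _more :> JSON FROM orders_archive;
```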
Last modified: September 10, 2024