Replicate MongoDB® Collections to SingleStore

SingleStore allows you to replicate your existing MongoDB® collections to your SingleStore Helios database using Change Data Capture (CDC). The CDC pipeline ingests data in the BSON format. You can perform the replication using any of the following methods:

  • MongoDB® wrapper commands through the SingleStore Kai endpoint

  • SQL commands

Note: Because MongoDB® time series collections do not support change streams, they cannot be replicated in SingleStore using CDC. Refer to Time Series Collection Limitations for more information.

Source Database Requirements

The replication feature uses MongoDB® change streams, and it works only with replica sets. A MongoDB® user must have the following roles (privileges) to replicate the collections in SingleStore:

  • Read the admin database for the operation log (oplog)

  • Read the config database in the configuration server

  • listDatabases privilege

  • Cluster-wide find and changeStream privileges

  • Write permission to the singlestore database

Allow Access to the MongoDB® Instance

To allow SingleStore to connect to your MongoDB® instance (MongoDB® Atlas or self-managed deployment), allow incoming traffic from the outbound IP addresses of your SingleStore Helios workspace group.

Note

Allow all the outbound IP addresses.

  1. Copy the outbound IP addresses of your SingleStore Helios workspace group.

    1. On the Cloud Portal, select <your_workspace_group> > Firewall > Outbound.

    2. Copy all the IPs listed under IPs for Outbound Connections.

  2. Allow incoming traffic to the MongoDB® instance from the SingleStore Helios outbound IP addresses. Follow the network access instructions for your deployment (MongoDB® Atlas or self-managed).

Replicate MongoDB® Collections using MongoDB® Wrapper Commands

Note

SingleStore recommends using MongoDB® wrapper commands to replicate your MongoDB® collections into a SingleStore Kai-enabled workspace.

To replicate your MongoDB® collections using MongoDB® wrapper commands, perform the following tasks and run the respective commands on a SingleStore Kai-enabled workspace:

  1. Create a link: Create a link using the URI of the (remote) MongoDB® instance.

    db.runCommand({createLink:"<link_name>",uri:"mongodb://<username>:<password>@<hostname>:27017"})

    For MongoDB® Atlas instances, use the mongodb+srv:// scheme. Refer to createLink for more information.

  2. Create a collection: Create a collection, and specify the link created in the previous step using the from option as follows:

    db.createCollection("<collection_name>", { from:{link:"<link_name>"} })

    Refer to createCollection for more information.

Here's an example:

db.runCommand({createLink:"pLink",uri:"mongodb://admin:pa55w0rd@svchost:27017"})
db.createCollection("colTest", {from:{link:"pLink", database:"dbTest", collection:"example"}})

Replicate MongoDB® Collections using SQL

Note

SingleStore does not recommend using this feature if you are using a SingleStore Kai-enabled workspace as it may create table structures that are not optimal for the API.

To replicate your existing MongoDB® collections to your SingleStore Helios database using SQL syntax for pipelines, create a link to the MongoDB® endpoint using the CREATE LINK statement, and use one of the following methods:

  • To replicate/migrate collections (as is), use the CREATE {TABLES | TABLE} AS INFER PIPELINE ... SQL statement. Refer to Remarks for more information.

  • To apply transformations and manually ingest the collections:

    1. Create a table with a structure that can store the ingested collections.

    2. Create a stored procedure to map the MongoDB® collection to the SingleStore table and implement other transformations required.

    3. Create a pipeline to ingest the MongoDB® collections using the CREATE AGGREGATOR PIPELINE SQL statement. Refer to Parameters for a list of supported parameters.

      Note

      The CDC feature only supports AGGREGATOR pipelines.

  • (Optional) Run the CREATE {TABLES | TABLE} AS INFER PIPELINE ... SQL statement to infer the schema of the collection(s) and automatically generate the relevant table(s), stored procedure, and aggregator pipeline. Then use the generated stored procedure and pipeline as a base for custom transformations. To inspect the stored procedure and pipeline, use the SHOW CREATE PROCEDURE and SHOW CREATE PIPELINE commands, respectively.

Once all the components are configured, start the pipelines.
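The inference-based workflow described above can be sketched as follows. This is a minimal illustration rather than a definitive procedure: mongo_link is a hypothetical link created beforehand with CREATE LINK, and dbTest.example is a hypothetical source namespace. The backtick-quoted pipeline and procedure names assume the <source_db_name>.<table_name> naming described under Remarks.

```sql
-- Assumes a link named mongo_link (hypothetical) already exists.
-- Infer one table, stored procedure, and aggregator pipeline per collection.
CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK mongo_link '*' FORMAT AVRO;

-- List the pipelines that were generated.
SHOW PIPELINES;

-- Inspect the generated definitions before adapting them.
-- `dbTest.example` is a hypothetical name in <source_db_name>.<table_name> form.
SHOW CREATE PIPELINE `dbTest.example`;
SHOW CREATE PROCEDURE `dbTest.example`;

-- Start replication once the definitions look right.
START ALL PIPELINES;
```

Reviewing the generated definitions first makes it easier to decide whether the inferred mapping is sufficient or whether a custom stored procedure is needed.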

To ingest data in the JSON format instead of BSON, you need to manually create the required table structures, pipelines, and stored procedures for mapping the BSON data type to JSON.

Syntax

CREATE LINK [db_name.]connection_name AS MONGODB
CREDENTIALS 'credentials_json' [<link_configuration>]

Refer to CREATE LINK for the complete syntax and related information.

CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE AS LOAD DATA
[ LINK <link_name> FORMAT AVRO ... ]
[ MONGODB <collection> CONFIG <conf_json> CREDENTIALS <cred_json> FORMAT AVRO ... ]
CREATE TABLES [IF NOT EXISTS] AS INFER PIPELINE AS LOAD DATA
[ LINK <link_name> FORMAT AVRO ... ]
[ MONGODB '*' CONFIG <conf_json> CREDENTIALS <cred_json> FORMAT AVRO ... ]

Refer to CREATE TABLE for related information.

CREATE [OR REPLACE] AGGREGATOR PIPELINE [IF NOT EXISTS] <pipeline_name> AS
LOAD DATA MONGODB <link_configuration> ...
<avro_format_options>
<link_configuration>:
LINK [<database_name>.]<connection_name> '<path>'
<avro_format_options>:
FORMAT AVRO SCHEMA REGISTRY {uri}
( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)
[SCHEMA '<avro_schema>']
[KAFKA KEY ( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)]

Refer to CREATE PIPELINE for the complete syntax and related information.

Refer to CREATE PROCEDURE for information on creating stored procedures.

Parameters

The CREATE AGGREGATOR PIPELINE ... AS LOAD DATA MONGODB statement supports the following parameters in the CONFIG/CREDENTIALS clause:

  • mongodb.hosts: A comma-separated list of MongoDB® servers (nodes) in the replica set, in 'hostname:[port]' format.

    The mongodb.connection.string and mongodb.hosts parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLE ... AS INFER PIPELINE statement.

    • If mongodb.members.auto.discover is set to FALSE, you must prefix the 'hostname:[port]' with the name of the replica set in mongodb.hosts, e.g., rset0/svchost-xxx:27017. The first node specified in mongodb.hosts is always selected as the primary node.

    • If mongodb.members.auto.discover is set to TRUE, you must specify both the primary and secondary nodes in the replica set in mongodb.hosts.

  • mongodb.connection.string: Specifies the URI of the remote MongoDB® instance. This parameter supports both the standard and SRV connection string formats. The mongodb.connection.string and mongodb.hosts parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLE ... AS INFER PIPELINE statement.

  • mongodb.members.auto.discover: Specifies whether the MongoDB® servers defined in mongodb.hosts should be used to discover all the members of the replica set. If disabled, the servers are used as is.

  • mongodb.user: The name of the database user to use when connecting to MongoDB® servers.

  • mongodb.password: The password to use when connecting to MongoDB® servers.

  • mongodb.ssl.enabled: Enables the connector to use SSL when connecting to MongoDB® servers.

  • mongodb.authsource: Specifies the database containing MongoDB® credentials to use as an authentication source. This parameter is only required when the MongoDB® instance is configured to use authentication with an authentication database other than admin.

  • collection.include.list: A comma-separated list of regular expressions that match fully-qualified namespaces (in databaseName.collectionName format) for MongoDB® collections to monitor. By default, all the collections are monitored, except for those in the local and admin databases. When this option is specified, collections excluded from the list are not monitored. The collection.include.list and collection.exclude.list parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLE ... AS INFER PIPELINE statement. This parameter is only supported in CREATE TABLE ... AS INFER PIPELINE statements.

  • collection.exclude.list: A comma-separated list of regular expressions that match fully-qualified namespaces (in databaseName.collectionName format) for MongoDB® collections to exclude from the monitoring list. By default, this list is empty. The collection.include.list and collection.exclude.list parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLE ... AS INFER PIPELINE statement. This parameter is only supported in CREATE TABLE ... AS INFER PIPELINE statements.

  • database.include.list (Optional): A comma-separated list of regular expressions that match the names of databases to monitor. By default, all the databases are monitored. When this option is specified, databases excluded from the list are not monitored. The database.include.list and database.exclude.list parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLE ... AS INFER PIPELINE statement. If this option is used with the collection.include.list or collection.exclude.list option, it returns the intersection of the matches. This parameter is only supported in CREATE TABLE ... AS INFER PIPELINE statements.

  • database.exclude.list (Optional): A comma-separated list of regular expressions that match the names of databases to exclude from monitoring. By default, this list is empty. The database.include.list and database.exclude.list parameters are mutually exclusive, i.e., they cannot be used in the same CREATE TABLE ... AS INFER PIPELINE statement. If this option is used with the collection.include.list or collection.exclude.list option, it returns the intersection of the matches. This parameter is only supported in CREATE TABLE ... AS INFER PIPELINE statements.

  • signal.data.collection (Optional): A collection in the remote source that is used by SingleStore to generate special markings for snapshotting and synchronization. By default, this parameter is set to singlestore.signals. The default signal collection is in the database named singlestore. The MongoDB® user must have write permissions to this collection.

  • max.queue.size: Specifies the size of the queue inside the extractor process for records that are ready for ingestion. The default queue size is 1024. This parameter also specifies the number of rows for each partition. Increasing the queue size increases the memory consumed by the replication process, so you may also need to increase the pipelines_cdc_java_heap_size engine variable.

  • max.batch.size: Specifies the maximum number of rows of data fetched from the remote source in a single iteration (batch). The default batch size is 512. max.batch.size must be lower than max.queue.size.

  • poll.interval.ms: Specifies how long the replication process waits before polling the remote source again when the previous iteration returned no new records. The default interval is 500 milliseconds.

  • snapshot.mode: Specifies the snapshot mode for the pipeline. It can have one of the following values:

    • "initial" (Default): Perform a full snapshot first and replay CDC events created during the snapshot. Then, continue ingestion using CDC.

    • "incremental": Start the snapshot operation and CDC simultaneously.

    • "never": Skip the snapshot, and ingest changes using CDC.

    Refer to CDC Snapshot Strategies for more information.
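As an illustration of how several of these parameters might be combined, the following sketch creates a link that uses mongodb.connection.string instead of mongodb.hosts (the two are mutually exclusive). The link name, URI, and namespaces are hypothetical placeholders. Placing the connection string in the CONFIG clause and quoting numeric values as strings (mirroring how Example 1 quotes "true") are assumptions, not confirmed requirements.

```sql
-- Sketch only: mongo_srv_link, the URI, and the namespaces are placeholders.
CREATE LINK mongo_srv_link AS MONGODB
CONFIG '{"mongodb.connection.string": "mongodb+srv://<hostname>",
"collection.include.list": "dbTest.example,dbTest.orders",
"max.queue.size": "2048",
"max.batch.size": "1024"}'
CREDENTIALS '{"mongodb.user": "<username>",
"mongodb.password": "<password>"}';
```

Here max.batch.size is kept lower than max.queue.size, as required, and collection.include.list is used without collection.exclude.list because those two parameters cannot appear together.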

Remarks

  • The CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE statement performs the following operations:

    1. Connects to the MongoDB® servers using the specified LINK <link_name> or MONGODB <collection> CONFIG <conf_json> CREDENTIALS <cred_json> clause.

    2. Discovers the available databases and collections filtered by collection.include.list.

    3. Infers the schema of the collection and then creates a table (named <table_name>) in SingleStore using the inferred schema. You can also specify a table name that differs from the name of the source MongoDB® collection. If the specified table already exists, a new table is not created and the existing table is used instead.

    4. Creates a pipeline (named <source_db_name>.<table_name>) and stored procedure (named <source_db_name>.<table_name>) that maps the AVRO data structure to the SingleStore data structure. The [IF NOT EXISTS] clause is ignored for pipelines and stored procedures. If a pipeline or stored procedure with the same name already exists, the CREATE TABLE ... AS INFER PIPELINE statement returns an error.

  • The CREATE TABLES [IF NOT EXISTS] AS INFER PIPELINE statement creates a table for each collection in the source database using the same set of operations as the CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE statement (specified in the previous list item).

  • The MONGODB source only supports the AVRO data format.

  • Use the SYNC PIPELINE <pipeline_name> statement to replicate (sync with) the data source.

CDC Snapshot Strategies

SingleStore supports the following strategies for creating snapshots:

  • Perform a full snapshot before CDC: The pipeline captures the position in the oplog and then performs a full snapshot of the data. Once the snapshot is complete, the pipeline continues ingestion using CDC. This strategy is enabled by default.

    To use this strategy, set "snapshot.mode":"initial" in the CONFIG JSON.

    Requirement: The oplog retention period must be long enough to maintain the records while the snapshot is in progress. Otherwise, the pipeline will fail and the process will have to be started over.

  • CDC only: The pipeline will not ingest existing data, and only the changes are captured using CDC.

    To use this strategy, set "snapshot.mode":"never" in the CONFIG JSON.

  • Perform a snapshot in parallel to CDC: The pipeline captures the position in the oplog and starts capturing the changes using CDC. In parallel, the pipeline performs incremental snapshots of the existing data and merges it with the CDC records.

    To use this strategy, set "snapshot.mode":"incremental" in the CONFIG JSON.

    Requirement: The oplog retention period must be long enough to compensate for unexpected pipeline downtime.

  • Manually perform the snapshot and capture changes using the CDC pipeline:

    1. Create a pipeline and then wait for at least one batch of ingestion to capture the oplog position.

    2. Stop the pipeline.

    3. Snapshot the data using any suitable method, for example, mongodump and mongorestore.

    4. Start the pipeline.

    To use this strategy, set "snapshot.mode":"never" in the CONFIG JSON.

    Requirement: The oplog retention period must be long enough to maintain the records while the snapshot is in progress. Otherwise, the pipeline will fail and the process will have to be started over.
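The manual snapshot strategy above could look roughly like the following. This is a sketch under stated assumptions: mongo_manual, dbTest.example, and the table name example are hypothetical, and the pipeline name assumes the <source_db_name>.<table_name> naming described under Remarks.

```sql
-- 1. Create a link with the snapshot disabled (placeholders as in Example 1).
CREATE LINK mongo_manual AS MONGODB
CONFIG '{"mongodb.hosts": "<Hostname>",
"collection.include.list": "dbTest.example",
"snapshot.mode": "never"}'
CREDENTIALS '{"mongodb.user": "<username>",
"mongodb.password": "<password>"}';

-- 2. Generate the table, stored procedure, and pipeline, then start the pipeline.
CREATE TABLE example AS INFER PIPELINE AS LOAD DATA
LINK mongo_manual 'dbTest.example' FORMAT AVRO;
START PIPELINE `dbTest.example`;

-- 3. Check that at least one batch has recorded an oplog position.
SELECT SOURCE_PARTITION_ID, LATEST_OFFSET, UPDATED_UNIX_TIMESTAMP
FROM information_schema.pipelines_cursors;

-- 4. Stop the pipeline and load the snapshot with an external tool
--    (for example, mongodump and mongorestore).
STOP PIPELINE `dbTest.example`;

-- 5. Resume CDC from the captured position.
START PIPELINE `dbTest.example`;
```

The other strategies only differ in the "snapshot.mode" value set in the CONFIG JSON ("initial" or "incremental") and do not need the stop and restart steps.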

Configure Ingestion Speed Limit

Use the following engine variables to configure ingestion speed:

  • pipelines_cdc_row_emit_delay_us: Specifies a forced delay in row emission while migrating/replicating your tables (or collections) to your SingleStore Helios databases. It can have a maximum value of 1000000. Default value: 1.

  • pipelines_cdc_java_heap_size: Specifies the JVM heap size limit (in MBs) for CDC-in pipelines. Default value: 128.

  • pipelines_cdc_max_extractors: Specifies the maximum number of CDC-in extractor instances that can run concurrently. Default value: 16.

  • pipelines_cdc_min_extractor_lifetime_s: Specifies the minimum duration (in seconds) that the extractor allocates to a single pipeline for ingesting data and listening to CDC events. Default value: 60.

In-Depth Variable Definitions

Use the pipelines_cdc_row_emit_delay_us engine variable to limit the impact of CDC pipelines on the Master Aggregator node. It specifies a forced delay in row emission during ingest. This variable can be set to a maximum value of 1000000.

Use the max.queue.size parameter in the CONFIG JSON to control the ingestion speed. SingleStore recommends setting max.batch.size to half of max.queue.size. Increasing the queue size requires a larger Java heap limit, so adjust the pipelines_cdc_java_heap_size engine variable accordingly. Query the INFORMATION_SCHEMA.PIPELINES_BATCHES_SUMMARY table for information on pipeline batch performance.

Use the pipelines_cdc_max_extractors engine variable to limit the number of CDC-in extractor instances that can run concurrently. The extractors provide data to the CDC-in pipelines and consume a persistent amount of resources in the process. Therefore, the Master Aggregator node can only run a limited number of extractor instances. If the number of CDC-in pipelines is greater than pipelines_cdc_max_extractors, some pipelines will have to wait in the queue until an extractor can be acquired to fetch data. This variable can be set to a maximum value of 1024.

Use the pipelines_cdc_min_extractor_lifetime_s variable to specify the minimum duration (in seconds) that the extractor allocates to a single pipeline for ingesting data and listening to CDC events. This variable can be set to a maximum value of 3600.
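As a rough illustration of the tuning described above, the following statements inspect and adjust the CDC engine variables. The values are arbitrary examples chosen to stay within the stated maximums. Using SET GLOBAL here is an assumption about how these variables are changed in your deployment; on a managed service, the required privileges or the supported method may differ.

```sql
-- Show the current values of the CDC-related engine variables.
SHOW VARIABLES LIKE 'pipelines_cdc%';

-- Example values only; adjust to your workload.
SET GLOBAL pipelines_cdc_row_emit_delay_us = 100;        -- maximum 1000000
SET GLOBAL pipelines_cdc_java_heap_size = 256;           -- in MB, larger heap for a larger queue
SET GLOBAL pipelines_cdc_max_extractors = 32;            -- maximum 1024
SET GLOBAL pipelines_cdc_min_extractor_lifetime_s = 120; -- maximum 3600

-- Review batch performance after the change.
SELECT * FROM information_schema.PIPELINES_BATCHES_SUMMARY;
```

Changing one variable at a time and comparing the batch summary before and after makes it easier to attribute any difference in ingestion speed.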

Examples

Example 1 - Replicate Collections using SQL

The following example replicates data from an existing MongoDB® Atlas cluster into SingleStore Helios.

  1. Create a link to the MongoDB® endpoint, for example, the primary node of a MongoDB® Atlas cluster.

    CREATE LINK <linkname> AS MONGODB
    CONFIG '{"mongodb.hosts":"<Hostname>",
    "collection.include.list": "<Collection list>",
    "mongodb.ssl.enabled":"true",
    "mongodb.authsource":"admin"}'
    CREDENTIALS '{"mongodb.user":"<username>",
    "mongodb.password":"<password>"}';

  2. Create tables in SingleStore Helios.

    -- Infer tables from source
    CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK <linkname> '*' FORMAT AVRO;

  3. Once the link and the tables are created, run the following command to start all the pipelines and begin the data replication process:

    -- Start pipelines
    START ALL PIPELINES;

To view the status of all the pipelines, query the information_schema.pipelines_cursors table. Run the following SQL statement to display the status of the replication task:

SELECT SOURCE_PARTITION_ID,
EARLIEST_OFFSET,
LATEST_OFFSET,
LATEST_EXPECTED_OFFSET-LATEST_OFFSET as DIFF,
UPDATED_UNIX_TIMESTAMP
FROM information_schema.pipelines_cursors;

To view the ingested BSON data, SingleStore recommends the following:

  • Use the Kai Shell or other supported MongoDB tools, such as MongoDB Compass.

  • Cast the columns to JSON using the following SQL command:

    SELECT _id :> JSON , _more :> JSON FROM <table_name>;

Example 2 - Create Tables with Different Names from the Source Collection

To create tables in SingleStore with names that differ from the name of the source MongoDB® collection, use the following syntax:

CREATE TABLE <new_table_name> AS INFER PIPELINE AS LOAD DATA
LINK <link_name> '<source_db.source_collection>' FORMAT AVRO;

You can also use this command to import collections if a table with the same name already exists in SingleStore. Additionally, you can use this syntax to reimport a collection with a distinct table name.
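For instance, the syntax above might be used as follows. This is a sketch with hypothetical names: mongo_link is an existing link, dbTest.example is the source namespace, and example_copy is the new table name. The pipeline name assumes the <source_db_name>.<table_name> naming described under Remarks.

```sql
-- Replicate the dbTest.example collection into a table named example_copy.
CREATE TABLE example_copy AS INFER PIPELINE AS LOAD DATA
LINK mongo_link 'dbTest.example' FORMAT AVRO;

-- Start the generated pipeline (name assumed from the Remarks section).
START PIPELINE `dbTest.example_copy`;

-- Spot-check the ingested BSON data by casting it to JSON.
SELECT _id :> JSON, _more :> JSON FROM example_copy LIMIT 5;
```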

Last modified: May 21, 2024
