# Replicate Data from MongoDB®

Change Data Capture (CDC) pipelines enable you to ingest historical data and sync the continuous changes to data as they happen on the source MongoDB® database. SingleStore allows you to replicate your existing MongoDB® collections to your SingleStore database using Change Data Capture (CDC). The CDC pipeline ingests data in the BSON (A binary-encoded serialization of JSON-like documents. It is designed to be efficient in space but also rich in its ability to represent more data types than JSON.) format.

> **📝 Note**: You can only create a maximum of 16 CDC-in pipelines in total for MongoDB® and MySQL sources.

You can perform the replication using SQL statements.

## &#x20;Data Storage Model

The `_id` field of the document is added to the `_id` column, and the rest of the fields in the document are added to the `_more` column of the table in SingleStore.

| Column Name | Data Type          | Description                                                                                                                                                                                                                                                                                                                   |
| ----------- | ------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `_id`       | BSON NOT NULL      | The`_id`field required in allMongoDB®documents.                                                                                                                                                                                                                                                                               |
| `_more`     | BSON NOT NULL      | Contains all the fields in the document, except the`_id`field.                                                                                                                                                                                                                                                                |
| `$_id`      | PERSISTED LONGBLOB | Used to implementSingleStoreunique indexes onMongoDB®collections, similar to BSON unique indexes. This column contains the`_id`field normalized using the`BSON_NORMALIZE`or`BSON_NORMALIZE_NO_ARRAY`function, which transform the BSON data into a byte stream that preserves the BSON comparison when compared byte-by-byte. |

## View the BSON Data

To view the BSON data, SingleStore recommends the following:

* Cast the columns to JSON using the following SQL command:
  ```sql
  SELECT _id :> JSON , _more :> JSON FROM <table_name>;
  ```

## Prerequisites

## Configure `java` Binary Path

Install JRE version 25+, and specify the full path of the `java` binary using the `java_path` engine variable. You must set this engine variable on each node. Update the `java` binary path in the following command and then run it to configure `java_path`:

```Shell
sdb-admin update-config --all --set-global --key "java_path" --value "/path_to/bin/java"
```

Refer to [sdb-admin update-config](https://docs.singlestore.com/db/v9.1/reference/singlestore-tools-reference/sdb-admin-commands/update-config.md) for more information.

## Source Database Requirements

The replication feature uses MongoDB® [change streams](https://www.mongodb.com/docs/manual/changeStreams/), and it works only with replica sets. A MongoDB® user must have the following roles (privileges) to replicate the collections in SingleStore:

* Read the `admin` database for the operation log (oplog)
* Read the `config` database in the configuration server
* `listDatabases` privilege
* `clusterMonitor` role
* Cluster-wide `find` and `changeStream` privileges
* Write permission to the `singlestore` database. The `singlestore` database is automatically created during the replication process.

You can provide the privileges or assign roles to the MongoDB® user using the MongoDB® Atlas UI or MongoDB® commands. For example:

* Using the UI for MongoDB® Atlas:

  1. On the MongoDB® Cloud dashboard, select **Projects > \<your\_project>**.

  2. In the left navigation pane, under **Security**, select **Database Access**.

  3. On the **Database Users page**, select **Edit** for the MongoDB® user used to connect to the MongoDB® instance.

  4. On the **Edit User** dialog, under **Database User Privileges**, select **Specific Privileges > Add Specific Privilege**.

  5. Add the following privileges:

     * `readAnyDatabase`
     * `read` for the `config` database
     * `readWrite` for the `singlestore` database

     ![Provide the readWrite privilege for the singlestore database to the user.](https://images.contentstack.io/v3/assets/bltac01ee6daa3a1e14/blted17cafda8869051/6a3052798859ae5507205f55/mongo-singlestore-db-privilege-jfJqcv.png)

  6. Select **Update User**.
* Using MongoDB® commands for self-managed deployments: The following commands create a role and then assign the role to a user.
  ```MongoDB
  db.adminCommand({ createRole: 'cdcRole', 
    privileges: [
      {'resource': {'cluster': true}, 'actions': ['find', 'changeStream']}
    ], 
    roles: [
      {'role': 'read', 'db': 'admin'},
      {'role': 'read', 'db': 'local'},
      {'role': 'read', 'db': 'config'},
      {'role': 'readWrite', 'db': 'singlestore'}
  ]})

  db.adminCommand({ createUser: 'cdcUser', 
    pwd: 's2mongoCDC12', 
    roles: ['cdcRole']
  })

  ```
  Refer to [Database Commands](https://www.mongodb.com/docs/manual/reference/command/) for more information.

## Replicate MongoDB® Collections using SQL

You can replicate your existing MongoDB® collections to your SingleStore database using SQL commands via Change Data Capture (CDC) pipelines.

> **📝 Note**: You can only create a maximum of 16 CDC-in pipelines in total for MongoDB® and MySQL sources.

Perform the following tasks:

1. Ensure that the [Prerequisites](https://docs.singlestore.com/db/v9.1/load-data/data-sources/replicate-data-from-mongodb/#section-idm323445173775296.md) are met.

2. (Optional) Create a link to the MongoDB® instance. Refer to [CREATE LINK](https://docs.singlestore.com/db/v9.1/reference/sql-reference/security-management-commands/create-link.md) for more information. You can also specify the link configuration and credentials in the `CONFIG`/`CREDENTIALS` clause of the `CREATE {TABLES | TABLE} AS INFER PIPELINE` SQL statement instead of creating a link.

3. Create the required table(s), stored procedure(s), and pipeline(s) using the `CREATE {TABLES | TABLE} AS INFER PIPELINE` SQL statement. Refer to [Syntax](https://docs.singlestore.com/#section-idm4526499926180833676408836072.md) for more information. You can either [replicate the MongoDB® collections as is](https://docs.singlestore.com/#section-idm3210335078812990.md) or [apply custom transformations](https://docs.singlestore.com/#section-idm3225837697032472.md).
   > **📝 Note**: Before restarting the `INFER PIPELINE` operation, delete all the related artifacts.```sql
   > DROP TABLE <target_table_name>;
   > DROP PIPELINE <pipeline_name>;
   > DROP PROCEDURE <procedure_name>;
   > ```

4. Once all the components are configured, start the pipelines.

   * To start all the pipelines, run the `START ALL PIPELINES` SQL statement.
   * To start a specific pipeline, run the `START PIPELINE <pipeline_name>` SQL statement. By default, the pipeline is named `<source_db_name>.<table_name>`.

To ingest data in the JSON format instead of BSON, you need to manually create the required table structures, pipelines, and stored procedures for mapping the BSON data type to JSON.

For more information, refer to the relevant section on this page:

* [Replication Strategies](https://docs.singlestore.com/#section-idm323445026270996.md)
* [CDC Snapshot Strategies](https://docs.singlestore.com/#section-idm4607585187219234014392364609.md)
* [Configure Ingestion Speed Limit using Engine Variables](https://docs.singlestore.com/#section-idm465169428608163391441938857.md)

## Replicate MongoDB® Collections Example

The following example shows how to replicate MongoDB® collections without any custom transformations. This example uses the `LINK` clause to specify the MongoDB® Atlas endpoint connection configuration.

1. Create a link to the MongoDB® endpoint, for example, the primary node of a MongoDB® Atlas cluster.
   ```sql
   CREATE LINK <linkname> AS MONGODB
     CONFIG '{"mongodb.hosts":"<Hostname>",
       "collection.include.list": "<Collection list>",
       "mongodb.ssl.enabled":"true",
       "mongodb.authsource":"admin"}'
     CREDENTIALS '{"mongodb.user":"<username>",
       "mongodb.password":"<password>"}';
   ```

2. Create tables, pipelines, and stored procedures in SingleStore based on the inference from the source collections.
   ```sql
   CREATE TABLES AS INFER PIPELINE AS LOAD DATA 
     LINK <linkname> '*' FORMAT AVRO;
   ```

3. Once the link and the tables are created, run the following command to start all the pipelines and begin the data replication process:
   ```sql
   ## Start pipelines
   START ALL PIPELINES;
   ```

4. To view the ingested data, run the following SQL statement:
   ```sql
   SELECT _id :> JSON , _more :> JSON FROM <table_name>;
   ```

## Syntax

```sql
CREATE TABLE [IF NOT EXISTS] <table_name> 
  AS INFER PIPELINE 
  AS LOAD DATA <mongodb_configuration>
  FORMAT AVRO;

-- Use either the LINK or MONGODB clause, they are mutually exclusive -- 
<mongodb_configuration>:
  LINK <link_name> "<source_db>.<source_collection>" 
  | MONGODB "<source_db>.<source_collection>" CONFIG <config_json> CREDENTIALS <credentials_json> 

```

```sql
CREATE TABLES [IF NOT EXISTS] 
  AS INFER PIPELINE 
  AS LOAD DATA <mongodb_configuration>  
  FORMAT AVRO;

-- Use either the LINK or MONGODB clause, they are mutually exclusive --
<mongodb_configuration>:
  LINK <link_name> "*" 
  | MONGODB '*' CONFIG <config_json> CREDENTIALS <credentials_json> 

```

## `CREATE TABLE ... AS INFER PIPELINE` Behavior

The `CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE` statement,

1. Connects to the MongoDB® servers using the specified `LINK <link_name>` or `MONGODB <collection> CONFIG <conf_json> CREDENTIALS <cred_json>` clause.

2. Discovers the available databases and collections filtered by `collection.include.list`.

3. Infers the schema of the collection and then creates a table (named **\<table\_name>**) in SingleStore using the inferred schema. You can also specify a table name that differs from the name of the source MongoDB® collection. If the specified table already exists, a new table is not created and the existing table is used instead.

4. Creates a pipeline (named **\<source\_db\_name>.\<table\_name>**) and stored procedure (named **\<source\_db\_name>.\<table\_name>**) that maps the AVRO data structure to the SingleStore data structure. The `[IF NOT EXISTS]` clause is ignored for pipelines and stored procedures. If a pipeline or stored procedure with the same name already exists, the `CREATE TABLE ... AS INFER PIPELINE` statement returns an error.

## `CREATE TABLES AS INFER PIPELINE` Behavior

The `CREATE TABLES [IF NOT EXISTS] AS INFER PIPELINE` statement creates a table for each collection in the source database using the same set of operations as the `CREATE TABLE [IF NOT EXISTS] <table_name> AS INFER PIPELINE` statement (specified above).

## Arguments

* `<table_name>`: Name of the table to create in the SingleStore database. You can also specify a table name that differs from the name of the source MongoDB® collection.
* `<link_name>`: Name of the link to the MongoDB® endpoint. Refer to [CREATE LINK](https://docs.singlestore.com/db/v9.1/reference/sql-reference/security-management-commands/create-link.md) for more information.
* `<collection>`: Name of the source MongoDB® collection.
* `<config_json>`: Configuration parameters, including the source MongoDB® configuration, in the JSON format. Refer to [Parameters](https://docs.singlestore.com/#section-idm4549735334267233676416981853.md) for supported parameters.
* `<credentials_json>`: Credentials to use to access the MongoDB® database, in JSON format. For example:
  ```
  CREDENTIALS '{"mongodb.password": "<password>", "mongodb.user": "<user>"}'
  ```
  * `mongodb.user`: The name of the database user to use when connecting to MongoDB® servers.
  * `mongodb.password`: The password to use when connecting to MongoDB® servers.

## Parameters

The `CREATE {TABLE | TABLES}`, `CREATE LINK`. and `CREATE AGGREGATOR PIPELINE` statement supports the following parameters in the `CONFIG` clause:

* `mongodb.hosts`: A comma-separated list of MongoDB® servers (nodes) in the replica set, in `'hostname:[port]'` format.

  The `mongodb.connection.string` and `mongodb.hosts` parameters are mutually exclusive, i.e., they cannot be used in the same `CREATE TABLE ... AS INFER PIPELINE` statement.

  * If `mongodb.members.auto.discover` is set to `FALSE`, you must prefix the `'hostname:[port]'` with the name of the replica set in `mongodb.hosts`, e.g., **rset0/svchost-xxx:27017**. The first node specified in `mongodb.hosts` is always selected as the primary node.
  * If `mongodb.members.auto.discover` is set to `TRUE`, you must specify both the primary and secondary nodes in the replica set in `mongodb.hosts`.
* `mongodb.connection.string`: Specifies the URI of the remote MongoDB® instance. This parameter supports both the standard and SRV [connection string formats](https://www.mongodb.com/docs/manual/reference/connection-string/). The `mongodb.connection.string` and `mongodb.hosts` parameters are mutually exclusive, i.e., they cannot be used in the same `CREATE TABLE ... AS INFER PIPELINE` statement.
* `mongodb.members.auto.discover`: Specifies whether the MongoDB® servers defined in `mongodb.hosts` should be used to discover all the members of the replica set. If disabled, the servers are used as is.
* `mongodb.ssl.enabled`: Enables the connector to use SSL when connecting to MongoDB® servers.
* `mongodb.authsource`: Specifies the database containing MongoDB® credentials to use as an authentication source. This parameter is only required when the MongoDB® instance is configured to use authentication with an authentication database other than `admin`.
* `mongodb.socket.timeout.ms`: Specifies the socket timeout (in milliseconds) for connections to a MongoDB® instance. The pipeline returns an error if there is no response from the MongoDB® server within the specified timeout. By default, `mongodb.socket.timeout.ms` is set to `12000` (2 minutes).

  **Note**: SingleStore does not recommend updating this parameter unless troubleshooting unusual behavior.
* `collection.include.list`: A comma-separated list of regular expressions that match fully-qualified namespaces (in `databaseName.collectionName` format) for MongoDB® collections to monitor. By default, all the collections are monitored, except for those in the `local` and `admin` databases. When this option is specified, collections excluded from the list are not monitored. The `collection.include.list` and `collection.exclude.list` parameters are mutually exclusive, i.e., they cannot be used in the same `CREATE TABLE ... AS INFER PIPELINE` statement. This parameter is only supported in `CREATE TABLE ... AS INFER PIPELINE` statements.
* `collection.exclude.list`: A comma-separated list of regular expressions that match fully-qualified namespaces (in `databaseName.collectionName` format) for MongoDB® collections to exclude from the monitoring list. By default, this list is empty. The `collection.include.list` and `collection.exclude.list` parameters are mutually exclusive, i.e., they cannot be used in the same `CREATE TABLE ... AS INFER PIPELINE` statement. This parameter is only supported in `CREATE TABLE ... AS INFER PIPELINE` statements.
* `database.include.list` (Optional): A comma-separated list of regular expressions that match the names of databases to monitor. By default, all the databases are monitored. When this option is specified, databases excluded from the list are not monitored. The `database.include.list` and `database.exclude.list` parameters are mutually exclusive, i.e., they cannot be used in the same `CREATE TABLE ... AS INFER PIPELINE` statement. If this option is used with the `collection.include.list` or `collection.exclude.list` option, it returns the intersection of the matches. This parameter is only supported in `CREATE TABLE ... AS INFER PIPELINE` statements.
* `database.exclude.list` (Optional): A comma-separated list of regular expressions that match the names of databases to exclude from monitoring. By default, this list is empty. The `database.include.list` and `database.exclude.list` parameters are mutually exclusive, i.e., they cannot be used in the same `CREATE TABLE ... AS INFER PIPELINE` statement. If this option is used with the `collection.include.list` or `collection.exclude'list` option, it returns the intersection of the matches. This parameter is only supported in `CREATE TABLE ... AS INFER PIPELINE` statements.
* `signal.data.collection` (Optional): A collection in the remote source that is used by SingleStore to generate special markings for snapshotting and synchronization. By default, this parameter is set to `singlestore.signals_xxxxxx`, where `xxxxxx` is an automatically generated character sequence. The default signal collection is in the database named `singlestore`. The MongoDB® user must have **write** permissions to this collection. Once the pipelines are started, any change to the value of this parameter is ignored, and the pipelines use the latest value specified before the pipelines started.
* `max.queue.size`: Specifies the size of the queue inside the extractor process for records that are ready for ingestion. The default queue size is 1024. This variable also specifies the number of rows for each partition. Increasing the queue size results in an increase in the memory consumption by the replication process and you may need to increase the [pipelines\_cdc\_java\_heap\_size](https://docs.singlestore.com/db/v9.1/reference/configuration-reference/engine-variables/list-of-engine-variables/#sync-variables-lists.md).
* `max.batch.size`: Specifies the maximum number of rows of data fetched from the remote source in a single iteration (batch). The default batch size is 512. `max.batch.size` must be lower than `max.queue.size`.
* `poll.interval.ms`: Specifies the interval for polling of remote sources if there were no new records in the previous iteration in the replication process. The default interval is 500 milliseconds.
* `snapshot.mode`: Specifies the snapshot mode for the pipeline. It can have one of the following values:

  * `"initial"` (Default): Perform a full snapshot first and replay CDC events created during the snapshot. Then, continue ingestion using CDC.
  * `"incremental"`: Start the snapshot operation and CDC simultaneously.
  * `"never"`: Skip the snapshot, and ingest changes using CDC.

  Refer to [CDC Snapshot Strategies](https://docs.singlestore.com/#section-idm4607585187219234014392364609.md) for more information.

## Replication Strategies

Use one of the following methods to create the required components for data ingestion.

## Replicate MongoDB® Collections As Is

To replicate or migrate MongoDB® collections as is, use the `CREATE {TABLES | TABLE} AS INFER PIPELINE` SQL statement. This method automatically creates the required tables, pipelines, and stored procedures. Refer to [Syntax](https://docs.singlestore.com/#section-idm4526499926180833676408836072.md) for more information.

## Apply Transformations or Ingest a Subset of Columns

To apply transformations or ingest only a subset of collections, manually create the required tables, stored procedure, and pipelines:

1. Run the `CREATE {TABLES | TABLE} AS INFER PIPELINE` SQL statement to infer the schema of the MongoDB® collection(s) and automatically generate templates for the relevant table(s), stored procedure(s), and aggregator pipeline(s).

2. Use the automatically-generated templates as a base to create a new table(s), stored procedure(s), and pipeline(s) for custom transformations. To inspect the generated table(s), stored procedure(s), and pipeline(s), use the `SHOW CREATE TABLE` , `SHOW CREATE PROCEDURE`, and `SHOW CREATE PIPELINE` commands, respectively. After running the `SHOW` commands, you can drop the templates and then recreate the same components with custom transformations.

   Using the automatically-generated templates:

   1. Create table(s) in SingleStore with a structure that can store the ingested MongoDB® collection. Refer to [CREATE TABLE](https://docs.singlestore.com/db/v9.1/reference/sql-reference/data-definition-language-ddl/create-table.md) for more information.

   2. Create stored(s) procedure to map the MongoDB® collection to the SingleStore table and implement other transformations required. Refer to [CREATE PROCEDURE](https://docs.singlestore.com/db/v9.1/reference/sql-reference/procedural-sql-reference/create-procedure.md) for information on creating stored procedures.

   3. Create pipeline(s) to ingest the MongoDB® collection(s) using the `CREATE AGGREGATOR PIPELINE` SQL statement.

      Refer to [Parameters](https://docs.singlestore.com/#section-idm4549735334267233676416981853.md) for a list of supported parameters. Refer to [CREATE PIPELINE](https://docs.singlestore.com/db/v9.1/reference/sql-reference/pipelines-commands/create-pipeline.md) for the complete syntax and related information.

      **Note**: The CDC feature only supports `AGGREGATOR` pipelines.

Refer to [Syntax](https://docs.singlestore.com/#section-idm4526499926180833676408836072.md) for information on `CREATE {TABLES | TABLE} AS INFER PIPELINE` SQL statement.

## CDC Snapshot Strategies

SingleStore supports the following strategies for creating snapshots:

* **Perform a full snapshot before CDC** (`"snapshot.mode":"initial"`):

  The pipeline captures the position in the oplog and then performs a full snapshot of the data. Once the snapshot is complete, the pipeline continues ingestion using CDC. This strategy is enabled by default. If the pipeline is restarted while the snapshot is in progress, the snapshot is restarted from the beginning.

  If the initial snapshot is large in size and the deployment is prone to restarts or connection issues from the source, SingleStore recommends using the `incremental` snapshot mode. Note that the `incremental` snapshot mode is slower than the `initial` mode. For faster data ingestion when the initial historical data is large in size, manually perform the snapshot and capture changes using the `never` mode.

  To use this strategy, set `"snapshot.mode":"initial"` in the `CONFIG` JSON.

  **Requirement**: The oplog retention period must be long enough to maintain the records while the snapshot is in progress. Otherwise, the pipeline will fail and the process will have to be started over.
* **CDC only** (`"snapshot.mode":"never"`):

  The pipeline will not ingest existing data, and only the changes are captured using CDC.

  To use this strategy, set `"snapshot.mode":"never"` in the `CONFIG` JSON.
* **Perform a snapshot in parallel to CDC** (`"snapshot.mode":"incremental"`):

  The pipeline captures the position in the oplog and starts capturing the changes using CDC. In parallel, the pipeline performs incremental snapshots of the existing data and merges it with the CDC records. Although this strategy is slower than performing a full snapshot and then ingesting changes using CDC, it is more resilient to pipeline restarts.

  To use this strategy, set `"snapshot.mode":"incremental"` in the `CONFIG` JSON.

  **Requirement**: The oplog retention period must be long enough to compensate for unexpected pipeline downtime.
* **Manually perform the snapshot and capture changes using the CDC pipeline**:

  To use this strategy, set `"snapshot.mode":"never"` in the `CONFIG` JSON.

  1. Create a pipeline and then wait for at least one batch of ingestion to capture the oplog position.

  2. Stop the pipeline.

  3. Snapshot the data using any of the suitable methods, for example, `mongodump`.

  4. Restore the snapshot in SingleStore using any of the supported tools, for example `mongorestore`.

  5. Start the CDC pipeline.

  This strategy provides faster data ingestion when the initial historical data is very large in size.

  **Requirement**: The oplog retention period must be long enough to maintain the records while the snapshot is in progress. Otherwise, the pipeline will fail and the process will have to be started over.

## Configure Ingestion Speed Limit using Engine Variables

Use the following engine variables to configure ingestion speed:

| Variable Name                     | Description                                                                                                                                                             | Default Value |
| --------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------- |
| `pipelines_cdc_row_emit_delay_us` | Specifies a forced delay in row emission while migrating/replicating your tables (or collections) to yourSingleStoredatabases. It can have a maximum value of`1000000`. | `1`           |
| `pipelines_cdc_java_heap_size`    | Specifies the JVM heap size limit (in MBs) for CDC-in pipelines.                                                                                                        | `128`         |

## In-Depth Variable Definitions

Use the `pipelines_cdc_row_emit_delay_us` engine variable to limit the impact of CDC pipelines on the master aggregator node. It specifies a forced delay in row emission during ingest. This variable can be set to a maximum value of `1000000`. To disable the forced delay while ingestion, set this variable to `0`.

> **⚠️ Warning**: Disabling the emit delay may result in excessive CPU usage on master aggregator nodes.

Use the `max.queue.size` parameter in the `CONFIG` JSON to control the ingestion speed. SingleStore recommends setting `max.batch.size` to half of `max.queue.size`. Increasing the queue size requires a larger Java heap limit, adjust the `pipelines_cdc_java_heap_size` engine variable accordingly. Query the `INFORMATION_SCHEMA.PIPELINES_BATCHES_SUMMARY` table for information on pipeline batch performance.

## Optimize CDC-in Pipelines

## Pipelines and Extractors

CDC-in [pipelines](https://docs.singlestore.com/db/v9.1/load-data/about-singlestore-pipelines.md)  ("pipelines") are aggregator pipelines that run on the Master Aggregator (MA). Each pipeline extracts data from a single source table and loads data into a single SingleStore table. Extractors are subprocesses that extract data from the source and provide the data to the pipelines. The extractors are shared between the CDC-in pipelines.

SingleStore recommends ingesting a limited number of tables using CDC-in pipelines.

## Memory and Resource Consumption

Each extractor consumes a persistent amount of resources (approximately `pipelines_cdc_java_heap_size` per extractor). It is used to store the queue of rows for subsequent batches of ingestion.

**Note**: Total memory consumption may be higher and includes memory usage by the static system memory, shared libraries, JVM heap, etc.

To reduce the resource consumption on the MA reduce the JVM heap size (`pipelines_cdc_java_heap_size`). Although, reducing this limit may also reduce the ingestion speed.

**Note**: If JVM heap size is reduced, you may also need to reduce the pipeline batch and queue size.

## Limitations

* SingleStore does not support data replication using CDC pipelines from MongoDB® Atlas standalone or serverless instances. Replication is only supported for replica set deployments.
* Because MongoDB® time series collections do not support change streams, they cannot be replicated in SingleStore using CDC. Refer to [Time Series Collection Limitations](https://www.mongodb.com/docs/manual/core/timeseries/timeseries-limitations/) for more information.

## Troubleshooting

* If the `CREATE {TABLES | TABLE} AS INFER PIPELINE` SQL statement returns an error, run the `SHOW WARNINGS` command to view the reason behind the failure.
* To view the status of the pipelines, query the `information_schema.PIPELINES_CURSORS` table. Run the following SQL statement to display the status of the replication task:
  ```sql
  SELECT SOURCE_PARTITION_ID, 
   EARLIEST_OFFSET,
   LATEST_OFFSET,
   LATEST_EXPECTED_OFFSET-LATEST_OFFSET as STATUS,
   UPDATED_UNIX_TIMESTAMP 
  FROM information_schema.PIPELINES_CURSORS;
  ```
  The value in the `STATUS` column indicates the following:

  * `1`: Indicates that the snapshot is in progress and the pipeline is expecting more data.
  * `0`: Indicates that the initial snapshot is complete and the pipeline is capturing changes using CDC.

  **Note**: If `snapshot.mode` is set to `incremental`, the pipeline performs incremental snapshots in parallel to capturing changes via CDC. In this case, the value `1` in the `STATUS` column indicates that the pipeline is performing incremental snapshots and capturing changes in parallel.
* To view pipeline errors, run the following SQL statement:
  ```sql
  SELECT * FROM information_schema.PIPELINES_ERRORS;
  ```
* If a pipeline fails with an out of memory error in Java, either increase the heap size using the `pipeline_cdc_java_heap_size` engine variable or reduce the `max.queue.size` and `max.batch.size` parameters in the `CONFIG` clause. The heap size is limited by the memory available on the MA. SingleStore recommends setting the queue size as double of the batch size.

## Example

## Example 1 - Create Tables with Different Names from the Source Collection

To create tables in SingleStore with names that differ from the name of the source MongoDB® collection, use the following syntax:

```sql
CREATE TABLE <new_table_name> AS INFER PIPELINE AS LOAD DATA 
LINK <link_name> '<source_db.source_collection>' FORMAT AVRO;
```

You can also use this command to import collections if a table with the same name already exists in SingleStore. Additionally, you can use this syntax to reimport a collection with a distinct table name.

## Example 2 - Use Regular Expressions to Include or Exclude Collections or Databases

The names of the databases or collections to include or exclude are specified using regular expressions. For example, to include or exclude all the collections in a database named `dbTest`, use the following regular expression:

```
dbTest[.].* 
-- OR -- 
dbTest\..*
```

where,

* `dbTest` matches the exact string `dbTest`.
* `[.]` and `\.` match the character dot (`.`), which represents the dot (`.`) in `<database_name>.<collection_name>` notation.

  Note that `.` is a special character in regular expressions, and to match the character `.` it must either be escaped (`\.`) or specified as a literal character (`[.]`).
* `.*` matches any sequence of characters, because the dot (`.`) matches any single character and the asterisk (`*`) matches zero or more occurrences of any character.

Here's a sample `CREATE LINK` statement to include all the collections in the `dbTest` database, for example, `dbTest.foo`, `dbTest.bar`, `dbTest.exampleCollection`, etc.:

```sql
CREATE LINK mongoRepl AS MONGODB 
CONFIG '{ 
    "mongodb.connection.string": "mongodb+srv://cluster0.mongodb.net/", 
    "collection.include.list": "dbTest\..*", 
    "mongodb.ssl.enabled": "true", 
    "mongodb.authsource": "admin" 
}' 
CREDENTIALS '{ 
    "mongodb.user": "<username>", 
    "mongodb.password": "<password>" 
}';
```

To specify the collections or databases to include or exclude from the replication task, use either (or a combination) of the following parameters: `collection.include.list`, `collection.exclude.list`, `database.include.list`, or `database.exclude.list`. Refer to [Parameters](https://docs.singlestore.com/#section-idm4549735334267233676416981853.md) for more information.

***

Modified at: May 12, 2026

Source: [/db/v9.1/load-data/data-sources/replicate-data-from-mongodb/](https://docs.singlestore.com/db/v9.1/load-data/data-sources/replicate-data-from-mongodb/)

(An index of the documentation is available at /llms.txt)
