Load Data from MongoDB®
Note
This is a Preview feature.
SingleStore does not recommend using this feature in a production environment.
You can replicate your MongoDB® collections to your SingleStore databases using Change Data Capture (CDC). The replication is performed using SQL statements.
Because MongoDB® time series collections do not support change streams, they cannot be replicated in SingleStore using CDC.
Replicate MongoDB® Collections using SQL
To replicate your existing MongoDB® collections to your SingleStore database using SQL syntax for pipelines, create a link to the MongoDB® endpoint using the CREATE LINK statement, and use one of the following methods:

- To replicate/migrate collections as is, use the `CREATE {TABLES | TABLE} AS INFER PIPELINE` SQL statement. Refer to Remarks for more information.
- To apply transformations and manually ingest the collections (see the sketch after this list):
  1. Create a table with a structure that can store the ingested collections.
  2. Create a stored procedure to map the MongoDB® collection to the SingleStore table and implement any other required transformations.
  3. Create a pipeline to ingest the MongoDB® collections using the `CREATE AGGREGATOR PIPELINE` SQL statement. Refer to Parameters for a list of supported parameters.

     Note: The CDC feature only supports `AGGREGATOR` pipelines.
  4. (Optional) Run the `CREATE {TABLES | TABLE} AS INFER PIPELINE` SQL statement to infer the schema of the collection(s) and automatically generate the relevant table(s), stored procedure, and aggregator pipeline. Subsequently, use the automatically generated stored procedure and pipeline as a base for custom transformations. To inspect the stored procedure and pipeline, use the `SHOW CREATE PROCEDURE` and `SHOW CREATE PIPELINE` commands, respectively.

Once all the components are configured, start the pipelines.
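The following is a minimal sketch of the manual-transformation workflow. All object names (`orders`, `mongo_link`, `orders_pipeline`) are hypothetical, and the AVRO subvalue paths (`_id`, `payload`) are assumptions; the actual paths depend on the schema the connector emits, which you can inspect by running `SHOW CREATE PIPELINE` on a generated pipeline.

```sql
-- 1. A table shaped for the transformed documents.
CREATE TABLE orders (
  _id TEXT PRIMARY KEY,
  customer TEXT,
  total DOUBLE
);

-- 2. A stored procedure that maps incoming batches onto the table.
DELIMITER //
CREATE OR REPLACE PROCEDURE orders_apply_changes(
    batch QUERY(_id TEXT, doc JSON))
AS
BEGIN
  INSERT INTO orders (_id, customer, total)
  SELECT _id, doc::$customer, doc::%total  -- extract fields from the document
  FROM batch
  ON DUPLICATE KEY UPDATE
    customer = VALUES(customer),
    total = VALUES(total);
END //
DELIMITER ;

-- 3. An AGGREGATOR pipeline that feeds batches into the procedure.
--    The subvalue paths on the right of '<-' are hypothetical.
CREATE AGGREGATOR PIPELINE orders_pipeline AS
LOAD DATA LINK mongo_link 'mydb.orders'
INTO PROCEDURE orders_apply_changes
FORMAT AVRO
( _id <- _id, doc <- payload );
```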
Syntax
```sql
CREATE LINK [db_name.]connection_name AS MONGODB
CREDENTIALS 'credentials_json'
[<link_configuration>]
```
Refer to CREATE LINK for the complete syntax and related information.
```sql
CREATE TABLE <table_name> AS INFER PIPELINE AS LOAD DATA
  [ LINK <link_name> FORMAT AVRO ... ]
  [ MONGODB <collection> CONFIG <conf_json> CREDENTIALS <cred_json> FORMAT AVRO ... ]

CREATE TABLES AS INFER PIPELINE AS LOAD DATA
  [ LINK <link_name> FORMAT AVRO ... ]
  [ MONGODB '*' CONFIG <conf_json> CREDENTIALS <cred_json> FORMAT AVRO ... ]
```
Refer to CREATE TABLE for related information.
```sql
CREATE [OR REPLACE] AGGREGATOR PIPELINE [IF NOT EXISTS] <pipeline_name> AS
LOAD DATA MONGODB <link_configuration> ...
<avro_format_options>

<link_configuration>:
  LINK [<database_name>.]<connection_name> '<path>'

<avro_format_options>:
  FORMAT AVRO SCHEMA REGISTRY {uri}
  ( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)
  [SCHEMA '<avro_schema>']
  [KAFKA KEY ( {<column_name> | @<variable_name>} <- <subvalue_path>, ...)]
```
Refer to CREATE PIPELINE for the complete syntax and related information.
Refer to CREATE PROCEDURE for information on creating stored procedures.
Parameters
The `CREATE AGGREGATOR PIPELINE` statement supports the following parameters in the `CONFIG`/`CREDENTIALS` clause (a combined example follows the list):

- `mongodb.hosts`: A comma-separated list of MongoDB® servers (nodes) in the replica set, in `'hostname:[port]'` format.
  - If `mongodb.members.auto.discover` is set to `FALSE`, you must prefix the `'hostname:[port]'` with the name of the replica set in `mongodb.hosts`, e.g., `rset0/svchost-xxx:27017`. The first node specified in `mongodb.hosts` is always selected as the primary node.
  - If `mongodb.members.auto.discover` is set to `TRUE`, you must specify both the primary and secondary nodes in the replica set in `mongodb.hosts`.
- `mongodb.members.auto.discover`: Specifies whether the MongoDB® servers defined in `mongodb.hosts` should be used to discover all the members of the replica set. If disabled, the servers are used as is.
- `mongodb.user`: The name of the database user to use when connecting to the MongoDB® servers.
- `mongodb.password`: The password to use when connecting to the MongoDB® servers.
- `mongodb.ssl.enabled`: Enables the connector to use SSL when connecting to the MongoDB® servers.
- `mongodb.authsource`: Specifies the database containing the MongoDB® credentials to use as an authentication source. This parameter is only required when the MongoDB® instance is configured to use authentication with an authentication database other than `admin`.
- `collection.include.list`: A comma-separated list of regular expressions that match fully-qualified namespaces (in `databaseName.collectionName` format) for the MongoDB® collections to monitor. Collections excluded from the list are not monitored. By default, all collections are monitored, except for those in the `local` and `admin` databases. The list must contain the `singlestore.signals` collection, in addition to the specified regular expressions. Also, the MongoDB® user must have write permissions to the `singlestore.signals` collection.
- `signal.data.collection` (Optional): A collection in the remote source that is used by SingleStore to generate special markings for snapshotting and synchronization. By default, this parameter is set to `singlestore.signals`, that is, the `signals` collection in the database named `singlestore`. The MongoDB® user must have write permissions to this collection.
- `max.queue.size`: Specifies the size of the queue inside the extractor process for records that are ready for ingestion. The default queue size is 1024. This variable also specifies the number of rows for each partition. Increasing the queue size increases the memory consumption of the replication process, and you may need to increase `pipelines_cdc_java_heap_size`.
- `max.batch.size`: Specifies the maximum number of rows of data fetched from the remote source in a single iteration (batch). The default batch size is 512. `max.batch.size` must be lower than `max.queue.size`.
- `poll.interval.ms`: Specifies the interval at which remote sources are polled if there were no new records in the previous iteration of the replication process. The default interval is 500 milliseconds.
- `snapshot.mode`: Specifies the snapshot mode for the pipeline. It can have one of the following values:
  - `"initial"` (Default): Perform a full snapshot first and replay CDC events created during the snapshot. Then, continue ingestion using CDC.
  - `"incremental"`: Start the snapshot operation and CDC simultaneously.
  - `"never"`: Skip the snapshot, and ingest changes using CDC.

  Refer to CDC Snapshot Strategies for more information.
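As an illustration, here are several of these parameters combined in a `CONFIG`/`CREDENTIALS` pair, shown in a CREATE LINK statement. All host names, credentials, and collection names are placeholders; adapt them to your deployment.

```sql
CREATE LINK mongo_link AS MONGODB
CONFIG '{
  "mongodb.hosts": "rset0/node1.example.com:27017,node2.example.com:27017",
  "mongodb.members.auto.discover": "false",
  "mongodb.ssl.enabled": "true",
  "collection.include.list": "mydb.orders,singlestore.signals",
  "max.queue.size": "2048",
  "max.batch.size": "1024",
  "snapshot.mode": "initial"
}'
CREDENTIALS '{
  "mongodb.user": "repl_user",
  "mongodb.password": "<password>"
}';
```

Note that the replica-set prefix (`rset0/`) is required here because `mongodb.members.auto.discover` is disabled, and `max.batch.size` is set to half of `max.queue.size`, in line with the guidance in Configure Ingestion Speed Limit.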
Remarks
- The `CREATE TABLE <table_name> AS INFER PIPELINE` statement:
  - Connects to the MongoDB® servers using the specified `LINK <link_name>` or `MONGODB <collection> CONFIG <conf_json> CREDENTIALS <cred_json>` clause.
  - Discovers the available databases and collections, filtered by `collection.include.list`.
  - Infers the schema of the collection and then creates a table (named `<table_name>`) in SingleStore using the inferred schema. You can also specify a table name that differs from the name of the source MongoDB® collection.
  - Creates a pipeline (named `<table_name>`) and a stored procedure (named `<table_name>_apply_changes`) that maps the AVRO data structure to the SingleStore data structure.
- The `CREATE TABLES AS INFER PIPELINE` statement creates a table for each collection in the source database using the same set of operations as the `CREATE TABLE <table_name> AS INFER PIPELINE` statement (specified in the previous list item).
- The `MONGODB` source only supports the `AVRO` data type.
- Use the `SYNC PIPELINE <pipeline_name>` statement to replicate (sync with) the data source; see the example below.
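For example, if `CREATE TABLE orders AS INFER PIPELINE ...` was used to generate the objects (the name `orders` is hypothetical), you can inspect and run them as follows:

```sql
-- Inspect the generated stored procedure and pipeline.
SHOW CREATE PROCEDURE orders_apply_changes;
SHOW CREATE PIPELINE orders;

-- Replicate (sync with) the data source.
SYNC PIPELINE orders;
```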
Source Database Requirements
The replication feature uses MongoDB® change streams, and it works only with replica sets. The MongoDB® user must have the following access and privileges:

- Read access to the `admin` database for the operation log (oplog)
- Read access to the `config` database in the configuration server
- The `listDatabases` privilege
- Cluster-wide `find` and `changeStream` privileges
CDC Snapshot Strategies
SingleStore supports the following strategies for creating snapshots:
- Perform a full snapshot before CDC: The pipeline captures the position in the oplog and then performs a full snapshot of the data. Once the snapshot is complete, the pipeline continues ingestion using CDC. This strategy is enabled by default. To use this strategy, set `"snapshot.mode":"initial"` in the `CONFIG` JSON.

  Requirement: The oplog retention period must be long enough to maintain the records while the snapshot is in progress. Otherwise, the pipeline will fail and the process will have to be started over.

- CDC only: The pipeline does not ingest existing data; only the changes are captured using CDC. To use this strategy, set `"snapshot.mode":"never"` in the `CONFIG` JSON.

- Perform a snapshot in parallel to CDC: The pipeline captures the position in the oplog and starts capturing the changes using CDC. In parallel, the pipeline performs incremental snapshots of the existing data and merges them with the CDC records. To use this strategy, set `"snapshot.mode":"incremental"` in the `CONFIG` JSON.

  Requirement: The oplog retention period must be long enough to compensate for unexpected pipeline downtime.

- Manually perform the snapshot and capture changes using the CDC pipeline (see the sketch after this list):

  1. Create a pipeline and then wait for at least one batch of ingestion to capture the oplog position.
  2. Stop the pipeline.
  3. Snapshot the data using any suitable method, for example, `mongodump` and `mongorestore`.
  4. Start the pipeline.

  To use this strategy, set `"snapshot.mode":"never"` in the `CONFIG` JSON.

  Requirement: The oplog retention period must be long enough to maintain the records while the snapshot is in progress. Otherwise, the pipeline will fail and the process will have to be started over.
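Here is a sketch of the manual strategy, assuming a pipeline named `mongo_orders` (hypothetical) whose link `CONFIG` sets `"snapshot.mode":"never"`:

```sql
-- 1. Start the pipeline and wait for at least one batch of
--    ingestion, so that the oplog position is captured.
START PIPELINE mongo_orders;

-- 2. Stop the pipeline before taking the snapshot.
STOP PIPELINE mongo_orders;

-- 3. Snapshot the data outside of SQL using a suitable method,
--    for example mongodump and mongorestore.

-- 4. Resume change capture using CDC.
START PIPELINE mongo_orders;
```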
Configure Ingestion Speed Limit
Use the `pipelines_cdc_row_emit_delay_us` engine variable to limit the impact of CDC pipelines on the master aggregator node. This variable adds a delay to each row emitted by the replication process. It accepts values up to `1000000` microseconds; setting it to `0` disables the delay.
Warning
Disabling the emit delay may result in excessive CPU usage on master aggregator nodes.
Use the `max.batch.size` parameter in the `CONFIG` JSON to control the ingestion speed. SingleStore recommends setting `max.batch.size` to half of `max.queue.size`. If you change `max.queue.size`, adjust the `pipelines_cdc_java_heap_size` engine variable accordingly. Refer to the `information_schema.PIPELINES_BATCHES_SUMMARY` table for information on pipeline batch performance. An illustrative tuning sequence follows.
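For example, the following throttles row emission and then checks recent batch performance (the delay value is illustrative only):

```sql
-- Add a 100-microsecond delay to each emitted row.
SET GLOBAL pipelines_cdc_row_emit_delay_us = 100;

-- Review recent pipeline batch performance.
SELECT *
FROM information_schema.PIPELINES_BATCHES_SUMMARY
ORDER BY BATCH_TIME DESC
LIMIT 10;
```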
Examples
Example 1 - Replicate Collections using SQL
The following example replicates data from an existing MongoDB® Atlas cluster into SingleStore.
- Create a link to the MongoDB® endpoint, for example, the primary node of a MongoDB® Atlas cluster.

  ```sql
  CREATE LINK <linkname> AS MONGODB
  CONFIG '{"mongodb.hosts":"<Hostname>",
    "collection.include.list": "<Collection list>",
    "mongodb.ssl.enabled":"true",
    "mongodb.authsource":"admin"}'
  CREDENTIALS '{"mongodb.user":"<username>",
    "mongodb.password":"<password>"}';
  ```

- Create tables in SingleStore.

  ```sql
  ## Infer tables from source
  CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK <linkname> '*' FORMAT AVRO;
  ```

- Once the link and the tables are created, run the following command to start all the pipelines and begin the data replication process:

  ```sql
  ## Start pipelines
  START ALL PIPELINES;
  ```
To view the status of all the pipelines, query the `information_schema.pipelines_cursors` table.
```sql
SELECT SOURCE_PARTITION_ID, EARLIEST_OFFSET, LATEST_OFFSET,
  LATEST_EXPECTED_OFFSET - LATEST_OFFSET AS DIFF,
  UPDATED_UNIX_TIMESTAMP
FROM information_schema.pipelines_cursors;
```
Example 2 - Create Tables with Different Names from the Source Collection
To create tables in SingleStore with names that differ from the name of the source MongoDB® collection, use the following syntax:
```sql
CREATE TABLE <new_table_name> AS INFER PIPELINE AS LOAD DATA
LINK <link_name> '<source_db.source_collection>' FORMAT AVRO;
```
You can also use this command to import collections if a table with the same name already exists in SingleStore.
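For instance, with a hypothetical link `mongo_link`, the following ingests the `mydb.orders` collection into a table named `orders_v2`:

```sql
CREATE TABLE orders_v2 AS INFER PIPELINE AS LOAD DATA
LINK mongo_link 'mydb.orders' FORMAT AVRO;
```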
Last modified: March 14, 2024