Change Data Capture
Note
This is a Preview feature.
Change Data Capture (CDC) in SingleStore allows you to monitor and capture changes made to a database in real time using OBSERVE queries. The OBSERVE command implements log-based CDC and uses the existing database write-ahead log (WAL) to source the information.
Why Use CDC
CDC allows you to:
- Replicate your SingleStore databases to both homogeneous (SingleStore) and heterogeneous (for example, Kafka) systems.
- Maintain a copy of your SingleStore database or a subset of tables in open source formats on external storage options such as Iceberg, Delta Lake, and Apache Hudi.
- Enable real-time data consumption by streaming changes to data processing pipelines, analytical systems, and incrementally updated dashboards.
- Support enterprise-grade high-availability node configuration through data replication.
- Filter data before moving or replicating it to downstream systems.
Enable CDC
To enable CDC:
- Upgrade to SingleStore version 8.7.16.
- Set the enable_observe_queries global variable to 1.
  Warning
  Once the enable_observe_queries variable is enabled, you can no longer roll back to a SingleStore version earlier than 8.7.16, even if this variable is disabled afterwards.
  SET GLOBAL enable_observe_queries = 1;
- After upgrading SingleStore, create a snapshot of the relevant databases. This snapshot serves as the initial offset for OBSERVE queries, which allows any OBSERVE query to resume from this offset. Refer to SNAPSHOT DATABASE for information on creating a snapshot.
If enable_observe_queries is disabled, repeat the procedure to re-enable it.
Syntax
OBSERVE field_filter
FROM table_filter
[ AS format]
[ BEGIN AT offset_config ]
[ END AT TAIL ]
field_filter: [table.]field, ...
format: SQL
offset_config: [offset | NULL], ... // number of partitions
Arguments
- field_filter: A comma-separated list of columns to return. Currently only wildcards are supported for this argument. The output also contains a few auxiliary columns. Refer to Auxiliary Columns for more information.
- table_filter: Name of the table.
- format: The required serialization format for the column data. The default is SQL.
- BEGIN AT offset_config: The offset from which the OBSERVE query must resume. Specify one offset per partition. If NULL is specified for a partition, the observation is resumed from the last observed offset.
- END AT TAIL: The offset at which the OBSERVE query must stop observation. If END AT TAIL is specified, the observation stops once the offset reaches the end of database write-ahead log (WAL) for each partition. If the END AT clause is not specified, the OBSERVE query runs indefinitely.
Remarks
- The OBSERVE command only streams committed data. Rolled back (aborted) and failed transactions are filtered and excluded.
- To manually stop an OBSERVE query, use the KILL QUERY command.
- To prevent the aggregator from terminating OBSERVE queries that are idle, use the observe_agg_timeout_secs engine variable. This engine variable specifies the maximum time (in seconds) that an OBSERVE query can remain idle on an aggregator node before the query is terminated. By default, observe_agg_timeout_secs is set to 600 seconds.
  Note
  OBSERVE queries do not block DDL operations on the aggregator. Hence, the timeout can be set to larger values.
- To prevent long-running OBSERVE queries from blocking DDL operations, use the observe_leaf_timeout_secs engine variable. This engine variable specifies the maximum time an OBSERVE query can remain idle on leaf nodes before closing the connection. This timeout is only applicable to leaf partitions.
- If the primary key in a columnstore table is not defined using the PRIMARY KEY(column_name) clause or the PRIMARY KEY keyword in the CREATE TABLE statement, the OBSERVE query will not return the primary key as expected in the CDC stream. Instead, the InternalId column is used.
- To view the state of the CDC extractor pool, use the SHOW CDC EXTRACTOR POOL command.
Offset
An offset is an opaque identifier that represents a position in a database snapshot or a database write-ahead log (WAL).
The OBSERVE query can resume from either the last observed offset or a specific offset. The BEGIN AT clause indicates that all the changes before the specified offset have been observed.
The offset specified in the END AT clause indicates that the observation must stop once this offset is reached. The END AT TAIL clause continues observation until the offset reaches the end of database WAL. If the END AT clause is not specified, the OBSERVE query runs indefinitely.
For information on offsets that can be used to start an OBSERVE query, refer to OBSERVE_
Consider the following query:
OBSERVE * FROM test
BEGIN AT ('0000000000000088000000000000000C000000000000C087',
NULL,
'0000000000000078000000000000000C000000000000C087',
NULL);
This query starts observing from the test table at the following logical position for each partition:
- Partition 0: 0000000000000088000000000000000C000000000000C087
- Partition 1: Latest snapshot
- Partition 2: 0000000000000078000000000000000C000000000000C087
- Partition 3: Latest snapshot
Specifying NULL for a partition in the BEGIN AT clause indicates that observation starts from the latest snapshot for the partition. Omitting the BEGIN AT clause is equivalent to specifying NULL for each partition.
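As an illustration of the one-offset-per-partition rule, the following Python sketch assembles an OBSERVE statement from a list of per-partition offsets and writes NULL where no offset is known. The helper name build_observe_query and its parameters are hypothetical and not part of SingleStore. The sketch assumes offsets are passed as hexadecimal strings in the form used in the query above. It only builds the statement text and does not connect to a database.

```python
import re

# Assumption: offsets are plain hexadecimal strings, as in this document's
# queries. Anything else is rejected before it is embedded in the statement.
_HEX_OFFSET = re.compile(r"[0-9A-Fa-f]+")
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def build_observe_query(table, offsets=None, end_at_tail=False):
    """Return the text of an OBSERVE statement for `table`.

    `offsets` holds one entry per partition: a hex string, or None for NULL.
    Passing None (or an empty list) omits the BEGIN AT clause.
    """
    if not _IDENTIFIER.fullmatch(table):
        raise ValueError(f"unexpected table name: {table!r}")
    parts = [f"OBSERVE * FROM {table}"]
    if offsets:
        rendered = []
        for offset in offsets:
            if offset is None:
                rendered.append("NULL")
            elif _HEX_OFFSET.fullmatch(offset):
                rendered.append(f"'{offset}'")
            else:
                raise ValueError(f"offset is not a hex string: {offset!r}")
        parts.append("BEGIN AT (" + ", ".join(rendered) + ")")
    if end_at_tail:
        parts.append("END AT TAIL")
    return " ".join(parts) + ";"


query = build_observe_query(
    "test",
    [
        "0000000000000088000000000000000C000000000000C087",
        None,
        "0000000000000078000000000000000C000000000000C087",
        None,
    ],
)
print(query)
```

Validating the table name and offsets before formatting keeps arbitrary text out of the generated statement, which matters if the offsets come from a file or another system.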
For each database partition, the OBSERVE query scans each offset in the transaction log.
The OBSERVE query automatically transitions from a snapshot to the next logical position in the database WAL, eliminating the need for an out-of-band snapshot scan or stopping write queries on the database.
Use the snapshots_ and snapshot_ engine variables to control the number and size of snapshots.
Note: Snapshots are only observed at the start of an OBSERVE query.
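A consumer that wants to resume later needs to remember how far it has read on each partition. The following Python sketch shows one possible way to keep such a checkpoint. The function names, the dictionary-shaped rows, and the shortened offsets are all hypothetical. The sketch assumes that the offset of a commit record is a reasonable resume point. This page does not state that, nor how an observed Offset value maps to the literal expected by BEGIN AT, so offsets are stored unchanged as opaque values.

```python
# Boundary record types that close a transaction or snapshot, as named in
# this document.
COMMIT_TYPES = {"CommitTransaction", "CommitSnapshot"}


def advance_checkpoint(checkpoint, row):
    """Record the offset of a commit record for the row's partition.

    `checkpoint` maps PartitionId -> Offset. Only commit records move it, so
    a restart does not land in the middle of a transaction (an assumption of
    this sketch, not documented behavior).
    """
    if row["Type"] in COMMIT_TYPES:
        checkpoint[row["PartitionId"]] = row["Offset"]
    return checkpoint


def offsets_for_resume(checkpoint, partition_count):
    """Return one entry per partition; None means no offset is known yet."""
    return [checkpoint.get(partition) for partition in range(partition_count)]


# Hypothetical rows with shortened placeholder offsets.
rows = [
    {"PartitionId": 0, "Type": "BeginTransaction", "Offset": "A1"},
    {"PartitionId": 0, "Type": "Insert", "Offset": "A2"},
    {"PartitionId": 0, "Type": "CommitTransaction", "Offset": "A3"},
    {"PartitionId": 2, "Type": "BeginTransaction", "Offset": "B1"},
    {"PartitionId": 2, "Type": "Insert", "Offset": "B2"},
]

checkpoint = {}
for row in rows:
    advance_checkpoint(checkpoint, row)

resume_offsets = offsets_for_resume(checkpoint, partition_count=4)
print(resume_offsets)
```

Partition 2 has not committed anything in this sample, so its entry stays None, which corresponds to NULL in a BEGIN AT clause.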
Transactions
All changes are contained within a set of transaction boundary records, BeginTransaction and CommitTransaction. Each transaction is identified by a TxID, which is unique within a database. All records that belong to a transaction carry the same TxID.
The OBSERVE query only retrieves committed transactions.
Snapshots also contain boundary records, namely BeginSnapshot and CommitSnapshot, which indicate that the data is sourced from a snapshot. The BeginSnapshot and CommitSnapshot records are always returned even if they do not contain any records.
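The boundary records make it possible to hand changes downstream one whole transaction at a time. The following Python sketch groups a stream of rows by partition and transaction ID and releases each group when its commit record arrives. The function name, the dictionary-shaped rows, and the short transaction IDs are hypothetical. The column names follow the sample output on this page.

```python
def committed_transactions(rows):
    """Yield (tx_id, changes) each time a commit boundary record arrives.

    Rows are dictionaries keyed by result-set column name. Snapshot boundary
    records are handled the same way as transaction boundary records.
    """
    open_groups = {}  # (PartitionId, TxId) -> change rows seen so far
    for row in rows:
        key = (row["PartitionId"], row["TxId"])
        kind = row["Type"]
        if kind in ("BeginTransaction", "BeginSnapshot"):
            open_groups[key] = []
        elif kind in ("CommitTransaction", "CommitSnapshot"):
            yield row["TxId"], open_groups.pop(key, [])
        else:
            # Insert, Update, or Delete inside the current group.
            open_groups.setdefault(key, []).append(row)


# Hypothetical stream; TxId values are shortened placeholders.
stream = [
    {"PartitionId": 0, "Type": "BeginSnapshot", "TxId": "S0"},
    {"PartitionId": 0, "Type": "CommitSnapshot", "TxId": "S0"},
    {"PartitionId": 0, "Type": "BeginTransaction", "TxId": "T1"},
    {"PartitionId": 1, "Type": "BeginTransaction", "TxId": "T2"},
    {"PartitionId": 0, "Type": "Insert", "TxId": "T1", "ID": 2, "Code": "PamY"},
    {"PartitionId": 1, "Type": "Insert", "TxId": "T2", "ID": 1, "Code": "KorE"},
    {"PartitionId": 1, "Type": "CommitTransaction", "TxId": "T2"},
    {"PartitionId": 0, "Type": "CommitTransaction", "TxId": "T1"},
]

summary = [
    (tx_id, [(change["ID"], change["Code"]) for change in changes])
    for tx_id, changes in committed_transactions(stream)
]
print(summary)
```

Keying the open groups by partition as well as transaction ID keeps interleaved partition streams apart without relying on how the IDs are encoded.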
Output Format
SQL
In SQL format, the columns being observed are returned as a series of SQL columns similar to the SELECT statement.
Change Records
CDC returns an event stream of every insert, update, and delete (change) made to the data and schema change for the observed table(s).
For Insert and Update operations, the data in the change record is the inserted row data and the state of the row after the update, respectively. For Delete operations, only the primary key or the internal ID is specified in the change record.
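To make the change record semantics concrete, the following Python sketch applies a sequence of change records to an in-memory copy of a table. The function name, the dictionary-shaped rows, and the sample Delete record are hypothetical. The auxiliary column names are taken from the sample output on this page, and the sketch assumes the table has a single primary key column named ID.

```python
# Auxiliary columns as they appear in this page's sample output (assumed to
# be the full set for this sketch).
AUX_COLUMNS = {"Offset", "PartitionId", "Type", "TxId", "TxTimestamp", "InternalId"}


def apply_change(replica, row, key_column="ID"):
    """Apply one change record to `replica`, a dict keyed by primary key."""
    kind = row["Type"]
    if kind in ("Insert", "Update"):
        # Both carry the full row state, so each is handled as an upsert.
        replica[row[key_column]] = {
            name: value for name, value in row.items() if name not in AUX_COLUMNS
        }
    elif kind == "Delete":
        # Delete records carry only the key of the removed row.
        replica.pop(row[key_column], None)
    # Boundary records (Begin*/Commit*) carry no row data and are ignored.
    return replica


changes = [
    {"Type": "BeginTransaction", "ID": None, "Code": None},
    {"Type": "Insert", "ID": 1, "Code": "KorE"},
    {"Type": "Insert", "ID": 2, "Code": "PamY"},
    {"Type": "Update", "ID": 2, "Code": "JonA"},
    {"Type": "Delete", "ID": 1, "Code": None},
    {"Type": "CommitTransaction", "ID": None, "Code": None},
]

replica = {}
for change in changes:
    apply_change(replica, change)

print(replica)
```

Because an Update record holds the row state after the update, treating Insert and Update alike keeps the replica correct even if a record is applied twice.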
Auxiliary Columns
The schema of the result set returned by an OBSERVE query contains the following additional auxiliary columns.
Column Name | Description |
---|---|
Offset | The current offset. |
PartitionId | ID of the partition to which the current record is written. |
Type | The type of change. |
 | The name of the table to which the current record is written. |
TxId | Unique ID of a transaction within a database. |
TxTimestamp | The commit timestamp of the record. |
InternalId | An internal identifier for tables that do not have primary keys, which allows identification of records in the absence of a primary key. If a primary key is defined for the table, the Note: Columnstore tables created with a primary key before upgrading to SingleStore version 8. |
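A consumer typically needs a single key per record, whether or not the table declares a primary key. The following Python sketch picks the declared primary key columns when the caller supplies them and falls back to the internal identifier otherwise. The function name, the dictionary-shaped rows, and the sample InternalId value are hypothetical. InternalId is the column name shown in the sample output on this page.

```python
def record_key(row, primary_key_columns=()):
    """Return a hashable key identifying the row that a change record targets.

    Uses the declared primary key columns when given; otherwise falls back to
    the InternalId auxiliary column.
    """
    if primary_key_columns:
        return ("pk",) + tuple(row[column] for column in primary_key_columns)
    return ("internal", row["InternalId"])


# Hypothetical records: one from a table with a primary key, one without.
with_pk = {"Type": "Update", "InternalId": None, "ID": 2, "Code": "JonA"}
without_pk = {"Type": "Insert", "InternalId": 17, "Code": "KorE"}

print(record_key(with_pk, primary_key_columns=("ID",)))
print(record_key(without_pk))
```

Tagging the key with its source avoids collisions between a primary key value and an internal identifier that happen to be equal.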
Replicate Tables
Replicate to SingleStore or a SQL-compatible Database
You can replicate your SingleStore databases to another SQL-compatible database (SingleStore or non-SingleStore database) using Change Data Capture.
Replicate to Kafka using Debezium
The SingleStore Connector for Debezium ("the connector") captures and records row-level changes in a SingleStore database.
Refer to the SingleStore Connector for Debezium GitHub repository for information on how to configure the connector and replicate your SingleStore databases to Kafka.
Limitations
- DDL changes are not supported. The OBSERVE query stops once a DDL event is observed on all the partitions, effectively aligning all partition streams within the database.
- OBSERVE query plans are not removed by default when a table is updated or a database is dropped. You must manually drop the OBSERVE query plans:
  SELECT PLAN_ID FROM information_schema.mv_plancache
  WHERE QUERY_TEXT LIKE 'OBSERVE %';
  DROP <plan_id> FROM PLANCACHE;
- The earliest possible offset for a database is its restore point.
- The OBSERVE command does not support the JOIN clause.
Example
Example 1
The following example shows the output returned by an OBSERVE query for a sample dataset.
CREATE DATABASE dbTest;
USE dbTest;
CREATE TABLE tblEx(ID INT NOT NULL PRIMARY KEY, Code VARCHAR(4));
INSERT INTO tblEx(ID, Code) VALUES (1, "KorE");
INSERT INTO tblEx(ID, Code) VALUES (2, "PamY");
INSERT INTO tblEx(ID, Code) VALUES (3, "TabK");
UPDATE tblEx SET Code = "JonA" WHERE ID = 2;
OBSERVE * FROM tblEx;
+------------------------------------------------------------+-------------+-------------------+------------------------------------------------------------------------------------------------------+-------------+------------+-------+-------+
| Offset | PartitionId | Type | TxId | TxTimestamp | InternalId | ID | Code |
+------------------------------------------------------------+-------------+-------------------+------------------------------------------------------------------------------------------------------+-------------+------------+-------+-------+
| 0x00000000000000010000000000000000000000000000000000000001 | 0 | BeginSnapshot | 0x00000000000000000000000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 | 1 | NULL | NULL | NULL |
| 0x00000000000000010000000000000000000000000000104B00000001 | 0 | CommitSnapshot | 0x00000000000000000000000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 | 1 | NULL | NULL | NULL |
| 0x00000000000000040000000000000008000000000000803900000000 | 0 | BeginTransaction | 0x01000000000000000000000000000000040000000000000008000000000000803900000000000000000000000000000000 | 1048596 | NULL | NULL | NULL |
| 0x00000000000000040000000000000008000000000000804600000000 | 0 | Insert | 0x01000000000000000000000000000000040000000000000008000000000000803900000000000000000000000000000000 | 1048596 | NULL | 2 | PamY |
| 0x00000000000000040000000000000008000000000000804700000000 | 0 | CommitTransaction | 0x01000000000000000000000000000000040000000000000008000000000000803900000000000000000000000000000000 | 1048596 | NULL | NULL | NULL |
| 0x00000000000000010000000000000000000000000000000000000001 | 1 | BeginSnapshot | 0x00000000000000000100000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 | 1 | NULL | NULL | NULL |
| 0x00000000000000010000000000000000000000000000104B00000001 | 1 | CommitSnapshot | 0x00000000000000000100000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 | 1 | NULL | NULL | NULL |
| 0x00000000000000050000000000000005000000000000503900000000 | 1 | BeginTransaction | 0x01000000000000000100000000000000050000000000000005000000000000503900000000000000000000000000000000 | 1048595 | NULL | NULL | NULL |
| 0x00000000000000050000000000000005000000000000504600000000 | 1 | Insert | 0x01000000000000000100000000000000050000000000000005000000000000503900000000000000000000000000000000 | 1048595 | NULL | 1 | KorE |
| 0x00000000000000050000000000000005000000000000504700000000 | 1 | CommitTransaction | 0x01000000000000000100000000000000050000000000000005000000000000503900000000000000000000000000000000 | 1048595 | NULL | NULL | NULL |
| 0x00000000000000010000000000000000000000000000000000000001 | 4 | BeginSnapshot | 0x00000000000000000400000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 | 1 | NULL | NULL | NULL |
| 0x00000000000000010000000000000000000000000000104B00000001 | 4 | CommitSnapshot | 0x00000000000000000400000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 | 1 | NULL | NULL | NULL |
| 0x00000000000000010000000000000000000000000000000000000001 | 3 | BeginSnapshot | 0x00000000000000000300000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 | 1 | NULL | NULL | NULL |
| 0x00000000000000010000000000000000000000000000104B00000001 | 3 | CommitSnapshot | 0x00000000000000000300000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 | 1 | NULL | NULL | NULL |
| 0x00000000000000070000000000000008000000000000803900000000 | 3 | BeginTransaction | 0x01000000000000000300000000000000070000000000000008000000000000803900000000000000000000000000000000 | 1048597 | NULL | NULL | NULL |
| 0x00000000000000070000000000000008000000000000804600000000 | 3 | Insert | 0x01000000000000000300000000000000070000000000000008000000000000803900000000000000000000000000000000 | 1048597 | NULL | 3 | TabK |
| 0x00000000000000070000000000000008000000000000804700000000 | 3 | CommitTransaction | 0x01000000000000000300000000000000070000000000000008000000000000803900000000000000000000000000000000 | 1048597 | NULL | NULL | NULL |
| 0x00000000000000010000000000000000000000000000000000000001 | 2 | BeginSnapshot | 0x00000000000000000200000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 | 1 | NULL | NULL | NULL |
| 0x00000000000000010000000000000000000000000000104B00000001 | 2 | CommitSnapshot | 0x00000000000000000200000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 | 1 | NULL | NULL | NULL |
| 0x00000000000000040000000000000009000000000000903900000000 | 0 | BeginTransaction | 0x01000000000000000000000000000000040000000000000009000000000000903900000000000000000000000000000000 | 1048598 | NULL | NULL | NULL |
| 0x00000000000000040000000000000009000000000000904600000000 | 0 | Update | 0x01000000000000000000000000000000040000000000000009000000000000903900000000000000000000000000000000 | 1048598 | NULL | 2 | JonA |
| 0x00000000000000040000000000000009000000000000904700000000 | 0 | CommitTransaction | 0x01000000000000000000000000000000040000000000000009000000000000903900000000000000000000000000000000 | 1048598 | NULL | NULL | NULL |
| 0x00000000000000010000000000000000000000000000000000000001 | 5 | BeginSnapshot | 0x00000000000000000500000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 | 1 | NULL | NULL | NULL |
| 0x00000000000000010000000000000000000000000000104B00000001 | 5 | CommitSnapshot | 0x00000000000000000500000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 | 1 | NULL | NULL | NULL |
+------------------------------------------------------------+-------------+-------------------+------------------------------------------------------------------------------------------------------+-------------+------------+-------+-------+
Note
By default, the singlestore and mysql command-line SQL clients buffer their output until a query completes. As a result, the output of an OBSERVE query may not be displayed while the query is running. To run an OBSERVE query and view the rows in the result set as they are returned by the query, use the --quick option with the clients.
Example 2
The following example observes the test table in the SQL format. It starts from the specified offset for each partition and ends the observation once the offset reaches the end of the database write-ahead log (WAL) for each partition.
OBSERVE * FROM test
AS SQL
BEGIN AT ('0000000000000088000000000000000E000000000000E06E',
NULL,
'0000000000000088000000000000000E000000000000E053',
NULL)
END AT TAIL;
Last modified: November 12, 2024