Change Data Capture

Note

This is a Preview feature.

Change Data Capture (CDC) in SingleStore allows you to monitor and capture changes made to a database in real-time using OBSERVE queries. It allows downstream consumers to receive a near real-time, immutable stream of changes made to a SingleStore database.

Why Use CDC

CDC allows you to:

  • Replicate your SingleStore databases to both homogeneous (SingleStore) and heterogeneous (for example, Kafka) systems.

  • Maintain a copy of your SingleStore database or a subset of tables in open source formats to external storage options like Iceberg, Delta Lake, Apache Hudi, etc.

  • Enable real-time data consumption by streaming changes to data processing pipelines, analytical systems, and incrementally updated dashboards.

  • Support enterprise-grade high-availability node configuration through data replication.

  • Filter data before moving/replicating to downstream systems.

Enable CDC

To enable CDC:

  1. Upgrade from SingleStore version 8.5.x or 8.7.

  2. Set the enable_observe_queries global variable to 1.

    Warning

    Once the enable_observe_queries variable is enabled, you can no longer rollback to a SingleStore version 8.5.x. This limitation is not applicable to SingleStore version 8.7.

    SET GLOBAL enable_observe_queries = 1;
  3. After upgrading SingleStore, create a snapshot of the relevant databases. This snapshot serves as the initial offset for OBSERVE queries, which allows any OBSERVE query to resume from this offset.

If enable_observe_queries is disabled, repeat the procedure to re-enable it.

Syntax

OBSERVE field_filter
FROM table_filter
[ AS format]
[ BEGIN AT offset_config ]
[ END AT TAIL ]
field_filter: [table.]field, ...
format: SQL
offset_config: [offset | NULL], ... // number of partitions

Arguments

  • field_filter: A comma-separated list of columns to return. Currently only wildcards are supported for this argument. The output also contains a few auxiliary columns. Refer to Auxiliary Columns for more information.

  • table_filter: Name of the table.

  • format: The required serialization format for the column data. The default is SQL.

  • BEGIN AT offset_config: The offset from which the OBSERVE query must resume. Specify one offset per partition. If NULL is specified for a partition, the observation is resumed from the last observed offset.

  • END AT TAIL: The offset at which the OBSERVE query must stop observation. If END AT TAIL is specified, the observation stops once the offset reaches the end of database write-ahead log (WAL) for each partition. If the END AT clause is not specified, the OBSERVE query runs indefinitely.

Offset

An offset is an opaque identifier that represents a position in a database snapshot or a database write-ahead log (WAL). Within a partition, each offset represents a unique position which is always strictly increasing. Offset values are immutable and do not change between queries.

The OBSERVE query can resume from either the last observed offset or a specific offset. The offset in the BEGIN AT clause indicates that all the changes before the specified offset have been observed.

The offset specified in the END AT clause indicates that the observation must stop once this offset is reached. The END AT TAIL clause continues observation until the offset reaches the end of database WAL. If the END AT clause is not specified, the OBSERVE query runs indefinitely.

Consider the following query:

OBSERVE * FROM test
BEGIN AT ('0000000000000088000000000000000C000000000000C087',
NULL,
'0000000000000078000000000000000C000000000000C087',
NULL);

This query starts observing from the test table at the following logical position for each partition:

  • Partition 0: 0000000000000088000000000000000C000000000000C087

  • Partition 1: Latest snapshot

  • Partition 2: 0000000000000078000000000000000C000000000000C087

  • Partition 3: Latest snapshot

Specifying NULL for a partition in the BEGIN AT clause indicates that observation starts from the latest snapshot for the partition. Omitting the BEGIN AT clause is equivalent to specifying NULL for each partition.

For each database partition, the OBSERVE query scans each offset in the transaction log. The result set is then multiplexed and relayed to the client.

Leaves multiplexing changes on the aggregator, which relays them to the client.

The OBSERVE query automatically transitions from a snapshot to the next logical position in the database WAL, eliminating the need for an out-of-band snapshot scan or stopping write queries on the database.

The offset transitions from a snapshot to the next logical position in a database write-ahead log.

Transactions

All changes are contained within a set of transaction boundary records, BeginTransaction and CommitTransaction. These events (or changes) specify the beginning and ending positions for all the changes applied within a transaction. Each transaction is identified by a TxID, which is unique within a database. It can be shared between partitions in the instance of a two-phase commit (2PC). All 2PC transactions have a TxPartitions value greater than 1, which implies that this transaction writes data to at most TxPartitions number of partitions. All individual change records within a transaction share the same TxID.

The OBSERVE query only retrieves committed transactions. Aborted or uncommitted transactions are not observed.

Snapshots also contain boundary records, BeginSnapshopt and CommitSnapshot, which indicate that the data is sourced from a snapshot.

Output Format

SQL

In SQL format, the columns being observed are returned as a series of SQL columns similar to the SELECT statement.

Change Records

Changes to the database are defined as change records. Each change record on the database is sent as a single row in the result set. It also contains information on the current state of the row.

For Insert and Update operations, the data in the change record is the inserted row data and the state of the row after the update, respectively. For Delete operations, only the primary key or the internal ID is specified in the change record.

Auxiliary Columns

The schema of the result set returned by an OBSERVE query contains the following additional auxiliary columns.

Column Name

Description

Offset

The current offset.

PartitionId

ID of the partition to which the current record is written.

Type

The type of change. For example, Insert, Update, Delete, BeginTransaction, CommitTransaction, etc.

Table

The name of the table to which the current record is written. It can contain NULL values for records that do not contain data, for example, BeginTransaction.

TxId

Unique ID of a transaction within a database. All records between a BeginTransaction and CommitTransaction change share the same transaction ID. For multi-partition transactions, the transaction ID is shared between multiple partitions.

TxPartitions

The number of partitions affected by the current transaction. It represents an upper bound on the number of partitions that return records for a transaction.

InternalId

An internal identifier for tables that do not have primary keys, which allows identification of records in the absence of a primary key.

Replicate Tables

Replicate to SingleStore or a SQL-compatible Database

You can replicate your SingleStore databases to another SQL-compatible database (SingleStore or non-SingleStore database) using Change Data Capture. Refer to the CDC-out Examples GitHub repository for examples.

Replicate to Kafka using Debezium

The SingleStore Connector for Debezium ("the connector") captures and records row-level changes in a SingleStore database. The connector can be configured to ignore, mask, or truncate values in specific columns.

Refer to the SingleStore Connector for Debezium GitHub repository for information on how to configure the connector and replicate your SingleStore databases to Kafka.

Limitations

  • DDL changes are not supported. The OBSERVE query stops once a DDL event is observed on all the partitions, effectively aligning all partition streams within the database.

  • OBSERVE query plans are not removed by default when a table is updated or a database is dropped. You must manually drop the OBSERVE query plans:

    SELECT PLAN_ID FROM information_schema.mv_plancache
    WHERE QUERY_TEXT LIKE 'OBSERVE %';
    DROP <plan_id> FROM PLANCACHE;
  • The earliest possible offset for a database is its restore point.

  • The TxPartitions specifies the number of partitions affected by the current transaction. It represents an upper bound on the number of partitions that return records for a transaction. The OBSERVE query may not return an empty transaction even if the distributed transaction did not write to any of the partitions.

  • For Delete operations on columnstore tables, only the internal ID (InternalId) is populated in the change record.

  • The OBSERVE command does not support the JOIN clause.

Example

Example 1

The following example shows the output returned by an OBSERVE query for a sample dataset.

CREATE DATABASE dbTest;
USE dbTest;
CREATE TABLE tblEx(
ID INT NOT NULL PRIMARY KEY,
Code VARCHAR(4));
INSERT INTO tblEx(ID, Code) VALUES (1, "KorE");
INSERT INTO tblEx(ID, Code) VALUES (2, "PamY");
INSERT INTO tblEx(ID, Code) VALUES (3, "TabK");
UPDATE tblEx SET Code = "JonA" WHERE ID = 2;
OBSERVE * FROM tblEx;
+----------------------------------------------------+-------------+--------------------+--------+--------------------------------------------------------------------------------------+--------------+----------------------+------+------+
| Offset                                             | PartitionId | Type               | Table  | TxId                                                                                 | TxPartitions | InternalId           | ID   | Code |
+----------------------------------------------------+-------------+--------------------+--------+--------------------------------------------------------------------------------------+--------------+----------------------+------+------+
| 0x0000000000000077000000000000000E000000000000E06E |           4 | BeginTransaction   |        | 0x0100000000000000040000000000000077000000000000000E000000000000E06E0000000000000000 |            1 |                    0 | NULL | NULL |
| 0x0000000000000077000000000000000E000000000000E087 |           4 | Insert             | tblEx  | 0x0100000000000000040000000000000077000000000000000E000000000000E06E0000000000000000 |            1 |  1152921504606846977 |    1 | KorE |
| 0x0000000000000077000000000000000E000000000000E088 |           4 | CommitTransaction  |        | 0x0100000000000000040000000000000077000000000000000E000000000000E06E0000000000000000 |            1 |                    0 |.NULL | NULL |
| 0x0000000000000077000000000000000F000000000000F039 |           4 | BeginTransaction   |        | 0x0100000000000000040000000000000077000000000000000F000000000000F0390000000000000000 |            1 |                    0 | NULL | NULL |
| 0x0000000000000077000000000000000F000000000000F052 |           4 | Insert             | tblEx  | 0x0100000000000000040000000000000077000000000000000F000000000000F0390000000000000000 |            1 |  1152921504606846978 |    3 | TabK |
| 0x0000000000000077000000000000000F000000000000F053 |           4 | CommitTransaction  |        | 0x0100000000000000040000000000000077000000000000000F000000000000F0390000000000000000 |            1 |                    0 | NULL | NULL |
| 0x0000000000000078000000000000000C000000000000C06E |           3 | BeginTransaction   |        | 0x0100000000000000030000000000000078000000000000000C000000000000C06E0000000000000000 |            1 |                    0 | NULL | NULL |
| 0x0000000000000078000000000000000C000000000000C087 |           3 | Insert             | tblEx  | 0x0100000000000000030000000000000078000000000000000C000000000000C06E0000000000000000 |            1 |  1152921504606846977 |    2 | PamY |
| 0x0000000000000078000000000000000C000000000000C088 |           3 | CommitTransaction  |        | 0x0100000000000000030000000000000078000000000000000C000000000000C06E0000000000000000 |            1 |                    0 | NULL | NULL |
| 0x0000000000000078000000000000000E000000000000E039 |           3 | BeginTransaction   |        | 0x0100000000000000030000000000000078000000000000000E000000000000E0390000000000000000 |            1 |                    0 | NULL | NULL |
| 0x0000000000000078000000000000000E000000000000E052 |           3 | Update             | tblEx  | 0x0100000000000000030000000000000078000000000000000E000000000000E0390000000000000000 |            1 |  1152921504606846977 |    2 | JonA |
| 0x0000000000000078000000000000000E000000000000E053 |           3 | CommitTransaction  |        | 0x0100000000000000030000000000000078000000000000000E000000000000E0390000000000000000 |            1 |                    0 | NULL | NULL |
+----------------------------------------------------+-------------+--------------------+--------+--------------------------------------------------------------------------------------+--------------+----------------------+------+------+

Example 2

The following example starts observing from the specified offset for each partition and ends the observation once the offset reaches the end of database write-ahead log (WAL) for each partition in the SQL format.

OBSERVE * FROM test
AS SQL
BEGIN AT
('0000000000000088000000000000000E000000000000E06E',
NULL,
'0000000000000088000000000000000E000000000000E053',
NULL)
END AT TAIL;

Last modified: July 11, 2024

Was this article helpful?