Change Data Capture

Note

This is a preview feature. SingleStore does not recommend using this feature in a production environment.

Change Data Capture (CDC) in SingleStore allows you to monitor and capture changes made to a database in real-time using OBSERVE queries. The OBSERVE command implements log-based CDC and uses the existing database write-ahead log (WAL) to source the information. It allows downstream consumers to receive a near real-time, immutable stream of changes made to a SingleStore database.

Why Use CDC

CDC allows you to:

  • Replicate your SingleStore databases to both homogeneous (SingleStore) and heterogeneous (for example, Kafka) systems.

  • Maintain a copy of your SingleStore database or a subset of tables in open source formats to external storage options like Iceberg, Delta Lake, Apache Hudi, etc.

  • Enable real-time data consumption by streaming changes to data processing pipelines, analytical systems, and incrementally updated dashboards.

  • Support enterprise-grade high-availability node configuration through data replication.

  • Filter data before moving/replicating to downstream systems.

Enable CDC

To enable CDC:

  1. Upgrade to SingleStore version 8.7.16.

  2. Set the enable_observe_queries global variable to 1.

    Warning

    Once the enable_observe_queries variable is enabled, you can no longer rollback to a SingleStore version earlier than 8.7.16, even if this variable is disabled afterwards.

    SET GLOBAL enable_observe_queries = 1;
  3. After upgrading SingleStore, create a snapshot of the relevant databases. This snapshot serves as the initial offset for OBSERVE queries, which allows any OBSERVE query to resume from this offset. Refer to SNAPSHOT DATABASE for information on creating a snapshot.

If enable_observe_queries is disabled, repeat the procedure to re-enable it.

Syntax

OBSERVE field_filter 
    FROM table_filter 
    [ AS format]
    [ BEGIN AT offset_config ]
    [ END AT TAIL ]


field_filter: [table.]field, ...
format: SQL
offset_config: [offset | NULL], ...  // number of partitions

Arguments

  • field_filter: A comma-separated list of columns to return. Currently only wildcards are supported for this argument. The output also contains a few auxiliary columns. Refer to Auxiliary Columns for more information.

  • table_filter: Name of the table.

  • format: The required serialization format for the column data. The default is SQL.

  • BEGIN AT offset_config: The offset from which the OBSERVE query must resume. Specify one offset per partition. If NULL is specified for a partition, the observation is resumed from the last observed offset.

  • END AT TAIL: The offset at which the OBSERVE query must stop observation. If END AT TAIL is specified, the observation stops once the offset reaches the end of database write-ahead log (WAL) for each partition. If the END AT clause is not specified, the OBSERVE query runs indefinitely.

Remarks

  • The OBSERVE command only streams committed data. Rolled back (aborted) and failed transactions are filtered and excluded.

  • To manually stop an OBSERVE query, use the KILL QUERY command.

  • To prevent the aggregator from terminating OBSERVE queries that are idle, use the observe_agg_timeout_secs engine variable. This engine variable specifies the maximum time (in seconds) that an OBSERVE query can remain idle on an aggregator node before the query is terminated. By default, observe_agg_timeout_secs is set to 600 seconds.

    Note

    OBSERVE queries do not block DDL operations on the aggregator. Hence, the timeout can be set to larger values.

  • To prevent long-running OBSERVE queries from blocking DDL operations, use the observe_leaf_timeout_secs engine variable. This engine variable specifies the maximum time an OBSERVE query can remain idle on leaf nodes before closing the connection. This timeout is only applicable to leaf partitions.

  • If the primary key in a columnstore table is not defined using the the PRIMARY KEY(column_name) clause or the PRIMARY KEY keyword in the CREATE TABLE statement, the OBSERVE query will not return the primary key as expected in the CDC stream. Instead the InternalId column is used.

  • To view or observe state of the CDC extractor pool, use the SHOW CDC EXTRACTOR POOL command.

Offset

An offset is an opaque identifier that represents a position in a database snapshot or a database write-ahead log (WAL). Within a partition, each offset represents a unique position which is always strictly increasing. Offset values are immutable and do not change between queries.

The OBSERVE query can resume from either the last observed offset or a specific offset. The offset in the BEGIN AT clause indicates that all the changes before the specified offset have been observed.

The offset specified in the END AT clause indicates that the observation must stop once this offset is reached. The END AT TAIL clause continues observation until the offset reaches the end of database WAL. If the END AT clause is not specified, the OBSERVE query runs indefinitely.

For information on offsets that can be used to start an OBSERVE query, refer to OBSERVE_DATABASE_OFFSETS information schema view.

Consider the following query:

OBSERVE * FROM test
BEGIN AT ('0000000000000088000000000000000C000000000000C087',
NULL,
'0000000000000078000000000000000C000000000000C087',
NULL);

This query starts observing from the test table at the following logical position for each partition:

  • Partition 0: 0000000000000088000000000000000C000000000000C087

  • Partition 1: Latest snapshot

  • Partition 2: 0000000000000078000000000000000C000000000000C087

  • Partition 3: Latest snapshot

Specifying NULL for a partition in the BEGIN AT clause indicates that observation starts from the latest snapshot for the partition. Omitting the BEGIN AT clause is equivalent to specifying NULL for each partition.

For each database partition, the OBSERVE query scans each offset in the transaction log. The result set is then multiplexed and relayed to the client.

Leaves multiplexing changes on the aggregator, which relays them to the client.

The OBSERVE query automatically transitions from a snapshot to the next logical position in the database WAL, eliminating the need for an out-of-band snapshot scan or stopping write queries on the database.

The offset transitions from a snapshot to the next logical position in a database write-ahead log.

Use the snapshots_to_keep and snapshot_trigger_size engine variables to control the number and size of snapshots. Refer to List of Engine Variables for more information.

Note: Snapshots are only observed at the start of an OBSERVE query.

Transactions

All changes are contained within a set of transaction boundary records, BeginTransaction and CommitTransaction. These events (or changes) specify the beginning and ending positions for all the changes applied within a transaction. There are no nested transactions. Each transaction is identified by a TxID, which is unique within a database. It can be shared between partitions in the instance of a two-phase commit (2PC). All individual change records within a transaction share the same TxID.

The OBSERVE query only retrieves committed transactions. Rolled back (aborted) or uncommitted transactions are not observed.

Snapshots also contain boundary records, namely BeginSnapshot and CommitSnapshot, which indicate that the data is sourced from a snapshot. Snapshots can have transactions within them. BeginSnapshot and CommitSnapshot records are always returned even if they do not contain any records.

Output Format

SQL

In SQL format, the columns being observed are returned as a series of SQL columns similar to the SELECT statement.

Change Records

CDC returns an event stream of every insert, update, and delete (change) made to the data and schema change for the observed table(s). These changes to the database are defined as change records. Each change record on the database is sent as a single row in the result set. It also contains information on the current state of the row.

For Insert and Update operations, the data in the change record is the inserted row data and the state of the row after the update, respectively. For Delete operations, only the primary key or the internal ID is specified in the change record.

Auxiliary Columns

The schema of the result set returned by an OBSERVE query contains the following additional auxiliary columns.

Column Name

Description

Offset

The current offset.

PartitionId

ID of the partition to which the current record is written.

Type

The type of change. For example, Insert, Update, Delete, BeginTransaction, CommitTransaction, etc.

Table

The name of the table to which the current record is written. It can contain NULL values for records that do not contain data, for example, BeginTransaction.

TxId

Unique ID of a transaction within a database. All records between a BeginTransaction and CommitTransaction change share the same transaction ID. For multi-partition transactions, the transaction ID is shared between multiple partitions.

TxTimestamp

The commit timestamp of the record. The commit timestamp is specific to a partition and cannot be used to determine the order of records across partitions. The value of TxTimestamp is 0 (invalid timestamp) for multi-partition transactions.

InternalId

An internal identifier for tables that do not have primary keys, which allows identification of records in the absence of a primary key. InternalId is unique across partitions.

If a primary key is defined for the table, the InternalId column contains NULL for INSERT, UPDATE, and DELETE change records, and the primary key columns are populated in the change record.

Note: Columnstore tables created with a primary key before upgrading to SingleStore version 8.7.16, return InternalId (internal identifier for tables) instead of the primary key columns.

Replicate Tables

Replicate to SingleStore or a SQL-compatible Database

You can replicate your SingleStore databases to another SQL-compatible database (SingleStore or non-SingleStore database) using Change Data Capture. Refer to the CDC-out Examples GitHub repository for examples.

Replicate to Kafka using Debezium

The SingleStore Connector for Debezium ("the connector") captures and records row-level changes in a SingleStore database. The connector can be configured to ignore, mask, or truncate values in specific columns.

Refer to the SingleStore Connector for Debezium GitHub repository for information on how to configure the connector and replicate your SingleStore databases to Kafka.

Limitations

  • DDL changes are not supported. The OBSERVE query stops once a DDL event is observed on all the partitions, effectively aligning all partition streams within the database.

  • OBSERVE query plans are not removed by default when a table is updated or a database is dropped. You must manually drop the OBSERVE query plans:

    SELECT PLAN_ID FROM information_schema.mv_plancache
    WHERE QUERY_TEXT LIKE 'OBSERVE %';
    DROP <plan_id> FROM PLANCACHE;
  • The earliest possible offset for a database is its restore point.

  • The OBSERVE command does not support the JOIN clause.

Example

Example 1

The following example shows the output returned by an OBSERVE query for a sample dataset.

CREATE DATABASE dbTest;
USE dbTest;
CREATE TABLE tblEx(
ID INT NOT NULL PRIMARY KEY,
Code VARCHAR(4));
INSERT INTO tblEx(ID, Code) VALUES (1, "KorE");
INSERT INTO tblEx(ID, Code) VALUES (2, "PamY");
INSERT INTO tblEx(ID, Code) VALUES (3, "TabK");
UPDATE tblEx SET Code = "JonA" WHERE ID = 2;
OBSERVE * FROM tblEx;
+------------------------------------------------------------+-------------+-------------------+------------------------------------------------------------------------------------------------------+-------------+------------+-------+-------+
| Offset                                                     | PartitionId | Type              | TxId                                                                                                 | TxTimestamp | InternalId | ID    | Code  |
+------------------------------------------------------------+-------------+-------------------+------------------------------------------------------------------------------------------------------+-------------+------------+-------+-------+
| 0x00000000000000010000000000000000000000000000000000000001 |           0 | BeginSnapshot     | 0x00000000000000000000000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 |           1 | NULL       |  NULL | NULL  |
| 0x00000000000000010000000000000000000000000000104B00000001 |           0 | CommitSnapshot    | 0x00000000000000000000000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 |           1 | NULL       |  NULL | NULL  |
| 0x00000000000000040000000000000008000000000000803900000000 |           0 | BeginTransaction  | 0x01000000000000000000000000000000040000000000000008000000000000803900000000000000000000000000000000 |     1048596 | NULL       |  NULL | NULL  |
| 0x00000000000000040000000000000008000000000000804600000000 |           0 | Insert            | 0x01000000000000000000000000000000040000000000000008000000000000803900000000000000000000000000000000 |     1048596 | NULL       |     2 | PamY  |
| 0x00000000000000040000000000000008000000000000804700000000 |           0 | CommitTransaction | 0x01000000000000000000000000000000040000000000000008000000000000803900000000000000000000000000000000 |     1048596 | NULL       |  NULL | NULL  |
| 0x00000000000000010000000000000000000000000000000000000001 |           1 | BeginSnapshot     | 0x00000000000000000100000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 |           1 | NULL       |  NULL | NULL  |
| 0x00000000000000010000000000000000000000000000104B00000001 |           1 | CommitSnapshot    | 0x00000000000000000100000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 |           1 | NULL       |  NULL | NULL  |
| 0x00000000000000050000000000000005000000000000503900000000 |           1 | BeginTransaction  | 0x01000000000000000100000000000000050000000000000005000000000000503900000000000000000000000000000000 |     1048595 | NULL       |  NULL | NULL  |
| 0x00000000000000050000000000000005000000000000504600000000 |           1 | Insert            | 0x01000000000000000100000000000000050000000000000005000000000000503900000000000000000000000000000000 |     1048595 | NULL       |     1 | KorE  |
| 0x00000000000000050000000000000005000000000000504700000000 |           1 | CommitTransaction | 0x01000000000000000100000000000000050000000000000005000000000000503900000000000000000000000000000000 |     1048595 | NULL       |  NULL | NULL  |
| 0x00000000000000010000000000000000000000000000000000000001 |           4 | BeginSnapshot     | 0x00000000000000000400000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 |           1 | NULL       |  NULL | NULL  |
| 0x00000000000000010000000000000000000000000000104B00000001 |           4 | CommitSnapshot    | 0x00000000000000000400000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 |           1 | NULL       |  NULL | NULL  |
| 0x00000000000000010000000000000000000000000000000000000001 |           3 | BeginSnapshot     | 0x00000000000000000300000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 |           1 | NULL       |  NULL | NULL  |
| 0x00000000000000010000000000000000000000000000104B00000001 |           3 | CommitSnapshot    | 0x00000000000000000300000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 |           1 | NULL       |  NULL | NULL  |
| 0x00000000000000070000000000000008000000000000803900000000 |           3 | BeginTransaction  | 0x01000000000000000300000000000000070000000000000008000000000000803900000000000000000000000000000000 |     1048597 | NULL       |  NULL | NULL  |
| 0x00000000000000070000000000000008000000000000804600000000 |           3 | Insert            | 0x01000000000000000300000000000000070000000000000008000000000000803900000000000000000000000000000000 |     1048597 | NULL       |     3 | TabK  |
| 0x00000000000000070000000000000008000000000000804700000000 |           3 | CommitTransaction | 0x01000000000000000300000000000000070000000000000008000000000000803900000000000000000000000000000000 |     1048597 | NULL       |  NULL | NULL  |
| 0x00000000000000010000000000000000000000000000000000000001 |           2 | BeginSnapshot     | 0x00000000000000000200000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 |           1 | NULL       |  NULL | NULL  |
| 0x00000000000000010000000000000000000000000000104B00000001 |           2 | CommitSnapshot    | 0x00000000000000000200000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 |           1 | NULL       |  NULL | NULL  |
| 0x00000000000000040000000000000009000000000000903900000000 |           0 | BeginTransaction  | 0x01000000000000000000000000000000040000000000000009000000000000903900000000000000000000000000000000 |     1048598 | NULL       |  NULL | NULL  |
| 0x00000000000000040000000000000009000000000000904600000000 |           0 | Update            | 0x01000000000000000000000000000000040000000000000009000000000000903900000000000000000000000000000000 |     1048598 | NULL       |     2 | JonA  |
| 0x00000000000000040000000000000009000000000000904700000000 |           0 | CommitTransaction | 0x01000000000000000000000000000000040000000000000009000000000000903900000000000000000000000000000000 |     1048598 | NULL       |  NULL | NULL  |
| 0x00000000000000010000000000000000000000000000000000000001 |           5 | BeginSnapshot     | 0x00000000000000000500000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 |           1 | NULL       |  NULL | NULL  |
| 0x00000000000000010000000000000000000000000000104B00000001 |           5 | CommitSnapshot    | 0x00000000000000000500000000000000010000000000000000FFFFFFFFFFFFFFFF00000000000000000000000000000000 |           1 | NULL       |  NULL | NULL  |
+------------------------------------------------------------+-------------+-------------------+------------------------------------------------------------------------------------------------------+-------------+------------+-------+-------+

Note

By default, the singlestore and mysql command-line SQL clients buffer their output until a query completes. Therefore, the output of the OBSERVE query may not be displayed while the query is running. To view the unbuffered output of the OBSERVE query and view the rows in the result set as they are returned by the query, use the --quick option with the clients. Refer to mysql Client Options for more information.

Example 2

The following example starts observing from the specified offset for each partition and ends the observation once the offset reaches the end of database write-ahead log (WAL) for each partition in the SQL format.

OBSERVE * FROM test
AS SQL
BEGIN AT
('0000000000000088000000000000000E000000000000E06E',
NULL,
'0000000000000088000000000000000E000000000000E053',
NULL)
END AT TAIL;

References

Last modified: November 22, 2024

Was this article helpful?