How the SingleStore Debezium Connector Works

To optimally configure and run the connector, it is essential to understand how it performs snapshots, streams change events, determines Kafka topic names, and uses metadata.

The SingleStore Debezium connector uses Change Data Capture (CDC), implemented through OBSERVE queries, to capture change events. The connector does not handle schema changes: DDL changes are not supported, although the OBSERVE query does not block DDL operations from completing. The OBSERVE query stops once a DDL event is observed on all partitions, effectively aligning all partition streams within the database.

Snapshots

When the connector starts for the first time, it performs an initial consistent snapshot of the database as follows:

  1. Establishes a connection to the SingleStore database.

  2. Identifies the table to capture.

  3. Reads the schema of the captured table.

  4. Runs the OBSERVE query and reads everything until a CommitSnapshot event is observed for each database partition.

  5. Records the offset of the CommitSnapshot event for each database partition on successful completion of the snapshot. The offset is a unique identifier that represents a position in a database snapshot or in the database write-ahead log (WAL).

Change Record Streams

After completing the initial snapshot, the connector streams change records starting from the offsets recorded during the snapshot. The connector records the offset of each observed event for each database partition. If streaming stops for any reason, the connector resumes streaming from the last observed offset upon restart.

The connector forwards events as change records to the Kafka Connect framework. The Kafka Connect process asynchronously writes the change records to the appropriate Kafka topic in the same order in which they were generated. Additionally, Kafka Connect periodically records the most recently observed offset in a separate Kafka topic.

When Kafka Connect shuts down gracefully, it stops the connector, flushes all events to Kafka, and records the latest offset received for each database partition. Upon restart, the connector requests SingleStore to send only the events that occurred after the last recorded offset and resumes observing from that offset for each database partition.

Kafka Topic Names

The connector writes change records for each INSERT, UPDATE, and DELETE operation on a table to a single Kafka topic. The Kafka topics are unique for each table. The names of the Kafka topics use the following format:

topicPrefix.databaseName.tableName

where:

  • topicPrefix: Topic name prefix specified using the topic.prefix connector configuration property.

  • databaseName: Database name specified using the database.dbname connector configuration property.

  • tableName: Table name specified using the database.table connector configuration property.

For example, if the topic prefix is set0, the SingleStore database name is inventory, and the name of the table is orders, the connector writes events to the Kafka topic named set0.inventory.orders.
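
The following connector configuration fragment corresponds to this example. Only the three naming-related properties from above are shown; the remaining required connector properties are omitted for brevity:

{
  "topic.prefix": "set0",
  "database.dbname": "inventory",
  "database.table": "orders"
}

With these settings, all change records for the orders table are written to the set0.inventory.orders topic.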

Data Change Events

The connector generates change events as key:value pairs. The structure of the table that generated the change event determines the structure of the key and value. Refer to Kafka Topic Names for information on how Debezium constructs topic names.

Debezium and Kafka Connect are designed to handle a continuous stream of change events. Because the structure of these change events may vary over time, each event in Kafka Connect is self-contained to facilitate the processing of mutable event structures. Every event message has two parts: schema and payload, where the schema defines the structure of the payload and the payload contains the event data.

Change Event Keys

The change event keys have a schema and a payload.

  • schema: Specifies a Kafka Connect schema that describes the structure of the key's payload.

  • payload: For tables that contain primary key(s), the change event key payload contains the primary key fields. Otherwise, the payload contains a single field, InternalId, which specifies the internal ID of the row for which the change event was generated. The internal ID is a unique identifier assigned to each row in the database.

The following example shows a change event key for a table that does not have a primary key:

{
  "schema":{ (1)
    "type":"struct",
    "optional":false,
    "fields":[
      {
        "type":"int64",
        "optional":false,
        "field":"InternalId"
      }
    ]
  },
  "payload":{ (2)
    "InternalId": "100000000000000600000007"
  }
}
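
For a table that has a primary key, the key payload contains the primary key fields instead of InternalId. The following is a minimal sketch, assuming a hypothetical table whose primary key is a single integer column named id:

{
  "schema":{
    "type":"struct",
    "optional":false,
    "fields":[
      {
        "type":"int32",
        "optional":false,
        "field":"id"
      }
    ]
  },
  "payload":{
    "id": 42
  }
}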

Change Event Values

The change event values also have a schema and a payload.

  • schema: Describes the Event Envelope structure of the payload, including its nested fields. Change events for create, update, and delete operations have a value payload with an envelope structure.

  • payload: The envelope structure in a change event value payload contains the following fields:

    • op (Required): Contains a string value that specifies the type of operation. It has one of the following values: c (insert), u (update), d (delete), or r (read, which indicates a snapshot).

    • before (Optional): Specifies the state of the row before the event occurred. This field is always null. It is added to match the format generated by other Debezium connectors.

    • after (Optional): If present, specifies the state of a row after a change occurs.

    • source (Required): Contains a structure that describes the source metadata for the event. The structure includes the following fields:

      • version: Specifies the Debezium version.

      • connector: Specifies the connector plugin name.

      • name: Specifies the connector name.

      • db: Specifies the name of the SingleStore database.

      • table: Specifies the table name.

      • snapshot: Specifies whether the event is part of an ongoing snapshot.

      • txId: Specifies the database transaction ID.

      • partitionId: Specifies the database partition ID.

      • offsets: Specifies the list of database-level offsets for each database partition.

      • ts_ms: Specifies the time (based on the system clock in the JVM that runs the Kafka Connect task) at which the connector processed the event. Other Debezium connectors include a timestamp that indicates when the record in the source database changed, but this information cannot be retrieved using CDC with SingleStore.

      • sequence: Specifies extra sequencing metadata about the change event. This value is always null.

    • ts_ms (Optional): If present, specifies the time (based on the system clock in the JVM that runs the Kafka Connect task) at which the connector processed the event.

Create Event Example

The following is an example of a change event value that the connector generates for a create (insert) event:

{
  "schema":{...},
  "payload":{
    "before":null,
    "after":{
      "a":33
    },
    "source":{
      "version":"0.1.8",
      "connector":"singlestore",
      "name":"singlestore",
      "ts_ms":1706197043473,
      "snapshot":"true",
      "db":"dbTest",
      "sequence":null,
      "table":"example",
      "txId":"ffffffffffffffff0000000000000003000000000000004600000000000460390000000000000000",
      "partitionId":0,
      "offsets":[
        "000000000000000300000000000000460000000000046049"
      ]
    },
    "op":"c",
    "ts_ms":1706197043473,
    "transaction":null
  }
}

The value in the op field is c, which indicates this is a create operation.

Update Event Example

The following is an example of a change event value that the connector generates for an update event:

{
  "schema":{...},
  "payload":{
    "before":null,
    "after":{
      "a":22
    },
    "source":{
      "version":"0.1.8",
      "connector":"singlestore",
      "name":"singlestore",
      "ts_ms":1706197446500,
      "snapshot":"true",
      "db":"dbTest",
      "sequence":null,
      "table":"example",
      "txId":"ffffffffffffffff0000000000000003000000000000004c000000000004c16e0000000000000000",
      "partitionId":0,
      "offsets":[
        "0000000000000003000000000000004c000000000004c17e"
      ]
    },
    "op":"u",
    "ts_ms":1706197446500,
    "transaction":null
  }
}

The payload structure is similar to the create event with the following differences:

  • The value in the op field is u, which indicates this is an update operation.

  • The after field contains the updated value of the column a.

  • The structure of the source field is similar to the create event, but the respective values are different because the connector captured the event from a different offset.

  • The ts_ms field indicates the timestamp when the connector processed the event.

Delete Event Example

The following is an example of a change event value that the connector generates for a delete event:

{
  "schema":{...},
  "payload":{
    "before":null,
    "after":null,
    "source":{
      "version":"0.1.8",
      "connector":"singlestore",
      "name":"singlestore",
      "ts_ms":1706197665407,
      "snapshot":"true",
      "db":"dbTest",
      "sequence":null,
      "table":"example",
      "txId":"ffffffffffffffff00000000000000030000000000000053000000000005316e0000000000000000",
      "partitionId":0,
      "offsets":[
        "000000000000000300000000000000530000000000053179"
      ]
    },
    "op":"d",
    "ts_ms":1706197665408,
    "transaction":null
  }
}

The payload structure indicates the following:

  • The value in the op field is d, which indicates this is a delete operation.

  • The after field contains null, which indicates that the row no longer exists (it was deleted).

  • The structure of the source field is similar to the create event, but the respective values are different because the connector captured the event from a different offset.

  • The ts_ms field indicates the timestamp when the connector processed the event.

Exactly-Once Delivery

Note

Exactly-once semantics (EOS) is currently supported only with Kafka Connect in distributed mode.

Kafka Connect supports exactly-once delivery starting from Kafka version 3.3.0. To configure exactly-once delivery for the SingleStore Debezium connector, perform the following tasks:

  1. Set exactly.once.source.support = enabled in the Kafka Connect worker configuration. This ensures EOS is enabled across all workers. When performing a rolling update, first set exactly.once.source.support = preparing on each worker, and then gradually update exactly.once.source.support to enabled.

  2. Add exactly.once.support = required in the connector configuration, as shown in the example following this list.
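
For illustration, a connector registration submitted to the Kafka Connect REST API might include the setting as follows. The connector name is a placeholder, and the remaining connector properties are elided:

{
  "name": "singlestore-connector",
  "config": {
    ...
    "exactly.once.support": "required"
  }
}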

Limitation

If the connector is stopped for an extended period of time, the offset may become stale, and the connector may need to be restarted manually. When the connector is restarted, it re-runs the initial snapshot, which can lead to duplicate events, resulting in the loss of the exactly-once delivery guarantee during the snapshot phase.

To avoid such scenarios, closely monitor the connector downtime. Refer to Troubleshoot Connector Unable to Start for information on handling and resolving this issue.

Last modified: March 31, 2025
