How the SingleStore Debezium Connector Works
To optimally configure and run the connector, it is essential to understand how it performs snapshots, streams change events, determines Kafka topic names, and uses metadata.
The SingleStore Debezium connector uses Change Data Capture (CDC) (OBSERVE queries) to capture change events. The OBSERVE query does not block DDL operations from completing. The OBSERVE query stops once a DDL event is observed on all the partitions, effectively aligning all partition streams within the database.
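For illustration, an OBSERVE query over a hypothetical table t can be issued directly from a SQL client (a sketch; consult the SingleStore CDC documentation for the full syntax and options):

```sql
-- Stream change events for table t; the query runs until cancelled
-- or until a DDL event is observed on all partitions.
OBSERVE * FROM t;
```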
Snapshots
When the connector starts for the first time, it performs an initial consistent snapshot of the database as follows:

- Establish a connection to the SingleStore database.
- Identify the table to capture.
- Read the schema of the captured table.
- Run the OBSERVE query and read everything until a CommitSnapshot event is observed for each database partition.
- Record the offset (of CommitSnapshot events) on successful completion of the snapshot for each database partition. The offset is a unique identifier that represents a position in a database snapshot or a database write-ahead log (WAL).
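A minimal connector configuration touching the properties discussed on this page might look like the following (a sketch; the connector class name and the connection properties are assumptions, not taken from this page):

```json
{
  "name": "singlestore-connector",
  "config": {
    "connector.class": "com.singlestore.debezium.SingleStoreConnector",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "repl_user",
    "database.password": "repl_password",
    "database.dbname": "inventory",
    "database.table": "orders",
    "topic.prefix": "set0"
  }
}
```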
Change Record Streams
After completing the initial snapshot, the connector streams the change records from the offsets recorded while performing the snapshot.
The connector forwards events as change records to the Kafka Connect framework.
When Kafka Connect shuts down gracefully, it stops the connector, flushes all the events to Kafka, and records the latest offset received for each database partition.
Kafka Topic Names
The connector writes change records for each INSERT, UPDATE, and DELETE operation on a table to a single Kafka topic. The name of the Kafka topic follows the format topicPrefix.databaseName.tableName, where:

- topicPrefix: Topic name prefix specified using the topic.prefix connector configuration property.
- databaseName: Database name specified using the database.dbname connector configuration property.
- tableName: Table name specified using the database.table connector configuration property.

For example, if the topic prefix is set0, the SingleStore database name is inventory, and the name of the table is orders, the connector writes events to the Kafka topic named set0.inventory.orders.
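Assuming the standard Debezium naming scheme topicPrefix.databaseName.tableName, the topic name for a given table can be derived as follows (a small Python sketch; the function name is illustrative):

```python
def kafka_topic(topic_prefix: str, database: str, table: str) -> str:
    """Compose the Kafka topic name as topicPrefix.databaseName.tableName."""
    return f"{topic_prefix}.{database}.{table}"

# Using the example values: prefix "set0", database "inventory", table "orders".
print(kafka_topic("set0", "inventory", "orders"))  # set0.inventory.orders
```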
Data Change Events
The connector generates change events as key:value pairs. Debezium and Kafka Connect are designed to handle a continuous stream of change events. Each change event has a schema and a payload, where the schema defines the structure of the payload and the payload contains the event data.
Change Event Keys
The change event keys have a schema and a payload.

- schema: Specifies a Kafka Connect schema that describes the structure of the key's payload.
- payload: For tables that contain primary key(s), the change event key payload contains the primary key fields. Otherwise, the payload contains a single field, InternalId, that specifies the InternalId of the row for which the change event was generated. InternalId represents a unique ID assigned to each row in the database.
{"schema":{ (1)"type":"struct","optional":false,"fields":[{"type":"int64","optional":false,"field":"InternalId"}]},"payload":{ (2)"InternalId": "100000000000000600000007"}}
Change Event Values
The change event values also have a schema and a payload.

- schema: Describes the event envelope structure of the payload, including its nested fields. Change events for create, update, and delete operations have a value payload with an envelope structure.
- payload: The envelope structure in a change event value payload contains the following fields:
  - op (Required): Contains a string value that specifies the type of operation. It has one of the following values: c (insert), u (update), d (delete), or r (read, which indicates a snapshot).
  - before (Optional): Specifies the state of the row before the event occurred. This field is always null. It is added to match the format generated by other Debezium connectors.
  - after (Optional): If present, specifies the state of a row after a change occurs.
  - source (Required): Contains a structure that describes the source metadata for the event. The structure includes the following fields:
    - version: Specifies the Debezium version.
    - connector: Specifies the connector plugin name.
    - name: Specifies the connector name.
    - db: Specifies the name of the SingleStore database.
    - table: Specifies the table name.
    - snapshot: Specifies whether the event is part of an ongoing snapshot.
    - txId: Specifies the database transaction ID.
    - partitionId: Specifies the database partition ID.
    - offsets: Specifies the list of database-level offsets for each database partition.
    - ts_ms: Specifies the time (based on the system clock in the JVM that runs the Kafka Connect task) at which the connector processed the event. Other Debezium connectors include a timestamp that indicates when the record in the source database changed, but this information cannot be retrieved using CDC with SingleStore.
    - sequence: Specifies extra sequencing metadata about a change event. This value is always null.
  - ts_ms (Optional): If present, specifies the time (based on the system clock in the JVM that runs the Kafka Connect task) at which the connector processed the event.
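The envelope fields above can be consumed with a simple dispatch on op (a minimal Python sketch; the handler is illustrative and assumes the event value has already been deserialized into a dictionary):

```python
def describe_event(value: dict) -> str:
    """Summarize a change event value payload based on its op field."""
    payload = value["payload"]
    op_names = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}
    op = op_names.get(payload["op"], "unknown")
    table = payload["source"]["table"]
    # "before" is always null for this connector; "after" is null for deletes.
    state = payload["after"]
    return f"{op} on table {table}, after={state}"

# A trimmed-down create event, following the structure described above.
event = {"payload": {"op": "c", "after": {"a": 33},
                     "source": {"table": "example"}}}
print(describe_event(event))  # insert on table example, after={'a': 33}
```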
-
Create Event Example
The following is an example of a change event value that the connector generates for a create (insert) event:
{"schema":{...},"payload":{"before":null,"after":{"a":33},"source":{"version":"0.1.8","connector":"singlestore","name":"singlestore","ts_ms":1706197043473,"snapshot":"true","db":"dbTest","sequence":null,"table":"example","txId":"ffffffffffffffff0000000000000003000000000000004600000000000460390000000000000000","partitionId":0,"offsets":["000000000000000300000000000000460000000000046049"]},"op":"c","ts_ms":1706197043473,"transaction":null}}
The value in the op field is c, which indicates this is a create operation.
Update Event Example
The following is an example of a change event value that the connector generates for an update event:
{"schema":{...},"payload":{"before":null,"after":{"a":22},"source":{"version":"0.1.8","connector":"singlestore","name":"singlestore","ts_ms":1706197446500,"snapshot":"true","db":"dbTest","sequence":null,"table":"example","txId":"ffffffffffffffff0000000000000003000000000000004c000000000004c16e0000000000000000","partitionId":0,"offsets":["0000000000000003000000000000004c000000000004c17e"]},"op":"u","ts_ms":1706197446500,"transaction":null}}
The payload structure is similar to the create event with the following differences:

- The value in the op field is u, which indicates this is an update operation.
- The after field contains the updated value of the column a.
- The structure of the source field is similar to the create event, but the respective values are different because the connector captured the event from a different offset.
- The ts_ms field indicates the timestamp when the connector processed the event.
Delete Event Example
The following is an example of a change event value that the connector generates for a delete event:
{"schema":{...},"payload":{"before":null,"after":null,"source":{"version":"0.1.8","connector":"singlestore","name":"singlestore","ts_ms":1706197665407,"snapshot":"true","db":"dbTest","sequence":null,"table":"example","txId":"ffffffffffffffff00000000000000030000000000000053000000000005316e0000000000000000","partitionId":0,"offsets":["000000000000000300000000000000530000000000053179"]},"op":"d","ts_ms":1706197665408,"transaction":null}}
The payload structure indicates the following:

- The value in the op field is d, which indicates this is a delete operation.
- The after field contains null, which indicates the row does not exist (is deleted).
- The structure of the source field is similar to the create event, but the respective values are different because the connector captured the event from a different offset.
- The ts_ms field indicates the timestamp when the connector processed the event.
Exactly-Once Delivery
Note
Exactly-once semantics (EOS) is currently supported only with Kafka Connect in distributed mode.
Kafka Connect supports exactly-once delivery starting from Kafka version 3.
- Set exactly.once.source.support = enabled in the Kafka Connect worker configuration. This ensures EOS is enabled across all workers. When performing a rolling update, first set exactly.once.source.support = preparing on each worker, and then gradually update exactly.once.source.support to enabled.
- Add exactly.once.support = required in the connector configuration.
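Putting the two settings together (a sketch; the worker file name is the stock Kafka Connect one, and other required worker and connector properties are omitted):

```properties
# Kafka Connect worker configuration (e.g., connect-distributed.properties)
exactly.once.source.support=enabled

# Connector configuration (alongside the other connector properties)
exactly.once.support=required
```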
Limitation
If the connector is stopped for an extended period of time, the offset may become stale, and the connector may need to be manually restarted.
To avoid such scenarios, closely monitor the connector downtime.
Last modified: March 31, 2025