SingleStore Debezium Connector Properties

The SingleStore Debezium connector supports the following configuration properties:

Kafka Connect Properties

Property

Default Value

Description

name

Unique name for the connector. Any attempt to register again with the same name fails. This property is required by all Kafka Connect connectors.

connector.class

The name of the Java class for the connector. For the SingleStore Debezium connector, specify com.singlestore.debezium.SingleStoreConnector.

tasks.max

1

The maximum number of tasks that can be created for this connector. The connector always uses a single task and fails if more than one is specified; hence, the default value is always acceptable.
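
For example, a minimal registration payload for Kafka Connect might look similar to the following sketch; the connector name is a placeholder, and in practice the connection and connector properties described in the following sections must also be added under config:

    {
      "name": "singlestore-connector",
      "config": {
        "connector.class": "com.singlestore.debezium.SingleStoreConnector",
        "tasks.max": "1"
      }
    }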

Connection Properties

Property

Default Value

Description

database.hostname

IP address or hostname of the SingleStore deployment.

database.port

3306

Port of the SingleStore deployment.

database.user

Name of the SingleStore database user with which to access the database.

database.password

Password for the SingleStore database user.

database.dbname

Name of the SingleStore database from which the connector captures changes.

database.table

The name of the table from which the connector captures changes.

database.ssl.mode

disable

Whether to use an encrypted connection to SingleStore. It can have one of the following values:

  • disable (default): Use an unencrypted connection.

  • trust: Use a secure (encrypted) connection without certificate or hostname validation.

  • verify-ca: Use a secure (encrypted) connection, and additionally verify the server TLS certificate against the configured Certificate Authority (CA) certificates; fail if no valid matching CA certificates are found.

  • verify-full: Similar to verify-ca, but additionally verify that the server certificate matches the host to which the connection is attempted.

database.ssl.keystore

The location of the key store file. The file can be used for two-way authentication between the client and SingleStore. This property is optional.

database.ssl.keystore.password

The password for the key store file. The password is optional, but it must be specified if database.ssl.keystore is configured.

database.ssl.truststore

The location of the trust store file for the server certificate verification.

database.ssl.truststore.password

The password for the trust store file, used to check the integrity of the trust store and to unlock it.

database.ssl.server.cert

Server's certificate in DER format or the server's CA certificate. The certificate is added to the trust store, which allows the connection to trust a self-signed certificate.

connect.timeout.ms

30000

Maximum time (in milliseconds) to wait for a connection to the database to be established before timing out.

driver.parameters

Additional JDBC parameters to use in the connection string used to connect to SingleStore, in the following format: param1=value1;param2=value2;.... Refer to Connection String Parameters for more information.
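
For example, the connection-related portion of a connector configuration might resemble the following sketch; the hostname, credentials, database and table names, and file paths are placeholders, and the driver parameter shown is illustrative (refer to Connection String Parameters for the supported options):

    "database.hostname": "svchost.example.com",
    "database.port": "3306",
    "database.user": "repl_user",
    "database.password": "repl_password",
    "database.dbname": "inventory",
    "database.table": "orders",
    "database.ssl.mode": "verify-ca",
    "database.ssl.truststore": "/etc/ssl/truststore.jks",
    "database.ssl.truststore.password": "truststore_password",
    "connect.timeout.ms": "30000",
    "driver.parameters": "connectTimeout=30000"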

Required Connector Configuration Properties

The following configuration properties are required unless a default value is applied.

Property

Default Value

Description

topic.prefix

Specifies the topic prefix that identifies and provides a namespace for the particular database server or cluster from which the connector captures changes. The topic prefix must be unique across all other connectors because it is used as a prefix for all Kafka topic names that receive events generated by this connector. It can only contain alphanumeric characters, hyphens, dots, and underscores.

decimal.handling.mode

precise

Specifies how DECIMAL and NUMERIC columns are represented in change events. It can have the following values:

  • precise: Uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's org.apache.kafka.connect.data.Decimal type.

  • string: Represents values as strings.

  • double: Represents values using Java's double. Although this may result in a loss of precision, it is easier to use.

binary.handling.mode

bytes

Specifies how binary (BLOB, BINARY, etc.) columns are represented in change events. It can have the following values:

  • bytes: Represents binary data as a byte array (default).

  • base64: Represents binary data as base64-encoded string.

  • base64-url-safe: Represents binary data as base64-url-safe-encoded string.

  • hex: Represents binary data as hex-encoded (base16) string.

time.precision.mode

adaptive

Specifies the precision type for time, date, and timestamps.

It can have the following values:

  • adaptive: Bases the precision of time, date, and timestamp values on the database column's precision.

  • adaptive_time_microseconds: Similar to adaptive mode, but TIME fields always use microsecond precision.

  • connect: Always represents time, date, and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, which use millisecond precision regardless of the database columns' precision.

geography.handling.mode

geometry

Specifies how GEOGRAPHY and GEOGRAPHYPOINT columns are represented in change events. It can have the following values:

  • geometry: Uses io.debezium.data.geometry.Geometry to represent values, which contains a structure with two fields:

    • srid (INT32): Spatial reference system ID that defines the type of geometry object stored in the structure.

    • wkb (BYTES): Binary representation of the geometry object encoded in the Well-Known Binary (WKB) format.

  • string: Represents values as strings.

vector.handling.mode

string

Specifies how VECTOR type columns are represented in change events. It can have the following values:

  • string: Represents vector values using JSON strings.

  • binary: Uses binary.handling.mode to define how vectors are represented.

  • array: Represents vectors using the ARRAY type.

tombstones.on.delete

true

Specifies whether delete operations are represented by a delete event and a subsequent tombstone event (true) or only by a delete event (false). Generating the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record is deleted.

column.include.list

Specifies a regular expression. Matching columns are included in change events.

column.exclude.list

Specifies a regular expression. Matching columns are excluded from change events.

column.mask.hash.([^.]+).with.salt.(.+)

Specifies a comma-separated list of regular expressions matching fully-qualified names of columns that must be masked by hashing the input using the specified hash algorithm and salt.

column.mask.with.(\d+).chars

Specifies a comma-separated list of regular expressions matching fully-qualified names of columns that must be masked with the specified number of asterisks (*).

column.truncate.to.(\d+).chars

Specifies a comma-separated list of regular expressions matching fully-qualified names of columns that must be truncated to the configured number of characters.
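
To illustrate how these parameterized property names are instantiated, a configuration might contain entries such as the following; the hash algorithm, salt, lengths, and column names are hypothetical examples:

    "column.mask.hash.SHA-256.with.salt.CzQMA0cB5K": "inventory.orders.customer_email",
    "column.mask.with.12.chars": "inventory.orders.card_number",
    "column.truncate.to.20.chars": "inventory.orders.comments"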

column.propagate.source.type

Specifies a comma-separated list of regular expressions matching fully-qualified names of columns whose original type and length are added as parameters to the corresponding field schemas in the emitted change records.

datatype.propagate.source.type

Specifies a comma-separated list of regular expressions matching database-specific data type names. For columns of matching types, the original type and length are added as parameters to the corresponding field schemas in the generated change records.

populate.internal.id

false

Specifies whether to add InternalId to the after field of the event message.
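
As an illustration, the required properties from this section might be set as follows; the topic prefix and the handling-mode selections are example values only:

    "topic.prefix": "singlestore",
    "decimal.handling.mode": "string",
    "binary.handling.mode": "base64",
    "time.precision.mode": "connect",
    "geography.handling.mode": "geometry",
    "vector.handling.mode": "string",
    "tombstones.on.delete": "true",
    "populate.internal.id": "false"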

Advanced Connector Configuration Properties

The following advanced configuration properties have default values that work in most cases and therefore typically do not need to be specified in the connector's configuration.

Property

Default Value

Description

converters

(Optional) Specifies a list of custom converters to use instead of default ones. The converters are defined using the <converter.prefix>.type option and configured using <converter.prefix>.<option>.

snapshot.mode

initial

Specifies the snapshot strategy to use on connector startup.

The connector supports the following snapshot modes:

  • initial (default): If the connector does not detect any offsets for the logical server name, it performs a full snapshot that captures the current state of the configured tables. After the snapshot completes, the connector starts streaming changes.

  • initial_only: The connector performs a full snapshot. Once the snapshot is complete, the connector stops and does not stream any changes.

  • when_needed: Performs a snapshot only if the connector does not detect any offset, or the detected offset is invalid or stale.

  • no_data: The connector does not perform snapshots. After the connector starts in no_data mode:

    • If there is a previously stored offset in the Kafka offsets topic, the connector continues streaming changes from that position.

    • If no offset is stored, the connector starts streaming changes from the tail of the database log.

event.processing.failure.handling.mode

fail

Specifies how failures that occur during event processing are handled, for example, failures because of a corrupted event. The connector supports the following modes:

  • fail (default): Raises an exception that indicates the problematic event and its position and stops the connector.

  • warn: Logs the problematic event and its position and skips the event.

  • ignore: Skips the problematic event.

max.batch.size

2048

Specifies the maximum size of each batch of source records.

max.queue.size

8192

Specifies the maximum size of the queue for change events read from the database log but not yet recorded or forwarded.

max.queue.size.in.bytes

0 (disabled)

Specifies the maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded.

poll.interval.ms

500

Specifies the time (in milliseconds) that the connector waits for new change events to appear before polling again when no events are received.

heartbeat.topics.prefix

__debezium-heartbeat

Specifies the prefix that is used to name heartbeat topics.

heartbeat.action.query

Specifies an optional query to execute with every heartbeat.

heartbeat.interval.ms

0 (disabled)

Specifies the time interval in milliseconds at which the connector periodically sends heartbeat messages to a heartbeat topic.

snapshot.delay.ms

0

Specifies the number of milliseconds to wait before a snapshot begins.

retriable.restart.connector.wait.ms

10000

Specifies the number of milliseconds to wait before restarting the connector after a retriable exception occurs.

skipped.operations

t

Specifies a comma-separated list of operations to skip during streaming. It can have the following values:

  • c for inserts/create

  • u for updates

  • d for deletes

  • t for truncates

  • none to indicate that nothing is skipped.

notification.enabled.channels

Specifies a list of notification channel names that are enabled.

topic.naming.strategy

Specifies the name of the TopicNamingStrategy class used to determine the topic name for data change, schema change, transaction, and heartbeat events.

custom.metric.tags

Specifies custom metric tags as key-value pairs that are appended to the end of the regular MBean object name to customize it. Each key represents a tag for the MBean object name, and the corresponding value represents the value of that key. For example: key1=value1,key2=value2.

errors.max.retries

-1 (no limit)

Specifies the maximum number of retries on connection errors before failing.

sourceinfo.struct.maker

SingleStoreSourceInfoStructMaker

Specifies the name of the SourceInfoStructMaker class that returns the SourceInfo schema and structure.

notification.sink.topic.name

Specifies the name of the topic for notifications. This property is required if sink is in the list of enabled notification channels.

post.processors

Specifies an optional list of post processors. The processors are defined using the <post.processor.prefix>.type option and configured using <post.processor.prefix>.<option>.
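
For example, a configuration that tunes several of these advanced properties might include the following; all values shown are illustrative and not recommendations:

    "snapshot.mode": "when_needed",
    "event.processing.failure.handling.mode": "warn",
    "max.batch.size": "2048",
    "max.queue.size": "8192",
    "poll.interval.ms": "500",
    "heartbeat.interval.ms": "60000",
    "skipped.operations": "t,d",
    "errors.max.retries": "10"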
