How the SingleStore Kafka Sink Connector Works

The SingleStore Kafka Sink connector ("the connector") provides a reliable and high-performance way to stream data from Kafka topics directly into SingleStore tables. The connector subscribes to one or more Kafka topics and processes records in batches. Each record must have a structured key and value, typically encoded in Avro or JSON formats.

By default, each Kafka topic is mapped to a SingleStore table with the same name. You can also specify custom topic-to-table mappings. Refer to Data Mapping for more information. If the target table does not exist in SingleStore, the connector automatically creates the table based on the schema of the incoming records. Refer to Automated Table Creation for more information.

Note

The connector only performs insert operations, and each Kafka record is inserted as a new row. It does not support update or delete operations. For duplicate keys, the behavior depends on the singlestore.upsert connector configuration property.

The connector also supports exactly-once delivery to ensure that each record is inserted only once even on retries or failures. Refer to Exactly-Once Delivery for more information.

Automated Table Creation

If the target table does not exist in SingleStore, the connector can create it automatically, provided the schema of the Kafka record value is available. Refer to Data Mapping for information on how table and column names are determined, including topic-to-table mapping, dynamic routing using record fields, and field-to-column mapping.

Table Keys

You can configure the connector to automatically add keys (indexes) to the new tables using the tableKey.<index_type>[.name] property, where:

  • index_type: Specifies the index to add. The connector supports the following values: PRIMARY, UNIQUE, SHARD, COLUMNSTORE, and KEY. Refer to Understanding Keys and Indexes in SingleStore for information on keys and indexes supported by SingleStore.

  • name: (Optional) Specifies a name for the key. For example, in tableKey.PRIMARY.key_primary_orders, key_primary_orders is the name of the key.

Note

These keys (indexes) are only added to the tables when the connector automatically creates a table. If the table already exists, the connector uses it as-is and does not attempt to modify the table's schema or indexes.

The value of the tableKey.<index_type>[.name] property is a single column name or a comma-separated list of column names, for example:

tableKey.PRIMARY=id
tableKey.COLUMNSTORE=data,created_at
tableKey.UNIQUE.unique_email=email

In this example:

  • A primary key is created on the id column.

  • A columnstore key is created on the data and created_at columns.

  • A unique key named unique_email is created on the email column.
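The naming convention above can be illustrated with a small parser. This is a minimal sketch, not the connector's implementation: the function name parse_table_keys and the strict upper-case matching of index types are assumptions made for illustration.

```python
import re

# Index types the connector accepts, per the list above.
INDEX_TYPES = {"PRIMARY", "UNIQUE", "SHARD", "COLUMNSTORE", "KEY"}

# Matches tableKey.<index_type> with an optional .<name> suffix.
_KEY_PATTERN = re.compile(r"^tableKey\.([A-Z]+)(?:\.(.+))?$")


def parse_table_keys(props):
    """Return (index_type, name, columns) tuples for every tableKey.* property.

    Hypothetical helper: it only demonstrates how the property name and its
    comma-separated value break down into an index type, an optional key
    name, and a list of columns.
    """
    keys = []
    for prop, value in props.items():
        match = _KEY_PATTERN.match(prop)
        if match is None:
            continue  # not a tableKey property
        index_type, name = match.group(1), match.group(2)
        if index_type not in INDEX_TYPES:
            raise ValueError(f"unsupported index type: {index_type}")
        columns = [col.strip() for col in value.split(",") if col.strip()]
        keys.append((index_type, name, columns))
    return keys


example = {
    "tableKey.PRIMARY": "id",
    "tableKey.COLUMNSTORE": "data,created_at",
    "tableKey.UNIQUE.unique_email": "email",
}

for index_type, name, columns in parse_table_keys(example):
    label = name if name else "(unnamed)"
    print(index_type, label, columns)
```

For the three properties in the example, the sketch yields a PRIMARY key on id, a COLUMNSTORE key on data and created_at, and a UNIQUE key named unique_email on email.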

Exactly-Once Delivery

The SingleStore Kafka Sink Connector supports exactly-once delivery, which prevents duplicate data from being ingested into the database even when operations are retried or fail.

To enable exactly-once delivery, set the following property:

singlestore.metadata.allow=true

When exactly-once delivery is enabled, the connector creates a metadata table, named kafka_connect_transaction_metadata by default, that tracks metadata for every transaction to ensure idempotency. Use the singlestore.metadata.table property to change the name of the metadata table. For example:

singlestore.metadata.table=my_custom_metadata_table
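The two properties can also be supplied as part of a JSON connector definition, which is the form the Kafka Connect REST API accepts. The following is a hedged sketch only: the connector name and topic are hypothetical, the connector.class value is an assumption that should be checked against the connector's documentation, and the connection settings a real deployment needs are omitted.

```python
import json

connector_definition = {
    "name": "singlestore-sink-example",  # hypothetical connector name
    "config": {
        # Assumed class name; confirm it against the connector documentation.
        "connector.class": "com.singlestore.kafka.SingleStoreSinkConnector",
        "topics": "orders",  # hypothetical topic
        # Properties described in this section.
        "singlestore.metadata.allow": "true",
        "singlestore.metadata.table": "my_custom_metadata_table",
        # Connection settings are intentionally left out of this sketch.
    },
}

# Serialize to the JSON body that would be submitted to Kafka Connect.
payload = json.dumps(connector_definition, indent=2)
print(payload)
```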

Each record in the kafka_connect_transaction_metadata table includes:

  • A unique identifier consisting of Kafka topic, partition, and offset. This identifier guarantees the uniqueness of ingested data.

  • The number of records written in the transaction.

  • The timestamp of when the transaction occurred.

The data is written to both the target SingleStore table and the metadata table within a single transaction. If any part of the process fails, the transaction is rolled back and no data is committed to the SingleStore database. The Kafka offset advances only after a successful commit; if the operation fails, the same offset is used again on the next attempt. If the data was already committed before the failure occurred, the connector detects the duplicate identifier on the next ingestion attempt and skips reprocessing the record, ensuring exactly-once delivery.
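The idea of writing data and metadata in one transaction can be simulated locally. The sketch below is an illustration under stated assumptions, not the connector's code: SQLite stands in for SingleStore, and the table names (orders, metadata), column names, and the write_batch helper are hypothetical.

```python
import sqlite3
import time


def write_batch(conn, topic, partition, offset, rows):
    """Insert rows and a metadata entry atomically; skip batches already seen."""
    # Identifier built from topic, partition, and offset, as described above.
    batch_id = f"{topic}-{partition}-{offset}"
    try:
        with conn:  # commits on success, rolls back if an exception is raised
            # The primary key on id rejects a batch that was already committed.
            conn.execute(
                "INSERT INTO metadata (id, record_count, created_at) VALUES (?, ?, ?)",
                (batch_id, len(rows), int(time.time())),
            )
            conn.executemany("INSERT INTO orders (id, item) VALUES (?, ?)", rows)
        return True
    except sqlite3.IntegrityError:
        return False  # duplicate identifier: the transaction wrote nothing


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, item TEXT)")
conn.execute(
    "CREATE TABLE metadata (id TEXT PRIMARY KEY, record_count INTEGER, created_at INTEGER)"
)

batch = [(1, "apple"), (2, "pear")]
first = write_batch(conn, "orders", 0, 42, batch)
retry = write_batch(conn, "orders", 0, 42, batch)  # redelivery of the same batch
row_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(first, retry, row_count)
```

In this simulation the first call commits both inserts, while the retried call hits the duplicate identifier, is rolled back, and leaves the target table unchanged.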

Last modified: August 5, 2025
