# How the SingleStore Kafka Sink Connector Works

The SingleStore Kafka Sink connector ("the connector") provides a reliable and high-performance way to stream data from Kafka topics directly into SingleStore tables. The connector subscribes to one or more Kafka topics and processes records in batches. Each record must have a structured key and value, typically encoded in Avro or JSON formats.

By default, each Kafka topic is mapped to a SingleStore table with the same name. You can also specify custom topic-to-table mappings. Refer to [Data Mapping](https://docs.singlestore.com/db/v9.1/load-data/integrate-with-singlestore/singlestore-kafka-sink-connector/data-mapping.md) for more information. If the target table does not exist in SingleStore, the connector automatically creates the table based on the schema of the incoming records. Refer to [Automated Table Creation](https://docs.singlestore.com/db/v9.1/load-data/integrate-with-singlestore/singlestore-kafka-sink-connector/how-the-singlestore-kafka-sink-connector-works/#section-idm235064277766275.md) for more information.

> **📝 Note**: The connector performs insert operations by default, and each Kafka record is inserted as a new row. It does not support standalone update or delete operations. To enable upsert behavior (update a row on duplicate key), set the `singlestore.upsert` connector configuration property to `true`.

The connector also supports exactly-once delivery to ensure that each record is inserted only once even on retries or failures. Refer to [Exactly-Once Delivery](https://docs.singlestore.com/#section-idm235065099958771.md) for more information.

## Automated Table Creation

If the target table does not exist in SingleStore, the connector can create it automatically, provided the schema of the Kafka record value is available. Refer to [Data Mapping](https://docs.singlestore.com/db/v9.1/load-data/integrate-with-singlestore/singlestore-kafka-sink-connector/data-mapping.md) for information on how the table and column names are determined, including topic-to-table mapping, dynamic routing using record fields, and field-to-column mapping.

## Table Keys

You can configure the connector to automatically add keys (indexes) to the new tables using the `tableKey.<index_type>[.name]` property, where:

* `index_type`: Specifies the index to add. The connector supports the following values: `PRIMARY`, `UNIQUE`, `SHARD`, `COLUMNSTORE`, and `KEY`. Refer to [Understanding Keys and Indexes in SingleStore](https://docs.singlestore.com/db/v9.1/create-a-database/understanding-keys-and-indexes-in-singlestore.md) for information on keys and indexes supported by SingleStore.
* `name`: (Optional) Specifies a name for the key. For example, in `tableKey.PRIMARY.key_primary_orders`, `key_primary_orders` is the name of the key.

> **📝 Note**: These keys (indexes) are only added to the tables when the connector automatically creates a table. If the table already exists, the connector uses it as-is and does not attempt to modify the table's schema or indexes.

The value of the `tableKey.<index_type>[.name]` property is the column to index, or a comma-separated list of columns for a multi-column key. For example:

```
tableKey.PRIMARY=id
tableKey.COLUMNSTORE=data,created_at
tableKey.UNIQUE.unique_email=email
```

In this example:

* A primary key is created on the `id` column.
* A columnstore key is created on the `data` and `created_at` columns.
* A unique key named `unique_email` is created on the `email` column.
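
To make the property format concrete, the following Python sketch parses a `tableKey.<index_type>[.name]` property and renders a key clause of the kind that could appear in a `CREATE TABLE` statement. It is an illustration only, not the connector's code: the `key_clause` function and the exact clause text are assumptions, and the DDL the connector actually generates may differ.

```python
# Hypothetical helper: maps one tableKey property to a key clause.
# The clause templates are assumptions made for illustration.
TEMPLATES = {
    "PRIMARY": "PRIMARY KEY {name}({cols})",
    "UNIQUE": "UNIQUE KEY {name}({cols})",
    "SHARD": "SHARD KEY {name}({cols})",
    "COLUMNSTORE": "KEY {name}({cols}) USING CLUSTERED COLUMNSTORE",
    "KEY": "KEY {name}({cols})",
}


def key_clause(prop: str, value: str) -> str:
    """Render a key clause from a property name and its column list."""
    # Split into at most three parts: "tableKey", index type, optional name.
    parts = prop.split(".", 2)
    if len(parts) < 2 or parts[0] != "tableKey":
        raise ValueError(f"not a tableKey property: {prop}")
    index_type = parts[1]
    if index_type not in TEMPLATES:
        raise ValueError(f"unsupported index type: {index_type}")
    # The key name is optional; emit it only when the property has one.
    name = f"`{parts[2]}` " if len(parts) == 3 else ""
    # The value is a single column or a comma-separated list of columns.
    cols = ", ".join(f"`{col.strip()}`" for col in value.split(","))
    return TEMPLATES[index_type].format(name=name, cols=cols)


if __name__ == "__main__":
    properties = {
        "tableKey.PRIMARY": "id",
        "tableKey.COLUMNSTORE": "data,created_at",
        "tableKey.UNIQUE.unique_email": "email",
    }
    for prop, value in properties.items():
        print(key_clause(prop, value))
```

The sketch shows the two things the property encodes: the property name carries the index type and the optional key name, and the value carries the column list.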

## Exactly-Once Delivery

The connector supports exactly-once delivery, which prevents duplicate data from being ingested into the database, even in cases of retries or failures.

To enable exactly-once delivery, set the following property:

```
singlestore.metadata.allow=true
```

When exactly-once delivery is enabled, the connector creates a metadata table, named `kafka_connect_transaction_metadata` by default, which tracks metadata for every transaction to ensure idempotency. Use the `singlestore.metadata.table` property to change the name of the metadata table. For example:

```
singlestore.metadata.table=my_custom_metadata_table
```

Each record in the `kafka_connect_transaction_metadata` table (or the custom-named metadata table) includes:

* A unique identifier consisting of Kafka topic, partition, and offset. This identifier guarantees the uniqueness of ingested data.
* The number of records written in the transaction.
* The timestamp of when the transaction occurred.

The data is written to both the target SingleStore table and the metadata table within a single transaction. If any part of the process fails, the transaction is rolled back and no data is committed to the SingleStore database. The Kafka offset advances only after a successful commit; if the operation fails, the same offset is retried. If the data was already committed before a failure, the connector detects the duplicate identifier on the next ingestion attempt and skips reprocessing the record, ensuring exactly-once delivery.

***

Modified at: June 5, 2026

Source: [/db/v9.1/load-data/integrate-with-singlestore/singlestore-kafka-sink-connector/how-the-singlestore-kafka-sink-connector-works/](https://docs.singlestore.com/db/v9.1/load-data/integrate-with-singlestore/singlestore-kafka-sink-connector/how-the-singlestore-kafka-sink-connector-works/)

