Data Mapping

Default Table Name

By default, the SingleStore Kafka Sink Connector ("the connector") writes records from each Kafka topic to a SingleStore table with the same name as the Kafka topic. For example, records from a Kafka topic named orders are written to the orders table.

Write to a Custom Table

You can write records to a custom table using either of the following:

  • Specify the target table using the singlestore.tableName.<topicName> property.

  • Use dynamic record routing to map fields to specific tables.

Using the singlestore.tableName.<topicName> Property

To override the default behavior and explicitly specify the name of the target SingleStore table for a topic, use the singlestore.tableName.<topicName> configuration property. For example, the following property specifies that the records from the orders topic are written to a table named cust_orders.

singlestore.tableName.orders=cust_orders
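For context, this property would appear alongside the rest of the connector configuration. The following is a minimal sketch of a connector creation payload; the connector name, class, and connection settings shown here are illustrative assumptions, not part of the example above:

```json
{
  "name": "singlestore-sink",
  "config": {
    "connector.class": "com.singlestore.kafka.SingleStoreSinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connection.ddlEndpoint": "singlestore-host:3306",
    "connection.database": "example_db",
    "connection.user": "example_user",
    "singlestore.tableName.orders": "cust_orders"
  }
}
```

With this configuration, records from the orders topic are written to the cust_orders table instead of a table named orders.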

Using Dynamic Record Routing

You can also configure dynamic routing of records to tables based on a field in the Kafka record. Use the singlestore.recordToTable.mappingField property to specify the field that determines the target table and use singlestore.recordToTable.mapping.<value> to map the field values to table names.

For example, consider the following configuration:

singlestore.recordToTable.mappingField=data.tableType
singlestore.recordToTable.mapping.sales=sales_data
singlestore.recordToTable.mapping.users=user_profiles

This configuration specifies that when a Kafka record contains a struct field named data, the record is routed based on the value of the tableType field inside data:

  • If data contains a field tableType with the value sales, the record is written to the sales_data table.

  • If data contains a field tableType with the value users, the record is written to the user_profiles table.
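For instance, under the configuration above, a record whose value looks like the following (shown as JSON; the region and amount fields are hypothetical) is written to the sales_data table, because data.tableType is sales:

```json
{
  "data": {
    "tableType": "sales",
    "region": "EMEA",
    "amount": 1250.00
  }
}
```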

Record-to-Row Mapping

The connector maps each Kafka record to a row in the target SingleStore table according to the structure of the record's value as follows:

Record value is not a Struct

The record is written as a row with a single column.

  • If the schema is defined, the column name is derived from the value schema field names.

  • If the schema is not defined, the column name defaults to data.
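For example, a schemaless record whose entire value is the scalar below is not a struct, so it is written as one row with a single column, and that column defaults to data:

```json
42
```

The resulting row has a single data column containing 42.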

Record value is a Struct

Each field in the record is mapped to a column with the same name in the target table.

By default, all the fields are inserted into the table. To insert only a specific set of fields, use the fields.whitelist property and specify the fields to insert. For example, specify fields.whitelist=ID,code,quantity to insert only the ID, code, and quantity fields from the Kafka record, and ignore all the other fields.
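Continuing the fields.whitelist=ID,code,quantity example, a record value like the following (the warehouse field is a hypothetical extra) contributes only ID, code, and quantity to the inserted row; warehouse is ignored:

```json
{
  "ID": 1001,
  "code": "SKU-42",
  "quantity": 3,
  "warehouse": "east-1"
}
```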

You can also explicitly specify field-to-column mappings using the singlestore.columnToField.<tableName>.<columnName> property, which allows you to map Kafka record fields, including nested fields, to specific columns in the target table. For example:

singlestore.columnToField.customer_orders.customer_id=payload.user.id
singlestore.columnToField.customer_orders.order_total=payload.order.total

In this configuration, the payload.user.id field is mapped to the customer_id column and the payload.order.total field is mapped to the order_total column in the customer_orders table.
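A record value matching the mapping above might look like this (the field values are hypothetical); payload.user.id populates the customer_id column and payload.order.total populates the order_total column:

```json
{
  "payload": {
    "user": { "id": 78421 },
    "order": { "total": 159.95 }
  }
}
```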

Note

When either the fields.whitelist or the singlestore.columnToField.<tableName>.<columnName> property is used, fields not included in the list or mapping are ignored.

Last modified: August 3, 2025
