Important

The SingleStore 9.1 release candidate (RC) gives you the opportunity to preview, evaluate, and provide feedback on new and upcoming features prior to their general availability. In the interim, SingleStore 9.0 is recommended for production workloads; it can later be upgraded to SingleStore 9.1.

Retrieve Kafka Properties

A Kafka message has multiple properties, such as offset, partition, timestamp, and topic_name. Use the get_kafka_pipeline_prop(<property>) function to retrieve these properties in pipelines that ingest data in CSV, JSON, or Avro format. In addition to retrieving Kafka metadata properties, you can also extract the Kafka message key using the KAFKA KEY clause in the pipeline definition.

Note

All Kafka properties are received as strings. To use them as other data types, typecasting is required.

Retrieving Kafka Properties from a JSON Pipeline

The following example creates a new table t and a pipeline that stores four Kafka properties for each ingested message, using the property names prop_offset, prop_partition, prop_timestamp, and prop_topicname:

  1. Create a new table t.

    CREATE TABLE t(a INT, b JSON, c INT, offset TEXT, partition TEXT, timestamp TEXT, topicname TEXT);
  2. Create a pipeline to load data into this new table.

    CREATE OR REPLACE PIPELINE p AS
    LOAD DATA KAFKA 'localhost:9092/jsontopic'
    INTO TABLE t FORMAT JSON (a <- json_a, b <- json_b, c <- json_c)
    SET offset = get_kafka_pipeline_prop("prop_offset"),
    partition = get_kafka_pipeline_prop("prop_partition"),
    timestamp = get_kafka_pipeline_prop("prop_timestamp"),
    topicname = get_kafka_pipeline_prop("prop_topicname");

Note

  • SingleStore supports four predefined properties: prop_offset, prop_partition, prop_timestamp, and prop_topicname.

  • To support Kafka properties and headers, SingleStore prefixes each Kafka message with its size before the actual payload. When a transform is used in a pipeline that requests Kafka properties, ensure that the transform does not remove this size prefix from the message. Refer to Writing a Transform to Use with a Pipeline for related information.
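For example, a pass-through transform might parse each size-prefixed message and re-emit the prefix along with the payload. The sketch below is illustrative only: the exact prefix encoding is an assumption (an 8-byte little-endian length is used here), not the documented wire format.

```python
import struct

def transform(data: bytes) -> bytes:
    """Walk a stream of size-prefixed messages and re-emit each prefix
    unchanged along with its (possibly transformed) payload.

    Assumption: an 8-byte little-endian length prefix is used here purely
    for illustration; check the transform documentation for the actual
    encoding SingleStore uses.
    """
    out = bytearray()
    pos = 0
    while pos < len(data):
        (size,) = struct.unpack_from("<Q", data, pos)
        payload = data[pos + 8 : pos + 8 + size]
        # Transform the payload here if needed; this sketch leaves it as-is.
        out += data[pos : pos + 8]  # keep the size prefix intact
        out += payload
        pos += 8 + size
    return bytes(out)
```

In a real pipeline the transform script reads the stream from stdin and writes the result to stdout; the key point is that each size prefix survives the round trip.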

Example: Retrieving Kafka Properties with a Stored Procedure

The following example shows how to retrieve Kafka properties with a Stored Procedure. The CONVERT() function is used to typecast the properties into usable data types. For more information on Stored Procedures, refer to CREATE PIPELINE ... INTO PROCEDURE.

  1. Create a new table t.

    CREATE TABLE t(a INT, b INT, c INT,
    offset BIGINT, partition BIGINT, timestamp BIGINT, topicname TEXT);
  2. Create a Stored Procedure that uses the CONVERT() function to typecast the Kafka properties.

    CREATE OR REPLACE PROCEDURE test_proc(
    batch QUERY(a INT, b INT, c INT,
    offset TEXT, partition TEXT, timestamp TEXT, topicname TEXT))
    AS BEGIN
    INSERT INTO t(a, b, c, offset, partition, timestamp, topicname)
    SELECT a, b, c,
    CONVERT(offset, SIGNED),
    CONVERT(partition, SIGNED),
    CONVERT(timestamp, SIGNED),
    topicname
    FROM batch;
    END;
  3. Run a pipeline and retrieve the Kafka properties with the Stored Procedure.

    CREATE OR REPLACE PIPELINE p
    AS LOAD DATA KAFKA 'localhost:9092/jsontopic'
    INTO PROCEDURE test_proc FORMAT JSON
    (a <- json_a, b <- json_b, c <- json_c)
    SET offset = get_kafka_pipeline_prop("prop_offset"),
    partition = get_kafka_pipeline_prop("prop_partition"),
    timestamp = get_kafka_pipeline_prop("prop_timestamp"),
    topicname = get_kafka_pipeline_prop("prop_topicname");
  4. Query the table t to view the typecasted properties.

    SELECT * FROM t;
    +------+------+------+--------+-----------+---------------+-----------+
    | a    | b    | c    | offset | partition | timestamp     | topicname |
    +------+------+------+--------+-----------+---------------+-----------+
    |    4 |    5 |    6 |      1 |         0 | 1738318906369 | jsontopic |
    |    1 |    2 |    3 |      0 |         0 | 1738317123940 | jsontopic |
    +------+------+------+--------+-----------+---------------+-----------+
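The timestamp column in the output above holds the Kafka record timestamp in epoch milliseconds (assuming the default Kafka timestamp semantics). A quick sketch of interpreting the casted value:

```python
from datetime import datetime, timezone

# get_kafka_pipeline_prop() returns the timestamp as a string; cast it to
# an integer first, as CONVERT(timestamp, SIGNED) does in the procedure.
raw = "1738318906369"  # value taken from the query output above
millis = int(raw)
ts = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
# ts is a timezone-aware datetime in late January 2025
```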

Example: Retrieving Kafka Message Key

A Kafka message includes both a payload and a message key. Use the KAFKA KEY clause to extract the message key and store it in a table column as part of pipeline ingestion. SingleStore supports Kafka message keys only for Avro and JSON payloads, and the key must be encoded in the same format (Avro or JSON) as the payload.
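For instance, for a JSON pipeline the producer must serialize the key as JSON before sending it. A minimal sketch of the producer side (the topic, field names, and producer call are hypothetical):

```python
import json

# Encode both the key and the value as JSON, since the KAFKA KEY clause
# expects the key in the same format as the payload (JSON here).
key = json.dumps({"order_id": 42})
value = json.dumps({"a": 1, "b": 2, "c": 3})

# With a Kafka client library, the encoded strings would then be sent as
# the record key and value, e.g. (sketch only, not executed here):
# producer.send("test_topic", key=key.encode("utf-8"), value=value.encode("utf-8"))
```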

The following example creates a table and a pipeline that extracts the Kafka message key and Kafka message properties.

  1. Create a new table t.

    CREATE TABLE t (
    inp_json JSON,
    kafka_key TEXT,
    kafka_partition INT,
    kafka_offset BIGINT,
    kafka_timestamp BIGINT
    );
  2. Create and run a pipeline to ingest Kafka messages into the table t, including the message payload, message key, and Kafka metadata.

    CREATE OR REPLACE PIPELINE p AS
    LOAD DATA KAFKA 'localhost:9092/test_topic'
    INTO TABLE t
    FORMAT JSON (inp_json <- %)
    KAFKA KEY (kafka_key <- %)
    SET kafka_partition = get_kafka_pipeline_prop("prop_partition"),
    kafka_offset = get_kafka_pipeline_prop("prop_offset"),
    kafka_timestamp = get_kafka_pipeline_prop("prop_timestamp");

Note

  • Use KAFKA KEY to ingest the Kafka message key as part of the table data.

  • Use get_kafka_pipeline_prop() to retrieve Kafka metadata properties such as prop_offset, prop_partition, prop_timestamp, and prop_topicname.

  • The get_kafka_pipeline_prop() function returns Kafka properties as strings. Cast them to the appropriate data type if required.

Kafka Headers

Kafka headers are metadata key-value pairs that you can add to Kafka messages. They provide additional context or information about the message without affecting the message payload. Each header consists of a key and a value. 

Use the get_kafka_pipeline_prop("propname") function to consume these headers. For example, if a Kafka message has the following headers:

message_headers = [
("Country", "USA"),
("City", "Arizona"),
("Language", "English"),
]

Pass the header key ("Country", "City", and "Language" in this example) as the "propname" argument of get_kafka_pipeline_prop() to ingest the header values.

For example:

  1. Create a new table t.

    CREATE TABLE t (info text, country text, city text, language text);
  2. Create a pipeline to load data into this new table.

    CREATE PIPELINE p AS
    LOAD DATA KAFKA 'localhost:9092/topic_name'
    INTO TABLE t (info)
    SET country = get_kafka_pipeline_prop("Country"),
    city = get_kafka_pipeline_prop("City"),
    language = get_kafka_pipeline_prop("Language");
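Conceptually, each SET clause above looks up one header key in the message's header list and stores the matching value as a string. A small Python sketch of that mapping (the get_header helper is hypothetical, written only to mirror the lookup behavior):

```python
# Header key-value pairs, as in the example above.
message_headers = [
    ("Country", "USA"),
    ("City", "Arizona"),
    ("Language", "English"),
]

def get_header(headers, name):
    # Hypothetical stand-in for get_kafka_pipeline_prop("<header_key>"):
    # return the first matching header value, always as a string.
    for key, value in headers:
        if key == name:
            return value
    return None

# Each column receives the value of its corresponding header key.
row = {
    "country": get_header(message_headers, "Country"),
    "city": get_header(message_headers, "City"),
    "language": get_header(message_headers, "Language"),
}
```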

Note

  • Currently, this function only ingests Kafka header values that are in string format.

  • When a transform is used with a Kafka properties or headers request, the transform script must not remove the size prefix from the message; otherwise, the get_kafka_pipeline_prop() function does not work.

Last modified: March 19, 2026
