Retrieve Kafka Properties

A Kafka message has multiple properties, such as offset, partition, timestamp, and topic name. Use the get_kafka_pipeline_prop("propname") function to retrieve these properties in pipelines that ingest data in CSV, JSON, or AVRO format.

Note

All Kafka properties are received as strings. To use them as other data types, typecasting is required.
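
For example, the timestamp property arrives as a string such as "1738318906369". A standard CONVERT() call (used again in the Stored Procedure example below) casts it to a number:

    SELECT CONVERT('1738318906369', SIGNED);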

Retrieving Kafka Properties from a JSON Pipeline

The following example creates a table t with four additional columns and a pipeline that populates those columns with the properties prop_offset, prop_partition, prop_timestamp, and prop_topicname of each Kafka message:

  1. Create a new table t.

    CREATE TABLE t(a INT, b JSON, c INT, offset TEXT, partition TEXT, timestamp TEXT, topicname TEXT);
  2. Create a pipeline to load data into this new table.

    CREATE OR REPLACE PIPELINE p AS
    LOAD DATA KAFKA 'localhost:9092/jsontopic'
    INTO TABLE t FORMAT JSON (a <- json_a, b <- json_b, c <- json_c)
    SET offset = get_kafka_pipeline_prop("prop_offset"),
    partition = get_kafka_pipeline_prop("prop_partition"),
    timestamp = get_kafka_pipeline_prop("prop_timestamp"),
    topicname = get_kafka_pipeline_prop("prop_topicname");
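
The pipeline begins ingesting only after it is started. A minimal sketch, assuming the pipeline p created above:

    START PIPELINE p;

    -- Each property column is populated as a string:
    SELECT offset, partition, timestamp, topicname FROM t;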

Note

  • SingleStore supports four predefined properties with names: prop_offset, prop_partition, prop_timestamp, and prop_topicname.

  • To support Kafka properties and headers, the size of the Kafka message is prepended to the message itself. When using a transform together with Kafka properties, the transform must not remove this size prefix from the message.

Example: Retrieving Kafka Properties with a Stored Procedure

The following example shows how to retrieve Kafka properties with a Stored Procedure. The CONVERT() function is used to typecast the properties into usable data types. For more information on Stored Procedures, refer to CREATE PIPELINE ... INTO PROCEDURE.

  1. Create a new table t.

    CREATE TABLE t(a INT, b INT, c INT,
    offset BIGINT, partition BIGINT, timestamp BIGINT, topicname Text);
  2. Create a Stored Procedure that uses CONVERT() to typecast the Kafka properties.

    CREATE OR REPLACE PROCEDURE test_proc(
    batch QUERY(a INT, b INT, c INT,
    offset TEXT, partition TEXT, timestamp TEXT, topicname TEXT))
    AS BEGIN
    INSERT INTO t(a, b, c, offset, partition, timestamp, topicname)
    SELECT a, b, c,
    CONVERT(offset, SIGNED),
    CONVERT(partition, SIGNED),
    CONVERT(timestamp, SIGNED),
    topicname
    FROM batch;
    END;
  3. Create a pipeline that retrieves the Kafka properties with the Stored Procedure.

    CREATE OR REPLACE PIPELINE p
    AS LOAD DATA kafka 'localhost:9092/jsontopic'
    INTO PROCEDURE test_proc FORMAT JSON
    (a <- json_a, b <- json_b, c <- json_c)
    SET offset = get_kafka_pipeline_prop("prop_offset"),
    partition = get_kafka_pipeline_prop("prop_partition"),
    timestamp = get_kafka_pipeline_prop("prop_timestamp"),
    topicname = get_kafka_pipeline_prop("prop_topicname");
  4. Query the table t to view the typecast properties.

    SELECT * FROM t;
    +------+------+------+--------+-----------+---------------+-----------+
    | a    | b    | c    | offset | partition | timestamp     | topicname |
    +------+------+------+--------+-----------+---------------+-----------+
    |    4 |    5 |    6 |      1 |         0 | 1738318906369 | jsontopic |
    |    1 |    2 |    3 |      0 |         0 | 1738317123940 | jsontopic |
    +------+------+------+--------+-----------+---------------+-----------+
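
Because the properties were typecast during ingest, the BIGINT columns can be used numerically. A minimal sketch, assuming the pipeline above has been started:

    -- Filter by Kafka offset, now a numeric column:
    SELECT a, b, c, timestamp FROM t WHERE offset > 0;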

Kafka Headers

Kafka headers are metadata key-value pairs that can be added to Kafka messages. They provide additional context or information about the message without affecting the message payload. Each header consists of a key and a value. 

The get_kafka_pipeline_prop("propname") function can be used to consume these headers. For example, if a Kafka message has the headers:

message_headers = [
    ("Country", "USA"),
    ("City", "Arizona"),
    ("Language", "English"),
]

To ingest the headers, pass the header key ("Country", "City", and "Language" in this example) as the propname argument of the get_kafka_pipeline_prop("propname") function.

For example:

  1. Create a new table t.

    CREATE TABLE t (info TEXT, country TEXT, city TEXT, language TEXT);
  2. Create a pipeline to load data into this new table.

    CREATE PIPELINE p AS
    LOAD DATA KAFKA 'localhost:9092/topic_name'
    (info)
    SET country = get_kafka_pipeline_prop("Country"),
    city = get_kafka_pipeline_prop("City"),
    language = get_kafka_pipeline_prop("Language");
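
Once the pipeline is started and a message with the headers above is consumed, each header value lands in its column. Illustrative output, assuming a message payload of "hello":

    SELECT * FROM t;
    +-------+---------+---------+----------+
    | info  | country | city    | language |
    +-------+---------+---------+----------+
    | hello | USA     | Arizona | English  |
    +-------+---------+---------+----------+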

Note

  • Currently, this function only ingests Kafka header values that are in string format.

  • If a transform is used with the Kafka properties or headers request, the transform script must not remove the size from the message for the get_kafka_pipeline_prop("propname") function to work.
