Important
The SingleStore 9.1 release candidate (RC) gives you the opportunity to preview, evaluate, and provide feedback on new and upcoming features prior to their general availability. In the interim, SingleStore 9.0 is recommended for production workloads, which can later be upgraded to SingleStore 9.1.
Retrieve Kafka Properties
A Kafka message has multiple properties, such as offset, partition, timestamp, and topic. Use the get_kafka_pipeline_prop() function to retrieve these properties for pipelines that ingest data in CSV, JSON, and Avro formats. To extract the Kafka message key, use the KAFKA KEY clause in the pipeline definition.
Note
All Kafka properties are received as strings.
Retrieving Kafka Properties from a JSON Pipeline
The following example creates a new table t and a pipeline that retrieves four properties from each Kafka message: prop_offset, prop_partition, prop_timestamp, and prop_topicname.
-
Create a new table t.
CREATE TABLE t(a INT, b JSON, c INT, offset TEXT, partition TEXT, timestamp TEXT, topicname TEXT);
-
Create a pipeline to load data into this new table.
CREATE OR REPLACE PIPELINE p AS
LOAD DATA kafka 'localhost:9092/jsontopic'
INTO TABLE t FORMAT JSON (a <- json_a, b <- json_b, c <- json_c)
SET offset = get_kafka_pipeline_prop("prop_offset"),
partition = get_kafka_pipeline_prop("prop_partition"),
timestamp = get_kafka_pipeline_prop("prop_timestamp"),
topicname = get_kafka_pipeline_prop("prop_topicname");
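To picture the mapping above, here is a hypothetical Python sketch (not SingleStore code) of a message on jsontopic and the row the pipeline would produce. The message values are illustrative; note that every retrieved property arrives as a string.

```python
import json

# Hypothetical message consumed from jsontopic.
message = {"json_a": 1, "json_b": {"x": 2}, "json_c": 3}

# Column mapping performed by the pipeline: a <- json_a, b <- json_b,
# c <- json_c. The SET clauses fill the remaining columns from
# get_kafka_pipeline_prop(), which always yields strings.
row = {
    "a": message["json_a"],
    "b": json.dumps(message["json_b"]),
    "c": message["json_c"],
    "offset": "0",                 # prop_offset (illustrative value)
    "partition": "0",              # prop_partition
    "timestamp": "1738317123940",  # prop_timestamp
    "topicname": "jsontopic",      # prop_topicname
}
print(row)
```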
Note
-
SingleStore supports four predefined properties with names:
prop_offset, prop_partition, prop_timestamp, and prop_topicname.
-
To support Kafka properties or headers, SingleStore prefixes the Kafka message with its size before the actual payload.
When applying transforms with a Kafka properties request, ensure that this size prefix is not removed from the message. Refer to Writing a Transform to Use with a Pipeline for related information.
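As a sketch of that rule (an illustration, not SingleStore's documented transform interface), a transform that needs no data changes can simply copy stdin to stdout, leaving each message's size prefix intact:

```python
import sys

def passthrough(batch: bytes) -> bytes:
    # Forward the batch byte-for-byte. Whatever framing SingleStore adds
    # (including the size prefix before each Kafka payload) survives, so
    # get_kafka_pipeline_prop() still works downstream.
    return batch

if __name__ == "__main__":
    sys.stdout.buffer.write(passthrough(sys.stdin.buffer.read()))
```

A transform that does modify payloads must parse and re-emit that prefix rather than strip it.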
Example: Retrieving Kafka Properties with a Stored Procedure
The following example shows how to retrieve Kafka properties with a stored procedure. The CONVERT clause is used to typecast the properties into usable data types.
-
Create a new table t.
CREATE TABLE t(a INT, b INT, c INT, offset BIGINT, partition BIGINT, timestamp BIGINT, topicname TEXT);
-
Create a stored procedure with the CONVERT clause to typecast the Kafka properties.
CREATE OR REPLACE PROCEDURE test_proc(batch QUERY(a INT, b INT, c INT,
  offset TEXT, partition TEXT, timestamp TEXT, topicname TEXT))
AS
BEGIN
  INSERT INTO t(a, b, c, offset, partition, timestamp, topicname)
  SELECT a, b, c,
    CONVERT(offset, SIGNED),
    CONVERT(partition, SIGNED),
    CONVERT(timestamp, SIGNED),
    topicname
  FROM batch;
END;
-
Run a pipeline and retrieve the Kafka properties with the Stored Procedure.
CREATE OR REPLACE PIPELINE p AS
LOAD DATA kafka 'localhost:9092/jsontopic'
INTO PROCEDURE test_proc FORMAT JSON
(a <- json_a, b <- json_b, c <- json_c)
SET offset = get_kafka_pipeline_prop("prop_offset"),
partition = get_kafka_pipeline_prop("prop_partition"),
timestamp = get_kafka_pipeline_prop("prop_timestamp"),
topicname = get_kafka_pipeline_prop("prop_topicname");
-
Query the table t to view the typecasted properties.
SELECT * FROM t;
+------+------+------+--------+-----------+---------------+-----------+
| a    | b    | c    | offset | partition | timestamp     | topicname |
+------+------+------+--------+-----------+---------------+-----------+
|    4 |    5 |    6 |      1 |         0 | 1738318906369 | jsontopic |
|    1 |    2 |    3 |      0 |         0 | 1738317123940 | jsontopic |
+------+------+------+--------+-----------+---------------+-----------+
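The same casts can also be applied in client code. A small Python sketch, using values taken from the sample output above (Kafka timestamps are milliseconds since the Unix epoch):

```python
from datetime import datetime, timezone

# Properties are ingested as strings; cast before arithmetic,
# mirroring CONVERT(..., SIGNED) in the stored procedure.
offset = int("1")
partition = int("0")
timestamp_ms = int("1738318906369")

# Convert the millisecond epoch timestamp to a UTC datetime.
produced_at = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
print(offset, partition, produced_at.isoformat())
```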
Example: Retrieving Kafka Message Key
A Kafka message includes both a payload and a message key. Use the KAFKA KEY clause to extract the message key and store it in a table column as part of pipeline ingestion.
The following example creates a table and a pipeline that extracts the Kafka message key and Kafka message properties.
-
Create a new table t.
CREATE TABLE t (
  inp_json JSON,
  kafka_key TEXT,
  kafka_partition INT,
  kafka_offset BIGINT,
  kafka_timestamp BIGINT
);
-
Create and run a pipeline to ingest Kafka messages into the table t, including the message payload, message key, and Kafka metadata.
CREATE OR REPLACE PIPELINE p AS
LOAD DATA KAFKA 'localhost:9092/test_topic'
INTO TABLE t
FORMAT JSON (inp_json <- %)
KAFKA KEY (kafka_key <- %)
SET kafka_partition = get_kafka_pipeline_prop("prop_partition"),
kafka_offset = get_kafka_pipeline_prop("prop_offset"),
kafka_timestamp = get_kafka_pipeline_prop("prop_timestamp");
Note
-
Use KAFKA KEY to ingest the Kafka message key as part of the table data.
-
Use get_kafka_pipeline_prop() to retrieve Kafka metadata properties such as prop_offset, prop_partition, prop_timestamp, and prop_topicname.
-
The get_kafka_pipeline_prop() function returns Kafka properties as strings. Cast them to the appropriate data type if required.
Kafka Headers
Kafka headers are metadata key-value pairs that you can add to Kafka messages.
The get_kafka_pipeline_prop() function can be used to consume these headers.
For example, a producer may attach the following headers to a message:
message_headers = [
    ("Country", "USA"),
    ("City", "Arizona"),
    ("Language", "English"),
]
Enter the header key ("Country", "City", and "Language" in this example) in place of the "propname" argument of the get_kafka_pipeline_prop() function to ingest the headers.
For example:
-
Create a new table t.
CREATE TABLE t (info TEXT, country TEXT, city TEXT, language TEXT);
-
Create a pipeline to load data into this new table.
CREATE PIPELINE p AS
LOAD DATA KAFKA 'localhost:9092/topic_name'
INTO TABLE t (info)
SET country = get_kafka_pipeline_prop("Country"),
city = get_kafka_pipeline_prop("City"),
language = get_kafka_pipeline_prop("Language");
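The lookup this performs can be pictured with a small Python simulation (a hypothetical model, not the engine's implementation): the argument passed to get_kafka_pipeline_prop() is matched against the message's header keys, and the string value is returned.

```python
# Headers attached to an incoming message, as (key, value) pairs.
message_headers = [
    ("Country", "USA"),
    ("City", "Arizona"),
    ("Language", "English"),
]

def lookup_header(headers, propname):
    # Return the value for the first header whose key matches propname;
    # all header values are handled as strings.
    for key, value in headers:
        if key == propname:
            return value
    return None

print(lookup_header(message_headers, "City"))  # Arizona
```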
Note
-
Currently, this function only ingests Kafka header values that are in string format.
-
When a transform is used with the Kafka properties or headers request, the transform script must not remove the size prefix from the message for the get_kafka_pipeline_prop() function to work.
Last modified: March 19, 2026