Retrieve Kafka Properties
A Kafka message has multiple properties, such as offset, partition, timestamp, and topic name. Use the get_kafka_pipeline_prop() function to retrieve these properties for pipelines that ingest data in CSV, JSON, and Avro formats.
Note
All Kafka properties are received as strings.
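The documentation below walks through JSON pipelines; as a minimal sketch, a CSV pipeline can capture a property the same way. The topic name csvtopic, table t2, and pipeline p2 here are placeholder names, and the sketch assumes a comma-delimited topic:

CREATE TABLE t2(a INT, b TEXT, offset TEXT);

CREATE OR REPLACE PIPELINE p2 AS
LOAD DATA KAFKA 'localhost:9092/csvtopic'
INTO TABLE t2
FIELDS TERMINATED BY ','
(a, b)
SET offset = get_kafka_pipeline_prop("prop_offset");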
Retrieving Kafka Properties from a JSON Pipeline
The following example creates a new table t and a pipeline that retrieves the four predefined properties prop_offset, prop_partition, prop_timestamp, and prop_topicname for each Kafka message:
-
Create a new table t.

CREATE TABLE t(a INT, b JSON, c INT, offset TEXT, partition TEXT, timestamp TEXT, topicname TEXT);
-
Create a pipeline to load data into this new table.

CREATE OR REPLACE PIPELINE p AS
LOAD DATA KAFKA 'localhost:9092/jsontopic'
INTO TABLE t
FORMAT JSON
(a <- json_a, b <- json_b, c <- json_c)
SET offset = get_kafka_pipeline_prop("prop_offset"),
    partition = get_kafka_pipeline_prop("prop_partition"),
    timestamp = get_kafka_pipeline_prop("prop_timestamp"),
    topicname = get_kafka_pipeline_prop("prop_topicname");
Note
-
SingleStore supports four predefined properties with names: prop_offset, prop_partition, prop_timestamp, and prop_topicname.
-
To support Kafka properties or headers, the size of the Kafka message is prepended to the actual message. Do not remove this size from the message while using transforms with the Kafka properties request.
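Once created, the pipeline can be started and the ingested properties inspected. A brief usage sketch, assuming messages are already being produced to jsontopic:

START PIPELINE p;

SELECT offset, partition, timestamp, topicname FROM t;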
Example: Retrieving Kafka Properties with a Stored Procedure
The following example shows how to retrieve Kafka properties with a Stored Procedure. The CONVERT clause is also used to typecast the properties into usable data types.
-
Create a new table t.

CREATE TABLE t(a INT, b INT, c INT, offset BIGINT, partition BIGINT, timestamp BIGINT, topicname TEXT);
-
Create a Stored Procedure with the CONVERT clause to typecast the Kafka properties.

CREATE OR REPLACE PROCEDURE test_proc(batch QUERY(a INT, b INT, c INT, offset TEXT, partition TEXT, timestamp TEXT, topicname TEXT))
AS
BEGIN
    INSERT INTO t(a, b, c, offset, partition, timestamp, topicname)
    SELECT a, b, c,
        CONVERT(offset, SIGNED),
        CONVERT(partition, SIGNED),
        CONVERT(timestamp, SIGNED),
        topicname
    FROM batch;
END;
-
Run a pipeline and retrieve the Kafka properties with the Stored Procedure.

CREATE OR REPLACE PIPELINE p AS
LOAD DATA KAFKA 'localhost:9092/jsontopic'
INTO PROCEDURE test_proc
FORMAT JSON
(a <- json_a, b <- json_b, c <- json_c)
SET offset = get_kafka_pipeline_prop("prop_offset"),
    partition = get_kafka_pipeline_prop("prop_partition"),
    timestamp = get_kafka_pipeline_prop("prop_timestamp"),
    topicname = get_kafka_pipeline_prop("prop_topicname");
-
Query the table t to view the typecasted properties.

SELECT * FROM t;
+------+------+------+--------+-----------+---------------+-----------+
| a    | b    | c    | offset | partition | timestamp     | topicname |
+------+------+------+--------+-----------+---------------+-----------+
|    4 |    5 |    6 |      1 |         0 | 1738318906369 | jsontopic |
|    1 |    2 |    3 |      0 |         0 | 1738317123940 | jsontopic |
+------+------+------+--------+-----------+---------------+-----------+
Kafka Headers
Kafka headers are metadata key-value pairs that can be added to Kafka messages.
The get_kafka_pipeline_prop() function can be used to consume these headers. For example, consider a message produced with the following headers:
message_headers = [
    ("Country", "USA"),
    ("City", "Arizona"),
    ("Language", "English"),
]
Enter the header key ("Country", "City", and "Language" in this example) instead of the "propname" in the argument of the get_kafka_pipeline_prop() function to ingest the headers. For example:
-
Create a new table t.

CREATE TABLE t (info TEXT, country TEXT, city TEXT, language TEXT);
-
Create a pipeline to load data into this new table.

CREATE PIPELINE p AS
LOAD DATA KAFKA 'localhost:9092/topic_name'
INTO TABLE t
(info)
SET country = get_kafka_pipeline_prop("Country"),
    city = get_kafka_pipeline_prop("City"),
    language = get_kafka_pipeline_prop("Language");
Note
-
Currently, this function only ingests Kafka header values that are in string format.
-
If a transform is used with the Kafka properties or headers request, the transform script must not remove the size from the message for the get_kafka_pipeline_prop("propname") function to work.
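A brief usage sketch for the header example above, assuming messages carrying these headers are produced to topic_name:

START PIPELINE p;

SELECT info, country, city, language FROM t;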