Writing a Transform to Use with a Pipeline
A transform is one of the methods of shaping data with a pipeline; see Methods for Data Shaping with Pipelines for the others.
This topic discusses important considerations for writing transforms and provides an example implementation of a Kafka transform.
When data is extracted from the source, it is streamed to the transform as bytes via the stdin communication channel.
For a Kafka pipeline, any data streamed to the transform is byte-length encoded: the first eight bytes of the stream indicate the length of the message in bytes.
A transform should read the stdin stream in the following way:
1. Read the first eight bytes to determine the byte length of the actual message data that follows.
2. Read the number of bytes indicated by Step 1, which is the actual message data, then convert it to a string and store it until the entire stdin stream has been read.
3. If the stream still contains more data, read eight more bytes to determine the byte length of the next message, then repeat Step 2.
4. Continue repeating this process until all data from stdin has been read.
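The reading steps above can be sketched as a small generator that works on any binary stream, which makes the framing easy to try out without a running pipeline. This is a minimal sketch rather than a reference implementation: `read_messages` and `frame` are hypothetical helper names, and the length prefix is assumed to be an unsigned 64-bit integer in little-endian byte order (the native layout on x86-64 Linux).

```python
import io
import struct

# Assumption: the 8-byte prefix is an unsigned 64-bit little-endian integer.
LENGTH_PREFIX = struct.Struct("<Q")


def frame(message):
    """Prefix a message (bytes) with its 8-byte length, for building test input."""
    return LENGTH_PREFIX.pack(len(message)) + message


def read_messages(stream):
    """Yield each length-prefixed message from a binary stream as a string."""
    while True:
        prefix = stream.read(LENGTH_PREFIX.size)
        if not prefix:
            return  # clean end of stream: no more messages
        if len(prefix) != LENGTH_PREFIX.size:
            raise ValueError("truncated length prefix")
        (length,) = LENGTH_PREFIX.unpack(prefix)
        payload = stream.read(length)
        if len(payload) != length:
            raise ValueError("truncated message")
        yield payload.decode("utf-8")


if __name__ == "__main__":
    # Build a two-message sample stream in memory and read it back.
    sample = io.BytesIO(frame(b"1,red") + frame(b"2,green"))
    for message in read_messages(sample):
        print(message)
```

In a real transform the stream would be the binary form of stdin (`sys.stdin.buffer` in Python 3) instead of the in-memory sample.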
The transform must write data to stdout in the format specified in the CREATE PIPELINE statement. For example, it may need to write delimited text using the specified FIELDS TERMINATED BY string, or it may need to write raw stream Avro with the specified schema.
Data written to stdout will be interpreted as if by a LOAD DATA statement with the same options as those given to the CREATE PIPELINE statement.
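As an illustration of matching the output to the pipeline's options, the sketch below serializes rows as delimited text with a configurable field terminator. `format_rows` is a hypothetical helper and the comma default is only an assumption; the terminator should be whatever FIELDS TERMINATED BY string the pipeline specifies. Note that Python's `csv` module accepts only a single-character delimiter and quotes fields containing the delimiter with double quotes, which the pipeline's options would need to account for.

```python
import csv
import io
import sys


def format_rows(rows, field_terminator=","):
    """Serialize rows as delimited text, one record per line."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter=field_terminator, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


if __name__ == "__main__":
    # Write two sample records to stdout, as a transform would.
    sys.stdout.write(format_rows([(1, "red"), (2, "green")]))
```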
Transactional guarantees apply to data written to stdout.
The following transform is written in Python and reads CSV-formatted data from an extractor without modifying it.
It reads the first eight bytes of stdin to determine the length of the message, and then simply writes the contents of the message to stdout.
The code example works for both Python 2 and Python 3.
#!/usr/bin/env python

import struct
import sys

binary_stdin = sys.stdin if sys.version_info < (3, 0) else sys.stdin.buffer
binary_stderr = sys.stderr if sys.version_info < (3, 0) else sys.stderr.buffer
binary_stdout = sys.stdout if sys.version_info < (3, 0) else sys.stdout.buffer


def input_stream():
    """
    Consume STDIN and yield each record that is received from SingleStoreDB
    """
    while True:
        byte_len = binary_stdin.read(8)
        if len(byte_len) == 8:
            # "L" is the native unsigned long (8 bytes on 64-bit Linux).
            # struct.unpack returns a tuple, so take its first element.
            byte_len = struct.unpack("L", byte_len)[0]
            result = binary_stdin.read(byte_len)
            yield result
        else:
            assert len(byte_len) == 0, byte_len
            return


def log(message):
    """
    Log an informational message to stderr which will show up in SingleStoreDB in
    the event of transform failure.
    """
    binary_stderr.write(message + b"\n")


def emit(message):
    """
    Emit a record back to SingleStoreDB by writing it to STDOUT. The record
    should be formatted as JSON, Avro, or CSV as it will be parsed by
    LOAD DATA.
    """
    binary_stdout.write(message + b"\n")


log(b"Begin transform")

# We start the transform here by reading from the input_stream() iterator.
for data in input_stream():
    # Since this is an identity transform we just emit what we receive.
    emit(data)

log(b"End transform")
The following transform is written in Python and reads TXT-formatted data from an extractor and modifies it.
The example uses a text file named "colors" (referenced in the pipeline below as /tmp/colors.txt) that contains one word per line:
celadon
fuchsia
charcoal
aquamarine
crimson
ivory
coral
ultramarine
green
orange
Create a new table for the pipeline data.
CREATE TABLE colors(name TEXT);
In the Python file below, it is important to set the shebang (first line in the file) to match the location of the Python interpreter.
In this example, the words in the text file are going to be reversed.
#!/usr/bin/python3
import sys

for line in sys.stdin:
    # Reverse only the word; keep the line terminator at the end of the line.
    sys.stdout.write(line.rstrip("\n")[::-1] + "\n")
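One detail to watch when reversing lines read from stdin: each line includes its trailing newline, so a plain `line[::-1]` moves the newline to the front of the reversed text. The sketch below shows one way to keep the terminator in place; `reverse_line` is a hypothetical helper name used only for illustration.

```python
import sys


def reverse_line(line):
    """Reverse the text of a line, keeping a trailing newline (if any) at the end."""
    if line.endswith("\n"):
        return line[:-1][::-1] + "\n"
    return line[::-1]


if __name__ == "__main__":
    # Demonstrate on two sample lines rather than on real pipeline input.
    for sample in ["green\n", "coral\n"]:
        sys.stdout.write(reverse_line(sample))
```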
Create a pipeline using the text and Python files to transform the data.
CREATE PIPELINE color_list AS
LOAD DATA FS '/tmp/colors.txt'
WITH TRANSFORM ('file:///tmp/reverse.py', '', '')
INTO TABLE colors
FIELDS TERMINATED BY ',';
Start the pipeline.
START PIPELINE color_list;
View the contents of the table to verify the transform worked.
SELECT * FROM colors;
+-------------+
| name        |
+-------------+
| nodalec     |
| neerg       |
| yrovi       |
| aishcuf     |
| eniramauqa  |
| laroc       |
| eniramartlu |
| egnaro      |
| laocrahc    |
| nosmirc     |
+-------------+
If a transform fails, the contents of stderr will appear in PIPELINES_
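Because stderr is what surfaces after a failure, it helps to write a useful diagnostic there before the transform exits. The following is a minimal sketch under stated assumptions: `run_transform` is a hypothetical wrapper, and it assumes that a non-zero exit status is how the transform signals failure, which this page does not state explicitly.

```python
import io
import sys
import traceback


def run_transform(transform, source, sink, errors):
    """Apply transform to each input line; on error, report to errors and return 1."""
    try:
        for line in source:
            sink.write(transform(line))
    except Exception:
        # Send the full traceback to the error stream so the cause is visible.
        errors.write("transform failed:\n" + traceback.format_exc())
        return 1
    return 0


if __name__ == "__main__":
    # Demonstration on in-memory input. A real transform would instead call
    # sys.exit(run_transform(my_transform, sys.stdin, sys.stdout, sys.stderr)),
    # where my_transform is your own line-level function.
    demo_input = io.StringIO("green\ncoral\n")
    run_transform(str.upper, demo_input, sys.stdout, sys.stderr)
```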
If you modify the transform, you can reload it by running ALTER PIPELINE RELOAD TRANSFORM.
Last modified: September 21, 2023