Writing a Transform to Use with a Pipeline
A transform is one of several methods of shaping data with a pipeline; see Methods for Data Shaping with Pipelines.
This topic discusses important considerations for writing transforms and provides an example implementation of a Kafka transform.
Reading from stdin
When data is extracted from the source, it is streamed to the transform as bytes via the stdin communication channel.
Kafka Pipeline Byte Length Encoding
For a Kafka pipeline, any data streamed to the transform is byte length encoded: each message is preceded by eight bytes that indicate its length in bytes. The transform should read the stdin stream in the following way:
1. Read the first eight bytes to determine the byte length of the actual message data that follows.
2. Read the number of bytes indicated by Step 1, which is the actual message data, then convert it to a string and store it until the entire stdin stream has been read.
3. If the stream still contains more data, read eight more bytes to determine the byte length of the next message, then repeat Step 2.
4. Continue repeating this process until all data from stdin has been read.
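The four steps above can be sketched as a small reader. This is a minimal sketch, assuming the length prefix is an unsigned 64-bit integer in the host's native byte order (the same assumption the example transform's struct format makes); read_messages and LENGTH_PREFIX are hypothetical names. It yields raw bytes, so decode each message (for example as UTF-8, also an assumption about your data) if you need strings.

```python
import io
import struct

# Assumption: an unsigned 64-bit length prefix in native byte order.
LENGTH_PREFIX = struct.Struct("=Q")


def read_messages(stream):
    """Yield each length-prefixed message from a binary stream, in order."""
    while True:
        # Step 1 (and Step 3 on later passes): read the 8-byte length prefix.
        header = stream.read(LENGTH_PREFIX.size)
        if not header:
            # Step 4: no data left, so the whole stream has been read.
            return
        if len(header) != LENGTH_PREFIX.size:
            raise ValueError("truncated length prefix: %r" % header)
        (length,) = LENGTH_PREFIX.unpack(header)
        # Step 2: read exactly `length` bytes of message data.
        payload = stream.read(length)
        if len(payload) != length:
            raise ValueError("expected %d bytes, got %d" % (length, len(payload)))
        yield payload


if __name__ == "__main__":
    # Build a two-message stream in memory to exercise the reader.
    data = b"".join(LENGTH_PREFIX.pack(len(m)) + m for m in (b"a,1", b"b,2"))
    print([m.decode("utf-8") for m in read_messages(io.BytesIO(data))])  # prints ['a,1', 'b,2']
```

In a real transform, the stream argument would be the binary stdin (sys.stdin.buffer in Python 3), whose read(n) blocks until n bytes arrive or the stream ends.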
Writing to stdout
The transform must write data to stdout in the format specified in the CREATE PIPELINE statement. For example, the transform may need to write delimited data whose fields are separated by the FIELDS TERMINATED BY string, or it may need to write raw stream Avro with the specified schema. Data written to stdout will be interpreted as if by a LOAD DATA statement with the same options as those given to the CREATE PIPELINE statement.
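Writing delimited output can be sketched as follows. This is a minimal sketch, assuming the pipeline uses FIELDS TERMINATED BY ',' and the default newline line terminator (adjust both if your CREATE PIPELINE statement sets different options); format_row is a hypothetical helper. Rather than guess at the escaping rules LOAD DATA would apply, it rejects values that contain a separator.

```python
import sys


def format_row(fields, terminator=","):
    """Join fields into one line, separated by the FIELDS TERMINATED BY string.

    Assumes a newline line terminator. Values that contain the field
    terminator or a newline are rejected instead of escaped.
    """
    values = [str(field) for field in fields]
    for value in values:
        if terminator in value or "\n" in value:
            raise ValueError("field contains a separator: %r" % value)
    return terminator.join(values) + "\n"


if __name__ == "__main__":
    # Writes "ivory,3" followed by a newline to stdout.
    sys.stdout.write(format_row(["ivory", 3]))
```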
Note
Transactional guarantees apply only to data written to stdout.
Other Considerations for Writing Transforms
The CREATE PIPELINE topic describes additional considerations for writing transforms.
Example Implementation of a Kafka Transform
The following transform is written in Python and reads CSV-formatted data from an extractor without modifying it. It reads the first eight bytes from stdin to determine the length of the message, and then simply writes the contents of the message to stdout.
Note
The code example works for both Python 2 and Python 3.
#!/usr/bin/env python

import struct
import sys

binary_stdin = sys.stdin if sys.version_info < (3, 0) else sys.stdin.buffer
binary_stderr = sys.stderr if sys.version_info < (3, 0) else sys.stderr.buffer
binary_stdout = sys.stdout if sys.version_info < (3, 0) else sys.stdout.buffer

def input_stream():
    """
    Consume STDIN and yield each record that is received from SingleStore
    """
    while True:
        byte_len = binary_stdin.read(8)
        if len(byte_len) == 8:
            # Decode the 8-byte length prefix as an unsigned integer in native
            # byte order ("=Q" is always 8 bytes, unlike the platform-dependent "L").
            byte_len = struct.unpack("=Q", byte_len)[0]
            result = binary_stdin.read(byte_len)
            yield result
        else:
            assert len(byte_len) == 0, byte_len
            return

def log(message):
    """
    Log an informational message to stderr which will show up in SingleStore in
    the event of transform failure.
    """
    binary_stderr.write(message + b"\n")

def emit(message):
    """
    Emit a record back to SingleStore by writing it to STDOUT. The record
    should be formatted as JSON, Avro, or CSV as it will be parsed by
    LOAD DATA.
    """
    binary_stdout.write(message + b"\n")

log(b"Begin transform")

# We start the transform here by reading from the input_stream() iterator.
for data in input_stream():
    # Since this is an identity transform we just emit what we receive.
    emit(data)

log(b"End transform")
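To try the identity transform above without a Kafka pipeline, you can generate length-prefixed input yourself. This is a minimal sketch, assuming the prefix is an unsigned 64-bit integer in native byte order, which matches the example transform's struct format on a 64-bit Linux host; frame and the sample records are hypothetical, and the main block uses sys.stdout.buffer, so it needs Python 3.

```python
import struct
import sys


def frame(records):
    """Prefix each record with its 8-byte length, as the transform expects.

    Assumption: native byte order, matching the example transform's struct
    format on a 64-bit Linux host.
    """
    return b"".join(struct.pack("=Q", len(record)) + record for record in records)


if __name__ == "__main__":
    # Hypothetical sample records in CSV form.
    sample = [b"red,1", b"green,2", b"blue,3"]
    # sys.stdout.buffer is the binary stdout stream in Python 3.
    sys.stdout.buffer.write(frame(sample))
```

Saved under a hypothetical name such as make_test_input.py, it can be piped into the transform (saved, say, as kafka_transform.py) with `python3 make_test_input.py | python3 kafka_transform.py`, which should print the three records one per line, with the Begin and End log messages on stderr.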
Example Implementation of a Python Transform
The following transform is written in Python and reads TXT-formatted data from an extractor and modifies it.
This example uses a text file, /tmp/colors.txt, with the following contents:
colors.txt
celadon
fuchsia
charcoal
aquamarine
crimson
ivory
coral
ultramarine
green
orange
Create a new table for the pipeline data.
CREATE TABLE colors(name TEXT);
In the Python file below, saved as /tmp/reverse.py, it is important to set the shebang (the first line in the file) to match the location of the Python interpreter.
This transform reverses the characters of each word in the text file.
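#!/usr/bin/python3
import sys

# Reverse each line's text, keeping the newline at the end instead of
# moving it to the front (which plain line[::-1] would do).
for line in sys.stdin:
    sys.stdout.write(line.rstrip("\n")[::-1] + "\n")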
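Before creating the pipeline, the reversal logic can be checked locally. Reversing a whole line with line[::-1] also moves its trailing newline to the front, so this sketch reverses only the text and re-appends the newline; reverse_line is a hypothetical helper name, and the sample values come from colors.txt.

```python
def reverse_line(line):
    """Reverse a line's characters, keeping any trailing newline at the end."""
    body = line.rstrip("\n")
    newline = line[len(body):]
    return body[::-1] + newline


if __name__ == "__main__":
    colors = ["celadon\n", "fuchsia\n", "charcoal\n", "orange\n"]
    for color in colors:
        # Prints nodalec, aishcuf, laocrahc, egnaro on separate lines.
        print(reverse_line(color), end="")
```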
Create a pipeline using the text and Python files to transform the data.
CREATE PIPELINE color_list AS
LOAD DATA FS '/tmp/colors.txt'
WITH TRANSFORM ('file:///tmp/reverse.py', '', '')
INTO TABLE colors
FIELDS TERMINATED BY ',';
Start the pipeline.
START PIPELINE color_list;
View the contents of the table to verify the transform worked.
SELECT * FROM colors;
+-------------+
| name        |
+-------------+
| nodalec     |
| neerg       |
| yrovi       |
| aishcuf     |
| eniramauqa  |
| laroc       |
| eniramartlu |
| egnaro      |
| laocrahc    |
| nosmirc     |
+-------------+
Error Logging Upon Transform Failure
If a transform fails, the contents of stderr will appear in PIPELINES_
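A transform can keep its diagnostics out of the loaded data by writing them to stderr, since only stdout is parsed as LOAD DATA input. This is a minimal sketch, assuming that a non-zero exit status is what marks the transform as failed; run and its empty-record check are hypothetical stand-ins for whatever validation your data needs.

```python
import io
import sys


def run(lines, out, err):
    """Copy records to `out`; report the first empty record to `err`.

    Returns a process exit status: 0 on success, 1 on failure.
    """
    for number, line in enumerate(lines, start=1):
        if not line.strip():
            # Diagnostics go to stderr so they never reach LOAD DATA.
            err.write("empty record at line %d\n" % number)
            return 1
        out.write(line)
    return 0


if __name__ == "__main__":
    # Demo with in-memory input. A real transform would instead call
    # sys.exit(run(sys.stdin, sys.stdout, sys.stderr)).
    status = run(io.StringIO("ivory\n\ncoral\n"), sys.stdout, sys.stderr)
    print("exit status: %d" % status, file=sys.stderr)
```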
Modifying the Transform
If you modify the transform, reload it by running ALTER PIPELINE RELOAD TRANSFORM.
Last modified: October 19, 2023