Writing a Transform to Use with a Pipeline

A transform is one of the Methods for Data Shaping with Pipelines.

This topic discusses important considerations for writing transforms and provides an example implementation of a Kafka transform.

Reading from stdin

When data is extracted from the source, it is streamed to the transform as bytes via stdin. However, the bytes are encoded differently depending on the extractor. For example, while an S3 pipeline streams raw bytes from the extractor to the transform, a Kafka pipeline streams byte-length encoded data.

Kafka Pipeline Byte Length Encoding

For a Kafka pipeline, any data streamed to the transform is byte-length encoded: the first eight bytes of the stream indicate the length, in bytes, of the message that follows. Your transform should process the stdin stream in the following way:

  1. Read the first eight bytes to determine the byte length of the message data that follows.

  2. Read the number of bytes indicated by Step 1 (the message data), convert the data to a string, and store it until the entire stdin stream has been read.

  3. If the stream contains more data, read eight more bytes to determine the byte length of the next message, then repeat Step 2.

  4. Continue this process until all data from stdin has been read.
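
For illustration, here is a minimal sketch of Steps 1 and 2; the full read loop appears in the example implementation later in this topic. The "L" struct format assumes a platform where a native unsigned long is eight bytes, such as 64-bit Linux, matching the example below.

import struct
import sys

# Step 1: read the eight-byte header that encodes the message length.
# "L" assumes a native unsigned long of eight bytes (64-bit Linux).
header = sys.stdin.buffer.read(8)
byte_len = struct.unpack("L", header)[0]

# Step 2: read exactly that many bytes of message data.
message = sys.stdin.buffer.read(byte_len)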

Writing to stdout

The transform must write data to stdout in the format specified in the CREATE PIPELINE statement. For example, it may need to write CSV with the specified FIELDS TERMINATED BY string, or Avro records that conform to the specified schema. The bytes written to stdout are interpreted as if by a LOAD DATA statement with the same options as those given in the CREATE PIPELINE statement.
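
For example, a transform feeding a pipeline created with FIELDS TERMINATED BY ',' might emit two-column CSV rows as follows. This is a minimal sketch: the rows variable stands in for data that a real transform would derive from stdin.

#!/usr/bin/env python3
import sys

# Hypothetical records; a real transform derives these from stdin.
rows = [("1", "red"), ("2", "blue")]

for row in rows:
    # Match the FIELDS TERMINATED BY ',' option of the CREATE PIPELINE
    # statement; each line becomes one record for LOAD DATA.
    sys.stdout.write(",".join(row) + "\n")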

Note

Transactional guarantees apply only to data written to stdout. There are no transactional guarantees for any side effects coded into the transform logic.

Other Considerations for Writing Transforms

The CREATE PIPELINE ... WITH TRANSFORM topic discusses additional considerations for writing transforms.

Example Implementation of a Kafka Transform

The following transform is written in Python and reads CSV-formatted data from an extractor without modifying it. This transform reads the first eight bytes of stdin to determine the length of the message, and then simply writes the contents of the message to stdout.

Note

The code example works for both Python 2 and Python 3. At the top of your transform file, use a shebang to specify the interpreter to use to execute the script (Python in this case).

#!/usr/bin/env python

import struct
import sys

binary_stdin = sys.stdin if sys.version_info < (3, 0) else sys.stdin.buffer
binary_stderr = sys.stderr if sys.version_info < (3, 0) else sys.stderr.buffer
binary_stdout = sys.stdout if sys.version_info < (3, 0) else sys.stdout.buffer


def input_stream():
    """
    Consume STDIN and yield each record that is received from SingleStore
    """
    while True:
        byte_len = binary_stdin.read(8)
        if len(byte_len) == 8:
            byte_len = struct.unpack("L", byte_len)[0]
            result = binary_stdin.read(byte_len)
            yield result
        else:
            assert len(byte_len) == 0, byte_len
            return


def log(message):
    """
    Log an informational message to stderr which will show up in SingleStore in
    the event of transform failure.
    """
    binary_stderr.write(message + b"\n")


def emit(message):
    """
    Emit a record back to SingleStore by writing it to STDOUT. The record
    should be formatted as JSON, Avro, or CSV as it will be parsed by
    LOAD DATA.
    """
    binary_stdout.write(message + b"\n")


log(b"Begin transform")

# We start the transform here by reading from the input_stream() iterator.
for data in input_stream():
    # Since this is an identity transform we just emit what we receive.
    emit(data)

log(b"End transform")

Example Implementation of a Python Transform

The following transform is written in Python; it reads plain-text data from an extractor and modifies it.

This example uses a text file named colors.txt with the following contents:

colors.txt
celadon
fuchsia
charcoal
aquamarine
crimson
ivory
coral
ultramarine
green
orange

Create a new table for the pipeline data.

CREATE TABLE colors(name TEXT);

In the Python file below, it is important to set the shebang (the first line of the file) to match the location of the Python interpreter.

In this example, each word in the text file is reversed.

#!/usr/bin/python3
import sys

# Reverse each word, stripping the trailing newline first so that it is not
# moved to the front of the record, then re-appending it so LOAD DATA still
# sees one record per line.
for line in sys.stdin:
    sys.stdout.write(line.rstrip("\n")[::-1] + "\n")

Create a pipeline using the text and Python files to transform the data.

CREATE PIPELINE color_list AS LOAD DATA FS '/tmp/colors.txt'
WITH TRANSFORM ('file:///tmp/reverse.py', '', '')
INTO TABLE colors FIELDS TERMINATED BY ',';

Start the pipeline.

START PIPELINE color_list;
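
Alternatively, while testing a transform, you can run the pipeline synchronously in the foreground so that any errors are reported directly to the client:

START PIPELINE color_list FOREGROUND;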

View the contents of the table to verify the transform worked.

SELECT * FROM colors;
+-------------+
| name        |
+-------------+
| nodalec     |
| neerg       |
| yrovi       |
| aishcuf     |
| eniramauqa  |
| laroc       |
| eniramartlu |
| egnaro      |
| laocrahc    |
| nosmirc     |
+-------------+

Error Logging Upon Transform Failure

If a transform fails, the contents of stderr appear in the information_schema.PIPELINES_ERRORS table. Here, failure means the transform returned a non-zero exit status to the operating system.
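
For example, you can inspect the error messages, which include the transform's stderr output, by querying the information_schema database:

SELECT * FROM information_schema.PIPELINES_ERRORS;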

Modifying the Transform

If you modify the transform, you can reload it by running ALTER PIPELINE ... RELOAD TRANSFORM.
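
For the example pipeline above, the statement would be:

ALTER PIPELINE color_list RELOAD TRANSFORM;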
