This topic does not apply to SingleStore Managed Service.
A transform is one of three methods of shaping data with a pipeline.
This topic discusses important considerations for writing transforms and provides an example implementation of a Kafka transform.
When data is extracted from the source, it is streamed to the transform in bytes through the stdin communication channel. However, different extractors encode those bytes differently. For example, an S3 pipeline streams raw bytes from the extractor to the transform, while a Kafka pipeline streams byte length encoded data.
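To make the contrast concrete, here is a minimal Python 3 sketch of an identity transform for an extractor that streams raw bytes, such as S3. It assumes there is no framing at all, so it only copies stdin to stdout. The function name `passthrough` is hypothetical.

```python
import shutil
import sys


def passthrough(src=None, dst=None, chunk_size=64 * 1024):
    """Copy every byte from src to dst, defaulting to binary stdin/stdout.

    Assumption: the extractor (e.g. S3) streams raw, unframed bytes, so no
    length prefixes need to be parsed before writing the data back out.
    """
    src = src if src is not None else sys.stdin.buffer
    dst = dst if dst is not None else sys.stdout.buffer
    # Stream in fixed-size chunks so large objects are never held in memory.
    shutil.copyfileobj(src, dst, chunk_size)
```

A transform script built on this sketch could simply end with a bare `passthrough()` call.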
Kafka Pipeline Byte Length Encoding
For a Kafka pipeline, all data streamed to the transform is byte length encoded: each message is preceded by eight bytes that indicate the length of that message in bytes. Your transform should process the stdin stream as follows:
1. Read the first eight bytes to determine the byte length of the message data that follows.
2. Read the number of bytes indicated in Step 1, which is the actual message data. Convert it to a string and store it until the entire stdin stream has been read.
3. If the stream still contains more data, read the next eight bytes to determine the byte length of the next message, then repeat Step 2.
4. Continue repeating this process until all data from stdin has been read.
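The steps above can be sketched as a Python generator. This is an illustration, not SingleStore's reference code: the name `read_messages` is hypothetical, and it assumes the length prefix is an unsigned 8-byte integer in the host's native byte order (the `"=Q"` struct format), which matches what the `"L"` format in the example implementation in this topic decodes on 64-bit Linux. Unlike Step 2, it yields each message as soon as it is read instead of storing everything until stdin is exhausted.

```python
import struct

# Assumption: 8-byte unsigned length prefix in native byte order.
# "=Q" is always 8 bytes, regardless of platform.
LENGTH_PREFIX = struct.Struct("=Q")


def read_messages(stream):
    """Yield each length-prefixed message from a binary stream."""
    while True:
        # Step 1: read the 8-byte length prefix.
        header = stream.read(LENGTH_PREFIX.size)
        if not header:
            return  # Clean end of stream: no more messages.
        if len(header) != LENGTH_PREFIX.size:
            raise ValueError("truncated length prefix: %r" % header)
        (length,) = LENGTH_PREFIX.unpack(header)
        # Step 2: read exactly `length` bytes of message data.
        payload = stream.read(length)
        if len(payload) != length:
            raise ValueError("expected %d bytes, got %d" % (length, len(payload)))
        # Steps 3 and 4: hand back this message; the loop handles the next one.
        yield payload
```

If your logic needs strings rather than bytes, call `.decode("utf-8")` on each message, assuming the payloads are UTF-8.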
The transform must write data to stdout in the format specified in the CREATE PIPELINE statement. For example, it may need to write CSV with the specified FIELDS TERMINATED BY string, or “raw stream” Avro with the specified schema. The bytes written to stdout are interpreted as if by a LOAD DATA statement with the same options as those given to the CREATE PIPELINE statement.
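As a sketch of the CSV case, the hypothetical helper below renders rows as delimited text, assuming a pipeline declared with a given FIELDS TERMINATED BY string and newline-terminated lines. It performs no quoting or escaping and rejects fields that would need it, so treat it as a starting point rather than a complete LOAD DATA-compatible writer.

```python
def format_rows(rows, terminator=","):
    """Render rows as newline-terminated, delimited UTF-8 bytes.

    Hypothetical helper: `terminator` is assumed to match the pipeline's
    FIELDS TERMINATED BY string. No escaping is done, so a field containing
    the terminator or a newline raises ValueError instead.
    """
    lines = []
    for row in rows:
        fields = [str(value) for value in row]
        for field in fields:
            if terminator in field or "\n" in field:
                raise ValueError("field needs escaping: %r" % field)
        lines.append(terminator.join(fields) + "\n")
    return "".join(lines).encode("utf-8")
```

A transform would then write the result to binary stdout, for example with `sys.stdout.buffer.write(format_rows(rows, "|"))` for a pipeline using `FIELDS TERMINATED BY '|'`.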
Transactional guarantees apply only to data written to stdout. There are no transactional guarantees for any side effects coded in the transform logic.
Other Considerations for Writing Transforms
The CREATE PIPELINE … WITH TRANSFORM topic discusses additional considerations for writing transforms.
Example Implementation of a Kafka Transform
The following transform is written in Python. It is an identity transform: it reads CSV-formatted data from the extractor and passes it through unmodified. For each message, it reads the first eight bytes of stdin to determine the length of the message, and then writes the contents of the message to stdout.
The code example works for both Python 2 and Python 3.
At the top of your transform file, use a shebang to specify the interpreter to use to execute the script (Python in this case).
```python
#!/usr/bin/env python
import struct
import sys

binary_stdin = sys.stdin if sys.version_info < (3, 0) else sys.stdin.buffer
binary_stderr = sys.stderr if sys.version_info < (3, 0) else sys.stderr.buffer
binary_stdout = sys.stdout if sys.version_info < (3, 0) else sys.stdout.buffer


def input_stream():
    """
    Consume STDIN and yield each record that is received from SingleStore DB
    """
    while True:
        byte_len = binary_stdin.read(8)
        if len(byte_len) == 8:
            # struct.unpack returns a tuple, so take its single element.
            # "L" is a native unsigned long: 8 bytes on 64-bit Linux.
            byte_len = struct.unpack("L", byte_len)[0]
            result = binary_stdin.read(byte_len)
            yield result
        else:
            assert len(byte_len) == 0, byte_len
            return


def log(message):
    """
    Log an informational message to stderr which will show up in SingleStore DB
    in the event of transform failure.
    """
    binary_stderr.write(message + b"\n")


def emit(message):
    """
    Emit a record back to SingleStore DB by writing it to STDOUT. The record
    should be formatted as JSON, Avro, or CSV as it will be parsed by LOAD DATA.
    """
    binary_stdout.write(message + b"\n")


log(b"Begin transform")

# We start the transform here by reading from the input_stream() iterator.
for data in input_stream():
    # Since this is an identity transform we just emit what we receive.
    emit(data)

log(b"End transform")
```
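To try a Kafka transform locally before attaching it to a pipeline, you can generate length-encoded input yourself. The following sketch is a test helper, not part of SingleStore: `frame_messages`, `write_sample_input`, and the sample records are hypothetical, and the `"=Q"` prefix (8 bytes, native byte order) is assumed to match what `struct.unpack("L", ...)` decodes on 64-bit Linux.

```python
import struct
import sys

# Assumption: same bytes as struct.pack("L", n) on 64-bit Linux.
LENGTH_PREFIX = struct.Struct("=Q")


def frame_messages(messages):
    """Encode byte strings in the 8-byte length-prefixed form used by Kafka pipelines."""
    return b"".join(LENGTH_PREFIX.pack(len(m)) + m for m in messages)


def write_sample_input(out=None):
    """Write two hypothetical CSV records, framed, to `out` (binary stdout by default)."""
    out = out if out is not None else sys.stdout.buffer
    out.write(frame_messages([b"1,alice", b"2,bob"]))
    out.flush()
```

For example, saving this as `make_input.py` (a hypothetical file name) with a final `write_sample_input()` call, then running `python make_input.py | python transform.py`, pipes two framed records into the transform so you can check that each one comes back on its own line.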
Error Logging Upon Transform Failure
If a transform fails, the contents of stderr appear in information_schema.PIPELINES_ERRORS. Here, failure means that the transform returned a non-zero exit status to the operating system.
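One way to make failures easy to diagnose is to catch errors, write a traceback to stderr, and exit with a non-zero status. The sketch below is an identity transform under stated assumptions: `run` is a hypothetical name, and the 8-byte length prefix is assumed to be an unsigned integer in native byte order (`"=Q"`).

```python
import struct
import traceback


def run(stdin, stdout, stderr):
    """Identity transform that reports failures on stderr.

    Returns the exit status: 0 on success, 1 on failure. A non-zero status
    marks the transform as failed, so the traceback written to stderr is
    what would surface in information_schema.PIPELINES_ERRORS.
    """
    try:
        while True:
            header = stdin.read(8)
            if not header:
                return 0  # All messages consumed.
            if len(header) != 8:
                raise ValueError("truncated length prefix")
            (length,) = struct.unpack("=Q", header)
            payload = stdin.read(length)
            if len(payload) != length:
                raise ValueError("message shorter than its length prefix")
            stdout.write(payload + b"\n")
    except Exception:
        # Write the full traceback to stderr for later inspection.
        stderr.write(traceback.format_exc().encode("utf-8"))
        return 1
```

A script would end with `sys.exit(run(sys.stdin.buffer, sys.stdout.buffer, sys.stderr.buffer))` so that the returned status becomes the process exit code.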
Modifying the Transform
If you modify the transform, you can reload it by running ALTER PIPELINE … RELOAD TRANSFORM.