Writing a Transform to Use With a Pipeline


Info

This topic does not apply to SingleStore Managed Service.

A transform is one of three methods of shaping data with a pipeline.

This topic discusses important considerations for writing transforms and provides an example implementation of a Kafka transform.

Reading from stdin

When data is extracted from the source, it is streamed to the transform as bytes over stdin. However, the bytes are encoded differently depending on the extractor. For example, an S3 pipeline streams raw bytes from the extractor to the transform, while a Kafka pipeline streams byte length encoded data.
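For an extractor that streams raw bytes, such as S3, stdin can be read like an ordinary binary file. The following is a minimal sketch that assumes Python 3 and input with no framing at all; the function name `transform_raw` and the choice to strip trailing whitespace from each line are hypothetical.

```python
import sys


def transform_raw(stdin, stdout):
    """Copy a raw byte stream line by line, stripping trailing whitespace.

    Assumes the extractor (for example, S3) delivers file contents as raw
    bytes with no length prefix, so stdin can be iterated like a file.
    """
    for line in stdin:
        # Each iteration yields one newline-terminated chunk of raw bytes.
        stdout.write(line.rstrip() + b"\n")


if __name__ == "__main__":
    transform_raw(sys.stdin.buffer, sys.stdout.buffer)
```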

Kafka Pipeline Byte Length Encoding

For a Kafka pipeline, all data streamed to the transform is byte length encoded: each message is preceded by eight bytes that indicate the length of that message in bytes. Your transform should process the stdin stream in the following way:

  1. Read the first eight bytes to determine the byte length of the actual message data that follows.
  2. Read the number of bytes indicated in Step 1; this is the actual message data. Convert it to a string and store it until the entire stdin stream has been read.
  3. If the stream still contains more data, read the next eight bytes to determine the byte length of the next message, then repeat Step 2.
  4. Continue repeating this process until all data from stdin has been read.
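The steps above can be sketched in Python 3 as follows. This is an illustration under stated assumptions, not a reference decoder: it assumes the eight-byte prefix is an unsigned integer in the host's native byte order and that each message is UTF-8 text. The names `read_messages` and `frame` are hypothetical.

```python
import struct
import sys

# Assumption: an 8-byte unsigned length in native byte order. "=Q" always
# occupies exactly 8 bytes, independent of the platform's C long size.
LENGTH_PREFIX = struct.Struct("=Q")


def read_messages(stream):
    """Follow Steps 1-4: collect every length-prefixed message as a string."""
    messages = []
    while True:
        header = stream.read(LENGTH_PREFIX.size)   # Step 1: read 8 bytes
        if not header:
            break                                  # Step 4: stdin exhausted
        if len(header) != LENGTH_PREFIX.size:
            raise ValueError("truncated length prefix")
        (length,) = LENGTH_PREFIX.unpack(header)
        payload = stream.read(length)              # Step 2: read the message
        if len(payload) != length:
            raise ValueError("truncated message")
        messages.append(payload.decode("utf-8"))   # Step 2: convert and store
        # Step 3: loop back and read the next length prefix.
    return messages


def frame(message):
    """Build one length-prefixed record from bytes (handy for local tests)."""
    return LENGTH_PREFIX.pack(len(message)) + message


if __name__ == "__main__":
    for text in read_messages(sys.stdin.buffer):
        sys.stdout.write(text + "\n")
```

Storing every message before writing anything, as Step 2 describes, keeps memory use proportional to the batch size; a generator that emits each record as soon as it is read avoids holding the whole batch.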

Writing to stdout

The transform must write data to stdout in the format specified in a CREATE PIPELINE statement. For example, it may need to write CSV with the specified FIELDS TERMINATED BY string, or it may need to write “raw stream” Avro with the specified schema. The bytes written to stdout will be interpreted as if by a LOAD DATA statement with the same options as those given to a CREATE PIPELINE statement.
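As an illustration, the following Python 3 sketch converts JSON-encoded messages into CSV rows. It assumes the pipeline was created with FIELDS TERMINATED BY ',' and a newline line terminator; the function names and the column list are hypothetical.

```python
import csv
import io
import json


def to_csv_record(fields, delimiter=","):
    """Serialize one row as bytes suitable for writing to stdout.

    The delimiter must match the FIELDS TERMINATED BY string in the
    CREATE PIPELINE statement; "," is an assumed example value.
    """
    buf = io.StringIO()
    # Assumed line terminator: "\n", one record per line.
    writer = csv.writer(buf, delimiter=delimiter, lineterminator="\n")
    writer.writerow(fields)
    return buf.getvalue().encode("utf-8")


def json_message_to_csv(message, columns, delimiter=","):
    """Convert one JSON object (bytes) into a CSV row in `columns` order."""
    record = json.loads(message.decode("utf-8"))
    # Missing keys become empty fields.
    return to_csv_record([record.get(col, "") for col in columns], delimiter)
```

Python's `csv` module wraps any field that contains the delimiter in double quotes, so if your data can contain the delimiter, the CREATE PIPELINE statement would likely need a matching ENCLOSED BY option.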

Info

Transactional guarantees apply only to data written to stdout. There are no transactional guarantees for any side effects performed by the transform logic, such as writes to external files or services.

Other Considerations for Writing Transforms

The CREATE PIPELINE … WITH TRANSFORM topic discusses additional considerations for writing transforms.

Example Implementation of a Kafka Transform

The following transform is written in Python and passes CSV-formatted data from the extractor through without modifying it. For each message, it reads eight bytes from stdin to determine the length of the message, and then writes the message contents to stdout.

Info

The code example works for both Python 2 and Python 3.

At the top of your transform file, use a shebang to specify the interpreter to use to execute the script (Python in this case).

#!/usr/bin/env python

import struct
import sys

binary_stdin = sys.stdin if sys.version_info < (3, 0) else sys.stdin.buffer
binary_stderr = sys.stderr if sys.version_info < (3, 0) else sys.stderr.buffer
binary_stdout = sys.stdout if sys.version_info < (3, 0) else sys.stdout.buffer

def input_stream():
    """
        Consume STDIN and yield each record that is received from SingleStore DB
    """
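    while True:
        # Each record is preceded by an 8-byte unsigned length in native byte
        # order. "=Q" is always 8 bytes; native "L" is 8 bytes only on
        # platforms such as 64-bit Linux.
        byte_len = binary_stdin.read(8)
        if len(byte_len) == 8:
            byte_len = struct.unpack("=Q", byte_len)[0]
            result = binary_stdin.read(byte_len)
            yield result
        else:
            # Anything other than a clean end of stream is a truncated prefix.
            assert len(byte_len) == 0, byte_len
            return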


def log(message):
    """
        Log an informational message to stderr which will show up in SingleStore DB in
        the event of transform failure.
    """
    binary_stderr.write(message + b"\n")


def emit(message):
    """
        Emit a record back to SingleStore DB by writing it to STDOUT. The record
        should be formatted as JSON, Avro, or CSV, because it will be parsed by
        LOAD DATA.
    """
    binary_stdout.write(message + b"\n")

log(b"Begin transform")

# We start the transform here by reading from the input_stream() iterator.
for data in input_stream():
    # Since this is an identity transform we just emit what we receive.
    emit(data)

log(b"End transform")
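Before creating the pipeline, it can help to exercise the transform locally by piping length-prefixed input into it. The harness below is a Python 3 sketch that assumes the same eight-byte, native-byte-order prefix; `run_transform` is a hypothetical helper and `./transform.py` is a placeholder path.

```python
import struct
import subprocess
import sys

# Assumption: 8-byte unsigned length prefix in native byte order.
LENGTH_PREFIX = struct.Struct("=Q")


def run_transform(command, messages):
    """Feed length-prefixed messages to a transform and capture its output.

    `command` is the argv for the transform, for example ["./transform.py"]
    (a placeholder path). Returns (exit_code, stdout_bytes, stderr_bytes).
    """
    payload = b"".join(LENGTH_PREFIX.pack(len(m)) + m for m in messages)
    proc = subprocess.run(command, input=payload,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return proc.returncode, proc.stdout, proc.stderr


if __name__ == "__main__":
    code, out, err = run_transform(sys.argv[1:], [b"1,a", b"2,b"])
    sys.stderr.write(err.decode("utf-8", "replace"))
    sys.stdout.write(out.decode("utf-8", "replace"))
    sys.exit(code)
```

For example, running `python harness.py ./transform.py` (placeholder file names) against the identity transform above should print 1,a and 2,b on separate lines, with Begin transform and End transform on stderr.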

Error Logging Upon Transform Failure

If a transform fails, the contents of its stderr appear in information_schema.PIPELINES_ERRORS. A transform is considered to have failed when it exits with a non-zero status code.
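A transform can use this behavior to report bad input deliberately: write a diagnostic to stderr and exit with a non-zero status. The sketch below assumes Python 3, the same eight-byte native-order prefix, and a hypothetical validation rule that every CSV record has exactly two fields; the function name `run` is also hypothetical.

```python
import struct
import sys

LENGTH_PREFIX = struct.Struct("=Q")  # assumed: 8-byte native-order length


def run(stdin, stdout, stderr, expected_fields=2):
    """Pass CSV records through, failing on the first malformed record.

    Returns the exit status: 0 on success, 1 on failure. Requiring
    `expected_fields` comma-separated fields is a hypothetical rule.
    """
    count = 0
    while True:
        header = stdin.read(LENGTH_PREFIX.size)
        if not header:
            return 0  # clean end of stream
        if len(header) != LENGTH_PREFIX.size:
            stderr.write(b"truncated length prefix\n")
            return 1
        (length,) = LENGTH_PREFIX.unpack(header)
        record = stdin.read(length)
        if len(record) != length:
            stderr.write(b"truncated record\n")
            return 1
        count += 1
        fields = record.count(b",") + 1
        if fields != expected_fields:
            # This diagnostic is the text that stderr would carry on failure.
            msg = "record %d has %d fields, expected %d\n" % (count, fields, expected_fields)
            stderr.write(msg.encode("utf-8"))
            return 1
        stdout.write(record + b"\n")


if __name__ == "__main__":
    sys.exit(run(sys.stdin.buffer, sys.stdout.buffer, sys.stderr.buffer))
```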

Modifying the Transform

After modifying the transform, you can reload it by running ALTER PIPELINE … RELOAD TRANSFORM.

See Also