# Kafka Pipeline Using Avro Format

Kafka pipelines can use Avro-formatted data for ingestion with the following minimum required syntax:

```sql
CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
FORMAT AVRO;
```

To skip messages that fail to parse without skipping the entire batch, use the `SKIP PARSER ERRORS` clause.

```sql
CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
SKIP PARSER ERRORS 
FORMAT AVRO;
```

To skip all failed messages, add the `SKIP ALL ERRORS` clause to the `CREATE PIPELINE` statement. For example:

```sql
CREATE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA '<host.example.com/my-topic>'
INTO TABLE <table_name>
SKIP ALL ERRORS 
FORMAT AVRO;
```

Refer to [SKIP ALL ERRORS](https://docs.singlestore.com/cloud/reference/sql-reference/data-manipulation-language-dml/load-data/#UUID-8d8456d6-b63b-4a11-94e1-da02f8ee4083.md) for more information.

> **❗ Important**: Using `SKIP PARSER ERRORS` or `SKIP ALL ERRORS` together with `WITH TRANSFORM` is not supported. Combining either clause with `WITH TRANSFORM` causes an error when creating or altering a pipeline.

If the `SKIP PARSER ERRORS` clause is specified in the `CREATE PIPELINE` statement, the `pipelines_stop_on_error` configuration is ignored. When a Kafka message fails to parse:

* A warning message is logged in the `pipelines_errors` table, with the Kafka message stored in the `LOAD_DATA_LINE` column.
* The entire Kafka message that caused the parse error is logged.
* The pipeline skips the message and moves to the next Kafka message.
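
To review the messages that were skipped, the logged warnings can be queried. The following is a minimal sketch, assuming the `pipelines_errors` table is read through `information_schema.PIPELINES_ERRORS` and that the pipeline is named `avro_events_pipeline` (a hypothetical name). Apart from `LOAD_DATA_LINE`, the column names are assumptions and may differ in your version.

```sql
-- Hypothetical pipeline name; replace it with your own.
-- LOAD_DATA_LINE holds the Kafka message that failed to parse.
SELECT PIPELINE_NAME, ERROR_MESSAGE, LOAD_DATA_LINE
FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'avro_events_pipeline';
```

Because Avro messages are binary, the contents of `LOAD_DATA_LINE` may not be human-readable.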

If the `SKIP PARSER ERRORS` clause is not specified in the `CREATE PIPELINE` statement, the default behavior applies:

* Error messages are logged in the `pipelines_errors` table, with the Kafka message stored in the `LOAD_DATA_LINE` column.
* The pipeline stops (if the `pipelines_stop_on_error` variable is set to true) or skips the current batch and moves to the next batch (if it is set to false).
* If the number of errors exceeds the value of the `pipelines_parse_errors_threshold` engine variable, the pipeline stops or skips the current batch, again depending on whether the `pipelines_stop_on_error` variable is set to true or false.
  > **📝 Note**: Refer to [PIPELINES\_BATCHES\_SUMMARY](https://docs.singlestore.com/cloud/reference/information-schema-reference/data-ingest/pipelines-batches-summary.md) for more details about batches.
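
The two engine variables described above can be inspected and adjusted with SQL. The following is a sketch under stated assumptions: the value `100` is purely illustrative, and changing global variables may require elevated privileges or may need to be done through your deployment's management tooling.

```sql
-- Inspect the current values of both engine variables.
SHOW VARIABLES LIKE 'pipelines_stop_on_error';
SHOW VARIABLES LIKE 'pipelines_parse_errors_threshold';

-- Assumed example values: skip failing batches instead of stopping the
-- pipeline, and act only once more than 100 parse errors occur.
SET GLOBAL pipelines_stop_on_error = OFF;
SET GLOBAL pipelines_parse_errors_threshold = 100;
```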

## Avro to JSON Type Conversions

SingleStore pipelines convert Avro data types to SQL data types during ingestion as follows:

* **Primitive types**: Avro primitive types, such as booleans, numbers, and strings, are first converted to strings and then cast to their closest matching SQL equivalents.
  > **📝 Note**: If an Avro string contains raw binary data (including non-UTF-8 bytes), each byte is represented as a Unicode escape sequence in the form `\u00XX`. For example, the byte sequence `'\xC0\xC1'` is serialized as `'\u00C0\u00C1'`.
* **Complex types**: The following Avro types are encoded using the Avro JSON encoder:

  * **Record**: Encodes the fields (in the order they are declared) as a JSON object.
  * **Array**: Encodes a homogeneous array of values as a JSON array.
  * **Map**: Encodes a homogeneous map of key-value pairs with string keys (`map<string, T>`) as a JSON object with string keys.
  * **Union**: If the active branch is `null`, it is encoded as JSON `null`. Otherwise, it is encoded as a JSON object with type metadata: a single key naming the active branch type, whose value is the encoded value. For example, for the union `["null","string"]`, an active string branch with the value `hello` is encoded as `{"string":"hello"}`.

Refer to [Specification | Apache Avro](https://avro.apache.org/docs/1.12.0/specification/) for related information.

For example, consider the following Avro schema:

```json
{
  "type": "record",
  "name": "Event",
  "namespace": "com.example",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "name", "type": "string" },
    { "name": "score", "type": "float" },
    { "name": "active", "type": "boolean" },

    { "name": "payload", "type": {
      "type": "record",
      "name": "Payload",
      "fields": [
        { "name": "tag", "type": "string" },
        { "name": "count", "type": "long" }
      ]
    }},

    { "name": "tags", "type": {
      "type": "array",
      "items": "string"
    }},

    { "name": "props", "type": {
      "type": "map",
      "values": "long"
    }},

    { "name": "maybe_str", "type": ["null", "string"], "default": null }
  ]
}
```

and the following Avro data:

```json
{
  "id": 42,
  "name": "alpha",
  "score": 9.5,
  "active": true,
  "payload": { "tag": "t1", "count": 1234567890123 },
  "tags": ["a", "b"],
  "props": { "k1": 1, "k2": 2 },
  "maybe_str": "hello"
}
```

The Avro data is represented in JSON as follows:

```json
{
  "id": 42,
  "name": "alpha",
  "score": 9.5,
  "active": true,
  "payload": { "tag": "t1", "count": 1234567890123 },
  "tags": ["a", "b"],
  "props": { "k1": 1, "k2": 2 },
  "maybe_str": { "string": "hello" }
}
```

The following table maps each Avro field type in this example to its equivalent SQL type:

| Field       | Avro Type       | SQL Type                           |
| ----------- | --------------- | ---------------------------------- |
| `id`        | `int`           | `INT`                              |
| `name`      | `string`        | `VARCHAR`/`TEXT`                   |
| `score`     | `float`         | `FLOAT`                            |
| `active`    | `boolean`       | `BOOLEAN`                          |
| `payload`   | `record`        | `JSON` (object)                    |
| `tags`      | `array<string>` | `JSON` (array)                     |
| `props`     | `map<long>`     | `JSON` (object)                    |
| `maybe_str` | `union`         | `JSON` (object with type metadata) |
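
As an illustration of this mapping, the following sketch defines a destination table for the `Event` schema and reads nested values back out of the JSON columns. The table name `events` and the column aliases are hypothetical, and the query assumes the `JSON_EXTRACT_<type>` functions with key names or zero-indexed array positions as the key path.

```sql
-- Hypothetical destination table for the Event schema above.
CREATE TABLE events (
  id INT,
  name TEXT,
  score FLOAT,
  active BOOLEAN,
  payload JSON,    -- Avro record, stored as a JSON object
  tags JSON,       -- Avro array<string>, stored as a JSON array
  props JSON,      -- Avro map<long>, stored as a JSON object
  maybe_str JSON   -- Avro union ["null","string"]
);

-- Read nested values back out of the JSON columns.
SELECT
  id,
  JSON_EXTRACT_STRING(payload, 'tag') AS payload_tag,
  JSON_EXTRACT_BIGINT(payload, 'count') AS payload_count,
  JSON_EXTRACT_STRING(tags, 0) AS first_tag,
  JSON_EXTRACT_BIGINT(props, 'k1') AS prop_k1,
  -- The union is wrapped in an object keyed by the active branch type.
  JSON_EXTRACT_STRING(maybe_str, 'string') AS maybe_str_value
FROM events;
```

Note that the union column must be unwrapped by its branch name (`string` here) to reach the underlying value.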

***

Modified at: December 5, 2025

Source: [/cloud/load-data/data-sources/load-data-from-kafka/kafka-pipeline-using-avro-format/](https://docs.singlestore.com/cloud/load-data/data-sources/load-data-from-kafka/kafka-pipeline-using-avro-format/)

