Load Data from Avro Files

Syntax for Avro LOAD DATA

Below is the basic syntax for loading data from a local Avro file into a table.

LOAD DATA [LOCAL] INFILE 'file_name'
[REPLACE | SKIP { CONSTRAINT | DUPLICATE KEY } ERRORS]
INTO TABLE tbl_name
FORMAT AVRO SCHEMA REGISTRY {"IP" | "Hostname"}
subvalue_mapping
[SET col_name = expr,...]
[WHERE expr,...]
[MAX_ERRORS number]
[ERRORS HANDLE string]
[SCHEMA 'avro_schema']

subvalue_mapping:
( {col_name | @variable_name} <- subvalue_path, ...)

subvalue_path:
{% | [%::]ident [::ident ...]}
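
As an illustrative sketch of this syntax for a local file (the file name, table, columns, and inline schema are hypothetical), a statement might look like:

LOAD DATA LOCAL INFILE 'products.avro'
SKIP DUPLICATE KEY ERRORS
INTO TABLE products
FORMAT AVRO
(id <- %::id, color <- %::color)
SCHEMA '{"type": "record", "name": "product", "fields": [{"name": "id", "type": "long"}, {"name": "color", "type": ["null", "string"]}]}';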

Order of Operations for Updating an Avro Schema and its Associated Database Objects

When updating an Avro schema, follow the steps below (in order) to modify the pipeline and the pipeline’s target table to match the updated schema. The schema (which resides in the schema registry) can be modified at any time during this process.

  1. Run STOP PIPELINE with the DETACH option to detach the pipeline. When adding a field to the schema, this step is not required.

  2. Use ALTER TABLE to add or remove fields from the pipeline's target table.

  3. Use ALTER PIPELINE or CREATE OR REPLACE PIPELINE to add or remove fields from the pipeline.

  4. Run START PIPELINE to restart the pipeline. If a field was added to the schema and the pipeline is still running, this step is not required.

If you add a field to the schema before adding it to the pipeline and its target table, and you have not stopped the pipeline, the pipeline continues to run. The newly added field will not contain data until it has also been added to the pipeline's target table.

If, instead, the new field is added to the pipeline and its target table before it is added to the schema, the field is populated with a default value in the pipeline's target table. See Example 1: Adding a New Field to an Avro Schema below for details.
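As a sketch, assuming a pipeline p and target table t shaped like those in the examples below, adding a field price would follow this sequence (steps 1 and 4 can be skipped when only adding a field to the schema):

-- Step 1: detach the pipeline
STOP PIPELINE p DETACH;
-- Step 2: update the target table
ALTER TABLE t ADD COLUMN price FLOAT DEFAULT NULL;
-- Step 3: update the pipeline, keeping its offsets
CREATE OR REPLACE PIPELINE p AS LOAD DATA FS "/path/to/files/data.avro"
INTO TABLE t FORMAT AVRO SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
(id <- %::id, color <- %::color, price <- %::price, input_record <- %);
-- Step 4: restart the pipeline
START PIPELINE p;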

Schema Evolution Examples

Example 1: Adding a New Field to an Avro Schema

Consider an Avro schema product with the fields id and color:

{
  "type": "record",
  "name": "product",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "color",
      "type": [
        "null",
        "string"
      ]
    }
  ]
}

The fields are loaded into the SingleStore table t through a pipeline created with the following commands:

CREATE TABLE t(id BIGINT DEFAULT 0, color VARCHAR(30) DEFAULT NULL, input_record JSON DEFAULT NULL);
CREATE PIPELINE p AS LOAD DATA FS "/path/to/files/data.avro"
INTO TABLE t
FORMAT AVRO SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
(id <- %::id, color <- %::color, input_record <- %);
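
Creating the pipeline does not start it automatically; once the table and pipeline exist, begin loading with:

START PIPELINE p;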

Now, the Avro schema evolves with a new field price:

{
  "type": "record",
  "name": "product",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "color",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "price",
      "type": "float"
    }
  ]
}

To reflect the addition of the new field, first update the table t, specifying a default value for the new field price:

ALTER TABLE t ADD COLUMN price FLOAT DEFAULT NULL;

Then, using the CREATE OR REPLACE PIPELINE command, update the pipeline to load the new field. This allows the pipeline to continue running without losing any offsets.

CREATE OR REPLACE PIPELINE p AS LOAD DATA FS "/path/to/files/data.avro"
INTO TABLE t FORMAT AVRO SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
(id <- %::id, color <- %::color, price <- %::price DEFAULT NULL, input_record <- %);

Populating the Pipeline’s Target Table with a Default Value

If, prior to updating the Avro schema to include the new field price, you updated the pipeline and its target table to include the field, the table's field would be populated with the DEFAULT value. This value can be specified either in the ALTER TABLE statement that alters the table (DEFAULT NULL in this example), or in the CREATE OR REPLACE PIPELINE statement that alters the pipeline (DEFAULT NULL in this example). If the DEFAULT value is specified in both places, as in this example, the table's DEFAULT value is used. If the DEFAULT value is specified in neither place, an error is thrown.
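
To make the precedence concrete, here are the two statements from this example again, annotated to show where each DEFAULT lives:

-- Table-level DEFAULT: takes precedence when both are specified
ALTER TABLE t ADD COLUMN price FLOAT DEFAULT NULL;

-- Mapping-level DEFAULT: used when the table column declares no default
CREATE OR REPLACE PIPELINE p AS LOAD DATA FS "/path/to/files/data.avro"
INTO TABLE t FORMAT AVRO SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
(id <- %::id, color <- %::color, price <- %::price DEFAULT NULL, input_record <- %);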

Example 2: Removing a Field from an Avro Schema

Consider an Avro schema product with the fields id, color, and price:

{
  "type": "record",
  "name": "product",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "color",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "price",
      "type": "float"
    }
  ]
}

The fields are loaded into the SingleStore table t through a pipeline created with the following command:

CREATE OR REPLACE PIPELINE p AS LOAD DATA FS "/path/to/files/data.avro"
INTO TABLE t FORMAT AVRO SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
(id <- %::id, color <- %::color, price <- %::price, input_record <- %);

Now, the Avro schema evolves and no longer contains the field price:

{
  "type": "record",
  "name": "product",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "color",
      "type": [
        "null",
        "string"
      ]
    }
  ]
}

Run the following commands in sequence to evolve the schema:

STOP PIPELINE p DETACH;

Note: The DETACH option of the STOP PIPELINE command allows you to temporarily stop a pipeline in order to make changes to a source table, such as removing a field.

ALTER TABLE t DROP COLUMN price;
CREATE OR REPLACE PIPELINE p AS LOAD DATA FS "/path/to/files/data.avro"
INTO TABLE t FORMAT AVRO SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
(id <- %::id, color <- %::color, input_record <- %);
START PIPELINE p;

Example 3: Connecting to Confluent Schema Registry Over SSL

Note

This example only applies to SingleStore 7.3.5 and later.

The following example shows how to connect to Confluent Schema Registry over SSL, using the SSL configuration settings in the CONFIG and CREDENTIALS clauses of CREATE PIPELINE.

CREATE OR REPLACE PIPELINE p AS LOAD DATA FS "/path/to/files/data.avro"
CONFIG '{"schema.registry.ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"schema.registry.ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"schema.registry.ssl.ca.location": "/var/private/ssl/ca-cert.pem"}'
CREDENTIALS '{"schema.registry.ssl.key.password": "abcdefgh"}'
INTO TABLE t FORMAT AVRO SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
(id <- %::id, color <- %::color, input_record <- %);
START PIPELINE p;

Note

You can use a subset of the `ssl.` settings as follows:

  • schema.registry.ssl.key.location, schema.registry.ssl.ca.location, and schema.registry.ssl.key.password

  • schema.registry.ssl.certificate.location, schema.registry.ssl.key.location, and schema.registry.ssl.key.password

schema.registry.ssl.key.password is only required if your SSL certificate key has a password.
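
For instance, a pipeline using only the first subset (assuming the same hypothetical paths and password as above) might look like:

CREATE OR REPLACE PIPELINE p AS LOAD DATA FS "/path/to/files/data.avro"
CONFIG '{"schema.registry.ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"schema.registry.ssl.ca.location": "/var/private/ssl/ca-cert.pem"}'
CREDENTIALS '{"schema.registry.ssl.key.password": "abcdefgh"}'
INTO TABLE t FORMAT AVRO SCHEMA REGISTRY "your_schema_registry_host_name_or_ip:your_schema_registry_port"
(id <- %::id, color <- %::color, input_record <- %);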
