Avro Schema Evolution with Pipelines

Avro schema evolution is the ability of existing consumers of a schema to easily handle updates made to the schema.

SingleStore DB Pipelines support some Avro schema evolution capabilities, which are explained below.

  • When you create a Pipeline, instead of specifying the Avro schema definition directly in the CREATE PIPELINE statement, you can specify the host name or IP address of Confluent Schema Registry. The schema registry contains the definition of the schema. If fields are added to the definition, or fields are removed, the Pipeline sees these changes.

  • Without Avro schema evolution, you need to reset a Pipeline’s offsets after updating the Avro schema. The offsets track the current position in the data source from which the Pipeline is reading. Because the offsets are reset to the beginning position, you also need to unload the data from the Pipeline’s target table. As an alternative to resetting the offsets and unloading the data, you could collect the Pipeline’s offsets (which may be a difficult process).

  • If you add fields to the Avro schema (but do not remove fields), you do not need to stop the Pipeline before modifying it to add the fields.
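The distinction between adding and removing fields can be checked mechanically before touching the Pipeline. The following is a minimal sketch in Python, not part of SingleStore DB: the helper names `field_names` and `classify_change` are hypothetical, and it only compares the top-level field names of two Avro record schemas.

```python
import json

def field_names(schema_json):
    """Return the set of top-level field names in an Avro record schema."""
    schema = json.loads(schema_json)
    return {field["name"] for field in schema["fields"]}

def classify_change(old_schema_json, new_schema_json):
    """Compare two schema versions and report added and removed fields.

    must_stop_pipeline mirrors the rule stated above: only a change that
    removes fields requires stopping the Pipeline first.
    """
    old = field_names(old_schema_json)
    new = field_names(new_schema_json)
    removed = sorted(old - new)
    return {
        "added": sorted(new - old),
        "removed": removed,
        "must_stop_pipeline": bool(removed),
    }

OLD = """{"type": "record", "name": "product",
          "fields": [{"name": "id", "type": "long"},
                     {"name": "color", "type": ["null", "string"]}]}"""
NEW = """{"type": "record", "name": "product",
          "fields": [{"name": "id", "type": "long"},
                     {"name": "color", "type": ["null", "string"]},
                     {"name": "price", "type": "float"}]}"""

print(classify_change(OLD, NEW))
# {'added': ['price'], 'removed': [], 'must_stop_pipeline': False}
```

Swapping the arguments, `classify_change(NEW, OLD)`, reports `price` as removed and sets `must_stop_pipeline` to `True`.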

Order of Operations for Updating an Avro Schema and its Associated Database Objects

When you update an Avro schema, follow the steps below (in order) to modify the Pipeline and the Pipeline’s target table to match the updated schema. You can modify the schema (which resides in the schema registry) at any time during this process.

  1. Run STOP PIPELINE DETACH to detach the Pipeline. If you are adding a field to the schema, this step is not required.

  2. Using ALTER TABLE, add or remove fields from the Pipeline’s target table.

  3. Using ALTER PIPELINE or CREATE OR REPLACE PIPELINE, add or remove fields from the Pipeline.

  4. Run START PIPELINE to start the Pipeline. If you are adding a field to the schema and have not stopped the Pipeline, this step is not required.

If you add a field to the schema prior to adding the field to the Pipeline and its target table, and have not stopped the Pipeline, the Pipeline continues to run. The data from the new field in the data source is not populated in the Pipeline’s target table until the field is added to the Pipeline and its target table.

If you add a field to the schema after adding the field to the Pipeline and its target table, and have not stopped the Pipeline, the Pipeline continues to run. Before the new field is added to the schema, a default value for the field is populated in the Pipeline’s target table. See Example 1 below for details.
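The ordered steps above, including which of them are optional, can be summarized as a small planning function. This is an illustrative Python sketch only: `plan_schema_update` and its parameters are hypothetical names, and the returned strings are step labels taken from the list above, not complete SQL statements.

```python
def plan_schema_update(removes_fields, stop_anyway=False):
    """Return the ordered step labels for one schema update.

    removes_fields: True if the update removes any field from the schema.
    stop_anyway:    True if you choose to stop the Pipeline even though
                    the update only adds fields.
    """
    stop = removes_fields or stop_anyway
    steps = []
    if stop:
        steps.append("STOP PIPELINE DETACH")  # step 1, optional when only adding
    steps.append("ALTER TABLE")  # step 2
    steps.append("ALTER PIPELINE or CREATE OR REPLACE PIPELINE")  # step 3
    if stop:
        steps.append("START PIPELINE")  # step 4, needed only if stopped
    return steps

print(plan_schema_update(removes_fields=False))
# ['ALTER TABLE', 'ALTER PIPELINE or CREATE OR REPLACE PIPELINE']
```

Calling `plan_schema_update(removes_fields=True)` returns all four steps, in the order listed above.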

Schema Evolution Examples

Example 1: Adding a New Field to an Avro Schema

Consider an Avro schema product with the fields id and color:

{
  "type": "record",
  "name": "product",
  "fields": [{ "name": "id", "type": "long"},
             { "name": "color", "type": [ "null","string" ]}]
}

The fields are loaded into the SingleStore DB table t through the Pipeline via the following commands:

CREATE TABLE t(id BIGINT DEFAULT 0, color VARCHAR(255) DEFAULT NULL, input_record JSON DEFAULT NULL);
CREATE PIPELINE p AS LOAD DATA LOCAL INFILE "data.avro"
     INTO TABLE t
     FORMAT AVRO
     SCHEMA REGISTRY "your_schema_registry_host_name_or_ip"
     (id <- %::id,
     color <- %::color,
     input_record <- %);

Now, the Avro schema evolves with a new field price:

{
  "type": "record",
  "name": "product",
  "fields": [{ "name": "id", "type": "long"},
             { "name": "color", "type": [ "null", "string" ]},
             { "name": "price", "type": "float" }]
}

To reflect the addition of the new field, first add a column for price to the table t, specifying the column's default value:

ALTER TABLE t ADD COLUMN price FLOAT DEFAULT NULL;

Using the CREATE OR REPLACE PIPELINE command, update the Pipeline to load the new field. This allows the Pipeline to continue running without losing any offsets.

CREATE OR REPLACE PIPELINE p AS LOAD DATA LOCAL INFILE "data.avro"
     INTO TABLE t
     FORMAT AVRO
     SCHEMA REGISTRY "your_schema_registry_host_name_or_ip"
     (id <- %::id,
     color <- %::color,
     price <- %::price DEFAULT NULL,
     input_record <- %);

Populating the Pipeline’s Target Table with a Default Value

If, prior to updating the Avro schema to include the new field price, you updated the Pipeline and its target table to include the field, the table’s field would be populated with the DEFAULT value. This value can be specified either in the ALTER statement that alters the table (DEFAULT NULL in this example), or in the CREATE OR REPLACE PIPELINE statement that alters the Pipeline (DEFAULT NULL in this example). If the DEFAULT value is specified in both places, as in this example, the table’s DEFAULT value is used. If the DEFAULT value is specified in neither place, an error is thrown.
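The precedence rule described above can be modeled in a few lines. The Python sketch below is an assumption-laden illustration of that rule, not SingleStore DB's implementation: `resolve_column` and the `_UNSET` sentinel are hypothetical names, and the sentinel exists only to tell "no DEFAULT clause" apart from DEFAULT NULL (`None`).

```python
_UNSET = object()  # marks "no DEFAULT clause given", distinct from DEFAULT NULL

def resolve_column(record, field, table_default=_UNSET, pipeline_default=_UNSET):
    """Pick the value loaded into a column for one decoded Avro record.

    record is a dict of the fields actually present in the incoming data.
    If the field is missing, the table's DEFAULT wins over the Pipeline's
    DEFAULT; if neither is specified, an error is raised.
    """
    if field in record:
        return record[field]
    if table_default is not _UNSET:
        return table_default
    if pipeline_default is not _UNSET:
        return pipeline_default
    raise ValueError("no DEFAULT specified for missing field " + repr(field))

# A record written before price was added to the schema.
old_record = {"id": 1, "color": "red"}

# DEFAULT NULL in both places, as in Example 1: the column is loaded as NULL.
print(resolve_column(old_record, "price", table_default=None, pipeline_default=None))
# None
```

Once records carry the new field, for example `{"id": 2, "color": "blue", "price": 4.5}`, the function returns the record's own value and the defaults are not consulted.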

Example 2: Removing a Field from an Avro Schema

Consider an Avro schema product with the fields id, color, and price:

{
  "type": "record",
  "name": "product",
  "fields": [{ "name": "id", "type": "long"},
             { "name": "color", "type": [ "null", "string" ]},
             { "name": "price", "type": "float" }]
}

Load the fields into the SingleStore DB table t through the Pipeline using the following command:

CREATE OR REPLACE PIPELINE p AS LOAD DATA LOCAL INFILE "data.avro"
     INTO TABLE t
     FORMAT AVRO
     SCHEMA REGISTRY "your_schema_registry_host_name_or_ip"
     (id <- %::id,
     color <- %::color,
     price <- %::price,
     input_record <- %);

Now, the Avro schema evolves to remove the field price:

{
  "type": "record",
  "name": "product",
  "fields": [{ "name": "id", "type": "long"},
             { "name": "color", "type": [ "null", "string" ]}]
}

Run the following commands in sequence to evolve the schema:

STOP PIPELINE DETACH p;

Note: The DETACH option in the STOP PIPELINE command temporarily stops the Pipeline so that you can change its target table, in this case to remove a column.

ALTER TABLE t DROP COLUMN price;
CREATE OR REPLACE PIPELINE p AS LOAD DATA LOCAL INFILE "data.avro"
     INTO TABLE t
     FORMAT AVRO
     SCHEMA REGISTRY "your_schema_registry_host_name_or_ip"
     (id <- %::id,
     color <- %::color,
     input_record <- %);
START PIPELINE p;