Schema and Pipeline Inference

Table and pipeline definitions can be inferred from input files using the INFER and CREATE INFERRED commands.

The INFER command creates a DDL definition for a table or pipeline based on input files and outputs the inferred definitions. The output of INFER can be reviewed, edited, and subsequently used with the CREATE TABLE or CREATE PIPELINE commands.

The CREATE INFERRED command infers the schema for a table and pipeline based on input files and creates a table and pipeline based on the inferred schema.

Syntax

Syntax for inferring a table definition from an input file.

INFER TABLE AS LOAD DATA {input_configuration}             
[FORMAT [CSV | AVRO]]
[AS JSON]

Syntax for inferring a pipeline definition from an input file.

INFER PIPELINE AS LOAD DATA {input_configuration}
[FORMAT [CSV | AVRO]]
     [AS JSON]

The input_configuration may be a configuration for loading from Apache Kafka, Amazon S3, a local filesystem, Microsoft Azure, HDFS, and Google Cloud Storage. Configuration specifications can be found in CREATE PIPELINE. The following examples use Amazon S3.

All options available for CREATE PIPELINE are supported by INFER and CREATE INFERRED, with the exception of the format options which are inferred from the schema.

Remarks

  • CSV and Avro formats are fully supported.

  • The default format is CSV.

Examples

The following examples show the use of INFER and CREATE INFER on a data file in Avro format stored in an AWS S3 bucket.

This example uses data that conforms to the schema of a books table shown below. INFER and CREATE INFER infer the schema for this table from an input file.

Refer to Generate an Avro File for an example of generating an Avro file that conforms to this schema which can be used in these examples.

books(Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
PublishTimestamp DATETIME);

Example 1 - INFER TABLE

The following example produces a table definition by scanning the specified Avro file and inferring the table definition from selected rows in the file. The output is printed in query definition format.

INFER TABLE AS LOAD DATA S3 's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{
    "aws_access_key_id":"<your_access_key_id>",
    "aws_secret_access_key":"<your_secret_access_key>",
    "aws_session_token":"<your_session_token>"}'
FORMAT AVRO;
"CREATE TABLE `infer_example_table` (
    `id` int(11) NOT NULL,
    `name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
    `num_pages` int(11) NOT NULL,
    `rating` double NULL,
    `publish_date` bigint(20) NOT NULL)"

Example 2 - INFER PIPELINE

The following example produces a table and pipeline definition by scanning the specified Avro file and inferring the table and definitions from selected rows in the file. The output is printed in query definition format.

INFER PIPELINE AS LOAD DATA S3
        's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{
    "aws_access_key_id":"<your_access_key_id>",
    "aws_secret_access_key":"<your_secret_access_key>",
    "aws_session_token":"<your_session_token>"}'
FORMAT AVRO;
"CREATE TABLE `infer_example_table` (
    `id` int(11) NOT NULL,
    `name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
    `num_pages` int(11) NOT NULL,
    `rating` double NULL,
    `publish_date` bigint(20) NOT NULL);
CREATE PIPELINE `infer_example_pipeline`
AS LOAD DATA S3 's3://data-folder/books.avro'
CONFIG '{\""region\"":\""us-west-2\""}'
CREDENTIALS '{\n    \""aws_access_key_id\"":\""your_access_key_id\"",
\n    \""aws_secret_access_key\"":\""your_secret_access_key\"",
\n    \""aws_session_token\"":\""your_session_token\""}'
BATCH_INTERVAL 2500
DISABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
INTO TABLE `infer_example_table`
FORMAT AVRO(
    `id` <- `id`,
    `name` <- `name`,
    `num_pages` <- `num_pages`,
    `rating` <- `rating`,
    `publish_date` <- `publish_date`);"

Example 3 - CREATE INFERRED PIPELINE

The following example creates a pipeline with name books_pipe by inferring the schema from the specified file. This command also creates a table of the same name as the pipeline. The pipeline is not started so the user can review the pipeline and table definitions and adjust them as desired.

CREATE INFERRED PIPELINE books_pipe AS LOAD DATA S3
         's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{
    "aws_access_key_id":"<your_access_key_id>",
    "aws_secret_access_key":"<your_secret_access_key>",
    "aws_session_token":"<your_session_token>"}'
FORMAT AVRO;
Created 'books_pipe' pipeline

The following SQL uses the SHOW CREATE PIPELINE command to view the CREATE PIPELINE for the pipeline that was inferred by the CREATE INFERRED PIPELINE command above.

SHOW CREATE PIPELINE books_pipe;
Pipeline,Create Pipeline
books_pipe,"CREATE PIPELINE `books_pipe`
AS LOAD DATA S3 's3://data-folder/books.avro'
CONFIG '{\""region\"":\""us-west-2\""}'
CREDENTIALS <CREDENTIALS REDACTED>
BATCH_INTERVAL 2500
DISABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
INTO TABLE `books_pipe`
FORMAT AVRO(
    `books_pipe`.`id` <- `id`,
    `books_pipe`.`name` <- `name`,
    `books_pipe`.`num_pages` <- `num_pages`,
    `books_pipe`.`rating` <- `rating`,
    `books_pipe`.`publish_date` <- `publish_date`)"

The following SQL uses the SHOW CREATE TABLE command to view the CREATE TABLE command for the table that was inferred by the CREATE INFERRED PIPELINE command above.

SHOW CREATE TABLE books_pipe;
Table,Create Table
books_pipe,"CREATE TABLE `books_pipe` (
  `id` int(11) NOT NULL,
  `name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `num_pages` int(11) NOT NULL,
  `rating` double DEFAULT NULL,
  `publish_date` bigint(20) NOT NULL,
   SORT KEY `__UNORDERED` (),
   SHARD KEY ()
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL 
AUTOSTATS_HISTOGRAM_MODE=CREATE 
AUTOSTATS_SAMPLING=ON 
SQL_MODE='STRICT_ALL_TABLES,NO_AUTO_CREATE_USER'"

The pipeline and table definitions can be adjusted using CREATE OR REPLACE PIPELINE (CREATE PIPELINE) and ALTER TABLE.

Once the pipeline and table definitions have been adjusted, start the pipeline.

START PIPELINE books_pipe FOREGROUND;

The command above starts a pipeline in the foreground so that errors will be displayed in the client. If you are using pipelines that will run continuously, consider starting pipelines in the background by omitting the FOREGROUND keyword. Refer to START PIPELINE for more information.

Confirm that the data has been loaded.

SELECT * FROM books_pipe
ORDER BY id;
+----+--------------------+-----------+--------+------------------+
| id | name               | num_pages | rating | publish_date     |
+----+--------------------+-----------+--------+------------------+
|  1 | HappyPlace         |       400 |    4.9 | 1680721200000000 |
|  2 | Legends & Lattes   |       304 |    4.9 | 1669665600000000 |
|  3 | The Vanishing Half |       352 |    4.9 | 1591124400000000 |
+----+--------------------+-----------+--------+------------------+

Example 5 - Using AS_JSON

The AS JSON keyword can be used to produce pipeline and table definitions in JSON format. The following example shows the use of INFER PIPELINE with the AS JSON keyword. INFER TABLE can be used in a similar manner.

The following example produces a pipeline and table definition by scanning the specified Avro file and inferring the definitions from selected rows in the file. The output is printed in JSON format.

INFER PIPELINE AS LOAD DATA S3
         's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{
    "aws_access_key_id":"<your_access_key_id>",
    "aws_secret_access_key":"<your_secret_access_key>",
    "aws_session_token":"<your_session_token>"}'
FORMAT AVRO
AS JSON;
{"pipeline_definition":
{"name":"infer_example_pipeline",
 "connection_string":"s3://data-folder/books.avro",
 "link":null,"source_type":"S3",
 "config":"{\"region\":\"us-west-2\"}",
 "credentials":"{\n
    \"aws_access_key_id\":\"your_access_key_id\",\n
    \"aws_secret_access_key\":\"your_secret_access_key\",\n
    \"aws_session_token\":\"your_session_token\"}",
 "batch_interval":2500,
 "resource_pool":null,
 "max_partitions_per_batch":null,
 "max_retries_per_batch_partition":null,
 "enable_out_of_order_optimization":false,
 "aggregator_pipeline":false,
 "transform":null,"load_error_policy":null,
 "dup_key_policy":null,
 "table":"infer_example_table",
 "procedure":null,
 "data_format":"AVRO",
 "with_temporal_conversion":false,
 "avro_schema":null,
 "time_zone":null,
 "avro_schema_registry_url":null,
 "fields_terminated_by":null,
 "fields_enclosed_by":null,
 "fields_escaped_by":null,
 "lines_terminated_by":null,
 "lines_starting_by":null,
 "extended_null":null,
 "enclosed_null":null,
 "trailing_nullcols":null,
 "null_defined_by":null,
 "ignore_lines":null,
 "column_list":[
  "`id`","`name`",
  "`num_pages`",
  "`rating`",
  "`publish_date`"],
 "json_paths":[["id"],["name"],["num_pages"],["rating"],["publish_date"]],
 "column_defaults":[null,null,null,null,null],
 "where_clause":null,"set_clause":null,
 "on_duplicate_key_update":null,
 "kafka_key_start_index": 0,
 "kafka_key_format":null,
 "stop_on_error":null,
 "enable_offsets_metadata_gc":false,
 "gc_timestamp":0,"create_time":1716313589,
 "alter_time":0,"cookie":null},
"table_definition":
{"name":"infer_example_table",
 "columns":[
 {"name":"`id`",
  "type":{"sql_text":"int(11) NOT NULL",
   "base_type":"INT",
   "nullable":false,
   "collation":null,
   "length":11,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`name`",
  "type":{"sql_text":"longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL",
   "base_type":"LONGTEXT",
   "nullable":false,
   "collation":"utf8_general_ci",
   "length":null,"precision":null,
   "scale":null,"signed":null,
   "possible_values":null},
  "default":null},
 {"name":"`num_pages`",
  "type":{"sql_text":"int(11) NOT NULL",
   "base_type":"INT",
   "nullable":false,
   "collation":null,
   "length":11,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`rating`",
  "type":{"sql_text":"double NULL",
   "base_type":"DOUBLE",
   "nullable":true,
   "collation":null,
   "length":null,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`publish_date`",
  "type":{"sql_text":"bigint(20) NOT NULL",
   "base_type":"BIGINT",
   "nullable":false,
   "collation":null,
   "length":20,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null}],
 "indexes":[]}}

Generate an Avro File

An Avro file can be generated that matches the schema of the following SQL CREATE TABLE statement. This Avro file can then be used in the preceding examples.

CREATE TABLE books(
Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
Publish_date DATETIME);

Create and activate a Python virtual environment to use for generating the code and install the avro package.

python3 -m venv /path/to/virtual_environment
source /path/to/virtual_environment/bin/activate
pip3 install avro

Avro schemas are defined in JSON. The following JSON defines an Avro schema which matches the preceding books table schema. Save the schema in a file named books.avsc.

{"namespace": "books.avro",
 "type": "record",
 "name": "Book",
 "fields": [
     {"name": "id", "type": "int"},
     {"name": "name",  "type": "string"},
     {"name": "num_pages", "type": "int"},
     {"name": "rating", "type": "double"},
     {"name": "publish_timestamp", "type": "long",
               "logicalType": "timestamp-micros"} ]}

The timestamp-micros is a logical type in Avro that annotates an Avro int as defined in the Avro specification. The int stores the number of microseconds from the Unix epoch, January 1, 1970 00:00:00.00000 UTC.

The following Python code creates an Avro file named books.avro that contains data that matches the structure of the books table.

Either run the following code in the Python interpreter or save this code to a file named create_avro_file.py.

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import datetime as dt
schema = avro.schema.parse(open("books.avsc","rb").read())
day1 = int(dt.datetime.fromisoformat('2023-04-05 12:00').timestamp()*1000*1000)
day2 = int(dt.datetime.fromisoformat('2022-11-28 12:00').timestamp()*1000*1000)
day3 = int(dt.datetime.fromisoformat('2020-06-02 12:00').timestamp()*1000*1000)
writer = DataFileWriter(open("books.avro","wb"), DatumWriter(), schema)
writer.append({"id": 1, "name": "HappyPlace", "num_pages": 400,
                            "rating":4.9, "publish_timestamp":day1})
writer.append({"id": 2, "name": "Legends & Lattes", "num_pages": 304,
                            "rating":4.9, "publish_timestamp":day2})
writer.append({"id": 3, "name": "The Vanishing Half", "num_pages": 352,
                            "rating":4.9, "publish_timestamp":day3})
writer.close()

If you added the code to a file, change to the directory where this Python script resides and run the script to generate the Avro file.

cd /path/to/avro-script
python3 create_avro_file.py

After the Avro file has been created, move this file to the location from which you want to load it, like an Amazon S3 bucket.

Last modified: June 6, 2024

Was this article helpful?