Schema and Pipeline Inference
Table and pipeline definitions can be inferred from input files using the INFER and CREATE INFERRED commands.
The INFER command creates a DDL definition for a table or pipeline based on input files and outputs the inferred definitions. The output of INFER can be reviewed, edited, and subsequently used with the CREATE TABLE or CREATE PIPELINE commands.
The CREATE INFERRED command infers the schema for a table and pipeline based on input files and creates a table and pipeline based on the inferred schema.
Syntax
Syntax for inferring a table definition from an input file.
INFER TABLE AS LOAD DATA {input_configuration}
[FORMAT [CSV | AVRO]]
[AS JSON]
Syntax for inferring a pipeline definition from an input file.
INFER PIPELINE AS LOAD DATA {input_configuration}
[FORMAT [CSV | AVRO]]
[AS JSON]
The input_configuration may be a configuration for loading from Apache Kafka, Amazon S3, a local filesystem, Microsoft Azure, HDFS, or Google Cloud Storage.
All options available for CREATE PIPELINE are supported by INFER and CREATE INFERRED, with the exception of the format options, which are inferred from the schema.
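As a rough illustration of how the clauses in the syntax above fit together, the following Python sketch assembles an INFER statement as a string. The helper name build_infer_statement and its parameters are hypothetical and are not part of any SingleStore client library. The sketch only covers the S3 form used in the examples on this page, does not run the statement, and assumes that none of the values contain a single quote.

```python
import json


def build_infer_statement(kind, s3_path, config, credentials,
                          data_format=None, as_json=False):
    """Assemble an INFER TABLE / INFER PIPELINE statement for an S3 source.

    Hypothetical helper for illustration only; it builds text and nothing else.
    """
    if kind not in ("TABLE", "PIPELINE"):
        raise ValueError("kind must be 'TABLE' or 'PIPELINE'")
    if data_format not in (None, "CSV", "AVRO"):
        raise ValueError("data_format must be 'CSV', 'AVRO', or None")

    # CONFIG and CREDENTIALS carry JSON text inside SQL string literals.
    config_json = json.dumps(config, separators=(",", ":"))
    credentials_json = json.dumps(credentials, separators=(",", ":"))

    parts = [
        f"INFER {kind} AS LOAD DATA S3 '{s3_path}'",
        f"CONFIG '{config_json}'",
        f"CREDENTIALS '{credentials_json}'",
    ]
    if data_format is not None:
        # When FORMAT is omitted, the default format (CSV) applies.
        parts.append(f"FORMAT {data_format}")
    if as_json:
        parts.append("AS JSON")
    return "\n".join(parts) + ";"


statement = build_infer_statement(
    "TABLE",
    "s3://data_folder/books.avro",
    {"region": "<region_name>"},
    {"aws_access_key_id": "<your_access_key_id>",
     "aws_secret_access_key": "<your_secret_access_key>"},
    data_format="AVRO",
)
print(statement)
```

Building the CONFIG and CREDENTIALS values with json.dumps keeps the embedded JSON well formed, which is easy to get wrong when writing the literals by hand.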
Remarks
- CSV and Avro formats are fully supported.
- The default format is CSV.
Examples
The following examples show the use of INFER and CREATE INFERRED on a data file in Avro format stored in an AWS S3 bucket.
This example uses data that conforms to the schema of the books table shown below. INFER and CREATE INFERRED infer the schema for this table from an input file.
Refer to Generate an Avro File for an example of generating an Avro file that conforms to this schema and can be used in these examples.
books(Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
PublishTimestamp DATETIME);
Example 1 - INFER TABLE
The following example produces a table definition by scanning the specified Avro file and inferring the table definition from selected rows in the file.
INFER TABLE AS LOAD DATA S3 's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>",
"aws_secret_access_key":"<your_secret_access_key>",
"aws_session_token":"<your_session_token>"}'
FORMAT AVRO;
"CREATE TABLE `infer_example_table` (
`id` int(11) NOT NULL,
`name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`num_pages` int(11) NOT NULL,
`rating` double NULL,
`publish_date` bigint(20) NOT NULL)"
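The inferred definition uses the placeholder table name infer_example_table, so one likely edit before running it with CREATE TABLE is to rename the table. The following Python sketch shows one way to do that on the text of the output above. The rename_table helper and the target name books are chosen for illustration only.

```python
# Text of the inferred definition, copied from the INFER TABLE output above.
inferred_ddl = """CREATE TABLE `infer_example_table` (
`id` int(11) NOT NULL,
`name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`num_pages` int(11) NOT NULL,
`rating` double NULL,
`publish_date` bigint(20) NOT NULL)"""


def rename_table(ddl, old_name, new_name):
    """Replace the backtick-quoted table name at the start of a CREATE TABLE statement."""
    old_prefix = f"CREATE TABLE `{old_name}`"
    if not ddl.startswith(old_prefix):
        raise ValueError(f"DDL does not start with {old_prefix!r}")
    # Only the leading table name is rewritten; column definitions are untouched.
    return f"CREATE TABLE `{new_name}`" + ddl[len(old_prefix):]


edited_ddl = rename_table(inferred_ddl, "infer_example_table", "books")
print(edited_ddl)
```

Matching only the statement prefix avoids accidentally rewriting a column that happens to share the placeholder name.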
Example 2 - INFER PIPELINE
The following example produces a table definition and a pipeline definition by scanning the specified Avro file and inferring both definitions from selected rows in the file.
INFER PIPELINE AS LOAD DATA S3 's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>",
"aws_secret_access_key":"<your_secret_access_key>",
"aws_session_token":"<your_session_token>"}'
FORMAT AVRO;
"CREATE TABLE `infer_example_table` (
`id` int(11) NOT NULL,
`name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`num_pages` int(11) NOT NULL,
`rating` double NULL,
`publish_date` bigint(20) NOT NULL);
CREATE PIPELINE `infer_example_pipeline`
AS LOAD DATA S3 's3://data-folder/books.avro'
CONFIG '{\""region\"":\""us-west-2\""}'
CREDENTIALS '{\n \""aws_access_key_id\"":\""your_access_key_id\"",
\n \""aws_secret_access_key\"":\""your_secret_access_key\"",
\n \""aws_session_token\"":\""your_session_token\""}'
BATCH_INTERVAL 2500
DISABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
INTO TABLE `infer_example_table`
FORMAT AVRO(
`id` <- `id`,
`name` <- `name`,
`num_pages` <- `num_pages`,
`rating` <- `rating`,
`publish_date` <- `publish_date`);"
Example 3 - CREATE INFERRED PIPELINE
The following example creates a pipeline named books_pipe, and a table with the same name, by inferring the schema from the specified file.
CREATE INFERRED PIPELINE books_pipe AS LOAD DATA S3 's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>",
"aws_secret_access_key":"<your_secret_access_key>",
"aws_session_token":"<your_session_token>"}'
FORMAT AVRO;
Created 'books_pipe' pipeline
The following SQL uses the SHOW CREATE PIPELINE command to view the CREATE PIPELINE statement for the pipeline that was inferred by the CREATE INFERRED PIPELINE command above.
SHOW CREATE PIPELINE books_pipe;
Pipeline,Create Pipeline
books_pipe,"CREATE PIPELINE `books_pipe`
AS LOAD DATA S3 's3://data-folder/books.avro'
CONFIG '{\""region\"":\""us-west-2\""}'
CREDENTIALS <CREDENTIALS REDACTED>
BATCH_INTERVAL 2500
DISABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
INTO TABLE `books_pipe`
FORMAT AVRO(
`books_pipe`.`id` <- `id`,
`books_pipe`.`name` <- `name`,
`books_pipe`.`num_pages` <- `num_pages`,
`books_pipe`.`rating` <- `rating`,
`books_pipe`.`publish_date` <- `publish_date`)"
The following SQL uses the SHOW CREATE TABLE command to view the CREATE TABLE statement for the table that was inferred by the CREATE INFERRED PIPELINE command above.
SHOW CREATE TABLE books_pipe;
Table,Create Table
books_pipe,"CREATE TABLE `books_pipe` (
`id` int(11) NOT NULL,
`name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`num_pages` int(11) NOT NULL,
`rating` double DEFAULT NULL,
`publish_date` bigint(20) NOT NULL,
SORT KEY `__UNORDERED` (),
SHARD KEY ()
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
AUTOSTATS_HISTOGRAM_MODE=CREATE
AUTOSTATS_SAMPLING=ON
SQL_MODE='STRICT_ALL_TABLES,NO_AUTO_CREATE_USER'"
The pipeline and table definitions can be adjusted using CREATE OR REPLACE PIPELINE (CREATE PIPELINE) and ALTER TABLE.
Once the pipeline and table definitions have been adjusted, start the pipeline.
START PIPELINE books_pipe FOREGROUND;
The command above starts the pipeline in the foreground so that errors are displayed in the client. To run the pipeline in the background, omit the FOREGROUND keyword.
Confirm that the data has been loaded.
SELECT * FROM books_pipe ORDER BY id;
+----+--------------------+-----------+--------+------------------+
| id | name | num_pages | rating | publish_date |
+----+--------------------+-----------+--------+------------------+
| 1 | HappyPlace | 400 | 4.9 | 1680721200000000 |
| 2 | Legends & Lattes | 304 | 4.9 | 1669665600000000 |
| 3 | The Vanishing Half | 352 | 4.9 | 1591124400000000 |
+----+--------------------+-----------+--------+------------------+
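The publish_date values above are stored as plain BIGINT numbers. Assuming they are microseconds since the Unix epoch, as written by the script in Generate an Avro File, the following Python sketch converts them back to readable UTC timestamps. The helper name micros_to_utc is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Reference point for epoch-based timestamps.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micros_to_utc(micros):
    """Convert microseconds since the Unix epoch to a timezone-aware UTC datetime."""
    # timedelta arithmetic keeps full microsecond precision (no float rounding).
    return EPOCH + timedelta(microseconds=micros)


# The three publish_date values from the query result above.
for value in (1680721200000000, 1669665600000000, 1591124400000000):
    print(value, micros_to_utc(value).isoformat())
```

The generating script builds its timestamps from naive datetimes, which Python interprets in the local timezone of the machine that runs it, so a file generated elsewhere may contain different numbers for the same wall-clock times.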
Example 4 - Using AS JSON
The AS JSON keyword can be used to produce pipeline and table definitions in JSON format. The example below uses INFER PIPELINE with the AS JSON keyword. INFER TABLE can be used in a similar manner.
The following example produces a pipeline and table definition by scanning the specified Avro file and inferring the definitions from selected rows in the file.
INFER PIPELINE AS LOAD DATA S3 's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>",
"aws_secret_access_key":"<your_secret_access_key>",
"aws_session_token":"<your_session_token>"}'
FORMAT AVRO
AS JSON;
{"pipeline_definition":
{"name":"infer_example_pipeline",
"connection_string":"s3://data-folder/books.avro",
"link":null,"source_type":"S3",
"config":"{\"region\":\"us-west-2\"}",
"credentials":"{\n
\"aws_access_key_id\":\"your_access_key_id\",\n
\"aws_secret_access_key\":\"your_secret_access_key\",\n
\"aws_session_token\":\"your_session_token\"}",
"batch_interval":2500,
"resource_pool":null,
"max_partitions_per_batch":null,
"max_retries_per_batch_partition":null,
"enable_out_of_order_optimization":false,
"aggregator_pipeline":false,
"transform":null,"load_error_policy":null,
"dup_key_policy":null,
"table":"infer_example_table",
"procedure":null,
"data_format":"AVRO",
"with_temporal_conversion":false,
"avro_schema":null,
"time_zone":null,
"avro_schema_registry_url":null,
"fields_terminated_by":null,
"fields_enclosed_by":null,
"fields_escaped_by":null,
"lines_terminated_by":null,
"lines_starting_by":null,
"extended_null":null,
"enclosed_null":null,
"trailing_nullcols":null,
"null_defined_by":null,
"ignore_lines":null,
"column_list":[
"`id`","`name`",
"`num_pages`",
"`rating`",
"`publish_date`"],
"json_paths":[["id"],["name"],["num_pages"],["rating"],["publish_date"]],
"column_defaults":[null,null,null,null,null],
"where_clause":null,"set_clause":null,
"on_duplicate_key_update":null,
"kafka_key_start_index": 0,
"kafka_key_format":null,
"stop_on_error":null,
"enable_offsets_metadata_gc":false,
"gc_timestamp":0,"create_time":1716313589,
"alter_time":0,"cookie":null},
"table_definition":
{"name":"infer_example_table",
"columns":[
{"name":"`id`",
"type":{"sql_text":"int(11) NOT NULL",
"base_type":"INT",
"nullable":false,
"collation":null,
"length":11,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null},
{"name":"`name`",
"type":{"sql_text":"longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL",
"base_type":"LONGTEXT",
"nullable":false,
"collation":"utf8_general_ci",
"length":null,"precision":null,
"scale":null,"signed":null,
"possible_values":null},
"default":null},
{"name":"`num_pages`",
"type":{"sql_text":"int(11) NOT NULL",
"base_type":"INT",
"nullable":false,
"collation":null,
"length":11,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null},
{"name":"`rating`",
"type":{"sql_text":"double NULL",
"base_type":"DOUBLE",
"nullable":true,
"collation":null,
"length":null,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null},
{"name":"`publish_date`",
"type":{"sql_text":"bigint(20) NOT NULL",
"base_type":"BIGINT",
"nullable":false,
"collation":null,
"length":20,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null}],
"indexes":[]}}
Generate an Avro File
An Avro file can be generated that matches the schema of the following SQL CREATE TABLE statement.
CREATE TABLE books(
Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
Publish_date DATETIME);
Create and activate a Python virtual environment to use for generating the file, and install the avro package.
python3 -m venv /path/to/virtual_environment
source /path/to/virtual_environment/bin/activate
pip3 install avro
Avro schemas are defined in JSON. Save the following schema to a file named books.avsc, which is the file name that the Python code below reads.
{"namespace": "books.avro",
"type": "record",
"name": "Book",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "num_pages", "type": "int"},
{"name": "rating", "type": "double"},
{"name": "publish_timestamp", "type": "long", "logicalType": "timestamp-micros"}
]}
The timestamp-micros type is a logical type in Avro that annotates an Avro long, as defined in the Avro specification.
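A timestamp-micros value counts microseconds since the Unix epoch (1 January 1970 00:00:00 UTC). As a minimal sketch of that encoding, the following Python code converts a timezone-aware datetime to such a value using integer arithmetic. The helper name to_timestamp_micros is hypothetical and is not part of the avro package.

```python
from datetime import datetime, timedelta, timezone

# Reference point for epoch-based timestamps.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_micros(moment):
    """Return microseconds since the Unix epoch for a timezone-aware datetime."""
    if moment.tzinfo is None:
        # A naive datetime would be ambiguous; require an explicit timezone.
        raise ValueError("expected a timezone-aware datetime")
    # Dividing one timedelta by another yields an int, avoiding float rounding.
    return (moment - EPOCH) // timedelta(microseconds=1)


noon_utc = datetime(2023, 4, 5, 12, 0, tzinfo=timezone.utc)
print(to_timestamp_micros(noon_utc))
```

Requiring an explicit timezone makes the resulting value independent of the machine that generates the file.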
The following Python code creates an Avro file named books.avro that contains data matching the structure of the books table.
Either run the following code in the Python interpreter or save this code to a file named create_avro_file.py.
import datetime as dt

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

# Load the Avro schema from books.avsc in the current directory.
schema = avro.schema.parse(open("books.avsc", "rb").read())

# Publish timestamps as microseconds since the Unix epoch.
# These naive datetimes are interpreted in the local timezone.
day1 = int(dt.datetime.fromisoformat('2023-04-05 12:00').timestamp()*1000*1000)
day2 = int(dt.datetime.fromisoformat('2022-11-28 12:00').timestamp()*1000*1000)
day3 = int(dt.datetime.fromisoformat('2020-06-02 12:00').timestamp()*1000*1000)

# Write three book records to books.avro.
writer = DataFileWriter(open("books.avro", "wb"), DatumWriter(), schema)
writer.append({"id": 1, "name": "HappyPlace", "num_pages": 400, "rating": 4.9, "publish_timestamp": day1})
writer.append({"id": 2, "name": "Legends & Lattes", "num_pages": 304, "rating": 4.9, "publish_timestamp": day2})
writer.append({"id": 3, "name": "The Vanishing Half", "num_pages": 352, "rating": 4.9, "publish_timestamp": day3})
writer.close()
If you added the code to a file, change to the directory where this Python script resides and run the script to generate the Avro file.
cd /path/to/avro-script
python3 create_avro_file.py
After the Avro file has been created, move this file to the location from which you want to load it, like an Amazon S3 bucket.
Last modified: August 22, 2024