Schema and Pipeline Inference
Table and pipeline definitions can be inferred from input files using the INFER and CREATE INFERRED commands.
The INFER command creates a DDL definition for a table or pipeline based on input files and outputs the inferred definition. The output of INFER can be reviewed, edited, and subsequently used with the CREATE TABLE or CREATE PIPELINE commands.
The CREATE INFERRED command infers the schema for a table and pipeline based on input files and creates a table and pipeline based on the inferred schema.
Syntax
Syntax for inferring a table definition from an input file.
INFER TABLE AS LOAD DATA {input_configuration} [FORMAT [CSV | AVRO | PARQUET | ICEBERG]] [AS JSON]
Syntax for inferring a pipeline definition from an input file.
INFER PIPELINE AS LOAD DATA {input_configuration} [FORMAT [CSV | AVRO | PARQUET | ICEBERG]] [AS JSON]
The input_configuration may be a configuration for loading from Apache Kafka, Amazon S3, a local filesystem, Microsoft Azure, HDFS, or Google Cloud Storage.
All options available for CREATE PIPELINE are supported by INFER and CREATE INFERRED, with the exception of the format options which are inferred from the schema.
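For example, a pipeline option such as BATCH_INTERVAL can be supplied directly in the inference statement. The following is a minimal sketch, not a verbatim example from this page: the local file path and the interval value are hypothetical, and the option is placed where it would appear in the LOAD DATA portion of a CREATE PIPELINE statement.
INFER PIPELINE AS LOAD DATA FS '/data/books.csv'
-- CREATE PIPELINE options pass through unchanged; only the format details are inferred.
BATCH_INTERVAL 2500
FORMAT CSV;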
Remarks
- CSV, Avro, Parquet, and Iceberg formats are fully supported.
- The default format is CSV.
- TEXT and ENUM types use the utf8mb4 character set and utf8mb4_bin collation by default.
Note
If the encoding of the source file is not utf8mb4, multi-byte characters in the source file may be replaced with their corresponding single-byte counterparts in the inferred table.
To change the encoding of the source file to utf8mb4 on a Linux machine, run the following commands:
- Determine the current encoding of the CSV file.
  file -i input.csv
- Convert the file data into utf8mb4-encoded data.
  iconv -f <input-encoding> -t UTF-8 input.csv -o output.csv
- Run the INFER PIPELINE query on the output file to get the correct inference.
Using the IGNORE clause with INFER PIPELINE for CSV Files
When inferring data from a CSV file, use the IGNORE <n> LINES clause to ignore the specified number of lines from the beginning of the file.
In the following example, the first two lines from the beginning of the CSV file are ignored.
INFER PIPELINE AS LOAD DATA FS '<path_to_local_file>' IGNORE 2 LINES;
Header Detection with the IGNORE Clause
CSV header inference can be controlled by using the HEADER_DETECTION clause and the IGNORE clause.
INFER PIPELINE ... [FORMAT CSV] [IGNORE (X >= 1) LINES] [HEADER_DETECTION ON/OFF]
Default Behavior
- If the first non-ignored line is entirely text, and subsequent lines contain a non-text field, then the first line is treated as the header and all remaining lines are ingested as data.
- If all non-ignored lines are entirely text, there is no clear header row. Every line is ingested as data, with columns auto-named col0, col1, col2, etc.
- If the first non-ignored line contains any non-text field, no header is inferred. All lines are ingested as data, with columns auto-named.
HEADER_DETECTION ON
- Only the first non-ignored line is inspected. If it is entirely text, it becomes the header. Otherwise, no header is inferred. All subsequent lines are ingested as data.
HEADER_DETECTION OFF
- No header is ever inferred. Every line (including the first non-ignored line) is ingested as data, with columns auto-named col0, col1, col2, etc.
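The clauses above can be combined in a single statement. The following is a hedged sketch with a hypothetical local file path: the first two lines are skipped, header detection is forced off, and every remaining line is ingested as data with auto-named columns (col0, col1, ...).
INFER PIPELINE AS LOAD DATA FS '/data/books.csv'
FORMAT CSV
IGNORE 2 LINES
HEADER_DETECTION OFF;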
Note
Empty lines in the source CSV file do not affect inference; they are accounted for through the IGNORE LINES clause and the SKIP PARSER ERRORS clause of the new CREATE PIPELINE query.
Enabling the SKIP PARSER ERRORS clause can be risky, as it may also suppress other parsing errors, such as inconsistent field counts or malformed records.
Infer Data from Parquet Files
Date/time values are stored internally in Parquet as physical integers with annotated logical types.
- Parquet logical types: Date, Time, and Timestamp columns are inferred as DATE, TIME(6), and DATETIME(6) columns, respectively.
- Parquet physical types: Conversion of Parquet physical types (from int to date/time objects) is performed with the help of built-in functions that are part of the SET clause of the generated CREATE PIPELINE query.
To infer data from Parquet files, run the following query:
INFER PIPELINE AS LOAD DATA FS '<path_to_local_file>' FORMAT PARQUET;
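For context, the following sketch illustrates the kind of statement such an inference can produce. It assumes a hypothetical Parquet file whose publish_timestamp column is stored as a physical integer holding microseconds since the Unix epoch; the table name, column names, and the specific conversion expression are illustrative, not literal generated output.
CREATE PIPELINE `books_parquet_pipeline`
AS LOAD DATA FS '/data/books.parquet'
INTO TABLE `books_parquet`
FORMAT PARQUET
(
    `id` <- `id`,
    `name` <- `name`,
    @publish_ts <- `publish_timestamp`
)
-- The physical integer is converted to DATETIME(6) in the SET clause.
SET `publish_timestamp` = DATE_ADD('1970-01-01 00:00:00', INTERVAL @publish_ts MICROSECOND);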
Infer Data from Iceberg Files
When inferring data from Iceberg files:
- All Iceberg primitive data types and nested structs are supported. Map-type and List-type columns are skipped while inferring data, with a warning.
- Nested structs in Iceberg can be perceived as a tree-like structure. While inferring data, nested structs are flattened to map each Iceberg field to a SingleStore table column. For example, an Iceberg field a struct<b int, c struct<d string, e int>> leads to the generation of three columns in SingleStore: a.b int, a.c.d string, and a.c.e int.
- Column mappings are appended to the new CREATE PIPELINE query so that users can simply copy and paste the generated queries without needing to figure out the column mappings on their own.
- Iceberg inference takes care of date/time type Iceberg columns that cannot be mapped to a SingleStore table column without appropriate conversion built-in functions. It automatically generates these conversions as part of the SET clause of the generated CREATE PIPELINE query.
- For data distribution, Iceberg supports the addition of a subset of columns as the partition key with transforms. The partition key of Iceberg maps to the shard key of SingleStore. The SingleStore engine picks up the columns (at any nesting level) defined as the partition key and defines them as the shard key in the new CREATE TABLE query.
- To infer a pipeline in merge mode, either:
  - Set the infer_pipeline_options global variable to 'merge' before inference, or
  - Execute the following query:
    INFER PIPELINE AS LOAD DATA FS '<iceberg-table-path>' FORMAT ICEBERG OPTIONS = 'merge';
    This query automatically generates CREATE TABLE, CREATE VIEW, and CREATE PIPELINE queries accordingly by adding extra metadata columns and other specific configurations.
- To create all the inferred TABLE, VIEW, and PIPELINE objects automatically, execute the following query:
  CREATE INFERRED PIPELINE <name> AS LOAD DATA FS '<iceberg-table-path>' FORMAT ICEBERG OPTIONS = 'merge';
  Query the view and pipeline by using the specified <name>.
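A brief usage sketch for merge mode follows; the Iceberg table path and the object name are hypothetical. It relies only on the behavior described above: after CREATE INFERRED PIPELINE, the generated view and pipeline share the specified name, so the data can be queried through the view once the pipeline is started.
CREATE INFERRED PIPELINE books_iceberg AS LOAD DATA FS '/data/iceberg/books'
FORMAT ICEBERG OPTIONS = 'merge';

START PIPELINE books_iceberg;

-- Query the inferred view by the same name.
SELECT * FROM books_iceberg LIMIT 10;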
Examples
The following examples show the use of INFER and CREATE INFERRED on a data file in Avro format stored in an AWS S3 bucket.
These examples use data that conforms to the schema of the books table shown below. INFER and CREATE INFERRED infer the schema for this table from an input file.
Refer to Generate an Avro File for an example of generating an Avro file that conforms to this schema and can be used in these examples.
books(Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
PublishTimestamp DATETIME);

Example 1 - INFER TABLE
The following example produces a table definition by scanning the specified Avro file and inferring the table definition from selected rows in the file.
INFER TABLE AS LOAD DATA S3 's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>","aws_secret_access_key":"<your_secret_access_key>","aws_session_token":"<your_session_token>"}'
FORMAT AVRO;
"CREATE TABLE `infer_example_table` (
`id` int(11) NOT NULL,
`name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`num_pages` int(11) NOT NULL,
`rating` double NULL,
`publish_date` bigint(20) NOT NULL)"

Example 2 - INFER PIPELINE
The following example produces a table and pipeline definition by scanning the specified Avro file and inferring the table and pipeline definitions from selected rows in the file.
INFER PIPELINE AS LOAD DATA S3 's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>","aws_secret_access_key":"<your_secret_access_key>","aws_session_token":"<your_session_token>"}'
FORMAT AVRO;
"CREATE TABLE `infer_example_table` (
`id` int(11) NOT NULL,
`name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`num_pages` int(11) NOT NULL,
`rating` double NULL,
`publish_date` bigint(20) NOT NULL);
CREATE PIPELINE `infer_example_pipeline`
AS LOAD DATA S3 's3://data-folder/books.avro'
CONFIG '{\""region\"":\""us-west-2\""}'
CREDENTIALS '{\n \""aws_access_key_id\"":\""your_access_key_id\"",
\n \""aws_secret_access_key\"":\""your_secret_access_key\"",
\n \""aws_session_token\"":\""your_session_token\""}'
BATCH_INTERVAL 2500
DISABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
INTO TABLE `infer_example_table`
FORMAT AVRO(
`id` <- `id`,
`name` <- `name`,
`num_pages` <- `num_pages`,
`rating` <- `rating`,
`publish_date` <- `publish_date`);"

Example 3 - CREATE INFERRED PIPELINE
The following example creates a pipeline named books_pipe by inferring the schema from the specified file.
CREATE INFERRED PIPELINE books_pipe AS LOAD DATA S3 's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>","aws_secret_access_key":"<your_secret_access_key>","aws_session_token":"<your_session_token>"}'
FORMAT AVRO;
Created 'books_pipe' pipeline

The following SQL uses the SHOW CREATE PIPELINE command to view the CREATE PIPELINE statement for the pipeline that was inferred by the CREATE INFERRED PIPELINE command above.
SHOW CREATE PIPELINE books_pipe;
Pipeline,Create Pipeline
books_pipe,"CREATE PIPELINE `books_pipe`
AS LOAD DATA S3 's3://data-folder/books.avro'
CONFIG '{\""region\"":\""us-west-2\""}'
CREDENTIALS <CREDENTIALS REDACTED>
BATCH_INTERVAL 2500
DISABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
INTO TABLE `books_pipe`
FORMAT AVRO(
`books_pipe`.`id` <- `id`,
`books_pipe`.`name` <- `name`,
`books_pipe`.`num_pages` <- `num_pages`,
`books_pipe`.`rating` <- `rating`,
`books_pipe`.`publish_date` <- `publish_date`)"

The following SQL uses the SHOW CREATE TABLE command to view the CREATE TABLE command for the table that was inferred by the CREATE INFERRED PIPELINE command above.
SHOW CREATE TABLE books_pipe;
Table,Create Table
books_pipe,"CREATE TABLE `books_pipe` (
`id` int(11) NOT NULL,
`name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`num_pages` int(11) NOT NULL,
`rating` double DEFAULT NULL,
`publish_date` bigint(20) NOT NULL,
SORT KEY `__UNORDERED` (),
SHARD KEY ()
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
AUTOSTATS_HISTOGRAM_MODE=CREATE
AUTOSTATS_SAMPLING=ON
SQL_MODE='STRICT_ALL_TABLES,NO_AUTO_CREATE_USER'"

The pipeline and table definitions can be adjusted using CREATE OR REPLACE PIPELINE (CREATE PIPELINE) and ALTER TABLE.
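For example, the following hedged sketch lowers the batch interval and adds a column; the new interval value and the added column are illustrative only, and the pipeline body is based on the generated definition shown above, with the redacted credentials filled in.
-- Replace the inferred pipeline with an adjusted (illustrative) batch interval.
CREATE OR REPLACE PIPELINE `books_pipe`
AS LOAD DATA S3 's3://data-folder/books.avro'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>","aws_secret_access_key":"<your_secret_access_key>","aws_session_token":"<your_session_token>"}'
BATCH_INTERVAL 1000
INTO TABLE `books_pipe`
FORMAT AVRO(
    `books_pipe`.`id` <- `id`,
    `books_pipe`.`name` <- `name`,
    `books_pipe`.`num_pages` <- `num_pages`,
    `books_pipe`.`rating` <- `rating`,
    `books_pipe`.`publish_date` <- `publish_date`);

-- Add an extra (hypothetical) column to the inferred table.
ALTER TABLE books_pipe ADD COLUMN genre TEXT;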
Once the pipeline and table definitions have been adjusted, start the pipeline.
START PIPELINE books_pipe FOREGROUND;
The command above starts the pipeline in the foreground so that errors are displayed in the client. To run the pipeline in the background, omit the FOREGROUND keyword.
Confirm that the data has been loaded.
SELECT * FROM books_pipe ORDER BY id;
+----+--------------------+-----------+--------+------------------+
| id | name | num_pages | rating | publish_date |
+----+--------------------+-----------+--------+------------------+
| 1 | HappyPlace | 400 | 4.9 | 1680721200000000 |
| 2 | Legends & Lattes | 304 | 4.9 | 1669665600000000 |
| 3 | The Vanishing Half | 352 | 4.9 | 1591124400000000 |
+----+--------------------+-----------+--------+------------------+

Example 4 - Using AS JSON
The AS JSON keyword can be used to produce pipeline and table definitions in JSON format. The following example uses INFER PIPELINE with the AS JSON keyword. INFER TABLE can be used in a similar manner.
The following example produces a pipeline and table definition by scanning the specified Avro file and inferring the definitions from selected rows in the file.
INFER PIPELINE AS LOAD DATA S3 's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>","aws_secret_access_key":"<your_secret_access_key>","aws_session_token":"<your_session_token>"}'
FORMAT AVRO
AS JSON;
{"pipeline_definition":
{"name":"infer_example_pipeline",
"connection_string":"s3://data-folder/books.avro",
"link":null,"source_type":"S3",
"config":"{\"region\":\"us-west-2\"}",
"credentials":"{\n
\"aws_access_key_id\":\"your_access_key_id\",\n
\"aws_secret_access_key\":\"your_secret_access_key\",\n
\"aws_session_token\":\"your_session_token\"}",
"batch_interval":2500,
"resource_pool":null,
"max_partitions_per_batch":null,
"max_retries_per_batch_partition":null,
"enable_out_of_order_optimization":false,
"aggregator_pipeline":false,
"transform":null,"load_error_policy":null,
"dup_key_policy":null,
"table":"infer_example_table",
"procedure":null,
"data_format":"AVRO",
"with_temporal_conversion":false,
"avro_schema":null,
"time_zone":null,
"avro_schema_registry_url":null,
"fields_terminated_by":null,
"fields_enclosed_by":null,
"fields_escaped_by":null,
"lines_terminated_by":null,
"lines_starting_by":null,
"extended_null":null,
"enclosed_null":null,
"trailing_nullcols":null,
"null_defined_by":null,
"ignore_lines":null,
"column_list":[
"`id`","`name`",
"`num_pages`",
"`rating`",
"`publish_date`"],
"json_paths":[["id"],["name"],["num_pages"],["rating"],["publish_date"]],
"column_defaults":[null,null,null,null,null],
"where_clause":null,"set_clause":null,
"on_duplicate_key_update":null,
"kafka_key_start_index": 0,
"kafka_key_format":null,
"stop_on_error":null,
"enable_offsets_metadata_gc":false,
"gc_timestamp":0,"create_time":1716313589,
"alter_time":0,"cookie":null},
"table_definition":
{"name":"infer_example_table",
"columns":[
{"name":"`id`",
"type":{"sql_text":"int(11) NOT NULL",
"base_type":"INT",
"nullable":false,
"collation":null,
"length":11,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null},
{"name":"`name`",
"type":{"sql_text":"longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL",
"base_type":"LONGTEXT",
"nullable":false,
"collation":"utf8_general_ci",
"length":null,"precision":null,
"scale":null,"signed":null,
"possible_values":null},
"default":null},
{"name":"`num_pages`",
"type":{"sql_text":"int(11) NOT NULL",
"base_type":"INT",
"nullable":false,
"collation":null,
"length":11,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null},
{"name":"`rating`",
"type":{"sql_text":"double NULL",
"base_type":"DOUBLE",
"nullable":true,
"collation":null,
"length":null,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null},
{"name":"`publish_date`",
"type":{"sql_text":"bigint(20) NOT NULL",
"base_type":"BIGINT",
"nullable":false,
"collation":null,
"length":20,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null}],
"indexes":[]}}Generate an Avro File
An Avro file can be generated that matches the schema of the following SQL CREATE TABLE statement.
CREATE TABLE books(Id INT, Name TEXT, NumPages INT, Rating DOUBLE, Publish_date DATETIME);
Create and activate a Python virtual environment to use for generating the code and install the avro package.
python3 -m venv /path/to/virtual_environment
source /path/to/virtual_environment/bin/activate
pip3 install avro

Avro schemas are defined in JSON. Save the following Avro schema in a file named books.avsc, which the Python script below reads.
{"namespace": "books.avro","type": "record","name": "Book","fields": [{"name": "id", "type": "int"},{"name": "name", "type": "string"},{"name": "num_pages", "type": "int"},{"name": "rating", "type": "double"},{"name": "publish_timestamp", "type": "long","logicalType": "timestamp-micros"} ]}
The timestamp-micros logical type in Avro annotates an Avro long, as defined in the Avro specification.
The following Python code creates an Avro file named books.avro that conforms to the schema of the books table.
Either run the following code in the Python interpreter or save this code to a file named create_avro_file.py.
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import datetime as dt

# Load the Avro schema defined in books.avsc.
schema = avro.schema.parse(open("books.avsc", "rb").read())

# Publish timestamps in microseconds since the Unix epoch (timestamp-micros).
day1 = int(dt.datetime.fromisoformat('2023-04-05 12:00').timestamp()*1000*1000)
day2 = int(dt.datetime.fromisoformat('2022-11-28 12:00').timestamp()*1000*1000)
day3 = int(dt.datetime.fromisoformat('2020-06-02 12:00').timestamp()*1000*1000)

# Write three book records to books.avro.
writer = DataFileWriter(open("books.avro", "wb"), DatumWriter(), schema)
writer.append({"id": 1, "name": "HappyPlace", "num_pages": 400, "rating": 4.9, "publish_timestamp": day1})
writer.append({"id": 2, "name": "Legends & Lattes", "num_pages": 304, "rating": 4.9, "publish_timestamp": day2})
writer.append({"id": 3, "name": "The Vanishing Half", "num_pages": 352, "rating": 4.9, "publish_timestamp": day3})
writer.close()
If you added the code to a file, change to the directory where this Python script resides and run the script to generate the Avro file.
cd /path/to/avro-script
python3 create_avro_file.py
After the Avro file has been created, move this file to the location from which you want to load it, like an Amazon S3 bucket.