Schema and Pipeline Inference
Table and pipeline definitions can be inferred from input files using the INFER PIPELINE and INFER TABLE commands.
Using the IGNORE clause with INFER PIPELINE for CSV Files
When inferring data from a CSV file, use the IGNORE <n> LINES clause to ignore the specified number of lines from the beginning of the file.
In the following example, the first two lines from the beginning of the CSV file are ignored.
INFER PIPELINE AS LOAD DATA FS '<path_to_local_file>' IGNORE 2 LINES;
Header Detection with the IGNORE Clause
CSV header inference can be controlled by using the HEADER_DETECTION clause together with the IGNORE clause:
INFER PIPELINE ... [FORMAT CSV] [IGNORE <n> LINES] [HEADER_DETECTION ON/OFF]
where <n> must be at least 1 when the IGNORE clause is specified.
Default Behavior
- If the first non-ignored line is entirely text, and subsequent lines contain a non-text field, then the first line is treated as the header and all remaining lines are ingested as data.
- If all non-ignored lines are entirely text and there is no clear header row, then every line is ingested as data, with columns auto-named col0, col1, col2, etc.
- If the first non-ignored line contains any non-text field, then no header is inferred and all lines are ingested as data, with columns auto-named.
HEADER_DETECTION ON
- Only the first non-ignored line is inspected. If it is entirely text, it becomes the header; otherwise, no header is inferred. All subsequent lines are ingested as data.
HEADER_DETECTION OFF
- No header is inferred. Every line (including the first non-ignored line) is ingested as data, with columns auto-named col0, col1, col2, etc.
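For example, the following statement combines the two clauses to skip the first line of a file named books.csv (a placeholder) and disable header inference, so every remaining line is ingested as data with auto-named columns:
INFER PIPELINE AS LOAD DATA FS 'books.csv' FORMAT CSV IGNORE 1 LINES HEADER_DETECTION OFF;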
Note
Empty lines in the source CSV file do not affect inference. The number of ignored lines carries over to the IGNORE LINES clause of the generated CREATE PIPELINE query, and parsing errors can be suppressed by enabling the SKIP PARSER ERRORS clause in the generated CREATE PIPELINE query.
SingleStore does not recommend enabling the SKIP PARSER ERRORS clause, as it suppresses other parsing errors, such as inconsistent field counts or malformed records.
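For reference, a minimal sketch of a pipeline with this clause enabled; the pipeline name, file path, and table are placeholders, and the clause placement should be confirmed against the generated query:
CREATE PIPELINE books_csv_pipeline AS
LOAD DATA FS '/data/books.csv'
SKIP PARSER ERRORS
INTO TABLE books
FIELDS TERMINATED BY ','
IGNORE 2 LINES;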
Infer Data from Parquet Files
Date/Time values are stored internally in Parquet as physical integers with annotated logical types.
- Parquet logical types: Date, Time, and Timestamp columns are inferred as DATE, TIME(6), and DATETIME(6) columns, respectively.
- Parquet physical types: conversion of Parquet physical types (from int to date/time objects) is performed with the help of built-in functions that are part of the SET clause of the generated CREATE PIPELINE query.
To infer data from Parquet files, run the following command:
INFER PIPELINE AS LOAD DATA FS '<path_to_local_file>' FORMAT PARQUET;
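To illustrate the physical-type conversion, the following is a hypothetical sketch of the kind of query INFER PIPELINE can generate; the table t, its event_date column, and the epoch-days encoding are assumptions for this example, and the actual generated functions may differ:
CREATE PIPELINE t_parquet AS
LOAD DATA FS '<path_to_local_file>'
INTO TABLE t
(@event_days <- event_date)
FORMAT PARQUET
SET event_date = DATE_ADD('1970-01-01', INTERVAL @event_days DAY);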
Infer Data from Iceberg Files
When inferring data from Iceberg files:
- All Iceberg primitive data types and nested structs are supported. Map-type or List-type columns are skipped while inferring data, with a warning.
- Nested structs in Iceberg can be perceived as a tree-like structure. While inferring data, nested structs are flattened to map each Iceberg field to a SingleStore table column. For example, an Iceberg field a struct<b int, c struct<d string, e int>> leads to the generation of three columns in SingleStore: a.b int, a.c.d string, and a.c.e int.
- Column mappings are included in the generated CREATE PIPELINE query, which enables users to copy and paste the query without defining the column mappings manually.
- Iceberg inference automatically handles Date and Time columns that cannot be directly mapped to a SingleStore table column without appropriate conversion built-in functions. The required conversions are automatically generated in the SET clause of the generated CREATE PIPELINE query.
- For data distribution, Iceberg supports the addition of a subset of columns as the partition key with transforms. The partition key of Iceberg maps to the shard key of SingleStore. SingleStore identifies columns defined as the partition key, at any nesting level, and sets them as the shard key in the generated CREATE TABLE query.
- To infer a pipeline in merge mode, either:
  - Set the infer_pipeline_options global variable to 'merge' before inference (see the sketch after this list), or
  - Run the following command:
    INFER PIPELINE AS LOAD DATA FS '<iceberg-table-path>' FORMAT ICEBERG OPTIONS = 'merge';
  This command automatically generates the CREATE TABLE, CREATE VIEW, and CREATE PIPELINE queries accordingly by adding extra metadata columns and other specific configurations.
- To create all the inferred TABLE, VIEW, and PIPELINE objects automatically, run the following command:
  CREATE INFERRED PIPELINE <name> AS LOAD DATA FS '<iceberg-table-path>' FORMAT ICEBERG OPTIONS = 'merge';
  Query the view and pipeline by using the specified <name>.
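As referenced in the merge-mode item above, a minimal sketch of the variable-based approach, assuming the infer_pipeline_options variable is set with SET GLOBAL:
SET GLOBAL infer_pipeline_options = 'merge';
INFER PIPELINE AS LOAD DATA FS '<iceberg-table-path>' FORMAT ICEBERG;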
Generate an Avro File
An Avro file can be generated that matches the schema of the following CREATE TABLE statement.
CREATE TABLE books(
    Id INT,
    Name TEXT,
    NumPages INT,
    Rating DOUBLE,
    Publish_date DATETIME);
Create and activate a Python virtual environment to use for generating the Avro file, and install the avro package.
python3 -m venv /path/to/virtual_environment
source /path/to/virtual_environment/bin/activate
pip3 install avro
Avro schemas are defined in JSON. The following schema matches the books table schema. Save it in a file named books.avsc.
{"namespace": "books.avro","type": "record","name": "Book","fields": [{"name": "id", "type": "int"},{"name": "name", "type": "string"},{"name": "num_pages", "type": "int"},{"name": "rating", "type": "double"},{"name": "publish_timestamp", "type": "long","logicalType": "timestamp-micros"} ]}
The timestamp-micros is a logical type in Avro that annotates an Avro long, as defined in the Avro specification.
The following Python code creates an Avro file named books.avro that contains data matching the structure of the books table.
Either run the following code in the Python interpreter or save this code to a file named create_avro_file.py.
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import datetime as dt

# Load the Avro schema defined in books.avsc.
schema = avro.schema.parse(open("books.avsc", "rb").read())

# Convert publish dates to microseconds since the Unix epoch,
# matching the timestamp-micros logical type.
day1 = int(dt.datetime.fromisoformat('2023-04-05 12:00').timestamp() * 1000 * 1000)
day2 = int(dt.datetime.fromisoformat('2022-11-28 12:00').timestamp() * 1000 * 1000)
day3 = int(dt.datetime.fromisoformat('2020-06-02 12:00').timestamp() * 1000 * 1000)

# Write three records to books.avro.
writer = DataFileWriter(open("books.avro", "wb"), DatumWriter(), schema)
writer.append({"id": 1, "name": "HappyPlace", "num_pages": 400, "rating": 4.9, "publish_timestamp": day1})
writer.append({"id": 2, "name": "Legends & Lattes", "num_pages": 304, "rating": 4.9, "publish_timestamp": day2})
writer.append({"id": 3, "name": "The Vanishing Half", "num_pages": 352, "rating": 4.9, "publish_timestamp": day3})
writer.close()
If the code is in a file, navigate to the directory containing the Python script and run it to generate the Avro file.
cd /path/to/avro-script
python3 create_avro_file.py
After the Avro file is created, move it to the desired location for loading, such as an Amazon S3 bucket.
Examples
Infer Headers When All Columns are Strings
If all non-ignored lines in a CSV file contain only text, SingleStore cannot automatically detect a header; by default, columns are auto-named col0, col1, col2, etc. Consider a CSV file named books.csv with the following contents:
Id,Name,NumPages,Rating,PublishDate
1,HappyPlace,400,4.9,2023-04-05
2,Legends & Lattes,304,4.9,2022-11-28
3,The Vanishing Half,352,4.9,2020-06-02
Run the following command without header detection:
INFER PIPELINE AS LOAD DATA FS 'books.csv' FORMAT CSV;
As a result, column names are auto-generated as col0, col1, col2, col3, and col4.
Run the following command with header detection enabled:
INFER PIPELINE AS LOAD DATA FS 'books.csv' FORMAT CSV HEADER_DETECTION ON;
The first line is treated as the header, and the columns are named Id, Name, NumPages, Rating, PublishDate.
Using AS JSON
The following example demonstrates how to use INFER PIPELINE with the AS JSON keyword with a data file in Avro format stored in an AWS S3 bucket. INFER TABLE can be used in a similar manner (a sketch follows the schema below).
This example uses data that conforms to the schema of the books table, as shown in the following.
{"namespace": "books.avro",
"type": "record",
"name": "Book",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "num_pages", "type": "int"},
{"name": "rating", "type": "double"},
{"name": "publish_timestamp", "type": "long",
"logicalType": "timestamp-micros"} ]}Refer to Generate an Avro File for an example of generating an Avro file that conforms to this schema.
The following example generates a pipeline and table definition by scanning the specified Avro file and inferring the schema from selected rows.
INFER PIPELINE AS LOAD DATA S3 's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>","aws_secret_access_key":"<your_secret_access_key>","aws_session_token":"<your_session_token>"}'
FORMAT AVRO
AS JSON;
The command returns the pipeline and table definitions as a JSON document similar to the following:
{"pipeline_definition":
{"name":"infer_example_pipeline",
"connection_string":"s3://data-folder/books.avro",
"link":null,"source_type":"S3",
"config":"{\"region\":\"us-west-2\"}",
"credentials":"{\n
\"aws_access_key_id\":\"your_access_key_id\",\n
\"aws_secret_access_key\":\"your_secret_access_key\",\n
\"aws_session_token\":\"your_session_token\"}",
"batch_interval":2500,
"resource_pool":null,
"max_partitions_per_batch":null,
"max_retries_per_batch_partition":null,
"enable_out_of_order_optimization":false,
"aggregator_pipeline":false,
"transform":null,"load_error_policy":null,
"dup_key_policy":null,
"table":"infer_example_table",
"procedure":null,
"data_format":"AVRO",
"with_temporal_conversion":false,
"avro_schema":null,
"time_zone":null,
"avro_schema_registry_url":null,
"fields_terminated_by":null,
"fields_enclosed_by":null,
"fields_escaped_by":null,
"lines_terminated_by":null,
"lines_starting_by":null,
"extended_null":null,
"enclosed_null":null,
"trailing_nullcols":null,
"null_defined_by":null,
"ignore_lines":null,
"column_list":[
"`id`","`name`",
"`num_pages`",
"`rating`",
"`publish_date`"],
"json_paths":[["id"],["name"],["num_pages"],["rating"],["publish_date"]],
"column_defaults":[null,null,null,null,null],
"where_clause":null,"set_clause":null,
"on_duplicate_key_update":null,
"kafka_key_start_index": 0,
"kafka_key_format":null,
"stop_on_error":null,
"enable_offsets_metadata_gc":false,
"gc_timestamp":0,"create_time":1716313589,
"alter_time":0,"cookie":null},
"table_definition":
{"name":"infer_example_table",
"columns":[
{"name":"`id`",
"type":{"sql_text":"int(11) NOT NULL",
"base_type":"INT",
"nullable":false,
"collation":null,
"length":11,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null},
{"name":"`name`",
"type":{"sql_text":"longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL",
"base_type":"LONGTEXT",
"nullable":false,
"collation":"utf8_general_ci",
"length":null,"precision":null,
"scale":null,"signed":null,
"possible_values":null},
"default":null},
{"name":"`num_pages`",
"type":{"sql_text":"int(11) NOT NULL",
"base_type":"INT",
"nullable":false,
"collation":null,
"length":11,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null},
{"name":"`rating`",
"type":{"sql_text":"double NULL",
"base_type":"DOUBLE",
"nullable":true,
"collation":null,
"length":null,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null},
{"name":"`publish_date`",
"type":{"sql_text":"bigint(20) NOT NULL",
"base_type":"BIGINT",
"nullable":false,
"collation":null,
"length":20,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null}],
"indexes":[]}}Related Topics