Warning
SingleStore 9.0 gives you the opportunity to preview, evaluate, and provide feedback on new and upcoming features prior to their general availability. In the interim, SingleStore 8.9 is recommended for production workloads, which can later be upgraded to SingleStore 9.0.
Schema and Pipeline Inference
On this page
Table and pipeline definitions can be inferred from input files using the INFER
and CREATE INFERRED
commands.
The INFER
command creates a DDL definition for a table or pipeline based on input files and outputs the inferred definitions.INFER
can be reviewed, edited, and subsequently used with the CREATE TABLE or CREATE PIPELINE commands.
The CREATE INFERRED
command infers the schema for a table and pipeline based on input files and creates a table and pipeline based on the inferred schema.
Syntax
Syntax for inferring a table definition from an input file.
INFER TABLE AS LOAD DATA {input_configuration}[FORMAT [CSV | AVRO | PARQUET | ICEBERG]][AS JSON]
Syntax for inferring a pipeline definition from an input file.
INFER PIPELINE AS LOAD DATA {input_configuration}[FORMAT [CSV | AVRO | PARQUET | ICEBERG]][AS JSON]
The input_
may be a configuration for loading from Apache Kafka, Amazon S3, a local filesystem, Microsoft Azure, HDFS, and Google Cloud Storage.
All options available for CREATE PIPELINE
are supported by INFER
and CREATE INFERRED
, with the exception of the format options which are inferred from the schema.
Remarks
-
CSV, Avro, Parquet , and Iceberg formats are fully supported.
-
The default format is CSV.
-
TEXT
andENUM
type useutf8mb4
charset andutf8mb4_
collation by default.bin
Note
If the encoding of the source file is not utf8mb4
, multi-byte characters in the source file may be replaced with their corresponding single byte counterparts in the inferred table.
To change the encoding of the source file to utf8mb4
on a linux machine, run the following commands:
-
Determine the current encoding of the CSV file.
file -i input.csv
-
Convert the file data into
utf8mb4
encoded data.iconv -f <input-encoding> -t UTF-8 input.csv -o output.csv
Run the INFER PIPELINE
query on the output.
file to get the correct inference.
Using the IGNORE clause with INFER PIPELINE for CSV Files
When inferring data from a CSV file, use the IGNORE <n> LINES
clause to ignore the specified number of lines from the beginning of the file.
In the following example, the first two lines from the beginning of the CSV file are ignored.
INFER PIPELINE AS LOAD DATA FS '<path_to_local_file>' IGNORE 2 LINES;
Header Detection with the IGNORE
Clause
CSV Header Inference can be controlled by using the HEADER_
clause and the IGNORE
clause.IGNORE
clause.
INFER PIPELINE ....[FORMAT CSV] [IGNORE (X >= 1) LINES] [HEADER_DETECTION ON/OFF]
Default Behavior
-
If the first non-ignored line is entirely text, and subsequent lines contain a non-text field, then the first line is treated as the header and all remaining lines are ingested as data.
-
If all non-ignored lines are entirely text, there’s no clear header row.
Every line is ingested as data, with columns auto-named col0
,col1
,col2
, etc. -
If the first non-ignored line contains any non-text field, no header is inferred.
All lines are ingested as data, with columns auto-named.
HEADER_
-
Only the first non-ignored line is inspected.
If it’s entirely text, it becomes the header. Otherwise, no header is inferred. All subsequent lines are ingested as data.
HEADER_
-
No header is ever inferred.
Every line (including the first non-ignored line) is ingested as data, with columns auto-named col0
,col1
,col2
, etc.
Note
Empty lines in the source CSV file do not affect inference.IGNORE LINES
clause of the new CREATE PIPELINE
query.SKIP PARSER ERRORS
clause in the new CREATE PIPELINE
query.
Enabling the SKIP PARSER ERRORS
clause can be risky, as it may also suppress other parsing errors, such as inconsistent field counts or malformed records.
Infer Data from Parquet Files
Date/time values are stored internally in Parquet as physical integers with annotated logical types.
-
Parquet Logical Types and Infers:
Date
,Time
, andTimestamp
columns are inferred asDate
,Time(6)
, andDatetime(6)
columns respectively. -
Parquet Physical Types and Generates: Conversion of Parquet physical types (from
int
todate/time
objects) is performed with the help of built-in functions that are a part of theSET CLAUSE
of the generatedCREATE PIPELINE
query.
To infer data from Parquet files, run the following query:
INFER PIPELINE AS LOAD DATA FS '<path_to_local_file>' FORMAT PARQUET;
Infer Data from Iceberg Files
When inferring data from Iceberg files:
-
All Iceberg primitive data types and nested structs are supported.
Map-type or List-type columns are skipped while inferring data, with a warning. -
Nested structs in Iceberg can be perceived as a tree-like structure.
While inferring data, nested structs are cut open to map each Iceberg field to a SingleStore table column. For example: An Iceberg field
a struct<int, c struct<d string, e int>>
will lead to the generation of three columns in SingleStore:a.
,b int a.
, andc. d string a.
.c. e int -
Column mappings are appended to the new
CREATE PIPELINE
query to enable users to simply copy paste the generated queries without needing to figure out column mappings on their own. -
Iceberg Inference skips the manual hassle of taking care of
date
/time
type Iceberg columns that cannot be mapped to a SingleStore table column without appropriate Conversion built-in functions.It automatically generates them as part of the SET CLAUSE
of the generatedCREATE PIPELINE
query. -
For data distribution, Iceberg supports the addition of a subset of columns as the partition key with transforms.
The partition key of Iceberg maps to the shard key of SingleStore. The SingleStore engine picks up the columns (at any nesting level) defined as the partition key and defines them as the shard key in the new
CREATE TABLE
query. -
To infer pipeline in merge mode:
-
Set the
infer_
global variable to 'pipeline_ options merge
' before inference. -
Or execute the following query:
INFER PIPELINE AS LOAD DATA FS ‘<iceberg-table-path>’ FORMAT ICEBERG OPTIONS = ‘merge’;This query will automatically generate
CREATE TABLE
,CREATE VIEW
, andCREATE PIPELINE
queries accordingly by adding extra metadata columns and other specific configurations.
-
-
To create all the inferred
TABLE
,VIEW
, andPIPELINE
objects automatically, execute the following query:CREATE INFERRED PIPELINE <name> AS LOAD DATA FS ‘<iceberg-table-path>’ FORMAT ICEBERG OPTIONS = ‘merge’;Query the view and pipeline by using the specified
<name>
.
Examples
The following examples show the use of INFER
and CREATE INFER
on a data file in Avro format stored in an AWS S3 bucket.
This example uses data that conforms to the schema of a books table shown below.INFER
and CREATE INFER
infer the schema for this table from an input file.
Refer to Generate an Avro File for an example of generating an Avro file that conforms to this schema which can be used in these examples.
books(Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
PublishTimestamp DATETIME);
Example 1 - INFER TABLE
The following example produces a table definition by scanning the specified Avro file and inferring the table definition from selected rows in the file.
INFER TABLE AS LOAD DATA S3 's3://data_folder/books.avro'CONFIG '{"region":"<region_name>"}'CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>","aws_secret_access_key":"<your_secret_access_key>","aws_session_token":"<your_session_token>"}'FORMAT AVRO;
"CREATE TABLE `infer_example_table` (
`id` int(11) NOT NULL,
`name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`num_pages` int(11) NOT NULL,
`rating` double NULL,
`publish_date` bigint(20) NOT NULL)"
Example 2 - INFER PIPELINE
The following example produces a table and pipeline definition by scanning the specified Avro file and inferring the table and definitions from selected rows in the file.
INFER PIPELINE AS LOAD DATA S3's3://data_folder/books.avro'CONFIG '{"region":"<region_name>"}'CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>","aws_secret_access_key":"<your_secret_access_key>","aws_session_token":"<your_session_token>"}'FORMAT AVRO;
"CREATE TABLE `infer_example_table` (
`id` int(11) NOT NULL,
`name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`num_pages` int(11) NOT NULL,
`rating` double NULL,
`publish_date` bigint(20) NOT NULL);
CREATE PIPELINE `infer_example_pipeline`
AS LOAD DATA S3 's3://data-folder/books.avro'
CONFIG '{\""region\"":\""us-west-2\""}'
CREDENTIALS '{\n \""aws_access_key_id\"":\""your_access_key_id\"",
\n \""aws_secret_access_key\"":\""your_secret_access_key\"",
\n \""aws_session_token\"":\""your_session_token\""}'
BATCH_INTERVAL 2500
DISABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
INTO TABLE `infer_example_table`
FORMAT AVRO(
`id` <- `id`,
`name` <- `name`,
`num_pages` <- `num_pages`,
`rating` <- `rating`,
`publish_date` <- `publish_date`);"
Example 3 - CREATE INFERRED PIPELINE
The following example creates a pipeline with name books_
by inferring the schema from the specified file.
CREATE INFERRED PIPELINE books_pipe AS LOAD DATA S3's3://data_folder/books.avro'CONFIG '{"region":"<region_name>"}'CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>","aws_secret_access_key":"<your_secret_access_key>","aws_session_token":"<your_session_token>"}'FORMAT AVRO;
Created 'books_pipe' pipeline
The following SQL uses the SHOW CREATE PIPELINE
command to view the CREATE PIPELINE
for the pipeline that was inferred by the CREATE INFERRED PIPELINE
command above.
SHOW CREATE PIPELINE books_pipe;
Pipeline,Create Pipeline
books_pipe,"CREATE PIPELINE `books_pipe`
AS LOAD DATA S3 's3://data-folder/books.avro'
CONFIG '{\""region\"":\""us-west-2\""}'
CREDENTIALS <CREDENTIALS REDACTED>
BATCH_INTERVAL 2500
DISABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
INTO TABLE `books_pipe`
FORMAT AVRO(
`books_pipe`.`id` <- `id`,
`books_pipe`.`name` <- `name`,
`books_pipe`.`num_pages` <- `num_pages`,
`books_pipe`.`rating` <- `rating`,
`books_pipe`.`publish_date` <- `publish_date`)"
The following SQL uses the SHOW CREATE TABLE
command to view the CREATE TABLE
command for the table that was inferred by the CREATE INFERRED PIPELINE
command above.
SHOW CREATE TABLE books_pipe;
Table,Create Table
books_pipe,"CREATE TABLE `books_pipe` (
`id` int(11) NOT NULL,
`name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`num_pages` int(11) NOT NULL,
`rating` double DEFAULT NULL,
`publish_date` bigint(20) NOT NULL,
SORT KEY `__UNORDERED` (),
SHARD KEY ()
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
AUTOSTATS_HISTOGRAM_MODE=CREATE
AUTOSTATS_SAMPLING=ON
SQL_MODE='STRICT_ALL_TABLES,NO_AUTO_CREATE_USER'"
The pipeline and table definitions can be adjusted using CREATE OR REPLACE PIPELINE
(CREATE PIPELINE) and ALTER TABLE.
Once the pipeline and table definitions have been adjusted, start the pipeline.
START PIPELINE books_pipe FOREGROUND;
The command above starts a pipeline in the foreground so that errors will be displayed in the client.FOREGROUND
keyword.
Confirm that the data has been loaded.
SELECT * FROM books_pipeORDER BY id;
+----+--------------------+-----------+--------+------------------+
| id | name | num_pages | rating | publish_date |
+----+--------------------+-----------+--------+------------------+
| 1 | HappyPlace | 400 | 4.9 | 1680721200000000 |
| 2 | Legends & Lattes | 304 | 4.9 | 1669665600000000 |
| 3 | The Vanishing Half | 352 | 4.9 | 1591124400000000 |
+----+--------------------+-----------+--------+------------------+
Example 4 - Using AS_ JSON
The AS JSON
keyword can be used to produce pipeline and table definitions in JSON format.INFER PIPELINE
with the AS JSON
keyword.INFER TABLE
can be used in a similar manner.
The following example produces a pipeline and table definition by scanning the specified Avro file and inferring the definitions from selected rows in the file.
INFER PIPELINE AS LOAD DATA S3's3://data_folder/books.avro'CONFIG '{"region":"<region_name>"}'CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>","aws_secret_access_key":"<your_secret_access_key>","aws_session_token":"<your_session_token>"}'FORMAT AVROAS JSON;
{"pipeline_definition":
{"name":"infer_example_pipeline",
"connection_string":"s3://data-folder/books.avro",
"link":null,"source_type":"S3",
"config":"{\"region\":\"us-west-2\"}",
"credentials":"{\n
\"aws_access_key_id\":\"your_access_key_id\",\n
\"aws_secret_access_key\":\"your_secret_access_key\",\n
\"aws_session_token\":\"your_session_token\"}",
"batch_interval":2500,
"resource_pool":null,
"max_partitions_per_batch":null,
"max_retries_per_batch_partition":null,
"enable_out_of_order_optimization":false,
"aggregator_pipeline":false,
"transform":null,"load_error_policy":null,
"dup_key_policy":null,
"table":"infer_example_table",
"procedure":null,
"data_format":"AVRO",
"with_temporal_conversion":false,
"avro_schema":null,
"time_zone":null,
"avro_schema_registry_url":null,
"fields_terminated_by":null,
"fields_enclosed_by":null,
"fields_escaped_by":null,
"lines_terminated_by":null,
"lines_starting_by":null,
"extended_null":null,
"enclosed_null":null,
"trailing_nullcols":null,
"null_defined_by":null,
"ignore_lines":null,
"column_list":[
"`id`","`name`",
"`num_pages`",
"`rating`",
"`publish_date`"],
"json_paths":[["id"],["name"],["num_pages"],["rating"],["publish_date"]],
"column_defaults":[null,null,null,null,null],
"where_clause":null,"set_clause":null,
"on_duplicate_key_update":null,
"kafka_key_start_index": 0,
"kafka_key_format":null,
"stop_on_error":null,
"enable_offsets_metadata_gc":false,
"gc_timestamp":0,"create_time":1716313589,
"alter_time":0,"cookie":null},
"table_definition":
{"name":"infer_example_table",
"columns":[
{"name":"`id`",
"type":{"sql_text":"int(11) NOT NULL",
"base_type":"INT",
"nullable":false,
"collation":null,
"length":11,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null},
{"name":"`name`",
"type":{"sql_text":"longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL",
"base_type":"LONGTEXT",
"nullable":false,
"collation":"utf8_general_ci",
"length":null,"precision":null,
"scale":null,"signed":null,
"possible_values":null},
"default":null},
{"name":"`num_pages`",
"type":{"sql_text":"int(11) NOT NULL",
"base_type":"INT",
"nullable":false,
"collation":null,
"length":11,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null},
{"name":"`rating`",
"type":{"sql_text":"double NULL",
"base_type":"DOUBLE",
"nullable":true,
"collation":null,
"length":null,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null},
{"name":"`publish_date`",
"type":{"sql_text":"bigint(20) NOT NULL",
"base_type":"BIGINT",
"nullable":false,
"collation":null,
"length":20,
"precision":null,
"scale":null,
"signed":true,
"possible_values":null},
"default":null}],
"indexes":[]}}
Generate an Avro File
An Avro file can be generated that matches the schema of the following SQL CREATE TABLE
statement.
CREATE TABLE books(Id INT,Name TEXT,NumPages INT,Rating DOUBLE,Publish_date DATETIME);
Create and activate a Python virtual environment to use for generating the code and install the avro package.
python3 -m venv /path/to/virtual_environment
source /path/to/virtual_environment/bin/activate
pip3 install avro
Avro schemas are defined in JSON.
{"namespace": "books.avro","type": "record","name": "Book","fields": [{"name": "id", "type": "int"},{"name": "name", "type": "string"},{"name": "num_pages", "type": "int"},{"name": "rating", "type": "double"},{"name": "publish_timestamp", "type": "long","logicalType": "timestamp-micros"} ]}
The timestamp-micros
is a logical type in Avro that annotates an Avro int as defined in the Avro specification.
The following Python code creates an Avro file named books.books
table.
Either run the following code in the Python interpreter or save this code to a file named create_
import avro.schemafrom avro.datafile import DataFileWriterfrom avro.io import DatumWriterimport datetime as dtschema = avro.schema.parse(open("books.avsc","rb").read())day1 = int(dt.datetime.fromisoformat('2023-04-05 12:00').timestamp()*1000*1000)day2 = int(dt.datetime.fromisoformat('2022-11-28 12:00').timestamp()*1000*1000)day3 = int(dt.datetime.fromisoformat('2020-06-02 12:00').timestamp()*1000*1000)writer = DataFileWriter(open("books.avro","wb"), DatumWriter(), schema)writer.append({"id": 1, "name": "HappyPlace", "num_pages": 400,"rating":4.9, "publish_timestamp":day1})writer.append({"id": 2, "name": "Legends & Lattes", "num_pages": 304,"rating":4.9, "publish_timestamp":day2})writer.append({"id": 3, "name": "The Vanishing Half", "num_pages": 352,"rating":4.9, "publish_timestamp":day3})writer.close()
If you added the code to a file, change to the directory where this Python script resides and run the script to generate the Avro file.
cd /path/to/avro-scriptpython3 create_avro_file.py
After the Avro file has been created, move this file to the location from which you want to load it, like an Amazon S3 bucket.
Related Topics
Last modified: June 17, 2025