Schema and Pipeline Inference

Table and pipeline definitions can be inferred from input files using the following commands:

Using the IGNORE Clause with INFER PIPELINE for CSV Files

When inferring data from a CSV file, use the IGNORE <n> LINES clause to skip the specified number of lines at the beginning of the file.

In the following example, the first two lines of the CSV file are ignored.

INFER PIPELINE AS LOAD DATA FS '<path_to_local_file>' IGNORE 2 LINES;
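For example, consider a hypothetical CSV file whose first two lines are preamble text rather than data (the file content here is illustrative):

# exported from an inventory system
# export date: 2024-01-01
Id,Name,NumPages
1,HappyPlace,400

With IGNORE 2 LINES, the two preamble lines are skipped and inference begins at the Id,Name,NumPages line.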

Header Detection with the IGNORE Clause

CSV header inference can be controlled by using the HEADER_DETECTION ON/OFF clause and the IGNORE clause. The engine ignores the data in the first X lines, where X is the value defined in the IGNORE clause. For example:

INFER PIPELINE ... [FORMAT CSV] [IGNORE (X >= 1) LINES] [HEADER_DETECTION ON/OFF]

Default Behavior

  • If the first non-ignored line is entirely text, and subsequent lines contain a non-text field, then the first line is treated as the header and all remaining lines are ingested as data.

  • If all non-ignored lines are entirely text and there is no clear header row, then every line is ingested as data, with columns auto-named col0, col1, col2, etc.

  • If the first non-ignored line contains any non-text field and no header is inferred, then all lines are ingested as data, with columns auto-named.

HEADER_DETECTION ON

  • Only the first non-ignored line is inspected. If it’s entirely text, it becomes the header. Otherwise, no header is inferred. All subsequent lines are ingested as data.

HEADER_DETECTION OFF

  • No header is inferred. Every line (including the first non-ignored line) is ingested as data, with columns auto-named col0, col1, col2, etc.
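For illustration, the clauses can be combined in a single command (the file path here is hypothetical):

INFER PIPELINE AS LOAD DATA FS '/tmp/books.csv' FORMAT CSV IGNORE 1 LINES HEADER_DETECTION OFF;

This skips the first line of the file and ingests every remaining line as data, with columns auto-named col0, col1, col2, etc.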

Note

Empty lines in the source CSV file do not affect inference. Empty lines at the beginning of the file are automatically accounted for and reflected in the IGNORE LINES clause of the new CREATE PIPELINE query. However, empty lines that appear within or at the end of the data are not automatically handled and must be managed by the user. This can be done by including the SKIP PARSER ERRORS clause in the new CREATE PIPELINE query.

SingleStore does not recommend enabling the SKIP PARSER ERRORS clause, as it suppresses other parsing errors, such as inconsistent field counts or malformed records.
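For reference, a minimal sketch of where the clause sits in a CREATE PIPELINE statement is shown below; the pipeline, table, and path names are hypothetical, and the clause should only be added if stray empty lines must be tolerated:

CREATE PIPELINE books_csv
AS LOAD DATA FS '/data/books.csv'
SKIP PARSER ERRORS
INTO TABLE books
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
IGNORE 1 LINES;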

Infer Data from Parquet Files

Date/Time values are stored internally in Parquet as physical integers with annotated logical types. While inferring data from Parquet files, SingleStore recognizes:

  • Parquet logical types: Date, Time, and Timestamp columns are inferred as DATE, TIME(6), and DATETIME(6) columns, respectively.

  • Parquet physical types: conversion of Parquet physical types (from integers to date/time values) is performed by built-in functions that are part of the SET clause of the generated CREATE PIPELINE query.

To infer data from Parquet files, run the following command:

INFER PIPELINE AS LOAD DATA FS '<path_to_local_file>' FORMAT PARQUET;
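As an illustration of the generated SET clause, a pipeline inferred for a hypothetical books.parquet file matching the books table later in this article might convert the physical integer roughly as follows. This is a sketch only; the engine chooses the actual functions, field names, and mappings:

CREATE PIPELINE books_parquet
AS LOAD DATA FS '/data/books.parquet'
INTO TABLE books
(`Id` <- Id, `Name` <- Name, `NumPages` <- NumPages, `Rating` <- Rating, @publish_ts <- Publish_date)
FORMAT PARQUET
SET `Publish_date` = DATE_ADD('1970-01-01', INTERVAL @publish_ts MICROSECOND);

Here the Parquet field is assumed to store microseconds since the Unix epoch; a date stored as days since the epoch would use a day-based conversion instead.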

Infer Data from Iceberg Files

When inferring data from Iceberg files:

  • All Iceberg primitive data types and nested structs are supported. Map-type or List-type columns are skipped while inferring data, with a warning.

  • Nested structs in Iceberg can be viewed as a tree-like structure. While inferring data, nested structs are flattened so that each Iceberg field maps to a SingleStore table column.

    For example, an Iceberg field a struct<b int, c struct<d string, e int>> generates three columns in SingleStore: a.b int, a.c.d string, and a.c.e int (see the sketch at the end of this list).

  • Column mappings are included in the generated CREATE PIPELINE query, which enables users to copy and paste the query without defining the column mappings manually.

  • Iceberg inference automatically handles Date and Time columns that cannot be mapped directly to a SingleStore table column without conversion; the required built-in conversion functions are added to the SET clause of the generated CREATE PIPELINE query.

  • For data distribution, Iceberg supports defining a subset of columns, with transforms, as the partition key. The Iceberg partition key maps to the SingleStore shard key.

    SingleStore identifies columns defined as the partition key, at any nesting level, and sets them as the shard key in the generated CREATE TABLE query.

  • To infer a pipeline in merge mode:

    • Set the infer_pipeline_options global variable to 'merge' before inference.

    • Or run the following command:

      INFER PIPELINE AS LOAD DATA FS '<iceberg-table-path>' FORMAT ICEBERG OPTIONS =merge;

      This command automatically generates the CREATE TABLE, CREATE VIEW, and CREATE PIPELINE queries, adding extra metadata columns and other merge-specific configuration.

  • To create all the inferred TABLE, VIEW, and PIPELINE objects automatically, run the following command:

    CREATE INFERRED PIPELINE <name> AS LOAD DATA FS '<iceberg-table-path>' FORMAT ICEBERG OPTIONS =merge;

    Query the view and pipeline by using the specified <name>.
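As a sketch of the flattening and shard-key mapping described above, the generated CREATE TABLE for the struct example might look roughly like the following, assuming a.b is part of the Iceberg partition key. The table name, types, and keys here are illustrative; the actual output depends on the source schema:

CREATE TABLE example_iceberg_table (
  `a.b` INT,
  `a.c.d` LONGTEXT,
  `a.c.e` INT,
  SHARD KEY (`a.b`)
);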

Generate an Avro File

An Avro file can be generated that matches the schema of the following CREATE TABLE statement.

CREATE TABLE books(
  Id INT,
  Name TEXT,
  NumPages INT,
  Rating DOUBLE,
  Publish_date DATETIME);

Create and activate a Python virtual environment to use for generating the Avro file, and install the avro package.

python3 -m venv /path/to/virtual_environment
source /path/to/virtual_environment/bin/activate

pip3 install avro

Avro schemas are defined in JSON. The following JSON defines an Avro schema which matches the preceding books table schema. Save the schema in a file named books.avsc.

{"namespace": "books.avro",
 "type": "record",
 "name": "Book",
 "fields": [
     {"name": "id", "type": "int"},
     {"name": "name",  "type": "string"},
     {"name": "num_pages", "type": "int"},
     {"name": "rating", "type": "double"},
     {"name": "publish_timestamp", "type": "long",
               "logicalType": "timestamp-micros"} ]}

The timestamp-micros logical type in Avro annotates an Avro long, as defined in the Avro specification. The long stores the number of microseconds from the Unix epoch, January 1, 1970 00:00:00.000000 UTC. For example, 2023-04-05 12:00:00 UTC corresponds to 1680696000 seconds, or 1680696000000000 microseconds, since the epoch.

The following Python code creates an Avro file named books.avro that contains data matching the structure of the books table.

Either run the following code in the Python interpreter or save this code to a file named create_avro_file.py.

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import datetime as dt

# Load the Avro schema defined in books.avsc
schema = avro.schema.parse(open("books.avsc", "rb").read())

# Convert each publish date to microseconds since the Unix epoch (timestamp-micros)
day1 = int(dt.datetime.fromisoformat('2023-04-05 12:00').timestamp() * 1000 * 1000)
day2 = int(dt.datetime.fromisoformat('2022-11-28 12:00').timestamp() * 1000 * 1000)
day3 = int(dt.datetime.fromisoformat('2020-06-02 12:00').timestamp() * 1000 * 1000)

# Write three records that match the Book schema to books.avro
writer = DataFileWriter(open("books.avro", "wb"), DatumWriter(), schema)
writer.append({"id": 1, "name": "HappyPlace", "num_pages": 400,
               "rating": 4.9, "publish_timestamp": day1})
writer.append({"id": 2, "name": "Legends & Lattes", "num_pages": 304,
               "rating": 4.9, "publish_timestamp": day2})
writer.append({"id": 3, "name": "The Vanishing Half", "num_pages": 352,
               "rating": 4.9, "publish_timestamp": day3})
writer.close()

If the code is in a file, navigate to the directory containing the Python script and run it to generate the Avro file.

cd /path/to/avro-script
python3 create_avro_file.py

After the Avro file is created, move it to the desired location for loading, such as an Amazon S3 bucket.

Examples

Infer Headers When All Columns are Strings

If all non-ignored lines in a CSV file contain only text, SingleStore cannot automatically detect a header. In this case, columns are assigned default names: col0, col1, col2, etc. This example uses books.csv:

Id,Name,NumPages,Rating,PublishDate
1,HappyPlace,400,4.9,2023-04-05
2,Legends & Lattes,304,4.9,2022-11-28
3,The Vanishing Half,352,4.9,2020-06-02

Run the following command without header detection:

INFER PIPELINE AS LOAD DATA FS 'books.csv' FORMAT CSV;

As a result, column names are auto-generated as col0, col1, col2, col3, and col4.

Run the following command with header detection enabled:

INFER PIPELINE AS LOAD DATA FS 'books.csv' FORMAT CSV HEADER_DETECTION ON;

The first line is treated as the header. Columns are named correctly as Id, Name, NumPages, Rating, PublishDate.

Using AS JSON

The following example demonstrates how to use INFER PIPELINE with the AS JSON clause on a data file in Avro format stored in an AWS S3 bucket. INFER TABLE can be used in a similar manner.

This example uses data that conforms to the schema of the books table, shown in the following Avro schema.

{"namespace": "books.avro",
"type": "record",
"name": "Book",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "num_pages", "type": "int"},
{"name": "rating", "type": "double"},
{"name": "publish_timestamp", "type": "long",
"logicalType": "timestamp-micros"} ]}

Refer to Generate an Avro File for an example of generating an Avro file that conforms to this schema.

The following example generates a pipeline and table definition by scanning the specified Avro file and inferring the schema from selected rows. The output is returned in JSON format.

INFER PIPELINE AS LOAD DATA S3
         's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{
    "aws_access_key_id":"<your_access_key_id>",
    "aws_secret_access_key":"<your_secret_access_key>",
    "aws_session_token":"<your_session_token>"}'
FORMAT AVRO
AS JSON;

The command returns output similar to the following:

{"pipeline_definition":
{"name":"infer_example_pipeline",
 "connection_string":"s3://data-folder/books.avro",
 "link":null,"source_type":"S3",
 "config":"{\"region\":\"us-west-2\"}",
 "credentials":"{\n
    \"aws_access_key_id\":\"your_access_key_id\",\n
    \"aws_secret_access_key\":\"your_secret_access_key\",\n
    \"aws_session_token\":\"your_session_token\"}",
 "batch_interval":2500,
 "resource_pool":null,
 "max_partitions_per_batch":null,
 "max_retries_per_batch_partition":null,
 "enable_out_of_order_optimization":false,
 "aggregator_pipeline":false,
 "transform":null,"load_error_policy":null,
 "dup_key_policy":null,
 "table":"infer_example_table",
 "procedure":null,
 "data_format":"AVRO",
 "with_temporal_conversion":false,
 "avro_schema":null,
 "time_zone":null,
 "avro_schema_registry_url":null,
 "fields_terminated_by":null,
 "fields_enclosed_by":null,
 "fields_escaped_by":null,
 "lines_terminated_by":null,
 "lines_starting_by":null,
 "extended_null":null,
 "enclosed_null":null,
 "trailing_nullcols":null,
 "null_defined_by":null,
 "ignore_lines":null,
 "column_list":[
  "`id`","`name`",
  "`num_pages`",
  "`rating`",
  "`publish_date`"],
 "json_paths":[["id"],["name"],["num_pages"],["rating"],["publish_date"]],
 "column_defaults":[null,null,null,null,null],
 "where_clause":null,"set_clause":null,
 "on_duplicate_key_update":null,
 "kafka_key_start_index": 0,
 "kafka_key_format":null,
 "stop_on_error":null,
 "enable_offsets_metadata_gc":false,
 "gc_timestamp":0,"create_time":1716313589,
 "alter_time":0,"cookie":null},
"table_definition":
{"name":"infer_example_table",
 "columns":[
 {"name":"`id`",
  "type":{"sql_text":"int(11) NOT NULL",
   "base_type":"INT",
   "nullable":false,
   "collation":null,
   "length":11,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`name`",
  "type":{"sql_text":"longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL",
   "base_type":"LONGTEXT",
   "nullable":false,
   "collation":"utf8_general_ci",
   "length":null,"precision":null,
   "scale":null,"signed":null,
   "possible_values":null},
  "default":null},
 {"name":"`num_pages`",
  "type":{"sql_text":"int(11) NOT NULL",
   "base_type":"INT",
   "nullable":false,
   "collation":null,
   "length":11,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`rating`",
  "type":{"sql_text":"double NULL",
   "base_type":"DOUBLE",
   "nullable":true,
   "collation":null,
   "length":null,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`publish_date`",
  "type":{"sql_text":"bigint(20) NOT NULL",
   "base_type":"BIGINT",
   "nullable":false,
   "collation":null,
   "length":20,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null}],
 "indexes":[]}}
