Schema and Pipeline Inference

Table and pipeline definitions can be inferred from input files using the INFER TABLE and INFER PIPELINE commands.

Using the IGNORE Clause with INFER PIPELINE for CSV Files

When inferring a schema from a CSV file, use the IGNORE <n> LINES clause to skip the specified number of lines at the beginning of the file.

In the following example, the first two lines from the beginning of the CSV file are ignored.

INFER PIPELINE AS LOAD DATA FS '<path_to_local_file>' IGNORE 2 LINES;

Remarks

When the CSV file contains at least one non-text column, and:

  • The IGNORE clause is not specified and the first line of the CSV is in text format, the first line is inferred as the header (to assign names to the columns in the SingleStore table) and the following lines are ingested. Otherwise, standard column names (col0, col1, etc.) are used and all the lines in the CSV are ingested.

  • The IGNORE clause is set to 0 LINES, the header is not inferred. Standard column names (col0, col1, etc.) are used as the header, and all lines in the CSV are ingested.

  • The IGNORE clause is set to a positive integer, for example “n”, and the (n+1)th line of the CSV is in text format, this line is inferred as the header (to assign names to the columns in the SingleStore table), and the following lines are ingested. Otherwise, standard column names (col0, col1, etc.) are used and lines after the (n)th line in the CSV are ingested.

When the CSV file contains only text columns, and:

  • The IGNORE clause is either not specified or set to 0 LINES, standard column names (col0, col1, etc.) are used as the header (to assign names to the columns in the SingleStore table) and all lines in the CSV are ingested.

  • The IGNORE clause is set to a positive integer, for example “n”, and the (n+1)th line is a legitimate line (it has the same number of fields as the rest of the data), this line is inferred as the header. Otherwise, standard column names (col0, col1, etc.) are used as the header, and all lines after the (n)th line are ingested.
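
To make these remarks concrete, the following sketch generates a CSV file whose first two lines are non-data preamble, matching the IGNORE 2 LINES example above. The file name and preamble text are hypothetical and used only for illustration.

import csv

# Hypothetical input: two preamble lines, then a header row, then data rows.
# With IGNORE 2 LINES, the first two lines are skipped and the third line
# ("Id,Name,...") is the text line considered as the header candidate.
with open("books_with_preamble.csv", "w", newline="") as f:
    f.write("# exported from catalog service\n")
    f.write("# export date: 2023-04-05\n")
    writer = csv.writer(f)
    writer.writerow(["Id", "Name", "NumPages", "Rating", "PublishDate"])
    writer.writerow([1, "HappyPlace", 400, 4.9, "2023-04-05"])
    writer.writerow([2, "Legends & Lattes", 304, 4.9, "2022-11-28"])
    writer.writerow([3, "The Vanishing Half", 352, 4.9, "2020-06-02"])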

Generate an Avro File

An Avro file can be generated that matches the schema of the following CREATE TABLE statement.

CREATE TABLE books(
    Id INT,
    Name TEXT,
    NumPages INT,
    Rating DOUBLE,
    Publish_date DATETIME);

Create and activate a Python virtual environment for generating the file, then install the avro package.

python3 -m venv /path/to/virtual_environment
source /path/to/virtual_environment/bin/activate

pip3 install avro

Avro schemas are defined in JSON. The following JSON defines an Avro schema which matches the preceding books table schema. Save the schema in a file named books.avsc.

{"namespace": "books.avro",
 "type": "record",
 "name": "Book",
 "fields": [
     {"name": "id", "type": "int"},
     {"name": "name",  "type": "string"},
     {"name": "num_pages", "type": "int"},
     {"name": "rating", "type": "double"},
     {"name": "publish_timestamp", "type": "long",
               "logicalType": "timestamp-micros"} ]}

timestamp-micros is a logical type in Avro that annotates an Avro long, as defined in the Avro specification. The long stores the number of microseconds since the Unix epoch, January 1, 1970 00:00:00.000000 UTC.
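
As a quick illustration of this representation (a minimal sketch, separate from the example below):

import datetime as dt

# A publish date expressed as microseconds since the Unix epoch ...
publish = dt.datetime(2023, 4, 5, 12, 0, tzinfo=dt.timezone.utc)
micros = int(publish.timestamp() * 1_000_000)

# ... and converted back to a datetime for verification.
restored = dt.datetime.fromtimestamp(micros / 1_000_000, tz=dt.timezone.utc)
print(micros, restored.isoformat())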

The following Python code creates an Avro file named books.avro that contains data matching the structure of the books table.

Either run the following code in the Python interpreter or save this code to a file named create_avro_file.py.

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import datetime as dt

# Load the Avro schema defined in books.avsc
schema = avro.schema.parse(open("books.avsc", "rb").read())

# Publication dates expressed as microseconds since the Unix epoch
day1 = int(dt.datetime.fromisoformat('2023-04-05 12:00').timestamp() * 1000 * 1000)
day2 = int(dt.datetime.fromisoformat('2022-11-28 12:00').timestamp() * 1000 * 1000)
day3 = int(dt.datetime.fromisoformat('2020-06-02 12:00').timestamp() * 1000 * 1000)

# Write three records that match the schema to books.avro
writer = DataFileWriter(open("books.avro", "wb"), DatumWriter(), schema)
writer.append({"id": 1, "name": "HappyPlace", "num_pages": 400,
               "rating": 4.9, "publish_timestamp": day1})
writer.append({"id": 2, "name": "Legends & Lattes", "num_pages": 304,
               "rating": 4.9, "publish_timestamp": day2})
writer.append({"id": 3, "name": "The Vanishing Half", "num_pages": 352,
               "rating": 4.9, "publish_timestamp": day3})
writer.close()

If the code is in a file, navigate to the directory containing the Python script and run it to generate the Avro file.

cd /path/to/avro-script
python3 create_avro_file.py
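
Optionally, the generated file can be inspected before loading. The following sketch assumes books.avro is in the current directory:

from avro.datafile import DataFileReader
from avro.io import DatumReader

# Print every record stored in books.avro
with DataFileReader(open("books.avro", "rb"), DatumReader()) as reader:
    for record in reader:
        print(record)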

After the Avro file is created, move it to the desired location for loading, such as an Amazon S3 bucket.
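
For example, the file could be uploaded with boto3. This is a minimal sketch; it assumes boto3 is installed, AWS credentials are configured in the environment, and that the bucket name matches the (hypothetical) one used in the S3 example below.

import boto3

# Upload books.avro to the (hypothetical) data_folder bucket
s3 = boto3.client("s3")
s3.upload_file("books.avro", "data_folder", "books.avro")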

Examples

Infer Headers When All Columns are Strings

If all non-ignored lines in a CSV file contain only text, SingleStore cannot automatically detect a header. In this case, columns are assigned default names: col0, col1, col2, etc. This example uses books.csv:

Id,Name,NumPages,Rating,PublishDate
1,HappyPlace,400,4.9,2023-04-05
2,Legends & Lattes,304,4.9,2022-11-28
3,The Vanishing Half,352,4.9,2020-06-02

Run the following command without header detection:

INFER PIPELINE AS LOAD DATA FS 'books.csv' FORMAT CSV;

As a result, column names are auto-generated as col0, col1, col2, col3, and col4.

Run the following command with header detection enabled:

INFER PIPELINE AS LOAD DATA FS 'books.csv' FORMAT CSV HEADER_DETECTION ON;

The first line is treated as the header. Columns are named correctly as Id, Name, NumPages, Rating, PublishDate.

Using AS JSON

The following example demonstrates how to use INFER PIPELINE with the AS JSON clause on a data file in Avro format stored in an AWS S3 bucket. INFER TABLE can be used in a similar manner.

This example uses data that conforms to the schema of the books table, as shown in the following Avro schema definition.

{"namespace": "books.avro",
"type": "record",
"name": "Book",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "num_pages", "type": "int"},
{"name": "rating", "type": "double"},
{"name": "publish_timestamp", "type": "long",
"logicalType": "timestamp-micros"} ]}

Refer to Generate an Avro File for an example of generating an Avro file that conforms to this schema.

The following example generates a pipeline and table definition by scanning the specified Avro file and inferring the schema from selected rows. The output is returned in JSON format.

INFER PIPELINE AS LOAD DATA S3
         's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{
    "aws_access_key_id":"<your_access_key_id>",
    "aws_secret_access_key":"<your_secret_access_key>",
    "aws_session_token":"<your_session_token>"}'
FORMAT AVRO
AS JSON;

{"pipeline_definition":
{"name":"infer_example_pipeline",
 "connection_string":"s3://data-folder/books.avro",
 "link":null,"source_type":"S3",
 "config":"{\"region\":\"us-west-2\"}",
 "credentials":"{\n
    \"aws_access_key_id\":\"your_access_key_id\",\n
    \"aws_secret_access_key\":\"your_secret_access_key\",\n
    \"aws_session_token\":\"your_session_token\"}",
 "batch_interval":2500,
 "resource_pool":null,
 "max_partitions_per_batch":null,
 "max_retries_per_batch_partition":null,
 "enable_out_of_order_optimization":false,
 "aggregator_pipeline":false,
 "transform":null,"load_error_policy":null,
 "dup_key_policy":null,
 "table":"infer_example_table",
 "procedure":null,
 "data_format":"AVRO",
 "with_temporal_conversion":false,
 "avro_schema":null,
 "time_zone":null,
 "avro_schema_registry_url":null,
 "fields_terminated_by":null,
 "fields_enclosed_by":null,
 "fields_escaped_by":null,
 "lines_terminated_by":null,
 "lines_starting_by":null,
 "extended_null":null,
 "enclosed_null":null,
 "trailing_nullcols":null,
 "null_defined_by":null,
 "ignore_lines":null,
 "column_list":[
  "`id`","`name`",
  "`num_pages`",
  "`rating`",
  "`publish_date`"],
 "json_paths":[["id"],["name"],["num_pages"],["rating"],["publish_date"]],
 "column_defaults":[null,null,null,null,null],
 "where_clause":null,"set_clause":null,
 "on_duplicate_key_update":null,
 "kafka_key_start_index": 0,
 "kafka_key_format":null,
 "stop_on_error":null,
 "enable_offsets_metadata_gc":false,
 "gc_timestamp":0,"create_time":1716313589,
 "alter_time":0,"cookie":null},
"table_definition":
{"name":"infer_example_table",
 "columns":[
 {"name":"`id`",
  "type":{"sql_text":"int(11) NOT NULL",
   "base_type":"INT",
   "nullable":false,
   "collation":null,
   "length":11,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`name`",
  "type":{"sql_text":"longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL",
   "base_type":"LONGTEXT",
   "nullable":false,
   "collation":"utf8_general_ci",
   "length":null,"precision":null,
   "scale":null,"signed":null,
   "possible_values":null},
  "default":null},
 {"name":"`num_pages`",
  "type":{"sql_text":"int(11) NOT NULL",
   "base_type":"INT",
   "nullable":false,
   "collation":null,
   "length":11,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`rating`",
  "type":{"sql_text":"double NULL",
   "base_type":"DOUBLE",
   "nullable":true,
   "collation":null,
   "length":null,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`publish_date`",
  "type":{"sql_text":"bigint(20) NOT NULL",
   "base_type":"BIGINT",
   "nullable":false,
   "collation":null,
   "length":20,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null}],
 "indexes":[]}}

