Schema and Pipeline Inference

Table and pipeline definitions can be inferred from input files using the INFER TABLE and INFER PIPELINE commands.

Generate an Avro File

An Avro file can be generated that matches the schema of the following CREATE TABLE statement.

CREATE TABLE books(
Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
Publish_date DATETIME);

Create and activate a Python virtual environment to use for generating the Avro file, then install the avro package.

python3 -m venv /path/to/virtual_environment
source /path/to/virtual_environment/bin/activate

pip3 install avro

Avro schemas are defined in JSON. The following JSON defines an Avro schema which matches the preceding books table schema. Save the schema in a file named books.avsc.

{"namespace": "books.avro",
 "type": "record",
 "name": "Book",
 "fields": [
     {"name": "id", "type": "int"},
     {"name": "name",  "type": "string"},
     {"name": "num_pages", "type": "int"},
     {"name": "rating", "type": "double"},
     {"name": "publish_timestamp", "type": "long",
               "logicalType": "timestamp-micros"} ]}

timestamp-micros is a logical type in Avro that annotates an Avro long, as defined in the Avro specification. The long stores the number of microseconds from the Unix epoch, January 1, 1970 00:00:00.000000 UTC.
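As a minimal sketch of what a timestamp-micros value represents, the following Python snippet converts a timezone-aware datetime to microseconds since the Unix epoch and back again. The helper names to_timestamp_micros and from_timestamp_micros are hypothetical and are not part of the avro package; only the Python standard library is used.

```python
import datetime as dt

# The Unix epoch as a timezone-aware datetime in UTC.
EPOCH = dt.datetime(1970, 1, 1, tzinfo=dt.timezone.utc)


def to_timestamp_micros(moment):
    """Return whole microseconds between the Unix epoch and an aware datetime."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Dividing one timedelta by another with // yields an exact integer.
    return (moment - EPOCH) // dt.timedelta(microseconds=1)


def from_timestamp_micros(micros):
    """Return the UTC datetime for a count of microseconds since the epoch."""
    return EPOCH + dt.timedelta(microseconds=micros)


# Hypothetical sample value: noon UTC on one of the publish dates used in this page.
published = dt.datetime(2023, 4, 5, 12, 0, tzinfo=dt.timezone.utc)
micros = to_timestamp_micros(published)
print(micros)
print(from_timestamp_micros(micros).isoformat())
```

Integer arithmetic on timedelta values avoids the floating-point rounding that can occur when multiplying the result of timestamp() by one million.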

The following Python code creates an Avro file named books.avro that contains data matching the structure of the books table.

Either run the following code in the Python interpreter or save this code to a file named create_avro_file.py.

import datetime as dt

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

# Parse the Avro schema saved in books.avsc.
with open("books.avsc", "rb") as schema_file:
    schema = avro.schema.parse(schema_file.read())

# Convert each publish date to microseconds since the Unix epoch.
# These datetimes carry no time zone, so timestamp() interprets them in local time.
day1 = int(dt.datetime.fromisoformat('2023-04-05 12:00').timestamp()*1000*1000)
day2 = int(dt.datetime.fromisoformat('2022-11-28 12:00').timestamp()*1000*1000)
day3 = int(dt.datetime.fromisoformat('2020-06-02 12:00').timestamp()*1000*1000)

# Write three records that match the schema, then close the file.
writer = DataFileWriter(open("books.avro", "wb"), DatumWriter(), schema)
writer.append({"id": 1, "name": "HappyPlace", "num_pages": 400,
               "rating": 4.9, "publish_timestamp": day1})
writer.append({"id": 2, "name": "Legends & Lattes", "num_pages": 304,
               "rating": 4.9, "publish_timestamp": day2})
writer.append({"id": 3, "name": "The Vanishing Half", "num_pages": 352,
               "rating": 4.9, "publish_timestamp": day3})
writer.close()

If the code is in a file, navigate to the directory containing the Python script and run it to generate the Avro file.

cd /path/to/avro-script
python3 create_avro_file.py

After the Avro file is created, move it to the desired location for loading, such as an Amazon S3 bucket.

Examples

Infer Headers When All Columns are Strings

If all non-ignored lines in a CSV file contain only text, SingleStore cannot automatically detect a header. In this case, columns are assigned default names: col0, col1, col2, etc. This example uses books.csv:

Id,Name,NumPages,Rating,PublishDate
1,HappyPlace,400,4.9,2023-04-05
2,Legends & Lattes,304,4.9,2022-11-28
3,The Vanishing Half,352,4.9,2020-06-02

Run the following command without header detection:

INFER PIPELINE AS LOAD DATA FS 'books.csv' FORMAT CSV;

As a result, column names are auto-generated as col0, col1, col2, col3, and col4.

Run the following command with header detection enabled:

INFER PIPELINE AS LOAD DATA FS 'books.csv' FORMAT CSV HEADER_DETECTION ON;

The first line is treated as the header. Columns are named correctly as Id, Name, NumPages, Rating, PublishDate.
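The difference between the two results can be illustrated with a short Python sketch. It is not SingleStore's detection logic; it only mirrors the naming outcome described above, and the function name column_names and its first_line_is_header parameter are hypothetical.

```python
import csv
import io

# The contents of books.csv from the example above.
BOOKS_CSV = """Id,Name,NumPages,Rating,PublishDate
1,HappyPlace,400,4.9,2023-04-05
2,Legends & Lattes,304,4.9,2022-11-28
3,The Vanishing Half,352,4.9,2020-06-02
"""


def column_names(csv_text, first_line_is_header):
    """Return column names: the first row if it is a header, else col0, col1, ..."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    if first_line_is_header:
        return rows[0]
    # Without a header, fall back to positional default names.
    return ["col{}".format(index) for index in range(len(rows[0]))]


print(column_names(BOOKS_CSV, first_line_is_header=False))
print(column_names(BOOKS_CSV, first_line_is_header=True))
```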

Using AS JSON

The following example demonstrates how to use INFER PIPELINE with the AS JSON keyword with a data file in Avro format stored in an AWS S3 bucket. INFER TABLE can be used in a similar manner.

This example uses data that conforms to the following Avro schema, which corresponds to the books table.

{"namespace": "books.avro",
"type": "record",
"name": "Book",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "num_pages", "type": "int"},
{"name": "rating", "type": "double"},
{"name": "publish_timestamp", "type": "long",
"logicalType": "timestamp-micros"} ]}

Refer to Generate an Avro File for an example of generating an Avro file that conforms to this schema.

The following example generates a pipeline and table definition by scanning the specified Avro file and inferring the schema from selected rows. The output is returned in JSON format.

INFER PIPELINE AS LOAD DATA S3
         's3://data-folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{
    "aws_access_key_id":"<your_access_key_id>",
    "aws_secret_access_key":"<your_secret_access_key>",
    "aws_session_token":"<your_session_token>"}'
FORMAT AVRO
AS JSON;

The statement returns output similar to the following:

{"pipeline_definition":
{"name":"infer_example_pipeline",
 "connection_string":"s3://data-folder/books.avro",
 "link":null,"source_type":"S3",
 "config":"{\"region\":\"us-west-2\"}",
 "credentials":"{\n
    \"aws_access_key_id\":\"your_access_key_id\",\n
    \"aws_secret_access_key\":\"your_secret_access_key\",\n
    \"aws_session_token\":\"your_session_token\"}",
 "batch_interval":2500,
 "resource_pool":null,
 "max_partitions_per_batch":null,
 "max_retries_per_batch_partition":null,
 "enable_out_of_order_optimization":false,
 "aggregator_pipeline":false,
 "transform":null,"load_error_policy":null,
 "dup_key_policy":null,
 "table":"infer_example_table",
 "procedure":null,
 "data_format":"AVRO",
 "with_temporal_conversion":false,
 "avro_schema":null,
 "time_zone":null,
 "avro_schema_registry_url":null,
 "fields_terminated_by":null,
 "fields_enclosed_by":null,
 "fields_escaped_by":null,
 "lines_terminated_by":null,
 "lines_starting_by":null,
 "extended_null":null,
 "enclosed_null":null,
 "trailing_nullcols":null,
 "null_defined_by":null,
 "ignore_lines":null,
 "column_list":[
  "`id`","`name`",
  "`num_pages`",
  "`rating`",
  "`publish_date`"],
 "json_paths":[["id"],["name"],["num_pages"],["rating"],["publish_date"]],
 "column_defaults":[null,null,null,null,null],
 "where_clause":null,"set_clause":null,
 "on_duplicate_key_update":null,
 "kafka_key_start_index": 0,
 "kafka_key_format":null,
 "stop_on_error":null,
 "enable_offsets_metadata_gc":false,
 "gc_timestamp":0,"create_time":1716313589,
 "alter_time":0,"cookie":null},
"table_definition":
{"name":"infer_example_table",
 "columns":[
 {"name":"`id`",
  "type":{"sql_text":"int(11) NOT NULL",
   "base_type":"INT",
   "nullable":false,
   "collation":null,
   "length":11,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`name`",
  "type":{"sql_text":"longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL",
   "base_type":"LONGTEXT",
   "nullable":false,
   "collation":"utf8_general_ci",
   "length":null,"precision":null,
   "scale":null,"signed":null,
   "possible_values":null},
  "default":null},
 {"name":"`num_pages`",
  "type":{"sql_text":"int(11) NOT NULL",
   "base_type":"INT",
   "nullable":false,
   "collation":null,
   "length":11,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`rating`",
  "type":{"sql_text":"double NULL",
   "base_type":"DOUBLE",
   "nullable":true,
   "collation":null,
   "length":null,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`publish_date`",
  "type":{"sql_text":"bigint(20) NOT NULL",
   "base_type":"BIGINT",
   "nullable":false,
   "collation":null,
   "length":20,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null}],
 "indexes":[]}}
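
Because the result is plain JSON, it can be processed with any JSON parser. The following Python sketch assumes the output has been captured as a string; SAMPLE_OUTPUT is an abbreviated, hand-trimmed stand-in for the full output above, and build_create_table is a hypothetical helper that assembles a CREATE TABLE statement from the name and sql_text values in table_definition.

```python
import json

# Abbreviated stand-in for the INFER PIPELINE ... AS JSON output shown above.
SAMPLE_OUTPUT = """
{"pipeline_definition": {"name": "infer_example_pipeline",
                         "table": "infer_example_table",
                         "data_format": "AVRO"},
 "table_definition": {"name": "infer_example_table",
   "columns": [
     {"name": "`id`", "type": {"sql_text": "int(11) NOT NULL"}, "default": null},
     {"name": "`rating`", "type": {"sql_text": "double NULL"}, "default": null}],
   "indexes": []}}
"""


def build_create_table(infer_output):
    """Assemble a CREATE TABLE statement from the table_definition object."""
    table = json.loads(infer_output)["table_definition"]
    # Column names in the output are already wrapped in backticks.
    column_lines = [
        "  {} {}".format(column["name"], column["type"]["sql_text"])
        for column in table["columns"]
    ]
    return "CREATE TABLE `{}` (\n{}\n);".format(table["name"], ",\n".join(column_lines))


print(build_create_table(SAMPLE_OUTPUT))
```

The sketch ignores indexes and column defaults, which would need handling before the generated statement is used for anything beyond inspection.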

Last modified: November 14, 2025


