# Schema and Pipeline Inference

Table and pipeline definitions can be inferred from input files using the following commands:

* `INFER TABLE`
* `INFER PIPELINE`
* `CREATE INFERRED PIPELINE`

## Using the `IGNORE` clause with `INFER PIPELINE` for CSV Files

When inferring data from a CSV file, use the `IGNORE <n> LINES` clause to ignore the specified number of lines from the beginning of the file. 

In the following example, the first two lines from the beginning of the CSV file are ignored.

```sql
INFER PIPELINE AS LOAD DATA FS '<path_to_local_file>' IGNORE 2 LINES;
```
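
The effect of `IGNORE 2 LINES` can be pictured with a small Python sketch. It is an illustration only, not how the engine is implemented; the sample text and the `skip_lines` helper are hypothetical.

```python
import io
from itertools import islice

def skip_lines(stream, n):
    """Return an iterator over `stream` that skips the first `n` lines (hypothetical helper)."""
    return islice(stream, n, None)

# Hypothetical file content: two preamble lines followed by CSV data.
sample = io.StringIO(
    "exported by reporting tool\n"
    "generated on 2024-01-01\n"
    "1,alpha\n"
    "2,beta\n"
)

# Only the lines after the first two are left for inference.
remaining = [line.rstrip("\n") for line in skip_lines(sample, 2)]
print(remaining)
```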

## Header Detection with the `IGNORE` Clause

CSV header inference can be controlled with the `HEADER_DETECTION ON/OFF` clause together with the `IGNORE` clause. The engine skips the first `X` lines, where `X` is the value given in the `IGNORE` clause, and applies header detection to the lines that remain. The general syntax is:

```sql
INFER PIPELINE ....[FORMAT CSV] [IGNORE (X >= 1) LINES] [HEADER_DETECTION ON/OFF]
```

**Default Behavior**

* If the first non-ignored line is entirely text, and subsequent lines contain a non-text field, then the first line is treated as the header and all remaining lines are ingested as data.
* If all non-ignored lines are entirely text and there is no clear header row, then every line is ingested as data, with columns auto-named `col0`, `col1`, `col2`, etc.
* If the first non-ignored line contains any non-text field and no header is inferred, then all lines are ingested as data, with columns auto-named.

**HEADER\_DETECTION ON**

* Only the first non-ignored line is inspected. If it’s entirely text, it becomes the header. Otherwise, no header is inferred. All subsequent lines are ingested as data.

**HEADER\_DETECTION OFF**

* No header is inferred. Every line (including the first non-ignored line) is ingested as data, with columns auto-named `col0`, `col1`, `col2`, etc.

> **📝 Note**: Empty lines in the source CSV file do not affect inference. Empty lines at the beginning of the file are counted automatically and reflected in the `IGNORE LINES` clause of the generated `CREATE PIPELINE` query. Empty lines that appear within or at the end of the data are not handled automatically and must be managed by the user, for example by adding the `SKIP PARSER ERRORS` clause to the generated `CREATE PIPELINE` query. SingleStore does not recommend enabling the `SKIP PARSER ERRORS` clause, because it also suppresses other parsing errors, such as inconsistent field counts or malformed records.
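
The three header-detection modes described above can be sketched in Python. This is a minimal illustration of the documented rules, not the engine's implementation. The names `is_text`, `detect_header`, and `column_names` are hypothetical, and the sketch assumes that a field counts as "text" when it does not parse as a number.

```python
def is_text(field):
    """Assumption: a field counts as text if it does not parse as a number."""
    try:
        float(field)
        return False
    except ValueError:
        return True

def detect_header(rows, mode="default"):
    """Return (header, data_rows); header is None when columns are auto-named.

    `rows` holds the non-ignored lines, already split into fields.
    `mode` is "default", "on", or "off" (HEADER_DETECTION omitted, ON, or OFF).
    """
    if mode == "off" or not rows:
        return None, rows
    first, rest = rows[0], rows[1:]
    first_is_text = all(is_text(f) for f in first)
    if mode == "on":
        # Only the first non-ignored line is inspected.
        return (first, rest) if first_is_text else (None, rows)
    # Default: the first line must be all text and a later line
    # must contain at least one non-text field.
    later_has_non_text = any(not is_text(f) for row in rest for f in row)
    if first_is_text and later_has_non_text:
        return first, rest
    return None, rows

def column_names(header, width):
    """Use the inferred header if there is one, otherwise col0, col1, ..."""
    return list(header) if header else [f"col{i}" for i in range(width)]

# Hypothetical sample rows.
all_text = [["city", "country"], ["Paris", "France"]]
mixed = [["id", "name"], ["1", "Ada"]]

for rows in (all_text, mixed):
    for mode in ("default", "on", "off"):
        header, data = detect_header(rows, mode)
        print(mode, column_names(header, len(rows[0])), len(data))
```

In this sketch the all-text sample only gets a header when the mode is `"on"`, which matches the reason for using `HEADER_DETECTION ON` with files that contain only strings.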

## Infer Data from Parquet Files

`Date`/`Time` values are stored internally in Parquet as physical integers annotated with logical types. When inferring data from Parquet files, SingleStore:

* **Recognizes Parquet logical types**: `Date`, `Time`, and `Timestamp` columns are inferred as `Date`, `Time(6)`, and `Datetime(6)` columns, respectively.
* **Generates conversions for Parquet physical types**: The conversion from the physical integer to a date/time value is performed by built-in functions in the `SET` clause of the generated `CREATE PIPELINE` query.

To infer data from Parquet files, run the following command:

```sql
INFER PIPELINE AS LOAD DATA FS '<path_to_local_file>' FORMAT PARQUET;
```

The coarsest time unit that Parquet supports is one millisecond. Because SingleStore stores time values at microsecond precision, Parquet date and time values are converted to microsecond precision in the generated `CREATE PIPELINE` statement: second and millisecond values are multiplied by 1000, and nanosecond values are divided by 1000. SingleStore does not support nanosecond precision.

The following example shows how a fully exhaustive Parquet file with all the Parquet date and time column variations is inferred:

```sql
CREATE TABLE `infer_example_table` (
    `date` date NULL,
    `timestamp_seconds` datetime(6) NULL,
    `timestamp_milliseconds` datetime(6) NULL,
    `timestamp_microseconds` datetime(6) NULL,
    `timestamp_nanoseconds` datetime(6) NULL,
    `time_seconds` time(6) NULL,
    `time_milliseconds` time(6) NULL,
    `time_microseconds` time(6) NULL,
    `time_nanoseconds` time(6) NULL
);

CREATE PIPELINE `infer_example_pipeline`
AS LOAD DATA FS '<connection_string>'
BATCH_INTERVAL 2500
DISABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
INTO TABLE `infer_example_table`
FORMAT Parquet
(
    @date <- `date`,
    @timestamp_seconds <- `timestamp_seconds`,
    @timestamp_milliseconds <- `timestamp_milliseconds`,
    @timestamp_microseconds <- `timestamp_microseconds`,
    @timestamp_nanoseconds <- `timestamp_nanoseconds`,
    @time_seconds <- `time_seconds`,
    @time_milliseconds <- `time_milliseconds`,
    @time_microseconds <- `time_microseconds`,
    @time_nanoseconds <- `time_nanoseconds`
)
SET
    `date` = DATE_ADD('1970-01-01', INTERVAL @date DAY),
    `timestamp_seconds` = DATE_ADD('1970-01-01', INTERVAL @timestamp_seconds*1000 MICROSECOND),
    `timestamp_milliseconds` = DATE_ADD('1970-01-01', INTERVAL @timestamp_milliseconds*1000 MICROSECOND),
    `timestamp_microseconds` = DATE_ADD('1970-01-01', INTERVAL @timestamp_microseconds MICROSECOND),
    `timestamp_nanoseconds` = DATE_ADD('1970-01-01', INTERVAL @timestamp_nanoseconds/1000 MICROSECOND),
    `time_seconds` = DATE_FORMAT(DATE_ADD('1970-01-01', INTERVAL @time_seconds*1000 MICROSECOND), '%H:%i:%s.%f'),
    `time_milliseconds` = DATE_FORMAT(DATE_ADD('1970-01-01', INTERVAL @time_milliseconds*1000 MICROSECOND), '%H:%i:%s.%f'),
    `time_microseconds` = DATE_FORMAT(DATE_ADD('1970-01-01', INTERVAL @time_microseconds MICROSECOND), '%H:%i:%s.%f'),
    `time_nanoseconds` = DATE_FORMAT(DATE_ADD('1970-01-01', INTERVAL @time_nanoseconds/1000 MICROSECOND), '%H:%i:%s.%f');
```
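
The arithmetic in the `SET` clause can be mirrored in Python to check a conversion by hand. The following is a minimal sketch for the millisecond, microsecond, and nanosecond units. The helper names are hypothetical, and truncating the sub-microsecond digits with integer division is an assumption rather than documented engine behavior.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

# (multiply, divide) factors that bring each Parquet unit to microseconds.
TO_MICROS = {"ms": (1000, 1), "us": (1, 1), "ns": (1, 1000)}

def to_micros(value, unit):
    """Scale a raw Parquet integer to microseconds."""
    multiply, divide = TO_MICROS[unit]
    # Assumption: digits below one microsecond are truncated.
    return value * multiply // divide

def parquet_date(days):
    """Mirror DATE_ADD('1970-01-01', INTERVAL <days> DAY)."""
    return (EPOCH + timedelta(days=days)).date()

def parquet_timestamp(value, unit):
    """Mirror DATE_ADD('1970-01-01', INTERVAL <micros> MICROSECOND)."""
    return EPOCH + timedelta(microseconds=to_micros(value, unit))

def parquet_time(value, unit):
    """Mirror DATE_FORMAT(..., '%H:%i:%s.%f') on the converted value."""
    return parquet_timestamp(value, unit).strftime("%H:%M:%S.%f")

print(parquet_date(1))
print(parquet_timestamp(1500, "ms"))
print(parquet_time(3_600_000_000, "us"))
```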

## Infer Data from Iceberg Files

When inferring data from Iceberg files:

* All Iceberg primitive data types and nested structs are supported. Map-type or List-type columns are skipped while inferring data, with a warning.
* Nested structs in Iceberg form a tree-like structure. During inference, nested structs are flattened so that each leaf Iceberg field maps to a SingleStore table column.

  For example, an Iceberg field `a struct<b int, c struct<d string, e int>>` generates three columns in SingleStore: `a.b int`, `a.c.d string`, and `a.c.e int`.
* Column mappings are included in the generated `CREATE PIPELINE` query, so users can copy and paste the query without defining the column mappings manually.
* Iceberg inference automatically handles `Date` and `Time` columns that cannot be mapped directly to a SingleStore table column without conversion by built-in functions. These conversions are generated automatically in the `SET` clause of the generated `CREATE PIPELINE` query.
* For data distribution, Iceberg supports the addition of a subset of columns as the partition key with transforms. The partition key of Iceberg maps to the shard key of SingleStore.

  SingleStore identifies columns defined as the partition key, at any nesting level, and sets them as the shard key in the generated `CREATE TABLE` query.
* To infer a pipeline in merge mode, do one of the following:

  * Set the `infer_pipeline_options` global variable to `'merge'` before inference.
  * Run the following command:
    ```sql
    INFER PIPELINE AS LOAD DATA FS '<iceberg-table-path>' FORMAT ICEBERG OPTIONS = 'merge';
    ```
    This command automatically generates `CREATE TABLE`, `CREATE VIEW`, and `CREATE PIPELINE` queries accordingly by adding extra metadata columns and other specific configurations.
* To create all the inferred `TABLE`, `VIEW`, and `PIPELINE` objects automatically, run the following command:
  ```sql
  CREATE INFERRED PIPELINE <name> AS LOAD DATA FS '<iceberg-table-path>' FORMAT ICEBERG OPTIONS = 'merge';
  ```
  Query the view and pipeline by using the specified `<name>`.
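
The way nested structs are flattened into dotted column names can be sketched in Python. This is an illustration of the mapping only. The dictionary-based schema representation and the `flatten_struct` helper are hypothetical and are not part of any Iceberg or SingleStore API.

```python
import warnings

def flatten_struct(fields, prefix=""):
    """Flatten a nested struct into (dotted_column_name, type) pairs.

    `fields` is a hypothetical schema representation: a dict that maps a
    field name to a primitive type name (str), a nested struct (dict), or
    a ("list", ...) / ("map", ...) tuple for the unsupported types.
    """
    columns = []
    for name, field_type in fields.items():
        path = f"{prefix}{name}"
        if isinstance(field_type, dict):
            # Nested struct: recurse and extend the dotted path.
            columns.extend(flatten_struct(field_type, prefix=f"{path}."))
        elif isinstance(field_type, tuple):
            # Map-type and List-type columns are skipped with a warning.
            warnings.warn(f"skipping {field_type[0]} column {path}")
        else:
            columns.append((path, field_type))
    return columns

# The nested `a` struct from the example above, plus a hypothetical list column.
schema = {
    "a": {"b": "int", "c": {"d": "string", "e": "int"}},
    "tags": ("list", "string"),
}
columns = flatten_struct(schema)
print(columns)
```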

## Generate an Avro File

An Avro file can be generated that matches the schema of the following `CREATE TABLE` statement.

```sql
CREATE TABLE books(
Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
Publish_date DATETIME);
```

Create and activate a Python virtual environment for running the generator script, then install the `avro` package.

```shell
python3 -m venv /path/to/virtual_environment
source /path/to/virtual_environment/bin/activate

pip3 install avro
```

Avro schemas are defined in JSON. The following JSON defines an Avro schema which matches the preceding `books` table schema. Save the schema in a file named `books.avsc`.

```json
{"namespace": "books.avro",
 "type": "record",
 "name": "Book",
 "fields": [
     {"name": "id", "type": "int"},
     {"name": "name",  "type": "string"},
     {"name": "num_pages", "type": "int"},
     {"name": "rating", "type": "double"},
     {"name": "publish_timestamp", "type": "long",
               "logicalType": "timestamp-micros"} ]}
```

The `timestamp-micros` logical type annotates an Avro `long`, as defined in the [Avro specification](https://avro.apache.org/docs/1.11.1/specification/). The `long` stores the number of microseconds from the Unix epoch, January 1, 1970 00:00:00.000000 UTC.
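
The microsecond count that `timestamp-micros` stores can be computed with the Python standard library alone. The following is a minimal sketch, assuming timezone-aware UTC datetimes; the helper names are hypothetical. Dividing two `timedelta` values keeps the arithmetic in integers, which avoids the rounding that can occur when a float timestamp is multiplied by one million.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_timestamp_micros(moment):
    """Microseconds since the Unix epoch for a timezone-aware datetime."""
    return (moment - EPOCH) // timedelta(microseconds=1)

def from_timestamp_micros(micros):
    """Inverse conversion, returning a UTC datetime."""
    return EPOCH + timedelta(microseconds=micros)

noon = datetime(2023, 4, 5, 12, 0, tzinfo=timezone.utc)
micros = to_timestamp_micros(noon)
print(micros)
print(from_timestamp_micros(micros) == noon)
```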

The following Python code creates an Avro file named `books.avro` that contains data matching the structure of the `books` table.

Either run the following code in the Python interpreter or save this code to a file named `create_avro_file.py`.

```python
import datetime as dt

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

# Parse the Avro schema saved earlier as books.avsc.
with open("books.avsc", "rb") as schema_file:
    schema = avro.schema.parse(schema_file.read())

# Convert each publish date to microseconds since the Unix epoch.
# The datetimes are naive, so timestamp() interprets them in the local time zone.
day1 = int(dt.datetime.fromisoformat('2023-04-05 12:00').timestamp()*1000*1000)
day2 = int(dt.datetime.fromisoformat('2022-11-28 12:00').timestamp()*1000*1000)
day3 = int(dt.datetime.fromisoformat('2020-06-02 12:00').timestamp()*1000*1000)

# Write three records that match the schema to books.avro.
writer = DataFileWriter(open("books.avro","wb"), DatumWriter(), schema)

writer.append({"id": 1, "name": "HappyPlace", "num_pages": 400,
                            "rating":4.9, "publish_timestamp":day1})
writer.append({"id": 2, "name": "Legends & Lattes", "num_pages": 304,
                            "rating":4.9, "publish_timestamp":day2})
writer.append({"id": 3, "name": "The Vanishing Half", "num_pages": 352,
                            "rating":4.9, "publish_timestamp":day3})

# Flush any buffered records and close the file.
writer.close()
```

If the code is in a file, navigate to the directory containing the Python script and run it to generate the Avro file.

```shell
cd /path/to/avro-script
python3 create_avro_file.py
```

After the Avro file is created, move it to the desired location for loading, such as an Amazon S3 bucket.

## Examples

### Infer Headers When All Columns are Strings

If all non-ignored lines in a CSV file contain only text, SingleStore cannot automatically detect a header. In this case, columns are assigned default names: `col0`, `col1`, `col2`, etc. This example uses `books.csv`:

```
Id,Name,NumPages,Rating,PublishDate
1,HappyPlace,400,4.9,2023-04-05
2,Legends & Lattes,304,4.9,2022-11-28
3,The Vanishing Half,352,4.9,2020-06-02

```

Run the following command without header detection:

```sql
INFER PIPELINE AS LOAD DATA FS 'books.csv' FORMAT CSV;
```

As a result, column names are auto-generated as `col0`, `col1`, `col2`, `col3`, and `col4`.

Run the following command with header detection enabled:

```sql
INFER PIPELINE AS LOAD DATA FS 'books.csv' FORMAT CSV HEADER_DETECTION ON;
```

The first line is treated as the header. Columns are named correctly as `Id`, `Name`, `NumPages`, `Rating`, `PublishDate`.

### Using AS JSON

The following example demonstrates how to use `INFER PIPELINE` with the `AS JSON` keyword with a data file in Avro format stored in an AWS S3 bucket. `INFER TABLE` can be used in a similar manner.

This example uses data that conforms to the following Avro schema for the `books` table.

```json
{"namespace": "books.avro",
"type": "record",
"name": "Book",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "num_pages", "type": "int"},
{"name": "rating", "type": "double"},
{"name": "publish_timestamp", "type": "long",
"logicalType": "timestamp-micros"} ]}
```

Refer to [Generate an Avro File](#generate-an-avro-file) for an example of generating an Avro file that conforms to this schema.

The following example generates a pipeline and table definition by scanning the specified Avro file and inferring the schema from selected rows. The output is returned in JSON format.

```sql
INFER PIPELINE AS LOAD DATA S3
         's3://data-folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{
    "aws_access_key_id":"<your_access_key_id>",
    "aws_secret_access_key":"<your_secret_access_key>",
    "aws_session_token":"<your_session_token>"}'
FORMAT AVRO
AS JSON;

```

```output

{"pipeline_definition":
{"name":"infer_example_pipeline",
 "connection_string":"s3://data-folder/books.avro",
 "link":null,"source_type":"S3",
 "config":"{\"region\":\"us-west-2\"}",
 "credentials":"{\n
    \"aws_access_key_id\":\"your_access_key_id\",\n
    \"aws_secret_access_key\":\"your_secret_access_key\",\n
    \"aws_session_token\":\"your_session_token\"}",
 "batch_interval":2500,
 "resource_pool":null,
 "max_partitions_per_batch":null,
 "max_retries_per_batch_partition":null,
 "enable_out_of_order_optimization":false,
 "aggregator_pipeline":false,
 "transform":null,"load_error_policy":null,
 "dup_key_policy":null,
 "table":"infer_example_table",
 "procedure":null,
 "data_format":"AVRO",
 "with_temporal_conversion":false,
 "avro_schema":null,
 "time_zone":null,
 "avro_schema_registry_url":null,
 "fields_terminated_by":null,
 "fields_enclosed_by":null,
 "fields_escaped_by":null,
 "lines_terminated_by":null,
 "lines_starting_by":null,
 "extended_null":null,
 "enclosed_null":null,
 "trailing_nullcols":null,
 "null_defined_by":null,
 "ignore_lines":null,
 "column_list":[
  "`id`","`name`",
  "`num_pages`",
  "`rating`",
  "`publish_date`"],
 "json_paths":[["id"],["name"],["num_pages"],["rating"],["publish_date"]],
 "column_defaults":[null,null,null,null,null],
 "where_clause":null,"set_clause":null,
 "on_duplicate_key_update":null,
 "kafka_key_start_index": 0,
 "kafka_key_format":null,
 "stop_on_error":null,
 "enable_offsets_metadata_gc":false,
 "gc_timestamp":0,"create_time":1716313589,
 "alter_time":0,"cookie":null},
"table_definition":
{"name":"infer_example_table",
 "columns":[
 {"name":"`id`",
  "type":{"sql_text":"int(11) NOT NULL",
   "base_type":"INT",
   "nullable":false,
   "collation":null,
   "length":11,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`name`",
  "type":{"sql_text":"longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL",
   "base_type":"LONGTEXT",
   "nullable":false,
   "collation":"utf8_general_ci",
   "length":null,"precision":null,
   "scale":null,"signed":null,
   "possible_values":null},
  "default":null},
 {"name":"`num_pages`",
  "type":{"sql_text":"int(11) NOT NULL",
   "base_type":"INT",
   "nullable":false,
   "collation":null,
   "length":11,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`rating`",
  "type":{"sql_text":"double NULL",
   "base_type":"DOUBLE",
   "nullable":true,
   "collation":null,
   "length":null,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null},
 {"name":"`publish_date`",
  "type":{"sql_text":"bigint(20) NOT NULL",
   "base_type":"BIGINT",
   "nullable":false,
   "collation":null,
   "length":20,
   "precision":null,
   "scale":null,
   "signed":true,
   "possible_values":null},
  "default":null}],
 "indexes":[]}}
```
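
Because the result is JSON, it can be post-processed by a script. The following Python sketch rebuilds a `CREATE TABLE` statement from the `table_definition` object. It is an illustration under stated assumptions: the `create_table_sql` helper is hypothetical, the sample is an abbreviated stand-in with the same keys as the output above, and the input is assumed to be a single valid JSON document.

```python
import json

def create_table_sql(infer_output):
    """Build a CREATE TABLE statement from INFER ... AS JSON output (hypothetical helper)."""
    table = json.loads(infer_output)["table_definition"]
    # Column names in the output are already wrapped in backticks.
    columns = ",\n".join(
        f"    {col['name']} {col['type']['sql_text']}" for col in table["columns"]
    )
    return f"CREATE TABLE `{table['name']}` (\n{columns}\n);"

# Abbreviated sample with the same shape as the output above.
sample = json.dumps({
    "table_definition": {
        "name": "infer_example_table",
        "columns": [
            {"name": "`id`", "type": {"sql_text": "int(11) NOT NULL"}},
            {"name": "`rating`", "type": {"sql_text": "double NULL"}},
        ],
        "indexes": [],
    }
})

statement = create_table_sql(sample)
print(statement)
```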

## Related Topics

* [CREATE PIPELINE](https://docs.singlestore.com/cloud/reference/sql-reference/pipelines-commands/create-pipeline.md)
* [LOAD DATA](https://docs.singlestore.com/cloud/reference/sql-reference/data-manipulation-language-dml/load-data.md)

***

Modified at: February 27, 2026

Source: [/cloud/load-data/about-singlestore-pipelines/pipeline-concepts/schema-and-pipeline-inference/](https://docs.singlestore.com/cloud/load-data/about-singlestore-pipelines/pipeline-concepts/schema-and-pipeline-inference/)

