# Create a Parquet Pipeline

The `CREATE PIPELINE .. FORMAT PARQUET` statement can be used to load Parquet files into SingleStore. This command extracts specific fields from records in [Apache Parquet](https://parquet.apache.org/) files and loads those into tables in SingleStore. The `CREATE PIPELINE` statement assigns the extracted fields to the columns of a new row to be inserted into `table_name` or passed to `proc_name`. The fields can also be assigned to temporary values in the `SET` clause, for use in SQL transformations.

* Rows that don’t match the `WHERE` clause are not included.
* Parquet pipelines do not support Kafka.

When you write a `CREATE PIPELINE .. FORMAT PARQUET` statement, you include the `LOAD DATA` clause. This clause supports a subset of the error recovery options that are supported by [CSV LOAD DATA](https://docs.singlestore.com/db/v9.1/reference/sql-reference/data-manipulation-language-dml/load-data/#UUID-5ba15a21-7af7-0a56-f183-8660eeee699f.md).

> **⚠️ Warning**: Parquet files that are compressed using the internal Parquet writer should not have a .gz extension in the file name.

> **📝 Note**: If you are loading .parquet data from an S3 bucket, ensure that non-empty \_SUCCESS, \_committed, and \_started files in the S3 folder are not deleted before the data in the files is loaded into the destination table.

## Syntax

```
CREATE [OR REPLACE] PIPELINE <pipeline_name> AS
LOAD DATA  <configuration> 
INTO TABLE <table_name> 
FORMAT PARQUET
<parquet_subvalue_mapping> [TIMEZONE '<time_zone_name>']
[SET  <column_name> = <expression>,... ]
[WHERE <expression>,...]

<parquet_subvalue_mapping>:
    ( {<column_name> | @<variable_name>} <- <subvalue_path>,, ...)

<parquet_subvalue_path>:
    {ident [::ident ...]}
```

The `configuration` is a configuration specification for loading from various data sources. Refer to [CREATE PIPELINE](https://docs.singlestore.com/db/v9.1/reference/sql-reference/pipelines-commands/create-pipeline.md) for details.

Additional options supported by [CREATE PIPELINE](https://docs.singlestore.com/db/v9.1/reference/sql-reference/pipelines-commands/create-pipeline.md) and [LOAD DATA](https://docs.singlestore.com/db/v9.1/reference/sql-reference/data-manipulation-language-dml/load-data.md) are supported.

## parquet\_subvalue\_mapping

The `parquet_subvalue_mapping` specifies the mapping between fields in an input Parquet file and columns in the destination table or temporary variables that are specified in the `SET` clause. The `parquet_subvalue_mapping` consists of a list of pairs of a column name or a temporary variable name and a `parquet_subvalue_path`. A `parquet_subvalue_path` is a list of :: -separated list of field names which are used to perform successive field name lookups in nested Parquet schemas.

The following rules apply:

* The last field in `parquet_subvalue_path` can be either:

  * A primitive type which SingleStore loads using the type conversion rules, or
  * A nested Parquet type (`LIST`, `MAP`, or `GROUP`) which SingleStore loads into the target column as JSON or BSON.

  Refer to [Parquet Logical Types](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md) for more information on Parquet nested types.

  Parquet `GROUP` type is also a Parquet nested type.
* Intermediate fields in the `parquet_subvalue_path` (all components except the last one) must not be repeated qualifiers. Only the last field in the path can have a repeated qualifier or be a nested type (`LIST`, `MAP`, or `GROUP`), which SingleStore loads as JSON or BSON.
* All `parquet_subvalue_path` components containing whitespace or punctuation must be surrounded by backticks (\`).
* SingleStore supports loading entire `LIST`, `MAP`, or `GROUP` structures into JSON or BSON columns. However, extracting individual elements from `LIST`, `MAP` , or `GROUP` types is not supported. To access specific elements, load the full structure into a JSON column and use JSON functions in the queries.
* If an optional field along a `parquet_subvalue_path` is omitted in a given record, the extracted value for that column will be NULL.
* The `parquet_subvalue_mapping` must contain all columns from the SingleStore table into which the data is being loaded except for columns that have a default value.
* Parquet column names are case-sensitive.

## Load Nested Parquet Types

SingleStore supports loading Parquet nested types (`LIST`, `MAP`, and `GROUP`) into JSON or BSON columns.

## Supported Nested Types

| Parquet Type      | SingleStoreColumn Type | JSON Representation                                          |
| ----------------- | ---------------------- | ------------------------------------------------------------ |
| `LIST<type>`      | JSON or BSON           | Array:`[value1, value2, ...]`                                |
| `MAP<key, value>` | JSON or BSON           | Array of key-value objects:`[{"key": k1, "value": v1}, ...]` |
| `GROUP`           | JSON or BSON           | Object:`{"field1": value1, "field2": value2}`                |

## Parquet Types and Conversions

The tables below show recommended SingleStore [Data Types](https://docs.singlestore.com/db/v9.1/reference/sql-reference/data-types.md) and recommended conversions (if applicable) for storing Parquet Types in SingleStore. The recommended conversions can be applied using a `SET` clause. Examples of using the `SET` clause can be found in [Parquet Logical Datetime Types](https://docs.singlestore.com/#section-idm23453405414882.md).

## Parquet Basic Types and Value Conversions

Values obtained from Parquet files are converted to values stored in tables in SingleStore. That is, a value from a Parquet file must be converted to the type of the column to which that value is mapped in the `parquet_subvalue_mapping`.

The conversion from a Parquet type to a SingleStore type occurs in two parts. The system first converts a Parquet value to a temporary value as specified in the table below. Then, if the user does not specify a conversion, this temporary value is converted to the value in the SingleStore in accordance with the value conversions for the type of the (SingleStore) column. These conversions are specified on the relevant data type page. Alternatively, the user may specify a conversion using a `SET` clause.

A Parquet `INT32` being loaded into a SingleStore `DOUBLE` column is first converted to the string representation of that integer, then the string is converted to a `DOUBLE` based on the conversions specified on the SingleStore `DOUBLE` page.

The table below lists the Parquet Data Type and the temporary value to which that Parquet type is initially converted. In addition, a recommended SingleStore data type is listed; however, it is not necessary to use the recommended type.

| Parquet Data Type      | Temporary Value                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           | RecommendedSingleStoreData Type |
| ---------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------- |
| `BOOLEAN`              | "1"/"0"                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   | `BOOL`                          |
| `INT32`                | The string representation of the integer.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 | `INT`                           |
| `INT64`                | The string representation of the integer.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 | `BIGINT`                        |
| `INT96`                | Parquet datetime values of`INT96`type will be converted to`DATETIME(6)`values as if they are datetime literals which have been truncated to microsecond precision. The underlying value will be converted to theSingleStorecluster’s time zone according to the`TIMEZONE`clause, with a default heuristic in place when the clause is omitted.Refer to[Timezones](https://docs.singlestore.com/#section-idm234534061824633.md)and[Debugging Datetime Conversions](https://docs.singlestore.com/#section-idm234534081881644.md)for additional information. | `DATETIME(6)`                   |
| `FLOAT`                | A string convertible without loss of precision to`FLOAT`.SQL-NULL if value is not finite.                                                                                                                                                                                                                                                                                                                                                                                                                                                                 | `FLOAT`                         |
| `DOUBLE`               | A string convertible without loss of precision to`DOUBLE`.SQL-NULL if value is not finite.                                                                                                                                                                                                                                                                                                                                                                                                                                                                | `DOUBLE`                        |
| `BYTE_ARRAY`           | Verbatim, from input bytes.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               | `LONGBLOB`                      |
| `FIXED_LEN_BYTE_ARRAY` | Verbatim, from input bytes.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               | `BINARY(L)`                     |

## Parquet Logical Types

[Logical types](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md) are used in Parquet to extend the types that Parquet can be used to store and are implemented with annotations. Some Parquet logical type annotations are interpreted by SingleStore as described in the table below. Parquet logical datetime types are addressed in the section below.

| Parquet Logical Data Type  | RecommendedSingleStoreData Type | Notes                                                                                                                                                                                                           |
| -------------------------- | ------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `STRING`                   | `TEXT`,`VARCHAR`                | Annotation not interpreted bySingleStore.                                                                                                                                                                       |
| `DECIMAL`                  | `DECIMAL`                       | A decimal annotation on a binary or`fixed_len_byte_array`type will cause the value in the Parquet file to be converted to a numeric literal compatible with a SQL`DECIMAL`type of the same scale and precision. |
| `INT(8/16/32, true/false)` | `INT`/`INT UNSIGNED`            | Unsigned integer annotations on Parquet integer types are respected.                                                                                                                                            |
| `INT(64, true/false)`      | `BIGINT`/`BIGINT UNSIGNED`      | Unsigned integer annotations on Parquet integer types are respected.                                                                                                                                            |

## Parquet Logical Datetime Types

Parquet stores datetime values as integers with logical type annotations. These logical datetime annotations are not interpreted by SingleStore thus conversions, using `SET`, may be necessary to load datetime values from Parquet into SingleStore.

Datetimes in SingleStore are stored in microseconds for `DATETIME(6)` and milliseconds for `DATETIME`.

The table below shows Parquet logical datetime types, the SingleStore data type that can be used to store those types, and the recommended conversion to be applied with a `SET` clause.

The timestamp conversion below works for Parquet timestamp values stored as integers in units of microseconds. For timestamp values in milliseconds or nanoseconds the `@ts` value can be adjusted by multiplying or dividing by 1000. For timestamps stored as strings in the Parquet file, there is no need to apply a conversion.

| Parquet Logical Data Type | RecommendedSingleStoreData Type | Recommended Conversion                                  |
| ------------------------- | ------------------------------- | ------------------------------------------------------- |
| `DATE`                    | `DATE`                          | `DATE_ADD('1970-01-01', INTERVAL @date DAY)`            |
| `TIMESTAMP`               | `DATETIME(6)`                   | `DATE_ADD('1970-01-01', INTERVAL @ts/1000 MICROSECOND)` |

## Timezones

Parquet datetime values of `INT96` type will be converted to SingleStore `DATETIME(6)` values as if they are datetime literals which have been truncated to microsecond precision. The underlying value will be converted to the SingleStore cluster’s time zone according to the `TIMEZONE` clause, with a default heuristic in place when the clause is omitted.

Timezone conversions may be necessary because some Parquet writers, including Hive, convert time values to UTC before encoding them as `INT96`. Others, including Impala, perform no conversion and write values as they appear in the writer’s local time zone. Parquet files do not provide definitive information about the timezone of `INT96` data.

The `TIMEZONE` clause can be used to specify the timezone of a datetime value stored in Parquet to enable appropriate conversion to SingleStore.

When the `TIMEZONE` clause is omitted, SingleStore will attempt to automatically perform time zone conversions based on imperfect information in file metadata. This default heuristic may produce incorrect results. Using the `TIMEZONE` clause will disable this behavior. When the `TIMEZONE` clause is provided, SingleStore will assume that encoded data is in the specified time zone and will convert it to micro/milli-seconds from Unix time in the SingleStore cluster timezone.

The following validity checks are completed and errors occur:

* Times outside years 0001 through 9999 will be converted to NULL and a warning will be emitted.
* If the pipeline is performing time zone conversions, then the valid range is restricted to times between 1970-01-01 and 2038-01-19 03:14:07. The validity check is performed after converting to the SingleStore cluster time zone.
* No automatic integer-to-datetime conversion occurs for time values encoded as `INT64` or `INT32`, even if the logical type is a time type like timestamp. Use the `SET` clause to explicitly convert such values to a `DATETIME` compatible form.

Refer to Debugging Datetime Conversions for additional information.

## Examples

## Example - parquet\_subvalue\_mapping

Consider a Parquet record with the following schema:

```
message m {
 required int64 field1;
 optional group group1 {
   optional int64 field2;
   optional int64 field3;
 }
}

```

Consider the following instance of this schema represented in JSON:

```json
{"field1":1, 
  {"group1":
    {"field2":2, 
     "field3":None}
   }
}
```

The following parquet\_subvalue\_path

```sql
group1::field2, group1::field3  
```

will extract the values `2` and `NULL`.

Consider the following instance of this schema in JSON:

```json
{"field1":1, 
  {"group1":None}
}
```

In this case, the parquet\_subvalue\_path will extract the values NULL and NULL.

## Example - Nested Parquet Schema

Consider the following Parquet schema:

```
message m1 {
 required boolean f1;
 required fixed_len_byte_array(4) f2;
 optional group g1 {
   optional binary f3 (STRING);
   required int64 f4;
 }
}

```

Consider a Parquet file with four records with the following JSON representation:

```json
{"f1": false, "f2": "rec1", "g1": null}
{"f1": true, "f2": "rec2", "g1": null}
{"f1": true, "f2": "rec3", "g1": {"f3": null, "f4": 3}}
{"f1": true, "f2": "rec4", "g1": {"f3": "four", "f4": 4}}
```

Create a table which can be used to store this data:

```sql
CREATE TABLE parquet_tbl(c2 BLOB, c3 BLOB, c4 BIGINT UNSIGNED);
```

Create a pipeline that ingests the created file into `parquet_tbl`.

```sql
CREATE PIPELINE parquet_pipe 
AS LOAD DATA S3 's3://test/output.parquet'
CONFIG '{"region":"<region>"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
             "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE parquet_tbl 
 (@v1 <- f1,
 @v2 <- f2,
 c3 <- g1::f3,
 c4 <- g1::f4)
 FORMAT PARQUET
 SET c2 = UPPER(CONVERT(@v2, CHAR))
 WHERE @v1 = TRUE;
```

Test the pipeline:

```sql
TEST PIPELINE parquet_pipe;


```

```output

+------+------+------+
| c3   | c4   | c2   |
+------+------+------+
| NULL | NULL | REC2 |
| NULL |    3 | REC3 |
| four |    4 | REC4 |
+------+------+------+

```

Note the following about the output of `TEST PIPELINE parquet_exp`;

* The first record in the example.parquet .json representation was not included in the output because the `WHERE` clause, using the temporary variable `v1`, filters out rows where `f1` is false.
* The second record of the output contains NULL for columns `c3` and `c4` because the optional group `g1` is null in that record in the .json representation.
* The third record of the output contains NULL for column `c3` because the optional field `g1::f3` is null in that record in the .json representation.
* The column `c2` in the output contains uppercase values. This is because the column is set to the uppercase value of the temporary variable `f2` , which is set to the value of `v2` in each record in the .json representation.

## Example - Load Nested Types as JSON

Consider the following Parquet schema:

```
message m1 {
 required boolean f1;
 required fixed_len_byte_array(4) f2;
 optional group g1 {
   optional binary f3 (STRING);
   required int64 f4;
 }
}

```

Consider a Parquet file with four records with the following JSON representation:

```json
{"f1": false, "f2": "rec1", "g1": null}
{"f1": true, "f2": "rec2", "g1": null}
{"f1": true, "f2": "rec3", "g1": {"f3": null, "f4": 3}}
{"f1": true, "f2": "rec4", "g1": {"f3": "four", "f4": 4}}
```

Create a table which can be used to store this data:

```sql
CREATE TABLE parquet_tbl_json (c1 BOOLEAN,c2 TEXT,g1_data JSON);

```

In this table, the `g1_data` column stores the entire Parquet `g1` `GROUP` as JSON.

Create a pipeline that ingests the created file into `parquet_tbl_json`:

```sql
CREATE PIPELINE parquet_pipe_json AS
LOAD DATA S3 's3://test/output.parquet'
CONFIG '{"region":"<region>"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>",
             "aws_secret_access_key":"<your_secret_access_key>"}'
INTO TABLE parquet_tbl_json
(
  @v1     <- f1,
  @v2     <- f2,
  g1_data <- g1
)
FORMAT PARQUET
SET
  c1 = @v1,
  c2 = CONVERT(@v2, CHAR)
WHERE @v1 = TRUE;

```

This mapping loads the complete `g1` `GROUP` structure into the `g1_data` JSON column.

Test the pipeline:

```sql
TEST PIPELINE parquet_pipe_json;

```

```output

+------+------+------------------------+
| c1   | c2   | g1_data                |
+------+------+------------------------+
| true | rec2 | NULL                   |
| true | rec3 | {"f3":null,"f4":3}     |
| true | rec4 | {"f3":"four","f4":4}   |
+------+------+------------------------+

```

Note the following about the output of `TEST PIPELINE parquet_pipe_json`:

* The first record in the Parquet file was not included in the output because the `WHERE` clause filters out rows where `f1` is false.
* The second record of the output contains `NULL` for column `g1_data` because the optional group `g1` is null in that record in the JSON representation.
* The third record of the output contains a JSON object with `"f3": null` and `"f4": 3`. This is because the group `g1` exists in that record, but the optional field `f3` is null.
* The fourth record of the output contains a JSON object with both `"f3"` and `"f4"` populated. This is because the group `g1` and all its fields are present in that record.
* The column `g1_data` stores the entire `g1` group as a JSON object. This allows you to query nested fields later using JSON functions.

## Example – Load a Parquet LIST

Consider the following Parquet schema:Consider the following Parquet schema:

```
message m1 {
  required binary firstname (STRING);
  required binary lastname (STRING);
  optional group scores (LIST) {
    repeated group list {
      required int32 element;
    }
  }
}

```

Consider a Parquet file with three records expressed in JSON form:

```json
{"firstname":"John","lastname":"Doe","scores":[1,2,3]}
{"firstname":"Jane","lastname":"Smith","scores":[4,5]}
{"firstname":"Bob","lastname":"Lee","scores":null}
```

Create a table to store this data:

```sql
CREATE TABLE employees_json (firstname TEXT,lastname TEXT,scores JSON);
```

Create a pipeline that ingests the Parquet file into `employees_json`:

```sql
CREATE PIPELINE load_employees_json
AS LOAD DATA S3 's3://test/employees.parquet'
CONFIG '{"region":"<region>"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>",
             "aws_secret_access_key":"<your_secret_access_key>"}'
INTO TABLE employees_json
(
  firstname <- firstname,
  lastname  <- lastname,
  scores    <- scores
)
FORMAT PARQUET;

```

Test the pipeline:

```sql
TEST PIPELINE load_employees_json;

```

```output

+-----------+----------+------------+
| firstname | lastname | scores     |
+-----------+----------+------------+
| John      | Doe      | [1,2,3]    |
| Jane      | Smith    | [4,5]      |
| Bob       | Lee      | NULL       |
+-----------+----------+------------+

```

Note the following about the output of `TEST PIPELINE load_employees_json`:

* The `scores` column is a JSON column that stores the entire Parquet `LIST` field as a JSON array.
* If the list is `NULL` in the Parquet file, the JSON column also becomes `NULL`.

## Example – Load a Parquet MAP

Consider the following Parquet schema:

```
message m1 {
  required binary firstname (STRING);
  required binary lastname (STRING);

  optional group scores (LIST) {
    repeated group list {
      required int32 element;
    }
  }

  optional group book_report (MAP) {
    repeated group key_value {
      required int32 key;
      optional binary value (STRING);
    }
  }
}

```

Consider a Parquet file with three records expressed in JSON form:

```json
{"firstname":"John","lastname":"Doe","scores":[1,2,3],"book_report":[[1435,"A"],[2563,"B"]]}
{"firstname":"Jane","lastname":"Smith","scores":[4,5],"book_report":[[3721,"C"]]}
{"firstname":"Bob","lastname":"Lee","scores":null,"book_report":null}

```

Create a table to store this data:

```sql
CREATE TABLE employees_with_report (
  firstname TEXT,
  lastname TEXT,
  scores JSON,
  book_report JSON
);

```

Create a pipeline that ingests the Parquet file into `load_employees_with_report`:

```sql
CREATE PIPELINE load_employees_with_report
AS LOAD DATA S3 's3://test/employees_with_report.parquet'
CONFIG '{"region":"us-east-1"}'
CREDENTIALS '{
  "aws_access_key_id":"your_access_key_id",
  "aws_secret_access_key":"your_secret_access_key"
}'
INTO TABLE employees_with_report
  firstname <- firstname,
  lastname <- lastname,
  scores <- scores,
  book_report <- book_report
FORMAT PARQUET;
```

Test the pipeline:

```sql
TEST PIPELINE load_employees_with_report;

```

```output

+-----------+----------+---------+---------------------------------------------------------------+
| firstname | lastname | scores  | book_report                                                   |
+-----------+----------+---------+---------------------------------------------------------------+
| John      | Doe      | [1,2,3] | [{"key":1435,"value":"A"},{"key":2563,"value":"B"}]           |
| Jane      | Smith    | [4,5]   | [{"key":3721,"value":"C"}]                                    |
| Bob       | Lee      | null    | null                                                          |
+-----------+----------+---------+---------------------------------------------------------------+

```

Note the following about the output of `TEST PIPELINE load_employees_with_report`:

* The `book_report` `MAP` converts to a JSON array of objects.
* Each object contains `"key"` and `"value"` fields.
* This behavior differs from `LIST` conversion, which produces a simple JSON array.
* If the `MAP` is `NULL` in the Parquet file, the JSON column is also `NULL`.

## Example - Load Datetime Value

The example below shows converting a datetime stored in nanoseconds in Parquet, to a SingleStore `DATETIME(6)` value. Section [Generate a Parquet File](https://docs.singlestore.com/#section-idm234534086766775.md), creates a Parquet file with datetimes stored as integers in nanoseconds from Unix start time which can be used for this example.

Create the table.

```sql
CREATE TABLE books (
Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
PublishTimestamp DATETIME(6));
```

Load the data.

```sql
CREATE PIPELINE parquet_books_pipe
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/books/books.parquet'
CONFIG '{"region":"us-east-1"}'
INTO TABLE books
     (Id <- Id_pqt,
     Name <- Name_pqt,
     NumPages <- NumPages_pqt,
     Rating <- Rating_pqt,
     @ts <- PublishTimestamp_pqt)
FORMAT PARQUET
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts/1000 MICROSECOND);

```

If the timestamp is stored in the Parquet file as a string, omit the `SET` clause and replace `@ts<-PublishTimestamp_pqt` with `PublishTimestamp <-> PublishTimestamp_pqt`.

Test the pipeline.

```sql
TEST PIPELINE parquet_books_pipe;

```

```output

+------+--------------------+----------+--------+----------------------------+
| Id   | Name               | NumPages | Rating | PublishTimestamp           |
+------+--------------------+----------+--------+----------------------------+
|    1 | Happy Place        |      400 |    4.9 | 2023-04-05 12:00:00.000000 |
|    2 | Legends & Lattes   |      304 |    4.9 | 2022-11-28 12:00:00.000000 |
|    3 | The Vanishing Half |      352 |    4.9 | 2020-06-02 12:00:00.000000 |
+------+--------------------+----------+--------+----------------------------+
```

## Example - Set Timezone

The following SQL shows the TIMEZONE clause. Refer to [CONVERT\_TZ](https://docs.singlestore.com/db/v9.1/reference/sql-reference/date-and-time-functions/convert-tz.md) for information about supported timezones.

```sql
CREATE PIPELINE parquet_books_pipe_tz
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/books/books.parquet'
CONFIG '{"region":"us-east-1"}'
INTO TABLE books
     (Id <- Id_pqt,
     Name <- Name_pqt,
     NumPages <- NumPages_pqt,
     Rating <- Rating_pqt,
     @ts <- PublishTimestamp_pqt)
FORMAT PARQUET TIMEZONE 'SYSTEM'
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts/1000 MICROSECOND);

```

Test the pipeline.

```sql
TEST PIPELINE parquet_books_pipe_tz;

```

```output

+------+--------------------+----------+--------+----------------------------+
| Id   | Name               | NumPages | Rating | PublishTimestamp           |
+------+--------------------+----------+--------+----------------------------+
|    1 | Happy Place        |      400 |    4.9 | 2023-04-05 12:00:00.000000 |
|    2 | Legends & Lattes   |      304 |    4.9 | 2022-11-28 12:00:00.000000 |
|    3 | The Vanishing Half |      352 |    4.9 | 2020-06-02 12:00:00.000000 |
+------+--------------------+----------+--------+----------------------------+

```

## Debugging Datetime Conversions

Datetime value conversion can be debugged by loading the raw datetime from Parquet into a `BLOB`. The example below creates the books table with an extra attribute RawTimestamp into which the raw Parquet timestamp data will be loaded. If the datetimes may be stored as strings, try loading the raw values into a `TEXT` attribute in addition to or instead of using a `BLOB` attribute.

```sql
CREATE TABLE books_debug(
Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
PublishTimestamp DATETIME(6),
RawTimestamp BLOB);
```

The `PIPELINE` statement below loads the `publishTimestamp` from the Parquet file into both the `PublishTimestamp` column and the `RawTimestamp` column. A `SET` statement is used to convert the `publishTimestamp` from Parquet to the `PublishTimestamp` in SingleStore.

The column names on the left side of the `<-` are the column names from the SingleStore table into which the data will be loaded. The column names on the right side of the `<-` are the column names from the Parquet file which is to be loaded into SingleStore.

```sql
CREATE PIPELINE parquet_books_debug_pipe
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/books/books.parquet'
CONFIG '{"region":"us-east-1"}'
INTO TABLE books_debug
     (Id <- Id_pqt,
     Name <- Name_pqt,
     NumPages <- NumPages_pqt,
     Rating <- Rating_pqt,
     @ts <- PublishTimestamp_pqt,
     RawTimestamp<-PublishTimestamp_pqt)
FORMAT PARQUET
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts/1000 MICROSECOND);

```

Test the Pipeline. The `PublishTimestamp` column shows the converted timestamp, the `RawTimestamp` column contains the timestamp from Parquet, in nanoseconds in this example. The combination of these two columns can be used to debug the conversion used in the `SET` statement.

```sql
TEST PIPELINE parquet_books_debug_pipe;

```

```output

+------+--------------------+----------+--------+---------------------+-------+---------------------+
| Id   | Name               | NumPages | Rating | RawTimestamp        | PublishTimestamp           |
+------+--------------------+----------+--------+---------------------+-------+---------------------+
|    1 | Happy Place        |      400 |    4.9 | 1680696000000000000 | 2023-04-05 12:00:00.000000 |
|    2 | Legends & Lattes   |      304 |    4.9 | 1669636800000000000 | 2022-11-28 12:00:00.000000 |
|    3 | The Vanishing Half |      352 |    4.9 | 1591099200000000000 | 2020-06-02 12:00:00.000000 |
+------+--------------------+----------+--------+---------------------+-------+---------------------+
```

Once a table and a pipeline is not needed, it can be dropped.

```sql
DROP PIPELINE parquet_books_pipe;
DROP PIPELINE parquet_books_pipe_tz;
DROP PIPELINE parquet_books_debug_pipe;

DROP TABLE books;
DROP TABLE books_debug;
```

## Generate a Parquet File

Generate a file in Parquet format which can be used in the examples above. 

The Parquet file will be generated to match the schema of the `books` table, created above. The `CREATE TABLE` statement is repeated here for convenience.

```sql
CREATE TABLE books(
Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
Publish_date DATETIME);
```

Create and activate a Python virtual environment to use for generating the code.

```shell
python3 -m venv /path/to/virtual_environment

source /path/to/virtual_environment/bin/activate
```

Install the pandas and pyarrow libraries for Parquet file support:

```shell
pip3 install -q pandas pyarrow
```

The following python code creates a Parquet file that contains data that matches the structure of the books table. Run this code in the Python interpreter, or create a file containing this code and run it.

Start the Python interpreter.

```shell
python3
```

Run the following code in the python interpreter. This code will create a file named books.parquet.

```python
import pandas as pd
import datetime as dt

day1 = dt.datetime.fromisoformat('2023-04-05 12:00')
day2 = dt.datetime.fromisoformat('2022-11-28 12:00')
day3 = dt.datetime.fromisoformat('2020-06-02 12:00')

# Provided data
data = [
   (1, 'Happy Place', 400, 4.9, day1),
   (2, 'Legends & Lattes', 304, 4.9, day2),
   (3, 'The Vanishing Half', 352, 4.9, day3)]

# Define column names
columns = ['Id_pqt', 'Name_pqt', 'NumPages_pqt','Rating_pqt', 'PublishTimestamp_pqt']

# Create a DataFrame
df = pd.DataFrame(data, columns=columns)

# Save DataFrame to Parquet file
df.to_parquet('books.parquet', index=False)

```

***

Modified at: May 11, 2026

Source: [/db/v9.1/load-data/load-data-from-files/load-data-from-parquet-files/create-a-parquet-pipeline/](https://docs.singlestore.com/db/v9.1/load-data/load-data-from-files/load-data-from-parquet-files/create-a-parquet-pipeline/)

(An index of the documentation is available at /llms.txt)
