Create a Parquet Pipeline

The CREATE PIPELINE .. FORMAT PARQUET statement can be used to load Parquet files into SingleStore. This command extracts specific fields from records in Apache Parquet files and loads those into tables in SingleStore. The CREATE PIPELINE statement assigns the extracted fields to the columns of a new row to be inserted into table_name or passed to proc_name. The fields can also be assigned to temporary values in the SET clause, for use in SQL transformations.

  • Rows that don’t match the WHERE clause are not included.

  • Parquet pipelines do not support Kafka.

When you write a CREATE PIPELINE .. FORMAT PARQUET statement, you include the LOAD DATA clause. This clause supports a subset of the error recovery options that are supported by CSV LOAD DATA.

Caution

Parquet files that are compressed using the internal Parquet writer should not have a .gz extension in the file name.

Note

If you are loading .parquet data from an S3 bucket, ensure that non-empty _SUCCESS, _committed, and _started files in the S3 folder are not deleted before the data in the files is loaded into the destination table.

Syntax

CREATE [OR REPLACE] PIPELINE <pipeline_name> AS
LOAD DATA  <configuration> 
INTO TABLE <table_name> 
FORMAT PARQUET
<parquet_subvalue_mapping> [TIMEZONE '<time_zone_name>']
[SET  <column_name> = <expression>,... ]
[WHERE <expression>,...]

<parquet_subvalue_mapping>:
    ( {<column_name> | @<variable_name>} <- <parquet_subvalue_path>, ...)

<parquet_subvalue_path>:
    {ident [::ident ...]}

The configuration is a configuration specification for loading from various data sources. Refer to CREATE PIPELINE for details.

Additional options supported by CREATE PIPELINE and LOAD DATA are also available.

parquet_subvalue_mapping

The parquet_subvalue_mapping specifies the mapping between fields in an input Parquet file and columns in the destination table or temporary variables that are specified in the SET clause. The parquet_subvalue_mapping consists of a list of pairs, each pairing a column name or a temporary variable name with a parquet_subvalue_path. A parquet_subvalue_path is a ::-separated list of field names, which are used to perform successive field name lookups in nested Parquet schemas. The sketch after the rules below illustrates the mapping syntax.

The following rules apply:

  • The last field in parquet_subvalue_path must be a primitive type and must not be a Parquet nested type (also referred to as a group type). Refer to Parquet Logical Types for more information on Parquet nested types.

  • There must be no fields with a repeated type along the parquet_subvalue_path.

  • All parquet_subvalue_path components containing whitespace or punctuation must be surrounded by backticks (`).

  • The parquet_subvalue_path may not contain Parquet nested types (list or map types).

  • Extracting whole Parquet lists or maps as JSON is unsupported, as is extracting individual elements from such nested types.

  • If an optional field along a parquet_subvalue_path is omitted in a given record, the extracted value for that column will be NULL.

  • The parquet_subvalue_mapping must contain all columns from the SingleStore table into which the data is being loaded except for columns that have a default value.

  • Parquet column names are case-sensitive.
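
The sketch below illustrates several of these rules. The table, pipeline, file path, and Parquet field names are hypothetical and do not appear elsewhere in this article: c1 is populated through a nested subvalue path, c2 is mapped from a field whose name contains a space and is therefore backtick-quoted, and c_default is omitted from the mapping because it has a default value.

CREATE TABLE sketch_tbl(c1 BIGINT, c2 TEXT, c_default INT DEFAULT 0);

CREATE PIPELINE sketch_pipe AS
LOAD DATA FS '/path/to/example.parquet'
INTO TABLE sketch_tbl
(c1 <- outer_group::inner_field,
c2 <- `field with space`)
FORMAT PARQUET;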

Parquet Types and Conversions

The tables below show recommended SingleStore Data Types and recommended conversions (if applicable) for storing Parquet Types in SingleStore. The recommended conversions can be applied using a SET clause. Examples of using the SET clause can be found in Parquet Logical Datetime Types.

Parquet Basic Types and Value Conversions

Values obtained from Parquet files are converted to values stored in tables in SingleStore. That is, a value from a Parquet file must be converted to the type of the column to which that value is mapped in the parquet_subvalue_mapping.

The conversion from a Parquet type to a SingleStore type occurs in two parts. The system first converts a Parquet value to a temporary value as specified in the table below. Then, if the user does not specify a conversion, this temporary value is converted to a value of the SingleStore column's type, in accordance with the value conversions for that type. These conversions are specified on the relevant data type page. Alternatively, the user may specify a conversion using a SET clause.

For example, a Parquet INT32 being loaded into a SingleStore DOUBLE column is first converted to the string representation of that integer; the string is then converted to a DOUBLE based on the conversions specified on the SingleStore DOUBLE page.

The table below lists the Parquet Data Type and the temporary value to which that Parquet type is initially converted. In addition, a recommended SingleStore data type is listed; however, it is not necessary to use the recommended type.

| Parquet Data Type | Temporary Value | Recommended SingleStore Data Type |
|---|---|---|
| BOOLEAN | "1"/"0" | BOOL |
| INT32 | The string representation of the integer. | INT |
| INT64 | The string representation of the integer. | BIGINT |
| INT96 | Parquet datetime values of INT96 type will be converted to DATETIME(6) values as if they are datetime literals which have been truncated to microsecond precision. The underlying value will be converted to the SingleStore workspace's time zone according to the TIMEZONE clause, with a default heuristic in place when the clause is omitted. Refer to Timezones and Debugging Datetime Conversions for additional information. | DATETIME(6) |
| FLOAT | A string convertible without loss of precision to FLOAT; SQL NULL if the value is not finite. | FLOAT |
| DOUBLE | A string convertible without loss of precision to DOUBLE; SQL NULL if the value is not finite. | DOUBLE |
| BYTE_ARRAY | Verbatim, from input bytes. | LONGBLOB |
| FIXED_LEN_BYTE_ARRAY | Verbatim, from input bytes. | BINARY(L) |

Parquet Logical Types

Logical types are used in Parquet to extend the types that Parquet can store; they are implemented as annotations. Some Parquet logical type annotations are interpreted by SingleStore as described in the table below. Parquet logical datetime types are addressed in the section below.

| Parquet Logical Data Type | Recommended SingleStore Data Type | Notes |
|---|---|---|
| STRING | TEXT, VARCHAR | Annotation not interpreted by SingleStore. |
| DECIMAL | DECIMAL | A DECIMAL annotation on a BINARY or FIXED_LEN_BYTE_ARRAY type will cause the value in the Parquet file to be converted to a numeric literal compatible with a SQL DECIMAL type of the same scale and precision. |
| INT(8/16/32, true/false) | INT / INT UNSIGNED | Unsigned integer annotations on Parquet integer types are respected. |
| INT(64, true/false) | BIGINT / BIGINT UNSIGNED | Unsigned integer annotations on Parquet integer types are respected. |

Parquet Logical Datetime Types

Parquet stores datetime values as integers with logical type annotations. These logical datetime annotations are not interpreted by SingleStore; thus, conversions using SET may be necessary to load datetime values from Parquet into SingleStore.

Datetimes in SingleStore are stored in microseconds for DATETIME(6) and milliseconds for DATETIME.

The table below shows Parquet logical datetime types, the SingleStore data type that can be used to store those types, and the recommended conversion to be applied with a SET clause.

The timestamp conversion below works for Parquet timestamp values stored as integers in units of microseconds. For timestamp values in milliseconds or nanoseconds, the @ts value can be adjusted by multiplying or dividing by 1000, as shown in the sketch after the table. For timestamps stored as strings in the Parquet file, there is no need to apply a conversion.

| Parquet Logical Data Type | Recommended SingleStore Data Type | Recommended Conversion |
|---|---|---|
| DATE | DATE | DATE_ADD('1970-01-01', INTERVAL @date DAY) |
| TIMESTAMP | DATETIME(6) | DATE_ADD('1970-01-01', INTERVAL @ts MICROSECOND) |
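
A minimal sketch of the corresponding SET clauses for the other units follows. These are fragments of a CREATE PIPELINE statement; @ts and PublishTimestamp follow the naming used in the examples below, and the unit of the stored integer is an assumption about the writer of the file, not something declared in the file itself.

-- Timestamp stored in milliseconds: multiply by 1000 to get microseconds.
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts * 1000 MICROSECOND)

-- Timestamp stored in nanoseconds: divide by 1000 to get microseconds.
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts / 1000 MICROSECOND)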

Timezones

Parquet datetime values of INT96 type will be converted to SingleStore DATETIME(6) values as if they are datetime literals which have been truncated to microsecond precision. The underlying value will be converted to the SingleStore workspace’s time zone according to the TIMEZONE clause, with a default heuristic in place when the clause is omitted.

Timezone conversions may be necessary because some Parquet writers, including Hive, convert time values to UTC before encoding them as INT96. Others, including Impala, perform no conversion and write values as they appear in the writer’s local time zone. Parquet files do not provide definitive information about the timezone of INT96 data.

The TIMEZONE clause can be used to specify the timezone of a datetime value stored in Parquet to enable appropriate conversion to SingleStore.

When the TIMEZONE clause is omitted, SingleStore will attempt to automatically perform time zone conversions based on imperfect information in file metadata. This default heuristic may produce incorrect results. Using the TIMEZONE clause disables this behavior. When the TIMEZONE clause is provided, SingleStore assumes that the encoded data is in the specified time zone and converts it to micro- or milliseconds from Unix time in the SingleStore workspace time zone.

The following validity checks and conversion rules apply:

  • Times outside years 0001 through 9999 will be converted to NULL and a warning will be emitted.

  • If the pipeline is performing time zone conversions, then the valid range is restricted to times between 1970-01-01 and 2038-01-19 03:14:07. The validity check is performed after converting to the SingleStore workspace time zone.

  • No automatic integer-to-datetime conversion occurs for time values encoded as INT64 or INT32, even if the logical type is a time type like timestamp. Use the SET clause to explicitly convert such values to a DATETIME compatible form.

Refer to Debugging Datetime Conversions for additional information.

Examples

Example - parquet_subvalue_mapping

Consider a Parquet record with the following schema:

message m {
 required int64 field1;
 optional group group1 {
   optional int64 field2;
   optional int64 field3;
 }
}

Consider the following instance of this schema represented in JSON:

{"field1":1,
{"group1":
{"field2":2,
"field3":None}
}
}

The following parquet_subvalue_paths

group1::field2, group1::field3

will extract the values 2 and NULL, respectively.

Consider the following instance of this schema in JSON:

{"field1":1,
{"group1":None}
}

In this case, the parquet_subvalue_paths will extract the values NULL and NULL.

Example - Nested Parquet Schema

Consider the following Parquet schema:

message m1 {
 required boolean f1;
 required fixed_len_byte_array(4) f2;
 optional group g1 {
   optional binary f3 (STRING);
   required int64 f4;
 }
}

Consider a Parquet file with four records with the following JSON representation:

{"f1": false, "f2": "rec1", "g1": null}
{"f1": true, "f2": "rec2", "g1": null}
{"f1": true, "f2": "rec3", "g1": {"f3": null, "f4": 3}}
{"f1": true, "f2": "rec4", "g1": {"f3": "four", "f4": 4}}

Create a table which can be used to store this data:

CREATE TABLE parquet_tbl(c2 BLOB, c3 BLOB, c4 BIGINT UNSIGNED);

Create a pipeline that ingests the created file into parquet_tbl.

CREATE PIPELINE parquet_pipe
AS LOAD DATA S3 's3://test/output.parquet'
CONFIG '{"region":"<region>"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE parquet_tbl
(@v1 <- f1,
@v2 <- f2,
c3 <- g1::f3,
c4 <- g1::f4)
FORMAT PARQUET
SET c2 = UPPER(CONVERT(@v2, CHAR))
WHERE @v1 = TRUE;

Test the pipeline:

TEST PIPELINE parquet_pipe;
+------+------+------+
| c3   | c4   | c2   |
+------+------+------+
| NULL | NULL | REC2 |
| NULL |    3 | REC3 |
| four |    4 | REC4 |
+------+------+------+

Note the following about the output of TEST PIPELINE parquet_pipe:

  • The first record in the JSON representation was not included in the output because the WHERE clause, which uses the temporary variable @v1, filters out rows where f1 is false.

  • The output row for the second record (rec2) contains NULL for columns c3 and c4 because the optional group g1 is null in that record in the JSON representation.

  • The output row for the third record (rec3) contains NULL for column c3 because the optional field g1::f3 is null in that record in the JSON representation.

  • The column c2 in the output contains uppercase values because c2 is set to the uppercase value of the temporary variable @v2, which holds the value of the Parquet field f2 for each record.
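
TEST PIPELINE only previews the extracted rows; it does not write to parquet_tbl. To load the file, start the pipeline and query the table. A minimal sketch, assuming a one-time foreground load is acceptable:

START PIPELINE parquet_pipe FOREGROUND;

SELECT * FROM parquet_tbl;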

Example - Load Datetime Value

The example below shows converting a datetime, stored in nanoseconds in Parquet, to a SingleStore DATETIME(6) value. The section Generate a Parquet File creates a Parquet file with datetimes stored as integers in nanoseconds from the Unix start time, which can be used for this example.

Create the table.

CREATE TABLE books (
Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
PublishTimestamp DATETIME(6));

Load the data.

CREATE PIPELINE parquet_books_pipe
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/books/books.parquet'
CONFIG '{"region":"us-east-1"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- Id_pqt,
Name <- Name_pqt,
NumPages <- NumPages_pqt,
Rating <- Rating_pqt,
@ts <- PublishTimestamp_pqt)
FORMAT PARQUET
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts MICROSECOND);

If the timestamp is stored in the Parquet file as a string, omit the SET clause and replace @ts <- PublishTimestamp_pqt with PublishTimestamp <- PublishTimestamp_pqt.
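
For reference, a minimal sketch of that variant follows. The pipeline name parquet_books_pipe_str is hypothetical, and the sketch assumes the PublishTimestamp_pqt field in the file actually contains string timestamps, which is not the case for the file created in Generate a Parquet File:

CREATE PIPELINE parquet_books_pipe_str
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/books/books.parquet'
CONFIG '{"region":"us-east-1"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- Id_pqt,
Name <- Name_pqt,
NumPages <- NumPages_pqt,
Rating <- Rating_pqt,
PublishTimestamp <- PublishTimestamp_pqt)
FORMAT PARQUET;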

Test the pipeline.

TEST PIPELINE parquet_books_pipe;
+------+--------------------+----------+--------+----------------------------+
| Id   | Name               | NumPages | Rating | PublishTimestamp           |
+------+--------------------+----------+--------+----------------------------+
|    1 | Happy Place        |      400 |    4.9 | 2023-04-05 12:00:00.000000 |
|    2 | Legends & Lattes   |      304 |    4.9 | 2022-11-28 12:00:00.000000 |
|    3 | The Vanishing Half |      352 |    4.9 | 2020-06-02 12:00:00.000000 |
+------+--------------------+----------+--------+----------------------------+

Once the data in the books table is no longer needed, it can be deleted.

DELETE FROM books;

Example - Set Timezone

The following SQL shows the TIMEZONE clause. Refer to CONVERT_TZ for information about supported timezones.

CREATE PIPELINE parquet_books_pipe_tz
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/books/books.parquet'
CONFIG '{"region":"us-east-1"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- Id_pqt,
Name <- Name_pqt,
NumPages <- NumPages_pqt,
Rating <- Rating_pqt,
@ts <- PublishTimestamp_pqt)
FORMAT PARQUET TIMEZONE 'SYSTEM'
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts MICROSECOND);

Test the pipeline.

TEST PIPELINE parquet_books_pipe_tz;
+------+--------------------+----------+--------+----------------------------+
| Id   | Name               | NumPages | Rating | PublishTimestamp           |
+------+--------------------+----------+--------+----------------------------+
|    1 | Happy Place        |      400 |    4.9 | 2023-04-05 12:00:00.000000 |
|    2 | Legends & Lattes   |      304 |    4.9 | 2022-11-28 12:00:00.000000 |
|    3 | The Vanishing Half |      352 |    4.9 | 2020-06-02 12:00:00.000000 |
+------+--------------------+----------+--------+----------------------------+

Debugging Datetime Conversions

Datetime value conversion can be debugged by loading the raw datetime from Parquet into a BLOB. The example below creates the books table with an extra attribute RawTimestamp into which the raw Parquet timestamp data will be loaded. If the datetimes may be stored as strings, try loading the raw values into a TEXT attribute in addition to or instead of using a BLOB attribute.

CREATE TABLE books_debug(
Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
PublishTimestamp DATETIME(6),
RawTimestamp BLOB);

The CREATE PIPELINE statement below loads the PublishTimestamp_pqt field from the Parquet file into both the PublishTimestamp column and the RawTimestamp column. A SET clause is used to convert the raw value from Parquet into the PublishTimestamp value in SingleStore.

The column names on the left side of the <- are the columns of the SingleStore table into which the data will be loaded. The names on the right side of the <- are the fields of the Parquet file that are to be loaded into SingleStore.

CREATE PIPELINE parquet_books_debug_pipe
AS LOAD DATA S3 's3://singlestore-docs-example-datasets/books/books.parquet'
CONFIG '{"region":"us-east-1"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books_debug
(Id <- Id_pqt,
Name <- Name_pqt,
NumPages <- NumPages_pqt,
Rating <- Rating_pqt,
@ts <- PublishTimestamp_pqt,
RawTimestamp <- PublishTimestamp_pqt)
FORMAT PARQUET
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts MICROSECOND);

Test the pipeline. The PublishTimestamp column shows the converted timestamp, while the RawTimestamp column contains the raw timestamp from Parquet, in nanoseconds in this example. The combination of these two columns can be used to debug the conversion used in the SET clause.

TEST PIPELINE parquet_books_debug_pipe;
+------+--------------------+----------+--------+---------------------+----------------------------+
| Id   | Name               | NumPages | Rating | RawTimestamp        | PublishTimestamp           |
+------+--------------------+----------+--------+---------------------+----------------------------+
|    1 | Happy Place        |      400 |    4.9 | 1680696000000000000 | 2023-04-05 12:00:00.000000 |
|    2 | Legends & Lattes   |      304 |    4.9 | 1669636800000000000 | 2022-11-28 12:00:00.000000 |
|    3 | The Vanishing Half |      352 |    4.9 | 1591099200000000000 | 2020-06-02 12:00:00.000000 |
+------+--------------------+----------+--------+---------------------+----------------------------+
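
As a quick sanity check of the conversion, the raw value can be converted manually. A minimal sketch, assuming the raw value is in nanoseconds (as in the output above) and therefore divided by 1000 to obtain microseconds:

-- RawTimestamp for 'Happy Place' is 1680696000000000000 nanoseconds;
-- dividing by 1000 gives 1680696000000000 microseconds.
SELECT DATE_ADD('1970-01-01', INTERVAL 1680696000000000 MICROSECOND) AS converted;
-- expected: 2023-04-05 12:00:00.000000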

Once the tables and pipelines are no longer needed, they can be dropped.

DROP PIPELINE parquet_books_pipe;
DROP PIPELINE parquet_books_pipe_tz;
DROP PIPELINE parquet_books_debug_pipe;
DROP TABLE books;
DROP TABLE books_debug;

Generate a Parquet File

Generate a file in Parquet format which can be used in the examples above. 

The Parquet file will be generated to match the schema of the books table, created above. The CREATE TABLE statement is repeated here for convenience.

CREATE TABLE books(
Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
PublishTimestamp DATETIME(6));

Create and activate a Python virtual environment to use for generating the file.

python3 -m venv /path/to/virtual_environment
source /path/to/virtual_environment/bin/activate

Install the pandas and pyarrow libraries for Parquet file support:

pip3 install -q pandas pyarrow

The following Python code creates a Parquet file that contains data matching the structure of the books table. Run this code in the Python interpreter, or create a file containing this code and run it.

Start the Python interpreter.

python3

Run the following code in the Python interpreter. This code will create a file named books.parquet.

import pandas as pd
import datetime as dt
day1 = dt.datetime.fromisoformat('2023-04-05 12:00')
day2 = dt.datetime.fromisoformat('2022-11-28 12:00')
day3 = dt.datetime.fromisoformat('2020-06-02 12:00')
# Provided data
data = [
(1, 'Happy Place', 400, 4.9, day1),
(2, 'Legends & Lattes', 304, 4.9, day2),
(3, 'The Vanishing Half', 352, 4.9, day3)]
# Define column names; these match the Parquet field names used in the pipeline mappings above
columns = ['Id_pqt', 'Name_pqt', 'NumPages_pqt', 'Rating_pqt', 'PublishTimestamp_pqt']
# Create a DataFrame
df = pd.DataFrame(data, columns=columns)
# Save DataFrame to Parquet file
df.to_parquet('books.parquet', index=False)
