Create a Parquet Pipeline
The CREATE PIPELINE ... FORMAT PARQUET statement can be used to load Parquet files into SingleStore. The CREATE PIPELINE statement assigns the extracted fields to the columns of a new row to be inserted into table_name or passed to proc_name. The extracted fields can also be assigned to temporary variables in the SET clause, for use in SQL transformations.
- Rows that don’t match the WHERE clause are not included.
- Parquet pipelines do not support Kafka.
When you write a CREATE PIPELINE ... FORMAT PARQUET statement, you include the LOAD DATA clause.
Caution
Parquet files that are compressed using the internal Parquet writer should not have a compression extension, such as .gz, in their file names.
Note
If you are loading .
Syntax
CREATE [OR REPLACE] PIPELINE <pipeline_name> AS
LOAD DATA <configuration>
INTO TABLE <table_name>
FORMAT PARQUET
<parquet_subvalue_mapping> [TIMEZONE '<time_zone_name>']
[SET <column_name> = <expression>,... ]
[WHERE <expression>,...]
<parquet_subvalue_mapping>:
( {<column_name> | @<variable_name>} <- <parquet_subvalue_path>, ...)
<parquet_subvalue_path>:
{ident [::ident ...]}
The <configuration> clause is a configuration specification for loading from various data sources, such as the S3 source shown in the examples on this page. Additional options supported by CREATE PIPELINE and LOAD DATA are also supported.
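As a brief sketch (the pipeline name, file path, table, and column names here are hypothetical), a pipeline loading Parquet files from the local filesystem could look like the following; the examples later on this page use an S3 configuration instead.
CREATE PIPELINE fs_parquet_pipe AS      -- hypothetical pipeline name
LOAD DATA FS '/data/input/*.parquet'    -- hypothetical filesystem source
INTO TABLE example_tbl                  -- hypothetical destination table
(col_a <- field_a,
 col_b <- field_b)
FORMAT PARQUET;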
parquet_subvalue_mapping
The parquet_subvalue_mapping specifies the mapping between fields in an input Parquet file and columns in the destination table or temporary variables that are specified in the SET clause. The parquet_subvalue_mapping consists of a list of pairs, each pairing a column name or a temporary variable name with a parquet_subvalue_path. A parquet_subvalue_path is a ::-separated list of field names which are used to perform successive field name lookups in nested Parquet schemas.
The following rules apply:
- The last field in parquet_subvalue_path must be a primitive type and must not be a Parquet nested type (also referred to as a group type). Refer to Parquet Logical Types for more information on Parquet nested types.
- There must be no fields with a repeated type along the parquet_subvalue_path.
- All parquet_subvalue_path components containing whitespace or punctuation must be surrounded by backticks (`), as shown in the sketch after this list.
- The parquet_subvalue_path may not contain Parquet nested types (list or map types).
- Extracting whole Parquet lists or maps as JSON is unsupported, as is extracting individual elements from such nested types.
- If an optional field along a parquet_subvalue_path is omitted in a given record, the extracted value for that column will be NULL.
- The parquet_subvalue_mapping must contain all columns from the SingleStore table into which the data is being loaded, except for columns that have a default value.
- Parquet column names are case-sensitive.
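For example, the following subvalue mapping sketch uses a hypothetical schema in which the group group1 contains a field named "field one" (with a space) and a field named field_two; the path component containing the space is surrounded by backticks:
(c1 <- group1::`field one`,
 c2 <- group1::field_two)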
Parquet Types and Conversions
The tables below show recommended SingleStore data types and recommended conversions (if applicable) for storing Parquet types in SingleStore. Conversions are applied with the SET clause. An example of a conversion using the SET clause can be found in Parquet Logical Datetime Types.
Parquet Basic Types and Value Conversions
Values obtained from Parquet files are converted to values stored in tables in SingleStore according to the parquet_subvalue_mapping. The conversion from a Parquet type to a SingleStore type occurs in two parts: the Parquet value is first converted to a temporary value, and that temporary value is then converted to the data type of the destination column, optionally transformed by a SET clause.
A Parquet INT32
being loaded into a SingleStore DOUBLE
column is first converted to the string representation of that integer, then the string is converted to a DOUBLE
based on the conversions specified on the SingleStore DOUBLE
page.
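A minimal sketch of this case, with hypothetical names throughout, maps the Parquet INT32 field directly to the DOUBLE column; no SET clause is required because the two-part conversion is applied automatically:
CREATE PIPELINE readings_pipe AS         -- hypothetical pipeline name
LOAD DATA FS '/data/readings.parquet'    -- hypothetical source file
INTO TABLE readings                      -- hypothetical table with a single DOUBLE column named reading
(reading <- reading_int32)               -- reading_int32 is a hypothetical Parquet INT32 field
FORMAT PARQUET;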
The table below lists the Parquet Data Type and the temporary value to which that Parquet type is initially converted.
| Parquet Data Type | Temporary Value | Recommended SingleStore Data Type |
|---|---|---|
| BOOLEAN | "1"/"0" | BOOL |
| INT32 | The string representation of the integer. | INT |
| INT64 | The string representation of the integer. | BIGINT |
| INT96 | Parquet datetime values of INT96 type are converted to DATETIME(6) values. Refer to Timezones and Debugging Datetime Conversions for additional information. | DATETIME(6) |
| FLOAT | A string convertible without loss of precision; SQL NULL if the value is not finite. | FLOAT |
| DOUBLE | A string convertible without loss of precision; SQL NULL if the value is not finite. | DOUBLE |
| BYTE_ARRAY | Verbatim, from input bytes. | LONGBLOB |
| FIXED_LEN_BYTE_ARRAY | Verbatim, from input bytes. | LONGBLOB |
Parquet Logical Types
Logical types are used in Parquet to extend the set of types that Parquet can store; they are implemented as annotations on the basic types.
| Parquet Logical Data Type | Recommended SingleStore Data Type | Notes |
|---|---|---|
| STRING | LONGTEXT | Annotation not interpreted by SingleStore. |
| DECIMAL | DECIMAL | A decimal annotation on a binary or fixed_len_byte_array type. |
| UINT_8, UINT_16, UINT_32 | INT UNSIGNED | Unsigned integer annotations on Parquet integer types are respected. |
| UINT_64 | BIGINT UNSIGNED | Unsigned integer annotations on Parquet integer types are respected. |
Parquet Logical Datetime Types
Parquet stores datetime values as integers with logical type annotations. Conversions, specified in a SET clause, may be necessary to load datetime values from Parquet into SingleStore.
Datetimes in SingleStore are stored in microseconds for DATETIME(6)
and milliseconds for DATETIME
.
The table below shows Parquet logical datetime types, the SingleStore data type that can be used to store those types, and the recommended conversion to be applied with a SET
clause.
The timestamp conversion below works for Parquet timestamp values stored as integers in units of microseconds. For values stored in other units, such as milliseconds or nanoseconds, the @ts value can be adjusted by multiplying or dividing by 1000.
| Parquet Logical Data Type | Recommended SingleStore Data Type | Recommended Conversion |
|---|---|---|
| TIMESTAMP (microseconds) | DATETIME(6) | SET <column> = DATE_ADD('1970-01-01', INTERVAL @ts MICROSECOND) |
| TIMESTAMP (milliseconds) | DATETIME | SET <column> = DATE_ADD('1970-01-01', INTERVAL @ts * 1000 MICROSECOND) |
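For instance, if the Parquet timestamps are stored as integers in nanoseconds rather than microseconds, a sketch of the adjusted conversion divides @ts by 1000 before applying the microsecond interval (the column and variable names follow the books examples later on this page):
-- Nanosecond-encoded timestamps: divide by 1000 to obtain microseconds
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts / 1000 MICROSECOND);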
Timezones
Parquet datetime values of INT96 type will be converted to SingleStore DATETIME(6) values as if they are datetime literals which have been truncated to microsecond precision. The time zone used for this conversion can be specified with the TIMEZONE clause, with a default heuristic in place when the clause is omitted.
Timezone conversions may be necessary because some Parquet writers, including Hive, convert time values to UTC before encoding them as INT96. As a result, the time zone must be taken into account when loading INT96 data.
The TIMEZONE
clause can be used to specify the timezone of a datetime value stored in Parquet to enable appropriate conversion to SingleStore.
When the TIMEZONE clause is omitted, SingleStore will attempt to automatically perform time zone conversions based on imperfect information in file metadata. Providing an explicit TIMEZONE clause disables this behavior. When a TIMEZONE clause is provided, SingleStore will assume that encoded data is in the specified time zone and will convert it to micro/milliseconds from Unix time in the SingleStore workspace timezone.
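As a short sketch, the TIMEZONE clause is written immediately after FORMAT PARQUET; the time zone name here is only an illustration (see the Set Timezone example below for a complete statement):
FORMAT PARQUET TIMEZONE 'America/Los_Angeles'
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts MICROSECOND);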
The following validity checks are performed, and errors or warnings may occur:
- Times outside years 0001 through 9999 will be converted to NULL and a warning will be emitted.
- If the pipeline is performing time zone conversions, then the valid range is restricted to times between 1970-01-01 and 2038-01-19 03:14:07. The validity check is performed after converting to the SingleStore workspace time zone.
- No automatic integer-to-datetime conversion occurs for time values encoded as INT64 or INT32, even if the logical type is a time type like timestamp. Use the SET clause to explicitly convert such values to a DATETIME-compatible form.
Refer to Debugging Datetime Conversions for additional information.
Examples
Example - parquet_subvalue_mapping
Consider a Parquet record with the following schema:
message m {
required int64 field1;
optional group group1 {
optional int64 field2;
optional int64 field3;
}
}
Consider the following instance of this schema represented in JSON:
{"field1":1,{"group1":{"field2":2,"field3":None}}}
The following parquet_subvalue_paths, group1::field2 and group1::field3, will extract the values 2 and NULL, respectively.
Consider the following instance of this schema in JSON:
{"field1":1,{"group1":None}}
In this case, the parquet_
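Putting this together, a pipeline sketch for this schema might look like the following; the pipeline name, source path, and destination table are hypothetical:
CREATE PIPELINE m_pipe AS               -- hypothetical pipeline name
LOAD DATA FS '/data/m.parquet'          -- hypothetical source file
INTO TABLE m_tbl                        -- hypothetical table with BIGINT columns c1 and c2
(c1 <- group1::field2,
 c2 <- group1::field3)
FORMAT PARQUET;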
Example - Nested Parquet Schema
Consider the following Parquet schema:
message m1 {
required boolean f1;
required fixed_len_byte_array(4) f2;
optional group g1 {
optional binary f3 (STRING);
required int64 f4;
}
}
Consider a Parquet file with four records with the following JSON representation:
{"f1": false, "f2": "rec1", "g1": null}{"f1": true, "f2": "rec2", "g1": null}{"f1": true, "f2": "rec3", "g1": {"f3": null, "f4": 3}}{"f1": true, "f2": "rec4", "g1": {"f3": "four", "f4": 4}}
Create a table which can be used to store this data:
CREATE TABLE parquet_tbl(c2 BLOB, c3 BLOB, c4 BIGINT UNSIGNED);
Create a pipeline that ingests the created file into parquet_tbl.
CREATE PIPELINE parquet_pipe AS
LOAD DATA S3 's3://test/output.parquet'
CONFIG '{"region":"<region>"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
              "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE parquet_tbl
(@v1 <- f1,
 @v2 <- f2,
 c3 <- g1::f3,
 c4 <- g1::f4)
FORMAT PARQUET
SET c2 = UPPER(CONVERT(@v2, CHAR))
WHERE @v1 = TRUE;
Test the pipeline:
TEST PIPELINE parquet_pipe;
+------+------+------+
| c3 | c4 | c2 |
+------+------+------+
| NULL | NULL | REC2 |
| NULL | 3 | REC3 |
| four | 4 | REC4 |
+------+------+------+
Note the following about the output of TEST PIPELINE parquet_pipe:
- The first record in the example's JSON representation was not included in the output because the WHERE clause, using the temporary variable @v1, filters out rows where f1 is false.
- The second record of the output contains NULL for columns c3 and c4 because the optional group g1 is null in that record in the JSON representation.
- The third record of the output contains NULL for column c3 because the optional field g1::f3 is null in that record in the JSON representation.
- The column c2 in the output contains uppercase values. This is because the column is set to the uppercase value of the temporary variable @v2, which is assigned the value of f2 in each record in the JSON representation.
Example - Load Datetime Value
The example below shows converting a datetime stored in nanoseconds in Parquet to a SingleStore DATETIME(6) value.
Create the table.
CREATE TABLE books (
  Id INT,
  Name TEXT,
  NumPages INT,
  Rating DOUBLE,
  PublishTimestamp DATETIME(6));
Load the data.
CREATE PIPELINE parquet_books_pipe AS
LOAD DATA S3 's3://singlestore-docs-example-datasets/books/books.parquet'
CONFIG '{"region":"us-east-1"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
              "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- Id_pqt,
 Name <- Name_pqt,
 NumPages <- NumPages_pqt,
 Rating <- Rating_pqt,
 @ts <- PublishTimestamp_pqt)
FORMAT PARQUET
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts MICROSECOND);
If the timestamp is stored in the Parquet file as a string, omit the SET clause and replace @ts <- PublishTimestamp_pqt with PublishTimestamp <- PublishTimestamp_pqt.
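For example, assuming the timestamps are stored as strings in the Parquet file, the mapping portion of the statement above would become the following, with no SET clause:
INTO TABLE books
(Id <- Id_pqt,
 Name <- Name_pqt,
 NumPages <- NumPages_pqt,
 Rating <- Rating_pqt,
 PublishTimestamp <- PublishTimestamp_pqt)
FORMAT PARQUET;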
Test the pipeline.
TEST PIPELINE parquet_books_pipe;
+------+--------------------+----------+--------+----------------------------+
| Id | Name | NumPages | Rating | PublishTimestamp |
+------+--------------------+----------+--------+----------------------------+
| 1 | Happy Place | 400 | 4.9 | 2023-04-05 12:00:00.000000 |
| 2 | Legends & Lattes | 304 | 4.9 | 2022-11-28 12:00:00.000000 |
| 3 | The Vanishing Half | 352 | 4.9 | 2020-06-02 12:00:00.000000 |
+------+--------------------+----------+--------+----------------------------+
Once the data in the table is no longer needed, it can be deleted.
DELETE FROM books;
Example - Set Timezone
The following SQL shows the TIMEZONE clause.
CREATE PIPELINE parquet_books_pipe_tz AS
LOAD DATA S3 's3://singlestore-docs-example-datasets/books/books.parquet'
CONFIG '{"region":"us-east-1"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
              "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- Id_pqt,
 Name <- Name_pqt,
 NumPages <- NumPages_pqt,
 Rating <- Rating_pqt,
 @ts <- PublishTimestamp_pqt)
FORMAT PARQUET TIMEZONE 'SYSTEM'
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts MICROSECOND);
Test the pipeline.
TEST PIPELINE parquet_books_pipe_tz;
+------+--------------------+----------+--------+----------------------------+
| Id | Name | NumPages | Rating | PublishTimestamp |
+------+--------------------+----------+--------+----------------------------+
| 1 | Happy Place | 400 | 4.9 | 2023-04-05 12:00:00.000000 |
| 2 | Legends & Lattes | 304 | 4.9 | 2022-11-28 12:00:00.000000 |
| 3 | The Vanishing Half | 352 | 4.9 | 2020-06-02 12:00:00.000000 |
+------+--------------------+----------+--------+----------------------------+
Debugging Datetime Conversions
Datetime value conversion can be debugged by loading the raw datetime value from Parquet into a BLOB column. A TEXT attribute can be used in addition to or instead of a BLOB attribute.
CREATE TABLE books_debug (
  Id INT,
  Name TEXT,
  NumPages INT,
  Rating DOUBLE,
  PublishTimestamp DATETIME(6),
  RawTimestamp BLOB);
The CREATE PIPELINE statement below loads the PublishTimestamp_pqt field from the Parquet file into both the PublishTimestamp column and the RawTimestamp column. The SET clause is used to convert the PublishTimestamp_pqt value from Parquet to the PublishTimestamp value in SingleStore.
The column names on the left side of the <- are the column names from the SingleStore table into which the data will be loaded. The names on the right side of the <- are the field names from the Parquet file which is to be loaded into SingleStore.
CREATE PIPELINE parquet_books_debug_pipe AS
LOAD DATA S3 's3://singlestore-docs-example-datasets/books/books.parquet'
CONFIG '{"region":"us-east-1"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
              "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books_debug
(Id <- Id_pqt,
 Name <- Name_pqt,
 NumPages <- NumPages_pqt,
 Rating <- Rating_pqt,
 @ts <- PublishTimestamp_pqt,
 RawTimestamp <- PublishTimestamp_pqt)
FORMAT PARQUET
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts MICROSECOND);
Test the pipeline. The PublishTimestamp column shows the converted timestamp; the RawTimestamp column contains the raw timestamp from Parquet, in nanoseconds in this example. The raw value can be used to verify the conversion applied by the SET clause.
TEST PIPELINE parquet_books_debug_pipe;
+------+--------------------+----------+--------+----------------------------+---------------------+
| Id   | Name               | NumPages | Rating | PublishTimestamp           | RawTimestamp        |
+------+--------------------+----------+--------+----------------------------+---------------------+
| 1    | Happy Place        | 400      | 4.9    | 2023-04-05 12:00:00.000000 | 1680696000000000000 |
| 2    | Legends & Lattes   | 304      | 4.9    | 2022-11-28 12:00:00.000000 | 1669636800000000000 |
| 3    | The Vanishing Half | 352      | 4.9    | 2020-06-02 12:00:00.000000 | 1591099200000000000 |
+------+--------------------+----------+--------+----------------------------+---------------------+
Once the tables and pipelines are no longer needed, they can be dropped.
DROP PIPELINE parquet_books_pipe;
DROP PIPELINE parquet_books_pipe_tz;
DROP PIPELINE parquet_books_debug_pipe;
DROP TABLE books;
DROP TABLE books_debug;
Generate a Parquet File
Generate a file in Parquet format which can be used in the examples above.
The Parquet file will be generated to match the schema of the books table created above. The CREATE TABLE statement is repeated here for convenience.
CREATE TABLE books (
  Id INT,
  Name TEXT,
  NumPages INT,
  Rating DOUBLE,
  PublishTimestamp DATETIME(6));
Create and activate a Python virtual environment to use for generating the file.
python3 -m venv /path/to/virtual_environment
source /path/to/virtual_environment/bin/activate
Install the pandas and pyarrow libraries for Parquet file support:
pip3 install -q pandas pyarrow
The following Python code creates a Parquet file containing data that matches the structure of the books table.
Start the Python interpreter.
python3
Run the following code in the Python interpreter.
import pandas as pd
import datetime as dt

day1 = dt.datetime.fromisoformat('2023-04-05 12:00')
day2 = dt.datetime.fromisoformat('2022-11-28 12:00')
day3 = dt.datetime.fromisoformat('2020-06-02 12:00')

# Provided data
data = [(1, 'Happy Place', 400, 4.9, day1),
        (2, 'Legends & Lattes', 304, 4.9, day2),
        (3, 'The Vanishing Half', 352, 4.9, day3)]

# Define column names
columns = ['Id_pqt', 'Name_pqt', 'NumPages_pqt',
           'Rating_pqt', 'PublishTimestamp_pqt']

# Create a DataFrame
df = pd.DataFrame(data, columns=columns)

# Save DataFrame to Parquet file
df.to_parquet('books.parquet', index=False)