Iceberg Ingest
Apache Iceberg is a table format for large analytic datasets that stores data in immutable open-source file formats including Parquet.
Remarks
The following are supported:
- Ingest Iceberg Version 1 tables, and Iceberg Version 2 tables without deletions, with data files in Parquet format.
- Ingest Iceberg tables stored in Amazon S3 with the following catalogs: GLUE, Snowflake, REST, and JDBC.
- One-time ingest of Iceberg tables. When an Iceberg table is ingested, SingleStore will request Iceberg table metadata and load data from the latest snapshot available at that moment. Subsequent updates to the Iceberg table are not ingested.
Syntax
CREATE [OR REPLACE] PIPELINE <pipeline_name> AS
LOAD DATA S3 '[<warehouse_path>]'
CONFIG '{"catalog_type": "GLUE|SNOWFLAKE|REST|JDBC",
    "table_id": "<table_identifier>",
    <configuration_json>
    [, "catalog_name": "<your_catalog_name>" ]
    [, "catalog.<property>" : "property_value" [, ...]]}'
CREDENTIALS '<credentials_json>'
[REPLACE] INTO TABLE <table_name>
[ON DUPLICATE KEY UPDATE]
<iceberg_subvalue_mapping>
FORMAT ICEBERG;

<iceberg_subvalue_mapping>:
    ({<singlestore_col_name> | @<variable_name>} <- <iceberg_subvalue_path> [, ... ])

<iceberg_subvalue_path>: {ident [::ident ...]}
The warehouse_path specifies the path to the catalog warehouse. The warehouse_path can be set to an empty string when not needed.
All the data shaping options for Parquet pipelines are supported for Iceberg pipelines.
Catalog Specification
- The catalog_type and table_id are required for the catalog specification.
- The table_id identifies the Iceberg table. The table identifier is catalog-specific, but is typically in the form: database_name.table_name.
- The catalog_name is a name to associate with the catalog when reading table metadata and is used internally in SingleStore for logging and metrics purposes. The catalog_name is required for the JDBC catalog and is optional for other catalogs.
- The catalog.<property> is a list of key-value pairs for configuring the catalog connection. The property and value are passed directly to the Iceberg SDK to establish the catalog connection, as shown in the sketch after this list.
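For example, the catalog portion of a CONFIG string for a JDBC catalog might pass the connection URI and login through catalog.<property> keys, as in the fragment below. This fragment is for illustration only; the catalog name, table identifier, host, and login values are placeholders, not values from this guide.

-- Fragment of a CONFIG clause; catalog.* keys are passed to the Iceberg SDK.
-- All names and the URI below are hypothetical placeholders.
CONFIG '{"catalog_type": "JDBC",
    "catalog_name": "my_jdbc_catalog",
    "table_id": "db_name.table_name",
    "catalog.uri": "jdbc:mysql://host.address:3306/default",
    "catalog.jdbc.user": "<user_name>",
    "catalog.jdbc.password": "<password>"}'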
S3 Specification
- The configuration_json is a JSON string for S3 configuration parameters such as region, endpoint_url, and compatibility_mode, as shown in the sketch after this list. Refer to CREATE PIPELINE and CREATE LINK for more information.
- Refer to CREATE LINK for more information.
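For example, when the data files live in an S3-compatible object store rather than AWS S3, the configuration_json portion of the CONFIG string can supply an endpoint_url and enable compatibility_mode, as in the fragment below. This fragment is for illustration only; the endpoint and catalog URIs and the table identifier are placeholders, not values from this guide.

-- Fragment of a CONFIG clause for an S3-compatible endpoint.
-- The endpoint URL, catalog URI, and table identifier are hypothetical placeholders.
CONFIG '{"region": "us-west-2",
    "endpoint_url": "https://storage.example.com",
    "compatibility_mode": true,
    "catalog_type": "REST",
    "table_id": "db_name.table_name",
    "catalog.uri": "http://host.address:8181"}'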
Subvalue Mappings
The iceberg_subvalue_mapping assigns fields from the Iceberg table to columns in the SingleStore table or to temporary variables.
- The last field in the iceberg_subvalue_path must be a primitive type.
- All iceberg_subvalue_path components containing whitespace or punctuation must be surrounded by backticks (`), as shown in the sketch after this list.
- The iceberg_subvalue_path may not contain Parquet nested types (list or map types). Refer to Parquet - Nested Types for more information.
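For example, a subvalue mapping for fields whose names contain spaces quotes those components with backticks, as in the sketch below. This is a minimal sketch; the pipeline, table, column, and field names (example_table, unit price, city name, and so on) are hypothetical placeholders rather than names from this guide.

-- Minimal sketch: backtick-quoted subvalue path components.
-- Table and field names are hypothetical placeholders.
CREATE PIPELINE quoted_fields_pipe AS
LOAD DATA S3 ''
CONFIG '{"region": "us-west-2",
    "catalog_type": "GLUE",
    "table_id": "db.example"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE example_table
(Id <- id,
 UnitPrice <- `unit price`,
 City <- address::`city name`)
FORMAT ICEBERG;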
Extracting and Converting Values
Refer to CREATE PIPELINE for details on extracting and converting values from Parquet files.
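For example, a field can be read into a temporary variable in the subvalue mapping and converted with a SET clause before it is written to the destination column. This is a minimal sketch; the pipeline, table, column, and field names are hypothetical placeholders, and it assumes the Parquet-style SET clause syntax applies to Iceberg pipelines as described above.

-- Minimal sketch: read published_ts into a variable, convert it, then store it.
-- Assumes the Parquet-style SET clause; all names are hypothetical placeholders.
CREATE PIPELINE convert_values_pipe AS
LOAD DATA S3 ''
CONFIG '{"region": "us-west-2",
    "catalog_type": "GLUE",
    "table_id": "db.example"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE example_table
(Id <- id,
 @published <- published_ts)
SET PublishedDate = FROM_UNIXTIME(@published)
FORMAT ICEBERG;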
Enable and Configure Iceberg Ingest
The following variables need to be set for Iceberg ingest.
SET GLOBAL enable_iceberg_ingest = ON;
SET GLOBAL pipelines_extractor_get_offsets_timeout_ms = 90000;
SET GLOBAL java_pipelines_heap_size = 100;
The global engine variable enable_iceberg_ingest must be set to ON to use Iceberg ingest. It is OFF by default.
SingleStore recommends setting the global engine variable pipelines_extractor_get_offsets_timeout_ms to 90000 to prevent timeouts and to provide an adequate amount of time to process the Iceberg catalog.
SingleStore recommends setting the global engine variable java_pipelines_heap_size to 100 to avoid out-of-memory errors.
Refer to Engine Variables for more information.
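To confirm the configuration, the current values of these variables can be inspected with SHOW VARIABLES; this is an optional check, not a required step.

-- Optional check of the engine variables set above.
SHOW GLOBAL VARIABLES LIKE 'enable_iceberg_ingest';
SHOW GLOBAL VARIABLES LIKE 'pipelines_extractor_get_offsets_timeout_ms';
SHOW GLOBAL VARIABLES LIKE 'java_pipelines_heap_size';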
CREATE OR REPLACE - Data Refresh with Upserts
Iceberg pipelines can be used to upsert data from an Iceberg table into a SingleStore table. CREATE OR REPLACE can be used to consume data that has been added to the Iceberg table.
CREATE OR REPLACE PIPELINE can be used with REPLACE INTO TABLE or ON DUPLICATE KEY UPDATE to perform upserts. Upserts require the destination table to have a PRIMARY KEY or a UNIQUE index.
When a pipeline with the specified pipeline_name exists, CREATE OR REPLACE functions similarly to CREATE PIPELINE, but preserves pipeline metadata, including loaded offsets and data files. When CREATE OR REPLACE is run on an existing pipeline, the Iceberg pipeline will retrieve a new snapshot, a new schema, and new data files, and will insert data from the new data files into the destination table in SingleStore.
If there has been data compaction or if the Iceberg REPLACE operation was used, the pipeline might attempt to insert the same data, so handling duplicates is important.
Refer to CREATE PIPELINE for the syntax of CREATE OR REPLACE PIPELINE, REPLACE INTO TABLE, and ON DUPLICATE KEY UPDATE.
Example - Glue Catalog on Amazon S3
An Iceberg table with data files in Parquet format that is stored in an AWS S3 bucket using AWS Glue can be loaded into a SingleStore table using a pipeline (CREATE PIPELINE).
In this example, a table named books is created and data from an Iceberg table that meets this schema is loaded into the books table.
Create the table.
CREATE TABLE books(
    Id INT,
    Name TEXT,
    NumPages INT,
    Rating DOUBLE);
The following data is used for this example.
(1, 'Happy Place', 400, 4.9)
(2, 'Legends & Lattes', 304, 4.9)
(3, 'The Vanishing Half', 352, 4.9)
(4, 'The Wind Knows My Name', 304, 4.9)
The CREATE PIPELINE statement below will load data from an Iceberg table containing the data above into the books table. The column names to the left of the <- are the column names of the SingleStore table into which the data will be loaded. The column names to the right of the <- are the column names of the Iceberg table which is to be loaded into SingleStore.
CREATE PIPELINE books_pipe AS
LOAD DATA S3 ''
CONFIG '{"region":"us-west-2",
    "catalog_type": "GLUE",
    "table_id": "db.books"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- id,
 Name <- name,
 NumPages <- numPages,
 Rating <- rating)
FORMAT ICEBERG;
Test the pipeline.
TEST PIPELINE books_pipe;
+------+------------------------+----------+--------+
| Id | Name | NumPages | Rating |
+------+------------------------+----------+--------+
| 4 | The Wind Knows My Name | 304 | 4.9 |
| 1 | Happy Place | 400 | 4.9 |
| 2 | Legends & Lattes | 304 | 4.9 |
| 3 | The Vanishing Half | 352 | 4.9 |
+------+------------------------+----------+--------+
Refer to START PIPELINE for more information on starting pipelines.
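For example, once the test output looks correct, the one-time load can be run by starting the pipeline created above; FOREGROUND runs the load synchronously, so the statement returns when the snapshot has been ingested.

-- Run the one-time load for the pipeline created in this example.
START PIPELINE books_pipe FOREGROUND;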
Example - Use Subvalue Mappings
This example shows the use of subvalue mappings to load nested elements from an Iceberg schema into a SingleStore table.
Create a table.
CREATE TABLE addresses(
    Id INT,
    Name TEXT,
    Street TEXT,
    City TEXT,
    Country TEXT);
The following data is used for this example.
(1, 'Mylo', struct('123 Main St', 'New York', 'USA'))
(2, 'Naya', struct('456 Elm St', 'San Francisco', 'USA'))
The CREATE PIPELINE statement below will load data from an Iceberg table containing the data above into the addresses table. The column names to the left of the <- are the column names of the SingleStore table into which the data will be loaded. The column names to the right of the <- are the column names of the Iceberg table which is to be loaded into SingleStore.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 ''
CONFIG '{"region":"us-west-2",
    "catalog_type": "GLUE",
    "catalog_name": "s3_catalog",
    "table_id": "db2.addresses"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
The CREDENTIALS may also contain an aws_session_token; refer to CREATE LINK for more information.
Test the pipeline.
TEST PIPELINE addresses_pipe;
+------+------+-------------+---------------+---------+
| Id | Name | Street | City | Country |
+------+------+-------------+---------------+---------+
| 1 | Mylo | 123 Main St | New York | USA |
| 2 | Naya | 456 Elm St | San Francisco | USA |
+------+------+-------------+---------------+---------+
Refer to START PIPELINE for more information on starting pipelines.
Example - Snowflake Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 with a Snowflake catalog.
Iceberg tables to be ingested in SingleStore must be created on an external volume.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 ''
CONFIG '{"region" : "us-west-2",
    "catalog_type": "SNOWFLAKE",
    "table_id": "db_name.schema_name.table_name",
    "catalog.uri": "jdbc:snowflake://<account_identifier>.snowflakecomputing.com",
    "catalog.jdbc.user": "<user_name>",
    "catalog.jdbc.password": "<password>",
    "catalog.jdbc.role": "<user_role>"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
For the Snowflake catalog, the table_id must consist of three parts: the database name, the schema name, and the table name. The catalog.uri, catalog.jdbc.user, catalog.jdbc.password, and catalog.jdbc.role are required.
Example - Upserts using REPLACE INTO
The SQL below shows how to use CREATE OR REPLACE and REPLACE INTO to do upserts with the Snowflake catalog.
Create a table into which data will be upserted.
CREATE TABLE upsert_table(
    Id INT,
    Name TEXT,
    Val INT,
    PRIMARY KEY(Id));
Create a pipeline to insert from the Iceberg table into the SingleStore table.
CREATE OR REPLACE PIPELINE upserts_pipe AS
LOAD DATA S3 ''
CONFIG '{"region" : "us-west-2",
    "catalog_type": "SNOWFLAKE",
    "table_id": "db_name.schema_name.table_name",
    "catalog.uri": "jdbc:snowflake://<account_identifier>.snowflakecomputing.com",
    "catalog.jdbc.user": "<user_name>",
    "catalog.jdbc.password": "<password>",
    "catalog.jdbc.role": "<user_role>"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
REPLACE INTO TABLE upsert_table
(Id <- id,
 Name <- name,
 Val <- value)
FORMAT ICEBERG;
This command uses REPLACE INTO to indicate that if there are duplicates of the primary key in the Iceberg table, the last-processed value should be inserted into the SingleStore table.
After new data has been inserted into an Iceberg table, use the same CREATE OR REPLACE command to do an upsert from the (updated) Iceberg table into the SingleStore table.
Example - REST Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 with a REST catalog.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 ''
CONFIG '{"region" : "us-west-2",
    "catalog_type": "REST",
    "table_id": "db_name.table_name",
    "catalog.uri": "http://host.address:8181"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
Example - JDBC Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 with a JDBC catalog.
SingleStore supports Postgres, MySQL, and SQLite JDBC drivers by default.
The following example uses JDBC with SQLite.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 's3://path_to_warehouse'
CONFIG '{"region" : "us-west-2",
    "catalog_type": "JDBC",
    "catalog_name": "catalog_name",
    "table_id": "db_name.table_name",
    "catalog.uri": "jdbc:sqlite:file:/path_jdbc"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
The following example uses JDBC with MySQL.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 's3://path_to_warehouse'
CONFIG '{"region" : "us-west-2",
    "catalog_type": "JDBC",
    "catalog_name": "catalog_name",
    "table_id": "db_name.table_name",
    "catalog.uri": "jdbc:mysql://host.address:3306/default",
    "catalog.jdbc.user": "<user_name>",
    "catalog.jdbc.password": "<password>"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Country <- address::country)
FORMAT ICEBERG;
The path_to_warehouse and catalog_name are required for JDBC catalogs.