Iceberg Ingest
On this page
Note
This is a Preview feature.
Apache Iceberg is an open-source table format that helps simplify analytical data processing for large datasets in data lakes.
Remarks
The following are supported:
-
Iceberg Version 1 tables and Iceberg Version 2 tables with data files in Parquet format.
-
Iceberg tables stored in Amazon S3 with catalogs: GLUE, Snowflake, REST, JDBC, Hive, and Polaris.
-
Initial load of Iceberg tables and continuous ingest of updates to Iceberg tables.
Syntax
CREATE [OR REPLACE] PIPELINE <pipeline_name> ASLOAD DATA S3 '[<warehouse_path>]'CONFIG '{"catalog_type": "GLUE|SNOWFLAKE|REST|JDBC |HIVE","table_id": "<table_identifier>",<configuration_json>[, "ingest_mode": "append|upsert|one_time"][, "catalog_name": "<your_catalog_name>" ][, “catalog.<property>” : “property_value” [, …]]}'CREDENTIALS '<credentials_json>’[REPLACE] INTO TABLE <table_name>[ON DUPLICATE KEY UPDATE]<iceberg_subvalue_mapping>FORMAT ICEBERG;<iceberg_subvalue_mapping>:({<singlestore_col_name> | @<variable_name>}<- <iceberg_subvalue_path> [, ... ])<iceberg_subvalue_path>: {ident [::ident ...]}
The warehouse_
specifies the path to the catalog warehouse.warehouse_
can be set to an empty string when not needed.
All the data shaping options for Parquet pipelines are supported for Iceberg pipelines.
Catalog Specification
-
The
catalog_
andtype table_
are required for the catalog specification.id -
The
table_
identifies the Iceberg table.id The table identifier is catalog-specific, but is typically in the form: database_ name. table_ name. -
The
catalog_
is a name to associate with the catalog when reading table metadata and is used internally in SingleStore for logging and metrics purposes.name The catalog_
is required for the JDBC catalog and is optional for other catalogs.name -
The
catalog.
is a list of key-value pairs for configuring the catalog connection.property The property and value are passed directly to the Iceberg SDK to establish the catalog connection.
S3 Specification
-
The
configuration_
is a JSON string for S3 configuration parameters such asjson region
,endpoint_
, andurl compatibility_
.mode Refer to CREATE PIPELINE and CREATE LINK for more information. -
The
credentials_
is an AWS credential specification in JSON which includes fields such as aws_json access_ key_ id and aws_ secret_ access_ key. Refer to CREATE LINK for more information.
Ingest Mode
-
Three ingest modes are supported:
one_
for one-time loads, and two modes for continuous ingest:time append
for append-only workloads and upsert forupsert
workloads.The default ingest_
ismode append
. -
With continuous ingest
("ingest_
, a running pipeline automatically detects updates to the Iceberg table and ingests them into the SingleStore table.mode":"append" or "ingest_ mode":"upsert") Refer to Continuous Ingest for details on append and upsert mode. -
In one-time ingest (
"ingest_
) SingleStore will request Iceberg table metadata and load data from the latest snapshot available at that moment.mode":"one_ time" Subsequent updates to the Iceberg table are not automatically ingested. -
The ingest mode is specified by setting
ingest_
in the pipeline specification as shown in the syntax above.mode
Subvalue Mappings
The iceberg_
assigns fields from the Iceberg table to columns in the SingleStore table or to temporary variables.
-
The last field in
iceberg_
must be a primitive type.subvalue_ path -
All
iceberg_
components containing whitespace or punctuation must be surrounded by backticks (`).subvalue_ path -
The
iceberg_
may not contain Parquet nested types (list or map types).subvalue_ path Refer to Parquet - Nested Types for more information.
Enable and Configure Iceberg Ingest
The global engine variable enable_
must be set to ON
to use Iceberg ingest.OFF
by default.
SET GLOBAL enable_iceberg_ingest = ON;
Three additional variables are available to tune Iceberg ingest pipelines: pipelines_
, pipelines_
, and pipelines_
.
To avoid timeout errors during pipeline creation or execution, set pipelines_
and pipelines_
to values higher than 5 minutes to provide an adequate amount of time to process the Iceberg catalog and data.pipelines_
.
Refer to List of Engine Variables for more information.
Continuous Ingest
With continuous ingestion, a running pipeline automatically detects updates to an Iceberg table and ingests them into the SingleStore table.
Continuous Ingest - Append-Only
Append-only mode is the default and is used for scenarios in which new rows are added to the Iceberg table, but no rows are deleted or modified.
In append-only mode ("ingest_
) the pipeline will processes APPEND Iceberg snapshots."ignore_
in the pipeline configuration."ignore_
as doing so may lead to data inconsistency.
Continuous Ingest - Upsert
Upsert mode is used for scenarios where updates modify non-key columns based on a specified key column(s).
In upsert mode ("ingest_
), the pipeline will process updates to the Iceberg table as upserts to the SingleStore table.
Requirements:
-
The SingleStore table must have a primary key or a unique index.
In the <iceberg_
, a column(s) in the Iceberg table must be mapped to the column(s) in SingleStore on which there is a key or unique index.subvalue_ mapping> Typically that column(s) in the Iceberg table will also be declared as a key. -
The pipeline must be created using
REPLACE
.Refer to Additional CREATE PIPELINE Examples for more information.
Limitations:
-
Pipelines will fail on delete snapshots.
Users may override these errors by specifying "ignore_
in the pipeline configuration.unsupported_ modifications"=true SingleStore does not recommend setting "ignore_
as doing so may lead to data inconsistency.unsupported_ modifications"=true -
Limitations are expected to be addressed in future releases.
Ingest Data Type Mapping
The table below lists Iceberg Types, the SingleStore data type that can be used to store those types, and the recommended conversion to be applied with a SET
clause.
Iceberg Type |
Recommended SingleStore Data Type |
Recommended Conversion |
---|---|---|
boolean |
|
|
int |
|
|
long |
|
|
float |
|
|
double |
|
|
decimal(P,S) |
|
|
date |
|
DATE_ |
time |
|
DATE_ |
timestamp |
|
DATE_ |
timestamptz |
|
DATE_ |
string |
|
|
uuid |
|
|
fixed (L) |
|
|
binary |
|
CREATE OR REPLACE
When a pipeline with a specified pipeline_
already exists, the CREATE OR REPLACE
command functions similarly to CREATE_
, with the added benefit of preserving existing pipeline metadata, such as loaded offsets and data files.CREATE OR REPLACE
on an existing pipeline initiates the Iceberg pipeline to retrieve a new snapshot, schema, and data files, and inserts data from these new files into the destination table in SingleStore.
Executing CREATE OR REPLACE
on an existing Iceberg pipeline may cause some data files to be ingested twice.CREATE OR REPLACE
only with REPLACE INTO
statements or in an upsert configuration.
CREATE PIPELINE books_create_pipe ASLOAD DATA S3 ''CONFIG '{"region":"us-west-2","catalog_type": "GLUE","catalog_name": "s3_catalog","table_id": "db.books","ingest_mode": "one_time"}'CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>","aws_secret_access_key": "<your_secret_access_key>"}'REPLACE INTO TABLE books(Id <- id,Name <- name,NumPages <- numPages,Rating <- rating)FORMAT ICEBERG;
Refer to CREATE PIPELINE for syntax for CREATE OR REPLACE PIPELINE
, REPLACE INTO TABLE
, and ON DUPLICATE KEY UPDATE
.
Example - Glue Catalog on Amazon S3
An Iceberg table with data files in Parquet format that is stored in an AWS S3 bucket using AWS Glue can be loaded into a SingleStore table using a pipeline (CREATE PIPELINE.
In this example, a table named books
is created and data from an Iceberg table that meets this schema is loaded into the books
table.
Create the table.
CREATE TABLE books(Id INT,Name TEXT,NumPages INT,Rating DOUBLE,PRIMARY KEY(Id));
The following data is used for this example.
(1, 'Happy Place', 400, 4.9)(2, 'Legends & Lattes', 304, 4.9)(3, 'The Vanishing Half', 352, 4.9)(4, 'The Wind Knows My Name', 304, 4.9)
The PIPELINE
statement below will load data from an Iceberg table containing the data above into the books
table.<-
are the column names from the SingleStore table into which the data will be loaded.<-
are the column names from the Iceberg table which is to be loaded into SingleStore.
CREATE PIPELINE books_pipe ASLOAD DATA S3 ''CONFIG '{"region":"us-west-2","catalog_type": "GLUE","table_id": "db.books"}'CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>","aws_secret_access_key": "<your_secret_access_key>"}'INTO TABLE books(Id <- id,Name <- name,NumPages <- numPages,Rating <- rating)FORMAT ICEBERG;
Test the pipeline.
TEST PIPELINE books_pipe;
+------+------------------------+----------+--------+
| Id | Name | NumPages | Rating |
+------+------------------------+----------+--------+
| 4 | The Wind Knows My Name | 304 | 4.9 |
| 1 | Happy Place | 400 | 4.9 |
| 2 | Legends & Lattes | 304 | 4.9 |
| 3 | The Vanishing Half | 352 | 4.9 |
+------+------------------------+----------+--------+
Refer to START PIPELINE for more information on starting pipelines.
Example - Use Subvalue Mappings
This example shows the use of subvalue mappings to load nested elements from an Iceberg schema into a SingleStore table.
Create a table.
CREATE TABLE addresses(Id INT,Name TEXT,Street TEXT,City TEXT,Country TEXT,PRIMARY KEY(Id));
The following data is used for this example.
(1, 'Mylo', struct('123 Main St', 'New York', 'USA'))(2, 'Naya', struct('456 Elm St', 'San Francisco', 'USA'))
The PIPELINE
statement below will load data from an Iceberg table containing the data above into the addresses
table.<-
are the column names from the SingleStore table into which the data will be loaded.<-
are the column names from the Iceberg table which is to be loaded into SingleStore.
CREATE PIPELINE addresses_pipe ASLOAD DATA S3 ''CONFIG '{"region":"us-west-2","catalog_type": "GLUE","catalog_name": "s3_catalog","table_id": "db2.addresses"}'CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>","aws_secret_access_key": "<your_secret_access_key>"}'INTO TABLE addresses(Id <- id,Name <- name,Street <- address::street,City <- address::city,Country <- address::country)FORMAT ICEBERG;
The CREDENTIALS
may also contain an aws_
; refer to CREATE LINK for more information.
Test the pipeline.
TEST PIPELINE addresses_pipe;
+------+------+-------------+---------------+---------+
| Id | Name | Street | City | Country |
+------+------+-------------+---------------+---------+
| 1 | Mylo | 123 Main St | New York | USA |
| 2 | Naya | 456 Elm St | San Francisco | USA |
+------+------+-------------+---------------+---------+
Refer to START PIPELINE for more information on starting pipelines.
Example - Snowflake Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 with a Snowflake catalog.
Iceberg tables to be ingested in SingleStore must be created on an external volume.
CREATE PIPELINE addresses_pipe ASLOAD DATA S3 ''CONFIG '{"region" : "us-west-2","catalog_type": "SNOWFLAKE","table_id": "db_name.schema_name.table_name","catalog.uri": "jdbc:snowflake://<acount_identifier>.snowflakecomputing.com","catalog.jdbc.user":"<user_name>","catalog.jdbc.password":"<password>","catalog.jdbc.role":"<user role>"}'CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>","aws_secret_access_key": "<your_secret_access_key>"}'INTO TABLE addresses(Id <- id,Name <- name,Street <- address::street,City <- address::city,Country <- address::country)FORMAT ICEBERG;
For the Snowflake catalog, the table_
must consist of three parts - the database name, the schema name, and the table name.catalog.
, catalog.
, catalog.
, and catalog.
are required.
Example - REST Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 with REST catalog.
CREATE PIPELINE addresses_pipe ASLOAD DATA S3 ''CONFIG '{"region" : "us-west-2","catalog_type": "REST","table_id": "db_name.table_name","catalog.uri": "http://host.addresss:8181"}'CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>","aws_secret_access_key": "<your_secret_access_key>"}'INTO TABLE addresses(Id <- id,Name <- name,Street <- address::street,City <- address::city,Country <- address::country)FORMAT ICEBERG;
Example - JDBC Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 with JDBC catalog.
SingleStore supports Postgres, MySQL, and SQLite JDBC drivers by default.java_
.
The following example uses JDBC with SQLite.
CREATE PIPELINE addresses_pipe ASLOAD DATA S3 's3://path_to_warehouse'CONFIG '{"region" : "us-west-2","catalog_type": "JDBC","catalog_name": "catalog_name","table_id": "db_name.table_name","catalog.uri":"jdbc:sqlite_:file:/path_jdbc"}'CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>","aws_secret_access_key": "<your_secret_access_key>"}'INTO TABLE addresses(Id <- id,Name <- name,Street <- address::street,City <- address::city,Country <- address::country)FORMAT ICEBERG;
The following example uses JDBC with MySQL.
CREATE PIPELINE addresses_pipe ASLOAD DATA S3 's3://path_to_warehouse'CONFIG '{"region" : "us-west-2","catalog_type": "JDBC","catalog_name": "catalog_name","table_id": "db_name.table_name","catalog.uri": "jdbc:mysql://host.address:3306/default","catalog.jdbc.user": "<user_name>","catalog.jdbc.password": "<password>"}'CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>","aws_secret_access_key": "<your_secret_access_key>"}'INTO TABLE addresses(Id <- id,Name <- name,Country <- address::country)FORMAT ICEBERG;
The path_
and catalog_
are required for JDBC catalogs.
Example - Hive Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 using Hive Catalog.
CREATE PIPELINE addresses_pipe ASLOAD DATA S3 ''CONFIG '{"catalog_type": "HIVE","catalog.uri": "thrift://<service_endpoint>:46590","region": "us-east-1","catalog.hive.metastore.client.auth.mode": "PLAIN","catalog.hive.metastore.client.plain.username": "<username>","catalog.hive.metastore.client.plain.password": "<password>","catalog.metastore.use.SSL": "true","catalog.hive.metastore.truststore.type": "PKCS12","catalog.hive.metastore.truststore.path": "/path/to/your/project/hive/truststore.12""catalog.hive.metastore.truststore.password": <truststore_password>}'CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>","aws_secret_access_key": "<your_secret_access_key>"}'INTO TABLE addresses(Id <- id,Name <- name,Street <- address::street,City <- address::city,Country <- address::country)FORMAT ICEBERG;
The catalog.
is the base URL for accessing the Hive catalog's API or service endpoint.
The catalog.
is the authentication mode for connecting to the Hive Metastore.
The catalog.
is the username used to authenticate with the Hive Metastore.
The catalog.
is the password for the authenticated user.
The catalog.
is a boolean flag that secures communication with the Hive Metastore.
The catalog.
is the truststore format used to validate the SSL certificate.
The catalog.
is the file path that contains the SSL certificate.
The catalog.
is the password needed to access the truststore.
Refer to GitHub for additional Hive configurations.
Example - Polaris Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 using Polaris Catalog.
CREATE PIPELINE addresses_pipe ASLOAD DATA S3 ''CONFIG '{"catalog_type": "REST","catalog.warehouse": "<polaris_catalog_name>","table_id": "db_name.table_name","region":"us-east-1","catalog.uri":"https://<account_identifier>.snowflakecomputing.com/polaris/api/catalog","region":"us-east-1","catalog.connection":"<polaris_catalog_secret_key>","catalog.scope": "PRINCIPAL_ROLE:ALL"}'CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>","aws_secret_access_key": "<your_secret_access_key>"}'INTO TABLE addresses(Id <- id,Name <- name,Street <- address::street,City <- address::city,Country <- address::country)FORMAT ICEBERG;
The catalog.
is the base URL for accessing the Polaris catalog's API or service endpoint.
The catalog.
defines the access permissions for the Polaris catalog.
The catalog.
is a secret key from Polaris catalog connection, formatted as <ClientID>:<Secret>
.
Example - Continuous Ingest - Upsert Mode
The example below shows a pipeline using ingest_
of upsert
.ingest_
is upsert
, the books table has a primary key, Id
, in this example.
When started, this pipeline will ingest data from the latest snapshot of the Iceberg table into the SingleStore books
table.
CREATE PIPELINE books_upsert_pipe ASLOAD DATA S3 ''CONFIG '{"region":"us-west-2","catalog_type": "GLUE","catalog_name": "s3_catalog","table_id": "db.books","ingest_mode": "upsert"}'CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>","aws_secret_access_key": "<your_secret_access_key>"}'REPLACE INTO TABLE books(Id <- id,Name <- name,NumPages <- numPages,Rating <- rating)FORMAT ICEBERG;
Example - Continuous Ingest - Append Mode
The example below shows a pipeline using ingest_
of append
.
When started, this pipeline will ingest data from the latest snapshot of the Iceberg table into the SingleStore books
table.
CREATE PIPELINE books_append_pipe ASLOAD DATA S3 ''CONFIG '{"region":"us-west-2","catalog_type": "GLUE","catalog_name": "s3_catalog","table_id": "db.books","ingest_mode": "append"}'CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>","aws_secret_access_key": "<your_secret_access_key>"}'INTO TABLE books(Id <- id,Name <- name,NumPages <- numPages,Rating <- rating)FORMAT ICEBERG;
Example - Hadoop Catalog on FS
Ingest an Iceberg table stored in the local filesystem using Hadoop Catalog.
CREATE PIPELINE addresses_pipe ASLOAD DATA FS '/tmp/warehouse_path'CONFIG '{"catalog_type": "HADOOP","table_id": "db_name.table_name"}'INTO TABLE addresses(Id <- id,Name <- name,Street <- address::street,City <- address::city,Country <- address::country)FORMAT ICEBERG;
Related Topics
Last modified: November 19, 2024