Iceberg Ingest
Note
This is a Preview feature.
Apache Iceberg is an open-source table format that helps simplify analytical data processing for large datasets in data lakes.
Remarks
The following are supported:
- Iceberg Version 1 tables and Iceberg Version 2 tables with data files in Parquet format.
- Iceberg tables stored in Amazon S3 with catalogs: GLUE, Snowflake, REST, and JDBC.
- One-time ingest of Iceberg tables.
When an Iceberg table is ingested, SingleStore will request Iceberg table metadata and load data from the latest snapshot available at that moment.
Important
The global engine variable enable_iceberg_ingest must be set to ON to use Iceberg Ingest. It is OFF by default.
SET GLOBAL enable_iceberg_ingest = ON;
Syntax
CREATE [OR REPLACE] PIPELINE <pipeline_name> AS
LOAD DATA S3 '[<warehouse_path>]'
CONFIG '{"catalog_type": "GLUE|SNOWFLAKE|REST|JDBC",
    "table_id": "<table_identifier>",
    <configuration_json>
    [, "catalog_name": "<your_catalog_name>" ]
    [, "catalog.<property>" : "property_value" [, ...]]}'
CREDENTIALS '<credentials_json>'
[REPLACE] INTO TABLE <table_name>
<iceberg_subvalue_mapping>
FORMAT ICEBERG;

<iceberg_subvalue_mapping>:
    ({<singlestore_col_name> | @<variable_name>}
        <- <iceberg_subvalue_path> [, ... ])

<iceberg_subvalue_path>: {ident [::ident ...]}
The warehouse_path specifies the path to the catalog warehouse. The warehouse_path can be set to an empty string when not needed.
All the data shaping options for Parquet pipelines are supported for Iceberg pipelines.
CONFIG
Catalog Specification
- The catalog_type and table_id are required for the catalog specification.
- The table_id identifies the Iceberg table. The table identifier is catalog-specific, but is typically in the form: database_name.table_name.
- The catalog_name is a name to associate with the catalog when reading table metadata and is used internally in SingleStore for logging and metrics purposes. The catalog_name is required for the JDBC catalog and is optional for other catalogs.
- The catalog.<property> entries are a list of key-value pairs for configuring the catalog connection. The property and value are passed directly to the Iceberg SDK to establish the catalog connection (see the sketch after this list).
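For illustration, the catalog portion of a CONFIG string for a REST catalog might look like the following sketch; the database, table, and catalog names are hypothetical, and the catalog.uri property is passed through to the Iceberg SDK:

CONFIG '{"catalog_type": "REST",
    "table_id": "db_name.table_name",
    "catalog_name": "my_rest_catalog",
    "catalog.uri": "http://host.address:8181"}'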
S3 Specification
- The <configuration_json> is a JSON string for S3 configuration parameters such as region, endpoint_url, and compatibility_mode (see the sketch after this list). Refer to CREATE PIPELINE and CREATE LINK for more information.
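For example, a CONFIG string that combines the catalog specification with S3 parameters might look like the following sketch; the endpoint_url value is hypothetical and, together with compatibility_mode, is typically only needed for S3-compatible storage:

CONFIG '{"catalog_type": "GLUE",
    "table_id": "db_name.table_name",
    "region": "us-west-2",
    "endpoint_url": "https://storage.example.com",
    "compatibility_mode": true}'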
CREDENTIALS (<credentials_json>)
- The <credentials_json> specifies S3 credentials in JSON format. With this specification, credentials are directly included in the pipeline. While including credentials in the pipeline is simpler, it is less secure than using vended credentials or EKS IRSA. For information about the supported credential options, refer to CREATE PIPELINE. A sketch follows this list.
- When using EKS IRSA, the CREDENTIALS clause is optional; however, the clause may be included to add additional authentication parameters.
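As a sketch, static S3 credentials can be included directly in the CREDENTIALS clause; all values are placeholders, and aws_session_token is only needed when using temporary credentials:

CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>",
    "aws_session_token": "<your_session_token>"}'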
REPLACE INTO
REPLACE INTO is required for pipelines that use ingest_. REPLACE INTO should not be used with other types of pipelines, as doing so causes unnecessary overhead.
When you execute a REPLACE INTO statement on a table with a PRIMARY KEY or UNIQUE index, the engine checks for matching values in these indexes and replaces any matching rows. If the table does not have a PRIMARY KEY or UNIQUE index, REPLACE INTO operates like INSERT.
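A minimal sketch of this behavior with plain SQL statements (hypothetical table and values):

CREATE TABLE replace_demo(id INT PRIMARY KEY, val TEXT);
REPLACE INTO replace_demo VALUES (1, 'first');   -- no existing row with id = 1, behaves like INSERT
REPLACE INTO replace_demo VALUES (1, 'second');  -- id = 1 matches, so the existing row is replaced
SELECT * FROM replace_demo;                      -- returns a single row: (1, 'second')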
Subvalue Mappings
The iceberg_subvalue_mapping assigns fields from the Iceberg table to columns in the SingleStore table or to temporary variables. A ::-separated list of field names in the iceberg_subvalue_path is used to look up fields in nested Parquet schemas.
- The last field in iceberg_subvalue_path must be a primitive type.
- All iceberg_subvalue_path components containing whitespace or punctuation must be surrounded by backticks (`), as shown in the sketch after this list.
- The iceberg_subvalue_path may not contain Parquet nested types (list or map types). Refer to Parquet - Nested Types for more information.
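For example, a mapping that reads nested struct fields and quotes a component containing a space might look like the following sketch (hypothetical table, column, and field names):

INTO TABLE people
(Id <- id,
 City <- address::city,
 PostalCode <- address::`postal code`)
FORMAT ICEBERG;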
Iceberg Catalog and Security Configuration
Catalog Types
SingleStore supports the following Iceberg Catalogs: Glue, Snowflake, REST, and JDBC.
Types of Ingest
Manual Upserts
The CREATE OR REPLACE command can be used to upsert data from an Iceberg table into a SingleStore table. CREATE OR REPLACE can also be used to consume data that has been added to the Iceberg table.
CREATE OR REPLACE PIPELINE can be used with REPLACE INTO TABLE or ON DUPLICATE KEY UPDATE to perform upserts. The destination table must have a PRIMARY KEY or a UNIQUE index.
When a pipeline with the specified pipeline_name exists, CREATE OR REPLACE functions similarly to CREATE PIPELINE, but preserves pipeline metadata, including loaded offsets and data files. When CREATE OR REPLACE is run on an existing pipeline, the Iceberg pipeline will retrieve a new snapshot, a new schema, and new data files, and will insert data from the new data files into the destination table in SingleStore.
If there has been data compaction or if the Iceberg REPLACE operation was used, the pipeline might attempt to insert the same data, so handling duplicates is important.
Refer to CREATE PIPELINE for syntax for CREATE OR REPLACE PIPELINE, REPLACE INTO TABLE, and ON DUPLICATE KEY UPDATE.
Iceberg Pipeline Configuration
Data Type Translation
The following table lists Iceberg Types, the SingleStore data type that can be used to store those types, and the recommended conversion to be applied with a SET clause.
| Iceberg Type | Recommended SingleStore Data Type | Recommended Conversion |
|---|---|---|
| boolean | | |
| int | | |
| long | | |
| float | | |
| double | | |
| decimal(P,S) | | |
| date | | DATE_ADD |
| time | | DATE_ADD |
| timestamp | | DATE_ADD('1970-01-01', INTERVAL @var MICROSECOND) |
| timestamptz | | DATE_ADD('1970-01-01', INTERVAL @var MICROSECOND) |
| string | | |
| uuid | | |
| fixed (L) | | |
| binary | | |
Configure Iceberg Ingest
The following variables need to be set for Iceberg ingest.
SET GLOBAL enable_iceberg_ingest = ON;
SET GLOBAL pipelines_extractor_get_offsets_timeout_ms = 90000;
SET GLOBAL java_pipelines_heap_size = 100;
The global engine variable enable_iceberg_ingest must be set to ON to use Iceberg ingest. It is OFF by default.
SingleStore recommends setting the global engine variable pipelines_extractor_get_offsets_timeout_ms to 90000 to prevent timeouts and to provide an adequate amount of time to process the Iceberg catalog.
SingleStore recommends setting the global engine variable java_pipelines_heap_size to 100 to avoid out-of-memory errors.
Refer to Engine Variables for more information.
Examples
Glue Catalog on Amazon S3
An Iceberg table with data files in Parquet format that is stored in an AWS S3 bucket using AWS Glue can be loaded into a SingleStore table using a pipeline (refer to CREATE PIPELINE).
In this example, a table named books is created and data from an Iceberg table that meets this schema is loaded into the books table.
Create the table.
CREATE TABLE books(
    Id INT,
    Name TEXT,
    NumPages INT,
    Rating DOUBLE,
    PRIMARY KEY(Id));
The following data is used for this example.
(1, 'Happy Place', 400, 4.9)
(2, 'Legends & Lattes', 304, 4.9)
(3, 'The Vanishing Half', 352, 4.9)
(4, 'The Wind Knows My Name', 304, 4.9)
The PIPELINE statement below will load data from an Iceberg table containing the data above into the books table. The column names on the left side of the <- are the column names from the SingleStore table into which the data will be loaded. The column names on the right side of the <- are the column names from the Iceberg table which is to be loaded into SingleStore.
CREATE PIPELINE books_pipe AS
LOAD DATA S3 ''
CONFIG '{"region":"us-west-2",
    "catalog_type": "GLUE",
    "table_id": "db.books"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- id,
 Name <- name,
 NumPages <- numPages,
 Rating <- rating)
FORMAT ICEBERG;
Test the pipeline.
TEST PIPELINE books_pipe;
+------+------------------------+----------+--------+
| Id | Name | NumPages | Rating |
+------+------------------------+----------+--------+
| 4 | The Wind Knows My Name | 304 | 4.9 |
| 1 | Happy Place | 400 | 4.9 |
| 2 | Legends & Lattes | 304 | 4.9 |
| 3 | The Vanishing Half | 352 | 4.9 |
+------+------------------------+----------+--------+
Refer to START PIPELINE for more information on starting pipelines.
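For example, once the test output looks correct, the pipeline created above can be started:

START PIPELINE books_pipe;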
Use Subvalue Mappings
This example shows the use of subvalue mappings to load nested elements from an Iceberg schema into a SingleStore table.
Create a table.
CREATE TABLE addresses(
    Id INT,
    Name TEXT,
    Street TEXT,
    City TEXT,
    Country TEXT,
    PRIMARY KEY(Id));
The following data is used for this example.
(1, 'Mylo', struct('123 Main St', 'New York', 'USA'))
(2, 'Naya', struct('456 Elm St', 'San Francisco', 'USA'))
The PIPELINE statement below will load data from an Iceberg table containing the data above into the addresses table. The column names on the left side of the <- are the column names from the SingleStore table into which the data will be loaded. The column names on the right side of the <- are the column names from the Iceberg table which is to be loaded into SingleStore.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 ''
CONFIG '{"region":"us-west-2",
    "catalog_type": "GLUE",
    "catalog_name": "s3_catalog",
    "table_id": "db2.addresses"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
Test the pipeline.
TEST PIPELINE addresses_pipe;
+------+------+-------------+---------------+---------+
| Id | Name | Street | City | Country |
+------+------+-------------+---------------+---------+
| 1 | Mylo | 123 Main St | New York | USA |
| 2 | Naya | 456 Elm St | San Francisco | USA |
+------+------+-------------+---------------+---------+
Refer to START PIPELINE for more information on starting pipelines.
Snowflake Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 with a Snowflake catalog.
Iceberg tables to be ingested into SingleStore must be created on an external volume.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 ''
CONFIG '{"region": "us-west-2",
    "catalog_type": "SNOWFLAKE",
    "table_id": "db_name.schema_name.table_name",
    "catalog.uri": "jdbc:snowflake://tpq12345.snowflakecomputing.com",
    "catalog.jdbc.user":"<user_name>",
    "catalog.jdbc.password":"<password>",
    "catalog.jdbc.role":"<user role>"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
For the Snowflake catalog, the table_id must consist of three parts, the database name, the schema name, and the table name: db_name.schema_name.table_name in the example above.
The catalog.uri can be obtained by running SELECT SYSTEM$ALLOWLIST(); in the Snowflake system.
In addition, the catalog.uri, catalog.jdbc.user, catalog.jdbc.password, and catalog.jdbc.role properties are required when using the Snowflake catalog.
REST Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 with a REST catalog.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 ''
CONFIG '{"region": "us-west-2",
    "catalog_type": "REST",
    "table_id": "db_name.table_name",
    "catalog.uri": "http://host.address:8181"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
JDBC Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 with a JDBC catalog.
SingleStore supports Postgres, MySQL, and SQLite JDBC drivers by default.
The following example uses JDBC with SQLite.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 's3://path_to_warehouse'
CONFIG '{"region": "us-west-2",
    "catalog_type": "JDBC",
    "catalog_name": "catalog_name",
    "table_id": "db_name.table_name",
    "catalog.uri": "jdbc:sqlite:file:/path_jdbc"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
The following example uses JDBC with MySQL.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 's3://path_to_warehouse'
CONFIG '{"region": "us-west-2",
    "catalog_type": "JDBC",
    "catalog_name": "catalog_name",
    "table_id": "db_name.table_name",
    "catalog.uri": "jdbc:mysql://host.address:3306/default",
    "catalog.jdbc.user": "<user_name>",
    "catalog.jdbc.password": "<password>"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Country <- address::country)
FORMAT ICEBERG;
The warehouse path (path_to_warehouse in the examples above) and the catalog_name are required for JDBC catalogs.
Hadoop Catalog on FS
Ingest an Iceberg table stored in the local filesystem using the Hadoop catalog.
CREATE PIPELINE addresses_pipe AS
LOAD DATA FS '/tmp/warehouse_path'
CONFIG '{"catalog_type": "HADOOP",
    "table_id": "db_name.table_name"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
Manual Upserts using REPLACE INTO
The SQL below shows how to use CREATE OR REPLACE and REPLACE INTO to do manual upserts with the Snowflake catalog. This example uses the books table, which has a primary key on Id.
Create a pipeline to insert from the Iceberg table into the books table in SingleStore.
CREATE OR REPLACE PIPELINE books_upserts_pipe AS
LOAD DATA S3 ''
CONFIG '{"region" : "us-west-2",
    "catalog_type": "SNOWFLAKE",
    "table_id": "db_name.schema_name.table_name",
    "catalog.uri": "jdbc:snowflake://<account_identifier>.snowflakecomputing.com",
    "catalog.jdbc.user":"<user_name>",
    "catalog.jdbc.password":"<password>",
    "catalog.jdbc.role":"<user role>"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
REPLACE INTO TABLE books
(Id <- id,
 Name <- name,
 NumPages <- numPages,
 Rating <- rating)
FORMAT ICEBERG;
This command uses REPLACE INTO to indicate that if there are duplicates of the primary key in the Iceberg table, the last-processed value should be inserted into the SingleStore table.
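To pick up changes from a newer Iceberg snapshot later, the CREATE OR REPLACE PIPELINE statement above can be executed again and the pipeline run again. As a sketch, assuming the pipeline is run as a one-time foreground batch:

START PIPELINE books_upserts_pipe FOREGROUND;

Because the destination books table has a primary key on Id, rows that already exist are replaced rather than duplicated.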
Debug Datetime Conversions
Datetime value conversion can be debugged by loading the raw datetime value from an Iceberg file into a BLOB. This example uses a books_debug table with a PublishTimestamp attribute of type DATETIME(6) and an extra attribute, RawTimestamp, into which the raw timestamp data will be loaded. The raw value can also be loaded into a TEXT attribute in addition to, or instead of, using a BLOB attribute.
CREATE TABLE books_debug(
    Id INT,
    Name TEXT,
    NumPages INT,
    Rating DOUBLE,
    PublishTimestamp DATETIME(6),
    RawTimestamp BLOB);
The PIPELINE statement below loads the publishTimestamp from the Iceberg file into both the PublishTimestamp column and the RawTimestamp column. A SET statement is used to convert the publishTimestamp from Iceberg to the PublishTimestamp in SingleStore.
The column names on the left side of the <- are the column names from the SingleStore table into which the data will be loaded. The column names on the right side of the <- are the column names from the Iceberg file which is to be loaded into SingleStore.
CREATE PIPELINE books_debug_pipe AS
LOAD DATA S3 ''
CONFIG '{"region":"us-west-2",
    "catalog_type": "GLUE",
    "catalog_name": "s3_catalog",
    "table_id": "dbtest.books"}'
CREDENTIALS '{"aws_access_key_id": "",
    "aws_secret_access_key": "",
    "aws_session_token": ""}'
INTO TABLE books_debug
(Id <- IdIce,
 Name <- Name,
 NumPages <- NumPages,
 Rating <- Rating,
 @ts <- PublishTimestamp,
 RawTimestamp <- PublishTimestamp)
FORMAT ICEBERG
SET PublishTimestamp = DATE_ADD('1970-01-01', INTERVAL @ts MICROSECOND);
Use TEST PIPELINE to compare the values in the two columns. The PublishTimestamp column shows the converted timestamp, and the RawTimestamp column contains the timestamp from the Iceberg file, in microseconds in this example. The raw value can be used to verify the conversion performed by the SET statement.
TEST PIPELINE books_debug_pipe;
+------+--------------------+----------+--------+------------------+----------------------------+
| Id   | Name               | NumPages | Rating | RawTimestamp     | PublishTimestamp           |
+------+--------------------+----------+--------+------------------+----------------------------+
|    1 | Happy Place        |      400 |    4.9 | 1680721200000000 | 2023-04-05 12:00:00.000000 |
|    2 | Legends & Lattes   |      304 |    4.9 | 1669665600000000 | 2022-11-28 12:00:00.000000 |
|    3 | The Vanishing Half |      352 |    4.9 | 1591124400000000 | 2020-06-02 12:00:00.000000 |
+------+--------------------+----------+--------+------------------+----------------------------+
Troubleshooting
The following table lists errors that can occur when creating an Iceberg Ingest pipeline.
| Catalog | Error | Cause and Resolution |
|---|---|---|
| All | protocol error … Process died unexpectedly or didn't start. | An incorrect value of an engine variable can cause this error. Verify the engine variable settings described in Configure Iceberg Ingest. |
| Snowflake | Certificate for <...>. | An incorrect URI may cause this error. Verify that the catalog.uri is correct. |
| Snowflake | SEVERE: WARNING!!! Using fail-open to connect. | This issue needs to be resolved on the Snowflake side. |
| Snowflake | Parquet parsing errors such as "Dictionary encoding not implemented". | Set the appropriate Snowflake table property. |
Last modified: February 6, 2026