Iceberg Ingest

Note

This is a Preview feature.

Apache Iceberg is an open-source table format that helps simplify analytical data processing for large datasets in data lakes. SingleStore can be used to add a speed layer to Iceberg tables. Iceberg tables can be directly ingested into SingleStore without the need for an external ETL tool and subsequently processed using SingleStore's high-performance database engine.

Remarks

The following are supported:

  • Iceberg Version 1 tables and Iceberg Version 2 tables with data files in Parquet format.

  • Iceberg tables stored in Amazon S3 with catalogs: GLUE, Snowflake, REST, and JDBC.

  • One-time ingest of Iceberg tables. When an Iceberg table is ingested, SingleStore will request Iceberg table metadata and load data from the latest snapshot available at that moment.

Syntax

CREATE [OR REPLACE] PIPELINE <pipeline_name> AS
LOAD DATA S3 '[<warehouse_path>]'
CONFIG '{"catalog_type": "GLUE|SNOWFLAKE|REST|JDBC",
"table_id": "<table_identifier>",
<configuration_json>
[, "catalog_name": "<your_catalog_name>" ]
[, "catalog.<property>" : "property_value" [, ...]]
}'
CREDENTIALS '<credentials_json>'
[REPLACE] INTO TABLE <table_name>
[ON DUPLICATE KEY UPDATE]
<iceberg_subvalue_mapping>
FORMAT ICEBERG;

<iceberg_subvalue_mapping>:
({<singlestore_col_name> | @<variable_name>} <- <iceberg_subvalue_path> [, ... ])

<iceberg_subvalue_path>: {ident [::ident ...]}

The warehouse_path specifies the path to the catalog warehouse. It is required for the JDBC catalog. The warehouse_path can be set to an empty string when not needed.

All the data shaping options for Parquet pipelines are supported for Iceberg pipelines. Refer to Data Shaping with Pipelines for more information.

Catalog Specification

  • The catalog_type and table_id are required for the catalog specification.

  • The table_id identifies the Iceberg table. The table identifier is catalog-specific, but is typically in the form: database_name.table_name.

  • The catalog_name is a name to associate with the catalog when reading table metadata and is used internally in SingleStore for logging and metrics purposes. The catalog_name is required for the JDBC catalog and is optional for other catalogs.

  • The catalog.property is a list of key-value pairs for configuring the catalog connection. The property and value are passed directly to the Iceberg SDK to establish the catalog connection.

S3 Specification

  • The configuration_json is a JSON string for S3 configuration parameters such as region, endpoint_url, and compatibility_mode. Refer to CREATE PIPELINE and CREATE LINK for more information.

  • The credentials_json is an AWS credential specification in JSON which includes fields such as aws_access_key_id and aws_secret_access_key. Refer to CREATE LINK for more information.

Subvalue Mappings

The iceberg_subvalue_mapping assigns fields from the Iceberg table to columns in the SingleStore table or to temporary variables. A ::-separated list of field names is used in iceberg_subvalue_path to look up fields in nested schemas. When the files in the Iceberg table are Parquet files, the ::-separated list of field names is used to look up fields in nested Parquet schemas. The following rules apply:

  • The last field in iceberg_subvalue_path must be a primitive type.

  • All iceberg_subvalue_path components containing whitespace or punctuation must be surrounded by backticks (`).

  • The iceberg_subvalue_path may not contain Parquet nested types (list or map types). Refer to Parquet - Nested Types for more information.
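The rules above can be sketched with a pipeline whose Iceberg field names contain whitespace or punctuation. This is a minimal illustration, not a tested configuration: the contacts table, the db.contacts table identifier, and the field names (full name, home address, postal-code) are hypothetical, and the destination table is assumed to exist already with matching columns.

```sql
-- Hypothetical Iceberg table db.contacts with the fields id, `full name`,
-- and a struct `home address` that contains street and `postal-code`.
-- Path components with whitespace or punctuation are wrapped in backticks,
-- and every path ends in a primitive field.
CREATE PIPELINE contacts_pipe AS
LOAD DATA S3 ''
CONFIG '{"region":"us-west-2",
"catalog_type": "GLUE",
"table_id": "db.contacts"
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE contacts
(Id <- id,
Name <- `full name`,
Street <- `home address`::street,
PostalCode <- `home address`::`postal-code`)
FORMAT ICEBERG;
```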

Enable and Configure Iceberg Ingest

The following variables need to be set for Iceberg ingest.

SET GLOBAL enable_iceberg_ingest = ON;
SET GLOBAL pipelines_extractor_get_offsets_timeout_ms = 90000;
SET GLOBAL java_pipelines_heap_size = 100;

The global engine variable enable_iceberg_ingest must be set to ON to use Iceberg ingest. This variable is set to OFF by default.

SingleStore recommends setting the global engine variable pipelines_extractor_get_offsets_timeout_ms to 90000 to prevent timeouts and to allow enough time to process the Iceberg catalog. The more files the Iceberg catalog contains, the larger this value needs to be. Increase the value if you see timeouts.

SingleStore recommends setting the global engine variable java_pipelines_heap_size to 100 to avoid out-of-memory errors. Increase the value if you experience out-of-memory errors.

Refer to Engine Variables for more information.
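As a sketch of how these settings might be checked and tuned, the statements below display the current values and then raise the timeout and heap size. The larger values (180000 and 200) are illustrative assumptions, not recommendations from SingleStore; choose values based on the errors you actually observe.

```sql
-- Confirm the current values of the variables used for Iceberg ingest.
SHOW GLOBAL VARIABLES LIKE 'enable_iceberg_ingest';
SHOW GLOBAL VARIABLES LIKE 'pipelines_extractor_get_offsets_timeout_ms';
SHOW GLOBAL VARIABLES LIKE 'java_pipelines_heap_size';

-- Illustrative values only: raise these after seeing timeouts
-- or out-of-memory errors with the recommended settings.
SET GLOBAL pipelines_extractor_get_offsets_timeout_ms = 180000;
SET GLOBAL java_pipelines_heap_size = 200;
```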

Ingest Data Type Mapping

The table below lists Iceberg Types, the SingleStore data type that can be used to store those types, and the recommended conversion to be applied with a SET clause.

| Iceberg Type | Recommended SingleStore Data Type | Recommended Conversion                                         |
|--------------|-----------------------------------|----------------------------------------------------------------|
| boolean      | TINYINT/BOOL/BOOLEAN              |                                                                |
| int          | INT                               |                                                                |
| long         | BIGINT                            |                                                                |
| float        | FLOAT                             |                                                                |
| double       | DOUBLE                            |                                                                |
| decimal(P,S) | DECIMAL                           |                                                                |
| date         | DATE                              | DATE_ADD('1970-01-01', INTERVAL @date DAY)                     |
| time         | TIME(6)                           | DATE_ADD('1970-01-01', INTERVAL @time_value MICROSECOND)       |
| timestamp    | DATETIME(6)                       | DATE_ADD('1970-01-01', INTERVAL @timestamp_value MICROSECOND)  |
| timestamptz  | DATETIME(6)                       | DATE_ADD('1970-01-01', INTERVAL @timestamp_value MICROSECOND)  |
| string       | LONGTEXT (utf8mb4_bin)            |                                                                |
| uuid         | BINARY(16)                        |                                                                |
| fixed (L)    | BINARY(L)                         |                                                                |
| binary       | LONGBLOB                          |                                                                |
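To see what the recommended conversions compute, the expressions can be tried on literal values outside of a pipeline. This is only a sketch: the sample numbers are arbitrary, and in a pipeline the literal would be replaced by the variable (such as @date or @timestamp_value) that the subvalue mapping assigns, with the expression placed in a SET clause.

```sql
-- date: the raw value is a count of days since 1970-01-01.
-- A sample value of 1 resolves to one day after the epoch.
SELECT DATE_ADD('1970-01-01', INTERVAL 1 DAY) AS date_value;

-- timestamp / timestamptz: the raw value is a count of microseconds
-- since 1970-01-01. 86400000000 microseconds is exactly one day.
SELECT DATE_ADD('1970-01-01', INTERVAL 86400000000 MICROSECOND) AS timestamp_value;
```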

Manual Upserts

The CREATE OR REPLACE command can be used to upsert data from an Iceberg table into a SingleStore table. That is, after an initial one-time ingest from a snapshot in an Iceberg table is complete, CREATE OR REPLACE can be used to consume data that has been added to the Iceberg table.

CREATE OR REPLACE PIPELINE can be used with REPLACE INTO TABLE or ON DUPLICATE KEY UPDATE to perform upserts. An upsert updates a row in the table if there is a primary key match or inserts the row if there is no primary key match. Upserts can be performed only if the table to be updated has a PRIMARY KEY or a UNIQUE index.

When a pipeline with pipeline_name exists, CREATE OR REPLACE functions similarly to CREATE PIPELINE, but preserves pipeline metadata, including loaded offsets and data files. When CREATE OR REPLACE is run on an existing pipeline, the Iceberg pipeline retrieves a new snapshot, a new schema, and new data files, and inserts data from the new data files into the destination table in SingleStore.

If the Iceberg table has been compacted or the Iceberg REPLACE operation was used, the pipeline might attempt to insert data that it has already loaded, so handling duplicates is important.

Refer to CREATE PIPELINE for syntax for CREATE OR REPLACE PIPELINE, REPLACE INTO TABLE, and ON DUPLICATE KEY UPDATE. Refer to Performing Upserts for more information on upserts.

Example - Glue Catalog on Amazon S3

An Iceberg table with data files in Parquet format that is stored in an AWS S3 bucket using AWS Glue can be loaded into a SingleStore table using a pipeline (CREATE PIPELINE). Refer to Apache Iceberg - Glue Catalog for information on using a GLUE catalog with Iceberg.

In this example, a table named books is created and data from an Iceberg table that matches this schema is loaded into the books table.

Create the table.

CREATE TABLE books(
Id INT,
Name TEXT,
NumPages INT,
Rating DOUBLE,
PRIMARY KEY(Id));

The following data is used for this example.

(1, 'Happy Place', 400, 4.9)
(2, 'Legends & Lattes', 304, 4.9)
(3, 'The Vanishing Half', 352, 4.9)
(4, 'The Wind Knows My Name', 304, 4.9)

The CREATE PIPELINE statement below loads data from an Iceberg table containing the data above into the books table. The names on the left side of the <- are the columns of the SingleStore table into which the data is loaded. The names on the right side of the <- are the columns of the Iceberg table from which the data is read.

CREATE PIPELINE books_pipe AS
LOAD DATA S3 ''
CONFIG '{"region":"us-west-2",
"catalog_type": "GLUE",
"table_id": "db.books"
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- id,
Name <- name,
NumPages <- numPages,
Rating <- rating)
FORMAT ICEBERG;

Test the pipeline.

TEST PIPELINE books_pipe;
+------+------------------------+----------+--------+
| Id   | Name                   | NumPages | Rating |
+------+------------------------+----------+--------+
|    4 | The Wind Knows My Name |      304 |    4.9 |
|    1 | Happy Place            |      400 |    4.9 |
|    2 | Legends & Lattes       |      304 |    4.9 |
|    3 | The Vanishing Half     |      352 |    4.9 |
+------+------------------------+----------+--------+

Refer to START PIPELINE for more information on starting pipelines.
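Once TEST PIPELINE returns the expected rows, the load itself might look like the sketch below. Because Iceberg ingest is a one-time load of the latest snapshot, running the pipeline in the foreground is one reasonable choice; the FOREGROUND option and the verification query are illustrative additions rather than part of the original example.

```sql
-- Run the pipeline in the foreground so the statement returns
-- once the load has finished.
START PIPELINE books_pipe FOREGROUND;

-- Check the rows that were loaded into the destination table.
SELECT * FROM books ORDER BY Id;
```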

Example - Use Subvalue Mappings

This example shows the use of subvalue mappings to load nested elements from an Iceberg schema into a SingleStore table.

Create a table.

CREATE TABLE addresses(
Id INT,
Name TEXT,
Street TEXT,
City TEXT,
Country TEXT,
PRIMARY KEY(Id));

The following data is used for this example.

(1, 'Mylo', struct('123 Main St', 'New York', 'USA'))
(2, 'Naya', struct('456 Elm St', 'San Francisco', 'USA'))

The CREATE PIPELINE statement below loads data from an Iceberg table containing the data above into the addresses table. The names on the left side of the <- are the columns of the SingleStore table into which the data is loaded. The names on the right side of the <- are the fields of the Iceberg table from which the data is read; nested fields are addressed with ::-separated paths such as address::street.

CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 ''
CONFIG '{"region":"us-west-2",
"catalog_type": "GLUE",
"catalog_name": "s3_catalog",
"table_id": "db2.addresses"
}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Street <- address::street,
City <- address::city,
Country <- address::country)
FORMAT ICEBERG;

The CREDENTIALS may also contain an aws_session_token; refer to CREATE LINK for more information.

Test the pipeline.

TEST PIPELINE addresses_pipe;
+------+------+-------------+---------------+---------+
| Id   | Name | Street      | City          | Country |
+------+------+-------------+---------------+---------+
|    1 | Mylo | 123 Main St | New York      | USA     |
|    2 | Naya | 456 Elm St  | San Francisco | USA     |
+------+------+-------------+---------------+---------+

Refer to START PIPELINE for more information on starting pipelines.

Example - Snowflake Catalog on Amazon S3

Ingest an Iceberg table stored in Amazon S3 with a Snowflake catalog.

Iceberg tables to be ingested in SingleStore must be created on an external volume. Refer to Tutorial: Create your first Iceberg table, Create an external volume, and Snowflake Iceberg Catalog SDK for more information.

CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 ''
CONFIG '{"region" : "us-west-2",
"catalog_type": "SNOWFLAKE",
"table_id": "db_name.schema_name.table_name",
"catalog.uri": "jdbc:snowflake://<account_identifier>.snowflakecomputing.com",
"catalog.jdbc.user":"<user_name>",
"catalog.jdbc.password":"<password>",
"catalog.jdbc.role":"<user_role>"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Street <- address::street,
City <- address::city,
Country <- address::country)
FORMAT ICEBERG;

For the Snowflake catalog, the table_id must consist of three parts: the database name, the schema name, and the table name. In addition, the catalog.uri, catalog.jdbc.user, catalog.jdbc.password, and catalog.jdbc.role are required.

Example - REST Catalog on Amazon S3

Ingest an Iceberg table stored in Amazon S3 with a REST catalog. Refer to Decoupling using the REST Catalog for information on using a REST catalog with Iceberg.

CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 ''
CONFIG '{"region" : "us-west-2",
"catalog_type": "REST",
"table_id": "db_name.table_name",
"catalog.uri": "http://host.address:8181"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Street <- address::street,
City <- address::city,
Country <- address::country)
FORMAT ICEBERG;

Example - JDBC Catalog on Amazon S3

Ingest an Iceberg table stored in Amazon S3 with a JDBC catalog. Refer to Iceberg JDBC Integration for more information on using Iceberg with a JDBC catalog.

SingleStore supports Postgres, MySQL, and SQLite JDBC drivers by default. Additional drivers can be added using java_pipelines_class_path.

The following example uses JDBC with SQLite.

CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 's3://path_to_warehouse'
CONFIG '{"region" : "us-west-2",
"catalog_type": "JDBC",
"catalog_name": "catalog_name",
"table_id": "db_name.table_name",
"catalog.uri":"jdbc:sqlite:file:/path_jdbc"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Street <- address::street,
City <- address::city,
Country <- address::country)
FORMAT ICEBERG;

The following example uses JDBC with MySQL.

CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 's3://path_to_warehouse'
CONFIG '{"region" : "us-west-2",
"catalog_type": "JDBC",
"catalog_name": "catalog_name",
"table_id": "db_name.table_name",
"catalog.uri": "jdbc:mysql://host.address:3306/default",
"catalog.jdbc.user": "<user_name>",
"catalog.jdbc.password": "<password>"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Country <- address::country)
FORMAT ICEBERG;

The path_to_warehouse and catalog_name are required for JDBC catalogs.

Example - Hadoop Catalog on FS

Ingest an Iceberg table stored in the local file system using a Hadoop catalog. When loading an Iceberg table from a local file system, the file system must be accessible from all SingleStore nodes.

CREATE PIPELINE addresses_pipe AS
LOAD DATA FS '/tmp/warehouse_path'
CONFIG '{"catalog_type": "HADOOP",
"table_id": "db_name.table_name"}'
INTO TABLE addresses
(Id <- id,
Name <- name,
Street <- address::street,
City <- address::city,
Country <- address::country)
FORMAT ICEBERG;

Example - Manual Upserts using REPLACE INTO

The SQL below shows how to use CREATE OR REPLACE and REPLACE INTO to do manual upserts with the Snowflake catalog. This example uses the books table which has a primary key on Id.

Create a pipeline to insert from the Iceberg table into the books table in SingleStore. Running this pipeline the first time will do the initial ingest of data from the Iceberg table into the SingleStore table. Re-run this pipeline to read the latest snapshot of the Iceberg table and upsert any new data that has been inserted into the Iceberg table into the SingleStore table.

CREATE OR REPLACE PIPELINE books_upserts_pipe AS
LOAD DATA S3 ''
CONFIG '{"region" : "us-west-2",
"catalog_type": "SNOWFLAKE",
"table_id": "db_name.schema_name.table_name",
"catalog.uri": "jdbc:snowflake://<account_identifier>.snowflakecomputing.com",
"catalog.jdbc.user":"<user_name>",
"catalog.jdbc.password":"<password>",
"catalog.jdbc.role":"<user_role>"}'
CREDENTIALS '{"aws_access_key_id" : "<your_access_key_id>",
"aws_secret_access_key": "<your_secret_access_key>"}'
REPLACE INTO TABLE books
(Id <- id,
Name <- name,
NumPages <- numPages,
Rating <- rating)
FORMAT ICEBERG;

This command uses REPLACE INTO to indicate that if there are duplicates of the primary key in the Iceberg table, the last-processed value should be inserted into the SingleStore table. Refer to CREATE PIPELINE and LOAD DATA for more information.

Last modified: November 19, 2024
