Iceberg Ingest
Note
This is a Preview feature.
Apache Iceberg is an open-source table format that helps simplify analytical data processing for large datasets in data lakes.
Remarks
The following are supported:
- Iceberg Version 1 and Version 2 tables with data files in Parquet format.
- Iceberg tables stored in Amazon S3 with the following catalogs: GLUE, Snowflake, REST, JDBC, Hive, and Polaris.
- Initial load of Iceberg tables and continuous ingest of updates to Iceberg tables.
Syntax
CREATE [OR REPLACE] PIPELINE <pipeline_name> AS
LOAD DATA S3 '[<table_identifier>]'
CONFIG '{"catalog_type": "GLUE|SNOWFLAKE|REST|JDBC|HIVE",
    <configuration_json>
    [, "ingest_mode": "append|upsert|one_time"]
    [, "catalog_name": "<your_catalog_name>"]
    [, "catalog.<property>": "property_value" [, ...]]
    [, "schema.change.stop": true|false]}'
CREDENTIALS '<credentials_json>'
[REPLACE | MERGE] INTO TABLE <table_name>
[ON DUPLICATE KEY UPDATE]
<iceberg_subvalue_mapping>
FORMAT ICEBERG;

<iceberg_subvalue_mapping>:
    ({<singlestore_col_name> | @<variable_name>} <- <iceberg_subvalue_path> [, ... ])

<iceberg_subvalue_path>: {ident [::ident ...]}
All the data shaping options for Parquet pipelines are supported for Iceberg pipelines.
Schema inference for Iceberg pipelines is supported.
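Because the CONFIG clause is a JSON document embedded in a SQL string literal, a missing comma or a curly quote makes the whole statement fail. The Python sketch below shows one way to generate the clause programmatically so the JSON is always well-formed. The helper name build_iceberg_config is hypothetical, and the escaping step assumes MySQL-style string literals (backslash is an escape character and '' stands for one single quote).

```python
import json


def build_iceberg_config(catalog_type, s3_options=None, ingest_mode=None,
                         catalog_name=None, catalog_properties=None,
                         schema_change_stop=None):
    """Render a CONFIG '...' clause for an Iceberg pipeline (hypothetical helper)."""
    config = {"catalog_type": catalog_type}
    # S3 settings such as region or endpoint_url sit at the top level.
    config.update(s3_options or {})
    if ingest_mode is not None:
        if ingest_mode not in ("append", "upsert", "one_time"):
            raise ValueError(f"unsupported ingest_mode: {ingest_mode!r}")
        config["ingest_mode"] = ingest_mode
    if catalog_name is not None:
        config["catalog_name"] = catalog_name
    # Catalog properties get the "catalog." prefix and are passed to the Iceberg SDK.
    for key, value in (catalog_properties or {}).items():
        config["catalog." + key] = value
    if schema_change_stop is not None:
        config["schema.change.stop"] = bool(schema_change_stop)
    # json.dumps always emits straight quotes and separating commas.
    text = json.dumps(config)
    # Assumed MySQL-style literal rules: double backslashes and single quotes.
    text = text.replace("\\", "\\\\").replace("'", "''")
    return "CONFIG '" + text + "'"


print(build_iceberg_config("GLUE", {"region": "us-west-2"},
                           ingest_mode="upsert", catalog_name="s3_catalog"))
# CONFIG '{"catalog_type": "GLUE", "region": "us-west-2", "ingest_mode": "upsert", "catalog_name": "s3_catalog"}'
```

The output can be pasted between the LOAD DATA and CREDENTIALS lines of a CREATE PIPELINE statement.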
Table Identifier
- The <table_identifier> identifies the Iceberg table. The <table_identifier> is catalog-specific but is typically in the form database_name.table_name.
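As a rough illustration of the identifier format, the Python sketch below splits a table identifier into its namespace parts and table name. The helper split_table_identifier is hypothetical, it assumes no name contains a literal dot, and the three-part check reflects the Snowflake catalog requirement (database_name.schema_name.table_name) described in the Snowflake example on this page.

```python
def split_table_identifier(identifier, catalog_type="GLUE"):
    """Split a table identifier into (namespace_parts, table_name).

    Hypothetical helper; assumes no component contains a literal '.'.
    """
    parts = identifier.split(".")
    if len(parts) < 2 or any(not p for p in parts):
        raise ValueError(f"expected <namespace>.<table>, got {identifier!r}")
    # The Snowflake catalog needs database_name.schema_name.table_name.
    if catalog_type.upper() == "SNOWFLAKE" and len(parts) != 3:
        raise ValueError("Snowflake identifiers need exactly three parts")
    return parts[:-1], parts[-1]


print(split_table_identifier("db.books"))  # (['db'], 'books')
print(split_table_identifier("db_name.schema_name.table_name", "SNOWFLAKE"))
# (['db_name', 'schema_name'], 'table_name')
```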
Catalog Specification
- The catalog_type is required for the catalog specification.
- The catalog_name is a name to associate with the catalog when reading table metadata and is used internally in SingleStore for logging and metrics purposes. The catalog_name is required for the JDBC catalog and is optional for other catalogs.
- The catalog.<property> entries are key-value pairs for configuring the catalog connection. Each property and value is passed directly to the Iceberg SDK to establish the catalog connection.
S3 Specification
- The configuration_json is a JSON string for S3 configuration parameters such as region, endpoint_url, and compatibility_mode. Refer to CREATE PIPELINE and CREATE LINK for more information.
Ingest Mode
SingleStore supports two mechanisms for managing updates to Iceberg tables: ingest_mode and MERGE (merge pipelines). MERGE and ingest_mode have different uses and cannot both be used in the same pipeline definition.
- Three ingest modes are supported: one_time for one-time loads, and two modes for continuous ingest: append for append-only workloads and upsert for upsert workloads. The default ingest_mode is append.
- With continuous ingest ("ingest_mode":"append" or "ingest_mode":"upsert"), a running pipeline automatically detects updates to the Iceberg table and ingests them into the SingleStore table. Refer to Continuous Ingest for details on append and upsert modes.
- In one-time ingest ("ingest_mode":"one_time"), SingleStore requests Iceberg table metadata and loads data from the latest snapshot available at that moment. Subsequent updates to the Iceberg table are not automatically ingested.
- The ingest mode is specified by setting ingest_mode in the CONFIG section of the pipeline definition, as shown in the syntax above.
Refer to Continuous Ingest for details on ingest_mode and a comparison of MERGE and ingest_mode.
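The rules for choosing a mode can be summarized as a small decision function. The Python sketch below is a hypothetical pre-flight check, not part of SingleStore: it applies the default (append), rejects combining MERGE with ingest_mode, and enforces the requirement, described under Continuous Ingest - Upsert, that upsert pipelines be created with REPLACE.

```python
VALID_INGEST_MODES = {"append", "upsert", "one_time"}


def resolve_ingest_mode(config, load_clause="INTO"):
    """Return the effective mode for a pipeline (hypothetical helper).

    config: the CONFIG JSON as a dict.
    load_clause: "INTO", "REPLACE INTO", or "MERGE INTO".
    Returns "merge" for merge pipelines.
    """
    clause = load_clause.strip().upper()
    if clause.startswith("MERGE"):
        # MERGE and ingest_mode cannot both appear in one pipeline definition.
        if "ingest_mode" in config:
            raise ValueError("MERGE and ingest_mode cannot be combined")
        return "merge"
    mode = config.get("ingest_mode", "append")  # append is the default
    if mode not in VALID_INGEST_MODES:
        raise ValueError(f"unknown ingest_mode {mode!r}")
    # Upsert pipelines must be created using REPLACE.
    if mode == "upsert" and not clause.startswith("REPLACE"):
        raise ValueError("ingest_mode upsert requires REPLACE INTO")
    return mode


print(resolve_ingest_mode({"ingest_mode": "upsert"}, "REPLACE INTO"))  # upsert
```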
Schema Change Stop
- The Iceberg specification allows schema evolution; columns may be renamed, added, dropped, or type-promoted.
- To detect schema changes to Iceberg tables that are being ingested with pipelines, specify "schema.change.stop":true in the CONFIG section of the pipeline definition.
- With this setting, when the engine detects a schema change, it pauses the ingestion and throws an error.
- The error message can be found in the ERROR_MESSAGE column of the information_schema.PIPELINES_ERRORS view.
Refer to Schema Change Detection for details on schema change detection and resuming ingestion after a schema change has been detected.
MERGE
The MERGE INTO TABLE clause creates a pipeline, called a merge pipeline, that continuously ingests data from an Iceberg table into a SingleStore table.
Merge pipelines handle the following types of Iceberg V2 snapshots:
- Append
- Overwrite
- Replace
- Delete (positional and equality)
In addition to merge pipelines, SingleStore supports ingest_mode for continuous ingest. MERGE and ingest_mode cannot be used in the same pipeline definition.
Refer to Continuous Ingest for details on merge pipelines and a comparison of MERGE and ingest_mode.
Credentials
- The credentials_json specifies S3 credentials in JSON format. For information about the supported credential options, refer to CREATE PIPELINE.
- Refer to the Minimum Required S3 Pipeline Syntax and AWS Elastic Kubernetes Service (EKS) IAM Roles for Service Accounts (IRSA) Authentication examples in CREATE PIPELINE.
Subvalue Mappings
The iceberg_subvalue_mapping assigns fields from the Iceberg table to columns in the SingleStore table or to temporary variables. A ::-separated list of field names is used in iceberg_subvalue_path to look up fields in nested Parquet schemas.
- The last field in iceberg_subvalue_path must be a primitive type.
- All iceberg_subvalue_path components containing whitespace or punctuation must be surrounded by backticks (`).
- The iceberg_subvalue_path may not contain Parquet nested types (list or map types). Refer to Parquet - Nested Types for more information.
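The path rules above can be made concrete with a small parser. The Python sketch below is a hypothetical model, not SingleStore's parser: it splits a path on ::, keeps :: inside backtick-quoted components, rejects unquoted components that contain whitespace or punctuation, and resolves the path against a record whose structs are modeled as Python dicts. It does not handle escaped backticks inside a quoted name.

```python
import re


def parse_subvalue_path(path):
    """Split an iceberg_subvalue_path such as address::`zip code` into names."""
    components = []                 # (name, was_quoted) pairs
    current, in_quotes, was_quoted = [], False, False
    i = 0
    while i < len(path):
        if path[i] == "`":
            in_quotes, was_quoted = not in_quotes, True
            i += 1
        elif not in_quotes and path.startswith("::", i):
            components.append(("".join(current), was_quoted))
            current, was_quoted = [], False
            i += 2
        else:
            current.append(path[i])
            i += 1
    if in_quotes:
        raise ValueError("unterminated backtick")
    components.append(("".join(current), was_quoted))

    names = []
    for name, quoted in components:
        if not name:
            raise ValueError("empty path component")
        # Whitespace or punctuation is only allowed inside backticks.
        if not quoted and not re.fullmatch(r"\w+", name):
            raise ValueError(f"{name!r} must be surrounded by backticks")
        names.append(name)
    return names


def lookup(record, path):
    """Resolve a path against a record whose structs are modeled as dicts."""
    value = record
    for name in parse_subvalue_path(path):
        if not isinstance(value, dict):
            raise TypeError(f"cannot descend into {type(value).__name__}")
        value = value[name]
    # The last field must be a primitive type, not a struct or list.
    if isinstance(value, (dict, list)):
        raise TypeError("the last field in the path must be a primitive type")
    return value


row = {"id": 1, "address": {"street": "123 Main St", "city": "New York"}}
print(lookup(row, "address::city"))  # New York
```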
Enable and Configure Iceberg Ingest
The global engine variable enable_iceberg_ingest must be set to ON to use Iceberg ingest. This variable is set to OFF by default.
SET GLOBAL enable_iceberg_ingest = ON;
In addition, to use Iceberg ingest, the global engine variable java_pipelines_java11_path must be set on all nodes to the path of the JRE 11+ java binary.
SET GLOBAL java_pipelines_java11_path = '<path to JRE 11+ java binary>';
Pipeline parameters, including memory usage and timeouts, can be controlled using global engine variables with the pipelines_ prefix, as specified in List of Engine Variables.
Memory usage can also be controlled using the java_ and pipelines_ engine variables.
Pipeline timeouts can be controlled using pipelines_ and pipelines_.
Refer to List of Engine Variables for more information.
Continuous Ingest
With continuous ingestion, a running pipeline automatically detects updates to an Iceberg table and ingests them into the SingleStore table.
Continuous Ingest - Append-Only
Append-only mode is used for scenarios in which new rows are added to the Iceberg table, but no rows are deleted or modified.
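In append-only mode ("ingest_mode":"append"), the pipeline processes APPEND Iceberg snapshots. If a DELETE or OVERWRITE snapshot is encountered, an error is raised. Users may override these errors by specifying "ignore_unsupported_modifications":true in the pipeline configuration. SingleStore does not recommend setting "ignore_unsupported_modifications":true, as doing so may lead to data inconsistency.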
Continuous Ingest - Upsert
Upsert mode is used for scenarios where updates modify non-key columns based on one or more specified key columns.
In upsert mode ("ingest_mode":"upsert"), the pipeline processes APPEND and OVERWRITE snapshots of the Iceberg table as upserts to the SingleStore table.
Requirements:
- The SingleStore table must have a primary key or a unique index. In the <iceberg_subvalue_mapping>, one or more columns in the Iceberg table must be mapped to the SingleStore columns on which the primary key or unique index is defined. Typically, these columns are also declared as a key in the Iceberg table.
- The pipeline must be created using REPLACE. Refer to Additional CREATE PIPELINE Examples for more information.
Limitations:
- Pipelines fail on delete snapshots. Users may override these errors by specifying "ignore_unsupported_modifications":true in the pipeline configuration. SingleStore does not recommend setting "ignore_unsupported_modifications":true, as doing so may lead to data inconsistency.
- These limitations are expected to be addressed in future releases.
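The snapshot rules for the append and upsert modes can be modeled in a few lines of Python to build intuition. This is a toy sketch only: apply_snapshot, UnsupportedSnapshot, and the dictionary shape of a snapshot are hypothetical, and real Iceberg snapshots reference data files rather than carrying rows. It shows why append mode only ever adds rows, why upsert mode behaves like REPLACE on the key, and why skipping unsupported snapshots can leave the destination out of sync.

```python
class UnsupportedSnapshot(Exception):
    """Raised for snapshot types the chosen ingest_mode cannot process."""


SUPPORTED = {"append": {"append"}, "upsert": {"append", "overwrite"}}


def apply_snapshot(table, snapshot, mode, key="Id", ignore_unsupported=False):
    """Toy model of one snapshot applied by an append or upsert pipeline.

    table: a list of rows for append mode, a dict keyed by `key` for upsert.
    snapshot: {"operation": "append" | "overwrite" | "delete", "rows": [...]},
    where "rows" are the rows the snapshot adds.
    """
    op = snapshot["operation"]
    if op not in SUPPORTED[mode]:
        if ignore_unsupported:
            # Skipping the snapshot leaves the destination out of sync.
            return table
        raise UnsupportedSnapshot(f"{mode} mode cannot process {op} snapshots")
    for row in snapshot["rows"]:
        if mode == "append":
            table.append(row)        # rows are only ever added
        else:
            table[row[key]] = row    # REPLACE semantics: latest row per key wins
    return table


books = {}
apply_snapshot(books, {"operation": "append", "rows": [{"Id": 1, "Rating": 4.5}]}, "upsert")
apply_snapshot(books, {"operation": "overwrite", "rows": [{"Id": 1, "Rating": 4.9}]}, "upsert")
print(books)  # {1: {'Id': 1, 'Rating': 4.9}}
```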
Continuous Ingest - MERGE (Merge Pipelines)
The MERGE INTO TABLE clause creates a merge pipeline that continuously ingests data from an Iceberg table into a SingleStore table.
For a merge pipeline, the SingleStore (destination) table:
- Must have the following three columns:
  - `$_file` VARCHAR(2048) COMMENT 'ICEBERG_FILE_PATH'
  - `$_row` BIGINT COMMENT 'ICEBERG_FILE_POS'
  - `$_delete` JSON DEFAULT '{}' COMMENT 'ICEBERG_DELETED_BY'
- You may use different names for the `$_file`, `$_row`, and `$_delete` columns as long as the columns are marked with the COMMENT clauses shown above.
- Must have a shard key defined as:
  - SHARD KEY(`$_file`, `$_row`)
- Must not be updated or modified. If the destination table is modified, updates may be applied incorrectly.
The pipeline declaration must set the `$_file` and `$_row` columns as follows:
SET `$_file` = pipeline_source_file(), `$_row` = pipeline_source_file_row()
To ingest deletes, a merge pipeline must be used with a VIEW over the destination table. The view must include the following WHERE clause:
WHERE JSON_LENGTH(`$_delete`) = 0
When a view is used, merge pipelines perform merge-on-read on demand during table scans.
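The merge-on-read idea can be sketched with a toy Python model of the destination table and its view. This is an illustration under stated assumptions, not SingleStore's implementation: the class MergeDestination is hypothetical, rows are keyed by ($_file, $_row) with 0-based positions assumed for pipeline_source_file_row(), and the contents stored in $_delete are invented; only whether the map is empty matters to the view.

```python
class MergeDestination:
    """Toy model of a merge pipeline destination table plus its view.

    Rows are keyed by ($_file, $_row); a delete is recorded in the row's
    $_delete map instead of removing the row.
    """

    def __init__(self):
        self.rows = {}

    def load_data_file(self, file_path, records):
        # Mirrors `$_file` = pipeline_source_file(), `$_row` = pipeline_source_file_row().
        for position, record in enumerate(records):
            self.rows[(file_path, position)] = {"data": record, "$_delete": {}}

    def positional_delete(self, delete_file, file_path, position):
        # A positional delete names a data file and a row position in it.
        self.rows[(file_path, position)]["$_delete"][delete_file] = position

    def view(self):
        # Same effect as WHERE JSON_LENGTH(`$_delete`) = 0, applied at read time.
        return [row["data"] for _, row in sorted(self.rows.items())
                if len(row["$_delete"]) == 0]


t = MergeDestination()
t.load_data_file("s3://bucket/db/books/data/f1.parquet", [{"Id": 1}, {"Id": 2}])
t.positional_delete("s3://bucket/db/books/data/del1.parquet",
                    "s3://bucket/db/books/data/f1.parquet", 1)
print(t.view())  # [{'Id': 1}]
```

The deleted row stays in the table; it simply stops appearing through the view, which is the read-time cost that the comparison below refers to.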
Example - Continuous Ingest with a Merge Pipeline provides a merge pipeline example.
Compare Types of Continuous Ingest
Merge pipelines (MERGE) and the append-only and upsert modes (ingest_mode) have different characteristics; SingleStore recommends selecting the variant that works best for your workload.
- MERGE requires adding columns ($_file, $_row, $_delete) to the destination (SingleStore) table.
- When using MERGE, users may not directly update the destination (SingleStore) table. If the destination table is updated, updates made to the source Iceberg table may not propagate properly.
- MERGE implements merge-on-read.
- ingest_mode updates the SingleStore table directly and does not have a read penalty.
- MERGE can process Append, Overwrite, Replace, and Delete (positional and equality) Iceberg snapshots; ingest_mode has more limitations on the types of Iceberg snapshots processed.
Data File Filtering
When ingesting from Iceberg tables, SingleStore applies file-level filtering to reduce the number of data files downloaded.
File-level filtering is determined by the WHERE clause in the pipeline definition.
- Only a subset of expressions can be used for file filtering.
- Expressions that are not eligible for file-level filtering are applied later as row-level filtering after the file is downloaded.
To enable file filtering before download, write the WHERE clause using the following rules:
- Use any combination of the boolean operators AND and OR.
- Use the binary operators =, <, <=, >, and >= with a column name on one side and a literal value on the other. For example, WHERE column_name_1 < 9999.
- Function calls are not supported in file filtering.
Pipelines always evaluate the full WHERE clause during ingestion, after file-level filtering.
- Row-level filtering may include more complex expressions, such as function calls, that are not eligible for file-level filtering.
- When the WHERE clause uses an AND operator, unsupported expressions can be discarded for file-level filtering while still being applied during row-level filtering.
For example, for WHERE col1 > 1000 AND col2 = my_udf(col3):
- Per-file filtering: col1 > 1000
- Per-row filtering: col1 > 1000 AND col2 = my_udf(col3)
If none of the expressions in the WHERE clause can be applied at the file level, the CREATE PIPELINE statement issues the following warning:
      Pipeline's where clause is not suitable for file-level filtering
    
File-level filtering relies on column-level metadata that Iceberg generates for each data file.
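Iceberg records per-column lower and upper bounds for each data file, and bounds like these are what make file pruning possible. The Python sketch below models the rules above under simplifying assumptions: the tuple-based expression format and the helper names are hypothetical, the column is assumed to be on the left of every comparison, and this is not SingleStore's implementation. It extracts the prunable part of a WHERE clause (dropping unusable AND branches, giving up on an OR with an unusable side), prunes files by their bounds, and then evaluates the full clause per row.

```python
import operator

# Hypothetical expression format:
#   ("cmp", column, op, literal)     column on the left, literal on the right
#   ("and", left, right) / ("or", left, right)
#   ("call", fn)                     stands in for a function call such as a UDF
OPS = {"=": operator.eq, "<": operator.lt, "<=": operator.le,
       ">": operator.gt, ">=": operator.ge}


def file_level_part(expr):
    """Return the part of a WHERE expression usable for file pruning, or None."""
    kind = expr[0]
    if kind == "cmp":
        return expr
    if kind in ("and", "or"):
        left, right = file_level_part(expr[1]), file_level_part(expr[2])
        if kind == "and":
            # An unusable side of AND can be dropped; rows are re-checked later.
            if left is None or right is None:
                return left if right is None else right
            return ("and", left, right)
        # An OR can only prune files if both sides are usable.
        if left is None or right is None:
            return None
        return ("or", left, right)
    return None  # function calls are not eligible


def file_may_match(expr, bounds):
    """Can a file whose column values lie in bounds[col] = (low, high) match?"""
    if expr is None:
        return True
    kind = expr[0]
    if kind == "and":
        return file_may_match(expr[1], bounds) and file_may_match(expr[2], bounds)
    if kind == "or":
        return file_may_match(expr[1], bounds) or file_may_match(expr[2], bounds)
    _, col, op, value = expr
    if col not in bounds:
        return True  # no statistics for this column: the file must be read
    low, high = bounds[col]
    if op == "=":
        return low <= value <= high
    if op in ("<", "<="):
        return OPS[op](low, value)   # the file's smallest value is the best candidate
    return OPS[op](high, value)      # for > and >=, its largest value is


def row_matches(expr, row):
    """Evaluate the full WHERE clause on one downloaded row."""
    kind = expr[0]
    if kind == "and":
        return row_matches(expr[1], row) and row_matches(expr[2], row)
    if kind == "or":
        return row_matches(expr[1], row) or row_matches(expr[2], row)
    if kind == "call":
        return expr[1](row)
    _, col, op, value = expr
    return OPS[op](row[col], value)


def my_udf(x):  # hypothetical stand-in for a user-defined function
    return x * 2


where = ("and", ("cmp", "col1", ">", 1000),
         ("call", lambda r: r["col2"] == my_udf(r["col3"])))
per_file = file_level_part(where)  # ("cmp", "col1", ">", 1000)
files = {"f1.parquet": {"col1": (1, 500)}, "f2.parquet": {"col1": (800, 5000)}}
to_download = [f for f, b in files.items() if file_may_match(per_file, b)]
print(to_download)  # ['f2.parquet']
```

When file_level_part returns None for the whole clause, no file can be pruned; that is the situation in which CREATE PIPELINE issues the warning shown above.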
Monitor Iceberg Table Ingestion
The per-file status of the Iceberg table ingestion can be queried through the PIPELINES_
Enabling OFFSETS_ on the Iceberg pipeline reduces the number of entries by tracking only the files referenced by the pipeline's current Iceberg table snapshot. OFFSETS_ can be set when you create the pipeline or later with ALTER PIPELINE.
Schema Change Detection
If "schema.change.stop":true is specified in the pipeline definition, when the engine detects a schema change, it pauses the ingestion and throws an error. The error message can be found in the ERROR_MESSAGE column of the information_schema.PIPELINES_ERRORS view.
Consider an Iceberg table stored outside SingleStore that is to be ingested into SingleStore with a pipeline.
Let the source Iceberg table have the following schema:
(id int,bigid long,fl float,doub double,occurrence_da date,data string,st struct<a int, b int, c struct<d string, e int>>)
Assume a pipeline has been created to ingest the source Iceberg table into a destination SingleStore table.
Assume the following ALTER TABLE statements are run on the source Iceberg table:
ALTER TABLE <table_name> ALTER COLUMN id TYPE bigint;
ALTER TABLE <table_name> RENAME COLUMN id TO identity;
ALTER TABLE <table_name> ADD COLUMN name string;
ALTER TABLE <table_name> DROP COLUMN occurrence_da;
If "schema.change.stop":true is set, when the engine detects these changes to the source Iceberg table, the pipeline is stopped and the following error message is generated:
Iceberg Table Schema at Source has changed. Schema Diff :{"schema_id": {"before": 0,"after": 8},"diff": [{"op": "column_type_change","column": "id","before": "int","after": "long","column_id": 1},{"op": "column_rename","before": "id","after": "identity","column_id": 1},{"op": "column_add","before": null,"after": "name","column_id": 13},{"op": "column_delete","before": "occurrence_da","after": null,"column_id": 5}]}
In this error message, the Iceberg schema version is included in schema_id, and the diff includes one entry for each modification to the table schema.
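Because the diff is embedded as JSON after the "Schema Diff :" marker, it can be extracted from the ERROR_MESSAGE value and summarized before the SingleStore table is altered. The Python sketch below assumes the JSON payload runs to the end of the message, as in the example above; the helper names parse_schema_diff and summarize are hypothetical.

```python
import json


def parse_schema_diff(error_message):
    """Extract the JSON payload that follows 'Schema Diff :' in the error."""
    marker = "Schema Diff :"
    start = error_message.index(marker) + len(marker)
    return json.loads(error_message[start:])


def summarize(diff):
    """One readable line per change; '-' marks a missing before/after value."""
    def show(value):
        return "-" if value is None else value

    lines = [f"schema_id {diff['schema_id']['before']} -> {diff['schema_id']['after']}"]
    for change in diff["diff"]:
        lines.append(f"{change['op']}: {show(change['before'])} -> "
                     f"{show(change['after'])} (column_id {change['column_id']})")
    return lines


msg = ('Iceberg Table Schema at Source has changed. Schema Diff :'
       '{"schema_id": {"before": 0,"after": 8},"diff": ['
       '{"op": "column_type_change","column": "id","before": "int","after": "long","column_id": 1},'
       '{"op": "column_rename","before": "id","after": "identity","column_id": 1},'
       '{"op": "column_add","before": null,"after": "name","column_id": 13},'
       '{"op": "column_delete","before": "occurrence_da","after": null,"column_id": 5}]}')

for line in summarize(parse_schema_diff(msg)):
    print(line)
# schema_id 0 -> 8
# column_type_change: int -> long (column_id 1)
# column_rename: id -> identity (column_id 1)
# column_add: - -> name (column_id 13)
# column_delete: occurrence_da -> - (column_id 5)
```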
Resume Ingestion
Follow these steps to resume ingestion after a pipeline has been paused due to schema change.
- Run SHOW CREATE PIPELINE <pipeline_name> EXTENDED and capture the results of this command.
  - The result of this query contains ALTER PIPELINE statements that move the "file offsets" back to the position before the pipeline was paused.
- Drop the pipeline.
- Apply the schema changes manually to the SingleStore table.
- Recreate the pipeline.
- Run the ALTER PIPELINE statements captured in the first step.
- Start the new pipeline.
How to Convert Data Types in Iceberg Pipelines
The table below lists Iceberg Types, the SingleStore data type that can be used to store those types, and the recommended conversion to be applied with a SET clause.
| Iceberg Type | Recommended SingleStore Data Type | Recommended Conversion |
|---|---|---|
| boolean | | |
| int | | |
| long | | |
| float | | |
| double | | |
| decimal(P,S) | | |
| date | | DATE_ |
| time | | DATE_ |
| timestamp | | DATE_ |
| timestamptz | | DATE_ |
| string | | |
| uuid | | |
| fixed (L) | | |
| binary | | |
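The Iceberg table specification stores date as days since 1970-01-01, time as microseconds since midnight, and timestamp and timestamptz as microseconds since 1970-01-01 00:00:00 (UTC for timestamptz). Assuming the pipeline hands these raw integers to the SET clause, a conversion has to reproduce the arithmetic shown in the Python sketch below. The function names are hypothetical, and the sketch illustrates the encoding only, not SingleStore's conversion functions.

```python
from datetime import date, datetime, time, timedelta, timezone


def iceberg_date(days):
    """Iceberg date: days since 1970-01-01."""
    return date(1970, 1, 1) + timedelta(days=days)


def iceberg_time(micros):
    """Iceberg time: microseconds since midnight."""
    seconds, micro = divmod(micros, 1_000_000)
    minutes, second = divmod(seconds, 60)
    hour, minute = divmod(minutes, 60)
    return time(hour, minute, second, micro)


def iceberg_timestamp(micros):
    """Iceberg timestamp (no zone): microseconds since 1970-01-01 00:00:00."""
    return datetime(1970, 1, 1) + timedelta(microseconds=micros)


def iceberg_timestamptz(micros):
    """Iceberg timestamptz: microseconds since 1970-01-01 00:00:00 UTC."""
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(microseconds=micros)


print(iceberg_date(19723))                         # 2024-01-01
print(iceberg_time(45_296_000_000))                # 12:34:56
print(iceberg_timestamptz(1_704_067_200_000_000))  # 2024-01-01 00:00:00+00:00
```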
CREATE OR REPLACE
When a pipeline with the specified pipeline_name already exists, the CREATE OR REPLACE command functions similarly to CREATE PIPELINE, with the added benefit of preserving existing pipeline metadata, such as loaded offsets and data files. Running CREATE OR REPLACE on an existing pipeline causes the Iceberg pipeline to retrieve a new snapshot, schema, and data files, and to insert data from these new files into the destination table in SingleStore.
Executing CREATE OR REPLACE on an existing Iceberg pipeline may cause some data files to be ingested twice. To avoid duplicate rows, use CREATE OR REPLACE only with REPLACE INTO statements or in an upsert configuration.
CREATE OR REPLACE PIPELINE books_create_pipe AS
LOAD DATA S3 'db.books'
CONFIG '{"region": "us-west-2",
    "catalog_type": "GLUE",
    "catalog_name": "s3_catalog",
    "ingest_mode": "one_time"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
REPLACE INTO TABLE books
(Id <- id,
 Name <- name,
 NumPages <- numPages,
 Rating <- rating)
FORMAT ICEBERG;
Refer to CREATE PIPELINE for syntax for CREATE OR REPLACE PIPELINE, REPLACE INTO TABLE, and ON DUPLICATE KEY UPDATE.
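The reason REPLACE INTO makes a re-run safe can be seen in a small Python model: if a data file is read a second time, plain insert semantics add its rows again, while replace semantics overwrite rows with the same primary key. The ingest helper and the file names are hypothetical, and the model ignores everything about real pipelines except this key-based overwrite.

```python
def ingest(files, destination, mode):
    """Load every row of every data file into the destination.

    mode "insert": rows are appended, so re-read files produce duplicates.
    mode "replace": rows overwrite by primary key (Id), so re-reads are harmless.
    """
    for rows in files.values():
        for row in rows:
            if mode == "insert":
                destination.append(row)
            elif mode == "replace":
                destination[row["Id"]] = row
            else:
                raise ValueError(f"unknown mode {mode!r}")
    return destination


files = {"f1.parquet": [{"Id": 1, "Name": "Happy Place"}],
         "f2.parquet": [{"Id": 2, "Name": "Legends & Lattes"}]}
reread = {"f2.parquet": files["f2.parquet"]}  # a file picked up a second time

inserted = ingest(reread, ingest(files, [], "insert"), "insert")
replaced = ingest(reread, ingest(files, {}, "replace"), "replace")
print(len(inserted), len(replaced))  # 3 2
```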
Example - Glue Catalog on Amazon S3
An Iceberg table with data files in Parquet format that is stored in an AWS S3 bucket using AWS Glue can be loaded into a SingleStore table using a pipeline (CREATE PIPELINE).
In this example, a table named books is created and data from an Iceberg table that meets this schema is loaded into the books table.
Create the table.
CREATE TABLE books(
    Id INT,
    Name TEXT,
    NumPages INT,
    Rating DOUBLE,
    PRIMARY KEY(Id));
The following data is used for this example.
(1, 'Happy Place', 400, 4.9)
(2, 'Legends & Lattes', 304, 4.9)
(3, 'The Vanishing Half', 352, 4.9)
(4, 'The Wind Knows My Name', 304, 4.9)
The CREATE PIPELINE statement below loads data from an Iceberg table containing the data above into the books table. The names on the left side of <- are the column names from the SingleStore table into which the data will be loaded. The names on the right side of <- are the column names from the Iceberg table which is to be loaded into SingleStore.
CREATE PIPELINE books_pipe AS
LOAD DATA S3 'db.books'
CONFIG '{"region": "us-west-2",
    "catalog_type": "GLUE"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- id,
 Name <- name,
 NumPages <- numPages,
 Rating <- rating)
FORMAT ICEBERG;
Test the pipeline.
TEST PIPELINE books_pipe;
+------+------------------------+----------+--------+
| Id   | Name                   | NumPages | Rating |
+------+------------------------+----------+--------+
|    4 | The Wind Knows My Name |      304 |    4.9 |
|    1 | Happy Place            |      400 |    4.9 |
|    2 | Legends & Lattes       |      304 |    4.9 |
|    3 | The Vanishing Half     |      352 |    4.9 |
+------+------------------------+----------+--------+
Refer to START PIPELINE for more information on starting pipelines.
Example - Use Subvalue Mappings
This example shows the use of subvalue mappings to load nested elements from an Iceberg schema into a SingleStore table.
Create a table.
CREATE TABLE addresses(
    Id INT,
    Name TEXT,
    Street TEXT,
    City TEXT,
    Country TEXT,
    PRIMARY KEY(Id));
The following data is used for this example.
(1, 'Mylo', struct('123 Main St', 'New York', 'USA'))
(2, 'Naya', struct('456 Elm St', 'San Francisco', 'USA'))
The CREATE PIPELINE statement below loads data from an Iceberg table containing the data above into the addresses table. The names on the left side of <- are the column names from the SingleStore table into which the data will be loaded. The names on the right side of <- are the column names (or ::-separated field paths) from the Iceberg table which is to be loaded into SingleStore.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db2.addresses'
CONFIG '{"region": "us-west-2",
    "catalog_type": "GLUE",
    "catalog_name": "s3_catalog"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
Test the pipeline.
TEST PIPELINE addresses_pipe;
+------+------+-------------+---------------+---------+
| Id   | Name | Street      | City          | Country |
+------+------+-------------+---------------+---------+
|    1 | Mylo | 123 Main St | New York      | USA     |
|    2 | Naya | 456 Elm St  | San Francisco | USA     |
+------+------+-------------+---------------+---------+
Refer to START PIPELINE for more information on starting pipelines.
Example - Snowflake Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 with a Snowflake catalog.
Iceberg tables to be ingested in SingleStore must be created on an external volume.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db_name.schema_name.table_name'
CONFIG '{"region": "us-west-2",
    "catalog_type": "SNOWFLAKE",
    "catalog.uri": "jdbc:snowflake://tpq12345.snowflakecomputing.com",
    "catalog.jdbc.user": "<user_name>",
    "catalog.jdbc.password": "<password>",
    "catalog.jdbc.role": "<user role>"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
For the Snowflake catalog, the <table_identifier> must consist of three parts: the database name, the schema name, and the table name (db_name.schema_name.table_name in the example above).
The catalog.SELECT SYSTEM$ALLOWLIST(); in the Snowflake system.
In addition, the catalog., catalog., catalog., and catalog. are required when using the Snowflake catalog.
Example - REST Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 with a REST catalog.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db_name.table_name'
CONFIG '{"region": "us-west-2",
    "catalog_type": "REST",
    "catalog.uri": "http://host.address:8181"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
Example - JDBC Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 with a JDBC catalog.
SingleStore supports Postgres, MySQL, and SQLite JDBC drivers by default.java_.
The following example uses JDBC with SQLite.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db_name.table_name'
CONFIG '{"region": "us-west-2",
    "catalog_type": "JDBC",
    "catalog_name": "catalog_name",
    "catalog.warehouse": "s3://path_to_warehouse",
    "catalog.uri": "jdbc:sqlite:file:/path_jdbc"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
The following example uses JDBC with MySQL.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db_name.table_name'
CONFIG '{"region": "us-west-2",
    "catalog_type": "JDBC",
    "catalog_name": "catalog_name",
    "catalog.warehouse": "s3://path_to_warehouse",
    "catalog.uri": "jdbc:mysql://host.address:3306/default",
    "catalog.jdbc.user": "<user_name>",
    "catalog.jdbc.password": "<password>"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Country <- address::country)
FORMAT ICEBERG;
The catalog. and catalog_ are required for JDBC catalogs.
Example - Hive Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 using a Hive catalog.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db_name.table_name'
CONFIG '{"catalog_type": "HIVE",
    "catalog.uri": "thrift://<service_endpoint>:46590",
    "region": "us-east-1",
    "catalog.hive.metastore.client.auth.mode": "PLAIN",
    "catalog.hive.metastore.client.plain.username": "<username>",
    "catalog.hive.metastore.client.plain.password": "<password>",
    "catalog.metastore.use.SSL": "true",
    "catalog.hive.metastore.truststore.type": "PKCS12",
    "catalog.hive.metastore.truststore.path": "/path/to/your/project/hive/truststore.12",
    "catalog.hive.metastore.truststore.password": "<truststore_password>"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
The catalog. is the base URL for accessing the Hive catalog's API or service endpoint.
The catalog. is the authentication mode for connecting to the Hive Metastore.
The catalog. is the username used to authenticate with the Hive Metastore.
The catalog. is the password for the authenticated user.
The catalog. is a boolean flag that secures communication with the Hive Metastore.
The catalog. is the truststore format used to validate the SSL certificate.
The catalog. is the file path that contains the SSL certificate.
The catalog. is the password needed to access the truststore.
Refer to GitHub for additional Hive configurations.
Example - Polaris Catalog on Amazon S3
Ingest an Iceberg table stored in Amazon S3 using a Polaris catalog.
CREATE PIPELINE addresses_pipe AS
LOAD DATA S3 'db_name.table_name'
CONFIG '{"catalog_type": "REST",
    "catalog.warehouse": "<polaris_catalog_name>",
    "table_id": "db_name.table_name",
    "region": "us-east-1",
    "catalog.uri": "https://tpq12345.snowflakecomputing.com/polaris/api/catalog",
    "catalog.credential": "<ClientID>:<Secret>",
    "catalog.scope": "PRINCIPAL_ROLE:ALL"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
The catalog. is the base URL for accessing the Polaris catalog's API or service endpoint.
The catalog. defines the access permissions for the Polaris catalog.
The catalog. is a secret key from Polaris catalog connection, formatted as <ClientID>:<Secret>.
Example - Continuous Ingest - Upsert Mode
The example below shows a pipeline with an ingest_mode of upsert. Because the ingest_mode is upsert, the books table in this example has a primary key, Id, and the pipeline is created with REPLACE INTO.
When started, this pipeline will ingest data from the latest snapshot of the Iceberg table into the SingleStore books table.
CREATE PIPELINE books_upsert_pipe AS
LOAD DATA S3 'db.books'
CONFIG '{"region": "us-west-2",
    "catalog_type": "GLUE",
    "catalog_name": "s3_catalog",
    "ingest_mode": "upsert"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
REPLACE INTO TABLE books
(Id <- id,
 Name <- name,
 NumPages <- numPages,
 Rating <- rating)
FORMAT ICEBERG;
Example - Continuous Ingest - Append Mode
The example below shows a pipeline with an ingest_mode of append.
When started, this pipeline will ingest data from the latest snapshot of the Iceberg table into the SingleStore books table.
CREATE PIPELINE books_append_pipe AS
LOAD DATA S3 'db.books'
CONFIG '{"region": "us-west-2",
    "catalog_type": "GLUE",
    "catalog_name": "s3_catalog",
    "ingest_mode": "append"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
INTO TABLE books
(Id <- id,
 Name <- name,
 NumPages <- numPages,
 Rating <- rating)
FORMAT ICEBERG;
Example - Hadoop Catalog on FS
Ingest an Iceberg table stored in the local filesystem using a Hadoop catalog.
CREATE PIPELINE addresses_pipe AS
LOAD DATA FS 'db_name.table_name'
CONFIG '{"catalog_type": "HADOOP",
    "catalog.warehouse": "/tmp/warehouse_path"}'
INTO TABLE addresses
(Id <- id,
 Name <- name,
 Street <- address::street,
 City <- address::city,
 Country <- address::country)
FORMAT ICEBERG;
Example - Continuous Ingest with a Merge Pipeline
The example below shows how to create a merge pipeline.
In this example, a destination table in SingleStore named _books is created with the necessary $_file, $_row, and $_delete columns. A view named books is created on top of the _books table.
When started, this pipeline ingests data from the latest snapshot of the source Iceberg table into the SingleStore _books table. Query the books view to read the data with deleted rows filtered out.
Create the destination _books table.
CREATE TABLE _books(
    Id INT,
    Name TEXT,
    NumPages INT,
    Rating DOUBLE,
    `$_file` VARCHAR(2048),
    `$_row` BIGINT,
    `$_delete` JSON DEFAULT '{}',
    KEY(Id),
    PRIMARY KEY(`$_file`, `$_row`));
Create the books view.
CREATE VIEW books AS
SELECT Id, Name, NumPages, Rating FROM _books
WHERE JSON_LENGTH(`$_delete`) = 0;
Define the pipeline.
CREATE PIPELINE books_pipe AS
LOAD DATA S3 'db.books'
CONFIG '{"region": "us-west-2",
    "catalog_type": "GLUE",
    "catalog_name": "s3_catalog"}'
CREDENTIALS '{"aws_access_key_id": "<your_access_key_id>",
    "aws_secret_access_key": "<your_secret_access_key>"}'
MERGE INTO TABLE _books
(Id <- id,
 Name <- name,
 NumPages <- numPages,
 Rating <- rating)
FORMAT ICEBERG
SET `$_file` = pipeline_source_file(),
    `$_row` = pipeline_source_file_row();
Troubleshooting
The following table lists errors that can occur when creating an Iceberg Ingest pipeline.
| Catalog | Error | Cause and Resolution | 
|---|---|---|
| All | protocol error … Process died unexpectedly or didn't start. | An incorrect value of the engine variable  | 
| Snowflake | Certificate for <. | An incorrect URI may cause this error. Verify that the catalog. | 
| Snowflake | SEVERE: WARNING!!! Using fail-open to connect. | This issue needs to be resolved on the Snowflake side. | 
| Snowflake | Parquet parsing errors such as “Dictionary encoding not implemented". | Set Snowflake table property  | 
Last modified: September 24, 2025