Important
The SingleStore 9.1 release candidate (RC) gives you the opportunity to preview, evaluate, and provide feedback on new and upcoming features prior to their general availability. In the interim, SingleStore 9.0 is recommended for production workloads, which can later be upgraded to SingleStore 9.1.
CREATE INFERRED PIPELINE
Infers the schema from the input files and creates a table and pipeline based on the inferred DDL.
CREATE INFERRED PIPELINE now supports Kafka Connect data sources, which allow you to use existing Kafka Connect source connectors to stream data from external systems into SingleStore. For Kafka Connect sources, the inferred table contains the columns topic (TEXT), id (JSON), and record (JSON).
Syntax
CREATE INFERRED PIPELINE <pipeline_name> AS
LOAD DATA {input_configuration | kafkaconnect_configuration}
[FORMAT [CSV | JSON | AVRO | PARQUET | ICEBERG]]
[AS JSON];
Remarks
- The input_configuration specifies the configuration for loading files from Apache Kafka, Amazon S3, a local filesystem, Microsoft Azure, HDFS, and Google Cloud Storage. Refer to CREATE PIPELINE for more information on configuration specifications.
- The kafkaconnect_configuration specifies the configuration for loading data using Kafka Connect source connectors:
  KAFKACONNECT <kafka_connector>
  CONFIG <connector_configuration_json>
  CREDENTIALS <credentials_json>
- All options supported by CREATE PIPELINE are supported by CREATE INFERRED PIPELINE.
- CSV, JSON, Avro, Parquet, and Iceberg formats are supported. While the default format is CSV, Kafka Connect pipelines require the AVRO format.
- TEXT and ENUM types use the utf8mb4 charset and utf8mb4_bin collation by default.
- The AS JSON keyword is used to produce the pipeline and table definitions in JSON format.
- Refer to the Permissions Matrix for the required permissions.
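Based on the kafkaconnect_configuration clauses described in the remarks, a Kafka Connect pipeline might look like the following sketch. The connector class, CONFIG keys, and CREDENTIALS fields shown are illustrative assumptions, not values documented on this page:

```sql
-- Hypothetical sketch: stream change data from PostgreSQL into SingleStore
-- through a Kafka Connect source connector. The connector class name and the
-- CONFIG/CREDENTIALS keys below are assumptions for illustration only.
CREATE INFERRED PIPELINE pg_orders_pipe AS
LOAD DATA KAFKACONNECT 'io.debezium.connector.postgresql.PostgresConnector'
CONFIG '{"database.hostname":"<host>","database.port":"5432","database.dbname":"<db>"}'
CREDENTIALS '{"database.user":"<user>","database.password":"<password>"}'
FORMAT AVRO;
```

Note that FORMAT AVRO is required here: as stated in the remarks, Kafka Connect pipelines support only the AVRO format.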
Example
The following example demonstrates how to use the CREATE INFERRED PIPELINE command to infer the schema of an Avro-formatted file in an AWS S3 bucket.
This example uses data that conforms to the schema of the books table, as shown in the following Avro schema.
{
  "namespace": "books.avro",
  "type": "record",
  "name": "Book",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "num_pages", "type": "int"},
    {"name": "rating", "type": "double"},
    {"name": "publish_date", "type": "long", "logicalType": "timestamp-micros"}
  ]
}
Refer to Generate an Avro File for an example of generating an Avro file that conforms to this schema.
The following example creates a pipeline named books_pipe by inferring the schema from the specified file.
CREATE INFERRED PIPELINE books_pipe AS
LOAD DATA S3 's3://data_folder/books.avro'
CONFIG '{"region":"<region_name>"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>","aws_secret_access_key":"<your_secret_access_key>","aws_session_token":"<your_session_token>"}'
FORMAT AVRO;
Created 'books_pipe' table and 'books_pipe' pipeline
Run the SHOW CREATE PIPELINE command to view the CREATE PIPELINE statement for the pipeline created by the CREATE INFERRED PIPELINE command.
SHOW CREATE PIPELINE books_pipe;
Pipeline,Create Pipeline
books_pipe,"CREATE PIPELINE `books_pipe`
AS LOAD DATA S3 's3://data-folder/books.avro'
CONFIG '{\"region\":\"us-west-2\"}'
CREDENTIALS <CREDENTIALS REDACTED>
BATCH_INTERVAL 2500
DISABLE OUT_OF_ORDER OPTIMIZATION
DISABLE OFFSETS METADATA GC
INTO TABLE `books_pipe`
FORMAT AVRO(
`books_pipe`.`id` <- `id`,
`books_pipe`.`name` <- `name`,
`books_pipe`.`num_pages` <- `num_pages`,
`books_pipe`.`rating` <- `rating`,
`books_pipe`.`publish_date` <- `publish_date`)"
Run the SHOW CREATE TABLE command to view the CREATE TABLE statement for the table created by the CREATE INFERRED PIPELINE command.
SHOW CREATE TABLE books_pipe;
Table,Create Table
books_pipe,"CREATE TABLE `books_pipe` (
`id` int(11) NOT NULL,
`name` longtext CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`num_pages` int(11) NOT NULL,
`rating` double DEFAULT NULL,
`publish_date` bigint(20) NOT NULL,
SORT KEY `__UNORDERED` (),
SHARD KEY ()
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL
AUTOSTATS_HISTOGRAM_MODE=CREATE
AUTOSTATS_SAMPLING=ON
SQL_MODE='STRICT_ALL_TABLES,NO_AUTO_CREATE_USER'"
The pipeline and table definitions can be adjusted using the CREATE OR REPLACE PIPELINE (refer to CREATE PIPELINE) and ALTER TABLE commands, respectively.
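As one illustration of such an adjustment, the inferred pipeline could be recreated with a shorter batch interval while keeping the rest of its definition unchanged. This is a hedged sketch based on the SHOW CREATE PIPELINE output above; the BATCH_INTERVAL value chosen here is an arbitrary example:

```sql
-- Hypothetical adjustment: recreate the inferred pipeline with a 1000 ms
-- batch interval instead of the default 2500 ms shown in the output above.
CREATE OR REPLACE PIPELINE books_pipe AS
LOAD DATA S3 's3://data-folder/books.avro'
CONFIG '{"region":"us-west-2"}'
CREDENTIALS '{"aws_access_key_id":"<your_access_key_id>","aws_secret_access_key":"<your_secret_access_key>"}'
BATCH_INTERVAL 1000
INTO TABLE `books_pipe`
FORMAT AVRO(
  `books_pipe`.`id` <- `id`,
  `books_pipe`.`name` <- `name`,
  `books_pipe`.`num_pages` <- `num_pages`,
  `books_pipe`.`rating` <- `rating`,
  `books_pipe`.`publish_date` <- `publish_date`);
```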
Once the pipeline and table definitions are configured, start the pipeline.
START PIPELINE books_pipe FOREGROUND;
This command starts the pipeline in the foreground; because of the FOREGROUND keyword, any errors are displayed in the client. Refer to START PIPELINE for more information.
Check if the data is loaded.
SELECT * FROM books_pipe ORDER BY id;
+----+--------------------+-----------+--------+------------------+
| id | name | num_pages | rating | publish_date |
+----+--------------------+-----------+--------+------------------+
| 1 | HappyPlace | 400 | 4.9 | 1680721200000000 |
| 2 | Legends & Lattes | 304 | 4.9 | 1669665600000000 |
| 3 | The Vanishing Half | 352 | 4.9 | 1591124400000000 |
+----+--------------------+-----------+--------+------------------+
Refer to Schema and Pipeline Inference - Examples for more examples.
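Because publish_date was inferred as a BIGINT holding timestamp-micros values, converting it to a readable timestamp must be done explicitly. A minimal sketch, assuming the MySQL-compatible FROM_UNIXTIME function is available:

```sql
-- Convert the timestamp-micros BIGINT to a readable datetime.
-- FROM_UNIXTIME expects seconds, so divide the microsecond value first.
SELECT id, name, FROM_UNIXTIME(publish_date / 1000000) AS published_at
FROM books_pipe
ORDER BY id;
```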
Refer to Example: Amazon Kinesis Pipeline for a Kafka Connect pipeline example.
Last modified: February 5, 2026