Important
The SingleStore 9.1 release candidate (RC) gives you the opportunity to preview, evaluate, and provide feedback on new and upcoming features before they become generally available. In the interim, SingleStore 9.0 is recommended for production workloads; it can later be upgraded to SingleStore 9.1.
Kafka Connect Pipelines
Note
This is a Preview feature.
Overview
SingleStore Kafka Connect Pipelines can use Kafka Connect source connectors to stream data from external systems into SingleStore.
Key Concepts
The Kafka Connect framework is an open-source component of Apache Kafka that provides a scalable and reliable method to stream data between systems.
- Source connectors: Pull data from external systems into Kafka
- Sink connectors: Push data from Kafka to external systems
SingleStore uses Kafka Connect source connectors to ingest data directly into SingleStore tables without requiring an intermediate Kafka cluster.
Architecture
Key Architectural Features
- Leaf Node Processing: The extractor processes data on leaf nodes rather than the Master Aggregator, which reduces load on the aggregator and improves performance.
- Static Schema Table: Data is automatically loaded into a predefined table structure with three columns:
  - topic (TEXT): Source identifier
  - id (JSON): Unique record identifier
  - record (JSON): Complete record data
- JSON-Based Offset Management: Uses information_schema.PIPELINES_SOURCE_OFFSETS to track offsets in JSON format, which supports the complex offset structures required by different connectors.
- Multi-Task Support: Pipelines can spawn multiple tasks for parallel processing when the data source supports partitioning (for example, Kinesis shards).
How Kafka Connect Pipelines Work
When a Kafka Connect pipeline is created, SingleStore performs the following:
- Checks the connector class and configuration parameters
- If using CREATE INFERRED PIPELINE, automatically creates an inferred table with the static schema
- Launches the extractor process on leaf nodes
- Ingests data from the external data source and loads the data into the table
- Stores offset information in information_schema.PIPELINES_SOURCE_OFFSETS for exactly-once semantics
Deploy Kafka Connect Connectors
SingleStore fully supports custom Kafka Connect connectors, giving organizations complete control over their deployment, configuration, and management.
Enable Kafka Connect Pipelines
Kafka Connect Pipelines is an experimental feature that must be explicitly enabled.
SET GLOBAL experimental_features_config = "kafka_connect_enabled=true";
Note
This setting must be configured before creating Kafka Connect Pipelines and requires the SUPER permission.
Run the following command to confirm whether this feature is enabled.
SHOW VARIABLES LIKE 'experimental_features_config';
+------------------------------+----------------------------+
| Variable_name                | Value                      |
+------------------------------+----------------------------+
| experimental_features_config | kafka_connect_enabled=true |
+------------------------------+----------------------------+

After enabling Kafka Connect Pipelines, start the Kafka Connect source connector manually using the provided configuration.
Syntax
CREATE [OR REPLACE] INFERRED PIPELINE <pipeline_name>
AS LOAD DATA KAFKACONNECT '<kafka_connector>'
CONFIG '<connector_configuration_json>'
CREDENTIALS '<credentials_json>'
FORMAT AVRO;
CONFIG Parameter
The CONFIG parameter must contain a JSON object with the following:
- Required fields:
  - connector.class: The fully-qualified Java class name of the Kafka Connect source connector
- Common optional fields:
  - tasks.max: The maximum number of parallel tasks. The default value is 4.
- Connector-specific fields: Vary by connector type.
Refer to CREATE PIPELINE and CREATE INFERRED PIPELINE for more information.
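As an illustration, a CONFIG combines the required connector.class field with optional and connector-specific settings. The connector class, connector name, and endpoint field below are hypothetical placeholders, not a real connector; substitute the values for your connector:

```sql
-- Sketch only: the connector class and the example.endpoint field are
-- hypothetical placeholders for whatever your connector requires.
CREATE INFERRED PIPELINE example_pipeline
AS LOAD DATA KAFKACONNECT 'example-connector'
CONFIG '{"connector.class": "com.example.ExampleSourceConnector", "tasks.max": 2, "example.endpoint": "https://example.com/stream"}'
CREDENTIALS '{}'
FORMAT AVRO;
```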
Offset Management
Kafka Connect Pipelines use JSON-based offsets stored in information_schema.PIPELINES_SOURCE_OFFSETS, which differs from the traditional integer-based offsets used by native Kafka pipelines.
SELECT * FROM information_schema.PIPELINES_SOURCE_OFFSETS
WHERE PIPELINE_NAME = 'kafkaconnect-pipeline';
Offset Format Examples
The following is the format of the KEY and VALUE of Amazon Kinesis offsets:

KEY:   {"shardId":"shardId-XXXX"}
VALUE: {"sequenceNumber":"XXXX"}
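Individual fields can be pulled out of these JSON offsets with the same JSON functions used elsewhere in this document. The KEY and VALUE column names below mirror the labels in the format above and are an assumption; verify the actual column names in information_schema.PIPELINES_SOURCE_OFFSETS before relying on this:

```sql
-- Assumption: the offset key/value are exposed as JSON columns named KEY and
-- VALUE (matching the labels above); check the table's actual columns first.
SELECT
    JSON_EXTRACT_STRING(`KEY`, 'shardId') AS shard_id,
    JSON_EXTRACT_STRING(`VALUE`, 'sequenceNumber') AS sequence_number
FROM information_schema.PIPELINES_SOURCE_OFFSETS
WHERE PIPELINE_NAME = 'kinesis_pipeline';
```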
Manage Kafka Connect Pipelines
Pipeline Lifecycle Operations
Refer to The Lifecycle of a Pipeline for more information.
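The standard pipeline lifecycle statements apply to Kafka Connect Pipelines as well. A typical sequence (the pipeline name is a placeholder):

```sql
START PIPELINE kafkaconnect_pipeline;   -- begin ingesting
STOP PIPELINE kafkaconnect_pipeline;    -- pause ingestion; stored offsets are retained
START PIPELINE kafkaconnect_pipeline;   -- resume from the stored offsets
DROP PIPELINE kafkaconnect_pipeline;    -- remove the pipeline
```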
Check Pipeline Status
SELECT
    PIPELINE_NAME,
    STATE,
    CONFIG_JSON
FROM information_schema.PIPELINES
WHERE PIPELINE_NAME = '<kafkaconnect-pipeline>';
The following are the pipeline states:
- Running: Pipeline is actively ingesting data
- Stopped: Pipeline is stopped
- Error: Pipeline encountered an error
Configuration Best Practices
Task Configuration
Parallel processing with tasks.max:

- Configure tasks.max based on data source partitioning
- For Amazon Kinesis: Set tasks.max equal to the number of shards
- Monitor TASK_ID distribution in PIPELINES_SOURCE_OFFSETS
-- Check task distribution
SELECT
    TASK_ID,
    COUNT(*) AS offset_count
FROM information_schema.PIPELINES_SOURCE_OFFSETS
WHERE PIPELINE_NAME = '<pipeline_name>'
GROUP BY TASK_ID;
Security Best Practices
- Credential Storage: Always use the CREDENTIALS parameter for sensitive information; never include passwords in CONFIG
- Network Security: Ensure secure connections to data sources (use SSL/TLS when available)
- Access Control: Grant minimum required permissions to pipeline users
- Audit Logging: Enable logging for pipeline operations and monitor access
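For connectors that accept secrets through CREDENTIALS, sensitive values can be kept out of CONFIG entirely. The sketch below assumes a hypothetical connector; the credential field names are illustrative, not a fixed schema (note that some connectors, such as the Kinesis example later in this document, expect credentials in CONFIG instead):

```sql
-- Sketch: secrets go in CREDENTIALS rather than CONFIG where the connector
-- supports it; the connector class and credential field names are illustrative.
CREATE INFERRED PIPELINE secure_pipeline
AS LOAD DATA KAFKACONNECT 'example-connector'
CONFIG '{"connector.class": "com.example.ExampleSourceConnector", "tasks.max": 4}'
CREDENTIALS '{"username": "<user>", "password": "<password>"}'
FORMAT AVRO;
```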
Performance Optimization
- Configure Extraction Parameters: Use SET statements to tune extraction performance:
  SET GLOBAL pipelines_extractor_batch_size = 10000;
  SET GLOBAL pipelines_extractor_max_batch_interval_ms = 1000;
- Computed Columns: Create computed columns for frequently accessed JSON fields:
  ALTER TABLE <pipeline_table>
  ADD COLUMN customer_id AS (JSON_EXTRACT_STRING(record, 'customer_id')) PERSISTED INT;
- Indexes: Add indexes on computed columns for better query performance:
  CREATE INDEX idx_customer_id ON <pipeline_table>(customer_id);
- Monitor Batch Times: Track batch processing time and adjust configuration if needed
- Offset Progress: Regularly check PIPELINES_SOURCE_OFFSETS to ensure offsets are advancing
Example: Amazon Kinesis Pipeline
-- Enable the experimental feature
SET GLOBAL experimental_features_config = "kafka_connect_enabled=true";

-- Create the Kinesis pipeline
CREATE INFERRED PIPELINE kinesis_pipeline
AS LOAD DATA KAFKACONNECT 'kafka-connector'
CONFIG '{
    "connector.class": "com.singlestore.kafka.connect.kinesis.KinesisSourceConnector",
    "aws.access.key.id": "<aws_access_key>",
    "aws.secret.key.id": "<aws_secret_key>",
    "kafka.topic": "kinesis-topic",
    "kinesis.stream": "my-kinesis-stream",
    "kinesis.region": "us-east-1",
    "tasks.max": 4
}'
CREDENTIALS '{}' -- AWS credentials are in CONFIG for the Kinesis connector
FORMAT AVRO;

-- Start the pipeline
START PIPELINE kinesis_pipeline;
Static Schema Table
When an inferred Kafka Connect Pipeline is created, SingleStore automatically creates a table:
CREATE TABLE `<pipeline_name>` (
`topic` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
`id` JSON COLLATE utf8mb4_bin NOT NULL,
`record` JSON COLLATE utf8mb4_bin NOT NULL,
SORT KEY `__UNORDERED` (),
SHARD KEY ()
);

Querying Data
Extract data from the JSON record column using JSON functions:
-- Extract a specific field
SELECT
    topic,
    JSON_EXTRACT_STRING(record, 'fieldName') AS field_value
FROM kinesis_pipeline;

-- Filter based on JSON content
SELECT *
FROM kinesis_pipeline
WHERE JSON_EXTRACT_STRING(record, 'status') = 'active';

-- Extract multiple fields
SELECT
    topic,
    JSON_EXTRACT_STRING(record, 'customer_id') AS customer_id,
    JSON_EXTRACT_STRING(record, 'order_id') AS order_id,
    JSON_EXTRACT_STRING(record, 'timestamp') AS event_time
FROM kinesis_pipeline;

-- Create a computed column for better performance
ALTER TABLE kinesis_pipeline
ADD COLUMN status AS (JSON_EXTRACT_STRING(record, 'status')) PERSISTED STRING;

CREATE INDEX idx_status ON kinesis_pipeline(status);

-- Query using the computed column
SELECT *
FROM kinesis_pipeline
WHERE status = 'active';
Last modified: February 19, 2026