Important
The SingleStore 9.1 release candidate (RC) gives you the opportunity to preview, evaluate, and provide feedback on new and upcoming features prior to their general availability. In the interim, SingleStore 9.0 is recommended for production workloads; these deployments can later be upgraded to SingleStore 9.1.
Kafka Connect Pipelines
Note
This is a Preview feature.
Overview
SingleStore Kafka Connect Pipelines can use Kafka Connect source connectors to stream data from external systems into SingleStore.
Key Concepts
The Kafka Connect framework is an open-source component of Apache Kafka that provides a scalable and reliable method to stream data between systems.
- Source connectors: Pull data from external systems into Kafka
- Sink connectors: Push data from Kafka to external systems
SingleStore uses Kafka Connect source connectors to ingest data directly into SingleStore tables without requiring an intermediate Kafka cluster.
Architecture
Key Architectural Features
- Leaf Node Processing: The extractor processes data on leaf nodes rather than the Master Aggregator, which reduces load on the aggregator and improves performance.
- Static Schema Table: Data is automatically loaded into a predefined table structure with three columns:
  - topic (TEXT): Source identifier
  - id (JSON): Unique record identifier
  - record (JSON): Complete record data
- JSON-Based Offset Management: Uses the information_schema.PIPELINES_SOURCE_OFFSETS table to track offsets in JSON format, which supports the complex offset structures required by different connectors.
- Multi-Task Support: Pipelines can spawn multiple tasks for parallel processing when the data source supports partitioning (for example, Kinesis shards).
How Kafka Connect Pipelines Work
When a Kafka Connect pipeline is created, SingleStore performs the following:
- Checks the connector class and configuration parameters
- If using CREATE INFERRED PIPELINE, automatically creates an inferred table with the static schema
- Launches the extractor process on leaf nodes
- Ingests data from the external data source and loads it into the table
- Stores offset information in PIPELINES_SOURCE_OFFSETS for exactly-once semantics
Deploy Kafka Connect Connectors
SingleStore provides full support for custom Kafka Connect connectors and enables deploying and configuring them with complete control over configuration and management.
Enable Kafka Connect Pipelines
Kafka Connect Pipelines is an experimental feature that must be explicitly enabled.
SET GLOBAL experimental_features_config = "kafka_connect_enabled=true"
Note
This setting must be configured before creating Kafka Connect Pipelines and requires the SUPER permission.
Run the following command to confirm whether this feature is enabled.
SHOW VARIABLES LIKE 'experimental_features_config'
+------------------------------+----------------------------+
| Variable_name | Value |
+------------------------------+----------------------------+
| experimental_features_config | kafka_connect_enabled=true |
+------------------------------+----------------------------+

After enabling Kafka Connect Pipelines, start the Kafka Connect source connector manually using the provided configuration.
Syntax
CREATE [OR REPLACE] INFERRED PIPELINE <pipeline_name>
AS LOAD DATA KAFKACONNECT '<kafka_connector>'
CONFIG '<connector_configuration_json>'
CREDENTIALS '<credentials_json>'
FORMAT AVRO;
CONFIG Parameter
The CONFIG parameter must contain a JSON object with the following:
- Required fields:
  - connector.class: The fully-qualified Java class name of the Kafka Connect source connector
- Common optional fields:
  - tasks.max: The maximum number of parallel tasks. The default value is 4.
- Connector-specific fields: Vary by connector type.
Refer to CREATE PIPELINE and CREATE INFERRED PIPELINE for more information.
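As a sketch, a minimal pipeline definition combining these fields might look like the following. The connector class name and pipeline name here are placeholders for illustration, not real values:

```sql
-- Minimal sketch: only connector.class is required; tasks.max defaults to 4
CREATE INFERRED PIPELINE example_pipeline
AS LOAD DATA KAFKACONNECT 'kafka-connector'
CONFIG '{
  "connector.class": "com.example.MySourceConnector",
  "tasks.max": 2
}'
CREDENTIALS '{}'
FORMAT AVRO;
```

Connector-specific fields (stream names, regions, credentials keys, and so on) would be added to the same CONFIG JSON object alongside these.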
Offset Management
Kafka Connect Pipelines use JSON-based offsets stored in information_schema.PIPELINES_SOURCE_OFFSETS, which differs from the traditional integer-based offsets used by native Kafka pipelines.
SELECT * FROM information_schema.PIPELINES_SOURCE_OFFSETS
WHERE PIPELINE_NAME = 'kafkaconnect-pipeline';
Offset Format Examples
The following is the format of KEY and VALUE of Amazon Kinesis offsets:
KEY: {"shardId":"shardId-XXXX"}
VALUE: {"sequenceNumber":"XXXX"}
Manage Kafka Connect Pipelines
Pipeline Lifecycle Operations
Refer to The Lifecycle of a Pipeline for more information.
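The standard pipeline lifecycle statements apply to Kafka Connect pipelines as well. A brief sketch, using an illustrative pipeline name:

```sql
-- Begin ingesting from the connector
START PIPELINE kafkaconnect_pipeline;

-- Pause ingestion; offsets are retained so ingestion resumes where it left off
STOP PIPELINE kafkaconnect_pipeline;

-- Remove the pipeline and its metadata (the target table is not dropped)
DROP PIPELINE kafkaconnect_pipeline;
```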
Check Pipeline Status
SELECT
  PIPELINE_NAME,
  STATE,
  CONFIG_JSON
FROM information_schema.PIPELINES
WHERE PIPELINE_NAME = '<kafkaconnect-pipeline>';
The following are the pipeline states:
- Running: Pipeline is actively ingesting data
- Stopped: Pipeline is stopped
- Error: Pipeline encountered an error
Configuration Best Practices
Task Configuration
Parallel processing with tasks.max:
- Configure tasks.max based on data source partitioning
- For Amazon Kinesis: Set tasks.max equal to the number of shards
- Monitor TASK_ID distribution in PIPELINES_SOURCE_OFFSETS
-- Check task distribution
SELECT
  TASK_ID,
  COUNT(*) AS offset_count
FROM information_schema.PIPELINES_SOURCE_OFFSETS
WHERE PIPELINE_NAME = '<pipeline_name>'
GROUP BY TASK_ID;
Security Best Practices
- Credential Storage: Always use the CREDENTIALS parameter for sensitive information; never include passwords in CONFIG
- Network Security: Ensure secure connections to data sources (use SSL/TLS when available)
- Access Control: Grant minimum required permissions to pipeline users
- Audit Logging: Enable logging for pipeline operations and monitor access
Performance Optimization
- Configure Extraction Parameters: Use SET statements to tune extraction performance:
  SET GLOBAL pipelines_extractor_batch_size = 10000;
  SET GLOBAL pipelines_extractor_max_batch_interval_ms = 1000;
- Computed Columns: Create computed columns for frequently accessed JSON fields:
  ALTER TABLE <pipeline_table>
  ADD COLUMN customer_id AS (JSON_EXTRACT_STRING(record, 'customer_id')) PERSISTED INT;
- Indexes: Add indexes on computed columns for better query performance:
  CREATE INDEX idx_customer_id ON <pipeline_table>(customer_id);
- Monitor Batch Times: Track batch processing time and adjust configuration if needed
- Offset Progress: Regularly check PIPELINES_SOURCE_OFFSETS to ensure offsets are advancing
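Batch times can be tracked through the standard pipelines metadata views. A hedged sketch — the column names here assume the stock information_schema.PIPELINES_BATCHES_SUMMARY view available for SingleStore pipelines generally:

```sql
-- Recent batch durations for one pipeline (most recent first)
SELECT BATCH_ID, BATCH_STATE, BATCH_TIME
FROM information_schema.PIPELINES_BATCHES_SUMMARY
WHERE PIPELINE_NAME = '<pipeline_name>'
ORDER BY BATCH_ID DESC
LIMIT 10;
```

If batch times trend upward, revisit the extraction parameters above; if offsets in PIPELINES_SOURCE_OFFSETS stop advancing between snapshots, the pipeline may be stalled.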
Examples
The following examples demonstrate how to ingest data from Amazon Kinesis.
Example: Amazon Kinesis Pipeline
The following example demonstrates how to create a basic Kafka Connect Pipeline that automatically creates a table with a static schema and then ingests data from Amazon Kinesis into the table.
-- Enable the experimental feature
SET GLOBAL experimental_features_config = "kafka_connect_enabled=true";

-- Create Kinesis pipeline
CREATE INFERRED PIPELINE kinesis_pipeline
AS LOAD DATA KAFKACONNECT 'kafka-connector'
CONFIG '{
  "connector.class": "com.singlestore.kafka.connect.kinesis.KinesisSourceConnector",
  "aws.access.key.id": "<aws_access_key>",
  "aws.secret.key.id": "<aws_secret_key>",
  "kafka.topic": "kinesis-topic",
  "kinesis.stream": "my-kinesis-stream",
  "kinesis.region": "us-east-1",
  "tasks.max": 4
}'
CREDENTIALS '{}' -- AWS credentials are in CONFIG for the Kinesis connector
FORMAT AVRO;

-- Start the pipeline
START PIPELINE kinesis_pipeline;
Static Schema Table
When an inferred Kafka Connect Pipeline is created, SingleStore automatically creates a table:
CREATE TABLE `<pipeline_name>` (
`topic` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
`id` JSON COLLATE utf8mb4_bin NOT NULL,
`record` JSON COLLATE utf8mb4_bin NOT NULL,
SORT KEY `__UNORDERED` (),
SHARD KEY ()
)

Querying Data
Extract data from the JSON record column using JSON functions:
-- Extract a specific field
SELECT
  topic,
  JSON_EXTRACT_STRING(record, 'fieldName') AS field_value
FROM kinesis_pipeline;

-- Filter based on JSON content
SELECT *
FROM kinesis_pipeline
WHERE JSON_EXTRACT_STRING(record, 'status') = 'active';

-- Extract multiple fields
SELECT
  topic,
  JSON_EXTRACT_STRING(record, 'customer_id') AS customer_id,
  JSON_EXTRACT_STRING(record, 'order_id') AS order_id,
  JSON_EXTRACT_STRING(record, 'timestamp') AS event_time
FROM kinesis_pipeline;

-- Create a computed column for better performance
ALTER TABLE kinesis_pipeline
ADD COLUMN status AS (JSON_EXTRACT_STRING(record, 'status')) PERSISTED TEXT;

CREATE INDEX idx_status ON kinesis_pipeline(status);

-- Query using the computed column
SELECT *
FROM kinesis_pipeline
WHERE status = 'active';
Example 2: Amazon Kinesis Pipeline with Stored Procedure
The following example demonstrates how to use the INTO PROCEDURE clause with a stored procedure to process and transform incoming Kinesis data before inserting it into a custom table schema.
-- Enable the experimental feature
SET GLOBAL experimental_features_config = "kafka_connect_enabled=true";

-- Create the target table for parsed records
CREATE TABLE parsed_kinesis_stream (
  partitionKey BIGINT,
  sequence_number VARCHAR(255),
  shardId VARCHAR(255),
  data_base64 VARCHAR(255),
  parsed_data AS FROM_BASE64(data_base64) PERSISTED VARCHAR(255)
);

-- Create the stored procedure to process incoming records
CREATE OR REPLACE PROCEDURE parse_kinesis_stream(batch QUERY(topic VARCHAR(255), id JSON, record JSON))
AS
BEGIN
  INSERT INTO parsed_kinesis_stream (
    partitionKey,
    sequence_number,
    shardId,
    data_base64
  )
  SELECT
    record::$partitionKey,
    record::$sequenceNumber,
    record::$shardId,
    record::$data
  FROM batch;
END;

-- Create Kinesis pipeline with INTO PROCEDURE
CREATE PIPELINE kinesis_pipeline_to_proc
AS LOAD DATA KAFKACONNECT 'kafka-connector'
CONFIG '{
  "connector.class": "com.singlestore.kafka.connect.kinesis.KinesisSourceConnector",
  "aws.access.key.id": "<aws_access_key>",
  "aws.secret.key.id": "<aws_secret_key>",
  "kafka.topic": "kinesis-topic",
  "kinesis.stream": "my-kinesis-stream",
  "kinesis.region": "us-east-1",
  "tasks.max": 4
}'
CREDENTIALS '{}'
BATCH_INTERVAL 2500
INTO PROCEDURE parse_kinesis_stream
FORMAT AVRO (
  `topic` <- `topic`,
  `id` <- `id`,
  `record` <- `record`
);

-- Start the pipeline
START PIPELINE kinesis_pipeline_to_proc;
Target Table Schema
The target table stores both the raw base64-encoded data and a computed column that automatically decodes it:
| Column | Type | Description |
|---|---|---|
| partitionKey | BIGINT | Kinesis partition key |
| sequence_number | VARCHAR(255) | Kinesis sequence number |
| shardId | VARCHAR(255) | Source shard identifier |
| data_base64 | VARCHAR(255) | Raw base64-encoded payload |
| parsed_data | VARCHAR(255) | Automatically decoded payload (computed) |
Querying Data
Query the decoded data directly using the computed column:
-- Query decoded data
SELECT
  partitionKey,
  sequence_number,
  parsed_data
FROM parsed_kinesis_stream;

-- Extract JSON fields from decoded data
SELECT
  partitionKey,
  JSON_EXTRACT_STRING(parsed_data, 'customer_id') AS customer_id,
  JSON_EXTRACT_STRING(parsed_data, 'event_type') AS event_type
FROM parsed_kinesis_stream;
Working with Base64-Encoded Data
Kinesis record payloads are delivered base64-encoded in the record::$data field. Use FROM_BASE64() to decode the data before applying JSON functions:
-- Decode and extract in a single query
SELECT
  JSON_EXTRACT_STRING(FROM_BASE64(data_base64), 'customer_id') AS customer_id,
  JSON_EXTRACT_STRING(FROM_BASE64(data_base64), 'status') AS status
FROM parsed_kinesis_stream
WHERE JSON_EXTRACT_STRING(FROM_BASE64(data_base64), 'status') = 'active';