Important

The SingleStore 9.1 release candidate (RC) gives you the opportunity to preview, evaluate, and provide feedback on new and upcoming features prior to their general availability. Until then, SingleStore 9.0 is recommended for production workloads; it can later be upgraded to SingleStore 9.1.

Kafka Connect Pipelines

Note

This is a Preview feature.

Overview

SingleStore Kafka Connect Pipelines can use Kafka Connect source connectors to stream data from external systems into SingleStore. Rather than building a native connector for every data source, SingleStore leverages the Kafka Connect ecosystem, which includes multiple pre-built connectors.

Key Concepts

The Kafka Connect framework is an open-source component of Apache Kafka that provides a scalable and reliable method to stream data between systems. Kafka Connect includes:

  • Source connectors: Pull data from external systems into Kafka

  • Sink connectors: Push data from Kafka to external systems

SingleStore uses Kafka Connect source connectors to ingest data directly into SingleStore tables without requiring an intermediate Kafka cluster. Kafka Connect Pipelines support source connectors only; sink connectors are not supported. To write data from SingleStore to Kafka, use the SingleStore Kafka sink connector. Kafka Connect Pipelines currently support only Amazon Kinesis as a data source. Refer to Amazon Kinesis for more information.

Architecture

Key Architectural Features

  1. Leaf Node Processing: The extractor processes data on leaf nodes rather than on the Master Aggregator, which reduces load on the aggregator and improves performance.

  2. Static Schema Table: Data is automatically loaded into a predefined table structure with three columns:

    • topic (TEXT): Source identifier

    • id (JSON): Unique record identifier

    • record (JSON): Complete record data

  3. JSON-Based Offset Management: Uses the information_schema.PIPELINES_SOURCE_OFFSETS table to track offsets in JSON format, which supports the complex offset structures required by different connectors.

  4. Multi-Task Support: Pipelines can spawn multiple tasks for parallel processing when the data source supports partitioning (for example, Kinesis shards).

How Kafka Connect Pipelines Work

When a Kafka Connect pipeline is created, SingleStore performs the following:

  • Checks the connector class and configuration parameters

  • If using CREATE INFERRED PIPELINE, automatically creates an inferred table with the static schema

  • Launches the extractor process on leaf nodes

  • Ingests data from the external data source and loads data into the table

  • Stores offset information in PIPELINES_SOURCE_OFFSETS for exactly-once semantics

Deploy Kafka Connect Connectors

SingleStore fully supports custom Kafka Connect connectors, giving organizations complete control over their deployment and configuration. Organizations also handle ongoing maintenance and updates.

Enable Kafka Connect Pipelines

Kafka Connect Pipelines is an experimental feature that must be explicitly enabled. Run the following command to enable this feature:

SET GLOBAL experimental_features_config = "kafka_connect_enabled=true";

Note

This setting must be configured before creating Kafka Connect Pipelines and requires the SUPER permission. The setting persists across cluster restarts and changes take effect immediately.

Run the following command to confirm whether this feature is enabled.

SHOW VARIABLES LIKE 'experimental_features_config'
+------------------------------+----------------------------+
|        Variable_name         |            Value           |
+------------------------------+----------------------------+
| experimental_features_config | kafka_connect_enabled=true |
+------------------------------+----------------------------+

After enabling Kafka Connect Pipelines, start the Kafka Connect source connector manually using the provided configuration.

Syntax

CREATE [OR REPLACE] INFERRED PIPELINE <pipeline_name>
AS LOAD DATA KAFKACONNECT '<kafka_connector>'
CONFIG '<connector_configuration_json>'
CREDENTIALS '<credentials_json>'
FORMAT AVRO;

CONFIG Parameter

The CONFIG parameter must contain a JSON object with the following:

  • Required fields:

    • connector.class: The fully-qualified Java class name of the Kafka Connect source connector

  • Common optional fields:

    • tasks.max: The maximum number of parallel tasks. The default value is 4.

  • Connector specific fields: Vary by connector type.
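
As an illustration, the CONFIG value is a single JSON object combining these fields. The connector class and connector-specific field names below are placeholders, not a real connector:

```json
{
  "connector.class": "com.example.connect.MySourceConnector",
  "tasks.max": 4,
  "example.endpoint": "https://example.com/stream",
  "example.region": "us-east-1"
}
```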

Refer to CREATE PIPELINE and CREATE INFERRED PIPELINE for more information.

Offset Management

Kafka Connect Pipelines use JSON-based offsets stored in information_schema.PIPELINES_SOURCE_OFFSETS, which differs from traditional integer-based offsets used by native Kafka pipelines.

SELECT * FROM information_schema.PIPELINES_SOURCE_OFFSETS
WHERE PIPELINE_NAME = 'kafkaconnect-pipeline'

Offset Format Examples

The following is the format of KEY and VALUE of Amazon Kinesis offsets:

KEY: {"shardId":"shardId-XXXX"}
VALUE: {"sequenceNumber":"XXXX"}
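
Because both the offset key and value are JSON, individual fields can be pulled out with the standard JSON functions. A sketch, assuming a pipeline named kinesis_pipeline and the KEY and VALUE columns shown above:

```sql
-- Per-shard progress for a Kinesis-backed pipeline
SELECT
  JSON_EXTRACT_STRING(`KEY`, 'shardId') AS shard,
  JSON_EXTRACT_STRING(`VALUE`, 'sequenceNumber') AS last_sequence
FROM information_schema.PIPELINES_SOURCE_OFFSETS
WHERE PIPELINE_NAME = 'kinesis_pipeline';
```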

Manage Kafka Connect Pipelines

Pipeline Lifecycle Operations

Refer to The Lifecycle of a Pipeline for more information.
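
Kafka Connect Pipelines use the same lifecycle commands as other SingleStore pipelines; for example:

```sql
START PIPELINE kinesis_pipeline;   -- begin ingesting
STOP PIPELINE kinesis_pipeline;    -- pause ingestion
DROP PIPELINE kinesis_pipeline;    -- remove the pipeline definition
```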

Check Pipeline Status

SELECT
PIPELINE_NAME,
STATE,
CONFIG_JSON
FROM information_schema.PIPELINES
WHERE PIPELINE_NAME = '<kafkaconnect-pipeline>'

The following are the pipeline states:

  • Running: Pipeline is actively ingesting data

  • Stopped: Pipeline is stopped

  • Error: Pipeline encountered an error
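
For a pipeline in the Error state, the general-purpose pipelines error log can help identify the cause:

```sql
SELECT * FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'kinesis_pipeline';
```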

Configuration Best Practices

Task Configuration

Parallel processing with tasks.max:

  • Configure tasks.max based on data source partitioning

  • For Amazon Kinesis: Set tasks.max equal to the number of shards

  • Monitor TASK_ID distribution in PIPELINES_SOURCE_OFFSETS

-- Check task distribution
SELECT
TASK_ID,
COUNT(*) as offset_count
FROM information_schema.PIPELINES_SOURCE_OFFSETS
WHERE PIPELINE_NAME = '<pipeline_name>'
GROUP BY TASK_ID;

Security Best Practices

  1. Credential Storage: Always use the CREDENTIALS parameter for sensitive information, never include passwords in CONFIG

  2. Network Security: Ensure secure connections to data sources (use SSL/TLS when available)

  3. Access Control: Grant minimum required permissions to pipeline users

  4. Audit Logging: Enable logging for pipeline operations and monitor access
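
As a sketch of the first practice, secrets belong in CREDENTIALS rather than CONFIG. The credential field names below are hypothetical; the actual names depend on the connector:

```sql
CREATE INFERRED PIPELINE secure_pipeline
AS LOAD DATA KAFKACONNECT '<kafka_connector>'
CONFIG '{
  "connector.class": "<connector_class>",
  "tasks.max": 2
}'
CREDENTIALS '{
  "username": "<username>",
  "password": "<password>"
}'
FORMAT AVRO;
```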

Performance Optimization

  1. Configure Extraction Parameters: Use SET statements to tune the extraction performance:

    SET GLOBAL pipelines_extractor_batch_size = 10000;
    SET GLOBAL pipelines_extractor_max_batch_interval_ms = 1000;
  2. Computed Columns: Create computed columns for frequently accessed JSON fields

    ALTER TABLE <pipeline_table>
    ADD COLUMN customer_id AS (JSON_EXTRACT_STRING(record, 'customer_id')) PERSISTED TEXT;
  3. Indexes: Add indexes on computed columns for better query performance

    CREATE INDEX idx_customer_id ON <pipeline_table>(customer_id);
  4. Monitor Batch Times: Track batch processing time and adjust configuration if needed

  5. Offset Progress: Regularly check PIPELINES_SOURCE_OFFSETS to ensure offsets are advancing
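
Batch times can be tracked with the general pipelines monitoring views. A sketch using PIPELINES_BATCHES_SUMMARY, which reports per-batch state and duration for all pipelines:

```sql
SELECT BATCH_ID, BATCH_STATE, BATCH_TIME
FROM information_schema.PIPELINES_BATCHES_SUMMARY
WHERE PIPELINE_NAME = 'kinesis_pipeline'
ORDER BY BATCH_ID DESC
LIMIT 10;
```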

Example: Amazon Kinesis Pipeline

-- Enable the experimental feature
SET GLOBAL experimental_features_config = "kafka_connect_enabled=true";
-- Create Kinesis pipeline
CREATE INFERRED PIPELINE kinesis_pipeline
AS LOAD DATA KAFKACONNECT 'kafka-connector'
CONFIG '{
"connector.class": "com.singlestore.kafka.connect.kinesis.KinesisSourceConnector",
"aws.access.key.id": "<aws_access_key>",
"aws.secret.key.id": "<aws_secret_key>",
"kafka.topic": "kinesis-topic",
"kinesis.stream": "my-kinesis-stream",
"kinesis.region": "us-east-1",
"tasks.max": 4
}'
CREDENTIALS '{}' -- AWS credentials are in CONFIG for Kinesis connector
FORMAT AVRO;
-- Start the pipeline
START PIPELINE kinesis_pipeline;

Static Schema Table

When an inferred Kafka Connect Pipeline is created, SingleStore automatically creates a table:

CREATE TABLE `<pipeline_name>` (
  `topic` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `id` JSON COLLATE utf8mb4_bin NOT NULL,
  `record` JSON COLLATE utf8mb4_bin NOT NULL,
  SORT KEY `__UNORDERED` (),
  SHARD KEY ()
)

Querying Data

Extract data from the JSON record column using JSON functions:

-- Extract a specific field
SELECT
topic,
JSON_EXTRACT_STRING(record, 'fieldName') AS field_value
FROM kinesis_pipeline;
-- Filter based on JSON content
SELECT *
FROM kinesis_pipeline
WHERE JSON_EXTRACT_STRING(record, 'status') = 'active';
-- Extract multiple fields
SELECT
topic,
JSON_EXTRACT_STRING(record, 'customer_id') AS customer_id,
JSON_EXTRACT_STRING(record, 'order_id') AS order_id,
JSON_EXTRACT_STRING(record, 'timestamp') AS event_time
FROM kinesis_pipeline;
-- Create a computed column for better performance
ALTER TABLE kinesis_pipeline
ADD COLUMN status AS (JSON_EXTRACT_STRING(record, 'status')) PERSISTED TEXT;
CREATE INDEX idx_status ON kinesis_pipeline(status);
-- Query using the computed column
SELECT *
FROM kinesis_pipeline
WHERE status = 'active';

Last modified: February 19, 2026


