# Kafka Connect Pipelines

> **📝 Note**: This is a Preview feature.

## Overview

SingleStore Kafka Connect Pipelines use Kafka Connect source connectors to stream data from external systems into SingleStore. Instead of building a native connector for every data source, SingleStore leverages the Kafka Connect ecosystem, which includes many pre-built connectors.

## Key Concepts

The [Kafka Connect](https://kafka.apache.org/41/kafka-connect/) framework is an open-source component of Apache Kafka that provides a scalable and reliable method to stream data between systems. Kafka Connect includes:

* Source connectors: Pull data from external systems into Kafka
* Sink connectors: Push data from Kafka to external systems

SingleStore uses Kafka Connect source connectors to ingest data directly into SingleStore tables without requiring an intermediate Kafka cluster. Kafka Connect Pipelines support source connectors only; sink connectors are not supported. To load data from Kafka topics into SingleStore through Kafka Connect, use the SingleStore Kafka sink connector.

Kafka Connect Pipelines currently support Amazon Kinesis as a data source. Refer to [Amazon Kinesis](https://docs.singlestore.com/db/v9.1/load-data/data-sources/load-data-from-amazon-kinesis-using-a-kafka-connect-pipeline.md) for more information.

## Architecture

## Key Architectural Features

1. Leaf Node Processing: The extractor processes data on leaf nodes rather than on the Master Aggregator, which reduces load on the aggregator and improves performance.

2. Static Schema Table: Data is automatically loaded into a predefined table structure with three columns:

   * `topic` (`TEXT`): Source identifier
   * `id` (`JSON`): Unique record identifier
   * `record` (`JSON`): Complete record data

3. JSON-Based Offset Management: The `information_schema.PIPELINES_SOURCE_OFFSETS` table tracks offsets in JSON format, which supports the complex offset structures that different connectors require.

4. Multi-Task Support: Pipelines can spawn multiple tasks for parallel processing when the data source supports partitioning (for example, Kinesis shards).

## How Kafka Connect Pipelines Work

When a Kafka Connect pipeline is created, SingleStore performs the following:

* Checks the connector class and configuration parameters
* If using `CREATE INFERRED PIPELINE`, automatically creates an inferred table with the static schema
* Launches the extractor process on leaf nodes
* Ingests data from the external data source and loads it into the table
* Stores offset information in `PIPELINES_SOURCE_OFFSETS` to support exactly-once semantics

## Deploy Kafka Connect Connectors

SingleStore supports custom Kafka Connect connectors. Users deploy and configure these connectors themselves, with complete control over configuration and management, and are responsible for ongoing maintenance and updates.

## Enable Kafka Connect Pipelines

Kafka Connect Pipelines is an experimental feature that must be explicitly enabled. Run the following command to enable this feature:

```sql
SET GLOBAL experimental_features_config = "kafka_connect_enabled=true";
```

> **📝 Note**: This setting must be configured before creating Kafka Connect Pipelines and requires the `SUPER` permission. The setting persists across cluster restarts and changes take effect immediately.

Run the following command to confirm that the feature is enabled:

```sql
SHOW VARIABLES LIKE 'experimental_features_config';
```

```output

+------------------------------+----------------------------+
|        Variable_name         |            Value           |
+------------------------------+----------------------------+
| experimental_features_config | kafka_connect_enabled=true |
+------------------------------+----------------------------+

```

After enabling Kafka Connect Pipelines, start the Kafka Connect source connector manually using the provided configuration.

## Syntax

```sql
CREATE [OR REPLACE] INFERRED PIPELINE <pipeline_name>
AS LOAD DATA KAFKACONNECT '<kafka_connector>'
CONFIG '<connector_configuration_json>'
CREDENTIALS '<credentials_json>'
FORMAT AVRO;
```

## CONFIG Parameter

The `CONFIG` parameter must contain a JSON object with the following:

* Required fields:
  * `connector.class`: The fully qualified Java class name of the Kafka Connect source connector.
* Common optional fields:
  * `tasks.max`: The maximum number of parallel tasks. The default value is `4`.
* Connector-specific fields: Vary by connector type.

Refer to [`CREATE PIPELINE`](https://docs.singlestore.com/db/v9.1/reference/sql-reference/pipelines-commands/create-pipeline.md) and [`CREATE INFERRED PIPELINE`](https://docs.singlestore.com/db/v9.1/reference/sql-reference/pipelines-commands/create-inferred-pipeline.md) for more information.

## Offset Management

Kafka Connect Pipelines use JSON-based offsets stored in `information_schema.PIPELINES_SOURCE_OFFSETS`, unlike native Kafka pipelines, which use integer-based offsets. Run the following query to view the offsets of a pipeline:

```sql
SELECT * FROM information_schema.PIPELINES_SOURCE_OFFSETS
WHERE PIPELINE_NAME = '<pipeline_name>';
```

## Offset Format Examples

The following shows the format of the `KEY` and `VALUE` of an Amazon Kinesis offset:

```json
KEY: {"shardId":"shardId-XXXX"}
VALUE: {"sequenceNumber":"XXXX"}

```
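Because the offsets are JSON, individual fields can be pulled out with the same JSON functions that are used for record data. The following is a minimal sketch for a Kinesis pipeline. It assumes that `KEY` and `VALUE` are the column names in `PIPELINES_SOURCE_OFFSETS` (quoted with backticks because `KEY` is a reserved word) and that `<pipeline_name>` is replaced with the name of an existing pipeline.

```sql
-- Assumption: `KEY` and `VALUE` are the offset columns and hold JSON text
SELECT
  TASK_ID,
  JSON_EXTRACT_STRING(`KEY`, 'shardId') AS shard_id,
  JSON_EXTRACT_STRING(`VALUE`, 'sequenceNumber') AS sequence_number
FROM information_schema.PIPELINES_SOURCE_OFFSETS
WHERE PIPELINE_NAME = '<pipeline_name>';
```

Running this query at intervals and comparing `sequence_number` per shard is one way to see whether each shard is making progress.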

## Manage Kafka Connect Pipelines

## Pipeline Lifecycle Operations

Refer to [The Lifecycle of a Pipeline](https://docs.singlestore.com/db/v9.1/load-data/about-singlestore-pipelines/pipeline-concepts/the-lifecycle-of-a-pipeline.md) for more information.
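As a quick reference, the most common lifecycle operations are sketched below using the standard pipeline commands. The pipeline name `kinesis_pipeline` is a placeholder, and it is an assumption that each command behaves the same way for Kafka Connect Pipelines while the feature is in preview.

```sql
-- Start ingesting data in the background
START PIPELINE kinesis_pipeline;

-- Pause ingestion; the pipeline definition is kept
STOP PIPELINE kinesis_pipeline;

-- Remove the pipeline definition
DROP PIPELINE kinesis_pipeline;
```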

## Check Pipeline Status

```sql
SELECT
  PIPELINE_NAME,
  STATE,
  CONFIG_JSON
FROM information_schema.PIPELINES
WHERE PIPELINE_NAME = '<pipeline_name>';
```

The following are the pipeline states:

* Running: Pipeline is actively ingesting data
* Stopped: Pipeline is stopped
* Error: Pipeline encountered an error
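When a pipeline reports the `Error` state, the error details are usually the next thing to look at. The following is a minimal sketch, assuming that Kafka Connect Pipelines record their errors in `information_schema.PIPELINES_ERRORS` like other pipeline types. Replace `<pipeline_name>` with the name of an existing pipeline.

```sql
-- List pipelines that are not currently running
SELECT PIPELINE_NAME, STATE
FROM information_schema.PIPELINES
WHERE STATE <> 'Running';

-- Show the most recent errors for one pipeline
SELECT ERROR_TYPE, ERROR_CODE, ERROR_MESSAGE
FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = '<pipeline_name>'
ORDER BY ERROR_UNIX_TIMESTAMP DESC
LIMIT 10;
```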

## Configuration Best Practices

## Task Configuration

Parallel processing with `tasks.max`:

* Configure `tasks.max` based on data source partitioning
* For Amazon Kinesis: Set `tasks.max` equal to the number of shards
* Monitor `TASK_ID` distribution in `PIPELINES_SOURCE_OFFSETS`

```sql
-- Check task distribution
SELECT 
  TASK_ID,
  COUNT(*) as offset_count
FROM information_schema.PIPELINES_SOURCE_OFFSETS
WHERE PIPELINE_NAME = '<pipeline_name>'
GROUP BY TASK_ID;

```

## Security Best Practices

1. Credential Storage: Always use the `CREDENTIALS` parameter for sensitive information, and never include passwords in `CONFIG`.

2. Network Security: Ensure secure connections to data sources (use SSL/TLS when available)

3. Access Control: Grant minimum required permissions to pipeline users

4. Audit Logging: Enable logging for pipeline operations and monitor access
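
For the access control recommendation, a least-privilege grant might look like the following sketch. The database name `ingest_db` and the user `pipeline_user` are hypothetical, and the exact set of permissions that a Kafka Connect Pipeline needs is an assumption. An inferred pipeline also creates a table, so additional permissions such as `CREATE TABLE` may be required.

```sql
-- Allow the user to create, start, and inspect pipelines in one database only
GRANT CREATE PIPELINE, START PIPELINE, SHOW PIPELINE
ON ingest_db.* TO 'pipeline_user'@'%';
```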

## Performance Optimization

1. Configure Extraction Parameters: Use `SET` statements to tune the extraction performance:
   ```sql
   SET GLOBAL pipelines_extractor_batch_size = 10000;
   SET GLOBAL pipelines_extractor_max_batch_interval_ms = 1000;
   ```

2. Computed Columns: Create computed columns for frequently accessed JSON fields
   ```sql
   ALTER TABLE <pipeline_table>
   ADD COLUMN customer_id AS (JSON_EXTRACT_BIGINT(record, 'customer_id')) PERSISTED BIGINT;
   ```

3. Indexes: Add indexes on computed columns for better query performance
   ```sql
   CREATE INDEX idx_customer_id ON <pipeline_table>(customer_id);
   ```

4. Monitor Batch Times: Track batch processing time and adjust configuration if needed

5. Offset Progress: Regularly check `PIPELINES_SOURCE_OFFSETS` to ensure offsets are advancing
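
Batch processing times can be inspected through `information_schema.PIPELINES_BATCHES_SUMMARY`. The following is a sketch that assumes Kafka Connect Pipelines report their batches there in the same way as other pipeline types. Replace `<pipeline_name>` with the name of an existing pipeline.

```sql
-- Show the 20 most recent batches with their duration and row counts
SELECT BATCH_ID, BATCH_STATE, START_TIME, BATCH_TIME, ROWS_STREAMED
FROM information_schema.PIPELINES_BATCHES_SUMMARY
WHERE PIPELINE_NAME = '<pipeline_name>'
ORDER BY START_TIME DESC
LIMIT 20;
```

A steadily growing `BATCH_TIME` with a flat `ROWS_STREAMED` is a hint that the extraction settings or `tasks.max` may need adjusting.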

## Examples

The following examples demonstrate how to ingest data from Amazon Kinesis.

## Example: Amazon Kinesis Pipeline

The following example demonstrates how to create a basic Kafka Connect Pipeline that automatically creates a table with a static schema and then ingests data from Amazon Kinesis into the table.

```sql
-- Enable the experimental feature
SET GLOBAL experimental_features_config = "kafka_connect_enabled=true";

-- Create Kinesis pipeline
CREATE INFERRED PIPELINE kinesis_pipeline 
AS LOAD DATA KAFKACONNECT 'kafka-connector'
CONFIG '{
  "connector.class": "com.singlestore.kafka.connect.kinesis.KinesisSourceConnector",
  "aws.access.key.id": "<aws_access_key>",
  "aws.secret.access.key": "<aws_secret_key>",
  "aws.session.token": "<aws_session_token>",
  "kafka.topic": "kinesis-topic",
  "kinesis.stream": "my-kinesis-stream",
  "kinesis.region": "us-east-1",
  "tasks.max": 4
}'
CREDENTIALS '{}' -- AWS credentials are in CONFIG for Kinesis connector
FORMAT AVRO;

-- Start the pipeline
START PIPELINE kinesis_pipeline;
```

## Static Schema Table

When an inferred Kafka Connect Pipeline is created, SingleStore automatically creates a table:

```sql
CREATE TABLE `kinesis_pipeline` (
`topic` TEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
`id` JSON COLLATE utf8mb4_bin NOT NULL,
`record` JSON COLLATE utf8mb4_bin NOT NULL,
SORT KEY `__UNORDERED` (),
SHARD KEY ()
)
AUTOSTATS_CARDINALITY_MODE = INCREMENTAL
AUTOSTATS_HISTOGRAM_MODE = CREATE
AUTOSTATS_SAMPLING = ON
SQL_MODE = 'STRICT_ALL_TABLES,NO_AUTO_CREATE_USER'
CHARACTER SET = utf8mb4
COLLATE = utf8mb4_bin;

```
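
Once the pipeline is running, a row count per `topic` is a quick way to confirm that data is arriving in the static schema table. This sketch uses the `kinesis_pipeline` table from the example above.

```sql
-- Count ingested rows per source topic
SELECT topic, COUNT(*) AS row_count
FROM kinesis_pipeline
GROUP BY topic;
```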

## Querying Data

Extract data from the JSON `record` column using JSON functions:

```sql
-- Extract specific fields
SELECT
 topic,
 JSON_EXTRACT_STRING(record, 'partitionKey') AS partition_key,
 JSON_EXTRACT_STRING(record, 'sequenceNumber') AS seq_no,
 JSON_EXTRACT_STRING(record, 'shardId') AS shard_id
FROM kinesis_pipeline;

```

```output

+----------------+---------------+------------------------------------------------------------+------------------------+
| topic          | partition_key | seq_no                                                     | shard_id               |
+----------------+---------------+------------------------------------------------------------+------------------------+
| kinesis-topic  | user-c        | 49673425577497259999258948817398122652610324249597444098   | shardId-000000000000   |
| kinesis-topic  | user-b        | 49673425577497259999258948817396913726790709620422737922   | shardId-000000000000   |
| kinesis-topic  | user-b        | 49673425577497259999258948817399331578429938878772150274   | shardId-000000000000   |
+----------------+---------------+------------------------------------------------------------+------------------------+
```

```sql
-- Filter based on JSON content
SELECT *
FROM kinesis_pipeline
WHERE JSON_EXTRACT_STRING(record, 'partitionKey') = 'user-c';

```

```output

+----------------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| topic          | id                               | record                                                                                                                                                                                                                                                          |
+----------------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| kinesis-topic  | {"partitionKey":"user-c"}        | {"approximateArrivalTimestamp":1775700271554,"data":"eyJzZXEiOjQsIm1zZyI6ImZvdXJ0aCJ9","partitionKey":"user-c","sequenceNumber":"49673425577497259999258948817398122652610324249597444098","shardId":"shardId-000000000000","streamName":"kinesis-data-stream"} |
+----------------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

```

```sql
-- Extract multiple fields
SELECT
  topic,

  -- id envelope
  JSON_EXTRACT_STRING(id, 'partitionKey') AS id_partition_key,

  -- outer record (Kinesis metadata)
  JSON_EXTRACT_STRING(record, 'partitionKey')   AS partition_key,
  JSON_EXTRACT_STRING(record, 'sequenceNumber') AS sequence_number,
  JSON_EXTRACT_STRING(record, 'shardId')        AS shard_id,
  JSON_EXTRACT_STRING(record, 'streamName')     AS stream_name,
  JSON_EXTRACT_BIGINT(record, 'approximateArrivalTimestamp') AS arrival_ts_ms,

  -- inner payload (decoded from record.data)
  JSON_EXTRACT_BIGINT(
    FROM_BASE64(JSON_EXTRACT_STRING(record, 'data')),
    'seq'
  ) AS seq,
  JSON_EXTRACT_STRING(
    FROM_BASE64(JSON_EXTRACT_STRING(record, 'data')),
    'msg'
  ) AS msg

FROM kinesis_pipeline;

```

```output

+----------------+------------------+---------------+------------------------------------------------------------+------------------------+---------------------+---------------+-----+--------+
| topic          | id_partition_key | partition_key | sequence_number                                            | shard_id               | stream_name         | arrival_ts_ms | seq | msg    |
+----------------+------------------+---------------+------------------------------------------------------------+------------------------+---------------------+---------------+-----+--------+
| kinesis-topic  | user-c           | user-c        | 49673425577497259999258948817398122652610324249597444098   | shardId-000000000000   | kinesis-data-stream | 1775700271554 | 4   | fourth |
| kinesis-topic  | user-b           | user-b        | 49673425577497259999258948817396913726790709620422737922   | shardId-000000000000   | kinesis-data-stream | 1775700271554 | 2   | second |
| kinesis-topic  | user-b           | user-b        | 49673425577497259999258948817399331578429938878772150274   | shardId-000000000000   | kinesis-data-stream | 1775700271554 | 5   | fifth  |
+----------------+------------------+---------------+------------------------------------------------------------+------------------------+---------------------+---------------+-----+--------+

```

```sql
-- Create computed columns for better performance
ALTER TABLE kinesis_pipeline ADD COLUMN partition_key
  AS JSON_EXTRACT_STRING(record, 'partitionKey') PERSISTED VARCHAR(128);

ALTER TABLE kinesis_pipeline ADD COLUMN seq
  AS JSON_EXTRACT_BIGINT(
    FROM_BASE64(JSON_EXTRACT_STRING(record, 'data')),
    'seq'
  ) PERSISTED BIGINT;

ALTER TABLE kinesis_pipeline ADD COLUMN msg
  AS JSON_EXTRACT_STRING(
    FROM_BASE64(JSON_EXTRACT_STRING(record, 'data')),
    'msg'
  ) PERSISTED VARCHAR(512);

```

```sql
-- Query using the computed columns
SELECT topic, partition_key, seq, msg, record
FROM kinesis_pipeline
WHERE seq > 3
ORDER BY seq;

```

```output

+----------------+---------------+-----+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| topic          | partition_key | seq | msg    | record                                                                                                                                                                                                                                                          |
+----------------+---------------+-----+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| kinesis-topic  | user-c        | 4   | fourth | {"approximateArrivalTimestamp":1775700271554,"data":"eyJzZXEiOjQsIm1zZyI6ImZvdXJ0aCJ9","partitionKey":"user-c","sequenceNumber":"49673425577497259999258948817398122652610324249597444098","shardId":"shardId-000000000000","streamName":"kinesis-data-stream"} |
| kinesis-topic  | user-b        | 5   | fifth  | {"approximateArrivalTimestamp":1775700271554,"data":"eyJzZXEiOjUsIm1zZyI6ImZpZnRoIn0=","partitionKey":"user-b","sequenceNumber":"49673425577497259999258948817399331578429938878772150274","shardId":"shardId-000000000000","streamName":"kinesis-data-stream"} |
+----------------+---------------+-----+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

```
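
If several queries need the same decoded fields, the extraction logic can also be wrapped in a view. The following is a sketch in which the view name `kinesis_events` is hypothetical. Unlike persisted computed columns, the expressions in a view are evaluated at query time.

```sql
-- Hypothetical view that flattens the Kinesis envelope and decodes the payload
CREATE VIEW kinesis_events AS
SELECT
  topic,
  JSON_EXTRACT_STRING(record, 'partitionKey') AS partition_key,
  JSON_EXTRACT_STRING(record, 'shardId') AS shard_id,
  JSON_EXTRACT_BIGINT(record, 'approximateArrivalTimestamp') AS arrival_ts_ms,
  JSON_EXTRACT_STRING(
    FROM_BASE64(JSON_EXTRACT_STRING(record, 'data')),
    'msg'
  ) AS msg
FROM kinesis_pipeline;

-- Query the view like a regular table
SELECT * FROM kinesis_events
WHERE shard_id = 'shardId-000000000000';
```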

***

Modified at: April 22, 2026

Source: [/db/v9.1/load-data/about-singlestore-pipelines/pipeline-concepts/kafka-connect-pipelines/](https://docs.singlestore.com/db/v9.1/load-data/about-singlestore-pipelines/pipeline-concepts/kafka-connect-pipelines/)

