Load Data from Amazon Kinesis Using a Kafka Connect Pipeline

SingleStore pipelines can extract streaming data from Amazon Kinesis Data Streams using Kafka Connect source connectors, optionally transform it, and insert it into a destination table. SingleStore Kafka Connect Pipelines leverage the Kafka Connect ecosystem to stream data from external systems into SingleStore without requiring an intermediate Kafka cluster.

Prerequisites

To complete this guide, your environment must meet the following prerequisites:

  • AWS Account: This guide uses Amazon Kinesis and requires an AWS account, along with an access key ID and secret access key for an IAM user that can read the stream (created in Part 2).

  • SingleStore installation -or- SingleStore Helios workspace: You will connect to the cluster or workspace and create a pipeline to pull data from your Amazon Kinesis Data Stream.

  • Kafka Connect Pipelines enabled: This is an experimental feature that must be explicitly enabled by a user with the SUPER permission before creating pipelines.

Part 1: Enable Kafka Connect Pipelines

Run the following command to enable this feature:

SET GLOBAL experimental_features_config = "kafka_connect_enabled=true";

Note

This setting must be configured before creating Kafka Connect Pipelines and requires the SUPER permission. The setting persists across cluster restarts and changes take effect immediately.

Verify that the feature is enabled:

SHOW VARIABLES LIKE 'experimental_features_config';
+------------------------------+----------------------------+
|        Variable_name         |            Value           |
+------------------------------+----------------------------+
| experimental_features_config | kafka_connect_enabled=true |
+------------------------------+----------------------------+

Part 2: Set Up Amazon Kinesis Data Stream

Create a Kinesis Data Stream

  1. Log into the AWS Management Console.

  2. Navigate to Kinesis.

  3. Select Data Streams from the left navigation menu.

  4. Select Create data stream.

  5. Enter a stream name (e.g., my-kinesis-stream).

  6. Select the capacity mode:

    • On-demand: Automatically scales based on throughput

    • Provisioned: Specify the number of shards

  7. Select Create data stream.

Note the following information for later use:

  • Stream name (e.g., my-kinesis-stream)

  • AWS Region (e.g., us-east-1)

  • Number of shards (used later to set the pipeline's tasks.max for optimal performance)

Generate AWS Credentials

To access your Kinesis Data Stream, you need AWS credentials with appropriate permissions.

Required IAM Permissions

The following minimum permissions are required:

  • kinesis:GetRecords

  • kinesis:GetShardIterator

  • kinesis:DescribeStream

  • kinesis:ListShards

Create an IAM Policy

  1. In the AWS Management Console, select IAM from the list of services.

  2. Under Access Management, select Policies, and then select Create policy.

  3. Select the JSON tab and enter the following policy (replace <stream-name> with your stream name):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "KinesisReadAccess",
          "Effect": "Allow",
          "Action": [
            "kinesis:GetRecords",
            "kinesis:GetShardIterator",
            "kinesis:DescribeStream",
            "kinesis:ListShards"
          ],
          "Resource": "arn:aws:kinesis:*:*:stream/<stream-name>"
        }
      ]
    }
  4. Select Next and enter a policy name (e.g., SingleStoreKinesisReadPolicy).

  5. Select Create policy.

Assign the IAM Policy to a User

  1. In the IAM service, select Users and then select Add users.

  2. Enter a name for the new user and select Next.

  3. Select Attach policies directly.

  4. Search for the policy you created and select the checkbox next to it.

  5. Select Next and then select Create user.

Create Access Keys

  1. In the IAM service, select Users and select the user name you created.

  2. Select the Security credentials tab.

  3. In the Access keys section, select Create access key.

  4. Select Third-party service and select Next.

  5. (Optional but recommended) Add a description tag.

  6. Select Create access key.

  7. Download the CSV file or copy the credentials. You will need:

    • Access key ID

    • Secret access key

Note

If you do not download or copy the credentials before selecting Done, the secret access key cannot be retrieved later and you will need to create a new access key.

Part 3: Create a SingleStore Database and Kinesis Pipeline

Now that you have a Kinesis Data Stream configured, you can create a SingleStore database and pipeline to ingest the streaming data.

Create the Database

Create a new database to hold your data:

CREATE DATABASE kinesis_data;
USE kinesis_data;

Deploy the Kafka Connect Connector

To deploy and configure custom Kafka Connect connectors, contact SingleStore Support with your connector requirements.

Create the Kinesis Pipeline

Use the following information to create your pipeline:

  • Stream name: my-kinesis-stream

  • AWS Region: us-east-1

  • Access Key ID: <your_access_key_id>

  • Secret Access Key: <your_secret_access_key>

  • Number of shards: (match your Kinesis stream configuration)

Run the following command by replacing the placeholder values with your own:

CREATE INFERRED PIPELINE kinesis_pipeline
AS LOAD DATA KAFKACONNECT 'kafka-connector'
CONFIG '{
  "connector.class": "com.github.jcustenborder.kafka.connect.kinesis.KinesisSourceConnector",
  "aws.access.key.id": "<your_access_key_id>",
  "aws.secret.key.id": "<your_secret_access_key>",
  "kafka.topic": "kinesis-topic",
  "kinesis.stream": "my-kinesis-stream",
  "kinesis.region": "us-east-1",
  "tasks.max": 4
}'
CREDENTIALS '{}'
FORMAT AVRO;

Important configuration notes:

  • connector.class: Fully-qualified Java class name of the Kafka Connect source connector

  • tasks.max: Set this equal to the number of shards in your Kinesis stream for optimal performance. The default value is 4.

  • kafka.topic: A logical identifier for the data source (does not require an actual Kafka topic)

  • kinesis.region: AWS region where your Kinesis stream is located

  • Credentials: AWS credentials must be placed in the CONFIG parameter for Kinesis; the CREDENTIALS parameter can remain an empty JSON object ('{}').

Static Schema Table

When an inferred Kafka Connect Pipeline is created, SingleStore automatically creates a table with a predefined structure:

CREATE TABLE `kinesis_pipeline` (
  `topic` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `id` JSON COLLATE utf8mb4_bin NOT NULL,
  `record` JSON COLLATE utf8mb4_bin NOT NULL,
  SORT KEY `__UNORDERED` (),
  SHARD KEY ()
)

The table contains three columns:

  • topic: Source identifier (TEXT)

  • id: Unique record identifier (JSON)

  • record: Complete record data (JSON)

This static schema allows SingleStore to ingest data from various sources without requiring predefined table schemas.
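
Because the payload lands in the JSON record column, standard SingleStore JSON functions can be used to pull out individual fields at query time. The following query is a minimal sketch; the field names (partitionKey, data) are assumptions based on a typical Kinesis source connector payload and should be adjusted to match the records your connector emits:

SELECT
  topic,
  -- field names below are illustrative; adjust to your connector's record schema
  JSON_EXTRACT_STRING(record, 'partitionKey') AS partition_key,
  JSON_EXTRACT_STRING(record, 'data') AS payload
FROM kinesis_pipeline
LIMIT 10;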

Start the Pipeline

You can run the pipeline in the foreground or background.

Start in the Foreground

To test the pipeline and load existing data, run the following command:

START PIPELINE kinesis_pipeline FOREGROUND;

This command runs synchronously and returns when all available records have been loaded.
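
Once the foreground run returns, a quick sanity check (a minimal example, assuming the stream already contained records) confirms that data reached the table:

SELECT COUNT(*) FROM kinesis_pipeline;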

Start in the Background

For continuous streaming, run the following command:

START PIPELINE kinesis_pipeline;

This command runs the pipeline in the background, continuously polling Kinesis for new records.
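
While the pipeline runs in the background, per-batch progress can be tracked through information_schema.PIPELINES_BATCHES_SUMMARY. The query below is a minimal sketch; available columns can vary by SingleStore version:

SELECT BATCH_ID, BATCH_STATE, ROWS_STREAMED
FROM information_schema.PIPELINES_BATCHES_SUMMARY
WHERE PIPELINE_NAME = 'kinesis_pipeline'
ORDER BY BATCH_ID DESC
LIMIT 5;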

Verify Pipeline Status

Check the pipeline status:

SHOW PIPELINES;
+---------------------------+---------+
| Pipelines_in_kinesis_data |  State  |
+---------------------------+---------+
|      kinesis_pipeline     | Running |
+---------------------------+---------+

Run the following command to query detailed pipeline information:

SELECT
  PIPELINE_NAME,
  STATE,
  CONFIG_JSON
FROM information_schema.PIPELINES
WHERE PIPELINE_NAME = 'kinesis_pipeline';
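
If the pipeline is not in the Running state or expected rows are missing, the information_schema.PIPELINES_ERRORS view records per-batch errors. The following is a minimal troubleshooting query (the column list is trimmed; adjust as needed):

SELECT ERROR_TYPE, ERROR_MESSAGE, BATCH_ID
FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'kinesis_pipeline';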
