# Load Data from Amazon Kinesis Using a Kafka Connect Pipeline

SingleStore pipelines can extract streaming data from Amazon Kinesis Data Streams using Kafka Connect source connectors, optionally transform it, and insert it into a destination table. SingleStore Kafka Connect Pipelines leverage the Kafka Connect ecosystem to stream data from external systems into SingleStore without requiring an intermediate Kafka cluster.

## Prerequisites

To complete this guide, your environment must meet the following prerequisites:

* **AWS Account**: This guide uses Amazon Kinesis and requires an AWS account's access key ID and secret access key.
* **SingleStore installation or cluster**: You will connect to the cluster and create a pipeline to pull data from your Amazon Kinesis Data Stream.
* **Kafka Connect Pipelines enabled**: This is an experimental feature that must be explicitly enabled by a user with the `SUPER` permission before creating pipelines.

## Part 1: Enable Kafka Connect Pipelines

Run the following command to enable this feature:

```sql
SET GLOBAL experimental_features_config = "kafka_connect_enabled=true";
```

> **📝 Note**: This setting must be configured before creating Kafka Connect Pipelines and requires the `SUPER` permission. The setting persists across cluster restarts and changes take effect immediately.

Verify that the feature is enabled:

```sql
SHOW VARIABLES LIKE 'experimental_features_config';
```

```output
+------------------------------+----------------------------+
|        Variable_name         |            Value           |
+------------------------------+----------------------------+
| experimental_features_config | kafka_connect_enabled=true |
+------------------------------+----------------------------+
```

## Part 2: Set Up Amazon Kinesis Data Stream

### Create a Kinesis Data Stream

1. Log into the AWS Management Console.

2. Navigate to **Kinesis**.

3. Select **Data Streams** from the left navigation menu.

4. Select **Create data stream**.

5. Enter a stream name (e.g., my-kinesis-stream).

6. Select the capacity mode:

   * **On-demand**: Automatically scales based on throughput
   * **Provisioned**: Specify the number of shards

7. Select **Create data stream**.

Note the following information for later use:

* Stream name (e.g., `my-kinesis-stream`)
* AWS Region (e.g., `us-east-1`)
* Number of shards (used later to set `tasks.max` for optimal pipeline performance)

### Generate AWS Credentials

To access your Kinesis Data Stream, you need AWS credentials with appropriate permissions.

#### Required IAM Permissions

The following minimum permissions are required:

* `kinesis:GetRecords`
* `kinesis:GetShardIterator`
* `kinesis:DescribeStream`
* `kinesis:ListShards`

#### Create an IAM Policy

1. In the AWS Management Console, select **IAM** from the list of services.

2. Under **Access Management**, select **Policies**, and then select **Create policy**.

3. Select the **JSON** tab and enter the following policy (replace `<stream-name>` with your stream name):
   ```json
   {
     "Version": "2012-10-17",
     "Statement": [
       {
         "Sid": "KinesisReadAccess",
         "Effect": "Allow",
         "Action": [
           "kinesis:GetRecords",
           "kinesis:GetShardIterator",
           "kinesis:DescribeStream",
           "kinesis:ListShards"
         ],
         "Resource": "arn:aws:kinesis:*:*:stream/<stream-name>"
       }
     ]
   }
   ```

4. Select **Next** and enter a policy name (e.g., `SingleStoreKinesisReadPolicy`).

5. Select **Create policy**.

#### Assign the IAM Policy to a User

1. In the **IAM** service, select **Users** and then select **Add users**.

2. Enter a name for the new user and select **Next**.

3. Select **Attach policies directly**.

4. Search for the policy you created and select the checkbox next to it.

5. Select **Next** and then select **Create user**.

#### Create Access Keys

1. In the **IAM** service, select **Users** and select the user name you created.

2. Select the **Security credentials** tab.

3. In the **Access keys** section, select **Create access key**.

4. Select **Third-party service** and select **Next**.

5. (Optional but recommended) Add a description tag.

6. Select **Create access key**.

7. Download the CSV file or copy the credentials. You will need:

   * Access key ID
   * Secret access key

> **📝 Note**: If you do not download or copy the credentials before selecting **Done**, the secret key cannot be retrieved and will need to be recreated.

## Part 3: Create a SingleStore Database and Kinesis Pipeline

Now that you have a Kinesis Data Stream configured, you can create a SingleStore database and pipeline to ingest the streaming data.

### Create the Database

Create a new database to hold your data:

```sql
CREATE DATABASE kinesis_data;
USE kinesis_data;
```

### Deploy the Kafka Connect Connector

SingleStore provides full support for custom Kafka Connect connectors. Organizations deploy and configure the connectors themselves, which gives them complete control over configuration and management, and they are also responsible for ongoing maintenance and updates.

### Create the Kinesis Pipeline

Use the following information to create your pipeline:

* **Stream name**: `my-kinesis-stream`
* **AWS Region**: `us-east-1`
* **Access Key ID**: `<your_access_key_id>`
* **Secret Access Key**: `<your_secret_access_key>`
* **Number of shards**: (match your Kinesis stream configuration)

Run the following command, replacing the placeholder values with your own:

```sql
CREATE INFERRED PIPELINE kinesis_pipeline
AS LOAD DATA KAFKACONNECT 'kafka-connector'
CONFIG '{
  "connector.class": "com.github.jcustenborder.kafka.connect.kinesis.KinesisSourceConnector",
  "aws.access.key.id": "<your_access_key_id>",
  "aws.secret.key.id": "<your_secret_access_key>",
  "kafka.topic": "kinesis-topic",
  "kinesis.stream": "my-kinesis-stream",
  "kinesis.region": "us-east-1",
  "tasks.max": 4
}'
CREDENTIALS '{}'
FORMAT AVRO;
```

Important configuration notes:

* `connector.class`: Fully-qualified Java class name of the Kafka Connect source connector
* `tasks.max`: Set this equal to the number of shards in your Kinesis stream for optimal performance. The default value is `4`.
* `kafka.topic`: A logical identifier for the data source (does not require an actual Kafka topic)
* `kinesis.region`: AWS region where your Kinesis stream is located
* Credentials: For Kinesis, AWS credentials must be placed in the `CONFIG` parameter. The `CREDENTIALS` parameter can remain empty.

### Static Schema Table

When an inferred Kafka Connect Pipeline is created, SingleStore automatically creates a table with a predefined structure:

```sql
CREATE TABLE `kinesis_pipeline` (
  `topic` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `id` JSON COLLATE utf8mb4_bin NOT NULL,
  `record` JSON COLLATE utf8mb4_bin NOT NULL,
  SORT KEY `__UNORDERED` (),
  SHARD KEY ()
);
```

The table contains three columns:

* `topic`: Source identifier (`TEXT`)
* `id`: Unique record identifier (`JSON`)
* `record`: Complete record data (`JSON`)

This static schema allows SingleStore to ingest data from various sources without requiring predefined table schemas.
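Because each row stores the full payload in the `record` JSON column, individual fields can be extracted at query time with SingleStore's `::` JSON operators. The following is a minimal sketch, not a definitive query: `partitionKey` is a hypothetical field name used only for illustration, so inspect a few rows first to see which keys your connector actually writes.

```sql
-- Inspect a few raw rows to see what the connector wrote.
SELECT topic, id, record
FROM kinesis_pipeline
LIMIT 5;

-- Extract one field as a string with the ::$ operator.
-- `partitionKey` is an assumed field name; replace it with a key
-- that appears in your own `record` values.
SELECT record::$partitionKey AS partition_key
FROM kinesis_pipeline
LIMIT 5;
```

The `::$` form returns the value as a string; `::` alone returns it as JSON, which is useful for nested objects.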

### Start the Pipeline

You can run the pipeline in the foreground or background.

#### Start in the Foreground

To test the pipeline and load existing data, run the following command:

```sql
START PIPELINE kinesis_pipeline FOREGROUND;
```

This command runs synchronously and returns when all available records have been loaded.
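Once the foreground run returns, a quick row count is one way to confirm that records arrived. This sketch assumes the inferred destination table is named `kinesis_pipeline`, as in the table definition in this guide:

```sql
-- Count loaded records per source identifier.
SELECT topic, COUNT(*) AS records
FROM kinesis_pipeline
GROUP BY topic;
```

An empty result suggests that the stream had no records available or that the pipeline could not read from it.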

#### Start in the Background

For continuous streaming, run the following command:

```sql
START PIPELINE kinesis_pipeline;
```

This command runs the pipeline in the background, continuously polling Kinesis for new records.
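A background pipeline keeps running until it is stopped. The sketch below assumes that Kafka Connect Pipelines accept the same `STOP PIPELINE` statement as other SingleStore pipelines; because the feature is experimental, verify this on your version.

```sql
-- Pause ingestion; rows that were already loaded stay in the table.
STOP PIPELINE kinesis_pipeline;

-- Resume background ingestion later.
START PIPELINE kinesis_pipeline;
```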

### Verify Pipeline Status

Check the pipeline status:

```sql
SHOW PIPELINES;
```

```output
+---------------------------+---------+
| Pipelines_in_kinesis_data |  State  |
+---------------------------+---------+
|      kinesis_pipeline     | Running |
+---------------------------+---------+
```

Run the following command to query detailed pipeline information:

```sql
SELECT
  PIPELINE_NAME,
  STATE,
  CONFIG_JSON
FROM information_schema.PIPELINES
WHERE PIPELINE_NAME = 'kinesis_pipeline';
```
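If the pipeline is not in the `Running` state or no rows arrive, the `information_schema.PIPELINES_ERRORS` view is a reasonable next place to look. This sketch assumes that Kafka Connect Pipelines report errors to the same view as other pipeline types:

```sql
-- List the most recent errors recorded for this pipeline.
SELECT
  PIPELINE_NAME,
  ERROR_UNIX_TIMESTAMP,
  ERROR_TYPE,
  ERROR_CODE,
  ERROR_MESSAGE
FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'kinesis_pipeline'
ORDER BY ERROR_UNIX_TIMESTAMP DESC
LIMIT 10;
```

Authentication or permission problems with the AWS credentials would typically surface in `ERROR_MESSAGE`.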

***

Modified at: February 19, 2026

Source: [/db/v9.1/load-data/data-sources/load-data-from-amazon-kinesis-using-a-kafka-connect-pipeline/](https://docs.singlestore.com/db/v9.1/load-data/data-sources/load-data-from-amazon-kinesis-using-a-kafka-connect-pipeline/)

