Load Data from Amazon Kinesis Using a Kafka Connect Pipeline
SingleStore pipelines can extract streaming data from Amazon Kinesis Data Streams using Kafka Connect source connectors, optionally transform the records, and insert them into a destination table.
Prerequisites
To complete this guide, your environment must meet the following prerequisites:
- AWS Account: This guide uses Amazon Kinesis and requires an AWS account's access key ID and secret access key.
- SingleStore installation -or- a SingleStore cluster: You will connect to the cluster and create a pipeline to pull data from your Amazon Kinesis Data Stream.
- Kafka Connect Pipelines enabled: This is an experimental feature that must be explicitly enabled by a user with the SUPER permission before creating pipelines.
Part 1: Enable Kafka Connect Pipelines
Run the following command to enable this feature:
SET GLOBAL experimental_features_config = "kafka_connect_enabled=true"
Note
This setting must be configured before creating Kafka Connect Pipelines and requires the SUPER permission.
Verify that the feature is enabled:
SHOW VARIABLES LIKE 'experimental_features_config'
+------------------------------+----------------------------+
| Variable_name                | Value                      |
+------------------------------+----------------------------+
| experimental_features_config | kafka_connect_enabled=true |
+------------------------------+----------------------------+
Part 2: Set Up Amazon Kinesis Data Stream
Create a Kinesis Data Stream
- Log into the AWS Management Console.
- Navigate to Kinesis.
- Select Data Streams from the left navigation menu.
- Select Create data stream.
- Enter a stream name (e.g., my-kinesis-stream).
- Select the capacity mode:
  - On-demand: Automatically scales based on throughput
  - Provisioned: Specify the number of shards
- Select Create data stream.
Note the following information for later use:
- Stream name (e.g., my-kinesis-stream)
- AWS Region (e.g., us-east-1)
- Number of shards (for optimal pipeline performance)
Generate AWS Credentials
To access your Kinesis Data Stream, you need AWS credentials with appropriate permissions.
Required IAM Permissions
The following minimum permissions are required:
- kinesis:GetRecords
- kinesis:GetShardIterator
- kinesis:DescribeStream
- kinesis:ListShards
Create an IAM Policy
- In the AWS Management Console, select IAM from the list of services.
- Under Access Management, select Policies, and then select Create policy.
- Select the JSON tab and enter the following policy (replace <stream-name> with your stream name):
  {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Sid": "KinesisReadAccess",
        "Effect": "Allow",
        "Action": [
          "kinesis:GetRecords",
          "kinesis:GetShardIterator",
          "kinesis:DescribeStream",
          "kinesis:ListShards"
        ],
        "Resource": "arn:aws:kinesis:*:*:stream/<stream-name>"
      }
    ]
  }
- Select Next and enter a policy name (e.g., SingleStoreKinesisReadPolicy).
- Select Create policy.
Assign the IAM Policy to a User
- In the IAM service, select Users and then select Add users.
- Enter a name for the new user and select Next.
- Select Attach policies directly.
- Search for the policy you created and select the checkbox next to it.
- Select Next and then select Create user.
Create Access Keys
- In the IAM service, select Users and select the user name you created.
- Select the Security credentials tab.
- In the Access keys section, select Create access key.
- Select Third-party service and select Next.
- (Optional but recommended) Add a description tag.
- Select Create access key.
- Download the CSV file or copy the credentials. You will need:
  - Access key ID
  - Secret access key
Note
If you do not download or copy the credentials before selecting Done, the secret key cannot be retrieved and will need to be recreated.
Part 3: Create a SingleStore Database and Kinesis Pipeline
Now that you have a Kinesis Data Stream configured, you can create a SingleStore database and pipeline to ingest the streaming data.
Create the Database
Create a new database to hold your data:
CREATE DATABASE kinesis_data;
USE kinesis_data;
Deploy the Kafka Connect Connector
SingleStore supports custom Kafka Connect source connectors, giving you full control over how each connector is deployed, configured, and managed.
Create the Kinesis Pipeline
Use the following information to create your pipeline:
- Stream name: my-kinesis-stream
- AWS Region: us-east-1
- Access Key ID: <your_access_key_id>
- Secret Access Key: <your_secret_access_key>
- Number of shards: (match your Kinesis stream configuration)
Run the following command by replacing the placeholder values with your own:
CREATE INFERRED PIPELINE kinesis_pipeline
AS LOAD DATA KAFKACONNECT 'kafka-connector'
CONFIG '{
  "connector.class": "com.github.jcustenborder.kafka.connect.kinesis.KinesisSourceConnector",
  "aws.access.key.id": "<your_access_key_id>",
  "aws.secret.key.id": "<your_secret_access_key>",
  "kafka.topic": "kinesis-topic",
  "kinesis.stream": "my-kinesis-stream",
  "kinesis.region": "us-east-1",
  "tasks.max": 4
}'
CREDENTIALS '{}'
FORMAT AVRO;
Important configuration notes:
- connector.class: Fully-qualified Java class name of the Kafka Connect source connector
- tasks.max: Set this equal to the number of shards in your Kinesis stream for optimal performance; for example, a stream with 8 shards would use "tasks.max": 8. The default value is 4.
- kafka.topic: A logical identifier for the data source (does not require an actual Kafka topic)
- kinesis.region: AWS region where your Kinesis stream is located
- Credentials: AWS credentials must be placed in the CONFIG parameter for Kinesis. The CREDENTIALS parameter can remain empty.
Static Schema Table
When an inferred Kafka Connect Pipeline is created, SingleStore automatically creates a table with a predefined structure:
CREATE TABLE `kinesis_pipeline` (
  `topic` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
  `id` JSON COLLATE utf8mb4_bin NOT NULL,
  `record` JSON COLLATE utf8mb4_bin NOT NULL,
  SORT KEY `__UNORDERED` (),
  SHARD KEY ()
)
The table contains three columns:
- topic: Source identifier (TEXT)
- id: Unique record identifier (JSON)
- record: Complete record data (JSON)
This static schema allows SingleStore to ingest data from various sources without requiring predefined table schemas.
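Because each payload lands in the record JSON column, individual fields can be extracted at query time with SingleStore's JSON operators. The following is a minimal sketch; sensor_id is a hypothetical key standing in for whatever fields your Kinesis records actually contain:
SELECT
  record::$sensor_id AS sensor_id,  -- extract one field as text (hypothetical key name)
  record                            -- full JSON payload
FROM kinesis_pipeline
LIMIT 10;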
Start the Pipeline
You can run the pipeline in the foreground or background.
Start in the Foreground
To test the pipeline and load existing data, run the following command:
START PIPELINE kinesis_pipeline FOREGROUND;
This command runs synchronously and returns when all available records have been loaded.
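After the foreground run returns, a quick count against the destination table confirms that records were loaded (this assumes the default table name created for the inferred pipeline):
SELECT COUNT(*) FROM kinesis_pipeline;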
Start in the Background
For continuous streaming, run the following command:
START PIPELINE kinesis_pipeline;
This command runs the pipeline in the background, continuously polling Kinesis for new records.
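To pause ingestion later, for example before changing the connector configuration, you can stop the background pipeline with standard pipeline syntax:
STOP PIPELINE kinesis_pipeline;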
Verify Pipeline Status
Check the pipeline status:
SHOW PIPELINES;
+---------------------------+---------+
| Pipelines_in_kinesis_data | State   |
+---------------------------+---------+
| kinesis_pipeline          | Running |
+---------------------------+---------+
Run the following command to query detailed pipeline information:
SELECT
  PIPELINE_NAME,
  STATE,
  CONFIG_JSON
FROM information_schema.PIPELINES
WHERE PIPELINE_NAME = 'kinesis_pipeline'
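If the pipeline is not in the Running state or no rows appear in the destination table, the pipeline error metadata can help diagnose the problem. A sketch against information_schema.PIPELINES_ERRORS, SingleStore's standard view of pipeline batch errors:
SELECT *
FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'kinesis_pipeline';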
Last modified: February 18, 2026