Getting Started with Apache Kafka (Self-Managed)
This guide shows how to install and use the Java-based SingleStore Kafka Sink connector ("the connector") with open-source Apache Kafka and connect to your SingleStore deployments.
SingleStore recommends having hands-on experience with Apache Kafka and an understanding of its concepts.
Prerequisites
- An active SingleStore deployment
- Java 8+
- Install the following:
Install the Connector
To install the SingleStore Kafka Sink connector, perform the following tasks:
- Download the singlestore-singlestore-kafka-connector-<version>.zip file from the SingleStore Kafka Connector GitHub repository.
- Extract the downloaded .zip archive to a directory in your local environment.
- Add the path of the directory to which the archive is extracted to the Kafka Connect's plugin path by configuring the plugin.path property in the connect-distributed.properties file. For example, if the archive is extracted to the /home/user/kafka/plugins/singlestore-singlestore-kafka-connector directory, set plugin.path to /home/user/kafka/plugins/.
- Restart the Kafka Connect process to identify and add the plugin JARs.
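The relationship between the extraction directory and the plugin.path value in the example above can be sketched in a few lines of Python. This is only an illustration: the helper name plugin_path_for is hypothetical, and the script computes the value to place in connect-distributed.properties without editing the file.

```python
from pathlib import PurePosixPath


def plugin_path_for(extracted_dir: str) -> str:
    """Return the parent of the extracted connector directory.

    plugin.path lists top-level directories that contain plugin
    directories, so the value is the parent of the connector directory.
    """
    return str(PurePosixPath(extracted_dir).parent)


# Example directory taken from this guide.
extracted = "/home/user/kafka/plugins/singlestore-singlestore-kafka-connector"

# Line to place in connect-distributed.properties.
print(f"plugin.path={plugin_path_for(extracted)}")
```

The computed value omits the trailing slash shown in the example; both forms refer to the same directory.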
Configure the Connection
To configure the connection to SingleStore, perform the following tasks:
- Run the following command to register the connector.
  Note: Specify the Kafka Connect URL and connection configuration of your SingleStore deployment before running this command.

  curl -i -X POST \
    -H "Accept:application/json" \
    -H "Content-Type:application/json" \
    <Kafka_Connect_URL>/connectors/ \
    -d '{
      "name": "singlestore-sink-connector",
      "config": {
        "connector.class": "com.singlestore.kafka.SingleStoreSinkConnector",
        "topics": "<topic_name>",
        "connection.ddlEndpoint": "<endpoint>",
        "connection.database": "<database>",
        "connection.user": "<username>",
        "connection.password": "<password>"
      }
    }'

  where,
  - endpoint: Hostname or IP address of the SingleStore deployment.
  - database: Name of the SingleStore database to connect with.
  - username: Username of the SingleStore database user.
  - password: Password for the SingleStore database user.
  - topic_name: Name of the Kafka topic.

  Refer to SingleStore Kafka Sink Connector Properties for more information.
- Verify that the connector is running.

  curl -X GET <Kafka_Connect_URL>/connectors/singlestore-sink-connector/status

  If the connector is running, the output is similar to the following (output is formatted for readability):

  {
    "name": "singlestore-sink-connector",
    "connector": {
      "state": "RUNNING",
      "worker_id": "<endpoint>:<port>"
    },
    "tasks": [
      {
        "id": 0,
        "state": "RUNNING",
        "worker_id": "<endpoint>:<port>"
      }
    ],
    "type": "sink"
  }
- Add data to the topic.
  - From the Kafka installation directory, run the console producer client to add events to the Kafka topic. For example,

    bin/kafka-console-producer.sh --topic <topic_name> --bootstrap-server <Bootstrap_Server_URL>

  - Add each line (event) in the console.
    In this example, the following events are added to a topic named songs:

    {"schema": {"type": "struct", "optional": false, "version": 1, "fields": [{ "field": "Id", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true }] }, "payload": { "Id": "1", "Artist": "Rick Astley", "Song": "Never Gonna Give You Up"}}
    {"schema": {"type": "struct", "optional": false, "version": 1, "fields": [{ "field": "Id", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true }] }, "payload": { "Id": "2", "Artist": "AC/DC", "Song": "Highway to hell"}}
    {"schema": {"type": "struct", "optional": false, "version": 1, "fields": [{ "field": "Id", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true }] }, "payload": { "Id": "3", "Artist": "Nirvana", "Song": "Smells like teen spirit"}}
    {"schema": {"type": "struct", "optional": false, "version": 1, "fields": [{ "field": "Id", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true }] }, "payload": { "Id": "4", "Artist": "Judy Garland", "Song": "Over the Rainbow"}}
    {"schema": {"type": "struct", "optional": false, "version": 1, "fields": [{ "field": "Id", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true }] }, "payload": { "Id": "5", "Artist": "Bing Crosby", "Song": "White Christmas"}}
    {"schema": {"type": "struct", "optional": false, "version": 1, "fields": [{ "field": "Id", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true }] }, "payload": { "Id": "6", "Artist": "Woody Guthrie", "Song": "This Land Is Your Land"}}
    {"schema": {"type": "struct", "optional": false, "version": 1, "fields": [{ "field": "Id", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true }] }, "payload": { "Id": "7", "Artist": "Aretha Franklin", "Song": "Respect"}}
    {"schema": {"type": "struct", "optional": false, "version": 1, "fields": [{ "field": "Id", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true }] }, "payload": { "Id": "8", "Artist": "Don McLean", "Song": "American Pie"}}
    {"schema": {"type": "struct", "optional": false, "version": 1, "fields": [{ "field": "Id", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true }] }, "payload": { "Id": "9", "Artist": "The Andrews Sisters", "Song": "Boogie Woogie Bugle Boy"}}
    {"schema": {"type": "struct", "optional": false, "version": 1, "fields": [{ "field": "Id", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true }] }, "payload": { "Id": "10", "Artist": "Billy Murray", "Song": "Take Me Out to the Ball Game"}}

    By default, each line represents a separate event that is written to the Kafka topic. The input follows the schema, payload format.

    Alternatively, you can redirect input from a file. For example,

    bin/kafka-console-producer.sh --topic <topic_name> --bootstrap-server <Bootstrap_Server_URL> < <filename.json>
- Stop the producer, for example using Ctrl+c.
- After events are added to the topic, the connector automatically creates a corresponding table named songs in the specified SingleStore database and inserts rows into the table. For example:

  SELECT * FROM songs;

  +------+---------------------+------------------------------+
  | Id   | Artist              | Song                         |
  +------+---------------------+------------------------------+
  | 7    | Aretha Franklin     | Respect                      |
  | 6    | Woody Guthrie       | This Land Is Your Land       |
  | 9    | The Andrews Sisters | Boogie Woogie Bugle Boy      |
  | 10   | Billy Murray        | Take Me Out to the Ball Game |
  | 5    | Bing Crosby         | White Christmas              |
  | 3    | Nirvana             | Smells like teen spirit      |
  | 1    | Rick Astley         | Never Gonna Give You Up      |
  | 8    | Don McLean          | American Pie                 |
  | 2    | AC/DC               | Highway to hell              |
  | 4    | Judy Garland        | Over the Rainbow             |
  +------+---------------------+------------------------------+
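
The data shapes used in the steps above (the registration request body, the status response, and the schema/payload events) can be sketched in Python as follows. This is a minimal sketch, not part of the connector: the helper names build_connector_request, is_running, and make_event are hypothetical, the script makes no network calls, and treating an empty task list as running is a simplifying assumption.

```python
import json

# Field names taken from the example events in this guide.
FIELDS = ("Id", "Artist", "Song")


def build_connector_request(topic, endpoint, database, user, password,
                            name="singlestore-sink-connector"):
    """Build the JSON body that the curl command posts to /connectors/."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.singlestore.kafka.SingleStoreSinkConnector",
            "topics": topic,
            "connection.ddlEndpoint": endpoint,
            "connection.database": database,
            "connection.user": user,
            "connection.password": password,
        },
    }


def is_running(status):
    """True if the connector and every listed task report RUNNING.

    Assumption: a response with no tasks counts as running here.
    """
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    return all(t.get("state") == "RUNNING" for t in status.get("tasks", []))


def make_event(song_id, artist, song):
    """Return one event as a single JSON line in the schema/payload format."""
    schema = {
        "type": "struct",
        "optional": False,
        "version": 1,
        "fields": [{"field": f, "type": "string", "optional": True}
                   for f in FIELDS],
    }
    payload = dict(zip(FIELDS, (str(song_id), artist, song)))
    return json.dumps({"schema": schema, "payload": payload})


# Placeholder values; substitute the details of your own deployment.
body = build_connector_request("songs", "<endpoint>", "<database>",
                               "<username>", "<password>")
print(json.dumps(body, indent=2))

# Status document shaped like the sample output in this guide.
sample_status = {
    "name": "singlestore-sink-connector",
    "connector": {"state": "RUNNING", "worker_id": "<endpoint>:<port>"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "<endpoint>:<port>"}],
    "type": "sink",
}
print(is_running(sample_status))

# One event per line, suitable for saving to a file and redirecting
# into the console producer.
print(make_event(1, "Rick Astley", "Never Gonna Give You Up"))
```

Generating events this way keeps each one on a single line, which matters because the console producer treats every input line as a separate event.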
Last modified: August 5, 2025