Getting Started with Confluent Platform
This guide shows how to use the SingleStore Kafka Sink connector ("the connector") on Confluent Platform and connect to your SingleStore deployments.
SingleStore recommends having hands-on experience with Confluent Platform and an understanding of its concepts.
Prerequisites
- An active SingleStore deployment
- Docker
Configure the Connection
To connect to your SingleStore deployment from Confluent Platform using the SingleStore Kafka Sink connector, perform the following tasks:
- In your local environment, create a quickstart directory.
- Create a docker-compose.yml file in the quickstart directory and add the following to the file:

  ```yaml
  ---
  services:
    zookeeper:
      image: confluentinc/cp-zookeeper:7.8.0
      hostname: zookeeper
      container_name: zookeeper
      ports:
        - "2181:2181"
      environment:
        ZOOKEEPER_CLIENT_PORT: 2181
        ZOOKEEPER_TICK_TIME: 2000
      healthcheck:
        test: [ "CMD", "nc", "-z", "localhost", "2181" ]
        start_period: 5m
        start_interval: 10s
        interval: 1m
        timeout: 10s
        retries: 5
    broker:
      image: confluentinc/cp-server:7.8.0
      hostname: broker
      container_name: broker
      depends_on:
        - zookeeper
      ports:
        - "9092:9092"
        - "9101:9101"
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
        KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
        KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
        KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
        KAFKA_JMX_PORT: 9101
        KAFKA_JMX_HOSTNAME: localhost
        KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
        CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
        CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
        CONFLUENT_METRICS_ENABLE: 'true'
        CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      healthcheck:
        test: [ "CMD", "nc", "-z", "localhost", "9092" ]
        start_period: 5m
        start_interval: 10s
        interval: 1m
        timeout: 10s
        retries: 5
    schema-registry:
      image: confluentinc/cp-schema-registry:7.8.0
      hostname: schema-registry
      container_name: schema-registry
      depends_on:
        - broker
      ports:
        - "8081:8081"
      environment:
        SCHEMA_REGISTRY_HOST_NAME: schema-registry
        SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
        SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      healthcheck:
        test: [ "CMD", "nc", "-z", "localhost", "8081" ]
        start_period: 5m
        start_interval: 10s
        interval: 1m
        timeout: 10s
        retries: 5
    connect:
      image: confluentinc/cp-server-connect:7.8.0
      hostname: connect
      container_name: connect
      depends_on:
        - broker
        - schema-registry
      ports:
        - "8083:8083"
      volumes:
        - ./plugins:/tmp/connect-plugins
      environment:
        CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
        CONNECT_REST_ADVERTISED_HOST_NAME: connect
        CONNECT_GROUP_ID: compose-connect-group
        CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
        CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
        CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
        CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
        CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
        CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
        CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
        # CLASSPATH required due to CC-2422
        CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.8.0.jar
        CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
        CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
        CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins"
        CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      healthcheck:
        test: [ "CMD", "nc", "-z", "localhost", "8083" ]
        start_period: 5m
        start_interval: 10s
        interval: 1m
        timeout: 10s
        retries: 5
    control-center:
      image: confluentinc/cp-enterprise-control-center:7.8.0
      hostname: control-center
      container_name: control-center
      depends_on:
        - broker
        - schema-registry
        - connect
      ports:
        - "9021:9021"
      environment:
        CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
        CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
        CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
        CONTROL_CENTER_REPLICATION_FACTOR: 1
        CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
        CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
        CONFLUENT_METRICS_TOPIC_REPLICATION: 1
        PORT: 9021
      healthcheck:
        test: [ "CMD", "curl", "-f", "http://localhost:9021" ]
        start_period: 5m
        start_interval: 10s
        interval: 1m
        timeout: 10s
        retries: 5
  ```
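  YAML is indentation-sensitive, so a quick optional sanity check before starting the stack is to have Compose parse and render the file:

  ```bash
  # Validate docker-compose.yml and print the fully resolved configuration;
  # any YAML or schema errors are reported instead.
  docker compose config
  ```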
- Create a quickstart/plugins directory. The docker-compose.yml file mounts this directory into the Connect container at /tmp/connect-plugins and includes it in CONNECT_PLUGIN_PATH, which is how Kafka Connect discovers the connector.
- Download the singlestore-singlestore-kafka-connector-<version>.zip file from the SingleStore Kafka Connector GitHub repository.
- Extract the downloaded .zip archive to the plugins directory. The quickstart directory now has the following structure:

  ```
  quickstart/
  ├─ plugins/
  │  ├─ singlestore-singlestore-kafka-connector-<version>/
  ├─ docker-compose.yml
  ```
- Start Confluent Platform. Run the following command from the quickstart directory:

  ```bash
  docker compose up -d
  ```

  Once the command completes, all the services are up and running. To check the status of the services, run the following command:

  ```bash
  docker compose ps
  ```
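  The healthchecks defined in docker-compose.yml can take several minutes to pass. If a service does not become healthy, inspecting its logs is the quickest diagnostic; for example, for the Connect worker:

  ```bash
  # Follow the logs of the Connect worker; replace "connect" with any
  # other service name from docker-compose.yml to inspect that service.
  docker compose logs -f connect
  ```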
- Create a Kafka topic and add data to the topic. The following steps use Confluent Control Center; a command-line alternative is sketched after these steps.
  - Open Confluent Control Center at http://localhost:9021/clusters.
  - Select controlcenter.cluster > Topics > Add topic.
  - On the New topic page, enter a name and the number of partitions for the Kafka topic. This example uses a topic named SingleStore-quickstart.
  - Select Create with defaults.
  - On the SingleStore-quickstart page, select Messages > Produce a new message.
  - In the Produce a new message dialog, add a message. Because the connector configuration in a later step sets value.converter.schemas.enable to true, each message value embeds its schema alongside the payload. This example uses the following key and value:

    Key:

    ```
    1
    ```

    Value:

    ```json
    {
      "schema": {
        "type": "struct",
        "optional": false,
        "version": 1,
        "fields": [
          { "field": "Id", "type": "string", "optional": true },
          { "field": "Artist", "type": "string", "optional": true },
          { "field": "Song", "type": "string", "optional": true }
        ]
      },
      "payload": {
        "Id": "1",
        "Artist": "Rick Astley",
        "Song": "Never Gonna Give You Up"
      }
    }
    ```
  - Select Produce.
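  As an alternative to producing the message in Control Center, you can send the same keyed record from the command line. This is a minimal sketch, assuming the broker container name and internal listener from the docker-compose.yml above:

  ```bash
  # Produce a single keyed message; parse.key and key.separator split each
  # input line into the record key ("1") and the JSON value at the "|".
  docker exec -i broker kafka-console-producer \
    --bootstrap-server broker:29092 \
    --topic SingleStore-quickstart \
    --property parse.key=true \
    --property key.separator='|' <<'EOF'
  1|{"schema": {"type": "struct", "optional": false, "version": 1, "fields": [{ "field": "Id", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true }] }, "payload": { "Id": "1", "Artist": "Rick Astley", "Song": "Never Gonna Give You Up"}}
  EOF
  ```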
- Start the connector using the following command.

  Note: Specify the connection configuration of your SingleStore deployment before running this command.

  ```bash
  curl -X POST http://localhost:8083/connectors \
    -H "Content-Type:application/json" \
    -H "Accept:application/json" \
    -d '{
      "name": "singlestore-sink-connector",
      "config": {
        "connector.class": "com.singlestore.kafka.SingleStoreSinkConnector",
        "topics": "SingleStore-quickstart",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schemas.enable": "true",
        "connection.ddlEndpoint": "<SingleStoreDB Hostname>",
        "connection.database": "<SingleStoreDB Database>",
        "connection.user": "<SingleStoreDB User>",
        "connection.password": "<SingleStoreDB Password>"
      }
    }'
  ```

  Refer to SingleStore Kafka Sink Connector Properties for more information.
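  On success, Kafka Connect responds with the created connector configuration as JSON. You can also confirm that the connector and its task are running through the Kafka Connect REST API:

  ```bash
  # Query the status of the connector and its tasks; a healthy deployment
  # reports "state": "RUNNING" for both.
  curl http://localhost:8083/connectors/singlestore-sink-connector/status
  ```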
- Wait a few minutes, and then log in to your SingleStore deployment and run the following command to verify that the data has been ingested:
  ```sql
  SELECT * FROM `SingleStore-quickstart`;
  ```

  ```
  +------+-------------+-------------------------+
  | Id   | Artist      | Song                    |
  +------+-------------+-------------------------+
  | 1    | Rick Astley | Never Gonna Give You Up |
  +------+-------------+-------------------------+
  ```
  The data is added to a SingleStore table named SingleStore-quickstart in the specified database.
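When you are finished with the quickstart, you can stop Confluent Platform from the quickstart directory:

```bash
# Stop and remove the containers and networks created by docker compose up;
# add -v to also remove the associated volumes.
docker compose down
```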