Getting Started with Confluent Platform

This guide shows how to use the SingleStore Kafka Sink connector ("the connector") on Confluent Platform and connect to your SingleStore deployments.

SingleStore recommends having hands-on experience with Confluent Platform and an understanding of its concepts. Refer to the Confluent Platform documentation for related information.

Prerequisites

  • An active SingleStore deployment

  • Docker

Configure the Connection

To connect to your SingleStore deployment from Confluent Platform using the SingleStore Kafka Sink connector, perform the following tasks:

  1. In your local environment, create a quickstart directory.

  2. Create a docker-compose.yml file in the quickstart directory and add the following to the file:

    ---
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.8.0
        hostname: zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
        healthcheck:
          test: [ "CMD", "nc", "-z", "localhost", "2181" ]
          start_period: 5m
          start_interval: 10s
          interval: 1m
          timeout: 10s
          retries: 5
      broker:
        image: confluentinc/cp-server:7.8.0
        hostname: broker
        container_name: broker
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
          - "9101:9101"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
          KAFKA_JMX_PORT: 9101
          KAFKA_JMX_HOSTNAME: localhost
          KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
          CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
          CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
          CONFLUENT_METRICS_ENABLE: 'true'
          CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
        healthcheck:
          test: [ "CMD", "nc", "-z", "localhost", "9092" ]
          start_period: 5m
          start_interval: 10s
          interval: 1m
          timeout: 10s
          retries: 5
      schema-registry:
        image: confluentinc/cp-schema-registry:7.8.0
        hostname: schema-registry
        container_name: schema-registry
        depends_on:
          - broker
        ports:
          - "8081:8081"
        environment:
          SCHEMA_REGISTRY_HOST_NAME: schema-registry
          SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
          SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
        healthcheck:
          test: [ "CMD", "nc", "-z", "localhost", "8081" ]
          start_period: 5m
          start_interval: 10s
          interval: 1m
          timeout: 10s
          retries: 5
      connect:
        image: confluentinc/cp-server-connect:7.8.0
        hostname: connect
        container_name: connect
        depends_on:
          - broker
          - schema-registry
        ports:
          - "8083:8083"
        volumes:
          - ./plugins:/tmp/connect-plugins
        environment:
          CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
          CONNECT_REST_ADVERTISED_HOST_NAME: connect
          CONNECT_GROUP_ID: compose-connect-group
          CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
          CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
          CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
          CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
          CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
          CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
          CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
          # CLASSPATH required due to CC-2422
          CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.8.0.jar
          CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
          CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
          CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins"
          CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
        healthcheck:
          test: [ "CMD", "nc", "-z", "localhost", "8083" ]
          start_period: 5m
          start_interval: 10s
          interval: 1m
          timeout: 10s
          retries: 5
      control-center:
        image: confluentinc/cp-enterprise-control-center:7.8.0
        hostname: control-center
        container_name: control-center
        depends_on:
          - broker
          - schema-registry
          - connect
        ports:
          - "9021:9021"
        environment:
          CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
          CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
          CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
          CONTROL_CENTER_REPLICATION_FACTOR: 1
          CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
          CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
          CONFLUENT_METRICS_TOPIC_REPLICATION: 1
          PORT: 9021
        healthcheck:
          test: [ "CMD", "curl", "-f", "http://localhost:9021" ]
          start_period: 5m
          start_interval: 10s
          interval: 1m
          timeout: 10s
          retries: 5
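
    Optionally, validate the file before starting the stack; this checks only the Compose syntax, not the Confluent settings:

    docker compose config --quiet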
  3. Create a quickstart/plugins directory.

  4. Download the singlestore-singlestore-kafka-connector-<version>.zip file from the SingleStore Kafka Connector GitHub repository.

  5. Extract the downloaded .zip archive into the plugins directory. The quickstart directory now has the following structure:

    quickstart/
    ├─ plugins/
    │  ├─ singlestore-singlestore-kafka-connector-<version>/
    ├─ docker-compose.yml
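
    Alternatively, steps 4 and 5 can be performed from the command line; a minimal sketch, where <release-asset-URL> is a placeholder for the .zip link copied from the repository's Releases page:

    cd quickstart
    mkdir -p plugins
    curl -L -o singlestore-singlestore-kafka-connector-<version>.zip <release-asset-URL>
    unzip singlestore-singlestore-kafka-connector-<version>.zip -d plugins/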
  6. Start Confluent Platform. Run the following command from the quickstart directory:

    docker compose up -d

    Once the containers have started and their health checks pass, all of the services are up and running. To check the status of the services, run the following command:

    docker compose ps
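
    After the connect service reports healthy, you can confirm that Kafka Connect picked up the plugin from /tmp/connect-plugins by listing the installed connector classes over its REST API (the grep filter is only a convenience):

    curl -s http://localhost:8083/connector-plugins | grep -i singlestore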
  7. Create a Kafka topic and add data to the topic.

    1. Open Confluent Control Center at http://localhost:9021/clusters.

    2. Select controlcenter.cluster > Topics > Add topic.

    3. On the New topic page, enter a name and the number of partitions for the Kafka topic. This example uses a topic named SingleStore-quickstart.

    4. Select Create with defaults.

    5. On the SingleStore-quickstart page, select Messages > Produce a new message.

    6. In the Produce a new message dialog, add a message. This example uses the following key and value:

      Key:

      1

      Value:

      {"schema": {"type": "struct", "optional": false, "version": 1, "fields": [{ "field": "Id", "type": "string", "optional": true }, { "field": "Artist", "type": "string", "optional": true }, { "field": "Song", "type": "string", "optional": true }] }, "payload": { "Id": "1", "Artist": "Rick Astley", "Song": "Never Gonna Give You Up"}}

    7. Select Produce.
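
    Alternatively, you can produce the same message from the command line; a minimal sketch using the console producer that ships in the broker container (the | key separator is an arbitrary choice of a character that does not occur in the message):

      docker compose exec broker kafka-console-producer \
        --bootstrap-server localhost:9092 \
        --topic SingleStore-quickstart \
        --property parse.key=true \
        --property "key.separator=|"

    Type the message as a single line in the form 1|{"schema": ..., "payload": ...}, press Enter, and then press Ctrl+D to exit.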

  8. Start the connector using the following command:

    Note: Specify the connection configuration of your SingleStore deployment before running this command.

    curl -X POST http://localhost:8083/connectors \
      -H "Content-Type: application/json" \
      -H "Accept: application/json" \
      -d '{
        "name": "singlestore-sink-connector",
        "config": {
          "connector.class": "com.singlestore.kafka.SingleStoreSinkConnector",
          "topics": "SingleStore-quickstart",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "key.converter.schema.registry.url": "http://schema-registry:8081",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.converter.schema.registry.url": "http://schema-registry:8081",
          "value.converter.schemas.enable": "true",
          "connection.ddlEndpoint": "<SingleStoreDB Hostname>",
          "connection.database": "<SingleStoreDB Database>",
          "connection.user": "<SingleStoreDB User>",
          "connection.password": "<SingleStoreDB Password>"
        }
      }'

    Refer to SingleStore Kafka Sink Connector Properties for more information.
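
    To confirm that the connector was created and its task is running, you can query the Kafka Connect REST API:

    curl -s http://localhost:8083/connectors/singlestore-sink-connector/status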

  9. Wait a few minutes, and then log in to your SingleStore deployment and run the following command to verify that the data has been ingested:

    SELECT * FROM `SingleStore-quickstart`;
    +------+-------------+-------------------------+
    | Id   | Artist      | Song                    |
    +------+-------------+-------------------------+
    | 1    | Rick Astley | Never Gonna Give You Up |
    +------+-------------+-------------------------+

    The data is added to a SingleStore table named SingleStore-quickstart, in the specified database.
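
    The connector creates this table automatically if it does not already exist, using the schema embedded in the message. To inspect the generated table definition, you can run:

    SHOW CREATE TABLE `SingleStore-quickstart`;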
