Connect with Confluent Platform

This tutorial shows how to connect to your SingleStore databases from Confluent Platform using the SingleStore Debezium connector. The connector supports Confluent Platform versions from 7.1.x to 7.8.x.

Refer to Confluent Platform Documentation for related information.

Prerequisites

  • An active SingleStore Helios deployment with OBSERVE (CDC) queries enabled. Refer to Enable CDC for more information. Run the following command to enable CDC:

    SET GLOBAL enable_observe_queries = 1;
  • Docker

Connect Confluent Platform to SingleStore

To connect to your SingleStore databases from Confluent Platform using the SingleStore Debezium connector, perform the following steps:

  1. Create a new directory named quickstart.

  2. Create a docker-compose.yml file in this directory and add the following configuration to it:

    ---
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.8.0
        hostname: zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
        healthcheck:
          test: [ "CMD", "nc", "-z", "localhost", "2181" ]
          start_period: 5m
          start_interval: 10s
          interval: 1m
          timeout: 10s
          retries: 5

      broker:
        image: confluentinc/cp-server:7.8.0
        hostname: broker
        container_name: broker
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
          - "9101:9101"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
          KAFKA_JMX_PORT: 9101
          KAFKA_JMX_HOSTNAME: localhost
          KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
          CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
          CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
          CONFLUENT_METRICS_ENABLE: 'true'
          CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
        healthcheck:
          test: [ "CMD", "nc", "-z", "localhost", "9092" ]
          start_period: 5m
          start_interval: 10s
          interval: 1m
          timeout: 10s
          retries: 5

      schema-registry:
        image: confluentinc/cp-schema-registry:7.8.0
        hostname: schema-registry
        container_name: schema-registry
        depends_on:
          - broker
        ports:
          - "8081:8081"
        environment:
          SCHEMA_REGISTRY_HOST_NAME: schema-registry
          SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
          SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
        healthcheck:
          test: [ "CMD", "nc", "-z", "localhost", "8081" ]
          start_period: 5m
          start_interval: 10s
          interval: 1m
          timeout: 10s
          retries: 5

      connect:
        image: confluentinc/cp-server-connect:7.8.0
        hostname: connect
        container_name: connect
        depends_on:
          - broker
          - schema-registry
        ports:
          - "8083:8083"
        volumes:
          - ./plugins:/tmp/connect-plugins
        environment:
          CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
          CONNECT_REST_ADVERTISED_HOST_NAME: connect
          CONNECT_GROUP_ID: compose-connect-group
          CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
          CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
          CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
          CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
          CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
          CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
          CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
          # CLASSPATH required due to CC-2422
          CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.8.0.jar
          CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
          CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
          CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins"
          CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
        healthcheck:
          test: [ "CMD", "nc", "-z", "localhost", "8083" ]
          start_period: 5m
          start_interval: 10s
          interval: 1m
          timeout: 10s
          retries: 5

      control-center:
        image: confluentinc/cp-enterprise-control-center:7.8.0
        hostname: control-center
        container_name: control-center
        depends_on:
          - broker
          - schema-registry
          - connect
        ports:
          - "9021:9021"
        environment:
          CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
          CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
          CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
          CONTROL_CENTER_REPLICATION_FACTOR: 1
          CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
          CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
          CONFLUENT_METRICS_TOPIC_REPLICATION: 1
          PORT: 9021
        healthcheck:
          test: [ "CMD", "curl", "-f", "http://localhost:9021" ]
          start_period: 5m
          start_interval: 10s
          interval: 1m
          timeout: 10s
          retries: 5
  3. Create a directory named plugins in the quickstart directory.

  4. Download the SingleStore Debezium Connector .zip archive from GitHub and unpack it in the plugins directory. After unpacking the connector, the quickstart directory has the following structure:

    quickstart/
    ├─ plugins/
    │  ├─ singlestore-singlestore-debezium-connector-<version>/
    ├─ docker-compose.yml
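Steps 3 and 4 can be sketched as follows, assuming the release archive has already been downloaded from the GitHub releases page (the file name connector.zip is a placeholder for the actual archive name):

```shell
# Create the plugins directory next to docker-compose.yml and unpack the
# downloaded connector archive into it. "connector.zip" is a placeholder
# for the actual release archive file name.
mkdir -p plugins
unzip connector.zip -d plugins/

# Confirm the connector directory is in place.
ls plugins/
```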
  5. Start Confluent Platform. Run the following command from the quickstart directory:

    docker compose up -d
    [+] Running 6/6
     ✔ Network quickstart_default  Created
     ✔ Container zookeeper         Started
     ✔ Container broker            Started
     ✔ Container schema-registry   Started
     ✔ Container connect           Started
     ✔ Container control-center    Started 
  6. When the previous command completes and all the containers are running, run the following command to verify the status of the services:

    docker compose ps
    NAME              IMAGE                                             COMMAND                  SERVICE           CREATED         STATUS                        PORTS
    broker            confluentinc/cp-server:7.8.0                      "/etc/confluent/dock…"   broker            2 minutes ago   Up 2 minutes (healthy)        0.0.0.0:9092->9092/tcp, :::9092->9092/tcp, 0.0.0.0:9101->9101/tcp, :::9101->9101/tcp
    connect           confluentinc/cp-server-connect:7.8.0              "/etc/confluent/dock…"   connect           2 minutes ago   Up About a minute (healthy)   0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 9092/tcp
    control-center    confluentinc/cp-enterprise-control-center:7.8.0   "/etc/confluent/dock…"   control-center    2 minutes ago   Up About a minute (healthy)   0.0.0.0:9021->9021/tcp, :::9021->9021/tcp
    schema-registry   confluentinc/cp-schema-registry:7.8.0             "/etc/confluent/dock…"   schema-registry   2 minutes ago   Up 2 minutes (healthy)        0.0.0.0:8081->8081/tcp, :::8081->8081/tcp
    zookeeper         confluentinc/cp-zookeeper:7.8.0                   "/etc/confluent/dock…"   zookeeper         2 minutes ago   Up 2 minutes (healthy)        2888/tcp, 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 3888/tcp
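Before registering the connector, you can confirm that Kafka Connect loaded the plugin from the mounted plugins directory. This sketch queries the Connect REST API's connector-plugins endpoint; the grep filter simply narrows the output to the SingleStore entry:

```shell
# List the plugins Kafka Connect discovered on its plugin path and check
# that the SingleStore Debezium connector is among them.
curl -s http://localhost:8083/connector-plugins | grep -i singlestore
```

If the command prints nothing, verify that the archive was unpacked into the plugins directory before the containers were started.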
  7. Start the connector. Update the connection configuration of your SingleStore deployment in the following command and then run it.

    curl -X POST http://localhost:8083/connectors \
      -H "Content-Type:application/json" \
      -H "Accept:application/json" \
      -d '{
        "name": "SingleStoreConnector",
        "config": {
          "connector.class": "com.singlestore.debezium.SingleStoreConnector",
          "tasks.max": "1",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://schema-registry:8081",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://schema-registry:8081",
          "database.hostname": "<hostname>",
          "database.port": "<port>",
          "database.user": "<user>",
          "database.password": "<password>",
          "topic.prefix": "<prefix>",
          "database.dbname": "<database>",
          "database.table": "<table>"
        }
      }'
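To confirm that the connector registered successfully, you can query its status through the Connect REST API. The connector name in the URL matches the "name" field from the creation request above:

```shell
# Fetch the connector and task state; a healthy connector reports
# "state": "RUNNING" for both the connector and its task.
curl -s http://localhost:8083/connectors/SingleStoreConnector/status
```

If the task reports a FAILED state, the response includes a stack trace that usually points to a connectivity or credential problem.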
  8. Verify that the CDC events are populated in the topic. The connector starts streaming changes to the table into the Kafka topic. The Kafka topic name is in the <topic.prefix>.<database.dbname>.<database.table> format.
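The topic name composition described above can be sketched in shell. The values used here (singlestore, testdb, orders) are hypothetical examples standing in for your actual topic.prefix, database.dbname, and database.table settings:

```shell
# Hypothetical connector configuration values.
TOPIC_PREFIX="singlestore"   # topic.prefix
DB_NAME="testdb"             # database.dbname
TABLE_NAME="orders"          # database.table

# Topic name format: <topic.prefix>.<database.dbname>.<database.table>
TOPIC="${TOPIC_PREFIX}.${DB_NAME}.${TABLE_NAME}"
echo "${TOPIC}"
```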

  9. Go to http://localhost:9021/clusters to open the Confluent Control Center instance.

  10. Select controlcenter.cluster > Topics > <your_topic> > Messages to view the change events for the table.
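As an alternative to Control Center, you can read the Avro-encoded change events from the command line with the console consumer bundled in the schema-registry container. Replace the topic placeholders with the values from your connector configuration:

```shell
# Consume change events from the beginning of the topic, decoding them
# against the Schema Registry.
docker exec schema-registry kafka-avro-console-consumer \
  --bootstrap-server broker:29092 \
  --topic <prefix>.<database>.<table> \
  --from-beginning \
  --property schema.registry.url=http://schema-registry:8081
```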

Last modified: March 31, 2025

