SingleStore Managed Service

Securely Connect to Kafka from SingleStore
Overview

When you run a CREATE PIPELINE ... KAFKA ... or SELECT ... INTO KAFKA ... statement, you may want, or be required, to connect to Kafka securely.

You can connect via SSL and optionally authenticate through SASL. The following SASL authentication mechanisms are supported:

  • GSSAPI (Kerberos)

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

This topic assumes you have already set up, configured, and enabled SSL on your Kafka brokers. For information on how to enable this functionality, see the SSL and SASL sections in the Kafka documentation.

Steps for Creating a Secure Connection

To create a secure connection from SingleStore to a Kafka cluster, follow these steps in order.

Upload a Certificate to Use to Connect via TLS/SSL

Use the following steps to enable TLS/SSL encryption between SingleStore Managed Service and Kafka.

  1. From the Clusters page on the SingleStore Customer Portal, click on the cluster on which to enable TLS/SSL connections.

  2. Click the Security tab at the top of the page.

  3. Click on the Upload Certificate button to upload your CA certificate. This will make it available to all the nodes and will allow you to secure outbound connections via TLS/SSL.

Build a String with the Connection Settings

Using the following settings, create a string containing the CONFIG clause and optionally the CREDENTIALS clause of the CREATE PIPELINE ... KAFKA ... or SELECT ... INTO KAFKA ... statement that you will be running.

SSL Connection Settings
  1. In your CONFIG JSON, if you want to enable SSL encryption only, set "security.protocol": "ssl". If you want to enable Kerberos with SSL, or otherwise want to use SASL, set "security.protocol": "sasl_ssl".

  2. Set the remaining SSL configuration in the CONFIG JSON:

    • ssl.ca.location: Value is always /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem.

  3. If your SSL certificate key is protected by a password, set it in your CREDENTIALS JSON.

    • ssl.key.password: Password for the SSL certificate key.
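As a rough illustration of the SSL settings above, the following sketch shows how they might be combined in a pipeline that uses SSL encryption without SASL. The pipeline name ssl_only_pipeline, the table name messages, and the placeholders <kafka_broker_endpoint>, <topic>, and <ssl_key_password> are hypothetical and stand in for your own values.

```sql
-- Sketch only: the pipeline name, table name, and placeholders are hypothetical.
CREATE PIPELINE ssl_only_pipeline AS LOAD DATA KAFKA '<kafka_broker_endpoint>/<topic>'
CONFIG '{"security.protocol": "ssl",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
-- Include the CREDENTIALS clause only if the SSL certificate key is password protected.
CREDENTIALS '{"ssl.key.password": "<ssl_key_password>"}'
INTO TABLE messages;
```

The password is placed in CREDENTIALS rather than CONFIG, following the split between the two clauses described in the steps above.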

SASL Connection Settings
  1. In your CONFIG JSON, set "security.protocol": "sasl_ssl" for SSL connections, or "security.protocol": "sasl_plaintext" if you want to authenticate with Kafka without SSL encryption.

  2. If your Kafka brokers do not use SCRAM for authentication, set "sasl.mechanism": "PLAIN" in your CONFIG JSON. Otherwise, set "sasl.mechanism": "SCRAM-SHA-256" or "sasl.mechanism": "SCRAM-SHA-512".

  3. In your CONFIG JSON, provide the username, "sasl.username": "<kafka_credential_username>".

  4. In your CREDENTIALS JSON, provide the password, "sasl.password": "<kafka_credential_password>".

Note

SASL_PLAINTEXT/PLAIN authentication mode with Kafka sends your credentials unencrypted over the network. It is therefore not secure and susceptible to being sniffed.

SASL_PLAINTEXT/SCRAM authentication mode with Kafka does not send your password in clear text over the network (SCRAM uses a salted challenge-response exchange), but the Kafka messages themselves are still transported unencrypted.
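The SASL settings above might be assembled as in the following sketch, which assumes brokers configured for SCRAM-SHA-512 over SSL. The pipeline name scram_pipeline, the table name messages, and the placeholders <kafka_broker_endpoint> and <topic> are hypothetical; the username and password placeholders are the ones used in the steps above.

```sql
-- Sketch only: assumes SCRAM-SHA-512 over SSL; names and endpoint are hypothetical.
CREATE PIPELINE scram_pipeline AS LOAD DATA KAFKA '<kafka_broker_endpoint>/<topic>'
CONFIG '{"security.protocol": "sasl_ssl",
         "sasl.mechanism": "SCRAM-SHA-512",
         "sasl.username": "<kafka_credential_username>",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
-- The password goes in CREDENTIALS, not CONFIG.
CREDENTIALS '{"sasl.password": "<kafka_credential_password>"}'
INTO TABLE messages;
```

To use a different mechanism, change only the "sasl.mechanism" value to "PLAIN" or "SCRAM-SHA-256"; the remaining keys stay the same.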

Kafka Version Setting

Warning

Using SSL and SASL with Kafka requires Kafka protocol version 0.9 or later; therefore, CREATE PIPELINE ... KAFKA ... and SELECT ... INTO KAFKA ... statements that use SSL and SASL must also meet that version requirement. The Kafka protocol version can be passed in as JSON through the CONFIG clause, for example: CONFIG '{"kafka_version":"0.10.0.0"}'. Alternatively, the pipelines_kafka_version engine variable controls this parameter for any pipeline that does not set a Kafka version in its CREATE PIPELINE ... KAFKA ... statement.
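The two ways of setting the Kafka protocol version could look roughly like the sketch below. The pipeline name versioned_pipeline, the table name messages, and the endpoint placeholders are hypothetical. Setting the engine variable with SET GLOBAL is an assumption here; whether your account is permitted to change engine variables depends on your deployment.

```sql
-- Option 1 (sketch): set the version for a single pipeline through CONFIG.
CREATE PIPELINE versioned_pipeline AS LOAD DATA KAFKA '<kafka_broker_endpoint>/<topic>'
CONFIG '{"kafka_version": "0.10.0.0",
         "security.protocol": "ssl",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
INTO TABLE messages;

-- Option 2 (assumption): set the default used by pipelines that do not specify kafka_version.
SET GLOBAL pipelines_kafka_version = '0.10.0.0';
```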

Final Step: Use the Connection String in a SQL Statement

Create your CREATE PIPELINE ... KAFKA ... or SELECT ... INTO KAFKA ... statement, using the string containing the connection settings that you created in the previous steps.

Examples

Create a Kafka pipeline that loads data from Confluent Cloud

CREATE PIPELINE quickstart_kafka AS LOAD DATA KAFKA '<Confluent Cloud cluster endpoint>/test'
CONFIG '{"sasl.username": "<CLUSTER_API_KEY>",
         "sasl.mechanism": "PLAIN",
         "security.protocol": "SASL_SSL",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"}'
INTO TABLE messages;

Publish messages to Confluent Cloud

SELECT text FROM t INTO
KAFKA '<Confluent Cloud cluster endpoint>/test-topic'
CONFIG '{"sasl.username": "<CLUSTER_API_KEY>",
         "sasl.mechanism": "PLAIN",
         "security.protocol": "SASL_SSL",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"}';