Securely Connect to Kafka from SingleStoreDB

Overview

When running a CREATE PIPELINE ... KAFKA ... or SELECT ... INTO KAFKA ... statement, you may need to make a secure connection to Kafka.

Use Secure Socket Layer (SSL) for the connection and Simple Authentication and Security Layer (SASL) to authenticate. Using SASL for authentication is optional. The following SASL mechanisms are supported:

  • GSSAPI (Kerberos)

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

  • OAUTHBEARER

Kerberos may be used for authentication.

This topic assumes SSL and/or Kerberos have been set up, configured, and enabled on the Kafka brokers. For information on how to enable this functionality, see the SSL and SASL sections in the Kafka documentation.

Convert Java Keystore (JKS) to Privacy Enhanced Mail (PEM) Key

To use SSL encryption for SingleStoreDB pipelines, JKS keys need to be converted to PEM keys.

Note

In the steps below, the < > symbols indicate a placeholder; any information provided between these symbols is an example.

  1. Create the key and keystore. You will be prompted to enter details such as name, organizational unit, city, state, etc.

    keytool -genkey -keyalg RSA -keystore <keystore-name>.jks -storepass <password> -alias "<keystore-alias-name>"
    ****
    What is your first and last name?
     [Unknown]:
    What is the name of your organizational unit?
     [Unknown]:
    What is the name of your organization?
     [Unknown]:
    
  2. Export the client certificate from the keystore using the same password as in step 1.

    keytool -exportcert -rfc -file <pem-name>.pem -alias <pem-name-alias> -keystore <keystore-name>.jks
    ****
    Enter keystore password:  <password>
    Certificate stored in file <pem-name>.pem
  3. Import the client certificate into the truststore located on your Apache Kafka server. Enter a new password for the truststore.

    keytool -keystore <truststore-name>.jks -alias <truststore-name-alias> -import -file <pem-name>.pem
    ****
    Enter keystore password:  
    Re-enter new password: 
    
    Trust this certificate? [no]:  yes
    Certificate was added to keystore
    
  4. Convert the client keystore to Public-Key Cryptography Standards (PKCS12) format. The <keystore-name>.jks and its password are the same as in step 1.

    keytool -v -importkeystore -srckeystore <keystore-name>.jks -srcalias <keystore-alias-name> -destkeystore <new-keystore-name>.p12 -deststoretype PKCS12
    ****
    Importing keystore <keystore-name>.jks to <new-keystore-name>.p12...
    Enter destination keystore password:  
    Re-enter new password: 
    Enter source keystore password:  
    [Storing new-keystore-name.p12]
    
  5. Extract the client certificate key into a .pem file. Use the destination keystore password created in step 4.

    openssl pkcs12 -in <new-keystore-name>.p12 -nocerts -nodes > <keyname>.pem
    ****
    Enter Import Password:
    
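Optionally, verify the converted files before using them. This is only a quick sanity check, not part of the conversion itself, and uses the placeholder file names from the steps above.

    # Confirm the extracted private key parses and is internally consistent
    openssl rsa -in <keyname>.pem -check -noout
    # Inspect the exported client certificate
    openssl x509 -in <pem-name>.pem -text -noout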

Note

Refer to Generating SSL Certificates for more information.

Steps for Creating a Secure Connection

To create a secure connection from SingleStoreDB to a Kafka cluster, follow these steps in order.

Copy Security Files to the SingleStoreDB Cluster

Securely copy the CA certificate, SSL certificate, and SSL key used for connections between the SingleStoreDB cluster and Kafka brokers from the Kafka cluster to each SingleStoreDB node. Use a secure file transfer method, such as scp, to copy the files to your SingleStoreDB nodes. The file locations on your SingleStoreDB nodes must be consistent across the cluster.

Set Up Your SingleStoreDB Cluster for Kerberos Authentication

To configure a Kafka pipeline to authenticate with Kerberos, you must configure all nodes in your SingleStoreDB cluster as clients for Kerberos authentication. To do this, perform the following steps:

  1. Securely copy the keytab file containing the SingleStoreDB service principal (e.g. memsql/host.domain.com@REALM.NAME) from the Kerberos server to each node in your SingleStoreDB cluster. Use a secure file transfer method, such as scp, to copy the keytab file to your SingleStoreDB nodes. The file location on your SingleStoreDB nodes must be consistent across the cluster.

  2. Make sure your SingleStoreDB nodes can connect to the KDC server using the fully-qualified domain name (FQDN) of the KDC server. This might require configuring network settings or updating /etc/hosts on your nodes.

  3. Also ensure that the memsql service account on each node can access the copied keytab file. This can be accomplished by changing file ownership or permissions. If the memsql account cannot access the keytab file, you will not be able to complete the next step because your master aggregator will not be able to restart after applying configuration updates.

  4. When authenticating with Kerberos, SingleStoreDB needs to authenticate as a client, which means you must also install a Kerberos client onto each node in your cluster. The following command installs the krb5-user package on Debian-based Linux distributions.

    sudo apt-get update && sudo apt-get install krb5-user
    

    When setting up your Kerberos configuration settings, set your default realm, Kerberos admin server, and other options to those defined by your KDC server. In the examples used in this topic, the default realm is EXAMPLE.COM, and the Kerberos server settings are set to the FQDN of the KDC server host.example.com.
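
    For reference, a minimal /etc/krb5.conf on a SingleStoreDB node might look like the following sketch. It assumes the example realm EXAMPLE.COM and the KDC host.example.com used in this topic; substitute the values defined by your own KDC server.

    [libdefaults]
        default_realm = EXAMPLE.COM

    [realms]
        EXAMPLE.COM = {
            kdc = host.example.com
            admin_server = host.example.com
        }

    [domain_realm]
        .example.com = EXAMPLE.COM
        example.com = EXAMPLE.COM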

Build a String with the Connection Settings

Using the following settings, create a string containing the CONFIG clause and optionally the CREDENTIALS clause of the CREATE PIPELINE ... KAFKA ... or SELECT ... INTO KAFKA ... statement that you will be running.

SSL Connection Settings

  1. In your CONFIG JSON, if you want to enable SSL encryption only, set "security.protocol": "ssl". If you want to enable Kerberos with SSL, or otherwise want to use SASL, set "security.protocol": "sasl_ssl".

  2. Set the remaining SSL configuration in the CONFIG JSON:

    • ssl.ca.location: Path to the CA certificate on the SingleStoreDB node.

    • ssl.certificate.location: Path to the SSL certificate on the SingleStoreDB node.

    • ssl.key.location: Path to the SSL certificate key on the SingleStoreDB node.

  3. If your SSL certificate key is protected by a password, set it in your CREDENTIALS JSON.

    • ssl.key.password: Password for the SSL certificate key.
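
Assembled, the CONFIG and CREDENTIALS strings for an SSL-only connection look similar to the following sketch. The file paths and password are placeholders taken from the Examples section later in this topic; the SASL and Kerberos settings described next are added to these same strings.

CONFIG '{"security.protocol": "ssl",
"ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"ssl.ca.location": "/var/private/ssl/ca-cert.pem"}'
CREDENTIALS '{"ssl.key.password": "abcdefgh"}'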

SASL Connection Settings

  1. In your CONFIG JSON, set "security.protocol": "sasl_ssl" for SSL connections, or "security.protocol": "sasl_plaintext" if you want to authenticate with Kafka without SSL encryption.

  2. If your Kafka brokers do not use SCRAM for authentication, set "sasl.mechanism": "PLAIN" in your CONFIG JSON. Otherwise, set "sasl.mechanism": "SCRAM-SHA-256" or "sasl.mechanism": "SCRAM-SHA-512".

  3. In your CONFIG JSON, provide the username, "sasl.username": "<kafka_credential_username>".

  4. In your CREDENTIALS JSON, provide the password, "sasl.password": "<kafka_credential_password>".

Note

SASL_PLAINTEXT/PLAIN authentication mode with Kafka sends your credentials unencrypted over the network. It is therefore not secure and susceptible to being sniffed.

SASL_PLAINTEXT/SCRAM authentication mode with Kafka will encrypt the credentials information sent over the network, but transport of Kafka messages themselves is not secure.

Kerberos Connection Settings

  1. In your CONFIG JSON, set "sasl.mechanism": "GSSAPI".

  2. Set "security.protocol": "sasl_ssl" for Kerberos and SSL connections, or "security.protocol": "sasl_plaintext" if you want to authenticate with Kerberos without SSL encryption.

  3. Set the remaining Kerberos configuration in CONFIG JSON:

    • sasl.kerberos.service.name: The Kerberos principal name that Kafka runs as. For example, "kafka".

    • sasl.kerberos.keytab: The local file path on the SingleStoreDB node to the authenticating keytab.

    • sasl.kerberos.principal: The service principal name for the SingleStoreDB cluster. For example, "memsql/host.example.com@EXAMPLE.COM".

Configuring OAUTHBEARER Authentication Mechanism

OAUTHBEARER is used to secure access to resources on a server by requiring clients to obtain a bearer token. The client presents this token with each request to the server, and the server verifies the token before granting access to the requested resource.

To use SASL OAUTHBEARER authentication with Kafka, the following information is required:

  • "sasl.mechanism":"OAUTHBEARER" - Specifies the client will authenticate using an OAuth 2.0 Bearer Token.

  • "sasl.oauthbearer.client.id":"<CLIENT_ID>" - The client ID is usually provided by the OAuth provider when the client is registered. It is a unique ID that is associated with the OAuth 2.0 Bearer Token.

  • "sasl.oauthbearer.client.secret":"<CLIENT_SECRET>" The client secret is usually assigned by the OAuth provider when a client is registered and is used to authenticate the client.

  • "sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>" - This is the endpoint URL on the authorization server that is used to obtain an OAuth 2.0 Bearer Token. The client sends a request to this endpoint to get a token, which is then used to authenticate subsequent requests to the server.

  • "sasl.oauthbearer.scope":"<SCOPE>" - Determines the permissions and resources that are available to an authorized client. This extension is optional.

    • The following example scope specifies that the access token is authorized to read from and write to a Kafka topic named "story":

      "sasl.oauthbearer.scope":"read:story,write:story"
  • "sasl.oauthbearer.extensions":"<EXTENSION>" - Can be included in the SASL/OAUTHBEARER mechanism to provide additional data or parameters for authentication. This field carries extension parameters that are not defined in the original SASL/OAUTHBEARER specification.

Prerequisites

To use SASL OAUTHBEARER authentication, the following prerequisites must be met:

  • A working Kafka queue.

  • A connection to a database or cluster in which to create a pipeline that pulls data from the Kafka queue.

  • An Authorization Server (e.g., Auth0, Google OAuth, Facebook OAuth) with an account created on that server.

  • An application created and configured on the Authorization Server.

Note

The instructions for registering with an Authorization Server will vary depending on the server chosen and the type of application being built.

Syntax for SASL OAUTHBEARER Pipeline

Below is the syntax for creating a Kafka pipeline using OAUTHBEARER authentication.

CREATE OR REPLACE PIPELINE <pipeline_name>
  AS LOAD DATA KAFKA '<Kafka cluster location>'
  CONFIG '{"security.protocol":"SASL_SSL",
         "sasl.mechanism":"OAUTHBEARER",
         "sasl.oauthbearer.client.id":"<CLIENT_ID>",
         "sasl.oauthbearer.client.secret":"<CLIENT_SECRET>",
         "sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>",
         "sasl.oauthbearer.scope":"<SCOPE>"}'
  INTO TABLE <table_name>;

OAUTHBEARER Pipelines Configuration Details

To review the current pipeline configuration, use the following command:

SELECT * FROM information_schema.pipelines;

Most token providers work as expected; however, the implementation details below can help with troubleshooting and with configuring pipelines for specific cases.

Token Request Details:

  • SingleStore implements client_credentials grant type for OAUTHBEARER token requests. Ensure the OAuth client is created with support for the client_credentials grant type.

  • The exp claim is used to specify the expiration time for a JSON Web Token (JWT). Although it is optional, it is recommended to use it as it adds an extra layer of security.

  • The JWT is not accepted for processing after the time specified in the exp claim. The token refresh is scheduled at 80% of the token's lifetime.

Token Cache Details:

  • When several pipelines are running on various nodes, the OAuth server must allow multiple simultaneously active tokens. The OAUTHBEARER tokens are cached for some time in the extractor pools (along with the extractors) and in the pipeline cookies (per pipeline).

  • sasl.oauthbearer.cookie.cache:on - By default, tokens are cached in the pipeline cookies to minimize the number of token requests. To disable this feature, set sasl.oauthbearer.cookie.cache:off. To reset the pipeline cookies, use ALTER PIPELINE … SET OFFSET CURSOR ''.

  • If the OAUTHBEARER client on the authorization server is altered or recreated, this may cause the tokens to become invalid. Any cached invalid tokens may cause the pipelines to fail.

  • Using a different client_id for altered clients is recommended. Otherwise, reset the respective pipeline cookies and run FLUSH EXTRACTOR POOLS to discard the cached tokens.
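
As a minimal sketch, discarding stale tokens combines the two commands referenced above. The pipeline name kafka_oauth is an assumption for illustration only.

-- reset the pipeline cookies for this pipeline
ALTER PIPELINE kafka_oauth SET OFFSET CURSOR '';
-- discard tokens cached in the extractor pools
FLUSH EXTRACTOR POOLS;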

Kafka Version Setting

Warning

Using SSL and SASL with Kafka requires Kafka protocol version 0.9 or later; therefore, CREATE PIPELINE ... KAFKA ... and SELECT ... INTO KAFKA ... statements that use SSL and SASL must also adhere to that version requirement. The Kafka protocol version can be passed in as JSON through the CONFIG clause, for example: CONFIG '{"kafka_version":"0.10.0.0"}'. Alternatively, the pipelines_kafka_version engine variable controls this parameter for any pipeline that does not specify a Kafka version in its CREATE PIPELINE ... KAFKA ... statement.

Final Step: Use the Connection String in a SQL Statement

Create your CREATE PIPELINE ... KAFKA ... or SELECT ... INTO KAFKA ... statement, using the string containing the connection settings that you created in the previous steps.

Examples

The examples below make the following assumptions:

  • Port 9092 is a plaintext endpoint

  • Port 9093 is an SSL endpoint

  • Port 9094 is a plaintext SASL endpoint

  • Port 9095 is an SSL SASL endpoint

Note

The examples use CREATE PIPELINE, but the CONFIG and CREDENTIALS clauses shown can also be used with SELECT ... INTO KAFKA ... statements.

Plaintext

The following CREATE PIPELINE statements are equivalent:

CREATE PIPELINE `kafka_plaintext`
AS LOAD DATA KAFKA 'host.example.com:9092/test'
CONFIG '{"security.protocol": "plaintext"}'
INTO table t;

CREATE PIPELINE `kafka_no_creds`
AS LOAD DATA KAFKA 'host.example.com:9092/test'
INTO table t;

SSL

CREATE PIPELINE `kafka_ssl`
AS LOAD DATA KAFKA 'host.example.com:9093/test'
CONFIG '{"security.protocol": "ssl",
"ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"ssl.ca.location": "/var/private/ssl/ca-cert.pem"}'
CREDENTIALS '{"ssl.key.password": "abcdefgh"}'
INTO table t;

Kerberos with no SSL

CREATE PIPELINE `kafka_kerberos_no_ssl`
AS LOAD DATA KAFKA 'host.example.com:9094/test'
CONFIG '{""security.protocol": "sasl_plaintext",
"sasl.mechanism": "GSSAPI",
"sasl.kerberos.service.name": "kafka",
"sasl.kerberos.principal": "memsql/host.example.com@EXAMPLE.COM",
"sasl.kerberos.keytab": "/etc/krb5.keytab"}'
INTO table t;

Kerberos with SSL

CREATE PIPELINE `kafka_kerberos_ssl`
AS LOAD DATA KAFKA 'host.example.com:9095/test'
CONFIG '{"security.protocol": "sasl_ssl",
"sasl.mechanism": "GSSAPI",
"ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"ssl.ca.location": "/var/private/ssl/ca-cert.pem",
"sasl.kerberos.service.name": "kafka",
"sasl.kerberos.principal": "memsql/host.example.com@EXAMPLE.COM",
"sasl.kerberos.keytab": "/etc/krb5.keytab"}'
CREDENTIALS '{"ssl.key.password": "abcdefgh"}'
INTO table t;

SASL/PLAIN with SSL

CREATE PIPELINE `kafka_sasl_ssl_plain`
AS LOAD DATA KAFKA 'host.example.com:9095/test'
CONFIG '{"security.protocol": "sasl_ssl",
"sasl.mechanism": "PLAIN",
"ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"ssl.ca.location": "/var/private/ssl/ca-cert.pem",
"sasl.username": "kafka"}'
CREDENTIALS '{"ssl.key.password": "abcdefgh", "sasl.password": "metamorphosis"}'
INTO table t;

SASL/PLAIN without SSL

CREATE PIPELINE `kafka_sasl_plaintext_plain`
AS LOAD DATA KAFKA 'host.example.com:9094/test'
CONFIG '{"security.protocol": "sasl_plaintext",
"sasl.mechanism": "PLAIN",
"sasl.username": "kafka"}'
CREDENTIALS '{"sasl.password": "metamorphosis"}'
INTO table t;

SASL/SCRAM with SSL

CREATE PIPELINE `kafka_sasl_ssl_scram`
AS LOAD DATA KAFKA 'host.example.com:9095/test'
CONFIG '{"security.protocol": "sasl_ssl",
"sasl.mechanism": "SCRAM-SHA-512",
"ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"ssl.ca.location": "/var/private/ssl/ca-cert.pem",
"sasl.username": "kafka"}'
CREDENTIALS '{"ssl.key.password": "abcdefgh", "sasl.password": "metamorphosis"}'
INTO table t;

SASL/SCRAM without SSL

CREATE PIPELINE `kafka_sasl_plaintext_scram`
AS LOAD DATA KAFKA 'host.example.com:9094/test'
CONFIG '{"security.protocol": "sasl_plaintext",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "kafka"}'
CREDENTIALS '{"sasl.password": "metamorphosis"}'
INTO table t;

Configure SASL OAUTHBEARER for Use with Kafka Pipelines

CREATE OR REPLACE PIPELINE <pipeline_name>
AS LOAD DATA KAFKA '<Kafka cluster location>'
CONFIG '{"security.protocol":"SASL_SSL",
         "sasl.mechanism":"OAUTHBEARER",
         "sasl.oauthbearer.client.id":"<CLIENT_ID>",
         "sasl.oauthbearer.client.secret":"<CLIENT_SECRET>",
         "sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>",
         "sasl.oauthbearer.scope":"<SCOPE>"}'
INTO TABLE <table_name>;