# Load Data from Kafka

## Securely Connect to Kafka from SingleStore Helios

## Overview

When running a `CREATE PIPELINE ... KAFKA ...` statement, you may need to make a secure connection to Kafka.

Use Secure Socket Layer (SSL) to encrypt the connection and, optionally, Simple Authentication and Security Layer (SASL) to authenticate. The following SASL mechanisms are supported:

* GSSAPI (Kerberos)
* PLAIN
* SCRAM-SHA-256
* SCRAM-SHA-512
* OAUTHBEARER

This topic assumes that SSL (and SASL, if used) has been set up, configured, and enabled on the Kafka brokers. For information on how to enable this functionality, see the [SSL and SASL sections in the Kafka documentation](http://kafka.apache.org/documentation/#security_ssl).

## Convert Java Keystore (JKS) to Privacy Enhanced Mail (PEM) Key

To use SSL encryption for SingleStore pipelines, JKS keys need to be converted to PEM keys.

> **📝 Note**: In the steps below, the **< >** symbols indicate a variable and any information provided between these symbols is an example.

1. Create the key and keystore. You will be prompted to enter details such as name, organizational unit, city, state, etc.
   ```shell
   keytool -genkey -keyalg RSA -keystore <keystore-name>.jks -storepass <password> -alias "<keystore-alias-name>"

   ```
   ```output

   What is your first and last name?
    [Unknown]:- 
   What is the name of your organizational unit?
    [Unknown]:-
   What is the name of your organization?
    [Unknown]:-

   ```

2. Export the client certificate from the keystore. When prompted, enter the keystore password from step 1.
   ```shell
   keytool -exportcert -rfc -file <pem-name>.pem -alias "<keystore-alias-name>" -keystore <keystore-name>.jks

   ```
   ```output

   Enter keystore password:  <password>
   Certificate stored in file <pem-name>.pem
   ```

3. Import the client certificate into the truststore on your Kafka server. When prompted, enter a new password for the truststore.
   ```shell
   keytool -keystore <truststore-name>.jks -alias <truststore-name-alias> -import -file <pem-name>.pem

   ```
   ```output

   Enter keystore password:  
   Re-enter new password: 

   Trust this certificate? [no]:  yes
   Certificate was added to keystore

   ```

4. Convert the client keystore to Public-Key Cryptography Standards #12 (PKCS12) format. Use the same `<keystore-name>.jks`, alias, and password as in step 1.
   ```shell
   keytool -v -importkeystore -srckeystore <keystore-name>.jks -srcalias "<keystore-alias-name>" -destkeystore <new-keystore-name>.p12 -deststoretype PKCS12

   ```
   ```output

   Importing keystore <keystore-name>.jks to <new-keystore-name>.p12...
   Enter destination keystore password:
   Re-enter new password:
   Enter source keystore password:
   [Storing <new-keystore-name>.p12]

   ```

5. Extract the client's private key into a .pem file. When prompted for the import password, enter the destination keystore password you set in step 4.
   ```shell
   openssl pkcs12 -in <new-keystore-name>.p12 -nocerts -nodes > <key-name>.pem

   ```
   ```output

   Enter Import Password:

   ```

## Steps for Creating a Secure Connection

To create a secure connection from SingleStore Helios to a Kafka cluster, follow these steps in order.

## Upload a Certificate to Use to Connect via TLS/SSL

Use the following steps to enable TLS/SSL encryption between SingleStore Helios and Kafka.

1. On the [Cloud Portal](https://portal.singlestore.com), select **Workspaces**.

2. In the **Actions** column for the workspace on which to enable TLS/SSL connections, select the three dots, and then select **Access & Security** from the list.

3. On the **Security** tab, in the **Secure Connections** section, select **Upload Certificate** to upload your CA certificate. The certificate is made available to all nodes, which allows you to secure outbound connections over TLS/SSL.

## Build a String with the Connection Settings

Using the following settings, create a string containing the `CONFIG` clause and optionally the `CREDENTIALS` clause of the `CREATE PIPELINE ... KAFKA ...` or `SELECT ... INTO KAFKA ...` statement that you will be running.

## SSL Connection Settings

1. In your `CONFIG` JSON, if you want to enable SSL encryption only, set `"security.protocol": "ssl"`. If you want to enable Kerberos with SSL, or otherwise want to use SASL, set `"security.protocol": "sasl_ssl"`.

2. Set the remaining SSL configuration in the `CONFIG` JSON:

   * `ssl.ca.location`: Always set to `/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem`.

3. If your SSL certificate key is password-protected, set the password in your `CREDENTIALS` JSON.

   * `ssl.key.password`: Password for the SSL certificate key.
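As an illustration, the SSL settings above might combine into an SSL-only pipeline like the following minimal sketch. The pipeline name `kafka_ssl_pipeline` and the `<kafka-broker-host>:<port>/<topic>`, `<ssl-key-password>`, and `<table_name>` values are hypothetical placeholders; omit the `CREDENTIALS` clause if the certificate key has no password.

```sql
-- Minimal sketch: SSL encryption only, no SASL authentication.
-- kafka_ssl_pipeline and all <...> values are hypothetical placeholders.
CREATE PIPELINE kafka_ssl_pipeline
AS LOAD DATA KAFKA '<kafka-broker-host>:<port>/<topic>'
CONFIG '{"security.protocol": "ssl",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
-- Only needed if the SSL certificate key is password-protected.
CREDENTIALS '{"ssl.key.password": "<ssl-key-password>"}'
INTO TABLE <table_name>;
```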

## SASL Connection Settings

1. In your `CONFIG` JSON, set `"security.protocol": "sasl_ssl"` for SSL connections, or `"security.protocol": "sasl_plaintext"` if you want to authenticate with Kafka without SSL encryption.

2. If your Kafka brokers do not use [SCRAM](https://en.wikipedia.org/wiki/Salted_Challenge_Response_Authentication_Mechanism) for authentication, set `"sasl.mechanism": "PLAIN"` in your `CONFIG` JSON. Otherwise, set `"sasl.mechanism": "SCRAM-SHA-256"` or `"sasl.mechanism": "SCRAM-SHA-512"`.

3. In your `CONFIG` JSON, provide the username, `"sasl.username": "<kafka_credential_username>"`.

4. In your `CREDENTIALS` JSON, provide the password, `"sasl.password": "<kafka_credential_password>"`.
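A sketch of how these SASL settings fit together for SCRAM authentication over an SSL-encrypted connection, assuming the brokers are configured for `SCRAM-SHA-512`. The pipeline name `kafka_scram_pipeline` and all `<...>` values are hypothetical placeholders; switch `sasl.mechanism` to `SCRAM-SHA-256` or `PLAIN` to match your brokers.

```sql
-- Sketch: SCRAM authentication over SSL (sasl_ssl).
-- The username goes in CONFIG; the password goes in CREDENTIALS.
CREATE PIPELINE kafka_scram_pipeline
AS LOAD DATA KAFKA '<kafka-broker-host>:<port>/<topic>'
CONFIG '{"security.protocol": "sasl_ssl",
         "sasl.mechanism": "SCRAM-SHA-512",
         "sasl.username": "<kafka_credential_username>",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"sasl.password": "<kafka_credential_password>"}'
INTO TABLE <table_name>;
```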

> **📝 Note**: `SASL_PLAINTEXT/PLAIN` authentication with Kafka sends your credentials unencrypted over the network, so it is not secure and the credentials can be intercepted. `SASL_PLAINTEXT/SCRAM` authentication does not send the password in clear text, but the Kafka messages themselves are still transmitted unencrypted.

## Configuring OAUTHBEARER Authentication Mechanism

OAUTHBEARER is used to secure access to resources on a server by requiring clients to obtain a bearer token. The client presents this token with each request to the server, and the server verifies the token before granting access to the requested resource.

To use SASL OAUTHBEARER authentication with Kafka, the following information is required:

* `"sasl.mechanism":"OAUTHBEARER"` - Specifies that the client authenticates using an OAuth 2.0 Bearer Token.
* `"sasl.oauthbearer.client.id":"<CLIENT_ID>"` - A unique identifier for the client, usually assigned by the OAuth provider when the client is registered.
* `"sasl.oauthbearer.client.secret":"<CLIENT_SECRET>"` - A secret, usually assigned by the OAuth provider when the client is registered, that is used to authenticate the client.
* `"sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>"` - The endpoint URL on the authorization server from which the client obtains an OAuth 2.0 Bearer Token. The client sends a request to this endpoint to get a token, which is then used to authenticate subsequent requests to the server.

Optional configurations are:

* `"sasl.oauthbearer.scope":"<SCOPE>"` - Determines the permissions and resources that are available to an authorized client.
* `"sasl.oauthbearer.extensions":"<EXTENSION>"` - Provides additional data or parameters for authentication in the SASL/OAUTHBEARER mechanism. Consult [RFC-7628](https://tools.ietf.org/html/rfc7628#section-3.1) for further information on SASL extensions.
* See the **OAUTHBEARER Pipelines Configuration Details** section for details on the `sasl.oauthbearer.ssl.ca.location` and `sasl.oauthbearer.config` fields.

**Prerequisites**

To use SASL OAUTHBEARER authentication, the following prerequisites must be met:

* A Kafka broker with listeners configured to use OAUTHBEARER authentication.
* A database from which the Kafka brokers are reachable, so that you can create a pipeline that pulls data from the Kafka topic.
* A selected identity service (e.g., Okta, Google OAuth, Facebook OAuth, Azure AD, Keycloak, etc.).
* An OAUTHBEARER client created on the identity service and configured with the `client_credentials` grant type.

> **📝 Note**: The instructions for setting up a client on the Identity Service will vary depending on the server chosen and the type of application being built.

**Syntax for SASL OAUTHBEARER Pipeline**

Below is the syntax for creating a Kafka pipeline using OAUTHBEARER authentication.

```sql
CREATE OR REPLACE PIPELINE <pipeline_name>
  AS LOAD DATA KAFKA "<Kafka cluster location>"
  CONFIG '{"security.protocol":"SASL_SSL",
         "sasl.mechanism":"OAUTHBEARER",
         "sasl.oauthbearer.client.id":"<CLIENT_ID>",
         "sasl.oauthbearer.client.secret":"<CLIENT_SECRET>",
         "sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>",
         "sasl.oauthbearer.scope":"<SCOPE>"}'
  INTO TABLE <table_name>;
```

**OAUTHBEARER Pipelines Configuration Details**

To review the current pipeline configuration, run:

```sql
SELECT * FROM information_schema.pipelines;
```
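To inspect a single pipeline rather than all of them, the query can be filtered. This sketch assumes the view exposes `PIPELINE_NAME`, `STATE`, and `CONFIG_JSON` columns; `<pipeline_name>` is a placeholder.

```sql
-- Show the state and stored CONFIG JSON for one pipeline.
-- <pipeline_name> is a placeholder; column names are assumed.
SELECT PIPELINE_NAME, STATE, CONFIG_JSON
FROM information_schema.PIPELINES
WHERE PIPELINE_NAME = '<pipeline_name>';
```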

Most token providers are expected to work out of the box. The implementation details below can help you troubleshoot and configure pipelines for specific cases.

**Token Request Details:**

* SingleStore uses the `client_credentials` grant type for OAUTHBEARER token requests. Ensure the OAuth client is created with support for the `client_credentials` grant type.
* By default, SingleStore treats OAUTHBEARER tokens as opaque to address [privacy concerns](https://datatracker.ietf.org/doc/html/rfc9068#name-privacy-considerations) and uses the [expires\_in](https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2) field from the JSON token response to determine the token's validity time. If `expires_in` is not present, or `"sasl.oauthbearer.config":"use_expires_in=false"` is set in the pipeline configuration, SingleStore falls back to decoding the OAuth token as a JWT and uses its [exp claim](https://datatracker.ietf.org/doc/html/rfc7519#section-4.1.4) to determine its validity.
* The token refresh is scheduled at 80% of the token’s lifetime and handled by SingleStore in the background.
* For HTTPS OAuth token requests, SingleStore by default uses the CA bundle specified at pipeline creation or, if none is specified, tries to find one in the system paths. You can change this behavior by including one of the following settings in the pipeline `CONFIG` at creation. These settings affect only the token requests, not SSL communication with Kafka.

  * `"sasl.oauthbearer.ssl.ca.location":"system"` - Uses the system default path.
  * `"sasl.oauthbearer.ssl.ca.location":"/usr/lib/ssl/certs/ca-certificates.crt"` - Uses a specific CA path location.
  * `"sasl.oauthbearer.ssl.ca.location":""` (empty) - Disables SSL verification.
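As a sketch, the two token-request settings described above could be added to the OAUTHBEARER pipeline syntax. This example assumes you want token validity taken from the JWT `exp` claim and the token endpoint verified against the system default CA path; `<pipeline_name>` and the other `<...>` values are placeholders.

```sql
-- Sketch: derive token validity from the JWT exp claim instead of expires_in,
-- and verify the token endpoint using the system default CA path.
-- All <...> values are placeholders.
CREATE OR REPLACE PIPELINE <pipeline_name>
AS LOAD DATA KAFKA "<Kafka cluster location>"
CONFIG '{"security.protocol": "SASL_SSL",
         "sasl.mechanism": "OAUTHBEARER",
         "sasl.oauthbearer.client.id": "<CLIENT_ID>",
         "sasl.oauthbearer.client.secret": "<CLIENT_SECRET>",
         "sasl.oauthbearer.token.endpoint.url": "<ENDPOINT_URL>",
         "sasl.oauthbearer.config": "use_expires_in=false",
         "sasl.oauthbearer.ssl.ca.location": "system"}'
INTO TABLE <table_name>;
```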

**Token Cache Details:**

* OAUTHBEARER tokens are cached for some time in the extractor pools (along with the extractors) and in the pipeline cookies (per pipeline). Because SingleStore Helios uses several nodes and extractor pools, the OAuth server must allow multiple simultaneous active tokens.
* `"sasl.oauthbearer.cookie.cache":"on"` - By default, tokens are cached in the pipeline cookies to minimize the number of token requests. To disable this feature, set `"sasl.oauthbearer.cookie.cache":"off"`. To reset the pipeline cookies, use `alter pipeline … set offset cursor ''`.
* A pipeline may fail if the OAUTHBEARER client configuration on the identity server changes, or if the pipeline configuration is altered in a way that invalidates previously cached tokens. To avoid this, reset the respective pipeline cache:

  * `alter pipeline … set offset cursor ''` - Resets the cookies cache.
  * `flush extractor pools` - Resets the extractor pools.
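A minimal sketch of resetting both caches for one pipeline follows. Stopping the pipeline before the reset and starting it afterward is a precaution assumed here, not a documented requirement; `<pipeline_name>` is a placeholder. `FLUSH EXTRACTOR POOLS` takes no pipeline name, so it is not scoped to a single pipeline.

```sql
-- Sketch: reset cached OAUTHBEARER tokens for one pipeline.
-- <pipeline_name> is a placeholder; STOP/START is an assumed precaution.
STOP PIPELINE <pipeline_name>;
-- Reset the pipeline cookies (per-pipeline token cache).
ALTER PIPELINE <pipeline_name> SET OFFSET CURSOR '';
-- Reset the extractor pools (not limited to this pipeline).
FLUSH EXTRACTOR POOLS;
START PIPELINE <pipeline_name>;
```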

## Avro Schema Caching

When using Kafka pipelines with Avro format and the Confluent Schema Registry, SingleStore can cache schemas across batch boundaries to improve performance and reduce load on the Schema Registry.

By default, schemas are fetched from the Schema Registry for every batch. Enable schema caching using the `pipelines_avro_schema_cache_enabled` global variable:

```sql
SET GLOBAL pipelines_avro_schema_cache_enabled = ON;
```
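To confirm the current value, a standard variable lookup can be used. This is a sketch and assumes your user is allowed to read global variables.

```sql
-- Check whether Avro schema caching is enabled.
SHOW GLOBAL VARIABLES LIKE 'pipelines_avro_schema_cache_enabled';
```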

When schema caching is enabled:

* Each pipeline maintains a schema cache for each partition.
* Schemas are fetched from the Confluent Schema Registry at most once for each schema ID per partition while the pipeline remains active.
* Subsequent batches reuse cached schema definitions and avoid redundant HTTP requests.

The schema cache is cleared when:

* `ALTER PIPELINE ... SET CONFIG` modifies the pipeline configuration. This includes changing the Confluent Schema Registry URL, as it is part of the pipeline `CONFIG`.
* `ALTER PIPELINE ... SET CREDENTIALS` updates pipeline credentials.
* `CREATE OR REPLACE PIPELINE` updates a pipeline's configuration and credentials to match the new definition. It preserves the pipeline state (including the pipeline ID, cursor positions, offsets, and pipeline history) but always replaces all pipeline metadata with the new definition, even when the configuration is unchanged.
* `DROP PIPELINE` removes the pipeline.
* `DROP DATABASE` removes the database.

The schema cache is preserved when:

* `ALTER PIPELINE ... SET TRANSFORM` modifies only the transform.
* `STOP PIPELINE` and `START PIPELINE` stop and restart the pipeline.

Enable `pipelines_avro_schema_cache_enabled` when:

* Kafka Avro pipelines process frequent or high-throughput batches.
* Schemas change infrequently.
* Reducing load on the Confluent Schema Registry is important.
* Network latency to the Schema Registry affects pipeline performance.

> **📝 Note**: Schema caching applies only to Kafka pipelines that use Avro format with the Confluent Schema Registry. It does not affect other pipeline types or data formats.

## Kafka Version Setting

> **⚠️ Warning**: Using SSL and SASL with Kafka requires Kafka protocol version 0.9 or later, so `CREATE PIPELINE ... KAFKA ...` and `SELECT ... INTO KAFKA ...` statements that use SSL and SASL must also meet that version requirement. You can pass the Kafka protocol version as JSON in the `CONFIG` clause, for example `CONFIG '{"kafka_version":"0.10.0.0"}'`. Alternatively, the `pipelines_kafka_version` engine variable sets this parameter for any pipeline whose `CREATE PIPELINE ... KAFKA ...` statement does not specify a Kafka version.
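Both ways of setting the Kafka protocol version are sketched below. The pipeline name `kafka_versioned_pipeline` and the `<...>` values are hypothetical placeholders, `0.10.0.0` is reused from the example above, and the `SET GLOBAL` form assumes your workspace permits setting this engine variable.

```sql
-- Option 1: pin the Kafka protocol version for this pipeline only.
CREATE PIPELINE kafka_versioned_pipeline
AS LOAD DATA KAFKA '<kafka-broker-host>:<port>/<topic>'
CONFIG '{"kafka_version": "0.10.0.0",
         "security.protocol": "ssl",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
INTO TABLE <table_name>;

-- Option 2: set the default for pipelines that do not specify kafka_version.
SET GLOBAL pipelines_kafka_version = '0.10.0.0';
```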

## Final Step: Use the Connection String in a SQL Statement

Create your `CREATE PIPELINE ... KAFKA ...` or `SELECT ... INTO KAFKA ...` statement, using the string containing the connection settings that you created in the previous steps.

## Examples

## Create a Kafka Pipeline with Data from Confluent Cloud

```sql
CREATE PIPELINE quickstart_kafka AS LOAD DATA KAFKA '<Confluent Cloud cluster endpoint>/test'
CONFIG '{"sasl.username": "<CLUSTER_API_KEY>",
         "sasl.mechanism": "PLAIN",
         "security.protocol": "SASL_SSL",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"}'
INTO TABLE messages;
```

## Publish Messages to Confluent Cloud

```sql
SELECT text FROM t INTO
KAFKA '<Confluent Cloud cluster endpoint>/test-topic'
CONFIG '{"sasl.username": "<CLUSTER_API_KEY>",
         "sasl.mechanism": "PLAIN",
         "security.protocol": "SASL_SSL",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"}';
```

## Configure SASL OAUTHBEARER for Use with Kafka Pipelines

> **📝 Note**: SASL OAUTHBEARER is not supported with `SELECT ... INTO KAFKA ...`.

```sql
CREATE OR REPLACE PIPELINE <pipeline_name>
AS LOAD DATA KAFKA "<Kafka cluster location>"
CONFIG '{"security.protocol":"SASL_SSL",
         "sasl.mechanism":"OAUTHBEARER",
         "sasl.oauthbearer.client.id":"<CLIENT_ID>",
         "sasl.oauthbearer.client.secret":"<CLIENT_SECRET>",
         "sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>",
         "sasl.oauthbearer.scope":"<SCOPE>"}'
INTO TABLE <table_name>;
```

## Connecting to Confluent Schema Registry over SSL

To enable Pipelines to connect to Confluent Schema Registry over SSL, install the registry’s certificate (ca-cert) on all nodes in your SingleStore Helios workspace. For example:

```shell
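# Add the registry's CA certificate as a trust anchor (target file name is illustrative),
# then regenerate the extracted bundles, including tls-ca-bundle.pem.
sudo cp ca-cert /etc/pki/ca-trust/source/anchors/schema-registry-ca.pem
sudo update-ca-trust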
```

> **📝 Note**: You only need to install the registry’s certificate (ca-cert) in your SingleStore Helios workspace if you are using a self-signed certificate or a certificate signed by an internal CA.

After you install the registry's certificate on all nodes, both existing and new Pipelines that specify the host name/IP address and port of Confluent Schema Registry use SSL automatically.

## Schema Registry Configuration Options

Supported configuration options include:

* `schema.registry.ssl.ca.location`
* `schema.registry.ssl.ca.directory`
* `schema.registry.ssl.verifypeer`
* `schema.registry.ssl.certificate.location`
* `schema.registry.ssl.key.location`
* `schema.registry.ssl.key.password`
* `schema.registry.curl.timeout`
* `schema.registry.curl.tcp.keepalive`
* `schema.registry.curl.tcp.keepidle`
* `schema.registry.curl.keepintvl`
* `schema.registry.curl.connecttimeout`
* `schema.registry.username`
* `schema.registry.password`
* `schema.registry.curl.verbose`
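As an illustration, the following minimal sketch assumes these options are set in the pipeline `CONFIG` clause alongside the Kafka settings, and that the Avro records have `id` and `name` fields mapped to same-named columns. The pipeline name, field and column names, and all `<...>` values are hypothetical; only `schema.registry.ssl.ca.location` is set here.

```sql
-- Sketch: Avro pipeline that fetches schemas from Confluent Schema Registry over SSL.
-- Pipeline, table, column, and field names are hypothetical.
CREATE PIPELINE kafka_avro_pipeline
AS LOAD DATA KAFKA '<kafka-broker-host>:<port>/<topic>'
CONFIG '{"schema.registry.ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
INTO TABLE <table_name>
FORMAT AVRO SCHEMA REGISTRY '<schema-registry-host>:<port>'
-- Map Avro record fields (%::field) to table columns.
(id <- %::id,
 name <- %::name);
```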

## Load Data using the SingleStore Kafka Sink Connector

The SingleStore Kafka Sink Connector is a Kafka Connect connector that enables you to ingest Avro, JSON, and CSV messages from Kafka topics into SingleStore Helios. It is a Sink (target) connector designed to read data from Kafka topics and write the data to SingleStore Helios databases.

Refer to [Load Data using the SingleStore Kafka Sink Connector](https://docs.singlestore.com/cloud/load-data/integrate-with-singlestore-helios/singlestore-kafka-sink-connector.md) for more information.

## In this section

* [Load Data from Kafka Using a Pipeline](https://docs.singlestore.com/cloud/load-data/data-sources/load-data-from-kafka/load-data-from-kafka-using-a-pipeline.md)
* [Retrieve Kafka Properties](https://docs.singlestore.com/cloud/load-data/data-sources/load-data-from-kafka/retrieve-kafka-properties.md)
* [Kafka Pipeline Using Avro Format](https://docs.singlestore.com/cloud/load-data/data-sources/load-data-from-kafka/kafka-pipeline-using-avro-format.md)
* [Kafka Pipeline Using JSON Format](https://docs.singlestore.com/cloud/load-data/data-sources/load-data-from-kafka/kafka-pipeline-using-json-format.md)
* [Test your Kafka Cluster using kcat (formerly kafkacat)](https://docs.singlestore.com/cloud/load-data/data-sources/load-data-from-kafka/test-your-kafka-cluster-using-kcat-formerly-kafkacat.md)
* [Connect to Kafka Pipelines using an Outbound Endpoint](https://docs.singlestore.com/cloud/load-data/data-sources/load-data-from-kafka/connect-to-kafka-pipelines-using-an-outbound-endpoint.md)
* [Kafka Pipeline Errors](https://docs.singlestore.com/cloud/load-data/data-sources/load-data-from-kafka/kafka-pipeline-errors.md)

***

Modified at: June 10, 2026

Source: [/cloud/load-data/data-sources/load-data-from-kafka/](https://docs.singlestore.com/cloud/load-data/data-sources/load-data-from-kafka/)

