Load Data from Kafka
Securely Connect to Kafka from SingleStore Helios
Overview
When running a CREATE PIPELINE ... statement, you may need to make a secure connection to Kafka.
Use Secure Socket Layer (SSL) for the connection and Simple Authentication and Security Layer (SASL) to authenticate. The following SASL authentication mechanisms are supported:
- GSSAPI (Kerberos)
- PLAIN
- SCRAM-SHA-256
- SCRAM-SHA-512
- OAUTHBEARER
This topic assumes SSL has been set up, configured, and enabled on the Kafka brokers.
Convert Java Keystore (JKS) to Privacy Enhanced Mail (PEM) Key
To use SSL encryption for SingleStore pipelines, JKS keys need to be converted to PEM keys.
Note
In the steps below, the < > symbols indicate a variable and any information provided between these symbols is an example.
1. Create the key and keystore. You will be prompted to enter details such as name, organizational unit, city, state, etc.
keytool -genkey -keyalg RSA -keystore <keystore-name>.jks -storepass <password> -alias "<keystore-alias-name>"
What is your first and last name? [Unknown]:
What is the name of your organizational unit? [Unknown]:
What is the name of your organization? [Unknown]:
2. Export the client certificate from the keystore, using the same password as in step 1.
keytool -exportcert -rfc -file <pem-name>.pem -alias <pem-name-alias> -keystore <keystore-name>.jks
Enter keystore password: <password>
Certificate stored in file <pem-name>.pem
3. Import the client certificate to the truststore located on your Apache server. Enter a new password for the keystore.
keytool -keystore <truststore-name>.jks -alias <truststore-name-alias> -import -file <pem-name>.pem
Enter keystore password:
Re-enter new password:
Trust this certificate? [no]: yes
Certificate was added to keystore
4. Convert the client keystore to Public-Key Cryptography Standards (PKCS12) format. The <keystore-name>.jks and password are the same as in step 1.
keytool -v -importkeystore -srckeystore <keystore-name>.jks -srcalias <keystore-alias> -destkeystore <new-keystore-name>.p12 -deststoretype PKCS12
Importing keystore <keystore-name>.jks to <new-keystore-name>.p12...
Enter destination keystore password:
Re-enter new password:
Enter source keystore password:
[Storing <new-keystore-name>.p12]
5. Extract the client certificate key into a .pem file. Use the import password created in step 4.
openssl pkcs12 -in <new-keystore-name>.p12 -nocerts -nodes > <keyname>.pem
Enter Import Password:
Steps for Creating a Secure Connection
To create a secure connection from SingleStore Helios to a Kafka workspace, follow these steps in order.
Upload a Certificate to Use to Connect via TLS/SSL
Use the following steps to enable TLS/SSL encryption between SingleStore Helios and Kafka.
1. From the Deployments page on the Cloud Portal, select the workspace group on which to enable TLS/SSL connections.
2. Select the Security tab at the top of the page.
3. Select the Upload Certificate button to upload your CA certificate. This will make it available to all the nodes and will allow you to secure outbound connections via TLS/SSL.
Build a String with the Connection Settings
Using the following settings, create a string containing the CONFIG clause and, optionally, the CREDENTIALS clause of the CREATE PIPELINE ... or SELECT ... statement that you will be running.
SSL Connection Settings
- In your CONFIG JSON, if you want to enable SSL encryption only, set "security.protocol": "ssl". If you want to enable Kerberos with SSL, or otherwise want to use SASL, set "security.protocol": "sasl_ssl".
- Set the remaining SSL configuration in the CONFIG JSON:
  - ssl.ca.location: Value is always /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem
- If your SSL certificate key is using a password, set it in your CREDENTIALS JSON:
  - ssl.key.password: Password for the SSL certificate key.
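As an illustration, an SSL-only CONFIG and CREDENTIALS pair might be assembled as in the following sketch; the broker address, topic, target table, and key password are placeholders rather than values from this guide.
-- Minimal sketch: SSL-only connection; all bracketed values are placeholders.
CREATE PIPELINE ssl_example AS
LOAD DATA KAFKA '<broker-host>:<broker-port>/<topic>'
CONFIG '{"security.protocol": "ssl",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"ssl.key.password": "<key-password>"}'
INTO TABLE <table_name>;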
SASL Connection Settings
- In your CONFIG JSON, set "security.protocol": "sasl_ssl" for SSL connections, or "security.protocol": "sasl_plaintext" if you want to authenticate with Kafka without SSL encryption.
- If your Kafka brokers do not use SCRAM for authentication, set "sasl.mechanism": "PLAIN" in your CONFIG JSON. Otherwise, set "sasl.mechanism": "SCRAM-SHA-256" or "sasl.mechanism": "SCRAM-SHA-512".
- In your CONFIG JSON, provide the username: "sasl.username": "<kafka_credential_username>"
- In your CREDENTIALS JSON, provide the password: "sasl.password": "<kafka_credential_password>"
Note
SASL_PLAINTEXT/PLAIN authentication mode with Kafka sends your credentials unencrypted over the network. SASL_PLAINTEXT/SCRAM authentication mode with Kafka encrypts the credentials sent over the network, but transport of the Kafka messages themselves is not secure.
Configuring OAUTHBEARER Authentication Mechanism
OAUTHBEARER is used to secure access to resources on a server by requiring clients to obtain a bearer token.
To use SASL OAUTHBEARER authentication with Kafka, the following information is required:
- "sasl.mechanism":"OAUTHBEARER" - Specifies that the client will authenticate using an OAuth 2.0 Bearer Token.
- "sasl.oauthbearer.client.id":"<CLIENT_ID>" - The client ID is usually provided by the OAuth provider when the client is registered. It is a unique ID that is associated with the OAuth 2.0 Bearer Token.
- "sasl.oauthbearer.client.secret":"<CLIENT_SECRET>" - The client secret is usually assigned by the OAuth provider when a client is registered and is used to authenticate the client.
- "sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>" - The endpoint URL on the authorization server that is used to obtain an OAuth 2.0 Bearer Token. The client sends a request to this endpoint to get a token, which is then used to authenticate subsequent requests to the server.
Optional configurations are:
- "sasl.oauthbearer.scope":"<SCOPE>" - Determines the permissions and resources that are available to an authorized client. This extension is optional.
- "sasl.oauthbearer.extensions":"<EXTENSION>" - Can be included in the SASL/OAUTHBEARER mechanism to provide additional data or parameters for authentication. Consult RFC 7628 for further information on SASL extensions.
See "OAUTHBEARER Pipelines Configuration Details" section for additional details regarding "sasl.
oauthbearer. ssl. ca. location" "sasl. oauthbearer. config" fields.
Prerequisites
To use SASL OAUTHBEARER authentication, the following prerequisites must be met:
- A Kafka broker with listeners configured for OAUTHBEARER authentication.
- A connection to a database from which the Kafka brokers are reachable, so that a pipeline can be created to pull data from the Kafka queue.
- An identity service (e.g., Okta, Google OAuth, Facebook OAuth, Azure AD, Keycloak, etc.).
- An OAUTHBEARER client created and configured with the client_credentials grant type on the identity service.
Note
The instructions for setting up a client on the Identity Service will vary depending on the server chosen and the type of application being built.
Syntax for SASL OAUTHBEARER Pipeline
Below is the syntax for creating a Kafka pipeline using OAUTHBEARER authentication.
CREATE OR REPLACE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA "<Kafka cluster location>"
CONFIG '{"security.protocol":"SASL_SSL",
         "sasl.mechanism":"OAUTHBEARER",
         "sasl.oauthbearer.client.id":"<CLIENT_ID>",
         "sasl.oauthbearer.client.secret":"<CLIENT_SECRET>",
         "sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>",
         "sasl.oauthbearer.scope":"<SCOPE>"}'
INTO TABLE <table_name>;
OAUTHBEARER Pipelines Configuration Details
To review the current pipeline configuration, use the following command:
SELECT * FROM information_schema.pipelines;
Most token providers are assumed to work out-of-the-box.
Token Request Details:
- SingleStore implements the client_credentials grant type for OAUTHBEARER token requests. Ensure the OAuth client is created with support for the client_credentials grant type.
- By default, SingleStore treats OAUTHBEARER tokens as opaque to address privacy concerns and uses the expires_in field from the JSON token response to determine the token's validity time. If expires_in is not present, or "sasl.oauthbearer.config":"use_expires_in=false" is set in the pipeline configuration, SingleStore falls back to decoding the OAuth token JWT and uses the exp claim to determine its validity.
- The token refresh is scheduled at 80% of the token's lifetime and handled by SingleStore in the background.
- For HTTPS OAuth token requests, by default, SingleStore uses the CA bundle specified at pipeline creation, or tries to find one in the system paths if none is specified. This behavior can be changed by including additional configuration settings in the pipeline CONFIG at creation. This change affects the token request logic only, not the overall SSL Kafka communication.
  - "sasl.oauthbearer.ssl.ca.location":"system" - use the system default path;
  - "sasl.oauthbearer.ssl.ca.location":"/usr/lib/ssl/certs/ca-certificates.crt" - use a specific CA path location;
  - "sasl.oauthbearer.ssl.ca.location":"" (empty) - disable SSL verification.
Token Cache Details:
- The OAUTHBEARER tokens are cached for some time in the extractor pools (along with the extractors) and in the pipeline cookies (per pipeline). The OAuth server must allow multiple simultaneously active tokens, because several nodes and extractor pools are used in the SingleStore Helios architecture.
- "sasl.oauthbearer.cookie.cache":"on" - by default, tokens are cached in the pipeline cookies to minimize the number of token requests. To disable this feature, set "sasl.oauthbearer.cookie.cache":"off". To reset the pipeline cookies, use alter pipeline ... set offset cursor ''.
- The pipeline may fail if the OAUTHBEARER client config on the identity server is changed, or the pipeline config is altered in a way that causes previously requested cached tokens to become invalid. This issue can be avoided by resetting the respective pipeline cache:
  - alter pipeline ... set offset cursor '' - to reset the cookies cache;
  - flush extractor pools - to reset the extractor pools.
Kafka Version Setting
Warning
Using SSL and SASL with Kafka requires Kafka protocol version 0.9 or later; CREATE PIPELINE ... and SELECT ... INTO KAFKA ... statements using SSL and SASL with Kafka also need to adhere to that version requirement. The Kafka protocol version can be passed in through the CONFIG clause, similar to this: CONFIG '{"kafka_version": "<kafka-protocol-version>"}'. The pipelines_kafka_version engine variable controls this parameter for any pipeline that does not specify a Kafka version configuration value in its CREATE PIPELINE ... statement.
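For example, a pipeline that pins the Kafka protocol version explicitly in its CONFIG clause might look like the following sketch; the version string, broker, topic, and table are placeholders.
-- Minimal sketch: pinning the Kafka protocol version; all bracketed values are placeholders.
CREATE PIPELINE versioned_example AS
LOAD DATA KAFKA '<broker-host>:<broker-port>/<topic>'
CONFIG '{"kafka_version": "<kafka-protocol-version>",
         "security.protocol": "sasl_ssl",
         "sasl.mechanism": "PLAIN",
         "sasl.username": "<kafka_credential_username>",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"sasl.password": "<kafka_credential_password>"}'
INTO TABLE <table_name>;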
Final Step: Use the Connection String in a SQL Statement
Create your CREATE PIPELINE ... or SELECT ... statement, using the string containing the connection settings that you created in the previous steps.
Examples
Create a Kafka Pipeline with Data from Confluent Cloud
CREATE PIPELINE quickstart_kafka AS
LOAD DATA KAFKA '<Confluent Cloud workspace endpoint>/test'
CONFIG '{"sasl.username": "<CLUSTER_API_KEY>",
         "sasl.mechanism": "PLAIN",
         "security.protocol": "SASL_SSL",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"}'
INTO TABLE messages;
Publish Messages to Confluent Cloud
SELECT text FROM t
INTO KAFKA '<Confluent Cloud workspace endpoint>/test-topic'
CONFIG '{"sasl.username": "<CLUSTER_API_KEY>",
         "sasl.mechanism": "PLAIN",
         "security.protocol": "SASL_SSL",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"}';
Configure SASL OAUTHBEARER for Use with Kafka Pipelines
Note
SASL OAUTHBEARER is not supported with SELECT ... INTO KAFKA ... statements.
CREATE OR REPLACE PIPELINE <pipeline_name> AS
LOAD DATA KAFKA "<Kafka cluster location>"
CONFIG '{"security.protocol":"SASL_SSL",
         "sasl.mechanism":"OAUTHBEARER",
         "sasl.oauthbearer.client.id":"<CLIENT_ID>",
         "sasl.oauthbearer.client.secret":"<CLIENT_SECRET>",
         "sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>",
         "sasl.oauthbearer.scope":"<SCOPE>"}'
INTO TABLE <table_name>;
Load Data from the Confluent Kafka Connector
The SingleStore Confluent Kafka Connector is a Kafka Connect connector that allows you to easily ingest AVRO, JSON, and CSV messages from Kafka topics into SingleStore Helios.
Learn more about the SingleStore Helios Confluent Kafka Connector, including how to install and configure it, in Working with the Kafka Connector.
Connecting to Confluent Schema Registry over SSL
To enable Pipelines to connect to Confluent Schema Registry over SSL, install the registry’s certificate (ca-cert) on all nodes in your SingleStore Helios workspace.
cat ca-cert >> /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem
update-ca-trust
Note
You only need to install the registry’s certificate (ca-cert) in your SingleStore Helios workspace if you are using a self-signed certificate or a certificate signed by an internal CA.
After installing the registry’s certificate on all nodes, existing Pipelines that you already created (or new Pipelines that you create) with the host name/IP address and port of Confluent Schema Registry will use SSL automatically.
Schema Registry Configuration Options
Supported configuration options include:
- schema.registry.ssl.ca.location
- schema.registry.ssl.ca.directory
- schema.registry.ssl.verifypeer
- schema.registry.ssl.certificate.location
- schema.registry.ssl.key.location
- schema.registry.ssl.key.password
- schema.registry.curl.timeout
- schema.registry.curl.tcp.keepalive
- schema.registry.curl.tcp.keepidle
- schema.registry.curl.keepintvl
- schema.registry.curl.connecttimeout
- schema.registry.username
- schema.registry.password
- schema.registry.curl.verbose
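As a sketch of where these options fit, an Avro pipeline that reads schemas from a secured Confluent Schema Registry might pass them in the CONFIG and CREDENTIALS JSON alongside the Kafka settings; the endpoints, table name, and column mapping below are hypothetical placeholders.
-- Minimal sketch: Avro pipeline with Schema Registry credentials; all bracketed values are placeholders.
CREATE PIPELINE avro_registry_example AS
LOAD DATA KAFKA '<broker-host>:<broker-port>/<topic>'
CONFIG '{"schema.registry.username": "<registry-username>",
         "schema.registry.ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"schema.registry.password": "<registry-password>"}'
INTO TABLE <table_name>
FORMAT AVRO SCHEMA REGISTRY '<registry-host>:<registry-port>'
(<column_name> <- %::<avro_field>);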
Load Data from Kafka Using a Pipeline
To create and interact with a Kafka pipeline quickly, follow the instructions in this section.
- Part 1: Sending Messages to Kafka
- Part 2: Creating a Kafka Pipeline in SingleStore
Prerequisites
To complete this Quickstart, your environment must meet the following prerequisites:
- A working Kafka queue.
- A SingleStore Helios installation or a SingleStore Helios workspace: you will connect to the database or workspace and create a pipeline to pull data from the Kafka queue.
Upload a Certificate to Use to Connect via TLS/SSL
Use the following steps to enable TLS/SSL encryption between SingleStore Helios and Kafka.
1. From the Deployments page on the Cloud Portal, select the workspace group on which to enable TLS/SSL connections.
2. Select the Security tab at the top of the page.
3. Select the Upload Certificate button to upload your CA certificate. This will make it available to all the nodes and will allow you to secure outbound connections via TLS/SSL.
Part 1: Sending Messages to Kafka
In Kafka, create a topic named test and enter the following messages:
the quick
brown fox
jumped over
the lazy dog
In Part 2, you will create a pipeline in SingleStore to ingest these messages.
Part 2: Creating a Kafka Pipeline in SingleStore
Now that your Kafka topic contains some messages, you can create a new pipeline and ingest the messages.
At the SingleStore prompt, execute the following statements:
CREATE DATABASE quickstart_kafka;
USE quickstart_kafka;
CREATE TABLE messages (id text);
These statements create a new database and a table that will be used for the Kafka pipeline.
To create the pipeline, execute the following statement, replacing <kafka-workspace-ip> with your Kafka workspace's IP address:
CREATE PIPELINE quickstart_kafka AS LOAD DATA KAFKA '<kafka-workspace-ip>/test' INTO TABLE messages;
If you are connecting with SSL, SASL, or Kerberos, you will need to specify additional clauses in your CREATE PIPELINE statement.
Note: If you are creating a Kafka pipeline to load data from Confluent Cloud, execute the following statement instead of the previous CREATE PIPELINE statement.
CREATE PIPELINE quickstart_kafka AS
LOAD DATA KAFKA '<Confluent Cloud workspace endpoint>/test'
CONFIG '{"sasl.username": "<CLUSTER_API_KEY>",
         "sasl.mechanism": "PLAIN",
         "security.protocol": "SASL_SSL",
         "ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"}'
INTO TABLE messages;
The two CREATE PIPELINE statements just mentioned create a new Kafka pipeline named quickstart_kafka, which reads messages from the test Kafka topic and writes them into the messages table.
TEST PIPELINE quickstart_kafka LIMIT 1;
If this test was successful and no errors are present, then you are ready to try ingesting data. The following command runs one batch and commits the data to the messages table.
START PIPELINE quickstart_kafka FOREGROUND LIMIT 1 BATCHES;
To verify that the data exists in the messages table as expected, execute the following statement.
SELECT * FROM messages;
+--------------+
| id |
+--------------+
| the quick |
| brown fox |
| jumped over |
| the lazy dog |
+--------------+
Now you are ready to start your pipeline as a background process.
START PIPELINE quickstart_kafka;
Now that the pipeline is up and running, send a few more messages to the Kafka topic.
lorem ipsum
dolor sit amet
In the SingleStore terminal window, run the SELECT * FROM messages; statement again.
SELECT * FROM messages;
+----------------+
| id |
+----------------+
| lorem ipsum |
| dolor sit amet |
| the quick |
| brown fox |
| jumped over |
| the lazy dog |
+----------------+
Now that your pipeline is running, you can check its status and history at any time by querying the information_schema.PIPELINES_BATCHES_SUMMARY table.
SELECT * FROM information_schema.PIPELINES_BATCHES_SUMMARY;
This system view will give one row for every recent batch the pipeline has run, as well as at-a-glance performance and volume metrics.
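For example, to focus on the quickstart pipeline only, you can filter the view by pipeline name; the columns selected here are a small, assumed subset of what the view exposes.
-- Sketch: inspect recent batches for the quickstart pipeline only.
SELECT DATABASE_NAME, PIPELINE_NAME, BATCH_ID, BATCH_STATE, BATCH_TIME
FROM information_schema.PIPELINES_BATCHES_SUMMARY
WHERE PIPELINE_NAME = 'quickstart_kafka';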
Note
Foreground pipelines and background pipelines have different intended uses and behave differently.
Note: When a backup is restored, all pipelines in that database will revert to the state (offsets, etc.) they were in when the backup was taken.
Test your Kafka Cluster using kcat (formerly kafkacat)
When you use SingleStore to consume or produce Kafka messages, you may want to test your Kafka cluster directly to verify connectivity and to verify topics and messages.
Note
kcat is available on GitHub.
Running kcat
Minimal syntax for running kcat is as follows.
kcat <mode> -b <broker> -t <topic>
When you run kcat, you may need to supply additional parameters, such as SASL settings, to connect to your Kafka cluster. Pass these settings using the -X option.
<mode>
One of the following:
- -P: Producer mode. Used to produce messages to a Kafka topic (not demonstrated here).
- -C: Consumer mode. Used to consume messages from a Kafka topic. Used here to test connectivity to a Kafka cluster and to verify that data is as expected.
- -L: Metadata listing mode. Used here to test connectivity to a Kafka cluster.
- -Q: Query mode. Not demonstrated here.
<broker>
The Kafka broker to connect to.
<topic>
The name of the topic to produce to, consume from, or list metadata from.
Verify connectivity to your Kafka Cluster
Running kcat in any mode, where the command returns successfully, is confirmation that you can connect.
Verify Connectivity by Returning Metadata
In the following example, kcat connects to a Confluent Cloud cluster using -L mode to return metadata.
kcat -b <Confluent Cloud broker URL> -L \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanism=PLAIN \
  -X ssl.ca.location=<CA certificate file path> \
  -X sasl.username=<CLUSTER_API_KEY> \
  -X sasl.password=<CLUSTER_API_SECRET>
Metadata for all topics (from broker ...)
20 brokers:
...
10 topics:
...
Verify Connectivity by Consuming One Message from a Topic
In the following example, kcat connects to a Confluent Cloud cluster using -C mode to consume the last message from a topic.
kcat -C -b <Confluent Cloud broker URL> -t <topic> -o -1 -e \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanism=PLAIN \
  -X ssl.ca.location=<CA certificate file path> \
  -X sasl.username=<CLUSTER_API_KEY> \
  -X sasl.password=<CLUSTER_API_SECRET>
Verify data in your Kafka Cluster
Two situations where you may want to verify data in your Kafka cluster are:
- Prior to using SingleStore to consume Kafka messages via Pipelines or the Confluent Kafka Connector.
- After using SingleStore to produce Kafka messages (using SELECT ... INTO KAFKA ...).
In both situations, you can run kcat in -C mode for a given topic.
The following kcat command connects to a Confluent Cloud cluster using -C mode and writes the output to the file kcat_output.txt. The -e option indicates that kcat should exit once it has consumed the last message from the topic.
kcat -C -b <Confluent Cloud broker URL> -t <topic> -e \
  -f 'Partition: %p Offset: %o Message: %s\n' \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanism=PLAIN \
  -X ssl.ca.location=<CA certificate file path> \
  -X sasl.username=<CLUSTER_API_KEY> \
  -X sasl.password=<CLUSTER_API_SECRET> > kcat_output.txt
Example: Verify messages in Kafka after producing messages using SELECT ... INTO KAFKA ...
Create a table with three columns.
CREATE TABLE numbers(a INT, b INT, c INT);
Insert data into the table.
INSERT INTO numbers(a,b,c) VALUES (10,20,30);
INSERT INTO numbers(a,b,c) VALUES (40,50,60);
INSERT INTO numbers(a,b,c) VALUES (70,80,90);
Use the SELECT ... INTO KAFKA ... command to create a Kafka message for each row of the numbers table. The messages are published to the topic numbers-topic in a Confluent Cloud cluster. Before being published, each column value is increased by 10, 20, or 30, respectively.
SELECT a + 10, b + 20, c + 30 FROM numbers
INTO KAFKA '<Confluent Cloud broker URL>/numbers-topic'
CONFIG '{"security.protocol":"SASL_SSL",
         "sasl.mechanism":"PLAIN",
         "ssl.ca.location":"/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
         "sasl.username":"<CLUSTER_API_KEY>"}'
CREDENTIALS '{"sasl.password":"<CLUSTER_API_SECRET>"}'
FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY "\t"
LINES TERMINATED BY '}' STARTING BY '{';
Run the kcat command shown in the section "Verify data in your Kafka cluster", above. Then open the kcat_output.txt file and search for {20,40,60} to verify that the message is in the file. The following example output contains {20,40,60}:
Partition: 1 Offset: 0 Message: {20,40,60}
Partition: 1 Offset: 1 Message: {50,70,90}
Partition: 1 Offset: 2 Message: {80,100,120}
...
Last modified: January 23, 2025