Load Data from Kafka

Securely Connect to Kafka from SingleStore

Overview

When running a CREATE PIPELINE ... KAFKA ... statement, you may need to make a secure connection to Kafka.

Use Secure Sockets Layer (SSL) for the connection and Simple Authentication and Security Layer (SASL) to authenticate. Using SASL for authentication is optional. The following SASL mechanisms are supported:

  • GSSAPI (Kerberos)

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

Kerberos may be used for authentication through the GSSAPI mechanism.

This topic assumes SSL and/or Kerberos have been set up, configured, and enabled on the Kafka brokers. For information on how to enable this functionality, see the SSL and SASL sections in the Kafka documentation.

Convert Java Keystore (JKS) to Privacy Enhanced Mail (PEM) Key

To use SSL encryption for SingleStore pipelines, JKS keys need to be converted to PEM keys.

Note

In the steps below, the < > symbols indicate a variable and any information provided between these symbols is an example.

  1. Create the key and keystore. You will be prompted to enter details such as name, organizational unit, city, state, etc.

    keytool -genkey -keyalg RSA -keystore <keystore-name>.jks -storepass <password> -alias "<keystore-alias-name>"
    What is your first and last name?
     [Unknown]:
    What is the name of your organizational unit?
     [Unknown]:
    What is the name of your organization?
     [Unknown]:
  2. Export the client certificate from the keystore using the same password as in step 1.

    keytool -exportcert -rfc -file <pem-name>.pem -alias <pem-name-alias> -keystore <keystore-name>.jks
    Enter keystore password:  <password>
    Certificate stored in file <pem-name>.pem
  3. Import the client certificate into the truststore located on your Apache Kafka server. Enter a new password for the truststore when prompted.

    keytool -keystore <truststore-name>.jks -alias <truststore-name-alias> -import -file <pem-name>.pem
    Enter keystore password:  
    Re-enter new password: 
    
    Trust this certificate? [no]:  yes
    Certificate was added to keystore
  4. Convert the client keystore to Public-Key Cryptography Standards (PKCS12) format. The <keystore-name>.jks and password are the same as in step 1.

    keytool -v -importkeystore -srckeystore <keystore-name>.jks -srcalias <keystore alias> -destkeystore <new-keystore-name>.p12 -deststoretype PKCS12
    Importing keystore <keystore-name>.jks to <new-keystore-name>.p12...
    Enter destination keystore password:  
    Re-enter new password: 
    Enter source keystore password:  
    [Storing new-keystore-name.p12]
  5. Extract the client certificate key into a .pem file. Use the import password created in step 4.

    openssl pkcs12 -in <new-keystore-name>.p12 -nocerts -nodes > <key-name>.pem
    Enter Import Password:

Note

Refer to Generating SSL Certificates for more information.

Steps for Creating a Secure Connection

To create a secure connection from SingleStore to a Kafka cluster, follow these steps in order.

Copy Security Files to the SingleStore Cluster

Securely copy the CA certificate, SSL certificate, and SSL key used for connections between the SingleStore cluster and Kafka brokers from the Kafka cluster to each SingleStore node. You should use a secure file transfer method, such as scp, to copy the files to your SingleStore nodes. The file locations on your SingleStore nodes should be consistent across the cluster.

Set Up Your SingleStore Cluster for Kerberos Authentication

To configure a Kafka pipeline to authenticate with Kerberos, you must configure all of the nodes in your SingleStore cluster as clients for Kerberos authentication. To do this, perform the following steps:

  1. Securely copy the keytab file containing the SingleStore service principal (e.g. memsql/host.domain.com@REALM.NAME) from the Kerberos server to each node in your SingleStore cluster. You should use a secure file transfer method, such as scp, to copy the keytab file to your SingleStore nodes. The file location on your SingleStore nodes should be consistent across the cluster.

  2. Make sure your SingleStore nodes can connect to the KDC server using the fully-qualified domain name (FQDN) of the KDC server. This might require configuring network settings or updating /etc/hosts on your nodes.

  3. Ensure that the memsql service account on each node can access the copied keytab file. This can be accomplished by changing file ownership or permissions. If the memsql account cannot access the keytab file, you will not be able to complete the next step because your master aggregator will not be able to restart after applying configuration updates.

  4. When authenticating with Kerberos, SingleStore needs to authenticate as a client, which means you must also install a Kerberos client onto each node in your cluster. The following command installs the krb5-user package for Debian-based Linux distributions.

    sudo apt-get update && sudo apt-get install krb5-user

    When setting up your Kerberos configuration settings, set your default realm, Kerberos admin server, and other options to those defined by your KDC server. In the examples used in this topic, the default realm is EXAMPLE.COM, and the Kerberos server settings are set to the FQDN of the KDC server host.example.com.

Build a String with the Connection Settings

Using the following settings, create a string containing the CONFIG clause and optionally the CREDENTIALS clause of the CREATE PIPELINE ... KAFKA ... or SELECT ... INTO KAFKA ... statement that you will be running.

SSL Connection Settings
  1. In your CONFIG JSON, if you want to enable SSL encryption only, set "security.protocol": "ssl". If you want to enable Kerberos with SSL, or otherwise want to use SASL, set "security.protocol": "sasl_ssl".

  2. Set the remaining SSL configuration in the CONFIG JSON:

    • ssl.ca.location: Path to the CA certificate on the SingleStore node.

    • ssl.certificate.location: Path to the SSL certificate on the SingleStore node.

    • ssl.key.location: Path to the SSL certificate key on the SingleStore node.

  3. If your SSL certificate key uses a password, set it in your CREDENTIALS JSON.

    • ssl.key.password: Password for the SSL certificate key.

SASL Connection Settings
  1. In your CONFIG JSON, set "security.protocol": "sasl_ssl" for SSL connections, or "security.protocol": "sasl_plaintext" if you want to authenticate with Kafka without SSL encryption.

  2. If your Kafka brokers do not use SCRAM for authentication, set "sasl.mechanism": "PLAIN" in your CONFIG JSON. Otherwise, set "sasl.mechanism": "SCRAM-SHA-256" or "sasl.mechanism": "SCRAM-SHA-512".

  3. In your CONFIG JSON, provide the username, "sasl.username": "<kafka_credential_username>".

  4. In your CREDENTIALS JSON, provide the password, "sasl.password": "<kafka_credential_password>".

Note

SASL_PLAINTEXT/PLAIN authentication mode with Kafka sends your credentials unencrypted over the network. It is therefore not secure and susceptible to being sniffed.

SASL_PLAINTEXT/SCRAM authentication mode with Kafka protects the credentials sent over the network (the password is not transmitted in cleartext), but transport of the Kafka messages themselves is not encrypted.

Kerberos Connection Settings
  1. In your CONFIG JSON, set "sasl.mechanism": "GSSAPI".

  2. Set "security.protocol": "sasl_ssl" for Kerberos and SSL connections, or "security.protocol": "sasl_plaintext" if you want to authenticate with Kerberos without SSL encryption.

  3. Set the remaining Kerberos configuration in CONFIG JSON:

    • sasl.kerberos.service.name: The Kerberos principal name that Kafka runs as. For example, "kafka".

    • sasl.kerberos.keytab: The local file path on the SingleStore node to the authenticating keytab.

    • sasl.kerberos.principal: The service principal name for the SingleStore cluster. For example, "memsql/host.example.com@EXAMPLE.COM".

Kafka Version Setting

Warning

Using SSL and SASL with Kafka requires Kafka protocol version 0.9 or later; therefore, CREATE PIPELINE ... KAFKA ... and SELECT ... INTO KAFKA ... statements using SSL and SASL with Kafka must also adhere to that version requirement. The Kafka protocol version can be passed in the CONFIG clause as JSON, for example CONFIG '{"kafka_version":"0.10.0.0"}'. Alternatively, the pipelines_kafka_version engine variable controls this parameter for any pipeline that does not specify a Kafka version in its CREATE PIPELINE ... KAFKA ... statement.
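
For illustration only, here is a minimal sketch of both approaches. It reuses the broker host, topic, and table t from the examples later in this topic, and assumes the pipelines_kafka_version engine variable can be set with SET GLOBAL like other engine variables.

CREATE PIPELINE `kafka_versioned`
AS LOAD DATA KAFKA 'host.example.com:9092/test'
CONFIG '{"kafka_version": "0.10.0.0"}'
INTO table t;

-- Alternatively, set the default protocol version for pipelines that do not
-- specify kafka_version in their CONFIG clause:
SET GLOBAL pipelines_kafka_version = '0.10.0.0';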

Final Step: Use the Connection String in a SQL Statement

Create your CREATE PIPELINE ... KAFKA ... or SELECT ... INTO KAFKA ... statement, using the string containing the connection settings that you created in the previous steps.

Examples

The examples in this section make the following assumptions:

  • Port 9092 is a plaintext endpoint

  • Port 9093 is an SSL endpoint

  • Port 9094 is a plaintext SASL endpoint

  • Port 9095 is an SSL SASL endpoint

Note

The examples use CREATE PIPELINE, but the CONFIG and CREDENTIALS clauses shown can be used with SELECT ... INTO ... KAFKA also. SASL OAUTHBEARER is not supported with SELECT ... INTO ... KAFKA.

Plaintext

The following CREATE PIPELINE statements are equivalent:

CREATE PIPELINE `kafka_plaintext`
AS LOAD DATA KAFKA 'host.example.com:9092/test'
CONFIG '{"security.protocol": "plaintext"}'
INTO table t;
CREATE PIPELINE `kafka_no_creds`
AS LOAD DATA KAFKA 'host.example.com:9092/test'
INTO table t;

SSL

CREATE PIPELINE `kafka_ssl`
AS LOAD DATA KAFKA 'host.example.com:9093/test'
CONFIG '{"security.protocol": "ssl",
"ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"ssl.ca.location": "/var/private/ssl/ca-cert.pem"}'
CREDENTIALS '{"ssl.key.password": "abcdefgh"}'
INTO table t;

Kerberos with no SSL

CREATE PIPELINE `kafka_kerberos_no_ssl`
AS LOAD DATA KAFKA 'host.example.com:9094/test'
CONFIG '{""security.protocol": "sasl_plaintext",
"sasl.mechanism": "GSSAPI",
"sasl.kerberos.service.name": "kafka",
"sasl.kerberos.principal": "memsql/host.example.com@EXAMPLE.COM",
"sasl.kerberos.keytab": "/etc/krb5.keytab"}'
INTO table t;

Kerberos with SSL

CREATE PIPELINE `kafka_kerberos_ssl`
AS LOAD DATA KAFKA 'host.example.com:9095/test'
CONFIG '{"security.protocol": "sasl_ssl",
"sasl.mechanism": "GSSAPI",
"ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"ssl.ca.location": "/var/private/ssl/ca-cert.pem",
"sasl.kerberos.service.name": "kafka",
"sasl.kerberos.principal": "memsql/host.example.com@EXAMPLE.COM",
"sasl.kerberos.keytab": "/etc/krb5.keytab"}'
CREDENTIALS '{"ssl.key.password": "abcdefgh"}'
INTO table t;

SASL/PLAIN with SSL

CREATE PIPELINE `kafka_sasl_ssl_plain`
AS LOAD DATA KAFKA 'host.example.com:9095/test'
CONFIG '{"security.protocol": "sasl_ssl",
"sasl.mechanism": "PLAIN",
"ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"ssl.ca.location": "/var/private/ssl/ca-cert.pem",
"sasl.username": "kafka"}'
CREDENTIALS '{"ssl.key.password": "abcdefgh", "sasl.password": "metamorphosis"}'
INTO table t;

SASL/PLAIN without SSL

CREATE PIPELINE `kafka_sasl_plaintext_plain`
AS LOAD DATA KAFKA 'host.example.com:9094/test'
CONFIG '{"security.protocol": "sasl_plaintext",
"sasl.mechanism": "PLAIN",
"sasl.username": "kafka"}'
CREDENTIALS '{"sasl.password": "metamorphosis"}'
INTO table t;

SASL/SCRAM with SSL

CREATE PIPELINE `kafka_sasl_ssl_scram`
AS LOAD DATA KAFKA 'host.example.com:9095/test'
CONFIG '{"security.protocol": "sasl_ssl",
"sasl.mechanism": "SCRAM-SHA-512",
"ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"ssl.ca.location": "/var/private/ssl/ca-cert.pem",
"sasl.username": "kafka"}'
CREDENTIALS '{"ssl.key.password": "abcdefgh", "sasl.password": "metamorphosis"}'
INTO table t;

SASL/SCRAM without SSL

CREATE PIPELINE `kafka_sasl_plaintext_scram`
AS LOAD DATA KAFKA 'host.example.com:9094/test'
CONFIG '{"security.protocol": "sasl_plaintext",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "kafka"}'
CREDENTIALS '{"sasl.password": "metamorphosis"}'
INTO table t;

Load Data from the Confluent Kafka Connector

The SingleStore Confluent Kafka Connector is a Kafka Connect connector that allows you to easily ingest AVRO, JSON, and CSV messages from Kafka topics into SingleStore. More specifically, the Confluent Kafka Connector is a Sink (target) connector designed to read data from Kafka topics and write that data to SingleStore tables.

Learn more about the SingleStore Confluent Kafka Connector, including how to install and configure it, in Working with the Kafka Connector.

Connecting to Confluent Schema Registry over SSL

To enable Pipelines to connect to Confluent Schema Registry over SSL, install the registry’s certificate (ca-cert) on all nodes in your SingleStore cluster. For example:

cat ca-cert >> /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem
update-ca-trust

Note

You only need to install the registry’s certificate (ca-cert) in your SingleStore cluster if you are using a self-signed certificate or a certificate signed by an internal CA.

After the certificate is installed on all nodes, any Pipelines (existing or newly created) that specify the host name or IP address and port of the Confluent Schema Registry will use SSL automatically.

Schema Registry Configuration Options

Supported configuration options include the following (an example sketch appears after this list):

  • schema.registry.ssl.ca.location

  • schema.registry.ssl.ca.directory

  • schema.registry.ssl.verifypeer

  • schema.registry.ssl.certificate.location

  • schema.registry.ssl.key.location

  • schema.registry.ssl.key.password

  • schema.registry.curl.timeout

  • schema.registry.curl.tcp.keepalive

  • schema.registry.curl.tcp.keepidle

  • schema.registry.curl.keepintvl

  • schema.registry.curl.connecttimeout

  • schema.registry.username

  • schema.registry.password

  • schema.registry.curl.verbose
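
As an illustration, the following sketch shows where several of these options might appear in a pipeline that ingests Avro messages and fetches schemas from the registry. The table avro_t, the column mapping, the registry host schema-registry.example.com:8081, the username, and the file path are assumptions for this sketch; see CREATE PIPELINE for the exact FORMAT AVRO ... SCHEMA REGISTRY syntax.

CREATE PIPELINE `kafka_avro_registry`
AS LOAD DATA KAFKA 'host.example.com:9092/avro-test'
CONFIG '{"schema.registry.ssl.ca.location": "/var/private/ssl/registry-ca-cert.pem",
"schema.registry.username": "registry_user"}'
CREDENTIALS '{"schema.registry.password": "registry_secret"}'
INTO table avro_t
FORMAT AVRO SCHEMA REGISTRY 'schema-registry.example.com:8081'
( id <- %::id, color <- %::color );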

Load Data from Kafka Using a Pipeline

To create and interact with a Kafka pipeline quickly, follow the instructions in this section. There are two parts to this Quickstart:

  1. Part 1: Sending Messages to Kafka

  2. Part 2: Creating a Kafka Pipeline in SingleStore

Prerequisites

To complete this Quickstart, your environment must meet the following prerequisites:

  • A working Kafka queue.

  • A SingleStore installation or a SingleStore cluster: you will connect to the database or cluster and create a pipeline to pull data from the Kafka queue.

Part 1: Sending Messages to Kafka

In Kafka, create a topic named test and enter the following messages:

the quick
brown fox
jumped over
the lazy dog

In Part 2, you will create a pipeline in SingleStore to ingest these messages.

Part 2: Creating a Kafka Pipeline in SingleStore

Now that your Kafka topic contains some messages, you can create a new pipeline and ingest the messages.

At the SingleStore prompt, execute the following statements:

CREATE DATABASE quickstart_kafka;
USE quickstart_kafka;
CREATE TABLE messages (id text);

These statements create a new database and table that will be used for the Kafka pipeline. Before you can create the pipeline, you will need the IP address of the Kafka cluster.

To create the pipeline, execute the following statement, replacing <kafka-cluster-ip> with your Kafka cluster's IP address:

CREATE PIPELINE quickstart_kafka AS LOAD DATA KAFKA '<kafka-cluster-ip>/test' INTO TABLE messages;

If you are connecting with SSL, SASL, or Kerberos, you will need to specify additional clauses in your CREATE PIPELINE statement. See Securely Connect to Kafka for more information.

The CREATE PIPELINE statement above creates a new Kafka pipeline named quickstart_kafka, which reads messages from the test Kafka topic and writes them into the messages table. For more information, see the CREATE PIPELINE topic. If the statement was successful, you can test your pipeline. While you can start a pipeline immediately after creating it, it’s best to first test it using a small set of data:

TEST PIPELINE quickstart_kafka LIMIT 1;

If this test was successful and no errors are present, then you are ready to try ingesting data. The following command will run one batch and commit the data to the SingleStore table messages.

START PIPELINE quickstart_kafka FOREGROUND LIMIT 1 BATCHES;

To verify that the data exists in the messages table as expected, execute the following statement. If it was successful, you should see a non-empty result set.

SELECT * FROM messages;
+--------------+
| id           |
+--------------+
| the quick    |
| brown fox    |
| jumped over  |
| the lazy dog |
+--------------+

Now you are ready to start your pipeline as a background process. SingleStore will automatically ingest new messages as they are put into Kafka.

START PIPELINE quickstart_kafka;

Now that the pipeline is up and running, send a few more messages to the Kafka topic. Enter the following lines into your topic:

Lorem ipsum
dolor sit amet

In the SingleStore terminal window, run the SELECT * FROM messages; statement again. Now you will see the following output:

SELECT * FROM messages;
+----------------+
| id             |
+----------------+
| Lorem ipsum    |
| dolor sit amet |
| the quick      |
| brown fox      |
| jumped over    |
| the lazy dog   |
+----------------+

Now that your pipeline is running, you can check its status and history at any time by querying the PIPELINES_BATCHES_SUMMARY table.

SELECT * FROM information_schema.PIPELINES_BATCHES_SUMMARY;

This system view returns one row for every recent batch the pipeline has run, along with at-a-glance performance and volume metrics. This information is valuable for monitoring and understanding your production pipeline system.
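
For example, to view only the most recent batches for the quickstart pipeline, you can filter and sort the view. This is a minimal sketch; it assumes the view exposes the DATABASE_NAME, PIPELINE_NAME, BATCH_ID, BATCH_STATE, and BATCH_TIME columns, and the exact column set may vary by version.

SELECT DATABASE_NAME, PIPELINE_NAME, BATCH_ID, BATCH_STATE, BATCH_TIME
FROM information_schema.PIPELINES_BATCHES_SUMMARY
WHERE PIPELINE_NAME = 'quickstart_kafka'
ORDER BY BATCH_ID DESC
LIMIT 10;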

Note

Foreground pipelines and background pipelines have different intended uses and behave differently. For more information, see the CREATE PIPELINE topic.

Note: When a backup is restored, all pipelines in that database revert to the state (offsets, etc.) they were in when the backup was generated. If that data is no longer available in Kafka, the pipeline will start from the earliest offsets available in Kafka.

Test your Kafka Cluster using kcat (formerly kafkacat)

When you use SingleStore to consume or produce Kafka messages, you may want to test your Kafka cluster directly to verify connectivity and to verify topics and messages. kcat is a command line utility you can use to perform these tasks.

Note

kcat is available at https://github.com/edenhill/kcat. It is a third-party utility and SingleStore does not provide support for it.

Running kcat

Minimal syntax for running kcat is as follows.

kcat <mode> -b <broker> -t <topic>

When you run kcat, you may need to supply additional parameters, such as SASL settings to connect to your Kafka cluster. Specify these parameters using the -X option. Examples of these parameters are shown in the code samples below, which demonstrate how to connect to Confluent Cloud.

<mode>

One of the following:

  • -P: Producer mode. Used to produce messages to a Kafka topic (not demonstrated here).

  • -C: Consumer mode. Used to consume messages from a Kafka topic. Used here to test connectivity to a Kafka cluster and to verify that data is as expected.

  • -L: Metadata listing mode. Used here to test connectivity to a Kafka cluster.

  • -Q: Query mode. Not demonstrated here.

<broker>

The Kafka broker to connect to.

<topic>

The name of the topic to produce to, consume from, or list metadata from.

Verify connectivity to your Kafka Cluster

If kcat runs in any mode and the command returns successfully, connectivity is confirmed. The following are two examples of how to verify connectivity.

Verify Connectivity by Returning Metadata

In the following example, kcat connects to a Confluent Cloud cluster using -L mode to return metadata.

kcat -b <Confluent Cloud broker URL> -L \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X ssl.ca.location=<CA certificate file path> \
-X sasl.username=<CLUSTER_API_KEY> \
-X sasl.password=<CLUSTER_API_SECRET>
Metadata for all topics (from broker ...)

20 brokers:
...

10 topics:
...

Verify Connectivity by Consuming One Message from a Topic

In the following example, kcat connects to a Confluent Cloud cluster using -C mode to consume the last message from a topic.

kcat -C -b <Confluent Cloud broker URL> -t <topic> -o -1 -e \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X ssl.ca.location=<CA certificate file path> \
-X sasl.username=<CLUSTER_API_KEY> \
-X sasl.password=<CLUSTER_API_SECRET>

Verify data in your Kafka Cluster

Two situations where you may want to verify data in your Kafka cluster are:

  • Prior to using SingleStore to consume Kafka messages via Pipelines or the Confluent Kafka Connector.

  • After using SingleStore to produce Kafka messages (using SELECT ... INTO KAFKA ...).

In both situations, you can run kcat in -C mode for a given topic.

The following kcat command connects to a Confluent Cloud cluster using -C mode. The command outputs all of the messages for the specified topic, also showing the offset and partition for each message. The output is redirected to the file kcat_output.txt. The -e option indicates that kcat should exit once it has consumed the last message from the topic.

kcat -C -b <Confluent Cloud broker URL> -t <topic> -e \
-f 'Partition: %p  Offset: %o  Message: %s\n' \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X ssl.ca.location=<CA certificate file path> \
-X sasl.username=<CLUSTER_API_KEY> \
-X sasl.password=<CLUSTER_API_SECRET> \
> kcat_output.txt

Example: Verify messages in Kafka after producing messages using SELECT ... INTO KAFKA ...

Create a table with three columns.

CREATE TABLE numbers(a INT, b INT, c INT);

Insert data into the table.

INSERT INTO numbers(a,b,c) VALUES (10,20,30);
INSERT INTO numbers(a,b,c) VALUES (40,50,60);
INSERT INTO numbers(a,b,c) VALUES (70,80,90);

Use the SELECT ... INTO KAFKA ... command to create a Kafka message for each row of the numbers table. All of the messages are written to the topic numbers-topic in a Confluent Cloud cluster. The field values in each message correspond to the field values in the table, incremented by 10, 20, or 30, respectively.

SELECT a + 10, b + 20, c + 30
FROM numbers INTO KAFKA '<Confluent Cloud broker URL>/numbers-topic'
CONFIG '{
"security.protocol":"SASL_SSL",
"sasl.mechanism":"PLAIN",
"ssl.ca.location":"<CA certificate file path>",
"sasl.username":"<CLUSTER_API_KEY>"
}'
CREDENTIALS '{"sasl.password":"<CLUSTER_API_SECRET>"}'
FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY "\t"
LINES TERMINATED BY '}' STARTING BY '{';

Run the kcat command shown in the section "Verify data in your Kafka Cluster", above. Open the kcat_output.txt output file and search for {20,40,60} to verify that the message is in the file; the other two messages follow it. The messages in the output will look similar to the following, although the partitions and offsets may differ.

Partition: 1 Offset: 0 Message: {20,40,60}
Partition: 1 Offset: 1 Message: {50,70,90}
Partition: 1 Offset: 2 Message: {80,100,120}
...
