SingleStore Managed Service

Load Data from Kafka Using a Pipeline

To create and interact with a Kafka pipeline quickly, follow the instructions in this section. There are two parts to this Quickstart:

  1. Part 1: Sending Messages to Kafka

  2. Part 2: Creating a Kafka Pipeline in SingleStore

Prerequisites

To complete this Quickstart, your environment must meet the following prerequisites:

  • A working Kafka queue.

  • A SingleStore DB installation or a SingleStore Managed Service cluster: you will connect to the database or cluster and create a pipeline to pull data from the Kafka queue.

Part 1: Sending Messages to Kafka

In Kafka, create a topic named test and enter the following messages:

the quick
brown fox
jumped over
the lazy dog
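
If you are working from a shell on the Kafka host, one way to create the topic and send these messages is with the console tools that ship with Kafka. This is a sketch only: the broker address localhost:9092 is an assumption, and the exact flags depend on your Kafka version (older releases use --zookeeper and --broker-list instead of --bootstrap-server).

# Create the "test" topic.
kafka-topics.sh --create --topic test --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092

# Send the four messages, one per line.
printf 'the quick\nbrown fox\njumped over\nthe lazy dog\n' | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test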

In Part 2, you will create a pipeline in SingleStore to ingest these messages.

Part 2: Creating a Kafka Pipeline in SingleStore

Now that your Kafka topic contains some messages, you can use SingleStore Managed Service or SingleStore DB to create a new pipeline and ingest the messages.

At the SingleStore prompt, execute the following statements:

CREATE DATABASE quickstart_kafka;
USE quickstart_kafka;
CREATE TABLE messages (id text);

These statements create a new database and table that will be used for the Kafka pipeline. Before you can create the pipeline, you will need the IP address of the Kafka container.
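
If your Kafka broker runs in a local Docker container, as the wording above suggests, one way to look up that address is with docker inspect. This is a sketch under that assumption; replace kafka with the actual name of your container.

# Print the IP address of the container named "kafka" on its default network.
docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' kafka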

To create the pipeline, execute the following statement, replacing <kafka-container-ip> with your Kafka container’s IP address:

CREATE PIPELINE quickstart_kafka AS LOAD DATA KAFKA '<kafka-container-ip>/test' INTO TABLE messages;

This command creates a new Kafka pipeline named quickstart_kafka, which reads messages from the test Kafka topic and writes them into the messages table. For more information, see the CREATE PIPELINE topic. If the statement was successful, you can test your pipeline. While you can start a pipeline immediately after creating it, it's best to test it with a small set of data first:

TEST PIPELINE quickstart_kafka LIMIT 1;

If this test was successful and no errors are present, then you are ready to try ingesting data. The following command will run one batch and commit the data to the SingleStore table messages.

START PIPELINE quickstart_kafka FOREGROUND LIMIT 1 BATCHES;

To verify that the data exists in the messages table as expected, execute the following statement. If it was successful, you should see a non-empty result set.

SELECT * FROM messages;
+--------------+
| id           |
+--------------+
| the quick    |
| brown fox    |
| jumped over  |
| the lazy dog |
+--------------+

Now you are ready to start your pipeline as a background process. SingleStore will automatically ingest new messages as they are put into Kafka.

START PIPELINE quickstart_kafka;

Now that the pipeline is up and running, send a few more messages to the Kafka topic. Enter the following lines into your topic:

Lorem ipsum
dolor sit amet

In the SingleStore terminal window, run the SELECT * FROM messages; statement again. You should see output similar to the following (row order may vary):

SELECT * FROM messages;
+----------------+
| id             |
+----------------+
| Lorem ipsum    |
| dolor sit amet |
| the quick      |
| brown fox      |
| jumped over    |
| the lazy dog   |
+----------------+

Now that your pipeline is running, you can check its status and history at any time by querying the PIPELINES_BATCHES_SUMMARY view.

SELECT * FROM information_schema.PIPELINES_BATCHES_SUMMARY;

This system view contains one row for every recent batch the pipeline has run, along with at-a-glance performance and volume metrics. This information is extremely valuable for monitoring and understanding your production pipeline system.
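
For example, assuming the view exposes a PIPELINE_NAME column (check the information_schema reference for your version), you can narrow the output to the pipeline created in this Quickstart:

SELECT * FROM information_schema.PIPELINES_BATCHES_SUMMARY
WHERE PIPELINE_NAME = 'quickstart_kafka';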

Notice

Foreground pipelines and background pipelines have different intended uses and behave differently. For more information, see the CREATE PIPELINE topic.

Quickstart Summary

In this Kafka Quickstart, you sent multiple messages to a Kafka topic, and then created a table in SingleStore and a Kafka pipeline in SingleStore to ingest the messages. This Quickstart only demonstrated the most basic functionality of a Kafka pipeline, but you can apply the same concepts to a real-world scenario.

Now that you’re familiar with using SingleStore and Kafka, you can also try the SingleStore Pipelines Twitter Demo. This demo ingests live Twitter data into SingleStore, and shows you how to perform queries that analyze user trends and sentiments.

Securing Kafka Pipelines with TLS/SSL or Kerberos

You can connect a pipeline to a Kafka cluster through SSL and optionally authenticate through SASL. The following SASL authentication mechanisms are supported:

  • GSSAPI (Kerberos)

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

This topic assumes you have already set up, configured, and enabled SSL and/or Kerberos on your Kafka brokers. For information on how to enable this functionality, see the SSL and SASL sections in the Kafka documentation.

Warning

Using SSL and SASL with Kafka requires Kafka protocol version 0.9 or later; therefore, each pipeline that uses SSL or SASL with Kafka must also adhere to that version requirement. The Kafka protocol version can be passed in through JSON in the CONFIG clause of the CREATE PIPELINE statement, for example CONFIG '{"kafka_version":"0.10.0.0"}'. Alternatively, the pipelines_kafka_version global variable controls this parameter for any pipeline without a Kafka version configuration value.
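
As a sketch of both approaches (the pipeline name, broker address, and destination table below are placeholders, and the SET GLOBAL form assumes your deployment lets you set engine variables directly):

-- Per-pipeline: pass the Kafka protocol version in the CONFIG clause.
CREATE PIPELINE kafka_versioned
AS LOAD DATA KAFKA 'host.example.com:9092/test'
CONFIG '{"kafka_version": "0.10.0.0"}'
INTO table t;

-- Cluster-wide default for pipelines that do not specify kafka_version themselves.
SET GLOBAL pipelines_kafka_version = '0.10.0.0';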

As with S3 and Azure pipelines, credentials are passed in through JSON in the CREATE PIPELINE statement. Any credentials used to encrypt or authenticate must be present on each node in the SingleStore DB cluster.

The security.protocol credential specifies the encryption and authentication mechanisms used to connect to Kafka brokers. This property can be one of four values: plaintext, ssl, sasl_plaintext, or sasl_ssl. Depending on this value, you may have to supply additional credentials in your JSON.

Connect SingleStore Managed Service to Kafka using TLS/SSL

Use the following steps to enable TLS/SSL encryption between SingleStore Managed Service and Kafka.

  1. Run CREATE TABLE to create a table to use with the Kafka pipeline.

  2. From the Clusters page on the SingleStore Customer Portal, click on the cluster for which you want to enable TLS/SSL connections.

  3. Click the Security tab at the top of the page.

  4. Click on the Upload Certificate button to upload your CA certificate. This will make it available to all the nodes and will allow you to secure outbound connections via TLS/SSL.

  5. Make a copy of the following SQL statement that will create the Kafka pipeline and replace the text in angle brackets with the values provided in the following steps.

    CREATE PIPELINE `<pipeline-name>`
    AS LOAD DATA KAFKA '<data-source>'
    CONFIG '{"security.protocol": "ssl",
    "ssl.ca.location": "<ca-certificate-file-path>"}'
    CREDENTIALS '{"ssl.key.password": "<your-password>"}'
    INTO table <table-name>;
    
  6. Replace <pipeline-name> with a name for your pipeline.

  7. Replace <data-source> with the Kafka data source.

  8. Replace <ca-certificate-file-path> with the full path to the CA certificate on the SingleStore DB node.

  9. If your SSL certificate key uses a password, replace <your-password> with the SSL certificate key password.

  10. Replace <table-name> with the table you created earlier.

  11. The final SQL statement will resemble:

    CREATE PIPELINE `kafka_ssl`
    AS LOAD DATA KAFKA 'kafka-host:9093/test'
    CONFIG '{"security.protocol": "ssl",
    "ssl.ca.location": "/var/private/ssl/ca-cert.pem"}'
    CREDENTIALS '{"ssl.key.password": "your-ssl-key-password"}'
    INTO table `your-table-name`;
    
  12. Copy and paste the final SQL statement into the SQL Editor of SingleStore DB Studio and execute it. This will create a pipeline that connects SingleStore Managed Service to Kafka.

  13. In the SQL Editor, run START PIPELINE.

    START PIPELINE kafka_ssl;
    
  14. You may check the status of your pipeline in SingleStore DB Studio by clicking on Pipelines in the left sidebar.

Connecting over SSL

To enable SSL encryption between Kafka and SingleStore DB, perform the following steps:

  1. Securely copy the CA certificate, SSL certificate, and SSL key used for connections between the SingleStore DB cluster and Kafka brokers from the Kafka cluster to every SingleStore DB node. Use a secure file transfer method, such as scp, to copy the files to your SingleStore DB nodes (see the sketch after this list). The file locations on your SingleStore DB nodes should be consistent across the cluster.

  2. In your CONFIG JSON, if you want to enable SSL encryption only, set "security.protocol": "ssl". If you want to enable Kerberos with SSL, set "security.protocol": "sasl_ssl" and set the Kerberos credentials after you’ve completed step 3.

  3. Set the remaining SSL configuration in the CONFIG JSON:

    • ssl.ca.location: Path to the CA certificate on the SingleStore DB node.

    • ssl.certificate.location: Path to the SSL certificate on the SingleStore DB node.

    • ssl.key.location: Path to the SSL certificate key on the SingleStore DB node.

  4. If your SSL certificate key uses a password, set it in your CREDENTIALS JSON.

    • ssl.key.password: Password for SSL certificate key.
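
The following is a sketch of the secure copy described in step 1. The host name and user account are placeholders, and /var/private/ssl matches the paths used in the examples later in this topic; repeat the command for every SingleStore DB node, keeping the destination path identical.

# Copy the CA certificate, SSL certificate, and SSL key to a SingleStore DB node.
scp ca-cert.pem client.pem client.key admin@singlestore-node-1:/var/private/ssl/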

Authenticating with Kerberos

To configure a Kafka pipeline to authenticate with Kerberos, you must configure all of the nodes in your SingleStore DB cluster as Kerberos clients and then set the credentials in your CREATE PIPELINE statement. To do this, perform the following steps:

  1. Securely copy the keytab file containing the SingleStore DB service principal (e.g. memsql/host.domain.com@REALM.NAME) from the Kerberos server to every node in your SingleStore DB cluster. You should use a secure file transfer method, such as scp, to copy the keytab file to your SingleStore DB nodes. The file location on your SingleStore DB nodes should be consistent across the cluster.

  2. Make sure your SingleStore DB nodes can connect to the KDC server using the fully-qualified domain name (FQDN) of the KDC server. This might require configuring network settings or updating /etc/hosts on your nodes.

  3. Also ensure that the memsql service account on the node can access the copied keytab file. This can be accomplished by changing file ownership or permissions. If the memsql account cannot access the keytab file, you will not be able to complete the next step because your master aggregator will not be able to restart after applying configuration updates.

  4. When authenticating with Kerberos, SingleStore DB needs to authenticate as a client, which means you must also install a Kerberos client onto each node in your cluster. The following installs the krb5-user package for Debian-based Linux distributions.

    $ sudo apt-get update && sudo apt-get install krb5-user
    

    When setting up your Kerberos configuration, set your default realm, Kerberos admin server, and other options to those defined by your KDC server. In the examples used in this topic, the default realm is EXAMPLE.COM, and the Kerberos server settings point to the FQDN of the KDC server, host.example.com. (A quick keytab verification sketch follows this list.)

  5. In your CONFIG JSON, set "sasl.mechanism": "GSSAPI".

  6. Set "security.protocol": "sasl_ssl" for Kerberos and SSL connections, or "security.protocol": "sasl_plaintext" if you want to authenticate with Kerberos without SSL encryption. If you want to use Kerberos with SSL, make sure SSL is configured and enabled on the Kafka brokers and then add the SSL credential properties defined in the previous section.

  7. Set the remaining Kerberos configuration in CONFIG JSON:

    • sasl.kerberos.service.name: The Kerberos principal name that Kafka runs as. For example, "kafka".

    • sasl.kerberos.keytab: The local file path on the SingleStore DB node to the authenticating keytab.

    • sasl.kerberos.principal: The service principal name for the SingleStore DB cluster. For example, "memsql/host.example.com@EXAMPLE.COM".
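
As referenced in step 4, the following is an optional sanity check you can run on a node after copying the keytab and installing the Kerberos client. It is a sketch only; the keytab path and principal are the example values used in this topic, and memsql is assumed to be the service account name.

# Confirm the keytab can obtain a ticket from the KDC, then list the ticket cache.
sudo -u memsql kinit -kt /etc/krb5.keytab memsql/host.example.com@EXAMPLE.COM
sudo -u memsql klist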

Authenticating with PLAIN or SCRAM SASL mechanism

To configure a Kafka pipeline to authenticate with another SASL mechanism, you must set the credentials in your CREATE PIPELINE statement. To do this, perform the following steps:

  1. In your CONFIG JSON, set "sasl.mechanism": "PLAIN". If your Kafka brokers use SCRAM for authentication, then set "sasl.mechanism": "SCRAM-SHA-256" or "sasl.mechanism": "SCRAM-SHA-512".

  2. In your CONFIG JSON, set "security.protocol": "sasl_ssl" for SASL connections over SSL, or "security.protocol": "sasl_plaintext" if you want to authenticate with Kafka without SSL encryption.

  3. In your CONFIG JSON, provide the username, "sasl.username": "<kafka_credential_username>".

  4. In your CREDENTIALS JSON, provide the password, "sasl.password": "<kafka_credential_password>".

SASL_PLAINTEXT/PLAIN Security

Please note that SASL_PLAINTEXT/PLAIN authentication mode with Kafka sends your credentials unencrypted over the network. It is therefore insecure and susceptible to being sniffed.

Also note that the SASL_PLAINTEXT/SCRAM authentication mode with Kafka encrypts the credential information sent over the network, but the transport of the Kafka messages themselves is still insecure.

Examples

The following examples make the following assumptions:

  • Port 9092 is a plaintext endpoint

  • Port 9093 is an SSL endpoint

  • Port 9094 is a plaintext SASL endpoint

  • Port 9095 is an SSL SASL endpoint

Plaintext

The following CREATE PIPELINE statements are equivalent:

CREATE PIPELINE `kafka_plaintext`
AS LOAD DATA KAFKA 'host.example.com:9092/test'
CONFIG '{"security.protocol": "plaintext"}'
INTO table t;

CREATE PIPELINE `kafka_no_creds`
AS LOAD DATA KAFKA 'host.example.com:9092/test'
INTO table t;

SSL

CREATE PIPELINE `kafka_ssl`
AS LOAD DATA KAFKA 'host.example.com:9093/test'
CONFIG '{"security.protocol": "ssl",
"ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"ssl.ca.location": "/var/private/ssl/ca-cert.pem"}'
CREDENTIALS '{"ssl.key.password": "abcdefgh"}'
INTO table t;

Kerberos with no SSL

CREATE PIPELINE `kafka_kerberos_no_ssl`
AS LOAD DATA KAFKA 'host.example.com:9094/test'
CONFIG '{""security.protocol": "sasl_plaintext",
"sasl.mechanism": "GSSAPI",
"sasl.kerberos.service.name": "kafka",
"sasl.kerberos.principal": "memsql/host.example.com@EXAMPLE.COM",
"sasl.kerberos.keytab": "/etc/krb5.keytab"}'
INTO table t;

Kerberos with SSL

CREATE PIPELINE `kafka_kerberos_ssl`
AS LOAD DATA KAFKA 'host.example.com:9095/test'
CONFIG '{"security.protocol": "sasl_ssl",
"sasl.mechanism": "GSSAPI",
"ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"ssl.ca.location": "/var/private/ssl/ca-cert.pem",
"sasl.kerberos.service.name": "kafka",
"sasl.kerberos.principal": "memsql/host.example.com@EXAMPLE.COM",
"sasl.kerberos.keytab": "/etc/krb5.keytab"}'
CREDENTIALS '{"ssl.key.password": "abcdefgh"}'
INTO table t;

SASL/PLAIN with SSL

CREATE PIPELINE `kafka_sasl_ssl_plain`
AS LOAD DATA KAFKA 'host.example.com:9095/test'
CONFIG '{"security.protocol": "sasl_ssl",
"sasl.mechanism": "PLAIN",
"ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"ssl.ca.location": "/var/private/ssl/ca-cert.pem",
"sasl.username": "kafka"}'
CREDENTIALS '{"ssl.key.password": "abcdefgh", "sasl.password": "metamorphosis"}'
INTO table t;

SASL/PLAIN without SSL

CREATE PIPELINE `kafka_sasl_plaintext_plain`
AS LOAD DATA KAFKA 'host.example.com:9094/test'
CONFIG '{"security.protocol": "sasl_plaintext",
"sasl.mechanism": "PLAIN",
"sasl.username": "kafka"}'
CREDENTIALS '{"sasl.password": "metamorphosis"}'
INTO table t;

SASL/SCRAM with SSL

CREATE PIPELINE `kafka_sasl_ssl_scram`
AS LOAD DATA KAFKA 'host.example.com:9095/test'
CONFIG '{"security.protocol": "sasl_ssl",
"sasl.mechanism": "SCRAM-SHA-512",
"ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem",
"ssl.key.location": "/var/private/ssl/client_memsql_client.key",
"ssl.ca.location": "/var/private/ssl/ca-cert.pem",
"sasl.username": "kafka"}'
CREDENTIALS '{"ssl.key.password": "abcdefgh", "sasl.password": "metamorphosis"}'
INTO table t;

SASL/SCRAM without SSL

CREATE PIPELINE `kafka_sasl_plaintext_scram`
AS LOAD DATA KAFKA 'host.example.com:9094/test'
CONFIG '{"security.protocol": "sasl_plaintext",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "kafka"}'
CREDENTIALS '{"sasl.password": "metamorphosis"}'
INTO table t;