Load Data from Kafka
On this page
Securely Connect to Kafka from SingleStore
Overview
When running a CREATE PIPELINE .
statement, you may need to make a secure connection to Kafka.
Use Secure Socket Layer (SSL) for the connection and Simple Authentication and Security Layer (SASL) to authenticate.
-
GSSAPI (Kerberos)
-
PLAIN
-
SCRAM-SHA-256
-
SCRAM-SHA-512
-
OAUTHBEARER SASL
Kerberos may used for authentication.
This topic assumes SSL and/or Kerberos have been set up, configured, and enabled on the Kafka brokers.
Convert Java Keystore (JKS) to Privacy Enhanced Mail (PEM) Key
To use SSL encryption for SingleStore pipelines, JKS keys need to be converted to PEM keys.
Note
In the steps below, the < > symbols indicate a variable and any information provided between these symbols is an example.
-
Create the key and keystore.
You will be prompted to enter details such as name, organizational unit, city, state, etc. keytool -genkey -keyalg RSA -keystore <keystore-name>.jks -storepass <password> -alias "<keystore-alias-name>"What is your first and last name? [Unknown]:- What is the name of your organizational unit? [Unknown]:- What is the name of your organization? [Unknown]:-
-
Export the client certificate from the keystore using the same password as in step 1.
keytool -exportcert -rfc -file <pem-name>.pem -alias <pem-name-alias> -keystore <keystore-name>.jksEnter keystore password: <password> Certificate stored in file <pem-name>.pem
-
Import the client certificate to the truststore located on your Apache Server.
Enter a new password for the keystore. keytool -keystore <truststore-name>.jks -alias <truststore-name-alias> -import -file <pem-name>.pemEnter keystore password: Re-enter new password: Trust this certificate? [no]: yes Certificate was added to keystore
-
Convert the client keystore to Public-Key Cryptography Standards (PKCS12) format.
The <key name>. jks and password are the same as in step 1. keytool -v -importkeystore -srckeystore <keystore-name>.jks -srcalias <keystore alias> -destkeystore <new-keystore-name>.p12 -deststoretype PKCS12Importing keystore <keyname>.jks to <new-keyname>.p12... Enter destination keystore password: Re-enter new password: Enter source keystore password: [Storing new-keystore-name.p12]
-
Extract the client certificate key into a .
pem file. Use the import password created in step 4. openssl pkcs12 -in <new-keyname>.p12 -nocerts -nodes > <keyname>.pemEnter Import Password:
Note
Refer to Generating SSL Certificates for more information.
Steps for Creating a Secure Connection
To create a secure connection from SingleStore to a Kafka cluster, follow these steps in order.
Copy Security Files to the SingleStoreCluster
Securely copy the CA certificate, SSL certificate, and SSL key used for connections between the SingleStorecluster and Kafka brokers from the Kafka cluster to each SingleStore node.scp
, to copy the files to your SingleStore nodes.
Setup Your SingleStoreCluster for Kerberos Authentication
To configure a Kafka pipeline to authenticate with Kerberos, you must configure all of your nodes in your SingleStorecluster as clients for Kerberos authentication.
-
Securely copy the keytab file containing the SingleStore service principal (e.
g. memsql/host.
) from the Kerberos server to each node in your SingleStorecluster.domain. com@REALM. NAME You should use a secure file transfer method, such as scp
, to copy the keytab file to your SingleStore nodes.The file location on your SingleStore nodes should be consistent across the cluster. -
Make sure your SingleStore nodes can connect to the KDC server using the fully-qualified domain name (FQDN) of the KDC server.
This might require configuring network settings or updating /etc/hosts
on your nodes. -
Also ensure that the
memsql
service account on each node can access the copied keytab file.This can be accomplished by changing file ownership or permissions. If the memsql
account cannot access the keytab file, you will not be able to complete the next step because your master aggregator will not be able to restart after applying configuration updates. -
When authenticating with Kerberos, SingleStore needs to authenticate as a client, which means you must also install a Kerberos client onto each node in your cluster.
The following installs the krb5-user
package for Debian-based Linux distributions.sudo apt-get update && apt-get install krb5-userWhen setting up your Kerberos configuration settings, set your default realm, Kerberos admin server, and other options to those defined by your KDC server.
In the examples used in this topic, the default realm is EXAMPLE.
, and the Kerberos server settings are set to the FQDN of the KDC serverCOM host.
.example. com
Build a String with the Connection Settings
Using the following settings, create a string containing the CONFIG
clause and optionally the CREDENTIALS
clause of the CREATE PIPELINE .
or SELECT .
statement that you will be running.
SSL Connection Settings
-
In your
CONFIG
JSON, if you want to enable SSL encryption only, set"security.
.protocol": "ssl" If you want to enable Kerberos with SSL, or otherwise want to use SASL, set "security.
.protocol": "sasl_ ssl" -
Set the remaining SSL configuration in the
CONFIG
JSON:-
ssl.
: Path to the CA certificate on the SingleStore node.ca. location -
ssl.
: Path to the SSL certificate on the SingleStore node.certificate. location -
ssl.
: Path to the SSL certificate key on the SingleStore node.key. location
-
-
If your SSL certificate key is using a password, set it in your
CREDENTIALS
JSON.-
ssl.
: Password for the SSL certificate key.key. password
-
SASL Connection Settings
-
In your
CONFIG
JSON, set"security.
for SSL connections, orprotocol": "sasl_ ssl" "security.
if you want to authenticate with Kafka without SSL encryption.protocol": "sasl_ plaintext" -
If your Kafka brokers do not use SCRAM for authentication, set
"sasl.
in yourmechanism": "PLAIN" CONFIG
JSON.Otherwise, set "sasl.
ormechanism": "SCRAM-SHA-256" "sasl.
.mechanism": "SCRAM-SHA-512" -
In your
CONFIG
JSON, provide the username,"sasl.
.username": "<kafka_ credential_ username>" -
In your
CREDENTIALS
JSON, provide the password,"sasl.
.password": "<kafka_ credential_ password>"
Note
SASL_
authentication mode with Kafka sends your credentials unencrypted over the network.
SASL_
authentication mode with Kafka will encrypt the credentials information sent over the network, but transport of Kafka messages themselves is not secure.
Kerberos Connection Settings
-
In your
CONFIG
JSON, set"sasl.
.mechanism": "GSSAPI" -
Set
"security.
for Kerberos and SSL connections, orprotocol": "sasl_ ssl" "security.
if you want to authenticate with Kerberos without SSL encryption.protocol": "sasl_ plaintext" -
Set the remaining Kerberos configuration in
CONFIG
JSON:-
sasl.
: The Kerberos principal name that Kafka runs as.kerberos. service. name For example, "kafka"
. -
sasl.
: The local file path on the SingleStore node to the authenticating keytab.kerberos. keytab -
sasl.
: The service principal name for the SingleStorecluster.kerberos. principal For example, "memsql/host.
.example. com@EXAMPLE. COM"
-
Configuring OAUTHBEARER Authentication Mechanism
OAUTHBEARER is used to secure access to resources on a server by requiring clients to obtain a bearer token.
To use SASL OAUTHBEARER authentication with Kafka, the following information is required:
-
"sasl.
- Specifies the client will authenticate using an OAuth 2.mechanism":"OAUTHBEARER" 0 Bearer Token. -
"sasl.
- The client ID is usually provided by the OAuth provider when the client is registered.oauthbearer. client. id":"<CLIENT_ ID>" It is a unique ID that is associated with the OAuth 2. 0 Bearer Token. -
"sasl.
The client secret is usually assigned by the OAuth provider when a client is registered and is used to authenticate the client.oauthbearer. client. secret":"<CLIENT_ SECRET>" -
"sasl.
- This is the endpoint URL on the authorization server that is used to obtain an OAuth 2.oauthbearer. token. endpoint. url":"<ENDPOINT_ URL>" 0 Bearer Token. The client sends a request to this endpoint to get a token, which is then used to authenticate subsequent requests to the server.
Optional configurations are:
-
"sasl.
- Determines the permissions and resources that are available to an authorized client.oauthbearer. scope":"<SCOPE>" This extension is optional. -
"sasl.
- Can be included in the SASL/OAUTHBEARER mechanism to provide additional data or parameters for authentication.outhbearer. extensions":"<EXTENSION>" Consult RFC-7628 for further information on SASL extensions. -
See "OAUTHBEARER Pipelines Configuration Details" section for additional details regarding "sasl.
oauthbearer. ssl. ca. location" "sasl. oauthbearer. config" fields.
Prerequisites
To use SASL OAUTHBEARER authentication the following prerequisites are required:
-
A Kafka broker with listeners configured using OAUTHBEARER authentication.
-
A connection to a database where Kafka brokers are reachable to create a pipeline to pull data from the Kafka queue.
-
An identity service was selected (e.
g. , Okta, Google OAuth, Facebook OAuth, Azure AD, Keycloak, etc. ). -
An Oauthbearer client was created and configured with the client_
credentials grant type on the identity service.
Note
The instructions for setting up a client on the Identity Service will vary depending on the server chosen and the type of application being built.
Syntax for SASL OAUTHBEARER Pipeline
Below is the syntax for creating a Kafka pipeline using OAUTHBEARER authentication.
CREATE or REPLACE PIPELINE <pipeline_name>AS LOAD DATA KAFKA "<Kafka cluster location>"CONFIG '{"security.protocol":"SASL_SSL","sasl.mechanism":"OAUTHBEARER","sasl.oauthbearer.client.id":"<CLIENT_ID>","sasl.oauthbearer.client.secret":<"CLIENT_SECRET>","sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>","sasl.oauthbearer.scope":"<SCOPE>"}'INTO TABLE <table_name>;
OAUTHBEARER Pipelines Configuration Details
To review the current pipeline configuration use the command:
SELECT * FROM information_schema.pipelines
Most token providers are assumed to work out-of-the-box.
Token Request Details:
-
SingleStore implements
client_
grant type for OAUTHBEARER token requests.credentials Ensure the OAuth client is created with support for the client_
grant type.credentials -
By default, SingleStore treats Oauthbearer tokens as opaque to address privacy concerns and uses the expires_
in field from the JSON token response to determine the token's validity time. If expires_
is not present orin "sasl.
is set in the pipeline configuration, we fall back to decoding the Oauth token JWT and use the exp claim to determine its validity.oauthbearer. config”:"use_ expires_ in=false” -
The token refresh is scheduled at 80% of the token’s lifetime and handled by SingleStore in the background.
-
For https Oauth token requests, by default, SingleStore will use the CA bundle specified at pipeline creation or try to find one of the system paths if none are specified.
This behavior can be changed by including additional configuration settings in the pipeline CONFIG on creation. This change will affect token request logic only, not the total SSL Kafka communication. -
"sasl.
- to use the system default path;oauthbearer. ssl. ca. location":"system" -
"sasl.
- to use a specific CA path location;oauthbearer. ssl. ca. location":"/usr/lib/ssl/certs/ca-certificates. crt" -
"sasl.
(empty) to disable SSL verification;oauthbearer. ssl. ca. location":""
-
Token Cache Details:
-
The OAUTHBEARER tokens are cached in the extractor pools for some time (along with the extractors) and the pipeline cookies (per pipeline).
The Oauth server must allow multiple simultaneous active tokens due to the several nodes or extractor pools used in the SingleStore architecture. -
"sasl.
- by default, tokens are cached in the pipeline cookies to minimize the number of token requests.oauthbearer. cookie. cache":"on" To disable this feature set "sasl.
.oauthbearer. cookie. cache":"off" To reset the pipeline cookies, use alter pipeline … set offset cursor ''
. -
The pipeline may fail because the OAUTHBEARER client config on the identity server is changed or the pipeline config is altered in a way which causes previously requested cached tokens to become invalid.
This issue may be avoided by resetting the respective pipeline cache. -
alter pipeline … set offset cursor ''
- to reset cookies cache; -
flush extractor pools
- to reset extractor pools;
-
Kafka Version Setting
Warning
Using SSL and SASL with Kafka requires Kafka protocol version 0.CREATE PIPELINE .
and SELECT .
statements using SSL and SASL with Kafka also need to adhere to that version requirement.CONFIG
clause, similar to this CONFIG '{"kafka_
.pipelines_
engine variable controls this parameter for any pipeline without using a Kafka version configuration value in a CREATE PIPELINE .
statement.
Final Step: Use the Connection String in a SQL Statement
Create your CREATE PIPELINE .
or SELECT .
statement, using the string containing the connection settings that you created in the previous steps.
Examples
The following examples make the following assumptions:
-
Port 9092 is a plaintext endpoint
-
Port 9093 is an SSL endpoint
-
Port 9094 is a plaintext SASL endpoint
-
Port 9095 is an SSL SASL endpoint
Note
The examples use CREATE PIPELINE
, but the CONFIG
and CREDENTIALS
clauses shown can be used with SELECT .
also.SELECT .
.
Plaintext
The following CREATE PIPELINE
statements are equivalent:
CREATE PIPELINE `kafka_plaintext`AS LOAD DATA KAFKA 'host.example.com:9092/test'CONFIG '{"security.protocol": "plaintext"}'INTO table t;
CREATE PIPELINE `kafka_no_creds`AS LOAD DATA KAFKA 'host.example.com:9092/test'INTO table t;
SSL
CREATE PIPELINE `kafka_ssl`AS LOAD DATA KAFKA 'host.example.com:9093/test'CONFIG '{"security.protocol": "ssl","ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem","ssl.key.location": "/var/private/ssl/client_memsql_client.key","ssl.ca.location": "/var/private/ssl/ca-cert.pem"}'CREDENTIALS '{"ssl.key.password": "abcdefgh"}'INTO table t;
Kerberos with no SSL
CREATE PIPELINE `kafka_kerberos_no_ssl`AS LOAD DATA KAFKA 'host.example.com:9094/test'CONFIG '{""security.protocol": "sasl_plaintext","sasl.mechanism": "GSSAPI","sasl.kerberos.service.name": "kafka","sasl.kerberos.principal": "memsql/host.example.com@EXAMPLE.COM","sasl.kerberos.keytab": "/etc/krb5.keytab"}'INTO table t
Kerberos with SSL
CREATE PIPELINE `kafka_kerberos_ssl`AS LOAD DATA KAFKA 'host.example.com:9095/test'CONFIG '{"security.protocol": "sasl_ssl","sasl.mechanism": "GSSAPI","ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem","ssl.key.location": "/var/private/ssl/client_memsql_client.key","ssl.ca.location": "/var/private/ssl/ca-cert.pem","sasl.kerberos.service.name": "kafka","sasl.kerberos.principal": "memsql/host.example.com@EXAMPLE.COM","sasl.kerberos.keytab": "/etc/krb5.keytab"}'CREDENTIALS '{"ssl.key.password": "abcdefgh"}'INTO table t
SASL/PLAIN with SSL
CREATE PIPELINE `kafka_sasl_ssl_plain`AS LOAD DATA KAFKA 'host.example.com:9095/test'CONFIG '{"security.protocol": "sasl_ssl","sasl.mechanism": "PLAIN","ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem","ssl.key.location": "/var/private/ssl/client_memsql_client.key","ssl.ca.location": "/var/private/ssl/ca-cert.pem","sasl.username": "kafka"}'CREDENTIALS '{"ssl.key.password": "abcdefgh", "sasl.password": "metamorphosis"}'INTO table t;
SASL/PLAIN without SSL
CREATE PIPELINE `kafka_sasl_plaintext_plain`AS LOAD DATA KAFKA 'host.example.com:9094/test'CONFIG '{"security.protocol": "sasl_plaintext","sasl.mechanism": "PLAIN","sasl.username": "kafka"}'CREDENTIALS '{"sasl.password": "metamorphosis"}'INTO table t;
SASL/SCRAM with SSL
CREATE PIPELINE `kafka_sasl_ssl_scram`AS LOAD DATA KAFKA 'host.example.com:9095/test'CONFIG '{"security.protocol": "sasl_ssl","sasl.mechanism": "SCRAM-SHA-512","ssl.certificate.location": "/var/private/ssl/client_memsql_client.pem","ssl.key.location": "/var/private/ssl/client_memsql_client.key","ssl.ca.location": "/var/private/ssl/ca-cert.pem","sasl.username": "kafka"}'CREDENTIALS '{"ssl.key.password": "abcdefgh", "sasl.password": "metamorphosis"}'INTO table t;
SASL/SCRAM without SSL
CREATE PIPELINE `kafka_sasl_plaintext_plain`AS LOAD DATA KAFKA 'host.example.com:9094/test'CONFIG '{"security.protocol": "sasl_plaintext","sasl.mechanism": "SCRAM-SHA-512","sasl.username": "kafka"}'CREDENTIALS '{"sasl.password": "metamorphosis"}'INTO table t;
Configure SASL OAUTHBEARER for Use with Kafka Pipelines
Note
SASL OAUTHBEARER is not supported with SELECT INTO .
CREATE or REPLACE PIPELINE <pipeline_name>AS LOAD DATA KAFKA "<Kafka cluster location>"CONFIG '{"security.protocol":"SASL_SSL","sasl.mechanism":"OAUTHBEARER","sasl.oauthbearer.client.id":"<CLIENT_ID>","sasl.oauthbearer.client.secret":<"CLIENT_SECRET>","sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>","sasl.oauthbearer.scope":"<SCOPE>"}'INTO TABLE <table_name>;
Load Data from the Confluent Kafka Connector
The SingleStore Confluent Kafka Connector is a Kafka Connect connector that allows you to easily ingest AVRO, JSON, and CSV messages from Kafka topics into SingleStore.
Learn more about the SingleStore Confluent Kafka Connector, including how to install and configure it, in Working with the Kafka Connector.
Working with the Kafka Connector
To understand Kafka’s core concepts and how it works, please read the Kafka documentation.
The Confluent Kafka Connector is available via the Confluent Hub and as a download from SingleStore.
Note: After you have installed the version you want to use, you will need to configure the connector properties.
The rest of this page describes how the connector works.
Note: You can also use a pipeline to Load Data from Kafka.
Connector Behavior
See the SingleStore Kafka Connector for information about the connector.
Auto-creation of tables
While loading data, if the table does not exist in SingleStore, it will be created using the information from the first record.
The table name is the name of the topic.valueSchema
.valueSchema
is not a struct, then a single column with name data will be created with the schema of the record.tableKey
property.
If the table already exists, all records will be loaded directly into it.
Exactly once delivery
To achieve exactly once delivery, set singlestore.
to true
.kafka_
table will then be created.
This table contains an identifier, count of records, and time of each transaction.kafka-topic
, kafka-partition
, and kafka-offset
.kafka-connect
job succeeds.
Data is written to the table and to the kafka_
table in one transaction.
To overwrite the name of this table, use the singlestore.
property.
Data Types
The connector converts Kafka data types to SingleStore data types:
Kafka Type |
SingleStore Type |
---|---|
STRUCT |
JSON |
MAP |
JSON |
ARRAY |
JSON |
INT8 |
TINYINT |
INT16 |
SMALLINT |
INT32 |
INT |
INT64 |
BIGINT |
FLOAT32 |
FLOAT |
FLOAT64 |
DOUBLE |
BOOLEAN |
TINYINT |
BYTES |
TEXT |
STRING |
VARBINARY(1024) |
Table Keys
To add a column as a key in SingleStore, use the tableKey
property:
Suppose you have an entity:
{
"id" : 123,
"name" : "Alice"
}
If you want to add the id
column as a PRIMARY KEY to your SingleStore table, add "tableKey.
to your properties configuration.
Doing so will generate the following query during table creation:
CREATE TABLE IF NOT EXISTS `table` (`id` INT NOT NULL,`name` TEXT NOT NULL,PRIMARY KEY (`id`))
You can also specify the name of a key by providing it like this: "tableKey.
.
This will create a key with a name:
CREATE TABLE IF NOT EXISTS `table` (`id` INT NOT NULL,`name` TEXT NOT NULL,PRIMARY KEY `someName`(`id`))
Table Names
By default, the Kafka Connector maps data from topics into SingleStore tables by matching the topic name to the table name.kafka-example-topic
then the connector will load it into the SingleStore table called kafka-example-topic
.
To specify a custom table name, you can use the singlestore.
property.
{
...
"singlestore.tableName.foo" : "bar",
...
}
In this example, data from the Kafka topic foo
will be written to the SingleStore table called bar
.
You can use this method to specify custom table names for multiple topics:
{
...
"singlestore.tableName.kafka-example-topic-1" : "singlestore-table-name-1",
"singlestore.tableName.kafka-example-topic-2" : "singlestore-table-name-2",
...
}
Installing the SingleStore Kafka Connector via Confluent Hub
This guide shows you how to install and configure the SingleStore Kafka Connector in Confluent Hub, via the following process:
-
Make sure you satisfy the prerequisites.
-
Install the connector.
-
Configure the connector.
Prerequisites
Make sure you have met the following prerequisites before installing the connector.
-
MemSQL version 6.
8 or newer/SingleStore version 7. 1 or newer installed and running.
Install the Connector and Add a Connection in Confluent
Install the SingleStore Kafka Connector via the Confluent Hub.
Run the Confluent Hub CLI installation command as described on the Confluent Hub:
Accept all of the default configuration options while installing.
Now that you have the connector installed, you can create a connection.
-
Browse to the Confluent Control Center.
-
Click Connect in the left side menu.
-
Click Add Connector.
-
Select SingleStore Sink Connector.
-
Select the topics from which you want to get data.
-
Configure the connector properties.
For an explanation of the various configuration properties, see SingleStore Kafka Connector Properties.
Installing the SingleStore Kafka Connector via Download
This guide shows you how to get and install the Java-based SingleStore Kafka Connector for connecting with open source Apache Kafka.
-
Make sure you satisfy the prerequisites.
-
Download the connector JAR file.
-
Configure the connector properties.
Prerequisites
Make sure you have met the following prerequisites before installing the connector.
-
MemSQL version 6.
8 or newer/SingleStore version 7. 1 or newer installed and running -
Java Development Kit (JDK) installed
-
Apache Kafka installed and running
-
Kafka Schema Registry configured
-
Kafka Connect
-
For SingleStore Kafka Connector versions prior to 1.
1. 1, install the MariaDB JDBC driver For SingleStore Kafka Connector versions 1.
1. 1 and newer, Install/configure the latest version of the SingleStore JDBC driver
Download and the SingleStore Kafka Connector
Get the SingleStore Kafka Connector JAR file here.
You will need to have the JDK installed and configured, with JAVA_
See the README and the Quickstart files on the GitHub page for more information.
Configure the SingleStore Connector Properties
The connector is configurable via a property file or Kafka-REST.kafka-connect
job.
The connector properties include the standard Kafka properties as well as some SingleStore-specific properties.
For an explanation of the various SingleStore-specific configuration properties, see SingleStore Kafka Connector Properties.
SingleStore Kafka Connector Properties
Configuration for the connector is controlled via the SingleStore Kafka Connector Sink configuration properties.
Confluent users will configure these properties via the Confluent UI.
The properties listed below show the SingleStore-specific properties.
SingleStore Kafka Connector Sink Configuration Properties
Property |
Description |
Default |
---|---|---|
|
Hostname or IP address of the SingleStore Master Aggregator in the |
|
|
Hostname or IP address of the SingleStore Aggregator nodes to run queries against in the |
|
|
If set, all connections will default to using this database. |
empty) |
|
SingleStore username. |
root |
|
SingleStore password. |
no password |
|
Specify a specific MySQL or JDBC parameter which will be injected into the connection URI. |
empty |
|
The maximum number of times to retry on errors before failing the task. |
10 |
|
The time in milliseconds to wait following an error before a retry attempt is made. |
3000 |
|
Specify additional keys to add to tables created by the connector; value of this property is the comma separated list with names of the columns to apply key; <index_ |
|
|
Compress data on load; one of (GZip, LZ4, Skip). |
GZip |
|
Allows or denies the use of an additional meta-table to save the recording results. |
true |
|
Specify the name of the table to save Kafka transaction metadata. |
|
|
Specify an explicit table name to use for the specified topic. |
Example Configuration
Refer to Config example for a sample configuration.
Load Data from Kafka Using a Pipeline
To create and interact with a Kafka pipeline quickly, follow the instructions in this section.
-
Part 1: Sending Messages to Kafka
-
Part 2: Creating a Kafka Pipeline in SingleStore
Prerequisites
To complete this Quickstart, your environment must meet the following prerequisites:
-
A working Kafka queue.
-
SingleStore installation –or– a SingleStore cluster: You will connect to the database or cluster and create a pipeline to pull data from Kafka queue.
Part 1: Sending Messages to Kafka
In Kafka, create a topic named test
and enter the following messages:
the quick
brown fox
jumped over
the lazy dog
In Part 2, you will create a pipeline in SingleStore to ingest these messages.
Part 2: Creating a Kafka Pipeline in SingleStore
Now that your Kafka topic contains some messages, you can create a new pipeline and ingest the messages.
At the SingleStore prompt, execute the following statements:
CREATE DATABASE quickstart_kafka;USE quickstart_kafka;CREATE TABLE messages (id text);
These statements create a new table and database that will be used for the Kafka pipeline.
To create the pipeline, execute the following statement, replacing <kafka-cluster-ip>
with your Kafka cluster's IP address:
CREATE PIPELINE quickstart_kafka AS LOAD DATA KAFKA '<kafka-cluster-ip>/test' INTO TABLE messages;
If you are connecting with SSL, SASL, or Kerberos, you will need to specify additional clauses in your CREATE PIPELINE
statement.
The CREATE PIPELINE
statement just mentioned creates a new Kafka pipeline named quickstart_
, which reads messages from the test
Kafka topic and writes it into the messages
table.
TEST PIPELINE quickstart_kafka LIMIT 1;
If this test was successful and no errors are present, then you are ready to try ingesting data.messages
.
START PIPELINE quickstart_kafka FOREGROUND LIMIT 1 BATCHES;
To verify that the data exists in the messages
table as expected, execute the following statement.
SELECT * FROM messages;
+--------------+
| id |
+--------------+
| the quick |
| brown fox |
| jumped over |
| the lazy dog |
+--------------+
Now you are ready to start your pipeline as a background process.
START PIPELINE quickstart_kafka;
Now that the pipeline is up and running, send a few more messages to the Kafka topic.
Lorem ipsumdolor sit amet
In the SingleStore terminal window, run the SELECT * FROM messages;
statement again.
SELECT * FROM messages;
+----------------+
| id |
+----------------+
| lorem ipsum |
| dolor sit amet |
| the quick |
| brown fox |
| jumped over |
| the lazy dog |
+----------------+
Now that your pipeline is running, you can check the status and history of it at any time by querying the PIPELINES_
table.
SELECT * FROM information_schema.PIPELINES_BATCHES_SUMMARY;
This system view will give one row for every recent batch the pipeline has run, as well as at-a-glance performance and volume metrics.
Note
Foreground pipelines and background pipelines have different intended uses and behave differently.
Note: When a backup is restored all pipelines in that database will revert to the state (offsets, etc.
Test your Kafka Cluster using kcat (formerly kafkacat)
When you use SingleStore to consume or produce Kafka messages, you may want to test your Kafka cluster directly to verify connectivity and to verify topics and messages.
Note
kcat
is available at https://github.
Running kcat
Minimal syntax for running kcat is as follows.
kcat <mode> -b <broker> -t <topic>
When you run kcat, you may need to supply additional parameters, such as SASL settings to connect to your Kafka cluster.-X
option.
<mode>
One of the following:
-
-P: Producer mode.
Used to produce messages to a Kafka topic (not demonstrated here). -
-C: Consumer mode.
Used to consume messages from a Kafka topic. Used here to test connectivity to a Kafka cluster and to verify that data is as expected. -
-L: Metadata listing mode.
Used here to test connectivity to a Kafka cluster. -
-Q: Query mode.
Not demonstrated here.
<broker>
The Kafka broker to connect to.
<topic>
The name of the topic to produce to, consume from, or list metadata from.
Verify connectivity to your Kafka Cluster
Running kcat in any mode, where the command returns successfully, is confirmation that you can connect.
Verify Connectivity by Returning Metadata
In the following example, kcat connects to a Confluent Cloud cluster using -L
mode to return metadata.
kcat -b <Confluent Cloud broker URL> -L \-X security.protocol=SASL_SSL \-X sasl.mechanism=PLAIN \-X ssl.ca.location=<CA certificate file path> \-X sasl.username=<CLUSTER_API_KEY> \-X sasl.password=<CLUSTER_API_SECRET>
Metadata for all topics (from broker ...)
20 brokers:
...
10 topics:
...
Verify Connectivity by Consuming One Message from a Topic
In the following example, kcat connects to a Confluent Cloud cluster using -C
mode to consume the last message from a topic.
kcat -C -b <Confluent Cloud broker URL> -t <topic> -o -1 -e \-X security.protocol=SASL_SSL \-X sasl.mechanism=PLAIN \-X ssl.ca.location=<CA certificate file path> \-X sasl.username=<CLUSTER_API_KEY> \-X sasl.password=<CLUSTER_API_SECRET>
Verify data in your Kafka Cluster
Two situations where you may want to verify data in your Kafka cluster are:
-
Prior to using SingleStore to consume Kafka messages via Pipelines or the Confluent Kafka Connector.
-
After using SingleStore to produce Kafka messages (using
SELECT .
).. . INTO KAFKA . . .
In both situations, you can run kcat in -C
mode for a given topic.
The following kcat command connects to a Confluent Cloud cluster using -C
mode.kcat_
.-e
option indicates that kcat should exit once it has consumed the last message from the topic.
kcat -C -b <Confluent Cloud broker URL> -t <topic> -e \-f 'Partition: %p Offset: %o Message: %s\n' \-X security.protocol=SASL_SSL \-X sasl.mechanism=PLAIN \-X ssl.ca.location=<CA certificate file path> \-X sasl.username=<CLUSTER_API_KEY> \-X sasl.password=<CLUSTER_API_SECRET>> kcat_output.txt
Example: Verify messages in Kafka after producing messages using SELECT . . . INTO KAFKA . . .
Create a table with three columns.
CREATE TABLE numbers(a INT, b INT, c INT);
Insert data into the table.
INSERT INTO numbers(a,b,c) VALUES (10,20,30);INSERT INTO numbers(a,b,c) VALUES (40,50,60);INSERT INTO numbers(a,b,c) VALUES (70,80,90);
Use the SELECT .
command to create a Kafka message for each row of the numbers
table.numbers-topic
in a Confluent Cloud cluster.10
, 20
or 30
.
SELECT a + 10, b + 20, c + 30FROM numbers INTO KAFKA '<Confluent Cloud broker URL>/numbers-topic'CONFIG '{"security.protocol":"SASL_SSL","sasl.mechanism":"PLAIN","ssl.ca.location":"<CA certificate file path>","sasl.username":"<CLUSTER_API_KEY>"}'CREDENTIALS '{"sasl.password":"<CLUSTER_API_SECRET>"}'FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY "\t"LINES TERMINATED BY '}' STARTING BY '{';
Run the kcat command shown in the section "Verify data in your Kafka cluster", above.kcat_
output file and search for {20,40,60}
to verify that the message is in the file.{20,40,60}
.
Partition: 1 Offset: 0 Message: {20,40,60}Partition: 1 Offset: 1 Message: {50,70,90}Partition: 1 Offset: 2 Message: {80,100,120}...
Troubleshooting Kafka AWS Pipelines
There are a number of factors that can contribute to Kafka pipeline errors.
Error 1933: Cannot get source metadata for pipeline.
ERROR 1933 ER_
Error 1933 means that there is a communication error between SingleStore and the Kafka broker.
The cause could be something as simple as the Kafka broker is offline.
-
Broker Status: Confirm Kafka broker is in an online state.
-
Port number: check to see if the port number is open and not in use by any other connection endpoint.
-
Hostname: check if there is a typo in the hostname.
-
Firewall: verify the firewall's rules are configured to allow incoming and outgoing traffic on the correct port(s).
-
SSL configs: ensure that the SSL configurations are not missing and are correct.
How to Analyze the Debugging Log
This section shows how to enable debugging (if not already enabled), how to analyze the output to determine the cause, and assist in solving the error.
-
Enable the engine variable:
pipelines_
for verbose error logging.extractor_ debug_ logging SET GLOBAL pipelines_extractor_debug_logging=ON;Query OK, 0 rows affected (0.02 sec)
-
Flush the extractor pools.
FLUSH EXTRACTOR POOLS;Query OK, 0 rows affected (0.00 sec)
-
Run the
CREATE PIPELINE
statement and wait to see if the pipeline fails.Below is a Kafka broker unreachable sample error. CREATE PIPELINE my_pipeline asLOAD DATA KAFKA 'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com/singlestore'INTO TABLE messages;ERROR 1933 (HY000): Cannot get source metadata for pipeline. Could not fetch Kafka metadata; are the Kafka brokers reachable from the Master Aggregator? ssl.ca.location is missing. Kafka error Local: Broker transport failure.
-
Run
SHOW WARNINGS
.The output provides information as to what is causing the error. As shown in this example output, there is no communication with the Kafka broker on ec2-xx-xxx-xx-xx.
.us-west-2. compute. amazonaws. com:9092 This may be due to no usable broker is present or the broker is not running. Note that if the issue is with an existing pipeline that is failing, the verbose debugging errors will be logged in the
information_
table.schema. pipelines_ errors SHOW WARNINGS;+---------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Level | Code | Message | +---------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Warning | 1933 | --- TRUNCATED --- ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Selected for cluster connection: application metadata request (broker has 567 connection attempt(s)) CONNECT [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received CONNECT op STATE [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT BROADCAST [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: Broadcasting state change ... "FAIL [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Connect to ipv4#192.xxx.x.xx:9092 failed: Connection refused (after 55ms in state CONNECT) (_TRANSPORT): identical to last error: error log suppressed STATE [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Broker changed state CONNECT -> DOWN" "CONNECT [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 36ms: refresh unavailable topics METADATA [thrd:main]: Hinted cache of 1/1 topic(s) being queried METADATA [thrd:main]: Skipping metadata refresh of 1 topic(s): refresh unavailable topics: no usable brokers CONNECT [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 36ms: no cluster connection" "CONNECT [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: broker in state TRY_CONNECT connecting STATE [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT BROADCAST [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: Broadcasting state change"
-
To turn off
pipelines_
:extractor_ debug_ logging SET GLOBAL pipelines_extractor_debug_logging = OFF;
When the variable pipelines_
is OFF, the warning messages are concise but still helpful.
Confirm Listeners in Kafka server. properties
is Correct
If the error: Cannot get source metadata for pipeline.server.
file.
For example, a pipeline was created using the following DDL statement:
CREATE PIPELINE kafkaAS LOAD DATA KAFKA'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com/singlestore'INTO TABLE messages;
This pipeline creation statement resulted in the following error:
ERROR 1933 ER_EXTRACTOR_EXTRACTOR_GET_LATEST_OFFSETS: Cannot getsource metadata for pipeline. Could not fetch Kafka metadata; are the Kafkabrokers reachable from the Master Aggregator? Ssl.ca.location is missing.Kafka error Local Broker transport failure
To rectify this error, the server.
file will need to be edited to reflect the correct hostname and to check for any other missing or incorrect settings.server.
file may differ based on the version of Linux and Kafka that are being used.
-
Specify the broker’s hostname and include the port number it uses.
listeners=PLAINTEXT://ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092 -
Restart the broker:
ubuntu@kafka:~/kafka/config$ sudo systemctl restart kafka -
Confirm the broker is running:
ubuntu@kafka:~/kafka/config$ sudo systemctl status kafka | grep ActiveActive: active (running) since Wed 2022-12-21 01:29:29 UTC; 48s ago -
Try running the create pipeline statement again:
CREATE PIPELINE kafkaAS LOAD DATA KAFKA'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com/singlestore'INTO TABLE messages; -
Check if the pipeline is ingesting data from Kafka:
STOP PIPELINE kafka;Test PIPELINE kafka LIMIT 1;singlestore> Test PIPELINE kafka limit 1; +----------------------+------------------------------------------------------------------------------------------------------+ | id | tweet | +----------------------+------------------------------------------------------------------------------------------------------+ | 1612846720953581568 | {"created_at":1673367598,"favorite_count":0,"id":1612846720953581568,"retweet_count":0,"text":"RT | | | @YE0NSSI: never getting over mr. jazz and boracut https://t.co/sHjsOaQ21S","username":"angelyoongo"} | +----------------------+------------------------------------------------------------------------------------------------------+ 1 row in set (0.49 sec)
Failed to get watermark offsets from Kafka with error Local: Unknown partition
Troubleshooting an Error 1933 error when trying to create a pipeline:
singlestore> create pipeline debug as load data kafkaec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092,ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9093/multibroker'into table messages;ERROR 1933 (HY000): Cannot get source metadata for pipeline.Failed to get watermark offsets from Kafka with error Local:Unknown partition
Confirm all Kafka Brokers are Running
One potential cause of the watermark offsets error is the Kafka broker is not running.
In the example error above there are two brokers:
ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9093
Confirm all brokers are running using the command sudo systemctl status <broker_
.
The Kafka broker1 is active and running, broker2 is active but failed.
ubuntu@kafka:~/kafka/scripts$ sudo systemctl status broker1 | grep Active
Active: active (running) since Wed 2023-01-04 01:01:56 UTC; 14min ago
ubuntu@kafka:~/kafka/scripts$ sudo systemctl status broker2 | grep Active
Active: failed (Result: exit-code) since Wed 2023-01-04 01:01:04 UTC; 15min ago
Start the offline broker using the command: sudo systemctl start <broker_
;.
ubuntu@kafka:~/kafka/scripts$ sudo systemctl start broker2
ubuntu@kafka:~/kafka/scripts$ sudo systemctl status broker2 | grep Active
Active: active (running) since Wed 2023-01-04 01:26:21 UTC; 1min 14s ago
Once all the brokers have been started and are running, creating a pipeline should work.
singlestore> CREATE PIPELIN debug AS LOAD DATA kafka'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092,ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9093/multibroker'INTO TABLE messages;Query OK, 0 rows affected (0.35 sec)
Note
If all the brokers are in an offline state, the error would be: Error 1933: Cannot get source metadata for pipeline.
ERROR 1933: Cannot get source metadata for pipeline. Zero partitions for topic. Topic may not exist, or it may still be being created.
The error shown below is likely caused by a problem with the topic, topic permissions, or the topic is still recovering from a broker reboot.
singlestore> system dateThu Jan 5 23:38:30 UTC 2023singlestore> CREATE PIPELINE debug AS LOAD DATA kafka'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/debug'INTO TABLE messages;ERROR 1933 (HY000): Cannot get source metadata for pipeline.Zero partitions for topic. Topic may not exist, or it may stillbe being created.
Check Kafka’s server.
to cross-reference the timestamp when the error occurred with the information present in the Kafka logging.
ubuntu@kafka:~/kafka/logs$ rg debug server.log | rg -v multibroker | rg -v test | rg 23:3[2023-01-05 23:38:29,975] INFO [LogLoader partition=debug-1, dir=/tmp/kafka-logs-broker0] Loading producer state till offset 51457 with message format version 2 (kafka.log.Log$)[2023-01-05 23:38:29,975] INFO [LogLoader partition=debug-1, dir=/tmp/kafka-logs-broker0] Reloading from producer snapshot and rebuilding producer state from offset 51457 (kafka.log.Log$)[2023-01-05 23:38:29,998] INFO [ProducerStateManager partition=debug-1] Loading producer state from snapshot file 'SnapshotFile(/tmp/kafka-logs-broker0/debug-1/00000000000000051457.snapshot,51457)' (kafka.log.ProducerStateManager)[2023-01-05 23:38:30,034] INFO [LogLoader partition=debug-1, dir=/tmp/kafka-logs-broker0] Producer state recovery took 59ms for snapshot load and 0ms for segment recovery from offset 51457 (kafka.log.Log$)[2023-01-05 23:38:30,049] INFO Completed load of Log(dir=/tmp/kafka-logs-broker0/debug-1, topicId=FeqyeH2ZR_28aMF1x40Z-g, topic=debug, partition=1, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=51457) with 1 segments in 107ms (5/17 loaded in /tmp/kafka-logs-broker0) (kafka.log.LogManager)[2023-01-05 23:38:30,300] INFO [LogLoader partition=debug-0, dir=/tmp/kafka-logs-broker0] Loading producer state till offset 48543 with message format version 2 (kafka.log.Log$)[2023-01-05 23:38:30,300] INFO [LogLoader partition=debug-0, dir=/tmp/kafka-logs-broker0] Reloading from producer snapshot and rebuilding producer state from offset 48543 (kafka.log.Log$)[2023-01-05 23:38:30,300] INFO [ProducerStateManager partition=debug-0] Loading producer state from snapshot file 'SnapshotFile(/tmp/kafka-logs-broker0/debug-0/00000000000000048543.snapshot,48543)' (kafka.log.ProducerStateManager)[2023-01-05 23:38:30,301] INFO [LogLoader partition=debug-0, dir=/tmp/kafka-logs-broker0] Producer state recovery took 1ms for snapshot load and 0ms for segment recovery from offset 48543 (kafka.log.Log$)[2023-01-05 23:38:30,315] INFO Completed load of Log(dir=/tmp/kafka-logs-broker0/debug-0, topicId=FeqyeH2ZR_28aMF1x40Z-g, topic=debug, partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=48543) with 1 segments in 39ms (12/17 loaded in /tmp/kafka-logs-broker0) (kafka.log.LogManager)[2023-01-05 23:38:35,951] INFO [Partition debug-0 broker=0] Log loaded for partition debug-0 with initial high watermark 48543 (kafka.cluster.Partition)[2023-01-05 23:38:36,414] INFO [Partition debug-1 broker=0] Log loaded for partition debug-1 with initial high watermark 51457 (kafka.cluster.Partition)
A recovery topic is usually solved by running the CREATE PIPELINE
command again.
singlestore> CREATE PIPELINE debug AS LOAD DATA kafka'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/debug'INTO TABLE messages;Query OK, 0 rows affected (0.05 sec)
Cannot get source metadata for pipeline. Failed to get watermark offsets from Kafka with error Broker: Unknown topic or partition
If an error such as, Cannot get source metadata for pipeline.information_
table, check the Kafka broker(s) server.
around the same time as the error.
*** 1. row ***
DATABASE_NAME: kafka
PIPELINE_NAME: debug
ERROR_UNIX_TIMESTAMP: 1672960138.923225
ERROR_TYPE: Error
ERROR_CODE: 1933
ERROR_MESSAGE: Cannot get source metadata for pipeline.
Failed to get watermark offsets from Kafka with error Broker: Unknown
topic or partition
ERROR_KIND: Extract
STD_ERROR: 2023-01-05 23:07:51.811 Batch starting
with new consumer. rd_kafka_version_str: 1.8.2-PRE6-4-gecdd4d-dirty
2023-01-05 23:08:58.869 Failed to get watermark offsets from Kafka
with error Broker: Unknown topic or partition_$_SERVER_ERROR:
Failed to get watermark offsets from Kafka with error Broker:
Unknown topic or partition
In the Kafka server.
there are recovery type traces around the time of the error in the information_
table.
[2023-01-05 23:08:58,276] INFO Kafka version: 3.0.0 (org.apache.kafka.common.utils.AppInfoParser)[2023-01-05 23:08:58,277] INFO Kafka commitId: 8cb0a5e9d3441962 (org.apache.kafka.common.utils.AppInfoParser)[2023-01-05 23:08:58,277] INFO Kafka startTimeMs: 1672960138244 (org.apache.kafka.common.utils.AppInfoParser)[2023-01-05 23:08:58,279] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)[2023-01-05 23:08:58,581] INFO [BrokerToControllerChannelManager broker=0 name=alterIsr]:Recorded new controller, from now on will use broker ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092(id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)[2023-01-05 23:08:58,642] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]:Recorded new controller, from now on will use broker ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092(id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)[2023-01-05 23:08:59,279] INFO [Partition debug-0 broker=0] Log loaded for partition debug-0 with initial high watermark 48543 (kafka.cluster.Partition)
Once the broker is online and fully recovered, confirm if the problematic Kafka pipeline has stop printing the unknown topic or partition errors to the information_
table.
Last modified: January 10, 2025