Load Data from Kafka

On this page

Securely Connect to Kafka from SingleStore Helios

Overview

When running a CREATE PIPELINE ... KAFKA ... statement, you may need to make a secure connection to Kafka.

Use Secure Socket Layer (SSL) for the connection and Simple Authentication and Security Layer (SASL) to authenticate. Using SASL for authentication is optional.

  • GSSAPI (Kerberos)

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

  • OAUTHBEARER SASL

This topic assumes SSL have been set up, configured, and enabled on the Kafka brokers. For information on how to enable this functionality, see the SSL and SASL sections in the Kafka documentation.

Convert Java Keystore (JKS) to Privacy Enhanced Mail (PEM) Key

To use SSL encryption for SingleStore pipelines, JKS keys need to be converted to PEM keys.

Note

In the steps below, the < > symbols indicate a variable and any information provided between these symbols is an example.

  1. Create the key and keystore. You will be prompted to enter details such as name, organizational unit, city, state, etc.

    keytool -genkey -keyalg RSA -keystore <keystore-name>.jks -storepass <password> -alias "<keystore-alias-name>"
    What is your first and last name?
     [Unknown]:- 
    What is the name of your organizational unit?
     [Unknown]:-
    What is the name of your organization?
     [Unknown]:-
  2. Export the client certificate from the keystore using the same password as in step 1.

    keytool -exportcert -rfc -file <pem-name>.pem -alias <pem-name-alias> -keystore <keystore-name>.jks
    Enter keystore password:  <password>
    Certificate stored in file <pem-name>.pem
  3. Import the client certificate to the truststore located on your Apache Server. Enter a new password for the keystore.

    keytool -keystore <truststore-name>.jks -alias <truststore-name-alias> -import -file <pem-name>.pem
    Enter keystore password:  
    Re-enter new password: 
    
    Trust this certificate? [no]:  yes
    Certificate was added to keystore
  4. Convert the client keystore to Public-Key Cryptography Standards (PKCS12) format. The <key name>.jks and password are the same as in step 1.

    keytool -v -importkeystore -srckeystore <keystore-name>.jks -srcalias <keystore alias> -destkeystore <new-keystore-name>.p12 -deststoretype PKCS12
    Importing keystore <keyname>.jks to <new-keyname>.p12...
    Enter destination keystore password:  
    Re-enter new password: 
    Enter source keystore password:  
    [Storing new-keystore-name.p12]
  5. Extract the client certificate key into a .pem file. Use the import password created in step 4.

    openssl pkcs12 -in <new-keyname>.p12 -nocerts -nodes > <keyname>.pem
    Enter Import Password:

Steps for Creating a Secure Connection

To create a secure connection from SingleStore Helios to a Kafka workspace, follow these steps in order.

Upload a Certificate to Use to Connect via TLS/SSL

Use the following steps to enable TLS/SSL encryption between SingleStore Helios and Kafka.

  1. From the Workspaces page on the Cloud Portal, click on the workspace on which to enable TLS/SSL connections.

  2. Click the Security tab at the top of the page.

  3. Click on the Upload Certificate button to upload your CA certificate. This will make it available to all the nodes and will allow you to secure outbound connections via TLS/SSL.

Build a String with the Connection Settings

Using the following settings, create a string containing the CONFIG clause and optionally the CREDENTIALS clause of the CREATE PIPELINE ... KAFKA ... or SELECT ... INTO KAFKA ... statement that you will be running.

SSL Connection Settings
  1. In your CONFIG JSON, if you want to enable SSL encryption only, set "security.protocol": "ssl". If you want to enable Kerberos with SSL, or otherwise want to use SASL, set "security.protocol": "sasl_ssl".

  2. Set the remaining SSL configuration in the CONFIG JSON:

    • ssl.ca.location. Value is always /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem.

  3. If your SSL certificate key is using a password, set it in your CREDENTIALS JSON.

    • ssl.key.password: Password for the SSL certificate key.

SASL Connection Settings
  1. In your CONFIG JSON, set "security.protocol": "sasl_ssl" for SSL connections, or "security.protocol": "sasl_plaintext" if you want to authenticate with Kafka without SSL encryption.

  2. If your Kafka brokers do not use SCRAM for authentication, set "sasl.mechanism": "PLAIN" in your CONFIG JSON. Otherwise, set "sasl.mechanism": "SCRAM-SHA-256" or "sasl.mechanism": "SCRAM-SHA-512".

  3. In your CONFIG JSON, provide the username, "sasl.username": "<kafka_credential_username>".

  4. In your CREDENTIALS JSON, provide the password, "sasl.password": "<kafka_credential_password>".

Note

SASL_PLAINTEXT/PLAIN authentication mode with Kafka sends your credentials unencrypted over the network. It is therefore not secure and susceptible to being sniffed.

SASL_PLAINTEXT/SCRAM authentication mode with Kafka will encrypt the credentials information sent over the network, but transport of Kafka messages themselves is not secure.

Configuring OAUTHBEARER Authentication Mechanism

OAUTHBEARER is used to secure access to resources on a server by requiring clients to obtain a bearer token. The client presents this token with each request to the server, and the server verifies the token before granting access to the requested resource.

To use SASL OAUTHBEARER authentication with Kafka, the following information is required:

  • "sasl.mechanism":"OAUTHBEARER" - Specifies the client will authenticate using an OAuth 2.0 Bearer Token.

  • "sasl.oauthbearer.client.id":"<CLIENT_ID>" - The client ID is usually provided by the OAuth provider when the client is registered. It is a unique ID that is associated with the OAuth 2.0 Bearer Token.

  • "sasl.oauthbearer.client.secret":"<CLIENT_SECRET>" The client secret is usually assigned by the OAuth provider when a client is registered and is used to authenticate the client.

  • "sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>" - This is the endpoint URL on the authorization server that is used to obtain an OAuth 2.0 Bearer Token. The client sends a request to this endpoint to get a token, which is then used to authenticate subsequent requests to the server.

Optional configurations are:

  • "sasl.oauthbearer.scope":"<SCOPE>" - Determines the permissions and resources that are available to an authorized client. This extension is optional.

  • "sasl.outhbearer.extensions":"<EXTENSION>" - Can be included in the SASL/OAUTHBEARER mechanism to provide additional data or parameters for authentication. Consult RFC-7628 for further information on SASL extensions.

  • See "OAUTHBEARER Pipelines Configuration Details" section for additional details regarding "sasl.oauthbearer.ssl.ca.location" "sasl.oauthbearer.config" fields.

Prerequisites

To use SASL OAUTHBEARER authentication the following prerequisites are required:

  • A Kafka broker with listeners configured using OAUTHBEARER authentication.

  • A connection to a database where Kafka brokers are reachable to create a pipeline to pull data from the Kafka queue.

  • An identity service was selected (e.g., Okta, Google OAuth, Facebook OAuth, Azure AD, Keycloak, etc.).

  • An Oauthbearer client was created and configured with the client_credentials grant type on the identity service.

Note

The instructions for setting up a client on the Identity Service will vary depending on the server chosen and the type of application being built.

Syntax for SASL OAUTHBEARER Pipeline

Below is the syntax for creating a Kafka pipeline using OAUTHBEARER authentication.

CREATE or REPLACE PIPELINE <pipeline_name>   
AS LOAD DATA KAFKA "<Kafka cluster location>"   
CONFIG '{"security.protocol":"SASL_SSL",  
"sasl.mechanism":"OAUTHBEARER",
"sasl.oauthbearer.client.id":"<CLIENT_ID>",   
"sasl.oauthbearer.client.secret":<"CLIENT_SECRET>",
"sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>",
"sasl.oauthbearer.scope":"<SCOPE>"}'  
INTO TABLE <table_name>;

OAUTHBEARER Pipelines Configuration Details

To review the current pipeline configuration use the command:

SELECT * FROM information_schema.pipelines

Most token providers are assumed to work out-of-the-box. However, the implementation details below will allow troubleshooting and configuring pipelines for specific cases.

Token Request Details:

  • SingleStore implements client_credentials grant type for OAUTHBEARER token requests. Ensure the OAuth client is created with support for the client_credentials grant type.

  • By default, SingleStore treats Oauthbearer tokens as opaque to address privacy concerns and uses the expires_in field from the JSON token response to determine the token's validity time. If expires_in is not present or "sasl.oauthbearer.config”:"use_expires_in=false” is set in the pipeline configuration, we fall back to decoding the Oauth token JWT and use the exp claim to determine its validity.

  • The token refresh is scheduled at 80% of the token’s lifetime and handled by SingleStore in the background.

  • For https Oauth token requests, by default, SingleStore will use the CA bundle specified at pipeline creation or try to find one of the system paths if none are specified. This behavior can be changed by including additional configuration settings in the pipeline CONFIG on creation. This change will affect token request logic only, not the total SSL Kafka communication.

    • "sasl.oauthbearer.ssl.ca.location":"system" - to use the system default path;

    • "sasl.oauthbearer.ssl.ca.location":"/usr/lib/ssl/certs/ca-certificates.crt" - to use a specific CA path location;

    • "sasl.oauthbearer.ssl.ca.location":"" (empty) to disable SSL verification;

Token Cache Details:

  • The OAUTHBEARER tokens are cached in the extractor pools for some time (along with the extractors) and the pipeline cookies (per pipeline). The Oauth server must allow multiple simultaneous active tokens due to the several nodes or extractor pools used in the SingleStore Helios architecture.

  • "sasl.oauthbearer.cookie.cache":"on" - by default, tokens are cached in the pipeline cookies to minimize the number of token requests. To disable this feature set "sasl.oauthbearer.cookie.cache":"off". To reset the pipeline cookies, use alter pipeline … set offset cursor ''.

  • The pipeline may fail because the OAUTHBEARER client config on the identity server is changed or the pipeline config is altered in a way which causes previously requested cached tokens to become invalid. This issue may be avoided by resetting the respective pipeline cache.

    • alter pipeline … set offset cursor '' - to reset cookies cache;

    • flush extractor pools - to reset extractor pools;

Kafka Version Setting

Warning

Using SSL and SASL with Kafka requires Kafka protocol version 0.9 or later; therefore, CREATE PIPELINE ... KAFKA ... and SELECT ... INTO KAFKA ... statements using SSL and SASL with Kafka also need to adhere to that version requirement. The Kafka protocol version can be passed in through JSON through the CONFIG clause, similar to this CONFIG '{"kafka_version":"0.10.0.0"}'. Alternatively, the pipelines_kafka_version engine variable controls this parameter for any pipeline without using a Kafka version configuration value in a CREATE PIPELINE ... KAFKA ... statement.

Final Step: Use the Connection String in a SQL Statement

Create your CREATE PIPELINE ... KAFKA ... or SELECT ... INTO KAFKA ... statement, using the string containing the connection settings that you created in the previous steps.

Examples

Create a Kafka Pipeline with Data from Confluent Cloud

CREATE PIPELINE quickstart_kafka AS LOAD DATA KAFKA '<Confluent Cloud workspace endpoint>/test'
CONFIG '{"sasl.username": "<CLUSTER_API_KEY>",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL",
"ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"}'
INTO TABLE messages;

Publish Messages to Confluent Cloud

SELECT text FROM t INTO
KAFKA '<Confluent Cloud workspace endpoint>/test-topic'
CONFIG '{"sasl.username": "<CLUSTER_API_KEY>",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL",
"ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"};

Configure SASL OAUTHBEARER for Use with Kafka Pipelines

Note

SASL OAUTHBEARER is not supported with SELECT INTO ...

CREATE or REPLACE PIPELINE <pipeline_name>   
AS LOAD DATA KAFKA "<Kafka cluster location>"  
CONFIG '{"security.protocol":"SASL_SSL",  
"sasl.mechanism":"OAUTHBEARER",
"sasl.oauthbearer.client.id":"<CLIENT_ID>",   
"sasl.oauthbearer.client.secret":<"CLIENT_SECRET>",
"sasl.oauthbearer.token.endpoint.url":"<ENDPOINT_URL>",
"sasl.oauthbearer.scope":"<SCOPE>"}'  
INTO TABLE <table_name>;

Load Data from the Confluent Kafka Connector

The SingleStore Confluent Kafka Connector is a Kafka Connect connector that allows you to easily ingest AVRO, JSON, and CSV messages from Kafka topics into SingleStore Helios. More specifically, the Confluent Kafka Connector is a Sink (target) connector designed to read data from Kafka topics and write that data to SingleStore Helios tables.

Learn more about the SingleStore Helios Confluent Kafka Connector, including how to install and configure it, in Working with the Kafka Connector.

Connecting to Confluent Schema Registry over SSL

To enable Pipelines to connect to Confluent Schema Registry over SSL, install the registry’s certificate (ca-cert) on all nodes in your SingleStore Helios workspace. For example:

cat ca-cert >> /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem
update-ca-trust

Note

You only need to install the registry’s certificate (ca-cert) in your SingleStore Helios workspace if you are using a self-signed certificate or a certificate signed by an internal CA.

After installing the registry’s certificate on all nodes, existing Pipelines that you already created (or new Pipelines that you create) with the host name/IP address and port of Confluent Schema Registry will use SSL automatically.

Schema Registry Configuration Options

Supported configuration options include:

  • schema.registry.ssl.ca.location

  • schema.registry.ssl.ca.directory

  • schema.registry.ssl.verifypeer

  • schema.registry.ssl.certificate.location

  • schema.registry.ssl.key.location

  • schema.registry.ssl.key.password

  • schema.registry.curl.timeout

  • schema.registry.curl.tcp.keepalive

  • schema.registry.curl.tcp.keepidle

  • schema.registry.curl.keepintvl

  • schema.registry.curl.connecttimeout

  • schema.registry.username

  • schema.registry.password

  • schema.registry.curl.verbose

Load Data from Kafka Using a Pipeline

To create and interact with a Kafka pipeline quickly, follow the instructions in this section. There are two parts to this Quickstart:

  1. Part 1: Sending Messages to Kafka

  2. Part 2: Creating a Kafka Pipeline in SingleStore

Prerequisites

To complete this Quickstart, your environment must meet the following prerequisites:

  • A working Kafka queue.

  • SingleStore Helios installation –or– a SingleStore Helios workspace: You will connect to the database or workspace and create a pipeline to pull data from Kafka queue.

Upload a Certificate to Use to Connect via TLS/SSL

Use the following steps to enable TLS/SSL encryption between SingleStore Helios and Kafka.

  1. From the Workspaces page on the Cloud Portal, click on the workspace on which to enable TLS/SSL connections.

  2. Click the Security tab at the top of the page.

  3. Click on the Upload Certificate button to upload your CA certificate. This will make it available to all the nodes and will allow you to secure outbound connections via TLS/SSL.

Part 1: Sending Messages to Kafka

In Kafka, create a topic named test and enter the following messages:

the quick
brown fox
jumped over
the lazy dog

In Part 2, you will create a pipeline in SingleStore to ingest these messages.

Part 2: Creating a Kafka Pipeline in SingleStore

Now that your Kafka topic contains some messages, you can create a new pipeline and ingest the messages.

At the SingleStore prompt, execute the following statements:

CREATE DATABASE quickstart_kafka;
USE quickstart_kafka;
CREATE TABLE messages (id text);

These statements create a new table and database that will be used for the Kafka pipeline. Before you can create the pipeline, you will need the IP address of the Kafka workspace.

To create the pipeline, execute the following statement, replacing <kafka-workspace-ip> with your Kafka workspace's IP address:

CREATE PIPELINE quickstart_kafka AS LOAD DATA KAFKA '<kafka-workspace-ip>/test' INTO TABLE messages;

If you are connecting with SSL, SASL, or Kerberos, you will need to specify additional clauses in your CREATE PIPELINE statement. See Securely Connect to Kafka for more information.

Note: If you are creating a Kafka pipeline to load data from Confluent Cloud, execute the following statement instead of the previous CREATE PIPELINE statement.

CREATE PIPELINE quickstart_kafka AS LOAD DATA KAFKA '<Confluent Cloud workspace endpoint>/test'
CONFIG '{"sasl.username": "<CLUSTER_API_KEY>",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL",
"ssl.ca.location": "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem"}'
CREDENTIALS '{"sasl.password": "<CLUSTER_API_SECRET>"}'
INTO TABLE messages;

The two CREATE PIPELINE statements just mentioned create a new Kafka pipeline named quickstart_kafka, which reads messages from the test Kafka topic and writes it into the messages table. For more information, see the CREATE PIPELINE topic. If the statement was successful, you can test your pipeline. While you can start a pipeline after creating it, it’s always best to test it using a small set of data:

TEST PIPELINE quickstart_kafka LIMIT 1;

If this test was successful and no errors are present, then you are ready to try ingesting data. The following command will run one batch and commit the data to the SingleStore table messages.

START PIPELINE quickstart_kafka FOREGROUND LIMIT 1 BATCHES;

To verify that the data exists in the messages table as expected, execute the following statement. If it was successful, you should see a non-empty result set.

SELECT * FROM messages;
+--------------+
| id           |
+--------------+
| the quick    |
| brown fox    |
| jumped over  |
| the lazy dog |
+--------------+

Now you are ready to start your pipeline as a background process. SingleStore will automatically ingest new messages as they are put into Kafka.

START PIPELINE quickstart_kafka;

Now that the pipeline is up and running, send a few more messages to the Kafka topic. Enter the following lines into your topic:

Lorem ipsum
dolor sit amet

In the SingleStore terminal window, run the SELECT * FROM messages; statement again. Now you will see the following output:

SELECT * FROM messages;
+----------------+
| id             |
+----------------+
| lorem ipsum    |
| dolor sit amet |
| the quick      |
| brown fox      |
| jumped over    |
| the lazy dog   |
+----------------+

Now that your pipeline is running, you can check the status and history of it at any time by querying the PIPELINES_BATCHES_SUMMARY table.

SELECT * FROM information_schema.PIPELINES_BATCHES_SUMMARY;

This system view will give one row for every recent batch the pipeline has run, as well as at-a-glance performance and volume metrics. This information is extremely valuable for monitoring and understanding your production pipeline system.

Note

Foreground pipelines and background pipelines have different intended uses and behave differently. For more information, see the CREATE PIPELINE topic.

Note: When a backup is restored all pipelines in that database will revert to the state (offsets, etc.) they were in when the target backup was generated. If the data isn't in Kafka anymore you will start from the earliest offsets available in Kafka.

Test your Kafka Cluster using kcat (formerly kafkacat)

When you use SingleStore to consume or produce Kafka messages, you may want to test your Kafka cluster directly to verify connectivity and to verify topics and messages. kcat is a command line utility you can use to perform these tasks.

Note

kcat is available at https://github.com/edenhill/kcat. It is a third-party utility and SingleStore does not provide support for it.

Running kcat

Minimal syntax for running kcat is as follows.

kcat <mode> -b <broker> -t <topic>

When you run kcat, you may need to supply additional parameters, such as SASL settings to connect to your Kafka cluster. Specify these parameters using the -X option. Examples of these parameters are shown in the code samples below, which demonstrate how to connect to Confluent Cloud.

<mode>

One of the following:

  • -P: Producer mode. Used to produce messages to a Kafka topic (not demonstrated here).

  • -C: Consumer mode. Used to consume messages from a Kafka topic. Used here to test connectivity to a Kafka cluster and to verify that data is as expected.

  • -L: Metadata listing mode. Used here to test connectivity to a Kafka cluster.

  • -Q: Query mode. Not demonstrated here.

<broker>

The Kafka broker to connect to.

<topic>

The name of the topic to produce to, consume from, or list metadata from.

Verify connectivity to your Kafka Cluster

Running kcat in any mode, where the command returns successfully, is confirmation that you can connect. Following are two examples of how you can verify connectivity.

Verify Connectivity by Returning Metadata

In the following example, kcat connects to a Confluent Cloud cluster using -L mode to return metadata.

kcat -b <Confluent Cloud broker URL> -L \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X ssl.ca.location=<CA certificate file path> \
-X sasl.username=<CLUSTER_API_KEY> \
-X sasl.password=<CLUSTER_API_SECRET>
Metadata for all topics (from broker ...)

20 brokers:
...

10 topics:
...

Verify Connectivity by Consuming One Message from a Topic

In the following example, kcat connects to a Confluent Cloud cluster using -C mode to consume the last message from a topic.

kcat -C -b <Confluent Cloud broker URL> -t <topic> -o -1 -e \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X ssl.ca.location=<CA certificate file path> \
-X sasl.username=<CLUSTER_API_KEY> \
-X sasl.password=<CLUSTER_API_SECRET>

Verify data in your Kafka Cluster

Two situations where you may want to verify data in your Kafka cluster are:

  • Prior to using SingleStore to consume Kafka messages via Pipelines or the Confluent Kafka Connector.

  • After using SingleStore to produce Kafka messages (using SELECT ... INTO KAFKA ...).

In both situations, you can run kcat in -C mode for a given topic.

The following kcat command connects to a Confluent Cloud cluster using -C mode. The command outputs all of the messages for the specified topic, also showing the offset and partition for each message. The output is piped to the file kcat_output.txt. The -e option indicates that kcat should exit once it has consumed the last message from the topic.

kcat -C -b <Confluent Cloud broker URL> -t <topic> -e \
-f 'Partition: %p  Offset: %o  Message: %s\n' \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X ssl.ca.location=<CA certificate file path> \
-X sasl.username=<CLUSTER_API_KEY> \
-X sasl.password=<CLUSTER_API_SECRET>
> kcat_output.txt

Example: Verify messages in Kafka after producing messages using SELECT ... INTO KAFKA ...

Create a table with three columns.

CREATE TABLE numbers(a INT, b INT, c INT);

Insert data into the table.

INSERT INTO numbers(a,b,c) VALUES (10,20,30);
INSERT INTO numbers(a,b,c) VALUES (40,50,60);
INSERT INTO numbers(a,b,c) VALUES (70,80,90);

Use the SELECT ... INTO KAFKA ... command to create a Kafka message for each row of the numbers table. All of the messages are written to the topic numbers-topic in a Confluent Cloud cluster. The field values in each message correspond to the field values in the table, incremented by 10 , 20 or 30.

SELECT a + 10, b + 20, c + 30
FROM numbers INTO KAFKA '<Confluent Cloud broker URL>/numbers-topic'
CONFIG '{
"security.protocol":"SASL_SSL",
"sasl.mechanism":"PLAIN",
"ssl.ca.location":"/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",
"sasl.username":"<CLUSTER_API_KEY>"
}'
CREDENTIALS '{"sasl.password":"<CLUSTER_API_SECRET>"}'
FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY "\t"
LINES TERMINATED BY '}' STARTING BY '{';

Run the kcat command shown in the section "Verify data in your Kafka cluster", above. View the kcat_output.txt output file and search for {20,40,60} to verify that the message is in the file. You will see the other two messages following {20,40,60}. The messages in the output will look as follows. The partitions and offsets may differ.

Partition: 1 Offset: 0 Message: {20,40,60}
Partition: 1 Offset: 1 Message: {50,70,90}
Partition: 1 Offset: 2 Message: {80,100,120}
...

Troubleshooting Kafka AWS Pipelines

There are a number of factors that can contribute to Kafka pipeline errors. Some of the more common errors and solutions are provided below:

Error 1933: Cannot get source metadata for pipeline.

ERROR 1933 ER_EXTRACTOR_EXTRACTOR_GET_LATEST_OFFSETS: Cannot get source metadata for pipeline. Could not fetch Kafka metadata; are the Kafka brokers reachable from the Master Aggregator? ssl.ca.location is missing. Kafka error Local: Broker transport failure.

Error 1933 means that there is a communication error between SingleStore and the Kafka broker.

The cause could be something as simple as the Kafka broker is offline. Some things to check are:

  • Broker Status: Confirm Kafka broker is in an online state.

  • Port number: check to see if the port number is open and not in use by any other connection endpoint.

  • Hostname: check if there is a typo in the hostname.

  • Firewall: verify the firewall's rules are configured to allow incoming and outgoing traffic on the correct port(s).

  • SSL configs: ensure that the SSL configurations are not missing and are correct.

How to Analyze the Debugging Log

This section shows how to enable debugging (if not already enabled), how to analyze the output to determine the cause, and assist in solving the error.

  1. Enable the engine variable: pipelines_extractor_debug_logging for verbose error logging.

    singlestore> set global pipelines_extractor_debug_logging=ON;
    Query OK, 0 rows affected (0.02 sec)
  2. Flush the extractor pools.

    singlestore> flush extractor pools;
    Query OK, 0 rows affected (0.00 sec)
  3. Run the CREATE PIPELINE statement and wait to see if the pipeline fails. Below is a Kafka broker unreachable sample error.

    CREATE PIPELINE my_pipeline as
    LOAD DATA KAFKA 'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com/singlestore'
    INTO TABLE messages;
    ERROR 1933 (HY000): Cannot get source metadata for pipeline.  
      Could not fetch Kafka metadata; are the Kafka brokers reachable from the Master Aggregator?  
      ssl.ca.location is missing. Kafka error Local: Broker transport failure.
  4. Run SHOW WARNINGS. The output provides information as to what is causing the error. As shown in this example output, there is no communication with the Kafka broker on ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092. This may be due to no usable broker is present or the broker is not running.

    singlestore> SHOW WARNINGS;
    +---------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | Level   | Code | Message                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
    +---------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | Warning | 1933 |
    --- TRUNCATED ---
     ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Selected for cluster connection: application metadata request (broker has 567 connection attempt(s))
    CONNECT [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received CONNECT op
    STATE [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
    BROADCAST [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: Broadcasting state change
    
    ...
    
    "FAIL [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Connect to ipv4#192.xxx.x.xx:9092 failed: Connection refused (after 55ms in state CONNECT) (_TRANSPORT): identical to last error: error log suppressed
    STATE [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Broker changed state CONNECT -> DOWN"
    "CONNECT [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 36ms: refresh unavailable topics
    METADATA [thrd:main]: Hinted cache of 1/1 topic(s) being queried
    METADATA [thrd:main]: Skipping metadata refresh of 1 topic(s): refresh unavailable topics: no usable brokers
    CONNECT [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 36ms: no cluster connection"
    "CONNECT [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: broker in state TRY_CONNECT connecting
    STATE [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
    BROADCAST [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: Broadcasting state change"
  5. To turn off pipelines_extractor_debug_logging:

    singlestore> SET GLOBAL pipelines_extractor_debug_logging = OFF;

When the variable pipelines_extractor_debug_logging is OFF, the warning messages are concise but still helpful. The output does not show all the available information. When this variable is ON, there is more information. Often an internet search will yield a solution based on the information provided in the output.

Curl the Endpoint to Check Connectivity

If using a SingleStore Helios private link, try accessing the endpoint you create that maps to the service endpoint by using: curl -v <endpoint>. Otherwise try curling the broker(s):

ubuntu@kafka:~$ curl -v ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092
* Trying 196.xx.xx.xxx:9092...
* TCP_NODELAY set
* Connected to ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com (196.xx.xx.xxx) port 9092 (#0)
> GET / HTTP/1.1
> Host: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092
> User-Agent: curl/7.68.0
> Accept: */*

If the endpoint connectivity is successful, but there is still an issue with communicating with the broker, contact SingleStore Support, they will be able to run a curl on the DNS entries from within the SingleStore Helios side on any instance in the same subnet.

Confirm IPs Are in the Allowlist

In the Cloud Portal page, select the Firewall tab and look at the addresses in the Outbound list.

See AWS VPC SecurityGroups for more details about configuring outbound traffic.

Confirm Listeners in Kafka server.properties is Correct

If the error: Cannot get source metadata for pipeline. Could not fetch Kafka metadata keeps occurring on pipeline creation, confirm that the hostname is correct in the Kafka server.properties file.

For example, a pipeline was created using the following DDL statement:

CREATE PIPELINE kafka
AS LOAD DATA KAFKA
'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com/singlestore'
INTO TABLE messages;

This pipeline creation statement resulted in the following error:

ERROR 1933 ER_EXTRACTOR_EXTRACTOR_GET_LATEST_OFFSETS: Cannot get
source metadata for pipeline. Could not fetch Kafka metadata; are the Kafka
brokers reachable from the Master Aggregator? Ssl.ca.location is missing.
Kafka error Local Broker transport failure

To rectify this error, the server.properties file will need to be edited to reflect the correct hostname and to check for any other missing or incorrect settings. The location of the server.properties file may differ based on the version of Linux and Kafka that are being used.

  1. Specify the broker’s hostname and include the port number it uses.

    listeners=PLAINTEXT://ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092
  2. Restart the broker:

    ubuntu@kafka:~/kafka/config$ sudo systemctl restart kafka
  3. Confirm the broker is running:

    ubuntu@kafka:~/kafka/config$ sudo systemctl status kafka | grep Active
       Active: active (running) since Wed 2022-12-21 01:29:29 UTC; 48s ago
  4. Try running the create pipeline statement again:

    CREATE PIPELINE kafka
    AS LOAD DATA KAFKA'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com/singlestore'
    INTO TABLE messages;
  5. Check if the pipeline is ingesting data from Kafka:

    STOP PIPELINE kafka;
    Test PIPELINE kafka LIMIT 1;
    singlestore> Test PIPELINE kafka limit 1;
    +----------------------+------------------------------------------------------------------------------------------------------+
    | id                   | tweet                                                                                                |                                                                                                                                                                                          
    +----------------------+------------------------------------------------------------------------------------------------------+ 
    | 1612846720953581568  | {"created_at":1673367598,"favorite_count":0,"id":1612846720953581568,"retweet_count":0,"text":"RT    |
    |                      | @YE0NSSI: never getting over mr. jazz and boracut https://t.co/sHjsOaQ21S","username":"angelyoongo"} |
    +----------------------+------------------------------------------------------------------------------------------------------+
    1 row in set (0.49 sec)

Failed to get watermark offsets from Kafka with error Local: Unknown partition

Troubleshooting an Error 1933 error when trying to create a pipeline:

singlestore> create pipeline debug as load data kafka
ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092,
ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9093/multibroker'
into table messages;
ERROR 1933 (HY000): Cannot get source metadata for pipeline. 
Failed to get watermark offsets from Kafka with error Local:
Unknown partition

Confirm all Kafka Brokers are Running

One potential cause of the watermark offsets error is the Kafka broker is not running.

In the example error above there are two brokers:

ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092
ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9093

Confirm all brokers are running using the command sudo systemctl status <broker_name>. For demonstration purposes the broker names are broker1 and broker2.

The Kafka broker1 is active and running, broker2 is active but failed.

ubuntu@kafka:~/kafka/scripts$ sudo systemctl status broker1 | grep Active
     Active: active (running) since Wed 2023-01-04 01:01:56 UTC; 14min ago
ubuntu@kafka:~/kafka/scripts$ sudo systemctl status broker2 | grep Active
     Active: failed (Result: exit-code) since Wed 2023-01-04 01:01:04 UTC; 15min ago

Start the offline broker using the command: sudo systemctl start <broker_name>;.

ubuntu@kafka:~/kafka/scripts$ sudo systemctl start broker2
ubuntu@kafka:~/kafka/scripts$ sudo systemctl status broker2 | grep Active
  Active: active (running) since Wed 2023-01-04 01:26:21 UTC; 1min 14s ago

Once all the brokers have been started and are running, creating a pipeline should work.

singlestore> CREATE PIPELIN debug AS LOAD DATA kafka
'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092,
ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9093/multibroker'
INTO TABLE messages;Query OK, 0 rows affected (0.35 sec)

Note

If all the brokers are in an offline state, the error would be: Error 1933:  Cannot get source metadata for pipeline.

ERROR 1933 ER_EXTRACTOR_EXCTRACTOR_GET_LATEST_OFFSETS

This error can occur for topics in KAFKA that have more than 1 partition defined and possibly indicates incorrect networking configuration.

ERROR 1933 ER_EXTRACTOR_EXCTRACTOR_GET_LATEST_OFFSETS : Forwarding Error.
Cannot get source metadata for pipeline.
Failed to get watermark offsets from Kafka with error Broker: Not leader for partition.

When a KAFKA client (SingleStore Helios in this case) connects to Amazon MSK with a topic with more than 1 partition, it requires access to the relevant broker which is the partition leader. See "Pattern 2" in the following AWS article: https://aws.amazon.com/blogs/big-data/how-goldman-sachs-builds-cross-account-connectivity-to-their-amazon-msk-clusters-with-aws-privatelink/

A resolution to this error may require reaching out AWS MSK.

ERROR 1970 ER_SUBPROCESS_TIMEOUT_ERROR:

Error 1970 usually occurs when trying to create a pipeline:

ERROR 1970 ER_SUBPROCESS_TIMEOUT_ERROR: Forwarding
Error(node-xxx...): Remote connection timed out because no metadata
was received from source. Stderr: <DATE> Batch starting with new
consumer.rd_kafka_version_str

This is potentially caused by the AWS MSK broker endpoints when using Private Link with SingleStore Helios. Verify that the correct AWS MSK broker endpoints have been provided to the SingleStore Support Team.

ERROR 1933: Cannot get source metadata for pipeline.  Zero partitions for topic.  Topic may not exist, or it may still be being created.

The error shown below is  likely caused by a problem with the topic or the topic is still recovering from a broker reboot.

singlestore> system date
Thu Jan  5 23:38:30 UTC 2023
singlestore> CREATE PIPELINE debug AS LOAD DATA kafka
'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/debug'
INTO TABLE messages;
ERROR 1933 (HY000): Cannot get source metadata for pipeline. 
Zero partitions for topic.  Topic may not exist, or it may still
be being created.

Check Kafka’s server.log to cross-reference the timestamp when the error occurred with the information present in the Kafka logging. The log should provide helpful information regarding the error. In the  example below, there are traces about topic recovery:

ubuntu@kafka:~/kafka/logs$ rg debug server.log | rg -v multibroker | rg -v test | rg 23:3
[2023-01-05 23:38:29,975] INFO [LogLoader partition=debug-1, dir=/tmp/kafka-logs-broker0] Loading producer state till offset 51457 with message format version 2 (kafka.log.Log$)
[2023-01-05 23:38:29,975] INFO [LogLoader partition=debug-1, dir=/tmp/kafka-logs-broker0] Reloading from producer snapshot and rebuilding producer state from offset 51457 (kafka.log.Log$)
[2023-01-05 23:38:29,998] INFO [ProducerStateManager partition=debug-1] Loading producer state from snapshot file 'SnapshotFile(/tmp/kafka-logs-broker0/debug-1/00000000000000051457.snapshot,51457)' (kafka.log.ProducerStateManager)
[2023-01-05 23:38:30,034] INFO [LogLoader partition=debug-1, dir=/tmp/kafka-logs-broker0] Producer state recovery took 59ms for snapshot load and 0ms for segment recovery from offset 51457 (kafka.log.Log$)
[2023-01-05 23:38:30,049] INFO Completed load of Log(dir=/tmp/kafka-logs-broker0/debug-1, topicId=FeqyeH2ZR_28aMF1x40Z-g, topic=debug, partition=1, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=51457) with 1 segments in 107ms (5/17 loaded in /tmp/kafka-logs-broker0) (kafka.log.LogManager)
[2023-01-05 23:38:30,300] INFO [LogLoader partition=debug-0, dir=/tmp/kafka-logs-broker0] Loading producer state till offset 48543 with message format version 2 (kafka.log.Log$)
[2023-01-05 23:38:30,300] INFO [LogLoader partition=debug-0, dir=/tmp/kafka-logs-broker0] Reloading from producer snapshot and rebuilding producer state from offset 48543 (kafka.log.Log$)
[2023-01-05 23:38:30,300] INFO [ProducerStateManager partition=debug-0] Loading producer state from snapshot file 'SnapshotFile(/tmp/kafka-logs-broker0/debug-0/00000000000000048543.snapshot,48543)' (kafka.log.ProducerStateManager)
[2023-01-05 23:38:30,301] INFO [LogLoader partition=debug-0, dir=/tmp/kafka-logs-broker0] Producer state recovery took 1ms for snapshot load and 0ms for segment recovery from offset 48543 (kafka.log.Log$)
[2023-01-05 23:38:30,315] INFO Completed load of Log(dir=/tmp/kafka-logs-broker0/debug-0, topicId=FeqyeH2ZR_28aMF1x40Z-g, topic=debug, partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=48543) with 1 segments in 39ms (12/17 loaded in /tmp/kafka-logs-broker0) (kafka.log.LogManager)
[2023-01-05 23:38:35,951] INFO [Partition debug-0 broker=0] Log loaded for partition debug-0 with initial high watermark 48543 (kafka.cluster.Partition)
[2023-01-05 23:38:36,414] INFO [Partition debug-1 broker=0] Log loaded for partition debug-1 with initial high watermark 51457 (kafka.cluster.Partition)

A recovery topic is usually solved by running the CREATE PIPELINE command again.

singlestore> CREATE PIPELINE debug AS LOAD DATA kafka
'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/debug'
INTO TABLE messages;
Query OK, 0 rows affected (0.05 sec)

Cannot get source metadata for pipeline.  Failed to get watermark offsets from Kafka with error Broker: Unknown topic or partition

If an error such as: Cannot get source metadata for pipeline.  Failed to get watermark offsets from Kafka with error Broker: Unknown topic or partition in the information_schema.pipelines_errors table start by checking the Kafka broker(s) server.log around the same time as the error.

*** 1. row ***
            DATABASE_NAME: kafka
           PIPELINE_NAME: debug
     ERROR_UNIX_TIMESTAMP: 1672960138.923225
               ERROR_TYPE: Error
               ERROR_CODE: 1933
            ERROR_MESSAGE: Cannot get source metadata for pipeline.  
Failed to get watermark offsets from Kafka with error Broker: Unknown 
topic or partition
               ERROR_KIND: Extract
                STD_ERROR: 2023-01-05 23:07:51.811 Batch starting 
with new consumer.  rd_kafka_version_str: 1.8.2-PRE6-4-gecdd4d-dirty
2023-01-05 23:08:58.869 Failed to get watermark offsets from Kafka
with error Broker: Unknown topic or partition_$_SERVER_ERROR: 
Failed to get watermark offsets from Kafka with error Broker: 
Unknown topic or partition

In the Kafka server.log there are recovery type traces around the time of the error in the information_schema.pipelines_errors table.

[2023-01-05 23:08:58,276] INFO Kafka version: 3.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2023-01-05 23:08:58,277] INFO Kafka commitId: 8cb0a5e9d3441962 (org.apache.kafka.common.utils.AppInfoParser)
[2023-01-05 23:08:58,277] INFO Kafka startTimeMs: 1672960138244 (org.apache.kafka.common.utils.AppInfoParser)
[2023-01-05 23:08:58,279] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2023-01-05 23:08:58,581] INFO [BrokerToControllerChannelManager broker=0 name=alterIsr]:
Recorded new controller, from now on will use broker ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092
(id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-01-05 23:08:58,642] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]:
Recorded new controller, from now on will use broker ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092
(id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-01-05 23:08:59,279] INFO [Partition debug-0 broker=0] Log loaded for partition debug-0 with initial high watermark 48543 (kafka.cluster.Partition)

Once the broker is online and fully recovered, confirm if the problematic Kafka pipeline has stop printing the unknown topic or partition errors to the information_schema.pipelines_errors table.

Last modified: October 4, 2023

Was this article helpful?