# Kafka Pipeline Errors

Several factors can cause errors when creating a Kafka pipeline. This topic lists some common errors and their solutions. For more information on creating pipelines with Kafka, refer to [Load Data from Kafka](https://docs.singlestore.com/db/v9.1/load-data/data-sources/load-data-from-kafka.md).

> **📝 Note**:
> - Monitor the [operation.timeout.ms](https://docs.singlestore.com/db/v9.1/load-data/data-sources/configuration-options-for-different-sources/#section-idm234488319108397.md) configuration setting while running Kafka pipelines. If `operation.timeout.ms` is set to 10 seconds in Kafka and a Kafka offset takes 20 seconds to fetch, an error is thrown. To avoid this error, increase the `operation.timeout.ms` limit.
> - When inserting data into stored procedures through a pipeline, out-of-order optimization is disabled by default and cannot be enabled with the `OUT_OF_ORDER OPTIMIZATION` [clause](https://docs.singlestore.com/db/v9.1/reference/sql-reference/pipelines-commands/alter-pipeline/#section-idm4637357457899232547490767548.md). To prevent an order mismatch issue, sort the data before inserting it into the underlying table.

## ERROR 1933: Cannot get source metadata for pipeline

`ERROR 1933 ER_EXTRACTOR_EXTRACTOR_GET_LATEST_OFFSETS: Cannot get source metadata for pipeline. Could not fetch Kafka metadata; are the Kafka brokers reachable from the Master Aggregator? ssl.ca.location is missing. Kafka error Local: Broker transport failure.`

Error 1933 means that there is a communication error between SingleStore and the Kafka broker.

The cause could be something as simple as the Kafka broker being offline. Some things to check are:

* **Broker status:** confirm the Kafka broker is online.
* **Port number:** check that the port is open and not in use by any other connection endpoint.
* **Hostname:** check for a typo in the hostname.
* **Firewall:** verify the firewall rules allow incoming and outgoing traffic on the correct port(s).
* **SSL configs:** ensure the SSL configurations are present and correct.
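
The hostname and port checks above can be scripted from the Master Aggregator host. The following is a minimal sketch, assuming Bash (for its `/dev/tcp` redirection) and the coreutils `timeout` command are installed. `split_brokers` and `check_broker` are hypothetical helper names, not SingleStore or Kafka tools. The sketch only tests whether a TCP connection can be opened, so it does not validate SSL settings or confirm that the listener is actually Kafka.

```shell
#!/usr/bin/env bash
# Hypothetical helpers: split a Kafka pipeline endpoint string into
# host:port pairs and test TCP reachability of each one.

# Print one "host:port" per line from a string such as
# "host1:9092, host2:9093/topic". Port 9092 is assumed when none is given.
split_brokers() {
  # Drop the "/topic" suffix, remove whitespace, and split on commas.
  printf '%s' "${1%%/*}" | tr -d '[:space:]' | tr ',' '\n' |
  while IFS= read -r broker || [ -n "$broker" ]; do
    [ -z "$broker" ] && continue
    case "$broker" in
      *:*) printf '%s\n' "$broker" ;;
      *)   printf '%s:9092\n' "$broker" ;;
    esac
  done
}

# Return 0 if a TCP connection to "host:port" opens within 3 seconds.
check_broker() {
  local host="${1%:*}" port="${1##*:}"
  timeout 3 bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null
}

# Example: report each broker named in an endpoint string.
# for b in $(split_brokers "$ENDPOINT"); do
#   if check_broker "$b"; then echo "$b reachable"; else echo "$b unreachable"; fi
# done
```

A broker that is reported as unreachable points to the broker status, port, hostname, or firewall items in the list; a reachable broker that still fails suggests checking the SSL configuration instead.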

## Failed to get watermark offsets from Kafka with error Local: Unknown partition

The following Error 1933 can occur when trying to create a pipeline:

```sql
CREATE PIPELINE p DEBUG AS
LOAD DATA KAFKA
'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092,
ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9093/multibroker'
INTO TABLE messages;
```

```output
ERROR 1933 (HY000): Cannot get source metadata for pipeline.
Failed to get watermark offsets from Kafka with error Local:
Unknown partition
```

## Confirm all Kafka Brokers are Running

One potential cause of the watermark offsets error is that a Kafka broker is not running.

The sample statement above lists two brokers:

```shell
ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092 
ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9093
```

Confirm all brokers are running using the command `sudo systemctl status <broker_name>`. For demonstration purposes the broker names are *broker1* and *broker2*.

In the output below, broker1 is active and running, but broker2 has failed.

```shell
ubuntu@kafka:~/kafka/scripts$ sudo systemctl status broker1 | grep Active
```

```output
     Active: active (running) since Wed 2023-01-04 01:01:56 UTC; 14min ago
```

```shell
ubuntu@kafka:~/kafka/scripts$ sudo systemctl status broker2 | grep Active
```

```output
     Active: failed (Result: exit-code) since Wed 2023-01-04 01:01:04 UTC; 15min ago
```

Start the offline broker using the command `sudo systemctl start <broker_name>`.

```shell
ubuntu@kafka:~/kafka/scripts$ sudo systemctl start broker2
```

```shell
ubuntu@kafka:~/kafka/scripts$ sudo systemctl status broker2 | grep Active
```

```output
     Active: active (running) since Wed 2023-01-04 01:26:21 UTC; 1min 14s ago
```

Once all the brokers have been started and are running, creating a pipeline should work.

```sql
CREATE PIPELINE debug AS LOAD DATA KAFKA
'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092,
ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9093/multibroker'
INTO TABLE messages;
```

```output
Query OK, 0 rows affected (0.35 sec)
```
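
The per-broker status checks above can be combined into one pass. This is a minimal sketch, assuming each broker runs as a systemd unit; `report_brokers` is a hypothetical helper name, and the unit names `broker1` and `broker2` are the demonstration names used above and will differ per installation.

```shell
#!/usr/bin/env bash
# Hypothetical helper: report the state of each Kafka broker systemd unit
# passed as an argument. Returns non-zero if any unit is not running.

report_brokers() {
  local unit failed=0
  for unit in "$@"; do
    if systemctl is-active --quiet "$unit"; then
      echo "$unit: running"
    else
      echo "$unit: NOT running (start it with: sudo systemctl start $unit)"
      failed=1
    fi
  done
  return "$failed"
}

# Example: report_brokers broker1 broker2
```

Because the function returns a non-zero status when any broker is down, it can gate a later step, such as retrying the `CREATE PIPELINE` statement only after every broker reports as running.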

> **📝 Note**: If all the brokers are in an offline state, the error would be: *Error 1933:  Cannot get source metadata for pipeline.*

## Confirm Listeners in Kafka `server.properties` are Correct

If the error *Cannot get source metadata for pipeline. Could not fetch Kafka metadata* keeps occurring on pipeline creation, confirm the hostname is correct in the Kafka `server.properties` file.

For example, a pipeline was created using the following DDL statement:

```sql
CREATE PIPELINE kafka
AS LOAD DATA KAFKA 
'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com/singlestore'
INTO TABLE messages;
```

This pipeline creation statement resulted in the following error:

```output
ERROR 1933 ER_EXTRACTOR_EXTRACTOR_GET_LATEST_OFFSETS: Cannot get
source metadata for pipeline. Could not fetch Kafka metadata; are the Kafka
brokers reachable from the Master Aggregator? ssl.ca.location is missing.
Kafka error Local: Broker transport failure.
```

To rectify this error, edit the `server.properties` file so that it reflects the correct hostname, and check it for any other missing or incorrect settings. The location of the `server.properties` file may differ based on the version of Linux and Kafka in use.

1. Set `listeners` to the broker’s hostname and include the port number it uses.
   ```shell
   listeners=PLAINTEXT://ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092
   ```

2. Restart the broker:
   ```shell
   ubuntu@kafka:~/kafka/config$ sudo systemctl restart kafka
   ```

3. Confirm the broker is running:
   ```shell
   ubuntu@kafka:~/kafka/config$ sudo systemctl status kafka | grep Active
      Active: active (running) since Wed 2022-12-21 01:29:29 UTC; 48s ago
   ```

4. Run the `CREATE PIPELINE` statement again:
   ```sql
   CREATE PIPELINE kafka
   AS LOAD DATA KAFKA
   'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com/singlestore'
   INTO TABLE messages;
   ```

5. Check if the pipeline is ingesting data from Kafka:
   ```sql
   STOP PIPELINE kafka;

   TEST PIPELINE kafka LIMIT 1;
   ```
   ```output
   +----------------------+------------------------------------------------------------------------------------------------------+
   | id                   | tweet                                                                                                |
   +----------------------+------------------------------------------------------------------------------------------------------+
   | 1612846720953581568  | {"created_at":1673367598,"favorite_count":0,"id":1612846720953581568,"retweet_count":0,"text":"RT    |
   |                      | @YE0NSSI: never getting over mr. jazz and boracut https://t.co/sHjsOaQ21S","username":"angelyoongo"} |
   +----------------------+------------------------------------------------------------------------------------------------------+
   1 row in set (0.49 sec)
   ```
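
Before restarting the broker, it can help to print the listener settings that are actually active in the file. The sketch below is an illustration under stated assumptions: `show_listeners` is a hypothetical helper name, and the example path is a guess that varies by installation. It also prints `advertised.listeners`, a standard Kafka broker setting that this topic does not otherwise cover, since a mismatch there can produce similar connection symptoms.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the active (uncommented) listener settings
# from a Kafka server.properties file so the hostname and port can be
# compared with the endpoint used in CREATE PIPELINE.

show_listeners() {
  local file="$1"
  grep -E '^[[:space:]]*(advertised\.)?listeners[[:space:]]*=' "$file"
}

# Example (the path is an assumption; it varies by installation):
# show_listeners ~/kafka/config/server.properties
```

If the command prints nothing, no `listeners` line is active in the file, for example because it is still commented out.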

## ERROR 1933: Cannot get source metadata for pipeline. Zero partitions for topic. Topic may not exist, or it may still be being created

The error shown below is likely caused by a problem with the topic or its permissions, or by the topic still recovering from a broker reboot.

```sql
SYSTEM date
```

```output
Thu Jan  5 23:38:30 UTC 2023
```

```sql
CREATE PIPELINE debug AS LOAD DATA KAFKA
'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/debug'
INTO TABLE messages;
```

```output
ERROR 1933 (HY000): Cannot get source metadata for pipeline.
Zero partitions for topic.  Topic may not exist, or it may still
be being created.
```

Check Kafka’s `server.log` and cross-reference the timestamp of the error with the entries in the Kafka log. The log should provide helpful information regarding the error. In the example below, the log entries show the partitions of the `debug` topic being recovered:

```shell
ubuntu@kafka:~/kafka/logs$ rg debug server.log | rg -v multibroker | rg -v test | rg 23:3
[2023-01-05 23:38:29,975] INFO [LogLoader partition=debug-1, dir=/tmp/kafka-logs-broker0] Loading producer state till offset 51457 with message format version 2 (kafka.log.Log$)
[2023-01-05 23:38:29,975] INFO [LogLoader partition=debug-1, dir=/tmp/kafka-logs-broker0] Reloading from producer snapshot and rebuilding producer state from offset 51457 (kafka.log.Log$)
[2023-01-05 23:38:29,998] INFO [ProducerStateManager partition=debug-1] Loading producer state from snapshot file 'SnapshotFile(/tmp/kafka-logs-broker0/debug-1/00000000000000051457.snapshot,51457)' (kafka.log.ProducerStateManager)
[2023-01-05 23:38:30,034] INFO [LogLoader partition=debug-1, dir=/tmp/kafka-logs-broker0] Producer state recovery took 59ms for snapshot load and 0ms for segment recovery from offset 51457 (kafka.log.Log$)
[2023-01-05 23:38:30,049] INFO Completed load of Log(dir=/tmp/kafka-logs-broker0/debug-1, topicId=FeqyeH2ZR_28aMF1x40Z-g, topic=debug, partition=1, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=51457) with 1 segments in 107ms (5/17 loaded in /tmp/kafka-logs-broker0) (kafka.log.LogManager)
[2023-01-05 23:38:30,300] INFO [LogLoader partition=debug-0, dir=/tmp/kafka-logs-broker0] Loading producer state till offset 48543 with message format version 2 (kafka.log.Log$)
[2023-01-05 23:38:30,300] INFO [LogLoader partition=debug-0, dir=/tmp/kafka-logs-broker0] Reloading from producer snapshot and rebuilding producer state from offset 48543 (kafka.log.Log$)
[2023-01-05 23:38:30,300] INFO [ProducerStateManager partition=debug-0] Loading producer state from snapshot file 'SnapshotFile(/tmp/kafka-logs-broker0/debug-0/00000000000000048543.snapshot,48543)' (kafka.log.ProducerStateManager)
[2023-01-05 23:38:30,301] INFO [LogLoader partition=debug-0, dir=/tmp/kafka-logs-broker0] Producer state recovery took 1ms for snapshot load and 0ms for segment recovery from offset 48543 (kafka.log.Log$)
[2023-01-05 23:38:30,315] INFO Completed load of Log(dir=/tmp/kafka-logs-broker0/debug-0, topicId=FeqyeH2ZR_28aMF1x40Z-g, topic=debug, partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=48543) with 1 segments in 39ms (12/17 loaded in /tmp/kafka-logs-broker0) (kafka.log.LogManager)
[2023-01-05 23:38:35,951] INFO [Partition debug-0 broker=0] Log loaded for partition debug-0 with initial high watermark 48543 (kafka.cluster.Partition)
[2023-01-05 23:38:36,414] INFO [Partition debug-1 broker=0] Log loaded for partition debug-1 with initial high watermark 51457 (kafka.cluster.Partition)
```

When the topic is still recovering, running the `CREATE PIPELINE` command again after the recovery completes usually resolves the error.

```sql
CREATE PIPELINE debug AS LOAD DATA KAFKA
'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/debug'
INTO TABLE messages;
```

```output
Query OK, 0 rows affected (0.05 sec)
```
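
Rerunning the statement by hand can be replaced with a small retry loop. This is a minimal sketch, not a SingleStore feature: `retry` is a hypothetical helper, and the example assumes a MySQL-compatible command-line client and a file named `create_pipeline.sql` that holds the `CREATE PIPELINE` statement.

```shell
#!/usr/bin/env bash
# Hypothetical helper: run a command up to <attempts> times, sleeping
# <delay> seconds between failed attempts.
# Usage: retry <attempts> <delay> <command> [args...]

retry() {
  local attempts="$1" delay="$2" n=1
  shift 2
  until "$@"; do
    if [ "$n" -ge "$attempts" ]; then
      echo "giving up after $n attempt(s)" >&2
      return 1
    fi
    n=$((n + 1))
    sleep "$delay"
  done
}

# Example (client name, connection options, and file name are assumptions):
# retry 5 10 mysql <connection options> -e "source create_pipeline.sql"
```

A bounded number of attempts is used so that a topic that truly does not exist produces a clear failure instead of an endless loop.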

## Cannot get source metadata for pipeline.  Failed to get watermark offsets from Kafka with error Broker: Unknown topic or partition

If an error such as *Cannot get source metadata for pipeline.  Failed to get watermark offsets from Kafka with error Broker: Unknown topic or partition* appears in the `information_schema.pipelines_errors` table, check the `server.log` of the Kafka broker(s) for entries around the time of the error.

```output
*** 1. row ***
            DATABASE_NAME: kafka
            PIPELINE_NAME: debug
     ERROR_UNIX_TIMESTAMP: 1672960138.923225
               ERROR_TYPE: Error
               ERROR_CODE: 1933
            ERROR_MESSAGE: Cannot get source metadata for pipeline.  
Failed to get watermark offsets from Kafka with error Broker: Unknown 
topic or partition
               ERROR_KIND: Extract
                STD_ERROR: 2023-01-05 23:07:51.811 Batch starting 
with new consumer.  rd_kafka_version_str: 1.8.2-PRE6-4-gecdd4d-dirty
2023-01-05 23:08:58.869 Failed to get watermark offsets from Kafka
with error Broker: Unknown topic or partition_$_SERVER_ERROR: 
Failed to get watermark offsets from Kafka with error Broker: 
Unknown topic or partition
```

The Kafka `server.log` contains broker startup and recovery entries around the time of the error recorded in the `information_schema.pipelines_errors` table.

```shell
[2023-01-05 23:08:58,276] INFO Kafka version: 3.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2023-01-05 23:08:58,277] INFO Kafka commitId: 8cb0a5e9d3441962 (org.apache.kafka.common.utils.AppInfoParser)
[2023-01-05 23:08:58,277] INFO Kafka startTimeMs: 1672960138244 (org.apache.kafka.common.utils.AppInfoParser)
[2023-01-05 23:08:58,279] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2023-01-05 23:08:58,581] INFO [BrokerToControllerChannelManager broker=0 name=alterIsr]: 
     Recorded new controller, from now on will use broker ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092 
     (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-01-05 23:08:58,642] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: 
     Recorded new controller, from now on will use broker ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092 
     (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-01-05 23:08:59,279] INFO [Partition debug-0 broker=0] Log loaded for partition debug-0 with initial high watermark 48543 (kafka.cluster.Partition)
```

Once the broker is online and fully recovered, confirm that the problematic Kafka pipeline has stopped logging the unknown topic or partition errors to the `information_schema.pipelines_errors` table.
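
Matching the error to the Kafka log means converting `ERROR_UNIX_TIMESTAMP` to the timestamp format used in `server.log`. The following is a minimal sketch, assuming GNU `date` (for the `-d @<epoch>` form) and a broker that logs in UTC, as in the example above. `epoch_to_log_minute` and `log_lines_near_error` are hypothetical helper names.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for matching a pipeline error to Kafka log lines.
# Assumes GNU date and Kafka log timestamps written in UTC.

# Convert an ERROR_UNIX_TIMESTAMP value to a "YYYY-MM-DD HH:MM" prefix.
epoch_to_log_minute() {
  local epoch="${1%%.*}"   # drop the fractional seconds
  date -u -d "@${epoch}" '+%Y-%m-%d %H:%M'
}

# Print the server.log lines written during the same minute as the error.
log_lines_near_error() {
  local epoch="$1" log_file="$2"
  grep -F "[$(epoch_to_log_minute "$epoch")" "$log_file"
}

# Example: log_lines_near_error 1672960138.923225 server.log
```

For the `ERROR_UNIX_TIMESTAMP` value `1672960138.923225` shown above, `epoch_to_log_minute` prints `2023-01-05 23:08`, which is the minute of the broker startup entries in the sample log. Continuation lines that do not begin with a timestamp are not matched by this sketch.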

## Debugging Kafka Pipelines

This section shows how to enable debugging (if not already enabled) and how to analyze the output to determine the cause of the error and solve it.

> **📝 Note**: To capture larger debug error logs for one specific pipeline without affecting the execution of other pipelines (as the global variable does), use a debug pipeline. This allows you to collect error buffers of up to 10 MB (64 kB without a debug pipeline). For more information, refer to [Debugging Pipeline Errors](https://docs.singlestore.com/db/v9.1/load-data/about-singlestore-pipelines/pipeline-troubleshooting/debugging-pipeline-errors/#section-idm234638941933936.md).

1. Enable the `pipelines_extractor_debug_logging` engine variable for verbose error logging.
   ```sql
   SET GLOBAL pipelines_extractor_debug_logging = ON;
   ```
   ```output
   Query OK, 0 rows affected (0.02 sec)
   ```

2. Flush the extractor pools.
   ```sql
   FLUSH EXTRACTOR POOLS;
   ```
   ```output
   Query OK, 0 rows affected (0.00 sec)
   ```

3. Run the `CREATE PIPELINE` statement and wait to see if the pipeline fails. Below is a sample error for an unreachable Kafka broker.
   ```sql
   CREATE PIPELINE my_pipeline AS
   LOAD DATA KAFKA 'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com/singlestore'
   INTO TABLE messages;
   ```
   ```output
   ERROR 1933 (HY000): Cannot get source metadata for pipeline.
   Could not fetch Kafka metadata; are the Kafka brokers reachable from the Master Aggregator?
   ssl.ca.location is missing. Kafka error Local: Broker transport failure.
   ```

4. Run `SHOW WARNINGS`. The output provides information about what is causing the error. As shown in this example output, there is no communication with the Kafka broker on `ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092`. This may be because no usable broker is present or the broker is not running.

   Note that if the issue is with an existing pipeline that is failing, the verbose debugging errors will be logged in the [`information_schema.pipelines_errors`](https://docs.singlestore.com/db/v9.1/reference/information-schema-reference/data-ingest/pipelines-errors.md) table.
   ```sql
   SHOW WARNINGS;
   ```
   ```output
   +---------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | Level   | Code | Message                                                                                                                                                                               |
   +---------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | Warning | 1933 |
   --- TRUNCATED ---
    ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Selected for cluster connection: application metadata request (broker has 567 connection attempt(s))
   CONNECT [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received CONNECT op
   STATE [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
   BROADCAST [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: Broadcasting state change

   ...

   "FAIL [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Connect to ipv4#192.xxx.x.xx:9092 failed: 
   Connection refused (after 55ms in state CONNECT) (_TRANSPORT): identical to last error: error log suppressed
   STATE [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Broker changed state CONNECT -> DOWN"
   "CONNECT [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 36ms: refresh unavailable topics
   METADATA [thrd:main]: Hinted cache of 1/1 topic(s) being queried
   METADATA [thrd:main]: Skipping metadata refresh of 1 topic(s): refresh unavailable topics: no usable brokers
   CONNECT [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 36ms: no cluster connection"
   "CONNECT [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: broker in state TRY_CONNECT connecting
   STATE [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
   BROADCAST [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: Broadcasting state change"
   ```

5. To turn off `pipelines_extractor_debug_logging`:
   ```sql
   SET GLOBAL pipelines_extractor_debug_logging = OFF;
   ```

When `pipelines_extractor_debug_logging` is `OFF`, the warning messages are concise but still helpful, although they do not show all the available information. When the variable is `ON`, the output is more detailed. Often, an internet search based on the information in the output will yield a solution.
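
The verbose output can be long, so it helps to narrow it to the lines that describe connection failures. This is a minimal sketch: `connection_failures` is a hypothetical helper, its patterns are taken only from the sample output above (`FAIL [thrd:`, `Connection refused`, and a broker state change to `DOWN`), and the file name `warnings.txt` is an assumption.

```shell
#!/usr/bin/env bash
# Hypothetical helper: keep only the connection-failure lines from verbose
# extractor debug output read on standard input.

connection_failures() {
  grep -E 'FAIL \[thrd:|Connection refused|Broker changed state [A-Z_]+ -> DOWN'
}

# Example, assuming the SHOW WARNINGS output was saved to warnings.txt:
# connection_failures < warnings.txt
```

Other failure types, such as authentication or SSL errors, use different wording and would need additional patterns.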

If you want to capture the complete error logs, use a debug pipeline. This allows you to collect larger error buffers, up to 10 MB. For more information, refer to [Collecting Full Error Logs](https://docs.singlestore.com/db/v9.1/load-data/about-singlestore-pipelines/pipeline-troubleshooting/debugging-pipeline-errors/#section-idm234638941933936.md).

***

Modified at: March 13, 2025

Source: [/db/v9.1/load-data/data-sources/load-data-from-kafka/kafka-pipeline-errors/](https://docs.singlestore.com/db/v9.1/load-data/data-sources/load-data-from-kafka/kafka-pipeline-errors/)

