Kafka Pipeline Errors
Several factors can cause errors when creating a Kafka pipeline.
ERROR 1933: Cannot get source metadata for pipeline
ERROR 1933 ER_
Error 1933 indicates a communication error between SingleStore and the Kafka broker.
The cause can be as simple as the Kafka broker being offline. Check the following:
- Broker status: confirm that the Kafka broker is online.
- Port number: check that the port is open and is not in use by another connection endpoint.
- Hostname: check the hostname for typos.
- Firewall: verify that the firewall rules allow incoming and outgoing traffic on the correct port(s).
- SSL configuration: ensure that the SSL settings are present and correct.
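The broker status, port, hostname, and firewall checks above can be roughly approximated from the Master Aggregator host (the host the error message asks about) with a plain TCP connection test. This is a minimal sketch, not a SingleStore or Kafka tool: check_broker is a hypothetical name, and it assumes bash (for its /dev/tcp redirection) and the coreutils timeout command are available. A successful connection shows only that the port is open and reachable through the firewall; it does not validate SSL settings or the Kafka protocol itself.

```shell
# Hypothetical helper: test whether a TCP connection to a Kafka broker can be opened.
# Assumes bash is installed (for /dev/tcp) and the coreutils `timeout` command exists.
# Usage: check_broker HOST PORT [TIMEOUT_SECONDS]
check_broker() {
  local host=$1 port=$2 secs=${3:-3}
  # Open (and immediately close) a TCP connection in a child bash process;
  # `timeout` stops the attempt if the host does not answer in time.
  if timeout "$secs" bash -c "exec 3<>/dev/tcp/${host}/${port}" 2>/dev/null; then
    echo "reachable ${host}:${port}"
    return 0
  fi
  echo "unreachable ${host}:${port}"
  return 1
}
```

For example, check_broker ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com 9092 prints either reachable or unreachable followed by the host and port, and returns a non-zero status when the connection fails.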
Failed to get watermark offsets from Kafka with error Local: Unknown partition
The following example shows Error 1933 returned when creating a pipeline:
CREATE PIPELINE p DEBUG AS LOAD DATA KAFKA 'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092,ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9093/multibroker' INTO TABLE messages;
ERROR 1933 (HY000): Cannot get source metadata for pipeline. Failed to get watermark offsets from Kafka with error Local: Unknown partition
Confirm all Kafka Brokers are Running
One potential cause of the watermark offsets error could be that the Kafka broker is not running.
In the sample error above there are two brokers:
ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092
ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9093
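The broker list comes straight from the pipeline's source string. When a pipeline lists several brokers, a small helper can split that string so each broker can be checked in turn. This sketch assumes the 'host:port,host:port/topic' layout used in the CREATE PIPELINE examples on this page; pipeline_brokers and pipeline_topic are hypothetical names.

```shell
# Hypothetical helpers for a Kafka pipeline source string of the form
# 'host1:port1,host2:port2/topic', as used in the CREATE PIPELINE examples.

# Print each broker (host:port) on its own line.
pipeline_brokers() {
  # Keep everything before the first '/', then put each comma-separated entry on a line.
  printf '%s\n' "${1%%/*}" | tr ',' '\n'
}

# Print the topic name.
pipeline_topic() {
  # Keep everything after the first '/'.
  printf '%s\n' "${1#*/}"
}
```

Applied to the source string from the failing example, pipeline_brokers prints the :9092 and :9093 brokers on separate lines and pipeline_topic prints multibroker. If the string has no '/', pipeline_topic simply echoes it back, so it is only meaningful for strings that include a topic.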
Confirm that all brokers are running by checking each broker's service with sudo systemctl status, followed by the broker's service name.
In this example, broker1 is active (running), but broker2 has failed:
ubuntu@kafka:~/kafka/scripts$ sudo systemctl status broker1 | grep Active
Active: active (running) since Wed 2023-01-04 01:01:56 UTC; 14min ago
ubuntu@kafka:~/kafka/scripts$ sudo systemctl status broker2 | grep Active
Active: failed (Result: exit-code) since Wed 2023-01-04 01:01:04 UTC; 15min ago
Start any offline broker with sudo systemctl start, followed by the broker's service name:
ubuntu@kafka:~/kafka/scripts$ sudo systemctl start broker2
ubuntu@kafka:~/kafka/scripts$ sudo systemctl status broker2 | grep Active
Active: active (running) since Wed 2023-01-04 01:26:21 UTC; 1min 14s ago
Once all brokers are running, the CREATE PIPELINE statement should succeed:
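With more than a couple of brokers, checking each unit by hand gets tedious. The sketch below loops over unit names and prints each one's state using systemctl is-active, which prints the unit's state and exits non-zero when the unit is not active. It assumes, as in this example, that each broker runs as its own systemd unit (broker1, broker2); broker_states is a hypothetical helper name.

```shell
# Hypothetical helper: print the systemd state of each broker unit.
# Returns non-zero if any unit is not active.
# Usage: broker_states broker1 broker2
broker_states() {
  local rc=0 unit state
  for unit in "$@"; do
    # `systemctl is-active` prints the state (active, failed, inactive, ...)
    # and exits non-zero for anything other than active.
    state=$(systemctl is-active "$unit") || rc=1
    echo "${unit}: ${state}"
  done
  return "$rc"
}
```

Any unit reported as failed or inactive can then be started with sudo systemctl start, as shown above. The non-zero return status makes the helper usable as a quick gate in a script before retrying CREATE PIPELINE.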
CREATE PIPELINE debug AS LOAD DATA KAFKA 'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092,ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9093/multibroker' INTO TABLE messages;
Query OK, 0 rows affected (0.35 sec)
Note
If all the brokers are in an offline state, the error would be: Error 1933: Cannot get source metadata for pipeline.
Confirm the listeners Setting in Kafka server.properties is Correct
If the error Cannot get source metadata for pipeline is returned, check the listeners setting in the Kafka server.properties file.
For example, a pipeline was created using the following DDL statement:
CREATE PIPELINE kafka AS LOAD DATA KAFKA 'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com/singlestore' INTO TABLE messages;
This pipeline creation statement resulted in the following error:
ERROR 1933 ER_EXTRACTOR_EXTRACTOR_GET_LATEST_OFFSETS: Cannot get source metadata for pipeline. Could not fetch Kafka metadata; are the Kafka brokers reachable from the Master Aggregator? Ssl.ca.location is missing. Kafka error Local Broker transport failure
To fix this error, edit the server.properties file so that it specifies the correct hostname, and check it for any other missing or incorrect settings. The location of the server.properties file may differ depending on the version of Linux and Kafka in use.
- Specify the broker’s hostname and include the port number it uses:
listeners=PLAINTEXT://ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092
- Restart the broker:
ubuntu@kafka:~/kafka/config$ sudo systemctl restart kafka
- Confirm the broker is running:
ubuntu@kafka:~/kafka/config$ sudo systemctl status kafka | grep Active
Active: active (running) since Wed 2022-12-21 01:29:29 UTC; 48s ago
- Run the CREATE PIPELINE statement again:
CREATE PIPELINE kafka AS LOAD DATA KAFKA 'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com/singlestore' INTO TABLE messages;
- Check whether the pipeline is ingesting data from Kafka:
STOP PIPELINE kafka;
TEST PIPELINE kafka LIMIT 1;
+----------------------+------------------------------------------------------------------------------------------------------+
| id                   | tweet                                                                                                |
+----------------------+------------------------------------------------------------------------------------------------------+
| 1612846720953581568  | {"created_at":1673367598,"favorite_count":0,"id":1612846720953581568,"retweet_count":0,"text":"RT    |
|                      | @YE0NSSI: never getting over mr. jazz and boracut https://t.co/sHjsOaQ21S","username":"angelyoongo"} |
+----------------------+------------------------------------------------------------------------------------------------------+
1 row in set (0.49 sec)
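Before editing server.properties, it can help to see which listener settings are currently active. This sketch prints only uncommented listeners and advertised.listeners lines. advertised.listeners is a standard Kafka broker setting that, when present, controls the address the broker hands out to clients, so it is worth checking alongside listeners. show_listeners is a hypothetical name, and the path in the usage below is an assumption based on the ~/kafka/config prompt shown above.

```shell
# Hypothetical helper: print the active listener settings from a Kafka server.properties file.
# Commented-out lines (starting with '#') are skipped.
# Usage: show_listeners PATH_TO_SERVER_PROPERTIES
show_listeners() {
  grep -E '^[[:space:]]*(listeners|advertised\.listeners)[[:space:]]*=' "$1"
}
```

For example, show_listeners ~/kafka/config/server.properties. If nothing is printed, neither setting is set explicitly in that file.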
ERROR 1933: Cannot get source metadata for pipeline. Zero partitions for topic. Topic may not exist, or it may still be being created
The error shown below is likely caused by a problem with the topic, or by the topic still recovering after a broker reboot.
SYSTEM DATE
Thu Jan 5 23:38:30 UTC 2023
CREATE PIPELINE debug AS LOAD DATA KAFKA 'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/debug' INTO TABLE messages;
ERROR 1933 (HY000): Cannot get source metadata for pipeline. Zero partitions for topic. Topic may not exist, or it may still be being created.
Check Kafka’s server.log file and cross-reference the time the error occurred with the entries logged around that time.
ubuntu@kafka:~/kafka/logs$ rg debug server.log | rg -v multibroker | rg -v test | rg 23:3
[2023-01-05 23:38:29,975] INFO [LogLoader partition=debug-1, dir=/tmp/kafka-logs-broker0] Loading producer state till offset 51457 with message format version 2 (kafka.log.Log$)
[2023-01-05 23:38:29,975] INFO [LogLoader partition=debug-1, dir=/tmp/kafka-logs-broker0] Reloading from producer snapshot and rebuilding producer state from offset 51457 (kafka.log.Log$)
[2023-01-05 23:38:29,998] INFO [ProducerStateManager partition=debug-1] Loading producer state from snapshot file 'SnapshotFile(/tmp/kafka-logs-broker0/debug-1/00000000000000051457.snapshot,51457)' (kafka.log.ProducerStateManager)
[2023-01-05 23:38:30,034] INFO [LogLoader partition=debug-1, dir=/tmp/kafka-logs-broker0] Producer state recovery took 59ms for snapshot load and 0ms for segment recovery from offset 51457 (kafka.log.Log$)
[2023-01-05 23:38:30,049] INFO Completed load of Log(dir=/tmp/kafka-logs-broker0/debug-1, topicId=FeqyeH2ZR_28aMF1x40Z-g, topic=debug, partition=1, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=51457) with 1 segments in 107ms (5/17 loaded in /tmp/kafka-logs-broker0) (kafka.log.LogManager)
[2023-01-05 23:38:30,300] INFO [LogLoader partition=debug-0, dir=/tmp/kafka-logs-broker0] Loading producer state till offset 48543 with message format version 2 (kafka.log.Log$)
[2023-01-05 23:38:30,300] INFO [LogLoader partition=debug-0, dir=/tmp/kafka-logs-broker0] Reloading from producer snapshot and rebuilding producer state from offset 48543 (kafka.log.Log$)
[2023-01-05 23:38:30,300] INFO [ProducerStateManager partition=debug-0] Loading producer state from snapshot file 'SnapshotFile(/tmp/kafka-logs-broker0/debug-0/00000000000000048543.snapshot,48543)' (kafka.log.ProducerStateManager)
[2023-01-05 23:38:30,301] INFO [LogLoader partition=debug-0, dir=/tmp/kafka-logs-broker0] Producer state recovery took 1ms for snapshot load and 0ms for segment recovery from offset 48543 (kafka.log.Log$)
[2023-01-05 23:38:30,315] INFO Completed load of Log(dir=/tmp/kafka-logs-broker0/debug-0, topicId=FeqyeH2ZR_28aMF1x40Z-g, topic=debug, partition=0, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=48543) with 1 segments in 39ms (12/17 loaded in /tmp/kafka-logs-broker0) (kafka.log.LogManager)
[2023-01-05 23:38:35,951] INFO [Partition debug-0 broker=0] Log loaded for partition debug-0 with initial high watermark 48543 (kafka.cluster.Partition)
[2023-01-05 23:38:36,414] INFO [Partition debug-1 broker=0] Log loaded for partition debug-1 with initial high watermark 51457 (kafka.cluster.Partition)
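The rg pipeline above filters server.log by topic name and a partial timestamp. A slightly more targeted variant prints only the entries logged during a given minute, so the time of the error (the SYSTEM DATE output above) can be matched directly. This sketch assumes the log layout shown in the sample, where each entry starts with [yyyy-MM-dd HH:mm:ss,SSS]; log_entries_at is a hypothetical name, and it uses awk rather than rg.

```shell
# Hypothetical helper: print server.log entries whose timestamp starts with PREFIX,
# e.g. '2023-01-05 23:38' for every entry logged during that minute.
# Usage: log_entries_at PREFIX FILE
log_entries_at() {
  # index(...) == 1 keeps only lines that begin with "[PREFIX".
  awk -v p="[$1" 'index($0, p) == 1' "$2"
}
```

For example, log_entries_at '2023-01-05 23:38' server.log | grep debug narrows the output to the debug topic's recovery messages. Continuation lines that do not start with a timestamp (such as stack traces) are skipped by design.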
An error caused by a recovering topic is usually resolved by running the CREATE PIPELINE command again once the broker has finished loading the topic's partitions.
CREATE PIPELINE debug AS LOAD DATA KAFKA 'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/debug' INTO TABLE messages;
Query OK, 0 rows affected (0.05 sec)
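Because a recovering topic usually just needs time, re-running the statement a few times with a pause in between can save manual retries. The sketch below is a generic retry loop; retry_command is a hypothetical name, and the client invocation in the usage line (the mysql command-line client with a placeholder host, user, and the kafka database from this example) is an assumption to adapt to your environment.

```shell
# Hypothetical helper: run a command until it succeeds or ATTEMPTS runs out,
# sleeping DELAY seconds between tries.
# Usage: retry_command ATTEMPTS DELAY command [args...]
retry_command() {
  local attempts=$1 delay=$2 n=1
  shift 2
  while :; do
    "$@" && return 0
    # Give up after the last attempt.
    [ "$n" -ge "$attempts" ] && return 1
    n=$((n + 1))
    sleep "$delay"
  done
}
```

For example: retry_command 5 10 mysql -h <aggregator-host> -u root kafka -e "CREATE PIPELINE debug AS LOAD DATA KAFKA 'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/debug' INTO TABLE messages;" where <aggregator-host> is a placeholder. A fixed delay keeps the sketch simple; exponential backoff is an alternative if recovery can take longer.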
Cannot get source metadata for pipeline. Failed to get watermark offsets from Kafka with error Broker: Unknown topic or partition
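If an error such as Cannot get source metadata for pipeline. Failed to get watermark offsets from Kafka with error Broker: Unknown topic or partition appears in the information_schema.pipelines_errors table, check the Kafka broker(s) server.log file for entries from around the same time as the error.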
*** 1. row ***
DATABASE_NAME: kafka
PIPELINE_NAME: debug
ERROR_UNIX_TIMESTAMP: 1672960138.923225
ERROR_TYPE: Error
ERROR_CODE: 1933
ERROR_MESSAGE: Cannot get source metadata for pipeline. Failed to get watermark offsets from Kafka with error Broker: Unknown topic or partition
ERROR_KIND: Extract
STD_ERROR: 2023-01-05 23:07:51.811 Batch starting with new consumer. rd_kafka_version_str: 1.8.2-PRE6-4-gecdd4d-dirty
2023-01-05 23:08:58.869 Failed to get watermark offsets from Kafka with error Broker: Unknown topic or partition_$_SERVER_ERROR: Failed to get watermark offsets from Kafka with error Broker: Unknown topic or partition
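ERROR_UNIX_TIMESTAMP is recorded in epoch seconds, while server.log uses wall-clock timestamps, so converting one to the other makes the cross-check easier. This sketch assumes GNU date (standard on Linux; BSD and macOS date use -r SECONDS instead) and that the broker host's clock is in UTC, as the systemctl and date output on this page suggest; epoch_to_utc is a hypothetical name.

```shell
# Hypothetical helper: convert an epoch timestamp such as ERROR_UNIX_TIMESTAMP
# into the 'YYYY-MM-DD HH:MM:SS' UTC form that prefixes server.log entries.
# Assumes GNU date (-d @SECONDS).
epoch_to_utc() {
  # Drop the fractional part; matching to the nearest second is enough here.
  local secs=${1%%.*}
  date -u -d "@${secs}" '+%Y-%m-%d %H:%M:%S'
}
```

For the row above, epoch_to_utc 1672960138.923225 prints 2023-01-05 23:08:58, which lines up with the 23:08:58 entries in the server.log excerpt that follows.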
The Kafka server.log file contains recovery-related entries from around the time of the error recorded in the information_schema.pipelines_errors table:
[2023-01-05 23:08:58,276] INFO Kafka version: 3.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2023-01-05 23:08:58,277] INFO Kafka commitId: 8cb0a5e9d3441962 (org.apache.kafka.common.utils.AppInfoParser)
[2023-01-05 23:08:58,277] INFO Kafka startTimeMs: 1672960138244 (org.apache.kafka.common.utils.AppInfoParser)
[2023-01-05 23:08:58,279] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
[2023-01-05 23:08:58,581] INFO [BrokerToControllerChannelManager broker=0 name=alterIsr]: Recorded new controller, from now on will use broker ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-01-05 23:08:58,642] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use broker ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-01-05 23:08:59,279] INFO [Partition debug-0 broker=0] Log loaded for partition debug-0 with initial high watermark 48543 (kafka.cluster.Partition)
Once the broker is online and fully recovered, confirm that the Kafka pipeline is no longer logging Unknown topic or partition errors to the information_schema.pipelines_errors table.
Debugging Kafka Pipelines
This section shows how to enable debug logging (if it is not already enabled) and how to analyze the output to determine the cause of an error and help resolve it.
- Enable the engine variable pipelines_extractor_debug_logging for verbose error logging:
SET GLOBAL pipelines_extractor_debug_logging=ON;
Query OK, 0 rows affected (0.02 sec)
- Flush the extractor pools:
FLUSH EXTRACTOR POOLS;
Query OK, 0 rows affected (0.00 sec)
- Run the CREATE PIPELINE statement and check whether the pipeline fails. Below is a sample error for an unreachable Kafka broker:
CREATE PIPELINE my_pipeline AS LOAD DATA KAFKA 'ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com/singlestore' INTO TABLE messages;
ERROR 1933 (HY000): Cannot get source metadata for pipeline. Could not fetch Kafka metadata; are the Kafka brokers reachable from the Master Aggregator? ssl.ca.location is missing. Kafka error Local: Broker transport failure.
- Run SHOW WARNINGS. The output shows what is causing the error. In this example, there is no communication with the Kafka broker on ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092. This may be because no usable broker is present or because the broker is not running. If the issue is with an existing pipeline that is failing, the verbose debugging errors are logged in the information_schema.pipelines_errors table.
SHOW WARNINGS;
+---------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Level | Code | Message |
+---------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Warning | 1933 | --- TRUNCATED --- ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Selected for cluster connection: application metadata request (broker has 567 connection attempt(s)) CONNECT [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Received CONNECT op STATE [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: 127.0.0.1:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT BROADCAST [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: Broadcasting state change ... 
"FAIL [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Connect to ipv4#192.xxx.x.xx:9092 failed: Connection refused (after 55ms in state CONNECT) (_TRANSPORT): identical to last error: error log suppressed STATE [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Broker changed state CONNECT -> DOWN" "CONNECT [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 36ms: refresh unavailable topics METADATA [thrd:main]: Hinted cache of 1/1 topic(s) being queried METADATA [thrd:main]: Skipping metadata refresh of 1 topic(s): refresh unavailable topics: no usable brokers CONNECT [thrd:main]: Not selecting any broker for cluster connection: still suppressed for 36ms: no cluster connection" "CONNECT [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: broker in state TRY_CONNECT connecting STATE [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT BROADCAST [thrd:ec2-xx-xxx-xx-xx.us-west-2.compute.amazonaws.com:9092/bootstrap]: Broadcasting state change"
- To turn off pipelines_extractor_debug_logging:
SET GLOBAL pipelines_extractor_debug_logging = OFF;
When pipelines_extractor_debug_logging is OFF, the warning messages are more concise but still helpful.
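The verbose output is long, but the broker state transitions tell most of the story: in the sample above, the bootstrap broker moves from CONNECT to DOWN right after Connection refused. This sketch pulls just those transitions out of saved debug text (for example, SHOW WARNINGS output you have copied into a file). It assumes the librdkafka message format shown in the sample; broker_transitions is a hypothetical name.

```shell
# Hypothetical helper: list librdkafka broker state transitions found in debug text.
# Reads FILE, or standard input when no file is given.
# Usage: broker_transitions [FILE]
broker_transitions() {
  grep -oE 'Broker changed state [A-Z_]+ -> [A-Z_]+' "${1:--}"
}
```

A sequence that ends in CONNECT -> DOWN, repeated for the same broker, generally means the connection attempts are being refused or timing out rather than failing at the SSL or metadata stage.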
Collecting Full Error Logs
This method is particularly useful when creating pipelines, where the error messages received may not be descriptive enough or may be truncated due to size limitations.
By default, pipeline errors are limited to 65 KB. With verbose debug logging enabled (pipelines_extractor_debug_logging), errors can easily grow to a few megabytes in size.
To capture the complete error details, use a debug pipeline.
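Before setting up a debug pipeline, it can be worth confirming that the message you already have was actually cut short. This sketch assumes that truncated messages include the --- TRUNCATED --- marker seen in the SHOW WARNINGS sample on this page; is_truncated is a hypothetical name, and it only checks text you have saved to a file.

```shell
# Hypothetical helper: succeed if the saved error text contains the truncation marker.
# '--' stops option parsing so the pattern's leading dashes are not read as options.
# Usage: is_truncated FILE
is_truncated() {
  grep -qF -- '--- TRUNCATED ---' "$1"
}
```

For example, is_truncated warnings.txt && echo "message was truncated" (warnings.txt is a placeholder file name).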
Note
- Using a debug pipeline slows down pipeline execution, so use it only for error analysis.
- A debug pipeline behaves as if the pipelines_extractor_debug_logging variable is set to ON. It produces all of the debugging logs.
CREATE PIPELINE p DEBUG AS LOAD DATA KAFKA '' INTO TABLE t;
The debugging logs are stored in the information_schema.pipelines_errors table. To write them to a file, run:
SELECT * FROM information_schema.pipelines_errors INTO OUTFILE '/tmp/out.txt';
The /tmp/out.txt file contains the errors with complete debugging logs for further analysis.
Last modified: October 9, 2024