Test your Kafka Cluster using kcat (formerly kafkacat)
On this page
When you use SingleStore to consume or produce Kafka messages, you may want to test your Kafka cluster directly to verify connectivity and to verify topics and messages.
Note
kcat
is available at https://github.
Running kcat
Minimal syntax for running kcat is as follows.
kcat <mode> -b <broker> -t <topic>
When you run kcat, you may need to supply additional parameters, such as SASL settings to connect to your Kafka cluster.-X
option.
<mode>
One of the following:
-
-P: Producer mode.
Used to produce messages to a Kafka topic (not demonstrated here). -
-C: Consumer mode.
Used to consume messages from a Kafka topic. Used here to test connectivity to a Kafka cluster and to verify that data is as expected. -
-L: Metadata listing mode.
Used here to test connectivity to a Kafka cluster. -
-Q: Query mode.
Not demonstrated here.
<broker>
The Kafka broker to connect to.
<topic>
The name of the topic to produce to, consume from, or list metadata from.
Verify connectivity to your Kafka Cluster
Running kcat in any mode, where the command returns successfully, is confirmation that you can connect.
Verify Connectivity by Returning Metadata
In the following example, kcat connects to a Confluent Cloud cluster using -L
mode to return metadata.
kcat -b <Confluent Cloud broker URL> -L \-X security.protocol=SASL_SSL \-X sasl.mechanism=PLAIN \-X ssl.ca.location=<CA certificate file path> \-X sasl.username=<CLUSTER_API_KEY> \-X sasl.password=<CLUSTER_API_SECRET>
Metadata for all topics (from broker ...)
20 brokers:
...
10 topics:
...
Verify Connectivity by Consuming One Message from a Topic
In the following example, kcat connects to a Confluent Cloud cluster using -C
mode to consume the last message from a topic.
kcat -C -b <Confluent Cloud broker URL> -t <topic> -o -1 -e \-X security.protocol=SASL_SSL \-X sasl.mechanism=PLAIN \-X ssl.ca.location=<CA certificate file path> \-X sasl.username=<CLUSTER_API_KEY> \-X sasl.password=<CLUSTER_API_SECRET>
Verify data in your Kafka Cluster
Two situations where you may want to verify data in your Kafka cluster are:
-
Prior to using SingleStore to consume Kafka messages via Pipelines or the Confluent Kafka Connector.
-
After using SingleStore to produce Kafka messages (using
SELECT .
).. . INTO KAFKA . . .
In both situations, you can run kcat in -C
mode for a given topic.
The following kcat command connects to a Confluent Cloud cluster using -C
mode.kcat_
.-e
option indicates that kcat should exit once it has consumed the last message from the topic.
kcat -C -b <Confluent Cloud broker URL> -t <topic> -e \-f 'Partition: %p Offset: %o Message: %s\n' \-X security.protocol=SASL_SSL \-X sasl.mechanism=PLAIN \-X ssl.ca.location=<CA certificate file path> \-X sasl.username=<CLUSTER_API_KEY> \-X sasl.password=<CLUSTER_API_SECRET>> kcat_output.txt
Example: Verify messages in Kafka after producing messages using SELECT . . . INTO KAFKA . . .
Create a table with three columns.
CREATE TABLE numbers(a INT, b INT, c INT);
Insert data into the table.
INSERT INTO numbers(a,b,c) VALUES (10,20,30);INSERT INTO numbers(a,b,c) VALUES (40,50,60);INSERT INTO numbers(a,b,c) VALUES (70,80,90);
Use the SELECT .
command to create a Kafka message for each row of the numbers
table.numbers-topic
in a Confluent Cloud cluster.10
, 20
or 30
.
SELECT a + 10, b + 20, c + 30FROM numbers INTO KAFKA '<Confluent Cloud broker URL>/numbers-topic'CONFIG '{"security.protocol":"SASL_SSL","sasl.mechanism":"PLAIN","ssl.ca.location":"<CA certificate file path>","sasl.username":"<CLUSTER_API_KEY>"}'CREDENTIALS '{"sasl.password":"<CLUSTER_API_SECRET>"}'FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY "\t"LINES TERMINATED BY '}' STARTING BY '{';
Run the kcat command shown in the section "Verify data in your Kafka cluster", above.kcat_
output file and search for {20,40,60}
to verify that the message is in the file.{20,40,60}
.
Partition: 1 Offset: 0 Message: {20,40,60}Partition: 1 Offset: 1 Message: {50,70,90}Partition: 1 Offset: 2 Message: {80,100,120}...
Last modified: November 22, 2022