Test your Kafka Cluster using kcat (formerly kafkacat)

When you use SingleStore to consume or produce Kafka messages, you may want to test your Kafka cluster directly to verify connectivity and to verify topics and messages. kcat is a command line utility you can use to perform these tasks.

Note

kcat is available at https://github.com/edenhill/kcat. It is a third-party utility and SingleStore does not provide support for it.

Running kcat

The minimal syntax for running kcat is as follows:

kcat <mode> -b <broker> -t <topic>

When you run kcat, you may need to supply additional parameters, such as SASL settings, to connect to your Kafka cluster. Specify each of these parameters with the -X option. The code samples below show examples of these parameters for connecting to Confluent Cloud.

<mode>

One of the following:

  • -P: Producer mode. Used to produce messages to a Kafka topic (not demonstrated here).

  • -C: Consumer mode. Used to consume messages from a Kafka topic. Used here to test connectivity to a Kafka cluster and to verify that data is as expected.

  • -L: Metadata listing mode. Used here to test connectivity to a Kafka cluster.

  • -Q: Query mode. Not demonstrated here.

<broker>

The Kafka broker to connect to.

<topic>

The name of the topic to produce to, consume from, or list metadata from.
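Producer mode is not otherwise demonstrated in this article. As a minimal sketch, the function below sends a single test message to a topic. The function name produce_test_message is hypothetical, and the sketch assumes a broker that needs no additional -X settings; for a secured cluster, add the same -X options used in the later samples.

```shell
# Hypothetical helper (not part of kcat): send one test message to a topic.
# Usage: produce_test_message <broker> <topic> <message>
# Assumes the broker needs no extra -X settings; add them for secured clusters.
produce_test_message() {
  # In -P mode, kcat reads messages from standard input, one message per line.
  printf '%s\n' "$3" | kcat -P -b "$1" -t "$2"
}
```

For example, produce_test_message <broker> <topic> 'hello' writes the message hello to the topic.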

Verify connectivity to your Kafka Cluster

If kcat runs in any mode and the command returns successfully, you have confirmed that you can connect. The following two examples show how you can verify connectivity.

Verify Connectivity by Returning Metadata

In the following example, kcat connects to a Confluent Cloud cluster using -L mode to return metadata.

kcat -b <Confluent Cloud broker URL> -L \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X ssl.ca.location=<CA certificate file path> \
-X sasl.username=<CLUSTER_API_KEY> \
-X sasl.password=<CLUSTER_API_SECRET>

The output is similar to the following:

Metadata for all topics (from broker ...)

20 brokers:
...

10 topics:
...
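The same -X options appear in every Confluent Cloud sample in this article. One way to avoid retyping them is a small shell wrapper, sketched below. The function name kcat_cc and the environment variables BROKER, CA_FILE, API_KEY, and API_SECRET are assumptions made for this illustration; they are not defined by kcat or by SingleStore.

```shell
# Hypothetical wrapper (not part of kcat): adds the connection options used in
# this article. BROKER, CA_FILE, API_KEY, and API_SECRET are assumed
# environment variables that you set beforehand.
kcat_cc() {
  kcat -b "$BROKER" \
    -X security.protocol=SASL_SSL \
    -X sasl.mechanism=PLAIN \
    -X ssl.ca.location="$CA_FILE" \
    -X sasl.username="$API_KEY" \
    -X sasl.password="$API_SECRET" \
    "$@"   # remaining arguments (mode, topic, and so on) pass through unchanged
}
```

With the variables set, kcat_cc -L would run the metadata listing shown above.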

Verify Connectivity by Consuming One Message from a Topic

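In the following example, kcat connects to a Confluent Cloud cluster using -C mode to consume the last message from a topic. The -o -1 option sets the starting offset to one message before the end, and -e tells kcat to exit once it reaches the end. The offset applies to each partition, so a topic with several partitions may return one message per partition.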

kcat -C -b <Confluent Cloud broker URL> -t <topic> -o -1 -e \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X ssl.ca.location=<CA certificate file path> \
-X sasl.username=<CLUSTER_API_KEY> \
-X sasl.password=<CLUSTER_API_SECRET>
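Because a successful return confirms connectivity, the check can be scripted around the exit status. The following is a minimal sketch, not a definitive implementation: the function name check_kafka is hypothetical, and the sketch assumes that kcat exits with a nonzero status when it cannot retrieve metadata from the broker.

```shell
# Hypothetical helper (not part of kcat): report whether a broker is reachable.
# Usage: check_kafka <broker> [additional kcat options, such as -X settings]
# Assumes kcat returns a nonzero exit status when the metadata request fails.
check_kafka() {
  broker=$1
  shift
  # -L requests cluster metadata; the output is discarded and only the
  # exit status is used.
  if kcat -L -b "$broker" "$@" >/dev/null 2>&1; then
    echo "OK: connected to $broker"
  else
    echo "FAILED: could not connect to $broker" >&2
    return 1
  fi
}
```

Any -X options that follow the broker argument are passed to kcat unchanged.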

Verify data in your Kafka Cluster

Two situations where you may want to verify data in your Kafka cluster are:

  • Prior to using SingleStore to consume Kafka messages via Pipelines or the Confluent Kafka Connector.

  • After using SingleStore to produce Kafka messages (using SELECT ... INTO KAFKA ...).

In both situations, you can run kcat in -C mode for a given topic.

The following kcat command connects to a Confluent Cloud cluster using -C mode. The command outputs all of the messages for the specified topic and uses the -f format string to show the partition and offset of each message. The output is redirected to the file kcat_output.txt. The -e option indicates that kcat should exit once it has consumed the last message from the topic.

kcat -C -b <Confluent Cloud broker URL> -t <topic> -e \
-f 'Partition: %p  Offset: %o  Message: %s\n' \
-X security.protocol=SASL_SSL \
-X sasl.mechanism=PLAIN \
-X ssl.ca.location=<CA certificate file path> \
-X sasl.username=<CLUSTER_API_KEY> \
-X sasl.password=<CLUSTER_API_SECRET> \
> kcat_output.txt
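Once the output file exists, a quick summary can help confirm that the topic holds the expected number of messages. The sketch below counts messages per partition. The function name count_by_partition is hypothetical, and the sketch assumes the file was written with the format string used in the command above and that no message contains a newline.

```shell
# Hypothetical helper: count messages per partition in a kcat output file
# written with the format 'Partition: %p  Offset: %o  Message: %s\n'.
# Assumes no message contains a newline, so each line is one message.
# Usage: count_by_partition <file>
count_by_partition() {
  # Field 2 of each line is the partition number.
  awk '$1 == "Partition:" { count[$2]++ }
       END { for (p in count) print "Partition " p ": " count[p] " message(s)" }' "$1" | sort
}
```

For example, count_by_partition kcat_output.txt prints one line per partition found in the file.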

Example: Verify messages in Kafka after producing messages using SELECT ... INTO KAFKA ...

Create a table with three columns.

CREATE TABLE numbers(a INT, b INT, c INT);

Insert data into the table.

INSERT INTO numbers(a,b,c) VALUES (10,20,30);
INSERT INTO numbers(a,b,c) VALUES (40,50,60);
INSERT INTO numbers(a,b,c) VALUES (70,80,90);

Use the SELECT ... INTO KAFKA ... command to create a Kafka message for each row of the numbers table. All of the messages are written to the topic numbers-topic in a Confluent Cloud cluster. The field values in each message correspond to the values of columns a, b, and c in the table, incremented by 10, 20, and 30, respectively.

SELECT a + 10, b + 20, c + 30
FROM numbers INTO KAFKA '<Confluent Cloud broker URL>/numbers-topic'
CONFIG '{
        "security.protocol":"SASL_SSL",
        "sasl.mechanism":"PLAIN",
        "ssl.ca.location":"<CA certificate file path>",
        "sasl.username":"<CLUSTER_API_KEY>"
        }'
CREDENTIALS '{"sasl.password":"<CLUSTER_API_SECRET>"}'
FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY "\t"
LINES TERMINATED BY '}' STARTING BY '{';

Run the kcat command shown in the section "Verify data in your Kafka Cluster", above. Open the kcat_output.txt output file and search for {20,40,60} to verify that the message is in the file. The other two messages, {50,70,90} and {80,100,120}, should appear as well. The messages in the output look similar to the following; the partitions and offsets may differ.

Partition: 1  Offset: 0  Message: {20,40,60}
Partition: 1  Offset: 1  Message: {50,70,90}
Partition: 1  Offset: 2  Message: {80,100,120}
...
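Searching the file by hand works for three messages; for a longer list, the search can be scripted. The following sketch checks that each expected message appears in the output file. The function name verify_messages is hypothetical, and the sketch assumes the file uses the "Message: " prefix from the format string shown earlier.

```shell
# Hypothetical helper: confirm that each expected message appears in the file.
# Usage: verify_messages <file> <message>...
# Returns 0 if every message was found, 1 otherwise.
verify_messages() {
  file=$1
  shift
  missing=0
  for msg in "$@"; do
    # -F matches the text literally (as a substring); -q suppresses output.
    if grep -qF "Message: $msg" "$file"; then
      echo "found:   $msg"
    else
      echo "missing: $msg"
      missing=1
    fi
  done
  return "$missing"
}
```

For the example above, the call would be verify_messages kcat_output.txt '{20,40,60}' '{50,70,90}' '{80,100,120}'.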