
Load Data from Kafka Using a Pipeline

To create and interact with a Kafka pipeline quickly, follow the instructions in this section. There are two parts to this Quickstart:

  1. Part 1: Sending Messages to Kafka

  2. Part 2: Creating a Kafka Pipeline in SingleStore

Prerequisites

To complete this Quickstart, your environment must meet the following prerequisites:

  • A working Kafka cluster that SingleStore can reach over the network.

  • A SingleStoreDB installation or a SingleStoreDB cluster. You will connect to it and create a pipeline that pulls data from your Kafka topic.

Part 1: Sending Messages to Kafka

In Kafka, create a topic named test and enter the following messages:

the quick
brown fox
jumped over
the lazy dog

In Part 2, you will create a pipeline in SingleStore to ingest these messages.

Part 2: Creating a Kafka Pipeline in SingleStore

Now that your Kafka topic contains some messages, you can create a new pipeline and ingest the messages.

At the SingleStore prompt, execute the following statements:

CREATE DATABASE quickstart_kafka;
USE quickstart_kafka;
CREATE TABLE messages (id text);

These statements create a new database and a table that the Kafka pipeline will load data into. Before you can create the pipeline, you need the IP address (or hostname) of a broker in your Kafka cluster.

To create the pipeline, execute the following statement, replacing <kafka-cluster-ip> with your Kafka cluster's IP address:

CREATE PIPELINE quickstart_kafka AS LOAD DATA KAFKA '<kafka-cluster-ip>/test' INTO TABLE messages;
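The broker address can also carry an explicit port. The sketch below assumes your broker listens on 9092, Kafka's conventional default port; adjust the port if your cluster uses a different one. Use it in place of the statement above, not in addition to it, because a pipeline named quickstart_kafka can only be created once.

```sql
-- Same pipeline as above, with the broker port written out explicitly.
-- Assumption: the Kafka broker listens on port 9092.
CREATE PIPELINE quickstart_kafka AS
  LOAD DATA KAFKA '<kafka-cluster-ip>:9092/test'
  INTO TABLE messages;
```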

If you are connecting with SSL, SASL, or Kerberos, you will need to specify additional clauses in your CREATE PIPELINE statement. See Securely Connect to Kafka for more information.
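As a rough illustration only, a SASL-over-SSL connection adds CONFIG and CREDENTIALS clauses that carry Kafka client properties. The pipeline name quickstart_kafka_secure, the port 9093, the certificate path, and the username and password below are all hypothetical placeholders; treat Securely Connect to Kafka as the authoritative reference for the supported properties.

```sql
-- Hypothetical SASL_SSL example using the PLAIN mechanism.
-- Port, CA path, username, and password are placeholders for your own values.
CREATE PIPELINE quickstart_kafka_secure AS
  LOAD DATA KAFKA '<kafka-cluster-ip>:9093/test'
  CONFIG '{"security.protocol": "SASL_SSL",
           "sasl.mechanism": "PLAIN",
           "sasl.username": "<username>",
           "ssl.ca.location": "/path/to/ca-cert.pem"}'
  -- Secrets go in CREDENTIALS rather than CONFIG.
  CREDENTIALS '{"sasl.password": "<password>"}'
  INTO TABLE messages;
```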

This CREATE PIPELINE statement creates a new Kafka pipeline named quickstart_kafka, which reads messages from the test Kafka topic and writes them into the messages table. For more information, see the CREATE PIPELINE topic. If the statement succeeded, you can test your pipeline. Although you can start a pipeline as soon as you create it, it is best to test it first on a small set of data:

TEST PIPELINE quickstart_kafka LIMIT 1;
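If TEST PIPELINE reports a problem (for example, an unreachable broker or a missing topic), the pipeline error log is a good place to look. This sketch assumes the information_schema.PIPELINES_ERRORS view exposes a PIPELINE_NAME column; drop the WHERE clause if your version names it differently.

```sql
-- Show errors recorded for this pipeline only.
-- Assumption: the view has a PIPELINE_NAME column.
SELECT *
FROM information_schema.PIPELINES_ERRORS
WHERE PIPELINE_NAME = 'quickstart_kafka';
```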

If the test succeeds without errors, you are ready to ingest data. The following command runs a single batch in the foreground and commits the data to the messages table:

START PIPELINE quickstart_kafka FOREGROUND LIMIT 1 BATCHES;

To verify that the data is in the messages table as expected, execute the following statement. If ingestion succeeded, you should see a non-empty result set:

SELECT * FROM messages;
+--------------+
| id           |
+--------------+
| the quick    |
| brown fox    |
| jumped over  |
| the lazy dog |
+--------------+

Now you are ready to start your pipeline as a background process. SingleStore will automatically ingest new messages as they are put into Kafka.

START PIPELINE quickstart_kafka;
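To confirm that the background pipeline is actually running, you can list the pipelines in the current database. The State reported for quickstart_kafka should be Running; an Error state usually means the pipeline hit a problem it could not recover from.

```sql
-- List pipelines in quickstart_kafka and their current state.
SHOW PIPELINES;
```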

Now that the pipeline is up and running, send a few more messages to the Kafka topic. Enter the following lines into your topic:

Lorem ipsum
dolor sit amet

In the SingleStore terminal window, run SELECT * FROM messages; again. The output should now include the new messages. Because the query has no ORDER BY clause, the rows may appear in a different order than shown here:

SELECT * FROM messages;
+----------------+
| id             |
+----------------+
| Lorem ipsum    |
| dolor sit amet |
| the quick      |
| brown fox      |
| jumped over    |
| the lazy dog   |
+----------------+

Now that your pipeline is running, you can check its status and history at any time by querying the PIPELINES_BATCHES_SUMMARY view:

SELECT * FROM information_schema.PIPELINES_BATCHES_SUMMARY;

This system view returns one row for each recent batch the pipeline has run, along with at-a-glance performance and volume metrics, which makes it useful for monitoring and understanding a production pipeline.
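When several pipelines share a cluster, it helps to narrow the view to one pipeline and its latest batches. This is a sketch that assumes the view exposes PIPELINE_NAME and BATCH_ID columns; check the column list for your SingleStore version before relying on it.

```sql
-- Five most recent batches for quickstart_kafka only.
-- Assumption: PIPELINE_NAME and BATCH_ID exist in this view.
SELECT *
FROM information_schema.PIPELINES_BATCHES_SUMMARY
WHERE PIPELINE_NAME = 'quickstart_kafka'
ORDER BY BATCH_ID DESC
LIMIT 5;
```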

Note

Foreground pipelines and background pipelines have different intended uses and behave differently. For more information, see the CREATE PIPELINE topic.
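A background pipeline keeps consuming the topic until you stop it. The statements below stop the Quickstart pipeline and, optionally, remove everything this Quickstart created; skip the DROP statements if you want to resume the pipeline later with START PIPELINE.

```sql
-- Stop the background pipeline; it no longer pulls new messages from Kafka.
STOP PIPELINE quickstart_kafka;

-- Optional cleanup: remove the pipeline, then the database and its table.
DROP PIPELINE quickstart_kafka;
DROP DATABASE quickstart_kafka;
```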

Note

When a backup is restored, all pipelines in that database revert to the state (offsets, etc.) they were in when the backup was taken. If the data at those offsets is no longer in Kafka, the pipelines start from the earliest offsets still available in Kafka.
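If you would rather choose where a restored pipeline resumes than rely on this behavior, ALTER PIPELINE can reset its offsets. This is a hedged sketch: it stops the pipeline first as a precaution, and resetting to LATEST means any messages published before this point that were not yet ingested are skipped. Use EARLIEST instead to re-read everything Kafka still retains, at the cost of possibly loading duplicates.

```sql
-- Hypothetical recovery step after a restore.
STOP PIPELINE quickstart_kafka;

-- Resume from the newest offsets; older, un-ingested messages are skipped.
ALTER PIPELINE quickstart_kafka SET OFFSETS LATEST;

START PIPELINE quickstart_kafka;
```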

Quickstart Summary

In this Kafka Quickstart, you sent messages to a Kafka topic, created a table in SingleStore, and created a Kafka pipeline in SingleStore to ingest the messages. This Quickstart demonstrated only the most basic functionality of a Kafka pipeline, but the same concepts apply to real-world scenarios.