Load Data from the Confluent Kafka Connector
The SingleStore Confluent Kafka Connector is a Kafka Connect connector that allows you to easily ingest AVRO, JSON, and CSV messages from Kafka topics into SingleStoreDB.
Learn more about the SingleStoreDB Confluent Kafka Connector, including how to install and configure it, in Working with the Kafka Connector.
Working with the Kafka Connector
To understand Kafka’s core concepts and how it works, please read the Kafka documentation.
The Confluent Kafka Connector is available via the Confluent Hub and as a download from SingleStoreDB.
Note: After you have installed the version you want to use, you will need to configure the connector properties.
The rest of this page describes how the connector works.
Note: You can also load data from Kafka using a pipeline. For details, see Load Data from Kafka Using a Pipeline.
Connector Behavior
See the SingleStore Kafka Connector for information about the connector.
Auto-creation of tables
While loading data, if the table does not exist in SingleStoreDB, it will be created using the information from the first record.
The table name is the name of the topic. The table schema is taken from the record's valueSchema. If valueSchema is not a struct, then a single column named data will be created with the schema of the record. Table keys are taken from the tableKey property.
If the table already exists, all records will be loaded directly into it.
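For example, here is a sketch of this behavior, assuming a topic named users whose first record carries the struct value below (the topic name and fields are illustrative; the column types follow the Data Types mapping later on this page):
{"id" : 123, "name" : "Alice"}
The connector would then create the table with a statement along the lines of:
CREATE TABLE IF NOT EXISTS `users` (`id` INT NOT NULL, `name` TEXT NOT NULL)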
Exactly once delivery
To achieve exactly once delivery, set singlestore.metadata.allow to true. The kafka_connect_transaction_metadata table will then be created.
This table contains an identifier, count of records, and time of each transaction. The identifier consists of the kafka-topic, kafka-partition, and kafka-offset of the records, which together uniquely identify a batch and prevent duplication of data in SingleStoreDB.
Kafka saves offsets and commits them only if the kafka-connect job succeeds; if the job fails, it is restarted from the same offset, so no records are lost or duplicated.
Data is written to the destination table and to the kafka_connect_transaction_metadata table in one transaction, so if an error occurs, no data is added to the database.
To overwrite the name of this table, use the singlestore.metadata.table property.
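A sketch of the relevant settings, assuming you want exactly once delivery with the metadata table under a custom name (the name below is illustrative):
{..."singlestore.metadata.allow" : "true","singlestore.metadata.table" : "custom_kafka_transaction_metadata",...}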
Data Types
The connector converts Kafka data types to SingleStoreDB data types:
Kafka Type | SingleStoreDB Type
---|---
STRUCT | JSON
MAP | JSON
ARRAY | JSON
INT8 | TINYINT
INT16 | SMALLINT
INT32 | INT
INT64 | BIGINT
FLOAT32 | FLOAT
FLOAT64 | DOUBLE
BOOLEAN | TINYINT
BYTES | VARBINARY(1024)
STRING | TEXT
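As an illustration of the complex-type rows above: MAP and ARRAY fields land in JSON columns, so a record value like the following sketch (the field names are illustrative) would be stored with tags in a JSON column holding the text ["a","b"]:
{"id" : 123, "tags" : ["a", "b"]}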
Table Keys
To add a column as a key in SingleStoreDB, use the tableKey property.
Suppose you have an entity:
{"id" : 123,"name" : "Alice"}
If you want to add the id column as a PRIMARY KEY to your SingleStoreDB table, add "tableKey.primary" : "id" to your properties configuration.
Doing so will generate the following query during table creation:
CREATE TABLE IF NOT EXISTS `table` (`id` INT NOT NULL, `name` TEXT NOT NULL, PRIMARY KEY (`id`))
You can also specify the name of a key by providing it like this: "tableKey.primary.someName" : "id".
This will create a key with the name someName:
CREATE TABLE IF NOT EXISTS `table` (`id` INT NOT NULL, `name` TEXT NOT NULL, PRIMARY KEY `someName`(`id`))
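Based on the tableKey.<index_type>[.name] form described in the properties reference below, the other supported index types (COLUMNSTORE, UNIQUE, SHARD, KEY) should work the same way; for example, to add a shard key on the id column, you would add:
{..."tableKey.shard" : "id",...}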
Table Names
By default, the Kafka Connector maps data from topics into SingleStoreDB tables by matching the topic name to the table name. For example, if the Kafka topic is called kafka-example-topic, the connector will load it into a SingleStoreDB table called kafka-example-topic.
To specify a custom table name, use the singlestore.tableName.<topicName> property.
{..."singlestore.tableName.foo" : "bar",...}
In this example, data from the Kafka topic foo will be written to the SingleStoreDB table called bar.
You can use this method to specify custom table names for multiple topics:
{..."singlestore.tableName.kafka-example-topic-1" : "singlestore-table-name-1","singlestore.tableName.kafka-example-topic-2" : "singlestore-table-name-2",...}
Installing the SingleStore Kafka Connector via Confluent Hub
This guide shows you how to install and configure the SingleStore Kafka Connector in Confluent Hub, via the following process:
- Make sure you satisfy the prerequisites.
- Install the connector.
- Configure the connector.
Prerequisites
Make sure you have met the following prerequisites before installing the connector.
- MemSQL version 6.8 or newer, or SingleStore version 7.1 or newer, installed and running.
Install the Connector and Add a Connection in Confluent
Install the SingleStore Kafka Connector via the Confluent Hub.
Run the Confluent Hub CLI installation command as described on the Confluent Hub:
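The exact coordinates and version come from the connector's listing on the Confluent Hub; the command should look roughly like the following sketch (the component name here is an assumption, so copy the command from the Hub page itself):
confluent-hub install singlestore/singlestore-kafka-connector:latest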

Accept all of the default configuration options while installing.
Now that you have the connector installed, you can create a connection.
- Browse to the Confluent Control Center.
- Click Connect in the left side menu.
- Click Add Connector.
- Select SingleStore Sink Connector.
- Select the topics from which you want to get data.
- Configure the connector properties.
For an explanation of the various configuration properties, see SingleStore Kafka Connector Properties.
Installing the SingleStore Kafka Connector via Download
This guide shows you how to get and install the Java-based SingleStore Kafka Connector for connecting with open source Apache Kafka.
- Make sure you satisfy the prerequisites.
- Download the connector JAR file.
- Configure the connector properties.
Prerequisites
Make sure you have met the following prerequisites before installing the connector.
- MemSQL version 6.8 or newer, or SingleStore version 7.1 or newer, installed and running
- Java Development Kit (JDK) installed
- Apache Kafka installed and running
- Kafka Schema Registry configured
- Kafka Connect
- For SingleStore Kafka Connector versions prior to 1.1.1, install the MariaDB JDBC driver. For SingleStore Kafka Connector versions 1.1.1 and newer, install and configure the latest version of the SingleStore JDBC driver.
Download the SingleStore Kafka Connector
Get the SingleStore Kafka Connector JAR file here.
You will need to have the JDK installed and configured, with the JAVA_HOME environment variable set appropriately.
See the README and the Quickstart files on the GitHub page for more information.
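Once downloaded, the JAR must be placed where Kafka Connect can discover it, which is controlled by the plugin.path setting in your Connect worker configuration. A sketch, assuming an illustrative directory:
plugin.path=/opt/kafka/plugins
Copy the connector JAR into that directory (or whatever location your worker configuration actually uses) and restart the worker.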
Configure the SingleStoreDB Connector Properties
The connector is configurable via a property file or Kafka-REST; the property file is passed as a parameter when starting the kafka-connect job.
The connector properties include the standard Kafka properties as well as some SingleStoreDB-specific properties.
For an explanation of the various SingleStoreDB-specific configuration properties, see SingleStore Kafka Connector Properties.
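As a sketch, a minimal property file for a standalone kafka-connect job might look like the following (the endpoint, database, and credentials are placeholders, and the connector class name is an assumption based on the connector's documentation):
name=singlestore-sink
connector.class=com.singlestore.kafka.SingleStoreSinkConnector
tasks.max=1
topics=kafka-example-topic
connection.ddlEndpoint=localhost:3306
connection.database=example_db
connection.user=root
connection.password=
You would then pass this file when starting the job, for example: connect-standalone.sh worker.properties singlestore-sink.properties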
SingleStore Kafka Connector Properties
Configuration for the connector is controlled via the SingleStore Kafka Connector Sink configuration properties.
Confluent users will configure these properties via the Confluent UI.
The properties listed below show the SingleStoreDB-specific properties.
SingleStore Kafka Connector Sink Configuration Properties
Property | Description | Default
---|---|---
connection.ddlEndpoint | Hostname or IP address of the SingleStoreDB Master Aggregator in the format host[:port] (port is optional). |
connection.dmlEndpoints | Hostname or IP address of SingleStoreDB Aggregator nodes to run queries against in the format host[:port],host[:port],… (port is optional, multiple hosts separated by comma). | ddlEndpoint
connection.database | If set, all connections will default to using this database. | empty
connection.user | SingleStoreDB username. | root
connection.password | SingleStoreDB password. | no password
params.<name> | Specify a specific MySQL or JDBC parameter which will be injected into the connection URI. | empty
max.retries | The maximum number of times to retry on errors before failing the task. | 10
retry.backoff.ms | The time in milliseconds to wait following an error before a retry attempt is made. | 3000
tableKey.<index_type>[.name] | Specify additional keys to add to tables created by the connector; the value of this property is a comma-separated list of the column names to apply the key to; <index_type> is one of PRIMARY, COLUMNSTORE, UNIQUE, SHARD, or KEY. |
singlestore.loadDataCompression | Compress data on load; one of (GZip, LZ4, Skip). | GZip
singlestore.metadata.allow | Allows or denies the use of an additional meta-table to save the recording results. | true
singlestore.metadata.table | Specify the name of the table to save Kafka transaction metadata. | kafka_connect_transaction_metadata
singlestore.tableName.<topicName> | Specify an explicit table name to use for the specified topic. |
Example Configuration
You can see an example configuration here.
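As a rough guide, a complete sink configuration submitted via Kafka-REST has the following shape (the endpoint, database, credentials, and topic are placeholders, and the connector class name is an assumption based on the connector's documentation):
{
"name" : "singlestore-sink-connector",
"config" : {
"connector.class" : "com.singlestore.kafka.SingleStoreSinkConnector",
"tasks.max" : "1",
"topics" : "kafka-example-topic",
"connection.ddlEndpoint" : "localhost:3306",
"connection.database" : "example_db",
"connection.user" : "root",
"connection.password" : "",
"tableKey.primary" : "id",
"singlestore.metadata.allow" : "true"
}
}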
Last modified: February 3, 2023