Working with the Kafka Connector
To understand Kafka’s core concepts and how it works, please read the Kafka documentation.
The Confluent Kafka Connector is available via the Confluent Hub and as a download from SingleStore.
Note: After you have installed the version you want to use, you will need to configure the connector properties.
The rest of this page describes how the connector works.
Note: You can also use a pipeline to Load Data from Kafka.
Connector Behavior
See the SingleStore Kafka Connector for information about the connector.
Auto-creation of tables
While loading data, if the table does not exist in SingleStore, it will be created using the information from the first record.
The table name is the name of the topic. The table schema is taken from the record's valueSchema. If valueSchema is not a struct, then a single column with name data will be created with the schema of the record. Table keys are taken from the tableKey property.
If the table already exists, all records will be loaded directly into it.
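The schema rule above can be sketched in a few lines of Python. This is a minimal illustration, not the connector's code: the dict-based schema shape and the function name `columns_for_value_schema` are hypothetical, and it assumes struct fields map one-to-one to columns, as the CREATE TABLE example later on this page suggests.

```python
def columns_for_value_schema(schema):
    """Return (column_name, kafka_type) pairs for a simplified value schema.

    `schema` is a hypothetical dict: {"type": "STRUCT", "fields": {...}} for
    structs, or {"type": "<KAFKA TYPE>"} for any other value.
    """
    if schema["type"] == "STRUCT":
        # Assumption: one column per struct field, in field order.
        return [(name, field["type"]) for name, field in schema["fields"].items()]
    # Non-struct values land in a single column named `data`.
    return [("data", schema["type"])]


struct_schema = {
    "type": "STRUCT",
    "fields": {"id": {"type": "INT32"}, "name": {"type": "STRING"}},
}
print(columns_for_value_schema(struct_schema))
print(columns_for_value_schema({"type": "STRING"}))
```

The second call shows the non-struct case, where the whole record value goes into the `data` column.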
Exactly once delivery
To achieve exactly once delivery, set singlestore. to true. The kafka_ table will then be created.
This table contains an identifier, count of records, and time of each transaction. The identifier consists of kafka-topic, kafka-partition, and kafka-offset. Kafka advances its offsets only if the kafka-connect job succeeds.
Data is written to the table and to the kafka_ table in one transaction.
To overwrite the name of this table, use the singlestore. property.
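The idea of writing data and a metadata record in one transaction can be illustrated with a small, self-contained Python sketch. It uses an in-memory SQLite database as a stand-in for SingleStore, and the table names `demo_data` and `demo_metadata`, the identifier format, and the function `write_batch` are all hypothetical. It is not the connector's implementation, only one way the technique could look.

```python
import sqlite3


def write_batch(conn, topic, partition, offset, rows):
    """Insert `rows` plus a metadata record atomically; skip batches already seen."""
    # Hypothetical identifier built from topic, partition, and offset.
    batch_id = f"{topic}-{partition}-{offset}"
    with conn:  # both inserts commit together, or roll back together on error
        seen = conn.execute(
            "SELECT 1 FROM demo_metadata WHERE id = ?", (batch_id,)
        ).fetchone()
        if seen:
            return False  # redelivered batch: nothing is written
        conn.executemany(
            "INSERT INTO demo_data (payload) VALUES (?)", [(r,) for r in rows]
        )
        conn.execute(
            "INSERT INTO demo_metadata (id, record_count, created_at) "
            "VALUES (?, ?, datetime('now'))",
            (batch_id, len(rows)),
        )
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE demo_data (payload TEXT)")
conn.execute(
    "CREATE TABLE demo_metadata "
    "(id TEXT PRIMARY KEY, record_count INTEGER, created_at TEXT)"
)

first = write_batch(conn, "foo", 0, 42, ["a", "b"])
retry = write_batch(conn, "foo", 0, 42, ["a", "b"])  # same offset delivered again
row_count = conn.execute("SELECT COUNT(*) FROM demo_data").fetchone()[0]
print(first, retry, row_count)  # prints: True False 2
```

Because the retried batch carries the same identifier, the second call writes nothing and the data table still holds two rows.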
Data Types
The connector converts Kafka data types to SingleStore data types:
Kafka Type | SingleStore Type |
---|---|
STRUCT | JSON |
MAP | JSON |
ARRAY | JSON |
INT8 | TINYINT |
INT16 | SMALLINT |
INT32 | INT |
INT64 | BIGINT |
FLOAT32 | FLOAT |
FLOAT64 | DOUBLE |
BOOLEAN | TINYINT |
BYTES | TEXT |
STRING | VARBINARY(1024) |
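For reference, the mapping in the table can be written as a plain lookup. The sketch below copies the table as given on this page; the names `KAFKA_TO_SINGLESTORE` and `singlestore_type` are hypothetical helpers for illustration and are not part of the connector.

```python
# Mapping copied from the table on this page.
KAFKA_TO_SINGLESTORE = {
    "STRUCT": "JSON",
    "MAP": "JSON",
    "ARRAY": "JSON",
    "INT8": "TINYINT",
    "INT16": "SMALLINT",
    "INT32": "INT",
    "INT64": "BIGINT",
    "FLOAT32": "FLOAT",
    "FLOAT64": "DOUBLE",
    "BOOLEAN": "TINYINT",
    "BYTES": "TEXT",
    "STRING": "VARBINARY(1024)",
}


def singlestore_type(kafka_type):
    """Look up the SingleStore type for a Kafka type name (case-insensitive)."""
    try:
        return KAFKA_TO_SINGLESTORE[kafka_type.upper()]
    except KeyError:
        raise ValueError(f"unsupported Kafka type: {kafka_type}") from None


print(singlestore_type("INT32"))
print(singlestore_type("struct"))
```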
Table Keys
To add a column as a key in SingleStore, use the tableKey property.
Suppose you have an entity:
{
"id" : 123,
"name" : "Alice"
}
If you want to add the id column as a PRIMARY KEY to your SingleStore table, add "tableKey. to your properties configuration.
Doing so will generate the following query during table creation:
CREATE TABLE IF NOT EXISTS `table` (`id` INT NOT NULL,`name` TEXT NOT NULL,PRIMARY KEY (`id`))
You can also specify the name of a key by providing it like this: "tableKey. .
This will create a key with a name:
CREATE TABLE IF NOT EXISTS `table` (`id` INT NOT NULL,`name` TEXT NOT NULL,PRIMARY KEY `someName`(`id`))
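To make the shape of the two generated statements concrete, here is a small Python sketch that assembles them from a column list and a key definition. The function `create_table_sql` and its parameters are hypothetical, and it does no identifier escaping; it only reproduces the two queries shown on this page and is not how the connector builds its SQL.

```python
def create_table_sql(table, columns, key_column, key_name=None):
    """Build a CREATE TABLE statement with a primary key.

    `columns` is a list of (name, sql_type) pairs; every column is NOT NULL here.
    """
    column_defs = [f"`{name}` {sql_type} NOT NULL" for name, sql_type in columns]
    # A named key puts the name between PRIMARY KEY and the column list.
    key_prefix = f"PRIMARY KEY `{key_name}`" if key_name else "PRIMARY KEY "
    key_def = f"{key_prefix}(`{key_column}`)"
    parts = column_defs + [key_def]
    return f"CREATE TABLE IF NOT EXISTS `{table}` ({','.join(parts)})"


columns = [("id", "INT"), ("name", "TEXT")]
unnamed = create_table_sql("table", columns, "id")
named = create_table_sql("table", columns, "id", key_name="someName")
print(unnamed)
print(named)
```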
Table Names
By default, the Kafka Connector maps data from topics into SingleStore tables by matching the topic name to the table name. For example, if the Kafka topic is called kafka-example-topic, then the connector will load it into the SingleStore table called kafka-example-topic.
To specify a custom table name, use a property of the form singlestore.tableName.<topic-name>:
{
...
"singlestore.tableName.foo" : "bar",
...
}
In this example, data from the Kafka topic foo will be written to the SingleStore table called bar.
You can use this method to specify custom table names for multiple topics:
{
...
"singlestore.tableName.kafka-example-topic-1" : "singlestore-table-name-1",
"singlestore.tableName.kafka-example-topic-2" : "singlestore-table-name-2",
...
}
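The lookup described in this section amounts to checking for a per-topic override and falling back to the topic name. A minimal Python sketch, assuming the configuration is available as a plain dict; the function name `table_for_topic` is hypothetical and this is not the connector's own code:

```python
TABLE_NAME_PREFIX = "singlestore.tableName."


def table_for_topic(config, topic):
    """Return the configured table name for `topic`, or the topic name itself."""
    return config.get(TABLE_NAME_PREFIX + topic, topic)


config = {
    "singlestore.tableName.foo": "bar",
    "singlestore.tableName.kafka-example-topic-1": "singlestore-table-name-1",
}
print(table_for_topic(config, "foo"))                  # overridden
print(table_for_topic(config, "kafka-example-topic"))  # default: topic name
```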
Last modified: August 16, 2023