Examples

Example 1: Configure the singlestore-spark-connector Globally

The following example shows how to configure singlestore-spark-connector for SingleStore Helios.

Using Java

SparkConf conf = new SparkConf();
conf.set("spark.datasource.singlestore.clientEndpoint", "singlestore-host");
conf.set("spark.datasource.singlestore.user", "admin");
conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word");

Using Python

spark.conf.set("spark.datasource.singlestore.clientEndpoint", "singlestore-host")
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")
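Hardcoding the password as shown above is convenient for a demo but easy to leak. As a minimal sketch, the same three settings can be built from a plain dictionary, with the password read from an environment variable. The helper name singlestore_conf and the variable name SINGLESTORE_PASSWORD are hypothetical and not part of the connector; only the spark.datasource.singlestore. prefix and the option names come from the example above.

```python
import os

# Prefix used by the global connector options shown above.
PREFIX = "spark.datasource.singlestore."


def singlestore_conf(options):
    """Return a dict that maps prefixed configuration keys to string values."""
    return {PREFIX + name: str(value) for name, value in options.items()}


settings = singlestore_conf({
    "clientEndpoint": "singlestore-host",
    "user": "admin",
    # SINGLESTORE_PASSWORD is an assumed variable name; empty string if unset.
    "password": os.environ.get("SINGLESTORE_PASSWORD", ""),
})

# With a live SparkSession named `spark`, the settings could be applied with:
# for key, value in settings.items():
#     spark.conf.set(key, value)
```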

Example 2: Read a SingleStore Table into a DataFrame

The following example reads a SingleStore table named foo into a DataFrame.

Using Scala

val df = spark.read
  .format("singlestore")
  .option("clientEndpoint", "singlestore-host")
  .option("user", "admin")
  .load("foo")

Using Java

Dataset<Row> df = spark
    .read()
    .format("singlestore")
    .option("clientEndpoint", "singlestore-host")
    .option("user", "admin")
    .load("foo");

Using Python

df = spark \
    .read \
    .format("singlestore") \
    .option("clientEndpoint", "singlestore-host") \
    .option("user", "admin") \
    .load("foo")

Example 3: Read a SingleStore Table into a DataFrame and Apply DataFrame Transformations

The following example reads a SingleStore table named foo into a DataFrame and applies DataFrame transformations:

import org.apache.spark.sql.functions.{col, count, lit}

val x = spark.read
  .format("singlestore")
  .option("clientEndpoint", "singlestore-host")
  .option("user", "admin")
  .load("foo")
  .withColumn("hello", lit(2))
  .filter(col("id") > 1)
  .limit(1000)
  .groupBy(col("id"))
  .agg(count("*"))

Example 4: Configure the singlestore-spark-connector Using an External Table in Spark SQL and Select from the Table

spark.sql("CREATE TABLE bar USING singlestore OPTIONS ('clientEndpoint'='singlestore-host','dbtable'='foo.bar')")
spark.sql("SELECT * FROM bar LIMIT 10").show()
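When the options come from configuration rather than a literal string, the CREATE TABLE statement can be assembled programmatically. The following is only a sketch: the helper name create_table_sql is hypothetical, and it assumes that option values contain no single quotes, because it does no escaping.

```python
def create_table_sql(spark_table, options):
    """Build a CREATE TABLE ... USING singlestore statement from a dict of options."""
    pairs = ",".join(f"'{key}'='{value}'" for key, value in options.items())
    return f"CREATE TABLE {spark_table} USING singlestore OPTIONS ({pairs})"


statement = create_table_sql("bar", {
    "clientEndpoint": "singlestore-host",
    "dbtable": "foo.bar",
})

# With a live SparkSession named `spark`:
# spark.sql(statement)
# spark.sql("SELECT * FROM bar LIMIT 10").show()
```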

Example 5: Use the Spark Write API to Save a DataFrame to SingleStore

df.write
  .format("singlestore")
  .option("loadDataCompression", "LZ4")
  .option("overwriteBehavior", "dropAndCreate")
  .mode(SaveMode.Overwrite)
  .save("foo.bar") // in format: database.table

If the target table (bar in the foo database in this example) does not exist in SingleStore, the singlestore-spark-connector automatically attempts to create it. If you specify SaveMode.Overwrite and the target table already exists, the table is dropped and recreated, or truncated, before the load. Set overwriteBehavior to truncate to truncate the table rather than recreate it.

Example 6: Use dbtable

This example requires global options like clientEndpoint, user, and password to be set. Note that if you specify the dbtable option, there's no need to specify the query option, and vice versa.

Here's an example of how to use dbtable:

df = spark.read \
    .format("singlestore") \
    .option("dbtable", table_name) \
    .load()

Example 7: Use query

This example requires global options like clientEndpoint, user, and password to be set. Note that if you specify the dbtable option, there's no need to specify the query option, and vice versa.

When using the query option, you must specify the database option:

df = spark.read \
    .format("singlestore") \
    .option("database", "database_name") \
    .option("query", "SELECT * FROM table_name") \
    .load()
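The two rules above (use either dbtable or query, and supply database whenever query is used) can be checked before the read is issued. The sketch below is illustrative only: read_options is a hypothetical helper, and treating the two options as strictly mutually exclusive is an assumption based on the note above, not a documented connector check.

```python
def read_options(dbtable=None, query=None, database=None):
    """Validate and return the read options for either dbtable or query."""
    # Assumption: exactly one of dbtable or query is supplied.
    if (dbtable is None) == (query is None):
        raise ValueError("specify exactly one of dbtable or query")
    if query is not None and database is None:
        raise ValueError("the query option requires the database option")
    if dbtable is not None:
        return {"dbtable": dbtable}
    return {"database": database, "query": query}


opts = read_options(query="SELECT * FROM table_name", database="database_name")

# With a live SparkSession named `spark`:
# df = spark.read.format("singlestore").options(**opts).load()
```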

Example 8: Use Save Modes

Save operations can optionally take a SaveMode setting. Save modes specify how to handle existing data.

These save modes do not use any locking and are not atomic.

  1. SaveMode.Append means that when saving a DataFrame to a data source, if the data/table already exists, the save operation is expected to append the contents of the DataFrame to the existing data.

  2. SaveMode.Overwrite means that when saving a DataFrame to a data source, if the data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.

    Overwrite mode depends on the overwriteBehavior option. See Merge on Save for more information.

  3. SaveMode.ErrorIfExists means that when saving a DataFrame to a data source, if the data already exists, an exception is expected to be thrown.

  4. SaveMode.Ignore means that when saving a DataFrame to a data source, if the data already exists, the save operation is expected to append the contents of the DataFrame to the existing data and ignore all duplicate key errors.

Here's an example:

df.write
  .format("singlestore")
  .mode(SaveMode.Append)
  .save("foo.bar")
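In PySpark, the save mode is passed to mode() as a string rather than as a SaveMode constant. The mapping below is a small sketch of that correspondence; the helper name pyspark_mode is hypothetical.

```python
# Scala/Java SaveMode constant -> string accepted by PySpark's DataFrameWriter.mode().
PYSPARK_MODES = {
    "Append": "append",
    "Overwrite": "overwrite",
    "ErrorIfExists": "errorifexists",
    "Ignore": "ignore",
}


def pyspark_mode(save_mode):
    """Translate a name such as 'SaveMode.Append' or 'Append' to its PySpark string."""
    name = save_mode.split(".")[-1]
    if name not in PYSPARK_MODES:
        raise ValueError(f"unknown save mode: {save_mode}")
    return PYSPARK_MODES[name]


mode = pyspark_mode("SaveMode.Append")

# With a DataFrame named `df`:
# df.write.format("singlestore").mode(mode).save("foo.bar")
```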

Example 9: Merge on Save

When saving DataFrames or datasets to SingleStore, you can manage how SaveMode.Overwrite is interpreted by the connector via the overwriteBehavior option. This option can take one of the following values:

  1. dropAndCreate (default): Drop and create the table before writing new values.

  2. truncate: Truncate the table before writing new values.

  3. merge: Replace existing rows with new rows by matching on the primary key. (Use this option only if you need to fully rewrite existing rows with new ones. If you need to specify some rule for the update, use the onDuplicateKeySQL option instead.)

All these values are case-insensitive.

For example, suppose you have the following table t, where the Id column is the primary key.

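SELECT * FROM t;
+----+---------+-----+
| Id | Name    | Age |
+----+---------+-----+
|  1 | Alice   |  20 |
|  2 | Bob     |  25 |
|  3 | Charlie |  30 |
+----+---------+-----+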

If you save the following DataFrame with overwriteBehavior = merge:

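+----+----------+-----+
| Id | Name     | Age |
+----+----------+-----+
|  2 | Daniel   |  22 |
|  3 | Eve      |  27 |
|  4 | Franklin |  35 |
+----+----------+-----+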

df.write
  .format("singlestore")
  .option("overwriteBehavior", "merge")
  .mode(SaveMode.Overwrite)
  .save("db.t")

After the save is complete, the table looks like this:

Note: Rows with Id=2 and Id=3 are overwritten with new rows. The row with Id=1 is not touched and still exists in the result.

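SELECT * FROM t;
+----+----------+-----+
| Id | Name     | Age |
+----+----------+-----+
|  1 | Alice    |  20 |
|  2 | Daniel   |  22 |
|  3 | Eve      |  27 |
|  4 | Franklin |  35 |
+----+----------+-----+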
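The merge semantics in this example can be modelled in a few lines of plain Python. This is a toy in-memory illustration of the outcome only, not how the connector performs the merge; the function name merge_rows is hypothetical.

```python
def merge_rows(existing, incoming, key="Id"):
    """Replace rows whose key matches, add rows with new keys, keep the rest."""
    merged = {row[key]: row for row in existing}
    for row in incoming:
        merged[row[key]] = row  # a full-row replacement, as with merge
    return [merged[k] for k in sorted(merged)]


table = [
    {"Id": 1, "Name": "Alice", "Age": 20},
    {"Id": 2, "Name": "Bob", "Age": 25},
    {"Id": 3, "Name": "Charlie", "Age": 30},
]
dataframe = [
    {"Id": 2, "Name": "Daniel", "Age": 22},
    {"Id": 3, "Name": "Eve", "Age": 27},
    {"Id": 4, "Name": "Franklin", "Age": 35},
]

result = merge_rows(table, dataframe)
for row in result:
    print(row["Id"], row["Name"], row["Age"])
```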

Example 10: Retrieve the Number of Written Rows from taskMetrics

The following example shows how to add a listener and get the number of written rows.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Print the number of records written by each task as it finishes.
spark.sparkContext.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    println("Task id: " + taskEnd.taskInfo.id)
    println("Records written: " + taskEnd.taskMetrics.outputMetrics.recordsWritten)
  }
})
df.write.format("singlestore").save("example")

Example 11: Specify Keys for Tables Created by the Spark Connector

When creating a table, the singlestore-spark-connector reads the options prefixed with tableKey. Each option name consists of tableKey, the key type, and optionally a key name, separated by periods; the option value is a comma-separated list of the columns in the key.

Note

The default table type is a columnstore. To create a rowstore table instead, enable the createRowstoreTable option.

Here's an example:

df.write
  .format("singlestore")
  .option("tableKey.primary", "id")
  .option("tableKey.key.created_firstname", "created, firstname")
  .option("tableKey.unique", "username")
  .mode(SaveMode.Overwrite)
  .save("foo.bar") // in format: database.table

In this example, we create three keys:

  1. A primary key on the id column.

  2. A regular key on the combination of the created and firstname columns, with the key name created_firstname.

  3. A unique key on the username column.

Any key can optionally have a name; to set one, add it after the key type, as in tableKey.key.created_firstname above. Key names must be unique. To change the default columnstore sort key, specify it explicitly:

df.write
  .option("tableKey.columnstore", "id")

You can also customize the shard key, for example:

df.write
  .option("tableKey.shard", "id, lastname")
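Because the option names follow a regular pattern, they can be generated from a list of key definitions. The sketch below reproduces the option names and values used in this example; table_key_options is a hypothetical helper, not part of the connector.

```python
def table_key_options(keys):
    """Build tableKey options from (key_type, key_name_or_None, columns) tuples."""
    options = {}
    for key_type, name, columns in keys:
        option = "tableKey." + key_type
        if name:
            option += "." + name  # the optional key name follows the key type
        options[option] = ", ".join(columns)
    return options


opts = table_key_options([
    ("primary", None, ["id"]),
    ("key", "created_firstname", ["created", "firstname"]),
    ("unique", None, ["username"]),
])

# With a DataFrame named `df`:
# df.write.format("singlestore").options(**opts).mode("overwrite").save("foo.bar")
```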

Example 12: Insert Rows into the Table with ON DUPLICATE KEY UPDATE

When updating a rowstore table, it is possible to insert rows via the ON DUPLICATE KEY UPDATE option. For example:

df.write
  .format("singlestore")
  .option("onDuplicateKeySQL", "age = age + 1")
  .option("insertBatchSize", 300)
  .mode(SaveMode.Append)
  .save("foo.bar")

As a result of this query, all new rows are appended without changes. If a row with the same PRIMARY KEY or UNIQUE index already exists, the age value of that row is increased by 1.

When you use ON DUPLICATE KEY UPDATE, all rows of the DataFrame are split into batches, and every insert query can have a maximum of insertBatchSize rows.
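The effect of insertBatchSize on the number and size of insert queries can be illustrated with a simple chunking function. This sketch only demonstrates the upper bound on batch size; how the connector actually distributes rows across Spark partitions and queries is not modelled here, and the function name batches is hypothetical.

```python
def batches(rows, insert_batch_size):
    """Yield consecutive slices of rows, each with at most insert_batch_size items."""
    for start in range(0, len(rows), insert_batch_size):
        yield rows[start:start + insert_batch_size]


rows = list(range(1000))  # stand-in for 1000 DataFrame rows
sizes = [len(batch) for batch in batches(rows, 300)]
print(sizes)  # [300, 300, 300, 100]
```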

Use the onDuplicateKeySQL Setting to Perform a Partial Update

You can use the onDuplicateKeySQL setting to perform a partial update on rows that match a primary key. For example, consider a Spark DataFrame df that is written to a SingleStore table named test, with id as the primary key.

df.write
  .format("singlestore")
  .option("tableKey.primary", "id")
  .save("test")
spark.read.format("singlestore").load("test").show()
+---+----+------------+
| id|data|dataToUpdate|
+---+----+------------+
|  2|   2|           2|
|  1|   1|           1|
|  3|   3|           3|
+---+----+------------+

In this case, the user only wants to update the column dataToUpdate, but not the column data. To update both, use the overwriteBehavior setting instead, and set it to merge.

First, create a sample DataFrame that holds the data which reflects the updates to the table. Note how both columns data and dataToUpdate in this updated DataFrame are different compared to the originally created object.

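import org.apache.spark.sql.types.IntegerType

// createDF is not part of core Spark; this assumes an extension that provides it
// (such as the spark-daria SparkSessionExt implicits) is in scope.
val updateDf = spark.createDF(
  List(
    (1, -1, 20),
    (2, -1, 100)
  ),
  List(
    ("id", IntegerType, true),
    ("data", IntegerType, true),
    ("dataToUpdate", IntegerType, true)
  )
)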

The following command updates only the dataToUpdate column in the SingleStore table, not the data column.

updateDf.write
  .format("singlestore")
  .option("onDuplicateKeySQL", "dataToUpdate = VALUES(dataToUpdate)")
  .mode(SaveMode.Append)
  .save("test")
spark.read.format("singlestore").load("test").show()
+---+----+------------+
| id|data|dataToUpdate|
+---+----+------------+
|  2|   2|         100|
|  1|   1|          20|
|  3|   3|           3|
+---+----+------------+
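The difference from a full-row merge is that only the named column is taken from the incoming row when the key already exists. The following plain-Python model reproduces the result above; it is a toy illustration of the semantics, not the connector's implementation, and partial_update is a hypothetical name.

```python
def partial_update(table, updates, key, columns):
    """Insert rows with new keys; for existing keys, overwrite only the given columns."""
    by_key = {row[key]: dict(row) for row in table}
    for row in updates:
        if row[key] in by_key:
            for column in columns:
                by_key[row[key]][column] = row[column]
        else:
            by_key[row[key]] = dict(row)
    return [by_key[k] for k in sorted(by_key)]


test_table = [
    {"id": 1, "data": 1, "dataToUpdate": 1},
    {"id": 2, "data": 2, "dataToUpdate": 2},
    {"id": 3, "data": 3, "dataToUpdate": 3},
]
update_rows = [
    {"id": 1, "data": -1, "dataToUpdate": 20},
    {"id": 2, "data": -1, "dataToUpdate": 100},
]

updated = partial_update(test_table, update_rows, key="id", columns=["dataToUpdate"])
for row in updated:
    print(row)
```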

Last modified: May 3, 2023
