Examples
On this page
Example 1: Configure the singlestore-spark-connector
Globally
The following example shows how to configure singlestore-spark-connector
for SingleStore Helios.
Using Java
SparkConf conf = new SparkConf();conf.set("spark.datasource.singlestore.clientEndpoint", "singlestore-host")spark.conf.set("spark.datasource.singlestore.user", "admin")spark.conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")
Using Python
spark.conf.set("spark.datasource.singlestore.clientEndpoint", "singlestore-host")spark.conf.set("spark.datasource.singlestore.user", "admin")spark.conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")
Example 2: Read a SingleStore Table into a DataFrame
The following example reads a SingleStore table named foo into a DataFrame.
val df = spark.read.format("singlestore").option("clientEndpoint", "singlestore-host").option("user", "admin").load("foo")
Using Java
DataFrame df = spark.read().format("singlestore").option("clientEndpoint", "singlestore-host").option("user", "admin").load("foo");
Using Python
df = spark \.read \.format("singlestore") \.option("clientEndpoint", "singlestore-host") \.option("user", "admin") \.load("foo")
Example 3: Read a SingleStore Table into a DataFrame and Apply DataFrame Transformations
The following example reads a SingleStore table named foo into a DataFrame and applies DataFrame transformations:
val x = spark.read.format("singlestore").option("clientEndpoint").option("user", "admin").load("foo").withColumn("hello", lit(2)).filter(col("id") > 1).limit(1000).groupBy(col("id")).agg(count("*"))
Example 4: Configure the singlestore-spark-connector
Using an External Table in Spark SQL and Select from the Table
spark.sql("CREATE TABLE bar USING singlestore OPTIONS ('clientEndpoint'='singlestore-host','dbtable'='foo.bar')")
spark.sql("SELECT * FROM bar LIMIT 10").show()
Example 5: Use the Spark Write API to Save a DataFrame to SingleStore
df.write.format("singlestore").option("loadDataCompression", "LZ4").option("overwriteBehavior", "dropAndCreate").mode(SaveMode.Overwrite).save("foo.bar") // in format: database.table
If the target table (foo in this example) does not exist in SingleStore, the singlestore-spark-connector
automatically attempts to create the table.SaveMode.
and if the target table already exists, it will be recreated or truncated before load.overwriteBehavior = truncate
to truncate rather than re-create.
Example 6: Use dbtable
This example requires global options like clientEndpoint
, user
, and password
to be set.dbtable
option, there's no need to specify the query
option, and vice versa.
Here's an example of how to use dbtable
:
df = spark.read\.format("singlestore")\.option("dbtable", tble_name).load()
Example 7: Use query
This example requires global options like clientEndpoint
, user
, and password
to be set.dbtable
option, there's no need to specify the query
option, and vice versa.
When using the query
option, you must specify the database option:
df = spark.read.format("singlestore").option("database", "database_name").option("query", "SELECT * FROM table_name").load()
Example 8: Use Save Modes
Save operations can optionally take a SaveMode
setting.
It is important to realize that these save modes do not utilize any locking and are not atomic.
-
SaveMode.
means that when saving a DataFrame to a data source, if the data/table already exists, the save operation is expected to append the contents of the DataFrame to the existing data.Append -
SaveMode.
means that when saving a DataFrame to a data source, if the data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.Overwrite Overwrite
mode depends onoverwriteBehavior
option.See Merge on Save for more information. -
SaveMode.
means that when saving a DataFrame to a data source, if the data already exists, an exception is expected to be thrown.ErrorIfExists -
SaveMode.
means that when saving a DataFrame to a data source, if the data already exists, the save operation is expected to append the contents of the DataFrame to the existing data and ignore all duplicate key errors.Ignore
Here's an example:
df.write.mode(SaveMode.Append).save("foo.bar")
Example 9: Merge on Save
When saving DataFrames or datasets to SingleStore, you can manage how SaveMode.
is interpreted by the connector via the overwriteBehavior
option.
-
dropAndCreate
(default): Drop and create the table before writing new values. -
truncate
: Truncate the table before writing new values. -
merge
: Replace existing rows with new rows by matching on the primary key.(Use this option only if you need to fully rewrite existing rows with new ones. If you need to specify some rule for the update, use the onDuplicateKeySQL
option instead.)
All these options are case-insensitive.
Suppose you have the following table t
, and the Id
column is the primary key.
SELECT * FROM t;
Id |
Name |
Age |
---|---|---|
1 |
Alice |
20 |
2 |
Bob |
25 |
3 |
Charlie |
30 |
If you save the following DataFrame with overwriteBehavior = merge
:
Id |
Name |
Age |
---|---|---|
2 |
Daniel |
22 |
3 |
Eve |
27 |
4 |
Franklin |
35 |
df.write.format("singlestore").option("overwriteBehavior", "merge").mode(SaveMode.Overwrite).save("db.t")
After the save is complete, the table looks like this:
Note: Rows with Id=2 and Id=3 are overwritten with new rows.
SELECT * FROM t;
Id |
Name |
Age |
---|---|---|
1 |
Alice |
20 |
2 |
Daniel |
22 |
3 |
Eve |
27 |
4 |
Franklin |
35 |
Example 10: Retrieve the Number of Written Rows from taskMetrics
The following example shows how to add a listener and get the number of written rows.
spark.sparkContext.addSparkListener(new SparkListener() {override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {println("Task id: " + taskEnd.taskInfo.id.toString)println("Records written: " + taskEnd.taskMetrics.outputMetrics.recordsWritten.toString)}})df.write.format("singlestore").save("example")
Example 11: Specify Keys for Tables Created by the Spark Connector
When creating a table, the singlestore-spark-connector
reads the options prefixed with tableKey
.
Note
The default table type is a columnstore.createRowstoreTable
option.
Here's an example:
df.write.format("singlestore").option("tableKey.primary", "id").option("tableKey.key.created_firstname", "created, firstname").option("tableKey.unique", "username").mode(SaveMode.Overwrite).save("foo.bar") // in format: database.table
In this example, we create three keys:
-
A primary key on the
id
column. -
A regular key on the combination of the
firstname
andcreated
columns, with the key namecreated_
.firstname -
A unique key on the
username
column.
Any key can optionally specify a name, just specify it after the key type.
df.write.option("tableKey.columnstore", "id")
You can also customize the shard key, for example:
df.write.option("tableKey.shard", "id, lastname")
Example 12: Insert Rows into the Table with ON DUPLICATE KEY UPDATE
When updating a rowstore table, it is possible to insert rows via the ON DUPLICATE KEY UPDATE
option.
df.write.option("onDuplicateKeySQL", "age = age + 1").option("insertBatchSize", 300).mode(SaveMode.Append).save("foo.bar")
As a result of this query, all new rows are appended without changes.PRIMARY KEY
or UNIQUE
index already exists then the corresponding age
value will be increased.
When you use ON DUPLICATE KEY UPDATE
, all rows of the DataFrame are split into batches, and every insert query can have a maximum of insertBatchSize
rows.
Use the onDuplicateKeySQL
Setting to Perform a Partial Update
You can use the OnDuplicateKeySQL
setting to do a partial update on rows matching a primary key.df
, with a primary key id
, created and loaded into SingleStore from a Spark Dataframe.
df.write.format("singlestore").option("tableKey.primary", "id").save("test")spark.read.format("singlestore").load("test").show()
+---+----+------------+
| id|data|dataToUpdate|
+---+----+------------+
| 2| 2| 2|
| 1| 1| 1|
| 3| 3| 3|
+---+----+------------+
In this case, the user only wants to update the column dataToUpdate
, but not the column data
.overwriteBehavior
setting instead, and set it to merge
.
First, create a sample DataFrame that holds the data which reflects the updates to the table.data
and dataToUpdate
in this updated DataFrame are different compared to the originally created object.
val updateDf = spark.createDF(List((1, -1, 20),(2, -1, 100)),List(("id", IntegerType, true),("data", IntegerType, true),("dataToUpdate", IntegerType, true)))
The following command only updates the dataToUpdate
field columns in the SingleStore table, not the data
field.
updateDf.write.format("singlestore").option("onDuplicateKeySQL", "dataToUpdate = VALUES(dataToUpdate)").mode(SaveMode.Append).save("test")spark.read.format("singlestore").load("test").show()
+---+----+------------+
| id|data|dataToUpdate|
+---+----+------------+
| 2| 2| 100|
| 1| 1| 20|
| 3| 3| 3|
+---+----+------------+
Last modified: May 3, 2023