Examples
Example 1: Configure the singlestore-spark-connector Globally
The following example shows how to configure the singlestore-spark-connector for SingleStoreDB.
Using Java
SparkConf conf = new SparkConf();
conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal");
conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307");
conf.set("spark.datasource.singlestore.user", "admin");
conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word");
Using Python
spark.conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal")
spark.conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307")
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")
Example 2: Read a SingleStoreDB Table into a DataFrame
The following example reads a SingleStoreDB table named foo into a DataFrame.
Using Scala
val df = spark.read
  .format("singlestore")
  .option("ddlEndpoint", "singlestore-master.cluster.internal")
  .option("user", "admin")
  .load("foo")
Using Java
Dataset<Row> df = spark
  .read()
  .format("singlestore")
  .option("ddlEndpoint", "singlestore-master.cluster.internal")
  .option("user", "admin")
  .load("foo");
Using Python
df = spark \
  .read \
  .format("singlestore") \
  .option("ddlEndpoint", "singlestore-master.cluster.internal") \
  .option("user", "admin") \
  .load("foo")
Example 3: Read a SingleStoreDB Table into a DataFrame and Apply DataFrame Transformations
The following example reads a SingleStoreDB table named foo into a DataFrame and applies DataFrame transformations:
val x = spark.read
.format("singlestore")
  .option("ddlEndpoint", "singlestore-master.cluster.internal")
.option("user", "admin")
.load("foo")
.withColumn("hello", lit(2))
.filter(col("id") > 1)
.limit(1000)
.groupBy(col("id"))
.agg(count("*"))
Example 4: Configure the singlestore-spark-connector Using an External Table in Spark SQL and Select from the Table
spark.sql("CREATE TABLE bar USING singlestore OPTIONS ('ddlEndpoint'='singlestore-master.cluster.internal','dbtable'='foo.bar')")
spark.sql("SELECT * FROM bar LIMIT 10").show()
Example 5: Use the Spark Write API to Save a DataFrame to SingleStoreDB
df.write
  .format("singlestore")
  .option("loadDataCompression", "LZ4")
  .option("overwriteBehavior", "dropAndCreate")
  .mode(SaveMode.Overwrite)
  .save("foo.bar") // in format: database.table
If the target table (bar in the foo database in this example) does not exist in SingleStoreDB, the singlestore-spark-connector automatically attempts to create it. If you specify SaveMode.Overwrite and the target table already exists, it is re-created or truncated before the load. Set overwriteBehavior to truncate to truncate the table rather than re-create it.
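For instance, a minimal sketch of an overwrite that truncates instead of dropping the table (connection options are assumed to be configured globally, and foo.bar is a placeholder target):

```scala
df.write
  .format("singlestore")
  .option("overwriteBehavior", "truncate") // keep the table definition; delete existing rows first
  .mode(SaveMode.Overwrite)
  .save("foo.bar") // in format: database.table
```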
Example 6: Use dbtable
This example requires the global options ddlEndpoint, user, and password to be set. Note that if you specify the dbtable option, there's no need to specify the query option, and vice versa.
Here's an example of how to use dbtable:
df = spark.read \
  .format("singlestore") \
  .option("dbtable", table_name) \
  .load()
Example 7: Use query
This example requires the global options ddlEndpoint, user, and password to be set. Note that if you specify the query option, there's no need to specify the dbtable option, and vice versa.
When using the query option, you must also specify the database option:
df = spark.read \
  .format("singlestore") \
  .option("database", "database_name") \
  .option("query", "SELECT * FROM table_name") \
  .load()
Example 8: Use Save Modes
Save operations can optionally take a SaveMode setting. Save modes specify how to handle existing data. It is important to realize that these save modes do not utilize any locking and are not atomic.

- SaveMode.Append means that when saving a DataFrame to a data source, if the data/table already exists, the save operation appends the contents of the DataFrame to the existing data.
- SaveMode.Overwrite means that when saving a DataFrame to a data source, if the data/table already exists, the existing data is overwritten by the contents of the DataFrame. Overwrite mode depends on the overwriteBehavior option. See Merge on Save for more information.
- SaveMode.ErrorIfExists means that when saving a DataFrame to a data source, if the data already exists, an exception is thrown.
- SaveMode.Ignore means that when saving a DataFrame to a data source, if the data already exists, the save operation appends the contents of the DataFrame to the existing data and ignores all duplicate key errors.
Here's an example:
df.write
  .mode(SaveMode.Append)
  .save("foo.bar")
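Similarly, a minimal sketch of SaveMode.Ignore (connection options assumed configured globally; foo.bar is a placeholder), which appends new rows while skipping rows that hit duplicate key errors:

```scala
df.write
  .format("singlestore")
  .mode(SaveMode.Ignore) // append rows; duplicate key errors are ignored
  .save("foo.bar")
```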
Example 9: Merge on Save
When saving DataFrames or datasets to SingleStoreDB, you can manage how SaveMode.Overwrite is interpreted by the connector via the overwriteBehavior option. This option can take one of the following values:

- dropAndCreate (default): Drop and create the table before writing new values.
- truncate: Truncate the table before writing new values.
- merge: Replace existing rows with new rows by matching on the primary key. (Use this option only if you need to fully rewrite existing rows with new ones. If you need to specify some rule for the update, use the onDuplicateKeySQL option instead.)
All these options are case-insensitive. Here's an example:
Suppose you have the following table t, where the Id column is the primary key.
SELECT * FROM t;
| Id | Name    | Age |
|----|---------|-----|
| 1  | Alice   | 20  |
| 2  | Bob     | 25  |
| 3  | Charlie | 30  |
If you save the following DataFrame with overwriteBehavior = merge:
| Id | Name     | Age |
|----|----------|-----|
| 2  | Daniel   | 22  |
| 3  | Eve      | 27  |
| 4  | Franklin | 35  |
df.write
  .format("singlestore")
  .option("overwriteBehavior", "merge")
  .mode(SaveMode.Overwrite)
  .save("db.t")
After the save is complete, the table looks like this:
Note: Rows with Id=2 and Id=3 are overwritten with new rows. The row with Id=1 is not touched and still exists in the result.
SELECT * FROM t;
| Id | Name     | Age |
|----|----------|-----|
| 1  | Alice    | 20  |
| 2  | Daniel   | 22  |
| 3  | Eve      | 27  |
| 4  | Franklin | 35  |
Example 10: Retrieve the Number of Written Rows from taskMetrics
The following example shows how to add a listener and get the number of written rows.
spark.sparkContext.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    println("Task id: " + taskEnd.taskInfo.id.toString)
    println("Records written: " + taskEnd.taskMetrics.outputMetrics.recordsWritten.toString)
  }
})

df.write.format("singlestore").save("example")
Example 11: Specify Keys for Tables Created by the Spark Connector
When creating a table, the singlestore-spark-connector reads the options prefixed with tableKey. These options must be formatted in a specific way to correctly specify the keys.
Note: The default table type is a columnstore. To create a rowstore table instead, enable the createRowstoreTable option.
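For instance, a minimal sketch of creating a rowstore table on write (the target name foo.bar is a placeholder):

```scala
df.write
  .format("singlestore")
  .option("createRowstoreTable", "true") // create a rowstore instead of the default columnstore
  .mode(SaveMode.Overwrite)
  .save("foo.bar")
```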
Here's an example:
df.write
  .format("singlestore")
  .option("tableKey.primary", "id")
  .option("tableKey.key.created_firstname", "created, firstname")
  .option("tableKey.unique", "username")
  .mode(SaveMode.Overwrite)
  .save("foo.bar") // in format: database.table
In this example, we create three keys:

- A primary key on the id column.
- A regular key on the combination of the firstname and created columns, with the key name created_firstname.
- A unique key on the username column.
Any key can optionally have a name; specify it after the key type. Key names must be unique. To change the default columnstore sort key, specify it explicitly:
df.write
  .option("tableKey.columnstore", "id")
You can also customize the shard key, for example:
df.write
  .option("tableKey.shard", "id, lastname")
Example 12: Insert Rows into the Table with ON DUPLICATE KEY UPDATE
When inserting into a rowstore table, you can specify ON DUPLICATE KEY UPDATE behavior via the onDuplicateKeySQL option. For example:
df.write
  .option("onDuplicateKeySQL", "age = age + 1")
  .option("insertBatchSize", 300)
  .mode(SaveMode.Append)
  .save("foo.bar")
As a result of this query, all new rows are appended without changes. If a row with the same PRIMARY KEY or UNIQUE index already exists, the corresponding age value is increased.
When you use ON DUPLICATE KEY UPDATE, all rows of the DataFrame are split into batches, and every insert query contains a maximum of insertBatchSize rows.
Use the onDuplicateKeySQL Setting to Perform a Partial Update
You can use the onDuplicateKeySQL setting to perform a partial update on rows matching a primary key. For example, consider a DataFrame df with a primary key id, created and loaded into SingleStoreDB from a Spark DataFrame.
df.write
  .format("singlestore")
  .option("tableKey.primary", "id")
  .save("test")

spark.read.format("singlestore").load("test").show()

+---+----+------------+
| id|data|dataToUpdate|
+---+----+------------+
|  2|   2|           2|
|  1|   1|           1|
|  3|   3|           3|
+---+----+------------+
In this case, the user only wants to update the dataToUpdate column, but not the data column. To update both, use the overwriteBehavior setting instead, and set it to merge.
First, create a sample DataFrame that holds the data which reflects the updates to the table. Note how both the data and dataToUpdate columns in this updated DataFrame differ from the originally created object.
val updateDf = spark.createDF(
  List(
    (1, -1, 20),
    (2, -1, 100)
  ),
  List(
    ("id", IntegerType, true),
    ("data", IntegerType, true),
    ("dataToUpdate", IntegerType, true)
  )
)
The following command only updates the dataToUpdate column in the SingleStoreDB table, not the data column.
updateDf.write
  .format("singlestore")
  .option("onDuplicateKeySQL", "dataToUpdate = VALUES(dataToUpdate)")
  .mode(SaveMode.Append)
  .save("test")

spark.read.format("singlestore").load("test").show()

+---+----+------------+
| id|data|dataToUpdate|
+---+----+------------+
|  2|   2|         100|
|  1|   1|          20|
|  3|   3|           3|
+---+----+------------+