SingleStore Managed Service

Examples
Example 1: Configuring the singlestore-spark-connector Globally
Example 1: Scala
spark.conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal")
spark.conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307")
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")
Example 1: Java
SparkConf conf = new SparkConf();
conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal")
conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307")
conf.set("spark.datasource.singlestore.user", "admin")
conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")
Example 1: Python
spark.conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal")
spark.conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307")
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")
Example 2: Reading a SingleStore Table (foo) into a DataFrame
Example 2: Scala
val df = spark.read
    .format("singlestore")
    .option("ddlEndpoint", "singlestore-master.cluster.internal")
    .option("user", "admin")
    .load("foo")
Example 2: Java
Dataset<Row> df = spark
  .read()
  .format("singlestore")
  .option("ddlEndpoint", "singlestore-master.cluster.internal")
  .option("user", "admin")
  .load("foo");
Example 2: Python
df = spark \
  .read \
  .format("singlestore") \
  .option("ddlEndpoint", "singlestore-master.cluster.internal") \
  .option("user", "admin") \
  .load("foo")
Example 3: Reading a SingleStore Table (foo) into a DataFrame and Applying Data Frame Operations
val x = spark.read
    .format("singlestore")
    .option("ddlEndpoint")
    .option("user", "admin")
    .load("foo")
    .withColumn("hello", lit(2))
    .filter(col("id") > 1)
    .limit(1000)
    .groupBy(col("id"))
    .agg(count("*"))
Example 4: Configuring the singlestore-spark-connector Using an External Table in Spark SQL and Selecting from It
spark.sql("CREATE TABLE bar USING singlestore OPTIONS ('ddlEndpoint'='singlestore-master.cluster.internal','dbtable'='foo.bar')")
spark.sql("select * from bar limit 10").show()
Example 5: Using the Spark Write API to Save a Data Frame to SingleStore
df.write
    .format("singlestore")
    .option("loadDataCompression", "LZ4")
    .option("overwriteBehavior", "dropAndCreate")
    .mode(SaveMode.Overwrite)
    .save("foo.bar") // in format: database.table

Note: This is the same for Java and Python.

If the target table (foo.bar in the example above) does not exist in SingleStore, the singlestore-spark-connector will automatically attempt to create it. If you specify SaveMode.Overwrite and the target table already exists, it is dropped and re-created (or truncated) before the load. Specify overwriteBehavior = truncate to truncate the table rather than re-create it.
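For example, a write that truncates an existing table instead of dropping and re-creating it might look like this (a sketch; foo.bar is the same illustrative target table):

df.write
    .format("singlestore")
    .option("overwriteBehavior", "truncate")
    .mode(SaveMode.Overwrite)
    .save("foo.bar") // truncates foo.bar if it exists, then loads the new rows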

dbtable Example

This example requires the global options (ddlEndpoint, user, and password) to be set. Note that if you specify dbtable, there is no need to specify query, and vice versa.

Using the dbtable option:

df = spark.read \
    .format("singlestore") \
    .option("dbtable", table_name) \
    .load()
query Example

This example requires the global options (ddlEndpoint, user, and password) to be set. Note that if you specify dbtable, there is no need to specify query, and vice versa.

When using the query option, you should specify the database option:

df = spark.read.format("singlestore") \
    .option("database", "database_name") \
    .option("query", "SELECT * FROM table_name") \
    .load()
Save Modes

Save operations can optionally take a SaveMode setting (the above example uses Overwrite). Save Modes specify how to handle existing data if present.

It is important to realize that these save modes do not utilize any locking and are not atomic. Additionally, when performing an overwrite, the data will be deleted before writing out the new data.

  1. SaveMode.Append means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.

  2. SaveMode.Overwrite means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.

    Overwrite mode depends on the overwriteBehavior option. See the Merging on Save section below for details.

  3. SaveMode.ErrorIfExists means that when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.

  4. SaveMode.Ignore means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data.

Example of SaveMode Option
df.write
    .format("singlestore")
    .mode(SaveMode.Append)
    .save("foo.bar")
Retrieving the Number of Written Rows from taskMetrics

You can add a Spark listener to retrieve the number of rows written.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

spark.sparkContext.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    println("Task id: " + taskEnd.taskInfo.id.toString)
    println("Records written: " + taskEnd.taskMetrics.outputMetrics.recordsWritten.toString)
  }
})

df.write.format("singlestore").save("example")
Specifying Keys for Tables Created by the Spark Connector

When creating a table, the singlestore-spark-connector will read options prefixed with tableKey. These options must be formatted in a specific way in order to correctly specify the keys.

Notice

The default table type is a SingleStore columnstore. If you want to use a rowstore table, you will need to specify a primary key using the tableKey option.

To explain, consider the following example:

df.write
    .format("singlestore")
    .option("tableKey.primary", "id")
    .option("tableKey.key.created_firstname", "created, firstname")
    .option("tableKey.unique", "username")
    .mode(SaveMode.Overwrite)
    .save("foo.bar") // in format: database.table

In this example, we are creating three keys:

  1. A primary key on the id column

  2. A regular key on the combination of the firstname and created columns, with the key name created_firstname

  3. A unique key on the username column

Note on (2): Any key can optionally specify a name; just put it after the key type. Key names must be unique.

To change the default columnstore sort key, you can specify it explicitly:

df.write
    .option("tableKey.columnstore", "id")

You can also customize the shard key like so:

df.write
    .option("tableKey.shard", "id, lastname")
Inserting Rows into the Table with ON DUPLICATE KEY UPDATE

When updating a rowstore table, you can insert rows via the ON DUPLICATE KEY UPDATE option. See INSERT in the SQL Reference for more details.

df.write
    .format("singlestore")
    .option("onDuplicateKeySQL", "age = age + 1")
    .option("insertBatchSize", 300)
    .mode(SaveMode.Append)
    .save("foo.bar")

As a result of this query, all new rows are appended unchanged. If a row with the same PRIMARY KEY or UNIQUE index already exists, its age value is incremented.

When you use ON DUPLICATE KEY UPDATE, the rows of the data frame are split into batches, and each insert query contains no more rows than the insertBatchSize setting specifies.

Using the onDuplicateKeySQL Setting to Perform a Partial Update

You can also use the onDuplicateKeySQL setting to do a partial update on rows matching a primary key. For example, consider a DataFrame df with a primary key id that we create and load into SingleStore:

df.write
      .format("singlestore")
      .option("tableKey.primary", "id")
      .save("test")
spark.read.format("singlestore").load("test").show()

Output:

+---+----+------------+
| id|data|dataToUpdate|
+---+----+------------+
|  2|   2|           2|
|  1|   1|           1|
|  3|   3|           3|
+---+----+------------+

In this case, the user only wants to update the column dataToUpdate, but not the column data. Note that if you would like to update both, you should instead use the overwriteBehavior setting and set it to merge.

First, we create a sample data frame that holds the data reflecting our updates to the table. Note how both the data and dataToUpdate columns in this updated data frame differ from the originally created object.

// createDF is provided by the spark-daria library's SparkSessionExt implicits
val updateDf = spark.createDF(
      List(
        (1, -1, 20),
        (2, -1, 100)
      ),
      List(
        ("id", IntegerType, true),
        ("data", IntegerType, true),
        ("dataToUpdate", IntegerType, true)
      )
)

The following command updates only the dataToUpdate column in the SingleStore table, not the data column.

updateDf.write
      .format("singlestore")
      .option("onDuplicateKeySQL", "dataToUpdate = VALUES(dataToUpdate)")
      .mode(SaveMode.Append)
      .save("test")
spark.read.format("singlestore").load("test").show()

The final output of the partial update looks like this:

+---+----+------------+
| id|data|dataToUpdate|
+---+----+------------+
|  2|   2|         100|
|  1|   1|          20|
|  3|   3|           3|
+---+----+------------+
Merging on Save

When saving dataframes or datasets to SingleStore, you can manage how SaveMode.Overwrite is interpreted by the connector via the option overwriteBehavior. This option can take one of the following values:

  1. dropAndCreate (default) - drop and create the table before writing new values.

  2. truncate - truncate the table before writing new values.

  3. merge - replace rows with new rows by matching on the primary key. (Use this option only if you need to fully rewrite existing rows with new ones. If you need to specify some rule for the update, use the onDuplicateKeySQL option instead.)

All these options are case-insensitive.

Example of merge Option

Suppose you have the following table t, and the Id column is the primary key.

SELECT * FROM t;

Id | Name    | Age
---+---------+----
1  | Alice   | 20
2  | Bob     | 25
3  | Charlie | 30

If you save the following dataframe with overwriteBehavior = merge:

Id | Name     | Age
---+----------+----
2  | Daniel   | 22
3  | Eve      | 27
4  | Franklin | 35

df.write
    .format("singlestore")
    .option("overwriteBehavior", "merge")
    .mode(SaveMode.Overwrite)
    .save("db.t")

After the save is complete, the table will look like this:

Note: The rows with Id=2 and Id=3 were overwritten with the new rows, the row with Id=4 was inserted, and the row with Id=1 was not touched and still exists in the result.

SELECT * FROM t;

Id | Name     | Age
---+----------+----
1  | Alice    | 20
2  | Daniel   | 22
3  | Eve      | 27
4  | Franklin | 35