# Examples

## Example 1: Configure the `singlestore-spark-connector` Globally

The following example shows how to configure `singlestore-spark-connector` for SingleStore.

## Using Java

```Java
SparkConf conf = new SparkConf();
conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal");
conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307");
conf.set("spark.datasource.singlestore.user", "admin");
conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word");
```

## Using Python

```Python
spark.conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal")
spark.conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307")
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")
```
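
Every global setting above shares the `spark.datasource.singlestore.` prefix, so a small helper can reduce the repetition. The following is a minimal sketch only: `singlestore_conf` is a hypothetical helper name and is not part of the connector.

```python
# Prefix shared by the global settings shown above.
PREFIX = "spark.datasource.singlestore."


def singlestore_conf(options):
    """Return a dict that maps fully prefixed setting names to their values."""
    return {PREFIX + name: value for name, value in options.items()}


settings = singlestore_conf({
    "ddlEndpoint": "singlestore-master.cluster.internal",
    "user": "admin",
})

# With a live SparkSession named `spark`, the settings could be applied like this:
# for key, value in settings.items():
#     spark.conf.set(key, value)
```

Keeping the short option names in one dictionary makes it easier to reuse the same values for per-read `.option(...)` calls.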

## Example 2: Read a SingleStore Table into a DataFrame

The following example reads a SingleStore table named **foo** into a DataFrame.

```scala
val df = spark.read
    .format("singlestore")
    .option("ddlEndpoint", "singlestore-master.cluster.internal")
    .option("user", "admin")
    .load("foo")

```

## Using Java

```java
Dataset<Row> df = spark
  .read()
  .format("singlestore")
  .option("ddlEndpoint", "singlestore-master.cluster.internal")
  .option("user", "admin")
  .load("foo");

```

## Using Python

```python
df = spark \
  .read \
  .format("singlestore") \
  .option("ddlEndpoint", "singlestore-master.cluster.internal") \
  .option("user", "admin") \
  .load("foo")

```

## Example 3: Read a SingleStore Table into a DataFrame and Apply DataFrame Transformations

The following example reads a SingleStore table named **foo** into a DataFrame and applies DataFrame transformations:

```scala
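import org.apache.spark.sql.functions.{col, count, lit}

val x = spark.read
    .format("singlestore")
    .option("ddlEndpoint", "singlestore-master.cluster.internal")
    .option("user", "admin")
    .load("foo")
    .withColumn("hello", lit(2))   // add a constant column
    .filter(col("id") > 1)         // keep rows with id > 1
    .limit(1000)
    .groupBy(col("id"))
    .agg(count("*"))               // count rows per id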
```

## Example 4: Configure the `singlestore-spark-connector` Using an External Table in Spark SQL and Select from the Table

```scala
spark.sql("CREATE TABLE bar USING singlestore OPTIONS ('ddlEndpoint'='singlestore-master.cluster.internal','dbtable'='foo.bar')")

```

```scala
spark.sql("SELECT * FROM bar LIMIT 10").show()

```
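
When the option list grows, the `CREATE TABLE` statement can be assembled from a dictionary instead of being written by hand. The sketch below is illustrative: `create_table_sql` is a hypothetical helper, and it assumes that option names and values contain no single quotes (it does no escaping).

```python
def create_table_sql(table, options):
    """Build a CREATE TABLE ... USING singlestore statement from an options dict."""
    # Render each option as 'key'='value', preserving insertion order.
    rendered = ",".join(f"'{key}'='{value}'" for key, value in options.items())
    return f"CREATE TABLE {table} USING singlestore OPTIONS ({rendered})"


statement = create_table_sql("bar", {
    "ddlEndpoint": "singlestore-master.cluster.internal",
    "dbtable": "foo.bar",
})
print(statement)

# With a live SparkSession named `spark`, run it with: spark.sql(statement)
```

The printed statement is the same string that is passed to `spark.sql` in the example above.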

## Example 5: Use the Spark Write API to Save a DataFrame to SingleStore

```scala
import org.apache.spark.sql.SaveMode

df.write
    .format("singlestore")
    .option("loadDataCompression", "LZ4")
    .option("overwriteBehavior", "dropAndCreate")
    .mode(SaveMode.Overwrite)
    .save("foo.bar") // in format: database.table
```

If the target table (**bar** in the **foo** database in this example) does not exist in SingleStore, the `singlestore-spark-connector` automatically attempts to create it. If you specify `SaveMode.Overwrite` and the target table already exists, the table is re-created or truncated before the load. Set `overwriteBehavior` to `truncate` to truncate the table rather than re-create it.

## Example 6: Use `dbtable`

This example requires global options like `ddlEndpoint`, `user`, and `password` to be set. Note that if you specify the `dbtable` option, there's no need to specify the `query` option, and vice versa.

Here's an example of how to use `dbtable`:

```python
# table_name holds the name of the table to read
df = spark.read \
    .format("singlestore") \
    .option("dbtable", table_name) \
    .load()
```

## Example 7: Use `query`

This example requires global options like `ddlEndpoint`, `user`, and `password` to be set. Note that if you specify the `dbtable` option, there's no need to specify the `query` option, and vice versa.

When using the `query` option, you must specify the database option:

```scala
val df = spark.read.format("singlestore")
   .option("database", "database_name")
   .option("query", "SELECT * FROM table_name").load()
```
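
The two rules above (use either `dbtable` or `query`, and supply `database` together with `query`) can be checked before a read is issued. The sketch below is a hypothetical helper, not connector behavior: `reader_options` treats the two options as mutually exclusive, which is a stricter reading of the note above.

```python
def reader_options(dbtable=None, query=None, database=None):
    """Return the read options for either a `dbtable` or a `query` read."""
    # Assumption: exactly one of the two options is given.
    if (dbtable is None) == (query is None):
        raise ValueError("specify exactly one of dbtable or query")
    if dbtable is not None:
        return {"dbtable": dbtable}
    # A query read also needs the database option.
    if database is None:
        raise ValueError("the query option requires the database option")
    return {"database": database, "query": query}


opts = reader_options(query="SELECT * FROM table_name", database="database_name")

# With a live SparkSession named `spark`:
# df = spark.read.format("singlestore").options(**opts).load()
```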

## Example 8: Use Save Modes

Save operations can optionally take a `SaveMode` setting. Save modes specify how to handle existing data.

Note that these save modes do not use any locking and are not atomic.

1. `SaveMode.Append` means that when saving a DataFrame to a data source, if the data/table already exists, the save operation is expected to append the contents of the DataFrame to the existing data.

2. `SaveMode.Overwrite` means that when saving a DataFrame to a data source, if the data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.

   `Overwrite` mode depends on the `overwriteBehavior` option. See [Merge on Save](https://docs.singlestore.com/db/v9.1/load-data/integrate-with-singlestore/load-data-from-spark/examples/#UUID-b97058f6-2ffe-8d72-364c-e0654eb5415f.md) for more information.

3. `SaveMode.ErrorIfExists` means that when saving a DataFrame to a data source, if the data already exists, an exception is expected to be thrown.

4. `SaveMode.Ignore` means that when saving a DataFrame to a data source, if the data already exists, the save operation is expected to append the contents of the DataFrame to the existing data and ignore all duplicate key errors.

Here's an example:

```scala
df.write
    .format("singlestore")
    .mode(SaveMode.Append)
    .save("foo.bar")
```
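
To make the four modes concrete, here is a toy model in plain Python that treats a table as a dictionary keyed by primary key. It is only a sketch of the descriptions above, not the connector's implementation: the `save` function is hypothetical, it does not model locking or partial failures, and it assumes that `Append` surfaces a duplicate key as an error.

```python
def save(table, rows, mode):
    """Return the table contents after saving `rows` with the given mode.

    `table` is a dict keyed by primary key, or None if the table does not exist.
    `rows` is a list of (key, value) pairs.
    """
    if mode == "overwrite":
        return dict(rows)  # existing data is replaced
    if mode == "errorifexists":
        if table is not None:
            raise RuntimeError("table already exists")
        return dict(rows)
    if mode not in ("append", "ignore"):
        raise ValueError(f"unknown mode: {mode}")
    result = dict(table) if table is not None else {}
    for key, value in rows:
        if key in result:
            if mode == "ignore":
                continue  # duplicate key error is ignored, existing row is kept
            raise KeyError(f"duplicate key: {key}")  # assumption for append
        result[key] = value
    return result


table = {1: "Alice"}
appended = save(table, [(2, "Bob")], "append")
ignored = save(table, [(1, "Zed"), (2, "Bob")], "ignore")
overwritten = save(table, [(2, "Bob")], "overwrite")
```

In this model, `ignored` keeps the original row for key `1` and adds key `2`, while `overwritten` contains only key `2`.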

## Example 9: Merge on Save

When saving DataFrames or datasets to SingleStore, you can manage how `SaveMode.Overwrite` is interpreted by the connector via the `overwriteBehavior` option. This option can take one of the following values:

1. `dropAndCreate` (default): Drop and create the table before writing new values.

2. `truncate`: Truncate the table before writing new values.

3. `merge`: Replace existing rows with new rows by matching on the primary key. (Use this option only if you need to fully rewrite existing rows with new ones. If you need to specify some rule for the update, use the `onDuplicateKeySQL` option instead.)

All these options are case-insensitive. Here's an example:

Suppose you have the following table `t`, and the `Id` column is the primary key.

```sql
SELECT * FROM t;

```

| Id | Name    | Age |
| -- | ------- | --- |
| 1  | Alice   | 20  |
| 2  | Bob     | 25  |
| 3  | Charlie | 30  |

If you save the following DataFrame with `overwriteBehavior = merge`:

| Id | Name     | Age |
| -- | -------- | --- |
| 2  | Daniel   | 22  |
| 3  | Eve      | 27  |
| 4  | Franklin | 35  |

```scala
df.write
    .format("singlestore")
    .option("overwriteBehavior", "merge")
    .mode(SaveMode.Overwrite)
    .save("db.t")

```

After the save is complete, the rows with **Id=2** and **Id=3** are overwritten with the new rows, and the row with **Id=4** is inserted. The row with **Id=1** is not touched and still exists in the result. The table looks like this:

```sql
SELECT * FROM t;

```

| Id | Name     | Age |
| -- | -------- | --- |
| 1  | Alice    | 20  |
| 2  | Daniel   | 22  |
| 3  | Eve      | 27  |
| 4  | Franklin | 35  |
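
The merge semantics can be reproduced in plain Python with the same data, treating each table as a dictionary keyed by `Id`. This is only an illustration of the result, not how the connector performs the merge, and the `merge` function is a hypothetical name.

```python
# Existing table t and the incoming DataFrame, keyed by the primary key Id.
table = {1: ("Alice", 20), 2: ("Bob", 25), 3: ("Charlie", 30)}
incoming = {2: ("Daniel", 22), 3: ("Eve", 27), 4: ("Franklin", 35)}


def merge(existing, new_rows):
    """Replace rows whose key matches and insert the rest; keep all other rows."""
    merged = dict(existing)
    merged.update(new_rows)
    return merged


result = merge(table, incoming)
for key in sorted(result):
    print(key, *result[key])
# 1 Alice 20
# 2 Daniel 22
# 3 Eve 27
# 4 Franklin 35
```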

## Example 10: Retrieve the Number of Written Rows from taskMetrics

The following example shows how to add a listener and get the number of written rows.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Print the number of records written by each task as it finishes.
spark.sparkContext.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    println("Task id: " + taskEnd.taskInfo.id)
    println("Records written: " + taskEnd.taskMetrics.outputMetrics.recordsWritten)
  }
})

df.write.format("singlestore").save("example")
```

## Example 11: Specify Keys for Tables Created by the Spark Connector

When creating a table, the `singlestore-spark-connector` reads the options prefixed with `tableKey`. These options must be formatted in a specific way in order to correctly specify the keys.

> **📝 Note**: The default table type is a columnstore. To create a rowstore table instead, enable the `createRowstoreTable` option.

Here's an example:

```scala
df.write
    .format("singlestore")
    .option("tableKey.primary", "id")
    .option("tableKey.key.created_firstname", "created, firstname")
    .option("tableKey.unique", "username")
    .mode(SaveMode.Overwrite)
    .save("foo.bar") // in format: database.table

```

In this example, we create three keys:

1. A primary key on the `id` column.

2. A regular key on the combination of the `created` and `firstname` columns, with the key name `created_firstname`.

3. A unique key on the `username` column.

Any key can optionally have a name; specify it after the key type. Key names must be unique. To change the default columnstore sort key, you can specify it explicitly:

```scala
df.write
    .option("tableKey.columnstore", "id")

```

You can also customize the shard key, for example:

```scala
df.write
    .option("tableKey.shard", "id, lastname")

```
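
The option names above follow the pattern `tableKey.<type>` with an optional `.<name>` suffix, and the value is a comma-separated column list. As a minimal sketch, the options could be generated programmatically; `table_key_option` is a hypothetical helper, not part of the connector.

```python
def table_key_option(key_type, columns, name=None):
    """Return the (option name, option value) pair for one table key."""
    option = f"tableKey.{key_type}"
    if name is not None:
        option += f".{name}"  # the optional key name follows the key type
    return option, ", ".join(columns)


options = dict([
    table_key_option("primary", ["id"]),
    table_key_option("key", ["created", "firstname"], name="created_firstname"),
    table_key_option("unique", ["username"]),
    table_key_option("shard", ["id", "lastname"]),
])

# With an existing PySpark DataFrame named `df`:
# df.write.format("singlestore").options(**options).mode("overwrite").save("foo.bar")
```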

## Example 12: Insert Rows into the Table with `ON DUPLICATE KEY UPDATE`

When updating a rowstore table, it is possible to insert rows via the `ON DUPLICATE KEY UPDATE` option. For example:

```scala
df.write
    .format("singlestore")
    .option("onDuplicateKeySQL", "age = age + 1")
    .option("insertBatchSize", 300)
    .mode(SaveMode.Append)
    .save("foo.bar")
```

As a result of this query, all new rows are appended without changes. If a row with the same `PRIMARY KEY` or `UNIQUE` index already exists, the corresponding `age` value is increased by 1.

When you use `ON DUPLICATE KEY UPDATE`, all rows of the DataFrame are split into batches, and every insert query can have a maximum of `insertBatchSize` rows.
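
To illustrate the size limit only, the following sketch splits 1,000 rows into batches of at most 300, matching the `insertBatchSize` value in the example above. It is plain Python with a hypothetical `batches` helper and says nothing about how the connector distributes rows across partitions.

```python
def batches(rows, insert_batch_size):
    """Yield consecutive slices of `rows` with at most `insert_batch_size` items."""
    for start in range(0, len(rows), insert_batch_size):
        yield rows[start:start + insert_batch_size]


rows = list(range(1000))
sizes = [len(batch) for batch in batches(rows, 300)]
print(sizes)  # [300, 300, 300, 100]
```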

## Use the `onDuplicateKeySQL` Setting to Perform a Partial Update

You can use the `onDuplicateKeySQL` setting to do a partial update on rows matching a primary key. For example, consider a DataFrame `df`, with a primary key `id`, created and loaded into SingleStore from a Spark DataFrame.

```scala
df.write
      .format("singlestore")
      .option("tableKey.primary", "id")
      .save("test")
spark.read.format("singlestore").load("test").show()

```

```output

+---+----+------------+
| id|data|dataToUpdate|
+---+----+------------+
|  2|   2|           2|
|  1|   1|           1|
|  3|   3|           3|
+---+----+------------+

```

In this case, the user only wants to update the column `dataToUpdate`, but not the column `data`. To update both, use the `overwriteBehavior` setting instead, and set it to `merge`.

First, create a sample DataFrame that holds the data which reflects the updates to the table. Note how both columns `data` and `dataToUpdate` in this updated DataFrame are different compared to the originally created object.

```scala
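// createDF is not part of core Spark; this example assumes it comes from
// the spark-daria library's SparkSessionExt extension methods.
import com.github.mrpowers.spark.daria.sql.SparkSessionExt._
import org.apache.spark.sql.types.IntegerType

val updateDf = spark.createDF(
      List(
        (1, -1, 20),
        (2, -1, 100)
      ),
      List(
        ("id", IntegerType, true),
        ("data", IntegerType, true),
        ("dataToUpdate", IntegerType, true)
      )
)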
```

The following command updates only the `dataToUpdate` column in the SingleStore table, not the `data` column.

```scala
updateDf.write
      .format("singlestore")
      .option("onDuplicateKeySQL", "dataToUpdate = VALUES(dataToUpdate)")
      .mode(SaveMode.Append)
      .save("test")
spark.read.format("singlestore").load("test").show()

```

```output

+---+----+------------+
| id|data|dataToUpdate|
+---+----+------------+
|  2|   2|         100|
|  1|   1|          20|
|  3|   3|           3|
+---+----+------------+

```
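
The effect of `dataToUpdate = VALUES(dataToUpdate)` can be mimicked in plain Python with the same data. This is a sketch of the resulting values only, not of the SQL the connector generates, and the `upsert` function is a hypothetical name.

```python
# Table contents before the update, keyed by the primary key id.
table = {
    1: {"data": 1, "dataToUpdate": 1},
    2: {"data": 2, "dataToUpdate": 2},
    3: {"data": 3, "dataToUpdate": 3},
}
updates = [
    {"id": 1, "data": -1, "dataToUpdate": 20},
    {"id": 2, "data": -1, "dataToUpdate": 100},
]


def upsert(table, rows, update_columns):
    """Insert new rows; for existing keys, overwrite only `update_columns`."""
    for row in rows:
        key = row["id"]
        if key in table:
            for column in update_columns:
                table[key][column] = row[column]
        else:
            table[key] = {c: v for c, v in row.items() if c != "id"}
    return table


upsert(table, updates, ["dataToUpdate"])
for key in sorted(table):
    print(key, table[key]["data"], table[key]["dataToUpdate"])
# 1 1 20
# 2 2 100
# 3 3 3
```

The `data` column keeps its original values because it is not listed in the update rule, which matches the output above.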

***

Modified at: November 27, 2024

Source: [/db/v9.1/load-data/integrate-with-singlestore/load-data-from-spark/examples/](https://docs.singlestore.com/db/v9.1/load-data/integrate-with-singlestore/load-data-from-spark/examples/)

