Parallel Read Support

You can enable parallel reads via the enableParallelRead option. A parallel read creates multiple Spark tasks, which can drastically improve performance in some cases. Here's an example:

spark.read.format("singlestore")
  .option("enableParallelRead", "automatic")
  .option("parallelRead.Features", "readFromAggregatorsMaterialized,readFromAggregators")
  .option("parallelRead.repartition", "true")
  .option("parallelRead.repartition.columns", "a, b")
  .option("parallelRead.tableCreationTimeoutMS", "1000")
  .load("db.table")

enableParallelRead Modes

The enableParallelRead option can have one of the following values:

  • disabled: Disables parallel reads and performs non-parallel reads.

  • automaticLite: Performs parallel reads if at least one parallel read feature specified in parallelRead.Features is supported. Otherwise, performs a non-parallel read. In automaticLite mode, once an outer sorting operation (for example, a nested SELECT statement where sorting is done in a top-level SELECT) is pushed down to SingleStore, a non-parallel read is used.

  • automatic: Performs parallel reads if at least one parallel read feature specified in parallelRead.Features is supported. Otherwise, performs a non-parallel read. In automatic mode, the singlestore-spark-connector is unable to push down an outer sorting operation into SingleStore; final sorting is done on the Spark side.

  • forced: Performs parallel reads if at least one parallel read feature specified in parallelRead.Features is supported. Otherwise, it returns an error. In forced mode, the singlestore-spark-connector is unable to push down an outer sorting operation into SingleStore; final sorting is done on the Spark side.

Note

By default, enableParallelRead is set to automaticLite.
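The mode can also be set once for the entire Spark session rather than per read. The following sketch assumes the connector honors options set with the spark.datasource.singlestore. prefix:

// Assumption: the connector picks up options set under the spark.datasource.singlestore. prefix.
spark.conf.set("spark.datasource.singlestore.enableParallelRead", "forced")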

Parallel Read Features

The SingleStore Spark Connector supports the following parallel read features:

  • readFromAggregators

  • readFromAggregatorsMaterialized

  • readFromLeaves

Note

SingleStore Helios supports only the readFromAggregators and readFromAggregatorsMaterialized features. To avoid changes if the workload is later migrated to the cloud, use only these two features for self-managed clusters as well.

The connector uses the first feature specified in parallelRead.Features that meets all the requirements. The requirements for each feature are specified below. By default, the connector uses the readFromAggregators feature. You can repartition the result set for the readFromAggregators and readFromAggregatorsMaterialized features. See Parallel Read Repartitioning for more information.
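For example, a read configured as follows (a minimal sketch; the database and table names are placeholders) lists readFromLeaves first, so the connector uses it whenever the generated query can be pushed down to the leaf nodes and falls back to readFromAggregators otherwise:

spark.read.format("singlestore")
  .option("enableParallelRead", "automaticLite")
  .option("parallelRead.Features", "readFromLeaves,readFromAggregators")
  .load("example_db.example_table")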

readFromAggregators

When this feature is used, the number of partitions in the resulting DataFrame is the lesser of the number of partitions in the SingleStore database and the Spark parallelism level (i.e., the sum of spark.executor.cores/spark.task.cpus across all executors). You can limit the number of partitions in the resulting DataFrame using the parallelRead.maxNumPartitions option. To use this feature, all the tasks must start reading at the same time. Hence, the parallelism level of the Spark cluster must be greater than the number of partitions in the resulting DataFrame.

Use the parallelRead.tableCreationTimeoutMS option to specify a timeout for result table creation.
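As an illustration only (the database, table, and numeric values are placeholders), a read that relies on this feature might be configured as follows:

spark.read.format("singlestore")
  .option("database", "example_db")                       // or pass "example_db.example_table" to load()
  .option("enableParallelRead", "forced")
  .option("parallelRead.Features", "readFromAggregators")
  .option("parallelRead.maxNumPartitions", "8")           // cap the number of partitions in the resulting DataFrame
  .option("parallelRead.tableCreationTimeoutMS", "5000")  // wait up to 5 seconds for result table creation
  .load("example_table")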

Requirements

To use this feature, the following requirements must be met:

  • SingleStore version 7.5+

  • SingleStore Spark Connector version 3.2+

  • Either the database option is set, or the database name is specified in the load option

  • SingleStore parallel read functionality supports the generated query

readFromAggregatorsMaterialized

When using this feature, the number of partitions in the resulting DataFrame is the same as the number of partitions in the SingleStore database. You can limit the number of partitions in the resulting DataFrame using the parallelRead.maxNumPartitions option. This feature is very similar to the readFromAggregators feature; the only difference is that readFromAggregatorsMaterialized uses the MATERIALIZED option to create the result table. When this feature is used, the reading tasks do not have to start at the same time, so the parallelism level of the Spark cluster does not affect the reading tasks. However, using the MATERIALIZED option may cause a query to fail if SingleStore does not have enough memory to materialize the result set. Refer to Read Query Results in Parallel for more information.

Use the parallelRead.materializedTableCreationTimeoutMS option to specify a timeout for materialized result table creation.
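A sketch of a read that uses this feature (placeholder names and values) could look like this:

spark.read.format("singlestore")
  .option("enableParallelRead", "forced")
  .option("parallelRead.Features", "readFromAggregatorsMaterialized")
  .option("parallelRead.materializedTableCreationTimeoutMS", "10000")  // wait up to 10 seconds for the materialized result table
  .load("example_db.example_table")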

Requirements

To use this feature, the following requirements must be met:

  • SingleStore version 7.5+

  • SingleStore Spark Connector version 3.2+

  • Either the database option is set, or the database name is specified in the load option

  • SingleStore parallel read functionality supports the generated query

readFromLeaves

When this feature is used, the singlestore-spark-connector skips the transaction layer and reads directly from partitions on the leaf nodes. Hence, each individual read task sees an independent version of the database's distributed state. If other queries are run on the database during the read, they may affect the current read operation.

This feature supports only those query shapes that do not perform any operation on the aggregator and can be pushed down to the leaf nodes. To determine whether a specific query is pushed down to the leaf nodes, query the DataFrame for the number of partitions using df.rdd.getNumPartitions. If this value is greater than 1, the query is being read in parallel from the leaf nodes. By default, the number of partitions in the resulting DataFrame is equal to the number of partitions in the SingleStore database. You can limit the number of partitions in the resulting DataFrame using the parallelRead.maxNumPartitions option.
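For example, the following sketch (placeholder names; the filter is assumed to be pushable to the leaf nodes) checks whether a readFromLeaves read is actually parallel:

val df = spark.read.format("singlestore")
  .option("enableParallelRead", "forced")
  .option("parallelRead.Features", "readFromLeaves")
  .load("example_db.example_table")
  .filter("a > 10")  // a simple filter that is assumed to be pushed down to the leaves

// More than one partition indicates a parallel read from the leaf nodes.
println(df.rdd.getNumPartitions)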

Requirements

To use this feature, the following requirements must be met:

  • Either the database option is set, or the database name is specified in the load option

  • The username and password provided to the singlestore-spark-connector must be uniform across all the nodes in the cluster, because parallel reads require consistent authentication and connectible leaf nodes

  • The hostnames and ports listed by SHOW LEAVES must be directly connectible from Spark

  • The generated query can be pushed down to the leaf nodes

Parallel Read Repartitioning

You can repartition the result using the parallelRead.repartition option for the readFromAggregators and readFromAggregatorsMaterialized features to ensure that each task reads approximately the same amount of data. For queries with top-level LIMIT clauses, this option helps distribute the read across multiple partitions so that all the rows do not end up in a single partition.
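For example (a minimal sketch with placeholder names), a read with a top-level limit can be configured to repartition its result on the id column:

spark.read.format("singlestore")
  .option("enableParallelRead", "automatic")
  .option("parallelRead.Features", "readFromAggregators")
  .option("parallelRead.repartition", "true")
  .option("parallelRead.repartition.columns", "id")  // repartition the result set on the "id" column
  .load("example_db.example_table")
  .limit(100)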

Last modified: February 23, 2024
