# Query Tuning

## Things to Consider Before Query Tuning

Before creating your database, it's important to consider the workload that will be placed upon it. The three variables that can be optimized to increase performance are concurrency, throughput, and latency.

*Concurrency* is the number of queries that are executing on the database at the same time. If your workload is primarily lightweight inserts and updates on your table, concurrency will likely be low. However, if a large number of those operations occur simultaneously, they can negatively impact overall performance.

*Throughput* is the number of queries that are executed per unit of time. It depends on both concurrency and latency: the more queries that run concurrently, and the faster each one completes, the higher the throughput. If your workload is primarily lightweight inserts and updates on a table, those queries can perhaps be encapsulated within stored procedures to prevent repeated compilation.
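As a minimal sketch, a lightweight insert could be wrapped in a stored procedure as shown below. The table `sensor_readings` and the procedure `insert_reading` are hypothetical names used only for illustration:

```sql
-- Hypothetical table and procedure names, for illustration only.
CREATE ROWSTORE TABLE sensor_readings (sensor_id INT, reading INT);

DELIMITER //
-- Wrap a lightweight insert in a procedure that clients can call repeatedly.
CREATE OR REPLACE PROCEDURE insert_reading(p_sensor_id INT, p_reading INT) AS
BEGIN
    INSERT INTO sensor_readings (sensor_id, reading) VALUES (p_sensor_id, p_reading);
END //
DELIMITER ;

-- Each call runs the same procedure with different arguments.
CALL insert_reading(1, 42);
CALL insert_reading(2, 17);
```

The `DELIMITER` lines are needed only in clients, such as the MySQL command-line client, that would otherwise treat the `;` inside the procedure body as the end of the statement.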

*Latency* is the query execution time. This variable can be optimized by following query performance best practices, such as choosing the best database schema, table type, and keys for your business needs and workloads. Issues beyond the database engine's control, such as poor network connectivity, can also increase latency.
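One general way to relate the three variables is Little's law, a standard queueing result rather than a SingleStore-specific formula. Assuming the workload is in a steady state, with $N$ the average number of concurrently executing queries, $R$ the average latency, and $X$ the throughput, and using illustrative numbers:

$$
X = \frac{N}{R}
\qquad\text{for example}\qquad
X = \frac{50\ \text{queries}}{0.010\ \text{s}} = 5000\ \text{queries/s}
$$

With the same 50 concurrent queries, doubling the average latency to 20 ms halves the throughput to 2,500 queries per second.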

Lastly, cluster size determines the throughput and performance of the databases in the cluster. A larger cluster typically results in better query performance, lower latencies, and a higher number of concurrent queries.

You can use the workload manager and resource limits to manage cluster workload and to define resource pools that can improve query performance. See [Use the Workload Manager](https://docs.singlestore.com/db/v9.1/user-and-cluster-administration/use-the-workload-manager-and-set-resource-limits/use-the-workload-manager.md) and [Set Resource Limits](https://docs.singlestore.com/db/v9.1/user-and-cluster-administration/use-the-workload-manager-and-set-resource-limits/set-resource-limits.md).
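As a minimal sketch, assuming resource pools are available in your deployment, a pool that caps memory and query time for a reporting workload might look like the following. The pool name and limit values are hypothetical. See the linked pages for the full list of settings, their units, and any prerequisites:

```sql
-- Hypothetical pool name and limits; choose values that fit your workload.
CREATE RESOURCE POOL reporting_pool WITH MEMORY_PERCENTAGE = 40, QUERY_TIMEOUT = 60;

-- Run subsequent queries in this session under the new pool.
SET resource_pool = reporting_pool;

-- List the resource pools defined on the cluster.
SHOW RESOURCE POOLS;
```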

## Tradeoff between Aggregator Count and Leaf Count

When high concurrency is of the utmost importance, it is best to have more aggregator nodes than leaf nodes, since aggregators receive the queries sent by users.

![](https://images.contentstack.io/v3/assets/bltac01ee6daa3a1e14/bltfe8fadf66cf83371/6a2c430403b373f4d9908168/High_Concurrency_diagram-IfHKrC.png)

Conversely, if low latency is more important than high concurrency, it is better to have more leaf nodes (and therefore more partitions), since the data is stored on leaf nodes.

![](https://images.contentstack.io/v3/assets/bltac01ee6daa3a1e14/blt17f391ba591dfc6d/6a2c43170fc2f46c2c3f06a6/Low_Latency_diagram-Q8AqCC.png)

To adjust your aggregators and leaf node counts, please see the [Cluster Management Commands](https://docs.singlestore.com/db/v9.1/reference/sql-reference/cluster-management-commands.md) page.
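Before changing the topology, you can check how many nodes of each type the cluster currently has. These commands only report on the cluster. Adding or removing nodes is covered on the linked page:

```sql
-- List the aggregator nodes in the cluster.
SHOW AGGREGATORS;

-- List the leaf nodes in the cluster.
SHOW LEAVES;
```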

## Run ANALYZE

The `ANALYZE` command collects data statistics on a table to facilitate accurate query optimization. This is especially important for optimizing complex analytical queries. You should `ANALYZE` your tables after inserting, updating, or deleting large numbers of rows (30% of your table row count is a good rule of thumb). See the [Statistics and Sampling](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/statistics-and-sampling.md) topic for more information.
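For example, after a large batch load into a table (here a hypothetical table named `my_table`), you could refresh its statistics with:

```sql
-- Refresh statistics after inserting, updating, or deleting a large share of rows.
-- my_table is a placeholder name.
ANALYZE TABLE my_table;
```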

## Look for Hot Queries

You can analyze hot queries by using the [Workload Monitoring](https://docs.singlestore.com/db/v9.1/reference/singlestore-tools-reference/singlestore-studio/singlestore-studio-workload-monitoring.md) and [SingleStore Visual Explain](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/singlestore-visual-explain.md) features, or by running the [SHOW PLANCACHE](https://docs.singlestore.com/db/v9.1/reference/sql-reference/show-commands/show-plancache.md) command.

For example, you can use a query like the following to see the `SELECT` queries with the highest average execution time. You can modify the query to filter by database, look for specific types of queries, or find the queries that use the most total time or the most memory.

```sql
SELECT Database_Name, Query_Text, Commits, Execution_Time, Execution_Time/Commits AS avg_time_ms, Average_Memory_Use
FROM information_schema.plancache
WHERE Query_Text LIKE 'select%'
ORDER BY avg_time_ms DESC;

```
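As sketches of the variations mentioned above, the following queries use only the `information_schema.plancache` columns from the previous example. The database name `test1` is a placeholder:

```sql
-- Queries in one database that used the most total execution time.
-- 'test1' is a placeholder database name.
SELECT Database_Name, Query_Text, Commits, Execution_Time, Average_Memory_Use
FROM information_schema.plancache
WHERE Database_Name = 'test1'
ORDER BY Execution_Time DESC
LIMIT 10;

-- Queries with the highest average memory use across all databases.
SELECT Database_Name, Query_Text, Commits, Average_Memory_Use
FROM information_schema.plancache
ORDER BY Average_Memory_Use DESC
LIMIT 10;
```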

## Impact of a Hash Index on a Column with a Unique Key, Sort Key, or Shard Key

A unique key implicitly requires an index: it is enforced using a hash index unless it is created as an unenforced unique key.

The primary benefit of a sort key is segment elimination. If a lookup that matches on the sort key still reads up to 1 million rows to find a single row, a hash index on the same column(s) can find the row while reading only 4096 rows. However, this depends on the cardinality of the column(s) in the key relative to the table size: if a million rows in each partition match the sort key value, the hash index is not useful.

A shard key lets the engine determine which partition can own a given value. Because spreading the different values of a column (x) across partitions decreases the per-partition cardinality, a shard key has some impact on lookups, but it is not a replacement for other keys.
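As a minimal sketch, a columnstore table that combines these keys might be declared as follows. This assumes a version in which columnstore tables support hash indexes declared with `USING HASH` and unique keys that include the shard key columns. The table and column names are hypothetical:

```sql
-- Hypothetical columnstore table; names are illustrative only.
CREATE TABLE qtune_hash (
    id BIGINT NOT NULL,
    user_id BIGINT NOT NULL,
    created_at DATETIME NOT NULL,
    SORT KEY (created_at),        -- segment elimination for time-range filters
    SHARD KEY (id),               -- equality filters on id route to one partition
    UNIQUE KEY (id) USING HASH,   -- enforced unique key backed by a hash index
    KEY (user_id) USING HASH      -- hash index for point lookups on a non-sort-key column
);

-- Check how a point lookup on user_id is planned.
EXPLAIN SELECT * FROM qtune_hash WHERE user_id = 42;
```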

## Check if Queries are Using Indexes

One important query performance consideration is adding appropriate [indexes](https://docs.singlestore.com/db/v9.1/create-a-database/other-schema-concepts.md) for your queries. You can use [EXPLAIN](https://docs.singlestore.com/db/v9.1/reference/sql-reference/data-manipulation-language-dml/explain.md) to see whether queries are using indexes. The following examples show how to identify cases where an index can greatly improve query performance.

## Indexes for Filters

Consider the following rowstore table:

```sql
CREATE ROWSTORE TABLE qtune_1 (a INT, b INT);

```

Suppose we are running queries like:

```sql
SELECT * FROM qtune_1 WHERE a = 3;

```

`EXPLAIN` shows that running the query against the current table schema requires a full `TableScan`, which scans all the rows of `qtune_1`. This is unnecessarily expensive if only a small fraction of the rows have `a` equal to 3.

```sql
EXPLAIN SELECT * FROM qtune_1 WHERE a = 3;

```

```output

+------------------------------------------------------------------+
| EXPLAIN                                                          |
+------------------------------------------------------------------+
| Gather partitions:all alias:remote_0 parallelism_level:partition |
| Project [qtune_1.a, qtune_1.b]                                   |
| Filter [qtune_1.a = 3]                                           |
| TableScan test1.qtune_1 table_type:sharded_rowstore              |
+------------------------------------------------------------------+ 

```

If an index is added, the query will instead use an `IndexRangeScan` on the key `a`:

```sql
ALTER TABLE qtune_1 ADD INDEX (a);
EXPLAIN SELECT * FROM qtune_1 WHERE a = 3;

```

```output

+----------------------------------------------------------------------------------+
| EXPLAIN                                                                          |
+----------------------------------------------------------------------------------+
| Gather partitions:all alias:remote_0 parallelism_level:partition                 |
| Project [qtune_1.a, qtune_1.b]                                                   |
| IndexRangeScan test1.qtune_1, KEY a (a) scan:[a = 3] table_type:sharded_rowstore |
+----------------------------------------------------------------------------------+

```

A query that filters on both `a` and `b` is unable to take advantage of the filtering on `b` to reduce the rows that need to be scanned. As demonstrated below, the scan uses `a = 3` only.

```sql
EXPLAIN SELECT * FROM qtune_1 WHERE a = 3 AND b = 4;

```

```output

+----------------------------------------------------------------------------------+
| EXPLAIN                                                                          |
+----------------------------------------------------------------------------------+
| Gather partitions:all alias:remote_0 parallelism_level:partition                 |
| Project [qtune_1.a, qtune_1.b]                                                   |
| Filter [qtune_1.b = 4]                                                           |
| IndexRangeScan test1.qtune_1, KEY a (a) scan:[a = 3] table_type:sharded_rowstore |
+----------------------------------------------------------------------------------+         

```

Adding a composite index on `(a, b)` allows the query to scan more selectively:

```sql
ALTER TABLE qtune_1 ADD INDEX (a, b);
EXPLAIN SELECT * FROM qtune_1 WHERE a = 3 AND b = 4;

```

```output

+-------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                         |
+-------------------------------------------------------------------------------------------------+
| Gather partitions:all alias:remote_0 parallelism_level:partition                                |
| Project [qtune_1.a, qtune_1.b]                                                                  |
| IndexRangeScan test1.qtune_1, KEY a_2 (a, b) scan:[a = 3 AND b = 4] table_type:sharded_rowstore |
+-------------------------------------------------------------------------------------------------+
```

For columnstore tables, it's a best practice to set a sort key on the table to take advantage of segment elimination.

Segment elimination occurs when filtering on the sort key. At query execution time, the minimum/maximum value metadata for each segment is used to determine whether the segment can match a filter. If it cannot, the segment is skipped entirely and none of its data is examined.

Consider a columnstore table with an explicit sort key:

```sql
CREATE TABLE qtune_2 (a int, b int, SORT KEY(a));
```

Suppose the following query is run:

```sql
SELECT * FROM qtune_2 WHERE a = 3;
```

`EXPLAIN` shows that the query uses a `ColumnStoreScan` with a `ColumnStoreFilter` on `a`: only the segments whose values for column `a` can include `3` are scanned, and all other segments are skipped. Furthermore, because the sort key keeps segments ordered by their min and max values, the engine can easily locate the segments that contain `3`:

```sql
EXPLAIN SELECT * FROM qtune_2 WHERE a = 3;

```

```output

+------------------------------------------------------------------------------+
| EXPLAIN                                                                      |
+------------------------------------------------------------------------------+
| Gather partitions:all alias:remote_0 parallelism_level:segment               |
| Project [qtune_2.a, qtune_2.b]                                               |
| ColumnStoreFilter [qtune_2.a = 3]                                            |
| ColumnStoreScan test1.qtune_2, SORT KEY a (a) table_type:sharded_columnstore |
+------------------------------------------------------------------------------+

```
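To see the metadata that drives segment elimination, you can look at the per-segment minimum and maximum values. This sketch assumes your version exposes the `information_schema.COLUMNAR_SEGMENTS` view with the `DATABASE_NAME`, `TABLE_NAME`, `COLUMN_NAME`, `SEGMENT_ID`, `MIN_VALUE`, `MAX_VALUE`, and `ROWS_COUNT` columns. Recently inserted rows that are still in the table's in-memory rowstore segment may not appear yet:

```sql
-- Per-segment min/max metadata for column a of qtune_2 in the current database.
-- For WHERE a = 3, any segment whose [MIN_VALUE, MAX_VALUE] range excludes 3 can be skipped.
SELECT SEGMENT_ID, MIN_VALUE, MAX_VALUE, ROWS_COUNT
FROM information_schema.COLUMNAR_SEGMENTS
WHERE DATABASE_NAME = DATABASE()
  AND TABLE_NAME = 'qtune_2'
  AND COLUMN_NAME = 'a'
ORDER BY SEGMENT_ID;
```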

## Indexes for `GROUP BY` and `ORDER BY`

Indexes can also improve the performance of queries with `GROUP BY` and `ORDER BY` clauses.

Consider this rowstore table:

```sql
CREATE ROWSTORE TABLE qtune_3 (a int, b int);

```

Executing the following query requires a `HashGroupBy`. SingleStore builds a hash table with an entry for each group of `a`:

```sql
EXPLAIN SELECT a, SUM(b) FROM qtune_3 GROUP BY a;

```

```output

+-------------------------------------------------------------------------------------+
| EXPLAIN                                                                             |
+-------------------------------------------------------------------------------------+
| Project [remote_0.a, `SUM(b)`] est_rows:1                                           |
| HashGroupBy [SUM(remote_0.`SUM(b)`) AS `SUM(b)`] groups:[remote_0.a]                |
| Gather partitions:all est_rows:1 alias:remote_0 parallelism_level:partition         |
| Project [qtune_3.a, `SUM(b)`] est_rows:1                                            |
| HashGroupBy [SUM(qtune_3.b) AS `SUM(b)`] groups:[qtune_3.a]                         |
| TableScan test1.qtune_3 table_type:sharded_rowstore est_table_rows:1 est_filtered:1 |
+-------------------------------------------------------------------------------------+

```

However, with an index on `a`, SingleStore can execute the query with a `StreamingGroupBy` operation because by scanning the index on `a`, it can process all elements of a group consecutively.

```sql
ALTER TABLE qtune_3 ADD INDEX(a);
EXPLAIN SELECT a, SUM(b) FROM qtune_3 GROUP BY a;

```

```output

+------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                        |
+------------------------------------------------------------------------------------------------+
| Project [remote_0.a] est_rows:1                                                                |
| HashGroupBy [] groups:[remote_0.a]                                                             |
| Gather partitions:all est_rows:1 alias:remote_0 parallelism_level:partition                    |
| Project [qtune_3.a] est_rows:1                                                                 |
| StreamingGroupBy [] groups:[qtune_3.a]                                                         |
| TableScan test1.qtune_3, KEY a (a) table_type:sharded_rowstore est_table_rows:1 est_filtered:1 |
+------------------------------------------------------------------------------------------------+

```

For a columnstore table, the column(s) in the `GROUP BY` clause must match the `SORT KEY` for a `StreamingGroupBy` to be considered.
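To check this on a columnstore table such as `qtune_2` (which has `SORT KEY(a)`), you can compare the plans for grouping on the sort key column and on another column, and look for `StreamingGroupBy` versus `HashGroupBy`. Because matching the sort key only makes `StreamingGroupBy` a candidate, the optimizer may still choose `HashGroupBy`:

```sql
-- GROUP BY a matches the sort key of qtune_2, so StreamingGroupBy can be considered.
EXPLAIN SELECT a, SUM(b) FROM qtune_2 GROUP BY a;

-- GROUP BY b does not match the sort key, so StreamingGroupBy is not considered.
EXPLAIN SELECT b, SUM(a) FROM qtune_2 GROUP BY b;
```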

For an `ORDER BY` on a rowstore table without an index, SingleStore needs to sort the rows:

```sql
CREATE ROWSTORE TABLE qtune_4 (a INT, b INT);

```

```sql
EXPLAIN SELECT * FROM qtune_4 ORDER BY b;

```

```output

+------------------------------------------------------------------+
| EXPLAIN                                                          |
+------------------------------------------------------------------+
| Project [remote_0.a, remote_0.b]                                 |
| Sort [remote_0.b]                                                |
| Gather partitions:all alias:remote_0 parallelism_level:partition |
| Project [qtune_4.a, qtune_4.b]                                   |
| Sort [qtune_4.b]                                                 |
| TableScan test1.qtune_4 table_type:sharded_rowstore              |
+------------------------------------------------------------------+

```

With an index, SingleStore can eliminate the need to sort:

```sql
ALTER TABLE qtune_4 ADD INDEX (b);
EXPLAIN SELECT * FROM qtune_4 ORDER BY b;

```

```output

+----------------------------------------+
| EXPLAIN                                |
+----------------------------------------+
| Project [qtune_4.a, qtune_4.b]         |
| GatherMerge [qtune_4.b] partitions:all |
| Project [qtune_4.a, qtune_4.b]         |
| TableScan test1.qtune_4, KEY b (b)     |
+----------------------------------------+

```

As mentioned above, it's a best practice to set a sort key on columnstore tables to take advantage of segment elimination on queries with `GROUP BY` and `ORDER BY` clauses.

## Fanout vs Single-Partition Queries

SingleStore’s distributed architecture takes advantage of CPUs from many servers to execute your queries. This provides extremely fast performance on aggregation queries that scan millions of rows. However, for transactional queries that select relatively few rows, it is best for each query to involve only a single partition. When a query has equality filters that completely match the shard key of the table, SingleStore can optimize it to execute on a single partition only.

Using this table as an example:

```sql
CREATE TABLE urls (
  domain varchar(128),
  path varchar(8192),
  time_sec int,
  status_code binary(3),
  ...
  shard key (domain, path, time_sec)
);

```

The following query only involves a single partition because it has equality filters on all columns of the shard key, so the rows which match can only be on a single partition. Using `EXPLAIN`, `Gather partitions:single` indicates SingleStore is using a single-partition plan.

```sql
EXPLAIN SELECT status_code FROM urls
    WHERE domain = 'youtube.com' AND path = '/watch?v=euh_uqxwk58' AND time_sec = 1;

```

```output

+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                                               |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| Gather partitions:single                                                                                                                              |
| Project [urls.status_code]                                                                                                                            |
| IndexRangeScan test2.urls, SHARD KEY domain (domain, path, time_sec) scan:[domain = "youtube.com" AND path = "/watch?v=euh_uqxwk58" AND time_sec = 1] |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+

```

The following query, which does not filter on `time_sec`, cannot be routed to a single partition. Therefore, the query must select from all partitions, as indicated by `Gather partitions:all`.

```sql
EXPLAIN SELECT status_code FROM urls
    WHERE domain = 'youtube.com' AND path = '/watch?v=euh_uqxwk58';

```

```output

+--------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                              |
+--------------------------------------------------------------------------------------------------------------------------------------+
| Project [urls.status_code]                                                                                                           |
| Gather partitions:all                                                                                                                |
| Project [urls.status_code]                                                                                                           |
| IndexRangeScan test2.urls, SHARD KEY domain (domain, path, time_sec) scan:[domain = "youtube.com" AND path = "/watch?v=euh_uqxwk58"] |
+--------------------------------------------------------------------------------------------------------------------------------------+

```

To fix this, the table `urls` could be sharded on `domain`. This would make it easier to write queries that route to a single partition. However, some domains will have more pages than other domains and this could lead to data skew. Choosing a shard key is often a balancing act: we want the least restrictive shard key possible while also ensuring that we do not have data skew. A good compromise in this case would be to shard on `(domain,path)`. For more on choosing a performant shard key, see the related step in the [Optimizing Table Data Structures](https://docs.singlestore.com/db/v9.1/create-a-database/optimizing-table-data-structures.md) guide. If these types of performance issues are common, SingleStore recommends reviewing your table data structures overall using the full guide.
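As a sketch of the `(domain, path)` compromise, the data could be copied into a new table with that shard key and then checked for skew. The table name `urls_v2` is hypothetical, the other columns of `urls` are omitted, and the skew query assumes the `information_schema.TABLE_STATISTICS` view reports a `ROWS` count per partition:

```sql
-- Hypothetical re-sharded copy of urls; other columns are omitted here.
CREATE TABLE urls_v2 (
  domain VARCHAR(128),
  path VARCHAR(8192),
  time_sec INT,
  status_code BINARY(3),
  SHARD KEY (domain, path)
);

-- Copy the existing rows into the re-sharded table.
INSERT INTO urls_v2 (domain, path, time_sec, status_code)
  SELECT domain, path, time_sec, status_code FROM urls;

-- Equality filters now cover every shard key column, so expect Gather partitions:single.
EXPLAIN SELECT status_code FROM urls_v2
    WHERE domain = 'youtube.com' AND path = '/watch?v=euh_uqxwk58';

-- Rough skew check: relative spread of row counts across partitions, as a percentage.
SELECT DATABASE_NAME, TABLE_NAME,
       FLOOR(AVG(ROWS)) AS avg_rows,
       ROUND(STDDEV(ROWS) / AVG(ROWS), 3) * 100 AS row_skew
FROM information_schema.TABLE_STATISTICS
WHERE TABLE_NAME = 'urls_v2'
GROUP BY DATABASE_NAME, TABLE_NAME;
```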

## Distributed Joins

SingleStore’s query execution architecture allows you to run arbitrary SQL queries on any table regardless of data distribution. However, you can often improve performance by optimizing your schema to minimize data movement during query execution.

## Collocating Joins

Consider the following tables:

```sql
CREATE TABLE lineitem(
    l_orderkey INT NOT NULL,
    l_linenumber INT NOT NULL,
    ...
    PRIMARY KEY(l_orderkey, l_linenumber)
);

```

```sql
CREATE TABLE orders(
    o_orderkey INT NOT NULL,
    ...
    PRIMARY KEY(o_orderkey)
);

```

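When `lineitem` and `orders` are joined with the current schema, a distributed join is performed and data is repartitioned from the `lineitem` table. Because neither table declares an explicit shard key, each is sharded on its primary key: `orders` on `o_orderkey`, but `lineitem` on `(l_orderkey, l_linenumber)`. The `lineitem` rows for a given order are therefore not guaranteed to be on the same partition as that order.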

```sql
EXPLAIN SELECT COUNT(*) FROM lineitem
    JOIN orders ON o_orderkey = l_orderkey;

```

```output

+---------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------+
| Project [`COUNT(*)`]                                                                                                            |
| Aggregate [SUM(`COUNT(*)`) AS `COUNT(*)`]                                                                                       |
| Gather partitions:all est_rows:1                                                                                                |
| Project [`COUNT(*)`] est_rows:1 est_select_cost:1762812                                                                         |
| Aggregate [COUNT(*) AS `COUNT(*)`]                                                                                              |
| NestedLoopJoin                                                                                                                  |
| |---IndexSeek test.orders, PRIMARY KEY (o_orderkey) scan:[o_orderkey = r0.l_orderkey] est_table_rows:565020 est_filtered:565020 |
| TableScan r0 storage:list stream:no                                                                                             |
| Repartition [lineitem.l_orderkey] AS r0 shard_key:[l_orderkey] est_rows:587604                                                  |
| TableScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber) est_table_rows:587604 est_filtered:587604                       |
+---------------------------------------------------------------------------------------------------------------------------------+

```

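The performance of this query can be improved by adding an explicit shard key on `l_orderkey` to the `lineitem` table. Because `orders` is sharded on `o_orderkey`, matching rows from both tables are then stored on the same partition, and the join between `lineitem` and `orders` can be performed locally, without a `Repartition` step.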
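As a sketch, assuming the existing `lineitem` table has been dropped or renamed first, the table could be recreated with the shard key declared explicitly. The other columns, elided in the original definition, are omitted here. The shard key column must be part of the primary key, which `l_orderkey` is:

```sql
-- Recreate lineitem with an explicit shard key on l_orderkey.
-- Other columns from the original definition are omitted here.
CREATE TABLE lineitem(
    l_orderkey INT NOT NULL,
    l_linenumber INT NOT NULL,
    PRIMARY KEY(l_orderkey, l_linenumber),
    SHARD KEY(l_orderkey)  -- matches the default shard key of orders (o_orderkey)
);

EXPLAIN SELECT COUNT(*) FROM lineitem
    JOIN orders ON o_orderkey = l_orderkey;
```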

```output
+--------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                            |
+--------------------------------------------------------------------------------------------------------------------+
| Project [`COUNT(*)`]                                                                                               |
| Aggregate [SUM(`COUNT(*)`) AS `COUNT(*)`]                                                                          |
| Gather partitions:all                                                                                              |
| Project [`COUNT(*)`]                                                                                               |
| Aggregate [COUNT(*) AS `COUNT(*)`]                                                                                 |
| ChoosePlan                                                                                                         |
| |   :estimate                                                                                                      |
| |       SELECT COUNT(*) AS cost FROM test.lineitem                                                                 |
| |       SELECT COUNT(*) AS cost FROM test.orders                                                                   |
| |---NestedLoopJoin                                                                                                 |
| |   |---IndexSeek test.orders, PRIMARY KEY (o_orderkey) scan:[o_orderkey = lineitem.l_orderkey]                    |
| |   TableScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber)                                                |
| +---NestedLoopJoin                                                                                                 |
|     |---IndexRangeScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber) scan:[l_orderkey = orders.o_orderkey] |
|     TableScan test.orders, PRIMARY KEY (o_orderkey)                                                                |
+--------------------------------------------------------------------------------------------------------------------+

```

## Reference Table Joins

Reference tables are replicated to every aggregator and leaf in the cluster, so they are best suited to data that is small and changes slowly. What counts as small and slowly changing depends on the application, but a rule of thumb is about 1 million rows, updated once a day.

Consider the following schema:

```sql
CREATE TABLE customer(
    c_custkey INT NOT NULL,
    c_nationkey INT NOT NULL,
    ...
    PRIMARY KEY(c_custkey),
    key(c_nationkey)
);

```

```sql
CREATE TABLE nation(
    n_nationkey INT NOT NULL,
    ...
    PRIMARY KEY(n_nationkey)
);

```

With the current schema, when we join the `customer` and `nation` tables together on `nationkey`, the `nation` table must be broadcast each time the query is run.

```sql
EXPLAIN SELECT COUNT(*) FROM customer
    JOIN nation ON n_nationkey = c_nationkey;

```

```output

+-------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                                         |
+-------------------------------------------------------------------------------------------------------------------------------------------------+
| Project [`Count(*)`]                                                                                                                            |
| Aggregate [SUM(`Count(*)`) AS `Count(*)`]                                                                                                       |
| Gather partitions:all est_rows:1                                                                                                                |
| Project [`Count(*)`] est_rows:1 est_select_cost:1860408                                                                                         |
| Aggregate [COUNT(*) AS `Count(*)`]                                                                                                              |
| NestedLoopJoin                                                                                                                                  |
| |---IndexRangeScan test.customer, KEY c_nationkey (c_nationkey) scan:[c_nationkey = r1.n_nationkey] est_table_rows:1856808 est_filtered:1856808 |
| TableScan r1 storage:list stream:no                                                                                                             |
| Broadcast [nation.n_nationkey] AS r1 est_rows:300                                                                                               |
| TableScan test.nation, PRIMARY KEY (n_nationkey) est_table_rows:300 est_filtered:300                                                            |
+-------------------------------------------------------------------------------------------------------------------------------------------------+

```

The broadcast can be avoided by making `nation` a [reference](https://docs.singlestore.com/db/v9.1/create-a-database/other-schema-concepts.md) table as it is relatively small and changes rarely. While broadcasting such a small table will likely have a negligible effect on single-query latency, repeatedly doing so can have an outsize effect on concurrent workloads.

```sql
CREATE REFERENCE TABLE nation(
    n_nationkey INT NOT NULL,
    ...
    PRIMARY KEY(n_nationkey)
);

```

```sql
EXPLAIN SELECT COUNT(*) FROM customer
    JOIN nation ON n_nationkey = c_nationkey;

```

```output

+-------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                     |
+-------------------------------------------------------------------------------------------------------------+
| Project [`Count(*)`]                                                                                        |
| Aggregate [SUM(`Count(*)`) AS `Count(*)`]                                                                   |
| Gather partitions:all                                                                                       |
| Project [`Count(*)`]                                                                                        |
| Aggregate [COUNT(*) AS `Count(*)`]                                                                          |
| ChoosePlan                                                                                                  |
| |   :estimate                                                                                               |
| |       SELECT COUNT(*) AS cost FROM test.customer                                                          |
| |       SELECT COUNT(*) AS cost FROM test.nation                                                            |
| |---NestedLoopJoin                                                                                          |
| |   |---IndexSeek test.nation, PRIMARY KEY (n_nationkey) scan:[n_nationkey = customer.c_nationkey]          |
| |   TableScan test.customer, PRIMARY KEY (c_custkey)                                                        |
| +---NestedLoopJoin                                                                                          |
|     |---IndexRangeScan test.customer, KEY c_nationkey (c_nationkey) scan:[c_nationkey = nation.n_nationkey] |
|     TableScan test.nation, PRIMARY KEY (n_nationkey)                                                        |
+-------------------------------------------------------------------------------------------------------------+

```

## Joins on the Aggregator

Consider the following schema and row counts:

```sql
CREATE TABLE customer(
    c_custkey INT NOT NULL,
    c_acctbal DECIMAL(15,2) NOT NULL,
    PRIMARY KEY(c_custkey)
);
```

```sql
CREATE TABLE orders(
    o_orderkey INT NOT NULL,
    o_custkey INT NOT NULL,
    o_orderstatus varchar(20) NOT NULL,
    PRIMARY KEY(o_orderkey),
    key(o_custkey)
);
```

```sql
SELECT COUNT(*) FROM orders;

```

```output

+----------+
| COUNT(*) |
+----------+
|   429786 |
+----------+
```

```sql
SELECT COUNT(*) FROM orders WHERE o_orderstatus = 'open';

```

```output

+----------+
| COUNT(*) |
+----------+
|     1000 |
+----------+
```

```sql
SELECT COUNT(*) FROM customer;

```

```output

+----------+
| COUNT(*) |
+----------+
|  1014726 |
+----------+
```

```sql
SELECT COUNT(*) FROM customer WHERE c_acctbal > 100.0;

```

```output

+----------+
| COUNT(*) |
+----------+
|      988 |
+----------+

```

Note that while `customer` and `orders` are fairly large, relatively few rows match when a query filters on open orders and on account balances greater than 100. As a result, when `orders` and `customer` are joined using these filters, the join can be performed on an aggregator. `EXPLAIN` shows this as a separate `Gather` operator for each of `orders` and `customer`, with a `HashJoin` operator above the `Gather` operators.

```sql
EXPLAIN SELECT o_orderkey FROM customer
    JOIN orders
    WHERE c_acctbal > 100.0 AND o_orderstatus = 'open' AND o_custkey = c_custkey;

```

```output

+------------------------------------------------------------------+
| EXPLAIN                                                          |
+------------------------------------------------------------------+
| Project [orders.o_orderkey]                                      |
| HashJoin [orders.o_custkey = customer.c_custkey]                 |
| |---TempTable                                                    |
| |   Gather partitions:all                                        |
| |   Project [orders_0.o_orderkey, orders_0.o_custkey]            |
| |   Filter [orders_0.o_orderstatus = "open"]                     |
| |   TableScan test3.orders AS orders_0, PRIMARY KEY (o_orderkey) |
| TableScan 0tmp AS customer storage:list stream:yes               |
| TempTable                                                        |
| Gather partitions:all                                            |
| Project [customer_0.c_custkey]                                   |
| Filter [customer_0.c_acctbal > 100.0]                            |
| TableScan test3.customer AS customer_0, PRIMARY KEY (c_custkey)  |
+------------------------------------------------------------------+

```

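By default, SingleStore performs this optimization when each `Gather` pulls fewer than 120,000 rows from the leaves. This threshold can be changed via the `max_subselect_aggregator_rowcount` variable. The optimization can also be disabled for an individual query via the `leaf_pushdown` hint, which forces the optimizer to perform as much work as possible on the leaf nodes. Both approaches are shown below. With `LEAF_PUSHDOWN=TRUE`, or with the threshold lowered to 500 (below the roughly 1,000 rows each `Gather` would pull here), the aggregator-side `HashJoin` disappears: `orders` is repartitioned on `o_custkey` and joined to `customer` on the leaves, followed by a single `Gather`.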

```sql
EXPLAIN SELECT WITH(LEAF_PUSHDOWN=TRUE) o_orderkey FROM customer
    JOIN orders
    WHERE c_acctbal > 100.0 AND o_orderstatus = 'open' AND o_custkey = c_custkey;

```

```output

+--------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                        |
+--------------------------------------------------------------------------------------------------------------------------------+
| Project [r0.o_orderkey]                                                                                                        |
| Gather partitions:all est_rows:11                                                                                              |
| Project [r0.o_orderkey] est_rows:11 est_select_cost:1955                                                                       |
| Filter [customer.c_acctbal > 100.0]                                                                                            |
| NestedLoopJoin                                                                                                                 |
| |---IndexSeek test3.customer, PRIMARY KEY (c_custkey) scan:[c_custkey = r0.o_custkey] est_table_rows:1013436 est_filtered:1092 |
| TableScan r0 storage:list stream:no                                                                                            |
| Repartition [orders.o_orderkey, orders.o_custkey] AS r0 shard_key:[o_custkey] est_rows:972                                     |
| Filter [orders.o_orderstatus = "open"]                                                                                         |
| TableScan test3.orders, PRIMARY KEY (o_orderkey) est_table_rows:92628 est_filtered:972                                         |
+--------------------------------------------------------------------------------------------------------------------------------+

```

```sql
SET max_subselect_aggregator_rowcount=500;
```

```sql
EXPLAIN SELECT o_orderkey FROM customer
    JOIN orders
    WHERE c_acctbal > 100.0 AND o_orderstatus = 'open' AND o_custkey = c_custkey;

```

```output

+--------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                        |
+--------------------------------------------------------------------------------------------------------------------------------+
| Project [r0.o_orderkey]                                                                                                        |
| Gather partitions:all est_rows:11                                                                                              |
| Project [r0.o_orderkey] est_rows:11 est_select_cost:1955                                                                       |
| Filter [customer.c_acctbal > 100.0]                                                                                            |
| NestedLoopJoin                                                                                                                 |
| |---IndexSeek test3.customer, PRIMARY KEY (c_custkey) scan:[c_custkey = r0.o_custkey] est_table_rows:1013436 est_filtered:1092 |
| TableScan r0 storage:list stream:no                                                                                            |
| Repartition [orders.o_orderkey, orders.o_custkey] AS r0 shard_key:[o_custkey] est_rows:972                                     |
| Filter [orders.o_orderstatus = "open"]                                                                                         |
| TableScan test3.orders, PRIMARY KEY (o_orderkey) est_table_rows:92628 est_filtered:972                                         |
+--------------------------------------------------------------------------------------------------------------------------------+
```
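The `SET` statement above changes `max_subselect_aggregator_rowcount` for the current session. You may want to check its value before experimenting and restore it afterward. The following is a minimal sketch. It assumes the variable can be read with the MySQL-compatible `SHOW VARIABLES` statement, and that the default of 120,000 stated above has not been overridden for your cluster.

```sql
-- Inspect the current threshold before changing it.
SHOW VARIABLES LIKE 'max_subselect_aggregator_rowcount';

-- Lower it to observe the leaf-side plan, as in the example above.
SET max_subselect_aggregator_rowcount=500;
EXPLAIN SELECT o_orderkey FROM customer
    JOIN orders
    WHERE c_acctbal > 100.0 AND o_orderstatus = 'open' AND o_custkey = c_custkey;

-- Restore the default of 120,000 rows. This assumes no custom value was
-- configured; otherwise, restore the value reported by SHOW VARIABLES.
SET max_subselect_aggregator_rowcount=120000;
```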

## Managing Concurrency for Distributed Joins

Queries that require cross-shard data movement, that is, queries involving broadcasts or repartitions, generally need many more connections and threads than other queries. SingleStore manages the concurrency of such queries automatically through [workload management](https://docs.singlestore.com/db/v9.1/user-and-cluster-administration/use-the-workload-manager-and-set-resource-limits/use-the-workload-manager.md).

In most scenarios, the default workload management settings schedule your workload so that it uses cluster resources without exhausting connection and thread limits. If you encounter performance issues with distributed join queries run at high concurrency, see [workload management](https://docs.singlestore.com/db/v9.1/user-and-cluster-administration/use-the-workload-manager-and-set-resource-limits/use-the-workload-manager.md) to learn how to configure its settings.
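Before adjusting any settings, you could first check whether workload management is active and what it is currently doing. This is a sketch. It assumes the `workload_management`-prefixed engine variables and the `SHOW WORKLOAD MANAGEMENT STATUS` command available in recent SingleStore versions; confirm the exact names in the workload management documentation for your version.

```sql
-- List the workload management engine variables and their current values.
SHOW VARIABLES LIKE 'workload_management%';

-- Summarize current workload management activity
-- (assumed command; verify that your version supports it).
SHOW WORKLOAD MANAGEMENT STATUS;
```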

## Feedback Reoptimization

This feature automatically tunes a query whose performance suffers from a poor plan built on inaccurate estimates. It reoptimizes the query by feeding the statistics collected during the query's first (profiled) run back into the optimizer, which resolves such performance issues without adding hints or rewriting the query.

The Feedback Reoptimization process is as follows. Run these commands in sequence to auto-tune a query:

1. Profile the query.
   ```sql
   PROFILE <query_text>;
   ```

2. Run the reoptimization command, which recompiles the query using the statistics from the latest profile.
   ```sql
   REOPTIMIZE;
   ```

3. Commit the reoptimized plan so that it replaces the existing plan in the plan cache.
   ```sql
   REOPTIMIZE COMMIT;
   ```

A query must be profiled before it can be reoptimized. A profile collects statistics such as join sizes and filter selectivity, which help the optimizer make decisions about join order, data movement, and the hash build.

The `REOPTIMIZE` command runs the optimizer again for the profiled query, using the runtime statistics instead of estimated values. To view the latest profile information, run the `SHOW PROFILE JSON` command after `REOPTIMIZE`.
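For example, applying the first two steps to the aggregator join query used earlier on this page might look like the following sketch. The query text is reused from the "Joins on the Aggregator" examples; the statistics reported in the JSON profile depend on your own data.

```sql
-- Step 1: profile the query so that runtime statistics
-- (join sizes, filter selectivity) are collected.
PROFILE SELECT o_orderkey FROM customer
    JOIN orders
    WHERE c_acctbal > 100.0 AND o_orderstatus = 'open' AND o_custkey = c_custkey;

-- Step 2: recompile the query using the profiled statistics.
REOPTIMIZE;

-- Inspect the latest profile information.
SHOW PROFILE JSON;
```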

To use the new plan, run the `REOPTIMIZE COMMIT` command, which creates a new plan entry for the original query text. If plan pinning is enabled, the command also pins the reoptimized plan to the original query text.

A query can be reoptimized many times. Each time `REOPTIMIZE` is run, a new plan is generated, and the system learns more about how the plans perform, so it can make more informed decisions for future plans. To reoptimize a query again, rerun the `REOPTIMIZE` command. To run a specific reoptimized plan, add the reoptimize level to the command: for example, `REOPTIMIZE 1` runs the first reoptimized plan generated and `REOPTIMIZE 2` runs the second. Once you have generated a plan you want to use for the original query, commit it with `REOPTIMIZE COMMIT <reoptimize_level>;`. The next time you run the original query, the system uses the committed plan.
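The following sketch generates two reoptimized plans for the aggregator join query from this page and commits the first one. Committing level 1 is only illustrative; in practice, you would commit whichever level performed best for your data.

```sql
-- A profile is required before the query can be reoptimized.
PROFILE SELECT o_orderkey FROM customer
    JOIN orders
    WHERE c_acctbal > 100.0 AND o_orderstatus = 'open' AND o_custkey = c_custkey;

-- Generate the first reoptimized plan (level 1).
REOPTIMIZE;

-- Generate a second reoptimized plan (level 2).
REOPTIMIZE;

-- Run the first reoptimized plan again to compare it with the second.
REOPTIMIZE 1;

-- Commit level 1 so that the original query text uses that plan.
REOPTIMIZE COMMIT 1;
```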

## Related Topics

* Training: [Query Tuning](https://training.singlestore.com/learn/course/internal/view/elearning/634/query-tuning)
* Training: [Performance Benchmarking](https://training.singlestore.com/learn/course/internal/view/elearning/618/performance-benchmarking)
* Guide: [Optimizing Table Data Structures](https://docs.singlestore.com/db/v9.1/create-a-database/optimizing-table-data-structures.md)

## In this section

* [Cross Query Stats and Sampling Cache](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/cross-query-stats-and-sampling-cache.md)
* [Feedback Reoptimization](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/feedback-reoptimization.md)
* [Join Memory Reduction Optimization](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/join-memory-reduction-optimization.md)
* [Query Performance Tools](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/query-performance-tools.md)
* [Query History](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/query-history.md)
* [SingleStore Visual Explain](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/singlestore-visual-explain.md)
* [Statistics and Sampling](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/statistics-and-sampling.md)
* [Testing Your Queries and Performance](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/testing-your-queries-and-performance.md)
* [Troubleshooting Poorly Performing Queries](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/troubleshooting-poorly-performing-queries.md)
* [Workload Profiling](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/workload-profiling.md)

***

Modified at: July 18, 2025

Source: [/db/v9.1/query-data/query-tuning/](https://docs.singlestore.com/db/v9.1/query-data/query-tuning/)

