Query Tuning

Things to Consider Before Query Tuning

Before creating your database, it's important to consider the workload that will be placed upon it. The three variables that can be optimized to increase performance are concurrency, throughput, and latency.

Concurrency is the number of queries that are executing on the database at the same time. If your workload is primarily lightweight inserts and updates on your table, concurrency will likely be low. However, if a large number of those operations occur simultaneously, it can negatively impact overall performance.

Throughput is the number of queries that are executed per unit of time. It is a function of concurrency and query execution time. If your workload is primarily lightweight inserts and updates on a table, perhaps those queries can be encapsulated within stored procedures to prevent repeated compilation.
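
As a minimal sketch of the stored-procedure approach mentioned above, a lightweight insert could be wrapped as follows. The table events, its columns, and the procedure name record_event are hypothetical and exist only for illustration.

```sql
-- Hypothetical table used only for this illustration.
CREATE TABLE events (id INT, val INT);

DELIMITER //
-- Wrap the lightweight insert in a procedure, as the text suggests,
-- so callers invoke one procedure instead of sending ad hoc statements.
CREATE OR REPLACE PROCEDURE record_event(_id INT, _val INT) AS
BEGIN
  INSERT INTO events (id, val) VALUES (_id, _val);
END //
DELIMITER ;

-- Callers then invoke the procedure with their values.
CALL record_event(1, 42);
```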

Latency is the query execution time. This variable can be optimized by following query performance best practices, such as by choosing the best database schema, table type, and keys for your business needs and workloads. Issues that are beyond the database engine's control can also cause latency, such as poor network connectivity.

Lastly, your cluster size will determine the throughput and performance of databases in the cluster. A larger size will typically result in better query performance, lower latencies, and a higher number of concurrent queries. 

You can use the Workload Manager and resource limits to manage cluster workload and define resource pools that can improve query performance. See Use the Workload Manager and Set Resource Limits.
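
As a rough sketch of what defining a resource pool might look like, the statements below create a pool and assign the current session to it. The pool name reporting_pool and all limit values are placeholders chosen for illustration, not recommendations; check the resource pool documentation for the settings available in your version.

```sql
-- Hypothetical pool; the limits are placeholders, not recommendations.
CREATE RESOURCE POOL reporting_pool WITH
  MEMORY_PERCENTAGE = 40,
  QUERY_TIMEOUT = 60,
  MAX_CONCURRENCY = 10;

-- Run the current session's queries under that pool.
SET resource_pool = reporting_pool;

-- List the pools defined in the cluster.
SHOW RESOURCE POOLS;
```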

Tradeoff between Aggregator Count and Leaf Count

When high concurrency is of the utmost importance, it is best to have more aggregator nodes than leaf nodes since the aggregator receives data requests from users.

Conversely, if low latency is more important than high concurrency, then it is better to have more leaf nodes (and therefore, more partitions) since the data is stored on leaf nodes.

To adjust your aggregators and leaf node counts, please see the Cluster Management Commands page.
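
Before changing node counts, it can help to see what the cluster currently has. A minimal sketch, assuming you are connected to the master aggregator:

```sql
-- List the aggregator nodes currently in the cluster.
SHOW AGGREGATORS;

-- List the leaf nodes currently in the cluster.
SHOW LEAVES;
```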

Run ANALYZE

The ANALYZE command collects data statistics on a table to facilitate accurate query optimization. This is especially important for optimizing complex analytical queries. You should ANALYZE your tables after inserting, updating, or deleting large numbers of rows (30% of your table row count is a good rule of thumb). See the Statistics and Sampling topic for more information.
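
For instance, after a bulk load or a large delete, statistics could be refreshed as sketched below. The table name t is a placeholder for your own table.

```sql
-- Recollect statistics for table t so the optimizer sees the new data distribution.
ANALYZE TABLE t;
```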

Look for Hot Queries

You can do query analysis on hot running queries by using the Workload Monitoring and SingleStore Visual Explain features, or by running the SHOW PLANCACHE command.

For example, you can use a query like the following to see the select queries with the highest average execution time. Of course, you can modify the query to filter by database, look for specific types of queries, find queries using the most total time or the highest memory use, etc.

SELECT Database_Name, Query_Text, Commits, Execution_Time, Execution_Time/Commits AS avg_time_ms, Average_Memory_Use
FROM information_schema.plancache
WHERE Query_Text LIKE 'select%'
ORDER BY avg_time_ms DESC;
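
The variations mentioned above might look like the following sketch. It uses only the plancache columns shown in the query above; the database name 'db' and the LIMIT values are assumptions for illustration.

```sql
-- Ten plans with the highest total execution time in one database.
SELECT Database_Name, Query_Text, Commits, Execution_Time
FROM information_schema.plancache
WHERE Database_Name = 'db'
ORDER BY Execution_Time DESC
LIMIT 10;

-- Ten plans with the highest average memory use across all databases.
SELECT Database_Name, Query_Text, Average_Memory_Use
FROM information_schema.plancache
ORDER BY Average_Memory_Use DESC
LIMIT 10;
```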

Check if Queries are Using Indexes

One important query performance consideration is adding appropriate indexes for your queries. You can use EXPLAIN to see whether queries are using indexes. The following examples show how to identify cases where an index can greatly improve query performance.

Indexes for filters

Consider an example table:

CREATE TABLE t (a int, b int);

Suppose we are running queries like:

SELECT * FROM t WHERE a=3;

EXPLAIN shows that running the query against the current table schema requires a full Table Scan - scanning all the rows of t, which is unnecessarily expensive if a small fraction of the values in t equal 3.

EXPLAIN SELECT * FROM t WHERE a=3;
+-----------------------+
| EXPLAIN               |
+-----------------------+
| Project [t.a, t.b]    |
| Gather partitions:all |
| Project [t.a, t.b]    |
| Filter [t.a = 3]      |
| TableScan db.t        |
+-----------------------+

If an index is added, the query will instead use an Index Range Scan on the key a:

ALTER TABLE t ADD INDEX (a);
EXPLAIN SELECT * FROM t WHERE a=3;
+---------------------------------------------+
| EXPLAIN                                     |
+---------------------------------------------+
| Project [t.a, t.b]                          |
| Gather partitions:all                       |
| Project [t.a, t.b]                          |
| IndexRangeScan db.t, KEY a (a) scan:[a = 3] |
+---------------------------------------------+

A query that filters on both a and b is unable to take advantage of the filtering on b to reduce the rows that need to be scanned. As demonstrated below, the scan uses a=3 only.

EXPLAIN SELECT * FROM t WHERE a=3 AND b=4;
+---------------------------------------------+
| EXPLAIN                                     |
+---------------------------------------------+
| Project [t.a, t.b]                          |
| Gather partitions:all                       |
| Project [t.a, t.b]                          |
| Filter [t.b = 4]                            |
| IndexRangeScan db.t, KEY a (a) scan:[a = 3] |
+---------------------------------------------+

Adding an index on (a, b) allows the query to scan more selectively:

ALTER TABLE t ADD INDEX (a, b);
EXPLAIN SELECT * FROM t WHERE a=3 AND b=4;
+------------------------------------------------------------+
| EXPLAIN                                                    |
+------------------------------------------------------------+
| Project [t.a, t.b]                                         |
| Gather partitions:all                                      |
| Project [t.a, t.b]                                         |
| IndexRangeScan db.t, KEY a_2 (a, b) scan:[a = 3 AND b = 4] |
+------------------------------------------------------------+

A query that filters on b still does not match an index, since the query filters must match a prefix of the index column list to be able to effectively take advantage of the index:

EXPLAIN SELECT * FROM t WHERE b=4;
+-----------------------+
| EXPLAIN               |
+-----------------------+
| Project [t.a, t.b]    |
| Gather partitions:all |
| Project [t.a, t.b]    |
| Filter [t.b = 4]      |
| TableScan db.t        |
+-----------------------+
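
If filtering on b alone is a common pattern, one option is a separate index whose column list starts with b. This is a sketch continuing the example table t. The resulting plan is not reproduced here, but it would be expected to replace the TableScan with an index scan on the new key.

```sql
-- Add an index whose first (and only) column is b.
ALTER TABLE t ADD INDEX (b);

-- Re-check the plan for the query that filters on b alone.
EXPLAIN SELECT * FROM t WHERE b=4;
```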

For columnstore tables, it's a best practice to set a sort key on the table to take advantage of segment elimination.

Segment elimination occurs when filtering on the sort key. The minimum/maximum value metadata for each segment is used at query execution time to determine whether a segment can match a filter; if not, the segment is skipped entirely and no data is examined.

Consider an example columnstore table with an explicit sort key:

CREATE TABLE t (a int, b int, SORT KEY(a));

Suppose the following queries are run:

SELECT * FROM t WHERE a=3;

EXPLAIN shows that running the query against this table schema uses a ColumnStoreScan. Only the segments whose minimum/maximum range for column a can contain the value 3 are scanned, and all other segments are skipped. Furthermore, since segments are ordered by their min and max values, the engine can easily locate the segments that contain 3.

EXPLAIN SELECT * FROM t WHERE a = 3;
+-----------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                       |
+-----------------------------------------------------------------------------------------------+
| Gather partitions:all alias:remote_0                                                          |
| Project [t.a, t.b]                                                                            |
| ColumnStoreFilter [t.a = 3]                                                                   |
| ColumnStoreScan test4.t, KEY a (a) USING CLUSTERED COLUMNSTORE table_type:sharded_columnstore |
+-----------------------------------------------------------------------------------------------+
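
EXPLAIN shows the plan shape but not how much data was actually skipped. To observe that at run time, one option is to profile the query, as sketched below. The exact statistics reported for the ColumnStoreScan operator depend on your SingleStore version, so no output is shown here.

```sql
-- Execute the query while collecting per-operator run-time statistics.
PROFILE SELECT * FROM t WHERE a = 3;

-- Display the statistics collected for the profiled query.
SHOW PROFILE;
```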

Indexes for group-by and order-by

Another class of cases where indexes can improve query performance is GROUP BY and ORDER BY.

Consider this example table:

CREATE TABLE t (a int, b int);

Consider the following query:

EXPLAIN SELECT a, sum(b) FROM t GROUP BY a;
+------------------------------------------------------+
| EXPLAIN                                              |
+------------------------------------------------------+
| Project [t.a, `sum(b)`]                              |
| HashGroupBy [SUM(`sum(b)`) AS `sum(b)`] groups:[t.a] |
| Gather partitions:all                                |
| Project [t.a, `sum(b)`]                              |
| HashGroupBy [SUM(t.b) AS `sum(b)`] groups:[t.a]      |
| TableScan db.t                                       |
+------------------------------------------------------+

Executing the above query requires a HashGroupBy. SingleStore builds a hash table with an entry for each group of a.

However, with an index on a, SingleStore can execute the query with a StreamingGroupBy because by scanning the index on a, it can process all elements of a group consecutively.

ALTER TABLE t ADD INDEX (a);
EXPLAIN SELECT a, sum(b) FROM t GROUP BY a;
+-----------------------------------------------------------+
| EXPLAIN                                                   |
+-----------------------------------------------------------+
| Project [t.a, `sum(b)`]                                   |
| StreamingGroupBy [SUM(`sum(b)`) AS `sum(b)`] groups:[t.a] |
| GatherMerge [t.a] partitions:all                          |
| Project [t.a, `sum(b)`]                                   |
| StreamingGroupBy [SUM(t.b) AS `sum(b)`] groups:[t.a]      |
| TableScan db.t, KEY a (a)                                 |
+-----------------------------------------------------------+

The same applies to ORDER BY. Without an index, SingleStore needs to sort:

EXPLAIN SELECT * FROM t ORDER BY b;
+----------------------------------+
| EXPLAIN                          |
+----------------------------------+
| Project [t.a, t.b]               |
| GatherMerge [t.b] partitions:all |
| Project [t.a, t.b]               |
| Sort [t.b]                       |
| TableScan db.t                   |
+----------------------------------+

With an index, SingleStore can eliminate the need to sort:

ALTER TABLE t ADD INDEX (b);
EXPLAIN SELECT * FROM t ORDER BY b;
+----------------------------------+
| EXPLAIN                          |
+----------------------------------+
| Project [t.a, t.b]               |
| GatherMerge [t.b] partitions:all |
| Project [t.a, t.b]               |
| TableScan db.t, KEY b (b)        |
+----------------------------------+

Fanout vs Single-Partition Queries

SingleStore’s distributed architecture takes advantage of CPUs from many servers to execute your queries. This provides extremely fast performance on aggregation queries that scan millions of rows. However, for transactional queries that select relatively few rows, it is best for each query to involve only a single partition. When a query has equality filters that completely match the shard key of the table, SingleStore can optimize it to require execution on only a single partition.

Using this table as an example:

CREATE TABLE urls (
domain varchar(128),
path varchar(8192),
time_sec int,
status_code binary(3),
...
shard key (domain, path, time_sec)
);

The following query only involves a single partition because it has equality filters on all columns of the shard key, so the rows which match can only be on a single partition. In the EXPLAIN output, Gather partitions:single indicates that SingleStore is using a single-partition plan.

EXPLAIN SELECT status_code
FROM urls
WHERE domain = 'youtube.com'
AND path = '/watch?v=euh_uqxwk58'
AND time_sec = 1;
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                                               |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| Gather partitions:single                                                                                                                              |
| Project [urls.status_code]                                                                                                                            |
| IndexRangeScan test2.urls, SHARD KEY domain (domain, path, time_sec) scan:[domain = "youtube.com" AND path = "/watch?v=euh_uqxwk58" AND time_sec = 1] |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
3 rows in set (0.66 sec)

The following query, which does not filter on time_sec, does not match a single partition. Therefore, the query requires selecting from all partitions, as indicated by Gather partitions:all.

EXPLAIN SELECT status_code
FROM urls
WHERE domain = 'youtube.com'
AND path = '/watch?v=euh_uqxwk58';
+--------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                              |
+--------------------------------------------------------------------------------------------------------------------------------------+
| Project [urls.status_code]                                                                                                           |
| Gather partitions:all                                                                                                                |
| Project [urls.status_code]                                                                                                           |
| IndexRangeScan test2.urls, SHARD KEY domain (domain, path, time_sec) scan:[domain = "youtube.com" AND path = "/watch?v=euh_uqxwk58"] |
+--------------------------------------------------------------------------------------------------------------------------------------+

To fix this, the table urls could be sharded on domain. This would make it easier to write queries that route to a single partition. However, some domains will have more pages than other domains and this could lead to data skew. Choosing a shard key is often a balancing act: we want the least restrictive shard key possible while also ensuring that we do not have data skew. A good compromise in this case would be to shard on (domain,path). For more on choosing a performant shard key, see the related step in the Optimizing Table Data Structures guide. If these types of performance issues are common, SingleStore recommends reviewing your table data structures overall using the full guide.
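
A sketch of the (domain, path) compromise is shown below. Only the columns listed in the example above are included. Columns elided in the original schema are left out. The skew check assumes that information_schema.table_statistics exposes per-partition row counts in your version.

```sql
-- Same example table, sharded on (domain, path) instead of all three columns.
CREATE TABLE urls (
  domain varchar(128),
  path varchar(8192),
  time_sec int,
  status_code binary(3),
  SHARD KEY (domain, path)
);

-- Equality filters on domain and path now cover the whole shard key.
EXPLAIN SELECT status_code
FROM urls
WHERE domain = 'youtube.com'
AND path = '/watch?v=euh_uqxwk58';

-- Rough skew check: compare row counts across partitions of the table.
SELECT DATABASE_NAME, TABLE_NAME, ORDINAL AS PARTITION_ID, ROWS, MEMORY_USE
FROM information_schema.table_statistics
WHERE TABLE_NAME = 'urls';
```

If the row counts differ widely between partitions, the shard key is probably too coarse for the data.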

Distributed Joins

SingleStore’s query execution architecture allows you to run arbitrary SQL queries on any table regardless of data distribution. However, you can often improve performance by optimizing your schema to minimize data movement during query execution.

Collocating Joins

Consider the following tables:

CREATE TABLE lineitem(
l_orderkey INT NOT NULL,
l_linenumber INT NOT NULL,
...
PRIMARY KEY(l_orderkey, l_linenumber)
);
CREATE TABLE orders(
o_orderkey INT NOT NULL,
...
PRIMARY KEY(o_orderkey)
);

When lineitem and orders are joined with the current schema, a distributed join is performed and data is repartitioned from the lineitem table.

EXPLAIN SELECT Count(*)
FROM lineitem
JOIN orders
ON o_orderkey = l_orderkey ;
+---------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------+
| Project [`COUNT(*)`]                                                                                                            |
| Aggregate [SUM(`COUNT(*)`) AS `COUNT(*)`]                                                                                       |
| Gather partitions:all est_rows:1                                                                                                |
| Project [`COUNT(*)`] est_rows:1 est_select_cost:1762812                                                                         |
| Aggregate [COUNT(*) AS `COUNT(*)`]                                                                                              |
| NestedLoopJoin                                                                                                                  |
| |---IndexSeek test.orders, PRIMARY KEY (o_orderkey) scan:[o_orderkey = r0.l_orderkey] est_table_rows:565020 est_filtered:565020 |
| TableScan r0 storage:list stream:no                                                                                             |
| Repartition [lineitem.l_orderkey] AS r0 shard_key:[l_orderkey] est_rows:587604                                                  |
| TableScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber) est_table_rows:587604 est_filtered:587604                       |
+---------------------------------------------------------------------------------------------------------------------------------+

The performance of this query can be improved by adding an explicit shard key to the lineitem table on l_orderkey. Now we can perform a local join between lineitem and orders.

+--------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                            |
+--------------------------------------------------------------------------------------------------------------------+
| Project [`COUNT(*)`]                                                                                               |
| Aggregate [SUM(`COUNT(*)`) AS `COUNT(*)`]                                                                          |
| Gather partitions:all                                                                                              |
| Project [`COUNT(*)`]                                                                                               |
| Aggregate [COUNT(*) AS `COUNT(*)`]                                                                                 |
| ChoosePlan                                                                                                         |
| |   :estimate                                                                                                      |
| |       SELECT COUNT(*) AS cost FROM test.lineitem                                                                 |
| |       SELECT COUNT(*) AS cost FROM test.orders                                                                   |
| |---NestedLoopJoin                                                                                                 |
| |   |---IndexSeek test.orders, PRIMARY KEY (o_orderkey) scan:[o_orderkey = lineitem.l_orderkey]                    |
| |   TableScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber)                                                |
| +---NestedLoopJoin                                                                                                 |
|     |---IndexRangeScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber) scan:[l_orderkey = orders.o_orderkey] |
|     TableScan test.orders, PRIMARY KEY (o_orderkey)                                                                |
+--------------------------------------------------------------------------------------------------------------------+
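
The schema change that produces a local join like this is not shown above. A minimal sketch of it follows, assuming the table is recreated with the new shard key (to our knowledge, a shard key is set at table creation rather than altered in place). Columns elided in the original schema are omitted.

```sql
-- lineitem recreated with an explicit shard key on l_orderkey.
-- The shard key columns must be a subset of the primary key columns.
CREATE TABLE lineitem(
  l_orderkey INT NOT NULL,
  l_linenumber INT NOT NULL,
  PRIMARY KEY(l_orderkey, l_linenumber),
  SHARD KEY(l_orderkey)
);

-- Re-check the join plan after the change.
EXPLAIN SELECT Count(*)
FROM lineitem
JOIN orders
ON o_orderkey = l_orderkey;
```

Because orders is sharded on its primary key o_orderkey, rows with matching order keys land on the same partition in both tables, which is what allows the join to run locally.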

Reference Table Joins

Reference tables are replicated to each aggregator and leaf in the cluster, so they are best suited to data that is small and slowly changing. The rate at which this data changes depends on the application, but a rule of thumb is 1 million rows updated once a day.

Consider the following schema:

CREATE TABLE customer(
c_custkey INT NOT NULL,
c_nationkey INT NOT NULL,
...
PRIMARY KEY(c_custkey),
key(c_nationkey)
);
CREATE TABLE nation(
n_nationkey INT NOT NULL,
...
PRIMARY KEY(n_nationkey)
);

With the current schema, when we join the customer and nation tables together on nationkey, the nation table must be broadcast each time the query is run.

EXPLAIN SELECT Count(*)
FROM customer
JOIN nation
ON n_nationkey = c_nationkey;
+-------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                                         |
+-------------------------------------------------------------------------------------------------------------------------------------------------+
| Project [`Count(*)`]                                                                                                                            |
| Aggregate [SUM(`Count(*)`) AS `Count(*)`]                                                                                                       |
| Gather partitions:all est_rows:1                                                                                                                |
| Project [`Count(*)`] est_rows:1 est_select_cost:1860408                                                                                         |
| Aggregate [COUNT(*) AS `Count(*)`]                                                                                                              |
| NestedLoopJoin                                                                                                                                  |
| |---IndexRangeScan test.customer, KEY c_nationkey (c_nationkey) scan:[c_nationkey = r1.n_nationkey] est_table_rows:1856808 est_filtered:1856808 |
| TableScan r1 storage:list stream:no                                                                                                             |
| Broadcast [nation.n_nationkey] AS r1 est_rows:300                                                                                               |
| TableScan test.nation, PRIMARY KEY (n_nationkey) est_table_rows:300 est_filtered:300                                                            |
+-------------------------------------------------------------------------------------------------------------------------------------------------+

The broadcast can be avoided by making nation a reference table as it is relatively small and changes rarely. While broadcasting such a small table will likely have a negligible effect on single-query latency, repeatedly doing so can have an outsize effect on concurrent workloads.

CREATE REFERENCE TABLE nation(
n_nationkey INT NOT NULL,
...
PRIMARY KEY(n_nationkey)
);
EXPLAIN SELECT Count(*)
FROM customer
JOIN nation
ON n_nationkey = c_nationkey;
+-------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                     |
+-------------------------------------------------------------------------------------------------------------+
| Project [`Count(*)`]                                                                                        |
| Aggregate [SUM(`Count(*)`) AS `Count(*)`]                                                                   |
| Gather partitions:all                                                                                       |
| Project [`Count(*)`]                                                                                        |
| Aggregate [COUNT(*) AS `Count(*)`]                                                                          |
| ChoosePlan                                                                                                  |
| |   :estimate                                                                                               |
| |       SELECT COUNT(*) AS cost FROM test.customer                                                          |
| |       SELECT COUNT(*) AS cost FROM test.nation                                                            |
| |---NestedLoopJoin                                                                                          |
| |   |---IndexSeek test.nation, PRIMARY KEY (n_nationkey) scan:[n_nationkey = customer.c_nationkey]          |
| |   TableScan test.customer, PRIMARY KEY (c_custkey)                                                        |
| +---NestedLoopJoin                                                                                          |
|     |---IndexRangeScan test.customer, KEY c_nationkey (c_nationkey) scan:[c_nationkey = nation.n_nationkey] |
|     TableScan test.nation, PRIMARY KEY (n_nationkey)                                                        |
+-------------------------------------------------------------------------------------------------------------+

Joins on the Aggregator

Consider the following schema and row counts:

CREATE TABLE customer(
c_custkey INT NOT NULL,
c_acctbal decimal(15,2) not null,
PRIMARY KEY(c_custkey)
);
CREATE TABLE orders(
o_orderkey INT NOT NULL,
o_custkey INT NOT NULL,
o_orderstatus varchar(20) not null,
PRIMARY KEY(o_orderkey),
key(o_custkey)
);
SELECT COUNT(*) from orders;
+----------+
| COUNT(*) |
+----------+
|   429786 |
+----------+
SELECT COUNT(*) from orders where o_orderstatus = 'open';
+----------+
| COUNT(*) |
+----------+
|     1000 |
+----------+
SELECT COUNT(*) from customer;
+----------+
| COUNT(*) |
+----------+
|  1014726 |
+----------+
SELECT COUNT(*) from customer where c_acctbal > 100.0;
+----------+
| COUNT(*) |
+----------+
|      988 |
+----------+
1 row in set (0.06 sec)

Note that while customer and orders are fairly large, when a query is filtered on open orders and account balances greater than 100, relatively few rows match. As a result, when orders and customer are joined using these filters, the join can be performed on an aggregator. EXPLAIN shows this as a separate Gather operator for each of orders and customer, with a HashJoin operator above the Gather operators.

EXPLAIN SELECT o_orderkey
FROM customer
JOIN orders
where c_acctbal > 100.0
AND o_orderstatus = 'open'
AND o_custkey = c_custkey;
+------------------------------------------------------------------+
| EXPLAIN                                                          |
+------------------------------------------------------------------+
| Project [orders.o_orderkey]                                      |
| HashJoin [orders.o_custkey = customer.c_custkey]                 |
| |---TempTable                                                    |
| |   Gather partitions:all                                        |
| |   Project [orders_0.o_orderkey, orders_0.o_custkey]            |
| |   Filter [orders_0.o_orderstatus = "open"]                     |
| |   TableScan test3.orders AS orders_0, PRIMARY KEY (o_orderkey) |
| TableScan 0tmp AS customer storage:list stream:yes               |
| TempTable                                                        |
| Gather partitions:all                                            |
| Project [customer_0.c_custkey]                                   |
| Filter [customer_0.c_acctbal > 100.0]                            |
| TableScan test3.customer AS customer_0, PRIMARY KEY (c_custkey)  |
+------------------------------------------------------------------+

By default, SingleStore will perform this optimization when each Gather pulls fewer than 120,000 rows from the leaves. This threshold can be changed via the max_subselect_aggregator_rowcount variable. The optimization can also be disabled manually for a query via the leaf_pushdown hint, which forces the optimizer to perform as much work as possible on the leaf nodes.

EXPLAIN SELECT WITH(LEAF_PUSHDOWN=TRUE) o_orderkey
FROM customer
JOIN orders
where c_acctbal > 100.0
AND o_orderstatus = 'open'
AND o_custkey = c_custkey;
+--------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                        |
+--------------------------------------------------------------------------------------------------------------------------------+
| Project [r0.o_orderkey]                                                                                                        |
| Gather partitions:all est_rows:11                                                                                              |
| Project [r0.o_orderkey] est_rows:11 est_select_cost:1955                                                                       |
| Filter [customer.c_acctbal > 100.0]                                                                                            |
| NestedLoopJoin                                                                                                                 |
| |---IndexSeek test3.customer, PRIMARY KEY (c_custkey) scan:[c_custkey = r0.o_custkey] est_table_rows:1013436 est_filtered:1092 |
| TableScan r0 storage:list stream:no                                                                                            |
| Repartition [orders.o_orderkey, orders.o_custkey] AS r0 shard_key:[o_custkey] est_rows:972                                     |
| Filter [orders.o_orderstatus = "open"]                                                                                         |
| TableScan test3.orders, PRIMARY KEY (o_orderkey) est_table_rows:92628 est_filtered:972                                         |
+--------------------------------------------------------------------------------------------------------------------------------+
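
Lowering max_subselect_aggregator_rowcount below the number of rows that each Gather would pull has the same effect:

SET max_subselect_aggregator_rowcount=500;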
EXPLAIN SELECT o_orderkey
FROM customer
JOIN orders
where c_acctbal > 100.0
AND o_orderstatus = 'open'
AND o_custkey = c_custkey;
+--------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                        |
+--------------------------------------------------------------------------------------------------------------------------------+
| Project [r0.o_orderkey]                                                                                                        |
| Gather partitions:all est_rows:11                                                                                              |
| Project [r0.o_orderkey] est_rows:11 est_select_cost:1955                                                                       |
| Filter [customer.c_acctbal > 100.0]                                                                                            |
| NestedLoopJoin                                                                                                                 |
| |---IndexSeek test3.customer, PRIMARY KEY (c_custkey) scan:[c_custkey = r0.o_custkey] est_table_rows:1013436 est_filtered:1092 |
| TableScan r0 storage:list stream:no                                                                                            |
| Repartition [orders.o_orderkey, orders.o_custkey] AS r0 shard_key:[o_custkey] est_rows:972                                     |
| Filter [orders.o_orderstatus = "open"]                                                                                         |
| TableScan test3.orders, PRIMARY KEY (o_orderkey) est_table_rows:92628 est_filtered:972                                         |
+--------------------------------------------------------------------------------------------------------------------------------+
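
After experimenting with the threshold, you may want to check its current value and restore it. A small sketch, assuming the variable is set at session scope as in the example above and that 120000 is the default stated earlier:

```sql
-- Inspect the current threshold for this session.
SELECT @@max_subselect_aggregator_rowcount;

-- Restore the default value mentioned in the text.
SET max_subselect_aggregator_rowcount = 120000;
```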

Managing Concurrency for Distributed Joins

Queries that require cross-shard data movement, i.e. queries which involve broadcasts and repartitions, generally require much larger numbers of connections and threads than other queries. The concurrency of such queries is automatically managed by workload management.

In most scenarios, the default settings for workload management will schedule your workload appropriately to utilize cluster resources without exhausting connection and thread limits. If you encounter performance issues with distributed join queries run at high concurrency, see workload management to learn how to configure workload management settings.
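
As a starting point for investigating such issues, the current state and settings of workload management can be inspected. This is a sketch only; the variables that appear and their meanings depend on your SingleStore version.

```sql
-- Show the current workload management state, such as queued and running queries.
SHOW WORKLOAD MANAGEMENT STATUS;

-- List the workload management settings and their current values.
SHOW VARIABLES LIKE 'workload_management%';
```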


Last modified: March 8, 2024
