Query Tuning
Things to Consider Before Query Tuning
Before creating your database, it's important to consider the workload that will be placed upon it.
Concurrency is the number of queries that are executing on the database at the same time.
Throughput is the number of queries that are executing per unit of time.
Latency is the query execution time.
Lastly, your cluster size will determine the throughput and performance of databases in the cluster.
You can use the Workload Manager and resource limits to manage cluster workload and define resource pools that can improve query performance.
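As a hedged sketch of what a resource pool might look like (the pool name and limit values here are illustrative, and the exact set of options varies by SingleStore version):

```sql
-- Hypothetical example: a pool capped at half of memory with a query timeout.
CREATE RESOURCE POOL reporting_pool WITH
  MEMORY_PERCENTAGE = 50,
  QUERY_TIMEOUT = 120;

-- Run the current session's queries in that pool.
SET resource_pool = reporting_pool;
```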
Tradeoff between Aggregator Count and Leaf Count
When high concurrency is of the utmost importance, it is best to have more aggregator nodes than leaf nodes since the aggregator receives data requests from users.

Conversely, if low latency is more important than high concurrency, it is better to have more leaf nodes (and therefore more partitions), since the data is stored on the leaf nodes.

To adjust your aggregators and leaf node counts, please see the Cluster Management Commands page.
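Before rebalancing, it can help to check the current node counts; from any aggregator, the cluster's nodes can be listed with:

```sql
-- List the aggregator and leaf nodes currently in the cluster.
SHOW AGGREGATORS;
SHOW LEAVES;
```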
Run ANALYZE
The ANALYZE command collects data statistics on a table to facilitate accurate query optimization. Run ANALYZE on your tables after inserting, updating, or deleting large numbers of rows (30% of your table's row count is a good rule of thumb).
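For example, to refresh statistics on a table t after a large load:

```sql
-- Recollect statistics so the optimizer sees the table's current shape.
ANALYZE TABLE t;
```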
Look for Hot Queries
You can do query analysis on hot running queries by using the Workload Monitoring and SingleStoreDB Visual Explain features, or by running the SHOW PLANCACHE command.
For example, you can use a query like the following to see the select queries with the highest average execution time.
SELECT Database_Name, Query_Text, Commits, Execution_Time,
       Execution_Time/Commits AS avg_time_ms, Average_Memory_Use
FROM information_schema.plancache
WHERE Query_Text LIKE 'select%'
ORDER BY avg_time_ms DESC;
Check if Queries are Using Indexes
One important query performance consideration is adding appropriate indexes for your queries.
Indexes for filters
Consider an example table:
CREATE TABLE t (a int, b int);
Suppose we are running queries like:
SELECT * FROM t WHERE a=3;
EXPLAIN shows that running the query against the current table schema requires a full TableScan, scanning all the rows of t, which is unnecessarily expensive if only a small fraction of the values in t equal 3.
EXPLAIN SELECT * FROM t WHERE a=3;
+-----------------------+
| EXPLAIN |
+-----------------------+
| Project [t.a, t.b] |
| Gather partitions:all |
| Project [t.a, t.b] |
| Filter [t.a = 3] |
| TableScan db.t |
+-----------------------+
If an index is added, the query will instead use an IndexRangeScan on the key a:
ALTER TABLE t ADD INDEX (a);
EXPLAIN SELECT * FROM t WHERE a=3;
+---------------------------------------------+
| EXPLAIN |
+---------------------------------------------+
| Project [t.a, t.b] |
| Gather partitions:all |
| Project [t.a, t.b] |
| IndexRangeScan db.t, KEY a (a) scan:[a = 3] |
+---------------------------------------------+
A query that filters on both a and b cannot use the index on a to reduce the rows scanned by the filter on b; the index range scan applies the filter a=3 only.
EXPLAIN SELECT * FROM t WHERE a=3 AND b=4;
+---------------------------------------------+
| EXPLAIN |
+---------------------------------------------+
| Project [t.a, t.b] |
| Gather partitions:all |
| Project [t.a, t.b] |
| Filter [t.b = 4] |
| IndexRangeScan db.t, KEY a (a) scan:[a = 3] |
+---------------------------------------------+
Adding an index on (a, b) allows the query to scan more selectively:
ALTER TABLE t ADD INDEX (a, b);
EXPLAIN SELECT * FROM t WHERE a=3 AND b=4;
+------------------------------------------------------------+
| EXPLAIN |
+------------------------------------------------------------+
| Project [t.a, t.b] |
| Gather partitions:all |
| Project [t.a, t.b] |
| IndexRangeScan db.t, KEY a_2 (a, b) scan:[a = 3 AND b = 4] |
+------------------------------------------------------------+
A query that filters on b still does not match an index, since the query filters must match a prefix of the index column list to take advantage of the index:
EXPLAIN SELECT * FROM t WHERE b=4;
+-----------------------+
| EXPLAIN |
+-----------------------+
| Project [t.a, t.b] |
| Gather partitions:all |
| Project [t.a, t.b] |
| Filter [t.b = 4] |
| TableScan db.t |
+-----------------------+
For columnstore tables, it's a best practice to set a sort key on the table to take advantage of segment elimination.
Segment elimination occurs when filtering on the sort key.
Consider an example columnstore table with an explicit sort key:
CREATE TABLE t (a int, b int, SORT KEY(a));
Suppose the following queries are run:
SELECT * FROM t WHERE a=3;
EXPLAIN shows that running the query against this table schema requires a ColumnStoreScan, scanning only those segments of column a that can contain values equal to 3.
EXPLAIN SELECT * FROM t WHERE a = 3;
+-----------------------------------------------------------------------------------------------+
| EXPLAIN |
+-----------------------------------------------------------------------------------------------+
| Gather partitions:all alias:remote_0 |
| Project [t.a, t.b] |
| ColumnStoreFilter [t.a = 3] |
| ColumnStoreScan test4.t, KEY a (a) USING CLUSTERED COLUMNSTORE table_type:sharded_columnstore |
+-----------------------------------------------------------------------------------------------+
Indexes for group-by and order-by
Another class of cases where indexes can improve query performance is GROUP BY and ORDER BY.
Consider this example table:
CREATE TABLE t (a int, b int);
Consider the following query:
EXPLAIN SELECT a, sum(b) FROM t GROUP BY a;
+------------------------------------------------------+
| EXPLAIN |
+------------------------------------------------------+
| Project [t.a, `sum(b)`] |
| HashGroupBy [SUM(`sum(b)`) AS `sum(b)`] groups:[t.a] |
| Gather partitions:all |
| Project [t.a, `sum(b)`] |
| HashGroupBy [SUM(t.b) AS `sum(b)`] groups:[t.a] |
| TableScan db.t |
+------------------------------------------------------+
Executing the above query requires a HashGroupBy, which builds a hash table with an entry for each distinct value of a. However, with an index on a, SingleStore can execute the query with a StreamingGroupBy because, by scanning the index on a, it can process all elements of a group consecutively.
ALTER TABLE t ADD INDEX (a);
EXPLAIN SELECT a, sum(b) FROM t GROUP BY a;
+-----------------------------------------------------------+
| EXPLAIN |
+-----------------------------------------------------------+
| Project [t.a, `sum(b)`] |
| StreamingGroupBy [SUM(`sum(b)`) AS `sum(b)`] groups:[t.a] |
| GatherMerge [t.a] partitions:all |
| Project [t.a, `sum(b)`] |
| StreamingGroupBy [SUM(t.b) AS `sum(b)`] groups:[t.a] |
| TableScan db.t, KEY a (a) |
+-----------------------------------------------------------+
Similarly for ORDER BY: without an index, SingleStore needs to sort:
EXPLAIN SELECT * FROM t ORDER BY b;
+----------------------------------+
| EXPLAIN |
+----------------------------------+
| Project [t.a, t.b] |
| GatherMerge [t.b] partitions:all |
| Project [t.a, t.b] |
| Sort [t.b] |
| TableScan db.t |
+----------------------------------+
With an index, SingleStore can eliminate the need to sort:
ALTER TABLE t ADD INDEX (b);
EXPLAIN SELECT * FROM t ORDER BY b;
+----------------------------------+
| EXPLAIN |
+----------------------------------+
| Project [t.a, t.b] |
| GatherMerge [t.b] partitions:all |
| Project [t.a, t.b] |
| TableScan db.t, KEY b (b) |
+----------------------------------+
Fanout vs Single-Partition Queries
SingleStore’s distributed architecture takes advantage of CPUs from many servers to execute your queries.
Using this table as an example:
CREATE TABLE urls(
  domain varchar(128),
  path varchar(8192),
  time_sec int,
  status_code binary(3),
  ...
  SHARD KEY (domain, path, time_sec));
The following query only involves a single partition because it has equality filters on all columns of the shard key, so the matching rows can only be on a single partition. In the EXPLAIN output, Gather partitions:single indicates SingleStore is using a single-partition plan.
EXPLAIN SELECT status_code
FROM urls
WHERE domain = 'youtube.com'
  AND path = '/watch?v=euh_uqxwk58'
  AND time_sec = 1;
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
| Gather partitions:single |
| Project [urls.status_code] |
| IndexRangeScan test2.urls, SHARD KEY domain (domain, path, time_sec) scan:[domain = "youtube.com" AND path = "/watch?v=euh_uqxwk58" AND time_sec = 1] |
+-------------------------------------------------------------------------------------------------------------------------------------------------------+
The following query, which does not filter on time_sec, does not match a single partition. EXPLAIN shows Gather partitions:all.
EXPLAIN SELECT status_code
FROM urls
WHERE domain = 'youtube.com'
  AND path = '/watch?v=euh_uqxwk58';
+--------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN |
+--------------------------------------------------------------------------------------------------------------------------------------+
| Project [urls.status_code] |
| Gather partitions:all |
| Project [urls.status_code] |
| IndexRangeScan test2.urls, SHARD KEY domain (domain, path, time_sec) scan:[domain = "youtube.com" AND path = "/watch?v=euh_uqxwk58"] |
+--------------------------------------------------------------------------------------------------------------------------------------+
To fix this, the table urls could instead be sharded on (domain, path), so that queries with equality filters on domain and path match a single partition.
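A shard key cannot simply be altered in place, so as a hedged sketch the table could be recreated with the narrower shard key and its data reloaded (the new table name is illustrative, and only the columns shown above are included):

```sql
-- Recreate urls with a shard key matching the query's equality filters.
CREATE TABLE urls_resharded(
  domain varchar(128),
  path varchar(8192),
  time_sec int,
  status_code binary(3),
  SHARD KEY (domain, path));

-- Copy the existing rows into the resharded table.
INSERT INTO urls_resharded
SELECT domain, path, time_sec, status_code FROM urls;
```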
Distributed Joins
SingleStore’s query execution architecture allows you to run arbitrary SQL queries on any table regardless of data distribution.
Collocating Joins
Consider the following tables:
CREATE TABLE lineitem(
  l_orderkey INT NOT NULL,
  l_linenumber INT NOT NULL,
  ...
  PRIMARY KEY(l_orderkey, l_linenumber));
CREATE TABLE orders(
  o_orderkey INT NOT NULL,
  ...
  PRIMARY KEY(o_orderkey));
When lineitem and orders are joined with the current schema, a distributed join is performed and data is repartitioned from the lineitem table.
EXPLAIN SELECT Count(*)
FROM lineitem
JOIN orders
ON o_orderkey = l_orderkey;
+---------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN |
+---------------------------------------------------------------------------------------------------------------------------------+
| Project [`COUNT(*)`] |
| Aggregate [SUM(`COUNT(*)`) AS `COUNT(*)`] |
| Gather partitions:all est_rows:1 |
| Project [`COUNT(*)`] est_rows:1 est_select_cost:1762812 |
| Aggregate [COUNT(*) AS `COUNT(*)`] |
| NestedLoopJoin |
| |---IndexSeek test.orders, PRIMARY KEY (o_orderkey) scan:[o_orderkey = r0.l_orderkey] est_table_rows:565020 est_filtered:565020 |
| TableScan r0 storage:list stream:no |
| Repartition [lineitem.l_orderkey] AS r0 shard_key:[l_orderkey] est_rows:587604 |
| TableScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber) est_table_rows:587604 est_filtered:587604 |
+---------------------------------------------------------------------------------------------------------------------------------+
The performance of this query can be improved by adding an explicit shard key on l_orderkey to the lineitem table. This collocates the join between lineitem and orders, so no repartitioning is needed.
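As a sketch of that schema change (only the columns shown earlier are included; the rest of the original definition is elided):

```sql
-- lineitem sharded explicitly on l_orderkey. orders is sharded on its
-- primary key o_orderkey by default, so the join on
-- o_orderkey = l_orderkey becomes collocated.
CREATE TABLE lineitem(
  l_orderkey INT NOT NULL,
  l_linenumber INT NOT NULL,
  SHARD KEY(l_orderkey),
  PRIMARY KEY(l_orderkey, l_linenumber));
```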
+--------------------------------------------------------------------------------------------------------------------+
| EXPLAIN |
+--------------------------------------------------------------------------------------------------------------------+
| Project [`COUNT(*)`] |
| Aggregate [SUM(`COUNT(*)`) AS `COUNT(*)`] |
| Gather partitions:all |
| Project [`COUNT(*)`] |
| Aggregate [COUNT(*) AS `COUNT(*)`] |
| ChoosePlan |
| | :estimate |
| | SELECT COUNT(*) AS cost FROM test.lineitem |
| | SELECT COUNT(*) AS cost FROM test.orders |
| |---NestedLoopJoin |
| | |---IndexSeek test.orders, PRIMARY KEY (o_orderkey) scan:[o_orderkey = lineitem.l_orderkey] |
| | TableScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber) |
| +---NestedLoopJoin |
| |---IndexRangeScan test.lineitem, PRIMARY KEY (l_orderkey, l_linenumber) scan:[l_orderkey = orders.o_orderkey] |
| TableScan test.orders, PRIMARY KEY (o_orderkey) |
+--------------------------------------------------------------------------------------------------------------------+
Reference Table Joins
Reference tables are replicated to each aggregator and leaf in the cluster, so they are best suited for small, slowly changing data.
Consider the following schema:
CREATE TABLE customer(
  c_custkey INT NOT NULL,
  c_nationkey INT NOT NULL,
  ...
  PRIMARY KEY(c_custkey),
  KEY(c_nationkey));
CREATE TABLE nation(
  n_nationkey INT NOT NULL,
  ...
  PRIMARY KEY(n_nationkey));
With the current schema, when we join the customer and nation tables together on nationkey, the nation table must be broadcast each time the query is run.
EXPLAIN SELECT Count(*)
FROM customer
JOIN nation
ON n_nationkey = c_nationkey;
+-------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN |
+-------------------------------------------------------------------------------------------------------------------------------------------------+
| Project [`Count(*)`] |
| Aggregate [SUM(`Count(*)`) AS `Count(*)`] |
| Gather partitions:all est_rows:1 |
| Project [`Count(*)`] est_rows:1 est_select_cost:1860408 |
| Aggregate [COUNT(*) AS `Count(*)`] |
| NestedLoopJoin |
| |---IndexRangeScan test.customer, KEY c_nationkey (c_nationkey) scan:[c_nationkey = r1.n_nationkey] est_table_rows:1856808 est_filtered:1856808 |
| TableScan r1 storage:list stream:no |
| Broadcast [nation.n_nationkey] AS r1 est_rows:300 |
| TableScan test.nation, PRIMARY KEY (n_nationkey) est_table_rows:300 est_filtered:300 |
+-------------------------------------------------------------------------------------------------------------------------------------------------+
The broadcast can be avoided by making nation a reference table, as it is relatively small and changes rarely.
CREATE REFERENCE TABLE nation(
  n_nationkey INT NOT NULL,
  ...
  PRIMARY KEY(n_nationkey));
EXPLAIN SELECT Count(*)
FROM customer
JOIN nation
ON n_nationkey = c_nationkey;
+-------------------------------------------------------------------------------------------------------------+
| EXPLAIN |
+-------------------------------------------------------------------------------------------------------------+
| Project [`Count(*)`] |
| Aggregate [SUM(`Count(*)`) AS `Count(*)`] |
| Gather partitions:all |
| Project [`Count(*)`] |
| Aggregate [COUNT(*) AS `Count(*)`] |
| ChoosePlan |
| | :estimate |
| | SELECT COUNT(*) AS cost FROM test.customer |
| | SELECT COUNT(*) AS cost FROM test.nation |
| |---NestedLoopJoin |
| | |---IndexSeek test.nation, PRIMARY KEY (n_nationkey) scan:[n_nationkey = customer.c_nationkey] |
| | TableScan test.customer, PRIMARY KEY (c_custkey) |
| +---NestedLoopJoin |
| |---IndexRangeScan test.customer, KEY c_nationkey (c_nationkey) scan:[c_nationkey = nation.n_nationkey] |
| TableScan test.nation, PRIMARY KEY (n_nationkey) |
+-------------------------------------------------------------------------------------------------------------+
Joins on the Aggregator
Consider the following schema and row counts:
CREATE TABLE customer(
  c_custkey INT NOT NULL,
  c_acctbal decimal(15,2) NOT NULL,
  PRIMARY KEY(c_custkey));
CREATE TABLE orders(
  o_orderkey INT NOT NULL,
  o_custkey INT NOT NULL,
  o_orderstatus varchar(20) NOT NULL,
  PRIMARY KEY(o_orderkey),
  KEY(o_custkey));
SELECT COUNT(*) FROM orders;
+----------+
| COUNT(*) |
+----------+
| 429786 |
+----------+
SELECT COUNT(*) from orders where o_orderstatus = 'open';
+----------+
| COUNT(*) |
+----------+
| 1000 |
+----------+
SELECT COUNT(*) from customer;
+----------+
| COUNT(*) |
+----------+
| 1014726 |
+----------+
SELECT COUNT(*) from customer where c_acctbal > 100.0;
+----------+
| COUNT(*) |
+----------+
| 988 |
+----------+
Note that while customer and orders are fairly large, when a query is filtered on open orders and account balances greater than 100, relatively few rows match. When orders and customer are joined using these filters, the join can be performed on an aggregator. This plan appears in EXPLAIN as a separate Gather operator for orders and for customer, with a HashJoin operator above the Gathers.
EXPLAIN SELECT o_orderkey
FROM customer
JOIN orders
WHERE c_acctbal > 100.0
  AND o_orderstatus = 'open'
  AND o_custkey = c_custkey;
+------------------------------------------------------------------+
| EXPLAIN |
+------------------------------------------------------------------+
| Project [orders.o_orderkey] |
| HashJoin [orders.o_custkey = customer.c_custkey] |
| |---TempTable |
| | Gather partitions:all |
| | Project [orders_0.o_orderkey, orders_0.o_custkey] |
| | Filter [orders_0.o_orderstatus = "open"] |
| | TableScan test3.orders AS orders_0, PRIMARY KEY (o_orderkey) |
| TableScan 0tmp AS customer storage:list stream:yes |
| TempTable |
| Gather partitions:all |
| Project [customer_0.c_custkey] |
| Filter [customer_0.c_acctbal > 100.0] |
| TableScan test3.customer AS customer_0, PRIMARY KEY (c_custkey) |
+------------------------------------------------------------------+
By default, SingleStore performs this optimization when each Gather pulls fewer than 120,000 rows from the leaves. This threshold can be adjusted with the max_subselect_aggregator_rowcount variable, or overridden per query with the LEAF_PUSHDOWN hint. The LEAF_PUSHDOWN hint forces the optimizer to perform as much work as possible on the leaf nodes.
EXPLAIN SELECT WITH(LEAF_PUSHDOWN=TRUE) o_orderkey
FROM customer
JOIN orders
WHERE c_acctbal > 100.0
  AND o_orderstatus = 'open'
  AND o_custkey = c_custkey;
+--------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN |
+--------------------------------------------------------------------------------------------------------------------------------+
| Project [r0.o_orderkey] |
| Gather partitions:all est_rows:11 |
| Project [r0.o_orderkey] est_rows:11 est_select_cost:1955 |
| Filter [customer.c_acctbal > 100.0] |
| NestedLoopJoin |
| |---IndexSeek test3.customer, PRIMARY KEY (c_custkey) scan:[c_custkey = r0.o_custkey] est_table_rows:1013436 est_filtered:1092 |
| TableScan r0 storage:list stream:no |
| Repartition [orders.o_orderkey, orders.o_custkey] AS r0 shard_key:[o_custkey] est_rows:972 |
| Filter [orders.o_orderstatus = "open"] |
| TableScan test3.orders, PRIMARY KEY (o_orderkey) est_table_rows:92628 est_filtered:972 |
+--------------------------------------------------------------------------------------------------------------------------------+
SET max_subselect_aggregator_rowcount=500;
EXPLAIN SELECT o_orderkey
FROM customer
JOIN orders
WHERE c_acctbal > 100.0
  AND o_orderstatus = 'open'
  AND o_custkey = c_custkey;
+--------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN |
+--------------------------------------------------------------------------------------------------------------------------------+
| Project [r0.o_orderkey] |
| Gather partitions:all est_rows:11 |
| Project [r0.o_orderkey] est_rows:11 est_select_cost:1955 |
| Filter [customer.c_acctbal > 100.0] |
| NestedLoopJoin |
| |---IndexSeek test3.customer, PRIMARY KEY (c_custkey) scan:[c_custkey = r0.o_custkey] est_table_rows:1013436 est_filtered:1092 |
| TableScan r0 storage:list stream:no |
| Repartition [orders.o_orderkey, orders.o_custkey] AS r0 shard_key:[o_custkey] est_rows:972 |
| Filter [orders.o_orderstatus = "open"] |
| TableScan test3.orders, PRIMARY KEY (o_orderkey) est_table_rows:92628 est_filtered:972 |
+--------------------------------------------------------------------------------------------------------------------------------+
Managing Concurrency for Distributed Joins
Queries that require cross-shard data movement, i.e., distributed joins that repartition or broadcast data, consume additional connections and threads across the cluster.
In most scenarios, the default settings for workload management will schedule your workload appropriately to utilize cluster resources without exhausting connection and thread limits.
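To inspect the current workload-management settings, the related engine variables can be listed (exact variable names vary by SingleStore version):

```sql
-- Show workload management engine variables, e.g. queue depths and limits.
SHOW VARIABLES LIKE 'workload_management%';
```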
Last modified: July 31, 2023