Query Plan Operations

This topic describes the operations that a query plan may use. These operations are displayed when you use Visual Explain, run EXPLAIN, or run PROFILE to show a query plan. Most examples in this topic use two tables: t, a rowstore table with a primary key, and ct, a columnstore table. Both tables are in database db1. For more information about interpreting these operators to improve performance, see the Query Tuning Guide.

CREATE ROWSTORE TABLE t(id INT PRIMARY KEY, a INT, b INT, KEY(a));
CREATE TABLE ct(a INT, b INT, SORT KEY(a), SHARD KEY(a));

Table access methods

  • Project - outputs a subset of columns of the input (for example, a SELECT statement that calls out specific columns from a table) in a particular order, and optionally computes new columns that are expressions of existing ones (for example, SELECT column_a / column_b AS column_c FROM table_name).

  • TableScan - scans every row in a table using an index

  • IndexSeek - navigates to a particular row using an index

  • IndexRangeScan - scans a range of rows using an index

  • ColumnStoreScan - scans a columnstore table

  • OrderedColumnStoreScan - scans a table using the columnstore sort key in key order

EXPLAIN SELECT * FROM t WHERE t.a = 5;
+----------------------------------------------+
| EXPLAIN                                      |
+----------------------------------------------+
| Project [t.id, t.a, t.b]                     |
| Gather partitions:all                        |
| Project [t.id, t.a, t.b]                     |
| IndexRangeScan db1.t, KEY a (a) scan:[a = 5] |
+----------------------------------------------+
EXPLAIN SELECT * FROM ct;
+---------------------------------------------------------------+
| EXPLAIN                                                       |
+---------------------------------------------------------------+
| Project [ct.a, ct.b]                                          |
| Gather partitions:all                                         |
| Project [ct.a, ct.b]                                          |
| ColumnStoreScan db1.ct, KEY a (a) USING CLUSTERED COLUMNSTORE |
+---------------------------------------------------------------+
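The difference between these access methods can be pictured with a sorted index. The following Python sketch is a conceptual model only: it assumes an index is a list of (key, row) pairs kept in key order, and the rows are hypothetical. It is not how SingleStore stores rowstore or columnstore data.

```python
import bisect

# Hypothetical rows keyed on an indexed column, e.g. t.a.
rows = [(5, "r5"), (1, "r1"), (9, "r9"), (3, "r3"), (7, "r7")]

# Model the index as (key, row) pairs kept sorted by key.
index = sorted(rows)
keys = [key for key, _ in index]


def table_scan(index):
    """TableScan: visit every row, in index order."""
    return [row for _, row in index]


def index_seek(index, keys, value):
    """IndexSeek: jump directly to the rows whose key equals value."""
    lo = bisect.bisect_left(keys, value)
    hi = bisect.bisect_right(keys, value)
    return [row for _, row in index[lo:hi]]


def index_range_scan(index, keys, low, high):
    """IndexRangeScan: jump to low, then read forward through high (inclusive)."""
    lo = bisect.bisect_left(keys, low)
    hi = bisect.bisect_right(keys, high)
    return [row for _, row in index[lo:hi]]


print(table_scan(index))                    # ['r1', 'r3', 'r5', 'r7', 'r9']
print(index_seek(index, keys, 5))           # ['r5']
print(index_range_scan(index, keys, 3, 7))  # ['r3', 'r5', 'r7']
```

TableScan touches every entry, while IndexSeek and IndexRangeScan use a binary search to skip straight to the relevant keys. This is why a predicate on an indexed column, such as t.a = 5 above, is answered through the index rather than with a full scan.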

Filter Methods

  • Filter - reads a stream of input rows and outputs only those rows that match a specified condition

  • BloomFilter - filters rows based on matching them against a join condition from a corresponding HashJoin

The following examples show how BloomFilter can appear in a query plan on its own, and how it can appear as part of ColumnStoreFilter when used on a columnstore table, respectively.

EXPLAIN SELECT * FROM rowstore_table_a straight_join (SELECT WITH (no_merge_this_select=true) * FROM columnstore_table_a) t WITH (bloom_filter=true) ON t.column_b = rowstore_table_a.column_b;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                                                                 |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Top limit:[@@SESSION.`sql_select_limit`]                                                                                                                                |
| Gather partitions:all est_rows:1 alias:remote_0                                                                                                                         |
| Project [r0.column_a, r0.column_b, r1.column_a AS column_a_1, r1.column_b AS column_b_2] est_rows:1 est_select_cost:4                                                   |
| Top limit:[?]                                                                                                                                                           |
| HashJoin                                                                                                                                                                |
| |---HashTableProbe [r1.column_b = r0.column_b]                                                                                                                          |
| | HashTableBuild alias:r1                                                                                                                                               |
| | Repartition [columnstore_table_a.column_a, columnstore_table_a.column_b] AS r1 shard_key:[column_b] est_rows:1                                                        |
| | ColumnStoreScan database_name.columnstore_table_a, KEY column_a (column_a) USING CLUSTERED COLUMNSTORE table_type:sharded_columnstore est_table_rows:1 est_filtered:1 |
| TableScan r0 storage:list stream:yes table_type:sharded est_table_rows:1 est_filtered:1                                                                                 |
| Repartition [rowstore_table_a.column_a, rowstore_table_a.column_b] AS r0 shard_key:[column_b] est_rows:1                                                                |
| TableScan database_name.rowstore_table_a table_type:sharded_rowstore est_table_rows:1 est_filtered:1                                                                    |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
EXPLAIN SELECT * FROM columnstore_table_b straight_join (SELECT WITH (no_merge_this_select=true) * FROM columnstore_table_a) t WITH (bloom_filter=true) ON t.column_b = columnstore_table_b.column_b;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                                                                                                 |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Top limit:[@@SESSION.`sql_select_limit`]                                                                                                                                |
| Gather partitions:all est_rows:1 alias:remote_0                                                                                                                         |
| Project [r0.column_a, r0.column_b, r1.column_a AS column_a_1, r1.column_b AS column_b_2] est_rows:1 est_select_cost:4                                                   |
| Top limit:[?]                                                                                                                                                           |
| HashJoin                                                                                                                                                                |
| |---HashTableProbe [r1.column_b = r0.column_b]                                                                                                                          |
| | HashTableBuild alias:r1                                                                                                                                               |
| | Repartition [columnstore_table_a.column_a, columnstore_table_a.column_b] AS r1 shard_key:[column_b] est_rows:1                                                        |
| | ColumnStoreScan database_name.columnstore_table_a, KEY column_a (column_a) USING CLUSTERED COLUMNSTORE table_type:sharded_columnstore est_table_rows:1 est_filtered:1 |
| TableScan r0 storage:list stream:yes table_type:sharded est_table_rows:1 est_filtered:1                                                                                 |
| Repartition [columnstore_table_b.column_a, columnstore_table_b.column_b] AS r0 shard_key:[column_b] est_rows:1                                                          |
| ColumnStoreScan database_name.columnstore_table_b, KEY column_a (column_a) USING CLUSTERED COLUMNSTORE table_type:sharded_columnstore est_table_rows:1 est_filtered:1   |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
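The idea behind BloomFilter can be sketched in Python. This is a generic Bloom filter that assumes k SHA-256-derived bit positions over an m-bit array; the sizes, hash functions, and row layout are illustrative assumptions, not SingleStore's implementation. Join keys from the build side of the HashJoin are added to the filter, and probe-side rows whose key is definitely absent are discarded before the join.

```python
import hashlib


class BloomFilter:
    """Minimal Bloom filter: k hash positions set/tested in an m-bit array."""

    def __init__(self, m_bits=1024, k_hashes=3):
        self.m = m_bits
        self.k = k_hashes
        self.bits = bytearray(m_bits)  # one byte per bit, for readability

    def _positions(self, value):
        for i in range(self.k):
            digest = hashlib.sha256(f"{i}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.m

    def add(self, value):
        for pos in self._positions(value):
            self.bits[pos] = 1

    def might_contain(self, value):
        # False means "definitely absent"; True means "possibly present".
        return all(self.bits[pos] for pos in self._positions(value))


# Build side of the join (the rows that feed HashTableBuild).
build_rows = [{"column_b": b} for b in (10, 20, 30)]
bloom = BloomFilter()
for row in build_rows:
    bloom.add(row["column_b"])

# Probe side: rows that cannot match are dropped before the join itself.
probe_rows = [{"column_b": b} for b in (10, 15, 20, 25, 30, 35)]
candidates = [r for r in probe_rows if bloom.might_contain(r["column_b"])]
```

A Bloom filter can return false positives but never false negatives, so it only removes rows that cannot match; the rows that pass are still checked by HashTableProbe.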

ColumnStoreFilter Table Access Method: Applies a Filter to a Columnstore Table

The following example demonstrates the ColumnStoreFilter query operation, using the table articles:

CREATE TABLE articles (
id INT UNSIGNED,
year INT UNSIGNED,
title VARCHAR(200),
body TEXT,
SHARD KEY(id),
SORT KEY (id),
KEY (id) USING HASH,
KEY (title) USING HASH,
KEY (year) USING HASH);

The EXPLAIN output shows the ColumnStoreFilter operation with the index annotation, because a hash index is used to apply the filter.

EXPLAIN SELECT * FROM articles WHERE title = 'Interesting title here';
+------------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                              |
+------------------------------------------------------------------------------------------------------+
| Gather partitions:all alias:remote_0                                                                 |
| Project [articles.id, articles.year, articles.title, articles.body]                                  |
| ColumnStoreFilter [articles.title = 'Interesting title here' index]                                  |
| ColumnStoreScan d.articles, KEY id_2 (id) USING CLUSTERED COLUMNSTORE table_type:sharded_columnstore |
+------------------------------------------------------------------------------------------------------+

GROUP BY and Aggregations

  • Aggregate - computes an aggregate (for example, SUM) over all of its input rows, without grouping

  • HashGroupBy - uses a hash table to compute group by results

  • StreamingGroupBy - relies on the underlying operation producing rows already sorted on the grouping columns, so it can compute GROUP BY results one group at a time. The advantage of StreamingGroupBy is that it uses only a constant amount of memory.

  • ShuffleGroupBy - occurs when a GROUP BY clause operates on a set of columns that does not include the shard key. First, a local GROUP BY is performed per host. Then, the data is repartitioned on the grouping columns and the GROUP BY is completed.

  • Distinct - removes duplicate rows

EXPLAIN SELECT SUM(id) FROM t;
+-----------------------------------------+
| EXPLAIN                                 |
+-----------------------------------------+
| Project [`sum(id)`]                     |
| Aggregate [SUM(`sum(id)`) AS `sum(id)`] |
| Gather partitions:all                   |
| Project [`sum(id)`]                     |
| Aggregate [SUM(t.id) AS `sum(id)`]      |
| TableScan db1.t, PRIMARY KEY (id)       |
+-----------------------------------------+
EXPLAIN SELECT SUM(id) FROM t GROUP BY a+1;
+------------------------------------------------------------+
| EXPLAIN                                                    |
+------------------------------------------------------------+
| Project [`sum(id)`]                                        |
| HashGroupBy [SUM(`sum(id)`) AS `sum(id)`] groups:[t.a + 1] |
| Gather partitions:all                                      |
| Project [`sum(id)`, t.a + 1 AS op, t.a, 1 AS op_1]         |
| HashGroupBy [SUM(t.id) AS `sum(id)`] groups:[t.a + 1]      |
| TableScan db1.t, PRIMARY KEY (id)                          |
+------------------------------------------------------------+
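Both plans above compute the aggregate in two phases: each leaf aggregates its own partitions, and the operator above Gather combines the partial results. The Python sketch below models that flow, along with the difference between HashGroupBy and StreamingGroupBy. The rows, the two-partition split, and the function names are hypothetical.

```python
from collections import defaultdict
from itertools import groupby

# Hypothetical rows of t, split across two partitions.
partitions = [
    [{"id": 10, "a": 1}, {"id": 20, "a": 2}],
    [{"id": 30, "a": 1}, {"id": 40, "a": 3}],
]


def group_key(row):
    return row["a"] + 1  # GROUP BY a+1


def hash_group_by_sum(rows):
    """HashGroupBy: one hash-table entry per group; input order does not matter,
    but memory grows with the number of distinct groups."""
    sums = defaultdict(int)
    for row in rows:
        sums[group_key(row)] += row["id"]
    return dict(sums)


def streaming_group_by_sum(sorted_rows):
    """StreamingGroupBy: input must already be sorted on the group key, so each
    group is complete as soon as the key changes; one group is held at a time."""
    return [
        (key, sum(r["id"] for r in group))
        for key, group in groupby(sorted_rows, key=group_key)
    ]


# SELECT SUM(id) FROM t: leaf Aggregate per partition, then Aggregate over partial sums.
partial_sums = [sum(r["id"] for r in part) for part in partitions]
total = sum(partial_sums)

# SELECT SUM(id) FROM t GROUP BY a+1: leaf HashGroupBy, then HashGroupBy over partials.
final = defaultdict(int)
for part in partitions:
    for key, partial in hash_group_by_sum(part).items():
        final[key] += partial

all_rows = [r for part in partitions for r in part]
streamed = streaming_group_by_sum(sorted(all_rows, key=group_key))

print(total)        # 100
print(dict(final))  # {2: 40, 3: 20, 4: 40}
print(streamed)     # [(2, 40), (3, 20), (4, 40)]
```

The second phase works for SUM because a sum of partial sums equals the overall sum, which is why the top-level operator in the first plan is SUM(`sum(id)`) rather than SUM(t.id).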

Distributed data movement

  • Gather - collects all the results from the leaf nodes to the aggregator node. When a query can be routed to a single partition, Gather has the attribute partitions:single; such queries are called single partition queries. If Gather collects data from all the partitions, the attribute is set to partitions:all. If the shard key matches an IN list predicate, the attribute is set to partitions:inlist, and the query is sent only to the partitions that match the values in the IN list. An advantage of single partition queries is that they can scale to much higher concurrency and throughput because they only need to execute on a single partition.

  • GatherMerge - collects ordered streams of rows from the leaf nodes and merges them to output an ordered stream.

EXPLAIN SELECT * FROM t WHERE id = 1;
+-------------------------------------------------+
| EXPLAIN                                         |
+-------------------------------------------------+
| Gather partitions:single                        |
| Project [t.id, t.a, t.b]                        |
| IndexSeek db1.t, PRIMARY KEY (id) scan:[id = 1] |
+-------------------------------------------------+
EXPLAIN SELECT * FROM t WHERE id > 1;
+------------------------------------------------------+
| EXPLAIN                                              |
+------------------------------------------------------+
| Project [t.id, t.a, t.b]                             |
| Gather partitions:all                                |
| Project [t.id, t.a, t.b]                             |
| IndexRangeScan db1.t, PRIMARY KEY (id) scan:[id > 1] |
+------------------------------------------------------+
EXPLAIN SELECT * FROM t WHERE id IN (2,3,4);
+-----------------------------------------------------------------------------------+
| EXPLAIN                                                                           |
+-----------------------------------------------------------------------------------+
| Gather partitions:inlist alias:remote_0                                           |
| Project [t.id, t.a, t.b]                                                          |
| IndexSeek db1.t, PRIMARY KEY (id) scan:[id IN (...)] table_type:sharded_rowstore  |
+-----------------------------------------------------------------------------------+
EXPLAIN SELECT * FROM t ORDER BY id;
+-----------------------------------+
| EXPLAIN                           |
+-----------------------------------+
| Project [t.id, t.a, t.b]          |
| GatherMerge [t.id] partitions:all |
| Project [t.id, t.a, t.b]          |
| TableScan db1.t, PRIMARY KEY (id) |
+-----------------------------------+
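Partition routing and GatherMerge can be modeled in a few lines of Python. The sketch assumes a hypothetical cluster of 8 partitions and uses zlib.crc32 as a stand-in shard-key hash; SingleStore's real hash function and partition count differ. It shows why an equality predicate on the shard key reaches one partition, why an IN list reaches only the matching partitions, and how GatherMerge combines streams that are already sorted.

```python
import heapq
import zlib

NUM_PARTITIONS = 8  # hypothetical partition count


def partition_for(shard_key_value):
    """Stand-in shard-key hash (not SingleStore's actual function)."""
    return zlib.crc32(str(shard_key_value).encode()) % NUM_PARTITIONS


def target_partitions(shard_key_values=None):
    """Partitions a query must reach, given equality/IN values on the shard key.

    None           -> no usable shard-key predicate -> partitions:all
    one value      -> partitions:single
    several values -> partitions:inlist (only the partitions those values hash to)
    """
    if shard_key_values is None:
        return set(range(NUM_PARTITIONS))
    return {partition_for(v) for v in shard_key_values}


print(len(target_partitions()))     # 8  (like WHERE id > 1)
print(len(target_partitions([1])))  # 1  (like WHERE id = 1)
inlist = target_partitions([2, 3, 4])  # like WHERE id IN (2,3,4); at most 3 partitions

# GatherMerge: each leaf returns rows already sorted on id (for example, from a
# PRIMARY KEY scan); a k-way merge keeps the global order without re-sorting.
leaf_streams = [[1, 4, 7], [2, 5, 8], [3, 6, 9]]
merged = list(heapq.merge(*leaf_streams))
print(merged)  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```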
  • Repartition - redistributes a dataset to hash-partition it on a particular key

  • Broadcast - broadcasts a dataset to every node in a workspace

    Note

    For a broadcast LEFT JOIN, a Branch operator is added to more accurately represent shared computations. A shared result table is computed once, shown once in the plan, and shared across the different branches.

    EXPLAIN SELECT t.*, ct.* FROM t LEFT JOIN ct ON t.a = ct.a;   
    +--------------------------------------------------------------------------------------------------------------------------------------------+
    | EXPLAIN                                                                                                                                    |
    +--------------------------------------------------------------------------------------------------------------------------------------------+
    | Gather partitions:all est_rows:30 alias:remote_0                                                                                           |
    | Project [SUBQ_VWW_1.a, SUBQ_VWW_1.b, SUBQ_VWW_1.a_1, SUBQ_VWW_1.b_2] est_rows:30 est_select_cost:32                                        |
    | TableScan 1tmp AS SUBQ_VWW_1 storage:list stream:yes est_table_rows:30 est_filtered:30                                                     |
    | UnionAll est_rows:30                                                                                                                       |
    | |---Project [r4.a, r4.b, r4.a_1, r4.b_2] est_rows:1                                                                                        |
    | |   Filter [$0 = 8]                                                                                                                        |
    | |   HashGroupBy [COUNT(*) AS $0] groups:[r4.i0]                                                                                            |
    | |   TableScan r4 storage:list stream:no table_type:sharded est_table_rows:30 est_filtered:30                                               |
    | |   Project [r3.i0, r3.a, r3.b, r3.a_1, r3.b_2] alias:r4 est_rows:30                                                                       |
    | |   TableScan r3 storage:list stream:yes table_type:sharded est_table_rows:30 est_filtered:30                                              |
    | |   Repartition AS r3 shard_key:[i0] est_rows:30                                                                                           |
    | |   Branch [SUBQ_VWW_0.ConstIntCol IS NULL] position:[2/2]                                                                               |
    | Project [r2.a, r2.b, r2.a_1, r2.b_2] est_rows:30                                                                                           |
    | TableScan r2 storage:list stream:yes table_type:sharded est_table_rows:30 est_filtered:30                                                  |
    | Project [r1.i0, r1.a, r1.b, SUBQ_VWW_0.a_1, SUBQ_VWW_0.b_2] alias:r2 est_rows:30                                                           |
    | Branch [SUBQ_VWW_0.ConstIntCol IS NOT NULL] position:[1/2]                                                                                 |
    | HashJoin type:right                                                                                                                        |
    | |---HashTableProbe [r1.a = SUBQ_VWW_0.a_1]                                                                                                 |
    | |   HashTableBuild alias:r1                                                                                                                |
    | |   Project [r0.a, r0.b, i0] alias:r1 hash_key:[a] est_rows:1                                                                              |
    | |   Window [ROW_NUMBER() OVER () AS i0]                                                                                                    |
    | |   TableScan r0 storage:list stream:yes table_type:reference est_table_rows:1 est_filtered:1                                              |
    | |   Broadcast [t.a, t.b] AS r0 distribution:tree est_rows:1                                                                                |
    | |   ColumnStoreScan test1.t, KEY __UNORDERED () USING CLUSTERED COLUMNSTORE table_type:sharded_columnstore est_table_rows:1 est_filtered:1 |
    | TableScan 0tmp AS SUBQ_VWW_0 storage:list stream:yes est_table_rows:3,072 est_filtered:3,072                                               |
    | Project [ct.a AS a_1, ct.b AS b_2, 0 AS ConstIntCol] est_rows:3,072                                                                        |
    | ColumnStoreScan test1.ct, KEY a (a) USING CLUSTERED COLUMNSTORE table_type:sharded_columnstore est_table_rows:3,072 est_filtered:3,072     |
    +--------------------------------------------------------------------------------------------------------------------------------------------+
    27 rows in set (0.01 sec)
EXPLAIN SELECT * FROM t,ct WHERE t.id = ct.b;
+-----------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                       |
+-----------------------------------------------------------------------------------------------+
| Project [t.id, t.a, t.b, r0.a_1, r0.b_2]                                                      |
| Gather partitions:all est_rows:1                                                              |
| Project [t.id, t.a, t.b, r0.a_1, r0.b_2] est_rows:1 est_select_cost:3                         |
| NestedLoopJoin                                                                                |
| |---IndexSeek db1.t, PRIMARY KEY (id) scan:[id = r0.b_2] est_table_rows:1 est_filtered:1      |
| TableScan r0 storage:list stream:no                                                           |
| Repartition [ct.a AS a_1, ct.b AS b_2] AS r0 shard_key:[b_2] est_rows:1                       |
| ColumnStoreScan db1.ct, KEY a (a) USING CLUSTERED COLUMNSTORE est_table_rows:1 est_filtered:1 |
+-----------------------------------------------------------------------------------------------+
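Repartition and Broadcast differ in where each row is sent. In the plan above, ct is repartitioned on b (shard_key:[b_2]) so that its rows line up with t, which is sharded on its primary key id. The Python sketch below models both operators, assuming hypothetical in-memory rows and a stand-in crc32 hash; it illustrates the data movement only and is not SingleStore's implementation.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical


def partition_for(value):
    """Stand-in shard-key hash; the same function must be used for both inputs."""
    return zlib.crc32(str(value).encode()) % NUM_PARTITIONS


def repartition(rows, key):
    """Repartition: each row goes to exactly one partition, chosen by hashing key."""
    out = defaultdict(list)
    for row in rows:
        out[partition_for(row[key])].append(row)
    return out


def broadcast(rows, num_nodes):
    """Broadcast: every node receives a full copy of rows (sensible only for small inputs)."""
    return {node: list(rows) for node in range(num_nodes)}


t_rows = [{"id": i} for i in range(1, 7)]
ct_rows = [{"a": 100 + i, "b": i} for i in (2, 4, 6)]

t_parts = repartition(t_rows, "id")   # models t's existing distribution on id
ct_parts = repartition(ct_rows, "b")  # like Repartition ... shard_key:[b_2]

# Each partition can now join locally, with no further data movement.
local_matches = [
    (t["id"], c["a"])
    for p in range(NUM_PARTITIONS)
    for t in t_parts.get(p, [])
    for c in ct_parts.get(p, [])
    if t["id"] == c["b"]
]
print(sorted(local_matches))  # [(2, 102), (4, 104), (6, 106)]

copies = broadcast(ct_rows, num_nodes=3)  # every node holds all of ct_rows
```

Repartition moves each row once, so its cost grows with the size of the repartitioned input; Broadcast multiplies a dataset by the number of nodes, which is why it suits small inputs such as the single-row t in the HashJoin example.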
  • ChoosePlan - indicates that SingleStore will choose one of the listed plans at runtime based on cost estimates. The :estimate section lists the statistics being estimated; note that these SQL statements are not actually executed. Instead, SingleStore uses index information to estimate these statistics.

EXPLAIN SELECT * FROM t WHERE id > 5 AND a > 5;
+-----------------------------------------------------------+
| EXPLAIN                                                   |
+-----------------------------------------------------------+
| Project [t.id, t.a, t.b]                                  |
| Gather partitions:all                                     |
| Project [t.id, t.a, t.b]                                  |
| ChoosePlan                                                |
| |   :estimate                                             |
| |       SELECT COUNT(*) AS cost FROM db1.t WHERE t.id > 5 |
| |       SELECT COUNT(*) AS cost FROM db1.t WHERE t.a > 5  |
| |---Filter [t.a > 5]                                      |
| |   IndexRangeScan db1.t, PRIMARY KEY (id) scan:[id > 5]  |
| +---Filter [t.id > 5]                                     |
|     IndexRangeScan db1.t, KEY a (a) scan:[a > 5]          |
+-----------------------------------------------------------+
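The ChoosePlan decision above can be sketched as comparing the estimated row count for each candidate index and picking the cheaper one. The Python below is a simplified model that assumes the estimate is an exact count taken from a sorted list of index keys; the rows are hypothetical, and the text only states that SingleStore uses index information, not how the estimate is computed.

```python
import bisect

# Hypothetical (id, a) rows of t.
rows = [(1, 1), (2, 1), (3, 2), (4, 2), (5, 3), (6, 3), (7, 4), (8, 9), (9, 9), (10, 9)]

# Sorted key lists stand in for PRIMARY KEY (id) and KEY a (a).
id_keys = sorted(r[0] for r in rows)
a_keys = sorted(r[1] for r in rows)


def count_greater_than(sorted_keys, value):
    """Estimate COUNT(*) WHERE key > value from the index alone, without reading rows."""
    return len(sorted_keys) - bisect.bisect_right(sorted_keys, value)


def choose_plan(threshold=5):
    cost_id = count_greater_than(id_keys, threshold)  # SELECT COUNT(*) ... WHERE t.id > 5
    cost_a = count_greater_than(a_keys, threshold)    # SELECT COUNT(*) ... WHERE t.a > 5
    if cost_id <= cost_a:
        return f"IndexRangeScan PRIMARY KEY (id), then Filter [t.a > {threshold}]"
    return f"IndexRangeScan KEY a (a), then Filter [t.id > {threshold}]"


print(count_greater_than(id_keys, 5))  # 5
print(count_greater_than(a_keys, 5))   # 3
print(choose_plan())                   # IndexRangeScan KEY a (a), then Filter [t.id > 5]
```

Scanning the index that matches fewer rows and then filtering on the other column reads less data, which is the trade-off the two ChoosePlan branches represent.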

Joins

The optimizer can perform the following three types of joins. They are listed in order from the least complex to the most complex algorithm.

  1. NestedLoopJoin - performs a nested loop join: for every row on the outer side of the join, SingleStore scans the inner table to find all the matching rows.

    The complexity of the NestedLoopJoin operation is on the order of the number of rows in the outer table times the number of rows in the inner table. If there are millions of rows in each table, this operation is not efficient; for that use case, it is best to add indexes and/or shard keys to the tables.

    If no index or shard key exists in either table, the optimizer performs keyless sharding or sorting on the outer table before the NestedLoopJoin operation. This appears as an IndexSeek operation in the EXPLAIN output.

    NestedLoopJoin is the fallback join that the optimizer performs if no other join type can be used.

  2. HashJoin - performs a hash join: SingleStore builds a hash table from one of the joined tables. For every row in the other table, SingleStore probes the hash table; if there is a match, the rows are joined.

    The hash table must fit in memory, so SingleStore attempts to build the hash table from the table with fewer rows. If there is not enough memory to create the hash table, a NestedLoopJoin is performed instead.

  3. MergeJoin - performs a merge join: SingleStore scans both the inner and outer sides of the join at the same time and merges matching rows. Both tables must have a sort key and a shard key on the columns being joined. If these conditions are not met, the optimizer considers the other two join types.

    Because the values to be joined are already in sort order, both tables are scanned at the same time, so the optimizer needs to perform only one scan. The shard key requirement makes the join local, ensuring that matches only need to be considered within each partition. This is why MergeJoin is the most performant of the three.

    Note

    MergeJoin is only supported for inner joins, not for outer joins.

EXPLAIN SELECT * FROM t t1, t t2 WHERE t1.id = t2.a;
+------------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                        |
+------------------------------------------------------------------------------------------------+
| Project [t1.id, t1.a, t1.b, r0.id_1, r0.a_2, r0.b_3]                                           |
| Gather partitions:all est_rows:1                                                               |
| Project [t1.id, t1.a, t1.b, r0.id_1, r0.a_2, r0.b_3] est_rows:1 est_select_cost:3              |
| NestedLoopJoin                                                                                 |
| |---IndexSeek db1.t AS t1, PRIMARY KEY (id) scan:[id = r0.a_2] est_table_rows:1 est_filtered:1 |
| TableScan r0 storage:list stream:no                                                            |
| Repartition [t2.id AS id_1, t2.a AS a_2, t2.b AS b_3] AS r0 shard_key:[a_2] est_rows:1         |
| TableScan db1.t AS t2, PRIMARY KEY (id) est_table_rows:1 est_filtered:1                        |
+------------------------------------------------------------------------------------------------+
EXPLAIN SELECT * FROM t, ct WHERE t.b = ct.b;
+-----------------------------------------------------------------------------------------------+
| EXPLAIN                                                                                       |
+-----------------------------------------------------------------------------------------------+
| Project [r1.id, r1.a, r1.b, ct.a AS a_1, ct.b AS b_2]                                         |
| Gather partitions:all est_rows:1                                                              |
| Project [r1.id, r1.a, r1.b, ct.a AS a_1, ct.b AS b_2] est_rows:1 est_select_cost:4            |
| HashJoin [r1.b = ct.b]                                                                        |
| |---Broadcast [t.id, t.a, t.b] AS r1 est_rows:1                                               |
| |   TableScan db1.t, PRIMARY KEY (id) est_table_rows:1 est_filtered:1                         |
| ColumnStoreScan db1.ct, KEY a (a) USING CLUSTERED COLUMNSTORE est_table_rows:1 est_filtered:1 |
+-----------------------------------------------------------------------------------------------+
EXPLAIN SELECT * FROM ct t1, ct t2 WHERE t1.a = t2.a;
+--------------------------------------------------------------------------------+
| EXPLAIN                                                                        |
+--------------------------------------------------------------------------------+
| Project [t1.a, t1.b, t2.a, t2.b]                                               |
| Gather partitions:all                                                          |
| Project [t1.a, t1.b, t2.a, t2.b]                                               |
| MergeJoin condition:[t2.a = t1.a]                                              |
| |---OrderedColumnStoreScan db1.ct AS t2, KEY a (a) USING CLUSTERED COLUMNSTORE |
| +---OrderedColumnStoreScan db1.ct AS t1, KEY a (a) USING CLUSTERED COLUMNSTORE |
+--------------------------------------------------------------------------------+
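The three join algorithms can be compared side by side in Python. This is a textbook sketch of nested loop, hash, and merge joins over in-memory lists of dicts with hypothetical rows; it ignores distribution, memory limits, and the other details a real engine handles. All three functions compute the same inner equi-join and return (left_row, right_row) pairs.

```python
from collections import defaultdict
from operator import itemgetter


def nested_loop_join(outer, inner, key):
    """For every outer row, scan all inner rows: about len(outer) * len(inner) comparisons."""
    return [(o, i) for o in outer for i in inner if o[key] == i[key]]


def hash_join(probe, build, key):
    """Build a hash table on `build` (ideally the smaller input; it must fit in
    memory), then look up each probe row once: about len(build) + len(probe) steps."""
    table = defaultdict(list)
    for b in build:
        table[b[key]].append(b)
    return [(p, b) for p in probe for b in table.get(p[key], [])]


def merge_join(left, right, key):
    """Both inputs must already be sorted on `key`; walk them together in one pass."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on each side and emit their cross product.
            i_end, j_end = i, j
            while i_end < len(left) and left[i_end][key] == lk:
                i_end += 1
            while j_end < len(right) and right[j_end][key] == rk:
                j_end += 1
            out.extend((lrow, rrow) for lrow in left[i:i_end] for rrow in right[j:j_end])
            i, j = i_end, j_end
    return out


t1 = [{"a": 1, "b": "x"}, {"a": 2, "b": "y"}, {"a": 2, "b": "z"}]
t2 = [{"a": 2, "b": "p"}, {"a": 3, "b": "q"}, {"a": 1, "b": "r"}]


def labels(pairs):
    return sorted((lrow["b"], rrow["b"]) for lrow, rrow in pairs)


by_a = itemgetter("a")
print(labels(nested_loop_join(t1, t2, "a")))  # [('x', 'r'), ('y', 'p'), ('z', 'p')]
print(labels(hash_join(t1, t2, "a")))         # same pairs
print(labels(merge_join(sorted(t1, key=by_a), sorted(t2, key=by_a), "a")))  # same pairs
```

In the MergeJoin plan above, both inputs are OrderedColumnStoreScan operations on the sort key a, which supplies the already-sorted input that a merge join requires.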

Handling Parameter-Dependent Query Plan Issues

A parameter-dependent query plan issue can occur when the query optimizer generates a query execution plan optimized for a specific parameter value or set of values. Because parameter values can change, a cached plan may no longer be optimal for the parameter values used in subsequent executions. These types of plans can cause query performance problems.

Workarounds that can reduce parameter-dependent query plan performance issues are:

  • Using both the with(row_count=xxx) and with(selectivity=x.x) query hints. Apply the hints to the table on the right side of the JOIN:

    JOIN <table_name> [AS alias] with(row_count=xxx, selectivity=x.x) ON

    These hints override the statistics the query optimizer uses. with(row_count=xxx) treats the table as having xxx rows. with(selectivity=x.x) sets an estimate of the fraction of the table's rows that are not filtered out.

  • Using the NOPARAM function. This function disables the parameterization of constants before a query plan is compiled. Be aware that when parameterization is disabled, separate query plans are created for different parameter values, which increases compilation time and resource usage.

  • Adding comments to the query. Comments are not parameterized, so using them forces the optimizer to generate different plans.

    Note that the client can remove the comments from a query, which defeats this workaround.

    For example:

    SELECT /*1*/ 1;

    The server could receive the following command:

    SELECT 1;

    Again, be aware of the increased compilation time and resource usage when separate query plans are created.
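The reason comments lead to separate plans, and how the row_count and selectivity hints combine, can be sketched in Python. The plan_cache_key function is a hypothetical, much-simplified parameterizer: it replaces numeric and quoted string literals with ? but copies /* ... */ comments verbatim, matching the behavior described above (constants are parameterized, comments are not). SingleStore's actual parameterization rules are more involved. The estimated_rows function assumes the two hint values are simply multiplied.

```python
import re

# Hypothetical, simplified rules: constants are parameterized, comments are not.
_COMMENT = re.compile(r"/\*.*?\*/", re.DOTALL)
_LITERAL = re.compile(r"'(?:[^']|'')*'|\b\d+(?:\.\d+)?\b")


def plan_cache_key(sql):
    """Replace literals outside comments with ?, keeping comment text as-is."""
    parts, pos = [], 0
    for match in _COMMENT.finditer(sql):
        parts.append(_LITERAL.sub("?", sql[pos:match.start()]))
        parts.append(match.group(0))  # comments survive parameterization
        pos = match.end()
    parts.append(_LITERAL.sub("?", sql[pos:]))
    return "".join(parts)


def estimated_rows(row_count, selectivity):
    """Assumed reading of with(row_count=..., selectivity=...): rows left after filtering."""
    return row_count * selectivity


print(plan_cache_key("SELECT * FROM t WHERE id = 1"))  # SELECT * FROM t WHERE id = ?
print(plan_cache_key("SELECT * FROM t WHERE id = 2"))  # SELECT * FROM t WHERE id = ?
print(plan_cache_key("SELECT 1"))                      # SELECT ?
print(plan_cache_key("SELECT /*1*/ 1"))                # SELECT /*1*/ ?
print(estimated_rows(1000, 0.1))                       # 100.0
```

Under this model, the two id queries share one key (and therefore one plan), while the commented query gets a different key; if a client strips the comment before sending the query, the keys become identical again.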

Last modified: January 10, 2024
