Distributed DML Query Execution
In this topic we will look at common DML query patterns and how they are executed through the distributed system. You can use the EXPLAIN command to examine a query’s aggregator-level and leaf-level query plans.
Let’s assume the following schema:
CREATE TABLE a (
  a1 int,
  a2 int,
  a3 int,
  SHARD KEY (a1, a2),
  KEY (a3)
);
CREATE TABLE b (
  b1 int,
  b2 int,
  b3 int,
  SHARD KEY (b1, b2)
);
CREATE REFERENCE TABLE r (
  r1 int,
  r2 int,
  PRIMARY KEY (r1),
  KEY (r2)
);
Matching the Shard Key
If you specify an equality on every column in the shard key, then the aggregator will direct the query to exactly one partition. Most queries do not fall into this pattern; instead, the aggregator must send queries to every partition in the cluster for intermediate results and then stitch them together.
These queries are sent to one partition:
SELECT * FROM a WHERE a1 = 4 AND a2 = 10;
SELECT a3, count(*) FROM a WHERE a1 = 4 AND a2 = 10 GROUP BY a3;
These queries are sent to all partitions:
SELECT * FROM a WHERE a1 = 4;
SELECT * FROM a WHERE a1 = 4 OR a2 = 10;
SELECT * FROM a WHERE a1 IN (4, 5) AND a2 IN (10);
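The routing rule above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the partition count, the `partition_for` hash, and the `target_partitions` helper are hypothetical and do not reflect the hash function SingleStore DB actually uses. The sketch models only AND-ed equality predicates.

```python
import zlib

NUM_PARTITIONS = 8          # hypothetical cluster size
SHARD_KEY = ("a1", "a2")    # shard key of table a in the schema above


def partition_for(key_values):
    """Map shard key values to a partition (illustrative hash, not the real one)."""
    return zlib.crc32(repr(tuple(key_values)).encode()) % NUM_PARTITIONS


def target_partitions(equalities):
    """equalities: dict of column -> constant taken from AND-ed equality predicates.

    Returns a single partition when every shard key column is pinned to a
    constant; otherwise returns every partition in the cluster.
    """
    if all(col in equalities for col in SHARD_KEY):
        return [partition_for(equalities[col] for col in SHARD_KEY)]
    return list(range(NUM_PARTITIONS))


one = target_partitions({"a1": 4, "a2": 10})   # WHERE a1 = 4 AND a2 = 10
every = target_partitions({"a1": 4})           # WHERE a1 = 4
```

Here `one` holds a single partition number, while `every` lists all partitions because `a2` is not constrained.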
Secondary Index Matching
If your query uses a secondary (non-shard) index, then the aggregator must send the query to every partition in the cluster. Locally, each partition’s table will use its part of the secondary index to speed up the query. While the overall performance of the query is dictated by the seek and scan time of these indexes, the fact that the query must be sent everywhere in the cluster can increase the variance (and therefore overall latency) of the query.
This query matches the secondary index on the column a3:
SELECT * FROM a WHERE a3 = 5;
No Index Matching
Queries that do not match any index perform a full table scan on all partitions. From the perspective of the aggregator, these queries are the same as queries that match a secondary index.
Most queries that don’t involve aggregates, group-bys, or order-bys don’t require any further processing on the aggregator. These queries are forwarded verbatim to one or many partitions, and the partitions’ results are streamed back to the client. More complex queries do require additional processing on the aggregator to merge the results from the leaves.
ORDER BY queries that don’t involve aggregates or group-bys can sort rows on the leaves and then merge the sorted intermediate results on the aggregator. For example, a query like SELECT * FROM a WHERE a3 = 5 ORDER BY a1 will follow this pattern. These queries leverage distributed (leaf) processing to do the majority of filtering and sorting, which makes them scalable with the amount of data in the system.
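As a rough illustration of this sort-then-merge pattern, the following Python sketch filters and sorts on each simulated leaf and then performs a k-way merge on the aggregator side. The row data and the function names `leaf_query` and `aggregator_merge` are made up for the example; this is not how the engine is implemented internally.

```python
import heapq

# Hypothetical rows (a1, a2, a3) held by three partitions.
partitions = [
    [(7, 1, 5), (2, 9, 5), (4, 4, 8)],
    [(3, 3, 5), (9, 0, 1)],
    [(1, 6, 5), (8, 2, 5)],
]


def leaf_query(rows):
    """Leaf side: apply WHERE a3 = 5, then ORDER BY a1."""
    return sorted((r for r in rows if r[2] == 5), key=lambda r: r[0])


def aggregator_merge(sorted_streams):
    """Aggregator side: k-way merge of leaf results that are already sorted."""
    return list(heapq.merge(*sorted_streams, key=lambda r: r[0]))


result = aggregator_merge(leaf_query(p) for p in partitions)
# result is ordered by a1: 1, 2, 3, 7, 8
```

The aggregator never re-sorts the full result; it only interleaves streams that arrive in order, which is why the expensive work stays on the leaves.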
Queries with aggregates compute aggregate values on the leaves and then use aggregate merging on the aggregator to compute a final result. Each non-associative aggregate is converted into an expression that is associative. For example, AVG(expr) is converted to SUM(expr)/COUNT(expr) automatically by the aggregator.
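To see why the rewrite is needed, consider this small Python sketch. Each simulated leaf returns a (SUM, COUNT) pair and the aggregator combines the pairs. The data and the helper names `leaf_partial` and `aggregator_avg` are assumptions made for the illustration.

```python
def leaf_partial(values):
    """Leaf side: return (SUM(expr), COUNT(expr)), ignoring NULLs (None)."""
    present = [v for v in values if v is not None]
    return sum(present), len(present)


def aggregator_avg(partials):
    """Aggregator side: merge partial sums and counts, then divide once."""
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count if count else None


# Hypothetical values of expr on three partitions.
partitions = [[1, 2, 3], [10], [None, 4]]
partials = [leaf_partial(p) for p in partitions]

avg = aggregator_avg(partials)                   # 20 / 5 = 4.0
naive = sum(s / c for s, c in partials) / 3      # average of averages, about 5.33
```

Averaging the per-partition averages gives a different (wrong) answer whenever partitions hold different numbers of rows, whereas sums and counts can be added together safely.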
Distinct aggregates like COUNT(DISTINCT ...) are not as efficient as simple aggregates like COUNT(*). Distinct values must be resolved across partition boundaries (you could have a3=10 on two different partitions in SELECT COUNT(DISTINCT a3) FROM a), so each partition must send every distinct value it has back to the aggregator. Queries with distinct aggregates ship one row per distinct value per partition back to the aggregator and can therefore be expensive if there are many distinct values.
There is an exception to this rule: if you run a DISTINCT aggregate over the shard key, distinct values can be resolved on the leaves and the aggregator can merge aggregate values as it would with simple aggregates. An example of such a query would be SELECT COUNT(DISTINCT a1, a2) FROM a.
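The difference between the general case and the shard key case can be sketched as follows. The rows and both helper functions are hypothetical. The second function relies on the assumption that rows with equal shard key values always live on the same partition.

```python
# Hypothetical rows (a1, a2, a3) on two partitions; a3 = 10 appears on both.
partitions = [
    [(1, 1, 10), (1, 2, 20)],
    [(2, 1, 10), (2, 2, 30)],
]


def count_distinct_general(parts, column):
    """General case: each leaf ships its distinct values and the aggregator
    unions them, so traffic grows with the number of distinct values."""
    shipped = [{row[column] for row in rows} for rows in parts]
    return len(set().union(*shipped))


def count_distinct_shard_key(parts):
    """Shard key case: equal (a1, a2) values cannot span partitions, so each
    leaf ships a single count and the aggregator simply adds them up."""
    return sum(len({(row[0], row[1]) for row in rows}) for rows in parts)


distinct_a3 = count_distinct_general(partitions, 2)   # 3, not 2 + 2
distinct_key = count_distinct_shard_key(partitions)   # 4
```

Summing per-partition counts for `a3` would give 4 and double count the value 10, which is why the general case has to ship the values themselves.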
It is also possible to calculate a fast approximation of distinct values; see APPROX_COUNT_DISTINCT.
GROUP BY queries are spread very efficiently across the leaves. The aggregator sends the GROUP BY construct to the leaves so that the leaves process data down to the size of the final, grouped result set. The aggregator then merges together these grouped results (combining aggregates along the way) and sends the final result back to the client. The cost of a distributed GROUP BY query is usually proportional to the number of rows in the final result set, since the traffic through the system is roughly the number of partitions multiplied by the size of the grouped result set.
HAVING clauses are processed entirely on the aggregator since they perform filtering after the GROUP BY operation is complete.
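A minimal Python sketch of this flow, assuming a hypothetical query such as SELECT a3, COUNT(*) FROM a GROUP BY a3 HAVING COUNT(*) > 2, might look like the following. The data and function names are invented for the example.

```python
from collections import Counter

# Hypothetical a3 values stored on three partitions.
partitions_a3 = [
    [5, 5, 7],
    [5, 7, 9],
    [5, 9],
]


def leaf_group_count(a3_values):
    """Leaf side: GROUP BY a3 with COUNT(*), reduced to one row per group."""
    return Counter(a3_values)


def aggregator_group_count(partials, having_min):
    """Aggregator side: add up per-group counts, then apply HAVING COUNT(*) > n."""
    merged = Counter()
    for partial in partials:
        merged.update(partial)  # Counter.update adds counts for matching groups
    return {group: n for group, n in merged.items() if n > having_min}


partials = [leaf_group_count(p) for p in partitions_a3]
result = aggregator_group_count(partials, having_min=2)   # {5: 4}
```

No single partition sees more than two rows with a3 = 5, so the HAVING filter can only be evaluated correctly once the counts have been merged.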
As a general rule, SingleStore DB will efficiently execute any join query with a single sharded table and as many reference tables as you’d like. Since reference tables are fully replicated on every machine in the cluster, leaves can join against their local copies of reference tables.
These queries leverage reference joins:
SELECT * FROM a INNER JOIN r ON a.a1 = r.r1;
SELECT * FROM r LEFT JOIN a ON a.a1 = r.r1;
SELECT * FROM a, r r1, r r2, r r3;
SELECT * FROM a INNER JOIN (SELECT DISTINCT r1 FROM r) x ON a.a1 = x.r1;
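The following sketch illustrates why reference joins need no data movement: each simulated leaf joins its own slice of a against a full local copy of r. The rows and the `leaf_join` helper are hypothetical.

```python
# Reference table r (r1 -> r2); every leaf holds this full copy.
r = {1: 100, 2: 200}

# Hypothetical rows (a1, a2, a3) of the sharded table a on two partitions.
partitions_a = [
    [(1, 10, 0), (3, 30, 0)],
    [(2, 20, 0), (1, 11, 0)],
]


def leaf_join(a_rows, r_copy):
    """INNER JOIN on a.a1 = r.r1 using only data that is local to the leaf."""
    return [
        (a1, a2, a3, a1, r_copy[a1])
        for a1, a2, a3 in a_rows
        if a1 in r_copy
    ]


# The aggregator only concatenates the per-leaf join results.
result = [row for rows in partitions_a for row in leaf_join(rows, r)]
```

The row with a1 = 3 is dropped because r has no matching r1, and no leaf had to contact another leaf to decide that.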
Aligning Shard Keys for Performance
Aligning the shard keys of large tables enables more efficient joining. It is possible to perform arbitrary distributed joins across any tables and along any column. However, if you join two tables with identical shard key signatures along that shard key, the joins will be performed local to the partitions, reducing network overhead.
CREATE TABLE users (
  id BIGINT AUTO_INCREMENT,
  user_name VARCHAR(1000),
  account_id BIGINT,
  PRIMARY KEY (id)
);
CREATE TABLE clicks (
  click_id BIGINT AUTO_INCREMENT,
  account_id BIGINT,
  user_id BIGINT,
  page_id INT,
  ts TIMESTAMP,
  SHARD KEY (user_id),
  PRIMARY KEY (click_id, user_id)
);
In this example, id is the shard key of the users table, and the shard key on the clicks table has the same signature (a single BIGINT). These queries join locally without network overhead:
SELECT * FROM users INNER JOIN clicks ON users.id = clicks.user_id WHERE clicks.page_id = 10;
SELECT avg(c1.ts - c2.ts) FROM clicks c1 INNER JOIN clicks c2 ON c1.user_id = c2.user_id WHERE c1.page_id > c2.page_id;
Whereas this query will stream rows between leaves:
SELECT u.account_id, count(distinct user_id), count(1) FROM users u INNER JOIN clicks c ON u.account_id = c.account_id GROUP BY u.account_id;
If you identify your data layout and join patterns in advance, this technique can be an extremely effective way to run performant joins between distributed tables. For more information about how queries execute, see EXPLAIN.
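The effect of aligned shard keys can be illustrated with a short Python sketch. The modulo-based `partition_for` function is a stand-in for the real hash, and the sample rows are invented. The point is only that equal shard key values always map to the same partition.

```python
NUM_PARTITIONS = 4  # hypothetical


def partition_for(value):
    """Stand-in for the shard hash: equal values always map to one partition."""
    return value % NUM_PARTITIONS


users = [(1, 100), (2, 100), (3, 200)]             # (id, account_id), sharded on id
clicks = [(1, 100, 1), (2, 100, 1), (3, 200, 3)]   # (click_id, account_id, user_id), sharded on user_id


def colocated(pairs):
    """True if both rows of every joined pair are stored on the same partition."""
    return all(partition_for(u[0]) == partition_for(c[2]) for u, c in pairs)


by_user = [(u, c) for u in users for c in clicks if u[0] == c[2]]
by_account = [(u, c) for u in users for c in clicks if u[1] == c[1]]

local_join = colocated(by_user)        # True: join on the shard key
needs_shuffle = not colocated(by_account)   # True: join on account_id
```

Joining on users.id = clicks.user_id pairs rows that are already on the same partition, while joining on account_id pairs user 2 with clicks stored under user 1, so rows must move between leaves.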
The WHERE clause in an UPDATE or DELETE query is optimized the same way as the WHERE clause in a SELECT query. If the predicate matches the shard key exactly, then the query is routed to a single partition.
SingleStore DB executes INSERT queries by analyzing the insert values relevant to the shard key and routing the query to the corresponding partition. For example, INSERT INTO a (a1, a2, a3) VALUES (1, 2, 3) would compute the hash value of (1, 2) and map this value to the appropriate partition.
If you are bulk inserting data with INSERT queries, then you should take advantage of the multi-insert syntax: INSERT INTO a (a1, a2, a3) VALUES (1, 2, 3), (2, 3, 4), .... The aggregator will chop up the multi-insert into single-partition insert queries and run them in parallel across the cluster. This technique enables your application to combat the inherent latency of running in a distributed system.
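As a rough sketch of how a multi-insert could be split, the following Python groups rows by the partition that their shard key (a1, a2) maps to. The partition count, the hash in `partition_for`, and the `split_multi_insert` helper are assumptions for illustration, not the engine's actual implementation.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical


def partition_for(a1, a2):
    """Illustrative hash of the shard key (a1, a2); not the real hash function."""
    return zlib.crc32(repr((a1, a2)).encode()) % NUM_PARTITIONS


def split_multi_insert(rows):
    """Group (a1, a2, a3) rows into one single-partition batch per target partition."""
    batches = defaultdict(list)
    for a1, a2, a3 in rows:
        batches[partition_for(a1, a2)].append((a1, a2, a3))
    return dict(batches)


rows = [(1, 2, 3), (2, 3, 4), (1, 2, 9)]
batches = split_multi_insert(rows)
```

Rows that share a shard key, such as (1, 2, 3) and (1, 2, 9), always end up in the same batch, and the batches can then be sent to their partitions in parallel.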
When a multi-insert statement is executed, SingleStore DB uses two steps to commit the transaction:
- Step one, where each leaf node validates its portion of the statement and acknowledges that it is ready to commit, and
- Step two, where each leaf node actually commits the transaction.
Both steps are necessary to ensure that every partition on every leaf node successfully receives and executes its portion of the insert statement. If an error occurs, the entire transaction is rolled back across the cluster.
For example, consider a cluster with two leaf nodes and four partitions total. When a multi-insert statement is executed against the cluster’s aggregator, each leaf node validates its portion of the statement and prepares to commit the transaction on its two partitions. When all leaf nodes have acknowledged their readiness, the aggregator notifies them to commit the transaction. Finally, the transaction is committed, and the data is successfully inserted.
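The two steps can be sketched as a small simulation. The `Leaf` class, its `fail_on_prepare` flag, and the `multi_insert` function are hypothetical names used only to show the control flow; they do not correspond to any SingleStore DB API.

```python
class Leaf:
    """Simulated leaf node that stages rows before committing them."""

    def __init__(self, name, fail_on_prepare=False):
        self.name = name
        self.fail_on_prepare = fail_on_prepare  # simulate a validation error
        self.staged = []
        self.committed = []

    def prepare(self, rows):
        """Step one: validate and stage rows; report readiness to commit."""
        if self.fail_on_prepare:
            return False
        self.staged = list(rows)
        return True

    def commit(self):
        """Step two: make the staged rows durable."""
        self.committed.extend(self.staged)
        self.staged = []

    def rollback(self):
        """Discard staged rows after a failure anywhere in the cluster."""
        self.staged = []


def multi_insert(leaves, rows_by_leaf):
    """Aggregator side: commit everywhere only if every leaf reports ready."""
    ready = [leaf.prepare(rows_by_leaf.get(leaf.name, [])) for leaf in leaves]
    if all(ready):
        for leaf in leaves:
            leaf.commit()
        return True
    for leaf in leaves:
        leaf.rollback()
    return False
```

For instance, calling `multi_insert` with two healthy leaves commits on both, while constructing one leaf with `fail_on_prepare=True` leaves `committed` empty on every leaf.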