Merger Engine
Result merger refers to merging multi-data result sets acquired from all the data nodes as one result set and returning it to the requesting client correctly.
The result merger supported by ShardingSphere can be divided into five functional types: traversal, order-by, group-by, pagination and aggregation, which are combined rather than mutually exclusive. From the perspective of structure, it can be divided into stream merger, memory merger and decorator merger, among which stream merger and memory merger are mutually exclusive, and decorator merger can be further processed based on stream merger and memory merger.
Since the result set is returned from the database one by one instead of being loaded to the memory all at a time, the method of merging the result sets returned from the database can greatly reduce memory consumption and is the preferred method of merging.
Stream merger means that each time the data is obtained from the result set is able to return the correct single piece of data line by line. It is the best fit with the native method of returning the result set of the database. Traversal, order-by, and stream group-by are all examples of the stream merger.
Memory merger needs to traverse all the data in the result set and store it in the memory first. After unified grouping, ordering, aggregation and other calculations, the data is packaged into the data result set accessed one by one and returned.
Decorator merger merges and reinforces all the result sets function uniformly. Currently, decorator merger has two types: pagination merger and aggregation merger.
Traversal Merger
As the simplest merger method, traversal merger only requires the combination of multiple data result sets into a one-way linked table. After traversing current data result sets in the linked table, it only needs to move the elements of the linked table back one bit and continue traversing the next data result set.
Order-by Merger
Because there is an ORDER BY
statement in SQL, each data result has its own order. So it only needs to sort data value that the result set cursor currently points to, which is equal to sorting multiple ordered arrays. Therefore, order-by merger is the most suitable sorting algorithm in this scenario.
When merging ordered queries, ShardingSphere will compare current data values in each result set (which is realized by the Java Comparable interface) and put them into the priority queue. Each time when acquiring the next piece of data, it only needs to move down the result set cursor at the top of the queue, reenter the priority order according to the new cursor and relocate its own position.
Here is an instance to explain ShardingSphere’s order-by merger. The following picture is an illustration of ordering by the score. Data result sets returned by 3 tables are shown in the example and each of them has already been ordered according to the score, but there is no order between the 3 data result sets. Order the data value that the result set cursor currently points to in these 3 result sets. Then put them into the priority queue. The first data value of t_score_0
is the biggest, followed by that of t_score_2
and t_score_1
in sequence. Thus, the priority queue is ordered by the sequence of t_score_0
, t_score_2
and t_score_1
.
The following diagram illustrates how the order-by merger works when using next call. We can see from the diagram that when using the next
call, t_score_0
at the first of the queue will be popped out. After returning the data value currently pointed by the cursor (i.e., 100) to the requesting client, the cursor will be moved down and t_score_0
will be put back into the queue.
While the priority queue will also be ordered according to the t_score_0
data value (90 here) pointed by the cursor of the current data result set. According to the current value, t_score_0
is at the end of the queue, and the data result set of t_score_2
, originally in the second place of the queue, automatically moves to the first place of the queue.
In the second next
call, t_score_2
in the first place is popped out. Its value pointed by the cursor of the data result set is returned to the client end, with its cursor moved down to rejoin the queue, and the following will be the same way. If there is no data in the result set, it will not rejoin the queue.
It can be seen that when data in each result set is ordered, but multiple result sets are disordered, ShardingSphere can still order them with no need to upload all the data to the memory. In the stream merger method, each next
operation only acquires the right piece of data each time, which saves memory consumption to a large extent.
On the other hand, the order-by merger has maintained the orderliness on the horizontal axis and vertical axis of the data result set. Naturally ordered, the vertical axis refers to each data result set itself, which is acquired by SQL with ORDER BY
. The horizontal axis refers to the current value pointed by each data result set, and its order needs to be maintained by the priority queue. Each time when the current cursor moves down, it requires putting the result set in the priority order again, which means only the cursor of the first data result set can be moved down.
Group-by Merger
Group-by merger is the most complex one and can be divided into stream group-by merger and memory group-by merger. Stream group-by merger requires that the SQL’s ordering items must be consistent with the field and ordering types (ASC or DESC) of the group-by item; otherwise, data correctness can only be guaranteed by memory merger.
For instance, if it is sharded based on subject, the table structure contains the examinees’ name (to simplify, name repetition is not taken into consideration) and score. The following SQL is used to acquire each examinee’s total score:
SELECT name, SUM(score) FROM t_score GROUP BY name ORDER BY name;
When order-by item and group-by item are totally consistent, the data obtained is continuous. The data required by group-by is all stored in the data value that the data result set cursor currently points to. Thus, stream group-by merger can be used, as illustrated by the diagram:
The merging logic is similar to that of order-by merger. The following picture shows how the stream group-by merger works in the next
call.
We can see from the picture that, in the first next
call, t_score_java
in the first place will be popped out of the queue, along with other result set data having the same grouping value of “Jerry”. After acquiring all the students’ scores with the name of “Jerry”, the accumulation operation will proceed. Hence, after the first next
call is finished, the result set acquired is the sum of Jerry’s scores. At the same time, all the cursors in data result sets will be moved down to a different data value next to “Jerry” and reordered according to the current result set value. Thus, the data that contains the second name “John” will be put at the beginning of the queue.
Stream group-by merger is different from order-by merger only in two aspects:
- It will take out all the data with the same group item from multiple data result sets at once.
- It carried out the aggregation calculation according to the aggregation function type.
For the inconsistency between the grouping item and ordering item, it requires uploading all the data to the memory to group and aggregate, since the relevant data value needed to acquire group information is not continuous, and stream merger is not available. For example, acquire each examinee’s total score through the following SQL and order them from the highest to the lowest:
SELECT name, SUM(score) FROM t_score GROUP BY name ORDER BY score DESC;
Then, stream merger is not able to use, for the data taken out from each result set is the same as the original data of the order-by merger diagram in the upper half part structure.
When SQL only contains the group-by statement, according to different database implementations, its sorting order may not be the same as the group order. The lack of an ordering statement indicates the order is not important in this SQL. Therefore, through the optimization of SQL rewriting, ShardingSphere can automatically add the ordering item the same as the grouping item, converting it from the memory merger that consumes memory to the stream merger.
Aggregation Merger
Whether it is stream group-by merger or memory group-by merger, they process the aggregation function in the same way. In addition to grouped SQL, ungrouped SQL can also use aggregate functions. Therefore, aggregation merger is an additional merging ability based on what has been introduced above, i.e., the decorator mode. The aggregation function can be categorized into three types: comparison, sum and average.
The comparison aggregation function refers to MAX
and MIN
. They need to compare all the result set data of each group and simply return the maximum or minimum value.
The sum aggregation function refers to SUM
and COUNT
. They need to sum up all the result set data of each group.
The average aggregation function refers only to AVG
. It must be calculated through SUM
and COUNT
rewritten by SQL, which has been mentioned in the SQL rewriting section.
Pagination Merger
All the merger types above can be paginated. Pagination is the decorator added to other kinds of mergers. ShardingSphere strengthens its ability to paginate the data result set through decorator mode. The pagination merger is responsible for filtering unnecessary data.
ShardingSphere’s pagination function can be misleading to users in that they may think it will take a large amount of memory. In distributed scenarios, it can only guarantee the data correctness by rewriting LIMIT 10000000, 10
to LIMIT 0, 10000010
. Users can easily misunderstand that ShardingSphere uploads a large amount of meaningless data to the memory and has the risk of memory overflow. Actually, it can be known from the principle of stream merger that only memory group-by merger will upload all the data to the memory. Generally speaking, SQL used for OLAP grouping, is often applied to massive calculations or small result generation, and it won’t generate vast result data. Except for memory group-by merger, other scenarios all use stream merger to acquire data result set. So ShardingSphere would skip unnecessary data through the next call method in the result set, rather than storing it in the memory.
But it should be noted that pagination with LIMIT
is not the best practice, because a large amount of data still needs to be transmitted to ShardingSphere’s memory space for ordering. LIMIT
cannot query data by index, so paginating with ID is a better solution if ID continuity can be guaranteed. For example:
SELECT * FROM t_order WHERE id > 100000 AND id <= 100010 ORDER BY id;
Or query the next page through the ID of the last query result, for example:
SELECT * FROM t_order WHERE id > 10000000 LIMIT 10;
The overall structure of the merger engine is shown in the following diagram: