Learn more about the multi-stage query engine and how to troubleshoot issues.
The general explanation of the multi-stage query engine is provided in the Multi-stage query engine reference documentation. This section provides a deep dive into the multi-stage query engine. Most of the concepts explained here are related to the internals of the multi-stage query engine and users don't need to know about them in order to write queries. However, understanding these concepts can help you to take advantage of the engine's capabilities and to troubleshoot issues.
Describes the intersect relation operator in the multi-stage query engine.
The intersect operator is a relational operator that combines two relations and returns the rows common to both. It is used to find the intersection of two or more relations, usually through the SQL INTERSECT operator.
Although it is accepted by the parser, the ALL modifier is currently ignored. Therefore INTERSECT and INTERSECT ALL are equivalent. This issue has been reported in #13126.
The current implementation consumes the whole right input relation first and stores the rows in a set. Then it consumes the left input relation one block at a time. Each time a block of rows is read from the left input relation, the operator checks if the rows are in the set of rows from the right input relation. All unique rows that are in the set are added to a new partial result block. Once the whole left input block is analyzed, the operator emits the partial result block.
This process is repeated until all rows from the left input relation are processed.
In pseudo-code, the algorithm looks like this:
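The steps above can be sketched in Python as follows. This is a minimal illustration, not Pinot's actual code: the function name `intersect`, the `Row` alias, and the representation of blocks as lists of hashable tuples are all assumptions made for the example. Removing a matched row from the set is also an assumption about how each common row ends up being emitted only once.

```python
from typing import Hashable, Iterable, Iterator, List, Tuple

Row = Tuple[Hashable, ...]  # hypothetical row representation


def intersect(left_blocks: Iterable[List[Row]],
              right_blocks: Iterable[List[Row]]) -> Iterator[List[Row]]:
    # Blocking phase: consume the whole right input and store its rows in a set.
    right_rows = set()
    for block in right_blocks:
        right_rows.update(block)

    # Streaming phase: consume the left input one block at a time.
    for block in left_blocks:
        partial = []
        for row in block:
            if row in right_rows:
                partial.append(row)
                # Assumption: dropping the matched row keeps the output distinct.
                right_rows.remove(row)
        # Emit the partial result once the whole left block has been analyzed.
        yield partial


left = [[(1, "a"), (2, "b")], [(2, "b"), (3, "c")]]
right = [[(2, "b"), (3, "c"), (4, "d")]]
result = list(intersect(left, right))
print(result)  # [[(2, 'b')], [(3, 'c')]]
```

The second left block contains `(2, "b")` again, but it is not emitted a second time, which matches the always-distinct behavior described above.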
The intersect operator is a semi-blocking operator that first consumes the right input relation in a blocking fashion and then consumes the left input relation in a streaming fashion.
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
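As a small worked example of why the summed time can exceed the wall time, assume four threads run the operator fully in parallel. The per-thread timings below are invented for illustration and do not come from a real query.

```python
# Hypothetical per-thread times, in milliseconds, for one operator.
thread_times_ms = [120, 110, 130, 115]

# The stat described above is the sum over all threads.
summed_time_ms = sum(thread_times_ms)

# Assumption: the threads ran fully in parallel, so the wall time is
# roughly that of the slowest thread rather than the sum.
approx_wall_time_ms = max(thread_times_ms)

print(summed_time_ms, approx_wall_time_ms)  # 475 130
```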
Type: Long
The number of rows emitted by the operator.
The intersect operator is represented in the explain plan as a LogicalIntersect explain node.
Type: Boolean
This attribute is used to indicate if the operator should return all the rows or only the distinct rows.
Although it is accepted in SQL, the all attribute is not currently used in the intersect operator. The returned rows are always distinct. This issue has been reported in #13126.
The intersect operator has a memory footprint that is proportional to the number of unique rows in the right input relation. It also consumes the right input relation in a blocking fashion while the left input relation is consumed in a streaming fashion.
This means that:
If one of the input relations is significantly larger than the other, it is recommended to use the smaller relation as the right input relation.
If one of the inputs is blocking and the other is not, it is recommended to use the blocking relation as the right input relation.
These two hints can be contradictory, so it is up to the user to decide which one to follow based on the specific query pattern. Remember that you can use the stage stats to check the number of rows emitted by each of the inputs and adjust the order of the inputs accordingly.
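To make the first hint concrete, the following sketch compares how many rows would have to be held in memory depending on which relation is used as the right input. The two relations are made-up data, and modelling the operator's state as a Python `set` is an assumption for illustration only.

```python
# Hypothetical inputs: single-column rows represented as tuples.
small = [(i,) for i in range(1_000)]
large = [(i,) for i in range(500, 100_000)]

# Rows kept in memory if each relation is used as the right input.
rows_held_if_small_is_right = len(set(small))
rows_held_if_large_is_right = len(set(large))

# The result is the same either way; only the memory footprint changes.
same_result = set(small) & set(large) == set(large) & set(small)

print(rows_held_if_small_is_right, rows_held_if_large_is_right, same_result)
# 1000 99500 True
```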
Describes the filter relation operator in the multi-stage query engine.
The filter operator is used to filter rows based on a condition.
This page describes the filter operator defined in the relational algebra used by multi-stage queries. This operator is generated by the multi-stage query engine when you use the where, having or sometimes on clauses.
Filter operations apply a predicate to each row and only keep the rows that satisfy the predicate.
Note that filter operators can only be optimized using indexes when they are executed in the leaf stage, because intermediate stages don't have access to the actual segments. This is why the engine tries to push the filter operation down to the leaf stage whenever possible.
As explained in explain-plan-multiple-stages, the explain plan in the multi-stage query engine does not indicate whether indexes are used or not.
The filter operator is a streaming operator. It emits the blocks of rows as soon as they are received from the upstream operator.
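A streaming filter can be sketched as a Python generator that handles one block at a time and never buffers across blocks. The function name `filter_blocks`, the row layout, and the sample predicate are hypothetical and only illustrate the behavior described above.

```python
from typing import Callable, Iterable, Iterator, List, Tuple

Row = Tuple[object, ...]  # hypothetical row representation


def filter_blocks(blocks: Iterable[List[Row]],
                  predicate: Callable[[Row], bool]) -> Iterator[List[Row]]:
    for block in blocks:
        # Each block is emitted as soon as it has been filtered.
        yield [row for row in block if predicate(row)]


upstream = [[("a", 1), ("b", 3)], [("c", 5), ("d", 0)]]
kept = list(filter_blocks(upstream, lambda row: row[1] > 2))
print(kept)  # [[('b', 3)], [('c', 5)]]
```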
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1. This number is affected by the number of received rows and the complexity of the predicate.
Type: Long
The number of rows emitted by the operator. A large number of emitted rows may not be problematic, but indicates that the predicate is not very selective.
The filter operator is represented in the explain plan as a LogicalFilter explain node.
Type: Expression
The condition that is being applied to the rows. The expression may use indexed columns ($0, $1, etc.), functions and literals. The indexed columns are always 0-based.
For example, the following explain plan:
says that the filter applies the condition $5 > 2, which means that only the rows where the 6th column is greater than 2 are emitted. To find out which column is the 6th, look at the schema of the scanned table.
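The lookup from a 0-based `$N` reference to a column name can be sketched as below. The column list and the helper `resolve_column_refs` are hypothetical. In practice the column order comes from the schema of the scanned table.

```python
import re
from typing import List

# Hypothetical column order for the scanned table.
columns = ["id", "name", "city", "country", "age", "visits"]


def resolve_column_refs(condition: str, columns: List[str]) -> str:
    # Replace every $N with the name of the column at 0-based index N.
    return re.sub(r"\$(\d+)", lambda m: columns[int(m.group(1))], condition)


print(resolve_column_refs("$5 > 2", columns))  # visits > 2
```

Here `$5` resolves to the 6th entry of the list because the indexes start at 0.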
As explained in explain-plan-multiple-stages, the explain plan in the multi-stage query engine does not directly indicate whether indexes are used or not.
Apache Pinot contributors are working on improving this, but it is not yet available. Meanwhile, we need an indirect approach to get that information.
First, we need to know on which stage the filter is being used. If the filter is being used in an intermediate stage, then the filter is not using indexes. In order to know the stage, you can extract stages as explained in understanding-stages.
But what about the filters executed in the leaf stage? Not all filters in the leaf stage can use indexes. The only way to know if the filter is using indexes is to use a single-stage explain plan. To do so, you need to transform the leaf stage into a single-stage query. This is a manual process that can be tedious, but it becomes easier once you get used to it.
See understanding-multi-stage-query for more information.
Describes the mailbox receive operator in the multi-stage query engine.
The mailbox receive operator is the operator that receives the data from the mailbox send operator. This is not an actual relational operator but a Pinot extension used to receive data from other stages.
Stages in the multi-stage query engine are executed in parallel by different workers. Workers send data to each other using mailboxes. The number of mailboxes depends on the send operator parallelism, the receive operator parallelism and the distribution being used. At worst, there is one mailbox per worker pair, so if the upstream send operator has a parallelism of S and the receive operator has a parallelism of R, there will be S * R mailboxes.
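The worst case can be checked by enumerating every (sender, receiver) worker pair. The parallelism values below are arbitrary examples, and the function name `worst_case_mailboxes` is hypothetical.

```python
from itertools import product
from typing import List, Tuple


def worst_case_mailboxes(send_parallelism: int,
                         receive_parallelism: int) -> List[Tuple[int, int]]:
    # One mailbox per (sender worker, receiver worker) pair.
    return list(product(range(send_parallelism), range(receive_parallelism)))


mailboxes = worst_case_mailboxes(4, 3)  # S = 4, R = 3
print(len(mailboxes))  # 12
```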
By default, these mailboxes are gRPC channels, but when both workers are in the same server, they can use shared memory and therefore a more efficient on-heap mailbox is used.
The mailbox receive operator pulls data from these mailboxes and sends it to the downstream operator.
The mailbox receive operator is a streaming operator. It emits the blocks of rows as soon as they are received from the upstream operator.
It is important to notice that the mailbox receive operator tries to be fair when reading from multiple workers.
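One simple way to be fair is to read one block from each mailbox in turn. The sketch below assumes a round-robin policy purely for illustration: the text above only states that the operator tries to be fair, not how. The function name `read_fairly` and the use of in-memory queues as mailboxes are also hypothetical.

```python
from collections import deque
from typing import Deque, Iterator, List


def read_fairly(mailboxes: List[Deque[str]]) -> Iterator[str]:
    # Assumption: round-robin over the mailboxes that still have data.
    pending = deque(mailboxes)
    while pending:
        mailbox = pending.popleft()
        if mailbox:
            yield mailbox.popleft()
            pending.append(mailbox)  # go to the back of the line
        # An empty mailbox is treated as finished and dropped.


senders = [deque(["a1", "a2", "a3"]), deque(["b1"]), deque(["c1", "c2"])]
order = list(read_fairly(senders))
print(order)  # ['a1', 'b1', 'c1', 'a2', 'c2', 'a3']
```

No single sender can starve the others: the sender with three blocks only gets its second block after every other sender has been given a turn.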
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator. This operator should always emit as many rows as its upstream operator.
Type: Long
How many workers are sending data to this operator.
Type: Long
How many messages have been received in heap format by this mailbox. Receiving messages in heap format is more efficient than receiving them in raw format, as the messages do not need to be serialized and deserialized and no network transfer is needed.
Type: Long
How many messages have been received in raw format, and therefore serialized, by this mailbox. Receiving messages in heap format is more efficient than receiving them in raw format, as the messages do not need to be serialized and deserialized and no network transfer is needed.
Type: Long
How many bytes have been deserialized by this mailbox. A high number here indicates that the mailbox is receiving a lot of data from other servers, which is expensive in terms of CPU, memory and network.
Type: Long
How long it took to deserialize the raw messages sent to this mailbox. This time is not wall time, but the sum of the time spent by all threads deserializing messages.
Take into account that this time does not include the impact on the network or the GC.
Type: Long
How much time this operator has been blocked waiting while offering data to be consumed by the downstream operator. A high number here indicates that the downstream operator is slow and may be a bottleneck. For example, usually the receive operator that is the left input of a join operator has a high value here, as the join needs to consume all the messages from the right input before it can start consuming the left input.
Type: Long
How much time this operator has been blocked waiting for more data to be sent by the upstream (send) operator. A high number here indicates that the upstream operator is slow and may be a bottleneck. For example, blocking operators like aggregations, sorts, joins or window functions require all the data to be received before they can start emitting a result, so having them as upstream operators of a mailbox receive operator can lead to high values here.
Given that the mailbox receive operator is a meta-operator, it is not actually shown in the explain plan. Instead, a single PinotLogicalExchange or PinotLogicalSortExchange is shown in the explain plan. This exchange explain node is the logical representation of a pair of send and receive operators.
See the to understand the attributes of the exchange explain node.