The multi-stage query engine uses a set of operators to process the query. These operators are based on relational algebra, with some modifications to better fit the distributed nature of the engine.
These operators are the execution units that Pinot uses to execute a query. They are executed in a pipeline with a tree structure, where each operator consumes the output of the previous operators (also known as its upstreams).
Users do not directly specify these operators. Instead, they write SQL queries that are translated into a logical plan, which is then transformed into the different operators. The logical plan can be obtained by explaining a query, while there is no way to get the operators directly. The closest thing to the operators that users can get is the multi-stage stats output.
Operators vs SQL clauses
These operators are generated from the SQL query that you write, but even though they are similar, there is not a one-to-one mapping between SQL clauses and operators. Some SQL clauses generate multiple operators, while some operators are generated by multiple SQL clauses.
Operators vs explain plan nodes
Operators and explain plan nodes are closer than SQL clauses and operators. Although most explain plan nodes can be directly mapped to an operator, there are some exceptions:
Each PinotLogicalExchange and each PinotLogicalSortExchange explain node is materialized into a pair of mailbox send and mailbox receive operators.
All plan nodes that belong to the same leaf stage are executed in the leaf operator.
In general terms, the operators are the execution units that Pinot uses to execute a query and are also known as the multi-stage physical plan, while the explain plan nodes form the logical plan. The difference between the two is that the operators can actually be executed, while the explain plan nodes are only a logical representation of the query plan.
List of operators
The following is a list of operators that are used by the multi-stage query engine:
Join
Describes the hash join relation operator in the multi-stage query engine.
The hash join operator is used to join two relations using a hash join algorithm. It is a binary operator that takes two inputs, the left and right relations, and produces a single output relation.
This is the only join operator in the multi-stage query engine. It is usually created as a result of a query that contains a join clause, but it can also be created by other SQL constructs, like queries using semi-joins.
There are different types of joins that can be performed using the hash join operator. Apache Pinot supports:
Inner join, where only the rows that have a match in both relations are returned.
Left join, where all the rows from the left relation are returned. The ones that have a match with the right relation are returned with the columns from the right relation, and the ones that do not have a match are returned with null values for the columns from the right relation.
Right join, like the left join but returning all the rows from the right relation, with the columns from the left relation filled with null values for the rows that do not have a match.
Full outer join, where all the rows from both relations are returned. If a row from any relation does not have a match in the other relation, the columns from the other relation are filled with null values.
Semi-join, where only the rows from the left relation that have a match in the right relation are returned. This is useful to filter the rows from the left relation based on the existence of a match in the right relation.
Anti-join, where only the rows from the left relation that do not have a match in the right relation are returned.
Implementation details
The hash join operator is one of the new operators introduced in the multi-stage query engine. The current implementation assumes that the right input relation is the smaller one, so it consumes this input first building a hash table that is then probed with the left input relation.
Future optimizations may include advanced heuristics to decide which input relation to consume first, but in the current implementation, it is important to specify the smaller relation as the right input.
Although the multi-stage query engine is designed to process data in memory, it uses the ability to execute each stage in different workers (explained in understanding stages) to process data that may not fit in the memory of a single node. Specifically, each worker processes a subset of the data. By default, the inputs are partitioned by the join keys and each worker processes one partition of the data.
This means that data usually needs to be shuffled between workers, which is done by the engine using a mailbox system. The engine tries to minimize the amount of data that needs to be shuffled by partitioning the data, but some techniques can be used to reduce the amount of data that needs to be shuffled, like using co-located joins.
Blocking nature
The hash join operator is a blocking operator. It needs to consume all the input data (from both inputs) before emitting the result.
Maximum number of rows
Even using partitioning, the amount of data that needs to be stored in memory can be high, so the engine tries to protect itself from running out of memory by limiting the number of rows that the join can hold. This limit can be defined using the max_rows_in_join hint, and the join_overflow_mode hint can be used to control the behavior of the engine when the limit is exceeded. By default, this limit is slightly above 1 million rows and the default join overflow mode is THROW, which means that the query will fail if the number of rows exceeds the limit.
Hints
join_overflow_mode
Type: String
Default: THROW
Defines the behavior of the engine when the number of rows exceeds the limit defined by the max_rows_in_join hint. The possible values are:
THROW: The query will fail if the number of rows exceeds the limit.
BREAK: The engine will stop processing the join and return the results that have been computed so far. In this case the stat maxRowsInJoinReached will be true.
max_rows_in_join
Type: Integer
Default: 1,048,576
The maximum number of rows that the join can hold. What happens when this limit is reached is defined by the join_overflow_mode hint.
Take care when increasing this limit. If the number of rows is too high, the amount of memory used by the engine can be very high, which can lead to very large GC pauses and even out of memory errors.
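For example, the following sketch raises the limit and asks the engine to return partial results instead of failing. It assumes the joinOptions hint syntax; the tables and columns are the ones used in the examples on this page:
SELECT /*+ joinOptions(max_rows_in_join='2000000', join_overflow_mode='BREAK') */
  userAttributes.userUUID, userGroups.groupUUID
FROM userAttributes
JOIN userGroups ON userAttributes.userUUID = userGroups.userUUID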
Stats
executionTimeMs
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
emittedRows
Type: Long
The number of rows emitted by the operator. Joins can emit more rows than the input relations, so this value can be higher than the number of rows in the inputs. Remember that the number of rows is limited by the max_rows_in_join hint and a large number of rows can lead to high memory usage and long GC pauses, which can affect the performance of the whole system.
maxRowsInJoinReached
Type: Boolean
This stat is set to true when the number of rows exceeds the limit defined by the max_rows_in_join hint.
Notice that by default the engine will throw an exception when this happens, in which case no stat will be emitted. Therefore this stat is only emitted when the join_overflow_mode hint is set to BREAK.
timeBuildingHashTableMs
Type: Long
The time spent building the hash table used to probe the join keys, in milliseconds.
A large number here can indicate that the right relation is too large or is taking too long to be processed.
Explain attributes
The hash join operator is represented in the explain plan as a LogicalJoin explain node.
condition
Type: Expression
The condition that is being applied to the rows to join the relations. The expression may use indexed columns ($0, $1, etc), functions and literals. The indexed columns are always 0-based.
For example, an explain plan containing a node like the following (an illustrative sketch consistent with the description below):
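LogicalJoin(condition=[=($0, $1)], joinType=[inner])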
Is saying that the join condition is that the column with index 0 in the left relation is equal to the column with index 1 in the right relation. Given the rest of the explain plan, we could see that the column with index 0 is the userUUID column in the userAttributes table and the column with index 1 is the userUUID column in the userGroups table.
joinType
Type: String
The type of join that is being performed. The possible values are: inner, left, right, full, semi and anti, as explained in Implementation details.
Tips and tricks
The order of input relations matter
Apache Pinot does not use table stats to determine the best order to consume the input relations. Instead, it assumes that the right input relation is the smaller one. That relation will always be fully consumed to build a hash table and sometimes it will be broadcasted to all workers. This means that it is important to specify the smaller relation as the right input.
Remember that left and right are relative to the order of the tables in the SQL query. It is less expensive to join a large table (as the left input) with a small table (as the right input) than the other way around, as the two queries below illustrate.
-- Preferred: the smaller relation is the right input
select largeTable.col1, smallTable.col2
from largeTable
cross join smallTable

-- Discouraged: the larger relation is the right input
select largeTable.col1, smallTable.col2
from smallTable
cross join largeTable
Aggregate
Describes the aggregate relation operator in the multi-stage query engine.
The aggregate operator is used to perform calculations on a set of rows and return a single row of results.
This page describes the aggregate operator defined in the relational algebra used by multi-stage queries. This operator is generated by the multi-stage query engine when you use aggregate functions in a query either with or without a group by clause.
Implementation details
Aggregate operations may be expensive in terms of memory, CPU and network usage. As explained in understanding stages, the multi-stage query engine breaks down the query into multiple stages and each stage is then executed in parallel on different workers. Each worker processes a subset of the data and sends the results to the coordinator, which then aggregates the results. When possible, the multi-stage query engine will try to apply a divide-and-conquer strategy to reduce the amount of data that needs to be processed in the coordinator stage.
For example if the aggregation function is a sum, the engine will try to sum the results of each worker before sending the partial result to the coordinator, which would then sum the partial results in order to get the final result. But some aggregation functions, like count(distinct), cannot be computed in this way and require all the data to be processed in the coordinator stage.
In Apache Pinot 1.1.0, the multi-stage query engine always keeps the data in memory. This means that the amount of memory used by the engine is proportional to the number of groups generated by the group by clause and the amount of data that needs to be kept for each group (which depends on the aggregation function).
Even when the aggregation function is a simple count, which only requires keeping a long for each group in memory, the amount of memory used can be high if the number of groups is high. This is why the engine limits the number of groups. By default, this limit is 100,000, but it can be changed by providing hints.
Blocking nature
The aggregate operator is a blocking operator. It needs to consume all the input data before emitting the result.
Hints
num_groups_limit
Type: Integer
Default: 100,000
Defines the max number of groups that can be created by the group by clause. If the number of groups exceeds this limit, the query will not fail, but it will stop creating new groups, so the result may be partial.
Example (a sketch assuming the aggOptions hint syntax; the table and columns are the ones used elsewhere on this page):
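SELECT /*+ aggOptions(num_groups_limit='200000') */ deviceOS, COUNT(*)
FROM userAttributes
GROUP BY deviceOS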
is_partitioned_by_group_by_keys
Type: Boolean
Default: false
If set to true, the engine will consider that the data is already partitioned by the group by keys. This means that the engine will not need to shuffle the data to group them by the group by keys and the coordinator stage will be able to compute the final result without needing to merge the partial results.
Caution: This hint should only be used if the data is already partitioned by the group by keys. There is no check to verify that the data is indeed partitioned by the group by keys and using this hint when the data is not partitioned by the group by keys will lead to incorrect results.
Example (again assuming the aggOptions syntax):
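SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') */ userUUID, COUNT(*)
FROM userAttributes
GROUP BY userUUID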
is_skip_leaf_stage_group_by
Type: Boolean
Default: false
If set to true, the engine will not push down the aggregate into the leaf stage. In some situations, doing the group by on the leaf stage can be wasted effort, e.g. when the cardinality of the group by column is very high.
Example (same assumed syntax):
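SELECT /*+ aggOptions(is_skip_leaf_stage_group_by='true') */ userUUID, COUNT(*)
FROM userAttributes
GROUP BY userUUID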
max_initial_result_holder_capacity
Type: Integer
Default: 10,000
Defines the initial capacity of the result holder that stores the intermediate results of the aggregation. This hint can be used to reduce the memory usage of the engine by setting a value close to the expected number of groups. It is usually recommended to not change this hint unless you know that the expected number of groups is much lower than the default value.
Example (same assumed syntax):
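SELECT /*+ aggOptions(max_initial_result_holder_capacity='1000') */ deviceOS, COUNT(*)
FROM userAttributes
GROUP BY deviceOS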
Stats
executionTimeMs
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1. Remember that this value is affected by the number of received rows and the complexity of the aggregation function.
emittedRows
Type: Long
The number of groups emitted by the operator. A large number of emitted rows can indicate that the query is not well optimized.
Remember that the number of groups is limited by the num_groups_limit hint and a large number of groups can lead to high memory usage and slow queries.
numGroupsLimitReached
Type: Boolean
This stat is set to true when the number of groups exceeds the limit defined by the num_groups_limit hint. In that case, the query will not fail but will return partial results, which will be indicated by the global partialResponse stat.
Explain attributes
The aggregate operator is represented in the explain plan as a LogicalAggregate explain node.
Remember that these nodes appear in pairs: first in one stage, where the aggregation is done in parallel, and then in the downstream stage, where the partial results are merged.
group
Type: List of Integer
The list of columns used in the group by clause. These numbers are 0-based column indexes on the virtual row projected by the upstream.
For example, an explain plan like the following (illustrative):
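LogicalAggregate(group=[{6}], agg#0=[COUNT()])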
Is saying that the group by clause is using the column with index 6 in the virtual row projected by the upstream. Given that in this case the row is generated by a table scan, the column with index 6 is the 7th column in the table as defined in its schema.
agg#N
Type: Expression
The aggregation functions applied to the columns. There may be multiple agg#N attributes, each one representing a different aggregation function.
For example, an explain plan like the following (illustrative):
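LogicalAggregate(group=[{6}], agg#0=[COUNT()], agg#1=[MAX($5)])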
Has two aggregation functions: COUNT() and MAX($5). The second is applied to the column with index 5 in the virtual row projected by the upstream. Given that in this case the row is generated by a table scan, the column with index 5 is the 6th column in the table as defined in its schema.
Tips and tricks
Try to not use aggregation functions that cannot be parallelized
For example, it is recommended to use one of the different HyperLogLog flavors instead of count(distinct) when the cardinality of the data or its size is high.
For example, it is cheaper to execute count(distinct) on an int column with 1000 distinct values than on a column that stores very long strings, even if the number of distinct values is the same.
Filter
Describes the filter relation operator in the multi-stage query engine.
The filter operator is used to filter rows based on a condition.
This page describes the filter operator defined in the relational algebra used by multi-stage queries. This operator is generated by the multi-stage query engine when you use the where, having or sometimes on clauses.
Implementation details
Filter operations apply a predicate to each row and only keep the rows that satisfy the predicate.
It is important to notice that filter operators can only be optimized using indexes when they are executed in the leaf stage. The reason for that is that the intermediate stages don't have access to the actual segments. This is why the engine will try to push down the filter operation to the leaf stage whenever possible.
As explained in explain-plan-multiple-stages, the explain plan in the multi-stage query engine does not indicate whether indexes are used or not.
Blocking nature
The filter operator is a streaming operator. It emits the blocks of rows as soon as they are received from the upstream operator.
Hints
None
Stats
executionTimeMs
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1. This number is affected by the number of received rows and the complexity of the predicate.
emittedRows
Type: Long
The number of rows emitted by the operator. A large number of emitted rows may not be problematic, but it indicates that the predicate is not very selective.
Explain attributes
The filter operator is represented in the explain plan as a LogicalFilter explain node.
condition
Type: Expression
The condition that is being applied to the rows. The expression may use indexed columns ($0, $1, etc), functions and literals. The indexed columns are always 0-based.
For example, an explain plan containing a node like the following (illustrative):
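LogicalFilter(condition=[>($5, 2)])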
Is saying that the filter is applying the condition $5 > 2 which means that only the rows where the 6th column is greater than 2 will be emitted. In order to know which column is the 6th, you need to look at the schema of the table scanned.
Tips and tricks
How to know if indexes are used
As explained in explain-plan-multiple-stages, the explain plan in the multi-stage query engine does not directly indicate whether indexes are used or not.
Apache Pinot contributors are working on improving this, but it is not yet available. Meanwhile, we need an indirect approach to get that information.
First, we need to know on which stage the filter is being used. If the filter is being used in an intermediate stage, then the filter is not using indexes. In order to know the stage, you can extract stages as explained in understanding-stages.
But what about the filters executed in the leaf stage? Not all filters in the leaf stage can use indexes. The only way to know if a filter is using indexes is to use the single-stage explain plan. In order to do so, you need to transform the leaf stage into a single-stage query. This is a manual process that can be tedious, but it ends up not being so difficult once you get used to it.
Literal
Describes the literal relation operator in the multi-stage query engine.
The literal operator is used to define a constant value in the query. This operator may be generated by the multi-stage query engine when you use a constant value in a query.
Blocking nature
The literal operator is a blocking operator, but given its trivial nature it should not matter.
Implementation details
The literal operator is a simple operator that does not require any computation.
Hints
None
Stats
executionTimeMs
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
emittedRows
Type: Long
The number of rows emitted by the operator. It should always be one.
Explain attributes
None
Tips and tricks
Avoid very large literals
Take care when using very large literals (on the order of hundreds of KBs), as they may need to be sent from brokers to servers and in general may introduce latency in parsing and query optimization.
Intersect
Describes the intersect relation operator in the multi-stage query engine.
The intersect operator is a relational operator that combines two relations and returns the common rows between them. The operator is used to find the intersection of two or more relations, usually by using the SQL INTERSECT operator.
Although it is accepted by the parser, the ALL modifier is currently ignored. Therefore INTERSECT and INTERSECT ALL are equivalent. This issue has been reported in #13126
Implementation details
The current implementation consumes the whole right input relation first and stores the rows in a set. Then it consumes the left input relation one block at a time. Each time a block of rows is read from the left input relation, the operator checks if the rows are in the set of rows from the right input relation. All unique rows that are in the set are added to a new partial result block. Once the whole left input block is analyzed, the operator emits the partial result block.
This process is repeated until all rows from the left input relation are processed.
In pseudo-code, the algorithm looks like this:
// Blocking phase: consume the whole right input and store its rows in a set
HashSet<Row> rightRows = new HashSet<>();
Block rightBlock = rightInput.nextBlock();
while (rightBlock is not EOS) {
  rightRows.addAll(rightBlock.getRows());
  rightBlock = rightInput.nextBlock();
}
// Streaming phase: emit the left rows that are present in the set.
// remove() returns true only on the first match, which keeps the output distinct.
Block leftBlock = leftInput.nextBlock();
while (leftBlock is not EOS) {
  Block partialResultBlock = new Block();
  for (Row row : leftBlock.getRows()) {
    if (rightRows.remove(row)) {
      partialResultBlock.add(row);
    }
  }
  emit partialResultBlock;
  leftBlock = leftInput.nextBlock();
}
emit EOS
Blocking nature
The intersect operator is a semi-blocking operator that first consumes the right input relation in a blocking fashion and then consumes the left input relation in a streaming fashion.
Hints
None
Stats
executionTimeMs
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
emittedRows
Type: Long
The number of rows emitted by the operator.
Explain attributes
The intersect operator is represented in the explain plan as a LogicalIntersect explain node.
all
Type: Boolean
This attribute is used to indicate if the operator should return all the rows or only the distinct rows.
Although it is accepted in SQL, the all attribute is not currently used in the intersect operator. The returned rows are always distinct. This issue has been reported in #13126.
Tips and tricks
The order of input relations matter
The intersect operator has a memory footprint that is proportional to the number of unique rows in the right input relation. It also consumes the right input relation in a blocking fashion while the left input relation is consumed in a streaming fashion.
This means that:
In case any of the input relations is significantly larger than the other, it is recommended to use the smaller relation as the right input relation.
In case one of the input is blocking and the other is not, it is recommended to use the blocking relation as the right input relation.
These two recommendations can be contradictory, so it is up to the user to decide which one to follow based on the specific query pattern. Remember that you can use the stage stats to check the number of rows emitted by each of the inputs and adjust the order of the inputs accordingly.
Minus
Describes the minus relation operator in the multi-stage query engine.
The minus operator is used to subtract the result of one query from another query. This operator is used to find the difference between two sets of rows, usually by using the SQL EXCEPT operator.
Although it is accepted by the parser, the ALL modifier is currently ignored. Therefore EXCEPT and EXCEPT ALL are equivalent. This issue has been reported in #13127.
Implementation details
The current implementation consumes the whole right input relation first and stores the rows in a set. Then it consumes the left input relation one block at a time. Each time a block of rows is read from the left input relation, the operator checks if the rows are in the set of rows from the right input relation. All unique rows that are not in the set are added to a new partial result block. Once the whole left input block is analyzed, the operator emits the partial result block.
This process is repeated until all rows from the left input relation are processed.
Blocking nature
The minus operator is a semi-blocking operator that first consumes the right input relation in a blocking fashion and then consumes the left input relation in a streaming fashion.
In pseudo-code, the algorithm looks like this:
// Blocking phase: consume the whole right input and store its rows in a set
HashSet<Row> rightRows = new HashSet<>();
Block rightBlock = rightInput.nextBlock();
while (rightBlock is not EOS) {
  rightRows.addAll(rightBlock.getRows());
  rightBlock = rightInput.nextBlock();
}
// Streaming phase: emit the left rows that are not in the set.
// add() returns true only for rows not seen before, which keeps the output
// distinct and excludes all rows present in the right input.
Block leftBlock = leftInput.nextBlock();
while (leftBlock is not EOS) {
  Block partialResultBlock = new Block();
  for (Row row : leftBlock.getRows()) {
    if (rightRows.add(row)) {
      partialResultBlock.add(row);
    }
  }
  emit partialResultBlock;
  leftBlock = leftInput.nextBlock();
}
emit EOS
Hints
None
Stats
executionTimeMs
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
emittedRows
Type: Long
The number of rows emitted by the operator.
Explain attributes
The minus operator is represented in the explain plan as a LogicalMinus explain node.
all
Type: Boolean
This attribute is used to indicate if the operator should return all the rows or only the distinct rows.
Although it is accepted in SQL, the all attribute is not currently used in the minus operator. The returned rows are always distinct. This issue has been reported in #13127
Tips and tricks
Memory pressure
The minus operator ends up having to store all unique rows from both input relations in memory. This can lead to memory pressure if the input relations are large and have a high number of unique rows.
The order of input relations matter
Although the minus operator ends up adding all unique rows from both input relations to a set, the order of input relations matters. While the right input relation is consumed in a blocking fashion, the left input relation is consumed in a streaming fashion. Therefore the latency of the whole query could be improved if the left input relation is producing values in streaming fashion.
In case one of the inputs is blocking and the other is not, it is recommended to use the blocking relation as the right input relation.
Mailbox receive
Describes the mailbox receive operator in the multi-stage query engine.
The mailbox receive operator is the operator that receives the data from the mailbox send operator. This is not an actual relational operator but a Pinot extension used to receive data from other stages.
Implementation details
Stages in the multi-stage query engine are executed in parallel by different workers. Workers send data to each other using mailboxes. The number of mailboxes depends on the send operator parallelism, the receive operator parallelism and the distribution being used. At worst, there is one mailbox per worker pair, so if the upstream send operator has a parallelism of S and the receive operator has a parallelism of R, there will be S * R mailboxes.
By default, these mailboxes are GRPC channels, but when both workers are in the same server, they can use shared memory, and therefore a more efficient on-heap mailbox is used.
The mailbox receive operator pulls data from these mailboxes and sends it to the downstream operator.
Blocking nature
The mailbox receive operator is a streaming operator. It emits the blocks of rows as soon as they are received from the upstream operator.
It is important to notice that the mailbox receive operator tries to be fair when reading from multiple workers.
Hints
None
Stats
executionTimeMs
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
emittedRows
Type: Long
The number of rows emitted by the operator. This operator should always emit as many rows as its upstream operator.
fanIn
Type: Long
How many workers are sending data to this operator.
inMemoryMessages
Type: Long
How many messages have been received in heap format by this mailbox. Receiving in heap messages is more efficient than receiving them in raw format, as the messages do not need to be serialized and deserialized and no network transfer is needed.
rawMessages
Type: Long
How many messages have been received in raw format and therefore serialized by this mailbox. Receiving in heap messages is more efficient than receiving them in raw format, as the messages do not need to be serialized and deserialized and no network transfer is needed.
deserializedBytes
Type: Long
How many bytes have been deserialized by this mailbox. A high number here indicates that the mailbox is receiving a lot of data from other servers, which is expensive in terms of CPU, memory and network.
deserializeTimeMs
Type: Long
How long it took to deserialize the raw messages sent to this mailbox. This time is not wall time, but the sum of the time spent by all threads deserializing messages.
Take into account that this time does not include the impact on the network or the GC.
downstreamWaitMs
Type: Long
How much time this operator has been blocked waiting while offering data to be consumed by the downstream operator. A high number here indicates that the downstream operator is slow and may be a bottleneck. For example, usually the receive operator that is the left input of a join operator has a high value here, as the join needs to consume all the messages from the right input before it can start consuming the left input.
upstreamWaitMs
Type: Long
How much time this operator has been blocked waiting for more data to be sent by the upstream (send) operator. A high number here indicates that the upstream operator is slow and may be a bottleneck. For example, blocking operators like aggregations, sorts, joins or window functions require all the data to be received before they can start emitting a result, so having them as upstream operators of a mailbox receive operator can lead to high values here.
Explain attributes
Given that the mailbox receive operator is a meta-operator, it is not actually shown in the explain plan. Instead, a single PinotLogicalExchange or PinotLogicalSortExchange is shown in the explain plan. This exchange explain node is the logical representation of a pair of send and receive operators.
See the mailbox send operator section to understand the attributes of the exchange explain node.
Tips and tricks
None
Mailbox send
Describes the mailbox send operator in the multi-stage query engine.
The mailbox send operator is the operator that sends data to the mailbox receive operator. This is not an actual relational operator but a Pinot extension used to send data to other stages.
These operators are always the root of the intermediate and leaf stages.
Implementation details
Stages in the multi-stage query engine are executed in parallel by different workers. Workers send data to each other using mailboxes. The number of mailboxes depends on the send operator parallelism, the receive operator parallelism and the distribution being used. At worst, there is one mailbox per worker pair, so if the send operator has a parallelism of S and the downstream receive operator has a parallelism of R, there will be S * R mailboxes.
By default, these mailboxes are GRPC channels, but when both workers are in the same server, they can use shared memory, and therefore a more efficient on-heap mailbox is used.
The mailbox send operator wraps these mailboxes, offering a single logical mailbox to the stage. How data is distributed to the different workers of the downstream stage is determined by the distribution of the operator. The supported distributions are hash, random and broadcast.
hash means there are multiple instances of the stream, and each instance contains records whose keys hash to a particular hash value. Instances are disjoint; a given record appears on exactly one stream. The list of numbers in the bracket indicates the columns used to calculate the hash. These numbers are 0-based column indexes on the virtual row projected by the upstream.
random means there are multiple instances of the stream, and each instance contains randomly chosen records. Instances are disjoint; a given record appears on exactly one stream.
broadcast means there are multiple instances of the stream, and all records appear in each instance. This is the most expensive distribution, as it requires sending all the data to all the workers.
Blocking nature
The mailbox send operator is a streaming operator. It emits the blocks of rows as soon as they are received from the upstream operator.
Hints
None
Stats
executionTimeMs
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
emittedRows
Type: Long
The number of rows emitted by the operator. This operator should always emit as many rows as its upstream operator.
stage
Type: Int
The stage id of the operator. The root stage has id 0 and this number is incremented by 1 for each stage. Current implementation iterates over the stages in pre-order traversal, although this is not guaranteed.
This stat is useful for extracting stages from queries, as explained in understanding stages.
parallelism
Type: Int
Number of threads executing the stage. Although this stat is only reported in the send mailbox operator, it is the same for all operators in the stage.
fanOut
Type: Int
The number of workers this operator is sending data to. A large fan out means data is being shuffled to many workers, which may be a bottleneck that can sometimes be improved using partitioning.
inMemoryMessages
Type: Int
How many messages have been sent in heap format by this mailbox. Sending in heap messages is more efficient than sending in raw format, as the messages do not need to be serialized and deserialized and no network transfer is needed.
rawMessages
Type: Int
How many messages have been sent in raw format and therefore serialized by this mailbox. Sending in heap messages is more efficient than sending in raw format, as the messages do not need to be serialized and deserialized and no network transfer is needed.
serializedBytes
Type: Long
How many bytes have been serialized by this mailbox. A high number here indicates that the mailbox is sending a lot of data to other servers, which is expensive in terms of CPU, memory and network.
serializationTimeMs
Type: Long
How long it took to serialize the raw messages sent by this mailbox. This time is not wall time, but the sum of the time spent by all threads serializing messages.
Take into account that this time does not include the impact on the network or the GC.
Explain attributes
Given that the mailbox send operator is a meta-operator, it is not actually shown in the explain plan. Instead, a single PinotLogicalExchange or PinotLogicalSortExchange is shown in the explain plan. This exchange explain node is the logical representation of a pair of send and receive operators.
distribution
Type: Expression
Example: distribution=[hash[0]], distribution=[random] or distribution=[broadcast]
The distribution used by the mailbox receive operator. Values supported by Pinot are hash, random and broadcast, as explained in the implementation details.
While broadcast and random distributions don't have any parameters, the hash distribution includes a list of numbers in brackets. That list represents the columns used to calculate the hash and are the 0-based column indexes on the virtual row projected by the upstream operator.
For example, an exchange node like the following (illustrative):
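PinotLogicalExchange(distribution=[hash[0, 1]])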
Indicates that the data is distributed by the first and second columns of the projected row, which are groupUUID and userUUID respectively.
Tips and tricks
None
Window
Describes the window relational operator in the multi-stage query engine.
The window operator is used to define a window over which to perform calculations.
This page describes the window operator defined in the relational algebra used by multi-stage queries. This operator is generated by the multi-stage query engine when you use window functions in a query. You can read more about window functions in the reference documentation.
Unlike the aggregate operator, which will output one row per group, the window operator will output as many rows as input rows.
Implementation details
Window operators take a single input relation and apply window functions to it. For each input row, a window of rows is calculated and one or more aggregations are applied to it.
In general, window operators are expensive in terms of CPU and memory usage, but they open the door to a wide range of analytical queries.
Blocking nature
The window operator is a blocking operator. It needs to consume all the input data before emitting the result.
Hints
Window hints are configured with the windowOptions hint, which accepts as argument a map of options and values.
For example:
SELECT
/*+ windowOptions(option1='value1', option2='value2') */
col1, SUM(intCol) OVER() as sum FROM table
max_rows_in_window
Type: Integer
Default: 1048576
The maximum number of rows that can be cached in the window for further processing.
window_overflow_mode
Type: THROW or BREAK
Default: 'THROW'
Defines the behavior when the window overflows. Supported values:
THROW: Stop building the window cache and throw an exception; no further window operation is performed.
BREAK: Stop building the window cache and continue to perform the window operation; results might be partial.
Stats
executionTimeMs
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1. This number is affected by the number of received rows and the complexity of the window function.
emittedRows
Type: Long
The number of rows emitted by the operator. A large number of emitted rows can indicate that the query is not well optimized.
Unlike the aggregate operator, which will output one row per group, the window operator will output as many rows as input rows.
maxRowsInWindowReached
Type: Boolean
This attribute is set to true if the maximum number of rows in the window has been reached.
Explain attributes
The window operator is represented in the explain plan as a LogicalWindow explain node.
window#
Type: Expression
The window expressions used by the operator. There may be more than one of these attributes depending on the number of window functions used in the query, although sometimes multiple window function clauses in SQL can be combined into a single window operator.
The expression may use indexed columns ($0, $1, etc) that represent the columns of the virtual row generated by the upstream.
Union
Describes the union relation operator in the multi-stage query engine.
The union operator combines the results of two or more queries into a single result set. The result set contains all the rows from the queries. Contrary to other set operations (intersect and minus), the union operator does not remove duplicates from the result set. Therefore its semantics are those of the SQL UNION ALL operator.
There is no guarantee on the order of the rows in the result set.
While the ALL modifier is currently ignored for the EXCEPT and INTERSECT SQL clauses, it is fully supported for the UNION clause.
Implementation details
The current implementation consumes input relations one by one. It first returns all rows from the first input relation, then all rows from the second input relation, and so on.
Blocking nature
The union operator is a streaming operator that consumes the input relations one by one. The current implementation fully consumes the inputs in order. See the tips and tricks section for more details.
Hints
None
Stats
executionTimeMs
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller that this value if the parallelism is larger than 1.
emittedRows
Type: Long
The number of groups emitted by the operator.
Explain attributes
The union operator is represented in the explain plan as a LogicalUnion explain node.
all
Type: Boolean
Whether the operator should emit all the rows, including duplicates.
Although Pinot supports both the SQL UNION and UNION ALL clauses, the union operator only supports the UNION ALL semantics. In order to implement the UNION semantics, the multi-stage query engine adds an extra aggregate operator to compute the distinct rows.
For example the plan of:
select userUUID
from (select userUUID from userAttributes)
UNION ALL
(select userUUID from userGroups)
Is expected to be similar to the following (an illustrative sketch; the actual exchange distributions and column indexes depend on the schemas):
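LogicalUnion(all=[true])
  PinotLogicalExchange(distribution=[random])
    LogicalProject(userUUID=[$6])
      LogicalTableScan(table=[[default, userAttributes]])
  PinotLogicalExchange(distribution=[random])
    LogicalProject(userUUID=[$4])
      LogicalTableScan(table=[[default, userGroups]])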
While the plan of the UNION (distinct) variant of the same query:
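select userUUID
from (select userUUID from userAttributes)
UNION
(select userUUID from userGroups)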
Is a bit more complex (again illustrative):
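LogicalAggregate(group=[{0}])
  PinotLogicalExchange(distribution=[hash[0]])
    LogicalUnion(all=[true])
      PinotLogicalExchange(distribution=[random])
        LogicalProject(userUUID=[$6])
          LogicalTableScan(table=[[default, userAttributes]])
      PinotLogicalExchange(distribution=[random])
        LogicalProject(userUUID=[$4])
          LogicalTableScan(table=[[default, userGroups]])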
Notice that LogicalUnion is still using all=[true], but the LogicalAggregate is used to remove the duplicates. This also means that while the union operator is always streaming, the UNION clause results in a blocking plan (given that the aggregate operator is blocking).
Tips and tricks
The order of input relations matters
The current implementation of the union operator consumes the input relations one by one, starting from the first one. This means that the second input relation is not consumed until the first one is fully consumed, and so on. Therefore it is recommended to put the fastest input relation first to reduce the overall latency.
Usually a good way to set the order of the input relations is to change the input order while trying to minimize the wait stats (such as upstreamWaitMs) of all the inputs.
Leaf
Describes the leaf operator in the multi-stage query engine.
The leaf operator is the operator that actually reads the data from the segments. Instead of being just a simple table scan, the leaf operator is a meta-operator that wraps the single-stage query engine and executes all the operators in the leaf stage of the query plan.
Implementation details
The leaf operator is not a relational operator itself but a meta-operator that is able to execute single-stage queries. When servers execute a leaf stage, they compile all operations in the stage except the send operator into an equivalent single-stage query and execute it using a slightly modified version of the single-stage engine.
As a result, leaf stage operators can use all the optimizations and indices that the single-stage engine can use, but it also means that there may be slight differences when an operator is executed in a leaf stage compared to when it is executed in an intermediate stage. For example, operations pushed down to the leaf stage may use indexes (see the filter operator section). Other differences include:
Some functions are only supported in multi-stage and some others only in single-stage.
Type coercion is different. While the single-stage engine always operates with generic types (ie uses doubles when mathematical operations are used), the multi-stage engine tries to keep the types (ie adding two integers will result in an integer).
Blocking nature
One of the slight differences between the leaf engine and the normal single-stage engine is that the leaf engine tries to be non-blocking.
Hints
None
Stats
executionTimeMs
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
emittedRows
Type: Long
The number of rows emitted by the operator.
table
Type: String
The name of the table that is scanned. This is the name without the type suffix (so without _REALTIME or _OFFLINE). This is very useful to understand which table is being scanned by this leaf stage in case of complex queries.
numDocsScanned
Type: Long
Similar to the same stat in single-stage queries, this stat indicates the number of rows selected after the filter phase.
If it is very high, that means the selectivity for the query is low and lots of rows need to be processed after the filtering. Consider refining the filter to increase the selectivity of the query.
numEntriesScannedInFilter
Type: Long
Similar to the same stat in single-stage queries, this stat indicates the number of entries (aka scalar values) scanned in the filtering phase of query execution.
Can be larger than the total scanned doc count because of multiple filtering predicates or multi-value entries. Can also be smaller than the total scanned doc count if indexing is used for filtering.
This along with numEntriesScannedPostFilter indicates where most of the time is spent during table scan processing. If this value is high, enabling indexing for affected columns is a way to bring it down. Another option is to partition the data based on the dimension most heavily used in your filter queries.
numEntriesScannedPostFilter
Type: Long
Similar to the same stat in single-stage queries, this stat indicates the number of entries (aka scalar values) scanned after the filtering phase of query execution, i.e. during the aggregation and/or group-by phases. This is equivalent to numDocsScanned * number of projected columns.
This along with numEntriesScannedInFilter indicates where most of the time is spent during table scan processing. A high number for this means the selectivity is low (that is, Pinot needs to scan a lot of records to answer the query). If this is high, consider using star-tree index, given a regular index won't improve performance.
numSegmentsQueried
Type: Integer
Similar to the same stat in single-stage queries, this stat indicates the total number of segments queried for a query. It may be less than the total number of segments if the broker applies optimizations.
The broker decides how many segments to query on each server, based on broker pruning logic. The server decides how many of these segments to actually look at, based on server pruning logic. After the segments are processed, only some of them may contain matching records.
In general, numSegmentsQueried >= numSegmentsProcessed >= numSegmentsMatched.
numSegmentsProcessed
Type: Integer
Similar to the same stat in single-stage queries, this stat indicates the number of segments that were actually processed after pruning, which indicates the effectiveness of the server pruning logic.
The more segments are processed, the more IO has to be done. This is why selective queries where numSegmentsProcessed is close to numSegmentsQueried can be optimized by changing the data distribution.
numSegmentsMatched
Type: Integer
Similar to the same stat in single-stage queries, this stat indicates the number of segments with at least one document matching the query.
totalDocs
Type: Long
Similar to the same stat in single-stage queries, this stat indicates the number of rows in the table.
numGroupsLimitReached
Type: Boolean
Similar to the same stat in single-stage queries and the same in aggregate operators, this stat indicates if the max group limit has been reached in a group by aggregation operator executed in the leaf stage.
If this boolean is set to true, the query result may not be accurate. The default value for numGroupsLimit is 100k, and should be sufficient for most use cases.
numResizes
Type: Integer
The number of times the results were resized during query processing.
resizeTimeMs
Type: Long
Time spent in resizing results for the output. Either because of LIMIT or maximum allowed group by keys or any other criteria.
threadCpuTimeNs
Type: Long
Aggregated thread cpu time in nanoseconds for query processing from servers. This metric is only available if Pinot is configured with pinot.server.instance.enableThreadCpuTimeMeasurement.
systemActivitiesCpuTimeNs
Type: Long
Aggregated system activities cpu time in nanoseconds for query processing (e.g. GC, OS paging etc.) This metric is only available if Pinot is configured with pinot.server.instance.enableThreadCpuTimeMeasurement.
numSegmentsPrunedByServer
Type: Integer
The number of segments pruned by the server, for any reason.
numSegmentsPrunedInvalid
Type: Integer
The number of segments pruned because they are invalid. Segments are invalid when the schema has changed and the segment has not been refreshed.
For example, if a column is added to the schema, the segment will be invalid for queries that use that column until it is refreshed.
numSegmentsPrunedByLimit
Type: Integer
The number of segments pruned because they are not needed for the query due to the limit clause.
Pinot keeps a count of the number of rows returned by each segment. Once it's guaranteed that no more segments need to be read to satisfy the limit clause without breaking semantics, the remaining segments are pruned.
For example, a query like SELECT col1 FROM table2 LIMIT 10 can be pruned for this reason, while a query like SELECT col1 FROM table2 ORDER BY col1 DESC LIMIT 10 cannot, because Pinot needs to read all segments to guarantee that the largest values of col1 are returned.
numSegmentsPrunedByValue
Type: Integer
The number of segments pruned because they are not needed for the query due to a value clause, usually a where.
Pinot keeps the maximum and minimum values of each segment for each column. If the value clause is such that the segment cannot contain any rows that satisfy the clause, the segment is pruned.
numConsumingSegmentsProcessed
Type: Integer
Like numSegmentsProcessed but only for consuming segments.
numConsumingSegmentsMatched
Type: Integer
Like numSegmentsMatched but only for consuming segments.
operatorExecutionTimeMs
Type: Long
The time spent by the operator executing.
operatorExecStartTimeMs
Type: Long
The instant in time when the operator started executing.
Explain attributes
Given that the leaf operator is a meta-operator, it is not actually shown in the explain plan. But the leaf stage is the only stage that can execute table scans, so here we list the attributes that can be found in the explain plan for a table scan.
table
Type: String array
Example: table=[[default, userGroups]]
The qualified name of the table that is scanned, which means it also contains the name of the database being used.
Tips and tricks
Try to push as much as possible to the leaf stage
Leaf stage operators can use all the optimizations and indices that the single-stage engine can use. This means that it is usually better to push down as much as possible to the leaf stage.
The engine is smart enough to push down filters and aggregations without breaking semantics, but sometimes there are subtle differences between the SQL semantics and what the domain expert writing the query wants to do.
Sometimes the engine is too paranoid about null handling, or the query includes an unnecessary limit clause that prevents the engine from pushing down a filter.
It is recommended to analyze your explain plan to be sure that the engine is able to push down as much logic as you expect.
Transform
Describes the transform relation operator in the multi-stage query engine.
The transform operator is used to apply a transformation to the input data. It may filter out columns or add new ones by applying functions to the existing columns. This operator is generated by the multi-stage query engine when you use a SELECT clause in a query, but it can also be used to implement other transformations.
Implementation details
Transform operators apply transformation functions to the input data received from the upstream. The cost of the transformation usually depends on the complexity of the functions applied, but compared to other operators it is usually not very high.
Blocking nature
The transform operator is a streaming operator. It emits the blocks of rows as soon as they are received from the upstream operator.
Hints
None
Stats
executionTimeMs
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
emittedRows
Type: Long
The number of rows emitted by the operator.
Explain attributes
The transform operator is represented in the explain plan as a LogicalProject explain node.
This explain node has a list of attributes that represent the transformations applied to the input data. Each attribute has a name and a value, which is the expression used to generate the column.
For example, an explain plan node like the following (illustrative):
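LogicalProject(userUUID=[$6], deviceOS=[$4], EXPR$2=[SUBSTRING($4, 0, 2)])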
Is saying that the output of the operator has three columns:
userUUID is the 7th column in the virtual row projected by LogicalTableScan, which corresponds to the userUUID column in the table.
deviceOS is the 5th column in the virtual row projected by LogicalTableScan, which corresponds to the deviceOS column in the table.
EXPR$2 is the result of the SUBSTRING($4, 0, 2) expression applied to the 5th column in the virtual row projected by LogicalTableScan. Given we know that the 5th column is deviceOS, we can infer that EXPR$2 is the first two characters of the deviceOS column.
Tips and tricks
None
Sort or limit
Describes the sort or limit relation operator in the multi-stage query engine.
The sort or limit operator is used to sort the input data, limit the number of rows emitted by the operator or both. This operator is generated by the multi-stage query engine when you use an order by, limit or offset operation in a query.
Implementation details
The operator buffers the input rows and, once all the input has been consumed, emits them in the requested order, applying the offset and fetch limits when present.
Blocking nature
The sort or limit operator is a blocking operator. It needs to consume all the input data before emitting the result.
Hints
None
Stats
executionTimeMs
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
emittedRows
Type: Long
The number of rows emitted by the operator.
Explain attributes
The sort or limit operator is represented in the explain plan as a LogicalSort explain node.
sort#
Type: Expression
The sort expressions used by the operator. There is one of these attributes per sort expression. The first one is sort0, the second one is sort1, and so on.
The value of this attribute is the expression used to sort the data and may contain indexed columns ($0, $1, etc) that represent the columns of the virtual row generated by the upstream.
For example, a plan like the following (illustrative):
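LogicalSort(sort0=[$0], sort1=[$2], dir0=[ASC], dir1=[ASC])
  LogicalProject(userUUID=[$6], deviceOS=[$4], EXPR$2=[SUBSTRING($4, 0, 2)])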
Is saying that the rows are sorted first by the column with index 0 and then by the column with index 2 of the virtual row generated by the upstream. That row is generated by a projection whose first column (index 0) is userUUID, whose second (index 1) is deviceOS and whose third (index 2) is the result of the SUBSTRING($4, 0, 2) expression. As we know that $4 in this projection is deviceOS, we can infer that the third column is the first two characters of the deviceOS column.
dir#
Type: ASC or DESC
The direction of the sort. There is one of these attributes per sort expression.
fetch
Type: Long
The number of rows to emit. This is the equivalent to LIMIT in SQL. Remember that the limit can be applied without sorting, in which case the order on which the rows are emitted is undefined.
offset
Type: Long
The number of rows to skip before emitting the rows. This is the equivalent to OFFSET in SQL.
Tips and tricks
Limit and offset can prevent filter pushdown
In SQL, limit and offset are usually used in the last stage of the query. But when they are used in the middle of the query (like in a subquery or a CTE), they can prevent the filter pushdown optimization.
For example, imagine the following query (an illustrative sketch reusing the userAttributes table from earlier examples):
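SELECT userUUID
FROM userAttributes
WHERE deviceOS = 'windows'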
This query may generate a plan similar to the following (hypothetical; column indexes follow the transform examples above, and exchange nodes are elided):
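LogicalProject(userUUID=[$6])
  LogicalFilter(condition=[=($4, 'windows')])
    LogicalTableScan(table=[[default, userAttributes]])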
We can see that the filter deviceOS = 'windows' is pushed down to the leaf stage, next to the table scan. This reduces the amount of data that needs to be scanned and can improve the query performance, especially if there is an inverted index on the deviceOS column.
But if we modify the query to add a limit to the userAttributes table scan (again illustrative):
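SELECT userUUID
FROM (SELECT * FROM userAttributes LIMIT 10)
WHERE deviceOS = 'windows'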
The generated plan will look more like this (hypothetical; sort exchange attributes elided):
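LogicalProject(userUUID=[$6])
  LogicalFilter(condition=[=($4, 'windows')])
    LogicalSort(fetch=[10])
      PinotLogicalSortExchange(distribution=[hash])
        LogicalSort(fetch=[10])
          LogicalTableScan(table=[[default, userAttributes]])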
Here we can see that the filter deviceOS = 'windows' is not pushed down to the leaf stage, which means that the engine will need to scan all the data in the userAttributes table and only then apply the filter.
The reason why the filter is not pushed down is that the limit operation must be applied before the filter in order to not break the semantics, which in this case say that we want 10 rows of the userAttributes table without considering their deviceOS value.
In cases where you actually want to apply the filter before the limit, you can specify the where clause in the subquery. For example (illustrative):
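SELECT userUUID
FROM (SELECT * FROM userAttributes WHERE deviceOS = 'windows' LIMIT 10)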
Which will produce a plan similar to the following (again hypothetical):
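LogicalProject(userUUID=[$6])
  LogicalSort(fetch=[10])
    PinotLogicalSortExchange(distribution=[hash])
      LogicalSort(fetch=[10])
        LogicalFilter(condition=[=($4, 'windows')])
          LogicalTableScan(table=[[default, userAttributes]])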
As you can see, the filter is pushed down to the leaf stage, which will reduce the amount of data that needs to be scanned.
Do not abuse offset pagination
Although OFFSET and LIMIT are a very simple way to paginate results, they can be very inefficient. It is almost always better to paginate using a WHERE clause that uses a range of values instead of using OFFSET.
The reason is that in order to apply an OFFSET the engine must generate these rows and then discard them. Instead, if you use a WHERE clause with a range of values, the engine can apply different techniques like indexes or pruning to avoid reading the rows that are not needed.
This is not a Pinot specific issue, but a general one that affects most SQL engines.
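As a sketch of the difference (the loginTime column is hypothetical, assumed to be a monotonically increasing pagination key):
-- Offset pagination: the engine must produce and then discard the first 10000 rows
SELECT userUUID, loginTime
FROM userAttributes
ORDER BY loginTime
LIMIT 100 OFFSET 10000

-- Range (keyset) pagination: pruning and indexes can skip unneeded rows;
-- 1700000000 stands for the last loginTime seen on the previous page
SELECT userUUID, loginTime
FROM userAttributes
WHERE loginTime > 1700000000
ORDER BY loginTime
LIMIT 100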