Describes the filter relation operator in the multi-stage query engine.
The filter operator is used to filter rows based on a condition.
This page describes the filter operator defined in the relational algebra used by multi-stage queries. This operator is generated by the multi-stage query engine when you use the WHERE, HAVING, or sometimes ON clauses.
Filter operations apply a predicate to each row and only keep the rows that satisfy the predicate.
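For example, in a query like the following (a sketch with hypothetical table and column names), both the WHERE and the HAVING clauses are translated into filter operations:

```sql
SELECT playerName, COUNT(*) AS games
FROM baseballStats          -- hypothetical table
WHERE yearID > 2000         -- becomes a filter before the aggregation
GROUP BY playerName
HAVING COUNT(*) > 10        -- becomes a filter after the aggregation
```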
Note that filter operators can only be optimized using indexes when they are executed in the leaf stage, because intermediate stages don't have access to the actual segments. This is why the engine tries to push the filter operation down to the leaf stage whenever possible.
As explained in explain-plan-multiple-stages, the explain plan in the multi-stage query engine does not indicate whether indexes are used or not.
The filter operator is a streaming operator. It emits the blocks of rows as soon as they are received from the upstream operator.
None
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1. This number is affected by the number of received rows and the complexity of the predicate.
Type: Long
The number of rows emitted by the operator. A large number of emitted rows may not be problematic, but it indicates that the predicate is not very selective.
The filter operator is represented in the explain plan as a LogicalFilter explain node.
Type: Expression
The condition that is being applied to the rows. The expression may use indexed columns ($0, $1, etc.), functions and literals. The indexed columns are always 0-based.
For example, the following explain plan:
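(The plan below is a minimal sketch of what such a plan could look like; the table name is hypothetical and other nodes are omitted.)

```
LogicalFilter(condition=[>($5, 2)])
  LogicalTableScan(table=[[default, myTable]])
```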
This plan says that the filter applies the condition $5 > 2, which means that only the rows where the 6th column is greater than 2 will be emitted. In order to know which column is the 6th, you need to look at the schema of the table being scanned.
As explained in explain-plan-multiple-stages, the explain plan in the multi-stage query engine does not directly indicate whether indexes are used or not.
Apache Pinot contributors are working on improving this, but it is not yet available. Meanwhile, we need an indirect approach to get that information.
First, we need to know in which stage the filter is executed. If the filter is executed in an intermediate stage, then it is not using indexes. In order to know the stage, you can extract stages as explained in understanding-stages.
But what about filters executed in the leaf stage? Not all filters in the leaf stage can use indexes. The only way to know if the filter is using indexes is to use the single-stage explain plan. In order to do so, you need to transform the leaf stage into a single-stage query. This is a manual process that can be tedious, but it ends up not being so difficult once you get used to it.
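For example, if the leaf stage scans a single table and applies a filter on one of its columns, a sketch of the equivalent single-stage query could look like this (table and column names are hypothetical):

```sql
-- Single-stage equivalent of the leaf stage: its explain plan shows
-- whether the filter is resolved with an index or with a full scan.
EXPLAIN PLAN FOR
SELECT col1, col2
FROM myTable
WHERE col3 > 2
```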
See understanding-multi-stage-query for more information.
Describes the mailbox receive operator in the multi-stage query engine.
The mailbox receive operator is the operator that receives the data from the mailbox send operator. This is not an actual relational operator but a Pinot extension used to receive data from other stages.
Stages in the multi-stage query engine are executed in parallel by different workers. Workers send data to each other using mailboxes. The number of mailboxes depends on the send operator parallelism, the receive operator parallelism and the distribution being used. At worst, there is one mailbox per worker pair, so if the upstream send operator has a parallelism of S and the receive operator has a parallelism of R, there will be S * R mailboxes (for example, with S = 4 and R = 3, up to 12 mailboxes).
By default, these mailboxes are gRPC channels, but when both workers are in the same server they can use shared memory, in which case a more efficient on-heap mailbox is used.
The mailbox receive operator pulls data from these mailboxes and sends it to the downstream operator.
The mailbox receive operator is a streaming operator. It emits the blocks of rows as soon as they are received from the upstream operator.
It is important to notice that the mailbox receive operator tries to be fair when reading from multiple workers.
None
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator. This operator should always emit as many rows as its upstream operator.
Type: Long
How many workers are sending data to this operator.
Type: Long
How many messages have been received in heap format by this mailbox. Receiving messages in heap format is more efficient than receiving them in raw format, as they do not need to be serialized and deserialized and no network transfer is needed.
Type: Long
How many messages have been received in raw format and therefore deserialized by this mailbox. Receiving messages in heap format is more efficient than receiving them in raw format, as they do not need to be serialized and deserialized and no network transfer is needed.
Type: Long
How many bytes have been deserialized by this mailbox. A high number here indicates that the mailbox is receiving a lot of data from other servers, which is expensive in terms of CPU, memory and network.
Type: Long
How long it took to deserialize the raw messages sent to this mailbox. This time is not wall time, but the sum of the time spent by all threads deserializing messages.
Take into account that this time does not include the impact on the network or the GC.
Type: Long
How much time this operator has been blocked waiting while offering data to be consumed by the downstream operator. A high number here indicates that the downstream operator is slow and may be a bottleneck. For example, usually the receive operator that is the left input of a join operator has a high value here, as the join needs to consume all the messages from the right input before it can start consuming the left input.
Type: Long
How much time this operator has been blocked waiting for more data to be sent by the upstream (send) operator. A high number here indicates that the upstream operator is slow and may be a bottleneck. For example, blocking operators like aggregations, sorts, joins or window functions require all the data to be received before they can start emitting a result, so having them as upstream operators of a mailbox receive operator can lead to high values here.
Given that the mailbox receive operator is a meta-operator, it is not actually shown in the explain plan. Instead, a single PinotLogicalExchange or PinotLogicalSortExchange is shown in the explain plan. This exchange explain node is the logical representation of a pair of send and receive operators.
None
Describes the intersect relation operator in the multi-stage query engine.
The intersect operator is a relational operator that combines two relations and returns the common rows between them. The operator is used to find the intersection of two or more relations, usually by using the SQL INTERSECT operator.
Although it is accepted by the parser, the ALL modifier is currently ignored. Therefore INTERSECT and INTERSECT ALL are equivalent. This issue has been reported in
The current implementation consumes the whole right input relation first and stores the rows in a set. Then it consumes the left input relation one block at a time. Each time a block of rows is read from the left input relation, the operator checks if the rows are in the set of rows from the right input relation. All unique rows that are in the set are added to a new partial result block. Once the whole left input block is analyzed, the operator emits the partial result block.
This process is repeated until all rows from the left input relation are processed.
In pseudo-code, the algorithm looks like this:
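(The following is a simplified Python-style sketch based on the description above; it is illustrative and not the actual implementation.)

```python
def intersect(left_input, right_input):
    # Blocking phase: consume the whole right input and store its rows in a set.
    right_rows = set()
    for block in right_input:
        for row in block:
            right_rows.add(row)

    # Streaming phase: process the left input one block at a time.
    for block in left_input:
        partial_result = []
        for row in block:
            if row in right_rows:
                # Remove the row from the set so it is emitted only once,
                # keeping the result distinct.
                right_rows.remove(row)
                partial_result.append(row)
        # Emit the partial result block once the whole left block is analyzed.
        yield partial_result
```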
The intersect operator is a semi-blocking operator that first consumes the right input relation in a blocking fashion and then consumes the left input relation in a streaming fashion.
None
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator.
The intersect operator is represented in the explain plan as a LogicalIntersect explain node.
Type: Boolean
This attribute is used to indicate if the operator should return all the rows or only the distinct rows.
The intersect operator has a memory footprint that is proportional to the number of unique rows in the right input relation. It also consumes the right input relation in a blocking fashion while the left input relation is consumed in a streaming fashion.
This means that:
If one of the input relations is significantly larger than the other, it is recommended to use the smaller relation as the right input relation.
If one of the inputs is blocking and the other is not, it is recommended to use the blocking relation as the right input relation.
These two hints can be contradictory, so it is up to the user to decide which one to follow based on the specific query pattern. Remember that you can use the stage stats to check the number of rows emitted by each of the inputs and adjust the order of the inputs accordingly.
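For example, in a query like the following (hypothetical tables), the relation after the INTERSECT keyword is the right input, so it is usually better to place the smaller or blocking relation there:

```sql
SELECT userUUID FROM largeTable   -- left input: streamed
INTERSECT
SELECT userUUID FROM smallTable   -- right input: fully consumed into a set
```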
Describes the hash join relation operator in the multi-stage query engine.
The hash join operator is used to join two relations using a hash join algorithm. It is a binary operator that takes two inputs, the left and right relations, and produces a single output relation.
This is the only join operator in the multi-stage query engine. It is usually created as a result of a query that contains a JOIN clause, but it can also be generated by other SQL constructs, such as queries that use a semi-join.
There are different types of joins that can be performed using the hash join operator. Apache Pinot supports:
Inner join, where only the rows that have a match in both relations are returned.
Left join, where all the rows from the left relation are returned. The ones that have a match with the right relation are returned with the columns from the right relation, and the ones that do not have a match are returned with null values for the columns from the right relation.
Right join, like the left join but returning all the rows from the right relation, with the columns from the left relation filled with null values for the rows that do not have a match.
Full outer join, where all the rows from both relations are returned. If a row from any relation does not have a match in the other relation, the columns from the other relation are filled with null values.
Semi-join, where only the rows from the left relation that have a match in the right relation are returned. This is useful to filter the rows from the left relation based on the existence of a match in the right relation, as shown in the example after this list.
Anti-join, where only the rows from the left relation that do not have a match in the right relation are returned.
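As a sketch of the semi-join case mentioned in the list above, a query like the following (hypothetical tables and columns) is typically planned as a semi-join rather than a regular join:

```sql
SELECT userUUID, totalTrips
FROM userAttributes
WHERE userUUID IN (
    SELECT userUUID
    FROM userGroups
    WHERE groupUUID = 'group-1'
)
```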
The hash join operator is one of the new operators introduced in the multi-stage query engine. The current implementation assumes that the right input relation is the smaller one, so it consumes this input first, building a hash table that is then probed with rows from the left input relation.
Future optimizations may include advanced heuristics to decide which input relation to consume first, but in the current implementation, it is important to specify the smaller relation as the right input.
Although the whole multi-stage query engine is designed to process the data in memory, it uses the ability to execute each stage in different workers to process data that may not fit in the memory of a single node. Specifically, each worker processes a subset of the data. Inputs are by default partitioned by the join keys and each worker processes one partition of the data.
This means that data usually needs to be shuffled between workers, which is done by the engine using a mailbox system. The engine tries to minimize the amount of data that needs to be shuffled by partitioning the data, but some techniques can be used to reduce the amount of data that needs to be shuffled, like using co-located joins.
The hash join operator is a blocking operator. It needs to consume all the input data (from both inputs) before emitting the result.
Even using partitioning, the amount of data that needs to be stored in memory can be high, so the engine tries to protect itself from running out of memory by limiting the number of rows that the join can process.
Type: String
Default: THROW
Defines the behavior of the engine when the number of rows exceeds the limit defined by the max_rows_in_join hint. The possible values are:
THROW: The query will fail if the number of rows exceeds the limit.
BREAK: The engine will stop processing the join and return the results that have been computed so far. In this case the stat maxRowsInJoinReached will be true.
Type: Integer
Default: 1,048,576
The maximum number of rows that the join can process. What happens when this limit is reached is defined by the join_overflow_mode hint.
Take care when increasing this limit. If the number of rows is too high, the amount of memory used by the engine can be very high, which can lead to very long GC pauses and even out of memory errors.
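As a sketch, both hints can be attached to the query using the joinOptions hint syntax (table and column names are hypothetical):

```sql
SELECT /*+ joinOptions(join_overflow_mode = 'BREAK', max_rows_in_join = '5000000') */
    a.col1, b.col2
FROM tableA AS a
JOIN tableB AS b ON a.id = b.id
```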
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator. Joins can emit more rows than the input relations, so this value can be higher than the number of rows in either input. Remember that the number of rows is limited by the max_rows_in_join hint, and a large number of emitted rows can lead to high memory usage and long GC pauses, which can affect the performance of the whole system.
Type: Boolean
This stat is set to true when the number of rows exceeds the limit defined by the max_rows_in_join hint.
Notice that by default the engine throws an exception when this happens, in which case no stat is emitted. Therefore this stat is only emitted when the join_overflow_mode hint is set to BREAK.
Type: Long
The time spent building the hash table used to probe the join keys, in milliseconds.
A large number here can indicate that the right relation is too large or that it is taking too long to be processed.
The hash join operator is represented in the explain plan as a LogicalJoin explain node.
Type: Expression
The condition that is being applied to the rows to join the relations. The expression may use indexed columns ($0, $1, etc.), functions and literals. The indexed columns are always 0-based.
For example, the following explain plan:
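(The plan below is a minimal sketch; the inputs are abbreviated and the actual plan contains more nodes.)

```
LogicalJoin(condition=[=($0, $1)], joinType=[inner])
  PinotLogicalExchange(distribution=[hash[0]])
    ... (left input, reading from the userAttributes table)
  PinotLogicalExchange(distribution=[hash[0]])
    ... (right input, reading from the userGroups table)
```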
This plan says that the join condition is that the column with index 0 in the left relation is equal to the column with index 1 in the right relation. Given the rest of the explain plan, we can see that the column with index 0 is the userUUID column in the userAttributes table and the column with index 1 is the userUUID column in the userGroups table.
Type: String
Apache Pinot does not use table stats to determine the best order to consume the input relations. Instead, it assumes that the right input relation is the smaller one. That relation will always be fully consumed to build a hash table, and sometimes it will be broadcast to all workers. This means that it is important to specify the smaller relation as the right input.
Remember that left and right are relative to the order of the tables in the SQL query. It is less expensive to join a large table with a small table (with the small table as the right input) than the other way around.
For example, assuming largeTable is much bigger than smallTable (table and column names are hypothetical), this query:
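```sql
SELECT *
FROM largeTable AS l
JOIN smallTable AS s ON l.id = s.id   -- smallTable is the right input, so the hash table is built on the smaller relation
```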
is more efficient than:
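```sql
SELECT *
FROM smallTable AS s
JOIN largeTable AS l ON s.id = l.id   -- largeTable is the right input, so the hash table is built on the larger relation
```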
Describes the multi-stage operators in general
The multi-stage query engine uses a set of operators to process the query. These operators are based on relational algebra, with some modifications to better fit the distributed nature of the engine.
These operators are the execution units that Pinot uses to execute a query. The operators are executed in a pipeline with a tree structure, where each operator consumes the output of the previous operators (also known as upstreams).
Users do not directly specify these operators. Instead, they write SQL queries that are translated into a logical plan, which is then transformed into different operators. The logical plan can be obtained using an explain plan, while there is no way to get the operators directly. The closest thing to the operators that users can get is the stage stats output.
These operators are generated from the SQL query that you write, but even though they are similar, there is no one-to-one mapping between SQL clauses and operators. Some SQL clauses generate multiple operators, while some operators are generated by multiple SQL clauses.
Operators and explain plan nodes are closer than SQL clauses and operators. Although most explain plan nodes can be directly mapped to an operator, there are some exceptions:
Each PinotLogicalExchange and each PinotLogicalSortExchange explain node is materialized into a pair of mailbox send and mailbox receive operators.
All plan nodes that belong to the same leaf stage are executed by a single leaf stage operator.
In general terms, the operators are the execution units that Pinot uses to execute a query and are also known as the multi-stage physical plan, while the explain plan nodes are the logical plan. The difference between the two is that the operators are what is actually executed, while the explain plan nodes are a logical representation of the query plan.
The following is a list of operators that are used by the multi-stage query engine:
See the mailbox receive operator section to understand the attributes of the exchange explain node.
Although it is accepted in SQL, the all attribute is not currently used in the intersect operator. The returned rows are always distinct. This issue has been reported in
The join_overflow_mode hint can be used to control the behavior of the engine when the number of rows exceeds the limit, which can be defined using the max_rows_in_join hint. By default, this limit is slightly above 1 million rows and the default join overflow mode is THROW, which means that the query will fail if the limit is exceeded.
The type of join that is being performed. The possible values are: inner, left, right, full, semi and anti, as explained in the hash join operator section.