Describes the filter relation operator in the multi-stage query engine.
The filter operator is used to filter rows based on a condition.
This page describes the filter operator defined in the relational algebra used by multi-stage queries. This operator is generated by the multi-stage query engine when you use the where or having clauses and, in some cases, the on clause.
Filter operations apply a predicate to each row and only keep the rows that satisfy the predicate.
Note that filter operators can only use indexes when they are executed in the leaf stage, because intermediate stages don't have access to the actual segments. This is why the engine tries to push filters down to the leaf stage whenever possible.
As explained in explain-plan-multiple-stages, the explain plan in the multi-stage query engine does not indicate whether indexes are used or not.
The filter operator is a streaming operator. It emits the blocks of rows as soon as they are received from the upstream operator.
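The streaming behavior can be illustrated with a minimal Python sketch. It assumes that rows arrive in blocks (lists of tuples) and that the predicate is an ordinary function. The name `filter_operator` is hypothetical and is not Pinot's API. Each block is filtered and emitted before the next one is requested, so nothing is buffered across blocks.

```python
from typing import Callable, Iterable, Iterator, List, Sequence

Row = Sequence[object]
Block = List[Row]


def filter_operator(blocks: Iterable[Block],
                    predicate: Callable[[Row], bool]) -> Iterator[Block]:
    """Streaming filter: each upstream block is filtered and emitted right away."""
    for block in blocks:
        kept = [row for row in block if predicate(row)]
        # Nothing is buffered across blocks, so memory stays bounded by one block.
        if kept:  # assumption: blocks left empty by the filter are not emitted
            yield kept


# Usage: keep rows whose first column is greater than 2.
upstream = [[(1, "a"), (3, "b")], [(0, "c")], [(5, "d")]]
filtered = list(filter_operator(upstream, lambda row: row[0] > 2))
print(filtered)  # [[(3, 'b')], [(5, 'd')]]
```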
None
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1. This number is affected by the number of received rows and the complexity of the predicate.
Type: Long
The number of rows emitted by the operator. A large number of emitted rows is not necessarily a problem, but it indicates that the predicate is not very selective.
The filter operator is represented in the explain plan as a LogicalFilter explain node.
Type: Expression
The condition that is being applied to the rows. The expression may use column references by index ($0, $1, etc.), functions and literals. These column indexes are always 0-based.
For example, the following explain plan:
This plan says that the filter is applying the condition $5 > 2, which means that only the rows where the 6th column is greater than 2 will be emitted. To know which column is the 6th, you need to look at the schema of the scanned table.
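Translating `$N` references by hand is error-prone, so a small helper can do it. The following is a minimal Python sketch. It assumes that the upstream row is a plain table scan, so its columns follow the table schema order. The helper name and the schema are hypothetical.

```python
import re
from typing import Sequence


def resolve_column_refs(expression: str, upstream_columns: Sequence[str]) -> str:
    """Replace 0-based $N references in an explain expression with column names."""
    def substitute(match: "re.Match[str]") -> str:
        index = int(match.group(1))  # $5 -> index 5 -> 6th column
        return upstream_columns[index]
    return re.sub(r"\$(\d+)", substitute, expression)


# Hypothetical schema of the scanned table, in schema order.
columns = ["id", "name", "city", "country", "age", "visits"]
print(resolve_column_refs("$5 > 2", columns))             # visits > 2
print(resolve_column_refs("$0 = 1 AND $5 > 2", columns))  # id = 1 AND visits > 2
```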
As explained in explain-plan-multiple-stages, the explain plan in the multi-stage query engine does not directly indicate whether indexes are used or not.
Apache Pinot contributors are working on improving this, but it is not yet available. Meanwhile, we need an indirect approach to get that information.
First, we need to know on which stage the filter is being used. If the filter is being used in an intermediate stage, then the filter is not using indexes. In order to know the stage, you can extract stages as explained in understanding-stages.
But what about filters executed in the leaf stage? Not all of them can use indexes. The only way to know whether a leaf filter is using indexes is to use the single-stage explain plan. To do so, you need to transform the leaf stage into a single-stage query. This is a manual process that can be tedious, but it becomes easier once you get used to it.
See understanding-multi-stage-query for more information.
Describes the leaf operator in the multi-stage query engine.
The leaf operator is the operator that actually reads the data from the segments. Instead of being just a simple table scan, the leaf operator is a meta-operator that wraps the single-stage query engine and executes all the operators in the leaf stage of the query plan.
The leaf operator is not a relational operator itself but a meta-operator that is able to execute single-stage queries. When servers execute a leaf stage, they compile all operations in the stage except the send operator into the equivalent single-stage query and execute it using a slightly modified version of the single-stage engine.
As a result, leaf stage operators can use all the optimizations and indices that the single-stage engine can use but it also means that there may be slight differences when an operator is executed in a leaf stage compared to when it is executed in an intermediate stage. For example, operations pushed down to the leaf stage may use indexes (see how to know if indexes are used) or the semantics can be slightly different.
You can read Troubleshoot issues with the multi-stage query engine (v2) for more information on the differences between the leaf and intermediate stages, but the main ones are:
Null handling is different.
Some functions are only supported in multi-stage and some others only in single-stage.
Type coercion is different. While the single-stage engine always operates with generic types (i.e., it uses doubles when mathematical operations are used), the multi-stage engine tries to keep the types (i.e., adding two integers results in an integer).
One of the slight differences between the leaf engine and the normal single-stage engine is that the leaf engine tries to be non-blocking.
None
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator.
Type: String
The name of the table that is scanned. This is the name without the type suffix (so without _REALTIME or _OFFLINE). This is very useful to understand which table is being scanned by this leaf stage in complex queries.
Type: Long
Similar to the same stat in single-stage queries, this stat indicates the number of rows selected after the filter phase.
If it is very high, that means the selectivity for the query is low and lots of rows need to be processed after the filtering. Consider refining the filter to increase the selectivity of the query.
Type: Long
Similar to the same stat in single-stage queries, this stat indicates the number of entries (aka scalar values) scanned in the filtering phase of query execution.
Can be larger than the total scanned doc count because of multiple filtering predicates or multi-value entries. Can also be smaller than the total scanned doc count if indexing is used for filtering.
This, along with numEntriesScannedPostFilter, indicates where most of the time is spent during table scan processing. If this value is high, enabling indexing for affected columns is a way to bring it down. Another option is to partition the data based on the dimension most heavily used in your filter queries.
Type: Long
Similar to the same stat in single-stage queries, this stat indicates the number of entries (aka scalar values) scanned after the filtering phase of query execution, i.e., in the aggregation and/or group-by phases. This is equivalent to numDocsScanned * number of projected columns.
This, along with numEntriesScannedInFilter, indicates where most of the time is spent during table scan processing. A high number here means the selectivity is low (that is, Pinot needs to scan a lot of records to answer the query). If this is high, consider using a star-tree index, since a regular index won't improve performance.
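The relationship between these stats is simple arithmetic, and it can be checked against real numbers. Below is a minimal Python sketch. It assumes the numDocsScanned and totalDocs stats described on this page are available. The helper names and the sample numbers are hypothetical.

```python
def entries_scanned_post_filter(num_docs_scanned: int, projected_columns: int) -> int:
    # Each document that passes the filter is read once per projected column.
    return num_docs_scanned * projected_columns


def fraction_of_docs_matched(num_docs_scanned: int, total_docs: int) -> float:
    # Close to 1.0 means the filter keeps almost every row, i.e. low selectivity.
    return num_docs_scanned / total_docs if total_docs > 0 else 0.0


# Hypothetical stats from a leaf stage that projects 3 columns.
print(entries_scanned_post_filter(2_000_000, 3))       # 6000000
print(fraction_of_docs_matched(2_000_000, 2_500_000))  # 0.8
```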
Type: Integer
Similar to the same stat in single-stage queries, this stat indicates the total number of segments queried for a query. May be less than the total number of segments if the broker applies optimizations.
The broker decides how many segments to query on each server, based on broker pruning logic. The server decides how many of these segments to actually look at, based on server pruning logic. After processing segments for a query, fewer may have the matching records.
In general, numSegmentsQueried >= numSegmentsProcessed >= numSegmentsMatched.
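The inequality above can be used as a sanity check, and the gap between queried and processed segments gives a rough measure of pruning. Below is a minimal Python sketch, assuming the three counts have already been read from the stats. The function name is hypothetical.

```python
def pruned_before_processing(queried: int, processed: int, matched: int) -> float:
    """Fraction of queried segments that were not processed.

    Raises if the counts break numSegmentsQueried >= numSegmentsProcessed >= numSegmentsMatched.
    """
    if not queried >= processed >= matched >= 0:
        raise ValueError("expected queried >= processed >= matched >= 0")
    return (queried - processed) / queried if queried else 0.0


print(pruned_before_processing(100, 40, 10))  # 0.6
```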
Type: Integer
Similar to the same stat in single-stage queries, this stat indicates the number of segments processed with at least one document matched in the query response.
The more segments are processed, the more IO has to be done. This is why selective queries where numSegmentsProcessed is close to numSegmentsQueried can be optimized by changing the data distribution.
Type: Integer
Similar to the same stat in single-stage queries, this stat indicates the number of segment operators used to process segments. Indicates the effectiveness of the pruning logic.
Type: Long
Similar to the same stat in single-stage queries, this stat indicates the number of rows in the table.
Type: Boolean
Similar to the same stat in single-stage queries and the same in aggregate operators, this stat indicates if the max group limit has been reached in a group by aggregation operator executed in the leaf stage.
If this boolean is set to true, the query result may not be accurate. The default value for numGroupsLimit is 100k, and should be sufficient for most use cases.
Type: Integer
Number of result resizes for queries
Type: Long
Time spent in resizing results for the output. Either because of LIMIT or maximum allowed group by keys or any other criteria.
Type: Long
Aggregated thread CPU time in nanoseconds for query processing from servers. This metric is only available if Pinot is configured with pinot.server.instance.enableThreadCpuTimeMeasurement.
Type: Long
Aggregated system activities CPU time in nanoseconds for query processing (e.g. GC, OS paging, etc.). This metric is only available if Pinot is configured with pinot.server.instance.enableThreadCpuTimeMeasurement.
Type: Integer
The number of segments pruned by the server, for any reason.
Type: Integer
The number of segments pruned because they are invalid. Segments are invalid when the schema has changed and the segment has not been refreshed.
For example, if a column is added to the schema, the segment will be invalid for queries that use that column until it is refreshed.
Type: Integer
The number of segments pruned because they are not needed for the query due to the limit clause.
Pinot keeps a count of the number of rows returned by each segment. Once it's guaranteed that no more segments need to be read to satisfy the limit clause without breaking semantics, the remaining segments are pruned.
For example, a query like SELECT col1 FROM table2 LIMIT 10 can be pruned for this reason, while a query like SELECT col1 FROM table2 ORDER BY col1 DESC LIMIT 10 cannot, because Pinot needs to read all segments to guarantee that the largest values of col1 are returned.
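The limit-based pruning above can be modeled in a few lines. Below is a minimal Python sketch under simplifying assumptions. The query has no ORDER BY, so any `limit` rows are a valid answer. The number of rows each segment returns is known. Segments are read in a fixed order. This is an illustration, not Pinot's server pruning code, and the function name is hypothetical.

```python
from typing import List, Tuple


def plan_segments_for_limit(rows_per_segment: List[int], limit: int) -> Tuple[List[int], int]:
    """Return (indexes of segments to read, number of segments pruned by the limit).

    Simplified model: only valid for queries without ORDER BY, where any `limit`
    rows are an acceptable answer, and where each segment's row count is known.
    """
    to_read: List[int] = []
    collected = 0
    for index, rows in enumerate(rows_per_segment):
        if collected >= limit:
            break  # enough rows already: the remaining segments can be skipped
        to_read.append(index)
        collected += rows
    return to_read, len(rows_per_segment) - len(to_read)


# SELECT col1 FROM table2 LIMIT 10 over four hypothetical segments.
print(plan_segments_for_limit([5, 8, 20, 7], 10))  # ([0, 1], 2)
```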
Type: Integer
The number of segments pruned because they are not needed for the query due to a value clause, usually a where.
Pinot keeps the maximum and minimum values of each segment for each column. If the value clause is such that the segment cannot contain any rows that satisfy the clause, the segment is pruned.
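The min/max check can be sketched as a simple interval-overlap test. The Python sketch below assumes a range predicate on a single column, with inclusive bounds and `math.inf` for an open end. It is not Pinot's pruner, and the class and function names are hypothetical.

```python
import math
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class SegmentStats:
    name: str
    min_value: float  # minimum value of the column in this segment
    max_value: float  # maximum value of the column in this segment


def prune_by_value(segments: List[SegmentStats], low: float, high: float) -> Tuple[List[str], int]:
    """Keep segments whose [min, max] range can overlap the predicate range [low, high].

    Bounds are treated as inclusive, which is conservative: a segment may be kept
    even if no row matches, but a segment with matching rows is never pruned.
    """
    kept = [s.name for s in segments if not (s.max_value < low or s.min_value > high)]
    return kept, len(segments) - len(kept)


segments = [SegmentStats("seg0", 0, 9), SegmentStats("seg1", 10, 19), SegmentStats("seg2", 20, 29)]
print(prune_by_value(segments, 12, 15))        # where col between 12 and 15 -> (['seg1'], 2)
print(prune_by_value(segments, 25, math.inf))  # where col >= 25 -> (['seg2'], 2)
```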
Type: Integer
Like numSegmentsProcessed but only for consuming segments.
Type: Integer
Like numSegmentsMatched but only for consuming segments.
Type: Long
The time spent by the operator executing.
Type: Long
The instant in time when the operator started executing.
Given that the leaf operator is a meta-operator, it is not actually shown in the explain plan. However, the leaf operator is the only one that can execute table scans, so here we list the attributes that can be found in the explain plan for a table scan.
Type: String array
Example: table=[[default, userGroups]]
The qualified name of the table that is scanned, which means it also contains the name of the database being used.
Leaf stage operators can use all the optimizations and indices that the single-stage engine can use. This means that it is usually better to push down as much as possible to the leaf stage.
The engine is smart enough to push down filters and aggregations without breaking semantics, but sometimes there are subtle differences between the SQL semantics and what the domain expert writing the query actually wants to do.
Sometimes the engine is too conservative about null handling, or the query includes an unnecessary limit clause that prevents the engine from pushing down the filter.
It is recommended to analyze your explain plan to be sure that the engine is able to push down as much logic as you expect.
Describes the aggregate relation operator in the multi-stage query engine.
The aggregate operator is used to perform calculations on a set of rows and return a single row of results.
This page describes the aggregate operator defined in the relational algebra used by multi-stage queries. This operator is generated by the multi-stage query engine when you use aggregate functions in a query either with or without a group by clause.
Aggregate operations may be expensive in terms of memory, CPU and network usage. As explained in understanding stages, the multi-stage query engine breaks down the query into multiple stages and each stage is then executed in parallel on different workers. Each worker processes a subset of the data and sends the results to the coordinator which then aggregates the results. When possible, the multi-stage query engine will try to apply a divide-and-conquer strategy to reduce the amount of data that needs to be processed in the coordinator stage.
For example, if the aggregation function is a sum, the engine will try to sum the results of each worker before sending the partial result to the coordinator, which then sums the partial results to get the final result. But some aggregation functions, like count(distinct), cannot be computed in this way and require all the data to be processed in the coordinator stage.
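The difference between the two kinds of functions can be shown with a minimal Python sketch. The data is hypothetical: a single column split across three workers. The sketch shows that adding partial sums gives the correct total, while adding per-worker distinct counts does not. A correct count(distinct) needs the distinct values themselves to be merged at the coordinator.

```python
from typing import List, Set

# Hypothetical values of one column, split across three workers.
worker_values: List[List[int]] = [[1, 2, 3], [3, 4], [4, 5, 1]]


def two_phase_sum(parts: List[List[int]]) -> int:
    partials = [sum(p) for p in parts]  # phase 1: each worker sends one number
    return sum(partials)                # phase 2: coordinator adds the partials


def wrong_two_phase_count_distinct(parts: List[List[int]]) -> int:
    # Adding per-worker distinct counts double counts values seen by several workers.
    return sum(len(set(p)) for p in parts)


def count_distinct(parts: List[List[int]]) -> int:
    merged: Set[int] = set()
    for p in parts:
        merged |= set(p)  # the coordinator needs the values themselves, not just counts
    return len(merged)


print(two_phase_sum(worker_values))                   # 23
print(wrong_two_phase_count_distinct(worker_values))  # 8
print(count_distinct(worker_values))                  # 5
```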
In Apache Pinot 1.1.0, the multi-stage query engine always keeps the data in memory. This means that the amount of memory used by the engine is proportional to the number of groups generated by the group by clause and the amount of data that needs to be kept for each group (which depends on the aggregation function).
Even when the aggregation function is a simple count, which only requires keeping a long for each group in memory, the amount of memory used can be high if the number of groups is high. This is why the engine limits the number of groups. By default, this limit is 100,000, but this can be changed by providing hints.
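The following minimal Python sketch shows why the group count drives memory: a hash-based count keeps one counter per group. It also shows what a group cap can look like. The exact limit behavior is an assumption made for illustration. In this model, rows for existing groups are still counted, rows that would create a new group are dropped, and a flag records that the result is partial. The function name is hypothetical.

```python
from typing import Dict, Hashable, Iterable, Tuple


def count_by_group(keys: Iterable[Hashable], num_groups_limit: int) -> Tuple[Dict[Hashable, int], bool]:
    """Hash-based COUNT(*) ... GROUP BY key with a cap on the number of groups.

    Simplified model (assumption): once the limit is reached, existing groups keep
    being updated, rows for new groups are dropped, and limit_reached is set.
    """
    counts: Dict[Hashable, int] = {}  # one counter per group: memory grows with groups
    limit_reached = False
    for key in keys:
        if key in counts:
            counts[key] += 1
        elif len(counts) < num_groups_limit:
            counts[key] = 1
        else:
            limit_reached = True  # the result is now partial
    return counts, limit_reached


counts, reached = count_by_group(["a", "b", "a", "c", "b", "d"], num_groups_limit=2)
print(counts, reached)  # {'a': 2, 'b': 2} True
```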
The aggregate operator is a blocking operator. It needs to consume all the input data before emitting the result.
Type: Integer
Default: 100,000
Defines the max number of groups that can be created by the group by clause. If the number of groups exceeds this limit, the query will not fail, but the result may be partial.
Example:
Type: Boolean
Default: false
If set to true, the engine will consider that the data is already partitioned by the group by keys. This means that the engine will not need to shuffle the data to group them by the group by keys and the coordinator stage will be able to compute the final result without needing to merge the partial results.
Caution: This hint should only be used if the data is already partitioned by the group by keys. There is no check to verify that the data is indeed partitioned by the group by keys and using this hint when the data is not partitioned by the group by keys will lead to incorrect results.
Example:
Type: Boolean
Default: false
If set to true, the engine will not push down the aggregate into the leaf stage. In some situations, doing the group by in the leaf stage could be wasted effort, e.g., when the cardinality of the group by column is very high.
Example:
Type: Integer
Default: 10,000
Defines the initial capacity of the result holder that stores the intermediate results of the aggregation. This hint can be used to reduce the memory usage of the engine by setting a value close to the expected number of groups. It is usually recommended to not change this hint unless you know that the expected number of groups is much lower than the default value.
Example:
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1. Remember that this value is affected by the number of received rows and the complexity of the aggregation function.
Type: Long
The number of groups emitted by the operator. A large number of emitted rows can indicate that the query is not well optimized.
Remember that the number of groups is limited by the num_groups_limit hint and a large number of groups can lead to high memory usage and slow queries.
Type: Boolean
This stat is set to true when the number of groups exceeds the limit defined by the num_groups_limit hint. In that case, the query will not fail but will return partial results, which will be indicated by the global partialResponse stat.
The aggregate operator is represented in the explain plan as a LogicalAggregate explain node.
Remember that these nodes appear in pairs: first in one stage where the aggregation is done in parallel, and then in the downstream stage where the partial results are merged.
Type: List of Integer
The list of columns used in the group by clause. These numbers are 0-based column indexes on the virtual row projected by the upstream.
For example the explain plan:
This plan says that the group by clause is using the column with index 6 in the virtual row projected by the upstream. Given that in this case the row is generated by a table scan, the column with index 6 is the 7th column in the table as defined in its schema.
Type: Expression
The aggregation functions applied to the columns. There may be multiple agg#N attributes, each one representing a different aggregation function.
For example the explain plan:
This plan has two aggregation functions: COUNT() and MAX(). The second is applied to the column with index 5 in the virtual row projected by the upstream. Given that in this case the row is generated by a table scan, the column with index 5 is the 6th column in the table as defined in its schema.
It is recommended to use one of the different HyperLogLog flavors instead of count(distinct) when the cardinality of the data or the size of the values is high.
For example, it is cheaper to execute count(distinct) on an int column with 1000 distinct values than on a column that stores very long strings, even if the number of distinct values is the same.
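HyperLogLog is cheaper because it keeps a fixed number of small registers instead of every distinct value. The minimal textbook HyperLogLog below, written in Python, illustrates this. It is not Pinot's implementation. The precision `p=10` (1024 registers) and the SHA-1 based 64-bit hash are arbitrary choices made for the example. Its typical relative error is about 1.04 / sqrt(1024), roughly 3%.

```python
import hashlib
import math


class HyperLogLog:
    """Minimal HyperLogLog sketch (illustrative; not Pinot's implementation)."""

    def __init__(self, p: int = 10) -> None:
        self.p = p
        self.m = 1 << p              # number of registers, independent of value size
        self.registers = [0] * self.m

    def add(self, value: object) -> None:
        # 64-bit hash taken from the first 8 bytes of SHA-1 of the value's string form.
        h = int.from_bytes(hashlib.sha1(str(value).encode("utf-8")).digest()[:8], "big")
        index = h >> (64 - self.p)                # first p bits choose a register
        rest = h & ((1 << (64 - self.p)) - 1)     # remaining 64 - p bits
        rank = (64 - self.p) - rest.bit_length() + 1  # position of the leftmost 1-bit
        self.registers[index] = max(self.registers[index], rank)

    def estimate(self) -> float:
        alpha = 0.7213 / (1 + 1.079 / self.m)
        raw = alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * self.m and zeros > 0:
            return self.m * math.log(self.m / zeros)  # linear counting for small ranges
        return raw


hll = HyperLogLog(p=10)
for i in range(10_000):
    hll.add(f"a-very-long-string-value-{i}")
print(round(hll.estimate()), len(hll.registers))  # an estimate near 10000, and 1024 registers
```

Memory stays at 1024 small integers however long the strings are. An exact count(distinct) would have to store all 10,000 strings.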
Describes the intersect relation operator in the multi-stage query engine.
The intersect operator is a relational operator that combines two relations and returns the common rows between them. The operator is used to find the intersection of two or more relations, usually by using the SQL INTERSECT operator.
Although it is accepted by the parser, the ALL modifier is currently ignored. Therefore INTERSECT and INTERSECT ALL are equivalent. This issue has been reported in #13126.
The current implementation consumes the whole right input relation first and stores the rows in a set. Then it consumes the left input relation one block at a time. Each time a block of rows is read from the left input relation, the operator checks if the rows are in the set of rows from the right input relation. All unique rows that are in the set are added to a new partial result block. Once the whole left input block is analyzed, the operator emits the partial result block.
This process is repeated until all rows from the left input relation are processed.
In pseudo-code, the algorithm looks like this:
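A minimal Python sketch of the steps described above, assuming blocks are lists of hashable tuples. Two details are assumptions made for illustration. First, a matched row is removed from the right set, which is one way to make every emitted row unique even across left blocks. Second, empty partial blocks are not emitted. The function name is hypothetical.

```python
from typing import Iterable, Iterator, List, Set, Tuple

Row = Tuple[object, ...]


def intersect(left_blocks: Iterable[List[Row]],
              right_blocks: Iterable[List[Row]]) -> Iterator[List[Row]]:
    # 1. Blocking phase: consume the whole right input into a set.
    right_rows: Set[Row] = set()
    for block in right_blocks:
        right_rows.update(block)
    # 2. Streaming phase: process the left input one block at a time.
    for block in left_blocks:
        partial: List[Row] = []
        for row in block:
            if row in right_rows:
                right_rows.remove(row)  # emit each common row only once (distinct result)
                partial.append(row)
        if partial:
            yield partial  # emitted as soon as this left block has been analyzed


left = [[(1,), (2,), (2,)], [(3,), (4,)]]
right = [[(2,), (3,)], [(5,)]]
print(list(intersect(left, right)))  # [[(2,)], [(3,)]]
```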
The intersect operator is a semi-blocking operator that first consumes the right input relation in a blocking fashion and then consumes the left input relation in a streaming fashion.
None
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator.
The intersect operator is represented in the explain plan as a LogicalIntersect explain node.
Type: Boolean
This attribute is used to indicate if the operator should return all the rows or only the distinct rows.
Although it is accepted in SQL, the all attribute is not currently used in the intersect operator. The returned rows are always distinct. This issue has been reported in #13126.
The intersect operator has a memory footprint that is proportional to the number of unique rows in the right input relation. It also consumes the right input relation in a blocking fashion while the left input relation is consumed in a streaming fashion.
This means that:
In case any of the input relations is significantly larger than the other, it is recommended to use the smaller relation as the right input relation.
In case one of the inputs is blocking and the other is not, it is recommended to use the blocking relation as the right input relation.
These two hints can be contradictory, so it is up to the user to decide which one to follow based on the specific query pattern. Remember that you can use the stage stats to check the number of rows emitted by each of the inputs and adjust the order of the inputs accordingly.
Describes the mailbox send operator in the multi-stage query engine.
The mailbox send operator is the operator that sends data to the mailbox receive operator. This is not an actual relational operator but a Pinot extension used to send data to other stages.
These operators are always the root of the intermediate and leaf stages.
Stages in the multi-stage query engine are executed in parallel by different workers. Workers send data to each other using mailboxes. The number of mailboxes depends on the send operator parallelism, the receive operator parallelism and the distribution being used. At worst, there is one mailbox per worker pair, so if the upstream send operator has a parallelism of S and the receive operator has a parallelism of R, there will be S * R mailboxes.
By default, these mailboxes are gRPC channels, but when both workers are in the same server, they can use shared memory and therefore a more efficient on-heap mailbox is used.
The mailbox send operator wraps these mailboxes, offering a single logical mailbox to the stage. How data is distributed to the different workers of the downstream stage is determined by the distribution of the operator. The supported distributions are hash, random and broadcast.
hash means there are multiple instances of the stream, and each instance contains records whose keys hash to a particular hash value. Instances are disjoint; a given record appears on exactly one stream. The list of numbers in the bracket indicates the columns used to calculate the hash. These numbers are 0-based column indexes on the virtual row projected by the upstream.
random means there are multiple instances of the stream, and each instance contains randomly chosen records. Instances are disjoint; a given record appears on exactly one stream.
broadcast means there are multiple instances of the stream, and all records appear in each instance. This is the most expensive distribution, as it requires sending all the data to all the workers.
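The three distributions can be compared side by side with a minimal Python sketch of one sender routing rows to R downstream workers. CRC32 over the key columns stands in for Pinot's hash function, and a seeded `random.Random` stands in for its random choice. Both are assumptions, and the function name is hypothetical. With S senders doing this, there are at most S * R mailboxes.

```python
import random
import zlib
from typing import Dict, List, Sequence, Tuple

Row = Tuple[object, ...]


def distribute(rows: Sequence[Row], num_workers: int, distribution: str,
               hash_keys: Sequence[int] = (), seed: int = 0) -> Dict[int, List[Row]]:
    """Route rows to downstream workers 0..num_workers-1.

    hash_keys are 0-based column indexes of the projected row, as in hash[0, 1].
    CRC32 and the seeded RNG are illustrative stand-ins, not Pinot's functions.
    """
    outputs: Dict[int, List[Row]] = {w: [] for w in range(num_workers)}
    rng = random.Random(seed)
    for row in rows:
        if distribution == "hash":
            key = repr(tuple(row[i] for i in hash_keys)).encode("utf-8")
            outputs[zlib.crc32(key) % num_workers].append(row)  # exactly one worker
        elif distribution == "random":
            outputs[rng.randrange(num_workers)].append(row)       # exactly one worker
        elif distribution == "broadcast":
            for worker in outputs:                                  # every worker
                outputs[worker].append(row)
        else:
            raise ValueError(f"unknown distribution: {distribution}")
    return outputs


rows = [(i % 3, i) for i in range(30)]
print({w: len(rs) for w, rs in distribute(rows, 4, "broadcast").items()})  # {0: 30, 1: 30, 2: 30, 3: 30}
```

Hash is the only option that guarantees equal keys meet on the same worker. This is why it is used when a downstream operator groups or joins by those keys.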
The mailbox send operator is a streaming operator. It emits the blocks of rows as soon as they are received from the upstream operator.
None
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator. This operator should always emit as many rows as its upstream operator.
Type: number
The stage id of the operator. The root stage has id 0 and this number is incremented by 1 for each stage. Current implementation iterates over the stages in pre-order traversal, although this is not guaranteed.
This stat is useful to extract stages from queries, as explained in understanding stages.
Type: Int
Number of threads executing the stage. Although this stat is only reported in the send mailbox operator, it is the same for all operators in the stage.
Type: Int
The number of workers this operation is sending data to. A large fan out may indicate that the operation is sending data to many workers, which may be a bottleneck that may be improved using partitioning.
Type: Int
How many messages have been sent in heap format by this mailbox. Sending in heap messages is more efficient than sending in raw format, as the messages do not need to be serialized and deserialized and no network transfer is needed.
Type: Int
How many messages have been sent in raw format and therefore serialized by this mailbox. Sending in heap messages is more efficient than sending in raw format, as the messages do not need to be serialized and deserialized and no network transfer is needed.
Type: Long
How many bytes have been serialized by this mailbox. A high number here indicates that the mailbox is sending a lot of data to other servers, which is expensive in terms of CPU, memory and network.
Type: Long
How long it took to serialize the raw messages sent by this mailbox. This time is not wall time, but the sum of the time spent by all threads serializing messages.
Take into account that this time does not include the impact on the network or the GC.
Given that the mailbox send operator is a meta-operator, it is not actually shown in the explain plan. Instead, a single PinotLogicalExchange or PinotLogicalSortExchange is shown in the explain plan. This exchange explain node is the logical representation of a pair of send and receive operators.
Type: Expression
Example: distribution=[hash[0]], distribution=[random] or distribution=[broadcast]
The distribution used to send data from the mailbox send operator to the mailbox receive operator. Values supported by Pinot are hash, random and broadcast, as explained in the implementation details.
While broadcast and random distributions don't have any parameters, the hash distribution includes a list of numbers in brackets. That list represents the columns used to calculate the hash and are the 0-based column indexes on the virtual row projected by the upstream operator.
For example, in the following explain plan:
The distribution indicates that the data is distributed by the first and second columns of the projected row, which are groupUUID and userUUID respectively.
None
Describes the multi-stage operators in general
The multi-stage query engine uses a set of operators to process the query. These operators are based on relational algebra, with some modifications to better fit the distributed nature of the engine.
These operators are the execution units that Pinot uses to execute a query. The operators are executed in a pipeline with tree structure, where each operator consumes the output of the previous operators (also known as upstreams).
Users do not directly specify these operators. Instead, they write SQL queries that are translated into a logical plan, which is then transformed into different operators. The logical plan can be obtained by explaining a query, while there is no way to get the operators directly. The closest thing to the operators that users can get is the multi-stage stats output.
These operators are generated from the SQL query that you write, but even though they are similar, there is not a one-to-one mapping between the SQL clauses and the operators. Some SQL clauses generate multiple operators, while some operators are generated by multiple SQL clauses.
Operators and explain plan nodes are closer than SQL clauses and operators. Although most explain plan nodes can be directly mapped to an operator, there are some exceptions:
Each PinotLogicalExchange and each PinotLogicalSortExchange explain node is materialized into a pair of mailbox send and mailbox receive operators.
All plan nodes that belong to the same leaf stage are executed in the leaf operator.
In general terms, the operators are the execution units that Pinot uses to execute a query and are also known as the multi-stage physical plan, while the explain plan nodes are logical plans. The difference between the two is that the operators can be actually executed, while the explain plan nodes are the logical representation of the query plan.
The following is a list of operators that are used by the multi-stage query engine:
Describes the literal relation operator in the multi-stage query engine.
The literal operator is used to define a constant value in the query. This operator may be generated by the multi-stage query engine when you use a constant value in a query.
The literal operator is a blocking operator, but given its trivial nature it should not matter.
The literal operator is a simple operator that does not require any computation.
None
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator. It should always be one.
None
Take care when using very large literals (in the order of hundreds of KBs), as they may need to be sent from brokers to servers and, in general, may introduce latencies in parsing and query optimization.
Describes the mailbox receive operator in the multi-stage query engine.
The mailbox receive operator is the operator that receives the data from the mailbox send operator. This is not an actual relational operator but a Pinot extension used to receive data from other stages.
Stages in the multi-stage query engine are executed in parallel by different workers. Workers send data to each other using mailboxes. The number of mailboxes depends on the send operator parallelism, the receive operator parallelism and the distribution being used. At worst, there is one mailbox per worker pair, so if the upstream send operator has a parallelism of S and the receive operator has a parallelism of R, there will be S * R mailboxes.
By default, these mailboxes are gRPC channels, but when both workers are in the same server, they can use shared memory and therefore a more efficient on-heap mailbox is used.
The mailbox receive operator pulls data from these mailboxes and sends it to the downstream operator.
The mailbox receive operator is a streaming operator. It emits the blocks of rows as soon as they are received from the upstream operator.
It is important to notice that the mailbox receive operator tries to be fair when reading from multiple workers.
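This page only says that the receive operator tries to be fair. Round-robin is one simple policy with that property, and the minimal Python sketch below assumes it. It models each mailbox as an already-filled list of blocks, whereas real mailboxes are asynchronous queues. The function name is hypothetical.

```python
from collections import deque
from typing import Deque, Iterator, Sequence


def round_robin_receive(mailboxes: Sequence[Sequence[object]]) -> Iterator[object]:
    """Take one block from each non-empty mailbox in turn, so no sender starves the others."""
    queues: Deque[Deque[object]] = deque(deque(m) for m in mailboxes if m)
    while queues:
        queue = queues.popleft()
        yield queue.popleft()
        if queue:
            queues.append(queue)  # back of the line until this mailbox is drained


print(list(round_robin_receive([["a1", "a2", "a3"], ["b1"], ["c1", "c2"]])))
# ['a1', 'b1', 'c1', 'a2', 'c2', 'a3']
```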
None
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator. This operator should always emit as many rows as its upstream operator.
Type: Long
How many workers are sending data to this operator.
Type: Long
How many messages have been received in heap format by this mailbox. Receiving in heap messages is more efficient than receiving them in raw format, as the messages do not need to be serialized and deserialized and no network transfer is needed.
Type: Long
How many messages have been received in raw format and therefore serialized by this mailbox. Receiving in heap messages is more efficient than receiving them in raw format, as the messages do not need to be serialized and deserialized and no network transfer is needed.
Type: Long
How many bytes have been deserialized by this mailbox. A high number here indicates that the mailbox is receiving a lot of data from other servers, which is expensive in terms of CPU, memory and network.
Type: Long
How long it took to deserialize the raw messages sent to this mailbox. This time is not wall time, but the sum of the time spent by all threads deserializing messages.
Take into account that this time does not include the impact on the network or the GC.
Type: Long
How much time this operator has been blocked waiting while offering data to be consumed by the downstream operator. A high number here indicates that the downstream operator is slow and may be a bottleneck. For example, usually the receive operator that is the left input of a join operator has a high value here, as the join needs to consume all the messages from the right input before it can start consuming the left input.
Type: Long
How much time this operator has been blocked waiting for more data to be sent by the upstream (send) operator. A high number here indicates that the upstream operator is slow and may be a bottleneck. For example, blocking operators like aggregations, sorts, joins or window functions require all the data to be received before they can start emitting a result, so having them as upstream operators of a mailbox receive operator can lead to high values here.
Given that the mailbox receive operator is a meta-operator, it is not actually shown in the explain plan. Instead, a single PinotLogicalExchange or PinotLogicalSortExchange node is shown in the explain plan. This exchange explain node is the logical representation of a pair of send and receive operators.
See the mailbox send operator to understand the attributes of the exchange explain node.
Describes the hash join relation operator in the multi-stage query engine.
The hash join operator is used to join two relations using a hash join algorithm. It is a binary operator that takes two inputs, the left and right relations, and produces a single output relation.
This is the only join operator in the multi-stage query engine. It is always created for queries that contain a join clause, but it can also be created by other SQL queries, like ones that result in a semi-join.
There are different types of joins that can be performed using the hash join operator. Apache Pinot supports:
Inner join, where only the rows that have a match in both relations are returned.
Left join, where all the rows from the left relation are returned. The ones that have a match with the right relation are returned with the columns from the right relation, and the ones that do not have a match are returned with null values for the columns from the right relation.
Right join, like the left join but returning all the rows from the right relation, with the columns from the left relation filled with null values for the rows that do not have a match.
Full outer join, where all the rows from both relations are returned. If a row from any relation does not have a match in the other relation, the columns from the other relation are filled with null values.
Semi-join, where only the rows from the left relation that have a match in the right relation are returned. This is useful to filter the rows from the left relation based on the existence of a match in the right relation.
Anti-join, where only the rows from the left relation that do not have a match in the right relation are returned.
The hash join operator is one of the new operators introduced in the multi-stage query engine. The current implementation assumes that the right input relation is the smaller one, so it consumes this input first, building a hash table that is then probed with the left input relation.
Future optimizations may include advanced heuristics to decide which input relation to consume first, but in the current implementation, it is important to specify the smaller relation as the right input.
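The build-and-probe strategy described above can be illustrated with a minimal in-memory sketch. It is not Pinot's implementation: the function name hash_join, the tuple-based rows, the key-extractor parameters and the left_width/right_width padding parameters are assumptions made for illustration, and SQL NULL semantics for join keys are ignored. It shows why the right input is fully consumed and kept in memory while the left input is only streamed through the hash table, and how the join types listed above differ.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Iterable, List, Set, Tuple

Row = Tuple[Any, ...]
Key = Callable[[Row], Any]

def hash_join(left: Iterable[Row], right: Iterable[Row], left_key: Key,
              right_key: Key, join_type: str = "inner",
              left_width: int = 0, right_width: int = 0) -> List[Row]:
    """Hash join sketch: build on the right input, probe with the left one.

    left_width/right_width are only used to pad unmatched rows with None
    in outer joins. NULL join keys are not treated specially here.
    """
    # Build phase: the right input is fully consumed into an in-memory table.
    table: DefaultDict[Any, List[Tuple[int, Row]]] = defaultdict(list)
    for pos, row in enumerate(right):
        table[right_key(row)].append((pos, row))

    matched_right: Set[int] = set()  # positions of right rows that found a match
    out: List[Row] = []
    # Probe phase: the left input is streamed through the hash table.
    for lrow in left:
        matches = table.get(left_key(lrow), [])
        if join_type == "semi":        # keep left rows with at least one match
            if matches:
                out.append(lrow)
        elif join_type == "anti":      # keep left rows without any match
            if not matches:
                out.append(lrow)
        else:
            for pos, rrow in matches:
                matched_right.add(pos)
                out.append(lrow + rrow)
            if not matches and join_type in ("left", "full"):
                out.append(lrow + (None,) * right_width)

    # Right and full joins also return right rows that never matched.
    if join_type in ("right", "full"):
        for entries in table.values():
            for pos, rrow in entries:
                if pos not in matched_right:
                    out.append((None,) * left_width + rrow)
    return out
```

Because every right row is stored in the table, memory usage grows with the size of the right relation, while the left relation only needs to be held one row (or block) at a time.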
Although the multi-stage query engine is designed to process the data in memory, it can execute each stage in different workers (explained in understanding stages) in order to process data that may not fit in the memory of a single node. Specifically, each worker processes a subset of the data: by default, inputs are partitioned by the join keys and each worker processes one partition of the data.
This means that data usually needs to be shuffled between workers, which the engine does using a mailbox system. The engine tries to minimize the amount of data that needs to be shuffled by partitioning the data, and some techniques, like co-located joins, can reduce it further.
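A minimal sketch of partitioning by join key, assuming tuple rows and using Python's built-in hash as a stand-in for whatever hash function the engine actually uses (the function name partition_by_key and the sample tables are hypothetical). Because both inputs are partitioned with the same function and the same number of workers, rows with equal join keys always land in the same worker, so each worker can join its own partitions independently.

```python
from typing import Any, Dict, Iterable, List, Tuple

Row = Tuple[Any, ...]

def partition_by_key(rows: Iterable[Row], key_index: int,
                     num_workers: int) -> Dict[int, List[Row]]:
    """Assign each row to a worker based on the hash of its join key."""
    partitions: Dict[int, List[Row]] = {w: [] for w in range(num_workers)}
    for row in rows:
        # Same key -> same hash -> same worker, for every input partitioned this way.
        worker = hash(row[key_index]) % num_workers
        partitions[worker].append(row)
    return partitions

# Hypothetical inputs joined on their first column.
users = [(1, "alice"), (2, "bob"), (3, "carol")]
groups = [(1, "admins"), (3, "devs")]
user_parts = partition_by_key(users, 0, num_workers=2)
group_parts = partition_by_key(groups, 0, num_workers=2)
```

Shuffling is the cost of this scheme: every row whose worker differs from the one that produced it has to be sent through the mailbox system.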
The hash join operator is a blocking operator. It needs to consume all the input data (from both inputs) before emitting the result.
Even using partitioning, the amount of data that needs to be stored in memory can be high, so the engine tries to protect itself from running out of memory by limiting the number of groups that can be created by the join keys.
This limit can be defined using the max_rows_in_join hint, and the join_overflow_mode hint controls the behavior of the engine when the number of groups exceeds it. By default, the limit is slightly above 1 million groups (1048576) and the join overflow mode is THROW, which means that the query will fail if the number of groups exceeds the limit.
Type: String
Default: THROW
The join_overflow_mode hint defines the behavior of the engine when the number of groups exceeds the limit defined by the max_rows_in_join hint. The possible values are:
THROW: The query will fail if the number of groups exceeds the limit.
BREAK: The engine will stop processing the join and return the results that have been computed so far. In this case the stat maxRowsInJoinReached will be true.
Type: Integer
Default: 1048576
The max_rows_in_join hint defines the maximum number of groups that can be created by the join keys. What happens when this limit is reached is defined by the join_overflow_mode hint.
Take care when increasing this limit. If the number of groups is too high, the amount of memory used by the engine can be very high, which can lead to very large GC pauses and even out of memory errors.
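The following sketch illustrates the difference between the THROW and BREAK overflow modes while the hash table is being built. It is a simplification under stated assumptions: it counts every row added to the hash table against max_rows, and the names build_hash_table and JoinOverflowError are hypothetical. The exact point where Pinot checks the limit, and what it counts, may differ.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple

Row = Tuple[Any, ...]

class JoinOverflowError(Exception):
    """Hypothetical error raised when the limit is exceeded in THROW mode."""

def build_hash_table(right_rows: Iterable[Row], key: Callable[[Row], Any],
                     max_rows: int = 1_048_576,
                     overflow_mode: str = "THROW") -> Tuple[Dict[Any, List[Row]], bool]:
    """Build the join hash table, returning it and a 'limit reached' flag."""
    table: Dict[Any, List[Row]] = {}
    count = 0
    for row in right_rows:
        if count >= max_rows:
            if overflow_mode == "THROW":
                # THROW: the whole query fails.
                raise JoinOverflowError(f"join exceeded {max_rows} rows")
            # BREAK: stop consuming and keep the partial table; the flag plays
            # the role of the maxRowsInJoinReached stat.
            return table, True
        table.setdefault(key(row), []).append(row)
        count += 1
    return table, False
```

With BREAK the query succeeds but may silently miss matches, which is why the limit-reached flag is worth checking.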
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator. Joins can emit more rows than the input relations, so this value can be higher than the number of rows in the input. Remember that the number of groups is limited by the max_rows_in_join hint and a large number of groups can lead to high memory usage and long GC pauses, which can affect the performance of the whole system.
Type: Boolean
This stat is set to true when the number of groups exceeds the limit defined by the max_rows_in_join hint.
Notice that by default the engine throws an exception when this happens, in which case no stat is emitted. Therefore this stat is only emitted when the join_overflow_mode hint is set to BREAK.
Type: Long
The time spent building the hash table used to probe the join keys, in milliseconds.
A large number here can indicate that the right relation is too large or is taking too long to be processed.
The hash join operator is represented in the explain plan as a LogicalJoin explain node.
Type: Expression
The condition that is being applied to the rows to join the relations. The expression may use indexed columns ($0, $1, etc), functions and literals. The indexed columns are always 0-based.
For example, a LogicalJoin node with the condition =($0, $1) is saying that the join condition is that the column with index 0 in the left relation is equal to the column with index 1 in the right relation. If the rest of the explain plan shows that the inputs of the join project the userUUID column of the userAttributes table and the userUUID column of the userGroups table, we can infer that the column with index 0 is the userUUID column in the userAttributes table and the column with index 1 is the userUUID column in the userGroups table.
Type: String
The type of join that is being performed. The possible values are: inner, left, right, full, semi and anti, as explained in Implementation details.
Apache Pinot does not use table stats to determine the best order to consume the input relations. Instead, it assumes that the right input relation is the smaller one. That relation is always fully consumed to build a hash table and is sometimes broadcast to all workers. This means that it is important to specify the smaller relation as the right input.
Remember that left and right are relative to the order of the tables in the SQL query. It is less expensive to do a join between a large table and a small table than the other way around.
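For example, a query that joins a large table (on the left side of the join) with a small table (on the right side) is more efficient than the same query with the two tables swapped.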
Describes the transform relation operator in the multi-stage query engine.
The transform operator is used to apply a transformation to the input data. It may filter out columns or add new ones by applying functions to the existing columns. This operator is generated by the multi-stage query engine when you use a SELECT clause in a query, but can also be used to implement other transformations.
Transform operators apply some transformation functions to the input data received from upstream. The cost of the transformation usually depends on the complexity of the functions applied, but compared to other operators, it is usually not very high.
The transform operator is a streaming operator. It emits the blocks of rows as soon as they are received from the upstream operator.
This operator does not accept any hints.
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator.
The transform operator is represented in the explain plan as a LogicalProject explain node.
This explain node has a list of attributes that represent the transformations applied to the input data. Each attribute has a name and a value, which is the expression used to generate the column.
For example, a LogicalProject node with the attributes userUUID=[$6], deviceOS=[$4] and EXPR$2=[SUBSTRING($4, 0, 2)] is saying that the output of the operator has three columns:
userUUID is the 7th column ($6) in the virtual row projected by LogicalTableScan, which corresponds to the userUUID column in the table.
deviceOS is the 5th column ($4) in the virtual row projected by LogicalTableScan, which corresponds to the deviceOS column in the table.
EXPR$2 is the result of the SUBSTRING($4, 0, 2) expression applied to the 5th column in the virtual row projected by LogicalTableScan. Given we know that the 5th column is deviceOS, we can infer that EXPR$2 is the first two characters of the deviceOS column.
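To make the index notation concrete, the following sketch evaluates the example projection over a single scanned row. Rows are modeled as Python tuples and each output column as a function of the input row. The sample row values are made up, and SUBSTRING($4, 0, 2) is modeled as Python slicing that returns the first two characters, following the interpretation given above rather than the exact semantics of any SQL function.

```python
from typing import Any, Callable, Dict, Tuple

Row = Tuple[Any, ...]

# Output column name -> expression over the input (virtual) row.
projection: Dict[str, Callable[[Row], Any]] = {
    "userUUID": lambda r: r[6],       # $6: 7th column of the scanned row
    "deviceOS": lambda r: r[4],       # $4: 5th column of the scanned row
    "EXPR$2": lambda r: r[4][0:2],    # SUBSTRING($4, 0, 2), read as "first two characters"
}

def project(row: Row, exprs: Dict[str, Callable[[Row], Any]]) -> Row:
    """Apply every projection expression to the row, in declaration order."""
    return tuple(expr(row) for expr in exprs.values())

# Hypothetical scanned row: only positions 4 and 6 matter for this projection.
scanned = ("c0", "c1", "c2", "c3", "windows", "c5", "uuid-1")
```

Each output row depends only on the current input row, which is why the operator can stream blocks as soon as they arrive.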
None
Describes the minus relation operator in the multi-stage query engine.
The minus operator is used to subtract the result of one query from another query. This operator is used to find the difference between two sets of rows, usually by using the SQL EXCEPT operator.
Although it is accepted by the parser, the ALL modifier is currently ignored. Therefore EXCEPT and EXCEPT ALL are equivalent. This issue has been reported in #13127.
The minus operator is a semi-blocking operator that first consumes the right input relation in a blocking fashion and then consumes the left input relation in a streaming fashion.
The current implementation consumes the whole right input relation first and stores the rows in a set. Then it consumes the left input relation one block at a time. Each time a block of rows is read from the left input relation, the operator checks if the rows are in the set of rows from the right input relation. All unique rows that are not in the set are added to a new partial result block. Once the whole left input block is analyzed, the operator emits the partial result block.
This process is repeated until all rows from the left input relation are processed.
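The algorithm described above can be sketched as follows. This is an illustrative model, not Pinot's code: rows are hashable tuples, the left input is a list of blocks, and the generator name minus_blocks is hypothetical. The right input is consumed completely into a set before the first left block is read, and a second set of already emitted rows keeps the output distinct, which is why memory grows with the unique rows of both inputs.

```python
from typing import Any, Iterable, Iterator, List, Set, Tuple

Row = Tuple[Any, ...]

def minus_blocks(left_blocks: Iterable[List[Row]],
                 right_rows: Iterable[Row]) -> Iterator[List[Row]]:
    # Blocking part: the whole right input is consumed before any output.
    right_set: Set[Row] = set(right_rows)
    emitted: Set[Row] = set()  # rows already returned, so the result stays distinct
    # Streaming part: each left block produces one partial result block.
    for block in left_blocks:
        partial: List[Row] = []
        for row in block:
            if row not in right_set and row not in emitted:
                emitted.add(row)
                partial.append(row)
        yield partial
```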
This operator does not accept any hints.
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator.
The minus operator is represented in the explain plan as a LogicalMinus explain node.
Type: Boolean
This attribute is used to indicate if the operator should return all the rows or only the distinct rows.
Although it is accepted in SQL, the all attribute is not currently used in the minus operator. The returned rows are always distinct. This issue has been reported in #13127.
The minus operator ends up having to store all unique rows from both input relations in memory. This can lead to memory pressure if the input relations are large and have a high number of unique rows.
Although the minus operator ends up adding all unique rows from both input relations to a set, the order of input relations matters. While the right input relation is consumed in a blocking fashion, the left input relation is consumed in a streaming fashion. Therefore the latency of the whole query can be improved if the left input relation produces values in a streaming fashion.
In case one of the inputs is blocking and the other is not, it is recommended to use the blocking relation as the right input relation.
Describes the window relational operator in the multi-stage query engine.
The window operator is used to define a window over which to perform calculations.
This page describes the window operator defined in the relational algebra used by multi-stage queries. This operator is generated by the multi-stage query engine when you use window functions in a query. You can read more about window functions in the window functions reference documentation.
Unlike the aggregate operator, which will output one row per group, the window operator will output as many rows as input rows.
Window operators take a single input relation and apply window functions to it. For each input row, a window of rows is calculated and one or many aggregations are applied to it.
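The difference in output cardinality between the aggregate and window operators can be seen in a small sketch of SUM(value) grouped by a key versus SUM(value) OVER (PARTITION BY key) with no ORDER BY, so the window frame is the whole partition. The function names are hypothetical and the sketch ignores ordering, frames and NULLs; it only shows that the window version emits one row per input row and must see the whole input first.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, Iterable, List, Tuple

Row = Tuple[Any, ...]

def aggregate_sum(rows: Iterable[Row], key: Callable[[Row], Any],
                  value: Callable[[Row], float]) -> Dict[Any, float]:
    """GROUP BY-style aggregation: one result per group."""
    totals: DefaultDict[Any, float] = defaultdict(float)
    for r in rows:
        totals[key(r)] += value(r)
    return dict(totals)

def window_sum(rows: Iterable[Row], key: Callable[[Row], Any],
               value: Callable[[Row], float]) -> List[Row]:
    """SUM(...) OVER (PARTITION BY ...): one result per input row."""
    materialized = list(rows)  # blocking: the whole input is needed first
    totals = aggregate_sum(materialized, key, value)
    return [r + (totals[key(r)],) for r in materialized]
```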
In general, window operators are expensive in terms of CPU and memory usage, but they open the door to a wide range of analytical queries.
The window operator is a blocking operator. It needs to consume all the input data before emitting the result.
Window hints are configured with the windowOptions hint, which accepts as argument a map of options and values.
Type: Integer
Default: 1048576
The maximum number of rows that can be cached in the window for further processing.
Type: THROW or BREAK
Default: 'THROW'
The behavior of the engine when the window overflows. Supported values:
THROW: Stop building the window cache and throw an exception. No further window operation is performed.
BREAK: Stop building the window cache but continue to perform the window operation. The results might be partial.
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1. This number is affected by the number of received rows and the complexity of the window function.
Type: Long
The number of rows emitted by the operator. A large number of emitted rows can indicate that the query is not well optimized.
Unlike the aggregate operator, which will output one row per group, the window operator will output as many rows as input rows.
Type: Boolean
This attribute is set to true if the maximum number of rows in the window has been reached.
The window operator is represented in the explain plan as a LogicalWindow explain node.
Type: Expression
The window expressions used by the operator. There may be more than one of these attributes depending on the number of window functions used in the query, although sometimes multiple window function clauses in SQL can be combined into a single window operator.
The expression may use indexed columns ($0, $1, etc) that represent the columns of the virtual row generated by the upstream.
Describes the union relation operator in the multi-stage query engine.
The union operator combines the results of two or more queries into a single result set. The result set contains all the rows from the queries. Contrary to other set operations (intersect and minus), the union operator does not remove duplicates from the result set. Therefore its semantics are those of the SQL UNION ALL operator; the SQL UNION operator is implemented by adding an extra aggregate that removes the duplicates.
There is no guarantee on the order of the rows in the result set.
While the EXCEPT and INTERSECT SQL clauses do not support the ALL modifier, the UNION clause does.
The current implementation consumes input relations one by one. It first returns all rows from the first input relation, then all rows from the second input relation, and so on.
The union operator is a streaming operator that consumes the input relations one by one. The current implementation fully consumes the inputs in order. See the tip on why the order of input relations matters for more details.
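A generator-based sketch of this behavior, under the assumption that each input is an iterator of row blocks (the names union_all and source are hypothetical). The log records when each input produces a block, showing that the second input does not produce anything until the first one is exhausted, which is why a slow first input delays the whole union.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def union_all(*inputs: Iterable[T]) -> Iterator[T]:
    """Stream every block of each input, fully consuming the inputs in order."""
    for inp in inputs:
        yield from inp

def source(name: str, blocks: List[T], log: List[str]) -> Iterator[T]:
    """Hypothetical upstream that records when it produces each block."""
    for block in blocks:
        log.append(f"{name} produced {block}")
        yield block
```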
This operator does not accept any hints.
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator.
The union operator is represented in the explain plan as a LogicalUnion explain node.
Type: Boolean
Whether the union operator should remove duplicates from the result set.
Although Pinot supports the SQL UNION and UNION ALL clauses, the union operator only supports the UNION ALL semantics. In order to implement the UNION semantics, the multi-stage query engine adds an extra aggregate that removes duplicate rows.
For example, the plan of a UNION ALL query contains a LogicalUnion node with all=[true], while the plan of the equivalent UNION query is a bit more complex.
Notice that in the UNION plan the LogicalUnion is still using all=[true], but a LogicalAggregate is added to remove the duplicates. This also means that while the union operator is always streaming, the UNION clause results in a blocking plan (given the aggregate operator is blocking).
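The UNION plan can be modeled as a streaming UNION ALL followed by a distinct step that must see every row. In this hypothetical sketch, itertools.chain plays the role of the LogicalUnion with all=[true] and an insertion-ordered dict plays the role of the LogicalAggregate. The result is only available once all inputs have been consumed; the output order preserved here is a property of the sketch, not a guarantee of the engine.

```python
from itertools import chain
from typing import Any, Iterable, List, Tuple

Row = Tuple[Any, ...]

def union_distinct(*inputs: Iterable[Row]) -> List[Row]:
    # UNION ALL step: lazily concatenates the inputs in order.
    all_rows = chain(*inputs)
    # Aggregate step: keeps one copy of each row and needs every row before returning.
    return list(dict.fromkeys(all_rows))
```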
The current implementation of the union operator consumes the input relations one by one starting from the first one. This means that the second input relation is not consumed until the first one is fully consumed, and so on. Therefore it is recommended to put the fastest input relation first to reduce the overall latency.
A good way to choose the order of the input relations is usually to try different orders and keep the one that minimizes the downstreamWaitMs stat of all the inputs.
Describes the sort or limit relation operator in the multi-stage query engine.
The sort or limit operator is used to sort the input data, limit the number of rows emitted by the operator or both. This operator is generated by the multi-stage query engine when you use an order by, limit or offset operation in a query.
Type: Long
The summation of time spent by all threads executing the operator. This means that the wall time spent in the operation may be smaller than this value if the parallelism is larger than 1.
Type: Long
The number of rows emitted by the operator.
The sort or limit operator is represented in the explain plan as a LogicalSort explain node.
Type: Expression
The sort expressions used by the operator. There is one of these attributes per sort expression. The first one is sort0, the second one is sort1, and so on.
The value of this attribute is the expression used to sort the data and may contain indexed columns ($0, $1, etc) that represent the columns of the virtual row generated by the upstream.
For example, a LogicalSort node with sort0=[$0] and sort1=[$2] is saying that the rows are sorted first by the column with index 0 and then by the column with index 2 in the virtual row generated by the upstream. Here that row is generated by a projection whose first column (index 0) is userUUID, the second (index 1) is deviceOS and the third (index 2) is the result of the SUBSTRING($4, 0, 2) expression. As we know $4 in this projection is deviceOS, we can infer that the third column is the first two characters of the deviceOS column.
Type: ASC or DESC
The direction of the sort. There is one of these attributes per sort expression.
Type: Long
The number of rows to emit. This is the equivalent of LIMIT in SQL. Remember that the limit can be applied without sorting, in which case the order in which the rows are emitted is undefined.
Type: Long
The number of rows to skip before emitting the rows. This is the equivalent of OFFSET in SQL.
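The combined effect of the sort, limit and offset attributes can be sketched as follows. The sketch assumes tuple rows, models each sort expression as a column index paired with ASC or DESC, and ignores NULL ordering; the function name sort_limit and its parameters (including fetch for the limit) are hypothetical. It relies on Python's stable sort, applying the keys from last to first so that earlier keys take precedence.

```python
from operator import itemgetter
from typing import Any, List, Optional, Sequence, Tuple

Row = Tuple[Any, ...]

def sort_limit(rows: Sequence[Row], sort_keys: Sequence[Tuple[int, str]],
               offset: int = 0, fetch: Optional[int] = None) -> List[Row]:
    result = list(rows)
    # Stable sorts applied from the last key to the first give a multi-key sort.
    for index, direction in reversed(sort_keys):
        result.sort(key=itemgetter(index), reverse=(direction == "DESC"))
    # OFFSET skips rows; the limit (fetch) caps how many rows are returned.
    end = None if fetch is None else offset + fetch
    return result[offset:end]
```

With an empty sort_keys list the sketch keeps the input order, whereas in SQL a limit without sorting returns rows in an undefined order.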
In SQL, limit and offset are usually used in the last stage of the query. But when they are used in the middle of the query (like in a subquery or a CTE), they can prevent the filter pushdown optimization.
For example, imagine a query that reads the userAttributes table and filters it with deviceOS = 'windows'. In the plan generated for such a query, the filter deviceOS = 'windows' is pushed down to the leaf stage. This reduces the amount of data that needs to be scanned and can improve the query performance, especially if there is an inverted index on the deviceOS column.
But if we modify the query to add a limit to the userAttributes table scan (for example, by reading the table through a subquery with a limit and applying the filter outside of it), the filter deviceOS = 'windows' is no longer pushed down to the leaf stage, which means that the engine will need to scan all the data in the userAttributes table and then apply the filter.
The reason why the filter is not pushed down is that the limit operation must be applied before the filter in order to not break the semantics, which in this case are saying that we want 10 rows of the userAttributes table without considering their deviceOS value.
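A tiny model makes the semantic difference explicit. The data is made up (every third row is a windows device), and the limit is modeled as taking the first rows in input order, whereas in SQL a limit without sorting may return any rows. Applying the filter after the limit, as the query requires, returns fewer rows than pushing the filter below the limit would, so the engine cannot push it down.

```python
from typing import Callable, List, Tuple

Row = Tuple[int, str]

def limit(rows: List[Row], n: int) -> List[Row]:
    return rows[:n]

def where(rows: List[Row], predicate: Callable[[Row], bool]) -> List[Row]:
    return [r for r in rows if predicate(r)]

def is_windows(row: Row) -> bool:
    return row[1] == "windows"

# Hypothetical userAttributes rows: (id, deviceOS)
user_attributes = [(i, "windows" if i % 3 == 0 else "linux") for i in range(30)]

# Query semantics: take 10 rows, then filter them.
limit_then_filter = where(limit(user_attributes, 10), is_windows)
# What pushing the filter below the limit would compute instead.
filter_then_limit = limit(where(user_attributes, is_windows), 10)
```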
In cases where you actually want to apply the filter before the limit, you can specify the where clause in the subquery. In that case the filter is pushed down to the leaf stage, which reduces the amount of data that needs to be scanned.
Although OFFSET and LIMIT are a very simple way to paginate results, they can be very inefficient. It is almost always better to paginate using a WHERE clause that uses a range of values instead of using OFFSET.
The reason is that in order to apply an OFFSET the engine must generate these rows and then discard them. Instead, if you use a WHERE clause with a range of values, the engine can apply different techniques like indexes or pruning to avoid reading the rows that are not needed.
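The cost difference can be illustrated with a sketch over a sorted list of ids, assuming the id column is sorted (or indexed) so that a range predicate such as WHERE id > last_seen_id can seek directly to the first needed row; here that seek is modeled with bisect. The function names are hypothetical, and counting touched rows is only a rough proxy for the work an engine does.

```python
import bisect
from typing import List, Tuple

def page_with_offset(sorted_ids: List[int], offset: int,
                     size: int) -> Tuple[List[int], int]:
    # OFFSET: the first offset + size rows are produced, then offset are discarded.
    produced = sorted_ids[: offset + size]
    return produced[offset:], len(produced)

def page_with_range(sorted_ids: List[int], after_id: int,
                    size: int) -> Tuple[List[int], int]:
    # WHERE id > after_id: seek to the first matching row, then read one page.
    start = bisect.bisect_right(sorted_ids, after_id)
    page = sorted_ids[start:start + size]
    return page, len(page)
```

Both functions return the same page, but the work done by the OFFSET version grows with the page number while the range version only reads one page.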
This is not a Pinot specific issue, but a general one. See for example Paging Through Results (external link) or Pagination, You Are Probably Doing It Wrong (external link).