Physical Optimizer
Describes the new Multistage Engine Physical Query Optimizer
MSE Physical Optimizer is included in Pinot 1.4 and is currently in Beta.
We have added a new query optimizer in the Multistage Engine that computes and tracks precise Data Distribution across the entire plan before running some critical optimizations like Sort Pushdown, Aggregate Split/Pushdown, etc.
One of the biggest features of this Optimizer is that it can eliminate Shuffles or simplify Exchanges, when applicable, for arbitrarily complex queries, without requiring any Query Hints.
To enable this Optimizer for your MSE query, you can use the following Query Options:
SET useMultistageEngine=true;
SET usePhysicalOptimizer=true;
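As a minimal sketch, a client could prepend these two options to any query text before submitting it. The helper name `with_physical_optimizer` is hypothetical and not part of Pinot; only the two SET statements come from the options listed above.

```python
# Hypothetical helper (not a Pinot API): prepends the two query options
# shown above to an arbitrary SQL string.
PHYSICAL_OPTIMIZER_OPTIONS = (
    "SET useMultistageEngine=true;",
    "SET usePhysicalOptimizer=true;",
)


def with_physical_optimizer(sql: str) -> str:
    """Return the query text with the Physical Optimizer options prepended."""
    return "\n".join((*PHYSICAL_OPTIMIZER_OPTIONS, sql.strip()))


query = with_physical_optimizer("SELECT COUNT(*) FROM userFactEvents")
print(query)
```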
Key Features
The examples below are based on the COLOCATED_JOIN Quickstart.
Automatic Colocated Joins and Shuffle Simplification
Consider the query below which consists of 3 Joins. With the new query optimizer, the entire query runs without any cross-server data exchange, since the data is partitioned by userUUID into a compatible number of partitions (see the "Setting Up Table Data Distribution" section below).
SET useMultistageEngine = true;
SET usePhysicalOptimizer = true;
WITH filtered_users AS (
  SELECT
    userUUID
  FROM userAttributes
  WHERE userUUID NOT IN (
    SELECT
      userUUID
    FROM userGroups
    WHERE groupUUID = 'group-1'
  )
  AND userUUID IN (
    SELECT
      userUUID
    FROM userGroups
    WHERE groupUUID = 'group-2'
  )
)
SELECT
  userUUID,
  SUM(tripAmount)
FROM userFactEvents
WHERE
  userUUID IN (
    SELECT userUUID FROM filtered_users
  )
GROUP BY userUUID
The query plan for this query is shown below. You can see that every Exchange below the root SINGLETON_EXCHANGE is an IDENTITY_EXCHANGE, which is a 1:1 Exchange as defined in Exchange Types below.
PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])
  PhysicalAggregate(group=[{1}], agg#0=[$SUM0($0)], aggType=[DIRECT])
    PhysicalJoin(condition=[=($1, $2)], joinType=[semi])
      PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])
        PhysicalProject(tripAmount=[$7], userUUID=[$10])
          PhysicalTableScan(table=[[default, userFactEvents]])
      PhysicalJoin(condition=[=($0, $1)], joinType=[semi])
        PhysicalProject(userUUID=[$0])
          PhysicalFilter(condition=[IS NOT TRUE($3)])
            PhysicalJoin(condition=[=($1, $2)], joinType=[left])
              PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])
                PhysicalProject(userUUID=[$6], userUUID0=[$6])
                  PhysicalTableScan(table=[[default, userAttributes]])
              PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])
                PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[DIRECT])
                  PhysicalProject(userUUID=[$4], $f1=[true])
                    PhysicalFilter(condition=[=($3, _UTF-8'group-1')])
                      PhysicalTableScan(table=[[default, userGroups]])
        PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])
          PhysicalProject(userUUID=[$4])
            PhysicalFilter(condition=[=($3, _UTF-8'group-2')])
              PhysicalTableScan(table=[[default, userGroups]])
Shuffle Simplification with Different Servers / Partition Count
The new optimizer can simplify shuffles even if:
The Servers used by either side of a Join are different
The Partition Counts of the join inputs are different
In the example below, we have a Join performed across two tables: Orange (left) and Green (right).
The Orange table has 4 partitions and the Green table has 2 partitions. The servers selected for the Orange and Green tables are [S0, S1] and [S0, S2] respectively. The Join is performed on the servers [S0, S1], because the Physical Optimizer by default uses the same Workers as the leftmost input operator.
If the hash function used for partitioning the two tables is the same, we can leverage an Identity Exchange and skip re-partitioning the data on either side of the join. This is because S0 will consist of records from partitions 0 and 2 of the Orange table, which together contain all records that would make up partition 0 modulo 2, i.e. hash(key) mod 4 ∈ {0, 2} exactly when hash(key) mod 2 = 0.
Note that Identity Exchange does not imply that the servers in the sender and receiver will be the same. It only implies that there will be a 1:1 mapping from senders to receivers. In this example, the data transfer from S2 to S1 will be over the network.
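The modulo argument can be checked with a short sketch. The partition-to-server layout below is an assumption chosen to match the example (Orange partitions 0 and 2 on S0, 1 and 3 on S1; Green partition 0 on S0, 1 on S2), and the integer loop variable stands in for the output of the shared hash function.

```python
# Illustrative layout only: these assignments are assumptions that match the
# example (Orange: 4 partitions on S0/S1, Green: 2 partitions on S0/S2).
ORANGE_PARTITIONS = 4
GREEN_PARTITIONS = 2
orange_server = {0: "S0", 1: "S1", 2: "S0", 3: "S1"}  # partition -> server
green_server = {0: "S0", 1: "S2"}                      # partition -> server

# 1:1 sender -> receiver mapping used by the Identity Exchange on the Green side.
identity_mapping = {"S0": "S0", "S2": "S1"}


def join_is_colocated(hash_value: int) -> bool:
    """True if both join inputs deliver this key to the same join worker."""
    orange_worker = orange_server[hash_value % ORANGE_PARTITIONS]
    green_sender = green_server[hash_value % GREEN_PARTITIONS]
    return identity_mapping[green_sender] == orange_worker


# Holds for every hash value, provided both tables use the same hash function.
all_colocated = all(join_is_colocated(h) for h in range(10_000))
print(all_colocated)  # True
```

The S2 to S1 entry in `identity_mapping` is the one transfer that crosses the network; S0 to S0 stays local.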

Automatically Skip Aggregate Exchange
To evaluate something like GROUP BY userUUID accurately, you would need to distribute records based on the userUUID column. The old query optimizer would add a Partitioning Exchange under each Aggregate, unless one used the query hint is_partitioned_by_group_by_keys.
The Physical Optimizer can detect when data is already partitioned by the required column, and will automatically skip adding an Exchange. This has two advantages:
We avoid unnecessary Data Exchanges
We avoid splitting the Aggregate, since by default when an Aggregate exists on top of an Exchange, a copy of the Aggregate is added under the Exchange (unless the is_skip_leaf_stage_group_by query hint is set)
This optimization can be seen in action in the query example shared above. Since data is already partitioned by userUUID, all aggregations are run in DIRECT mode, i.e. without splitting the aggregate into multiple aggregates.
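The decision can be sketched as a subset check: if the columns the data is partitioned by are all among the GROUP BY keys, every group already lives in a single partition. This is a simplified illustration, not the optimizer's actual code; the function names and the "SPLIT" label are hypothetical, and the real optimizer also has to account for details such as hash functions and partition counts.

```python
def can_skip_exchange(partition_keys: set, group_by_keys: set) -> bool:
    """Rows that agree on all GROUP BY keys also agree on the partition keys
    when the partition keys are a non-empty subset of the GROUP BY keys, so
    no group is spread across partitions."""
    return bool(partition_keys) and partition_keys <= group_by_keys


def aggregate_mode(partition_keys: set, group_by_keys: set) -> str:
    # DIRECT: a single aggregate with no Exchange underneath.
    # "SPLIT" is a hypothetical label for the aggregate pair placed
    # around a Partitioning Exchange.
    return "DIRECT" if can_skip_exchange(partition_keys, group_by_keys) else "SPLIT"


print(aggregate_mode({"userUUID"}, {"userUUID"}))     # DIRECT
print(aggregate_mode({"userUUID"}, {"productCode"}))  # SPLIT
```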
Segment / Server Pruning
Similar to the V1 Engine, if you have enabled segmentPrunerTypes in your table's Routing config, the Physical Optimizer will prune segments and servers using time, partition or other pruner types for the Leaf Stage. For example, the following query will only select segments which satisfy the following constraint:
segmentPartition = Murmur("user-1") % numPartitions
SET useMultistageEngine = true;
SET usePhysicalOptimizer = true;
WITH user_events AS (
  SELECT
    productCode, tripAmount
  FROM
    userFactEvents
  WHERE
    userUUID = 'user-1'
  ORDER BY
    ts DESC
  LIMIT 100
)
SELECT
  productCode,
  SUM(tripAmount)
FROM
  user_events
GROUP BY productCode
If partitioning is done in a way that segments corresponding to a given partition are present on only 1 server, then the entire query above will run within a single server, simulating shard-local execution from other systems.
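A rough sketch of partition-based pruning under the constraint above follows. `zlib.crc32` is only a stand-in for the table's configured partition function (Murmur in the constraint), and the segment names and partition ids are made up for illustration.

```python
import zlib


def partition_of(value: str, num_partitions: int) -> int:
    # zlib.crc32 is a stand-in hash; Pinot would apply the partition
    # function configured for the table (Murmur in the constraint above).
    return zlib.crc32(value.encode("utf-8")) % num_partitions


def prune_segments(segments: dict, value: str, num_partitions: int) -> list:
    """Keep only segments whose partition id matches the filter value's partition."""
    target = partition_of(value, num_partitions)
    return sorted(name for name, pid in segments.items() if pid == target)


# Hypothetical segment -> partition id metadata for a table with 4 partitions.
segments = {"seg_0": 0, "seg_1": 1, "seg_2": 2, "seg_3": 3, "seg_4": 0}
selected = prune_segments(segments, "user-1", 4)
print(selected)
```

Only the servers hosting the surviving segments need to take part in the Leaf Stage.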
Solve Constant Queries in Pinot Broker
Apache Calcite is capable of detecting Filter Expressions that will always evaluate to False. In such cases, the query plan may not have any Table Scans at all. Physical Optimizer solves such queries within the Broker itself, without involving any servers.
SET useMultistageEngine = true;
SET usePhysicalOptimizer = true;
SELECT
  COUNT(*)
FROM
  userFactEvents
WHERE
  userUUID = 'user-1' AND userUUID = 'user-2'
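To illustrate why this filter can never match, here is a toy check for contradictory equality predicates. It is not Calcite's simplification logic, only a sketch of the idea; the function name and the (column, literal) input shape are assumptions.

```python
def is_always_false(equalities: list) -> bool:
    """equalities: (column, literal) pairs that are ANDed together."""
    seen = {}
    for column, literal in equalities:
        # setdefault records the first literal seen for a column; a later,
        # different literal for the same column makes the conjunction unsatisfiable.
        if seen.setdefault(column, literal) != literal:
            return True
    return False


print(is_always_false([("userUUID", "user-1"), ("userUUID", "user-2")]))  # True
```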
Worker Assignment
At present, Worker Assignment follows these simple rules:
Leaf Stage will have workers assigned based on Table Scan and Filters, using the Routing configs set in the Table Config.
Other Stages will use the same workers as the left-most input stage.
Some Plan Nodes, such as Sort(fetch=..), may require data to be collected in a single Worker. In such a case, that stage will be run on a single Worker, which will be randomly selected from one of the input workers.
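The three rules above can be sketched as a small recursive function. The `Stage` class, its fields, and `assign_workers` are hypothetical names for illustration; they do not mirror Pinot's internal classes.

```python
import random
from dataclasses import dataclass, field


@dataclass
class Stage:
    name: str
    inputs: list = field(default_factory=list)          # child stages, left-most first
    routed_servers: list = field(default_factory=list)  # leaf stages only
    needs_single_worker: bool = False                    # e.g. contains Sort(fetch=..)


def assign_workers(stage: Stage, rng: random.Random) -> list:
    if not stage.inputs:
        # Rule 1: leaf stage workers come from routing.
        return list(stage.routed_servers)
    input_workers = [assign_workers(child, rng) for child in stage.inputs]
    if stage.needs_single_worker:
        # Rule 3: pick one worker at random from the input workers.
        candidates = sorted({w for workers in input_workers for w in workers})
        return [rng.choice(candidates)]
    # Rule 2: reuse the workers of the left-most input stage.
    return input_workers[0]


orange = Stage("orange_scan", routed_servers=["S0", "S1"])
green = Stage("green_scan", routed_servers=["S0", "S2"])
join = Stage("join", inputs=[orange, green])
top = Stage("sort_limit", inputs=[join], needs_single_worker=True)

rng = random.Random(0)
join_workers = assign_workers(join, rng)
top_workers = assign_workers(top, rng)
print(join_workers)  # ['S0', 'S1']
print(top_workers)
```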
Limitations
Some of the features of the existing MSE query optimizer are not yet available in the Physical Optimizer. We aim to add support for most of these in Pinot 1.5:
Spools
Dynamic filters for semi-join