An overview of the multi-stage query engine.
This document covers:
For more information on the multi-stage query engine (v2), see the following:
See how to enable and use the multi-stage query engine
Review limitations of the multi-stage query engine and troubleshooting tips
Query Window functions and JOINs
You must use the multi-stage query engine (v2) to query distributed joins, window functions, and other multi-stage operators in real-time. See how to enable and use the multi-stage query engine (v2).
The multi-stage query engine is built to run real-time facing, complex ANSI SQL. Highlights include joins and data correlations, particularly optimized for dynamic broadcast fact-dim joins, and partitioned-based or colocated table joins.
Although the multi-stage query engine can generally execute any complex ANSI SQL (ISO/IEC 9075). It is not designed to run any generic ANSI SQL in the most efficient way.
Some use cases to avoid:
Large-scale, long-running queries designed to access and transform entire datasets are not recommended, as the multistage engine is a pure in-memory system.
Complex correlation, join algorithms that touch many tables or have many non-trivial join conditions are not recommended.
Long-running, complex queries such as ETL-type (extract, transform, and load) use cases are not recommended.
The multi-stage query engine improves query performance over the single-stage scatter-gather query engine ( v1), effectively decoupling the data exchange layer and the query engine layer.
The intermediate compute stage includes a set of processing servers and a data exchange mechanism.
Processing servers in the intermediate compute stage can be assigned to any Pinot component. Multiple servers can process data in the intermediate stage, the goal being to offload the computation from the brokers. Each server in the intermediate stage executes the same processing logic, but against different sets of data.
The data exchange service coordinates the transfer of the different sets of data to and from the processing servers.
The multi-stage query engine also includes a new query plan optimizer to produce optimal process logic in each stage and minimize data shuffling overhead.
With a multi-stage query engine, Pinot first breaks down the single scatter-gather query plan used in v1 into multiple query sub-plans that run across different sets of servers. We call these sub-plans “stage plans,” and refer to each execution as a “stage.”
Consider the following JOIN query example, which illustrates the breakdown of a query into stages. This query joins a real-time orderStatus
table with an offline customer
table:
In the first stage, the query is processed as follows:
Real-time servers execute the filter query on the orderStatus
table:
Offline servers execute the filter query offline customer
table:
The data exchange service shuffles data shuffle, so all data with the same unique customer ID is sent to the same processing server for the next stage.
On each processing server, an inner JOIN is performed.
On each processing server, an inner JOIN is performed.
Each intermediary servers (shown in Figure 1: Multi-stage query execution model) performs a local join, and
runs the same join algorithm, but on different uids.
After the join algorithm completes, the results are sent back to the broker, and then sent to clients.