LogoLogo
release-1.1.0
release-1.1.0
  • Introduction
  • Basics
    • Concepts
    • Architecture
    • Components
      • Cluster
        • Tenant
        • Server
        • Controller
        • Broker
        • Minion
      • Table
        • Segment
          • Deep Store
        • Schema
      • Pinot Data Explorer
    • Getting Started
      • Running Pinot locally
      • Running Pinot in Docker
      • Quick Start Examples
      • Running in Kubernetes
      • Running on public clouds
        • Running on Azure
        • Running on GCP
        • Running on AWS
      • Create and update a table configuration
      • Batch import example
      • Stream ingestion example
      • HDFS as Deep Storage
      • Troubleshooting Pinot
      • Frequently Asked Questions (FAQs)
        • General
        • Pinot On Kubernetes FAQ
        • Ingestion FAQ
        • Query FAQ
        • Operations FAQ
    • Import Data
      • From Query Console
      • Batch Ingestion
        • Spark
        • Flink
        • Hadoop
        • Backfill Data
        • Dimension table
      • Stream ingestion
        • Apache Kafka
        • Amazon Kinesis
        • Apache Pulsar
      • Stream Ingestion with Upsert
      • Segment compaction on upserts
      • Stream Ingestion with Dedup
      • Stream Ingestion with CLP
      • File Systems
        • Amazon S3
        • Azure Data Lake Storage
        • HDFS
        • Google Cloud Storage
      • Input formats
        • Complex Type (Array, Map) Handling
        • Ingest records with dynamic schemas
      • Reload a table segment
      • Upload a table segment
    • Indexing
      • Bloom filter
      • Dictionary index
      • Forward index
      • Geospatial
      • Inverted index
      • JSON index
      • Native text index
      • Range index
      • Star-tree index
      • Text search support
      • Timestamp index
    • Releases
      • 1.1.0
      • Apache Pinot™ 1.0.0 release notes
      • 0.12.1
      • 0.12.0
      • 0.11.0
      • 0.10.0
      • 0.9.3
      • 0.9.2
      • 0.9.1
      • 0.9.0
      • 0.8.0
      • 0.7.1
      • 0.6.0
      • 0.5.0
      • 0.4.0
      • 0.3.0
      • 0.2.0
      • 0.1.0
    • Recipes
      • Connect to Streamlit
      • Connect to Dash
      • Visualize data with Redash
      • GitHub Events Stream
  • For Users
    • Query
      • Querying Pinot
      • Query Syntax
        • Aggregation Functions
        • Cardinality Estimation
        • Explain Plan (Single-Stage)
        • Explain Plan (Multi-Stage)
        • Filtering with IdSet
        • GapFill Function For Time-Series Dataset
        • Grouping Algorithm
        • JOINs
        • Lookup UDF Join
        • Querying JSON data
        • Transformation Functions
        • Window aggregate
      • Query Options
      • User-Defined Functions (UDFs)
    • APIs
      • Broker Query API
        • Query Response Format
      • Controller Admin API
      • Controller API Reference
    • External Clients
      • JDBC
      • Java
      • Python
      • Golang
    • Tutorials
      • Use OSS as Deep Storage for Pinot
      • Ingest Parquet Files from S3 Using Spark
      • Creating Pinot Segments
      • Use S3 as Deep Storage for Pinot
      • Use S3 and Pinot in Docker
      • Batch Data Ingestion In Practice
      • Schema Evolution
  • For Developers
    • Basics
      • Extending Pinot
        • Writing Custom Aggregation Function
        • Segment Fetchers
      • Contribution Guidelines
      • Code Setup
      • Code Modules and Organization
      • Update documentation
    • Advanced
      • Data Ingestion Overview
      • Ingestion Aggregations
      • Ingestion Transformations
      • Null value support
      • Use the multi-stage query engine (v2)
      • Troubleshoot issues with the multi-stage query engine (v2)
      • Advanced Pinot Setup
    • Plugins
      • Write Custom Plugins
        • Input Format Plugin
        • Filesystem Plugin
        • Batch Segment Fetcher Plugin
        • Stream Ingestion Plugin
    • Design Documents
      • Segment Writer API
  • For Operators
    • Deployment and Monitoring
      • Set up cluster
      • Server Startup Status Checkers
      • Set up table
      • Set up ingestion
      • Decoupling Controller from the Data Path
      • Segment Assignment
      • Instance Assignment
      • Rebalance
        • Rebalance Servers
        • Rebalance Brokers
      • Separating data storage by age
        • Using multiple tenants
        • Using multiple directories
      • Pinot managed Offline flows
      • Minion merge rollup task
      • Consistent Push and Rollback
      • Access Control
      • Monitoring
      • Tuning
        • Real-time
        • Routing
        • Query Routing using Adaptive Server Selection
        • Query Scheduling
      • Upgrading Pinot with confidence
      • Managing Logs
      • OOM Protection Using Automatic Query Killing
    • Command-Line Interface (CLI)
    • Configuration Recommendation Engine
    • Tutorials
      • Authentication
        • Basic auth access control
        • ZkBasicAuthAccessControl
      • Configuring TLS/SSL
      • Build Docker Images
      • Running Pinot in Production
      • Kubernetes Deployment
      • Amazon EKS (Kafka)
      • Amazon MSK (Kafka)
      • Monitor Pinot using Prometheus and Grafana
      • Performance Optimization Configurations
  • Configuration Reference
    • Cluster
    • Controller
    • Broker
    • Server
    • Table
    • Ingestion
    • Schema
    • Ingestion Job Spec
    • Monitoring Metrics
    • Functions
      • ABS
      • ADD
      • ago
      • EXPR_MIN / EXPR_MAX
      • arrayConcatDouble
      • arrayConcatFloat
      • arrayConcatInt
      • arrayConcatLong
      • arrayConcatString
      • arrayContainsInt
      • arrayContainsString
      • arrayDistinctInt
      • arrayDistinctString
      • arrayIndexOfInt
      • arrayIndexOfString
      • ARRAYLENGTH
      • arrayRemoveInt
      • arrayRemoveString
      • arrayReverseInt
      • arrayReverseString
      • arraySliceInt
      • arraySliceString
      • arraySortInt
      • arraySortString
      • arrayUnionInt
      • arrayUnionString
      • AVGMV
      • Base64
      • caseWhen
      • ceil
      • CHR
      • codepoint
      • concat
      • count
      • COUNTMV
      • COVAR_POP
      • COVAR_SAMP
      • day
      • dayOfWeek
      • dayOfYear
      • DISTINCT
      • DISTINCTAVG
      • DISTINCTAVGMV
      • DISTINCTCOUNT
      • DISTINCTCOUNTBITMAP
      • DISTINCTCOUNTHLLMV
      • DISTINCTCOUNTHLL
      • DISTINCTCOUNTBITMAPMV
      • DISTINCTCOUNTMV
      • DISTINCTCOUNTRAWHLL
      • DISTINCTCOUNTRAWHLLMV
      • DISTINCTCOUNTRAWTHETASKETCH
      • DISTINCTCOUNTTHETASKETCH
      • DISTINCTSUM
      • DISTINCTSUMMV
      • DIV
      • DATETIMECONVERT
      • DATETRUNC
      • exp
      • FIRSTWITHTIME
      • FLOOR
      • FrequentLongsSketch
      • FrequentStringsSketch
      • FromDateTime
      • FromEpoch
      • FromEpochBucket
      • FUNNELCOUNT
      • Histogram
      • hour
      • isSubnetOf
      • JSONFORMAT
      • JSONPATH
      • JSONPATHARRAY
      • JSONPATHARRAYDEFAULTEMPTY
      • JSONPATHDOUBLE
      • JSONPATHLONG
      • JSONPATHSTRING
      • jsonextractkey
      • jsonextractscalar
      • LASTWITHTIME
      • length
      • ln
      • lower
      • lpad
      • ltrim
      • max
      • MAXMV
      • MD5
      • millisecond
      • min
      • minmaxrange
      • MINMAXRANGEMV
      • MINMV
      • minute
      • MOD
      • mode
      • month
      • mult
      • now
      • percentile
      • percentileest
      • percentileestmv
      • percentilemv
      • percentiletdigest
      • percentiletdigestmv
      • percentilekll
      • percentilerawkll
      • percentilekllmv
      • percentilerawkllmv
      • quarter
      • regexpExtract
      • regexpReplace
      • remove
      • replace
      • reverse
      • round
      • ROW_NUMBER
      • rpad
      • rtrim
      • second
      • SEGMENTPARTITIONEDDISTINCTCOUNT
      • sha
      • sha256
      • sha512
      • sqrt
      • startswith
      • ST_AsBinary
      • ST_AsText
      • ST_Contains
      • ST_Distance
      • ST_GeogFromText
      • ST_GeogFromWKB
      • ST_GeometryType
      • ST_GeomFromText
      • ST_GeomFromWKB
      • STPOINT
      • ST_Polygon
      • strpos
      • ST_Union
      • SUB
      • substr
      • sum
      • summv
      • TIMECONVERT
      • timezoneHour
      • timezoneMinute
      • ToDateTime
      • ToEpoch
      • ToEpochBucket
      • ToEpochRounded
      • TOJSONMAPSTR
      • toGeometry
      • toSphericalGeography
      • trim
      • upper
      • Url
      • UTF8
      • VALUEIN
      • week
      • year
      • yearOfWeek
      • Extract
    • Plugin Reference
      • Stream Ingestion Connectors
      • VAR_POP
      • VAR_SAMP
      • STDDEV_POP
      • STDDEV_SAMP
  • Reference
    • Single-stage query engine (v1)
    • Multi-stage query engine (v2)
  • RESOURCES
    • Community
    • Team
    • Blogs
    • Presentations
    • Videos
  • Integrations
    • Tableau
    • Trino
    • ThirdEye
    • Superset
    • Presto
    • Spark-Pinot Connector
  • Contributing
    • Contribute Pinot documentation
    • Style guide
Powered by GitBook
On this page
  • What do you need to know about the multi-stage engine?
  • Why use the multi-stage query engine?
  • When not to use the multi-stage query engine?
  • Multi-stage query execution model
  • How queries are processed
  • Stage 1
  • Stage 2
  • Stage 3

Was this helpful?

Export as PDF
  1. Reference

Multi-stage query engine (v2)

An overview of the multi-stage query engine.

PreviousSingle-stage query engine (v1)NextCommunity

Was this helpful?

This document is an overview of the new multi-stage query engine.

What do you need to know about the multi-stage engine?

The multi-stage engine is the new query execution engine released in Pinot 1.0.0. Here are some useful links:

  • Get started

  • Find supported to query the multi-stage engine, including and features

  • See and for the multi-stage query engine

Why use the multi-stage query engine?

You must use the multi-stage query engine (v2) to query distributed joins, window functions, and other multi-stage operators in real-time.

When not to use the multi-stage query engine?

Although the multi-stage query engine can generally execute any complex ANSI SQL, it's not designed to run generic ANSI SQL in the most efficient way.

Some use cases to avoid:

  • Large-scale, long-running queries designed to access and transform entire datasets are not recommended, as the multistage engine is a pure in-memory system.

  • Complex correlation, join algorithms that touch many tables or have many non-trivial join conditions are not recommended.

  • Long-running, complex queries such as ETL-type (extract, transform, and load) use cases are not recommended.

Multi-stage query execution model

The intermediate compute stage includes a set of processing servers and a data exchange mechanism.

Processing servers in the intermediate compute stage can be assigned to any Pinot component. Multiple servers can process data in the intermediate stage; the goal being to offload the computation from the brokers. Each server in the intermediate stage executes the same processing logic, but against different sets of data.

The data exchange service coordinates the transfer of the different sets of data to and from the processing servers.

The multi-stage query engine also includes a new query plan optimizer to produce optimal process logic in each stage and minimize data shuffling overhead.

How queries are processed

Consider the following JOIN query example, which illustrates the breakdown of a query into stages. This query joins a real-time orderStatus table with an offline customer table.

SELECT 
  os.uid,               -- user ID
  os.rName,             -- restaurant Name
  c.ltv                 -- life-time value
FROM
  orderStatus AS os 
  INNER JOIN customer AS c
    ON os.uid = c.uid
WHERE
  os.ic < 10            -- item count
  AND c.lto > 5         –- life-time order count

Stage 1

In the first stage, the query is processed as follows:

  1. Real-time servers execute the filter query on the orderStatus table:

SELECT os.uid, os.rName FROM orderStatus AS os WHERE os.ic < 10
  1. Offline servers execute the filter query offline customer table:

SELECT c.uid c.ltv FROM customer AS c WHERE c.lto > 5
  1. The data exchange service shuffles data shuffle, so all data with the same unique customer ID is sent to the same processing server for the next stage.

  2. On each processing server, an inner JOIN is performed.

Stage 2

  1. On each processing server, an inner JOIN is performed.

  2. runs the same join algorithm, but on different uids.

Stage 3

  • After the join algorithm completes, the results are sent back to the broker, and then sent to clients.

The multi-stage query engine is built to run real-time, complex ANSI SQL (). Highlights include joins and data correlations, particularly optimized for dynamic broadcast fact-dim joins, and partitioned-based or colocated table joins.

The multi-stage query engine improves query performance over the , effectively decoupling the data exchange layer and the query engine layer.

With a multi-stage query engine, Pinot first breaks down the into multiple query sub-plans that run across different sets of servers. We call these sub-plans “stage plans,” and refer to each execution as a “stage.”

Each intermediary servers (shown in ) performs a local join, and

ISO/IEC 9075
single-stage scatter-gather query engine ( v1)
single scatter-gather query plan used in v1
Figure 1: Multi-stage query execution model
using multi-stage engine
query syntax
Window functions
JOIN
Apache Pinot 1.0 Multi-Stage Query Engine overview
Figure 1: Multi-stage query execution model
limitations
troubleshooting tips