Stage-Level Spooling

Also known as reusing common expressions

Stage-level spooling is still under development and may have some limitations.

It is not recommended to turn it on by default; instead, enable it on a per-query basis after verifying that it actually improves query performance.

Users are encouraged to report any issues they encounter.

Overview

In the multi-stage query engine, it is common for queries to inadvertently read from the same table or execute the same join multiple times. This can happen, for example, when using WITH expressions or complex joins. Such redundant operations can lead to significant performance overhead, especially when dealing with large datasets or expensive operations like joins and aggregations.

To address this issue, Apache Pinot now supports stage-level spooling, which identifies and eliminates redundant stages in the query execution plan. This optimization ensures that equivalent stages are executed only once, reducing unnecessary computation and improving query performance, particularly on stages involving repeated table scans, joins, or aggregations.

FAQs

How do I enable/disable this feature for specific queries?

Use SET useSpools = true; or SET useSpools = false; in your query.

What happens if two stages are not equivalent?

The query will run as usual, without this optimization.

How can I verify if stage-level spooling is working for my query?

Use the stage stats visualizer or EXPLAIN IMPLEMENTATION PLAN FOR to see the query execution plan. Look for the stage ID for each send operator. If stage-level spooling is applied, you should see the same stage ID for equivalent stages.
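
For instance, for a query like the one in the Example section below, a check could look roughly like this (assuming the SET statement is submitted together with the EXPLAIN, as with any other query option):

SET useSpools = true;
EXPLAIN IMPLEMENTATION PLAN FOR
SELECT *
FROM T1
JOIN T2 as t2first
    ON T1.col1 = t2first.col2
JOIN T2 as t2second
    ON t2first.col3 = t2second.col3

If spooling kicked in, both scans of T2 should show up under the same stage ID in the returned plan.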

Is this feature limited to WITH expressions?

No, it works for any query with equivalent stages, no matter how they are written.

If a WITH expression is used twice in a query, will stage-level spooling always be applied?

No, the feature is only applied if the stages are still equivalent after other optimizations run. See the Limitations section for more details.

Configuration

Stage-level spooling is disabled by default but can be enabled in the following ways:

  • Globally: Change pinot.broker.multistage.spools in the broker configuration file to set whether stage-level spooling is enabled by default for all queries (see the snippet after this list).

  • Per Query: Use the useSpools option in the query to enable or disable stage-level spooling for that query.
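
For example, enabling it globally would mean adding something along these lines to the broker configuration (the property name is the one documented above; the boolean value shown is the assumed format), while the per-query option is the SET useSpools statement shown in the example below:

pinot.broker.multistage.spools=true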

Example

See the following example to understand how stage-level spooling works:

SET useSpools = false; -- disable stage-level spooling
SELECT *
FROM T1
JOIN T2 as t2first
    ON T1.col1 = t2first.col2
JOIN T2 as t2second
    ON t2first.col3 = t2second.col3

This query will generate the following plan:
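
Sketched roughly (an illustration of the plan shape only, not actual EXPLAIN output), T2 is scanned in two independent stages:

join (t2first.col3 = t2second.col3)
  join (T1.col1 = t2first.col2)
    scan T1             <- its own stage
    scan T2 (t2first)   <- its own stage
  scan T2 (t2second)    <- another, separate stage scanning the same table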

When stage-level spooling is enabled (i.e., with SET useSpools = true), the plan will be optimized as follows:
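
Again sketched roughly, both joins now read from a single shared scan of T2, which is executed only once and carries the same stage ID for both consumers:

join (t2first.col3 = t2second.col3)
  join (T1.col1 = t2first.col2)
    scan T1
    scan T2             <- spooled stage, shared
  scan T2 (reused)      <- same stage ID as the scan above; not executed again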

Limitations

Equivalent stages

Stage-level spooling is automatically applied when the query planner detects equivalent stages. Users do not need to modify their queries to benefit from this optimization. However, understanding how it works can help in writing more efficient queries.

Two stages are considered equivalent if they:

  • Have the same operators.

    • That is, they project the same columns, apply the same filters, perform the same aggregations, and so on.

  • Have equivalent child stages.

  • Have different parents.

    • For example, two stages that are direct children of the same join or union are not equivalent.

These conditions are applied after most Pinot logical optimizations are done. This means that even if two stages are equivalent in the SQL statement, they may not be equivalent in the final plan. This is very common when using WITH expressions, which are just syntactic sugar and are expanded into the main query before optimizations are applied.

Two Pinot optimizations can easily prevent stage-level spooling: filter pushdown and projection pushdown.

Filter pushdown is an optimization that pushes the filter deeper into the execution plan. For example, imagine a query like:

WITH with1 AS (
  SELECT col1, col2, col3, count(*) 
  FROM table1 
  GROUP BY col1, col2, col3
)
SELECT * FROM with1 as t1
JOIN table2 as t2
ON t1.col2 = t2.col2
JOIN with1 as t3
ON t2.col3 = t3.col3
WHERE t1.col3 = 2

In that query, the filter t1.col3 = 2 is defined after both joins, but given that the predicate only depends on t1, Pinot will push the filter down and the plan will look like:

SELECT * FROM (
  SELECT col1, col2, 2, count(*) 
  FROM table1 
  WHERE col3 = 2
  GROUP BY col1, col2
) as t1
JOIN table2 as t2
ON t1.col2 = t2.col2
JOIN (
  SELECT col1, col2, col3, count(*)
  FROM table1
  GROUP BY col1, col2, col3
) as t3
ON t2.col3 = t3.col3

As you can see, in the query, both usages of the WITH expression have been expanded, and the WHERE filter has been pushed down into the first one. This makes both subqueries different, so stage-level spooling will not be applied.

The same thing happens with projection pushdown, an optimization that pushes the projection down into the execution plan. This means that if you use a WITH expression twice in the same query but each time select different columns, the stages will not be equivalent.
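
For example, a query along these lines (hypothetical table and columns) would not benefit from spooling:

WITH with1 AS (
  SELECT col1, col2, col3
  FROM table1
)
SELECT t1.col1, t3.col2 -- the first usage needs col1 and col3, the second col2 and col3
FROM with1 as t1
JOIN with1 as t3
ON t1.col3 = t3.col3

After projection pushdown, the scan behind t1 projects only col1 and col3 while the scan behind t3 projects only col2 and col3, so the two stages are no longer equivalent and each is executed separately.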

Pinot decides to give higher priority to these optimizations than to stage-level spooling for several reasons. The main one is that these optimizations are more common and have a bigger impact on query performance. If you find use cases where you think stage-level spooling should have higher priority, please report them as a GitHub issue.

Known issues

Stage-level spooling is a very extensive feature, and some scenarios are difficult to predict. This is why it is still considered under development and why it is not enabled by default. Users are encouraged to test it before enabling it for all queries and to open a GitHub issue if they run into problems.

Here is a list of known issues that were detected during the design and early development of this feature.

Blocks, timeouts and memory

In some situations, a spooled stage may have parents that take a while to consume the data. During that time, the spooled stage will need to buffer the data, which can lead to memory pressure, timeouts, or other errors.

Limited support on intermediate stages

Early adopters of stage-level spooling found issues when it was enabled for very large queries. Sometimes the intermediate stages were not spooled, leading to planning-time errors.

Version support

  • This feature is available in Apache Pinot version 1.3.0 and later.

  • The first version that can spool intermediate stages is 1.4.0.

  • The stage stats visualizer was introduced in Apache Pinot version 1.3.0:

    • In 1.3.0, each spool is shown as a different node with the same stage ID, so the visualization has the same shape as the JSON that stores the stats.

    • In 1.4.0, spooled stages are shown as a single node with edges to the stages that read from them.

References

  • GitHub Issue #14196: This is the GitHub issue that tracks the original design and work done to write this feature.

  • Design Document: a bit outdated, but useful to understand how the feature works internally.