Sort or limit

Describes the sort or limit relation operator in the multi-stage query engine.

The sort or limit operator sorts the input data, limits the number of rows it emits, or both. The multi-stage query engine generates this operator when a query uses an ORDER BY, LIMIT, or OFFSET clause.
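
For example, either of the following queries (using the userAttributes table that appears in the examples below) is executed with a sort or limit operator; the first sorts and limits, the second only limits:

select userUUID, deviceOS
from userAttributes
order by userUUID
limit 10

select userUUID, deviceOS
from userAttributes
limit 10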

Implementation details

Blocking nature

This operator is a blocking operator: it needs to consume all of its input before emitting any result.

Hints

None

Stats

executionTimeMs

Type: Long

The sum of the time spent by all threads executing the operator. The wall-clock time spent in the operation may therefore be smaller than this value when the parallelism is larger than 1: for example, if two threads each spend 100ms in the operator, executionTimeMs reports 200ms even though only about 100ms of wall-clock time elapsed.

emittedRows

Type: Long

The number of rows emitted by the operator.

Explain attributes

The sort or limit operator is represented in the explain plan as a LogicalSort explain node.
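
For example, you can surface this node by prefixing a query with EXPLAIN PLAN FOR (a sketch using the userAttributes table from the examples below):

explain plan for
select userUUID
from userAttributes
order by userUUID
limit 10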

sort#

Type: Expression

The sort expressions used by the operator. There is one of these attributes per sort expression. The first one is sort0, the second one is sort1, and so on.

The value of this attribute is the expression used to sort the data, and it may contain indexed columns ($0, $1, etc.) that represent the columns of the virtual row generated by the upstream stage.

For example, the following plan:

LogicalSort(sort0=[$0], sort1=[$2], dir0=[ASC], dir1=[ASC], fetch=[10])
  LogicalProject(userUUID=[$6], deviceOS=[$4], EXPR$2=[SUBSTRING($4, 0, 2)])
    LogicalTableScan(table=[[default, userAttributes]])

This plan says that the rows are sorted first by the column with index 0 and then by the column with index 2 of the virtual row generated by the upstream. That row is produced by a projection whose first column (index 0) is userUUID, whose second (index 1) is deviceOS, and whose third (index 2) is the result of the SUBSTRING($4, 0, 2) expression. Since $4 in this projection refers to deviceOS, we can infer that the third column contains the first two characters of the deviceOS column.
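
For reference, a query along these lines could produce a plan like the one above (a hypothetical reconstruction; the exact plan depends on the schema and the optimizer):

select userUUID, deviceOS, substring(deviceOS, 0, 2)
from userAttributes
order by userUUID, substring(deviceOS, 0, 2)
limit 10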

dir#

Type: ASC or DESC

The direction of the sort. There is one of these attributes per sort expression.

fetch

Type: Long

The number of rows to emit. This is the equivalent of LIMIT in SQL. Remember that a limit can be applied without sorting, in which case the order in which rows are emitted is undefined.

offset

Type: Long

The number of rows to skip before emitting rows. This is the equivalent of OFFSET in SQL.
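
For example, a query like the following (a sketch; the actual node also carries the sort attributes) would produce a LogicalSort with fetch=[10] and offset=[20]:

select userUUID
from userAttributes
order by userUUID
limit 10
offset 20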

Tips and tricks

Limit and offset can prevent filter pushdown

In SQL, LIMIT and OFFSET are usually applied in the last stage of a query. But when they are used in the middle of a query (for example, in a subquery or a CTE), they can prevent the filter pushdown optimization.

For example, imagine the following query:

select a.*
from userAttributes as a
join userGroups as g
on a.userUUID = g.userUUID
where a.deviceOS = 'windows'

This query may generate the plan:

LogicalProject(daysSinceFirstTrip=[$0], deviceOS=[$1], totalTrips=[$2], userUUID=[$3])
  LogicalJoin(condition=[=($3, $4)], joinType=[inner])
    PinotLogicalExchange(distribution=[hash[3]])
      LogicalProject(daysSinceFirstTrip=[$3], deviceOS=[$4], totalTrips=[$5], userUUID=[$6])
        LogicalFilter(condition=[=($4, _UTF-8'windows')])
          LogicalTableScan(table=[[default, userAttributes]])
    PinotLogicalExchange(distribution=[hash[0]])
      LogicalProject(userUUID=[$4])
        LogicalTableScan(table=[[default, userGroups]])

We can see that the filter deviceOS = 'windows' is pushed down to the leaf stage. This reduces the amount of data that needs to be scanned and can improve query performance, especially if there is an inverted index on the deviceOS column.

But if we modify the query to add a limit to the userAttributes table scan:

select a.*
from (select * from userAttributes limit 10) as a
join userGroups as g
on a.userUUID = g.userUUID
where a.deviceOS = 'windows'

The generated plan will be:

LogicalProject(daysSinceFirstTrip=[$0], deviceOS=[$1], totalTrips=[$2], userUUID=[$3])
  LogicalJoin(condition=[=($3, $4)], joinType=[inner])
    PinotLogicalExchange(distribution=[hash[3]])
      LogicalFilter(condition=[=($1, _UTF-8'windows')])
        LogicalSort(offset=[0], fetch=[10])
          PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])
            LogicalSort(fetch=[10])
              LogicalProject(daysSinceFirstTrip=[$3], deviceOS=[$4], totalTrips=[$5], userUUID=[$6])
                LogicalTableScan(table=[[default, userAttributes]])
    PinotLogicalExchange(distribution=[hash[0]])
      LogicalProject(userUUID=[$4])
        LogicalTableScan(table=[[default, userGroups]])

Here we can see that the filter deviceOS = 'windows' is not pushed down to the leaf stage, which means that the engine needs to scan all the data in the userAttributes table and then apply the filter.

The filter is not pushed down because the limit operation must be applied before the filter in order not to break the query semantics, which in this case say that we want 10 rows of the userAttributes table regardless of their deviceOS value.

In cases where you actually want to apply the filter before the limit, specify the WHERE clause inside the subquery. For example:

select a.*
from (select * from userAttributes where deviceOS = 'windows' limit 10) as a
join userGroups as g
on a.userUUID = g.userUUID

Which will produce the following plan:

LogicalProject(daysSinceFirstTrip=[$0], deviceOS=[$1], totalTrips=[$2], userUUID=[$3])
  LogicalJoin(condition=[=($3, $4)], joinType=[inner])
    PinotLogicalExchange(distribution=[hash[3]])
      LogicalSort(offset=[0], fetch=[10])
        PinotLogicalSortExchange(distribution=[hash], collation=[[]], isSortOnSender=[false], isSortOnReceiver=[false])
          LogicalSort(fetch=[10])
            LogicalProject(daysSinceFirstTrip=[$3], deviceOS=[$4], totalTrips=[$5], userUUID=[$6])
              LogicalFilter(condition=[=($4, _UTF-8'windows')])
                LogicalTableScan(table=[[default, userAttributes]])
    PinotLogicalExchange(distribution=[hash[0]])
      LogicalProject(userUUID=[$4])
        LogicalTableScan(table=[[default, userGroups]])

As you can see, the filter is pushed down to the leaf stage, which reduces the amount of data that needs to be scanned.

Do not abuse offset pagination

Although OFFSET and LIMIT are a very simple way to paginate results, they can be very inefficient. It is almost always better to paginate using a WHERE clause on a range of values instead of using OFFSET.

The reason is that in order to apply an OFFSET the engine must generate the skipped rows and then discard them. If you instead use a WHERE clause with a range of values, the engine can apply techniques like indexes or pruning to avoid reading the rows that are not needed.
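
As a sketch of the difference (hypothetical page size and key values; the keyset variant assumes the client remembers the last userUUID it received):

-- offset pagination: the engine computes and then discards the first 1000 rows
select userUUID, deviceOS
from userAttributes
order by userUUID
limit 100
offset 1000

-- keyset pagination: a range predicate lets the engine skip unneeded rows
select userUUID, deviceOS
from userAttributes
where userUUID > 'last-uuid-from-previous-page'
order by userUUID
limit 100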

This is not a Pinot-specific issue but a general one; see, for example, Paging Through Results or Pagination, You Are Probably Doing It Wrong (external links).