Aggregate

Describes the aggregate relation operator in the multi-stage query engine.


The aggregate operator is used to perform calculations on a set of rows, returning one row per group (or a single row overall when there is no group by clause).

This page describes the aggregate operator defined in the relational algebra used by multi-stage queries. This operator is generated by the multi-stage query engine when you use aggregate functions in a query, with or without a group by clause.
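
For example (table and column names here are hypothetical), both of the following queries generate an aggregate operator:

-- without group by: all rows are aggregated into a single result row
SELECT COUNT(*)
FROM table

-- with group by: one result row per distinct value of col1
SELECT col1, COUNT(*)
FROM table
GROUP BY col1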

Implementation details

Aggregate operations may be expensive in terms of memory, CPU and network usage. As explained in Understanding Stages, the multi-stage query engine breaks the query down into multiple stages, and each stage is executed in parallel on different workers. Each worker processes a subset of the data and sends the results to the coordinator, which then aggregates them. When possible, the multi-stage query engine applies a divide-and-conquer strategy to reduce the amount of data that needs to be processed in the coordinator stage.

For example, if the aggregation function is a sum, the engine will try to sum the results on each worker before sending the partial result to the coordinator, which then sums the partial results to get the final result. But some aggregation functions, like count(distinct), cannot be computed this way and require all the data to be processed in the coordinator stage.
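
As a sketch (table and column names here are hypothetical), a query like the following can be computed with partial aggregation:

-- each worker computes a partial SUM(col2) per col1 group over its subset
-- of the data; the partial sums are then shuffled by col1 and merged into
-- the final sums in the upstream stage
SELECT col1, SUM(col2)
FROM table
GROUP BY col1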

In Apache Pinot 1.1.0, the multi-stage query engine always keeps the data in memory. This means that the amount of memory used by the engine is proportional to the number of groups generated by the group by clause and the amount of data that needs to be kept for each group (which depends on the aggregation function).

Even when the aggregation function is a simple count, which only requires keeping a long (8 bytes) for each group in memory, the amount of memory used can be high if the number of groups is high. This is why the engine limits the number of groups. By default, this limit is 100,000 (roughly 800 KB per operator instance for the counts alone, before accounting for the group keys), but this can be changed by providing hints.

Blocking nature

The aggregate operator is a blocking operator. It needs to consume all the input data before emitting the result.

Hints

num_groups_limit

Type: Integer

Default: 100,000

Defines the maximum number of groups that can be created by the group by clause. If the number of groups exceeds this limit, new groups are ignored and only existing groups are updated.

This property is applied per operator instance, which means that if the parallelism is 4 (and therefore there are 4 instances of the aggregate operator), up to 4 times this value may be passed to the next operator. For example, with the default limit of 100,000 and a parallelism of 4, the next operator may receive up to 400,000 groups.

Example:

SELECT
/*+ aggOptions(num_groups_limit='10000000') */
    col1, COUNT(*)
FROM table
GROUP BY col1

is_partitioned_by_group_by_keys

Type: Boolean

Default: false

If set to true, the engine will consider that the data is already partitioned by the group by keys. This means the engine does not need to shuffle the data by the group by keys, and the coordinator stage can compute the final result without merging partial results.

Caution: This hint should only be used if the data is already partitioned by the group by keys. There is no check that this is actually the case, and using the hint when the data is not partitioned this way will lead to incorrect results.

Example:

SELECT 
/*+ aggOptions(is_partitioned_by_group_by_keys='true') */
    a.col3, a.col1, SUM(b.col3)
FROM a JOIN b ON a.col3 = b.col3 
GROUP BY a.col3, a.col1

is_leaf_return_final_result

Type: Boolean

Default: false

This is particularly useful for the distinct count function family. If set to true, the engine will consider that the data is already partitioned by the aggregated column; the leaf stage can then return final (rather than intermediate) results, and the merge is applied to those final results.

Caution: This hint should only be used if the data is already partitioned by the aggregated column. There is no check that this is actually the case, and using the hint when the data is not partitioned this way will lead to incorrect results.

Example:

SELECT
/*+ aggOptions(is_leaf_return_final_result='true') */
    col1, COUNT(DISTINCT col2)
FROM a
GROUP BY col1

is_skip_leaf_stage_group_by

Type: Boolean

Default: false

If set to true, the engine will not push the aggregate down into the leaf stage. In some situations doing the group by on the leaf stage is wasted effort, e.g. when the cardinality of the group by column is very high.

Example:

SELECT 
/*+ aggOptions(is_skip_leaf_stage_group_by='true') */ 
    a.col1, SUM(a.col3) 
FROM a
WHERE a.col3 >= 0 AND a.col2 = 'a' 
GROUP BY a.col1

max_initial_result_holder_capacity

Type: Integer

Default: 10,000

Defines the initial capacity of the result holder that stores the intermediate results of the aggregation. This hint can be used to reduce the memory usage of the engine by setting a value close to the expected number of groups. It is usually recommended not to change this hint unless you know that the expected number of groups is much lower than the default value.

Example:

SELECT 
/*+ aggOptions(max_initial_result_holder_capacity='10') */ 
    a.col1, SUM(a.col3) 
FROM a
WHERE a.col3 >= 0 AND a.col2 = 'a' 
GROUP BY a.col1

Stats

executionTimeMs

Type: Long

The sum of the time spent by all threads executing the operator. This means that the wall-clock time spent in the operation may be smaller than this value if the parallelism is larger than 1. Remember that this value is affected by the number of received rows and the complexity of the aggregation function.

emittedRows

Type: Long

The number of groups emitted by the operator. A large number of emitted rows can indicate that the query is not well optimized.

Remember that the number of groups is limited by the num_groups_limit hint, and a large number of groups can lead to high memory usage and slow queries.

numGroupsLimitReached

Type: Boolean

This stat is set to true when the number of groups exceeds the limit defined by the num_groups_limit hint. In that case, the query will not fail but will return partial results, which will be indicated by the global partialResponse stat.
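
If this stat is true and complete results are needed, the num_groups_limit hint described above can be raised (at the cost of more memory), for example:

SELECT
/*+ aggOptions(num_groups_limit='1000000') */
    col1, COUNT(*)
FROM table
GROUP BY col1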

Explain attributes

The aggregate operator is represented in the explain plan as a LogicalAggregate explain node.

Remember that these nodes appear in pairs: first in one stage where the aggregation is done in parallel, and then in the upstream stage where the partial results are merged.
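
For example, a grouped count and max may produce a plan shaped like the following sketch (the exact exchange operator and merge functions vary by Pinot version), where the lower aggregate computes partial results in parallel and the upper aggregate merges them after a hash exchange:

LogicalAggregate(group=[{0}], agg#0=[COUNT($1)], agg#1=[MAX($2)])
  PinotLogicalExchange(distribution=[hash[0]])
    LogicalAggregate(group=[{6}], agg#0=[COUNT()], agg#1=[MAX($5)])
      LogicalTableScan(table=[[default, userAttributes]])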

group

Type: List of Integer

The list of columns used in the group by clause. These numbers are 0-based column indexes on the virtual row projected by the upstream.

For example, consider the explain plan:

LogicalAggregate(group=[{6}], agg#0=[COUNT()], agg#1=[MAX($5)])
  LogicalTableScan(table=[[default, userAttributes]])

This plan says that the group by clause uses the column with index 6 in the virtual row projected by the upstream. Given that in this case the row is generated by a table scan, the column with index 6 is the 7th column in the table as defined in its schema.

agg#N

Type: Expression

The aggregation functions applied to the columns. There may be multiple agg#N attributes, each one representing a different aggregation function.

For example, consider the explain plan:

LogicalAggregate(group=[{6}], agg#0=[COUNT()], agg#1=[MAX($5)])
  LogicalTableScan(table=[[default, userAttributes]])

This plan has two aggregation functions: COUNT() and MAX(). The second is applied to the column with index 5 in the virtual row projected by the upstream. Given that in this case the row is generated by a table scan, the column with index 5 is the 6th column in the table as defined in its schema.

Tips and tricks

Try not to use aggregation functions that cannot be parallelized

For example, it is recommended to use one of the HyperLogLog flavors instead of count(distinct) when the cardinality of the data is high or the data size is large.

Similarly, it is cheaper to execute count(distinct) on an int column with 1,000 distinct values than on a column that stores very long strings, even if the number of distinct values is the same.
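
For instance (table and column names hypothetical), the approximate DISTINCTCOUNTHLL function can be used instead of an exact count(distinct) when an estimate is acceptable:

-- exact: every distinct col2 value must be shipped upstream and merged
SELECT col1, COUNT(DISTINCT col2)
FROM table
GROUP BY col1

-- approximate: each worker ships only a small fixed-size HLL sketch
SELECT col1, DISTINCTCOUNTHLL(col2)
FROM table
GROUP BY col1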
