Grouping Algorithm

This guide describes the heuristics that Pinot's grouping algorithm uses to trim results when processing GROUP BY queries, so that servers don't run out of memory.

V1 / Single Stage Query Engine

Within segment

When grouping rows within a segment, Pinot keeps a maximum of numGroupsLimit groups per segment. This value is set to 100,000 by default and can be configured by the pinot.server.query.executor.num.groups.limit property.

If the number of groups in a segment reaches this value, any further groups are ignored and the results returned may not be completely accurate. In that case, the numGroupsLimitReached property is set to true in the query response.
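The per-segment cap can be illustrated with a minimal Python sketch. It assumes a COUNT(*) aggregation over a list of group keys. The function name group_segment is hypothetical. In this simplification the flag is raised only when a new group is actually dropped, which may differ from how Pinot's server code sets numGroupsLimitReached.

```python
from collections import Counter
from typing import Iterable, Tuple


def group_segment(keys: Iterable[str], num_groups_limit: int = 100_000) -> Tuple[Counter, bool]:
    """COUNT(*) per group key within one segment, capped at num_groups_limit groups.

    Hypothetical helper; returns (counts, num_groups_limit_reached).
    """
    counts: Counter = Counter()
    limit_reached = False
    for key in keys:
        if key in counts or len(counts) < num_groups_limit:
            counts[key] += 1  # existing group, or room for a new one
        else:
            # New group beyond the limit: ignored, so results may be inaccurate.
            limit_reached = True
    return counts, limit_reached


counts, reached = group_segment(["a", "b", "a", "c"], num_groups_limit=2)
print(dict(counts), reached)  # {'a': 2, 'b': 1} True
```

Rows for groups that already exist keep being aggregated after the limit is hit. Only rows that would open a new group are discarded.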

Trimming tail groups

After the inner segment groups have been computed, the Pinot query engine optionally trims tail groups. Tail groups are ones that have a lower rank based on the ORDER BY clause used in the query.

Segment-level trim is disabled by default (minSegmentGroupTrimSize is -1). When it is enabled, the query engine trims the tail groups and keeps only max(minSegmentGroupTrimSize, 5 * LIMIT) groups. Here LIMIT is the maximum number of records returned by the query, usually set via the LIMIT clause. Pinot keeps at least 5 * LIMIT groups when trimming tail groups to preserve the accuracy of results. Trimming is performed only when both an ordering and a limit are specified.
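The segment-level rule can be sketched in Python as follows. This is an illustration, not Pinot's implementation. The helper names are hypothetical, groups are a plain dict of key to aggregated value, and sort_key stands in for the ORDER BY clause. A negative minSegmentGroupTrimSize (the default is -1) is treated as "disabled"; that handling of negative values is an assumption.

```python
from typing import Callable, Dict, Optional, Tuple

SortKey = Callable[[Tuple[str, int]], object]


def segment_trim_size(limit: int, min_segment_group_trim_size: int = -1) -> Optional[int]:
    """Groups kept by segment-level trim, or None when trim is disabled (hypothetical helper)."""
    if min_segment_group_trim_size < 0:
        return None  # -1 (the default) disables segment-level trimming
    return max(min_segment_group_trim_size, 5 * limit)


def trim_tail_groups(groups: Dict[str, int], sort_key: Optional[SortKey],
                     limit: Optional[int], min_segment_group_trim_size: int = -1) -> Dict[str, int]:
    """Keep only the highest-ranked groups according to the ORDER BY key."""
    if sort_key is None or limit is None:
        return dict(groups)  # trimming needs both an ordering and a limit
    size = segment_trim_size(limit, min_segment_group_trim_size)
    if size is None or len(groups) <= size:
        return dict(groups)
    ranked = sorted(groups.items(), key=sort_key)
    return dict(ranked[:size])  # tail groups beyond `size` are dropped


groups = {f"g{i}": i for i in range(20)}
by_value_desc = lambda kv: -kv[1]  # like ORDER BY cnt DESC
trimmed = trim_tail_groups(groups, by_value_desc, limit=1, min_segment_group_trim_size=2)
print(sorted(trimmed))  # ['g15', 'g16', 'g17', 'g18', 'g19'], since max(2, 5 * 1) = 5
```

The 5 * LIMIT floor dominates whenever the configured minimum is small. This is why a tiny minSegmentGroupTrimSize still leaves a margin above the final LIMIT.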

This value can be overridden on a per-query basis by passing the following option:

SELECT * 
FROM ...
OPTION(minSegmentGroupTrimSize=value)

Cross segments

Once grouping has been done within each segment, Pinot merges the segment results. If the merged result contains more than max(minServerGroupTrimSize, 5 * LIMIT) groups, Pinot trims the tail groups and keeps that many.

minServerGroupTrimSize is set to 5,000 by default and can be adjusted by configuring the pinot.server.query.executor.min.server.group.trim.size property. Cross-segment trim can be disabled by setting the property to -1.

When cross-segment trim is enabled, the server trims the tail groups before sending the results back to the broker. To reduce memory usage while merging per-segment results, it also trims the tail groups whenever the number of groups reaches the trimThreshold.

trimThreshold is the upper bound on the number of groups allowed on a server for each query; it protects servers from running out of memory. To avoid trimming too frequently, the actual trim size is bounded by trimThreshold / 2. Combining this with the formula above, the actual trim size for a query is min(max(minServerGroupTrimSize, 5 * LIMIT), trimThreshold / 2).
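The server-level trim size formula can be checked with a few lines of Python. The function name is hypothetical, and integer division for trimThreshold / 2 is an assumption.

```python
def server_trim_size(limit: int,
                     min_server_group_trim_size: int = 5_000,
                     trim_threshold: int = 1_000_000) -> int:
    """min(max(minServerGroupTrimSize, 5 * LIMIT), trimThreshold / 2), using integer division."""
    return min(max(min_server_group_trim_size, 5 * limit), trim_threshold // 2)


# With the defaults and LIMIT 10, the 5,000 floor dominates.
print(server_trim_size(10))                        # 5000
# A large LIMIT raises the trim size above the floor ...
print(server_trim_size(2_000))                     # 10000
# ... but trimThreshold / 2 caps it.
print(server_trim_size(200_000))                   # 500000
print(server_trim_size(10, trim_threshold=6_000))  # 3000
```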

This configuration is set to 1,000,000 by default and can be adjusted by configuring the pinot.server.query.executor.groupby.trim.threshold property.

A higher threshold reduces the amount of trimming done, but consumes more heap memory. If the threshold is set to more than 1,000,000,000, the server will only trim the groups once before returning the results to the broker.
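The merge-time trimming described above is sketched below. A server merges per-segment partial results, trims whenever the group count reaches trimThreshold, and trims once more before replying to the broker. The helper names are hypothetical. The sketch assumes a COUNT/SUM aggregation whose partial results combine by addition, and uses deliberately tiny sizes so the trims are visible.

```python
from collections import Counter
from typing import Callable, Dict, Iterable, Tuple

SortKey = Callable[[Tuple[str, int]], object]


def _top(groups: Counter, sort_key: SortKey, n: int) -> Counter:
    """Keep the n highest-ranked groups according to sort_key."""
    return Counter(dict(sorted(groups.items(), key=sort_key)[:n]))


def merge_segments(segment_results: Iterable[Dict[str, int]], sort_key: SortKey,
                   trim_size: int, trim_threshold: int) -> Tuple[Dict[str, int], int]:
    """Merge per-segment partials on one server; returns (groups, number_of_trims)."""
    merged: Counter = Counter()
    num_trims = 0
    for partial in segment_results:
        for key, value in partial.items():
            merged[key] += value  # COUNT/SUM partials combine by addition
            if len(merged) >= trim_threshold:
                merged = _top(merged, sort_key, trim_size)  # bound memory mid-merge
                num_trims += 1
    if len(merged) > trim_size:
        merged = _top(merged, sort_key, trim_size)  # final trim before replying
        num_trims += 1
    return dict(merged), num_trims


by_count_desc = lambda kv: (-kv[1], kv[0])
segments = [{"a": 5, "b": 1, "c": 3}, {"d": 4, "e": 2}, {"a": 1}]
print(merge_segments(segments, by_count_desc, trim_size=2, trim_threshold=4))
# ({'a': 6, 'd': 4}, 2)
```

With a very large trim_threshold (for example 10**9), the mid-merge branch never fires and only the final trim runs. This mirrors the "trim only once" behaviour described above.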

This value can be overridden on a per-query basis by passing the following option:

SELECT * 
FROM ...
OPTION(groupTrimThreshold=value)

At Broker

When the broker performs the final merge of the groups returned by the servers, another level of trimming takes place. The tail groups are trimmed and max(minBrokerGroupTrimSize, 5 * LIMIT) groups are retained.

The default value of minBrokerGroupTrimSize is 5,000. It can be adjusted by configuring the pinot.broker.min.group.trim.size property.
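The broker-level step can be sketched in Python as a merge of server partials followed by a sort and a trim to max(minBrokerGroupTrimSize, 5 * LIMIT). The function name broker_trim is hypothetical, and the sketch assumes COUNT/SUM partials that add up. A minimum of 1 is used only so that trimming is visible on a small input.

```python
from collections import Counter
from typing import Callable, Dict, Iterable, List, Tuple


def broker_trim(server_results: Iterable[Dict[str, int]],
                sort_key: Callable[[Tuple[str, int]], object],
                limit: int,
                min_broker_group_trim_size: int = 5_000) -> List[Tuple[str, int]]:
    """Merge server partials, sort by the ORDER BY key, keep max(minBrokerGroupTrimSize, 5 * LIMIT)."""
    merged: Counter = Counter()
    for partial in server_results:
        merged.update(partial)  # Counter.update adds values for COUNT/SUM partials
    keep = max(min_broker_group_trim_size, 5 * limit)
    return sorted(merged.items(), key=sort_key)[:keep]


servers = [{f"k{i}": i for i in range(8)}, {"k0": 100}]
kept = broker_trim(servers, lambda kv: (-kv[1], kv[0]), limit=1, min_broker_group_trim_size=1)
print(kept)  # [('k0', 100), ('k7', 7), ('k6', 6), ('k5', 5), ('k4', 4)]
rows = kept[:1]  # the final LIMIT is applied to the retained groups
```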

GROUP BY behavior

Pinot sets a default LIMIT of 10 if one isn't defined and this applies to GROUP BY queries as well. Therefore, if no limit is specified, Pinot will return 10 groups.

Pinot trims tail groups based on the ORDER BY clause to reduce the memory footprint and improve query performance. It keeps at least 5 * LIMIT groups so that the results are a good enough approximation in most cases. Increasing the configurable minimum trim sizes keeps more groups and improves accuracy, at the cost of a larger memory footprint.

HAVING behavior

If the query has a HAVING clause, it is applied to the merged GROUP BY results after the tail groups have already been trimmed. If the HAVING condition selects groups from the opposite end of the ORDER BY ordering, groups that match the condition may already have been trimmed and are not returned. For example:

SELECT SUM(colA) 
FROM myTable 
GROUP BY colB 
HAVING SUM(colA) < 100 
ORDER BY SUM(colA) DESC 
LIMIT 10

In such cases, increase the minimum trim size to keep more groups.
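The HAVING pitfall can be reproduced with a scaled-down Python simulation of the order of operations described above: ORDER BY ... DESC, then trim, then HAVING, then LIMIT. The function name and data are made up. min_trim_size=50 is a hypothetical stand-in for the real default minimums, chosen so that trimming bites on 100 groups.

```python
from typing import Callable, Dict, List, Tuple


def group_by_having(groups: Dict[str, int], limit: int, min_trim_size: int,
                    having: Callable[[int], bool]) -> List[Tuple[str, int]]:
    """ORDER BY value DESC, trim to max(min_trim_size, 5 * LIMIT), then HAVING, then LIMIT."""
    keep = max(min_trim_size, 5 * limit)
    trimmed = sorted(groups.items(), key=lambda kv: -kv[1])[:keep]
    return [(k, v) for k, v in trimmed if having(v)][:limit]


# Made-up data: 100 groups (colB values) whose SUM(colA) is 0, 10, ..., 990.
groups = {f"b{i}": i * 10 for i in range(100)}
sum_below_100 = lambda s: s < 100  # HAVING SUM(colA) < 100

print(group_by_having(groups, limit=10, min_trim_size=50, having=sum_below_100))
# []  (only the 50 largest sums, 500..990, survive the trim)
print(len(group_by_having(groups, limit=10, min_trim_size=100, having=sum_below_100)))
# 10  (a larger trim size keeps the matching groups)
```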

Examples

For a simple keyed aggregation query such as:

SELECT i, j, count(*) AS cnt
FROM tab
GROUP BY i, j
ORDER BY i ASC, j ASC
LIMIT 3;

a simplified execution plan, showing where trimming happens, looks like:

BROKER_REDUCE(sort:[i, j],limit:3) <- sort and trim groups to minBrokerGroupTrimSize
  COMBINE_GROUP_BY <- sort and trim groups to minServerGroupTrimSize
    PLAN_START
      GROUP_BY <- limit to numGroupsLimit, then sort and trim to minSegmentGroupTrimSize
        PROJECT(i, j)
          DOC_ID_SET
            FILTER_MATCH_ENTIRE_SEGMENT

For the sake of brevity, the plan above doesn't show that the actual number of groups kept at each level is max(trim_value, 5 * LIMIT), where trim_value is the corresponding minimum trim size.

V2 / Multi Stage Query Engine

Compared to V1, the V2 engine uses a similar algorithm, but there are notable differences:

  • V2 doesn't implicitly limit the number of query results (to 10).

  • V2 doesn't limit the number of groups when aggregating cross-segment data.

  • V2 doesn't trim results by default in any stage.

  • V2 doesn't aggregate results in the broker; final aggregation is pushed down to the server(s).

The default V2 algorithm is shown in the following diagram:

Group limiting and trimming can be enabled at other stages with the following hints:

  • is_enable_group_trim hint: enables trimming at all V1/V2 levels and group limiting at the cross-segment level. The minSegmentGroupTrimSize value must be set separately. Default value: false

  • mse_min_group_trim_size hint: triggers sorting and trimming of GROUP BY results at the intermediate stage. Requires the is_enable_group_trim hint. Default value: 5000

When the above hints are used, query processing looks as follows:

The actual processing depends on the query. A plan may not contain a V1 leaf-stage aggregation component and may instead rely on AggregateOperator at all levels. Moreover, since trimming relies on the propagation of ORDER BY and LIMIT, it may not happen in a subquery if the ORDER BY column(s) are not available there.

Examples

  • If the hints are applied to the query from the V1 example above, that is:

    SELECT /*+ aggOptions(is_enable_group_trim='true', mse_min_group_trim_size='10') */
    i, j, count(*) as cnt
    FROM myTable
    GROUP BY i, j
    ORDER BY i ASC, j ASC
    LIMIT 3

    then the execution plan should be as follows:

    LogicalSort
      PinotLogicalSortExchange(distribution=[hash])
        LogicalSort
          PinotLogicalAggregate <- aggregate up to num_groups_limit groups, then sort and trim output to group_trim_size
            PinotLogicalExchange(distribution=[hash[0, 1]])
              LeafStageCombineOperator(table=[mytable])
                StreamingInstanceResponse
                  CombineGroupBy <- aggregate up to minSegmentGroupTrimSize groups
                    GroupBy <- aggregate up to numGroupsLimit groups, optionally sort and trim to minSegmentGroupTrimSize
                      Project
                        DocIdSet
                          FilterMatchEntireSegment

    In the plan above trimming happens in three operators: GroupBy, CombineGroupBy and AggregateOperator (which is the physical implementation of PinotLogicalAggregate).

  • Aggregating over the result of a join, e.g.:

    select /*+  aggOptions(is_enable_group_trim='true', mse_min_group_trim_size='3') */ 
           t1.i, t1.j, count(*) as cnt
    from tab t1
    join tab t2 on 1=1
    group by t1.i, t1.j
    order by t1.i asc, t1.j asc
    limit 5

    should produce the following execution plan:

    LogicalSort
      PinotLogicalSortExchange(distribution=[hash])
        LogicalSort
          PinotLogicalAggregate(aggType=[FINAL]) <- aggregate up to num_groups_limit groups, then sort and trim output to group_trim_size
            PinotLogicalExchange(distribution=[hash[0, 1]])
              PinotLogicalAggregate(aggType=[LEAF]) <- aggregate up to num_groups_limit groups, then sort and trim output to group_trim_size
                LogicalJoin(condition=[true])
                  PinotLogicalExchange(distribution=[random])
                    LeafStageCombineOperator(table=[mytable])
                      ...
                        FilterMatchEntireSegment
                  PinotLogicalExchange(distribution=[broadcast])
                    LeafStageCombineOperator(table=[mytable])
                      ...
                        FilterMatchEntireSegment

    In this plan there is no V1 leaf-stage operator, and all aggregation stages are implemented with the V2 operator PinotLogicalAggregate.

Configuration Parameters/hints

| Parameter | Default | Query Override | Description |
|---|---|---|---|
| pinot.server.query.executor.max.execution.threads | -1 (use all execution threads) | SET maxExecutionThreads = value; | The maximum number of execution threads (parallelism of segment processing) used per query. |
| pinot.server.query.executor.num.groups.limit | 100,000 | SET numGroupsLimit = value; | The maximum number of groups allowed per segment. |
| pinot.server.query.executor.min.segment.group.trim.size | -1 (disabled) | SET minSegmentGroupTrimSize = value; | The minimum number of groups to keep when trimming groups at the segment level. |
| pinot.server.query.executor.min.server.group.trim.size | 5,000 | SET minServerGroupTrimSize = value; | The minimum number of groups to keep when trimming groups at the server level. |
| pinot.server.query.executor.groupby.trim.threshold | 1,000,000 | SET groupTrimThreshold = value; | The number of groups to trigger the server level trim. |
| pinot.broker.min.group.trim.size | 5,000 | SET minBrokerGroupTrimSize = value; | The minimum number of groups to keep when trimming groups at the broker. Applies only to SSQ(*). |
| pinot.broker.mse.enable.group.trim | false (disabled) | /*+ aggOptions(is_enable_group_trim='value') */ | Enable group trim for the query (if possible). Applies only to MSQ(**). |
| pinot.server.query.executor.mse.min.group.trim.size | 5,000 | /*+ aggOptions(mse_min_group_trim_size='value') */ or SET mseMinGroupTrimSize = value; | The number of groups to keep when trimming groups at intermediate stage. Applies only to MSQ(**). |

(*) SSQ - Single-Stage Query

(**) MSQ - Multi-Stage Query

Apart from limiting the number of groups at the segment level, a similar limit is applied at intermediate stages. The V2 query engine allows subqueries, so an execution plan can contain an arbitrary number of stages doing intermediate aggregation between the leaf (bottom-most) and top-most stages. Each of these stages can be implemented with many instances of AggregateOperator (shown as PinotLogicalAggregate in EXPLAIN output). The operator limits the number of distinct groups to 100,000 by default, which can be overridden with the numGroupsLimit option or the num_groups_limit aggregate hint. The limit applies to a single operator instance, so the next stage could receive a total of num_instances * num_groups_limit groups.
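The per-instance nature of the limit can be illustrated with a small Python sketch. Each simulated AggregateOperator instance caps its own distinct groups, so the downstream stage can receive up to num_instances * num_groups_limit groups in total. The function name, the four-instance layout and the key names are hypothetical. The sketch assumes the hash exchange sends disjoint keys to each instance.

```python
from typing import Dict, Iterable, List, Tuple


def aggregate_instance(rows: Iterable[Tuple[str, int]],
                       num_groups_limit: int = 100_000) -> Dict[str, int]:
    """One operator instance: sum per key, admitting at most num_groups_limit distinct groups."""
    groups: Dict[str, int] = {}
    for key, value in rows:
        if key in groups:
            groups[key] += value
        elif len(groups) < num_groups_limit:
            groups[key] = value
        # else: rows for further new keys are dropped once this instance is full
    return groups


# Hypothetical layout: 4 instances, each receiving 10 distinct keys after a hash exchange.
instances: List[List[Tuple[str, int]]] = [
    [(f"w{w}-k{k}", 1) for k in range(10)] for w in range(4)
]
outputs = [aggregate_instance(rows, num_groups_limit=3) for rows in instances]
print([len(o) for o in outputs])     # [3, 3, 3, 3]
print(sum(len(o) for o in outputs))  # 12 == num_instances * num_groups_limit
```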

Figure: Group by results approximation at various stages of V1 query execution
Figure: Default V2 engine group by results approximation
Figure: Group by results trimming at various stages of V2 query execution utilizing V1 in leaf stage