LogoLogo
release-1.3.0
release-1.3.0
  • Introduction
  • Basics
    • Concepts
      • Pinot storage model
      • Architecture
      • Components
        • Cluster
          • Tenant
          • Server
          • Controller
          • Broker
          • Minion
        • Table
          • Segment
            • Deep Store
            • Segment threshold
            • Segment retention
          • Schema
          • Time boundary
        • Pinot Data Explorer
    • Getting Started
      • Running Pinot locally
      • Running Pinot in Docker
      • Quick Start Examples
      • Running in Kubernetes
      • Running on public clouds
        • Running on Azure
        • Running on GCP
        • Running on AWS
      • Create and update a table configuration
      • Batch import example
      • Stream ingestion example
      • HDFS as Deep Storage
      • Troubleshooting Pinot
      • Frequently Asked Questions (FAQs)
        • General
        • Pinot On Kubernetes FAQ
        • Ingestion FAQ
        • Query FAQ
        • Operations FAQ
    • Import Data
      • From Query Console
      • Batch Ingestion
        • Spark
        • Flink
        • Hadoop
        • Backfill Data
        • Dimension table
      • Stream ingestion
        • Ingest streaming data from Apache Kafka
        • Ingest streaming data from Amazon Kinesis
        • Ingest streaming data from Apache Pulsar
        • Configure indexes
      • Stream ingestion with Upsert
      • Segment compaction on upserts
      • Stream ingestion with Dedup
      • Stream ingestion with CLP
      • File Systems
        • Amazon S3
        • Azure Data Lake Storage
        • HDFS
        • Google Cloud Storage
      • Input formats
        • Complex Type (Array, Map) Handling
        • Complex Type Examples
        • Ingest records with dynamic schemas
      • Reload a table segment
      • Upload a table segment
    • Indexing
      • Bloom filter
      • Dictionary index
      • Forward index
      • FST index
      • Geospatial
      • Inverted index
      • JSON index
      • Native text index
      • Range index
      • Star-tree index
      • Text search support
      • Timestamp index
      • Vector index
    • Release notes
      • 1.3.0
      • 1.2.0
      • 1.1.0
      • 1.0.0
      • 0.12.1
      • 0.12.0
      • 0.11.0
      • 0.10.0
      • 0.9.3
      • 0.9.2
      • 0.9.1
      • 0.9.0
      • 0.8.0
      • 0.7.1
      • 0.6.0
      • 0.5.0
      • 0.4.0
      • 0.3.0
      • 0.2.0
      • 0.1.0
    • Recipes
      • Connect to Streamlit
      • Connect to Dash
      • Visualize data with Redash
      • GitHub Events Stream
  • For Users
    • Query
      • Querying Pinot
      • Query Syntax
        • Aggregation Functions
        • Array Functions
        • Cardinality Estimation
        • Explain Plan (Single-Stage)
        • Filtering with IdSet
        • Funnel Analysis
        • GapFill Function For Time-Series Dataset
        • Grouping Algorithm
        • Hash Functions
        • JOINs
        • Lookup UDF Join
        • Querying JSON data
        • Transformation Functions
        • URL Functions
        • Window Functions
      • Query Options
      • Query Quotas
      • Query using Cursors
      • Multi-stage query
        • Understanding Stages
        • Stats
        • Optimizing joins
        • Join strategies
          • Random + broadcast join strategy
          • Query time partition join strategy
          • Colocated join strategy
          • Lookup join strategy
        • Hints
        • Operator Types
          • Aggregate
          • Filter
          • Join
          • Intersect
          • Leaf
          • Literal
          • Mailbox receive
          • Mailbox send
          • Minus
          • Sort or limit
          • Transform
          • Union
          • Window
        • Stage-Level Spooling
      • User-Defined Functions (UDFs)
      • Explain plan
    • APIs
      • Broker Query API
        • Query Response Format
      • Controller Admin API
      • Controller API Reference
    • External Clients
      • JDBC
      • Java
      • Python
      • Golang
    • Tutorials
      • Use OSS as Deep Storage for Pinot
      • Ingest Parquet Files from S3 Using Spark
      • Creating Pinot Segments
      • Use S3 as Deep Storage for Pinot
      • Use S3 and Pinot in Docker
      • Batch Data Ingestion In Practice
      • Schema Evolution
  • For Developers
    • Basics
      • Extending Pinot
        • Writing Custom Aggregation Function
        • Segment Fetchers
      • Contribution Guidelines
      • Code Setup
      • Code Modules and Organization
      • Dependency Management
      • Update documentation
    • Advanced
      • Data Ingestion Overview
      • Ingestion Aggregations
      • Ingestion Transformations
      • Null value support
      • Use the multi-stage query engine (v2)
      • Advanced Pinot Setup
    • Plugins
      • Write Custom Plugins
        • Input Format Plugin
        • Filesystem Plugin
        • Batch Segment Fetcher Plugin
        • Stream Ingestion Plugin
    • Design Documents
      • Segment Writer API
  • For Operators
    • Deployment and Monitoring
      • Set up cluster
      • Server Startup Status Checkers
      • Set up table
      • Set up ingestion
      • Decoupling Controller from the Data Path
      • Segment Assignment
      • Instance Assignment
      • Rebalance
        • Rebalance Servers
        • Rebalance Brokers
        • Rebalance Tenant
      • Separating data storage by age
        • Using multiple tenants
        • Using multiple directories
      • Pinot managed Offline flows
      • Minion merge rollup task
      • Consistent Push and Rollback
      • Access Control
      • Monitoring
      • Tuning
        • Tuning Default MMAP Advice
        • Real-time
        • Routing
        • Query Routing using Adaptive Server Selection
        • Query Scheduling
      • Upgrading Pinot with confidence
      • Managing Logs
      • OOM Protection Using Automatic Query Killing
      • Pause ingestion based on resource utilization
    • Command-Line Interface (CLI)
    • Configuration Recommendation Engine
    • Tutorials
      • Authentication
        • Basic auth access control
        • ZkBasicAuthAccessControl
      • Configuring TLS/SSL
      • Build Docker Images
      • Running Pinot in Production
      • Kubernetes Deployment
      • Amazon EKS (Kafka)
      • Amazon MSK (Kafka)
      • Monitor Pinot using Prometheus and Grafana
      • Performance Optimization Configurations
      • Segment Operations Throttling
  • Configuration Reference
    • Cluster
    • Controller
    • Broker
    • Server
    • Table
    • Ingestion
    • Schema
    • Ingestion Job Spec
    • Monitoring Metrics
    • Functions
      • ABS
      • ADD
      • ago
      • EXPR_MIN / EXPR_MAX
      • ARRAY_AGG
      • arrayConcatDouble
      • arrayConcatFloat
      • arrayConcatInt
      • arrayConcatLong
      • arrayConcatString
      • arrayContainsInt
      • arrayContainsString
      • arrayDistinctInt
      • arrayDistinctString
      • arrayIndexOfInt
      • arrayIndexOfString
      • ARRAYLENGTH
      • arrayRemoveInt
      • arrayRemoveString
      • arrayReverseInt
      • arrayReverseString
      • arraySliceInt
      • arraySliceString
      • arraySortInt
      • arraySortString
      • arrayUnionInt
      • arrayUnionString
      • AVGMV
      • Base64
      • caseWhen
      • ceil
      • CHR
      • codepoint
      • concat
      • count
      • COUNTMV
      • COVAR_POP
      • COVAR_SAMP
      • day
      • dayOfWeek
      • dayOfYear
      • DISTINCT
      • DISTINCTAVG
      • DISTINCTAVGMV
      • DISTINCTCOUNT
      • DISTINCTCOUNTBITMAP
      • DISTINCTCOUNTBITMAPMV
      • DISTINCTCOUNTHLL
      • DISTINCTCOUNTSMARTHLL
      • DISTINCTCOUNTHLLPLUS
      • DISTINCTCOUNTHLLMV
      • DISTINCTCOUNTMV
      • DISTINCTCOUNTRAWHLL
      • DISTINCTCOUNTRAWHLLMV
      • DISTINCTCOUNTRAWTHETASKETCH
      • DISTINCTCOUNTTHETASKETCH
      • DISTINCTCOUNTULL
      • DISTINCTSUM
      • DISTINCTSUMMV
      • DIV
      • DATETIMECONVERT
      • DATETRUNC
      • exp
      • FIRSTWITHTIME
      • FLOOR
      • FrequentLongsSketch
      • FrequentStringsSketch
      • FromDateTime
      • FromEpoch
      • FromEpochBucket
      • FUNNELCOUNT
      • FunnelCompleteCount
      • FunnelMaxStep
      • FunnelMatchStep
      • Histogram
      • hour
      • isSubnetOf
      • JSONFORMAT
      • JSONPATH
      • JSONPATHARRAY
      • JSONPATHARRAYDEFAULTEMPTY
      • JSONPATHDOUBLE
      • JSONPATHLONG
      • JSONPATHSTRING
      • jsonextractkey
      • jsonextractscalar
      • LAG
      • LASTWITHTIME
      • LEAD
      • length
      • ln
      • lower
      • lpad
      • ltrim
      • max
      • MAXMV
      • MD5
      • millisecond
      • min
      • minmaxrange
      • MINMAXRANGEMV
      • MINMV
      • minute
      • MOD
      • mode
      • month
      • mult
      • now
      • percentile
      • percentileest
      • percentileestmv
      • percentilemv
      • percentiletdigest
      • percentiletdigestmv
      • percentilekll
      • percentilerawkll
      • percentilekllmv
      • percentilerawkllmv
      • quarter
      • regexpExtract
      • regexpReplace
      • remove
      • replace
      • reverse
      • round
      • roundDecimal
      • ROW_NUMBER
      • rpad
      • rtrim
      • second
      • SEGMENTPARTITIONEDDISTINCTCOUNT
      • sha
      • sha256
      • sha512
      • sqrt
      • startswith
      • ST_AsBinary
      • ST_AsText
      • ST_Contains
      • ST_Distance
      • ST_GeogFromText
      • ST_GeogFromWKB
      • ST_GeometryType
      • ST_GeomFromText
      • ST_GeomFromWKB
      • STPOINT
      • ST_Polygon
      • strpos
      • ST_Union
      • SUB
      • substr
      • sum
      • summv
      • TIMECONVERT
      • timezoneHour
      • timezoneMinute
      • ToDateTime
      • ToEpoch
      • ToEpochBucket
      • ToEpochRounded
      • TOJSONMAPSTR
      • toGeometry
      • toSphericalGeography
      • trim
      • upper
      • Url
      • UTF8
      • VALUEIN
      • week
      • year
      • Extract
      • yearOfWeek
      • FIRST_VALUE
      • LAST_VALUE
      • ST_GeomFromGeoJSON
      • ST_GeogFromGeoJSON
      • ST_AsGeoJSON
    • Plugin Reference
      • Stream Ingestion Connectors
      • VAR_POP
      • VAR_SAMP
      • STDDEV_POP
      • STDDEV_SAMP
    • Dynamic Environment
  • Reference
    • Single-stage query engine (v1)
    • Multi-stage query engine (v2)
    • Troubleshooting
      • Troubleshoot issues with the multi-stage query engine (v2)
      • Troubleshoot issues with ZooKeeper znodes
  • RESOURCES
    • Community
    • Team
    • Blogs
    • Presentations
    • Videos
  • Integrations
    • Tableau
    • Trino
    • ThirdEye
    • Superset
    • Presto
    • Spark-Pinot Connector
  • Contributing
    • Contribute Pinot documentation
    • Style guide
Powered by GitBook
On this page

Was this helpful?

Export as PDF
  1. For Developers
  2. Basics
  3. Extending Pinot

Writing Custom Aggregation Function

PreviousExtending PinotNextSegment Fetchers

Was this helpful?

Pinot has many built-in Aggregation Functions such as MIN, MAX, SUM, AVG etc. See page for the list of aggregation functions.

Adding a new AggregationFunction requires two things

  • Implement interface and make it available as part of the classpath

  • Register the function in . As of today, this requires code change in Pinot but we plan to add the ability to plugin Functions without having to change Pinot code.

To get an overall idea, see Aggregation Function implementation. All other implementations can be found .

Lets look at the key methods to implements in AggregationFunction

interface AggregationFunction {

  AggregationResultHolder createAggregationResultHolder();

  GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity);

  void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<String, BlockValSet> blockValSetMap);

  void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
      Map<String, BlockValSet> blockValSets);

  void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
      Map<String, BlockValSet> blockValSets);

  IntermediateResult extractAggregationResult(AggregationResultHolder aggregationResultHolder);

  IntermediateResult extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey);

  IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);

  FinalResult extractFinalResult(IntermediateResult intermediateResult);

}

Before getting into the implementation, it's important to understand how Aggregation works in Pinot.

1. Map phase

This phase works on the individual segments in Pinot.

  • Initialization: Depending on the query type the following methods are invoked to set up the result holder. While having different methods and return types adds complexity, it helps in performance.

  • Callback: For every record that matches the filter condition in the query,

    one of the following methods are invoked depending on the queryType(aggregation vs group by) and columnType(single-value vs multi-value). Note that we invoke this method for a batch of records instead of every row for performance reasons and allows JVM to vectorize some of parts of the execution if possible.

    • AGGREGATION: aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<String,BlockValSet> blockValSetMap)

      • length: This represent length of the block. Typically < 10k

      • aggregationResultHolder: this is the object returned fromcreateAggregationResultHolder

      • blockValSetMap: Map of blockValSets depending on the arguments to the AggFunction

    • Group By Single Value: aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map blockValSets)

      • length: This represent length of the block. Typically < 10k

      • groupKeyArray: Pinot internally maintains a value to int mapping and this groupKeyArray maps to the internal mapping. These values together form a unique key.

      • groupByResultHolder: This is the object returned fromcreateGroupByResultHolder

      • blockValSetMap: Map of blockValSets depending on the arguments to the AggFunction

    • Group By Multi Value: aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map blockValSets)

      • length: This represent length of the block. Typically < 10k

      • groupKeyArray: Pinot internally maintains a value to int mapping and this groupKeyArray maps to the internal mapping. These values together form a unique key.

      • groupByResultHolder: This is the object returned fromcreateGroupByResultHolder

      • blockValSetMap: Map of blockValSets depending on the arguments to the AggFunction

2. Combine phase

In this phase, the results from all segments within a single pinot server are combined into IntermediateResult. The type of IntermediateResult is based on the Generic Type defined in the AggregationFunction implementation.

public interface AggregationFunction<IntermediateResult, FinalResult extends Comparable> {

  IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);

}

3. Reduce phase

There are two steps in the Reduce Phase

  • Merge all the IntermediateResult's from various servers using the merge function

  FinalResult extractFinalResult(IntermediateResult intermediateResult);

This is advanced topic and assumes you know Pinot . All the data in Pinot is stored in segments across multiple nodes. The query plan at a high level comprises of 3 phases

AGGREGATION : createAggregationResultHolderThis must return an instance of type . You can either use the or

GROUP BY: createGroupByResultHolderThis method must return an instance of type . Depending on the type of result object, you might be able to use one of the existing .

Extract the final results by invoking the extractFinalResult method. In most cases, FinalResult is same type as IntermediateResult. is an example where IntermediateResult (AvgPair) is different from FinalResult(Double)

PQL
AggregationFunction
AggregationFunctionFactory
MAX
here
concepts
AggregationResultHolder
DoubleAggregationResultHolder
ObjectAggregationResultHolder
GroupByResultHolder
implementations
AverageAggregationFunction