release-0.10.0

Writing Custom Aggregation Function

Pinot has many built-in aggregation functions such as MIN, MAX, SUM, and AVG. See the PQL page for the list of aggregation functions.

Adding a new AggregationFunction requires two things:

  • Implement the AggregationFunction interface and make it available on the classpath.

  • Register the function in AggregationFunctionFactory. As of today, this requires a code change in Pinot, but we plan to add the ability to plug in functions without having to change Pinot code.

To get an overall idea, see the MAX aggregation function implementation. All other implementations can be found in the Pinot source code.

Let's look at the key methods to implement in AggregationFunction:

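public interface AggregationFunction<IntermediateResult, FinalResult extends Comparable> {

  // Map phase, initialization: result holder for aggregation-only queries.
  AggregationResultHolder createAggregationResultHolder();

  // Map phase, initialization: result holder for group-by queries.
  GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity);

  // Map phase, callback: aggregate one block of records (aggregation-only queries).
  void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<String, BlockValSet> blockValSetMap);

  // Map phase, callback: group by on single-value columns, one group key per record.
  void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
      Map<String, BlockValSet> blockValSets);

  // Map phase, callback: group by on multi-value columns, several group keys per record.
  void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
      Map<String, BlockValSet> blockValSets);

  // Read the intermediate result out of the holder once all blocks are processed.
  IntermediateResult extractAggregationResult(AggregationResultHolder aggregationResultHolder);

  IntermediateResult extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey);

  // Combine and reduce phases: merge two intermediate results into one.
  IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);

  // Reduce phase: convert the merged intermediate result into the final result.
  FinalResult extractFinalResult(IntermediateResult intermediateResult);

}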

Before getting into the implementation, it's important to understand how Aggregation works in Pinot.

1. Map phase

This phase works on the individual segments in Pinot.

  • Initialization: Depending on the query type, either createAggregationResultHolder (aggregation) or createGroupByResultHolder (group by) is invoked to set up the result holder. While having different methods and return types adds complexity, it helps performance.

  • Callback: For every record that matches the filter condition in the query, one of the following methods is invoked, depending on the query type (aggregation vs. group by) and the column type (single-value vs. multi-value). Note that the method is invoked for a batch of records rather than for every row. This improves performance and allows the JVM to vectorize parts of the execution where possible.

    • AGGREGATION: aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<String, BlockValSet> blockValSetMap)

      • length: The length of the block, typically < 10k.

      • aggregationResultHolder: The object returned from createAggregationResultHolder.

      • blockValSetMap: Map of BlockValSets, depending on the arguments to the aggregation function.

    • Group By Single Value: aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map<String, BlockValSet> blockValSets)

      • length: The length of the block, typically < 10k.

      • groupKeyArray: Pinot internally maintains a value-to-int mapping, and groupKeyArray holds the mapped int for each record in the block. These values together form a unique key.

      • groupByResultHolder: The object returned from createGroupByResultHolder.

      • blockValSets: Map of BlockValSets, depending on the arguments to the aggregation function.

    • Group By Multi Value: aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, Map<String, BlockValSet> blockValSets)

      • length: The length of the block, typically < 10k.

      • groupKeysArray: Pinot internally maintains a value-to-int mapping, and groupKeysArray holds the mapped ints for each record in the block. Because a multi-value column can place one record in several groups, each record has an array of group keys.

      • groupByResultHolder: The object returned from createGroupByResultHolder.

      • blockValSets: Map of BlockValSets, depending on the arguments to the aggregation function.
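The three callbacks above can be sketched for a SUM-like function as follows. This is a minimal illustration, not Pinot code: plain arrays stand in for AggregationResultHolder, GroupByResultHolder, and BlockValSet, and the class name MapPhaseSketch and its method signatures are assumptions made for the example.

```java
import java.util.Arrays;

// Simplified stand-ins, not Pinot classes: a double replaces AggregationResultHolder,
// a double[] indexed by group key replaces GroupByResultHolder, and a double[]
// of column values replaces BlockValSet.
class MapPhaseSketch {

  // AGGREGATION: fold one block of values into a single running result.
  static double aggregate(int length, double runningSum, double[] values) {
    for (int i = 0; i < length; i++) {
      runningSum += values[i];
    }
    return runningSum;
  }

  // GROUP BY, single value: record i belongs to exactly one group, groupKeyArray[i].
  static void aggregateGroupBySV(int length, int[] groupKeyArray, double[] resultPerGroup,
      double[] values) {
    for (int i = 0; i < length; i++) {
      resultPerGroup[groupKeyArray[i]] += values[i];
    }
  }

  // GROUP BY, multi value: record i contributes to every group listed in groupKeysArray[i].
  static void aggregateGroupByMV(int length, int[][] groupKeysArray, double[] resultPerGroup,
      double[] values) {
    for (int i = 0; i < length; i++) {
      for (int groupKey : groupKeysArray[i]) {
        resultPerGroup[groupKey] += values[i];
      }
    }
  }

  public static void main(String[] args) {
    double[] values = {1.0, 2.0, 3.0, 4.0};

    System.out.println(aggregate(values.length, 0.0, values));

    double[] singleValueGroups = new double[2];
    aggregateGroupBySV(values.length, new int[] {0, 1, 0, 1}, singleValueGroups, values);
    System.out.println(Arrays.toString(singleValueGroups));

    double[] multiValueGroups = new double[3];
    aggregateGroupByMV(values.length, new int[][] {{0}, {0, 1}, {2}, {1, 2}}, multiValueGroups, values);
    System.out.println(Arrays.toString(multiValueGroups));
  }
}
```

Each method processes a whole block in one call, which is the batching described in the callback step: the tight loops over primitive arrays are the kind of code the JVM can optimize well.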

2. Combine phase

In this phase, the results from all segments within a single Pinot server are combined into one IntermediateResult. The type of IntermediateResult is determined by the generic type parameter declared in the AggregationFunction implementation.

public interface AggregationFunction<IntermediateResult, FinalResult extends Comparable> {

  IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);

}
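As a rough sketch of the combine step, the per-segment results can be folded pairwise with the merge function. The helper below is hypothetical and not part of Pinot: CombinePhaseSketch and combine are names invented for this example, and a MAX-like function with a Double intermediate result is assumed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical helper, not a Pinot class: folds per-segment intermediate
// results into one result by applying a merge function pairwise.
class CombinePhaseSketch {

  static <T> T combine(List<T> perSegmentResults, BinaryOperator<T> merge) {
    if (perSegmentResults.isEmpty()) {
      throw new IllegalArgumentException("no segment results to combine");
    }
    T combined = perSegmentResults.get(0);
    for (int i = 1; i < perSegmentResults.size(); i++) {
      combined = merge.apply(combined, perSegmentResults.get(i));
    }
    return combined;
  }

  public static void main(String[] args) {
    // MAX-like function: the intermediate result is a Double and merge keeps the larger value.
    List<Double> segmentMaxima = Arrays.asList(3.5, 9.0, 7.25);
    System.out.println(combine(segmentMaxima, Double::max));
  }
}
```

Because results are merged pairwise in no guaranteed order, a merge function of this kind should be associative and commutative.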

3. Reduce phase

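There are two steps in the reduce phase:

  • Merge all the IntermediateResults from the various servers using the merge function.

  • Extract the final result from the merged IntermediateResult by invoking the extractFinalResult method: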

  FinalResult extractFinalResult(IntermediateResult intermediateResult);
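The two reduce steps can be sketched for an AVG-like function, where the intermediate and final result types differ. This is an illustration under assumptions, not Pinot's implementation: ReducePhaseSketch and SumCount are hypothetical names, with SumCount playing the role of an intermediate result that carries a sum and a count.

```java
import java.util.Arrays;
import java.util.List;

class ReducePhaseSketch {

  // Hypothetical IntermediateResult: sum and count travel together so that
  // partial averages from different servers can be merged correctly.
  static final class SumCount {
    final double sum;
    final long count;

    SumCount(double sum, long count) {
      this.sum = sum;
      this.count = count;
    }
  }

  // Step 1: merge two intermediate results.
  static SumCount merge(SumCount a, SumCount b) {
    return new SumCount(a.sum + b.sum, a.count + b.count);
  }

  // Step 2: the FinalResult is a Double, a different type from the IntermediateResult.
  // The division yields NaN when no rows matched (0.0 / 0).
  static Double extractFinalResult(SumCount merged) {
    return merged.sum / merged.count;
  }

  static Double reduce(List<SumCount> perServerResults) {
    SumCount merged = new SumCount(0.0, 0L);
    for (SumCount result : perServerResults) {
      merged = merge(merged, result);
    }
    return extractFinalResult(merged);
  }

  public static void main(String[] args) {
    // Two servers: 4 rows summing to 10.0 and 6 rows summing to 20.0.
    List<SumCount> perServerResults = Arrays.asList(new SumCount(10.0, 4), new SumCount(20.0, 6));
    System.out.println(reduce(perServerResults));
  }
}
```

Keeping the sum and count separate until the last step matters here: averaging the two per-server averages (2.5 and about 3.33) would give a different answer than dividing the total sum by the total count.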

This is an advanced topic and assumes you know Pinot concepts. All the data in Pinot is stored in segments across multiple nodes, and the query plan at a high level comprises the three phases described above: map, combine, and reduce.

For the initialization step of the map phase, the result holder methods have the following contracts:

  • AGGREGATION: createAggregationResultHolder must return an instance of type AggregationResultHolder. You can use either DoubleAggregationResultHolder or ObjectAggregationResultHolder.

  • GROUP BY: createGroupByResultHolder must return an instance of type GroupByResultHolder. Depending on the type of the result object, you might be able to use one of the existing implementations.

For the last step of the reduce phase, the final result is extracted by invoking the extractFinalResult method. In most cases, FinalResult is the same type as IntermediateResult. AverageAggregationFunction is an example where IntermediateResult (AvgPair) is different from FinalResult (Double).