# Writing Custom Aggregation Function

Pinot has many inbuilt Aggregation Functions such as MIN, MAX, SUM, AVG etc. See [PQL](https://docs.pinot.apache.org/release-0.12.0/users/user-guide-query/querying-pinot) page for the list of aggregation functions.

Adding a new AggregationFunction requires two things

* Implement [AggregationFunction](https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunction.java) interface and make it available as part of the classpath
* Register the function in [AggregationFunctionFactory](https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java). As of today, this requires code change in Pinot but we plan to add the ability to plugin Functions without having to change Pinot code.

To get an overall idea, see [MAX](https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/MaxAggregationFunction.java) Aggregation Function implementation. All other implementations can be found [here](https://github.com/apache/pinot/tree/master/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function).

Lets look at the key methods to implements in AggregationFunction

```
interface AggregationFunction {

  AggregationResultHolder createAggregationResultHolder();

  GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity);

  void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<String, BlockValSet> blockValSetMap);

  void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
      Map<String, BlockValSet> blockValSets);

  void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
      Map<String, BlockValSet> blockValSets);

  IntermediateResult extractAggregationResult(AggregationResultHolder aggregationResultHolder);

  IntermediateResult extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey);

  IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);

  FinalResult extractFinalResult(IntermediateResult intermediateResult);

}
```

Before getting into the implementation, it's important to understand how Aggregation works in Pinot.

This is advanced topic and assumes you know Pinot [concepts](https://docs.pinot.apache.org/release-0.12.0/basics/concepts). All the data in Pinot is stored in segments across multiple nodes. The query plan at a high level comprises of 3 phases

**1. Map phase**

This phase works on the individual segments in Pinot.

* Initialization: Depending on the query type the following methods are invoked to setup the result holder.  While having different methods and return types adds complexity, it helps in performance.
  * AGGREGATION : `createAggregationResultHolder`This must return an instance of type [AggregationResultHolder](https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/AggregationResultHolder.java). You can either use the [DoubleAggregationResultHolder](https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DoubleAggregationResultHolder.java) or [ObjectAggregationResultHolder](https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/ObjectAggregationResultHolder.java)
  * GROUP BY: `createGroupByResultHolder`This method must return an instance of type [GroupByResultHolder](https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByResultHolder.java). Depending on the type of result object, you might be able to use one of the existing [implementations](https://github.com/apache/pinot/tree/master/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby).
* Callback: For every record that matches the filter condition in the query,

  one of the following methods are invoked depending on the queryType(aggregation vs group by) and columnType(single-value vs multi-value). Note that we invoke this method for a batch of records instead of every row for performance reasons and allows JVM to vectorize some of parts of the execution if possible.

  * AGGREGATION: aggregate(int length, AggregationResultHolder aggregationResultHolder, Map\<String,BlockValSet> blockValSetMap)
    * **length**: This represent length of the block. Typically < 10k
    * **aggregationResultHolder**: this is the object returned from`createAggregationResultHolder`
    * **blockValSetMap**: Map of blockValSets depending on the arguments to the AggFunction
  * Group By Single Value: aggregateGroupBySV(int **length**, int\[] **groupKeyArray**, GroupByResultHolder **groupByResultHolder**, Map **blockValSets**)
    * **length:** This represent length of the block. Typically < 10k
    * **groupKeyArray:**  Pinot internally maintains a value to int mapping and this groupKeyArray maps to the internal mapping. These values together form a unique key.
    * **groupByResultHolder:** This is the object returned from`createGroupByResultHolder`
    * **blockValSetMap**: Map of blockValSets depending on the arguments to the AggFunction
  * Group By Multi Value: aggregateGroupBySV(int **length**, int\[] **groupKeyArray**, GroupByResultHolder **groupByResultHolder**, Map **blockValSets**)
    * **length:** This represent length of the block. Typically < 10k
    * **groupKeyArray:**  Pinot internally maintains a value to int mapping and this groupKeyArray maps to the internal mapping. These values together form a unique key.
    * **groupByResultHolder:** This is the object returned from`createGroupByResultHolder`
    * **blockValSetMap**: Map of blockValSets depending on the arguments to the AggFunction

**2. Combine phase**

In this phase, the results from all segments within a single pinot server are combined into IntermediateResult. The type of IntermediateResult is based on the Generic Type defined in the AggregationFunction implementation.

```
public interface AggregationFunction<IntermediateResult, FinalResult extends Comparable> {

  IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);

}
```

**3. Reduce phase**

There are two steps in the Reduce Phase

* Merge all the IntermediateResult's from various servers using the  **merge** function
* Extract the final results by invoking the extractFinalResult method. In most cases, FinalResult is same type as IntermediateResult. [AverageAggregationFunction](https://github.com/apache/pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AvgAggregationFunction.java) is an example where IntermediateResult (AvgPair) is different from FinalResult(Double)

```
  FinalResult extractFinalResult(IntermediateResult intermediateResult);
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.pinot.apache.org/release-0.12.0/developers/developers-and-contributors/extending-pinot/custom-aggregation-function.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
