
Extending Pinot

Writing Custom Aggregation Function

Pinot has many built-in aggregation functions, such as MIN, MAX, SUM, and AVG. See the PQL page for the full list of aggregation functions.

Adding a new aggregation function requires two things:

  • Implement the AggregationFunction interface and make it available on the classpath.

  • Register the function in AggregationFunctionFactory. As of today, this requires a code change in Pinot, but we plan to add the ability to plug in functions without having to change Pinot code.

To get an overall idea, see the MAX aggregation function implementation. All other implementations can be found here.

Let's look at the key methods to implement in AggregationFunction:

interface AggregationFunction<IntermediateResult, FinalResult extends Comparable> {

  AggregationResultHolder createAggregationResultHolder();

  GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity);

  void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<String, BlockValSet> blockValSetMap);

  void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
      Map<String, BlockValSet> blockValSets);

  void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
      Map<String, BlockValSet> blockValSets);

  IntermediateResult extractAggregationResult(AggregationResultHolder aggregationResultHolder);

  IntermediateResult extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey);

  IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);

  FinalResult extractFinalResult(IntermediateResult intermediateResult);

}
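
To make the contract concrete, here is a simplified MAX-style sketch of the aggregation path. It is illustrative only: it assumes the DoubleAggregationResultHolder mentioned later on this page and a getDoubleValuesSV() accessor on BlockValSet, so check the actual Pinot sources for the exact signatures. The group-by callbacks are sketched in the Map phase section below.

import java.util.Map;

// Simplified MAX-style sketch (not Pinot's actual implementation).
// Both the intermediate and the final result are a Double holding the running max.
public class MyMaxAggregationFunction implements AggregationFunction<Double, Double> {

  // Start below any real value so the first input always wins.
  private static final double DEFAULT_VALUE = Double.NEGATIVE_INFINITY;

  @Override
  public AggregationResultHolder createAggregationResultHolder() {
    return new DoubleAggregationResultHolder(DEFAULT_VALUE);
  }

  @Override
  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
      Map<String, BlockValSet> blockValSetMap) {
    // Assumed accessor: the argument column's values for this block, as doubles.
    double[] values = blockValSetMap.values().iterator().next().getDoubleValuesSV();
    double max = aggregationResultHolder.getDoubleResult();
    for (int i = 0; i < length; i++) {
      if (values[i] > max) {
        max = values[i];
      }
    }
    aggregationResultHolder.setValue(max);
  }

  @Override
  public Double extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
    return aggregationResultHolder.getDoubleResult();
  }

  @Override
  public Double merge(Double intermediateResult1, Double intermediateResult2) {
    return Math.max(intermediateResult1, intermediateResult2);
  }

  @Override
  public Double extractFinalResult(Double intermediateResult) {
    // For MAX, the intermediate and final result types are the same.
    return intermediateResult;
  }

  // createGroupByResultHolder, aggregateGroupBySV, aggregateGroupByMV and
  // extractGroupByResult are sketched in the Map phase section below.
}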

Before getting deeper into the implementation, it's important to understand how aggregation works in Pinot.

This is an advanced topic and assumes familiarity with Pinot concepts. All data in Pinot is stored in segments across multiple nodes. At a high level, the query plan comprises three phases:

1. Map phase

This phase works on the individual segments in Pinot.

  • Initialization: Depending on the query type, one of the following methods is invoked to set up the result holder. While having different methods and return types adds complexity, it helps performance.

    • AGGREGATION: createAggregationResultHolder. This must return an instance of type AggregationResultHolder. You can use either the DoubleAggregationResultHolder or the ObjectAggregationResultHolder.

    • GROUP BY: createGroupByResultHolder. This method must return an instance of type GroupByResultHolder. Depending on the type of result object, you might be able to use one of the existing implementations.

  • Callback: For every record that matches the filter condition in the query, one of the following methods is invoked, depending on the query type (aggregation vs. group by) and the column type (single-value vs. multi-value). Note that the method is invoked for a batch of records rather than once per row; this improves performance and allows the JVM to vectorize parts of the execution where possible. A sketch of the group-by callbacks follows this list.

    • AGGREGATION: aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<String,BlockValSet> blockValSetMap)

      • length: the length of the block, typically < 10k.

      • aggregationResultHolder: the object returned from createAggregationResultHolder.

      • blockValSetMap: a map of BlockValSets, keyed by the arguments to the aggregation function.

    • Group By Single Value: aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map<String, BlockValSet> blockValSets)

      • length: the length of the block, typically < 10k.

      • groupKeyArray: Pinot internally maintains a mapping from the group-by column values (which together form a unique key) to an int, and groupKeyArray holds that int key for each row in the block.

      • groupByResultHolder: the object returned from createGroupByResultHolder.

      • blockValSetMap: a map of BlockValSets, keyed by the arguments to the aggregation function.

    • Group By Multi Value: aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, Map<String, BlockValSet> blockValSets)

      • length: the length of the block, typically < 10k.

      • groupKeysArray: same int mapping as in the single-value case, except that a multi-value row can belong to several groups, so groupKeysArray holds all the group keys for each row in the block.

      • groupByResultHolder: the object returned from createGroupByResultHolder.

      • blockValSetMap: a map of BlockValSets, keyed by the arguments to the aggregation function.
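
Continuing the hypothetical MAX sketch from above, the group-by callbacks might look as follows. The only difference between the SV and MV variants is that a multi-value row contributes to every group it belongs to. DoubleGroupByResultHolder and its accessors are assumptions, so consult your Pinot version for the exact API.

  @Override
  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
    // Assumed holder: one running double per group key.
    return new DoubleGroupByResultHolder(initialCapacity, maxCapacity, DEFAULT_VALUE);
  }

  @Override
  public void aggregateGroupBySV(int length, int[] groupKeyArray,
      GroupByResultHolder groupByResultHolder, Map<String, BlockValSet> blockValSets) {
    double[] values = blockValSets.values().iterator().next().getDoubleValuesSV();
    for (int i = 0; i < length; i++) {
      int groupKey = groupKeyArray[i];
      if (values[i] > groupByResultHolder.getDoubleResult(groupKey)) {
        groupByResultHolder.setValueForKey(groupKey, values[i]);
      }
    }
  }

  @Override
  public void aggregateGroupByMV(int length, int[][] groupKeysArray,
      GroupByResultHolder groupByResultHolder, Map<String, BlockValSet> blockValSets) {
    double[] values = blockValSets.values().iterator().next().getDoubleValuesSV();
    for (int i = 0; i < length; i++) {
      // A multi-value row can belong to several groups; update each of them.
      for (int groupKey : groupKeysArray[i]) {
        if (values[i] > groupByResultHolder.getDoubleResult(groupKey)) {
          groupByResultHolder.setValueForKey(groupKey, values[i]);
        }
      }
    }
  }

  @Override
  public Double extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
    return groupByResultHolder.getDoubleResult(groupKey);
  }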

2. Combine phase

In this phase, the results from all segments within a single Pinot server are combined into an IntermediateResult. The type of IntermediateResult is determined by the generic type parameters declared in the AggregationFunction implementation.

public interface AggregationFunction<IntermediateResult, FinalResult extends Comparable> {

  IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);

}
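
For MAX, merging two partial results is just Math.max, as in the sketch earlier. Functions with richer state merge more interesting intermediates; below is a hedged sketch loosely modeled on the AvgPair idea mentioned in the reduce phase (the class and field names here are illustrative, not Pinot's exact code).

// Hypothetical intermediate result for an AVG-style function: the running
// sum and row count, merged pairwise across segments and servers.
public class AvgPair {
  private double _sum;
  private long _count;

  public AvgPair(double sum, long count) {
    _sum = sum;
    _count = count;
  }

  // Fold another partial result into this one.
  public void apply(AvgPair other) {
    _sum += other._sum;
    _count += other._count;
  }

  public double getSum() { return _sum; }
  public long getCount() { return _count; }
}

// In an AggregationFunction<AvgPair, Double> implementation:
@Override
public AvgPair merge(AvgPair intermediateResult1, AvgPair intermediateResult2) {
  intermediateResult1.apply(intermediateResult2);
  return intermediateResult1;
}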

3. Reduce phase

There are two steps in the reduce phase:

  • Merge all the IntermediateResults from the various servers using the merge function.

  • Extract the final result by invoking the extractFinalResult method. In most cases, FinalResult is the same type as IntermediateResult. AverageAggregationFunction is an example where the IntermediateResult (AvgPair) is different from the FinalResult (Double).

  FinalResult extractFinalResult(IntermediateResult intermediateResult);
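
Continuing the hypothetical AvgPair sketch from the combine phase, the division is deferred until extractFinalResult, because averages cannot be merged correctly without their counts:

// In an AggregationFunction<AvgPair, Double> implementation:
@Override
public Double extractFinalResult(AvgPair intermediateResult) {
  long count = intermediateResult.getCount();
  // The result for empty input is an implementation choice; 0.0 is used here.
  return count == 0L ? 0.0 : intermediateResult.getSum() / count;
}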

Segment Fetchers

When Pinot segment files are created in external systems (Hadoop, Spark, etc.), there are several ways to push that data to the Pinot controller and servers:

  1. Push the segment to a shared NFS volume and let Pinot pull the segment files from that NFS location.

  2. Push the segment to a web server and let Pinot pull the segment files from the web server via an http/https link.

  3. Push the segment to HDFS and let Pinot pull the segment files from HDFS using an hdfs:// location URI.

  4. Push the segment to another system and implement your own segment fetcher to pull data from that system.

The first two options are supported out of the box with the Pinot package. As long as your remote jobs send the Pinot controller the corresponding URI to the files, it will pick up the files and allocate them to the proper Pinot servers and brokers. To enable Pinot support for HDFS, you will need to provide the Hadoop configuration and the proper Hadoop dependencies.

HDFS segment fetcher configs

In your Pinot controller/server configuration, you will need to provide the following configs:

pinot.controller.segment.fetcher.hdfs.hadoop.conf.path=<file path to hadoop conf folder>

or

pinot.server.segment.fetcher.hdfs.hadoop.conf.path=<file path to hadoop conf folder>

This path should point to the local folder containing the core-site.xml and hdfs-site.xml files from your Hadoop installation.

pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>

or

pinot.server.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
pinot.server.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>

These two configs should be set to the corresponding Kerberos configuration if your Hadoop installation is secured with Kerberos. Please check the Hadoop Kerberos guide on how to generate Kerberos security identification.

You will also need to provide the proper Hadoop dependency jars from your Hadoop installation to your Pinot startup scripts.

Push HDFS segment to Pinot Controller

To push HDFS segment files to the Pinot controller, you just need to ensure you have the proper Hadoop configuration, as described in the previous section. Then your remote segment creation/push job can send the HDFS path of the newly created segment files to the Pinot controller and let it download the files.

For example, the following curl request notifies the controller to download the segment files to the proper table:

curl -X POST -H "UPLOAD_TYPE:URI" -H "DOWNLOAD_URI:hdfs://nameservice1/hadoop/path/to/segment/file.gz" -H "content-type:application/json" -d '' localhost:9000/segments

Implement your own segment fetcher for other systems

You can also implement your own segment fetcher for other file systems and load it into Pinot with an external jar. All you need to do is implement the SegmentFetcher interface and provide the config to the Pinot controller and server as follows:

pinot.controller.segment.fetcher.<protocol>.class=<class path to your implementation>

or

pinot.server.segment.fetcher.<protocol>.class=<class path to your implementation>

You can also provide other configs to your fetcher under the config root pinot.server.segment.fetcher.<protocol>.
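
As a rough illustration, a fetcher for a hypothetical mystore protocol might look like the sketch below. The SegmentFetcher method names shown here (init and fetchSegmentToLocal) are assumptions based on this page, so check the interface in your Pinot version; MyStoreClient is a made-up stand-in for your system's client library.

import java.io.File;
import java.util.Map;

// Hedged sketch of a custom fetcher for a hypothetical "mystore" protocol.
public class MyStoreSegmentFetcher implements SegmentFetcher {

  private MyStoreClient _client; // hypothetical client for the external system

  @Override
  public void init(Map<String, String> configs) {
    // Configs come from the pinot.server.segment.fetcher.mystore.* config root.
    _client = new MyStoreClient(configs.get("endpoint"));
  }

  @Override
  public void fetchSegmentToLocal(String uri, File tempFile) throws Exception {
    // Download the segment tarball identified by `uri` into `tempFile`;
    // Pinot loads the segment from that local file afterwards.
    _client.download(uri, tempFile);
  }
}

With this class on the classpath, the corresponding (illustrative) config would be:

pinot.server.segment.fetcher.mystore.class=com.example.MyStoreSegmentFetcher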