
Extending Pinot

Writing Custom Aggregation Function

Pinot has many built-in aggregation functions such as MIN, MAX, SUM, AVG, etc. See the PQL page for the full list of aggregation functions.

Adding a new AggregationFunction requires two things:

  • Implement the AggregationFunction interface and make it available on the classpath

  • Register the function in AggregationFunctionFactory. As of today, this requires a code change in Pinot, but we plan to add the ability to plug in functions without having to change Pinot code.

To get an overall idea, see the MAX aggregation function implementation. All other implementations can be found alongside it.

Let's look at the key methods to implement in AggregationFunction:

    interface AggregationFunction {

      AggregationResultHolder createAggregationResultHolder();

      GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity);

      void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<String, BlockValSet> blockValSetMap);

      void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
          Map<String, BlockValSet> blockValSets);

      void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
          Map<String, BlockValSet> blockValSets);

      IntermediateResult extractAggregationResult(AggregationResultHolder aggregationResultHolder);

      IntermediateResult extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey);

      IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);

      FinalResult extractFinalResult(IntermediateResult intermediateResult);

    }

Before getting into the implementation, it's important to understand how aggregation works in Pinot.

This is an advanced topic and assumes you know Pinot concepts. All the data in Pinot is stored in segments across multiple nodes. The query plan at a high level comprises three phases:

1. Map phase

This phase works on the individual segments in Pinot.

  • Initialization: Depending on the query type, one of the following methods is invoked to set up the result holder. While having different methods and return types adds complexity, it helps performance.

    • AGGREGATION: createAggregationResultHolder must return an instance of type AggregationResultHolder. You can use either the DoubleAggregationResultHolder or the ObjectAggregationResultHolder.

    • GROUP BY: createGroupByResultHolder must return an instance of type GroupByResultHolder. Depending on the type of result object, you might be able to use one of the existing implementations.

  • Callback: For every record that matches the filter condition in the query, one of the following methods is invoked, depending on the query type (aggregation vs. group by) and column type (single-value vs. multi-value). Note that we invoke this method for a batch of records instead of every row for performance reasons; this also allows the JVM to vectorize parts of the execution where possible.

    • AGGREGATION: aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<String, BlockValSet> blockValSetMap)

      • length: the length of the block, typically < 10k

      • aggregationResultHolder: the object returned from createAggregationResultHolder

      • blockValSetMap: map of BlockValSets, depending on the arguments to the aggregation function

    • Group By Single Value: aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map<String, BlockValSet> blockValSets)

      • length: the length of the block, typically < 10k

      • groupKeyArray: Pinot internally maintains a value-to-int mapping, and groupKeyArray holds these internal group keys. These values together form a unique key.

      • groupByResultHolder: the object returned from createGroupByResultHolder

      • blockValSets: map of BlockValSets, depending on the arguments to the aggregation function

    • Group By Multi Value: aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, Map<String, BlockValSet> blockValSets)

      • length: the length of the block, typically < 10k

      • groupKeysArray: Pinot internally maintains a value-to-int mapping, and groupKeysArray holds these internal group keys. These values together form a unique key.

      • groupByResultHolder: the object returned from createGroupByResultHolder

      • blockValSets: map of BlockValSets, depending on the arguments to the aggregation function

2. Combine phase

In this phase, the results from all segments within a single Pinot server are combined into an IntermediateResult. The type of IntermediateResult is based on the generic type defined in the AggregationFunction implementation.

    public interface AggregationFunction<IntermediateResult, FinalResult extends Comparable> {

      IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);

    }

3. Reduce phase

There are two steps in the Reduce phase:

  • Merge all the IntermediateResults from the various servers using the merge function.

  • Extract the final result by invoking the extractFinalResult method. In most cases, FinalResult is the same type as IntermediateResult. AverageAggregationFunction is an example where the IntermediateResult (AvgPair) is different from the FinalResult (Double).

      FinalResult extractFinalResult(IntermediateResult intermediateResult);
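Putting the phases together, below is a minimal sketch of a hypothetical SUMOFSQUARES aggregation function over a single double-valued column. It is illustrative only: it assumes the interface shown above, uses IntermediateResult = FinalResult = Double, omits the group-by callbacks and the registration step in AggregationFunctionFactory, and may need adjustment for the exact Pinot version you build against.

    // Illustrative sketch only -- not a drop-in implementation.
    // Pinot type imports (AggregationResultHolder, BlockValSet, ...) are omitted
    // because their packages differ across Pinot versions; java.util.Map is assumed.
    public class SumOfSquaresAggregationFunction {

      public AggregationResultHolder createAggregationResultHolder() {
        // A primitive double holder is enough, since the intermediate state is a single number.
        return new DoubleAggregationResultHolder(0.0);
      }

      public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
          Map<String, BlockValSet> blockValSetMap) {
        // Single-argument function: read the block of values for the only expression.
        double[] values = blockValSetMap.values().iterator().next().getDoubleValuesSV();
        double sum = aggregationResultHolder.getDoubleResult();
        for (int i = 0; i < length; i++) {
          sum += values[i] * values[i];
        }
        aggregationResultHolder.setValue(sum);
      }

      public Double extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
        return aggregationResultHolder.getDoubleResult();
      }

      // Combine/Reduce phases: partial sums of squares simply add up.
      public Double merge(Double intermediateResult1, Double intermediateResult2) {
        return intermediateResult1 + intermediateResult2;
      }

      public Double extractFinalResult(Double intermediateResult) {
        return intermediateResult;
      }
    }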

    Pluggable Streams

    Prior to commit ba9f2d, Pinot was only able to consume from a Kafka stream.

    Pinot now enables its users to write plug-ins to consume from pub-sub streams other than Kafka (see Issue #2583).

    Some of the streams for which plug-ins can be added are:

    • Amazon Kinesis

    • Azure Event Hubs

    • LogDevice

    • Pravega

    • Pulsar

    You may encounter some limitations either in Pinot or in the stream system while developing plug-ins. Please feel free to get in touch with us when you start writing a stream plug-in, and we can help you out. We are open to receiving PRs in order to improve these abstractions if they do not work for a certain stream implementation.

    Refer to Consuming and Indexing rows in Realtime for details on how Pinot consumes streaming data.

    Requirements to support Stream Level (High Level) consumers

    The stream should provide the following guarantees:

    • Exactly once delivery (unless restarting from a checkpoint) for each consumer of the stream.

    • (Optionally) support a mechanism to split events (in some arbitrary fashion) so that each event in the stream is delivered to exactly one host out of a set of hosts.

    • Provide ways to save a checkpoint for the data consumed so far. If the stream is partitioned, then this checkpoint is a vector of checkpoints for events consumed from individual partitions.

    • The checkpoints should be recorded only when Pinot makes a call to do so.

    • The consumer should be able to start consumption from one of:

      • latest available data

      • earliest available data

      • last saved checkpoint

    Requirements to support Partition Level (Low Level) consumers

    While consuming rows at a partition level, the stream should support the following properties:

    • Stream should provide a mechanism to get the current number of partitions.

    • Each event in a partition should have a unique offset that is not more than 64 bits long.

    • Refer to a partition as a number not exceeding 32 bits long.

    • Stream should provide the following mechanisms to get an offset for a given partition of the stream:

      • get the offset of the oldest event available (assuming events are aged out periodically) in the partition

      • get the offset of the most recent event published in the partition

      • (optionally) get the offset of an event that was published at a specified time

    • Stream should provide a mechanism to consume a set of events from a partition starting from a specified offset.

    • Pinot assumes that the offsets of incoming events are monotonically increasing; i.e., if Pinot consumes an event at offset o1, then the offset o2 of the following event should be such that o2 > o1.

    In addition, we have an operational requirement that the number of partitions should not be reduced over time.

    Stream plug-in implementation

    In order to add a new type of stream (say, Foo), implement the following classes:

    1. FooConsumerFactory extends StreamConsumerFactory

    2. FooPartitionLevelConsumer implements PartitionLevelConsumer

    3. FooStreamLevelConsumer implements StreamLevelConsumer

    4. FooMetadataProvider implements StreamMetadataProvider

    5. FooMessageDecoder implements StreamMessageDecoder

    Depending on stream level or partition level, your implementation needs to include StreamLevelConsumer or PartitionLevelConsumer.

    The properties for the stream implementation are to be set in the table configuration, inside the streamConfigs section.

    Use the streamType property to define the stream type. For example, for the implementation of stream foo, set the property "streamType" : "foo".

    The rest of the configuration properties for your stream should be set with the prefix "stream.foo". Be sure to use the same suffix for (see examples below):

    • topic

    • consumer type

    • stream consumer factory

    • offset

    • decoder class name

    • decoder properties

    • connection timeout

    • fetch timeout

    All values should be strings. For example:

    "streamType" : "foo",
    "stream.foo.topic.name" : "SomeTopic",
    "stream.foo.consumer.type": "LowLevel",
    "stream.foo.consumer.factory.class.name": "fully.qualified.pkg.ConsumerFactoryClassName",
    "stream.foo.consumer.prop.auto.offset.reset": "largest",
    "stream.foo.decoder.class.name" : "fully.qualified.pkg.DecoderClassName",
    "stream.foo.decoder.prop.a.decoder.property" : "decoderPropValue",
    "stream.foo.connection.timeout.millis" : "10000", // default 30_000
    "stream.foo.fetch.timeout.millis" : "10000" // default 5_000

    You can have additional properties that are specific to your stream. For example:

    "stream.foo.some.buffer.size" : "24g"

    In addition to these properties, you can define thresholds for the consuming segments:

    • rows threshold

    • time threshold

    The properties for the thresholds are as follows:

    "realtime.segment.flush.threshold.size" : "100000"
    "realtime.segment.flush.threshold.time" : "6h"

    An example of this implementation can be found in the KafkaConsumerFactory, which is an implementation for the Kafka stream.
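    As a rough illustration, a skeleton for the factory class of a hypothetical stream named foo might look like the sketch below. The Foo* types are placeholders, the method names and signatures are indicative only, and the _streamConfig field stands for however your factory keeps hold of the stream configs; follow the actual StreamConsumerFactory interface in your Pinot version for the exact contract.

    // Sketch of a stream plug-in factory for a hypothetical stream "foo".
    // Imports of the Pinot stream SPI types are omitted; their package differs
    // across Pinot versions, and the method shapes below are indicative only.
    public class FooConsumerFactory extends StreamConsumerFactory {

      // Low level: create a consumer for one partition of the stream.
      public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
        return new FooPartitionLevelConsumer(clientId, partition, _streamConfig);
      }

      // Expose stream metadata (partition count, offsets) to Pinot.
      public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
        return new FooMetadataProvider(clientId, _streamConfig);
      }

      // A high level (stream level) consumer is created the same way via
      // createStreamLevelConsumer(...), and the message decoder is picked up from
      // the class configured in stream.foo.decoder.class.name.
    }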

    Kafka 2.x Plugin

    Pinot provides stream plugin support for the Kafka 2.x version. Although the version used in this implementation is Kafka 2.0.0, it is possible to compile it with a higher Kafka lib version, e.g. 2.1.1.

    How to build and release Pinot package with Kafka 2.x connector

    mvn clean package -DskipTests -Pbin-dist -Dkafka.version=2.0

    How to use Kafka 2.x connector

    • Use Kafka Stream (High) Level Consumer

    Below is a sample streamConfigs used to create a realtime table with the Kafka Stream (High) level consumer.

    The Kafka 2.x HLC consumer uses org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory in config stream.kafka.consumer.factory.class.name.

    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "highLevel",
      "stream.kafka.topic.name": "meetupRSVPEvents",
      "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
      "stream.kafka.zk.broker.url": "localhost:2191/kafka",
      "stream.kafka.hlc.bootstrap.server": "localhost:19092"
    }

    • Use Kafka Partition (Low) Level Consumer

    Below is a sample table config used to create a realtime table with the Kafka Partition (Low) level consumer:

    {
      "tableName": "meetupRsvp",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "mtime",
        "timeType": "MILLISECONDS",
        "segmentPushType": "APPEND",
        "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
        "schemaName": "meetupRsvp",
        "replication": "1",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "LowLevel",
          "stream.kafka.topic.name": "meetupRSVPEvents",
          "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
          "stream.kafka.zk.broker.url": "localhost:2191/kafka",
          "stream.kafka.broker.list": "localhost:19092"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }

    Please note:

    1. Config replicasPerPartition under segmentsConfig is required to specify table replication.

    2. Config stream.kafka.consumer.type should be specified as LowLevel to use the partition level consumer. (The use of simple instead of LowLevel is deprecated.)

    3. Configs stream.kafka.zk.broker.url and stream.kafka.broker.list are required under tableIndexConfig.streamConfigs to provide the Kafka related information.

    Upgrade from Kafka 0.9 connector to Kafka 2.x connector

    • Update table config for both high level and low level consumer: update config stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.

    • If using the Stream (High) level consumer: please also add config stream.kafka.hlc.bootstrap.server into tableIndexConfig.streamConfigs. This config should be the URI of the Kafka broker list, e.g. localhost:9092.

    How to use this plugin with higher Kafka version?

    This connector is also suitable for Kafka lib versions higher than 2.0.0. In pinot-connector-kafka-2.0/pom.xml, changing kafka.lib.version from 2.0.0 to 2.1.1 will make this connector work with Kafka 2.1.1.


    Segment Fetchers

    When Pinot segment files are created in external systems (Hadoop/Spark/etc.), there are several ways to push that data to the Pinot Controller and Server:

    1. Push segment to shared NFS and let Pinot pull segment files from the location of that NFS.

    2. Push segment to a Web server and let Pinot pull segment files from the Web server with an http/https link.

    3. Push segment to HDFS and let Pinot pull segment files from HDFS with an hdfs location URI.

    4. Push segment to another system and implement your own segment fetcher to pull data from that system.

    The first two options should be supported out of the box with the Pinot package. As long as your remote jobs send the Pinot controller the corresponding URI to the files, it will pick up the file and allocate it to the proper Pinot servers and brokers. To enable Pinot support for HDFS, you will need to provide the Pinot Hadoop configuration and the proper Hadoop dependencies.

    HDFS segment fetcher configs

    In your Pinot controller/server configuration, you will need to provide the following configs:

    pinot.controller.segment.fetcher.hdfs.hadoop.conf.path=<file path to hadoop conf folder>

    or

    pinot.server.segment.fetcher.hdfs.hadoop.conf.path=<file path to hadoop conf folder>

    This path should point to the local folder containing the core-site.xml and hdfs-site.xml files from your Hadoop installation.

    pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
    pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>

    or

    pinot.server.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
    pinot.server.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>

    These two configs should be the corresponding Kerberos configuration if your Hadoop installation is secured with Kerberos. Please check the Hadoop Kerberos guide on how to generate Kerberos security identification.

    You will also need to provide the proper Hadoop dependency jars from your Hadoop installation to your Pinot startup scripts.

    Push HDFS segment to Pinot Controller

    To push HDFS segment files to the Pinot controller, you just need to ensure you have the proper Hadoop configuration, as mentioned in the previous section. Then your remote segment creation/push job can send the HDFS path of your newly created segment files to the Pinot Controller and let it download the files.

    For example, the following curl request to the Controller will notify it to download segment files to the proper table:

    curl -X POST -H "UPLOAD_TYPE:URI" -H "DOWNLOAD_URI:hdfs://nameservice1/hadoop/path/to/segment/file.gz" -H "content-type:application/json" -d '' localhost:9000/segments

    Implement your own segment fetcher for other systems

    You can also implement your own segment fetchers for other file systems and load them into the Pinot system with an external jar. All you need to do is implement a class that extends the SegmentFetcher interface and provide the config to the Pinot Controller and Server as follows:

    pinot.controller.segment.fetcher.<protocol>.class=<class path to your implementation>

    or

    pinot.server.segment.fetcher.<protocol>.class=<class path to your implementation>

    You can also provide other configs to your fetcher under the config-root pinot.server.segment.fetcher.<protocol>
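    For illustration, a custom fetcher for a hypothetical foofs:// protocol could look roughly like the sketch below. The init/fetch shape mirrors the description above, but the exact SegmentFetcher method signatures vary between Pinot versions, so treat this as a shape rather than a drop-in class; the openRemoteStream helper is purely hypothetical.

    // Rough sketch of a segment fetcher for a hypothetical "foofs" protocol.
    // The SegmentFetcher interface differs slightly across Pinot versions; the
    // init/fetch method shapes below are indicative only.
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.commons.configuration.Configuration;

    public class FooFsSegmentFetcher /* implements SegmentFetcher -- see the interface for the full method set */ {

      private String _endpoint;

      // Called once with the configs set under pinot.*.segment.fetcher.foofs.*
      public void init(Configuration fetcherConfig) {
        _endpoint = fetcherConfig.getString("endpoint");
      }

      // Download the remote segment identified by uri into the local dest file.
      public void fetchSegmentToLocal(String uri, File dest) throws Exception {
        try (InputStream in = openRemoteStream(_endpoint, uri);
            OutputStream out = new FileOutputStream(dest)) {
          in.transferTo(out);
        }
      }

      private InputStream openRemoteStream(String endpoint, String uri) throws IOException {
        // Hypothetical helper: talk to the foofs service and return the segment bytes.
        throw new UnsupportedOperationException("implement for your storage system");
      }
    }

    The fetcher would then be registered via pinot.controller.segment.fetcher.foofs.class (and the server equivalent), as described above.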

    Pluggable Storage

    Pinot enables its users to write a PinotFS abstraction layer to store data in a data layer of their choice for realtime and offline segments.

    Some examples of storage backends (other than local storage) currently supported are:

    • HadoopFS

    • Azure Data Lake

    If the above two filesystems do not meet your needs, you can extend the current PinotFS to customize it for your needs.

    New Storage Type implementation

    In order to add a new type of storage backend (say, Amazon S3), implement the following class:

    S3FS extends PinotFS

    Configurations for Realtime Tables

    The example here uses the existing org.apache.pinot.filesystem.HadoopPinotFS to store realtime segments in an HDFS filesystem. In the Pinot controller config, add the following new configs:

    "controller.data.dir": "SET_TO_YOUR_HDFS_ROOT_DIR"
    "controller.local.temp.dir": "SET_TO_A_LOCAL_FILESYSTEM_DIR"
    "pinot.controller.storage.factory.class.hdfs": "org.apache.pinot.filesystem.HadoopPinotFS"
    "pinot.controller.storage.factory.hdfs.hadoop.conf.path": "SET_TO_YOUR_HDFS_CONFIG_DIR"
    "pinot.controller.storage.factory.hdfs.hadoop.kerberos.principle": "SET_IF_YOU_USE_KERBEROS"
    "pinot.controller.storage.factory.hdfs.hadoop.kerberos.keytab": "SET_IF_YOU_USE_KERBEROS"
    "controller.enable.split.commit": "true"

    In the Pinot server config, add the following new configs:

    "pinot.server.instance.enable.split.commit": "true"

    Note: currently there is a bug in the controller (issue https://github.com/apache/incubator-pinot/issues/3847); for now you can cherry-pick the PR https://github.com/apache/incubator-pinot/pull/3849 to fix the issue, as it has already been tested. The PR is under review now.

    Configurations for Offline Tables

    These properties are to be set in your controller and server configurations.

    In your controller and server configs, please set the FS class you would like to support: set pinot.controller.storage.factory.class.${YOUR_URI_SCHEME} to the full class name of the FS you would like to include.

    You also need to configure pinot.controller.local.temp.dir for the local dir on the controller machine.

    For filesystem-specific configs, you can pass in the following with either the pinot.controller prefix or the pinot.server prefix.

    All the following configs need to be prefixed with storage.factory.

    AzurePinotFS requires the following configs according to your environment:

    adl.accountId, adl.authEndpoint, adl.clientId, adl.clientSecret

    Sample Controller Config

    "pinot.controller.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
    "pinot.controller.storage.factory.adl.accountId": "xxxx"
    "pinot.controller.storage.factory.adl.authEndpoint": "xxxx"
    "pinot.controller.storage.factory.adl.clientId": "xxxx"
    "pinot.controller.segment.fetcher.protocols": "adl"

    Sample Server Config

    "pinot.server.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
    "pinot.server.storage.factory.adl.accountId": "xxxx"
    "pinot.server.storage.factory.adl.authEndpoint": "xxxx"
    "pinot.server.storage.factory.adl.clientId": "xxxx"
    "pinot.server.segment.fetcher.protocols": "adl"

    You can find the parameters in your account as described here: https://stackoverflow.com/questions/56349040/what-is-clientid-authtokenendpoint-clientkey-for-accessing-azure-data-lake

    Please also make sure to set the following config with the value "adl":

    "segment.fetcher.protocols" : "adl"

    To see how to upload segments to different storage systems, check the Segment Fetchers section.

    HadoopPinotFS requires the following configs according to your environment:

    hadoop.kerberos.principle, hadoop.kerberos.keytab, hadoop.conf.path

    Please make sure to also set the following config with the value "hdfs":

    "segment.fetcher.protocols" : "hdfs"
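    As a starting point, a new backend is essentially a subclass of PinotFS whose methods translate URI operations into calls against your storage API. The sketch below shows the general shape for a hypothetical S3FS: only a couple of representative operations are shown, the storage client is a placeholder interface defined in the snippet, and the full set of abstract methods to override is defined by the PinotFS class in your Pinot version.

    // Rough sketch of a custom PinotFS backend for a hypothetical "s3" scheme.
    // PinotFS defines more abstract methods (copy, move, delete, listFiles, ...)
    // that a real implementation must also override; only a few representative
    // operations are sketched here, with indicative signatures.
    import java.io.File;
    import java.io.IOException;
    import java.net.URI;
    import org.apache.commons.configuration.Configuration;

    public class S3FS /* extends PinotFS */ {

      // Placeholder for whatever storage client your backend uses.
      interface StorageClient {
        boolean objectExists(String bucket, String key);
        void download(String bucket, String key, File dest) throws IOException;
      }

      private StorageClient _client;

      // Receives the configs set under pinot.controller/server.storage.factory.s3.*
      public void init(Configuration config) {
        _client = createClient(config);
      }

      // Return true if the object identified by the URI exists.
      public boolean exists(URI uri) throws IOException {
        return _client.objectExists(uri.getHost(), uri.getPath());
      }

      // Download a remote segment to a local file so the server can load it.
      public void copyToLocalFile(URI srcUri, File dstFile) throws Exception {
        _client.download(srcUri.getHost(), srcUri.getPath(), dstFile);
      }

      private StorageClient createClient(Configuration config) {
        // Wire up your storage SDK here using the supplied configs.
        throw new UnsupportedOperationException("implement for your storage system");
      }
    }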

    Record Reader

    Pinot supports indexing data from various file formats. To support reading from a file format, a record reader needs to be provided to read the file and convert records into the general format which the indexing engine can understand. The record reader serves as the connector from each individual file format to the Pinot record format.

    Pinot package provides the following record readers out of the box:

    • Avro record reader: record reader for Avro format files

  • CSV record reader: record reader for CSV format files
  • JSON record reader: record reader for JSON format files

  • ORC record reader: record reader for ORC format files

  • Thrift record reader: record reader for Thrift format files

  • Pinot segment record reader: record reader for Pinot segment

    Initialize Record Reader

    To initialize a record reader, the data file and table schema should be provided (for the Pinot segment record reader, only the index directory needs to be provided because the schema can be derived from the segment). The output record will follow the table schema provided.

    For the Avro/JSON/ORC/Pinot segment record readers, no extra configuration is required, as column names and multi-values are embedded in the data file.

    For the CSV/Thrift record readers, extra configuration might need to be provided to determine the column names and multi-values for the data.

    CSV Record Reader Config

    The CSV record reader config contains the following settings:

    • Header: the header for the CSV file (column names)

    • Column delimiter: delimiter for each column

    • Multi-value delimiter: delimiter for each value for a multi-valued column

    If no config is provided, the default settings are used:

    • Use the first row in the data file as the header

    • Use ‘,’ as the column delimiter

    • Use ‘;’ as the multi-value delimiter
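    To make the defaults concrete, the plain-Java snippet below (independent of any Pinot API) shows how a row with a multi-valued tags column is interpreted with ',' as the column delimiter and ';' as the multi-value delimiter.

    // Header: id,name,tags -- where 'tags' is a multi-valued column.
    public class CsvDelimiterExample {
      public static void main(String[] args) {
        String row = "42,segment-a,red;green;blue";
        String[] columns = row.split(",");     // default column delimiter ','
        String[] tags = columns[2].split(";"); // default multi-value delimiter ';'
        System.out.println(columns[1] + " has " + tags.length + " tag values");
      }
    }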

    Thrift Record Reader Config

    The Thrift record reader config is mandatory. It contains the Thrift class name for the record reader to de-serialize the Thrift objects.

    ORC Record Reader Config

    The following property is to be set during segment generation in your Hadoop properties.

    record.reader.path: ${FULL_PATH_OF_YOUR_RECORD_READER_CLASS}

    For ORC, it would be:

    record.reader.path: org.apache.pinot.orc.data.readers.ORCRecordReader

    Implement Your Own Record Reader

    For other file formats, we provide a general interface for record readers - RecordReader. To index a file into a Pinot segment, simply implement the interface and plug it into the index engine - SegmentCreationDriverImpl. We use a two-pass algorithm to index the file into a Pinot segment, hence the rewind() method is required for the record reader.

    Generic Row

    GenericRow is the record abstraction which the index engine can read and index with. It is a map from column name (String) to column value (Object). For a multi-valued column, the value should be an object array (Object[]).
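    For example, a bare-bones reader for a line-oriented custom format might look like the sketch below. The method set (init, hasNext, next, rewind, close) follows the description above, but the exact RecordReader signatures differ across Pinot versions, so treat the skeleton as illustrative; the "key=value" format is made up for this example.

    // Illustrative skeleton of a custom record reader for a toy "key=value" line format.
    // Pinot type imports (GenericRow, RecordReader, Schema) are omitted because their
    // packages differ across Pinot versions; the method shapes are indicative only.
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;

    public class KeyValueRecordReader /* implements RecordReader */ {

      private File _dataFile;
      private BufferedReader _reader;
      private String _nextLine;

      public void init(File dataFile) throws IOException {
        _dataFile = dataFile;
        rewind();
      }

      public boolean hasNext() {
        return _nextLine != null;
      }

      // Parse one line like "country=US;clicks=3" into a GenericRow (column -> value).
      public GenericRow next(GenericRow reuse) throws IOException {
        for (String pair : _nextLine.split(";")) {
          String[] kv = pair.split("=", 2);
          reuse.putField(kv[0], kv[1]);  // single-valued; use an Object[] for multi-valued columns
        }
        _nextLine = _reader.readLine();
        return reuse;
      }

      // Required because indexing makes two passes over the data.
      public void rewind() throws IOException {
        if (_reader != null) {
          _reader.close();
        }
        _reader = new BufferedReader(new FileReader(_dataFile));
        _nextLine = _reader.readLine();
      }

      public void close() throws IOException {
        _reader.close();
      }
    }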

    Contracts for Record Reader

    There are several contracts for record readers that developers should follow when implementing their own record readers:

    • The output GenericRow should follow the table schema provided, in the sense that:

      • All the columns in the schema should be preserved (if a column does not exist in the original record, put the default value instead)

      • Columns not in the schema should not be included

      • Values for the column should follow the field spec from the schema (data type, single-valued/multi-valued)

    • For the time column (refer to TimeFieldSpec), the record reader should be able to read both incoming and outgoing time (we allow conversion from incoming time, the time value in the original data, to outgoing time, the time value stored in Pinot, during index creation). We keep both incoming and outgoing time columns to handle cases where the input file contains time values that are already converted.

      • If the incoming and outgoing time column names are the same, use the incoming time field spec

      • If the incoming and outgoing time column names are different, put both of them in the time field spec