
Segment Fetchers

When Pinot segment files are created in external systems (Hadoop, Spark, etc.), there are several ways to push that data to the Pinot Controller and Server:

  1. Push segments to a shared NFS and let Pinot pull the segment files from that NFS location.

  2. Push segments to a Web server and let Pinot pull the segment files from the Web server with an http/https link.

  3. Push segments to HDFS and let Pinot pull the segment files from HDFS with an hdfs location URI.

  4. Push segments to another system and implement your own segment fetcher to pull data from that system.

The first two options are supported out of the box with the Pinot package. As long as your remote jobs send the Pinot controller the corresponding URI to the files, it will pick up the files and allocate them to the proper Pinot servers and brokers. To enable Pinot support for HDFS, you will need to provide the Pinot Hadoop configuration and proper Hadoop dependencies.

HDFS segment fetcher configs

In your Pinot controller/server configuration, you will need to provide the following configs:

pinot.controller.segment.fetcher.hdfs.hadoop.conf.path=<file path to hadoop conf folder>

or

pinot.server.segment.fetcher.hdfs.hadoop.conf.path=<file path to hadoop conf folder>

This path should point to the local folder containing the core-site.xml and hdfs-site.xml files from your Hadoop installation.

pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>

or

pinot.server.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
pinot.server.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>

These two configs should be set to the corresponding Kerberos configuration if your Hadoop installation is secured with Kerberos. Please check the Hadoop Kerberos guide on how to generate Kerberos security identification.

You will also need to provide proper Hadoop dependency jars from your Hadoop installation to your Pinot startup scripts.

Push HDFS segment to Pinot Controller

To push HDFS segment files to the Pinot controller, you just need to ensure you have the proper Hadoop configuration, as mentioned in the previous part. Then your remote segment creation/push job can send the HDFS path of your newly created segment files to the Pinot Controller and let it download the files.

For example, the following curl request to the Controller will notify it to download the segment files to the proper table:

curl -X POST -H "UPLOAD_TYPE:URI" -H "DOWNLOAD_URI:hdfs://nameservice1/hadoop/path/to/segment/file.gz" -H "content-type:application/json" -d '' localhost:9000/segments

Implement your own segment fetcher for other systems

You can also implement your own segment fetchers for other file systems and load them into Pinot with an external jar. All you need to do is implement a class that extends the SegmentFetcher interface and provide the configs to Pinot Controller and Server as follows:

pinot.controller.segment.fetcher.<protocol>.class=<class path to your implementation>

or

pinot.server.segment.fetcher.<protocol>.class=<class path to your implementation>

You can also provide other configs to your fetcher under the config root pinot.server.segment.fetcher.<protocol>.
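The exact interface to extend is the SegmentFetcher interface mentioned above; the sketch below only illustrates the shape such a fetcher takes - initialize from configs, then pull a segment URI down to a local file. The class and method names here are illustrative, not the actual interface, so check SegmentFetcher in your Pinot version for the methods to implement.

import java.io.File;
import java.io.InputStream;
import java.net.URI;
import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Map;

// Hypothetical example: mirrors what a custom segment fetcher has to do, but is NOT the
// actual SegmentFetcher interface -- implement/override the methods of that interface instead.
public class MyHttpLikeSegmentFetcher {
  private int connectTimeoutMillis;

  // Would be driven by the pinot.server.segment.fetcher.<protocol>.* configs.
  public void init(Map<String, String> configs) {
    connectTimeoutMillis = Integer.parseInt(configs.getOrDefault("connect.timeout.millis", "10000"));
  }

  // Download the remote segment file to the given local destination.
  public void fetchSegmentToLocal(URI segmentUri, File destination) throws Exception {
    URLConnection connection = segmentUri.toURL().openConnection();
    connection.setConnectTimeout(connectTimeoutMillis);
    try (InputStream in = connection.getInputStream()) {
      Files.copy(in, destination.toPath(), StandardCopyOption.REPLACE_EXISTING);
    }
  }
}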


Code Setup

Dev Environment Setup

To contribute to Pinot, please follow the instructions below.

Git

Pinot uses git for source code management. If you are new to Git, it will be good to review the basics of Git and common tasks like managing branches and rebasing.

Getting the Source Code

Create a fork

To limit the number of branches created on the Apache Pinot repository, we recommend that you create a fork by clicking on the fork button. Read more about the fork workflow here.

Clone the repository locally

$ mkdir workspace
$ cd workspace
$ git clone git@github.com:<github username>/pinot.git
$ cd pinot
# set upstream
$ git remote add upstream https://github.com/apache/incubator-pinot
# check that the upstream shows up correctly
$ git remote -v

Maven

Pinot is a Maven project and familiarity with Maven will help you work with Pinot code. If you are new to Maven, you can read about Maven and get a quick overview here.

Run the following maven command to set up the project:

$ mvn install package -DskipTests -Pbin-dist -DdownloadSources -DdownloadJavadocs

Setup IDE

Import the project into your favorite IDE. Set up the stylesheet according to your IDE. We have provided instructions for IntelliJ and Eclipse. If you are using another IDE, please ensure you use a stylesheet based on this one.

Intellij

To import the Pinot stylesheet, launch IntelliJ and navigate to Preferences (on Mac) or Settings (on Linux).

  • Navigate to Editor -> Code Style -> Java

  • Select Import Scheme -> IntelliJ IDEA code style XML

  • Choose codestyle-intellij.xml from the incubator-pinot/config folder of your workspace. Click Apply.

Eclipse

To import the Pinot stylesheet, launch Eclipse and navigate to Preferences (on Mac) or Settings (on Linux).

  • Navigate to Java -> Code Style -> Formatter

  • Choose codestyle-eclipse.xml from the incubator-pinot/config folder of your workspace. Click Apply.


Pluggable Storage

Pinot enables its users to write a PinotFS abstraction layer to store data in a data layer of their choice for realtime and offline segments.

Some examples of storage backends (other than local storage) currently supported are:

  • HadoopFS

  • Azure Data Lake

If the above two filesystems do not meet your needs, you can extend the current PinotFS to customize it for your needs.

New Storage Type implementation

In order to add a new type of storage backend (say, Amazon S3), implement the following class:

S3FS extends PinotFS
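A new backend is then wired in through the same config pattern used for HDFS and ADL below. As a hypothetical example for an s3 URI scheme (the class name and the property keys under the s3 prefix are made up for illustration and depend on your implementation):

"pinot.controller.storage.factory.class.s3": "com.mycompany.pinot.S3FS"
"pinot.controller.storage.factory.s3.region": "us-east-1"
"pinot.controller.segment.fetcher.protocols": "s3"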

Configurations for Realtime Tables

The example here uses the existing org.apache.pinot.filesystem.HadoopPinotFS to store realtime segments in an HDFS filesystem. In the Pinot controller config, add the following new configs:

"controller.data.dir": "SET_TO_YOUR_HDFS_ROOT_DIR"
"controller.local.temp.dir": "SET_TO_A_LOCAL_FILESYSTEM_DIR"
"pinot.controller.storage.factory.class.hdfs": "org.apache.pinot.filesystem.HadoopPinotFS"
"pinot.controller.storage.factory.hdfs.hadoop.conf.path": "SET_TO_YOUR_HDFS_CONFIG_DIR"
"pinot.controller.storage.factory.hdfs.hadoop.kerberos.principle": "SET_IF_YOU_USE_KERBEROS"
"pinot.controller.storage.factory.hdfs.hadoop.kerberos.keytab": "SET_IF_YOU_USE_KERBEROS"
"controller.enable.split.commit": "true"

In the Pinot server config, add the following new config:

"pinot.server.instance.enable.split.commit": "true"

Note: there is currently a bug in the controller (issue https://github.com/apache/incubator-pinot/issues/3847); for now you can cherry-pick the PR https://github.com/apache/incubator-pinot/pull/3849, which has already been tested, to fix the issue. The PR is under review now.

Configurations for Offline Tables

These properties for the storage implementation are to be set in your controller and server configurations.

In your controller and server configs, please set the FS class you would like to support: set pinot.controller.storage.factory.class.${YOUR_URI_SCHEME} to the full path of the FS class you would like to include.

You also need to configure pinot.controller.local.temp.dir for the local dir on the controller machine.

For filesystem specific configs, you can pass in the following with either the pinot.controller prefix or the pinot.server prefix.

All the following configs need to be prefixed with storage.factory.

AzurePinotFS requires the following configs according to your environment:

adl.accountId, adl.authEndpoint, adl.clientId, adl.clientSecret

Sample Controller Config

"pinot.controller.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
"pinot.controller.storage.factory.adl.accountId": "xxxx"
"pinot.controller.storage.factory.adl.authEndpoint": "xxxx"
"pinot.controller.storage.factory.adl.clientId": "xxxx"
"pinot.controller.segment.fetcher.protocols": "adl"

Sample Server Config

"pinot.server.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
"pinot.server.storage.factory.adl.accountId": "xxxx"
"pinot.server.storage.factory.adl.authEndpoint": "xxxx"
"pinot.server.storage.factory.adl.clientId": "xxxx"
"pinot.server.segment.fetcher.protocols": "adl"

You can find these parameters in your Azure account as described here: https://stackoverflow.com/questions/56349040/what-is-clientid-authtokenendpoint-clientkey-for-accessing-azure-data-lake

Please also make sure to set the following config with the value "adl":

"segment.fetcher.protocols" : "adl"

To see how to upload segments to different storage systems, check the Segment Fetchers section.

HadoopPinotFS requires the following configs according to your environment:

hadoop.kerberos.principle, hadoop.kerberos.keytab, hadoop.conf.path

Please make sure to also set the following config with the value "hdfs":

"segment.fetcher.protocols" : "hdfs"


Update Documentation

Pinot documentation is powered by Gitbook, and a bi-directional Github integration is set up to back up all the changes.

The git repo is here: https://github.com/pinot-contrib/pinot-docs

For Pinot contributors, there are two main ways to update the documentation.

Submit a Pull Request

This is the usual way of updating documentation.

You can check out the pinot-docs repo and modify the documentation accordingly, then submit a pull request for review.

Once the PR is merged, the changes will automatically be applied to the corresponding Gitbook pages.

Please note that all Gitbook documentation follows Markdown syntax.

Directly Edit on Gitbook

Once granted edit permission, contributors can edit any page on Gitbook and then save and merge the changes by themselves. This is one example commit on the Github repo reflecting updates coming from Gitbook: Adding Update Document Page Commit.

Usually we grant edit permission to committers and active contributors.

Please contact the admins (email the Pinot dev mailing list with the content you want to add) to ask for edit permission for the Pinot Gitbook.

Once granted the permission, you can work directly in the Pinot Gitbook UI to modify the documentation and merge changes.


Contribution Guidelines

Before you begin to contribute, make sure you have reviewed the Dev Environment Setup and Code Modules and Organization sections and that you have created your own fork of the Pinot source code.

Create a design document

If your change is relatively minor, you can skip this step. If you are adding a new major feature, we suggest that you add a design document and solicit comments from the community before submitting any code.

Here is a list of current design documents.

Create an issue for the change

Create a Pinot issue for the change you would like to make. Provide information on why the change is needed and how you plan to address it. Use the conversations on the issue as a way to validate assumptions and the right way to proceed. Be sure to review the sections on Backward and Forward compatibility changes and External libraries.

If you have a design document, please refer to the design documents in your Issue. You may even want to create multiple issues depending on the extent of your change.

Once you are clear about what you want to do, proceed with the next steps listed below.

Create a branch for your change

$ cd pinot
#
# ensure you are starting from the latest code base
# the following steps ensure your fork's (origin's) master is up-to-date
#
$ git fetch upstream
$ git checkout master
$ git merge upstream/master
# create a branch for your issue
$ git checkout -b <your issue branch>

Make the necessary changes. If the changes you plan to make are too big, make sure you break them down into smaller tasks.

Making the changes

Follow the recommendations/best-practices noted here when you are making changes.

Code documentation

Please ensure your code is adequately documented. Some things to consider for documentation:

  • Always include class level java docs. At the top class level, we are looking for information about what functionality is provided by the class, what state is maintained by the class, whether there are concurrency/thread-safety concerns and any exceptional behavior that the class might exhibit.

  • Document public methods and their parameters.

Logging

  • Ensure there is adequate logging for positive paths as well as exceptional paths. As a corollary to this, ensure logs are not noisy.

  • Do not use System.out.println to log messages. Use the slf4j loggers.

  • Use logging levels correctly: set level to debug for verbose logs useful only for debugging.

  • Do not log stack traces via the printStackTrace method of the exception.

Exceptions and Exception-Handling

  • Where possible, throw specific exceptions, preferably checked exceptions, so the callers can easily determine what the erroneous conditions that need to be handled are.

  • Avoid catching broad exceptions (i.e., catch (Exception e) blocks), except when this is in the run() method of a thread/runnable.

Current Pinot code does not strictly adhere to this, but we would like to change this over time and adopt best practices around exception handling.

Backward and Forward compatibility changes

If you are making any changes to state stored, either in Zookeeper or in segments, make sure you consider both backward and forward compatibility issues.

  • For backward compatibility, consider cases where one component is using the new version and another is still on the old version. E.g., when the request format between broker and server is updated, consider resulting behaviors when a new broker is talking to an older server. Will it break?

  • For forward compatibility, consider rollback cases. E.g., consider what happens when state persisted by new code is handled by old code. Does the old code skip over new fields?

External libraries

Be cautious about pulling in external dependencies. You will need to consider multiple things when faced with a need to pull in a new library.

  • What capability is the addition of the library providing you with? Can existing libraries provide this functionality (may be with a little bit of effort)?

  • Is the external library maintained by an active community of contributors?

  • Are you adding the library to Foundational modules? This will affect the rest of the Pinot code base. If the new library pulls in a lot of transitive dependencies, then we might encounter unexpected issues with multiple classes in the classpath. These issues are hard to catch with tests as the order of loading the libraries at runtime matters. If you absolutely need the support, consider adding it via extension modules; see Extension modules.

  • What are the licensing terms for the library? For more information about handling licenses, see License Headers for newly added files.

Testing your changes

Automated tests are always recommended for contributions. Make sure you write tests so that:

  1. You verify the correctness of your contribution. This serves as proof to you as well as the reviewers.

  2. You future-proof your contributions against code refactors or other changes. While this may not always be possible (see Testing Guidelines), it's a good goal to aim for.

Identify a list of tests for the changes you have made. Depending on the scope of changes, you may need one or more of the following tests:

  • Unit Tests

    Make sure your code has the necessary class or method level unit tests. It is important to write both positive case as well as negative case tests. Document your tests well and add meaningful assertions in the tests; when the assertions fail, ensure that the right messages are logged with information that allows others to debug.

  • Integration Tests

    Add integration tests to cover end-to-end paths without relying on mocking (see note below). You MUST add integration tests for REST APIs, and must include tests that cover different error codes; i.e., 200 OK, 4xx or 5xx errors that are explicit contracts of the API.

Testing Guidelines

  • Mocking

    Use Mockito to mock classes to control specific behaviors - e.g., simulate various error conditions. A short Mockito sketch follows this list.

Note

DO NOT use advanced mock libraries such as PowerMock. They make bytecode-level changes to allow tests for static/private members, but this typically causes other tools like jacoco to fail. They also promote incorrect implementation choices that make it harder to test additional changes. When faced with a choice to use PowerMock or advanced mocking options, you might either need to refactor the code to work better with mocking, or you actually need to write an integration test instead of a unit test.

  • Validate assumptions in tests

    Make sure that adequate asserts are added in the tests to verify that the tests are passing for the right reasons.

  • Write reliable tests

    Make sure you are writing tests that are reliable. If the tests depend on asynchronous events to be fired, do not add sleep to your tests. Where possible, use appropriate mocking or condition based triggers.
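As a small illustration of the kind of mocking described in the Mocking bullet above (the SegmentStoreClient interface here is made up for the example; only the Mockito calls - mock, when/thenThrow, verify - are the real library API):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class SegmentStoreClientTest {

  // A made-up collaborator, standing in for whatever class your code under test depends on.
  interface SegmentStoreClient {
    String download(String segmentName) throws Exception;
  }

  public void testErrorConditionIsSimulated() throws Exception {
    // Mock the collaborator and simulate an error condition instead of hitting a real store.
    SegmentStoreClient client = mock(SegmentStoreClient.class);
    when(client.download("mySegment")).thenThrow(new RuntimeException("store unavailable"));

    // The code under test would receive this mock; here we just exercise the stubbed behavior.
    try {
      client.download("mySegment");
    } catch (RuntimeException expected) {
      // expected: the mock simulates the store being unavailable
    }

    // Verify the interaction happened exactly as expected.
    verify(client).download("mySegment");
  }
}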

License Headers for newly added files

All source code files should have license headers. To automatically add the header for any new file you plan to check in, run the following in the pinot top-level folder:

mvn license:format

Note

If you checkin third-party code or files, please make sure you review the Apache guidelines:

  • Licenses that can be included

  • Licenses that may be included

  • Licenses that should not be included

Once you determine that the code you are pulling in adheres to the guidelines above, go ahead and pull the changes in. Do not add license headers for them. Follow these instructions to ensure we are compliant with the Apache Licensing process:

  • Under pinot/licenses add a LICENSE-<newlib> file that has the license terms of the included library.

  • Update the pinot/LICENSE file to indicate the newly added library file paths under the corresponding supported Licenses.

  • Update the exclusion rules for the license and rat maven plugins in the parent pom: pinot/pom.xml.

If attention is not paid to the licensing terms early on, they will be caught much later in the process, when we prepare to make a new release. Updating code at that time to work with the right libraries might require bigger refactoring changes and delay the release process.

Creating a Pull Request (PR)

  • Verifying code-style

    Run the following command to verify the code-style before posting a PR:

    mvn checkstyle:check

  • Run tests

    Before you create a review request for the changes, make sure you have run the corresponding unit tests for your changes. You can run individual tests via the IDE or via maven command-line. Finally run all tests locally by running mvn clean install -Pbin-dist.

    For changes that are related to performance issues or race conditions, it is hard to write reliable tests, so we recommend running manual stress tests to validate the changes. You MUST note the manual tests done in the PR description.

  • Push changes and create a PR for review

    Commit your changes with a meaningful commit message.

    $ git add <files required for the change>
    $ git commit -m "Meaningful oneliner for the change"
    $ git push origin <your issue branch>

    After this, create a PullRequest in github (https://github.com/apache/incubator-pinot/pulls). Include the following information in the description:

      • The changes that are included in the PR.

      • Design document, if any.

      • Information on any implementation choices that were made.

      • Evidence of sufficient testing. You MUST indicate the tests done, either manually or automated.

    Once the PR is created, the code base is compiled and all tests are run via travis. Make sure you follow up on any issues flagged by travis and address them. If you see test failures that are intermittent, please create an issue to track them.

    Once the travis run is clear, request reviews from at least 2 committers on the project and be sure to gently follow up on the issue with the reviewers.

  • Once you receive comments on github on your changes, be sure to respond to them on github and address the concerns. If any discussions happen offline for the changes in question, make sure to capture the outcome of the discussion, so others can follow along as well.

    It is possible that while your change is being reviewed, other changes were made to the master branch. Be sure to rebase your change on top of the new changes thus:

    # commit your changes
    $ git add <updated files>
    $ git commit -m "Meaningful message for the update"
    # pull new changes
    $ git checkout master
    $ git merge upstream/master
    $ git checkout <your issue branch>
    $ git rebase master

    At this time, if rebase flags any conflicts, resolve the conflicts and follow the instructions provided by the rebase command.

    Run additional tests/validations for the new changes and update the PR by pushing your changes:

    $ git push origin <your issue branch>

  • When you have addressed all comments and have an approved PR, one of the committers can merge your PR.

  • After your change is merged, check to see if any documentation needs to be updated. If so, create a PR for documentation.

Update Documentation

Usually, for new features, functionalities, or API changes, a documentation update is required to keep users up to date and to keep track of our development.

Please follow the Update Documentation section above to update the documentation accordingly.

Record Reader

Pinot supports indexing data from various file formats. To support reading from a file format, a record reader needs to be provided to read the file and convert records into the general format which the indexing engine can understand. The record reader serves as the connector from each individual file format to the Pinot record format.

Pinot package provides the following record readers out of the box:

  • Avro record reader: record reader for Avro format files

  • CSV record reader: record reader for CSV format files

  • JSON record reader: record reader for JSON format files

  • ORC record reader: record reader for ORC format files

  • Thrift record reader: record reader for Thrift format files

  • Pinot segment record reader: record reader for Pinot segment

Initialize Record Reader

To initialize a record reader, the data file and table schema should be provided (for the Pinot segment record reader, only the index directory needs to be provided, because the schema can be derived from the segment). The output record will follow the table schema provided.

For Avro/JSON/ORC/Pinot segment record reader, no extra configuration is required as column names and multi-values are embedded in the data file.

For CSV/Thrift record reader, extra configuration might be provided to determine the column names and multi-values for the data.

CSV Record Reader Config

The CSV record reader config contains the following settings:

  • Header: the header for the CSV file (column names)

  • Column delimiter: delimiter for each column

  • Multi-value delimiter: delimiter for each value for a multi-valued column

If no config is provided, the following default settings are used:

  • Use the first row in the data file as the header

  • Use ‘,’ as the column delimiter

  • Use ‘;’ as the multi-value delimiter
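For instance, with these defaults a file like the hypothetical one below (column names and data are made up for illustration) would produce records where subjects is a multi-valued column with two values in the first row:

studentId,name,subjects
1001,Alice,math;physics
1002,Bob,chemistry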

Thrift Record Reader Config

The Thrift record reader config is mandatory. It contains the Thrift class name for the record reader to de-serialize the Thrift objects.

ORC Record Reader Config

The following property is to be set during segment generation in your Hadoop properties.

record.reader.path: ${FULL_PATH_OF_YOUR_RECORD_READER_CLASS}

For ORC, it would be:

record.reader.path: org.apache.pinot.orc.data.readers.ORCRecordReader

Implement Your Own Record Reader

For other file formats, we provide a general interface for record readers - RecordReader. To index the file into a Pinot segment, simply implement the interface and plug it into the index engine - SegmentCreationDriverImpl. We use a 2-pass algorithm to index the file into a Pinot segment, hence the rewind() method is required for the record reader.
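The following is a minimal, self-contained sketch of the reader contract described above (iterate, rewind for the second pass, close). It deliberately uses plain Java types rather than the actual RecordReader and GenericRow classes, whose exact method signatures should be taken from the Pinot source:

import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Illustrative only: mirrors the hasNext/next/rewind/close contract a Pinot record reader
// must satisfy, using a plain Map per record instead of Pinot's GenericRow.
public class InMemoryRecordReader {
  private final List<Map<String, Object>> records;
  private Iterator<Map<String, Object>> iterator;

  public InMemoryRecordReader(List<Map<String, Object>> records) {
    this.records = records;
    this.iterator = records.iterator();
  }

  public boolean hasNext() {
    return iterator.hasNext();
  }

  public Map<String, Object> next() {
    return iterator.next();
  }

  // Required because segment creation makes two passes over the data.
  public void rewind() {
    iterator = records.iterator();
  }

  public void close() {
    // release file handles / buffers in a real reader
  }
}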

Generic Row

GenericRow is the record abstraction which the index engine can read and index with. It is a map from column name (String) to column value (Object). For a multi-valued column, the value should be an object array (Object[]).

Contracts for Record Reader

There are several contracts for record readers that developers should follow when implementing their own record readers:

  • The output GenericRow should follow the table schema provided, in the sense that:

    • All the columns in the schema should be preserved (if column does not exist in the original record, put default value instead)

    • Columns not in the schema should not be included

    • Values for each column should follow the field spec from the schema (data type, single-valued/multi-valued)

  • For the time column (refer to TimeFieldSpec), the record reader should be able to read both incoming and outgoing time (we allow converting from incoming time - the time value from the original data - to outgoing time - the time value stored in Pinot - during index creation).

    • If the incoming and outgoing time column names are the same, use the incoming time field spec

    • If the incoming and outgoing time column names are different, put both of them as time field specs

    • We keep both incoming and outgoing time column to handle cases where the input file contains time values that are already converted

    Writing Custom Aggregation Function

    Pinot has many inbuilt Aggregation Functions such as MIN, MAX, SUM, AVG etc. See PQL page for the list of aggregation functions.

    Adding a new AggregationFunction requires two things

    • Implement the AggregationFunction interface and make it available as part of the classpath

    • Register the function in AggregationFunctionFactory. As of today, this requires a code change in Pinot, but we plan to add the ability to plug in functions without having to change Pinot code.

    To get an overall idea, see the MAX Aggregation Function implementation. All other implementations can be found here.

    Let's look at the key methods to implement in AggregationFunction:

    interface AggregationFunction {

      AggregationResultHolder createAggregationResultHolder();

      GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity);

      void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<String, BlockValSet> blockValSetMap);

      void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
          Map<String, BlockValSet> blockValSets);

      void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
          Map<String, BlockValSet> blockValSets);

      IntermediateResult extractAggregationResult(AggregationResultHolder aggregationResultHolder);

      IntermediateResult extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey);

      IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);

      FinalResult extractFinalResult(IntermediateResult intermediateResult);

    }

    Before getting into the implementation, it's important to understand how Aggregation works in Pinot.

    This is an advanced topic and assumes you know Pinot concepts. All the data in Pinot is stored in segments across multiple nodes. The query plan at a high level comprises 3 phases:

    1. Map phase

    This phase works on the individual segments in Pinot.

    • Initialization: Depending on the query type the following methods are invoked to setup the result holder. While having different methods and return types adds complexity, it helps in performance.

      • AGGREGATION: createAggregationResultHolder. This must return an instance of type AggregationResultHolder. You can either use the DoubleAggregationResultHolder or the ObjectAggregationResultHolder.

      • GROUP BY: createGroupByResultHolder. This method must return an instance of type GroupByResultHolder. Depending on the type of result object, you might be able to use one of the existing implementations.

    • Callback: For every record that matches the filter condition in the query, one of the following methods is invoked, depending on the queryType (aggregation vs group by) and columnType (single-value vs multi-value). Note that we invoke this method for a batch of records instead of every row for performance reasons; this also allows the JVM to vectorize some parts of the execution if possible.

      • AGGREGATION: aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<String, BlockValSet> blockValSetMap)

        • length: This represents the length of the block. Typically < 10k

        • aggregationResultHolder: This is the object returned from createAggregationResultHolder

        • blockValSetMap: Map of blockValSets depending on the arguments to the AggFunction

      • Group By Single Value: aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map<String, BlockValSet> blockValSetMap)

        • length: This represents the length of the block. Typically < 10k

        • groupKeyArray: Pinot internally maintains a value-to-int mapping, and this groupKeyArray maps to the internal mapping. These values together form a unique key.

        • groupByResultHolder: This is the object returned from createGroupByResultHolder

        • blockValSetMap: Map of blockValSets depending on the arguments to the AggFunction

      • Group By Multi Value: aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, Map<String, BlockValSet> blockValSetMap)

        • length: This represents the length of the block. Typically < 10k

        • groupKeysArray: Pinot internally maintains a value-to-int mapping, and this groupKeysArray maps to the internal mapping. These values together form a unique key.

        • groupByResultHolder: This is the object returned from createGroupByResultHolder

        • blockValSetMap: Map of blockValSets depending on the arguments to the AggFunction

    2. Combine phase

    In this phase, the results from all segments within a single pinot server are combined into IntermediateResult. The type of IntermediateResult is based on the Generic Type defined in the AggregationFunction implementation.

    public interface AggregationFunction<IntermediateResult, FinalResult extends Comparable> {

      IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);

      FinalResult extractFinalResult(IntermediateResult intermediateResult);

    }

    3. Reduce phase

    There are two steps in the Reduce Phase

    • Merge all the IntermediateResult's from various servers using the merge function

    • Extract the final results by invoking the extractFinalResult method. In most cases, FinalResult is the same type as IntermediateResult. AverageAggregationFunction is an example where the IntermediateResult (AvgPair) is different from the FinalResult (Double).
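    To make the AvgPair example concrete, here is a small self-contained sketch (not Pinot's actual AvgPair or AverageAggregationFunction code) of how an intermediate (sum, count) pair is merged across servers and then reduced to a final double:

    // Illustrative only: shows why IntermediateResult (sum + count) can differ from FinalResult (double).
    public class AvgExample {

      static final class AvgPair {
        final double sum;
        final long count;

        AvgPair(double sum, long count) {
          this.sum = sum;
          this.count = count;
        }
      }

      // Combine/merge: intermediate results from different segments/servers are merged pairwise.
      static AvgPair merge(AvgPair a, AvgPair b) {
        return new AvgPair(a.sum + b.sum, a.count + b.count);
      }

      // Reduce: the final result is extracted from the merged intermediate result.
      static double extractFinalResult(AvgPair pair) {
        return pair.count == 0 ? 0.0 : pair.sum / pair.count;
      }

      public static void main(String[] args) {
        AvgPair server1 = new AvgPair(10.0, 4);  // e.g., sum=10 over 4 rows
        AvgPair server2 = new AvgPair(5.0, 1);   // e.g., sum=5 over 1 row
        System.out.println(extractFinalResult(merge(server1, server2)));  // prints 3.0
      }
    }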

    Code Modules and Organization

    TODO: Deprecated

    Before proceeding to contributing changes to Pinot, review the contents of this section.

    External Dependencies

    Pinot depends on a number of external projects, the most notable ones are:

    • Apache Zookeeper

    • Apache Helix

    • Apache Kafka

    • Apache Thrift

    • Netty

    • Google Guava

    • Yammer

    Helix is used for ClusterManagement, and Pinot code is tightly integrated with Helix and Zookeeper interfaces.

    Kafka is the default realtime stream provider, but can be replaced with others. See customizations section for more info.

    Thrift is used for message exchange between broker and server components, with Netty providing the server functionality for processing messages in a non-blocking fashion.

    Guava is used for a number of auxiliary components such as Caches and RateLimiters. Yammer metrics is used to register and expose metrics from Pinot components.

    In addition, Pinot relies on several key external libraries for some of its core functionality. Roaring Bitmaps: Pinot’s inverted indices are built using the RoaringBitmap library. t-Digest: Pinot’s digest-based percentile calculations are based on the T-Digest library.

    Pinot Modules

    Pinot is a multi-module project, with each module providing specific functionality that helps us to build services from a combination of modules. This helps keep clean interface contracts between different modules as well as reduce the overall executable size for individually deployable components.

    Each module has a src/main/java folder where the code resides and src/test/java where the unit tests corresponding to the module’s code reside.

    Foundational modules

    The following figure provides a high-level overview of the foundational Pinot modules.

    pinot-common

    pinot-common provides classes common to Pinot components. Some key classes you will find here are:

    • config: Definitions for various elements of Pinot’s table config.

    • metrics: Definitions for base metrics provided by Controller, Broker and Server.

    • metadata: Definitions of metadata stored in Zookeeper.

    • pql.parsers: Code to compile PQL strings into corresponding Abstract Syntax Trees (ASTs).

    • request: Autogenerated thrift classes representing various parts of PQL requests.

    • response: Definitions of the response format returned by the Broker.

    • filesystem: Provides abstractions for working with segments on local or remote filesystems. This module allows users to plug in filesystems specific to their use case. Extensions to the base PinotFS should ideally be housed in their specific modules so as not to pull in unnecessary dependencies for all users.

    pinot-transport

    pinot-transport module provides classes required to handle scatter-gather on Pinot Broker and netty wrapper classes used by Server to handle connections from Broker.

    pinot-core

    The pinot-core module provides the core functionality of Pinot, specifically for handling segments, various index structures, query execution (filters, transformations, aggregations, etc.), and support for realtime segments.

    pinot-server

    pinot-server provides server specific functionality including server startup and REST APIs exposed by the server.

    pinot-controller

    pinot-controller houses all the controller specific functionality, including many cluster administration APIs, segment upload (for both offline and realtime), segment assignment, retention strategies etc.

    pinot-broker

    pinot-broker provides broker functionality that includes wiring the broker startup sequence, building broker routing tables, PQL request handling.

    pinot-minion

    pinot-minion provides functionality for running auxiliary/periodic tasks on a Pinot Cluster such as purging records for compliance with regulations like GDPR.

    pinot-hadoop

    pinot-hadoop provides classes for segment generation jobs using Hadoop infrastructure.

    Auxiliary modules

    In addition to the core modules described above, Pinot code provides the following modules:

    • pinot-tools: This module is a collection of many tools useful for setting up a Pinot cluster and creating/updating segments. It also houses the Pinot quick start guide code.

    • pinot-perf: This module has a collection of benchmark test code used to evaluate design options.

    • pinot-client-api: This module houses the Java client API. See Executing queries via Java Client API for more info.

    • pinot-integration-tests: This module holds integration tests that test functionality across multiple classes or components. These tests typically do not rely on mocking and provide more end-to-end coverage for code.

    Extension modules

    pinot-hadoop-filesystem and pinot-azure-filesystem are modules added to support extensions to the Pinot filesystem. The functionality is broken down into modules of their own to avoid polluting the common modules with additional large libraries. These libraries bring in transitive dependencies of their own that can cause classpath conflicts at runtime. We would like to avoid this for the common usage of Pinot as much as possible.


    Pluggable Streams

    Prior to commit ba9f2d, Pinot was only able to support consuming from a Kafka stream.

    Pinot now enables its users to write plug-ins to consume from pub-sub streams other than Kafka. (Please refer to Issue #2583.)

    Some of the streams for which plug-ins can be added are:


  • Amazon Kinesis

  • Azure Event Hubs

  • LogDevice

  • Pravega

  • Pulsar

    You may encounter some limitations either in Pinot or in the stream system while developing plug-ins. Please feel free to get in touch with us when you start writing a stream plug-in, and we can help you out. We are open to receiving PRs in order to improve these abstractions if they do not work for a certain stream implementation.

    Refer to Consuming and Indexing rows in Realtime for details on how Pinot consumes streaming data.

    Requirements to support Stream Level (High Level) consumers

    The stream should provide the following guarantees:

    • Exactly once delivery (unless restarting from a checkpoint) for each consumer of the stream.

    • (Optionally) support a mechanism to split events (in some arbitrary fashion) so that each event in the stream is delivered exactly to one host out of a set of hosts.

    • Provide ways to save a checkpoint for the data consumed so far. If the stream is partitioned, then this checkpoint is a vector of checkpoints for events consumed from individual partitions.

    • The checkpoints should be recorded only when Pinot makes a call to do so.

    • The consumer should be able to start consumption from one of:

      • latest available data

      • earliest available data

      • last saved checkpoint

    Requirements to support Partition Level (Low Level) consumers

    While consuming rows at a partition level, the stream should support the following properties:

    • Stream should provide a mechanism to get the current number of partitions.

    • Each event in a partition should have a unique offset that is not more than 64 bits long.

    • Refer to a partition as a number not exceeding 32 bits long.

    • Stream should provide the following mechanisms to get an offset for a given partition of the stream:

      • get the offset of the oldest event available (assuming events are aged out periodically) in the partition.

      • get the offset of the most recent event published in the partition

      • (optionally) get the offset of an event that was published at a specified time

    • Stream should provide a mechanism to consume a set of events from a partition starting from a specified offset.

    • Pinot assumes that the offsets of incoming events are monotonically increasing; i.e., if Pinot consumes an event at offset o1, then the offset o2 of the following event should be such that o2 > o1.

    In addition, we have an operational requirement that the number of partitions should not be reduced over time.
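    As a self-contained illustration of the partition-level contract above - monotonically increasing offsets, earliest/latest offsets, and fetching a batch starting from a given offset - here is a toy in-memory partition (not Pinot's actual PartitionLevelConsumer API):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: models a single stream partition whose events have monotonically
    // increasing offsets, with the "fetch a batch starting at an offset" operation Pinot needs.
    public class InMemoryPartition {
      private final List<String> events = new ArrayList<>();

      public long publish(String event) {
        events.add(event);
        return events.size() - 1;          // offset of the newly published event
      }

      public long earliestOffset() {
        return 0;                          // oldest event still retained
      }

      public long latestOffset() {
        return events.size();              // offset at which the next event will be published
      }

      // Fetch up to maxEvents starting from startOffset (inclusive); offsets never decrease.
      public List<String> fetch(long startOffset, int maxEvents) {
        long start = Math.min(startOffset, events.size());
        long end = Math.min(start + maxEvents, events.size());
        return new ArrayList<>(events.subList((int) start, (int) end));
      }
    }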

    Stream plug-in implementation

    In order to add a new type of stream (say, Foo), implement the following classes:

    1. FooConsumerFactory extends StreamConsumerFactory

    2. FooPartitionLevelConsumer implements PartitionLevelConsumer

    3. FooStreamLevelConsumer implements StreamLevelConsumer

    4. FooMetadataProvider implements StreamMetadataProvider

    5. FooMessageDecoder implements StreamMessageDecoder

    Depending on stream level or partition level, your implementation needs to include StreamLevelConsumer or PartitionLevelConsumer.

    The properties for the stream implementation are to be set in the table configuration, inside the streamConfigs section.

    Use the streamType property to define the stream type. For example, for the implementation of stream foo, set the property "streamType" : "foo".

    The rest of the configuration properties for your stream should be set with the prefix "stream.foo". Be sure to use the same prefix for the following properties (see the examples below):

    • topic

    • consumer type

    • stream consumer factory

    • offset

    • decoder class name

    • decoder properties

    • connection timeout

    • fetch timeout

    All values should be strings. For example:

    "streamType" : "foo",
    "stream.foo.topic.name" : "SomeTopic",
    "stream.foo.consumer.type": "LowLevel",
    "stream.foo.consumer.factory.class.name": "fully.qualified.pkg.ConsumerFactoryClassName",
    "stream.foo.consumer.prop.auto.offset.reset": "largest",
    "stream.foo.decoder.class.name" : "fully.qualified.pkg.DecoderClassName",
    "stream.foo.decoder.prop.a.decoder.property" : "decoderPropValue",
    "stream.foo.connection.timeout.millis" : "10000", // default 30_000
    "stream.foo.fetch.timeout.millis" : "10000" // default 5_000

    You can have additional properties that are specific to your stream. For example:

    "stream.foo.some.buffer.size" : "24g"

    In addition to these properties, you can define thresholds for the consuming segments:

    • rows threshold

    • time threshold

    The properties for the thresholds are as follows:

    "realtime.segment.flush.threshold.size" : "100000"
    "realtime.segment.flush.threshold.time" : "6h"

    An example of this implementation can be found in the KafkaConsumerFactory, which is an implementation for the Kafka stream.

    Kafka 2.x Plugin

    Pinot provides stream plugin support for Kafka 2.x versions. Although the version used in this implementation is Kafka 2.0.0, it’s possible to compile it with a higher Kafka lib version, e.g. 2.1.1.

    How to build and release Pinot package with Kafka 2.x connector

    mvn clean package -DskipTests -Pbin-dist -Dkafka.version=2.0

    How to use Kafka 2.x connector

    • Use Kafka Stream (High) Level Consumer

    Below is a sample streamConfigs used to create a realtime table with Kafka Stream (High) level consumer:

    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "highLevel",
      "stream.kafka.topic.name": "meetupRSVPEvents",
      "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
      "stream.kafka.zk.broker.url": "localhost:2191/kafka",
      "stream.kafka.hlc.bootstrap.server": "localhost:19092"
    }

    Kafka 2.x HLC consumer uses org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory in config stream.kafka.consumer.factory.class.name.

    • Use Kafka Partition(Low) Level Consumer

    Below is a sample table config used to create a realtime table with Kafka Partition (Low) level consumer:

    {
      "tableName": "meetupRsvp",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "timeColumnName": "mtime",
        "timeType": "MILLISECONDS",
        "segmentPushType": "APPEND",
        "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
        "schemaName": "meetupRsvp",
        "replication": "1",
        "replicasPerPartition": "1"
      },
      "tenants": {},
      "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "LowLevel",
          "stream.kafka.topic.name": "meetupRSVPEvents",
          "stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
          "stream.kafka.zk.broker.url": "localhost:2191/kafka",
          "stream.kafka.broker.list": "localhost:19092"
        }
      },
      "metadata": {
        "customConfigs": {}
      }
    }

    Please note:

    1. Config replicasPerPartition under segmentsConfig is required to specify table replication.

    2. Config stream.kafka.consumer.type should be specified as LowLevel to use partition level consumer. (The use of simple instead of LowLevel is deprecated)

    3. Configs stream.kafka.zk.broker.url and stream.kafka.broker.list are required under tableIndexConfig.streamConfigs to provide kafka related information.

    Upgrade from Kafka 0.9 connector to Kafka 2.x connector

    • Update table config for both high level and low level consumer: Update config: stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.

    • If using Stream(High) level consumer: Please also add config stream.kafka.hlc.bootstrap.server into tableIndexConfig.streamConfigs. This config should be the URI of Kafka broker lists, e.g. localhost:9092.

    How to use this plugin with higher Kafka versions

    This connector is also suitable for Kafka lib versions higher than 2.0.0. Changing kafka.lib.version from 2.0.0 to 2.1.1 in pinot-connector-kafka-2.0/pom.xml will make this connector work with Kafka 2.1.1.
