Pinot Minion is a new component which leverages the Helix Task Framework. It can be attached to an existing Pinot cluster and then executes tasks provided by the controller. It is a generic, single place for running background jobs, and it helps offload computationally intensive tasks, such as adding indexes to segments and merging segments, from other components.
// coming soon
This page contains guides related to importing data from Apache Kafka using stream ingestion.
This guide shows you how to import a CSV file of records into Pinot.
This guide is a work in progress.
We're actively working on improving our documentation. This doc will be available very soon. Please check back in a day or two for more details.
VERSION=0.3.0
wget https://downloads.apache.org/incubator/pinot/apache-pinot-incubating-$VERSION/apache-pinot-incubating-$VERSION-bin.tar.gz
tar vxf apache-pinot-incubating-*-bin.tar.gz
cd apache-pinot-incubating-*-bin
bin/quick-start-batch.sh
This guide shows you how to import data from HDFS.
This guide is a work in progress.
We're actively working on improving our documentation. This doc will be available very soon. Please check back in a day or two for more details.
VERSION=0.3.0
wget https://downloads.apache.org/incubator/pinot/apache-pinot-incubating-$VERSION/apache-pinot-incubating-$VERSION-bin.tar.gz
tar vxf apache-pinot-incubating-*-bin.tar.gz
cd apache-pinot-incubating-*-bin
bin/quick-start-batch.sh
This guide shows you how to import records from a Parquet file into Pinot.
This guide is a work in progress.
We're actively working on improving our documentation. This doc will be available very soon. Please check back in a day or two for more details.
VERSION=0.3.0
wget https://downloads.apache.org/incubator/pinot/apache-pinot-incubating-$VERSION/apache-pinot-incubating-$VERSION-bin.tar.gz
tar vxf apache-pinot-incubating-*-bin.tar.gz
cd apache-pinot-incubating-*-bin
bin/quick-start-batch.sh
This section contains a collection of guides that will show you how to import data from a Pinot supported input format.
This section contains a collection of short guides to show you how to import from a Pinot supported file system.
Learn about the different components and logical abstractions
This page contains multiple quick start guides for deploying Pinot to a public cloud provider.
The following quick start guides will show you how to run an Apache Pinot cluster using Kubernetes on different public cloud providers.
This guide shows you how to import data from files stored in Azure Data Lake Storage (ADLS)
How to turn on the water valve
There are two ways to get data ingested into Pinot:
In your Pinot controller/server configuration, you will need to provide the following configs:
pinot.controller.segment.fetcher.hdfs.hadoop.conf.path=<file path to hadoop conf folder>
or
pinot.server.segment.fetcher.hdfs.hadoop.conf.path=<file path to hadoop conf folder>
This path should point to the local folder containing core-site.xml and hdfs-site.xml files from your Hadoop installation.
pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>
or
pinot.server.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
pinot.server.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>
These two configs should be the corresponding Kerberos configuration if your Hadoop installation is secured with Kerberos. Please check the Hadoop Kerberos guide on how to generate Kerberos security identification.
You will also need to provide proper Hadoop dependency jars from your Hadoop installation to your Pinot startup scripts.
To push HDFS segment files to the Pinot controller, you just need to ensure you have proper Hadoop configuration as mentioned in the previous part. Then your remote segment creation/push job can send the HDFS path of your newly created segment files to the Pinot Controller and let it download the files.
For example, the following curl request to the Controller will notify it to download segment files to the proper table:
curl -X POST -H "UPLOAD_TYPE:URI" -H "DOWNLOAD_URI:hdfs://nameservice1/hadoop/path/to/segment/file.gz" -H "content-type:application/json" -d '' localhost:9000/segments
VERSION=0.3.0
wget https://downloads.apache.org/incubator/pinot/apache-pinot-incubating-$VERSION/apache-pinot-incubating-$VERSION-bin.tar.gz
tar vxf apache-pinot-incubating-*-bin.tar.gz
cd apache-pinot-incubating-*-bin
bin/quick-start-batch.sh
VERSION=0.3.0
wget https://downloads.apache.org/incubator/pinot/apache-pinot-incubating-$VERSION/apache-pinot-incubating-$VERSION-bin.tar.gz
tar vxf apache-pinot-incubating-*-bin.tar.gz
cd apache-pinot-incubating-*-bin
bin/quick-start-batch.sh
VERSION=0.3.0
wget https://downloads.apache.org/incubator/pinot/apache-pinot-incubating-$VERSION/apache-pinot-incubating-$VERSION-bin.tar.gz
tar vxf apache-pinot-incubating-*-bin.tar.gz
cd apache-pinot-incubating-*-bin
bin/quick-start-batch.sh
This guide shows you how to import records into Pinot using the ORC file format.
This guide is a work in progress.
We're actively working on improving our documentation. This doc will be available very soon. Please check back in a day or two for more details.
VERSION=0.3.0
wget https://downloads.apache.org/incubator/pinot/apache-pinot-incubating-$VERSION/apache-pinot-incubating-$VERSION-bin.tar.gz
tar vxf apache-pinot-incubating-*-bin.tar.gz
cd apache-pinot-incubating-*-bin
bin/quick-start-batch.sh
This guide shows you how to import records into Pinot using the Avro file format.
This guide is a work in progress.
We're actively working on improving our documentation. This doc will be available very soon. Please check back in a day or two for more details.
VERSION=0.3.0
wget https://downloads.apache.org/incubator/pinot/apache-pinot-incubating-$VERSION/apache-pinot-incubating-$VERSION-bin.tar.gz
tar vxf apache-pinot-incubating-*-bin.tar.gz
cd apache-pinot-incubating-*-bin
bin/quick-start-batch.sh
This section contains articles that provide technical and implementation details of Pinot features
This guide shows you how to ingest a stream of records from an Apache Kafka topic into a Pinot table.
This guide is a work in progress.
We're actively working on improving our documentation. This doc will be available very soon. Please check back in a day or two for more details.
VERSION=0.3.0
wget https://downloads.apache.org/incubator/pinot/apache-pinot-incubating-$VERSION/apache-pinot-incubating-$VERSION-bin.tar.gz
tar vxf apache-pinot-incubating-*-bin.tar.gz
cd apache-pinot-incubating-*-bin
bin/quick-start-batch.sh
Here you will find a collection of ready-made sample applications and examples for real-world data
These properties for the stream implementation are to be set in your controller and server configurations.
In your controller and server configs, please set the FS class you would like to support: set pinot.controller.storage.factory.class.${YOUR_URI_SCHEME} to the full path of the FS class you would like to include.
You also need to configure pinot.controller.local.temp.dir for the local dir on the controller machine.
For filesystem specific configs, you can pass in the following with either the pinot.controller prefix or the pinot.server prefix.
All the following configs need to be prefixed with storage.factory.
AzurePinotFS requires the following configs according to your environment:
adl.accountId, adl.authEndpoint, adl.clientId, adl.clientSecret
Sample Controller Config
"pinot.controller.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
"pinot.controller.storage.factory.adl.accountId": "xxxx"
"pinot.controller.storage.factory.adl.authEndpoint": "xxxx"
"pinot.controller.storage.factory.adl.clientId": "xxxx"
"pinot.controller.storage.factory.adl.clientId": "xxxx"
"pinot.controller.segment.fetcher.protocols": "adl"
Sample Server Config
"pinot.server.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
"pinot.server.storage.factory.adl.accountId": "xxxx"
"pinot.server.storage.factory.adl.authEndpoint": "xxxx"
"pinot.server.storage.factory.adl.clientId": "xxxx"
"pinot.server.storage.factory.adl.clientId": "xxxx"
"pinot.server.segment.fetcher.protocols": "adl"
You can find the parameters in your account as follows: https://stackoverflow.com/questions/56349040/what-is-clientid-authtokenendpoint-clientkey-for-accessing-azure-data-lake
Please also make sure to set the following config with the value “adl”
"segment.fetcher.protocols" : "adl"
To see how to upload segments to different storage systems, check ../segment_fetcher.rst.
HadoopPinotFS requires the following configs according to your environment:
hadoop.kerberos.principle, hadoop.kerberos.keytab, hadoop.conf.path
Please make sure to also set the following config with the value “hdfs”
"segment.fetcher.protocols" : "hdfs"
When Pinot segment files are created in external systems (Hadoop/Spark/etc.), there are several ways to push the data to the Pinot Controller and Server:
Push segments to a shared NFS and let Pinot pull segment files from that NFS location.
Push segments to a Web server and let Pinot pull segment files from the Web server with an http/https link.
Push segments to HDFS and let Pinot pull segment files from HDFS with an hdfs location URI.
Push segments to another system and implement your own segment fetcher to pull data from that system.
The first two options are supported out of the box with the Pinot package. As long as your remote jobs send the Pinot controller the corresponding URI to the files, it will pick up the files and allocate them to the proper Pinot Servers and brokers. To enable Pinot support for HDFS, you will need to provide Pinot Hadoop configuration and proper Hadoop dependencies.
Learn about the various components of Pinot and terminologies used to describe data stored in Pinot
Pinot is designed to deliver low latency queries on large datasets. In order to achieve this performance, Pinot stores data in a columnar format and adds additional indices to perform fast filtering, aggregation and group by.
Raw data is broken into small data shards and each shard is converted into a unit known as a segment. One or more segments together form a table, which is the logical container for querying Pinot using SQL/PQL.
Pinot uses a variety of terms which can refer to either abstractions that model the storage of data or infrastructure components that drive the functionality of the system.
Similar to traditional databases, Pinot has the concept of a table—a logical abstraction to refer to a collection of related data. As is the case with RDBMS, a table is a construct that consists of columns and rows (documents) that are queried using SQL. A table is associated with a schema which defines the columns in a table as well as their data types.
As opposed to RDBMS schemas, multiple tables can be created in Pinot (real-time or batch) that inherit a single schema definition. Tables are independently configured for concerns such as indexing strategies, partitioning, tenants, data sources, and/or replication.
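For illustration, a minimal schema might look like the sketch below; the schema name, columns and data types are placeholders for illustration only, not anything defined elsewhere in this guide:
{
  "schemaName": "myTable",
  "dimensionFieldSpecs": [
    { "name": "country", "dataType": "STRING" }
  ],
  "metricFieldSpecs": [
    { "name": "clicks", "dataType": "LONG" }
  ],
  "timeFieldSpec": {
    "incomingGranularitySpec": { "name": "daysSinceEpoch", "dataType": "INT", "timeType": "DAYS" }
  }
}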
Pinot has a distributed systems architecture that scales horizontally. Pinot expects the size of a table to grow infinitely over time. In order to achieve this, all data needs to be distributed across multiple nodes. Pinot achieves this by breaking data into smaller chunks known as segments (this is similar to shards/partitions in HA relational databases). Segments can also be seen as time-based partitions.
In order to support multi-tenancy, Pinot has first class support for tenants. A table is associated with a tenant. This allows all tables belonging to a particular logical namespace to be grouped under a single tenant name and isolated from other tenants. This isolation between tenants provides different namespaces for applications and teams to prevent sharing tables or schemas. Development teams building applications will never have to operate an independent deployment of Pinot. An organization can operate a single cluster and scale it out as new tenants increase the overall volume of queries. Developers can manage their own schemas and tables without being impacted by any other tenant on a cluster.
By default, all tables belong to a default tenant named "default". The concept of tenants is very important, as it satisfies the architectural principle of a "database per service/application" without having to operate many independent data stores. Further, tenants schedule resources so that a table's segments (shards) reside only on a specified set of nodes. Similar to the kind of isolation that is ubiquitously used in Linux containers, compute resources in Pinot can be scheduled to prevent resource contention between tenants.
Logically, a cluster is simply a group of tenants. As with the classical definition of a cluster, it is also a grouping of a set of compute nodes. Typically, there is only one cluster per environment/data center. There is no need to create multiple clusters since Pinot supports the concept of tenants. At LinkedIn, the largest Pinot cluster consists of 1000+ nodes distributed across a data center. Nodes can be added to a cluster in a way that linearly increases the performance and availability of queries. The number of nodes and the compute resources per node will reliably predict the QPS for a Pinot cluster, and as such, capacity planning can be easily achieved using SLAs that assert performance expectations for end-user applications.
A Pinot cluster is comprised of multiple distributed system components. These components are useful to understand for operators that are monitoring system usage or are debugging an issue with a cluster deployment.
Controller
Server
Broker
Minion (optional)
The benefits of scale that make Pinot linearly scalable for an unbounded number of nodes are made possible through its integration with Apache Zookeeper and Apache Helix.
A controller is the core orchestrator that drives the consistency and routing in a Pinot cluster. Controllers are horizontally scaled as an independent component (container) and have visibility of the state of all other components in a cluster. The controller reacts and responds to state changes in the system and schedules the allocation of resources for tables, segments, or nodes. As mentioned earlier, Helix is embedded within the controller as an agent that is a participant responsible for observing and driving state changes that are subscribed to by other components.
In addition to cluster management, resource allocation, and scheduling, the controller is also the HTTP gateway for REST API administration of a Pinot deployment. A web-based query console is also provided for operators to quickly and easily run SQL/PQL queries.
A broker receives queries from a client and routes their execution to one or more Pinot servers before returning a consolidated response.
Servers host segments (shards) that are scheduled and allocated across multiple nodes and routed on an assignment to a tenant (there is a single tenant by default). Servers are independent containers that scale horizontally and are notified by Helix through state changes driven by the controller. A server can either be a real-time server or an offline server.
Real-time and offline servers have very different resource usage requirements, where real-time servers are continually consuming new messages from external systems (such as Kafka topics) that are ingested and allocated on segments of a tenant. Because of this, resource isolation can be used to prioritize high-throughput real-time data streams that are ingested and then made available for query through a broker.
Pinot minion is an optional component that can be used to run background tasks such as "purge" for GDPR (General Data Protection Regulation). As Pinot is an immutable aggregate store, records containing sensitive private data need to be purged on a request-by-request basis. Minion provides a solution for this purpose that complies with GDPR while optimizing Pinot segments and building additional indices that guarantee performance even in the presence of data deletion. One can also write a custom task that runs on a periodic basis. While it's possible to perform these tasks on the Pinot servers directly, having a separate process (Minion) lessens the overall degradation of query latency as segments are impacted by mutable writes.
Brokers are the components that handle Pinot queries. They accept queries from clients and forward them to the right servers. They collect results back from the servers and consolidate them into a single response, which is sent back to the client.
Pinot Brokers are modeled as Spectators. They need to know the location of each segment of a table (and each replica of the segments) and route requests to the appropriate server that hosts the segments of the table being queried. The broker ensures that all the rows of the table are queried exactly once so as to return correct, consistent results for a query. The brokers may optimize to prune some of the segments as long as accuracy is not sacrificed. Helix provides the framework by which spectators can learn the location in which each partition of a resource (i.e. participant) resides. The brokers use this mechanism to learn the servers that host specific segments of a table.
In case of hybrid tables, the brokers ensure that the overlap between realtime and offline segment data is queried exactly once, by performing offline and realtime federation. Let's take this example: we have realtime data for 5 days - March 23 to March 27 - and offline data has been pushed until Mar 25, which is 2 days behind realtime. The brokers maintain this time boundary.
Suppose we get a query to this table: select sum(metric) from table. The broker will split the query into 2 queries based on this time boundary - one for offline and one for realtime. This query becomes select sum(metric) from table_REALTIME where date >= Mar 25 and select sum(metric) from table_OFFLINE where date < Mar 25.
The broker then merges results from both these queries before returning to the client.
Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a broker
docker run \
--network=pinot-demo \
--name pinot-broker \
-d ${PINOT_IMAGE} StartBroker \
-zkAddress pinot-zookeeper:2181
bin/pinot-admin.sh StartBroker \
-zkAddress localhost:2181 \
-clusterName PinotCluster \
-brokerPort 7000
Servers host the data segments and serve queries off the data they host. There are two types of servers:
Offline Offline servers are responsible for downloading segments from the segment store, to host and serve queries off. When a new segment is uploaded to the controller, the controller decides the servers (as many as the replication factor) that will host the new segment and notifies them to download the segment from the segment store. On receiving this notification, the servers download the segment file and load the segment onto the server, to serve queries off them.
Realtime Real time servers directly ingest from a real time stream (such as Kafka, EventHubs). Periodically, they make segments of the in-memory ingested data, based on certain thresholds. These segments are then persisted onto the segment store.
Pinot Servers are modeled as Helix Participants, hosting Pinot tables (referred to as resources in helix terminology). Segments of a table are modeled as Helix partitions (of a resource). Thus, a Pinot server hosts one or more helix partitions of one or more helix resources (i.e. one or more segments of one or more tables).
Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a server
Usage: StartServer
-serverHost <String> : Host name for controller. (required=false)
-serverPort <int> : Port number to start the server at. (required=false)
-serverAdminPort <int> : Port number to serve the server admin API at. (required=false)
-dataDir <string> : Path to directory containing data. (required=false)
-segmentDir <string> : Path to directory containing segments. (required=false)
-zkAddress <http> : Http address of Zookeeper. (required=false)
-clusterName <String> : Pinot cluster name. (required=false)
-configFileName <Config File Name> : Server Starter Config file. (required=false)
-help : Print this message. (required=false)
docker run \
--network=pinot-demo \
--name pinot-server \
-d ${PINOT_IMAGE} StartServer \
-zkAddress pinot-zookeeper:2181
bin/pinot-admin.sh StartServer \
-zkAddress localhost:2181
USAGE
Usage: StartServer
-serverHost <String> : Host name for controller. (required=false)
-serverPort <int> : Port number to start the server at. (required=false)
-serverAdminPort <int> : Port number to serve the server admin API at. (required=false)
-dataDir <string> : Path to directory containing data. (required=false)
-segmentDir <string> : Path to directory containing segments. (required=false)
-zkAddress <http> : Http address of Zookeeper. (required=false)
-clusterName <String> : Pinot cluster name. (required=false)
-configFileName <Config File Name> : Server Starter Config file. (required=false)
-help : Print this message. (required=false)
This section contains quick start guides to help you get up and running with Pinot.
We want your experience getting started with Pinot to be both low effort and high reward. Here you'll find a collection of quick start guides that contain starter distributions of the Pinot platform.
This video will show you a step-by-step walk through for launching the individual components of Pinot and scaling them to multiple instances. This is an excellent resource for developers and operators that want to understand setting up each component and debugging a cluster.
We also have a step-by-step guide for manually setting up a Pinot cluster using Docker or shell scripts.
Getting data into Pinot is easy. Take a look at these two quick start guides which will help you get up and running with sample data for offline and real-time tables.
This section is an overview of the various options for importing data into Pinot.
There are multiple options for importing data into Pinot. These guides are ready-made examples that show you step-by-step instructions for importing records into Pinot, supported by our plugin architecture. These guides are meant to get you up and running with imported data as quickly as possible. Pinot supports multiple file input formats without needing to change anything other than the file name. Each example imports a ready-made dataset so you can see how things work without needing to bring your own dataset.
These guides will show you how to import data from a supported file system.
These guides will show you how to import data from a Pinot supported input format.
This guide will show you how to import data using stream ingestion from Apache Kafka topics.
The following summarizes Pinot's releases, from the latest one to the earliest one.
The Pinot Controller is responsible for a number of things
Controllers maintain the global metadata (e.g. configs and schemas) of the system with the help of Zookeeper which is used as the persistent metadata store.
Controllers host the Helix Controller and are responsible for managing other Pinot components (brokers, servers, minions).
They maintain the mapping of which servers are responsible for which segments. This mapping is used by the servers, to download the portion of the segments that they are responsible for. This mapping is also used by the broker to decide which servers to route the queries to.
Controller has admin endpoints for viewing, creating, updating and deleting configs which help us manage and operate the cluster.
Controllers also have endpoints for segment uploads which are used in offline data pushes. They are responsible for initializing realtime consumption and coordination of persisting the realtime segments into the segment store periodically.
They undertake other management activities such as managing segment retention and validations.
There can be multiple instances of Pinot controller for redundancy. If there are multiple controllers, Pinot expects that all of them are configured with the same back-end storage system so that they have a common view of the segments (e.g. NFS). Pinot can use other storage systems such as HDFS or .
Make sure you've set up Zookeeper. If you're using Docker, make sure to pull the Pinot Docker image. To start a controller
Ways to query Pinot
Pinot can be queried via a broker endpoint as follows. This example assumes the broker is running on localhost:8099.
The Pinot REST API can be accessed by invoking a POST operation with a JSON body containing the parameter sql to the /query/sql endpoint on a broker.
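For example, assuming the broker is running on localhost:8099 as above, a query on the baseballStats table can be posted like this:
curl -H "Content-Type: application/json" -X POST \
  -d '{"sql":"select count(*) from baseballStats"}' \
  http://localhost:8099/query/sql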
Note
This endpoint is deprecated, and will soon be removed. The standard-SQL endpoint is the recommended endpoint.
The PQL endpoint can be accessed by invoking a POST operation with a JSON body containing the parameter pql to the /query endpoint on a broker.
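As a sketch, and again assuming a broker at localhost:8099, the same query can be posted to the deprecated PQL endpoint like this:
curl -H "Content-Type: application/json" -X POST \
  -d '{"pql":"select count(*) from baseballStats"}' \
  http://localhost:8099/query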
Query Console can be used for running ad-hoc queries (a checkbox is available to query the PQL endpoint). The Query Console can be accessed by entering <controller host>:<controller port> in your browser.
You can also query using the pinot-admin scripts. Make sure you follow the instructions in to get Pinot locally, and then
Here's a list of the clients available to query Pinot from your application
Coming soon - JDBC client
First, a note on naming. Pinot has different components, and different ways of representing the data. In particular, data is represented by:
A table is a logical abstraction to refer to a collection of related data. It consists of columns and rows (documents).
Data in a table is divided into (horizontal) shards referred to as segments.
Manages other Pinot components (brokers, servers) and controls the assignment of tables/segments to servers.
Hosts one or more segments and serves queries from those segments.
Accepts queries from clients, routes them to one or more servers, and returns a consolidated response to the client.
Pinot leverages Apache Helix for cluster management. Helix is a cluster management framework to manage replicated, partitioned resources in a distributed system. Helix uses Zookeeper to store cluster state and metadata.
Briefly, Helix divides nodes into three logical components based on their responsibilities:
The nodes that host distributed, partitioned resources.
The nodes that observe the current state of each Participant and use that information to access the resources. Spectators are notified of state changes in the cluster (state of a participant, or that of a partition in a participant).
The node that observes and controls the Participant nodes. It is responsible for coordinating all transitions in the cluster and ensuring that state constraints are satisfied while maintaining cluster stability.
Pinot Controller hosts Helix Controller, in addition to hosting REST APIs for Pinot cluster administration and data ingestion. There can be multiple instances of Pinot controller for redundancy. If there are multiple controllers, Pinot expects that all of them are configured with the same back-end storage system so that they have a common view of the segments (e.g. NFS). Pinot can use other storage systems such as HDFS or .
Pinot Servers are modeled as Helix Participants, hosting Pinot tables (referred to as resources in helix terminology). Segments of a table are modeled as Helix partitions (of a resource). Thus, a Pinot server hosts one or more helix partitions of one or more helix resources (i.e. one or more segments of one or more tables).
Pinot Brokers are modeled as Spectators. They need to know the location of each segment of a table (and each replica of the segments) and route requests to the appropriate server that hosts the segments of the table being queried. The broker ensures that all the rows of the table are queried exactly once so as to return correct, consistent results for a query. The brokers (or servers) may optimize to prune some of the segments as long as accuracy is not sacrificed. In case of hybrid tables, the brokers ensure that the overlap between realtime and offline segment data is queried exactly once. Helix provides the framework by which spectators can learn the location (i.e. participant) in which each partition of a resource resides. The brokers use this mechanism to learn the servers that host specific segments of a table.
You can also implement your own segment fetchers for other file systems and load them into the Pinot system with an external jar. All you need to do is implement a class that extends the SegmentFetcher interface and provide configs to the Pinot Controller and Server as follows:
or
You can also provide other configs to your fetcher under config-root pinot.server.segment.fetcher.<protocol>
By default, Pinot does not come with a storage layer, so the data sent won't be stored in case of a system crash. In order to persistently store the generated segments, you will need a storage layer.
Pinot enables its users to write a PinotFS abstraction layer to store data in a data layer of their choice for realtime and offline segments.
Some examples of storage backends (other than local storage) currently supported are:
If the above two filesystems do not meet your needs, you can extend the current PinotFS abstraction to customize it for your needs.
In order to add a new type of storage backend (say, Amazon S3), implement the following class:
S3FS extends PinotFS
To set up a Pinot cluster, follow these steps:
Controller instances
Broker instances
Server instances
docker run \
--network=pinot-demo \
--name pinot-controller \
-p 9000:9000 \
-d ${PINOT_IMAGE} StartController \
-zkAddress pinot-zookeeper:2181
bin/pinot-admin.sh StartController \
-zkAddress localhost:2181 \
-clusterName PinotCluster \
-controllerPort 9000
pinot.controller.segment.fetcher.<protocol>.class=<class path to your implementation>
pinot.server.segment.fetcher.<protocol>.class=<class path to your implementation>
When Pinot segment files are created in external systems (Hadoop/Spark/etc.), there are several ways to push the data to the Pinot Controller and Server:
Push segments to a shared NFS and let Pinot pull segment files from that NFS location.
Push segments to a Web server and let Pinot pull segment files from the Web server with an http/https link.
Push segments to HDFS and let Pinot pull segment files from HDFS with an hdfs location URI.
Push segments to another system and implement your own segment fetcher to pull data from that system.
The first two options are supported out of the box with the Pinot package. As long as your remote jobs send the Pinot controller the corresponding URI to the files, it will pick up the files and allocate them to the proper Pinot Servers and brokers. To enable Pinot support for HDFS, you will need to provide Pinot Hadoop configuration and proper Hadoop dependencies.
In your Pinot controller/server configuration, you will need to provide the following configs:
pinot.controller.segment.fetcher.hdfs.hadoop.conf.path=<file path to hadoop conf folder>
or
pinot.server.segment.fetcher.hdfs.hadoop.conf.path=<file path to hadoop conf folder>
This path should point to the local folder containing core-site.xml and hdfs-site.xml files from your Hadoop installation.
pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
pinot.controller.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>
or
pinot.server.segment.fetcher.hdfs.hadoop.kerberos.principle=<your kerberos principal>
pinot.server.segment.fetcher.hdfs.hadoop.kerberos.keytab=<your kerberos keytab>
These two configs should be the corresponding Kerberos configuration if your Hadoop installation is secured with Kerberos. Please check the Hadoop Kerberos guide on how to generate Kerberos security identification.
You will also need to provide proper Hadoop dependency jars from your Hadoop installation to your Pinot startup scripts.
To push HDFS segment files to the Pinot controller, you just need to ensure you have proper Hadoop configuration as mentioned in the previous part. Then your remote segment creation/push job can send the HDFS path of your newly created segment files to the Pinot Controller and let it download the files.
For example, the following curl request to the Controller will notify it to download segment files to the proper table:
curl -X POST -H "UPLOAD_TYPE:URI" -H "DOWNLOAD_URI:hdfs://nameservice1/hadoop/path/to/segment/file.gz" -H "content-type:application/json" -d '' localhost:9000/segments
You can also implement your own segment fetchers for other file systems and load them into the Pinot system with an external jar. All you need to do is implement a class that extends the SegmentFetcher interface and provide configs to the Pinot Controller and Server as follows:
pinot.controller.segment.fetcher.<protocol>.class=<class path to your implementation>
or
pinot.server.segment.fetcher.<protocol>.class=<class path to your implementation>
You can also provide other configs to your fetcher under the config root pinot.server.segment.fetcher.<protocol>.
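As a purely hypothetical sketch (the protocol name, class and extra properties below are made-up placeholders, not part of Pinot), a custom fetcher for an s3 protocol could be wired up like this:
pinot.server.segment.fetcher.s3.class=com.example.pinot.S3SegmentFetcher
pinot.server.segment.fetcher.s3.region=us-west-2
pinot.server.segment.fetcher.s3.bucket=my-pinot-segments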
The example here uses the existing org.apache.pinot.filesystem.HadoopPinotFS to store realtime segments in an HDFS filesystem. In the Pinot controller config, add the following new configs:
"controller.data.dir": "SET_TO_YOUR_HDFS_ROOT_DIR"
"controller.local.temp.dir": "SET_TO_A_LOCAL_FILESYSTEM_DIR"
"pinot.controller.storage.factory.class.hdfs": "org.apache.pinot.filesystem.HadoopPinotFS"
"pinot.controller.storage.factory.hdfs.hadoop.conf.path": "SET_TO_YOUR_HDFS_CONFIG_DIR"
"pinot.controller.storage.factory.hdfs.hadoop.kerberos.principle": "SET_IF_YOU_USE_KERBEROS"
"pinot.controller.storage.factory.hdfs.hadoop.kerberos.keytab": "SET_IF_YOU_USE_KERBEROS"
"controller.enable.split.commit": "true"
In the Pinot server config, add the following new configs:
"pinot.server.instance.enable.split.commit": "true"
Note: currently there is a bug in the controller (see issue https://github.com/apache/incubator-pinot/issues/3847); for now you can cherry-pick the PR https://github.com/apache/incubator-pinot/pull/3849, which has already been tested, to fix the issue. The PR is under review now.
Unique counting is a classic problem. Pinot offers multiple ways to solve it, trading off between accuracy and latency.
Functions:
DistinctCount(x) -> LONG
Returns accurate count for all unique values in a column.
The underlying implementation uses an IntOpenHashSet from the library it.unimi.dsi:fastutil:8.2.3 to hold all the unique values.
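For example, with a hypothetical column memberId in a table myTable, an exact unique count looks like:
SELECT DISTINCTCOUNT(memberId) FROM myTable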
Usually it takes a lot of resources and time to compute exact results for unique counting. In some circumstances, users can tolerate a certain error rate, in which case we can use approximation functions to tackle this problem.
HyperLogLog is one approximation algorithm for unique counting. It uses a fixed number of bits to estimate the cardinality of a given data set.
Pinot leverages the HyperLogLog class from the library com.clearspring.analytics:stream:2.7.0 as the data structure to hold intermediate results.
Functions:
DistinctCountHLL(x) -> LONG
For column types INT/LONG/FLOAT/DOUBLE/STRING, Pinot treats each value as an individual entry to add into the HyperLogLog object, then computes the approximation by calling the method cardinality().
For column type BYTES, Pinot treats each value as a serialized HyperLogLog object with pre-aggregated values inside. The bytes value is generated by org.apache.pinot.core.common.ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.serialize(hyperLogLog). All deserialized HyperLogLog objects are merged into one, and then the method cardinality() is called to get the approximated unique count.
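For example, the approximate counterpart of the query shown earlier (same hypothetical column and table names) is:
SELECT DISTINCTCOUNTHLL(memberId) FROM myTable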
Pinot documentation is powered by Gitbook, and a bi-directional Github integration is set up to back up all the changes.
The git repo is here: https://github.com/pinot-contrib/pinot-docs
For Pinot contributors, there are two main ways to update the documentation.
This follows the traditional way of updating documentation.
You can check out the pinot-docs repo, modify the documentation accordingly, and then submit a pull request for review.
Once the PR is merged, the changes will be automatically applied to the corresponding Gitbook pages.
Please note that all Gitbook documentation follows Markdown Syntax.
Once granted edit permission, contributors can edit any page on Gitbook and then save and merge the changes by themselves. This is one example commit on the Github repo to reflect the updates coming from Gitbook: Adding Update Document Page Commit.
Usually we grant edit permission to committers and active contributors.
Please contact an admin (email [email protected] with the content you want to add) to ask for edit permission for the Pinot Gitbook.
Once granted the permission, you can work directly in the Pinot Gitbook UI to modify the documentation and merge changes.
cd incubator-pinot/pinot-tools/target/pinot-tools-pkg
bin/pinot-admin.sh PostQuery \
-queryType sql \
-brokerPort 8000 \
-query "select count(*) from baseballStats"
2020/03/04 12:46:33.459 INFO [PostQueryCommand] [main] Executing command: PostQuery -brokerHost localhost -brokerPort 8000 -queryType sql -query select count(*) from baseballStats
2020/03/04 12:46:33.854 INFO [PostQueryCommand] [main] Result: {"resultTable":{"dataSchema":{"columnDataTypes":["LONG"],"columnNames":["count(*)"]},"rows":[[97889]]},"exceptions":[],"numServersQueried":1,"numServersResponded":1,"numSegmentsQueried":1,"numSegmentsProcessed":1,"numSegmentsMatched":1,"numConsumingSegmentsQueried":0,"numDocsScanned":97889,"numEntriesScannedInFilter":0,"numEntriesScannedPostFilter":0,"numGroupsLimitReached":false,"totalDocs":97889,"timeUsedMs":185,"segmentStatistics":[],"traceInfo":{},"minConsumingFreshnessTimeMs":0}
A cluster is a set of nodes comprising servers, brokers, controllers and minions.
Pinot leverages Apache Helix for cluster management. Helix is a cluster management framework to manage replicated, partitioned resources in a distributed system. Helix uses Zookeeper to store cluster state and metadata.
Briefly, Helix divides nodes into three logical components based on their responsibilities
The nodes that host distributed, partitioned resources
The nodes that observe the current state of each Participant and use that information to access the resources. Spectators are notified of state changes in the cluster (state of a participant, or that of a partition in a participant).
The node that observes and controls the Participant nodes. It is responsible for coordinating all transitions in the cluster and ensuring that state constraints are satisfied while maintaining cluster stability.
Pinot Servers are modeled as Participants; more details about server nodes can be found in the Server section. Pinot Brokers are modeled as Spectators; more details about broker nodes can be found in the Broker section. Pinot Controllers are modeled as Controllers; more details about controller nodes can be found in the Controller section.
Another way to visualize the cluster is a logical view, wherein a cluster contains tenants, tenants contain tables, and tables contain segments.
Typically, there is only one cluster per environment/data center. There is no need to create multiple Pinot clusters since Pinot supports the concept of tenants. At LinkedIn, the largest Pinot cluster consists of 1000+ nodes.
To setup a Pinot cluster, we need to first start Zookeeper.
Once we've started Zookeeper, we can start other components to join this cluster. If you're using Docker, pull the latest apachepinot/pinot image.
To start other components to join the cluster
Explore your cluster via Pinot Data Explorer.
A tenant is a logical component, defined as a group of server/broker nodes with the same Helix tag.
In order to support multi-tenancy, Pinot has first class support for tenants. Every table is associated with a server tenant and a broker tenant. This controls the nodes that will be used by this table as servers and brokers. This allows all tables belonging to a particular use case to be grouped under a single tenant name.
The concept of tenants is very important when multiple use cases are using Pinot and there is a need to provide quotas or some sort of isolation across tenants. For example, consider we have two tables Table A and Table B in the same Pinot cluster.
We can configure Table A with server tenant Tenant A and Table B with server tenant Tenant B. We can tag some of the server nodes for Tenant A and some for Tenant B. This will ensure that segments of Table A only reside on servers tagged with Tenant A, and segments of Table B only reside on servers tagged with Tenant B. The same isolation can be achieved at the broker level, by configuring broker tenants to the tables.
No need to create separate clusters for every table or use case!
This tenant is defined in the tenants section of the table config. This section contains 2 main fields, broker and server, which decide the tenants used for the broker and server components of this table.
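A minimal sketch of this section is shown below; the tenant names here are placeholders used only for illustration:
"tenants": {
  "broker": "brokerTenantName",
  "server": "serverTenantName"
}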
In the above example:
The table will be served by brokers that have been tagged as brokerTenantName_BROKER in Helix.
If this were an offline table, the offline segments for the table will be hosted on Pinot servers tagged in Helix as serverTenantName_OFFLINE.
If this were a realtime table, the realtime segments (both consuming as well as completed ones) will be hosted on Pinot servers tagged in Helix as serverTenantName_REALTIME.
Here's a sample broker tenant config. This will create a broker tenant sampleBrokerTenant by tagging 3 untagged broker nodes as sampleBrokerTenant_BROKER.
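A sketch of such a broker tenant config, assuming the standard tenant fields, might look like this:
{
  "tenantRole": "BROKER",
  "tenantName": "sampleBrokerTenant",
  "numberOfInstances": 3
}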
To create this tenant use the following command. The creation will fail if the number of untagged broker nodes is less than numberOfInstances.
Follow instructions in to get Pinot locally, and then
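One possible way (a sketch, not necessarily the exact command from the original guide) is to POST the config to the controller's /tenants endpoint, assuming the controller runs on localhost:9000 and the config above is saved as sample-broker-tenant.json:
curl -X POST -H "Content-Type: application/json" \
  -d @sample-broker-tenant.json \
  http://localhost:9000/tenants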
Check out the table config in the to make sure it was successfully uploaded.
Here's a sample server tenant config. This will create a server tenant sampleServerTenant by tagging 1 untagged server node as sampleServerTenant_OFFLINE and 1 untagged server node as sampleServerTenant_REALTIME.
To create this tenant use the following command. The creation will fail if the number of untagged server nodes is less than offlineInstances + realtimeInstances.
Follow instructions in to get Pinot locally, and then
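As with the broker tenant, one possible way (a sketch, assuming a controller at localhost:9000 and the config above saved as sample-server-tenant.json) is:
curl -X POST -H "Content-Type: application/json" \
  -d @sample-server-tenant.json \
  http://localhost:9000/tenants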
Check out the table config in the to make sure it was successfully uploaded.
This starter provides a quick start for running Pinot on Google Cloud Platform (GCP)
This document provides the basic instruction to set up a Kubernetes Cluster on Google Kubernetes Engine (GKE).
Please follow this link (https://kubernetes.io/docs/tasks/tools/install-kubectl) to install kubectl.
For Mac User
Please check kubectl version after installation.
Please follow this link (https://helm.sh/docs/using_helm/#installing-helm) to install helm.
For Mac User
Please check helm version after installation.
Please follow this link () to install Google Cloud SDK.
Install Google Cloud SDK
Restart your shell
The script below will create a 3-node cluster named pinot-quickstart in us-west1-b with n1-standard-2 machines for demo purposes.
Please modify the parameters in the example command below:
You can monitor cluster status by command:
Once the cluster is in RUNNING status, it's ready to be used.
Simply run the command below to get the credentials for the cluster pinot-quickstart that you just created, or for your existing cluster.
To verify the connection, you can run:
Please follow this Kubernetes QuickStart to deploy your Pinot Demo.
This quick start guide will help you bootstrap a Pinot standalone instance on your local machine.
In this guide you'll learn how to download and install Apache Pinot as a standalone instance.
This is a quickstart guide that will show you how to quickly start an example recipe in a standalone instance and is meant for learning. To run Pinot in cluster mode, please take a look at .
First, let's download the Pinot distribution for this tutorial. You can either build the distribution from source or download a packaged release.
Follow these steps to check out the code from the Pinot GitHub repository and build Pinot locally.
Download the latest binary release from the Apache Pinot downloads page, or use this command
Once you have the tar file,
We'll be using a quick-start script, which does the following:
Sets up the Pinot cluster QuickStartCluster
Creates a sample table and loads sample data
There are 3 kinds of quick start:
Batch quick start creates the Pinot cluster, creates an offline table baseballStats, and pushes sample offline data to the table.
That's it! We've spun up a Pinot cluster. You can continue playing with other types of quick start, or simply head on to check out the data in the baseballStats table.
Streaming quick start sets up a Kafka cluster and pushes sample data to a Kafka topic. Then, it creates the Pinot cluster and creates a realtime table meetupRSVP which ingests data from the Kafka topic.
We now have a Pinot cluster with a realtime table! You can head over to check out the data in the meetupRSVP table.
Hybrid quick start sets up a Kafka cluster and pushes sample data to a Kafka topic. Then, it creates the Pinot cluster and creates a hybrid table airlineStats. The realtime table ingests data from the Kafka topic. Lastly, sample data is pushed into the offline table.
Let's head over to check out the data we pushed to the airlineStats table.
brew install kubernetes-cli
kubectl version
brew install kubernetes-helm
helm version
curl https://sdk.cloud.google.com | bash
exec -l $SHELL
gcloud init
GCLOUD_PROJECT=[your gcloud project name]
GCLOUD_ZONE=us-west1-b
GCLOUD_CLUSTER=pinot-quickstart
GCLOUD_MACHINE_TYPE=n1-standard-2
GCLOUD_NUM_NODES=3
gcloud container clusters create ${GCLOUD_CLUSTER} \
--num-nodes=${GCLOUD_NUM_NODES} \
--machine-type=${GCLOUD_MACHINE_TYPE} \
--zone=${GCLOUD_ZONE} \
--project=${GCLOUD_PROJECT}
gcloud compute instances list
GCLOUD_PROJECT=[your gcloud project name]
GCLOUD_ZONE=us-west1-b
GCLOUD_CLUSTER=pinot-quickstart
gcloud container clusters get-credentials ${GCLOUD_CLUSTER} --zone ${GCLOUD_ZONE} --project ${GCLOUD_PROJECT}
kubectl get nodes
GCLOUD_ZONE=us-west1-b
gcloud container clusters delete pinot-quickstart --zone=${GCLOUD_ZONE}
# define the pinot version
PINOT_VERSION=0.3.0
bin/quick-start-batch.sh
# stop previous quick start cluster, if any
bin/quick-start-streaming.sh
# stop previous quick start cluster, if any
bin/quick-start-hybrid.sh
# checkout pinot
git clone https://github.com/apache/incubator-pinot.git
cd incubator-pinot
# build pinot
mvn install package -DskipTests -Pbin-dist
# navigate to directory containing the setup scripts
cd pinot-distribution/target/apache-pinot-incubating-$PINOT_VERSION-bin/apache-pinot-incubating-$PINOT_VERSION-bin
wget https://downloads.apache.org/incubator/pinot/apache-pinot-incubating-$PINOT_VERSION/apache-pinot-incubating-$PINOT_VERSION-bin.tar.gz
# untar it
tar -zxvf apache-pinot-incubating-$PINOT_VERSION-bin.tar.gz
# navigate to directory containing the launcher scripts
cd apache-pinot-incubating-$PINOT_VERSION-bin
This starter guide provides a quick start for running Pinot on Microsoft Azure
This document provides the basic instruction to set up a Kubernetes Cluster on Azure Kubernetes Service (AKS)
Please follow this link (https://kubernetes.io/docs/tasks/tools/install-kubectl) to install kubectl.
For Mac User
brew install kubernetes-cli
Please check kubectl version after installation.
kubectl version
Please follow this link (https://helm.sh/docs/using_helm/#installing-helm) to install helm.
For Mac User
brew install kubernetes-helm
Please check helm version after installation.
helm version
Please follow this link (https://docs.microsoft.com/en-us/cli/azure/install-azure-cli?view=azure-cli-latest) to install Azure CLI.
For Mac User
brew update && brew install azure-cli
The script below will open your default browser to sign in to your Azure account.
az login
The script below will create a resource group in the location eastus.
AKS_RESOURCE_GROUP=pinot-demo
AKS_RESOURCE_GROUP_LOCATION=eastus
az group create --name ${AKS_RESOURCE_GROUP} \
--location ${AKS_RESOURCE_GROUP_LOCATION}
The script below will create a 3-node cluster named pinot-quickstart for demo purposes.
Please modify the parameters in the example command below:
AKS_RESOURCE_GROUP=pinot-demo
AKS_CLUSTER_NAME=pinot-quickstart
az aks create --resource-group ${AKS_RESOURCE_GROUP} \
--name ${AKS_CLUSTER_NAME} \
--node-count 3
Once the command succeeds, the cluster is ready to be used.
Simply run the command below to get the credentials for the cluster pinot-quickstart that you just created, or for your existing cluster.
AKS_RESOURCE_GROUP=pinot-demo
AKS_CLUSTER_NAME=pinot-quickstart
az aks get-credentials --resource-group ${AKS_RESOURCE_GROUP} \
--name ${AKS_CLUSTER_NAME}
To verify the connection, you can run:
kubectl get nodes
Please follow this Kubernetes QuickStart to deploy your Pinot Demo.
AKS_RESOURCE_GROUP=pinot-demo
AKS_CLUSTER_NAME=pinot-quickstart
az aks delete --resource-group ${AKS_RESOURCE_GROUP} \
--name ${AKS_CLUSTER_NAME}
The 0.2.0 release is the first release after the initial one and includes the improvements listed below.
Added support for Kafka 2.0
Table rebalancer now supports a minimum number of serving replicas during rebalance
Added support for UDF in filter predicates and selection
Added support to use hex string as the representation of byte array for queries (see PR #4041)
Added support for parquet reader (see PR #3852)
Introduced interface stability and audience annotations (see PR #4063)
Refactor HelixBrokerStarter to separate constructor and start() - backwards incompatible (see PR #4100)
Admin tool for listing segments with invalid intervals for offline tables
Migrated to log4j2 (see PR #4139)
Added simple avro msg decoder
Added support for passing headers in Pinot client
Support transform functions with AVG aggregation function (see PR #4557)
Configurations additions/changes
Allow customized metrics prefix (see PR #4392)
Controller.enable.batch.message.mode to false by default (see PR #3928)
RetentionManager and OfflineSegmentIntervalChecker initial delays configurable (see PR #3946)
Config to control kafka fetcher size and increase default (see PR #3869)
Added a percent threshold to consider startup of services (see PR #4011)
Make SingleConnectionBrokerRequestHandler as default (see PR #4048)
Always enable default column feature, remove the configuration (see PR #4074)
Remove redundant default broker configurations (see PR #4106)
Removed some config keys in server(see PR #4222)
Add config to disable HLC realtime segment (see PR #4235)
The following config variables are deprecated and will be removed in the next release:
pinot.broker.requestHandlerType will be removed, in favor of using the "singleConnection" broker request handler. If you have set this configuration, please remove it and use the default type ("singleConnection") for broker request handler.
We are in the process of separating Helix and Pinot controllers, so that administrators can have the option of running independent Helix controllers and Pinot controllers.
We are in the process of moving towards supporting SQL query format and results.
We are in the process of separating instance and segment assignment using instance pools to optimize the number of Helix state transitions in Pinot clusters with thousands of tables.
Task management does not work correctly in this release, due to bugs in Helix. We will upgrade to Helix 0.9.2 (or later) version to get this fixed.
You must upgrade to this release before moving onto newer versions of Pinot release. The protocol between Pinot-broker and Pinot-server has been changed and this release has the code to retain compatibility moving forward. Skipping this release may (depending on your environment) cause query errors if brokers are upgraded and servers are in the process of being upgraded.
As always, we recommend that you upgrade controllers first, and then brokers and lastly the servers in order to have zero downtime in production clusters.
Pull Request #4100 introduces a backwards incompatible change to the Pinot broker. If you use the Java constructor on the HelixBrokerStarter class, then you will face a compilation error with this version. You will need to construct the object and call the start() method in order to start the broker.
Pull Request #4139 introduces a backwards incompatible change for log4j configuration. If you used a custom log4j configuration (log4j.xml), you need to write a new log4j2 configuration (log4j2.xml). In addition, you may need to change the arguments on the command line to start Pinot components.
If you used Pinot-admin command to start Pinot components, you don't need any change. If you used your own commands to start pinot components, you will need to pass the new log4j2 config as a jvm parameter (i.e. substitute -Dlog4j.configuration or -Dlog4j.configurationFile argument with -Dlog4j2.configurationFile=log4j2.xml).
This guide provides a quick start for running Pinot on Amazon Web Services (AWS).
This document provides the basic instruction to set up a Kubernetes Cluster on Amazon Elastic Kubernetes Service (Amazon EKS)
Please follow this link (https://kubernetes.io/docs/tasks/tools/install-kubectl) to install kubectl.
For Mac User
brew install kubernetes-cli
Please check kubectl version after installation.
kubectl version
Please follow this link (https://helm.sh/docs/using_helm/#installing-helm) to install helm.
For Mac User
brew install kubernetes-helm
Please check helm version after installation.
helm version
Please follow this link (https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-install.html#install-tool-bundled) to install AWS CLI.
For Mac User
curl "https://d1vvhvl2y92vvt.cloudfront.net/awscli-exe-macos.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install
Please follow this link (https://docs.aws.amazon.com/eks/latest/userguide/eksctl.html#installing-eksctl) to install eksctl.
For Mac User
brew tap weaveworks/tap
brew install weaveworks/tap/eksctl
If you are a first-time AWS user, please register your account at https://aws.amazon.com/.
Once you have created the account, you can go to AWS Identity and Access Management (IAM) to create a user and create access keys under the Security Credentials tab.
aws configure
The script below will create a 3-node cluster named pinot-quickstart in us-west-2 with t3.small machines for demo purposes.
Please modify the parameters in the example command below:
EKS_CLUSTER_NAME=pinot-quickstart
eksctl create cluster \
--name ${EKS_CLUSTER_NAME} \
--version 1.14 \
--region us-west-2 \
--nodegroup-name standard-workers \
--node-type t3.small \
--nodes 3 \
--nodes-min 3 \
--nodes-max 4 \
--node-ami auto
You can monitor cluster status by command:
EKS_CLUSTER_NAME=pinot-quickstart
aws eks describe-cluster --name ${EKS_CLUSTER_NAME}
Once the cluster is in ACTIVE status, it's ready to be used.
Simply run the command below to get the credentials for the cluster pinot-quickstart that you just created, or for your existing cluster.
EKS_CLUSTER_NAME=pinot-quickstart
aws eks update-kubeconfig --name ${EKS_CLUSTER_NAME}
To verify the connection, you can run:
kubectl get nodes
Please follow this Kubernetes QuickStart to deploy your Pinot Demo.
EKS_CLUSTER_NAME=pinot-quickstart
aws eks delete-cluster --name ${EKS_CLUSTER_NAME}
Segments for offline tables are constructed outside of Pinot, typically in Hadoop via map-reduce jobs, and ingested into Pinot via the REST API provided by the Controller. Pinot provides libraries to create Pinot segments out of input files in AVRO, JSON or CSV formats in a Hadoop job, and push the constructed segments to the controllers via REST APIs.
When an Offline segment is ingested, the controller looks up the table’s configuration and assigns the segment to the servers that host the table. It may assign multiple servers for each segment depending on the number of replicas configured for that table.
Pinot supports different segment assignment strategies that are optimized for various use cases.
Once segments are assigned, Pinot servers get notified via Helix to “host” the segment. The servers download the segments (as a cached local copy to serve queries) and load them into local memory. All segment data is maintained in memory as long as the server hosts that segment.
Once the server has loaded the segment, Helix notifies brokers of the availability of these segments. The brokers start including the new segments in queries. Brokers support different routing strategies depending on the type of table, the segment assignment strategy and the use case.
Data in offline segments is immutable (rows cannot be added, deleted, or modified). However, segments may be replaced with modified data.
Segments for realtime tables are constructed by Pinot servers with rows ingested from data streams such as Kafka. Rows ingested from streams are made available for query processing as soon as they are ingested, thus enabling applications such as those that need real-time charts on analytics.
In large scale installations, data in streams is typically split across multiple stream partitions. The underlying stream may provide consumer implementations that allow applications to consume data from any subset of partitions, including all partitions (or, just from one partition).
A pinot table can be configured to consume from streams in one of two modes:
LowLevel
: This is the preferred mode of consumption. Pinot creates independent partition-level consumers for each partition. Depending on the configured number of replicas, multiple consumers may be created for each partition, taking care that no two replicas exist on the same server host. Therefore, you need to provision at least as many hosts as the number of replicas configured.
HighLevel
: Pinot creates one stream-level consumer that consumes from all partitions. Each message consumed could be from any of the partitions of the stream. Depending on the configured number of replicas, multiple stream-level consumers are created, taking care that no two replicas exist on the same server host. Therefore you need to provision exactly as many hosts as the number of replicas configured.
Of course, the underlying stream should support either mode of consumption in order for a Pinot table to use that mode. Kafka has support for both of these modes. See Pluggable Streams for more information on support of other data streams in Pinot.
In either mode, Pinot servers store the ingested rows in volatile memory until either one of the following conditions is met:
A certain number of rows are consumed
The consumption has gone on for a certain length of time
(See the StreamConfigs section on how to set these values, or have Pinot compute them for you.)
Upon reaching either one of these limits, the servers do the following:
Pause consumption
Persist the rows consumed so far into non-volatile storage
Continue consuming new rows into volatile memory again.
The persisted rows form what we call a completed segment (as opposed to a consuming segment that resides in volatile memory).
In LowLevel
mode, the completed segments are persisted into the local non-volatile store of the Pinot server as well as the segment store of the Pinot cluster (see Pinot Architecture Overview). This allows for easy and automated mechanisms for replacing Pinot servers or expanding capacity. Pinot has special mechanisms that ensure that the completed segment is equivalent across all replicas.
During segment completion, one winner is chosen by the controller from all the replicas as the committer server
. The committer server
builds the segment and uploads it to the controller. All the other non-committer servers
follow one of these two paths:
If the in-memory segment is equivalent to the committed segment, the non-committer
server also builds the segment locally and replaces the in-memory segment
If the in-memory segment is not equivalent to the committed segment, the non-committer
server downloads the segment from the controller.
For more details on this protocol, please refer to this doc.
In HighLevel
mode, the servers persist the consumed rows into the local store (and not the segment store). Since consumption of rows can be from any partition, it is not possible to guarantee equivalence of segments across replicas.
See Consuming and Indexing rows in Realtime for details.
Pinot supports indexing data from various file formats. To support reading from a file format, a record reader needs to be provided to read the file and convert records into the general format which the indexing engine can understand. The record reader serves as the connector from each individual file format to the Pinot record format.
Pinot package provides the following record readers out of the box:
Avro record reader: record reader for Avro format files
CSV record reader: record reader for CSV format files
JSON record reader: record reader for JSON format files
ORC record reader: record reader for ORC format files
Thrift record reader: record reader for Thrift format files
Pinot segment record reader: record reader for Pinot segment
To initialize a record reader, the data file and table schema should be provided (for the Pinot segment record reader, only the index directory needs to be provided because the schema can be derived from the segment). The output record will follow the table schema provided.
For Avro/JSON/ORC/Pinot segment record reader, no extra configuration is required as column names and multi-values are embedded in the data file.
For the CSV/Thrift record readers, extra configuration may need to be provided to determine the column names and multi-values for the data.
The CSV record reader config contains the following settings:
Header: the header for the CSV file (column names)
Column delimiter: delimiter for each column
Multi-value delimiter: delimiter for each value for a multi-valued column
If no config is provided, the following default settings are used:
Use the first row in the data file as the header
Use ‘,’ as the column delimiter
Use ‘;’ as the multi-value delimiter
The Thrift record reader config is mandatory. It contains the Thrift class name for the record reader to de-serialize the Thrift objects.
The following property is to be set during segment generation in your Hadoop properties.
record.reader.path: ${FULL_PATH_OF_YOUR_RECORD_READER_CLASS}
For ORC, it would be:
record.reader.path: org.apache.pinot.orc.data.readers.ORCRecordReader
For other file formats, we provide a general record reader interface - RecordReader. To index a file into a Pinot segment, simply implement the interface and plug it into the index engine - SegmentCreationDriverImpl. We use a two-pass algorithm to index the file into a Pinot segment, hence the rewind() method is required for the record reader.
GenericRow is the record abstraction which the index engine can read and index with. It is a map from column name (String) to column value (Object). For multi-valued column, the value should be an object array (Object[]).
There are several contracts for record readers that developers should follow when implementing their own record readers:
The output GenericRow should follow the table schema provided, in the sense that:
All the columns in the schema should be preserved (if a column does not exist in the original record, put the default value instead)
Columns not in the schema should not be included
Values for the column should follow the field spec from the schema (data type, single-valued/multi-valued)
For the time column (refer to TimeFieldSpec), the record reader should be able to read both incoming and outgoing time (we allow conversion from incoming time - the time value in the original data - to outgoing time - the time value stored in Pinot - during index creation).
If incoming and outgoing time column name are the same, use incoming time field spec
If incoming and outgoing time column name are different, put both of them as time field spec
We keep both incoming and outgoing time columns to handle cases where the input file contains time values that are already converted. A hypothetical reader illustrating these contracts is sketched below.
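To make these contracts concrete, here is a minimal, hypothetical reader for a pipe-delimited text format. The class name and file format are made up for illustration, and the helper calls (GenericRow.putField, Schema.getAllFieldSpecs, FieldSpec.getDefaultNullValue) are assumptions based on the 0.3.0-era code layout; verify them against the RecordReader interface in your Pinot version and treat this as a sketch of the contract rather than a drop-in implementation.
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.core.data.GenericRow;
// Hypothetical reader for a pipe-delimited format; it mirrors the RecordReader contract
// described above (schema-driven output, Object[] for multi-values, rewind() for the
// two-pass segment creation).
public class MyFormatRecordReader {
  private final Schema _schema;
  private List<String> _lines = new ArrayList<>();
  private int _nextLine;
  public MyFormatRecordReader(File dataFile, Schema schema) throws IOException {
    _schema = schema;
    try (BufferedReader reader = new BufferedReader(new FileReader(dataFile))) {
      String line;
      while ((line = reader.readLine()) != null) {
        _lines.add(line);
      }
    }
  }
  public boolean hasNext() {
    return _nextLine < _lines.size();
  }
  // Convert one raw record into a GenericRow that follows the table schema.
  public GenericRow next(GenericRow reuse) {
    String[] tokens = _lines.get(_nextLine++).split("\\|");
    int tokenIndex = 0;
    for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
      Object value;
      if (tokenIndex < tokens.length) {
        String token = tokens[tokenIndex++];
        // Multi-valued columns must be set as an Object[]; a real reader would also
        // convert tokens to the data type declared in the field spec.
        value = fieldSpec.isSingleValueField() ? token : token.split(";");
      } else {
        // Columns missing from the input record still need to be present: use the default value.
        value = fieldSpec.getDefaultNullValue();
      }
      reuse.putField(fieldSpec.getName(), value);
    }
    return reuse;
  }
  // Required because segment creation makes two passes over the data.
  public void rewind() {
    _nextLine = 0;
  }
  public void close() {
    _lines = null;
  }
}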
This guide shows you how to import records into Pinot using a Thrift file.
This guide is a work in progress.
We're actively working on improving our documentation. This doc will be available very soon. Please check back in a day or two for more details.
VERSION=0.3.0
wget https://downloads.apache.org/incubator/pinot/apache-pinot-incubating-$VERSION/apache-pinot-incubating-$VERSION-bin.tar.gz
tar vxf apache-pinot-incubating-*-bin.tar.gz
cd apache-pinot-incubating-*-bin
bin/quick-start-batch.sh
docker network create -d bridge pinot-demo
docker run \
--network=pinot-demo \
--name pinot-zookeeper \
--restart always \
-p 2181:2181 \
-d zookeeper:3.5.6
docker run \
--network pinot-demo --name=zkui \
-p 9090:9090 \
-e ZK_SERVER=pinot-zookeeper:2181 \
-d qnib/plain-zkui:latest
bin/pinot-admin.sh StartZookeeper -zkPort 2181
export PINOT_VERSION=0.3.0-SNAPSHOT
export PINOT_IMAGE=apachepinot/pinot:${PINOT_VERSION}
docker pull ${PINOT_IMAGE}
"tenants": {
"broker": "brokerTenantName",
"server": "serverTenantName"
}
{
"tenantRole" : "BROKER",
"tenantName" : "sampleBrokerTenant",
"numberOfInstances" : 3
}
bin/pinot-admin.sh AddTenant \
-name sampleBrokerTenant
-role BROKER
-instanceCount 3 -exec
curl -i -X POST -H 'Content-Type: application/json' -d @sample-broker-tenant.json localhost:9000/tenants
{
"tenantRole" : "SERVER",
"tenantName" : "sampleServerTenant",
"offlineInstances" : 1,
"realtimeInstances" : 1
}
bin/pinot-admin.sh AddTenant \
-name sampleServerTenant \
-role SERVER \
-offlineInstanceCount 1 \
-realtimeInstanceCount 1 -exec
curl -i -X POST -H 'Content-Type: application/json' -d @sample-server-tenant.json localhost:9000/tenants
To contribute to Pinot, please follow the instructions below.
Pinot uses git for source code management. If you are new to Git, it is worth reviewing the basics of Git and common tasks like managing branches and rebasing.
To limit the number of branches created on the Apache Pinot repository, we recommend that you create a fork by clicking on the fork button on this page. Read more about the fork workflow here
$ mkdir workspace
$ cd workspace
$ git clone git@github.com:<github username>/pinot.git
$ cd pinot
# set upstream
$ git remote add upstream https://github.com/apache/incubator-pinot
# check that the upstream shows up correctly
$ git remote -v
Pinot is a Maven project and familiarity with Maven will help you work with Pinot code. If you are new to Maven, you can read about Maven here and get a quick overview here.
Run the following maven command to set up the project.
# compile, download sources
$ mvn install package -DskipTests -Pbin-dist -DdownloadSources -DdownloadJavadocs
Import the project into your favorite IDE. Set up the stylesheet according to your IDE. We have provided instructions for IntelliJ and Eclipse. If you are using other IDEs, please ensure you use a stylesheet based on this.
To import the Pinot stylesheet, launch IntelliJ and navigate to Preferences
(on Mac) or Settings
on Linux.
Navigate to Editor
-> Code Style
-> Java
Select Import Scheme
-> IntelliJ IDEA code style XML
Choose codestyle-intellij.xml
from incubator-pinot/config
folder of your workspace. Click Apply.
To import the Pinot stylesheet, launch Eclipse and navigate to Preferences
(on Mac) or Settings
on Linux.
Navigate to Java->Code Style->Formatter
Choose codestyle-eclipse.xml
from incubator-pinot/config folder
of your workspace. Click Apply.
This quick start guide will show you how to set up a Pinot cluster manually.
You can try out the pre-built Pinot all-in-one Docker image.
(Optional) You can also follow the instructions to build your own images.
Create an isolated bridge network in docker
Start Zookeeper in daemon mode. This is a single node zookeeper setup. Zookeeper is the central metadata store for Pinot and should be set up with replication for production use. See for more information.
Start to browse Zookeeper data at .
Alternately, you can use .
Start Pinot Controller in daemon mode and connect to Zookeeper.
Start Pinot Broker in daemon mode and connect to Zookeeper.
Start Pinot Server in daemon mode and connect to Zookeeper.
Optionally, you can also start Kafka for setting up realtime streams. This brings up the Kafka broker on port 9092.
Now all Pinot related components are started as an empty cluster.
You can run the command below to check the container status.
Sample Console Output
Now it's time to start adding data to the cluster. Check out some of the or follow the and for instructions on loading your own data.
This page describes how to write your own stream plug-ins for Pinot. Two modes are available: high level and low level.
The stream should provide the following guarantees:
Exactly once delivery (unless restarting from a checkpoint) for each consumer of the stream.
(Optionally) support a mechanism to split events (in some arbitrary fashion) so that each event in the stream is delivered to exactly one host out of a set of hosts.
Provide ways to save a checkpoint for the data consumed so far. If the stream is partitioned, then this checkpoint is a vector of checkpoints for events consumed from individual partitions.
The checkpoints should be recorded only when Pinot makes a call to do so.
The consumer should be able to start consumption from one of:
latest available data
earliest available data
last saved checkpoint
While consuming rows at a partition level, the stream should support the following properties:
Stream should provide a mechanism to get the current number of partitions.
Each event in a partition should have a unique offset that is not more than 64 bits long.
Each partition should be referred to by a number not exceeding 32 bits.
Stream should provide the following mechanisms to get an offset for a given partition of the stream:
get the offset of the oldest event available (assuming events are aged out periodically) in the partition.
get the offset of the most recent event published in the partition
(optionally) get the offset of an event that was published at a specified time
Stream should provide a mechanism to consume a set of events from a partition starting from a specified offset.
Pinot assumes that the offsets of incoming events are monotonically increasing; i.e., if Pinot consumes an event at offset o1, then the offset o2 of the following event should be such that o2 > o1.
In addition, we have an operational requirement that the number of partitions should not be reduced over time.
In order to add a new type of stream (say, Foo), implement the following classes:
FooConsumerFactory extends
FooPartitionLevelConsumer implements
FooStreamLevelConsumer implements
FooMetadataProvider implements
FooMessageDecoder implements
Depending on stream level or partition level, your implementation needs to include StreamLevelConsumer or PartitionLevelConsumer.
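As a structural sketch, the classes for a hypothetical foo stream might be declared as shown below. Only PartitionLevelConsumer and StreamLevelConsumer are named above; the remaining base types (StreamConsumerFactory, StreamMetadataProvider, StreamMessageDecoder) and the package in the imports are assumptions based on Pinot's stream SPI and should be checked against your Pinot version. The classes are declared abstract here purely so the sketch can omit method bodies; a real plug-in implements every method of these interfaces.
import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
// Factory referenced from "stream.foo.consumer.factory.class.name" in the table config;
// it wires up the consumers, metadata provider and decoder for the foo stream.
public abstract class FooConsumerFactory extends StreamConsumerFactory {
}
// Used for LowLevel consumption: one consumer per stream partition, fetching batches
// of messages between two offsets.
abstract class FooPartitionLevelConsumer implements PartitionLevelConsumer {
}
// Used for HighLevel consumption: a single stream-level consumer across all partitions,
// committing checkpoints when Pinot asks it to.
abstract class FooStreamLevelConsumer implements StreamLevelConsumer {
}
// Answers metadata questions such as the current number of partitions and the
// oldest/newest offsets of a partition.
abstract class FooMetadataProvider implements StreamMetadataProvider {
}
// Decodes raw stream payloads into rows that follow the table schema.
abstract class FooMessageDecoder implements StreamMessageDecoder {
}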
The properties for the stream implementation are to be set in the table configuration, inside the streamConfigs section.
Use the streamType
property to define the stream type. For example, for the implementation of stream foo
, set the property "streamType" : "foo"
.
The rest of the configuration properties for your stream should be set with the prefix "stream.foo"
. Be sure to use the same suffix for the following properties (see examples below):
topic
consumer type
stream consumer factory
offset
decoder class name
decoder properties
connection timeout
fetch timeout
All values should be strings. For example:
You can have additional properties that are specific to your stream. For example:
In addition to these properties, you can define thresholds for the consuming segments:
rows threshold
time threshold
The properties for the thresholds are as follows:
An example of this implementation can be found in the , which is an implementation for the kafka stream.
$ curl -H "Content-Type: application/json" -X POST \
-d '{"sql":"select foo, count(*) from myTable group by foo limit 100"}' \
http://localhost:8099/query/sql
$ curl -H "Content-Type: application/json" -X POST \
-d '{"pql":"select count(*) from myTable group by foo top 100"}' \
http://localhost:8099/query
docker run --rm -ti \
--network pinot-demo --name=zkui \
-p 9090:9090 \
-e ZK_SERVER=pinot-zookeeper:2181 \
-d qnib/plain-zkui:latest
docker run --rm -ti \
--network=pinot-demo \
--name pinot-controller \
-p 9000:9000 \
-d ${PINOT_IMAGE} StartController \
-zkAddress pinot-zookeeper:2181
docker run --rm -ti \
--network=pinot-demo \
--name pinot-broker \
-d ${PINOT_IMAGE} StartBroker \
-zkAddress pinot-zookeeper:2181
docker run --rm -ti \
--network=pinot-demo \
--name pinot-server \
-d ${PINOT_IMAGE} StartServer \
-zkAddress pinot-zookeeper:2181
docker run --rm -ti \
--network pinot-demo --name=kafka \
-e KAFKA_ZOOKEEPER_CONNECT=pinot-zookeeper:2181/kafka \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ADVERTISED_HOST_NAME=kafka \
-d wurstmeister/kafka:latest
docker container ls -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
9ec20e4463fa wurstmeister/kafka:latest "start-kafka.sh" 43 minutes ago Up 43 minutes kafka
0775f5d8d6bf apachepinot/pinot:latest "./bin/pinot-admin.s…" 44 minutes ago Up 44 minutes 8096-8099/tcp, 9000/tcp pinot-server
64c6392b2e04 apachepinot/pinot:latest "./bin/pinot-admin.s…" 44 minutes ago Up 44 minutes 8096-8099/tcp, 9000/tcp pinot-broker
b6d0f2bd26a3 apachepinot/pinot:latest "./bin/pinot-admin.s…" 45 minutes ago Up 45 minutes 8096-8099/tcp, 0.0.0.0:9000->9000/tcp pinot-controller
570416fc530e zookeeper:3.5.6 "/docker-entrypoint.…" 45 minutes ago Up 45 minutes 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp pinot-zookeeper
cd apache-pinot-incubating-${PINOT_VERSION}-bin
bin/pinot-admin.sh StartZookeeper \
-zkPort 2191
bin/pinot-admin.sh StartController \
-zkAddress localhost:2191 \
-controllerPort 9000
bin/pinot-admin.sh StartBroker \
-zkAddress localhost:2191
bin/pinot-admin.sh StartServer \
-zkAddress localhost:2191
bin/pinot-admin.sh StartKafka \
-zkAddress=localhost:2191/kafka \
-port 19092
export PINOT_VERSION=0.3.0-SNAPSHOT
export PINOT_IMAGE=apachepinot/pinot:${PINOT_VERSION}
docker pull ${PINOT_IMAGE}
docker network create -d bridge pinot-demo
docker run \
--network=pinot-demo \
--name pinot-zookeeper \
--restart always \
-p 2181:2181 \
-d zookeeper:3.5.6
"streamType" : "foo",
"stream.foo.topic.name" : "SomeTopic",
"stream.foo.consumer.type": "LowLevel",
"stream.foo.consumer.factory.class.name": "fully.qualified.pkg.ConsumerFactoryClassName",
"stream.foo.consumer.prop.auto.offset.reset": "largest",
"stream.foo.decoder.class.name" : "fully.qualified.pkg.DecoderClassName",
"stream.foo.decoder.prop.a.decoder.property" : "decoderPropValue",
"stream.foo.connection.timeout.millis" : "10000", // default 30_000
"stream.foo.fetch.timeout.millis" : "10000" // default 5_000
"stream.foo.some.buffer.size" : "24g"
"realtime.segment.flush.threshold.size" : "100000"
"realtime.segment.flush.threshold.time" : "6h"
Pinot Client for Golang
Applications can use this golang client library to query Apache Pinot.
Source Code Repo: https://github.com/fx19880617/pinot-client-go
Please follow this Pinot Quickstart link to install and start Pinot batch QuickStart locally.
bin/quick-start-batch.sh
Check out Client library Github Repo
git clone git@github.com:fx19880617/pinot-client-go.git
cd pinot-client-go
Build and run the example application to query from Pinot Batch Quickstart
go build ./examples/batch-quickstart
./batch-quickstart
The Pinot client can be initialized in any of the following ways:
pinotClient := pinot.NewFromZookeeper([]string{"localhost:2123"}, "", "QuickStartCluster")
pinotClient := pinot.NewFromBrokerList([]string{"localhost:8000"})
pinotClient := pinot.NewWithConfig(&pinot.ClientConfig{
ZkConfig: &pinot.ZookeeperConfig{
ZookeeperPath: zkPath,
PathPrefix: strings.Join([]string{zkPathPrefix, pinotCluster}, "/"),
SessionTimeoutSec: defaultZkSessionTimeoutSec,
},
ExtraHTTPHeader: map[string]string{
"extra-header":"value",
},
})
Please see this example for your reference.
Code snippet:
pinotClient, err := pinot.NewFromZookeeper([]string{"localhost:2123"}, "", "QuickStartCluster")
if err != nil {
log.Error(err)
}
brokerResp, err := pinotClient.ExecuteSQL("baseballStats", "select count(*) as cnt, sum(homeRuns) as sum_homeRuns from baseballStats group by teamID limit 10")
if err != nil {
log.Error(err)
}
log.Infof("Query Stats: response time - %d ms, scanned docs - %d, total docs - %d", brokerResp.TimeUsedMs, brokerResp.NumDocsScanned, brokerResp.TotalDocs)
Query Response is defined as the struct of following:
type BrokerResponse struct {
AggregationResults []*AggregationResult `json:"aggregationResults,omitempty"`
SelectionResults *SelectionResults `json:"SelectionResults,omitempty"`
ResultTable *ResultTable `json:"resultTable,omitempty"`
Exceptions []Exception `json:"exceptions"`
TraceInfo map[string]string `json:"traceInfo,omitempty"`
NumServersQueried int `json:"numServersQueried"`
NumServersResponded int `json:"numServersResponded"`
NumSegmentsQueried int `json:"numSegmentsQueried"`
NumSegmentsProcessed int `json:"numSegmentsProcessed"`
NumSegmentsMatched int `json:"numSegmentsMatched"`
NumConsumingSegmentsQueried int `json:"numConsumingSegmentsQueried"`
NumDocsScanned int64 `json:"numDocsScanned"`
NumEntriesScannedInFilter int64 `json:"numEntriesScannedInFilter"`
NumEntriesScannedPostFilter int64 `json:"numEntriesScannedPostFilter"`
NumGroupsLimitReached bool `json:"numGroupsLimitReached"`
TotalDocs int64 `json:"totalDocs"`
TimeUsedMs int `json:"timeUsedMs"`
MinConsumingFreshnessTimeMs int64 `json:"minConsumingFreshnessTimeMs"`
}
Note that AggregationResults
and SelectionResults
are holders for PQL queries.
Meanwhile ResultTable
is the holder for SQL queries. ResultTable
is defined as:
// ResultTable is a ResultTable
type ResultTable struct {
DataSchema RespSchema `json:"dataSchema"`
Rows [][]interface{} `json:"rows"`
}
RespSchema
is defined as:
// RespSchema is response schema
type RespSchema struct {
ColumnDataTypes []string `json:"columnDataTypes"`
ColumnNames []string `json:"columnNames"`
}
There are multiple functions defined for ResultTable
, like:
func (r ResultTable) GetRowCount() int
func (r ResultTable) GetColumnCount() int
func (r ResultTable) GetColumnName(columnIndex int) string
func (r ResultTable) GetColumnDataType(columnIndex int) string
func (r ResultTable) Get(rowIndex int, columnIndex int) interface{}
func (r ResultTable) GetString(rowIndex int, columnIndex int) string
func (r ResultTable) GetInt(rowIndex int, columnIndex int) int
func (r ResultTable) GetLong(rowIndex int, columnIndex int) int64
func (r ResultTable) GetFloat(rowIndex int, columnIndex int) float32
func (r ResultTable) GetDouble(rowIndex int, columnIndex int) float64
Sample Usage is here
// Print Response Schema
for c := 0; c < brokerResp.ResultTable.GetColumnCount(); c++ {
fmt.Printf("%s(%s)\t", brokerResp.ResultTable.GetColumnName(c), brokerResp.ResultTable.GetColumnDataType(c))
}
fmt.Println()
// Print Row Table
for r := 0; r < brokerResp.ResultTable.GetRowCount(); r++ {
for c := 0; c < brokerResp.ResultTable.GetColumnCount(); c++ {
fmt.Printf("%v\t", brokerResp.ResultTable.Get(r, c))
}
fmt.Println()
}
Pinot has many built-in aggregation functions such as MIN, MAX, SUM, AVG, etc. See the PQL page for the full list of aggregation functions.
Adding a new AggregationFunction requires two things:
Implement the AggregationFunction interface and make it available as part of the classpath
Register the function in AggregationFunctionFactory. As of today, this requires a code change in Pinot, but we plan to add the ability to plug in functions without having to change Pinot code.
To get an overall idea, see MAX Aggregation Function implementation. All other implementations can be found here.
Let's look at the key methods to implement in AggregationFunction
interface AggregationFunction {
AggregationResultHolder createAggregationResultHolder();
GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity);
void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<String, BlockValSet> blockValSetMap);
void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
Map<String, BlockValSet> blockValSets);
void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
Map<String, BlockValSet> blockValSets);
IntermediateResult extractAggregationResult(AggregationResultHolder aggregationResultHolder);
IntermediateResult extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey);
IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);
FinalResult extractFinalResult(IntermediateResult intermediateResult);
}
Before getting into the implementation, it's important to understand how Aggregation works in Pinot.
This is an advanced topic and assumes you know Pinot concepts. All the data in Pinot is stored in segments across multiple nodes. At a high level, the query plan comprises 3 phases
1. Map phase
This phase works on the individual segments in Pinot.
Initialization: Depending on the query type, the following methods are invoked to set up the result holder. While having different methods and return types adds complexity, it helps performance.
AGGREGATION : createAggregationResultHolder
This must return an instance of type AggregationResultHolder. You can either use the DoubleAggregationResultHolder or ObjectAggregationResultHolder
GROUP BY: createGroupByResultHolder
This method must return an instance of type GroupByResultHolder. Depending on the type of result object, you might be able to use one of the existing implementations.
Callback: For every record that matches the filter condition in the query, one of the following methods is invoked, depending on the query type (aggregation vs. group by) and column type (single-value vs. multi-value). Note that we invoke this method for a batch of records instead of every row for performance reasons; this allows the JVM to vectorize some parts of the execution if possible.
AGGREGATION: aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<String,BlockValSet> blockValSetMap)
length: This represents the length of the block. Typically < 10k
aggregationResultHolder: this is the object returned from createAggregationResultHolder
blockValSetMap: Map of blockValSets depending on the arguments to the AggFunction
Group By Single Value: aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map blockValSets)
length: This represents the length of the block. Typically < 10k
groupKeyArray: Pinot internally maintains a value-to-int mapping, and this groupKeyArray refers to that internal mapping. These values together form a unique key.
groupByResultHolder: This is the object returned from createGroupByResultHolder
blockValSetMap: Map of blockValSets depending on the arguments to the AggFunction
Group By Multi Value: aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, Map blockValSets)
length: This represents the length of the block. Typically < 10k
groupKeysArray: Pinot internally maintains a value-to-int mapping, and this groupKeysArray maps each row to the group keys it belongs to. These values together form unique keys.
groupByResultHolder: This is the object returned from createGroupByResultHolder
blockValSetMap: Map of blockValSets depending on the arguments to the AggFunction
2. Combine phase
In this phase, the results from all segments within a single Pinot server are combined into an IntermediateResult. The type of IntermediateResult is based on the generic type defined in the AggregationFunction implementation.
public interface AggregationFunction<IntermediateResult, FinalResult extends Comparable> {
IntermediateResult merge(IntermediateResult intermediateResult1, IntermediateResult intermediateResult2);
}
3. Reduce phase
There are two steps in the Reduce Phase
Merge all the IntermediateResults from the various servers using the merge function
Extract the final results by invoking the extractFinalResult method. In most cases, FinalResult is the same type as IntermediateResult. AverageAggregationFunction is an example where IntermediateResult (AvgPair) is different from FinalResult (Double). A condensed example implementation is sketched below.
FinalResult extractFinalResult(IntermediateResult intermediateResult);
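Putting the three phases together, here is a condensed, hypothetical SUM-style implementation written against the interface shown above. The helper types (DoubleAggregationResultHolder, DoubleGroupByResultHolder), the BlockValSet.getDoubleValuesSV() accessor and the import packages are assumptions based on the Pinot codebase and may differ in your version; the real interface also has a few more methods than the ones listed on this page.
import java.util.Map;
// Packages below follow the 0.3.0-era layout and may differ in your version.
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.DoubleAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.groupby.DoubleGroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
// Hypothetical SUM over a single double column; IntermediateResult and FinalResult are both Double.
public class MySumAggregationFunction implements AggregationFunction<Double, Double> {
  private final String _column;
  public MySumAggregationFunction(String column) {
    _column = column;
  }
  public AggregationResultHolder createAggregationResultHolder() {
    // A running sum fits in a primitive double, so the double-based holder is enough.
    return new DoubleAggregationResultHolder(0.0);
  }
  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
    return new DoubleGroupByResultHolder(initialCapacity, maxCapacity, 0.0);
  }
  // Map phase: fold one block of matching rows into the per-segment result.
  public void aggregate(int length, AggregationResultHolder holder, Map<String, BlockValSet> blockValSetMap) {
    double[] values = blockValSetMap.get(_column).getDoubleValuesSV();
    double sum = holder.getDoubleResult();
    for (int i = 0; i < length; i++) {
      sum += values[i];
    }
    holder.setValue(sum);
  }
  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder holder,
      Map<String, BlockValSet> blockValSets) {
    double[] values = blockValSets.get(_column).getDoubleValuesSV();
    for (int i = 0; i < length; i++) {
      int groupKey = groupKeyArray[i];
      holder.setValueForKey(groupKey, holder.getDoubleResult(groupKey) + values[i]);
    }
  }
  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder holder,
      Map<String, BlockValSet> blockValSets) {
    double[] values = blockValSets.get(_column).getDoubleValuesSV();
    for (int i = 0; i < length; i++) {
      // A row whose multi-valued group-by column has several values contributes to every group.
      for (int groupKey : groupKeysArray[i]) {
        holder.setValueForKey(groupKey, holder.getDoubleResult(groupKey) + values[i]);
      }
    }
  }
  public Double extractAggregationResult(AggregationResultHolder holder) {
    return holder.getDoubleResult();
  }
  public Double extractGroupByResult(GroupByResultHolder holder, int groupKey) {
    return holder.getDoubleResult(groupKey);
  }
  // Combine/reduce phases: partial sums are merged by simple addition.
  public Double merge(Double intermediateResult1, Double intermediateResult2) {
    return intermediateResult1 + intermediateResult2;
  }
  // For SUM the final result is the same as the intermediate result.
  public Double extractFinalResult(Double intermediateResult) {
    return intermediateResult;
  }
}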
The java client can be found in pinot-clients/pinot-java-client. Here's an example of how to use the pinot-java-client
to query Pinot.
import org.apache.pinot.client.Connection;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.client.Request;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.client.ResultSet;
/**
* Demonstrates the use of the pinot-client to query Pinot from Java
*/
public class PinotClientExample {
public static void main(String[] args) {
// pinot connection
String zkUrl = "localhost:2181";
String pinotClusterName = "PinotCluster";
Connection pinotConnection = ConnectionFactory.fromZookeeper(zkUrl + "/" + pinotClusterName);
String query = "SELECT COUNT(*) FROM myTable GROUP BY foo";
// set queryType=sql for querying the sql endpoint
Request pinotClientRequest = new Request("sql", query);
ResultSetGroup pinotResultSetGroup = pinotConnection.execute(pinotClientRequest);
ResultSet resultTableResultSet = pinotResultSetGroup.getResultSet(0);
int numRows = resultTableResultSet.getRowCount();
int numColumns = resultTableResultSet.getColumnCount();
String columnValue = resultTableResultSet.getString(0, 1);
String columnName = resultTableResultSet.getColumnName(1);
System.out.println("ColumnName: " + columnName + ", ColumnValue: " + columnValue);
}
}
Connections to Pinot are created using the ConnectionFactory
class’ utility methods to create connections to a Pinot cluster given a Zookeeper URL, a Java Properties object or a list of broker addresses to connect to.
Connection connection = ConnectionFactory.fromZookeeper
("some-zookeeper-server:2191/zookeeperPath");
Connection connection = ConnectionFactory.fromProperties("demo.properties");
Connection connection = ConnectionFactory.fromHostList
("some-server:1234", "some-other-server:1234", ...);
Queries can be sent directly to the Pinot cluster using the Connection.execute(org.apache.pinot.client.Request)
and Connection.executeAsync(org.apache.pinot.client.Request)
methods of Connection:
ResultSetGroup resultSetGroup =
connection.execute(new Request("sql", "select * from foo..."));
// OR
Future<ResultSetGroup> futureResultSetGroup =
connection.executeAsync(new Request("sql", "select * from foo..."));
Queries can also use a PreparedStatement
to escape query parameters:
PreparedStatement statement =
connection.prepareStatement(new Request("sql", "select * from foo where a = ?"));
statement.setString(1, "bar");
ResultSetGroup resultSetGroup = statement.execute();
// OR
Future<ResultSetGroup> futureResultSetGroup = statement.executeAsync();
Results can be obtained with the various get methods in the first ResultSet, obtained through the getResultSet(int)
method:
Request request = new Request("sql", "select foo, bar from baz where quux = 'quuux'");
ResultSetGroup resultSetGroup = connection.execute(request);
ResultSet resultSet = resultSetGroup.getResultSet(0);
for (int i = 0; i < resultSet.getRowCount(); ++i) {
System.out.println("foo: " + resultSet.getString(i, 0));
System.out.println("bar: " + resultSet.getInt(i, 1));
}
Note
The examples in the sections below this note are for querying the PQL endpoint, which is deprecated and will be deleted soon. For more information about the two endpoints, visit Querying Pinot.
If queryFormat pql
is used in the Request
, there are some differences in how the results can be accessed, depending on the query.
In the case of aggregation, each aggregation function is within its own ResultSet. A query with multiple aggregation functions will return one result set per aggregation function, as they are computed in parallel.
ResultSetGroup resultSetGroup =
connection.execute(new Request("pql", "select max(foo), min(foo) from bar"));
System.out.println("Number of result groups:" +
resultSetGroup.getResultSetCount()); // 2, min(foo) and max(foo)
ResultSet resultSetMax = resultSetGroup.getResultSet(0);
System.out.println("Max foo: " + resultSetMax.getInt(0));
ResultSet resultSetMin = resultSetGroup.getResultSet(1);
System.out.println("Min foo: " + resultSetMin.getInt(0));
In case of aggregation group by, there will be as many ResultSets as the number of aggregations, each of which will contain multiple results grouped by a group key.
ResultSetGroup resultSetGroup =
connection.execute(
new Request("pql", "select min(foo), max(foo) from bar group by baz"));
System.out.println("Number of result groups:" +
resultSetGroup.getResultSetCount()); // 2, min(foo) and max(foo)
ResultSet minResultSet = resultSetGroup.getResultSet(0);
for(int i = 0; i < minResultSet.length(); ++i) {
System.out.println("Minimum foo for " + minResultSet.getGroupKeyString(i, 1) +
": " + minResultSet.getInt(i));
}
ResultSet maxResultSet = resultSetGroup.getResultSet(1);
for(int i = 0; i < maxResultSet.length(); ++i) {
System.out.println("Maximum foo for " + maxResultSet.getGroupKeyString(i, 1) +
": " + maxResultSet.getInt(i));
}
The Pinot Admin UI contains all the APIs that you will need to operate and manage your cluster. It provides a set of APIs for Pinot cluster management including health check, instances management, schema and table management, data segments management.
Let's check out the tables in this cluster by going to Table -> List all tables in cluster and click on Try it out!
. We can see the baseballStats
table listed here. We can also see the exact curl
call made to the controller API.
You can look at the configuration of this table by going to Tables -> Get/Enable/Disable/Drop a table, type in baseballStats
in the table name, and click Try it out!
Let's check out the schemas in the cluster by going to Schema -> List all schemas in the cluster and click Try it out!
. We can see a schema called baseballStats
in this list.
Take a look at the schema by going to Schema -> Get a schema, type baseballStats
in the schema name, and click Try it out!
.
{
"schemaName": "baseballStats",
"dimensionFieldSpecs": [
{
"name": "playerID",
"dataType": "STRING"
},
{
"name": "yearID",
"dataType": "INT"
},
{
"name": "teamID",
"dataType": "STRING"
},
{
"name": "league",
"dataType": "STRING"
},
{
"name": "playerName",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "playerStint",
"dataType": "INT"
},
{
"name": "numberOfGames",
"dataType": "INT"
},
{
"name": "numberOfGamesAsBatter",
"dataType": "INT"
},
{
"name": "AtBatting",
"dataType": "INT"
},
{
"name": "runs",
"dataType": "INT"
},
{
"name": "hits",
"dataType": "INT"
},
{
"name": "doules",
"dataType": "INT"
},
{
"name": "tripples",
"dataType": "INT"
},
{
"name": "homeRuns",
"dataType": "INT"
},
{
"name": "runsBattedIn",
"dataType": "INT"
},
{
"name": "stolenBases",
"dataType": "INT"
},
{
"name": "caughtStealing",
"dataType": "INT"
},
{
"name": "baseOnBalls",
"dataType": "INT"
},
{
"name": "strikeouts",
"dataType": "INT"
},
{
"name": "intentionalWalks",
"dataType": "INT"
},
{
"name": "hitsByPitch",
"dataType": "INT"
},
{
"name": "sacrificeHits",
"dataType": "INT"
},
{
"name": "sacrificeFlies",
"dataType": "INT"
},
{
"name": "groundedIntoDoublePlays",
"dataType": "INT"
},
{
"name": "G_old",
"dataType": "INT"
}
]
}
Finally, let's checkout the data segments in the cluster by going to Segment -> List all segments, type in baseballStats
in the table name, and click Try it out!
. There's 1 segment for this table, called baseballStats_OFFLINE_0
.
You might have figured out by now, in order to get data into the Pinot cluster, we need a table, a schema and segments. Let's head over to Batch upload sample data, to find out more about these components and learn how to create them for your own data.
The Docker instructions on this page are still WIP
So far, we set up our cluster, ran some queries on the demo tables and explored the admin endpoints. We also uploaded some sample batch data for the transcript table.
Now, it's time to ingest from a sample stream into Pinot.
First, we need to setup a stream. Pinot has out-of-the-box realtime ingestion support for Kafka. Other streams can be plugged in, more details in Pluggable Streams.
Let's set up a demo Kafka cluster locally and create a sample topic transcript-topic
Start Kafka
docker run \
--network pinot-demo --name=kafka \
-e KAFKA_ZOOKEEPER_CONNECT=pinot-quickstart:2123/kafka \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ADVERTISED_HOST_NAME=kafka \
-d wurstmeister/kafka:latest
Create a Kafka Topic
docker exec \
-t kafka \
/opt/kafka/bin/kafka-topics.sh \
--zookeeper pinot-quickstart:2123/kafka \
--partitions=1 --replication-factor=1 \
--create --topic transcript-topic
Start Kafka
Start Kafka cluster on port 9876
using the same Zookeeper from the quick-start examples
bin/pinot-admin.sh StartKafka -zkAddress=localhost:2123/kafka -port 9876
Create a Kafka topic
Download the latest Kafka. Create a topic
If you followed the Batch upload sample data, you have already pushed a schema for your sample table. If not, head over to Creating a schema on that page, to learn how to create a schema for your sample data.
If you followed Batch upload sample data, you learned how to push an offline table and schema. Similar to the offline table config, we will create a realtime table config for the sample. Here's the realtime table config for the transcript table. For a more detailed overview about tables, check out Table.
{
"tableName": "transcript",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "timestamp",
"timeType": "MILLISECONDS",
"schemaName": "transcript",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.topic.name": "transcript-topic",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "localhost:9876",
"realtime.segment.flush.threshold.time": "3600000",
"realtime.segment.flush.threshold.size": "50000",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest"
}
},
"metadata": {
"customConfigs": {}
}
}
Now that we have our table and schema, let's upload them to the cluster. As soon as the realtime table is created, it will begin ingesting from the Kafka topic.
docker run \
--network=pinot-demo \
-v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
--name pinot-streaming-table-creation \
apachepinot/pinot:latest AddTable \
-schemaFile /tmp/pinot-quick-start/transcript-schema.json \
-tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
-controllerHost pinot-quickstart \
-controllerPort 9000 \
-exec
bin/pinot-admin.sh AddTable \
-schemaFile /tmp/pinot-quick-start/transcript-schema.json \
-tableConfigFile /tmp/pinot-quick-start/transcript-table-realtime.json \
-exec
Here's a JSON file for transcript table data:
{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"Maths","score":3.8,"timestamp":1571900400000}
{"studentID":205,"firstName":"Natalie","lastName":"Jones","gender":"Female","subject":"History","score":3.5,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Maths","score":3.2,"timestamp":1571900400000}
{"studentID":207,"firstName":"Bob","lastName":"Lewis","gender":"Male","subject":"Chemistry","score":3.6,"timestamp":1572418800000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Geography","score":3.8,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"English","score":3.5,"timestamp":1572505200000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Maths","score":3.2,"timestamp":1572678000000}
{"studentID":209,"firstName":"Jane","lastName":"Doe","gender":"Female","subject":"Physics","score":3.6,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"Maths","score":3.8,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"English","score":3.5,"timestamp":1572678000000}
{"studentID":211,"firstName":"John","lastName":"Doe","gender":"Male","subject":"History","score":3.2,"timestamp":1572854400000}
{"studentID":212,"firstName":"Nick","lastName":"Young","gender":"Male","subject":"History","score":3.6,"timestamp":1572854400000}
Push sample JSON into Kafka topic, using the Kafka script from the Kafka download
bin/kafka-console-producer.sh \
--broker-list localhost:9876 \
--topic transcript-topic < /tmp/pinot-quick-start/rawData/transcript.json
As soon as data flows into the stream, the Pinot table will consume it and it will be ready for querying. Head over to the Query Console to check out the realtime data
Explore the data on our Pinot cluster
Now that the QuickStartCluster is set up, we can start exploring the data and the APIs. Head over to in your browser.
You are now connected to the Pinot controller. Let's take a look at the following two features.
lets us run queries on the data in the Pinot cluster
We can see our baseballStats
table listed on the left (you will see meetupRSVP
or airlineStats
if you used the streaming or the hybrid quick start). Clicking on the table name should display all the names and data types of the columns of the table, and also execute a sample query select * from baseballStats limit 10
. You can query this table by typing your query in the text box and clicking the Run Query
button.
Here are some other queries you can try out:
select playerName, max(hits) from baseballStats group by playerName order by max(hits) desc
select sum(hits), sum(homeRuns), sum(numberOfGames) from baseballStats where yearID > 2010
select * from baseballStats order by league
Pinot supports a subset of standard SQL. See for more information.
The contains all the APIs that you will need to operate and manage your cluster. It provides a set of APIs for Pinot cluster management including health check, instances management, schema and table management, data segments management.
Let's check out the tables in this cluster by going to and click on Try it out!
. We can see the baseballStats
table listed here. We can also see the exact curl
call made to the controller API.
You can look at the configuration of this table by going to , type in baseballStats
in the table name, and click Try it out!
Let's check out the schemas in the cluster by going to and click Try it out!
. We can see a schema called baseballStats
in this list.
Take a look at the schema by going to , type baseballStats
in the schema name, and click Try it out!
.
Finally, let's checkout the data segments in the cluster by going to , type in baseballStats
in the table name, and click Try it out!
. There's 1 segment for this table, called baseballStats_OFFLINE_0
.
You might have figured out by now, in order to get data into the Pinot cluster, we need a table, a schema and segments. Let's head over to , to find out more about these components and learn how to create them for your own data.
To consume in realtime, we simply need to create a table with the same name as the schema and point to the Kafka topic to consume from, using a table definition such as this one:
First, we’ll start a local instance of Kafka and start streaming data into it:
This will stream one event per second from the Avro file to the Kafka topic. Then, we’ll create a realtime table, which will start consuming from the Kafka topic.
We can then query the table with the following query to see the events stream in:
Repeating the query multiple times should show the events slowly being streamed into the table.
{
"tableName": "flights",
"tableType": "REALTIME",
"segmentsConfig": {
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "7",
"segmentPushFrequency": "daily",
"segmentPushType": "APPEND",
"replication": "1",
"timeColumnName": "daysSinceEpoch",
"timeType": "DAYS",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy"
},
"tableIndexConfig": {
"invertedIndexColumns": [
"flightNumber",
"tags",
"daysSinceEpoch"
],
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "highLevel",
"stream.kafka.topic.name": "flights-realtime",
"stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
"stream.kafka.zk.broker.url": "localhost:2181",
"stream.kafka.hlc.zk.connect.string": "localhost:2181"
}
},
"tenants": {
"broker": "brokerTenant",
"server": "serverTenant"
},
"metadata": {
}
}
bin/pinot-admin.sh StartKafka &
bin/pinot-admin.sh StreamAvroIntoKafka -avroFile flights-2014.avro -kafkaTopic flights-realtime &
bin/pinot-admin.sh AddTable -filePath flights-definition-realtime.json
SELECT COUNT(*) FROM flights
Note
This section is a pre-read if you are planning to develop plug-ins for streams other than Kafka. Pinot supports Kafka out of the box.
Prior to commit ba9f2d, Pinot was only able to support consuming from a Kafka stream.
Pinot now enables its users to write plug-ins to consume from pub-sub streams other than Kafka. (Please refer to Issue #2583)
Some of the streams for which plug-ins can be added are:
You may encounter some limitations either in Pinot or in the stream system while developing plug-ins. Please feel free to get in touch with us when you start writing a stream plug-in, and we can help you out. We are open to receiving PRs in order to improve these abstractions if they do not work for a certain stream implementation.
Refer to Consuming and Indexing rows in Realtime for details on how Pinot consumes streaming data.
{
"schemaName": "baseballStats",
"dimensionFieldSpecs": [
{
"name": "playerID",
"dataType": "STRING"
},
{
"name": "yearID",
"dataType": "INT"
},
{
"name": "teamID",
"dataType": "STRING"
},
{
"name": "league",
"dataType": "STRING"
},
{
"name": "playerName",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "playerStint",
"dataType": "INT"
},
{
"name": "numberOfGames",
"dataType": "INT"
},
{
"name": "numberOfGamesAsBatter",
"dataType": "INT"
},
{
"name": "AtBatting",
"dataType": "INT"
},
{
"name": "runs",
"dataType": "INT"
},
{
"name": "hits",
"dataType": "INT"
},
{
"name": "doules",
"dataType": "INT"
},
{
"name": "tripples",
"dataType": "INT"
},
{
"name": "homeRuns",
"dataType": "INT"
},
{
"name": "runsBattedIn",
"dataType": "INT"
},
{
"name": "stolenBases",
"dataType": "INT"
},
{
"name": "caughtStealing",
"dataType": "INT"
},
{
"name": "baseOnBalls",
"dataType": "INT"
},
{
"name": "strikeouts",
"dataType": "INT"
},
{
"name": "intentionalWalks",
"dataType": "INT"
},
{
"name": "hitsByPitch",
"dataType": "INT"
},
{
"name": "sacrificeHits",
"dataType": "INT"
},
{
"name": "sacrificeFlies",
"dataType": "INT"
},
{
"name": "groundedIntoDoublePlays",
"dataType": "INT"
},
{
"name": "G_old",
"dataType": "INT"
}
]
}
Introduction to Apache Pinot, a real-time distributed OLAP datastore.
Pinot is a real-time distributed OLAP datastore, built to deliver scalable real-time analytics with low latency. It can ingest from batch data sources (such as Hadoop HDFS, Amazon S3, Azure ADLS, Google Cloud Storage) as well as stream data sources (such as Apache Kafka).
Pinot was built by engineers at LinkedIn and Uber and is designed to scale up and out with no upper bound. Performance remains predictable for a given cluster size and expected queries-per-second (QPS) threshold.
Our documentation is structured to let you quickly get to the content you need and is organized around the different concerns of users, operators, and developers. If you're new to Pinot and want to learn things by example, please take a look at our getting started section.
To start importing data into Pinot, check out our guides on batch import and stream ingestion based on our plugin architecture.
Pinot works very well for querying time series data with many dimensions and metrics over a vast unbounded space of records that scales linearly on a per node basis. Filters and aggregations are both easy and fast.
SELECT sum(clicks), sum(impressions) FROM AdAnalyticsTable
WHERE
((daysSinceEpoch >= 17849 AND daysSinceEpoch <= 17856)) AND
accountId IN (123456789)
GROUP BY
daysSinceEpoch TOP 100
Pinot supports SQL for querying read-only data. Learn more about querying Pinot for time series data in our PQL (Pinot Query Language) guide.
Pinot may be deployed to and operated on a cloud provider or a local or virtual machine. You may get started either with a bare-metal installation or a Kubernetes one (either locally or in the cloud). To get immediately started with Pinot, check out these quick start guides for bootstrapping a Pinot cluster using Docker or Kubernetes.
For a high-level overview that explains how Pinot works, please take a look at our basic concepts section.
To understand the distributed systems architecture that explains Pinot's operating model, please take a look at our basic architecture section.
This section focuses on answering the most frequently asked questions for people exploring the newly evolving category of distributed OLAP engines. Pinot was created by authors at both Uber and LinkedIn and has been hardened and battle tested at the very highest of load and scale.
While Pinot doesn't match the typical mold of a database product, it is best understood based on your role as either an analyst, data scientist, or application developer.
Enterprise business intelligence
For analysts and data scientists, Pinot is best viewed as a highly-scalable data platform for business intelligence. In this view, Pinot converges big data platforms with the traditional role of a data warehouse, making it a suitable replacement for analysis and reporting.
Enterprise application development
For application developers, Pinot is best viewed as an immutable aggregate store that sources events from streaming data sources, such as Kafka, and makes it available for query using SQL.
As is the case with a microservice architecture, data encapsulation ends up requiring each application to provision its own data store, as opposed to sharing one OLTP database for reads and writes. In this case, it becomes difficult to query the complete view of a domain because the data is stored in many different databases. This is costly in terms of performance, since it requires joins across multiple microservices that expose their data over HTTP under a REST API. To prevent this, Pinot can be used to aggregate all of the data across a microservice architecture into one easily queryable view of the domain.
Pinot tenants prevent any possibility of sharing ownership of database tables across microservice teams. Developers can create their own query models of data from multiple systems of record depending on their use case and needs. As with all aggregate stores, query models are eventually consistent and immutable.
Company
Notes
Pinot originated at LinkedIn, where it powers 50+ user-facing applications such as Who Viewed My Profile, Talent Analytics, Company Analytics, Ad Analytics and many more. Pinot also serves as the backend used to visualize and monitor 10,000+ business metrics.
Pinot runs on 1000+ nodes serving 100k+ queries while ingesting 1.5M+ events per second.
Uber
Pinot powers many internal and external dashboards as well as external site facing analytics applications like .
Microsoft
Microsoft Teams uses Pinot for analytics on Teams product usage data.
Weibo uses Pinot for realtime analytics on CDN & Weibo Video data to make business decisions, optimize service performance and improve user experience.
Factual
Insight Product -
A column-oriented database with various compression schemes such as Run Length, Fixed Bit Length
Pluggable indexing technologies - Sorted Index, Bitmap Index, Inverted Index
Ability to optimize query/execution plan based on query and segment metadata
Near real time ingestion from streams and batch ingestion from Hadoop
SQL-like language that supports selection, aggregation, filtering, group by, order by, distinct queries on data
Support for multi-valued fields
Horizontally scalable and fault-tolerant
Pinot is designed to execute OLAP queries with low latency. It is suited to contexts where fast analytics, such as aggregations, are needed on immutable data, possibly with real-time data ingestion.
User facing Analytics Products
Pinot was originally built at LinkedIn to power rich interactive real-time analytic applications such as Who Viewed Profile, Company Analytics, Talent Insights, and many more. UberEats Restaurant Manager is another example of a customer facing Analytics App. At LinkedIn, Pinot powers 50+ user-facing products, ingesting millions of events per second and serving 100k+ queries per second at millisecond latency.
Real-time Dashboard for Business Metrics
Pinot can also be used to perform typical analytical operations such as slice and dice, drill down, roll up, and pivot on large-scale multi-dimensional data. For instance, at LinkedIn, Pinot powers dashboards for thousands of business metrics. One can connect various BI tools such as Superset, Tableau, or Power BI to visualize data in Pinot.
Instructions to connect Pinot with Superset can be found here.
Anomaly Detection
In addition to visualizing data in Pinot, one can run Machine Learning Algorithms to detect Anomalies on the data stored in Pinot. See ThirdEye for more information on how to use Pinot for Anomaly Detection and Root Cause Analysis.
This page has a collection of frequently asked questions with answers from the community.
We have toJsonStr(key)
function, which can store a top-level JSON field as a STRING in Pinot.
Then you can use jsonExtractScalar(JSON_STRING_FIELD, JSON_PATH, OUTPUT_FORMAT)
function during query time to fetch the desired field from the json string. For example
NOTE: This works well if some of your fields are nested JSON, but most of your fields are top-level JSON keys. If all of your fields are within a nested JSON key, you will have to store the entire payload as one column, which is not ideal.
Support for flattening during ingestion is on the roadmap:
Inverted indexes are set in the tableConfig's tableIndexConfig -> invertedIndexColumns list. Here's the documentation for tableIndexConfig: along with a sample table that has set inverted indexes on some columns.
Applying inverted indexes to a table config will generate inverted indexes for all new segments. In order to apply the inverted indexes to all existing segments, follow the steps in
Add the columns you wish to index to the tableIndexConfig -> invertedIndexColumns list. This sample table config shows inverted indexes set: To update the table config, use the Pinot Swagger API:
Invoke the reload API:
Right now, there’s no easy way to confirm that the reload succeeded. One way is to check the index_map file inside the segment metadata; you should see inverted index entries for the new columns. An API for this is coming soon:
Here's the page explaining the Pinot response format:
"timestamp" is a reserved keyword in SQL. Escape timestamp with double quotes.
Other commonly encountered reserved keywords are date, time, table.
For filtering on STRING columns, use single quotes
The fields in the ORDER BY
clause must be one of the group by clauses or aggregations, BEFORE applying the alias. Therefore, this will not work
Instead, this will work
You can change the number of replicas by updating the table config's section. Make sure you have at least as many servers as the replication.
For OFFLINE table, update
For REALTIME table update
After changing the replication, run a rebalance.
A rebalance is run to reassign all the segments of a table to the available servers. This is typically done when capacity changes occur, i.e. adding servers to or removing servers from a table.
Offline
Use the rebalance API from the Swagger APIs on the controller , with tableType OFFLINE
Realtime
Use the rebalance API from the Swagger APIs on the controller , with tableType REALTIME.
A realtime table has 2 components, the consuming segments and the completed segments. By default, only the completed segments will get rebalanced. The consuming segments will pick the right assignment once they complete. But you can enforce the consuming segments to also be included in the rebalance, by setting the param includeConsuming
to true. Note that rebalancing the consuming segments would mean the consuming segment will drop the consumed data so far, and restart consumption from the last offset, which may lead to a short duration of data staleness.
You can check the status of the rebalance by
Checking the controller logs
Running the rebalance again after a while; it should return "status": "NO_OP"
Checking the External View of the table, to see that the changes in capacity/replicas have taken effect (an example request is shown below)
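For instance, a hedged example of fetching the external view from the controller (assuming it runs on localhost:9000 and the table is named myTable):
# fetch the table's external view from the controller
curl -X GET "http://localhost:9000/tables/myTable/externalview"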
Yes, replica groups work for realtime tables. There are two parts to enabling replica groups:
Replica groups segment assignment
Replica group query routing
Replica group segment assignment
Replica group segment assignment is achieved for realtime tables if the number of servers is a multiple of the number of replicas. The partitions get uniformly sprayed across the servers, creating replica groups.
For example, consider we have 6 partitions, 2 replicas, and 4 servers.
As you can see, the set (S0, S2) contains r1 of every partition, and (S1, S3) contains r2 of every partition. A query will only be routed to one of the sets, and will not span every server. If you are adding/removing servers from an existing table setup, you have to run a rebalance for the segment assignment changes to take effect.
Replica group query routing
Once replica group segment assignment is in effect, query routing can take advantage of it. For replica group based query routing, set the following in the table config's routing section, and then restart the brokers:
Pinot enables its users to implement the PinotFS abstraction layer to store realtime and offline segments in a data layer of their choice.
Some examples of storage backends (other than local storage) currently supported are HDFS and Azure Data Lake Storage.
If these filesystems do not meet your needs, you can extend the current PinotFS abstraction to customize it for your needs.
In order to add a new type of storage backend (say, Amazon S3), implement the following class:
S3FS extends PinotFS
The example here uses the existing org.apache.pinot.filesystem.HadoopPinotFS to store realtime segments in an HDFS filesystem. In the Pinot controller config, add the following new configs:
Note: there is currently a bug in the controller; for now, you can cherry-pick the PR that fixes the issue, which has already been tested (the PR is under review).
These properties for the filesystem implementation are to be set in your controller and server configurations.
In your controller and server configs, set the FS class you would like to support: set pinot.controller.storage.factory.class.${YOUR_URI_SCHEME} to the full path of the FS class you would like to include.
You also need to configure pinot.controller.local.temp.dir for the local dir on the controller machine.
For filesystem specific configs, you can pass in the following with either the pinot.controller prefix or the pinot.server prefix.
All the following configs need to be prefixed with storage.factory.
AzurePinotFS requires the following configs according to your environment:
adl.accountId, adl.authEndpoint, adl.clientId, adl.clientSecret
Sample Controller Config
Sample Server Config
You can find the parameters in your account as follows:
Please also make sure to set the following config with the value “adl”
To see how to upload segments to different storage systems, check ../segment_fetcher.rst.
HadoopPinotFS requires the following configs according to your environment:
hadoop.kerberos.principle, hadoop.kerberos.keytab, hadoop.conf.path
Please make sure to also set the following config with the value “hdfs”
This page describes how to connect Kafka to Pinot
Pinot provides stream plugin support for Kafka 2.x. Although the version used in this implementation is Kafka 2.0.0, it is possible to compile it with a higher Kafka library version, e.g. 2.1.1.
Use Kafka Stream(High) Level Consumer
Below is a sample streamConfigs used to create a realtime table with the Kafka Stream (High) level consumer. The Kafka 2.x HLC consumer uses org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory in the config stream.kafka.consumer.factory.class.name.
Use Kafka Partition(Low) Level Consumer
Below is a sample table config used to create a realtime table with Kafka Partition(Low) level consumer:
Please note:
Config replicasPerPartition under segmentsConfig is required to specify table replication.
Config stream.kafka.consumer.type should be specified as LowLevel to use the partition level consumer. (The use of simple instead of LowLevel is deprecated.)
Configs stream.kafka.zk.broker.url and stream.kafka.broker.list are required under tableIndexConfig.streamConfigs to provide Kafka related information.
To update the table config for both the high level and low level consumer: update the config stream.kafka.consumer.factory.class.name from org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory to org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory.
If using the Stream (High) level consumer, also add the config stream.kafka.hlc.bootstrap.server into tableIndexConfig.streamConfigs. This config should be the URI of the Kafka broker list, e.g. localhost:9092.
This connector is also suitable for Kafka library versions higher than 2.0.0. In pinot-connector-kafka-2.0/pom.xml, changing the kafka.lib.version from 2.0.0 to 2.1.1 will make this connector work with Kafka 2.1.1.
mvn clean package -DskipTests -Pbin-dist -Dkafka.version=2.0
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "highLevel",
"stream.kafka.topic.name": "meetupRSVPEvents",
"stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string": "localhost:2191/kafka",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.hlc.bootstrap.server": "localhost:19092"
}
{
"tableName": "meetupRsvp",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "mtime",
"timeType": "MILLISECONDS",
"segmentPushType": "APPEND",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "meetupRsvp",
"replication": "1",
"replicasPerPartition": "1"
},
"tenants": {},
"tableIndexConfig": {
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.topic.name": "meetupRSVPEvents",
"stream.kafka.decoder.class.name": "org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory",
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.broker.list": "localhost:19092"
}
},
"metadata": {
"customConfigs": {}
}
}
Select jsonExtractScalar(myJsonMapStr,'$.k1','STRING')
from myTable
where jsonExtractScalar(myJsonMapStr,'$.k1','STRING') = 'value-k1-0'
Select sum(jsonExtractScalar(complexMapStr,'$.k4.met','INT'))
from myTable
group by jsonExtractScalar(complexMapStr,'$.k1','STRING')
select "timestamp" from myTable
SELECT COUNT(*) from myTable WHERE column = 'foo'
SELECT count(colA) as aliasA, colA from tableA GROUP BY colA ORDER BY aliasA
SELECT count(colA) as sumA, colA from tableA GROUP BY colA ORDER BY count(colA)
{
"tableName": "pinotTable",
"tableType": "OFFLINE",
"segmentsConfig": {
"replication": "3",
...
}
..
{
"tableName": "pinotTable",
"tableType": "REALTIME",
"segmentsConfig": {
"replicasPerPartition": "3",
...
}
..
     | r1 | r2
-----+----+----
 p1  | S0 | S1
 p2  | S2 | S3
 p3  | S0 | S1
 p4  | S2 | S3
 p5  | S0 | S1
 p6  | S2 | S3
{
"tableName": "pinotTable",
"tableType": "REALTIME",
"routing": {
"instanceSelectorType": "replicaGroup"
}
..
}
"controller.data.dir": "SET_TO_YOUR_HDFS_ROOT_DIR"
"controller.local.temp.dir": "SET_TO_A_LOCAL_FILESYSTEM_DIR"
"pinot.controller.storage.factory.class.hdfs": "org.apache.pinot.filesystem.HadoopPinotFS"
"pinot.controller.storage.factory.hdfs.hadoop.conf.path": "SET_TO_YOUR_HDFS_CONFIG_DIR"
"pinot.controller.storage.factory.hdfs.hadoop.kerberos.principle": "SET_IF_YOU_USE_KERBEROS"
"pinot.controller.storage.factory.hdfs.hadoop.kerberos.keytab": "SET_IF_YOU_USE_KERBEROS"
"controller.enable.split.commit": "true"
"pinot.server.instance.enable.split.commit": "true"
"pinot.controller.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
"pinot.controller.storage.factory.adl.accountId": "xxxx"
"pinot.controller.storage.factory.adl.authEndpoint": "xxxx"
"pinot.controller.storage.factory.adl.clientId": "xxxx"
"pinot.controller.segment.fetcher.protocols": "adl"
"pinot.server.storage.factory.class.adl": "org.apache.pinot.filesystem.AzurePinotFS"
"pinot.server.storage.factory.adl.accountId": "xxxx"
"pinot.server.storage.factory.adl.authEndpoint": "xxxx"
"pinot.server.storage.factory.adl.clientId": "xxxx"
"pinot.server.segment.fetcher.protocols": "adl"
"segment.fetcher.protocols" : "adl"
"segment.fetcher.protocols" : "hdfs"
bin/kafka-topics.sh --create --bootstrap-server localhost:9876 --replication-factor 1 --partitions 1 --topic transcript-topic
Pinot quick start in Kubernetes
Before continuing, please make sure that you've downloaded Apache Pinot. The scripts for the setup in this guide can be found in our open source project on GitHub.
The scripts can be found in the Pinot source at ./incubator-pinot/kubernetes/helm
# checkout pinot
git clone https://github.com/apache/incubator-pinot.git
cd incubator-pinot/kubernetes/helm
The Pinot repo has pre-packaged Helm charts for Pinot and Presto. The Helm repo index file is here.
helm repo add pinot https://raw.githubusercontent.com/apache/incubator-pinot/master/kubernetes/helm
kubectl create ns pinot-quickstart
helm install pinot pinot/pinot \
-n pinot-quickstart \
--set cluster.name=pinot \
--set server.replicaCount=2
helm dependency update
For Helm v2.12.1
If your Kubernetes cluster is recently provisioned, ensure Helm is initialized by running:
helm init --service-account tiller
Then deploy a new HA Pinot cluster using the following command:
helm install --namespace "pinot-quickstart" --name "pinot" .
For Helm v3.0.0
kubectl create ns pinot-quickstart
helm install -n pinot-quickstart pinot .
Run the commands under Resolution below if you encounter the following issue:
Error: could not find tiller.
Resolution:
kubectl -n kube-system delete deployment tiller-deploy
kubectl -n kube-system delete service/tiller-deploy
helm init --service-account tiller
Run the command under Resolution below if you encounter a permission issue like the following:
Error: release pinot failed: namespaces "pinot-quickstart" is forbidden: User "system:serviceaccount:kube-system:default" cannot get resource "namespaces" in API group "" in the namespace "pinot-quickstart"
Resolution:
kubectl apply -f helm-rbac.yaml
kubectl get all -n pinot-quickstart
helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
helm install -n pinot-quickstart kafka incubator/kafka --set replicas=1
helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
helm install --namespace "pinot-quickstart" --name kafka incubator/kafka
kubectl get all -n pinot-quickstart |grep kafka
Ensure the Kafka deployment is ready before executing the scripts in the following steps.
pod/kafka-0 1/1 Running 0 2m
pod/kafka-zookeeper-0 1/1 Running 0 10m
pod/kafka-zookeeper-1 1/1 Running 0 9m
pod/kafka-zookeeper-2 1/1 Running 0 8m
The scripts below will create two Kafka topics for data ingestion:
kubectl -n pinot-quickstart exec kafka-0 -- kafka-topics --zookeeper kafka-zookeeper:2181 --topic flights-realtime --create --partitions 1 --replication-factor 1
kubectl -n pinot-quickstart exec kafka-0 -- kafka-topics --zookeeper kafka-zookeeper:2181 --topic flights-realtime-avro --create --partitions 1 --replication-factor 1
The script below will deploy 3 batch jobs.
Ingest 19492 JSON messages to Kafka topic flights-realtime at a speed of 1 msg/sec
Ingest 19492 Avro messages to Kafka topic flights-realtime-avro at a speed of 1 msg/sec
Upload Pinot schema airlineStats
Create Pinot table airlineStats to ingest data from the JSON encoded Kafka topic flights-realtime
Create Pinot table airlineStatsAvro to ingest data from the Avro encoded Kafka topic flights-realtime-avro
kubectl apply -f pinot-realtime-quickstart.yml
Please use the script below to perform local port-forwarding, which will also open Pinot query console in your default web browser.
This script can be found in the Pinot source at ./incubator-pinot/kubernetes/helm
./query-pinot-data.sh
kubectl apply -f superset.yaml
kubectl exec -it pod/superset-0 -n pinot-quickstart -- bash -c 'flask fab create-admin'
kubectl exec -it pod/superset-0 -n pinot-quickstart -- bash -c 'superset db upgrade'
kubectl exec -it pod/superset-0 -n pinot-quickstart -- bash -c 'superset init'
kubectl exec -it pod/superset-0 -n pinot-quickstart -- bash -c 'superset import_datasources -p /etc/superset/pinot_example_datasource.yaml'
kubectl exec -it pod/superset-0 -n pinot-quickstart -- bash -c 'superset import_dashboards -p /etc/superset/pinot_example_dashboard.json'
You can run the command below to open Superset in your browser and log in with the admin credentials created above.
./open-superset-ui.sh
You can open the imported dashboard by clicking the Dashboards banner and then clicking on AirlineStats.
You can run the command below to deploy a customized Presto with Pinot plugin installed.
helm install presto pinot/presto -n pinot
kubectl apply -f presto-coordinator.yaml
Once Presto is deployed, you can run the command below.
./pinot-presto-cli.sh
List all catalogs
presto:default> show catalogs;
Catalog
---------
pinot
system
(2 rows)
Query 20191112_050827_00003_xkm4g, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:01 [0 rows, 0B] [0 rows/s, 0B/s]
List All tables
presto:default> show tables;
Table
--------------
airlinestats
(1 row)
Query 20191112_050907_00004_xkm4g, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:01 [1 rows, 29B] [1 rows/s, 41B/s]
Show schema
presto:default> DESCRIBE pinot.dontcare.airlinestats;
Column | Type | Extra | Comment
----------------------+---------+-------+---------
flightnum | integer | |
origin | varchar | |
quarter | integer | |
lateaircraftdelay | integer | |
divactualelapsedtime | integer | |
......
Query 20191112_051021_00005_xkm4g, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:02 [80 rows, 6.06KB] [35 rows/s, 2.66KB/s]
Count total documents
presto:default> select count(*) as cnt from pinot.dontcare.airlinestats limit 10;
cnt
------
9745
(1 row)
Query 20191112_051114_00006_xkm4g, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [1 rows, 8B] [2 rows/s, 19B/s]
kubectl delete ns pinot-quickstart
Prior to commit ba9f2d, Pinot was only able to support consuming from a Kafka stream.
Pinot now enables its users to write plug-ins to consume from pub-sub streams other than Kafka. (Please refer to Issue #2583)
Some of the streams for which plug-ins can be added are:
You may encounter some limitations either in Pinot or in the stream system while developing plug-ins. Please feel free to get in touch with us when you start writing a stream plug-in, and we can help you out. We are open to receiving PRs in order to improve these abstractions if they do not work for a certain stream implementation.
Refer to Consuming and Indexing rows in Realtime for details on how Pinot consumes streaming data.
The stream should provide the following guarantees:
Exactly once delivery (unless restarting from a checkpoint) for each consumer of the stream.
(Optionally) support a mechanism to split events (in some arbitrary fashion) so that each event in the stream is delivered to exactly one host out of a set of hosts.
Provide ways to save a checkpoint for the data consumed so far. If the stream is partitioned, then this checkpoint is a vector of checkpoints for events consumed from individual partitions.
The checkpoints should be recorded only when Pinot makes a call to do so.
The consumer should be able to start consumption from one of:
latest available data
earliest available data
last saved checkpoint
While consuming rows at a partition level, the stream should support the following properties:
Stream should provide a mechanism to get the current number of partitions.
Each event in a partition should have a unique offset that is not more than 64 bits long.
Refer to a partition by a number not exceeding 32 bits in length.
Stream should provide the following mechanisms to get an offset for a given partition of the stream:
get the offset of the oldest event available (assuming events are aged out periodically) in the partition.
get the offset of the most recent event published in the partition
(optionally) get the offset of an event that was published at a specified time
Stream should provide a mechanism to consume a set of events from a partition starting from a specified offset.
Pinot assumes that the offsets of incoming events are monotonically increasing; i.e., if Pinot consumes an event at offset o1, then the offset o2 of the following event should be such that o2 > o1.
In addition, we have an operational requirement that the number of partitions should not be reduced over time.
In order to add a new type of stream (say, Foo), implement the following classes:
FooConsumerFactory extends StreamConsumerFactory
FooPartitionLevelConsumer implements PartitionLevelConsumer
FooStreamLevelConsumer implements StreamLevelConsumer
FooMetadataProvider implements StreamMetadataProvider
FooMessageDecoder implements StreamMessageDecoder
Depending on stream level or partition level, your implementation needs to include StreamLevelConsumer or PartitionLevelConsumer.
The properties for the stream implementation are to be set in the table configuration, inside streamConfigs section.
Use the streamType property to define the stream type. For example, for the implementation of stream foo, set the property "streamType" : "foo". The rest of the configuration properties for your stream should be set with the prefix "stream.foo". Be sure to use the same suffix for the following (see the examples below):
topic
consumer type
stream consumer factory
offset
decoder class name
decoder properties
connection timeout
fetch timeout
All values should be strings. For example:
"streamType" : "foo",
"stream.foo.topic.name" : "SomeTopic",
"stream.foo.consumer.type": "LowLevel",
"stream.foo.consumer.factory.class.name": "fully.qualified.pkg.ConsumerFactoryClassName",
"stream.foo.consumer.prop.auto.offset.reset": "largest",
"stream.foo.decoder.class.name" : "fully.qualified.pkg.DecoderClassName",
"stream.foo.decoder.prop.a.decoder.property" : "decoderPropValue",
"stream.foo.connection.timeout.millis" : "10000", // default 30_000
"stream.foo.fetch.timeout.millis" : "10000" // default 5_000
You can have additional properties that are specific to your stream. For example:
"stream.foo.some.buffer.size" : "24g"
In addition to these properties, you can define thresholds for the consuming segments:
rows threshold
time threshold
The properties for the thresholds are as follows:
"realtime.segment.flush.threshold.size" : "100000"
"realtime.segment.flush.threshold.time" : "6h"
An example of this implementation can be found in the KafkaConsumerFactory, which is an implementation for the kafka stream.
Before proceeding to contributing changes to Pinot, review the contents of this section.
Pinot depends on a number of external projects, the most notable of which are:
Apache Zookeeper
Apache Helix
Apache Kafka
Apache Thrift
Netty
Google Guava
Yammer
Helix is used for cluster management, and Pinot code is tightly integrated with Helix and Zookeeper interfaces.
Kafka is the default realtime stream provider, but can be replaced with others. See customizations section for more info.
Thrift is used for message exchange between broker and server components, with Netty providing the server functionality for processing messages in a non-blocking fashion.
Guava is used for a number of auxiliary components such as caches and rate limiters. Yammer metrics is used to register and expose metrics from Pinot components.
In addition, Pinot relies on several key external libraries for some of its core functionality: Pinot's inverted indices are built using the RoaringBitmap library, and Pinot's digest based percentile calculations are based on the t-digest library.
Pinot is a multi-module project, with each module providing specific functionality that helps us to build services from a combination of modules. This helps keep clean interface contracts between different modules as well as reduce the overall executable size for individually deployable components.
Each module has a src/main/java folder where the code resides and a src/test/java folder where the unit tests corresponding to the module's code reside.
The following figure provides a high-level overview of the foundational Pinot modules.
pinot-common provides classes common to Pinot components. Some key classes you will find here are:
config: Definitions for various elements of Pinot's table config.
metrics: Definitions for base metrics provided by Controller, Broker and Server.
metadata: Definitions of metadata stored in Zookeeper.
pql.parsers: Code to compile PQL strings into corresponding Abstract Syntax Trees (ASTs).
request: Autogenerated thrift classes representing various parts of PQL requests.
response: Definitions of the response format returned by the Broker.
filesystem: Provides abstractions for working with segments on local or remote filesystems. This module allows users to plug in filesystems specific to their use case. Extensions to the base PinotFS should ideally be housed in their specific modules so as not to pull in unnecessary dependencies for all users.
The pinot-transport module provides classes required to handle scatter-gather on the Pinot Broker, and Netty wrapper classes used by the Server to handle connections from the Broker.
The pinot-core module provides the core functionality of Pinot, specifically for handling segments, various index structures, query execution (filters, transformations, aggregations, etc.) and support for realtime segments.
pinot-server provides server specific functionality, including server startup and the REST APIs exposed by the server.
pinot-controller houses all the controller specific functionality, including many cluster administration APIs, segment upload (for both offline and realtime), segment assignment, retention strategies, etc.
pinot-broker provides broker functionality, which includes wiring the broker startup sequence, building broker routing tables and PQL request handling.
pinot-minion provides functionality for running auxiliary/periodic tasks on a Pinot cluster, such as purging records for compliance with regulations like GDPR.
pinot-hadoop provides classes for segment generation jobs using Hadoop infrastructure.
In addition to the core modules described above, Pinot code provides the following modules:
pinot-tools: This module is a collection of many tools useful for setting up a Pinot cluster and creating/updating segments. It also houses the Pinot quick start guide code.
pinot-perf: This module has a collection of benchmark test code used to evaluate design options.
pinot-client-api: This module houses the Java client API. See Executing queries via Java Client API for more info.
pinot-integration-tests: This module holds integration tests that test functionality across multiple classes or components. These tests typically do not rely on mocking and provide more end-to-end coverage for the code.
pinot-hadoop-filesystem and pinot-azure-filesystem are modules added to support extensions to the Pinot filesystem. The functionality is broken down into modules of their own to avoid polluting the common modules with additional large libraries. These libraries bring in transitive dependencies of their own that can cause classpath conflicts at runtime; we would like to avoid this for the common usage of Pinot as much as possible.
Before you begin to contribute, make sure you have reviewed the preceding sections and that you have created your own fork of the Pinot source code.
If your change is relatively minor, you can skip this step. If you are adding new major feature, we suggest that you add a design document and solicit comments from the community before submitting any code.
Here is a list of current design documents.
Create a Pinot issue for the change you would like to make. Provide information on why the change is needed and how you plan to address it. Use the conversations on the issue as a way to validate assumptions and the right way to proceed. Be sure to review the related sections of this guide.
If you have a design document, please refer to the design documents in your Issue. You may even want to create multiple issues depending on the extent of your change.
Once you are clear about what you want to do, proceed with the next steps listed below.
Make the necessary changes. If the changes you plan to make are too big, make sure you break it down into smaller tasks.
Follow the recommendations/best-practices noted here when you are making changes.
Please ensure your code is adequately documented. Some things to consider for documentation:
Always include class level java docs. At the top class level, we are looking for information about what functionality is provided by the class, what state is maintained by the class, whether there are concurrency/thread-safety concerns and any exceptional behavior that the class might exhibit.
Document public methods and their parameters.
Ensure there is adequate logging for positive paths as well as exceptional paths. As a corollary to this, ensure logs are not noisy.
Do not use System.out.println to log messages; use the slf4j loggers.
Use logging levels correctly: set the level to debug for verbose logs useful only for debugging.
Do not log stack traces via the printStackTrace method of the exception.
Where possible, throw specific exceptions, preferably checked exceptions, so the callers can easily determine which erroneous conditions need to be handled.
Avoid catching broad exceptions (i.e., catch (Exception e) blocks), except when this is in the run() method of a thread/runnable.
Current Pinot code does not strictly adhere to this, but we would like to change this over time and adopt best practices around exception handling.
If you are making any changes to state stored, either in Zookeeper or in segments, make sure you consider both backward and forward compatibility issues.
For backward compatibility, consider cases where one component is using the new version and another is still on the old version. E.g., when the request format between broker and server is updated, consider resulting behaviors when a new broker is talking to an older server. Will it break?
For forward compatibility, consider rollback cases. E.g., consider what happens when state persisted by new code is handled by old code. Does the old code skip over new fields?
Be cautious about pulling in external dependencies. You will need to consider multiple things when faced with a need to pull in a new library.
What capability is the addition of the library providing you with? Can existing libraries provide this functionality (may be with a little bit of effort)?
Is the external library maintained by an active community of contributors?
What are the licensing terms for the library? For more information about handling licenses, see the licensing section below.
Are you adding the library to the foundational modules? This will affect the rest of the Pinot code base. If the new library pulls in a lot of transitive dependencies, then we might encounter unexpected issues with multiple classes in the classpath. These issues are hard to catch with tests as the order of loading the libraries at runtime matters. If you absolutely need the support, consider adding it via extension modules.
Automated tests are always recommended for contributions. Make sure you write tests so that:
You verify the correctness of your contribution. This serves as proof to you as well as the reviewers.
You future proof your contributions against code refactors or other changes. While this may not always be possible, it's a good goal to aim for.
Identify a list of tests for the changes you have made. Depending on the scope of changes, you may need one or more of the following tests:
Unit Tests
Make sure your code has the necessary class or method level unit tests. It is important to write both positive case as well as negative case tests. Document your tests well and add meaningful assertions in the tests; when the assertions fail, ensure that the right messages are logged with information that allows others to debug.
Integration Tests
Add integration tests to cover end-to-end paths without relying on mocking (see note below). You MUST add integration tests for REST APIs, and must include tests that cover different error codes, i.e., 200 OK, 4xx or 5xx errors that are explicit contracts of the API.
Mocking
Use a mocking framework to mock classes to control specific behaviors - e.g., simulate various error conditions.
Validate assumptions in tests
Make sure that adequate asserts are added in the tests to verify that the tests are passing for the right reasons.
Write reliable tests
Make sure you are writing tests that are reliable. If the tests depend on asynchronous events to be fired, do not add sleep to your tests. Where possible, use appropriate mocking or condition based triggers.
All source code files should have license headers. To automatically add the header for any new file you plan to check in, run the following in the pinot top-level folder:
Note
If you check in third-party code or files, please make sure you review the Apache guidelines:
Once you determine that the code you are pulling in adheres to the guidelines above, go ahead and pull the changes in. Do not add license headers for them. Follow these instructions to ensure we are compliant with the Apache licensing process:
Under pinot/licenses, add a LICENSE-<newlib> file that has the license terms of the included library.
Update the pinot/LICENSE file to indicate the newly added library file paths under the corresponding supported licenses.
Update the exclusion rules for the license and rat maven plugins in the parent pom: pinot/pom.xml.
If attention is not paid to the licensing terms early on, they will be caught much later in the process, when we prepare to make a new release. Updating the code to work with the right libraries at that point might require bigger refactoring changes and delay the release process.
Verifying code-style
Run the following command to verify the code-style before posting a PR
Run tests
Before you create a review request for the changes, make sure you have run the corresponding unit tests for your changes. You can run individual tests via the IDE or via the maven command line. Finally, run all tests locally by running mvn clean install -Pbin-dist.
For changes that are related to performance issues or race conditions, it is hard to write reliable tests, so we recommend running manual stress tests to validate the changes. You MUST note the manual tests done in the PR description.
Push changes and create a PR for review
Commit your changes with a meaningful commit message.
Once you receive comments on github on your changes, be sure to respond to them on github and address the concerns. If any discussions happen offline for the changes in question, make sure to capture the outcome of the discussion, so others can follow along as well.
It is possible that while your change is being reviewed, other changes were made to the master branch. Be sure to pull and rebase your change on the new changes thus:
When you have addressed all comments and have an approved PR, one of the committers can merge your PR.
After your change is merged, check to see if any documentation needs to be updated. If so, create a PR for documentation.
Usually, for new features, functionality, or API changes, a documentation update is required to keep users up to date and keep track of our development.
Please follow this link to update the documentation accordingly.
Pinot segments can be created offline on Hadoop, or via command line from data files. Controller REST endpoint can then be used to add the segment to the table to which the segment belongs. Pinot segments can also be created by ingesting data from realtime resources (such as Kafka).
Offline Pinot workflow
To create Pinot segments on Hadoop, a workflow can be created to complete the following steps:
Pre-aggregate, clean up and prepare the data, writing it as Avro format files in a single HDFS directory
Create segments
Upload segments to the Pinot cluster
Step one can be done using your favorite tool (such as Pig, Hive or Spark); Pinot provides two MapReduce jobs to do steps two and three.
Create a job properties configuration file, such as one below:
The Pinot Hadoop module contains a job that you can incorporate into your workflow to generate Pinot segments.
You can then use the SegmentTarPush job to push segments via the controller REST API.
Here is how you can create Pinot segments from standard formats like CSV/JSON/AVRO.
Follow the steps described in the build section to build Pinot. Locate pinot-admin.sh in pinot-tools/target/pinot-tools-pkg/bin/pinot-admin.sh.
Create a top level directory containing all the CSV/JSON/AVRO files that need to be converted into segments.
The file name extensions are expected to be the same as the format name (i.e. .csv, .json or .avro), and are case insensitive. Note that the converter expects the .csv extension even if the data is delimited using tabs or spaces instead.
Prepare a schema file describing the schema of the input data. The schema needs to be in JSON format. See example later in this section.
Specifically for CSV format, an optional csv config file can be provided (also in JSON format). This is used to configure parameters like the delimiter/header for the CSV file etc. A detailed description of this follows below.
Run the pinot-admin command to generate the segments. The command can be invoked as follows. Options within “[ ]” are optional. For -format, the default value is AVRO.
To configure various parameters for CSV a config file in JSON format can be provided. This file is optional, as are each of its parameters. When not provided, default values used for these parameters are described below:
fileFormat: Specify one of the following. Default is EXCEL.
EXCEL
MYSQL
RFC4180
TDF
header: If the input CSV file does not contain a header, it can be specified using this field. Note, if this is specified, then the input file is expected to not contain the header row, or else it will result in parse error. The columns in the header must be delimited by the same delimiter character as the rest of the CSV file.
delimiter: Use this to specify a delimiter character. The default value is “,”.
multiValueDelimiter: Use this to specify a delimiter character for each value in multi-valued columns. The default value is “;”.
Below is a sample config file.
Sample Schema:
You can use curl to push a segment to pinot:
Alternatively you can use the pinot-admin.sh utility to upload one or more segments:
The command uploads all the segments found in segmentDirectoryPath. The segments could be either tar-compressed (in which case it is a file under segmentDirectoryPath) or uncompressed (in which case it is a directory under segmentDirectoryPath).
$ cd pinot
#
# ensure you are starting from the latest code base
# the following steps, ensure your fork's (origin's) master is up-to-date
#
$ git fetch upstream
$ git checkout master
$ git merge upstream/master
# create a branch for your issue
$ git checkout -b <your issue branch>
mvn license:format
mvn checkstyle:check
$ git add <files required for the change>
$ git commit -m "Meaningful oneliner for the change"
$ git push origin <your issue branch>
After this, create a Pull Request in github (https://github.com/apache/incubator-pinot/pulls). Include the following information in the description:
* The changes that are included in the PR.
* Design document, if any.
* Information on any implementation choices that were made.
* Evidence of sufficient testing. You MUST indicate the tests done, either manually or automated.
Once the PR is created, the code base is compiled and all tests are run via travis. Make sure you follow up on any issues flagged by travis and address them.
If you see test failures that are intermittent, please create an issue to track them.
Once the travis run is clear, request reviews from at least 2 committers on the project and be sure to gently follow up on the issue with the reviewers.
# commit your changes
$ git add <updated files>
$ git commit -m "Meaningful message for the udpate"
# pull new changes
$ git checkout master
$ git merge upstream/master
$ git checkout <your issue branch>
$ git rebase master
At this time, if rebase flags any conflicts, resolve the conflicts and follow the instructions provided by the rebase command.
Run additional tests/validations for the new changes and update the PR by pushing your changes:
$ git push origin <your issue branch>
# === Index segment creation job config ===
# path.to.input: Input directory containing Avro files
path.to.input=/user/pinot/input/data
# path.to.output: Output directory containing Pinot segments
path.to.output=/user/pinot/output
# path.to.schema: Schema file for the table, stored locally
path.to.schema=flights-schema.json
# segment.table.name: Name of the table for which to generate segments
segment.table.name=flights
# === Segment tar push job config ===
# push.to.hosts: Comma separated list of controllers host names to which to push
push.to.hosts=controller_host_0,controller_host_1
# push.to.port: The port on which the controller runs
push.to.port=8888
mvn clean install -DskipTests -Pbuild-shaded-jar
hadoop jar pinot-hadoop-<version>-SNAPSHOT-shaded.jar SegmentCreation job.properties
hadoop jar pinot-hadoop-<version>-SNAPSHOT-shaded.jar SegmentTarPush job.properties
bin/pinot-admin.sh CreateSegment -dataDir <input_data_dir> [-format [CSV/JSON/AVRO]] [-readerConfigFile <csv_config_file>] [-generatorConfigFile <generator_config_file>] -segmentName <segment_name> -schemaFile <input_schema_file> -tableName <table_name> -outDir <output_data_dir> [-overwrite]
{
"fileFormat": "EXCEL",
"header": "col1,col2,col3,col4",
"delimiter": "\t",
"multiValueDelimiter": ","
}
{
"schemaName": "flights",
"dimensionFieldSpecs": [
{
"name": "flightNumber",
"dataType": "LONG"
},
{
"name": "tags",
"dataType": "STRING",
"singleValueField": false
}
],
"metricFieldSpecs": [
{
"name": "price",
"dataType": "DOUBLE"
}
],
"timeFieldSpec": {
"incomingGranularitySpec": {
"name": "daysSinceEpoch",
"dataType": "INT",
"timeType": "DAYS"
}
}
}
curl -X POST -F segment=@<segment-tar-file-path> http://controllerHost:controllerPort/segments
pinot-tools/target/pinot-tools-pkg/bin//pinot-admin.sh UploadSegment -controllerHost <hostname> -controllerPort <port> -segmentDir <segmentDirectoryPath>
This quick start guide will show you how to run a Pinot cluster using Docker.
Create an isolated bridge network in docker
docker network create -d bridge pinot-demo
We'll be using our docker image apachepinot/pinot:latest to run this quick start, which does the following:
Sets up the Pinot cluster
Creates a sample table and loads sample data
There are 3 types of quick start examples.
Batch example
Streaming example
Hybrid example
In this example we demonstrate how to do batch processing with Pinot.
Starts Pinot deployment by starting
Apache Zookeeper
Pinot Controller
Pinot Broker
Pinot Server
Creates a demo table baseballStats
Launches a standalone data ingestion job
Builds one Pinot segment for a given CSV data file for table baseballStats
Pushes the built segment to the Pinot controller
Issues sample queries to Pinot
docker run \
--network=pinot-demo \
--name pinot-quickstart \
-p 9000:9000 \
-d apachepinot/pinot:latest QuickStart \
-type batch
Once the Docker container is running, you can view the logs by running the following command.
docker logs pinot-quickstart -f
That's it! We've spun up a Pinot cluster.
docker logs pinot-quickstart -f
Your cluster is ready once you see the cluster setup completion messages and sample queries, as demonstrated below.
You can head over to Exploring Pinot to check out the data in the baseballStats table.
In this example we demonstrate how to do stream processing with Pinot.
Starts Pinot deployment by starting
Apache Kafka
Apache Zookeeper
Pinot Controller
Pinot Broker
Pinot Server
Creates a demo table meetupRsvp
Launches a meetup stream
Publishes data to a Kafka topic meetupRSVPEvents to be subscribed to by Pinot
Issues sample queries to Pinot
# stop previous container, if any, or use different network
docker run \
--network=pinot-demo \
--name pinot-quickstart \
-p 9000:9000 \
-d apachepinot/pinot:latest QuickStart \
-type stream
Once the cluster is up, you can head over to Exploring Pinot to check out the data in the meetupRSVPEvents table.
In this example we demonstrate how to do hybrid stream and batch processing with Pinot.
Starts Pinot deployment by starting
Apache Kafka
Apache Zookeeper
Pinot Controller
Pinot Broker
Pinot Server
Creates a demo table airlineStats
Launches a standalone data ingestion job
Builds Pinot segments under a given directory of Avro files for table airlineStats
Pushes built segments to the Pinot controller
Launches a stream of flight stats
Publishes data to a Kafka topic airlineStatsEvents to be subscribed to by Pinot
Issues sample queries to Pinot
# stop previous container, if any, or use different network
docker run \
--network=pinot-demo \
--name pinot-quickstart \
-p 9000:9000 \
-d apachepinot/pinot:latest QuickStart \
-type hybrid
Once the cluster is up, you can head over to Exploring Pinot to check out the data in the airlineStats table.
Step-by-step guide on pushing your own data into the Pinot cluster
So far, we have set up our cluster, run some queries, and explored the admin endpoints. Now it's time to get our own data into Pinot.
Let's gather our data files and put them in pinot-quick-start/rawdata.
Supported file formats are CSV, JSON, AVRO, PARQUET, THRIFT and ORC. If you don't have sample data, you can use this sample CSV.
A schema is used to define the columns and data types of the Pinot table. A detailed overview can be found in the schema documentation.
Briefly, we categorize our columns into 3 types
For example, in our sample table, the playerID, yearID, teamID, league, playerName columns are the dimensions, the playerStint, numberOfgames, numberOfGamesAsBatter, AtBatting, runs, hits, doules, triples, homeRuns, runsBattedIn, stolenBases, caughtStealing, baseOnBalls, strikeouts, intentionalWalks, hitsByPitch, sacrificeHits, sacrificeFlies, groundedIntoDoublePlays, G_old columns are the metrics, and there is no time column.
Once you have identified the dimensions, metrics and time columns, create a schema for your data, using the reference below.
A table config is used to define the config related to the Pinot table. A detailed overview can be found in the table documentation.
Here's the table config for the sample CSV file. You can use this as a reference to build your own table config. Simply edit the tableName and schemaName.
Check the directory structure so far
Upload the table config using the following command
Check out the table config and schema via the Rest API to make sure they were successfully uploaded.
A Pinot table's data is stored as Pinot segments. A detailed overview can be found in the segment documentation.
To generate a segment, we first need to create a job spec yaml file. The job spec yaml file has all the information regarding data format, input data location and Pinot cluster coordinates. You can just copy over this job spec file. If you're using your own data, be sure to 1) replace transcript with your table name and 2) set the right recordReaderSpec.
Use the following command to generate a segment and upload it
Sample output
Check that your segment made it to the table using the Rest API.
You're all set! You should see your table in the Query Console and be able to run queries against it now.
mkdir -p /tmp/pinot-quick-start/rawdata
studentID,firstName,lastName,gender,subject,score,timestamp
200,Lucy,Smith,Female,Maths,3.8,1570863600000
200,Lucy,Smith,Female,English,3.5,1571036400000
201,Bob,King,Male,Maths,3.2,1571900400000
202,Nick,Young,Male,Physics,3.6,1572418800000
Column Type | Description
------------+---------------------------------------------------------------------------
 Dimensions | Typically used in filters and group by, for slicing and dicing into data
 Metrics    | Typically used in aggregations, represents the quantitative data
 Time       | Optional column, represents the timestamp associated with each row
{
"schemaName": "transcript",
"dimensionFieldSpecs": [
{
"name": "studentID",
"dataType": "INT"
},
{
"name": "firstName",
"dataType": "STRING"
},
{
"name": "lastName",
"dataType": "STRING"
},
{
"name": "gender",
"dataType": "STRING"
},
{
"name": "subject",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "score",
"dataType": "FLOAT"
}
],
"dateTimeFieldSpecs": [{
"name": "timestamp",
"dataType": "LONG",
"format" : "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}]
}
{
"tableName": "transcript",
"segmentsConfig" : {
"timeColumnName": "timestamp",
"timeType": "MILLISECONDS",
"replication" : "1",
"schemaName" : "transcript"
},
"tableIndexConfig" : {
"invertedIndexColumns" : [],
"loadMode" : "MMAP"
},
"tenants" : {
"broker":"DefaultTenant",
"server":"DefaultTenant"
},
"tableType":"OFFLINE",
"metadata": {}
}
$ ls /tmp/pinot-quick-start
rawdata transcript-schema.json transcript-table-offline.json
$ ls /tmp/pinot-quick-start/rawdata
transcript.csv
docker run --rm -ti \
--network=pinot-demo \
-v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
--name pinot-batch-table-creation \
apachepinot/pinot:latest AddTable \
-schemaFile /tmp/pinot-quick-start/transcript-schema.json \
-tableConfigFile /tmp/pinot-quick-start/transcript-table-offline.json \
-controllerHost pinot-quickstart \
-controllerPort 9000 -exec
bin/pinot-admin.sh AddTable \
-tableConfigFile /tmp/pinot-quick-start/transcript-table-offline.json \
-schemaFile /tmp/pinot-quick-start/transcript-schema.json -exec
executionFrameworkSpec:
name: 'standalone'
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/tmp/pinot-quick-start/rawdata/'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/pinot-quick-start/segments/'
overwriteOutput: true
pinotFSSpecs:
- scheme: file
className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
dataFormat: 'csv'
className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
tableName: 'transcript'
schemaURI: 'http://localhost:9000/tables/transcript/schema'
tableConfigURI: 'http://localhost:9000/tables/transcript'
pinotClusterSpecs:
- controllerURI: 'http://localhost:9000'
executionFrameworkSpec:
name: 'standalone'
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/tmp/pinot-quick-start/rawdata/'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/pinot-quick-start/segments/'
overwriteOutput: true
pinotFSSpecs:
- scheme: file
className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
dataFormat: 'csv'
className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
tableName: 'transcript'
schemaURI: 'http://pinot-quickstart:9000/tables/transcript/schema'
tableConfigURI: 'http://pinot-quickstart:9000/tables/transcript'
pinotClusterSpecs:
- controllerURI: 'http://pinot-quickstart:9000'
docker run --rm -ti \
--network=pinot-demo \
-v /tmp/pinot-quick-start:/tmp/pinot-quick-start \
--name pinot-data-ingestion-job \
apachepinot/pinot:latest LaunchDataIngestionJob \
-jobSpecFile /tmp/pinot-quick-start/docker-job-spec.yml
bin/pinot-admin.sh LaunchDataIngestionJob \
-jobSpecFile /tmp/pinot-quick-start/batch-job-spec.yml
SegmentGenerationJobSpec:
!!org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec
excludeFileNamePattern: null
executionFrameworkSpec: {extraConfigs: null, name: standalone, segmentGenerationJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner,
segmentTarPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner,
segmentUriPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner}
includeFileNamePattern: glob:**\/*.csv
inputDirURI: /tmp/pinot-quick-start/rawdata/
jobType: SegmentCreationAndTarPush
outputDirURI: /tmp/pinot-quick-start/segments
overwriteOutput: true
pinotClusterSpecs:
- {controllerURI: 'http://localhost:9000'}
pinotFSSpecs:
- {className: org.apache.pinot.spi.filesystem.LocalPinotFS, configs: null, scheme: file}
pushJobSpec: null
recordReaderSpec: {className: org.apache.pinot.plugin.inputformat.csv.CSVRecordReader,
configClassName: org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig,
configs: null, dataFormat: csv}
segmentNameGeneratorSpec: null
tableSpec: {schemaURI: 'http://localhost:9000/tables/transcript/schema', tableConfigURI: 'http://localhost:9000/tables/transcript',
tableName: transcript}
Trying to create instance for class org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner
Initializing PinotFS for scheme file, classname org.apache.pinot.spi.filesystem.LocalPinotFS
Finished building StatsCollector!
Collected stats for 4 documents
Using fixed bytes value dictionary for column: studentID, size: 9
Created dictionary for STRING column: studentID with cardinality: 3, max length in bytes: 3, range: 200 to 202
Using fixed bytes value dictionary for column: firstName, size: 12
Created dictionary for STRING column: firstName with cardinality: 3, max length in bytes: 4, range: Bob to Nick
Using fixed bytes value dictionary for column: lastName, size: 15
Created dictionary for STRING column: lastName with cardinality: 3, max length in bytes: 5, range: King to Young
Created dictionary for FLOAT column: score with cardinality: 4, range: 3.2 to 3.8
Using fixed bytes value dictionary for column: gender, size: 12
Created dictionary for STRING column: gender with cardinality: 2, max length in bytes: 6, range: Female to Male
Using fixed bytes value dictionary for column: subject, size: 21
Created dictionary for STRING column: subject with cardinality: 3, max length in bytes: 7, range: English to Physics
Created dictionary for LONG column: timestamp with cardinality: 4, range: 1570863600000 to 1572418800000
Start building IndexCreator!
Finished records indexing in IndexCreator!
Finished segment seal!
Converting segment: /var/folders/3z/qn6k60qs6ps1bb6s2c26gx040000gn/T/pinot-1583443148720/output/transcript_OFFLINE_1570863600000_1572418800000_0 to v3 format
v3 segment location for segment: transcript_OFFLINE_1570863600000_1572418800000_0 is /var/folders/3z/qn6k60qs6ps1bb6s2c26gx040000gn/T/pinot-1583443148720/output/transcript_OFFLINE_1570863600000_1572418800000_0/v3
Deleting files in v1 segment directory: /var/folders/3z/qn6k60qs6ps1bb6s2c26gx040000gn/T/pinot-1583443148720/output/transcript_OFFLINE_1570863600000_1572418800000_0
Starting building 1 star-trees with configs: [StarTreeV2BuilderConfig[splitOrder=[studentID, firstName],skipStarNodeCreation=[],functionColumnPairs=[org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair@3a48efdc],maxLeafRecords=1]] using OFF_HEAP builder
Starting building star-tree with config: StarTreeV2BuilderConfig[splitOrder=[studentID, firstName],skipStarNodeCreation=[],functionColumnPairs=[org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair@3a48efdc],maxLeafRecords=1]
Generated 3 star-tree records from 4 segment records
Finished constructing star-tree, got 9 tree nodes and 4 records under star-node
Finished creating aggregated documents, got 6 aggregated records
Finished building star-tree in 10ms
Finished building 1 star-trees in 27ms
Computed crc = 3454627653, based on files [/var/folders/3z/qn6k60qs6ps1bb6s2c26gx040000gn/T/pinot-1583443148720/output/transcript_OFFLINE_1570863600000_1572418800000_0/v3/columns.psf, /var/folders/3z/qn6k60qs6ps1bb6s2c26gx040000gn/T/pinot-1583443148720/output/transcript_OFFLINE_1570863600000_1572418800000_0/v3/index_map, /var/folders/3z/qn6k60qs6ps1bb6s2c26gx040000gn/T/pinot-1583443148720/output/transcript_OFFLINE_1570863600000_1572418800000_0/v3/metadata.properties, /var/folders/3z/qn6k60qs6ps1bb6s2c26gx040000gn/T/pinot-1583443148720/output/transcript_OFFLINE_1570863600000_1572418800000_0/v3/star_tree_index, /var/folders/3z/qn6k60qs6ps1bb6s2c26gx040000gn/T/pinot-1583443148720/output/transcript_OFFLINE_1570863600000_1572418800000_0/v3/star_tree_index_map]
Driver, record read time : 0
Driver, stats collector time : 0
Driver, indexing time : 0
Tarring segment from: /var/folders/3z/qn6k60qs6ps1bb6s2c26gx040000gn/T/pinot-1583443148720/output/transcript_OFFLINE_1570863600000_1572418800000_0 to: /var/folders/3z/qn6k60qs6ps1bb6s2c26gx040000gn/T/pinot-1583443148720/output/transcript_OFFLINE_1570863600000_1572418800000_0.tar.gz
Size for segment: transcript_OFFLINE_1570863600000_1572418800000_0, uncompressed: 6.73KB, compressed: 1.89KB
Trying to create instance for class org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner
Initializing PinotFS for scheme file, classname org.apache.pinot.spi.filesystem.LocalPinotFS
Start pushing segments: [/tmp/pinot-quick-start/segments/transcript_OFFLINE_1570863600000_1572418800000_0.tar.gz]... to locations: [org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec@243c4f91] for table transcript
Pushing segment: transcript_OFFLINE_1570863600000_1572418800000_0 to location: http://localhost:9000 for table transcript
Sending request: http://localhost:9000/v2/segments?tableName=transcript to controller: nehas-mbp.hsd1.ca.comcast.net, version: Unknown
Response for pushing table transcript segment transcript_OFFLINE_1570863600000_1572418800000_0 to location http://localhost:9000 - 200: {"status":"Successfully uploaded segment: transcript_OFFLINE_1570863600000_1572418800000_0 of table: transcript"}
This page covers everything you need to know about how queries are computed in Pinot's distributed systems architecture.
This page will introduce you to the guiding principles behind the design of Apache Pinot. Here you will learn the distributed systems architecture that allows Pinot to scale the performance of queries linearly based on the number of nodes in a cluster. You'll also be introduced to the two different types of tables used to ingest and query data in offline (batch) or real-time (stream) mode.
Pinot was designed by engineers at LinkedIn and Uber to scale query performance based on the number of nodes in a cluster. As you add more nodes, query performance improves in proportion to the expected queries-per-second quota. To achieve horizontal scalability to an unbounded number of nodes and amount of data storage, without performance degradation, the following guiding design principles were established.
Highly available: Pinot is built to serve low latency analytical queries for customer facing applications. By design, there is no single point of failure in Pinot. The system continues to serve queries when a node goes down.
Horizontally scalable: Ability to scale by adding new nodes as a workload changes.
Latency vs Storage: Pinot is built to provide low latency even at high throughput. Features such as segment assignment strategy, routing strategy, and star-tree indexing were developed to achieve this.
Immutable data: Pinot assumes that all data stored is immutable. For GDPR compliance, we provide an add-on solution for purging data while maintaining performance guarantees.
Dynamic configuration changes: Operations such as adding new tables, expanding a cluster, ingesting data, modifying indexing config, and re-balancing must be performed without impacting query availability or performance.
As described in the concepts, Pinot has multiple distributed system components: Controller, Broker, Server, and Minion.
Pinot uses Apache Helix for cluster management. Helix is embedded as an agent within the different components and uses Apache Zookeeper for coordination and maintaining the overall cluster state and health.
All Pinot servers and brokers are managed by Helix. Helix is a generic cluster management framework to manage partitions and replicas in a distributed system. It's helpful to think of Helix as an event-driven discovery service with push and pull notifications that drives the state of a cluster to an ideal configuration. A finite-state machine maintains a contract of stateful operations that drives the health of the cluster towards its optimal configuration. Query load is optimized as Helix updates routing configurations between nodes based on where data is stored in the cluster.
Helix divides nodes into three logical components based on their responsibilities:
Participant: These are the nodes in the cluster that actually host the distributed storage resources.
Spectator: These nodes observe the current state of each participant and route requests accordingly. Routers, for example, need to know the instance on which a partition is hosted and its state in order to route the request to the appropriate endpoint. Routing is continually changed to optimize cluster performance as storage primitives are added and changed.
Controller: The controller observes and manages the state of participant nodes. The controller is responsible for coordinating all state transitions in the cluster and ensures that state constraints are satisfied while maintaining cluster stability.
Helix uses Zookeeper to maintain cluster state. Each component in a Pinot cluster takes a Zookeeper address as a startup parameter. The various components that are distributed in a Pinot cluster watch Zookeeper notifications and issue updates via their embedded Helix agents.
Component-to-Helix mapping:
Segment: Modeled as a Helix Partition. Each segment can have multiple copies, referred to as replicas.
Table: Modeled as a Helix Resource. Multiple segments are grouped into a single table. All segments belonging to a Pinot table have the same schema.
Controller: Embeds the Helix agent that drives the overall state of the cluster.
Server: A server is modeled as a Helix Participant and hosts segments.
Broker: A broker is modeled as a Helix Spectator that observes the cluster for changes in the state of segments and servers. In order to support multi-tenancy, brokers are also modeled as Helix Participants.
Minion: A Pinot minion is modeled as a Helix Participant.
Helix agents use Zookeeper to store and update configurations, as well as for distributed coordination. Zookeeper stores the following information about the cluster:
Resources and their stored properties:
Controller: the controller that is assigned as the current leader.
Servers/Brokers: a list of servers/brokers and their configuration; health status.
Tables: list of tables; table configurations; table schema information; list of segments within a table.
Segment: exact server location(s) of a segment (routing table); state of each segment (online/offline/error/consuming); metadata about each segment.
Knowing the ZNode layout structure in Zookeeper for Helix agents in a cluster is useful for operations and/or troubleshooting cluster state and health.
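As a hedged illustration (the cluster name PinotCluster and the Zookeeper address are assumptions), the Helix ZNode layout can be browsed with ZooKeeper's own zkCli.sh:
bin/zkCli.sh -server localhost:2181
# inside the ZooKeeper shell: list the cluster's Helix znodes
ls /PinotCluster
ls /PinotCluster/CONFIGS/PARTICIPANT
ls /PinotCluster/EXTERNALVIEW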
Pinot's controller acts as the driver of the cluster's overall state and health. Because of its role as a Helix participant and spectator, which drives the state of other components, it is the first component that is typically started after Zookeeper. Two parameters are required for starting a controller: Zookeeper address and cluster name. The controller will automatically create a cluster via Helix if it does not yet exist.
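A minimal sketch of starting a controller with these two parameters via pinot-admin.sh (the Zookeeper address, cluster name, and port below are illustrative assumptions):
bin/pinot-admin.sh StartController \
  -zkAddress localhost:2181 \
  -clusterName PinotCluster \
  -controllerPort 9000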
To achieve fault tolerance, one can start multiple controllers (typically three) and one of them will act as a leader. If the leader crashes or dies, another leader is automatically elected. Leader election is achieved using Apache Helix. Having at least one controller is required to perform any DDL-equivalent operation on the cluster, such as adding a table or a segment.
The controller does not interfere with query execution. Query execution is not impacted even when all controller nodes are offline. If all controller nodes are offline, the state of the cluster will stay as it was when the last leader went down. When a new leader comes online, the cluster resumes rebalancing activity and can accept new tables or segments.
The controller provides a REST interface to perform CRUD operations on all logical storage resources (servers, brokers, tables, and segments).
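For example, assuming a controller at localhost:9000, the tables in the cluster can be listed through this REST interface:
curl -X GET http://localhost:9000/tables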
The responsibility of the broker is to route a given query to an appropriate server instance. A broker collects and merges the responses from all servers into a final result and sends it back to the requesting client. The broker provides HTTP endpoints that accept SQL queries and return the response in JSON format.
Brokers need three key things to start (a sample start command follows this list):
Cluster name
Zookeeper address
Broker instance name
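A minimal sketch of starting a broker via pinot-admin.sh (the values below are illustrative assumptions; the broker instance name is derived from the broker host and port by default):
bin/pinot-admin.sh StartBroker \
  -zkAddress localhost:2181 \
  -clusterName PinotCluster \
  -brokerPort 8099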
At startup, a broker registers as a Helix Participant and awaits notifications from other Helix agents. These notifications cover table creation, a new segment being loaded, a server starting up or going down, and any configuration changes.
Service Discovery/Routing Table
Irrespective of the kind of notification, the key responsibility of a broker is to maintain the query routing table. The query routing table is simply a mapping between segments and the servers that a segment resides on. Typically, a segment resides on more than one server. The broker computes multiple routing tables depending on the configured routing strategy for a table. The default strategy is to balance the query load across all available servers.
//This is an example ZNode config for EXTERNAL VIEW in Helix
{
"id" : "baseballStats_OFFLINE",
"simpleFields" : {
...
},
"mapFields" : {
"baseballStats_OFFLINE_0" : {
"Server_10.1.10.82_7000" : "ONLINE"
}
},
...
}
Query processing
For every query, a cluster's broker performs the following:
Fetches the routes that are computed for a query based on the routing strategy defined in a table's configuration.
Computes the list of segments to query on each server.
Scatter-Gather: sends the requests to each server and gathers the responses.
Merge: merges the query results returned from each server.
Sends the query result to the client.
// Query: select count(*) from baseballStats limit 10
// RESPONSE
// ========
{
"resultTable": {
"dataSchema": {
"columnDataTypes": ["LONG"],
"columnNames": ["count(*)"]
},
"rows": [
[97889]
]
},
"exceptions": [],
"numServersQueried": 1,
"numServersResponded": 1,
"numSegmentsQueried": 1,
"numSegmentsProcessed": 1,
"numSegmentsMatched": 1,
"numConsumingSegmentsQueried": 0,
"numDocsScanned": 97889,
"numEntriesScannedInFilter": 0,
"numEntriesScannedPostFilter": 0,
"numGroupsLimitReached": false,
"totalDocs": 97889,
"timeUsedMs": 5,
"segmentStatistics": [],
"traceInfo": {},
"minConsumingFreshnessTimeMs": 0
}
Fault tolerance
Broker instances scale horizontally without an upper bound. In a majority of cases, only three brokers are required. If most query results that are returned to a client are <1MB in size per query, one can run a broker and servers inside the same instance container. This lowers the overall footprint of a cluster deployment for use cases that do not need to guarantee a strict SLA on query performance in production.
Servers host segments and do most of the heavy lifting during query processing. Though the architecture shows that there are two kinds of servers, real-time and offline, a server does not really know if it's going to be a real-time server or an offline server. The responsibility of a server depends on the table assignment strategy.
Offline servers
Offline servers typically host segments that are immutable. In this case, segments are created outside of the cluster and uploaded via a shell-based curl request. Based on the replication factor and the segment assignment strategy, the controller picks one or more servers to host the segment. Servers are notified via Helix about the new segments. Servers fetch the segments from deep store and load them before being ready to serve query requests. At this point, the cluster's broker detects that new segments are available and starts including them in query responses.
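A hedged sketch of such a curl upload, using the /v2/segments endpoint that appears in the ingestion log earlier on this page (the controller address, table name, segment file, and multipart field name are assumptions):
curl -X POST \
  -F segment=@transcript_OFFLINE_1570863600000_1572418800000_0.tar.gz \
  "http://localhost:9000/v2/segments?tableName=transcript"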
Real-time servers
Real-time servers are different from offline servers. Real-time server nodes ingest data from streaming sources, such as Kafka, and generate indexed segments in memory (flushing segments to disk periodically). In-memory segments are also known as consuming segments. These consuming segments get flushed periodically based on a completion threshold (number of rows, time, or segment size). At this point, they are known as completed segments. Completed segments are similar to an offline server's segments. Queries go over both the in-flight (consuming) segments and the completed segments.
Minion is an optional component and is not required to get started with Pinot. Minion is used for purging data from a Pinot cluster (for reasons such as GDPR compliance in the UK).
Within Pinot, a logical table is modeled as one of two types of physical tables: offline or real-time. The reason for having two types of tables is because each one follows a different state model.
Real-time and offline tables provide different configuration options for indexing and, in the case of real-time tables, the connector properties for the stream data source (i.e. Kafka). Table types also allow users to use different containers for real-time and offline server nodes. For instance, offline servers might use virtual machines with larger storage capacity, whereas real-time servers might need higher system memory and/or more CPU cores.
The two types of tables also scale differently.
Real-time tables have a smaller retention period and scale query performance based on the ingestion rate.
Offline tables have a larger retention period and scale performance based on the size of stored data.
There are a few things to keep in mind when configuring the different types of tables for your workloads. When ingesting data from the same source, you can have two tables that ingest the same data that are configured differently for real-time and offline queries. Even though the two tables have the same data, performance will scale differently for queries based on your requirements. In this scenario, real-time and offline tables must share the same schema.
In batch mode, data is ingested into Pinot via an ingestion job. An ingestion job transforms a raw data source (such as a CSV file) into segments. Once segments are generated for the imported data, an ingestion job stores them into the cluster's segment store (a.k.a deep store) and notifies the controller. The notification is processed and the result is that the Helix agent on the controller updates the ideal state configuration in Zookeeper. Helix will then notify the offline server that there are new segments available. In response to the notification from the controller, the offline server downloads the newly created segments directly from the cluster's segment store. The cluster's broker, which watches for state changes in Helix, detects the new segments and adds them to the list of segments to query (segment-to-server routing table).
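A minimal sketch of launching such an ingestion job with pinot-admin.sh (the yaml job spec path is an assumption; the spec defines the segment generation and segment push steps):
bin/pinot-admin.sh LaunchDataIngestionJob \
  -jobSpecFile /tmp/pinot-quick-start/batch-job-spec.yml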
At table creation, a controller creates a new entry in Zookeeper for the consuming segment. Helix notices the new segment and notifies the real-time server, which starts consuming data from the streaming source. The broker, which watches for changes, detects the new segments and adds them to the list of segments to query (segment-to-server routing table).
Whenever the segment is complete (i.e. full), the real-time server notifies the Controller, which checks with all replicas and picks a winner to commit the segment to. The winner commits the segment and uploads it to the cluster's segment store, updating the state of the segment from "consuming" to "online". The controller then prepares a new segment in a "consuming" state.
Queries are received by brokers, which check the request against the segment-to-server routing table and scatter the request between the real-time and offline servers.
The two tables then process the request by filtering and aggregating the queried data, which is then returned to the broker. Finally, the broker gathers all of the pieces of the query response together and returns the result to the client.
The 0.3.0 release of Apache Pinot introduces the concept of plugins, which make it easy to extend Pinot and integrate it with other systems.
The reason behind the architectural change between the previous release (0.2.0) and this release (0.3.0) is extensibility. The 0.2.0 release was not flexible enough to support new storage types or new stream types; adding new functionality required changing too much code. The Pinot team therefore went through an extensive refactoring and improvement of the source code.
For instance, the picture below shows the module dependencies of the 0.2.X and previous releases. If we wanted to support a new storage type, we would have had to change several modules. Pretty bad, huh?
To address this challenge, the following major changes were made:
Refactored common interfaces into the pinot-spi module
Settled on four types of plugin modules:
Pinot input format: how to read records from various data/file formats, e.g. Avro/CSV/JSON/ORC/Parquet/Thrift
Pinot filesystem: how to operate on files in various filesystems, e.g. Azure Data Lake/Google Cloud Storage/S3/HDFS
Pinot stream ingestion: how to ingest a data stream from various upstream systems, e.g. Kafka/Kinesis/Eventhub
Pinot batch ingestion: how to run Pinot batch ingestion jobs in various frameworks, like Standalone, Hadoop, Spark.
Built shaded jars for each individual plugin
Added support to dynamically load pinot plugins at server startup time
The architecture now supports a plug-and-play fashion, where new tools can be supported with small, simple extensions, without affecting big chunks of code. Integrations with new streaming services and data formats can be developed in a much simpler and more convenient way.
SQL Support
Added support for DISTINCT (#4535)
Added support for default value for BYTES column (#4583)
JDK 11 Support
Added support to tune size vs accuracy for approximation aggregation functions: DistinctCountHLL, PercentileEst, PercentileTDigest (#4666)
Added Data Anonymizer Tool (#4747)
Deprecated pinot-hadoop and pinot-spark modules, replaced with pinot-batch-ingestion-hadoop and pinot-batch-ingestion-spark
Support STRING and BYTES for no-dictionary columns in realtime consuming segments (#4791)
Made pinot-distribution build a pinot-all jar and assemble it (#4977)
Added support for case-insensitive PQL (#4983)
Added experimental support for Text Search (#4993)
Upgraded Helix to version 0.9.4; task management now works as expected (#5020)
Added date_trunc transformation function (#4740)
Support schema evolution for consuming segments (#4954)
APIs Additions/Changes
Pinot Controller Rest APIs
Get Table leader controller resource (#4545)
Support HTTP POST/PUT to upload JSON encoded schema (#4639)
Table rebalance API now requires both table name and type as parameters. (#4824)
Refactored Segments APIs (#4806)
Added segment batch deletion REST API (#4828)
Update schema API to reload table on schema change when applicable (#4838)
Enhance the task related REST APIs (#5054)
Added PinotClusterConfig REST APIs (#5073)
GET /cluster/configs
POST /cluster/configs
DELETE /cluster/configs/{configName}
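Hedged examples of calling the cluster config endpoints listed above (the controller address and the config name/value in the payload are assumptions):
curl -X GET http://localhost:9000/cluster/configs
curl -X POST -H "Content-Type: application/json" \
  -d '{"pinot.broker.enable.query.limit.override": "true"}' \
  http://localhost:9000/cluster/configs
curl -X DELETE http://localhost:9000/cluster/configs/pinot.broker.enable.query.limit.override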
Configurations Additions/Changes
Config controller.host is now optional in Pinot Controller
Added instance config queriesDisabled to disable query sending to a running server (#4767)
Added broker config pinot.broker.enable.query.limit.override for a configurable max query response size (#5040)
Removed deprecated server configs (#4903)
pinot.server.starter.enableSegmentsLoadingCheck
pinot.server.starter.timeoutInSeconds
pinot.server.instance.enable.shutdown.delay
pinot.server.instance.starter.maxShutdownWaitTime
pinot.server.instance.starter.checkIntervalTime
Decoupled server instance id from hostname/port config (#4995)
Added FieldConfig to encapsulate encoding and indexing info for a field (#5006)
Fixed the bug of releasing the segment when there are still threads working on it. (#4764)
Fixed the bug of uneven task distribution for threads (#4793)
Fixed encryption for .tar.gz segment file upload (#4855)
Fixed controller REST API to download segments from non-local FS (#4808)
Fixed the bug of not releasing segment lock if segment recovery throws exception (#4882)
Fixed the issue of server not registering state model factory before connecting the Helix manager (#4929)
Fixed the exception in server instance when Helix starts a new ZK session (#4976)
Fixed ThreadLocal DocIdSet issue in ExpressionFilterOperator (#5114)
Fixed the bug in default value provider classes (#5137)
Fixed the bug when no segment exists in RealtimeSegmentSelector (#5138)
We are in the process of supporting text search query functionalities.
Upgrading from version 0.1.0 to this release is disruptive because of protocol changes between the Pinot broker and server. Please ensure that you upgrade to release 0.2.0 first, then upgrade to this version.
If you build your own startable or WAR without using the scripts generated by the pinot-distribution module: for Java 8, the environment variable "plugins.dir" is required for Pinot to find out where to load all the Pinot plugin jars; for Java 11, the plugins directory must be explicitly added to the classpath. Please see pinot-admin.sh as an example.
As always, we recommend that you upgrade controllers first, and then brokers and lastly the servers in order to have zero downtime in production clusters.
Kafka 0.9 is no longer included in the release distribution.
Pull request #4806 introduces a backward incompatible API change for segments management.
Removed segment toggle APIs
Removed list all segments in cluster APIs
Deprecated the following APIs:
GET /tables/{tableName}/segments
GET /tables/{tableName}/segments/metadata
GET /tables/{tableName}/segments/crc
GET /tables/{tableName}/segments/{segmentName}
GET /tables/{tableName}/segments/{segmentName}/metadata
GET /tables/{tableName}/segments/{segmentName}/reload
POST /tables/{tableName}/segments/{segmentName}/reload
GET /tables/{tableName}/segments/reload
POST /tables/{tableName}/segments/reload
Pull request #5054 deprecated the following task-related APIs:
GET:
/tasks/taskqueues: List all task queues
/tasks/taskqueuestate/{taskType} -> /tasks/{taskType}/state
/tasks/tasks/{taskType} -> /tasks/{taskType}/tasks
/tasks/taskstates/{taskType} -> /tasks/{taskType}/taskstates
/tasks/taskstate/{taskName} -> /tasks/task/{taskName}/taskstate
/tasks/taskconfig/{taskName} -> /tasks/task/{taskName}/taskconfig
PUT:
/tasks/scheduletasks -> POST /tasks/schedule
/tasks/cleanuptasks/{taskType} -> /tasks/{taskType}/cleanup
/tasks/taskqueue/{taskType}: Toggle a task queue
DELETE:
/tasks/taskqueue/{taskType} -> /tasks/{taskType}
Deprecated modules pinot-hadoop and pinot-spark and replaced with pinot-batch-ingestion-hadoop and pinot-batch-ingestion-spark.
Introduced new Pinot batch ingestion jobs and yaml based job specs to define segment generation jobs and segment push jobs.
You may see exceptions like the one below in Pinot brokers during a cluster upgrade; it is safe to ignore them.
2020/03/09 23:37:19.879 ERROR [HelixTaskExecutor] [CallbackProcessor@b808af5-pinot] [pinot-broker] [] Message cannot be processed: 78816abe-5288-4f08-88c0-f8aa596114fe, {CREATE_TIMESTAMP=1583797034542, MSG_ID=78816abe-5288-4f08-88c0-f8aa596114fe, MSG_STATE=unprocessable, MSG_SUBTYPE=REFRESH_SEGMENT, MSG_TYPE=USER_DEFINE_MSG, PARTITION_NAME=fooBar_OFFLINE, RESOURCE_NAME=brokerResource, RETRY_COUNT=0, SRC_CLUSTER=pinot, SRC_INSTANCE_TYPE=PARTICIPANT, SRC_NAME=Controller_hostname.domain,com_9000, TGT_NAME=Broker_hostname,domain.com_6998, TGT_SESSION_ID=f6e19a457b80db5, TIMEOUT=-1, segmentName=fooBar_559, tableName=fooBar_OFFLINE}{}{}
java.lang.UnsupportedOperationException: Unsupported user defined message sub type: REFRESH_SEGMENT
at org.apache.pinot.broker.broker.helix.TimeboundaryRefreshMessageHandlerFactory.createHandler(TimeboundaryRefreshMessageHandlerFactory.java:68) ~[pinot-broker-0.2.1172.jar:0.3.0-SNAPSHOT-c9d88e47e02d799dc334d7dd1446a38d9ce161a3]
at org.apache.helix.messaging.handling.HelixTaskExecutor.createMessageHandler(HelixTaskExecutor.java:1096) ~[helix-core-0.9.1.509.jar:0.9.1.509]
at org.apache.helix.messaging.handling.HelixTaskExecutor.onMessage(HelixTaskExecutor.java:866) [helix-core-0.9.1.509.jar:0.9.1.509]
The response is returned in a SQL-like tabular structure. Note that this is the response returned from the standard-SQL endpoint. For the PQL endpoint response, skip to the PQL endpoint response section.
$ curl -H "Content-Type: application/json" -X POST \
-d '{"sql":"SELECT moo, bar, foo FROM myTable ORDER BY foo DESC"}' \
http://localhost:8099/query/sql
{
"exceptions": [],
"minConsumingFreshnessTimeMs": 0,
"numConsumingSegmentsQueried": 0,
"numDocsScanned": 6,
"numEntriesScannedInFilter": 0,
"numEntriesScannedPostFilter": 18,
"numGroupsLimitReached": false,
"numSegmentsMatched": 2,
"numSegmentsProcessed": 2,
"numSegmentsQueried": 2,
"numServersQueried": 1,
"numServersResponded": 1,
"resultTable": {
"dataSchema": {
"columnDataTypes": [
"LONG",
"INT",
"STRING"
],
"columnNames": [
"moo",
"bar",
"foo"
]
},
"rows": [
[
40015,
2019,
"xyz"
],
[
1002,
2001,
"pqr"
],
[
20555,
1988,
"pqr"
],
[
203,
2010,
"pqr"
],
[
500,
2008,
"abc"
],
[
60,
2003,
"abc"
]
]
},
"segmentStatistics": [],
"timeUsedMs": 4,
"totalDocs": 6,
"traceInfo": {}
}
$ curl -X POST \
-d '{"sql":"SELECT SUM(moo), MAX(bar), COUNT(*) FROM myTable"}' \
localhost:8099/query/sql -H "Content-Type: application/json"
{
"exceptions": [],
"minConsumingFreshnessTimeMs": 0,
"numConsumingSegmentsQueried": 0,
"numDocsScanned": 6,
"numEntriesScannedInFilter": 0,
"numEntriesScannedPostFilter": 12,
"numGroupsLimitReached": false,
"numSegmentsMatched": 2,
"numSegmentsProcessed": 2,
"numSegmentsQueried": 2,
"numServersQueried": 1,
"numServersResponded": 1,
"resultTable": {
"dataSchema": {
"columnDataTypes": [
"DOUBLE",
"DOUBLE",
"LONG"
],
"columnNames": [
"sum(moo)",
"max(bar)",
"count(*)"
]
},
"rows": [
[
62335,
2019.0,
6
]
]
},
"segmentStatistics": [],
"timeUsedMs": 87,
"totalDocs": 6,
"traceInfo": {}
}
$ curl -X POST \
-d '{"sql":"SELECT SUM(moo), MAX(bar) FROM myTable GROUP BY foo ORDER BY foo"}' \
localhost:8099/query/sql -H "Content-Type: application/json"
{
"exceptions": [],
"minConsumingFreshnessTimeMs": 0,
"numConsumingSegmentsQueried": 0,
"numDocsScanned": 6,
"numEntriesScannedInFilter": 0,
"numEntriesScannedPostFilter": 18,
"numGroupsLimitReached": false,
"numSegmentsMatched": 2,
"numSegmentsProcessed": 2,
"numSegmentsQueried": 2,
"numServersQueried": 1,
"numServersResponded": 1,
"resultTable": {
"dataSchema": {
"columnDataTypes": [
"STRING",
"DOUBLE",
"DOUBLE"
],
"columnNames": [
"foo",
"sum(moo)",
"max(bar)"
]
},
"rows": [
[
"abc",
560.0,
2008.0
],
[
"pqr",
21760.0,
2010.0
],
[
"xyz",
40015.0,
2019.0
]
]
},
"segmentStatistics": [],
"timeUsedMs": 15,
"totalDocs": 6,
"traceInfo": {}
}
where:
resultTable: This contains everything needed to process the response.
resultTable.dataSchema: This describes the schema of the response (columnNames and their dataTypes).
resultTable.dataSchema.columnNames: Column names in the response.
resultTable.dataSchema.columnDataTypes: Data types for each column.
resultTable.rows: Actual content with values. This is an array of arrays. The number of rows depends on the limit value in the query. The number of columns in each row is equal to the length of resultTable.dataSchema.columnNames.
timeUsedMs: Total time taken as seen by the broker before sending the response back to the client.
totalDocs: The number of documents/records in the table.
numServersQueried: The number of servers queried by the broker (note that this may be less than the total number of servers, since the broker can apply some optimizations to minimize the number of servers).
numServersResponded: This should be equal to numServersQueried. If it is not, one or more servers might have timed out. If numServersQueried != numServersResponded, the results can be considered partial and clients can retry the query with exponential back-off.
numSegmentsQueried: Total number of segments queried for this query. It may be less than the total number of segments, since the broker can apply optimizations.
numSegmentsMatched: This is the number of segments actually processed. This indicates the effectiveness of the pruning logic (based on partitioning, time, etc).
numSegmentsProcessed: Actual number of segments that were processed. This is where the majority of the time is spent.
numDocsScanned: The number of docs/records that were scanned to process the query. This includes the docs scanned in the filter phase (this can be zero if the columns in the query are indexed) and post filter.
numEntriesScannedInFilter: This, along with numEntriesScannedPostFilter, should give an idea of where most of the time is spent during query processing. If this is high, enabling indexing for the filter columns in the tableConfig is one way to bring it down.
numEntriesScannedPostFilter: This, along with numEntriesScannedInFilter, should give an idea of where most of the time is spent during query processing. A high number here means the selectivity is low (i.e. Pinot needs to scan a lot of records to answer the query). If this is high, adding a regular inverted/bitmap index will not help; consider using a star-tree index instead.
numGroupsLimitReached: If the query has a group by clause and top K, Pinot drops new entries after numGroupsLimit is reached. If this boolean is set to true, the query result may not be accurate. Note that the default value for numGroupsLimit is 100k and should be sufficient for most use cases.
exceptions: Will contain the stack trace if there is any exception processing the query.
segmentStatistics: N/A
traceInfo: If trace is enabled (can be enabled for each query), this will contain the timing for each stage and each segment. Advanced feature, intended for dev/debugging purposes.
Note: The PQL endpoint is deprecated and will soon be removed. The standard SQL endpoint is the recommended endpoint.
The response received from the PQL endpoint differs depending on the type of the query.
curl -X POST \
-d '{"pql":"select * from flights limit 3"}' \
http://localhost:8099/query
{
"selectionResults":{
"columns":[
"Cancelled",
"Carrier",
"DaysSinceEpoch",
"Delayed",
"Dest",
"DivAirports",
"Diverted",
"Month",
"Origin",
"Year"
],
"results":[
[
"0",
"AA",
"16130",
"0",
"SFO",
[],
"0",
"3",
"LAX",
"2014"
],
[
"0",
"AA",
"16130",
"0",
"LAX",
[],
"0",
"3",
"SFO",
"2014"
],
[
"0",
"AA",
"16130",
"0",
"SFO",
[],
"0",
"3",
"LAX",
"2014"
]
]
},
"traceInfo":{},
"numDocsScanned":3,
"aggregationResults":[],
"timeUsedMs":10,
"segmentStatistics":[],
"exceptions":[],
"totalDocs":102
}
curl -X POST \
-d '{"pql":"select count(*) from flights"}' \
http://localhost:8099/query
{
"traceInfo":{},
"numDocsScanned":17,
"aggregationResults":[
{
"function":"count_star",
"value":"17"
}
],
"timeUsedMs":27,
"segmentStatistics":[],
"exceptions":[],
"totalDocs":17
}
curl -X POST \
-d '{"pql":"select count(*) from flights group by Carrier"}' \
http://localhost:8099/query
{
"traceInfo":{},
"numDocsScanned":23,
"aggregationResults":[
{
"groupByResult":[
{
"value":"10",
"group":["AA"]
},
{
"value":"9",
"group":["VX"]
},
{
"value":"4",
"group":["WN"]
}
],
"function":"count_star",
"groupByColumns":["Carrier"]
}
],
"timeUsedMs":47,
"segmentStatistics":[],
"exceptions":[],
"totalDocs":23
}
Learn how to query Pinot using PQL
PQL is a derivative of SQL that supports selection, projection, aggregation, and grouping aggregation.
PQL is only a derivative of SQL and does not support joins or subqueries. To support them, we suggest relying on PrestoDB (https://prestodb.io/), although subqueries are not completely supported by PrestoDB at the time of writing.
The Pinot Query Language (PQL) is very similar to standard SQL:
SELECT COUNT(*) FROM myTable
SELECT COUNT(*), MAX(foo), SUM(bar) FROM myTable
SELECT MIN(foo), MAX(foo), SUM(foo), AVG(foo) FROM myTable
GROUP BY bar, baz LIMIT 50
SELECT MIN(foo), MAX(foo), SUM(foo), AVG(foo) FROM myTable
GROUP BY bar, baz
ORDER BY bar, MAX(foo) DESC LIMIT 50
SELECT COUNT(*) FROM myTable
WHERE foo = 'foo'
AND bar BETWEEN 1 AND 20
OR (baz < 42 AND quux IN ('hello', 'goodbye') AND quuux NOT IN (42, 69))
SELECT * FROM myTable
WHERE quux < 5
LIMIT 50
SELECT foo, bar FROM myTable
WHERE baz > 20
ORDER BY bar DESC
LIMIT 100
Note: results might not be consistent if the column being ordered by has the same value in multiple rows.
SELECT foo, bar FROM myTable
WHERE baz > 20
ORDER BY bar DESC
LIMIT 50, 100
To count rows where the column airlineName starts with U:
SELECT count(*) FROM SomeTable
WHERE regexp_like(airlineName, '^U.*')
GROUP BY airlineName TOP 10
As of now, functions have to be implemented within Pinot. Injecting functions is not allowed yet. The example below demonstrates the use of UDFs. More examples are in Transform Function in Aggregation Grouping.
SELECT count(*) FROM myTable
GROUP BY dateTimeConvert(timeColumnName, '1:MILLISECONDS:EPOCH', '1:HOURS:EPOCH', '1:HOURS')
Pinot supports queries on BYTES columns using HEX strings. The query response also uses hex strings to represent bytes values.
E.g. the query below fetches all the rows for a given UID.
SELECT * FROM myTable
WHERE UID = "c8b3bce0b378fc5ce8067fc271a34892"
The select statement is as follows
SELECT <outputColumn> (, outputColumn, outputColumn,...)
FROM <tableName>
(WHERE ... | GROUP BY ... | ORDER BY ... | TOP ... | LIMIT ...)
outputColumn can be * to project all columns, columns (foo, bar, baz), or aggregation functions like (MIN(foo), MAX(bar), AVG(baz)).
EQUALS
IN
NOT IN
GT
LT
BETWEEN
REGEXP_LIKE
For Multi-Valued columns, EQUALS is similar to CONTAINS.
COUNT
MIN
MAX
SUM
AVG
MINMAXRANGE
DISTINCT
DISTINCTCOUNT
DISTINCTCOUNTHLL
DISTINCTCOUNTRAWHLL: Returns HLL response serialized as string. The serialized HLL can be converted back into an HLL (see pinot-core/**/HllUtil.java as an example) and then aggregated with other HLLs. A common use case may be to merge HLL responses from different Pinot tables, or to allow aggregation after client-side batching.
FASTHLL (WARN: will be deprecated soon. FASTHLL stores serialized HyperLogLog in String format, which performs worse than DISTINCTCOUNTHLL, which supports serialized HyperLogLog in BYTES (byte array) format)
PERCENTILE[0-100]: e.g. PERCENTILE5, PERCENTILE50, PERCENTILE99, etc.
PERCENTILEEST[0-100]: e.g. PERCENTILEEST5, PERCENTILEEST50, PERCENTILEEST99, etc.
COUNTMV
MINMV
MAXMV
SUMMV
AVGMV
MINMAXRANGEMV
DISTINCTCOUNTMV
DISTINCTCOUNTHLLMV
DISTINCTCOUNTRAWHLLMV: Returns HLL response serialized as string. The serialized HLL can be converted back into an HLL (see pinot-core/**/HllUtil.java as an example) and then aggregated with other HLLs. A common use case may be to merge HLL responses from different Pinot tables, or to allow aggregation after client-side batching.
FASTHLLMV (WARN: will be deprecated soon. It does not make much sense to configure a serialized HyperLogLog column as a dimension)
PERCENTILE[0-100]MV: e.g. PERCENTILE5MV, PERCENTILE50MV, PERCENTILE99MV, etc.
PERCENTILEEST[0-100]MV: e.g. PERCENTILEEST5MV, PERCENTILEEST50MV, PERCENTILEEST99MV, etc.
Supported predicates are comparisons with a constant using the standard SQL operators (=, <, <=, >, >=, <>, !=), range comparisons using BETWEEN (foo BETWEEN 42 AND 69), set membership (foo IN (1, 2, 4, 8)) and exclusion (foo NOT IN (1, 2, 4, 8)). For BETWEEN, the range is inclusive.
Comparison with a regular expression is supported using the regexp_like function, as in WHERE regexp_like(columnName, 'regular expression')
The GROUP BY clause groups aggregation results by a list of columns, or transform functions on columns (see below).
The ORDER BY clause orders selection results or group by results by a list of columns. PQL supports ordering DESC or ASC.
The TOP n clause causes the 'n' largest group results to be returned. If not specified, the top 10 groups are returned.
The LIMIT n clause causes the selection results to contain at most 'n' results. The LIMIT a, b clause paginates the selection results, starting from the a-th result and returning at most 'b' results. By default, 10 records are returned in the result.
In aggregation and grouping, each column can be transformed from one or multiple columns. For example, the following query will calculate the maximum value of column foo divided by column bar, grouping on the column time converted from time unit MILLISECONDS to SECONDS:
SELECT MAX(DIV(foo, bar)) FROM myTable
GROUP BY DATETIMECONVERT(time, '1:MILLISECONDS:EPOCH', '1:SECONDS:EPOCH', '1:SECONDS')
Function reference:
ADD: Sum of at least two values
SUB: Difference between two values
MULT: Product of at least two values
DIV: Quotient of two values
MOD: Modulo of two values
ABS: Absolute value of a value
CEIL: Rounded up to the nearest integer
FLOOR: Rounded down to the nearest integer
EXP: Euler's number raised to the power of the value (e^x)
LN: Natural logarithm of the value
SQRT: Square root of a value
TIMECONVERT: Takes 3 arguments, converts the value into another time unit.
Examples
TIMECONVERT(time, 'MILLISECONDS', 'SECONDS') - This expression converts the value of column time (taken to be in milliseconds) to the nearest second (i.e. the nearest second that is lower than the value of the time column)
DATETIMECONVERT: Takes 4 arguments; converts the value into another date time format and buckets time based on the given time granularity.
DATETIMECONVERT(columnName, inputFormat, outputFormat, outputGranularity)
where:
columnName - column name to convert
inputFormat - format of the column columnName
outputFormat - format of the result desired after conversion
outputGranularity - the granularity in which to bucket the result
Format is expressed as <time size>:<time unit>:<time format>:<pattern>
where:
time size - size of the time unit, e.g. 1, 10
time unit - HOURS, DAYS, etc.
time format - EPOCH or SIMPLE_DATE_FORMAT
pattern - this is defined in case of SIMPLE_DATE_FORMAT, e.g. yyyyMMdd. A specific timezone can be passed using tz(timezone).
timezone - can be expressed in long form tz(Asia/Kolkata), in short form tz(IST), or in terms of GMT tz(GMT+0530). Default is UTC. It is recommended to use the long form timezone, as short forms are ambiguous with daylight savings (e.g. PDT works during daylight savings, PST otherwise)
Granularity is expressed as <time size>:<time unit>
Examples
1) To convert column "Date" from hoursSinceEpoch to daysSinceEpoch and bucket it to 1 day granularity
dateTimeConvert(Date, '1:HOURS:EPOCH', '1:DAYS:EPOCH', '1:DAYS')
2) To simply bucket millis "Date" to 15 minutes granularity
dateTimeConvert(Date, '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '15:MINUTES')
3) To convert column "Date" from hoursSinceEpoch to format yyyyMMdd and bucket it to 1 day granularity
dateTimeConvert(Date, '1:HOURS:EPOCH', '1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd', '1:DAYS')
4) To convert column "Date" from format yyyy/MM/dd to weeksSinceEpoch and bucket it to 1 weeks granularity
dateTimeConvert(Date, '1:DAYS:SIMPLE_DATE_FORMAT:yyyy/MM/dd', '1:WEEKS:EPOCH', '1:WEEKS')
5) To convert column "Date" from millis to format yyyyMMdd in timezone PST
dateTimeConvert(Date, '1:MILLISECONDS:EPOCH', '1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd tz(America/Los_Angeles)', '1:DAYS')
DATETRUNC: (Presto) SQL compatible date truncation, equivalent to the Presto function date_trunc. Takes at least 3 and up to 5 arguments; converts the value into a specified output granularity in seconds since UTC epoch that is bucketed on a unit in a specified timezone.
Examples
DATETRUNC('week', time_in_seconds, 'SECONDS') - This expression converts the column time_in_seconds, which is a long containing seconds since UTC epoch, truncated at WEEK (where a week starts at Monday UTC midnight). The output is a long of seconds since UTC epoch.
DATETRUNC('quarter', DIV(time_milliseconds, 1000), 'SECONDS', 'America/Los_Angeles', 'HOURS') - This expression converts the expression time_in_milliseconds/1000 (which is thus in seconds) into hours that are truncated at QUARTER in the Los Angeles time zone (where a quarter begins on 1/1, 4/1, 7/1, 10/1 in the Los Angeles timezone). The output is expressed as hours since UTC epoch (note that the output is not in the Los Angeles timezone).
ARRAYLENGTH: Returns the length of a multi-value column
VALUEIN: Takes at least 2 arguments, where the first argument is a multi-valued column, and the following arguments are constant values. The transform function will filter the value from the multi-valued column with the given constant values. The VALUEIN transform function is especially useful when the same multi-valued column is both a filtering column and a grouping column.
Examples
VALUEIN(mvColumn, 3, 5, 15)
JSONEXTRACTSCALAR
JSONEXTRACTSCALAR(jsonField, 'jsonPath', 'resultsType') evaluates the jsonPath on jsonField (a string containing JSON) and returns the result as the type resultsType.
jsonFieldName is a String field containing a JSON document.
jsonPath is a JsonPath expression used to read from the JSON document.
results_type refers to the result's data type; it can be INT, LONG, FLOAT, DOUBLE, STRING, INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY, DOUBLE_ARRAY, or STRING_ARRAY.
Examples
JSONEXTRACTSCALAR(profile_json_str, '$.name', 'STRING') -> "bob"
JSONEXTRACTSCALAR(profile_json_str, '$.age', 'INT') -> 37
JSONEXTRACTKEY
JSONEXTRACTKEY(jsonField, 'jsonPath') extracts all field names based on jsonPath as a STRING_ARRAY.
jsonFieldName is a String field containing a JSON document.
jsonPath is a JsonPath expression used to read from the JSON document.
Examples
JSONEXTRACTKEY(profile_json_str, '$.*') -> ["name", "age", "phone"...]
TOP works like LIMIT for truncation in group by queries.
No need to select the columns to group with. The following two queries are both supported in PQL, where the non-aggregation columns are ignored.
SELECT MIN(foo), MAX(foo), SUM(foo), AVG(foo) FROM mytable
GROUP BY bar, baz
TOP 50
SELECT bar, baz, MIN(foo), MAX(foo), SUM(foo), AVG(foo) FROM mytable
GROUP BY bar, baz
TOP 50
The results will always order by the aggregated value (descending). The results for query
SELECT MIN(foo), MAX(foo) FROM myTable
GROUP BY bar
TOP 50
will be the same as combining the results from the following queries
SELECT MIN(foo) FROM myTable
GROUP BY bar
TOP 50
SELECT MAX(foo) FROM myTable
GROUP BY bar
TOP 50
where we don’t put the results for the same group together.
No support for ORDER BY in aggregation group by. However, ORDER BY support was added recently and is available in the standard-SQL endpoint. It can be used in the PQL endpoint by passing queryOptions into the payload as follows:
{
"pql" : "SELECT SUM(foo), SUM(bar) from myTable GROUP BY moo ORDER BY SUM(bar) ASC, moo DESC TOP 10",
"queryOptions" : "groupByMode=sql;responseFormat=sql"
}
where:
groupByMode=sql - standard SQL way of executing group by, hence accepting order by
responseFormat=sql - standard SQL way of displaying results, in a tabular manner
Links to all the design docs
This page contains links to all the design documents
Authors and dates:
Neha - May 2020
Subbu - May 2020
Kishore - May 2020
Kishore - Apr 2020
Neha - Apr 2020
Yupeng - Apr 2020
Neha - Mar 2020
Neha - Mar 2020
Alex Pucher - Mar 2020
Siddharth Teotia - Nov 2019
Xiang Fu - Nov 2019
Jackie Jiang - Nov 2019
Neha - Oct 2019
Jialiang Li - Sep 2019
Jialiang Li - Jun 2019
James - Jun 2019
Sunitha Beeram - May 2019
Ting, Chinmay - May 2019
Jia Guo - May 2019
Subbu - Feb 2019
Seunghyun Lee - 2018
Subbu - 2017
Subbu - 2017
Subbu - 2016
Subbu - 2016
This page talks about support for text search functionality in Pinot.
Pinot supports super fast query processing through its indexes on non-BLOB like columns. Queries with exact match filters are run efficiently through a combination of dictionary encoding, inverted index and sorted index. An example:
SELECT COUNT(*) FROM Foo WHERE STRING_COL = "ABCDCD" AND INT_COL > 2000
In the above query, we are doing exact match on two columns of type STRING and INT respectively.
For arbitrary text data, which falls into the BLOB/CLOB territory, we need more than exact matches. Users are interested in doing regex, phrase, and fuzzy queries on BLOB-like data. Before 0.3.0, one had to use regexp_like to achieve this. However, regexp_like is scan-based, which is not performant, and features like fuzzy search (edit distance search) were not possible.
In version 0.3.0, we added support for text indexes to efficiently do arbitrary search on STRING columns where each column value is a large BLOB of text. This can be achieved by using the new built-in function TEXT_MATCH.
SELECT COUNT(*) FROM Foo WHERE TEXT_MATCH(<column_name>, <search_expression>)
where <column_name> is the column the text index is created on and <search_expression> can be one of the following:
Supported search expression types and examples:
Phrase query: TEXT_MATCH(<column_name>, '\"distributed system\"')
Term query: TEXT_MATCH(<column_name>, 'Java')
Boolean query: TEXT_MATCH(<column_name>, 'Java AND c++')
Prefix query: TEXT_MATCH(<column_name>, 'stream*')
Regex query: TEXT_MATCH(<column_name>, '/Exception.*/')
Text search should ideally be used on STRING columns where doing standard filter operations (EQUALITY, RANGE, BETWEEN) doesn't fit the bill because each column value is a reasonably large blob of text.
Consider the following snippet from an Apache access log. Each line in the log consists of arbitrary data (IP addresses, URLs, timestamps, symbols, etc.) and represents a column value. Data like this is a good candidate for doing text search.
Let's say the following snippet of data is stored in the ACCESS_LOG_COL column in a Pinot table.
109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] "GET /administrator/ HTTP/1.1" 200 4263 "-" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-
109.169.248.247 - - [12/Dec/2015:18:25:11 +0100] "POST /administrator/index.php HTTP/1.1" 200 4494 "http://almhuette-raith.at/administrator/" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"
46.72.177.4 - - [12/Dec/2015:18:31:08 +0100] "GET /administrator/ HTTP/1.1" 200 4263 "-" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"
46.72.177.4 - - [12/Dec/2015:18:31:08 +0100] "POST /administrator/index.php HTTP/1.1" 200 4494 "http://almhuette-raith.at/administrator/" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"
83.167.113.100 - - [12/Dec/2015:18:31:25 +0100] "GET /administrator/ HTTP/1.1" 200 4263 "-" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"
83.167.113.100 - - [12/Dec/2015:18:31:25 +0100] "POST /administrator/index.php HTTP/1.1" 200 4494 "http://almhuette-raith.at/administrator/" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"
95.29.198.15 - - [12/Dec/2015:18:32:10 +0100] "GET /administrator/ HTTP/1.1" 200 4263 "-" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"
95.29.198.15 - - [12/Dec/2015:18:32:11 +0100] "POST /administrator/index.php HTTP/1.1" 200 4494 "http://almhuette-raith.at/administrator/" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"
109.184.11.34 - - [12/Dec/2015:18:32:56 +0100] "GET /administrator/ HTTP/1.1" 200 4263 "-" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"
109.184.11.34 - - [12/Dec/2015:18:32:56 +0100] "POST /administrator/index.php HTTP/1.1" 200 4494 "http://almhuette-raith.at/administrator/" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"
91.227.29.79 - - [12/Dec/2015:18:33:51 +0100] "GET /administrator/ HTTP/1.1" 200 4263 "-" "Mozilla/5.0 (Windows NT 6.0; rv:34.0) Gecko/20100101 Firefox/34.0" "-"
A few examples of search queries on this data:
Count the number of GET requests.
SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(ACCESS_LOG_COL, 'GET')
Count the number of POST requests that have administrator in the URL (administrator/index)
SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(ACCESS_LOG_COL, 'post AND administrator AND index')
Count the number of POST requests that have a particular URL and are handled by the Firefox browser
SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(ACCESS_LOG_COL, 'post AND administrator AND index AND firefox')
Consider another example of simple resume text. Each line in the file represents skill data from the resumes of different candidates.
Let's say the following snippet of data is stored in the SKILLS_COL column in a Pinot table. Each line in the input text represents a column value.
Distributed systems, Java, C++, Go, distributed query engines for analytics and data warehouses, Machine learning, spark, Kubernetes, transaction processing
Java, Python, C++, Machine learning, building and deploying large scale production systems, concurrency, multi-threading, CPU processing
C++, Python, Tensor flow, database kernel, storage, indexing and transaction processing, building large scale systems, Machine learning
Amazon EC2, AWS, hadoop, big data, spark, building high performance scalable systems, building and deploying large scale production systems, concurrency, multi-threading, Java, C++, CPU processing
Distributed systems, database development, columnar query engine, database kernel, storage, indexing and transaction processing, building large scale systems
Distributed systems, Java, realtime streaming systems, Machine learning, spark, Kubernetes, distributed storage, concurrency, multi-threading
CUDA, GPU, Python, Machine learning, database kernel, storage, indexing and transaction processing, building large scale systems
Distributed systems, Java, database engine, cluster management, docker image building and distribution
Kubernetes, cluster management, operating systems, concurrency, multi-threading, apache airflow, Apache Spark,
Apache spark, Java, C++, query processing, transaction processing, distributed storage, concurrency, multi-threading, apache airflow
Big data stream processing, Apache Flink, Apache Beam, database kernel, distributed query engines for analytics and data warehouses
CUDA, GPU processing, Tensor flow, Pandas, Python, Jupyter notebook, spark, Machine learning, building high performance scalable systems
Distributed systems, Apache Kafka, publish-subscribe, building and deploying large scale production systems, concurrency, multi-threading, C++, CPU processing, Java
Realtime stream processing, publish subscribe, columnar processing for data warehouses, concurrency, Java, multi-threading, C++,
A few examples of search queries on this data:
Find the candidates that have "machine learning" and "gpu processing": a phrase search (more on this further in the document) where we are looking for an exact match of the phrases "machine learning" and "gpu processing", not necessarily in the same order in the original data.
SELECT SKILLS_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_COL, '\"Machine learning\" AND \"gpu processing\"')
Find the candidates that have "distributed systems" and either 'Java' or 'C++': a combination of searching for the exact phrase "distributed systems" along with other terms.
SELECT SKILLS_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_COL, '\"distributed systems\" AND (Java C++)')
Consider a snippet from a log file containing SQL queries handled by a database. Each line (query) in the file represents a column value in the QUERY_LOG_COL column in a Pinot table.
SELECT count(dimensionCol2) FROM FOO WHERE dimensionCol1 = 18616904 AND timestamp BETWEEN 1560988800000 AND 1568764800000 GROUP BY dimensionCol3 TOP 2500
SELECT count(dimensionCol2) FROM FOO WHERE dimensionCol1 = 18616904 AND timestamp BETWEEN 1560988800000 AND 1568764800000 GROUP BY dimensionCol3 TOP 2500
SELECT count(dimensionCol2) FROM FOO WHERE dimensionCol1 = 18616904 AND timestamp BETWEEN 1545436800000 AND 1553212800000 GROUP BY dimensionCol3 TOP 2500
SELECT count(dimensionCol2) FROM FOO WHERE dimensionCol1 = 18616904 AND timestamp BETWEEN 1537228800000 AND 1537660800000 GROUP BY dimensionCol3 TOP 2500
SELECT dimensionCol2, dimensionCol4, timestamp, dimensionCol5, dimensionCol6 FROM FOO WHERE dimensionCol1 = 18616904 AND timestamp BETWEEN 1561366800000 AND 1561370399999 AND dimensionCol3 = 2019062409 LIMIT 10000
SELECT dimensionCol2, dimensionCol4, timestamp, dimensionCol5, dimensionCol6 FROM FOO WHERE dimensionCol1 = 18616904 AND timestamp BETWEEN 1563807600000 AND 1563811199999 AND dimensionCol3 = 2019072215 LIMIT 10000
SELECT dimensionCol2, dimensionCol4, timestamp, dimensionCol5, dimensionCol6 FROM FOO WHERE dimensionCol1 = 18616904 AND timestamp BETWEEN 1563811200000 AND 1563814799999 AND dimensionCol3 = 2019072216 LIMIT 10000
SELECT dimensionCol2, dimensionCol4, timestamp, dimensionCol5, dimensionCol6 FROM FOO WHERE dimensionCol1 = 18616904 AND timestamp BETWEEN 1566327600000 AND 1566329400000 AND dimensionCol3 = 2019082019 LIMIT 10000
SELECT count(dimensionCol2) FROM FOO WHERE dimensionCol1 = 18616904 AND timestamp BETWEEN 1560834000000 AND 1560837599999 AND dimensionCol3 = 2019061805 LIMIT 0
SELECT count(dimensionCol2) FROM FOO WHERE dimensionCol1 = 18616904 AND timestamp BETWEEN 1560870000000 AND 1560871800000 AND dimensionCol3 = 2019061815 LIMIT 0
SELECT count(dimensionCol2) FROM FOO WHERE dimensionCol1 = 18616904 AND timestamp BETWEEN 1560871800001 AND 1560873599999 AND dimensionCol3 = 2019061815 LIMIT 0
SELECT count(dimensionCol2) FROM FOO WHERE dimensionCol1 = 18616904 AND timestamp BETWEEN 1560873600000 AND 1560877199999 AND dimensionCol3 = 2019061816 LIMIT 0
A few examples of search queries on this data:
Count the number of queries that have GROUP BY
SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(QUERY_LOG_COL, '\"group by\"')
Count the number of queries that have the SELECT count... pattern
SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(QUERY_LOG_COL, '\"select count\"')
Count the number of queries that use BETWEEN filter on timestamp column along with GROUP BY
SELECT COUNT(*) FROM MyTable WHERE TEXT_MATCH(QUERY_LOG_COL, '\"timestamp between\" AND \"group by\"')
Further sections in the document cover several concrete examples of each kind of query and a step-by-step guide on how to write text search queries in Pinot.
Currently we support text search in a restricted manner. More specifically, we have the following constraints:
The column type should be STRING.
The column should be single-valued.
Co-existence of text index with other Pinot indexes is currently not supported.
The last two restrictions are going to be relaxed very soon in the upcoming releases.
Currently, a column in Pinot can be dictionary encoded or stored RAW. Furthermore, we can create an inverted index on a dictionary-encoded column. We can also create a sorted index on a dictionary-encoded column.
A text index is an addition to the types of per-column indexes users can create in Pinot. However, the current implementation supports text indexes on RAW columns only. In other words, the column should not be dictionary encoded. As we relax this constraint in upcoming releases, a text index will be able to be created on a dictionary-encoded column that also has other indexes (inverted, sorted, etc).
Similar to other indexes, users can enable text index on a column through table config. As part of text-search feature, we have also introduced a new generic way of specifying the per-column encoding and index information. In the table config, there will be a new section with name "fieldConfigList".
IMPORTANT: This mechanism of using "fieldConfigList" is currently ONLY used for text indexes. Our plan is to migrate all other indexes to this model. We are going to do that in upcoming releases and accordingly user documentation and new guidelines will be published. So please continue to specify other index info in table config as you have done till now and use the "fieldConfigList" only for text indexes.
"fieldConfigList":[
{
"name":"text_col_1",
"encodingType":"RAW",
"indexType":"TEXT"
},
{
"name":"text_col_2",
"encodingType":"RAW",
"indexType":"TEXT"
}
]
"fieldConfigList" will be a new section in table config. It is essentially a list of per-column encoding and index information. In the above example, the list contains text index information for two columns text_col_1 and text_col_2. Each object in fieldConfigList contains the following information
name - Name of the column text index is enabled on
encodingType - As mentioned earlier, we can store a column either as RAW or dictionary encoded. Since for now we have a restriction on the text index, this should always be RAW.
indexType - This should be TEXT.
Also, since we haven't yet removed the old way of specifying the index info, each column that text index is enabled on should also be specified in noDictionaryColumns in tableIndexConfig
"tableIndexConfig": {
"noDictionaryColumns": [
"text_col_1",
"text_col_2"
]}
The above mechanism should allow the user to use text index in all of the following scenarios:
Adding new table with text index enabled on one or more columns.
Adding a new column with text index enabled to an existing table.
Enabling text index on an existing column.
Once the text index is enabled on one or more columns through table config, our segment generation code will pick up the config and automatically create text index (per column). This is exactly how other indexes in Pinot are created.
Text index is supported for both offline and realtime segments.
The original text document (a value in the column with text index enabled) is parsed, tokenized and individual "indexable" terms are extracted. These terms are inserted into the index.
Pinot's text index is built on top of Lucene. Lucene's standard English text tokenizer generally works well for most classes of text. We might want to build a custom text parser and tokenizer to suit particular user requirements. Accordingly, we can make this configurable for the user to specify on a per-column text index basis.
A new built-in function TEXT_MATCH has been introduced for using text search in SQL/PQL.
TEXT_MATCH(text_column_name, search_expression)
text_column_name - name of the column to do text search on.
search_expression - search query
We can use TEXT_MATCH function as part of our queries in the WHERE clause. Examples:
SELECT COUNT(*) FROM Foo WHERE TEXT_MATCH(...)
SELECT * FROM Foo WHERE TEXT_MATCH(...)
We can also use the TEXT_MATCH filter clause with other filter operators. For example:
SELECT COUNT(*) FROM Foo WHERE TEXT_MATCH(...) AND some_other_column_1 > 20000
SELECT COUNT(*) FROM Foo WHERE TEXT_MATCH(...) AND some_other_column_1 > 20000 AND some_other_column_2 < 100000
Combining multiple TEXT_MATCH filter clauses:
SELECT COUNT(*) FROM Foo WHERE TEXT_MATCH(text_col_1, ....) AND TEXT_MATCH(text_col_2, ...)
TEXT_MATCH can be used in the WHERE clause of all kinds of queries supported by Pinot (examples of each follow the list below):
Selection queries which project one or more columns
The text column itself can also be included in the select list
Aggregation queries
Aggregation GROUP BY queries
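For illustration, the queries below sketch each of these shapes, reusing the hypothetical table Foo and columns from the earlier examples; some_dimension_col is also hypothetical, and '...' stands for any valid search expression:
-- Selection query projecting columns, including the text column itself
SELECT some_other_column_1, text_col_1 FROM Foo WHERE TEXT_MATCH(text_col_1, '...')
-- Aggregation query
SELECT COUNT(*), MAX(some_other_column_1) FROM Foo WHERE TEXT_MATCH(text_col_1, '...')
-- Aggregation GROUP BY query
SELECT some_dimension_col, COUNT(*) FROM Foo WHERE TEXT_MATCH(text_col_1, '...') GROUP BY some_dimension_col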
The search expression (second argument to the TEXT_MATCH function) is the query string that Pinot uses to perform the text search on the column's text index. The following expression types are supported:
A phrase query is used to do an exact match of a given phrase. Exact match implies that the terms in the user-specified phrase should appear in exactly the same order in the original text document. (Note that "document" here refers to the column value.)
Let's walk through queries using an example of resume text data. The data is stored in a column named SKILLS_COL and we have created a text index on this column. Each line below is one document (i.e. one value of SKILLS_COL):
Java, C++, worked on open source projects, coursera machine learning
Machine learning, Tensor flow, Java, Stanford university,
Distributed systems, Java, C++, Go, distributed query engines for analytics and data warehouses, Machine learning, spark, Kubernetes, transaction processing
Java, Python, C++, Machine learning, building and deploying large scale production systems, concurrency, multi-threading, CPU processing
C++, Python, Tensor flow, database kernel, storage, indexing and transaction processing, building large scale systems, Machine learning
Amazon EC2, AWS, hadoop, big data, spark, building high performance scalable systems, building and deploying large scale production systems, concurrency, multi-threading, Java, C++, CPU processing
Distributed systems, database development, columnar query engine, database kernel, storage, indexing and transaction processing, building large scale systems
Distributed systems, Java, realtime streaming systems, Machine learning, spark, Kubernetes, distributed storage, concurrency, multi-threading
CUDA, GPU, Python, Machine learning, database kernel, storage, indexing and transaction processing, building large scale systems
Distributed systems, Java, database engine, cluster management, docker image building and distribution
Kubernetes, cluster management, operating systems, concurrency, multi-threading, apache airflow, Apache Spark,
Apache spark, Java, C++, query processing, transaction processing, distributed storage, concurrency, multi-threading, apache airflow
Big data stream processing, Apache Flink, Apache Beam, database kernel, distributed query engines for analytics and data warehouses
CUDA, GPU processing, Tensor flow, Pandas, Python, Jupyter notebook, spark, Machine learning, building high performance scalable systems
Distributed systems, Apache Kafka, publish-subscribe, building and deploying large scale production systems, concurrency, multi-threading, C++, CPU processing, Java
Realtime stream processing, publish subscribe, columnar processing for data warehouses, concurrency, Java, multi-threading, C++,
C++, Java, Python, realtime streaming systems, Machine learning, spark, Kubernetes, transaction processing, distributed storage, concurrency, multi-threading, apache airflow
Databases, columnar query processing, Apache Arrow, distributed systems, Machine learning, cluster management, docker image building and distribution
Database engine, OLAP systems, OLTP transaction processing at large scale, concurrency, multi-threading, GO, building large scale systems
Example 1 - Search in the SKILLS_COL column to look for documents where each matching document MUST contain the phrase "Distributed systems" as is:
SELECT SKILLS_COL FROM MyTable WHERE TEXT_MATCH(SKILLS_COL, '\"Distributed systems\"')
The search expression is '\"Distributed systems\"'
The search expression is always specified within single quotes '<your expression>'
Since we are doing a phrase search, the phrase must be specified within double quotes inside the single quotes, and the double quotes must be escaped:
'\"<your phrase>\"'
The above query will match the following documents:
Distributed systems, Java, C++, Go, distributed query engines for analytics and data warehouses, Machine learning, spark, Kubernetes, transaction processing
Distributed systems, database development, columnar query engine, database kernel, storage, indexing and transaction processing, building large scale systems
Distributed systems, Java, realtime streaming systems, Machine learning, spark, Kubernetes, distributed storage, concurrency, multi-threading
Distributed systems, Java, database engine, cluster management, docker image building and distribution
Distributed systems, Apache Kafka, publish-subscribe, building and deploying large scale production systems, concurrency, multi-threading, C++, CPU processing, Java
Databases, columnar query processing, Apache Arrow, distributed systems, Machine learning, cluster management, docker image building and distribution